An example of a Big Data calculation (Hive on HDFS, Spark SQL with Scala on local NTFS)

We have a task with the following description.

There is a JSON file (events.json) with events:

...
...
{"_t":1507087562,"_p":"sburke@test.com","_n":"app_loaded","device_type":"desktop"}
{"_t":1505361824,"_p":"pfitza@test.com","_n":"added_to_team","account":"1234"}
{"_t":1505696264,"_p":"keiji@test.com","_n":"registered","channel":"Google_Apps"}
...
...

There are about 500,000 lines (JSON objects) in the file.

Only these 3 types of objects are possible:
_t - a timestamp
_p - an email, which we can use as a unique identifier of the user
_n - the event type (app_loaded - the application was loaded by the user, registered - the user has registered)
and the last field is an additional attribute: device_type, account or channel.
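
Just to keep these shapes in mind, here is a minimal Scala sketch of the three event types (the case class and field names are mine and purely illustrative, not part of the task):

// Illustrative only: one case class per event type from events.json
case class AppLoaded(time: java.sql.Timestamp, email: String, deviceType: String)
case class Registered(time: java.sql.Timestamp, email: String, channel: String)
case class AddedToTeam(time: java.sql.Timestamp, email: String, account: String)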

Task:
1) Load data from the JSON file by event type, only app_loaded and registered, into 2 Parquet files with the structure:


"registered": {
  "time":    "timestamp"
  "email":   "string",
  "channel": "string"
}

"app_loaded": {
"time":        "timestamp"
"email":       "string",
"device_type": "string"
}


2) Using the 2 Parquet files, calculate the percentage of users that loaded the application within a week after their registration.
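
In other words: take all users that have at least one registered event, and among them count those that have at least one app_loaded event within 7 days after registration. Below is a minimal in-memory Scala sketch of this definition (my own simplified illustration with invented sample data; the real solutions below use datediff over calendar days rather than raw seconds):

case class Reg(email: String, t: Long)    // t = Unix time, seconds
case class Load(email: String, t: Long)

def weekLoadPercent(regs: Seq[Reg], loads: Seq[Load]): Double = {
  val week = 7L * 24 * 3600                        // 7 days in seconds
  val loadsByEmail = loads.groupBy(_.email)
  val totalRegs = regs.map(_.email).distinct.size
  val converted = regs.filter { r =>
    loadsByEmail.getOrElse(r.email, Nil)
      .exists(l => l.t > r.t && l.t - r.t <= week) // loaded within a week after registering
  }.map(_.email).distinct.size
  if (totalRegs == 0) 100.0 else math.round(converted * 1000.0 / totalRegs) / 10.0
}

// Example (hypothetical data):
//   weekLoadPercent(Seq(Reg("a@test.com", 0), Reg("b@test.com", 0)),
//                   Seq(Load("a@test.com", 3600)))   // -> 50.0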

Solution #1 with Hive and HDFS.

I have a Hadoop cluster with Hive installed (Hive Metastore and HiveServer2) on the namenode (see previous articles).

Using the HDFS Web UI, I have loaded the events.json file into hdfs://10.242.5.88:9000/user/events/events.json.
Then we create a Hive table with a single string field and try to parse the input JSON file:


DROP TABLE default.evt;

CREATE TABLE default.evt( json string );

LOAD DATA INPATH  'hdfs://10.242.5.88:9000/user/events/events.json' INTO TABLE default.evt;

select from_unixtime(cast(get_json_object(q.json,'$._t') as bigint)) as ts,
       get_json_object(q.json,'$._p') as p,
       get_json_object(q.json,'$._n') as n,
       get_json_object(q.json,'$.device_type') as device_type,
       get_json_object(q.json,'$.account') as account,
       get_json_object(q.json,'$.channel') as channel
from default.evt q;

ts                  |p                  |n             |device_type |account |channel |
--------------------|-------------------|--------------|------------|--------|--------|
2016-12-02 06:00:47 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2016-12-02 06:01:16 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2016-12-31 08:25:58 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2016-12-31 08:33:13 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2017-01-04 06:19:07 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2017-09-14 07:03:44 |pfitza@test.com    |added_to_team |            |1234    |        |
2017-09-14 07:07:27 |konit@test.com     |added_to_team |            |1234    |        |
2017-09-14 07:12:52 |oechslin@test.com  |added_to_team |            |1234    |        |


Now we create 2 Hive tables with Parquet storage.


--n = "registered"
CREATE TABLE events_reg(
time     timestamp,                   
email    string,        
channel  string
)
STORED AS PARQUET
LOCATION 'hdfs://10.242.5.88:9000/user/events_req/prq';

--n = "app_loaded"
CREATE TABLE events_load(
time         timestamp,                   
email        string,        
device_type  string
)
STORED AS PARQUET
LOCATION 'hdfs://10.242.5.88:9000/user/events_load/prq';

Parse the JSON file with Hive and load the data into the Parquet tables.


insert into events_reg
select from_unixtime(cast(get_json_object(q.json,'$._t') as bigint)) as ts,
       get_json_object(q.json,'$._p')                                as email,
       get_json_object(q.json,'$.channel')                           as channel
from default.evt q
where get_json_object(q.json,'$._n')='registered';

insert into events_load
select from_unixtime(cast(get_json_object(q.json,'$._t') as bigint)) as ts,
       get_json_object(q.json,'$._p')                                as email,
       get_json_object(q.json,'$.device_type')                       as device_type
from default.evt q
where get_json_object(q.json,'$._n')='app_loaded';

Field type validation:

desc events_reg;

col_name |data_type |
---------|----------|
time     |timestamp |
email    |string    |
channel  |string    |

And a couple of final queries.


-- query 1  res: 50.6   26-28 sec.
select round(count(distinct r.email)*100/q.cnt_uniqe_regs,1) as prcnt
  from events_reg  r,
       events_load l,
       (select count(distinct email) as cnt_uniqe_regs from events_reg) q
 where r.email=l.email and 
       r.time < l.time and 
       datediff(l.time,r.time) <= 7
group by q.cnt_uniqe_regs


-- query 2  res: 50.6  26-28 sec.
select round(appl.cnt_reg_aweek_load*100/regs.cnt_uniqe_regs,1) as prcnt
  from
(
select count(distinct r.email) as cnt_reg_aweek_load
  from events_reg  r,
       events_load l
 where r.email=l.email and 
       r.time < l.time and 
       datediff(l.time,r.time) <= 7
) appl,
(select count(distinct email) as cnt_uniqe_regs from events_reg) regs

It's also possible to create some indexes, but on such a small data set it's not necessary.


DROP INDEX IF EXISTS idx_events_reg ON events_reg; 
DROP INDEX IF EXISTS idx_events_load ON events_load; 

CREATE INDEX idx_events_reg ON TABLE events_reg(email) 
  AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' 
  WITH DEFERRED REBUILD; 
  
ALTER INDEX idx_events_reg ON events_reg REBUILD;

CREATE INDEX idx_events_load ON TABLE events_load(email) 
  AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' 
  WITH DEFERRED REBUILD; 
  
ALTER INDEX idx_events_load ON events_load REBUILD;

Solution #2 with Spark shell (Scala), Spark SQL and local NTFS.

Start the spark-shell (Spark is installed on Windows 7):


C:\>C:\spark-2.3.0-bin-hadoop2.7\bin\spark-shell
2018-04-24 13:34:02 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://gdev-pk:4040
Spark context available as 'sc' (master = local[*], app id = local-1524558852873).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.

scala>


Read the JSON and validate the schema:


scala> val js = spark.read.json("C:\\spark_data\\events.json");
2018-04-24 13:34:58 WARN  ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
2018-04-24 13:34:58 WARN  ObjectStore:568 - Failed to get database default, returning NoSuchObjectException
2018-04-24 13:34:59 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
js: org.apache.spark.sql.DataFrame = [_n: string, _p: string ... 4 more fields]

scala> js.printSchema
root
 |-- _n: string (nullable = true)
 |-- _p: string (nullable = true)
 |-- _t: long (nullable = true)
 |-- account: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- device_type: string (nullable = true)


scala>


Create a TempView from the DataFrame, extract the fields while filtering by event type, and convert the Unix time to a timestamp.


val jsc = js.cache()
jsc.createOrReplaceTempView("jsc")
val reg = spark.sql("SELECT from_utc_timestamp(from_unixtime(cast(_t as bigint)),'GMT+0') as time,_p as email,channel FROM jsc j where j._n like 'reg%' ")
reg.printSchema
scala> reg.printSchema
root
 |-- time: timestamp (nullable = true)
 |-- email: string (nullable = true)
 |-- channel: string (nullable = true)

Write into Parquet files:


reg.write.parquet("C:\\spark_data\\events\\registered")

val apl = spark.sql("SELECT from_utc_timestamp(from_unixtime(cast(_t as bigint)),'GMT+0') as time,_p as email,device_type FROM jsc j where j._n = 'app_loaded' ")

apl.write.parquet("C:\\spark_data\\events\\app_loaded")


Next we read these Parquet tables back (from the same paths we wrote above) and make the final query.

val reg_df = spark.read.parquet("C:\\spark_data\\events\\registered")
val apl_df = spark.read.parquet("C:\\spark_data\\events\\app_loaded")

spark.conf.set("spark.sql.crossJoin.enabled", true)

reg_df.createOrReplaceTempView("r")
apl_df.createOrReplaceTempView("a")

val res = spark.sql("""SELECT ds.CNT_REG_WEEK_ALOAD, reg_tot.CNT_REG_TOTAL, 
CASE reg_tot.CNT_REG_TOTAL WHEN 0 THEN 100
ELSE round((ds.CNT_REG_WEEK_ALOAD * 100)/reg_tot.CNT_REG_TOTAL,1) END as PRCNT
FROM (
SELECT count(distinct r.email) as CNT_REG_WEEK_ALOAD
FROM r
WHERE EXISTS(
SELECT 1
FROM a
WHERE a.email = r.email
and a.time > r.time and datediff(a.time,r.time) <= 7
)) ds,
(SELECT count(distinct r.email) as CNT_REG_TOTAL from r) reg_tot """)

res.show()

scala> res.show()
+------------------+-------------+-----+
|CNT_REG_WEEK_ALOAD|CNT_REG_TOTAL|PRCNT|
+------------------+-------------+-----+
|                42|           83| 50.6|
+------------------+-------------+-----+
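
The result matches the Hive solution: 42 * 100 / 83 ≈ 50.6. For comparison only (this is a sketch, not part of the original run), roughly the same calculation can be expressed with the DataFrame API instead of SQL, using the reg_df and apl_df DataFrames read above:

import org.apache.spark.sql.functions._

// Registrations that have at least one app_loaded within 7 days;
// left_semi keeps only the matching rows from reg_df.
val cntRegWeekLoad = reg_df.as("r")
  .join(apl_df.as("a"),
        (col("a.email") === col("r.email")) &&
        (col("a.time") > col("r.time")) &&
        (datediff(col("a.time"), col("r.time")) <= 7),
        "left_semi")
  .select("email").distinct.count

val cntRegTotal = reg_df.select("email").distinct.count
val prcnt = if (cntRegTotal == 0) 100.0
            else math.round(cntRegWeekLoad * 1000.0 / cntRegTotal) / 10.0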
