A Big Data calculation example (Hive on HDFS, Spark SQL with Scala on local NTFS)

We have a task with the following description.

There is a JSON file (events.json) with events:

...
...
{"_t":1507087562,"_p":"sburke@test.com","_n":"app_loaded","device_type":"desktop"}
{"_t":1505361824,"_p":"pfitza@test.com","_n":"added_to_team","account":"1234"}
{"_t":1505696264,"_p":"keiji@test.com","_n":"registered","channel":"Google_Apps"}
...
...

There are about 500,000 lines (JSON objects) in the file.

Only these three types of objects (events) are possible, and their fields are:
_t - a Unix timestamp
_p - an email address, which we can use as the unique identifier of a user
_n - the event type (app_loaded - the application was loaded by the user; registered - the user has registered)
The last field is an additional attribute that depends on the event type: device_type, account or channel.

Task:
1) Load the data from the JSON file into two Parquet files, keeping only the app_loaded and registered events, with the following structure:


"registered": {
  "time":    "timestamp"
  "email":   "string",
  "channel": "string"
}

"app_loaded": {
"time":        "timestamp"
"email":       "string",
"device_type": "string"
}


2) Using these two Parquet files, calculate the percentage of users that loaded the application within a week after their registration.
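In other words, the metric computed by both solutions below is:

percent = 100 * (distinct registered users with at least one app_loaded event within 7 days after registration)
              / (all distinct registered users)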

Solution #1 with Hive and HDFS.

I have a Hadoop cluster with Hive (Hive Metastore and HiveServer2) installed on the namenode (see the previous articles).

Using the HDFS Web UI, I uploaded the events.json file to hdfs://10.242.5.88:9000/user/events/events.json.
Now we create a Hive table with a single string column and try to parse the input JSON file:


DROP TABLE default.evt;

CREATE TABLE default.evt( json string );

LOAD DATA INPATH  'hdfs://10.242.5.88:9000/user/events/events.json' INTO TABLE default.evt;

select from_unixtime(cast(get_json_object(q.json,'$._t') as bigint)) as ts,
       get_json_object(q.json,'$._p') as p,
       get_json_object(q.json,'$._n') as n,
       get_json_object(q.json,'$.device_type') as device_type,
       get_json_object(q.json,'$.account') as account,
       get_json_object(q.json,'$.channel') as channel
from default.evt q;

ts                  |p                  |n             |device_type |account |channel |
--------------------|-------------------|--------------|------------|--------|--------|
2016-12-02 06:00:47 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2016-12-02 06:01:16 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2016-12-31 08:25:58 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2016-12-31 08:33:13 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2017-01-04 06:19:07 |rattenbt@test.com  |app_loaded    |desktop     |        |        |
2017-09-14 07:03:44 |pfitza@test.com    |added_to_team |            |1234    |        |
2017-09-14 07:07:27 |konit@test.com     |added_to_team |            |1234    |        |
2017-09-14 07:12:52 |oechslin@test.com  |added_to_team |            |1234    |        |


Now we create two Hive tables with Parquet storage.


--n = "registered"
CREATE TABLE events_reg(
time     timestamp,                   
email    string,        
channel  string
)
STORED AS PARQUET
LOCATION 'hdfs://10.242.5.88:9000/user/events_req/prq';

--n = "app_loaded"
CREATE TABLE events_load(
time         timestamp,                   
email        string,        
device_type  string
)
STORED AS PARQUET
LOCATION 'hdfs://10.242.5.88:9000/user/events_load/prq';

Parse the JSON file with Hive and load the data into the Parquet tables.


insert into events_reg
select from_unixtime(cast(get_json_object(q.json,'$._t') as bigint)) as ts,
       get_json_object(q.json,'$._p')                                as email,
       get_json_object(q.json,'$.channel')                           as channel
from default.evt q
where get_json_object(q.json,'$._n')='registered';

insert into events_load
select from_unixtime(cast(get_json_object(q.json,'$._t') as bigint)) as ts,
       get_json_object(q.json,'$._p')                                as email,
       get_json_object(q.json,'$.device_type')                       as device_type
from default.evt q
where get_json_object(q.json,'$._n')='app_loaded';

Field type validation:

desc events_reg;

col_name |data_type |
---------|----------|
time     |timestamp |
email    |string    |
channel  |string    |

And a couple of final queries.


-- query 1  res: 50.6   26-28 sec.
select round(count(distinct r.email)*100/q.cnt_uniqe_regs,1) as prcnt
  from events_reg  r,
       events_load l,
       (select count(distinct email) as cnt_uniqe_regs from events_reg) q
 where r.email=l.email and 
       r.time < l.time and 
       datediff(l.time,r.time) <= 7
group by q.cnt_uniqe_regs;


-- query 2  res: 50.6  26-28 sec.
select round(appl.cnt_reg_aweek_load*100/regs.cnt_uniqe_regs,1) as prcnt
  from
(
select count(distinct r.email) as cnt_reg_aweek_load
  from events_reg  r,
       events_load l
 where r.email=l.email and 
       r.time < l.time and 
       datediff(l.time,r.time) <= 7
) appl,
(select count(distinct email) as cnt_uniqe_regs from events_reg) regs;

It is also possible to create some indexes, but for such a small data set it is not necessary. (Note that the CREATE INDEX statements below only work on older Hive versions; index support was removed in Hive 3.0.)


DROP INDEX IF EXISTS idx_events_reg ON events_reg; 
DROP INDEX IF EXISTS idx_events_load ON events_load; 

CREATE INDEX idx_events_reg ON TABLE events_reg(email) 
  AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' 
  WITH DEFERRED REBUILD; 
  
ALTER INDEX idx_events_reg ON events_reg REBUILD;

CREATE INDEX idx_events_load ON TABLE events_load(email) 
  AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' 
  WITH DEFERRED REBUILD; 
  
ALTER INDEX idx_events_load ON events_load REBUILD;

Solution #2 with spark-shell (Scala), Spark SQL and local NTFS.

Start spark-shell (Spark is installed on Windows 7):


C:\>C:\spark-2.3.0-bin-hadoop2.7\bin\spark-shell
2018-04-24 13:34:02 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://gdev-pk:4040
Spark context available as 'sc' (master = local[*], app id = local-1524558852873).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.

scala>


Read the JSON file and validate the schema:


scala> val js = spark.read.json("C:\\spark_data\\events.json");
2018-04-24 13:34:58 WARN  ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
2018-04-24 13:34:58 WARN  ObjectStore:568 - Failed to get database default, returning NoSuchObjectException
2018-04-24 13:34:59 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
js: org.apache.spark.sql.DataFrame = [_n: string, _p: string ... 4 more fields]

scala> js.printSchema
root
 |-- _n: string (nullable = true)
 |-- _p: string (nullable = true)
 |-- _t: long (nullable = true)
 |-- account: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- device_type: string (nullable = true)


scala>
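As an optional sanity check (not part of the original session), we could quickly look at the distribution of event types in the raw DataFrame:

// Count events per type; we expect app_loaded, registered and added_to_team
js.groupBy("_n").count().show()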


Create a temp view from the DataFrame, extract the fields while filtering by event type, and convert the Unix timestamp into a timestamp column.


val jsc = js.cache()
jsc.createOrReplaceTempView("jsc")
val reg = spark.sql("SELECT from_utc_timestamp(from_unixtime(cast(_t as bigint)),'GMT+0') as time,_p as email,channel FROM jsc j where j._n like 'reg%' ")
reg.printSchema
scala> reg.printSchema
root
 |-- time: timestamp (nullable = true)
 |-- email: string (nullable = true)
 |-- channel: string (nullable = true)
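For comparison, the same extraction could also be written with the DataFrame API instead of SQL. This is just a sketch, not from the original session, and the plain from_unixtime/cast conversion may handle timezones slightly differently than the from_utc_timestamp variant above:

import org.apache.spark.sql.functions._

// Filter "registered" events and build the same three columns without SQL
val regDf = jsc
  .filter(col("_n") === "registered")
  .select(
    from_unixtime(col("_t")).cast("timestamp").as("time"),  // epoch seconds -> timestamp
    col("_p").as("email"),
    col("channel"))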

Write the results into Parquet files.


reg.write.parquet("C:\\spark_data\\events\\registered")

val apl = spark.sql("SELECT from_utc_timestamp(from_unixtime(cast(_t as bigint)),'GMT+0') as time,_p as email,device_type FROM jsc j where j._n = 'app_loaded' ")

apl.write.parquet("C:\\spark_data\\events\\app_loaded")
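Note that write.parquet fails if the target directory already exists. If these steps need to be re-run, an explicit save mode can be set (standard Spark API, not used in the original session):

// Overwrite the target directories on re-runs instead of failing with "path already exists"
reg.write.mode("overwrite").parquet("C:\\spark_data\\events\\registered")
apl.write.mode("overwrite").parquet("C:\\spark_data\\events\\app_loaded")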


Next we read these Parquet tables back and run the final query. Note that spark.sql.crossJoin.enabled has to be set, because the query below joins the two one-row subqueries without a join condition.

spark.conf.set("spark.sql.crossJoin.enabled", true)

val reg_df = spark.read.parquet("C:\\spark_data\\events\\registered")
val apl_df = spark.read.parquet("C:\\spark_data\\events\\app_loaded")

reg_df.createOrReplaceTempView("r")
apl_df.createOrReplaceTempView("a")

val res = spark.sql("""SELECT ds.CNT_REG_WEEK_ALOAD, reg_tot.CNT_REG_TOTAL, 
CASE reg_tot.CNT_REG_TOTAL WHEN 0 THEN 100
ELSE round((ds.CNT_REG_WEEK_ALOAD * 100)/reg_tot.CNT_REG_TOTAL,1) END as PRCNT
FROM (
SELECT count(distinct r.email) as CNT_REG_WEEK_ALOAD
FROM r
WHERE EXISTS(
SELECT 1
FROM a
WHERE a.email = r.email
and a.time > r.time and datediff(a.time,r.time) <= 7
)) ds,
(SELECT count(distinct r.email) as CNT_REG_TOTAL from r) reg_tot """)

res.show()

scala> res.show()
+------------------+-------------+-----+
|CNT_REG_WEEK_ALOAD|CNT_REG_TOTAL|PRCNT|
+------------------+-------------+-----+
|                42|           83| 50.6|
+------------------+-------------+-----+
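For reference, roughly the same calculation can also be expressed with the DataFrame API instead of SQL. This is a sketch (not from the original session), assuming reg_df and apl_df are the DataFrames read from the Parquet files above:

import org.apache.spark.sql.functions._

// registered users that loaded the app within 7 days after registration
val weekLoads = reg_df.as("r")
  .join(apl_df.as("a"),
        col("a.email") === col("r.email") &&
        col("a.time") > col("r.time") &&
        datediff(col("a.time"), col("r.time")) <= 7)
  .select(col("r.email"))
  .distinct()
  .count()

// all registered users
val totalRegs = reg_df.select("email").distinct().count()

// the same percentage, rounded to one decimal place
val prcnt = if (totalRegs == 0) 100.0
            else math.round(weekLoads * 1000.0 / totalRegs) / 10.0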
