Big Data calculation example (Hive on HDFS, SparkSQL with Scala on local NTFS)
We have a task with the following description.
There is a JSON file (events.json) with events.
There are about 500000 lines (JSON objects) in the file.
Only three types of objects are possible.
_t is a timestamp.
_p is an email; we can use it as a unique identifier of the user.
_n is the event type (app_loaded: the application was loaded by the user; registered: the user has registered).
The last field is an additional attribute that depends on the event type: device_type, account or channel.
1) Load the app_loaded and registered events from the JSON file into 2 parquet files, one per event type, with the following structure:
"registered": {
  "time": "timestamp",
  "email": "string",
  "channel": "string"
}
"app_loaded": {
  "time": "timestamp",
  "email": "string",
  "device_type": "string"
}
2) Using the 2 parquet files, calculate the percentage of users who loaded the application within a week after registration.
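Before turning to the solutions, the calculation in step 2 can be sketched in plain Scala on a few made-up records. This is only an illustration of the rule used by the queries later in this article (a load strictly after registration and at most 7 calendar days later); the case classes, the object name and the sample records are assumptions and are not part of the task. Unlike the Spark query at the end of the article, this sketch returns 0.0 when there are no registrations.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Hypothetical in-memory stand-ins for the rows of the two parquet files.
final case class Registered(time: LocalDateTime, email: String, channel: String)
final case class AppLoaded(time: LocalDateTime, email: String, deviceType: String)

object WeekConversion {
  // Percentage (one decimal) of registered users that have an app_loaded event
  // after their registration and at most 7 calendar days later.
  def percentLoadedWithinWeek(regs: Seq[Registered], loads: Seq[AppLoaded]): Double = {
    val totalUsers = regs.map(_.email).distinct.size
    if (totalUsers == 0) 0.0 // assumption: no registrations means 0 percent
    else {
      val loadsByEmail = loads.groupBy(_.email)
      val converted = regs.filter { r =>
        loadsByEmail.getOrElse(r.email, Seq.empty[AppLoaded]).exists { l =>
          l.time.isAfter(r.time) &&
            ChronoUnit.DAYS.between(r.time.toLocalDate, l.time.toLocalDate) <= 7
        }
      }.map(_.email).distinct.size
      math.round(converted * 1000.0 / totalUsers) / 10.0
    }
  }

  def main(args: Array[String]): Unit = {
    val t0 = LocalDateTime.of(2017, 1, 1, 10, 0)
    val regs = Seq(
      Registered(t0, "a@example.com", "web"),
      Registered(t0, "b@example.com", "web"),
      Registered(t0, "c@example.com", "ads"))
    val loads = Seq(
      AppLoaded(t0.plusDays(2), "a@example.com", "desktop"),  // within a week
      AppLoaded(t0.plusDays(30), "b@example.com", "mobile"),  // too late
      AppLoaded(t0.minusDays(1), "c@example.com", "desktop")) // before registration
    println(percentLoadedWithinWeek(regs, loads)) // prints 33.3
  }
}
```

Only one of the three made-up users qualifies, so the sketch prints 33.3. The Hive and Spark queries below express the same rule in SQL.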
Solution #1 with Hive and HDFS.
I have a Hadoop cluster with Hive (Hive metastore and HiveServer2) installed on the namenode (see the previous articles).
Using the HDFS Web UI, I loaded the events.json file into hdfs://
Create a Hive table with a single string field and try to parse the input JSON file.
drop TABLE default.evt;
CREATE TABLE default.evt( json string );
LOAD DATA INPATH 'hdfs://' INTO TABLE default.evt;
select from_unixtime(cast(get_json_object(q.json,'$._t') as bigint)) as ts,
get_json_object(q.json,'$._p') as p,
get_json_object(q.json,'$._n') as n,
get_json_object(q.json,'$.device_type') as device_type,
get_json_object(q.json,'$.account') as account,
get_json_object(q.json,'$.channel') as channel
from default.evt q;
ts |p |n |device_type |account |channel |
2016-12-02 06:00:47 | |app_loaded |desktop | | |
2016-12-02 06:01:16 | |app_loaded |desktop | | |
2016-12-31 08:25:58 | |app_loaded |desktop | | |
2016-12-31 08:33:13 | |app_loaded |desktop | | |
2017-01-04 06:19:07 | |app_loaded |desktop | | |
2017-09-14 07:03:44 | |added_to_team | |1234 | |
2017-09-14 07:07:27 | |added_to_team | |1234 | |
2017-09-14 07:12:52 | |added_to_team | |1234 | |
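The ts column above comes from from_unixtime, which turns epoch seconds into a formatted date-time string in the time zone of the Hive session, so the same _t value can print differently on servers with different zone settings. A minimal sketch of that conversion with java.time follows; the epoch value and the UTC+2 offset are made-up examples, not values taken from events.json.

```scala
import java.time.{Instant, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

object EpochToTimestamp {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Formats epoch seconds as a date-time string in the given time zone.
  def format(epochSeconds: Long, zone: ZoneId): String =
    fmt.format(Instant.ofEpochSecond(epochSeconds).atZone(zone))

  def main(args: Array[String]): Unit = {
    val t = 1480658447L // hypothetical _t value
    println(format(t, ZoneOffset.UTC))        // prints 2016-12-02 06:00:47
    println(format(t, ZoneOffset.ofHours(2))) // prints 2016-12-02 08:00:47
  }
}
```

If the registration and load timestamps are always converted in the same session, the shift is the same on both sides and the comparison between them is not affected.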
Now we create 2 Hive tables with parquet storage.
--n = "registered"
CREATE TABLE events_reg(
time timestamp,
email string,
channel string
)
STORED AS PARQUET
LOCATION 'hdfs://';
--n = "app_loaded"
CREATE TABLE events_load(
time timestamp,
email string,
device_type string
)
STORED AS PARQUET
LOCATION 'hdfs://';
Parse the JSON file with Hive and load the data into the parquet tables.
insert into events_reg
select from_unixtime(cast(get_json_object(q.json,'$._t') as bigint)) as ts,
get_json_object(q.json,'$._p') as email,
get_json_object(q.json,'$.channel') as channel
from default.evt q
where get_json_object(q.json,'$._n')='registered';
insert into events_load
select from_unixtime(cast(get_json_object(q.json,'$._t') as bigint)) as ts,
get_json_object(q.json,'$._p') as email,
get_json_object(q.json,'$.device_type') as device_type
from default.evt q
where get_json_object(q.json,'$._n')='app_loaded';
Field type validation:
desc events_reg;
col_name |data_type |
time |timestamp |
email |string |
channel |string |
And a couple of final queries.
-- query 1, result: 50.6, run time: 26-28 sec.
select round(count(distinct r.email)*100/q.cnt_uniqe_regs,1) as prcnt
from events_reg r,
events_load l,
(select count(distinct email) as cnt_uniqe_regs from events_reg) q
where r.email = l.email and
r.time < l.time and
datediff(l.time,r.time) <= 7
group by q.cnt_uniqe_regs;
-- query 2, result: 50.6, run time: 26-28 sec.
select round(appl.cnt_reg_aweek_load*100/regs.cnt_uniqe_regs,1) as prcnt
from (
select count(distinct r.email) as cnt_reg_aweek_load
from events_reg r,
events_load l
where r.email = l.email and
r.time < l.time and
datediff(l.time,r.time) <= 7
) appl,
(select count(distinct email) as cnt_uniqe_regs from events_reg) regs;
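One detail of these queries is worth spelling out: datediff compares calendar dates and ignores the time of day, so "datediff <= 7" is slightly wider than "at most 7 x 24 hours after registration". The sketch below shows the difference with java.time on a made-up pair of timestamps; the function names are illustrative, and the stricter check is only a possible alternative, not what the queries in this article do.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.temporal.ChronoUnit

object DateDiffVsElapsed {
  // Calendar-day difference that ignores the time of day, like datediff.
  def calendarDays(from: LocalDateTime, to: LocalDateTime): Long =
    ChronoUnit.DAYS.between(from.toLocalDate, to.toLocalDate)

  // Stricter alternative: the elapsed time is at most 7 * 24 hours.
  def withinSevenFullDays(from: LocalDateTime, to: LocalDateTime): Boolean =
    !to.isBefore(from) && Duration.between(from, to).compareTo(Duration.ofDays(7)) <= 0

  def main(args: Array[String]): Unit = {
    val registered = LocalDateTime.of(2017, 1, 1, 0, 10) // hypothetical registration
    val loaded = LocalDateTime.of(2017, 1, 8, 23, 50)    // hypothetical app load
    println(calendarDays(registered, loaded))        // prints 7
    println(withinSevenFullDays(registered, loaded)) // prints false
  }
}
```

In this example almost eight full days have elapsed, yet the calendar-day difference is 7, so the pair still passes the datediff filter. Whether that is acceptable depends on how "within a week" is defined for the task.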
It is also possible to create indexes, but on such a small data set they are not necessary (and index support was removed in Hive 3.0).
DROP INDEX IF EXISTS idx_events_reg ON events_reg;
DROP INDEX IF EXISTS idx_events_load ON events_load;
CREATE INDEX idx_events_reg ON TABLE events_reg(email)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD;
ALTER INDEX idx_events_reg ON events_reg REBUILD;
CREATE INDEX idx_events_load ON TABLE events_load(email)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD;
ALTER INDEX idx_events_load ON events_load REBUILD;
Solution #2 with Spark shell (Scala), SparkSQL and local NTFS.
2018-04-24 13:34:02 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://gdev-pk:4040
Spark context available as 'sc' (master = local[*], app id = local-1524558852873).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.
Read the JSON file and validate the schema.
scala> val js = spark.read.json("C:\\spark_data\\events.json");
2018-04-24 13:34:58 WARN ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
2018-04-24 13:34:58 WARN ObjectStore:568 - Failed to get database default, returning NoSuchObjectException
2018-04-24 13:34:59 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
js: org.apache.spark.sql.DataFrame = [_n: string, _p: string ... 4 more fields]
scala> js.printSchema
|-- _n: string (nullable = true)
|-- _p: string (nullable = true)
|-- _t: long (nullable = true)
|-- account: string (nullable = true)
|-- channel: string (nullable = true)
|-- device_type: string (nullable = true)
Create a TempView from the DataFrame, extract the fields of the needed event type and convert the _t value to a timestamp.
val jsc = js.cache()
jsc.createOrReplaceTempView("jsc")
val reg = spark.sql("SELECT from_utc_timestamp(from_unixtime(cast(_t as bigint)),'GMT+0') as time,_p as email,channel FROM jsc j where j._n like 'reg%' ")
scala> reg.printSchema
|-- time: timestamp (nullable = true)
|-- email: string (nullable = true)
|-- channel: string (nullable = true)
Write into parquet file
val apl = spark.sql("SELECT from_utc_timestamp(from_unixtime(cast(_t as bigint)),'GMT+0') as time,_p as email,device_type FROM jsc j where j._n = 'app_loaded' ")
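The write and read-back calls themselves are not shown in this walkthrough. A minimal sketch of how they might look in the same spark-shell session is given below. It assumes the reg and apl DataFrames defined above and the predefined spark session; both output paths are hypothetical placeholders, and reg_df and apl_df are simply the names that the final query expects.

```scala
// Continues the spark-shell session: `spark`, `reg` and `apl` already exist.
// Both paths are hypothetical placeholders; adjust them to your environment.
val regPath = "C:\\spark_data\\events_reg.parquet"
val aplPath = "C:\\spark_data\\events_load.parquet"

// Write each event type to its own parquet directory, replacing earlier output.
reg.write.mode("overwrite").parquet(regPath)
apl.write.mode("overwrite").parquet(aplPath)

// Read the parquet data back as the DataFrames used by the final query.
val reg_df = spark.read.parquet(regPath)
val apl_df = spark.read.parquet(aplPath)
```

Calling reg_df.printSchema after the read is a quick way to check that the time column came back as a timestamp.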
Next we read these parquet tables and run the final query.
spark.conf.set("spark.sql.crossJoin.enabled", true)
reg_df.createOrReplaceTempView("r")
apl_df.createOrReplaceTempView("a")
val res = spark.sql("""SELECT ds.CNT_REG_WEEK_ALOAD,
  reg_tot.CNT_REG_TOTAL,
  CASE reg_tot.CNT_REG_TOTAL WHEN 0 THEN 100
  ELSE round((ds.CNT_REG_WEEK_ALOAD * 100)/reg_tot.CNT_REG_TOTAL,1) END as PRCNT
FROM (
  SELECT count(distinct r.email) as CNT_REG_WEEK_ALOAD
  FROM r
  WHERE EXISTS(
    SELECT 1 FROM a
    WHERE a.email = r.email
      and a.time > r.time
      and datediff(a.time,r.time) <= 7
  )) ds,
  (SELECT count(distinct r.email) as CNT_REG_TOTAL from r) reg_tot
""")
scala> res.show
+------------------+-------------+-----+
|CNT_REG_WEEK_ALOAD|CNT_REG_TOTAL|PRCNT|
+------------------+-------------+-----+
|                42|           83| 50.6|
+------------------+-------------+-----+
