Load data from Cassandra to HDFS parquet files and select with Hive
Installation of the Hadoop and Cassandra clusters is described in this article.
My idea is to write a Scala application that runs on a Spark cluster and loads data from Cassandra into Parquet files on HDFS, for later analysis with Hive.
First I do it manually, step by step.
1) HDFS directory preparation on the Hadoop cluster. From the LXC container named hdpnn, run the following:
[hadoop@hdpnn ~]$ hadoop fs -mkdir /user/tickers
[hadoop@hdpnn ~]$ hadoop fs -rm -r /user/tickers/ticker_23
[hadoop@hdpnn ~]$ hdfs dfs -chown root:root /user/tickers
[hadoop@hdpnn ~]$ hadoop fs -ls /user
drwxr-xr-x - root root 0 2019-01-10 06:27 /user/tickers
2) Load data with Spark using spark-shell (on smn)
spark-shell \
  --driver-memory 1G \
  --executor-memory 1G \
  --driver-cores 1 \
  --executor-cores 1 \
  --jars "/opt/spark-2.3.2/jars/spark-cassandra-connector-assembly-2.3.2.jar" \
  --conf spark.cassandra.connection.host=192.168.122.192 \
  --conf "spark.sql.parquet.writeLegacyFormat=true"
>>>
2019-01-10 05:23:58 WARN Utils:66 - Your hostname, smn resolves to a loopback
address: 127.0.0.1; using 192.168.122.219 instead (on interface eth0)
2019-01-10 05:23:58 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to
bind to another address
2019-01-10 05:23:59 WARN NativeCodeLoader:62 - Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://smn:4040
Spark context available as 'sc'
(master = spark://192.168.122.219:7077, app id = app-20190110052407-0005).
Spark context available as 'sc'
(master = local[*], app id = local-1547098864785).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.2
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
def getTicksFromCass(TickerID :Int) = {
  import org.apache.spark.sql.functions._
  spark.read.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "ticks", "keyspace" -> "mts_src"))
    .load()
    .where(col("ticker_id") === TickerID)
    .select(col("ticker_id"), col("ddate"), col("db_tsunx"), col("ask"), col("bid"))
    .sort(asc("db_tsunx"))
}
getTicksFromCass: (TickerID: Int)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
val dsTicks = getTicksFromCass(23)
dsTicks: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] =
[ticker_id: int, ddate: date ... 3 more fields]
scala> dsTicks.printSchema()
root
|-- ticker_id: integer (nullable = true)
|-- ddate: date (nullable = true)
|-- db_tsunx: long (nullable = true)
|-- ask: double (nullable = true)
|-- bid: double (nullable = true)
scala> dsTicks.count()
res2: Long = 529357
dsTicks.write.mode(SaveMode.Overwrite)
.parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")
// read back
val df = spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")
df.show(10)
+---------+----------+-------------+------+------+
|ticker_id| ddate| db_tsunx| ask| bid|
+---------+----------+-------------+------+------+
| 23|2018-08-30|1535629422415|68.224| 68.05|
| 23|2018-08-30|1535629422491|68.223| 68.05|
| 23|2018-08-30|1535629422503|68.219| 68.05|
| 23|2018-08-30|1535629422756| 68.22|68.047|
| 23|2018-08-30|1535629423007|68.215|68.039|
| 23|2018-08-30|1535629423189|68.213|68.038|
| 23|2018-08-30|1535629423268|68.213| 68.04|
| 23|2018-08-30|1535629424017|68.215|68.039|
| 23|2018-08-30|1535629424082|68.215| 68.04|
| 23|2018-08-30|1535629427767|68.208|68.033|
+---------+----------+-------------+------+------+
3) An external table in Hive with query example.
Driver information, screen from DBeaver.
Connection in DBeaver.
Parquet structure in HDFS (browser)
Everything looks fine so far, but there is a problem. When I write Parquet with custom partitioning like this:
dsTicks.write.mode(SaveMode.Overwrite)
.partitionBy("ticker_id","ddate")
.parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")
I can read it back with Spark and I can see the expected partition structure in HDFS, but Hive can't read it. There is no error, but a HiveQL select returns no rows.
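For reference, partitionBy("ticker_id","ddate") writes one directory per combination of partition values, named column=value, and the partition columns live in those directory names rather than in the data files. A minimal sketch of that layout in plain Scala (the object PartitionLayout and its methods are hypothetical helpers for illustration, not part of Spark or Hive):

```scala
// Hypothetical helper, not a Spark API: models the Hive-style directory
// layout produced by partitionBy("ticker_id", "ddate") under a base path.
object PartitionLayout {
  // Build "<base>/ticker_id=<id>/ddate=<date>" from the partition values.
  def partitionDir(base: String, tickerId: Int, ddate: String): String =
    s"${base.stripSuffix("/")}/ticker_id=$tickerId/ddate=$ddate"

  // Matches a full path that ends in the two partition directories.
  private val Pattern = """.*/ticker_id=(\d+)/ddate=(\d{4}-\d{2}-\d{2})/?""".r

  // Recover the partition values from a path; None if it is not a partition dir.
  def parse(path: String): Option[(Int, String)] = path match {
    case Pattern(id, d) => Some((id.toInt, d))
    case _              => None
  }
}
```

For example, PartitionLayout.parse("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-08-29") gives back the pair (23, "2018-08-29"), while a path such as ".../_SUCCESS" gives None.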
Next I want to try creating the Parquet table with Hive first and then loading data into it with Spark.
Clean up HDFS:
[hadoop@hdpnn ~]$ hadoop fs -rm -r /user/tickers/ticker_23
2019-01-11 05:44:28,187 WARN util.NativeCodeLoader:
Unable to load native-hadoop library for your platform... using builtin-java classes
where applicable
Deleted /user/tickers/ticker_23
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers
2019-01-11 05:44:41,683 WARN util.NativeCodeLoader:
Unable to load native-hadoop library for your platform... using builtin-java classes
where applicable
drop table ticker_23;
create external table ticker_23(
db_tsunx BIGINT,
ask double,
bid double
)
PARTITIONED BY (ticker_id INT,ddate DATE)
STORED AS parquetfile
LOCATION 'hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet';
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticker_23/ticks.parquet
2019-01-11 05:46:34,066 WARN util.NativeCodeLoader:
Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable
hdfs dfs -chown root:root /user/tickers/ticker_23/ticks.parquet
dsTicks.write.mode("append").partitionBy("ticker_id","ddate").parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticker_23/ticks.parquet
2019-01-11 05:53:57,496 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r-- 3 root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/_SUCCESS
drwxr-xr-x - root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticker_23/ticks.parquet/ticker_id=23
2019-01-11 05:54:15,738 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 13 items
drwxr-xr-x - root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-08-29
drwxr-xr-x - root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-08-30
drwxr-xr-x - root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-08-31
drwxr-xr-x - root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-03
drwxr-xr-x - root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-06
drwxr-xr-x - root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-07
drwxr-xr-x - root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-10
drwxr-xr-x - root root 0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-11
Uhh, the query "select * from ticker_23;" returns no rows and no errors. But in the Spark shell I can read the data:
scala> val df = spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")
df: org.apache.spark.sql.DataFrame = [db_tsunx: bigint, ask: double ... 3 more fields]
scala> df.show(5)
+-------------+------+------+---------+----------+
| db_tsunx| ask| bid|ticker_id| ddate|
+-------------+------+------+---------+----------+
|1535629422415|68.224| 68.05| 23|2018-08-30|
|1535629422491|68.223| 68.05| 23|2018-08-30|
|1535629422503|68.219| 68.05| 23|2018-08-30|
|1535629422756| 68.22|68.047| 23|2018-08-30|
|1535629423007|68.215|68.039| 23|2018-08-30|
+-------------+------+------+---------+----------+
only showing top 5 rows
I found that I can read a single partition this way:
drop table ticker_23;
create external table ticker_23(
db_tsunx BIGINT,
ask double,
bid double
)
STORED AS PARQUET
LOCATION 'hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-08-29';
select * from ticker_23;
Wow, thanks to this question on Stack Overflow, here is the solution:
drop table ticker_23;
create external table ticker_23(
db_tsunx BIGINT,
ask double,
bid double
)
PARTITIONED BY (ticker_id INT,ddate DATE)
STORED AS PARQUET
LOCATION 'hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet';
select * from ticker_23;
>> no rows.
MSCK REPAIR TABLE ticker_23;
select * from ticker_23;
>> returns rows.
select ddate,
sum(1) as cnt
from ticker_23
group by ddate
order by 1;
Finally, I change the directory structure and table name, and load more data.
create external table ticker(
db_tsunx BIGINT,
ask double,
bid double
)
PARTITIONED BY (ticker_id INT,ddate DATE)
STORED AS PARQUET
LOCATION 'hdfs://hdpnn:9000/user/tickers/ticks.parquet/';
select * from ticker;
---------------------------
[hadoop@hdpnn ~]$ hadoop fs -rm -r /user/tickers/ticker_23
Deleted /user/tickers/ticker_23
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers
dsTicks.write.mode(SaveMode.Append).partitionBy("ticker_id","ddate")
.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 2 items
-rw-r--r-- 3 root root 0 2019-01-11 06:33 /user/tickers/ticks.parquet/_SUCCESS
drwxr-xr-x - root root 0 2019-01-11 06:33 /user/tickers/ticks.parquet/ticker_id=23
val dsTicks = getTicksFromCass(1)
dsTicks.write.mode(SaveMode.Append).partitionBy("ticker_id","ddate")
.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 3 items
-rw-r--r-- 3 root root 0 2019-01-11 06:42 /user/tickers/ticks.parquet/_SUCCESS
drwxr-xr-x - root root 0 2019-01-11 06:42 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x - root root 0 2019-01-11 06:33 /user/tickers/ticks.parquet/ticker_id=23
val dsTicks = getTicksFromCass(10)
dsTicks.write.mode(SaveMode.Append).partitionBy("ticker_id","ddate")
.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 4 items
-rw-r--r-- 3 root root 0 2019-01-11 06:47 /user/tickers/ticks.parquet/_SUCCESS
drwxr-xr-x - root root 0 2019-01-11 06:42 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x - root root 0 2019-01-11 06:47 /user/tickers/ticks.parquet/ticker_id=10
drwxr-xr-x - root root 0 2019-01-11 06:33 /user/tickers/ticks.parquet/ticker_id=23
Query data with Hive from the partitioned table:
MSCK REPAIR TABLE ticker;
select * from ticker;
select ticker_id,
ddate,
sum(1) as cnt
from ticker
group by ticker_id,ddate
order by 2,1;
What is MSCK? (metastore check)
It is a metastore check. With the repair table option it adds metadata about partitions to the Hive metastore for partitions that have no such metadata yet. In other words, it adds any partitions that exist on HDFS but not in the metastore.
Result of the group-by query (first rows):
#    ticker_id  ddate       cnt
1    1          2018-08-29  80619
2    10         2018-08-29  94462
3    23         2018-08-29  40778
4    1          2018-08-30  110859
5    10         2018-08-30  130757
6    23         2018-08-30  48072
7    1          2018-08-31  131783
8    10         2018-08-31  161140
9    23         2018-08-31  49165
10   1          2018-09-03  31861
Recover Partitions (MSCK REPAIR TABLE)
Hive stores a list of partitions for each table in its metastore. If, however, new partitions are directly added to HDFS (say by using hadoop fs -put command) or removed from HDFS, the metastore (and hence Hive) will not be aware of these changes to partition information unless the user runs ALTER TABLE table_name ADD/DROP PARTITION commands on each of the newly added or removed partitions, respectively.
However, users can run a metastore check command with the repair table option.
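To make the idea concrete, here is a small sketch in plain Scala of what the repair step works out: which partitions exist on disk but are unknown to the metastore, and the ALTER TABLE ... ADD PARTITION statement one would otherwise run by hand for each of them. It is an illustration under assumptions, not Hive's implementation: the object MsckSketch, its case class and both methods are hypothetical, and the two sets stand in for a real HDFS listing and a real metastore.

```scala
// Illustration only: plain collections replace the HDFS listing and the metastore.
object MsckSketch {
  final case class Partition(tickerId: Int, ddate: String)

  // Partitions present on HDFS but not yet registered in the metastore,
  // ordered by date and then ticker for stable output.
  def missing(onHdfs: Set[Partition], inMetastore: Set[Partition]): Seq[Partition] =
    (onHdfs -- inMetastore).toSeq.sortBy(p => (p.ddate, p.tickerId))

  // The per-partition statement a user would otherwise have to issue manually.
  def addPartitionSql(table: String, p: Partition): String =
    s"ALTER TABLE $table ADD PARTITION (ticker_id=${p.tickerId}, ddate='${p.ddate}');"
}
```

Usage: with Partition(23, "2018-08-29") and Partition(23, "2018-08-30") on HDFS and only the first one in the metastore, MsckSketch.missing returns the second, and addPartitionSql("ticker_23", ...) renders the statement for it. MSCK REPAIR TABLE saves you from generating and running such statements for every new directory.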
Continue:
Task: write Scala Spark code (to run in spark-shell) that loads all data from Cassandra into Parquet on HDFS (ticker by ticker, date by date)
and syncs the tickers dictionary.
HDFS preparation:
[hadoop@hdpnn ~]$ hadoop fs -mkdir /user/tickers
[hadoop@hdpnn ~]$ hadoop fs -rm -r /user/tickers/ticks.parquet
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Scala code:
spark-shell --driver-memory 1G --executor-memory 1G --driver-cores 1 --executor-cores 1 \
  --jars "/opt/spark-2.3.2/jars/spark-cassandra-connector-assembly-2.3.2.jar" \
  --conf spark.cassandra.connection.host=193.124.112.90 \
  --conf "spark.sql.parquet.writeLegacyFormat=true"
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
case class T_DAT_META(ticker_id: Int, ddate :java.sql.Date)
// Tickers dictionary from mts_meta.tickers, ordered by ticker_id.
def getTickers = {
  import org.apache.spark.sql.functions._
  spark.read.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "tickers", "keyspace" -> "mts_meta"))
    .load()
    .sort(asc("ticker_id"))
}
// Ticks for one ticker and one date, ordered by db_tsunx.
def getTicksFromCassByTickerDate(TickerID :Int,Ddate :java.sql.Date) = {
  import org.apache.spark.sql.functions._
  spark.read.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "ticks", "keyspace" -> "mts_src"))
    .option("fetchSize", "10000")
    .load()
    .where(col("ticker_id") === TickerID)
    .where(col("ddate") === Ddate)
    .select(col("ticker_id"), col("ddate"), col("db_tsunx"), col("ask"), col("bid"))
    .sort(asc("db_tsunx"))
}
// Same as above, but only ticks with db_tsunx greater than pdb_tsunx.
def getTicksFromCassByTickerDateUnx(TickerID :Int,Ddate :java.sql.Date, pdb_tsunx :Long) = {
  import org.apache.spark.sql.functions._
  spark.read.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "ticks", "keyspace" -> "mts_src"))
    .option("fetchSize", "10000")
    .load()
    .where(col("ticker_id") === TickerID)
    .where(col("ddate") === Ddate)
    .where(col("db_tsunx") > pdb_tsunx)
    .select(col("ticker_id"), col("ddate"), col("db_tsunx"), col("ask"), col("bid"))
    .sort(asc("db_tsunx"))
}
// Distinct (ticker_id, ddate) pairs, collected to the driver.
def getMetadata = {
  import org.apache.spark.sql.functions._
  spark.read.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "ticks", "keyspace" -> "mts_src"))
    .option("fetchSize", "100")
    .load()
    .select(col("ticker_id"),col("ddate"))
    .distinct
    .sort(asc("ticker_id"))
    .collect
    .toSeq
}
val dsMetadata = getMetadata
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import scala.util.{Try, Success, Failure}
var fdate: Option[java.sql.Date] = Try(new SimpleDateFormat("yyyy-MM-dd").parse("2019-01-15"))
.map(d => new java.sql.Date(d.getTime())).toOption
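A shorter way to build the same filter date, as a sketch: java.sql.Date.valueOf parses the yyyy-MM-dd form directly and throws IllegalArgumentException on malformed input, which Try turns into None. Note that SimpleDateFormat is lenient by default, so the two approaches can differ on out-of-range input. The object name DateFilter is a hypothetical choice for this example.

```scala
import scala.util.Try

// Hypothetical helper: parse a "yyyy-MM-dd" string into an optional java.sql.Date.
object DateFilter {
  // valueOf throws IllegalArgumentException for malformed input;
  // Try(...).toOption converts that failure into None.
  def parse(s: String): Option[java.sql.Date] =
    Try(java.sql.Date.valueOf(s)).toOption
}
```

DateFilter.parse("2019-01-15") yields Some(2019-01-15) and can replace the fdate expression above; DateFilter.parse("not-a-date") yields None.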
Now we can load the data for all tickers and dates, looping over the (ticker, date) pairs.
for (thisTickerDate <- dsMetadata.map(row => T_DAT_META(row.getInt(0),row.getDate(1)))) {
  println("--------------------------------------------------------------------------------------")
  println("BEGIN TICKER_ID="+thisTickerDate.ticker_id + " DDATE=" + thisTickerDate.ddate)
  val dsTicks = getTicksFromCassByTickerDate(thisTickerDate.ticker_id,thisTickerDate.ddate)
  dsTicks.cache()
  val cntRows = dsTicks.count()
  // Write straight into the partition directory ticker_id=<id>/ddate=<date>.
  dsTicks.write.mode(SaveMode.Overwrite).parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet/ticker_id=" +
    thisTickerDate.ticker_id + "/ddate=" + thisTickerDate.ddate)
  println("INSERTED = "+ cntRows +" ROWS.")
  println("--------------------------------------------------------------------------------------")
}
Or with custom filters:
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import scala.util.{Try, Success, Failure}
var fdate: Option[java.sql.Date] = Try(new SimpleDateFormat("yyyy-MM-dd").parse("2019-01-15")).map(d => new java.sql.Date(d.getTime())).toOption
for (thisTickerDate <- dsMetadata.map(row => T_DAT_META(row.getInt(0),row.getDate(1)))
     if (thisTickerDate.ticker_id==1 && thisTickerDate.ddate==fdate.get)) {
  println("--------------------------------------------------------------------------------------")
  println("BEGIN TICKER_ID="+thisTickerDate.ticker_id + " DDATE=" + thisTickerDate.ddate)
  val dsTicks = getTicksFromCassByTickerDate(thisTickerDate.ticker_id,thisTickerDate.ddate)
  dsTicks.cache()
  val cntRows = dsTicks.count()
  dsTicks.write.mode(SaveMode.Overwrite).parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet/ticker_id=" +
    thisTickerDate.ticker_id + "/ddate=" + thisTickerDate.ddate)
  println("INSERTED = "+ cntRows +" ROWS.")
  println("--------------------------------------------------------------------------------------")
}
Or you can take max(db_tsunx) from the Parquet data (via Hive) and load only the rows that are newer than it:
select max(db_tsunx)
from ticker
where ticker_id=1;
for (thisTickerDate <- dsMetadata.map(row => T_DAT_META(row.getInt(0),row.getDate(1)))
     if (thisTickerDate.ticker_id==1 && thisTickerDate.ddate==fdate.get)) {
  println("--------------------------------------------------------------------------------------")
  println("BEGIN TICKER_ID="+thisTickerDate.ticker_id + " DDATE=" + thisTickerDate.ddate)
  // 1547556210822L is the max(db_tsunx) value returned by the Hive query.
  val dsTicks = getTicksFromCassByTickerDateUnx(thisTickerDate.ticker_id,thisTickerDate.ddate,1547556210822L)
  dsTicks.cache()
  val cntRows = dsTicks.count()
  // Note: SaveMode.Overwrite replaces whatever is already in this partition directory;
  // use SaveMode.Append if the rows loaded earlier must be kept.
  dsTicks.write.mode(SaveMode.Overwrite).parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet/ticker_id=" +
    thisTickerDate.ticker_id + "/ddate=" + thisTickerDate.ddate)
  println("INSERTED = "+ cntRows +" ROWS.")
  println("--------------------------------------------------------------------------------------")
}
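The incremental idea (find the highest db_tsunx already stored, then fetch only newer ticks) can be sketched without Spark. This is a simplified model under assumptions: the object IncrementalLoad, the case class Tick and both methods are hypothetical names, and in-memory sequences stand in for the Parquet data and the Cassandra table.

```scala
// Simplified model of an incremental load; Seq[Tick] replaces Parquet and Cassandra.
object IncrementalLoad {
  final case class Tick(tickerId: Int, ddate: String, dbTsunx: Long, ask: Double, bid: Double)

  // Highest db_tsunx already loaded for one ticker, or None if nothing is loaded yet.
  def highWaterMark(loaded: Seq[Tick], tickerId: Int): Option[Long] = {
    val ts = loaded.collect { case t if t.tickerId == tickerId => t.dbTsunx }
    if (ts.isEmpty) None else Some(ts.max)
  }

  // Source rows for the ticker that are strictly newer than the mark;
  // with no mark, every row of that ticker qualifies.
  def newRows(source: Seq[Tick], tickerId: Int, mark: Option[Long]): Seq[Tick] =
    source.filter(t => t.tickerId == tickerId && mark.forall(t.dbTsunx > _))
}
```

In this model highWaterMark plays the role of the select max(db_tsunx) query and newRows the role of the db_tsunx > pdb_tsunx filter. Rows selected this way are a delta, so they would normally be appended to the already loaded data rather than written over it.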
Additional information about the data.
From Cassandra:
[root@sc1 ~]# nodetool tablestats --human-readable mts_src.ticks
Total number of tables: 55
----------------
Keyspace : mts_src
Read Count: 38872
Read Latency: 6.0569655021609385 ms
Write Count: 0
Write Latency: NaN ms
Pending Flushes: 0
Table: ticks
SSTable count: 3
Space used (live): 390.09 MiB
Space used (total): 390.09 MiB
Space used by snapshots (total): 0 bytes
Off heap memory used (total): 142.56 KiB
SSTable Compression Ratio: 0.3427504897663621
Number of partitions (estimate): 386
Memtable cell count: 0
Memtable data size: 0 bytes
Memtable off heap memory used: 0 bytes
Memtable switch count: 0
Local read count: 38872
Local read latency: NaN ms
Local write count: 0
Local write latency: NaN ms
Pending flushes: 0
Percent repaired: 0.0
Bloom filter false positives: 403
Bloom filter false ratio: 0.01061
Bloom filter space used: 536 bytes
Bloom filter off heap memory used: 512 bytes
Index summary off heap memory used: 130 bytes
Compression metadata off heap memory used: 141.93 KiB
Compacted partition minimum bytes: 29522
Compacted partition maximum bytes: 12108970
Compacted partition mean bytes: 3275377
Average live cells per slice (last five minutes): NaN
Maximum live cells per slice (last five minutes): 0
Average tombstones per slice (last five minutes): NaN
Maximum tombstones per slice (last five minutes): 0
Dropped Mutations: 0 bytes
From HDFS:
[hadoop@hdpnn ~]$ hadoop fs -du -s -h /user/tickers/ticks.parquet
82.2 M 246.5 M /user/tickers/ticks.parquet
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 27 items
drwxr-xr-x - root root 0 2019-01-16 04:51 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x - root root 0 2019-01-15 07:38 /user/tickers/ticks.parquet/ticker_id=10
drwxr-xr-x - root root 0 2019-01-15 07:40 /user/tickers/ticks.parquet/ticker_id=11
drwxr-xr-x - root root 0 2019-01-15 07:43 /user/tickers/ticks.parquet/ticker_id=12
drwxr-xr-x - root root 0 2019-01-15 07:45 /user/tickers/ticks.parquet/ticker_id=13
drwxr-xr-x - root root 0 2019-01-15 07:46 /user/tickers/ticks.parquet/ticker_id=14
drwxr-xr-x - root root 0 2019-01-15 07:48 /user/tickers/ticks.parquet/ticker_id=15
drwxr-xr-x - root root 0 2019-01-15 07:50 /user/tickers/ticks.parquet/ticker_id=16
...
...
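A note on the two figures printed by hadoop fs -du above: as far as I understand the output, the first (82.2 M) is the logical size of the data and the second (246.5 M) is the disk space consumed including all replicas. The file listings earlier show a replication factor of 3, and the arithmetic agrees with that reading. A tiny check in Scala (DuCheck is a hypothetical name used only for this example):

```scala
// Hypothetical helper: ratio of consumed disk space to logical size.
object DuCheck {
  // For fully replicated data this ratio approximates the replication factor.
  def impliedReplication(logicalMb: Double, consumedMb: Double): Double =
    consumedMb / logicalMb
}
```

DuCheck.impliedReplication(82.2, 246.5) is very close to 3, the small difference coming from rounding in the human-readable output.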
Now you can query this Parquet data with Hive from DBeaver. Example queries:
MSCK REPAIR TABLE ticker;
select ddate,sum(1),min(db_tsunx),max(db_tsunx)
from ticker
where ticker_id=1
group by ddate;
select max(db_tsunx)
from ticker
where ticker_id=1;
select sum(1) as cnt
from ticker
where ticker_id=1;
select ddate,sum(1) as cnt
from ticker
where ticker_id=11
group by ddate;
select sum(1),min(db_tsunx),max(db_tsunx)
from ticker
where ddate='2019-01-09' and ticker_id=1;
Application for this article (Scala) (Download from github cass_to_hdfs_parquet) Commit #1
My next idea is to configure Hive to use Spark as its execution engine; currently it uses MapReduce.