Load data from Cassandra into HDFS Parquet files and query it with Hive


HDFS directory preparation is done on the Hadoop cluster, executed from the LXC container named hdpnn. The Hadoop and Cassandra cluster installation is described in this article.

My idea is to write a Scala application that runs on a Spark cluster and loads data from Cassandra into HDFS Parquet files for later analysis with Hive.

First I do it manually, step by step.

1) HDFS directory preparation


[hadoop@hdpnn ~]$ hadoop fs -mkdir /user/tickers

[hadoop@hdpnn ~]$ hadoop fs -rm -r /user/tickers/ticker_23

[hadoop@hdpnn ~]$ hdfs dfs -chown root:root /user/tickers

[hadoop@hdpnn ~]$ hadoop fs -ls /user
drwxr-xr-x   - root root  0 2019-01-10 06:27 /user/tickers


2) Load data with Spark using spark-shell (on smn)


spark-shell \
--driver-memory 1G \
--executor-memory 1G \
--driver-cores 1 \
--executor-cores 1 \
--jars "/opt/spark-2.3.2/jars/spark-cassandra-connector-assembly-2.3.2.jar" \
--conf spark.cassandra.connection.host=192.168.122.192 \
--conf "spark.sql.parquet.writeLegacyFormat=true"

>>>
2019-01-10 05:23:58 WARN  Utils:66 - Your hostname, smn resolves to a loopback 
address: 127.0.0.1; using 192.168.122.219 instead (on interface eth0)
2019-01-10 05:23:58 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to 
bind to another address
2019-01-10 05:23:59 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). 
For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://smn:4040

Spark context available as 'sc' 
(master = spark://192.168.122.219:7077, app id = app-20190110052407-0005).

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._

def getTicksFromCass(TickerID: Int) = {
  import org.apache.spark.sql.functions._
  spark.read.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "ticks", "keyspace" -> "mts_src"))
    .load()
    .where(col("ticker_id") === TickerID)
    .select(col("ticker_id"), col("ddate"), col("db_tsunx"), col("ask"), col("bid"))
    .sort(asc("db_tsunx"))
}

getTicksFromCass: (TickerID: Int)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val dsTicks = getTicksFromCass(23)

dsTicks: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = 
[ticker_id: int, ddate: date ... 3 more fields]

scala> dsTicks.printSchema()
root
 |-- ticker_id: integer (nullable = true)
 |-- ddate: date (nullable = true)
 |-- db_tsunx: long (nullable = true)
 |-- ask: double (nullable = true)
 |-- bid: double (nullable = true)

scala> dsTicks.count()
res2: Long = 529357

dsTicks.write.mode(SaveMode.Overwrite)
 .parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")

// read back
val df = spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")
df.show(10)

+---------+----------+-------------+------+------+
|ticker_id|     ddate|     db_tsunx|   ask|   bid|
+---------+----------+-------------+------+------+
|       23|2018-08-30|1535629422415|68.224| 68.05|
|       23|2018-08-30|1535629422491|68.223| 68.05|
|       23|2018-08-30|1535629422503|68.219| 68.05|
|       23|2018-08-30|1535629422756| 68.22|68.047|
|       23|2018-08-30|1535629423007|68.215|68.039|
|       23|2018-08-30|1535629423189|68.213|68.038|
|       23|2018-08-30|1535629423268|68.213| 68.04|
|       23|2018-08-30|1535629424017|68.215|68.039|
|       23|2018-08-30|1535629424082|68.215| 68.04|
|       23|2018-08-30|1535629427767|68.208|68.033|
+---------+----------+-------------+------+------+


3) An external table in Hive with query example.

Driver information, screen from DBeaver.



Connection in DBeaver.


Select query example


Parquet structure in HDFS (browser)


It looks like everything is working fine, but there is a problem. When I write Parquet with custom partitioning like this:


dsTicks.write.mode(SaveMode.Overwrite)
.partitionBy("ticker_id","ddate")
.parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")


I can read it back with Spark and I can see the expected partition structure in HDFS, but Hive can't read it: there is no error, but a HiveQL SELECT returns no rows.
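As it turns out below, the cause is that the Hive metastore has no partition metadata for these directories. A quick Spark-side check (a minimal sketch, nothing Hive-specific) confirms the partitions are definitely present in the files:

// List the (ticker_id, ddate) combinations that exist in the partitioned
// Parquet directory written above.
spark.read
  .parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")
  .select("ticker_id", "ddate")
  .distinct()
  .orderBy("ddate")
  .show(50, false)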

I want to try to first create the Parquet table with Hive and then load the data into it with Spark.
Clear HDFS:


[hadoop@hdpnn ~]$ hadoop fs -rm -r /user/tickers/ticker_23
2019-01-11 05:44:28,187 WARN util.NativeCodeLoader: 
Unable to load native-hadoop library for your platform... using builtin-java classes 
where applicable
Deleted /user/tickers/ticker_23

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers
2019-01-11 05:44:41,683 WARN util.NativeCodeLoader: 
Unable to load native-hadoop library for your platform... using builtin-java classes 
where applicable




drop table ticker_23;

create external table ticker_23(
 db_tsunx  BIGINT,
 ask       double,
 bid       double
) 
PARTITIONED BY (ticker_id INT,ddate DATE)
STORED AS parquetfile
LOCATION 'hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet';




[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticker_23/ticks.parquet
2019-01-11 05:46:34,066 WARN util.NativeCodeLoader: 
Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable




hdfs dfs -chown root:root /user/tickers/ticker_23/ticks.parquet

dsTicks.write.mode("append").partitionBy("ticker_id","ddate").parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticker_23/ticks.parquet
2019-01-11 05:53:57,496 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   3 root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/_SUCCESS
drwxr-xr-x   - root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticker_23/ticks.parquet/ticker_id=23
2019-01-11 05:54:15,738 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 13 items
drwxr-xr-x   - root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-08-29
drwxr-xr-x   - root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-08-30
drwxr-xr-x   - root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-08-31
drwxr-xr-x   - root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-03
drwxr-xr-x   - root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-06
drwxr-xr-x   - root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-07
drwxr-xr-x   - root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-10
drwxr-xr-x   - root root          0 2019-01-11 05:53 /user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-09-11



Uhh, the query "select * from ticker_23;" returns no rows and no errors. But from spark-shell I can read it:


scala> val df = spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet")
df: org.apache.spark.sql.DataFrame = [db_tsunx: bigint, ask: double ... 3 more fields]

scala> df.show(5)
+-------------+------+------+---------+----------+
|     db_tsunx|   ask|   bid|ticker_id|     ddate|
+-------------+------+------+---------+----------+
|1535629422415|68.224| 68.05|       23|2018-08-30|
|1535629422491|68.223| 68.05|       23|2018-08-30|
|1535629422503|68.219| 68.05|       23|2018-08-30|
|1535629422756| 68.22|68.047|       23|2018-08-30|
|1535629423007|68.215|68.039|       23|2018-08-30|
+-------------+------+------+---------+----------+
only showing top 5 rows



I found that I can read a single partition this way:


drop table ticker_23;

create external table ticker_23(
 db_tsunx  BIGINT,
 ask       double,
 bid       double
) 
STORED AS PARQUET
LOCATION 'hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet/ticker_id=23/ddate=2018-08-29';

select * from ticker_23;


Wow, thanks to this question on Stack Overflow:


drop table ticker_23;

create external table ticker_23(
 db_tsunx  BIGINT,
 ask       double,
 bid       double
) 
PARTITIONED BY (ticker_id INT,ddate DATE)
STORED AS PARQUET
LOCATION 'hdfs://hdpnn:9000/user/tickers/ticker_23/ticks.parquet';

select * from ticker_23;
>> no rows.

MSCK REPAIR TABLE ticker_23;

select * from ticker_23;
>> returns rows.

select ddate,
       sum(1) as cnt 
  from ticker_23 
 group by ddate 
 order by 1;


Finally, I change the structure and names, and load more data.


create external table ticker(
 db_tsunx  BIGINT,
 ask       double,
 bid       double
) 
PARTITIONED BY (ticker_id INT,ddate DATE)
STORED AS PARQUET
LOCATION 'hdfs://hdpnn:9000/user/tickers/ticks.parquet/';

select * from ticker;

---------------------------

[hadoop@hdpnn ~]$ hadoop fs -rm -r /user/tickers/ticker_23
Deleted /user/tickers/ticker_23

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers

dsTicks.write.mode(SaveMode.Append).partitionBy("ticker_id","ddate")
.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 2 items
-rw-r--r--   3 root root          0 2019-01-11 06:33 /user/tickers/ticks.parquet/_SUCCESS
drwxr-xr-x   - root root          0 2019-01-11 06:33 /user/tickers/ticks.parquet/ticker_id=23

val dsTicks = getTicksFromCass(1)
dsTicks.write.mode(SaveMode.Append).partitionBy("ticker_id","ddate")
.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 3 items
-rw-r--r--   3 root root          0 2019-01-11 06:42 /user/tickers/ticks.parquet/_SUCCESS
drwxr-xr-x   - root root          0 2019-01-11 06:42 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x   - root root          0 2019-01-11 06:33 /user/tickers/ticks.parquet/ticker_id=23

val dsTicks = getTicksFromCass(10)
dsTicks.write.mode(SaveMode.Append).partitionBy("ticker_id","ddate")
.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 4 items
-rw-r--r--   3 root root          0 2019-01-11 06:47 /user/tickers/ticks.parquet/_SUCCESS
drwxr-xr-x   - root root          0 2019-01-11 06:42 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x   - root root          0 2019-01-11 06:47 /user/tickers/ticks.parquet/ticker_id=10
drwxr-xr-x   - root root          0 2019-01-11 06:33 /user/tickers/ticks.parquet/ticker_id=23


Query data with Hive from the partitioned table:


MSCK REPAIR TABLE ticker;

select * from ticker;

select ticker_id,
       ddate,
       sum(1) as cnt 
  from ticker
 group by ticker_id,ddate 
 order by 2,1;


What is MSCK!? (metastore check)

A metastore check with the repair table option adds metadata about partitions to the Hive metastore for partitions for which such metadata doesn't already exist. In other words, it adds to the metastore any partitions that exist on HDFS but are not yet in the metastore.
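The article runs MSCK REPAIR TABLE from DBeaver, but the same statement can also be issued from spark-shell; a sketch, assuming the shell was started with Hive support against the same metastore:

// Register the partitions found on HDFS in the metastore, then list what Hive now knows.
spark.sql("MSCK REPAIR TABLE ticker")
spark.sql("SHOW PARTITIONS ticker").show(50, false)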


#   ticker_id  ddate       cnt
1   1          2018-08-29  80619
2   10         2018-08-29  94462
3   23         2018-08-29  40778
4   1          2018-08-30  110859
5   10         2018-08-30  130757
6   23         2018-08-30  48072
7   1          2018-08-31  131783
8   10         2018-08-31  161140
9   23         2018-08-31  49165
10  1          2018-09-03  31861


Recover Partitions (MSCK REPAIR TABLE)

Hive stores a list of partitions for each table in its metastore. If, however, new partitions are directly added to HDFS (say by using hadoop fs -put command) or removed from HDFS, the metastore (and hence Hive) will not be aware of these changes to partition information unless the user runs ALTER TABLE table_name ADD/DROP PARTITION commands on each of the newly added or removed partitions, respectively.

However, users can run a metastore check command with the repair table option.
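For a single freshly written directory, the manual alternative from the quote above looks like this (a sketch; the partition values and the spark.sql wrapper are only for illustration, the same statement can be run from DBeaver/Hive):

// Register one specific partition explicitly instead of scanning with MSCK.
spark.sql("""
  ALTER TABLE ticker ADD IF NOT EXISTS
  PARTITION (ticker_id=23, ddate='2018-08-30')
  LOCATION 'hdfs://hdpnn:9000/user/tickers/ticks.parquet/ticker_id=23/ddate=2018-08-30'
""")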

Continuing:
The task is to write Scala Spark code (to run in spark-shell) that loads all data from Cassandra into Parquet on HDFS (ticker by ticker, date by date).

And to sync the tickers dictionary (a sketch for this follows the getTickers function below).

HDFS preparation:


[hadoop@hdpnn ~]$ hadoop fs -mkdir /user/tickers

[hadoop@hdpnn ~]$ hadoop fs -rm -r /user/tickers/ticks.parquet

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet



Scala code:

spark-shell --driver-memory 1G --executor-memory 1G --driver-cores 1 --executor-cores 1 \
--jars "/opt/spark-2.3.2/jars/spark-cassandra-connector-assembly-2.3.2.jar" \
--conf spark.cassandra.connection.host=193.124.112.90 \
--conf "spark.sql.parquet.writeLegacyFormat=true"

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._

case class T_DAT_META(ticker_id: Int, ddate :java.sql.Date)

def getTickers = {
            import org.apache.spark.sql.functions._
            spark.read.format("org.apache.spark.sql.cassandra")
              .options(Map("table" -> "tickers", "keyspace" -> "mts_meta"))
              .load()
              .sort(asc("ticker_id"))
          }
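The tickers dictionary sync mentioned above can be as simple as dumping this DataFrame into its own Parquet directory (a sketch; the target path is my assumption, not taken from the article):

// Write the dictionary next to the ticks data so it can also be exposed to Hive later.
getTickers
  .write
  .mode(SaveMode.Overwrite)
  .parquet("hdfs://hdpnn:9000/user/tickers/tickers.parquet")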

def getTicksFromCassByTickerDate(TickerID :Int,Ddate :java.sql.Date) = {
            import org.apache.spark.sql.functions._
            spark.read.format("org.apache.spark.sql.cassandra")
              .options(Map("table" -> "ticks", "keyspace" -> "mts_src"))
              .option("fetchSize", "10000")
              .load()
              .where(col("ticker_id") === TickerID)
              .where(col("ddate") === Ddate)
              .select(col("ticker_id"), col("ddate"), col("db_tsunx"), col("ask"), col("bid"))
              .sort(asc("db_tsunx"))
          }

def getTicksFromCassByTickerDateUnx(TickerID :Int,Ddate :java.sql.Date, pdb_tsunx :Long) = {
            import org.apache.spark.sql.functions._
            spark.read.format("org.apache.spark.sql.cassandra")
              .options(Map("table" -> "ticks", "keyspace" -> "mts_src"))
              .option("fetchSize", "10000")
              .load()
              .where(col("ticker_id") === TickerID)
              .where(col("ddate") === Ddate)
              .where(col("db_tsunx") > pdb_tsunx)
              .select(col("ticker_id"), col("ddate"), col("db_tsunx"), col("ask"), col("bid"))
              .sort(asc("db_tsunx"))
          }

def getMetadata = {
            import org.apache.spark.sql.functions._
            spark.read.format("org.apache.spark.sql.cassandra")
              .options(Map("table" -> "ticks", "keyspace" -> "mts_src"))
              .option("fetchSize", "100")
              .load()
              .select(col("ticker_id"),col("ddate"))
              .distinct
              .sort(asc("ticker_id"))
              .collect
              .toSeq
}

val dsMetadata = getMetadata

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import scala.util.{Try, Success, Failure}

var fdate: Option[java.sql.Date] = Try(new SimpleDateFormat("yyyy-MM-dd").parse("2019-01-15"))
.map(d => new java.sql.Date(d.getTime())).toOption



And now we can load the data for all tickers and dates, in a loop over the metadata:

for (thisTickerDate <- dsMetadata.map(row => T_DAT_META(row.getInt(0), row.getDate(1)))) {
  println("--------------------------------------------------------------------------------------")
  println("BEGIN TICKER_ID=" + thisTickerDate.ticker_id + " DDATE=" + thisTickerDate.ddate)
  val dsTicks = getTicksFromCassByTickerDate(thisTickerDate.ticker_id, thisTickerDate.ddate)
  dsTicks.cache()
  val cntRows = dsTicks.count()
  dsTicks.write.mode(SaveMode.Overwrite)
    .parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet/ticker_id=" + thisTickerDate.ticker_id + "/ddate=" + thisTickerDate.ddate)
  println("INSERTED = " + cntRows + " ROWS.")
  println("--------------------------------------------------------------------------------------")
}


Or with custom filters:

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import scala.util.{Try, Success, Failure}

var fdate: Option[java.sql.Date] = Try(new SimpleDateFormat("yyyy-MM-dd").parse("2019-01-15")).map(d => new java.sql.Date(d.getTime())).toOption

for (thisTickerDate <- dsMetadata.map(row => T_DAT_META(row.getInt(0), row.getDate(1)))
     if (thisTickerDate.ticker_id == 1 && thisTickerDate.ddate == fdate.get)) {
  println("--------------------------------------------------------------------------------------")
  println("BEGIN TICKER_ID=" + thisTickerDate.ticker_id + " DDATE=" + thisTickerDate.ddate)
  val dsTicks = getTicksFromCassByTickerDate(thisTickerDate.ticker_id, thisTickerDate.ddate)
  dsTicks.cache()
  val cntRows = dsTicks.count()
  dsTicks.write.mode(SaveMode.Overwrite)
    .parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet/ticker_id=" + thisTickerDate.ticker_id + "/ddate=" + thisTickerDate.ddate)
  println("INSERTED = " + cntRows + " ROWS.")
  println("--------------------------------------------------------------------------------------")
}


Or you can take max(db_tsunx) from the Parquet data and use it as a watermark (a Spark-side sketch follows the loop below).

select max(db_tsunx)
  from ticker
 where ticker_id=1

for (thisTickerDate <- dsMetadata.map(row => T_DAT_META(row.getInt(0), row.getDate(1)))
     if (thisTickerDate.ticker_id == 1 && thisTickerDate.ddate == fdate.get)) {
  println("--------------------------------------------------------------------------------------")
  println("BEGIN TICKER_ID=" + thisTickerDate.ticker_id + " DDATE=" + thisTickerDate.ddate)
  val dsTicks = getTicksFromCassByTickerDateUnx(thisTickerDate.ticker_id, thisTickerDate.ddate, 1547556210822L)
  dsTicks.cache()
  val cntRows = dsTicks.count()
  dsTicks.write.mode(SaveMode.Overwrite)
    .parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet/ticker_id=" + thisTickerDate.ticker_id + "/ddate=" + thisTickerDate.ddate)
  println("INSERTED = " + cntRows + " ROWS.")
  println("--------------------------------------------------------------------------------------")
}
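Instead of hard-coding the watermark (1547556210822L above), the same max(db_tsunx) can be computed from the Parquet files directly in Spark; a minimal sketch, assuming the partitioned layout written above:

import org.apache.spark.sql.functions._

// Current high-water mark for ticker 1, read straight from the Parquet files.
// Note: .getLong(0) assumes at least one row already exists for this ticker.
val lastTsunx: Long = spark.read
  .parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")
  .where(col("ticker_id") === 1)
  .agg(max("db_tsunx"))
  .head()
  .getLong(0)

The value can then be passed to getTicksFromCassByTickerDateUnx in place of the literal.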


Additional information about the data.

From Cassandra:

[root@sc1 ~]# nodetool tablestats --human-readable  mts_src.ticks
Total number of tables: 55
----------------
Keyspace : mts_src
        Read Count: 38872
        Read Latency: 6.0569655021609385 ms
        Write Count: 0
        Write Latency: NaN ms
        Pending Flushes: 0
                Table: ticks
                SSTable count: 3
                Space used (live): 390.09 MiB
                Space used (total): 390.09 MiB
                Space used by snapshots (total): 0 bytes
                Off heap memory used (total): 142.56 KiB
                SSTable Compression Ratio: 0.3427504897663621
                Number of partitions (estimate): 386
                Memtable cell count: 0
                Memtable data size: 0 bytes
                Memtable off heap memory used: 0 bytes
                Memtable switch count: 0
                Local read count: 38872
                Local read latency: NaN ms
                Local write count: 0
                Local write latency: NaN ms
                Pending flushes: 0
                Percent repaired: 0.0
                Bloom filter false positives: 403
                Bloom filter false ratio: 0.01061
                Bloom filter space used: 536 bytes
                Bloom filter off heap memory used: 512 bytes
                Index summary off heap memory used: 130 bytes
                Compression metadata off heap memory used: 141.93 KiB
                Compacted partition minimum bytes: 29522
                Compacted partition maximum bytes: 12108970
                Compacted partition mean bytes: 3275377
                Average live cells per slice (last five minutes): NaN
                Maximum live cells per slice (last five minutes): 0
                Average tombstones per slice (last five minutes): NaN
                Maximum tombstones per slice (last five minutes): 0
                Dropped Mutations: 0 bytes




From HDFS:

[hadoop@hdpnn ~]$ hadoop fs -du -s -h /user/tickers/ticks.parquet
82.2 M  246.5 M  /user/tickers/ticks.parquet

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 27 items
drwxr-xr-x   - root root          0 2019-01-16 04:51 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x   - root root          0 2019-01-15 07:38 /user/tickers/ticks.parquet/ticker_id=10
drwxr-xr-x   - root root          0 2019-01-15 07:40 /user/tickers/ticks.parquet/ticker_id=11
drwxr-xr-x   - root root          0 2019-01-15 07:43 /user/tickers/ticks.parquet/ticker_id=12
drwxr-xr-x   - root root          0 2019-01-15 07:45 /user/tickers/ticks.parquet/ticker_id=13
drwxr-xr-x   - root root          0 2019-01-15 07:46 /user/tickers/ticks.parquet/ticker_id=14
drwxr-xr-x   - root root          0 2019-01-15 07:48 /user/tickers/ticks.parquet/ticker_id=15
drwxr-xr-x   - root root          0 2019-01-15 07:50 /user/tickers/ticks.parquet/ticker_id=16
...
...


Now you can query this Parquet data with Hive and DBeaver. Query examples:

MSCK REPAIR TABLE ticker;

select ddate,sum(1),min(db_tsunx),max(db_tsunx)
  from ticker
 where ticker_id=1
 group by ddate;

select max(db_tsunx)
  from ticker
 where ticker_id=1;

select sum(1) as cnt
  from ticker
 where ticker_id=1;

select ddate,sum(1) as cnt
  from ticker
 where ticker_id=11
group by ddate;

select sum(1),min(db_tsunx),max(db_tsunx)
  from ticker
 where ddate='2019-01-09' and ticker_id=1;


Application for this article (Scala) (Download from github cass_to_hdfs_parquet) Commit #1

My next idea is to configure Hive to use Spark as the execution engine; currently it uses MapReduce.




