Loading data into Spark from Oracle RDBMS, CSV


This article shows how to read data from Oracle tables over JDBC and directly from CSV files with Spark.
First of all, we need to download the Oracle JDBC driver ojdbc6.jar and put it into the jars directory under the Spark home, for example C:\spark-2.3.0-bin-hadoop2.7\jars.
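
Alternatively, the driver jar can be passed at launch time instead of copying it into the jars directory (just a sketch, assuming the jar was saved to C:\drivers\ojdbc6.jar):

spark-shell --jars C:\drivers\ojdbc6.jar --driver-class-path C:\drivers\ojdbc6.jar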

Example #1.
Load data from an Oracle partitioned table, partition by partition, into a Parquet table.
aaa.xxx.yyyy.zzz is the IP address of the Oracle server.



import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession.builder().
  appName("Spark Hive Example").
  config("spark.sql.warehouse.dir", warehouseLocation).
  config("hive.exec.dynamic.partition", "true").
  config("hive.exec.dynamic.partition.mode", "nonstrict").
  enableHiveSupport().
  getOrCreate()

//recreate table
spark.sql(s"DROP TABLE IF EXISTS T_DATA")
spark.sql(s"CREATE TABLE IF NOT EXISTS T_DATA(id_row STRING,val decimal(38,10)) PARTITIONED BY (id_pok INT) stored AS PARQUET tblproperties ('PARQUET.COMPRESS'='ZLIB')")

//get the list of the Oracle table partition names
val df_ora_parts_names = spark.read.format("jdbc").
  option("url", "jdbc:oracle:thin:@aaa.xxx.yyyy.zzz:1521/nsser").
  option("user", "MSK_ARM_LEAD").
  option("password", "MSK_ARM_LEAD").
  option("dbtable", "(select s.partition_name from user_segments s where s.segment_name = 'T_DATA')").
  option("fetchSize", "1000").
  load()

//convert it into a Scala collection of partition names
import spark.implicits._   // needed for the .as[String] encoder
val ora_parts_list = df_ora_parts_names.as[String].collect().toSeq

for (ora_part_name <- ora_parts_list){ 
 println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
 println("        Processing partition:   "+ora_part_name)
 println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
 //get one partition's data from the Oracle RDBMS
 val df_tdata = spark.read.format("jdbc").
   option("url", "jdbc:oracle:thin:@aaa.xxx.yyyy.zzz:1521/nsser").
   option("user", "MSK_ARM_LEAD").
   option("password", "MSK_ARM_LEAD").
   option("dbtable", s"(select id_row, val, CAST(id_pok AS INTEGER) as ID_POK from T_DATA partition ($ora_part_name))").
   option("fetchSize", "10000").
   load()

 df_tdata.write.mode(SaveMode.Append).format("parquet").insertInto("T_DATA")
}
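
As an alternative to looping over the Oracle partitions, Spark's JDBC source can split a single read into parallel queries by itself. A minimal sketch, assuming id_pok is a numeric column whose real value range has to be substituted for the bounds below:

val df_parallel = spark.read.format("jdbc").
  option("url", "jdbc:oracle:thin:@aaa.xxx.yyyy.zzz:1521/nsser").
  option("user", "MSK_ARM_LEAD").
  option("password", "MSK_ARM_LEAD").
  option("dbtable", "(select id_row, val, CAST(id_pok AS INTEGER) as ID_POK from T_DATA)").
  option("partitionColumn", "ID_POK").
  option("lowerBound", "1").
  option("upperBound", "1000").
  option("numPartitions", "8").
  option("fetchSize", "10000").
  load()

Here numPartitions controls how many concurrent JDBC sessions Spark opens against Oracle.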

val prq_table_df        = spark.sql("select * from t_data")
val prq_table_row_count = prq_table_df.count()

println("        Finish processing. There are [" + prq_table_row_count + "] rows in the parquet table T_DATA.")
println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
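
A quick way to check that every Oracle partition landed in the Parquet table is to look at the Hive partitions and the per-partition row counts (just a sanity check, not part of the load itself):

spark.sql("show partitions t_data").show(100, false)
spark.sql("select id_pok, count(*) as cnt from t_data group by id_pok order by id_pok").show(100, false)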


Example #2.
Unload data from Oracle into a CSV file using dbms_sql and utl_file, then read that CSV with Spark.


create or replace procedure dump_table_to_csv_1p(
                                              p_dir      in varchar2,
                                              p_filename in varchar2
                                             ) is
    l_output        utl_file.file_type;
    l_theCursor     integer default dbms_sql.open_cursor;
    l_columnValue   varchar2(4000);
    l_status        integer;
    l_query         varchar2(1000) default 'select id_row,val,CAST(id_pok AS INTEGER) as ID_POK from T_DATA PARTITION (PART_20161209_SP_15)';
    l_colCnt        number := 0;
    l_separator     varchar2(1);
    l_descTbl       dbms_sql.desc_tab;
begin
    l_output := utl_file.fopen(p_dir, p_filename, 'w', 2048);

    -- parse the query and describe its result columns
    dbms_sql.parse( l_theCursor, l_query, dbms_sql.native );
    dbms_sql.describe_columns( l_theCursor, l_colCnt, l_descTbl );

    -- write the header line with quoted column names
    for i in 1 .. l_colCnt loop
        utl_file.put( l_output, l_separator || '"' || l_descTbl(i).col_name || '"' );
        dbms_sql.define_column( l_theCursor, i, l_columnValue, 4000 );
        l_separator := ';';
    end loop;
    utl_file.new_line( l_output );

    l_status := dbms_sql.execute(l_theCursor);

    -- fetch row by row and write semicolon-separated values
    while ( dbms_sql.fetch_rows(l_theCursor) > 0 ) loop
        l_separator := '';
        for i in 1 .. l_colCnt loop
            dbms_sql.column_value( l_theCursor, i, l_columnValue );
            utl_file.put( l_output, l_separator || l_columnValue );
            l_separator := ';';
        end loop;
        utl_file.new_line( l_output );
    end loop;

    dbms_sql.close_cursor(l_theCursor);
    utl_file.fclose( l_output );
end;


Call it (BACKUP is an Oracle directory object mapped to an OS path):

begin
 dump_table_to_csv_1p(p_dir => 'BACKUP',p_filename => 't_data.csv');  
end;
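
If the BACKUP directory object does not exist yet, it can be created and granted like this (a sketch; the OS path /u01/backup is an assumption, use the real one):

create directory BACKUP as '/u01/backup';
grant read, write on directory BACKUP to MSK_ARM_LEAD;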


After the data has been unloaded to the file, we can fetch it with WinSCP or scp and put it into a local path:
C:\spark_data\ora\t_data.csv
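
For example with scp (the oracle user and the remote path /u01/backup are assumptions; the path must match the OS directory behind the BACKUP directory object):

scp oracle@aaa.xxx.yyyy.zzz:/u01/backup/t_data.csv C:\spark_data\ora\t_data.csv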

Next we read this CSV (plus one more table, T_KEYS, that I load in the same way) and save the data into Parquet.

spark-shell --driver-memory 4G 

import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession.builder().
  appName("Spark Hive Example").
  config("spark.sql.warehouse.dir", warehouseLocation).
  config("hive.exec.dynamic.partition", "true").
  config("hive.exec.dynamic.partition.mode", "nonstrict").
  enableHiveSupport().
  getOrCreate()
println(warehouseLocation)

spark.sql(s"DROP TABLE IF EXISTS T_DATA")
spark.sql(s"CREATE TABLE IF NOT EXISTS T_DATA(id_row STRING,val decimal(38,10)) PARTITIONED BY (id_pok INT) stored AS PARQUET tblproperties ('PARQUET.COMPRESS'='ZLIB')")

spark.sql(s"""
CREATE TABLE IF NOT EXISTS T_KEYS(
DDATE INT,
DDATE_ACTUAL INT,
id_row STRING,
ID_OIV INT,
ID_ORG STRING,
ID_CLASS_GRBS INT,
ID_GP INT,
ID_BUDGET INT,
ID_INDUSTRY INT,
ID_TERRITORY INT,
ID_UK INT,
ID_SERV_WORK INT,
ID_MSP INT,
ID_PREF INT,
ID_INCOME_BUDGET INT,
ID_GU_TYPE INT
) stored AS PARQUET tblproperties ('PARQUET.COMPRESS'='ZLIB')""")

val keysDF = spark.read.format("csv").
  option("sep", ";").
  option("inferSchema", "true").
  option("header", "true").
  load("C:\\spark_data\\ora\\t_keys.csv")
keysDF.write.mode(SaveMode.Append).format("parquet").insertInto("T_KEYS")
keysDF.unpersist()

val dataDF = spark.read.format("csv").
  option("sep", ";").
  option("inferSchema", "true").
  option("header", "true").
  load("C:\\spark_data\\ora\\t_data.csv")
dataDF.write.mode(SaveMode.Append).format("parquet").insertInto("T_DATA")
dataDF.unpersist()
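
Note that with inferSchema the column types are whatever Spark guesses from the file (the val column, for example, may not come back as decimal(38,10)). A sketch of reading the same file with an explicit schema instead, taking the column list from the T_DATA DDL above, which also avoids the extra pass over the file that inferSchema makes:

import org.apache.spark.sql.types._

val tDataSchema = StructType(Seq(
  StructField("id_row", StringType),
  StructField("val", DecimalType(38, 10)),
  StructField("id_pok", IntegerType)
))

val dataTypedDF = spark.read.format("csv").
  option("sep", ";").
  option("header", "true").
  schema(tDataSchema).
  load("C:\\spark_data\\ora\\t_data.csv")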

//read from parquets:

val keysDF = spark.read.parquet("C:\\spark-warehouse\\t_keys")
keysDF.cache()

val dataDF = spark.read.parquet("C:\\spark-warehouse\\t_data")
dataDF.cache()
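
As a quick illustration that the two datasets line up on id_row (the real transformations are left for the follow-up articles):

val joinedCount = dataDF.join(keysDF, Seq("id_row")).count()
println("Rows of T_DATA that have a matching row in T_KEYS: " + joinedCount)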


And here is what we have:

scala> dataDF.printSchema()
root
 |-- id_row: string (nullable = true)
 |-- val: decimal(38,10) (nullable = true)
 |-- id_pok: integer (nullable = true) 

scala> dataDF.show()
+--------------------+---------------+------+
|              id_row|            val|id_pok|
+--------------------+---------------+------+
|20161209_-1_10010...|   9.2900000000|   480|
|20161209_-1_10010...|  19.2700000000|   480|
|20161209_-1_10010...|1897.9000000000|   480|
|20161209_-1_10010...| 404.4200000000|   480|
|20161209_-1_10010...| 237.1200000000|   480|
|20161209_-1_10010...| 313.1100000000|   480|
|20161209_-1_10010...| 480.6700000000|   480|
|20161209_-1_10010...| 381.2000000000|   480|
|20161209_-1_10010...| 199.9400000000|   480|
|20161209_-1_10010...|  29.0400000000|   480|
|20161209_-1_10010...|  20.6200000000|   480|
|20161209_-1_10010...|  26.7500000000|   480|


scala> keysDF.printSchema()
root
 |-- DDATE: integer (nullable = true)
 |-- DDATE_ACTUAL: integer (nullable = true)
 |-- id_row: string (nullable = true)
 |-- ID_OIV: integer (nullable = true)
 |-- ID_ORG: string (nullable = true)
 |-- ID_CLASS_GRBS: integer (nullable = true)
 |-- ID_GP: integer (nullable = true)
 |-- ID_BUDGET: integer (nullable = true)
 |-- ID_INDUSTRY: integer (nullable = true)
 |-- ID_TERRITORY: integer (nullable = true)
 |-- ID_UK: integer (nullable = true)
 |-- ID_SERV_WORK: integer (nullable = true)
 |-- ID_MSP: integer (nullable = true)
 |-- ID_PREF: integer (nullable = true)
 |-- ID_INCOME_BUDGET: integer (nullable = true)
 |-- ID_GU_TYPE: integer (nullable = true)

scala> keysDF.show()
+--------+------------+--------------------+------+------+-------------+-----+---------+-----------+------------+-----+------------+------+-------+----------------+----------+
|   DDATE|DDATE_ACTUAL|              id_row|ID_OIV|ID_ORG|ID_CLASS_GRBS|ID_GP|ID_BUDGET|ID_INDUSTRY|ID_TERRITORY|ID_UK|ID_SERV_WORK|ID_MSP|ID_PREF|ID_INCOME_BUDGET|ID_GU_TYPE|
+--------+------------+--------------------+------+------+-------------+-----+---------+-----------+------------+-----+------------+------+-------+----------------+----------+
|20161209|    20161206|20161209_-1_-1_-1...|    -1|    -1|           -1|   -1|       15|        -10|          -1|   -1|          -1|    -1|     -1|              -1|        -1|
|20161209|    20161206|20161209_-1_-1_-1...|    -1|    -1|           -1|   -1|       15|        -10|         -20|   -1|          -1|    -1|     -1|              -1|        -1|
|20161209|    20161206|20161209_-1_-1_-1...|    -1|    -1|           -1|   -1|       15|        -10|         106|   -1|          -1|    -1|     -1|              -1|        -1|
|20161209|    20161206|20161209_-1_-1_-1...|    -1|    -1|           -1|   -1|       15|        -10|         108|   -1|          -1|    -1|     -1|              -1|        -1|
|20161209|    20161206|20161209_-1_-1_-1...|    -1|    -1|           -1|   -1|       15|        -10|         110|   -1|          -1|    -1|     -1|              -1|        -1|
|20161209|    20161206|20161209_-1_-1_-1...|    -1|    -1|           -1|   -1|       15|        -10|         115|   -1|          -1|    -1|     -1|              -1|        -1|

That's all. These two datasets will be used in a set of follow-up articles about transformations and actions.


