Loading data into Spark from Oracle RDBMS, CSV
This article shows how to read data into Spark from Oracle tables over JDBC and directly from CSV files.
First of all, we need to download the Oracle JDBC driver ojdbc6.jar and put it into the jars directory of the Spark home.
For example - C:\spark-2.3.0-bin-hadoop2.7\jars
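Alternatively (a side note, not part of the original setup): the driver jar can also be passed to spark-shell at launch with --driver-class-path and --jars instead of being copied into the jars directory; the path below is only an illustrative example.
spark-shell --driver-class-path C:\drivers\ojdbc6.jar --jars C:\drivers\ojdbc6.jar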
Example #1.
Load data from an Oracle partitioned table, partition by partition, into a Parquet table.
aaa.xxx.yyyy.zzz is the IP address of the Oracle server.
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir",warehouseLocation).config("hive.exec.dynamic.partition","true").config("hive.exec.dynamic.partition.mode","nonstrict").enableHiveSupport().getOrCreate()
//recreate table
spark.sql(s"DROP TABLE IF EXISTS T_DATA")
spark.sql(s"CREATE TABLE IF NOT EXISTS T_DATA(id_row STRING,val decimal(38,10)) PARTITIONED BY (id_pok INT) stored AS PARQUET tblproperties ('PARQUET.COMPRESS'='ZLIB')")
//get the list of table partition names
val df_ora_parts_names = spark.read.format("jdbc").option("url", "jdbc:oracle:thin:@aaa.xxx.yyyy.zzz:1521/nsser").option("user", "MSK_ARM_LEAD").option("password", "MSK_ARM_LEAD").option("dbtable", s"(select s.partition_name from user_segments s where s.segment_name='T_DATA')").option("fetchSize", "1000").load()
//convert it into a Scala collection (spark.implicits._ provides the Encoder needed by .as[String]; spark-shell imports it automatically)
import spark.implicits._
val ora_parts_list = df_ora_parts_names.as[String].collect.toSeq
for (ora_part_name <- ora_parts_list){
println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
println(" Processing partition: "+ora_part_name)
println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
//get the data of one partition from the Oracle RDBMS
val df_tdata = spark.read.format("jdbc").option("url","jdbc:oracle:thin:@aaa.xxx.yyyy.zzz:1521/nsser").option("user","MSK_ARM_LEAD").option("password","MSK_ARM_LEAD").option("dbtable",s"(select id_row,val,CAST(id_pok AS INTEGER) as ID_POK from T_DATA partition ("+ora_part_name+"))").option("fetchSize", "10000").load()
df_tdata.write.mode(SaveMode.Append).format("parquet").insertInto("T_DATA")
}
val prq_table_df = spark.sql(s"select * from t_data")
val prq_table_row_count = prq_table_df.count()
println(" Finish processing. There are ["+ prq_table_row_count +"] rows in parquet table T_DATA.")
println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
val test = spark.sql(s"select * from t_data").count()
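A possible alternative (just a sketch, not used further in this article): instead of looping over Oracle partitions explicitly, Spark's JDBC source can split a single read into parallel queries on a numeric column via the partitionColumn, lowerBound, upperBound and numPartitions options. The bounds and partition count below are made-up values and would have to match the real range of ID_POK.
//sketch: parallel JDBC read split on ID_POK; lowerBound/upperBound/numPartitions are illustrative only
val df_tdata_all = spark.read.format("jdbc").option("url","jdbc:oracle:thin:@aaa.xxx.yyyy.zzz:1521/nsser").option("user","MSK_ARM_LEAD").option("password","MSK_ARM_LEAD").option("dbtable","(select id_row,val,CAST(id_pok AS INTEGER) as ID_POK from T_DATA)").option("partitionColumn","ID_POK").option("lowerBound","1").option("upperBound","1000").option("numPartitions","8").option("fetchSize","10000").load()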
Example #2.
Export data from Oracle into a CSV file using dbms_sql and utl_file.
create or replace procedure dump_table_to_csv_1p(
    p_dir      in varchar2,
    p_filename in varchar2
) is
    l_output      utl_file.file_type;
    l_theCursor   integer default dbms_sql.open_cursor;
    l_columnValue varchar2(4000);
    l_status      integer;
    l_query       varchar2(1000) default 'select id_row,val,CAST(id_pok AS INTEGER) as ID_POK from T_DATA PARTITION (PART_20161209_SP_15)';
    l_colCnt      number := 0;
    l_separator   varchar2(1);
    l_descTbl     dbms_sql.desc_tab;
begin
    l_output := UTL_FILE.FOPEN(p_dir,p_filename,'w',2048);
    dbms_sql.parse( l_theCursor, l_query, dbms_sql.native );
    dbms_sql.describe_columns( l_theCursor, l_colCnt, l_descTbl );
    -- header line: quoted column names separated by ';'
    for i in 1 .. l_colCnt loop
        utl_file.put( l_output, l_separator || '"' || l_descTbl(i).col_name || '"' );
        dbms_sql.define_column( l_theCursor, i, l_columnValue, 4000 );
        l_separator := ';';
    end loop;
    utl_file.new_line( l_output );
    -- data rows: fetch and write the values separated by ';'
    l_status := dbms_sql.execute(l_theCursor);
    while ( dbms_sql.fetch_rows(l_theCursor) > 0 ) loop
        l_separator := '';
        for i in 1 .. l_colCnt loop
            dbms_sql.column_value( l_theCursor, i, l_columnValue );
            utl_file.put( l_output, l_separator || l_columnValue );
            l_separator := ';';
        end loop;
        utl_file.new_line( l_output );
    end loop;
    dbms_sql.close_cursor(l_theCursor);
    utl_file.fclose( l_output );
end;
Call the procedure (BACKUP is an Oracle directory object, created with CREATE DIRECTORY, that maps to an OS path):
begin
dump_table_to_csv_1p(p_dir => 'BACKUP',p_filename => 't_data.csv');
end;
After the export finishes we can fetch the file with WinSCP or scp and put it into a local path, for example
C:\spark_data\ora\t_data.csv
Next we read this CSV (I additionally load one more table, T_KEYS) and save it into Parquet.
spark-shell --driver-memory 4G
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir",warehouseLocation).config("hive.exec.dynamic.partition","true").config("hive.exec.dynamic.partition.mode","nonstrict").enableHiveSupport().getOrCreate()
println(warehouseLocation)
spark.sql(s"DROP TABLE IF EXISTS T_DATA")
spark.sql(s"CREATE TABLE IF NOT EXISTS T_DATA(id_row STRING,val decimal(38,10)) PARTITIONED BY (id_pok INT) stored AS PARQUET tblproperties ('PARQUET.COMPRESS'='ZLIB')")
spark.sql(s"""
CREATE TABLE IF NOT EXISTS T_KEYS(
DDATE INT,
DDATE_ACTUAL INT,
id_row STRING,
ID_OIV INT,
ID_ORG STRING,
ID_CLASS_GRBS INT,
ID_GP INT,
ID_BUDGET INT,
ID_INDUSTRY INT,
ID_TERRITORY INT,
ID_UK INT,
ID_SERV_WORK INT,
ID_MSP INT,
ID_PREF INT,
ID_INCOME_BUDGET INT,
ID_GU_TYPE INT
) stored AS PARQUET tblproperties ('PARQUET.COMPRESS'='ZLIB')""")
val keysDF = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("C:\\spark_data\\ora\\t_keys.csv")
keysDF.write.mode(SaveMode.Append).format("parquet").insertInto("T_KEYS")
keysDF.unpersist()
val dataDF = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("C:\\spark_data\\ora\\t_data.csv")
dataDF.write.mode(SaveMode.Append).format("parquet").insertInto("T_DATA")
dataDF.unpersist()
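A side note (just a sketch, not from the original flow): inferSchema makes Spark read the CSV an extra time and will typically infer the val column as double rather than decimal(38,10), so an explicit schema is often preferable; the column names and types below are taken from the T_DATA DDL above, and dataDF2 is a new name used only for this example.
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DecimalType}
//sketch: read t_data.csv with an explicit schema instead of inferSchema
val dataSchema = StructType(Seq(StructField("id_row", StringType), StructField("val", DecimalType(38,10)), StructField("id_pok", IntegerType)))
val dataDF2 = spark.read.format("csv").option("sep", ";").option("header", "true").schema(dataSchema).load("C:\\spark_data\\ora\\t_data.csv")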
//read back from the parquet files:
val keysDF = spark.read.parquet("C:\\spark-warehouse\\t_keys")
keysDF.cache()
val dataDF = spark.read.parquet("C:\\spark-warehouse\\t_data")
dataDF.cache()
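Note that cache() is lazy: the DataFrames are actually materialized in memory only when the first action runs over them, for example:
keysDF.count()
dataDF.count()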
And here is what we have:
scala> dataDF.printSchema()
root
|-- id_row: string (nullable = true)
|-- val: decimal(38,10) (nullable = true)
|-- id_pok: integer (nullable = true)
scala> dataDF.show()
+--------------------+---------------+------+
| id_row| val|id_pok|
+--------------------+---------------+------+
|20161209_-1_10010...| 9.2900000000| 480|
|20161209_-1_10010...| 19.2700000000| 480|
|20161209_-1_10010...|1897.9000000000| 480|
|20161209_-1_10010...| 404.4200000000| 480|
|20161209_-1_10010...| 237.1200000000| 480|
|20161209_-1_10010...| 313.1100000000| 480|
|20161209_-1_10010...| 480.6700000000| 480|
|20161209_-1_10010...| 381.2000000000| 480|
|20161209_-1_10010...| 199.9400000000| 480|
|20161209_-1_10010...| 29.0400000000| 480|
|20161209_-1_10010...| 20.6200000000| 480|
|20161209_-1_10010...| 26.7500000000| 480|
scala> keysDF.printSchema()
root
|-- DDATE: integer (nullable = true)
|-- DDATE_ACTUAL: integer (nullable = true)
|-- id_row: string (nullable = true)
|-- ID_OIV: integer (nullable = true)
|-- ID_ORG: string (nullable = true)
|-- ID_CLASS_GRBS: integer (nullable = true)
|-- ID_GP: integer (nullable = true)
|-- ID_BUDGET: integer (nullable = true)
|-- ID_INDUSTRY: integer (nullable = true)
|-- ID_TERRITORY: integer (nullable = true)
|-- ID_UK: integer (nullable = true)
|-- ID_SERV_WORK: integer (nullable = true)
|-- ID_MSP: integer (nullable = true)
|-- ID_PREF: integer (nullable = true)
|-- ID_INCOME_BUDGET: integer (nullable = true)
|-- ID_GU_TYPE: integer (nullable = true)
scala> keysDF.show()
+--------+------------+--------------------+------+------+-------------+-----+---------+-----------+------------+-----+------------+------+-------+----------------+----------+
| DDATE|DDATE_ACTUAL| id_row|ID_OIV|ID_ORG|ID_CLASS_GRBS|ID_GP|ID_BUDGET|ID_INDUSTRY|ID_TERRITORY|ID_UK|ID_SERV_WORK|ID_MSP|ID_PREF|ID_INCOME_BUDGET|ID_GU_TYPE|
+--------+------------+--------------------+------+------+-------------+-----+---------+-----------+------------+-----+------------+------+-------+----------------+----------+
|20161209| 20161206|20161209_-1_-1_-1...| -1| -1| -1| -1| 15| -10| -1| -1| -1| -1| -1| -1| -1|
|20161209| 20161206|20161209_-1_-1_-1...| -1| -1| -1| -1| 15| -10| -20| -1| -1| -1| -1| -1| -1|
|20161209| 20161206|20161209_-1_-1_-1...| -1| -1| -1| -1| 15| -10| 106| -1| -1| -1| -1| -1| -1|
|20161209| 20161206|20161209_-1_-1_-1...| -1| -1| -1| -1| 15| -10| 108| -1| -1| -1| -1| -1| -1|
|20161209| 20161206|20161209_-1_-1_-1...| -1| -1| -1| -1| 15| -10| 110| -1| -1| -1| -1| -1| -1|
|20161209| 20161206|20161209_-1_-1_-1...| -1| -1| -1| -1| 15| -10| 115| -1| -1| -1| -1| -1| -1|
That's all. These two datasets will be used in an upcoming set of articles about transformations and actions.