Spark operations with spark.sql (Dataset, DataFrame)


I have set up a Spark cluster and a Cassandra cluster, as described in this topic.
Here I am going to work through some examples on the Spark cluster under spark-shell, using data from Cassandra.
First of all, we need to copy the Spark Cassandra connector jar into $SPARK_HOME/jars (/opt/spark-2.3.2/jars/).


1) Read the whole table from Cassandra into Dataset[org.apache.spark.sql.Row]


[root@smn conf]# spark-shell --jars /opt/spark-2.3.2/jars/spark-cassandra-connector-assembly-2.3.2.jar --driver-memory 3g --executor-memory 3g --conf spark.cassandra.connection.host=192.168.122.192
2018-10-12 06:51:51 WARN  Utils:66 - Your hostname, smn resolves to a loopback address: 127.0.0.1; using 192.168.122.219 instead (on interface eth0)
2018-10-12 06:51:51 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-10-12 06:51:52 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://smn:4040
Spark context available as 'sc' (master = local[*], app id = local-1539327122125).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.cassandra._

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._




Load data from Cassandra table:

val df_tdata = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table"->"t_data","keyspace"->"msk_arm_lead")).load().cache()
df_tdata: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ddate: int, id_pok: int ... 2 more fields]

df_tdata.count() // count() is an action, so execution actually begins at this point
This time there are a lot of warning messages (later I will try to fix that):

2018-10-12 06:56:08 WARN  MemoryStore:66 - Not enough space to cache rdd_4_275 in memory! (computed 384.0 B so far)
2018-10-12 06:56:08 WARN  BlockManager:66 - Persisting block rdd_4_275 to disk instead.
2018-10-12 06:56:08 WARN  MemoryStore:66 - Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_4_275 in memory.

And finally the result is:
res0: Long = 48935912

Sample output (e.g. via df_tdata.show()):

+--------+------+--------------------+--------------+
|   ddate|id_pok|              id_row|          sval|
+--------+------+--------------------+--------------+
|20180601|    20|20180601_-1_-1_-1...|65185.26051635|
|20180601|    20|20180601_-1_-1_-1...|   38.36128149|
|20180601|    20|20180601_-1_-1_-1...| 1549.16905262|
|20180601|    20|20180601_-1_-1_-1...|  487.27482278|
|20180601|    20|20180601_-1_-1_-1...|   35.22986917|
|20180601|    20|20180601_-1_-1_-1...|    3.93099237|
|20180601|    20|20180601_-1_-1_-1...|             0|
|20180601|    20|20180601_-1_-1_-1...|    31.2988768|
|20180601|    20|20180601_-1_-1_-1...|             0|
+--------+------+--------------------+--------------+




For the next examples, I am going to extract just part of the data and then unpersist the df_tdata Dataset.


scala> val dat = df_tdata.filter(df_tdata("ddate") === "20180601").cache()
dat: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ddate: int, id_pok: int ... 2 more fields]

scala> dat.count()
res2: Long = 17567413

scala> df_tdata.unpersist



A step aside - Datasets and DataFrames, quoted from the Spark 2.3.2 documentation.

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. The Dataset API is available in Scala and Java. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

Like an RDD, a DataFrame is an immutable distributed collection of data. Unlike an RDD, data is organized into named columns, like a table in a relational database. Designed to make large data sets processing even easier, DataFrame allows developers to impose a structure onto a distributed collection of data, allowing higher-level abstraction; it provides a domain specific language API to manipulate your distributed data; and makes Spark accessible to a wider audience, beyond specialized data engineers.

As you can see above, we have org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]. In Scala, a DataFrame is represented by a Dataset of Rows, and DataFrame is simply a type alias of Dataset[Row]. This means that for us, working in Scala, there is no practical difference between a Dataset[Row] and a DataFrame.


Starting in Spark 2.0, the Dataset takes on two distinct API characteristics: a strongly-typed API and an untyped API. Conceptually, consider a DataFrame as an alias for a collection of generic objects, Dataset[Row], where a Row is a generic untyped JVM object. A Dataset, by contrast, is a collection of strongly-typed JVM objects, dictated by a case class you define in Scala or a class in Java.



“Untyped transformations” are transformations applied to a DataFrame (a Dataset of Row).
“Typed transformations” are transformations applied to a Dataset (Dataset[T]) - a strongly-typed Scala Dataset.
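
To make the distinction concrete, here is a minimal sketch on the dat Dataset from above (TData is a hypothetical case class that simply mirrors the t_data schema shown later in printSchema; DataFrame and the implicits are already in scope in this spark-shell session):

case class TData(ddate: Int, id_pok: Int, id_row: String, sval: String)

// Untyped (DataFrame = Dataset[Row]): columns are addressed by name,
// so mistakes only show up at runtime.
val untyped: DataFrame = dat
untyped.filter(untyped("id_pok") === 20)

// Typed (Dataset[TData]): fields are members of the case class,
// so the filter function is checked at compile time.
val typed: Dataset[TData] = dat.as[TData]
typed.filter(r => r.id_pok == 20)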

Untyped Dataset Operations (aka DataFrame Operations)

Next, I am going to show these transformations (a quick sketch of a few of them follows right after the list):

1) map(func)
2) filter(func)
3) flatMap(func)
4) mapPartitions(func)
5) mapPartitionsWithIndex(func)
6) sample(withReplacement, fraction, seed)
7) union(otherDataset)
8) intersection(otherDataset)
9) distinct([numPartitions]))
10) groupByKey([numPartitions])
11) reduceByKey(func, [numPartitions])
12) aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
13) sortByKey([ascending], [numPartitions])
14) join(otherDataset, [numPartitions])
15) cogroup(otherDataset, [numPartitions])
16) cartesian(otherDataset)
17) pipe(command, [envVars])
18) coalesce(numPartitions)
19) repartition(numPartitions)
20) repartitionAndSortWithinPartitions(partitioner)
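
Here is a quick sketch of a few of them applied to the cached dat Dataset from above (the fraction, seed and partition counts are arbitrary example values; note that the pair operations such as reduceByKey, aggregateByKey, sortByKey and cogroup belong to the RDD API, so on a Dataset they are reached through .rdd):

// All of these are lazy transformations; nothing runs until an action is called.
val sampled    = dat.sample(withReplacement = false, fraction = 0.1, seed = 42L)
val deduped    = dat.distinct()
val combined   = sampled.union(deduped)      // union of two Dataset[Row] with the same schema
val narrowed   = combined.coalesce(8)        // fewer partitions, no full shuffle
val reshuffled = combined.repartition(64)    // full shuffle into 64 partitions

narrowed.count()                             // count() is an action and triggers execution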

Some examples: changing the field sval datatype from string to double.


scala> dat.printSchema
root
 |-- ddate: integer (nullable = true)
 |-- id_pok: integer (nullable = true)
 |-- id_row: string (nullable = true)
 |-- sval: string (nullable = true)

sval is in fact a Double value, but it is stored in Cassandra as a String.





ex 1: ==========================================================
scala> import org.apache.spark.sql.types.DoubleType
scala> val dat_norm_1 = dat.withColumn("svalTmp", dat("sval").cast(DoubleType)).drop("sval").withColumnRenamed("svalTmp","sval")
dat_norm_1: org.apache.spark.sql.DataFrame = [ddate: int, id_pok: int ... 2 more fields]
scala> dat_norm_1.count() //Long = 17567413
scala> dat_norm_1.printSchema
root
 |-- ddate: integer (nullable = true)
 |-- id_pok: integer (nullable = true)
 |-- id_row: string (nullable = true)
 |-- sval: double (nullable = true)


ex 2: ==========================================================                
scala> val dat_norm_2 = dat.withColumn("sval",dat("sval").cast(DoubleType))
scala> dat_norm_2.count() //Long = 17567413
scala> dat_norm_2.printSchema
root
 |-- ddate: integer (nullable = true)
 |-- id_pok: integer (nullable = true)
 |-- id_row: string (nullable = true)
 |-- sval: double (nullable = true)


ex 3: ==========================================================
val dat_norm_3 = dat.select( 
   dat.columns.map {
     case ddate  @ "ddate"  => dat(ddate)
     case id_pok @ "id_pok" => dat(id_pok)
     case id_row @ "id_row" => dat(id_row)
     case sval   @ "sval"   => dat(sval).cast(DoubleType).as("sval")
   }: _*
)
scala> dat_norm_3.count() //Long = 17567413
scala> dat_norm_3.printSchema
root
 |-- ddate: integer (nullable = true)
 |-- id_pok: integer (nullable = true)
 |-- id_row: string (nullable = true)
 |-- sval: double (nullable = true)


ex 4: ==========================================================
val dat_norm_4 = dat.selectExpr("ddate","id_pok","id_row","cast(sval as Double) as sval") 
scala> dat_norm_4.count() //Long = 17567413
scala> dat_norm_4.printSchema
root
 |-- ddate: integer (nullable = true)
 |-- id_pok: integer (nullable = true)
 |-- id_row: string (nullable = true)
 |-- sval: double (nullable = true)


ex 5: ==========================================================
val dat_norm_5 = dat.map(r => (r.getAs[Integer]("ddate"), r.getAs[Integer]("id_pok"), r.getAs[String]("id_row"), r.getAs[Double]("sval")))
scala> dat_norm_5.count() //Long = 17567413
scala> dat_norm_5.printSchema
root
 |-- _1: integer (nullable = true)
 |-- _2: integer (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: double (nullable = false)

Here dat_norm_5 is a Dataset of tuples, not of Rows:
org.apache.spark.sql.Dataset[(Integer, Integer, String, Double)]

We can convert the Dataset of tuples into a DataFrame with field names:

scala> dat_norm_5.toDF("ddate","id_pok","id_row","sval")
res20: org.apache.spark.sql.DataFrame = [ddate: int, id_pok: int ... 2 more fields]

scala> dat_norm_5.toDF("ddate","id_pok","id_row","sval").printSchema
root
 |-- ddate: integer (nullable = true)
 |-- id_pok: integer (nullable = true)
 |-- id_row: string (nullable = true)
 |-- sval: double (nullable = false)

ex 6: ==========================================================
import org.apache.spark.sql.types.DecimalType

case class rowData(ddate: Int, id_pok: Int, id_row: String, sval: BigDecimal)

val df_tdata_ds = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table"->"t_data","keyspace"->"msk_arm_lead")).load()
                  .filter($"ddate" === "20180601")
                  .withColumn("sval",$"sval".cast(DecimalType(38,18)))
                  .as[rowData]

df_tdata_ds: org.apache.spark.sql.Dataset[rowData] = [ddate: int, id_pok: int ... 2 more fields]

Dataset[rowData] is a Dataset[T], not a Dataset[Row].

scala> df_tdata_ds.count() //Long = 17567413
scala> df_tdata_ds.printSchema
root
 |-- ddate: integer (nullable = true)
 |-- id_pok: integer (nullable = true)
 |-- id_row: string (nullable = true)
 |-- sval: decimal(38,18) (nullable = true)
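
Since df_tdata_ds is a typed Dataset, its rows are instances of the rowData case class, so fields can be used directly in typed transformations (the 65000 threshold below is just an arbitrary example value):

// Typed filter/map: field access is checked at compile time.
val big = df_tdata_ds.filter(r => r.sval != null && r.sval > BigDecimal(65000))
big.map(r => r.id_row).show(5, false)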


I think that the last example is the most appropriate one, for several reasons. One of them is described here:

Since Spark, as a compiler, understands your Dataset type JVM object, it maps your type-specific JVM object to Tungsten’s internal memory representation using Encoders. As a result, Tungsten Encoders can efficiently serialize/deserialize JVM objects as well as generate compact bytecode that can execute at superior speeds.
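
To peek at that machinery, the encoder Spark derives for the rowData case class can be obtained explicitly (a small illustration; Encoders.product builds a product-type encoder from the case class definition):

import org.apache.spark.sql.Encoders

val enc = Encoders.product[rowData]
enc.schema.printTreeString()   // prints the struct that backs Dataset[rowData]

It is this encoder that converts between rowData objects and Spark's internal binary row format, instead of going through generic Java serialization.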



spark.catalog.clearCache()
clearCache removes all cached Spark SQL tables from the in-memory cache.



1) map(func)
Returns a new distributed dataset formed by passing each element of the source through a function func.
Input: Dataset[T]
Output: Dataset[U]

val dat = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table"->"t_data","keyspace"->"msk_arm_lead"))
.load().filter($"ddate" === "20180601").withColumn("sval",$"sval".cast(DecimalType(38,18))).as[rowData]

dat: org.apache.spark.sql.Dataset[rowData] = [ddate: int, id_pok: int ... 2 more fields]

case class rowNewData(ddate: Int, id_pok: Int, id_row: String, sval: BigDecimal, cntrlParam: Int)

val new_dat = dat.map(row => rowNewData(row.ddate, row.id_pok, row.id_row, row.sval, if (row.sval > 65000.0) 1 else 0))

new_dat: org.apache.spark.sql.Dataset[rowNewData] = [ddate: int, id_pok: int ... 3 more fields]









