Spark operations with spark.sql (Dataset, DataFrame)
I have already set up the Spark cluster and Cassandra cluster described in this topic.
Here I am going to work through some examples with the Spark cluster under spark-shell, using data from Cassandra.
First of all, we need to copy the Spark Cassandra connector jar into $SPARK_HOME/jars (/opt/spark-2.3.2/jars/).
1) Read the whole table from Cassandra into Dataset[org.apache.spark.sql.Row]
[root@smn conf]# spark-shell --jars /opt/spark-2.3.2/jars/spark-cassandra-connector-assembly-2.3.2.jar --driver-memory 3g --executor-memory 3g --conf spark.cassandra.connection.host=192.168.122.192
2018-10-12 06:51:51 WARN Utils:66 - Your hostname, smn resolves to a loopback address: 127.0.0.1; using 192.168.122.219 instead (on interface eth0)
2018-10-12 06:51:51 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-10-12 06:51:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://smn:4040
Spark context available as 'sc' (master = local[*], app id = local-1539327122125).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.2
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.cassandra._
scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._
Load data from Cassandra table:
val df_tdata = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table"->"t_data","keyspace"->"msk_arm_lead")).load().cache()
df_tdata: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ddate: int, id_pok: int ... 2 more fields]
df_tdata.count() - count() is an action, so this is the point where execution actually begins.
This time there are a lot of warning messages (later I will try to fix this):
2018-10-12 06:56:08 WARN MemoryStore:66 - Not enough space to cache rdd_4_275 in memory! (computed 384.0 B so far)
2018-10-12 06:56:08 WARN BlockManager:66 - Persisting block rdd_4_275 to disk instead.
2018-10-12 06:56:08 WARN MemoryStore:66 - Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_4_275 in memory.
And finally the result is:
res0: Long = 48935912
Sample output
+--------+------+--------------------+--------------+
| ddate|id_pok| id_row| sval|
+--------+------+--------------------+--------------+
|20180601| 20|20180601_-1_-1_-1...|65185.26051635|
|20180601| 20|20180601_-1_-1_-1...| 38.36128149|
|20180601| 20|20180601_-1_-1_-1...| 1549.16905262|
|20180601| 20|20180601_-1_-1_-1...| 487.27482278|
|20180601| 20|20180601_-1_-1_-1...| 35.22986917|
|20180601| 20|20180601_-1_-1_-1...| 3.93099237|
|20180601| 20|20180601_-1_-1_-1...| 0|
|20180601| 20|20180601_-1_-1_-1...| 31.2988768|
|20180601| 20|20180601_-1_-1_-1...| 0|
+--------+------+--------------------+--------------+
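For reference, output like the table above is printed with show(); a minimal sketch (the exact call used here was not recorded):
scala> df_tdata.show(9)          // print the first 9 rows, long values truncated by default
scala> df_tdata.show(9, false)   // the same, but without truncating the id_row values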
For the next examples, I am going to extract just a part of the data and unpersist the df_tdata Dataset.
scala> val dat = df_tdata.filter(df_tdata("ddate") === "20180601").cache()
dat: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ddate: int, id_pok: int ... 2 more fields]
scala> dat.count()
res2: Long = 17567413
scala> df_tdata.unpersist
A step aside: Datasets and DataFrames, from the Spark 2.3.2 documentation.
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. The Dataset API is available in Scala and Java. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
Like an RDD, a DataFrame is an immutable distributed collection of data. Unlike an RDD, data is organized into named columns, like a table in a relational database. Designed to make large data sets processing even easier, DataFrame allows developers to impose a structure onto a distributed collection of data, allowing higher-level abstraction; it provides a domain specific language API to manipulate your distributed data; and makes Spark accessible to a wider audience, beyond specialized data engineers.
As you can see above, we have org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]. In Scala, a DataFrame is represented by a Dataset of Rows, and DataFrame is simply a type alias of Dataset[Row]. This means that, working in Scala, there is no practical difference for us between a Dataset[Row] and a DataFrame.
Starting in Spark 2.0, Dataset takes on two distinct API characteristics: a strongly-typed API and an untyped API. Conceptually, consider DataFrame as an alias for a collection of generic objects Dataset[Row], where a Row is a generic untyped JVM object. Dataset, by contrast, is a collection of strongly-typed JVM objects, dictated by a case class you define in Scala or a class in Java.
“Untyped transformations” are transformations applied to a DataFrame (a Dataset of Row).
“Typed transformations” are transformations applied to a Dataset (Dataset[T]), i.e. to strongly typed Scala Datasets.
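A minimal sketch of the difference, using the dat Dataset[Row] filtered above (TData is a hypothetical case class introduced only for this contrast; it has to match the table columns):
scala> // untyped: columns are addressed by name, types are checked only at runtime by the analyzer
scala> val untyped = dat.filter(dat("id_pok") === 20)
scala> // typed: the lambda works on JVM objects, so field names and types are checked at compile time
scala> case class TData(ddate: Int, id_pok: Int, id_row: String, sval: String)
scala> val typed = dat.as[TData].filter(_.id_pok == 20)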
Untyped Dataset Operations (aka DataFrame Operations)
Next, I am going to go through these transformations (a short sketch of a few of them follows the list):
1) map(func)
2) filter(func)
3) flatMap(func)
4) mapPartitions(func)
5) mapPartitionsWithIndex(func)
6) sample(withReplacement, fraction, seed)
7) union(otherDataset)
8) intersection(otherDataset)
9) distinct([numPartitions])
10) groupByKey([numPartitions])
11) reduceByKey(func, [numPartitions])
12) aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
13) sortByKey([ascending], [numPartitions])
14) join(otherDataset, [numPartitions])
15) cogroup(otherDataset, [numPartitions])
16) cartesian(otherDataset)
17) pipe(command, [envVars])
18) coalesce(numPartitions)
19) repartition(numPartitions)
20) repartitionAndSortWithinPartitions(partitioner)
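Not all of them are demonstrated in this post; as a minimal sketch, here are a few of them applied to the dat Dataset defined earlier:
scala> val sampled = dat.sample(false, 0.01, 42L)     // 6) sample: roughly 1% of the rows, without replacement
scala> val pok_ids = dat.select("id_pok").distinct()  // 9) distinct on a single column
scala> val fewer   = dat.coalesce(10)                 // 18) coalesce: fewer partitions, no full shuffle
scala> val more    = dat.repartition(200)             // 19) repartition: full shuffle into 200 partitions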
Some examples: change the field sval datatype from string to double.
scala> dat.printSchema
root
|-- ddate: integer (nullable = true)
|-- id_pok: integer (nullable = true)
|-- id_row: string (nullable = true)
|-- sval: string (nullable = true)
sval is conceptually a double value, but it is stored in Cassandra as a string.
ex 1: ==========================================================
scala> import org.apache.spark.sql.types.DoubleType
scala> val dat_norm_1 = dat.withColumn("svalTmp", dat("sval").cast(DoubleType)).drop("sval").withColumnRenamed("svalTmp","sval")
dat_norm_1: org.apache.spark.sql.DataFrame = [ddate: int, id_pok: int ... 2 more fields]
scala> dat_norm_1.count() //Long = 17567413
scala> dat_norm_1.printSchema
root
|-- ddate: integer (nullable = true)
|-- id_pok: integer (nullable = true)
|-- id_row: string (nullable = true)
|-- sval: double (nullable = true)
ex 2: ==========================================================
scala> val dat_norm_2 = dat.withColumn("sval",dat("sval").cast(DoubleType))
scala> dat_norm_2.count() //Long = 17567413
scala> dat_norm_2.printSchema
root
|-- ddate: integer (nullable = true)
|-- id_pok: integer (nullable = true)
|-- id_row: string (nullable = true)
|-- sval: double (nullable = true)
ex 3: ==========================================================
val dat_norm_3 = dat.select(
dat.columns.map {
case ddate @ "ddate" => dat(ddate)
case id_pok @ "id_pok" => dat(id_pok)
case id_row @ "id_row" => dat(id_row)
case sval @ "sval" => dat(sval).cast(DoubleType).as("sval")
}: _*
)
scala> dat_norm_3.count() //Long = 17567413
scala> dat_norm_3.printSchema
root
|-- ddate: integer (nullable = true)
|-- id_pok: integer (nullable = true)
|-- id_row: string (nullable = true)
|-- sval: double (nullable = true)
ex 4: ==========================================================
val dat_norm_4 = dat.selectExpr("ddate","id_pok","id_row","cast(sval as Double) as sval")
scala> dat_norm_4.count() //Long = 17567413
scala> dat_norm_4.printSchema
root
|-- ddate: integer (nullable = true)
|-- id_pok: integer (nullable = true)
|-- id_row: string (nullable = true)
|-- sval: double (nullable = true)
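The same cast can also be expressed in plain SQL after registering the DataFrame as a temporary view; a minimal sketch (t_data_view is just a view name I picked here):
scala> dat.createOrReplaceTempView("t_data_view")
scala> val dat_norm_sql = spark.sql("SELECT ddate, id_pok, id_row, CAST(sval AS double) AS sval FROM t_data_view")
scala> dat_norm_sql.printSchema   // same schema as dat_norm_4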
ex 5: ==========================================================
val dat_norm_5 = dat.map(r => (r.getAs[Integer]("ddate"), r.getAs[Integer]("id_pok"), r.getAs[String]("id_row"), r.getAs[String]("sval").toDouble)) // sval is stored as a string, so read it as String and convert it to Double
scala> dat_norm_5.count() //Long = 17567413
scala> dat_norm_5.printSchema
root
|-- _1: integer (nullable = true)
|-- _2: integer (nullable = true)
|-- _3: string (nullable = true)
|-- _4: double (nullable = false)
Here dat_norm_5 is a Dataset of tuples, not of Rows:
org.apache.spark.sql.Dataset[(Integer, Integer, String, Double)]
We can convert a Dataset of tuples into a DataFrame with field names:
scala> dat_norm_5.toDF("ddate","id_pok","id_row","sval")
res20: org.apache.spark.sql.DataFrame = [ddate: int, id_pok: int ... 2 more fields]
scala> dat_norm_5.toDF("ddate","id_pok","id_row","sval").printSchema
root
|-- ddate: integer (nullable = true)
|-- id_pok: integer (nullable = true)
|-- id_row: string (nullable = true)
|-- sval: double (nullable = false)
ex 6: ==========================================================
import org.apache.spark.sql.types.DecimalType
case class rowData(ddate: Int,id_pok: Int,id_row: String, sval: BigDecimal)
val df_tdata_ds = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table"->"t_data","keyspace"->"msk_arm_lead")).load()
.filter($"ddate" === "20180601")
.withColumn("sval",$"sval".cast(DecimalType(38,18)))
.as[rowData]
df_tdata_ds: org.apache.spark.sql.Dataset[rowData] = [ddate: int, id_pok: int ... 2 more fields]
Dataset[rowData] is a Dataset[T], not a Dataset[Row].
scala> df_tdata_ds.count() //Long = 17567413
scala> df_tdata_ds.printSchema
root
|-- ddate: integer (nullable = true)
|-- id_pok: integer (nullable = true)
|-- id_row: string (nullable = true)
|-- sval: decimal(38,18) (nullable = true)
I think the last example is the most appropriate approach, for several reasons. One of them is described here:
Spark, as a compiler, understands your Dataset type JVM object: it maps your type-specific JVM object to Tungsten’s internal memory representation using Encoders. As a result, Tungsten Encoders can efficiently serialize/deserialize JVM objects as well as generate compact bytecode that can execute at superior speeds.
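The encoder Spark derives for the case class can also be obtained explicitly; a minimal sketch (this is effectively the same product encoder that .as[rowData] picks up implicitly from spark.implicits._):
scala> import org.apache.spark.sql.{Encoder, Encoders}
scala> val enc: Encoder[rowData] = Encoders.product[rowData]
scala> enc.schema   // the StructType used for Tungsten’s internal row layout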
spark.catalog.clearCache()
clearCache removes all cached Spark SQL tables from the in-memory cache.
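A minimal sketch of the difference between releasing one cached Dataset and clearing the whole cache:
scala> dat.unpersist()            // release only this Dataset from the cache
scala> spark.catalog.clearCache() // release everything cached by Spark SQL at once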
1) map(func)
Return a new distributed dataset formed by passing each element
of the source through a function func.
Input: Dataset[T]; output: Dataset[U].
val dat = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table"->"t_data","keyspace"->"msk_arm_lead"))
.load().filter($"ddate" === "20180601").withColumn("sval",$"sval".cast(DecimalType(38,18))).as[rowData]
dat: org.apache.spark.sql.Dataset[rowData] = [ddate: int, id_pok: int ... 2 more fields]
case class rowNewData(ddate: Int,id_pok: Int,id_row: String, sval: BigDecimal, cntrlParam: Int)
val new_dat = dat.map(row => rowNewData(row.ddate, row.id_pok, row.id_row, row.sval, if (row.sval > 65000.0) 1 else 0)) // flag rows with sval above 65000
org.apache.spark.sql.Dataset[rowNewData] = [ddate: int, id_pok: int ... 3 more fields]
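As a quick sanity check (a hypothetical follow-up, not part of the original run), the flagged rows can be counted with a typed filter:
scala> new_dat.filter(_.cntrlParam == 1).count()   // number of rows where sval exceeded 65000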