Machine Learning with Spark (MultilayerPerceptronClassifier) and Scala

I will use test data similar to the previous article (KMeans).

We will train a model, check it on test data, and finally use it to classify points (assign them to a cluster) one by one.

First, generate a test data set that contains 3 areas:


drop table delit_test_data;

create table delit_test_data as
--cluster 1
select 1 as cluster_number,
       ROUND(2+(case when dbms_random.value<0.5 then -1 else +1 end)*dbms_random.value*1.5,2) as x,
       ROUND(2+(case when dbms_random.value<0.5 then -1 else +1 end)*dbms_random.value*1.5,2) as y
  from dual
  connect by rownum<=30
union all
--cluster 2
select 2 as cluster_number,
       ROUND(8+(case when dbms_random.value<0.5 then -1 else +1 end)*dbms_random.value*2,2) as x,
       ROUND(2+(case when dbms_random.value<0.5 then -1 else +1 end)*dbms_random.value*2,2) as y
  from dual
  connect by rownum<=40
union all
--cluster 3
select 3 as cluster_number,
       ROUND(6+(case when dbms_random.value<0.5 then -1 else +1 end)*dbms_random.value*2,2) as x,
       ROUND(6+(case when dbms_random.value<0.5 then -1 else +1 end)*dbms_random.value*2,2) as y
  from dual
  connect by rownum<=50;
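For readers without an Oracle instance handy, the same kind of three-area data can be sketched in plain Scala (genCluster is a hypothetical helper; the centers and radii are taken from the query above):

```scala
import scala.util.Random

// Points spread around a center, within +/- r on each axis, rounded to
// 2 decimals -- mirroring the dbms_random expressions in the query above.
def genCluster(rng: Random, cx: Double, cy: Double, r: Double, n: Int): Seq[(Double, Double)] = {
  def coord(c: Double): Double =
    math.rint((c + (if (rng.nextDouble() < 0.5) -1 else 1) * rng.nextDouble() * r) * 100) / 100
  Seq.fill(n)((coord(cx), coord(cy)))
}

val cluster1 = genCluster(new Random(42), 2, 2, 1.5, 30)  // like cluster 1 above
```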


And with this query, we export all groups in LIBSVM format for Spark:

select CLUSTER_NUMBER||' '||replace('1:'||x||' '||'2:'||y,':.',':0.') as point_properties
from delit_test_data
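The replace(':.',':0.') in the query compensates for Oracle printing numbers below 1 without a leading zero (e.g. .83). A plain-Scala sketch of the same formatting (toLibsvm is a hypothetical helper):

```scala
// Format one point as a LIBSVM line: "label 1:x 2:y".
// The replace restores the leading zero Oracle drops from values below 1.
def toLibsvm(cluster: Int, x: String, y: String): String =
  s"$cluster 1:$x 2:$y".replace(":.", ":0.")
```

For example, toLibsvm(1, ".83", "2.63") yields "1 1:0.83 2:2.63".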



The file with test data looks like this:


1 1:1.63 2:3.13
1 1:0.83 2:2.63
1 1:1.41 2:3.33
1 1:2.19 2:0.99
1 1:3.37 2:1.61
...
...


And the Scala code for Spark:

spark-shell --driver-memory 1G --executor-memory 1G --driver-cores 1 --executor-cores 1

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val data = spark.read.format("libsvm").option("numFeatures", "2").load("/root/sample_data.txt")
val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
val train = splits(0)
val test = splits(1)
println(" rows:"+data.count()+" train:"+train.count()+" test:"+test.count())

val layers = Array[Int](2, 5, 5, 10)
val trainer = new MultilayerPerceptronClassifier().setLayers(layers).setLabelCol("label")
                  .setFeaturesCol("features").setBlockSize(128).setSeed(1234L).setMaxIter(1000)
val model = trainer.fit(train)

val result = model.transform(test)
result.show()

val predictionAndLabels = result.select("prediction", "label")
predictionAndLabels.show()

val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy")
println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")


And the output:


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

rows:120 train:76 test:44

scala> result.show()
+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  1.0|(2,[0,1],[0.51,1....|[-43.750923192468...|[1.25414675342084...|       1.0|
|  1.0|(2,[0,1],[0.52,2....|[-43.750923192468...|[1.25414675342084...|       1.0|
|  1.0|(2,[0,1],[0.55,1....|[-43.750923192468...|[1.25414675342084...|       1.0|
|  1.0|(2,[0,1],[0.85,1....|[-43.750923192468...|[1.25414675342084...|       1.0|
|  1.0|(2,[0,1],[0.87,1....|[-43.750923192468...|[1.25414675342084...|       1.0|
|  1.0|(2,[0,1],[1.41,3....|[-43.750923192468...|[1.25414675342091...|       1.0|
|  1.0|(2,[0,1],[1.88,2....|[-43.750923192468...|[1.25414675342095...|       1.0|
|  1.0|(2,[0,1],[3.09,0....|[-43.750923192468...|[1.25414675342205...|       1.0|
|  1.0|(2,[0,1],[3.19,2....|[-43.750923192468...|[1.25414675342237...|       1.0|
|  1.0|(2,[0,1],[3.3,3.04])|[-43.750923192468...|[1.25414675342298...|       1.0|
|  2.0|(2,[0,1],[6.11,3....|[-40.614521432898...|[3.51299369680902...|       3.0|
|  2.0| (2,[0,1],[6.2,2.2])|[-30.038805110607...|[2.46871065333158...|       2.0|
|  2.0|(2,[0,1],[6.33,1....|[-30.038964730398...|[2.46651427813516...|       2.0|
|  2.0|(2,[0,1],[6.64,0....|[-30.038795336835...|[2.46883708617917...|       2.0|
|  2.0|(2,[0,1],[6.96,3.8])|[-40.659221876615...|[5.47012383188289...|       3.0|
|  2.0|(2,[0,1],[7.44,1....|[-30.043983671107...|[2.40149619750622...|       2.0|
|  2.0|(2,[0,1],[7.46,1....|[-30.043977312460...|[2.40157674103060...|       2.0|
|  2.0|(2,[0,1],[7.56,3.0])|[-30.050813395745...|[2.31661212502148...|       2.0|
|  2.0|(2,[0,1],[8.27,0....|[-30.053466870737...|[2.28446440147314...|       2.0|
|  2.0|(2,[0,1],[8.97,0....|[-30.072762423415...|[2.06376110823493...|       2.0|
+-----+--------------------+--------------------+--------------------+----------+

scala> predictionAndLabels.show()
+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       3.0|  2.0|
|       2.0|  2.0|
|       2.0|  2.0|
|       2.0|  2.0|
|       3.0|  2.0|
|       2.0|  2.0|
|       2.0|  2.0|
|       2.0|  2.0|
|       2.0|  2.0|
|       2.0|  2.0|
+----------+-----+

Test set accuracy = 0.9318181818181818
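The accuracy metric is simply the fraction of rows where prediction equals label; 0.9318... corresponds to 41 correct rows out of the 44 in the test set. A minimal sketch of the computation:

```scala
// Fraction of (prediction, label) pairs that match.
def accuracy(pairs: Seq[(Double, Double)]): Double =
  pairs.count { case (p, l) => p == l }.toDouble / pairs.size
```

41 correct and 3 wrong out of 44 gives 41.0 / 44 = 0.9318181818181818.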


A short description:

.option("numFeatures", "2") - 2 because we have just 2 features per row (object): X and Y.
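To see why numFeatures is 2, here is a plain-Scala sketch of how one LIBSVM line maps to a label plus a 2-element feature vector (parseLibsvm is an illustrative helper, not Spark's actual reader):

```scala
// "label idx:val idx:val" -> (label, dense feature array); LIBSVM indices are 1-based.
def parseLibsvm(line: String, numFeatures: Int = 2): (Double, Array[Double]) = {
  val parts = line.split("\\s+")
  val features = Array.fill(numFeatures)(0.0)
  parts.tail.foreach { t =>
    val kv = t.split(":")
    features(kv(0).toInt - 1) = kv(1).toDouble
  }
  (parts.head.toDouble, features)
}
```

So the sample line "1 1:1.63 2:3.13" becomes label 1.0 with features (1.63, 3.13).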

val layers = Array[Int](2, 5, 5, 10)
The first value equals the number of features, which is 2 in our case.
The values in between define the sizes of the hidden layers of the neural network.
And the last one is the size of the output layer. We expect 3 classes, but our labels are 1, 2 and 3 rather than 0-based, so the output layer must be larger than the highest label value; if you set 3 as the last Array value, you can get errors like:

scala> val model = trainer.fit(train)

2019-02-07 13:38:16 WARN  BlockManager:66 - Putting block rdd_174_0 failed due to exception java.lang.ArrayIndexOutOfBoundsException: 3.

2019-02-07 13:38:16 WARN  BlockManager:66 - Block rdd_174_0 could not be removed as it was not found on disk or in memory

2019-02-07 13:38:16 ERROR Executor:91 - Exception in task 0.0 in stage 127.0 (TID 318)

java.lang.ArrayIndexOutOfBoundsException: 3
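My understanding of this error: the trainer one-hot encodes each label against the output layer size, so a label of 3.0 needs index 3 and therefore an output layer of at least 4. A plain-Scala sketch of that indexing (encodeLabel is an illustrative stand-in, not Spark's internal code):

```scala
// One-hot encode a label into a vector of the output layer's size.
// Label 3 with outputSize 3 hits index 3 -> ArrayIndexOutOfBoundsException.
def encodeLabel(label: Int, outputSize: Int): Array[Double] = {
  val v = Array.fill(outputSize)(0.0)
  v(label) = 1.0
  v
}
```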



We can also check that there are exactly 3 classes on the output of the network:

scala> result.select("prediction").distinct.show()
+----------+
|prediction|
+----------+
|       1.0|
|       3.0|
|       2.0|
+----------+


Now we can use this trained model on "real" data. For example, we receive points and need to determine their group using our model. I will use 3 points from groups that I already know.


import org.apache.spark.ml.feature.VectorAssembler

val pointM = sc.parallelize(Seq((1.65, 3.13))).toDF("x", "y")
val assembler = new VectorAssembler().setInputCols(Array("x", "y")).setOutputCol("features")
val transformed = assembler.transform(pointM)
val resultPoint = model.transform(transformed)
resultPoint.show()

val pointM = sc.parallelize(Seq((8.01, 2.1))).toDF("x", "y")
val assembler = new VectorAssembler().setInputCols(Array("x", "y")).setOutputCol("features")
val transformed = assembler.transform(pointM)
val resultPoint = model.transform(transformed)
resultPoint.show()

val pointM = sc.parallelize(Seq((6.01, 6.02))).toDF("x", "y")
val assembler = new VectorAssembler().setInputCols(Array("x", "y")).setOutputCol("features")
val transformed = assembler.transform(pointM)
val resultPoint = model.transform(transformed)
resultPoint.show()


Please pay attention to these points' coordinates and the query at the top of this article. We know the groups: the first point (1.65, 3.13) belongs to group 1, (8.01, 2.1) to group 2, and the last one (6.01, 6.02) to group 3.

And the Spark output:


scala> resultPoint.show()
+----+----+-----------+--------------------+--------------------+----------+
|   x|   y|   features|       rawPrediction|         probability|prediction|
+----+----+-----------+--------------------+--------------------+----------+
|1.65|3.13|[1.65,3.13]|[-43.750923192468...|[1.25414675342095...|       1.0|
+----+----+-----------+--------------------+--------------------+----------+

scala> resultPoint.show()
+----+---+----------+--------------------+--------------------+----------+
|   x|  y|  features|       rawPrediction|         probability|prediction|
+----+---+----------+--------------------+--------------------+----------+
|8.01|2.1|[8.01,2.1]|[-30.055399735199...|[2.26132996533543...|       2.0|
+----+---+----------+--------------------+--------------------+----------+

scala> resultPoint.show()
+----+----+-----------+--------------------+--------------------+----------+
|   x|   y|   features|       rawPrediction|         probability|prediction|
+----+----+-----------+--------------------+--------------------+----------+
|6.01|6.02|[6.01,6.02]|[-40.628755273078...|[4.31571456750149...|       3.0|
+----+----+-----------+--------------------+--------------------+----------+


It's OK, everything is correct. We can notice that the probability increases from group 1 to group 3. I think this is related to the number of points each group has in the train set; in our case, more data increases the probability.


scala> train.groupBy("label").sum().orderBy("label").show()
+-----+----------+
|label|sum(label)|
+-----+----------+
|  1.0|      20.0|
|  2.0|      54.0|
|  3.0|      87.0|
+-----+----------+
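Note that groupBy("label").sum() sums the label itself, so dividing each sum by its label recovers the per-class row counts (groupBy("label").count() would show them directly):

```scala
// sum(label) values from the output above, divided by the label, give row counts.
val sums = Map(1.0 -> 20.0, 2.0 -> 54.0, 3.0 -> 87.0)
val counts = sums.map { case (label, s) => label -> (s / label).toInt }
// 20 + 27 + 29 = 76, matching the train row count printed earlier.
```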


What about points from the rest of the plane, outside the generated clusters?


scala> resultPoint.show()
+-----+---+-----------+--------------------+--------------------+----------+
|    x|  y|   features|       rawPrediction|         probability|prediction|
+-----+---+-----------+--------------------+--------------------+----------+
|0.005|0.1|[0.005,0.1]|[-43.750923192468...|[1.25414675342084...|       1.0|
+-----+---+-----------+--------------------+--------------------+----------+



scala> resultPoint.show()
+----+---+----------+--------------------+--------------------+----------+
|   x|  y|  features|       rawPrediction|         probability|prediction|
+----+---+----------+--------------------+--------------------+----------+
|12.1|1.7|[12.1,1.7]|[-32.119394736137...|[4.31272212408223...|       2.0|
+----+---+----------+--------------------+--------------------+----------+



scala> resultPoint.show()
+---+----+----------+--------------------+--------------------+----------+
|  x|   y|  features|       rawPrediction|         probability|prediction|
+---+----+----------+--------------------+--------------------+----------+
|6.1|12.7|[6.1,12.7]|[-41.352251470709...|[1.57012985171729...|       3.0|
+---+----+----------+--------------------+--------------------+----------+



