Integrating akka-http with Spark (Scala example)

In this post, I will explain one example of integrating akka-http (server side) with Spark (a Spark session). Why might this be necessary?
Imagine that:
1) You have source data (from Cassandra, Oracle, Parquet, etc.)
2) You have a machine learning model that uses this data for classification, for example a Multilayer Perceptron Classifier (MLPC). MLPC is a classifier based on a feedforward artificial neural network.
3) You have written Scala code for Spark that loads the source data, trains the MLPC model and can be used to predict an output value (label) from input values (features).

You want to run all of this on Spark as a standalone jar application and communicate with the application from outside; it could be RPC or anything else. I will use the easiest way: plain HTTP and HTML.
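To make the idea concrete, below is a minimal sketch of such a driver application: a single JVM process that holds both a SparkSession and an akka-http server, so that a trained model can be queried over plain HTTP. Names, ports and the exact setup here are illustrative only; the real pred.Main in the repository is shown in pieces later in this post.

package pred

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import org.apache.spark.sql.SparkSession
import scala.concurrent.Future

object Main extends App {
  // one SparkSession for the whole lifetime of the driver
  val spark = SparkSession.builder.appName("barclpred").getOrCreate()

  implicit val system: ActorSystem = ActorSystem("pred-http")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // load data and train the model here (the Spark/MLPC code is shown below),
  // then answer HTTP requests using the trained model
  val reqHandler: HttpRequest => Future[HttpResponse] = {
    case HttpRequest(HttpMethods.GET, Uri.Path("/accur"), _, _, _) =>
      Future.successful(HttpResponse(StatusCodes.OK, entity = "prediction result goes here"))
    case _ =>
      Future.successful(HttpResponse(StatusCodes.NotFound, entity = "unknown resource"))
  }

  // 8085 is the akka-http port used in this article
  Http().bindAndHandleAsync(reqHandler, "0.0.0.0", 8085)
}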

1) Source data description:
I have a Cassandra NoSQL database with a previously calculated column family (like a table in an RDBMS). The screenshot was made in DataStax DevCenter 1.5.0.
Here we have information about different Forex tickers.
Generally speaking, the full details of the source data are not so important, because my goal is just to show the common way of integration.
ticker_id - 16 is AUDCHF
bar_width_sec - width of the source bar in seconds (bars are calculated from ticks)
ts_begin / ts_end - Unix timestamps where the Form begins and ends
log_oe - logarithm of price growth, open to exit of the Form (not a bar)
formdeepkoef - a characteristic of the Form
res_type (of the Form) - 'mx' when the price has gone up, 'mn' otherwise
formprops - all Form properties

In other words, we have historical information and want to use it to predict the current entry position (buy or sell) and TP/SL levels.


2) Spark cluster. I use the cluster described in the previous post.


There is only one restriction: I have unfixed problems when the cluster contains 3 slaves.
There are a lot of errors like
Caused by: java.io.IOException: Failed to connect to smn/xxx.xxx.xxx.xxx:34363
when the nodes communicate with each other and to/from the master.
For this article, I use this cluster with one slave and run the application (driver) on the worker.
Look at the "deploy-mode" explanation from the Spark documentation:

Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster.

Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).

Next is the spark-shell Scala code of the application. I run spark-shell with the following command:

spark-shell \
  --driver-memory 8G \
  --executor-memory 8G \
  --executor-cores 1 \
  --jars "/opt/spark-2.3.2/jars/spark-cassandra-connector-assembly-2.3.2.jar" \
  --conf spark.driver.maxResultSize=8G \
  --conf "spark.cassandra.connection.host=192.168.122.192"

You need to download spark-cassandra-connector-assembly-2.3.2.jar and put it into the location referenced by --jars.



import org.apache.spark.sql._
import org.apache.spark.sql.functions.round
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
import org.apache.spark.sql.types.{DoubleType}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{DenseVector,SparseVector,Vectors}
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import spark.implicits._
import org.apache.spark.ml.feature.{StringIndexer,SQLTransformer}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import scala.collection.mutable.ListBuffer
import org.jpmml.sparkml.PMMLBuilder

case class Form(res_type      :String,
                frmconfpeak   :String,
                sps           :Double,
                sms           :Double,
                tcvolprofile  :Int,
                acf_05_bws    :Double,
                acf_1_bws     :Double,
                acf_2_bws     :Double)

def showLogs(ds :Dataset[Form]) = println(" SIZE = "+ds.count)


 def getFormsDb(TickerID :Int, BarWidthSec: Int) :Dataset[Form] = {
            import org.apache.spark.sql.functions._
            spark.read.format("org.apache.spark.sql.cassandra")
              .options(Map("table" -> "bars_forms", "keyspace" -> "mts_bars"))
              .load()
              .where(col("ticker_id") === TickerID)
              .where(col("bar_width_sec") === BarWidthSec)
              .select(
                col("res_type"),
                col("formprops")("frmconfpeak").as("frmconfpeak"),
                col("formprops")("sps").as("sps").cast("Double"),
                col("formprops")("sms").as("sms").cast("Double"),
                col("formprops")("tcvolprofile").as("tcvolprofile").cast("Int"),
                col("formprops")("acf_05_bws").as("acf_05_bws").cast("Double"),
                col("formprops")("acf_1_bws").as("acf_1_bws").cast("Double"),
                col("formprops")("acf_2_bws").as("acf_2_bws").cast("Double")
                ).as[Form]}

val ds :Dataset[Form] = Seq(1,3,5).map(elm => getFormsDb(elm,30)).reduce(_ union _)
ds.cache()
showLogs(ds)

val stages = new ListBuffer[PipelineStage]()
stages += new StringIndexer().setInputCol("frmconfpeak").setOutputCol("confpeakIndex")
stages += new StringIndexer().setInputCol("tcvolprofile").setOutputCol("tcvolprofileIndex")
stages += new StringIndexer().setInputCol("res_type").setOutputCol("label")
stages += new VectorAssembler().setInputCols(Array("tcvolprofileIndex","sps","acf_1_bws","acf_2_bws","confpeakIndex")).setOutputCol("features")
stages += new SQLTransformer().setStatement("SELECT label, features FROM __THIS__")

// layers Array(5, 9, 5, 2): 5 input features, two hidden layers of 9 and 5 nodes, 2 output classes (mx/mn)
val MLPCclassif = new MultilayerPerceptronClassifier().setLayers(Array[Int](5, 9, 5, 2)).setLabelCol("label").setFeaturesCol("features").setBlockSize(128).setSeed(1234L).setMaxIter(10)
stages += MLPCclassif

val splits = ds.randomSplit(Array(0.7, 0.3), seed = 1234L)
val train = splits(0)
val test = splits(1)

val estimator = new Pipeline().setStages(stages.toArray)
val model = estimator.fit(train)
val mlpc_predictions = model.transform(test)
val predictionAndLabels = mlpc_predictions.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy")
val accur = evaluator.evaluate(predictionAndLabels)
// finally, retrain the model on the full dataset (re-declaring a val like this works in spark-shell)
val model = estimator.fit(ds)





I will not explain this code in detail, but I can say that with tickers 1, 3, 5 (EURUSD, GBPUSD, EURCHF) the accuracy on test data (model.transform(test)) is 0.8. That's quite good for Forex. :)
At the end, we train the model on the full dataset.

In our very simple example, we will take one Form randomly from the full dataset and make a prediction:
ds.sample(false, 0.05).limit(1)

You can find the full source (just a couple of files) in the GitHub repository.

The jar file is built with


sbt "set test in assembly := {}" clean compile assembly


It will generate the jar for you. In my case it is C:\barlc_predict\target\barclpred.jar (46 MB).
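For reference, here is a rough sketch of the sbt-assembly setup that such a build implies. This is not the repository's actual build file; the plugin and library versions below are assumptions you should adjust:

// project/plugins.sbt (sketch)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

// build.sbt (sketch; versions are assumptions, check the repository)
name := "barclpred"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"   % "2.3.2" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.3.2" % "provided",
  "com.typesafe.akka" %% "akka-http"   % "10.1.8",
  "com.typesafe.akka" %% "akka-stream" % "2.5.23"
)

// keep the jar name stable so the spark-submit command does not change between builds
assemblyJarName in assembly := "barclpred.jar"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}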
Next, you need to copy this jar file to the Spark master and the Spark slave (a single slave in this case).
You can use WinSCP or scp.
Then run it from the master node with the command:


spark-submit --class pred.Main --master spark://192.168.122.219:6066 --driver-memory 6G --total-executor-cores 1 --num-executors 1 --executor-memory 2G --jars "/opt/spark-2.3.2/jars/spark-cassandra-connector-assembly-2.3.2.jar" --conf "spark.cassandra.connection.host=192.168.122.192"  --deploy-mode=cluster /root/barclpred.jar


You will see something like this:


[root@smn ~]# spark-submit --class pred.Main --master spark://192.168.122.219:6066 --driver-memory 6G --total-executor-cores 1 --num-executors 1 --executor-memory 2G --jars "/opt/spark-2.3.2/jars/spark-cassandra-connector-assembly-2.3.2.jar" --conf "spark.cassandra.connection.host=192.168.122.192"  --deploy-mode=cluster /root/barclpred.jar
2019-05-23 05:30:20 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Running Spark using the REST application submission protocol.
2019-05-23 05:30:21 INFO  RestSubmissionClient:54 - Submitting a request to launch an application in spark://192.168.122.219:6066.
2019-05-23 05:30:21 INFO  RestSubmissionClient:54 - Submission successfully created as driver-20190523053021-0005. Polling submission state...
2019-05-23 05:30:21 INFO  RestSubmissionClient:54 - Submitting a request for the status of submission driver-20190523053021-0005 in spark://192.168.122.219:6066.
2019-05-23 05:30:21 INFO  RestSubmissionClient:54 - State of driver driver-20190523053021-0005 is now RUNNING.
2019-05-23 05:30:21 INFO  RestSubmissionClient:54 - Driver is running on worker worker-20190522081635-192.168.122.192-46691 at 192.168.122.192:46691.
2019-05-23 05:30:21 INFO  RestSubmissionClient:54 - Server responded with CreateSubmissionResponse:
{
  "action" : "CreateSubmissionResponse",
  "message" : "Driver successfully submitted as driver-20190523053021-0005",
  "serverSparkVersion" : "2.3.2",
  "submissionId" : "driver-20190523053021-0005",
  "success" : true
}
2019-05-23 05:30:21 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-05-23 05:30:21 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-644424c3-04fe-45f1-be49-5beee088a16c
[root@smn ~]#



The Spark web UI:

What's next?
Go to your single worker and check port 8085 (look inside the source code; 8085 is the port used for akka-http).


[root@sc1 0]# netstat -nltp | grep 8085
tcp6       0      0 192.168.122.192:8085    :::*                    LISTEN      19762/java


Next, go to the worker logs and open them with the watch command. In the web UI you can find
driver-20190523053021-0005


[root@sc1 work]# pwd
/opt/spark-2.3.2/work
[root@sc1 work]# ls -all
total 0
drwxr-xr-x  6 root root 136 May 23 05:30 .
drwxrwxr-x 15 1000 1000 235 Oct  4  2018 ..
drwxr-xr-x  3 root root  15 May 22 13:16 app-20190522131614-0006
drwxr-xr-x  3 root root  15 May 23 05:30 app-20190523053029-0007
drwxr-xr-x  3 root root  78 May 22 13:16 driver-20190522131611-0004
drwxr-xr-x  3 root root  78 May 23 05:30 driver-20190523053021-0005



P.S. Don't forget to set up port forwarding.


iptables -t nat -A PREROUTING -d 10.241.5.234  -p tcp --dport 4040  -j DNAT --to 192.168.122.192:4040
iptables -I FORWARD -d 192.168.122.192/32 -p tcp -m state --state NEW -m tcp --dport 7077 -j ACCEPT


OK, now open the stdout of the worker with


[root@sc1 driver-20190523053021-0005]# pwd
/opt/spark-2.3.2/work/driver-20190523053021-0005
[root@sc1 driver-20190523053021-0005]# watch tail -n 20 stdout


and you will see the last 20 lines of the log in real time.
Now open a browser and go to http://10.241.5.234:8085/accur

The average request duration is 2 seconds.
Here a random Form is selected and the classification results are shown, where
labelValue - converted from res_type by
stages += new StringIndexer().setInputCol("res_type").setOutputCol("label")
predValue - the predicted value.
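If you also want the original 'mx'/'mn' string back instead of the numeric prediction, one way is IndexToString. A small sketch, assuming (as in the stage list above) that the res_type StringIndexer is the third stage of the fitted pipeline:

import org.apache.spark.ml.feature.{IndexToString, StringIndexerModel}

// assumption: stages(2) is the StringIndexer fitted on res_type (see the pipeline above)
val resTypeIndexer = model.stages(2).asInstanceOf[StringIndexerModel]

val backToResType = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("pred_res_type")
  .setLabels(resTypeIndexer.labels)

backToResType.transform(mlpc_predictions).select("prediction", "pred_res_type").show(5)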

A little bit of source code.
When you open the link in a browser, this code is executed:


  val reqHandler: HttpRequest => Future[HttpResponse] = {
...
   case req@HttpRequest(HttpMethods.GET, Uri.Path("/accur"), _, ent, _)
    => logger.info("request (1) "+req.uri+" - "+req.method)
      try {
        val resAccur = mlpcModel1.getPredictionByModel
        logger.info("resAccur="+resAccur)
      } catch {
        case ex: Throwable => logger.info(ex.getLocalizedMessage)
      }
      Future.successful {
        HttpResponse(StatusCodes.OK, entity = "Test set accuracy = ["+mlpcModel1.getPredictionByModel+"]")
      }


Here the function mlpcModel1.getPredictionByModel is called - heh, this is a bug: mlpcModel1.getPredictionByModel is called twice. It doesn't really matter here, but of course we only need one call.
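For reference, a minimal sketch of how the handler could make a single call (my own rearrangement, not the repository code):

   case req@HttpRequest(HttpMethods.GET, Uri.Path("/accur"), _, _, _) =>
     logger.info("request (1) " + req.uri + " - " + req.method)
     // call the model once and reuse the result in the response
     val resAccur =
       try mlpcModel1.getPredictionByModel
       catch { case ex: Throwable => logger.info(ex.getLocalizedMessage); ex.getLocalizedMessage }
     Future.successful(
       HttpResponse(StatusCodes.OK, entity = "Test set accuracy = [" + resAccur + "]")
     )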


  def getPredictionByModel:String = {
    logger.info("Call funciton getPredictionByModel")
    val inputData = ds.sample(false, 0.05).limit(1) //INPUT DS ONE ROW
    logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    logger.info("INSIDE getPredictionByModel")
    val tdForm  = inputData.first()
    logger.info("Input data is count="+inputData.count()+" tdForm="+tdForm)
    logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    val mlpc_predictions_res = predModel.transform(inputData)
    val predictionAndLabels = mlpc_predictions_res.select("prediction", "label")
    val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy")
    val accur = evaluator.evaluate(predictionAndLabels)
    val predValue = predictionAndLabels.first().getDouble(0)
    val labelVal = predictionAndLabels.first().getDouble(1)
    "tdForm = "+tdForm+" predValue = "+predValue+" labelVal = "+labelVal+" accur = "+accur
  }


Also pay attention: when you reload the link in the browser, you can watch the execution in real time in the console with "watch tail -n 20 stdout".
