Comparison sequential execution and parallel with Futures in Scala

In this short article, I will show a comparison of sequential and parallel executions with scala Futures.

First of all, I have a sequence of tickers (Ticker) with a structure like this:


case class Ticker(tickerId :Int, tickerCode :String, tickerFirst :String, tickerSeconds :String) 


I read it from Cassandra DB and it has size 28.


  val sess = Global.sessInstance
  val tickersFrx: Seq[Ticker] = sess.tickersDict
  println("tickersFrx.size = "+tickersFrx.size)


Next, I need for each Ticker in this sequence execute 2 additional DB query. First to determine the maximum data date. And with knowledge of this date, I need to get maximum UNIX timestamp of last data tick.
Producing Seq[TickerWithDdateTs] p.s. I use limit 1 because we have clustering keys ordered descending.



select ddate from mts_src.ticks_count_days where ticker_id = :tickerId limit 1

select db_tsunx from mts_src.ticks where ticker_id = :tickerId and ddate= :pDdate limit 1

case class TickerWithDdateTs(ticker :Ticker, dbTsunx :Long) extends CommonFuncs{
  private val currTimestamp :Long = System.currentTimeMillis
  val lastTickDateTime = getDateAsString(convertLongToDate(dbTsunx))
  val diffSeconds = currTimestamp/1000L - dbTsunx/1000L
}



Source data tables (column family) structure:


CREATE TABLE mts_src.ticks (
 ticker_id int,
 ddate date,
 ts bigint,
 db_tsunx bigint,
 ask double,
 bid double,
 PRIMARY KEY (( ticker_id, ddate ), ts, db_tsunx)
) WITH CLUSTERING ORDER BY ( ts DESC, db_tsunx DESC );

CREATE TABLE mts_src.ticks_count_days (
 ticker_id int,
 ddate date,
 ticks_count counter,
 PRIMARY KEY (ticker_id, ddate)
) WITH CLUSTERING ORDER BY ( ddate DESC );


Each key pair (partition key): (ticker_id, ddate) contains 50k-70k rows.

Next Scala code that contains 2 parts, for seq. execution and parallel. With getting timings.


import app.Global
import models._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._

  val sess = Global.sessInstance
  val tickersFrx: Seq[Ticker] = sess.tickersDict
  println("tickersFrx.size = "+tickersFrx.size)

  //Test 1 sequence execution.

  val t1 = System.currentTimeMillis
  val seqTickersDdateTs: Seq[TickerWithDdateTs] = tickersFrx
    .map(t => TickerWithDdateTs(t, sess.getTickerLastTs(t)))
    .sortBy(elm => elm.diffSeconds)(Ordering[Long])
  seqTickersDdateTs.foreach(t => println(t.toString))
  println(s"Duration ${(System.currentTimeMillis-t1).toDouble/1000.toDouble} s. 
            res size=${seqTickersDdateTs.size}")


  //Test 2 with Futures.

  val t1f = System.currentTimeMillis
  val futs: Seq[Future[TickerWithDdateTs]] = tickersFrx
    .map(t => Future(TickerWithDdateTs(t, sess.getTickerLastTs(t))))
  val resSeq :Seq[TickerWithDdateTs] = Await.result(Future.sequence(futs), Duration.Inf)
    .sortBy(elm => elm.diffSeconds)(Ordering[Long])
  resSeq.foreach(t => println(t.toString))
  println(s"Duration with Futures ${(System.currentTimeMillis - t1f).toDouble / 1000.toDouble} s. 
            res size=${resSeq.size}")



The output of code:


import ...
[info] c.d.o.d.i.c.DefaultMavenCoordinates - DataStax Java driver for Apache Cassandra(R)
...

tickersFrx.size = 28
TickerWithDdateTs(GBPAUD [9] GBP / AUD,1564646817028)
TickerWithDdateTs(GBPUSD [3] GBP / USD,1564646816538)
TickerWithDdateTs(GBPCAD [10] GBP / CAD,1564646816541)
...
Duration 1.646 s. res size=28

TickerWithDdateTs(EURGBP [12] EUR / GBP,1564646818197)
TickerWithDdateTs(EURNZD [20] EUR / NZD,1564646818006)
TickerWithDdateTs(GBPAUD [9] GBP / AUD,1564646817028)
...
Duration with Futures 0.307 s. res size=28



The result is different in data because I use an online database. Ticks incoming with high frequency.
The most important speed 1.6 vs 0.3 seconds.

I repeat it 3 times and compare timings.



Duration sequential 2.06 s.
Duration with Futures 0.306 s.

Duration sequential 2.277 s.
Duration with Futures 0.571 s.

Duration sequential 6.433 s.
Duration with Futures 0.403 s.





Комментарии

Популярные сообщения из этого блога

Loading data into Spark from Oracle RDBMS, CSV

Load data from Cassandra to HDFS parquet files and select with Hive

Hadoop 3.0 cluster - installation, configuration, tests on Cent OS 7