Running parallel calculations in Scala with Futures and individual sleep intervals


In this short post, I will show how to run a calculation in parallel with the same code (method) but with individual sleep intervals.

Source project: https://github.com/AlexGruPerm/barcl

Formulation of the problem:
1) We have a flow of tick data from Forex, the stock market, etc. (the flow is stored as a column family in a Cassandra database).

2) We also have a couple of dictionaries:
   a dictionary of tickers (symbols) (ID, NAME), like 1-EURUSD, 2-ORCL;
   a dictionary of BWS (bar width in seconds) (TICKER_ID, BWS).
For example:
1 - 30
1 - 600
1 - 3600
It means that for the ticker with ID=1 we need to calculate bars with widths of 30 seconds, 10 minutes and 1 hour (60 minutes).
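
The two dictionaries can be modelled roughly as follows. This is only a sketch: the names Ticker, BarWidth and BarDictionaries are hypothetical and are not taken from barcl, and the values are the ones from the example above.

```scala
// Hypothetical names, for illustration only (not taken from barcl).
final case class Ticker(id: Int, name: String)
final case class BarWidth(tickerId: Int, bws: Int) // bws = bar width in seconds

object BarDictionaries {
  // Dictionary of tickers (ID, NAME).
  val tickers: Seq[Ticker] = Seq(Ticker(1, "EURUSD"), Ticker(2, "ORCL"))

  // Dictionary of bar widths (TICKER_ID, BWS).
  val barWidths: Seq[BarWidth] =
    Seq(BarWidth(1, 30), BarWidth(1, 600), BarWidth(1, 3600))

  // Every (ticker, bar width) pair that needs its own calculation.
  val pairs: Seq[(Ticker, BarWidth)] =
    for {
      t <- tickers
      w <- barWidths if w.tickerId == t.id
    } yield (t, w)
}
```

With these sample values only ticker 1 has bar widths configured, so pairs contains three entries.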

We need to calculate all bars online, with minimal lag between the last tick of a ticker and TS_END (a Unix timestamp) of its last bar.
We also need to avoid unnecessary database reads to minimize resource consumption.

The first, simple and unoptimized version worked as follows: in a loop, read the data for every pair (ticker + BWS), approximately 30 tickers and 6 BWS. This was a bad idea that caused a lot of reads.

This article is about refactoring the code around the following idea:
For each TICKER and BWS:
  Read the last bar for this BWS and the latest tick data for this ticker.
  Calculate the lag between the last bar's ts_end and the last tick's timestamp. If the lag is more than BWS seconds, then we can calculate one more bar, save it into the DB and go on to the next iteration.
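
The lag check described above can be sketched like this. BarLag, lagSeconds and canCalcNextBar are hypothetical names, and both timestamps are assumed to be Unix timestamps in seconds.

```scala
// Hypothetical helper, for illustration only (not taken from barcl).
object BarLag {
  // Lag in seconds between the end of the last saved bar and the newest tick.
  // Both arguments are assumed to be Unix timestamps in seconds.
  def lagSeconds(lastBarTsEnd: Long, lastTickTs: Long): Long =
    lastTickTs - lastBarTsEnd

  // One more bar can be calculated when the lag is more than the bar width.
  def canCalcNextBar(lastBarTsEnd: Long, lastTickTs: Long, bws: Int): Boolean =
    lagSeconds(lastBarTsEnd, lastTickTs) > bws
}
```

For example, with bws = 600 a lag of 700 seconds allows one more bar, while a lag of 500 seconds does not.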

To avoid high resource consumption, we need to sleep for some seconds instead of reading data.
For example:
We calculate ticker=1 and BWS=600 seconds. Suppose the last bar was saved approximately 100 seconds ago, so the next bar cannot be complete for about another 500 seconds. We can sleep for (500 - delta) seconds and save resources.
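
As a rough sketch of this arithmetic, assuming the lag is measured as the time elapsed since the end of the last bar and delta is a small safety margin (SleepInterval and sleepSeconds are hypothetical names):

```scala
// Hypothetical helper, for illustration only (not taken from barcl).
object SleepInterval {
  // Seconds to sleep before the next read: the time left until a full bar
  // can exist, reduced by the safety margin delta and never negative.
  def sleepSeconds(bws: Int, lagSec: Long, delta: Long): Long =
    math.max(0L, bws - lagSec - delta)
}
```

With bws = 600, a lag of 100 seconds and delta = 5, this gives 495 seconds of sleep; once the lag exceeds the bar width the result is 0 and the bar can be calculated immediately.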

Example code.



import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Random, Success}
import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {

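// Simulates one calculation iteration for a ticker: prints a trace, sleeps to imitate work
// and returns a random wait interval (in milliseconds) wrapped in a Future.
def calcIteration(tickerId :Int) :Future[Int] ={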
  tickerId match {
    case 1 => {
      println("  ")
      println("Calculation of [1] ~~~~~")
      Thread.sleep(1000)
      println("End Calculation of [1]")
      Future(Random.nextInt(10000))
    }
    case 2 => {
      println("  ")
      println("  Calculation of [2] ------------------------------")
      Thread.sleep(2000)
      println("  End Calculation of [2]")
      Future(Random.nextInt(20000))
    }
    case 3 => {
      println("  ")
      println("    Calculation of [3] ========================================")
      Thread.sleep(3000)
      println("    End Calculation of [3]")
      Future(Random.nextInt(30000))
    }
  }
}

// Ticker IDs; in the real application this dictionary is read from the DB.
val fTicksMeta :Seq[Int] = Seq(1,2,3)

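// One iteration for a ticker: run the calculation, then sleep for the interval it returned
// (the sleep happens inside the onComplete callback).
def taskCalcBars(tm :Int): Future[Unit] = Future {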
  calcIteration(tm).onComplete {
    case Success(waitDur) => {
      tm match {
        case 1 => {
          println("Begin wait for [1] interval=" + waitDur)
          Thread.sleep(waitDur)
          println("End wait for [1]  ~~~~~")
          println("  ")
        }
        case 2 => {
          println("  Begin wait for [1] interval=" + waitDur)
          Thread.sleep(waitDur)
          println("  End wait for [2] ------------------------------")
          println("  ")
        }
        case 3 => {
          println("    Begin wait for [1] interval=" + waitDur)
          Thread.sleep(waitDur)
          println("    End wait for [3]  ========================================")
          println("  ")
        }
      }
    }
    case Failure(ex) => {
      println("Exception from calcIteration " + ex.getMessage + " - " + ex.getCause)
    }
  }
}

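// Restart the task for the same ticker every time the previous Future completes.
def loopCalcBars(tm :Int): Future[Unit] = taskCalcBars(tm).flatMap(_ => loopCalcBars(tm))

// One never-ending loop per ticker.
def infiniteLoop(): Seq[Future[Unit]] = fTicksMeta.map(loopCalcBars)

// Block the main thread; the loops never complete, so this waits forever.
Await.ready(Future.sequence(infiniteLoop()), Duration.Inf)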
}



Explanation:

First of all, we read the dictionary of tickers from the DB (hard-coded in this example):
val fTicksMeta :Seq[Int] = Seq(1,2,3)

Next, we call the function infiniteLoop, which returns a sequence of Future[Unit] and starts the recursive function loopCalcBars separately for each ticker (3 in this example).
loopCalcBars calls taskCalcBars, where the main calculation is executed (by calling calcIteration) and which returns Future[Unit]. When that Future completes, the next iteration starts: .flatMap(_ => loopCalcBars(tm)).
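
One detail of the listing is worth noting: the Future returned by taskCalcBars completes as soon as the onComplete callback has been registered, so the next calculation can start while the previous wait is still sleeping on another thread (in the output below, "Calculation of [1]" is printed again before "End wait for [1]"). If the wait is meant to delay the next iteration, it can be made part of the chain. The following is only a minimal sketch under assumptions: ChainedLoop, the stubbed calcIteration and the iterations parameter are hypothetical and are not part of barcl.

```scala
import scala.concurrent.{ExecutionContext, Future, blocking}

// Hypothetical variant, for illustration only (not taken from barcl).
object ChainedLoop {
  // Stand-in for the real calculation: returns the wait interval in milliseconds.
  def calcIteration(tickerId: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future(tickerId * 10)

  // One iteration: calculate, then sleep. The returned Future completes
  // only after the sleep has finished.
  def taskCalcBars(tickerId: Int)(implicit ec: ExecutionContext): Future[Unit] =
    calcIteration(tickerId).map { waitMs =>
      // blocking() tells the execution context that this thread is parked.
      blocking(Thread.sleep(waitMs.toLong))
    }

  // Runs a fixed number of iterations so that the sketch terminates.
  def loopCalcBars(tickerId: Int, iterations: Int)(implicit ec: ExecutionContext): Future[Unit] =
    if (iterations <= 0) Future.successful(())
    else taskCalcBars(tickerId).flatMap(_ => loopCalcBars(tickerId, iterations - 1))
}
```

Because the sleep is inside map, flatMap starts the next iteration only when the wait is over, and each ticker keeps at most one sleeping thread at a time.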

You can save this code in a separate file, compile and run it.


bcapp>scalac C:\barcl\src\main\scala\bcapp\SW.sc
bcapp>scala Main


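And the output looks like this. The indentation identifies the ticker; the "Begin wait for [1]" label is printed for tickers 2 and 3 as well, because the println text in taskCalcBars was copied from the first case: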


Calculation of [1] ~~~~~

  Calculation of [2] ------------------------------

    Calculation of [3] ========================================
End Calculation of [1]

Calculation of [1] ~~~~~
Begin wait for [1] interval=600
End wait for [1]  ~~~~~

  End Calculation of [2]
  Begin wait for [1] interval=8079

  Calculation of [2] ------------------------------
End Calculation of [1]
Begin wait for [1] interval=9547
    End Calculation of [3]
    Begin wait for [1] interval=23120
  End Calculation of [2]
  Begin wait for [1] interval=15689
  End wait for [2] ------------------------------


Calculation of [1] ~~~~~
End Calculation of [1]
Begin wait for [1] interval=1162
End wait for [1]  ~~~~~


  Calculation of [2] ------------------------------
End wait for [1]  ~~~~~


Calculation of [1] ~~~~~
End Calculation of [1]
Begin wait for [1] interval=6478
  End Calculation of [2]
  Begin wait for [1] interval=7881
  End wait for [2] ------------------------------


Calculation of [1] ~~~~~
End wait for [1]  ~~~~~


    Calculation of [3] ========================================
End Calculation of [1]
Begin wait for [1] interval=7560
  End wait for [2] ------------------------------


  Calculation of [2] ------------------------------
    End Calculation of [3]
    Begin wait for [1] interval=13240
  End Calculation of [2]
  Begin wait for [1] interval=6382
    End wait for [3]  ========================================


  Calculation of [2] ------------------------------
  End Calculation of [2]
  Begin wait for [1] interval=6357
End wait for [1]  ~~~~~


Calculation of [1] ~~~~~
End Calculation of [1]
Begin wait for [1] interval=5709


Stop it with Ctrl+C, because this is an infinite loop.

I am now implementing this approach in barcl as BarCalculatorTickersBws, which will replace BarCalculator.
https://github.com/AlexGruPerm/barcl
