Scala - calculating with Futures, sequentially and in parallel

When we use Futures in Scala, we sometimes need to combine sequential and parallel execution of Futures.

My research started with a task: generate a lot of Users (100K) and insert them all into different databases (Postgres and Cassandra) to get an overall comparison of write speed.


case class User(id: Long,
                name: String,
                email: String,
                edomain: String)


For Postgres I used the Slick library, and for Cassandra the Phantom library (a schema-safe, type-safe, reactive Scala driver for Cassandra/Datastax Enterprise).

The steps were easy: generate a Seq of Users and insert them into the database in a loop. With the Postgres driver and this number of items it was fine: every row was inserted without error.
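For the generation step, a minimal sketch (randomUser, the names and the domain list are illustrative; the real project uses a RandomCassData helper shown in the final code):


import scala.util.Random

// Illustrative generator: produces a User with a random name and e-mail domain.
def randomUser(id: Long): User = {
  val name = Random.alphanumeric.take(8).mkString.toLowerCase
  val domain = Seq("gmail.com", "yandex.ru", "mail.ru")(Random.nextInt(3))
  User(id, name, s"$name@$domain", domain)
}

val users: Seq[User] = (1L to 100000L).map(randomUser)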

But Phantom is reactive, and its insert command returns a Future. A snippet of the code:


abstract class CassUsers extends Table[CassUsers, User] with RootConnector {

  override def tableName: String = "users"
  object id extends LongColumn with PartitionKey{
    override lazy val name = "\"id\""
  }
  object name extends StringColumn{
    override lazy val name = "\"name\""
  }
  object email extends StringColumn {
    override lazy val name = "\"email\""
  }
  object edomain extends StringColumn{
    override lazy val name = "\"edomain\""
  }

...
...

  // insert a single user; the resulting Future completes with the inserted user's id
  def insUser(u: User): Future[Long] =
    insert.value(_.id, u.id)
      .value(_.name, u.name)
      .value(_.email, u.email)
      .value(_.edomain, u.edomain)
      .consistencyLevel_=(ConsistencyLevel.ALL)
      .ifNotExists()
      .future().map(_ => u.id)
}



insUser returns a Future with the inserted user's id. I know about .store(), but I can't use it here because of a known bug with an open issue: https://github.com/outworkers/phantom/issues/774

I suggest either using the workaround described in the link above, or my own workaround: building a custom .store() on top of .insert(), which is exactly what insUser above does.

But when I ran the inserts in a loop, there were a lot of errors:
com.datastax.driver.core.exceptions.BusyPoolException

exactly like in this question on Stack Overflow:
https://stackoverflow.com/questions/47900973/com-datastax-driver-core-exceptions-busypoolexception

And yes, in a simple study setup the issue above can be solved using PoolingOptions.
But that is a bad way if you want to insert a really large number of rows.
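For completeness, a minimal sketch of that PoolingOptions tuning, assuming the Datastax Java driver 3.x API (the numbers are illustrative, and raising the limits only postpones the problem):


import com.datastax.driver.core.{Cluster, HostDistance, PoolingOptions}

// Illustrative values only: this raises the ceiling instead of removing the cause.
val poolingOptions = new PoolingOptions()
  .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
  .setMaxConnectionsPerHost(HostDistance.LOCAL, 4)

val cluster = Cluster.builder()
  .addContactPoint("127.0.0.1")
  .withPoolingOptions(poolingOptions)
  .build()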

The suggested solution is quite correct:


This means that you are submitting too many requests, and not waiting for the futures to complete before submitting more. The default maximum number of requests per connection is 1024. If this number is exceeded for all connections, the connection pool will enqueue some requests, up to 256. If the queue gets full, a BusyPoolException is thrown. Of course you can increase the max number of requests per connection, and the number of max connections per host. But the real solution is of course to throttle your thread. You could e.g. submit your requests by batches of 1000 and then wait on the futures to complete before submitting more.
...


A description of how to implement this logic:
1) We have a collection of Users, and a number N, the batch size.
2) Divide the source collection into batches with grouped (pay attention: grouped returns an Iterator, and if you call .size on it, it iterates all the way to the end! See the snippet after this list.)
3) Insert the Users batch by batch, sequentially, waiting for completion between steps.
4) Inside each batch, insert the Users in parallel.
5) Finally, we need the count of rows actually inserted.
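To illustrate the gotcha from step 2, a quick REPL check:


val groups = Seq(1, 2, 3, 4, 5).grouped(2) // Iterator[Seq[Int]]
println(groups.size)    // 3, but this call consumes the iterator
println(groups.hasNext) // false: nothing is left to process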

I suggest reading the following article before going further:
Scala Concurrency Advice: For Future Fun Fold On A FlatMap

Before writing the code that inserts users, here is a simple example of the behavior.


import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

val rndInit = scala.util.Random
val sourceItems :Seq[Int] = Seq(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17)
val groupsOfItem: Iterator[Seq[Int]] = sourceItems.grouped(3) // grouped returns an Iterator, not a List

// emulate inserting one row: sleep a random time, then return 1
def calcNum(n: Int): Future[Int] = Future {
  val slp = rndInit.nextInt(500)
  println(s"   calcNum n=$n  slp=$slp")
  Thread.sleep(slp)
  1
}

// process one batch: all items inside the batch run in parallel
def calcGroup(groupSeq: Seq[Int]): Future[Int] = {
  println(s"calcGroup first value is ${groupSeq.head}")
  Thread.sleep(1000) // slow the batches down so the output is easy to follow
  Future.sequence(groupSeq.map(n => calcNum(n))).map(v => v.sum)
}

// fold over the batches: each batch starts only after the previous one completes
groupsOfItem.foldLeft(Future.successful(0)){
  (acc: Future[Int], num: Seq[Int]) => {
    acc.flatMap{
      accInt => calcGroup(num).map(_ + accInt)
    }
  }
}.onComplete{
  case Success(s) => println(s"Success inserted $s rows.")
  case Failure(f) => println(s"pg Failure cause=${f.getCause} msg=${f.getMessage}")
}



Output from REPL



scala> 
calcGroup first value is 1
   calcNum n=2  slp=396
   calcNum n=3  slp=364
   calcNum n=1  slp=280
calcGroup first value is 4
   calcNum n=4  slp=381
   calcNum n=6  slp=242
   calcNum n=5  slp=7
calcGroup first value is 7
   calcNum n=7  slp=122
   calcNum n=8  slp=298
   calcNum n=9  slp=248
calcGroup first value is 10
   calcNum n=10  slp=454
   calcNum n=12  slp=185
   calcNum n=11  slp=121
calcGroup first value is 13
   calcNum n=13  slp=116
   calcNum n=14  slp=366
   calcNum n=15  slp=151
calcGroup first value is 16
   calcNum n=17  slp=341
   calcNum n=16  slp=443
Success inserted 17 rows.



I will not describe the code in detail, because that is done in the Medium article above and it is simple in general.
The important point: the batches are processed sequentially (head elements 1, 4, 7, 10, 13, 16), but inside each batch the "rows" are inserted asynchronously. If you run this snippet in the Scala REPL, the sleeps let you watch it all in real time.
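For contrast, here is a sketch of the unthrottled anti-pattern that caused the BusyPoolException earlier (assuming seqUsers: Seq[User] holds all the generated users, as in the code below):


// Anti-pattern (sketch): every insert is submitted at once, so with 100K users
// the driver's per-connection request queue overflows and BusyPoolException is thrown.
val all: Future[Seq[Long]] =
  Future.sequence(seqUsers.map(u => db.UserModel.insUser(u)))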

The final code for actually inserting the Users into Cassandra (note that we do not care much about exceptions here and do not propagate them):



  import scala.concurrent.Await
  import scala.concurrent.duration._

  // insert one batch: all inserts inside the batch run in parallel
  def calcGroup(groupSeq: Seq[User]): Future[Long] =
    Future.sequence(groupSeq.map(u => db.UserModel.insUser(u))).map(v => v.size.toLong)

  def run(cntRow: Int): Unit = {
    val groupRowCnt: Int = 1000
    val rand = new RandomCassData
    val seqUsers: Seq[User] = (1 to cntRow).map(thisUserId => rand.getRandomUser(thisUserId))
    log.info(s"Total users size ${seqUsers.size}")
    val t1 = System.currentTimeMillis

    // batches run sequentially: each one starts only after the previous completes
    val r: Future[Long] = seqUsers.grouped(groupRowCnt).foldLeft(Future.successful(0L)){
      (acc: Future[Long], num: Seq[User]) => {
        acc.flatMap{
          accInt => calcGroup(num).map(_ + accInt)
        }
      }
    }

    Await.result(r, 5.minutes)
    val t2 = System.currentTimeMillis
    log.info(s"Inserted ${seqUsers.size} users in ${t2 - t1} ms.")
  }
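If you want to survive individual insert failures instead of failing the whole fold, a hedged variant (calcGroupSafe is a hypothetical name, not from the original code) wraps every insert in recover and counts only the successes:


  // Hypothetical variant: count successful inserts, swallow per-row failures.
  def calcGroupSafe(groupSeq: Seq[User]): Future[Long] =
    Future.sequence(
      groupSeq.map(u =>
        db.UserModel.insUser(u).map(_ => 1L).recover { case _ => 0L }
      )
    ).map(_.sum)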



You can find the code examples in my GitHub repo slick_study.
