Scala - calculate with Futures, sequentially and parallel
When we use Futures in Scala, we sometimes need to combine sequential and parallel execution of those Futures.
My investigation started from a task: generate a lot of Users (100K) and insert them all into different databases (Postgres and Cassandra) to compare write speed.
case class User(id: Long,
                name: String,
                email: String,
                edomain: String)
For Postgres I had to use the Slick library, and for Cassandra the Phantom library (a schema-safe, type-safe, reactive Scala driver for Cassandra/Datastax Enterprise).
The steps were easy: generate a Seq of Users and insert them into a database in a loop. With the Postgres driver and this number of items it was fine; every row was inserted without error.
But Phantom is reactive, and its insert command returns a Future. A snippet of the code (roughly):
abstract class CassUsers extends Table[CassUsers, User] with RootConnector {
  override def tableName: String = "users"

  object id extends LongColumn with PartitionKey{
    override lazy val name = "\"id\""
  }
  object name extends StringColumn{
    override lazy val name = "\"name\""
  }
  object email extends StringColumn {
    override lazy val name = "\"email\""
  }
  object edomain extends StringColumn{
    override lazy val name = "\"edomain\""
  }
  ...
  ...
  def insUser(u :User): Future[Long] =
    insert.value(_.id, u.id)
      .value(_.name, u.name)
      .value(_.email, u.email)
      .value(_.edomain, u.edomain)
      .consistencyLevel_=(ConsistencyLevel.ALL)
      .ifNotExists()
      .future().map(_ => u.id)
}
insUser returns a Future with the inserted user id. I know about store but can't use it here, because:
This is a known bug with an open issue: https://github.com/outworkers/phantom/issues/774
I suggest either using the workaround as described in the link above, or my workaround which was creating my own .store() using .insert().
But when I ran the inserts in a loop, I got a lot of errors:
com.datastax.driver.core.exceptions.BusyPoolException
exactly as in this question on Stack Overflow:
https://stackoverflow.com/questions/47900973/com-datastax-driver-core-exceptions-busypoolexception
And yes, for a simple study project the issue above can be worked around using PoolingOptions.
But that is a bad approach if you want to insert a really large number of rows.
The solution suggested there is quite correct:
This means that you are submitting too many requests, and not waiting for the futures to complete
before submitting more.
The default maximum number of requests per connection is 1024.
If this number is exceeded for all connections, the connection pool will enqueue some requests,
up to 256. If the queue gets full, a BusyPoolException is thrown.
Of course you can increase the max number of requests per connection,
and the number of max connections per host. But the real solution is of course to throttle
your thread.
You could e.g. submit your requests by batches of 1000 and then wait
on the futures to complete before submitting more
...
Here is how this logic can be implemented:
1) We have a collection of Users and a number N, the batch size.
2) Split the source collection into a collection of batches with grouped (note that grouped returns an Iterator, and calling .size on it iterates to the end, which exhausts it!)
3) We want to insert the batches sequentially, waiting for each batch to complete before starting the next one.
4) Inside each batch we want to insert the Users in parallel.
5) Finally, we need the count of rows that were actually inserted.
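The warning in step 2 is easy to check in isolation. The following is a minimal sketch, not part of the original project: the object name GroupedIteratorDemo and the Int items standing in for Users are assumptions made for illustration.

```scala
object GroupedIteratorDemo {
  // Hypothetical stand-in for the collection of Users.
  val users: Seq[Int] = Seq(1, 2, 3, 4, 5, 6, 7)

  // grouped returns a one-shot Iterator, not a collection.
  val batches: Iterator[Seq[Int]] = users.grouped(3)

  // size walks the iterator to its end to count the batches: 3 here.
  val batchCount: Int = batches.size

  // The same iterator is now exhausted, so nothing is left to insert.
  val afterSize: List[Seq[Int]] = batches.toList

  // Materializing first (.toList) gives a collection that can be counted
  // and traversed as often as needed.
  val safeBatches: List[Seq[Int]] = users.grouped(3).toList
}
```

If the number of batches is needed (for logging, say), it seems safer to call .toList first and take the size of the list, at the cost of holding all batches in memory.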
I suggest reading the following article before going further:
Scala Concurrency Advice: For Future Fun Fold On A FlatMap
Before writing the code to insert users, I made a simple example of this behavior.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

val rndInit = scala.util.Random
val sourceItems :Seq[Int] = Seq(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17)
// grouped returns an Iterator; uncomment .toList if the batches must be traversed more than once
val groupsOfItem :Iterator[Seq[Int]] = sourceItems.grouped(3)//.toList

// emulate inserting one row: sleep for a random time, then report 1 inserted row
def calcNum(n :Int) :Future[Int] = Future{
  val slp = rndInit.nextInt(500)
  println(s" calcNum n=$n slp=$slp")
  Thread.sleep(slp)
  1
}

// process one batch: start all its inserts in parallel and sum the results
def calcGroup(groupSeq :Seq[Int]) :Future[Int] = {
  println(s"calcOneNum First value si ${groupSeq.head}")
  Thread.sleep(1000)
  Future.sequence(groupSeq.map(n => calcNum(n))).map(v => v.sum)
}

// chain the batches: each batch starts only after the previous one has completed
groupsOfItem.foldLeft(Future.successful(0)){
  (acc :Future[Int], num :Seq[Int]) => {
    acc.flatMap{
      accInt => calcGroup(num).map(_ + accInt)
    }
  }
}.onComplete{
  case Success(s) => println(s"Success inserted $s rows.")
  case Failure(f) => println(s"pg Failure cause=${f.getCause} msg=${f.getMessage} ")
}
Output from the REPL:
scala>
calcOneNum First value si 1
calcNum n=2 slp=396
calcNum n=3 slp=364
calcNum n=1 slp=280
calcOneNum First value si 4
calcNum n=4 slp=381
calcNum n=6 slp=242
calcNum n=5 slp=7
calcOneNum First value si 7
calcNum n=7 slp=122
calcNum n=8 slp=298
calcNum n=9 slp=248
calcOneNum First value si 10
calcNum n=10 slp=454
calcNum n=12 slp=185
calcNum n=11 slp=121
calcOneNum First value si 13
calcNum n=13 slp=116
calcNum n=14 slp=366
calcNum n=15 slp=151
calcOneNum First value si 16
calcNum n=17 slp=341
calcNum n=16 slp=443
Success inserted 17 rows.
I will not describe the code, because that is done in the Medium article and it is simple in general.
The important point: batches are processed sequentially (head elements 1, 4, 7, 10, 13, 16), but inside each batch the "rows" are inserted asynchronously. If you run this snippet in the Scala REPL, the sleeps let you watch it all happen in real time.
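The same property can also be checked without reading the log by eye. The sketch below is an illustration under assumptions, not code from the project: ThrottleCheck, run and fakeInsert are hypothetical names, and the fake insert only sleeps. It counts how many "inserts" are running at once and records the peak, which should never exceed the batch size if batches really run one after another.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ThrottleCheck {
  // Returns (rows "inserted", highest number of inserts running at the same time).
  def run(items: Seq[Int], batchSize: Int): Future[(Int, Int)] = {
    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    // Hypothetical stand-in for a real insert: track concurrency, sleep, report 1 row.
    def fakeInsert(n: Int): Future[Int] = Future {
      val now = inFlight.incrementAndGet()
      maxInFlight.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
      try { Thread.sleep(20); 1 } finally inFlight.decrementAndGet()
    }

    // All inserts of one batch are started together.
    def insertBatch(batch: Seq[Int]): Future[Int] =
      Future.sequence(batch.map(n => fakeInsert(n))).map(_.sum)

    // The next batch is started inside flatMap, i.e. only after the previous one completed.
    items.grouped(batchSize)
      .foldLeft(Future.successful(0)) { (acc, batch) =>
        acc.flatMap(done => insertBatch(batch).map(_ + done))
      }
      .map(total => (total, maxInFlight.get))
  }
}
```

A possible usage: Await.result(ThrottleCheck.run(1 to 17, 3), 30.seconds) should give a total of 17 and a peak no larger than 3. The exact peak depends on how many threads the execution context has available.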
The final code that actually inserts Users into Cassandra (note that we don't do any real exception handling here):
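import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
// an implicit ExecutionContext must also be in scope

// insert one batch in parallel; the result is the number of completed inserts
def calcGroup(groupSeq :Seq[User]) :Future[Long] =
  Future.sequence(groupSeq.map(u => db.UserModel.insUser(u))).map(v => v.size)

def run(cntRow :Int) = {
  val groupRowCnt :Int = 1000 // batch size
  val rand = new RandomCassData
  val seqUsers :Seq[User] = (1 to cntRow).map(thisUserId => rand.getRandomUser(thisUserId))
  log.info(s"Total users size ${seqUsers.size}")
  val t1 = System.currentTimeMillis
  // batches are chained sequentially; each one starts after the previous one has completed
  val r :Future[Long] = seqUsers.grouped(groupRowCnt).foldLeft(Future.successful(0L)){
    (acc :Future[Long], num :Seq[User]) => {
      acc.flatMap{
        accInt => calcGroup(num).map(_ + accInt)
      }
    }
  }
  // block until all batches are done (throws if they take longer than 5 minutes)
  Await.result(r,5.minutes)
  val t2 = System.currentTimeMillis
}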
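If the count of rows that were really inserted matters (step 5), one possible variation is to recover each insert individually, so that a failed insert counts as 0 instead of failing the whole chain. This is only a sketch under assumptions: BatchInsertSketch, fakeInsert, insertBatch and insertAll are hypothetical names, and fakeInsert is a stand-in that fails for every fifth id rather than a call to Cassandra.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object BatchInsertSketch {
  // Hypothetical stand-in for db.UserModel.insUser: fails for every 5th id.
  def fakeInsert(id: Long): Future[Long] = Future {
    if (id % 5 == 0) throw new RuntimeException(s"insert failed for id=$id")
    id
  }

  // Insert one batch in parallel; a successful insert counts as 1,
  // a failed one is recovered to 0 so the rest of the batch still counts.
  def insertBatch(batch: Seq[Long]): Future[Long] =
    Future.sequence(
      batch.map(id => fakeInsert(id).map(_ => 1L).recover { case NonFatal(_) => 0L })
    ).map(_.sum)

  // Run the batches one after another, accumulating the number of inserted rows.
  def insertAll(ids: Seq[Long], batchSize: Int): Future[Long] =
    ids.grouped(batchSize).foldLeft(Future.successful(0L)) { (acc, batch) =>
      acc.flatMap(done => insertBatch(batch).map(_ + done))
    }
}
```

For example, Await.result(BatchInsertSketch.insertAll(1L to 20L, 4), 10.seconds) returns 16, because the four ids divisible by 5 fail. In real code the recover branch would probably also log the failure, which this sketch leaves out.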
You can find code examples in my GitHub repo slick_study.