Loading tick data from a single Cassandra instance into a cluster (Scala code)

I have a single-instance Cassandra with the source data, just 3 column families (tables) in the mts_src keyspace:

mts_src.ticks - Forex tick data for 28 tickers; db_tsunx is a Unix timestamp in milliseconds.
mts_src.ticks_count_days - total tick count per day and ticker_id.
mts_src.ticks_count_total - total tick count per ticker_id.


DROP TABLE mts_src.ticks;
DROP TABLE mts_src.ticks_count_days;
DROP TABLE mts_src.ticks_count_total;

CREATE TABLE mts_src.ticks(
   ticker_id int,
   ddate date,
   ts bigint,
   db_tsunx bigint,
   ask double,
   bid double,
   PRIMARY KEY (( ticker_id, ddate ), ts, db_tsunx)
) WITH CLUSTERING ORDER BY ( ts DESC, db_tsunx DESC );

CREATE TABLE mts_src.ticks_count_days(
   ticker_id int,
   ddate date,
   ticks_count counter,
   PRIMARY KEY (ticker_id, ddate)
) WITH CLUSTERING ORDER BY ( ddate DESC );

CREATE TABLE mts_src.ticks_count_total(
   ticker_id int,
   ticks_count counter,
   PRIMARY KEY (ticker_id)
);

Data from mts_src.ticks is used for bar and frame calculation. Now my task is to copy all the data from the single Cassandra server into another cluster (for use with Spark).

Minimalistic Scala code for copying the data:

package bcapp

import com.datastax.driver.core.{BatchStatement, Cluster, LocalDate}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._

case class TicksCountDays(ticker_id: Int, ddate: LocalDate, ticks_count: Long)

case class TicksCountTotal(ticker_id: Int, ticks_count: Long)

case class Tick(
                  tickerId : Int,
                  dDate    : LocalDate,
                  ts       : Long,
                  db_tsunx : Long,
                  ask      : Double,
                  bid      : Double
                )

object TicksLoader extends App {
  val logger = LoggerFactory.getLogger(getClass.getName)
  val nodeFrom: String = "xxx.xxx.xxx.xxx" // source ip
  val nodeTo: String = "yyy.yyy.yyy.yyy"  // target ip

  val sessFrom = Cluster.builder().addContactPoint(nodeFrom).build().connect()
  val clusterConfigFrom = sessFrom.getCluster.getConfiguration
  clusterConfigFrom.getSocketOptions.setReadTimeoutMillis(60000)

  val sessTo = Cluster.builder().addContactPoint(nodeTo).build().connect()
  val clusterConfigTo = sessTo.getCluster.getConfiguration
  clusterConfigTo.getSocketOptions.setConnectTimeoutMillis(60000)

  val poolingOptionsFrom = sessFrom.getCluster.getConfiguration.getPoolingOptions
  poolingOptionsFrom.setHeartbeatIntervalSeconds(90)

  val poolingOptionsTo = sessTo.getCluster.getConfiguration.getPoolingOptions
  poolingOptionsTo.setHeartbeatIntervalSeconds(90)

  val bndTicksCountDays = sessFrom.prepare(""" select ticker_id,ddate,ticks_count from mts_src.ticks_count_days """).bind()

  val bndTicksTotal = sessFrom.prepare(""" select * from mts_src.ticks_count_total """).bind()

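  // Single-partition read: (ticker_id, ddate) is the partition key of mts_src.ticks,
  // so one day of one ticker is fetched as a whole partition
  // (.all() further down materializes it fully in memory before writing).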
  val bndTicksByTickerDdate = sessFrom.prepare(
    """ select * from mts_src.ticks
      where ticker_id=:tickerId and ddate=:dDate """).bind()

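  // Note: no .bind() here - this prepared statement is bound per row inside the batch below.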
  val bndSaveTickWb = sessTo.prepare(""" insert into mts_src.ticks(ticker_id,ddate,ts,db_tsunx,ask,bid)
                                      values(:p_ticker_id,:p_ddate,:p_ts,:p_db_tsunx,:p_ask,:p_bid) """)

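  // Counter columns cannot be INSERTed, only incremented,
  // hence UPDATE ... set ticks_count = ticks_count + :p_ticks_count on both counter tables.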
  val bndSaveTicksByDay = sessTo.prepare(
    """ update mts_src.ticks_count_days
      set ticks_count=ticks_count+:p_ticks_count where ticker_id=:p_ticker_id and ddate=:p_ddate """).bind()

  val bndSaveTicksCntTotal = sessTo.prepare(
    """  update mts_src.ticks_count_total
      set ticks_count=ticks_count+:p_ticks_count where ticker_id=:p_ticker_id """).bind()

  val sqTicksCountTotal = sessFrom.execute(bndTicksTotal)
    .all().iterator.asScala.toSeq
    .map(r => TicksCountTotal(r.getInt("ticker_id"),r.getLong("ticks_count")))
    .sortBy(t => t.ticker_id)

  val sqTicksCountDays = sessFrom.execute(bndTicksCountDays)
    .all().iterator.asScala.toSeq
    .map(r => TicksCountDays(r.getInt("ticker_id"),r.getDate("ddate"),r.getLong("ticks_count")))
    .sortBy(t => (t.ticker_id,t.ddate.getMillisSinceEpoch)) //sort by ticker_id AND ddate

  for(elm <- sqTicksCountDays){
    val tickCntTotal = sqTicksCountTotal.filter(tct => tct.ticker_id == elm.ticker_id).head.ticks_count
    println(elm +"  TOTAL_TICKS_COUNT = "+tickCntTotal)

    val sqTicks = sessFrom.execute(bndTicksByTickerDdate
      .setInt("tickerId", elm.ticker_id)
      .setDate("dDate",elm.ddate)
    ).all().iterator.asScala.toSeq.map(r => Tick(
      r.getInt("ticker_id"),
      r.getDate("ddate"),
      r.getLong("ts"),
      r.getLong("db_tsunx"),
      r.getDouble("ask"),
      r.getDouble("bid"))
    ).sortBy(t => t.db_tsunx)
    println(" READ sqTicks.size="+sqTicks.size+" head.db_tsunx="+sqTicks.head.db_tsunx)

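    // 65535 is the native protocol's limit on statements per batch;
    // large unlogged batches can still exceed the server-side batch size warn/fail thresholds.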
    val partSqTicks = sqTicks.grouped(65535)

    for(thisPartOfSeq <- partSqTicks) {
      val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
      thisPartOfSeq.foreach {
        t =>
          batch.add(bndSaveTickWb.bind
            .setInt("p_ticker_id", t.tickerId)
            .setDate("p_ddate", t.dDate)
            .setLong("p_ts", t.ts)
            .setLong("p_db_tsunx", t.db_tsunx)
            .setDouble("p_ask", t.ask)
            .setDouble("p_bid", t.bid))
      }
      sessTo.execute(batch)
    }

    sessTo.execute(bndSaveTicksByDay
      .setInt("p_ticker_id", elm.ticker_id)
      .setDate("p_ddate", elm.ddate)
      .setLong("p_ticks_count",elm.ticks_count))

    println("   TICKS SAVED INTO Cluster.")
  }


  for(elm <- sqTicksCountDays.map(e => e.ticker_id).distinct) {
    val tickCntTotal = sqTicksCountTotal.filter(tct => tct.ticker_id == elm).head.ticks_count
    sessTo.execute(bndSaveTicksCntTotal
      .setInt("p_ticker_id", elm)
      .setLong("p_ticks_count",tickCntTotal))
  }

  sessFrom.close()
  sessTo.close()
}
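
To check that the target actually received everything, you can compare the per-day counters from the source with a count(*) over the corresponding partition on the target. A minimal sketch (my addition, not part of the loader above; run it before the sessions are closed). Note that the counters may drift slightly from the real row counts, as the output below suggests, so treat this as a sanity check rather than an exact assertion:

  for (elm <- sqTicksCountDays) {
    // count(*) over a single partition on the target cluster
    val cntOnTarget = sessTo.execute(
      s"select count(*) as cnt from mts_src.ticks where ticker_id=${elm.ticker_id} and ddate='${elm.ddate}'"
    ).one().getLong("cnt")
    if (cntOnTarget != elm.ticks_count)
      println(s"COUNT DIFFERS ticker_id=${elm.ticker_id} ddate=${elm.ddate}: counter=${elm.ticks_count} rows=$cntOnTarget")
  }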

And the build.sbt for this code:


name := "barcl"
version := "0.1"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "org.scala-lang" % "scala-library" % "2.11.8",
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)


assemblyMergeStrategy in assembly := {
  case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
  case "META-INF/mailcap" => MergeStrategy.last
  case "META-INF/mimetypes.default" => MergeStrategy.last
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case "plugin.properties" => MergeStrategy.last
  case "log4j.properties" => MergeStrategy.last
  case x => MergeStrategy.first
}

assemblyJarName in assembly := "ticksloader.jar"

mainClass in (Compile, packageBin) := Some("bcapp.TicksLoader")
mainClass in (Compile, run) := Some("bcapp.TicksLoader")


And assembly.sbt in the project directory (project/assembly.sbt):

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")


We can build the fat jar with:

C:\barcl>sbt clean assembly


Next, I copy ticksloader.jar (16 MB) from C:\barcl\target\scala-2.11 onto a server that has access to all Cassandra cluster nodes and run it with: scala ticksloader.jar

Output sample:
....
17:55:55.790 [cluster2-nio-worker-6] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.194:9042-1, inFlight=0, closed=false] heartbeat query succeeded
 READ sqTicks.size=72624 head.db_tsunx=1549576801098
   TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-11,76209)  TOTAL_TICKS_COUNT = 2208257
17:56:00.395 [cluster2-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.192:9042-1, inFlight=0, closed=false] was inactive for 90 seconds, sending heartbeat
17:56:00.396 [cluster2-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.192:9042-1, inFlight=0, closed=false] heartbeat query succeeded
17:56:01.942 [cluster1-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/84.201.147.105:9042-1, inFlight=0, closed=false] was inactive for 90 seconds, sending heartbeat
17:56:01.947 [cluster1-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/84.201.147.105:9042-1, inFlight=0, closed=false] heartbeat query succeeded
 READ sqTicks.size=76206 head.db_tsunx=1549836022426
   TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-12,90379)  TOTAL_TICKS_COUNT = 2208257
 READ sqTicks.size=90390 head.db_tsunx=1549922401917
   TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-13,93447)  TOTAL_TICKS_COUNT = 2208257
 READ sqTicks.size=93404 head.db_tsunx=1550008825027
   TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-14,117838)  TOTAL_TICKS_COUNT = 2208257
 READ sqTicks.size=117819 head.db_tsunx=1550095202620
   TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-15,113745)  TOTAL_TICKS_COUNT = 2208257
17:56:30.423 [cluster2-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.192:9042-1, inFlight=0, closed=false] was inactive for 90 seconds, sending heartbeat
17:56:30.424 [cluster2-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.192:9042-1, inFlight=0, closed=false] heartbeat query succeeded
17:56:31.972 [cluster1-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/84.201.147.105:9042-1, inFlight=0, closed=false] was inactive for 90 seconds, sending heartbeat
17:56:31.984 [cluster1-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/84.201.147.105:9042-1, inFlight=0, closed=false] heartbeat query succeeded
 READ sqTicks.size=113715 head.db_tsunx=1550181602540
   TICKS SAVED INTO Cluster.
...


Alternatively, you can copy rows one by one (without splitting into parts with sqTicks.grouped(65535)):


    for (t <- sqTicks) {
      sessTo.execute(bndSaveTickWb.bind
        .setInt("p_ticker_id", t.tickerId)
        .setDate("p_ddate", t.dDate)
        .setLong("p_ts", t.ts)
        .setLong("p_db_tsunx", t.db_tsunx)
        .setDouble("p_ask", t.ask)
        .setDouble("p_bid", t.bid))
    }
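
Row-by-row synchronous writes are slow because each insert waits for a full network round trip. A compromise between single inserts and big batches is asynchronous writes with a bounded number of in-flight requests. A sketch of the idea (the Semaphore throttling and the limit of 256 are my own assumptions, not part of the loader above):

    // extra imports: java.util.concurrent.Semaphore,
    // com.google.common.util.concurrent.{FutureCallback, Futures, MoreExecutors},
    // com.datastax.driver.core.ResultSet
    val inFlight = new Semaphore(256) // cap on concurrent async inserts

    for (t <- sqTicks) {
      inFlight.acquire() // block while 256 writes are already in flight
      val fut = sessTo.executeAsync(bndSaveTickWb.bind
        .setInt("p_ticker_id", t.tickerId)
        .setDate("p_ddate", t.dDate)
        .setLong("p_ts", t.ts)
        .setLong("p_db_tsunx", t.db_tsunx)
        .setDouble("p_ask", t.ask)
        .setDouble("p_bid", t.bid))
      Futures.addCallback(fut, new FutureCallback[ResultSet] {
        def onSuccess(rs: ResultSet): Unit = inFlight.release()
        def onFailure(err: Throwable): Unit = { inFlight.release(); logger.error("async insert failed", err) }
      }, MoreExecutors.directExecutor())
    }
    inFlight.acquire(256); inFlight.release(256) // drain: wait until all writes complete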

