Loading tick data from a single Cassandra instance into a cluster (Scala code)
I have a single-instance Cassandra server with the source data, containing only 3 column families (CF):
There are 3 tables:
mts_src.ticks - Forex tick data for 28 tickers; db_tsunx is a Unix timestamp (in milliseconds).
mts_src.ticks_count_days - total tick count by day and ticker_id.
mts_src.ticks_count_total - total tick count by ticker_id.
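Since db_tsunx holds epoch milliseconds (the 13-digit values in the sample output below, e.g. 1549576801098, suggest as much), it can be sanity-checked with plain java.time. A small illustrative sketch, not part of the loader:

```scala
import java.time.Instant

object TsCheck extends App {
  // db_tsunx holds epoch milliseconds; Instant decodes it directly
  val dbTsunx = 1549576801098L       // a value taken from the sample output below
  val instant = Instant.ofEpochMilli(dbTsunx)
  println(instant)                   // 2019-02-07T22:00:01.098Z
}
```

22:00 UTC corresponds to midnight in UTC+2, which is consistent with Forex ticks starting at the beginning of a trading day.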
DROP TABLE mts_src.ticks;
DROP TABLE mts_src.ticks_count_days;
DROP TABLE mts_src.ticks_count_total;
CREATE TABLE mts_src.ticks(
ticker_id int,
ddate date,
ts bigint,
db_tsunx bigint,
ask double,
bid double,
PRIMARY KEY (( ticker_id, ddate ), ts, db_tsunx)
) WITH CLUSTERING ORDER BY ( ts DESC, db_tsunx DESC );
CREATE TABLE mts_src.ticks_count_days(
ticker_id int,
ddate date,
ticks_count counter,
PRIMARY KEY (ticker_id, ddate)
) WITH CLUSTERING ORDER BY ( ddate DESC );
CREATE TABLE mts_src.ticks_count_total(
ticker_id int,
ticks_count counter,
PRIMARY KEY (ticker_id)
);
Data from mts_src.ticks is used for bar and frame calculations. My task now is to copy all the data from this single Cassandra server into another cluster (for use with Spark).
Minimalistic Scala code for copying the data:
package bcapp
import com.datastax.driver.core.{BatchStatement, Cluster, LocalDate}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
case class TicksCountDays(ticker_id :Int,ddate :LocalDate,ticks_count :Long)
case class TicksCountTotal(ticker_id :Int,ticks_count :Long)
case class Tick(
tickerId :Int,
dDate :LocalDate,
ts :Long,
db_tsunx :Long,
ask :Double,
bid :Double
)
object TicksLoader extends App {
val logger = LoggerFactory.getLogger(getClass.getName)
val nodeFrom: String = "xxx.xxx.xxx.xxx" // source ip
val nodeTo: String = "yyy.yyy.yyy.yyy" // target ip
val sessFrom = Cluster.builder().addContactPoint(nodeFrom).build().connect()
val ClusterConfigFrom = sessFrom.getCluster.getConfiguration
ClusterConfigFrom.getSocketOptions.setReadTimeoutMillis(60000)
val sessTo = Cluster.builder().addContactPoint(nodeTo).build().connect()
val ClusterConfigTo = sessTo.getCluster.getConfiguration
ClusterConfigTo.getSocketOptions.setConnectTimeoutMillis(60000)
val poolingOptionsFrom = sessFrom.getCluster.getConfiguration.getPoolingOptions
poolingOptionsFrom.setHeartbeatIntervalSeconds(90)
val poolingOptionsTo = sessTo.getCluster.getConfiguration.getPoolingOptions
poolingOptionsTo.setHeartbeatIntervalSeconds(90)
val bndTicksCountDays = sessFrom.prepare(""" select ticker_id,ddate,ticks_count from mts_src.ticks_count_days """).bind()
val bndTicksTotal = sessFrom.prepare(""" select * from mts_src.ticks_count_total """).bind()
val bndTicksByTickerDdate = sessFrom.prepare(
""" select * from mts_src.ticks
where ticker_id=:tickerId and ddate=:dDate """).bind()
val bndSaveTickWb = sessTo.prepare(""" insert into mts_src.ticks(ticker_id,ddate,ts,db_tsunx,ask,bid)
values(:p_ticker_id,:p_ddate,:p_ts,:p_db_tsunx,:p_ask,:p_bid) """)
val bndSaveTicksByDay = sessTo.prepare(
""" update mts_src.ticks_count_days
set ticks_count=ticks_count+:p_ticks_count where ticker_id=:p_ticker_id and ddate=:p_ddate """).bind()
val bndSaveTicksCntTotal = sessTo.prepare(
""" update mts_src.ticks_count_total
set ticks_count=ticks_count+:p_ticks_count where ticker_id=:p_ticker_id """).bind()
val sqTicksCountTotal = sessFrom.execute(bndTicksTotal)
.all().iterator.asScala.toSeq
.map(r => TicksCountTotal(r.getInt("ticker_id"),r.getLong("ticks_count")))
.sortBy(t => t.ticker_id)
val sqTicksCountDays = sessFrom.execute(bndTicksCountDays)
.all().iterator.asScala.toSeq
.map(r => TicksCountDays(r.getInt("ticker_id"),r.getDate("ddate"),r.getLong("ticks_count")))
.sortBy(t => (t.ticker_id,t.ddate.getMillisSinceEpoch)) //sort by ticker_id AND ddate
for(elm <- sqTicksCountDays){
val tickCntTotal = sqTicksCountTotal.filter(tct => tct.ticker_id == elm.ticker_id).head.ticks_count
println(elm +" TOTAL_TICKS_COUNT = "+tickCntTotal)
val sqTicks = sessFrom.execute(bndTicksByTickerDdate
.setInt("tickerId", elm.ticker_id)
.setDate("dDate",elm.ddate)
).all().iterator.asScala.toSeq.map(r => Tick(
r.getInt("ticker_id"),
r.getDate("ddate"),
r.getLong("ts"),
r.getLong("db_tsunx"),
r.getDouble("ask"),
r.getDouble("bid"))
).sortBy(t => t.db_tsunx)
println(" READ sqTicks.size=" + sqTicks.size + " head.db_tsunx=" + sqTicks.headOption.map(_.db_tsunx).getOrElse(0L)) // headOption avoids a crash on an empty day
val partSqTicks = sqTicks.grouped(65535)
for(thisPartOfSeq <- partSqTicks) {
val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
thisPartOfSeq.foreach {
t =>
batch.add(bndSaveTickWb.bind
.setInt("p_ticker_id", t.tickerId)
.setDate("p_ddate", t.dDate)
.setLong("p_ts", t.ts)
.setLong("p_db_tsunx", t.db_tsunx)
.setDouble("p_ask", t.ask)
.setDouble("p_bid", t.bid))
}
sessTo.execute(batch)
}
sessTo.execute(bndSaveTicksByDay
.setInt("p_ticker_id", elm.ticker_id)
.setDate("p_ddate", elm.ddate)
.setLong("p_ticks_count",elm.ticks_count))
println(" TICKS SAVED INTO Cluster.")
}
for(elm <- sqTicksCountDays.map(e => e.ticker_id).distinct) {
val tickCntTotal = sqTicksCountTotal.filter(tct => tct.ticker_id == elm).head.ticks_count
sessTo.execute(bndSaveTicksCntTotal
.setInt("p_ticker_id", elm)
.setLong("p_ticks_count",tickCntTotal))
}
sessFrom.close()
sessTo.close()
}
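A note on the 65,535 chunk size above: the native protocol encodes the number of statements in a batch as an unsigned 16-bit value, so 65,535 is the hard upper bound per batch (server-side size thresholds such as batch_size_fail_threshold_in_kb may still reject large batches, depending on cluster configuration). The chunking itself is just Scala's grouped, which can be checked in isolation:

```scala
object GroupedCheck extends App {
  // grouped(n) splits a sequence into chunks of at most n elements;
  // the last chunk carries the remainder
  val rows   = (1 to 150000).toList    // stand-in for one day's ticks
  val chunks = rows.grouped(65535).toList
  println(chunks.map(_.size))          // List(65535, 65535, 18930)
}
```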
And the build.sbt for this code:
name := "barcl"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.scala-lang" % "scala-library" % "2.11.8",
"org.scalatest" %% "scalatest" % "3.0.5" % Test
)
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
case "META-INF/mailcap" => MergeStrategy.last
case "META-INF/mimetypes.default" => MergeStrategy.last
case "plugin.properties" => MergeStrategy.last
case "log4j.properties" => MergeStrategy.last
case x => MergeStrategy.first
}
assemblyJarName in assembly :="ticksloader.jar"
mainClass in (Compile, packageBin) := Some("bcapp.TicksLoader")
mainClass in (Compile, run) := Some("bcapp.TicksLoader")
And assembly.sbt, placed in the project/ directory:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
We can build the jar with:
C:\barcl>sbt clean assembly
Next, I copy ticksloader.jar (16 MB) from C:\barcl\target\scala-2.11 to a server that has access to all Cassandra cluster nodes and run it with: scala ticksloader.jar
Output sample:
....
17:55:55.790 [cluster2-nio-worker-6] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.194:9042-1, inFlight=0, closed=false] heartbeat query succeeded
READ sqTicks.size=72624 head.db_tsunx=1549576801098
TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-11,76209) TOTAL_TICKS_COUNT = 2208257
17:56:00.395 [cluster2-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.192:9042-1, inFlight=0, closed=false] was inactive for 90 seconds, sending heartbeat
17:56:00.396 [cluster2-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.192:9042-1, inFlight=0, closed=false] heartbeat query succeeded
17:56:01.942 [cluster1-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/84.201.147.105:9042-1, inFlight=0, closed=false] was inactive for 90 seconds, sending heartbeat
17:56:01.947 [cluster1-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/84.201.147.105:9042-1, inFlight=0, closed=false] heartbeat query succeeded
READ sqTicks.size=76206 head.db_tsunx=1549836022426
TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-12,90379) TOTAL_TICKS_COUNT = 2208257
READ sqTicks.size=90390 head.db_tsunx=1549922401917
TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-13,93447) TOTAL_TICKS_COUNT = 2208257
READ sqTicks.size=93404 head.db_tsunx=1550008825027
TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-14,117838) TOTAL_TICKS_COUNT = 2208257
READ sqTicks.size=117819 head.db_tsunx=1550095202620
TICKS SAVED INTO Cluster.
TicksCountDays(12,2019-02-15,113745) TOTAL_TICKS_COUNT = 2208257
17:56:30.423 [cluster2-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.192:9042-1, inFlight=0, closed=false] was inactive for 90 seconds, sending heartbeat
17:56:30.424 [cluster2-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/192.168.122.192:9042-1, inFlight=0, closed=false] heartbeat query succeeded
17:56:31.972 [cluster1-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/84.201.147.105:9042-1, inFlight=0, closed=false] was inactive for 90 seconds, sending heartbeat
17:56:31.984 [cluster1-nio-worker-0] DEBUG com.datastax.driver.core.Connection - Connection[/84.201.147.105:9042-1, inFlight=0, closed=false] heartbeat query succeeded
READ sqTicks.size=113715 head.db_tsunx=1550181602540
TICKS SAVED INTO Cluster.
...
You can also copy rows one by one with the snippet below (without splitting into parts via sqTicks.grouped(65535)):
for (t <- sqTicks) {
  sessTo.execute(bndSaveTickWb.bind()
    .setInt("p_ticker_id", t.tickerId)
    .setDate("p_ddate", t.dDate)
    .setLong("p_ts", t.ts)
    .setLong("p_db_tsunx", t.db_tsunx)
    .setDouble("p_ask", t.ask)
    .setDouble("p_bid", t.bid))
}
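Between full batches and strictly sequential single-row writes there is a middle ground: issue writes asynchronously but bound how many are in flight at once. Below is a minimal, self-contained sketch of the windowing idea using plain scala.concurrent; in the real loader, writeAsync would wrap sessTo.executeAsync on the bound insert statement instead of the stub used here (the stub and the windowSize value are assumptions for illustration):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object WindowedWrites extends App {
  val written = new AtomicInteger(0)

  // Stub standing in for sessTo.executeAsync(...): pretend-write one row
  def writeAsync(row: Int): Future[Unit] = Future { written.incrementAndGet(); () }

  val rows       = (1 to 10000).toSeq    // stand-in for sqTicks
  val windowSize = 1024                  // bound on in-flight writes (tunable)

  for (window <- rows.grouped(windowSize)) {
    val inFlight = window.map(writeAsync)               // issue the whole window at once
    Await.result(Future.sequence(inFlight), 1.minute)   // drain it before the next window
  }
  println(written.get)                   // 10000
}
```

This keeps the coordinator busy without building a huge batch, at the cost of losing the per-window atomicity an unlogged single-partition batch gives you (which is not needed for a plain copy anyway).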