Read and Write Map Fields in Cassandra with Scala
In this little example, I show how to read and write Map columns in Cassandra.
The Cassandra table has the following columns:
ticker_id - ID of a financial instrument such as EURUSD, ORCL, AAPL, GOLD.
ddate - the calendar day ('2019-01-25', '2019-01-26', ...).
CREATE TABLE mts_bars.td_bars_3600(
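    ticker_id int,
    ddate date,
    bar_1 map<text,text>,
    bar_2 map<text,text>,
    bar_3 map<text,text>,
    bar_4 map<text,text>,
    bar_5 map<text,text>,
    bar_6 map<text,text>,
    bar_7 map<text,text>,
    bar_8 map<text,text>,
    bar_9 map<text,text>,
    bar_10 map<text,text>,
    bar_11 map<text,text>,
    bar_12 map<text,text>,
    bar_13 map<text,text>,
    bar_14 map<text,text>,
    bar_15 map<text,text>,
    bar_16 map<text,text>,
    bar_17 map<text,text>,
    bar_18 map<text,text>,
    bar_19 map<text,text>,
    bar_20 map<text,text>,
    bar_21 map<text,text>,
    bar_22 map<text,text>,
    bar_23 map<text,text>,
    bar_24 map<text,text>,
    PRIMARY KEY ((ticker_id), ddate)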
) WITH CLUSTERING ORDER BY (ddate DESC);
The primary key consists of two columns, ticker_id and ddate.
ticker_id is the partition key; it determines how data is distributed across your nodes.
ddate is the clustering key; it determines how rows are sorted within a partition.
The idea is that each partition (one per ticker_id) holds all dates for that ticker_id, and each hour (3600 seconds) of a day has its own column with the bar information.
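To make the "one column per hour" layout concrete, here is a minimal sketch that picks the bar_x column for a tick timestamp. The post does not say how hours map to columns, so this assumes (hypothetically) that bar_1 covers the first hour of the UTC day and bar_24 the last; the name barColumnFor is made up for illustration.

```scala
import java.time.{Instant, ZoneOffset}

object BarColumn {
  // ASSUMPTION (not stated in the post): bar_1 = 00:00-00:59 UTC, ..., bar_24 = 23:00-23:59 UTC.
  // tsMillis is a Unix timestamp in milliseconds, like ts_begin / ts_end in the test data.
  def barColumnFor(tsMillis: Long): String = {
    val hourOfDay = Instant.ofEpochMilli(tsMillis).atZone(ZoneOffset.UTC).getHour // 0..23
    s"bar_${hourOfDay + 1}"
  }
}
```

If the bars are aligned to a different time zone or to the trading session, only the hourOfDay computation would need to change.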
The same partitioning and clustering scheme can also be declared like this:
...
PRIMARY KEY (ticker_id, ddate)
) WITH CLUSTERING ORDER BY (ddate DESC);
...
Note that you cannot create a table with the following code:
...
PRIMARY KEY ((ticker_id, ddate))
) WITH CLUSTERING ORDER BY (ddate DESC);
This raises an error, because the primary key ((ticker_id, ddate)) has no clustering key for CLUSTERING ORDER BY to refer to. Both columns form a composite partition key here, so a separate partition is created for each ticker_id + ddate combination.
...
PRIMARY KEY ((ticker_id, ddate))
);
With this key, one year of data gives 365 partitions per ticker, one per day, and each partition holds a single row. Bad idea.
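The partition arithmetic can be checked with a small in-memory sketch. Nothing here talks to Cassandra; Key and partitions are hypothetical helpers that only count distinct partition-key values for one ticker over 365 days.

```scala
object PartitionCount {
  // Hypothetical model of the key columns of one row.
  final case class Key(tickerId: Int, ddate: String)

  // Number of distinct partitions produced by a given partition-key function.
  def partitions[K](rows: Seq[Key])(partitionKey: Key => K): Int =
    rows.map(partitionKey).distinct.size

  // One ticker, 365 daily rows (a day index stands in for a real date).
  val oneYear: Seq[Key] = (1 to 365).map(d => Key(1, s"day-$d"))

  // PRIMARY KEY ((ticker_id), ddate): the partition key is ticker_id only.
  val byTicker: Int = partitions(oneYear)(_.tickerId)

  // PRIMARY KEY ((ticker_id, ddate)): the partition key is the pair.
  val byTickerAndDate: Int = partitions(oneYear)(k => (k.tickerId, k.ddate))
}
```

Here byTicker is 1 and byTickerAndDate is 365, which matches the count described above.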
OK, moving on. We create the table with the key definition below and load the following test data.
...
PRIMARY KEY ((ticker_id), ddate)
) WITH CLUSTERING ORDER BY (ddate DESC);
insert into mts_bars.td_bars_3600(ticker_id,ddate,bar_1,bar_2) values(1,'2019-01-25',
{
'bar_width_sec' : '3659',
'btype' : 'd',
'c' : '1.67173',
'disp' : '0.0017',
'h' : '1.67184',
'h_body' : '0.0013',
'h_shad' : '0.0011',
'l' : '1.67141',
'log_co' : '0.023',
'o' : '1.67156',
'ticks_cnt' : '35746',
'ts_begin' : '1548839178248',
'ts_end' : '1548839188248'
},
{
'bar_width_sec' : '3659',
'btype' : 'u',
'c' : '1.67173',
'disp' : '0.0017',
'h' : '1.67184',
'h_body' : '0.0013',
'h_shad' : '0.0011',
'l' : '1.67141',
'log_co' : '0.023',
'o' : '1.67156',
'ticks_cnt' : '35746',
'ts_begin' : '1548839178248',
'ts_end' : '1548839188248'
}
);
insert into mts_bars.td_bars_3600(ticker_id,ddate,bar_1,bar_2) values(2,'2019-01-25',
{
'bar_width_sec' : '3659',
'btype' : 'd',
'c' : '1.45',
'disp' : '0.0017',
'h' : '1.67184',
'h_body' : '0.0013',
'h_shad' : '0.0011',
'l' : '1.67141',
'log_co' : '0.023',
'o' : '1.67156',
'ticks_cnt' : '35746',
'ts_begin' : '1548839178248',
'ts_end' : '1548839188248'
},
{
'bar_width_sec' : '3659',
'btype' : 'd',
'c' : '1.56',
'disp' : '0.0017',
'h' : '1.67184',
'h_body' : '0.0013',
'h_shad' : '0.0011',
'l' : '1.67141',
'log_co' : '0.023',
'o' : '1.67156',
'ticks_cnt' : '35746',
'ts_begin' : '1548839178248',
'ts_end' : '1548839188248'
}
);
insert into mts_bars.td_bars_3600(ticker_id,ddate,bar_1,bar_2,bar_3) values(3,'2019-01-25',
{
'bar_width_sec' : '3659',
'btype' : 'd',
'c' : '1.45',
'disp' : '0.0017',
'h' : '1.67184',
'h_body' : '0.0013',
'h_shad' : '0.0011',
'l' : '1.67141',
'log_co' : '0.023',
'o' : '1.67156',
'ticks_cnt' : '35746',
'ts_begin' : '1548839178248',
'ts_end' : '1548839188248'
},
{
'bar_width_sec' : '3659',
'btype' : 'd',
'c' : '1.56',
'disp' : '0.0017',
'h' : '1.67184',
'h_body' : '0.0013',
'h_shad' : '0.0011',
'l' : '1.67141',
'log_co' : '0.023',
'o' : '1.67156',
'ticks_cnt' : '35746',
'ts_begin' : '1548839178248',
'ts_end' : '1548839188248'
},
{
'bar_width_sec' : '3659',
'btype' : 'd',
'c' : '1.56',
'disp' : '0.0017',
'h' : '1.67184',
'h_body' : '0.0013',
'h_shad' : '0.0011',
'l' : '1.67141',
'log_co' : '0.023',
'o' : '1.67156',
'ticks_cnt' : '35746',
'ts_begin' : '1548839178248',
'ts_end' : '1548839188248'
}
);
The Scala code is on GitHub: scala_cass_map_col.git (commit = )
Here it is, with some explanation afterwards.
import com.datastax.driver.core.{Cluster, LocalDate, Row}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._

/*
  Immutable Map: myJavaMap.asScala.toMap
  Mutable Map:   myJavaMap.asScala
*/
object ScalaCassMapCol extends App {

  object slog extends Serializable {
    @transient lazy val log = LoggerFactory.getLogger(getClass.getName)
  }

  // One table row: the key columns plus a (column name, map) pair for every bar_x column.
  case class bars(ticker_id :Int,
                  ddate     :LocalDate,
                  tdBars    :Seq[(String,Map[String,String])]){
    override def toString = "ticker_id="+ticker_id+" ["+ddate+"]" + tdBars.toString()
    // Throws NoSuchElementException if barName is not among the columns that were read.
    def getBarByName(barName : String) = {
      tdBars.find(p => (p._1==barName)).get._2
    }
  }

  // Builds a bars object from a row, reading every column listed in colNames as a map.
  val rowToBars = (row : Row, colNames : Seq[String]) => {
    new bars(
      row.getInt("ticker_id"),
      row.getDate("ddate"),
      for (cn <- colNames) yield
        (cn,row.getMap(cn, classOf[String], classOf[String]).asScala.toMap)
    )
  }

  slog.log.debug("BEGIN")
  val node: String = "193.124.112.90"
  private val cluster = Cluster.builder().addContactPoint(node).build()
  val session = cluster.connect()

  val queryTicksCountTotal =
    """ select *
          from mts_bars.td_bars_3600 """

  val pqueryTicksCountTotal = session.prepare(queryTicksCountTotal).bind()
  val bound = pqueryTicksCountTotal

  // Run the query once; the result set gives both the column metadata and the rows.
  val rsTicksCntTotal = session.execute(bound)

  /**
    * Get meta information about columns in this DataSet.
    * Column names by pattern bar_x (where x from 1 to N) for MAP fields.
    */
  val colDef = rsTicksCntTotal.getColumnDefinitions
  val colsBarsNames :Seq[String] = (for (thisColumn <- colDef.asList().asScala
                                         if thisColumn.getName().startsWith("bar_")) yield
                                      thisColumn.getName()).toSeq

  // All rows as bars objects, sorted by ticker_id in descending order.
  val dsTicksCntTotalRow : Seq[bars] = rsTicksCntTotal.all().asScala
    .toSeq.map(rowToBars(_,colsBarsNames))
    .sortBy(ft => ft.ticker_id)(Ordering[Int].reverse)

  slog.log.debug("===================================================================")
  slog.log.debug("ROWS SIZE="+dsTicksCntTotalRow.size)
  slog.log.debug(" ")
  for (rowTicksCntTotalRow <- dsTicksCntTotalRow){
    slog.log.debug("ticker_id = "+rowTicksCntTotalRow.ticker_id+" for ["+rowTicksCntTotalRow.ddate+"]")
    slog.log.debug(" BARS COUNT = "+rowTicksCntTotalRow.tdBars.size)
    slog.log.debug(" BARS WITH DATA = "+rowTicksCntTotalRow.tdBars
      .filter(p => (p._2.nonEmpty)).size)
    /**
      * local loop by nonEmpty bars.
      */
    for(neBar <- rowTicksCntTotalRow.tdBars.filter(p => (p._2.nonEmpty))){
      slog.log.debug(" ("+neBar._1+") = "+neBar._2)
    }
    slog.log.debug(" ")
  }
  slog.log.debug("===================================================================")

  /**
    * Now we can take one random bar_x from presented and save it into cassandra.
    */
  val b_tickerId : Int = dsTicksCntTotalRow(2).ticker_id
  val bOneName :String = "bar_1"
  val b : Map[String,String] = dsTicksCntTotalRow(2).getBarByName(bOneName)
  slog.log.debug("b_tickerId = "+b_tickerId+" ("+bOneName+") ="+b)

  val querySaveCountTotal =
    """ insert into mts_bars.td_bars_3600(ticker_id,ddate, bar_1,bar_2,bar_3,bar_4)
        values(?,?, ?,?,?,?) """

  val pquerySaveCountTotal = session.prepare(querySaveCountTotal)

  // bar_1 and bar_2 are written back unchanged; bar_3 and bar_4 receive a copy of bar_1.
  val boundSaveCountTotal = pquerySaveCountTotal.bind()
    .setInt("ticker_id", dsTicksCntTotalRow(2).ticker_id)
    .setDate("ddate", dsTicksCntTotalRow(2).ddate)
    .setMap("bar_1",dsTicksCntTotalRow(2).getBarByName("bar_1").asJava)
    .setMap("bar_2",dsTicksCntTotalRow(2).getBarByName("bar_2").asJava)
    .setMap("bar_3",b.asJava)
    .setMap("bar_4",b.asJava)

  val rsBar = session.execute(boundSaveCountTotal).one()

  // Close the session and the cluster so the driver threads stop.
  session.close()
  cluster.close()
}
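One caveat about getBarByName in the listing above: it calls .get on an Option, so it throws when no column has the requested name. A minimal sketch of a lookup that returns an Option instead could look like this; SafeBars and findBar are hypothetical names, not part of the repository.

```scala
object SafeBars {
  type Bar = Map[String, String]

  // Returns the map stored under barName, or None when no column has that name.
  def findBar(tdBars: Seq[(String, Bar)], barName: String): Option[Bar] =
    tdBars.collectFirst { case (name, bar) if name == barName => bar }
}
```

The caller can then decide what a missing bar means, for example SafeBars.findBar(row.tdBars, "bar_7").getOrElse(Map.empty).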
And the output:
20:30:11.135 [cluster1-nio-worker-2] DEBUG com.datastax.driver.core.Session - Added connection pool for /193.124.112.90:9042
20:30:11.376 [main] DEBUG ScalaCassMapCol$slog$ - ===================================================================
20:30:11.376 [main] DEBUG ScalaCassMapCol$slog$ - ROWS SIZE=3
20:30:11.376 [main] DEBUG ScalaCassMapCol$slog$ -
20:30:11.378 [main] DEBUG ScalaCassMapCol$slog$ - ticker_id = 3 for [2019-01-25]
20:30:11.378 [main] DEBUG ScalaCassMapCol$slog$ - BARS COUNT = 24
20:30:11.378 [main] DEBUG ScalaCassMapCol$slog$ - BARS WITH DATA = 3
20:30:11.383 [main] DEBUG ScalaCassMapCol$slog$ - (bar_1) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.45, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ - (bar_2) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.56, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ - (bar_3) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.56, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ - ticker_id = 2 for [2019-01-25]
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ - BARS COUNT = 24
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ - BARS WITH DATA = 2
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ - (bar_1) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.45, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ - (bar_2) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.56, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - ticker_id = 1 for [2019-01-25]
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - BARS COUNT = 24
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - BARS WITH DATA = 2
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - (bar_1) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.67173, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - (bar_2) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.67173, h -> 1.67184, btype -> u, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - ===================================================================
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - b_tickerId = 1 (bar_1) =Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.67173, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.479 [main] DEBUG com.datastax.driver.core.Connection - Connection[/193.124.112.90:9042-2, inFlight=0, closed=true] closing connection
In DataStax DevTool I can see that the new data was saved into bar_3 and bar_4 for ticker_id=1 (dsTicksCntTotalRow(2)).
As you can see, this example reads the fields dynamically: we don't know how many there are, only that their names follow the pattern bar_x (where x runs from 1 to N).
The variable colsBarsNames contains these names (bar_1, bar_2, ...), and I then pass it to the function rowToBars, which produces objects from rows.
val rowToBars = (row : Row, colNames : Seq[String]) => {
  new bars(
    row.getInt("ticker_id"),
    row.getDate("ddate"),
    for (cn <- colNames) yield
      (cn,row.getMap(cn, classOf[String], classOf[String]).asScala.toMap)
  )
}
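The name-pattern selection can be tried without a Cassandra connection. The sketch below is one possible way to pick the bar_x names out of a plain list of column names; it uses a regular expression and sorts by the numeric suffix, which is a stricter check than the prefix test in the post. BarColumns and barColumns are hypothetical names.

```scala
object BarColumns {
  // Matches the whole name only: "bar_" followed by one or more digits.
  private val BarName = """bar_(\d+)""".r

  // Keeps names of the form bar_<number>, ordered by that number.
  def barColumns(allColumns: Seq[String]): Seq[String] =
    allColumns
      .collect { case name @ BarName(n) => (n.toInt, name) }
      .sortBy(_._1)
      .map(_._2)
}
```

Sorting by the number rather than by the string keeps bar_10 after bar_2, which matters if the column order is later used to line bars up with hours.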
When I save data back into Cassandra, I use static code with manually listed column names:
insert into mts_bars.td_bars_3600(ticker_id,ddate, bar_1,bar_2,bar_3,bar_4)
values(?,?, ?,?,?,?)
Of course, it would be better to make this dynamic too.
My next task is to make the saving part dynamic and the code more universal, for future use in a Bar Calculator. I have a server (VDS) running MetaTrader 4 that saves all ticks from 28 symbols into Cassandra, and I need a Scala application that calculates bars and saves them into Cassandra with different widths in seconds (H1 bars span 3600 seconds, and so on).
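As a starting point for the dynamic saving part, the insert statement itself can be generated from the list of column names. This is only a sketch under my own assumptions, not the final Bar Calculator code: DynamicInsert and buildInsert are hypothetical names, and the function only builds the CQL text with one ? marker per column.

```scala
object DynamicInsert {
  // Builds "insert into <table>(<columns>) values(?, ?, ...)" with one marker per column.
  // keyColumns and barColumns are plain column names; no escaping is applied to them.
  def buildInsert(table: String, keyColumns: Seq[String], barColumns: Seq[String]): String = {
    val columns = keyColumns ++ barColumns
    val markers = Seq.fill(columns.size)("?").mkString(", ")
    s"insert into $table(${columns.mkString(", ")}) values($markers)"
  }
}
```

The resulting string could be passed to session.prepare as in the listing, with setMap then called once per bar column name in a loop instead of once per hard-coded line.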