Read/Write Map fields in Cassandra with Scala

In this little example, I show how to read and write Map columns in Cassandra.

The Cassandra table. Here we have the key columns:

ticker_id - ID of a financial instrument, like EURUSD, ORCL, AAPL, GOLD.
ddate     - a simple calendar date. ('2019-01-25', '2019-01-26', ...)


CREATE TABLE mts_bars.td_bars_3600(
  ticker_id int,
  ddate     date,
  bar_1 map<text, text>,
  bar_2 map<text, text>,
  bar_3 map<text, text>,
  bar_4 map<text, text>,
  bar_5 map<text, text>,
  bar_6 map<text, text>,
  bar_7 map<text, text>,
  bar_8 map<text, text>,
  bar_9 map<text, text>,
  bar_10 map<text, text>,
  bar_11 map<text, text>,
  bar_12 map<text, text>,
  bar_13 map<text, text>,
  bar_14 map<text, text>,
  bar_15 map<text, text>,
  bar_16 map<text, text>,
  bar_17 map<text, text>,
  bar_18 map<text, text>,
  bar_19 map<text, text>,
  bar_20 map<text, text>,
  bar_21 map<text, text>,
  bar_22 map<text, text>,
  bar_23 map<text, text>,
  bar_24 map<text, text>,
 PRIMARY KEY ((ticker_id), ddate)
) WITH CLUSTERING ORDER BY (ddate DESC);


The primary key contains 2 columns, ticker_id and ddate:
ticker_id - the PARTITION KEY; it is responsible for distributing data across your nodes.
ddate     - the CLUSTERING KEY; it is responsible for sorting data within a partition.

The idea: each partition (one per ticker_id) contains all dates for that ticker_id, and for each hour (3600 seconds) there is a separate column with bar information.
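This layout makes the common read cheap: all days of one ticker live in a single partition and come back already sorted newest-first. Below is a minimal sketch of such a single-partition read; the object name SinglePartitionRead and the LIMIT value are mine, the contact point is the one used later in the article.

import com.datastax.driver.core.Cluster
import scala.collection.JavaConverters._

object SinglePartitionRead extends App {
  val cluster = Cluster.builder().addContactPoint("193.124.112.90").build()
  val session = cluster.connect()

  // One partition (ticker_id = 1), newest days first thanks to
  // CLUSTERING ORDER BY (ddate DESC) - no ORDER BY needed in the query.
  val rs = session.execute(
    session.prepare(
      "select ticker_id, ddate, bar_1 from mts_bars.td_bars_3600 " +
        "where ticker_id = ? limit 10").bind(Int.box(1)))

  for (row <- rs.iterator().asScala)
    println(row.getInt("ticker_id") + " " + row.getDate("ddate"))

  session.close()
  cluster.close()
}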

 You can get the same partitioning and clustering schema this way too:

...
 PRIMARY KEY (ticker_id, ddate)
) WITH CLUSTERING ORDER BY (ddate DESC);
...


Note that you can't create a table with the following code:

...
 PRIMARY KEY ((ticker_id, ddate))
) WITH CLUSTERING ORDER BY (ddate DESC);


An error is raised, because there is no clustering key here: ((ticker_id, ddate)) defines a composite partition key, i.e. a separate partition is created for each ticker_id+ddate pair, so CLUSTERING ORDER BY (ddate DESC) has nothing to order. Drop the clustering clause and the composite-key variant becomes valid:


...
  PRIMARY KEY ((ticker_id, ddate))
);



But with that schema we would get 365 partitions per ticker per year, one day per partition, each partition holding a single row. A bad idea.
OK, moving on: we create the table with the original schema and insert the following test data.



...
 PRIMARY KEY ((ticker_id), ddate)
) WITH CLUSTERING ORDER BY (ddate DESC);

insert into mts_bars.td_bars_3600(ticker_id,ddate,bar_1,bar_2) values(1,'2019-01-25',
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.67173',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
},
{
  'bar_width_sec' : '3659',
  'btype' : 'u',
  'c' : '1.67173',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
}
);

insert into mts_bars.td_bars_3600(ticker_id,ddate,bar_1,bar_2) values(2,'2019-01-25',
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.45',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
},
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.56',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
}
);

insert into mts_bars.td_bars_3600(ticker_id,ddate,bar_1,bar_2,bar_3) values(3,'2019-01-25',
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.45',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
},
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.56',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
},
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.56',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
}
);



The Scala code is on GitHub: scala_cass_map_col.git (commit = "Commit for article").

And here is some explanation.


import com.datastax.driver.core.{Cluster, LocalDate, Row}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

/*
 Immutable Map myJavaMap.asScala.toMap
 Mutable Map   myJavaMap.asScala
*/
object ScalaCassMapCol extends App {

  object slog extends Serializable {
    @transient lazy val log = LoggerFactory.getLogger(getClass.getName)
  }

  case class bars(ticker_id :Int,
                  ddate     :LocalDate,
                  tdBars    :Seq[(String,Map[String,String])]){
    override def toString = "ticker_id="+ticker_id+"  ["+ddate+"]" + tdBars.toString()
    def getBarByName(barName : String) =
      tdBars.find(_._1 == barName).get._2
  }

  val rowToBars = (row : Row, colNames : Seq[String]) => {
    new bars(
      row.getInt("ticker_id"),
      row.getDate("ddate"),
      for (cn <- colNames) yield
        (cn,row.getMap(cn, classOf[String], classOf[String]).asScala.toMap)
    )
  }

  slog.log.debug("BEGIN")
  val node: String = "193.124.112.90"
  private val cluster = Cluster.builder().addContactPoint(node).build()
  val session = cluster.connect()

  val queryTicksCountTotal =
    """ select *
          from mts_bars.td_bars_3600 """

  val bound = session.prepare(queryTicksCountTotal).bind()

  /**
    * Get meta information about the columns in this result set:
    * the MAP field names matching the pattern bar_x (x from 1 to N).
    */
  val colDef = session.execute(bound).getColumnDefinitions
  val colsBarsNames :Seq[String] = for (thisColumn <- colDef.asList().asScala
                                       if thisColumn.getName().startsWith("bar_")) yield
    thisColumn.getName()

  val dsTicksCntTotalRow : Seq[bars] = session.execute(bound).all().iterator()
    .asScala
    .toSeq.map(rowToBars(_,colsBarsNames))
    .sortBy(ft => ft.ticker_id)(Ordering[Int].reverse)

  slog.log.debug("===================================================================")
  slog.log.debug("ROWS SIZE="+dsTicksCntTotalRow.size)
  slog.log.debug(" ")
  for (rowTicksCntTotalRow <- dsTicksCntTotalRow){
    slog.log.debug("ticker_id = "+rowTicksCntTotalRow.ticker_id+" for ["+rowTicksCntTotalRow.ddate+"]")
    slog.log.debug("  BARS COUNT = "+rowTicksCntTotalRow.tdBars.size)
    slog.log.debug("    BARS WITH DATA = "+rowTicksCntTotalRow.tdBars
                                           .filter(p => (p._2.nonEmpty)).size)
    /**
      * local loop by nonEmpty bars.
      */
      for(neBar <- rowTicksCntTotalRow.tdBars.filter(p => (p._2.nonEmpty))){
        slog.log.debug("      ("+neBar._1+") = "+neBar._2)
      }
    slog.log.debug(" ")
  }
  slog.log.debug("===================================================================")

  /**
    * Now we can take one of the bar_x maps we just read and save it back into Cassandra.
    */
  val b_tickerId : Int = dsTicksCntTotalRow(2).ticker_id
  val bOneName :String = "bar_1"
  val b : Map[String,String] = dsTicksCntTotalRow(2).getBarByName(bOneName)
  slog.log.debug("b_tickerId = "+b_tickerId+" ("+bOneName+") ="+b)


  val querySaveCountTotal =
    """ insert into mts_bars.td_bars_3600(ticker_id,ddate,  bar_1,bar_2,bar_3,bar_4) 
                      values(?,?,  ?,?,?,?) """
  val pquerySaveCountTotal = session.prepare(querySaveCountTotal)

  val boundSaveCountTotal = pquerySaveCountTotal.bind()
    .setInt("ticker_id", dsTicksCntTotalRow(2).ticker_id)
    .setDate("ddate", dsTicksCntTotalRow(2).ddate)
    .setMap("bar_1",dsTicksCntTotalRow(2).getBarByName("bar_1").asJava)
    .setMap("bar_2",dsTicksCntTotalRow(2).getBarByName("bar_1").asJava)
    .setMap("bar_3",b.asJava)
    .setMap("bar_4",b.asJava)

  session.execute(boundSaveCountTotal) // an INSERT returns no rows, nothing useful to keep

  session.close()
}
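A side note on the JavaConverters comment at the top of the listing: row.getMap returns a java.util.Map, and .asScala only wraps it in a mutable Scala Map, while the extra .toMap copies it into an immutable one. A minimal standalone sketch of the difference (no Cassandra needed; ConvertersDemo is just an illustration name):

import scala.collection.JavaConverters._

object ConvertersDemo extends App {
  val javaMap = new java.util.HashMap[String, String]()
  javaMap.put("o", "1.67156")
  javaMap.put("c", "1.67173")

  val mutableView  = javaMap.asScala        // wrapper: sees later changes
  val immutableMap = javaMap.asScala.toMap  // snapshot: detached copy

  javaMap.put("h", "1.67184")
  println(mutableView.size)  // 3 - the wrapper reflects the new entry
  println(immutableMap.size) // 2 - the snapshot does not
}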


And the output:


20:30:11.135 [cluster1-nio-worker-2] DEBUG com.datastax.driver.core.Session - Added connection pool for /193.124.112.90:9042
20:30:11.376 [main] DEBUG ScalaCassMapCol$slog$ - ===================================================================
20:30:11.376 [main] DEBUG ScalaCassMapCol$slog$ - ROWS SIZE=3
20:30:11.376 [main] DEBUG ScalaCassMapCol$slog$ -  
20:30:11.378 [main] DEBUG ScalaCassMapCol$slog$ - ticker_id = 3 for [2019-01-25]
20:30:11.378 [main] DEBUG ScalaCassMapCol$slog$ -   BARS COUNT = 24
20:30:11.378 [main] DEBUG ScalaCassMapCol$slog$ -     BARS WITH DATA = 3
20:30:11.383 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_1) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.45, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_2) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.56, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_3) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.56, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -  
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ - ticker_id = 2 for [2019-01-25]
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -   BARS COUNT = 24
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -     BARS WITH DATA = 2
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_1) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.45, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_2) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.56, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -  
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - ticker_id = 1 for [2019-01-25]
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -   BARS COUNT = 24
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -     BARS WITH DATA = 2
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_1) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.67173, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_2) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.67173, h -> 1.67184, btype -> u, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -  
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - ===================================================================
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - b_tickerId = 1 (bar_1) =Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.67173, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.479 [main] DEBUG com.datastax.driver.core.Connection - Connection[/193.124.112.90:9042-2, inFlight=0, closed=true] closing connection


With DataStax DevTool I can see that the new data was saved into bar_3 and bar_4 for ticker_id=1
(dsTicksCntTotalRow(2)).
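The same check can also be done from code. A minimal sketch that reads those two columns back (VerifySave is just an illustration name; the date is the test date used in the inserts above):

import com.datastax.driver.core.{Cluster, LocalDate}
import scala.collection.JavaConverters._

object VerifySave extends App {
  val cluster = Cluster.builder().addContactPoint("193.124.112.90").build()
  val session = cluster.connect()

  // Read back exactly the columns the program wrote for ticker_id = 1.
  val row = session.execute(
    session.prepare(
      "select bar_3, bar_4 from mts_bars.td_bars_3600 " +
        "where ticker_id = ? and ddate = ?")
      .bind(Int.box(1), LocalDate.fromYearMonthDay(2019, 1, 25))).one()

  println("bar_3 = " + row.getMap("bar_3", classOf[String], classOf[String]).asScala.toMap)
  println("bar_4 = " + row.getMap("bar_4", classOf[String], classOf[String]).asScala.toMap)

  session.close()
  cluster.close()
}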

As you can see, in this example the fields are read dynamically: we don't know in advance how many there are, only the name pattern bar_x (where x goes from 1 to N).

The variable colsBarsNames contains these names (bar_1, bar_2, ...), and I then use it in the function rowToBars, which produces objects from rows:


val rowToBars = (row : Row, colNames : Seq[String]) => {
  new bars(
    row.getInt("ticker_id"),
    row.getDate("ddate"),
    for (cn <- colNames) yield
      (cn,row.getMap(cn, classOf[String], classOf[String]).asScala.toMap)
  )
}


When I save data back into Cassandra I use static code, with manually listed column names:


insert into mts_bars.td_bars_3600(ticker_id,ddate,  bar_1,bar_2,bar_3,bar_4) 
                                   values(?,?,  ?,?,?,?)


Of course, it would be better to use a dynamic approach here too.
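One possible direction, as a sketch only: build the column list and the placeholders from colsBarsNames instead of hard-coding them. buildDynamicInsert below is a hypothetical helper of mine (it is not in the repo), and the snippet assumes it is pasted into ScalaCassMapCol so that session, colsBarsNames, b_tickerId and dsTicksCntTotalRow are in scope.

  // Hypothetical helper (not in the repo): prepare an INSERT whose bar_x
  // column list comes from the names discovered at runtime.
  def buildDynamicInsert(barCols: Seq[String]) = {
    val cols         = ("ticker_id" +: "ddate" +: barCols).mkString(",")
    val placeholders = Seq.fill(barCols.size + 2)("?").mkString(",")
    session.prepare(s"insert into mts_bars.td_bars_3600($cols) values($placeholders)")
  }

  // Bind the key columns by name, then every discovered map column in a loop
  // (empty bars simply write empty maps).
  val dynBound = colsBarsNames.foldLeft(
    buildDynamicInsert(colsBarsNames).bind()
      .setInt("ticker_id", b_tickerId)
      .setDate("ddate", dsTicksCntTotalRow(2).ddate)) {
    (st, cn) => st.setMap(cn, dsTicksCntTotalRow(2).getBarByName(cn).asJava)
  }
  session.execute(dynBound)

Preparing the statement once and rebinding it per row would keep this dynamic variant as cheap as the static one.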

My next task is to make the saving part dynamic as well and to make the code more universal for future use in a Bar Calculator. I have a server (VDS) where MetaTrader 4 is running and saving all ticks from 28 symbols into Cassandra, and I need a Scala application that calculates bars of different widths in seconds (h1 bars span 3600 seconds, etc.) and saves them into Cassandra.
