Reading and writing Map fields in Cassandra with Scala

In this little example, I show how to read and write Map columns in Cassandra.

Here is the Cassandra table. The key columns are:

ticker_id - ID of a financial instrument, e.g. EURUSD, ORCL, AAPL, GOLD.
ddate     - a simple DAY date ('2019-01-25', '2019-01-26', ...).


CREATE TABLE mts_bars.td_bars_3600(
  ticker_id int,
  ddate     date,
  bar_1  map<text, text>,
  bar_2  map<text, text>,
  bar_3  map<text, text>,
  bar_4  map<text, text>,
  bar_5  map<text, text>,
  bar_6  map<text, text>,
  bar_7  map<text, text>,
  bar_8  map<text, text>,
  bar_9  map<text, text>,
  bar_10 map<text, text>,
  bar_11 map<text, text>,
  bar_12 map<text, text>,
  bar_13 map<text, text>,
  bar_14 map<text, text>,
  bar_15 map<text, text>,
  bar_16 map<text, text>,
  bar_17 map<text, text>,
  bar_18 map<text, text>,
  bar_19 map<text, text>,
  bar_20 map<text, text>,
  bar_21 map<text, text>,
  bar_22 map<text, text>,
  bar_23 map<text, text>,
  bar_24 map<text, text>,
 PRIMARY KEY ((ticker_id), ddate)
) WITH CLUSTERING ORDER BY (ddate DESC);


The primary key contains two columns, ticker_id and ddate:
ticker_id - the PARTITION KEY, responsible for data distribution across your nodes.
ddate     - the CLUSTERING KEY, responsible for data sorting within a partition.

The idea is that each partition (one per ticker_id) contains all dates for that ticker, and for each hour of the day (3600 seconds) there is a separate map column with bar information.
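
For example, reading the bar of one particular hour for one day touches just a single map column. A minimal sketch with the DataStax Java driver (it assumes a connected session like the one created in the Scala code below):

// Sketch: fetch one hour's bar (bar_10) for ticker 1 on one day.
// The map<text, text> column arrives as a java.util.Map.
import scala.collection.JavaConverters._

val row = session.execute(
  "select bar_10 from mts_bars.td_bars_3600 where ticker_id = 1 and ddate = '2019-01-25'").one()

val bar10 : Map[String,String] =
  row.getMap("bar_10", classOf[String], classOf[String]).asScala.toMap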

 You can get the same partition and clustering schema this way too:

...
 PRIMARY KEY (ticker_id, ddate)
) WITH CLUSTERING ORDER BY (ddate DESC);
...


Note that you can't create a table with the following code:

...
 PRIMARY KEY ((ticker_id, ddate))
) WITH CLUSTERING ORDER BY (ddate DESC);


An error is raised (cqlsh reports that only clustering key columns can be defined in the CLUSTERING ORDER directive), because there is no clustering key here: ((ticker_id, ddate)) declares a composite partition key, with a separate partition created for each ticker_id+ddate pair. Without the CLUSTERING ORDER clause this variant is valid:


...
  PRIMARY KEY ((ticker_id, ddate))
);



With this schema we would get 365 partitions per ticker per year, one day per partition, and each partition would hold a single row. A bad idea.
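
The difference also shows up in the queries each schema allows; a quick sketch (hypothetical queries, using a session like the one created in the Scala code below):

// Original schema ((ticker_id), ddate): an efficient day-range scan
// inside one partition is possible.
val lastDays = session.execute(
  """select * from mts_bars.td_bars_3600
     where ticker_id = 1 and ddate >= '2019-01-01'""").all()

// Composite partition key ((ticker_id, ddate)): Cassandra rejects the
// range query above; only an exact match on both components works.
val oneDay = session.execute(
  """select * from mts_bars.td_bars_3600
     where ticker_id = 1 and ddate = '2019-01-25'""").one()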
OK, let's move on. We create the table with the original schema and fill it with the following test data.



...
 PRIMARY KEY ((ticker_id), ddate)
) WITH CLUSTERING ORDER BY (ddate DESC);

insert into mts_bars.td_bars_3600(ticker_id,ddate,bar_1,bar_2) values(1,'2019-01-25',
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.67173',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
},
{
  'bar_width_sec' : '3659',
  'btype' : 'u',
  'c' : '1.67173',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
}
);

insert into mts_bars.td_bars_3600(ticker_id,ddate,bar_1,bar_2) values(2,'2019-01-25',
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.45',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
},
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.56',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
}
);

insert into mts_bars.td_bars_3600(ticker_id,ddate,bar_1,bar_2,bar_3) values(3,'2019-01-25',
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.45',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
},
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.56',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
},
{
  'bar_width_sec' : '3659',
  'btype' : 'd',
  'c' : '1.56',
  'disp' : '0.0017',
  'h' : '1.67184',
  'h_body' : '0.0013',
  'h_shad' : '0.0011',
  'l' : '1.67141',
  'log_co' : '0.023',
  'o' : '1.67156',
  'ticks_cnt' : '35746',
  'ts_begin' : '1548839178248',
  'ts_end' : '1548839188248'
}
);



The Scala code is on GitHub: scala_cass_map_col.git (commit "Commit for article").

And here is some explanation.


import com.datastax.driver.core.{Cluster, LocalDate, Row}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters
import scala.collection.JavaConverters._

/*
 Immutable Map myJavaMap.asScala.toMap
 Mutable Map   myJavaMap.asScala
*/
object ScalaCassMapCol extends App {

  object slog extends Serializable {
    @transient lazy val log = LoggerFactory.getLogger(getClass.getName)
  }

  case class bars(ticker_id :Int,
                  ddate     :LocalDate,
                  tdBars    :Seq[(String,Map[String,String])]){
    override def toString = "ticker_id="+ticker_id+" ["+ddate+"] "+tdBars.toString()
    // Returns an empty Map when the requested bar is absent,
    // instead of throwing NoSuchElementException.
    def getBarByName(barName : String) :Map[String,String] =
      tdBars.find(_._1 == barName).map(_._2).getOrElse(Map.empty)
  }

  // Builds a bars object from a driver Row: each bar_x column's
  // java.util.Map is converted into an immutable Scala Map.
  val rowToBars = (row : Row, colNames : Seq[String]) => {
    new bars(
      row.getInt("ticker_id"),
      row.getDate("ddate"),
      for (cn <- colNames) yield
        (cn,row.getMap(cn, classOf[String], classOf[String]).asScala.toMap)
    )
  }

  slog.log.debug("BEGIN")
  val node: String = "193.124.112.90"
  private val cluster = Cluster.builder().addContactPoint(node).build()
  val session = cluster.connect()

  val queryTicksCountTotal =
    """ select *
          from mts_bars.td_bars_3600 """

  val bound = session.prepare(queryTicksCountTotal).bind()

  /**
    * Get meta information about columns in this DataSet.
    * Column names by pattern bar_x (where x from 1 to N) for MAP fields.
    */
  val colDef = session.execute(bound).getColumnDefinitions
  val colsBarsNames :Seq[String] =
    for (thisColumn <- colDef.asList().asScala
         if thisColumn.getName().startsWith("bar_")) yield
      thisColumn.getName()

  val dsTicksCntTotalRow : Seq[bars] = JavaConverters.asScalaIteratorConverter(session.execute(bound)
                                                                               .all().iterator())
    .asScala
    .toSeq.map(rowToBars(_,colsBarsNames))
    .sortBy(ft => ft.ticker_id)(Ordering[Int].reverse)

  slog.log.debug("===================================================================")
  slog.log.debug("ROWS SIZE="+dsTicksCntTotalRow.size)
  slog.log.debug(" ")
  for (rowTicksCntTotalRow <- dsTicksCntTotalRow){
    slog.log.debug("ticker_id = "+rowTicksCntTotalRow.ticker_id+" for ["+rowTicksCntTotalRow.ddate+"]")
    slog.log.debug("  BARS COUNT = "+rowTicksCntTotalRow.tdBars.size)
    slog.log.debug("    BARS WITH DATA = "+rowTicksCntTotalRow.tdBars
                                           .filter(p => (p._2.nonEmpty)).size)
    /**
      * local loop by nonEmpty bars.
      */
      for(neBar <- rowTicksCntTotalRow.tdBars.filter(p => (p._2.nonEmpty))){
        slog.log.debug("      ("+neBar._1+") = "+neBar._2)
      }
    slog.log.debug(" ")
  }
  slog.log.debug("===================================================================")

  /**
    * Now we can take one random bar_x from presented and save it into cassandra.
    */
  val b_tickerId : Int = dsTicksCntTotalRow(2).ticker_id
  val bOneName :String = "bar_1"
  val b : Map[String,String] = dsTicksCntTotalRow(2).getBarByName(bOneName)
  slog.log.debug("b_tickerId = "+b_tickerId+" ("+bOneName+") ="+b)


  val querySaveCountTotal =
    """ insert into mts_bars.td_bars_3600(ticker_id,ddate,  bar_1,bar_2,bar_3,bar_4) 
                      values(?,?,  ?,?,?,?) """
  val pquerySaveCountTotal = session.prepare(querySaveCountTotal)

  val boundSaveCountTotal = pquerySaveCountTotal.bind()
    .setInt("ticker_id", dsTicksCntTotalRow(2).ticker_id)
    .setDate("ddate", dsTicksCntTotalRow(2).ddate)
    // write the same test bar into all four columns
    .setMap("bar_1",dsTicksCntTotalRow(2).getBarByName("bar_1").asJava)
    .setMap("bar_2",dsTicksCntTotalRow(2).getBarByName("bar_1").asJava)
    .setMap("bar_3",b.asJava)
    .setMap("bar_4",b.asJava)

  session.execute(boundSaveCountTotal)

  session.close()
  cluster.close()  // otherwise the driver's threads keep the JVM alive
}
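
Before looking at the output, a side note on the JavaConverters comment at the top of the program: asScala on a java.util.Map gives a mutable Scala wrapper over the same Java object, while asScala.toMap copies it into an independent immutable Scala Map. A tiny standalone illustration:

import scala.collection.JavaConverters._

val javaMap = new java.util.HashMap[String, String]()
javaMap.put("o", "1.67156")

// Mutable view: a wrapper, later changes to javaMap show through.
val mutableView : scala.collection.mutable.Map[String,String] = javaMap.asScala

// Immutable copy: detached from javaMap from this point on.
val immutableCopy : Map[String,String] = javaMap.asScala.toMap

javaMap.put("c", "1.67173")
// mutableView now contains "c"; immutableCopy still has only "o".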


And the output:


20:30:11.135 [cluster1-nio-worker-2] DEBUG com.datastax.driver.core.Session - Added connection pool for /193.124.112.90:9042
20:30:11.376 [main] DEBUG ScalaCassMapCol$slog$ - ===================================================================
20:30:11.376 [main] DEBUG ScalaCassMapCol$slog$ - ROWS SIZE=3
20:30:11.376 [main] DEBUG ScalaCassMapCol$slog$ -  
20:30:11.378 [main] DEBUG ScalaCassMapCol$slog$ - ticker_id = 3 for [2019-01-25]
20:30:11.378 [main] DEBUG ScalaCassMapCol$slog$ -   BARS COUNT = 24
20:30:11.378 [main] DEBUG ScalaCassMapCol$slog$ -     BARS WITH DATA = 3
20:30:11.383 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_1) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.45, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_2) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.56, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_3) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.56, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -  
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ - ticker_id = 2 for [2019-01-25]
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -   BARS COUNT = 24
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -     BARS WITH DATA = 2
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_1) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.45, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_2) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.56, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.384 [main] DEBUG ScalaCassMapCol$slog$ -  
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - ticker_id = 1 for [2019-01-25]
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -   BARS COUNT = 24
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -     BARS WITH DATA = 2
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_1) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.67173, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -       (bar_2) = Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.67173, h -> 1.67184, btype -> u, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ -  
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - ===================================================================
20:30:11.385 [main] DEBUG ScalaCassMapCol$slog$ - b_tickerId = 1 (bar_1) =Map(ts_end -> 1548839188248, ticks_cnt -> 35746, h_body -> 0.0013, l -> 1.67141, h_shad -> 0.0011, disp -> 0.0017, bar_width_sec -> 3659, c -> 1.67173, h -> 1.67184, btype -> d, log_co -> 0.023, ts_begin -> 1548839178248, o -> 1.67156)
20:30:11.479 [main] DEBUG com.datastax.driver.core.Connection - Connection[/193.124.112.90:9042-2, inFlight=0, closed=true] closing connection


With the DataStax DevTool I can see that the new data was saved into bar_3 and bar_4 for ticker_id=1 (dsTicksCntTotalRow(2)).
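
The same check can also be done from code; a minimal sketch, run before session.close():

// Sketch: read back the row we just wrote and inspect bar_3.
val check = session.execute(
  """select bar_3, bar_4 from mts_bars.td_bars_3600
     where ticker_id = 1 and ddate = '2019-01-25'""").one()

val bar3 : Map[String,String] =
  check.getMap("bar_3", classOf[String], classOf[String]).asScala.toMap
slog.log.debug("bar_3 after save = " + bar3)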

As you can see, in this example the fields are read dynamically: we don't know in advance how many there are, only the name pattern bar_x (where x goes from 1 to N).

The variable colsBarsNames contains these names (bar_1, bar_2, ...), and then I use it in the function rowToBars, which produces objects from rows:


val rowToBars = (row : Row, colNames : Seq[String]) => {
    new bars(
      row.getInt("ticker_id"),
      row.getDate("ddate"),
      for (cn <- colNames) yield
        (cn,row.getMap(cn, classOf[String], classOf[String]).asScala.toMap)
    )
  }


When I save data back into Cassandra, I use static code and set the column names manually:


insert into mts_bars.td_bars_3600(ticker_id,ddate,  bar_1,bar_2,bar_3,bar_4) 
                                   values(?,?,  ?,?,?,?)


Of course, it would be better to make this part dynamic too; a sketch of one possible approach follows.
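
This is only a sketch, not the final code; it reuses colsBarsNames, dsTicksCntTotalRow and getBarByName from the program above and builds the column list and the placeholders from the discovered names:

// Sketch: build the INSERT dynamically from the discovered bar_x names.
val dynamicInsert =
  s"""insert into mts_bars.td_bars_3600 (ticker_id, ddate, ${colsBarsNames.mkString(",")})
      values (?, ?, ${colsBarsNames.map(_ => "?").mkString(",")})"""

val pDynamic = session.prepare(dynamicInsert)
val srcRow = dsTicksCntTotalRow(2)

// Bind the key columns, then every map column by its name.
val boundDynamic = colsBarsNames.foldLeft(
  pDynamic.bind()
    .setInt("ticker_id", srcRow.ticker_id)
    .setDate("ddate", srcRow.ddate)
)((bnd, cn) => bnd.setMap(cn, srcRow.getBarByName(cn).asJava))

session.execute(boundDynamic)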

My next task is to make the saving part dynamic too, and to make the code more universal for future use in my Bar Calculator. I have a server (VDS) where MetaTrader 4 is running and saving all ticks from 28 symbols into Cassandra, and I need a Scala application that calculates bars of different depths in seconds (an h1 bar contains 3600 seconds, etc.) and saves them into Cassandra.
