Tickerplant Logger Service

In a typical real-time tickerplant environment, the tickerplant service logs all updates to a file on disk. This introduces a dependency on the filesystem, which can pose problems such as I/O latency. To mitigate these problems, we can have a dedicated service responsible for logging updates. This also allows for more customized logic because the logger service can spend extra time performing more complex tasks, while the tickerplant cannot as it needs to tend to new messages from upstream at all times.

We'll be modifying the standard tickerplant code found here: https://github.com/KxSystems/kdb-tick

The big change to tick.q is the removal of all logging related code. The auxiliary file u.q has been kept the same. The file tplogger.q represents the tickerplant logger service. The logger has command line arguments passed into it that specify the tickerplant address and the location of the log directory. On startup, the logger connects via IPC to the tickerplant. If the logger is on the same server as the tickerplant, it will connect via Unix Domain Sockets for faster transmission of data. The logger then subscribes to the tickerplant and retrieves the current date from the tickerplant, and creates the log file for that date. After this, the logger simply waits for updates from the upstream tickerplant.

The upd callback function, which is invoked by the tickerplant, receives the update message from the tickerplant and appends it to the logfile. It also increments the logger's internal log count by one. The log count can be leveraged by external services which need to replay tickerplant logs. The .u.end callback closes the current day's log file, creates a log file for the new day, and opens a new file handle to the file. The .z.ts timer function tries reconnecting to the tickerplant in case the IPC connection becomes disconnected. The .z.pc callback will attempt reconnection to the tickerplant after 5 seconds of sleeping (works for Unix based OS only).

Below is a sample way of starting up the tickerplant and logger in tandem in a Unix environment:

$ q tick.q sym -p 5001
$ q tplogger.q -tp :5001 -outpath logs -t 5000

Code:

                                
                                ######## tick.q ########

/ q tick.q sym -p 5001 
system"l tick/",(first .z.x,enlist"sym"),".q"

if[not system"p";system"p 5010"]

\l u.q
\d .u
tick:{init[];if[not min(`time`sym~2#key flip value@)each t;'`timesym];@[;`sym;`g#]each t;d::.z.D;};

endofday:{end d;d+:1;};
ts:{if[dtype first x;a,x;(enlist(count first x)#a),x]];
 t insert x;}];

if[not system"t";system"t 1000";
 .z.ts:{ts .z.D};
 upd:{[t;x]ts"d"$a:.z.P;
 if[not -16=type first first x;a:"n"$a;x:$[0>type first x;a,x;(enlist(count first x)#a),x]];
 f:key flip value t;pub[t;$[0>type first x;enlist f!x;flip f!x]];}];

\d .
.u.tick[];

######## tplogger.q ########

\d .tpl
opts:.Q.def[`tp`outpath!(enlist":5010";`:.)] .Q.opt .z.x;
tpHostPort:hsym `$raze ":",opts`tp;
tpLogDir:hsym opts`outpath;

connectTP:{
  host:@[;1] hp:`$":" vs string tpHostPort;
  if[host in ``localhost,.z.h;tpHostPort::` sv `:unix:/,hp 2];
  tpH::@[hopen;(tpHostPort;1000);0Ni];
 };

openLog:{[dir;date]
  tpLogFile::` sv dir, ` sv `tp,`$string date;
  tpLogFile set ();
  tpLogH::hopen tpLogFile;
 }[tpLogDir];

init:{
  connectTP[];
  tpDate:last .tpl.tpH"(.u.sub[`;`];.u.d)";
  openLog[tpDate];
 };

\d .

upd:{[tbl;data]
  .tpl.tpLogH enlist (`upd;tbl;data);
  .tpl.logCount+:1;
 };

.u.end:{
  hclose .tpl.tpLogH;
  .tpl.openLog x+1;
  .tpl.logCount:0;
 };

.z.ts:{
  if[null .tpl.tpH;
    -1"Retrying connection to tickerplant";
    .tpl.connectTP[];
  ];
 };

.z.pc:{
  system"sleep 5s";
  -1"Retrying connection to tickerplant";
  .tpl.connectTP[];
 };

.tpl.init[];


                                
                            
Tags:
architecture asynchronous ipc optimizations realtime streaming
Searchable Tags
algorithms api architecture asynchronous c csv data structures dictionaries disk feedhandler finance functions ingestion ipc iterators machine learning math multithreading optimizations realtime shared library sql statistics streaming strings tables temporal utility websockets