Streaming Compressed CSV

Question: Loading data from CSV files (usually stored on disk) into kdb+ is a common operation. Most people are familiar with loading an entire CSV into memory at once using 0:. For large files, however, this can be catastrophic: peak memory usage can grow large enough to cause thrashing or get the process killed by the OOM killer. Instead, a file can be streamed in chunks with the streaming function .Q.fsn, which reads a given number of bytes at a time (as complete lines) and passes each chunk to a user-supplied function. Because only one chunk is held in memory at a time, memory usage stays low. This approach, however, requires an uncompressed file, so compressed files (e.g. TAQ files) must first be decompressed to disk. That extra step can add significant overhead.
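The two disk-based approaches described above can be sketched as follows. This is one possible sketch, not a reference solution. It assumes the globals cls, typs, dlm, chunksize and the empty quote tables from the example below are already defined, that gzip is available in a Unix-like shell, and that the decompressed copy can be written next to the original with the ".gz" suffix dropped (a hypothetical naming choice). regularFn parses the whole file with one 0: call; fsnFn hands chunks of complete lines to a parser via .Q.fsn[function;file;chunk size in bytes]. Since only the first chunk contains the header, fsnFn drops it by pattern, assuming it starts with "Time|Exchange".

```q
// Sketch only: assumes cls, typs, dlm, chunksize and the empty quote tables from the
// example are defined, and that gzip is on the PATH of a Unix-like shell.

// regularFn: decompress to disk, then parse the whole file with a single 0: call.
// The enlisted delimiter tells 0: that the first line is a header row.
regularFn:{[f]
  tmp:-3_string f;                                  // hypothetical temp name: "x.gz" -> "x"
  system"gzip -dc ",string[f]," > ",tmp;            // extra decompress-to-disk step
  t:(typs;enlist dlm) 0: hsym `$tmp;                // whole table is parsed in memory at once
  `quote3 set cls xcol t;                           // apply the given column names
  hdel hsym `$tmp;                                  // remove the decompressed copy
 }

// fsnFn: decompress to disk, then stream it in chunks of chunksize bytes.
// Each chunk is a list of lines; the header line (assumed "Time|Exchange...") is dropped,
// and passing the delimiter as an atom makes 0: return header-less column vectors.
fsnFn:{[f]
  tmp:-3_string f;                                  // hypothetical temp name: "x.gz" -> "x"
  system"gzip -dc ",string[f]," > ",tmp;            // same decompress-to-disk step
  .Q.fsn[{`quote2 insert flip cls!(typs;dlm) 0: x where not x like "Time|Exchange*"};hsym `$tmp;chunksize];
  hdel hsym `$tmp;                                  // remove the decompressed copy
 }
```

Filtering the header with like on every chunk keeps the parser stateless at the cost of one extra scan per chunk; tracking whether the first chunk has already been seen would avoid that scan.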

Luckily, there is a way to read directly from compressed files using Unix named pipes. A named pipe (FIFO) is a special file on the filesystem that passes data from a writing process to a reading process without storing that data on disk. We can run a decompression command that writes its output into a named pipe and, from q, stream that output in chunks. This avoids the overhead of decompressing to disk while preserving the streaming capability. See the link below for more information on streaming from named pipes in q.
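The named-pipe approach can be sketched along the lines of the KX named-pipes article linked below: create a FIFO with mkfifo, have gzip write the decompressed stream into it from a background shell job, and read it from q in chunks with .Q.fpn (kdb+ 3.4 or later; .Q.fps is the variant with the default chunk size). This is a sketch under stated assumptions, not a definitive implementation: the FIFO name quote_fifo is hypothetical, the globals cls, typs, dlm, chunksize and the empty quote tables from the example below are assumed to exist, and the header line is assumed to start with "Time|Exchange".

```q
// Sketch only: assumes cls, typs, dlm, chunksize and the empty quote tables from the
// example are defined, kdb+ 3.4+ (for .Q.fpn), and mkfifo/gzip on a Unix-like system.
pipeFn:{[f]
  fifo:"quote_fifo";                                // hypothetical FIFO name in the working directory
  system"rm -f ",fifo," && mkfifo ",fifo;           // (re)create the named pipe
  system"gzip -dc ",string[f]," > ",fifo," &";      // background writer: decompressed data goes into the pipe
  .Q.fpn[{`quote1 insert flip cls!(typs;dlm) 0: x where not x like "Time|Exchange*"};hsym `$fifo;chunksize];
  system"rm -f ",fifo;                              // remove the FIFO once it has been drained
 }
```

The trailing & matters: opening a FIFO for writing blocks until a reader opens the other end, so running gzip in the foreground would leave system waiting before .Q.fpn ever starts reading. Unlike the disk-based versions, no decompressed copy of the file is ever written.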

Given the NYSE quote file here, define functions 'pipeFn', 'fsnFn' and 'regularFn' that each take the compressed file as an argument and store its contents in quote1, quote2 and quote3, respectively. The column names, types and delimiter are given below.

More Information:

https://code.kx.com/q/kb/named-pipes/

Example

cls:`$"|" vs "Time|Exchange|Symbol|Bid_Price|Bid_Size|Offer_Price|Offer_Size|Quote_Condition|Sequence_Number|National_BBO_Ind|FINRA_BBO_Indicator|FINRA_ADF_MPID_Indicator|Quote_Cancel_Correction|Source_Of_Quote|Retail_Interest_Indicator|Short_Sale_Restriction_Indicator|LULD_BBO_Indicator|SIP_Generated_Message_Identifier|National_BBO_LULD_Indicator|Participant_Timestamp|FINRA_ADF_Timestamp|FINRA_ADF_Market_Participant_Quote_Indicator|Security_Status_Indicator";
typs:"NSSFJFJSJSSHSSSSSSSNNSS";
dlm:"|";
quote1:quote2:quote3:flip cls!typs$count[typs]#();
chunksize:100000000; //100MB chunks
system"g 1"; // immediate garbage-collection mode, so freed memory is returned to the OS promptly
// Time each function
timeFunc:{
    start:.z.N;
    x @ `SPLITS_US_ALL_BBO_A_20180103.gz;
    tdlta:.z.N - start;
    -1 string[x], " took ", string tdlta;
 };

// After defining your functions

q)timeFunc each `pipeFn`fsnFn`regularFn;
pipeFn took 0D00:02:38.340292000
fsnFn took 0D00:03:42.927997000
regularFn took 0D00:04:03.196060000
q)count each (quote1;quote2;quote3)
37982474 37982474 37982474
q)all (quote1~quote2;quote2~quote3)
1b

Tags: csv ingestion streaming