Skip to main content

How to save (large) data generated from simulations

The following is a simple example of a much larger simulation that I want to run

Define some processes

norTheta[mu_, sigma_] := Random[NormalDistribution[mu, sigma]];
norPi[mu_, sigma_] := Random[NormalDistribution[mu, sigma]];

thetaNext[thetaNow_] := 
thetaNow + (-lambdaTheta*(thetaNow - thetaBar)*deltaT + 
sigmaTheta*norTheta[0, 1]*Sqrt[deltaT]);
piNext[piNow_, thetaNow_] := 
piNow + (-lambdaPi*(piNow - thetaNow)*deltaT + 
sigmaPi*norPi[0, 1]*Sqrt[deltaT]);
 
Then define some parameters
 
lambdaTheta = 0.07;
thetaBar = 0.02;     
sigmaTheta = 0.012; 
lambdaPi = 1.0;        
sigmaPi = 0.0125;
 
 
Then simulate the system
 
steps = 252;
T = 50; 
deltaT = 1/steps; // N 
Maturity = T*steps;

process = Transpose[NestList[{  piNext[#[[1]], #[[2]]],
thetaNext[#[[2]]]} &,{0.02, 0.02}, Maturity]];
 
 
File-backed lists/variables for handling large data
 

Low-level  data exchange API

EDIT

Since using .mx files is so much faster, I added some switch functions which will allow one to switch between using usual files and .mx files:
 

ClearAll[$fileNameFunction,fileName, $importFunction,import, 
$exportFunction,  export,  $compressFunction, $uncompressFunction]

$fileNameFunction = fileName;
    $importFunction  = import;
$exportFunction = export;
    $compressFunction = Compress;
$uncompressFunction = Uncompress;

fileName[dir_, hash_] := 
   FileNameJoin[{dir, StringJoin["data", ToString[hash], ".dat"]}];
mxFileName[dir_, hash_] := 
   FileNameJoin[{dir, StringJoin["data", ToString[hash], ".mx"]}];
import =  
   Function[fname, Import[fname, "String"]];
export = 
   Function[{fname, compressedValue}, 
      Export[fname, compressedValue, "String"]];
mxImport = 
   Function[fname, Block[{data}, Get[fname]; data]];
mxExport = 
   Function[{fname, compressedValue}, 
       Block[{data = compressedValue}, DumpSave[fname, data]]];
 
compression / uncompression we will also be able to switch on and off.
 
END EDIT
 
some high-level structure, which will represent the "skeleton" of 
the original list,and which will manage the on-demand data fetching 
and saving. As such a structure,I will use just a single symbol, say s. 
Here is the function which implements the management (the large one): 

ClearAll[definePartAPI];
definePartAPI[s_Symbol, part_Integer, dir_String] :=
 LetL[{sym = Unique[], hash = Hash[sym], 
     fname = $fileNameFunction[dir, hash]
       },
       sym := sym =  $uncompressFunction@$importFunction[fname];
   s /: HoldPattern[Part[s, part]] := sym;

   (* Release memory and renew for next reuse *)
   s /: releasePart[s, part] :=
       Replace[Hold[$uncompressFunction@$importFunction[fname]], 
          Hold[def_] :> (ClearAll[sym]; sym := sym = def)];

   (* Check if on disk *)
   s /: savedOnDisk[s, part] := FileExistsQ[fname];

   (* remove from disk *)
   s /: removePartOnDisk[s, part] := DeleteFile[fname];

   (* save new on disk *)
   s /: savePartOnDisk[s, part, value_] :=
      $exportFunction[fname, $compressFunction @value];

   (* Set a given part to a new value *)
   If[! TrueQ[setPartDefined[s]],
     s /: setPart[s, pt_, value_] :=
       Module[{},
         savePartOnDisk[s, pt, value];
         releasePart[s, pt];
         value
       ];
     s /: setPartDefined[s] = True;
   ];
(* Release the API for this part. Irreversible *)
s /: releaseAPI[s, part] := Remove[sym];
];
 
what happens here. First, LetL is a sequentially-binding version
of With, which I will display in a minute. It allows to avoid nested
With statements. The parameters of the function are the main top-level
symbol s,the part index, and the directory where our key-value store 
will be located. 
Basically, in OO terms, this function creates an instance of a class,
with these methods: Part (part extraction), releasePart 
(releasing the memory occupied by the part, and getting ready to extract 
it from file again, savedOnDisk - checks is the part has been backed 
into a file,  removePartOnDisk - deletes the backing file for the part,
 savePartOnDisk - save the part contents to a file, and releaseAPI
needed to release resources at the end.

All this is implemented via UpValues for s. In particular, the Part is overloaded, so now when I call s[[part]], it will look and feel like I extracted the part of s (not true of course, but very convenient). The content of the part is stored in the generated symbol sym, which is unique for a given part. Notice that the definition is lazy and self-uncompressing. This is a similar technique to one I used in this answer. Upon the first call, sym loads the content from file and uncompresses it, and then assigns it to itself. All subsequent calls will be constant time, with the content of the part stored in sym. Note also that when I call releasePart, I remove the direct part content from sym, feed it to the garbage collector, and reconstruct back the lazy definition for sym. This is my mechanism to be able to release part content when no longer needed, but also be able to load it back again on demand.
There are two important points to note regarding Compress. One is that it does not unpack packed arrays. Another is that it is cross-platform. Both are huge wins for us. Note that, essentially, for each part I create an instance of a class, where sym plays a role of instance variable. Note also that I use the Hash of the name of sym, to construct the file name. There are two flaws with this approach actually. One is that there in principle can be hash collisions, and currently I don't handle them at all. Another is that the symbols sym are unique only within a single session, while, as we'll see, I will be exporting their definitions. Both problems are surmountable, but for the sake of simplicity, I ignore them for now. So, the above code represents the low-level data-exchange API on the level of a single list's part.

Here is the code for LetL macro:
 
*A macro to bind sequentially. 
Generates nested With at run-time*
 
The details of how it works are explained in much detail  here.
 

Higher-level interface: the list-building function

This is the main function used in list-building. Its name pretty much tells what it does - it extends the list with one more element. This, however, does not cost us a performance penalty, since our "list" is faked - it is a symbol s which pretends to be a list but in fact is not (it is more like a hash-table filled with class instances).

ClearAll[appendTo];
Options[appendTo] = {
   ElementSizeLimit :> $elementSizeLimit,
       DestinationDirectory :> $destinationDirectory
 };
appendTo[s_Symbol, value_, opts : OptionsPattern[]] :=
  LetL[{len = Length[s], part = len + 1,
     dir = OptionValue[DestinationDirectory],
     blim = OptionValue[ElementSizeLimit]
    },
    definePartAPI[s, part, dir];
    s /: Length[s] = part;
    If[ByteCount[value] > blim,
       definePartAPI[s, part, dir];
       savePartOnDisk[s, part, value];
       releasePart[s, part],
       (* else *)
       With[{compressed = $compressFunction @value}, 
           s /: Part[s, part] := 
          (s /: Part[s, part] = $uncompressFunction@compressed);
         s /: Part[s, part, parts___] := Part[s, part][[parts]];
  ]]];
 
As you can see from this code, not all parts of the list are 
backed byfiles. Those which are below the threshold in terms 
of size, are merely compressed and also assigned to s via 
UpValues and overloaded Part, but are not on the disk. The 
code of this function is pretty self-explanatory,so I will 
move on. 

Integration with the system and initialization.

The following function (partially) integrates my construction with some commands that we all love. This will help to better masquerade our symbol s so that in many respects it now behaves as an ordinary list.

ClearAll[initList];
initList[s_Symbol] :=
  Module[{},
   ClearAll[s];
   (* Set a new value for part, including update on disk *)
   s /: Length[s] = 0;
   s /: HoldPattern[Take[s, {n_}]] := s[[n]];
   s /: HoldPattern[Take[s, n_]] := Take[s, {1, n}];
   s /: HoldPattern[Take[s, {m_, n_}]] := Table[s[[i]], {i, m, n}];
   s /: HoldPattern[Drop[s, {n_}]] := Drop[s, {n, n}];
   s /: HoldPattern[Drop[s, n_]] := 
      Table[s[[i]], {i, n + 1, Length[s]}];
   s /: HoldPattern[Drop[s, {m_, n_}]] :=
     Table[s[[i]], {i, Range[m - 1] ~~ Join ~~ Range[n + 1, Length[s]]}];
   s /: Map[f_, s] := Table[f[s[[i]]], {i, Length[s]}];
   s /: HoldPattern[First[s]] := s[[1]];
   s /: HoldPattern[Last[s]] := s[[Length[s]]];
   s /: HoldPattern[Rest[s]] := Drop[s, 1];
   s /: HoldPattern[Most[s]] := Take[s, {1, Length[s] - 1}];
   s /: Position[s, patt_] :=
      If[# === {}, {}, First@#] &@
     Reap[Do[If[MatchQ[s[[i]], patt], Sow[{i}]], {i, Length[s]}]][[2]]
  ];
 
The above code probably does not need any comments.

Settings

There are a few settings I use, basically defaults for the directory and the size threshold.

ClearAll[releasePart, savedOnDisk, removePartOnDisk, removePartOnDisk,
   savePartOnDisk, releaseAPI]
$destinationDirectory = $TemporaryDirectory ;
$elementSizeLimit = 50000;
 

Higher-level and management-level functions

The following functions realize higher-level API which is actually what the end user is supposed to work with.

ClearAll[appendList];
appendList[s_Symbol, l_List, opts : OptionsPattern[]] :=
   Do[appendTo[s, l[[i]], opts], {i, 1, Length[l]}];

ClearAll[removeStorage];
removeStorage[s_Symbol] :=
Do[If[savedOnDisk[s, i], removePartOnDisk[s, i]], {i, Length[s]}];

ClearAll[releaseAllMemory];
releaseAllMemory[s_Symbol] :=
   Do[releasePart[s, i], {i, Length[s]}];

The last several functions are concerned with disk management, and storing the main structure / definitions on disk. The point is that in the process of creating our key-value store, we generated lots of UpValues for s, and all those private symbols sym for each part, must also be saved together with s, if we want to fully reconstruct the environment on a fresh kernel.
This will find the dependencies of the main symbol s. We only use UpValues, so this is quite straightforward.

(* Our current system only has one-step dependencies*)
ClearAll[getDependencies];
getDependencies[s_Symbol] :=
 Thread[
   Prepend[
     Union@Cases[UpValues[s],
  sym_Symbol /; Context[sym] =!= "System`" :> HoldComplete[sym],
     {0, Infinity}, Heads -> True],
   HoldComplete[s]
  ],
  HoldComplete] 
 
This generates a file name. It is important that the extension 
for the main file is .m (Mathematica package) - will come to 
that later.
 
ClearAll[getMainListFileName];
Options[getMainListFileName] = {
   DestinationDirectory :> $destinationDirectory,
   ListFileName -> Automatic
 };
getMainListFileName[s_Symbol, opts : OptionsPattern[]] :=
  LetL[{fn = OptionValue[ListFileName],
fname = If[fn === Automatic, ToString[s] <> ".m", fn], 
fullfname = FileNameJoin[{OptionValue[ DestinationDirectory], fname}]},
   fullfname];
 
This function saves the main symbol s and those on which it depends
(definitions) in a plain .m format to the disk.
 
ClearAll[storeMainList];
storeMainList[s_Symbol, opts : OptionsPattern[]] :=
  LetL[{filteredOpts  = 
      Sequence @@ FilterRules[{opts}, Options[getMainListFileName]],
      fname  = getMainListFileName[s, filteredOpts]},
    releaseAllMemory[s];
    If[FileExistsQ[fname], DeleteFile[fname]];
    Replace[getDependencies[s],
       HoldComplete[syms_] :> Save[fname , Unevaluated[syms]]]];
 
A call to releaseAllMemory is important, since it converts all possibly expanded
definitions of sym-s for various parts back to lazy form, and in that form they will be saved.
This function does the inverse: it loads the environment, on a fresh kernel:

ClearAll[retrieveMainList];
retrieveMainList[s_Symbol, opts : OptionsPattern[]] :=
  LetL[{filteredOpts  = 
    Sequence @@ FilterRules[{opts}, Options[getMainListFileName]],
      fname  = getMainListFileName[s, filteredOpts],
      imported =  Import[fname , "HeldExpressions"]
     },
    ReleaseHold[imported /.
       {TagSet -> TagSetDelayed, UpSet -> UpSetDelayed}
       ] /; imported =!= $Failed;
    ];

 retrieveMainList[___] := $Failed;
 
 
There are a few subtleties here. The problem is that Save converts delayed UpValue definitions (made with TagSetDelayed or UpSetDelayed), into immediate ones (which looks like a bug to me, but anyways). Therefore, I have to load the package in unevaluated form and do back replacements manually, before I allow it to run.
The last function here will completely remove all the generated files from the file system:

ClearAll[deleteListComplete];
deleteListComplete[s_Symbol, opts : OptionsPattern[]] :=
 LetL[{filteredOpts  = 
 Sequence @@ FilterRules[{opts}, Options[getMainListFileName]],
    fname  = getMainListFileName[s, filteredOpts]},
    removeStorage[s];
    If[FileExistsQ[fname], DeleteFile[fname]];
    Do[releaseAPI[s, i], {i, Length[s]}];
    ClearAll[s]]; 
 
This completes the current version of the system, and now we are ready to start using it.

Examples and benchmarks

Initialization

The following may be considered as a quick guide to the usage.

$HistoryLength = 0
 
We first generated a reasonably small piece of data, to have something to play with:
smallTest = RandomInteger[100, #] & /@ RandomInteger[{10000, 20000}, 300]; I will chose our top-level symbol to have a name test. Before we start anything, we must
initialize it:

initList[test]



Converting a list

We now convert our list into our key-value structure:
In[83]:= appendList[test,smallTest,DestinationDirectory:>"C:
\\Temp\\LargeData"]; //Timing
Out[83]= {2.906,Null}
 
This was about 18Mb:

In[84]:= ByteCount[smallTest]
Out[84]= 18193688
 
And we generated about 230 files:

In[87]:= FileNames["*.dat",{"C:\\Temp\\LargeData"}]//Short
Out[87]//Short= {C:\Temp\LargeData\data530106946.dat,<<234>>,
      C:\Temp\LargeData\data530554672.dat}

Details and tests...

Note that I intentionally chose a high enough threshold so that not all parts
of smallTest ended up in files, some were assigned in-memory only:

In[95]:= Length[test]
Out[95]= 300

In[97]:= Position[Table[savedOnDisk[test,i], 
{i,Length[test]}],False]//Short
Out[97]//Short= {{3},{5},{7},{33},{34},{35},{39},<<50>>, 
{277},{280},{287},{290}, 
{298},{299},{300}}
 
Let us now test that our file-backed system keeps the right results. We pick some
random positions:

In[99]:= randomPos = RandomSample[Range[Length[test]],20]
Out[99]= {287,214,9,294,32,263,12,141,282,85,213,108,22,
 197,77,67,41,286,146,38}
 
And test:

In[100]:= test[[#]]==smallTest[[#]]&/@randomPos//Timing
Out[100]= {0.203, {True,True,True,True,True,True,True, 
True,True,True,True,True,True,True,True,True,True,True, 
True,True}}
 
Note that the second time the test is instant, since memoization is now at work,
and there's no need to uncompress again:

In[101]:= test[[#]]==smallTest[[#]]&/@randomPos//Timing
Out[101]= {0.,{True,True,True,True,True,True,True,True,True, 
True,True,True,True,True,True,True,True,True,True,True}}
 
Another test:

In[102]:= Take[test, {10, 20}] == Take[smallTest, {10, 20}]
Out[102]= True

Adding new elements

Let us append some elements to our list now:

appendTo[test, Range[10000]]
 
We check the length:

In[105]:= Length[test]
Out[105]= 301
 
We can also test directly:

In[116]:= test[[301]]//Short
Out[116]//Short= {1,2,3,4,5,6,7,8,9,10,<<9980>>,9991,9992,
9993,9994,9995,9996,9997,9998,9999,10000}

In[117]:= Last@test//Short
Out[117]//Short= {1,2,3,4,5,6,7,8,9,10,<<9980>>,9991,9992,
 9993,9994,9995,9996,9997,9998,9999,10000}
 
We can append wholesale as well:

In[118]:= appendList[test, Partition[Range[10000, 60000], 10000]]

In[119]:= Length[test]
Out[119]= 306

Memory management

I will now illustrate memory management: we will force it to load from disk and
uncompress all parts:

In[120]:= MemoryInUse[]
Out[120]= 49040104
In[121]:= Take[test, {1, Length[test]}];
In[122]:= MemoryInUse[]
Out[122]= 64273408
 
We now release all memory, and return to lazy self-uncompressing definitions.

In[123]:= releaseAllMemory[test];
In[124]:= MemoryInUse[]
Out[124]= 49079560

Saving and reconstructing the environment

Let us now save our environment:

In[125]:= 
storeMainList[test, DestinationDirectory :> "C:\\Temp\\LargeData"]  
// AbsoluteTiming

Out[125]= {1.1015625, Null}
 
We now quit the kernel:

Quit
 
and now try to reconstruct it back:

In[126]:= 
retrieveMainList[test, 
   DestinationDirectory :> "C:\\Temp\\LargeData"]// AbsoluteTiming

Out[126]= {1.2294922, Null}
 
We can see that we are in business:

In[127]:= Length[test]
Out[127]= 306

In[128]:= test[[301]]//Short
Out[128]//Short= {1,2,3,4,5,6,7,8,9,10,<<9980>>,9991,9992,9993,
9994,9995,9996,9997,9998,9999,10000}

Removing the key-value store - uninstall

Finally, this will remove all the files from the system completely:

In[129]:= deleteListComplete[test,DestinationDirectory:>"C:
\\Temp\\LargeData"]//Timing
Out[129]= {0.031,Null}

Larger tests

I will throw in a few larger tests, which are still kind of toy tests, but a bit more
representative. We start with this:

In[130]:= MemoryInUse[]
Out[130]= 44668800
 
Now we create a reasonably large dataset:

In[131]:= mediumTest = RandomInteger[100,#]&/
@RandomInteger[{100000,200000},1000];
In[132]:= ByteCount[mediumTest]
 
This tells how large

Out[132]= 607800752
In[133]:= initList[test]
 
It takes slightly more than a minute to convert it to our data store:

In[134]:= appendList[test, mediumTest, 
   DestinationDirectory :> "C:\\Temp\\LargeData",
   ElementSizeLimit:>20000]; //Timing
Out[134]= {73.906,Null}
 
The memory consumption is just amazing (the lack of it!):

In[135]:= MemoryInUse[]
Out[135]= 657753176
 
This is pretty much what the initial memory use was plus the memory occupied
by mediumTest - our construction takes almost no memory because everything
is cached and lazy Here we extract some element (which is not that small):

In[136]:= test[[10]]//Short//Timing
Out[136]= {0.047,{1,19,82,24,54,12,25,5,11,4,74,7,75,
   <<176964>>,93,5,12,25,97,89,56,59,46,35,95,1,49}}
 
All the next times, this will be instantly for this particular element, until we
decide to release the cache. We take some more now:

In[137]:= Take[test,{10,30}]//Short//Timing
Out[137]= {0.5,{<<1>>}}

In[138]:= ByteCount[Take[test,{10,30}]]
Out[138]= 13765152
 
We now take about a third of the total data set - it takes several seconds:
In[139]:= (chunk = Take[test,{1,300}]);//Timing
Out[139]= {6.75,Null}

In[140]:= ByteCount[chunk]
Out[140]= 180658600

Need for speed: Turning on .mx files

If we sacrifice being cross-platform for speed, we get 10-40x speedup by
using .mx files, and in this regime I'll be hard-pressed to see any database
solution beating this in terms of performance. Here are the same benchmarks
as before, done with .mx files.

First, switch to .mx:

$fileNameFunction = mxFileName;
    $importFunction  = mxImport ;
$exportFunction = mxExport ;
    $compressFunction = Identity;
$uncompressFunction = Identity;
 
Note also that I disabled compressing, for maximal speed. The benchmarks:

In[57]:= MemoryInUse[]
Out[57]= 18638744

In[58]:= mediumTest = RandomInteger[100,#]&
/@RandomInteger[{100000,200000},1000];

In[59]:= ByteCount[mediumTest]
Out[59]= 594434920

In[60]:= initList[test]

In[61]:= appendList[test,mediumTest,DestinationDirectory:>"C:\\
Temp\\LargeData"];//Timing
Out[61]= {14.797,Null}

In[62]:= MemoryInUse[]
Out[62]= 618252872
 
Extraction of a singe list element (including loading from disk) is now instantly:

In[63]:= test[[10]]//Short//Timing
Out[63]= {0.,{7,17,36,41,54,62,49,78,63,62,84,83,14,42,42,
    <<184520>>,83,0,64,25,86,84,89,17,71,94,84,3,6,23,38}}
 
Extracting 20 elements is also pretty fast:

In[64]:= Take[test,{10,30}];//Timing
Out[64]= {0.047,Null}

In[65]:= ByteCount[Take[test,{10,30}]]//AbsoluteTiming
Out[65]= {0.,12279632}
 
We now extract about 300 elements, with the total size af about 180Mb:

In[66]:= (chunk = Take[test,{1,300}]);//AbsoluteTiming
Out[66]= {0.3281250,Null}

In[67]:= ByteCount[chunk]
Out[67]= 178392632
 
To my mind, this is blazing fast.

Summary and conclusions

I presented here a tiny but complete implementation of a key-value store, which may make it possible to work with large files which don't fit in memory, notably lists. From the technical viewpoint, this is by far the most serious application of UpValues I have ever written. I think the simplicity of the code illustrates the power of UpValues well. They also made it possible to have nice syntactic sugar, and be able to use the familiar commands such as Part, Take, etc.
The implementation has many flaws, and it is still not clear to me whether it is efficient enough to be useful, but I think this may represent a good starting point.

EDIT
 
As it turns out, using .mx files gives a huge speedup (which is not unexpected of course). If speed is absolutely crucial, one can use .mx files for all computations and only use normal files to import from or export to another computer. I plan to build a layer which would automate that, but so far, this can be done manually, based on the single-part API in the code above.
 
END EDIT

 
All ideas, suggestions etc - most welcome!
  
  


 

 

 


 

Comments

Popular posts from this blog

Python and Parquet Performance

In Pandas, PyArrow, fastparquet, AWS Data Wrangler, PySpark and Dask. This post outlines how to use all common Python libraries to read and write Parquet format while taking advantage of  columnar storage ,  columnar compression  and  data partitioning . Used together, these three optimizations can dramatically accelerate I/O for your Python applications compared to CSV, JSON, HDF or other row-based formats. Parquet makes applications possible that are simply impossible using a text format like JSON or CSV. Introduction I have recently gotten more familiar with how to work with  Parquet  datasets across the six major tools used to read and write from Parquet in the Python ecosystem:  Pandas ,  PyArrow ,  fastparquet ,  AWS Data Wrangler ,  PySpark  and  Dask . My work of late in algorithmic trading involves switching between these tools a lot and as I said I often mix up the APIs. I use Pandas and PyArrow for in-RAM comput...

Kubernetes Configuration Provider to load data from Secrets and Config Maps

Using Kubernetes Configuration Provider to load data from Secrets and Config Maps When running Apache Kafka on Kubernetes, you will sooner or later probably need to use Config Maps or Secrets. Either to store something in them, or load them into your Kafka configuration. That is true regardless of whether you use Strimzi to manage your Apache Kafka cluster or something else. Kubernetes has its own way of using Secrets and Config Maps from Pods. But they might not be always sufficient. That is why in Strimzi, we created Kubernetes Configuration Provider for Apache Kafka which we will introduce in this blog post. Usually, when you need to use data from a Config Map or Secret in your Pod, you will either mount it as volume or map it to an environment variable. Both methods are configured in the spec section or the Pod resource or in the spec.template.spec section when using higher level resources such as Deployments or StatefulSets. When mounted as a volume, the contents of the Secr...

Andriod Bug

A bug that steals cash by racking up charges from sending premium rate text messages has been found in Google Play.  Security researchers have identified 32 apps on Google Play that harbour the bug called BadNews. A security firm Lookout, which uncovered BadNews, said that the malicious program lays dormant on handsets for weeks to escape detection.  The malware targeted Android owners in Russia, Ukraine, Belarus and other countries in eastern Europe. 32 apps were available through four separate developer accounts on Google Play. Google has now suspended those accounts and it has pulled all the affected apps from Google Play, it added. Half of the 32 apps seeded with BadNews are Russian and the version of AlphaSMS it installed is tuned to use premium rate numbers in Russia, Ukraine, Belarus, Armenia and Kazakhstan.