Using Dask to parallelize simulations

In real-world use, SWYFT will often be combined with existing simulators. Because inference with truncated marginal neural ratio estimation (as implemented in SWYFT) allows simulations to be reused and realized independently of one another, SWYFT makes the use of more computationally expensive and (physically) relevant simulators viable.

In SWYFT this is supported by the directory store and, even more importantly, by the DaskSimulator class. In contrast to the Simulator class, DaskSimulator allows the requested simulations to be executed in parallel on a Dask cluster.

In this notebook we demonstrate the framework for using a more computationally expensive external simulator, although we still make use of the toy external simulator defined previously.

Here we have chosen to include all explanatory text.

[1]:
%load_ext autoreload
%autoreload 2
[2]:
# DON'T FORGET TO ACTIVATE THE GPU when on google colab (Edit > Notebook settings)
from os import environ
GOOGLE_COLAB = True if "COLAB_GPU" in environ else False
if GOOGLE_COLAB:
    !pip install git+https://github.com/undark-lab/swyft.git
[3]:
import numpy as np
import torch
import pylab as plt
import os

import swyft
[4]:
# Set randomness
np.random.seed(25)
torch.manual_seed(25)

# cwd
cwd = os.getcwd()

# swyft
device = 'cpu'
n_training_samples = 100
n_parameters = 2
observation_key = "x"

Set input method

In order to make use of an external simulator called from the command line, the user must specify a function to set up the simulator input. It should take one input argument (the array with the input parameters) and return any input to be passed to the program via stdin. If the simulator requires any input files to be present, this function should write them to disk.

[5]:
def set_input(v):
    # Format the two parameters as a single whitespace-separated line for stdin.
    v0 = v[0]
    v1 = v[1]
    v_str = str(v0).strip() + ' ' + str(v1).strip()
    return v_str
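
If the simulator instead reads its parameters from an input file rather than stdin, the same hook can write that file. The following is a minimal sketch (not used in this notebook), assuming a hypothetical file name params.txt; each simulation runs in its own scratch directory, so a relative path suffices:

def set_input_with_file(v):
    # Hypothetical variant: write the parameters to an input file for the
    # simulator instead of passing them via stdin.
    with open('params.txt', 'w') as f:
        f.write(str(v[0]) + ' ' + str(v[1]) + '\n')
    return ''  # nothing to pass via stdin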

Get output method

Analogously, the user must define a function to retrieve results from the simulator output. It should take two input arguments (the stdout and stderr of the simulator run) and return a dictionary with the simulator output shaped as described by the sim_shapes argument. If the simulator writes output to disk, this function should parse the results from the file(s).

[6]:
def get_output(stdout, stderr):
    try:
        if not stderr:
            x0, x1 = stdout.split(" ")
            x0 = float(x0.strip())
            x1 = float(x1.strip())
            x = np.array([x0, x1])
            return dict(x=x)
        else:
            # Raising a bare string is invalid in Python 3; use an exception class.
            raise RuntimeError('simulator returned on stderr')
    except Exception as e:
        raise RuntimeError('Error in output retrieval') from e
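
If the simulator writes its results to disk, the same hook can parse them from a file. A minimal sketch (not used in this notebook); the file name result.txt is a hypothetical choice:

def get_output_from_file(stdout, stderr):
    # Hypothetical variant: parse the result file written by the simulator
    # in its scratch directory, rather than reading stdout.
    if stderr:
        raise RuntimeError('simulator returned on stderr')
    x = np.loadtxt('result.txt')  # expects two whitespace-separated floats
    return dict(x=x)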

Simulator invocation

Here we use the %%writefile cell magic to create an external Python script randgauss.py containing the simulator defined as model in the Quickstart notebook. This script is then invoked from the command line.

[7]:
%%writefile randgauss.py
#!/usr/bin/env python

import numpy as np
import sys


def rgmodel(v,sigma=0.05):
    x = v + np.random.randn(2)*sigma
    return x


def main():
    sigma = None
    args = sys.stdin.readline()
    arg1, arg2 = args.split(' ')
    try:
        v0 = float(arg1.rstrip())
        v1 = float(arg2.rstrip())
    except ValueError as e:
        # A bare raise() is itself an error; raise something meaningful instead.
        raise ValueError('could not parse input parameters') from e

    v = np.array([v0,v1])

    if sigma is not None:
        x = rgmodel(v,sigma=sigma)
    else:
        x = rgmodel(v)

    print(str(x[0]).strip()+' '+str(x[1]).strip())


if __name__ == "__main__":
    main()

Writing randgauss.py

It is up to the user to ensure adequate permissions for all relevant files.

[8]:
!chmod 755 randgauss.py
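
As a quick sanity check (this step is illustrative and not part of the numbered workflow), the script can be invoked by hand, piping two parameters through stdin exactly as set_input formats them:

!echo "0.1 0.2" | ./randgauss.py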

The user must also ensure that the root temporary directory in which the simulator is run exists. Each instance of the simulator will run in a separate sub-folder.

[9]:
!mkdir -p ./tmp
[10]:
command = cwd+'/randgauss.py'

Defining the Dask simulator

The simulator itself can then be defined using the from_command() method of the DaskSimulator class, exactly as for the Simulator class. By default, a local Dask cluster with a local scheduler is instantiated.

[11]:
simulator = swyft.DaskSimulator.from_command(
    command=command,
    parameter_names=["x0","x1"],
    sim_shapes=dict(x=(n_parameters,)),
    set_input_method=set_input,
    get_output_method=get_output,
    tmpdir=cwd+'/tmp/'
)

In order to connect to an external Dask cluster (and also locally, for better performance), we make use of the Dask distributed scheduler.

[12]:
from dask.distributed import LocalCluster # from dask_jobqueue import SLURMCluster

Here we set the cluster that the simulator will use, enabling the user to connect to a Dask cluster of their choice.

Please note

When using the DaskSimulator class in combination with a simulator invoked from the command line, the user must ensure that it is available at the specified location on all nodes of the cluster.

Furthermore, the number of threads per worker must be set to one, i.e. threads_per_worker=1.

This is because each worker accesses a different scratch directory space, which can be achieved through process-parallelism only. The command-line simulator itself can then spawn multiple threads.

[13]:
cluster = LocalCluster(n_workers=3,threads_per_worker=1)
[14]:
simulator.set_dask_cluster(cluster)
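
On an HPC system, the local cluster above could be replaced by a batch-queue cluster from dask_jobqueue. The following is an illustrative sketch only (queue name, memory, walltime, and job counts are placeholders); note that processes equals cores, which yields the required one thread per worker:

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue='normal',        # placeholder queue name
    cores=8,               # cores per batch job
    processes=8,           # eight single-threaded workers per job
    memory='16GB',         # placeholder memory request
    walltime='01:00:00',   # placeholder walltime
)
cluster.scale(jobs=2)      # placeholder: request two batch jobs
simulator.set_dask_cluster(cluster)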

The directory store

A directory store is initialized using the Store.directory_store() convenience function. The user must specify the path at which to create the new store. If a store already exists, it can be opened using Store.load().

[15]:
store = swyft.Store.directory_store(cwd+'/mystore', simulator=simulator)
Creating new store.

Inference

We can now define the initial prior and, given an observation, begin with the inference problem:

[16]:
low = -1 * np.ones(n_parameters)
high = 1 * np.ones(n_parameters)
prior = swyft.get_uniform_prior(low, high)

# The number of samples drawn from the store is Poisson distributed. Simulating
# slightly more than we need avoids attempting to draw more samples than are available.
store.add(n_training_samples + 0.01 * n_training_samples, prior)
Store: Adding 114 new samples to simulator store.

We now invoke the simulator to produce the required simulations. Here we wait for the simulations to complete (blocking the flow of execution) and then continue.

[17]:
store.simulate()
[18]:
dataset = swyft.Dataset(n_training_samples, prior, store)
[19]:
# The store / dataset is populated with samples simulated with dask.
print(len(dataset))
113

Asynchronous simulation

In some cases, performing all the required simulations will take a significant amount of time, even when parallelized. For such situations, the DaskSimulator class together with the directory store enables asynchronous execution of the requested simulations. The Dask workers write the obtained simulation results directly to the store on disk, allowing the inference workflow to be suspended and later restarted.

Please note

For the example shown here, we have started the Dask cluster from within the notebook. If the user expects the simulations to take long enough to warrant exiting and rejoining at a later time, the Dask cluster should be started outside of the Jupyter notebook so that it keeps working when the kernel stops.

We refer the user to the Dask documentation, and to this example, for guidance on using Dask on an HPC system.

[20]:
store.simulate(wait_for_results=False)

When the simulations are complete, the (interrupted) workflow can continue as follows.

[21]:
rejoinedstore = swyft.Store.load(cwd+'/mystore', simulator=simulator)
Loading existing store.
[22]:
dataset = swyft.Dataset(n_training_samples * 8 // 10, prior, rejoinedstore)
[23]:
# The store / dataset is populated with samples simulated with dask.
print(len(dataset))
96