Massively parallel processing

Mark Edmondson

2019-05-04

Run massive parallel R jobs cheaply

Due to its integration with future you can run massive computing tasks using a Google Compute Engine cluster with just a few lines of code.

Some more examples of using future can be found here, using fractals as an example.

On other platforms, see also an Azure example here on Revolution Analytics.

Remote R cluster

This workflow takes advantage of the future integration to run your local R-functions within a cluster of GCE machines.
You can do this to throw up expensive computations by spinning up a cluster and tearing it down again once you are done.

In summary, this workflow:

  1. Creates a GCE cluster
  2. Lets you perform computations
  3. Stops the VMs

Create the cluster

The example below uses a default rocker/r-parallel template, but you can also create a dynamic_template pulled from the Container Registry if required.

Instead of the more generic gce_vm() that is used for more interactive use, we create the instances directly using gce_vm_cluster().

This creates a cluster, uploads any SSH settings you have and tests the connection, then returns the list of VMs suitable for use in future::cluster().

By default it makes a 3 size cluster called r-cluster-1/2/3:

library(future)
library(googleComputeEngineR)

vms <- gce_vm_cluster()
#2019-03-29 23:24:54> # Creating cluster with these arguments:template = r-base,dynamic_image = rocker/r-parallel,wait = #FALSE,predefined_type = n1-standard-1
#2019-03-29 23:25:04> Operation running...
#2019-03-29 23:25:07> Operation running...
#2019-03-29 23:25:10> Operation running...
#2019-03-29 23:25:17> Operation complete in 13 secs
#2019-03-29 23:25:20> Operation complete in 13 secs
#2019-03-29 23:25:23> Operation complete in 14 secs
#2019-03-29 23:25:25> r-cluster-1 VM running
#2019-03-29 23:25:27> r-cluster-2 VM running
#2019-03-29 23:25:29> r-cluster-3 VM running
#2019-03-29 23:25:37> Public SSH key uploaded to instance
#2019-03-29 23:25:45> Public SSH key uploaded to instance
#2019-03-29 23:25:53> Public SSH key uploaded to instance
#2019-03-29 23:25:53> # Testing cluster:
#Warning: Permanently added '35.233.25.199' (ED25519) to the list of known hosts.
r-cluster-1 ssh working
#Warning: Permanently added '35.187.54.41' (ED25519) to the list of known hosts.
r-cluster-2 ssh working
#Warning: Permanently added '35.205.66.124' (ED25519) to the list of known hosts.
r-cluster-3 ssh working

We now make the VM cluster as per details given in the future README

## make a future cluster
plan(cluster, workers = as.cluster(vms))

You can pass in your own arguments to gce_vm_cluster() such as which docker image to use, name and custom SSH arguments you may have. See the function documentation for details.

Using your own Docker image

The default uses rocker/r-parallel as its image, but if you want your own custom image then create your own Docker image based on that one, for example via this tutorial using Google Build Triggers.

This will give you a docker image name such as gcr.io/my-project/my-r - use a version of the code below to use this in your cluster.

You can also customise the RScript command that launches your script, but always make sure to include --net=host as is shown in the default arguments, so the Docker image uses the SSH ports the host VM has (e.g. it can connect to your SSH commands)

plan(cluster, workers = as.cluster(vms, docker_image="gcr.io/my-project/my-r"))

Using the cluster

The cluster is now ready to recieve jobs. You can send them by simply using %<-% instead of <-. Another useful function is future.apply::future_lapply that lets you loop over a cluster. Consult the future.apply documentation for details.

## use %<-% to send functions to work on cluster
## See future README for details: https://github.com/HenrikBengtsson/future
a %<-% Sys.getpid()

## make a big function to run asynchronously
f <- function(my_data, args){
   ## ....expensive...computations
   
   result
}

## send to cluster
result %<-% f(my_data) 

For long running jobs you can use future::resolved to check on its progress.

## check if resolved
resolved(result)
[1] TRUE

Examples

Forecasting a large data set

The below splits a dataset into chunks that are each run on a seperate VMs, using a custom Docker image that has the necessary packages installed, for instance via build triggers. Optimise by including the package future in these Docker images.

library(future.apply) ## Will automatically load future too
library(googleComputeEngineR)

my_docker <- gce_tag_container("custom-image", project = "my-project")

vms <- gce_vm_cluster("r-vm", cluster_size = 3, docker_image = my_docker)
                
## create the future cluster
plan(cluster, 
     workers = as.cluster(vms, 
                          docker_image=my_docker))
                          
## create the list of data to run on the cluster
## here we assume they are in a folder of CSVs
## and there are as many files as VMs to run it upon
my_files <- list.files("myfolder")

my_data <- lapply(my_files, read.csv)

## make a big function to run asynchronously
cluster_f <- function(my_data, args = 4){
   
   forecast::forecast(forecast::auto.arima(ts(my_data, frequency = args)))
   
}

## send to cluster
result <- future.apply::future_lapply(my_data, cluster_f, args = 4) 

## once done this will be TRUE
resolved(result)

## Your list of forecasts are now available
result

Rasters in parallel

This is from @ctlamb’s GitHub issue #93 which uses a custom Dockerfile to install the raster package.

The custom Dockerfile was setup in this GitHub repo then made into an image with these Build Trigger settings:

Make sure the VMs are created in the same project as the build triggers to ensure authentication is smooth.

The example code is shown below, assuming your custom Docker image is available at gcr.io/your-project/raster

library(raster)
library(googleComputeEngineR)
library(future)
library(future.apply)
library(SpaDES.tools)

gce_global_project("your-project")

## create raster
row <- 8
col <- 8
r <- raster(nrows=row, ncols=col,
            xmn=0, xmx=row, 
            ymn=0, ymx=col, 
            vals=c(1:(row*col)))
plot(r)

## Split
r_split <- splitRaster(r, nx=2, ny=2)

## create model
df <- data.frame(y=c(1:10),layer=c(1:5,7,6,8:10))
mod <- glm(y~layer, data=df)


## create CPUs names - here we customise the CPU machine type
vms <- gce_vm_cluster("myvms", predefined_type = "n1-highmem-2")

## once all launched, add to cluster with custom Dockerfile
## use plan(sequential) for local testing
plan(cluster, workers = as.cluster(vms, docker_image=my_image)

## make the vector of stuff to send to nodes
o <- lapply(r_split, readAll)

## the action you want to perform on the elements in the cluster
my_single_function <- function(x){
  raster::predict(x, mod)
}

#parallel - working?
result <- future_lapply(o, my_single_function)

## tidy up
lapply(vms, FUN = gce_vm_stop)

Cleanup

Remember to shut down your cluster. You are charged per second, per instance of uptime.

## shutdown instances when finished
gce_vm_stop(vms)

# or delete them
gce_vm_delete(vms)

Pre-emptible VMs

Preemptible VMs are a lot cheaper (80%) than normal instances, but Google reserves the right to stop them at any time. They are intended to be used in non-critical jobs where if they shutdown you can account for it and launch another.

To create them, you need to pass scheduling = list(preemptible = TRUE) to gce_vm_create() creation family of functions.

Make sure you can cope with the result may not be returned, so over provision the VMs and ensure your script can deal with redoing jobs if they didn’t complete.

Quotas

You can launch as many VMs as you have quota for in your account. These vary from region, from ~240 to 720. You can apply for more quota if you need it.