RCC's Midway and MPI

This week you will interact with a real High-Performance Computing (HPC) cluster and learn how to submit jobs to a scheduler. Here is what we will cover:

Resources

MPI

Mrjob allowed us to write our tasks as MapReduce jobs for easy parallelization. This week we will instead look at the Message Passing Interface (MPI), which is a lower-level abstraction for handling parallel processes. This makes it much more expressive than MapReduce, since it can describe a richer set of communications between your parallel processes.

Any HPC center worth its salt will support MPI, so it is a very useful technology to know. RCC's Midway cluster is no exception, so let's start by looking at MPI.

Basics

As the name implies, MPI is an interface for how parallel processes communicate with one another through passing messages. Each process operates autonomously and is associated with a unique identifier, called a rank. Messages between processes are addressed using this rank.

Each process typically executes the same program, so if we want processes to behave differently, we need to treat ranks differently within our program. Here is an example of how to print some MPI-related information using mpi4py:

from mpi4py import MPI

size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

print("Hello from rank {0} of {1} on {2}".format(rank, size, name))

Save this as mpitest.py and try executing it as usual:

$ python mpitest.py

This will start one process and it will have rank 0. To start several, we have to run it through MPI, which is done using mpirun:

$ mpirun -np 4 python mpitest.py

Now we have 4 parallel instances of mpitest.py, all running on the same computer. As we will see later, MPI makes it easy to scale this up beyond a single computer.

Midway

An introduction to RCC's Midway is offered in this tutorial:

This document explains how to SSH to Midway, load software modules, submit jobs and query their status. You should make an effort to read through the entire document throughout this lab. We will point out some of the essential bits, but you are expected to find further information in the tutorial.

Submitting a job

Copy mpitest.py to the cluster (you can use scp or create a new file after logging in). Following the example at the bottom of the tutorial, create a bash script that runs the job, passing a few settings to the scheduler through #SBATCH comments:

#!/bin/sh
#SBATCH --job-name=job1
#SBATCH --output=job1.out
#SBATCH --error=job1.err
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1

module load python
module load mpi4py
mpirun -np 2 python mpitest.py

Save this as job1.sh and submit it as:

sbatch job1.sh

Since this is a shared cluster, your job is handled by a scheduler, which runs it when it sees fit. If we're lucky this is immediate, but we may have to wait if there is a queue of jobs ahead of ours. Use some of the commands from the tutorial to check up on your job. When it's done, check job1.out and confirm that the job ran on two nodes by checking the processor names (which we printed for each process).

For some of the following exercises, you can choose to run them on your CS computer or on the cluster; the former is especially preferable if there is a long waiting time.

Message passing

Now that we know how to launch multiple processes, let us look at how to send messages between them:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

if rank == 0:
    data = [1, 2, 3, 4]
    comm.send(data, dest=1, tag=1234)
    print('Sent', data)
elif rank == 1:
    data = comm.recv(source=0, tag=1234)
    print('Received', data)

This will fail if you run it with a single process, since it will try to send a message to a rank that doesn't exist. Try running it with mpirun -np 2. The message is addressed with dest=1, indicating that we want to send it to rank 1. At the receiving end we specify source=0, since we're expecting a message from rank 0. The tag is an integer that you can use to describe what kind of message it is; we picked 1234 arbitrarily.

Both send and recv block until the message has been transferred, so it is possible to end up with deadlocks, where two processes each wait for the other. MPI also supports non-blocking messages that can be used to avoid situations like this.

In mpi4py, the lower-case functions, such as send and recv, operate directly on general Python objects. When you send data, mpi4py will pickle (serialize) it into a binary representation, transmit the pickled bytes to the other process, and unpickle them on arrival.
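To see roughly what happens under the hood, here is a sketch using the standard pickle module directly (mpi4py handles all of this for you when you call send and recv):

```python
import pickle

data = {'values': [1, 2, 3, 4], 'label': 'example'}

# What send() does before transmitting: serialize the object to bytes.
payload = pickle.dumps(data)
print(type(payload))            # <class 'bytes'>

# What recv() does on arrival: deserialize the bytes back into an object.
received = pickle.loads(payload)
print(received == data)         # True
```

This is why the lower-case functions can handle arbitrary Python data structures: anything picklable can be sent.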

On the other hand, the capitalized functions are more direct wrappers of their C counterparts. For instance, Send and Recv can be used to transmit lower-level data types, such as a numpy array:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

if rank == 0:
    data = np.array([1, 2, 3, 4], dtype=np.int64)
    comm.Send(data, dest=1, tag=1234)
    print('Sent', data)
elif rank == 1:
    data = np.empty(4, dtype=np.int64)
    comm.Recv(data, source=0, tag=1234)
    print('Received', data)

If we do it this way, we have to be a bit more mindful and manually make sure that the data type and the size are the same at both ends. However, since numpy arrays pack data more efficiently than Python lists, this approach performs better.
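To see why matching the dtype matters, here is a small sketch outside MPI. Send transmits the raw bytes of the array, so a receiver that allocates its buffer with the wrong dtype silently reinterprets those bytes rather than getting an error:

```python
import numpy as np

# The raw buffer that Send would transmit for 4 int64 values:
data = np.array([1, 2, 3, 4], dtype=np.int64)
print(data.nbytes)   # 32 bytes: 4 values * 8 bytes each

# Reinterpreting the same 32 bytes with the wrong dtype produces
# garbage, not [1. 2. 3. 4.]: the integer bit patterns are read as floats.
wrong = np.frombuffer(data.tobytes(), dtype=np.float64)
print(wrong)
```

With the lower-case send/recv this cannot happen, since pickling carries the type along with the data; that safety is part of what you trade away for speed.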

Scatter and gather

Let us say that we have a function that takes and returns an integer, doing some useful processing in between. Furthermore, we want to evaluate it for a range of input values, so it is a perfect candidate for parallelization. MPI offers functions that make this easier, called scatter and gather. First, let's take a look at how to scatter some data (note: you need to keep the imports and rank, size and comm from before):

if rank == 0:
    data = np.arange(20)
    chunks = np.array_split(data, size)
else:
    chunks = None

chunk = comm.scatter(chunks, root=0)

We are being lazy here and using the lower-case scatter, which pickles each chunk instead of transmitting raw buffers. This is slower, but if the computation is the heavy part then it won't make a difference, so let's not optimize prematurely. Since pickling preserves the type, each process receives its chunk as a small numpy array (one of the pieces produced by array_split), which is easy to manipulate further.
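Note that scatter expects exactly one element per rank, and np.array_split produces exactly that, even when the data does not divide evenly. A quick illustration outside MPI, with 3 standing in for size:

```python
import numpy as np

# array_split divides as evenly as possible; unlike np.split, it does
# not require the length to be divisible by the number of chunks.
chunks = np.array_split(np.arange(10), 3)
for i, c in enumerate(chunks):
    print(i, c)
# chunk 0 gets 4 values, chunks 1 and 2 get 3 values each
```

This is why we can split 20 values among any number of processes without worrying about remainders.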

Try printing chunk along with the rank, and you will see that every process gets an interval of integers, including rank 0. Now we can run the expensive function on each value of chunk. For now, just make it double every value.

The next step is to send the calculated values back to rank 0, so that we can return it as a single array. This is done with gather:

gathered_chunks = comm.gather(chunk, root=0)

Here, we're assuming that chunk now stores the modified values that we want to send back to rank 0. Since we're telling gather to collect everything at rank 0, gathered_chunks will be None on all other processes. As with many MPI functions in mpi4py, there are lower-level versions, Gather and Scatter, that allow you to pack the data more efficiently.

Exercise:

  • Print out whatever gather returns. Use np.concatenate to combine it into a final array and then print it out. You should do this only at rank 0. Gather and scatter preserve order, so we don't have to worry about results coming back shuffled. Your output should be:

    [0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38]
    

Pi approximation

There are plenty more functions offered by MPI, but these basic ones will be plenty if all you are doing is distributing a computationally heavy task. You will now parallelize a simple computation and deploy it on the Midway cluster.

We will calculate an approximation of \(\pi\) by evaluating a finite segment of the following infinite series:

\begin{equation*} \frac{\pi}{4} = \frac{1}{1} - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \frac{1}{9} - \dots \end{equation*}

A fast way to do this in Python is to avoid for loops and operate on numpy arrays as much as possible. We can do this by constructing an array of length N using np.arange, and then formulating an expression that turns it into the first N terms of the series above. If we set N too large, the computer will have a hard time allocating enough memory for the array, even though the calculations themselves will still be relatively fast. This means that since we're using Python and numpy, it's natural to break the task up into chunks, even when running on a single computer. For the record, if we implemented this in C on a single computer, we would not need to break it up into chunks, since chunking here only serves to avoid the slowness of Python for loops.

I suggest that you implement this by first creating a function that takes a block index and the number of terms per block, and returns the sum of that block. For instance, given 2 and 1000, the function should return the sum of terms 2001 to 3000 (or 2000 to 2999 if we zero-index them). The calculations will be fast and allocating memory is cheap, so we want to make these blocks pretty big. However, we need to set the size so that the blocks comfortably fit inside your RAM, even if we're running 8 processes on the same computer. Let us be on the safe side and set it to 30 million, which takes about 240 MB to store in double precision (64 bits per value).
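One possible shape for this function, sketched with a small block size for illustration (block_sum is a name chosen here, not prescribed by the lab):

```python
import numpy as np

def block_sum(block_index, block_size):
    """Sum of terms block_index*block_size .. (block_index+1)*block_size - 1
    of the series pi/4 = 1 - 1/3 + 1/5 - 1/7 + ..."""
    k = np.arange(block_index * block_size, (block_index + 1) * block_size)
    # Term k is (-1)^k / (2k + 1): signs alternate, starting positive.
    terms = (-1.0) ** k / (2.0 * k + 1.0)
    return terms.sum()

# Summing the first two blocks should already land close to pi:
approx = 4 * (block_sum(0, 1000) + block_sum(1, 1000))
print(abs(approx - np.pi))   # should be well below 1e-3
```

Note that the whole block is built with array operations; the only for loop you need is the one over block indices at each node.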

Confirm that you're getting something close to \(\pi\) before parallelizing, and make sure that summing the first and second block takes you even closer to the real value of \(\pi\). If it looks good, use scatter to distribute a list of block indices for your nodes to compute. If you use array_split, it will be easy to hand multiple indices to each node, which is appropriate since the computations themselves are quite fast and we want each node to do more than just one block. When calculating the different blocks at each node, it is okay to use a for loop, since it will be executed much less frequently.

Now, set the block size to 30 million, which makes each block's array of double-precision values around 240 MB. Be very careful about setting this any higher! When you accidentally cause an infinite loop, it's easy to kill the program with Ctrl-C. However, it is much worse if your program starts consuming all available memory, since that can leave your computer completely unresponsive.

Finally, you might as well sum the results of all blocks at each node before sending them to rank 0 using gather. At rank 0, sum them again and print the value. Also print the difference between your value and np.pi. Your goal is to make this less than 1e-10, which will require running 1000 blocks across your nodes.

Note

You might note that since the terms alternate in sign, the approximation oscillates between over- and underestimating the real value. By weighting the last term we add by one half, we can greatly improve the approximation. For the purposes of this lab, let's consider this cheating.
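Purely to see how much this would gain (since we agreed it is cheating), here is a small numerical comparison on 1000 terms:

```python
import numpy as np

n = 1000
k = np.arange(n)
terms = (-1.0) ** k / (2.0 * k + 1.0)

plain = 4 * terms.sum()
# The "cheat": count the final term at half weight, which lands roughly
# midway between the over- and underestimates of the oscillating sums.
corrected = 4 * (terms[:-1].sum() + terms[-1] / 2)

print(abs(plain - np.pi), abs(corrected - np.pi))
```

The half-weighted sum is several orders of magnitude closer, which is exactly why it would defeat the point of the exercise.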

Midway

Now try running this on the Midway cluster. You will have to create a bash script as described in the tutorial. Use #SBATCH directives to request 10 nodes and 8 tasks per node. When running mpirun, you should now specify -np 80 to split the task into 80 processes. Monitor your job using squeue.

If you feel adventurous, you can increase the number of blocks to 10000, which will take around 10 minutes on the cluster.