MPI

This week you will gain experience with the MPI (Message Passing Interface) programming environment, which is used by many High-Performance Computing (HPC) codes.

MPI

The mrjob library allowed us to write tasks as MapReduce jobs for easy parallelization. This week we will instead look at the Message Passing Interface (MPI), a lower-level abstraction for handling parallel processes. Being lower level, it is much more expressive than MapReduce: it can describe a richer set of communication patterns between your parallel processes.

Any HPC center worth its salt will support MPI, so it is a very useful technology to know. And you can set up MPI on a cluster quite easily yourself.

Basics

As the name implies, MPI is an interface through which parallel processes communicate with one another by passing messages. Each process operates autonomously and is associated with a unique identifier called its rank. Messages between processes are addressed using this rank, and each process determines the role it should play in the overall computation by looking at its own rank. Put another way, the rank is what differentiates the processes, so they do not all duplicate the same work.

Each process typically executes the same program, so if we want nodes to behave differently, we need to treat ranks differently within our program.
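
As a minimal sketch of this idea (using mpi4py, which we will install and run below), every process runs the same file and branches on its own rank; here, rank 0 might coordinate while the others do pieces of the work:

from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()

# Every process executes this same script; the rank decides who does what.
if rank == 0:
    print 'I am rank 0, so I will coordinate the work'
else:
    print 'I am rank {0}, so I will work on my own piece'.format(rank)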

Setting up MPI on AWS

We'll now set up a very small MPI cluster on AWS by creating and configuring EC2 instances. Because we'll have to do things by hand on each instance to get them ready to use, we'll only create two instances to save effort. But, normally, you'd be using dozens, hundreds, or more.

Here's what to do:

  • Go to the AWS management console, to the EC2 console within it, and to the Instances view within that.

  • Click Launch Instance

  • Choose the "Amazon Linux AMI 2016.03.0 (HVM), SSD Volume Type" AMI.

  • Choose t2.micro, but don't click the blue button. Click Next instead; this lets us do further customization that we have not needed to do in the past.

  • Number of instances: 2. This allows us to make multiple identical instances at once. Click Next again instead of the blue button.

  • Click Next two more times.

  • Click Add Rule

  • Choose All Traffic, and set the final field (the blank one under "0.0.0.0/0") to "172.31.0.0/16". This allows the machines to talk to each other fully, without firewall restrictions between them. Because only nodes within that (private-to-AWS) address range can send such traffic, however, we are still protecting the nodes from the outside world; the only traffic allowed in from outside is ssh (as was already configured by default).

  • Now go ahead and launch the nodes.

  • Get back to the table showing the list of instances

  • Select the first instance and note its Public DNS. Also, note its Private IP (should be 172.31.x.x).

  • Select the second instance and also note its Public DNS and Private IP.

  • Log into the first instance:

    ssh -i path-to-your-aws-key-file ec2-user@public-dns-1
    
  • Perform the following (substituting in actual IP addresses from earlier) to install and configure (the mpich version of) MPI and mpi4py:

    sudo yum install mpich-devel
    echo export PATH=/usr/lib64/mpich/bin/:$PATH >> .bashrc
    echo export LD_LIBRARY_PATH=/usr/lib64/mpich/lib/:$LD_LIBRARY_PATH >> .bashrc
    source .bashrc
    echo private-ip-1 >> hosts
    echo private-ip-2 >> hosts
    sudo pip install numpy
    wget https://pypi.python.org/packages/source/m/mpi4py/mpi4py-1.3.1.tar.gz
    tar xzf mpi4py-1.3.1.tar.gz
    cd mpi4py-1.3.1
    python setup.py build
    sudo python setup.py install
    
  • Log in to the second machine and do the same. (Now you see why we're only doing this on two machines for this lab.)

  • On your CSIL machine's terminal:

    scp -i your-key-file your-key-file ec2-user@public-dns-1:~/.ssh/id_rsa
    scp -i your-key-file your-key-file ec2-user@public-dns-2:~/.ssh/id_rsa
    
  • On node 1, ssh private-ip-2 and accept the host key. Then, from node 2 (where you have now landed), immediately ssh private-ip-1 and accept that host key. (Substitute in the actual IPs.)

  • The machines can now connect to each other without passwords.
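
  • As a quick sanity check (substituting the actual IP; this step is just a suggestion), running the following from node 1 should print node 2's hostname without prompting for a password:

    ssh private-ip-2 hostname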

Running codes

As you run example MPI code or develop your own, edit it on the CSIL machine, then copy it to the AWS nodes:

scp -i your-key-file myfile.py ec2-user@public-dns-1:~/
scp -i your-key-file myfile.py ec2-user@public-dns-2:~/

Note that you must copy it to both nodes, as MPI does not distribute the Python code for you the way mrjob does.

MPI examples

Here is an example of how to print out some MPI-related information using mpi4py:

from mpi4py import MPI

size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

print "Hello from rank {0} of {1} on {2}".format(rank, size, name)

Save this as mpitest.py, scp it to both of the machines, log into one machine, and try executing it as usual:

$ python mpitest.py

This will start one process and it will have rank 0. To start several, we have to run it through MPI, which is done using mpiexec:

$ mpiexec -n 2 python ~/mpitest.py

To be clear, you only do this on one node.

Now we have two instances of mpitest.py running in parallel on the same computer. This might make sense for testing, or if we have two cores on our computer. As we will see, MPI makes it easy to scale this up beyond a single computer:

$ mpiexec -f hosts -n 2 python ~/mpitest.py
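
The -f hosts flag tells mpiexec to read the list of machines from the hosts file we created during setup and to spread the processes across them. The output should look something like the following (the exact hostnames will differ, and the lines may appear in either order):

Hello from rank 0 of 2 on ip-172-31-xx-xx
Hello from rank 1 of 2 on ip-172-31-yy-yy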

Message passing

Now that we know how to launch multiple processes, let us look at how to send messages between them:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

if rank == 0:
    data = [1, 2, 3, 4]
    comm.send(data, dest=1, tag=7)
    print 'Send', data
elif rank == 1:
    data = comm.recv(source=0, tag=7)
    print 'Received', data

This will fail if you don't run it with mpiexec -n 2, since it will try to send a message to a rank that doesn't exist. Try running it with mpiexec -f hosts -n 2. The message is addressed with dest=1, indicating that we want to send it to rank 1. At the receiving end we specify source=0, since we're expecting a message from rank 0. The tag is an integer you can use to describe what kind of message it is; we picked 7 arbitrarily.

Both send and recv are blocking until the message has been transferred, so it is possible to end up with deadlocks, which is when two processes are waiting for each other. MPI also supports non-blocking messages that can be used to circumvent situations like this.
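
As a rough sketch, the non-blocking variants (isend and irecv; exact buffering behavior may differ slightly between mpi4py versions) let the previous example be written like this:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # isend returns immediately with a request handle; wait() completes the send.
    req = comm.isend([1, 2, 3, 4], dest=1, tag=7)
    req.wait()
    print 'Sent'
elif rank == 1:
    # irecv also returns a request; wait() blocks only when we actually need the data.
    req = comm.irecv(source=0, tag=7)
    data = req.wait()
    print 'Received', data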

Note that when writing networking code, it is common to perform blocking receives so that we wait for a message to actually come in before proceeding. It is less common to perform blocking sends, as we are doing here, to keep the sender in lock step with the receiver. (In particular, this causes the sender to wait until the receiver is actually at the point in the code where it is ready to receive. Often, in other network programming styles, we send the data and let it get queued up at the receiver if the remote end isn't ready for it yet.)

In mpi4py, the lower-case methods, such as send and recv, operate on general Python data structures. When you send data, mpi4py will pickle (serialize) it into a binary representation, which can then be transmitted to another process and unpickled on arrival.

On the other hand, the capitalized methods are more direct wrappers of their C counterparts. For instance, Send and Recv can be used to transmit lower-level data types, such as a numpy array:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

if rank == 0:
    data = np.array([1, 2, 3, 4], dtype=np.int)
    comm.Send(data, dest=1, tag=7)
    print 'Send', data
elif rank == 1:
    data = np.empty(4, dtype=np.int)
    comm.Recv(data, source=0, tag=7)
    print 'Received', data

If we do it this way, we have to be a bit more mindful and manually make sure that the data type and the size are the same at both ends. However, since numpy arrays pack data more efficiently than Python lists, this will have better performance.

Scatter and gather

Let us say that we have a function that takes an integer and returns an integer, doing some useful processing in between. Furthermore, we want to calculate this for a range of input values, so it is a perfect candidate for parallelization. MPI offers functions, called scatter and gather, that make this easier. First, let's take a look at how to scatter some data (note that you need to keep the imports and the comm, rank, and size setup from before):

if rank == 0:
    data = np.arange(20)
    chunks = np.array_split(data, size)
else:
    chunks = None

chunk = comm.scatter(chunks, root=0)

We are being lazy here and using lower-case scatter, which pickles each chunk and transmits it, even when given numpy arrays. This is slower, but if the computation is the heavy part, it won't make a difference, so let's not optimize prematurely. Each chunk arrives exactly as it was pickled, here as a small numpy array, so we can manipulate it directly.

Try printing chunk along with the rank, and you will see that every process, including rank 0, gets an interval of integers. Now we can run the expensive function on each value of chunk. For now, just double each value in chunk, as sketched below.
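
A minimal stand-in for the expensive function might simply be elementwise doubling (np.asarray just guards against chunk arriving as a plain list):

chunk = 2 * np.asarray(chunk)  # stand-in for real per-element work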

The next step is to send the calculated values back to rank 0, so that we can return it as a single array. This is done with gather:

gathered_chunks = comm.gather(chunk, root=0)

Here, we're assuming that chunk stores the modified values that we want to send back to rank 0. Since we're telling gather to deliver the gathered data to rank 0, gathered_chunks will be None for all other processes. As with many MPI functions in mpi4py, there are lower-level versions, Gather and Scatter, that allow you to pack the data more efficiently.
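
For reference, a sketch of the capitalized variants might look like the following. It is not needed for the exercise, and it assumes the total number of values (20 here) divides evenly by the number of processes, since Scatter sends equal-sized pieces:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

n = 20 // size  # per-process chunk length; assumes 20 is divisible by size

if rank == 0:
    # Arrange the data as one row per process.
    sendbuf = np.arange(20, dtype=np.int).reshape(size, n)
else:
    sendbuf = None

# Scatter one row of sendbuf into each process's recvbuf.
recvbuf = np.empty(n, dtype=np.int)
comm.Scatter(sendbuf, recvbuf, root=0)

recvbuf = 2 * recvbuf  # the "expensive" per-element work

# Gather the pieces back into a single array on rank 0.
result = np.empty([size, n], dtype=np.int) if rank == 0 else None
comm.Gather(recvbuf, result, root=0)

if rank == 0:
    print 'Result', result.reshape(-1)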

Exercise:

  • Print out what gather returns. Use np.concatenate to combine it into a final array and then print that out. You should do this only at rank 0. Gather and scatter preserve order, so we don't have to worry about values getting shuffled. Your output should be:

    [0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38]
    

Terminate

As always, please don't forget to terminate your instances.