MPI

This week you will gain experience with the MPI (Message Passing Interface) programming environment used for many High-Performance Computing (HPC) codes.

MPI

Mrjob allowed us to write tasks as MapReduce jobs for easy parallelization. This week we will instead look at the Message Passing Interface (MPI), a lower-level abstraction for coordinating parallel processes. Because it is lower-level, it is more expressive than MapReduce: it can describe a richer set of communication patterns between your parallel processes.

Any HPC center worth its salt will support MPI, so it is a very useful technology to know. You can also set up MPI on a cluster yourself quite easily.

Basics

As the name implies, MPI is an interface for how parallel processes communicate with one another by passing messages. Each process operates autonomously and has a unique identifier, called its rank. Messages between processes are addressed using ranks, and each process determines the role it plays in the overall computation from its own rank. Put another way, the rank is what distinguishes the processes from one another, so that they do not duplicate effort.

Each process typically executes the same program, so if we do not want processes to duplicate work, the program must branch on the rank.
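
As a minimal sketch of treating ranks differently, the plain-Python snippet below simulates how each process could select its own share of a task list from nothing but its rank and the total number of processes. The helper name my_share and the round-robin split are illustrative assumptions, not part of MPI. In a real MPI program, rank and size would come from the MPI library rather than from a loop.

```python
def my_share(items, rank, size):
    """Return the items that the process with this rank should handle.

    Round-robin split: rank r takes items r, r + size, r + 2 * size, ...
    """
    return items[rank::size]


tasks = list(range(10))
size = 3  # pretend we launched three processes

# Every process runs the same code; only the rank differs.
for rank in range(size):
    print("rank", rank, "handles", my_share(tasks, rank, size))
```

Every task ends up with exactly one rank, so no work is duplicated and none is skipped.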

Setting up MPI on Google Cloud Platform

We'll now set up a very small MPI cluster on Google Cloud Platform by creating and configuring Compute Engine instances. Because we'll have to do things by hand on each instance to get them ready to use, we'll only create two instances to save effort. But, normally, you'd be using dozens or more.

You'll need to perform these steps within an environment on which you have set up your Compute Engine ssh key. Recall that you ran a program called ssh-keygen during lab 2.

Here's what to do next:

  • Go to the Google Cloud console, to the Compute Engine console within it, and choose "VM instances".

  • Create a VM instance. (Refer to the instructions in lab 2 for the details if necessary. Be sure to choose Machine type: e2-medium to avoid it taking way too long.)

  • Once the instance is ready (checkmark in the list instead of a spinner), click on the name. In the resulting screen, click the "Create similar" button (two spots to the right of "Reset", which looks like a circular arrow with a vertical line in it). Accept the same settings and click "Create".

  • Open two terminal windows and ssh into both instances side-by-side. (Refer to lab 2 if necessary.)

  • In the "VM instances" table on the Google Cloud web site, take note of the "Internal IP" of each instance. You needed the "External IP" for ssh'ing, but you need the Internal IP for node-to-node communication, which you will configure in the next step. Take only the part with numbers and dots (e.g. 1.2.3.4), not the part that has something like "(nic0)".

  • On both machines, perform the following (substituting in the two actual Internal IP addresses from the previous step) to install and configure (the mpich version of) MPI and mpi4py:

    sudo apt-get install mpich
    echo internal-ip-1 >> hosts
    echo internal-ip-2 >> hosts
    sudo apt-get install python-pip
    sudo apt-get install python-dev
    sudo pip install mpi4py
    sudo apt-get install python3-dev
    sudo apt-get install python3-pip
    sudo pip3 install mpi4py
    sudo pip install numpy
    sudo pip3 install numpy
    
  • We need to copy your authentication credentials to the instances themselves so they can log into each other. Open a third terminal on your local machine (the one you use to connect to the instances) and perform the following, filling in the two external IPs from the "VM instances" console (note that there should be no space between the last digit of the IP and the colon):

    scp -i .ssh/google-cloud-cs123 .ssh/google-cloud-cs123 username@external-IP-1:~/.ssh/id_rsa
    scp -i .ssh/google-cloud-cs123 .ssh/google-cloud-cs123 username@external-IP-2:~/.ssh/id_rsa
    
  • On node 1, run ssh internal-ip-2 and accept the host key. Then, from within that session on node 2, run ssh internal-ip-1 and accept the host key. (Substitute in the actual IPs.) Type exit (followed by return) twice. This should take you back to your original direct connection to the first machine.

  • The machines can now connect to each other without passwords.

Running codes

As you run example MPI code or develop your own, edit it on the same machine you are using to connect to the nodes, then copy it to both of the Compute Engine nodes:

scp -i .ssh/google-cloud-cs123 myfile.py username@external-IP-1:~/
scp -i .ssh/google-cloud-cs123 myfile.py username@external-IP-2:~/

Note that you must copy it to both nodes, as MPI does not distribute the python code for you the way mrjob does.
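
The two copy commands above differ only in their destination, so with more nodes it may be convenient to generate them. The sketch below is one possible way to do that. The helper name scp_commands is hypothetical, and username, external-IP-1 and external-IP-2 are the same placeholders used above. It only prints the commands and does not run them.

```python
def scp_commands(filename, external_ips, username, key_path):
    """Build one scp command (as an argument list) per node."""
    return [
        ["scp", "-i", key_path, filename, "{0}@{1}:~/".format(username, ip)]
        for ip in external_ips
    ]


# Placeholder values: substitute your own username and external IPs.
commands = scp_commands(
    "myfile.py",
    ["external-IP-1", "external-IP-2"],
    "username",
    ".ssh/google-cloud-cs123",
)

for cmd in commands:
    # To actually copy, pass cmd to subprocess.run(cmd, check=True) instead.
    print(" ".join(cmd))
```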

MPI examples

Here is an example of how to print out some MPI related information using mpi4py:

from mpi4py import MPI

size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

print("Hello from rank {0} of {1} on {2}".format(rank, size, name))

Save this as mpitest.py, scp it to both of the machines, go to one machine, and try executing it as usual:

$ python3 mpitest.py

This will start one process and it will have rank 0. To start several, we have to run it through MPI, which is done using mpiexec:

$ mpiexec -n 2 python3 ~/mpitest.py

To be clear, you only do this on one node.

The printouts from both processes may end up concatenated on the same line, or even interleaved, rather than being nicely printed on separate lines; this is because they are both printing to the console at the same time and are not coordinating with each other. (This is a race condition involving the console.)
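
One way to avoid the interleaving, sketched below under a few assumptions, is to let only rank 0 write to the console: every process sends its line to rank 0 using the gather operation described later in this lab, and rank 0 prints the lines in rank order. The helper name hello_line is hypothetical, and the try/except around the import is only there so the helper can be tried on a machine without mpi4py.

```python
try:
    from mpi4py import MPI
except ImportError:
    MPI = None  # sketch-only fallback: nothing is printed without mpi4py


def hello_line(rank, size, name):
    """Build the greeting for one process."""
    return "Hello from rank {0} of {1} on {2}".format(rank, size, name)


if MPI is not None:
    comm = MPI.COMM_WORLD
    line = hello_line(comm.Get_rank(), comm.Get_size(),
                      MPI.Get_processor_name())
    # gather returns a list ordered by rank at the root, None elsewhere.
    lines = comm.gather(line, root=0)
    if comm.Get_rank() == 0:
        # Only one process writes, so the lines cannot interleave.
        print("\n".join(lines))
```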

Now, we have 2 parallel running instances of mpitest.py, running on the same computer. This might make sense for testing, or if we have two cores on our computer. As we will see, MPI makes it easy to scale this up beyond a single computer (we just need to tell it the name of the file we created earlier that lists both machines):

$ mpiexec -f hosts -n 2 python3 ~/mpitest.py

Message passing

Now that we know how to launch multiple processes, let us look at how to send messages between them:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

if rank == 0:
    data = [1, 2, 3, 4]
    comm.send(data, dest=1, tag=7)
    print('Sent', data)
elif rank == 1:
    data = comm.recv(source=0, tag=7)
    print('Received', data)

This will fail unless you run it with at least two processes (for example, mpiexec -n 2), since rank 0 would otherwise try to send a message to a rank that doesn't exist. Try running it with mpiexec -f hosts -n 2. (Don't forget to copy the code to both machines.) The message is addressed by dest=1, indicating that we want to send it to rank 1. At the receiving end we specify source=0 since we're expecting a message from rank 0. The tag is an integer that you can use to describe what kind of message it is. We picked 7 arbitrarily.

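Both send and recv are blocking: recv returns only once a matching message has arrived, and send returns only once the data has been handed off and is safe to reuse. This makes it possible to end up with deadlocks, which is when two processes are waiting for each other and neither can make progress (for example, if both call recv first and each waits for the other to send). MPI also supports non-blocking messages that can be used to circumvent situations like this.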

Note that when writing networking code, it is common to perform blocking receives so that we wait for a message to actually come in before proceeding. It is less common to perform blocking sends, as we are doing here. Whether a blocking MPI send actually keeps the sender in lock step with the receiver depends on the implementation and the message size: small messages are often buffered so that send returns right away, while larger ones may make the sender wait until the receiver reaches its matching recv. Code should not rely on either behavior. (Often, in other network programming styles, we send the data and let it get queued up at the receiver if the remote end isn't ready for it yet.)
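
As a hedged sketch of the non-blocking alternative, the following has ranks 0 and 1 exchange a message with each other using mpi4py's isend and irecv. Both operations are started before either is waited on, so neither process is stuck waiting for the other to act first. The helper name partner_of is hypothetical, and the try/except around the import is only there so the helper can be tried on a machine without mpi4py.

```python
try:
    from mpi4py import MPI
except ImportError:
    MPI = None  # sketch-only fallback: nothing is exchanged without mpi4py


def partner_of(rank):
    """Pair up ranks 0 and 1: each one's partner is the other."""
    return 1 - rank


if MPI is not None:
    comm = MPI.COMM_WORLD
    rank, size = comm.Get_rank(), comm.Get_size()
    if size >= 2 and rank < 2:
        partner = partner_of(rank)
        # Start both operations first; neither call waits for the partner.
        send_req = comm.isend([rank] * 3, dest=partner, tag=7)
        recv_req = comm.irecv(source=partner, tag=7)
        received = recv_req.wait()  # the unpickled object from the partner
        send_req.wait()             # make sure our own send has completed
        print("rank", rank, "received", received)
```

Run it with mpiexec -n 2 as before. If both ranks instead called the blocking recv before send, each would wait forever for a message the other never gets to send.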

In mpi4py, methods with lower-case names, such as send and recv, operate gracefully on Python data structures. When you send data, mpi4py pickles (serializes) it into a binary representation. This pickled version of the data can then be transmitted to another process and unpickled on arrival.
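
To make the pickling step concrete, here is a small sketch that uses Python's standard pickle module on its own, without any MPI. It is only an illustration of the round trip and is not meant to show how mpi4py performs it internally.

```python
import pickle

data = [1, 2, 3, 4]

payload = pickle.dumps(data)      # serialize the list into a bytes object
restored = pickle.loads(payload)  # rebuild an equal list from those bytes

print(type(payload).__name__, len(payload))
print(restored == data)  # same contents
print(restored is data)  # but a separate object, as on a receiving process
```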

On the other hand, capitalized methods are more direct wrappers of their C counterparts. For instance, Send and Recv can be used to transmit lower-level data types, such as a numpy array:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

if rank == 0:
    data = np.array([1, 2, 3, 4], dtype=int)
    comm.Send(data, dest=1, tag=7)
    print('Sent', data)
elif rank == 1:
    data = np.empty(4, dtype=int)
    comm.Recv(data, source=0, tag=7)
    print('Received', data)

If we do it this way, we have to be a bit more mindful and manually make sure that the data type and the size are the same at both ends. However, since numpy arrays pack data more efficiently than Python lists, this will have better performance.
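
The sketch below illustrates why the two ends must agree, using numpy alone. It assumes that tobytes and frombuffer are a fair stand-in for shipping a raw buffer, and it uses explicit int64 and int32 types to make the sizes visible. The raw bytes carry no type information, so a receiver that assumes the wrong dtype silently ends up with a different number of elements.

```python
import numpy as np

sent = np.array([1, 2, 3, 4], dtype=np.int64)
raw = sent.tobytes()  # 4 elements * 8 bytes each, no type information

# Receiver that agrees with the sender on dtype: data comes back intact.
ok = np.frombuffer(raw, dtype=np.int64)

# Receiver that assumes 4-byte integers: same bytes, twice as many elements.
wrong = np.frombuffer(raw, dtype=np.int32)

print(len(raw), ok.size, wrong.size)
```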

Scatter and gather

Let us say that we have a function that takes and returns an integer, doing some useful processing in between. Furthermore, we want to evaluate it for a range of input values, which makes it a perfect candidate for parallelization. MPI offers two functions that make this easier, scatter and gather. First, let's take a look at how to scatter some data (note that you need to keep the imports and the definitions of rank, size and comm from before):

if rank == 0:
    data = np.arange(20)
    chunks = np.array_split(data, size)
else:
    chunks = None

chunk = comm.scatter(chunks, root=0)

We are being lazy here and using lower-case scatter, which pickles the elements of chunks and delivers one to each rank. This is slower than the buffer-based Scatter, but if the computation is the heavy part, it won't make a difference, so let's not optimize prematurely. Note that np.array_split returns a Python list of numpy arrays, so the chunk each rank receives is itself a numpy array.
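
To see what each rank would receive without launching MPI, the following sketch runs only the splitting step. The process count of 3 is an arbitrary assumption chosen to show what happens when the data does not divide evenly.

```python
import numpy as np

data = np.arange(20)
size = 3  # hypothetical number of processes

# array_split returns a list with one numpy array per process and
# tolerates lengths that are not a multiple of size.
chunks = np.array_split(data, size)

for rank, chunk in enumerate(chunks):
    print("rank", rank, "would receive", chunk)
```

With 20 values and 3 processes the chunks have 7, 7 and 6 elements, so the ranks do not necessarily all get the same amount of work.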

Try printing chunk along with the rank and you will see that every process gets an interval of integers, including rank 0. Now, we can run the expensive function on each value of chunk. For now, just have it double each value in chunk.

The next step is to send the calculated values back to rank 0, so that we can return them as a single array. This is done with gather:

gathered_chunks = comm.gather(chunk, root=0)

Here, we're assuming that chunk stores the modified values that we want to send back to rank 0. Since we're telling gather to send the gathered data to rank 0, gathered_chunks will be None for all other processes. As with many MPI functions in mpi4py, there are lower-level versions Gather and Scatter that will allow you to pack the data more efficiently.

Exercise:

  • Print out whatever gather is returning. Use np.concatenate to combine this into a single array and then print it out. You should do this only at rank 0. Gather and scatter preserve order, so we don't have to worry about things getting out of order. Your output should be:

    [ 0  2  4  6  8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38]
    

Terminate

As always, please don't forget to terminate your instances.