mrjob

This lab gives you experience with running and using mrjob, both on a single machine (for basic development) and on Amazon AWS (for scaling).

Python generators

This section is a repeat of the material on the yield keyword from a prior lab. Now that you've seen this keyword in class, you may want to revisit this topic to understand how it works behind the scenes.

An iterator object is any object that can be iterated through in a linear fashion. There is only one operation that we are concerned with, and it is called next(). When there are no more elements in the iterator, it will raise a StopIteration exception. Lists, dictionaries and tuples are all iterable in Python, and to explicitly create an iterator from an iterable object we use iter():

>>> g = iter([0, 1, 2])
>>> next(g)
0
>>> next(g)
1
>>> next(g)
2
>>> next(g)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

A generator is an iterator that is created by writing what looks like an ordinary function definition, but with the yield keyword in its body. Here is an example:

def gen():
    yield 0
    yield 1
    yield 2

Calling gen() now returns an iterator that behaves indistinguishably from iter([0, 1, 2]). When we invoke gen(), the function body does not execute; instead, execution pauses at the very top of the function. Once we call next() on this object, execution continues until it reaches a yield, returns that value, and pauses again. Each subsequent call to next() picks up where the last one left off and runs to the next yield.
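To see this pausing behavior concretely, here is a small sketch (noisy_gen and its print markers are our own illustration, not part of the example above):

```python
def noisy_gen():
    print("top of function")  # runs only once the first value is requested
    yield 0
    print("resumed")          # runs when the second value is requested
    yield 1

g = noisy_gen()  # nothing is printed yet: the body has not started executing
print(next(g))   # prints "top of function", then 0
print(next(g))   # prints "resumed", then 1
```

Notice that creating the generator object is cheap; all the work happens lazily, one next() call at a time.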

Invoking next() directly is neither the most elegant nor the most typical way of interacting with an iterator or generator; it is much more common to consume the values through the following familiar patterns:

>>> for i in gen():
...     print(i)
0
1
2
>>> sum(gen())
3
>>> list(gen())
[0, 1, 2]

Converting to a list is possible, as seen above, if the generator is finite. However, the whole point of iterators is to avoid having to store the entire sequence in memory at once. Iterators can even be infinite.
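As a sketch of that last point, here is a hypothetical infinite generator; you can never convert it to a list, but you can lazily take as many values as you need:

```python
from itertools import islice

def naturals():
    # Yields 0, 1, 2, ... forever; the full sequence never exists in memory.
    n = 0
    while True:
        yield n
        n += 1

# list(naturals()) would never terminate, but a finite slice is fine:
print(list(islice(naturals(), 5)))  # [0, 1, 2, 3, 4]
```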

Exercises

mrjob

Amazon's Elastic MapReduce allows you to easily run the MapReduce algorithm on an AWS cluster. The Python package mrjob further simplifies this process if you want to implement the algorithm in Python.

Installing mrjob

To make sure you have the latest version of the package for both Python 2 and Python 3 in the CSIL environment:

pip2 install --upgrade --user mrjob
pip3 install --upgrade --user mrjob
echo "export PATH=~/.local/bin:\$PATH" >> ~/.bashrc
source ~/.bashrc
You only need to run these commands once in a given environment.

If you are running on a machine you control, simply:

sudo pip2 install mrjob
sudo pip3 install mrjob

mrjob example

Taken from the mrjob documentation (Writing jobs in Python), a simple example of a word counter is:

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()

The mapper function should yield (key, value) tuples, in this case (word, 1), indicating that you found one occurrence of word. These values are then combined using combiner, which is called once per word with a generator of all the counts produced for that word by the same process. In the example above, the counts are summed to yield something like (word, n), where n is the count of word within that process. Note that both mapper and combiner run in parallel across several processes.

Finally, the outputs of all processes are sent to a designated master node and combined using reducer. It again receives counts as a generator, but this time the values won't all be ones. It may seem unnecessary to have both combiner and reducer, since they look identical in this example. However, combiner runs within each process and reduces the amount of data that must be transmitted to the master node where reducer is called. They happen to look identical only in certain situations, such as for sums and other linear reductions, or max/min operations.
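To make this division of labor concrete, here is a plain-Python sketch (no mrjob involved; mapper, sum_counts, and the sample sentences are our own) that simulates two mapper processes, local combining, and a final reduce:

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")

def mapper(line):
    # Yield (word, 1) for every word in the line, as in the mrjob example.
    for word in WORD_RE.findall(line):
        yield word.lower(), 1

def sum_counts(pairs):
    # For word count, combiner and reducer do the same thing: sum per key.
    totals = defaultdict(int)
    for word, n in pairs:
        totals[word] += n
    return dict(totals)

# Two "processes" each run mapper and then combine their own counts locally...
partial1 = sum_counts(mapper("this is is a test"))
partial2 = sum_counts(mapper("this is another test"))

# ...and the reducer merges the partial counts shipped from every process.
final = sum_counts(list(partial1.items()) + list(partial2.items()))
print(final["is"])  # 3
```

Each partial dictionary is much smaller than the raw stream of (word, 1) pairs, which is exactly the transmission savings the combiner provides.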

Paste the above code into a file wordcount.py and run it. You haven't set up any configuration yet, so it will simply run on your CSIL machine. This is acceptable for small inputs, and lets you do a "dry run" of your code before scaling it up into a truly parallelized MapReduce computation. When it indicates it is reading from STDIN, paste in the following, press return, and then press control-D (you may have to press it multiple times):

This sentence is intended to have few words, is intended to be
this short; this is it.

You've just done a MapReduce computation, albeit with only one node. While the whole benefit of MapReduce is parallelization, a MapReduce job should produce the same output regardless of the number of nodes; increasing the node count only (potentially) improves performance, up to a point.

k-means mrjob

The file src/cs123/mrjob/kmeans.py, within the GitHub repository that we used in a prior lab (remember to use python2), uses mrjob to run k-means in parallel. By default, it runs jobs locally, but specifying -r emr runs them on an AWS cluster. For this, you have to create ~/.mrjob.conf with the following information:

runners:
  emr:
    strict_protocols: true
    aws_access_key_id: key
    aws_secret_access_key: secret_key
    aws_region: us-east-1
    ec2_key_pair: cslab
    ec2_key_pair_file: ~/cs123/cslab.pem
    ec2_instance_type: m1.medium
    num_ec2_instances: 2
    max_hours_idle: 1
    mins_to_end_of_hour: 5
    bootstrap:
    - sudo yum install -y python34 python34-devel python34-pip
    - sudo pip install --upgrade mrjob
    - sudo pip-3.4 install --upgrade mrjob

    emr_api_params: {"ServiceRole": "EMR_DefaultRole",
      "JobFlowRole": "EMR_EC2_DefaultRole"}

    # Less important
    local_tmp_dir: /tmp
    cmdenv:
      TZ: America/Chicago

  local:
    local_tmp_dir: /tmp

Notice that you have to fill in the same information as in your ~/.awsconfig file, as well as the name of your Key Pair file, which may differ from the example (cslab) given above. Remember that these are all from last week's lab. The ec2_instance_type setting (m1.medium) selects the instance type (EMR does not work on t2.micro), and num_ec2_instances specifies how many instances you want to run on.

As before, it is critical that you:

chmod 600 ~/.mrjob.conf

to protect the secret keys from being read by anyone else.

First, try running on your local machine with only one process:

cmsc12300/examples/data_analysis/src/cs123/mrjob$ python2 kmeans.py  --k 2 ../../../data/5clusters.txt

From the prompt shown above you can see which directory you need to be in, along with the command-line options and the path to an example file over which to perform the clustering. You can try this for different numbers of clusters; note that there are two dashes before the k in the command line this time. For each iteration, the code will print out the id number of each cluster and then the geometric coordinates of its centroid.

Now, you can try running on AWS. First, we need to do a one-time setup of EMR. By "one time," we mean that you only have to do the following command a single time ever. It sets up some authentication state in your account.

Begin by ensuring that your AWS_CONFIG_FILE environment variable is set. You created this file in last week's lab for use with the aws-cli access method; at that time, you ran:

export AWS_CONFIG_FILE=path_and_name_of_config_file

If this is no longer in effect, do it again. Then, run the following:

aws emr create-default-roles

Now you are able to run Elastic MapReduce jobs.

Note that using EMR on AWS is extravagantly slow for small jobs. The time it takes to set up a cluster for use with MapReduce is significant. This up-front investment is repaid when you are able to process data rapidly. But, the simple MapReduce job we just created and ran locally is not anywhere near large enough to benefit, and its runtime is dominated by the setup time. In particular, with a shortened version of this job, it will take about 17 minutes. If you are running out of time for the lab and need to leave in the next few minutes, you may want to postpone trying this until a more convenient opportunity.

In fact, to get it to take only seventeen minutes, we need to modify the termination condition so that it stops after a single iteration. Open the kmeans.py file and change the line that compares diff to 10 so that it accepts any value of diff less than 100 instead.

Now, run with the additional option, inserted before -k, of -r emr. (This one has only one dash.) This will cause your computation to run on Amazon Web Services using Elastic MapReduce.

Note that the startup process is so slow that, even if you have an error in some detail of your configuration, and EMR is unable to begin the calculation, it may still take a minute or more for the error to appear. If the process goes on for more than a couple of minutes without printing an error, though, it's probably working. It will slowly print out incremental information, and finally return you to a prompt when it is completely done.

Clearly, using EMR for this small job is unjustifiable. But, you've now performed your first MapReduce calculation on the scalable AWS infrastructure, using two instances in parallel. And, this has given you an opportunity to work out any kinks with the configuration.

If you want to keep instances running and perform several jobs without having to start them and shut them down all the time, you can create persistent clusters and reuse them. How to do this with mrjob is explained later.

Exercises

Terminating

If you didn't do anything beyond the above instructions, the instances and Elastic MapReduce cluster should have shut down on their own. But, it is wise to check and make sure; and we'll describe how to do so for future reference as well.

To terminate all instances associated with a cluster, first you have to know the cluster ID, which you can check by:

$ aws emr list-clusters --output table

Once you know the ID, say j-<number>, then you can terminate it (and all the instances along with it) by running:

$ aws emr terminate-clusters --cluster-ids j-<number>

Make sure to terminate clusters when you are done with them for a while, since otherwise they will generate unnecessary charges. Make sure that the instances are shutting down and eventually are shut down by running aws ec2 describe-instances or through the web interface.

Starting a persistent cluster

Please run the following:

mrjob create-cluster

This will start n EC2 instances that have been configured to run Elastic MapReduce jobs, where n is the number of EC2 instances (num_ec2_instances) specified in your .mrjob.conf file. When you run this command you will be given a cluster ID, something like j-<ID>. Please remember this value.

Note

If you do not finish the entire lab, please remember to jump to the last section called Terminate and finish it before you leave. If you don't, you will leave a draining charge on your AWS account. In general, when you are doing a development session, it may be a good idea to leave a cluster up for the duration of your session, but you should tear it down when you break for the day or for an otherwise prolonged period of time.

Now, go to the EC2 Management Console to see if your EC2 instances are ready for usage. Alternatively, you can use the AWS-CLI by running:

aws ec2 describe-instances --output table

Look for something that says Status. This might take a few minutes.

Once the cluster is up, here's how you would run a MapReduce task using it:

python2 mycode.py -r emr --cluster-id j-<CLUSTERID> input-file.txt

where you will have to replace j-<CLUSTERID> with your cluster ID. The mrjob framework does not emphasize speed for small jobs, since there is no point in deploying them on a cluster in the first place. Because of this, you will find it much slower than running a small job locally. But with a cluster already warmed up and standing by, it will at least be faster than spawning a new cluster each time. And as we start to scale up to bigger jobs, the benefits will become clear for all of our jobs.

S3

When you input a file to mrjob and you're using Amazon EMR, it will automatically create a temporary bucket, upload the file to it, and then let the various nodes fetch the file from there directly. Instead, you can upload the file yourself and specify the S3 URL directly, which mrjob will recognize and handle appropriately. An example of an S3 URL is s3://datasets.elasticmapreduce/ngrams/books/, which happens to be Google's N-gram database. Using a persistent S3 bucket instead of having mrjob create a temporary one for each run makes sense when you plan on performing analyses on the same dataset repeatedly.

Note that even if we use input data on S3, mrjob might still create a bucket for temporary data and logs. To avoid this, you can create a bucket in advance and tell mrjob to use it instead. For a bucket named my-bucket, this is done by adding the following to the emr: section of your .mrjob.conf file:

s3_log_dir: s3://my-bucket/tmp/logs/
s3_scratch_dir: s3://my-bucket/tmp/

Handling S3 URLs

Uploading and downloading files to an S3 bucket can be done through various means.

In a previous lab, you already used the S3 Management Console to create buckets and upload files.

Download this file, unzip it, and upload its contents into a new S3 bucket. Make sure to have a subdirectory in your bucket named files, into which you will place all the .txt files within the files directory that you get from decompressing this archive.

Show time

It's time to try our word counter on some real data. However, for it to finish within the lab, we won't scale it up that much.

By setting up your bucket in the previous section, you have already created a bucket with some works of literature in English from the Gutenberg Project. Luckily, not only can mrjob handle S3 URLs, but it handles entire folders as well, and will implicitly input all files in the folder. Putting it all together, you should be able to run (this all goes on one line):

python2 wordcount.py -r emr --cluster-id j-<CLUSTERID>
s3://your-bucket-name-from-previous-section/files/ > wordcounts.txt

Then, use a text editor to look at the output file wordcounts.txt. You could even use sort and uniq, which we learned about in CS 122, to find the top words. Can you do this directly in MapReduce, though? Well, you'll need to figure out how to complete Task 2 of the programming assignment....

Terminate

To terminate all instances associated with a persistent cluster, first you have to know the cluster ID. In case you forget this, you can check it by:

$ aws emr list-clusters --output table

Once you know the ID, say j-<CLUSTERID>, then you can terminate it (and all the instances along with it) by running:

$ aws emr terminate-clusters --cluster-ids j-<CLUSTERID>

Make sure to terminate clusters that you are done with for a while, since otherwise they will generate unnecessary charges. Make sure that the instances are in fact shutting down and eventually are shut down by running aws ec2 describe-instances or through the web interface.