Processing streaming data with a thread pool

Due: May 24 at 5pm.

We have seen that threads provide a mechanism to perform multiple tasks concurrently. Sometimes, the number of threads we will need is known beforehand. For example, if we want to process an image, we can divide it up and assign each portion of the image to a separate thread (after a certain point, adding more threads doesn't help; however, once we figure out the "sweet spot" of threads, that number will work every time we re-run the application on the same data and same machine).
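The fixed-thread-count case can be sketched as follows. This is only an illustration under assumptions: the image is taken to be a flat array of grayscale bytes, and the names `Slice`, `brighten_slice`, `brighten_image`, and `MAX_THREADS` are hypothetical and not part of the assignment code.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define MAX_THREADS 16   /* hypothetical upper bound for this sketch */

/* Hypothetical description of one thread's share of the image. */
typedef struct {
    unsigned char *pixels;  /* first pixel of this slice */
    size_t count;           /* number of pixels in this slice */
} Slice;

/* Thread body: brighten every pixel in the slice, saturating at 255. */
void *brighten_slice(void *arg) {
    Slice *s = (Slice *)arg;
    for (size_t i = 0; i < s->count; i++) {
        unsigned v = s->pixels[i] + 10u;
        s->pixels[i] = (unsigned char)(v > 255u ? 255u : v);
    }
    return NULL;
}

/* Split n pixels into nthreads contiguous slices, one thread per slice.
 * Returns 0 on success, -1 on bad arguments or thread creation failure. */
int brighten_image(unsigned char *pixels, size_t n, int nthreads) {
    pthread_t tids[MAX_THREADS];
    Slice slices[MAX_THREADS];
    assert(pixels != NULL);
    if (nthreads < 1 || nthreads > MAX_THREADS) return -1;

    size_t chunk = n / (size_t)nthreads, start = 0;
    int started = 0, rc = 0;
    for (int i = 0; i < nthreads; i++) {
        /* The last slice absorbs any remainder. */
        size_t len = (i == nthreads - 1) ? n - start : chunk;
        slices[i].pixels = pixels + start;
        slices[i].count = len;
        if (pthread_create(&tids[i], NULL, brighten_slice, &slices[i]) != 0) {
            rc = -1;
            break;
        }
        started++;
        start += len;
    }
    /* Each thread has one finite job, so joining is enough to wait for it. */
    for (int i = 0; i < started; i++) pthread_join(tids[i], NULL);
    return rc;
}
```

Because the slices do not overlap, the threads never touch the same bytes and no locking is needed. That is exactly what stops being true once tasks arrive as an open-ended stream.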

However, in some applications, the number of tasks is not known beforehand and might even change throughout the lifetime of an application. For example, a web server receives a stream of requests for web pages and most modern web servers assign a thread to each request. If the web server spawns a new thread every time a new request comes along, it could end up spawning too many threads if there is a spike in web traffic (and those threads could use up valuable system resources, to the point where the server becomes saturated and can't deal with any more requests).

A common solution to this problem is the thread pool pattern, where a "pool" of threads is pre-spawned when the application starts running, and is always available to deal with incoming tasks. If the number of tasks ever exceeds the number of threads, the tasks are placed in a queue until a thread becomes available.

So, instead of potentially spawning more threads than the application can handle, we make these extra tasks wait. Another advantage is that the cost of starting up the threads is incurred once, when the application starts, instead of when a task is run (since we don't spawn a thread for each task; the threads are already there and are assigned to work on incoming tasks).

In this assignment, you will be implementing a thread pool to handle the 1.USA.gov clicks public stream. 1.USA.gov URLs are created whenever anyone shortens a .gov or .mil URL using bitly. The 1.USA.gov public stream provides real-time data every time anyone clicks on a 1.USA.gov URL (each click is represented as a JSON object).

Implementing a thread pool requires understanding locks and condition variables, both of which will be necessary to maintain a queue of tasks and to coordinate the threads in the thread pool. This assignment will help you practice using these synchronization primitives and will allow you to continue practicing C.

To ensure you can compile this assignment on your VM, type clang++ in a terminal window. If you receive an error message other than clang++: no input files, then run sudo apt-get install clang.

The Thread Pool Pattern

The thread pool pattern is typically implemented using a queue, N threads (typically referred to as "worker threads"), and a data structure that encapsulates a single task to be run by a thread.

When the thread pool is created, N threads are created and are initially idle (since there is no work to do). When a task is submitted to the thread pool, it is placed in the task queue. If there are any idle threads, an idle thread (selected arbitrarily) is notified that there is work to do. That thread then extracts the first task in the queue and runs it.

Once all the threads are busy, the threads no longer have to receive notifications that there is more work to do (although such notifications are harmless). Instead, once a thread is done with the task it was working on, it checks the queue again; if there are tasks in it, it takes the first task and runs it. If not (if the queue is empty), it goes to sleep and waits for the queue to be non-empty.
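The flow described above can be sketched generically as follows. This is a minimal illustration, not the assignment's code: `MiniPool`, `mini_worker`, `mini_submit`, `mini_start`, `bump_counter`, and the ring-buffer queue are all hypothetical names and choices, and shutting the workers down is deliberately left out.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define QUEUE_CAP 64   /* hypothetical capacity for this sketch */

typedef void (*task_fn)(void *);
typedef struct { task_fn fn; void *arg; } Task;

/* Hypothetical pool state: a ring buffer of tasks guarded by one mutex,
 * plus one condition variable meaning "the queue may be non-empty". */
typedef struct {
    Task queue[QUEUE_CAP];
    int head, count;
    pthread_mutex_t lock;
    pthread_cond_t nonempty;
} MiniPool;

/* Worker loop: sleep while the queue is empty, otherwise take the first task. */
void *mini_worker(void *arg) {
    MiniPool *p = (MiniPool *)arg;
    for (;;) {
        pthread_mutex_lock(&p->lock);
        while (p->count == 0)                 /* re-check after every wakeup */
            pthread_cond_wait(&p->nonempty, &p->lock);
        Task t = p->queue[p->head];
        p->head = (p->head + 1) % QUEUE_CAP;
        p->count--;
        pthread_mutex_unlock(&p->lock);       /* release before running the task */
        t.fn(t.arg);
    }
    return NULL;
}

/* Queue a task and notify one sleeping worker, if any.
 * Returns 1 if the task was queued, 0 if the queue was full. */
int mini_submit(MiniPool *p, task_fn fn, void *arg) {
    int ok = 0;
    pthread_mutex_lock(&p->lock);
    if (p->count < QUEUE_CAP) {
        int tail = (p->head + p->count) % QUEUE_CAP;
        p->queue[tail].fn = fn;
        p->queue[tail].arg = arg;
        p->count++;
        ok = 1;
        pthread_cond_signal(&p->nonempty);    /* harmless if nobody is asleep */
    }
    pthread_mutex_unlock(&p->lock);
    return ok;
}

/* Create the pool's n threads up front; they start out idle. */
int mini_start(MiniPool *p, pthread_t *tids, int n) {
    assert(n > 0);
    p->head = p->count = 0;
    pthread_mutex_init(&p->lock, NULL);
    pthread_cond_init(&p->nonempty, NULL);
    for (int i = 0; i < n; i++)
        if (pthread_create(&tids[i], NULL, mini_worker, p) != 0) return -1;
    return 0;
}

/* Example task: atomically add one to the counter passed as the argument. */
void bump_counter(void *arg) {
    atomic_fetch_add((atomic_int *)arg, 1);
}
```

A caller would run `mini_start` once and then call `mini_submit(&pool, bump_counter, &counter)` for each incoming task. Note that the mutex is held only while the queue is touched, so several workers can run their tasks at the same time.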

[Figure: threadpool.png]

A common feature of thread pools is that the number of threads also varies depending on how many tasks are waiting to be run. For example, if we created a pool with 4 threads, but suddenly there are 100 tasks waiting in the task queue, it would make sense to spawn more threads. Once the queue is empty, those extra threads can be destroyed to free up resources. The difference between doing this and just spawning threads for every single request is that the threads in the pool are longer-lived (and thus incur less overhead), and the adaptation to changes in demand can be conservative.

In this assignment, for simplicity, we will assume that the pool is created with N threads, and that this number does not change over time. Furthermore, the queue will have a maximum size of 10 tasks, meaning that any tasks submitted when the queue has 10 tasks in it will simply be dropped.
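The drop-when-full policy can be illustrated in isolation. The sketch below is single-threaded and uses a hypothetical array-backed queue (`BoundedQueue`, `bq_init`, `bq_try_push`, `bq_pop`) rather than the linked list provided with the assignment; in a real pool, the capacity check and the enqueue would both happen while holding the queue's lock.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_TASKS 10   /* the queue limit stated above */

/* Hypothetical fixed-capacity FIFO of task ids. */
typedef struct {
    int items[MAX_TASKS];
    size_t head, count;
    size_t dropped;     /* how many submissions were rejected */
} BoundedQueue;

void bq_init(BoundedQueue *q) {
    q->head = q->count = q->dropped = 0;
}

/* Returns 1 if the task was queued, 0 if it was dropped because the queue is full. */
int bq_try_push(BoundedQueue *q, int task_id) {
    if (q->count == MAX_TASKS) {
        q->dropped++;
        return 0;
    }
    q->items[(q->head + q->count) % MAX_TASKS] = task_id;
    q->count++;
    return 1;
}

/* Removes the oldest task into *task_id; returns 0 if the queue is empty. */
int bq_pop(BoundedQueue *q, int *task_id) {
    assert(task_id != NULL);
    if (q->count == 0) return 0;
    *task_id = q->items[q->head];
    q->head = (q->head + 1) % MAX_TASKS;
    q->count--;
    return 1;
}
```

With this policy, an eleventh submission to a full queue is rejected immediately instead of blocking the submitter, and a slot frees up as soon as a worker pops a task.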

The 1.USA.gov click data

1.USA.gov is a URL shortening service. Any time a .gov or .mil URL is shortened using bitly, a 1.USA.gov URL is created. For example http://1.usa.gov/awQHfg is a short URL that redirects to http://www.cancer.gov/.

1.USA.gov provided a live feed of clicks on 1.USA.gov URLs ("provided" because the service is being decommissioned). Each click is represented as a single JSON object. For example:

{ "a": "Mozilla\/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident\/4.0)",
  "c": "US", "nk": 0, "tz": "America\/New_York", "gr": "PA",
  "g": "awQHfg", "h": "19WPvQ8", "l": "rocklandantibodies",
  "hh": "1.usa.gov", "r": "direct", "u": "http:\/\/www.cancer.gov\/",
  "t": 1369838391, "hc": 1369083379, "cy": "Lemoyne", "ll": [ 40.252499, -76.897003 ] }

You will not have to worry about parsing the JSON, as we will be providing the code that handles this. Nonetheless, notice how the data includes several interesting pieces of information, such as the web browser used, the URL being clicked (http://www.cancer.gov/), and even the latitude/longitude where the click happened. You can find a more complete description of each data field here (though you do not need to appreciate the details to complete this assignment).

In this assignment, you will be able to run your code on a file containing clicks that was captured and saved in the past.

The code

Do a git pull upstream master to get the distribution files for this assignment. You will receive a folder named pa3, with several C++ [sic] files that handle most of the functionality of dealing with the 1.USA.gov data. The only thing missing is the thread pool.

(Do not adjust your television: yes, we are providing C++ files even though this course only teaches you C. Why? We use a package to parse the JSON which has a C++ interface. Thus, the code we wrote that accesses this package is itself in C++. The easiest way to integrate this code with the parts of the program that are not themselves C++ is to turn those files, which would otherwise be C, into C++ files. Since C++ is essentially a superset of C, this allows you to write your code in C, in a file that is nominally a C++ file, and have everything work together. Rest assured that none of the code that you need to understand in this assignment is in C++.)

The following are files we provide which you should not have to modify:

The following are files you will be modifying:

To build the program, just run make:

make

This will generate a usagov-clicks executable.

The program accepts several command-line parameters:

After the program completes its analysis, it will print out a single line with the following information:

FILE,NTHREADS,ELAPSED,N,DROPPED

Where:

Your Tasks

You must complete the following tasks in this assignment. To help you avoid needlessly complex approaches, we have cited the rough line counts of our solutions, but you should not be overly concerned if your solution has slightly more or fewer lines.

Task 1: Make USAGovClickData thread-safe (20 points)

The USAGovClickData struct is used to aggregate information about multiple clicks. This means that there will be a single USAGovClickData struct shared by all the threads and, as each thread processes a click, it will update the data in USAGovClickData (this is actually done in USAGovClickTask). However, USAGovClickData is not currently thread-safe. You must update it so that there can be no race conditions when the threads access the shared USAGovClickData struct.

Our solution to this task required fewer than 5 lines of code.
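As a general illustration of what "thread-safe" means for a shared aggregate, here is a minimal sketch using a mutex. The struct and its fields (`ClickStats`, `total_clicks`, `new_users`) and the functions are hypothetical; the real USAGovClickData has its own layout, and this is not a description of the intended solution.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical aggregate shared by all threads. */
typedef struct {
    long total_clicks;
    long new_users;
    pthread_mutex_t lock;   /* guards both counters */
} ClickStats;

void stats_init(ClickStats *s) {
    s->total_clicks = 0;
    s->new_users = 0;
    pthread_mutex_init(&s->lock, NULL);
}

/* Record one click. The read-modify-write on the counters is the
 * critical section, so it runs with the mutex held. */
void stats_record(ClickStats *s, int is_new_user) {
    pthread_mutex_lock(&s->lock);
    s->total_clicks++;
    if (is_new_user) s->new_users++;
    pthread_mutex_unlock(&s->lock);
}

/* Example thread body: record 1000 clicks, every fourth from a new user. */
void *stats_hammer(void *arg) {
    ClickStats *s = (ClickStats *)arg;
    assert(s != NULL);
    for (int i = 0; i < 1000; i++) stats_record(s, i % 4 == 0);
    return NULL;
}
```

Without the lock, two threads could read the same old counter value and both write back the same incremented value, losing an update. With it, four threads running `stats_hammer` always leave `total_clicks` at exactly 4000.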

Task 2: Implement the thread pool (30 + 30 points)

To implement the thread pool, you will have to implement the pool_schedule function for the ThreadPool struct (30 points) and the worker_run function for the WorkerThread struct (30 points). You do not need to implement the pool_stop function for the ThreadPool struct yet.

When implementing the thread pool, you must take the following into account:

  • Tasks submitted with the pool_schedule function are placed in the task queue (unless the queue has maxTasks tasks in it, in which case the task is dropped). Then, the threads should be notified that there is work to do. Even though the queue may have been non-empty previously, some threads may still be asleep if there was not previously enough work to keep all the threads busy. So, now that there is more work, we should notify a sleeping thread, if there is one, so that it has the chance to make itself useful by providing further concurrency.
  • Threads are in one of two states: idle and working.
    • When a thread is idle, it is just waiting for the queue to be non-empty. This idle state can't be implemented with "busy waiting" (i.e., looping constantly while waiting for the queue to be non-empty). You must use synchronization primitives to make sure that the thread is blocked while waiting for the queue to be non-empty, and has the potential to be woken up when the queue has at least one task. (We say "has the potential" because there will be a number of threads, and only one of them, chosen arbitrarily, may actually be woken up.)
    • When a thread is working, it simply runs the task_run function on the task it took from the queue. Once it's done, it checks if there are more tasks in the queue. If there are, it just takes the first task and runs it. If there are no tasks in the queue, then the thread goes into the idle state.
  • The state of a thread is a conceptual matter; you may or may not find it necessary or helpful to actually store this information in some way.
  • The task queue is implemented using a linked list. We have provided the functions queue_length, queue_enqueue, and queue_dequeue to work with this queue.
  • The task queue is shared by all the threads. Make sure it is accessed in a thread-safe manner, but that threads can attempt to work on the tasks themselves concurrently.

Our solution to this task required 10 lines of code in WorkerThread and around the same in ThreadPool. It can be implemented with a single mutex and a single condition variable, which, in fact, have already been provided for you.

Task 3: Implement the pool_stop function for ThreadPool (20 points)

The worker threads in a thread pool will continue to run tasks (and to wait for tasks to appear in the queue) until explicitly told to stop. This is in contrast to threads that are given a single finite task to work on; if we want to wait until the thread completes, we just use pthread_join.

Here, we need to explicitly stop the threads. You will need to figure out a way to communicate to the threads that they should stop processing the task queue (and that the worker_run() function should return). We should then wait for each thread to actually stop.

This is a tricky task, although it ultimately requires few lines of code: our solution required a few extra lines (and a couple of changed lines) in WorkerThread, about the same to implement the pool_stop function, and one or two other small tweaks. No extra synchronization primitives should be needed.
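One common way to stop threads that sleep on a condition variable is a shared flag plus a broadcast. The sketch below shows that general technique on a stripped-down, hypothetical structure (`StopDemo`, `demo_worker`, `demo_stop`); it is not the provided ThreadPool or WorkerThread code, and other designs are possible.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical shared state: a count of pending work items and a stop flag,
 * both guarded by one mutex and signalled through one condition variable. */
typedef struct {
    int pending;
    int stop;
    pthread_mutex_t lock;
    pthread_cond_t wake;
} StopDemo;

void demo_init(StopDemo *d) {
    d->pending = 0;
    d->stop = 0;
    pthread_mutex_init(&d->lock, NULL);
    pthread_cond_init(&d->wake, NULL);
}

/* Worker: sleeps until there is work or a stop request; returns on stop. */
void *demo_worker(void *arg) {
    StopDemo *d = (StopDemo *)arg;
    for (;;) {
        pthread_mutex_lock(&d->lock);
        while (!d->stop && d->pending == 0)
            pthread_cond_wait(&d->wake, &d->lock);
        if (d->stop) {                      /* any leftover work is abandoned */
            pthread_mutex_unlock(&d->lock);
            return NULL;
        }
        d->pending--;                       /* stand-in for taking a task */
        pthread_mutex_unlock(&d->lock);
        /* A real worker would run the task here, outside the lock. */
    }
}

/* Ask every worker to stop, then wait for each one to return. */
void demo_stop(StopDemo *d, pthread_t *tids, int n) {
    assert(n >= 0);
    pthread_mutex_lock(&d->lock);
    d->stop = 1;
    pthread_cond_broadcast(&d->wake);       /* wake all sleepers, not just one */
    pthread_mutex_unlock(&d->lock);
    for (int i = 0; i < n; i++) pthread_join(tids[i], NULL);
}
```

Two details matter in this sketch. The flag is set while holding the same mutex the workers use, so a worker cannot check the flag and then go to sleep after the broadcast has already happened. And the wakeup is a broadcast rather than a single signal, since every sleeping worker needs to notice the flag.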

Testing

We have included a clicks.json file that you can use to test your implementation.

We suggest that you start by testing your implementation by setting the thread pool to have a single thread. That will allow you to verify that you are processing the click data correctly. You can try running your solution like this:

./usagov-clicks -f clicks.json -t 1 -v

It should print something like this (though you should take all these outputs with an enormous grain of salt, as they vary from computer to computer and run to run):

Task 1 scheduled.
Task 2 scheduled.
Task 3 scheduled.
Task 4 scheduled.
Task 5 scheduled.
Task 6 scheduled.
Task 7 scheduled.
Task 8 scheduled.
Task 9 scheduled.
Task 10 scheduled.
Task 11 scheduled.
Task 12 scheduled.
Task 13 scheduled.
Task 14 scheduled.
Task 15 scheduled.
Task 16 scheduled.
Task 17 scheduled.
Task 18 scheduled.
Task 19 scheduled.
Task 20 scheduled.
Task 21 scheduled.
Task 22 scheduled.
Task 23 scheduled.
Task 24 scheduled.
Task 25 scheduled.
Task 26 scheduled.
Task 27 scheduled.
Task 28 scheduled.
Task 29 scheduled.
Task 30 scheduled.
Task 31 scheduled.
Task 32 scheduled.
Task 33 scheduled.
Task 34 scheduled.
Task 35 scheduled.
Task 36 scheduled.
Task 37 scheduled.
Task 38 scheduled.
Task 39 scheduled.
Task 40 scheduled.
Task 41 scheduled.
Task 42 scheduled.


Running time: 0.00275696
Received 42 clicks.
Dropped 0 clicks.
Processed 42 clicks.

10 of the clicks were from new users

Next, try introducing arbitrary durations and intervals for the tasks, to verify that the thread pool correctly detects when it has reached its maximum capacity. For example, running this:

./usagov-clicks -f clicks.json -t 1 -v -i 10 -d 20

Should produce something like this:

Task 1 scheduled.
Task 2 scheduled.
Task 3 scheduled.
Task 4 scheduled.
Task 5 scheduled.
Task 6 scheduled.
Task 7 scheduled.
Task 8 scheduled.
Task 9 scheduled.
Task 10 scheduled.
Task 11 scheduled.
Task 12 scheduled.
Task 13 scheduled.
Task 14 scheduled.
Task 15 scheduled.
Task 16 scheduled.
Task 17 scheduled.
Task 18 scheduled.
Task 19 scheduled.
Task 20 scheduled.
Task 21 scheduled.
Task 22 NOT scheduled.
Task 23 scheduled.
Task 24 NOT scheduled.
Task 25 scheduled.
Task 26 NOT scheduled.
Task 27 scheduled.
Task 28 NOT scheduled.
Task 29 scheduled.
Task 30 NOT scheduled.
Task 31 scheduled.
Task 32 NOT scheduled.
Task 33 scheduled.
Task 34 NOT scheduled.
Task 35 scheduled.
Task 36 NOT scheduled.
Task 37 scheduled.
Task 38 NOT scheduled.
Task 39 scheduled.
Task 40 NOT scheduled.
Task 41 scheduled.
Task 42 NOT scheduled.


Running time: 0.442897
Received 42 clicks.
Dropped 11 clicks.
Processed 22 clicks.

18 of the clicks were from new users

The actual dropped tasks may vary when you run this, but you should nonetheless see some of the tasks being dropped. In this case, because the thread pool only has one thread, the task queue eventually fills up (to its limit of ten tasks) and has to start dropping tasks. If you run the following (the same as above, but with two threads):

./usagov-clicks -f clicks.json -t 2 -v -i 10 -d 20

You should not see any dropped tasks.

Also note that the dropped and the processed tasks in the one-thread run above added up to 33 (11 + 22), which means 9 of the 42 tasks were unaccounted for. These tasks were entered into the queue but effectively cancelled when we chose to exit the program.