Concurrency in C

In this lab you will learn the basics of running concurrent threads with shared memory.

Threads

Here's how to create a basic thread:

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>

void* func(void* x) {
    int xi = (int)x;

    printf("Inside thread; x = %d\n", xi);

    return (void*)(xi + 123);
}

int main(int argc, char** argv) {
    pthread_t th;
    pthread_create(&th, NULL, func, (void*)100);

    void* ret_from_thread;
    int ri;
    pthread_join(th, &ret_from_thread);
    ri = (int)ret_from_thread;

    printf("Outside thread, which returned %d\n", ri);
    return 0;
}

Copy this to main.c and compile it by running:

$ clang main.c -lpthread

Note that we have to specify the pthread library so that the program links properly. You may receive warnings about the casts between int and void*, which you can ignore for this lab. Once compiled, run the result via ./a.out.

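The program in main.c demonstrates how to create a thread with pthread_create, pass it an argument, wait for it with pthread_join, and retrieve its return value.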

Race conditions

Let us imagine that x * x is a very costly operation (it's not, but use your imagination) and we want to calculate the sum of squares up to a certain number. It would make sense to parallelize the calculation of each square across threads. We can do something like this:

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>

/* single global variable */
/* shared, accessible, modifiable by all threads */
int accum = 0;

void* square(void* x) {
    int xi = (int)x;
    accum += xi * xi;
    return NULL; /* nothing to return, prevent warning */
}

int main(int argc, char** argv) {
    int i;
    pthread_t ths[20];
    for (i = 0; i < 20; i++) {
        pthread_create(&ths[i], NULL, square, (void*)(i + 1));
    }

    for (i = 0; i < 20; i++) {
        void* res;
        pthread_join(ths[i], &res);
    }

    printf("accum = %d\n", accum);
    return 0;
}

This should sum all squares up to and including 20. The first loop launches 20 threads and hands each one a number to square. After this, we call pthread_join() on each of our threads, which is a blocking operation that waits for the thread to finish before continuing the execution. This is important to do before we print accum, since otherwise our threads might not be done yet. You should always join your threads before leaving main, if you haven't already.

Now, run this. Chances are it spits out 2870, which is the correct answer.

Let's use our bash shell to run it a few more times (assuming you compiled it to a.out):

$ for i in {1..40}; do ./a.out; done

See any inconsistencies yet? Better yet, let's list all distinct outputs from 1000 separate runs, including the count for each output:

$ for i in {1..1000}; do ./a.out; done | sort | uniq -c

You should see some incorrect answers, even though most of the time it gets it right. This is because of something called a race condition. When the compiler processes accum += xi * xi;, reading the current value of accum and writing the updated value back are separate steps, not one atomic (meaning indivisible) event. Let's re-write the body of square to capture this:

int temp = accum;
temp += xi * xi;
accum = temp;

While the C code in this second version looks different, the fact is that when you compile the earlier version, it needs to do these intermediate steps as well. There is no practical difference between these versions, other than that this new version makes explicit the steps that were implicit, but still present, in the prior version.

Now, let's say the first two threads are interleaved over time, giving us something like this:

// Thread 1             // Thread 2
int temp1 = accum;      int temp2 = accum;          // temp1 = temp2 = 0
                        temp2 += 2 * 2;             // temp2 = 4
temp1 += 1 * 1;                                     // temp1 = 1
accum = temp1;                                      // accum = 1
                        accum = temp2;              // accum = 4

We end up with accum as 4, instead of the correct 5.

Mutex

A mutex (mutual exclusion) allows us to encapsulate blocks of code that should only be executed in one thread at a time. Put another way, it allows us to glue together a sequence of operations that would otherwise not be atomic, so that they are executed atomically. Put yet another way, it allows us to be certain that the intermediate state during a sequence of operations will not be observed or tampered with by another thread. Keeping the main function the same:

int accum = 0;
pthread_mutex_t accum_mutex = PTHREAD_MUTEX_INITIALIZER;

void* square(void* x) {
    int xi = (int)x;
    int temp = xi * xi;

    pthread_mutex_lock(&accum_mutex);
    accum += temp;
    pthread_mutex_unlock(&accum_mutex);

    return NULL; /* nothing to return, prevent warning */
}

Try running the program repeatedly again and the problem should now be fixed. The first thread that calls pthread_mutex_lock() gets the lock. During this time, all other threads that call pthread_mutex_lock() will simply halt, waiting at that line for the mutex to be unlocked. Once it becomes available, one of the waiting threads will be allowed to lock it next, while the other threads continue to wait their turn. It is important to introduce the variable temp, since we want the xi * xi calculation to be outside the lock-unlock block. Otherwise we would be hogging the lock while running calculations that do not require mutual exclusion (because each thread can safely square its own value independently).

Condition variables

It would be useful to be able to have one thread wait for another thread to finish processing something, essentially sending a signal between the threads. This can be done with mutexes, but it would be awkward. It can also be done using a global boolean variable called notified that is set to true when we want to send the signal. The other thread would then run a loop that checks whether notified is true and stops looping when that happens. Since we only set notified once, it is tempting to skip the mutex altogether. Strictly speaking, though, an unsynchronized read and write of a plain variable is a data race in C, and the compiler is free to read notified only once and spin forever, so the flag would at least have to be an atomic type. Even then, the receiving thread runs its loop at full speed, wasting a lot of CPU time even though it is not performing a meaningful calculation. This wastes energy and hogs a CPU core unproductively. We could add a short sleep inside the loop, making the CPU idle most of the time, at the cost of reacting to the signal a little late.

A more principled way, however, is to add a call to pthread_cond_wait for a condition variable inside the loop:

#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <pthread.h>

pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;


int value = 100;
bool notified = false;

void* reporter(void* unused) {
    /*
    pthread_mutex_lock(&m);
    while (!notified) {
        pthread_cond_wait(&cond_var, &m);
    }
    */
    printf("The value is %d\n", value);
    /*
    pthread_mutex_unlock(&m);
    */
    return NULL;
}

void* assigner(void* unused) {
    value = 20;
    /*
    pthread_mutex_lock(&m);
    notified = true;
    pthread_cond_signal(&cond_var);
    pthread_mutex_unlock(&m);
    */
    return NULL;
}

int main(int argc, char** argv) {
    pthread_t rep, asgn;

    pthread_create(&rep, NULL, reporter, NULL);
    pthread_create(&asgn, NULL, assigner, NULL);

    void* unused;

    pthread_join(rep, &unused);
    pthread_join(asgn, &unused);
    return 0;
}

If you compile it as it is, it will output 100 most of the time (maybe try the bash for loop again?). However, we want the reporter thread to wait for the assigner thread to give it the value 20, before outputting it. You can do this by uncommenting the three /* ... */ blocks in the thread functions. In the assigner thread, it will set notified to true and send a signal through the condition variable cond_var. In the reporter thread, we're looping as long as notified is false, and in each iteration we wait for a signal. Try running it; it should now always output 20.

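But wait, if pthread_cond_wait() blocks until cond_var is signaled, why are we still using notified and a loop? There are two reasons. First, a waiting thread can wake up spuriously even though nobody called pthread_cond_signal, and in that case the loop checks notified and goes back to waiting. Second, a signal is not remembered: if the assigner signals before the reporter has started waiting, the signal is lost, and only the notified flag tells the reporter that it does not need to wait at all.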

This is a simplified description, since we are also giving pthread_cond_wait the mutex m. The mutex must already be locked by the calling thread when pthread_cond_wait is called. The call then atomically unlocks m and puts the thread to sleep, so the assigner is able to lock m, set notified, and signal while the reporter waits. When the reporter is woken up, pthread_cond_wait locks m again before it returns, so the check of notified and everything after the loop run with the mutex held. This gives us some structure of mutual exclusion between the two threads, as we will see in the following example.

Producer-consumer problem

You should now have all the tools needed to fix an instance of the producer-consumer problem. Simply put, one thread is producing goods and another thread is consuming goods. We want the consumer thread to wait using a condition variable, and we want adding something to the list of available goods to be mutually exclusive with removing it, so our data doesn't get corrupted. We are letting c++ and c-- be surrogates for items being produced and consumed. In other words, the producer generates another item that is being counted by c, and the consumer uses it, decrementing c. (We use this proxy so that we don't clutter the code with actual list manipulation statements.) We can easily check whether we correctly end up with 0 in the end. Run the code as it is, and you will see that the net value can be way off (though you may get lucky some of the time):

#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <pthread.h>

int c = 0;
bool done = false;

void* producer(void* unused) {
    int i;
    for (i = 0; i < 500; i++) {
        /* produce something */
        /* append it to a list */
        c++;
    }
    done = true;
    return NULL;
}

void* consumer(void* unused) {
    while (!done) {
        while (c > 0) {
            /* remove something from list */
            c--;
        }
    }
    return NULL;
}

int main(int argc, char** argv) {
    pthread_t prod, con;

    pthread_create(&prod, NULL, producer, NULL);
    pthread_create(&con, NULL, consumer, NULL);

    void* unused;
    pthread_join(prod, &unused);
    pthread_join(con, &unused);

    printf("Net: %d\n", c);
}

Don't expect the fix to be that trivial, and feel free to ask the TA for help. Just wrapping all access to the shared memory in lock-unlock blocks can fix this problem, but remember that we don't want the consumer loop to constantly lock and unlock without accomplishing anything when the list is empty. So a condition variable is ideal. Use the commands for executing the program 1000 times to test the integrity of your solution.