# Concurrency: theory + Python examples

### Intro

Threads are used when I want to have multiple threads of control in the same address space (within the same process). Obviously, threads yield no performance gain on a single CPU if all of them are CPU bound; but if I have both computing and I/O or waiting, these activities can overlap.

Example of a web-server consisting of two threads: one that waits for work to do, and one that actually does the work:

    while True:
        get_next_request(req)
        handoff_work(req)


and

    while True:
        wait_for_work(req)
        look_for_page_in_cache(req)
        if page_not_in_cache(req):
            read_page_from_disk(req)  # cache miss: this is where the thread blocks on disk
        return_page(req)


Having threads allows the web server to do work while the first thread is waiting for requests, and to keep accepting requests while the second thread is blocked on a disk operation.
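
To make the two loops above concrete, here is a minimal runnable sketch of the same dispatcher/worker split. Everything in it is an assumption for illustration: `PAGE_CACHE` and `DISK` are dictionaries standing in for a real cache and disk, the hand-off is done with `queue.Queue`, and a `STOP` sentinel ends the worker so the script terminates.

```python
import queue
import threading

# Hypothetical stand-ins for the page cache and the disk.
PAGE_CACHE = {"/index.html": "<h1>home</h1>"}
DISK = {"/index.html": "<h1>home</h1>", "/about.html": "<h1>about</h1>"}

work_queue = queue.Queue()  # hand-off point between the two threads
responses = []              # only the worker writes here
STOP = object()             # sentinel telling the worker to exit


def dispatcher(requests):
    """Accept requests and hand them off; never touches the disk."""
    for req in requests:
        work_queue.put(req)
    work_queue.put(STOP)


def worker():
    """Serve each request from the cache, falling back to the 'disk'."""
    while True:
        req = work_queue.get()   # blocks until there is work
        if req is STOP:
            break
        page = PAGE_CACHE.get(req)
        if page is None:         # cache miss: read from the "disk"
            page = DISK[req]
            PAGE_CACHE[req] = page
        responses.append((req, page))


threads = [
    threading.Thread(target=dispatcher, args=(["/index.html", "/about.html"],)),
    threading.Thread(target=worker),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(responses)
```

With a single worker the responses come back in request order; with several workers the order would no longer be guaranteed.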

### Race conditions

Every thread has its own stack and can be in one of four states: running, blocked (e.g. it performed a syscall to read input from the keyboard), ready (waiting for its turn on the CPU) or terminated.

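We want to avoid race conditions (when two threads read/write some shared data and the result depends on who runs when). The fix is mutual exclusion: only one thread at a time is allowed in the critical region, i.e. the code that touches the shared data. A bunch of bad ways to do it: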

1. Disabling interrupts when entering the critical region: it is unwise to give a user process this power, and it only works on single-CPU machines.
2. Lock variables: guarding the region with a shared flag. However, testing and then updating a variable is not an atomic operation (it consists of several machine instructions), so an interrupt may kick in somewhere in between and two threads can both see the lock as free.
3. Busy waiting (a spin lock), as in strict alternation. Every time a thread wants to enter the critical region it keeps checking whether it’s its turn, wasting a ton of CPU time. Even worse, in a strict alternation p1/p2, if p2 spends 10 seconds outside the critical region, p1 cannot enter twice in a row and stays blocked until p2 has taken its turn.
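
Point 2 is easy to see from Python itself. The sketch below is only an analogy (Python bytecode rather than assembly, and the `increment` function is made up for the purpose): it disassembles a one-line increment and shows that it is a separate load, add and store. The exact instruction names depend on the Python version, so I only check for the load and the store.

```python
import dis

counter = 0


def increment():
    global counter
    counter += 1  # looks atomic, but compiles to several instructions


# Collect the instruction names; a thread switch can land between any two.
ops = [ins.opname for ins in dis.get_instructions(increment)]
print(ops)

load_at = ops.index("LOAD_GLOBAL")    # read the shared variable
store_at = ops.index("STORE_GLOBAL")  # write it back, some instructions later
```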

We can do better than busy waiting using primitives that block a thread when needed, such as sleep and wakeup.

### Producer & Consumer problem

Producer:

    if (counter == N) sleep();
    insert_item();
    counter++;
    if (counter == 1) wakeup(consumer); /* buffer was empty */


Consumer:

    if (counter == 0) sleep();
    consume_item();
    counter--;
    if (counter == N-1) wakeup(producer); /* buffer was full */


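This version has a race condition, because nothing protects the test of counter and the call to sleep() from being separated:

1. The consumer reads counter and sees 0.
2. The scheduler preempts the consumer before it gets to call sleep().
3. The producer produces an item; counter is now 1, so it wakes the consumer up. However, the consumer is not sleeping yet, so the signal is lost.
4. The consumer is resumed. It had read counter as 0 before being preempted, so it goes off to sleep.
5. The producer eventually fills the buffer and goes to sleep too. Now both sleep forever.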
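
The interleaving is hard to trigger on purpose with real threads, so here is a hand-scripted replay of the five steps in plain sequential Python. It is a simulation under my own assumptions, not real scheduling: `asleep`, `state`, `wakeup` and `produce_one` are hypothetical helpers that just record who is sleeping and how many signals were lost.

```python
N = 3  # buffer capacity, arbitrary for this replay

state = {"counter": 0, "lost_wakeups": 0}
asleep = {"producer": False, "consumer": False}


def wakeup(who):
    """Wake `who` if it is asleep; otherwise the signal is simply lost."""
    if asleep[who]:
        asleep[who] = False
    else:
        state["lost_wakeups"] += 1


def produce_one():
    """One producer step: insert, bump counter, signal if buffer was empty."""
    state["counter"] += 1
    if state["counter"] == 1:
        wakeup("consumer")


# 1. The consumer reads counter and sees 0 ...
consumer_saw = state["counter"]
# 2. ... and is preempted before it can call sleep().

# 3. The producer runs; counter becomes 1, so it signals the consumer,
#    which is not asleep yet: the wakeup is lost.
produce_one()

# 4. The consumer resumes and acts on its stale read.
if consumer_saw == 0:
    asleep["consumer"] = True

# 5. The producer fills the buffer, finds counter == N and sleeps too.
while state["counter"] < N:
    produce_one()
asleep["producer"] = True

print(asleep, state)
# {'producer': True, 'consumer': True} {'counter': 3, 'lost_wakeups': 1}
```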

### Introducing Semaphores

The whole point of semaphores is that their operations are implemented in an indivisible way, so that the sequence above can never happen; this is achieved by implementing up/down as syscalls and disabling interrupts while testing the semaphore, updating it and, if needed, putting the process to sleep.

Notice that disabling interrupts for the time needed to update the semaphore is OK, whereas disabling them for the whole program was kinda not.

The p/c problem with semaphores might look like this:

Producer

    down(empty)
    down(mutex)
    put_item_in_buffer()
    up(mutex)
    up(full)


Consumer

    down(full)
    down(mutex)
    remove_item_from_buffer()
    up(mutex)
    up(empty)


Here we’re using three semaphores: two for synchronization (empty counts the free slots, full counts the filled ones), which make sure certain event sequences do or do not occur (going to sleep and waking up); and one simple semaphore called mutex, which enforces mutual exclusion on the shared buffer. A mutex is a semaphore that doesn’t count: it is only ever 0 or 1. The up/down operations on semaphores are guaranteed to be atomic (that is what makes them semaphores), but putting stuff in a buffer is not, and that’s why we need the mutex too.
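
The pseudocode maps almost one-to-one onto Python's `threading.Semaphore`, where `acquire()` plays the role of down and `release()` the role of up. The sketch below is my own finite version: the capacity `N = 4`, the 20 items and the `consumed` list are assumptions made so the script terminates and can be checked.

```python
import threading
from collections import deque

N = 4                           # buffer capacity
buffer = deque()
empty = threading.Semaphore(N)  # counts free slots
full = threading.Semaphore(0)   # counts filled slots
mutex = threading.Lock()        # protects the buffer itself
consumed = []


def producer(items):
    for item in items:
        empty.acquire()         # down(empty): wait for a free slot
        with mutex:             # down(mutex) ... up(mutex)
            buffer.append(item)
        full.release()          # up(full): one more item available


def consumer(count):
    for _ in range(count):
        full.acquire()          # down(full): wait for an item
        with mutex:
            item = buffer.popleft()
        empty.release()         # up(empty): one more free slot
        consumed.append(item)


items = list(range(20))
threads = [
    threading.Thread(target=producer, args=(items,)),
    threading.Thread(target=consumer, args=(len(items),)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(consumed)
```

The producer can never get more than N items ahead of the consumer, and neither thread ever spins: both block inside `acquire()`.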

Some languages (Java, Python…) provide monitors or monitor-like constructs. A monitor is a collection of procedures, variables and data structures grouped together such that only one thread can be active in the monitor at any given instant. When using monitors, the compiler/interpreter handles the actual mutual exclusion.
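
In Python the monitor idea is usually approximated by hand: a class owns one lock and every public method runs under it. The `MonitorCounter` class below is a hypothetical example of that convention, and note the assumption: nothing in the language enforces it, so a method that forgets the lock silently breaks the guarantee.

```python
import threading


class MonitorCounter:
    """Monitor-style counter: every public method runs under one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0

    def increment(self):
        with self._lock:  # only one thread at a time gets past this line
            self._value += 1

    def value(self):
        with self._lock:
            return self._value


counter = MonitorCounter()


def work():
    for _ in range(10_000):
        counter.increment()


threads = [threading.Thread(target=work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value())  # 40000
```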

### Mutexes in Python

Simple example of concurrency done wrong, and how to fix it using a mutex. I used sleep() between the read and the write to “encourage” the other threads to read a stale value.

    from threading import Lock, Thread
    from time import sleep

    class myStruct(object):
        def __init__(self):
            self.var = 0

    def increment_by_one(obj):
        # unsafe read-modify-write: other threads can read the same tmp
        tmp = obj.var
        sleep(0.0025)
        obj.var = tmp + 1

    def safe_increment_by_one(obj):
        # same steps, but the lock turns them into one critical region
        with lock:
            tmp = obj.var
            sleep(0.0025)
            obj.var = tmp + 1

    # main
    obj1 = myStruct()
    obj2 = myStruct()
    lock = Lock()

    for i in range(100):
        t = Thread(target=increment_by_one, args=(obj1,))
        sleep(0.001)
        t.start()

    for i in range(100):
        t = Thread(target=safe_increment_by_one, args=(obj2,))
        sleep(0.001)
        t.start()

    sleep(3)  # crude way to wait for all threads to finish
    print(obj1.var) # anywhere between 40-45
    print(obj2.var) # 100
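
A variation on the safe half of the example, in case the final `sleep(3)` feels too fragile: keep the threads in a list and `join()` them, so the result is printed only once every thread is really done. The `state` dictionary and the thread count of 50 are my own choices for the sketch.

```python
from threading import Lock, Thread
from time import sleep

lock = Lock()
state = {"total": 0}


def safe_increment():
    with lock:                # released even if the body raises
        tmp = state["total"]
        sleep(0.001)          # same trick as above: widen the race window
        state["total"] = tmp + 1


threads = [Thread(target=safe_increment) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()                  # wait for completion instead of guessing a delay

print(state["total"])  # 50
```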


### Consumer – Producer in Python

This implementation uses a list as buffer, and it will fail. Since the consumer consumes faster than the producer, eventually the consumer will try to pop from an empty list, raising an IndexError that kills the consumer thread. Since the exception also skips lock.release(), the producer then blocks forever on its next lock.acquire().

    # Using a list - this will fail
    lock = Lock()
    def unsafe_producer(q):
        while True:
            lock.acquire()
            q.append(random.choice(range(5)))
            print(q)
            lock.release()
            sleep(2.5)

    def unsafe_consumer(q):
        while True:
            lock.acquire()
            if not q:
                print('this is gonna fail')
            q.pop()  # raises IndexError when q is empty
            print(q)
            lock.release()
            sleep(0.5)
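
A side effect worth seeing in isolation: when the consumer above dies inside `q.pop()`, it dies while holding the lock. The sketch below reproduces that without threads and shows that a `with` block releases the lock on the way out. The variable names `leaked` and `released` are mine.

```python
from threading import Lock

lock = Lock()
q = []

# Manual acquire/release: the exception jumps over release().
lock.acquire()
try:
    q.pop()         # IndexError: pop from empty list
    lock.release()  # never reached
except IndexError:
    pass
leaked = lock.locked()  # the lock is still held
lock.release()          # clean up before the second half

# With a context manager the lock is released even when the body raises.
try:
    with lock:
        q.pop()
except IndexError:
    pass
released = not lock.locked()

print(leaked, released)  # True True
```

This does not fix the empty-buffer problem itself, it only stops one crashed thread from freezing the other.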


What we want is to put the consumer to sleep when the buffer is empty, and to resume it when it gets notified by the producer. There are a couple of different ways to go about it: one is to use a built-in data structure that supports synchronization, such as Python’s queue.Queue:

    # Using a built-in syncd Queue
    def queue_producer(q):
        while True:
            q.put(random.choice(range(5)))
            sleep(2)

    def queue_consumer(q):
        while True:
            q.get()  # blocks while the queue is empty
            sleep(1.5)
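
The two loops above run forever, so here is a finite variant that can be run and checked. The assumptions are mine: `maxsize=2` to show that `put()` also blocks when the queue is full, five items, and a `DONE` sentinel so the consumer knows when to stop.

```python
import queue
import threading

q = queue.Queue(maxsize=2)  # put() blocks when full, get() blocks when empty
DONE = object()             # sentinel marking the end of the stream
received = []


def producer():
    for i in range(5):
        q.put(i)
    q.put(DONE)


def consumer():
    while True:
        item = q.get()
        if item is DONE:
            break
        received.append(item)


threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(received)  # [0, 1, 2, 3, 4]
```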


The other one is to build synchronization into our own data structure using Python’s condition variables (threading.Condition). Note that the lock functionality is built into the condition (as we would expect with a proper monitor) so we don’t have to use a low-level Lock anymore.

    # Using conditions
    condition = Condition()
    def safe_producer(q):
        while True:
            with condition:
                q.append(random.choice(range(5)))
                print(q)
                if len(q) == 1:
                    condition.notify() # q was empty
            sleep(2.5)

    def safe_consumer(q):
        while True:
            with condition:
                while not q: # re-check the buffer after every wakeup
                    print('q is empty, going to sleep')
                    condition.wait() # releases the lock while waiting
                    print('waking up consumer')
                q.pop()
                print(q)
            sleep(1)
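
Taking "our own data structure" literally, the condition can live inside the buffer so callers never see it. The `BoundedBuffer` class below is a hypothetical sketch of that idea, with two assumptions beyond the code above: the buffer has a capacity, so the producer can block too, and both sides use `notify_all()` with a `while` re-check, which is the conservative pattern when more than one thread may be waiting.

```python
import threading
from collections import deque


class BoundedBuffer:
    """Monitor-style FIFO buffer built on a single Condition."""

    def __init__(self, capacity):
        self._items = deque()
        self._capacity = capacity
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            while len(self._items) >= self._capacity:  # full: wait for space
                self._cond.wait()
            self._items.append(item)
            self._cond.notify_all()

    def get(self):
        with self._cond:
            while not self._items:                     # empty: wait for items
                self._cond.wait()
            item = self._items.popleft()
            self._cond.notify_all()
            return item


buf = BoundedBuffer(capacity=2)
results = []


def producer():
    for i in range(10):
        buf.put(i)


def consumer():
    for _ in range(10):
        results.append(buf.get())


threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)
```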


Note to super-lazy future me: here’s the boilerplate code for the examples above:

    from queue import Queue
    import random
    from threading import Condition, Lock, Thread
    from time import sleep

    ...

    # safe with custom ds
    q = [1,2,3,4]