System Development with Python

Week 7 :: threading and multiprocessing

Threading / multiprocessing

Today's topics

Motivations for concurrency

Leverage advances in hardware, such as multi-core CPUs

If your problem can be solved sequentially, consider the costs and benefits before implementing concurrency

Basic concurrency strategy

  1. Break problem down into chunks
  2. Execute chunks in parallel
  3. Reassemble output of chunks into result


Threads versus processes in Python

Processes

A process provides the resources needed to execute a program: a virtual address space, executable code, open handles to system objects, a security context, environment variables, and much more.

Communication between processes can be achieved via various interprocess communication (IPC) techniques.

Processes require multiple copies of the data, or expensive IPC, to share it

One or more threads per process

Threads

A thread is the entity within a process that can be scheduled for execution

Threads are sometimes called lightweight processes; they run within the address space of an OS process.

These threads share the memory and the state of the process. This allows multiple threads access to data in the same scope.

Python threads are true OS-level threads

Due to the Global Interpreter Lock (GIL), however, threads cannot take advantage of multiple processors for CPU-bound work

GIL

Global Interpreter Lock

This is a lock that a thread must hold before it can execute Python bytecode, ensuring that only one thread runs in the interpreter at a time

Remember, this is an implementation detail. Some alternative Python implementations such as Jython and IronPython have no GIL; CPython and PyPy do

The GIL is released during I/O operations, allowing I/O-bound programs to benefit from threading
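A quick way to see this (a minimal sketch; time.sleep() stands in for a blocking I/O call and releases the GIL the same way):

import threading
import time

def io_bound():
    time.sleep(1)  # releases the GIL, like a blocking read would

start = time.time()
threads = [threading.Thread(target=io_bound) for i in xrange(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print "elapsed: %.2f seconds" % (time.time() - start)  # ~1s, not ~5s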

A CPU bound problem

Numerically integrate the function y = x² from 0 to 10.


Solution

To check our numerical results: the analytic answer is x³/3 evaluated from 0 to 10, i.e. 1000/3 ≈ 333.33

Another example using prime factors

Parallel execution example

Consider the following code from examples/integrate/sequential

def f(x):
    return x**2

def integrate(f, a, b, N):
    """Approximate the integral of f from a to b with a left Riemann sum over N steps."""
    s = 0
    dx = (b - a) / float(N)  # float division: with integer a, b, N this would truncate to 0 in Python 2
    for i in xrange(N):
        s += f(a + i * dx)
    return s * dx

print integrate(f, 0, 10, 100)

Break down the problem into parallelizable chunks, then add the results together:
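A minimal sketch of the chunking arithmetic, still sequential (reusing integrate() from above; the names here are illustrative):

def integrate_in_chunks(f, a, b, N, chunk_count):
    dx = float(b - a) / chunk_count     # width of each sub-interval
    N_chunk = N // chunk_count          # steps per sub-interval
    # step 1: break the problem down into chunks
    chunks = [(f, a + i * dx, a + (i + 1) * dx, N_chunk)
              for i in xrange(chunk_count)]
    # step 2 would run these in parallel; here they run one after another
    partial_results = [integrate(*chunk) for chunk in chunks]
    # step 3: reassemble the partial results
    return sum(partial_results)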

The threading module functions

Starting threads doesn't take much. See examples/simple-threading.py

import sys
import threading
import time

def func():
    for i in xrange(5):
        print "hello from thread %s" % threading.current_thread().name
        time.sleep(1)

threads = []
for i in xrange(3):
    thread = threading.Thread(target=func, args=())
    thread.start()
    threads.append(thread)

Let's talk about some of the calls:
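threading.Thread(target=func, args=()) wraps a callable in a thread object, .start() begins executing it, and threading.current_thread() returns the thread the calling code is running in. One call the example skips is .join(), which blocks until a thread finishes:

# wait for every thread started above before moving on
for thread in threads:
    thread.join()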

Daemon Threads

What if we don't want a thread to block the process from exiting?

See examples/simple-threading-daemon.py
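The example file isn't reproduced here; the essential move is setting daemon before start() (a minimal sketch):

import threading
import time

def func():
    while True:
        print "daemon thread still working..."
        time.sleep(1)

thread = threading.Thread(target=func)
thread.daemon = True  # must be set before start()
thread.start()

time.sleep(3)  # main thread exits here; the daemon is killed instead of blocking exit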

Exercise

There is a second form for creating threads: you can subclass threading.Thread

  1. Take the example we've been working with
  2. Create a new class that subclasses threading.Thread
  3. Implement the run() method
  4. NOTE: the thread creation code must change too; instead of
     thread = threading.Thread(target=func, args=()),
     instantiate your subclass directly (one possible skeleton is sketched below)
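One possible skeleton for the subclass form (the run() body is yours to write):

import threading

class MyThread(threading.Thread):
    def run(self):
        # the work that used to live in func() goes here
        pass

thread = MyThread()
thread.start()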

Sharing Resources

When threaded operations run concurrently we have to be careful about how they use shared resources. There's a potential for corruption.

Race Conditions

If competing threads try to update the same value, we might get an unexpected race condition

Race conditions occur when multiple statements need to execute atomically, but get interrupted midway

See examples/race_condition.py
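That example isn't reproduced here; a minimal sketch of the same failure (the counts are illustrative):

import threading

counter = 0

def increment():
    global counter
    for i in xrange(100000):
        counter += 1  # read, increase, write back: not atomic

threads = [threading.Thread(target=increment) for i in xrange(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# frequently prints less than 200000: updates were lost to interleaving
print counter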

The intended interleaving, with each increment serialized:

Thread 1         Thread 2         Integer value
                                  0
read value                        0
increase value                    0
write back                        1
                 read value       1
                 increase value   1
                 write back       2

An unlucky interleaving, where one increment is lost:

Thread 1         Thread 2         Integer value
                                  0
read value                        0
                 read value       0
increase value                    0
                 increase value   0
write back                        1
                 write back       1

http://en.wikipedia.org/wiki/Race_condition

Synchronization

Synchronization is used to control race conditions for critical sections of code

But it introduces other potential problems... as we will see

Locks

Lock objects allow threads to control access to a resource until they're done with it

This is known as mutual exclusion, commonly abbreviated to mutex

Python 2 has a deprecated module called mutex for this. Use a Lock instead.

A Lock has two states: locked and unlocked

If multiple threads have access to the same Lock, they can police themselves by calling its .acquire() and .release() methods

If a Lock is locked, .acquire() will block until it becomes unlocked

These threads will wait in line politely for access to the statements in f()

import threading
import time

lock = threading.Lock()

def f():
    lock.acquire()   # blocks until the lock is available
    print "%s got lock" % threading.current_thread().name
    time.sleep(1)    # simulate work while holding the lock
    lock.release()   # let the next waiting thread in

threading.Thread(target=f).start()
threading.Thread(target=f).start()
threading.Thread(target=f).start()

See examples/lock/simple_lock.py
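Lock also supports the context manager protocol, so the acquire/release pair can be written as a with block that releases the lock even if an exception is raised:

def f():
    with lock:   # acquired on entry, released on exit, even on error
        print "%s got lock" % threading.current_thread().name
        time.sleep(1)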

Nonblocking locking

.acquire() will return True if it successfully acquires a lock

Its first argument is a boolean which specifies whether a lock should block or not. The default is True

import threading
lock = threading.Lock()
lock.acquire()               # lock is now held
if not lock.acquire(False):  # non-blocking attempt: fails immediately
    print "couldn't get lock"
lock.release()               # lock is free again
if lock.acquire(False):      # non-blocking attempt: succeeds this time
    print "got lock"

See examples/lock/simple_nonblocking_lock.py

Race Condition Solution

We now have the tools to solve the previous race condition problem

See examples/solutions/race_condition.py
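The solution file isn't reproduced here, but the usual fix for the racy counter sketched earlier is to make the read-modify-write atomic with a Lock:

import threading

counter = 0
counter_lock = threading.Lock()

def increment():
    global counter
    for i in xrange(100000):
        with counter_lock:
            counter += 1  # the read-increase-write sequence is now atomic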

Deadlocks

What potential problems does synchronization create?

...Deadlocks!

"A deadlock is a situation in which two or more competing actions are each waiting for the other to finish, and thus neither ever does."

When two trains approach each other at a crossing, both shall come to a full stop and neither shall start up again until the other has gone.

See examples/lock/deadlock.py
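That example isn't reproduced here; a minimal sketch of the classic two-lock deadlock:

import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread_1():
    with lock_a:      # thread 1 holds a...
        with lock_b:  # ...and waits for b
            pass

def thread_2():
    with lock_b:      # thread 2 holds b...
        with lock_a:  # ...and waits for a
            pass

# started together, each thread can end up holding one lock and
# waiting forever for the other: neither ever proceeds
threading.Thread(target=thread_1).start()
threading.Thread(target=thread_2).start()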

Locking Exercise

find examples/lock/stdout_writer.py

multiple threads in the script write to stdout, and their output gets jumbled

  1. Add a locking mechanism to give each thread exclusive access to stdout

threading.Semaphore

A Semaphore is given an initial counter value, defaulting to 1

Each call to acquire() decrements the counter, release() increments it

If acquire() is called on a Semaphore with a counter of 0, it will block until the Semaphore counter is greater than 0.

http://en.wikipedia.org/wiki/Semaphore_(programming)

Semaphores offer many synchronization patterns
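One common pattern is bounding concurrency: initialize the counter to the number of threads allowed past acquire() at once. A minimal sketch allowing three concurrent workers (the counts are illustrative):

import threading
import time

semaphore = threading.Semaphore(3)  # at most 3 threads inside at a time

def worker():
    with semaphore:
        print "%s in critical section" % threading.current_thread().name
        time.sleep(1)

for i in xrange(10):
    threading.Thread(target=worker).start()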

Stop what you are doing and watch this video and read this book.

Allen Downey's A Little Book of Semaphores

Semaphore Exercise

find examples/lock/stdout_writer.py

multiple threads in the script write to stdout, and their output gets jumbled

  1. Try adding a Semaphore to allow 2 threads access at once

Managing thread results

We need a thread safe way of storing results from multiple threads of execution. That is provided by the Queue module.

Queues allow multiple producers and multiple consumers to exchange data safely

Size of the queue is managed with the maxsize kwarg

It will block consumers if empty and block producers if full

See examples/lock/producer_consumer.py

from Queue import Queue

q = Queue(maxsize=10)
q.put(37337)
block = True
timeout = 2
print q.get(block, timeout)  # waits up to 2 seconds, then raises Queue.Empty
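The producer_consumer.py example isn't reproduced here; a minimal producer/consumer sketch around a shared Queue (names and counts are illustrative):

import threading
from Queue import Queue

q = Queue(maxsize=10)

def producer():
    for i in xrange(20):
        q.put(i)   # blocks if the queue is full
    q.put(None)    # sentinel: tells the consumer to stop

def consumer():
    while True:
        item = q.get()  # blocks if the queue is empty
        if item is None:
            break
        print "consumed %s" % item

threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()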

Other Queue types

Queue.LifoQueue - Last In, First Out

Queue.PriorityQueue - Lowest valued entries are retrieved first

One pattern for PriorityQueue is to insert entries of the form (priority_number, data), so the data tags along with its priority
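A minimal sketch of that pattern:

from Queue import PriorityQueue

q = PriorityQueue()
q.put((2, "medium priority"))
q.put((1, "high priority"))
q.put((3, "low priority"))

while not q.empty():
    priority, data = q.get()
    print priority, data   # retrieved in priority order: 1, 2, 3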

Exercise

The "ord" command converts an ascii character to it's ordinal number

assert ord('a') == 97

  1. Look at the stubbed out code in /examples/ascii_exercise.py
  2. Create a producer function that converts the characters in the "alpha" list to their ordinals
  3. Create producer threads that use this producer function and add their results to a Queue.Queue()
  4. Write a single consumer thread that reads the Queue.Queue() and prints out the converted value

threading example

See examples/threading/integrate_main.py

#!/usr/bin/env python

import argparse
import os
import sys
import threading
import Queue

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from integrate.integrate import integrate, f
from decorators.decorators import timer

@timer
def threading_integrate(f, a, b, N, thread_count=2):
    """break the work into thread_count chunks"""
    N_chunk = int(float(N) / thread_count)
    dx = float(b - a) / thread_count

    results = Queue.Queue()

    def worker(*args):
        results.put(integrate(*args))

    threads = []
    for i in xrange(thread_count):
        x0 = a + dx * i       # each chunk starts at a, not at 0
        x1 = x0 + dx
        thread = threading.Thread(target=worker, args=(f, x0, x1, N_chunk))
        thread.start()
        threads.append(thread)
        print "Thread %s started" % thread.name

    # Queue.get() blocks, so this waits until all thread_count results arrive
    return sum(results.get() for i in xrange(thread_count))

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='integrator')
    parser.add_argument('a', nargs='?', type=float, default=0.0)
    parser.add_argument('b', nargs='?', type=float, default=10.0)
    parser.add_argument('N', nargs='?', type=int, default=10**7)
    parser.add_argument('thread_count', nargs='?', type=int, default=2)

    args = parser.parse_args()
    a = args.a
    b = args.b
    N = args.N
    thread_count = args.thread_count

    print "Numerical solution with N=%(N)d : %(x)f" % \
            {'N': N, 'x': threading_integrate(f, a, b, N, thread_count=thread_count)}

Threading on a CPU bound problem

Try running the code in examples/threading/integrate_main.py

It accepts 4 arguments:

./integrate_main.py -h
usage: integrate_main.py [-h] [a] [b] [N] [thread_count]

integrator

positional arguments:
  a
  b
  N
  thread_count
      
./integrate_main.py 0 10 1000000 4

What happens when you change the thread count? What thread count gives the maximum speed?

multiprocessing

multiprocessing provides an API very similar to threading, so the transition is easy

use multiprocessing.Process instead of threading.Thread

import multiprocessing
import os
import time

def func():
    print "hello from process %s" % os.getpid()
    time.sleep(1)

# on platforms that spawn rather than fork (e.g. Windows), process
# creation must be guarded with: if __name__ == '__main__':
proc = multiprocessing.Process(target=func, args=())
proc.start()
proc = multiprocessing.Process(target=func, args=())
proc.start()

Differences with threading

multiprocessing has its own multiprocessing.Queue which handles interprocess communication

Also has its own versions of Lock, RLock, Semaphore


from multiprocessing import Queue, Lock
      

multiprocessing.Pipe for 2-way process communication:

from multiprocessing import Pipe

parent_conn, child_conn = Pipe()  # each end of the pipe can both send and receive
child_conn.send("foo")
print parent_conn.recv()          # prints "foo"

multiprocessing to the rescue

Let's change around our threaded integrate workflow and use multiprocessing instead

What do we expect the results to be?

See examples/multiprocessing/integrate_main.py
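That file isn't reproduced here, but the change from the threading version above is mostly mechanical; a sketch of the core loop, reusing integrate(), dx, and N_chunk from before (process_count is a stand-in for thread_count):

import multiprocessing

def worker(f, a, b, N, results):
    results.put(integrate(f, a, b, N))

results = multiprocessing.Queue()  # interprocess-safe, unlike Queue.Queue
processes = []
for i in xrange(process_count):
    x0 = a + dx * i
    process = multiprocessing.Process(target=worker,
                                      args=(f, x0, x0 + dx, N_chunk, results))
    process.start()
    processes.append(process)

total = sum(results.get() for i in xrange(process_count))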

Pooling

A process pool maintains a set of worker processes, with only a configured number running at one time

from multiprocessing import Pool
pool = Pool(processes=4)
      

The Pool class has several methods for submitting jobs to the pool

Pooling example

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)

    # evaluate f(10) asynchronously in one worker
    result = pool.apply_async(f, (10,))
    print result.get(timeout=1)    # prints 100

    print pool.map(f, range(10))   # blocks until all results are in

    it = pool.imap(f, range(10))   # lazy iterator over the results
    print it.next()
    print it.next()
    print it.next(timeout=1)

    import time
    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)    # raises multiprocessing.TimeoutError

http://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool

ThreadPool

threading also has a pool

confusingly, it lives in the multiprocessing module


from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=4)
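It exposes the same interface as multiprocessing.Pool, so for example (reusing f from the pooling example above):

print pool.map(f, range(10))  # same API, but the work runs in threads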
      

threading versus multiprocessing, networking edition

We're going to test making concurrent connections to a web service in examples/server/app.py

It is a WSGI application which can be run with Green Unicorn or another WSGI server

$ gunicorn app:app --bind 0.0.0.0:37337

client-threading.py makes 100 threads to contact the web service

client-mp.py makes 100 processes to contact the web service

client-pooled.py creates a ThreadPool

client-pooled.py contains a results Queue, but doesn't use it. Can you collect all the output from the pool into a single data structure using this Queue?

Other options

Traditionally, concurrency has been achieved through multiple communicating processes and in-process threads, as we've seen

Another strategy is through micro-threads, implemented via coroutines and a scheduler

A coroutine is a generalization of a subroutine which allows multiple entry points for suspending and resuming execution

the threading and the multiprocessing modules follow a preemptive multitasking model

coroutine based solutions follow a cooperative multitasking model

With send(), a generator becomes a coroutine


def coroutine(n):
    try:
        while True:
            x = (yield)   # pauses here until send() delivers a value
            print n + x
    except GeneratorExit:
        pass

targets = [
    coroutine(10),
    coroutine(20),
    coroutine(30),
]

for target in targets:
    target.next()         # "prime" each coroutine: advance it to the first yield

for i in range(5):
    for target in targets:
        target.send(i)    # resume each coroutine in turn: cooperative multitasking

http://dabeaz.com/coroutines/Coroutines.pdf

Packages using coroutines for micro threads

By "jumping" to parallel coroutines, our application can simulate true threads.

Creating the scheduler which does the jumping is an exercise for the reader, but look into these packages which handle the dirty work

Questions?
