Leverage the advances in hardware, such as multi-core CPUs:
If your problem can be solved sequentially, consider the costs and benefits before implementing concurrency
A process provides the resources needed to execute a program: a virtual address space, executable code, open handles to system objects, a security context, environment variables, and much more.
Communication between processes can be achieved via various interprocess communication (IPC) techniques.
Processes require multiple copies of the data, or expensive IPC to access it
One or more threads per process
A thread is the entity within a process that can be scheduled for execution
Threads are lightweight processes that run in the address space of an OS process.
These threads share the memory and the state of the process. This allows multiple threads access to data in the same scope.
Python threads are true OS-level threads
But a thread cannot use more than one processor at a time, due to the Global Interpreter Lock (GIL)
The GIL is a lock which must be held by a thread before it can execute Python bytecode, ensuring thread safety inside the interpreter
The GIL is released during IO operations, so threads which spend time waiting on network or disk access can still enjoy performance gains
Some alternative Python implementations such as Jython and IronPython have no GIL
CPython and PyPy have one
Launch multiple processes to speed up CPU bound operations. Luckily, this is easy with the multiprocessing module.
Numerically integrate the function y = x² from 0 to 10.
Another example using prime factors
Consider the following code from examples/integrate/sequential
def f(x):
    return x**2

def integrate(f, a, b, N):
    s = 0
    dx = float(b - a) / N  # float() avoids integer division in Python 2
    for i in xrange(N):
        s += f(a + i*dx)
    return s * dx

print integrate(f, 0, 10, 100)
Break down the problem into parallelizable chunks, then add the results together:
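Before any threads are involved, the chunking idea itself can be sketched sequentially: split [a, b] into sub-intervals, integrate each independently, and add the partial sums. The `chunked_integrate` name and `n_chunks` parameter are illustrative, not from the examples directory; the code runs under both Python 2 and 3:

```python
def f(x):
    return x ** 2

def integrate(f, a, b, N):
    # simple left-hand Riemann sum
    s = 0.0
    dx = float(b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

def chunked_integrate(f, a, b, N, n_chunks=4):
    # split [a, b] into n_chunks sub-intervals and sum the pieces;
    # each integrate() call is independent, so each could run in
    # its own thread or process
    width = float(b - a) / n_chunks
    total = 0.0
    for i in range(n_chunks):
        x0 = a + i * width
        x1 = x0 + width
        total += integrate(f, x0, x1, N // n_chunks)
    return total

whole = integrate(f, 0, 10, 100000)
parts = chunked_integrate(f, 0, 10, 100000)
# both approximate the exact answer, 1000/3
```

Because the chunks sample exactly the same points as the single-interval version, the two results agree to within floating-point rounding.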
Starting threads doesn't take much. See examples/simple-threading.py
import sys
import threading
import time

def func():
    for i in xrange(5):
        print "hello from thread %s" % threading.current_thread().name
        time.sleep(1)

threads = []
for i in xrange(3):
    thread = threading.Thread(target=func, args=())
    thread.start()
    threads.append(thread)
Let's talk about some of the calls:
What if we don't want a thread to block the process from exiting?
thread.daemon = True
See examples/simple-threading-daemon.py
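A minimal sketch of the daemon flag (the `background` function is an assumed stand-in for whatever the thread does; portable across Python 2 and 3). The flag must be set before start():

```python
import threading
import time

def background():
    # pretend to poll something forever
    while True:
        time.sleep(0.1)

t = threading.Thread(target=background)
t.daemon = True   # must be set before start()
t.start()

# a daemon thread does not block interpreter exit; a normal
# (non-daemon) thread running this loop would keep the process
# alive forever
print("main thread exiting; daemon thread is abandoned")
```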
There is a second form for creating threads: subclass threading.Thread and override its run() method

class MyThread(threading.Thread):
    def run(self):
        func()

MyThread().start()
When threaded operations run concurrently we have to be careful about how they use shared resources. There's a potential for corruption.
If competing threads try to update the same value, we might get an unexpected race condition
Race conditions occur when multiple statements need to execute atomically, but get interrupted midway
See examples/race_condition.py
With a correct (serialized) interleaving, both increments are visible:

| Thread 1 | Thread 2 | | Integer value |
|---|---|---|---|
| | | | 0 |
| read value | | ← | 0 |
| increase value | | | 0 |
| write back | | → | 1 |
| | read value | ← | 1 |
| | increase value | | 1 |
| | write back | → | 2 |

With a race, both threads read the old value and one update is lost:

| Thread 1 | Thread 2 | | Integer value |
|---|---|---|---|
| | | | 0 |
| read value | | ← | 0 |
| | read value | ← | 0 |
| increase value | | | 0 |
| | increase value | | 0 |
| write back | | → | 1 |
| | write back | → | 1 |
Synchronization is used to control race conditions for critical sections of code
But synchronization introduces other potential problems... as we will see
Lock objects allow threads to control access to a resource until they're done with it
This is known as mutual exclusion, and a lock used this way is often called a mutex
Python 2 has a deprecated module called mutex for this. Use a Lock instead.
A Lock has two states: locked and unlocked
If multiple threads have access to the same Lock, they can police themselves by calling its .acquire() and .release() methods
If a Lock is locked, .acquire() will block until it becomes unlocked
These threads will wait in line politely for access to the statements in f()
import threading
import time

lock = threading.Lock()

def f():
    lock.acquire()
    print "%s got lock" % threading.current_thread().name
    time.sleep(1)
    lock.release()

threading.Thread(target=f).start()
threading.Thread(target=f).start()
threading.Thread(target=f).start()
See examples/lock/simple_lock.py
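Lock also supports the context-manager protocol, which is the more idiomatic form: `with lock:` acquires on entry and releases on exit, even if the body raises. A small sketch (portable Python 2/3) demonstrating that the lock is released after an exception:

```python
import threading

lock = threading.Lock()

def critical():
    # "with" acquires the lock on entry and releases it on exit,
    # even though the body raises
    with lock:
        raise ValueError("boom")

try:
    critical()
except ValueError:
    pass

# a non-blocking acquire succeeds, proving the lock was released
released = lock.acquire(False)
if released:
    lock.release()
```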
We now have the tools to solve the previous race condition problem
See examples/solutions/race_condition.py
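examples/solutions/race_condition.py isn't reproduced here, so the following is a sketch under assumed names (`counter`, `increment`) of the fix: hold a Lock across the read-modify-write so it executes atomically. Runs under Python 2 and 3:

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # the read-modify-write is made atomic by holding the lock
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100000,))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# with the lock, no updates are lost
print(counter)  # 400000
```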
What potential problems does synchronization create?
...Deadlocks!
"A deadlock is a situation in which two or more competing actions are each waiting for the other to finish, and thus neither ever does."
When two trains approach each other at a crossing, both shall come to a full stop and neither shall start up again until the other has gone.
See examples/lock/deadlock.py
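The classic two-lock deadlock arises when one thread takes lock A then lock B while another takes B then A. Since examples/lock/deadlock.py isn't shown, here is a sketch (assumed names throughout) of the standard remedy, a global lock ordering, under which both threads always finish:

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
results = []

def transfer(name):
    # Remedy for deadlock: every thread agrees to acquire
    # lock_a before lock_b, so no circular wait can form
    with lock_a:
        with lock_b:
            results.append(name)

threads = [threading.Thread(target=transfer, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# both threads completed because they used the same acquisition order
```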
See examples/lock/stdout_writer.py
multiple threads in the script write to stdout, and their output gets jumbled
A Semaphore is given an initial counter value, defaulting to 1
Each call to acquire() decrements the counter, release() increments it
If acquire() is called on a Semaphore with a counter of 0, it will block until the Semaphore counter is greater than 0.
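A typical use is bounding concurrency. This sketch (assumed names; runs on Python 2 and 3) lets at most two of six workers into the critical region at once, and records the peak number of simultaneous workers:

```python
import threading
import time

sem = threading.Semaphore(2)   # initial counter of 2
active = []
peak = [0]
guard = threading.Lock()

def worker():
    with sem:                  # acquire() on entry, release() on exit
        with guard:
            active.append(1)
            peak[0] = max(peak[0], len(active))
        time.sleep(0.05)       # hold the semaphore briefly
        with guard:
            active.pop()

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# peak[0] never exceeds the semaphore's initial counter
```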
Stop what you are doing and watch this video and read this book.
Allen Downey's A Little Book of Semaphores
We need a thread safe way of storing results from multiple threads of execution. That is provided by the Queue module.
Queues allow multiple producers and multiple consumers to exchange data safely
Size of the queue is managed with the maxsize kwarg
It will block consumers if empty and block producers if full
See examples/lock/producer_consumer.py
from Queue import Queue
q = Queue(maxsize=10)
q.put(37337)
block = True
timeout = 2
print q.get(block, timeout)
Queue.LifoQueue - Last In, First Out
Queue.PriorityQueue - Lowest valued entries are retrieved first
One pattern for PriorityQueue is to insert entries as tuples of the form (priority_number, data)
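A quick sketch of that tuple pattern (the try/except import is a compatibility shim, since the module is named Queue in Python 2 and queue in Python 3):

```python
try:
    from Queue import PriorityQueue      # Python 2
except ImportError:
    from queue import PriorityQueue      # Python 3

pq = PriorityQueue()
pq.put((2, "world"))
pq.put((1, "hello"))
pq.put((3, "!"))

# entries come out lowest priority number first,
# regardless of insertion order
first = pq.get()
second = pq.get()
third = pq.get()
print(first)   # (1, 'hello')
```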
The "ord" built-in function converts an ASCII character to its ordinal number
assert ord('a') == 97
See examples/threading/integrate_main.py
#!/usr/bin/env python
import argparse
import os
import sys
import threading
import Queue

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from integrate.integrate import integrate, f
from decorators.decorators import timer

@timer
def threading_integrate(f, a, b, N, thread_count=2):
    """break work into thread_count chunks"""
    N_chunk = int(float(N) / thread_count)
    dx = float(b - a) / thread_count

    results = Queue.Queue()

    def worker(*args):
        results.put(integrate(*args))

    for i in xrange(thread_count):
        x0 = a + dx * i  # offset by a, so intervals not starting at 0 work too
        x1 = x0 + dx
        thread = threading.Thread(target=worker, args=(f, x0, x1, N_chunk))
        thread.start()
        print "Thread %s started" % thread.name

    return sum(results.get() for i in xrange(thread_count))

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='integrator')
    parser.add_argument('a', nargs='?', type=float, default=0.0)
    parser.add_argument('b', nargs='?', type=float, default=10.0)
    parser.add_argument('N', nargs='?', type=int, default=10**7)
    parser.add_argument('thread_count', nargs='?', type=int, default=2)
    args = parser.parse_args()
    a = args.a
    b = args.b
    N = args.N
    thread_count = args.thread_count

    print "Numerical solution with N=%(N)d : %(x)f" % \
        {'N': N, 'x': threading_integrate(f, a, b, N, thread_count=thread_count)}
Try running the code in examples/threading/integrate_main.py
It accepts 4 arguments:
./integrate_main.py -h

usage: integrate_main.py [-h] [a] [b] [N] [thread_count]

integrator

positional arguments:
  a
  b
  N
  thread_count
./integrate_main.py 0 10 1000000 4
What happens when you change the thread count? What thread count gives the maximum speed?
multiprocessing provides an API very similar to threading, so the transition is easy
use multiprocessing.Process instead of threading.Thread
import multiprocessing
import os
import time

def func():
    print "hello from process %s" % os.getpid()
    time.sleep(1)

proc = multiprocessing.Process(target=func, args=())
proc.start()

proc = multiprocessing.Process(target=func, args=())
proc.start()
multiprocessing has its own multiprocessing.Queue which handles interprocess communication
Also has its own versions of Lock, RLock, Semaphore
from multiprocessing import Queue, Lock
multiprocessing.Pipe for 2-way process communication:
from multiprocessing import Pipe
parent_conn, child_conn = Pipe()
child_conn.send("foo")
print parent_conn.recv()
Let's change around our threaded integrate workflow and use multiprocessing instead
What do we expect the results to be?
See examples/multiprocessing/integrate_main.py
A processing pool contains worker processes; only a configured number of them run at any one time
from multiprocessing import Pool
pool = Pool(processes=4)
The Pool class has several methods for submitting jobs to the pool
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)

    result = pool.apply_async(f, (10,))
    print result.get(timeout=1)    # prints "100"

    print pool.map(f, range(10))   # prints "[0, 1, 4, ..., 81]"

    it = pool.imap(f, range(10))
    print it.next()                # prints "0"
    print it.next()                # prints "1"
    print it.next(timeout=1)       # prints "4"

    import time
    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)    # raises multiprocessing.TimeoutError
http://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool
threading also has a pool
confusingly, it lives in the multiprocessing module
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=4)
We're going to test making concurrent connections to a web service in examples/server/app.py
It is a WSGI application which can be run with Green Unicorn or another WSGI server
$ gunicorn app:app --bind 0.0.0.0:37337
client-threading.py makes 100 threads to contact the web service
client-mp.py makes 100 processes to contact the web service
client-pooled.py creates a ThreadPool
client-pooled.py contains a results Queue, but doesn't use it. Can you collect all the output from the pool into a single data structure using this Queue?
Traditionally, concurrency has been achieved through multiple communicating processes and in-process threads, as we've seen
Another strategy is through micro-threads, implemented via coroutines and a scheduler
A coroutine is a generalization of a subroutine which allows multiple entry points for suspending and resuming execution
the threading and the multiprocessing modules follow a preemptive multitasking model
coroutine based solutions follow a cooperative multitasking model
def coroutine(n):
    try:
        while True:
            x = (yield)
            print n + x
    except GeneratorExit:
        pass

targets = [
    coroutine(10),
    coroutine(20),
    coroutine(30),
]

for target in targets:
    target.next()  # prime each coroutine: advance it to the first yield

for i in range(5):
    for target in targets:
        target.send(i)
By "jumping" to parallel coroutines, our application can simulate true threads.
Creating the scheduler which does the jumping is an exercise for the reader, but look into these packages which handle the dirty work
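As a taste of what such a scheduler does, here is a minimal round-robin sketch over coroutines like the one above (the `round_robin` name is illustrative; results are collected in a list rather than printed, and `next(gen)` is used so it runs on Python 2 and 3):

```python
def coroutine(results, n):
    # consumer: receives x via send() and records n + x
    try:
        while True:
            x = (yield)
            results.append(n + x)
    except GeneratorExit:
        pass

def round_robin(targets, values):
    # a tiny cooperative scheduler: visit each coroutine in turn,
    # handing it the next value; no OS threads are involved
    for target in targets:
        next(target)              # prime: run up to the first yield
    for v in values:
        for target in targets:
            target.send(v)
    for target in targets:
        target.close()

results = []
targets = [coroutine(results, 10), coroutine(results, 20)]
round_robin(targets, range(3))
print(results)  # [10, 20, 11, 21, 12, 22]
```

Control "jumps" between coroutines only at the yield points, which is what makes this multitasking cooperative rather than preemptive.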