Threading and multiprocessing Part 4: Managing thread results

We need a thread-safe way of storing results from multiple threads of execution. The `queue` module provides one (it was named `Queue` in Python 2).

Queues allow multiple producers and multiple consumers to exchange data safely.

The maximum size of the queue is set with the maxsize keyword argument.

By default, get() blocks consumers while the queue is empty, and put() blocks producers while the queue is full.

If maxsize is less than or equal to zero, the queue size is infinite.

from queue import Queue

q = Queue(maxsize=10)
q.put(37337)
block = True
timeout = 2  # seconds to wait before queue.Empty is raised
print(q.get(block, timeout))
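
The blocking rules above can be seen with a deliberately tiny queue. This is a minimal sketch, not taken from the course repository: the maxsize of 2 and the 0.1 second timeouts are arbitrary choices, and the variable names are made up for illustration. With a timeout, a blocked put() raises queue.Full and a blocked get() raises queue.Empty instead of waiting forever.

```python
import queue

q = queue.Queue(maxsize=2)  # arbitrary small size so the queue fills quickly
q.put("a")
q.put("b")

# The queue is now full: put() would block, so with a timeout it raises queue.Full.
try:
    q.put("c", block=True, timeout=0.1)
    put_result = "stored"
except queue.Full:
    put_result = "full"

# Drain the two items that did fit (FIFO order).
drained = [q.get(), q.get()]

# The queue is now empty: get() would block, so with a timeout it raises queue.Empty.
try:
    q.get(block=True, timeout=0.1)
    get_result = "got item"
except queue.Empty:
    get_result = "empty"

print(put_result, drained, get_result)
```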

Queues (queue)

  • Easier to use than many of the above.

  • Do not need locks.

  • Have signaling.

Common use: producer/consumer patterns

from queue import Queue
data_q = Queue()

Producer thread:
for item in produce_items():
    data_q.put(item)

Consumer thread:
while True:
    item = data_q.get()
    consume_item(item)
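
The two fragments above leave open how the consumer ever stops. One common approach, sketched here under assumptions, is for the producer to put a sentinel value on the queue when it is done. The SENTINEL marker, the body of produce_items(), and the doubling done by the consumer are all hypothetical stand-ins chosen to make the example runnable.

```python
import queue
import threading

SENTINEL = None  # hypothetical "no more data" marker; real items must never be None


def produce_items():
    # Stand-in for real work: yields a handful of integers.
    return range(5)


def producer(data_q):
    for item in produce_items():
        data_q.put(item)
    data_q.put(SENTINEL)  # tell the consumer to stop


def consumer(data_q, consumed):
    while True:
        item = data_q.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        consumed.append(item * 2)  # stand-in for consume_item(item)


data_q = queue.Queue()
consumed = []
threads = [
    threading.Thread(target=producer, args=(data_q,)),
    threading.Thread(target=consumer, args=(data_q, consumed)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(consumed)
```

With several consumers, each one needs its own sentinel, so the producer would put one per consumer thread.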

Scheduling (sched)

  • Schedules based on time, either absolute or delay.

  • Low level, so it has many of the traps of the threading synchronization primitives.
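
As a rough illustration of delay-based and absolute-time scheduling, the sketch below queues three events and runs them. The record() callback, the labels, and the short delays are made up for the example. Note that run() blocks the calling thread until every event has fired.

```python
import sched
import time

# The scheduler needs a clock function and a function that can wait.
scheduler = sched.scheduler(time.monotonic, time.sleep)
fired = []


def record(label):
    # Hypothetical callback: just remembers the order in which events ran.
    fired.append(label)


# enter() schedules relative to now: (delay, priority, action, argument)
scheduler.enter(0.2, 1, record, argument=("second",))
scheduler.enter(0.1, 1, record, argument=("first",))

# enterabs() schedules at an absolute time on the same clock.
scheduler.enterabs(time.monotonic() + 0.3, 1, record, argument=("third",))

scheduler.run()  # blocks until the event queue is empty
print(fired)
```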

Timed events (threading.Timer)

Run a function at some time in the future:

import threading
import time

def called_once():
    """
    this function is designed to be called once in the future
    """
    print("I just got called! It's now: {}".format(time.asctime()))

# setting it up to be called
t = threading.Timer(interval=3, function=called_once)
t.start()

# you can cancel it if you want:
t.cancel()

See `simple_timer.py` in your repository.
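
The snippet above cancels the timer right after starting it, so the function never runs. The sketch below is not the contents of `simple_timer.py`; it is a small assumed example that starts two timers, cancels one, and waits for both. The 0.1 second interval and the labels are arbitrary. A Timer is a Thread subclass, so join() works on it.

```python
import threading

calls = []


def called_once(label):
    # Records that the timer fired; list.append is safe to call from another thread.
    calls.append(label)


fires = threading.Timer(0.1, called_once, args=("fired",))
cancelled = threading.Timer(0.1, called_once, args=("cancelled",))

fires.start()
cancelled.start()
cancelled.cancel()  # only has an effect while the timer is still waiting

# Wait for both timer threads to finish.
fires.join()
cancelled.join()

print(calls)
```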

Other Queue types

queue.LifoQueue

  • Last In, First Out
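
A quick sketch of the ordering, using made-up integer items: a LifoQueue behaves like a thread-safe stack.

```python
import queue

stack = queue.LifoQueue()
for item in (1, 2, 3):
    stack.put(item)

# Items come back in the reverse of the order they were put in.
popped = [stack.get() for _ in range(3)]
print(popped)
```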

queue.PriorityQueue

  • Lowest valued entries are retrieved first

A common pattern for PriorityQueue is to insert each entry as a tuple of the form:

(priority_number, data)
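
The tuple pattern might look like the following sketch. The task strings and priority numbers are invented for illustration. If two entries share a priority number, the tuples are compared on the data element next, so the data must be comparable or the priorities unique.

```python
import queue

pq = queue.PriorityQueue()

# Entries are (priority_number, data); insertion order does not matter.
pq.put((2, "write report"))
pq.put((1, "fix bug"))
pq.put((3, "lunch"))

# The lowest priority number is retrieved first.
ordered = [pq.get()[1] for _ in range(3)]
print(ordered)
```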

Threading example with a queue

See `integrate_main.py` in your repository.

#!/usr/bin/env python

import threading
import queue

# from integrate.integrate import integrate, f
from integrate import f, integrate_numpy as integrate
from decorators import timer


@timer
def threading_integrate(f, a, b, N, thread_count=2):
    """Break the work into thread_count chunks, one per thread."""
    N_chunk = int(float(N) / thread_count)
    dx = float(b - a) / thread_count

    results = queue.Queue()

    def worker(*args):
        results.put(integrate(*args))

    for i in range(thread_count):
        x0 = a + dx * i  # offset by a so the interval need not start at 0
        x1 = x0 + dx
        thread = threading.Thread(target=worker, args=(f, x0, x1, N_chunk))
        thread.start()
        print("Thread %s started" % thread.name)

    return sum((results.get() for i in range(thread_count)))


if __name__ == "__main__":

    # parameters of the integration
    a = 0.0
    b = 10.0
    N = 10**8
    thread_count = 8

    print("Numerical solution with N=%(N)d : %(x)f" %
          {'N': N, 'x': threading_integrate(f, a, b, N, thread_count=thread_count)})

Threading on a CPU bound problem

Try running the code in `integrate_threads.py` in your repository.

It has a couple of tunable parameters:

a = 0.0  # the start of the integration
b = 10.0  # the end point of the integration
N = 10**8 # the number of steps to use in the integration
thread_count = 8  # the number of threads to use

What happens when you change the thread count? What thread count gives the maximum speed?