Python Producer Consumer with Queue

The producer / consumer pattern is a common programming construct used in multithreaded applications where one thread acts as a producer of data while other threads consume the data. A web crawler is a classic use case for the producer / consumer pattern: the application may have a thread dedicated to crawling the web and gathering data (the producer) while other threads index and store that data (the consumers).

Producer and consumer threads need a way to share data, and Python’s queue module provides one of many solutions. The Queue class is a FIFO (first-in, first-out) data structure: producer threads place data on the queue, and consumer threads block on the queue until data is available to read. When data becomes available, a consumer thread removes it from the Queue and does its work.
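As a minimal sketch of the Queue API (the variable names here are my own, not from the book's example), putting and getting items looks like this:

import queue

q = queue.Queue()

q.put('first')
q.put('second')

print(q.get())  # prints 'first' -- items come back in FIFO order
print(q.get())  # prints 'second'

# Another q.get() here would block until some other thread calls q.put(),
# which is exactly the behavior the consumer threads below rely on.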

Below is an example program borrowed from Programming Python: Powerful Object-Oriented Programming that shows how to use a Queue to synchronize data between producer and consumer threads. I added my own comments to the code to help explain what is happening in the program.

Code

# Specify the number of consumer and producer threads
numconsumers = 2
numproducers = 4
nummessages = 4

import _thread as thread, queue, time

# Create a lock so that only one thread writes to the console at a time
safeprint = thread.allocate_lock()

# Create a queue object
dataQueue = queue.Queue()


# Function called by the producer thread
def producer(idnum):
    # Produce 4 messages to place on the queue
    for msgnum in range(nummessages):
        # Simulate a delay
        time.sleep(idnum)

        # Put a string on the queue
        dataQueue.put('[producer id={}, count={}]'.format(idnum, msgnum))


# Function called by the consumer threads
def consumer(idnum):
    # Create an infinite loop
    while True:
        # Simulate a delay
        time.sleep(0.1)
        try:
            # Attempt to get data from the queue. Note that
            # dataQueue.get() will block this thread's execution
            # until data is available
            data = dataQueue.get()
        except queue.Empty:
            pass
        else:
            # Acquire a lock on the console
            with safeprint:
                # Print the data created by the producer thread
                print('consumer ', idnum, ' got => ', data)


if __name__ == '__main__':
    # Create consumers
    for i in range(numconsumers):
        thread.start_new_thread(consumer, (i,))
        
    # Create producers
    for i in range(numproducers):
        thread.start_new_thread(producer, (i,))
        
    # Simulate a delay
    time.sleep(((numproducers - 1) * nummessages) + 1)
    
    # Exit the program
    print('Main thread exit')

Detailed Explanation

This program shows the producer / consumer pattern in action. We begin by defining variables that specify the number of consumer threads (line 2), the number of producer threads (line 3), and the number of messages each producer thread creates (line 4). The program creates a lock on line 9 so that only one thread can write to the console at a time. Then, on line 12, the queue is created as a global variable.

Our first function, producer, is defined on lines 16-23. There isn’t anything fancy going on in this function: it simply enters a for loop and creates four strings that are placed on dataQueue (line 23). Since dataQueue is a FIFO structure, the consumer threads will remove these strings from dataQueue in the order they are received.

Lines 27-43 define the consumer thread function, consumer. This code enters an infinite loop, removes data from dataQueue, and prints each string to the console. Line 36 is the critical piece of code in the consumer function: the call to get() on dataQueue removes the item at the front of the queue and stores it in the variable data. If dataQueue is empty, the consumer thread blocks until data becomes available.

Alternatively, we could pass False to the optional block parameter of get() (or call get_nowait()). That would let the thread continue to execute even when the queue is empty, but we would then need to be prepared for the queue.Empty exception that is raised in that case. Our program uses pass to skip over the exception should it occur (it shouldn’t, though, because we are using the blocking version of get()).
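For illustration, here is a sketch of what that non-blocking variant might look like. It reuses dataQueue and safeprint from the program above; the function name is my own and is not part of the book's example.

# Hypothetical non-blocking consumer loop (not from the book).
# get(block=False) raises queue.Empty immediately when no data is available,
# so the except branch really can run here.
def polling_consumer(idnum):
    while True:
        time.sleep(0.1)
        try:
            data = dataQueue.get(block=False)  # or dataQueue.get_nowait()
        except queue.Empty:
            continue  # nothing to consume yet; poll again
        with safeprint:
            print('consumer ', idnum, ' got => ', data)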

Lines 48-49 create our consumer threads and start them, and lines 52-53 create and start the producer threads. The producer threads call the producer function while the consumer threads call the consumer function. The dataQueue object does the job of synchronizing data between the threads: the producer threads write to dataQueue and the consumer threads read from it. Thus, our program implements the producer / consumer pattern.
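One more observation: the time.sleep() at the end of the program is just a crude way to keep the main thread alive long enough for the workers to finish. The queue module also provides task_done() and join(), which let the main thread wait on the queue itself. Here is a small self-contained sketch of that idea; the names are my own and this is not part of the book's example.

import queue, threading

work = queue.Queue()

def worker():
    while True:
        item = work.get()
        print('processed', item)
        work.task_done()  # mark this item as fully processed

# A daemon thread dies automatically when the main thread exits
threading.Thread(target=worker, daemon=True).start()

for n in range(8):
    work.put(n)

work.join()  # blocks until every item that was put() has been task_done()
print('all items processed')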

References

Lutz, Mark. Programming Python. Beijing: O'Reilly, 2013.


Python _thread Mutex

When Python programs create threads, each thread shares the same global memory as every other thread. Usually, but not always, multiple threads can safely read from shared resources without issue. Threads writing to shared resources are a different story because one thread could potentially overwrite the work of another thread.

This post walks through an example program from Programming Python: Powerful Object-Oriented Programming in which threads acquire and release locks. The locking mechanism ensures that only one thread has access to a shared resource at a time.

Code

Here is an example program with my own comments added.

import _thread as thread, time

# This mutex object is created by calling
# thread.allocate_lock()
# The mutex is responsible for synchronizing threads
mutex = thread.allocate_lock()


def counter(tid, count):
    for i in range(count):
        time.sleep(1)
        
        # The standard out is a shared resource
        # Unless the program controls access to the standard out
        # multiple threads can print to standard out at the same time
        # which results in garbage output
        
        # Acquire a lock
        mutex.acquire()
        
        # Now only the current thread can print to the console
        print('[{}] => {}'.format(tid, i))
        
        # Make sure to release the lock for other threads when finished
        mutex.release()


if __name__ == '__main__':
    for i in range(5):
        thread.start_new_thread(counter, (i, 5))

    time.sleep(6)
    print('Main thread exiting...')

Explanation

The program creates five threads, each of which needs access to the standard output stream. The standard output stream is a global object that all of the threads share, which means that two threads can call print() at the same time. That isn’t ideal, because we can get garbled output on the console when threads print simultaneously.

The solution is to lock access to the standard output stream so that only one thread may use it at a time. We do this by creating a mutex object on line 6 of the program with thread.allocate_lock(). When a thread needs the lock, it calls acquire() on the mutex. At that point, any other thread that needs the protected resource has to wait until the holding thread calls mutex.release().

It’s important to keep the operations between mutex.acquire() and mutex.release() as brief as possible. Only one thread can hold a lock at a time, so the longer one thread holds a lock, the longer other threads need to wait for their turn to use the lock. That naturally impacts the performance of the overall program.
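Lock objects returned by thread.allocate_lock() can also be used as context managers, which keeps the critical section explicit and guarantees the lock is released even if the code inside it raises an exception. Here is a sketch of the counter body rewritten that way (the function name is my own):

# Sketch: the same critical section written with a context manager.
# The with statement acquires mutex on entry and releases it on exit,
# even if print() were to raise an exception.
def counter_with(tid, count):
    for i in range(count):
        time.sleep(1)
        with mutex:
            print('[{}] => {}'.format(tid, i))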

References

Lutz, Mark. Programming Python. Beijing: O'Reilly, 2013.

Python Threading

The Python threading module provides an OOP solution to threading. The base class, threading.Thread, follows a Java-like pattern for creating and joining threads. This post provides a threading demonstration with an example program found in Programming Python: Powerful Object-Oriented Programming.

Code

This is the example program with my own comments added.

import threading

# Thread is the base class for creating OOP Style Threads
# It has a run() method that contains the code that runs in a new thread
class MyThread(threading.Thread):
    def __init__(self, myId, count, mutex):
        self.myId = myId
        self.count = count
        self.mutex = mutex
        threading.Thread.__init__(self)

    # Everything inside of run() is executed in a separate thread
    def run(self):
        for i in range(self.count):
            with self.mutex:
                print('[{}] => {}'.format(self.myId, i))


if __name__ == '__main__':
    stdoutmutex = threading.Lock()
    threads = []
    for i in range(10):
        # Create the new Thread Object
        thread = MyThread(i, 100, stdoutmutex)
        
        # The thread doesn't actually start running until
        # start() is called
        thread.start()
        threads.append(thread)

    for thread in threads:
        # join() is used to synchronize threads
        # Calling join() on a thread makes the parent thread wait
        # until the child thread has finished
        thread.join()

    print('Main thread exiting...')

Explanation

The OOP approach to Python threads requires developers to extend the threading.Thread class. The Thread class provides high-level methods that support threading, such as start() and join(), which we discuss shortly. It also has a run() method that developers need to override. All of the code placed in the run() method runs in a new thread.
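Subclassing is not the only way to use the threading module; threading.Thread also accepts a target callable and its arguments, which can be simpler when the thread needs no extra state. A short sketch of that style (the function and variable names are my own):

import threading

def print_count(my_id, count, mutex):
    for i in range(count):
        with mutex:
            print('[{}] => {}'.format(my_id, i))

stdout_mutex = threading.Lock()
worker = threading.Thread(target=print_count, args=(0, 100, stdout_mutex))
worker.start()
worker.join()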

We are still free to use locks with OOP threads. On line 20, we create a lock by calling threading.Lock(). The mutex is passed to the thread object’s constructor on line 24 and is used by the thread on line 15 to acquire a lock on the standard output stream.

It’s important to note that the new thread doesn’t actually run until we call start(). The start() method spawns the new thread and arranges for its run() method to be called in that thread. Never call run() directly on a thread object, because doing so simply executes the code in the current thread and keeps the program single threaded. The run() method is invoked by the Python runtime once the new thread begins executing.

The example program also uses the join() method. The join() method makes a parent thread wait until a child thread completes. The example program creates 10 threads and needs to wait until all of them are finished. This is done by entering a for loop on line 31 and calling join() on each of the threads. When join() is called, the parent thread sleeps until the child thread’s run() method has finished. When all 10 threads are finished, the program exits.
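join() also accepts an optional timeout in seconds. As a small sketch (the timeout value here is arbitrary), the loop above could be written to avoid waiting indefinitely on a stuck thread:

for thread in threads:
    # Wait at most 5 seconds for this thread, then check whether it finished
    thread.join(timeout=5)
    if thread.is_alive():
        print('thread', thread.myId, 'is still running')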

References

Lutz, Mark. Programming Python. Beijing: O'Reilly, 2013.

Python _thread Basic

Python 3 provides the higher-level threading module, but the low-level _thread module still exists for developers who are more comfortable with the 2.x thread API. This is a basic example derived from Programming Python: Powerful Object-Oriented Programming that demonstrates how to create threads using the _thread module.

Code

import _thread as thread, time


# This function will run in a new thread
def counter(tid, count):
    for i in range(count):
        # Simulate a delay
        time.sleep(1)
        # Print out the thread id (tid) and the current iteration
        # of our for loop
        print('[{}] => {}'.format(tid, i))


if __name__ == '__main__':
    # Enter a loop that creates 5 threads
    for i in range(5):
        # Start a new thread, passing a callable and its arguments
        # in the form of a tuple
        thread.start_new_thread(counter, (i, 5))

    time.sleep(6)
    print('Main thread exiting')

Explanation

This script creates five new threads using _thread.start_new_thread. Each time a new thread is created, the counter function is called with the tuple (i, 5), which corresponds to the tid and count parameters of counter. Counter enters a loop that runs five times, since 5 was passed as the second argument on line 19, and prints the thread id and the current iteration of the loop.

Meanwhile, the for loop in the parent thread continues to iterate because thread.start_new_thread does not block the for loop in the main thread. By calling start_new_thread, the program’s execution runs both the for loop in the main thread and the counter function in parallel. Allowing programs to run multiple portions of code at the same time is what gives threads their power. For example, you may wish to use a thread to handle a long running database query while the user continues to interact with the program in the main thread.
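As a rough sketch of that idea (the function name is my own, and time.sleep() stands in for the long-running query), the main thread stays responsive while the background thread works:

import _thread as thread, time

def slow_query():
    time.sleep(3)  # pretend this is a long-running database query
    print('query finished')

thread.start_new_thread(slow_query, ())

for i in range(4):
    print('main thread still responsive:', i)
    time.sleep(1)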

One final note about threads in Python. Threads give the appearance of allowing programs to multitask, and for all intents and purposes that is what is happening in the program. Nevertheless, what is really happening is that the Python interpreter is time-slicing the threads, letting each one execute a few bytecode instructions before switching to another set of instructions.

In other words, if a program has three threads, A, B, and C, then thread A runs for a few moments, then thread B, and finally thread C. Note that there is no guarantee of the order in which threads run. It is possible that one thread runs more often than the others, or that the order of running threads is different each time the program runs.

References

Lutz, Mark. Programming Python. Beijing: O'Reilly, 2013.