Python Multiprocessing Producer Consumer Pattern

Python 3 has a multiprocessing module that provides an API similar to the one found in the threading module. The main selling point of multiprocessing over threading is that it lets tasks run in parallel across multiple CPU cores, whereas threads in CPython are still limited by the global interpreter lock (GIL), which allows only one thread to execute Python bytecode at a time. The Process class found in multiprocessing works by starting a new operating system process for each task, and the module provides classes that allow data to be passed between those processes.

Since multiprocessing uses processes rather than threads, child processes do not share their memory with the parent process. Left to ourselves, we would have to rely on low-level mechanisms such as pipes to let the processes communicate with each other. Instead, the multiprocessing module provides high-level classes, similar to the ones found in threading, for sharing data between processes. This example demonstrates the producer-consumer pattern using processes and the Queue class to share data.

Code

import time
import os
import random
from multiprocessing import Process, Queue, Lock


# Producer function that places data on the Queue
def producer(queue, lock, names):
    # Synchronize access to the console
    with lock:
        print('Starting producer => {}'.format(os.getpid()))
        
    # Place our names on the Queue
    for name in names:
        time.sleep(random.randint(0, 10))
        queue.put(name)

    # Synchronize access to the console
    with lock:
        print('Producer {} exiting...'.format(os.getpid()))


# The consumer function takes data off of the Queue
def consumer(queue, lock):
    # Synchronize access to the console
    with lock:
        print('Starting consumer => {}'.format(os.getpid()))
    
    # Run indefinitely
    while True:
        time.sleep(random.randint(0, 10))
        
        # If the queue is empty, queue.get() will block until the queue has data
        name = queue.get()

        # Synchronize access to the console
        with lock:
            print('{} got {}'.format(os.getpid(), name))


if __name__ == '__main__':
    
    # Some lists with our favorite characters
    names = [['Master Shake', 'Meatwad', 'Frylock', 'Carl'],
             ['Early', 'Rusty', 'Sheriff', 'Granny', 'Lil'],
             ['Rick', 'Morty', 'Jerry', 'Summer', 'Beth']]

    # Create the Queue object
    queue = Queue()
    
    # Create a lock object to synchronize resource access
    lock = Lock()

    producers = []
    consumers = []

    for n in names:
        # Create our producer processes by passing the producer function and its arguments
        producers.append(Process(target=producer, args=(queue, lock, n)))

    # Create consumer processes
    for i in range(len(names) * 2):
        p = Process(target=consumer, args=(queue, lock))
        
        # This is critical! The consumer function has an infinite loop,
        # so mark it as a daemon; daemons are terminated when the parent exits
        p.daemon = True
        consumers.append(p)

    # Start the producers and consumers
    # Each start() call launches a new, independent operating system process
    for p in producers:
        p.start()

    for c in consumers:
        c.start()

    # Like threading, we have a join() method that synchronizes our program
    for p in producers:
        p.join()

    print('Parent process exiting...')

Explanation

The program demonstrates the producer and consumer pattern. We have two functions that run in their own independent processes. The producer function places supplied names on the Queue. The consumer function monitors the Queue and removes names from it as they become available.

The producer function takes three objects: a Queue, a Lock, and a list of names. It starts by acquiring the lock that guards the console. The console is still a shared resource, so we need to make sure only one process writes to it at a time; otherwise output from different processes can become interleaved. After acquiring the lock, the function prints its process ID (PID).

The producer function enters a for loop on lines 14-16. It sleeps for 0 to 10 seconds on line 15 to simulate a processing delay and then places a name on the Queue on line 16. When the loop is complete, the function acquires the console lock again and notifies the user that it is exiting. At this point, the process ends.

The consumer function runs in its own process as well. It takes the Queue and the Lock as its parameters and then acquires the console lock to notify the user that it is starting. The consumer also prints its PID. Next, the consumer enters an infinite loop on lines 30-38. It sleeps on line 31 to simulate a processing delay and then calls queue.get() on line 34. If the queue has data, the get() method returns that data immediately and the consumer prints it on line 38. Otherwise, get() blocks execution until data is available.
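The example relies on get() blocking indefinitely. As a small sketch of the alternative, get() also accepts a timeout and raises queue.Empty when nothing arrives in time. The helper name get_or_none is hypothetical and is not part of the program above.

```python
from multiprocessing import Queue
from queue import Empty  # multiprocessing.Queue raises this exception on timeout


def get_or_none(q, timeout):
    """Return the next item, or None if nothing arrives within timeout seconds."""
    try:
        return q.get(timeout=timeout)
    except Empty:
        return None


if __name__ == '__main__':
    q = Queue()

    # Nothing has been queued yet, so this gives up after half a second
    print(get_or_none(q, 0.5))

    q.put('Meatwad')

    # Now an item is available, so it is returned as soon as it arrives
    print(get_or_none(q, 5))
```

A consumer written this way can wake up periodically to check whether it should stop, instead of waiting on the queue forever.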

Line 41 is the entry point to the program, using the if __name__ == '__main__' test. We begin on line 44 by making a list of names. The Queue object is created on line 49 and the Lock object is made on line 52. Then on lines 57-59, we enter a for loop and create our producer Process objects. We use the target parameter to point the Process at the producer function and then pass in a tuple of the arguments that the function is called with.

Creating the consumer processes has one extra step that isn't needed when creating the producers. Lines 62-68 create the consumer processes, but on line 67 we set the daemon property to True. This is needed because the consumer function uses an infinite loop. Daemon processes are terminated automatically when the parent process exits; without the flag, the parent would wait forever for the consumers to finish.
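The effect of the daemon flag can be seen in isolation with a short sketch. The function name spin_forever is hypothetical and stands in for any worker that never returns.

```python
import time
from multiprocessing import Process


def spin_forever():
    """Stand-in for a worker with an infinite loop, like the consumer."""
    while True:
        time.sleep(0.1)


if __name__ == '__main__':
    # daemon=True in the constructor is equivalent to setting worker.daemon = True
    worker = Process(target=spin_forever, daemon=True)
    worker.start()

    time.sleep(0.3)
    print('Worker alive while parent runs: {}'.format(worker.is_alive()))

    # The parent falls off the end here. Because the worker is a daemon,
    # it is terminated at exit instead of keeping the program running.
    print('Parent process exiting...')
```

If daemon=True is removed, the script should hang after printing the last line, because the parent waits for the worker at exit.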

Once our processes are created, we start them by calling start() on each Process object (lines 72-76). Like threads, processes also have a join() method that can be used to synchronize a program. Our consumer processes never return, so calling join() on them would cause the program to hang, but our producer processes do return, so we use join() on line 80 to make the parent process wait for the producer processes to exit. Note that the parent exits as soon as the producers finish, which terminates the daemon consumers; any names still waiting on the Queue at that moment are never printed.
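A common alternative to daemon consumers is to send each consumer a sentinel value that tells it to stop, so the consumers can be joined as well and no queued names are lost. The following is a sketch of that idea, not the approach used in the program above. The names STOP, produce_names, and consume_until_stop are hypothetical, and the sketch assumes None is never a real name.

```python
from multiprocessing import Process, Queue

# Hypothetical sentinel; assumes None never appears as real data
STOP = None


def produce_names(work, names):
    """Place each name on the work queue."""
    for name in names:
        work.put(name)


def consume_until_stop(work, results):
    """Forward names to the results queue until the sentinel arrives."""
    while True:
        name = work.get()
        if name is STOP:
            break
        results.put(name)


if __name__ == '__main__':
    names = [['Master Shake', 'Meatwad'], ['Rick', 'Morty']]
    work, results = Queue(), Queue()

    producers = [Process(target=produce_names, args=(work, n)) for n in names]
    consumers = [Process(target=consume_until_stop, args=(work, results))
                 for _ in range(2)]

    for p in producers + consumers:
        p.start()

    # Wait until every name has been queued
    for p in producers:
        p.join()

    # One sentinel per consumer, queued behind all of the names
    for _ in consumers:
        work.put(STOP)

    # Drain the results before joining the consumers
    total = sum(len(n) for n in names)
    handled = [results.get() for _ in range(total)]

    for c in consumers:
        c.join()

    print(sorted(handled))
```

The results are collected before the consumers are joined because a process that has put items on a Queue may not exit until those items have been read.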

Resources

Lutz, Mark. Programming Python. Beijing, O'Reilly, 2013.
