Python Multiprocessing Producer Consumer Pattern

Python3 has a multiprocessing module that provides an API that’s similar to the one found in the threading module. The main selling point behind multiprocessing over threading is that multiprocessing allows tasks to run in a truly concurrent fashion by spanning multiple CPU cores while threading is still limited by the global interpreter lock (GIL). The Process class found in multiprocessing works internally by spawning new processes and providing classes that allow for data sharing between processes.

Since multiprocessing uses processes rather than threads, child processes do not share their memory with the parent process. That means we have to rely on low-level objects such as pipes to allow the processes to communicate with each other. The multiprocessing module provides high level classes similar to the ones found in threading that allow for sharing data between processes. This example demonstrates the producer consumer pattern using processes and the Queue class sharing data.

Code

import time
import os
import random
from multiprocessing import Process, Queue, Lock


# Producer function that places data on the Queue
def producer(queue, lock, names):
    # Synchronize access to the console
    with lock:
        print('Starting producer => {}'.format(os.getpid()))
        
    # Place our names on the Queue
    for name in names:
        time.sleep(random.randint(0, 10))
        queue.put(name)

    # Synchronize access to the console
    with lock:
        print('Producer {} exiting...'.format(os.getpid()))


# The consumer function takes data off of the Queue
def consumer(queue, lock):
    # Synchronize access to the console
    with lock:
        print('Starting consumer => {}'.format(os.getpid()))
    
    # Run indefinitely
    while True:
        time.sleep(random.randint(0, 10))
        
        # If the queue is empty, queue.get() will block until the queue has data
        name = queue.get()

        # Synchronize access to the console
        with lock:
            print('{} got {}'.format(os.getpid(), name))


if __name__ == '__main__':
    
    # Some lists with our favorite characters
    names = [['Master Shake', 'Meatwad', 'Frylock', 'Carl'],
             ['Early', 'Rusty', 'Sheriff', 'Granny', 'Lil'],
             ['Rick', 'Morty', 'Jerry', 'Summer', 'Beth']]

    # Create the Queue object
    queue = Queue()
    
    # Create a lock object to synchronize resource access
    lock = Lock()

    producers = []
    consumers = []

    for n in names:
        # Create our producer processes by passing the producer function and it's arguments
        producers.append(Process(target=producer, args=(queue, lock, n)))

    # Create consumer processes
    for i in range(len(names) * 2):
        p = Process(target=consumer, args=(queue, lock))
        
        # This is critical! The consumer function has an infinite loop
        # Which means it will never exit unless we set daemon to true
        p.daemon = True
        consumers.append(p)

    # Start the producers and consumer
    # The Python VM will launch new independent processes for each Process object
    for p in producers:
        p.start()

    for c in consumers:
        c.start()

    # Like threading, we have a join() method that synchronizes our program
    for p in producers:
        p.join()

    print('Parent process exiting...')

Explanation

The program demonstrates the producer and consumer pattern. We have two functions that run in their own independent processes. The producer function places supplied names on the Queue. The consumer function monitors the Queue and removes names from it as they become available.

The producer function takes three objects: a Queue, a Lock, and a List of names. It start with acquiring a lock on the console. The console is still a shared resource so we need to make sure only one Process writes to the console at a time or they will write over the top of one another. After acquiring a lock on the console, the function prints out its process id (PID).

The producer function enters a for each loop on lines 14-16. It sleeps between 0-10 seconds on line 15 to simulate a delay in processing and then it places a name on the Queue on line 16. When the for each loop is complete, the function aquires another console lock and then notifies the user it is exiting. At this point, the process ends.

The consumer function runs in it’s own process as well. It takes the Queue and the Lock as it’s parameters and then acquires a lock on the console to notify the user it is starting. The consumer prints out it’s PID also. Next the consumer enters an infinte loop on lines 30-38. It similuates sleeping on line 31 and then makes a call the queue.get() on line 34. If the queue has data, the get() method returns that data immediately and the consumer prints the data on line 38. Otherwise, get() blocks execution until data is available.

Line 41 is the entry point to the programing, using the if __name__ == ‘__main__’ test. We begin on 44 by making a list of names. The Queue object is created on line 49 and the Lock() object is made on line 52. Then on lines 57-59, we enter a for-each loop and create our producer Process objects. We use the target parameter to point the Process at the producer function and then pass in a tuple for the arguments that the function is called with.

Creating the consumers processes has one extra that that isn’t needed when creating the Producers. Lines 62-68 creates the consumer processes, but on line 67, set the daemon property to True. This is needed because the consumer function uses and infinite loop and those processes will never terminate unless they are marked as daemon processes.

Once are processes are created, we start them by calling start() on each Process object (lines 72-76). Like threads, Processes also have a join() method that can be used to synchronize a program. Our consumer processes never return, so calling join() on them would cause the program to hang, but our producer processes do return so we use join() on line 80 to cause the parent process to wait for the producer processes to exit.

Resources

Lutz, Mark. Programming Python. Beijing, OReilly, 2013.

Programming Python: Powerful Object-Oriented Programming

Advertisement

Python Basic Forking

Many programs need to execute tasks simultaneously and Python provides us with a few different mechanisms for concurrent programming. One of those mechanisms is called forking, where a call is made to the underlying operating system to create a working copy of a program that’s already running. The program that created the new process is called the parent process, while the processes that are created by the parent are called the child process.

This post shows the most basic form of creating processes in Python and helps serve as a foundation to understanding forking. The example is derived from Programming Python: Powerful Object-Oriented Programming, and I added my own comments to help better explain the program.

import os


# This is a function called by the child process
def child():
    # Use os.getpid() to get the pid for this process
    print('Hello from child', os.getpid())

    # force the child process to exit right away
    # or the child process will return to the infinite loop in parent()
    os._exit(0)


def parent():
    while True:
        # Attempt to fork this program into a new process. When forking is complete
        # newpid will be non-zero for the original process, but it wil be
        # 0 in the child process
        newpid = os.fork()

        # The program now goes in two different directions at the same time
        # When newpid is 0, we call child() and the child process exits
        if newpid == 0:  # Test if this is a child process
            child()
        else:
            # If are here, then we are still in the parent process
            # We print the pid of the parent and the child process (newpid)
            print('Hello from parent', os.getpid(), newpid)
        if input() == 'q':
            break


if __name__ == '__main__':
    parent()

When run on my machine, the program shows the following output

Hello from parent 87800 87802
Hello from child 87802
k
Hello from parent 87800 87803
Hello from child 87803
k
Hello from parent 87800 87804
Hello from child 87804
k
Hello from parent 87800 87805
Hello from child 87805
q

Explanation

The hardest part to grasp about this program is that when os.fork() is called on line 19, the program actually launches a copy of itself. The operating system creates the new process and that new process gets a copy of all variables in memory and execution of the new and old programs continue after line 19. (Note: The OS may not exactly copy the parent process, but functionally speaking, the child process can be considered to be a copy of the parent).

The os.fork() function returns a number called a PID (process ID). We can test the pid to see if we are running in the parent or child process. When we are in the child process, the value returned by os.fork() is zero. So on line 23, we test for 0 and if newpid is zero, we call the child() function.

The alternative case is that we are still running in the parent process (bearing in mind, that the child process is also running at this point in time as well). If we are still in the parent process os.fork() returns a non-zero value. In that case, we use the else block to print the parent and child PID.

The parent process continues to loop until the user enters q to quit. Each time the loop iterates, a new child process is created by the parent. The parent prints its own PID (using os.getpid()) and the pid of the child on line 28.

The child process also uses os.getpid() to get its own PID. It prints its own PID on line 7 and then on line 11, we use os._exit(0) to force the child process to shut down. This is a critical step for this program! If we were to omit the call to os._exit on line 11, the child process would return to the parent function and enter the same infinite loop the parent is using.

Conclusion

This is the most basic example of creating child processes using Python. Keep in mind that processes do not share memory (unlike threads). In real world programs, processes often need to sycnchronize data from one process to another process using tools such as network sockets, databases, or files. When a child process is spawned it gets a copy of the memory of the parent process, but then functions as an independent program.

Source

Lutz, Mark. Programming Python. Beijing, OReilly, 2013

%d bloggers like this: