Python Split and Join Files

The book Programming Python: Powerful Object-Oriented Programming has an example program that shows how to split and join files. Many utilities exist for such an operation but the program offers a good working example of how to read from and write to binary files in Python3. The code below is an adaptation from the book with my own comments added.

Code

import os


def split(source, dest_folder, write_size):
    # Make a destination folder if it doesn't exist yet
    if not os.path.exists(dest_folder):
        os.mkdir(dest_folder)
    else:
        # Otherwise clean out all files in the destination folder
        for file in os.listdir(dest_folder):
            os.remove(os.path.join(dest_folder, file))

    partnum = 0

    # Open the source file in binary mode
    input_file = open(source, 'rb')

    while True:
        # Read a portion of the input file
        chunk = input_file.read(write_size)

        # End the loop if we have hit EOF
        if not chunk:
            break

        # Increment partnum
        partnum += 1

        # Create a new file name
        filename = os.path.join(dest_folder, ('part%04d' % partnum))

        # Create a destination file
        dest_file = open(filename, 'wb')

        # Write to this portion of the destination file
        dest_file.write(chunk)

        # Explicitly close 
        dest_file.close()
    
    # Explicitly close
    input_file.close()
    
    # Return the number of files created by the split
    return partnum


def join(source_dir, dest_file, read_size):
    # Create a new destination file
    output_file = open(dest_file, 'wb')
    
    # Get a list of the file parts
    parts = os.listdir(source_dir)
    
    # Sort them by name (remember that the order num is part of the file name)
    parts.sort()

    # Go through each portion one by one
    for file in parts:
        
        # Assemble the full path to the file
        path = os.path.join(source_dir, file)
        
        # Open the part
        input_file = open(path, 'rb')
        
        while True:
            # Read up to read_size bytes of the part
            bytes = input_file.read(read_size)
            
            # Break out of loop if we are at end of file
            if not bytes:
                break
                
            # Write the bytes to the output file
            output_file.write(bytes)
            
        # Close the input file
        input_file.close()
        
    # Close the output file
    output_file.close()

Explanation

split

The code snippet shows two sample functions that either split a file into parts or join those parts back together into one file. The split function takes three parameters. The first parameter, source, is the file that we wish to split. The second parameter, dest_folder, is a folder that stores the output files created by the split operation. The final parameter, write_size, is the size of the file parts in bytes.

Split starts by checking if dest_folder exists or not. If the folder does not exist, we call os.mkdir to create a new folder on the file system. Otherwise, we obtain a list of all files in the folder by calling os.listdir and then remove all of them by calling os.remove. When calling os.remove, we use os.path.join to create a full path to the target file that’s getting deleted.

Once the destination folder has been prepared, the function continues by performing the actual split operation. A partnum variable is created on line 13 that tracks the number of file parts created by the split operation. The source file is opened on line 16 in binary mode. Binary mode is used in this case because we could be dealing with audio or video files and not just text files.

The split function enters an infinite loop on line 18. On line 20, we read up to write_size bytes from the source file and store them in the chunk variable. On line 23, we test if chunk actually received any bytes from the read operation. If chunk is empty, then we have hit end of file (EOF) and we break out of the loop. Otherwise, we increment partnum by one and begin to write the file part.

Line 30 creates the name and destination for the file part by using os.path.join, the dest_folder, and a string template that accepts the current part number. The destination file is created on line 33 with a call to open (also in binary mode) and then on line 36, we write chunk to the file. Line 39 has an explicit call to close the file. Although files are normally closed when their objects are garbage collected, this function opens a lot of files, so we close each one explicitly in order to make sure we don’t exceed the number of file handles the underlying OS allows. The function ends by closing the input_file and returning the number of part files created.
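The explicit close() calls described above can also be left to with blocks, which close each file even if a read or write raises an exception. The following is a minimal sketch of that idea, not the book's code: split_with_context is a hypothetical name, and the sketch assumes that dest_folder already exists.

```python
import os


def split_with_context(source, dest_folder, write_size):
    """Split source into numbered parts of at most write_size bytes.

    Hypothetical variant of split(); assumes dest_folder already exists.
    """
    partnum = 0
    # The with block closes the source file when the block exits
    with open(source, 'rb') as input_file:
        while True:
            chunk = input_file.read(write_size)
            if not chunk:
                break
            partnum += 1
            filename = os.path.join(dest_folder, 'part%04d' % partnum)
            # Each part file is closed as soon as its chunk is written
            with open(filename, 'wb') as dest_file:
                dest_file.write(chunk)
    return partnum
```

Only one part file is open at any moment, so the file handle concern is addressed without any explicit close() calls.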

join

The join function does the reverse job of the split function. It begins by accepting a source_dir, a destination file, and read_size, the number of bytes to read from a part file at a time. The output_file is created on line 50 (opened in binary mode) and then on line 53, we use os.listdir to get a list of all parts.

Since our part files contain a number that identifies the parts, we can store all parts in a list and call sort() on it. Then it’s just a matter of looping through all of the parts and assembling them into a single file. The for loop starts on line 59. On line 62, we use os.path.join to create a full path to the part file and then we can open the part file on line 65.

The program enters an infinite loop on line 67. Inside of the while loop, we read a portion of the input_file and store the result in bytes. If bytes is empty, we have hit end of file, so we test for this on line 72 and use break to end the while loop. Otherwise, we write the bytes to the output file on line 76.

When we have finished reading our part file, we again close it explicitly on line 79. When all of the parts have been read, we close the output_file. The output_file contains the bytes of the original file that was split in the first place.
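The read-and-write loop described above is essentially what shutil.copyfileobj from the standard library does, so one possible variation is to let it do the copying. This is a sketch rather than the book's code: join_with_copyfileobj is a hypothetical name, and it assumes source_dir contains only part files whose names sort into the right order.

```python
import os
import shutil


def join_with_copyfileobj(source_dir, dest_file, read_size=64 * 1024):
    """Concatenate every file in source_dir, in name order, into dest_file."""
    # Sorting works because the zero-padded part number is in the name
    parts = sorted(os.listdir(source_dir))
    with open(dest_file, 'wb') as output_file:
        for name in parts:
            path = os.path.join(source_dir, name)
            with open(path, 'rb') as input_file:
                # Copy the part in chunks of read_size bytes
                shutil.copyfileobj(input_file, output_file, read_size)
    # Return the number of parts that were joined
    return len(parts)
```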

Thoughts

The code contained in this post isn’t ideal for production but is instead meant to be a learning tool. In this code, we cover reading from and writing to binary files and several functions of the os module. There are areas where we could improve this code. For example, split destroys the contents of the destination folder, but ideally, it should instead raise an exception and let the caller decide whether to delete the files in that folder.

We also don’t test if our input files are really files and if our folders are really folders. That is certainly an area for improvement. Another thing that could be improved upon is using an enumeration for the size of the file parts. Right now, write_size in split and read_size in join are specified in bytes, but that isn’t clear to clients of these functions.
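One way to act on these suggestions is sketched below. Both PartSize and check_split_arguments are hypothetical names introduced for illustration, and the choice of exception types is an assumption; the sketch only shows that the checks can run before any file is touched.

```python
import os
from enum import IntEnum


class PartSize(IntEnum):
    """Hypothetical named sizes, in bytes, for write_size and read_size."""
    KILOBYTE = 1024
    MEGABYTE = 1024 ** 2


def check_split_arguments(source, dest_folder):
    """Raise instead of deleting anything; the caller decides what to do."""
    if not os.path.isfile(source):
        raise FileNotFoundError('not a file: {}'.format(source))
    if os.path.exists(dest_folder):
        if not os.path.isdir(dest_folder):
            raise NotADirectoryError('not a folder: {}'.format(dest_folder))
        if os.listdir(dest_folder):
            raise FileExistsError('folder is not empty: {}'.format(dest_folder))
```

Because PartSize is an IntEnum, a member such as PartSize.MEGABYTE can be passed anywhere the functions expect a plain number of bytes.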

References

Lutz, Mark. Programming Python. Beijing, O’Reilly, 2013.

Find Python Source Files in Home Directory

Truthfully, most users aren’t very interested in finding the largest and smallest Python source files in their home directory, but doing so does provide for an exercise in walking the file tree and using tools from the os module. The program in this post is a modified example taken from Programming Python: Powerful Object-Oriented Programming where the user’s home directory is scanned for all Python source files. The console outputs the two smallest files (in bytes) and the two largest files.

Code

import os
import pprint
from pathlib import Path

trace = False

# Get the user's home directory in a platform neutral fashion
dirname = str(Path.home())

# Store the results of all python files found
# in home directory
allsizes = []

# Walk the file tree
for (current_folder, sub_folders, files) in os.walk(dirname):
    if trace:
        print(current_folder)

    # Loop through all files in current_folder
    for filename in files:

        # Test if it's a python source file
        if filename.endswith('.py'):
            if trace:
                print('...', filename)

            # Assemble the full file path using os.path.join
            fullname = os.path.join(current_folder, filename)

            # Get the size of the file on disk
            fullsize = os.path.getsize(fullname)

            # Store the result
            allsizes.append((fullsize, fullname))

# Sort the files by size
allsizes.sort()

# Print the 2 smallest files
pprint.pprint(allsizes[:2])

# Print the 2 largest files
pprint.pprint(allsizes[-2:])

Sample Output

[(0,
  '/Users/stonesoup/.local/share/heroku/client/node_modules/node-gyp/gyp/pylib/gyp/generator/__init__.py'),
 (0,
  '/Users/stonesoup/.p2/pool/plugins/org.python.pydev.jython_5.4.0.201611281236/Lib/email/mime/__init__.py')]
[(219552,
  '/Users/stonesoup/.p2/pool/plugins/org.python.pydev.jython_5.4.0.201611281236/Lib/decimal.py'),
 (349239,
  '/Users/stonesoup/Library/Caches/PyCharmCE2017.1/python_stubs/348993582/numpy/random/mtrand.py')]

Explanation

The program starts with a trace flag that’s set to False. When set to True, the program prints detailed information about what it is doing. On line 8, we grab the user’s home directory using Path.home(). This is a platform-neutral way of finding a user’s home directory. Notice that we convert this value to a string for our purposes. Finally, we create an empty allsizes list that holds our results.

Starting on line 15, we use the os.walk function and pass in the user’s home directory. It’s a common pattern to combine os.walk with a for loop so that we can traverse an entire directory tree. On each iteration, os.walk yields a tuple that contains the current_folder, sub_folders, and files in the current folder. We are interested in the files.

Starting on line 20, the program enters a nested for loop that examines each file individually. On line 23, we test if the file name ends with ‘.py’ to see if it’s a Python source file. Should the test return True, we continue by using os.path.join to assemble the full path to the file. The os.path.join function takes into account the underlying operating system’s path separator, so on Unix-like systems, we get / while Windows systems get \ as a path separator. The file’s size is obtained on line 31 using os.path.getsize. Once we have the size and the file path, we can add the result to allsizes for later use.

The program has finished scanning the user’s home folder once the program reaches line 37. At this point, we can sort our results from smallest to largest by using the sort() method on allsizes. Line 40 prints the two smallest files (using pretty print for better formatting) and line 43 prints the two largest files.
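When only a few entries from each end are needed, heapq.nsmallest and heapq.nlargest can pick them out without sorting the whole list. The sketch below is a variation on the program above, not the book's code: extreme_python_files is a hypothetical name, and skipping files whose size cannot be read (for example, broken symbolic links) is an added assumption.

```python
import heapq
import os


def extreme_python_files(root, count=2):
    """Return (smallest, largest) lists of (size, path) tuples under root."""
    sizes = []
    for current_folder, sub_folders, files in os.walk(root):
        for filename in files:
            if filename.endswith('.py'):
                fullname = os.path.join(current_folder, filename)
                try:
                    sizes.append((os.path.getsize(fullname), fullname))
                except OSError:
                    # Skip broken symlinks and unreadable files
                    continue
    return heapq.nsmallest(count, sizes), heapq.nlargest(count, sizes)
```

Note that heapq.nlargest returns its results from largest to smallest, which is the reverse of the order produced by the allsizes[-2:] slice.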

References

Lutz, Mark. Programming Python. Beijing, O’Reilly, 2013.

Python Multiprocessing Producer Consumer Pattern

Python3 has a multiprocessing module that provides an API that’s similar to the one found in the threading module. The main selling point of multiprocessing over threading is that multiprocessing allows tasks to run in parallel across multiple CPU cores, while threads are still limited by the global interpreter lock (GIL). The Process class found in multiprocessing works by launching new processes, and the module provides classes that allow for data sharing between those processes.

Since multiprocessing uses processes rather than threads, child processes do not share their memory with the parent process. Underneath, that means relying on low-level objects such as pipes to allow the processes to communicate with each other. Fortunately, the multiprocessing module provides high-level classes, similar to the ones found in threading, that allow for sharing data between processes. This example demonstrates the producer consumer pattern using processes and the Queue class to share data.

Code

import time
import os
import random
from multiprocessing import Process, Queue, Lock


# Producer function that places data on the Queue
def producer(queue, lock, names):
    # Synchronize access to the console
    with lock:
        print('Starting producer => {}'.format(os.getpid()))
        
    # Place our names on the Queue
    for name in names:
        time.sleep(random.randint(0, 10))
        queue.put(name)

    # Synchronize access to the console
    with lock:
        print('Producer {} exiting...'.format(os.getpid()))


# The consumer function takes data off of the Queue
def consumer(queue, lock):
    # Synchronize access to the console
    with lock:
        print('Starting consumer => {}'.format(os.getpid()))
    
    # Run indefinitely
    while True:
        time.sleep(random.randint(0, 10))
        
        # If the queue is empty, queue.get() will block until the queue has data
        name = queue.get()

        # Synchronize access to the console
        with lock:
            print('{} got {}'.format(os.getpid(), name))


if __name__ == '__main__':
    
    # Some lists with our favorite characters
    names = [['Master Shake', 'Meatwad', 'Frylock', 'Carl'],
             ['Early', 'Rusty', 'Sheriff', 'Granny', 'Lil'],
             ['Rick', 'Morty', 'Jerry', 'Summer', 'Beth']]

    # Create the Queue object
    queue = Queue()
    
    # Create a lock object to synchronize resource access
    lock = Lock()

    producers = []
    consumers = []

    for n in names:
        # Create our producer processes by passing the producer function and its arguments
        producers.append(Process(target=producer, args=(queue, lock, n)))

    # Create consumer processes
    for i in range(len(names) * 2):
        p = Process(target=consumer, args=(queue, lock))
        
        # This is critical! The consumer function has an infinite loop
        # Which means it will never exit unless we set daemon to true
        p.daemon = True
        consumers.append(p)

    # Start the producers and consumers
    # The Python VM will launch new independent processes for each Process object
    for p in producers:
        p.start()

    for c in consumers:
        c.start()

    # Like threading, we have a join() method that synchronizes our program
    for p in producers:
        p.join()

    print('Parent process exiting...')

Explanation

The program demonstrates the producer and consumer pattern. We have two functions that run in their own independent processes. The producer function places supplied names on the Queue. The consumer function monitors the Queue and removes names from it as they become available.

The producer function takes three objects: a Queue, a Lock, and a list of names. It starts by acquiring a lock on the console. The console is still a shared resource, so we need to make sure only one Process writes to the console at a time or their output will be interleaved. After acquiring a lock on the console, the function prints out its process id (PID).

The producer function enters a for loop on lines 14-16. It sleeps between 0 and 10 seconds on line 15 to simulate a delay in processing and then it places a name on the Queue on line 16. When the for loop is complete, the function acquires the console lock again and then notifies the user it is exiting. At this point, the process ends.

The consumer function runs in its own process as well. It takes the Queue and the Lock as its parameters and then acquires a lock on the console to notify the user it is starting. The consumer prints out its PID also. Next the consumer enters an infinite loop on lines 30-38. It simulates a delay by sleeping on line 31 and then makes a call to queue.get() on line 34. If the queue has data, the get() method returns that data immediately and the consumer prints the data on line 38. Otherwise, get() blocks execution until data is available.

Line 41 is the entry point to the program, using the if __name__ == ‘__main__’ test. We begin on line 44 by making a list of names. The Queue object is created on line 49 and the Lock object is made on line 52. Then on lines 57-59, we enter a for loop and create our producer Process objects. We use the target parameter to point the Process at the producer function and then pass in a tuple for the arguments that the function is called with.

Creating the consumer processes has one extra step that isn’t needed when creating the producers. Lines 62-68 create the consumer processes, and line 67 sets the daemon property to True. This is needed because the consumer function uses an infinite loop and those processes would never terminate on their own. Daemon processes are terminated automatically when the parent process exits.

Once our processes are created, we start them by calling start() on each Process object (lines 72-76). Like threads, processes also have a join() method that can be used to synchronize a program. Our consumer processes never return, so calling join() on them would cause the program to hang, but our producer processes do return, so we use join() on line 80 to cause the parent process to wait for the producer processes to exit.
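An alternative to daemon consumers is to tell each consumer explicitly when to stop, so that join() can be called on consumers as well. The sketch below assumes a sentinel value is placed on the queue once per consumer. SENTINEL, run_demo, and the results queue are hypothetical names added for illustration, and the upper() call merely stands in for real work.

```python
from multiprocessing import Process, Queue

# Hypothetical marker meaning "no more work"; None is a common choice
SENTINEL = None


def consumer(queue, results):
    # Take items off the queue until the sentinel arrives
    while True:
        name = queue.get()
        if name is SENTINEL:
            break
        results.put(name.upper())


def run_demo(names, num_consumers=2):
    queue = Queue()
    results = Queue()
    consumers = [Process(target=consumer, args=(queue, results))
                 for _ in range(num_consumers)]
    for c in consumers:
        c.start()
    for name in names:
        queue.put(name)
    # One sentinel per consumer so that every loop ends
    for _ in consumers:
        queue.put(SENTINEL)
    # Collect the results before joining so no child blocks on a full pipe
    collected = [results.get() for _ in names]
    for c in consumers:
        c.join()
    return sorted(collected)
```

As in the program above, run_demo should be called from under the if __name__ == '__main__' test so that child processes do not start further processes when they import the module.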

Resources

Lutz, Mark. Programming Python. Beijing, O’Reilly, 2013.

Python Signals

Python has a signal module that is used to respond to signals generated by the operating system. Signals are a very low-level form of interprocess communication, but there are some cases where a program may wish to respond to a signal. For example, it may be useful to watch for program signals when writing developer toolkits.

This post demonstrates how to respond to an alarm signal. The example is borrowed from Programming Python: Powerful Object-Oriented Programming. I added my own comments to help explain the workings of the program.

Code

import sys, signal, time


# Function that returns the time
def now():
    return time.asctime()


# Function that handles the signal
def onSignal(signum, stackframe):
    print('Got alarm', signum, 'at', now())


while True:
    print('Setting at', now())
    
    # This tells the program to respond to the alarm signal
    # by calling the onSignal function
    signal.signal(signal.SIGALRM, onSignal)
    
    # Ask the OS to deliver SIGALRM to this process in 5 seconds
    signal.alarm(5)
    # Sleep until a signal arrives
    signal.pause()

Explanation

The code defines an onSignal function on lines 10-11 that works as a handler for operating system signals. All it does is print text to the console. On line 19, we register onSignal as the handler for the SIGALRM signal. Line 22 calls signal.alarm(5), which asks the operating system to deliver SIGALRM to this process after five seconds; when the signal arrives, onSignal is invoked. The call to signal.pause() puts the process to sleep until a signal is received. Note that our programs don’t have to raise signals themselves. We can also simply listen for signals sent by other programs (for example, SIGTERM, which killall sends by default in a Unix shell).
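The register, schedule, and wait steps described above can be wrapped in a function that also restores the previous handler afterwards. This is a sketch under several assumptions: it runs on a Unix-like system, it is called from the main thread, and no other signal arrives first. The names on_alarm, wait_for_alarm, and received are hypothetical.

```python
import signal

# Signal numbers recorded by the handler
received = []


def on_alarm(signum, stackframe):
    # Keep handlers short: just record which signal arrived
    received.append(signum)


def wait_for_alarm(seconds):
    """Schedule SIGALRM, sleep until it arrives, then restore the old handler."""
    previous = signal.signal(signal.SIGALRM, on_alarm)
    try:
        # setitimer accepts fractional seconds, unlike signal.alarm
        signal.setitimer(signal.ITIMER_REAL, seconds)
        signal.pause()
    finally:
        signal.signal(signal.SIGALRM, previous)
    return received[-1]
```

Registering the handler once, rather than on every pass through a loop, is enough because a handler installed with signal.signal stays in place until it is replaced.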

References

Lutz, Mark. Programming Python. Beijing, O’Reilly, 2013.

Python Sockets

Network sockets are extremely useful for interprocess communication (IPC). Not only do network sockets allow processes to communicate on the same machine, but we can also use sockets to communicate over a network. This post shows the most basic demonstration of network sockets using an example borrowed from Programming Python: Powerful Object-Oriented Programming. I added my own comments to help explain the program.

Code

from socket import socket, AF_INET, SOCK_STREAM

port = 50008
host = 'localhost'


# Function to create a server
def server():
    # Create a network socket
    sock = socket(AF_INET, SOCK_STREAM)

    # Bind the socket to all local interfaces ('') on our port
    sock.bind(('', port))

    # Start listening, queuing up to 5 pending connections
    sock.listen(5)
    while True:
        # Wait for a client
        conn, addr = sock.accept()

        # Read up to 1024 bytes (one kilobyte) from the client
        data = conn.recv(1024)

        # Create a reply string
        reply = 'server got: [{}]'.format(data)

        # Send the reply back to the client
        conn.send(reply.encode())


# Function to create a socket client
def client(name):
    # Create a socket
    sock = socket(AF_INET, SOCK_STREAM)

    # Connect the socket to the server
    sock.connect((host, port))

    # Send a message to the server
    sock.send(name.encode())

    # Receive up to 1024 bytes (one kilobyte) from the server
    reply = sock.recv(1024)

    # Close our connection
    sock.close()

    # Print the output
    print('Client got: [{}]'.format(reply))


if __name__ == '__main__':
    from threading import Thread

    # Create a thread for the server
    sthread = Thread(target=server)
    sthread.daemon = True
    sthread.start()

    # Create 5 client threads
    for i in range(5):
        Thread(target=client, args=('client{}'.format(i),)).start()

Explanation

The example program creates a basic client / server program. The program uses threads to help keep the program simple. One thread calls the server function defined on lines 8-28 and the remaining five threads call the client function found on lines 32-49. The server thread creates a network server that accepts connections from the client threads one at a time.

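The server function starts by creating a socket object (called sock). On line 13, the program binds our socket to the port number specified on line 3; the empty string used as the host means the socket accepts connections on all of the machine’s network interfaces, including localhost. On line 16, the socket starts listening and allows up to five pending connections to wait in its queue. Then the server enters a loop on line 17.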

Inside of the loop, we have a call to sock.accept(). This function accepts a connection from a client and returns a connection object and an address. Our program only uses the connection object. The program reads data from the client on line 22 using conn.recv. The conn.recv function takes the maximum number of bytes to read from the client. It returns a bytes object, which the program stores in the data variable. Lines 25 and 28 show how to send information back to the client using conn.send. The conn.send function expects bytes, which is why we call encode() on the reply variable.

The client function mirrors the server function. The client socket is created on line 34. We use the connect function (line 37) to connect to the server and pass it a tuple containing the host and the port number. Unlike the server, which gets a dedicated connection object for each client, the client uses the socket object itself to send and receive information to and from the server. On line 40, the program calls sock.send() and passes it a bytes object to send to the server. The response from the server is collected on line 43 using sock.recv(). When the client is finished, it closes its connection to the server using sock.close().
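Because recv returns bytes, formatting the result directly produces output such as b'client0'. The sketch below is one way to decode the data back into text and to let with blocks close the sockets. The names serve_once, ask, and demo are hypothetical, the server handles a single client for brevity, and port 0 is used so that the operating system picks a free port.

```python
import threading
from socket import socket, AF_INET, SOCK_STREAM


def serve_once(server_sock):
    # Handle a single client, then return
    conn, addr = server_sock.accept()
    with conn:
        data = conn.recv(1024)
        # decode() turns the received bytes back into a str
        reply = 'server got: [{}]'.format(data.decode())
        # sendall keeps sending until the whole reply is written
        conn.sendall(reply.encode())


def ask(host, port, name):
    # The with block closes the client socket automatically
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.sendall(name.encode())
        return sock.recv(1024).decode()


def demo():
    with socket(AF_INET, SOCK_STREAM) as server_sock:
        # Port 0 asks the operating system for any free port
        server_sock.bind(('127.0.0.1', 0))
        server_sock.listen(5)
        port = server_sock.getsockname()[1]
        worker = threading.Thread(target=serve_once, args=(server_sock,))
        worker.start()
        reply = ask('127.0.0.1', port, 'client0')
        worker.join()
    return reply
```

A single recv call is enough here only because the messages are tiny; larger messages may arrive in several pieces and would need a loop.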

References

Lutz, Mark. Programming Python. Beijing, O’Reilly, 2013.

Python Basic Pipes

Python provides two main avenues of parallel processing. One avenue is to use multithreading, where a program itself multitasks, while the other approach is to have a program relaunch itself as a separate program in a new process. Neither approach is necessarily better than the other; instead, they should be thought of as tools for different use cases. Threads have low overhead and share a program’s memory space, which allows for easy communication between threads. Processes operate as if we launched a new copy of the program from our operating system and allow programs to spread themselves out over an operating system or even a network.

However, processes do not share a global memory space, which means they need a way to communicate with one another. One approach to interprocess communication (IPC) is to use pipes. This post shows an example of IPC using pipes taken from Programming Python: Powerful Object-Oriented Programming. I have added my own comments to the code for clarity.

Code

import os, time


# Function called by child processes
def child(pipeout):
    zzz = 0
    while True:
        time.sleep(zzz)

        # We have to encode our string to binary to use
        # with pipes
        msg = ('Spam {}'.format(zzz)).encode()

        # Send the data back to the parent process
        os.write(pipeout, msg)
        zzz = (zzz + 1) % 5


def parent():
    # Creates our pipes. The pipeout gets passed to the child
    # process while parent keeps pipein
    pipein, pipeout = os.pipe()

    if os.fork() == 0:
        # We are now in the child process so call child and supply
        # it with pipeout so that it can send information back to
        # the parent.
        child(pipeout)
    else:
        # This is the parent process
        while True:
            # Read data from the child process
            # This call blocks until there is data
            line = os.read(pipein, 32)

            # Print to the console
            print('Parent {} got [{}] as {}'.format(os.getpid(), line, time.time()))


if __name__ == '__main__':
    parent()

Explanation

We have two functions in the program named child() and parent(). The child() function is intended to run in child processes while parent() contains the main program. Parent() is defined on lines 19-37. The function begins by calling os.pipe() on line 22 which returns a tuple containing two ends of a single pipe. Pipes are unidirectional and thus pipein is used by the parent to read data that comes from the child process. The child process uses pipeout to send data to the parent.

The program forks into two different processes on line 24. The program is in the child process when os.fork() returns zero. Line 28 calls the child() function and passes pipeout to it so that the child process can send data back to the parent. The child process enters an infinite loop on line 7. On line 12, a msg variable is created that contains a string. Pipes carry binary data, so we have to call encode() on the string to convert it to bytes. Then on line 15, we send the msg variable back to the parent using os.write and supplying pipeout and msg to that function.

The parent process continues on line 31. It attempts to read data from the child process on line 34 using os.read. Notice that os.read requires the pipein descriptor and the maximum number of bytes to read (32 bytes in this program). If the pipe contains data, os.read returns immediately and the result is stored in the line variable. Otherwise, os.read blocks the program until the pipe has data. The parent process prints the data on line 37.
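A fixed-size os.read does not know where one message ends and the next begins, so one common approach is to end each message with a newline and read the pipe line by line. The sketch below assumes that framing and a child that exits after sending its messages. It requires a Unix-like system because it uses os.fork, and read_from_child is a hypothetical name.

```python
import os


def read_from_child(messages):
    """Fork a child that writes messages to a pipe; return what the parent reads."""
    pipein, pipeout = os.pipe()
    pid = os.fork()
    if pid == 0:
        try:
            # Child: it only writes, so close the read end
            os.close(pipein)
            for msg in messages:
                os.write(pipeout, (msg + '\n').encode())
            os.close(pipeout)
        finally:
            # Always leave the child here, never return into the caller
            os._exit(0)
    # Parent: close its copy of the write end so EOF can be detected
    os.close(pipeout)
    with os.fdopen(pipein, 'r') as reader:
        # Iteration stops when the child closes its end of the pipe
        lines = [line.rstrip('\n') for line in reader]
    # Collect the child's exit status so it does not linger as a zombie
    os.waitpid(pid, 0)
    return lines
```

Closing the unused ends of the pipe matters: the parent only sees end of file once every copy of the write end has been closed.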

References

Lutz, Mark. Programming Python. Beijing, O’Reilly, 2013.

Python Fork() Exit Status

Most computer programs return an exit code to the operating system’s shell. Shell scripting tools can use the exit status of a program to determine whether the program exited normally or abnormally. In either case, the shell script can react depending on the outcome of the child process.

Python programs can use os.fork() to create child processes. Since the child process is a new instance of the program, it can be useful in some cases to inspect if the child process exited normally or not. For example, a GUI program may spawn a child process and notify the user if the operation completed successfully or not.

This post shows an example program taken from Programming Python: Powerful Object-Oriented Programming that demonstrates how a parent process can inspect a child process’ exit code. I added comments to help explain the workings of the program.

Code

import os

exitstat = 0


# Function that is executed after os.fork() that runs in a new process
def child():
    global exitstat
    exitstat += 1
    print('Hello from child', os.getpid(), exitstat)
    
    # End this process using os._exit() and pass a status code back to the shell
    os._exit(exitstat)


# This is the parent process code
def parent():
    while True:
        # Fork this program into a child process
        newpid = os.fork()
        
        # newpid is 0 if we are in the child process
        if newpid == 0:
            # Call child()
            child()
            
        # otherwise, we are still in the parent process
        else:
            # os.wait() returns the child's pid and a status value
            # On unix systems, the exit code is stored in the high byte of
            # status, so it has to be bit-shifted
            pid, status = os.wait()
            print('Parent got', pid, status, (status >> 8))
            if input() == 'q':
                break


if __name__ == '__main__':
    parent()

Explanation

This program is pretty basic. We have two functions, parent() and child(). When the program starts on line 38, it calls parent() to enter the parent() function. The parent() function enters an infinite loop that forks this program on line 20. The result of os.fork() is stored in the newpid variable.

Our program is executing in the child process when newpid is zero. In that case, we call our child() function. The child() function prints a message to the console on line 10 and then exits by calling os._exit() on line 13. We pass the exitstat variable to os._exit(), and its value becomes the exit code for this process.

The parent process continues in the meantime. On line 32, we use os.wait() to obtain the pid and status of the child process. The status variable also contains the exitstat value passed to os._exit() in the child process, but to get this code, we have to shift status right by eight bits. The following line prints the pid, status, and the child process’ exit code to the console. When the user types ‘q’ and presses Enter, the parent process ends.
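Instead of shifting bits by hand, the os module provides helpers that decode the status value. The following sketch assumes a Unix-like system; run_child is a hypothetical name, and returning a negative signal number for a child that was killed is a convention chosen for this example.

```python
import os


def run_child(exit_code):
    """Fork a child that exits with exit_code and return the code the parent sees."""
    pid = os.fork()
    if pid == 0:
        # Child process: exit immediately with the requested code
        os._exit(exit_code)
    # Parent process: wait for this specific child
    _, status = os.waitpid(pid, 0)
    if os.WIFEXITED(status):
        # For a normal exit, this extracts the code passed to os._exit()
        return os.WEXITSTATUS(status)
    # The child was terminated by a signal instead of exiting
    return -os.WTERMSIG(status)
```

Checking os.WIFEXITED first avoids misreading the status of a child that was terminated by a signal rather than by calling os._exit().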

References

Lutz, Mark. Programming Python. Beijing, O’Reilly, 2013.

Python Producer Consumer with Queue

The producer / consumer pattern is a common programming construct used in multithreaded applications where one thread acts as a producer of data while other threads consume the data. A web crawler application is a use case of the producer / consumer pattern. For example, the application may have a thread dedicated to crawling the web that gathers data (producer) while other threads index and store the data (consumers).

Producer and consumer threads need a way to share data. Python’s queue module provides one of many solutions. The Queue object is a FIFO structure that lets the producer threads place data on the queue. Consumer threads are blocked by the Queue until the Queue has data for the consumer thread to read. When data becomes available, the consumer thread removes data from the Queue and does its work.

Below is an example program borrowed from Programming Python: Powerful Object-Oriented Programming that shows how to use a Queue to synchronize data between producer and consumer threads. I added my own comments to the code to help explain what is happening in the program.

Code

# Specify the number of consumer and producer threads
numconsumers = 2
numproducers = 4
nummessages = 4

import _thread as thread, queue, time

# Create a lock so that only one thread writes to the console at a time
safeprint = thread.allocate_lock()

# Create a queue object
dataQueue = queue.Queue()


# Function called by the producer thread
def producer(idnum):
    # Produce 4 messages to place on the queue
    for msgnum in range(nummessages):
        # Simulate a delay
        time.sleep(idnum)

        # Put a String on the queue
        dataQueue.put('[producer id={}, count={}]'.format(idnum, msgnum))


# Function called by the consumer threads
def consumer(idnum):
    # Create an infinite loop
    while True:
        # Simulate a delay
        time.sleep(0.1)
        try:
            # Attempt to get data from the queue. Note that
            # dataQueue.get() will block this thread's execution
            # until data is available
            data = dataQueue.get()
        except queue.Empty:
            pass
        else:
            # Acquire a lock on the console
            with safeprint:
                # Print the data created by the producer thread
                print('consumer ', idnum, ' got => ', data)


if __name__ == '__main__':
    # Create consumers
    for i in range(numconsumers):
        thread.start_new_thread(consumer, (i,))
        
    # Create producers
    for i in range(numproducers):
        thread.start_new_thread(producer, (i,))
        
    # Simulate a delay
    time.sleep(((numproducers - 1) * nummessages) + 1)
    
    # Exit the program
    print('Main thread exit')

Detailed Explanation

This program shows the producer / consumer pattern in action. We begin by defining variables that specify the number of consumer threads (line 2), the number of producer threads (line 3), and the number of messages each producer thread makes (line 4). The program creates a lock on line 9 so that only one thread can use the console at a time. Then on line 12, the queue is created as a global variable.

Our first function, producer, is defined on lines 16-23. There isn’t anything fancy going on in this function. The function simply enters a for loop and creates 4 strings that are placed on dataQueue (line 23). Since dataQueue is a FIFO structure, the consumer threads will remove these strings from dataQueue in the order they are received.

Lines 27-43 define the consumer thread function, consumer. This code enters an infinite loop and removes data from dataQueue and prints the String to the console. Line 36 is the critical piece of code in the consumer function. The call to get() on dataQueue removes the item at the front of the queue and stores it in the variable data. If dataQueue is empty, the consumer thread is blocked until data becomes available.

Alternatively, we could pass False to the optional block parameter on get(). That would cause the thread to continue to execute even if the queue is empty. However, we would then need to be prepared for situations where the queue is empty and catch the queue.Empty exception that is raised. Our program uses pass to skip over the exception should this happen (it shouldn’t, by the way, because we are using the blocking version of get()).
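To make the non-blocking form concrete, here is a small sketch that empties a queue without ever waiting. The function name drain is hypothetical; the block parameter and the queue.Empty exception are part of the standard queue module.

```python
import queue


def drain(data_queue):
    """Remove and return everything currently on the queue without blocking."""
    items = []
    while True:
        try:
            # block=False raises queue.Empty instead of waiting
            items.append(data_queue.get(block=False))
        except queue.Empty:
            break
    return items
```

In this form the except clause does real work, because an empty queue is an expected condition rather than something that should never happen.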

Lines 48-49 create our consumer threads and start them. Lines 52-53 create and start the producer threads. The producer threads call the producer function while the consumer threads call the consumer function. The dataQueue object does the job of synchronizing data between threads. The producer threads write to dataQueue and the consumer threads read from it. Thus, our program implements the producer / consumer pattern.

References

Lutz, Mark. Programming Python. Beijing, O’Reilly, 2013.