Python Producer Consumer with Queue

The producer / consumer pattern is a common programming construct used in multithreaded applications where one thread acts as a producer of data while other threads consume the data. A web crawler application is a use case of the producer / consumer pattern. For example, the application may have a thread dedicated to crawling the web that gathers data (producer) while other threads index and store the data (consumers).

Producer and consumer threads need a way to share data. Python’s queue module provides one of many solutions. The Queue object is a FIFO object that lets the produce thread place data on the queue. Consumer threads are blocked by the Queue until the Queue has data for the consumer thread to read. When data becomes available, the consumer thread removes data from the Queue and does its work.

Below is an example program borrowed from Programming Python: Powerful Object-Oriented Programming that shows how to use a Queue to synchronize data between producer and consumer threads. I added my own comments to the code to help explain what is happening in the program.

Code

# Specify the number of consumer and producer threads
numconsumers = 2
numproducers = 4
nummessages = 4

import _thread as thread, queue, time

# Create a lock so that only one thread writes to the console at a time
safeprint = thread.allocate_lock()

# Create a queue object
dataQueue = queue.Queue()


# Function called by the producer thread
def producer(idnum):
    # Produce 4 messages to place on the queue
    for msgnum in range(nummessages):
        # Simulate a delay
        time.sleep(idnum)

        # Put a String on the queue
        dataQueue.put('[producer id={}, count={}]'.format(idnum, msgnum))


# Function called by the consumer threads
def consumer(idnum):
    # Create an infinite loop
    while True:
        # Simulate a delay
        time.sleep(0.1)
        try:
            # Attempt to get data from the queue. Note that
            # dataQueue.get() will block this thread's execution
            # until data is available
            data = dataQueue.get()
        except queue.Empty:
            pass
        else:
            # Acquire a lock on the console
            with safeprint:
                # Print the data created by the producer thread
                print('consumer ', idnum, ' got => ', data)


if __name__ == '__main__':
    # Create consumers
    for i in range(numconsumers):
        thread.start_new_thread(consumer, (i,))
        
    # Create producers
    for i in range(numproducers):
        thread.start_new_thread(producer, (i,))
        
    # Simulate a delay
    time.sleep(((numproducers - 1) * nummessages) + 1)
    
    # Exit the program
    print('Main thread exit')

Detailed Explanation

This program shows the producer / consumer pattern in action. We begin by defining variables that specify the number of consumer threads (line 2), the number of produce threads (line 3), and the number of messages the producer threads make (line 4). The program creates a lock on line 9 so that only one thread can use the console at the same time. Then on line 12, the queue is created as a global variable.

Our first function, producer, is defined on lines 16-23. There isn’t anything fancy going on in this function. The function simply enters a for-each loop and creates 4 strings that are placed on dataQueue (line 23). Since dataQueue is a FIFO structure, worker threads will remove these Strings from dataQueue in the order they are recieved.

Lines 27-43 define the consumer thread function, consumer. This code enters an infinite loop and removes data from dataQueue and prints the String to the console. Line 36 is the critical piece of code in the consumer function. The call to get() on dataQueue removes the item at the front of the queue and stores it in the variable data. If dataQueue is empty, the consumer thread is blocked until data becomes available.

Alternatively, we could pass false to the optional block parameter on get(). That would cause the thread to continue to execute even if the queue is empty. However, we need to be prepared for situations where the queue is empty and catch the queue.Empty exception that is thrown. Our program calls pass to skip over the exception should this happen (it shouldn’t be the way, because we are using the blocking version of get()).

Lines 48-49 create our producer threads and start them. Lines 52-53 create and start the consumer threads. The producer threads call the produce function while the consumer threads call the consume function. The dataQueue object does the job of synchronizing data between threads. The produce threads write to dataQueue and consumer threads read from it. Thus, our program has created the consumer / producer pattern.

References

Lutz, Mark. Programming Python. Beijing, OReilly, 2013.

Advertisements

Python _thread Basic

Python 3 has the newer thread package, but the _thread package still exists for developers who are more comfortable with the 2.x API. This is a basic example derived from Programming Python: Powerful Object-Oriented Programming that demonstrates how to create threads using the _thread module.

Code

import _thread as thread, time


# This function will run in a new thread
def counter(tid, count):
    for i in range(count):
        # Simulate a delay
        time.sleep(1)
        # Print out the thread id (tid) and the current iteration
        # of our for loop
        print('[{}] => {}'.format(tid, i))


if __name__ == '__main__':
    # Enter a loop that creates 5 threads
    for i in range(5):
        # Start a new thread passing a callable and it's arguments
        # in the form of a tuple
        thread.start_new_thread(counter, (i, 5))

    time.sleep(6)
    print('Main thread exiting')

Explanation

This script creates five new threads using _thread.start_new_thread. Each time a new thread is created, the counter function is called and is passed a tuple of (i, 5). That tuple corresponds to the tid and count parameters of the counter function. Counter enters a loop that runs 5 times since 5 was passed to the second parameter of counter on line 19. It will print the thread id and current iteration of the loop.

Meanwhile, the for loop in the parent thread continues to iterate because thread.start_new_thread does not block the for loop in the main thread. By calling start_new_thread, the program’s execution runs both the for loop in the main thread and the counter function in parallel. Allowing programs to run multiple portions of code at the same time is what gives threads their power. For example, you may wish to use a thread to handle a long running database query while the user continues to interact with the program in the main thread.

One final note about threads in Python. Threads give the appearance of allowing programs to multitask and for all intents and purposes, that is what is happening in the program. Nevertheless, what is really happening is that the Python Virtual Machine is time slicing the computer instructions and allowing a few lines of code to run before switching to another set of instructions.

In other words, if a program has three threads, A, B, and C, then Thread A runs for a few moments, then Thread B, and finally Thread C. Note that there is no guarantee to the order in which threads run. It is possible that one thread may run more often than other threads or that the order of running threads is different each time.

References

Lutz, Mark. Programming Python. Beijing, OReilly, 2013.

Python os.walk

It’s very typical for a program to have to walk a file tree. In Recursion Example — Walking a file tree, I demonstrated how to use recursion to traverse a file system. Although it’s totally possible to walk through a file system in that fashion, it’s less than ideal because Python provides os.walk for this purpose.

The following script is a modified example borrowed from Programming Python: Powerful Object-Oriented Programming that demonstrates how to traverse a file system using os.walk.

import os
import sys


def lister(root):
    # os.walk returns a tuple with the current_folder, a list of sub_folders,
    # and a list of files in the current_folder
    for (current_folder, sub_folders, files) in os.walk(root):
        print('[' + current_folder + ']')
        for sub_folder in sub_folders:

            # Unix uses / as path separators, while Windows uses \
            # If we use os.path.join, we don't need to worry about which
            # path separator to use since os.path.join tracks that for us.
            path = os.path.join(current_folder, sub_folder)
            print('\t' + path)

        for file in files:
            path = os.path.join(current_folder, file)
            print('\t' + path)


if __name__ == '__main__':
    lister(sys.argv[1])

When run, this code prints out all of the files and directories starting at the specified root folder.

Explanation

os.walk

The os.walk function does the work of traversing a file system. The function generates a tuple with three fields. The first field is the current directory that os.walk is processing. The second field is a list of sub folders found in the current folder and the last field is a list of files found in the current folder.

Combining os.walk with a for loop is a very common technique (shown on line 8). The loop continues to iterate until os.walk finishes walking through the file system. The tuple declared in the for loop is updated on each iteration of the loop, providing developers with all of the information needed to process the contents of the directory.

os.path.join

Line 15 shows an example of using os.path.join to assemble a full path to a target folder or file. It’s import to use os.path.join to assemble file paths because Unix-like system use ‘/’ to separate file paths, while Windows systems use ‘\’. Tracking the path separator could be tedious work since it requires making a determination about which operating system is running the script. That’s not very ideal so Python provides os.path.join to take care of such work. As long as os.path.join is used, the assembled file paths will use the proper path separator for the os.

References

Lutz, Mark. Programming Python. Beijing, OReilly, 2013.

Python Page Through A File

Many operating systems have command line tools that allow a user to page through a file in chunks. As a demonstration of how to read text files in Python, I used an example from Programming Python: Powerful Object-Oriented Programming.

Code

def more(text, numlines=15):
    # This splits the text into a list object based on line
    # endings
    lines = text.splitlines()

    # Now continue to loop until we are out of lines
    while lines:
        # Slice off numLines into chunk
        chunk = lines[:numlines]
        
        # Remove numLines from the beginning of lines
        lines = lines[numlines:]

        # Now loop through each line in chunk
        for line in chunk:
            # and then print a line
            print(line)
            
        # Now ask the user if we want to keep going
        if lines and input('More?') not in ['y', 'Y']:
            break

if __name__ == '__main__':
    # Import sys so that we can read command line arguments
    import sys
    
    # Next, we are grabbing the first argument from the
    # command line, and passing it the open function
    # which returns a file object. Calling read on this
    # object will dump the contents of the file into a String
    # which gets passed to our more function above
    more(open(sys.argv[1]).read(), 10)

Detailed Explanation

The comments in the code above are mine and explain what is going on in the program. The program starts by testing if this script is getting called as a standalone program or if we are importing this code as a module.

Assuming this is a standalone program, we import the sys module so that we can examine the command line arguments. The second command line argument needs to be a text file or this program will crash. We pass the name of the file to the open function, which returns a file object. Calling read() on the file object dumps the entire contents of the file into a String.

At this point, we pass the string into our more() function. It starts out by splitting the string by lines, which returns a list object. We start to loop through this list object, which continues until the list is empty.

Inside of the while loop, we slice off numLines from lines and store then in chunk. Then we remove those lines from the lines list. The next step is to print out each line in chunk. Once that is complete, we test if we still have more lines to print and if we do, we ask the user if they want to keep going or exit.

Here is the program output when run on my screen.

Patricks-MacBook-Pro:System stonesoup$ python more.py more.py
def more(text, numlines=15):
    lines = text.splitlines()

    while lines:
        chunk = lines[:numlines]
        lines = lines[numlines:]

        for line in chunk:
            print(line)
        if lines and input('More?') not in ['y', 'Y']:
More?y
            break

if __name__ == '__main__':
    import sys
    more(open(sys.argv[1]).read(), 10)

Spring Security @RolesAllowed JSR250 Kotlin

Although Spring Security provides means to secure the web tier using XML markup, it’s also critically important that developers also secure backend method to ensure that methods. This post demosntrates an application in which a developer forgot to secure a web form but luckily the backend code is secured and provides a safe guard against such an error.

Enabling JSR250

Spring Boot takes a declaritive approaching to enabling method security, but we also need to provide it with an authentication manager.

@Configuration
@EnableJpaRepositories
//The next annotation enabled @RolesAllowed annotation
@EnableGlobalMethodSecurity(jsr250Enabled = true)
//We need to extend GlobalMethodSecurityConfiguration and override the configure method
//This will allow us to secure methods
class MethodSecurityConfig : GlobalMethodSecurityConfiguration(){

    override fun configure(auth: AuthenticationManagerBuilder) {
        //In our case, we are going to use an in memory authentication
        configureAuthentication(auth)
    }
}

fun configureAuthentication(auth: AuthenticationManagerBuilder){
    auth
            .inMemoryAuthentication()
            .withUser("bob").password("bob").roles("ADMIN", "USER")
            .and()
            .withUser("gene").password("gene").roles( "USER")
}

We create a class that extends GlobalMethodSecurityConfiguration. We turn the method security on by annotating this class with @EnableGlobalMethodSecurity. By default, Spring uses it’s own @Secured annotation so if we want to use the JSR standard, we need to pass true to the jsr250Enabled annotation. Then our MethodSecurityConfig class needs to override the configure method and add an authentication scheme.

Readers may be wondering what the difference is between @Secured and @RolesAllowed annotations. There doesn’t seem to be much as both annotations seem to do the same thing. There is the possibility that other software libraries may act on @RolesAllowed and if there is such as concern, then use @Secured.

Securing Methods

Once we have enabled method security, we only need to decorate our specific methods. Here is a service class used in the example application.

@Transactional
//This is our class that we are going to secure
class BurgerService(@Autowired val burgerRepository: BurgerRepository){

    @PostConstruct
    fun init(){
        //Just popuplates the DB for the example application
        val burgers = listOf(
                BurgerOfTheDay(name = "New Bacon-ings"),
                BurgerOfTheDay(name = "Last of the Mo-Jicama Burger"),
                BurgerOfTheDay(name = "Little Swiss Bunshine Burger"),
                BurgerOfTheDay(name = "Itsy Bitsy Teeny Weenie Yellow Polka-Dot Zucchini Burger"))
        burgerRepository.save(burgers)
    }

    @PreDestroy
    fun destory(){
        //Clean up the DB when done
        burgerRepository.deleteAll()
    }

    //Any user can add a new BurgerOfTheDay
    @RolesAllowed(value = *arrayOf("USER", "ADMIN"))
    fun saveBurger(burgerOfTheDay: BurgerOfTheDay) = burgerRepository.save(burgerOfTheDay)

    //But only adminstrators get to delete burgers
    @RolesAllowed(value = "ADMIN")
    fun deleteBurger(id : Long) = burgerRepository.delete(id)

    //Any user gets to see our Burgers
    @RolesAllowed(value = *arrayOf("USER", "ADMIN"))
    fun allBurgers() = burgerRepository.findAll()
}

The @RolesAllows annotation takes an array of allowed roles. In our case, we are letting anyone with the USER role to add burgers, but only ADMIN users are allowed to delete burgers. If a user without the ADMIN role attempts to invoke deleteBurger, an AccessDeniedException is thrown.

Catching Security Violations

Kotlin has no concept of checked exceptions, but Java users should note that Spring’s security exceptions are all RuntimeExceptions. If we want to report a security violation back to the user, we need to catch our security exceptions. Here is an example Controller class that handles security violations.

@Controller
class IndexController(
        @Autowired val logger : Logger,
        @Autowired val burgerService: BurgerService) {

    @GetMapping("/")
    fun doGet(model : Model) : String {
        model.addAttribute("burgers", burgerService.allBurgers().toList())
        return "index"
    }

    @PostMapping("/add")
    fun saveBurger(
            @RequestParam("burgerName") burgerName : String,
            model : Model) : String {
        try {
            burgerService.saveBurger(BurgerOfTheDay(name=burgerName))
            model.addAttribute("burgers", burgerService.allBurgers().toList())
            model.addAttribute("info", "Burger has been added")
        } catch (e : Exception){
            when (e){
                is AccessDeniedException -> {
                    logger.info("Security Exception")
                }
                else -> logger.error(e.toString(), e)
            }
        } finally {
            return "index"
        }
    }

    @PostMapping("/delete")
    fun deleteBurgers(
            @RequestParam("ids") ids : LongArray,
                      model: Model) : String {

        var errorThrown = false

        ids.forEach {
            try {
                burgerService.deleteBurger(it)

                //If the user doesn't have permission to invoke a method,
                //we will get AccessDeniedException which we handle and notify the user of the error
            } catch (e : Exception){
                when (e) {
                    is AccessDeniedException -> {
                        model.addAttribute("error", "Only Bob gets to delete burgers!")
                        logger.info("Security error")
                    }
                    else -> logger.error(e.toString(), e)
                }
                errorThrown = true
            }
        }
        model.addAttribute("burgers", burgerService.allBurgers().toList())
        if(!errorThrown){
            model.addAttribute("info", "Deleted burgers")
        }
        return "index"
    }
}

You’ll ntoice that the deleteBurgers method looks for AccessDeniedException (which is handled by Koltin’s powerful when block). In our case, we report an error that only Bob get’s to delete burgers.

Putting it all together

Here is a video of a sample web application that demonstrates this code in action.

The code for the example application is available at my GitHub page.

You can also learn more about Spring MVC by referring to the following posts.

Spring Boot Caching with Kotlin

It’s fairly common for applications to continually ask a datastore for the same information repeatedly. Requests to datastores consume application resources and thus have a performance cost even when the requested data is small. The Spring Platform provides a solution allows applications to store information in an in memory caching system that allows applications to check the cache for the required data prior to making a call to the database. This example shows how to use Spring Boot and Kotlin to cache files that we are storing in the database.

Database Entity

We are going to define a database entity that stores files in a database. Since retrieving such data can be an expesive call to the database, we are going to cache this entity.

@Entity
data class PersistedFile(
        @field: Id @field: GeneratedValue var id : Long = 0,
        var fileName : String = "",
        var mime : String = "",
        @field : Lob var bytes : ByteArray? = null)

You will notice that this class has a ByteArray field that is stored as a LOB in the database. In theory, this could be as many bytes as the system allows so ideally we would store this in cache. Other good candidates are entity classes that have complex object graphs and may result in the ORM generated complex SQL to retreive the managed object.

Enable Caching

Spring Boot defines a CachingManager internally for the application. You are free to use your own, but you need to configure your Spring Boot environment first.

Dependencies

You need to have spring-boot-starter-cache in your pom.xml or other dependency manager.


    org.springframework.boot
    spring-boot-starter-web

Annotation

You also need to tell the environment to turn on caching by using the @EnableCaching

@SpringBootApplication
@EnableJpaRepositories
@EnableCaching  //Spring Boot provides a CacheManager our of the box
                //but it only turns on when this annotation is present
class CachingTutorialApplication

Decorate the Caching Methods

At this point, we only need to decorate the methods we want the environment cache. This is done by decorating our methods with the @Cacheable annotation and then providing the annotation with the name of a cache. We can also optionally tell the cache manager what to use for the key. Here is the code for our service class followed by an explanation.

//We are going to use this class to handle caching of our PersistedFile object
//Normally, we would encapsulate our repository, but we are leaving it public to keep the code down
@Service
class PersistedFileService(@Autowired val persistedFileRepository: PersistedFileRepository){

    //This annotation will cause the cache to store a persistedFile in memory
    //so that the program doesn't have to hit the DB each time for the file.
    //This will result in faster page load times. Since we know that managed objects
    //have unique primary keys, we can just use the primary key for the cache key
    @Cacheable(cacheNames = arrayOf("persistedFile"), key="#id")
    fun findOne(id : Long) : PersistedFile = persistedFileRepository.findOne(id)

    //This annotation will cause the cache to store persistedFile ids
    //By storing the ids, we don't need to hit the DB to know if a file exists first
    @Cacheable(cacheNames = arrayOf("persistedIds"))
    fun exists(id: Long?): Boolean = persistedFileRepository.exists(id)
}

The first method, findOne, is used to look up a persistedFile object from the database. You will notice that we pass persistedFile as an argument to cacheNames and then use the primary key as the key for this item’s cache. We can use the PK because we know it’s a unique value so we can help make the cache more performant. However, keep in mind that the key is optional.

We can also avoid another call to the database by storing if items exist in the database in the cache. The first time exists() is called, the application will fire a count sql statement to the database. On subsequent calls, the cache will simply return true or false depending on what is stored in the cache.

Putting it all together

I put together a small web application that demonstates the caching working together. I turned on the show sql property in the applications.properties file so that viewers can see when the application is making calls to the database. You will notice that the first time I retreive the persisted file, there is sql generated. However, on the second call to the same object, no sql is generated because the application isn’t making a call to the database.

You can get the complete code from my GitHub page at this link.

Here are some links to posts that are related to concepts used in Spring Boot that we used today.

Spring Boot Kotlin & MongoDB

MongoDB is a NoSQL database that works really well with Kotlin and Spring Boot. MongoDB is incredibly useful in situations where the structure of data isn’t known prior to writing the application. For example, picture a blogging website where users can enter any number of comments or response. Modeling such a data structure would be difficult in a relational database, but it’s much easier with Mongo.

In this example application, we are going to use MongoDB to document Restaurants with any number of employees (of course, a simple example such as this can be done in a relational database, but let’s go with this for simplicity sake). The cool part using Mongo with Spring Boot is that there is zero configuration providing you are using default settings. This let’s us jump right into our code.

Let’s begin by creating a couple of data classes to store in our database.

//Create a document class
//that persists to the DB
@Document
data class Restaurant(
        //Mark this field as the document id
        @field: Id var name : String = "",
        //Unstructured Data Here
        var employees : List = mutableListOf())

//This class embeds directly into Restaurant
//without any annotations
data class Employee(var name : String = "",
                    var position : String = "")

Our Restaurant class is annotated with @Document to mark it as a persistable class. We also annotate the name field with the Id annotation to mark it as the document id. This value has to be unique in the database. The other class is Employee which does not have any annotations at all. It’s used as a property in the Employees database and the persistence provide is able store all of employee objects embedded in Restaurant.

Our next class is a repository class which Spring will generate the implementation for us. Before this can happen, we have to enable mongo repositories. All we need to do is annotate a configuration class to make this happen.

@Configuration
@EnableMongoRepositories //Allow Spring to Generate Mongo Repositories
class Config

Once we have enabled the mongo repositories, we just need to define an interface that extends MongoRespository.

//Spring will implement our interface for us!
interface RestaurantRepository : MongoRepository

Now let’s make a controller class to test our application. See this post for an explanation of Spring MVC.

//Example Controller class for demonstration purposes
@Controller
@RequestMapping("/")
class IndexController(
        //We can inject our RestaurantRepository class, Spring will
        //provide an implementation
        @Autowired private val restaurantRepository: RestaurantRepository){

    @RequestMapping(method = arrayOf(RequestMethod.GET))
    fun doGet(model : Model) : String {
        model.apply {
            addAttribute("restaurant", Restaurant())
            //Query all Restaurants
            addAttribute("allRestaurants", restaurantRepository.findAll())
        }
        return "index"
    }

    @RequestMapping(method = arrayOf(RequestMethod.POST))
    fun doPost(@RequestParam("name") name : String,
               @RequestParam("employees") employees : String,
               model : Model) : String {
        val restaurant = Restaurant(name = name,
                                    employees = parseEmployees(employees))
        //Save the new restaurant
        restaurantRepository.save(restaurant)
        model.apply {
            addAttribute("restaurant", Restaurant())
            //Query all Restaurants
            addAttribute("allRestaurants", restaurantRepository.findAll())
        }
        return "index"
    }

    fun parseEmployees(employees : String) : List {
        val employeeList = mutableListOf()
        val parts = employees.split('\n')

        parts.forEach {
            val subParts = it.split(",")
            employeeList.add(
                    Employee(name = subParts[0],
                            position = subParts[1]))
        }
        return employeeList.toList()
    }
}

Notice that we can directly inject RestaurantRepository into our controller. Spring does the work of providing an implementation for our controller class. In our doPost() method, we call restaurantRepository.save() to save our new document. In both doGet() and doPost(), we call restaurantRepository.findAll() to pull back all of our restaurants stored in the database.

Now we just need an HTML template to provide us with front end code.
indexcode

Conclusion

Here is an example of the application when run.


As you can see, Spring Boot combined with Kotlin makes it really easy to persist data into MongoDB. We only need to define a few data classes and allow Spring to make our Repository classes for us in order to get started.

You can view the code for this project at my GitHub page at this link.