Sunday, September 16, 2012

Variations on producer consumer in C++

Image made by Stefaan Himpe in libreoffice draw, available under CC-BY-SA license.

Problem

Paraphrased from wikipedia:
The producer-consumer problem is a classic example of a multi-process synchronization problem. It describes two processes, the producer and the consumer, that share a common buffer used as a queue. The producer's job is to generate a piece of data, put it into the buffer, and start again. At the same time, the consumer removes data from the buffer, one piece at a time, and does something with it.
As an example, consider a print server. The print server receives requests to print long documents. We don't want it to refuse new requests while the printer is still printing, so receiving requests and printing run in separate threads. After receiving a request, the print server puts it on a print queue and immediately returns to listening for new requests. The print server is a producer. An application monitors the print queue and prints the requested documents, one by one. The application is a consumer.

Challenges

  • I still have too little practical experience in multithreaded programming. This article is written as part of my investigation into what is possible. If you spot mistakes or inaccuracies, please comment!
  • How do we push and pop work items on/from a queue in a multi-threaded environment?
  • If the producer is faster than the consumer, how do we prevent the queue from exploding in size, eventually filling up all available memory?
  • How can a consumer wait for data to appear on the queue?
  • How can we wait for a queue to become empty? How do we wait for a consumer to finish processing everything in its queue? What's the difference?

Preliminaries

Synchronization

In a multi-threaded environment we must use synchronization mechanisms to prevent situations such as the producer adding something to the queue at the very moment the consumer removes something from it. Without synchronization, such simultaneous access can lead to crashes or data corruption.

Technology choices

To implement multi-threaded things in C++ in a portable way I have chosen the following technologies:
  • Boost: a C++ library that turns "C++" into "C++, batteries included"
  • CMake: a cross-platform build system generator for C++ (and other languages)

CMakeLists.txt

Here's my CMakeLists.txt file (New BSD License). This file tells CMake how to find the Boost libraries and how to build the project.
cmake_minimum_required(VERSION 2.8)
project(ProducerConsumer)
set(Boost_USE_MULTITHREADED ON)
find_package(Boost REQUIRED COMPONENTS system thread)
include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS})
aux_source_directory(. SRC_LIST)
add_executable(${PROJECT_NAME} ${SRC_LIST})
target_link_libraries(${PROJECT_NAME} ${Boost_LIBRARIES})
To use this CMakeLists.txt file, you'd create a directory (say at: /home/user/myproject/src or c:\development\myproject\src) containing the following files:
  • CMakeLists.txt (shown above)
  • main.cpp (i.e. the rest of the code appearing later in this blog entry)
Then, to build the software, you'd create a build directory outside the src directory (say at: /home/user/myproject/build or c:\development\myproject\build) and, from inside that directory, run the following on the command line.
If you have GNU make installed:
cmake ../src
make
If you don't have GNU make installed on your system, run cmake --help to see which generators it offers for producing project files for Eclipse, KDevelop, Visual Studio, Ninja, Code::Blocks, ...

Creating a producer and consumer for testing

Here's a simple producer and consumer, both derived from a WorkerThread class (each needs to run independently in its own thread). WorkerThread implements everything that producers and consumers have in common, but leaves the actual work to derived classes by means of the pure virtual method Work().
In an attempt to reduce code duplication, I've made WorkerThread a template parameterized on a QueueType, which is itself a template parameterized on QueueContents (e.g. int or some class PrintingParameters). In the rest of this blog entry, different QueueTypes will be presented.
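The original listing is not reproduced here, so the following is only a minimal sketch of what such a WorkerThread could look like. It assumes C++11 std::thread instead of Boost so that it compiles without external libraries; boost::thread offers a very similar interface. PlainQueue, IntProducer, Start(), Join() and RunWorkerDemo() are hypothetical names introduced for illustration; only WorkerThread, QueueType, QueueContents and Work() come from the text above.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <thread>

// Hypothetical placeholder queue. It is NOT thread-safe; the demo below only
// reads it after the worker thread has been joined.
template <typename QueueContents>
class PlainQueue {
public:
    void Push(const QueueContents& value) { m_items.push_back(value); }
    std::size_t Size() const { return m_items.size(); }

private:
    std::deque<QueueContents> m_items;
};

// Common base for producers and consumers: owns the thread and a reference to
// the shared queue, and leaves the work itself to derived classes.
template <template <typename> class QueueType, typename QueueContents>
class WorkerThread {
public:
    explicit WorkerThread(QueueType<QueueContents>& queue) : m_queue(queue) {}
    virtual ~WorkerThread() { Join(); }

    void Start() {
        assert(!m_thread.joinable());  // no earlier run may still be pending
        m_thread = std::thread([this] { Work(); });
    }

    // Call Join() before the derived object is destroyed: by the time the
    // base destructor runs, the derived part (and its Work()) is already gone.
    void Join() {
        if (m_thread.joinable()) m_thread.join();
    }

protected:
    virtual void Work() = 0;  // pure virtual: implemented by derived classes
    QueueType<QueueContents>& m_queue;

private:
    std::thread m_thread;
};

// Example derived class: pushes the values 0..9 onto the queue.
class IntProducer : public WorkerThread<PlainQueue, int> {
public:
    explicit IntProducer(PlainQueue<int>& queue)
        : WorkerThread<PlainQueue, int>(queue) {}

protected:
    void Work() override {
        for (int i = 0; i < 10; ++i) m_queue.Push(i);
    }
};

// Runs the producer in its own thread and reports how many items it queued.
std::size_t RunWorkerDemo() {
    PlainQueue<int> queue;
    IntProducer producer(queue);
    producer.Start();
    producer.Join();
    return queue.Size();
}
```

Calling RunWorkerDemo() from main() starts one producer thread and waits for it. A consumer would be written the same way, with Work() popping from the queue instead of pushing.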

Attempt 1: ThreadSafeQueue

Code

This is based on the code published here.

Test program

Output

About to construct queue
About to construct producer
About to construct consumer
About to start producer
About to start consumer
Waiting for producer to finish
[PRODUCER] Produced value 0
[CONSUMER] Consumed value 0
[PRODUCER] Produced value 1
[PRODUCER] Produced value 2
[PRODUCER] Produced value 3
[PRODUCER] Produced value 4
[PRODUCER] Produced value 5
[PRODUCER] Produced value 6
[PRODUCER] Produced value 7
[PRODUCER] Produced value 8
[PRODUCER] Produced value 9
Waiting for consumer to finish
[CONSUMER] Consumption of value 0 completely handled.
[CONSUMER] Consumed value 1
[CONSUMER] Consumption of value 1 completely handled.
[CONSUMER] Consumed value 2
[CONSUMER] Consumption of value 2 completely handled.
[CONSUMER] Consumed value 3
[CONSUMER] Consumption of value 3 completely handled.
[CONSUMER] Consumed value 4
[CONSUMER] Consumption of value 4 completely handled.
[CONSUMER] Consumed value 5
[CONSUMER] Consumption of value 5 completely handled.
[CONSUMER] Consumed value 6
[CONSUMER] Consumption of value 6 completely handled.
[CONSUMER] Consumed value 7
[CONSUMER] Consumption of value 7 completely handled.
[CONSUMER] Consumed value 8
[CONSUMER] Consumption of value 8 completely handled.
[CONSUMER] Consumed value 9
[CONSUMER] Consumption of value 9 completely handled.
Queue should be empty after all threads finished: 1

Discussion

  • Each method accessing the queue first waits until it can acquire a mutex (obviously, each method has to acquire the same mutex). This ensures that no other thread can modify the queue while the method is operating on it. Without the mutex, concurrent access could lead to data corruption or crashes.
  • At some point (e.g. in method TryPop) we check the condition "m_queue.size()==0". If we replaced this condition with a call to "bool Empty()", the code would deadlock. The reason is that "Empty()" tries to acquire the same mutex that "TryPop(...)" has already acquired, and a plain (non-recursive) mutex cannot be locked a second time by the thread that already holds it.
  • To wait for data to appear on the queue, we wait for a condition variable. This is an efficient way to wait for something to happen: the waiting thread sleeps until another thread notifies it. In some code bases one finds constructs like "while (!DataAvailable) sleep(100);". Such an approach either wastes CPU time or adds latency, and unless access to DataAvailable is itself synchronized, it is not thread-safe.
  • When the producer is faster than the consumer (which I simulated in the test program by giving the consumer a 1 ms delay and the producer no delay), it keeps pushing values onto the queue, and the queue size keeps increasing. This is clearly visible in the output of the program: all ten values are produced before the second one is consumed. With an endless producer, the available memory on the system would eventually be exhausted, causing a hang or crash. This is a serious problem that needs an improved design: a BoundedThreadSafeQueue. A BoundedThreadSafeQueue is parameterized with a maximum size. Pushing data onto a full queue blocks until the queue has room for more entries.
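The points above can be illustrated with a minimal sketch of an unbounded thread-safe queue. This is not the original listing: it assumes C++11 std::mutex and std::condition_variable (boost::mutex and boost::condition_variable play the same roles), and the names Push, WaitAndPop, m_dataAvailable and SumThroughQueue are assumptions made for illustration. TryPop, Empty and m_queue follow the names mentioned in the discussion.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

template <typename T>
class ThreadSafeQueue {
public:
    void Push(const T& value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(value);
        }
        // Notify after releasing the lock so the woken thread can proceed.
        m_dataAvailable.notify_one();
    }

    // Non-blocking: returns false if there is nothing to pop.
    bool TryPop(T& result) {
        std::lock_guard<std::mutex> lock(m_mutex);
        // Inspect m_queue directly. Calling Empty() here would try to lock
        // m_mutex again, which a non-recursive mutex does not allow.
        if (m_queue.size() == 0) return false;
        result = m_queue.front();
        m_queue.pop();
        return true;
    }

    // Blocking: sleeps on the condition variable until data is available.
    void WaitAndPop(T& result) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_dataAvailable.wait(lock, [this] { return !m_queue.empty(); });
        result = m_queue.front();
        m_queue.pop();
    }

    bool Empty() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.empty();
    }

private:
    mutable std::mutex m_mutex;
    std::condition_variable m_dataAvailable;
    std::queue<T> m_queue;
};

// Pushes 0..count-1 from the calling thread while a consumer thread pops and
// sums them; returns the sum the consumer computed.
int SumThroughQueue(int count) {
    ThreadSafeQueue<int> queue;
    int sum = 0;
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) {
            int value = 0;
            queue.WaitAndPop(value);
            sum += value;
        }
    });
    for (int i = 0; i < count; ++i) queue.Push(i);
    consumer.join();
    assert(queue.Empty());  // every pushed item was popped
    return sum;
}
```

The predicate passed to wait() is re-checked after every wake-up, which also guards against spurious wake-ups. Nothing in this sketch limits the queue size, so it shares the memory-growth problem described in the last bullet.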

Attempt 2: BoundedThreadSafeQueue

Code

Test program

Output

About to construct queue
About to construct producer
About to construct consumer
About to start producer
About to start consumer
Waiting for producer to finish
Produced value 0
Produced value 1
Produced value 2
Consumed value 0
Produced value 3
Consumed value 1
Produced value 4
Consumed value 2
Produced value 5
Consumed value 3
Produced value 6
Consumed value 4
Produced value 7
Consumed value 5
Produced value 8
Consumed value 6
Produced value 9
Waiting for consumer to finish
Consumed value 7
Consumed value 8
Consumed value 9
Queue should be empty after all threads finished: 1

Discussion

  • The test program instantiated a bounded queue with a maximum size of 3, with a consumer that is slower than the producer. The output shows that once the queue is full, each push has to wait until the consumer has removed an item.
  • But we have a different problem now. In real life, consumption of an item will take some time (printing a large document can take several minutes). The queue can now signal that it is empty, but an empty queue does not mean that all processing is finished. (The items are removed from the queue before their processing starts.) So how can we extend our design so that we can wait until both the queue and the associated processing are completely finished?
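As a rough illustration of the bounded design, here is a minimal sketch that uses two condition variables: one for "not empty" and one for "not full". It is not the original listing and assumes the C++11 standard library rather than Boost. Push, WaitAndPop, HighWaterMark and MaxObservedSize are hypothetical names; the high-water mark exists only so the demo can check that the bound was respected.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

template <typename T>
class BoundedThreadSafeQueue {
public:
    explicit BoundedThreadSafeQueue(std::size_t maxSize) : m_maxSize(maxSize) {
        assert(maxSize > 0);  // a zero-sized queue could never accept a push
    }

    // Blocks while the queue is full.
    void Push(const T& value) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notFull.wait(lock, [this] { return m_queue.size() < m_maxSize; });
        m_queue.push(value);
        if (m_queue.size() > m_highWaterMark) m_highWaterMark = m_queue.size();
        lock.unlock();
        m_notEmpty.notify_one();
    }

    // Blocks while the queue is empty.
    void WaitAndPop(T& result) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notEmpty.wait(lock, [this] { return !m_queue.empty(); });
        result = m_queue.front();
        m_queue.pop();
        lock.unlock();
        m_notFull.notify_one();  // a slot has been freed for a waiting producer
    }

    // Largest size the queue ever reached (for the demo only).
    std::size_t HighWaterMark() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_highWaterMark;
    }

private:
    const std::size_t m_maxSize;
    mutable std::mutex m_mutex;
    std::condition_variable m_notEmpty;
    std::condition_variable m_notFull;
    std::queue<T> m_queue;
    std::size_t m_highWaterMark = 0;
};

// Fast producer, slow consumer (1 ms per item); returns the peak queue size.
std::size_t MaxObservedSize(std::size_t maxSize, int itemCount) {
    BoundedThreadSafeQueue<int> queue(maxSize);
    std::thread consumer([&] {
        for (int i = 0; i < itemCount; ++i) {
            int value = 0;
            queue.WaitAndPop(value);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    });
    for (int i = 0; i < itemCount; ++i) queue.Push(i);
    consumer.join();
    return queue.HighWaterMark();
}
```

However fast the producer runs, the value returned by MaxObservedSize(3, 10) cannot exceed 3, because Push only inserts while holding the lock and after seeing a free slot.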

Attempt 3: BoundedThreadSafeQueueSignalWorkloadDone

The basic idea is for the consumer of the queue data to inform the queue when it has finished processing an item. Ideally we'd have a kind of condition variable that we can set and reset manually to signal whether the queue plus its associated processing is completely finished (idle) or not. Someone who wants to wait for the queue and the associated processing to finish can then wait until that flag is set. Boost doesn't offer this out of the box, but based on the code published here, it is possible to create such a manually resettable condition variable ourselves.
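A manually resettable flag of this kind can be built from a mutex, a condition variable and a bool. The sketch below is not the code the paragraph refers to; it is a minimal version under the assumption that C++11 std::mutex and std::condition_variable are used in place of their Boost counterparts. The names ManualResetEvent, Set, Reset, Wait, IsSet and WaitForWorker are all hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

class ManualResetEvent {
public:
    explicit ManualResetEvent(bool initiallySet = false) : m_set(initiallySet) {}

    // Raises the flag and wakes every waiter. The flag stays raised until
    // Reset() is called, so later calls to Wait() return immediately.
    void Set() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_set = true;
        }
        m_changed.notify_all();
    }

    // Lowers the flag; subsequent calls to Wait() block again.
    void Reset() {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_set = false;
    }

    // Blocks until the flag is raised.
    void Wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_changed.wait(lock, [this] { return m_set; });
    }

    bool IsSet() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_set;
    }

private:
    mutable std::mutex m_mutex;
    std::condition_variable m_changed;
    bool m_set;
};

// The calling thread waits on the event; a worker thread finishes its work
// and then sets the event. Returns whether the work was visible after Wait().
bool WaitForWorker() {
    ManualResetEvent done;
    bool workCompleted = false;
    std::thread worker([&] {
        workCompleted = true;  // the "work"
        done.Set();
    });
    done.Wait();
    assert(done.IsSet());  // nobody called Reset(), so the flag is still up
    bool observed = workCompleted;
    worker.join();
    return observed;
}
```

Unlike a bare condition variable, the event remembers that it was set, so a thread that starts waiting after Set() has been called does not miss the notification.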

The consumer class needs a small update: we have to call ProcessingFinished() on the queue after finishing processing the information.
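To make the interplay concrete, here is a minimal sketch of a queue that tracks an idle flag. It is not the original implementation: it assumes a single consumer, leaves out the size bound for brevity, and uses the C++11 standard library instead of Boost. ProcessingFinished and WaitUntilCompletelyIdle are the method names used in this post; IdleTrackingQueue, Push, WaitAndPop and ProcessAllThenCount are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

template <typename T>
class IdleTrackingQueue {
public:
    void Push(const T& value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(value);
            m_idle = false;  // from now on there is pending work
        }
        m_notEmpty.notify_one();
    }

    void WaitAndPop(T& result) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notEmpty.wait(lock, [this] { return !m_queue.empty(); });
        result = m_queue.front();
        m_queue.pop();
        // m_idle stays false: the popped item is still being processed.
    }

    // Called by the (single) consumer after it has fully handled an item.
    void ProcessingFinished() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (!m_queue.empty()) return;  // more work queued: not idle yet
            m_idle = true;
        }
        m_idleChanged.notify_all();
    }

    // Blocks until the queue is empty AND the last item has been handled.
    void WaitUntilCompletelyIdle() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_idleChanged.wait(lock, [this] { return m_idle; });
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_notEmpty;
    std::condition_variable m_idleChanged;
    std::queue<T> m_queue;
    bool m_idle = true;  // nothing queued, nothing in progress
};

// Pushes `count` items, waits until idle, and returns how many items the
// consumer had completely handled at that moment.
int ProcessAllThenCount(int count) {
    assert(count >= 0);
    IdleTrackingQueue<int> queue;
    int handled = 0;
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) {
            int value = 0;
            queue.WaitAndPop(value);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            ++handled;  // processing of this item is complete
            queue.ProcessingFinished();
        }
    });
    for (int i = 0; i < count; ++i) queue.Push(i);
    queue.WaitUntilCompletelyIdle();
    int observed = handled;
    consumer.join();
    return observed;
}
```

With several consumers a single flag is not enough, because one consumer may report an empty queue while another is still busy; counting the items in progress would be one way to handle that case.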

Output

[MAIN] About to construct queue
[MAIN] About to construct producer
[MAIN] About to construct consumer
[MAIN] About to start producer
[MAIN] About to start consumer
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[PRODUCER] Produced value 0
[CONSUMER] Consumed value 0
[PRODUCER] Produced value 1
[PRODUCER] Produced value 2
[PRODUCER] Produced value 3
[PRODUCER] Produced value 4
[QUEUE] Wait until idle
[QUEUE] ProcessingFinished but queue not empty. ProcessingDone flag remains low.
[CONSUMER] Consumption of value 0 completely handled.
[CONSUMER] Consumed value 1
[PRODUCER] Produced value 5
[QUEUE] ProcessingFinished but queue not empty. ProcessingDone flag remains low.
[CONSUMER] Consumption of value 1 completely handled.
[CONSUMER] Consumed value 2
[PRODUCER] Produced value 6
[QUEUE] ProcessingFinished but queue not empty. ProcessingDone flag remains low.
[CONSUMER] Consumption of value 2 completely handled.
[CONSUMER] Consumed value 3
[PRODUCER] Produced value 7
[QUEUE] ProcessingFinished but queue not empty. ProcessingDone flag remains low.
[CONSUMER] Consumption of value 3 completely handled.
[CONSUMER] Consumed value 4
[PRODUCER] Produced value 8
[QUEUE] ProcessingFinished but queue not empty. ProcessingDone flag remains low.
[CONSUMER] Consumption of value 4 completely handled.
[CONSUMER] Consumed value 5
[PRODUCER] Produced value 9
[QUEUE] ProcessingFinished but queue not empty. ProcessingDone flag remains low.
[CONSUMER] Consumption of value 5 completely handled.
[CONSUMER] Consumed value 6
[QUEUE] ProcessingFinished but queue not empty
[CONSUMER] Consumption of value 6 completely handled.
[CONSUMER] Consumed value 7
[QUEUE] ProcessingFinished but queue not empty
[CONSUMER] Consumption of value 7 completely handled.
[CONSUMER] Consumed value 8
[QUEUE] ProcessingFinished but queue not empty
[CONSUMER] Consumption of value 8 completely handled.
[CONSUMER] Consumed value 9
[CONSUMER] Consumption of value 9 completely handled.
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[QUEUE] Wait until idle
[QUEUE] Consumer is completely idle
[MAIN] Queue should be empty after all threads finished: 1
[MAIN] Waiting for producer to finish
[MAIN] Waiting for consumer to finish

Discussion

  • Writing a meaningful test program is less obvious here.
  • From the output: when the program starts, it starts the producer and the consumer, each in its own thread, and then the main thread repeatedly waits until the queue and its associated processing are completely finished.
    • Just after constructing the producer and consumer, the queue is still completely idle: no work has been pushed onto it yet. The first wait therefore returns at once.
    • Then the producer starts producing values, and the next call to WaitUntilCompletelyIdle blocks until the processing is completely finished. After that, WaitUntilCompletelyIdle immediately returns true (until a producer inserts new work into the queue).