Introduction
The Producer-Consumer problem is a classic multi-process synchronization problem in which producers and consumers share a fixed-size buffer, also referred to as a queue.
Assume that one or more producers generate a series of tasks and one or more consumers take the generated tasks for processing. The buffer (queue) is the shared place where the producer adds tasks and from which the consumer removes and processes them.
Choosing the right queue collection is therefore very important.
Typical Problems:
- Thread-safe data access for Producer and Consumer
- Producer keeps adding elements to the Queue regardless of Queue capacity
- Producer is not aware of the status of the items waiting in the Queue
- Consumer runs an infinite loop to check whether items are available in the Queue to process
Background
The job of the Producer is to generate data and put it into the queue; at the same time, the Consumer attempts to remove data from the queue and process it further.
Possible Solution:
- Use a thread-safe queue collection (see my article http://www.codeproject.com/Articles/1112510/TPL-Producer-Consumer-Pattern-Thread-Safe-Queue-Co)
- Producer should discard the data or go into sleep mode when the queue is full
- Consumer should go into sleep mode when the buffer is empty and resume its operation when elements are available in the queue for processing
- Consumer should be notified to end processing when the producer has finished producing elements and no more elements will be added to the queue
- (Optional) Consumer should notify the producer whenever an item is removed from the queue so that the producer can start adding elements again (in case the queue was full)
Yes, there are multiple ways to implement the above solution using notifications, locking, and other legacy classes supported by .NET. Can we go for BlockingCollection instead? It is a simpler, proven implementation that was introduced in .NET Framework 4.0.
Use Thread Safe Queue Collection
BlockingCollection<T> is a thread-safe collection class that provides the following features: An implementation of the Producer-Consumer pattern. Concurrent adding and taking of elements from multiple threads. Optional maximum capacity. Insertion and removal operations that block when the collection is empty or full.
Reference: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.110).aspx
Since this is a thread-safe collection, our first problem is solved: we don't need to worry about synchronizing access to the shared queue, which is common to the producer and the consumer, whether there is one consumer or several.
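To make the thread-safety claim concrete, here is a minimal sketch in which several producer tasks add to the same BlockingCollection<int> without any explicit lock. The class name ThreadSafeAddDemo and the method AddConcurrently are hypothetical names used only for this illustration; they are not part of the attached project.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the attached project.
public static class ThreadSafeAddDemo
{
    // Starts producerCount tasks that each add itemsPerProducer items
    // to one shared collection, with no explicit locking, and returns
    // the number of items the collection holds afterwards.
    public static int AddConcurrently(int producerCount, int itemsPerProducer)
    {
        using (var queue = new BlockingCollection<int>())
        {
            var producers = new Task[producerCount];
            for (int p = 0; p < producerCount; p++)
            {
                producers[p] = Task.Run(() =>
                {
                    for (int i = 0; i < itemsPerProducer; i++)
                    {
                        queue.Add(i); // safe to call from many threads at once
                    }
                });
            }

            Task.WaitAll(producers);
            return queue.Count;
        }
    }

    public static void Main()
    {
        // 4 producers x 1000 items each: no item is lost.
        Console.WriteLine(AddConcurrently(4, 1000)); // prints 4000
    }
}
```

Doing the same with a plain Queue<T> and no lock could lose items or throw, which is exactly the problem the thread-safe collection removes.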
Blocking and Bounding
Thinking about BlockingCollection instead of ConcurrentQueue? Yes, the key difference between the two is that BlockingCollection supports bounding and blocking, whereas ConcurrentQueue does not support either as a built-in feature.
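The difference can be sketched as follows. ConcurrentQueue<T>.TryDequeue returns false immediately when the queue is empty, while BlockingCollection<T>.Take waits until an item arrives. The class BlockingVsNonBlockingDemo, its method names, and the 200 ms delay are assumptions chosen for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the attached project.
public static class BlockingVsNonBlockingDemo
{
    // ConcurrentQueue never waits: on an empty queue TryDequeue
    // simply reports failure.
    public static bool TryDequeueFromEmptyConcurrentQueue()
    {
        var queue = new ConcurrentQueue<int>();
        int item;
        return queue.TryDequeue(out item);
    }

    // BlockingCollection.Take blocks the calling thread until a
    // producer adds an item (here after roughly 200 ms).
    public static int TakeWaitsForProducer()
    {
        using (var collection = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            Task producer = Task.Run(() =>
            {
                Thread.Sleep(200);
                collection.Add(42);
            });

            int item = collection.Take(); // waits here until 42 is added
            producer.Wait();
            return item;
        }
    }

    public static void Main()
    {
        Console.WriteLine(TryDequeueFromEmptyConcurrentQueue()); // prints False
        Console.WriteLine(TakeWaitsForProducer());               // prints 42
    }
}
```

With ConcurrentQueue alone, the consumer would have to poll in a loop; with BlockingCollection the waiting is built in.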
Producer should discard the data or enter into sleep mode when queue size is full
Bounding means setting the maximum capacity, i.e. the maximum number of elements that the collection can hold.
If this value is set to 2786 and the queue already holds 2786 items, the next request to add an item will be blocked until one or more elements are removed from the queue. Once there is free space available in the queue, the item is added.
Blocked here means that the producer thread which is adding data to the queue is put into a wait state, without any custom implementation.
This looks good because the producer now waits, but what about a timeout? It should discard the request after some time. For this we use the TryAdd overloads that accept a timeout: TryAdd waits up to the given duration for free space, during which the consumer can remove elements from the queue, and returns false if the item could not be added in time.
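A minimal sketch of bounding combined with a timed TryAdd is shown below. It uses a small capacity instead of 2786 so the effect is easy to see. The class BoundedTryAddDemo, its method names, and the timeout values are assumptions made for this illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the attached project.
public static class BoundedTryAddDemo
{
    // Tries to add capacity + 1 items to a collection bounded at
    // 'capacity'. Nobody consumes, so the last TryAdd times out.
    public static bool[] FillBeyondCapacity(int capacity, int timeoutMs)
    {
        using (var queue = new BlockingCollection<int>(capacity))
        {
            var results = new bool[capacity + 1];
            for (int i = 0; i <= capacity; i++)
            {
                results[i] = queue.TryAdd(i, timeoutMs);
            }
            return results;
        }
    }

    // The queue is full, but a consumer frees a slot while the
    // producer is still waiting inside TryAdd, so the add succeeds.
    public static bool AddSucceedsWhenConsumerFreesSlot()
    {
        using (var queue = new BlockingCollection<int>(1))
        {
            queue.Add(1); // queue is now full
            Task consumer = Task.Run(() =>
            {
                Thread.Sleep(100);
                queue.Take(); // frees the single slot
            });

            bool added = queue.TryAdd(2, 5000); // waits up to 5 seconds
            consumer.Wait();
            return added;
        }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", FillBeyondCapacity(2, 50))); // prints True,True,False
        Console.WriteLine(AddSucceedsWhenConsumerFreesSlot());          // prints True
    }
}
```

When TryAdd returns false the producer can decide what to do with the item: discard it, retry later, or report the failure.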
Consumer should go to sleep mode when buffer is empty and resume its operation when items are available in queue for processing
BlockingCollection also provides the GetConsumingEnumerable method. Enumerating with this method blocks the consumer when no elements are available in the queue, i.e. the consuming thread is put into a waiting (sleep) state until an item arrives.
Consumer should be notified to end processing when producer completes producing elements, and no more elements will be added to Queue
BlockingCollection also provides the CompleteAdding method. It tells the queue that the producer has finished adding items. Once the remaining items have been consumed, the GetConsumingEnumerable enumeration ends on its own, which means no more while loop checking the queue for elements and no “Attempt to read queue when no data is present.”
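The interplay between GetConsumingEnumerable and CompleteAdding can be sketched as follows. The consumer's foreach loop waits while the queue is empty and exits once CompleteAdding has been called and the queue has been drained. The class CompleteAddingDemo, the method ProduceAndConsume, and the capacity of 2 are assumptions for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the attached project.
public static class CompleteAddingDemo
{
    // One producer adds 1..itemCount; one consumer enumerates the
    // collection and records what it received.
    public static List<int> ProduceAndConsume(int itemCount)
    {
        var consumed = new List<int>();
        using (var queue = new BlockingCollection<int>(boundedCapacity: 2))
        {
            Task consumer = Task.Run(() =>
            {
                // Waits when the queue is empty; the loop ends after
                // CompleteAdding is called and the queue is drained.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    consumed.Add(item);
                }
            });

            for (int i = 1; i <= itemCount; i++)
            {
                queue.Add(i); // waits whenever 2 items are already queued
            }

            queue.CompleteAdding(); // signal: no more items will arrive
            consumer.Wait();        // consumer loop exits by itself
        }
        return consumed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", ProduceAndConsume(5))); // prints 1,2,3,4,5
    }
}
```

Without the CompleteAdding call the consumer would wait forever, because it has no other way of knowing that the producer is done.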
Some more about the important BlockingCollection methods used in the example
BlockingCollection<T>, Namespace: System.Collections.Concurrent, Assembly: System (in System.dll)
- BlockingCollection<T>.TryAdd Method - Tries to add the specified item to the BlockingCollection<T>. TryAdd(T) returns immediately; the TryAdd(T, Int32) and TryAdd(T, TimeSpan) overloads wait up to the given period.
- BlockingCollection<T>.GetConsumingEnumerable Method - Enumerating GetConsumingEnumerable blocks when no elements are available in the Queue.
- BlockingCollection<T>.CompleteAdding Method - Marks the Queue as not accepting any more additions; the consuming enumeration ends once the Queue is empty.
Using the code
Enough discussion; time to see the code.
Attached is TPL.BlockingCollection.zip with the implementation of the Producer-Consumer pattern using BlockingCollection. I have used Visual Studio 2015 with Microsoft .NET Framework 4.5.
File - TaskProducer.cs
- Responsible for generating the tasks. The producer populates a QueuedObject instance, associates it with the Action to perform after dequeue, and finally adds it to the Queue.
using System;
using System.Linq;
using System.Threading;

namespace TPL.BlockingCollection
{
    public class TaskProducer
    {
        private readonly TaskQueue _taskQueue;

        public TaskProducer(TaskQueue taskQueue) { _taskQueue = taskQueue; }

        // Creates five QueuedObject items and enqueues one task per item.
        public void ProduceTasks()
        {
            Random random = new Random();
            for (int i = 1; i <= 5; i++)
            {
                var queue = new QueuedObject
                {
                    QueueID = i,
                    ProducerThreadID = Thread.CurrentThread.ManagedThreadId,
                    EnqueueDateTime = DateTime.Now,
                    // Five random characters picked from A-Z and 0-9.
                    RandomString = new string(Enumerable.Repeat("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", 5).Select(s => s[random.Next(s.Length)]).ToArray())
                };

                // The action runs later, on the consumer thread that dequeues it.
                _taskQueue.EnqueueTask(() => { ReverseString(queue); });

                Console.WriteLine
                (
                    "Enqueued: " + queue.QueueID +
                    "\t" + "Producer ThreadID :" + queue.ProducerThreadID +
                    "\t" + queue.EnqueueDateTime.ToLongTimeString() +
                    "\t" + "RandomString :" + queue.RandomString
                );
            }
        }

        // Work performed by the consumer: reverse the string and print it.
        public static void ReverseString(QueuedObject queue)
        {
            string reversedString = new string(Enumerable.Range(1, queue.RandomString.Length).Select(i => queue.RandomString[queue.RandomString.Length - i]).ToArray());
            Console.WriteLine
            (
                "Dequeued: " + queue.QueueID +
                "\t" + "Consumer ThreadID :" + Thread.CurrentThread.ManagedThreadId +
                "\t" + DateTime.Now.ToLongTimeString() +
                "\t" + "ReversedString :" + reversedString
            );
        }
    }
}
TaskQueue.cs
- Works as the Queue: it wraps a BlockingCollection and performs the enqueue and dequeue of the tasks added by the Task Producer.
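using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class TaskQueue
{
    private readonly BlockingCollection<Task> _workTaskQueue;
    private readonly int _timeout; // passed to TryAdd, which interprets it as milliseconds

    public delegate void TaskEventsHandler(TaskProcessingArguments e);
    public event TaskEventsHandler TaskStatus;

    public TaskQueue(IProducerConsumerCollection<Task> workTaskCollection, int QueueSize, int timeout)
    {
        // QueueSize bounds the collection so producers block (or time out) when it is full.
        _workTaskQueue = new BlockingCollection<Task>(workTaskCollection, QueueSize);
        _timeout = timeout;
    }

    public void EnqueueTask(Action action, CancellationToken cancelToken = default(CancellationToken))
    {
        var task = new Task(action, cancelToken);
        // Wait up to _timeout milliseconds for free space in the queue.
        if (_workTaskQueue.TryAdd(task, _timeout))
        {
            TaskStatus?.Invoke
            (new TaskProcessingArguments
            {
                ISTaskAdded = true,
                Message = "Task Added to Queue",
                PendingTaskCount = _workTaskQueue.Count,
            });
        }
        else
        {
            TaskStatus?.Invoke
            (new TaskProcessingArguments
            {
                ISTaskAdded = false,
                Message = "Timedout while adding Task to Queue",
                PendingTaskCount = _workTaskQueue.Count,
            });
        }
    }

    public void DequeueTask()
    {
        // Blocks while the queue is empty; ends after Close() once the queue is drained.
        foreach (var task in _workTaskQueue.GetConsumingEnumerable())
        {
            try
            {
                if (!task.IsCanceled) task.RunSynchronously();
                // Exceptions thrown by the action are stored on the task, not rethrown here.
                if (task.IsFaulted) Console.Error.WriteLine(task.Exception);
            }
            catch (Exception ex)
            {
                // Do not swallow failures silently; report them and keep consuming.
                Console.Error.WriteLine("Task could not be run: " + ex.Message);
            }
        }
    }

    public void Close()
    {
        _workTaskQueue.CompleteAdding();
    }
}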
Finally, Program.cs starts both sides: multiple producers producing tasks and multiple consumers consuming them. Note the call to taskQueue.Close(): it is made once all producer threads have finished adding items to the Queue, so that the consumers know when to stop enumerating.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        TaskQueue taskQueue = new TaskQueue(new ConcurrentQueue<Task>(), 1000, 10);
        taskQueue.TaskStatus += WorkQueue_TaskStatus;

        TaskProducer producerOne = new TaskProducer(taskQueue);
        TaskProducer producerTwo = new TaskProducer(taskQueue);

        Task producerTaskOne = Task.Run(() => producerOne.ProduceTasks());
        Task producerTaskTwo = Task.Run(() => producerTwo.ProduceTasks());

        Task consumerTaskOne = Task.Run(() => taskQueue.DequeueTask());
        Task consumerTaskTwo = Task.Run(() => taskQueue.DequeueTask());

        // Wait for all producers, then signal that no more tasks will be added.
        Task.WaitAll(producerTaskOne, producerTaskTwo);
        taskQueue.Close();

        // Consumers finish once the queue has been drained.
        Task.WaitAll(consumerTaskOne, consumerTaskTwo);

        Console.WriteLine("Tasks Processed");
        Console.ReadLine();
    }

    private static void WorkQueue_TaskStatus(TaskProcessingArguments e)
    {
    }
}
I have attached the initial version of the complete working source code and am working on further optimization, as there is always scope for improvement.
To play with the code, change the start and end values of the for loop in TaskProducer.cs for a larger number of iterations. Bounding and blocking can be controlled through the related parameters passed to the TaskQueue constructor in Program.cs.
In my next article, on Task Parallel Library Data Flows, I will give some more interesting and complex examples of Producer-Consumer problems.