
TPL: Solving Producer and Consumer Problems using BlockingCollection

10 Aug 2016, CPOL, 5 min read
Task Parallel Library - Getting Started with Blocking Collection

Introduction

The producer-consumer problem is a classic multi-process synchronization problem in which producers and consumers share a common fixed-size buffer, also referred to as the queue.

Assume that one or more producers generate a series of tasks and one or more consumers take the generated tasks and process them. The buffer, or queue, is the shared place where producers add tasks and consumers remove and process them.

Choosing the right queuing collection is therefore very important.

Typical Problems:

  • Thread-safe data access for producers and consumers
  • The producer keeps adding elements to the queue regardless of the queue's capacity
  • The producer is not aware of the status of items being processed from the queue
  • The consumer runs an infinite loop checking whether items are available in the queue to process

Background

The producer's job is to generate data and put it into the queue; at the same time, the consumer tries to remove data from the queue and process it further.

Possible Solutions:

  • Use a thread-safe queue collection (see my article http://www.codeproject.com/Articles/1112510/TPL-Producer-Consumer-Pattern-Thread-Safe-Queue-Co)
  • The producer should discard the data or go into sleep mode when the queue is full
  • The consumer should go into sleep mode when the buffer is empty and resume when elements are available in the queue for processing
  • The consumer should be notified to stop processing when the producer has finished producing and no more elements will be added to the queue
  • (Optional) The consumer should notify the producer whenever an item is removed from the queue, so that the producer can start adding elements again if the queue was full

There are multiple ways to implement the above using notifications, locking and other legacy .NET synchronization classes. Why not use BlockingCollection<T> instead? It is a simpler, proven implementation available from .NET Framework 4.0 onwards.
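For comparison with the notification-and-locking approach, here is a minimal sketch of a hand-rolled bounded buffer built on lock with Monitor.Wait and Monitor.PulseAll. The names MonitorBoundedBuffer and MonitorDemo are hypothetical and are not part of the attached project; the sketch only shows how much waiting and signalling code you must write yourself without BlockingCollection<T>.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical hand-rolled bounded buffer (not part of the article's project).
public class MonitorBoundedBuffer<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();
    private readonly int _capacity;
    private bool _addingCompleted;

    public MonitorBoundedBuffer(int capacity) { _capacity = capacity; }

    public void Add(T item)
    {
        lock (_sync)
        {
            // Producer sleeps while the buffer is full.
            while (_items.Count >= _capacity && !_addingCompleted)
                Monitor.Wait(_sync);
            if (_addingCompleted)
                throw new InvalidOperationException("Adding has been completed.");
            _items.Enqueue(item);
            // Wake any consumer waiting for an item.
            Monitor.PulseAll(_sync);
        }
    }

    // Returns false once adding is complete and the buffer has been drained.
    public bool TryTake(out T item)
    {
        lock (_sync)
        {
            // Consumer sleeps while the buffer is empty and more items may still arrive.
            while (_items.Count == 0 && !_addingCompleted)
                Monitor.Wait(_sync);
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            // Tell a waiting producer that space is available again.
            Monitor.PulseAll(_sync);
            return true;
        }
    }

    // Producer signals that no more items will be added.
    public void CompleteAdding()
    {
        lock (_sync)
        {
            _addingCompleted = true;
            Monitor.PulseAll(_sync);
        }
    }
}

public static class MonitorDemo
{
    // One producer adds 1..count; the calling thread consumes and sums the items.
    public static int SumAll(int count, int capacity)
    {
        var buffer = new MonitorBoundedBuffer<int>(capacity);
        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= count; i++) buffer.Add(i);
            buffer.CompleteAdding();
        });
        int sum = 0;
        int value;
        while (buffer.TryTake(out value)) sum += value;
        producer.Wait();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(SumAll(100, 5)); // 5050
    }
}
```

Every piece of this (the full and empty waits, the pulses, the completion flag) is exactly what BlockingCollection<T> provides out of the box.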

Use a Thread-Safe Queue Collection

BlockingCollection<T> is a thread-safe collection class that provides the following features:

  • An implementation of the Producer-Consumer pattern.
  • Concurrent adding and taking of elements from multiple threads.
  • Optional maximum capacity.
  • Insertion and removal operations that block when the collection is empty or full.

Reference: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.110).aspx
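Putting these features together, a minimal sketch with one producer and one consumer sharing a BlockingCollection<int> might look like this. BasicBlockingDemo and Run are hypothetical names and the capacity of 3 is arbitrary; the BlockingCollection<T> members used (the boundedCapacity constructor, Add, CompleteAdding, GetConsumingEnumerable) are the standard ones discussed in this article.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BasicBlockingDemo
{
    // One producer, one consumer, one shared BlockingCollection bounded at 3 items.
    public static List<int> Run(int count)
    {
        var results = new List<int>(); // touched only by the consumer task
        using (var queue = new BlockingCollection<int>(boundedCapacity: 3))
        {
            var producer = Task.Run(() =>
            {
                for (int i = 1; i <= count; i++)
                    queue.Add(i);       // blocks while 3 items are pending
                queue.CompleteAdding(); // no more items will be added
            });

            var consumer = Task.Run(() =>
            {
                // Blocks while the collection is empty; ends after CompleteAdding
                // once every remaining item has been taken.
                foreach (var item in queue.GetConsumingEnumerable())
                    results.Add(item * item);
            });

            Task.WaitAll(producer, consumer);
        }
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(5))); // 1, 4, 9, 16, 25
    }
}
```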

Because this is a thread-safe collection, our first problem is solved: whether there is one consumer or several, we don't need to worry about synchronizing access to the queue shared by producers and consumers.

Blocking and Bounding

Why BlockingCollection instead of ConcurrentQueue? The key difference is that BlockingCollection supports bounding and blocking, whereas ConcurrentQueue has no built-in support for either. (By default, BlockingCollection<T> stores its items in a ConcurrentQueue<T>, so it adds these features on top of FIFO ordering.)
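To make the blocking difference concrete, the sketch below contrasts ConcurrentQueue<T>.TryDequeue, which returns immediately on an empty queue, with BlockingCollection<T>.TryTake called with a timeout, which waits for a producer. QueueComparison and the 100 ms producer delay are illustrative assumptions, not part of the attached project.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class QueueComparison
{
    // ConcurrentQueue never waits: on an empty queue TryDequeue returns false at once,
    // so a consumer built on it has to poll in a loop.
    public static bool TryDequeueFromEmptyConcurrentQueue()
    {
        var queue = new ConcurrentQueue<string>();
        string item;
        return queue.TryDequeue(out item);
    }

    // BlockingCollection (backed here by an explicit ConcurrentQueue, which is also its default)
    // lets the consumer sleep until an item arrives.
    public static string TakeFromBlockingCollection()
    {
        using (var collection = new BlockingCollection<string>(new ConcurrentQueue<string>()))
        {
            // The producer adds an item only after a short delay.
            var producer = Task.Run(() =>
            {
                Thread.Sleep(100);
                collection.Add("hello");
            });

            string item;
            // Waits up to 5 seconds for an item instead of returning immediately.
            bool taken = collection.TryTake(out item, TimeSpan.FromSeconds(5));
            producer.Wait();
            return taken ? item : null;
        }
    }

    public static void Main()
    {
        Console.WriteLine(TryDequeueFromEmptyConcurrentQueue()); // False
        Console.WriteLine(TakeFromBlockingCollection());         // hello
    }
}
```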

The producer should discard the data or go into sleep mode when the queue is full

Bounding means setting the maximum number of elements the collection can hold (its bounded capacity).

If this value is set to 2786 and the queue already holds 2786 items, the next request to add an item blocks until one or more elements are removed from the queue. As soon as there is free space in the queue, the item is added.

In other words, a call to Add blocks the producer thread that is adding data, so it waits without any custom synchronization code.
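A small sketch of bounding, assuming a capacity of 2 instead of 2786 so the effect is easy to observe: the third Add blocks until a consumer calls Take. BoundingDemo is a hypothetical name. The 200 ms wait only gives the producer time to reach the blocked Add; the result does not depend on it, because the producer cannot finish while the collection is full.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BoundingDemo
{
    // Returns true if the third Add stayed blocked until the consumer made room.
    public static bool ThirdAddBlocksUntilTake()
    {
        using (var queue = new BlockingCollection<int>(boundedCapacity: 2))
        {
            queue.Add(1);
            queue.Add(2); // the collection is now full

            // This Add blocks because the bounded capacity has been reached.
            var producer = Task.Run(() => queue.Add(3));

            // Wait(200) returns false if the producer has not completed within 200 ms.
            bool blockedWhileFull = !producer.Wait(200);

            int first = queue.Take(); // frees one slot, which unblocks the producer
            producer.Wait();

            return blockedWhileFull && first == 1 && queue.Count == 2;
        }
    }

    public static void Main()
    {
        Console.WriteLine(ThirdAddBlocksUntilTake()); // True
    }
}
```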

This looks good, because the producer now waits, but what about a timeout? The producer may prefer to discard the request after some time. For that, use the TryAdd overloads that take a timeout parameter: TryAdd waits up to the given duration for consumers to remove elements and free up space, and returns false if no space became available in time.
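The timeout behaviour can be sketched as follows, assuming a producer with no active consumer so the queue fills up; items that cannot be added within timeoutMs are discarded. TryAddTimeoutDemo and its parameters are hypothetical; TryAdd(T, int millisecondsTimeout) is the BlockingCollection<T> overload being illustrated.

```csharp
using System;
using System.Collections.Concurrent;

public static class TryAddTimeoutDemo
{
    // Tries to add 'count' items to a collection bounded at 'capacity' with no consumer.
    // Returns how many items were accepted; the rest are discarded after timing out.
    public static int AddWithTimeout(int count, int capacity, int timeoutMs)
    {
        using (var queue = new BlockingCollection<int>(capacity))
        {
            int accepted = 0;
            for (int i = 0; i < count; i++)
            {
                // Waits up to timeoutMs for free space, then returns false instead of blocking forever.
                if (queue.TryAdd(i, timeoutMs))
                    accepted++;
                else
                    Console.WriteLine("Timed out adding item " + i + "; discarding it.");
            }
            return accepted;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Accepted: " + AddWithTimeout(5, 3, 50));
    }
}
```

With 5 items, a capacity of 3 and a 50 ms timeout, the last two TryAdd calls time out, so AddWithTimeout returns 3.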

The consumer should go into sleep mode when the buffer is empty and resume when items are available in the queue for processing

BlockingCollection also provides the GetConsumingEnumerable method. Enumerating it removes items from the collection one by one; when no elements are available, the enumeration blocks the consumer thread, sending it into waiting (sleep) mode until an item is added.
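A sketch of two consumers draining one collection with GetConsumingEnumerable, assuming a deliberately slow producer (a hypothetical 10 ms pause per item) so that the consumers spend their time blocked rather than spinning. ConsumingEnumerableDemo is a hypothetical name. The CompleteAdding call at the end is what lets both foreach loops finish.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumingEnumerableDemo
{
    // Two consumers share one collection; each item is removed (and processed) exactly once.
    public static int[] ProcessWithTwoConsumers(int count)
    {
        var processed = new ConcurrentBag<int>();
        using (var queue = new BlockingCollection<int>())
        {
            Func<Task> startConsumer = () => Task.Run(() =>
            {
                // The loop blocks this consumer while the collection is empty
                // and resumes as soon as an item is added.
                foreach (int item in queue.GetConsumingEnumerable())
                    processed.Add(item);
            });
            Task consumerA = startConsumer();
            Task consumerB = startConsumer();

            for (int i = 1; i <= count; i++)
            {
                queue.Add(i);
                Thread.Sleep(10); // slow producer: consumers wait between items
            }
            queue.CompleteAdding(); // lets both foreach loops end once the queue is empty
            Task.WaitAll(consumerA, consumerB);
        }
        return processed.OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", ProcessWithTwoConsumers(6))); // 1,2,3,4,5,6
    }
}
```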

The consumer should be notified to stop processing when the producer has finished producing and no more elements will be added to the queue

BlockingCollection also provides the CompleteAdding method, which tells the queue that producers have finished adding items. After it is called, GetConsumingEnumerable hands out any remaining items and then ends instead of blocking. That means no more while loop checking the queue for elements, and no attempts to read the queue when no data is present.
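The sketch below traces what CompleteAdding changes, using the IsAddingCompleted and IsCompleted properties of BlockingCollection<T>. CompleteAddingDemo is a hypothetical name; the behaviour shown (remaining items still delivered, the enumeration ending, Add throwing afterwards) is the documented behaviour of BlockingCollection<T>.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class CompleteAddingDemo
{
    // Records, in order, what happens around CompleteAdding.
    public static List<string> Run()
    {
        var log = new List<string>();
        using (var queue = new BlockingCollection<string>())
        {
            queue.Add("first");
            queue.Add("second");
            queue.CompleteAdding(); // the producer signals that nothing more will be added

            log.Add("IsAddingCompleted: " + queue.IsAddingCompleted); // True
            log.Add("IsCompleted: " + queue.IsCompleted);             // False, items remain

            // Items added before CompleteAdding are still delivered; the loop then ends
            // instead of blocking, because the collection is empty and marked complete.
            foreach (string item in queue.GetConsumingEnumerable())
                log.Add("Consumed " + item);

            log.Add("IsCompleted: " + queue.IsCompleted);             // True

            try
            {
                queue.Add("third");
            }
            catch (InvalidOperationException)
            {
                log.Add("Add after CompleteAdding throws InvalidOperationException");
            }
        }
        return log;
    }

    public static void Main()
    {
        foreach (string line in Run())
            Console.WriteLine(line);
    }
}
```

Run returns, in order: IsAddingCompleted: True, IsCompleted: False, the two consumed items, IsCompleted: True, and the InvalidOperationException message.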

More about the important BlockingCollection methods used in the example

BlockingCollection<T>: namespace System.Collections.Concurrent, assembly System (in System.dll)

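  • BlockingCollection<T>.TryAdd Method (T, Int32) / (T, TimeSpan) - Tries to add the specified item to the BlockingCollection<T> within the specified time period. (The TryAdd(T) overload without a timeout returns false immediately if the collection is full.)
  • BlockingCollection<T>.GetConsumingEnumerable - Enumerating it removes items from the collection and blocks when no elements are available in the queue.
  • BlockingCollection<T>.CompleteAdding Method - Marks the collection as not accepting any more additions; once the remaining items have been taken, GetConsumingEnumerable enumerations end.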

Using the code

Enough discussion; let's see the code.

The attached TPL.BlockingCollection.zip contains the implementation of the Producer Consumer pattern using BlockingCollection. I used Visual Studio 2015 with Microsoft .NET Framework 4.5.

File - TaskProducer.cs

  • Responsible for generating the tasks. For each task, the producer populates a QueuedObject, associates it with the action to perform after it is dequeued, and finally adds it to the queue.
C#
using System;
using System.Linq;
using System.Threading;

namespace TPL.BlockingCollection
{
    /// <summary>
    /// Produces and Enqueues the Tasks
    /// </summary>
    public class TaskProducer
    {
        private TaskQueue _taskQueue;

        public TaskProducer(TaskQueue taskQueue) { _taskQueue = taskQueue; }

        /// <summary>
        /// Produces the tasks.
        /// </summary>
        public void ProduceTasks()
        {
            Random random = new Random();
            for (int i = 1; i <= 5; i++)
            {
                // Prepare Queue Object (Hold the Test Data)
                var queue = new QueuedObject
                {
                    QueueID = i,
                    ProducerThreadID = Thread.CurrentThread.ManagedThreadId,
                    EnqueueDateTime = DateTime.Now,
                    // Used to Generate Random String
                    RandomString = new string(Enumerable.Repeat("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", 5).Select(s => s[random.Next(s.Length)]).ToArray())
                };

                // Add Task to Queue with Action
                _taskQueue.EnqueueTask(() => { ReverseString(queue); });
                Console.WriteLine
                    (
                    "Enqueued: " + queue.QueueID +
                    "\t" + "Producer ThreadID :" + queue.ProducerThreadID +
                    "\t" + queue.EnqueueDateTime.ToLongTimeString() +
                    "\t" + "RandomString   :" + queue.RandomString
                    );
            }
        }

        /// <summary>
        /// Reverses the string. (This is an Associated Action with Task)
        /// </summary>
        /// <param name="queue">The queue.</param>
        public static void ReverseString(QueuedObject queue)
        {
            string reversedString = new string(Enumerable.Range(1, queue.RandomString.Length).Select(i => queue.RandomString[queue.RandomString.Length - i]).ToArray());
            Console.WriteLine
                (
                "Dequeued: " + queue.QueueID +
                "\t" + "Consumer ThreadID :" + Thread.CurrentThread.ManagedThreadId +
                "\t" + DateTime.Now.ToLongTimeString() +
                "\t" + "ReversedString :" + reversedString
                );

        }
    }
}

File - TaskQueue.cs

  • Works as the queue: it wraps a BlockingCollection and performs the enqueue and dequeue of tasks added by TaskProducer
C#
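using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace TPL.BlockingCollection
{
    public class TaskQueue
    {
        private readonly BlockingCollection<Task> _workTaskQueue;
        private readonly int _timeout; // milliseconds TryAdd waits for free space

        public delegate void TaskEventsHandler(TaskProcessingArguments e);

        public event TaskEventsHandler TaskStatus;

        public TaskQueue(IProducerConsumerCollection<Task> workTaskCollection, int queueSize, int timeout)
        {
            // Bound the collection so producers wait (up to the timeout) once queueSize tasks are pending
            _workTaskQueue = new BlockingCollection<Task>(workTaskCollection, queueSize);
            _timeout = timeout;
        }

        /// <summary>
        /// Enqueues the task, waiting up to the configured timeout for free space.
        /// </summary>
        /// <param name="action">The action.</param>
        /// <param name="cancelToken">The cancel token.</param>
        public void EnqueueTask(Action action, CancellationToken cancelToken = default(CancellationToken))
        {
            var task = new Task(action, cancelToken);
            if (_workTaskQueue.TryAdd(task, _timeout))
            {
                TaskStatus?.Invoke
                    (new TaskProcessingArguments
                    {
                        ISTaskAdded = true,
                        Message = "Task Added to Queue",
                        PendingTaskCount = _workTaskQueue.Count,
                    });
            }
            else
            {
                TaskStatus?.Invoke
                    (new TaskProcessingArguments
                    {
                        ISTaskAdded = false,
                        Message = "Timed out while adding Task to Queue",
                        PendingTaskCount = _workTaskQueue.Count,
                    });
            }
        }

        /// <summary>
        /// Dequeues and runs tasks until Close() has been called and the queue is empty.
        /// </summary>
        public void DequeueTask()
        {
            // Blocks while the queue is empty; ends after CompleteAdding once all tasks are taken
            foreach (var task in _workTaskQueue.GetConsumingEnumerable())
            {
                try
                {
                    if (!task.IsCanceled) task.RunSynchronously();
                    // RunSynchronously stores exceptions thrown by the action on the task instead of rethrowing them
                    if (task.IsFaulted)
                        Console.WriteLine("Task failed: " + task.Exception.InnerException.Message);
                }
                catch (InvalidOperationException ex)
                {
                    // Thrown if the task is no longer runnable (e.g. it was canceled after the IsCanceled check)
                    Console.WriteLine("Task could not be run: " + ex.Message);
                }
            }
        }

        /// <summary>
        /// CompleteAdding : Will notify Consumer / Queue - Task Addition from Producer is Completed
        /// </summary>
        public void Close()
        {
            _workTaskQueue.CompleteAdding();
        }
    }
}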
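TaskProducer and TaskQueue reference two helper classes, QueuedObject and TaskProcessingArguments, that ship in the attached zip but are not listed in the article. If you want to compile the listings on their own, a minimal sketch like the following should be enough. It is an assumption inferred only from the properties the code above sets and reads; the real classes in the download may contain more.

```csharp
using System;

namespace TPL.BlockingCollection
{
    // Assumed shape of the event payload raised by TaskQueue.TaskStatus,
    // inferred from the properties TaskQueue sets.
    public class TaskProcessingArguments
    {
        public bool ISTaskAdded { get; set; }
        public string Message { get; set; }
        public int PendingTaskCount { get; set; }
    }

    // Assumed shape of the test-data object TaskProducer builds for each task,
    // inferred from the properties TaskProducer sets and ReverseString reads.
    public class QueuedObject
    {
        public int QueueID { get; set; }
        public int ProducerThreadID { get; set; }
        public DateTime EnqueueDateTime { get; set; }
        public string RandomString { get; set; }
    }
}
```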

 

Finally, Program.cs wires everything together: multiple producers produce tasks and multiple consumers consume them. Note the call to taskQueue.Close(): it is made once all producer threads have finished adding items to the queue, so that the consumers know when to stop, that is, when their GetConsumingEnumerable enumerations should end.

C#
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace TPL.BlockingCollection
{
    class Program
    {
        static void Main(string[] args)
        {
            // Initialize Task Queue and specify capacity and timeout
            TaskQueue taskQueue = new TaskQueue(new ConcurrentQueue<Task>(), 1000, 10);

            // Subscribe to Queue Processing Events
            taskQueue.TaskStatus += WorkQueue_TaskStatus;

            // Set up Producers - To Produce Tasks and Associated Action
            TaskProducer producerOne = new TaskProducer(taskQueue);
            TaskProducer producerTwo = new TaskProducer(taskQueue);

            // Start Producing Tasks (Here we have 2 Producers)
            Task producerTaskOne = Task.Run(() => producerOne.ProduceTasks());
            Task producerTaskTwo = Task.Run(() => producerTwo.ProduceTasks());

            // Start Consumers
            Task consumerTaskOne = Task.Run(() => taskQueue.DequeueTask());
            Task consumerTaskTwo = Task.Run(() => taskQueue.DequeueTask());

            // Wait for all Producers to Complete Producing Tasks
            Task.WaitAll(producerTaskOne, producerTaskTwo);

            // Call Queue Close Method (Indicate Producers have stopped producing tasks)
            taskQueue.Close();

            // Wait for Consumers to Process the remaining Tasks
            Task.WaitAll(consumerTaskOne, consumerTaskTwo);
            Console.WriteLine("Tasks Processed");
            Console.ReadLine();
        }

        /// <summary>
        /// Triggered when an attempt is made to add a Task to the Queue (either success or timeout)
        /// See TaskProcessingArguments
        /// </summary>
        /// <param name="e">The event arguments.</param>
        private static void WorkQueue_TaskStatus(TaskProcessingArguments e)
        {
            //Console.WriteLine(e.ISTaskAdded);
        }
    }
}

The attached source is an initial version of the complete working code; I'm continuing to optimize it, as there is always scope for improvement.

To experiment with the code, change the start and end values of the for loop in TaskProducer.cs to run a bigger example. Bounding and blocking can also be controlled through the capacity and timeout arguments passed to the TaskQueue constructor in Program.cs.

In my next article, on Task Parallel Library - Data Flows, I will give some more interesting and complex examples of Producer Consumer problems.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)