Introductory Concepts
In programming, a queue is a data structure in the form of a first-in, first-out list. Queues are used to hold such things as the currently executing processes in a system, a list of pending database transactions, or data packets traveling over a TCP/IP connection. Producer/consumer situations often call for blocking and bounded queues: queues that block consumers on dequeue when the queue is empty and block producers on enqueue when the queue is full. To give a simple example, when a slow hardware device has to perform a long I/O operation, other operations that depend on it are suspended and placed on a waiting list. To prevent an I/O conflict, the tasks must be coordinated: one group of tasks (the producers) creates the data items consumed by another group of tasks (the consumers).
BlockingCollection<T>
The flow of work from producers to consumers is mediated by a collection, typically a queue: producers place work items in the collection so that consumers can remove and process them. At any given moment, the contents of the collection represent the outstanding work items, those that have been produced but have yet to be consumed. A synchronization primitive lets producers signal consumers when work items are available to be processed. Together, the collection and the primitive decouple the production of items from their consumption: we can vary the number of producer and consumer tasks based on the relative time it takes to produce or consume an item, we can use the number of outstanding items to throttle production if consumption gets backed up, and we can use the collection to smooth out peaks and troughs on either side. The producer/consumer arrangement can also be extended to multiple stages, which forms a pipeline. The .NET Parallel Extensions provide a collection called BlockingCollection<T>, which supports both blocking and bounded queues:
public class BlockingCollection<T> : IEnumerable<T>, ICollection,
                                     IEnumerable, IDisposable {
    public BlockingCollection(IProducerConsumerCollection<T> collection,
        Int32 boundedCapacity);
    public void Add(T item);
    public Boolean TryAdd(T item, Int32 msTimeout, CancellationToken cancellationToken);
    public void CompleteAdding();
    public T Take();
    public Boolean TryTake(out T item, Int32 msTimeout, CancellationToken cancellationToken);
    public Int32 BoundedCapacity { get; }
    public Int32 Count { get; }
    public Boolean IsAddingCompleted { get; }
    public Boolean IsCompleted { get; }
    public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken);
    public void CopyTo(T[] array, int index);
    public T[] ToArray();
    public void Dispose();
}
Before we establish a parallel pattern, let's examine this code:
using System;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;

public static class Program {
    public static void Main() {
        // An unbounded blocking collection backed by a concurrent FIFO queue.
        var bl = new BlockingCollection<Int32>(new ConcurrentQueue<Int32>());

        // Start the consumer on a thread-pool thread.
        ThreadPool.QueueUserWorkItem(ConsumeItems, bl);

        for (Int32 item = 0; item < 5; item++) {
            Console.WriteLine("Producing: " + item);
            bl.Add(item);
        }
        // Tell the consumer that no more items will be added.
        bl.CompleteAdding();
        Console.ReadLine();
    }

    private static void ConsumeItems(Object o) {
        var bl = (BlockingCollection<Int32>)o;

        // Blocks until an item is available or CompleteAdding() has been called.
        foreach (var item in bl.GetConsumingEnumerable()) {
            Console.WriteLine("Consuming: " + item);
        }
        Console.WriteLine("All items have been consumed");
        Console.WriteLine("Press <enter> to finish...");
    }
}
Compiling and running this code yields the following output:
Producing: 0
Producing: 1
Producing: 2
Producing: 3
Producing: 4
Consuming: 0
Consuming: 1
Consuming: 2
Consuming: 3
Consuming: 4
All items have been consumed
Press <enter> to finish...
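The collection in this example is unbounded, so Add() never blocks the producer. Passing a bounded capacity to the constructor changes that: Add() blocks when the collection is full, and TryAdd() with a timeout lets a producer back off instead. The short sketch below is not part of the original example; the class name and the capacity of 2 are purely illustrative.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;

public static class BoundedDemo {
    public static void Main() {
        // A bounded collection that holds at most two items at a time.
        var bounded = new BlockingCollection<Int32>(2);

        // Slow consumer: takes roughly one item per second.
        Task consumer = Task.Factory.StartNew(() => {
            foreach (var item in bounded.GetConsumingEnumerable()) {
                Console.WriteLine("Consumed: " + item);
                Thread.Sleep(1000);
            }
        });

        for (Int32 i = 0; i < 5; i++) {
            // With only two slots, Add blocks here once the collection is
            // full and resumes as the consumer drains items.
            bounded.Add(i);
            Console.WriteLine("Produced: " + i);
        }
        bounded.CompleteAdding();
        consumer.Wait();
    }
}
Bounding the collection is how the pattern keeps a fast producer from consuming unbounded memory: the "Produced" messages fall into step with the consumer instead of racing ahead.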
The code below starts by defining two small classes, a BankAccount class and a Deposit class. In the main body of the program, we instantiate a BlockingCollection<Deposit>. Next, we create and start the producers, which generate deposits and place them into the collection. We then set up a many-to-one continuation that signals the end of production, by calling CompleteAdding(), once all of the producers have finished. Finally, we create the bank account and the consumer task, which updates the balance based on the deposits it takes from the collection.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class BankAccount {
    public Int32 Balance { get; set; }
}

public class Deposit {
    public Int32 Amount { get; set; }
}

public class Program {
    public static void Main() {
        BlockingCollection<Deposit> blockingCollection
            = new BlockingCollection<Deposit>();

        // Producers: three tasks, each adding twenty 100-unit deposits.
        Task[] producers = new Task[3];
        for (Int32 i = 0; i < 3; i++) {
            producers[i] = Task.Factory.StartNew(() => {
                for (Int32 j = 0; j < 20; j++) {
                    Deposit deposit = new Deposit { Amount = 100 };
                    blockingCollection.Add(deposit);
                }
            });
        }

        // Many-to-one continuation: signal the end of production
        // once all of the producers have finished.
        Task.Factory.ContinueWhenAll(producers, antecedents => {
            Console.WriteLine("Signaling production end");
            blockingCollection.CompleteAdding();
        });

        BankAccount account = new BankAccount();

        // Consumer: take deposits until the collection is marked complete and drained.
        Task consumer = Task.Factory.StartNew(() => {
            while (!blockingCollection.IsCompleted) {
                Deposit deposit;
                if (blockingCollection.TryTake(out deposit)) {
                    account.Balance += deposit.Amount;
                }
            }
            Console.WriteLine("Final Balance: {0}", account.Balance);
        });

        consumer.Wait();
        Console.WriteLine("Press <Enter> to finish");
        Console.ReadLine();
    }
}
Compiling and running this code yields the following output:
Signaling production end
Final Balance: 6000
Press <Enter> to finish
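The multi-stage pipeline mentioned at the start of this section can be sketched with two BlockingCollection<T> instances chained by GetConsumingEnumerable(). This is only a sketch: the stage names and the squaring step are illustrative, not taken from the examples above.
using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;

public static class PipelineSketch {
    public static void Main() {
        var stage1 = new BlockingCollection<Int32>();
        var stage2 = new BlockingCollection<Int32>();

        // Stage 1: square each number and pass it downstream.
        Task squarer = Task.Factory.StartNew(() => {
            foreach (var n in stage1.GetConsumingEnumerable()) {
                stage2.Add(n * n);
            }
            stage2.CompleteAdding();   // propagate "no more items" downstream
        });

        // Stage 2: print the squared values.
        Task printer = Task.Factory.StartNew(() => {
            foreach (var n in stage2.GetConsumingEnumerable()) {
                Console.WriteLine(n);
            }
        });

        for (Int32 i = 1; i <= 5; i++) {
            stage1.Add(i);
        }
        stage1.CompleteAdding();

        Task.WaitAll(squarer, printer);
    }
}
Each stage calls CompleteAdding() on the collection it feeds, so the "no more items" signal propagates down the pipeline and every consuming loop eventually terminates.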
Now let's examine some non-blocking collections.
ConcurrentQueue<T>: A Non-Blocking Collection
The ConcurrentQueue<T> class implements a first-in, first-out (FIFO) queue, which means that when you take items from the queue, you get them in the same order in which they were added. To place an item into a ConcurrentQueue<T>, you call the Enqueue() method. To take the first item in the queue, you call TryDequeue(), and to read the first item in the queue without removing it, you call TryPeek(). TryDequeue() and TryPeek() take a parameter of the collection's element type, modified by the out keyword, and return a bool result. If the result is true, the parameter contains the data item. If it is false, no data item could be obtained:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class Program {
    public static void Main() {
        ConcurrentQueue<Int32> sharedQueue = new ConcurrentQueue<Int32>();

        // Fill the queue with 1,000 items before the consumers start.
        for (Int32 i = 0; i < 1000; i++) {
            sharedQueue.Enqueue(i);
        }

        Int32 itemCount = 0;

        // Ten tasks drain the queue concurrently.
        Task[] tasks = new Task[10];
        for (Int32 i = 0; i < tasks.Length; i++) {
            tasks[i] = new Task(() => {
                while (sharedQueue.Count > 0) {
                    Int32 queueElement;
                    // TryDequeue may fail if another task got there first.
                    bool gotElement = sharedQueue.TryDequeue(out queueElement);
                    if (gotElement) {
                        Interlocked.Increment(ref itemCount);
                    }
                }
            });
            tasks[i].Start();
        }

        Task.WaitAll(tasks);
        Console.WriteLine("Items processed: {0}", itemCount);
        Console.WriteLine("Press <Enter> to finish....");
        Console.ReadLine();
    }
}
Running this program produces the following:
Items processed: 1000
Press <Enter> to finish....
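The example above exercises Enqueue() and TryDequeue() only. TryPeek() follows the same try pattern but leaves the item at the head of the queue; here is a minimal single-threaded sketch of the calls (the values are arbitrary):
using System;
using System.Collections.Concurrent;

public static class PeekSketch {
    public static void Main() {
        var queue = new ConcurrentQueue<Int32>();
        queue.Enqueue(42);

        Int32 head;
        // TryPeek reads the head element without removing it.
        if (queue.TryPeek(out head)) {
            Console.WriteLine("Peeked {0}, count is still {1}", head, queue.Count);
        }
        // TryDequeue removes the head element.
        if (queue.TryDequeue(out head)) {
            Console.WriteLine("Dequeued {0}, count is now {1}", head, queue.Count);
        }
    }
}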
ConcurrentStack<T>: Also Non-Blocking
The System.Collections.Concurrent.ConcurrentStack<T> class implements a last-in, first-out (LIFO) collection: taking an item from the stack returns the most recently added item. Items are added to the stack using the Push() and PushRange() methods, and inspected and retrieved using the TryPeek(), TryPop(), and TryPopRange() methods. The design of this class closely mirrors that of the queue type. You use Push() to place elements on the head of the stack and TryPop() to take them off. As stated, TryPeek() returns the current head element without removing it. The stack also provides a Clear() method that empties its contents. This code sample is similar to the previous one:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class Program {
    public static void Main() {
        ConcurrentStack<int> sharedStack = new ConcurrentStack<int>();

        // Fill the stack with 1,000 items before the consumers start.
        for (int i = 0; i < 1000; i++) {
            sharedStack.Push(i);
        }

        int itemCount = 0;

        // Ten tasks drain the stack concurrently.
        Task[] tasks = new Task[10];
        for (int i = 0; i < tasks.Length; i++) {
            tasks[i] = new Task(() => {
                while (sharedStack.Count > 0) {
                    int stackElement;
                    // TryPop may fail if another task got there first.
                    bool gotElement = sharedStack.TryPop(out stackElement);
                    if (gotElement) {
                        Interlocked.Increment(ref itemCount);
                    }
                }
            });
            tasks[i].Start();
        }

        Task.WaitAll(tasks);
        Console.WriteLine("Items processed: {0}", itemCount);
        Console.WriteLine("Press <Enter> to finish...");
        Console.ReadLine();
    }
}
Compiling and running this produces the following result:
Items processed: 1000
Press <Enter> to finish...
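The example exercises Push() and TryPop() only. The range methods mentioned above move several items on or off the stack in a single atomic operation, and Clear() empties the stack. A small single-threaded sketch of the calls (the values are arbitrary):
using System;
using System.Collections.Concurrent;

public static class StackRangeSketch {
    public static void Main() {
        var stack = new ConcurrentStack<Int32>();

        // Push three items as a single atomic operation, equivalent to
        // pushing them one at a time in array order.
        stack.PushRange(new Int32[] { 1, 2, 3 });

        Int32 top;
        if (stack.TryPeek(out top)) {
            Console.WriteLine("Top of stack: {0}", top);   // 3, the last item pushed
        }

        // Pop up to two items into the buffer; returns how many were popped.
        Int32[] buffer = new Int32[2];
        Int32 popped = stack.TryPopRange(buffer, 0, buffer.Length);
        Console.WriteLine("Popped {0} items, first is {1}", popped, buffer[0]);  // 2 items, first is 3

        stack.Clear();   // empty the stack
        Console.WriteLine("IsEmpty: {0}", stack.IsEmpty);
    }
}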
ConcurrentDictionary<TKey, TValue>: Non-Blocking
The ConcurrentDictionary<TKey, TValue> class implements a collection of key-value pairs and is a thread-safe implementation of the generic IDictionary<TKey, TValue> interface. You can specify the desired concurrency level (the maximum number of tasks or threads expected to update the dictionary concurrently), its preferred initial capacity, and an IEqualityComparer<TKey> implementation for comparing keys. You can get or set the value associated with a specified key using the Item property, which serves as the indexer: it takes a TKey and returns a TValue:
using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;

public class MyParallel<TKey, TValue> {
    // Each key maps to a Lazy<TValue> so that the (possibly expensive)
    // value factory runs at most once per key, even under contention.
    private ConcurrentDictionary<TKey, Lazy<TValue>> dictionary;
    private Func<TKey, TValue> valueFactory;

    public MyParallel(Func<TKey, TValue> factory) {
        valueFactory = factory;
        dictionary = new ConcurrentDictionary<TKey, Lazy<TValue>>();
    }

    public TValue GetValue(TKey key) {
        return dictionary.GetOrAdd(key,
            new Lazy<TValue>(() => valueFactory(key))).Value;
    }
}

public static class Program {
    public static void Main() {
        MyParallel<int, double> cache
            = new MyParallel<int, double>(key => {
                Console.WriteLine("Created value for key {0}", key);
                return Math.Pow(key, 2);
            });

        for (int i = 0; i < 10; i++) {
            Task.Factory.StartNew(() => {
                for (int j = 0; j < 20; j++) {
                    Console.WriteLine(
                        "Task {0} got value {1} for key {2}",
                        Task.CurrentId, cache.GetValue(j), j);
                }
            });
        }

        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }
}
Compiling and running this code yields the following (the exact interleaving varies from run to run):
Press enter to finish
Created value for key 0
Task 1 got value 0 for key 0
Created value for key 1
Task 1 got value 1 for key 1
Created value for key 2
Task 1 got value 4 for key 2
Created value for key 3
Task 1 got value 9 for key 3
Created value for key 4
Task 1 got value 16 for key 4
Created value for key 5
Task 1 got value 25 for key 5
Created value for key 6
Task 1 got value 36 for key 6
Created value for key 7
Task 1 got value 49 for key 7
Created value for key 8
Task 1 got value 64 for key 8
Created value for key 9
Task 1 got value 81 for key 9
Created value for key 10
Task 1 got value 100 for key 10
Created value for key 11
Task 1 got value 121 for key 11
Task 2 got value 0 for key 0
Task 2 got value 1 for key 1
Task 2 got value 4 for key 2
Task 2 got value 9 for key 3
Task 2 got value 16 for key 4
Task 2 got value 25 for key 5
Task 2 got value 36 for key 6
Task 2 got value 49 for key 7
Task 2 got value 64 for key 8
Task 2 got value 81 for key 9
Task 2 got value 100 for key 10
Task 2 got value 121 for key 11
Created value for key 12
Task 1 got value 144 for key 12
Created value for key 13
Task 1 got value 169 for key 13
Created value for key 14
Task 1 got value 196 for key 14
Created value for key 15
Task 1 got value 225 for key 15
Created value for key 16
Task 1 got value 256 for key 16
Task 2 got value 144 for key 12
Created value for key 17
Task 2 got value 169 for key 13
. . . (output continues)
Each key maps to the key's value raised to the power of two, as computed by Math.Pow(key, 2) in the value factory.
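The example above relies on GetOrAdd() and the default constructor. The overload mentioned earlier, which takes a concurrency level, an initial capacity, and a key comparer, is shown in the sketch below together with the indexer and AddOrUpdate(). The ticker-price scenario and all values are merely illustrative.
using System;
using System.Collections.Concurrent;

public static class DictionarySketch {
    public static void Main() {
        var prices = new ConcurrentDictionary<String, Decimal>(
            4,                                  // concurrency level: expected number of updating threads
            100,                                // initial capacity
            StringComparer.OrdinalIgnoreCase);  // key comparer

        prices["MSFT"] = 30.5m;                  // the indexer adds or overwrites
        Decimal p = prices.GetOrAdd("msft", 0m); // same key, thanks to the comparer
        Console.WriteLine(p);                    // 30.5

        // AddOrUpdate: add 1 if the key is absent, otherwise apply the update delegate.
        prices.AddOrUpdate("MSFT", 1m, (key, old) => old + 1m);
        Console.WriteLine(prices["msft"]);       // 31.5
    }
}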
ConcurrentBag<T>
According to Gaston C. Hillar (author of Professional Parallel Programming with C#), a ConcurrentBag<T> is a very efficient collection for scenarios where the same thread is both adding elements (producing) and removing elements (consuming). It uses several mechanisms to minimize the need for synchronization and its associated overhead. However, it sometimes requires locking, and it is fairly inefficient when the producer threads are completely separate from the consumer threads. ConcurrentBag<T> maintains a local queue for each thread that accesses the bag and, whenever possible, accesses that local queue in a lock-free manner with little or no contention. ConcurrentBag<T> represents a bag: an unordered collection of objects that allows duplicates. It is therefore useful for storing and accessing objects when ordering doesn't matter. The following code, adapted from his book, illustrates the ConcurrentBag<T> class:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Security.Cryptography;
using System.Threading.Tasks;
using System.Collections.Concurrent;
public class Program
{
private static string RemoveLetters(char[] letters, string sentence)
{
var sb = new StringBuilder();
bool match = false;
for (int i = 0; i < sentence.Length; i++)
{
for (int j = 0; j < letters.Length; j++)
{
if (sentence[i] == letters[j])
{
match = true;
break;
}
}
if (!match)
{
sb.Append(sentence[i]);
}
match = false;
}
return sb.ToString();
}
private static string CapitalizeWords(char[] delimiters,
string sentence, char newDelimiter)
{
string[] words = sentence.Split(delimiters);
var sb = new StringBuilder();
for (int i = 0; i < words.Length; i++)
{
if (words[i].Length > 1)
{
sb.Append(words[i][0].ToString().ToUpper());
sb.Append(words[i].Substring(1).ToLower());
}
else
{
sb.Append(words[i].ToLower());
}
sb.Append(newDelimiter);
}
return sb.ToString();
}
private const int NUM_SENTENCES = 2000000;
private static ConcurrentBag<string> _sentencesBag;
private static ConcurrentBag<string> _capWordsInSentencesBag;
private static ConcurrentBag<string> _finalSentencesBag;
private static volatile bool _producingSentences = false;
private static volatile bool _capitalizingWords = false;
private static void ProduceSentences()
{
string[] possibleSentences =
{
"ConcurrentBag is included in the System.Collections.Concurrent namespace.",
"Is parallelism important for cloud-computing?",
"Parallelism is very important for cloud-computing!",
"ConcurrentQueue is one of the new concurrent " +
"collections added in .NET Framework 4",
"ConcurrentStack is a concurrent collection that represents a LIFO collection",
"ConcurrentQueue is a concurrent collection that represents a FIFO collection"
};
try
{
var rnd = new Random();
for (int i = 0; i < NUM_SENTENCES; i++)
{
var sb = new StringBuilder();
for (int j = 0; j < possibleSentences.Length; j++)
{
if (rnd.Next(2) > 0)
{
sb.Append(possibleSentences[rnd.Next(possibleSentences.Length)]);
sb.Append(' ');
}
}
if (rnd.Next(20) > 15)
{
_sentencesBag.Add(sb.ToString());
}
else
{
_sentencesBag.Add(sb.ToString().ToUpper());
}
}
}
finally
{
_producingSentences = false;
}
}
private static void CapitalizeWordsInSentences()
{
char[] delimiterChars = { ' ', ',', '.', ':', ';', '(', ')',
'[', ']', '{', '}', '/', '?', '@', '\t', '"' };
System.Threading.SpinWait.SpinUntil(() => _producingSentences);
try
{
_capitalizingWords = true;
while ((!_sentencesBag.IsEmpty) || (_producingSentences))
{
string sentence;
if (_sentencesBag.TryTake(out sentence))
{
_capWordsInSentencesBag.Add(
CapitalizeWords(delimiterChars, sentence, '\\'));
}
}
}
finally
{
_capitalizingWords = false;
}
}
private static void RemoveLettersInSentences()
{
char[] letterChars = { 'A', 'B', 'C', 'e', 'i', 'j', 'm', 'X', 'y', 'Z' };
System.Threading.SpinWait.SpinUntil(() => _capitalizingWords);
while ((!_capWordsInSentencesBag.IsEmpty) || (_capitalizingWords))
{
string sentence;
if (_capWordsInSentencesBag.TryTake(out sentence))
{
_finalSentencesBag.Add(RemoveLetters(letterChars, sentence));
}
}
}
static void Main(string[] args)
{
var sw = Stopwatch.StartNew();
_sentencesBag = new ConcurrentBag<string>();
_capWordsInSentencesBag = new ConcurrentBag<string>();
_finalSentencesBag = new ConcurrentBag<string>();
_producingSentences = true;
Parallel.Invoke(
() => ProduceSentences(),
() => CapitalizeWordsInSentences(),
() => RemoveLettersInSentences()
);
Console.WriteLine(
"Number of sentences with capitalized words in the bag: {0}",
_capWordsInSentencesBag.Count);
Console.WriteLine(
"Number of sentences with removed letters in the bag: {0}",
_finalSentencesBag.Count);
Console.WriteLine(sw.Elapsed.ToString());
Console.WriteLine("Finished!");
Console.ReadLine();
}
}
This code compiles quickly, but running the program takes over a minute on the test machine to yield the following:
Number of sentences with capitalized words in the bag: 0
Number of sentences with removed letters in the bag: 2000000
00:01:04.2583357
Finished!
The three methods executed by Parallel.Invoke() form a chain of producer/consumer stages: ProduceSentences fills _sentencesBag, CapitalizeWordsInSentences consumes that bag while filling _capWordsInSentencesBag, and RemoveLettersInSentences consumes _capWordsInSentencesBag while filling _finalSentencesBag.
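As a final note, and as Hillar's description suggests, ConcurrentBag<T> is at its best when the same thread both adds and takes items, because most operations then hit that thread's local list. The sketch below is my own, not from the book, and shows that pattern with Parallel.For:
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;

public static class BagSketch {
    public static void Main() {
        var bag = new ConcurrentBag<Int32>();
        Int32 processed = 0;

        Parallel.For(0, 4, worker => {
            // Each worker adds its own items...
            for (Int32 i = 0; i < 1000; i++) {
                bag.Add(worker * 1000 + i);
            }
            // ...and then takes items back out. Takes on the adding thread
            // usually hit that thread's local list and avoid contention.
            Int32 item;
            while (bag.TryTake(out item)) {
                Interlocked.Increment(ref processed);
            }
        });

        // Typically prints "Processed: 4000, left in bag: 0".
        Console.WriteLine("Processed: {0}, left in bag: {1}", processed, bag.Count);
    }
}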
References
- Jeffrey Richter, CLR via C#, 3rd Edition, Microsoft Press
- Gaston C. Hillar, Professional Parallel Programming with C#, Wrox