Parallel Containers

13 May 2011 | CPOL | 5 min read
An article that describes concurrent collections.

Introductory Concepts

In programming, a queue is a first-in, first-out (FIFO) data structure. Queues hold such things as the processes waiting to run on a system, a list of pending database transactions, or data packets traveling over a TCP/IP connection. Producer/consumer situations often call for queues that are both blocking and bounded: consumers block on dequeue while the queue is empty, and producers block on enqueue while the queue is full. To give a crude example, when a slow hardware device has to perform a long, drawn-out I/O routine, the other operations that need the device are suspended and placed on a waiting list. To prevent an I/O conflict, the tasks must be coordinated: one group of tasks (the producers) creates the data items that another group of tasks (the consumers) consumes.

BlockingCollection<T>

The flow of work from producers to consumers is mediated by a collection, typically a queue: producers place work items in the collection so that consumers can remove and process them. At any given moment, the content of the collection represents the outstanding work items, that is, those that have been produced but not yet consumed. A synchronization primitive lets producers signal consumers when work items are available to be processed. Combining the collection with the primitive decouples the production of items from their consumption. We can vary the ratio of producer tasks to consumer tasks based on the relative time taken to produce or consume an item. We can extend the arrangement to multiple stages, which forms a pipeline. We can use the number of outstanding items to throttle production if consumption gets backed up. And we can use the collection to smooth out the effects of peaks and troughs on either side. The .NET Parallel Extensions provide a collection called BlockingCollection<T> that supports both blocking and bounded queues. An abridged view of its members:

C#
public class BlockingCollection<T> : IEnumerable<T>, ICollection,
             IEnumerable, IDisposable {
    public BlockingCollection(IProducerConsumerCollection<T> collection,
    Int32 boundedCapacity);
    public void Add(T item);
    public Boolean TryAdd(T item, Int32 msTimeout, CancellationToken cancellationToken);
    public void CompleteAdding();
    public T Take();
    public Boolean TryTake(out T item, Int32 msTimeout, CancellationToken cancellationToken);
    public Int32 BoundedCapacity { get; }
    public Int32 Count { get; }
    public Boolean IsAddingCompleted { get; } // true if CompleteAdding has been called
    public Boolean IsCompleted { get; } // true if IsAddingCompleted and Count==0
    public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken);
    public void CopyTo(T[] array, int index);
    public T[] ToArray();
    public void Dispose();
}

Before we establish a parallel pattern, let's examine this code:

C#
using System;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;

public static class Program  {
    public static void Main() {
      
        var bl = new BlockingCollection<Int32>(new ConcurrentQueue<Int32>());

        // A thread pool thread will do the consuming
        ThreadPool.QueueUserWorkItem(ConsumeItems, bl);

        // Add 5 items to the collection
        for (Int32 item = 0; item < 5; item++) {
            Console.WriteLine("Producing: " + item);
            bl.Add(item);
        }

        // Tell the consuming thread(s) that no more
        // items will be added to the collection
        bl.CompleteAdding();

        Console.ReadLine();  
    }

    private static void ConsumeItems(Object o) {
        var bl = (BlockingCollection<Int32>)o;

        // Block until an item shows up, then process it
        foreach (var item in bl.GetConsumingEnumerable()) {
            Console.WriteLine("Consuming: " + item);
        }

        // The collection is empty and no more items are going into it
        Console.WriteLine("All items have been consumed");
        Console.WriteLine("Press <enter> to finish...");   
    }
}

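Running this code typically produces the following output. Because the producer and the consumer run on different threads, the "Producing" and "Consuming" lines may interleave differently from run to run: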

Producing: 0
Producing: 1
Producing: 2
Producing: 3
Producing: 4
Consuming: 0
Consuming: 1
Consuming: 2
Consuming: 3
Consuming: 4
All items have been consumed
Press <enter> to finish...
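
The sample above uses an unbounded collection, so the producer never blocks. The following is a minimal sketch of the bounded case described in the introduction, assuming the BlockingCollection<T>(Int32 boundedCapacity) constructor. The class and method names (BoundedQueueSketch, CountAcceptedAdds, ProduceAndConsume) are made up for this illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BoundedQueueSketch {

    // Counts how many non-blocking adds succeed before the bound is reached.
    public static int CountAcceptedAdds(int capacity, int attempts) {
        using (var bounded = new BlockingCollection<int>(capacity)) {
            int accepted = 0;
            for (int i = 0; i < attempts; i++) {
                // TryAdd returns false at once when the collection is full;
                // Add would block here until a consumer removed an item.
                if (bounded.TryAdd(i)) {
                    accepted++;
                }
            }
            return accepted;
        }
    }

    // Pushes itemCount values through a bounded collection and returns
    // the sum of everything the consumer received.
    public static int ProduceAndConsume(int capacity, int itemCount) {
        using (var bounded = new BlockingCollection<int>(capacity)) {
            Task producer = Task.Factory.StartNew(() => {
                for (int i = 1; i <= itemCount; i++) {
                    bounded.Add(i); // blocks while the collection is full
                }
                bounded.CompleteAdding();
            });

            int sum = 0;
            foreach (int item in bounded.GetConsumingEnumerable()) {
                sum += item;
            }
            producer.Wait();
            return sum;
        }
    }

    public static void Main() {
        Console.WriteLine("Accepted adds: " + CountAcceptedAdds(2, 5)); // 2
        Console.WriteLine("Sum consumed: " + ProduceAndConsume(2, 10)); // 55
    }
}
```

With a capacity of 2, the producer can never get more than two items ahead of the consumer, which is how a bounded collection regulates production when consumption falls behind.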

The code below starts by defining two small classes, BankAccount and Deposit. In the main body of the program, we instantiate a BlockingCollection<Deposit>. Next, we create and start three producer tasks, each of which generates 20 deposits and places them into the collection. We then create a many-to-one continuation that runs once all of the producers have finished and signals the end of production to the consumer by calling CompleteAdding(). Finally, we create the bank account and then the consumer task, which updates the balance from the deposits until the collection is completed:

C#
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class BankAccount {
    public Int32 Balance { get; set; }
}

public class Deposit {
    public Int32 Amount { get; set; }
}

 public class Program {

   public  static void Main() {
        BlockingCollection<Deposit> blockingCollection
                = new BlockingCollection<Deposit>();

            Task[] producers = new Task[3];
            for (Int32 i = 0; i < 3; i++) {
                producers[i] = Task.Factory.StartNew(() => {
                    // create a series of deposits
                    for (Int32 j = 0; j < 20; j++) {
                        // create the transfer
                        Deposit deposit = new Deposit { Amount = 100 };
                        // place the transfer in the collection
                        blockingCollection.Add(deposit);
                    }
                });
            }

            Task.Factory.ContinueWhenAll(producers, antecedents => {
                // signal that production has ended
                Console.WriteLine("Signaling production end");
                blockingCollection.CompleteAdding();
            });
           
            BankAccount account = new BankAccount();

            Task consumer = Task.Factory.StartNew(() => {
                while (!blockingCollection.IsCompleted) {
                    Deposit deposit;
                    // try to take the next item 
                    if (blockingCollection.TryTake(out deposit)) {
                        // update the balance with the transfer amount
                        account.Balance += deposit.Amount;
                    }
                }
                // print out the final balance
                Console.WriteLine("Final Balance: {0}", account.Balance);
            });

            // wait for the consumer to finish
            consumer.Wait();

            // wait for input before exiting
            Console.WriteLine("Press <Enter> to finish");
            Console.ReadLine();
        }
    }

Running this code yields the following:

Signaling production end
Final Balance: 6000
Press <Enter> to finish
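
The consumer in the sample above polls with TryTake() inside a while loop, so it spins whenever the collection is momentarily empty. As a hedged alternative, the sketch below lets the consumer block inside GetConsumingEnumerable() instead. It uses plain Int32 amounts in place of the Deposit class to stay short, and the names BlockingConsumerSketch and SumDeposits are invented for this example:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingConsumerSketch {

    // Starts producerCount producers that each add depositsPerProducer
    // deposits of the given amount, and returns the consumer's final balance.
    public static int SumDeposits(int producerCount, int depositsPerProducer, int amount) {
        var deposits = new BlockingCollection<int>();

        Task[] producers = new Task[producerCount];
        for (int i = 0; i < producerCount; i++) {
            producers[i] = Task.Factory.StartNew(() => {
                for (int j = 0; j < depositsPerProducer; j++) {
                    deposits.Add(amount);
                }
            });
        }

        // Signal the end of production once every producer has finished.
        Task.Factory.ContinueWhenAll(producers, finished => deposits.CompleteAdding());

        Task<int> consumer = Task.Factory.StartNew(() => {
            int balance = 0;
            // Blocks while the collection is empty and ends the loop
            // after CompleteAdding() once the collection has drained.
            foreach (int deposit in deposits.GetConsumingEnumerable()) {
                balance += deposit;
            }
            return balance;
        });

        return consumer.Result;
    }

    public static void Main() {
        Console.WriteLine("Final Balance: {0}", SumDeposits(3, 20, 100)); // 6000
    }
}
```

The result is the same as before (3 producers x 20 deposits x 100), but the consumer thread sleeps instead of looping while it waits for work.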

Now let's examine some non-blocking collections.

ConcurrentQueue<T>: A Non-Blocking Collection

The ConcurrentQueue<T> class implements a first-in, first-out (FIFO) queue, which means that when you take items from the queue, you get them in the same order in which they were added. To place an item into a ConcurrentQueue, you call the Enqueue() method. To take the first item in the queue, you call TryDequeue(), and to get the first item in the queue without taking it, you call TryPeek(). TryDequeue() and TryPeek() take a parameter of the element type, modified by the out keyword, and return a bool result. If the result is true, the parameter contains the data item. If it is false, no data item could be obtained, typically because the queue was empty:

C#
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program {

   public static void Main() {

        // create a shared collection 
        ConcurrentQueue<Int32> sharedQueue = new ConcurrentQueue<Int32>();

        // populate the collection with items to process
        for (Int32 i = 0; i < 1000; i++) {
            sharedQueue.Enqueue(i);
        }

        // define a counter for the number of processed items
        Int32 itemCount = 0;

        // create tasks to process the list
        Task[] tasks = new Task[10];
        for (Int32 i = 0; i < tasks.Length; i++) {
            // create the new task
            tasks[i] = new Task(() => {

                while (sharedQueue.Count > 0) {
                    // define a variable for the dequeue requests
                    Int32 queueElement;
                    // take an item from the queue
                    bool gotElement = sharedQueue.TryDequeue(out queueElement);
                    // increment the count of items processed
                    if (gotElement) {
                        Interlocked.Increment(ref itemCount);
                    }
                }

            });
            // start the new task
            tasks[i].Start();
        }

        // wait for the tasks to complete
        Task.WaitAll(tasks);

        // report on the number of items processed
        Console.WriteLine("Items processed: {0}", itemCount);

        // wait for input before exiting
        Console.WriteLine("Press <Enter> to finish....");
        Console.ReadLine();
    }
}

Running this code produces the following:

Items processed: 1000
Press <Enter> to finish....
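
To make the FIFO ordering and the Try methods more concrete, here is a small single-threaded sketch. The class name QueueOrderSketch and the helper Drain are hypothetical and exist only for this illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class QueueOrderSketch {

    // Removes every item from the queue and returns them in dequeue order.
    public static List<string> Drain(ConcurrentQueue<string> queue) {
        var order = new List<string>();
        string item;
        // Looping on the result of TryDequeue avoids a separate Count check.
        while (queue.TryDequeue(out item)) {
            order.Add(item);
        }
        return order;
    }

    public static void Main() {
        var queue = new ConcurrentQueue<string>();
        queue.Enqueue("first");
        queue.Enqueue("second");
        queue.Enqueue("third");

        string head;
        if (queue.TryPeek(out head)) {
            // TryPeek reads the head without removing it.
            Console.WriteLine("Head: " + head + ", Count: " + queue.Count);
        }

        Console.WriteLine(string.Join(", ", Drain(queue)));

        // The queue is now empty, so TryDequeue reports failure.
        Console.WriteLine("TryDequeue on empty queue: " + queue.TryDequeue(out head));
    }
}
```

This prints "Head: first, Count: 3", then "first, second, third", then "TryDequeue on empty queue: False". Looping directly on TryDequeue() is also a reasonable choice in the multi-task sample above, because another task may empty the queue between a Count check and the dequeue call.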

ConcurrentStack<T>: Also Non-Blocking

The System.Collections.Concurrent.ConcurrentStack<T> class implements a last-in, first-out (LIFO) stack: taking an item from the stack returns the most recently added item. Items are added using the Push() and PushRange() methods, and inspected and retrieved using the TryPeek(), TryPop(), and TryPopRange() methods. The design of this class closely mirrors that of ConcurrentQueue<T>. You use Push() to place elements on the head of the stack and TryPop() to take them off. As stated, TryPeek() returns the current head element without removing it. The stack also provides a Clear() method that removes all of its contents. This code sample is similar to the one above:

C#
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program {

     public static void Main() {

        // create a shared collection 
        ConcurrentStack<int> sharedStack = new ConcurrentStack<int>();

        // populate the collection with items to process
        for (int i = 0; i < 1000; i++) {
            sharedStack.Push(i);
        }

        // define a counter for the number of processed items
        int itemCount = 0;

        // create tasks to process the list
        Task[] tasks = new Task[10];
        for (int i = 0; i < tasks.Length; i++) {
            // create the new task
            tasks[i] = new Task(() => {

                while (sharedStack.Count > 0) {
                    // define a variable for the pop requests
                    int stackElement;
                    // take an item from the stack
                    bool gotElement = sharedStack.TryPop(out stackElement);
                    // increment the count of items processed
                    if (gotElement) {
                        Interlocked.Increment(ref itemCount);
                    }
                }

            });
            // start the new task
            tasks[i].Start();
        }

        // wait for the tasks to complete
        Task.WaitAll(tasks);

        // report on the number of items processed
        Console.WriteLine("Items processed: {0}", itemCount);

        // wait for input before exiting
        Console.WriteLine("Press <enter> to finish...");
        Console.ReadLine();
    }
}

Running this code produces this result:

Items processed: 1000
Press <enter> to finish...
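
The range methods mentioned above are not exercised by the sample, so here is a brief sketch of PushRange() and TryPopRange(). The class StackRangeSketch and the helper PopTop are names made up for this example, and PopTop assumes a positive count:

```csharp
using System;
using System.Collections.Concurrent;

public static class StackRangeSketch {

    // Pops up to 'count' items (count > 0) and returns only those actually popped.
    public static int[] PopTop(ConcurrentStack<int> stack, int count) {
        int[] buffer = new int[count];
        // TryPopRange returns the number of items it managed to pop.
        int popped = stack.TryPopRange(buffer);
        int[] result = new int[popped];
        Array.Copy(buffer, result, popped);
        return result;
    }

    public static void Main() {
        var stack = new ConcurrentStack<int>();

        // Items at lower indices are pushed first, so 5 ends up on top.
        stack.PushRange(new[] { 1, 2, 3, 4, 5 });

        int[] top = PopTop(stack, 3);
        Console.WriteLine("Popped: " + string.Join(", ", top));

        int head;
        if (stack.TryPeek(out head)) {
            Console.WriteLine("Next head: " + head + ", Count: " + stack.Count);
        }
    }
}
```

This prints "Popped: 5, 4, 3" followed by "Next head: 2, Count: 2", which shows the LIFO order: the most recently pushed items come off first.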

ConcurrentDictionary<TKey, TValue>: Non-Blocking

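The ConcurrentDictionary<TKey, TValue> class implements a collection of key-value pairs and is the thread-safe counterpart of Dictionary<TKey, TValue>. You can define the desired concurrency level (the estimated number of threads that will update the dictionary concurrently), its preferred initial capacity, and an IEqualityComparer<TKey> implementation for comparing keys. You can get or set the value associated with a specified key using the Item property, which works as the indexer: it takes a TKey and returns a TValue. The sample below builds a small cache on top of the dictionary. GetOrAdd() returns the existing value for a key or adds the supplied one, and wrapping the value in Lazy<TValue> ensures that the factory delegate runs only once per key, even when several tasks request the same key at the same time: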

C#
using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;

public  class MyParallel<TKey,TValue> {
    private ConcurrentDictionary<TKey, Lazy<TValue>> dictionary;
    private Func<TKey, TValue> valueFactory;

    public MyParallel(Func<TKey, TValue> factory) {
        // set the factory instance variable
        valueFactory = factory;
        // initialize the dictionary
        dictionary = new ConcurrentDictionary<TKey,Lazy<TValue>>();
    }

    public TValue GetValue(TKey key) {
        return dictionary.GetOrAdd(key, 
            new Lazy<TValue>(() => valueFactory(key))).Value;
    }
}

public static class Program {
    public static void Main() {

        // create the cache
        MyParallel<int, double> cache
            = new MyParallel<int, double>(key => {
                Console.WriteLine("Created value for key {0}", key);
                return Math.Pow(key, 2);
            });

        for (int i = 0; i < 10; i++) {
            Task.Factory.StartNew(() => {
                for (int j = 0; j < 20; j++) {
                    Console.WriteLine(
                        "Task {0} got value {1} for key {2}",
                        Task.CurrentId, cache.GetValue(j), j);
                }
            });
        }

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }
}

Running this code yields output like the following (task IDs and line ordering vary between runs):

Press enter to finish
Created value for key 0
Task 1 got value 0 for key 0
Created value for key 1
Task 1 got value 1 for key 1
Created value for key 2
Task 1 got value 4 for key 2
Created value for key 3
Task 1 got value 9 for key 3
Created value for key 4
Task 1 got value 16 for key 4
Created value for key 5
Task 1 got value 25 for key 5
Created value for key 6
Task 1 got value 36 for key 6
Created value for key 7
Task 1 got value 49 for key 7
Created value for key 8
Task 1 got value 64 for key 8
Created value for key 9
Task 1 got value 81 for key 9
Created value for key 10
Task 1 got value 100 for key 10
Created value for key 11
Task 1 got value 121 for key 11
Task 2 got value 0 for key 0
Task 2 got value 1 for key 1
Task 2 got value 4 for key 2
Task 2 got value 9 for key 3
Task 2 got value 16 for key 4
Task 2 got value 25 for key 5
Task 2 got value 36 for key 6
Task 2 got value 49 for key 7
Task 2 got value 64 for key 8
Task 2 got value 81 for key 9
Task 2 got value 100 for key 10
Task 2 got value 121 for key 11
Created value for key 12
Task 1 got value 144 for key 12
Created value for key 13
Task 1 got value 169 for key 13
Created value for key 14
Task 1 got value 196 for key 14
Created value for key 15
Task 1 got value 225 for key 15
Created value for key 16
Task 1 got value 256 for key 16
Task 2 got value 144 for key 12
Created value for key 17
Task 2 got value 169 for key 13
. . . . . . 
etc ....

The value that maps to each key is the key raised to the power of two. Note that the "Created value" message appears only once per key, no matter how many tasks ask for that key.
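
The cache above only uses GetOrAdd(). As a complementary sketch, the example below counts words in parallel with AddOrUpdate() and then reads the results through the indexer and TryGetValue(). The class WordCountSketch, the method CountWords, and the sample words are assumptions made for this illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class WordCountSketch {

    // Counts occurrences of each word, updating the dictionary from many threads.
    public static ConcurrentDictionary<string, int> CountWords(string[] words) {
        var counts = new ConcurrentDictionary<string, int>();
        Parallel.ForEach(words, word => {
            // Adds the key with value 1, or applies the update delegate
            // to the current value if the key already exists.
            counts.AddOrUpdate(word, 1, (key, current) => current + 1);
        });
        return counts;
    }

    public static void Main() {
        string[] words = { "queue", "stack", "queue", "bag", "queue", "stack" };
        ConcurrentDictionary<string, int> counts = CountWords(words);

        // The indexer returns the value for an existing key.
        Console.WriteLine("queue: " + counts["queue"]);   // 3
        Console.WriteLine("stack: " + counts["stack"]);   // 2

        // TryGetValue reports a missing key without throwing.
        int missing;
        Console.WriteLine("Has 'dictionary': " + counts.TryGetValue("dictionary", out missing));
    }
}
```

AddOrUpdate() may invoke the update delegate more than once under contention, so the delegate should be free of side effects, as it is here.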

ConcurrentBag<T>

According to Gaston C. Hillar (author of Professional Parallel Programming with C#), a ConcurrentBag is a very efficient collection for scenarios where the same thread both adds elements (producing) and removes elements (consuming). It uses several mechanisms to minimize the need for synchronization and its associated overhead. However, it sometimes requires locking, and it is fairly inefficient when producer threads are completely separate from consumer threads. ConcurrentBag maintains a local queue for each thread that accesses the bag and, whenever possible, accesses that local queue in a lock-free manner with little or no contention. A bag is an unordered collection of objects that supports duplicates, so a ConcurrentBag is useful for storing and accessing objects when ordering doesn't matter. Here is code referenced from his work, shown to shed light on the ConcurrentBag class:

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Security.Cryptography;
using System.Threading.Tasks;
using System.Collections.Concurrent;

public class Program
{
    private static string RemoveLetters(char[] letters, string sentence)
    {
        var sb = new StringBuilder();
        bool match = false;

        for (int i = 0; i < sentence.Length; i++)
        {
            for (int j = 0; j < letters.Length; j++)
            {
                if (sentence[i] == letters[j])
                {
                    match = true;
                    break;
                }
            }
            if (!match)
            {
                sb.Append(sentence[i]);
            }
            match = false;
        }
        return sb.ToString();
    }

    private static string CapitalizeWords(char[] delimiters, 
                   string sentence, char newDelimiter)
    {
        string[] words = sentence.Split(delimiters);
        var sb = new StringBuilder();
        for (int i = 0; i < words.Length; i++)
        {
            if (words[i].Length > 1)
            {
                sb.Append(words[i][0].ToString().ToUpper());
                sb.Append(words[i].Substring(1).ToLower());
            }
            else
            {
                // Word with just 1 letter must be lowercase
                sb.Append(words[i].ToLower());
            }
            sb.Append(newDelimiter);
        }
        return sb.ToString();
    }

    private const int NUM_SENTENCES = 2000000;
    private static ConcurrentBag<string> _sentencesBag;
    private static ConcurrentBag<string> _capWordsInSentencesBag;
    private static ConcurrentBag<string> _finalSentencesBag;
    private static volatile bool _producingSentences = false;
    private static volatile bool _capitalizingWords = false;

    private static void ProduceSentences()
    {

        string[] possibleSentences = 
        { 
           "ConcurrentBag is included in the System.Collections.Concurrent namespace.",
           "Is parallelism important for cloud-computing?",
           "Parallelism is very important for cloud-computing!",
           "ConcurrentQueue is one of the new concurrent " + 
               "collections added in .NET Framework 4",
           "ConcurrentStack is a concurrent collection that represents a LIFO collection",
           "ConcurrentQueue is a concurrent collection that represents a FIFO collection" 
        };

        try
        {
            var rnd = new Random();

            for (int i = 0; i < NUM_SENTENCES; i++)
            {
                var sb = new StringBuilder();
                for (int j = 0; j < possibleSentences.Length; j++)
                {
                    if (rnd.Next(2) > 0)
                    {
                        sb.Append(possibleSentences[rnd.Next(possibleSentences.Length)]);
                        sb.Append(' ');
                    }
                }
                if (rnd.Next(20) > 15)
                {
                    _sentencesBag.Add(sb.ToString());
                }
                else
                {
                    _sentencesBag.Add(sb.ToString().ToUpper());
                }
            }
        }
        finally
        {
            _producingSentences = false;
        }
    }

    private static void CapitalizeWordsInSentences()
    {
        char[] delimiterChars = { ' ', ',', '.', ':', ';', '(', ')', 
               '[', ']', '{', '}', '/', '?', '@', '\t', '"' };

        // Start after Produce sentences began working
        System.Threading.SpinWait.SpinUntil(() => _producingSentences);

        try
        {
            _capitalizingWords = true;
            // This condition running in a loop (spinning) is very inefficient
            // This example uses this spinning for educational purposes
            // It isn't a best practice
            // Subsequent sections and chapters explain an improved version
            while ((!_sentencesBag.IsEmpty) || (_producingSentences))
            {
                string sentence;
                if (_sentencesBag.TryTake(out sentence))
                {
                    _capWordsInSentencesBag.Add(
                       CapitalizeWords(delimiterChars, sentence, '\\'));
                }
            }
        }
        finally
        {
            _capitalizingWords = false;
        }
    }

    private static void RemoveLettersInSentences()
    {
        char[] letterChars = { 'A', 'B', 'C', 'e', 'i', 'j', 'm', 'X', 'y', 'Z' };

        // Start after CapitalizedWordsInsentences began working
        System.Threading.SpinWait.SpinUntil(() => _capitalizingWords);
        // This condition running in a loop (spinning) is very inefficient
        // This example uses this spinning for educational purposes
        // It isn't a best practice
        // Subsequent sections and chapters explain an improved version
        while ((!_capWordsInSentencesBag.IsEmpty) || (_capitalizingWords))
        {
            string sentence;
            if (_capWordsInSentencesBag.TryTake(out sentence))
            {
                _finalSentencesBag.Add(RemoveLetters(letterChars, sentence));
            }
        }
    }

    static void Main(string[] args)
    {
        var sw = Stopwatch.StartNew();

        _sentencesBag = new ConcurrentBag<string>();
        _capWordsInSentencesBag = new ConcurrentBag<string>();
        _finalSentencesBag = new ConcurrentBag<string>();

        
        _producingSentences = true;

        Parallel.Invoke(
            () => ProduceSentences(),
            () => CapitalizeWordsInSentences(),
            () => RemoveLettersInSentences()
            );

        Console.WriteLine(
            "Number of sentences with capitalized words in the bag: {0}", 
            _capWordsInSentencesBag.Count);
        Console.WriteLine(
            "Number of sentences with removed letters in the bag: {0}",
            _finalSentencesBag.Count);

        Console.WriteLine(sw.Elapsed.ToString());
        //  Display the results and wait for the user to press a key
        Console.WriteLine("Finished!");
        Console.ReadLine();
    }
}

This code compiles quickly, but the program takes a while to run (a little over a minute in the run shown below). It yields the following:

Number of sentences with capitalized words in the bag: 0
Number of sentences with removed letters in the bag: 2000000
00:01:04.2583357
Finished!

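The first count is 0 because RemoveLettersInSentences() has taken every capitalized sentence out of the intermediate bag by the time the program ends. The three methods executed by Parallel.Invoke() form a chain of producer/consumer relationships: ProduceSentences() feeds CapitalizeWordsInSentences(), which in turn feeds RemoveLettersInSentences():

[Figure: 1.jpg, the producer/consumer relationship between the three methods]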
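
The comments in the referenced code point out that its spinning loops are not a best practice. One possible way to avoid them, sketched here under the assumption that each stage can simply block while it waits, is to wrap each ConcurrentBag in a BlockingCollection. This is not the improved version from the book. The class BagPipelineSketch, the method RunPipeline, and the simplified stage bodies are invented for this illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BagPipelineSketch {

    // Runs a three-stage pipeline and returns the number of finished sentences.
    public static int RunPipeline(int sentenceCount) {
        // BlockingCollection adds blocking and completion on top of the bags.
        var sentences = new BlockingCollection<string>(new ConcurrentBag<string>());
        var upperCased = new BlockingCollection<string>(new ConcurrentBag<string>());
        var finalSentences = new ConcurrentBag<string>();

        Parallel.Invoke(
            () => {
                // Stage 1: produce sentences, then signal completion.
                try {
                    for (int i = 0; i < sentenceCount; i++) {
                        sentences.Add("sentence number " + i);
                    }
                } finally {
                    sentences.CompleteAdding();
                }
            },
            () => {
                // Stage 2: blocks until items arrive; ends when stage 1 completes.
                try {
                    foreach (string s in sentences.GetConsumingEnumerable()) {
                        upperCased.Add(s.ToUpper());
                    }
                } finally {
                    upperCased.CompleteAdding();
                }
            },
            () => {
                // Stage 3: strips the letter E from each upper-cased sentence.
                foreach (string s in upperCased.GetConsumingEnumerable()) {
                    finalSentences.Add(s.Replace("E", ""));
                }
            });

        sentences.Dispose();
        upperCased.Dispose();
        return finalSentences.Count;
    }

    public static void Main() {
        Console.WriteLine("Finished sentences: {0}", RunPipeline(1000)); // 1000
    }
}
```

The CompleteAdding() calls take over the role of the volatile flags in the original, and because the collections are unbounded the pipeline still finishes even if Parallel.Invoke() happens to run the stages one after another.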

References

  • CLR via C#, 3rd Edition, Jeffrey Richter
  • Professional Parallel Programming with C#, Gaston C. Hillar, Wrox

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)