Task Parallel Library: 5 of n

10 May 2011
A look into using the Task Parallel Library.

Demo code source : TasksArticle5.zip

Introduction

This is the 5th part of my proposed series of articles on TPL. Last time I introduced Parallel LINQ (PLINQ), and covered this ground:

  • Introduction To PLinq
  • Useful Extension Methods
  • Simple PLinq Example
  • Ordering
  • Using Ranges
  • Handling Exceptions
  • Cancelling A PLinq Query
  • Partitioning For Possibly Better Performance
  • Using Custom Aggregation

This time we are going to look at some new classes that, although not what I would call officially part of TPL, are more than likely going to be used when working with TPL. These classes live in the System.Collections.Concurrent namespace, which was introduced in .NET 4. Here is a listing of the classes in that namespace:

Class Name | Description
BlockingCollection<T> | Provides blocking and bounding capabilities for thread-safe collections that implement IProducerConsumerCollection.
ConcurrentBag<T> | Represents a thread-safe, unordered collection of objects.
ConcurrentDictionary<TKey, TValue> | Represents a thread-safe collection of key-value pairs that can be accessed by multiple threads concurrently.
ConcurrentQueue<T> | Represents a thread-safe first in-first out (FIFO) collection.
ConcurrentStack<T> | Represents a thread-safe last in-first out (LIFO) collection.
OrderablePartitioner<TSource> | Represents a particular manner of splitting an orderable data source into multiple partitions.
Partitioner | Provides common partitioning strategies for arrays, lists, and enumerables.
Partitioner<TSource> | Represents a particular manner of splitting a data source into multiple partitions.

As can be seen from this table, we have seen a few of these classes before, such as OrderablePartitioner<TSource> and Partitioner<TSource>/Partitioner.

 

 

Article Series Roadmap

This is article 5 of a possible 6, which I hope people will like. Shown below is the rough outline of what I would like to cover.

  1. Starting Tasks / Trigger Operations / Exception Handling / Cancelling / UI Synchronization
  2. Continuations / Cancelling Chained Tasks
  3. Parallel For / Custom Partitioner / Aggregate Operations
  4. Parallel LINQ
  5. Pipelines (this article)
  6. Advanced Scenarios / v.Next for Tasks

Now I am aware that some folk will simply read this article and state that it is similar to what is currently available on MSDN, and I in part agree with that. However, there are several reasons I have chosen to still take on the task of writing up these articles, which are as follows:

  • It will only really be the first couple of articles which show similar ideas to MSDN. After that I feel the material I will get into will not be on MSDN, and will be the result of some TPL research on my behalf, which I will be outlining in the article(s), so you will benefit from my research which you can just read...Aye, nice
  • There will be screen shots of live output here, which is something MSDN does not have that much of, and which may help some readers to reinforce the article(s) text
  • There may be some readers out there that have never even heard of Task Parallel Library so would not come across it in MSDN. You know the old story: you have to know what you are looking for in the 1st place.
  • I enjoy threading articles, so like doing them, so I did them, will do them, have done them, and continue to do them

All that said, if people, having read this article, truly think this is too similar to MSDN (which I still hope it won't be), let me know that as well, and I will try and adjust the upcoming articles to make amends.

 

Table Of Contents 

As I said in the introduction, there are a few new classes that we can use in the System.Collections.Concurrent namespace. In this article I will be concentrating on the use of BlockingCollection<T>.

 

BlockingCollection

This is what MSDN has to say about BlockingCollection<T>:

The section immediately below is taken from http://msdn.microsoft.com/en-us/library/dd997371.aspx as of 25/03/2011.

BlockingCollection<T> is a thread-safe collection class that provides the following features:

  • An implementation of the Producer-Consumer pattern.
  • Concurrent adding and taking of items from multiple threads.
  • Optional maximum capacity.
  • Insertion and removal operations that block when collection is empty or full.
  • Insertion and removal "try" operations that do not block or that block up to a specified period of time.
  • Encapsulates any collection type that implements IProducerConsumerCollection<T>
  • Cancellation with cancellation tokens.
  • Two kinds of enumeration with foreach (For Each in Visual Basic):
    • Read-only enumeration.
    • Enumeration that removes items as they are enumerated.
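
To make a couple of those bullet points a bit more concrete, here is a minimal sketch of the optional maximum capacity and the timed "try" operations. The class and method names are made up for illustration and are not part of the demo download; the 50 ms timeout is an arbitrary choice.

```csharp
using System;
using System.Collections.Concurrent;

static class BlockingCollectionFeaturesSketch
{
    // Tries to add a second item to a collection bounded to a single item.
    public static bool TryAddToFullCollection()
    {
        using (var buffer = new BlockingCollection<int>(1)) // maximum capacity of 1
        {
            buffer.Add(1); // fills the only slot
            // Waits up to 50 ms for space, then gives up and returns false.
            return buffer.TryAdd(2, TimeSpan.FromMilliseconds(50));
        }
    }

    // Tries to take an item from a collection that has nothing in it.
    public static bool TryTakeFromEmptyCollection()
    {
        using (var buffer = new BlockingCollection<int>())
        {
            int item;
            // Waits up to 50 ms for an item, then gives up and returns false.
            return buffer.TryTake(out item, TimeSpan.FromMilliseconds(50));
        }
    }

    static void Main()
    {
        Console.WriteLine("TryAdd on a full collection: {0}", TryAddToFullCollection());
        Console.WriteLine("TryTake on an empty collection: {0}", TryTakeFromEmptyCollection());
    }
}
```

Had these been plain Add and Take calls, both would have blocked instead of returning false, since nothing else is producing or consuming here.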

Using BlockingCollection<T> we can create what I will call "pipelines", which is where we have multiple phases, and each phase makes use of the Producer-Consumer pattern. So imagine we have a pipeline of 3 stages: the 1st stage would produce items, the 2nd stage would consume the 1st stage's output and also supply data for the 3rd stage, and the 3rd stage would consume what the 2nd stage produces...You get the idea, right?

So let's have a look at some example pipelines.

 

BlockingCollection Basics

Like I said, this article will concentrate on BlockingCollection<T>, which makes use of the Producer-Consumer pattern. So how does this happen? Let's have a look.

Producer

The producer is responsible for creating items, and should be the one that tells the consumer when it is done producing. A very simple example of this may be as follows:

static void CreateInitialRange(BlockingCollection<int> output)
{
    try
    {
        for (int i = 1; i < 10; i++)
        {
            output.Add(i);
        }
    }
    finally
    {
        output.CompleteAdding();
    }
}

 

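What we can see from the above code is that we add items to a BlockingCollection<T> using Add, which blocks if the collection is bounded and currently full. When we have finished producing we need to alert the consumer that there are no more items expected, which is done via the BlockingCollection<T>.CompleteAdding method. Calling it from a finally block means the consumer is told even if the producer throws an exception.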
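
To see why the finally block matters, here is a small sketch with a producer that deliberately fails part way through. FaultyProducer and Run are hypothetical names of my own and are not in the demo code; the point is only that the consumer still finishes, because CompleteAdding is called regardless of the failure.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class ProducerCompletionSketch
{
    // Hypothetical producer that fails part way through, but still completes the collection.
    static void FaultyProducer(BlockingCollection<int> output)
    {
        try
        {
            for (int i = 1; i < 10; i++)
            {
                if (i == 4) throw new InvalidOperationException("simulated failure");
                output.Add(i);
            }
        }
        finally
        {
            output.CompleteAdding(); // runs even though the loop threw
        }
    }

    // Returns the items the consumer received; producerFaulted reports whether the producer task failed.
    public static List<int> Run(out bool producerFaulted)
    {
        using (var buffer = new BlockingCollection<int>(10))
        {
            Task producer = Task.Factory.StartNew(() => FaultyProducer(buffer));

            // Blocks until CompleteAdding has been called and the buffer is drained.
            List<int> received = buffer.GetConsumingEnumerable().ToList();

            producerFaulted = false;
            try
            {
                producer.Wait();
            }
            catch (AggregateException)
            {
                producerFaulted = true; // the simulated failure surfaces here
            }
            return received;
        }
    }

    static void Main()
    {
        bool faulted;
        List<int> received = Run(out faulted);
        Console.WriteLine("Received {0} items, producer faulted: {1}", received.Count, faulted);
    }
}
```

Without the finally block, the ToList call in this sketch would wait forever, since nothing would ever tell the consumer that no more items are coming.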

Consumer

The consumer is obviously the one that is responsible for consuming the results from the producer, so how does it do that exactly? Well, it's actually quite simple. All we really need to do is call BlockingCollection<T>.GetConsumingEnumerable(), which the consumer can use to consume the producer's values. The enumeration ends once the producer has called CompleteAdding and the collection is empty. Here is a snippet that follows on from the snippet above:

static void DoubleTheRange(BlockingCollection<int> input, BlockingCollection<int> output)
{
    try
    {
        foreach (var number in input.GetConsumingEnumerable())
        {
            // Despite the method name, this squares each value rather than doubling it
            output.Add(number * number);
        }
    }
    finally
    {
        output.CompleteAdding();
    }
}
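
The MSDN list above mentions two kinds of enumeration, and the difference is easy to miss. The following is a rough sketch (EnumerationSketch is a name I made up for illustration) comparing a plain foreach over the collection with GetConsumingEnumerable().

```csharp
using System;
using System.Collections.Concurrent;

static class EnumerationSketch
{
    // Returns { items left after a plain foreach, items left after GetConsumingEnumerable }.
    public static int[] CountsAfterEnumeration()
    {
        using (var input = new BlockingCollection<int>())
        {
            input.Add(1);
            input.Add(2);
            input.Add(3);
            input.CompleteAdding(); // without this the consuming loop below would wait forever

            foreach (int number in input) // enumerates a snapshot, removes nothing
            {
                Console.WriteLine("Snapshot saw {0}", number);
            }
            int leftAfterSnapshot = input.Count;

            foreach (int number in input.GetConsumingEnumerable()) // removes each item as it is read
            {
                Console.WriteLine("Consumer took {0}", number);
            }
            int leftAfterConsuming = input.Count;

            return new[] { leftAfterSnapshot, leftAfterConsuming };
        }
    }

    static void Main()
    {
        int[] counts = CountsAfterEnumeration();
        Console.WriteLine("Left after plain foreach: {0}", counts[0]);
        Console.WriteLine("Left after GetConsumingEnumerable: {0}", counts[1]);
    }
}
```

For a pipeline stage you almost always want the consuming version, since the plain foreach neither removes items nor waits for new ones.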

 

Simple Pipeline

Demo project name : SimplePipeline

So now that I have talked about BlockingCollection<T> a bit, let's have a look at how we can chain these producer-consumer collections together to form something more meaningful (which I call a pipeline).

Here is the full code listing for a simple console app that carries out the following pipeline:

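  • Creates an initial range of ints (produce phase)
  • Squares each int from the initial range (the method is called DoubleTheRange, but it multiplies each number by itself); this phase consumes from the producer and also acts as producer for the next phase
  • Writes out the results produced by the squaring phase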
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var buffer1 = new BlockingCollection<int>(10);
        var buffer2 = new BlockingCollection<int>(10);

        var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                TaskContinuationOptions.None);

        //Start the phases of the pipeline
        var stage1 = f.StartNew(() => CreateInitialRange(buffer1));
        var stage2 = f.StartNew(() => DoubleTheRange(buffer1, buffer2));
        var stage3 = f.StartNew(() => WriteResults(buffer2));
        //wait for the phases to complete
        Task.WaitAll(stage1, stage2, stage3);

        Console.ReadLine();
    }



    static void CreateInitialRange(BlockingCollection<int> output)
    {
        try
        {
            for (int i = 1; i < 10; i++)
            {
                output.Add(i);
                Console.WriteLine("CreateInitialRange {0}", i);
            }
        }
        finally
        {
            output.CompleteAdding();
        }
    }


    static void DoubleTheRange(
		BlockingCollection<int> input, 
		BlockingCollection<int> output)
    {
        try
        {
            foreach (var number in input.GetConsumingEnumerable())
            {
                // Despite the method name, this squares each value rather than doubling it
                output.Add(number * number);
            }
        }
        finally
        {
            output.CompleteAdding();
        }
    }


    static void WriteResults(BlockingCollection<int> input)
    {
        foreach (var squaredNumber in input.GetConsumingEnumerable())
        {
            Console.WriteLine("Result is {0}", squaredNumber);
        }
    }

}

And here is the result of running this code:

 

More Complex Pipeline

Demo project name : WPFImagePipeline

I have also included a more complicated pipeline that acts pretty much the same as the example above, but uses a WPF application to show the results of the final consumer. I think it may be easier for people to grasp what is going on with this demo.

I should point out that this demo tries to look for images, which can either be on your local file system, or web based. The type of pipeline that is created depends on the UseWebBasedImages and LocalImageFolder settings in App.Config.

If you are NOT connected to the internet, just make sure that the LocalImageFolder points to a valid image path on your PC.

I don't want to go into this code too much, as it is not that relevant to the discussion, but I think a general understanding of how this demo works may be beneficial, so here we go.

 

The View

There is a single window, which contains the following XAML (which I ummed and ahhhed about showing, but in the end I decided to show it, as there is a useful threading control included with the article that requires some XAML to make it work)

<controls:AsyncHost AsyncState="{Binding Path=AsyncState, Mode=OneWay}">

    <Grid x:Name="mainGrid" Background="White" Margin="5"
            controls:AsyncHost.AsyncContentType="Content">


        <ListBox ItemsSource="{Binding ProcessedImages}" 
            ItemTemplate="{StaticResource ImageInfoDataTemplate}" 
            BorderThickness="0"
            BorderBrush="Transparent"
            ItemContainerStyle="{StaticResource ImageInfoListBoxItemStyle}">
            <ListBox.ItemsPanel>
                <ItemsPanelTemplate>
                    <controls:ScatterPanel Background="White">
                    </controls:ScatterPanel>
                </ItemsPanelTemplate>
            </ListBox.ItemsPanel>

        </ListBox>
    </Grid>
    <controls:AsyncBusyUserControl 
        controls:AsyncHost.AsyncContentType="Busy" 
        AsyncWaitText="{Binding Path=WaitText, Mode=OneWay}" 
        Visibility="Hidden" />
    <controls:AsyncFailedUserControl 
        controls:AsyncHost.AsyncContentType="Error" 
        Error="{Binding Path=ErrorMessage, Mode=OneWay}" 
        Visibility="Hidden" />
</controls:AsyncHost>

The general idea is that the main window simply shows a list of images within a ListBox, where the images could have been fetched via the pipeline from a local hard drive (see App.Config) or via the use of a random Google search.

The threading component helper is called AsyncHost and is bound to an AsyncState property exposed via the ViewModel in use for this Window. Depending on the value of that bound property, the AsyncHost will show one of three things:

  1. AsyncState = Content, will show the element marked up with the attached property set to "Content", which in this example is the Grid containing the ListBox
  2. AsyncState = Busy, will show the element marked up with the attached property set to "Busy", which in this example is a custom control called AsyncBusyUserControl
  3. AsyncState = Error, will show the element marked up with the attached property set to "Error", which in this example is a custom control called AsyncFailedUserControl

All of these controls are within the download. Feel free to look into those controls at your leisure; they are very useful and I find myself using them a lot in any WPF UI project when I am doing long running tasks that could also fail.

So that is the relevant XAML

 

The ViewModel

So now that we have seen the most relevant parts of the view, let's see the view model, which looks like this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using WPFImagePipeline.Common;
using WPFImagePipeline.Services;
using WPFImagePipeline.Model;
using System.Configuration;
using System.IO;
using WPFImagePipeline.Controls;

namespace WPFImagePipeline.ViewModels
{
    public class MainWindowViewModel : INPCBase
    {
        private ILocalImagePipelineService localImagePipelineService;
        private IGoogleImagePipeLineService googleImagePipelineService;
        private IMessageBoxService messageBoxService;
        private List<ImageInfo> processedImages = new List<ImageInfo>();
        private bool useWebBasedImages = false;
        private string localImageFolder = "";
        private string defaultImagePath = @"C:\Users\Public\Pictures\Sample Pictures";


        private string waitText;
        private string errorMessage;
        private AsyncType asyncState = AsyncType.Content;


        public MainWindowViewModel(
            ILocalImagePipelineService imagePipelineService, 
            IGoogleImagePipeLineService googleSearchProvider,
            IMessageBoxService messageBoxService)
        {
            this.localImagePipelineService = imagePipelineService;
            this.googleImagePipelineService = googleSearchProvider;
            this.messageBoxService = messageBoxService;

            imagePipelineService.PipeLineCompleted += ImagePipelineService_PipeLineCompleted;
            googleSearchProvider.PipeLineCompleted += GooglePipelineService_PipeLineCompleted;

            AsyncState = AsyncType.Content;
            WaitText ="Fetching images";

        }




        public void DoIt()
        {
            AsyncState = AsyncType.Busy;
            bool result=false;
            // AppSettings returns null for a missing key; TryParse copes with null by returning false
            if (Boolean.TryParse(
                ConfigurationManager.AppSettings["UseWebBasedImages"], 
                out useWebBasedImages))
            {
                if (useWebBasedImages)
                {
                    googleImagePipelineService.StartPipeline();
                }
                else
                {
                    ShowUsingLocalImages();
                }
            }
            else
            {
                ShowUsingLocalImages();   
            }
        }


        public List<ImageInfo> ProcessedImages
        {
            get { return processedImages; }
            set
            {
                if (processedImages != value)
                {
                    processedImages = value;
                    NotifyPropertyChanged("ProcessedImages");
                }
            }
        }


        public AsyncType AsyncState
        {
            get { return asyncState; }
            set
            {
                if (asyncState != value)
                {
                    asyncState = value;
                    NotifyPropertyChanged("AsyncState");
                }
            }
        }



        public string WaitText
        {
            get { return waitText; }
            set
            {
                if (waitText != value)
                {
                    waitText = value;
                    NotifyPropertyChanged("WaitText");
                }
            }
        }


        public string ErrorMessage
        {
            get { return errorMessage; }
            set
            {
                if (errorMessage != value)
                {
                    errorMessage = value;
                    NotifyPropertyChanged("ErrorMessage");
                }
            }
        }




        private void ShowUsingLocalImages()
        {
            // AppSettings returns null for a missing key, which the check below treats as "not set"
            localImageFolder = ConfigurationManager.AppSettings["LocalImageFolder"];
            if (!String.IsNullOrEmpty(localImageFolder))
            {
                if (Directory.Exists(localImageFolder))
                {
                    localImagePipelineService.StartPipeline(localImageFolder);
                }
                else
                {
                    messageBoxService.ShowMessage(
			"The LocalImageFolder folder you specified does not exist");
                    // No pipeline was started, so leave the busy state
                    AsyncState = AsyncType.Content;
                }
            }
            else
            {
                // No folder configured, so fall back to the default sample pictures folder
                localImagePipelineService.StartPipeline(defaultImagePath);
            }
        }


        private void ImagePipelineService_PipeLineCompleted(object sender, ImagePipelineCompletedArgs e)
        {
            ProcessedImages = e.GatheredImages;
            AsyncState = AsyncType.Content;
        }

        private void GooglePipelineService_PipeLineCompleted(object sender, ImagePipelineCompletedArgs e)
        {
            ProcessedImages = e.GatheredImages;
            AsyncState = AsyncType.Content;
        }
       

    }
}

The important thing to note is that the MainWindowViewModel makes use of 2 services, one for a local image pipeline and one for a Google image pipeline. These 2 services are shown below. The one that is used depends on the "UseWebBasedImages" setting in the App.Config.
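
One detail of that settings lookup is worth a quick sketch: ConfigurationManager.AppSettings is a NameValueCollection, and its indexer returns null when a key is missing. The snippet below uses a plain NameValueCollection as a stand-in so it can run without a config file. SettingsSketch and ReadFlag are hypothetical names and are not part of the demo.

```csharp
using System;
using System.Collections.Specialized;

static class SettingsSketch
{
    // Reads a boolean flag, treating a missing or malformed value as false.
    public static bool ReadFlag(NameValueCollection settings, string key)
    {
        bool value;
        // settings[key] is null when the key is absent; TryParse then returns false.
        return Boolean.TryParse(settings[key], out value) && value;
    }

    static void Main()
    {
        // Stand-in for ConfigurationManager.AppSettings, which is also a NameValueCollection.
        var settings = new NameValueCollection();
        settings["UseWebBasedImages"] = "true";

        Console.WriteLine(ReadFlag(settings, "UseWebBasedImages"));
        Console.WriteLine(ReadFlag(settings, "MissingKey"));
    }
}
```

Treating a missing key the same as "false" matches what the view model is aiming for, which is to fall back to the local image pipeline.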

 

LocalImagePipelineService : Local pipeline

This service runs the local image search pipeline. The idea is that it raises a PipeLineCompleted event when the pipeline has finished, which the MainWindowViewModel listens to. The view model then grabs the images and sets them on its ProcessedImages property, which updates the binding that the ListBox in the XAML is listening to, and bingo, images will be shown.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using WPFImagePipeline.Model;
using System.Threading.Tasks;
using System.IO;

namespace WPFImagePipeline.Services
{

    public class ImagePipelineCompletedArgs : EventArgs
    {
        public List<ImageInfo> GatheredImages { get; private set; }

        public ImagePipelineCompletedArgs(List<ImageInfo> gatheredImages)
        {
            this.GatheredImages = gatheredImages;
        }
    }


    public interface ILocalImagePipelineService
    {
        void StartPipeline(string yourImageFolder);
        event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;

    }



    public class LocalImagePipelineService : ILocalImagePipelineService
    {
        private Object locker = new Object();


        private FileInfo[] GetAllMatchingImageFiles(string yourImageFolder)
        {
            string lookfor = "*.png;*.jpg;*.gif;*.tif";
            string[] extensions = lookfor.Split(new char[] { ';' });

            List<FileInfo> myfileinfos = new List<FileInfo>();
            DirectoryInfo di = new DirectoryInfo(yourImageFolder);

            foreach (string ext in extensions)
            {
                myfileinfos.AddRange(di.GetFiles(ext));
            }

            return myfileinfos.ToArray();
        }


        private void CreateImageUrls(string yourImageFolder, BlockingCollection<string> urls)
        {
            try
            {
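                FileInfo[] files = GetAllMatchingImageFiles(yourImageFolder);
                if (files.Length == 0)
                {
                    // Nothing to pick from; the finally block still completes the collection
                    return;
                }

                // Pick 100 files at random (the same file may be picked more than once)
                Random rand = new Random();
                int added = 0;
                do
                {
                    int idx = rand.Next(0, files.Length);
                    urls.Add(files[idx].FullName);
                    ++added;
                } while (added < 100);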
            }
            finally
            {
                urls.CompleteAdding();
            }
        }

        private void CreateImageInfos(BlockingCollection<string> urls, 
            BlockingCollection<ImageInfo> initialImageInfos)
        {
            try
            {
                foreach (string url in urls.GetConsumingEnumerable())
                {
                    int idx = url.LastIndexOf(@"\") + 1;
                    initialImageInfos.Add(new ImageInfo(url, url.Substring(idx,url.Length-idx)));
                }
            }
            finally
            {
                initialImageInfos.CompleteAdding();
            }
        }


        private void AlertViewModel(BlockingCollection<ImageInfo> initialImageInfos)
        {
            List<ImageInfo> localInfos = new List<ImageInfo>();

            try
            {
                foreach (ImageInfo imageInfo in initialImageInfos.GetConsumingEnumerable())
                {
                    lock (locker)
                    {
                        localInfos.Add(imageInfo);
                    }
                }
            }
            finally
            {
                OnPipeLineCompleted(new ImagePipelineCompletedArgs(localInfos));
            }
        }



        #region IImagePipelineService Members

        public void StartPipeline(string yourImageFolder)
        {
            BlockingCollection<string> buffer1 = new BlockingCollection<string>(100);
            BlockingCollection<ImageInfo> buffer2 = new BlockingCollection<ImageInfo>(100);

            TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning, 
                TaskContinuationOptions.None);

            Task stage1 = factory.StartNew(() => CreateImageUrls(yourImageFolder,buffer1));
            Task stage2 = factory.StartNew(() => CreateImageInfos(buffer1, buffer2));
            Task stage3 = factory.StartNew(() => AlertViewModel(buffer2));

        }


        public event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;

        protected virtual void OnPipeLineCompleted(ImagePipelineCompletedArgs e)
        {
            if (PipeLineCompleted != null)
            {
                PipeLineCompleted(this, e);
            }
        }
        #endregion
    }
}

GoogleImagePipelineService : Web pipeline

This service runs the web image search pipeline using a freely available .NET Google search API DLL. As before, the idea is that it raises a PipeLineCompleted event when the pipeline has finished, which the MainWindowViewModel listens to. The view model then grabs the images and sets them on its ProcessedImages property, which updates the binding that the ListBox in the XAML is listening to, and bingo, images will be shown.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using WPFImagePipeline.Model;
using System.Threading;
using System.Threading.Tasks;
using Gapi.Search;
using System.Collections.Concurrent;

namespace WPFImagePipeline.Services
{
    #region IGoogleSearchProvider
    public interface IGoogleImagePipeLineService
    {
        void StartPipeline();
        event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
    }
    #endregion

    #region GoogleSearchProvider
    public class GoogleImagePipeLineService : IGoogleImagePipeLineService
    {

        private List<string> randomKeyWords = new List<string>() 
            {   "pitbull", "shark", "dog", "parrot", "robot", 
                "cheerleader", "gun", "skull", "plane", "manga", 
                "bikini","model","snake","spider" 
            };
        private Random rand = new Random();
        private List<string> urls = new List<string>();
        private Object locker = new Object();



        private void CreateImageUrls(BlockingCollection<string> urls)
        {
            try
            {
                int added = 0;

                do
                {
                    string keyword = randomKeyWords[rand.Next(0, randomKeyWords.Count)];
                    SearchResults searchResults = Searcher.Search(SearchType.Image, keyword);

                    if (searchResults.Items.Count() > 0)
                    {
                        foreach (var searchResult in searchResults.Items)
                        {
                            urls.Add(searchResult.Url);
                            ++added;
                        }
                    }
                } while (added < 100);


            }
            finally
            {
                urls.CompleteAdding();
            }
        }

        private void CreateImageInfos(BlockingCollection<string> urls,
            BlockingCollection<ImageInfo> initialImageInfos)
        {
            try
            {
                foreach (string url in urls.GetConsumingEnumerable())
                {
                    // Web URLs are separated by forward slashes, not backslashes
                    int idx = url.LastIndexOf('/') + 1;
                    initialImageInfos.Add(new ImageInfo(url, url.Substring(idx, url.Length - idx)));
                }
            }
            finally
            {
                initialImageInfos.CompleteAdding();
            }
        }


        private void AlertViewModel(BlockingCollection<ImageInfo> initialImageInfos)
        {
            List<ImageInfo> localInfos = new List<ImageInfo>();

            try
            {
                foreach (ImageInfo imageInfo in initialImageInfos.GetConsumingEnumerable())
                {
                    lock (locker)
                    {
                        localInfos.Add(imageInfo);
                    }
                }
            }
            finally
            {
                OnPipeLineCompleted(new ImagePipelineCompletedArgs(localInfos));
            }
        }




        #region IImagePipelineService Members

        public void StartPipeline()
        {
            BlockingCollection<string> buffer1 = new BlockingCollection<string>(100);
            BlockingCollection<ImageInfo> buffer2 = new BlockingCollection<ImageInfo>(100);

            TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning,
                TaskContinuationOptions.None);

            Task stage1 = factory.StartNew(() => CreateImageUrls(buffer1));
            Task stage2 = factory.StartNew(() => CreateImageInfos(buffer1, buffer2));
            Task stage3 = factory.StartNew(() => AlertViewModel(buffer2));

        }


        public event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;

        protected virtual void OnPipeLineCompleted(ImagePipelineCompletedArgs e)
        {
            if (PipeLineCompleted != null)
            {
                PipeLineCompleted(this, e);
            }
        }
        #endregion
    }
    #endregion

}

As can be seen, both of these services use TPL concepts you have seen before, as well as the BlockingCollection<T> techniques from this article.
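
Neither service offers a way to stop a pipeline once it has started. The cancellation support mentioned in the MSDN feature list could cover that, and below is a rough sketch of how it might look. This is not how the demo code works: CancellablePipelineSketch and its Run method are names I made up, and the "cancel after N items" rule is just a convenient way to trigger cancellation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class CancellablePipelineSketch
{
    // Runs an endless producer and a consumer that cancels the pipeline after `limit` items.
    // Returns how many items the consumer processed.
    public static int Run(int limit)
    {
        using (var cts = new CancellationTokenSource())
        using (var buffer = new BlockingCollection<int>(10))
        {
            CancellationToken token = cts.Token;
            int consumed = 0;

            Task producer = Task.Factory.StartNew(() =>
            {
                try
                {
                    for (int i = 1; ; i++)
                    {
                        buffer.Add(i, token); // throws OperationCanceledException once cancelled
                    }
                }
                catch (OperationCanceledException)
                {
                    // Expected when the pipeline is cancelled
                }
                finally
                {
                    buffer.CompleteAdding();
                }
            }, TaskCreationOptions.LongRunning);

            Task consumer = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (int number in buffer.GetConsumingEnumerable(token))
                    {
                        consumed++;
                        if (consumed == limit)
                        {
                            cts.Cancel(); // ask every stage to stop
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Expected when the pipeline is cancelled
                }
            }, TaskCreationOptions.LongRunning);

            Task.WaitAll(producer, consumer);
            return consumed;
        }
    }

    static void Main()
    {
        Console.WriteLine("Consumed {0} items before cancelling", Run(25));
    }
}
```

Passing the same token to both Add and GetConsumingEnumerable means a stage that is blocked on a full or empty buffer wakes up as soon as cancellation is requested, rather than waiting for data that will never arrive.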

 

What Does It Look Like

Here is what is produced at the end of the pipeline, which is a surface-like panel for WPF, making use of my work colleague's SurfacePanel for WPF. This is possible since I simply swapped out the standard WPF ListBox panel to use my work colleague's panel. So yeah, what you are seeing below is just a ListBox really.

The code is the place to look to see how this all works.

 

 

 

That's It For Now

That is all I wanted to say in this article. I hope you liked it, and want more. If you did like this article, and would like more, could you spare some time to leave a comment and a vote? Many thanks.

Hopefully, see you at the next one.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
