Demo code source : TasksArticle5.zip
Introduction
This is the 5th part of my proposed series of articles on TPL. Last time I
introduced Parallel LINQ (PLinq), and covered this ground:
- Introduction To PLinq
- Useful Extension Methods
- Simple PLinq Example
- Ordering
- Using Ranges
- Handling Exceptions
- Cancelling A PLinq Query
- Partitioning For Possibly Better Performance
- Using Custom Aggregation
This time we are going to look at how to use some new classes that, although
not what I would call officially part of TPL, are more than likely going to be
used when working with TPL. These classes live in the
System.Collections.Concurrent namespace, which was introduced in .NET 4. Here is
a listing of the classes in that namespace:
Class Name | Description
BlockingCollection<T> | Provides blocking and bounding capabilities for thread-safe collections that implement IProducerConsumerCollection<T>.
ConcurrentBag<T> | Represents a thread-safe, unordered collection of objects.
ConcurrentDictionary<TKey, TValue> | Represents a thread-safe collection of key-value pairs that can be accessed by multiple threads concurrently.
ConcurrentQueue<T> | Represents a thread-safe first in-first out (FIFO) collection.
ConcurrentStack<T> | Represents a thread-safe last in-first out (LIFO) collection.
OrderablePartitioner<TSource> | Represents a particular manner of splitting an orderable data source into multiple partitions.
Partitioner | Provides common partitioning strategies for arrays, lists, and enumerables.
Partitioner<TSource> | Represents a particular manner of splitting a data source into multiple partitions.
As can be seen from this table, we have already met a few of these classes,
such as OrderablePartitioner<TSource> and Partitioner<TSource>/Partitioner.
Article Series Roadmap
This is article 5 of a possible 6, which I hope people will like. Shown below
is the rough outline of what I would like to cover.
- Starting Tasks / Trigger Operations / Exception Handling / Cancelling /
UI Synchronization
- Continuations / Cancelling Chained Tasks
- Parallel For / Custom Partitioner / Aggregate Operations
- Parallel LINQ
- Pipelines (this article)
- Advanced Scenarios / v.Next for Tasks
Now I am aware that some folk will simply read this article and state that it
is similar to what is currently available on MSDN, and in part I agree with
that. However, there are several reasons I have still chosen to take on the
task of writing up these articles, which are as follows:
- It will only really be the first couple of articles which show similar
ideas to MSDN. After that I feel the material I will get into will not be on
MSDN, and will be the result of some TPL research on my part, which I will be
outlining in the article(s), so you will benefit from my research which you
can just read...Aye, nice
- There will be screen shots of live output here, which is something MSDN
does not have that much of, and which may help some readers to reinforce the
article(s) text
- There may be some readers out there who have never even heard of the Task
Parallel Library, so would not come across it in MSDN; you know the old
story, you have to know what you are looking for in the 1st place.
- I enjoy threading articles, so like doing them, so I did them, will do
them, have done them, and continue to do them
All that said, if people who have read this article truly think it is too similar to MSDN
(which I still hope it won't be), let me know that as well, and I will try to
adjust the upcoming articles to make amends.
Like I said in the introduction to this article, there are a few new classes
that we can use in the System.Collections.Concurrent namespace. In this article
I will be concentrating on the use of BlockingCollection<T>.
This is what MSDN has to say about BlockingCollection<T>. The section
immediately below is taken from
http://msdn.microsoft.com/en-us/library/dd997371.aspx on 25/03/2011:
BlockingCollection<T> is a thread-safe collection class that provides the
following features:
- An implementation of the Producer-Consumer pattern.
- Concurrent adding and taking of items from multiple threads.
- Optional maximum capacity.
- Insertion and removal operations that block when collection is empty or full.
- Insertion and removal "try" operations that do not block or that block up to a specified period of time.
- Encapsulates any collection type that implements IProducerConsumerCollection<T>
- Cancellation with cancellation tokens.
- Two kinds of enumeration with foreach (For Each in Visual Basic):
  - Read-only enumeration.
  - Enumeration that removes items as they are enumerated.
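To make the "optional maximum capacity", "try" operations and IProducerConsumerCollection<T> points from that list a little more concrete, here is a minimal sketch. BoundedStackDemo and its methods are hypothetical names of my own, not part of the framework. It wraps a ConcurrentStack<int> in a BlockingCollection<int> bounded to a given capacity, and uses TryAdd with a millisecond timeout so that a producer gives up rather than blocking forever when the collection is full.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper class, for illustration only.
public static class BoundedStackDemo
{
    // Wraps a ConcurrentStack<int> (LIFO) in a BlockingCollection<int>
    // that holds at most 'capacity' items at any one time.
    public static BlockingCollection<int> CreateBoundedStack(int capacity)
    {
        return new BlockingCollection<int>(new ConcurrentStack<int>(), capacity);
    }

    // Tries to add each value, waiting at most timeoutMs per value while the
    // collection is full. Returns how many values were actually added.
    public static int TryAddAll(BlockingCollection<int> collection, int[] values, int timeoutMs)
    {
        int added = 0;
        foreach (int value in values)
        {
            if (collection.TryAdd(value, timeoutMs))
            {
                added++;
            }
        }
        return added;
    }
}
```

For example, with a capacity of 2, TryAddAll(stack, new[] { 1, 2, 3 }, 50) should add only the first two values (the third attempt times out after roughly 50ms), and because the underlying store is a stack, the next Take() returns 2 rather than 1.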
Using BlockingCollection<T> we can create what I will call "pipelines": a
series of phases where each phase makes use of the Producer-Consumer pattern.
So imagine we have a pipeline of 3 stages: the 1st stage produces items that
the 2nd stage consumes, and the 2nd stage, while consuming the 1st stage's
output, also produces the data for the 3rd stage...You get the idea, right?
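Since every middle stage has the same shape (consume from one collection, transform, add to the next, then call CompleteAdding), it can be written once as a generic method. The sketch below is my own illustration, assuming .NET 4 or later; PipelineSketch and RunStage are hypothetical names. It also passes a CancellationToken to GetConsumingEnumerable and Add, which is how the "cancellation with cancellation tokens" feature from the MSDN list plugs into a stage.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class PipelineSketch
{
    // A generic middle stage: consume from 'input', transform each item,
    // add the result to 'output', and always complete 'output' at the end.
    public static void RunStage<TIn, TOut>(
        BlockingCollection<TIn> input,
        BlockingCollection<TOut> output,
        Func<TIn, TOut> transform,
        CancellationToken token)
    {
        try
        {
            // Blocks while input is empty; ends when input is completed and drained.
            // Throws OperationCanceledException if 'token' is cancelled while waiting.
            foreach (TIn item in input.GetConsumingEnumerable(token))
            {
                output.Add(transform(item), token);
            }
        }
        finally
        {
            output.CompleteAdding();
        }
    }

    // Three stages: produce 1..count, add 1 to each, then collect the results.
    public static int[] Run(int count)
    {
        var source = new BlockingCollection<int>(5);
        var incremented = new BlockingCollection<int>(5);
        var factory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

        using (var cts = new CancellationTokenSource()) // cts.Cancel() would stop every stage
        {
            Task producer = factory.StartNew(() =>
            {
                try
                {
                    for (int i = 1; i <= count; i++)
                    {
                        source.Add(i, cts.Token);
                    }
                }
                finally
                {
                    source.CompleteAdding();
                }
            });
            Task middle = factory.StartNew(() => RunStage(source, incremented, x => x + 1, cts.Token));
            Task<int[]> consumer = factory.StartNew(
                () => incremented.GetConsumingEnumerable(cts.Token).ToArray());

            Task.WaitAll(producer, middle, consumer);
            return consumer.Result;
        }
    }
}
```

PipelineSketch.Run(5) should return 2, 3, 4, 5, 6 in that order: with one task per stage and the default FIFO (ConcurrentQueue based) buffers, ordering is preserved. If the token were cancelled, the blocked calls would throw OperationCanceledException, which Task.WaitAll surfaces wrapped in an AggregateException.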
So let's have a look at some example pipelines.
Like I said, this article will concentrate on BlockingCollection<T>, which
makes use of the Producer-Consumer pattern. So how does this work? Let's have
a look.
Producer
The producer is responsible for creating items, and should be the one that
tells the consumer that production is done. A very simple example of this may
be as follows:
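static void CreateInitialRange(BlockingCollection<int> output)
{
try
{
// Produce the values 1 to 9
for (int i = 1; i < 10; i++)
{
output.Add(i);
}
}
finally
{
// Always tell the consumer that no more items are coming,
// even if an exception was thrown while producing
output.CompleteAdding();
}
}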
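What we can see from the above code is that we add items to a
BlockingCollection<T> (if the collection was created with a bounded capacity,
Add blocks while it is full), and when we have finished producing we need to
alert the consumer that no more items are expected. This is done via the
BlockingCollection<T>.CompleteAdding() method, which we call in a finally block
so that the consumer is released even if the producer fails.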
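A small sketch of what CompleteAdding actually changes may help here (CompleteAddingDemo is a hypothetical name used only for illustration). It relies on documented behaviour: once CompleteAdding has been called, further calls to Add throw InvalidOperationException, and IsAddingCompleted becomes true straight away, but IsCompleted only becomes true once the collection is also empty.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper class, for illustration only.
public static class CompleteAddingDemo
{
    // Reports the collection's completion flags before and after the last item is taken,
    // and whether an Add made after CompleteAdding was rejected.
    public static string Run()
    {
        var buffer = new BlockingCollection<int>();
        buffer.Add(42);
        buffer.CompleteAdding();

        bool lateAddRejected = false;
        try
        {
            buffer.Add(43); // not allowed once adding has been marked complete
        }
        catch (InvalidOperationException)
        {
            lateAddRejected = true;
        }

        // Adding is complete, but the collection is not "completed" until it is also empty.
        bool completedWhileItemBuffered = buffer.IsCompleted;
        int taken = buffer.Take();
        bool completedAfterTake = buffer.IsCompleted;

        return string.Format("{0},{1},{2},{3},{4}",
            buffer.IsAddingCompleted, lateAddRejected, completedWhileItemBuffered, taken, completedAfterTake);
    }
}
```

Run() should return "True,True,False,42,True": adding is complete, the late Add was rejected, the collection was not yet completed while 42 was still buffered, 42 was taken, and only then did the collection count as completed. That last flag is what ends a consumer's GetConsumingEnumerable() loop.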
Consumer
The consumer is obviously the one responsible for consuming the results from
the producer, so how does it do that exactly? Well, it's actually quite simple.
All we really need to do is use BlockingCollection<T>.GetConsumingEnumerable(),
which removes each value the producer has produced as it is enumerated, blocks
while the collection is empty, and finishes once the producer has called
CompleteAdding() and every item has been taken. Here is a snippet that follows
on from the snippet above:
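static void DoubleTheRange(BlockingCollection<int> input, BlockingCollection<int> output)
{
try
{
// Blocks while input is empty; ends once input is completed and drained
foreach (var number in input.GetConsumingEnumerable())
{
// Note: despite the method name, this squares each number
output.Add(number * number);
}
}
finally
{
// This stage is itself the producer for the next stage
output.CompleteAdding();
}
}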
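The MSDN list above mentions two kinds of enumeration, and the difference matters for consumers like DoubleTheRange. The following sketch (EnumerationDemo is a hypothetical name) adds 1, 2 and 3, enumerates once with a plain foreach, which leaves the items where they are, and then once with GetConsumingEnumerable(), which removes them. CompleteAdding is called first, because otherwise the consuming loop would block waiting for more items.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper class, for illustration only.
public static class EnumerationDemo
{
    // Returns { sum seen by read-only loop, count afterwards,
    //           sum seen by consuming loop, count afterwards }.
    public static int[] Run()
    {
        var buffer = new BlockingCollection<int>();
        for (int i = 1; i <= 3; i++)
        {
            buffer.Add(i);
        }
        buffer.CompleteAdding(); // without this, the consuming loop below would block forever

        int readOnlySum = 0;
        foreach (int n in buffer) // read-only: items stay in the collection
        {
            readOnlySum += n;
        }
        int countAfterReadOnly = buffer.Count;

        int consumedSum = 0;
        foreach (int n in buffer.GetConsumingEnumerable()) // removes each item as it is yielded
        {
            consumedSum += n;
        }
        int countAfterConsuming = buffer.Count;

        return new[] { readOnlySum, countAfterReadOnly, consumedSum, countAfterConsuming };
    }
}
```

Run() should return { 6, 3, 6, 0 }: both loops see the same three values, but only the consuming loop empties the collection.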
Demo project name : SimplePipeline
So now that I have talked about BlockingCollection<T> a bit, let's have a look
at how we can chain these producer-consumer collections together to form
something more meaningful (which I call a pipeline).
Here is the full code listing for a simple console app that carries out the
following pipeline:
- Creates an initial range of ints (produce phase)
- Squares each int in the initial range (consumes from the producer, and also
acts as the producer for the next phase; the method is named DoubleTheRange)
- Writes out the results produced by the squaring phase
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var buffer1 = new BlockingCollection<int>(10);
var buffer2 = new BlockingCollection<int>(10);
var f = new TaskFactory(TaskCreationOptions.LongRunning,
TaskContinuationOptions.None);
var stage1 = f.StartNew(() => CreateInitialRange(buffer1));
var stage2 = f.StartNew(() => DoubleTheRange(buffer1, buffer2));
var stage3 = f.StartNew(() => WriteResults(buffer2));
Task.WaitAll(stage1, stage2, stage3);
Console.ReadLine();
}
static void CreateInitialRange(BlockingCollection<int> output)
{
try
{
for (int i = 1; i < 10; i++)
{
output.Add(i);
Console.WriteLine("CreateInitialRange {0}", i);
}
}
finally
{
output.CompleteAdding();
}
}
static void DoubleTheRange(
BlockingCollection<int> input,
BlockingCollection<int> output)
{
try
{
foreach (var number in input.GetConsumingEnumerable())
{
output.Add(number * number); // despite the method name, this squares each number
}
}
finally
{
output.CompleteAdding();
}
}
static void WriteResults(BlockingCollection<int> input)
{
foreach (var squaredNumber in input.GetConsumingEnumerable())
{
Console.WriteLine("Result is {0}", squaredNumber);
}
}
}
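One thing the listing above does not show is a stage with more than one consumer. If several tasks read from the same input, none of them can call CompleteAdding on the shared output, because the first one to finish would close it while the others are still adding (their next Add would throw InvalidOperationException). One common way around this, sketched below assuming .NET 4's TaskFactory.ContinueWhenAll (FanOutSketch is a hypothetical name), is to let a continuation call CompleteAdding once every worker has finished.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class FanOutSketch
{
    // Squares 1..count, with 'workers' tasks sharing the middle stage.
    public static int[] Run(int count, int workers)
    {
        var input = new BlockingCollection<int>(10);
        var output = new BlockingCollection<int>(10);
        var factory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

        Task producer = factory.StartNew(() =>
        {
            try
            {
                for (int i = 1; i <= count; i++)
                {
                    input.Add(i);
                }
            }
            finally
            {
                input.CompleteAdding();
            }
        });

        // Several consumers share one input; none of them completes the output itself.
        Task[] squarers = Enumerable.Range(0, workers)
            .Select(w => factory.StartNew(() =>
            {
                foreach (int n in input.GetConsumingEnumerable())
                {
                    output.Add(n * n);
                }
            }))
            .ToArray();

        // Completes the output exactly once, after every worker has finished.
        Task closeOutput = factory.ContinueWhenAll(squarers, done => output.CompleteAdding());

        // Results arrive in no fixed order when there are several workers, so sort them.
        int[] results = output.GetConsumingEnumerable().OrderBy(n => n).ToArray();

        // Surfaces any exception thrown by the producer or the workers.
        Task.WaitAll(squarers.Concat(new[] { producer, closeOutput }).ToArray());
        return results;
    }
}
```

Run(10, 3) should return the squares 1, 4, 9, ..., 100. With more than one worker the order in which results arrive is not deterministic, which is why the sketch sorts them before returning.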
And here is the result of running this code:
Demo project name : WPFImagePipeline
I have also included a more complicated pipeline that works in much the same
way as the example above, but uses a WPF application to show the results of the
final consumer. I think it may be easier for people to grasp what is going on
with this demo.
I should point out that this demo looks for images, which can either be on
your local file system or web based. The type of pipeline that is created
depends on the UseWebBasedImages and LocalImageFolder settings in App.Config.
If you are NOT connected to the internet, just make sure that UseWebBasedImages
is false and that LocalImageFolder points to a valid image path on your PC (if
it is left empty, C:\Users\Public\Pictures\Sample Pictures is used).
I don't want to go into this code too much, as it is not that relevant to the
discussion, but I think a general understanding of how this demo works may be
beneficial, so here we go.
There is a single window, which contains the following XAML (which I umm'd
and ahh'd about showing, but in the end I decided to show it, as there is a
useful threading control included with the article that requires some XAML to
make it work):
<controls:AsyncHost AsyncState="{Binding Path=AsyncState, Mode=OneWay}">
<Grid x:Name="mainGrid" Background="White" Margin="5"
controls:AsyncHost.AsyncContentType="Content">
<ListBox ItemsSource="{Binding ProcessedImages}"
ItemTemplate="{StaticResource ImageInfoDataTemplate}"
BorderThickness="0"
BorderBrush="Transparent"
ItemContainerStyle="{StaticResource ImageInfoListBoxItemStyle}">
<ListBox.ItemsPanel>
<ItemsPanelTemplate>
<controls:ScatterPanel Background="White">
</controls:ScatterPanel>
</ItemsPanelTemplate>
</ListBox.ItemsPanel>
</ListBox>
</Grid>
<controls:AsyncBusyUserControl
controls:AsyncHost.AsyncContentType="Busy"
AsyncWaitText="{Binding Path=WaitText, Mode=OneWay}"
Visibility="Hidden" />
<controls:AsyncFailedUserControl
controls:AsyncHost.AsyncContentType="Error"
Error="{Binding Path=ErrorMessage, Mode=OneWay}"
Visibility="Hidden" />
</controls:AsyncHost>
The general idea is that the main window simply shows a list of images within
a ListBox, where the images have been fetched via the pipeline either from a
local hard drive (see App.Config) or via a random Google search.
The threading helper component is called AsyncHost, and is bound to an
AsyncState property exposed via the ViewModel in use for this Window.
Depending on the value of that bound property, the AsyncHost will show one of
three things:
- AsyncState = Content will show the element marked up with the attached
property set to "Content", which in this example is the Grid that hosts the
ListBox
- AsyncState = Busy will show the element marked up with the attached property
set to "Busy", which in this example is a custom control called
AsyncBusyUserControl
- AsyncState = Error will show the element marked up with the attached
property set to "Error", which in this example is a custom control called
AsyncFailedUserControl
All of these controls are within the download, so feel free to look into them
at your leisure. They are very useful, and I find myself using them a lot in
any WPF UI project where I am doing long running tasks that could also fail.
So that is the relevant XAML. Now that we have seen the most relevant parts of
the view, let's see the view model, which looks like this:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using WPFImagePipeline.Common;
using WPFImagePipeline.Services;
using WPFImagePipeline.Model;
using System.Configuration;
using System.IO;
using WPFImagePipeline.Controls;
namespace WPFImagePipeline.ViewModels
{
public class MainWindowViewModel : INPCBase
{
private ILocalImagePipelineService localImagePipelineService;
private IGoogleImagePipeLineService googleImagePipelineService;
private IMessageBoxService messageBoxService;
private List<ImageInfo> processedImages = new List<ImageInfo>();
private bool useWebBasedImages = false;
private string localImageFolder = "";
private string defaultImagePath = @"C:\Users\Public\Pictures\Sample Pictures";
private string waitText;
private string errorMessage;
private AsyncType asyncState = AsyncType.Content;
public MainWindowViewModel(
ILocalImagePipelineService imagePipelineService,
IGoogleImagePipeLineService googleSearchProvider,
IMessageBoxService messageBoxService)
{
this.localImagePipelineService = imagePipelineService;
this.googleImagePipelineService = googleSearchProvider;
this.messageBoxService = messageBoxService;
imagePipelineService.PipeLineCompleted += ImagePipelineService_PipeLineCompleted;
googleSearchProvider.PipeLineCompleted += GooglePipelineService_PipeLineCompleted;
AsyncState = AsyncType.Content;
WaitText ="Fetching images";
}
public void DoIt()
{
AsyncState = AsyncType.Busy;
// AppSettings returns null for a missing key, and TryParse(null) just returns false
if (Boolean.TryParse(
ConfigurationManager.AppSettings["UseWebBasedImages"],
out useWebBasedImages))
{
if (useWebBasedImages)
{
googleImagePipelineService.StartPipeline();
}
else
{
ShowUsingLocalImages();
}
}
else
{
ShowUsingLocalImages();
}
}
public List<ImageInfo> ProcessedImages
{
get { return processedImages; }
set
{
if (processedImages != value)
{
processedImages = value;
NotifyPropertyChanged("ProcessedImages");
}
}
}
public AsyncType AsyncState
{
get { return asyncState; }
set
{
if (asyncState != value)
{
asyncState = value;
NotifyPropertyChanged("AsyncState");
}
}
}
public string WaitText
{
get { return waitText; }
set
{
if (waitText != value)
{
waitText = value;
NotifyPropertyChanged("WaitText");
}
}
}
public string ErrorMessage
{
get { return errorMessage; }
set
{
if (errorMessage != value)
{
errorMessage = value;
NotifyPropertyChanged("ErrorMessage");
}
}
}
private void ShowUsingLocalImages()
{
localImageFolder = ConfigurationManager.AppSettings["LocalImageFolder"]; // null if the key is missing
if (!String.IsNullOrEmpty(localImageFolder))
{
if (Directory.Exists(localImageFolder))
{
localImagePipelineService.StartPipeline(localImageFolder);
}
else
{
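messageBoxService.ShowMessage(
"The LocalImageFolder folder you specified does not exist");
// Leave the busy state, otherwise the busy indicator would never be dismissed
AsyncState = AsyncType.Content;
}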
}
else
{
localImagePipelineService.StartPipeline(defaultImagePath);
}
}
private void ImagePipelineService_PipeLineCompleted(object sender, ImagePipelineCompletedArgs e)
{
ProcessedImages = e.GatheredImages;
AsyncState = AsyncType.Content;
}
private void GooglePipelineService_PipeLineCompleted(object sender, ImagePipelineCompletedArgs e)
{
ProcessedImages = e.GatheredImages;
AsyncState = AsyncType.Content;
}
}
}
The important thing to note is that the MainWindowViewModel makes use of 2
services, one for a local image pipeline and one for a Google image pipeline.
These 2 services are shown below. The one that is used depends on the
"UseWebBasedImages" setting in the App.Config.
The LocalImagePipelineService does the local image search pipeline. The idea
is that it raises a PipeLineCompleted event once the final stage has finished
consuming. The MainWindowViewModel listens for that event, grabs the images and
sets them on its ProcessedImages property, which updates the binding that the
ListBox in the XAML listens to, and bingo, images are shown.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using WPFImagePipeline.Model;
using System.Threading.Tasks;
using System.IO;
namespace WPFImagePipeline.Services
{
public class ImagePipelineCompletedArgs : EventArgs
{
public List<ImageInfo> GatheredImages { get; private set; }
public ImagePipelineCompletedArgs(List<ImageInfo> gatheredImages)
{
this.GatheredImages = gatheredImages;
}
}
public interface ILocalImagePipelineService
{
void StartPipeline(string yourImageFolder);
event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
}
public class LocalImagePipelineService : ILocalImagePipelineService
{
private Object locker = new Object();
private FileInfo[] GetAllMatchingImageFiles(string yourImageFolder)
{
string lookfor = "*.png;*.jpg;*.gif;*.tif";
string[] extensions = lookfor.Split(new char[] { ';' });
List<FileInfo> myfileinfos = new List<FileInfo>();
DirectoryInfo di = new DirectoryInfo(yourImageFolder);
foreach (string ext in extensions)
{
myfileinfos.AddRange(di.GetFiles(ext));
}
return myfileinfos.ToArray();
}
private void CreateImageUrls(string yourImageFolder, BlockingCollection<string> urls)
{
try
{
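FileInfo[] files = GetAllMatchingImageFiles(yourImageFolder);
if (files.Length == 0)
{
// No images to pick from; the finally block still calls CompleteAdding
return;
}
Random rand = new Random();
int added = 0;
do
{
int idx = rand.Next(0, files.Length);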
urls.Add(files[idx].FullName);
++added;
} while (added < 100);
}
finally
{
urls.CompleteAdding();
}
}
private void CreateImageInfos(BlockingCollection<string> urls,
BlockingCollection<ImageInfo> initialImageInfos)
{
try
{
foreach (string url in urls.GetConsumingEnumerable())
{
int idx = url.LastIndexOf(@"\") + 1;
initialImageInfos.Add(new ImageInfo(url, url.Substring(idx,url.Length-idx)));
}
}
finally
{
initialImageInfos.CompleteAdding();
}
}
private void AlertViewModel(BlockingCollection<ImageInfo> initialImageInfos)
{
List<ImageInfo> localInfos = new List<ImageInfo>();
try
{
foreach (ImageInfo imageInfo in initialImageInfos.GetConsumingEnumerable())
{
lock (locker)
{
localInfos.Add(imageInfo);
}
}
}
finally
{
OnPipeLineCompleted(new ImagePipelineCompletedArgs(localInfos));
}
}
#region IImagePipelineService Members
public void StartPipeline(string yourImageFolder)
{
BlockingCollection<string> buffer1 = new BlockingCollection<string>(100);
BlockingCollection<ImageInfo> buffer2 = new BlockingCollection<ImageInfo>(100);
TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning,
TaskContinuationOptions.None);
Task stage1 = factory.StartNew(() => CreateImageUrls(yourImageFolder,buffer1));
Task stage2 = factory.StartNew(() => CreateImageInfos(buffer1, buffer2));
Task stage3 = factory.StartNew(() => AlertViewModel(buffer2));
}
public event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
protected virtual void OnPipeLineCompleted(ImagePipelineCompletedArgs e)
{
if (PipeLineCompleted != null)
{
PipeLineCompleted(this, e);
}
}
#endregion
}
}
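Notice that StartPipeline starts its three stages and returns without keeping hold of the tasks, so if a stage throws (for example, if GetAllMatchingImageFiles cannot read the folder), nobody ever observes that exception. Under .NET 4.0's default policy an unobserved task exception is rethrown on the finalizer thread, which brings the process down; .NET 4.5 changed that default so the exception is silently ignored instead. The sketch below shows one way to observe stage exceptions. ObserveStages and PipelineFaultSketch are hypothetical names; in the services above you would presumably pass stage1, stage2 and stage3 and report the errors through something like the existing IMessageBoxService.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class PipelineFaultSketch
{
    // Returns a task that completes once every stage has finished, passing any
    // stage exceptions to 'onErrors' so that they are observed rather than lost.
    public static Task ObserveStages(Task[] stages, Action<IList<Exception>> onErrors)
    {
        return Task.Factory.ContinueWhenAll(stages, finished =>
        {
            // Reading Task.Exception marks a faulted task's exception as observed.
            List<Exception> errors = finished
                .Where(t => t.IsFaulted)
                .SelectMany(t => t.Exception.InnerExceptions)
                .ToList();
            if (errors.Count > 0)
            {
                onErrors(errors);
            }
        });
    }

    // A producer stage that fails after adding one item.
    private static void FailingProducer(BlockingCollection<int> output)
    {
        try
        {
            output.Add(1);
            throw new InvalidOperationException("stage 1 failed");
        }
        finally
        {
            // The next stage is still released, just as in the services above.
            output.CompleteAdding();
        }
    }

    // Runs a two-stage pipeline whose first stage fails; returns the number of errors seen.
    public static int Run()
    {
        var buffer = new BlockingCollection<int>();
        Task stage1 = Task.Factory.StartNew(() => FailingProducer(buffer));
        Task stage2 = Task.Factory.StartNew(() =>
        {
            foreach (int n in buffer.GetConsumingEnumerable())
            {
                // Consume and discard.
            }
        });

        int errorCount = 0;
        ObserveStages(new[] { stage1, stage2 }, errors => errorCount = errors.Count).Wait();
        return errorCount;
    }
}
```

Run() should return 1: the first stage's InvalidOperationException is collected, while the second stage completes normally because the failing stage still calls CompleteAdding in its finally block.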
The GoogleImagePipeLineService does the web image search pipeline using a
freely available .NET Google search API DLL. As before, the idea is that it
raises a PipeLineCompleted event when the pipeline has finished, which the
MainWindowViewModel handles in exactly the same way as for the local service.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using WPFImagePipeline.Model;
using System.Threading;
using System.Threading.Tasks;
using Gapi.Search;
using System.Collections.Concurrent;
namespace WPFImagePipeline.Services
{
#region IGoogleSearchProvider
public interface IGoogleImagePipeLineService
{
void StartPipeline();
event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
}
#endregion
#region GoogleSearchProvider
public class GoogleImagePipeLineService : IGoogleImagePipeLineService
{
private List<string> randomKeyWords = new List<string>()
{ "pitbull", "shark", "dog", "parrot", "robot",
"cheerleader", "gun", "skull", "plane", "manga",
"bikini","model","snake","spider"
};
private Random rand = new Random();
private List<string> urls = new List<string>();
private Object locker = new Object();
private void CreateImageUrls(BlockingCollection<string> urls)
{
try
{
int added = 0;
do
{
string keyword = randomKeyWords[rand.Next(0, randomKeyWords.Count)];
SearchResults searchResults = Searcher.Search(SearchType.Image, keyword);
if (searchResults.Items.Count() > 0)
{
foreach (var searchResult in searchResults.Items)
{
urls.Add(searchResult.Url);
++added;
}
}
} while (added < 100);
}
finally
{
urls.CompleteAdding();
}
}
private void CreateImageInfos(BlockingCollection<string> urls,
BlockingCollection<ImageInfo> initialImageInfos)
{
try
{
foreach (string url in urls.GetConsumingEnumerable())
{
int idx = url.LastIndexOf('/') + 1; // web URLs use '/' rather than '\' as their separator
initialImageInfos.Add(new ImageInfo(url, url.Substring(idx, url.Length - idx)));
}
}
finally
{
initialImageInfos.CompleteAdding();
}
}
private void AlertViewModel(BlockingCollection<ImageInfo> initialImageInfos)
{
List<ImageInfo> localInfos = new List<ImageInfo>();
try
{
foreach (ImageInfo imageInfo in initialImageInfos.GetConsumingEnumerable())
{
lock (locker)
{
localInfos.Add(imageInfo);
}
}
}
finally
{
OnPipeLineCompleted(new ImagePipelineCompletedArgs(localInfos));
}
}
#region IImagePipelineService Members
public void StartPipeline()
{
BlockingCollection<string> buffer1 = new BlockingCollection<string>(100);
BlockingCollection<ImageInfo> buffer2 = new BlockingCollection<ImageInfo>(100);
TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning,
TaskContinuationOptions.None);
Task stage1 = factory.StartNew(() => CreateImageUrls(buffer1));
Task stage2 = factory.StartNew(() => CreateImageInfos(buffer1, buffer2));
Task stage3 = factory.StartNew(() => AlertViewModel(buffer2));
}
public event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
protected virtual void OnPipeLineCompleted(ImagePipelineCompletedArgs e)
{
if (PipeLineCompleted != null)
{
PipeLineCompleted(this, e);
}
}
#endregion
}
#endregion
}
As can be seen, both of these services use TPL concepts you have seen before,
as well as the BlockingCollection<T> material from this article.
Here is what is produced at the end of the pipeline: a Surface-like panel for
WPF, which makes use of my work colleague's SurfacePanel for WPF. This is
possible because I simply swapped out the standard WPF ListBox panel for my
work colleague's panel, so what you are seeing below is really just a ListBox.
The code is the place to look to see how this all works.
That's It For Now
That is all I wanted to say in this article. I hope you liked it, and
want more. If you did like this article, and would like more, could you spare some time to leave a
comment and a vote. Many thanks.
Hopefully, see you at the next one.