Missing PLINQ
PLINQ is sorely missed in Silverlight. It is one of the features requested for new Silverlight releases. Please vote for its implementation on User Voice:
http://dotnet.uservoice.com/forums/4325-silverlight-feature-suggestions/suggestions/310712-plinq-and-tpl.
I hoped PLINQ would make it into the Silverlight 5 release. Downloading and installing the Silverlight 5 Beta proved otherwise. There I was, typing .AsParallel in vain and getting nothing in return… That is unfortunate, since it limits multi-platform targeting somewhat.
Since all I want right now is the ParallelEnumerable.ForAll, I decided to implement just that extension to have it on all platforms.
Bare-bones classes
To implement my own version of ForAll, I need the following two classes: ParallelEnumerable and ParallelQuery<T>.
The ParallelEnumerable class has to implement two extension methods: AsParallel() and ForAll(). Both classes have to sit in a conditional compilation block so that they are built only in Silverlight. I took the simplest approach possible.
#if SILVERLIGHT
// Needs: using System; using System.Collections; using System.Collections.Generic; using System.Linq;
public class ParallelQuery<TSource> : IEnumerable<TSource>
{
    // Snapshot of the source sequence, taken once in the constructor.
    private readonly List<TSource> _collection;

    public ParallelQuery(IEnumerable<TSource> source)
    {
        _collection = source.ToList();
    }

    public IEnumerator<TSource> GetEnumerator()
    {
        return _collection.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return _collection.GetEnumerator();
    }
}

public static class ParallelEnumerable
{
    public static ParallelQuery<TSource> AsParallel<TSource>(this IEnumerable<TSource> source)
    {
        return new ParallelQuery<TSource>(source);
    }

    public static void ForAll<TSource>(this ParallelQuery<TSource> source, Action<TSource> action)
    {
        ....
        ....
    }
}
#endif
ForAll() is discussed separately next.
Creditor and ForAll implementations
For the sake of multi-platform compatibility, ForAll could be implemented as a simple foreach. But I wanted it to resemble the real thing as much as possible and execute in parallel on multiple threads. The basic design, before any synchronization measures, should therefore be similar to the following:
public static void ForAll<TSource>(this ParallelQuery<TSource> source, Action<TSource> action)
{
    foreach (var item in source)
    {
        var scopedItem = item;
        ThreadPool.QueueUserWorkItem((x) =>
        {
            action(scopedItem);
        });
    }
}
This code has one major problem that needs to be addressed, and one “good to have” improvement. The major problem is the fact that nothing stops the ForAll function from returning before all threads it spawned have finished their work. The “good to have” improvement is the ability to throttle the number of simultaneous threads. ThreadPool will do that for us, but letting ThreadPool get hammered on a busy system is not nice.
One way to solve the first problem while keeping the code simple is to use a Countdown Latch. A Countdown Latch example is available on MSDN:
http://msdn.microsoft.com/en-us/magazine/cc163427.aspx. Using the latch, the code becomes as follows:
public static void ForAll<TSource>(this ParallelQuery<TSource> source, Action<TSource> action)
{
    CountdownLatch latch = new CountdownLatch(source.Count());
    foreach (var item in source)
    {
        var scopedItem = item;
        ThreadPool.QueueUserWorkItem((x) =>
        {
            action(scopedItem);
            latch.Signal();
        });
    }
    latch.Wait();
}
latch.Wait() blocks the method until all work items have finished and signaled. This works, but it has a problem. To initialize the latch to the correct value, one needs to count the elements in the source collection, which may mean traversing the whole sequence. The sequence is then traversed twice: once to calculate the Count and once to invoke the action on each element. This double traversal just feels wrong.
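For readers who do not have the MSDN article at hand, the latch used above could look roughly like the following. This is a minimal sketch of my own and not necessarily the article's version; it only assumes the two members the ForAll code calls, Signal() and Wait().

```csharp
using System;
using System.Threading;

// Minimal countdown latch sketch (an assumption, not the MSDN article's code).
public class CountdownLatch
{
    private int _remaining;
    private readonly ManualResetEvent _done;

    public CountdownLatch(int count)
    {
        if (count < 0) throw new ArgumentOutOfRangeException("count");
        _remaining = count;
        // Start in the signaled state when there is nothing to wait for.
        _done = new ManualResetEvent(count == 0);
    }

    // Called once by each work item when it finishes.
    public void Signal()
    {
        if (Interlocked.Decrement(ref _remaining) == 0)
        {
            _done.Set();
        }
    }

    // Blocks until Signal() has been called 'count' times.
    public void Wait()
    {
        _done.WaitOne();
    }
}
```

Starting the event in the signaled state for a count of zero keeps Wait() from blocking forever on an empty sequence.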
Thread throttling could be done using a Blocking Queue or a counting Semaphore. For example, as follows (P() acquires a slot and V() releases it):
public static void ForAll<TSource>(this ParallelQuery<TSource> source, Action<TSource> action)
{
    CountdownLatch latch = new CountdownLatch(source.Count());
    Semaphore semaphore = new Semaphore(10);
    foreach (var item in source)
    {
        var scopedItem = item;
        semaphore.P(); // acquire a slot; blocks while 10 items are in flight
        ThreadPool.QueueUserWorkItem((x) =>
        {
            action(scopedItem);
            latch.Signal();
            semaphore.V(); // release the slot for the next item
        });
    }
    latch.Wait();
}
It works, but now we have two synchronization objects to deal with, and the code is littered with signaling and waiting calls. Having two synchronization objects and calling Count() feels like a hack. I am definitely not using the right tools for the job.
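The semaphore in the snippet above is a counting semaphore with Dijkstra-style operations: P() takes a permit, waiting if none is free, and V() gives one back. A minimal sketch of such a class follows. The name CountingSemaphore is my own choice here, picked to avoid confusion with System.Threading.Semaphore, and the Monitor-based implementation is an assumption rather than the code I actually ran.

```csharp
using System;
using System.Threading;

// Hypothetical counting semaphore with P/V operations, built on Monitor.
public class CountingSemaphore
{
    private readonly object _lock = new object();
    private int _count;

    public CountingSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount");
        _count = initialCount;
    }

    // P (acquire): wait until a permit is free, then take it.
    public void P()
    {
        lock (_lock)
        {
            while (_count == 0)
            {
                Monitor.Wait(_lock); // releases the lock while waiting
            }
            _count--;
        }
    }

    // V (release): return a permit and wake one waiting thread.
    public void V()
    {
        lock (_lock)
        {
            _count++;
            Monitor.Pulse(_lock);
        }
    }
}
```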
Now, I am not a multi-threading expert. Chances are good that a synchronization primitive for exactly this job has existed since the 1960s, but I could not think of one. Instead, I tried to redefine the problem in different terms and solve that. Hence, the Creditor.
The Creditor has N loans to offer. Once all N loans have been handed out, the Creditor cannot hand out any more and waits for paybacks before lending again. At some point the Creditor decides to collect all of its loans without giving out any new ones. At that point it blocks until the borrowers have returned all N loans. In essence, it is a counting semaphore with a blocking operation that restores its initial state.
Here is how one uses the Creditor in the ForAll method:
public static void ForAll<TSource>(this ParallelQuery<TSource> source, Action<TSource> action)
{
    Creditor creditor = new Creditor(10);
    foreach (var item in source)
    {
        var scopedItem = item;
        creditor.Borrow();
        ThreadPool.QueueUserWorkItem((x) =>
        {
            action(scopedItem);
            creditor.Payback();
        });
    }
    creditor.Collect();
}
I find this quite readable and “natural”. The following is my implementation of the Creditor:
public class Creditor
{
    // Signaled when at least one loan becomes available again.
    private ManualResetEvent _countingEvent = new ManualResetEvent(false);
    private object _syncRoot = new object();
    private int _totalLoans;
    // Number of loans currently available to borrow.
    private int _loans = 0;
    private volatile bool _isCollecting;
    // Signaled when all loans have been paid back during Collect().
    private ManualResetEvent _collectingEvent = new ManualResetEvent(true);

    public Creditor(int totalLoans)
    {
        _totalLoans = totalLoans;
        _loans = totalLoans;
    }

    public void Borrow()
    {
        if (_isCollecting)
        {
            throw new InvalidOperationException("Cannot borrow when collecting.");
        }
        lock (_syncRoot)
        {
            while (_loans == 0)
            {
                // Reset under the lock, then wait outside it so Payback() can get in.
                _countingEvent.Reset();
                Monitor.Exit(_syncRoot);
                _countingEvent.WaitOne();
                Monitor.Enter(_syncRoot);
            }
            --_loans;
        }
    }

    public void Payback()
    {
        lock (_syncRoot)
        {
            ++_loans;
            if (_loans == 1)
            {
                // First loan available again: wake blocked borrowers.
                _countingEvent.Set();
            }
            if (_isCollecting && _loans == _totalLoans)
            {
                _collectingEvent.Set();
            }
        }
    }

    public void Collect()
    {
        _isCollecting = true;
        lock (_syncRoot)
        {
            while (_loans < _totalLoans)
            {
                // Same pattern as Borrow(): reset under the lock, wait outside it.
                _collectingEvent.Reset();
                Monitor.Exit(_syncRoot);
                _collectingEvent.WaitOne();
                Monitor.Enter(_syncRoot);
            }
        }
        _isCollecting = false;
    }
}
Of course, the Creditor could be improved somewhat. It should implement IDisposable, for example, since it owns two ManualResetEvent instances. One possible improvement to consider is whether it is a good idea to move the Collect() logic into the Dispose method. In that case, one could have a neat using block in ForAll().
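To make the using-block idea concrete, here is a rough sketch of what it might look like. Everything in it is an assumption for illustration: DisposableCreditor and ForAllSketch are hypothetical names, the class is rebuilt on Monitor.Wait/PulseAll instead of the two events, the "cannot borrow when collecting" check is dropped for brevity, and a try/finally is added so that a throwing action still pays its loan back. Whether a blocking Dispose() is good style remains an open question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant: Dispose() plays the role of Collect().
public sealed class DisposableCreditor : IDisposable
{
    private readonly object _syncRoot = new object();
    private readonly int _totalLoans;
    private int _loans;

    public DisposableCreditor(int totalLoans)
    {
        if (totalLoans <= 0) throw new ArgumentOutOfRangeException("totalLoans");
        _totalLoans = totalLoans;
        _loans = totalLoans;
    }

    // Blocks while no loans are available, then takes one.
    public void Borrow()
    {
        lock (_syncRoot)
        {
            while (_loans == 0) Monitor.Wait(_syncRoot);
            _loans--;
        }
    }

    // Returns a loan and wakes blocked borrowers and a pending Dispose().
    public void Payback()
    {
        lock (_syncRoot)
        {
            _loans++;
            Monitor.PulseAll(_syncRoot);
        }
    }

    // Blocks until every outstanding loan has been paid back.
    public void Dispose()
    {
        lock (_syncRoot)
        {
            while (_loans < _totalLoans) Monitor.Wait(_syncRoot);
        }
    }
}

public static class ForAllSketch
{
    public static void ForAll<T>(IEnumerable<T> source, Action<T> action)
    {
        using (var creditor = new DisposableCreditor(10))
        {
            foreach (var item in source)
            {
                var scopedItem = item;
                creditor.Borrow();
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try { action(scopedItem); }
                    finally { creditor.Payback(); }
                });
            }
        } // Dispose() waits here until all work items have paid back
    }
}
```

With Monitor there are no event handles left to release, so Dispose() exists here only to enable the using syntax.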
Conclusion
This concludes my implementation of ParallelEnumerable.ForAll(). I have tested my code by converting large color images to grayscale, with each pixel queued as its own thread pool work item. I did not experience deadlocks.
This is the “image conversion” code, demonstrating usage of ForAll in Silverlight:
private void UserControl_Loaded(object sender, RoutedEventArgs e)
{
    BitmapImage bmp = new BitmapImage();
    bmp.ImageOpened += OnImageOpened;
    ....
}

void OnImageOpened(object sender, RoutedEventArgs e)
{
    WriteableBitmap original = new WriteableBitmap((BitmapImage)sender);
    WriteableBitmap grayscale = new WriteableBitmap(original.PixelWidth, original.PixelHeight);
    var operation = from index in Enumerable.Range(0, original.Pixels.Length)
                    select new
                    {
                        Index = index,
                        Color = original.Pixels[index],
                    };
    operation.AsParallel().ForAll(x => { grayscale.Pixels[x.Index] = GetGrayscale(x.Color); });
    _imageBox.Source = grayscale;
}
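GetGrayscale is not shown above. One possible version is sketched below, assuming each pixel is a 32-bit ARGB integer with alpha in the top byte; the PixelHelpers class name and the choice of luma weights are illustrative assumptions, not the code used in my test.

```csharp
public static class PixelHelpers
{
    // Hypothetical helper: converts one ARGB pixel to gray, keeping its alpha.
    public static int GetGrayscale(int color)
    {
        int a = (color >> 24) & 0xFF;
        int r = (color >> 16) & 0xFF;
        int g = (color >> 8) & 0xFF;
        int b = color & 0xFF;

        // Integer approximation of the common 0.299 / 0.587 / 0.114 luma weights.
        int gray = (r * 299 + g * 587 + b * 114) / 1000;

        // Write the same gray value into all three color channels.
        return (a << 24) | (gray << 16) | (gray << 8) | gray;
    }
}
```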
Compatibility
One day Silverlight will implement PLINQ. Since the ParallelEnumerable and ParallelQuery<T> classes will then be defined in two places, that will result in compilation errors. This is good, because it makes it very simple to find the code one wants to remove. Just delete the entire conditional compilation block defined above.