Challenge
I just had my first chance to apply PLINQ (Parallel LINQ) to a real task.
When I play around with a new technology, everything works fine; real understanding only comes with the challenges of a real use case. Currently, I am working on an application that processes large amounts of text data, gathering statistics on word occurrences (see Source Code Word Cloud).
Here's what the simplified core of my code is doing:
- Enumerate through all files with *.txt extension
- Enumerate through words in each text file
- Group by word and count occurrences
- Sort by occurrences
- Output top 20
Everything worked fine with LINQ. I wrote a prototype console application with a single LINQ query… and ta-daa! Adding just one line, .AsParallel(), turned it into PLINQ.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

internal static class Program {
    public static void Main(string[] args) {
        var result = Directory
            .EnumerateFiles(@"c:\temp", "*.txt", SearchOption.AllDirectories)
            .AsParallel() // the one line that turns LINQ into PLINQ
            .SelectMany(File.ReadLines)
            .SelectMany(ReadWords)
            .GroupBy(word => word, (word, words) => new Tuple<int, string>(words.Count(), word))
            .OrderByDescending(occurrencesWordPair => occurrencesWordPair.Item1)
            .Take(20);
        foreach (Tuple<int, string> tuple in result) {
            Console.WriteLine(tuple);
        }
        Console.Write("Press any key to quit."); Console.ReadKey();
    }

    // Splits a line into words (maximal runs of letters).
    private static IEnumerable<string> ReadWords(string line) {
        StringBuilder word = new StringBuilder();
        foreach (char ch in line) {
            if (char.IsLetter(ch)) { word.Append(ch); }
            else {
                if (word.Length == 0) continue;
                yield return word.ToString();
                word.Clear();
            }
        }
        // Emit the last word when the line ends with a letter.
        if (word.Length > 0) yield return word.ToString();
    }
}
Finally, all four cores of my PC (and the fan) were busy processing the dozens of megabytes of free eBooks I had prepared for testing in my temp folder. Moving to PLINQ gave me a significant performance boost. But adopting this one line in the WinForms application was a different challenge.
Frustration
… cancellability and responsiveness during long-running queries were lost.
It seems that the OrderBy query was synchronizing data back into the main thread: it cannot return its first result until all of the input has been processed, so the main thread stays blocked inside the foreach and Windows messages are not processed. In the example below, I implement cancellation as described in the MSDN article "How to: Cancel a PLINQ Query", and it does not work. :(
public Form1()
{
    InitializeComponent();
    m_CancellationTokenSource = new CancellationTokenSource();
}

private readonly CancellationTokenSource m_CancellationTokenSource;

private void buttonStart_Click(object sender, EventArgs e)
{
    var result = Directory
        .EnumerateFiles(@"c:\temp", "*.txt", SearchOption.AllDirectories)
        .AsParallel()
        .WithCancellation(m_CancellationTokenSource.Token)
        .SelectMany(File.ReadLines)
        .SelectMany(ReadWords)
        .GroupBy(word => word, (word, words) => new Tuple<int, string>(words.Count(), word))
        .OrderByDescending(occurrencesWordPair => occurrencesWordPair.Item1)
        .Take(20);
    try {
        // This foreach runs on the UI thread and blocks until the whole query has finished.
        foreach (Tuple<int, string> tuple in result) {
            Console.WriteLine(tuple);
        }
    }
    catch (OperationCanceledException ex) {
        Console.WriteLine(ex.Message);
    }
}

private void buttonCancel_Click(object sender, EventArgs e)
{
    m_CancellationTokenSource.Cancel();
}

private static IEnumerable<string> ReadWords(string line) { ... }
I was not able to click the Cancel button, move the form around, or do anything at all until the whole query had finished. That was not the perfect cancellation behavior: “Yes, you can cancel it, but you cannot click the button that does that”. I asked my colleagues and posted the question on Stack Overflow. Everyone gave me the same answer: run the query on another thread.
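The freeze can be reproduced without any UI. In the small console sketch below (OrderByBarrierDemo and ItemsProcessedBeforeFirstResult are made-up names for illustration), a Select counts every element that passes through it. By the time the consuming thread returns from its first MoveNext on an OrderByDescending query, the count already equals the number of input items, because a sort cannot emit anything before it has seen everything. A foreach on the UI thread is stuck in exactly that first MoveNext.

```csharp
using System.Linq;
using System.Threading;

// Illustrative sketch (hypothetical names): how many source items has PLINQ pushed
// through the pipeline by the time the consumer receives the first sorted result?
public static class OrderByBarrierDemo
{
    public static long ItemsProcessedBeforeFirstResult(int itemCount)
    {
        long processed = 0;
        var query = Enumerable.Range(0, itemCount)
            .AsParallel()
            .Select(i =>
            {
                Interlocked.Increment(ref processed); // count every item as it flows through
                return i;
            })
            .OrderByDescending(i => i);

        using (var e = query.GetEnumerator())
        {
            // The consuming thread waits here until the sort has seen every element,
            // just like a foreach on the UI thread does.
            e.MoveNext();
            return Interlocked.Read(ref processed);
        }
    }
}
```

Calling ItemsProcessedBeforeFirstResult(10000) returns 10000: the whole query has run before the first element arrives.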
That was not an option for me: if I had wanted to clutter my code with threading stuff, I would have done it without PLINQ. PLINQ lets you keep your logic free of parallelism and synchronization plumbing, so your code stays readable and closer to the business than to the technique. (See the solutions proposed on Stack Overflow.)
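For reference, this is roughly what the “run it on another thread” advice looks like. It is a minimal sketch under my own assumptions: BackgroundQuery and TopWordsAsync are hypothetical names, and the input is an in-memory word sequence, so the file reading is left out. The whole PLINQ query, including the blocking OrderByDescending, is materialized inside Task.Run, so the calling thread gets a Task back instead of blocking in MoveNext.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch of "run the query on another thread" (hypothetical names).
public static class BackgroundQuery
{
    public static Task<List<Tuple<int, string>>> TopWordsAsync(
        IEnumerable<string> words, int count, CancellationToken token)
    {
        // The query runs on a thread-pool thread; the token cancels both the
        // not-yet-started task and the running PLINQ query.
        return Task.Run(() => words
            .AsParallel()
            .WithCancellation(token)
            .GroupBy(word => word, (word, group) => new Tuple<int, string>(group.Count(), word))
            .OrderByDescending(pair => pair.Item1)
            .Take(count)
            .ToList(), token);
    }
}
```

In the form, buttonStart_Click could then be declared async void and await TopWordsAsync(…, 20, m_CancellationTokenSource.Token) inside a try/catch for OperationCanceledException, so the Cancel button stays clickable. The price is the extra threading and async plumbing in the event handler.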
The Ahaa Moment
While waiting for THE expert’s opinion from somewhere in cyberspace, I came up with my own solution, which I like quite a lot. I wrote these two extension methods:
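using System;
using System.ComponentModel;   // ISynchronizeInvoke
using System.Linq;
using System.Threading;
using System.Windows.Forms;    // Application.DoEvents

public static class CallbackExtensions
{
    // Pumps pending Windows messages for every item flowing through the query.
    public static ParallelQuery<TSource> ProcessWindowsMessages<TSource>
        (this ParallelQuery<TSource> source)
    {
        return source.Select(
            item =>
            {
                Application.DoEvents(); // processes the message queue of the current thread
                Thread.Yield();         // lets another ready thread run on this processor
                return item;
            });
    }

    // Same as above, plus runs callback with the current item on the thread
    // that owns target (typically the Form).
    public static ParallelQuery<TSource> WithCallback<TSource>
        (this ParallelQuery<TSource> source, ISynchronizeInvoke target, Action<TSource> callback)
    {
        return source.Select(
            item =>
            {
                Application.DoEvents();
                Thread.Yield();
                target.Invoke(callback, new object[] { item }); // waits until the owner thread has run it
                return item;
            });
    }
}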
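The first one calls Application.DoEvents() for every item that flows through the query. DoEvents processes the message queue of the thread it runs on, and PLINQ appears to execute part of the work on the calling thread, so the calls that land on the main thread let it handle clicks, repaints and form moves. The second one additionally invokes any action on the main thread, passing it the item currently being processed. You can put them anywhere in your query, depending on how responsive it should be. The example below passes control back to the main thread after processing each file.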
var result = Directory
    .EnumerateFiles(@"c:\temp", "*.txt", SearchOption.AllDirectories)
    .AsParallel()
    .ProcessWindowsMessages()
    .WithCancellation(m_CancellationTokenSource.Token)
    .SelectMany(File.ReadLines)
    .SelectMany(ReadWords)
    .GroupBy(word => word, (word, words) =>
        new Tuple<int, string>(words.Count(), word))
    .OrderByDescending(occurrencesWordPair => occurrencesWordPair.Item1)
    .Take(20);
You can even use the second extension method for progress indication and the like. In my example, I show every processed word in the form’s caption. Yes, I know, this is highly inefficient, but it is fine for demo purposes. You can download my solution and play with the position of the line. For instance, putting it right after the file enumeration will show file names in the caption and give moderate responsiveness.
var result = Directory
    .EnumerateFiles(@"c:\temp", "*.txt", SearchOption.AllDirectories)
    .AsParallel()
    .WithCancellation(m_CancellationTokenSource.Token)
    .SelectMany(File.ReadLines)
    .SelectMany(ReadWords)
    .WithCallback(this, word => { this.Text = word; })
    .GroupBy(word => word, (word, words) => new Tuple<int, string>(words.Count(), word))
    .OrderByDescending(occurrencesWordPair => occurrencesWordPair.Item1)
    .Take(20);
Negative Effects
Using the callback extension method, you create a bottleneck, bringing all of your parallel threads back to the main thread just to paint one word into the form’s title bar. So the best solution is to throttle callbacks into the main thread as much as possible. Let’s say you need to update a progress bar. Ten updates per second are enough, so keep a timer and do not invoke again until the appropriate interval has elapsed since the last invocation. All other calls just return immediately. Alternatively, you may decide to pass only every 1000th word to the screen.
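Both throttling ideas can be sketched as two more extension methods. This is a hypothetical sketch, not part of the downloadable solution: WithThrottledCallback and WithSampledCallback are my own names, and the callback is a plain Action<TSource>, so marshaling into the UI thread is left to the caller. Each worker decides locally, with a lock-free check, whether to call back; every other item returns immediately.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;

// Hypothetical helpers (not part of PLINQ or of the original solution).
public static class ThrottledCallbackExtensions
{
    // Calls callback for at most one item per interval; all other items pass straight through.
    // The clock starts when the query is built, and the state is shared if the same
    // query object is enumerated more than once.
    public static ParallelQuery<TSource> WithThrottledCallback<TSource>(
        this ParallelQuery<TSource> source, TimeSpan interval, Action<TSource> callback)
    {
        long intervalTicks = (long)(interval.TotalSeconds * Stopwatch.Frequency);
        long lastStamp = Stopwatch.GetTimestamp() - intervalTicks; // lets the first item through
        return source.Select(item =>
        {
            long now = Stopwatch.GetTimestamp();
            long last = Interlocked.Read(ref lastStamp);
            // Only the worker that wins the compare-and-swap performs the callback.
            if (now - last >= intervalTicks &&
                Interlocked.CompareExchange(ref lastStamp, now, last) == last)
            {
                callback(item);
            }
            return item;
        });
    }

    // Calls callback for every everyNth item, counted across all workers.
    public static ParallelQuery<TSource> WithSampledCallback<TSource>(
        this ParallelQuery<TSource> source, int everyNth, Action<TSource> callback)
    {
        if (everyNth <= 0) throw new ArgumentOutOfRangeException(nameof(everyNth));
        long counter = 0;
        return source.Select(item =>
        {
            // Interlocked keeps the count exact even though workers run concurrently.
            if (Interlocked.Increment(ref counter) % everyNth == 0)
            {
                callback(item);
            }
            return item;
        });
    }
}
```

In the form, a hypothetical use would be .WithThrottledCallback(TimeSpan.FromMilliseconds(100), word => BeginInvoke((Action)(() => Text = word))), which updates the caption at most ten times per second. BeginInvoke, unlike Invoke, does not make the worker wait, but the posted update only runs when the UI thread pumps messages, so it still needs ProcessWindowsMessages or a background thread.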