Challenge
I just had my first chance to apply PLINQ (Parallel LINQ) to a real task.
When I play around with a new technology, everything works fine; real understanding only comes with the challenges of a real use case. Currently, I am working on an application that processes large amounts of text data and gathers statistics on word occurrences (see: Source Code Word Cloud).
Here is what the simplified core of my code does:
- Enumerate through all files with *.txt extension.
- Enumerate through words in each text file.
- Group by word and count occurrences.
- Sort by occurrences.
- Output top 20.
Everything worked fine with LINQ. I wrote a prototype console application with a single LINQ query. … and ta-daa! Just adding one line turned it into PLINQ.
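using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

public static class Program
{
    public static void Main(string[] args)
    {
        var result = Directory
            .EnumerateFiles(@"c:\temp", "*.txt", SearchOption.AllDirectories)
            .AsParallel()                   // the one line that turns LINQ into PLINQ
            .SelectMany(File.ReadLines)     // files -> lines
            .SelectMany(ReadWords)          // lines -> words
            .GroupBy(word => word, (word, words) =>
                new Tuple<int, string>(words.Count(), word))
            .OrderByDescending(occurrencesWordPair => occurrencesWordPair.Item1)
            .Take(20);

        foreach (Tuple<int, string> tuple in result)
        {
            Console.WriteLine(tuple);
        }

        Console.Write("Press any key to quit.");
        Console.ReadKey();
    }

    // Splits a line into words, treating every non-letter character as a separator.
    private static IEnumerable<string> ReadWords(string line)
    {
        StringBuilder word = new StringBuilder();
        foreach (char ch in line)
        {
            if (char.IsLetter(ch)) { word.Append(ch); }
            else
            {
                if (word.Length == 0) continue;
                yield return word.ToString();
                word.Clear();
            }
        }

        // Do not lose the last word when the line ends with a letter.
        if (word.Length > 0) yield return word.ToString();
    }
}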
Finally, all four cores of my PC and the fan were busy processing the dozens of megabytes of free eBooks I had put into my temp folder for testing. Moving to PLINQ brought me a significant performance boost. But carrying this one line over into the WinForms application was a different challenge.
Frustration
… cancelability and responsiveness during long-running queries were lost.
It seems that the OrderBy query was synchronizing data back into the main thread, so Windows messages were not processed. In the example below, I demonstrate my implementation of cancellation, written according to the MSDN article How to: Cancel a PLINQ Query, which does not work here.
public Form1()
{
    InitializeComponent();
    m_CancellationTokenSource = new CancellationTokenSource();
}

private readonly CancellationTokenSource m_CancellationTokenSource;

private void buttonStart_Click(object sender, EventArgs e)
{
    var result = Directory
        .EnumerateFiles(@"c:\temp", "*.txt", SearchOption.AllDirectories)
        .AsParallel()
        .WithCancellation(m_CancellationTokenSource.Token)
        .SelectMany(File.ReadLines)
        .SelectMany(ReadWords)
        .GroupBy(word => word, (word, words) =>
            new Tuple<int, string>(words.Count(), word))
        .OrderByDescending(occurrencesWordPair => occurrencesWordPair.Item1)
        .Take(20);

    try
    {
        foreach (Tuple<int, string> tuple in result)
        {
            Console.WriteLine(tuple);
        }
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine(ex.Message);
    }
}

private void buttonCancel_Click(object sender, EventArgs e)
{
    m_CancellationTokenSource.Cancel();
}

private static IEnumerable<string> ReadWords(string line) { ... }
I was not able to click the Cancel button, move the form around, or do anything at all until the whole query had finished. That was not the perfect cancellation behavior: “Yes, you can cancel it, but you cannot click the button that does so.” I asked my colleagues and posted the question on Stack Overflow. Everyone gave me the same answer: run the query on another thread.
That was not an option for me. If I wanted to clutter my code with threading stuff, I would do it without PLINQ. PLINQ lets you keep your logic free from parallelism and synchronization overhead, so the code stays readable and closer to the business problem than to the technique. (See the proposed solutions on Stack Overflow.)
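For comparison, the "another thread" suggestion boils down to something like the sketch below. It is not taken from any particular answer: the class name BackgroundWordCount, the method TopWordsAsync, and its parameters are hypothetical, and the word splitter is passed in as a delegate so that the block stands on its own.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundWordCount
{
    // Hypothetical helper (not part of my solution): runs the whole PLINQ
    // query inside a task so that the calling thread is not blocked.
    public static Task<List<Tuple<int, string>>> TopWordsAsync(
        string folder,
        Func<string, IEnumerable<string>> readWords,
        CancellationToken token)
    {
        return Task.Factory.StartNew<List<Tuple<int, string>>>(
            () => Directory
                .EnumerateFiles(folder, "*.txt", SearchOption.AllDirectories)
                .AsParallel()
                .WithCancellation(token)
                .SelectMany(File.ReadLines)
                .SelectMany(readWords)
                .GroupBy(word => word, (word, words) =>
                    new Tuple<int, string>(words.Count(), word))
                .OrderByDescending(pair => pair.Item1)
                .Take(20)
                .ToList(),   // force execution here, inside the task
            token);
    }
}
```

In a form, the button handler would start this task and attach a continuation scheduled with TaskScheduler.FromCurrentSynchronizationContext() to show the results on the UI thread. It works, but the task plumbing is exactly the kind of clutter I wanted to keep out of the query.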
The Ahaa Moment
While waiting for THE expert’s opinion from somewhere in cyberspace, I came up with my own solution, which I like quite a lot. I wrote these two extension methods:
public static class CallbackExtensions
{
    public static ParallelQuery<TSource> ProcessWindowsMessages<TSource>(
        this ParallelQuery<TSource> source)
    {
        return source.Select(
            item =>
            {
                Application.DoEvents();
                Thread.Yield();
                return item;
            });
    }

    public static ParallelQuery<TSource> WithCallback<TSource>(
        this ParallelQuery<TSource> source,
        ISynchronizeInvoke target,
        Action<TSource> callback)
    {
        return source.Select(
            item =>
            {
                Application.DoEvents();
                Thread.Yield();
                target.Invoke(callback, new object[] { item });
                return item;
            });
    }
}
The first one just passes control to the main thread, allowing standard COM and SendMessage pumping. The second one lets you invoke any action on the main thread, passing it the item that is currently being processed. You can put them anywhere in your query, depending on how responsive the query should be. The example below passes control back to the main thread after each file is processed.
var result = Directory
    .EnumerateFiles(@"c:\temp", "*.txt", SearchOption.AllDirectories)
    .AsParallel()
    .ProcessWindowsMessages()
    .WithCancellation(m_CancellationTokenSource.Token)
    .SelectMany(File.ReadLines)
    .SelectMany(ReadWords)
    .GroupBy(word => word, (word, words) =>
        new Tuple<int, string>(words.Count(), word))
    .OrderByDescending(occurrencesWordPair => occurrencesWordPair.Item1)
    .Take(20);
You can even use the second extension method for progress indication and the like. In my example, I show every processed word in the form's caption. Yes, I know: this is highly inefficient, but it is fine for demo purposes. You can download my solution and play with the position of the line. For instance, putting it right after the file enumeration will show file names in the caption and give moderate responsiveness.
var result = Directory
    .EnumerateFiles(@"c:\temp", "*.txt", SearchOption.AllDirectories)
    .AsParallel()
    .WithCancellation(m_CancellationTokenSource.Token)
    .SelectMany(File.ReadLines)
    .SelectMany(ReadWords)
    .WithCallback(this, word => { this.Text = word; })
    .GroupBy(word => word, (word, words) =>
        new Tuple<int, string>(words.Count(), word))
    .OrderByDescending(occurrencesWordPair => occurrencesWordPair.Item1)
    .Take(20);
Negative Effects
By using one of my extension methods, you create a bottleneck: all of your parallel threads are brought back into the main thread just to paint one word into the form’s title bar. So the best solution is to throttle the callbacks into the main thread as much as possible. Let’s say you need to update a progress bar. It would be enough to do that 10 times per second, so keep a timer and do not invoke again until the appropriate time has elapsed since the last invoke. All other calls will just return trivially. Or you may decide to pass only every 1000th word to the screen.
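Both throttling ideas could be sketched roughly as follows. This is only an illustration under my own assumptions, not code from the downloadable solution: the names ThrottledCallbackExtensions, WithThrottledCallback, and WithCallbackEveryNth are hypothetical, and the time-based variant uses Stopwatch timestamps with Interlocked operations instead of a real timer so that only one worker thread wins each time slot.

```csharp
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public static class ThrottledCallbackExtensions
{
    // Hypothetical helper: invokes the callback on the target at most once
    // per minInterval; all other items pass through without any invoke.
    public static ParallelQuery<TSource> WithThrottledCallback<TSource>(
        this ParallelQuery<TSource> source,
        ISynchronizeInvoke target,
        Action<TSource> callback,
        TimeSpan minInterval)
    {
        long intervalTicks = (long)(minInterval.TotalSeconds * Stopwatch.Frequency);
        long nextAllowed = long.MinValue; // timestamp of the next permitted invoke

        return source.Select(item =>
        {
            long now = Stopwatch.GetTimestamp();
            long next = Interlocked.Read(ref nextAllowed);

            // Only the thread that successfully moves the deadline forward invokes.
            if (now >= next &&
                Interlocked.CompareExchange(ref nextAllowed, now + intervalTicks, next) == next)
            {
                target.Invoke(callback, new object[] { item });
            }
            return item;
        });
    }

    // Hypothetical helper: invokes the callback only for every n-th item.
    public static ParallelQuery<TSource> WithCallbackEveryNth<TSource>(
        this ParallelQuery<TSource> source,
        ISynchronizeInvoke target,
        Action<TSource> callback,
        int n)
    {
        long counter = 0; // shared across all worker threads

        return source.Select(item =>
        {
            if (Interlocked.Increment(ref counter) % n == 0)
            {
                target.Invoke(callback, new object[] { item });
            }
            return item;
        });
    }
}
```

In the query above, .WithCallback(this, word => { this.Text = word; }) would then become, for example, .WithThrottledCallback(this, word => { this.Text = word; }, TimeSpan.FromMilliseconds(100)) or .WithCallbackEveryNth(this, word => { this.Text = word; }, 1000). The state is captured once per call of the extension method, so it is shared if the same query object is enumerated more than once.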