Hi All,
I have a question regarding cancellation of tasks. Please read below for the explanation. Any suggestion would help me solve the problem.
I have 3 tasks. Let's call them task1, task2, and task3. They are independent of each other except for the data they exchange: task1 puts data into queue1 and task2 reads it; task2 puts its processed data into queue2 and task3 reads it; task3 does some final work. This chain should continue until task1 completes successfully. Once task1 has completed, task2 and task3 should finish processing their respective queues and then run to successful completion. If for any reason a task stops partway through (say, due to an exception in the method the task is running), the other two tasks should also stop immediately.
For successful completion I am using integer flags: foundAllDependencies is incremented when task1 completes, and convertedAllDependencies when task2 completes. The downstream tasks read these flags and exit smoothly after emptying their own queues.
For erroneous exits I could use the same flags, but I am looking for a better solution so that the code looks cleaner.
See the code below.
I have simulated an error by throwing an exception in Method2. After this error occurs, task2 stops, task1 runs to completion, and task3 waits forever for more data in queue2, which prevents the application from finishing.
Is there a way to notify other tasks when a task has faulted/completed?
E.g.
1. Notify task2 and task3 when task1 faults/completes
2. Notify task1 and task3 when task2 faults/completes
3. Notify task1 and task2 when task3 faults/completes
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Designer.UpdateManager.Dependency
{
    public class StringDependencyCalculator
    {
        #region Fields
        // queue1 carries data from task1 to task2; queue2 carries data from task2 to task3.
        private ConcurrentQueue<int> queue1 = new ConcurrentQueue<int>();
        private ConcurrentQueue<string> queue2 = new ConcurrentQueue<string>();
        // Completion flags: non-zero once task1 / task2 have finished producing.
        private int foundAllDependencies = 0;
        private int convertedAllDependencies = 0;
        private int maxLimit;
        #endregion
        #region Entry point
        public static void Main(string[] args)
        {
            if (args.Length != 1)
            {
                Console.WriteLine("Invalid no. of arguments");
                Console.WriteLine("Usage: TaskParallelization.exe <maxLimit>");
                return; // stop here: args[0] is not safe to read
            }
            int maxLimit;
            if (int.TryParse(args[0], out maxLimit))
            {
                StringDependencyCalculator sdc = new StringDependencyCalculator();
                sdc.CalculateDependency(maxLimit);
            }
            else
            {
                Console.WriteLine("maxLimit must be an integer");
                Console.WriteLine("Usage: TaskParallelization.exe <maxLimit>");
            }
        }
        #endregion
        #region Methods
        public void CalculateDependency(int maxLimit)
        {
            this.maxLimit = maxLimit;
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();
            try
            {
                Task[] tasks = new Task[]
                {
                    Task.Factory.StartNew(() => Method1()),
                    Task.Factory.StartNew(() => Method2()),
                    Task.Factory.StartNew(() => Method3())
                };
                // Blocks until all three tasks have finished; if one of them
                // never exits, this call never returns.
                Task.WaitAll(tasks);
            }
            catch (AggregateException ex)
            {
                StringBuilder messageStringBuilder = new StringBuilder();
                foreach (Exception exception in ex.InnerExceptions)
                {
                    messageStringBuilder.AppendLine(exception.Message);
                }
                Console.WriteLine(messageStringBuilder.ToString());
            }
            finally
            {
                stopWatch.Stop();
                Console.WriteLine("Elapsed time - " + (stopWatch.ElapsedMilliseconds / 1000) + "s");
            }
        }
        // task1: produces maxLimit integers into queue1, then signals completion.
        private void Method1()
        {
            Random r = new Random();
            for (int i = 0; i < maxLimit; i++)
            {
                queue1.Enqueue(i * r.Next(100));
            }
            Interlocked.Increment(ref foundAllDependencies);
        }
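        // task2: converts items from queue1 to strings and puts them into queue2.
        private void Method2()
        {
            for (int i = 0; i < maxLimit; i++)
            {
                int dequedItem = 0;
                // Spin until an item arrives, or until task1 is done and queue1 is empty.
                while (!queue1.TryDequeue(out dequedItem))
                {
                    if (foundAllDependencies > 0 && !queue1.Any())
                        break;
                }
                queue2.Enqueue(dequedItem.ToString());
                // Simulated error: task2 faults here and never sets convertedAllDependencies.
                if (i == 10)
                    throw new InvalidDataException("10 is not a valid value");
            }
            Interlocked.Increment(ref convertedAllDependencies);
        }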
        // task3: prints items from queue2.
        private void Method3()
        {
            for (int i = 0; i < maxLimit; i++)
            {
                string dequedItem = string.Empty;
                // Spins forever if task2 faults, because convertedAllDependencies stays 0.
                while (!queue2.TryDequeue(out dequedItem))
                {
                    if (foundAllDependencies > 0 && convertedAllDependencies > 0 && !queue2.Any())
                        break;
                }
                if (string.IsNullOrEmpty(dequedItem))
                    continue;
                Console.WriteLine(dequedItem);
            }
        }
        #endregion
    }
}
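One possible direction for the notification asked about above, as a minimal sketch rather than a tested solution. It assumes the two ConcurrentQueue fields can be replaced by BlockingCollection instances and that all three tasks share one CancellationTokenSource, so that whichever task faults cancels the token and the blocked tasks wake up with an OperationCanceledException. The names PipelineSketch, Run, failAt, and start are hypothetical and exist only for this illustration; failAt stands in for the simulated error in Method2.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Runs the three stages; returns true only if every stage finished without error.
    // failAt is a hypothetical knob: stage 2 throws when it sees that value (-1 = never).
    public static bool Run(int maxLimit, int failAt)
    {
        using (var cts = new CancellationTokenSource())
        using (var queue1 = new BlockingCollection<int>())
        using (var queue2 = new BlockingCollection<string>())
        {
            CancellationToken token = cts.Token;

            // Wraps a stage so that any exception cancels the shared token
            // (notifying the other stages) before it propagates to WaitAll.
            Func<Action, Task> start = body => Task.Factory.StartNew(() =>
            {
                try { body(); }
                catch { cts.Cancel(); throw; }
            });

            Task[] tasks =
            {
                start(() =>
                {
                    for (int i = 0; i < maxLimit; i++)
                        queue1.Add(i, token);
                    queue1.CompleteAdding(); // normal completion: stage 2 drains and exits
                }),
                start(() =>
                {
                    foreach (int item in queue1.GetConsumingEnumerable(token))
                    {
                        if (item == failAt)
                            throw new InvalidDataException(item + " is not a valid value");
                        queue2.Add(item.ToString(), token);
                    }
                    queue2.CompleteAdding(); // normal completion: stage 3 drains and exits
                }),
                start(() =>
                {
                    foreach (string item in queue2.GetConsumingEnumerable(token))
                        Console.WriteLine(item);
                })
            };

            try
            {
                Task.WaitAll(tasks);
                return true;
            }
            catch (AggregateException ex)
            {
                // Report the real failure; skip the cancellations it triggered.
                foreach (Exception inner in ex.Flatten().InnerExceptions)
                {
                    if (!(inner is OperationCanceledException))
                        Console.WriteLine(inner.Message);
                }
                return false;
            }
        }
    }
}
```

Calling PipelineSketch.Run(20, -1) lets all three stages drain their queues and finish, while PipelineSketch.Run(20, 10) makes stage 2 throw, which cancels the token so that stage 3 stops waiting instead of blocking the application. CompleteAdding takes over the role of the integer flags for the successful path, and the shared token covers the faulted path.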