Introduction
There are many scenarios that require batch processing, such as processing a bulk order of a product or a bulk request to perform some action. This article does not focus on performance-efficient processing; instead it provides a simple process implementation. A process can be defined as a workflow made of many small tasks. I will describe a simple workflow, a cooking process, and show how I applied the Task class following the Chain of Responsibility and Strategy design patterns.
Background
I have worked on a Windows service that handles a batch of install order requests. Before I learned about the C# Task class, I created my own abstract classes for Process and Task and then used the Thread and ThreadPool classes to execute my tasks. (I will try to post my code based on this old design when I get a chance.) When I came across the article Task Parallel Library: 1 of n series by Sacha Barber, I became interested in the Task class and started using it, following some of the design patterns I am presenting here.
Problem statement
Let us take a "Cooking Process" as an example workflow to implement. We all know that cooking involves several tasks, such as buying vegetables, cutting them, cleaning vessels, preparing ingredients and cooking. Some of these tasks depend on others; for example, cutting vegetables requires the vegetables to have been bought. Other tasks can happen in parallel, like preparing ingredients and buying vegetables, because they do not depend on each other. Each task transforms its input into some output, like a factory that processes raw material into a product. For example, cleaning vessels turns dirty vessels into clean ones, and cutting vegetables produces finely chopped vegetables for easy cooking. Each task has its own responsibility for inputs such as ingredients, vegetables and vessels, but no task produces a dish on its own. We have to tie all these tasks together as a chain of responsibility to produce the dish.
Everyone is happy to have different dishes every day, so our cooking process should not produce the same dish all the time. We need different recipes, like Ratatouille (I have never had this) and Aloo Gobi (yes, and I like it). Different recipes need different vegetables, so the buying vegetables task needs N algorithms for N recipes. The same applies to cleaning vessels and preparing spices. We need good strategies (algorithms), not only to solve the problem but also to get delicious dishes.
Chain of responsibility and Strategy Pattern
Chain of Responsibility passes a request along a chain of handlers. The buying task can be considered such a handler: its responsibility is to buy the vegetables required for a given recipe. The vegetables are then passed to the next handler in the chain, whose responsibility is to cut them. Forming this chain is what turns the individual tasks into a workflow. This way the client (hungry people) need not bother about how the dishes are made; all they do is ask and wait for the dish.
The System.Threading.Tasks namespace provides a good framework for creating tasks and executing them in many different ways. The library takes care of scheduling the tasks on thread pool threads for us, and it also lets us link tasks one after another through continuations, which is what our cooking process needs.
Buying vegetables, preparing ingredients and cleaning vessels can be done asynchronously, so we can use Task.Factory.StartNew() or Task.Run() (the simpler shorthand for the common case), both of which create a task that executes asynchronously.
Task<string[]> buyVegiesTask = Task.Run(() => buyVegies(input.recipe));
Task<string[]> cleanVesselsTask = Task.Run(() => cleanVessels(input.recipe));
Task<string[]> prepareIngredientsTask = Task.Run(() => prepareIngredients(input.recipe));
buyVegies, cleanVessels and prepareIngredients are static methods that accept a Recipe as their parameter. Because these static methods operate only on their parameters and share no state, they are safe to run on several threads at once. They are utility methods defined inside the CookingProcess class.
Next we need to cut the vegetables that have been bought; cutting is the task chained after buying vegetables.
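To see the pattern in isolation, here is a minimal, self-contained sketch (not the article's code) that starts three independent tasks with Task.Run and then waits for all of them. BuyVegies, CleanVessels, PrepareIngredients and the items they return are hypothetical stand-ins, and the recipe is a plain string instead of the Recipe enum only so the sketch fits in one file.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-ins for buyVegies, cleanVessels and prepareIngredients.
// Each reads only its parameter and returns a new array, so no state is shared.
static string[] BuyVegies(string recipe) =>
    recipe == "Aloo_gobi"
        ? new[] { "Potatoes", "Cauliflower", "Onion", "Ginger" }
        : new[] { "Ginger", "Onion", "zucchini", "eggplant" };
static string[] CleanVessels(string recipe) => new[] { "Pan", "Ladle" };
static string[] PrepareIngredients(string recipe) => new[] { "Turmeric", "Cumin" };

string chosenRecipe = "Aloo_gobi";

// Task.Run queues each delegate to the thread pool; the three run independently.
Task<string[]> buyTask = Task.Run(() => BuyVegies(chosenRecipe));
Task<string[]> cleanTask = Task.Run(() => CleanVessels(chosenRecipe));
Task<string[]> prepareTask = Task.Run(() => PrepareIngredients(chosenRecipe));

// Block until all three have finished; a failure would surface as an AggregateException.
Task.WaitAll(buyTask, cleanTask, prepareTask);

Console.WriteLine(string.Join(", ", buyTask.Result));
```

Because none of the stand-in methods touches shared state, the order in which the thread pool runs them does not matter.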
Task<string> cutVegeTask = buyVegiesTask.ContinueWith<string>(t =>
{
    input.Vegetables = t.Result;
    string[] vegetables = new string[input.Vegetables.Length];
    input.Vegetables.CopyTo(vegetables, 0);
    return Retry(() =>
    {
        return cutVegies(vegetables, input.recipe);
    }, 3);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
The cut task gets its own copy of Vegetables so that the result of buyVegiesTask stays safe. The copy is not required if the vegetables are used only by the cut task and no other task needs them afterwards.
buyVegiesTask.ContinueWith ensures that cutting happens only after buying. The TaskContinuationOptions enum provides several options for chaining, such as OnlyOnCanceled, OnlyOnFaulted, NotOnCanceled and NotOnFaulted. We want to cut only when buying vegetables completes successfully, so OnlyOnRanToCompletion is used. If the antecedent task faults or is canceled instead, the continuation does not run and is itself canceled.
The cooking task has to wait for all the preceding tasks: buying vegetables, cutting them, cleaning vessels and preparing ingredients. Task.WaitAll blocks the calling thread until all of the given tasks have completed, so it can be used to hold back cooking until then.
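The following sketch, assuming buying fails with a made-up InvalidOperationException, shows what OnlyOnRanToCompletion does when the antecedent does not succeed: the continuation never runs and ends up Canceled, so waiting on it throws an AggregateException that wraps a TaskCanceledException. BuyVegies here is a hypothetical placeholder, not the article's method.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical buy step that always fails.
static string[] BuyVegies() => throw new InvalidOperationException("Shop closed");

Task<string[]> buyTask = Task.Run(() => BuyVegies());

// The cut continuation is allowed to run only if buyTask ran to completion.
Task<string> cutTask = buyTask.ContinueWith(
    t => "chopped " + string.Join(",", t.Result),
    TaskContinuationOptions.OnlyOnRanToCompletion);

string innerName = "";
try
{
    cutTask.Wait();
}
catch (AggregateException ae)
{
    // The antecedent faulted, so the continuation was canceled rather than run.
    innerName = ae.InnerException?.GetType().Name ?? "";
}

Console.WriteLine(innerName);      // TaskCanceledException
Console.WriteLine(buyTask.Status); // Faulted
Console.WriteLine(cutTask.Status); // Canceled
```

In the article's flow this means a failed purchase shows up twice when everything is waited on: once as the original exception from buyVegiesTask and once as a cancellation of cutVegeTask.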
Task.WaitAll(new Task[] { buyVegiesTask, cleanVesselsTask, prepareIngredientsTask, cutVegeTask });
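Task.WaitAll ties up the thread that calls it. If blocking is not wanted, Task.Factory.ContinueWhenAll is an alternative (a different technique from the one used in this article) that schedules the cooking step to run once all prerequisite tasks have finished. The task bodies below are hypothetical placeholders. Note that ContinueWhenAll runs the continuation even if a prerequisite faulted, in which case reading that task's Result rethrows its exception.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical prerequisite steps, each producing a list of items.
Task<string[]> buyTask = Task.Run(() => new[] { "Potatoes", "Cauliflower" });
Task<string[]> prepareTask = Task.Run(() => new[] { "Turmeric", "Cumin" });
Task<string[]> cleanTask = Task.Run(() => new[] { "Pan" });

// ContinueWhenAll registers the "cook" step to run after every prerequisite
// has completed, without blocking the calling thread the way Task.WaitAll does.
Task<string> cookTask = Task.Factory.ContinueWhenAll(
    new Task[] { buyTask, prepareTask, cleanTask },
    _ => $"Cooked {string.Join(",", buyTask.Result)} with {string.Join(",", prepareTask.Result)}");

// Only this final read blocks, and only to print the result.
Console.WriteLine(cookTask.Result);
```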
When cooking is complete, decorating the dish and serving it can be done as a chain.
Task<string[]> decorateTask = cookTask.ContinueWith((t) =>
{
    return decorate(input.recipe);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
Task<string> serveTask = decorateTask.ContinueWith((t) =>
{
    ...
    return serve(description, input.recipe);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
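A minimal sketch of such a chain, with made-up strings standing in for the article's cook, decorate and serve methods: each stage receives the previous task as t and reads t.Result, which is safe because OnlyOnRanToCompletion guarantees the antecedent succeeded. Waiting on the last task is enough, since it cannot complete before the links ahead of it.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical cook -> decorate -> serve chain passing each result forward.
Task<string> cookTask = Task.Run(() => "Aloo Gobi");
Task<string> decorateTask = cookTask.ContinueWith(
    t => t.Result + " with coriander",
    TaskContinuationOptions.OnlyOnRanToCompletion);
Task<string> serveTask = decorateTask.ContinueWith(
    t => "Served: " + t.Result,
    TaskContinuationOptions.OnlyOnRanToCompletion);

// Reading the final Result waits for the whole chain.
Console.WriteLine(serveTask.Result);
```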
To satisfy many people with different dishes, we may need to cook more than one recipe. Tasks like buying vegetables, cutting vegetables and cleaning vessels therefore follow different algorithms depending on the recipe. For example, a potato recipe may need potatoes and onions to be bought, whereas a broccoli recipe may need broccoli. We need to collect an algorithm for each task for each recipe.
Define Recipe as an enum in a Recipes class.
public enum Recipe
{
Ratatouille = 0,
Aloo_gobi = 1
}
For each task, create a dictionary (think of it as a cook book) that holds the different ways to buy, cut or prepare for each recipe.
public delegate string[] PrepareMasala();
public static Dictionary<Recipe, PrepareMasala> PrepareSpiceIngredientsAlgorithms = new Dictionary<Recipe, PrepareMasala>();
public delegate string[] buyVegie();
public static Dictionary<Recipe, buyVegie> buyVegieAlgorithms = new Dictionary<Recipe, buyVegie>();
public delegate string[] cleanVessel();
public static Dictionary<Recipe, cleanVessel> cleanVesselAlgorithms = new Dictionary<Recipe, cleanVessel>();
public delegate string[] decorate();
public static Dictionary<Recipe, decorate> decorateAlgorithms = new Dictionary<Recipe, decorate>();
This is an open dictionary that accepts any newly invented recipe. The idea follows the Strategy pattern: the buy or decorate algorithm for the given recipe is selected at run time.
Recipes.buyVegieAlgorithms.Add(Recipe.Ratatouille, () => new string[4] { "Ginger", "Onion", "zucchini", "eggplant" });
Recipes.buyVegieAlgorithms.Add(Recipe.Aloo_gobi, () => new string[4] { "Potatoes", "Cauliflower", "Onion", "Ginger" });
The buyVegies task method selects the algorithm based on the input recipe.
private static string[] buyVegies(Recipe recipe)
{
string[] vegetables = Recipes.buyVegieAlgorithms[recipe]();
...
}
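Because all four delegate types above share the signature string[] (), the built-in Func<string[]> could serve as the value type instead. The sketch below assumes that, and uses string keys in place of the Recipe enum so it fits in one file. It registers two strategies, selects one at run time and rejects unknown recipes via TryGetValue rather than letting the indexer throw KeyNotFoundException. A Dictionary supports multiple concurrent readers as long as it is not modified, so all recipes should be registered before any task reads them.

```csharp
using System;
using System.Collections.Generic;

// The "cook book": one buying strategy per recipe (string keys for brevity).
var buyVegieAlgorithms = new Dictionary<string, Func<string[]>>
{
    ["Ratatouille"] = () => new[] { "Ginger", "Onion", "zucchini", "eggplant" },
    ["Aloo_gobi"] = () => new[] { "Potatoes", "Cauliflower", "Onion", "Ginger" },
};

// Select the strategy at run time and report unknown recipes clearly.
string[] BuyVegies(string recipe)
{
    if (!buyVegieAlgorithms.TryGetValue(recipe, out var algorithm))
        throw new ArgumentException($"No buying algorithm registered for {recipe}", nameof(recipe));
    return algorithm();
}

Console.WriteLine(string.Join(", ", BuyVegies("Aloo_gobi")));
```

Adding a new recipe is then a single dictionary entry; the task method itself does not change.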
Retry and Exception Handling
The Cut Vegetables task is implemented with a retry mechanism. Suppose someone cuts a finger, but the injury is manageable, so they can retry what they were doing. For this case a TaskRetryableException is raised, which provides a way to log the message and also retry the task. I have defined a boolean variable to simulate this exception scenario. Note that Ratatouille is hard-coded only for testing the exception handling.
static bool simulate_exception = true;
private static string cutVegies(string[] vegetables, Recipe recipe)
{
    ...
    if (recipe == Recipe.Ratatouille && simulate_exception)
    {
        ...
        TaskRetryableException exception = new TaskRetryableException(badMsg + "... But I can manage");
        simulate_exception = false;
        throw exception;
    }
}
Now suppose someone cuts their fingers badly and cannot perform the task. For this case a TaskException is raised.
Retry wraps the actual task method. It delegates the call to the actual method and, when that method throws a TaskRetryableException, checks the retry counter against the maximum number of attempts before trying again.
public static T Retry<T>(Func<T> actualWork, int retryCount)
{
    int maxAllowedRetry = retryCount;
    while (true)
    {
        try
        {
            return actualWork();
        }
        catch (TaskRetryableException e)
        {
            if (retryCount == 0)
            {
                throw new TaskException("Maximum count of retry attempts reached");
            }
            ...
This Retry implementation is adapted from Panagiotis Kanavos's excellent answer at http://stackoverflow.com/a/10494424/11635.
All exceptions that occur during task execution are added to the InnerExceptions property of an AggregateException, which is what Wait, WaitAll and Result throw for a failed task. So the TaskException is also added to the AggregateException. The catch block that handles this exception is shown in the code snippet in the next section.
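Since the body of Retry is elided above, here is one possible complete version, a sketch rather than the author's exact code. It assumes the counter is decremented after each retryable failure, so retryCount = 3 allows up to four attempts in total. To keep it self-contained, TimeoutException stands in for TaskRetryableException and InvalidOperationException for TaskException; any other exception type propagates immediately without a retry.

```csharp
using System;

// Sketch of Retry: TimeoutException = "retryable", InvalidOperationException = "give up".
static T Retry<T>(Func<T> actualWork, int retryCount)
{
    while (true)
    {
        try
        {
            return actualWork();
        }
        catch (TimeoutException e)
        {
            if (retryCount == 0)
                throw new InvalidOperationException("Maximum count of retry attempts reached", e);
            retryCount--;
            Console.WriteLine($"Retrying after: {e.Message} ({retryCount} retries left)");
        }
    }
}

// Hypothetical work that fails twice with a retryable error, then succeeds.
int attempts = 0;
string result = Retry(() =>
{
    attempts++;
    if (attempts < 3) throw new TimeoutException("Cut my finger");
    return "chopped vegetables";
}, 3);
Console.WriteLine($"{result} after {attempts} attempts");
```

Keeping the original exception as the InnerException of the "give up" exception preserves the last retryable failure for logging.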
Test the Kitchen
As its name says, the CookingInput class is the input for each task in the cooking process. It stores the Recipe and the Vegetables, Vessels and Ingredients arrays, among other things.
An array of CookingInput is created to run more than one cooking job as a batch. Each element in the array represents one cooking request. A task is created for each input element so that the cooking runs in parallel; if there are 2 inputs in the array, 2 tasks are created to work in parallel.
CookingInput[] inputs = new CookingInput[2];
inputs[0] = new CookingInput(Recipe.Aloo_gobi);
inputs[1] = new CookingInput(Recipe.Ratatouille);
...
// Count(predicate) is a LINQ extension method: requires using System.Linq;
int count = inputs.Count(s => s != null);
...
Task[] cookingTasks = new Task[count];
try
{
    for (int i = 0; i < count; i++)
    {
        // Copy the loop variable so each lambda captures its own index.
        int index = i;
        cookingTasks[i] = Task.Run(() => CookingProcess.start(inputs[index]));
    }
    Task.WaitAll(cookingTasks);
}
catch (AggregateException ae)
{
    Console.WriteLine();
    Console.WriteLine("Base exception: " + ae.GetBaseException());
}
...
The GetBaseException() method of AggregateException returns the exception that is the root cause of the AggregateException, so here we get the TaskException from GetBaseException().
Note that both synchronously and asynchronously executed tasks are defined here. Inside a task method CookingInput is safe to use, but outside a task we need to take care of its state, because many threads may operate on the same input.
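GetBaseException unwraps nested AggregateExceptions only while each level holds exactly one inner exception; if both cooking requests failed, it would return the outer AggregateException itself, and Flatten() is the tool for listing every failure. The sketch below reproduces the nesting with a hypothetical Start method that waits on a failing child task, using InvalidOperationException as a stand-in for TaskException.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for CookingProcess.start: it waits on a child task,
// so a failure in the child reaches the caller wrapped in an AggregateException.
static void Start(bool fail)
{
    Task child = Task.Run(() =>
    {
        if (fail) throw new InvalidOperationException("Maximum count of retry attempts reached");
    });
    Task.WaitAll(child);
}

Exception? baseException = null;
int rootCauses = 0;
Task[] jobs = { Task.Run(() => Start(false)), Task.Run(() => Start(true)) };
try
{
    Task.WaitAll(jobs);
}
catch (AggregateException ae)
{
    // Follows the chain of single inner exceptions down to the root cause.
    baseException = ae.GetBaseException();
    // Merges nested AggregateExceptions into one flat list of failures.
    rootCauses = ae.Flatten().InnerExceptions.Count;
    Console.WriteLine("Base exception: " + baseException.Message);
}
```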
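The static simulate_exception flag in cutVegies is one such piece of shared state: two tasks could both read true before either writes false. A minimal sketch, assuming the flag is changed to an int, shows how Interlocked.Exchange makes the "fail once" check atomic so that exactly one of many concurrent tasks sees the simulated failure. ShouldSimulateFailure is a hypothetical helper name.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// 1 = the simulated failure has not fired yet, 0 = it has already fired.
int simulateException = 1;
int failures = 0;

// Interlocked.Exchange atomically sets the flag to 0 and returns the old value,
// so only one caller can ever observe 1, however many threads race here.
bool ShouldSimulateFailure() => Interlocked.Exchange(ref simulateException, 0) == 1;

Task[] tasks = Enumerable.Range(0, 100)
    .Select(_ => Task.Run(() =>
    {
        if (ShouldSimulateFailure())
            Interlocked.Increment(ref failures);
    }))
    .ToArray();
Task.WaitAll(tasks);

Console.WriteLine($"Simulated failures: {failures}");
```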
Sample Output
Async and await
async and await are a convenient feature that saves a lot of code, and while a method awaits a task, no thread is blocked waiting for it. The snippet below shows the CookingProcess start method rewritten with async and await. Please check KitchenAsync.zip for the complete implementation using async and await.
Task<string[]> buyVegTask = buyVegiesAsync(input.recipe);
Task<string[]> cleanVesselTask = cleanVesselsAsync(input.recipe);
Task<string[]> prepareIngredientsTask = prepareIngredientsAsync(input.recipe);
string[] vegetables = await buyVegTask;
input.Vegetables = new string[vegetables.Length];
vegetables.CopyTo(input.Vegetables, 0);
Task<string> cutVegeTask = RetryAsync(()=> cutVegiesAsync(vegetables, input.recipe),3);
await Task.WhenAll(buyVegTask, cleanVesselTask, prepareIngredientsTask, cutVegeTask);
input.ingrdients = prepareIngredientsTask.Result;
input.Vessels = cleanVesselTask.Result;
Task msg1 = Task.Run(() => giveMessageToHungryFolks(cutVegeTask.Result));
string result = await cookAsync(input.recipe);
Task msg2 = Task.Run(() => giveMessageToHungryFolks(result));
input.decorates = await decorateAsync(input.recipe);
string description = String.Format("The plate of cooked [{0}] in a mix with good flavor of [{1}], decorated with [{2}] ",
string.Join(",", input.Vegetables), string.Join(",", input.ingrdients), string.Join(",", input.decorates));
input.dish = await serveAsync(description, input.recipe);
Note that Task.Run is used for the message tasks (giveMessageToHungryFolks). This is because nothing has to wait for the messages or their results: they are fire-and-forget, so msg1 and msg2 are never awaited.
Thank you, George, for your comments, which prompted me to work on this topic.
Task vs. async/await
I used Stopwatch to compare the execution times of the Task and async implementations. Running each one 10 times and averaging the execution times, I found that the async version saved 2.5 seconds.
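RetryAsync itself is not shown in the snippet. One possible shape, a sketch rather than the author's implementation, takes a Func<Task<T>> so that every retry calls the method again and gets a fresh task, because a faulted task cannot be restarted. TimeoutException stands in for TaskRetryableException and InvalidOperationException for TaskException; CutVegiesAsync is a hypothetical placeholder that fails on its first attempt.

```csharp
using System;
using System.Threading.Tasks;

// Sketch of RetryAsync: TimeoutException = "retryable", InvalidOperationException = "give up".
static async Task<T> RetryAsync<T>(Func<Task<T>> actualWork, int retryCount)
{
    while (true)
    {
        try
        {
            // Each call to actualWork() starts a new task for this attempt.
            return await actualWork();
        }
        catch (TimeoutException e)
        {
            if (retryCount == 0)
                throw new InvalidOperationException("Maximum count of retry attempts reached", e);
            retryCount--;
        }
    }
}

int attempts = 0;
async Task<string> CutVegiesAsync()
{
    attempts++;
    await Task.Delay(10); // simulated work
    if (attempts == 1) throw new TimeoutException("Cut my finger");
    return "chopped vegetables";
}

string cut = await RetryAsync(() => CutVegiesAsync(), 3);
Console.WriteLine($"{cut} after {attempts} attempts");
```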
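The article does not show the measurement code. A sketch of one way to do it: a hypothetical AverageDuration helper runs the workload once as a warm-up (so JIT compilation and thread pool start-up are not counted), then times several runs with Stopwatch and returns the mean. Thread.Sleep is only a placeholder for the Task or async implementation being measured.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Run once to warm up, then time `runs` executions and return the mean duration.
static TimeSpan AverageDuration(Action action, int runs)
{
    action(); // warm-up run, excluded from the measurement
    var stopwatch = Stopwatch.StartNew();
    for (int i = 0; i < runs; i++)
        action();
    stopwatch.Stop();
    return TimeSpan.FromTicks(stopwatch.Elapsed.Ticks / runs);
}

// Placeholder workload standing in for one full cooking batch.
TimeSpan average = AverageDuration(() => Thread.Sleep(20), 10);
Console.WriteLine($"Average: {average.TotalMilliseconds:F1} ms");
```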
References
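Sacha Barber, "Task Parallel Library: 1 of n", CodeProject: https://www.codeproject.com/Articles/152765/Task-Parallel-Library-of-n
Panagiotis Kanavos, Stack Overflow answer on retrying a task multiple times in case of an exception: http://stackoverflow.com/questions/10490307/retry-a-task-multiple-times-based-on-user-input-in-case-of-an-exception-in-task/10494424#10494424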