Introduction
There are some common patterns that come up when building batch processes:
- Instance concurrency control - for example, singleton processes
- Partitioning data into smaller batches
- Recovering from failures by continuing processing from where the failed instance left off
- Maintaining/logging the state of instances and the state of the data that gets processed
- Alerts when things go wrong
Taskling is a pair of libraries that provides these patterns with SQL Server as a backing store. It is host agnostic and can be used in web applications, Azure jobs, console applications and so on.
We'll look at each pattern with example code that uses Taskling.
Background
In order to use the patterns described in this article with Taskling, we'll need to start off by talking about configuration and how to instantiate the ITasklingClient.
Much of the behaviour is controlled by configuration. With Taskling you must create a class that implements the IConfigurationReader interface, which simply returns a string with a series of key value pairs in the format KEY[value]. We'll discuss each setting in the patterns below.
public class MyConfigReader : IConfigurationReader
{
    public string GetTaskConfigurationString(string applicationName, string taskName)
    {
        var key = applicationName + "::" + taskName;
        return ConfigurationManager.AppSettings.Get(key);
    }
}
All tasks are uniquely identified by an application name and a task name. The above example reads the configuration string from the application config file.
<appSettings>
<add key="MyApplication::MyTask" value="DB[Server=(local);Database=MyAppDb;Trusted_Connection=True;] TO[120] E[true] CON[-1] KPLT[2] KPDT[40] MCI[1] KA[true] KAINT[1] KADT[10] TPDT[0] RPC_FAIL[true] RPC_FAIL_MTS[600] RPC_FAIL_RTYL[3] RPC_DEAD[true] RPC_DEAD_MTS[600] RPC_DEAD_RTYL[3] MXBL[20]"/>
</appSettings>
First, let's look at the settings not related to any specific pattern:
- DB is the connection string
- TO is the command timeout in seconds
- E enables (true) or disables (false) the task
We'll go over the meaning of the rest of the settings below.
We instantiate the ITasklingClient by passing the configuration reader implementation to its constructor.
ITasklingClient tasklingClient = new TasklingClient(new MyConfigReader());
Or via dependency injection (Autofac example):
builder.Register<TasklingClient>(x => new TasklingClient(new MyConfigReader())).As<ITasklingClient>();
Using the code
In the main class where your batch processing lives, we'll need to instantiate a new ITaskExecutionContext which is responsible for all the state management, logging and creating the child contexts that partition data into blocks.
using (var executionContext = _tasklingClient.CreateTaskExecutionContext("MyApplication", "MyTask"))
{
    if (executionContext.TryStart())
    {
        // your processing logic goes here
    }
}
We're now ready to look at the patterns.
Pattern #1 - Instance Concurrency Limits such as Singletons
Some batch tasks need to be singletons. If you run your task every hour, but it can sometimes take more than an hour to run, you could end up with two executions running concurrently.
As well as singletons, it can be useful to limit the number of concurrent executions to avoid overloading other components. Maybe you have large amounts of data to process, so you run the task every minute and each execution takes ten minutes; you'll have ten concurrent executions. If a component in your architecture (web service, database etc.) cannot handle the load of ten concurrent executions, you can set a limit of five, for example. When components show signs of being overloaded, you can simply reduce the concurrency limit in real-time and then increase it again later.
Taskling concurrency limits work across servers. The settings key CON sets the limit: -1 means no limit, and any positive value sets the maximum. So for a singleton, set CON[1].
We instantiate the ITaskExecutionContext and call its TryStart() method, which returns false if the limit has already been reached. Any further calls on that context will fail, so we wrap them in an if statement.
if(executionContext.TryStart())
{
}
How Taskling guarantees concurrency control is an interesting subject in itself. Taskling leverages row locking and blocking in SQL Server to create a single-threaded scenario in a multi-threaded, and even multi-server, environment.
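To illustrate the general idea of using SQL Server as a distributed mutex, here is a minimal sketch built on sp_getapplock. This is not Taskling's actual implementation (which uses row locking and blocking, as described above); it just demonstrates how a database lock can serialize code paths across processes and servers.

```csharp
// Illustrative sketch only: shows how SQL Server locking can serialize work
// across processes and servers. The resource name and 20 second timeout are
// arbitrary choices for the example.
using System;
using System.Data;
using System.Data.SqlClient;

public static class SqlMutexDemo
{
    public static bool TryRunExclusively(string connectionString, Action action)
    {
        using (var conn = new SqlConnection(connectionString))
        {
            conn.Open();
            using (var tx = conn.BeginTransaction())
            {
                var cmd = conn.CreateCommand();
                cmd.Transaction = tx;
                cmd.CommandText = "sp_getapplock";
                cmd.CommandType = CommandType.StoredProcedure;
                cmd.Parameters.AddWithValue("@Resource", "MyApplication::MyTask");
                cmd.Parameters.AddWithValue("@LockMode", "Exclusive");
                cmd.Parameters.AddWithValue("@LockTimeout", 20000); // wait up to 20 seconds
                var returnValue = cmd.Parameters.Add("@ReturnValue", SqlDbType.Int);
                returnValue.Direction = ParameterDirection.ReturnValue;
                cmd.ExecuteNonQuery();

                // 0 or 1 means the lock was granted; negative values mean timeout or failure
                if ((int)returnValue.Value < 0)
                    return false;

                action();    // runs while no other caller holds the lock
                tx.Commit(); // committing the transaction releases the app lock
                return true;
            }
        }
    }
}
```

Because the lock lives in the database rather than in process memory, two servers calling TryRunExclusively at the same time cannot both enter the action.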
Pattern #2 - Partitioning of Data into Smaller Batches
Taskling can partition data into four types of block:
- Range blocks
  - Date range blocks
  - Numeric range blocks
- List blocks
- Object blocks
What is common to all is that we need isolation between blocks, that is to say, no data overlap. Taskling guarantees block isolation if:
- You wrap your block creation logic in a Critical Section
- You don't pass duplicate data to Taskling
You can create sections of code that are guaranteed to be single-threaded, even across servers, by wrapping the code as follows:
using (var cs = taskExecutionContext.CreateCriticalSection())
{
    if (cs.TryStart())
    {
        // code here runs single-threaded, even across servers
    }
}
The Taskling critical section uses the same method of concurrency control as the main task concurrency limit. It will wait for 20 seconds with 2 retries, and if it still cannot acquire the critical section then TryStart() will return false. The wait time can be changed in one of the overloads.
Range Blocks with a Date Range Block Example
Range blocks store no data, just a date or numeric range. This range can then be used to retrieve data and process it.
Some processes continually process data between dates. Each time the batch process runs, it might check up to which date has been processed and then use that date as a From Date and the current time as a To Date. The example below:
- uses Taskling to get the end date of the last block as the new start date
- uses a critical section to guarantee block isolation for tasks where the concurrency limit is higher than 1
private void RunTask(ITaskExecutionContext taskExecutionContext)
{
    try
    {
        var dateRangeBlocks = GetDateRangeBlocks(taskExecutionContext);
        foreach (var block in dateRangeBlocks)
            ProcessBlock(block);

        taskExecutionContext.Complete();
    }
    catch (Exception ex)
    {
        taskExecutionContext.Error(ex.ToString(), true);
    }
}

private IList<IDateRangeBlockContext> GetDateRangeBlocks(ITaskExecutionContext taskExecutionContext)
{
    using (var cs = taskExecutionContext.CreateCriticalSection())
    {
        if (cs.TryStart())
        {
            var startDate = GetDateRangeStartDate(taskExecutionContext);
            var endDate = DateTime.UtcNow;
            return taskExecutionContext.GetDateRangeBlocks(x => x.WithRange(startDate, endDate, TimeSpan.FromMinutes(30)));
        }

        throw new Exception("Could not acquire a critical section, aborted task");
    }
}

private DateTime GetDateRangeStartDate(ITaskExecutionContext taskExecutionContext)
{
    var lastBlock = taskExecutionContext.GetLastDateRangeBlock(LastBlockOrder.LastCreated);
    if (lastBlock == null)
        return DateTime.UtcNow.Add(_configuration.FirstRunTimeSpan);

    return lastBlock.EndDate;
}
In the above example, Taskling takes a date range and a TimeSpan for the maximum block size and returns a list of date range blocks (IDateRangeBlockContext). So if the date range covered 24 hours and we specified a maximum block size of TimeSpan.FromHours(1), we'd get 24 blocks.
Once we have the blocks, we then process each one. In this example we are retrieving journeys between the dates of each block, calculating travel insights and persisting them.
- Start() is called before processing begins and Complete() is called once processing is finished. You can optionally pass the number of records processed as an argument.
- The start and end dates of the block are used to retrieve the journeys from the database.
- All code is wrapped in a try/catch block. In the catch, Failed(string errorMessage) is called and the exception is not rethrown, in order to allow subsequent blocks to execute.
private void ProcessBlock(IDateRangeBlockContext blockContext)
{
    try
    {
        blockContext.Start();
        var travelDataItems = _travelDataService.GetJourneys(blockContext.DateRangeBlock.StartDate, blockContext.DateRangeBlock.EndDate);

        var travelInsights = new List<TravelInsight>();
        // insight calculation from travelDataItems elided for brevity
        _travelInsightsService.Add(travelInsights);

        int itemCountProcessed = travelInsights.Count;
        blockContext.Complete(itemCountProcessed);
    }
    catch (Exception ex)
    {
        blockContext.Failed(ex.ToString());
    }
}
Numeric blocks are basically the same. You pass Taskling a start number, an end number and a maximum block size.
int maxBlockSize = 500;
return taskExecutionContext.GetNumericRangeBlocks(x => x.WithRange(startNumber, endNumber, maxBlockSize));
So if we pass the numbers 1 and 1000 with a maximum block size of 100, then 10 INumericBlockContexts will be returned, which you can then use to process the data between those ranges.
You can see example code for numeric blocks at the following link:
https://github.com/Vanlightly/Taskling.NET/wiki/Task-with-Numeric-Range-Blocks-Example-Code
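To make the numeric range pattern concrete, here is a hedged sketch that mirrors the date range example above. The block context member names (NumericRangeBlock, StartNumber, EndNumber) and the ProcessRows helper are assumptions made by analogy with the date range API; check the wiki link above for the real member names.

```csharp
// Sketch only: processes 1..1000 in blocks of at most 100 numbers each.
// NumericRangeBlock.StartNumber/EndNumber are assumed by analogy with the
// date range block context; ProcessRows is a hypothetical helper.
private void ProcessNumericBlocks(ITaskExecutionContext taskExecutionContext)
{
    int startNumber = 1;
    int endNumber = 1000;
    int maxBlockSize = 100;

    var blocks = taskExecutionContext.GetNumericRangeBlocks(x => x.WithRange(startNumber, endNumber, maxBlockSize));
    foreach (var block in blocks)
    {
        try
        {
            block.Start();
            // retrieve and process the rows whose ids fall inside this block's range
            ProcessRows(block.NumericRangeBlock.StartNumber, block.NumericRangeBlock.EndNumber);
            block.Complete();
        }
        catch (Exception ex)
        {
            block.Failed(ex.ToString());
        }
    }
}
```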
List Blocks
List blocks actually store the data in SQL Server as JSON. There are two list block contexts:
- IListBlockContext&lt;TItem&gt;
- IListBlockContext&lt;TItem,THeader&gt;
TItem is a generic type that will be the type of the list items. THeader is a generic type that can store data related to the block as a whole.
Let's look at an example of generating IListBlockContext&lt;TItem,THeader&gt; blocks. In this example we retrieve data by date range but store the data to be processed in list blocks.
public class BatchDatesHeader
{
    public DateTime FromDate { get; set; }
    public DateTime ToDate { get; set; }
}

public class Journey
{
    public long JourneyId { get; set; }
    public string DepartureStation { get; set; }
    public string ArrivalStation { get; set; }
    public DateTime TravelDate { get; set; }
    public string PassengerName { get; set; }
}
BatchDatesHeader will be our header class and Journey will be our list item class.
Note that ItemStatus.Pending and ItemStatus.Failed relate to recovery from failure and reprocessing previously failed blocks. We'll look at that in more detail in the next pattern.
In this example, we retrieve all the journeys since the last time the job ran, partition them into list blocks and then process each list block. For each list block, we'll process each journey individually by extracting a travel insight and notifying the user of that insight.
private void RunTask(ITaskExecutionContext taskExecutionContext)
{
    try
    {
        var listBlocks = GetListBlocks(taskExecutionContext);
        foreach (var block in listBlocks)
            ProcessBlock(block);

        taskExecutionContext.Complete();
    }
    catch (Exception ex)
    {
        taskExecutionContext.Error(ex.ToString(), true);
    }
}

private IList<IListBlockContext<Journey, BatchDatesHeader>> GetListBlocks(ITaskExecutionContext taskExecutionContext)
{
    using (var cs = taskExecutionContext.CreateCriticalSection())
    {
        if (cs.TryStart())
        {
            var startDate = GetDateRangeStartDate(taskExecutionContext);
            var endDate = DateTime.UtcNow;
            var journeys = _travelDataService.GetJourneys(startDate, endDate).ToList();
            var batchHeader = new BatchDatesHeader()
            {
                FromDate = startDate,
                ToDate = endDate
            };

            short blockSize = 500;
            return taskExecutionContext.GetListBlocks<Journey, BatchDatesHeader>(x => x.WithPeriodicCommit(journeys, batchHeader, blockSize, BatchSize.Fifty));
        }

        throw new Exception("Could not acquire a critical section, aborted task");
    }
}

private DateTime GetDateRangeStartDate(ITaskExecutionContext taskExecutionContext)
{
    var lastBlock = taskExecutionContext.GetLastListBlock<Journey, BatchDatesHeader>();
    if (lastBlock == null)
        return DateTime.UtcNow.Add(_configuration.FirstRunTimeSpan);

    return lastBlock.Header.ToDate;
}

private void ProcessBlock(IListBlockContext<Journey, BatchDatesHeader> blockContext)
{
    try
    {
        blockContext.Start();
        foreach (var journeyItem in blockContext.GetItems(ItemStatus.Pending, ItemStatus.Failed))
            ProcessJourney(journeyItem);

        blockContext.Complete();
    }
    catch (Exception ex)
    {
        blockContext.Failed(ex.ToString());
    }
}

private void ProcessJourney(IListBlockItem<Journey> journeyItem)
{
    try
    {
        if (journeyItem.Value.DepartureStation.Equals(journeyItem.Value.ArrivalStation))
        {
            journeyItem.Discarded("Discarded due to distance rule");
        }
        else
        {
            var insight = ExtractInsight(journeyItem.Value);
            _notificationService.NotifyUser(insight);
            journeyItem.Completed();
        }
    }
    catch (Exception ex)
    {
        journeyItem.Failed(ex.ToString());
    }
}

private TravelInsight ExtractInsight(Journey journey)
{
    // insight extraction logic elided for brevity
    TravelInsight insight = new TravelInsight();
    return insight;
}
Some interesting things to note are:
- Each list item has its own status.
- For any data items that need to be ignored due to some business rule, you can mark them as discarded with a reason attached.
- If your data is too big to store in blocks you can store data identifiers in the list blocks instead and then retrieve the data while processing each item.
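The identifiers-only variant of the last point can be sketched as follows. The JourneyReference class, the _travelDataService.GetJourney method and the two-argument ProcessJourney overload are hypothetical names introduced for this illustration; they are not part of the earlier example or of Taskling's API.

```csharp
// Sketch only: store lightweight identifiers in the list blocks and load the
// full record while processing each item. JourneyReference, GetJourney and
// this ProcessJourney overload are hypothetical names for illustration.
public class JourneyReference
{
    public long JourneyId { get; set; }
}

private void ProcessBlock(IListBlockContext<JourneyReference, BatchDatesHeader> blockContext)
{
    try
    {
        blockContext.Start();
        foreach (var item in blockContext.GetItems(ItemStatus.Pending, ItemStatus.Failed))
        {
            // only the id was stored in the block; load the full journey on demand
            var journey = _travelDataService.GetJourney(item.Value.JourneyId);
            ProcessJourney(item, journey);
        }

        blockContext.Complete();
    }
    catch (Exception ex)
    {
        blockContext.Failed(ex.ToString());
    }
}
```

This keeps the JSON stored in SQL Server small while preserving the per-item status tracking that list blocks provide.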
Pattern #3 - Failure Recovery
Sometimes an application fails: an Azure job dies, an ASP.NET/WCF process gets recycled, or a bug causes the batch process to fail and stop. Taskling provides a way of continuing the processing from where it left off.
When you ask the ITaskExecutionContext for blocks, be it date range, list or any other type, it will return the data you pass into it as blocks. You can configure Taskling to also return previously failed blocks.
If you configure your ITasklingClient with
- RPC_FAIL[true] RPC_FAIL_MTS[600] RPC_FAIL_RTYL[3] you are telling Taskling to look for failed blocks that were created in the last 600 minutes, and have not been retried more than 3 times.
- RPC_DEAD[true] RPC_DEAD_MTS[600] RPC_DEAD_RTYL[3] you are telling Taskling to look for dead blocks that were created in the last 600 minutes, and have not been retried more than 3 times.
Then when you call GetListBlocks, it will return the new and the old blocks in one list. But what is a dead block, you ask? A dead task/block is one that had a catastrophic failure such that it was unable to register its demise, for example, when someone pulls the power cord from the server or IIS kills your thread with a ThreadAbortException. In this case the status remains "In Progress", or "Pending" for blocks that were created but never started, even though the task has died.
Taskling uses a keep alive (heartbeat) to register the fact that a task is still alive. Once a configured period has passed since the last keep alive, and the status is still In Progress or Pending, Taskling knows that the task has really died.
To configure the keep alive we set KA[true] KAINT[1] KADT[10], which means: use the keep alive, send one every minute, and treat the task as dead after 10 minutes without any keep alive being received.
Let's use the last example to illustrate how this works. In the last example we create a bunch of list blocks that contain journeys.
When reprocessing of failed and dead blocks is enabled then the following call may return previously failed blocks.
return taskExecutionContext.GetListBlocks<Journey, BatchDatesHeader>(x => x.WithPeriodicCommit(journeys, batchHeader, blockSize, BatchSize.Fifty));
If you don't want to send that user notification again for the journeys that were processed successfully, then when iterating over the items, ask only for the Pending and Failed ones. When we process a block, it could be new data or an old block that failed. If it is new, then all items will be in the Pending state anyway.
foreach (var journeyItem in blockContext.GetItems(ItemStatus.Pending, ItemStatus.Failed))
ProcessJourney(journeyItem);
Once a block has reached its retry limit, or its creation date is older than the 600 minutes configured in this example, it will not be retried again.
Pattern #4 - Alerts
All data in Taskling is stored in seven tables that you can deploy to a central database server or to each application database that uses Taskling. Because Taskling maintains all state information, block data and the configuration used for each task execution, we can create SQL queries that can be used in real-time alerts.
Examples are:
- Failed/dead blocks that reach their retry limit
- Tasks that fail
- Tasks that fail X times in Y minutes
- Tasks that fail X times consecutively
- Tasks that have not run in the past X minutes
There is example SQL on the GitHub wiki pages
https://github.com/Vanlightly/Taskling.NET/wiki/Alerts-Database-Scripts
Conclusion
Taskling is a useful library for implementing common batch processing patterns, and it is open source. You just need to include the Taskling and Taskling.SqlServer NuGet packages and run the table creation script and you're ready.
For more reading material check out the GitHub wiki on the GitHub page
https://github.com/Vanlightly/Taskling.NET
Sample Code
The TasklingTester solution accompanies this article and contains the source code we've covered. It also includes:
- the SQL scripts necessary to generate the Taskling tables and the Journey and TravelInsight tables
- a data generator script that simulates new data coming in
- some useful queries to view the Taskling data that was generated
Read the readme file in the solution first for instructions. Basically, all you need to do is run one script and then run the application, and you can see it work.