For processing continuous data input, RAM and CPU utilization have to be optimized. If multiple threads are collecting and submitting data for processing, you have two options from there: create an equal number of processing threads, or store the input data in memory and process it one item at a time. Creating a large number of threads chokes the CPU, while holding everything in memory exhausts the RAM. We need a balanced solution.
Introduction
If your data is intermittent (non-continuous), then we can leverage the gaps between arrivals to optimize CPU/RAM utilization. The idea is to finish processing one batch of data before the next batch arrives. Let’s say you receive N items of input data every T seconds, each item is of size d, and one item requires P seconds to process. With a single thread, the total processing time will be N x P seconds. If N x P < T, then there is no issue however you program it. However, if N x P > T, i.e., the time needed to process the input is greater than the time between two consecutive batches of data, then you need multiple threads. If we introduce a variable c for the number of threads, our problem simplifies to [ (N x P) / c ] < T.
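For example (the numbers are illustrative, not from any real workload): if N = 100 items arrive every T = 20 seconds and each takes P = 0.5 seconds, a single thread needs 100 x 0.5 = 50 seconds and falls behind. With c = 3 threads, the batch takes roughly 50 / 3 ≈ 16.7 seconds, which fits comfortably within the 20-second window.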
The next constraint is how many threads you can create, which limits the factor c. If c is too high, it consumes a lot of CPU. Here, we bring in RAM utilization: as data comes in, we first store it in memory and then use c threads to process it. Hence, at any time, there will be c active threads and N - c pending items in the queue. Let us say r is the number of batches that can be held in memory, where one batch is what c threads can process at a time, so one batch has size c x d. Now we can boil it down to the two relations below (a small code sketch follows the list):
- [ (N x P) / (r x c) ] < T
- r = Affordable RAM / (c x d)
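Here is a minimal sketch (my addition, not from the article) that turns these formulas into starting values for the thread count and container capacity. The method name, parameter names, and the RAM-budget parameter are all illustrative assumptions:

// Derives a starting thread count c and container capacity r x c
// from the formulas above. All names are illustrative.
static (int Threads, int QueueCapacity) SuggestLimits(
    int n,                   // N: items per batch
    double p,                // P: seconds to process one item
    double t,                // T: seconds between batches
    long d,                  // d: size of one item in bytes
    long affordableRamBytes) // RAM budget for the in-memory queue
{
    // Smallest integer c that satisfies (N x P) / c < T.
    int c = (int)(n * p / t) + 1;

    // r = Affordable RAM / (c x d): how many c-item batches fit in the budget.
    long r = Math.Max(1, affordableRamBytes / ((long)c * d));

    // Container capacity in items: r batches of c items each.
    return (c, (int)(r * c));
}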
Background
This scenario applies mostly to polling-based systems where you collect data at a specific frequency. Hence, the assumption is that data flow is intermittent and happens in intervals. You can leverage the time gaps between data collections to optimally utilize CPU and RAM.
Using the Code
We need an investigative approach to data processing, as one size does not fit all. Many parameters like N, d, and P are not known beforehand. Hence, we need the design to also supply statistical information, so that we can learn N, d, and P and adjust CPU and RAM demands accordingly.
As a rough guideline, we need a way to ingest all data submitted via threads. Then we either start processing items immediately, or line them up in a queue and process them with multiple threads.
C# provides blocking and bounding capabilities for thread-safe collections. This is an interesting feature that can be used to optimize CPU and memory for high-workload applications. The pattern can be further stacked and interconnected to build directed graphs of data routing; it is used extensively in Apache NiFi processors.
Before diving further into the pattern, let us understand what blocking and bounding are. What problems do they solve?
When multiple threads try to take data from a container, we want them to block until more data is available. This is called “blocking”.
When multiple threads are writing data, we want the container to have a capacity limit, so that writers wait until some memory is freed up to accommodate new data. This is called “bounding”.
Hence, we can use a blocking collection (BlockingCollection<T>) as the underlying data container.
// Requires: using System.Collections.Concurrent;
// Bounded, thread-safe container: Add blocks once MaxContainerSize items are queued.
BlockingCollection<string> DataContainer = new BlockingCollection<string>(
    new ConcurrentBag<string>(),
    this.MaxContainerSize);
For the thread pool, you can use the .NET Framework's built-in thread pool, but I am using a simple array of threads for the sake of simplicity. In fact, I don’t tend towards someone else “managing my threads” 😊.
Thread[] Workers = new Thread[this.MaxWorkerThreads];
for (int i = 0; i < Workers.Length; i++)
{
    // Each worker runs ThreadFunction, which blocks until data is available.
    Thread newThread = new Thread(new ParameterizedThreadStart(ThreadFunction));
    Workers[i] = newThread;
    // Start the worker, passing a CancellationToken for shutdown
    // (this assumes a CancellationTokenSource member, not shown here).
    newThread.Start(this.TokenSource.Token);
}
Each of these threads uses a function that blocks until new data arrives. Here is a basic skeleton of that function.
private void ThreadFunction(object threadContext)
{
    CancellationToken token = (CancellationToken)threadContext;
    try
    {
        while (!token.IsCancellationRequested)
        {
            // Take(token) blocks until an item is available
            // or the token is cancelled during shutdown.
            string Data = DataContainer.Take(token);
            ProcessData(Data);
        }
    }
    catch (OperationCanceledException)
    {
        // Thrown by Take(token) on cancellation; let the thread exit cleanly.
    }
}
And the container provides the capability to block incoming producer threads when they try to add new data to a full container.
public void Add(string data)
{
    // Blocks the caller while the container already holds MaxContainerSize items.
    DataContainer.Add(data);
}
That’s the simple recipe.
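To make the recipe concrete, here is a minimal end-to-end sketch assembling the fragments above into one class. The class name, the Stop method, and the console output are my additions, not from the article; stages like this can also be chained by having ProcessData of one stage call Add on the next stage's container, which is the stacking idea mentioned earlier.

using System;
using System.Collections.Concurrent;
using System.Threading;

public class BlockingDataProcessor
{
    private readonly BlockingCollection<string> DataContainer;
    private readonly Thread[] Workers;
    private readonly CancellationTokenSource TokenSource = new CancellationTokenSource();

    public BlockingDataProcessor(int maxWorkerThreads, int maxContainerSize)
    {
        DataContainer = new BlockingCollection<string>(
            new ConcurrentBag<string>(), maxContainerSize);

        Workers = new Thread[maxWorkerThreads];
        for (int i = 0; i < Workers.Length; i++)
        {
            Workers[i] = new Thread(new ParameterizedThreadStart(ThreadFunction));
            Workers[i].Start(TokenSource.Token);
        }
    }

    // Blocks the caller when the container is full (bounding).
    public void Add(string data)
    {
        DataContainer.Add(data);
    }

    // Cancels the workers and waits for them to finish.
    public void Stop()
    {
        TokenSource.Cancel();
        foreach (Thread worker in Workers)
        {
            worker.Join();
        }
    }

    private void ThreadFunction(object threadContext)
    {
        CancellationToken token = (CancellationToken)threadContext;
        try
        {
            while (!token.IsCancellationRequested)
            {
                // Blocks until data arrives or the token is cancelled (blocking).
                string data = DataContainer.Take(token);
                ProcessData(data);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown path.
        }
    }

    private void ProcessData(string data)
    {
        // Placeholder for the real work.
        Console.WriteLine("Processed: " + data);
    }
}

Usage is then along the lines of: create the processor with the chosen limits, call Add from producer threads, and call Stop on shutdown.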
Points of Interest
Now, to optimize RAM and CPU utilization, you need to tune MaxWorkerThreads and MaxContainerSize. We need to collect a few statistics to understand the data flow pattern (a sketch of collecting them follows the list):
- Rate of input: how much data arrives per second
- Rate of output: how much data is processed per second
- Average number of active threads
- Average container size
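A minimal sketch (my addition) of how such statistics could be collected inside the class above; the counter names and the once-per-second sampling are illustrative assumptions:

// Counters updated from the hot paths:
//   Interlocked.Increment(ref totalAdded)      inside Add()
//   Interlocked.Increment(ref activeThreads)   before ProcessData()
//   Interlocked.Decrement(ref activeThreads)   after ProcessData()
//   Interlocked.Increment(ref totalProcessed)  after ProcessData()
private long totalAdded;
private long totalProcessed;
private int activeThreads;

// Called, e.g., once per second by a monitoring timer. Input/output rates
// are the deltas of totalAdded/totalProcessed between two samples.
public void Sample()
{
    Console.WriteLine(
        "added=" + Interlocked.Read(ref totalAdded) +
        " processed=" + Interlocked.Read(ref totalProcessed) +
        " active=" + Volatile.Read(ref activeThreads) +
        " queued=" + DataContainer.Count);
}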
These metrics help in the following way:
- If Input Rate > Output Rate, the container will either grow indefinitely or an increasing number of producer threads will block at Add; either way, the program will eventually crash or stall.
- Hence, we must ensure Input Rate < Output Rate.
- If active threads are mostly at the maximum limit but the container size stays near zero, you can save CPU by using some RAM: reduce the thread count and let the container absorb bursts.
- If the average container size is always at the maximum limit, more CPU threads have to be created.
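As a rough illustration (my addition, with all thresholds and variable names hypothetical), the rules above could drive a periodic tuning check like this:

// suggestedWorkers/suggestedCapacity are hypothetical tuning outputs;
// the sampled averages come from the statistics collected above.
if (inputRate >= outputRate)
{
    // Unsustainable: producers will pile up at Add. Add workers or throttle input.
    suggestedWorkers++;
}
else if (avgActiveThreads >= maxWorkerThreads && avgContainerSize < lowWatermark)
{
    // CPU saturated while the queue stays empty: trade RAM for CPU.
    suggestedWorkers--;
    suggestedCapacity += batchSize;
}
else if (avgContainerSize >= maxContainerSize)
{
    // Queue persistently full: more processing threads are needed.
    suggestedWorkers++;
}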
History
- 29th June, 2020 - First version published
- 30th June, 2020 - Formatting changes applied