Introduction
Definition - Conflate: to combine into a composite whole.
When designing applications that process fast-changing real-time data, it is necessary to conflate the data before it is consumed by slower modules of the system. For example, a GUI application that displays real-time market data for stocks cannot, and need not, consume every market data update. The data for a stock symbol can update several times a second as the stock trades on various exchanges. A user watching the screen cannot possibly react to data that changes so rapidly. In addition, processing every tick of market data for display purposes can overwhelm the slower display modules, which may be showing several symbols in a single window.
A design pattern for this situation is described below. The producer accumulates the data updates received over a period of time, either queuing them or conflating them with earlier updates for the same item. The slower consumer periodically drains the queue. A key objective of the design is that the producer should experience minimal contention with the slower consumer of the data.
The schematic diagram of the flow of data is shown in Figure 1.
Using the code
The C++ implementation of the design pattern is listed in the accompanying code, together with a quick and dirty tester program.
The design pattern is implemented by the CConflationQManager class.
The constructor takes:
- A callback delegate (IDQCallBack), which is called whenever the consumer thread dequeues a data object.
- A pointer to a CConflationHelper object, which provides the unique key for a data object and, optionally, conflates two data objects.
- The conflation interval at which the consumer should dequeue the conflated data.
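The article does not show the declarations of these types, so the following is only a minimal sketch of how the three constructor arguments could fit together. The method names OnDequeue, GetKey, Conflate, Interval, KeyOf and Deliver, as well as the DataItem and UpdateType types, are hypothetical and are not taken from the accompanying code; the callback is modelled as an abstract interface rather than a delegate.

```cpp
#include <cassert>
#include <chrono>
#include <string>

// Hypothetical update types and data object, for illustration only.
enum class UpdateType { Insert, Update, Delete };

struct DataItem {
    std::string key;   // unique key, e.g. a stock symbol
    UpdateType type;
    double price;      // hypothetical payload
};

// Assumed shape of the callback invoked for each dequeued data object.
class IDQCallBack {
public:
    virtual ~IDQCallBack() = default;
    virtual void OnDequeue(const DataItem& item) = 0;
};

// Assumed shape of the helper: supplies the key and optionally merges two objects.
class CConflationHelper {
public:
    virtual ~CConflationHelper() = default;
    virtual std::string GetKey(const DataItem& item) const { return item.key; }
    // Default merge simply keeps the newer object; override to merge field by field.
    virtual DataItem Conflate(const DataItem& /*older*/, const DataItem& newer) const {
        return newer;
    }
};

// Only the constructor and two forwarding helpers are sketched here.
class CConflationQManager {
public:
    CConflationQManager(IDQCallBack* callback, CConflationHelper* helper,
                        std::chrono::milliseconds conflationInterval)
        : callback_(callback), helper_(helper), interval_(conflationInterval) {}

    std::chrono::milliseconds Interval() const { return interval_; }
    std::string KeyOf(const DataItem& item) const { return helper_->GetKey(item); }
    void Deliver(const DataItem& item) { callback_->OnDequeue(item); }

private:
    IDQCallBack* callback_;        // not owned
    CConflationHelper* helper_;    // not owned
    std::chrono::milliseconds interval_;
};
```

Passing raw, non-owning pointers mirrors the article's "pointer to CConflationHelper"; the caller is assumed to keep the callback and helper alive for the lifetime of the manager.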
The producer passes update data to CConflationQManager by calling the EnqForUpdate method. This method executes with the synchronization mutex locked; it conflates or enqueues the data on whichever of the two queues is currently assigned to the producer.
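As a rough illustration of the producer side, here is a minimal sketch assuming each queue is a std::map from key to latest value, so that conflation is simply "the latest value for a key wins". The class name ConflatingBuffer and the PendingCount method are hypothetical, and the Insert/Delete rules described later are deliberately left out. The second buffer exists only for the consumer's flip, which this sketch does not show.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>

// Hypothetical producer side of a double-buffered conflating queue.
class ConflatingBuffer {
public:
    // Called by the producer; conflates by key while holding the mutex.
    void EnqForUpdate(const std::string& key, double value) {
        std::lock_guard<std::mutex> lock(mutex_);
        // A later value for the same key overwrites the earlier one.
        buffers_[active_][key] = value;
    }

    // Number of distinct keys waiting in the producer's buffer.
    std::size_t PendingCount() {
        std::lock_guard<std::mutex> lock(mutex_);
        return buffers_[active_].size();
    }

private:
    std::mutex mutex_;
    std::map<std::string, double> buffers_[2];  // two queues; one is active
    int active_ = 0;                            // index used by the producer
};
```

Three updates for two symbols, for example, leave only two pending entries, one per key.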
The CConflationQManager’s DeqFunction method executes periodically. It locks the mutex only long enough to flip the queue used by the EnqForUpdate method, and then drains the “idle” queue without any contention with the producer. Because the consumer holds the mutex so briefly, the probability that EnqForUpdate finds the mutex locked is small, even when updates arrive at a rapid clip.
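The queue flip can be sketched as follows. This is not the author's code: DoubleBufferedConflator is a hypothetical class, the queues are again assumed to be std::map objects keyed by symbol, and the callback is a std::function rather than IDQCallBack. The sketch assumes exactly one consumer thread calls DeqFunction, which is what makes it safe to drain the idle buffer without holding the mutex.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical double-buffered conflator illustrating the queue flip.
// Assumes exactly one consumer thread calls DeqFunction.
class DoubleBufferedConflator {
public:
    using Callback = std::function<void(const std::string&, double)>;

    explicit DoubleBufferedConflator(Callback cb) : callback_(std::move(cb)) {}

    // Producer side: conflate into the active buffer under the mutex.
    void EnqForUpdate(const std::string& key, double value) {
        std::lock_guard<std::mutex> lock(mutex_);
        buffers_[active_][key] = value;
    }

    // Consumer side: flip buffers under the mutex, then drain without it.
    void DeqFunction() {
        int idle;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            idle = active_;
            active_ = 1 - active_;  // the producer now writes to the other buffer
        }
        // Only this consumer thread touches buffers_[idle] until the next flip.
        for (const auto& [key, value] : buffers_[idle]) {
            callback_(key, value);
        }
        buffers_[idle].clear();
    }

private:
    Callback callback_;
    std::mutex mutex_;
    std::map<std::string, double> buffers_[2];
    int active_ = 0;  // index of the buffer the producer currently fills
};
```

The consumer's critical section is just an index swap, so its cost does not depend on how many updates were accumulated; the potentially slow callbacks run entirely outside the lock.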
It is assumed that:
- Each data item can be identified by a unique key.
- The data update type is one of Insert, Update or Delete. A data item identified by a particular key can be deleted and subsequently re-inserted.
- Conflation occurs as follows for any data item identified by a unique key:
  - Any update immediately following a Delete is expected to be of type Insert. All other updates are of type “Update”.
  - Multiple updates received during the sampling period are conflated into a single update. An update of type “Update” received in the same sampling period as an Insert is conflated with the Insert data and passed downstream by the consumer as an Insert.
  - If a Delete is received during the same sampling period as a previous Insert, both updates are discarded, i.e. neither is passed to the consumer.
  - If an Insert is received during the same sampling period as a previous Delete, the Delete and the Insert are passed to the consumer individually.
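The per-key rules above can be expressed as a small state update. This is a sketch under stated assumptions, not the accompanying implementation: the Update struct, its price payload and the ConflateInto function are hypothetical, and the pending updates for one key are modelled as a vector holding nothing, one update, or a Delete followed by an Insert. Two cases are not spelled out in the article and are assumptions here: an Update followed by a Delete collapses to the Delete, and a non-Delete update arriving after a Delete is treated as an Insert.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class UpdateType { Insert, Update, Delete };

struct Update {
    UpdateType type;
    double price;  // hypothetical payload
};

// Applies one incoming update to the pending updates for a single key.
// 'pending' is what the consumer receives at the end of the interval:
// empty, a single update, or a Delete followed by an Insert.
void ConflateInto(std::vector<Update>& pending, const Update& incoming) {
    if (pending.empty()) {
        pending.push_back(incoming);
        return;
    }
    Update& last = pending.back();
    switch (last.type) {
    case UpdateType::Insert:
        if (incoming.type == UpdateType::Delete) {
            pending.pop_back();           // Insert then Delete: drop both
        } else {
            last.price = incoming.price;  // Update folded in, still an Insert
        }
        break;
    case UpdateType::Update:
        last = incoming;                  // newer Update (or a Delete) supersedes it
        break;
    case UpdateType::Delete:
        if (incoming.type != UpdateType::Delete) {
            // The update after a Delete is expected to be an Insert; keep both.
            pending.push_back({UpdateType::Insert, incoming.price});
        }
        break;
    }
}
```

Note that a Delete, Insert, Delete sequence within one interval leaves a single pending Delete, which is consistent with the rule that an Insert followed by a Delete is discarded.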
The tester program runs for two minutes and calls the EnqForUpdate method every 5 milliseconds. It applies a “Delete” every 10 seconds and an “Insert” on the next update after each Delete. All other updates are of type “Update”.
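The tester's schedule can be reconstructed roughly as below. TypeForTick and RunTester are hypothetical names, and treating the very first call as an Insert is an assumption, since the article does not say how the item is first created. At one call per 5 ms, a Delete every 10 seconds falls on every 2000th call. The loop uses std::this_thread::sleep_until with a fixed deadline so that the 5 ms period does not drift the way repeated sleep_for calls would.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

enum class UpdateType { Insert, Update, Delete };

// Timing taken from the article's description of the tester.
constexpr int kTickMs = 5;             // EnqForUpdate is called every 5 ms
constexpr int kDeleteEveryMs = 10000;  // a Delete is applied every 10 s
constexpr int kTicksPerDelete = kDeleteEveryMs / kTickMs;  // 2000 calls

// Update type for the n-th call. Treating call 0 as an Insert is an assumption.
UpdateType TypeForTick(int tick) {
    if (tick == 0) return UpdateType::Insert;
    const int phase = tick % kTicksPerDelete;
    if (phase == 0) return UpdateType::Delete;
    if (phase == 1 && tick > kTicksPerDelete) return UpdateType::Insert;  // call after a Delete
    return UpdateType::Update;
}

// Calls 'enqueue' once per tick for 'duration' at a fixed 5 ms rate.
template <typename Enqueue>
void RunTester(std::chrono::milliseconds duration, Enqueue enqueue) {
    const int ticks = static_cast<int>(duration.count() / kTickMs);
    auto next = std::chrono::steady_clock::now();
    for (int tick = 0; tick < ticks; ++tick) {
        enqueue(TypeForTick(tick));
        next += std::chrono::milliseconds(kTickMs);
        std::this_thread::sleep_until(next);
    }
}
```

A full run matching the article would be `RunTester(std::chrono::minutes(2), ...)`, where the minutes value converts implicitly to milliseconds and yields 24000 calls.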