Introduction
Sometimes it becomes necessary to bound the rate of outgoing messages or requests. These may be requests sent over the web, messages to a different process, or even to another thread within the same process space. Throttling helps avoid the heavy performance penalty a high message rate can impose at the peer end.
I faced a similar problem where the peer process drops the connection if the rate exceeds a limit. The peer application counts the number of messages received in a logical time interval (as per its system clock) and resets the count after every fixed interval. Because synchronizing the system clocks of the sending and receiving applications is not trivial, finding a reference point to maintain a throttle count at the sender end is impossible without additional functionality from the peer. The peer process could make our life easier by sending a notification when it starts counting for the first time, but peer applications are not always implemented that way, so we need a design that restricts the outgoing message rate in our own application without involving the peer.
Background
Because there is no reference point, maintaining a send count and resetting it after every fixed interval will definitely not work. One alternative is to make every message/request wait for interval/(allowed messages per interval). That approach won't work for latency-sensitive flows, because messages wait even when the throttling limit is not breached, so it is inefficient. The approach taken here is as follows.
Suppose the requirement is to restrict the rate to a maximum of M sends in an interval I. Compare the current timestamp (t_0) with the timestamp of the Mth message sent before the current message under consideration (t_m). If the difference in timestamps is more than I, the current message can be sent with zero wait; otherwise it needs to wait for I - (t_0 - t_m).
In code, the check boils down to:

    long now = TimeProvider.getCurrentTimeInNano();
    long lastSent = buffer.getValueAtCurrentIndex(); // timestamp of the Mth-last send
    long delta = interval - (now - lastSent);        // wait time, if positive
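To make the rule concrete, here is a small standalone illustration of the wait-time computation. The class and method names are mine, chosen for the example; the rule itself is the one described above.

```java
// Hypothetical illustration of the wait-time rule: with at most M sends per
// interval I, a new message must wait max(0, I - (now - tM)), where tM is the
// timestamp of the Mth-last send.
public class WaitRuleDemo {
    static long waitTime(long interval, long now, long mthLastSent) {
        long elapsed = now - mthLastSent;
        return elapsed >= interval ? 0 : interval - elapsed;
    }

    public static void main(String[] args) {
        // Mth-last send at t=50, now t=120, interval 100: must wait 30 units.
        System.out.println(waitTime(100, 120, 50)); // prints 30
        // Mth-last send at t=10, now t=120: interval already elapsed, no wait.
        System.out.println(waitTime(100, 120, 10)); // prints 0
    }
}
```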
Using the code
A circular buffer is used to maintain the timestamps of the last M sends and achieve a fast lookup. The rest of the implementation is within the class Throttler. It can be used in non-blocking mode (scheduleAsync) or blocking mode (scheduleBlocking). In non-blocking mode a single-thread scheduler is used to execute the messages; in blocking mode the calling thread is blocked for the time specified by the Throttler.getTime function.
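The article does not show the circular buffer itself, so here is a minimal sketch of what it could look like. The method names follow the ones used in this article, but the implementation and the INIT_VAL sentinel are my own assumptions.

```java
import java.util.Arrays;

// A minimal circular buffer of the last M send timestamps. The slot at the
// current index always holds the Mth-last send (or INIT_VAL if fewer than M
// sends have happened), which is exactly what the throttle check needs.
public class CircularBuffer {
    public static final long INIT_VAL = Long.MIN_VALUE; // "never written" marker

    private final long[] slots;
    private int index = 0;

    public CircularBuffer(int size) {
        slots = new long[size];
        Arrays.fill(slots, INIT_VAL);
    }

    // Timestamp of the Mth-last send (M == buffer size), or INIT_VAL.
    public long getValueAtCurrentIndex() { return slots[index]; }

    public void updateValueAtCurrIndx(long value) { slots[index] = value; }

    // Advance so the slot holding the oldest timestamp is read next.
    public void updateNextIndx() { index = (index + 1) % slots.length; }
}
```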
I have done the time calculation in nanoseconds, but it can easily be converted to milliseconds or seconds. Following is the getTime() function, which computes the time a message has to wait before it can be sent.
private static long getTime()
{
    long now = TimeProvider.getCurrentTimeInNano();
    long lastSent = buffer.getValueAtCurrentIndex();
    // Slot never written yet: fewer than M sends so far, no wait needed.
    if (lastSent == init_val)
    {
        buffer.updateValueAtCurrIndx(now);
        buffer.updateNextIndx();
        return 0;
    }
    // Mth-last send is within the interval: wait for the remainder.
    if (now - lastSent < interval)
    {
        long wait_time = interval - (now - lastSent);
        // Record the effective send time (after the wait), not the current time.
        buffer.updateValueAtCurrIndx(now + wait_time);
        buffer.updateNextIndx();
        return wait_time;
    }
    // Interval already elapsed since the Mth-last send: send immediately.
    buffer.updateValueAtCurrIndx(now);
    buffer.updateNextIndx();
    return 0;
}
This getTime function is used by the blocking call (scheduleBlocking()) to make the current thread wait the required amount of time before dispatching.
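The blocking path could be sketched as follows. The article does not show scheduleBlocking's body, so the sleep mechanics here are my own assumption; getTime() is stubbed to a fixed value purely so the example is self-contained.

```java
import java.util.concurrent.TimeUnit;

// A sketch of the blocking path: sleep for the wait computed by getTime(),
// then dispatch. In the real Throttler, getTime() consults the circular
// buffer; here it is a stub returning a fixed 5 ms for illustration.
public class BlockingSketch {
    static long getTime() { return 5_000_000L; } // stub: pretend we must wait 5 ms

    public static void scheduleBlocking(Runnable task) throws InterruptedException {
        long waitNanos = getTime();          // 0 when under the throttle limit
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
        task.run();                          // dispatch after the enforced wait
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        scheduleBlocking(() -> System.out.println("sent"));
        System.out.println("waited at least 5 ms: "
                + (System.nanoTime() - start >= 5_000_000L));
    }
}
```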
The non-blocking call (scheduleAsync()) implementation is a bit trickier than the blocking call. Because the current thread always returns almost immediately and is ready to schedule the next message, directly comparing the Mth-last sent value from the circular buffer with the current time would be wrong. To tackle this problem, I have used a global_wait variable to translate the current time into an "actual_now" which can be compared directly with the Mth-last sent time stored in the circular buffer. global_wait is essentially the non-negative value of (the aggregate of all wait times) minus (the aggregate of the time gaps between consecutive messages made available for sending).
    long delta = interval - (actual_now - lastSent);
    global_wait = global_wait + delta;            // add this message's wait
    global_wait = global_wait - (now - previous); // subtract the gap since the previous message
    if (global_wait < 0)
        global_wait = 0;                          // global_wait never goes negative
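The surrounding non-blocking machinery could be sketched like this. The article's scheduleAsync body is not shown, so this is an assumption: the delay computation is reduced to a stub, and the point illustrated is only that the caller returns immediately while a single-thread scheduler dispatches the task after the computed delay.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// A sketch of the non-blocking path: instead of sleeping, hand the task to a
// single-thread scheduler with the computed delay. getDelay() is a stub
// standing in for the global_wait-adjusted computation described above.
public class AsyncSketch {
    private static final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    static long getDelay() { return 2_000_000L; } // stub: pretend a 2 ms delay

    public static void scheduleAsync(Runnable task) {
        // The caller returns immediately; the scheduler dispatches after the delay.
        scheduler.schedule(task, getDelay(), TimeUnit.NANOSECONDS);
    }
}
```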
I have used newSingleThreadScheduledExecutor to schedule the tasks. Throttler is initialized as

    Throttler.init(51, 1000000000);

where 51 is the allowed number of sends per 1,000,000,000 nanoseconds (1 second).
Test
A RandomThread is created which samples the value of Throttle.counter once per interval (I), starting from a randomly chosen reference point. In main(), a for loop makes messages available with a random wait, such that the number of available messages always exceeds the throttle rate.
I have created a DummyRunnable, which represents the task to be executed by the Executor in the non-blocking call. It can be replaced by the actual task to be executed in the application.
Points of Interest
Circular buffers are generally used to implement efficient queues. This article shows one more efficient application of circular buffers.