Introduction
A real-time feed is a continuous flow of data carrying instantaneous information. The feed can be associated with one client or with several clients, and it can be delivered either as a whole to all clients or split so that each client receives only its own packets. We can enumerate the cases to clarify our scope:
- Feed is distributed to one client (case 1).
- Feed is distributed to all clients (case 2):
  - Each client takes only the part of the feed that belongs to it (case 2-1).
  - The whole feed is distributed to every client (case 2-2).
The first case is the simplest: with only one client, we can store arriving data in a single queue and manage it with a simple queue-management technique. The second case is our interest, and its two sub-cases (2-1, 2-2) are handled very differently. Case (2-1) means the sender of the feed must include a destination id in each sent packet; a good example is a network router, which receives packets from senders and routes each one to the right destination. Case (2-2) means the feed is forwarded to all clients, so packets need no destination id. Case (2-2) is the scope of this article. The final goal of the article is to distribute a real-time feed among multiple clients with different bandwidths, as in the following figure:
Feed Distribution
Real-time stream distribution is a critical problem that needs a well-suited data structure to store and distribute the feed efficiently. The system receives the real-time feed and forwards it to all clients, as in the following figure:
Actually, most systems don't forward the feed directly to clients. They have two options:
- Store it in temporary storage, then forward it to all clients.
- Process the feed (transform, compress, encrypt), store the results in temporary storage, then send the results to all clients.
There are good reasons for saving the feed or its processing results in temporary storage before sending it to clients:
- First, to isolate the receiving operation from the sending or processing operations.
- Second, to enable sending data from different threads, so that the send operations can be balanced according to the clients' bandwidths.
So, we need to decide: what is the most suitable data structure to store the feed, and how can we send it to clients with different bandwidths? Answering these questions requires stating our goals for this storage:
- Fast data insertion and removal.
- Receiving data must not block while data is being sent from the same storage to each client.
- Each client keeps its own offset in the storage, depending on its bandwidth.
Circular Shared Queue
The best storage to achieve these goals is a pipe or a queue: the feed is received at one side and sent from the other side, First In First Out (FIFO). A queue is a collection of entities linked in order. A new entity is added at the tail of the queue, and the oldest entity is removed from the head; the first entity added is the first one processed. Any queue keeps pointers to its head and tail entities to handle addition and removal, and each entity points to the next one.
So, the choice suits our needs because:
- Insertion and removal are very fast with a linked-list structure.
- We can isolate receiving data from sending it in separate threads, even though both work on the tail node: the node's receive offset and each client's send offset decouple the two operations.
- The owner of the shared queue keeps an object for each client. This object holds a pointer to the client's node in the queue and the offset within that node.
- So, we can manage the sending operation in many threads. Each thread sends to its available clients, and each client object passes its working-node pointer and send offset to the queue, which sends to that client according to its position in the whole queue.
- The previous point is the key to the whole article, as it is the solution for slow (low-bandwidth) clients. As a result, fast clients always work in the tail node, and their send offsets stay synchronized with the "receive offset" of the queue's tail node.
Shared Queue functions:
IsEmpty()      | Checks whether the queue is empty
Enqueue()      | Adds a new node at the queue tail, if needed
Dequeue()      | Gets the node whose data should be sent next
Recv()         | Receives newly arrived data into the tail node
Send()         | Sends data to one client
MaxQueueLength | Maximum queue length
Head           | Points to the head node of the queue
Tail           | Points to the tail node of the queue
In our case, we need some special behavior to fulfill our needs:
- In a real situation, client speeds are not equal: clients have different bandwidths, and bandwidth limits how much data each client can receive at any instant. Fast clients are always ready to receive data, while slow clients may delay the removal of the queue's head entity and, with it, the whole sending process. The distributor must handle this case so that fast clients receive real-time data on time, and slow clients receive it as their bandwidth allows.
- Each queue node contains a 32 KB buffer to store received feed. The Enqueue() function therefore doesn't add a new node until the tail node has received its full 32 KB; only then does it add a new node and adjust the tail.
#define MAX_QUEUE_LENGTH 8
#define MAX_NODE_LENGTH 32768
struct QNode {
    QNode(int nSequence, int nLength) {
        m_pBuffer = (char*)malloc(nLength);  // node payload buffer
        Init(nSequence);
    }
    // (Re)initializes the node; also called when the circular queue recycles it.
    void Init(int nSequence) {
        m_nTickCount = ::GetTickCount();  // creation/recycle time, used by the delay check
        m_nSequence = nSequence;          // monotonically increasing node number
        m_nRecvOffset = 0;                // bytes of the buffer filled so far
        m_pNext = NULL;
    }
    ~QNode() {
        free(m_pBuffer);
    }
public:
    int m_nSequence;             // node sequence number
    int m_nRecvOffset;           // receive offset inside m_pBuffer
    QNode* m_pNext;              // next node in the list
    char* m_pBuffer;             // data buffer (m_nNodeLength bytes)
    unsigned long m_nTickCount;  // tick count at creation/recycle
};
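The listings below refer to several SharedQueue members (m_cs, m_pHead, m_pTail, m_nNodeLength, m_nMaxQueueLength, m_bRealTime, m_nDelay) without showing their declarations. The following is a minimal sketch of how the class shape might look; it is inferred from those listings and is an assumption, not the author's original header:
// A sketch of the SharedQueue class shape, inferred from the listings
// below; the exact declarations here are assumptions.
class SharedQueue
{
public:
    SharedQueue(int nMaxQueueLength = MAX_QUEUE_LENGTH,
                int nNodeLength = MAX_NODE_LENGTH,
                bool bRealTime = true, unsigned int nDelay = 0)
        : m_pHead(NULL), m_pTail(NULL),
          m_nMaxQueueLength(nMaxQueueLength), m_nNodeLength(nNodeLength),
          m_bRealTime(bRealTime), m_nDelay(nDelay)
    { ::InitializeCriticalSection(&m_cs); }

    bool IsEmpty() { return m_pHead == NULL; }
    int Recv(const char* pBuffer, int nLength);
    template<class RECEIVER> int Recv(RECEIVER& receiver);
    template<class SENDER> int Send(SENDER& sender,
        QNode*& pCurNode, int& nCurNodeSendOffset);

protected:
    QNode* Enqueue();
    bool Dequeue(QNode*& pCurNode, int& nCurNodeSendOffset,
                 unsigned int nDelay = 0);

    CRITICAL_SECTION m_cs;    // guards all linked-list changes
    QNode* m_pHead;           // oldest node in the queue
    QNode* m_pTail;           // node currently receiving feed
    int m_nMaxQueueLength;    // maximum number of nodes
    int m_nNodeLength;        // bytes per node buffer
    bool m_bRealTime;         // new clients join at the tail (true) or head (false)
    unsigned int m_nDelay;    // optional send delay, in minutes
};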
Enqueue
The standard job of the Enqueue function is to add a new node at the queue tail, but our queue does that only when the tail node is full of data. In other words, the queue uses Enqueue to get the working (not yet full) tail node; the caller then copies the received buffer into that node and advances its "receive offset" (m_nRecvOffset) by the amount of received data. Enqueue checks the tail's receive offset to decide whether it is full, as in the following flow-chart figure:
QNode* Enqueue() {
    QNode* pQE;
    ::EnterCriticalSection(&m_cs);
    if(m_pTail == NULL)  // empty queue: create the first node
        pQE = m_pHead = m_pTail = new QNode(0, m_nNodeLength);
    else if(m_pTail->m_nRecvOffset >= m_nNodeLength) {  // tail node is full
        ...
        pQE = m_pTail->m_pNext = new QNode(m_pTail->m_nSequence + 1, m_nNodeLength);
        m_pTail = pQE;
    }
    else  // tail node still has room: keep using it
        pQE = m_pTail;
    ::LeaveCriticalSection(&m_cs);
    return pQE;
}
We can modify the Enqueue function to work as a circular queue.
Circular Queue
In a circular queue, the last node is connected back to the first node to make a circle. Here, that happens each time we need a new node to receive more feed: once the queue reaches its maximum length, the Head and Tail pointers each move forward one node, so the distance between them stays fixed at the queue length. For example, with MAX_QUEUE_LENGTH set to 8, when the node with sequence 7 fills up, the old head (sequence 0) is re-initialized as sequence 8 and becomes the new tail, and the head advances to the node holding sequence 1.
Remember, receiving data is always done at the tail side. For real-time clients, send works from the client's node pointer, which for normal and fast clients is always the tail node. For slow, non-real-time clients, send starts from the Head node and catches up toward the tail to end up working like the real-time clients. Check the Enqueue code that implements the circular queue movement:
QNode* Enqueue() {
    QNode* pQE;
    ::EnterCriticalSection(&m_cs);
    if(m_pTail == NULL)  // empty queue: create the first node
        pQE = m_pHead = m_pTail = new QNode(0, m_nNodeLength);
    else if(m_pTail->m_nRecvOffset >= m_nNodeLength) {  // tail node is full
        if(m_pTail->m_nSequence+1 >= m_nMaxQueueLength) {
            // The queue has reached its full length: recycle the head node
            // as the new tail. Sequences grow monotonically, so once this
            // branch is taken it is taken on every wrap, and Dequeue()
            // detects recycled nodes by comparing sequences with the head.
            QNode* pNext = m_pHead->m_pNext;
            pQE = m_pTail->m_pNext = m_pHead;
            pQE->Init(m_pTail->m_nSequence + 1);
            m_pHead = pNext;
        }
        else  // still growing toward the maximum length
            pQE = m_pTail->m_pNext = new QNode(m_pTail->m_nSequence + 1, m_nNodeLength);
        m_pTail = pQE;
    }
    else  // tail node still has room: keep using it
        pQE = m_pTail;
    ::LeaveCriticalSection(&m_cs);
    return pQE;
}
Dequeue
The queue uses its protected function Dequeue() to retrieve the node whose data should be sent to a client. Each client must keep a pointer to its working node (initialized to NULL) and its offset within that node. The Dequeue function uses these parameters to adjust the next working node and offset, as in the following flow-chart. Unlike a normal queue, Dequeue doesn't remove nodes, because the shared queue sends data to multiple clients; all queue nodes are kept until they can be reused by the Enqueue function to implement the circular queue. The last point to mention is the type of the client: a real-time client should join the queue for the first time at the tail, to receive the most recently arrived data, while a non-real-time client should join at the head, to receive all buffered data.
bool Dequeue(QNode*& pCurNode, int& nCurNodeSendOffset, unsigned int nDelay = 0)
{
    ::EnterCriticalSection(&m_cs);
    // New client, or its node was recycled from under it: (re)join the queue.
    if(pCurNode == NULL || pCurNode->m_nSequence < m_pHead->m_nSequence)
    {
        if(m_bRealTime)  // real-time clients join at the tail
            pCurNode = m_pTail, nCurNodeSendOffset =
                m_pTail ? m_pTail->m_nRecvOffset : 0;
        else             // non-real-time clients join at the head
            pCurNode = m_pHead, nCurNodeSendOffset = 0;
    }
    else if(nCurNodeSendOffset >= m_nNodeLength && pCurNode->m_pNext)
        pCurNode = pCurNode->m_pNext, nCurNodeSendOffset = 0;  // node done: advance
    ::LeaveCriticalSection(&m_cs);
    if (nDelay + m_nDelay > 0) {
        // Optional delayed sending: the node's age is measured in minutes.
        unsigned int nCurDelay = (::GetTickCount() - pCurNode->m_nTickCount) / 60000;
        if (nCurDelay < nDelay + m_nDelay)
            return false;
    }
    return pCurNode != NULL;
}
Recv
The shared queue receives data in two ways.
The first is Recv(const char* pBuffer, int nLength), a simple call that passes a buffer to be saved in the queue.
The second is template<class RECEIVER> int Recv(RECEIVER& receiver), which asks a class of type RECEIVER to receive the data itself.
Recv Raw Data
This is the first way to receive data. It accepts a buffer of any length and calls the Enqueue function to get the working queue node that will keep the data. It loops when the received length exceeds the space left in the working node (at most m_nNodeLength bytes), so one call may span several nodes.
int Recv(const char* pBuffer, int nLength)
{
    int nRecvLength = 0;
    while(nRecvLength < nLength)
    {
        QNode* pQE = Enqueue();  // working (not yet full) tail node
        if(pQE->m_nRecvOffset < m_nNodeLength)
        {
            // Copy as much as fits in this node, or as much as remains.
            int nRecv = min(m_nNodeLength - pQE->m_nRecvOffset,
                            nLength - nRecvLength);
            memcpy(pQE->m_pBuffer + pQE->m_nRecvOffset,
                   pBuffer + nRecvLength, nRecv);
            pQE->m_nRecvOffset += nRecv;
            nRecvLength += nRecv;
        }
    }
    return nRecvLength;
}
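For instance, assuming a SharedQueue instance named sq with the default node length, a single call can pass a buffer larger than one node; the loop above splits it across nodes transparently. A purely illustrative snippet:
#include <cstring>

// Illustrative only: a 96 KB buffer fills three 32 KB nodes in one call.
void FillFromMemory(SharedQueue& sq)
{
    static char szFeed[3 * MAX_NODE_LENGTH];
    memset(szFeed, 'A', sizeof(szFeed));   // fake feed content
    sq.Recv(szFeed, sizeof(szFeed));       // loops Enqueue() three times
}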
Recv Template
This is the second way to receive data. It asks a class of type RECEIVER to receive the data, calling the Enqueue function to get the working queue node that will keep it. The function loops, requesting data from the RECEIVER's Recv function, until the enqueued node's "receive offset" reaches m_nNodeLength.
template<class RECEIVER> int Recv(RECEIVER& receiver)
{
    QNode* pQE = Enqueue();  // working (not yet full) tail node
    int nRecvLength = 0, nRecv;
    while(pQE->m_nRecvOffset < m_nNodeLength)
    {
        // Let the receiver fill the remaining space of the node directly.
        if((nRecv = receiver.Recv(pQE->m_pBuffer + pQE->m_nRecvOffset,
                                  m_nNodeLength - pQE->m_nRecvOffset)) <= 0)
            return nRecv;  // propagate the receiver's error code
        pQE->m_nRecvOffset += nRecv;
        nRecvLength += nRecv;
    }
    return nRecvLength;
}
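For example, the RECEIVER can be a thin wrapper over a socket, so arriving bytes land directly in the node's buffer with no intermediate copy. This sketch is illustrative; the CSocketReceiver name is an assumption, not part of the article's code:
// Hypothetical RECEIVER: any class with this Recv signature works.
struct CSocketReceiver
{
    SOCKET m_sock;   // an already-connected socket
    int Recv(char* pBuffer, int nLength)
    { return ::recv(m_sock, pBuffer, nLength, 0); }
};

// Usage (hSock is an already-connected socket):
//   CSocketReceiver rcvr = { hSock };
//   sq.Recv(rcvr);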
Send Template
Although the Send template has a simple flow-chart and code, it is the key to this article. To send to multiple clients, the owner of the queue keeps the "send node" and the offset within that node for each client. The key here is that each client is independent of the others. Therefore, if the sender is a socket class with a good connection, it stays synchronized with the receive process; if its connection is slow, it may lag, but it will not affect other clients. The client's information (SendNode, NodeOffset) is passed to the Send function, as in the following flow-chart and code. The CFeedClient structure in the Usage section below is exactly such a sender.
template<class SENDER> int Send(SENDER& sender,
                                QNode*& pCurNode, int& nCurNodeSendOffset)
{
    // Position this client on its next node and offset, or bail out.
    if(m_pHead == NULL || Dequeue(pCurNode, nCurNodeSendOffset) == false)
        return 0;
    // Send only what has been received beyond this client's offset.
    int nSendBytes = pCurNode->m_nRecvOffset - nCurNodeSendOffset;
    if(nSendBytes <= 0 || (nSendBytes = sender.Send(pCurNode->m_pBuffer +
                               nCurNodeSendOffset, nSendBytes)) <= 0)
        return nSendBytes;
    nCurNodeSendOffset += nSendBytes;  // advance by what was actually sent
    return nSendBytes;
}
Usage
You can define any number of queues in your application. The code listed here defines one queue, one thread to receive the feed, and one thread to distribute it to a client list. You can define many Send threads, each with its own client list, so you can send to thousands of clients distributed among tens of threads.
SharedQueue sq;
struct CFeedClient
{
    CFeedClient()
    {
        m_pCurSendElem = NULL;  // NULL: joins the queue on the first Send
        m_nCurSendElemOffset = 0;
    }
    QNode* m_pCurSendElem;      // this client's working node
    int m_nCurSendElemOffset;   // send offset inside that node
    SOCKET m_sock;
    int Send(const char* lpcs, int nLength)
    { return ::send(m_sock, lpcs, nLength, 0); }
    ...
};
void RecvThreadFunc(LPVOID pParam)
{
    int nLength;
    char szbuf[10240];
    while(true)
    {
        ...  // receive the feed into szbuf and set nLength, then:
        sq.Recv(szbuf, nLength);
    }
}
void SendThreadFunc(LPVOID pParam)
{
    vector<CFeedClient>* pvClients = (vector<CFeedClient>*)pParam;
    while(true)
        for(vector<CFeedClient>::iterator client = pvClients->begin();
            client != pvClients->end(); ++client)
            sq.Send(*client, client->m_pCurSendElem,
                    client->m_nCurSendElemOffset);
}
void Usage()
{
    // vClients must outlive the detached threads, hence static here.
    static vector<CFeedClient> vClients;
    _beginthread(RecvThreadFunc, 0, NULL);
    _beginthread(SendThreadFunc, 0, (LPVOID)&vClients);
}
Points of Interest
Thread Safe
SharedQueue is a thread-safe class: it synchronizes changes to its linked list with a critical section. Linked-list changes are limited to adding or removing queue nodes, which is done by the Enqueue, Dequeue, or Clear functions. The other functions, Recv and Send, are independent, so there is no need to synchronize access to them; they are the only callers of Enqueue and Dequeue. So, the user of the shared queue can design one thread to receive the feed and multiple threads to send it (or its processed result), as in the Feed Distribution section.
Send Timeout
The sender may set the "send timeout" to a small value to make sure all clients have equal chances of receiving data, but in this case slow clients (slow connections) may lag behind fast clients (fast connections). A slow client may prefer delayed data over lost data.
Sender Type
The "sender type" passed to the Send template can be a socket class that has a Send function, or any other type with a Send function. The user of this class can take the data and transform it into any form, or do any processing like compression or encryption, then keep or send the result.
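For instance, a sender doesn't have to be a socket at all. This hypothetical CFileSender (an illustration, not part of the article's code) archives the feed to disk through the same Send template:
#include <cstdio>

// Hypothetical SENDER: appends the feed to a file instead of a socket.
struct CFileSender
{
    FILE* m_pFile;   // an open file, e.g. fopen("feed.dat", "ab")
    int Send(const char* lpcs, int nLength)
    { return (int)fwrite(lpcs, 1, nLength, m_pFile); }
};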
Thread Management
To build a simple feed distribution system, we need:
- One thread to receive the feed into a primary queue.
- One thread to analyze the feed, format a new feed suitable for clients, and save it in a secondary queue.
- N threads to send the feed from the secondary queue, each to its own list of clients.
To join new clients to the send threads, we can use simple round-robin scheduling (see the sketch after this list), which achieves a simple and fast load-balancing technique.
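A minimal sketch of that round-robin assignment, assuming one client list per send thread; the names here are illustrative, and synchronization between AddClient and the running send threads is omitted:
#define NUM_SEND_THREADS 10

// One client list per send thread; each list is passed to SendThreadFunc.
vector<CFeedClient> vThreadClients[NUM_SEND_THREADS];
int g_nNextThread = 0;

void AddClient(const CFeedClient& client)
{
    // Round-robin: each new client goes to the next thread in turn.
    vThreadClients[g_nNextThread].push_back(client);
    g_nNextThread = (g_nNextThread + 1) % NUM_SEND_THREADS;
}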
Data Keeping for Disconnected Clients
If a client disconnects due to network errors, our system should keep it in the thread's data for a certain period, hoping it will reconnect shortly. That way, the object representing this client still keeps its offset in the queue and will continue from its last location in the data, so it will not lose any part of the feed. If the keeping period is exceeded, the system should delete the client object; the next time the client reconnects, it joins at the queue tail as a new client and receives only newly arriving feed.
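A sketch of such a grace period, assuming each client records the tick count at which it disconnected; the m_nDisconnectTick member and KEEP_PERIOD_MS constant are illustrative additions, not part of the article's code:
#define KEEP_PERIOD_MS (5 * 60 * 1000)   // illustrative: keep for 5 minutes

// Assumes CFeedClient gains a member that is set on disconnect:
//   unsigned long m_nDisconnectTick;    // 0 while connected

bool ShouldDropClient(const CFeedClient& client)
{
    // Drop the client only after the grace period expires; until then its
    // queue node pointer and send offset are preserved for a quick reconnect.
    return client.m_nDisconnectTick != 0 &&
           ::GetTickCount() - client.m_nDisconnectTick > KEEP_PERIOD_MS;
}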
Updates
- 12/07/2011: v0.9000 Initial post
- 18/01/2015: v0.9001 Modified the queue to be circular and removed garbage collection
Thanks to... God