Introduction
The introduction of the Websocket protocol marks an interesting milestone in the evolution of the web. Finally, a web page can
open a full-duplex connection to a remote server and receive data asynchronously without having to poll for it.
This makes many ideas easy to realize: a web front end that runs on multiple types of devices, paired with
a custom server-side application that handles thousands of simultaneously connected clients and can be deployed on a single low-cost server machine.
In this article, we develop a Websocket server application and showcase its interaction with a web page. The solution is based on a real-time communication
library previously published on CodeProject: Push Framework.
The protocol layer is implemented in an independent library project that developers can easily reuse.
Protocol Extension Layer
The solution presented in this article is based on Push Framework which provides a foundation for creating real time servers able to manage
large numbers of simultaneously connected clients. Push Framework is protocol independent: it is up to us to supply the protocol details
by providing concrete implementations of the following "abstract classes":
- IncomingPacket: the prototype for incoming messages, i.e., messages sent by the client that the server needs to react to.
- OutgoingPacket: the prototype for outgoing messages. Most protocols are symmetric, so it is usually the same as IncomingPacket.
- Protocol: how incoming packets are deserialized and how outgoing packets are serialized so that they can be sent over the network.
To make PushFramework::Protocol a concrete class, the following virtual methods must be overridden:
- encodeOutgoingPacket: takes an OutgoingPacket and encodes it.
- frameOutgoingPacket: takes an encoded OutgoingPacket and inserts it into the output socket buffer.
- tryDeframeIncomingPacket: receives a reference to the received data. The data should be examined, and an IncomingPacket object may be returned.
- decodeIncomingPacket: if tryDeframeIncomingPacket succeeds in producing an IncomingPacket, this function should decode its content.
These methods are called internally by Push Framework (PF) at serialization and deserialization time, and they provide enough abstraction for the majority of protocols.
For the Websocket protocol, the encoding/decoding part is a separate concern, because
the spec is essentially a framing protocol. It details how the payload is encapsulated in a frame, along with header information,
so that it can be transmitted over the network, but it does not dictate how the payload is "encoded". The methods that are really relevant to Websocket
are therefore frameOutgoingPacket and tryDeframeIncomingPacket. In our example, very little is done in the encoding stage. Developers might find it
suitable to change this, by adding a JSON layer, for example.
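To make the framing side concrete, here is a minimal standalone sketch of what framing an outgoing text message involves under RFC 6455. It is not the article's frameOutgoingPacket: the helper name buildTextFrame is hypothetical, and it writes into a std::vector instead of Push Framework's output buffer. Server-to-client frames are not masked, so the frame is just a header followed by the payload.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper (not part of Push Framework): wraps a text payload in an
// unmasked Websocket frame, as a server sends it to a browser.
std::vector<std::uint8_t> buildTextFrame(const std::string& payload)
{
    std::vector<std::uint8_t> frame;
    const std::uint64_t size = payload.size();

    frame.push_back(0x81); // FIN = 1, opcode = 0x1 (text frame)

    if (size <= 125) {
        // Short payload: the length fits in the second header byte.
        frame.push_back(static_cast<std::uint8_t>(size));
    } else if (size <= 0xFFFF) {
        // Medium payload: marker 126, then a 16-bit big-endian length.
        frame.push_back(126);
        frame.push_back(static_cast<std::uint8_t>(size >> 8));
        frame.push_back(static_cast<std::uint8_t>(size & 0xFF));
    } else {
        // Large payload: marker 127, then a 64-bit big-endian length.
        frame.push_back(127);
        for (int shift = 56; shift >= 0; shift -= 8)
            frame.push_back(static_cast<std::uint8_t>((size >> shift) & 0xFF));
    }

    const std::size_t headerSize = frame.size();
    frame.insert(frame.end(), payload.begin(), payload.end());
    assert(frame.size() == headerSize + payload.size());
    return frame;
}
```

A real frameOutgoingPacket would append these bytes to the connection's output buffer rather than return them, and a JSON layer, if added, would run before this step to produce the payload string.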
The spec also describes two communication stages in the protocol, which leads us to create two types of data structures:
- A handshake message: once a connection is accepted at the transport layer, a handshake stage begins
in which some negotiation takes place.
- A websocket data message: this represents the data messages that are exchanged once the handshake stage is complete.
The framing code must distinguish between the two stages:
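int WebsocketProtocol::tryDeframeIncomingPacket( PushFramework::DataBuffer& buffer,
    PushFramework::IncomingPacket*& pPacket, int& serviceId,
    unsigned int& nExtractedBytes, ConnectionContext* pContext )
{
    if (buffer.GetDataSize() == 0)
        return Protocol::eIncompletePacket;

    WebsocketConnectionContext* pCxt = (WebsocketConnectionContext*) pContext;

    // Handshake stage: the whole buffer is handed over as one handshake message.
    if (pCxt->GetStage() == WebsocketConnectionContext::HandshakeStage)
    {
        WebsocketHandshakeMessage* pMessage =
            new WebsocketHandshakeMessage(buffer.GetBuffer(), buffer.GetDataSize());
        serviceId = 0;
        nExtractedBytes = buffer.GetDataSize();
        pPacket = pMessage;
        return Protocol::Success;
    }

    // Data stage: at least 2 header bytes and 4 masking-key bytes are expected.
    // A 64-bit counter avoids overflow when the payload size is added below.
    unsigned __int64 nMinExpectedSize = 6;
    if (buffer.GetDataSize() < nMinExpectedSize)
        return Protocol::eIncompletePacket;

    // Only unfragmented text frames are accepted: FIN = 1, opcode = 1 (0x81).
    BYTE payloadFlags = buffer.getAt(0);
    if (payloadFlags != 129)
        return Protocol::eUndefinedFailure;

    // The low 7 bits of the second byte hold the basic payload size.
    // The top bit is the mask flag; client frames are assumed to be masked.
    BYTE basicSize = buffer.getAt(1) & 0x7F;
    unsigned __int64 payloadSize;
    int masksOffset;

    if (basicSize <= 125)
    {
        payloadSize = basicSize;
        masksOffset = 2;
    }
    else if (basicSize == 126)
    {
        // The size is stored in the next 2 bytes, in network byte order.
        nMinExpectedSize += 2;
        if (buffer.GetDataSize() < nMinExpectedSize)
            return Protocol::eIncompletePacket;
        payloadSize = ntohs( *(u_short*) (buffer.GetBuffer() + 2) );
        masksOffset = 4;
    }
    else if (basicSize == 127)
    {
        // The size is stored in the next 8 bytes, in network byte order.
        // ntohl would only read the first 4 of them, so all 8 are assembled by hand.
        nMinExpectedSize += 8;
        if (buffer.GetDataSize() < nMinExpectedSize)
            return Protocol::eIncompletePacket;
        const unsigned char* pSize = (const unsigned char*) (buffer.GetBuffer() + 2);
        payloadSize = 0;
        for (int i = 0; i < 8; i++)
            payloadSize = (payloadSize << 8) | pSize[i];
        masksOffset = 10;
    }
    else
        return Protocol::eUndefinedFailure;

    // Reject sizes that could not be reported through nExtractedBytes.
    if (payloadSize > 0x7FFFFFFF)
        return Protocol::eUndefinedFailure;

    nMinExpectedSize += payloadSize;
    if (buffer.GetDataSize() < nMinExpectedSize)
        return Protocol::eIncompletePacket;

    // The 4-byte masking key precedes the payload.
    BYTE masks[4];
    memcpy(masks, buffer.GetBuffer() + masksOffset, 4);

    // Copy the payload and unmask it: each byte is XORed with masks[i % 4].
    char* payload = new char[payloadSize + 1];
    memcpy(payload, buffer.GetBuffer() + masksOffset + 4, payloadSize);
    for (unsigned __int64 i = 0; i < payloadSize; i++) {
        payload[i] = (payload[i] ^ masks[i%4]);
    }
    payload[payloadSize] = '\0';

    WebsocketDataMessage* pMessage = new WebsocketDataMessage(payload);
    serviceId = 1;
    nExtractedBytes = (unsigned int) nMinExpectedSize;
    pPacket = pMessage;

    delete[] payload; // allocated with new[], so it must be released with delete[]
    return Protocol::Success;
}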
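The same deframing rules can be tried outside Push Framework. The following is a minimal standalone sketch, assuming the input is available as a std::vector of bytes; the names DeframeResult and decodeClientTextFrame are hypothetical and do not exist in the library. Like the function above, it only accepts unfragmented text frames, and it additionally rejects unmasked frames, since browsers always mask what they send.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

enum class DeframeResult { Incomplete, Failure, Success };

// Hypothetical standalone decoder for one masked client-to-server text frame.
DeframeResult decodeClientTextFrame(const std::vector<std::uint8_t>& data,
                                    std::string& payload, std::size_t& consumed)
{
    if (data.size() < 2) return DeframeResult::Incomplete;
    if (data[0] != 0x81) return DeframeResult::Failure;       // FIN + text only
    if ((data[1] & 0x80) == 0) return DeframeResult::Failure; // mask bit required

    std::uint64_t payloadSize = data[1] & 0x7F;
    std::size_t offset = 2;

    // 126 and 127 announce a 2-byte or 8-byte big-endian extended length.
    if (payloadSize == 126 || payloadSize == 127) {
        const std::size_t lengthBytes = (payloadSize == 126) ? 2 : 8;
        if (data.size() < offset + lengthBytes) return DeframeResult::Incomplete;
        payloadSize = 0;
        for (std::size_t i = 0; i < lengthBytes; ++i)
            payloadSize = (payloadSize << 8) | data[offset + i];
        offset += lengthBytes;
    }

    // Need the 4-byte masking key plus the whole payload (overflow-safe check).
    if (data.size() < offset + 4) return DeframeResult::Incomplete;
    if (data.size() - offset - 4 < payloadSize) return DeframeResult::Incomplete;

    const std::uint8_t* mask = &data[offset];
    offset += 4;

    // Unmask: every payload byte is XORed with mask[i % 4].
    const std::size_t size = static_cast<std::size_t>(payloadSize);
    payload.clear();
    payload.reserve(size);
    for (std::size_t i = 0; i < size; ++i)
        payload.push_back(static_cast<char>(data[offset + i] ^ mask[i % 4]));

    consumed = offset + size;
    assert(consumed <= data.size());
    return DeframeResult::Success;
}
```

Feeding it the masked "Hello" frame given as an example in RFC 6455 (bytes 81 85 37 fa 21 3d 7f 9f 4d 51 58) yields the text "Hello" and reports 11 consumed bytes; a truncated copy of the same frame reports Incomplete, which is the signal for the caller to wait for more data.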
The Websocket Server
In WebsocketServer, we instantiate a main object derived from PushFramework::Server, initialize it by providing a Protocol object, a service object,
and a ClientFactory object, then start it by calling the ::Start member function.
When this function is called, many resources are put in place:
- A listening thread
- A pool of threads (IO Workers) to service IO events
- A main thread to manage the overall server structures
- A number of "streaming threads" that stream the data held in broadcast queues out to subscribers
The protocol object provided should be derived from the WebsocketProtocol class designed in the separate DLL project.
As for the ClientFactory subclass, it manages the lifecycle of connected clients. In particular, it decides
when a newly accepted connection (PhysicalConnection) is transformed into a legitimate client (LogicalConnection).
In our case, this transition depends on two validations: the handshake validation described by
the Websocket protocol, and a login validation in which we simply require that clients send a unique pseudonym.
int WebsocketClientFactory::onFirstRequest( IncomingPacket& _request,
    ConnectionContext* pConnectionContext, LogicalConnection*& lpClient,
    OutgoingPacket*& lpPacket )
{
    WebsocketConnectionContext* pCxt = (WebsocketConnectionContext*) pConnectionContext;

    // Stage 1: validate the handshake request and reply to it.
    if (pCxt->GetStage() == WebsocketConnectionContext::HandshakeStage)
    {
        WebsocketHandshakeMessage& request = (WebsocketHandshakeMessage&) _request;
        if (!request.Parse())
        {
            return ClientFactory::RefuseAndClose;
        }

        WebsocketHandshakeMessage *pResponse = new WebsocketHandshakeMessage();
        if (WebsocketProtocol::ProcessHandshake(request, *pResponse))
        {
            lpPacket = pResponse;
            pCxt->SetStage(WebsocketConnectionContext::LoginStage);
        }
        else
        {
            delete pResponse; // the response is not sent, so do not leak it
        }

        // No client is created yet: that happens once the login message arrives.
        return ClientFactory::RefuseRequest;
    }

    // Stage 2: the first data message carries the pseudonym used to log in.
    if (pCxt->GetStage() == WebsocketConnectionContext::LoginStage)
    {
        WebsocketDataMessage& request = (WebsocketDataMessage&) _request;

        WebsocketClient* pClient = new WebsocketClient(request.GetArg1());
        lpClient = pClient;

        WebsocketDataMessage *pResponse = new WebsocketDataMessage(LoginCommunication);
        pResponse->SetArguments("Welcome " + request.GetArg1());
        lpPacket = pResponse;

        pCxt->SetStage(WebsocketConnectionContext::ConnectedStage);
        return ClientFactory::CreateClient;
    }

    // Unexpected stage: every path of a non-void function must return a value.
    return ClientFactory::RefuseAndClose;
}
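The article does not show what request.Parse() does internally. As an illustration only, the sketch below shows one way an opening handshake request could be parsed: wait for the blank line that ends the HTTP header block, check the request line, and collect the header fields. The function name parseHandshakeRequest and its helpers are hypothetical, and computing the Sec-WebSocket-Accept reply (which needs SHA-1 and Base64) is deliberately left out.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <map>
#include <sstream>
#include <string>

// Lower-cases a header name or value so comparisons are case-insensitive.
static std::string toLower(std::string s)
{
    for (char& c : s)
        c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
    return s;
}

// Strips leading and trailing whitespace, including the '\r' left by getline.
static std::string trim(const std::string& s)
{
    const char* ws = " \t\r\n";
    const std::size_t b = s.find_first_not_of(ws);
    if (b == std::string::npos) return "";
    const std::size_t e = s.find_last_not_of(ws);
    assert(e >= b);
    return s.substr(b, e - b + 1);
}

// Hypothetical parser: returns true only for a complete GET request that asks
// for a websocket upgrade and carries a Sec-WebSocket-Key header.
bool parseHandshakeRequest(const std::string& raw,
                           std::map<std::string, std::string>& headers)
{
    const std::size_t end = raw.find("\r\n\r\n");
    if (end == std::string::npos) return false; // header block not complete yet

    std::istringstream lines(raw.substr(0, end));
    std::string line;
    if (!std::getline(lines, line) || line.compare(0, 4, "GET ") != 0)
        return false;

    headers.clear();
    while (std::getline(lines, line)) {
        const std::size_t colon = line.find(':');
        if (colon == std::string::npos) continue; // skip malformed lines
        headers[toLower(trim(line.substr(0, colon)))] = trim(line.substr(colon + 1));
    }

    return toLower(headers["upgrade"]) == "websocket"
        && !headers["sec-websocket-key"].empty();
}
```

Header names are stored in lower case, so the key sent by the browser is read back with headers["sec-websocket-key"]. Returning false for an incomplete header block lets the caller distinguish "wait for more data" from "reject" only if it checks for the terminating blank line itself; a fuller version would return a three-state result.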
The server business code is organized into "Service" classes. Each is bound to a particular type of request:
WebsocketServer server;
server.registerService(EchoCommunication, new EchoService, "echo");
server.registerService(Routedcommunication, new RoutedCommunicationService, "routed");
server.registerService(GroupCommunication, new GroupCommunicationService, "grouped");
server.registerService(StreamedCommunication, new StreamedCommunicationService, "streamed");
Let's see the source code for two of them:
void RoutedCommunicationService::handle( LogicalConnection* pClient, IncomingPacket* pRequest )
{
    WebsocketDataMessage& request = (WebsocketDataMessage&)(*pRequest);
    WebsocketClient& client = (WebsocketClient&) (*pClient);

    // The first argument names the recipient, the second carries the message.
    LogicalConnection* pRecipient = FindClient(request.GetArg1().c_str());
    if (pRecipient)
    {
        // Forward the message, tagged with the sender's key.
        WebsocketDataMessage response(Routedcommunication);
        response.SetArguments(client.getKey(), request.GetArg2());
        pRecipient->PushPacket(&response);
    }
}
For the fourth service, streamed communication, all the server has to do is handle the subscribe and unsubscribe requests:
void StreamedCommunicationService::handle( LogicalConnection* pClient, IncomingPacket* pRequest )
{
    WebsocketDataMessage& request = (WebsocketDataMessage&)(*pRequest);
    WebsocketClient& client = (WebsocketClient&) (*pClient);

    // The first argument selects the operation to perform on the queue.
    string opType = request.GetArg1();
    if (opType == "subscribe")
    {
        broadcastManager.SubscribeConnectionToQueue(client.getKey(), "streamingQueue");
    }
    if (opType == "unsubscribe")
    {
        broadcastManager.UnsubscribeConnectionFromQueue(client.getKey(), "streamingQueue");
    }
}
In fact, PF already provides a publish/subscribe mechanism, so we only need to set up the queues, subscribe clients to
them, and publish messages. Message senders do not know about receivers, nor are receivers aware of senders. Available data is streamed
continuously to those who are interested in it.
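The decoupling between senders and receivers can be illustrated with a small, single-threaded sketch. This is not Push Framework's broadcast manager: the class SimpleBroadcastManager and its methods are hypothetical, delivery is simulated by appending to a per-client inbox, and the real library streams from dedicated threads.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical, single-threaded stand-in for a publish/subscribe manager.
class SimpleBroadcastManager {
public:
    // Queues must be created before clients can subscribe to them.
    void createQueue(const std::string& queue)
    {
        assert(!queue.empty());
        subscribers_[queue];
    }

    bool subscribe(const std::string& clientKey, const std::string& queue)
    {
        auto it = subscribers_.find(queue);
        if (it == subscribers_.end()) return false; // unknown queue
        it->second.insert(clientKey);
        return true;
    }

    void unsubscribe(const std::string& clientKey, const std::string& queue)
    {
        auto it = subscribers_.find(queue);
        if (it != subscribers_.end()) it->second.erase(clientKey);
    }

    // The publisher names only the queue; it never sees who receives the message.
    void publish(const std::string& queue, const std::string& message)
    {
        auto it = subscribers_.find(queue);
        if (it == subscribers_.end()) return;
        for (const std::string& key : it->second)
            inbox_[key].push_back(message);
    }

    // Messages delivered so far to one client (empty if none).
    const std::vector<std::string>& received(const std::string& clientKey)
    {
        return inbox_[clientKey];
    }

private:
    std::map<std::string, std::set<std::string>> subscribers_;
    std::map<std::string, std::vector<std::string>> inbox_;
};
```

With this sketch, a client that subscribes to "streamingQueue" receives every message published afterwards and stops receiving them once it unsubscribes, while the publishing code stays the same regardless of how many clients are listening.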
The Client
Our web page displays four tabs. Each tab triggers one type of operation:
- Echo tab: a message is sent and all the server does is echo it back to the client.
- Routed Communication: a message is sent to a particular client; the server takes care of routing it to its destination.
- Group Communication: a message is sent to the server so that it is published to a broadcast queue. We can remotely subscribe to the queue to begin receiving all of its content.
- Streamed Communication: allows subscription to and unsubscription from a broadcast queue whose content is published automatically. A server thread
does the publishing, so we can experience real-time data in the client.
To log in, the client enters a pseudonym and clicks "Connect". The server then replies with a welcome message.
You can test the different communication workflows, such as echo communication and streamed communication, where you receive
a real-time stream of messages automatically created by the server and sent to the web page.
References