Even though many technologies and approaches exist for network communication, such as Windows Communication Foundation or web servers with HTTP POST/GET requests, some tasks still require raw TCP sockets. I’ve had a chance to work with sockets a lot recently, and one of my tasks was an intermediate socket service that provides communication between an automated robot and client software. The first, naïve solution was the following:
- Client sends command
- Socket server gets command and starts executing it
- Once command execution is done, send response to client
There is nothing wrong with this solution except one thing: the time between steps 2 and 3 can be very long, up to several minutes. The socket server was written in .NET 4 and the client in Ruby 1.9. For reasons unknown at the time (after some internet research, it turned out to be a Ruby bug), when the waiting time exceeded about one minute, the client never got the server’s response, even though the server had clearly sent it.
Here, I’ll show one possible solution that eliminates this problem and makes the overall design more flexible. We will use a token-based server: the server and the client exchange a token as a handshake to confirm that every message was delivered correctly.
The basis for the server socket can be found in the following MSDN articles:
- Using an Asynchronous Server Socket
- Asynchronous Server Socket Example
We will build on that code and add more functionality.
Our scenario will be the following:
- Client initiates connection by sending the command with token for that command
- Server gets the command, stores token for that command in local token cache and sends response immediately
- On a separate thread, server executes the command
- Client sends requests with “check status” command and token
- If command has not finished yet, server replies that this token is currently “busy”
- Once command is finished, server replies “finished” for “check status” request
- Client should confirm that command is finished, server cleans up token cache after confirmation
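The token lifecycle in this scenario can be viewed as a small state machine. Below is a minimal sketch in portable C++ (not the C++/CLI used for the actual server); `TokenState`, `Event`, `next_state`, and `describe` are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <string>

// Hypothetical model of the single token slot described in the scenario.
enum class TokenState { None, Busy, Finished };
enum class Event { NewCommand, CommandDone, CheckStatus, Confirm };

// Returns the state of the token slot after the given event.
TokenState next_state(TokenState s, Event e) {
    switch (e) {
    case Event::NewCommand:
        // A new command is accepted only when no token is outstanding.
        return s == TokenState::None ? TokenState::Busy : s;
    case Event::CommandDone:
        // The worker thread finished executing the command.
        return s == TokenState::Busy ? TokenState::Finished : s;
    case Event::Confirm:
        // Only a finished token can be confirmed and removed.
        return s == TokenState::Finished ? TokenState::None : s;
    case Event::CheckStatus:
        break; // polling never changes the state
    }
    return s;
}

// Text the server would report for a "check status" request.
std::string describe(TokenState s) {
    switch (s) {
    case TokenState::Busy:     return "busy";
    case TokenState::Finished: return "finished";
    case TokenState::None:     break;
    }
    return "no token";
}
```

In this model, a second command or an early confirmation leaves the slot unchanged, which matches the "one command at a time" restriction of the prototype.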
The code that I’m providing here is just an example, a prototype, and has some restrictions:
- Only one command can be held in the token cache at a time
- If the client sends another command without first confirming the previous one, the server ignores it and replies with the status of the previous token
- It supports only dummy commands, such as “say hi” or “say bye”, which is sufficient for demo purposes
We will use JSON as the exchange format, together with an awesome JSON library for .NET: http://json.codeplex.com/
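To make the wire format concrete, here is a minimal sketch in portable C++ of what a request looks like: a JSON object with `command` and `token` fields, followed by the `<end_of_message>` marker that the server and client code in this article use. The helper name `build_request` is hypothetical, and the sketch assumes that neither value needs JSON escaping (which holds for the fixed command names and Base64 tokens used here).

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: builds {"command":"...","token":"..."}<end_of_message>.
// Assumption: command and token contain no characters that need JSON escaping.
std::string build_request(const std::string& command, const std::string& token) {
    return "{\"command\":\"" + command +
           "\",\"token\":\"" + token +
           "\"}<end_of_message>";
}
```

For example, `build_request("say_hi", "MTIz")` yields `{"command":"say_hi","token":"MTIz"}<end_of_message>`. The real client lets the JSON library produce (indented) JSON instead of concatenating strings.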
The server code is written in C++/CLI and uses CLR classes. It is easily portable to C# (just a matter of syntax in this case).
I. Define the Command
Commands.h
public enum class CommandsList
{
NOT_INITIALIZED = 0,
INIT,
SAY_HI,
SAY_BYE,
CHECK_STATUS,
CONFIRM,
UNSUPPORTED = -1
};
public ref class Command
{
public:
Command();
#pragma region Utils
static CommandsList StringToCommand(String ^command);
static bool IsValid(CommandsList cmd);
#pragma endregion
#pragma region Public Properties
[JsonProperty]
property String ^command;
[JsonProperty]
property String ^token;
#pragma endregion
private:
#pragma region Private Members
static Hashtable ^commandsMap;
#pragma endregion
};
Commands.cpp
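Command::Command()
{
#pragma region Command name to enum mapping
// The map is static, so build it only once instead of on every construction.
// It is populated by the first Command instance (for example, the one created
// by the JSON deserializer); before that, StringToCommand returns UNSUPPORTED.
if (nullptr == commandsMap)
{
Hashtable ^map = gcnew Hashtable();
map["init"] = CommandsList::INIT;
map["say_hi"] = CommandsList::SAY_HI;
map["say_bye"] = CommandsList::SAY_BYE;
map["check_status"] = CommandsList::CHECK_STATUS;
map["confirm"] = CommandsList::CONFIRM;
commandsMap = map;
}
#pragma endregion
}
CommandsList Command::StringToCommand(String ^command)
{
// Guard against a missing map or a missing command name.
if ((nullptr != commandsMap) && (nullptr != command) &&
commandsMap->ContainsKey(command))
{
return safe_cast<CommandsList>(commandsMap[command]);
}
return CommandsList::UNSUPPORTED;
}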
bool Command::IsValid(CommandsList cmd)
{
return (CommandsList::NOT_INITIALIZED != cmd) &&
(CommandsList::UNSUPPORTED != cmd);
}
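For comparison, the same name-to-enum lookup can be sketched in portable C++ with a function-local static map, so the lookup does not depend on any object having been constructed first. This is only an illustration, not the C++/CLI code of the server; `CommandKind`, `string_to_command`, and `is_valid` are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical counterpart of CommandsList.
enum class CommandKind {
    NotInitialized = 0, Init, SayHi, SayBye, CheckStatus, Confirm,
    Unsupported = -1
};

CommandKind string_to_command(const std::string& name) {
    // Function-local static: initialized once, on first use (thread-safe since C++11).
    static const std::unordered_map<std::string, CommandKind> kMap = {
        {"init", CommandKind::Init},
        {"say_hi", CommandKind::SayHi},
        {"say_bye", CommandKind::SayBye},
        {"check_status", CommandKind::CheckStatus},
        {"confirm", CommandKind::Confirm},
    };
    const auto it = kMap.find(name);
    return it == kMap.end() ? CommandKind::Unsupported : it->second;
}

bool is_valid(CommandKind c) {
    return c != CommandKind::NotInitialized && c != CommandKind::Unsupported;
}
```

Unknown names such as `"echo"` map to `Unsupported`, which `is_valid` rejects.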
II. Define the Commands Processor
CommandsProcessor.h
public ref class CommandsProcessor
{
public:
CommandsProcessor();
void ProcessCommand(Command ^command, TokenCache^ tokenCache);
private:
#pragma region Every Command Processor
bool _isInitialized;
void ProcessInit(Command ^command, TokenCache^ tokenCache);
void ProcessHi(Command ^command, TokenCache^ tokenCache);
void ProcessBye(Command ^command, TokenCache^ tokenCache);
#pragma endregion
#pragma region Exceptions Handlers
void HandleManagedException(Exception^ exception);
void EmptyExceptionMessage();
ServiceException^ _exception;
#pragma endregion
};
CommandsProcessor.cpp
CommandsProcessor::CommandsProcessor()
{
_isInitialized = false;
_exception = gcnew ServiceException();
}
void CommandsProcessor::ProcessCommand(Command ^command, TokenCache ^tokenCache)
{
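// Simulate a long-running command (10 seconds) for the demo.
Console::WriteLine("--SLEEP--\n");
Thread::Sleep(10000);
Console::WriteLine("--WAKE UP--\n");
// StringToCommand is static, so call it through the class.
CommandsList cmd = Command::StringToCommand(command->command);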
switch (cmd)
{
case CommandsList::INIT:
{
ProcessInit(command, tokenCache);
break;
}
case CommandsList::SAY_HI:
{
ProcessHi(command, tokenCache);
break;
}
case CommandsList::SAY_BYE:
{
ProcessBye(command, tokenCache);
break;
}
case CommandsList::CHECK_STATUS:
case CommandsList::CONFIRM:
default:
{
break;
}
}
EmptyExceptionMessage();
}
#pragma region Every Command Processor
#pragma region Init
void CommandsProcessor::ProcessInit(Command ^command, TokenCache ^tokenCache)
{
try
{
_isInitialized = true;
}
catch (Exception ^exception)
{
HandleManagedException(exception);
}
JObject ^o = gcnew JObject(
gcnew JProperty("command", command->command),
gcnew JProperty("exception_message", _exception->_exceptionMessage),
gcnew JProperty("is_init", _isInitialized));
tokenCache->SetTokenResponse(command, o);
tokenCache->SetTokenStatus(command,
(_exception->_isException) ?
TOKEN_ABORTED : TOKEN_SUCCESS,
true);
tokenCache->WriteTokenFile(tokenCache->GetToken(command));
}
#pragma endregion
#pragma region Hi
void CommandsProcessor::ProcessHi(Command ^command, TokenCache ^tokenCache)
{
try
{
}
catch (Exception ^exception)
{
HandleManagedException(exception);
}
JObject ^o = gcnew JObject(
gcnew JProperty("command", command->command),
gcnew JProperty("exception_message", _exception->_exceptionMessage),
gcnew JProperty("message", "Hi!"));
tokenCache->SetTokenResponse(command, o);
tokenCache->SetTokenStatus(command,
(_exception->_isException) ?
TOKEN_ABORTED : TOKEN_SUCCESS,
true);
tokenCache->WriteTokenFile(tokenCache->GetToken(command));
}
#pragma endregion
#pragma region Bye
void CommandsProcessor::ProcessBye(Command ^command, TokenCache ^tokenCache)
{
try
{
}
catch (Exception ^exception)
{
HandleManagedException(exception);
}
JObject ^o = gcnew JObject(
gcnew JProperty("command", command->command),
gcnew JProperty("exception_message", _exception->_exceptionMessage),
gcnew JProperty("message", "Bye!"));
tokenCache->SetTokenResponse(command, o);
tokenCache->SetTokenStatus(command,
(_exception->_isException) ?
TOKEN_ABORTED : TOKEN_SUCCESS,
true);
tokenCache->WriteTokenFile(tokenCache->GetToken(command));
}
#pragma endregion
#pragma endregion
#pragma region Exceptions Handlers
void CommandsProcessor::HandleManagedException(Exception^ exception)
{
_exception->_isException = true;
_exception->_exceptionMessage = gcnew String(exception->Message);
}
void CommandsProcessor::EmptyExceptionMessage()
{
_exception->_isException = false;
_exception->_exceptionMessage = String::Empty;
}
#pragma endregion
Here we use a helper class to store exceptions:
Exceptions.h
public ref class ServiceException
{
public:
ServiceException() :
_isException(false),
_exceptionMessage(nullptr)
{ }
bool _isException;
String^ _exceptionMessage;
};
III. Define the Local Token Cache for Storing Current Command’s Status and Info
This class stores the token status in memory and also on the file system for emergency cases (for example, the computer being powered off before the client has confirmed the token).
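The in-memory part of such a cache can be sketched in portable C++ as a single slot guarded by a mutex. This is a simplified illustration under stated assumptions, not the C++/CLI class listed in this section: the names `TokenEntry` and `SingleSlotTokenCache` are hypothetical, and file persistence is left out.

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Hypothetical record kept for the one outstanding command.
struct TokenEntry {
    std::string token;
    std::string status;
    bool finished;
};

class SingleSlotTokenCache {
public:
    SingleSlotTokenCache() : occupied_(false) {}

    // Stores a new token only if the slot is free.
    bool set(const std::string& token) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (occupied_) return false;
        entry_ = TokenEntry{token, "busy", false};
        occupied_ = true;
        return true;
    }
    // Copies the current entry into `out`; returns false if the slot is empty.
    bool get(TokenEntry& out) const {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!occupied_) return false;
        out = entry_;
        return true;
    }
    // Called by the worker thread when the command is done.
    bool finish(const std::string& status) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!occupied_) return false;
        entry_.status = status;
        entry_.finished = true;
        return true;
    }
    // Frees the slot, but only after the command has finished.
    bool confirm() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!occupied_ || !entry_.finished) return false;
        occupied_ = false;
        return true;
    }

private:
    mutable std::mutex mutex_;
    bool occupied_;
    TokenEntry entry_;
};
```

Every operation takes the same lock, so the socket callbacks and the worker thread never see a half-updated entry.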
TokenCache.h
public enum TokenStatuses
{
TOKEN_BUSY = 0,
TOKEN_SUCCESS,
TOKEN_ABORTED,
TOKEN_BAD_COMMAND = -1
};
public ref class TokenConverter
{
public:
static String^ TokenStatusToString(TokenStatuses status);
};
public ref class TokenStatus
{
public:
#pragma region Public Properties
[JsonProperty]
property String ^token;
[JsonProperty]
property String ^status;
[JsonProperty]
property bool finished;
[JsonProperty]
property JObject ^response;
#pragma endregion
TokenStatus(String ^_token);
};
public ref class TokenCache
{
public:
#pragma region Token Operations
static TokenStatus ^GetToken(Command^ cmd);
static bool SetToken(Command^ cmd, TokenStatus^ token);
static bool SetTokenStatus(Command^ cmd, TokenStatuses status, bool finished);
static bool SetTokenResponse(Command^ cmd, JObject ^response);
static bool RemoveToken(Command^ cmd);
static bool CheckTokenFile([Out] TokenStatus ^%token);
static void WriteTokenFile(TokenStatus ^token);
static void DeleteTokenFile();
#pragma endregion
private:
#pragma region Private Members
static Dictionary<String^, TokenStatus^> ^_tokens
= gcnew Dictionary <String^, TokenStatus^>();
static String^ prevToken = String::Empty;
#pragma endregion
};
TokenCache.cpp
TokenStatus::TokenStatus(String^ _token)
{
token = _token;
status = TokenConverter::TokenStatusToString(TOKEN_BUSY);
finished = false;
response = nullptr;
}
TokenStatus^ TokenCache::GetToken(Command^ cmd)
{
TokenStatus ^t = nullptr;
System::Threading::Monitor::Enter(_tokens);
try
{
if (String::Empty == prevToken)
{
_tokens->TryGetValue(cmd->token, t);
}
else
{
_tokens->TryGetValue(prevToken, t);
}
}
finally
{
System::Threading::Monitor::Exit(_tokens);
}
return t;
}
bool TokenCache::RemoveToken(Command^ cmd)
{
bool res = false;
TokenStatus ^t = nullptr;
System::Threading::Monitor::Enter(_tokens);
try
{
if (String::Empty == prevToken)
{
if (_tokens->TryGetValue(cmd->token, t))
{
if (t->finished)
{
_tokens->Remove(cmd->token);
res = true;
}
}
}
else
{
if (_tokens->TryGetValue(prevToken, t))
{
if (t->finished)
{
_tokens->Remove(prevToken);
prevToken = String::Empty;
res = true;
}
}
}
}
finally
{
System::Threading::Monitor::Exit(_tokens);
}
return res;
}
bool TokenCache::SetToken(Command^ cmd, TokenStatus^ token)
{
bool res = false;
System::Threading::Monitor::Enter(_tokens);
try
{
if (String::Empty == prevToken)
{
if (!_tokens->ContainsKey(cmd->token))
{
_tokens[cmd->token] = token;
prevToken = cmd->token;
res = true;
}
}
}
finally
{
System::Threading::Monitor::Exit(_tokens);
}
return res;
}
bool TokenCache::SetTokenStatus(Command^ cmd, TokenStatuses status, bool finished)
{
bool res = false;
System::Threading::Monitor::Enter(_tokens);
try
{
TokenStatus ^t = nullptr;
if (_tokens->TryGetValue(cmd->token, t))
{
t->status = TokenConverter::TokenStatusToString(status);
t->finished = finished;
_tokens[cmd->token] = t;
res = true;
}
}
finally
{
System::Threading::Monitor::Exit(_tokens);
}
return res;
}
bool TokenCache::SetTokenResponse(Command^ cmd, JObject ^response)
{
bool res = false;
System::Threading::Monitor::Enter(_tokens);
try
{
TokenStatus ^t = nullptr;
if (_tokens->TryGetValue(cmd->token, t))
{
t->response = response;
res = true;
}
}
finally
{
System::Threading::Monitor::Exit(_tokens);
}
return res;
}
bool TokenCache::CheckTokenFile([Out] TokenStatus ^%token)
{
bool fileExists = false;
token = gcnew TokenStatus(String::Empty);
try
{
IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
IsolatedStorageFileStream ^isoStream =
gcnew IsolatedStorageFileStream("token.last", FileMode::Open, isoFile);
String ^tokenData = nullptr;
try
{
StreamReader ^sr = gcnew StreamReader(isoStream);
try
{
tokenData = sr->ReadToEnd();
}
finally
{
delete sr;
}
fileExists = true;
}
catch (...) { }
if (!String::IsNullOrEmpty(tokenData))
{
try
{
token = JsonConvert::DeserializeObject<TokenStatus^>(tokenData);
}
catch (...) { }
}
isoFile->Close();
}
catch (...) { }
return fileExists;
}
void TokenCache::WriteTokenFile(TokenStatus ^token)
{
try
{
IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
IsolatedStorageFileStream ^isoStream = gcnew IsolatedStorageFileStream(
"token.last", FileMode::Create, FileAccess::Write, isoFile);
try
{
JsonSerializer ^serializer = gcnew JsonSerializer();
StreamWriter ^sw = gcnew StreamWriter(isoStream);
try
{
JsonWriter ^writer = gcnew JsonTextWriter(sw);
try
{
serializer->Serialize(writer, token);
}
finally
{
delete writer;
}
}
finally
{
delete sw;
}
}
catch (...) { }
// Close the store before disposing of it (not the other way around).
isoFile->Close();
delete isoFile;
}
catch (...) { }
}
void TokenCache::DeleteTokenFile()
{
try
{
IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
isoFile->DeleteFile("token.last");
// Close the store before disposing of it (not the other way around).
isoFile->Close();
delete isoFile;
}
catch(...) { }
}
String^ TokenConverter::TokenStatusToString(TokenStatuses status)
{
String^ retStr = String::Empty;
switch(status)
{
case TOKEN_BUSY:
retStr = "busy";
break;
case TOKEN_SUCCESS:
retStr = "success";
break;
case TOKEN_ABORTED:
retStr = "aborted";
break;
case TOKEN_BAD_COMMAND:
default:
retStr = "bad_command";
break;
}
return retStr;
}
IV. Finally, Define Threaded Commands Processor and Socket Server Itself
SocketServer.h
public ref class SingleCommandProcessor
{
public:
SingleCommandProcessor(
Command^ currCmd,
TokenCache ^tokenCache,
CommandsProcessor ^commandsProcessor) :
_currCmd(currCmd),
_tokenCache(tokenCache),
_commandsProcessor(commandsProcessor)
{}
void DoProcess();
private:
Command ^_currCmd;
TokenCache ^_tokenCache;
CommandsProcessor ^_commandsProcessor;
};
public ref class StateObject
{
public:
StateObject();
Socket ^workSocket;
literal int BufferSize = 1024;
array<Byte> ^buffer;
StringBuilder ^sb;
private:
bool _initialized;
void InitializeInstanceFields();
};
public ref class SocketServer
{
public:
SocketServer(String ^ipAddress, int port);
void Run(String ^ipAddress, int port, int backlog);
void Stop();
private:
void AcceptCallback(IAsyncResult ^ar);
void ReceiveCallback(IAsyncResult ^ar);
void SendCallback(IAsyncResult ^ar);
SocketPermission ^_permission;
Socket ^_sListener;
static ManualResetEvent ^allDone = gcnew ManualResetEvent(false);
CommandsProcessor ^_commandsProcessor;
TokenCache ^_tokenCache;
};
The code for the asynchronous callbacks is the same as in the MSDN examples (linked above), except for the receive callback. The logic of the receive callback is the following:
- Parse the command
- If the command is valid, check for tokens
- If the current token is not finished => reply with the current token
- If there are no current tokens => reply with a new token
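The decision logic above can be sketched as a pure function in portable C++. This is a simplified model, not the server code: `Request`, `Reply`, `Slot`, and `handle_request` are hypothetical names, and rejecting "check status" or "confirm" when no token exists is an assumption of this sketch.

```cpp
#include <cassert>

enum class Request { Invalid, Command, CheckStatus, Confirm };
enum class Reply { BadCommand, CurrentToken, NewToken, ConfirmOk, ConfirmRejected };

// Hypothetical view of the single token slot.
struct Slot {
    bool occupied = false;
    bool finished = false;
};

Reply handle_request(Request req, Slot& slot) {
    if (req == Request::Invalid) {
        return Reply::BadCommand;
    }
    if (slot.occupied) {
        if (req == Request::Confirm) {
            if (!slot.finished) return Reply::ConfirmRejected;
            slot = Slot{};               // confirmation frees the slot
            return Reply::ConfirmOk;
        }
        // Any other request is answered with the state of the current token.
        return Reply::CurrentToken;
    }
    if (req != Request::Command) {
        // Assumption: nothing to check or confirm, so reject the request.
        return Reply::BadCommand;
    }
    slot.occupied = true;                // caller starts the worker thread here
    return Reply::NewToken;
}
```

Keeping this logic free of socket code makes it easy to exercise on its own before wiring it into the callback.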
SocketServer.cpp
StateObject::StateObject()
{
InitializeInstanceFields();
}
void StateObject::InitializeInstanceFields()
{
if ( !_initialized)
{
buffer = gcnew array<Byte>(BufferSize);
sb = gcnew StringBuilder();
_initialized = true;
}
}
void SingleCommandProcessor::DoProcess()
{
_commandsProcessor->ProcessCommand(_currCmd, _tokenCache);
}
SocketServer::SocketServer(String ^ipAddress, int port)
{
_permission = gcnew SocketPermission(PermissionState::Unrestricted);
_commandsProcessor = gcnew CommandsProcessor();
_tokenCache = gcnew TokenCache();
_sListener = nullptr;
}
void SocketServer::Run(String ^ipAddress, int port, int backlog)
{
_permission->Demand();
IPAddress ^ipAddr = IPAddress::Parse(ipAddress);
IPEndPoint ^ipEndPoint = gcnew IPEndPoint(ipAddr, port);
_sListener = gcnew Socket(ipAddr->AddressFamily, SocketType::Stream, ProtocolType::Tcp);
_sListener->Bind(ipEndPoint);
_sListener->Listen(backlog);
while (true) {
allDone->Reset();
Console::WriteLine("Waiting for a connection on " +
Convert::ToString(ipEndPoint->Address) + ":" +
Convert::ToString(ipEndPoint->Port));
AsyncCallback ^aCallback = gcnew AsyncCallback(this, &SocketServer::AcceptCallback);
_sListener->BeginAccept(aCallback, _sListener);
allDone->WaitOne();
}
}
void SocketServer::Stop()
{
if (_sListener->Connected)
{
_sListener->Shutdown(SocketShutdown::Both);
_sListener->Close();
}
}
void SocketServer::AcceptCallback(IAsyncResult ^ar)
{
Socket ^listener = nullptr;
Socket ^handler = nullptr;
allDone->Set();
listener = safe_cast<Socket^>(ar->AsyncState);
handler = listener->EndAccept(ar);
StateObject ^state = gcnew StateObject();
state->workSocket = handler;
handler->BeginReceive(
state->buffer,
0,
StateObject::BufferSize,
SocketFlags::None,
gcnew AsyncCallback(this, &SocketServer::ReceiveCallback),
state);
}
void SocketServer::ReceiveCallback(IAsyncResult ^ar)
{
bool startNewProcessingThread = false;
StateObject ^state = safe_cast<StateObject^>(ar->AsyncState);
Socket ^handler = state->workSocket;
String ^content = String::Empty;
String ^response = String::Empty;
int bytesRead = handler->EndReceive(ar);
String ^newLine = Encoding::ASCII->GetString(state->buffer,0,bytesRead);
if (bytesRead > 0)
{
state->sb->Append(newLine);
handler->BeginReceive(
state->buffer,
0,
StateObject::BufferSize,
SocketFlags::None,
gcnew AsyncCallback(this, &SocketServer::ReceiveCallback),
state);
}
if (newLine->Contains("<end_of_message>"))
{
if (state->sb->Length > 0)
{
Command ^command;
content = state->sb->ToString();
Console::WriteLine("Read " +
Convert::ToString(content->Length) +
" bytes from client.\nData: " +
content + "\n");
CommandsList cmd = CommandsList::NOT_INITIALIZED;
try
{
String ^json = content->Substring(0, content->LastIndexOf("}") + 1);
command = JsonConvert::DeserializeObject<Command^>(json);
cmd = command->StringToCommand(command->command);
}
catch (...) { }
if (Command::IsValid(cmd))
{
TokenStatus ^t;
if(!_tokenCache->CheckTokenFile(t))
{
t = _tokenCache->GetToken(command);
}
if (nullptr != t)
{
if (CommandsList::CONFIRM == cmd)
{
bool tokenDone = _tokenCache->RemoveToken(command);
if (tokenDone)
{
_tokenCache->DeleteTokenFile();
}
response = (gcnew JObject(
gcnew JProperty("status",
TokenConverter::TokenStatusToString(
(tokenDone ? TOKEN_SUCCESS : TOKEN_ABORTED)))))->ToString();
}
else
{
response =
JsonConvert::SerializeObject(t,
Newtonsoft::Json::Formatting::Indented);
}
}
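else if ((CommandsList::CHECK_STATUS == cmd) || (CommandsList::CONFIRM == cmd) ||
String::IsNullOrEmpty(command->token))
{
// No token to check or confirm: do not create one that would never finish.
response = (gcnew JObject(gcnew JProperty("status",
TokenConverter::TokenStatusToString(TOKEN_BAD_COMMAND))))->ToString();
}
else
{
t = gcnew TokenStatus(command->token);
_tokenCache->SetToken(command, t);
startNewProcessingThread = true;
response = JsonConvert::SerializeObject(t, Newtonsoft::Json::Formatting::Indented);
}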
}
else
{
JObject ^o = gcnew JObject(
gcnew JProperty("status", TokenConverter::TokenStatusToString(TOKEN_BAD_COMMAND)));
response = o->ToString();
}
array<Byte> ^byteData = Encoding::Unicode->GetBytes(response);
Console::WriteLine("Sending: \n" +
response + "\n");
handler->BeginSend(
byteData,
0,
byteData->Length,
SocketFlags::None,
gcnew AsyncCallback(this, &SocketServer::SendCallback),
handler);
state->sb->Clear();
if (startNewProcessingThread)
{
SingleCommandProcessor^ threadWork =
gcnew SingleCommandProcessor(command, _tokenCache, _commandsProcessor);
Thread^ newThread = gcnew Thread( gcnew ThreadStart(
threadWork, &SingleCommandProcessor::DoProcess ) );
newThread->Start();
}
}
}
}
void SocketServer::SendCallback(IAsyncResult ^ar)
{
Socket ^handler = safe_cast<Socket^>(ar->AsyncState);
int bytesSend = handler->EndSend(ar);
Console::WriteLine("Sent " +
Convert::ToString(bytesSend) +
" bytes to client." + "\n");
handler->Disconnect(true);
}
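One detail worth illustrating is message framing. TCP delivers a byte stream, so the `<end_of_message>` marker can arrive split across two reads, while the receive callback above looks for it only in the most recent chunk. A minimal sketch in portable C++ of searching the whole accumulated buffer instead is shown below; `kEndMarker` and `try_extract_message` are hypothetical names, and this is an illustration rather than a drop-in change to the server.

```cpp
#include <cassert>
#include <string>

const std::string kEndMarker = "<end_of_message>";

// Searches the accumulated data for the end marker. On success, copies the
// text before the marker into `payload`, removes it (and the marker) from
// `buffer`, and returns true. Returns false if no complete message is there yet.
bool try_extract_message(std::string& buffer, std::string& payload) {
    const std::string::size_type pos = buffer.find(kEndMarker);
    if (pos == std::string::npos) {
        return false;
    }
    payload = buffer.substr(0, pos);
    buffer.erase(0, pos + kEndMarker.size());
    return true;
}
```

The caller appends every received chunk to `buffer` and calls `try_extract_message` after each append; anything after the marker stays in the buffer for the next message.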
V. To Test Server Functionality, Here Is a Simple Client Written in C#
Client.cs
public class Command
{
[JsonProperty]
public String command;
[JsonProperty]
public String token;
};
class Client
{
static void Main(string[] args)
{
PrintHelp();
byte[] bytes = new byte[1024];
string line = String.Empty;
Command cmd = new Command();
while (true)
{
line = Console.ReadLine();
if (line == "exit")
{
break;
}
int option;
if (!Int32.TryParse(line, out option))
{
option = -1; // not a number: handled by the default case below
}
switch (option)
{
case 0:
cmd.command = "check_status";
break;
case 1:
cmd.command = "init";
cmd.token = GenerateToken();
break;
case 2:
cmd.command = "say_hi";
cmd.token = GenerateToken();
break;
case 3:
cmd.command = "say_bye";
cmd.token = GenerateToken();
break;
case 4:
cmd.command = "confirm";
break;
default:
PrintHelp();
cmd.command = "echo";
break;
}
try
{
SocketPermission permission = new SocketPermission(PermissionState.Unrestricted);
permission.Demand();
IPAddress ipAddr = IPAddress.Parse("127.0.0.1");
IPEndPoint ipEndPoint = new IPEndPoint(ipAddr, 8081);
Socket sender = new Socket(
ipAddr.AddressFamily,
SocketType.Stream,
ProtocolType.Tcp
);
sender.Connect(ipEndPoint);
Console.WriteLine("Socket connected to {0}",
sender.RemoteEndPoint.ToString());
string theMessage = JsonConvert.SerializeObject(cmd, Formatting.Indented);
Console.WriteLine(theMessage + "\n");
byte[] msg = Encoding.ASCII.GetBytes(theMessage + "<end_of_message>");
int bytesSend = sender.Send(msg);
int bytesRec = sender.Receive(bytes);
theMessage = Encoding.Unicode.GetString(bytes, 0, bytesRec);
while (sender.Available > 0)
{
bytesRec = sender.Receive(bytes);
theMessage += Encoding.Unicode.GetString(bytes, 0, bytesRec);
}
Console.WriteLine("The server reply: {0}\n", theMessage);
sender.Shutdown(SocketShutdown.Both);
sender.Close();
}
catch (Exception ex)
{
Console.WriteLine("Exception: {0}", ex.Message);
}
}
}
private static string GenerateToken()
{
// Read the clock once so that all parts come from the same instant.
DateTime now = DateTime.Now;
byte[] token = Encoding.ASCII.GetBytes(
now.Hour.ToString() +
now.Minute.ToString() +
now.Second.ToString() +
now.Millisecond.ToString());
return Convert.ToBase64String(token);
}
private static void PrintHelp()
{
Console.WriteLine("1 : init");
Console.WriteLine("2 : say_hi");
Console.WriteLine("3 : say_bye");
Console.WriteLine("4 : confirm");
Console.WriteLine("0 : check_status");
Console.WriteLine();
}
}
The advantages of this approach are listed below:
- The client and the server always get prompt responses from each other, because no connection has to stay open while a long command runs
- The server is designed with potential support for processing several commands simultaneously: each command has its own thread, and the main thread processes incoming connections
- Using the token-based approach, we can be sure that the server is answering the intended command, so responses cannot get mixed up
- Again, thanks to the token-based approach, the client can poll the server to track the status of the last command
- No response is lost: the result stays in the token cache until the client confirms it
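The client-side polling mentioned in the list can be sketched in portable C++ as follows. The transport is abstracted behind a callback so the loop stands on its own. `SendFn` and `poll_until_finished` are hypothetical names, and the sketch assumes the transport returns just the status text (`"busy"`, `"success"`, or `"aborted"`) extracted from the server reply.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <thread>

// Hypothetical transport: sends (command, token) and returns the status text.
using SendFn = std::function<std::string(const std::string&, const std::string&)>;

// Polls with "check_status" until the token is no longer busy, then confirms it.
// Returns true only if the command finished with "success".
bool poll_until_finished(const SendFn& send, const std::string& token,
                         int maxAttempts, std::chrono::milliseconds delay) {
    for (int i = 0; i < maxAttempts; ++i) {
        const std::string status = send("check_status", token);
        if (status != "busy") {
            send("confirm", token);   // lets the server clean up its token cache
            return status == "success";
        }
        std::this_thread::sleep_for(delay);
    }
    return false;                     // still busy after maxAttempts polls
}
```

Each poll is a short request with an immediate reply, so no single socket operation has to wait for the whole command to complete.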
As usual, the Visual Studio solution and code files can be found on my public SVN: http://subversion.assembla.com/svn/max-s-blog-posts/ (SocketServerWithTokens folder).
Hope somebody will find this useful. Thanks and happy socket coding.
Max.