CTP - sequence matters
Introduction
First of all, let us define what is meant here by a "computational cluster". A cluster is a group of workstations united for some definite purpose. A computational cluster is a cluster built for heavy computations. It is a specific system that imposes special requirements on network functionality. The main properties of the networking mechanism of a good-quality cluster are:
- Fast data interchange.
- Reliable data transfer.
- Broadcasting support. Usually, all workstations in the network take part in a computational experiment, so broadcasting makes control much easier.
- Support for interchange of huge data blocks. For example, the initial conditions of an experiment can sometimes be represented by such a block.
- Peer-to-peer networking. Any workstation can be both a data source and a data destination, so all of them are clients and servers simultaneously.
In fact, the majority of parallel computing toolkits are implemented as libraries that use the standard TCP/IP networking protocol [1]. Using this protocol has a number of disadvantages:
- Low speed of data interchange. The "reliability" and "universality" of TCP come with a lot of overhead. TCP is a general-purpose protocol, suitable for such an unstable medium as the Internet, but in a stable (or fairly stable) system built for computations, better performance can be achieved.
- TCP does not support broadcasting. UDP does, but it is not reliable, and the size of a UDP datagram is limited to 65467 bytes [1].
- The idea of establishing a logical connection before data interchange is redundant for cluster computations. First, a cluster is usually a well-tuned, reliably working network. Second, some strategies of cluster computing lead to unordered interchange between workstations.
- TCP is a stream-based protocol, but for such tasks, interchange of bounded blocks is preferable, because it allows one to say definitely when all the data needed for further operations have arrived.
Of course, a specialized networking protocol can be adapted to the special requirements that arise in cluster computations. CTP is such a protocol: it is meant to satisfy the needs of arbitrary tasks that require rapid message interchange and may start heavy computations in reaction to a received message. Although the letter "P" in its name stands for "protocol", CTP is not just a specification; it is an ideology together with a toolkit for using it. So it can replace such products as MPI implementations, PVM, and so on.
Ideology
The majority of existing toolkits for parallel computations use the so-called "message" as their basic abstraction. The basic abstraction in CTP is the "command". A command is an order from somebody to someone to do something (in most cases, workstations in clusters communicate exactly in this way), or the response to such an order. From the last sentence it follows that a command is characterized by the following parameters:
- "somebody" - sender.
- "someone" - recipient.
- "something" - command's description.
So, first of all, the sender and the recipient need to be identified somehow. IP addresses are used for this purpose, because IP is used extremely widely and fully satisfies the requirement (it gives unique identifiers to all workstations). Commands are identified by integer numbers.
In terms of this protocol, the words "command" and "message" are practically synonyms: a "command" is a "message", but not always vice versa.
CTP needs to satisfy the cluster networking requirements listed above. Here is how this is achieved (in the same order as in the introduction):
- To increase the speed of interchange, UDP is used as the basis of the protocol. Moreover, using UDP rather than going down to raw networking saves the user from additional problems with installing the protocol support toolkit.
- Reliability of data interchange is implemented on top of UDP. Each sent packet is stored until the recipient confirms receipt of the data. To support this mechanism, packets are provided with identifiers: integer numbers assigned on the sender side. These IDs are not unique in general, but they are unique for each sender.
- Broadcasting support is one more argument for using UDP as the basic protocol.
- Support for huge data interchange is implemented. If a message larger than some limit (65400 bytes by default) is going to be sent, it is divided into smaller parts. These parts are numbered and sent to the recipient separately, one by one. On the recipient's side, they are joined to rebuild the initial command. An important note: the recipient application learns about the command's arrival only after all of its parts have been received. Such commands are called "large commands", but in practice the majority of commands are "normal" (a single packet is enough to transfer them).
- For peer-to-peer interchange, CTP implementations include both client and server functionality as a single whole.
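The splitting rule can be illustrated with a short sketch. It assumes the default limit of 65400 data bytes per packet. It also treats a command without data as a single header-only packet, as SmartBuffer does for an empty buffer. The function names are hypothetical and are not part of the CTP toolkit.

```cpp
#include <cassert>
#include <cstdint>

// Default maximal amount of data in one CTP packet, in bytes.
const std::uint64_t kMaxPacketData = 65400;

// A command is "large" if its data do not fit into a single packet.
bool IsLarge(std::uint64_t size, std::uint64_t maxData = kMaxPacketData) {
    return size > maxData;
}

// Number of packets needed for a command with 'size' bytes of data.
// A command without data still needs one header-only packet.
std::uint64_t PacketsNeeded(std::uint64_t size,
                            std::uint64_t maxData = kMaxPacketData) {
    if (size == 0) return 1;
    return (size + maxData - 1) / maxData;  // ceiling division
}

// Data size carried by part 'i' (numbered from zero): every part is full
// except possibly the last one.
std::uint64_t PartSize(std::uint64_t size, std::uint64_t i,
                       std::uint64_t maxData = kMaxPacketData) {
    const std::uint64_t n = PacketsNeeded(size, maxData);
    assert(i < n);
    return (i + 1 < n) ? maxData : size - (n - 1) * maxData;
}
```

For example, a 100000-byte command is large. It travels as two packets carrying 65400 and 34600 bytes of data.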
The relationship of CTP/IP to the OSI model [2] and to the UDP/IP model is shown in fig. 1.
Fig. 1. Relationship between OSI-model, UDP/IP-model and CTP/IP-model.
The fact that CTP spans several layers, from the transport layer up to the application layer, shows that its area of responsibility starts at a relatively low level and extends to a high one.
Internal World
The following discussion is a somewhat unordered description of CTP's basic concepts, their properties, and their functionality. Reading all of it should give a solid idea of what is going on inside. This form of presentation was chosen because a systematic discussion would have to start from several points simultaneously, which, unfortunately, is impossible.
CTP's debugging abilities may help to better understand the sequence of operations: descriptions of all actions and events can be written to a given output stream.
Header
Each CTP packet consists, as usual, of a header plus a body (data). The structure of the header is shown in table 1 (fields in order of appearance).
Name | Size (bits) | Comment
Packet size | 16 | Unsigned integer. Size of the packet, including the header.
Command number | 16 | Unsigned integer. Command number (from 0 to 32767; the highest bit is not set). If the highest bit is set, the packet is a confirmation for the message with the corresponding command number.
Packet number inside the large message | 32 | Unsigned integer. For large commands, the number of the packet, from zero to the number of packets (given by the next header field) minus one. For normal commands, zero.
Number of packets in the message | 32 | Unsigned integer. For large commands, the number of packets needed to send the command. For normal commands, zero or one.
ID | 32 | Unsigned integer. Packet identifier. Must be unique for each sender.
Message size | 64 (only 48 are used) | Unsigned integer. Size of the whole command's data (without any headers). The biggest command is therefore the maximal number of packets multiplied by the maximal packet data size, namely 2^32 * 65400 = 280890861158400 bytes. That is more than 255 terabytes, so the message size can be considered unlimited. 48 bits are enough to store this value (2^48 > 2^32 * 65400), but 64 bits were allocated for alignment.
Options | 8 | Set of bits. Each bit determines whether the corresponding option is set. Options are discussed later.
Table 1. CTP packet header
It is easy to calculate that the total size of the header is 25 bytes (2 + 2 + 4 + 4 + 4 + 8 + 1). Other important transfer parameters, such as the IP addresses and ports of the sender and the recipient, are already carried by the underlying IP and UDP headers.
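The 25-byte layout can be checked at compile time. The sketch below mirrors the Header struct of the CCTPNet class listed later. It uses the standard fixed-width types from <cstdint> instead of MSVC's __intN types. Packing to 1 byte (supported by MSVC, GCC, and Clang through #pragma pack) prevents the compiler from inserting padding.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// CTP header as in Table 1; fields appear in the same order.
#pragma pack(push, 1)
struct CtpHeader {
    std::uint16_t size;     // packet size, including this header
    std::uint16_t command;  // command number; highest bit set = confirmation
    std::uint32_t number;   // part number inside a large command
    std::uint32_t amount;   // number of packets in the command
    std::uint32_t id;       // packet ID, unique for each sender
    std::uint64_t messize;  // size of the whole command's data
    std::uint8_t  options;  // option bits
};
#pragma pack(pop)

// 2 + 2 + 4 + 4 + 4 + 8 + 1 = 25 bytes, with no padding.
static_assert(sizeof(CtpHeader) == 25, "CTP header must occupy 25 bytes");
```

Without the pragma, a typical compiler would align messize to 8 bytes and pad the structure to 32 bytes.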
Each packet can be fully identified by its sender, its recipient, and its ID. The sender ensures that IDs are unique for each recipient in the following way. The initial ID is taken as a pseudorandom number and is incremented after each packet sent. The very first packet sent to a recipient is marked with a special option (see below), so that the recipient can learn the starting ID.
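Below is a minimal sketch of the sender-side ID assignment described above. It assumes one counter per recipient, keyed by the recipient's 32-bit IP address. It uses the StartSession bit (0x10) from CCTPNet's Options enumeration to mark the first packet. The class IdAllocator is hypothetical. std::mt19937 merely stands in for whatever pseudorandom generator the implementation uses (CCTPNet's SessionInfo uses rand()).

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <random>

// Option bit marking the first packet sent to a recipient
// (the value of CCTPNet::StartSession).
const std::uint8_t kStartSession = 0x10;

// Hypothetical sender-side ID allocator: one counter per recipient.
class IdAllocator {
public:
    explicit IdAllocator(std::uint32_t seed) : rng_(seed) {}

    // Returns the ID for the next packet to 'recipient' (an IPv4 address
    // as a 32-bit number) and sets kStartSession in 'options' if this is
    // the very first packet sent to that recipient.
    std::uint32_t Next(std::uint32_t recipient, std::uint8_t& options) {
        auto it = sessions_.find(recipient);
        if (it == sessions_.end()) {
            // New recipient: start from a pseudorandom ID.
            it = sessions_.emplace(recipient,
                                   static_cast<std::uint32_t>(rng_())).first;
            options |= kStartSession;
        }
        return it->second++;  // unsigned arithmetic wraps from 2^32-1 to 0
    }

private:
    std::mt19937 rng_;
    std::map<std::uint32_t, std::uint32_t> sessions_;  // recipient -> next ID
};
```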
Storages
The flowchart illustrating the sequence of operations CTP performs for packet interchange is shown in fig. 2:
Fig. 2. Flowchart of CTP's implementation.
The flowchart mentions so-called "storages". There are four storages for the data accumulated during operation, and together they provide CTP's functionality:
- Session information storage. It stores a description of each workstation the current one communicates with: the next packet ID, the interchange timeout, and a description of the packets received from that workstation.
The interchange timeout is used to determine when sent packets that have not been confirmed need to be resent. This timeout is adaptive, because a cluster can be rather heterogeneous and can involve workstations connected via both an intranet and the Internet. Initially, a default timeout is used (100 milliseconds by default). After the first interchange, the timeout is set to the time that interchange took, multiplied by a coefficient (3 by default).
If confirmation of a packet's arrival is not received within the timeout, the packet is resent. The period between resendings grows exponentially. Suppose a packet is still not confirmed after 8 resendings, by which time 1 + 2 + 4 + ... + 128 = 255 timeouts have passed. Then the error message "Command is not confirmed too long" is generated. If the timeout is set to zero, this feature is switched off.
Since messages can be resent, the user has to be protected from receiving the same message several times. That is why descriptions of received packets are stored for each sender, kept as an ordered list. The first element contains the maximal ID among packets received in sequence. It can be followed by more IDs, corresponding to packets that have been received but whose IDs are greater than the first element. After each new ID is inserted into the list, the contiguous sequence beginning at the first element is truncated.
For example, assume the list contains {7, 9, 10, 11, 13, 14}. This means that all packets with IDs less than or equal to 7, as well as packets 9, 10, 11, 13, and 14, have already been received. After the packet with ID 8 arrives, the list takes the form {11, 13, 14}. If all packets arrive in sequence, the list always contains a single element.
ID values wrap around (after 2^32-1 comes 0), so it is very important at this stage to determine the starting ID generated by the sender.
A new entry is added to the session information storage when the first message is about to be sent to, or has been received from, a workstation that is not known yet. There must also be a special entry for broadcast messages.
- Sent commands storage. To send a command, its packets have to be arranged: some memory is allocated and filled with packet headers and data. This memory is not freed right after sending, but is kept in the sent commands storage. A record can be removed from the sent commands storage only after the arrival of all its packets has been confirmed.
This approach can be implemented per packet rather than per command (as in CTP 1.0), but the per-command variant is preferable. In that case, so-called "smart buffers" avoid redundant memory allocations by reserving and guarding the memory needed for headers while the packet data are being arranged.
- Large commands storage. It is used to assemble a whole large message as it is received part by part. It stores the total number of parts, a vector with the receiving status of each part, and a buffer for assembling the message. Each part of the message, except possibly the last one, carries the maximal data size, so every part can easily find its place in the buffer from its number. When all parts have been received, the message is considered assembled, and the server informs the application about the data arrival.
- Deliveries storage. A whole received message or an error description is called a "delivery". Once generated, deliveries are added to the deliveries list. Special deliverer threads then take them from the list and pass them to the application.
In object-oriented terms, objects of the classes that implement the receiving application can subscribe to get deliveries for a given command. The subscribed object is then informed about the command's arrival and about errors related to this command.
There must also be a default receiver that gets information about commands that have no subscribers and about general errors that are not related to any command (such as an error while creating sockets).
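Several rules of the session information storage can be sketched together:
- the duplicate-detection list, reproducing the {7, 9, 10, 11, 13, 14} example;
- the adaptive timeout;
- the exponential resend schedule.

The wrap-around comparison copies the logic of CCTPNet::Less. Everything else is a hypothetical illustration rather than the toolkit's code, including the names and the assumption that the list is seeded once the sender's starting ID is known.

```cpp
#include <cassert>
#include <cstdint>
#include <iterator>
#include <list>
#include <vector>

// Wrap-around aware "a < b" for 32-bit packet IDs, as in CCTPNet::Less.
inline bool Less(std::uint32_t a, std::uint32_t b) {
    std::uint32_t diff = a > b ? a - b : b - a;
    return diff > 0x7fffffffu ? !(a < b) : a < b;
}

// Received-IDs list of one sender: front() is the highest ID received in
// sequence, the rest are IDs received out of order (kept sorted).
// Returns false if 'id' is a duplicate.
bool RegisterReceived(std::list<std::uint32_t>& rcv, std::uint32_t id) {
    assert(!rcv.empty());  // assumes the starting ID is already known
    if (!Less(rcv.front(), id)) return false;        // id <= front
    auto it = std::next(rcv.begin());
    while (it != rcv.end() && Less(*it, id)) ++it;   // keep the order
    if (it != rcv.end() && *it == id) return false;  // seen out of order
    rcv.insert(it, id);
    // Truncate the contiguous run that now starts at the front.
    while (rcv.size() > 1 && *std::next(rcv.begin()) == rcv.front() + 1u)
        rcv.pop_front();
    return true;
}

// Adaptive interchange timeout: the default until the first interchange,
// then the measured time multiplied by a coefficient (defaults 100 ms, 3).
struct SessionTimeout {
    unsigned int defaultMs = 100;
    unsigned int multiplier = 3;
    bool measured = false;
    unsigned int valueMs = 0;

    unsigned int Current() const { return measured ? valueMs : defaultMs; }
    void OnFirstInterchange(unsigned int elapsedMs) {
        if (!measured) { valueMs = elapsedMs * multiplier; measured = true; }
    }
};

// Moments (in timeouts since the first sending) at which an unconfirmed
// packet is resent. The period doubles each time, as CommandInfo::IncResend
// does, and the packet is given up once the period reaches 256 timeouts.
std::vector<unsigned int> ResendMoments() {
    std::vector<unsigned int> moments;
    unsigned int period = 1, elapsed = 0;
    while (period < 256) {
        elapsed += period;
        moments.push_back(elapsed);
        period <<= 1;
    }
    return moments;
}
```

With the list {7, 9, 10, 11, 13, 14}, RegisterReceived(list, 8) returns true and leaves {11, 13, 14}. Calling it again with 13 returns false, which is the signal to send the confirmation again without delivering anything. ResendMoments() yields 1, 3, 7, ..., 255, matching the eight resendings described above.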
Confirmations
A confirmation is a packet with an empty body (header only). Its header differs from the header of the confirmed packet in only three respects:
- Packet's size is set to header's size.
- In command's number, highest bit is set.
- Message size is set to zero.
Confirming each packet with a separate confirmation may look inefficient, but it was done to provide more features through options. Do not forget that CTP is not only a protocol, but also a toolkit.
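Building a confirmation therefore amounts to copying the received header and changing those three fields. In this sketch, the confirmation flag is assumed to be 0x8000, the highest bit of the 16-bit command number, as described in Table 1. The function names are illustrative.

```cpp
#include <cassert>
#include <cstdint>

#pragma pack(push, 1)
struct Header {  // same 25-byte layout as Table 1
    std::uint16_t size;
    std::uint16_t command;
    std::uint32_t number;
    std::uint32_t amount;
    std::uint32_t id;
    std::uint64_t messize;
    std::uint8_t  options;
};
#pragma pack(pop)

// Assumed confirmation flag: the highest bit of the command number.
const std::uint16_t kConfirmBit = 0x8000;

inline bool IsConfirmation(std::uint16_t command) {
    return (command & kConfirmBit) != 0;
}

// Builds the confirmation for a received packet: copy the header and change
// exactly the three fields listed above.
Header MakeConfirmation(const Header& received) {
    Header c = received;
    c.size = static_cast<std::uint16_t>(sizeof(Header));  // header only
    c.command = static_cast<std::uint16_t>(c.command | kConfirmBit);
    c.messize = 0;                                         // no data
    return c;
}
```

Because ID, part number, and part count are kept, the sender can find exactly which packet of which command is being confirmed.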
After the Packet Has Been Sent
The first thing the recipient does when it receives a packet is to check whether the same packet has already been received. If it has, the sender has failed to get the confirmation. The confirmation therefore has to be sent again, and the rest of the receiving procedure for this packet can be skipped.
Note that the confirmation is, exactly, "sent again", not "resent": confirmations are not stored in the sent commands storage, but are generated when needed.
If such a packet has not been received earlier, the fact of its arrival is stored.
If the received packet represents a normal command, the server informs the application about the data arrival: it creates a delivery and puts it into the deliveries storage. If the packet is a part of a large command, the server stores it in the large commands storage. If it is the last missing part of the message, a delivery is generated as well.
After the packet has found its place, a confirmation is sent, and the recipient waits for the next packet.
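The large-command branch of this procedure can be sketched as follows. The assembler keeps one flag per part and places part i at offset i times the maximal packet data size, as described for the large commands storage. Repeated parts are ignored, although duplicates are normally filtered out earlier by the received-IDs check. The class is a hypothetical illustration rather than CTP's LargeCommandInfo.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical large-command record: a buffer for the whole message and
// one "received" flag per part.
class LargeCommandAssembler {
public:
    LargeCommandAssembler(std::uint64_t messageSize, std::uint32_t parts,
                          std::uint32_t maxData)
        : buffer_(static_cast<std::size_t>(messageSize)),
          received_(parts, false), maxData_(maxData), missing_(parts) {}

    // Stores part 'number' (0-based) and returns true once every part has
    // arrived, i.e. when the delivery can be generated.
    bool AddPart(std::uint32_t number, const char* data, std::size_t len) {
        assert(number < received_.size());
        // All parts except possibly the last carry maxData bytes, so the
        // part's place in the buffer follows from its number alone.
        const std::size_t offset = static_cast<std::size_t>(number) * maxData_;
        assert(offset + len <= buffer_.size());
        if (!received_[number]) {  // a repeated part changes nothing
            if (len != 0) std::memcpy(buffer_.data() + offset, data, len);
            received_[number] = true;
            --missing_;
        }
        return missing_ == 0;
    }

    const std::vector<char>& Data() const { return buffer_; }

private:
    std::vector<char> buffer_;
    std::vector<bool> received_;
    std::uint32_t maxData_;
    std::uint32_t missing_;
};
```

Parts may arrive in any order; only the arrival of the last missing one completes the command.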
When the sender receives a confirmation, it marks the corresponding packet as confirmed in the sent commands storage. It deletes the whole record once all packets of the command have been confirmed. Like a physical system that tends to minimize its potential energy, the mechanism strives to free all storages as soon as possible.
Threads
The protocol implementation is multithreaded. There are three types of threads:
- Server threads receive packets and handle confirmations, large command assembly, and so on. When data arrive or an error occurs, a server thread adds a delivery to the deliveries storage. Periodically (every 100 milliseconds by default), it checks whether any packets need resending, and resends them if necessary. There can be any number of server threads, depending on the task.
- Deliverer threads check the deliveries list and, if it is not empty, pass the first delivery to the corresponding subscriber or to the default receiver. If a deliverer has nothing to do for a long time (20 seconds by default), it terminates. In its idle loop, a deliverer sleeps for a short period (20 milliseconds by default).
Each command is an order (or a response), and it can order something complex and time-consuming. Implementing deliverers as separate threads therefore allows something to be computed "on order", right where and when it was requested. Nevertheless, it is strongly recommended not to abuse this feature. For example, do not show modal dialogs in command handlers, because that keeps the deliverer busy for nothing.
- The delivery manager thread creates additional deliverer threads if all existing deliverers are busy and the deliveries list is not empty. Of course, the maximal number of deliverers is limited (50 by default), since the mechanism strives to keep the load low. In its idle loop, the manager sleeps for a short time (10 milliseconds by default).
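The two thread-management rules can be written down as predicates. This is a hypothetical sketch using the defaults quoted above: at most 50 deliverers, and 20 seconds of idleness before a deliverer terminates. The real threads also sleep between checks, which is omitted here.

```cpp
#include <cassert>
#include <cstddef>

// Delivery manager rule: start another deliverer only if deliveries are
// waiting, every existing deliverer is busy, and the limit is not reached.
bool ShouldStartDeliverer(std::size_t threads, std::size_t busy,
                          std::size_t queued, std::size_t maxThreads = 50) {
    return queued > 0 && busy >= threads && threads < maxThreads;
}

// Deliverer rule: terminate after having nothing to do for too long
// (uPeriodAutoDest, 20000 ms by default).
bool ShouldDelivererExit(unsigned int idleMs, unsigned int autoDestMs = 20000) {
    return idleMs >= autoDestMs;
}
```

Together, the two rules let the pool grow under a burst of long-running handlers and shrink back once the burst is over.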
Options
Options allow adding interesting functionality to networking. There are five possible options.
These options can be used in any combination: separately, all together, and so on.
For example, the options ErrorOnce, NoResend, and UniqueCommand, ORed together, can be useful for commands like "answer me if you are alive" (also called "ping"). Such commands are sent often, are small, and carry no information other than a response or confirmation that the recipient is working.
Implementation for Windows
Initially, CTP was implemented for the Windows operating system to serve as the basis of the networking mechanism used in the (Cellular Automata Modeling Environment & Library) project [3]. Of course, it can also be used in arbitrary applications that need rapid message interchange and heavy computations "on order".
The protocol implementation is represented by a set of classes. The class that implements the main functionality of CTP is named CCTPNet. A description of all classes involved in the CTP implementation follows.
IPAddr Class
Objects of the IPAddr class represent the IP address of a workstation. Apart from its source code, this class needs no explanation:
union IPAddr
{
    typedef unsigned __int32 IPSolid;
    struct IPBytes {
        unsigned char b1,b2,b3,b4;
    } Bytes;
    IPSolid Solid;
    IPAddr() {SetLocalhost();};
    IPAddr(unsigned char b1, unsigned char b2,
           unsigned char b3, unsigned char b4)
        {Bytes.b1=b1;Bytes.b2=b2;Bytes.b3=b3;Bytes.b4=b4;};
    IPAddr(IPSolid l) {Solid=l;};
    inline bool IsLocalhost()
        {return Bytes.b1==127&&Bytes.b2==0&&Bytes.b3==0&&Bytes.b4==1;};
    inline bool IsBroadcasting()
        {return Bytes.b1==255&&Bytes.b2==255&&Bytes.b3==255&&Bytes.b4==255;};
    inline LPTSTR GetString(LPTSTR s)
        {sprintf(s,"%d.%d.%d.%d",Bytes.b1,
                 Bytes.b2,Bytes.b3,Bytes.b4); return s;};
    bool FromString(LPTSTR s);
    inline void SetLocalhost()
        {Bytes.b1=127;Bytes.b2=0;Bytes.b3=0;Bytes.b4=1;};
    inline void SetBroadcast()
        {Bytes.b1=255;Bytes.b2=255;Bytes.b3=255;Bytes.b4=255;};
    bool operator ==(unsigned long ip) {return Solid==ip;};
    bool operator ==(IPAddr ip) {return Solid==ip.Solid;};
    bool operator !=(unsigned long ip) {return Solid!=ip;};
    bool operator !=(IPAddr ip) {return Solid!=ip.Solid;};
    IPAddr& operator =(const unsigned long ip) {Solid=ip; return *this;};
    IPAddr& operator =(const IPAddr ip) {Solid=ip.Solid; return *this;};
};
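IPAddr::FromString is only declared in the listing. A portable sketch of the parsing it has to perform is shown below. It is a hypothetical helper, not the author's implementation. It fills the bytes in the same order as IPAddr::Bytes (b1 is the first number).

```cpp
#include <cassert>
#include <cstdio>

// Parses "a.b.c.d" into four bytes in IPAddr::Bytes order.
// Returns false for malformed input or numbers above 255.
bool ParseDottedQuad(const char* s, unsigned char out[4]) {
    unsigned int b[4];
    char extra;
    // Exactly four conversions must succeed; a fifth means trailing junk.
    if (std::sscanf(s, "%u.%u.%u.%u%c", &b[0], &b[1], &b[2], &b[3], &extra) != 4)
        return false;
    for (int i = 0; i < 4; ++i) {
        if (b[i] > 255) return false;
        out[i] = static_cast<unsigned char>(b[i]);
    }
    return true;
}
```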
SmartBuffer Class
Objects of the SmartBuffer class represent "smart buffers", which save the CTP implementation from redundant memory allocations. On the fly, a smart buffer reserves space for a packet header once per chunk of data of the maximal packet data size. So the user just puts data into the smart buffer; the sending function then fills in the headers, and the packets are ready to go out. The class definition follows:
class SmartBuffer
{
public:
    SmartBuffer(unsigned int datasize=0, bool autodel=true,
                unsigned int headsize=25, unsigned int maxdatasize=65400);
    SmartBuffer(LPCTSTR fname, unsigned int datasize=0, bool autodel=true,
                unsigned int headsize=25, unsigned int maxdatasize=65400);
    virtual ~SmartBuffer() {delete[] m_pBuffer;};
    inline unsigned int GetHeadSize() {return m_uHeadSize;};
    inline unsigned int GetDataSize() {return m_uDataSize;};
    void SetDataSize(unsigned int datasize);
    inline unsigned int GetMaxDataSize() {return m_uMaxDataSize;};
    inline unsigned int GetBufferSize() {return m_uBufferSize;};
    inline bool GetAutoDel() {return m_bAutoDel;}
    inline void SetAutoDel(bool autodel) {m_bAutoDel=autodel;}
    inline char* GetBufferBegin() {return m_pBuffer;};
    inline void* GetCurPtr() {return m_pCurPtr;};
    inline bool CurPtrToDataPtr(unsigned int i)
        {char* res=GetDataPtr(i);
         if (res) {m_pCurPtr=res; return true;} else return false;};
    inline void CurPtrToDataBegin() {m_pCurPtr=m_pBuffer+m_uHeadSize;};
    inline unsigned int GetPacketsCount()
        {return m_uBufferSize/(m_uHeadSize+m_uMaxDataSize)+
                ((m_uBufferSize%(m_uHeadSize+m_uMaxDataSize))?1:0);};
    inline char* GetHeadPtr(unsigned int i)
        {char* res=i*(m_uHeadSize+m_uMaxDataSize)+m_pBuffer;
         if (res>m_pBuffer+m_uBufferSize) return NULL; else return res;};
    inline char* GetDataPtr(unsigned int i)
        {char* res=GetHeadPtr(i);
         if (res) return res+m_uHeadSize; else return NULL;};
    inline unsigned int GetPacketSize(unsigned int i)
        {if (i<(m_uBufferSize)/(m_uHeadSize+m_uMaxDataSize))
             return m_uHeadSize+m_uMaxDataSize;
         else if (i==(m_uBufferSize)/(m_uHeadSize+m_uMaxDataSize))
             return (m_uBufferSize)%(m_uHeadSize+m_uMaxDataSize);
         else return 0;};
    bool PutHead(void* src, unsigned int i);
    bool PutDataByte(unsigned char bt, bool movecur=true, int dest=-1);
    bool PutData(void* src, unsigned int size, bool movecur=true, int dest=-1);
    inline bool PutDataString(char* str, bool movecur=true, int dest=-1)
        {return PutData(str,strlen(str)+1,movecur,dest);};
    bool PutDataFile(LPCTSTR fname, bool movecur=true, int dest=-1);
    void Trim();
protected:
    inline unsigned int GetNeededBufferSize(unsigned int datasize,
            unsigned int headsize, unsigned int maxdatasize)
        {return datasize?(datasize/maxdatasize*(headsize+maxdatasize)+
                ((datasize%maxdatasize>0)?(datasize%maxdatasize+
                headsize):0)):headsize;};
    inline void DestToPtr(int dest, char*& ptr, unsigned int* prtnsize);
    bool m_bAutoDel;
    unsigned int m_uHeadSize;
    unsigned int m_uDataSize;
    unsigned int m_uMaxDataSize;
    unsigned int m_uBufferSize;
    char* m_pBuffer;
    char* m_pCurPtr;
};
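The size arithmetic of GetNeededBufferSize and GetPacketsCount can be checked by hand with free-function copies of the two formulas. The copies use the default header size (25) and maximal data size (65400). For 100000 bytes of data, one full packet takes 25 + 65400 bytes, and the remaining 34600 bytes need another header. That gives a 100050-byte buffer split into two packets.

```cpp
#include <cassert>

// Stand-alone copy of SmartBuffer::GetNeededBufferSize: every full packet
// occupies headsize + maxdatasize bytes, a partial one headsize + remainder.
unsigned int NeededBufferSize(unsigned int datasize, unsigned int headsize = 25,
                              unsigned int maxdatasize = 65400) {
    if (datasize == 0) return headsize;  // one header-only packet
    unsigned int full = datasize / maxdatasize;
    unsigned int rest = datasize % maxdatasize;
    return full * (headsize + maxdatasize) + (rest ? rest + headsize : 0);
}

// Stand-alone copy of SmartBuffer::GetPacketsCount.
unsigned int PacketsCount(unsigned int buffersize, unsigned int headsize = 25,
                          unsigned int maxdatasize = 65400) {
    unsigned int packet = headsize + maxdatasize;
    return buffersize / packet + (buffersize % packet ? 1 : 0);
}
```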
NetSender Class
The NetSender class is the base class of CCTPNet, which implements the main functionality of CTP. NetSender only describes the interface of a generic network sending class for an arbitrary protocol.
class NetSender
{
public:
    virtual bool Send(SmartBuffer& sb, unsigned __int16 command,
                      IPAddr to, unsigned __int8 options=0, bool storeiffail=true)=0;
    virtual bool IsWorking() {return true;};
};
NetReceiver Class
Where there is a NetSender class, there must also be a NetReceiver class. NetReceiver describes the interface of objects that can subscribe to deliveries of information about data arrival and errors.
class NetReceiver
{
public:
    virtual void OnReceive(void* data)=0;
    virtual void OnError(void* data)=0;
};
If an object is to subscribe to deliveries, its class must be a descendant of NetReceiver (multiple inheritance in C++ allows an extra base class to be added to any class). The member functions OnReceive and OnError are called on message arrival and on error, respectively. In the first case, the pointer data points to a description of the arrived data. In the second case, data points to an error description generated by the NetSender descendant.
The parameters of these member functions are pointers to void rather than to some concrete class, because the NetReceiver class is also meant for arbitrary protocols. When working with CTP, the parameter of OnReceive points to an object of the CCTPReceivedData class, and the parameter of OnError points to an object of the CCTPErrorInfo class.
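A subscriber might look like the sketch below. To keep it self-contained, it repeats the NetReceiver interface and replaces CCTPReceivedData with a minimal stand-in holding only the fields used. ByteCounter and ReceivedDataStub are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

// Interface as listed above.
class NetReceiver {
public:
    virtual void OnReceive(void* data) = 0;
    virtual void OnError(void* data) = 0;
};

// Minimal stand-in for CCTPReceivedData with only the fields used here.
struct ReceivedDataStub {
    std::uint16_t command;
    std::uint64_t size;
    char* pBuf;
};

// Hypothetical subscriber that counts received bytes and errors.
class ByteCounter : public NetReceiver {
public:
    std::uint64_t total = 0;
    unsigned int errors = 0;

    void OnReceive(void* data) override {
        // With CTP, 'data' would point to a CCTPReceivedData object.
        const ReceivedDataStub* rd = static_cast<const ReceivedDataStub*>(data);
        total += rd->size;
    }
    void OnError(void*) override { ++errors; }
};
```

Because the handler runs in a deliverer thread, a real subscriber that shares state with other threads would need its own synchronization.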
CCTPReceivedData Class
Objects of this class describe and give access to the received data. It needs no explanation.
struct CCTPReceivedData {
    CCTPReceivedData(unsigned __int16 command,
                     unsigned __int64 size, unsigned long from, char* buf);
    virtual ~CCTPReceivedData() {delete[] pBuf;};
    unsigned __int16 command;
    unsigned __int64 size;
    IPAddr from;
    char* pBuf;
};
CCTPErrorInfo Class
Objects of this class describe an error that occurred during networking.
struct CCTPErrorInfo {
    CCTPErrorInfo(unsigned char type,int code,IPAddr addr)
        {this->type=type; this->code=code;
         this->addr=addr; GetTimeStamp(timestamp);};
    static char* GetTimeStamp(char* s);
    unsigned char type;
    int code;
    IPAddr addr;
    char timestamp[22];
};
Everything above should be clear except one point: why is timestamp a field of this class? The moment when a network error occurred may be very important, for example, for building log files. But the time when the error description is delivered may differ greatly from the time when the error actually took place. To avoid such mistakes, timestamp was included as a field of the CCTPErrorInfo class and is filled in during the object's construction. A timestamp for the current moment, accurate to a thousandth of a second, can always be obtained with the static member function GetTimeStamp.
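The exact format produced by GetTimeStamp is not shown here. A 22-byte field is enough for, say, "yy-mm-dd hh:mm:ss.mmm", which is 21 characters plus the terminating zero. The following sketch is an assumption rather than the original code; it produces that format with millisecond accuracy using <chrono>.

```cpp
#include <cassert>
#include <chrono>
#include <cstdio>
#include <ctime>

// Hypothetical GetTimeStamp-like helper. Writes the current local time as
// "yy-mm-dd hh:mm:ss.mmm" (21 characters) into 's', which must hold at
// least 22 bytes, and returns 's'.
char* MakeTimeStamp(char* s) {
    using namespace std::chrono;
    const system_clock::time_point now = system_clock::now();
    const std::time_t secs = system_clock::to_time_t(now);
    const long long ms =
        duration_cast<milliseconds>(now.time_since_epoch()).count() % 1000;
    const std::tm* t = std::localtime(&secs);      // not thread-safe
    std::strftime(s, 18, "%y-%m-%d %H:%M:%S", t);  // 17 chars + '\0'
    std::snprintf(s + 17, 5, ".%03lld", ms);       // 4 chars + '\0'
    return s;
}
```

Filling the field in the constructor, as CCTPErrorInfo does, records the moment of the error rather than the moment of delivery.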
CCTPNet Class
This class implements CTP's main functions (client and server simultaneously). Its definition follows:
class CCTPNet: public NetSender
{
public:
#pragma pack(push)
#pragma pack(1)
struct Header {
Header() {size=0;command=0;number=0;amount=0;id=0;messize=0;options=0;}
void ToStream(ostream& out);
unsigned __int16 size ;
unsigned __int16 command ;
unsigned __int32 number ;
unsigned __int32 amount ;
unsigned __int32 id ;
unsigned __int64 messize ;
unsigned __int8 options ;
};
#pragma pack(pop)
enum Options {
DelAfterError=0x01,
NoResend=0x02,
UniqueCommand=0x04,
Broadcast=0x08,
StartSession=0x10
};
static const unsigned __int8 OptPing;
enum DeliveryType {
ReceivedData,
ErrorInfo
};
struct Delivery {
Delivery(NetReceiver* target,CCTPErrorInfo* data)
{this->target=target; this->data=data;
this->type=DeliveryType::ErrorInfo;};
Delivery(NetReceiver* target,CCTPReceivedData* data)
{this->target=target; this->data=data;
this->type=DeliveryType::ReceivedData;};
Delivery() {target=NULL; data=NULL; type=(DeliveryType)NULL;};
NetReceiver* target;
void* data;
DeliveryType type;
};
typedef list<Delivery> DeliveriesList;
struct Times {
Times() {
uMultiplier= 3;
uDefTimeout= 100;
uSleepOnDestroy= 50;
uSleepSuspended= 10;
uSleepDelMan= 10;
uSleepNothing= 20;
uPeriodDestroy= 2000;
uPeriodAutoDest= 20000;
uPeriodCheckResend=100;
};
unsigned int uMultiplier;
unsigned int uDefTimeout;
unsigned int uSleepOnDestroy;
unsigned int uSleepSuspended;
unsigned int uSleepDelMan;
unsigned int uSleepNothing;
unsigned int uPeriodDestroy;
unsigned int uPeriodAutoDest;
unsigned int uPeriodCheckResend;
};
CCTPNet(NetReceiver* receiver, unsigned short port,
unsigned short servers=1,Times* times=NULL,ostream* log=NULL,
unsigned __int16 packetdatasize=65400,
unsigned short maxthreads=50);
virtual ~CCTPNet();
const Times& GetTimes() {return m_Times;}
void SetTimes(Times& times) {m_Times=times;}
unsigned short GetPort() {return m_uPort;}
void SetPort(unsigned short port)
{closesocket(m_SendSocket); closesocket(m_RecvSocket);
FreeSntCommands(); FreeSessions(); FreeLargeCommands();
m_uPort=port; CreateSockets();}
unsigned __int16 GetPacketDataSize() {return m_uPacketDataSize;}
void SetPacketDataSize(unsigned __int16 ps)
{delete[] m_pBuffer; m_uPacketDataSize=ps;
m_pBuffer=new char[m_uPacketDataSize+GetHeaderSize()];}
void SetMaxDeliverers(unsigned short maxthreads)
{m_uMaxDeliverers=maxthreads;};
unsigned short GetMaxDeliverers() {return m_uMaxDeliverers;};
NetReceiver* GetDefaultReceiver() {return m_DefReceiver;}
void SetDefaultReceiver(NetReceiver* receiver) {m_DefReceiver=receiver;}
void AddSpecialReceiver(unsigned __int16 command,
NetReceiver* receiver, DeliveryType type);
void DeleteSpecialReceiver(NetReceiver* receiver);
NetReceiver* GetReceiver(unsigned __int16 command, DeliveryType type);
bool GetSuspended() {return m_bSuspended;};
void SetSuspended(bool suspended) {m_bSuspended=suspended;};
virtual bool IsWorking() {return !m_bSuspended;};
static unsigned __int16 GetHeaderSize() {return sizeof(Header);}
virtual bool Send(SmartBuffer& sb, unsigned __int16 command,
IPAddr to, unsigned __int8 options=0, bool storeiffail=true);
bool Send(unsigned __int16 command, IPAddr to, unsigned __int8 options=0,
bool storeiffail=true) {return Send(*(new SmartBuffer()),
command,to,options,storeiffail);};
bool SaveRcvPacket(unsigned long from,Header* head);
void ConfirmSntPacket(unsigned long to,Header* header);
void SendConfirmation(unsigned long to,Header header);
bool ArrangeLargeCommand(unsigned long from,Header* head);
void ResendNotConfirmedData();
inline bool IsConfirmation(unsigned __int16 command)
{return (command&m_iConfirm)!=0;};
inline unsigned int GetSntCommandsCount() {return m_SntCommands.size();};
inline unsigned int GetSessionsCount() {return m_Sessions.size();};
inline unsigned int GetLrgMessagesCount() {return m_LargeCommands.size();};
inline unsigned int GetDelThreadsCount() {return m_pDeliverTrds.size();};
inline unsigned int GetBusyDelThreadsCount() {return m_uBusy;};
inline unsigned int GetDelCount() {return m_Deliveries.size();};
protected:
void FreeSntCommands();
void FreeSessions();
void FreeLargeCommands();
void FreeDeliveries();
bool SendPacket(char* buf, unsigned long to);
bool CreateSockets();
void CheckupOptions(Header& header);
public:
SOCKET m_RecvSocket;
char* m_pBuffer;
bool m_bKill;
unsigned short m_uMaxDeliverers;
ostream* m_pLog;
DeliveriesList m_Deliveries;
vector<CWinThread*> m_pServerTrds;
CWinThread* m_pDelManTrd;
vector<CWinThread*> m_pDeliverTrds;
CCriticalSection m_csServerTrds;
CCriticalSection m_csDeliverTrds;
CCriticalSection m_csDeliveries;
CCriticalSection m_csSntCommands;
CCriticalSection m_csSessions;
CCriticalSection m_csLargeCommands;
CCriticalSection m_csNetwork;
CCriticalSection m_csLog;
unsigned short m_uBusy;
inline static bool Less(unsigned __int32 a,unsigned __int32 b)
{if (max(a,b)-min(a,b)>0x7fffffff)
return !(a<b); else return a<b;}
static const unsigned __int16 m_iConfirm;
protected:
void GetNextID(Header& head, IPAddr addr);
unsigned int GetTimeout(IPAddr addr, bool bcast);
void SetTimeout(IPAddr addr, bool bcast, unsigned int timeout);
struct SntCommandInfo {
SntCommandInfo():sbBody(*(new SmartBuffer()))
{CI=NULL; uCount=0;}
SntCommandInfo(SmartBuffer& sb, DWORD time,
unsigned long to):sbBody(sb)
{ipTo=to; uCount=sb.GetPacketsCount(); CI=new CommandInfo[uCount];
for (unsigned int i=0; i<uCount; i++)
{CI[i].dwTime=time; CI[i].dwLTime=time;}}
bool Confirm(unsigned int i);
inline void Free() {delete[] CI; if (sbBody.GetAutoDel()) delete &sbBody;};
unsigned long ipTo;
SmartBuffer& sbBody;
unsigned __int32 uCount;
struct CommandInfo {
CommandInfo() {uResend=1; dwTime=0; dwLTime=0; bConfirmed=false;};
void IncResend() {if (uResend<16384) uResend<<=1;}
bool IsDeadTimeout() {return uResend>=256;}
unsigned int uResend;
DWORD dwTime;
DWORD dwLTime;
bool bConfirmed;
};
CommandInfo* CI;
};
typedef list<SntCommandInfo> SntCommandInfoList;
struct SessionInfo {
SessionInfo()
{id=rand()*rand(); timeout=0;
received.clear(); minwasset=false;}
typedef list<unsigned __int32> RcvList;
unsigned __int32 id;
bool minwasset;
unsigned int timeout;
RcvList received;
};
typedef map<IPAddr::IPSolid,SessionInfo> SessionsInfo;
struct LargeCommandInfo {
LargeCommandInfo(unsigned __int16 command, unsigned __int64 size,
unsigned long from, unsigned __int32 id, unsigned __int32 amount)
{pRD=new CCTPReceivedData(command,size,from,NULL);
this->id=id; uCount=amount; received=new bool[uCount];
for (unsigned int i=0; i<uCount; i++)
received[i]=false;};
LargeCommandInfo()
{id=0; uCount=0; received=NULL;
pRD=NULL;};
inline bool GotPart(unsigned int i);
inline void Free() {delete[] received;};
unsigned __int32 id;
unsigned __int32 uCount;
bool* received;
CCTPReceivedData* pRD;
};
typedef list<LargeCommandInfo> LargeCommandInfoList;
struct SpecialReceiver {
SpecialReceiver(unsigned __int16 command,
NetReceiver* receiver, DeliveryType type)
{this->command=command;
this->receiver=receiver;
this->type=type;};
SpecialReceiver()
{command=0;receiver=NULL;
type=(DeliveryType)NULL;};
unsigned __int16 command;
NetReceiver* receiver;
DeliveryType type;
};
typedef list<SpecialReceiver> SpecialReceiversList;
SessionsInfo m_Sessions;
SntCommandInfoList m_SntCommands;
LargeCommandInfoList m_LargeCommands;
NetReceiver* m_DefReceiver;
SpecialReceiversList m_Receivers;
SOCKET m_SendSocket;
SOCKADDR_IN m_Local;
unsigned short m_uPort;
unsigned __int16 m_uPacketDataSize;
Times m_Times;
bool m_bSuspended;
SessionInfo& GetSessionInfo(IPAddr addr, bool bcast);
};
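The resend bookkeeping in CommandInfo is compact and easy to misread. IncResend() doubles uResend, stopping at 16384, and IsDeadTimeout() reports a dead timeout once uResend reaches 256. This excerpt does not show how uResend is combined with dwTime and dwLTime in CTPNet.cpp. The sketch below therefore reproduces only the counter. It is copied into a hypothetical standalone ResendCounter struct so the arithmetic is explicit, and it is not a replacement for the real class.

```cpp
#include <vector>

// Hypothetical standalone copy of the counter logic in
// SntCommandInfo::CommandInfo (IncResend and IsDeadTimeout).
struct ResendCounter {
    unsigned int uResend = 1;  // starts at 1, as in the CommandInfo constructor

    // Doubles the counter, but never beyond 16384.
    void IncResend() { if (uResend < 16384) uResend <<= 1; }

    // Same threshold as the original: dead once the counter reaches 256.
    bool IsDeadTimeout() const { return uResend >= 256; }
};

// Returns the successive counter values, starting at 1, up to and including
// the first value for which IsDeadTimeout() is true.
std::vector<unsigned int> CounterValuesUntilDead() {
    ResendCounter c;
    std::vector<unsigned int> values{c.uResend};
    while (!c.IsDeadTimeout()) {
        c.IncResend();
        values.push_back(c.uResend);
    }
    return values;
}
```

Starting from 1, eight calls to IncResend() bring the counter to 256, so IsDeadTimeout() becomes true after the eighth doubling. The 16384 cap matters only if the counter keeps being incremented past that point.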
CCTPStatusDlg Class
This class displays a dialog that shows the CTP load: the number of elements in the storages and the number of deliverer threads. It also makes it possible to suspend the servers. Its constructor takes a reference to the CCTPNet object to be monitored. The CCTPStatusDlg class definition follows:
class CCTPStatusDlg : public CDialog
{
public:
CCTPStatusDlg(CCTPNet& ctp, UINT cycle,
CWnd* pParent = NULL);
enum { IDD = IDD_CTPSTATUS };
public:
virtual BOOL DestroyWindow();
protected:
virtual void DoDataExchange(CDataExchange* pDX);
protected:
void SetSuspendStatus();
CCTPNet& m_CTP;
UINT m_uTimer;
UINT m_uCycle;
afx_msg void OnTimer(UINT nIDEvent);
virtual BOOL OnInitDialog();
afx_msg void OnShowWindow(BOOL bShow, UINT nStatus);
afx_msg void OnBsuspend();
DECLARE_MESSAGE_MAP()
};
Note that to use this dialog in your own project, you have to copy the dialog template IDD_CTPSTATUS and the strings IDS_CTP_SUSPEND and IDD_CTP_RESUME from the demo application into your project.
How to use all this?
First, add the files CTPNet.h, CTPNet.cpp, NetBasics.h, and NetBasics.cpp to your project. Then put the following directives into StdAfx.h:
#include <afxmt.h>
#pragma warning(disable: 4786)
#pragma warning(push)
#pragma warning(disable: 4245)
#pragma warning(disable: 4100)
#pragma warning(disable: 4663)
#pragma warning(disable: 4018)
#pragma warning(disable: 4097)
#include <map>
#include <list>
#include <vector>
#include <algorithm>
#include <iostream>
using namespace std;
#pragma warning(pop)
#include <sys/timeb.h>
#include <time.h>
#include <math.h>
#include <Winsock2.h>
#include <Ws2tcpip.h>
This code is also provided in the file NetIncludes.h. In addition, the Windows Sockets library ws2_32.lib must be linked to your project (choose "Project" | "Settings" | "Link", then type "ws2_32.lib" in the "Object/library modules" field).
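With Microsoft Visual C++, the dependency on the library can alternatively be declared in the source code, for example in StdAfx.h next to the includes above. The linker setting is then not needed. This is a compiler-specific directive, not part of CTP:

```cpp
// Microsoft Visual C++: instructs the linker to add ws2_32.lib.
#pragma comment(lib, "ws2_32.lib")
```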
Next, initialize Winsock. For example, put the following code into the initialization function of the project's main window:
WSADATA wsaData;
WSAStartup(MAKEWORD(2,2),&wsaData);
After that, start the CTP server with the following code:
m_pCTP = new CCTPNet(m_pCTPReceiver,1515);
m_pCTP->SetSuspended(false);
This code assumes that m_pCTPReceiver points to an object of a class derived from NetReceiver. For example, you can add NetReceiver to the base classes of your main window.
Measurements
The demo application provided with this article lets you try the described CTP implementation. It also includes TCP and UDP implementations within the same framework, so all three protocols can be used side by side and compared (Fig. 3).
Fig. 3. Time of interchange via CTP, TCP, and UDP (where possible), in microseconds, versus the size of the command's data.
For this experiment, the system clocks of the two workstations were synchronized with the same time server via SNTP, using NetTime 2.0. The diagram shows the extreme values of the measured interchange time.
The similar results of CTP and UDP show that the CTP implementation does not consume a critical amount of resources. Its overhead is small enough to be neglected.
CTP is twice as fast as TCP for normal commands and for moderately large commands that are carried by several packets. This is a good result, because the overwhelming majority of interactions in clusters use normal messages: an order to start computations, a query for some values, and the responses to such queries are all small.
TCP performs better for large commands. However, huge data blocks appear rarely in cluster computations, for example at the stage of splitting the task among the nodes (and even there, not always). Importantly, CTP is not critically slow for large blocks, so, given the previous paragraph, it is suitable as the networking mechanism for clusters.
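Part of the work that grows with command size can be seen in the SntCommandInfo declaration above. A command is split into GetPacketsCount() packets, and each packet gets its own CommandInfo record with its own confirmation flag and resend state. The sketch below mimics that bookkeeping under stated assumptions. The packet count is taken as a ceiling division of the data size by a per-packet payload size. In CCTPNet that size is m_uPacketDataSize, whose value is not given here. PacketsCount and PendingCommand are hypothetical names, not part of CTP. The meaning of the Confirm() return value is also an assumption, because the real SntCommandInfo::Confirm() is not shown in this excerpt.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical helper: packets needed to carry `size` bytes when each packet
// holds at most `packetDataSize` payload bytes (ceiling division).
// Assumes packetDataSize > 0.
std::uint32_t PacketsCount(std::uint64_t size, std::uint16_t packetDataSize) {
    return static_cast<std::uint32_t>((size + packetDataSize - 1) / packetDataSize);
}

// Hypothetical sender-side record with one confirmation flag per packet,
// modelled loosely on SntCommandInfo and CommandInfo::bConfirmed.
class PendingCommand {
public:
    explicit PendingCommand(std::uint32_t packets)
        : confirmed_(packets, false), remaining_(packets) {}

    // Marks packet i as confirmed. Out-of-range indices and duplicate
    // confirmations are ignored. Returns true once every packet is confirmed.
    bool Confirm(std::uint32_t i) {
        if (i < confirmed_.size() && !confirmed_[i]) {
            confirmed_[i] = true;
            --remaining_;
        }
        return remaining_ == 0;
    }

    std::uint32_t Remaining() const { return remaining_; }

private:
    std::vector<bool> confirmed_;
    std::uint32_t remaining_;
};
```

For example, with an assumed payload of 1024 bytes per packet, a 1,000,000-byte block becomes 977 packets. Each of them has to be confirmed (and possibly resent) separately, whereas a short command fits into a single packet.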
Note also that the test was performed on only two nodes, because what is of interest here is the raw throughput of the protocol implementations. A comparison of rapid interchange among dozens of nodes is expected to favor CTP even more. CTP's activity stays the same, whereas TCP loses a lot of time creating and re-creating channels. For CTP, it does not matter who the recipient is.
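The channel argument can be quantified roughly. If every pair of N nodes talks over its own TCP connection, a full mesh needs N(N-1)/2 connections, and each one must be established and torn down. CTP, as described in the introduction, sends data without creating logical channels first. The function below only does this counting to illustrate how the setup cost scales. It is not a measurement, and FullMeshConnections is a hypothetical name.

```cpp
#include <cstdint>

// Number of distinct point-to-point connections needed so that every pair
// of `nodes` workstations has its own channel: nodes * (nodes - 1) / 2.
std::uint64_t FullMeshConnections(std::uint64_t nodes) {
    return nodes < 2 ? 0 : nodes * (nodes - 1) / 2;
}
```

For 24 nodes, this is already 276 connections.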
TCP cannot be outperformed on large commands because it is implemented at the kernel level, whereas CTP is implemented in the application. On the one hand, this is a disadvantage of CTP. On the other hand, it makes CTP completely independent and self-contained.
References
[1] Jones A., Ohlund J. Network Programming for Windows. Microsoft Press, 2000.
[2] Nemeth E., Snyder G., Seebass S., Hein T. R. Unix System Administration Handbook, Third Edition. Prentice Hall PTR, 2001.
[3] Naumov L. CAME&L - Cellular Automata Modeling Environment & Library. In: Cellular Automata, Sixth International Conference on Cellular Automata for Research and Industry (ACRI-2004), 2004. Available from the project's home site.
History
- 19 April 2004 - Release of CTP v. 1.0.
- 16 September 2004 - Release of CTP v. 1.1.
Improvements:
- "smart buffers" support;
- timing performance slightly improved;
- some bugs fixed.
- 31 January 2005 - Release of CTP v. 1.2.
Improvements:
- session information is now taken into account, which enables adaptive timeouts, intelligent resending, and so on;
- nothing needs to be stored in the system registry any more;
- exchange of confirmations improved;
- the storage concept for received packets was replaced and improved;
- full-scale broadcasting support;
- support for multiple servers;
- more features for "smart buffers";
- full-featured debugging and logging interface;
- all timeouts and delays are now tunable;
- timing performance significantly improved;
- a lot of bugs fixed.