Introduction
Semaphores and mutexes are heavily used for inter-process and multi-threaded signaling. This article demonstrates an object-oriented wrapper over Unix system calls for creating mutexes, semaphores, and threads. Besides the Mutex class, it provides a Semaphore class that, like the .NET Framework's implementation, can be used for both shared (system-wide) and local semaphores.
Background
Basic knowledge of semaphores, mutexes, and threads is required.
Using the code
This article provides you classes for creating semaphores and Mutex in an object oriented way. The interfaces are inspired by the .NET framework's implementation of semaphores and Mutex.
The following example shows how to create and use a local semaphore for multi-threaded signaling. The program creates two threads and starts them. The thread callback calls ::localSemaphore.WaitOne(), so the two threads make two WaitOne() calls in total; that is, each thread waits to acquire a room in the semaphore. The WaitOne() call blocks them from going ahead because the initial count (i.e., the number of initially available rooms) is set to 0. Two Release() calls, or a single Release(2) call, are required to let them proceed. The getchar() function halts the program until the user presses a key. Once the user presses a key, the program calls the next method, ::localSemaphore.Release(2). This call signals the two waiting threads. As the threads can now proceed (they succeed in acquiring a room, i.e., the WaitOne() call returns), their callback routines run to completion. The program waits for each thread to complete by calling the Join() method.
#include <stdlib.h>
#include <iostream>
#include "Thread.h"
#include "Semaphore.h"
using namespace mom;
// Routine executed by each worker thread; its definition follows main().
var_t callbackMethod(var_t arg);

// Process-local semaphore shared by main() and the worker threads.
// Constructed through the two-argument (initial_count, max_count) overload
// with an initial count of 0 and a maximum count of 2.
Semaphore localSemaphore(0u, 2u);
// Demo driver: starts two worker threads that wait on ::localSemaphore,
// releases two rooms once the user presses a key, then joins both workers.
int main(int argc, char** argv) {
    std::cout << std::endl;
    std::cout << std::endl;

    Thread thread1(callbackMethod);
    Thread thread2(callbackMethod);

    // Each worker receives its label as the opaque thread argument.
    thread1.Start(const_cast<char*>("1"));
    thread2.Start(const_cast<char*>("2"));

    std::cout << "press any key to signal" << std::endl;
    getchar();

    // Hand out two rooms in a single call, one per worker.
    ::localSemaphore.Release(2);

    thread1.Join();
    std::cout << "thread1 exited" << std::endl;

    thread2.Join();
    std::cout << "thread2 exited" << std::endl;

    std::cout << "press any key to exit" << std::endl;
    getchar();

    return EXIT_SUCCESS;
}
// Thread entry point used by the demo.
//
// arg    - pointer to a NUL-terminated label identifying the thread
//          (main() passes "1" and "2").
// return - always NULL; the demo does not use the thread's result.
//
// Prints a banner and then waits on ::localSemaphore until a room is
// released by main().
var_t callbackMethod(var_t arg) {
    std::cout << "Thread "
              << (const char*)arg
              << " callback_method called .. "
              << "waiting for signal to exit.."
              << std::endl;
    ::localSemaphore.WaitOne();
    // A function returning var_t must return a value: flowing off the end
    // of a non-void function is undefined behaviour.
    return NULL;
}
The same Semaphore class can also be used to create system-wide shared semaphores that multiple processes can use for signaling purposes. Let's assume we have two processes that share a system-wide semaphore, which one process uses to signal the other.
Let's define Process 1:
#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
using namespace mom;
int main(int argc, char** argv) {
std::cout <<std::endl;
Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
std::cout <<"Process 1 Started... waiting for for signal from Process 2"
<<std::endl;
systemWideSemaphore.WaitOne();
std::cout <<"Signal received from Process2.. exiting Process1"
<<std::endl;
return (EXIT_SUCCESS);
}
And for Process 2, we have:
#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
using namespace mom;
int main(int argc, char** argv) {
std::cout <<std::endl;
Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
systemWideSemaphore.Release();
return (EXIT_SUCCESS);
}
Process 1 calls the WaitOne()
method on the semaphore to acquire a room. As the initial count was 0, it waits there until any room is released for it to go ahead. Process 2 does this job. When started, Process 2 retrieves the semaphore with the given key and calls Release()
to provide a room for Process 1 to go ahead. So after the systemWideSemaphore.Release()
method is called in Process 2, Process 1 exits.
Source code
Please find the source code for the classes here:
Semaphore.h:
#ifndef _SEMAPHORE_H
#define _SEMAPHORE_H
namespace mom {

/**
 * Counting semaphore facade.
 *
 * Depending on the constructor used, the object forwards to a system-wide
 * (shared) implementation identified by a numeric key, or to a process-local
 * implementation. The implementation object is owned by this instance and
 * deleted in the destructor, so instances cannot be copied.
 */
class Semaphore {
public:
    /**
     * Creates a handle to a system-wide semaphore.
     * @param sem_id        numeric key identifying the semaphore
     * @param initial_count count handed to the shared implementation
     * @param max_count     upper bound handed to the shared implementation
     */
    Semaphore(unsigned short sem_id, unsigned int initial_count, unsigned int max_count);

    /**
     * Creates a process-local semaphore.
     * @param initial_count count handed to the local implementation
     * @param max_count     upper bound handed to the local implementation
     */
    Semaphore(unsigned int initial_count, unsigned int max_count);

    /** Forwards a wait request to the underlying implementation. */
    void WaitOne();

    /** Releases a single room; equivalent to Release(1). */
    void Release();

    /**
     * Forwards a release request to the underlying implementation.
     * @param release_count number of rooms to release
     */
    void Release(int release_count);

    virtual ~Semaphore();

private:
    // Copying is disabled (declared, never defined): two copies would share
    // _semaphore_instance_ptr and both destructors would delete it.
    Semaphore(const Semaphore&);
    Semaphore& operator=(const Semaphore&);

    bool _is_local;                 // true when the two-argument constructor was used
    void* _semaphore_instance_ptr;  // owned implementation object, type selected by _is_local
};

}
#endif /* _SEMAPHORE_H */
Semaphore.cpp:
#include "Semaphore.h"
#include "Mutex.h"
#include "Thread.h"
#include <iostream>
#include <sys/sem.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <fcntl.h>
#include <errno.h>
namespace mom {
// System-wide semaphore backed by a System V semaphore set
// (semget / semop / semctl), addressed through a numeric key.
class __shared_semaphore {
public:
    // Looks up the semaphore set for the key derived from sem_id, creating it
    // when the lookup fails.
    __shared_semaphore(unsigned short sem_id,
                       unsigned int initial_count,
                       unsigned int max_count);

    // Performs a decrement-by-one operation on the semaphore.
    void _wait_one();

    // Requests that release_count be added to the semaphore.
    void _release(unsigned int release_count);

private:
    // Worker shared by the constructor and _release().
    void _release_internal(unsigned int release_count);

private:
    key_t _sem_key;           // IPC key derived from the constructor's sem_id
    unsigned int _max_count;  // upper bound consulted before releasing
    int _sem_id;              // identifier returned by semget()
};
// Attaches to the System V semaphore set identified by sem_id, creating it
// (one semaphore, read/write for user, group and others) when it does not
// exist yet. Only the process that creates the set applies initial_count.
//
// sem_id        - numeric value used as the IPC key
// initial_count - count released into a freshly created semaphore
// max_count     - upper bound remembered for later releases
//
// On an unrecoverable semget() failure the error is reported with perror()
// and the process exits with status 1.
//
// NOTE(review): creation and initialisation are two separate steps, so a
// process attaching in between can briefly observe the semaphore before
// initial_count has been applied.
__shared_semaphore::__shared_semaphore(unsigned short sem_id,
        unsigned int initial_count, unsigned int max_count) {
    _sem_key = (key_t)sem_id;
    _max_count = max_count;

    const int create_flags = IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR |
                             S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;

    // Fast path: the set already exists.
    _sem_id = semget(_sem_key, 0, 0);
    if (_sem_id != -1) {
        return;
    }

    // Try to become the creator; IPC_EXCL guarantees a single initialiser.
    _sem_id = semget(_sem_key, 1, create_flags);
    if (_sem_id != -1) {
        _release_internal(initial_count);
        return;
    }

    if (errno == EEXIST) {
        // Another process created the set between the two calls above:
        // attach to it instead of treating the lost race as fatal.
        _sem_id = semget(_sem_key, 0, 0);
        if (_sem_id != -1) {
            return;
        }
        perror("IPC error 1: semget");
    }
    else {
        perror("IPC error 2: semget");
    }
    exit(1);
}
// Acquires one room: decrements the semaphore by one, blocking in semop()
// while the value is zero.
//
// The wait is restarted when a signal interrupts it (EINTR). Any other
// failure is reported with perror() and the function returns without having
// acquired a room.
void __shared_semaphore::_wait_one() {
    sembuf acquire;
    acquire.sem_num = 0;
    acquire.sem_op = -1;
    // No SEM_UNDO: an acquired room stays acquired after this process exits.
    // This also removes the need to rewrite the value with SETVAL afterwards,
    // a non-atomic read-back that could overwrite a concurrent semop() from
    // another process.
    acquire.sem_flg = 0;

    while (semop(_sem_id, &acquire, 1) == -1) {
        if (errno != EINTR) {
            perror("IPC error: semop (wait)");
            return;
        }
    }
}
// Releases release_count rooms, but only while at least one process is
// blocked waiting for the semaphore value to increase (GETNCNT > 0).
// When nobody is waiting, or the query fails, the request is dropped.
//
// NOTE(review): dropping the release when there is no waiter means a signal
// sent before the peer starts waiting is lost - confirm this is intended.
void __shared_semaphore::_release(unsigned int release_count) {
    const int blocked_waiters = semctl(_sem_id, 0, GETNCNT, 0);
    if (blocked_waiters <= 0) {
        return;
    }
    _release_internal(release_count);
}
// Adds up to release_count to the semaphore value without letting it exceed
// _max_count.
//
// release_count - number of rooms requested; the part that would push the
//                 value past _max_count is discarded.
//
// Nothing happens when the semaphore is already at (or above) _max_count or
// when release_count is 0. Failures of semctl()/semop() are reported with
// perror().
//
// NOTE(review): the GETVAL check and the semop() are separate system calls,
// so the bound is best-effort when several processes release concurrently.
void __shared_semaphore::_release_internal(unsigned int release_count) {
    const int current = semctl(_sem_id, 0, GETVAL, 0);
    if (current < 0) {
        // Checked before the unsigned comparison below: -1 would otherwise
        // be converted to a huge unsigned value.
        perror("IPC error: semctl (GETVAL)");
        return;
    }

    const unsigned int value = (unsigned int)current;
    if (value >= _max_count) {
        return;
    }

    const unsigned int headroom = _max_count - value;
    const unsigned int granted =
        (release_count < headroom) ? release_count : headroom;
    if (granted == 0) {
        // sem_op == 0 means "wait for zero" to semop(), not "add nothing".
        return;
    }

    sembuf release;
    release.sem_num = 0;
    release.sem_op = (short)granted;
    // No SEM_UNDO: released rooms must survive the exit of the releasing
    // process, so no follow-up SETVAL is needed to discard undo state.
    release.sem_flg = IPC_NOWAIT;

    if (semop(_sem_id, &release, 1) == -1) {
        perror("IPC error: semop (release)");
    }
}
// Process-local counting semaphore built on a pthread mutex and condition
// variable.
//
// The semaphore tracks the number of currently available rooms. _wait_one()
// takes one room, blocking while none is available; _release() gives rooms
// back, never exceeding max_count. All state changes happen under _lock, so
// the object may be used from several threads at once.
//
// The object must not be destroyed while a thread is blocked in _wait_one().
class __local_semaphore {
public:
    // initial_count - rooms available right after construction (values above
    //                 max_count are clamped to max_count)
    // max_count     - largest number of rooms that can be available at once
    __local_semaphore(unsigned int initial_count, unsigned int max_count);
    ~__local_semaphore();

    // Blocks until a room is available, then takes it.
    void _wait_one();

    // Makes release_count more rooms available (clamped to max_count) and
    // wakes the waiting threads.
    void _release(unsigned int release_count);

private:
    // Not copyable: pthread objects cannot be duplicated (declared only).
    __local_semaphore(const __local_semaphore&);
    __local_semaphore& operator=(const __local_semaphore&);

    unsigned int _count;             // rooms currently available, 0.._max_count
    unsigned int _max_count;         // upper bound for _count
    pthread_mutex_t _lock;           // protects _count
    pthread_cond_t _room_available;  // signalled whenever _count is increased
};

__local_semaphore::__local_semaphore(unsigned int initial_count,
        unsigned int max_count) {
    _max_count = max_count;
    _count = (initial_count < max_count) ? initial_count : max_count;
    pthread_mutex_init(&_lock, NULL);
    pthread_cond_init(&_room_available, NULL);
}

__local_semaphore::~__local_semaphore() {
    pthread_cond_destroy(&_room_available);
    pthread_mutex_destroy(&_lock);
}

void __local_semaphore::_wait_one() {
    pthread_mutex_lock(&_lock);
    // Loop rather than a single check: condition waits may wake spuriously,
    // and another waiter may have taken the room first.
    while (_count == 0) {
        pthread_cond_wait(&_room_available, &_lock);
    }
    --_count;
    pthread_mutex_unlock(&_lock);
}

void __local_semaphore::_release(unsigned int release_count) {
    pthread_mutex_lock(&_lock);
    // _count never exceeds _max_count, so the subtraction cannot wrap.
    const unsigned int headroom = _max_count - _count;
    _count += (release_count < headroom) ? release_count : headroom;
    // Broadcast because several rooms may have become available at once;
    // each woken thread re-checks _count before proceeding.
    pthread_cond_broadcast(&_room_available);
    pthread_mutex_unlock(&_lock);
}
// Builds a system-wide semaphore: allocates a __shared_semaphore for the
// given key and counts, and stores it as the type-erased implementation.
Semaphore::Semaphore(unsigned short sem_id,
        unsigned int initial_count, unsigned int max_count)
    : _is_local(false),
      _semaphore_instance_ptr(
          static_cast<void*>(
              new __shared_semaphore(sem_id, initial_count, max_count))) {
}
// Builds a process-local semaphore: allocates a __local_semaphore for the
// given counts and stores it as the type-erased implementation.
Semaphore::Semaphore(unsigned int initial_count, unsigned int max_count)
    : _is_local(true),
      _semaphore_instance_ptr(
          static_cast<void*>(
              new __local_semaphore(initial_count, max_count))) {
}
void Semaphore::WaitOne() {
if(_is_local) {
((__local_semaphore*)_semaphore_instance_ptr)->_wait_one();
}
else {
((__shared_semaphore*)_semaphore_instance_ptr)->_wait_one();
}
}
// Releases release_count rooms through whichever implementation this object
// owns.
//
// release_count - number of rooms to release; values <= 0 are ignored,
//                 because the implementations take an unsigned count and a
//                 negative value would be converted to a huge one.
void Semaphore::Release(int release_count) {
    if (release_count <= 0) {
        return;
    }
    const unsigned int rooms = static_cast<unsigned int>(release_count);
    if (_is_local) {
        ((__local_semaphore*)_semaphore_instance_ptr)->_release(rooms);
    }
    else {
        ((__shared_semaphore*)_semaphore_instance_ptr)->_release(rooms);
    }
}
// Convenience overload: releases exactly one room.
void Semaphore::Release() {
    const int single_room = 1;
    Release(single_room);
}
// Deletes the owned implementation object through its real type (selected
// by _is_local) and clears the stored pointer.
Semaphore::~Semaphore() {
    void* const instance = _semaphore_instance_ptr;
    if (_is_local) {
        delete static_cast<__local_semaphore*>(instance);
    }
    else {
        delete static_cast<__shared_semaphore*>(instance);
    }
    _semaphore_instance_ptr = NULL;
}
}
Mutex.h:
#ifndef _MUTEX_H
#define _MUTEX_H
#include <pthread.h>
// Thin wrapper around a pthread mutex.
//
// The constructor initialises the underlying pthread_mutex_t with default
// attributes and the destructor destroys it, so Lock()/Unlock() always
// operate on a valid mutex. Instances cannot be copied.
class Mutex {
public:
    // A pthread_mutex_t must be initialised before its first use; the null
    // attribute pointer (0) selects the default mutex attributes.
    Mutex() { pthread_mutex_init(&_mutex, 0); }

    // The mutex must be unlocked when the object is destroyed.
    ~Mutex() { pthread_mutex_destroy(&_mutex); }

    // Acquires the mutex via pthread_mutex_lock().
    void Lock();

    // Releases the mutex via pthread_mutex_unlock().
    void Unlock();

private:
    // Not copyable: a pthread_mutex_t must not be duplicated (declared only).
    Mutex(const Mutex&);
    Mutex& operator=(const Mutex&);

    pthread_mutex_t _mutex;
};
#endif /* _MUTEX_H */
Mutex.cpp:
#include <stdio.h>
#include <ios>
#include <pthread.h>
#include "Mutex.h"
// Locks the wrapped pthread mutex; the pthread return code is discarded.
void Mutex::Lock() {
    pthread_mutex_t* const handle = &_mutex;
    (void)pthread_mutex_lock(handle);
}
// Unlocks the wrapped pthread mutex; the pthread return code is discarded.
void Mutex::Unlock() {
    pthread_mutex_t* const handle = &_mutex;
    (void)pthread_mutex_unlock(handle);
}
Thread.h:
#ifndef _THREAD_H
#define _THREAD_H
#include<pthread.h>
#include <stdio.h>
#include <ios>
namespace mom {

// Opaque argument / result type exchanged with thread routines.
typedef void* var_t;

// Signature of a routine that can be run by Thread.
typedef var_t (*thread_start_t)(var_t);

// Minimal wrapper around a pthread: remembers the routine to run, starts it
// with a caller-supplied argument, and joins it.
class Thread {
public:
    // thread_start - routine the thread will execute once Start() is called.
    Thread(thread_start_t thread_start);

    // Launches the thread, passing thread_args to the routine.
    void Start(var_t thread_args);

    // Waits for the thread to finish.
    void Join();

    // Suspends the calling thread for millisecs milliseconds.
    //
    // Returns the result of nanosleep(): 0 when the full interval elapsed,
    // -1 (with errno set) when the sleep failed or was interrupted.
    static int Sleep(unsigned long millisecs) {
        timespec delay;
        delay.tv_sec = (time_t)(millisecs / 1000);
        // The remainder is in milliseconds and tv_nsec is in nanoseconds,
        // so the conversion factor is 1,000,000 (not 1,000).
        delay.tv_nsec = (long)(millisecs % 1000) * 1000000L;
        return nanosleep(&delay, (timespec*)NULL);
    }

private:
    thread_start_t _thread_start;  // routine handed to pthread_create()
    pthread_t _thread;             // handle filled in by Start()
};

}
#endif /* _THREAD_H */
Thread.cpp:
#include "Thread.h"
namespace mom {

// Stores the routine to run; no thread is created until Start() is called.
Thread::Thread(thread_start_t thread_start) {
    _thread_start = thread_start;
}

// Creates the thread with default attributes and runs the stored routine
// with thread_args. A pthread_create() failure is reported on stderr instead
// of being silently ignored; no thread exists in that case.
void Thread::Start(var_t thread_args) {
    const int rc = pthread_create(&_thread, NULL, _thread_start, thread_args);
    if (rc != 0) {
        fprintf(stderr, "Thread::Start: pthread_create failed (error %d)\n", rc);
    }
}

// Blocks until the thread started by Start() has finished; the routine's
// return value is discarded. A pthread_join() failure is reported on stderr.
// Must only be called after a successful Start().
void Thread::Join() {
    const int rc = pthread_join(_thread, NULL);
    if (rc != 0) {
        fprintf(stderr, "Thread::Join: pthread_join failed (error %d)\n", rc);
    }
}

}