Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C++

Multithreaded and Interprocess Signaling Using Semaphores in C++

4.38/5 (5 votes)
24 Sep 2009CPOL2 min read 75K   1.2K  
Multithreaded and interprocess signaling using semaphores in C++.

Introduction

Semaphores and Mutex are heavily used for inter-process and multi-threaded signaling. This article demonstrates an Object Oriented wrapper over Unix system calls for creating Mutex, semaphores, and threads. Like the .NET framework's implementation, here also, apart for Mutex, the Semaphore class can be used for both Shared (system wide) and Local semaphores.

Background

Basic knowledge of semaphores, Mutex, and threads are required.

Using the code

This article provides you classes for creating semaphores and Mutex in an object oriented way. The interfaces are inspired by the .NET framework's implementation of semaphores and Mutex.

The following example shows how to create and use local semaphores for multi threaded signaling. In the program, we are creating two threads and starting them. The callback has a call to ::localSemaphore.WaitOne(). So, the two threads will call two WAitOne(). That means each thread waits to acquire a room in the semaphore. But the call to WaitOne() blocks them from going ahead because the initial count (i.e., the initially available room count) is set to 0. Two Release() calls or a Release(2) call is required to help them. The getchar() function halts the program until the user presses a key. As the user presses any key, the program call the next method, ::localSemaphore.Release(2). This calls signals the two threads which are waiting. As the threads can proceed now (they succeed to acquire a room, i.e., the WaitOne() call returns), their callback routines continue to end. The program waits for each of the threads to complete by calling the Join() method.

C++
#include <stdlib.h>
#include <iostream>
#include "Thread.h"
#include "Semaphore.h"

using namespace mom;
//declear the callback method
var_t callbackMethod(var_t arg);
//define the 'local semaphore' instance
Semaphore localSemaphore((unsigned int)0, (unsigned int)2);
//initial-count = 0 (no room available to enter), max-size = 2 

int main(int argc, char** argv) {

    std::cout <<std::endl
              <<std::endl;
    //create two thread objects
    Thread thread1(callbackMethod);
    Thread thread2(callbackMethod);

    //lets start the threads
    thread1.Start((var_t)"1");
    thread2.Start((var_t)"2");
    
    std::cout <<"press any key to signal"
              <<std::endl;
    //two threads were started and they are waiting to enter the semaphore 
    getchar();
    
    //now providing two rooms that can be occupied 
    ::localSemaphore.Release(2);

    //main thread will wait for the threads to complete their routine
    thread1.Join();
    std::cout <<"thread1 exited"
              <<std::endl;
    thread2.Join();
    std::cout <<"thread2 exited"
              <<std::endl;

    std::cout <<"press any key to exit"
              <<std::endl;
    getchar();
    return (EXIT_SUCCESS);
}

var_t callbackMethod(var_t arg) {

    std::cout <<"Thread "
              <<(const char*)arg
              <<" callback_method called .. "
              <<"waiting for signal to exit.."
              <<std::endl;

    //try to occupy a room in the semaphore. block the thread until sucess
    ::localSemaphore.WaitOne();
}

The same semaphore class can also be used to created system wide shared semaphores that can be used by multiple processes for signaling purpose. Let's assume we have two processes and those will share a system wide semaphore to signal the other.

Let's define Process 1:

C++
#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
/*
 * 
 */
using namespace mom;
int main(int argc, char** argv) {

    std::cout <<std::endl;
    //lets create a system wide semaphore with sem_id 111
    Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
    //initial-count: 0 (no room available to occupy)

    std::cout <<"Process 1 Started... waiting for for signal from Process 2"
              <<std::endl;
    //wait util a room is available to occupy on the semaphore (sem_id 100)
    systemWideSemaphore.WaitOne();    
    std::cout <<"Signal received from Process2.. exiting Process1"
              <<std::endl;

    return (EXIT_SUCCESS);
}

And for Process 2, we have:

C++
#include <stdlib.h>
#include <iostream>
#include "Semaphore.h"
/*
 * 
 */
using namespace mom;
int main(int argc, char** argv) {

    std::cout <<std::endl;
    //lets retrieve the system wide semaphore with sem_id 111
    Semaphore systemWideSemaphore((unsigned short)50, 0, 10);
   /* std::cout <<"Process 2 Started... press any key to signal Process1"
              <<std::endl;
    getchar();*/
    //Process1 is already waiting to occupy a room in the semaphore
    //release/provide a room in the semaphore so that Process1 can continue
    systemWideSemaphore.Release();
    return (EXIT_SUCCESS);
}

Process 1 calls the WaitOne() method on the semaphore to acquire a room. As the initial count was 0, it waits there until any room is released for it to go ahead. Process 2 does this job. When started, Process 2 retrieves the semaphore with the given key and calls Release() to provide a room for Process 1 to go ahead. So after the systemWideSemaphore.Release() method is called in Process 2, Process 1 exits.

Source code

Please find the source code for the classes here:

Semaphore.h:

C++
/* 
 * File:   Semaphore.h
 * Author: Souvik Chatterjee
 * This class is a C# like object oriented wrapper
 * for local(in process) and shared(system wide) semaphore
 * This file defines the interface for the Semaphore class
 *
 * Important: I designed the class inspired
 * by the .NET framework's implementation of Semaphore
 * In .Net framework, apart from Mutex there is a Semaphore class
 * which can be used as both local and shared 
 * semaphores. I am not sure of .Net internal implementation.
 * But this C++ class internally uses two completely 
 * different implementation UNIX semget C system call
 * and UNIX pthread C system call for shared and local semaphores
 * respectively.
 */

#ifndef _SEMAPHORE_H
#define    _SEMAPHORE_H


namespace mom {    
    class Semaphore {
    public:
        //This constructor creates(or retrieves existing)
        //system wide semaphore with the given sem_id
        //using this call 65535 different system wide semaphores can be created
        //this shared semaphore is implemented with UNIX semget C system call
        Semaphore(unsigned short sem_id, unsigned int initial_count, unsigned int max_count);

        //This constructor creates local(in process) semaphore
        //in UNIX local semaphore in a MUTEX. 
        //this local semaphore internally uses UNIX pthread mutext
        Semaphore(unsigned int initial_count, unsigned int max_count);
        
        //waits until succeeds to acquire a room in the semaphore object
        void WaitOne();
        //releases one room among the acquired rooms in the semaphore objet
        void Release();
        //releases specified number(release_count) rooms 
        //among the acquired rooms in the semaphore objet
        void Release(int release_count);
        //destructor
        virtual ~Semaphore();

    private:        
        //internal flag to determine if the created semaphore is local or shared
        bool _is_local;
        //internal handle for the created semaphore instance
        void* _semaphore_instance_ptr;
    };
}

#endif    /* _SEMAPHORE_H */

Semaphore.cpp:

C++
/* 
 * File:   SemaphoreWrapper.cpp
 * Author: Souvik Chatterjee
 * This file defines the implementation
 * of the Semaphore interface declared in Semaphore.h
 * Semaphore class uses __shared_semaphore class
 * for shared(system wide) semaphore which is basically an object
 * oriented wrapper over UNIX semget.
 * On the other hand, it uses __local_semaphore class
 * for local(in process) semaphore which is basically an object 
 * oriented wrapper over UNIX pthread mutex.
 * Reference: to know understand the UNIX system calls
 * used throughout this implementation please refer to 
 * Open Group (http://www.unix.org/single_unix_specification/) and 
 * The Linux Programmer's Guide (http://tldp.org/LDP/lpg/)
 */

#include "Semaphore.h"
#include "Mutex.h"
#include "Thread.h"
#include <iostream>
#include <sys/sem.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <fcntl.h>
#include <errno.h>

namespace mom {
    //------------------- System wide shared semaphore 
    //                   (class __shared_semaphore)-------------------
    class __shared_semaphore {
    public:
        // creates (or retrieves existing) system wide(shared)
        // semaphore with given semaphore id(sem_id)
        // initial_count refers to the initially available
        // rooms that can be acquired by _wait_one method
        // max_count refers to the maximum number of rooms
        // that can be acquired by _wait_one_method
        __shared_semaphore(unsigned short sem_id, 
                 unsigned int initial_count, unsigned int max_count);
        
        // waits untils succeeds to acquire a room in the semaphore object
        void _wait_one();
        // releases specified number(release_count) of rooms
        // among the acquired rooms in the semaphore object
        // if no rooms are currently occupied, it simply ignores
        // the call. you an implement it with a custom exception 
        // thrown
        void _release(unsigned int release_count);

    private:
        // releases specified number(release_count) of rooms
        // among the acquired rooms in the semaphore object
        // it can not provided rooms exceeding the <max_count>.
        // any such attempt will be simply ignored. you can 
        // implement this with a custom xception thrown
        void _release_internal(unsigned int release_count);
        //holds the key of the shared semaphore object
        key_t _sem_key;
        //holds the maximum count for the semaphore object        
        unsigned int _max_count;
        //holds the semaphore id retrieved from the system
        int _sem_id;              
    };

    __shared_semaphore::__shared_semaphore(unsigned short sem_id, 
                   unsigned int initial_count, unsigned int max_count) {        
        //set the key
        _sem_key = (key_t)sem_id;
        //set max count
        _max_count = max_count;
        //set wait instruction
                
        //set sem id to not set i.e. -1
        _sem_id = -1;
        
        //define the semaphore creation mode
        // IPC_CREATE: create a new semaphore if already
        // there is no sem_id associated with sem_key
        // IPC_EXCL: the semget function will fail if there
        // is already sem_id exists associated with the sem_key
        // S_IRUSR: owner has the read permission on semaphore object
        // S_IWUSR: owner has the write permission on semaphore object
        // S_IROTH: read permission on semaphore object for others
        // S_IWOTH: write permission on semaphore object for others
        mode_t sem_mode = IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR | 
                          S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;

        //lets try to retrieve the semaphore id for
        //the existing semaphore(if any) associated with the sem_key
        //it will return -1 if there is no semaphore 
        //available in the system associated with the given key
        _sem_id = semget(_sem_key, 0, 0);

        if( _sem_id == -1 ) { //okay no semaphore found in the system for the given key
            //now lets create a new semaphore with the sem_key and with sem_mode
            _sem_id = semget(_sem_key, 1, sem_mode); 
            //lets assume it failed due to some reason..
            //if you use this code, I will recommend to use 
            //proper object oriented exception handling here
            if(_sem_id == -1) {
                if (errno == EEXIST) {
                    perror("IPC error 1: semget");
                }
                else {
                    perror("IPC error 2: semget");
                }
                exit(1);
            }
            //this process created the semaphore first
            //lets provide <initial_count> number of rooms
            _release_internal(initial_count);
        }
    }

    void __shared_semaphore::_wait_one() {
        
        sembuf sem_instruction;
        sem_instruction.sem_num = 0;
        sem_instruction.sem_op = -1;
        sem_instruction.sem_flg = SEM_UNDO;
        //execute the semop system call on the semaphore 
        //with the prepared wait instruction
        if(semop(_sem_id, &sem_instruction, 1)!=-1) {
            //for proper functionality, this line of code is required
            //it sets the semaphore's current value 
            //in the system which other process can feel
            //i am not very much sure why it is required. 
            //I used it after doing a lots of debugging
            //please put a comment in the article 
            //if you have the detailed information for it
            semctl( _sem_id, 0, SETVAL, semctl(_sem_id, 0, GETVAL, 0));
        }
    }

    void __shared_semaphore::_release(unsigned int release_count) {
        
        if(semctl(_sem_id, 0, GETNCNT, 0) > 0) {
        //if atleast one process is waiting for a resource
            _release_internal(release_count);
        }
        else {
            //no process is waiting fo the resource.. 
            //so simply ignored the call.. you should throw some
            // custom exception from here
        }
    }

    void __shared_semaphore::_release_internal(unsigned int release_count) {
        
        if(semctl(_sem_id, 0, GETVAL, 0) < _max_count) {
            sembuf sem_instruction;
            sem_instruction.sem_num = 0;
            sem_instruction.sem_op = release_count;
            sem_instruction.sem_flg = IPC_NOWAIT|SEM_UNDO;
            //execute the semop system call on the semaphore 
            //with the prepared signal instruction
            if(semop(_sem_id, &sem_instruction, 1) != -1) {
                //for proper functionality, this line of code is required
                //it sets the semaphore's current value 
                //in the system which other process can feel
                //i am not very much sure why it is required. 
                //I used it after doing a lots of debugging
                //please put a comment in the article 
                //if you have the detailed information for it
                semctl( _sem_id, 0, SETVAL, semctl(_sem_id, 0, GETVAL, 0));            
            }
        }
        else {
            //ignored the call. you should thorw some custo exception
        }
    }
    
//----------------------- Local semaphore --------------(class __local_semaphore)--------
    class __local_semaphore {
    public:
        //creates a logical couting semaphore using mutex.
        //This semaphore has no scope out side the process
        //inwhich its running. so it can be used 
        //for inter-thread signalling but not interprocess signalling
        __local_semaphore(unsigned int initial_count, unsigned int max_count);
        void _wait_one();
        void _release(unsigned int release_count);

    private:
        unsigned int _initial_count;
        unsigned int _max_count;
        Mutex* _wait_handle;
        bool _waiting;
    };

    __local_semaphore::__local_semaphore(unsigned int initial_count, 
                         unsigned int max_count) {        
        _initial_count = initial_count;
        _max_count = max_count;
        _wait_handle = new Mutex();
        _wait_handle->Lock();
        _waiting = false;
    }

    void __local_semaphore::_wait_one() {
        if(_initial_count == _max_count) {
            _waiting = true;
            _wait_handle->Lock();
        }
        else if(_initial_count < _max_count) {
            _initial_count ++;
        }
    }
    void __local_semaphore::_release(unsigned int release_count) {
        _initial_count -= release_count;
        if(_waiting) {
            _waiting = false;
            _wait_handle->Unlock();
        }
    }
    
    
    //----------------------- Semphore (wrapper)------------(class Semaphore)----
   //create a system wide semaphore with the sem_id provided
    Semaphore::Semaphore(unsigned short sem_id, 
               unsigned int initial_count, unsigned int max_count) {
        _is_local = false;
        __shared_semaphore* shared_semaphore = 
           new __shared_semaphore(sem_id, initial_count, max_count);
        _semaphore_instance_ptr = (void*)shared_semaphore;
    }

    //create a local semaphore
    Semaphore::Semaphore(unsigned int initial_count, unsigned int max_count) {
        _is_local = true;
        __local_semaphore* local_semaphore = 
           new __local_semaphore(initial_count, max_count);
        _semaphore_instance_ptr = (void*)local_semaphore;
    }
    
    //block the caller until it succeeds to occupy a room
    void Semaphore::WaitOne() {
        if(_is_local) {
            ((__local_semaphore*)_semaphore_instance_ptr)->_wait_one();
        }
        else {
            ((__shared_semaphore*)_semaphore_instance_ptr)->_wait_one();
        }
    }

    //release <release_count> occupied rooms
    void Semaphore::Release(int release_count) {
         if(_is_local) {
            ((__local_semaphore*)_semaphore_instance_ptr)->_release(release_count);
        }
        else {
            ((__shared_semaphore*)_semaphore_instance_ptr)->_release(release_count);
        }
    }

    //release an occupied room
    void Semaphore::Release() {
        Release(1);
    }
    
    Semaphore ::~Semaphore() {
        if(_is_local) {
            __local_semaphore* __semaphore_ptr = 
                     (__local_semaphore*)_semaphore_instance_ptr;
            delete __semaphore_ptr;
        }
        else {
            __shared_semaphore* __semaphore_ptr = 
                   (__shared_semaphore*)_semaphore_instance_ptr;
            delete __semaphore_ptr;
        }
        _semaphore_instance_ptr = NULL;
    }
//-----------------------------------------------------------------------
}

Mutex.h:

C++
/* 
 * File:   Mutex.h
 * Author: Souvik Chatterjee
 * This file declares the interface for the Mutex class
 * Mutex class is a wrapper over the pthread mutex.
 * It provides an C# like object oriented implementation 
 * of unix pthread mutex
 */

#ifndef _MUTEX_H
#define    _MUTEX_H
#include <pthread.h>

class Mutex {

public:
    //Mutext::Lock() gains a lock on the MUTEX 
    void Lock();

    //Mutext::Unlock() releases the MUTEX 
    void Unlock();

private:
    //unix pthread instance
    pthread_mutex_t _mutex;
};

#endif    /* _MUTEX_H */

Mutex.cpp:

C++
/* 
 * File:   Mutex.cpp
 * Author: Souvik Chatterjee 
 * This CPP File Contains the implementation of the header Mutex.h
*/

#include <stdio.h>
#include <ios>
#include <pthread.h>

#include "Mutex.h"
//---------------------------------------
/*
 * Mutext::Lock() gains a lock on the MUTEX 
*/
void Mutex::Lock() {
    //execute pthread mutex lock system call
    //with member pthread mutext instance
    //pass the reference of the pthread mutex instance
    pthread_mutex_lock(&_mutex);
}

//--------------------------------------
/*
 * Mutext::Unlock() releases the MUTEX
*/
void Mutex::Unlock() {
    //execute pthread mutex unlock system call 
    //with member pthread mutext instance
    //pass the reference of the pthread mutex instance
    pthread_mutex_unlock(&_mutex);
}
//--------------------------------------

Thread.h:

C++
/* 
 * File:   Thread.h
 * Author: Souvik Chatterjee
 *
 * Created on August 12, 2009, 2:54 AM
 */

#ifndef _THREAD_H
#define    _THREAD_H

#include<pthread.h>
#include <stdio.h>
#include <ios>

namespace mom {
    typedef void* var_t;
    typedef var_t (*thread_start_t)(var_t);
    
    class Thread {    
    public:
        Thread(thread_start_t thread_start);
        void Start(var_t thread_args);
        void Join();
        static int Sleep(unsigned long millisecs) {
            long sec = (long)(millisecs / 1000);
            long nsec = (millisecs - (sec*1000))*1000;
            timespec delay = {sec, nsec};
            int return_val = nanosleep(&delay, (timespec*)NULL);
            return return_val;
        }

    private:
        thread_start_t _thread_start;
        pthread_t _thread;
    };
}
#endif    /* _THREAD_H */

Thread.cpp:

C++
/* 
 * File:   Thread.cpp
 * Author: Souvik Chatterjee
 * 
 * Created on August 12, 2009, 2:54 AM
 */

#include "Thread.h"

namespace mom {
    Thread :: Thread(thread_start_t thread_start) {
        _thread_start = thread_start;
    }
    void Thread :: Start(var_t thread_args) {
        pthread_create(&_thread, NULL, _thread_start, thread_args);
    }
    void Thread :: Join() {
        pthread_join(_thread, NULL);
    }
}

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)