Introduction
The producer–consumer problem is one of a small collection of standard, well-known problems in concurrent programming. It involves a finite-size buffer and two classes of threads: producers, which put items into the buffer, and consumers, which take items out of it.
Using the code
I've implemented a solution to this problem in C++ and Python. Could someone guide me in implementing the same solution in C# and Java? Thank you!
C++ implementation:
// Number of slots in the shared circular buffer.
constexpr int BSIZE = 16;

// Storage for the circular buffer shared by producer() and consumer().
char buffer[BSIZE] = {};

// Index of the slot the next producer() call writes to.
int nextin{0};
// Index of the slot the next consumer() call reads from.
int nextout{0};

// Acquired by consumer() before reading and released by producer() after
// writing; constructed with an initial value of 0.
// NOTE(review): cyan::counting_semaphore presumably mirrors
// std::counting_semaphore -- confirm against the cyan header.
cyan::counting_semaphore<BSIZE> occupied_semaphore(0);

// Acquired by producer() before writing and released by consumer() after
// reading; constructed with an initial value of BSIZE.
cyan::counting_semaphore<BSIZE> empty_semaphore(BSIZE);

// Held around every access to buffer, nextin and nextout; constructed with
// an initial value of 1.
cyan::binary_semaphore resource_mutex(1);
/**
 * Writes one item into the shared circular buffer.
 *
 * Acquires empty_semaphore and then resource_mutex before touching the
 * buffer, and releases resource_mutex and then occupied_semaphore afterwards.
 *
 * @param item Value stored at buffer[nextin]; also printed as an integer.
 */
void producer(char item) {
    empty_semaphore.acquire();
    resource_mutex.acquire();

    const int slot = nextin;
    buffer[slot] = item;
    // Advance the write index, wrapping back to 0 at the end of the buffer.
    nextin = (slot + 1) % BSIZE;
    printf("producer = %d\n", item);

    resource_mutex.release();
    occupied_semaphore.release();
}
/**
 * Reads one item out of the shared circular buffer.
 *
 * Acquires occupied_semaphore and then resource_mutex before touching the
 * buffer, and releases resource_mutex and then empty_semaphore afterwards.
 *
 * @return The value read from buffer[nextout]; it is also printed as an
 *         integer.
 */
char consumer() {
    occupied_semaphore.acquire();
    resource_mutex.acquire();

    const int slot = nextout;
    const char item = buffer[slot];
    // Advance the read index, wrapping back to 0 at the end of the buffer.
    nextout = (slot + 1) % BSIZE;
    printf("consumer = %d\n", item);

    resource_mutex.release();
    empty_semaphore.release();
    return item;
}
/**
 * Thread entry point: passes pseudo-random items to producer() forever.
 *
 * Never returns.
 */
void producer_thread() {
    while (true) {
        // rand() % 255 + 1 is in the range 1..255 before the conversion.
        // NOTE(review): the value is narrowed to char, so where char is
        // signed, values above 127 are not preserved as-is.
        producer(static_cast<char>(rand() % 255 + 1));
    }
}
/**
 * Thread entry point: calls consumer() forever.
 *
 * Never returns.
 */
void consumer_thread() {
    while (true) {
        // The returned item is not used here; consumer() already prints it.
        static_cast<void>(consumer());
    }
}
int main()
{
srand((unsigned int)time(nullptr));
std::thread t1(producer_thread);
std::thread t2(consumer_thread);
t1.join();
t2.join();
}
Python implementation:
# Number of slots in the shared circular buffer.
CAPACITY = 16

# Slot storage, pre-filled with -1.
buffer = [-1] * CAPACITY

# in_index: slot the next producer() call writes to.
# out_index: slot the next consumer() call reads from.
in_index = out_index = 0

# Acquired by consumer() before reading, released by producer() after writing.
occupied_semaphore = threading.Semaphore(value=0)
# Acquired by producer() before writing, released by consumer() after reading.
empty_semaphore = threading.Semaphore(value=CAPACITY)
# Held around every access to buffer, in_index and out_index.
resource_mutex = threading.Semaphore(value=1)
def producer(item):
    """Store ``item`` in the shared circular buffer.

    Acquires ``empty_semaphore`` and then ``resource_mutex`` before writing;
    releases ``resource_mutex`` and then ``occupied_semaphore`` afterwards.

    Args:
        item: Value written to ``buffer[in_index]``; it is also printed.
    """
    # Only in_index is rebound here; the other module-level names are merely
    # read (or mutated in place), so they need no ``global`` declaration.
    global in_index

    empty_semaphore.acquire()
    # The ``with`` block releases the mutex even if the body raises, so a
    # failure here cannot leave the other thread blocked on it forever.
    with resource_mutex:
        buffer[in_index] = item
        # Advance the write index, wrapping back to 0 at the end.
        in_index = (in_index + 1) % CAPACITY
        print("producer = ", item)
    occupied_semaphore.release()
def consumer():
    """Take the next item out of the shared circular buffer.

    Acquires ``occupied_semaphore`` and then ``resource_mutex`` before
    reading; releases ``resource_mutex`` and then ``empty_semaphore``
    afterwards.

    Returns:
        The value read from ``buffer[out_index]``; it is also printed.
    """
    # Only out_index is rebound here; the other module-level names are merely
    # read, so they need no ``global`` declaration.
    global out_index

    occupied_semaphore.acquire()
    # The ``with`` block releases the mutex even if the body raises, so a
    # failure here cannot leave the other thread blocked on it forever.
    with resource_mutex:
        item = buffer[out_index]
        # Advance the read index, wrapping back to 0 at the end.
        out_index = (out_index + 1) % CAPACITY
        print("consumer = ", item)
    empty_semaphore.release()
    return item
class ProducerThread(threading.Thread):
    """Thread that passes random integers to ``producer`` forever."""

    def run(self):
        """Produce one random item in 0..255 per iteration; never returns."""
        while True:
            producer(randint(0, 255))
class ConsumerThread(threading.Thread):
    """Thread that calls ``consumer`` forever."""

    def run(self):
        """Consume one item per iteration; never returns."""
        while True:
            # The returned item is not used here; consumer() already prints it.
            consumer()
# Fixed seed for the random module.
seed(1)

# Create both workers first, then start the producer followed by the consumer.
producer_thread, consumer_thread = ProducerThread(), ConsumerThread()
for worker in (producer_thread, consumer_thread):
    worker.start()
You can find the latest version of the source code in my GitHub repo.
History
January 29th, 2024: Initial release.