Introduction
The producer-consumer problem is a classic example of a multi-process synchronization problem. It involves two types of runnable processes:
- Producer - which produces a piece of data and puts it in a queue
- Consumer - which consumes a piece of data from a queue
In this tip, I explain my understanding of this problem using an email queue program written in Java.
Background
The producer and consumer share a fixed-size buffer, such as a queue, and the problem arises when:
- The queue is full, but the producer still continues to produce data and add it to the queue.
- The queue is empty, but the consumer still tries to remove data from the queue.
So how can we avoid this problem?
The solution is to make sure that the producer does not add data to a full queue and that the consumer does not remove data from an empty queue.
One of the solutions is to use a semaphore. In simple words, a semaphore is a counter shared between the producer and consumer processes, whose value decides which process must go to sleep and which process may continue running.
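To make the semaphore idea concrete, here is a minimal sketch using java.util.concurrent.Semaphore. It is not the approach used in the program later in this tip; the class name SemaphoreMailQueue, its methods, and the runDemo helper are hypothetical names chosen for illustration. One semaphore counts the free slots, another counts the queued mails, and a third with a single permit guards the list itself.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical illustration class; not part of the original program.
class SemaphoreMailQueue {
    private final Queue<String> queue = new LinkedList<>();
    private final Semaphore freeSlots;                          // permits = free slots left
    private final Semaphore availableMails = new Semaphore(0);  // permits = mails in the queue
    private final Semaphore mutex = new Semaphore(1);           // guards access to the list

    SemaphoreMailQueue(int capacity) {
        freeSlots = new Semaphore(capacity);
    }

    void add(String mail) throws InterruptedException {
        freeSlots.acquire();          // sleeps while the queue is full
        mutex.acquire();
        try {
            queue.add(mail);
        } finally {
            mutex.release();
        }
        availableMails.release();     // lets a sleeping consumer continue
    }

    String retrieve() throws InterruptedException {
        availableMails.acquire();     // sleeps while the queue is empty
        String mail;
        mutex.acquire();
        try {
            mail = queue.remove();
        } finally {
            mutex.release();
        }
        freeSlots.release();          // lets a sleeping producer continue
        return mail;
    }

    // Runs one producer thread and consumes on the calling thread.
    static List<String> runDemo(int count, int capacity) {
        SemaphoreMailQueue q = new SemaphoreMailQueue(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    q.add("Email Message " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(q.retrieve());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        // One producer and a FIFO queue, so the mails arrive in order.
        System.out.println(runDemo(5, 2));
    }
}
```

With a capacity of 2 and five messages, the producer has to sleep on freeSlots several times, yet every message is still delivered exactly once.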
Using the Code
I have implemented an email message queue with:
- A Producer process that adds each newly created email message to the email message queue
- A Consumer process that retrieves messages from the message queue and sends them to the destination mailbox
Both run as Java threads, and the queue is the resource they share.
If the message queue is empty, the Consumer process goes into the wait() state. After each message it removes, it calls notifyAll() to wake up any waiting Producer processes so that they can start adding new mail messages to the queue.
If the queue is full, the Producer process goes into the wait() state. After a Producer adds a message, it calls notify() so that the Consumer process can retrieve the mail message from the queue and send it to the destination.
package com.lalit.threadExample;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

public class MainClass {
    public static void main(String... s) throws InterruptedException {
        EmailMessageQueueClass messageQueue = new EmailMessageQueueClass();
        // One consumer thread drains the queue; it loops forever,
        // so the program keeps running after the last mail is consumed.
        new Thread(new Consumer(messageQueue)).start();
        // Each producer thread adds exactly one email message.
        for (int i = 0; i < 1000; ++i) {
            new Thread(new Producer("Email Message " + i, messageQueue))
                    .start();
        }
    }
}
class Producer implements Runnable {
    EmailMessageQueueClass messageQueue;
    String emailMessageContent;

    public Producer(String message, EmailMessageQueueClass messageQueue) {
        emailMessageContent = message;
        this.messageQueue = messageQueue;
    }

    public void run() {
        try {
            // Blocks while the queue is full.
            messageQueue.adds(emailMessageContent);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}
class Consumer implements Runnable {
    EmailMessageQueueClass messageQueue;

    public Consumer(EmailMessageQueueClass messageQueue) {
        this.messageQueue = messageQueue;
    }

    public void run() {
        try {
            // Blocks while the queue is empty.
            messageQueue.retrieve();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class EmailMessageQueueClass {
    Queue<String> q = new LinkedList<String>();
    private int queueMaxSize = 10;

    // Consumer side: removes messages forever, waiting whenever the queue is empty.
    synchronized void retrieve() throws InterruptedException {
        while (true) {
            // Re-check the condition in a loop after every wake-up.
            while (q.isEmpty()) {
                wait();
            }
            q.remove();
            printQueue();
            // A slot is free now: wake up all waiting producers.
            notifyAll();
        }
    }

    // Producer side: waits while the queue is full, then adds one message.
    synchronized void adds(String emailMessage) throws InterruptedException {
        while (q.size() == queueMaxSize) {
            wait();
        }
        q.add(emailMessage);
        printQueue();
        // Signal that a message is available so a waiting consumer can proceed.
        notify();
    }

    // Prints the current queue contents; called only while holding the lock.
    private void printQueue() {
        System.out.print("Mail Queue::::::[");
        Iterator<String> it = q.iterator();
        while (it.hasNext())
            System.out.print(it.next() + ",");
        System.out.print("]");
        System.out.println();
    }
}
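For comparison, the standard library already provides a bounded queue with the same blocking behaviour: java.util.concurrent.ArrayBlockingQueue, whose put() blocks while the queue is full and whose take() blocks while it is empty. The sketch below is an alternative to the hand-written wait()/notify() class above, not a replacement the original program uses; BlockingMailQueueDemo and runDemo are hypothetical names. Unlike the listing above, it consumes a fixed number of mails and then terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration class; not part of the original program.
class BlockingMailQueueDemo {
    // Starts one producer thread per mail and consumes them on the calling thread.
    static List<String> runDemo(int producers, int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < producers; i++) {
            final String mail = "Email Message " + i;
            Thread t = new Thread(() -> {
                try {
                    queue.put(mail);        // blocks while the queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        List<String> sent = new ArrayList<>();
        try {
            for (int i = 0; i < producers; i++) {
                sent.add(queue.take());     // blocks while the queue is empty
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(20, 10).size() + " mails consumed");
    }
}
```

The order in which mails arrive depends on thread scheduling, so only the count is printed here.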
Points of Interest
While writing the code, I learnt:
- How to call the wait(), notify() and notifyAll() functions on a shared resource
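One detail worth illustrating is that these functions may only be called by a thread that holds the lock on the shared object, which is why the queue methods above are synchronized. The small sketch below shows what happens otherwise; MonitorOwnershipDemo and its method names are hypothetical and exist only for this illustration.

```java
// Hypothetical illustration class; not part of the original program.
class MonitorOwnershipDemo {
    // Calls wait() without holding the lock and reports whether Java rejects it.
    static boolean waitWithoutLockFails(Object shared) {
        try {
            shared.wait(10);   // not inside synchronized (shared): illegal
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;       // thrown because this thread does not own the monitor
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Calls notifyAll() while holding the lock, which is allowed.
    static boolean notifyWithLockSucceeds(Object shared) {
        synchronized (shared) {
            shared.notifyAll(); // legal: the lock is held; no thread is waiting, so nothing wakes
            return true;
        }
    }

    public static void main(String[] args) {
        Object shared = new Object();
        System.out.println(waitWithoutLockFails(shared));   // prints "true"
        System.out.println(notifyWithLockSucceeds(shared)); // prints "true"
    }
}
```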