Producer-Consumer Problem

31 Aug 2014 | CPOL | 2 min read
Understanding the producer-consumer problem using an email queue example.

Introduction

The producer-consumer problem is a classic example of a multi-process synchronization problem. It involves two types of runnable processes:

  1. Producer - which produces a piece of data and puts it in a queue
  2. Consumer - which consumes a piece of data from a queue

In this tip, I explain my understanding of this problem using an email queue program written in Java.

Background

The producer and the consumer share a fixed-size buffer, such as a queue, and the problem arises when:

  • The queue is full, but the producer still continues to produce data and add it to the queue.
  • The queue is empty, but the consumer still tries to remove data from the queue.

The question, then, is how to avoid this problem.

The solution is to make sure that the producer does not add data to a full queue and the consumer does not remove data from an empty queue.
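To see why these two guards matter, here is a minimal sketch of what an unguarded java.util.LinkedList does in each case. The class and method names (UnguardedQueueDemo, removeFromEmptyFails, addWithoutCheck) are made up for this illustration and are not part of the email queue program.

```java
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;

class UnguardedQueueDemo {
    // Returns true if removing from an empty queue fails with an exception.
    static boolean removeFromEmptyFails() {
        Queue<String> q = new LinkedList<>();
        try {
            q.remove(); // nothing to consume yet
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    // Adds without checking any size limit and returns the resulting size.
    static int addWithoutCheck(int attempts) {
        Queue<String> q = new LinkedList<>();
        for (int i = 0; i < attempts; i++) {
            q.add("Email Message " + i); // LinkedList never refuses an element
        }
        return q.size();
    }

    public static void main(String[] args) {
        System.out.println("remove on empty fails: " + removeFromEmptyFails());
        System.out.println("size after 15 unchecked adds: " + addWithoutCheck(15));
    }
}
```

The consumer side fails loudly, while the producer side silently grows past any intended limit, so both checks have to be enforced by the code that wraps the queue.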

One solution is to use a semaphore. A semaphore, in simple words, is a shared counter that the producer and consumer processes both operate on; its value decides which process must sleep and which may continue running.
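As a rough illustration of the semaphore idea, the sketch below builds a bounded queue on java.util.concurrent.Semaphore. It is only one possible arrangement and is separate from the wait()/notify() program in this tip; the names SemaphoreMailQueue, put, take, freeSlots, usedSlots and runDemo are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;

class SemaphoreMailQueue {
    private final Queue<String> q = new LinkedList<>();
    private final Semaphore freeSlots;                    // permits = empty places left
    private final Semaphore usedSlots = new Semaphore(0); // permits = messages waiting
    private final Semaphore mutex = new Semaphore(1);     // guards the queue itself

    SemaphoreMailQueue(int capacity) {
        freeSlots = new Semaphore(capacity);
    }

    void put(String message) throws InterruptedException {
        freeSlots.acquire(); // blocks while the queue is full
        mutex.acquire();
        try {
            q.add(message);
        } finally {
            mutex.release();
        }
        usedSlots.release(); // one more message is available
    }

    String take() throws InterruptedException {
        usedSlots.acquire(); // blocks while the queue is empty
        String message;
        mutex.acquire();
        try {
            message = q.remove();
        } finally {
            mutex.release();
        }
        freeSlots.release(); // one more place is free
        return message;
    }

    // Runs one producer thread against the calling thread as consumer and
    // returns the messages in the order they were taken.
    static List<String> runDemo(int capacity, int count) {
        SemaphoreMailQueue queue = new SemaphoreMailQueue(capacity);
        List<String> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put("Email Message " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        runDemo(2, 5).forEach(System.out::println);
    }
}
```

Here the two counting semaphores play the role of the "queue is full" and "queue is empty" checks: a thread that cannot get a permit sleeps until the other side releases one.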

Using the Code

I have implemented an email message queue with:

  • A producer process that adds each newly created email message to the email message queue.
  • A consumer process that retrieves a message from the message queue and sends it to the destination mailbox.

The queue is the resource shared between the producer and consumer processes.

When the message queue is empty, the consumer process calls wait() and sleeps until a producer adds a message and notifies it. After each removal, the consumer calls notifyAll() to wake any producers that are waiting to add new mail messages to the queue.

When the queue is full, the producer process calls wait() and sleeps until the consumer removes a message and notifies it. After adding a message, the producer calls notify() so that the consumer can retrieve the mail message from the queue and send it to the destination.

Java
package com.lalit.threadExample;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

public class MainClass {
    public static void main(String... s) throws InterruptedException {
        EmailMessageQueueClass messageQueue = new EmailMessageQueueClass();
        new Thread(new Consumer(messageQueue)).start();
        for (int i = 0; i < 1000; ++i) {
            new Thread(new Producer("Email Message " + i, messageQueue))
                    .start();
        }
    }
}

class Producer implements Runnable {
    EmailMessageQueueClass messageQueue;
    String emailMessageContent;

    public Producer(String message, EmailMessageQueueClass messageQueue) {
        emailMessageContent = message;
        this.messageQueue = messageQueue;
    }

    public void run() {
        try {
            messageQueue.adds(emailMessageContent);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    EmailMessageQueueClass messageQueue;

    public Consumer(EmailMessageQueueClass messageQueue) {
        this.messageQueue = messageQueue;
    }

    public void run() {
        try {
            // retrieve() loops forever, so this thread keeps draining the queue.
            messageQueue.retrieve();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

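class EmailMessageQueueClass {

    // Shared buffer; every access happens inside a synchronized method.
    Queue<String> q = new LinkedList<String>();
    private int queueMaxSize = 10;

    // Consumer side: loops forever, removing one message per iteration.
    synchronized void retrieve() throws InterruptedException {
        while (true) {
            // Re-check the condition after every wake-up.
            while (q.isEmpty()) {
                wait(); // releases the monitor while the queue is empty
            }
            q.remove();
            System.out.print("Mail Queue::::::[");
            Iterator<String> it = q.iterator();
            while (it.hasNext())
                System.out.print(it.next() + ",");
            System.out.print("]");
            System.out.println();
            notifyAll(); // wake producers waiting for free space
        }
    }

    // Producer side: blocks while the queue is full, then adds one message.
    synchronized void adds(String emailMessage) throws InterruptedException {
        while (q.size() == queueMaxSize) {
            wait(); // releases the monitor while the queue is full
        }
        q.add(emailMessage);
        System.out.print("Mail Queue::::::[");
        Iterator<String> it = q.iterator();
        while (it.hasNext())
            System.out.print(it.next() + ",");
        System.out.print("]");
        System.out.println();
        notify(); // wake one thread waiting on this queue, e.g. the consumer
    }
}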
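For comparison, the same blocking behavior is available ready-made in java.util.concurrent. The sketch below is an alternative to the hand-written wait()/notify() queue, not the approach this tip uses; BlockingQueueMailDemo and run are hypothetical names, and a single producer thread stands in for the 1000 producer threads of the program above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueMailDemo {
    // Runs one producer and one consumer over a bounded queue and
    // returns the messages in the order the consumer received them.
    static List<String> run(int capacity, int messages) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> delivered = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < messages; i++) {
                    queue.put("Email Message " + i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            for (int i = 0; i < messages; i++) {
                delivered.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered;
    }

    public static void main(String[] args) {
        run(10, 25).forEach(System.out::println);
    }
}
```

put() and take() correspond to adds() and retrieve() above: each one waits internally until the queue has room or has data, so the caller needs no synchronized block of its own.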

Points of Interest

While writing the code, I learnt:

  • How to call wait(), notify() and notifyAll() on a shared resource
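One detail worth spelling out is that these methods may only be called by a thread that holds the object's monitor, which is why adds() and retrieve() are declared synchronized. A small sketch, using the made-up class name MonitorOwnershipDemo, shows what happens otherwise:

```java
class MonitorOwnershipDemo {
    // Returns true if notify() outside a synchronized block is rejected.
    static boolean notifyWithoutLockFails() {
        Object lock = new Object();
        try {
            lock.notify(); // the current thread does not own lock's monitor
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Returns true once notifyAll() has completed while holding the monitor.
    static boolean notifyWithLockSucceeds() {
        Object lock = new Object();
        synchronized (lock) {
            lock.notifyAll(); // allowed: the monitor is held here
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("without lock fails: " + notifyWithoutLockFails());
        System.out.println("with lock succeeds: " + notifyWithLockSucceeds());
    }
}
```

The same rule applies to wait(): calling it without holding the monitor throws IllegalMonitorStateException instead of blocking.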

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)