Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / Java

Producer-Consumer Problem

0.00/5 (No votes)
6 Sep 2014CPOL1 min read 10.9K   63  
This is an alternative solution for Producer-Consumer Problem

Introduction

This time, we are going to discuss an alternate solution for Producer Consumer Problem, i.e., with high concurrency APIs BlockingQueue.

BlockingQueue amazingly simplifies implementation of Producer-Consumer design pattern by providing out of the box support of blocking on put() and take().

Background

Blocking Queue has been introduced under Java 1.5 release as part of High level Concurrency API.

Blocking Queue is a special type of queue which:

  1. Provides basic Queue functionality. put() method to insert an element and take() method to remove an element from Queue.
  2. Blocks Thread 'A' trying to enqueue when Queue is full untill Thread 'B' removes an element from queue.

Image 1

3. Blocks Thread 'B' trying to dequeue when queue is empty untill Thread 'A' inserts an element into queue.

Image 2

This shows that solution to Producer Consumer problem can be given using Blocking Queue.

Using the Code

I am trying to utilize the Blocking Queue to handle Email Message Queue where Producer thread is putting messages into queue using put() method and Consumer thread is taking messages out from queue using take() method.

Java
package com.lalit.threadExample;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class EmailMessageQueueImplUsingBlockingQueue {
    public static void main(String... s) throws InterruptedException {
        int queueMaxSizeAllowed = 10;
        BlockingQueue<string> messageQueue = new LinkedBlockingQueue<string>(
                queueMaxSizeAllowed);
        new Thread(new Consumer(messageQueue)).start();
        for (int i = 0; i < 100; ++i) {
            new Thread(new Producer("Email Message " + i, messageQueue))
                    .start();
        }
    }
}

class Producer implements Runnable {
    BlockingQueue<string> messageQueue;
    String emailMessageContent;

    public Producer(String message, BlockingQueue<string> messageQueue) {
        emailMessageContent = message;
        this.messageQueue = messageQueue;
    }

    public void run() {
        try {
            System.out.println("Producer Added :- \"" + emailMessageContent
                    + "\" to Mail Queue");
            messageQueue.put(emailMessageContent);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    BlockingQueue<string> messageQueue;

    public Consumer(BlockingQueue<string> messageQueue) {
        this.messageQueue = messageQueue;
    }

    public void run() {
        while (true) {
            try {
                System.out.println("Consumer Removed :- \""
                        + messageQueue.take()
                        + "\" from Mail Queue to send to destination");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Points of Interest

  • Blocking queue handles Thread's blocking when queue is full or empty and developer does not need to implement extrinsic object lock machanism (i.e., done through synchronized blocks that we used to in case of wait(), notify() methods).
  • Blocking queue provides put() and take() methods which are thread safe.

History

  • 6th September, 2014: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)