Implementing a Producer-Consumer scenario using BlockingQueue in Java

Before we start with the actual example, let's look at a few concepts we should be aware of.

Producer-Consumer Problem

Wikipedia describes it as follows:
The consumer producer problem (also known as the bounded-buffer problem) is a classical example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer’s job is to generate a piece of data, put it into the buffer and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer) one piece at a time. The problem is to make sure that the producer won’t try to add data into the buffer if it’s full and that the consumer won’t try to remove data from an empty buffer.

It goes on to say:

The solution for the producer is to either go to sleep or discard data if the buffer is full. The next time the consumer removes an item from the buffer, it notifies the producer, who starts to fill the buffer again. In the same way, the consumer can go to sleep if it finds the buffer to be empty. The next time the producer puts data into the buffer, it wakes up the sleeping consumer. The solution can be reached by means of inter-process communication, typically using semaphores. An inadequate solution could result in a deadlock where both processes are waiting to be awakened. The problem can also be generalized to have multiple producers and consumers.
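To make the sleep-and-wake-up description above concrete, here is a minimal sketch of a hand-written bounded buffer. It uses Java's intrinsic locks (wait and notifyAll) rather than the semaphores the quote mentions, and the names BoundedBuffer and BoundedBufferDemo, as well as the capacity of 2, are made up for illustration. It is one possible way to write it, not a reference implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical demo class: pushes numbers through a small hand-written buffer.
class BoundedBufferDemo {

  // Sends 0..count-1 through a buffer of capacity 2 and returns what the consumer received.
  static List<Integer> transfer(int count) throws InterruptedException {
    BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
    List<Integer> received = new ArrayList<>();

    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < count; i++) {
          buffer.put(i);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < count; i++) {
          received.add(buffer.take());
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join(); // after join() it is safe to read 'received' from this thread
    return received;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(transfer(5)); // prints [0, 1, 2, 3, 4]
  }
}

// Hypothetical hand-rolled bounded buffer, for illustration only.
class BoundedBuffer<T> {
  private final Queue<T> items = new ArrayDeque<>();
  private final int capacity;

  BoundedBuffer(int capacity) {
    this.capacity = capacity;
  }

  // Producer side: sleep while the buffer is full.
  public synchronized void put(T item) throws InterruptedException {
    while (items.size() == capacity) {
      wait();
    }
    items.add(item);
    notifyAll(); // wake up any consumer waiting for data
  }

  // Consumer side: sleep while the buffer is empty.
  public synchronized T take() throws InterruptedException {
    while (items.isEmpty()) {
      wait();
    }
    T item = items.remove();
    notifyAll(); // wake up any producer waiting for space
    return item;
  }
}
```

The while loops around wait() matter: a woken thread re-checks the condition before touching the buffer, because another thread may have changed the buffer in the meantime.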

There are numerous ways to solve the Producer-Consumer problem, and in this post I will show one simple way to solve it using the data structures and other constructs provided in the JDK. Java 5 introduced a new set of concurrency-related APIs in its java.util.concurrent package. As I have said before, not many developers are aware of these APIs, and very few of them make use of them in their code.

Java 5 also introduced quite a few new collection types, and one of them is BlockingQueue.

BlockingQueue in Java

The JavaDoc says:
BlockingQueue is “A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.” It offers several ways to add and retrieve elements: one pair of methods throws an exception when the operation cannot proceed (add(e) and remove()), one pair returns a special value instead (offer(e) and poll()), one pair waits for up to a given time (offer(e, time, unit) and poll(time, unit)), and one pair blocks for as long as the queue is full or empty. The methods that block the thread are put(e) and take().
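The difference between these method families is easiest to see on a queue that is already full. The following is a small sketch, assuming an ArrayBlockingQueue with a capacity of 1; the class name BlockingQueueMethods and the 50 ms timeouts are arbitrary choices made for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing how each method family reacts to a full or empty queue.
class BlockingQueueMethods {

  static String demo() throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    StringBuilder log = new StringBuilder();

    // put(e) blocks only while the queue is full; there is room, so it returns at once.
    queue.put("a");

    // offer(e) returns false instead of waiting when the queue is full.
    log.append("offer=").append(queue.offer("b"));

    // The timed offer waits up to the timeout, then gives up and returns false.
    log.append(" timedOffer=").append(queue.offer("b", 50, TimeUnit.MILLISECONDS));

    // add(e) throws IllegalStateException when the queue is full.
    try {
      queue.add("b");
      log.append(" add=ok");
    } catch (IllegalStateException e) {
      log.append(" add=IllegalStateException");
    }

    // take() blocks only while the queue is empty; "a" is available.
    log.append(" take=").append(queue.take());

    // poll() returns null on an empty queue; the timed poll does so after the timeout.
    log.append(" poll=").append(queue.poll());
    log.append(" timedPoll=").append(queue.poll(50, TimeUnit.MILLISECONDS));

    return log.toString();
  }

  public static void main(String[] args) throws InterruptedException {
    // prints: offer=false timedOffer=false add=IllegalStateException take=a poll=null timedPoll=null
    System.out.println(demo());
  }
}
```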

In a typical producer-consumer problem we want the consumer thread to block until there is something in the queue to consume, and the producer thread to block until there is free space in the queue for a new element. With the normal collection classes it becomes quite a bit of work to implement inter-thread communication by waiting and notifying other threads about the status of the queue. The put(e) and take() methods of the BlockingQueue interface make it very easy to solve producer-consumer style problems.
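As a quick illustration of how little code put(e) and take() require, here is a minimal sketch in which one thread produces numbers and another adds them up. The class name PutTakeDemo is hypothetical, and the DONE marker used to stop the consumer (sometimes called a poison pill) is an addition for this sketch, not something BlockingQueue provides.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: one producer, one consumer, no explicit wait/notify.
class PutTakeDemo {

  // Hypothetical end-of-stream marker so the consumer knows when to stop.
  private static final int DONE = -1;

  // Sends 1..n through a queue of capacity 2 and returns the sum seen by the consumer.
  static int sumThroughQueue(int n) throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
    int[] sum = new int[1];

    Thread producer = new Thread(() -> {
      try {
        for (int i = 1; i <= n; i++) {
          queue.put(i); // blocks while the queue is full
        }
        queue.put(DONE);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        // take() blocks while the queue is empty
        for (int value = queue.take(); value != DONE; value = queue.take()) {
          sum[0] += value;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
    return sum[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(sumThroughQueue(5)); // prints 15
  }
}
```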

Let's take a scenario where the producer thread watches a directory for modified files and adds those files to the queue, and the consumer thread prints the contents of those files to the console.

Producer thread

If you are not familiar with the WatchService API in Java, read up on it first to get an idea of how it works.

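// Imports for all three classes in this post, which can share one source file.
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class FileProducer implements Runnable{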

  BlockingQueue<Path> filesList;
  Path rootPath;

  public FileProducer(BlockingQueue<Path> filesList, 
                      Path rootPath){
    this.filesList = filesList;
    this.rootPath = rootPath;
  }

  @Override
  public void run() {
    try {
      WatchService service = 
          FileSystems.getDefault().newWatchService();

      rootPath.register(service,
              StandardWatchEventKinds.ENTRY_MODIFY);

      while(true){
        WatchKey key = service.take();

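        for (WatchEvent<?> event : key.pollEvents()){

          // An OVERFLOW event carries no path, so skip it.
          if (event.kind() == StandardWatchEventKinds.OVERFLOW){
            continue;
          }

          // The context of an ENTRY_MODIFY event is the path
          // relative to the watched directory.
          Path relativePath = (Path)event.context();

          Path absolutePath = rootPath.resolve(relativePath);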

          filesList.put(absolutePath);

        }

        //reset is invoked to put the key back to ready
        boolean valid = key.reset();

        //If the key is invalid, just exit.
        if ( !valid){
          break;
        }
      }

    } catch (IOException e) {

      e.printStackTrace();

    } catch (InterruptedException e) {

      e.printStackTrace();

      Thread.currentThread().interrupt();
    }

  }
}

The producer thread above watches a certain directory for file modifications and adds the absolute path of the file into the BlockingQueue collection passed to the producer thread via its constructor.

Consumer Thread

The consumer thread would invoke take() on the BlockingQueue instance and then use the Files API to read the contents. As take() is a blocking call, if the filesList collection is empty then it would just block and wait for the data to be available in the filesList collection.

class FileConsumer implements Runnable{

  BlockingQueue<Path> filesList;
  Path rootPath;

  public FileConsumer(BlockingQueue<Path> filesList, 
                      Path rootPath){
    this.filesList = filesList;
    this.rootPath = rootPath;
  }

  @Override
  public void run(){
    try {
      while(true){

        Path fileToRead = filesList.take();

        List<String> linesInFile = 
            Files.readAllLines(fileToRead, 
                               Charset.defaultCharset());

        System.out.println("reading file: "+fileToRead);

        for ( String line : linesInFile){
          System.out.println(line);
        }

      }
    } catch (InterruptedException e) {

      e.printStackTrace();

      Thread.currentThread().interrupt();
    } catch (IOException e) {

      e.printStackTrace();

    }

  }
}
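To see the blocking behaviour of take() in isolation, the following sketch starts a consumer on an empty queue, checks that the thread is parked, and only then adds an element. The class name BlockingTakeDemo is hypothetical, and the polling loop with a five second limit is just a convenient way to wait for the consumer to reach take(); the exact thread state reported may depend on the queue implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: shows a consumer parked inside take() until data arrives.
class BlockingTakeDemo {

  static String demo() throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    String[] received = new String[1];

    Thread consumer = new Thread(() -> {
      try {
        received[0] = queue.take(); // blocks: the queue is empty
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    // Give the consumer up to about five seconds to park inside take().
    for (int i = 0; i < 500 && consumer.getState() != Thread.State.WAITING; i++) {
      Thread.sleep(10);
    }
    Thread.State stateWhileEmpty = consumer.getState();

    queue.put("hello"); // wakes the consumer up
    consumer.join();

    return stateWhileEmpty + " -> " + received[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(demo()); // typically prints: WAITING -> hello
  }
}
```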

Note: If you write to a file using Vim or another editor that creates temporary files, make sure you exclude those temporary files from being added to the queue.
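One way to do that is a small name-based check that runs before a path is put on the queue. The sketch below is only an assumption about what such a filter could look like: the class TempFileFilter, the method isTemporaryFile, and the list of suffixes are all made up for illustration, so adjust them to the files your editor actually creates.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: decides by file name whether a path looks like an editor temp file.
class TempFileFilter {

  static boolean isTemporaryFile(Path path) {
    Path fileName = path.getFileName();
    if (fileName == null) {
      return true; // an empty path or a root has no file to read
    }
    String name = fileName.toString();
    // Example suffixes only: swap files, backup files and generic temp files.
    return name.endsWith(".swp")
        || name.endsWith(".swx")
        || name.endsWith("~")
        || name.endsWith(".tmp");
  }

  public static void main(String[] args) {
    System.out.println(isTemporaryFile(Paths.get("/tmp/nio/.file1.swp"))); // prints true
    System.out.println(isTemporaryFile(Paths.get("/tmp/nio/file1")));      // prints false
  }
}
```

In FileProducer, such a check would sit just before the filesList.put(absolutePath) call, so that only paths for which isTemporaryFile returns false reach the queue.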

Invoking the Producer and Consumer

public class ProducerConsumerSample {

  public static void main(String[] args) {
 
    BlockingQueue<Path> filesList = 
        new LinkedBlockingQueue<>(10);
 
    Path rootPath = Paths.get("/tmp/nio");
    
    Thread producerThread = 
        new Thread(new FileProducer(filesList, rootPath));
    Thread consumerThread = 
        new Thread(new FileConsumer(filesList, rootPath));
 
    producerThread.start();
    consumerThread.start();
  }

}

Pretty straightforward: create both threads and then start them. You can create multiple consumer threads as well! The example above uses LinkedBlockingQueue, one of the implementations of BlockingQueue, with a capacity of 10.
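As a rough idea of what several consumers sharing one queue could look like, here is a self-contained sketch that uses plain strings instead of file paths. The class name MultipleConsumersDemo, the thread pool, and the STOP marker that tells each consumer to finish are assumptions made for this illustration and are not part of the example above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: several consumers draining one shared BlockingQueue.
class MultipleConsumersDemo {

  // Hypothetical marker telling a consumer to stop; one is queued per consumer.
  private static final String STOP = "__STOP__";

  // Queues 'items' work items for 'consumers' threads and returns how many were processed.
  static int process(int items, int consumers) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
    AtomicInteger processed = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(consumers);

    for (int c = 0; c < consumers; c++) {
      pool.execute(() -> {
        try {
          while (true) {
            String item = queue.take(); // each item goes to exactly one consumer
            if (item.equals(STOP)) {
              break;
            }
            processed.incrementAndGet();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    // This thread plays the producer; put() blocks whenever all 10 slots are taken.
    for (int i = 0; i < items; i++) {
      queue.put("file" + i);
    }
    for (int c = 0; c < consumers; c++) {
      queue.put(STOP);
    }

    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return processed.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(process(20, 3)); // prints 20
  }
}
```

Because take() hands each element to exactly one caller, the consumers need no coordination among themselves beyond sharing the queue.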

Make sure you import the corresponding classes in your source code. You can define all three classes in the same file, name it ProducerConsumerSample.java, and then compile and run it. Once the code is running, go to your terminal and type:

/tmp/nio$ touch file1
/tmp/nio$ echo "this is file1" >> file1
/tmp/nio$ touch file2
/tmp/nio$ echo "this is file2" >> file2

The output you see on the terminal of your Java program is:

reading file: /tmp/nio/file1
reading file: /tmp/nio/file1
this is file1
reading file: /tmp/nio/file2
reading file: /tmp/nio/file2
this is file2

Note: This code was compiled and tested on a Linux platform. If you run it on Windows, use the equivalent commands for creating and modifying files there.
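If you would rather not depend on shell commands at all, the files can also be created and appended to from a small Java program, which works the same way on any platform. This is only a sketch: the class name TouchAndAppend and the helper appendLine are made up here, and the number of modification events the watcher reports for each write may differ between platforms.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

// Hypothetical helper: a platform-independent stand-in for "touch" followed by "echo >>".
class TouchAndAppend {

  // Appends one line to dir/name, creating the file first if it does not exist.
  static Path appendLine(Path dir, String name, String line) throws IOException {
    Path file = dir.resolve(name);
    Files.write(file,
        Collections.singletonList(line),
        StandardCharsets.UTF_8,
        StandardOpenOption.CREATE,
        StandardOpenOption.APPEND);
    return file;
  }

  public static void main(String[] args) throws IOException {
    // Pass the watched directory as the first argument; /tmp/nio matches the example above.
    Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp/nio");
    appendLine(dir, "file1", "this is file1");
    appendLine(dir, "file2", "this is file2");
  }
}
```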

About Mohamed Sanaulla

In his day job he works on developing enterprise applications using ADF. He is also the moderator of JavaRanch forums and an avid blogger.
