Ticker

6/recent/ticker-posts

Ad Code

Responsive Advertisement

What is Thread-Safe BlockingQueue in Java? When should you use it? Implementation Attached

What is Threadsafe BlockingQueue in Java and when you should use it

So far I’ve written two articles on Producer Consumer concept on Crunchify. 1st one to explain Java Semaphore and Mutex and 2nd one to explain Concurrent Read/Write.

In this Java Tutorial we will go over same Producer/Consumer concept to explain the BlockingQueue in Java.

What are the advantages of Blocking Queue in Java?

A java.util.Queue supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

java.util.BlockingQueue in java - Crunchify

We need to create four Java Classes:

  1. CrunchifyMessage.java to put and get message
  2. CrunchifyBlockingProducer.java to put message into queue
  3. CrunchifyBlockingConsumer.java to get message from queue
  4. CrunchifyBlockingMain.java to start test

BlockingQueue implementations are thread-safe. All queuing methods are atomic in nature and use internal locks.

Let’s get started on Thread-Safe BlockingQueue implementation in Java

Step-1

Create class CrunchifyMessage.java. This is simple Java Object.

package com.crunchify.example;

/**
 * @author Crunchify.com 
 * simple Message class to put and get message into queue
 */

public class CrunchifyMessage {
    private String crunchifyMsg;
    
    public CrunchifyMessage(String string) {
        this.crunchifyMsg = string;
    }
    
    public String getMsg() {
        return crunchifyMsg;
    }
}

Step-2

Create producer CrunchifyBlockingProducer.java which created simple msg and put it into queue.

package com.crunchify.example;

import java.util.concurrent.BlockingQueue;

/**
 * @author Crunchify.com
 * 
 */

public class CrunchifyBlockingProducer implements Runnable {
    
    private BlockingQueue<CrunchifyMessage> crunchQueue;
    
    public CrunchifyBlockingProducer(BlockingQueue<CrunchifyMessage> queue) {
        this.crunchQueue = queue;
    }
    
    @Override
    public void run() {
        // producing CrunchifyMessage messages
        for (int i = 1; i <= 5; i++) {
            CrunchifyMessage msg = new CrunchifyMessage("i'm msg " + i);
            try {
                Thread.sleep(10);
                crunchQueue.put(msg);
                System.out.println("CrunchifyBlockingProducer: Message - " + msg.getMsg() + " produced.");
            } catch (Exception e) {
                System.out.println("Exception:" + e);
            }
        }
        
        // adding exit message
        CrunchifyMessage msg = new CrunchifyMessage("All done from Producer side. Produced 50 CrunchifyMessages");
        try {
            crunchQueue.put(msg);
            System.out.println("CrunchifyBlockingProducer: Exit Message - " + msg.getMsg());
        } catch (Exception e) {
            System.out.println("Exception:" + e);
        }
    }   
}

Step-3

Create class CrunchifyBlockingConsumer.java which consumes message from queue.

package com.crunchify.example;

import java.util.concurrent.BlockingQueue;

/**
 * @author Crunchify.com
 * 
 */

public class CrunchifyBlockingConsumer implements Runnable {
    
    private BlockingQueue<CrunchifyMessage> queue;
    
    public CrunchifyBlockingConsumer(BlockingQueue<CrunchifyMessage> queue) {
        this.queue = queue;
    }
    
    @Override
    public void run() {
        try {
            CrunchifyMessage msg;
            
            // consuming messages until exit message is received
            while ((msg = queue.take()).getMsg() != "exit") {
                Thread.sleep(10);
                System.out.println("CrunchifyBlockingConsumer: Message - " + msg.getMsg() + " consumed.");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }   
}

Step-4

Create simple CrunchifyBlockingMain.java method which runs the BlockingQueue test. Run this program to check BlockingQueue behavior.

package com.crunchify.example;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @author Crunchify.com
 * 
 */

public class CrunchifyBlockingMain {
    
    public static void main(String[] args) {
        
        // Creating BlockingQueue of size 10
        // BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and
        // wait for space to become available in the queue when storing an element.
        BlockingQueue<CrunchifyMessage> crunchQueue = new ArrayBlockingQueue<>(10);
        CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer(crunchQueue);
        CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer(crunchQueue);
        
        // starting producer to produce messages in queue
        new Thread(crunchProducer).start();
        
        // starting consumer to consume messages from queue
        new Thread(crunchConsumer).start();
        
        System.out.println("Let's get started. Producer / Consumer Test Started.\n");
    }   
}

A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null.

A null is used as a sentinel value to indicate failure of poll operations.

Result:

Let's get started. Producer / Consumer Test Started.

CrunchifyBlockingProducer: Message - i'm msg 1 produced.
CrunchifyBlockingProducer: Message - i'm msg 2 produced.
CrunchifyBlockingConsumer: Message - i'm msg 1 consumed.
CrunchifyBlockingConsumer: Message - i'm msg 2 consumed.
CrunchifyBlockingProducer: Message - i'm msg 3 produced.
CrunchifyBlockingConsumer: Message - i'm msg 3 consumed.
CrunchifyBlockingProducer: Message - i'm msg 4 produced.
CrunchifyBlockingConsumer: Message - i'm msg 4 consumed.
CrunchifyBlockingProducer: Message - i'm msg 5 produced.
CrunchifyBlockingProducer: Exit Message - All done from Producer side. Produced 50 CrunchifyMessages
CrunchifyBlockingConsumer: Message - i'm msg 5 consumed.
CrunchifyBlockingConsumer: Message - All done from Producer side. Produced 50 CrunchifyMessages consumed.

When we should use java.util.concurrent.BlockingQueue?

  • When you want to throttle some sort of incoming request then you should use the same
  • A producers can get far ahead of the consumers with an unbounded queue. If consumer is not catching up with producer then it may cause an OutOfMemoryError. In situations like these, it may be better to signal a would-be producer that the queue is full, and to give up quickly with a failure.
    • In other words: the producers are naturally throttled.
  • Blocking Queue is normally used in concurrent application
  • It provides a correct, thread-safe implementation
  • Memory consumption should be limited as well

The post What is Thread-Safe BlockingQueue in Java? When should you use it? Implementation Attached appeared first on Crunchify.

Enregistrer un commentaire

0 Commentaires