The BlockingQueue interface in Java is added in Java 1.5 along with various other concurrent Utility classes like ConcurrentHashMap, Counting Semaphore, CopyOnWriteArrrayList, etc. BlockingQueue interface supports flow control (in addition to queue) by introducing blocking if either BlockingQueue is full or empty. A thread trying to enqueue an element in a full queue is blocked until some other thread makes space in the queue, either by dequeuing one or more elements or clearing the queue completely. Similarly, it blocks a thread trying to delete from an empty queue until some other threads insert an item. BlockingQueue does not accept a null value. If we try to enqueue the null item, then it throws NullPointerException.
Java provides several BlockingQueue implementations such as LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue, etc. Java BlockingQueue interface implementations are thread-safe. All methods of BlockingQueue are atomic in nature and use internal locks or other forms of concurrency control. Java 5 comes with BlockingQueue implementations in the java.util.concurrent package.
Usage of BlockingQueue
The Hierarchy of BlockingQueue
Declaration
public interface BlockingQueue<E> extends Queue<E>
Here, E is the type of elements stored in the Collection.
Classes that Implement BlockingQueue
We directly cannot provide an instance of BlockingQueue since it is an interface, so to utilize the functionality of the BlockingQueue, we need to make use of the classes implementing it. Also, to use BlockingQueue in your code, use this import statement.
import java.util.concurrent.BlockingQueue; (or) import java.util.concurrent.*;
The implementing class of BlockingDeque is LinkedBlockingDeque. This class is the implementation of the BlockingDeque and the linked list data structure. The LinkedBlockingDeque can be optionally bounded using a constructor, however, if the capacity is unspecified it is Integer.MAX_VALUE by default. The nodes are added dynamically at the time of insertion obeying the capacity constraints.
The syntax for creating objects:
BlockingQueue<?> objectName = new LinkedBlockingDeque<?>(); (or) LinkedBlockingDeque<?> objectName = new LinkedBlockingDeque<?>();
Example: In the code given below we perform some basic operations on a LinkedBlockingDeque, like creating an object, adding elements, deleting elements, and using an iterator to traverse through the LinkedBlockingDeque.
BlockingQueue Types
The BlockingQueue are two types:
1. Unbounded Queue: The Capacity of the blocking queue will be set to Integer.MAX_VALUE. In the case of an unbounded blocking queue, the queue will never block because it could grow to a very large size. when you add elements its size grows.
Syntax:
BlockingQueue blockingQueue = new LinkedBlockingDeque();
2. Bounded Queue: The second type of queue is the bounded queue. In the case of a bounded queue you can create a queue passing the capacity of the queue in queues constructor:
Syntax:
// Creates a Blocking Queue with capacity 5 BlockingQueue blockingQueue = new LinkedBlockingDeque(5);
To implement Bounded Semaphore using BlockingQueue
Java
// Java program that explains the internal // implementation of BlockingQueue import java.io.*; import java.util.*; class BlockingQueue<E> { // BlockingQueue using LinkedList structure // with a constraint on capacity private List<E> queue = new LinkedList<E>(); // limit variable to define capacity private int limit = 10 ; // constructor of BlockingQueue public BlockingQueue( int limit) { this .limit = limit; } // enqueue method that throws Exception // when you try to insert after the limit public synchronized void enqueue(E item) throws InterruptedException { while ( this .queue.size() == this .limit) { wait(); } if ( this .queue.size() == 0 ) { notifyAll(); } this .queue.add(item); } // dequeue methods that throws Exception // when you try to remove element from an // empty queue public synchronized E dequeue() throws InterruptedException { while ( this .queue.size() == 0 ) { wait(); } if ( this .queue.size() == this .limit) { notifyAll(); } return this .queue.remove( 0 ); } public static void main(String []args) { } } |
Example:
Java
// Java Program to demonstrate usage of BlockingQueue import java.util.concurrent.*; import java.util.*; public class GFG { public static void main(String[] args) throws InterruptedException { // define capacity of ArrayBlockingQueue int capacity = 5 ; // create object of ArrayBlockingQueue BlockingQueue<String> queue = new ArrayBlockingQueue<String>(capacity); // Add elements to ArrayBlockingQueue using put // method queue.put( "StarWars" ); queue.put( "SuperMan" ); queue.put( "Flash" ); queue.put( "BatMan" ); queue.put( "Avengers" ); // print Queue System.out.println( "queue contains " + queue); // remove some elements queue.remove(); queue.remove(); // Add elements to ArrayBlockingQueue // using put method queue.put( "CaptainAmerica" ); queue.put( "Thor" ); System.out.println( "queue contains " + queue); } } |
queue contains [StarWars, SuperMan, Flash, BatMan, Avengers] queue contains [Flash, BatMan, Avengers, CaptainAmerica, Thor]
Basic Operations
1. Adding Elements
Elements can be added into a LinkedBlockedDeque in different ways depending on the type of structure we are trying to use it as. The most common method is the add() method using which we can add elements at the end of the deque. We can also use the addAll() method (which is a method of the Collection interface) to add an entire collection to LinkedBlockingDeque. If we wish to use the deque as a queue we can use add() and put().
Java
// Java Program Demonstrate add() // method of BlockingQueue import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.*; public class GFG { public static void main(String[] args) throws IllegalStateException { // create object of BlockingQueue BlockingQueue<Integer> BQ = new LinkedBlockingDeque<Integer>(); // Add numbers to the BlockingQueue BQ.add( 7855642 ); BQ.add( 35658786 ); BQ.add( 5278367 ); BQ.add( 74381793 ); // before removing print BlockingQueue System.out.println( "Blocking Queue: " + BQ); } } |
Blocking Queue: [7855642, 35658786, 5278367, 74381793]
2. Accessing Elements
The elements of the LinkedBlockingDeque can be accessed using contains(), element(), peek(), poll(). There are variations of these methods too which are given in the table above along with their descriptions.
Java
// Java Program for Accessing the elements of a // LinkedBlockingDeque import java.util.concurrent.*; public class AccessingElements { public static void main(String[] args) { // Instantiate an object of LinkedBlockingDeque // named lbdq BlockingQueue<Integer> lbdq = new LinkedBlockingDeque<Integer>(); // Add elements using add() lbdq.add( 22 ); lbdq.add( 125 ); lbdq.add( 723 ); lbdq.add( 172 ); lbdq.add( 100 ); // Print the elements of lbdq on the console System.out.println( "The LinkedBlockingDeque, lbdq contains:" ); System.out.println(lbdq); // To check if the deque contains 22 if (lbdq.contains( 22 )) System.out.println( "The LinkedBlockingDeque, lbdq contains 22" ); else System.out.println( "No such element exists" ); // Using element() to retrieve the head of the deque int head = lbdq.element(); System.out.println( "The head of lbdq: " + head); } } |
The LinkedBlockingDeque, lbdq contains: [22, 125, 723, 172, 100] The LinkedBlockingDeque, lbdq contains 22 The head of lbdq: 22
3. Deleting Elements
Elements can be deleted from a LinkedBlockingDeque using remove(). Other methods such as take() and poll() can also be used in a way to remove the first and the last elements.
Java
// Java Program for removing elements from a // LinkedBlockingDeque import java.util.concurrent.*; public class RemovingElements { public static void main(String[] args) { // Instantiate an object of LinkedBlockingDeque // named lbdq BlockingQueue<Integer> lbdq = new LinkedBlockingDeque<Integer>(); // Add elements using add() lbdq.add( 75 ); lbdq.add( 86 ); lbdq.add( 13 ); lbdq.add( 44 ); lbdq.add( 10 ); // Print the elements of lbdq on the console System.out.println( "The LinkedBlockingDeque, lbdq contains:" ); System.out.println(lbdq); // Remove elements using remove(); lbdq.remove( 86 ); lbdq.remove( 44 ); // Trying to remove an element // that doesn't exist // in the LinkedBlockingDeque lbdq.remove( 1 ); // Print the elements of lbdq on the console System.out.println( "\nThe LinkedBlockingDeque, lbdq contains:" ); System.out.println(lbdq); } } |
The LinkedBlockingDeque, lbdq contains: [75, 86, 13, 44, 10] The LinkedBlockingDeque, lbdq contains: [75, 13, 10]
4. Iterating through the Elements
To iterate through the elements of a LinkedBlockingDeque we can create an iterator and use the methods of the Iterable interface, which is the root of the Collection Framework of Java, to access the elements. The next() method of Iterable returns the element of any collection.
Java
// Java Program to iterate // through the LinkedBlockingDeque import java.util.Iterator; import java.util.concurrent.*; public class IteratingThroughElements { public static void main(String[] args) { // Instantiate an object of LinkedBlockingDeque named lbdq BlockingQueue<Integer> lbdq = new LinkedBlockingDeque<Integer>(); // Add elements using add() lbdq.add( 166 ); lbdq.add( 246 ); lbdq.add( 66 ); lbdq.add( 292 ); lbdq.add( 98 ); // Create an iterator to traverse lbdq Iterator<Integer> lbdqIter = lbdq.iterator(); // Print the elements of lbdq on to the console System.out.println( "The LinkedBlockingDeque, lbdq contains:" ); for ( int i = 0 ; i<lbdq.size(); i++) { System.out.print(lbdqIter.next() + " " ); } } } |
The LinkedBlockingDeque, lbdq contains: 166 246 66 292 98
Methods of BlockingQueue
METHOD |
DESCRIPTION |
---|---|
add(E e) | Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success, and throwing an IllegalStateException if no space is currently available. |
contains(Object o) | Returns true if this queue contains the specified element. |
drainTo(Collection<? super E> c) | Removes all available elements from this queue and adds them to the given collection. |
drainTo(Collection<? super E> c, int maxElements) | Removes at most the given number of available elements from this queue and adds them to the given collection. |
offer(E e) | Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and false if no space is currently available. |
offer(E e, long timeout, TimeUnit unit) | Inserts the specified element into this queue, waiting up to the specified wait time if necessary for space to become available. |
poll(long timeout, TimeUnit unit) | Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available. |
put(E e) | Inserts the specified element into this queue, waiting if necessary for space to become available. |
remainingCapacity() | Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit. |
remove(Object o) | Removes a single instance of the specified element from this queue, if it is present. |
take() | Retrieves and removes the head of this queue, waiting if necessary until an element becomes available. |
Methods declared in interface java.util.Collection
METHOD |
DESCRIPTION |
---|---|
addAll(Collection<? extends E> c) | Adds all of the elements in the specified collection to this collection (optional operation). |
clear() | Removes all of the elements from this collection (optional operation). |
containsAll(Collection<?> c) | Returns true if this collection contains all of the elements in the specified collection. |
equals(Object o) | Compares the specified object with this collection for equality. |
hashCode() | Returns the hash code value for this collection. |
isEmpty() | Returns true if this collection contains no elements. |
iterator() | Returns an iterator over the elements in this collection. |
parallelStream() | Returns a possibly parallel Stream with this collection as its source. |
removeAll(Collection<?> c) | Removes all of this collection’s elements that are also contained in the specified collection (optional operation). |
removeIf(Predicate<? super E> filter) | Removes all of the elements of this collection that satisfy the given predicate. |
retainAll(Collection<?> c) | Retains only the elements in this collection that are contained in the specified collection (optional operation). |
size() | Returns the number of elements in this collection. |
spliterator() | Creates a Spliterator over the elements in this collection. |
stream() | Returns a sequential Stream with this collection as its source. |
toArray() | Returns an array containing all of the elements in this collection. |
toArray(IntFunction<T[]> generator) | Returns an array containing all of the elements in this collection, using the provided generator function to allocate the returned array. |
toArray(T[] a) | Returns an array containing all of the elements in this collection; the runtime type of the returned array is that of the specified array. |
Methods declared in interface java.lang.Iterable
METHOD |
DESCRIPTION |
---|---|
forEach(Consumer<? super T> action) | Performs the given action for each element of the Iterable until all elements have been processed or the action throws an exception. |
Methods declared in interface java.util.Queue
METHOD |
DESCRIPTION |
---|---|
element() | Retrieves, but does not remove, the head of this queue. |
peek() | Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty. |
poll() | Retrieves and removes the head of this queue, or returns null if this queue is empty. |
remove() | Retrieves and removes the head of this queue. |
The Behavior of BlockingQueue Methods
The following are the methods provided by the BlockingQueue for insertion, removal, and examine operations on the queue. Each of the four sets of methods behaves differently if the requested operation is not satisfied immediately.
- Throws Exception: An exception will be thrown, if the requested operation is not satisfied immediately.
- Special value: A special value is returned if the operation is not satisfied immediately.
- Blocks: The method call is blocked if the attempted operation is not satisfied immediately and it waits until it gets executed.
- Times out: A special value is returned telling whether the operation succeeded or not. If the requested operation is not possible immediately, the method call blocks until it is, but waits no longer than the given timeout.
Operation | Throws Exception | Special Value | Blocks | Times Out |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
Reference: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/BlockingQueue.html