SynchronousQueue is a special blocking queue with no internal capacity: every insert operation must wait for a matching remove operation by another thread, and vice versa. It is used to exchange data between threads in a thread-safe manner.
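As a rough illustration of what "no internal capacity" means in practice, the sketch below inspects an empty SynchronousQueue from a single thread. The class name ZeroCapacityDemo and the helper offerWithoutConsumer() are made up for this example; size(), remainingCapacity(), peek() and offer() are standard methods of java.util.concurrent.SynchronousQueue.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original article.
class ZeroCapacityDemo {

    // Tries a non-blocking offer() while no consumer thread is waiting.
    // With nobody ready to receive the element, the offer is rejected.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("hello");
    }

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // The queue never holds elements, so these values never change.
        System.out.println("size: " + queue.size());
        System.out.println("remainingCapacity: " + queue.remainingCapacity());
        System.out.println("peek: " + queue.peek());

        System.out.println("offer accepted: " + offerWithoutConsumer());
    }
}
```

Since nothing is ever stored, size() and remainingCapacity() report 0, peek() returns null, and the offer is refused because no other thread is waiting in take().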
SynchronousQueue is mainly used through two operations, put() and take(). Both are blocking methods: a thread that calls put() to hand over a piece of data waits until some other thread calls take() to receive it, and a thread that calls take() waits until some other thread calls put(). The two operations are:
1. put()
Java
try {
    synchronousQueue.put("data or information goes here");
}
catch (InterruptedException iex) {
    iex.printStackTrace();
}
2. take()
Java
try {
    // data type according to the data or information
    String info = synchronousQueue.take();
}
catch (InterruptedException iex) {
    iex.printStackTrace();
}
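To make the blocking behaviour of put() and take() concrete, here is a minimal sketch in which a producer thread calls put() and the calling thread calls take() a little later. The class HandoffDemo, the method putBlocksUntilTake() and the 200 ms pause are assumptions chosen for illustration only.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original article.
class HandoffDemo {

    // Returns true if put() had not returned before take() was called,
    // did return afterwards, and the consumer received the element.
    static boolean putBlocksUntilTake() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        AtomicBoolean putReturned = new AtomicBoolean(false);

        Thread producer = new Thread(() -> {
            try {
                queue.put("message");   // waits here until a consumer takes it
                putReturned.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            Thread.sleep(200);          // give the producer time to reach put()
            boolean stillBlocked = !putReturned.get();

            String received = queue.take();   // completes the handoff
            producer.join();                  // producer finishes after put() returns

            return stillBlocked && putReturned.get() && "message".equals(received);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("put() waited for take(): " + putBlocksUntilTake());
    }
}
```

The flag is read before take() is called, so it can only be false at that point: put() has no way to complete until a consumer arrives.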
SynchronousQueue has two constructors, which differ in the access policy they apply:
1. SynchronousQueue(): If multiple threads are waiting, the order in which they are granted access is unspecified. This is called the non-fair policy.
2. SynchronousQueue(boolean fair): If fair is true and multiple threads are waiting, they are granted access in FIFO (first in, first out) order. If fair is false, the queue behaves like one created with the no-argument constructor.
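The difference between the two policies can be sketched by parking several producers in a known order and then draining the queue. The class FairnessDemo and the method handoffOrder() are hypothetical names; the sketch also assumes that a producer blocked inside put() reports the thread state WAITING, which is how it decides that one producer is queued before starting the next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original article.
class FairnessDemo {

    // Parks three producers one after another, then takes three elements
    // and returns them in the order they were received.
    static List<String> handoffOrder(boolean fair) {
        SynchronousQueue<String> queue = new SynchronousQueue<>(fair);
        List<String> received = new ArrayList<>();
        try {
            for (String item : new String[] {"first", "second", "third"}) {
                Thread producer = new Thread(() -> {
                    try {
                        queue.put(item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                producer.setDaemon(true);
                producer.start();
                // Assumption: a producer parked in put() is in state WAITING.
                // Wait for that before starting the next producer.
                while (producer.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            for (int i = 0; i < 3; i++) {
                received.add(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("fair:     " + handoffOrder(true));
        System.out.println("non-fair: " + handoffOrder(false));
    }
}
```

With fair set to true, the elements should come back in the order the producers started waiting: first, second, third. With the non-fair policy the order is unspecified, so the second line may differ between runs and JDK versions.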
Implementation:
Java
// Java program to implement SynchronousQueue API.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQAPI<E> {

    public SynchronousQueue<E> synchronousQ;

    // Create a SynchronousQueue with the default (non-fair) policy
    public SynchronousQAPI() {
        synchronousQ = new SynchronousQueue<E>();
    }

    // Create a SynchronousQueue with the requested fairness policy
    public SynchronousQAPI(boolean fair) {
        synchronousQ = new SynchronousQueue<E>(fair);
    }

    // As discussed in the API overview above, the two main
    // operations are put() and take(), so only these are wrapped.

    // put(): hands the element over to a consumer, waiting if
    // necessary until another thread calls take()
    public void put(E e) throws InterruptedException {
        synchronousQ.put(e);
    }

    // take(): receives an element from a producer, waiting if
    // necessary until another thread calls put()
    public E take() throws InterruptedException {
        return synchronousQ.take();
    }

    // Implementation of Put thread (producer)
    static class Put implements Runnable {
        private final BlockingQueue<Integer> queue;

        public Put(BlockingQueue<Integer> q) {
            this.queue = q;
        }

        @Override
        public void run() {
            try {
                // put the data; blocks until the consumer takes it
                queue.put(1);
                System.out.println("1 added to synchronous queue.");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // Implementation of Take thread (consumer)
    static class Take implements Runnable {
        private final BlockingQueue<Integer> queue;

        public Take(BlockingQueue<Integer> q) {
            this.queue = q;
        }

        @Override
        public void run() {
            try {
                // take the data handed over by the producer
                queue.take();
                System.out.println("1 removed from synchronous queue.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronousQAPI<Integer> synchronousQueue = new SynchronousQAPI<Integer>();

        // Both threads share the same underlying queue
        new Thread(new Put(synchronousQueue.synchronousQ)).start();
        new Thread(new Take(synchronousQueue.synchronousQ)).start();
    }
}
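Output (the two lines may appear in either order, because both threads resume as soon as the handoff completes):
1 added to synchronous queue.
1 removed from synchronous queue.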