-
Notifications
You must be signed in to change notification settings - Fork 0
BlockingQueue
#Blocking Queues
A BlockingQueue is typically used to have on thread produce objects, which another thread consumes. Here is a diagram that illustrates this principle:
The producing thread will keep producing new objects and insert them into the queue, until the queue reaches some upper bound on what it can contain. It's limit, in other words. If the blocking queue reaches its upper limit, the producing thread is blocked while trying to insert the new object. It remains blocked until a consuming thread takes an object out of the queue.
The consuming thread keeps taking objects out of the blocking queue, and processes them. If the consuming thread tries to take an object out of an empty queue, the consuming thread is blocked until a producing thread puts an object into the queue.
The implementation of a blocking queue looks similar to a Bounded Semaphore. Here is a simple implementation of a Blocking Queue:
public class BlockingQueue {
private List queue = new LinkedList();
private int limit = 10;
public BlockingQueue(int limit){
this.limit = limit;
}
public synchronized void enqueue(Object item)
throws InterruptedException {
while(this.queue.size() == this.limit) {
wait();
}
if(this.queue.size() == 0) {
notifyAll();
}
this.queue.add(item);
}
public synchronized Object dequeue()
throws InterruptedException{
while(this.queue.size() == 0){
wait();
}
if(this.queue.size() == this.limit){
notifyAll();
}
return this.queue.remove(0);
}
}
Notice how notifyAll() is only called from enqueue() and dequeue() if the queue size is equal to the size bounds (0 or limit). If the queue size is not equal to either bound when enqueue() or dequeue() is called, there can be no threads waiting to either enqueue or dequeue items.
A BlockingQueue has 4 different sets of methods for inserting, removing and examining the elements in the queue. Each set of methods behaves differently in case the requested operation cannot be carried out immediately. Here is a table of the methods.
Throws Exception Special Value Blocks Times Out
Insert add(o) offer(o) put(o) offer(o, timeout, timeunit)
Remove remove(o) poll() take() poll(timeout, timeunit)
Examine element() peek()
The 4 different sets of behaviour means this:
- Throws Exception: If the attempted operation is not possible immediately, an exception is thrown.
- Special Value: If the attempted operation is not possible immediately, a special value is returned (often true / false).
- Blocks: If the attempted operation is not possible immedidately, the method call blocks until it is.
- Times Out: If the attempted operation is not possible immediately, the method call blocks until it is, but waits no longer than the given timeout. Returns a special value telling whether the operation succeeded or not (typically true / false).
It is not possible to insert null into a BlockingQueue. If you try to insert null, the BlockingQueue will throw a NullPointerException.
It is also possible to access all the elements inside a BlockingQueue, and not just the elements at the start and end. For instance, say you have queued an object for processing, but your application decides to cancel it. You can then call e.g. remove(o) to remove a specific object in the queue. However, this is not done very efficiently, so you should not use these Collection methods unless you really have to.
- unbounded queue – can grow almost indefinitely
- bounded queue – with maximal capacity defined
Java provides several BlockingQueue implementations such as ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, **SynchronousQueue **etc.
The ArrayBlockingQueue class implements the BlockingQueue interface. ArrayBlockingQueue is a bounded, blocking queue that stores the elements internally in an array. That it is bounded means that it cannot store unlimited amounts of elements. There is an upper bound on the number of elements it can store at the same time. You set the upper bound at instantiation time, and after that it cannot be changed.
The ArrayBlockingQueue stores the elements internally in **FIFO **(First In, First Out) order. The head of the queue is the element which has been in queue the longest time, and the tail of the queue is the element which has been in the queue the shortest time.
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
queue.put("1");
String string = queue.take();
he LinkedBlockingQueue class implements the BlockingQueue interface. Read the BlockingQueue text for more information about the interface.
The **LinkedBlockingQueue **keeps the elements internally in a linked structure (linked nodes). This linked structure can optionally have an upper bound if desired. If no upper bound is specified, Integer.MAX_VALUE is used as the upper bound.
The **LinkedBlockingQueue **stores the elements internally in **FIFO **(First In, First Out) order. The head of the queue is the element which has been in queue the longest time, and the tail of the queue is the element which has been in the queue the shortest time.
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);
bounded.put("Value");
String value = bounded.take();
PriorityBlockingQueue class implements the BlockingQueue interface. Read the BlockingQueue text for more information about the interface.
The PriorityBlockingQueue is an unbounded concurrent queue. It uses the same ordering rules as the java.util.PriorityQueue class. You cannot insert null into this queue.
All elements inserted into the PriorityBlockingQueue must implement the java.lang.Comparable interface. The elements thus order themselves according to whatever priority you decide in your Comparable implementation.
Notice that the PriorityBlockingQueue does not enforce any specific behaviour for elements that have equal priority (compare() == 0).
Also notice, that in case you obtain an Iterator from a PriorityBlockingQueue, the Iterator does not guarantee to iterate the elements in priority order.
BlockingQueue queue = new PriorityBlockingQueue();
//String implements java.lang.Comparable
queue.put("Value");
String value = queue.take();
The SynchronousQueue class implements the BlockingQueue interface. Read the BlockingQueue text for more information about the interface.
The SynchronousQueue is a queue that can only contain a single element internally. A thread inseting an element into the queue is blocked until another thread takes that element from the queue. Likewise, if a thread tries to take an element and no element is currently present, that thread is blocked until a thread insert an element into the queue.