10.3 Using Blocking Queues
In Chapter 1, you saw that Tiger introduced a new collection type, java.util.Queue. This interface has
several implementations, such as DelayQueue and PriorityQueue. However, all of these assume sufficient room in the queue for adding elements, or, at the least, an error when
there isn't room available. True queues, though, often involve a wait period, where an element (or
person; think of a line for a concert) waits in place until an opening is
available. The same is true for removal: another thread (or unruly ticket buyer)
shouldn't be able to jump in front if there are already threads
waiting to peel off the next item in the queue. Fortunately, the guys at
Sun realized this is an important threading concept (or maybe just spent
a lot of time trying to see Dave Matthews recently). In either case, the
end-result is java.util.concurrent.BlockingQueue. This interface
defines a means of blocking other threads on a put, or a take. As an
added bonus, I'll even drop the concert ticket analogy now.
10.3.1 How do I do that?
The Queue interface defines the offer( ) method for adding elements to the queue, and the poll( ) method for removing elements. offer( ) should be used instead of add( ) (defined in Collection), because it returns a boolean value indicating whether the addition was successful (false implies that the queue was full). In the same fashion, poll( ) simply returns null if the queue is empty. However, neither of these methods waits: offer( ) doesn't wait for space to become available, and poll( ) doesn't wait for an element to become available.

NOTE: add( ) throws an unchecked exception if the queue is full, which isn't really appropriate, as a full queue isn't an exceptional condition.

java.util.concurrent.BlockingQueue is an interface that extends Queue, and adds methods that do wait, most notably put( ) and take( ). This is one of those cases where a code sample is worth a thousand words, so take a look at Example 10-3. This represents one of the classic uses of a queue, in a producer/consumer relationship.
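Before the full producer/consumer classes, here is a minimal sketch of the difference between the calls that don't wait and the calls that do. It is not part of this chapter's sample code: the OfferVersusPut class, its method names, and the capacity of 1 are assumptions chosen only to make a full queue easy to reach.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; the name and the capacity of 1 are illustrative only.
class OfferVersusPut {

    // Shows what offer( ) and poll( ) report on a full and on an empty queue.
    static String nonBlocking() {
        BlockingQueue<String> q = new ArrayBlockingQueue<String>(1);
        boolean first = q.offer("a");    // true: there was room
        boolean second = q.offer("b");   // false: the queue is full, and offer( ) doesn't wait
        String head = q.poll();          // "a"
        String empty = q.poll();         // null: the queue is empty, and poll( ) doesn't wait
        return first + " " + second + " " + head + " " + empty;
    }

    // put( ) on a full queue waits until another thread makes room with take( ).
    static String blocking() throws InterruptedException {
        final BlockingQueue<String> q = new ArrayBlockingQueue<String>(1);
        q.put("a");                      // room available, so this returns at once
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    q.put("b");          // waits here for as long as the queue is still full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        String first = q.take();         // frees the single slot, letting the put( ) finish
        String second = q.take();        // waits, if necessary, for "b" to arrive
        producer.join();
        return first + " " + second;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(nonBlocking());
        System.out.println(blocking());
    }
}
```

The second offer( ) fails immediately instead of waiting, while the put( ) in the helper thread completes only once the main thread has taken an element out.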
Example 10-3. A producer for a BlockingQueue
package com.oreilly.tiger.ch10;

import java.io.PrintStream;
import java.util.Date;
import java.util.concurrent.BlockingQueue;

public class Producer extends Thread {
  private BlockingQueue q;
  private PrintStream out;

  public Producer(BlockingQueue q, PrintStream out) {
    setName("Producer");
    this.q = q;
    this.out = out;
  }

  public void run( ) {
    try {
      while (true) {
        // put( ) waits whenever the queue is full
        q.put(produce( ));
      }
    } catch (InterruptedException e) {
      out.printf("%s interrupted: %s", getName( ), e.getMessage( ));
    }
  }

  private String produce( ) {
    while (true) {
      double r = Math.random( );
      // Only goes forward 1/10 of the time
      if ((r*100) < 10) {
        String s = String.format("Inserted at %tc", new Date( ));
        return s;
      }
    }
  }
}

Example 10-4 is the consumer half of the relationship.
Example 10-4. Consumer for a BlockingQueue
package com.oreilly.tiger.ch10;

import java.io.PrintStream;
import java.util.concurrent.BlockingQueue;

public class Consumer extends Thread {
  private BlockingQueue q;
  private PrintStream out;

  public Consumer(String name, BlockingQueue q,
                  PrintStream out) {
    setName(name);
    this.q = q;
    this.out = out;
  }

  public void run( ) {
    try {
      while (true) {
        process(q.take( ));
      }
    } catch (InterruptedException e) {
      out.printf("%s interrupted: %s", getName( ), e.getMessage( ));
    }
  }

  private void process(Object obj) {
    out.printf("%s processing object:%n '%s'%n",
               getName( ), obj.toString( ));
  }
}

Finally, here's a sample usage:

public void testQueue(PrintStream out) throws IOException {
  BlockingQueue queue = new LinkedBlockingQueue(10);
  Producer p = new Producer(queue, out);
  Consumer c1 = new Consumer("Consumer 1", queue, out);
  Consumer c2 = new Consumer("Consumer 2", queue, out);
  Consumer c3 = new Consumer("Consumer 3", queue, out);
  Consumer c4 = new Consumer("Consumer 4", queue, out);

  p.start( ); c1.start( ); c2.start( ); c3.start( ); c4.start( );

  while (true) {
    // hang out for a while
  }
}

NOTE: You can test out this method by running "ant runch10". It will wait forever, though, so you'll have to break out of the program.

You'll see tons of output, as the producer fills the queue and the consumers
grab information out of it. What's cool, though, is that the processing
cycles through the four consumers, in order:

[java] Consumer 1 processing object:
[java] 'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 2 processing object:
[java] 'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 3 processing object:
[java] 'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 4 processing object:
[java] 'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 1 processing object:
[java] 'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 2 processing object:
[java] 'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 3 processing object:
[java] 'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 4 processing object:
[java] 'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'

NOTE: It's a bit of luck that these came out in order. Your results may be completely different, orderwise.

This lets you know that each consumer, once it calls take( ), simply waits there until the queue has an object for it. This could take a few seconds, or a
few days, and the threads really don't care.

There are five out-of-the-box implementations of BlockingQueue; all are
in the java.util.concurrent package:
ArrayBlockingQueue
You have to specify the capacity when you create this queue,
and like any other array, this capacity is the fixed limit. If you also
ask for fairness in the constructor, threads are served in the order that
they arrive, at the cost of somewhat reduced throughput; without it, no
such ordering is guaranteed.
LinkedBlockingQueue
This queue is based on a
linked list (duh!). While you can specify a
maximum size, it is by default unbounded.
PriorityBlockingQueue
This queue bases ordering on a
specified Comparator, and the element
returned by any take( ) call is the smallest element based on
this ordering. If you don't specify a Comparator, the natural ordering
is used (assuming the objects supplied to it implement Comparable). If
your objects don't implement Comparable, and you don't have a Comparator to supply, there's really no reason to use
PriorityBlockingQueue.
NOTE: There's a nonblocking version of PriorityBlockingQueue, java.util.PriorityQueue.
DelayQueue
DelayQueue is essentially a version
of PriorityBlockingQueue that
uses elements that implement the new java.util.concurrent.Delayed interface. Since this interface extends Comparable, it fits
right into a PriorityBlockingQueue structure. Additionally, it won't
allow an element to be grabbed with take( ) until that element's
delay has elapsed.
SynchronousQueue
This queue has a size of
zero (yes, you read that correctly). It blocks
put( ) calls until another thread calls take( ), and blocks take( ) calls until another thread calls put( ). Essentially, elements can only go directly from a producer to a consumer, and nothing is stored in the queue itself (other than for transition purposes).
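Two of the implementations above hand elements back in an order other than insertion order. The following is a minimal sketch of that behavior, not code from this chapter's examples: the OrderedQueues class, the Reminder element, and the delays of 50 and 200 milliseconds are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; every name here is illustrative.
class OrderedQueues {

    // A made-up Delayed element that becomes available delayMillis after creation.
    static class Reminder implements Delayed {
        final String text;
        final long readyAt;   // System.nanoTime( ) value at which the delay expires

        Reminder(String text, long delayMillis) {
            this.text = text;
            this.readyAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        // Remaining delay; zero or negative means the element may be taken.
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAt - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        // Delayed extends Comparable<Delayed>: shorter remaining delay sorts first.
        public int compareTo(Delayed other) {
            long diff = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
            return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
        }
    }

    // Elements come out smallest-first according to the Comparator (here, by length).
    static List<String> byLength() throws InterruptedException {
        BlockingQueue<String> q = new PriorityBlockingQueue<String>(11,
            new Comparator<String>() {
                public int compare(String a, String b) {
                    return a.length() - b.length();
                }
            });
        q.put("ccc");
        q.put("a");
        q.put("bb");
        List<String> out = new ArrayList<String>();
        while (!q.isEmpty()) {
            out.add(q.take());
        }
        return out;
    }

    // take( ) hands elements back in order of expiry, not order of insertion.
    static List<String> byDelay() throws InterruptedException {
        DelayQueue<Reminder> q = new DelayQueue<Reminder>();
        q.put(new Reminder("later", 200));
        q.put(new Reminder("sooner", 50));
        List<String> out = new ArrayList<String>();
        out.add(q.take().text);   // waits until the shorter delay has elapsed
        out.add(q.take().text);   // waits until the longer delay has elapsed
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(byLength());
        System.out.println(byDelay());
    }
}
```

In both helpers the insertion order is deliberately scrambled, so whatever order comes back is the queue's doing rather than the caller's.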
These are self-explanatory, so pick the one you need, and go forth and
code (well, after reading the rest of this chapter).
NOTE: If you're familiar at all with Ada (a programming language used most often in military defense programs), SynchronousQueue is a lot like a rendezvous channel.
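If capacity is what drives your choice, a rough sketch like the one below may help. It is only an illustration under assumed names (QueueCapacities and its methods are hypothetical, as are the capacities of 2 and 10), and it also shows the hand-off behavior of SynchronousQueue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; names and capacities are illustrative.
class QueueCapacities {

    // remainingCapacity( ) reports how many more elements fit without blocking.
    static String capacities() {
        BlockingQueue<String> array = new ArrayBlockingQueue<String>(2, true); // true = fair
        BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(10);
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
        BlockingQueue<String> sync = new SynchronousQueue<String>();
        return array.remainingCapacity() + " "
            + bounded.remainingCapacity() + " "
            + (unbounded.remainingCapacity() == Integer.MAX_VALUE) + " "
            + sync.remainingCapacity();
    }

    // With no consumer waiting in take( ), a SynchronousQueue accepts nothing.
    static boolean offerWithoutTaker() {
        return new SynchronousQueue<String>().offer("nobody home");
    }

    // A put( ) completes only when another thread is there to take( ).
    static String handOff() throws InterruptedException {
        final BlockingQueue<String> q = new SynchronousQueue<String>();
        final String[] received = new String[1];
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    received[0] = q.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        q.put("ticket");   // returns once the consumer has received the element
        consumer.join();   // after join( ), the consumer's write to received[0] is visible
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(capacities());
        System.out.println(offerWithoutTaker());
        System.out.println(handOff());
    }
}
```

Note that the "unbounded" LinkedBlockingQueue reports a remaining capacity of Integer.MAX_VALUE, while the SynchronousQueue always reports zero.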