Java 1.5 Tiger: A Developer's Notebook

David Flanagan, Brett McLaughlin

10.3 Using Blocking Queues

In Chapter 1, you saw that Tiger introduced a new collection type, java.util.Queue. This interface has several implementations, such as DelayQueue and PriorityQueue. However, the basic Queue methods all assume that there is sufficient room in the queue for adding elements or, at the least, report an error when there isn't room available.

True queues, though, often involve a wait period, where an element (or person; think of a line for a concert) waits in place until an opening is available. The same is true for removal: another thread (or unruly ticket buyer) shouldn't be able to jump in front if there are already threads waiting to peel off the next item in the queue. Fortunately, the guys at Sun realized this is an important threading concept (or maybe just spent a lot of time trying to see Dave Matthews recently). In either case, the end result is java.util.concurrent.BlockingQueue. This interface defines a means of blocking a thread on a put until there is room in the queue, or on a take until there is an element to remove. As an added bonus, I'll even drop the concert ticket analogy now.

10.3.1 How do I do that?

The Queue interface defines the offer( ) method for adding elements to the queue, and the poll( ) method for removing elements. offer( ) should be used instead of add( ) (defined in Collection), because it returns a boolean value indicating whether the addition was successful (false meaning that the queue was full). In the same fashion, poll( ) simply returns null if the queue is empty. However, neither of these methods waits: offer( ) doesn't wait for space to become available, and poll( ) doesn't wait for an element to arrive.
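To make the difference between these calls concrete, here's a minimal sketch. It assumes a queue with room for exactly one element (an ArrayBlockingQueue, chosen only because it's bounded); the class name OfferPollDemo is made up for illustration and isn't part of the book's sample code.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class OfferPollDemo {

    // Runs each call against a queue with room for exactly one element
    // and reports the results as a comma-separated string.
    static String run() {
        Queue<String> q = new ArrayBlockingQueue<String>(1);

        boolean first = q.offer("a");    // there is room, so this succeeds
        boolean second = q.offer("b");   // queue is full: returns false at once

        String addResult;
        try {
            q.add("c");                  // same full queue, but add() throws
            addResult = "added";
        } catch (IllegalStateException e) {
            addResult = "IllegalStateException";
        }

        String head = q.poll();          // removes and returns "a"
        String none = q.poll();          // queue is empty: returns null at once

        return first + "," + second + "," + addResult + "," + head + "," + none;
    }

    public static void main(String[] args) {
        // prints: true,false,IllegalStateException,a,null
        System.out.println(run());
    }
}
```

None of these calls ever pauses the calling thread; each one reports success or failure immediately.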

java.util.concurrent.BlockingQueue is an interface that extends Queue, and adds (among other things) two blocking methods: put( ), which waits until there is room for the new element, and take( ), which waits until there is an element to remove. This is one of those cases where a code sample is worth a thousand words, so take a look at Example 10-3. This represents one of the classic uses of a queue, in a producer/consumer relationship.

NOTE

"add( )" throws an unchecked exception if the queue is full, which isn't really appropriate, as a full queue isn't an exceptional condition.

Example 10-3. A producer for a BlockingQueue
package com.oreilly.tiger.ch10;

import java.io.PrintStream;
import java.util.Date;
import java.util.concurrent.BlockingQueue;

public class Producer extends Thread {

  private BlockingQueue<String> q;
  private PrintStream out;

  public Producer(BlockingQueue<String> q, PrintStream out) {
    setName("Producer");
    this.q = q;
    this.out = out;
  }

  public void run() {
    try {
      while (true) {
        // put() blocks while the queue is full
        q.put(produce());
      }
    } catch (InterruptedException e) {
      out.printf("%s interrupted: %s%n", getName(), e.getMessage());
    }
  }

  // Spins until a random draw succeeds, then returns a timestamped message
  private String produce() {
    while (true) {
      double r = Math.random();
      // Only goes forward 1/10 of the time
      if ((r * 100) < 10) {
        return String.format("Inserted at %tc", new Date());
      }
    }
  }
}

Example 10-4 is the consumer half of the relationship.

Example 10-4. Consumer for a BlockingQueue
package com.oreilly.tiger.ch10;

import java.io.PrintStream;
import java.util.concurrent.BlockingQueue;

public class Consumer extends Thread {

  private BlockingQueue<String> q;
  private PrintStream out;

  public Consumer(String name, BlockingQueue<String> q,
                  PrintStream out) {
    setName(name);
    this.q = q;
    this.out = out;
  }

  public void run() {
    try {
      while (true) {
        // take() blocks while the queue is empty
        process(q.take());
      }
    } catch (InterruptedException e) {
      out.printf("%s interrupted: %s%n", getName(), e.getMessage());
    }
  }

  private void process(Object obj) {
    out.printf("%s processing object:%n           '%s'%n",
               getName(), obj.toString());
  }
}

Finally, here's a sample usage:

public void testQueue(PrintStream out) throws IOException {
  // Bounded at 10 elements, so the producer blocks when it gets ahead
  BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
  Producer p = new Producer(queue, out);
  Consumer c1 = new Consumer("Consumer 1", queue, out);
  Consumer c2 = new Consumer("Consumer 2", queue, out);
  Consumer c3 = new Consumer("Consumer 3", queue, out);
  Consumer c4 = new Consumer("Consumer 4", queue, out);
  p.start();
  c1.start();
  c2.start();
  c3.start();
  c4.start();
  while (true) {
    // hang out for a while
  }
}

NOTE

You can test out this method by running "ant runch10". It will wait forever, though, so you'll have to break out of the program.
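If you'd rather have a run that ends on its own, one common approach (not the one the book's sample takes) is to push an end-of-work marker through the queue, one per consumer. The sketch below assumes this technique; the class name BoundedRun and the STOP marker are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedRun {

    // Hypothetical end-of-work marker; not part of the book's examples.
    private static final String STOP = "STOP";

    // Produces `items` strings, consumes them on `consumers` threads,
    // and returns everything the consumers processed.
    static List<String> run(int items, int consumers) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
        final List<String> processed =
            Collections.synchronizedList(new ArrayList<String>());

        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            String s = queue.take();   // blocks while empty
                            if (STOP.equals(s)) {
                                return;                // one marker per consumer
                            }
                            processed.add(s);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "Consumer " + (i + 1));
            workers[i].start();
        }

        try {
            for (int i = 0; i < items; i++) {
                queue.put("item " + i);                // blocks while full
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(STOP);
            }
            for (Thread t : workers) {
                t.join();                              // wait for a clean exit
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(25, 4).size() + " items processed");
    }
}
```

Because the queue is first-in, first-out, every real item is taken before any marker is, so no work is lost when the consumers shut down.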

You'll see tons of output, as the producer fills the queue and the consumers grab information out of it. What's cool, though, is that in this run the processing cycles through the four consumers, in order:

[java] Consumer 1 processing object:
[java]          'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 2 processing object:
[java]          'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 3 processing object:
[java]          'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 4 processing object:
[java]          'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 1 processing object:
[java]          'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 2 processing object:
[java]          'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 3 processing object:
[java]          'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'
[java] Consumer 4 processing object:
[java]          'Inserted at Tue May 04 08:43:50 GMT-06:00 2004'

This lets you know that each consumer, once it calls take( ), simply blocks until it gets an object, rather than spinning or polling. A blocked consumer doesn't hold a lock on the queue while it waits; it sleeps until a put( ) from the producer wakes it up. This could take a few seconds, or a few days, and the threads really don't care.

NOTE

It's a bit of luck that these came out in order. Your results may be completely different, orderwise.

There are five out-of-the-box implementations of BlockingQueue; all are in the java.util.concurrent package:

ArrayBlockingQueue

You have to specify the capacity when you create this queue and, like any other array, that capacity is a fixed limit. If you pass true for the constructor's optional fair argument, waiting threads are served in the order that they arrive, at the cost of somewhat reduced throughput as compared to other implementations. Without that flag, no particular order is guaranteed.

LinkedBlockingQueue

This queue is based on a linked list (duh!). While you can specify a maximum size, by default the capacity is Integer.MAX_VALUE, which makes the queue effectively unbounded.

PriorityBlockingQueue

This queue bases ordering on a specified Comparator, and the element returned by any take( ) call is the smallest element based on this ordering. If you don't specify a Comparator, the natural ordering is used (assuming the objects supplied to it implement Comparable). If your objects don't implement Comparable, and you don't have a Comparator to supply, the queue has no way to compare them (you'll get a ClassCastException when it tries), so there's really no reason to use PriorityBlockingQueue.

NOTE

There's a nonblocking version of PriorityBlockingQueue, java.util.PriorityQueue.

DelayQueue

DelayQueue is essentially a version of PriorityBlockingQueue that uses elements that implement the new java.util.concurrent.Delayed interface. Since this interface extends Comparable, it fits right into a PriorityBlockingQueue structure. Additionally, it won't allow an element to be grabbed with take( ) until that element's delay has elapsed.

SynchronousQueue

This queue has a size of zero (yes, you read that correctly). It blocks put( ) calls until another thread calls take( ), and blocks take( ) calls until another thread calls put( ). Essentially, elements can only go directly from a producer to a consumer, and nothing is stored in the queue itself (other than for transition purposes).
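Going back to the two ordered queues in this list, here's a small sketch of how PriorityBlockingQueue and DelayQueue decide what comes out next. The Reminder class and the delay values are invented for illustration, and the priority half uses poll( ) instead of take( ) only to avoid the InterruptedException handling; both return the same head element.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OrderedQueues {

    // Hypothetical element type for the DelayQueue half of the sketch.
    static class Reminder implements Delayed {
        final String text;
        final long dueNanos;

        Reminder(String text, long delayMillis) {
            this.text = text;
            this.dueNanos = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        // Remaining delay; zero or negative means "ready to be taken".
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueNanos - System.nanoTime(),
                                TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            long diff = getDelay(TimeUnit.NANOSECONDS)
                - other.getDelay(TimeUnit.NANOSECONDS);
            return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
        }
    }

    // Elements come out in natural (alphabetical) order, not insertion order.
    static String priorityOrder() {
        BlockingQueue<String> q = new PriorityBlockingQueue<String>();
        q.offer("pear");
        q.offer("apple");
        q.offer("fig");
        return q.poll() + "," + q.poll() + "," + q.poll();
    }

    // Nothing is available until a delay expires; then the soonest goes first.
    static String delayOrder() {
        DelayQueue<Reminder> q = new DelayQueue<Reminder>();
        q.offer(new Reminder("later", 500));
        q.offer(new Reminder("sooner", 250));
        boolean nothingReadyYet = (q.poll() == null);
        try {
            String first = q.take().text;    // waits roughly 250 ms
            String second = q.take().text;   // waits roughly 250 ms more
            return nothingReadyYet + "," + first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(priorityOrder());
        System.out.println(delayOrder());
    }
}
```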

These are self-explanatory, so pick the one you need, and go forth and code (well, after reading the rest of this chapter).
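If capacity is what drives your choice, remainingCapacity( ) (a BlockingQueue method) gives a quick way to compare the implementations. The following is only a sketch; the class name Capacities and the describe( ) helper are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class Capacities {

    // Hypothetical helper: reports how much room an empty queue offers.
    static String describe(BlockingQueue<String> q) {
        int room = q.remainingCapacity();
        return q.getClass().getSimpleName() + ": "
            + (room == Integer.MAX_VALUE ? "unbounded" : String.valueOf(room));
    }

    public static void main(String[] args) {
        // Fixed capacity of 10; the second argument asks for fair ordering
        System.out.println(describe(new ArrayBlockingQueue<String>(10, true)));
        // No capacity given, so the limit is Integer.MAX_VALUE
        System.out.println(describe(new LinkedBlockingQueue<String>()));
        // Optional bound supplied
        System.out.println(describe(new LinkedBlockingQueue<String>(10)));
        // Always unbounded, so put() never blocks
        System.out.println(describe(new PriorityBlockingQueue<String>()));
        // Holds nothing at all
        System.out.println(describe(new SynchronousQueue<String>()));
    }
}
```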

NOTE

If you're familiar at all with Ada (a programming language used most often in military defense programs), SynchronousQueue is a lot like a rendezvous channel.
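Here's a rough sketch of that handoff behavior, assuming one producer thread and a consumer running on the calling thread. The class name Handoff is made up for illustration.

```java
import java.util.concurrent.SynchronousQueue;

public class Handoff {

    static String run() {
        final SynchronousQueue<String> q = new SynchronousQueue<String>();

        // No consumer is waiting yet, and there is nowhere to store the
        // element, so a non-blocking offer() is refused.
        boolean acceptedEarly = q.offer("too early");

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    q.put("hello");          // blocks until someone takes it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "Producer");
        producer.start();

        try {
            String got = q.take();           // meets the producer's put()
            producer.join();
            // size() is always zero: nothing is ever stored in the queue
            return acceptedEarly + "," + got + "," + q.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // prints: false,hello,0
        System.out.println(run());
    }
}
```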