6.10 A Multiplexed Network Client
We end this chapter with Example 6-14, a complex class named
HttpDownloadManager. As its name implies, this class is a client-side
utility that manages any number of concurrent HTTP downloads on a
background thread. To download something, just call the download( )
method, passing a java.net.URI and an optional Listener object to be
notified when the download is complete or has aborted with an error.
download( ) does not block: it returns a Download object immediately,
and you can monitor the status of the download by polling the methods
of this object. The data that is downloaded is not saved to a file,
but is available from the getData( ) method of the Download object.

The Listener and Download interfaces are defined as inner classes of
HttpDownloadManager. The Status class is another inner class: it is a
type-safe enumeration of download states returned by
Download.getStatus( ).
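
The type-safe enumeration pattern used by Status predates the enum
construct added to the language in Java 5. As a rough sketch only, and
assuming a Java 5 or later compiler, the same states could be written
as shown here. The name DownloadStatus and the isTerminal( ) helper are
hypothetical and do not appear in the example.

```java
// Hypothetical enum equivalent of the Status class in Example 6-14.
enum DownloadStatus {
    UNCONNECTED("Unconnected"),   // Not yet connected to the server
    CONNECTED("Connected"),       // Sending request or receiving response
    DONE("Done"),                 // Response received (may be an HTTP error)
    ERROR("Error");               // Something went wrong, e.g. bad hostname

    private final String label;

    DownloadStatus(String label) { this.label = label; }

    // Hypothetical helper: true once the download can no longer change state.
    boolean isTerminal() { return this == DONE || this == ERROR; }

    @Override public String toString() { return label; }
}
```

As with the hand-written class, each constant is a singleton, so states
can still be compared with ==.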
Two other inner classes used in this example are DownloadImpl, the
package-private concrete implementation of the Download interface, and
Test, a simple test program that demonstrates the usage of the
HttpDownloadManager.

HttpDownloadManager extends Thread, and the downloads are handled in
this background thread. Multiple downloads can be handled concurrently
(this is useful when the client has more bandwidth available than the
servers it is downloading from) because the thread uses a Selector to
multiplex the SocketChannel objects from which data is being read: the
select( ) call wakes up whenever data is ready to be read from one of
the pending downloads. Most of the interesting code is found in the
run( ) method, which is the body of the background thread.

We've seen basic channel multiplexing code with Selector.select( ) in
previous examples. This one demonstrates three new features. The first
is the call to wakeup( ) in the download( ) method. A new channel
cannot be registered with a Selector while that selector is blocked in
a select( ) call in the background thread. So the download( ) method
creates a DownloadImpl object containing all the information about the
download, places this object in a synchronized list of pending
downloads, and then calls wakeup( ) and returns the Download object to
the caller. The wakeup( ) call in download( ) causes the background
thread to stop blocking in select( ). In other examples, we've
immediately checked the selectedKeys( ) of a Selector when its
select( ) method returns. In this case, we first look at the
pendingDownloads list and create and register a SocketChannel for any
DownloadImpl objects found there.

The second new feature of interest in this example is that it performs
asynchronous connection. Before an HTTP GET request can be sent to a
web server, the client must connect to the server. Establishing a TCP
connection over the Internet can sometimes take a second or two, and
we don't want the background thread to block while this connection is
set up. So when the thread detects a pending download, it creates a
SocketChannel in an unconnected state. It then puts the channel into
nonblocking mode and registers it with the Selector object, indicating
that the Selector should monitor the channel for readiness to connect
as well as readiness to read. Only after registering the channel does
the thread call the connect( ) method and supply the address and port
to connect to. Since the channel is nonblocking, connect( ) returns
immediately. When the connection is ready, select( ) wakes up and the
thread calls finishConnect( ) on the channel to complete the
connection. (After completing the connection, the thread immediately
sends the HTTP GET request across the channel. It assumes that the
channel is writable and that the complete text of the request can be
written quickly; if this assumption fails, the thread will end up
busy-waiting while it repeatedly attempts to send the request to the
server.)

The third new feature demonstrated by this example is that when it
registers a SocketChannel with the Selector, it uses a three-argument
version of the register( ) call to associate the DownloadImpl object
with the SelectionKey for the channel. Then, when the key becomes
connectable or readable, the background thread can retrieve the
channel and the state object associated with the key.

Finally, this example also demonstrates the logging API of
java.util.logging. We'll discuss logging in the subsection that
follows the example code.
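
Before reading the full listing, it may help to see the wakeup( ) and
key-attachment techniques in isolation. The following is a minimal
sketch, not code from the example: it substitutes a java.nio.channels.Pipe
for a socket so that it runs without a network, and the names
WakeupSketch, Job, and transfer are hypothetical. A caller queues a Job,
calls wakeup( ), and the selector thread then registers the channel with
the Job attached to its key.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class WakeupSketch {
    // Hypothetical per-channel state, playing the role of DownloadImpl.
    static final class Job {
        final String name;
        final Pipe.SourceChannel source;
        final StringBuilder received = new StringBuilder();
        Job(String name, Pipe.SourceChannel source) {
            this.name = name;
            this.source = source;
        }
    }

    // Hands one job to a selector thread and returns "name:payload".
    static String transfer(String name, String payload) throws Exception {
        final Selector selector = Selector.open();
        final Queue<Job> pending = new ConcurrentLinkedQueue<>();
        final String[] result = new String[1];

        Thread worker = new Thread(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try {
                while (result[0] == null) {
                    selector.select();   // returns on wakeup() or readiness
                    // Register queued jobs first, attaching each Job to its key.
                    for (Job job; (job = pending.poll()) != null; ) {
                        job.source.configureBlocking(false);
                        job.source.register(selector, SelectionKey.OP_READ, job);
                    }
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();
                        Job job = (Job) key.attachment();   // recover the state
                        buffer.clear();
                        if (job.source.read(buffer) == -1) { // writer closed the pipe
                            key.cancel();
                            job.source.close();
                            result[0] = job.name + ":" + job.received;
                        } else {
                            buffer.flip();
                            job.received.append(StandardCharsets.US_ASCII.decode(buffer));
                        }
                    }
                }
            } catch (IOException e) {
                result[0] = "error: " + e;
            }
        });
        worker.start();

        Pipe pipe = Pipe.open();
        pending.add(new Job(name, pipe.source()));  // queue the job ...
        selector.wakeup();                          // ... and interrupt select()
        try (Pipe.SinkChannel sink = pipe.sink()) {
            sink.write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.US_ASCII)));
        }
        worker.join();
        selector.close();
        return result[0];
    }
}
```

Calling WakeupSketch.transfer("job-1", "hello") returns the string
job-1:hello. Because wakeup( ) also affects the next select( ) call if
none is in progress, the sketch works whether or not the worker thread
has reached select( ) when the job is queued.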
Example 6-14. HttpDownloadManager.java
package je3.nio;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
import java.util.logging.*;

/**
 * This class manages asynchronous HTTP GET downloads and demonstrates
 * non-blocking I/O with SocketChannel and Selector and also demonstrates
 * logging with the java.util.logging package. This example uses a number
 * of inner classes and interfaces.
 *
 * Call download( ) for each HTTP GET request you want to issue. You may
 * optionally pass a Listener object that will be notified when the download
 * terminates or encounters an exception. download( ) returns a Download
 * object which holds the downloaded bytes (including HTTP headers) and which
 * allows you to poll the Status of the download. Call release( ) when there
 * are no more downloads.
 */
public class HttpDownloadManager extends Thread {
    // An enumerated type. Values are returned by Download.getStatus( )
    public static class Status {
        // We haven't connected to the server yet
        public static final Status UNCONNECTED = new Status("Unconnected");
        // We're connected to the server, sending request or receiving response
        public static final Status CONNECTED = new Status("Connected");
        // Response has been received. Response may have been an HTTP error
        public static final Status DONE = new Status("Done");
        // Something went wrong: bad hostname, for example.
        public static final Status ERROR = new Status("Error");

        private final String name;
        private Status(String name) { this.name = name; }
        public String toString( ) { return name; }
    }

    // Everything you need to know about a pending download
    public interface Download {
        public String getHost( );    // Hostname we're downloading from
        public int getPort( );       // Defaults to port 80
        public String getPath( );    // includes query string as well
        public Status getStatus( );  // Status of the download
        public byte[ ] getData( );   // Download data, including response headers
        public int getHttpStatus( ); // Only call when status is DONE
    }

    // Implement this interface if you want to know when a download completes
    public interface Listener {
        public void done(Download download);
        public void error(Download download, Throwable throwable);
    }

    Selector selector;          // For multiplexing non-blocking I/O.
    ByteBuffer buffer;          // A shared buffer for downloads
    List pendingDownloads;      // Downloads that don't have a Channel yet
    // Set when the release( ) method is called. Volatile because it is
    // written by the caller's thread and read by the download thread.
    volatile boolean released = false;
    Logger log;                 // Logging output goes here

    // The HTTP protocol uses this character encoding
    static final Charset LATIN1 = Charset.forName("ISO-8859-1");

    public HttpDownloadManager(Logger log) throws IOException {
        if (log == null) log = Logger.getLogger(this.getClass( ).getName( ));
        this.log = log;
        selector = Selector.open( );                  // create Selector
        buffer = ByteBuffer.allocateDirect(64*1024);  // allocate buffer
        pendingDownloads = Collections.synchronizedList(new ArrayList( ));
        this.start( );                                // start thread
    }

    // Ask the HttpDownloadManager to begin a download. Returns a Download
    // object that can be used to poll the progress of the download. The
    // optional Listener object will be notified when the download completes
    // or aborts.
    public Download download(URI uri, Listener l)
        throws IOException
    {
        if (released)
            throw new IllegalStateException("Can't download( ) after release( )");

        // Get info from the URI
        String scheme = uri.getScheme( );
        if (scheme == null || !scheme.equals("http"))
            throw new IllegalArgumentException("Must use 'http:' protocol");
        String hostname = uri.getHost( );
        int port = uri.getPort( );
        if (port == -1) port = 80; // Use default port if none specified
        String path = uri.getRawPath( );
        if (path == null || path.length( ) == 0) path = "/";
        String query = uri.getRawQuery( );
        if (query != null) path += "?" + query;

        // Create a Download object with the pieces of the URL
        Download download = new DownloadImpl(hostname, port, path, l);

        // Add it to the list of pending downloads. This is a synchronized list
        pendingDownloads.add(download);

        // And ask the thread to stop blocking in the select( ) call so that
        // it will notice and process this new pending Download object.
        selector.wakeup( );

        // Return the Download so that the caller can monitor it if desired.
        return download;
    }

    public void release( ) {
        released = true; // The thread will terminate when it notices the flag.
        try { selector.close( ); } // This will wake the thread up
        catch(IOException e) {
            log.log(Level.SEVERE, "Error closing selector", e);
        }
    }

    public void run( ) {
        log.info("HttpDownloadManager thread starting.");

        // The download thread runs until release( ) is called
        while(!released) {
            // The thread blocks here waiting for something to happen
            try { selector.select( ); }
            catch(ClosedSelectorException e) {
                // release( ) closed the selector before we called select( )
                break;
            }
            catch(IOException e) {
                // This should never happen.
                log.log(Level.SEVERE, "Error in select( )", e);
                return;
            }

            // If release( ) was called, the thread should exit.
            if (released) break;

            // If any new Download objects are pending, deal with them first
            if (!pendingDownloads.isEmpty( )) {
                // Although pendingDownloads is a synchronized list, we still
                // need to use a synchronized block to iterate through its
                // elements, so that a concurrent call to download( ) cannot
                // modify the list while we iterate.
                synchronized(pendingDownloads) {
                    Iterator iter = pendingDownloads.iterator( );
                    while(iter.hasNext( )) {
                        // Get the pending download object from the list
                        DownloadImpl download = (DownloadImpl)iter.next( );
                        iter.remove( );  // And remove it.

                        // Now begin an asynchronous connection to the
                        // specified host and port. We don't block while
                        // waiting to connect.
                        SelectionKey key = null;
                        SocketChannel channel = null;
                        try {
                            // Open an unconnected channel
                            channel = SocketChannel.open( );
                            // Put it in non-blocking mode
                            channel.configureBlocking(false);
                            // Register it with the selector, specifying that
                            // we want to know when it is ready to connect
                            // and when it is ready to read.
                            key = channel.register(selector,
                                                   SelectionKey.OP_READ |
                                                   SelectionKey.OP_CONNECT,
                                                   download);
                            // Create the web server address
                            SocketAddress address =
                                new InetSocketAddress(download.host,
                                                      download.port);
                            // Ask the channel to start connecting
                            // Note that we don't send the HTTP request yet.
                            // We'll do that when the connection completes.
                            channel.connect(address);
                        }
                        catch(Exception e) {
                            handleError(download, channel, key, e);
                        }
                    }
                }
            }

            // Now get the set of keys that are ready for connecting or reading
            Set keys = selector.selectedKeys( );
            if (keys == null) continue; // bug workaround; should not be needed
            // Loop through the keys in the set
            for(Iterator i = keys.iterator( ); i.hasNext( ); ) {
                SelectionKey key = (SelectionKey)i.next( );
                i.remove( );  // Remove the key from the set before handling

                // Get the Download object we attached to the key
                DownloadImpl download = (DownloadImpl) key.attachment( );
                // Get the channel associated with the key.
                SocketChannel channel = (SocketChannel)key.channel( );

                try {
                    if (key.isConnectable( )) {
                        // If the channel is ready to connect, complete the
                        // connection and then send the HTTP GET request to it.
                        if (channel.finishConnect( )) {
                            download.status = Status.CONNECTED;
                            // We are connected, so from now on we only need
                            // to be told when there is data to read.
                            key.interestOps(SelectionKey.OP_READ);
                            // This is the HTTP request we send
                            String request =
                                "GET " + download.path + " HTTP/1.1\r\n" +
                                "Host: " + download.host + "\r\n" +
                                "Connection: close\r\n" +
                                "\r\n";
                            // Wrap in a CharBuffer and encode to a ByteBuffer
                            ByteBuffer requestBytes =
                                LATIN1.encode(CharBuffer.wrap(request));
                            // Send the request to the server. If the bytes
                            // aren't all written in one call, we busy loop!
                            while(requestBytes.hasRemaining( ))
                                channel.write(requestBytes);

                            log.info("Sent HTTP request: " + download.host +
                                     ":" + download.port + ": " + request);
                        }
                    }
                    if (key.isReadable( )) {
                        // If the key indicates that there is data to be read,
                        // then read it and store it in the Download object.
                        int numbytes = channel.read(buffer);

                        // If we read some bytes, store them, otherwise
                        // the download is complete and we need to note this
                        if (numbytes != -1) {
                            buffer.flip( );  // Prepare to drain the buffer
                            download.addData(buffer); // Store the data
                            buffer.clear( ); // Prepare for another read
                            log.info("Read " + numbytes + " bytes from " +
                                     download.host + ":" + download.port);
                        }
                        else {
                            // If there are no more bytes to read
                            key.cancel( );     // We're done with the key
                            channel.close( );  // And with the channel.
                            download.status = Status.DONE;
                            if (download.listener != null) // notify listener
                                download.listener.done(download);
                            log.info("Download complete from " +
                                     download.host + ":" + download.port);
                        }
                    }
                }
                catch (Exception e) {
                    handleError(download, channel, key, e);
                }
            }
        }
        log.info("HttpDownloadManager thread exiting.");
    }

    // Error-handling code used by the run( ) method:
    // set status, close channel, cancel key, log error, notify listener.
    void handleError(DownloadImpl download, SocketChannel channel,
                     SelectionKey key, Throwable throwable)
    {
        download.status = Status.ERROR;
        try {if (channel != null) channel.close( );} catch(IOException e) { }
        if (key != null) key.cancel( );
        log.log(Level.WARNING,
                "Error connecting to or downloading from " + download.host +
                ":" + download.port,
                throwable);
        if (download.listener != null)
            download.listener.error(download, throwable);
    }

    // This is the Download implementation we use internally.
    static class DownloadImpl implements Download {
        final String host;     // Final fields are immutable for thread-safety
        final int port;
        final String path;
        final Listener listener;
        volatile Status status; // Volatile fields may be changed concurrently
        volatile byte[ ] data = new byte[0];

        DownloadImpl(String host, int port, String path, Listener listener) {
            this.host = host;
            this.port = port;
            this.path = path;
            this.listener = listener;
            this.status = Status.UNCONNECTED;  // Set initial status
        }

        // These are the basic getter methods
        public String getHost( ) { return host; }
        public int getPort( ) { return port; }
        public String getPath( ) { return path; }
        public Status getStatus( ) { return status; }
        public byte[ ] getData( ) { return data; }

        /**
         * Return the HTTP status code for the download.
         * Throws IllegalStateException if status is not Status.DONE
         */
        public int getHttpStatus( ) {
            if (status != Status.DONE) throw new IllegalStateException( );
            // An HTTP 1.1 response begins "HTTP/1.1 ", so the three ASCII
            // digits of the status code are bytes 10-12 of the response,
            // which are array indexes 9-11.
            return
                (data[9] - '0') * 100 +
                (data[10]- '0') * 10 +
                (data[11]- '0') * 1;
        }

        // Used internally when we read more data.
        // This should use a larger buffer to prevent frequent re-allocation.
        void addData(ByteBuffer buffer) {
            assert status == Status.CONNECTED;  // only called during download
            int oldlen = data.length;           // How many existing bytes
            int numbytes = buffer.remaining( ); // How many new bytes
            int newlen = oldlen + numbytes;
            byte[ ] newdata = new byte[newlen]; // Create new array
            System.arraycopy(data, 0, newdata, 0, oldlen); // Copy old bytes
            buffer.get(newdata, oldlen, numbytes);         // Copy new bytes
            data = newdata;                     // Save new array
        }
    }

    // This class demonstrates a simple use of HttpDownloadManager.
    public static class Test {
        static int completedDownloads = 0;

        public static void main(String args[ ])
            throws IOException, URISyntaxException
        {
            // With a -v argument, our logger will display lots of messages.
            // Check args.length first so that running with no arguments
            // does not throw ArrayIndexOutOfBoundsException.
            final boolean verbose = args.length > 0 && args[0].equals("-v");
            int firstarg = 0;
            Logger logger = Logger.getLogger(Test.class.getName( ));

            if (verbose) {
                firstarg = 1;
                logger.setLevel(Level.INFO);
            }
            else // regular output
                logger.setLevel(Level.WARNING);

            // How many URLs are on the command line?
            final int numDownloads = args.length - firstarg;
            // Create the download manager
            final HttpDownloadManager dm = new HttpDownloadManager(logger);
            // Now loop through URLs and call download( ) for each one
            // passing a listener object to receive notifications
            for(int i = firstarg; i < args.length; i++) {
                URI uri = new URI(args[i]);
                dm.download(uri,
                    new Listener( ) {
                        public void done(Download d) {
                            System.err.println("DONE: " + d.getHost( ) +
                                               ": " + d.getHttpStatus( ));
                            // If all downloads are complete, we're done
                            // with the HttpDownloadManager thread.
                            if (++completedDownloads == numDownloads)
                                dm.release( );
                        }
                        public void error(Download d, Throwable t) {
                            System.err.println(d.getHost( ) + ": " + t);
                            if (++completedDownloads == numDownloads)
                                dm.release( );
                        }
                    });
            }
        }
    }
}
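
The getHttpStatus( ) method in Example 6-14 reads the status code from
fixed byte offsets, which assumes that the response begins with a
well-formed status line at least 12 bytes long. A more defensive
variant might parse the first line of the response instead. The
following is a minimal sketch under that assumption, not part of the
original example; the class name StatusLineSketch and the method
parseStatus are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class StatusLineSketch {
    // Returns the three-digit status code from the first line of an HTTP
    // response, or -1 if the data does not start with a status line.
    static int parseStatus(byte[] data) {
        // Find the end of the first line (CR or LF, or end of data).
        int end = 0;
        while (end < data.length && data[end] != '\r' && data[end] != '\n') end++;
        String line = new String(data, 0, end, StandardCharsets.ISO_8859_1);

        // A status line looks like: HTTP/1.1 200 OK
        String[] parts = line.split(" ", 3);
        if (parts.length < 2 || !parts[0].startsWith("HTTP/")) return -1;
        if (parts[1].length() != 3) return -1;
        for (int i = 0; i < 3; i++) {
            char c = parts[1].charAt(i);
            if (c < '0' || c > '9') return -1;   // digits only
        }
        return Integer.parseInt(parts[1]);
    }
}
```

With this approach a truncated or empty response yields -1 rather than
an ArrayIndexOutOfBoundsException, at the cost of creating a short
String for the first line.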
6.10.1 Logging in HttpDownloadManager
HttpDownloadManager uses a Logger object to
log informational and error messages. It demonstrates the
info( ) and log( ) methods of
Logger; see the Logger
documentation for descriptions of many other logging methods.
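
To make the difference between these two methods concrete, here is a
small sketch that is independent of HttpDownloadManager. It assumes a
hypothetical logger name and a hypothetical CollectingHandler class
that stores records in memory instead of printing them, so the effect
of the logging threshold can be inspected directly.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class LoggingSketch {
    // Hypothetical Handler that remembers every record published to it.
    static final class CollectingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();
        @Override public void publish(LogRecord record) {
            if (isLoggable(record)) records.add(record);
        }
        @Override public void flush() { }
        @Override public void close() { }
    }

    // Logs one INFO and one WARNING message and returns what got through.
    static List<LogRecord> demo(boolean verbose) {
        Logger logger = Logger.getLogger("sketch.LoggingSketch"); // made-up name
        CollectingHandler handler = new CollectingHandler();
        logger.setUseParentHandlers(false);   // keep the console quiet
        logger.addHandler(handler);
        // Same threshold choice that the Test class makes for its -v switch
        logger.setLevel(verbose ? Level.INFO : Level.WARNING);
        try {
            // info() logs a plain message at Level.INFO
            logger.info("thread starting");
            // log() takes an explicit level and, optionally, a Throwable
            logger.log(Level.WARNING, "download failed",
                       new IOException("connection refused"));
        } finally {
            logger.removeHandler(handler);
        }
        return handler.records;
    }
}
```

With verbose set to false only the WARNING record is collected; with
verbose set to true both records are.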
HttpDownloadManager can use a Logger passed to it, or it can obtain
its own. There are examples of obtaining a Logger object in both the
constructor and the inner Test class. The Test class also implements a
-v "verbose" switch to demonstrate how to set the logging threshold of
a Logger. Informational logging messages will be discarded unless the
-v option is specified.

The java.util.logging package allows flexible runtime configuration of
how logging is done. In most installations, the default is to print
logging messages to the console. See the file
jre/lib/logging.properties (conf/logging.properties in Java 9 and
later) for the default configuration on your installation. You can
override this default configuration with your own properties file:
define the property java.util.logging.config.file with the -D option
when you start the Java VM. For example, to run the
HttpDownloadManager test program using a logging configuration
specified in a file named log.props, you'd use a command line like
this one (it has been word wrapped to fit on the page; you'd type it
on one line):
java -Djava.util.logging.config.file=log.props
je3.nio.HttpDownloadManager\$Test -v ...urls here...
Note that we use the -v switch so that the program
actually generates log messages. Without this switch,
you'd have to purposely specify bad URLs so that the
program would log some errors. The following listing is a sample
logging configuration file, which you'll find, along
with the HttpDownloadManager source code, in a
file named log.props:
#
# A logging properties file for the HttpDownloadManager example
# See also jre/lib/logging.properties in your Java installation.
# Use this file by specifying it as the value of a property named
# java.util.logging.config.file. For example:
#
# java -Djava.util.logging.config.file=log.props je3.nio.Http...
#
# This property specifies the default logging level for log messages
# sent by our program. Note, however, that if you run the
# HttpDownloadManager$Test class, this property will be overridden by
# the presence or absence of the -v option.
je3.nio.HttpDownloadManager.level: INFO
# This property says where output should go. The default configuration
# is for it to go to the console. This property sends log messages to
# a FileHandler configured below instead
handlers=java.util.logging.FileHandler
# These properties configure the FileHandler
# See java.util.logging.FileHandler for other available properties
java.util.logging.FileHandler.pattern = %h/java%u.log
java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
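
A configuration in this format can also be loaded from within a
program, using LogManager.readConfiguration( ), rather than with the
-D option. The sketch below is an illustration only: the class name
ConfigSketch, the method configuredLevel, and the logger name used in
the usage note are hypothetical, and the properties text is supplied as
an in-memory string instead of being read from log.props.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;

final class ConfigSketch {
    // Replaces the current logging configuration with the given properties
    // text and returns the level that the named logger ends up with.
    static Level configuredLevel(String loggerName, String properties)
        throws IOException
    {
        // Properties files are read as ISO-8859-1
        byte[] bytes = properties.getBytes(StandardCharsets.ISO_8859_1);
        LogManager.getLogManager().readConfiguration(new ByteArrayInputStream(bytes));
        // A "<name>.level" property is applied to the logger of that name
        return Logger.getLogger(loggerName).getLevel();
    }
}
```

For example, configuredLevel("sketch.config.demo",
"sketch.config.demo.level = FINE\n") returns Level.FINE. Note that
readConfiguration( ) discards the previous configuration, including
the default console handler, so a real program would normally include
a handlers property as the sample file does.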