12.2 An Example Server
Clients are well and good, but
channels and buffers are really intended for server systems that need
to process many simultaneous connections efficiently. Handling
servers requires a third new piece in addition to the buffers and
channels used for the client. Specifically, you need selectors that
allow the server to find all the connections that are ready to
receive output or send input.

To demonstrate the basics, I'll implement a simple
server for the character generator protocol. When implementing a
server that takes advantage of Java 1.4's new I/O
APIs, begin by calling the static factory method
ServerSocketChannel.open() to create a new
ServerSocketChannel object:

    ServerSocketChannel serverChannel = ServerSocketChannel.open();

Initially this channel is not actually listening on any port. To bind
it to a port, retrieve its ServerSocket peer
object with the socket( ) method and then use the
bind( ) method on that peer. For example, this
code fragment binds the channel to a server socket on port 19:

    ServerSocket ss = serverChannel.socket();
    ss.bind(new InetSocketAddress(19));

As with regular server sockets, binding to port 19 requires you to be
root on Unix (including Linux and Mac OS X). Nonroot users can only
bind to ports 1024 and higher.

The server socket channel is now listening for incoming connections
on port 19. To accept one, call the
ServerSocketChannel accept() method, which returns a
SocketChannel object:

    SocketChannel clientChannel = serverChannel.accept();

On the server side, you'll definitely want to make
the client channel non-blocking to allow the server to process
multiple simultaneous connections:

    clientChannel.configureBlocking(false);

You may also want to make the
ServerSocketChannel non-blocking. By default, this
accept( ) method blocks until
there's an incoming connection, like the
accept( ) method of
ServerSocket. To change this, simply call
configureBlocking(false) before calling
accept( ):

    serverChannel.configureBlocking(false);

A non-blocking accept() returns null almost
immediately if there are no incoming connections. Be sure to check
for that or you'll get a nasty
NullPointerException when trying to use the
socket.

There are now two open channels: a server channel and a client
channel. Both need to be processed. Both can run indefinitely.
Furthermore, processing the server channel will create more open
client channels. In the traditional approach, you assign each
connection a thread, and the number of threads climbs rapidly as
clients connect. Instead, in the new I/O API, you create a
Selector that enables the program to iterate over
all the connections that are ready to be processed. To construct a
new Selector, just call the static
Selector.open( ) factory method:

    Selector selector = Selector.open();

Next you need to register each
channel with the selector that
monitors it using the channel's register() method. When registering, specify the operation
you're interested in using a named constant from the
SelectionKey class. For the server socket, the only
operation of interest is
OP_ACCEPT; that
is, is the server socket channel ready to accept a new connection?

    serverChannel.register(selector, SelectionKey.OP_ACCEPT);

For the client channels, you want to know something a little
different, specifically, whether they're ready to
have data written onto them. For this, use the
OP_WRITE constant:

    SelectionKey key = clientChannel.register(selector, SelectionKey.OP_WRITE);

Both register() calls return a
SelectionKey object. However,
we're only going to need to use that key for the
client channels, because there can be more than one of them. Each
SelectionKey has an attachment of arbitrary
Object type. This is normally used to hold an
object that indicates the current state of the connection. In this
case, we can store the buffer that the channel writes onto the
network. Once the buffer is fully drained, we'll
refill it. Fill an array with the data that will be copied into each
buffer. Rather than writing to the end of the buffer, then rewinding
to the beginning of the buffer and writing again,
it's easier just to start with two sequential copies
of the data so every line is available as a contiguous sequence in
the array:

    byte[] rotation = new byte[95*2];
    for (byte i = ' '; i <= '~'; i++) {
        rotation[i - ' '] = i;
        rotation[i + 95 - ' '] = i;
    }

Because this array will only be read from after it's
been initialized, you can reuse it for multiple channels. However,
each channel will get its own buffer filled with the contents of this
array. We'll stuff the buffer with the first 72
bytes of the rotation array, then add a carriage return/linefeed pair
to break the line. Then we'll flip the buffer so
it's ready for draining, and attach it to the
channel's key:

    ByteBuffer buffer = ByteBuffer.allocate(74);
    buffer.put(rotation, 0, 72);
    buffer.put((byte) '\r');
    buffer.put((byte) '\n');
    buffer.flip();
    key.attach(buffer);

To check whether anything is ready to be acted on, call the
selector's select() method. For a long-running server, this
normally goes in an infinite loop:

    while (true) {
        selector.select();
        // process selected keys...
    }

Assuming the selector does find a ready channel, its
selectedKeys() method returns a java.util.Set containing one
SelectionKey object for each ready channel.
Otherwise it returns an empty set. In either case, you can loop
through this with a java.util.Iterator:

    Set readyKeys = selector.selectedKeys();
    Iterator iterator = readyKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = (SelectionKey) (iterator.next());
        // Remove key from set so we don't process it twice
        iterator.remove();
        // operate on the channel...
    }

Removing the key from the set tells the selector that
we've dealt with it, and the
Selector doesn't need to keep
giving it back to us every time we call select().
The Selector will add the channel back into the
ready set when select() is called again if the
channel becomes ready again. It's really important
to remove the key from the ready set here, though.

If the ready channel is the server channel, the program accepts a new
socket channel and adds it to the selector. If the ready channel is a
socket channel, the program writes as much of the buffer as it can
onto the channel. If no channels are ready, the selector waits for
one. One thread, the main thread, processes multiple simultaneous
connections.

In this case, it's easy to tell whether a client or
a server channel has been selected because the server channel will
only be ready for accepting and the client channels will only be
ready for writing. Both of these are I/O operations, and both can
throw IOExceptions for a variety of reasons, so
you'll want to wrap this all in a
try block.
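
To watch this dispatch happen without a real client, the following sketch runs two rounds of select() against a loopback connection in a single thread. It is an illustration rather than part of the chargen server: the class name ReadinessDemo, the use of port 0 (any free port, so root is not needed), and the five-second timeout are all assumptions made for the example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ReadinessDemo {

    // Runs two rounds of select() and records which kind of readiness
    // each selected key reported.
    static List<String> run() throws IOException {
        List<String> events = new ArrayList<>();
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the operating system for any free port.
            server.socket().bind(new InetSocketAddress(loopback, 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = server.socket().getLocalPort();

            // A blocking client in the same thread. The operating system
            // queues the connection until the server calls accept().
            try (SocketChannel peer =
                     SocketChannel.open(new InetSocketAddress(loopback, port))) {
                for (int round = 0; round < 2; round++) {
                    selector.select(5000); // give up after 5 seconds instead of hanging
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isAcceptable()) {
                            ServerSocketChannel ready = (ServerSocketChannel) key.channel();
                            SocketChannel client = ready.accept();
                            if (client == null) continue; // non-blocking accept may return null
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_WRITE);
                            events.add("accept");
                        } else if (key.isWritable()) {
                            events.add("write");
                            key.channel().close(); // closing the channel also cancels its key
                        }
                    }
                }
            }
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(run());
    }
}
```

The first round can only report the server channel, because the client channel is not registered until accept() has returned. The second round then reports the new client channel as writable.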

    try {
        if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel connection = server.accept();
            connection.configureBlocking(false);
            connection.register(selector, SelectionKey.OP_WRITE);
            // set up the buffer for the client...
        }
        else if (key.isWritable()) {
            SocketChannel client = (SocketChannel) key.channel();
            // write data to client...
        }
    }

Writing the data onto the channel is easy. Retrieve the
key's attachment, cast it to
ByteBuffer, and call hasRemaining() to check whether there's any unwritten
data left in the buffer. If there is, write it. Otherwise, refill the
buffer with the next line of data from the
rotation array and write that.
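
The buffer bookkeeping behind hasRemaining() can be tried without any network at all. The sketch below is only an illustration: BufferStateDemo, fullLine, and pretendWrite are hypothetical names, and advancing the position by hand stands in for a non-blocking write() that accepts only part of the buffer.

```java
import java.nio.ByteBuffer;

class BufferStateDemo {

    // Builds a 74-byte line (72 filler bytes plus CR/LF), flipped for draining.
    static ByteBuffer fullLine() {
        ByteBuffer buffer = ByteBuffer.allocate(74);
        for (int i = 0; i < 72; i++) {
            buffer.put((byte) 'x'); // filler; the real server copies from rotation
        }
        buffer.put((byte) '\r');
        buffer.put((byte) '\n');
        buffer.flip();              // position = 0, limit = 74
        return buffer;
    }

    // Hypothetical stand-in for channel.write(buffer) accepting n bytes.
    static void pretendWrite(ByteBuffer buffer, int n) {
        buffer.position(buffer.position() + n);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = fullLine();
        pretendWrite(buffer, 30);                   // a partial write
        System.out.println(buffer.hasRemaining());  // true: 44 bytes still unsent
        pretendWrite(buffer, 44);                   // the rest goes out
        System.out.println(buffer.hasRemaining());  // false: time to refill
        buffer.rewind();                            // position = 0, limit still 74
        System.out.println(buffer.position() + " " + buffer.limit()); // 0 74
    }
}
```

Because rewind() resets the position but leaves the limit at 74, the buffer has room for exactly one more line before the next flip().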

    ByteBuffer buffer = (ByteBuffer) key.attachment();
    if (!buffer.hasRemaining()) {
        // Refill the buffer with the next line
        // Figure out where the last line started
        buffer.rewind();
        int first = buffer.get();
        // Increment to the next character
        buffer.rewind();
        int position = first - ' ' + 1;
        buffer.put(rotation, position, 72);
        buffer.put((byte) '\r');
        buffer.put((byte) '\n');
        buffer.flip();
    }
    client.write(buffer);

The algorithm that figures out where to grab the next line of data
relies on the characters being stored in the
rotation array in ASCII order. It should be
familiar to anyone who learned C from Kernighan and Ritchie, but for
the rest of us it needs a little explanation. buffer.get() reads the
first byte of data from the buffer. From this
number we subtract the space character (32) because
that's the first character in the
rotation array. This tells us which index in the
array the buffer currently starts at. We add 1 to find the start of
the next line and refill the buffer. Because the array holds two
copies of the character set, the 72 bytes starting at any index from
1 through 95 are always available.

In the chargen protocol, the server never closes the connection. It
waits for the client to break the socket. When this happens, an
exception will be thrown. Cancel the key and close the corresponding
channel:

    catch (IOException ex) {
        key.cancel();
        try {
            // You can still get the channel from the key after cancelling the key.
            key.channel().close();
        }
        catch (IOException cex) {
        }
    }

Example 12-2 puts this all together in a complete
chargen
server that processes multiple connections efficiently in a single
thread.
Example 12-2. A non-blocking chargen server

    import java.nio.*;
    import java.nio.channels.*;
    import java.net.*;
    import java.util.*;
    import java.io.IOException;

    public class ChargenServer {

        public static int DEFAULT_PORT = 19;

        public static void main(String[] args) {

            int port;
            try {
                port = Integer.parseInt(args[0]);
            }
            catch (Exception ex) {
                port = DEFAULT_PORT;
            }
            System.out.println("Listening for connections on port " + port);

            byte[] rotation = new byte[95*2];
            for (byte i = ' '; i <= '~'; i++) {
                rotation[i - ' '] = i;
                rotation[i + 95 - ' '] = i;
            }

            ServerSocketChannel serverChannel;
            Selector selector;
            try {
                serverChannel = ServerSocketChannel.open();
                ServerSocket ss = serverChannel.socket();
                InetSocketAddress address = new InetSocketAddress(port);
                ss.bind(address);
                serverChannel.configureBlocking(false);
                selector = Selector.open();
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            }
            catch (IOException ex) {
                ex.printStackTrace();
                return;
            }

            while (true) {

                try {
                    selector.select();
                }
                catch (IOException ex) {
                    ex.printStackTrace();
                    break;
                }

                Set readyKeys = selector.selectedKeys();
                Iterator iterator = readyKeys.iterator();
                while (iterator.hasNext()) {

                    SelectionKey key = (SelectionKey) iterator.next();
                    iterator.remove();
                    try {
                        if (key.isAcceptable()) {
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();
                            SocketChannel client = server.accept();
                            System.out.println("Accepted connection from " + client);
                            client.configureBlocking(false);
                            SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
                            ByteBuffer buffer = ByteBuffer.allocate(74);
                            buffer.put(rotation, 0, 72);
                            buffer.put((byte) '\r');
                            buffer.put((byte) '\n');
                            buffer.flip();
                            key2.attach(buffer);
                        }
                        else if (key.isWritable()) {
                            SocketChannel client = (SocketChannel) key.channel();
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            if (!buffer.hasRemaining()) {
                                // Refill the buffer with the next line
                                buffer.rewind();
                                // Get the old first character
                                int first = buffer.get();
                                // Get ready to change the data in the buffer
                                buffer.rewind();
                                // Find the new first character's position in rotation
                                int position = first - ' ' + 1;
                                // Copy the data from rotation into the buffer
                                buffer.put(rotation, position, 72);
                                // Store a line break at the end of the buffer
                                buffer.put((byte) '\r');
                                buffer.put((byte) '\n');
                                // Prepare the buffer for writing
                                buffer.flip();
                            }
                            client.write(buffer);
                        }
                    }
                    catch (IOException ex) {
                        key.cancel();
                        try {
                            key.channel().close();
                        }
                        catch (IOException cex) {}
                    }

                }

            }

        }

    }

This example only uses one thread. There are situations where you
might still want to use multiple threads, especially if different
operations have different priorities. For instance, you might want to
accept new connections in one high priority thread and service
existing connections in a lower priority thread. However,
you're no longer required to have a 1:1 ratio
between threads and connections, which dramatically improves the
scalability of servers written in Java.

It may also be important to use multiple threads for maximum
performance. Multiple threads allow the server to take advantage of
multiple CPUs. Even with a single CPU, it's often a
good idea to separate the accepting thread from the processing
threads. The thread pools discussed in Chapter 5 are still relevant even with the new I/O
model. The thread that accepts the connections can add the
connections it's accepted into the queue for
processing by the threads in the pool. This is still faster than
doing the same thing without selectors because select() ensures you're never wasting any time on
connections that aren't ready to receive data. On
the other hand, the synchronization issues here are quite tricky, so
don't attempt this solution until profiling proves
there is a bottleneck.
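
One common way to arrange this kind of hand-off is for the accepting thread to queue each new channel and wake the selector, so that only the selecting thread ever registers channels. The sketch below takes that approach with a single writer thread. It is not from the original example: the class HandOffSketch, its method names, the one-line "hello" payload, and closing each connection after a single line are all simplifications chosen to keep the illustration short.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class HandOffSketch {

    // Hypothetical payload; a chargen server would attach its rotating buffer instead.
    static final byte[] LINE = "hello\r\n".getBytes(StandardCharsets.US_ASCII);

    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
    private final Selector selector = Selector.open();
    private final ServerSocketChannel server = ServerSocketChannel.open();

    HandOffSketch() throws IOException {
        // Port 0 picks any free port, so the sketch runs without root access.
        server.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
    }

    int port() {
        return server.socket().getLocalPort();
    }

    // Accepting thread: a blocking accept(), then hand the channel over.
    void acceptLoop() {
        try {
            while (true) {
                SocketChannel client = server.accept();
                client.configureBlocking(false);
                pending.add(client);
                selector.wakeup(); // make select() return so the channel gets registered
            }
        } catch (IOException ex) {
            // server channel closed or failed: stop accepting
        }
    }

    // Selecting thread: registers queued channels, then services ready keys.
    void writeLoop() {
        try {
            while (true) {
                selector.select();
                SocketChannel accepted;
                while ((accepted = pending.poll()) != null) {
                    accepted.register(selector, SelectionKey.OP_WRITE, ByteBuffer.wrap(LINE));
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    service(key);
                }
            }
        } catch (IOException | ClosedSelectorException ex) {
            // selector closed or failed: stop servicing
        }
    }

    private void service(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        try {
            if (key.isValid() && key.isWritable()) {
                client.write(buffer);
            }
            if (!buffer.hasRemaining()) {
                client.close(); // one line per connection in this sketch
            }
        } catch (IOException ex) {
            key.cancel();
            try { client.close(); } catch (IOException ignored) { }
        }
    }

    public static void main(String[] args) throws IOException {
        HandOffSketch sketch = new HandOffSketch();
        new Thread(sketch::acceptLoop, "acceptor").start();
        new Thread(sketch::writeLoop, "writer").start();
        System.out.println("Listening on loopback port " + sketch.port());
    }
}
```

The wakeup() call matters: without it, a selector already blocked in select() would not notice the queued channel until some other key became ready. Keeping every register() call on the selecting thread also sidesteps one of the trickier interactions, since registering from a second thread while select() is in progress can block on some implementations.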