Introduction
Java 7 asynchronous socket API is simple, but does not look like a complete solution. The main question is how the completion handler should look like. Another problem is that it is not possible to start new read/write operation while the last read/write operation is not complete, and it would be nice to manage it concurrently.
Using the Code
Here is a class solving the problems described above for the write
operations, moreover it can be used concurrently. It contains a queue for all pending write
requests. We cannot use any concurrent.* containers because none of them has important functionality needed here. But still, lock-free implementation is still possible. Let me know if it is interesting. :)
public static class AsynchronousSocketChannelWrapper
{
private final AsynchronousSocketChannel asynchronousSocketChannel;
private final Handler handler;
private final ReentrantLock lock;
private final Queue<ByteBuffer> queue;
private final ByteBuffer [] iov;
private boolean closed;
private class Handler implements CompletionHandler<Long, Integer>
{
public void completed( Long result, Integer attachment )
{
int iovc = 0;
lock.lock();
try {
int idx = 0;
for (;;) {
final ByteBuffer byteBuffer = queue.peek();
assert( byteBuffer == iov[idx] );
if (byteBuffer.remaining() > 0) {
assert( false );
}
iov[idx] = null;
queue.poll();
if ((++idx == iov.length) || (iov[idx] == null)) {
break;
}
}
if (queue.isEmpty())
return;
final Iterator<ByteBuffer> it = queue.iterator();
do {
iov[iovc] = it.next();
if (++iovc == iov.length)
break;
}
while (it.hasNext());
}
finally {
lock.unlock();
}
assert( iovc > 0 );
asynchronousSocketChannel.write(
iov, 0, iovc, 0, TimeUnit.SECONDS, null, this );
}
public void failed( Throwable exc, Integer attachment )
{
lock.lock();
try {
closed = true;
queue.clear();
}
finally {
lock.unlock();
}
}
}
public AsynchronousSocketChannelWrapper(
AsynchronousSocketChannel asynchronousSocketChannel )
{
this.asynchronousSocketChannel = asynchronousSocketChannel;
this.handler = new Handler();
this.lock = new ReentrantLock();
this.queue = new LinkedList<ByteBuffer>();
this.iov = new ByteBuffer[16];
}
public boolean write( ByteBuffer byteBuffer )
{
lock.lock();
try {
if (closed)
return false;
final boolean wasEmpty = queue.isEmpty();
queue.add( byteBuffer );
if (!wasEmpty)
return true;
}
finally {
lock.unlock();
}
iov[0] = byteBuffer;
asynchronousSocketChannel.write(
iov, 0, 1, 0, TimeUnit.SECONDS, null, handler );
return true;
}
}
How to Use?
Just wrap the AsynchronousSocketChannel
object with instance of this class and use its write()
function for write
operations. write()
function returns true
if the operation was enqueued for writing, false
if underlying socket channel is closed.