Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / Java

AsynchronousSocketChannel Concurrent Writes

0.00/5 (No votes)
28 Apr 2014CPOL 10.5K  
AsynchronousSocketChannel concurrent writes

Introduction

Java 7 asynchronous socket API is simple, but does not look like a complete solution. The main question is how the completion handler should look like. Another problem is that it is not possible to start new read/write operation while the last read/write operation is not complete, and it would be nice to manage it concurrently.

Using the Code

Here is a class solving the problems described above for the write operations, moreover it can be used concurrently. It contains a queue for all pending write requests. We cannot use any concurrent.* containers because none of them has important functionality needed here. But still, lock-free implementation is still possible. Let me know if it is interesting. :)

Java
public static class AsynchronousSocketChannelWrapper
{
    private final AsynchronousSocketChannel asynchronousSocketChannel;
    private final Handler handler;
    private final ReentrantLock lock;
    private final Queue<ByteBuffer> queue;
    private final ByteBuffer [] iov;
    private boolean closed;

    private class Handler implements CompletionHandler<Long, Integer>
    {
        public void completed( Long result, Integer attachment )
        {
            /* Called when the write operation completed. */
            int iovc = 0;
            lock.lock();
            try {
                /* Remove all sent buffers from the queue. */
                int idx = 0;
                for (;;) {
                    final ByteBuffer byteBuffer = queue.peek();
                    assert( byteBuffer == iov[idx] );
                    if (byteBuffer.remaining() > 0) {
                        /* Nobody knows will it happen or not,
                         * let's assume will not.
                         */
                        assert( false );
                    }

                    iov[idx] = null;
                    queue.poll();

                    if ((++idx == iov.length) || (iov[idx] == null)) {
                        break;
                    }
                }

                if (queue.isEmpty())
                    return;

                /* Queue is not empty, let's schedule new write requests.
                 * Would be stupid to schedule them one by one if more than one,
                 * let's join them though by 16.
                 */
                final Iterator<ByteBuffer> it = queue.iterator();
                do {
                    iov[iovc] = it.next();
                    if (++iovc == iov.length)
                        break;
                }
                while (it.hasNext());
            }
            finally {
                lock.unlock();
            }

            assert( iovc > 0 );
            asynchronousSocketChannel.write(
                    iov, 0, iovc, 0, TimeUnit.SECONDS, null, this );
        }

        public void failed( Throwable exc, Integer attachment )
        {
            /* Called when the write operation failed,
             * most probably the underlying socket is being closed.
             */
            lock.lock();
            try {
                closed = true;
                queue.clear();
            }
            finally {
                lock.unlock();
            }
        }
    }

    public AsynchronousSocketChannelWrapper(
            AsynchronousSocketChannel asynchronousSocketChannel )
    {
        this.asynchronousSocketChannel = asynchronousSocketChannel;
        this.handler = new Handler();
        this.lock = new ReentrantLock();
        this.queue = new LinkedList<ByteBuffer>();
        this.iov = new ByteBuffer[16];
    }

    public boolean write( ByteBuffer byteBuffer )
    {
        lock.lock();
        try {
            if (closed)
                return false;

            final boolean wasEmpty = queue.isEmpty();
            queue.add( byteBuffer );

            if (!wasEmpty)
                return true;
        }
        finally {
            lock.unlock();
        }

        iov[0] = byteBuffer;
        asynchronousSocketChannel.write(
                iov, 0, 1, 0, TimeUnit.SECONDS, null, handler );

        return true;
    }
}

How to Use?

Just wrap the AsynchronousSocketChannel object with instance of this class and use its write() function for write operations. write() function returns true if the operation was enqueued for writing, false if underlying socket channel is closed.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)