Netty versus blocking socket performance

I have a simple standalone client-server test in which a client sends 500 bytes to a server and the server returns 2000 bytes back in response. It runs in a loop and I print time for every 50000 of such request/response calls. I compare performance of three implementations based on a blocking socket API, Netty and NIO2. The test shows that blocking socket performs significantly faster than Netty or NIO2. I understand that there is no concurrency in this test, which NIO was designed for. Still, does this difference in performance have an explanation or do I do something very inefficiently? Is there any way to improve Netty-based code to achieve performance close to blocking socket? I tried using a direct buffer for reading – no significant difference.

Tests were run with java 1.7.0_55 on two Linux servers in a gigabit network. Results of first four readings from these tests, in milliseconds, were:

  • Blocking: 9754, 9307, 9305
  • Netty: 14879, 11872, 11781
  • NIO2: 14474, 12117, 12149
  • Another mystery is that Netty and NIO2 implementations run slow at the beginning and then stabilize. In a Netty case, stabilization happens after about 10000 cycles.

    Below is the source code.

    Config.java - used by all three implementations

    public class Config {
        static final String HOST = "192.168.1.121";
        static final int PORT = 10000;
    
        static int requestLength = 500;
        static int responseLength = 2000;
        static int numOfCalls = 50000;
    
        static byte[] request = new byte[requestLength];
        static byte[] response = new byte[responseLength];
    }
    

    BlockingClient.java

    public class BlockingClient {
    
        public static void main(String[] args) {
            Socket socket = null;
            try {
                socket = new Socket(Config.HOST, Config.PORT);
    
                InputStream is = socket.getInputStream();
                OutputStream os = socket.getOutputStream();
                int callCount = 0;
    
                long startTime = System.currentTimeMillis();
    
                while (true) {
                    os.write(Config.request);
                    read(is, Config.response);
                    callCount++;
                    if (callCount == Config.numOfCalls) {
                        System.out.println("numOfcalls=" + Config.numOfCalls + " time: " + (System.currentTimeMillis() - startTime));
                        callCount = 0;
                        startTime = System.currentTimeMillis();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void read(InputStream is, byte[] bytes) throws IOException {
            int num = 0;
            while(num < bytes.length) {
                num += is.read(bytes, num, bytes.length - num);
            }
        }
    
    }
    

    BlockingServer.java

    public class BlockingServer {
    
        public static void main(String[] args) {
            try {
                ServerSocket srvSocket = new ServerSocket(Config.PORT);
    
                while (true) {
                    final Socket socket = srvSocket.accept();
    
                    new Thread() {
                        @Override
                        public void run() {
                            try {
                                InputStream is = socket.getInputStream();
                                OutputStream os = socket.getOutputStream();
                                while (true) {
                                    BlockingClient.read(is, Config.request);
                                    os.write(Config.response);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            } finally {
                                try {
                                    socket.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }.start();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    }
    

    NettyClient.java

        public final class NettyClient {
    
            public static void main(String[] args) throws Exception {
                EventLoopGroup group = new NioEventLoopGroup();
                try {
                    Bootstrap b = new Bootstrap();
                    b.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(
                                    new NettyClientHandler());
                        }
                     });
    
                    b.connect(Config.HOST, Config.PORT).sync().channel().closeFuture().sync();
    
                } finally {
                    group.shutdownGracefully();
                }
            }
        }
    

    NettyClientHandler.java

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        private static ByteBuf responseBuf = Unpooled.wrappedBuffer(Config.response).clear();
        //private static ByteBuf responseBuf = Unpooled.directBuffer(Config.responseLength).clear();
    
        private int readLen = 0;
        private int callCount = 0;
        private long startTime;
        private long chunks = 0;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            // Send the first message
            initLog();
            writeRequest(ctx);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
    
            int received = buf.readableBytes();
            responseBuf.writeBytes(buf);
            readLen += received;
            chunks++;
    
            if (readLen == Config.responseLength) {
                if (responseBuf.isWritable()) {
                    System.out.println("Error. responseBuf.isWritable()==true");
                }
                readLen = 0;
                responseBuf.clear();
    
                if (callCount++ == Config.numOfCalls - 1) {
                    doLog();
                    initLog();
                }
                writeRequest(ctx);
    
            } else if (readLen > Config.responseLength) {
                System.out.println("Error. readLen is too big: " + readLen);
            }
    
            buf.release();
    
        }
    
        private void initLog() {
            callCount = 0;
            chunks = 0;
            startTime = System.currentTimeMillis();
        }
    
        private void doLog() {
            System.out.println(Config.numOfCalls + " performed in " + chunks + " chunks, time: "+ (System.currentTimeMillis() - startTime));
        }
    
        private void writeRequest(ChannelHandlerContext ctx) {
            ctx.writeAndFlush(Unpooled.wrappedBuffer(Config.request));
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }
    

    NettyServer.java

    public final class NettyServer {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup(1);
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(group, group)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(
                                new NettyServerHandler()
                        );
                    }
                 });
    
                b.bind(Config.PORT).sync().channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    

    NettyServerHandler.java

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        private static ByteBuf requestBuf = Unpooled.wrappedBuffer(Config.request).clear();
        //private static ByteBuf requestBuf = Unpooled.directBuffer(Config.requestLength).clear();;
    
        private int readLen = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
    
            int received = buf.readableBytes();
            requestBuf.writeBytes(buf);
            readLen += received;
    
            if (readLen == Config.requestLength) {
                if (requestBuf.isWritable()) {
                    System.out.println("requestBuf.isWritable");
                }
                readLen = 0;
                requestBuf.clear();
                writeResponse(ctx);
            } else if (readLen > Config.responseLength) {
                System.out.println("readLen is too big: " + readLen);
            }
    
            buf.release();
    
        }
    
        private void writeResponse(ChannelHandlerContext ctx) {
            ctx.writeAndFlush(Unpooled.wrappedBuffer(Config.response));
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }
    

    Nio2Base.java

    public abstract class Nio2Base {
    
        public static int numOfCalls = 50000;
    
        abstract ByteBuffer getWriteBuffer();
        abstract ByteBuffer getReadBuffer();
        abstract void messageReceived(ByteBuffer buffer);
    
        protected class ReadHandler implements CompletionHandler<Integer, Void> {
            private AsynchronousSocketChannel channel;
            private ByteBuffer buffer;
    
            ReadHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) {
                this.channel = channel;
                this.buffer = buffer;
            }
    
            @Override
            public void completed(Integer result, Void a) {
                if (buffer.hasRemaining()) {
                    channel.read(buffer, null, this);
                } else {
                    messageReceived(buffer);
                    buffer.clear();
                    ByteBuffer writeBuffer = getWriteBuffer();
                    channel.write(writeBuffer, null, new WriteHandler(channel, writeBuffer));
                }
    
            }
    
            @Override
            public void failed(Throwable exc, Void a) {
                exc.printStackTrace();
            }
    
        }
    
        protected class WriteHandler implements CompletionHandler<Integer, Void> {
            private AsynchronousSocketChannel channel;
            private ByteBuffer buffer;
    
            WriteHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) {
                this.channel = channel;
                this.buffer = buffer;
            }
    
            @Override
            public void completed(Integer result, Void attachment) {
                if (buffer.hasRemaining()) {
                    channel.write(buffer, null, this);
                } else {
                    buffer.clear();
                    ByteBuffer readBuffer = getReadBuffer();
                    channel.read(readBuffer, null, new ReadHandler(channel, readBuffer));
                }
            }
    
            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        }
    
    }
    

    Nio2Client.java

    public class Nio2Client extends Nio2Base {
    
        private static ByteBuffer requestBuffer = ByteBuffer.wrap(Config.request);
        private static ByteBuffer readBuffer = ByteBuffer.wrap(Config.response);
    
        private int count;
        private long startTime;
        private AsynchronousSocketChannel channel;
    
        public static void main(String[] args) throws Exception {
            new Nio2Client().init();
    
            // Wait
            System.in.read();
        }
    
        public void init() {
            // create an asynchronous socket channel bound to the default group
            try {
                channel = AsynchronousSocketChannel.open();
                if (channel.isOpen()) {
                    // connect this channel's socket
                    channel.connect(new InetSocketAddress(Config.HOST, Config.PORT), null, new ConnectHandler(channel));
                } else {
                    System.out.println("The asynchronous socket channel cannot be opened!");
                }
            } catch (IOException ex) {
                System.err.println(ex);
            }
        }
    
        private class ConnectHandler implements CompletionHandler<Void, Void> {
            private AsynchronousSocketChannel channel;
    
            public ConnectHandler(AsynchronousSocketChannel channel) {
                this.channel = channel;
            }
    
            @Override
            public void completed(Void result, Void attachment) {
                try {
                    System.out.println("Successfully connected at: " + channel.getRemoteAddress());
                    ByteBuffer buffer = getWriteBuffer();
                    startTime = System.currentTimeMillis();
                    count = 0;
                    channel.write(buffer, null, new WriteHandler(channel, buffer));
    
                } catch (Exception e) {
                    System.err.println(e);
                }
            }
    
            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
                throw new UnsupportedOperationException("Connection cannot be established!");
            }
    
        }
    
        @Override
        ByteBuffer getWriteBuffer() {
            ByteBuffer ret = requestBuffer.duplicate();
            ret.position(ret.capacity());
            ret.flip();
            return ret;
        }
    
        @Override
        ByteBuffer getReadBuffer() {
            return (ByteBuffer)readBuffer.clear();
        }
    
        @Override
        void messageReceived(ByteBuffer buffer) {
    
            count++;
    
            if (count == numOfCalls) {
    
                System.out.println("Calls: " + count + " time: " + (System.currentTimeMillis() - startTime));
    
                count = 0;
                startTime = System.currentTimeMillis();
            }
        }
    
    }
    

    Nio2Server.java

    public class Nio2Server extends Nio2Base {
    
        private static byte[] response = new byte[Config.responseLength];
        private static ByteBuffer responseBuffer = ByteBuffer.wrap(response);
        private static ByteBuffer readBuffer = ByteBuffer.wrap(Config.request);
    
    
        public static void main(String[] args) {
            new Nio2Server().init();
        }
    
        public void init() {
            // create an asynchronous server socket channel bound to the default group
            try (AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open()) {
                if (serverChannel.isOpen()) {
    
                    // bind the server socket channel to local address
                    serverChannel.bind(new InetSocketAddress(Config.HOST, Config.PORT));
    
                    // display a waiting message while ... waiting clients
                    System.out.println("Waiting for connections ...");
    
                    AcceptHandler acceptHandler = new AcceptHandler(serverChannel);
    
                    serverChannel.accept(null, acceptHandler);
    
                    // Wait
                    System.in.read();
    
                } else {
                    System.out.println("The asynchronous server-socket channel cannot be opened!");
                }
            } catch (IOException ex) {
                System.err.println(ex);
            }
        }
    
        private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
            private AsynchronousServerSocketChannel serverChannel;
    
            public AcceptHandler(AsynchronousServerSocketChannel serverChannel) {
                this.serverChannel = serverChannel;
            }
    
            @Override
            public void completed(AsynchronousSocketChannel channel, Void attachment) {
    
                serverChannel.accept(null, this);
    
                ByteBuffer buffer = getReadBuffer();
    
                try {
                    System.out.println("Incoming connection from: " + channel.getRemoteAddress());
    
                    channel.read(buffer, null, new ReadHandler(channel, buffer));
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
                serverChannel.accept(null, this);
                throw new UnsupportedOperationException("Cannot accept connections!");
            }
        }
    
        @Override
        ByteBuffer getWriteBuffer() {
            return responseBuffer.duplicate();
        }
    
        @Override
        ByteBuffer getReadBuffer() {
            return (ByteBuffer)readBuffer.clear();
        }
    
        @Override
        void messageReceived(ByteBuffer buffer) {
        }
    
    
    }
    

    I'd assume the slow warm up is because of the JIT needing time to warm up, and NIO being much more complex than simple blocking IO (and needing more optimization). I think blocking IO will be higher performance when there are only a few clients, as netty and NIO incur overhead for their complexity. However, NIO is far more scalable than blocking IO (especially with netty's epoll backend), and can handle thousands of clients easily.

    Additionally, and most importantly, premature optimization is the root of all evil. If you're writing a simple command line application, netty and NIO are overkill, and you should stick with blocking IO. However, if you intend to write a robust, maintainable, and high-quality network application, you should use netty for the . If you do later decide to switch from NIO to blocking IO, you cand do so seamlessly with netty, since netty also has a blocking io backend which they 'recommend for a small number of connections'.

    链接地址: http://www.djcxy.com/p/95218.html

    上一篇: 基于Netty的应用程序性能问题

    下一篇: Netty与阻塞套接字性能