Netty versus blocking socket performance

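I have a simple standalone client-server test in which the client sends 500 bytes to the server and the server returns 2000 bytes in response. It runs in a loop and prints the elapsed time every 50000 such request/response calls. I compared the performance of three implementations, based on the blocking socket API, Netty, and NIO2. The test shows that blocking sockets perform significantly faster than Netty or NIO2. I understand that there is no concurrency in this test, which is what NIO was designed for. Still, is there an explanation for this performance difference, or am I doing something very inefficiently? Is there any way to improve the Netty-based code to get close to blocking-socket performance? I tried using direct buffers for reading, with no significant difference.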

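The tests ran on Java 1.7.0_55 between two Linux servers on a gigabit network. The first few readings (in milliseconds) were:

  • Blocking: 9754, 9307, 9305
  • Netty: 14879, 11872, 11781
  • NIO2: 14474, 12117, 12149

Another mystery is that the Netty and NIO2 implementations start out slow and then stabilize. In the Netty case, stabilization occurs after about 10000 cycles.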

The source code follows.

Config.java - used by all three implementations

    public class Config {
        static final String HOST = "192.168.1.121";
        static final int PORT = 10000;
    
        static int requestLength = 500;
        static int responseLength = 2000;
        static int numOfCalls = 50000;
    
        static byte[] request = new byte[requestLength];
        static byte[] response = new byte[responseLength];
    }
    

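BlockingClient.java

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;

    public class BlockingClient {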
    
        public static void main(String[] args) {
            Socket socket = null;
            try {
                socket = new Socket(Config.HOST, Config.PORT);
    
                InputStream is = socket.getInputStream();
                OutputStream os = socket.getOutputStream();
                int callCount = 0;
    
                long startTime = System.currentTimeMillis();
    
                while (true) {
                    os.write(Config.request);
                    read(is, Config.response);
                    callCount++;
                    if (callCount == Config.numOfCalls) {
                        System.out.println("numOfcalls=" + Config.numOfCalls + " time: " + (System.currentTimeMillis() - startTime));
                        callCount = 0;
                        startTime = System.currentTimeMillis();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // socket is still null if the connection attempt itself failed
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
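        // Reads until the array is full; fails instead of looping forever if the peer closes the stream.
        public static void read(InputStream is, byte[] bytes) throws IOException {
            int num = 0;
            while (num < bytes.length) {
                int n = is.read(bytes, num, bytes.length - num);
                if (n < 0) {
                    throw new java.io.EOFException("Stream closed after " + num + " bytes");
                }
                num += n;
            }
        }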
    
    }
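
The `read` helper above has to loop because a single `InputStream.read` call may return fewer bytes than requested, and it returns -1 once the peer closes the connection. The following is a minimal sketch of that loop with an explicit end-of-stream check, run against an in-memory stream rather than a socket. The class name `ReadFullyDemo` and the helper `detectsShortStream` are made up for illustration; `DataInputStream.readFully` is the JDK method that offers the same behaviour.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullyDemo {

    // Fills `bytes` completely, or throws EOFException if the stream ends first.
    public static void readFully(InputStream is, byte[] bytes) throws IOException {
        int num = 0;
        while (num < bytes.length) {
            int n = is.read(bytes, num, bytes.length - num);
            if (n < 0) {
                throw new EOFException("stream closed after " + num + " of " + bytes.length + " bytes");
            }
            num += n;
        }
    }

    // Returns true if reading `wanted` bytes from a stream holding `available` bytes hits EOF.
    public static boolean detectsShortStream(int available, int wanted) {
        try {
            readFully(new ByteArrayInputStream(new byte[available]), new byte[wanted]);
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] response = new byte[2000];

        // Hand-written loop on a stream that holds exactly one response.
        readFully(new ByteArrayInputStream(new byte[2000]), response);

        // JDK equivalent: DataInputStream.readFully also throws EOFException on a short stream.
        new DataInputStream(new ByteArrayInputStream(new byte[2000])).readFully(response);

        System.out.println("short stream detected: " + detectsShortStream(1500, 2000));
    }
}
```

Without the `n < 0` check, a closed connection would make the loop subtract 1 from the running total on every iteration and never terminate.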
    

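BlockingServer.java

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class BlockingServer {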
    
        public static void main(String[] args) {
            try {
                ServerSocket srvSocket = new ServerSocket(Config.PORT);
    
                while (true) {
                    final Socket socket = srvSocket.accept();
    
                    new Thread() {
                        @Override
                        public void run() {
                            try {
                                InputStream is = socket.getInputStream();
                                OutputStream os = socket.getOutputStream();
                                while (true) {
                                    BlockingClient.read(is, Config.request);
                                    os.write(Config.response);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            } finally {
                                try {
                                    socket.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }.start();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    }
    

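NettyClient.java

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    public final class NettyClient {

        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new NettyClientHandler());
                    }
                 });

                // Connect, then block until the channel is closed.
                b.connect(Config.HOST, Config.PORT).sync().channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }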
    

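NettyClientHandler.java

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {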
    
        private static ByteBuf responseBuf = Unpooled.wrappedBuffer(Config.response).clear();
        //private static ByteBuf responseBuf = Unpooled.directBuffer(Config.responseLength).clear();
    
        private int readLen = 0;
        private int callCount = 0;
        private long startTime;
        private long chunks = 0;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            // Send the first message
            initLog();
            writeRequest(ctx);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
    
            int received = buf.readableBytes();
            responseBuf.writeBytes(buf);
            readLen += received;
            chunks++;
    
            if (readLen == Config.responseLength) {
                if (responseBuf.isWritable()) {
                    System.out.println("Error. responseBuf.isWritable()==true");
                }
                readLen = 0;
                responseBuf.clear();
    
                if (callCount++ == Config.numOfCalls - 1) {
                    doLog();
                    initLog();
                }
                writeRequest(ctx);
    
            } else if (readLen > Config.responseLength) {
                System.out.println("Error. readLen is too big: " + readLen);
            }
    
            buf.release();
    
        }
    
        private void initLog() {
            callCount = 0;
            chunks = 0;
            startTime = System.currentTimeMillis();
        }
    
        private void doLog() {
            System.out.println(Config.numOfCalls + " performed in " + chunks + " chunks, time: "+ (System.currentTimeMillis() - startTime));
        }
    
        private void writeRequest(ChannelHandlerContext ctx) {
            ctx.writeAndFlush(Unpooled.wrappedBuffer(Config.request));
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }
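
The handler above accumulates incoming chunks until a full 2000-byte response has arrived, because TCP is a byte stream and one `channelRead` call may carry only part of a message. The same accumulation can be sketched with plain JDK classes and no Netty dependency. `FrameAssembler` is a hypothetical name, not a Netty type. Unlike the handler, it also accepts a chunk that spans a frame boundary, which cannot happen in this strict request/response test but can once requests are pipelined.

```java
import java.nio.ByteBuffer;

public class FrameAssembler {

    private final ByteBuffer frame;   // holds the bytes of the frame currently being assembled
    private int completedFrames = 0;
    private long chunks = 0;

    public FrameAssembler(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frame = ByteBuffer.allocate(frameLength);
    }

    // Feeds one received chunk; returns how many frames this chunk completed.
    public int onChunk(byte[] chunk) {
        chunks++;
        int completedNow = 0;
        int offset = 0;
        while (offset < chunk.length) {
            // Copy only as much as still fits into the current frame.
            int n = Math.min(frame.remaining(), chunk.length - offset);
            frame.put(chunk, offset, n);
            offset += n;
            if (!frame.hasRemaining()) {
                frame.clear();        // frame complete: reset for the next one
                completedFrames++;
                completedNow++;
            }
        }
        return completedNow;
    }

    public int completedFrames() { return completedFrames; }

    public long chunks() { return chunks; }

    public int pendingBytes() { return frame.position(); }

    public static void main(String[] args) {
        FrameAssembler assembler = new FrameAssembler(2000);
        assembler.onChunk(new byte[1448]);  // first part of frame 1
        assembler.onChunk(new byte[552]);   // rest of frame 1
        assembler.onChunk(new byte[2500]);  // all of frame 2 plus 500 bytes of frame 3
        // prints "2 frames in 3 chunks, 500 bytes pending"
        System.out.println(assembler.completedFrames() + " frames in " + assembler.chunks()
                + " chunks, " + assembler.pendingBytes() + " bytes pending");
    }
}
```

The chunk sizes in `main` are arbitrary illustration values, not measurements from the test.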
    

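NettyServer.java

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    public final class NettyServer {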
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup(1);
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(group, group)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(
                                new NettyServerHandler()
                        );
                    }
                 });
    
                b.bind(Config.PORT).sync().channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    

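NettyServerHandler.java

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {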
    
        private static ByteBuf requestBuf = Unpooled.wrappedBuffer(Config.request).clear();
        //private static ByteBuf requestBuf = Unpooled.directBuffer(Config.requestLength).clear();
    
        private int readLen = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
    
            int received = buf.readableBytes();
            requestBuf.writeBytes(buf);
            readLen += received;
    
            if (readLen == Config.requestLength) {
                if (requestBuf.isWritable()) {
                    System.out.println("requestBuf.isWritable");
                }
                readLen = 0;
                requestBuf.clear();
                writeResponse(ctx);
            } else if (readLen > Config.requestLength) { // the server reads requests, so compare against the request length
                System.out.println("readLen is too big: " + readLen);
            }
    
            buf.release();
    
        }
    
        private void writeResponse(ChannelHandlerContext ctx) {
            ctx.writeAndFlush(Unpooled.wrappedBuffer(Config.response));
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }
    

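Nio2Base.java

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;

    public abstract class Nio2Base {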
    
        public static int numOfCalls = 50000;
    
        abstract ByteBuffer getWriteBuffer();
        abstract ByteBuffer getReadBuffer();
        abstract void messageReceived(ByteBuffer buffer);
    
        protected class ReadHandler implements CompletionHandler<Integer, Void> {
            private AsynchronousSocketChannel channel;
            private ByteBuffer buffer;
    
            ReadHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) {
                this.channel = channel;
                this.buffer = buffer;
            }
    
            @Override
            public void completed(Integer result, Void a) {
                if (buffer.hasRemaining()) {
                    channel.read(buffer, null, this);
                } else {
                    messageReceived(buffer);
                    buffer.clear();
                    ByteBuffer writeBuffer = getWriteBuffer();
                    channel.write(writeBuffer, null, new WriteHandler(channel, writeBuffer));
                }
    
            }
    
            @Override
            public void failed(Throwable exc, Void a) {
                exc.printStackTrace();
            }
    
        }
    
        protected class WriteHandler implements CompletionHandler<Integer, Void> {
            private AsynchronousSocketChannel channel;
            private ByteBuffer buffer;
    
            WriteHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) {
                this.channel = channel;
                this.buffer = buffer;
            }
    
            @Override
            public void completed(Integer result, Void attachment) {
                if (buffer.hasRemaining()) {
                    channel.write(buffer, null, this);
                } else {
                    buffer.clear();
                    ByteBuffer readBuffer = getReadBuffer();
                    channel.read(readBuffer, null, new ReadHandler(channel, readBuffer));
                }
            }
    
            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        }
    
    }
    

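Nio2Client.java

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;

    public class Nio2Client extends Nio2Base {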
    
        private static ByteBuffer requestBuffer = ByteBuffer.wrap(Config.request);
        private static ByteBuffer readBuffer = ByteBuffer.wrap(Config.response);
    
        private int count;
        private long startTime;
        private AsynchronousSocketChannel channel;
    
        public static void main(String[] args) throws Exception {
            new Nio2Client().init();
    
            // Wait
            System.in.read();
        }
    
        public void init() {
            // create an asynchronous socket channel bound to the default group
            try {
                channel = AsynchronousSocketChannel.open();
                if (channel.isOpen()) {
                    // connect this channel's socket
                    channel.connect(new InetSocketAddress(Config.HOST, Config.PORT), null, new ConnectHandler(channel));
                } else {
                    System.out.println("The asynchronous socket channel cannot be opened!");
                }
            } catch (IOException ex) {
                System.err.println(ex);
            }
        }
    
        private class ConnectHandler implements CompletionHandler<Void, Void> {
            private AsynchronousSocketChannel channel;
    
            public ConnectHandler(AsynchronousSocketChannel channel) {
                this.channel = channel;
            }
    
            @Override
            public void completed(Void result, Void attachment) {
                try {
                    System.out.println("Successfully connected at: " + channel.getRemoteAddress());
                    ByteBuffer buffer = getWriteBuffer();
                    startTime = System.currentTimeMillis();
                    count = 0;
                    channel.write(buffer, null, new WriteHandler(channel, buffer));
    
                } catch (Exception e) {
                    System.err.println(e);
                }
            }
    
            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
                throw new UnsupportedOperationException("Connection cannot be established!");
            }
    
        }
    
        @Override
        ByteBuffer getWriteBuffer() {
            ByteBuffer ret = requestBuffer.duplicate();
            ret.position(ret.capacity());
            ret.flip();
            return ret;
        }
    
        @Override
        ByteBuffer getReadBuffer() {
            return (ByteBuffer)readBuffer.clear();
        }
    
        @Override
        void messageReceived(ByteBuffer buffer) {
    
            count++;
    
            if (count == numOfCalls) {
    
                System.out.println("Calls: " + count + " time: " + (System.currentTimeMillis() - startTime));
    
                count = 0;
                startTime = System.currentTimeMillis();
            }
        }
    
    }
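
`getWriteBuffer()` above relies on `ByteBuffer.duplicate()` returning a view that shares the underlying bytes but keeps its own position and limit, so each request gets a fresh cursor over the same 500-byte array. A small sketch of that behaviour, with `DuplicateViewDemo` and `freshView` as made-up names:

```java
import java.nio.ByteBuffer;

public class DuplicateViewDemo {

    // Same steps as getWriteBuffer() in Nio2Client: a new view positioned at 0 with limit = capacity.
    // Assumes the shared buffer's limit still equals its capacity, as it does after ByteBuffer.wrap.
    public static ByteBuffer freshView(ByteBuffer shared) {
        ByteBuffer ret = shared.duplicate();
        ret.position(ret.capacity());
        ret.flip(); // limit = old position (capacity), position = 0
        return ret;
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.wrap(new byte[500]);

        ByteBuffer first = freshView(shared);
        first.position(first.limit()); // simulate a completed write that drained this view

        ByteBuffer second = freshView(shared);
        // prints "first.remaining=0 second.remaining=500 shared.position=0"
        System.out.println("first.remaining=" + first.remaining()
                + " second.remaining=" + second.remaining()
                + " shared.position=" + shared.position());

        // The views share content: a write through one is visible through the other.
        second.put(0, (byte) 42);
        // prints "shared.get(0)=42"
        System.out.println("shared.get(0)=" + shared.get(0));
    }
}
```

Calling `clear()` on the duplicate would give the same position and limit in one step; the `position`/`flip` pair here simply mirrors the original code.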
    

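Nio2Server.java

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;

    public class Nio2Server extends Nio2Base {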
    
        private static byte[] response = new byte[Config.responseLength];
        private static ByteBuffer responseBuffer = ByteBuffer.wrap(response);
        private static ByteBuffer readBuffer = ByteBuffer.wrap(Config.request);
    
    
        public static void main(String[] args) {
            new Nio2Server().init();
        }
    
        public void init() {
            // create an asynchronous server socket channel bound to the default group
            try (AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open()) {
                if (serverChannel.isOpen()) {
    
                    // bind the server socket channel to local address
                    serverChannel.bind(new InetSocketAddress(Config.HOST, Config.PORT));
    
                    // display a waiting message while ... waiting clients
                    System.out.println("Waiting for connections ...");
    
                    AcceptHandler acceptHandler = new AcceptHandler(serverChannel);
    
                    serverChannel.accept(null, acceptHandler);
    
                    // Wait
                    System.in.read();
    
                } else {
                    System.out.println("The asynchronous server-socket channel cannot be opened!");
                }
            } catch (IOException ex) {
                System.err.println(ex);
            }
        }
    
        private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
            private AsynchronousServerSocketChannel serverChannel;
    
            public AcceptHandler(AsynchronousServerSocketChannel serverChannel) {
                this.serverChannel = serverChannel;
            }
    
            @Override
            public void completed(AsynchronousSocketChannel channel, Void attachment) {
    
                serverChannel.accept(null, this);
    
                ByteBuffer buffer = getReadBuffer();
    
                try {
                    System.out.println("Incoming connection from: " + channel.getRemoteAddress());
    
                    channel.read(buffer, null, new ReadHandler(channel, buffer));
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
                serverChannel.accept(null, this);
                throw new UnsupportedOperationException("Cannot accept connections!");
            }
        }
    
        @Override
        ByteBuffer getWriteBuffer() {
            return responseBuffer.duplicate();
        }
    
        @Override
        ByteBuffer getReadBuffer() {
            return (ByteBuffer)readBuffer.clear();
        }
    
        @Override
        void messageReceived(ByteBuffer buffer) {
        }
    
    
    }
    

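I assume the slow start is because the JIT needs time to warm up, and NIO is far more complex than simple blocking I/O (and so needs more optimization). I would expect blocking I/O to perform better with only a few clients, because the complexity of Netty and NIO adds overhead. However, NIO scales much better than blocking I/O (especially with Netty's epoll backend) and can easily handle thousands of clients.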
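
One way to keep warm-up out of a comparison like this is to time the loop in batches and discard the first batch or two before averaging. The sketch below assumes that approach; `BatchTimer` and its methods are hypothetical names, and the `Runnable` stands in for one request/response call. The sample numbers in `main` are the Netty readings quoted in the question.

```java
import java.util.Arrays;

public class BatchTimer {

    // Runs `batches` batches of `callsPerBatch` calls and returns the elapsed milliseconds per batch.
    public static long[] timeBatches(Runnable call, int batches, int callsPerBatch) {
        long[] millis = new long[batches];
        for (int b = 0; b < batches; b++) {
            long start = System.nanoTime();
            for (int i = 0; i < callsPerBatch; i++) {
                call.run();
            }
            millis[b] = (System.nanoTime() - start) / 1_000_000;
        }
        return millis;
    }

    // Averages the batches that follow the first `warmupBatches`, which are discarded.
    public static double steadyStateAverage(long[] millis, int warmupBatches) {
        if (warmupBatches < 0 || warmupBatches >= millis.length) {
            throw new IllegalArgumentException("need at least one batch after warm-up");
        }
        long sum = 0;
        for (int b = warmupBatches; b < millis.length; b++) {
            sum += millis[b];
        }
        return (double) sum / (millis.length - warmupBatches);
    }

    public static void main(String[] args) {
        long[] netty = {14879, 11872, 11781};
        System.out.println("all batches: " + Arrays.toString(netty));
        // prints "steady state average: 11826.5"
        System.out.println("steady state average: " + steadyStateAverage(netty, 1));
    }
}
```

Discarding the first batch only removes the warm-up effect from the average; it does not explain the remaining gap between the implementations.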

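Also, and most importantly, premature optimization is the root of all evil. If you are writing a simple command-line application, Netty and NIO are overkill and you should stick with blocking I/O. However, if you intend to write robust, maintainable, high-quality networking applications, you should use Netty. If you later decide to switch from NIO to blocking I/O, you can do so seamlessly with Netty, because Netty also has a blocking I/O backend, which they 'recommend for a small number of connections'.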
