是否可以在全双工TCP通信中使用Netty?

Netty似乎只能用一个TCP连接来处理读或写操作,但不能同时处理。 我有一个客户端连接到用Netty编写的echo服务器应用程序,并发送大约200k条消息。

echo服务器只接受客户端连接并发送客户端发送的任何消息。

问题是我无法使Netty在全双工模式下与TCP连接一起工作。 我想在服务器端同时处理读写操作。 在我的情况下,Netty从客户端读取所有消息,然后将其发送回来,从而导致高延迟。

客户端应用程序为每个连接触发两个线程。 一个用于任何写入操作,另一个用于读取操作。 而且,客户端是用普通的旧Java IO风格编写的。

也许这个问题在某种程度上与我在服务器端设置的TCP选项有关:

    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, bufferWatermarkHigh)
    .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, bufferWatermarkLow)
    .childOption(ChannelOption.SO_RCVBUF, bufferInSize)
    .childOption(ChannelOption.SO_SNDBUF, bufferOutSize)
    .childOption(ChannelOption.SO_REUSEADDR, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childOption(ChannelOption.TCP_NODELAY, true);

在你使用github仓库提供的例子中,有很多错误的东西:

  • 您可以直接从channelActive方法编写

    在netty中,有一个策略是每个处理程序只有一个同时执行的传入方法,这是为了使开发更容易,并确保类的方法以正确的顺序执行,但它也确保方法的副作用在其他类中可见。

  • 您正在打印出channelReadComplete的消息
  • channelReadComplete在消息的当前缓冲区被清除后调用, channelRead可能会在被调用之前调用多次。

  • 缺少成帧器
  • 构建消息或计算消息的大小是检测有多少字节进入内存的方式。 对于2客户端写入可能是1在没有这个成帧器的服务器读取,作为测试,我用了一个new io.netty.handler.codec.FixedLengthFrameDecoder(120) ,所以我可以使用i++计算消息的数量到达服务器和客户端。

  • **使用重磅印刷轻量级操作。

    根据我的分析器,大部分时间都是在LOG.info()调用,这对记录器来说很常见,因为他们在后台执行很多操作,例如在输出流中进行同步。 通过让记录器只记录每1000个字节的消息,我得到了巨大的速度提升(而且由于我运行双核心,所以速度非常慢)

  • 繁重的发送代码

    发送代码每次都会重新创建ByteBuf 。 通过重用ByteBuf您可以ByteBuf地提高发送速度,您可以通过创建1次ByteBuf,然后每次通过它时调用.retain()来完成此操作。

    这很容易完成:

    ByteBuf buf = createMessage(MESSAGE_SIZE);
    for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
        ctx.writeAndFlush(buf.retain());
    }
    
  • 减少潮红的数量

    通过减少冲水量,您可以获得更高的本机性能。 每次调用flush()都是调用网络堆栈来发送未决消息。 如果我们将该规则应用于上面的代码,它将提供以下代码:

    ByteBuf buf = createMessage(MESSAGE_SIZE);
    for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
        ctx.write(buf.retain());
    }
    ctx.flush();
    
  • 最终的代码

    有时候,你只是想看到结果,并为自己尝试:

    App.java(不变)

    public class App {
      public static void main( String[] args ) throws InterruptedException {
        final int PORT = 8080;
        runInSeparateThread(() -> new Server(PORT));
        runInSeparateThread(() -> new Client(PORT));
      }
      private static void runInSeparateThread(Runnable runnable) {
        new Thread(runnable).start();
      }
    }
    

    Client.java

    public class Client {
      public Client(int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
          ChannelFuture channelFuture = createBootstrap(group).connect("192.168.171.102", port).sync();
          channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          group.shutdownGracefully();
        }
      }
      private Bootstrap createBootstrap(EventLoopGroup group) {
        return new Bootstrap().group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(
                new ChannelInitializer<SocketChannel>() {
                  @Override
                  protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
                    ch.pipeline().addLast(new ClientHandler());
                  }
                }
            );
      }
    }
    

    ClientHandler.java

    public class ClientHandler extends ChannelInboundHandlerAdapter {
      private final Logger LOG = LoggerFactory.getLogger(ClientHandler.class.getSimpleName());
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final int MESSAGE_SIZE = 200;
        final int NUMBER_OF_MESSAGES = 200000;
        new Thread(()->{
        ByteBuf buf = createMessage(MESSAGE_SIZE);
        for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
          ctx.writeAndFlush(buf.retain());
        }}).start();
      }
      int i;
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(i++%10000==0)
        LOG.info("Got a message back from the server "+(i));
        ((io.netty.util.ReferenceCounted)msg).release();
      }
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
      }
      private ByteBuf createMessage(int size) {
        ByteBuf message = Unpooled.buffer(size);
        for (int i = 0; i < size; ++i) {
          message.writeByte((byte) i);
        }
        return message;
      }
    }
    

    Server.java

    public class Server {
      public Server(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
          ChannelFuture channelFuture = createServerBootstrap(bossGroup, workerGroup).bind(port).sync();
          channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
        }
      }
      private ServerBootstrap createServerBootstrap(EventLoopGroup bossGroup,
                                                    EventLoopGroup workerGroup) {
        return new ServerBootstrap().group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
                 ch.pipeline().addLast(new ServerHandler());
              }
            });
      }
    }
    

    ServerHandler.java

    public class ServerHandler extends ChannelInboundHandlerAdapter {
      private final Logger LOG = LoggerFactory.getLogger(ServerHandler.class.getSimpleName());
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.writeAndFlush(msg).addListener(f->{if(f.cause()!=null)LOG.info(f.cause().toString());});
        if(i++%10000==0)
        LOG.info("Send the message back to the client "+(i));
        ;
      }
      int i;
      @Override
      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
       // LOG.info("Send the message back to the client "+(i++));
        ctx.flush();
      }
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
      }
    }
    

    检测结果

    我决定测试如果我更改记录的incmoing消息的频率会发生什么,这些是测试结果:

    What to print:                            max message latency    time taken*
    (always)                                  > 20000                >10 min        
    i++ % 10 == 0                             > 20000                >10 min
    i++ % 100 == 0                            16000                    4 min
    i++ % 1000 == 0                           0-3000                  51 sec
    i++ % 10000 == 0                          <10000                  22 sec
    

    *时间应该与一粒盐一起服用,没有真正的基准测试完成,只有1个快速运行的程序

    这表明通过减少呼叫记录的数量(精度),我们可以获得更好的传输速率(速度)。

    链接地址: http://www.djcxy.com/p/95223.html

    上一篇: Is it possible to use Netty in full duplex TCP communication?

    下一篇: Netty 4 creating multiple connections from client