在JAVA中提高Netty Server的性能

我有这样的情况:我的Netty服务器将以惊人的速度从客户端获取数据。 我认为客户正在使用某种速度的PUSH机制。 我不知道PUSH-POP机制究竟是什么,但我确实认为客户端正在使用某种机制以非常高的速度发送数据。现在我的要求是,我编写了一个简单的TCP Netty服务器,它接收来自客户端,并添加到使用ArrayBlockingQueue实现的BlockingQueue中。 现在,由于Netty是基于事件的,因此接收数据并将其存储在队列中所需的时间就更多了,这就引发了客户端的一个异常情况,即Netty服务器没有运行。但是我的服务器运行得很完美,但接受单个数据并将其存储在队列中的时间更多。 我怎样才能防止这一点? 这种情况有没有最快排队? 我使用BlockingQueue,因为另一个线程会从队列中取数据并处理它。 所以我需要一个同步队列。 我该如何提高服务器的性能,或者有什么方法以非常高的速度插入数据? 我所关心的只是延迟。 延迟需要尽可能低。

我的服务器代码:

public class Server implements Runnable {
private final int port;

static String message;
Channel channel;
ChannelFuture channelFuture;
int rcvBuf, sndBuf, lowWaterMark, highWaterMark;

public Server(int port) {
    this.port = port;
    rcvBuf = 2048;
    sndBuf =   2048;
    lowWaterMark = 1024;
    highWaterMark = 2048;
}

@Override
public void run() {
    try {
        startServer();

    } catch (Exception ex) {
       System.err.println("Error in  Server : "+ex);
       Logger.error(ex.getMessage());
    }
}

public void startServer()  {
   // System.out.println("8888 Server started");
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(group)
         .channel(NioServerSocketChannel.class)
         .localAddress(new InetSocketAddress(port))
         .childOption(ChannelOption.SO_RCVBUF, rcvBuf * 2048)
         .childOption(ChannelOption.SO_SNDBUF, sndBuf * 2048)
         .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWaterMark * 2048, highWaterMark * 2048))
         .childOption(ChannelOption.TCP_NODELAY, true)

             .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        channel = ch;
                        System.err.println("OMS connected : " + ch.localAddress());
                        ch.pipeline().addLast(new ReceiveFromOMSDecoder());
                    }
                });
        channelFuture = b.bind(port).sync();
        this.channel = channelFuture.channel();
        channelFuture.channel().closeFuture().sync();

    } catch (InterruptedException ex) {
        System.err.println("Exception raised in SendToOMS class"+ex);
    } finally {
        group.shutdownGracefully();
    }
}

}

我的ServerHandler代码:

   @Sharable
public class ReceiveFromOMSDecoder extends MessageToMessageDecoder<ByteBuf> {


    private  Charset charset;
    public ReceiveFromOMSDecoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public ReceiveFromOMSDecoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {

       String buffer = msg.toString(charset);
       if(buffer!=null){
        Server.sq.insertStringIntoSendingQueue(buffer);  //inserting into queue
      }
        else{
            Logger.error("Null string received"+buffer);
        }
    }
       @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      //  Logger.error(cause.getMessage());
        System.err.println(cause);
    }
}

快三点:

  • 看起来你并没有发送回应。 你可能应该。
  • 不要阻塞IO线程。 使用EventExecutorGroup来分派传入有效负载的处理。 即类似ChannelPipeline.addLast(EventExecutorGroup组,String名称,ChannelHandler处理程序)。
  • 一般不要阻止。 抛开你的ArrayBlockingQueue,看看JCTools或其他一些实现来寻找一个非阻塞模拟。
  • 链接地址: http://www.djcxy.com/p/95225.html

    上一篇: Improving the Performance of Netty Server in JAVA

    下一篇: Is it possible to use Netty in full duplex TCP communication?