Improving the Performance of a Netty Server in Java

My Netty server receives data from a client at a very high rate. I suspect the client uses some kind of push mechanism to reach that rate. I don't know exactly what a push/pop mechanism is, but the client is clearly sending data very fast.

I wrote a simple TCP Netty server that receives data from the client and just adds it to a BlockingQueue (implemented with an ArrayBlockingQueue). Because Netty is event based, accepting each piece of data and storing it in the queue takes rather long, and the client then raises an exception saying that the Netty server is not running. The server is in fact running fine; it just takes too long to accept a single message and store it in the queue.

How can I prevent this? Is there a faster queue for this situation? I am using a BlockingQueue because another thread takes data from the queue and processes it, so I need a thread-safe queue. How can I improve the performance of the server, or is there a way to insert data at a very high rate? All I care about is latency, which needs to be as low as possible.

My Server code:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;

public class Server implements Runnable {
    private final int port;

    static String message;
    Channel channel;
    ChannelFuture channelFuture;
    // Sizes below are multiplied by 2048 when the options are applied.
    int rcvBuf, sndBuf, lowWaterMark, highWaterMark;

    public Server(int port) {
        this.port = port;
        rcvBuf = 2048;
        sndBuf = 2048;
        lowWaterMark = 1024;
        highWaterMark = 2048;
    }

    @Override
    public void run() {
        try {
            startServer();
        } catch (Exception ex) {
            System.err.println("Error in  Server : " + ex);
            Logger.error(ex.getMessage());
        }
    }

    public void startServer() {
        // System.out.println("8888 Server started");
        // A single group both accepts connections and handles their I/O.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
             .channel(NioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childOption(ChannelOption.SO_RCVBUF, rcvBuf * 2048)   // 4 MiB
             .childOption(ChannelOption.SO_SNDBUF, sndBuf * 2048)   // 4 MiB
             .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                     new WriteBufferWaterMark(lowWaterMark * 2048, highWaterMark * 2048))
             .childOption(ChannelOption.TCP_NODELAY, true)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     channel = ch;
                     System.err.println("OMS connected : " + ch.localAddress());
                     ch.pipeline().addLast(new ReceiveFromOMSDecoder());
                 }
             });
            // Bind, then block this thread until the server channel is closed.
            channelFuture = b.bind(port).sync();
            this.channel = channelFuture.channel();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException ex) {
            System.err.println("Exception raised in SendToOMS class" + ex);
        } finally {
            group.shutdownGracefully();
        }
    }
}

My ServerHandler code:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.nio.charset.Charset;
import java.util.List;

@Sharable
public class ReceiveFromOMSDecoder extends MessageToMessageDecoder<ByteBuf> {

    private final Charset charset;

    public ReceiveFromOMSDecoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public ReceiveFromOMSDecoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // Decode the readable bytes of the buffer into a String.
        String buffer = msg.toString(charset);
        if (buffer != null) {
            // Runs on the I/O thread: hand the message to the queue.
            Server.sq.insertStringIntoSendingQueue(buffer);
        } else {
            Logger.error("Null string received" + buffer);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Logger.error(cause.getMessage());
        System.err.println(cause);
    }
}

Three quickies:

  • It doesn't look like you're sending a response. You probably should.
  • Don't block the I/O thread. Use an EventExecutorGroup to dispatch the handling of the incoming payload, i.e. something like ChannelPipeline.addLast(EventExecutorGroup group, String name, ChannelHandler handler).
  • Just don't block in general. Ditch your ArrayBlockingQueue and take a look at JCTools or some other library to find a non-blocking analog.
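
To illustrate the last point, here is a minimal sketch of a non-blocking hand-off between a producer (the role the decoder plays in the question) and a worker thread. It uses the JDK's lock-free ConcurrentLinkedQueue purely as a stand-in so the example runs without extra dependencies; JCTools provides bounded queues with a similar offer/poll style. The class and method names (NonBlockingHandOff, publish, drainTo, runDemo) are made up for this example and are not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper, not taken from the question's code.
class NonBlockingHandOff {

    // Lock-free, unbounded JDK queue used as a stand-in for a JCTools queue.
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();

    // Producer side (e.g. called from decode()): returns without waiting.
    void publish(String message) {
        queue.offer(message);
    }

    // Consumer side: moves everything currently queued into sink.
    // Returns the number of messages moved; 0 means the queue was empty.
    int drainTo(List<String> sink) {
        int moved = 0;
        String message;
        while ((message = queue.poll()) != null) {
            sink.add(message);
            moved++;
        }
        return moved;
    }

    // Demo: one producer thread publishes count messages while the
    // calling thread acts as the worker and collects them.
    static List<String> runDemo(int count) {
        NonBlockingHandOff handOff = new NonBlockingHandOff();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                handOff.publish("msg-" + i);
            }
        });
        producer.start();

        List<String> received = new ArrayList<>();
        while (received.size() < count) {
            if (handOff.drainTo(received) == 0) {
                Thread.yield(); // nothing queued yet; let the producer run
            }
        }
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> received = runDemo(1000);
        System.out.println("received " + received.size() + " messages");
    }
}
```

Because this stand-in queue is unbounded, a consumer that falls behind lets memory grow. A bounded queue whose offer() returns false when full would make that condition visible to the producer instead, at the cost of deciding what to do with rejected messages.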