Vert.x: correctly stream to http response in case of closed client connection

I have the following use case for a Vert.x application:

  • write a REST handler for a GET request
  • in this handler, copy data from a ReadStream onto the response

This looked straightforward: in the handler I would create the ReadStream and then use a Pump to pipe the stream onto the response (a WriteStream).

I noticed, however, that the client can close the HTTP connection to my handler while the pump is still active. In this situation I expected an exception to occur on the WriteStream instance. That is not the case. Instead, the WriteStream#writeQueueFull method returns "true", which makes the pump put the ReadStream into paused mode. The ReadStream now waits for a drain event, but this event is never sent because the write connection has been closed. As a result, the number of open (paused) ReadStreams grows over time, which eventually amounts to a resource leak.

What is the correct way to handle this situation? The first aspect that looks strange is that there is no exception on the write stream. But even if I were able to detect the error situation (e.g. by listening for the close event on the response), what am I supposed to do with the ReadStream? It cannot be canceled, but it also cannot stay open. The only approach I can think of is to pump its content into a nil stream (i.e. consume it but ignore its content). Overall this makes the whole pumping process pretty complicated.
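To make the stall described above concrete, here is a minimal stdlib-only sketch. The `Source`, `Sink`, `pumpOnce` and `simulate` names are hypothetical stand-ins, not Vert.x types, and the sink is simply assumed to report a full write queue once it is closed, as observed in the question. The second run shows one possible way out under the assumption that the source can be closed explicitly: a close handler on the sink releases the source instead of leaving it paused.

```java
/** Toy model of a pump between a source and a sink; these are NOT Vert.x classes. */
final class PumpLeakSketch {

    /** Hypothetical readable source that holds a resource until close() is called. */
    static final class Source {
        int remaining;
        boolean paused;
        boolean open = true;

        Source(final int chunks) {
            remaining = chunks;
        }

        void close() {
            open = false;
        }
    }

    /** Hypothetical sink; assumed to report "full" forever once it is closed. */
    static final class Sink {
        boolean closed;
        Runnable closeHandler = () -> { };

        boolean writeQueueFull() {
            return closed;
        }

        void write(final String chunk) {
            // The data is discarded in this model.
        }

        void close() {
            closed = true;
            closeHandler.run();
        }
    }

    /** Copies one chunk and returns false when nothing could be moved. */
    static boolean pumpOnce(final Source src, final Sink sink) {
        if (!src.open || src.paused || src.remaining == 0) {
            return false;
        }
        src.remaining--;
        sink.write("chunk");
        if (sink.writeQueueFull()) {
            // A real pump would resume on a drain event, which never arrives here.
            src.paused = true;
        }
        return true;
    }

    /** Runs a transfer in which the client goes away after the first chunk. */
    static Source simulate(final boolean releaseOnClose) {
        final Source src = new Source(10);
        final Sink sink = new Sink();
        if (releaseOnClose) {
            sink.closeHandler = src::close; // release the source when the sink closes
        }
        pumpOnce(src, sink);
        sink.close();
        while (pumpOnce(src, sink)) {
            // keep pumping until the pump stalls or the source is released
        }
        return src;
    }

    public static void main(final String[] args) {
        final Source leaked = simulate(false);
        System.out.println("no close handler:   open=" + leaked.open + " paused=" + leaked.paused);
        final Source released = simulate(true);
        System.out.println("with close handler: open=" + released.open + " paused=" + released.paused);
    }
}
```

In the test case from this question the source is an AsyncFile, which does have a `close()` method, so the analogous step would presumably be to stop the Pump and close the file inside the response's `closeHandler`. Whether that covers every kind of ReadStream is an open assumption, since the ReadStream interface itself offers no way to cancel.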

The example below shows a simple test case. The main method makes a request against a verticle and closes the connection immediately. On the server (i.e. in the verticle) no exception is triggered; instead, the ReadStream is stuck in the paused state.

The output of the sample code is:

    request
    false
    starting to pipe ... false
    response closed: false
    writeQueueFull false
    closed ...
    writeQueueFull true
    pause: 1
    response is closed
    

Any suggestions are highly appreciated.

    package com.ibm.wps.test.vertx;
    
    import java.io.File;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.concurrent.CompletableFuture;
    
    import io.vertx.core.AbstractVerticle;
    import io.vertx.core.Future;
    import io.vertx.core.Handler;
    import io.vertx.core.Vertx;
    import io.vertx.core.buffer.Buffer;
    import io.vertx.core.file.AsyncFile;
    import io.vertx.core.file.OpenOptions;
    import io.vertx.core.http.HttpServer;
    import io.vertx.core.http.HttpServerResponse;
    import io.vertx.core.streams.Pump;
    import io.vertx.core.streams.ReadStream;
    import io.vertx.core.streams.WriteStream;
    
    public class CancellationTest {
    
    private static final class ReadStreamProxy implements ReadStream<Buffer> {
    
        private int countPause;
    
        private final ReadStream<Buffer> delegate;
    
        private ReadStreamProxy(final ReadStream<Buffer> aDelegate) {
            delegate = aDelegate;
        }
    
        @Override
        public ReadStream<Buffer> endHandler(final Handler<Void> endHandler) {
            delegate.endHandler(endHandler);
            return this;
        }
    
        @Override
        public ReadStream<Buffer> exceptionHandler(final Handler<Throwable> handler) {
            delegate.exceptionHandler(handler);
            return this;
        }
    
        @Override
        public ReadStream<Buffer> handler(final Handler<Buffer> handler) {
            delegate.handler(handler);
            return this;
        }
    
        @Override
        public ReadStream<Buffer> pause() {
            countPause++;
            delegate.pause();
            System.out.println("pause: " + countPause);
            return this;
        }
    
        @Override
        public ReadStream<Buffer> resume() {
            countPause--;
            delegate.resume();
            System.out.println("resume: " + countPause);
            return this;
        }
    
    }
    
    private static final class TestVerticle extends AbstractVerticle {
    
        private HttpServer server;
    
        @Override
        public void start(final Future<Void> startFuture) throws Exception {
    
            final String data = new File(CancellationTest.class.getResource("data.txt").toURI()).getCanonicalPath();
            System.out.println("data " + data);
    
            server = vertx.createHttpServer();
            server.requestHandler(req -> {
                System.out.println("request");
    
                final HttpServerResponse resp = req.response();
                System.out.println(resp.closed());
                resp.exceptionHandler(th -> {
                    System.out.println("exception from response " + th);
                });
                resp.closeHandler(v -> {
                    System.out.println("response is closed");
                });
                resp.setChunked(true);
    
                vertx.setTimer(100, l -> {
                    System.out.println("starting to pipe ... " + resp.closed());
    
                    final OpenOptions opts = new OpenOptions();
                    opts.setWrite(false);
                    opts.setRead(true);
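                    vertx.fileSystem().open(data, opts, fileRes -> {
                        if (fileRes.failed()) {
                            // Without this check a failed open leads to a NullPointerException below.
                            System.out.println("file open failed " + fileRes.cause());
                            return;
                        }
                        final AsyncFile file = fileRes.result();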
                        file.exceptionHandler(ex -> {
                            System.out.println("file exception " + ex);
                        });
                        file.endHandler(v -> {
                            System.out.println("file ended");
                        });
    
                        System.out.println("response closed: " + resp.closed());
    
                        pipe(file, resp);
                    });
                });
            });
    
            server.listen(8080, result -> {
                if (result.failed()) {
                    startFuture.fail(result.cause());
                } else {
                    startFuture.complete();
                }
            });
        }
    }
    
    private static final class WriteStreamProxy implements WriteStream<Buffer> {
    
        private final WriteStream<Buffer> delegate;
    
        private WriteStreamProxy(final WriteStream<Buffer> aDelegate) {
            delegate = aDelegate;
        }
    
        @Override
        public WriteStream<Buffer> drainHandler(final Handler<Void> handler) {
            delegate.drainHandler(handler);
            return this;
        }
    
        @Override
        public void end() {
            delegate.end();
        }
    
        @Override
        public WriteStream<Buffer> exceptionHandler(final Handler<Throwable> handler) {
            delegate.exceptionHandler(handler);
            return this;
        }
    
        @Override
        public WriteStream<Buffer> setWriteQueueMaxSize(final int maxSize) {
            delegate.setWriteQueueMaxSize(maxSize);
            return this;
        }
    
        @Override
        public WriteStream<Buffer> write(final Buffer data) {
            delegate.write(data);
            return this;
        }
    
        @Override
        public boolean writeQueueFull() {
            final boolean result = delegate.writeQueueFull();
            System.out.println("writeQueueFull " + result);
            return result;
        }
    
    }
    
    public static void main(final String[] args) throws Exception {
    
        System.out.println(System.getProperties());
    
    
        final CompletableFuture<Void> sync = new CompletableFuture<Void>();
    
        final Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new TestVerticle(), result -> {
            try {
                final URL url = new URL("http://localhost:8080/");
                final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.connect();
                final InputStream is = conn.getInputStream();
                is.close();
                conn.disconnect();
                System.out.println("closed ...");
                sync.complete(null);
            } catch (final Throwable th) {
                sync.completeExceptionally(th);
            }
        });
    
        sync.get();
        vertx.close();
    }
    
    private static final void pipe(final ReadStream<Buffer> aRead, final WriteStream<Buffer> aWrite) {
    
        aWrite.exceptionHandler(ex -> {
            // Print the stack trace of the reported exception rather than of a fresh one.
            ex.printStackTrace();
            System.out.println("write stream exception " + ex);
        });
    
    
        Pump.pump(new ReadStreamProxy(aRead), new WriteStreamProxy(aWrite)).start();
    }
    

    }
