Java executors: how to be notified, without blocking, when a task completes?

Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:

  • Take a task from the queue
  • Submit it to the executor
  • Call .get on the returned Future and block until a result is available
  • Take another task from the queue...

However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.

    What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)

    How can I do that using JDK's java.util.concurrent, short of writing my own executor service?

    (the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)


    Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.

    You could even write a general wrapper for Runnable tasks, and submit these to ExecutorService. Or, see below for a mechanism built into Java 8.

    class CallbackTask implements Runnable {
    
      private final Runnable task;
    
      private final Callback callback;
    
      CallbackTask(Runnable task, Callback callback) {
        this.task = task;
        this.callback = callback;
      }
    
      public void run() {
        task.run();
        callback.complete();
      }
    
    }
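
The wrapper above can drive the "one task at a time" scheme from the question: the completion callback is what submits the next task, so no thread waits on a Future. The following is only a sketch under assumptions. The Callback interface, the QueueDrainer class, and the demo are hypothetical names that are not part of the original answer, and the try/finally in CallbackTask is an addition so that a failing task still triggers the callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical callback interface; CallbackTask only needs a complete() method.
interface Callback {
  void complete();
}

// The wrapper from the answer, repeated so that this sketch compiles on its own.
class CallbackTask implements Runnable {
  private final Runnable task;
  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    try {
      task.run();
    } finally {
      callback.complete(); // addition: notify even if the task throws
    }
  }
}

// Hypothetical helper: feeds one queue into a shared executor, one task at a time.
class QueueDrainer {
  private final Queue<Runnable> queue;
  private final ExecutorService executor;

  QueueDrainer(Queue<Runnable> queue, ExecutorService executor) {
    this.queue = queue;
    this.executor = executor;
  }

  // Submits the next task, if any. Its completion callback submits the one after it.
  void submitNext() {
    Runnable next = queue.poll();
    if (next != null) {
      executor.execute(new CallbackTask(next, this::submitNext));
    }
  }
}

class QueueDrainerDemo {
  static List<Integer> runDemo() {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Integer> order = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch done = new CountDownLatch(5);
    Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < 5; i++) {
      final int id = i;
      queue.add(() -> { order.add(id); done.countDown(); });
    }
    new QueueDrainer(queue, pool).submitNext(); // returns immediately
    try {
      done.await(); // only the demo waits, so that it can return the recorded order
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
    return order;
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

One limitation of this sketch: the chain stops once the queue is empty, so whatever adds tasks later would need to call submitNext() again (with some care to avoid starting two chains for the same queue).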
    

    With CompletableFuture, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    public class GetTaskNotificationWithoutBlocking {
    
      public static void main(String... argv) throws Exception {
        ExampleService svc = new ExampleService();
        GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
        CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
        f.thenAccept(listener::notify);
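        System.out.println("Exiting main()");
        // Note: supplyAsync uses the common pool, whose threads are daemons, so a
        // standalone JVM may exit before the callback runs. Call f.join() here if
        // you need to see the notification when running this as a program.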
      }
    
      void notify(String msg) {
        System.out.println("Received message: " + msg);
      }
    
    }
    
    class ExampleService {
    
      String work() {
        sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
        char[] str = new char[5];
        ThreadLocalRandom current = ThreadLocalRandom.current();
        for (int idx = 0; idx < str.length; ++idx)
          str[idx] = (char) ('A' + current.nextInt(26));
        String msg = new String(str);
        System.out.println("Generated message: " + msg);
        return msg;
      }
    
      public static void sleep(long average, TimeUnit unit) {
        String name = Thread.currentThread().getName();
        long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
        System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
        try {
          unit.sleep(timeout);
          System.out.println(name + " awoke.");
        } catch (InterruptedException abort) {
          Thread.currentThread().interrupt();
          System.out.println(name + " interrupted.");
        }
      }
    
      public static long exponential(long avg) {
        return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
      }
    
    }
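
CompletableFuture can also express the original requirement, processing each queue's tasks one at a time without a thread parked on get(). The following is a minimal sketch, not part of the original answer: SerialChain and its append method are hypothetical names. Each appended task is attached to the previous one with handleAsync, which runs whether the earlier task succeeded or failed, so one failing task does not stall the chain.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: runs the tasks of one logical queue in order on a shared executor.
class SerialChain {
  private final Executor executor;
  private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

  SerialChain(Executor executor) {
    this.executor = executor;
  }

  // Appends a task. It starts only after every previously appended task has finished.
  // The returned future completes (or fails) with this particular task.
  synchronized CompletableFuture<Void> append(Runnable task) {
    tail = tail.handleAsync((ignored, failure) -> {
      task.run();
      return null;
    }, executor);
    return tail;
  }
}

class SerialChainDemo {
  static List<String> runDemo() {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<String> log = Collections.synchronizedList(new ArrayList<>());
    SerialChain chain = new SerialChain(pool);
    CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
    for (int i = 0; i < 5; i++) {
      final int id = i;
      last = chain.append(() -> log.add("task " + id)); // does not block
    }
    last.join(); // only the demo waits, so that it can return the log
    pool.shutdown();
    return log;
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

With 10,000 queues you would keep one SerialChain per queue and share a single thread pool among them; the number of threads is then set by the pool, not by the number of queues.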
    

    Use Guava's ListenableFuture API and add a callback. From the Guava website:

    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
      public Explosion call() {
        return pushBigRedButton();
      }
    });
    Futures.addCallback(explosion, new FutureCallback<Explosion>() {
      // we want this handler to run immediately after we push the big red button!
      public void onSuccess(Explosion explosion) {
        walkAwayFrom(explosion);
      }
      public void onFailure(Throwable thrown) {
        battleArchNemesis(); // escaped the explosion!
      }
    }, service); // recent Guava versions require an explicit Executor for the callback
    

    In Java 8 you can use CompletableFuture. Here's an example I had in my code where I'm using it to fetch users from my user service, map them to my view objects and then update my view or show an error dialog (this is a GUI application):

        CompletableFuture.supplyAsync(
                userService::listUsers
        ).thenApply(
                this::mapUsersToUserViews
        ).thenAccept(
                this::updateView
        ).exceptionally(
                throwable -> { showErrorDialogFor(throwable); return null; }
        );
    

    It executes asynchronously. I'm using two private methods: mapUsersToUserViews and updateView.
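
One detail worth knowing about a chain like this: when an earlier stage fails, the later stages are skipped and the throwable handed to exceptionally is a CompletionException wrapping the original exception. The sketch below illustrates that with hypothetical stand-ins (failingLookup in place of userService::listUsers, describe in place of showErrorDialogFor); it is not the code from the answer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ErrorNotificationDemo {

  // Hypothetical stand-in for a service call that always fails.
  static String failingLookup() {
    throw new IllegalStateException("user service unavailable");
  }

  // Unwraps the CompletionException so the handler sees the original failure.
  static String describe(Throwable t) {
    Throwable cause = (t instanceof CompletionException && t.getCause() != null)
        ? t.getCause()
        : t;
    return cause.getClass().getSimpleName() + ": " + cause.getMessage();
  }

  static String runDemo() {
    return CompletableFuture.supplyAsync(ErrorNotificationDemo::failingLookup)
        .thenApply(String::toUpperCase)                 // skipped: the previous stage failed
        .exceptionally(ErrorNotificationDemo::describe) // receives the wrapped exception
        .join();                                        // only the demo waits for the result
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```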
