CompletableFuture: waiting for the first one that returns normally?

I have some CompletableFutures and I want to run them in parallel, waiting for the first that returns normally.

I know I can use CompletableFuture.anyOf to wait for the first to return, but it completes with whichever outcome arrives first, normal or exceptional. I want to ignore exceptions.

List<CompletableFuture<?>> futures = names.stream().map(
  (String name) ->
    CompletableFuture.supplyAsync(
      () ->
        // this call may throw exceptions.
        new Task(name).run()
    )
).collect(Collectors.toList());
// FIXME: cannot ignore tasks that complete exceptionally.
CompletableFuture<Object> any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[0]));
try {
    logger.info(any.get().toString());
} catch (Exception e) {
    e.printStackTrace();
}

Answers


You may use the following helper method:

public static <T>
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {

    CompletableFuture<T> f=new CompletableFuture<>();
    Consumer<T> complete=f::complete;
    l.forEach(s -> s.thenAccept(complete));
    return f;
}

which you can use like this, to demonstrate that it will ignore earlier exceptions but return the first provided value:

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(
        () -> { throw new RuntimeException("failing immediately"); }
    ),
    CompletableFuture.supplyAsync(
        () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
          return "with 5s delay";
        }),
    CompletableFuture.supplyAsync(
        () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
          return "with 10s delay";
        })
);
CompletableFuture<String> c = anyOf(futures);
logger.info(c.join());

One disadvantage of this solution is that it will never complete if all futures complete exceptionally. A solution that provides the first value if there is a successful computation, but fails exceptionally if there is no successful computation at all, is a bit more involved:

public static <T>
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {

    CompletableFuture<T> f=new CompletableFuture<>();
    Consumer<T> complete=f::complete;
    CompletableFuture.allOf(
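        // toCompletableFuture() keeps this safe for stages that are not CompletableFuture instances
        l.stream().map(s -> s.thenAccept(complete).toCompletableFuture())
            .toArray(CompletableFuture<?>[]::new)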
    ).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
    return f;
}

It utilizes the fact that allOf’s exceptionally handler is only invoked after all futures have completed (exceptionally or not) and that a future can be completed only once (leaving special things like obtrude… aside). By the time the exceptionally handler is executed, every attempt to complete the future with a result has already been made, if there was one, so the attempt to complete it exceptionally only succeeds if there was no previous successful completion.
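
The "completed only once" rule this relies on can be checked in isolation. The following is only a minimal sketch; the class and method names are made up for illustration and are not part of the solution above:

```java
import java.util.concurrent.CompletableFuture;

class CompleteOnceDemo {
    // Tries to complete the same future twice and returns what each attempt reported.
    static boolean[] completeTwice(CompletableFuture<String> f) {
        // First attempt wins because the future is still incomplete.
        boolean first = f.complete("value");
        // Second attempt is rejected: the future already holds a result.
        boolean second = f.completeExceptionally(new RuntimeException("late failure"));
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean[] r = completeTwice(f);
        System.out.println(r[0] + " " + r[1] + " " + f.join()); // prints "true false value"
    }
}
```

This is why the exceptionally handler can call completeExceptionally unconditionally: if a value was already delivered, the call simply has no effect.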

The second version can be used exactly the same way as the first solution and only exhibits different behavior if all computations fail, e.g.:

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(
        () -> { throw new RuntimeException("failing immediately"); }
    ),
    CompletableFuture.supplyAsync(
        // delayed to demonstrate that the solution will wait for all completions
        // to ensure it doesn't miss a possible successful computation
        () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            throw new RuntimeException("failing later"); }
    )
);
CompletableFuture<String> c = anyOf(futures);
try { logger.info(c.join()); }
catch(CompletionException ex) { logger.severe(ex.toString()); }

The example above uses a delay to demonstrate that the solution waits for all completions when there is no success, whereas this example on ideone demonstrates how a later success turns the result into a success. Note that due to Ideone's caching of results, you might not notice the delay.

Note that in the case that all futures fail, there is no guarantee about which of the exceptions will get reported. Since it waits for all completions in the erroneous case, any of them could make it to the final result.
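
If it matters which failures occurred, one option is to collect all of them instead of reporting an arbitrary one. The following is a rough sketch of that idea, not a drop-in replacement: the name firstSuccess, the failure counter, and the choice to attach every failure as a suppressed exception to one aggregate RuntimeException are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

class FirstSuccessOrAllFailures {
    static <T> CompletableFuture<T> firstSuccess(List<? extends CompletionStage<? extends T>> stages) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (stages.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("no stages given"));
            return result;
        }
        // Hypothetical aggregate: each failure is attached as a suppressed exception.
        RuntimeException allFailed = new RuntimeException("all stages failed");
        // Counts down on failures only, so it reaches zero only if every stage failed.
        AtomicInteger remainingFailures = new AtomicInteger(stages.size());
        for (CompletionStage<? extends T> stage : stages) {
            stage.whenComplete((value, ex) -> {
                if (ex == null) {
                    result.complete(value);          // first success wins, later ones are ignored
                } else {
                    allFailed.addSuppressed(ex);     // addSuppressed is synchronized in Throwable
                    if (remainingFailures.decrementAndGet() == 0) {
                        result.completeExceptionally(allFailed);
                    }
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> first = firstSuccess(Arrays.asList(a, b));
        a.completeExceptionally(new RuntimeException("failing immediately"));
        b.completeExceptionally(new RuntimeException("failing later"));
        try {
            first.join();
        } catch (CompletionException ex) {
            // join() wraps the aggregate; the individual failures are its suppressed exceptions
            for (Throwable t : ex.getCause().getSuppressed()) {
                System.out.println(t.getMessage());
            }
        }
    }
}
```

Note that futures created with supplyAsync hand their failure to whenComplete wrapped in a CompletionException, so the suppressed entries may need unwrapping via getCause().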


Considering that:

  1. One of the foundations of the philosophy of Java is to prevent or discourage bad programming practices.

    (To what degree it has been successful in doing so is the subject of another debate; the point still stands that this has undeniably been one of the primary aims of the language.)

  2. Ignoring exceptions is a very bad practice.

    An exception should always be either rethrown to the layer above, or handled, or at the very least reported. Specifically, an exception should never be silently swallowed.

  3. Errors should be reported at the earliest time possible.

    For example, see the pains the runtime goes through in order to provide fail-fast iterators, which throw a ConcurrentModificationException if the collection is modified while iterating.

  4. Ignoring an exceptionally completed CompletableFuture means that a) you are not reporting an error at the earliest time possible, and b) you are likely planning to not report it at all.

  5. The inability to simply wait for the first non-exceptional completion, and instead having to be bothered by exceptional completions, does not impose any significant burden, because you can always remove the exceptionally completed item from the list (while at the same time not forgetting to report the failure, right?) and repeat the wait.

I would therefore not be surprised if the sought-for feature is intentionally missing from Java, and I would be willing to argue that it is rightfully missing.
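
The "remove and repeat" approach from point 5 could look roughly like the following. This is a sketch under assumptions, not a definitive implementation: the name firstSuccessful and the reporter callback are invented for illustration, and the method blocks the calling thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

class RetryingAnyOf {
    // Blocks until one future completes normally; every failure seen on the way is reported.
    static <T> T firstSuccessful(List<CompletableFuture<T>> futures, Consumer<Throwable> reporter) {
        List<CompletableFuture<T>> pending = new ArrayList<>(futures);
        while (!pending.isEmpty()) {
            try {
                @SuppressWarnings("unchecked")
                T value = (T) CompletableFuture.anyOf(pending.toArray(new CompletableFuture<?>[0])).join();
                return value;
            } catch (CompletionException e) {
                // anyOf failed, so at least one pending future has failed: report and drop those.
                pending.removeIf(f -> {
                    if (!f.isCompletedExceptionally()) {
                        return false;
                    }
                    // f is already complete, so this callback runs immediately on this thread.
                    f.exceptionally(ex -> { reporter.accept(ex); return null; });
                    return true;
                });
            }
        }
        throw new NoSuchElementException("every task failed");
    }

    public static void main(String[] args) {
        CompletableFuture<String> bad = new CompletableFuture<>();
        bad.completeExceptionally(new RuntimeException("failing immediately"));
        CompletableFuture<String> good = CompletableFuture.supplyAsync(() -> "slow but fine");
        List<CompletableFuture<String>> tasks = Arrays.asList(bad, good);
        System.out.println(firstSuccessful(tasks, ex -> System.err.println("task failed: " + ex)));
    }
}
```

Each pass through the loop removes at least one failed future, so the loop terminates either with a value or once the list is empty.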

(Sorry Sotirios, no canonical answer.)


Well, that is a method that should be supported by the framework. At first, I thought CompletionStage.applyToEither did something similar, but it turns out it doesn't. So I came up with this solution:

public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
  final int count = stages.size();
  if (count <= 0) {
    throw new IllegalArgumentException("stages must not be empty");
  }
  final AtomicInteger settled = new AtomicInteger();
  final CompletableFuture<U> future = new CompletableFuture<U>();
  BiConsumer<U, Throwable> consumer = (val, exc) -> {
    if (exc == null) {
      future.complete(val);
    } else {
      if (settled.incrementAndGet() >= count) {
        // Complete with the last exception. You can aggregate all the exceptions if you wish.
        future.completeExceptionally(exc);
      }
    }
  };
  for (CompletionStage<U> item : stages) {
    item.whenComplete(consumer);
  }
  return future;
}

To see it in action, here is some usage:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class Main {
  public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
    final int count = stages.size();
    if (count <= 0) {
      throw new IllegalArgumentException("stages must not be empty");
    }
    final AtomicInteger settled = new AtomicInteger();
    final CompletableFuture<U> future = new CompletableFuture<U>();
    BiConsumer<U, Throwable> consumer = (val, exc) -> {
      if (exc == null) {
        future.complete(val);
      } else {
        if (settled.incrementAndGet() >= count) {
          // Complete with the last exception. You can aggregate all the exceptions if you wish.
          future.completeExceptionally(exc);
        }
      }
    };
    for (CompletionStage<U> item : stages) {
      item.whenComplete(consumer);
    }
    return future;
  }

  private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();

  public static <U> CompletionStage<U> delayed(final U value, long delay) {
    CompletableFuture<U> future = new CompletableFuture<U>();
    worker.schedule(() -> {
      future.complete(value);
    }, delay, TimeUnit.MILLISECONDS);
    return future;
  }
  public static <U> CompletionStage<U> delayedExceptionally(final Throwable value, long delay) {
    CompletableFuture<U> future = new CompletableFuture<U>();
    worker.schedule(() -> {
      future.completeExceptionally(value);
    }, delay, TimeUnit.MILLISECONDS);
    return future;
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    System.out.println("Started...");

    /*
    // Looks like applyToEither doesn't work as expected
    CompletableFuture<Integer> a = CompletableFuture.completedFuture(99);
    CompletableFuture<Integer> b = Main.<Integer>completedExceptionally(new Exception("Exc")).toCompletableFuture();
    System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc
    */

    try {
      List<CompletionStage<Integer>> futures = new ArrayList<>();
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #1"), 100));
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #2"), 200));
      futures.add(delayed(1, 1000));
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #4"), 400));
      futures.add(delayed(2, 500));
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #5"), 600));
      Integer value = firstCompleted(futures).toCompletableFuture().get();
      System.out.println("Completed normally: " + value);
    } catch (Exception ex) {
      System.out.println("Completed exceptionally");
      ex.printStackTrace();
    }

    try {
      List<CompletionStage<Integer>> futures = new ArrayList<>();
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#1"), 400));
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#2"), 200));
      Integer value = firstCompleted(futures).toCompletableFuture().get();
      System.out.println("Completed normally: " + value);
    } catch (Exception ex) {
      System.out.println("Completed exceptionally");
      ex.printStackTrace();
    }

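    System.out.println("End...");
    // Shut down the scheduler; its non-daemon thread would otherwise keep the JVM alive.
    worker.shutdown();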
  }

}
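
The commented-out applyToEither experiment in main refers to a completedExceptionally helper that is not part of the listing. A small standalone version of that experiment might look like this; the failed helper and the class name are hypothetical stand-ins. The CompletionStage documentation makes no guarantee about the outcome when only one of the two stages fails, so the behavior shown is what the OpenJDK implementation does in practice when the failed stage is the only one completed at the time of the call:

```java
import java.util.concurrent.CompletableFuture;

class ApplyToEitherDemo {
    // Hypothetical helper (not in the listing above): an already-failed future.
    static <U> CompletableFuture<U> failed(Throwable ex) {
        CompletableFuture<U> f = new CompletableFuture<>();
        f.completeExceptionally(ex);
        return f;
    }

    // Combines an already-failed stage with one that is still pending.
    static CompletableFuture<Integer> eitherOf(CompletableFuture<Integer> failedFirst,
                                               CompletableFuture<Integer> pending) {
        return failedFirst.applyToEither(pending, x -> x);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        CompletableFuture<Integer> either =
            eitherOf(ApplyToEitherDemo.<Integer>failed(new Exception("Exc")), pending);
        pending.complete(99); // arrives after the failure has already been relayed
        System.out.println("completed exceptionally: " + either.isCompletedExceptionally());
    }
}
```

In other words, applyToEither relays whichever outcome it sees first, including a failure, which is why firstCompleted counts failures itself.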
