Fantastic CompletableFuture.allOf() and how to handle errors.

Kalpa Senanayake
4 min read · Feb 23, 2018


Bramante Staircase at Vatican museum — Everyone who starts the journey up there meets at a single point at the bottom.

Introduction.

For the past week I have been working on improving the performance of a microservice. The service runs calculations on time-bounded data that comes from an external legacy application, which is a bit slow. It usually takes 800 ms to 1000 ms to respond, which pushes the overall response time of the service above 5 s. That is a violation of the SLA.

The design.

The idea of running these calculations in parallel had been in my head from the start of the design of this service, so I always made sure the code was thread-safe. The whole component is based on the chain of responsibility design pattern: the time-bounded data set (the context) goes through a series of handlers, and each handler invokes one well-defined operation on the context.
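To make the handler chain a little more concrete, here is a minimal sketch of the idea. The names CalculationContext and Handler, and the two lambda handlers, are hypothetical; the real handlers and context are proprietary and not shown in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: none of these names come from the real service.
class HandlerChainSketch {

    /** Shared state passed along the chain; a concurrent map keeps it thread-safe. */
    static class CalculationContext {
        private final Map<String, Double> values = new ConcurrentHashMap<>();

        void put(String key, double value) { values.put(key, value); }

        double get(String key) { return values.getOrDefault(key, 0.0); }
    }

    /** Each handler performs one well-defined operation on the context. */
    interface Handler {
        void handle(CalculationContext context);
    }

    /** Passes the context through every handler in order. */
    static CalculationContext run(List<Handler> chain, CalculationContext context) {
        for (Handler handler : chain) {
            handler.handle(context);
        }
        return context;
    }

    /** Builds a two-step chain (illustrative values only) and returns the final value. */
    static double demo() {
        List<Handler> chain = Arrays.asList(
                ctx -> ctx.put("base", 100.0),
                ctx -> ctx.put("adjusted", ctx.get("base") * 1.5));
        return run(chain, new CalculationContext()).get("adjusted");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 150.0
    }
}
```

Because every handler only talks to the context, a handler that is slow can later be moved onto another thread without changing the others, as long as the context itself is thread-safe.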

The solution.

I looked into the CompletableFuture class, introduced in Java 8, for a way to collect all the asynchronous tasks I submit and wait for all of them to complete before applying the rest of the logic. CompletableFuture.allOf() seemed to be a good fit, so I built a small proof of concept around it, and it worked well for my requirement.

The implementation.

Since the actual code is proprietary, I created a simple sample to demonstrate how CompletableFuture.allOf() can be used to run the calculations in parallel.

Let’s have a look at a simple task that returns a greeting for the given language parameter. It is a synchronous task.

private String getGreet(String lang) {
    if (lang.equals("EN")) {
        return "Hello";
    } else if (lang.equals("ES")) {
        return "Hola";
    } else if (lang.equals("SN")) {
        return "Ayubovan";
    } else {
        throw new IllegalArgumentException("Invalid lang param");
    }
}

Now let’s make it an asynchronous task with CompletableFuture.supplyAsync(). Some may wonder why I use supplyAsync() when there is also runAsync(). For this particular use case I need the async task to return a result. runAsync() takes a Runnable and returns CompletableFuture<Void>, hence I used supplyAsync().
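The difference is easiest to see side by side. This is a small sketch with a hypothetical class name; it uses the default pool to stay short.

```java
import java.util.concurrent.CompletableFuture;

class SupplyVsRunSketch {

    /** supplyAsync takes a Supplier, so the future carries the computed value. */
    static CompletableFuture<String> supply() {
        return CompletableFuture.supplyAsync(() -> "Hello");
    }

    /** runAsync takes a Runnable, so there is no value to carry and the type is Void. */
    static CompletableFuture<Void> run() {
        return CompletableFuture.runAsync(() -> System.out.println("side effect only"));
    }

    public static void main(String[] args) {
        String greeting = supply().join(); // "Hello"
        Void nothing = run().join();       // always null
        System.out.println(greeting + " / " + nothing);
    }
}
```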

Another good practice is to provide a separate thread pool for the asynchronous tasks to run on. If you want to know why this is good practice, read http://www.nurkiewicz.com/2015/11/which-thread-executes.html.
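The executor that the next listing passes to supplyAsync() is not defined anywhere in this article. Here is one way it could be created; this is a sketch, and the pool size of 4, the thread name and the class name are assumptions rather than the values used in the real service. Without the executor argument, supplyAsync() falls back to the shared ForkJoinPool.commonPool().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DedicatedPoolSketch {

    /** Runs one task on a dedicated pool and reports which thread executed it. */
    static String runOnDedicatedPool() {
        // Assumption: a small fixed pool with named threads; size it for your workload.
        ExecutorService executor = Executors.newFixedThreadPool(4, runnable -> {
            Thread thread = new Thread(runnable, "greeting-pool");
            thread.setDaemon(true); // do not keep the JVM alive because of this pool
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown(); // release the pool's threads when done
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDedicatedPool()); // prints greeting-pool
    }
}
```

In a long-running service the pool would normally be created once and shared, not created and shut down per call as in this demo.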

private CompletableFuture<GreetHolder> getGreeting(String lang) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            log.info("Task execution started.");
            Thread.sleep(2000); // simulate a slow call
            log.info("Task execution stopped.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
        return new GreetHolder(getGreet(lang));
    }, executor);
}

The code above simulates a heavy computation with Thread.sleep(2000) and returns a CompletableFuture of type GreetHolder. GreetHolder is just a DTO, as follows.

private class GreetHolder {

    private String greet;

    public GreetHolder(String greet) {
        this.greet = greet;
    }

    public String getGreet() {
        return greet;
    }

    public void setGreet(String greet) {
        this.greet = greet;
    }
}

Here comes the interesting part. Let’s go through it line by line.

List<String> langList = Arrays.asList("EN", "ES", "SN", "EX");

List<CompletableFuture<GreetHolder>> completableFutures =
        langList.stream().map(lang -> getGreeting(lang)).collect(Collectors.toList());

By streaming over the list I submit one getGreeting() task per language, so the tasks run asynchronously, and I get a List<CompletableFuture<GreetHolder>> back.
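One detail worth spelling out: the futures are collected into a list before anything waits on them, so every task is already submitted when the waiting starts. The sketch below checks this with a latch that only opens when all tasks are running at the same time. The class and method names are hypothetical, and the pool is sized to the number of tasks on purpose.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

class SubmitAllFirstSketch {

    /** A task that reports whether all the other tasks were started alongside it. */
    static CompletableFuture<Boolean> startTask(CountDownLatch allStarted, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            allStarted.countDown();
            try {
                return allStarted.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }, executor);
    }

    /** Returns true when all tasks were in flight at the same time. */
    static boolean allTasksOverlap() {
        List<String> langList = Arrays.asList("EN", "ES", "SN");
        ExecutorService executor = Executors.newFixedThreadPool(langList.size());
        CountDownLatch allStarted = new CountDownLatch(langList.size());
        try {
            // collect() is the terminal operation, so every task is submitted here.
            List<CompletableFuture<Boolean>> futures = langList.stream()
                    .map(lang -> startTask(allStarted, executor))
                    .collect(Collectors.toList());
            // Only now do we wait for the results.
            return futures.stream().allMatch(CompletableFuture::join);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(allTasksOverlap()); // prints true
    }
}
```

If join() were chained into the same stream pipeline right after the map step, the stream would submit and wait for one task at a time, and the parallelism would be lost.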

Then I used CompletableFuture.allOf() to wait until all the getGreeting() tasks have completed. The Javadoc of allOf() says:

Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.

CompletableFuture<Void> allFutures = CompletableFuture
        .allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));

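But I do not need a CompletableFuture<Void>; I need a CompletableFuture<List<GreetHolder>> that holds the GreetHolder objects. Hence I call CompletableFuture.join() on each future inside thenApply() to build a CompletableFuture<List<GreetHolder>>. Because the callback runs only after allOf() has completed, every join() returns immediately instead of blocking.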

CompletableFuture<List<GreetHolder>> allCompletableFuture = allFutures.thenApply(future -> {
    return completableFutures.stream()
            .map(completableFuture -> completableFuture.join())
            .collect(Collectors.toList());
});
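The two steps above (allOf() followed by join() on each future) are not specific to GreetHolder, so they can be wrapped in a small generic helper. This is a sketch; the class and the allOfList name are my own, not something the JDK provides.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    /** Turns a list of futures into one future holding the list of results. */
    static <T> CompletableFuture<List<T>> allOfList(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // By the time this callback runs every future is complete,
        // so join() returns immediately instead of blocking.
        return allDone.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    /** Combines two small tasks; the result keeps the order of the input list. */
    static List<String> demo() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "Hello"),
                CompletableFuture.supplyAsync(() -> "Hola"));
        return allOfList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Hello, Hola]
    }
}
```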

Now I can build the final CompletableFuture, which holds the results.

CompletableFuture<List<String>> completableFuture = allCompletableFuture.thenApply(greets -> {
    return greets.stream().map(GreetHolder::getGreet).collect(Collectors.toList());
});

The resulting CompletableFuture holds the list of greeting strings, and now we are ready to make the blocking completableFuture.get() call.

completableFuture.get()
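Note that get() blocks until the result is there and throws checked exceptions. When the response time matters, the overload that takes a timeout can cap the wait. The sketch below assumes that an empty list is an acceptable fallback, which may not be true for your service; the helper name awaitOrEmpty is hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingGetSketch {

    /** Waits up to the given time for the result; falls back to an empty list. */
    static List<String> awaitOrEmpty(CompletableFuture<List<String>> future,
                                     long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // The tasks took longer than we are willing to wait.
            return Collections.emptyList();
        } catch (ExecutionException e) {
            // One of the tasks failed; e.getCause() is the original exception.
            return Collections.emptyList();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> done =
                CompletableFuture.completedFuture(Arrays.asList("Hello"));
        CompletableFuture<List<String>> never = new CompletableFuture<>();
        System.out.println(awaitOrEmpty(done, 1, TimeUnit.SECONDS));         // prints [Hello]
        System.out.println(awaitOrEmpty(never, 50, TimeUnit.MILLISECONDS));  // prints []
    }
}
```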

So far so good. But that is what happens on a summer day when everything works in my favour. The enterprise service arena is not such a beautiful place, and things go wrong quite often. So let’s talk about exception handling.

Exception Handling

Exceptional scenarios can be handled in two ways here.

1. Asynchronous exception handling.

CompletableFuture comes with a handy method called exceptionally(). I can use it to handle exceptions thrown inside the asynchronous code block. The code above can be improved as follows.

private CompletableFuture<GreetHolder> getGreeting(String lang) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            log.info("Task execution started.");
            Thread.sleep(2000); // simulate a slow call
            log.info("Task execution stopped.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
        return new GreetHolder(getGreet(lang));
    }, executor).exceptionally(ex -> {
        // Log the failure and substitute null so the other results survive.
        log.error("Something went wrong : ", ex);
        return null;
    });
}

The last step of the post-processing should then be changed to filter out the null results, as follows.

CompletableFuture<List<String>> completableFuture = allCompletableFuture.thenApply(greets -> {
    return greets.stream()
            .filter(Objects::nonNull)
            .map(GreetHolder::getGreet)
            .collect(Collectors.toList());
});

Handling exceptions this way makes it possible to return partial results to the consumer. For example, if I use the following list with the code above, the last parameter causes an exception; the getGreeting() method logs the exception and returns null for that particular asynchronous task.

List<String> langList = Arrays.asList("EN", "ES", "SN", "EX");

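The result then contains only the three greetings that succeeded (Hello, Hola and Ayubovan), while the failure for "EX" shows up in the log.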
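For anyone who wants to run this, here is a condensed, self-contained version of the partial-results approach. It is a sketch that simplifies the sample: it returns plain strings instead of GreetHolder, uses the default pool, and leaves out the logging and the sleep. The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class PartialResultsSketch {

    static String getGreet(String lang) {
        switch (lang) {
            case "EN": return "Hello";
            case "ES": return "Hola";
            case "SN": return "Ayubovan";
            default: throw new IllegalArgumentException("Invalid lang param");
        }
    }

    /** A failed lookup becomes null instead of failing the whole batch. */
    static CompletableFuture<String> getGreeting(String lang) {
        return CompletableFuture.supplyAsync(() -> getGreet(lang))
                .exceptionally(ex -> null);
    }

    static List<String> greetAll(List<String> langList) {
        List<CompletableFuture<String>> futures = langList.stream()
                .map(PartialResultsSketch::getGreeting)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull) // drop the failed lookups
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        // "EX" is not a known language, so its task fails and is filtered out.
        System.out.println(greetAll(Arrays.asList("EN", "ES", "SN", "EX")));
        // prints [Hello, Hola, Ayubovan]
    }
}
```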

2. Synchronous exception handling.

And if I do not want partial results or asynchronous exception handling, CompletableFuture.allOf() provides a handy mechanism for that. The Javadoc says:

Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete. If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture also does so, with a CompletionException holding this exception as its cause.

So all we have to do is remove the exceptionally() call from the async method and let allOf() do its job of bubbling the exception up as its result.

This time the output will not contain any results: the blocking get() call throws an ExecutionException whose cause is the IllegalArgumentException.
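Here is a condensed, runnable sketch of that behaviour, with the same simplifications as before (plain strings, default pool, hypothetical class name). It uses join(), which reports the failure as an unchecked CompletionException, where get() would use a checked ExecutionException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

class FailWholeBatchSketch {

    static String getGreet(String lang) {
        switch (lang) {
            case "EN": return "Hello";
            case "ES": return "Hola";
            case "SN": return "Ayubovan";
            default: throw new IllegalArgumentException("Invalid lang param");
        }
    }

    /** Returns the cause reported by allOf(), or null when every task succeeded. */
    static Throwable failureCause(List<String> langList) {
        // No exceptionally() here, so a failing task completes its future exceptionally.
        List<CompletableFuture<String>> futures = langList.stream()
                .map(lang -> CompletableFuture.supplyAsync(() -> getGreet(lang)))
                .collect(Collectors.toList());
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return null;
        } catch (CompletionException e) {
            // The original exception thrown inside the task is the cause.
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureCause(Arrays.asList("EN", "ES", "SN", "EX")));
        // prints java.lang.IllegalArgumentException: Invalid lang param
    }
}
```

Keep in mind that the future returned by allOf() completes only once every task has finished, so a failure does not cut the wait short; it only decides how the combined future completes.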

Conclusion.

In this article I have demonstrated how to use CompletableFuture.allOf() to run asynchronous tasks in parallel and collect their results. I have also shown two ways to handle exceptions, depending on what the consumer requires.
