Java – When designing an asynchronous Java API, how do you ensure that the entire CompletableFuture chain is executed by the internal thread pool?

I’m writing a library that provides multiple asynchronous methods that return CompletableFutures. The library has an internal thread pool that performs computational work for asynchronous methods.

I want to make sure that the following two requirements are met:

  1. The returned CompletableFuture is not completed by an internal thread, so the library’s internal thread pool never executes a CompletableFuture chain outside my library.
  2. All computations of my async methods are performed by the internal thread pool, not by the user thread (i.e. the thread of the method caller).

Suppose the library has the following blocking method

Data request(Address address) {
  Message request = encode(address);
  Message response = sendAndReceive(request);
  Data responseData = decode(response);
  return responseData;
}

and the corresponding asynchronous method

  CompletableFuture<Data> requestAsync(Address address) {
    return CompletableFuture.supplyAsync(() -> encode(address), internalThreadPool)
        .thenCompose(request -> sendAndReceiveAsync(request))
        .thenApply(response -> decode(response));
  }

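The first requirement can be met by appending the stage .whenCompleteAsync((v, t) -> {}) to the chain. Without an explicit executor, that stage runs on CompletableFuture’s default asynchronous executor (normally ForkJoinPool.commonPool()), so the returned future is completed there rather than by an internal thread.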
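To see what the empty whenCompleteAsync stage changes, here is a minimal sketch. The class HandOffDemo, the method userContinuationThread, the helper awaitQuietly and the thread name "internal-1" are made up for illustration and are not part of the library in the question. A latch keeps the internal future incomplete until the user's continuation has been attached, so the result does not depend on timing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandOffDemo {

    // Returns the name of the thread that ran a continuation the "user"
    // attached to the future returned by a (hypothetical) library method.
    static String userContinuationThread(boolean handOff) {
        ExecutorService internalThreadPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "internal-1"));
        CountDownLatch userStageAttached = new CountDownLatch(1);
        try {
            CompletableFuture<String> internal = CompletableFuture.supplyAsync(() -> {
                awaitQuietly(userStageAttached); // stay incomplete until the user has chained
                return "data";
            }, internalThreadPool);

            // With handOff, the returned future is completed by the default async executor.
            CompletableFuture<String> returned =
                    handOff ? internal.whenCompleteAsync((v, t) -> { }) : internal;

            // User code outside the library: a plain, non-async continuation.
            CompletableFuture<String> user =
                    returned.thenApply(d -> Thread.currentThread().getName());
            userStageAttached.countDown();
            return user.join();
        } finally {
            internalThreadPool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("without hand-off: " + userContinuationThread(false));
        System.out.println("with hand-off:    " + userContinuationThread(true));
    }
}
```

Without the hand-off, the user's continuation runs on internal-1, which is exactly what the first requirement forbids. With the hand-off it runs on a thread of the default asynchronous executor; the exact thread name depends on the JVM.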

But what does it take to meet the second requirement?

Sergey Kuksenko has discussed a solution for the second requirement, and that solution has been implemented in the HttpClient of Java 11.

As written, requestAsync does not meet the second requirement, because there is no guarantee that decode(response) will be executed by an internal thread. If encode and sendAndReceiveAsync complete quickly, the preceding stage may already be complete when thenApply is called, and decode is then executed by the caller’s thread.
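The effect is easy to reproduce in isolation. The sketch below is a hypothetical demo (CallerThreadDemo and the thread name "internal-1" are not from the question): it forces the "fast" case by waiting for the first stage before chaining on it. The CompletableFuture documentation only says that a non-async dependent action may be performed by the completing thread or by another caller of a completion method; running it on the calling thread when the future is already complete is what OpenJDK does in practice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallerThreadDemo {

    // Returns the name of the thread that ran the dependent thenApply stage.
    static String threadRunningDependentStage() {
        ExecutorService internalThreadPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "internal-1"));
        try {
            CompletableFuture<String> first =
                    CompletableFuture.supplyAsync(() -> "request", internalThreadPool);
            first.join(); // the stage is now complete before anything is chained on it

            // Non-async stage on an already completed future: executed by the caller.
            return first.thenApply(value -> Thread.currentThread().getName()).join();
        } finally {
            internalThreadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:    " + Thread.currentThread().getName());
        System.out.println("thenApply ran on: " + threadRunningDependentStage());
    }
}
```

Both lines report the same thread, even though the first stage was computed by the internal pool.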

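This problem can be solved by introducing an initially incomplete CompletableFuture, startCf, at the head of the CF chain. All dependent stages are registered while startCf is still incomplete, so none of them can run in the caller’s thread. The chain starts only when startCf is completed, and its first stage is then handed to the internal thread pool.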

So the complete solution might look like this

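CompletableFuture<Data> requestAsyncFixedAll(Address address) {
  // Build the whole chain on a future that is not yet complete.
  CompletableFuture<Void> startCf = new CompletableFuture<>();
  CompletableFuture<Data> dataCf = startCf
      .thenApplyAsync(v -> encode(address), internalThreadPool)
      .thenCompose(request -> sendAndReceiveAsync(request))
      .thenApply(response -> decode(response))
      // Requirement 1: complete the returned future outside the internal pool.
      .whenCompleteAsync((v, t) -> {});
  // Requirement 2: start the chain only after every stage is registered.
  startCf.complete(null);
  return dataCf;
}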
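A runnable version of this pattern, with stand-ins for the library's methods, can be used to check which threads do the work. Everything here is an assumption made for the demo: the class StartCfDemo, the "internal-N" thread names, and the string-based stubs that replace encode, sendAndReceiveAsync and decode. In particular, the stub for sendAndReceiveAsync completes its future on the internal pool, which the real method must also do for the guarantee to hold.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class StartCfDemo {

    // Runs the startCf pattern once and returns the names of the threads that
    // executed the encode, sendAndReceiveAsync and decode stand-ins, in order.
    static List<String> stageThreads() {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService internalThreadPool = Executors.newFixedThreadPool(
                2, r -> new Thread(r, "internal-" + counter.incrementAndGet()));
        List<String> threads = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> startCf = new CompletableFuture<>();
            CompletableFuture<String> dataCf = startCf
                    .thenApplyAsync(v -> {                 // stand-in for encode
                        threads.add(Thread.currentThread().getName());
                        return "request";
                    }, internalThreadPool)
                    .thenCompose(request -> {              // stand-in for sendAndReceiveAsync
                        threads.add(Thread.currentThread().getName());
                        return CompletableFuture.supplyAsync(
                                () -> request + "->response", internalThreadPool);
                    })
                    .thenApply(response -> {               // stand-in for decode
                        threads.add(Thread.currentThread().getName());
                        return response + "->data";
                    })
                    .whenCompleteAsync((v, t) -> { });     // hand-off for requirement 1

            startCf.complete(null); // nothing has run before this line
            dataCf.join();          // wait so the recorded list is complete
            return threads;
        } finally {
            internalThreadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("stages: " + stageThreads());
    }
}
```

Because every stage is attached before startCf.complete(null) is called, all three recorded names belong to the internal pool, regardless of how quickly the stubs finish.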
