Java – Non-blocking Java Asynchronous Processing – How do I limit memory usage?

Coming back to Java after a few years away, I was happy to see that the new java.net.http.HttpClient introduces non-blocking asynchronous support, as does the AWS Java SDK 2.0. I heard about the concept of reactive programming at conference sessions many years ago, but I haven't had many opportunities to put these ideas into practice.

I have a question that seems to fit well with this programming style: basically, I want to download a large number of files (say 10,000) over HTTP and write them back to S3.

I've used Failsafe to implement retries for non-blocking asynchronous HTTP GETs, and combining these with uploads via the S3 async client is simple (see the sketch below).

However, I'm not sure how to properly limit the memory usage of the program: if files download faster than they can be uploaded to S3, there is no mechanism to apply backpressure and prevent out-of-memory errors.

I'm familiar with some traditional blocking solutions to this problem, such as using a semaphore to limit the number of concurrent downloads, or writing downloads to a bounded blocking queue from which the S3 upload threads pull. However, if I'm going to apply backpressure with a blocking mechanism, that makes me question the advantage of using non-blocking IO in the first place.
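For concreteness, the blocking variant I have in mind looks roughly like this. It is a self-contained sketch with a simulated download/upload (the class name, permit count, and sleep are made up for illustration): a `Semaphore` permit is taken before each transfer starts and returned only when it completes, so at most four are ever in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedTransfers {
    // At most 4 transfers in flight: a permit is held from the start of the
    // download until the (simulated) upload completes.
    static final Semaphore permits = new Semaphore(4);
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Stand-in for the real download-then-upload pipeline.
    static CompletableFuture<String> transfer(String uri) {
        return CompletableFuture.supplyAsync(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max); // record peak concurrency
            try {
                Thread.sleep(20); // simulate IO
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
            return "uploaded " + uri;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            permits.acquire(); // blocks the submitting thread while 4 are in flight
            futures.add(transfer("file-" + i)
                    .whenComplete((r, t) -> permits.release())); // return the permit
        }
        futures.forEach(CompletableFuture::join);
        System.out.println("max in flight: " + maxInFlight.get());
    }
}
```

This bounds memory, but the `acquire()` call parks the submitting thread, which is exactly the blocking I was hoping to avoid.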

Is there a more idiomatic “reactive” way to achieve the same goal?

import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BackupClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(BackupClient.class);
    private final HttpClient httpClient = HttpClient.newBuilder().build();
    private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();

    public void runBackup(List<URI> filesToBackup) {
        List<CompletableFuture<PutObjectResponse>> futures = filesToBackup.stream()
                .map(this::submitBackup)
                .collect(Collectors.toList());

        futures.forEach(CompletableFuture::join);
    }

    private CompletableFuture<PutObjectResponse> submitBackup(URI uri) {
        return sendAsyncWithRetries(uri, HttpResponse.BodyHandlers.ofString())
                .thenCompose(httpResponse -> s3AsyncClient.putObject(PutObjectRequest.builder()
                        .bucket("my-bucket")
                        .key(uri.toASCIIString())
                        .build(), AsyncRequestBody.fromString(httpResponse.body())));
    }

    private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetries(URI uri, HttpResponse.BodyHandler<T> handler) {
        final HttpRequest request = HttpRequest.newBuilder()
                .uri(uri)
                .timeout(Duration.ofMinutes(2))
                .GET()
                .build();

        final var retryPolicy = new RetryPolicy<HttpResponse<T>>()
                .withMaxRetries(4)
                .withDelay(Duration.ofSeconds(1))
                .handleResultIf(response -> 200 != response.statusCode());

        return Failsafe.with(retryPolicy)
                .getStageAsync(context -> {
                    if (context.getAttemptCount() > 0) {
                        LOGGER.error("Retry " + context.getAttemptCount() + " for " + uri);
                    }
                    return this.httpClient.sendAsync(request, handler);
                });
    }
}

Solution

Since you need to control resource (memory) consumption, a semaphore is the right tool. For non-blocking computation, what you need is an asynchronous semaphore. Popular libraries (RxJava and other Reactive Streams implementations) use asynchronous semaphores internally to implement backpressure, but do not expose them as separate classes. When a subscriber to a reactive stream invokes Flow.Subscription.request(n), that is equivalent to Semaphore.release(n). The matching Semaphore.acquire(), however, is hidden: it is called internally by the publisher.
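The request(n)-as-release(n) correspondence can be seen with the JDK's own Flow API. In this minimal sketch (class and method names are mine, invented for illustration), the subscriber grants the publisher one "permit" at a time via request(1), finishing each item before asking for the next, so the publisher can never run ahead of the consumer:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class RequestAsRelease {
    // Publishes 0..4 to a subscriber that requests one item at a time;
    // returns the items in the order they were received.
    static List<Integer> run() throws InterruptedException {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);            // like Semaphore.release(1) toward the publisher
                }
                @Override public void onNext(Integer item) {
                    received.add(item);      // "process" the item...
                    subscription.request(1); // ...then grant the next permit
                }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (int i = 0; i < 5; i++) {
                publisher.submit(i); // blocks only if the subscriber's buffer fills
            }
        } // close() completes the stream once all items are delivered
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints [0, 1, 2, 3, 4]
    }
}
```

The hidden acquire() lives inside SubmissionPublisher: submit() blocks (or applies its saturation policy) when the subscriber's buffer is full, i.e. when no permits remain.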

The disadvantage of this design is that resource feedback can only be established between a producer and its immediate consumer. If there is a chain of producers and consumers, then the resource consumption of each link must be controlled separately, and overall resource consumption becomes N times larger, where N is the number of links.

If you can afford it, you can use RxJava or any other reactive streaming library. If not, you can use the only asynchronous library that gives users full access to an asynchronous semaphore implementation: DF4J (yes, I am the author). It does not contain a direct solution to your problem; however, for an example where an asynchronous web server limits the number of simultaneous connections with asynchronous semaphores, see ConnectionManager.java.
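To illustrate the idea itself (this is a minimal hand-rolled sketch, not DF4J's actual API): an asynchronous semaphore hands out permits as futures instead of blocking, so acquire() returns immediately, either already-completed or completed later when release() hands the permit to the oldest waiter.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Minimal asynchronous semaphore: acquire() never parks a thread; it
// returns a future that completes once a permit becomes available.
public class AsyncSemaphore {
    private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
    private int permits;

    public AsyncSemaphore(int permits) {
        this.permits = permits;
    }

    public synchronized CompletableFuture<Void> acquire() {
        if (permits > 0) {
            permits--; // permit available: hand it over immediately
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> waiter = new CompletableFuture<>();
        waiters.add(waiter); // no permit: queue the request
        return waiter;
    }

    public void release() {
        CompletableFuture<Void> next;
        synchronized (this) {
            next = waiters.poll();
            if (next == null) {
                permits++; // nobody waiting: bank the permit
            }
        }
        if (next != null) {
            next.complete(null); // hand the permit straight to a waiter
        }
    }
}
```

In your pipeline (sketch, using the hypothetical class above) this would chain as `semaphore.acquire().thenCompose(ignored -> submitBackup(uri)).whenComplete((r, t) -> semaphore.release())`, which bounds in-flight transfers without blocking any thread.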
