Spring Gateway AsyncPredicate does not work with react heaps and fluxes
We wrote a custom Predicate factory for Spring-Gateway to route requests. We are parsing the body of the XML request and then exporting the route based on the specific method present in the body. In doing this, we wrote the following code to create ServerRquest.
@Override
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
return exchange -> {
Class<String> inClass = String.class;
Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
if (cachedBody != null) {
try {
boolean test = config.pattern.matcher((String) cachedBody).matches();
exchange.getAttributes().put(TEST_ATTRIBUTE, test);
return Mono.just(test);
} catch (ClassCastException e) {
LOG.error("Predicate test failed because String.class does not match the cached body object", e);
}
return Mono.just(false);
} else {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux
.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
.bodyToMono(inClass).doOnNext(objectValue -> {
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
}).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
});
}
};
}
Using older versions of Spring-Boot-Parent (2.1.7.RELEASE) and spring-cloud-dependencies (Greenwich.RELEASE)
will work perfectly. But with the latest versions of Spring-Boot-Parent (2.3.1.RELEASE) and spring-cloud-dependencies (Hoxton.SR6),
I encountered the following exception. The gateway application starts normally without errors.
Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer ( reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/ :2.2.3.RELEASE]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
... 84 more
Has anyone else had the same problem and knows how to fix it?
Solution
The problem is that the greenwich version of those apis was beta . The object expected in CACHED_REQUEST_BODY_ATTR
must now be PooledDataBuffer
. So I’ve now changed my code accordingly. It now looks like this:
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux
.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
.bodyToMono(inClass).doOnNext(objectValue -> {
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
}).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
});
After updating the class, it now works as expected.