diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java index 53d6c7fc3cd0..94eeb137f2b6 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java @@ -30,6 +30,7 @@ import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.codec.Hints; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpRequest; @@ -66,6 +67,8 @@ class DefaultClientResponse implements ClientResponse { private final Supplier requestSupplier; + private final BodyExtractor.Context bodyExtractorContext; + public DefaultClientResponse(ClientHttpResponse response, ExchangeStrategies strategies, String logPrefix, String requestDescription, Supplier requestSupplier) { @@ -76,6 +79,22 @@ public DefaultClientResponse(ClientHttpResponse response, ExchangeStrategies str this.logPrefix = logPrefix; this.requestDescription = requestDescription; this.requestSupplier = requestSupplier; + this.bodyExtractorContext = new BodyExtractor.Context() { + @Override + public List> messageReaders() { + return strategies.messageReaders(); + } + + @Override + public Optional serverResponse() { + return Optional.empty(); + } + + @Override + public Map hints() { + return Hints.from(Hints.LOG_PREFIX_HINT, logPrefix); + } + }; } @@ -107,22 +126,7 @@ public MultiValueMap cookies() { @SuppressWarnings("unchecked") @Override public T body(BodyExtractor extractor) { - T result = extractor.extract(this.response, new BodyExtractor.Context() { - @Override - public List> messageReaders() { - return strategies.messageReaders(); - } - - @Override - public Optional serverResponse() { - return Optional.empty(); - } - - @Override - public Map hints() { - return Hints.from(Hints.LOG_PREFIX_HINT, logPrefix); - } - }); + T result = extractor.extract(this.response, this.bodyExtractorContext); String description = "Body from " + this.requestDescription + " [DefaultClientResponse]"; if (result instanceof Mono) { return (T) ((Mono) result).checkpoint(description); @@ -146,8 +150,10 @@ public Mono bodyToMono(ParameterizedTypeReference elementTypeRef) { } @Override + @SuppressWarnings("unchecked") public Flux bodyToFlux(Class elementClass) { - return body(BodyExtractors.toFlux(elementClass)); + return elementClass.equals(DataBuffer.class) ? + (Flux) body(BodyExtractors.toDataBuffers()) : body(BodyExtractors.toFlux(elementClass)); } @Override diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java index ea94cf843c56..c98f151fac26 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultServerRequest.java @@ -36,6 +36,7 @@ import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.codec.DecodingException; import org.springframework.core.codec.Hints; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpRange; @@ -195,8 +196,10 @@ public Mono bodyToMono(ParameterizedTypeReference typeReference) { } @Override + @SuppressWarnings("unchecked") public Flux bodyToFlux(Class elementClass) { - Flux flux = body(BodyExtractors.toFlux(elementClass)); + Flux flux = (elementClass.equals(DataBuffer.class) ? + (Flux) request().getBody() : body(BodyExtractors.toFlux(elementClass))); return flux.onErrorMap(UnsupportedMediaTypeException.class, ERROR_MAPPER) .onErrorMap(DecodingException.class, DECODING_MAPPER); }