Skip to content

Commit

Permalink
Ensure large payloads don't affect the invoking thread of a Resource …
Browse files Browse the repository at this point in the history
…Method

Fixes: quarkusio#31606
  • Loading branch information
geoand committed Mar 7, 2023
1 parent ccf3df0 commit be37355
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ public List<String> strings(List<String> strings) {
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Person personTest(Person person) {
//large requests should get bumped from the IO thread
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched back to event loop");
}
return person;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,8 @@ public Person getPerson(Person person) {
@Produces(MediaType.APPLICATION_XML)
@Consumes(MediaType.APPLICATION_XML)
public Person personTest(Person person) {
//large requests should get bumped from the IO thread
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched back to event loop");
}
return person;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public List<String> strings(List<String> strings) {
@Consumes(MediaType.APPLICATION_JSON)
public Person personTest(Person person) {
//large requests should get bumped from the IO thread
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched back to event loop");
}
return person;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
}
// form params can be everywhere (field, beanparam, param)
boolean checkWithFormReadRequestFilters = false;
boolean inputHandlerEngaged = false;
if (method.isFormParamRequired() || hasWithFormReadRequestFilters) {
// read the body as multipart in one go
handlers.add(new FormBodyHandler(bodyParameter != null, executorSupplier, method.getFileFormNames()));
Expand All @@ -296,6 +297,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
// allow the body to be read by chunks
handlers.add(new InputHandler(resteasyReactiveConfig.getInputBufferSize(), executorSupplier));
checkWithFormReadRequestFilters = true;
inputHandlerEngaged = true;
}
}
}
Expand Down Expand Up @@ -324,6 +326,10 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
}
handlers.add(new RequestDeserializeHandler(typeClass, genericType, consumesMediaTypes, serialisers,
bodyParameterIndex));
if (inputHandlerEngaged) {
handlers.add(new NonBlockingHandler());
}

}

// given that we may inject form params in the endpoint we need to make sure we read the body before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
public class InputHandler implements ServerRestHandler {

final long maxBufferSize;
private volatile Executor executor;
private final Supplier<Executor> supplier;
private volatile Executor workerExecutor;
private final Supplier<Executor> workerExecutorSupplier;
private final ClassLoader originalTCCL;

public InputHandler(long maxBufferSize, Supplier<Executor> supplier) {
public InputHandler(long maxBufferSize, Supplier<Executor> workerExecutorSupplier) {
this.maxBufferSize = maxBufferSize;
this.supplier = supplier;
this.workerExecutorSupplier = workerExecutorSupplier;
// capture the proper TCCL in order to avoid losing it to Vert.x in dev-mode
this.originalTCCL = Thread.currentThread().getContextClassLoader();

Expand Down Expand Up @@ -93,8 +93,8 @@ public void data(ByteBuffer event) {
data.add(event);
if (dataCount > maxBufferSize) {
context.serverRequest().pauseRequestInput();
if (executor == null) {
executor = supplier.get();
if (workerExecutor == null) {
workerExecutor = workerExecutorSupplier.get();
}
//super inefficient
//TODO: write a stream that just uses the existing vert.x buffers
Expand All @@ -107,7 +107,7 @@ public void data(ByteBuffer event) {
}
//todo timeout
context.setInputStream(context.serverRequest().createInputStream(ByteBuffer.wrap(ar)));
context.resume(executor);
context.resume(workerExecutor);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.jboss.resteasy.reactive.server.vertx.test;

import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;

import java.util.Base64;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.jboss.resteasy.reactive.server.core.BlockingOperationSupport;
import org.jboss.resteasy.reactive.server.vertx.test.framework.ResteasyReactiveUnitTest;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.smallrye.mutiny.Uni;

public class BodyPayloadBlockingAllowedTest {

@RegisterExtension
static ResteasyReactiveUnitTest test = new ResteasyReactiveUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestResource.class));

@Test
void testSmallRequestForNonBlocking() {
doTest(5_000, "non-blocking", false);
}

@Test
void testLargeRequestForNonBlocking() {
doTest(5_000_000, "non-blocking", false);
}

@Test
void testSmallRequestForBlocking() {
doTest(5_000, "blocking", true);
}

@Test
void testLargeRequestForBlocking() {
doTest(5_000_000, "blocking", true);
}

private static void doTest(int size, String path, boolean blockingAllowed) {
given() //
.body(String.format("{\"data\":\"%s\"}", getBase64String(size)))
.header("Content-Type", MediaType.TEXT_PLAIN)
.when().post("/test/" + path)
.then()
.statusCode(200)
.body(equalTo("" + blockingAllowed));
}

private static String getBase64String(int size) {
return Base64.getEncoder().encodeToString(new byte[size]);
}

@Path("test")
public static class TestResource {

@Path("non-blocking")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Uni<Boolean> nonBlocking(String request) {
return Uni.createFrom().item(BlockingOperationSupport::isBlockingAllowed);
}

@Path("blocking")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Boolean blocking(String request) {
return BlockingOperationSupport.isBlockingAllowed();
}

}
}

0 comments on commit be37355

Please sign in to comment.