Skip to content

Commit

Permalink
Ensure large payloads don't affect the invoking thread of a Resource …
Browse files Browse the repository at this point in the history
…Method

Fixes: quarkusio#31606
  • Loading branch information
geoand committed Mar 6, 2023
1 parent 59e47e7 commit bbd5388
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public void resume() {
resume((Executor) null);
}

public void resumeOnEventLoop() {
lastExecutor = null;
resume();
}

public synchronized void resume(Throwable throwable) {
resume(throwable, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.jboss.resteasy.reactive.server.handlers.ResponseHandler;
import org.jboss.resteasy.reactive.server.handlers.ResponseWriterHandler;
import org.jboss.resteasy.reactive.server.handlers.SseResponseWriterHandler;
import org.jboss.resteasy.reactive.server.handlers.SwitchToEventLoopHandler;
import org.jboss.resteasy.reactive.server.handlers.VariableProducesHandler;
import org.jboss.resteasy.reactive.server.mapping.RuntimeResource;
import org.jboss.resteasy.reactive.server.mapping.URITemplate;
Expand Down Expand Up @@ -286,6 +287,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
}
// form params can be everywhere (field, beanparam, param)
boolean checkWithFormReadRequestFilters = false;
boolean inputHandlerEngaged = false;
if (method.isFormParamRequired() || hasWithFormReadRequestFilters) {
// read the body as multipart in one go
handlers.add(new FormBodyHandler(bodyParameter != null, executorSupplier, method.getFileFormNames()));
Expand All @@ -296,6 +298,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
// allow the body to be read by chunks
handlers.add(new InputHandler(resteasyReactiveConfig.getInputBufferSize(), executorSupplier));
checkWithFormReadRequestFilters = true;
inputHandlerEngaged = true;
}
}
}
Expand Down Expand Up @@ -324,6 +327,10 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
}
handlers.add(new RequestDeserializeHandler(typeClass, genericType, consumesMediaTypes, serialisers,
bodyParameterIndex));
if (inputHandlerEngaged) {
handlers.add(new SwitchToEventLoopHandler(executorSupplier));
}

}

// given that we may inject form params in the endpoint we need to make sure we read the body before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
public class InputHandler implements ServerRestHandler {

final long maxBufferSize;
private volatile Executor executor;
private final Supplier<Executor> supplier;
private volatile Executor workerExecutor;
private final Supplier<Executor> workerExecutorSupplier;
private final ClassLoader originalTCCL;

public InputHandler(long maxBufferSize, Supplier<Executor> supplier) {
public InputHandler(long maxBufferSize, Supplier<Executor> workerExecutorSupplier) {
this.maxBufferSize = maxBufferSize;
this.supplier = supplier;
this.workerExecutorSupplier = workerExecutorSupplier;
// capture the proper TCCL in order to avoid losing it to Vert.x in dev-mode
this.originalTCCL = Thread.currentThread().getContextClassLoader();

Expand Down Expand Up @@ -93,8 +93,8 @@ public void data(ByteBuffer event) {
data.add(event);
if (dataCount > maxBufferSize) {
context.serverRequest().pauseRequestInput();
if (executor == null) {
executor = supplier.get();
if (workerExecutor == null) {
workerExecutor = workerExecutorSupplier.get();
}
//super inefficient
//TODO: write a stream that just uses the existing vert.x buffers
Expand All @@ -107,7 +107,7 @@ public void data(ByteBuffer event) {
}
//todo timeout
context.setInputStream(context.serverRequest().createInputStream(ByteBuffer.wrap(ar)));
context.resume(executor);
context.resume(workerExecutor);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.jboss.resteasy.reactive.server.handlers;

import java.util.concurrent.Executor;
import java.util.function.Supplier;

import org.jboss.resteasy.reactive.server.core.BlockingOperationSupport;
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;

/**
* A special handler whose only purpose is to switch execution from a worker thread to the event loop
* This can be needed when for example a large payload is read but the JAX-RS Resource method
* is intended to be run on the event loop.
*/
public class SwitchToEventLoopHandler implements ServerRestHandler {

private final Supplier<Executor> workerExecutorSupplier;

public SwitchToEventLoopHandler(Supplier<Executor> workerExecutorSupplier) {
this.workerExecutorSupplier = workerExecutorSupplier;
}

@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
if (BlockingOperationSupport.isBlockingAllowed()) { // we are on a worker thread
requestContext.suspend();
workerExecutorSupplier.get().execute(new Runnable() {
@Override
public void run() {
requestContext.resumeOnEventLoop();
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.jboss.resteasy.reactive.server.vertx.test;

import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;

import java.util.Base64;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.jboss.resteasy.reactive.server.core.BlockingOperationSupport;
import org.jboss.resteasy.reactive.server.vertx.test.framework.ResteasyReactiveUnitTest;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.smallrye.mutiny.Uni;

public class BodyPayloadBlockingAllowedTest {

@RegisterExtension
static ResteasyReactiveUnitTest test = new ResteasyReactiveUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestResource.class));

@Test
void testSmallRequestForNonBlocking() {
doTest(5_000, "non-blocking", false);
}

@Test
void testLargeRequestForNonBlocking() {
doTest(5_000_000, "non-blocking", false);
}

@Test
void testSmallRequestForBlocking() {
doTest(5_000, "blocking", true);
}

@Test
void testLargeRequestForBlocking() {
doTest(5_000_000, "blocking", true);
}

private static void doTest(int size, String path, boolean blockingAllowed) {
given() //
.body(String.format("{\"data\":\"%s\"}", getBase64String(size)))
.header("Content-Type", MediaType.TEXT_PLAIN)
.when().post("/test/" + path)
.then()
.statusCode(200)
.body(equalTo("" + blockingAllowed));
}

private static String getBase64String(int size) {
return Base64.getEncoder().encodeToString(new byte[size]);
}

@Path("test")
public static class TestResource {

@Path("non-blocking")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Uni<Boolean> nonBlocking(String request) {
return Uni.createFrom().item(BlockingOperationSupport::isBlockingAllowed);
}

@Path("blocking")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Boolean blocking(String request) {
return BlockingOperationSupport.isBlockingAllowed();
}

}
}

0 comments on commit bbd5388

Please sign in to comment.