Skip to content

Commit

Permalink
Fix flaky org.opensearch.rest.ReactorNetty4StreamingStressIT.testClos…
Browse files Browse the repository at this point in the history
…eClientStreamingRequest test case

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Sep 9, 2024
1 parent 5642ce7 commit 5bb9793
Showing 1 changed file with 13 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,19 @@
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.junit.After;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.test.subscriber.TestSubscriber;
import reactor.test.StepVerifier;

import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.collection.IsEmptyCollection.empty;

public class ReactorNetty4StreamingStressIT extends OpenSearchRestTestCase {
@After
Expand Down Expand Up @@ -65,31 +60,15 @@ public void testCloseClientStreamingRequest() throws Exception {
streamingRequest.addParameter("refresh", "true");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
TestSubscriber<ByteBuffer> subscriber = TestSubscriber.create();
streamingResponse.getBody().subscribe(subscriber);

final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
try {
// Await for subscriber to receive at least one chunk
assertBusy(() -> assertThat(subscriber.getReceivedOnNext(), not(empty())));

// Close client forceably
executor.schedule(() -> {
client().close();
return null;
}, 2, TimeUnit.SECONDS);

// Await for subscriber to terminate
subscriber.block(Duration.ofSeconds(10));
assertThat(
subscriber.expectTerminalError(),
anyOf(instanceOf(InterruptedIOException.class), instanceOf(ConnectionClosedException.class))
);
} finally {
executor.shutdown();
if (executor.awaitTermination(1, TimeUnit.SECONDS) == false) {
executor.shutdownNow();
}
}
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\""))
.then(() -> {
try {
client().close();
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
})
.expectErrorMatches(t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException);
}
}

0 comments on commit 5bb9793

Please sign in to comment.