Skip to content

Commit

Permalink
Fixing NullPointerExceptions in Event Hubs live Tests (#9648)
Browse files Browse the repository at this point in the history
* Remove use of EventHubsAsyncClient in tests
  • Loading branch information
conniey committed Mar 30, 2020
1 parent eeb1cae commit 64892d8
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,12 @@ void onClientClose() {
}

logger.info("No more open clients, closing shared connection.");
eventHubConnectionProcessor.dispose();
eventHubConnectionProcessor = null;
if (eventHubConnectionProcessor != null) {
eventHubConnectionProcessor.dispose();
eventHubConnectionProcessor = null;
} else {
logger.warning("Shared EventHubConnectionProcessor was already disposed.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import reactor.test.StepVerifier;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -126,6 +127,7 @@ public void sendBatchPartitionKeyValidate() throws InterruptedException {

final CountDownLatch countDownLatch = new CountDownLatch(batch.getCount());
final List<EventHubConsumerAsyncClient> consumers = new ArrayList<>();
final Instant now = Instant.now();
try {
// Creating consumers on all the partitions and subscribing to the receive event.
final List<String> partitionIds = producer.getPartitionIds().collectList().block(TIMEOUT);
Expand All @@ -135,7 +137,7 @@ public void sendBatchPartitionKeyValidate() throws InterruptedException {
final EventHubConsumerAsyncClient consumer = builder.buildAsyncConsumerClient();

consumers.add(consumer);
consumer.receiveFromPartition(id, EventPosition.latest()).subscribe(partitionEvent -> {
consumer.receiveFromPartition(id, EventPosition.fromEnqueuedTime(now)).subscribe(partitionEvent -> {
EventData event = partitionEvent.getData();
if (event.getPartitionKey() == null || !PARTITION_KEY.equals(event.getPartitionKey())) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.SendOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand All @@ -25,7 +26,6 @@

import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME;
import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT;
import static com.azure.messaging.eventhubs.TestUtils.isMatchingEvent;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
Expand All @@ -47,14 +47,13 @@ protected void beforeTest(AmqpTransportType transportType) {
} else {
logger.info("Pushing... events to partition.");

final EventHubAsyncClient testClient = createBuilder()
try (EventHubProducerAsyncClient testClient = createBuilder()
.transportType(transportType)
.shareConnection()
.buildAsyncClient();

final SendOptions options = new SendOptions().setPartitionId(PARTITION_ID);
testData = setupEventTestData(testClient.createProducer(), NUMBER_OF_EVENTS, options);
logger.warning("Pushed events to partition.");
.buildAsyncProducerClient()) {
final SendOptions options = new SendOptions().setPartitionId(PARTITION_ID);
testData = setupEventTestData(testClient, NUMBER_OF_EVENTS, options);
logger.warning("Pushed events to partition.");
}
}
}

Expand All @@ -80,11 +79,10 @@ void receiveMessage(AmqpTransportType transportType) {
// Act & Assert
try {
StepVerifier.create(consumer.receiveFromPartition(PARTITION_ID, startingPosition)
.filter(x -> isMatchingEvent(x, testData.getMessageTrackingId()))
.take(NUMBER_OF_EVENTS))
.expectNextCount(NUMBER_OF_EVENTS)
.expectComplete()
.verify(Duration.ofMinutes(1));
.verify();
} finally {
consumer.close();
}
Expand All @@ -96,6 +94,7 @@ void receiveMessage(AmqpTransportType transportType) {
*/
@ParameterizedTest
@EnumSource(value = AmqpTransportType.class)
@Disabled("Works part of the time: https://github.com/Azure/azure-sdk-for-java/issues/9659")
void parallelEventHubClients(AmqpTransportType transportType) throws InterruptedException {
beforeTest(transportType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME;
import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT;
Expand All @@ -52,7 +53,9 @@ public class EventHubConsumerAsyncClientIntegrationTest extends IntegrationTestB

private static final String MESSAGE_TRACKING_ID = UUID.randomUUID().toString();

private EventHubAsyncClient client;
private EventHubClientBuilder builder;
private EventHubProducerClient producer;
private List<String> partitionIds;

public EventHubConsumerAsyncClientIntegrationTest() {
super(new ClientLogger(EventHubConsumerAsyncClientIntegrationTest.class));
Expand All @@ -70,13 +73,18 @@ static void afterAll() {

@Override
protected void beforeTest() {
client = createBuilder()
.buildAsyncClient();
builder = createBuilder()
.shareConnection()
.consumerGroup(DEFAULT_CONSUMER_GROUP_NAME)
.prefetchCount(DEFAULT_PREFETCH_COUNT);

producer = builder.buildProducerClient();
partitionIds = producer.getPartitionIds().stream().collect(Collectors.toList());
}

@Override
protected void afterTest() {
dispose(client);
dispose(producer);
}

/**
Expand All @@ -86,20 +94,15 @@ protected void afterTest() {
public void parallelCreationOfReceivers() {
// Arrange
final int numberOfEvents = 10;
final List<String> partitionIds = client.getPartitionIds().collectList().block(TIMEOUT);
if (partitionIds == null || partitionIds.isEmpty()) {
Assertions.fail("Should have partitions");
}

final CountDownLatch countDownLatch = new CountDownLatch(partitionIds.size());
final EventHubConsumerAsyncClient[] consumers = new EventHubConsumerAsyncClient[partitionIds.size()];
final EventHubProducerAsyncClient[] producers = new EventHubProducerAsyncClient[partitionIds.size()];
final EventHubProducerClient[] producers = new EventHubProducerClient[partitionIds.size()];
final Disposable.Composite subscriptions = Disposables.composite();
try {
for (int i = 0; i < partitionIds.size(); i++) {
final String partitionId = partitionIds.get(i);
final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME,
DEFAULT_PREFETCH_COUNT);
final EventHubConsumerAsyncClient consumer = builder.buildAsyncConsumerClient();
consumers[i] = consumer;

final Disposable subscription = consumer.receiveFromPartition(partitionId,
Expand All @@ -116,16 +119,17 @@ public void parallelCreationOfReceivers() {

subscriptions.add(subscription);

producers[i] = client.createProducer();
producers[i] = builder.buildProducerClient();
}

// Act
for (int i = 0; i < partitionIds.size(); i++) {
final String partitionId = partitionIds.get(i);
final SendOptions sendOptions = new SendOptions().setPartitionId(partitionId);
final EventHubProducerAsyncClient producer = producers[i];
final EventHubProducerClient producer = producers[i];
final List<EventData> events = TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID);

producer.send(TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID), sendOptions).block(TIMEOUT);
producer.send(events, sendOptions);
}

// Assert
Expand All @@ -151,16 +155,15 @@ public void parallelCreationOfReceivers() {
public void lastEnqueuedInformationIsNotUpdated() {
// Arrange
final String firstPartition = "0";
final PartitionProperties properties = client.getPartitionProperties(firstPartition).block();
Assertions.assertNotNull(properties);
final PartitionProperties properties = producer.getPartitionProperties(firstPartition);

final EventPosition position = EventPosition.fromSequenceNumber(properties.getLastEnqueuedSequenceNumber());
final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1);
final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient();
final ReceiveOptions options = new ReceiveOptions().setTrackLastEnqueuedEventProperties(false);

final AtomicBoolean isActive = new AtomicBoolean(true);
final int expectedNumber = 5;
final EventHubProducerAsyncClient producer = client.createProducer();
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
final SendOptions sendOptions = new SendOptions().setPartitionId(firstPartition);
final Disposable producerEvents = getEvents(isActive)
.flatMap(event -> producer.send(event, sendOptions))
Expand Down Expand Up @@ -193,7 +196,7 @@ public void lastEnqueuedInformationIsUpdated() {
// Arrange
final String secondPartitionId = "1";
final AtomicBoolean isActive = new AtomicBoolean(true);
final EventHubProducerAsyncClient producer = client.createProducer();
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
final Disposable producerEvents = getEvents(isActive)
.flatMap(event -> producer.send(event, new SendOptions().setPartitionId(secondPartitionId)))
.subscribe(
Expand All @@ -203,12 +206,12 @@ public void lastEnqueuedInformationIsUpdated() {
() -> logger.info("Event sent."));

final ReceiveOptions options = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true);
final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1);
final AtomicReference<LastEnqueuedEventProperties> lastViewed = new AtomicReference<>(
new LastEnqueuedEventProperties(null, null, null, null));

// Act & Assert
try {
try (EventHubConsumerAsyncClient consumer = builder.buildAsyncConsumerClient()) {
final AtomicReference<LastEnqueuedEventProperties> lastViewed = new AtomicReference<>(
new LastEnqueuedEventProperties(null, null, null, null));

StepVerifier.create(consumer.receiveFromPartition(secondPartitionId, EventPosition.latest(), options).take(10))
.assertNext(event -> verifyLastRetrieved(lastViewed, event.getLastEnqueuedEventProperties(), true))
.expectNextCount(5)
Expand All @@ -220,7 +223,6 @@ public void lastEnqueuedInformationIsUpdated() {
} finally {
isActive.set(false);
producerEvents.dispose();
consumer.close();
}
}

Expand Down Expand Up @@ -264,12 +266,12 @@ public void sameOwnerLevelClosesFirstConsumer() throws InterruptedException {
final EventPosition position = EventPosition.fromEnqueuedTime(Instant.now());
final ReceiveOptions options = new ReceiveOptions()
.setOwnerLevel(1L);
final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1);
final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient();

final AtomicBoolean isActive = new AtomicBoolean(true);
final Disposable.Composite subscriptions = Disposables.composite();

final EventHubProducerAsyncClient producer = client.createProducer();
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
subscriptions.add(getEvents(isActive).flatMap(event -> producer.send(event)).subscribe(
sent -> logger.info("Event sent."),
error -> logger.error("Error sending event", error)));
Expand All @@ -293,7 +295,7 @@ public void sameOwnerLevelClosesFirstConsumer() throws InterruptedException {
Thread.sleep(2000);

logger.info("STARTED CONSUMING FROM PARTITION 1 with C3");
final EventHubConsumerAsyncClient consumer2 = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1);
final EventHubConsumerAsyncClient consumer2 = builder.prefetchCount(1).buildAsyncConsumerClient();
subscriptions.add(consumer2.receiveFromPartition(lastPartition, position, options)
.filter(event -> TestUtils.isMatchingEvent(event, MESSAGE_TRACKING_ID))
.subscribe(
Expand Down Expand Up @@ -387,7 +389,7 @@ public void canReceive() {
// Arrange
final String secondPartitionId = "1";
final AtomicBoolean isActive = new AtomicBoolean(true);
final EventHubProducerAsyncClient producer = client.createProducer();
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
final Disposable producerEvents = getEvents(isActive)
.flatMap(event -> producer.send(event, new SendOptions().setPartitionId(secondPartitionId)))
.subscribe(
Expand All @@ -398,7 +400,7 @@ public void canReceive() {

final ReceiveOptions options = new ReceiveOptions()
.setTrackLastEnqueuedEventProperties(true);
final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1);
final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient();
final AtomicReference<LastEnqueuedEventProperties> lastViewed = new AtomicReference<>(
new LastEnqueuedEventProperties(null, null, null, null));

Expand All @@ -422,7 +424,7 @@ public void canReceive() {
@Test
public void receivesMultiplePartitions() {
// Arrange
final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1);
final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient();

final AtomicBoolean isActive = new AtomicBoolean(true);
final AtomicInteger counter = new AtomicInteger();
Expand All @@ -436,7 +438,7 @@ public void receivesMultiplePartitions() {
Assumptions.assumeTrue(expectedPartitions.size() <= expectedNumber,
"Cannot run this test if there are more partitions than expected.");

final EventHubProducerAsyncClient producer = client.createProducer();
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
final Disposable producerEvents = getEvents(isActive).flatMap(event -> {
final int partition = counter.getAndIncrement() % allPartitions.size();
event.getProperties().put(PARTITION_ID_HEADER, partition);
Expand Down Expand Up @@ -480,8 +482,8 @@ public void receivesMultiplePartitions() {
@Test
public void multipleReceiversSamePartition() throws InterruptedException {
// Arrange
final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1);
final EventHubConsumerAsyncClient consumer2 = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1);
final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient();
final EventHubConsumerAsyncClient consumer2 = builder.buildAsyncConsumerClient();
final String partitionId = "1";
final PartitionProperties properties = consumer.getPartitionProperties(partitionId).block(TIMEOUT);
Assertions.assertNotNull(properties, "Should have been able to get partition properties.");
Expand All @@ -492,7 +494,7 @@ public void multipleReceiversSamePartition() throws InterruptedException {
final EventPosition position = EventPosition.fromSequenceNumber(properties.getLastEnqueuedSequenceNumber());

final AtomicBoolean isActive = new AtomicBoolean(true);
final EventHubProducerAsyncClient producer = client.createProducer();
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
final Disposable producerEvents = getEvents(isActive).flatMap(event -> {
event.getProperties().put(PARTITION_ID_HEADER, partitionId);
return producer.send(event, new SendOptions().setPartitionId(partitionId));
Expand Down Expand Up @@ -543,11 +545,14 @@ void closesReceiver() throws InterruptedException {
// Arrange
final String partitionId = "1";
final SendOptions sendOptions = new SendOptions().setPartitionId(partitionId);
final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1);
final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient();
final int numberOfEvents = 5;
final AtomicBoolean isActive = new AtomicBoolean(true);
final EventHubProducerAsyncClient producer = client.createProducer();
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
final PartitionProperties properties = producer.getPartitionProperties(partitionId).block(TIMEOUT);

Assertions.assertNotNull(properties);

final AtomicReference<EventPosition> startingPosition = new AtomicReference<>(
EventPosition.fromSequenceNumber(properties.getLastEnqueuedSequenceNumber()));
final Disposable producerEvents = getEvents(isActive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void receiveUntilTimeout() {
// Arrange
final int numberOfEvents = 15;
final String partitionId = "1";
final List<EventData> events = getEventsAsList(numberOfEvents);
final List<EventData> events = TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID);

final EventPosition position = EventPosition.fromEnqueuedTime(Instant.now());
final EventHubConsumerClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, DEFAULT_PREFETCH_COUNT);
Expand Down Expand Up @@ -165,8 +165,8 @@ public void doesNotContinueToReceiveEvents() {
final int receiveNumber = 10;
final String partitionId = "1";

final List<EventData> events = getEventsAsList(numberOfEvents);
final List<EventData> events2 = getEventsAsList(secondSetOfEvents);
final List<EventData> events = TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID);
final List<EventData> events2 = TestUtils.getEvents(secondSetOfEvents, MESSAGE_TRACKING_ID);

final EventHubConsumerClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, DEFAULT_PREFETCH_COUNT);
final SendOptions sendOptions = new SendOptions().setPartitionId(partitionId);
Expand Down Expand Up @@ -199,7 +199,7 @@ public void multipleConsumers() {
final int receiveNumber = 10;
final String partitionId = "1";

final List<EventData> events = getEventsAsList(numberOfEvents);
final List<EventData> events = TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID);

final EventPosition position = EventPosition.fromEnqueuedTime(Instant.now());
final EventHubConsumerClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, DEFAULT_PREFETCH_COUNT);
Expand Down Expand Up @@ -294,8 +294,4 @@ public void getPartitionProperties() {
dispose(consumer);
}
}

private static List<EventData> getEventsAsList(int numberOfEvents) {
return TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID).collectList().block();
}
}
Loading

0 comments on commit 64892d8

Please sign in to comment.