diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index e020455836af4..4313c4721a2e4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -530,8 +530,12 @@ void onClientClose() { } logger.info("No more open clients, closing shared connection."); - eventHubConnectionProcessor.dispose(); - eventHubConnectionProcessor = null; + if (eventHubConnectionProcessor != null) { + eventHubConnectionProcessor.dispose(); + eventHubConnectionProcessor = null; + } else { + logger.warning("Shared EventHubConnectionProcessor was already disposed."); + } } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchIntegrationTest.java index 14c0e4d66f857..15516ee0ba40e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchIntegrationTest.java @@ -16,6 +16,7 @@ import reactor.test.StepVerifier; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -126,6 +127,7 @@ public void sendBatchPartitionKeyValidate() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(batch.getCount()); final List consumers = new ArrayList<>(); + final Instant now = Instant.now(); try { // Creating consumers on all the partitions and subscribing to the receive event. final List partitionIds = producer.getPartitionIds().collectList().block(TIMEOUT); @@ -135,7 +137,7 @@ public void sendBatchPartitionKeyValidate() throws InterruptedException { final EventHubConsumerAsyncClient consumer = builder.buildAsyncConsumerClient(); consumers.add(consumer); - consumer.receiveFromPartition(id, EventPosition.latest()).subscribe(partitionEvent -> { + consumer.receiveFromPartition(id, EventPosition.fromEnqueuedTime(now)).subscribe(partitionEvent -> { EventData event = partitionEvent.getData(); if (event.getPartitionKey() == null || !PARTITION_KEY.equals(event.getPartitionKey())) { return; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java index f431c81152fd1..c94ccfb0ebe0e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java @@ -8,6 +8,7 @@ import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.SendOptions; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -25,7 +26,6 @@ import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME; import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT; -import static com.azure.messaging.eventhubs.TestUtils.isMatchingEvent; import static java.nio.charset.StandardCharsets.UTF_8; /** @@ -47,14 +47,13 @@ protected void beforeTest(AmqpTransportType transportType) { } else { logger.info("Pushing... events to partition."); - final EventHubAsyncClient testClient = createBuilder() + try (EventHubProducerAsyncClient testClient = createBuilder() .transportType(transportType) - .shareConnection() - .buildAsyncClient(); - - final SendOptions options = new SendOptions().setPartitionId(PARTITION_ID); - testData = setupEventTestData(testClient.createProducer(), NUMBER_OF_EVENTS, options); - logger.warning("Pushed events to partition."); + .buildAsyncProducerClient()) { + final SendOptions options = new SendOptions().setPartitionId(PARTITION_ID); + testData = setupEventTestData(testClient, NUMBER_OF_EVENTS, options); + logger.warning("Pushed events to partition."); + } } } @@ -80,11 +79,10 @@ void receiveMessage(AmqpTransportType transportType) { // Act & Assert try { StepVerifier.create(consumer.receiveFromPartition(PARTITION_ID, startingPosition) - .filter(x -> isMatchingEvent(x, testData.getMessageTrackingId())) .take(NUMBER_OF_EVENTS)) .expectNextCount(NUMBER_OF_EVENTS) .expectComplete() - .verify(Duration.ofMinutes(1)); + .verify(); } finally { consumer.close(); } @@ -96,6 +94,7 @@ void receiveMessage(AmqpTransportType transportType) { */ @ParameterizedTest @EnumSource(value = AmqpTransportType.class) + @Disabled("Works part of the time: https://github.com/Azure/azure-sdk-for-java/issues/9659") void parallelEventHubClients(AmqpTransportType transportType) throws InterruptedException { beforeTest(transportType); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientIntegrationTest.java index 39c46e798aa91..b414c3cadcc41 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientIntegrationTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME; import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT; @@ -52,7 +53,9 @@ public class EventHubConsumerAsyncClientIntegrationTest extends IntegrationTestB private static final String MESSAGE_TRACKING_ID = UUID.randomUUID().toString(); - private EventHubAsyncClient client; + private EventHubClientBuilder builder; + private EventHubProducerClient producer; + private List partitionIds; public EventHubConsumerAsyncClientIntegrationTest() { super(new ClientLogger(EventHubConsumerAsyncClientIntegrationTest.class)); @@ -70,13 +73,18 @@ static void afterAll() { @Override protected void beforeTest() { - client = createBuilder() - .buildAsyncClient(); + builder = createBuilder() + .shareConnection() + .consumerGroup(DEFAULT_CONSUMER_GROUP_NAME) + .prefetchCount(DEFAULT_PREFETCH_COUNT); + + producer = builder.buildProducerClient(); + partitionIds = producer.getPartitionIds().stream().collect(Collectors.toList()); } @Override protected void afterTest() { - dispose(client); + dispose(producer); } /** @@ -86,20 +94,15 @@ protected void afterTest() { public void parallelCreationOfReceivers() { // Arrange final int numberOfEvents = 10; - final List partitionIds = client.getPartitionIds().collectList().block(TIMEOUT); - if (partitionIds == null || partitionIds.isEmpty()) { - Assertions.fail("Should have partitions"); - } final CountDownLatch countDownLatch = new CountDownLatch(partitionIds.size()); final EventHubConsumerAsyncClient[] consumers = new EventHubConsumerAsyncClient[partitionIds.size()]; - final EventHubProducerAsyncClient[] producers = new EventHubProducerAsyncClient[partitionIds.size()]; + final EventHubProducerClient[] producers = new EventHubProducerClient[partitionIds.size()]; final Disposable.Composite subscriptions = Disposables.composite(); try { for (int i = 0; i < partitionIds.size(); i++) { final String partitionId = partitionIds.get(i); - final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, - DEFAULT_PREFETCH_COUNT); + final EventHubConsumerAsyncClient consumer = builder.buildAsyncConsumerClient(); consumers[i] = consumer; final Disposable subscription = consumer.receiveFromPartition(partitionId, @@ -116,16 +119,17 @@ public void parallelCreationOfReceivers() { subscriptions.add(subscription); - producers[i] = client.createProducer(); + producers[i] = builder.buildProducerClient(); } // Act for (int i = 0; i < partitionIds.size(); i++) { final String partitionId = partitionIds.get(i); final SendOptions sendOptions = new SendOptions().setPartitionId(partitionId); - final EventHubProducerAsyncClient producer = producers[i]; + final EventHubProducerClient producer = producers[i]; + final List events = TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID); - producer.send(TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID), sendOptions).block(TIMEOUT); + producer.send(events, sendOptions); } // Assert @@ -151,16 +155,15 @@ public void parallelCreationOfReceivers() { public void lastEnqueuedInformationIsNotUpdated() { // Arrange final String firstPartition = "0"; - final PartitionProperties properties = client.getPartitionProperties(firstPartition).block(); - Assertions.assertNotNull(properties); + final PartitionProperties properties = producer.getPartitionProperties(firstPartition); final EventPosition position = EventPosition.fromSequenceNumber(properties.getLastEnqueuedSequenceNumber()); - final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1); + final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient(); final ReceiveOptions options = new ReceiveOptions().setTrackLastEnqueuedEventProperties(false); final AtomicBoolean isActive = new AtomicBoolean(true); final int expectedNumber = 5; - final EventHubProducerAsyncClient producer = client.createProducer(); + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); final SendOptions sendOptions = new SendOptions().setPartitionId(firstPartition); final Disposable producerEvents = getEvents(isActive) .flatMap(event -> producer.send(event, sendOptions)) @@ -193,7 +196,7 @@ public void lastEnqueuedInformationIsUpdated() { // Arrange final String secondPartitionId = "1"; final AtomicBoolean isActive = new AtomicBoolean(true); - final EventHubProducerAsyncClient producer = client.createProducer(); + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); final Disposable producerEvents = getEvents(isActive) .flatMap(event -> producer.send(event, new SendOptions().setPartitionId(secondPartitionId))) .subscribe( @@ -203,12 +206,12 @@ public void lastEnqueuedInformationIsUpdated() { () -> logger.info("Event sent.")); final ReceiveOptions options = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true); - final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1); - final AtomicReference lastViewed = new AtomicReference<>( - new LastEnqueuedEventProperties(null, null, null, null)); // Act & Assert - try { + try (EventHubConsumerAsyncClient consumer = builder.buildAsyncConsumerClient()) { + final AtomicReference lastViewed = new AtomicReference<>( + new LastEnqueuedEventProperties(null, null, null, null)); + StepVerifier.create(consumer.receiveFromPartition(secondPartitionId, EventPosition.latest(), options).take(10)) .assertNext(event -> verifyLastRetrieved(lastViewed, event.getLastEnqueuedEventProperties(), true)) .expectNextCount(5) @@ -220,7 +223,6 @@ public void lastEnqueuedInformationIsUpdated() { } finally { isActive.set(false); producerEvents.dispose(); - consumer.close(); } } @@ -264,12 +266,12 @@ public void sameOwnerLevelClosesFirstConsumer() throws InterruptedException { final EventPosition position = EventPosition.fromEnqueuedTime(Instant.now()); final ReceiveOptions options = new ReceiveOptions() .setOwnerLevel(1L); - final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1); + final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient(); final AtomicBoolean isActive = new AtomicBoolean(true); final Disposable.Composite subscriptions = Disposables.composite(); - final EventHubProducerAsyncClient producer = client.createProducer(); + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); subscriptions.add(getEvents(isActive).flatMap(event -> producer.send(event)).subscribe( sent -> logger.info("Event sent."), error -> logger.error("Error sending event", error))); @@ -293,7 +295,7 @@ public void sameOwnerLevelClosesFirstConsumer() throws InterruptedException { Thread.sleep(2000); logger.info("STARTED CONSUMING FROM PARTITION 1 with C3"); - final EventHubConsumerAsyncClient consumer2 = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1); + final EventHubConsumerAsyncClient consumer2 = builder.prefetchCount(1).buildAsyncConsumerClient(); subscriptions.add(consumer2.receiveFromPartition(lastPartition, position, options) .filter(event -> TestUtils.isMatchingEvent(event, MESSAGE_TRACKING_ID)) .subscribe( @@ -387,7 +389,7 @@ public void canReceive() { // Arrange final String secondPartitionId = "1"; final AtomicBoolean isActive = new AtomicBoolean(true); - final EventHubProducerAsyncClient producer = client.createProducer(); + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); final Disposable producerEvents = getEvents(isActive) .flatMap(event -> producer.send(event, new SendOptions().setPartitionId(secondPartitionId))) .subscribe( @@ -398,7 +400,7 @@ public void canReceive() { final ReceiveOptions options = new ReceiveOptions() .setTrackLastEnqueuedEventProperties(true); - final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1); + final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient(); final AtomicReference lastViewed = new AtomicReference<>( new LastEnqueuedEventProperties(null, null, null, null)); @@ -422,7 +424,7 @@ public void canReceive() { @Test public void receivesMultiplePartitions() { // Arrange - final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1); + final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient(); final AtomicBoolean isActive = new AtomicBoolean(true); final AtomicInteger counter = new AtomicInteger(); @@ -436,7 +438,7 @@ public void receivesMultiplePartitions() { Assumptions.assumeTrue(expectedPartitions.size() <= expectedNumber, "Cannot run this test if there are more partitions than expected."); - final EventHubProducerAsyncClient producer = client.createProducer(); + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); final Disposable producerEvents = getEvents(isActive).flatMap(event -> { final int partition = counter.getAndIncrement() % allPartitions.size(); event.getProperties().put(PARTITION_ID_HEADER, partition); @@ -480,8 +482,8 @@ public void receivesMultiplePartitions() { @Test public void multipleReceiversSamePartition() throws InterruptedException { // Arrange - final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1); - final EventHubConsumerAsyncClient consumer2 = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1); + final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient(); + final EventHubConsumerAsyncClient consumer2 = builder.buildAsyncConsumerClient(); final String partitionId = "1"; final PartitionProperties properties = consumer.getPartitionProperties(partitionId).block(TIMEOUT); Assertions.assertNotNull(properties, "Should have been able to get partition properties."); @@ -492,7 +494,7 @@ public void multipleReceiversSamePartition() throws InterruptedException { final EventPosition position = EventPosition.fromSequenceNumber(properties.getLastEnqueuedSequenceNumber()); final AtomicBoolean isActive = new AtomicBoolean(true); - final EventHubProducerAsyncClient producer = client.createProducer(); + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); final Disposable producerEvents = getEvents(isActive).flatMap(event -> { event.getProperties().put(PARTITION_ID_HEADER, partitionId); return producer.send(event, new SendOptions().setPartitionId(partitionId)); @@ -543,11 +545,14 @@ void closesReceiver() throws InterruptedException { // Arrange final String partitionId = "1"; final SendOptions sendOptions = new SendOptions().setPartitionId(partitionId); - final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, 1); + final EventHubConsumerAsyncClient consumer = builder.prefetchCount(1).buildAsyncConsumerClient(); final int numberOfEvents = 5; final AtomicBoolean isActive = new AtomicBoolean(true); - final EventHubProducerAsyncClient producer = client.createProducer(); + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); final PartitionProperties properties = producer.getPartitionProperties(partitionId).block(TIMEOUT); + + Assertions.assertNotNull(properties); + final AtomicReference startingPosition = new AtomicReference<>( EventPosition.fromSequenceNumber(properties.getLastEnqueuedSequenceNumber())); final Disposable producerEvents = getEvents(isActive) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientIntegrationTest.java index 914b2820ddc0e..d6a51687c4bf5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientIntegrationTest.java @@ -133,7 +133,7 @@ public void receiveUntilTimeout() { // Arrange final int numberOfEvents = 15; final String partitionId = "1"; - final List events = getEventsAsList(numberOfEvents); + final List events = TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID); final EventPosition position = EventPosition.fromEnqueuedTime(Instant.now()); final EventHubConsumerClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, DEFAULT_PREFETCH_COUNT); @@ -165,8 +165,8 @@ public void doesNotContinueToReceiveEvents() { final int receiveNumber = 10; final String partitionId = "1"; - final List events = getEventsAsList(numberOfEvents); - final List events2 = getEventsAsList(secondSetOfEvents); + final List events = TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID); + final List events2 = TestUtils.getEvents(secondSetOfEvents, MESSAGE_TRACKING_ID); final EventHubConsumerClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, DEFAULT_PREFETCH_COUNT); final SendOptions sendOptions = new SendOptions().setPartitionId(partitionId); @@ -199,7 +199,7 @@ public void multipleConsumers() { final int receiveNumber = 10; final String partitionId = "1"; - final List events = getEventsAsList(numberOfEvents); + final List events = TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID); final EventPosition position = EventPosition.fromEnqueuedTime(Instant.now()); final EventHubConsumerClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, DEFAULT_PREFETCH_COUNT); @@ -294,8 +294,4 @@ public void getPartitionProperties() { dispose(consumer); } } - - private static List getEventsAsList(int numberOfEvents) { - return TestUtils.getEvents(numberOfEvents, MESSAGE_TRACKING_ID).collectList().block(); - } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java index 3275504f73ef6..c69672aa0095e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java @@ -202,7 +202,7 @@ protected IntegrationTestEventData setupEventTestData(EventHubProducerAsyncClien logger.info("Pushing events to partition. Message tracking value: {}", messageId); - final List events = TestUtils.getEvents(numberOfEvents, messageId).collectList().block(); + final List events = TestUtils.getEvents(numberOfEvents, messageId); final Instant datePushed = Instant.now(); try { @@ -223,7 +223,7 @@ protected IntegrationTestEventData setupEventTestData(EventHubProducerClient pro logger.info("Pushing events to partition. Message tracking value: {}", messageId); - final List events = TestUtils.getEvents(numberOfEvents, messageId).collectList().block(); + final List events = TestUtils.getEvents(numberOfEvents, messageId); final Instant datePushed = Instant.now(); try { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java index 56ac5df702f5e..25d2353c7b3e9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java @@ -30,6 +30,7 @@ import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME; import static com.azure.core.amqp.AmqpMessageConstant.OFFSET_ANNOTATION_NAME; import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME; +import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME; import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT; import static com.azure.messaging.eventhubs.TestUtils.MESSAGE_TRACKING_ID; import static com.azure.messaging.eventhubs.TestUtils.getSymbol; @@ -41,7 +42,6 @@ public class InteropAmqpPropertiesTest extends IntegrationTestBase { private static final String PAYLOAD = "test-message"; private final MessageSerializer serializer = new EventHubMessageSerializer(); - private EventHubAsyncClient client; private EventHubProducerAsyncClient producer; private EventHubConsumerAsyncClient consumer; private SendOptions sendOptions; @@ -54,15 +54,16 @@ public InteropAmqpPropertiesTest() { protected void beforeTest() { sendOptions = new SendOptions().setPartitionId(PARTITION_ID); - client = createBuilder().shareConnection() - .buildAsyncClient(); - producer = client.createProducer(); - consumer = client.createConsumer(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME, DEFAULT_PREFETCH_COUNT); + final EventHubClientBuilder builder = createBuilder().shareConnection() + .consumerGroup(DEFAULT_CONSUMER_GROUP_NAME) + .prefetchCount(DEFAULT_PREFETCH_COUNT); + producer = builder.buildAsyncProducerClient(); + consumer = builder.buildAsyncConsumerClient(); } @Override protected void afterTest() { - dispose(producer, consumer, client); + dispose(producer, consumer); } /** diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxySendTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxySendTest.java index 369bd96276fbd..3c1a5a6819e73 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxySendTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxySendTest.java @@ -13,7 +13,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import java.io.IOException; @@ -93,7 +92,7 @@ public void sendEvents() { final String messageId = UUID.randomUUID().toString(); final SendOptions options = new SendOptions().setPartitionId(PARTITION_ID); final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); - final Flux events = TestUtils.getEvents(NUMBER_OF_EVENTS, messageId); + final List events = TestUtils.getEvents(NUMBER_OF_EVENTS, messageId); final PartitionProperties information = producer.getPartitionProperties(PARTITION_ID).block(); Assertions.assertNotNull(information, "Should receive partition information."); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java index d65237c3c6448..c511bd3c67251 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java @@ -7,6 +7,7 @@ import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.SendOptions; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.test.StepVerifier; @@ -19,6 +20,8 @@ /** * Verifies we can use various prefetch options with {@link EventHubConsumerAsyncClient}. */ +@Disabled("Set prefetch tests do not work because they try to send very large number of events at once." + + "https://github.com/Azure/azure-sdk-for-java/issues/9659") class SetPrefetchCountTest extends IntegrationTestBase { private static final String PARTITION_ID = "1"; // Default number of events to fetch when creating the consumer. @@ -49,7 +52,7 @@ protected void beforeTest() { if (!HAS_PUSHED_EVENTS.getAndSet(true)) { final SendOptions options = new SendOptions().setPartitionId(PARTITION_ID); - try (EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient()) { + try (EventHubProducerAsyncClient producer = createBuilder().buildAsyncProducerClient()) { testData = setupEventTestData(producer, NUMBER_OF_EVENTS, options); Assertions.assertNotNull(testData); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TestUtils.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TestUtils.java index 132b52ac35e3f..a8619bb5a417e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TestUtils.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TestUtils.java @@ -14,13 +14,15 @@ import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.message.Message; -import reactor.core.publisher.Flux; import java.time.Instant; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME; import static com.azure.core.amqp.AmqpMessageConstant.OFFSET_ANNOTATION_NAME; @@ -122,9 +124,10 @@ public static EventData getEventData(byte[] contents, Long sequenceNumber, Long return MESSAGE_SERIALIZER.deserialize(message, EventData.class); } - public static Flux getEvents(int numberOfEvents, String messageTrackingValue) { - return Flux.range(0, numberOfEvents) - .map(number -> getEvent("Event " + number, messageTrackingValue, number)); + public static List getEvents(int numberOfEvents, String messageTrackingValue) { + return IntStream.range(0, numberOfEvents) + .mapToObj(number -> getEvent("Event " + number, messageTrackingValue, number)) + .collect(Collectors.toList()); } static EventData getEvent(String body, String messageTrackingValue, int position) {