Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use different connection strings for ServiceBus binders integration tests. #21966

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.spring.test.eventhubs.stream.binder;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,10 +17,13 @@
import reactor.core.publisher.Sinks;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(classes = EventHubBinderBatchModeIT.TestConfig.class)
@TestPropertySource(properties =
{
Expand All @@ -35,7 +37,8 @@ public class EventHubBinderBatchModeIT {
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderBatchModeIT.class);

private static String message = UUID.randomUUID().toString();
private static final AtomicInteger count = new AtomicInteger(0);

private static CountDownLatch latch = new CountDownLatch(1);

@Autowired
private Sinks.Many<Message<String>> many;
Expand All @@ -58,18 +61,21 @@ public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many)
@Bean
public Consumer<Message<String>> consume() {
return message -> {
LOGGER.info("New message received: '{}'", message.getPayload());
Assertions.assertEquals(message.getPayload(), EventHubBinderBatchModeIT.message);
count.addAndGet(1);
LOGGER.info("EventHubBinderBatchModeIT: New message received: '{}'", message.getPayload());
if (message.getPayload().equals(EventHubBinderBatchModeIT.message)) {
latch.countDown();
}
};
}
}

@Test
public void testSendAndReceiveMessage() throws InterruptedException {
Thread.sleep(15000);
LOGGER.info("EventHubBinderBatchModeIT begin.");
EventHubBinderBatchModeIT.latch.await(15, TimeUnit.SECONDS);
LOGGER.info("Send a message:" + message + ".");
many.emitNext(new GenericMessage<>(message), Sinks.EmitFailureHandler.FAIL_FAST);
Thread.sleep(6000);
Assertions.assertEquals(1, count.get());
assertThat(EventHubBinderBatchModeIT.latch.await(15, TimeUnit.SECONDS)).isTrue();
LOGGER.info("EventHubBinderBatchModeIT end.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import reactor.core.publisher.Sinks;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(classes = EventHubBinderManualModeIT.TestConfig.class)
@TestPropertySource(properties =
{
Expand All @@ -36,7 +39,7 @@ public class EventHubBinderManualModeIT {

private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderManualModeIT.class);
private static String message = UUID.randomUUID().toString();
private static final AtomicInteger count = new AtomicInteger(0);
private static CountDownLatch latch = new CountDownLatch(1);

@Autowired
private Sinks.Many<Message<String>> many;
Expand All @@ -59,22 +62,25 @@ public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many)
@Bean
public Consumer<Message<String>> consume() {
return message -> {
LOGGER.info("New message received: '{}'", message.getPayload());
Assertions.assertEquals(message.getPayload(), EventHubBinderManualModeIT.message);
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(AzureHeaders.CHECKPOINTER);
checkpointer.success().handle((r, ex) -> {
Assertions.assertNull(ex);
});
count.addAndGet(1);
LOGGER.info("EventHubBinderManualModeIT: New message received: '{}'", message.getPayload());
if (message.getPayload().equals(EventHubBinderManualModeIT.message)) {
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(AzureHeaders.CHECKPOINTER);
checkpointer.success().handle((r, ex) -> {
Assertions.assertNull(ex);
});
latch.countDown();
}
};
}
}

@Test
public void testSendAndReceiveMessage() throws InterruptedException {
Thread.sleep(15000);
LOGGER.info("EventHubBinderManualModeIT begin.");
EventHubBinderManualModeIT.latch.await(15, TimeUnit.SECONDS);
LOGGER.info("Send a message:" + message + ".");
many.emitNext(new GenericMessage<>(message), Sinks.EmitFailureHandler.FAIL_FAST);
Thread.sleep(6000);
Assertions.assertEquals(1, count.get());
assertThat(EventHubBinderManualModeIT.latch.await(15, TimeUnit.SECONDS)).isTrue();
LOGGER.info("EventHubBinderManualModeIT end.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.spring.test.eventhubs.stream.binder;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,10 +17,13 @@
import reactor.core.publisher.Sinks;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(classes = EventHubBinderRecordModeIT.TestConfig.class)
@TestPropertySource(properties =
{
Expand All @@ -32,9 +34,9 @@
})
public class EventHubBinderRecordModeIT {

private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderManualModeIT.class);
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderRecordModeIT.class);
private static String message = UUID.randomUUID().toString();
private static final AtomicInteger count = new AtomicInteger(0);
private static CountDownLatch latch = new CountDownLatch(1);

@Autowired
private Sinks.Many<Message<String>> many;
Expand All @@ -57,18 +59,21 @@ public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many)
@Bean
public Consumer<Message<String>> consume() {
return message -> {
LOGGER.info("New message received: '{}'", message.getPayload());
Assertions.assertEquals(message.getPayload(), EventHubBinderRecordModeIT.message);
count.addAndGet(1);
LOGGER.info("EventHubBinderRecordModeIT: New message received: '{}'", message.getPayload());
if (message.getPayload().equals(EventHubBinderRecordModeIT.message)) {
latch.countDown();
}
};
}
}

@Test
public void testSendAndReceiveMessage() throws InterruptedException {
Thread.sleep(15000);
LOGGER.info("EventHubBinderRecordModeIT begin.");
EventHubBinderRecordModeIT.latch.await(15, TimeUnit.SECONDS);
LOGGER.info("Send a message:" + message + ".");
many.emitNext(new GenericMessage<>(message), Sinks.EmitFailureHandler.FAIL_FAST);
Thread.sleep(6000);
Assertions.assertEquals(1, count.get());
assertThat(EventHubBinderRecordModeIT.latch.await(15, TimeUnit.SECONDS)).isTrue();
LOGGER.info("EventHubBinderRecordModeIT end.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package com.azure.spring.test.eventhubs.stream.binder;

import org.junit.Rule;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,10 +19,13 @@
import reactor.core.publisher.Sinks;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(classes = EventHubBinderSyncModeIT.TestConfig.class)
@TestPropertySource(properties =
{
Expand All @@ -34,7 +36,7 @@
})
public class EventHubBinderSyncModeIT {

private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderManualModeIT.class);
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderSyncModeIT.class);
private static String message = UUID.randomUUID().toString();

@Autowired
Expand All @@ -43,7 +45,7 @@ public class EventHubBinderSyncModeIT {
@Rule
public OutputCaptureRule capture = new OutputCaptureRule();

private static final AtomicInteger count = new AtomicInteger(0);
private static CountDownLatch latch = new CountDownLatch(1);

@EnableAutoConfiguration
public static class TestConfig {
Expand All @@ -63,18 +65,21 @@ public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many)
@Bean
public Consumer<Message<String>> consume() {
return message -> {
LOGGER.info("New message received: '{}'", message.getPayload());
Assertions.assertEquals(message.getPayload(), EventHubBinderSyncModeIT.message);
count.addAndGet(1);
LOGGER.info("EventHubBinderRecordModeIT: New message received: '{}'", message.getPayload());
if (message.getPayload().equals(EventHubBinderSyncModeIT.message)) {
latch.countDown();
}
};
}
}

@Test
public void testSendAndReceiveMessage() throws InterruptedException {
Thread.sleep(15000);
LOGGER.info("EventHubBinderSyncModeIT begin.");
EventHubBinderSyncModeIT.latch.await(15, TimeUnit.SECONDS);
LOGGER.info("Send a message:" + message + ".");
many.emitNext(new GenericMessage<>(message), Sinks.EmitFailureHandler.FAIL_FAST);
Thread.sleep(6000);
Assertions.assertEquals(1, count.get());
assertThat(EventHubBinderSyncModeIT.latch.await(15, TimeUnit.SECONDS)).isTrue();
LOGGER.info("EventHubBinderSyncModeIT end.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT License.
package com.azure.spring.sample.servicebus.binder;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -12,7 +11,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.ActiveProfiles;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

Expand All @@ -24,12 +23,12 @@

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(classes = { ServiceBusQueueAndTopicBinderIT.TestQueueConfig.class,
ServiceBusQueueAndTopicBinderIT.TestTopicConfig.class })
@TestPropertySource(locations = "classpath:application.yml")
public class ServiceBusQueueAndTopicBinderIT {
@SpringBootTest(classes = { MultiServiceBusQueueAndTopicBinderIT.TestQueueConfig.class,
MultiServiceBusQueueAndTopicBinderIT.TestTopicConfig.class })
@ActiveProfiles("multi")
public class MultiServiceBusQueueAndTopicBinderIT {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueAndTopicBinderIT.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MultiServiceBusQueueAndTopicBinderIT.class);

private static String message = UUID.randomUUID().toString();

Expand Down Expand Up @@ -60,8 +59,9 @@ public Supplier<Flux<Message<String>>> queueSupply(Sinks.Many<Message<String>> m
public Consumer<Message<String>> queueConsume() {
return message -> {
LOGGER.info("Test queue new message received: '{}'", message);
Assertions.assertEquals(message.getPayload(), ServiceBusQueueAndTopicBinderIT.message);
latch.countDown();
if (message.getPayload().equals(MultiServiceBusQueueAndTopicBinderIT.message)) {
latch.countDown();
}
};
}
}
Expand All @@ -85,23 +85,25 @@ public Supplier<Flux<Message<String>>> topicSupply(Sinks.Many<Message<String>> m
public Consumer<Message<String>> topicConsume() {
return message -> {
LOGGER.info("Test topic new message received: '{}'", message);
Assertions.assertEquals(message.getPayload(), ServiceBusQueueAndTopicBinderIT.message);
latch.countDown();
if (message.getPayload().equals(MultiServiceBusQueueAndTopicBinderIT.message)) {
latch.countDown();
}
};
}
}

@Test
public void testSendAndReceiveMessage() throws InterruptedException {
LOGGER.info("testSendAndReceiveMessage begin.");
public void testMultiServiceBusSendAndReceiveMessage() throws InterruptedException {
LOGGER.info("MultiServiceBusQueueAndTopicBinderIT begin.");
GenericMessage<String> genericMessage = new GenericMessage<>(message);
LOGGER.info("Send a message to the queue.");

LOGGER.info("Send a message:" + message + " to the queue.");
manyQueue.emitNext(genericMessage, Sinks.EmitFailureHandler.FAIL_FAST);
LOGGER.info("Send a message to the topic.");
LOGGER.info("Send a message:" + message + " to the topic.");
manyTopic.emitNext(genericMessage, Sinks.EmitFailureHandler.FAIL_FAST);

assertThat(ServiceBusQueueAndTopicBinderIT.latch.await(10, TimeUnit.SECONDS)).isTrue();
LOGGER.info("testSendAndReceiveMessage end.");
assertThat(MultiServiceBusQueueAndTopicBinderIT.latch.await(15, TimeUnit.SECONDS)).isTrue();
LOGGER.info("MultiServiceBusQueueAndTopicBinderIT end.");
}

}
Loading