Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service bus] Use ServiceBusException rather than AmqpException #17601

Merged
merged 19 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
5ebd1de
Changing over to using ServiceBusException
richardpark-msft Nov 13, 2020
1a2b7c1
Removing import collapse.
richardpark-msft Nov 17, 2020
dcfd301
Fixing verify errors.
richardpark-msft Nov 17, 2020
8e965d9
Fixing verify errors.
richardpark-msft Nov 17, 2020
810a107
Pass in an entire context object, rather than just the throwable.
richardpark-msft Nov 17, 2020
1edca1c
Fixing more verify issues
richardpark-msft Nov 17, 2020
185cd59
Fixing more verify issues
richardpark-msft Nov 17, 2020
9459733
Make errorSource a package private field and add in a helper to extra…
richardpark-msft Nov 17, 2020
32c48aa
Make the ServiceBusErrorContext final (and rename from ServiceBusProc…
richardpark-msft Nov 17, 2020
44bcc36
Moving the location of the onErrorMap so I don't accidentally create …
richardpark-msft Nov 17, 2020
654909c
Remove lame comment.
richardpark-msft Nov 17, 2020
649179c
Rename ReceiveMode to ServiceBusReceiveMode
richardpark-msft Nov 17, 2020
b51d5fa
* Doing some renames for consistency (SENDER -> SEND, ABANDONED -> AB…
richardpark-msft Nov 17, 2020
962cc32
* Be idiomatic with the naming of SERVICE_COMMUNICATION_PROBLEM and c…
richardpark-msft Nov 17, 2020
5be5cab
Fixing formatting errors caught by verify
richardpark-msft Nov 17, 2020
9a84432
Fixing compile errors with the perf tests from changing the receive m…
richardpark-msft Nov 17, 2020
a6926d5
Missed one.
richardpark-msft Nov 17, 2020
de7ce77
Updating to use the new ServiceBusReceiveMode name rather than just R…
richardpark-msft Nov 17, 2020
4113891
Missed one
richardpark-msft Nov 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
Expand All @@ -26,7 +26,7 @@ public class ReceiveAndDeleteMessageTest extends ServiceTest<ServiceBusStressOpt
* @param options to set performance test options.
*/
public ReceiveAndDeleteMessageTest(ServiceBusStressOptions options) {
super(options, ReceiveMode.RECEIVE_AND_DELETE);
super(options, ServiceBusReceiveMode.RECEIVE_AND_DELETE);
this.options = options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
Expand All @@ -27,7 +27,7 @@ public class ReceiveAndLockMessageTest extends ServiceTest<ServiceBusStressOptio
* @param options to set performance test options.
*/
public ReceiveAndLockMessageTest(ServiceBusStressOptions options) {
super(options, ReceiveMode.PEEK_LOCK);
super(options, ServiceBusReceiveMode.PEEK_LOCK);
this.options = options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.messaging.servicebus.perf;

import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.util.UUID;
Expand All @@ -20,7 +20,7 @@ public class SendMessageTest extends ServiceTest<ServiceBusStressOptions> {
* @param options to set performance test options.
*/
public SendMessageTest(ServiceBusStressOptions options) {
super(options, ReceiveMode.PEEK_LOCK);
super(options, ServiceBusReceiveMode.PEEK_LOCK);
String messageId = UUID.randomUUID().toString();
message = new ServiceBusMessage(CONTENTS);
message.setMessageId(messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.messaging.servicebus.perf;

import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
Expand All @@ -22,7 +22,7 @@ public class SendMessagesTest extends ServiceTest<ServiceBusStressOptions> {
* @param options to set performance test options.
*/
public SendMessagesTest(ServiceBusStressOptions options) {
super(options, ReceiveMode.PEEK_LOCK);
super(options, ServiceBusReceiveMode.PEEK_LOCK);
messages = new ArrayList<>();
for (int i = 0; i < options.getMessagesToSend(); ++i) {
ServiceBusMessage message = new ServiceBusMessage(CONTENTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.perf.test.core.PerfStressOptions;
import com.azure.perf.test.core.PerfStressTest;

Expand Down Expand Up @@ -44,7 +44,7 @@ abstract class ServiceTest<TOptions extends PerfStressOptions> extends PerfStres
* @param receiveMode to receive messages.
* @throws IllegalArgumentException if environment variable not being available.
*/
ServiceTest(TOptions options, ReceiveMode receiveMode) {
ServiceTest(TOptions options, ServiceBusReceiveMode receiveMode) {
super(options);
String connectionString = System.getenv(AZURE_SERVICE_BUS_CONNECTION_STRING);
if (CoreUtils.isNullOrEmpty(connectionString)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@

package com.azure.messaging.servicebus;

import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;

import java.time.Duration;

/**
* Options set when creating a service bus receiver.
*/
class ReceiverOptions {
private final ReceiveMode receiveMode;
private final ServiceBusReceiveMode receiveMode;
private final int prefetchCount;
private final boolean enableAutoComplete;
private final String sessionId;
private final Integer maxConcurrentSessions;
private final Duration maxLockRenewDuration;

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete) {
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null);
}

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
Expand All @@ -46,7 +46,7 @@ Duration getMaxLockRenewDuration() {
*
* @return the receive mode for the message.
*/
ReceiveMode getReceiveMode() {
ServiceBusReceiveMode getReceiveMode() {
return receiveMode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
import org.apache.qpid.proton.engine.SslDomain;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -642,7 +642,7 @@ public final class ServiceBusSessionProcessorClientBuilder {
private final ServiceBusProcessorClientOptions processorClientOptions;
private final ServiceBusSessionReceiverClientBuilder sessionReceiverClientBuilder;
private Consumer<ServiceBusReceivedMessageContext> processMessage;
private Consumer<Throwable> processError;
private Consumer<ServiceBusErrorContext> processError;

private ServiceBusSessionProcessorClientBuilder() {
sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder();
Expand All @@ -669,8 +669,8 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConc
}

/**
* Sets the prefetch count of the processor. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link
* ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application starts the processor.
Expand Down Expand Up @@ -702,7 +702,7 @@ public ServiceBusSessionProcessorClientBuilder queueName(String queueName) {
*
* @return The modified {@link ServiceBusSessionProcessorClientBuilder} object.
*/
public ServiceBusSessionProcessorClientBuilder receiveMode(ReceiveMode receiveMode) {
public ServiceBusSessionProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
sessionReceiverClientBuilder.receiveMode(receiveMode);
return this;
}
Expand Down Expand Up @@ -750,7 +750,8 @@ public ServiceBusSessionProcessorClientBuilder processMessage(
*
* @return The updated {@link ServiceBusProcessorClientBuilder} object
*/
public ServiceBusSessionProcessorClientBuilder processError(Consumer<Throwable> processError) {
public ServiceBusSessionProcessorClientBuilder processError(
Consumer<ServiceBusErrorContext> processError) {
this.processError = processError;
return this;
}
Expand Down Expand Up @@ -820,7 +821,7 @@ public final class ServiceBusSessionReceiverClientBuilder {
private Integer maxConcurrentSessions = null;
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
private String queueName;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
private ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PEEK_LOCK;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
Expand All @@ -843,8 +844,8 @@ public ServiceBusSessionReceiverClientBuilder disableAutoComplete() {

/**
* Sets the amount of time to continue auto-renewing the session lock. Setting {@link Duration#ZERO} or
* {@code null} disables auto-renewal. For {@link ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode,
* auto-renewal is disabled.
* {@code null} disables auto-renewal. For {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE}
* mode, auto-renewal is disabled.
*
* @param maxAutoLockRenewDuration the amount of time to continue auto-renewing the session lock.
* {@link Duration#ZERO} or {@code null} indicates that auto-renewal is disabled.
Expand Down Expand Up @@ -877,8 +878,8 @@ ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSe
}

/**
* Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link
* ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* Sets the prefetch count of the receiver. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application asks for one using {@link ServiceBusReceiverAsyncClient#receiveMessages()}.
Expand Down Expand Up @@ -915,7 +916,7 @@ public ServiceBusSessionReceiverClientBuilder queueName(String queueName) {
*
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMode) {
public ServiceBusSessionReceiverClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
this.receiveMode = receiveMode;
return this;
}
Expand Down Expand Up @@ -992,12 +993,12 @@ private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAut
logger.warning(
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
enableAutoComplete = false;
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
} else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
enableAutoComplete = false;
}

if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
maxAutoLockRenewDuration = Duration.ZERO;
}

Expand Down Expand Up @@ -1059,12 +1060,12 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
logger.warning(
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
enableAutoComplete = false;
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
} else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
enableAutoComplete = false;
}

if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
maxAutoLockRenewDuration = Duration.ZERO;
}

Expand Down Expand Up @@ -1095,16 +1096,16 @@ public final class ServiceBusProcessorClientBuilder {
private final ServiceBusReceiverClientBuilder serviceBusReceiverClientBuilder;
private final ServiceBusProcessorClientOptions processorClientOptions;
private Consumer<ServiceBusReceivedMessageContext> processMessage;
private Consumer<Throwable> processError;
private Consumer<ServiceBusErrorContext> processError;

private ServiceBusProcessorClientBuilder() {
serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder();
processorClientOptions = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1);
}

/**
* Sets the prefetch count of the processor. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link
* ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application starts the processor.
Expand Down Expand Up @@ -1136,7 +1137,7 @@ public ServiceBusProcessorClientBuilder queueName(String queueName) {
*
* @return The modified {@link ServiceBusProcessorClientBuilder} object.
*/
public ServiceBusProcessorClientBuilder receiveMode(ReceiveMode receiveMode) {
public ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
serviceBusReceiverClientBuilder.receiveMode(receiveMode);
return this;
}
Expand Down Expand Up @@ -1184,7 +1185,7 @@ public ServiceBusProcessorClientBuilder processMessage(
*
* @return The updated {@link ServiceBusProcessorClientBuilder} object
*/
public ServiceBusProcessorClientBuilder processError(Consumer<Throwable> processError) {
public ServiceBusProcessorClientBuilder processError(Consumer<ServiceBusErrorContext> processError) {
this.processError = processError;
return this;
}
Expand Down Expand Up @@ -1254,7 +1255,7 @@ public final class ServiceBusReceiverClientBuilder {
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
private String queueName;
private SubQueue subQueue;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
private ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PEEK_LOCK;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
Expand All @@ -1277,8 +1278,8 @@ public ServiceBusReceiverClientBuilder disableAutoComplete() {

/**
* Sets the amount of time to continue auto-renewing the lock. Setting {@link Duration#ZERO} or {@code null}
* disables auto-renewal. For {@link ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode, auto-renewal is
* disabled.
* disables auto-renewal. For {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode,
* auto-renewal is disabled.
*
* @param maxAutoLockRenewDuration the amount of time to continue auto-renewing the lock. {@link Duration#ZERO}
* or {@code null} indicates that auto-renewal is disabled.
Expand All @@ -1293,8 +1294,8 @@ public ServiceBusReceiverClientBuilder maxAutoLockRenewDuration(Duration maxAuto
}

/**
* Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link
* ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* Sets the prefetch count of the receiver. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application asks for one using {@link ServiceBusReceiverAsyncClient#receiveMessages()}.
Expand Down Expand Up @@ -1331,7 +1332,7 @@ public ServiceBusReceiverClientBuilder queueName(String queueName) {
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
*/
public ServiceBusReceiverClientBuilder receiveMode(ReceiveMode receiveMode) {
public ServiceBusReceiverClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
this.receiveMode = receiveMode;
return this;
}
Expand Down Expand Up @@ -1421,12 +1422,12 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
logger.warning(
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
enableAutoComplete = false;
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
} else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
enableAutoComplete = false;
}

if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
maxAutoLockRenewDuration = Duration.ZERO;
}

Expand Down
Loading