diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAmqpException.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAmqpException.java new file mode 100644 index 0000000000000..61b52d7cca5e2 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAmqpException.java @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.amqp.exception.AmqpException; + +/** + * Defines {@link ServiceBusAmqpException} which has additional information about the operation that caused the error. + * + * @see ServiceBusErrorSource + */ +public final class ServiceBusAmqpException extends AmqpException { + private final transient ServiceBusErrorSource errorSource; + + /** + * @param amqpException for the error hapened. + * @param errorSource indicating which api caused the error. + */ + ServiceBusAmqpException(AmqpException amqpException, ServiceBusErrorSource errorSource) { + super(amqpException.isTransient(), amqpException.getErrorCondition(), amqpException.getMessage(), + amqpException.getCause(), amqpException.getContext()); + this.errorSource = errorSource; + } + + /** + * Gets the {@link ServiceBusErrorSource} in case of any errors. + * + * @return the {@link ServiceBusErrorSource} + */ + public ServiceBusErrorSource getErrorSource() { + return errorSource; + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java new file mode 100644 index 0000000000000..43f3be2609af4 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java @@ -0,0 +1,39 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.util.ExpandableStringEnum; + +/** + * Represent the operation this sdk was performing when the error happened. + */ +public final class ServiceBusErrorSource extends ExpandableStringEnum { + + /** Error while abandoning the message.*/ + public static final ServiceBusErrorSource ABANDONED = fromString("ABANDONED", ServiceBusErrorSource.class); + + /** Error while completing the message.*/ + public static final ServiceBusErrorSource COMPLETE = fromString("COMPLETE", ServiceBusErrorSource.class); + + /** Error while receiving the message(s).*/ + public static final ServiceBusErrorSource RECEIVE = fromString("RECEIVE", ServiceBusErrorSource.class); + + /** Error while renewing lock.*/ + public static final ServiceBusErrorSource RENEW_LOCK = fromString("RENEW_LOCK", ServiceBusErrorSource.class); + + /** Error when we could not determine the source.*/ + public static final ServiceBusErrorSource UNKNOWN = fromString("UNKNOWN", ServiceBusErrorSource.class); + + /** Error while user's code is running for a message.*/ + public static final ServiceBusErrorSource USER_CALLBACK = fromString("USER_CALLBACK", + ServiceBusErrorSource.class); + + /** Error while session is accepted.*/ + public static final ServiceBusErrorSource ACCEPT_SESSION = fromString("ACCEPT_SESSION", + ServiceBusErrorSource.class); + + /** Error while session is closed.*/ + public static final ServiceBusErrorSource CLOSE_SESSION = fromString("CLOSE_SESSION", + ServiceBusErrorSource.class); +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java index d8378ef5548c0..7957ca5fcbe5d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java @@ -378,7 +378,7 @@ public long getSequenceNumber() { * @return Session Id of the {@link ServiceBusReceivedMessage}. */ public String getSessionId() { - return amqpAnnotatedMessage.getProperties().getGroupId(); + return getAmqpAnnotatedMessage().getProperties().getGroupId(); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index d7daa14710447..f63cdc509641e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -590,7 +590,8 @@ public Flux receiveMessages() { withAutoComplete = withAutoLockRenewal; } - return withAutoComplete; + return withAutoComplete + .onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE)); } /** @@ -711,7 +712,8 @@ public Mono renewMessageLock(ServiceBusReceivedMessage message) String.format("Cannot renew message lock [%s] for a session receiver.", message.getLockToken()))); } - return renewMessageLock(message.getLockToken()); + return renewMessageLock(message.getLockToken()) + .onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK)); } /** @@ -768,7 +770,8 @@ public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration m renewalContainer.addOrUpdate(message.getLockToken(), OffsetDateTime.now().plus(maxLockRenewalDuration), operation); - return operation.getCompletionOperation(); + return operation.getCompletionOperation() + .onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK)); } /** @@ -975,9 +978,10 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId renewalContainer.remove(lockToken); })); + Mono updateDispositionOperation; if (sessionManager != null) { - return sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus, propertiesToModify, - deadLetterReason, deadLetterErrorDescription, transactionContext) + updateDispositionOperation = sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus, + propertiesToModify, deadLetterReason, deadLetterErrorDescription, transactionContext) .flatMap(isSuccess -> { if (isSuccess) { renewalContainer.remove(lockToken); @@ -987,20 +991,38 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId logger.info("Could not perform on session manger. Performing on management node."); return performOnManagement; }); - } - - final ServiceBusAsyncConsumer existingConsumer = consumer.get(); - if (isManagementToken(lockToken) || existingConsumer == null) { - return performOnManagement; } else { - return existingConsumer.updateDisposition(lockToken, dispositionStatus, deadLetterReason, - deadLetterErrorDescription, propertiesToModify, transactionContext) - .then(Mono.fromRunnable(() -> { - logger.info("{}: Update completed. Disposition: {}. Lock: {}.", - entityPath, dispositionStatus, lockToken); - renewalContainer.remove(lockToken); - })); + final ServiceBusAsyncConsumer existingConsumer = consumer.get(); + if (isManagementToken(lockToken) || existingConsumer == null) { + updateDispositionOperation = performOnManagement; + } else { + updateDispositionOperation = existingConsumer.updateDisposition(lockToken, dispositionStatus, + deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext) + .then(Mono.fromRunnable(() -> { + logger.info("{}: Update completed. Disposition: {}. Lock: {}.", + entityPath, dispositionStatus, lockToken); + renewalContainer.remove(lockToken); + })); + } } + return updateDispositionOperation + .onErrorMap(throwable -> { + // We only populate ErrorSource only when AutoComplete is enabled. + if (receiverOptions.isEnableAutoComplete() && throwable instanceof AmqpException) { + switch (dispositionStatus) { + case COMPLETED: + return new ServiceBusAmqpException((AmqpException) throwable, + ServiceBusErrorSource.COMPLETE); + case ABANDONED: + return new ServiceBusAmqpException((AmqpException) throwable, + ServiceBusErrorSource.ABANDONED); + default: + // Do nothing + } + } + return throwable; + + }); } private ServiceBusAsyncConsumer getOrCreateConsumer() { @@ -1077,7 +1099,8 @@ Mono renewSessionLock(String sessionId) { return connectionProcessor .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) - .flatMap(channel -> channel.renewSessionLock(sessionId, linkName)); + .flatMap(channel -> channel.renewSessionLock(sessionId, linkName)) + .onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK)); } Mono renewSessionLock(String sessionId, Duration maxLockRenewalDuration) { @@ -1101,7 +1124,8 @@ Mono renewSessionLock(String sessionId, Duration maxLockRenewalDuration) { maxLockRenewalDuration, true, this::renewSessionLock); renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation); - return operation.getCompletionOperation(); + return operation.getCompletionOperation() + .onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK)); } Mono setSessionState(String sessionId, byte[] sessionState) { @@ -1135,4 +1159,15 @@ Mono getSessionState(String sessionId) { .flatMap(channel -> channel.getSessionState(sessionId, getLinkName(sessionId))); } } + + /** + * Map the error to {@link ServiceBusAmqpException} + */ + private Throwable mapError(Throwable throwable, ServiceBusErrorSource errorSource) { + if ((throwable instanceof ServiceBusAmqpException) || !(throwable instanceof AmqpException)) { + return throwable; + } else { + return new ServiceBusAmqpException((AmqpException) throwable, errorSource); + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index bfdd9d0d7f643..7b9866f3aeb31 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -7,6 +7,7 @@ import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.CbsAuthorizationType; import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.amqp.implementation.MessageSerializer; @@ -29,6 +30,7 @@ import com.azure.messaging.servicebus.models.ReceiveMode; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.message.Message; @@ -40,7 +42,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; @@ -65,6 +69,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static com.azure.messaging.servicebus.TestUtils.getMessage; import static java.nio.charset.StandardCharsets.UTF_8; @@ -75,6 +80,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -443,6 +449,163 @@ void deadLetterWithDescription() { verify(amqpReceiveLink).updateDisposition(eq(lockToken1), isA(Rejected.class)); } + /** + * Verifies that error source is populated when any error happened while renewing lock. + */ + @Test + void errorSourceOnRenewMessageLock() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final String lockToken = "some-token"; + + when(receivedMessage.getLockToken()).thenReturn(lockToken); + when(managementNode.renewMessageLock(lockToken, null)) + .thenReturn(Mono.error(new AmqpException(false, "some error occurred.", null))); + + // Act & Assert + StepVerifier.create(receiver.renewMessageLock(receivedMessage, maxDuration)) + .verifyErrorSatisfies(throwable -> { + Assertions.assertTrue(throwable instanceof ServiceBusAmqpException); + final ServiceBusErrorSource actual = ((ServiceBusAmqpException) throwable).getErrorSource(); + Assertions.assertEquals(ServiceBusErrorSource.RENEW_LOCK, actual); + }); + + verify(managementNode, times(1)).renewMessageLock(lockToken, null); + } + + /** + * Verifies that error source is populated when any error happened while renewing lock. + */ + @Test + void errorSourceOnSessionLock() { + // Arrange + when(managementNode.renewSessionLock(SESSION_ID, null)).thenReturn(Mono.error(new AmqpException(false, "some error occurred.", null))); + + // Act & Assert + StepVerifier.create(sessionReceiver.renewSessionLock(SESSION_ID)) + .verifyErrorSatisfies(throwable -> { + Assertions.assertTrue(throwable instanceof ServiceBusAmqpException); + final ServiceBusErrorSource actual = ((ServiceBusAmqpException) throwable).getErrorSource(); + Assertions.assertEquals(ServiceBusErrorSource.RENEW_LOCK, actual); + }); + } + + /** + * Verifies that error source is not populated when there is no autoComplete. Because user wanted to settle on their + * own, we do not need to populate ErrorSource. + */ + @ParameterizedTest + @MethodSource + void errorSourceNoneOnSettlement(DispositionStatus dispositionStatus, DeliveryStateType expectedDeliveryState) { + + final UUID lockTokenUuid = UUID.randomUUID(); + final String lockToken1 = lockTokenUuid.toString(); + + final MessageWithLockToken message = mock(MessageWithLockToken.class); + + when(receivedMessage.getLockToken()).thenReturn(lockToken1); + + when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage); + + when(amqpReceiveLink.updateDisposition(eq(lockToken1), argThat(e -> e.getType() == expectedDeliveryState))) + .thenReturn(Mono.error(new AmqpException(false, "some error occurred.", null))); + + // Act & Assert + StepVerifier.create(receiver.receiveMessages().take(1) + .flatMap(context -> { + final Mono operation; + switch (dispositionStatus) { + case ABANDONED: + operation = receiver.abandon(context.getMessage()); + break; + case COMPLETED: + operation = receiver.complete(context.getMessage()); + break; + default: + throw new IllegalArgumentException("Unrecognized operation: " + dispositionStatus); + } + return operation; + }) + ) + .then(() -> messageSink.next(message)) + .expectNext() + .verifyErrorSatisfies(throwable -> { + Assertions.assertFalse(throwable instanceof ServiceBusAmqpException); + Assertions.assertTrue(throwable instanceof AmqpException); + }); + + verify(amqpReceiveLink).updateDisposition(eq(lockToken1), any(DeliveryState.class)); + } + + /** + * Ensure that we throw right error source when there is any issue during autocomplete. Error source should be + * {@link ServiceBusErrorSource#COMPLETE} + */ + @Test + void errorSourceAutoCompleteMessage() { + // Arrange + final int numberOfEvents = 2; + final int messagesToReceive = 1; + final List messages = getMessages(); + final String lockToken = UUID.randomUUID().toString(); + final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, true); + final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, + MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, + messageSerializer, onClientClose); + + when(receivedMessage.getLockToken()).thenReturn(lockToken); + when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class))) + .thenReturn(receivedMessage); + + when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.error(new AmqpException(false, "some error occurred.", null))); + + try { + // Act & Assert + StepVerifier.create(receiver2.receiveMessages().take(numberOfEvents)) + .then(() -> messages.forEach(m -> messageSink.next(m))) + .expectNextCount(messagesToReceive) + .verifyErrorSatisfies(throwable -> { + Assertions.assertTrue(throwable instanceof ServiceBusAmqpException); + final ServiceBusErrorSource actual = ((ServiceBusAmqpException) throwable).getErrorSource(); + Assertions.assertEquals(ServiceBusErrorSource.COMPLETE, actual); + }); + } finally { + receiver2.close(); + } + + verify(amqpReceiveLink, atLeast(messagesToReceive)).updateDisposition(lockToken, Accepted.getInstance()); + } + + /** + * Verifies that error source is populated when there is any error during receiving of message. + */ + @Test + void errorSourceOnReceiveMessage() { + final String lockToken = UUID.randomUUID().toString(); + + final OffsetDateTime expiration = OffsetDateTime.now().plus(Duration.ofMinutes(5)); + + final MessageWithLockToken message = mock(MessageWithLockToken.class); + + when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage); + + when(receivedMessage.getLockToken()).thenReturn(lockToken); + when(receivedMessage.getLockedUntil()).thenReturn(expiration); + + when(connection.createReceiveLink(anyString(), anyString(), any(ReceiveMode.class), any(), + any(MessagingEntityType.class))).thenReturn(Mono.error(new AmqpException(false, "some receive link Error.", null))); + + // Act & Assert + StepVerifier.create(receiver.receiveMessages().take(1)) + .verifyErrorSatisfies(throwable -> { + Assertions.assertTrue(throwable instanceof ServiceBusAmqpException); + final ServiceBusErrorSource actual = ((ServiceBusAmqpException) throwable).getErrorSource(); + Assertions.assertEquals(ServiceBusErrorSource.RECEIVE, actual); + }); + + verify(amqpReceiveLink, never()).updateDisposition(eq(lockToken), any(DeliveryState.class)); + } + /** * Verifies that the user can complete settlement methods on received message. */ @@ -970,4 +1133,10 @@ private List getMessages() { .mapToObj(index -> getMessage(PAYLOAD_BYTES, messageTrackingUUID, map)) .collect(Collectors.toList()); } + + private static Stream errorSourceNoneOnSettlement() { + return Stream.of( + Arguments.of(DispositionStatus.COMPLETED, DeliveryStateType.Accepted), + Arguments.of(DispositionStatus.ABANDONED, DeliveryStateType.Modified)); + } }