Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API ServiceBusErrorSource to represent source of error #16710

Merged
merged 37 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b0c8562
Designing ErrorSource options
Oct 22, 2020
b9a83ef
Merge branch 'master' into sb-t2-errorsource-api
Oct 22, 2020
aba06d0
Error source structure.
Oct 22, 2020
28fea7b
Adding error handling for Error source
Oct 26, 2020
d01190a
Adding error handling for Error source
Oct 26, 2020
1ec13b5
Adding error handling for Error source
Oct 26, 2020
bd05bfe
Removed error source from SBM context
Oct 26, 2020
3ce0cb3
Removed unwanted changes
Oct 26, 2020
36106a3
Added test case for sending and receiving the messages.
Oct 27, 2020
1d498ae
merge master into branch
Oct 27, 2020
46f7d2c
Added java doc
Oct 27, 2020
e8d7e1f
updated module info
Oct 27, 2020
cbc6f55
updated code to pass error source in updateDisposition
Oct 27, 2020
32aae8d
module info changes will be in separate PR
Oct 27, 2020
e34a23e
Updated based on review comments.
Oct 28, 2020
d9cd7c0
changes based on review comments
Oct 28, 2020
fce6f55
changes based on review comments
Oct 28, 2020
f84f896
changes based on review comments
Oct 28, 2020
2d004d8
Removed Error source from Sender as dotnet is also not doing it
Oct 28, 2020
5a52a8d
Removed Error source from Sender as dotnet is also not doing it
Oct 28, 2020
578a0dd
added error source for renew lock
Oct 28, 2020
eda86d1
added error source for renew lock
Oct 28, 2020
d24f339
added error source for renew lock
Oct 28, 2020
6e3b5d8
added docs for unit test
Oct 28, 2020
c90eb0e
Renamed Exception
Oct 29, 2020
3286f4c
Renamed Exception
Oct 29, 2020
b61551f
review comments incorporated
Oct 29, 2020
204fd28
Fixing checkstyle
Oct 29, 2020
47bfc0b
Incorporated review comments
Oct 29, 2020
e62f7f5
Review comments
Oct 29, 2020
4c11eb4
Review comments
Oct 29, 2020
59ff15f
Review comments
Oct 29, 2020
5b0d16a
Removed unwanted error source types
Oct 30, 2020
205fbfe
Fix unit test and ErrorSource only for AutoComplete on
Oct 30, 2020
62510bd
Review Comments
Oct 30, 2020
6d79df0
Fix unit test
Oct 30, 2020
0a6fea9
merge conflict resolution
Oct 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

/**
* Represent the operation user was performing when the error happened.
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
*/
public enum ServiceBusErrorSource {
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
/** Error while sending the message(s).*/
SEND,
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved

/** Error while receiving the message(s).*/
RECEIVE,

/** Error while abandoning the message.*/
ABANDONED,

/** Error while completing the message.*/
COMPLETE,

/** Error while deferring the message.*/
DEFER,

/** Error while dead-lettering the message.*/
DEAD_LETTER;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpException;

/**
* Defines {@link ServiceBusException} which has addition properties. You can {@link ServiceBusErrorSource} to
* determine source of error.
*
* @see ServiceBusErrorSource
*/
public class ServiceBusException extends AmqpException {
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
private final ServiceBusErrorSource errorSource;

/**
* @param amqpException for the error hapened.
* @param errorSource indicating which api caused the error.
*/
ServiceBusException(AmqpException amqpException, ServiceBusErrorSource errorSource) {
super(amqpException.isTransient(), amqpException.getErrorCondition(), amqpException.getMessage(),
amqpException.getCause(), amqpException.getContext());
this.errorSource = errorSource;
}

/**
* Gets the {@link ServiceBusErrorSource} in case of any errors.
*
* @return the {@link ServiceBusErrorSource}
*/
public ServiceBusErrorSource getErrorSource() {
return errorSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public String getEntityPath() {
*/
public Mono<Void> abandon(ServiceBusReceivedMessage message) {
return updateDisposition(message, DispositionStatus.ABANDONED, null, null,
null, null);
null, null, ServiceBusErrorSource.ABANDONED);
}

/**
Expand Down Expand Up @@ -233,7 +233,7 @@ public Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions opti
}

return updateDisposition(message, DispositionStatus.ABANDONED, null, null,
options.getPropertiesToModify(), options.getTransactionContext());
options.getPropertiesToModify(), options.getTransactionContext(), ServiceBusErrorSource.ABANDONED);
}

/**
Expand All @@ -248,7 +248,7 @@ public Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions opti
*/
public Mono<Void> complete(ServiceBusReceivedMessage message) {
return updateDisposition(message, DispositionStatus.COMPLETED, null, null,
null, null);
null, null, ServiceBusErrorSource.COMPLETE);
}

/**
Expand Down Expand Up @@ -277,7 +277,7 @@ public Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions op
}

return updateDisposition(message, DispositionStatus.COMPLETED, null, null,
null, options.getTransactionContext());
null, options.getTransactionContext(), ServiceBusErrorSource.COMPLETE);
}

/**
Expand All @@ -292,7 +292,7 @@ public Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions op
*/
public Mono<Void> defer(ServiceBusReceivedMessage message) {
return updateDisposition(message, DispositionStatus.DEFERRED, null, null,
null, null);
null, null, ServiceBusErrorSource.DEFER);
}

/**
Expand Down Expand Up @@ -323,7 +323,7 @@ public Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)
}

return updateDisposition(message, DispositionStatus.DEFERRED, null, null,
options.getPropertiesToModify(), options.getTransactionContext());
options.getPropertiesToModify(), options.getTransactionContext(), ServiceBusErrorSource.DEFER);
}

/**
Expand Down Expand Up @@ -370,7 +370,7 @@ public Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOption
}
return updateDisposition(message, DispositionStatus.SUSPENDED, options.getDeadLetterReason(),
options.getDeadLetterErrorDescription(), options.getPropertiesToModify(),
options.getTransactionContext());
options.getTransactionContext(), ServiceBusErrorSource.DEAD_LETTER);
}

/**
Expand Down Expand Up @@ -611,7 +611,14 @@ public Flux<ServiceBusReceivedMessageContext> receiveMessages() {
withAutoComplete = withAutoLockRenewal;
}

return withAutoComplete;
return withAutoComplete
.onErrorMap(throwable -> {
if (throwable instanceof AmqpException) {
return new ServiceBusException((AmqpException) throwable, ServiceBusErrorSource.RECEIVE);
} else {
return throwable;
}
});
}

/**
Expand Down Expand Up @@ -1008,8 +1015,21 @@ private boolean isManagementToken(String lockToken) {

private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, DispositionStatus dispositionStatus,
String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify,
ServiceBusTransactionContext transactionContext) {
ServiceBusTransactionContext transactionContext, ServiceBusErrorSource errorSource) {
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
return updateDispositionInternal(message, dispositionStatus, deadLetterReason, deadLetterErrorDescription,
propertiesToModify, transactionContext)
.onErrorMap(throwable -> {
if (throwable instanceof AmqpException) {
return new ServiceBusException((AmqpException) throwable, errorSource);
}
return throwable;

});
}

private Mono<Void> updateDispositionInternal(ServiceBusReceivedMessage message, DispositionStatus dispositionStatus,
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify,
ServiceBusTransactionContext transactionContext) {
if (isDisposed.get()) {
return monoError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, dispositionStatus.getValue())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,12 @@ private Mono<Void> sendIterable(Iterable<ServiceBusMessage> messages, ServiceBus
return createMessageBatch().flatMap(messageBatch -> {
messages.forEach(message -> messageBatch.tryAddMessage(message));
return sendInternal(messageBatch, transaction);
}).onErrorMap(throwable -> {
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
if (throwable instanceof AmqpException) {
return new ServiceBusException((AmqpException) throwable, ServiceBusErrorSource.SEND);
} else {
return throwable;
}
});
}

Expand Down Expand Up @@ -607,6 +613,12 @@ private Mono<Void> sendInternal(ServiceBusMessageBatch batch, ServiceBusTransact
if (isTracingEnabled) {
tracerProvider.endSpan(parentContext.get(), signal);
}
}).onErrorMap(throwable -> {
if (throwable instanceof AmqpException) {
return new ServiceBusException((AmqpException) throwable, ServiceBusErrorSource.SEND);
} else {
return throwable;
}
});

}
Expand All @@ -621,8 +633,14 @@ private Mono<Void> sendInternal(Flux<ServiceBusMessage> messages, ServiceBusTran
return messages.collect(new AmqpMessageCollector(batchOptions, 1,
link::getErrorContext, tracerProvider, messageSerializer, entityName,
link.getHostname()));
})
.flatMap(list -> sendInternalBatch(Flux.fromIterable(list), transactionContext)));
}).flatMap(list -> sendInternalBatch(Flux.fromIterable(list), transactionContext)))
.onErrorMap(throwable -> {
if (throwable instanceof AmqpException) {
return new ServiceBusException((AmqpException) throwable, ServiceBusErrorSource.SEND);
} else {
return throwable;
}
});
}

private Mono<Void> sendInternalBatch(Flux<ServiceBusMessageBatch> eventBatches,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.CbsAuthorizationType;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
Expand All @@ -29,6 +30,7 @@
import com.azure.messaging.servicebus.models.ReceiveMode;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.message.Message;
Expand All @@ -40,7 +42,9 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
Expand All @@ -65,6 +69,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.azure.messaging.servicebus.TestUtils.getMessage;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -443,6 +448,92 @@ void deadLetterWithDescription() {
verify(amqpReceiveLink).updateDisposition(eq(lockToken1), isA(Rejected.class));
}

/**
* Verifies that error source is populated when there is any error during message settlement.
*/
@ParameterizedTest
@MethodSource
void errorSourceOnSettlement(DispositionStatus dispositionStatus, ServiceBusErrorSource expectedErrorSource,
DeliveryStateType expectedDeliveryState) {
final String lockToken1 = UUID.randomUUID().toString();

final OffsetDateTime expiration = OffsetDateTime.now().plus(Duration.ofMinutes(5));

final MessageWithLockToken message = mock(MessageWithLockToken.class);

when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage);

when(receivedMessage.getLockToken()).thenReturn(lockToken1);
when(receivedMessage.getLockedUntil()).thenReturn(expiration);

when(amqpReceiveLink.updateDisposition(eq(lockToken1), argThat(e -> e.getType() == expectedDeliveryState)))
.thenReturn(Mono.error(new AmqpException(false, "some error occured.", null)));

// Act & Assert
StepVerifier.create(receiver.receiveMessages().take(1)
.flatMap(context -> {
final Mono<Void> operation;
switch (dispositionStatus) {
case DEFERRED:
operation = receiver.defer(receivedMessage);
break;
case ABANDONED:
operation = receiver.abandon(receivedMessage);
break;
case COMPLETED:
operation = receiver.complete(receivedMessage);
break;
case SUSPENDED:
operation = receiver.deadLetter(receivedMessage);
break;
default:
throw new IllegalArgumentException("Unrecognized operation: " + dispositionStatus);
}
return operation;
}))
.then(() -> messageSink.next(message))
.expectNext()
.verifyErrorMatches(throwable -> {
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
Assertions.assertTrue(throwable instanceof ServiceBusException);
final ServiceBusErrorSource actual = ((ServiceBusException) throwable).getErrorSource();
Assertions.assertEquals(expectedErrorSource, actual);
return true;
});

verify(amqpReceiveLink).updateDisposition(eq(lockToken1), any(DeliveryState.class));
}

/**
* Verifies that error source is populated when there is any error during receiving of message.
*/
@Test
void errorSourceOnReceiveMessage() {
final String lockToken1 = UUID.randomUUID().toString();
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved

final OffsetDateTime expiration = OffsetDateTime.now().plus(Duration.ofMinutes(5));

final MessageWithLockToken message = mock(MessageWithLockToken.class);

when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage);

when(receivedMessage.getLockToken()).thenReturn(lockToken1);
when(receivedMessage.getLockedUntil()).thenReturn(expiration);

when(connection.createReceiveLink(anyString(), anyString(), any(ReceiveMode.class), any(),
any(MessagingEntityType.class))).thenReturn(Mono.error(new AmqpException(false, "some receive link Error.", null)));

// Act & Assert
StepVerifier.create(receiver.receiveMessages().take(1))
.verifyErrorMatches(throwable -> {
Assertions.assertTrue(throwable instanceof ServiceBusException);
final ServiceBusErrorSource actual = ((ServiceBusException) throwable).getErrorSource();
Assertions.assertEquals(ServiceBusErrorSource.RECEIVE, actual);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also have tests for error source UNKNOWN?

return true;
});

verify(amqpReceiveLink, never()).updateDisposition(eq(lockToken1), any(DeliveryState.class));
}

/**
* Verifies that the user can complete settlement methods on received message.
*/
Expand Down Expand Up @@ -970,4 +1061,12 @@ private List<Message> getMessages() {
.mapToObj(index -> getMessage(PAYLOAD_BYTES, messageTrackingUUID, map))
.collect(Collectors.toList());
}

private static Stream<Arguments> errorSourceOnSettlement() {
return Stream.of(
Arguments.of(DispositionStatus.DEFERRED, ServiceBusErrorSource.DEFER, DeliveryStateType.Modified),
Arguments.of(DispositionStatus.COMPLETED, ServiceBusErrorSource.COMPLETE, DeliveryStateType.Accepted),
Arguments.of(DispositionStatus.SUSPENDED, ServiceBusErrorSource.DEAD_LETTER, DeliveryStateType.Rejected),
Arguments.of(DispositionStatus.ABANDONED, ServiceBusErrorSource.ABANDONED, DeliveryStateType.Modified));
}
}
Loading