Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API ServiceBusErrorSource to represent source of error #16710

Merged
merged 37 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b0c8562
Designing ErrorSource options
Oct 22, 2020
b9a83ef
Merge branch 'master' into sb-t2-errorsource-api
Oct 22, 2020
aba06d0
Error source structure.
Oct 22, 2020
28fea7b
Adding error handling for Error source
Oct 26, 2020
d01190a
Adding error handling for Error source
Oct 26, 2020
1ec13b5
Adding error handling for Error source
Oct 26, 2020
bd05bfe
Removed error source from SBM context
Oct 26, 2020
3ce0cb3
Removed unwanted changes
Oct 26, 2020
36106a3
Added test case for sending and receiving the messages.
Oct 27, 2020
1d498ae
merge master into branch
Oct 27, 2020
46f7d2c
Added java doc
Oct 27, 2020
e8d7e1f
updated module info
Oct 27, 2020
cbc6f55
updated code to pass error source in updateDisposition
Oct 27, 2020
32aae8d
module info changes will be in separate PR
Oct 27, 2020
e34a23e
Updated based on review comments.
Oct 28, 2020
d9cd7c0
changes based on review comments
Oct 28, 2020
fce6f55
changes based on review comments
Oct 28, 2020
f84f896
changes based on review comments
Oct 28, 2020
2d004d8
Removed Error source from Sender as dotnet is also not doing it
Oct 28, 2020
5a52a8d
Removed Error source from Sender as dotnet is also not doing it
Oct 28, 2020
578a0dd
added error source for renew lock
Oct 28, 2020
eda86d1
added error source for renew lock
Oct 28, 2020
d24f339
added error source for renew lock
Oct 28, 2020
6e3b5d8
added docs for unit test
Oct 28, 2020
c90eb0e
Renamed Exception
Oct 29, 2020
3286f4c
Renamed Exception
Oct 29, 2020
b61551f
review comments incorporated
Oct 29, 2020
204fd28
Fixing checkstyle
Oct 29, 2020
47bfc0b
Incorporated review comments
Oct 29, 2020
e62f7f5
Review comments
Oct 29, 2020
4c11eb4
Review comments
Oct 29, 2020
59ff15f
Review comments
Oct 29, 2020
5b0d16a
Removed unwanted error source types
Oct 30, 2020
205fbfe
Fix unit test and ErrorSource only for AutoComplete on
Oct 30, 2020
62510bd
Review Comments
Oct 30, 2020
6d79df0
Fix unit test
Oct 30, 2020
0a6fea9
merge conflict resolution
Oct 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpException;

/**
* Defines {@link ServiceBusAmqpException} which has additional information about the operation that caused the error.
*
* @see ServiceBusErrorSource
*/
public final class ServiceBusAmqpException extends AmqpException {
private final ServiceBusErrorSource errorSource;

/**
* @param amqpException for the error hapened.
* @param errorSource indicating which api caused the error.
*/
ServiceBusAmqpException(AmqpException amqpException, ServiceBusErrorSource errorSource) {
super(amqpException.isTransient(), amqpException.getErrorCondition(), amqpException.getMessage(),
amqpException.getCause(), amqpException.getContext());
this.errorSource = errorSource;
}

/**
* Gets the {@link ServiceBusErrorSource} in case of any errors.
*
* @return the {@link ServiceBusErrorSource}
*/
public ServiceBusErrorSource getErrorSource() {
return errorSource;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.util.ExpandableStringEnum;

import java.io.Serializable;

/**
* Represent the operation this sdk was performing when the error happened.
*/
public final class ServiceBusErrorSource extends ExpandableStringEnum<ServiceBusErrorSource> implements Serializable {

private static final long serialVersionUID = -2819764417333954922L;
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved

/** Error while abandoning the message.*/
public static final ServiceBusErrorSource ABANDONED = fromString("ABANDONED", ServiceBusErrorSource.class);

/** Error while completing the message.*/
public static final ServiceBusErrorSource COMPLETE = fromString("COMPLETE", ServiceBusErrorSource.class);

/** Error while deferring the message.*/
public static final ServiceBusErrorSource DEFER = fromString("DEFER", ServiceBusErrorSource.class);

/** Error while dead-lettering the message.*/
public static final ServiceBusErrorSource DEAD_LETTER = fromString("DEAD_LETTER",
ServiceBusErrorSource.class);

/** Error while receiving the message(s).*/
public static final ServiceBusErrorSource RECEIVE = fromString("RECEIVE", ServiceBusErrorSource.class);

/** Error while renewing lock.*/
public static final ServiceBusErrorSource RENEW_LOCK = fromString("RENEW_LOCK", ServiceBusErrorSource.class);
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved

/** Error when we could not determine the source.*/
public static final ServiceBusErrorSource UNKNOWN = fromString("UNKNOWN", ServiceBusErrorSource.class);

/** Error while user's code is running for a message.*/
public static final ServiceBusErrorSource USER_CALLBACK = fromString("USER_CALLBACK",
ServiceBusErrorSource.class);

/** Error while session is accepted.*/
public static final ServiceBusErrorSource ACCEPT_MESSAGE_SESSION = fromString("ACCEPT_MESSAGE_SESSION",
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
ServiceBusErrorSource.class);

/** Error while session is closed.*/
public static final ServiceBusErrorSource CLOSE_MESSAGE_SESSION = fromString("CLOSE_MESSAGE_SESSION",
ServiceBusErrorSource.class);


}
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,8 @@ public Flux<ServiceBusReceivedMessageContext> receiveMessages() {
withAutoComplete = withAutoLockRenewal;
}

return withAutoComplete;
return withAutoComplete
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE));
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -733,7 +734,8 @@ public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)
String.format("Cannot renew message lock [%s] for a session receiver.", message.getLockToken())));
}

return renewMessageLock(message.getLockToken());
return renewMessageLock(message.getLockToken())
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

/**
Expand All @@ -751,7 +753,8 @@ Mono<OffsetDateTime> renewMessageLock(String lockToken) {
.flatMap(serviceBusManagementNode ->
serviceBusManagementNode.renewMessageLock(lockToken, getLinkName(null)))
.map(offsetDateTime -> managementNodeLocks.addOrUpdate(lockToken, offsetDateTime,
offsetDateTime));
offsetDateTime))
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

/**
Expand Down Expand Up @@ -790,7 +793,8 @@ public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration m
renewalContainer.addOrUpdate(message.getLockToken(), OffsetDateTime.now().plus(maxLockRenewalDuration),
operation);

return operation.getCompletionOperation();
return operation.getCompletionOperation()
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

/**
Expand All @@ -815,7 +819,8 @@ public Mono<OffsetDateTime> renewSessionLock(String sessionId) {

return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(channel -> channel.renewSessionLock(sessionId, linkName));
.flatMap(channel -> channel.renewSessionLock(sessionId, linkName))
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

/**
Expand Down Expand Up @@ -851,7 +856,9 @@ public Mono<Void> renewSessionLock(String sessionId, Duration maxLockRenewalDura
this::renewSessionLock);

renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
return operation.getCompletionOperation();
return operation
.getCompletionOperation()
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

/**
Expand Down Expand Up @@ -1048,9 +1055,10 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
renewalContainer.remove(lockToken);
}));

Mono<Void> updateDispositionOperation;
if (sessionManager != null) {
return sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus, propertiesToModify,
deadLetterReason, deadLetterErrorDescription, transactionContext)
updateDispositionOperation = sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus,
propertiesToModify, deadLetterReason, deadLetterErrorDescription, transactionContext)
.flatMap(isSuccess -> {
if (isSuccess) {
renewalContainer.remove(lockToken);
Expand All @@ -1060,20 +1068,48 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
logger.info("Could not perform on session manger. Performing on management node.");
return performOnManagement;
});
}

final ServiceBusAsyncConsumer existingConsumer = consumer.get();
if (isManagementToken(lockToken) || existingConsumer == null) {
return performOnManagement;
} else {
return existingConsumer.updateDisposition(lockToken, dispositionStatus, deadLetterReason,
deadLetterErrorDescription, propertiesToModify, transactionContext)
.then(Mono.fromRunnable(() -> {
logger.info("{}: Update completed. Disposition: {}. Lock: {}.",
entityPath, dispositionStatus, lockToken);
renewalContainer.remove(lockToken);
}));
final ServiceBusAsyncConsumer existingConsumer = consumer.get();
if (isManagementToken(lockToken) || existingConsumer == null) {
updateDispositionOperation = performOnManagement;
} else {
updateDispositionOperation = existingConsumer.updateDisposition(lockToken, dispositionStatus,
deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext)
.then(Mono.fromRunnable(() -> {
logger.info("{}: Update completed. Disposition: {}. Lock: {}.",
entityPath, dispositionStatus, lockToken);
renewalContainer.remove(lockToken);
}));
}
}
return updateDispositionOperation
.onErrorMap(throwable -> {
ServiceBusErrorSource errorSource;
if (throwable instanceof AmqpException) {
switch (dispositionStatus) {
case COMPLETED:
errorSource = ServiceBusErrorSource.COMPLETE;
break;
case DEFERRED:
errorSource = ServiceBusErrorSource.DEFER;
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
break;
case SUSPENDED:
errorSource = ServiceBusErrorSource.DEAD_LETTER;
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
break;
case ABANDONED:
errorSource = ServiceBusErrorSource.ABANDONED;
break;
default:
errorSource = ServiceBusErrorSource.UNKNOWN;
}

return new ServiceBusAmqpException((AmqpException) throwable, errorSource);

} else {
return throwable;
}

});
}

private ServiceBusAsyncConsumer getOrCreateConsumer() {
Expand Down Expand Up @@ -1136,4 +1172,15 @@ private String getLinkName(String sessionId) {
return existing != null ? existing.getLinkName() : null;
}
}

/**
* Map the error to {@link ServiceBusAmqpException}
*/
private Throwable mapError(Throwable throwable, ServiceBusErrorSource errorSource) {
if (throwable instanceof AmqpException) {
return new ServiceBusAmqpException((AmqpException) throwable, errorSource);
} else {
return throwable;
}
}
}
Loading