Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Respect user request to close receiver when disconnected #20427

Merged
merged 4 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fix an issue where we don't respect user request to close the receiver if the connection is disconnected when using the `subscribe()` method. [PR #20427](https://github.com/Azure/azure-sdk-for-js/pull/20427)

### Other Changes

## 7.5.0 (2022-02-14)
Expand Down
14 changes: 10 additions & 4 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,17 +576,23 @@ export class StreamingReceiver extends MessageReceiver {
* @param catchAndReportError - A function and reports an error but does not throw it.
*/
private async _initAndAddCreditOperation(caller: "detach" | "subscribe"): Promise<void> {
throwErrorIfConnectionClosed(this._context);

await this._messageHandlers().preInitialize();

if (this._receiverHelper.isSuspended()) {
// user has suspended us while we were initializing
// the connection. Abort this attempt - if they attempt
// resubscribe we'll just reinitialize.
// This checks should happen before throwErrorIfConnectionClosed(); otherwise
// we won't be able to break out of the retry-for-ever loops when user suspend us.
throw new AbortError("Receiver was suspended during initialization.");
}

throwErrorIfConnectionClosed(this._context);

await this._messageHandlers().preInitialize();

if (this._receiverHelper.isSuspended()) {
// Need to check again as user can suspend us in preInitialize()
throw new AbortError("Receiver was suspended during initialization.");
}
await this._init(
this._createReceiverOptions(caller === "detach", this._getHandlers()),
this._subscribeOptions?.abortSignal
Expand Down
9 changes: 7 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/receiverCommon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,15 @@ export async function retryForever<T>(
if (!config.retryOptions) {
config.retryOptions = {};
}
if (!config.retryOptions.retryDelayInMs || config.retryOptions.retryDelayInMs < 0) {
// eslint-disable-next-line eqeqeq
if (config.retryOptions.retryDelayInMs == undefined || config.retryOptions.retryDelayInMs < 0) {
config.retryOptions.retryDelayInMs = Constants.defaultDelayBetweenOperationRetriesInMs;
}
if (!config.retryOptions.maxRetryDelayInMs || config.retryOptions.maxRetryDelayInMs < 0) {
if (
// eslint-disable-next-line eqeqeq
config.retryOptions.maxRetryDelayInMs == undefined ||
config.retryOptions.maxRetryDelayInMs < 0
) {
config.retryOptions.maxRetryDelayInMs = Constants.defaultMaxDelayForExponentialRetryInMs;
}
if (!config.retryOptions.mode) {
Expand Down