From 1a9ccc704cb3808d1b25bfcb766638575f2c618a Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 17 Feb 2022 01:44:19 +0000 Subject: [PATCH 1/4] [ServiceBus] fix infinite retry cycles The live test shows that with certain timing we could get into a situation where after the connection is closed, we throw an error in `_initAndAddCreditOperation`. and we keep restarting retry cycles in the `forEverRetry()` loop. User calling `close()` wouldn't help because the place we check for suspended state and throw `AbortError` is after we check the connection and throw error. This PR moves the checks of whether the user has suspended the receiver so we could throw `AbortError` to end the retry cycles. I also fix the issue of not respecting 0 values in user specified retry options. --- sdk/servicebus/service-bus/src/core/streamingReceiver.ts | 8 ++++---- .../service-bus/src/receivers/receiverCommon.ts | 9 +++++++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 520a91bd88d1..39a3cc9a39a7 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -576,10 +576,6 @@ export class StreamingReceiver extends MessageReceiver { * @param catchAndReportError - A function and reports an error but does not throw it. */ private async _initAndAddCreditOperation(caller: "detach" | "subscribe"): Promise { - throwErrorIfConnectionClosed(this._context); - - await this._messageHandlers().preInitialize(); - if (this._receiverHelper.isSuspended()) { // user has suspended us while we were initializing // the connection. Abort this attempt - if they attempt @@ -587,6 +583,10 @@ export class StreamingReceiver extends MessageReceiver { throw new AbortError("Receiver was suspended during initialization."); } + throwErrorIfConnectionClosed(this._context); + + await this._messageHandlers().preInitialize(); + await this._init( this._createReceiverOptions(caller === "detach", this._getHandlers()), this._subscribeOptions?.abortSignal diff --git a/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts b/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts index 7514a6e41f93..c98c75ec9bb4 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts @@ -321,10 +321,15 @@ export async function retryForever( if (!config.retryOptions) { config.retryOptions = {}; } - if (!config.retryOptions.retryDelayInMs || config.retryOptions.retryDelayInMs < 0) { + // eslint-disable-next-line eqeqeq + if (config.retryOptions.retryDelayInMs == undefined || config.retryOptions.retryDelayInMs < 0) { config.retryOptions.retryDelayInMs = Constants.defaultDelayBetweenOperationRetriesInMs; } - if (!config.retryOptions.maxRetryDelayInMs || config.retryOptions.maxRetryDelayInMs < 0) { + if ( + // eslint-disable-next-line eqeqeq + config.retryOptions.maxRetryDelayInMs == undefined || + config.retryOptions.maxRetryDelayInMs < 0 + ) { config.retryOptions.maxRetryDelayInMs = Constants.defaultMaxDelayForExponentialRetryInMs; } if (!config.retryOptions.mode) { From a66a399ffdaecf2ec26e1aa2a38cccb1d9238792 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 17 Feb 2022 18:29:59 +0000 Subject: [PATCH 2/4] Need to check suspended status again as user can suspend us in `preInitialize()` handler. --- sdk/servicebus/service-bus/src/core/streamingReceiver.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 39a3cc9a39a7..7b75a6bdc411 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -580,6 +580,8 @@ export class StreamingReceiver extends MessageReceiver { // user has suspended us while we were initializing // the connection. Abort this attempt - if they attempt // resubscribe we'll just reinitialize. + // This checks should happen before throwErrorIfConnectionClosed(); otherwise + // we won't be able to break out of the retry-for-ever loops when user suspend us. throw new AbortError("Receiver was suspended during initialization."); } @@ -587,6 +589,10 @@ export class StreamingReceiver extends MessageReceiver { await this._messageHandlers().preInitialize(); + if (this._receiverHelper.isSuspended()) { + // Need to check again as user can suspend us in preInitialize() + throw new AbortError("Receiver was suspended during initialization."); + } await this._init( this._createReceiverOptions(caller === "detach", this._getHandlers()), this._subscribeOptions?.abortSignal From 141bf9cc744144fa400df565c960cdb97b40e6d6 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 17 Feb 2022 18:53:47 +0000 Subject: [PATCH 3/4] Add changelog entry --- sdk/servicebus/service-bus/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index d2dcee4ce41c..cb920f6b225a 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fix an issue where we don't respect user request to close the receiver when the connection is disconnected. [PR #20427](https://github.com/Azure/azure-sdk-for-js/pull/20427) + ### Other Changes ## 7.5.0 (2022-02-14) From 29d776d5abccc47e74369c50d6c88e9f6d4689d5 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 17 Feb 2022 14:14:16 -0800 Subject: [PATCH 4/4] Update sdk/servicebus/service-bus/CHANGELOG.md Co-authored-by: Ramya Rao --- sdk/servicebus/service-bus/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index cb920f6b225a..e262f242fa3b 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -8,7 +8,7 @@ ### Bugs Fixed -- Fix an issue where we don't respect user request to close the receiver when the connection is disconnected. [PR #20427](https://github.com/Azure/azure-sdk-for-js/pull/20427) +- Fix an issue where we don't respect user request to close the receiver if the connection is disconnected when using the `subscribe()` method. [PR #20427](https://github.com/Azure/azure-sdk-for-js/pull/20427) ### Other Changes