diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs index ec92b55b87c0..cb2a0abc5348 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -42,7 +41,7 @@ public async Task ProcessMessages(int numThreads, bool autoComplete) AutoComplete = autoComplete, MaxReceiveWaitTime = TimeSpan.FromSeconds(30) }; - var processor = client.CreateProcessor(scope.QueueName, options); + await using var processor = client.CreateProcessor(scope.QueueName, options); int messageCt = 0; TaskCompletionSource[] completionSources = Enumerable @@ -114,7 +113,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads) AutoComplete = true, MaxReceiveWaitTime = TimeSpan.FromSeconds(30) }; - var processor = client.CreateProcessor(scope.QueueName, options); + await using var processor = client.CreateProcessor(scope.QueueName, options); int messageCt = 0; TaskCompletionSource[] completionSources = Enumerable @@ -195,7 +194,7 @@ public async Task AutoLockRenewalWorks(int numThreads) MaxConcurrentCalls = numThreads, AutoComplete = false }; - var processor = client.CreateProcessor(scope.QueueName, options); + await using var processor = client.CreateProcessor(scope.QueueName, options); int messageCt = 0; TaskCompletionSource[] completionSources = Enumerable @@ -268,7 +267,7 @@ public async Task MaxAutoLockRenewalDurationRespected(int numThreads, int autoLo AutoComplete = false, MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(autoLockRenewalDuration) }; - var processor = client.CreateProcessor(scope.QueueName, options); + await using var processor = client.CreateProcessor(scope.QueueName, options); int messageCt = 0; TaskCompletionSource[] completionSources = Enumerable @@ -330,7 +329,7 @@ public async Task CanStopProcessingFromHandler(int numThreads) MaxConcurrentCalls = numThreads, ReceiveMode = ReceiveMode.ReceiveAndDelete }; - var processor = client.CreateProcessor(scope.QueueName, options); + await using var processor = client.CreateProcessor(scope.QueueName, options); int messageProcessedCt = 0; // stop processing halfway through @@ -420,11 +419,11 @@ Task ProcessErrors(ProcessErrorEventArgs args) } [Test] - public void StartStopMultipleTimes() + public async Task StartStopMultipleTimes() { var invalidQueueName = "nonexistentqueuename"; var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName); + await using ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName); TaskCompletionSource taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); processor.ProcessMessageAsync += eventArgs => Task.CompletedTask; processor.ProcessErrorAsync += eventArgs => Task.CompletedTask; @@ -455,7 +454,7 @@ public async Task CannotAddHandlerWhileProcessorIsRunning() { await using var client = GetClient(); - var processor = client.CreateProcessor(scope.QueueName); + await using var processor = client.CreateProcessor(scope.QueueName); Func eventHandler = eventArgs => Task.CompletedTask; Func errorHandler = eventArgs => Task.CompletedTask; @@ -488,7 +487,7 @@ public async Task StopProcessingDoesNotCancelAutoCompletion() await using var client = GetClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage()); - var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions + await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions { AutoComplete = true }); @@ -527,7 +526,7 @@ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(str await using var client = GetClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage()); - var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions + await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions { AutoComplete = true }); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs index 379053e52ca8..9c0a07dad646 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs @@ -24,7 +24,7 @@ public async Task CannotRemoveHandlersWhileProcessorIsRunning() { await using var client = GetClient(); - var processor = client.CreateSessionProcessor(scope.QueueName); + await using var processor = client.CreateSessionProcessor(scope.QueueName); Func eventHandler = eventArgs => Task.CompletedTask; Func errorHandler = eventArgs => Task.CompletedTask; @@ -65,7 +65,7 @@ public async Task CannotAddHandlersWhileProcessorIsRunning() { await using var client = GetClient(); - var processor = client.CreateSessionProcessor(scope.QueueName); + await using var processor = client.CreateSessionProcessor(scope.QueueName); Func eventHandler = eventArgs => Task.CompletedTask; Func errorHandler = eventArgs => Task.CompletedTask; @@ -120,7 +120,7 @@ public async Task ProcessSessionMessage(int numThreads, bool autoComplete) MaxConcurrentSessions = numThreads, AutoComplete = autoComplete }; - var processor = client.CreateSessionProcessor(scope.QueueName, options); + await using var processor = client.CreateSessionProcessor(scope.QueueName, options); int messageCt = 0; int sessionOpenEventCt = 0; int sessionCloseEventCt = 0; @@ -216,7 +216,7 @@ await sender.SendMessageAsync(new ServiceBusMessage() MaxAutoLockRenewalDuration = lockDuration, AutoComplete = false }; - var processor = client.CreateSessionProcessor(scope.QueueName, options); + await using var processor = client.CreateSessionProcessor(scope.QueueName, options); bool receivedDelayMsg = false; List receivedMessages = new List(); TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -276,7 +276,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads) MaxConcurrentSessions = numThreads, AutoComplete = true }; - var processor = client.CreateSessionProcessor(scope.QueueName, options); + await using var processor = client.CreateSessionProcessor(scope.QueueName, options); int messageCt = 0; TaskCompletionSource tcs = new TaskCompletionSource(); @@ -362,7 +362,7 @@ public async Task ProcessConsumesAllMessages(int numThreads, bool autoComplete) AutoComplete = autoComplete }; - ServiceBusSessionProcessor processor = GetNoRetryClient().CreateSessionProcessor(scope.QueueName, options); + await using ServiceBusSessionProcessor processor = GetNoRetryClient().CreateSessionProcessor(scope.QueueName, options); processor.ProcessMessageAsync += ProcessMessage; processor.ProcessErrorAsync += ExceptionHandler; @@ -423,7 +423,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulQu var exceptionReceivedHandlerCalled = false; var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions + await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions { MaxConcurrentSessions = 1 }); @@ -465,7 +465,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulTo { var exceptionReceivedHandlerCalled = false; var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - var processor = client.CreateSessionProcessor(scope.TopicName, scope.SubscriptionNames.First(), new ServiceBusSessionProcessorOptions + await using var processor = client.CreateSessionProcessor(scope.TopicName, scope.SubscriptionNames.First(), new ServiceBusSessionProcessorOptions { MaxConcurrentSessions = 1 }); @@ -535,7 +535,7 @@ public async Task ProcessSessionMessageUsingNamedSessionId(int numThreads, bool SessionIds = new string[] { sessionId } }; - var processor = client.CreateSessionProcessor( + await using var processor = client.CreateSessionProcessor( scope.QueueName, options); @@ -605,7 +605,7 @@ public async Task AutoLockRenewalWorks(int numThreads, int maxCallsPerSession) AutoComplete = false, MaxConcurrentCallsPerSession = maxCallsPerSession }; - var processor = client.CreateSessionProcessor(scope.QueueName, options); + await using var processor = client.CreateSessionProcessor(scope.QueueName, options); int messageCt = 0; TaskCompletionSource[] completionSources = Enumerable @@ -682,7 +682,7 @@ public async Task MaxAutoLockRenewalDurationRespected( AutoComplete = false, MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(autoLockRenewalDuration) }; - var processor = client.CreateSessionProcessor(scope.QueueName, options); + await using var processor = client.CreateSessionProcessor(scope.QueueName, options); int messageCt = 0; TaskCompletionSource[] completionSources = Enumerable @@ -759,7 +759,7 @@ public async Task StopProcessingDoesNotCancelAutoCompletion() await using var client = GetClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage("sessionId")); - var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions + await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions { AutoComplete = true }); @@ -799,7 +799,7 @@ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(str await using var client = GetClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage("sessionId")); - var processor = client.CreateSessionProcessor(scope.QueueName); + await using var processor = client.CreateSessionProcessor(scope.QueueName); var tcs = new TaskCompletionSource(); async Task ProcessMessage(ProcessSessionMessageEventArgs args) @@ -920,7 +920,7 @@ public async Task ProcessMessagesFromMultipleNamedSessions( SessionIds = sessionIds.ToArray() }; - var processor = client.CreateSessionProcessor( + await using var processor = client.CreateSessionProcessor( scope.QueueName, options); @@ -1018,7 +1018,7 @@ public async Task SessionLockLostWhenProcessSessionMessages(int numSessions, int SessionIds = sessionIds.ToArray() }; - var processor = client.CreateSessionProcessor( + await using var processor = client.CreateSessionProcessor( scope.QueueName, options); @@ -1161,7 +1161,7 @@ public async Task UserErrorHandlerInvokedOnceIfSessionLockLost() SessionIds = new string[] { sessionId } }; - var processor = client.CreateSessionProcessor( + await using var processor = client.CreateSessionProcessor( scope.QueueName, options); @@ -1268,7 +1268,7 @@ public async Task ErrorSourceRespected(ServiceBusErrorSource errorSource) SessionIds = new string[] { sessionId } }; - var processor = client.CreateSessionProcessor( + await using var processor = client.CreateSessionProcessor( scope.QueueName, options); @@ -1441,7 +1441,7 @@ public async Task SessionOpenEventDoesNotLoseLock() SessionIds = new string[] { sessionId } }; - var processor = client.CreateSessionProcessor( + await using var processor = client.CreateSessionProcessor( scope.QueueName, options); @@ -1513,7 +1513,7 @@ public async Task MaxCallsPerSessionRespected(int numSessions, int maxConcurrent MaxConcurrentCallsPerSession = maxCallsPerSession }; - var processor = client.CreateSessionProcessor( + await using var processor = client.CreateSessionProcessor( scope.QueueName, options); ConcurrentDictionary sessionDict = new ConcurrentDictionary(); @@ -1582,7 +1582,7 @@ public async Task StopProcessingDoesNotCloseLink() await using var client = GetClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage("sessionId")); - var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions + await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions { AutoComplete = false, MaxConcurrentSessions = 1