Skip to content

Commit

Permalink
Dispose processor in tests (#14784)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLove-msft committed Sep 2, 2020
1 parent a994c45 commit cb05f2e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -42,7 +41,7 @@ public async Task ProcessMessages(int numThreads, bool autoComplete)
AutoComplete = autoComplete,
MaxReceiveWaitTime = TimeSpan.FromSeconds(30)
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -114,7 +113,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads)
AutoComplete = true,
MaxReceiveWaitTime = TimeSpan.FromSeconds(30)
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -195,7 +194,7 @@ public async Task AutoLockRenewalWorks(int numThreads)
MaxConcurrentCalls = numThreads,
AutoComplete = false
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -268,7 +267,7 @@ public async Task MaxAutoLockRenewalDurationRespected(int numThreads, int autoLo
AutoComplete = false,
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(autoLockRenewalDuration)
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -330,7 +329,7 @@ public async Task CanStopProcessingFromHandler(int numThreads)
MaxConcurrentCalls = numThreads,
ReceiveMode = ReceiveMode.ReceiveAndDelete
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageProcessedCt = 0;

// stop processing halfway through
Expand Down Expand Up @@ -420,11 +419,11 @@ Task ProcessErrors(ProcessErrorEventArgs args)
}

[Test]
public void StartStopMultipleTimes()
public async Task StartStopMultipleTimes()
{
var invalidQueueName = "nonexistentqueuename";
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName);
await using ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName);
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
processor.ProcessMessageAsync += eventArgs => Task.CompletedTask;
processor.ProcessErrorAsync += eventArgs => Task.CompletedTask;
Expand Down Expand Up @@ -455,7 +454,7 @@ public async Task CannotAddHandlerWhileProcessorIsRunning()
{
await using var client = GetClient();

var processor = client.CreateProcessor(scope.QueueName);
await using var processor = client.CreateProcessor(scope.QueueName);

Func<ProcessMessageEventArgs, Task> eventHandler = eventArgs => Task.CompletedTask;
Func<ProcessErrorEventArgs, Task> errorHandler = eventArgs => Task.CompletedTask;
Expand Down Expand Up @@ -488,7 +487,7 @@ public async Task StopProcessingDoesNotCancelAutoCompletion()
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
AutoComplete = true
});
Expand Down Expand Up @@ -527,7 +526,7 @@ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(str
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
AutoComplete = true
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task CannotRemoveHandlersWhileProcessorIsRunning()
{
await using var client = GetClient();

var processor = client.CreateSessionProcessor(scope.QueueName);
await using var processor = client.CreateSessionProcessor(scope.QueueName);

Func<ProcessSessionMessageEventArgs, Task> eventHandler = eventArgs => Task.CompletedTask;
Func<ProcessErrorEventArgs, Task> errorHandler = eventArgs => Task.CompletedTask;
Expand Down Expand Up @@ -65,7 +65,7 @@ public async Task CannotAddHandlersWhileProcessorIsRunning()
{
await using var client = GetClient();

var processor = client.CreateSessionProcessor(scope.QueueName);
await using var processor = client.CreateSessionProcessor(scope.QueueName);

Func<ProcessSessionMessageEventArgs, Task> eventHandler = eventArgs => Task.CompletedTask;
Func<ProcessErrorEventArgs, Task> errorHandler = eventArgs => Task.CompletedTask;
Expand Down Expand Up @@ -120,7 +120,7 @@ public async Task ProcessSessionMessage(int numThreads, bool autoComplete)
MaxConcurrentSessions = numThreads,
AutoComplete = autoComplete
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;
int sessionOpenEventCt = 0;
int sessionCloseEventCt = 0;
Expand Down Expand Up @@ -216,7 +216,7 @@ await sender.SendMessageAsync(new ServiceBusMessage()
MaxAutoLockRenewalDuration = lockDuration,
AutoComplete = false
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
bool receivedDelayMsg = false;
List<string> receivedMessages = new List<string>();
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -276,7 +276,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads)
MaxConcurrentSessions = numThreads,
AutoComplete = true
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
Expand Down Expand Up @@ -362,7 +362,7 @@ public async Task ProcessConsumesAllMessages(int numThreads, bool autoComplete)
AutoComplete = autoComplete
};

ServiceBusSessionProcessor processor = GetNoRetryClient().CreateSessionProcessor(scope.QueueName, options);
await using ServiceBusSessionProcessor processor = GetNoRetryClient().CreateSessionProcessor(scope.QueueName, options);

processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;
Expand Down Expand Up @@ -423,7 +423,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulQu
var exceptionReceivedHandlerCalled = false;
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);

var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 1
});
Expand Down Expand Up @@ -465,7 +465,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulTo
{
var exceptionReceivedHandlerCalled = false;
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var processor = client.CreateSessionProcessor(scope.TopicName, scope.SubscriptionNames.First(), new ServiceBusSessionProcessorOptions
await using var processor = client.CreateSessionProcessor(scope.TopicName, scope.SubscriptionNames.First(), new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 1
});
Expand Down Expand Up @@ -535,7 +535,7 @@ public async Task ProcessSessionMessageUsingNamedSessionId(int numThreads, bool
SessionIds = new string[] { sessionId }
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -605,7 +605,7 @@ public async Task AutoLockRenewalWorks(int numThreads, int maxCallsPerSession)
AutoComplete = false,
MaxConcurrentCallsPerSession = maxCallsPerSession
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -682,7 +682,7 @@ public async Task MaxAutoLockRenewalDurationRespected(
AutoComplete = false,
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(autoLockRenewalDuration)
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -759,7 +759,7 @@ public async Task StopProcessingDoesNotCancelAutoCompletion()
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage("sessionId"));
var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
AutoComplete = true
});
Expand Down Expand Up @@ -799,7 +799,7 @@ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(str
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage("sessionId"));
var processor = client.CreateSessionProcessor(scope.QueueName);
await using var processor = client.CreateSessionProcessor(scope.QueueName);
var tcs = new TaskCompletionSource<bool>();

async Task ProcessMessage(ProcessSessionMessageEventArgs args)
Expand Down Expand Up @@ -920,7 +920,7 @@ public async Task ProcessMessagesFromMultipleNamedSessions(
SessionIds = sessionIds.ToArray()
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1018,7 +1018,7 @@ public async Task SessionLockLostWhenProcessSessionMessages(int numSessions, int
SessionIds = sessionIds.ToArray()
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1161,7 +1161,7 @@ public async Task UserErrorHandlerInvokedOnceIfSessionLockLost()
SessionIds = new string[] { sessionId }
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1268,7 +1268,7 @@ public async Task ErrorSourceRespected(ServiceBusErrorSource errorSource)
SessionIds = new string[] { sessionId }
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1441,7 +1441,7 @@ public async Task SessionOpenEventDoesNotLoseLock()
SessionIds = new string[] { sessionId }
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1513,7 +1513,7 @@ public async Task MaxCallsPerSessionRespected(int numSessions, int maxConcurrent
MaxConcurrentCallsPerSession = maxCallsPerSession
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);
ConcurrentDictionary<string, int> sessionDict = new ConcurrentDictionary<string, int>();
Expand Down Expand Up @@ -1582,7 +1582,7 @@ public async Task StopProcessingDoesNotCloseLink()
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage("sessionId"));
var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
AutoComplete = false,
MaxConcurrentSessions = 1
Expand Down

0 comments on commit cb05f2e

Please sign in to comment.