Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API updates to support function level autocomplete #21181

Merged
merged 3 commits into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,20 @@ public partial class MessageProcessor
{
public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor) { }
protected internal Azure.Messaging.ServiceBus.ServiceBusProcessor Processor { get { throw null; } }
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protected internal virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
}
public partial class MessagingProvider
{
protected MessagingProvider() { }
public MessagingProvider(Microsoft.Extensions.Options.IOptions<Microsoft.Azure.WebJobs.ServiceBus.ServiceBusOptions> options) { }
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiver CreateBatchMessageReceiver(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string connectionString) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string fullyQualifiedNamespace, Azure.Core.TokenCredential credential) { throw null; }
public virtual Microsoft.Azure.WebJobs.ServiceBus.MessageProcessor CreateMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusSender CreateMessageSender(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Microsoft.Azure.WebJobs.ServiceBus.SessionMessageProcessor CreateSessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusSessionProcessor CreateSessionProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusReceiver CreateBatchMessageReceiver(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusReceiverOptions options) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string fullyQualifiedNamespace, Azure.Core.TokenCredential credential, Azure.Messaging.ServiceBus.ServiceBusClientOptions options) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string connectionString, Azure.Messaging.ServiceBus.ServiceBusClientOptions options) { throw null; }
protected internal virtual Microsoft.Azure.WebJobs.ServiceBus.MessageProcessor CreateMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusProcessorOptions options) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusSender CreateMessageSender(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusProcessorOptions options) { throw null; }
protected internal virtual Microsoft.Azure.WebJobs.ServiceBus.SessionMessageProcessor CreateSessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusSessionProcessorOptions options) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusSessionProcessor CreateSessionProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusSessionProcessorOptions options) { throw null; }
}
public enum ServiceBusEntityType
{
Expand All @@ -77,9 +76,9 @@ public ServiceBusOptions() { }
public System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ExceptionHandler { get { throw null; } set { } }
public Newtonsoft.Json.JsonSerializerSettings JsonSerializerSettings { get { throw null; } set { } }
public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } set { } }
public int MaxBatchSize { get { throw null; } set { } }
public int MaxConcurrentCalls { get { throw null; } set { } }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int MaxMessages { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public System.TimeSpan? SessionIdleTimeout { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } set { } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using System;
using Microsoft.Extensions.Options;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(extreme) nit: We should consider sorting the usings.

using Constants = Microsoft.Azure.WebJobs.ServiceBus.Constants;

namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config
Expand All @@ -18,25 +19,28 @@ internal class ServiceBusClientFactory
private readonly IConfiguration _configuration;
private readonly AzureComponentFactory _componentFactory;
private readonly MessagingProvider _messagingProvider;
private readonly ServiceBusOptions _options;

public ServiceBusClientFactory(
IConfiguration configuration,
AzureComponentFactory componentFactory,
MessagingProvider messagingProvider,
AzureEventSourceLogForwarder logForwarder)
AzureEventSourceLogForwarder logForwarder,
IOptions<ServiceBusOptions> options)
{
_configuration = configuration;
_componentFactory = componentFactory;
_messagingProvider = messagingProvider;
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
logForwarder.Start();
}

internal ServiceBusClient CreateClientFromSetting(string connection)
{
var connectionInfo = ResolveConnectionInformation(connection);

return connectionInfo.ConnectionString != null ? _messagingProvider.CreateClient(connectionInfo.ConnectionString)
: _messagingProvider.CreateClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential);
return connectionInfo.ConnectionString != null ? _messagingProvider.CreateClient(connectionInfo.ConnectionString, _options.ToClientOptions())
: _messagingProvider.CreateClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential, _options.ToClientOptions());
}

internal ServiceBusAdministrationClient CreateAdministrationClient(string connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public int MaxConcurrentSessions
/// Gets or sets the maximum number of messages that will be passed to each function call. This only applies for functions that receive
/// a batch of messages. The default value is 1000.
/// </summary>
public int MaxMessages { get; set; } = 1000;
public int MaxBatchSize { get; set; } = 1000;

/// <summary>
/// Gets or sets the maximum amount of time to wait for a message to be received for the
Expand Down Expand Up @@ -179,7 +179,7 @@ string IOptionsFormatter.Format()
{ nameof(MaxAutoLockRenewalDuration), MaxAutoLockRenewalDuration },
{ nameof(MaxConcurrentCalls), MaxConcurrentCalls },
{ nameof(MaxConcurrentSessions), MaxConcurrentSessions },
{ nameof(MaxMessages), MaxMessages },
{ nameof(MaxBatchSize), MaxBatchSize },
{ nameof(SessionIdleTimeout), SessionIdleTimeout.ToString() ?? string.Empty }
};

Expand Down Expand Up @@ -212,6 +212,12 @@ internal ServiceBusSessionProcessorOptions ToSessionProcessorOptions() =>
SessionIdleTimeout = SessionIdleTimeout
};

internal ServiceBusReceiverOptions ToReceiverOptions() =>
new ServiceBusReceiverOptions
{
PrefetchCount = PrefetchCount
};

internal ServiceBusClientOptions ToClientOptions() =>
new ServiceBusClientOptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider
private readonly MessagingProvider _messagingProvider;
private readonly ITriggeredFunctionExecutor _triggerExecutor;
private readonly string _functionId;
private readonly ServiceBusEntityType _serviceBusEntityType;
private readonly ServiceBusEntityType _entityType;
private readonly string _entityPath;
private readonly bool _isSessionsEnabled;
private readonly CancellationTokenSource _cancellationTokenSource;
Expand All @@ -49,7 +49,7 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider

public ServiceBusListener(
string functionId,
ServiceBusEntityType serviceBusEntityType,
ServiceBusEntityType entityType,
string entityPath,
bool isSessionsEnabled,
ITriggeredFunctionExecutor triggerExecutor,
Expand All @@ -61,7 +61,7 @@ public ServiceBusListener(
ServiceBusClientFactory clientFactory)
{
_functionId = functionId;
_serviceBusEntityType = serviceBusEntityType;
_entityType = entityType;
_entityPath = entityPath;
_isSessionsEnabled = isSessionsEnabled;
_triggerExecutor = triggerExecutor;
Expand All @@ -70,12 +70,38 @@ public ServiceBusListener(
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<ServiceBusListener>();

_client = new Lazy<ServiceBusClient>(() => clientFactory.CreateClientFromSetting(connection));
_batchReceiver = new Lazy<ServiceBusReceiver>(() => _messagingProvider.CreateBatchMessageReceiver(_client.Value, _entityPath));
_messageProcessor = new Lazy<MessageProcessor>(() => _messagingProvider.CreateMessageProcessor(_client.Value, _entityPath));
_sessionMessageProcessor = new Lazy<SessionMessageProcessor>(() => _messagingProvider.CreateSessionMessageProcessor(_client.Value, _entityPath));
_client = new Lazy<ServiceBusClient>(
() =>
clientFactory.CreateClientFromSetting(connection));

_batchReceiver = new Lazy<ServiceBusReceiver>(
() => _messagingProvider.CreateBatchMessageReceiver(
_client.Value,
_entityPath,
options.ToReceiverOptions()));

_messageProcessor = new Lazy<MessageProcessor>(
() => _messagingProvider.CreateMessageProcessor(
_client.Value,
_entityPath,
options.ToProcessorOptions()));
Copy link
Member Author

@JoshLove-msft JoshLove-msft May 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be updated to take the AutoComplete property from ServiceBusTriggerAttribute into account, once the feature is added.


_sessionMessageProcessor = new Lazy<SessionMessageProcessor>(
() => _messagingProvider.CreateSessionMessageProcessor(
_client.Value,
_entityPath,
options.ToSessionProcessorOptions()));
Copy link
Member Author

@JoshLove-msft JoshLove-msft May 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be updated to take the AutoComplete property from ServiceBusTriggerAttribute into account, once the feature is added.


_scaleMonitor = new Lazy<ServiceBusScaleMonitor>(
() => new ServiceBusScaleMonitor(
_functionId,
_entityType,
_entityPath,
connection,
_batchReceiver,
_loggerFactory,
clientFactory));

_scaleMonitor = new Lazy<ServiceBusScaleMonitor>(() => new ServiceBusScaleMonitor(_functionId, _serviceBusEntityType, _entityPath, connection, _batchReceiver, _loggerFactory, clientFactory));
_singleDispatch = singleDispatch;
_serviceBusOptions = options;
}
Expand Down Expand Up @@ -266,7 +292,7 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken)
}

IReadOnlyList<ServiceBusReceivedMessage> messages =
await receiver.ReceiveMessagesAsync(_serviceBusOptions.MaxMessages).ConfigureAwait(false);
await receiver.ReceiveMessagesAsync(_serviceBusOptions.MaxBatchSize).ConfigureAwait(false);

if (messages != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public MessageProcessor(ServiceBusProcessor processor)
/// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to process.</param>
/// <param name="cancellationToken">A cancellation token that will be cancelled when the processor is shutting down.</param>
/// <returns>A <see cref="Task"/> that returns true if the message processing should continue, false otherwise.</returns>
public virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, CancellationToken cancellationToken)
protected internal virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, CancellationToken cancellationToken)
{
return Task.FromResult<bool>(true);
}
Expand All @@ -54,7 +54,7 @@ public virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMessageActions m
/// <param name="result">The <see cref="FunctionResult"/> from the job invocation.</param>
/// <param name="cancellationToken">A cancellation token that will be cancelled when the processor is shutting down.</param>
/// <returns>A <see cref="Task"/> that will complete the message processing.</returns>
public virtual Task CompleteProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, FunctionResult result, CancellationToken cancellationToken)
protected internal virtual Task CompleteProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, FunctionResult result, CancellationToken cancellationToken)
{
if (message is null)
{
Expand Down
Loading