Skip to content

Commit

Permalink
Add ServiceBus Session Receiver Client (#16690)
Browse files Browse the repository at this point in the history
  • Loading branch information
YijunXieMS committed Oct 30, 2020
1 parent 07a953c commit 3b5d04c
Show file tree
Hide file tree
Showing 27 changed files with 1,019 additions and 497 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@
the main ServiceBusClientBuilder. -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="ServiceBusClientBuilder.java"/>

<!-- Suppress method name "acceptNextSession" and "acceptSession" for ServiceBusSessionReceiverAsyncClient and ServiceBusSessionReceiverClient-->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="ServiceBusSessionReceiverAsyncClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="ServiceBusSessionReceiverClient.java"/>

<!-- Some classes are named *Builder but are not @ServiceClientBuilder -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="com.azure.core.http.netty.NettyAsyncHttpClientBuilder"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder"/>
Expand Down
29 changes: 15 additions & 14 deletions sdk/servicebus/azure-messaging-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Both the asynchronous and synchronous Service Bus sender and receiver clients ar
`ServiceBusClientBuilder`. The snippets below create a synchronous Service Bus sender and an asynchronous receiver,
respectively.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L29-L33 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L30-L34 -->
```java
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand All @@ -66,7 +66,7 @@ ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.buildClient();
```

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L40-L45 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L41-L46 -->
```java
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand Down Expand Up @@ -102,7 +102,7 @@ refer to [the associated documentation][aad_authorization].

Use the returned token credential to authenticate the client:

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L52-L58 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L53-L59 -->
```java
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
Expand Down Expand Up @@ -152,7 +152,7 @@ a topic.
The snippet below creates a synchronous [`ServiceBusSenderClient`][ServiceBusSenderClient] to publish a message to a
queue.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L65-L77 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L66-L78 -->
```java
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand Down Expand Up @@ -187,7 +187,7 @@ queue or topic/subscriber.
The snippet below creates a [`ServiceBusReceiverClient`][ServiceBusReceiverClient] to receive messages from a topic
subscription.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L84-L101 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L85-L102 -->
```java
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand All @@ -214,7 +214,7 @@ receiver.close();
The asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] continuously fetches messages until
the `subscription` is disposed.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L108-L130 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L109-L131 -->
```java
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand Down Expand Up @@ -247,7 +247,7 @@ When a message is received, it can be settled using any of the `complete()`, `ab
overloads. The sample below completes a received message from synchronous
[`ServiceBusReceiverClient`][ServiceBusReceiverClient].

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L145-L151 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L146-L152 -->
```java
// This fetches a batch of 10 messages or until the default operation timeout has elapsed.
receiver.receiveMessages(10).forEach(context -> {
Expand Down Expand Up @@ -287,7 +287,7 @@ Create a [`ServiceBusSenderClient`][ServiceBusSenderClient] for a session enable
`ServiceBusMessage.setSessionId(String)` on a `ServiceBusMessage` will publish the message to that session. If the
session does not exist, it is created.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L164-L168 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L165-L169 -->
```java
// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
Expand All @@ -300,25 +300,26 @@ sender.sendMessage(message);

Receivers can fetch messages from a specific session or the first available, unlocked session.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L175-L181 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L176-L182 -->
```java
// Creates a session-enabled receiver that gets messages from the session "greetings".
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sessionReceiver()
.queueName("<< QUEUE NAME >>")
.sessionId("greetings")
.buildAsyncClient();
Mono<ServiceBusReceiverAsyncClient> receiverAsyncClient = sessionReceiver.acceptSession("greetings");
```

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L188-L193 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L189-L195 -->
```java
// Creates a session-enabled receiver that gets messages from the first available session.
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sessionReceiver()
.queueName("<< QUEUE NAME >>")
.buildAsyncClient();
Mono<ServiceBusReceiverAsyncClient> receiverAsyncClient = sessionReceiver.acceptNextSession();
```

### Create a dead-letter queue Receiver
Expand All @@ -328,7 +329,7 @@ The dead-letter queue doesn't need to be explicitly created and can't be deleted
of the main entity. For session enabled or non-session queue or topic subscriptions, the dead-letter receiver can be
created the same way as shown below. Learn more about dead-letter queue [here][dead-letter-queue].

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L200-L206 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L202-L208 -->
```java
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.messaging.servicebus;

import com.azure.core.util.CoreUtils;
import com.azure.messaging.servicebus.models.ReceiveMode;

import java.time.Duration;
Expand All @@ -16,28 +15,22 @@ class ReceiverOptions {
private final int prefetchCount;
private final boolean enableAutoComplete;
private final String sessionId;
private final boolean isRollingSessionReceiver;
private final Integer maxConcurrentSessions;
private final boolean isSessionReceiver;
private final Duration maxLockRenewDuration;

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete) {
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, false, null);
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null);
}

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete, String sessionId, boolean isRollingSessionReceiver,
Integer maxConcurrentSessions) {

boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
this.enableAutoComplete = enableAutoComplete;
this.sessionId = sessionId;
this.isRollingSessionReceiver = isRollingSessionReceiver;
this.maxConcurrentSessions = maxConcurrentSessions;
this.maxLockRenewDuration = maxLockRenewDuration;
this.isSessionReceiver = !CoreUtils.isNullOrEmpty(sessionId) || isRollingSessionReceiver;
}

/**
Expand Down Expand Up @@ -90,7 +83,7 @@ boolean isAutoLockRenewEnabled() {
* @return true if it is a session-aware receiver; false otherwise.
*/
boolean isSessionReceiver() {
return isSessionReceiver;
return sessionId != null || maxConcurrentSessions != null;
}

/**
Expand All @@ -100,7 +93,7 @@ boolean isSessionReceiver() {
* false} otherwise.
*/
public boolean isRollingSessionReceiver() {
return isRollingSessionReceiver;
return maxConcurrentSessions != null && maxConcurrentSessions > 0 && sessionId == null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,6 @@ public final class ServiceBusSessionReceiverClientBuilder {
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
private String queueName;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
private String sessionId;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
Expand Down Expand Up @@ -674,7 +673,7 @@ public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
* @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1.
*/
public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
if (maxConcurrentSessions < 1) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"maxConcurrentSessions cannot be less than 1."));
Expand Down Expand Up @@ -728,18 +727,6 @@ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMod
return this;
}

/**
* Sets the session id.
*
* @param sessionId session id.
*
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) {
this.sessionId = sessionId;
return this;
}

/**
* Sets the name of the subscription in the topic to listen to. <b>{@link #topicName(String)} must also be set.
* </b>
Expand Down Expand Up @@ -780,8 +767,8 @@ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) {
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverAsyncClient buildAsyncClient() {
return buildAsyncClient(true);
ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
return buildAsyncClientForProcessor(true);
}

/**
Expand All @@ -797,11 +784,11 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverClient buildClient() {
return new ServiceBusReceiverClient(buildAsyncClient(false), retryOptions.getTryTimeout());
ServiceBusReceiverClient buildClientForProcessor() {
return new ServiceBusReceiverClient(buildAsyncClientForProcessor(false), retryOptions.getTryTimeout());
}

private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAutoCompleteAllowed) {
final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
queueName);
final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
Expand All @@ -822,7 +809,7 @@ private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAll

final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceiver(),
maxAutoLockRenewDuration, enableAutoComplete, null,
maxConcurrentSessions);

final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
Expand All @@ -834,22 +821,66 @@ maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceive
}

/**
* This is a rolling session receiver only if maxConcurrentSessions is > 0 AND sessionId is null or empty. If
* there is a sessionId, this is going to be a single, named session receiver.
* Creates an <b>asynchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link
* ServiceBusMessage messages} from a specific queue or topic.
*
* @return An new {@link ServiceBusSessionReceiverAsyncClient} that receives messages from a queue or topic.
* @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
* topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusSessionReceiverAsyncClient buildAsyncClient() {
return buildAsyncClient(true);
}

/**
* Creates a <b>synchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link
* ServiceBusMessage messages} from a specific queue or topic.
*
* @return {@code true} if this is an unnamed rolling session receiver; {@code false} otherwise.
* @return An new {@link ServiceBusReceiverClient} that receives messages from a queue or topic.
* @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
* topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
private boolean isRollingSessionReceiver() {
if (maxConcurrentSessions == null) {
return false;
public ServiceBusSessionReceiverClient buildClient() {
return new ServiceBusSessionReceiverClient(buildAsyncClient(false),
retryOptions.getTryTimeout());
}

private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
queueName);
final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
SubQueue.NONE);

if (!isAutoCompleteAllowed && enableAutoComplete) {
logger.warning(
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
enableAutoComplete = false;
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
throw logger.logExceptionAsError(new IllegalStateException(
"'enableAutoComplete' is not valid for RECEIVE_AND_DELETE mode."));
}

if (maxConcurrentSessions < 1) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Maximum number of concurrent sessions must be positive."));
if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
maxAutoLockRenewDuration = Duration.ZERO;
}

return CoreUtils.isNullOrEmpty(sessionId);
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions);

return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer,
ServiceBusClientBuilder.this::onClientClose);
}
}

Expand Down
Loading

0 comments on commit 3b5d04c

Please sign in to comment.