Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ServiceBus Session Receiver Client #16690

Merged
merged 25 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
aee1663
Add ServiceBus Session Receiver Client
YijunXieMS Oct 22, 2020
094f9ed
Merge branch 'master' into sb_session_receiver
YijunXieMS Oct 22, 2020
b4aeb73
More small fixes after merging from master
YijunXieMS Oct 22, 2020
b7d7ab1
More small fixes after merging from master
YijunXieMS Oct 22, 2020
d9af703
create an unnamed session manager for getReceiverClient
YijunXieMS Oct 23, 2020
f59e84f
Update code review comment
YijunXieMS Oct 27, 2020
8b063c3
Add test case for ServiceBusSessionReceiverAsyncClient and -Client
YijunXieMS Oct 28, 2020
a9a7a0e
Merge branch 'master' into sb_session_receiver
YijunXieMS Oct 28, 2020
2c2c7ae
Fix merge problems and tests
YijunXieMS Oct 28, 2020
464d533
Use ServiceBusClientBuilder's retryPolicy timeout
YijunXieMS Oct 28, 2020
27580ca
Remove/hide receiver methods that have param sessionId
YijunXieMS Oct 28, 2020
30266e1
Small fixes
YijunXieMS Oct 29, 2020
4eb8093
Remove INVALID_LOCK_TOKEN_STRING
YijunXieMS Oct 29, 2020
60d0b52
Fix unnamesession test
YijunXieMS Oct 29, 2020
600f60f
cannot acquire lock wording adjustment
YijunXieMS Oct 29, 2020
22308dd
Add maxLockRenewal to fix test
YijunXieMS Oct 29, 2020
925d7b8
Add @ServiceClient annotation
YijunXieMS Oct 29, 2020
b08e95d
Adjust readme sample code lines
YijunXieMS Oct 29, 2020
c86c5fc
Supress @ServiceClient check for methods acceptNextSession and accept…
YijunXieMS Oct 29, 2020
ef3d9d5
Add back maxConcurrentSessions to ServiceBusSessionReceiverBuilder at…
YijunXieMS Oct 29, 2020
c13f280
Remove </p> from javadoc
YijunXieMS Oct 29, 2020
e717790
Restore some session related code but hide with package level
YijunXieMS Oct 29, 2020
110fcb7
Update sync ServiceBusReceiverClientTest
YijunXieMS Oct 30, 2020
d514793
Try fix javadoc ci problem
YijunXieMS Oct 30, 2020
36c8c95
Try fix javadoc ci problem
YijunXieMS Oct 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@
the main ServiceBusClientBuilder. -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="ServiceBusClientBuilder.java"/>

<!-- Suppress method name "acceptNextSession" and "acceptSession" for ServiceBusSessionReceiverAsyncClient and ServiceBusSessionReceiverClient-->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="ServiceBusSessionReceiverAsyncClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="ServiceBusSessionReceiverClient.java"/>

<!-- Some classes are named *Builder but are not @ServiceClientBuilder -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="com.azure.core.http.netty.NettyAsyncHttpClientBuilder"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder"/>
Expand Down
29 changes: 15 additions & 14 deletions sdk/servicebus/azure-messaging-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Both the asynchronous and synchronous Service Bus sender and receiver clients ar
`ServiceBusClientBuilder`. The snippets below create a synchronous Service Bus sender and an asynchronous receiver,
respectively.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L29-L33 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L30-L34 -->
```java
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand All @@ -66,7 +66,7 @@ ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.buildClient();
```

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L40-L45 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L41-L46 -->
```java
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand Down Expand Up @@ -102,7 +102,7 @@ refer to [the associated documentation][aad_authorization].

Use the returned token credential to authenticate the client:

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L52-L58 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L53-L59 -->
```java
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
Expand Down Expand Up @@ -152,7 +152,7 @@ a topic.
The snippet below creates a synchronous [`ServiceBusSenderClient`][ServiceBusSenderClient] to publish a message to a
queue.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L65-L77 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L66-L78 -->
```java
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand Down Expand Up @@ -187,7 +187,7 @@ queue or topic/subscriber.
The snippet below creates a [`ServiceBusReceiverClient`][ServiceBusReceiverClient] to receive messages from a topic
subscription.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L84-L101 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L85-L102 -->
```java
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand All @@ -214,7 +214,7 @@ receiver.close();
The asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] continuously fetches messages until
the `subscription` is disposed.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L108-L130 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L109-L131 -->
```java
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand Down Expand Up @@ -247,7 +247,7 @@ When a message is received, it can be settled using any of the `complete()`, `ab
overloads. The sample below completes a received message from synchronous
[`ServiceBusReceiverClient`][ServiceBusReceiverClient].

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L145-L151 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L146-L152 -->
```java
// This fetches a batch of 10 messages or until the default operation timeout has elapsed.
receiver.receiveMessages(10).forEach(context -> {
Expand Down Expand Up @@ -287,7 +287,7 @@ Create a [`ServiceBusSenderClient`][ServiceBusSenderClient] for a session enable
`ServiceBusMessage.setSessionId(String)` on a `ServiceBusMessage` will publish the message to that session. If the
session does not exist, it is created.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L164-L168 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L165-L169 -->
```java
// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
Expand All @@ -300,25 +300,26 @@ sender.sendMessage(message);

Receivers can fetch messages from a specific session or the first available, unlocked session.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L175-L181 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L176-L182 -->
```java
// Creates a session-enabled receiver that gets messages from the session "greetings".
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sessionReceiver()
.queueName("<< QUEUE NAME >>")
.sessionId("greetings")
.buildAsyncClient();
Mono<ServiceBusReceiverAsyncClient> receiverAsyncClient = sessionReceiver.acceptSession("greetings");
```

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L188-L193 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L189-L195 -->
```java
// Creates a session-enabled receiver that gets messages from the first available session.
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sessionReceiver()
.queueName("<< QUEUE NAME >>")
.buildAsyncClient();
Mono<ServiceBusReceiverAsyncClient> receiverAsyncClient = sessionReceiver.acceptNextSession();
```

### Create a dead-letter queue Receiver
Expand All @@ -328,7 +329,7 @@ The dead-letter queue doesn't need to be explicitly created and can't be deleted
of the main entity. For session enabled or non-session queue or topic subscriptions, the dead-letter receiver can be
created the same way as shown below. Learn more about dead-letter queue [here][dead-letter-queue].

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L200-L206 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L202-L208 -->
```java
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.messaging.servicebus;

import com.azure.core.util.CoreUtils;
import com.azure.messaging.servicebus.models.ReceiveMode;

import java.time.Duration;
Expand All @@ -16,28 +15,22 @@ class ReceiverOptions {
private final int prefetchCount;
private final boolean enableAutoComplete;
private final String sessionId;
private final boolean isRollingSessionReceiver;
private final Integer maxConcurrentSessions;
private final boolean isSessionReceiver;
private final Duration maxLockRenewDuration;

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete) {
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, false, null);
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null);
}

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete, String sessionId, boolean isRollingSessionReceiver,
Integer maxConcurrentSessions) {

boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
this.enableAutoComplete = enableAutoComplete;
this.sessionId = sessionId;
this.isRollingSessionReceiver = isRollingSessionReceiver;
this.maxConcurrentSessions = maxConcurrentSessions;
this.maxLockRenewDuration = maxLockRenewDuration;
this.isSessionReceiver = !CoreUtils.isNullOrEmpty(sessionId) || isRollingSessionReceiver;
}

/**
Expand Down Expand Up @@ -90,7 +83,7 @@ boolean isAutoLockRenewEnabled() {
* @return true if it is a session-aware receiver; false otherwise.
*/
boolean isSessionReceiver() {
return isSessionReceiver;
return sessionId != null || maxConcurrentSessions != null;
}

/**
Expand All @@ -100,7 +93,7 @@ boolean isSessionReceiver() {
* false} otherwise.
*/
public boolean isRollingSessionReceiver() {
return isRollingSessionReceiver;
return maxConcurrentSessions != null && maxConcurrentSessions > 0 && sessionId == null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,6 @@ public final class ServiceBusSessionReceiverClientBuilder {
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
private String queueName;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
private String sessionId;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
Expand Down Expand Up @@ -674,7 +673,7 @@ public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
* @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1.
*/
public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
if (maxConcurrentSessions < 1) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"maxConcurrentSessions cannot be less than 1."));
Expand Down Expand Up @@ -728,18 +727,6 @@ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMod
return this;
}

/**
* Sets the session id.
*
* @param sessionId session id.
*
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) {
this.sessionId = sessionId;
return this;
}

/**
* Sets the name of the subscription in the topic to listen to. <b>{@link #topicName(String)} must also be set.
* </b>
Expand Down Expand Up @@ -780,8 +767,8 @@ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) {
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverAsyncClient buildAsyncClient() {
return buildAsyncClient(true);
ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
return buildAsyncClientForProcessor(true);
}

/**
Expand All @@ -797,11 +784,11 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverClient buildClient() {
return new ServiceBusReceiverClient(buildAsyncClient(false), retryOptions.getTryTimeout());
ServiceBusReceiverClient buildClientForProcessor() {
return new ServiceBusReceiverClient(buildAsyncClientForProcessor(false), retryOptions.getTryTimeout());
}
Comment on lines +787 to 789
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Processor will never build a sync client. So, this method can be removed.


private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAutoCompleteAllowed) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the async client built for processor will always have autoComplete enabled, the boolean param is redundant and can be removed.

final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
queueName);
final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
Expand All @@ -822,7 +809,7 @@ private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAll

final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceiver(),
maxAutoLockRenewDuration, enableAutoComplete, null,
maxConcurrentSessions);

final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
Expand All @@ -834,22 +821,66 @@ maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceive
}

/**
* This is a rolling session receiver only if maxConcurrentSessions is > 0 AND sessionId is null or empty. If
* there is a sessionId, this is going to be a single, named session receiver.
* Creates an <b>asynchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link
* ServiceBusMessage messages} from a specific queue or topic.
*
* @return An new {@link ServiceBusSessionReceiverAsyncClient} that receives messages from a queue or topic.
* @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
* topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusSessionReceiverAsyncClient buildAsyncClient() {
return buildAsyncClient(true);
}

/**
* Creates a <b>synchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link
* ServiceBusMessage messages} from a specific queue or topic.
*
* @return {@code true} if this is an unnamed rolling session receiver; {@code false} otherwise.
* @return An new {@link ServiceBusReceiverClient} that receives messages from a queue or topic.
* @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
* topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
private boolean isRollingSessionReceiver() {
if (maxConcurrentSessions == null) {
return false;
public ServiceBusSessionReceiverClient buildClient() {
return new ServiceBusSessionReceiverClient(buildAsyncClient(false),
retryOptions.getTryTimeout());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout is for a single attempt, right? If the user has set retry policy to 3 attempts, will this not wait for the 2nd attempt after the first attempt fails due to timeout?

}

private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
queueName);
final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
SubQueue.NONE);

if (!isAutoCompleteAllowed && enableAutoComplete) {
logger.warning(
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
enableAutoComplete = false;
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
throw logger.logExceptionAsError(new IllegalStateException(
"'enableAutoComplete' is not valid for RECEIVE_AND_DELETE mode."));
}

if (maxConcurrentSessions < 1) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Maximum number of concurrent sessions must be positive."));
if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
maxAutoLockRenewDuration = Duration.ZERO;
}

return CoreUtils.isNullOrEmpty(sessionId);
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions);

return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer,
ServiceBusClientBuilder.this::onClientClose);
}
}

Expand Down
Loading