Skip to content

Commit

Permalink
Preparing for azure-core-amqp 2.2.0 release. (#22211)
Browse files Browse the repository at this point in the history
* Fix Management Bug (#22122)

* Adding missing return statement.

* Using common logic for status codes.

* Adding isSuccessful.

* Closing reactor executor when IO pipe is closed. (#22192)

* When IO sink is disposed of, close the corresponding executor.

* Update ReactorExecutor to use AsyncCloseable.

* Removing unused method.

* Add changelog entry.

* Fix Azure Core Amqp Sample issue #18806 by lihong 202105271344 (#21885)

* Running prepare-release script.

Co-authored-by: Hong Li(MSFT) <74638143+hongli750210@users.noreply.github.com>
  • Loading branch information
conniey and v-hongli1 committed Jun 10, 2021
1 parent e80d82e commit 71e46f1
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 64 deletions.
2 changes: 1 addition & 1 deletion eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ com.azure:azure-communication-identity;1.1.0;1.2.0-beta.1
com.azure:azure-communication-phonenumbers;1.0.1;1.1.0-beta.1
com.azure:azure-containers-containerregistry;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-core;1.16.0;1.17.0
com.azure:azure-core-amqp;2.0.6;2.1.0-beta.1
com.azure:azure-core-amqp;2.0.6;2.2.0
com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-experimental;1.0.0-beta.13;1.0.0-beta.14
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp-experimental/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>

<dependency>
Expand Down
19 changes: 4 additions & 15 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
# Release History

## 2.1.0-beta.1 (Unreleased)

## 2.2.0 (2021-06-10)
### New Features
- Exposing CbsAuthorizationType.
- Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker.
- AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable.
- Delivery outcomes and delivery states are added.
- Adding support for AMQP data types SEQUENCE and VALUE.

### Bug Fixes
- Fixed a bug where connection and sessions would not be disposed when their endpoint closed.
### Dependency Updates
- Upgraded `azure-core` dependency to `1.15.0`.

## 2.0.6 (2021-05-24)
### Bug Fixes
Expand All @@ -22,13 +18,6 @@
- Upgraded `azure-core` from `1.15.0` to `1.16.0`.
- Upgraded Reactor from `3.4.3` to `3.4.5`.

## 2.2.0-beta.1 (2021-04-14)
### New Features
- Adding support for AMQP data types SEQUENCE and VALUE.

### Dependency Updates
- Upgraded `azure-core` dependency to `1.15.0`.

## 2.0.4 (2021-04-12)

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<packaging>jar</packaging>

<name>Microsoft Azure Java Core AMQP Library</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,13 @@ private Mono<Void> isAuthorized() {
.next()
.switchIfEmpty(Mono.error(new AmqpException(false, "Did not get response from tokenManager: " + entityPath, getErrorContext())))
.handle((response, sink) -> {
if (response != AmqpResponseCode.ACCEPTED && response != AmqpResponseCode.OK) {
if (RequestResponseUtils.isSuccessful(response)) {
sink.complete();
} else {
final String message = String.format("User does not have authorization to perform operation "
+ "on entity [%s]. Response: [%s]", entityPath, response);
sink.error(ExceptionUtil.amqpResponseCodeToException(response.getValue(), message,
getErrorContext()));

} else {
sink.complete();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,17 +464,12 @@ private synchronized void closeConnectionWork() {
}

connection.close();
handler.close();

final ArrayList<Mono<Void>> closingSessions = new ArrayList<>();
sessionMap.values().forEach(link -> closingSessions.add(link.isClosed()));

final Mono<Void> closedExecutor;
if (executor != null) {
closedExecutor = executor.isClosed();
executor.close();
} else {
closedExecutor = Mono.empty();
}
final Mono<Void> closedExecutor = executor != null ? executor.closeAsync() : Mono.empty();

// Close all the children.
final Mono<Void> closeSessionsMono = Mono.when(closingSessions)
Expand All @@ -491,7 +486,6 @@ private synchronized void closeConnectionWork() {
return false;
});

handler.close();
subscriptions.dispose();
}));

Expand Down Expand Up @@ -521,10 +515,6 @@ private synchronized Connection getOrCreateConnection() throws IOException {

final ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler();

reactorProvider.getReactorDispatcher().getShutdownSignal()
.subscribe(signal -> reactorExceptionHandler.onConnectionShutdown(signal),
error -> reactorExceptionHandler.onConnectionError(error));

// Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe.
// Using Schedulers.single() will use the same thread for all connections in this process which
// limits the scalability of the no. of concurrent connections a single process can have.
Expand All @@ -539,6 +529,22 @@ private synchronized Connection getOrCreateConnection() throws IOException {
reactorExceptionHandler, pendingTasksDuration,
connectionOptions.getFullyQualifiedNamespace());

// To avoid inconsistent synchronization of executor, we set this field with the closeAsync method.
// It will not be kicked off until subscribed to.
final Mono<Void> executorCloseMono = executor.closeAsync();
reactorProvider.getReactorDispatcher().getShutdownSignal()
.flatMap(signal -> {
logger.info("Shutdown signal received from reactor provider.");
reactorExceptionHandler.onConnectionShutdown(signal);
return executorCloseMono;
})
.onErrorResume(error -> {
logger.info("Error received from reactor provider.", error);
reactorExceptionHandler.onConnectionError(error);
return executorCloseMono;
})
.subscribe();

executor.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.HandlerException;
Expand All @@ -14,7 +15,6 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

import java.io.Closeable;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.Locale;
Expand All @@ -23,7 +23,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ReactorExecutor implements Closeable {
/**
* Schedules the proton-j reactor to continuously run work.
*/
class ReactorExecutor implements AsyncCloseable {
private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";

private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
Expand Down Expand Up @@ -51,7 +54,7 @@ class ReactorExecutor implements Closeable {

/**
* Starts the reactor and will begin processing any reactor events until there are no longer any left or {@link
* #close()} is called.
* #closeAsync()} is called.
*/
void start() {
if (hasStarted.getAndSet(true)) {
Expand Down Expand Up @@ -142,10 +145,6 @@ private void run() {
}
}

Mono<Void> isClosed() {
return isClosedMono.asMono();
}

/**
* Schedules the release of the current reactor after operation timeout has elapsed.
*/
Expand Down Expand Up @@ -175,26 +174,27 @@ private void scheduleCompletePendingTasks() {
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public void close() {
if (isDisposed.getAndSet(true)) {
return;
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}
}

private void close(String reason) {
logger.verbose("Completing close and disposing scheduler. {}", reason);

scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason));
scheduler.dispose();
}

@Override
public Mono<Void> closeAsync() {
if (isDisposed.getAndSet(true)) {
return isClosedMono.asMono();
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}

return isClosedMono.asMono();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class RequestResponseUtils {

public static boolean isSuccessful(Message message) {
final AmqpResponseCode statusCode = getStatusCode(message);
return isSuccessful(statusCode);
}

public static boolean isSuccessful(AmqpResponseCode statusCode) {
return statusCode == AmqpResponseCode.OK || statusCode == AmqpResponseCode.ACCEPTED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@

package com.azure.core.amqp.models;

import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;

/**
* Class contains sample code snippets that will be used in javadocs.
*/
public class AmqpAnnotatedMessageJavaDocCodeSamples {
/**
* Get message body from {@link AmqpAnnotatedMessage}.
*/
@Test
public void checkBodyType() {
AmqpAnnotatedMessage amqpAnnotatedMessage = null;
AmqpAnnotatedMessage amqpAnnotatedMessage =
new AmqpAnnotatedMessage(AmqpMessageBody.fromData("my-amqp-message".getBytes(StandardCharsets.UTF_8)));
// BEGIN: com.azure.core.amqp.models.AmqpBodyType.checkBodyType
// If client do not check `AmqpMessageBody.getBodyType()` and payload is not of type `AmqpMessageBodyType.DATA`,
// calling `getFirstData()` or `getData()` on `AmqpMessageBody` will throw Runtime exception.
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ private <T> Mono<T> getProperties(Map<String, Object> properties, Class<T> respo
.handle((message, sink) -> {
if (RequestResponseUtils.isSuccessful(message)) {
sink.next(messageSerializer.deserialize(message, responseType));
}

final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
final String statusDescription = RequestResponseUtils.getStatusDescription(message);
final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(),
statusDescription, channel.getErrorContext());
} else {
final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
final String statusDescription = RequestResponseUtils.getStatusDescription(message);
final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(),
statusDescription, channel.getErrorContext());

sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error)));
sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error)));
}
}));
});
}
Expand Down

0 comments on commit 71e46f1

Please sign in to comment.