Skip to content

Commit

Permalink
Add client responder configuration
Browse files Browse the repository at this point in the history
Prior to this commit, the `RSocketRequester.Builder` would allow to
configure directly annotated handlers for processing server requests.
This lead to a package tangle where the `o.s.messaging.rsocket` would
use classes from `o.s.messaging.rsocket.annotation.support` package.

This commit introduces the `ClientResponder` interface for configuring a
responder on the client requester. This is compatible with future
changes with a parallel implementation of a functional variant for
RSocket handlers.

Closes spring-projectsgh-23170
  • Loading branch information
bclozel committed Jul 8, 2019
1 parent 507d128 commit f6b879d
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.messaging.rsocket;

import java.util.function.BiFunction;

import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;

import org.springframework.util.RouteMatcher;

/**
* Handle requests sent by the RSocket server to the client.
* @author Brian Clozel
* @since 5.2
*/
public interface ClientResponder {

/**
* Create a client Socket acceptor for handling server requests.
* @param matcher the route matcher for routing incoming requests
* @param strategies the codecs strategies for (de)serializing messages
* @return a client Socket acceptor
*/
BiFunction<ConnectionSetupPayload, RSocket, RSocket> toSocketAcceptor(
RouteMatcher matcher, RSocketStrategies strategies);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
Expand All @@ -31,9 +30,11 @@
import reactor.core.publisher.Mono;

import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.RouteMatcher;
import org.springframework.util.SimpleRouteMatcher;

/**
* Default implementation of {@link RSocketRequester.Builder}.
Expand All @@ -54,9 +55,13 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
@Nullable
private RSocketStrategies strategies;

@Nullable
private RouteMatcher routeMatcher;

private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();

private List<Object> handlers = new ArrayList<>();
@Nullable
private ClientResponder clientResponder;

@Override
public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) {
Expand Down Expand Up @@ -84,8 +89,14 @@ public RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies st
}

@Override
public RSocketRequester.Builder annotatedHandlers(Object... handlers) {
this.handlers.addAll(Arrays.asList(handlers));
public RSocketRequester.Builder routeMatcher(@Nullable RouteMatcher routeMatcher) {
this.routeMatcher = routeMatcher;
return this;
}

@Override
public RSocketRequester.Builder responder(@Nullable ClientResponder responder) {
this.clientResponder = responder;
return this;
}

Expand Down Expand Up @@ -120,12 +131,8 @@ private Mono<RSocketRequester> doConnect(ClientTransport transport) {
rsocketFactory.dataMimeType(dataMimeType.toString());
rsocketFactory.metadataMimeType(this.metadataMimeType.toString());

if (!this.handlers.isEmpty()) {
RSocketMessageHandler messageHandler = new RSocketMessageHandler();
messageHandler.setHandlers(this.handlers);
messageHandler.setRSocketStrategies(rsocketStrategies);
messageHandler.afterPropertiesSet();
rsocketFactory.acceptor(messageHandler.clientAcceptor());
if (this.clientResponder != null) {
rsocketFactory.acceptor(this.clientResponder.toSocketAcceptor(getRouteMatcher(), rsocketStrategies));
}
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory));
Expand All @@ -148,6 +155,10 @@ private RSocketStrategies getRSocketStrategies() {
}
}

private RouteMatcher getRouteMatcher() {
return this.routeMatcher != null ? this.routeMatcher : new SimpleRouteMatcher(new AntPathMatcher("."));
}

private MimeType getDataMimeType(RSocketStrategies strategies) {
if (this.dataMimeType != null) {
return this.dataMimeType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.MimeType;
import org.springframework.util.RouteMatcher;

/**
* A thin wrapper around a sending {@link RSocket} with a fluent API accepting
Expand Down Expand Up @@ -65,7 +65,6 @@ public interface RSocketRequester {
*/
MimeType metadataMimeType();


/**
* Begin to specify a new request with the given route to a remote handler.
* <p>If the connection is set to use composite metadata, the route is
Expand Down Expand Up @@ -158,9 +157,19 @@ interface Builder {
/**
* Set the {@link RSocketStrategies} to use for access to encoders,
* decoders, and a factory for {@code DataBuffer's}.
* @param strategies the codecs strategies to use
*/
RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies);

/**
* Set the {@link RouteMatcher} to use for matching incoming requests.
* <p>If none is set, then the responder will use a default
* {@link org.springframework.util.SimpleRouteMatcher} instance backed
* by and {@link org.springframework.util.AntPathMatcher}.
* @param routeMatcher the route matcher to use with the responder
*/
RSocketRequester.Builder routeMatcher(@Nullable RouteMatcher routeMatcher);

/**
* Customize the {@link RSocketStrategies}.
* <p>By default this starts out with an empty builder, i.e.
Expand All @@ -171,15 +180,13 @@ interface Builder {
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);

/**
* Add handlers for processing requests sent by the server.
* <p>This is a shortcut for registering client handlers (i.e. annotated controllers)
* to a {@link RSocketMessageHandler} and configuring it as an acceptor.
* You can take full control by manually registering an acceptor on the
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory} using
* {@link #rsocketFactory(Consumer)} instead.
* @param handlers the client handlers to configure on the requester
* Configure a client responder for processing requests sent by the server.
* <p>This is a shortcut for registering an acceptor on the
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory}
* using {@link #rsocketFactory(Consumer)} instead.
* @param responder the responder to use for processing requests sent by the server
*/
RSocketRequester.Builder annotatedHandlers(Object... handlers);
RSocketRequester.Builder responder(ClientResponder responder);

/**
* Connect to the RSocket server over TCP.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.messaging.rsocket.annotation.support;

import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;

import org.springframework.messaging.rsocket.ClientResponder;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
import org.springframework.util.RouteMatcher;

/**
* Default implementation of {@link ClientResponder} for annotated handlers.
* @author Brian Clozel
*/
class DefaultClientResponder implements ClientResponder {

private final List<Object> handlers;

DefaultClientResponder(Object... handlers) {
Assert.notEmpty(handlers, "handlers should not be empty");
this.handlers = Arrays.asList(handlers);
}

@Override
public BiFunction<ConnectionSetupPayload, RSocket, RSocket> toSocketAcceptor(RouteMatcher routeMatcher, RSocketStrategies strategies) {
RSocketMessageHandler messageHandler = new RSocketMessageHandler();
messageHandler.setHandlers(this.handlers);
messageHandler.setRSocketStrategies(strategies);
messageHandler.setRouteMatcher(routeMatcher);
messageHandler.afterPropertiesSet();
return messageHandler.clientAcceptor();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.rsocket.ClientResponder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
Expand All @@ -52,6 +53,7 @@
* side adapters.
*
* @author Rossen Stoyanchev
* @author Brian Clozel
* @since 5.2
*/
public class RSocketMessageHandler extends MessageMappingMessageHandler {
Expand Down Expand Up @@ -232,6 +234,14 @@ public BiFunction<ConnectionSetupPayload, RSocket, RSocket> clientAcceptor() {
return this::createRSocket;
}

/**
* Configure a {@link ClientResponder} for handling requests sent by the server.
* @param handlers the annotated handlers to configure
*/
public static ClientResponder clientResponder(Object... handlers) {
return new DefaultClientResponder(handlers);
}

private MessagingRSocket createRSocket(ConnectionSetupPayload setupPayload, RSocket rsocket) {
String s = setupPayload.dataMimeType();
MimeType dataMimeType = StringUtils.hasText(s) ? MimeTypeUtils.parseMimeType(s) : this.defaultDataMimeType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private static void connectAndVerify(String destination) {
RSocketRequester requester = null;
try {
requester = RSocketRequester.builder()
.annotatedHandlers(new ClientHandler())
.responder(RSocketMessageHandler.clientResponder(new ClientHandler()))
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", server.address().getPort())
.block();
Expand Down

0 comments on commit f6b879d

Please sign in to comment.