Skip to content

Commit

Permalink
Merge pull request #85 from uber/protocol-v2
Browse files Browse the repository at this point in the history
Protocol v2, init handshake, init headers, etc.
  • Loading branch information
ShanniLi committed Oct 18, 2015
2 parents c435190 + be34111 commit c9e5b33
Show file tree
Hide file tree
Showing 19 changed files with 791 additions and 141 deletions.
47 changes: 34 additions & 13 deletions tchannel-core/src/main/java/com/uber/tchannel/api/TChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.uber.tchannel.api.errors.TChannelError;
import com.uber.tchannel.api.errors.TChannelConnectionTimeout;
import com.uber.tchannel.api.handlers.RequestHandler;
import com.uber.tchannel.channels.ChannelManager;
import com.uber.tchannel.channels.PeerManager;
import com.uber.tchannel.channels.ChannelRegistrar;
import com.uber.tchannel.codecs.MessageCodec;
import com.uber.tchannel.codecs.TChannelLengthFieldBasedFrameDecoder;
Expand Down Expand Up @@ -73,14 +75,16 @@ public final class TChannel {
private final String service;
private final ServerBootstrap serverBootstrap;
private final Bootstrap clientBootstrap;
private final ChannelManager channelManager;
private final PeerManager peerManager;
private final EventLoopGroup bossGroup;
private final EventLoopGroup childGroup;
private final InetAddress host;
private final int port;
private String listeningHost = "0.0.0.0";
private int listeningPort;
private ExecutorService exectorService;
private final int maxQueuedRequests;
private final int initTimeout;

private final Serializer serializer = new Serializer(new HashMap<ArgScheme, Serializer.SerializerInterface>() {
{
Expand All @@ -94,12 +98,13 @@ private TChannel(Builder builder) {
this.exectorService = builder.executorService;
this.serverBootstrap = builder.serverBootstrap();
this.clientBootstrap = builder.bootstrap();
this.channelManager = builder.channelManager;
this.peerManager = builder.peerManager;
this.bossGroup = builder.bossGroup;
this.childGroup = builder.childGroup;
this.host = builder.host;
this.port = builder.port;
this.maxQueuedRequests = builder.maxQueuedRequests;
this.initTimeout = builder.initTimeout;
}

private <T, U> ListenableFuture<Response<T>> callWithEncoding(
Expand All @@ -108,7 +113,7 @@ private <T, U> ListenableFuture<Response<T>> callWithEncoding(
Request<U> request,
final Class<T> responseType,
ArgScheme scheme
) throws InterruptedException {
) throws InterruptedException, TChannelError {

RawRequest rawRequest = new RawRequest(
request.getTTL(),
Expand Down Expand Up @@ -156,15 +161,21 @@ public String getServiceName() {
return this.service;
}

public PeerManager getPeerManager() {
return this.peerManager;
}

public ChannelFuture listen() throws InterruptedException {
ChannelFuture f = this.serverBootstrap.bind(this.host, this.port).sync();
InetSocketAddress localAddress = (InetSocketAddress) f.channel().localAddress();
this.listeningPort = localAddress.getPort();
this.listeningHost = localAddress.getHostName();
this.peerManager.setHostPort(String.format("%s:%d", this.listeningHost, this.listeningPort));
return f;
}

public void shutdown() throws InterruptedException {
this.channelManager.close();
this.peerManager.close();
this.bossGroup.shutdownGracefully();
this.childGroup.shutdownGracefully();
}
Expand All @@ -174,7 +185,7 @@ public <T, U> ListenableFuture<Response<T>> callThrift(
int port,
Request<U> request,
final Class<T> responseType
) throws InterruptedException {
) throws InterruptedException, TChannelError {
return callWithEncoding(host, port, request, responseType, ArgScheme.THRIFT);
}

Expand All @@ -183,15 +194,15 @@ public <T, U> ListenableFuture<Response<T>> callJSON(
int port,
Request<U> request,
final Class<T> responseType
) throws InterruptedException {
) throws InterruptedException, TChannelError {
return callWithEncoding(host, port, request, responseType, ArgScheme.JSON);
}

public ListenableFuture<RawResponse> call(
InetAddress host,
int port,
RawRequest request
) throws InterruptedException {
) throws InterruptedException, TChannelError {

// Set the ArgScheme as RAW if its not set
Map<String, String> transportHeaders = request.getTransportHeaders();
Expand All @@ -200,7 +211,11 @@ public ListenableFuture<RawResponse> call(
}

// Get an outbound channel
Channel ch = this.channelManager.findOrNew(new InetSocketAddress(host, port), this.clientBootstrap);
Channel ch = this.peerManager.findOrNew(new InetSocketAddress(host, port), this.clientBootstrap).channel();

if (!this.peerManager.waitForIdentified(ch, this.initTimeout)) {
throw new TChannelConnectionTimeout();
}

// Get a response router for our outbound channel
ResponseRouter responseRouter = ch.pipeline().get(ResponseRouter.class);
Expand All @@ -212,7 +227,7 @@ public ListenableFuture<RawResponse> call(
public static class Builder {

private final String service;
private final ChannelManager channelManager = new ChannelManager();
private final PeerManager peerManager = new PeerManager();
private ExecutorService executorService = new ForkJoinPool();
private int maxQueuedRequests = Runtime.getRuntime().availableProcessors() * 5;
private InetAddress host;
Expand All @@ -221,6 +236,7 @@ public static class Builder {
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup childGroup = new NioEventLoopGroup();
private LogLevel logLevel = LogLevel.INFO;
private int initTimeout = 2000;

public Builder(String service) throws UnknownHostException {
if (service == null) {
Expand Down Expand Up @@ -270,6 +286,11 @@ public Builder setLogLevel(LogLevel logLevel) {
return this;
}

public Builder setInitTimeout(int initTimeout) {
this.initTimeout = initTimeout;
return this;
}

public TChannel build() {
return new TChannel(this);
}
Expand Down Expand Up @@ -307,9 +328,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("MessageCodec", new MessageCodec());

if (isServer) {
ch.pipeline().addLast("InitRequestHandler", new InitRequestHandler());
ch.pipeline().addLast("InitRequestHandler", new InitRequestHandler(peerManager));
} else {
ch.pipeline().addLast("InitRequestInitiator", new InitRequestInitiator());
ch.pipeline().addLast("InitRequestInitiator", new InitRequestInitiator(peerManager));
}

// Handle PingRequest
Expand All @@ -326,7 +347,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("ResponseRouter", new ResponseRouter());

// Register Channels as they are created.
ch.pipeline().addLast("ChannelRegistrar", new ChannelRegistrar(channelManager));
ch.pipeline().addLast("ChannelRegistrar", new ChannelRegistrar(peerManager));

}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2015 Uber Technologies, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.uber.tchannel.api.errors;

public class TChannelConnectionTimeout extends TChannelError {
public TChannelConnectionTimeout() {
super("Connection timeout on identification", TChannelError.ERROR_INIT_TIMEOUT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2015 Uber Technologies, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.uber.tchannel.api.errors;

public class TChannelError extends Exception {
public static final String ERROR_INIT_TIMEOUT = "tchannel.connection.timeout";

public final String type;
public final Exception subError;

public TChannelError(String message, String type, Exception subError) {
super(message);
this.type = type;
this.subError = subError;
}

public TChannelError(String message, String type) {
super(message);
this.type = type;
this.subError = null;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,26 @@
import io.netty.channel.ChannelHandlerContext;

/**
* Simple ChannelHandlerAdapter that is responsible solely for registering new Channels with the ChannelManager
* Simple ChannelHandlerAdapter that is responsible solely for registering new Channels with the PeerManager
* and de-registering Channels when the go inactive.
*/
public class ChannelRegistrar extends ChannelHandlerAdapter {

private final ChannelManager channelManager;
private final PeerManager peerManager;

public ChannelRegistrar(ChannelManager channelManager) {
this.channelManager = channelManager;
public ChannelRegistrar(PeerManager peerManager) {
this.peerManager = peerManager;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.channelManager.add(ctx.channel());
this.peerManager.add(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
this.channelManager.remove(ctx.channel());
this.peerManager.remove(ctx.channel());
}
}
Loading

0 comments on commit c9e5b33

Please sign in to comment.