From 8136a536f145b4757339b2bc3385cab45cf19532 Mon Sep 17 00:00:00 2001 From: Will Salisbury Date: Mon, 21 Sep 2015 22:47:03 -0700 Subject: [PATCH 1/5] Hyperbahn work in progress. Implement very basic HyperbahnClient. Fix bug in MessageFragmenter where we weren't writing out the TTL from the Outbound Request. Add a basic hyperbahn example to tchannel-example. Add back loading routers from disk Fix null header bug! Cleanup timeouts Fix naming of request handlers in tchannel-example *somebody* was overly aggressive with their class refactoring :) Rebase on master. Tests may fail due to missing hosts file --- .../uber/tchannel/schemes/JSONSerializer.java | 4 +- tchannel-example/pom.xml | 5 + .../tchannel/hyperbahn/HyperbahnExample.java | 50 +++++++ ...stHandler.java => JsonRequestHandler.java} | 4 +- .../com/uber/tchannel/json/JsonServer.java | 2 +- ...stHandler.java => PingRequestHandler.java} | 15 ++- .../com/uber/tchannel/ping/PingServer.java | 2 +- ...fault.java => GetValueRequestHandler.java} | 13 +- .../uber/tchannel/thrift/KeyValueServer.java | 4 +- ...fault.java => SetValueRequestHandler.java} | 9 +- .../hyperbahn/api/HyperbahnClient.java | 127 ++++++++++++++++++ .../hyperbahn/messages/AdvertiseRequest.java | 19 +++ .../hyperbahn/messages/AdvertiseResponse.java | 10 ++ .../hyperbahn/api/HyperbahnClientTest.java | 36 +++++ 14 files changed, 279 insertions(+), 21 deletions(-) create mode 100644 tchannel-example/src/main/java/com/uber/tchannel/hyperbahn/HyperbahnExample.java rename tchannel-example/src/main/java/com/uber/tchannel/json/{JsonDefaultRequestHandler.java => JsonRequestHandler.java} (93%) rename tchannel-example/src/main/java/com/uber/tchannel/ping/{PingDefaultRequestHandler.java => PingRequestHandler.java} (79%) rename tchannel-example/src/main/java/com/uber/tchannel/thrift/{GetValueHandlerDefault.java => GetValueRequestHandler.java} (88%) rename tchannel-example/src/main/java/com/uber/tchannel/thrift/{SetValueHandlerDefault.java => SetValueRequestHandler.java} (93%) create mode 100644 tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/api/HyperbahnClient.java create mode 100644 tchannel-hyperbahn/src/test/java/com/uber/tchannel/hyperbahn/api/HyperbahnClientTest.java diff --git a/tchannel-core/src/main/java/com/uber/tchannel/schemes/JSONSerializer.java b/tchannel-core/src/main/java/com/uber/tchannel/schemes/JSONSerializer.java index a94e875..f448f24 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/schemes/JSONSerializer.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/schemes/JSONSerializer.java @@ -50,8 +50,8 @@ public String decodeEndpoint(ByteBuf arg1) { public Map decodeHeaders(ByteBuf arg2) { String headerJSON = arg2.toString(CharsetUtil.UTF_8); arg2.release(); - Map headers = GSON.fromJson(headerJSON, HEADER_TYPE); - return (headers != null) ? headers : new HashMap(); + Map headers = new Gson().fromJson(headerJSON, HEADER_TYPE); + return (headers == null) ? new HashMap() : headers; } @Override diff --git a/tchannel-example/pom.xml b/tchannel-example/pom.xml index 8110e5f..de997ed 100644 --- a/tchannel-example/pom.xml +++ b/tchannel-example/pom.xml @@ -41,6 +41,11 @@ tchannel-core 0.1.4-SNAPSHOT + + com.uber.tchannel + tchannel-hyperbahn + 0.1.4-SNAPSHOT + diff --git a/tchannel-example/src/main/java/com/uber/tchannel/hyperbahn/HyperbahnExample.java b/tchannel-example/src/main/java/com/uber/tchannel/hyperbahn/HyperbahnExample.java new file mode 100644 index 0000000..c7218f1 --- /dev/null +++ b/tchannel-example/src/main/java/com/uber/tchannel/hyperbahn/HyperbahnExample.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2015 Uber Technologies, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.uber.tchannel.hyperbahn; + +import java.util.concurrent.TimeUnit; + +import com.uber.tchannel.api.Response; +import com.uber.tchannel.api.TChannel; +import com.uber.tchannel.hyperbahn.api.HyperbahnClient; +import com.uber.tchannel.hyperbahn.messages.AdvertiseResponse; +import com.uber.tchannel.ping.PingRequestHandler; + +public class HyperbahnExample { + public static void main(String[] args) throws Exception { + TChannel tchannel = new TChannel.Builder("hyperbahn-example") + .register("ping", new PingRequestHandler()) + .build(); + + tchannel.listen(); + + HyperbahnClient hyperbahn = new HyperbahnClient(tchannel); + + Response response = hyperbahn.advertise(tchannel.getServiceName(), 0); + + System.err.println(response); + + Thread.sleep(TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES)); + + hyperbahn.shutdown(); + } +} diff --git a/tchannel-example/src/main/java/com/uber/tchannel/json/JsonDefaultRequestHandler.java b/tchannel-example/src/main/java/com/uber/tchannel/json/JsonRequestHandler.java similarity index 93% rename from tchannel-example/src/main/java/com/uber/tchannel/json/JsonDefaultRequestHandler.java rename to tchannel-example/src/main/java/com/uber/tchannel/json/JsonRequestHandler.java index 4845806..c98ab90 100644 --- a/tchannel-example/src/main/java/com/uber/tchannel/json/JsonDefaultRequestHandler.java +++ b/tchannel-example/src/main/java/com/uber/tchannel/json/JsonRequestHandler.java @@ -27,11 +27,13 @@ import com.uber.tchannel.api.ResponseCode; import com.uber.tchannel.api.handlers.JSONRequestHandler; -public class JsonDefaultRequestHandler extends JSONRequestHandler { +public class JsonRequestHandler extends JSONRequestHandler { + @Override public Response handleImpl(Request request) { System.out.println(request); return new Response.Builder<>(new ResponsePojo(true, "hi!"), request.getEndpoint(), ResponseCode.OK).build(); } + } diff --git a/tchannel-example/src/main/java/com/uber/tchannel/json/JsonServer.java b/tchannel-example/src/main/java/com/uber/tchannel/json/JsonServer.java index 46fb623..27661a9 100644 --- a/tchannel-example/src/main/java/com/uber/tchannel/json/JsonServer.java +++ b/tchannel-example/src/main/java/com/uber/tchannel/json/JsonServer.java @@ -28,7 +28,7 @@ public class JsonServer { public static void main(String[] args) throws Exception { final TChannel tchannel = new TChannel.Builder("json-server") - .register("json-endpoint", new JsonDefaultRequestHandler()) + .register("json-endpoint", new JsonRequestHandler()) .setServerPort(8888) .build(); diff --git a/tchannel-example/src/main/java/com/uber/tchannel/ping/PingDefaultRequestHandler.java b/tchannel-example/src/main/java/com/uber/tchannel/ping/PingRequestHandler.java similarity index 79% rename from tchannel-example/src/main/java/com/uber/tchannel/ping/PingDefaultRequestHandler.java rename to tchannel-example/src/main/java/com/uber/tchannel/ping/PingRequestHandler.java index e9ce2c6..928d543 100644 --- a/tchannel-example/src/main/java/com/uber/tchannel/ping/PingDefaultRequestHandler.java +++ b/tchannel-example/src/main/java/com/uber/tchannel/ping/PingRequestHandler.java @@ -27,12 +27,19 @@ import com.uber.tchannel.api.ResponseCode; import com.uber.tchannel.api.handlers.JSONRequestHandler; -public class PingDefaultRequestHandler extends JSONRequestHandler { +public class PingRequestHandler extends JSONRequestHandler { + @Override public Response handleImpl(Request request) { - return new Response.Builder<>(new Pong("pong!"), request.getEndpoint(), ResponseCode.OK) - .setHeaders(request.getHeaders()) - .build(); + + return new Response.Builder<>( + new Pong("pong!"), + request.getEndpoint(), + ResponseCode.OK + ) + .setHeaders(request.getHeaders()) + .setTransportHeaders(request.getTransportHeaders()) + .build(); } } diff --git a/tchannel-example/src/main/java/com/uber/tchannel/ping/PingServer.java b/tchannel-example/src/main/java/com/uber/tchannel/ping/PingServer.java index 2d449d5..c66a74d 100644 --- a/tchannel-example/src/main/java/com/uber/tchannel/ping/PingServer.java +++ b/tchannel-example/src/main/java/com/uber/tchannel/ping/PingServer.java @@ -62,7 +62,7 @@ public static void main(String[] args) throws Exception { public void run() throws Exception { TChannel tchannel = new TChannel.Builder("ping-server") - .register("ping", new PingDefaultRequestHandler()) + .register("ping", new PingRequestHandler()) .setServerHost(InetAddress.getLoopbackAddress()) .setServerPort(this.port) .build(); diff --git a/tchannel-example/src/main/java/com/uber/tchannel/thrift/GetValueHandlerDefault.java b/tchannel-example/src/main/java/com/uber/tchannel/thrift/GetValueRequestHandler.java similarity index 88% rename from tchannel-example/src/main/java/com/uber/tchannel/thrift/GetValueHandlerDefault.java rename to tchannel-example/src/main/java/com/uber/tchannel/thrift/GetValueRequestHandler.java index b5d4e62..cbf6329 100644 --- a/tchannel-example/src/main/java/com/uber/tchannel/thrift/GetValueHandlerDefault.java +++ b/tchannel-example/src/main/java/com/uber/tchannel/thrift/GetValueRequestHandler.java @@ -22,6 +22,8 @@ package com.uber.tchannel.thrift; +import java.util.Map; + import com.uber.tchannel.api.Request; import com.uber.tchannel.api.Response; import com.uber.tchannel.api.ResponseCode; @@ -29,16 +31,15 @@ import com.uber.tchannel.thrift.generated.KeyValue; import com.uber.tchannel.thrift.generated.NotFoundError; -import java.util.Map; - -public class GetValueHandlerDefault extends ThriftRequestHandler { +public class GetValueRequestHandler extends ThriftRequestHandler { protected final Map keyValueStore; - public GetValueHandlerDefault(Map keyValueStore) { + public GetValueRequestHandler(Map keyValueStore) { this.keyValueStore = keyValueStore; } + @Override public Response handleImpl(Request request) { String key = request.getBody().getKey(); @@ -50,7 +51,7 @@ public Response handleImpl(Request( - new KeyValue.getValue_result(value, err), request.getEndpoint(), ResponseCode.OK) - .build(); + new KeyValue.getValue_result(value, err), request.getEndpoint(), ResponseCode.OK) + .build(); } } diff --git a/tchannel-example/src/main/java/com/uber/tchannel/thrift/KeyValueServer.java b/tchannel-example/src/main/java/com/uber/tchannel/thrift/KeyValueServer.java index 4feaf04..4a5af10 100644 --- a/tchannel-example/src/main/java/com/uber/tchannel/thrift/KeyValueServer.java +++ b/tchannel-example/src/main/java/com/uber/tchannel/thrift/KeyValueServer.java @@ -38,8 +38,8 @@ public static void main(String[] args) throws InterruptedException, UnknownHostE TChannel tchannel = new TChannel.Builder("keyvalue-service") .setServerPort(8888) .setLogLevel(LogLevel.INFO) - .register("KeyValue::getValue", new GetValueHandlerDefault(keyValueStore)) - .register("KeyValue::setValue", new SetValueHandlerDefault(keyValueStore)) + .register("KeyValue::getValue", new GetValueRequestHandler(keyValueStore)) + .register("KeyValue::setValue", new SetValueRequestHandler(keyValueStore)) .build(); tchannel.listen().channel().closeFuture().sync(); diff --git a/tchannel-example/src/main/java/com/uber/tchannel/thrift/SetValueHandlerDefault.java b/tchannel-example/src/main/java/com/uber/tchannel/thrift/SetValueRequestHandler.java similarity index 93% rename from tchannel-example/src/main/java/com/uber/tchannel/thrift/SetValueHandlerDefault.java rename to tchannel-example/src/main/java/com/uber/tchannel/thrift/SetValueRequestHandler.java index a750cf9..18d22b2 100644 --- a/tchannel-example/src/main/java/com/uber/tchannel/thrift/SetValueHandlerDefault.java +++ b/tchannel-example/src/main/java/com/uber/tchannel/thrift/SetValueRequestHandler.java @@ -22,22 +22,23 @@ package com.uber.tchannel.thrift; +import java.util.Map; + import com.uber.tchannel.api.Request; import com.uber.tchannel.api.Response; import com.uber.tchannel.api.ResponseCode; import com.uber.tchannel.api.handlers.ThriftRequestHandler; import com.uber.tchannel.thrift.generated.KeyValue; -import java.util.Map; - -public class SetValueHandlerDefault extends ThriftRequestHandler { +public class SetValueRequestHandler extends ThriftRequestHandler { private final Map keyValueStore; - public SetValueHandlerDefault(Map keyValueStore) { + public SetValueRequestHandler(Map keyValueStore) { this.keyValueStore = keyValueStore; } + @Override public Response handleImpl(Request request) { String key = request.getBody().getKey(); diff --git a/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/api/HyperbahnClient.java b/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/api/HyperbahnClient.java new file mode 100644 index 0000000..edaf200 --- /dev/null +++ b/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/api/HyperbahnClient.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2015 Uber Technologies, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package com.uber.tchannel.hyperbahn.api; + +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.net.InetSocketAddress; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.gson.Gson; +import com.uber.tchannel.api.Request; +import com.uber.tchannel.api.Response; +import com.uber.tchannel.api.TChannel; +import com.uber.tchannel.hyperbahn.messages.AdvertiseRequest; +import com.uber.tchannel.hyperbahn.messages.AdvertiseResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HyperbahnClient { + + private static final String HOSTS_FILE_PATH = "/etc/uber/hyperbahn/hosts.json"; + private static final String HYPERBAHN_SERVICE_NAME = "hyperbahn"; + private static final String HYPERBAHN_ADVERTISE_ENDPOINT = "ad"; + + private final TChannel tchannel; + private final Logger logger = LoggerFactory.getLogger(HyperbahnClient.class); + private List routers; + + public HyperbahnClient(TChannel tchannel) throws IOException { + this(tchannel, HOSTS_FILE_PATH); + } + + public HyperbahnClient(TChannel tchannel, String hostsFilePath) throws IOException { + this.tchannel = tchannel; + this.routers = HyperbahnClient.loadRouters(hostsFilePath); + } + + private static List loadRouters(String hostsFilePath) throws IOException { + + List hostPorts; + try (Reader reader = new FileReader(hostsFilePath)) { + hostPorts = new Gson().fromJson(reader, List.class); + } + + List routers = new LinkedList<>(); + for (String hostPort : hostPorts) { + String[] hostPortPair = hostPort.split(Pattern.quote(":")); + String host = hostPortPair[0]; + int port = Integer.parseInt(hostPortPair[1]); + InetSocketAddress router = new InetSocketAddress(host, port); + routers.add(router); + } + + return routers; + + } + + public Response advertise( + String service, + int cost + ) throws InterruptedException, TimeoutException, ExecutionException { + + final AdvertiseRequest advertiseRequest = new AdvertiseRequest(); + advertiseRequest.addService(service, cost); + + final Request request = new Request.Builder<>( + advertiseRequest, + HYPERBAHN_SERVICE_NAME, + HYPERBAHN_ADVERTISE_ENDPOINT + ) + .setTTL(1, TimeUnit.SECONDS) + .build(); + + final InetSocketAddress router = this.routers.get(new Random().nextInt(this.routers.size())); + + ListenableFuture> responseFuture = this.tchannel.callJSON( + router.getAddress(), + router.getPort(), + request, + AdvertiseResponse.class + ); + + return responseFuture.get(2, TimeUnit.SECONDS); + + } + + public void stopAdvertising() { + + } + + public void shutdown() throws InterruptedException { + this.logger.info("Shutting down HyperbahnClient and TChannel."); + this.stopAdvertising(); + this.tchannel.shutdown(); + this.routers.clear(); + this.logger.info("HyperbahnClient shutdown complete."); + } + +} diff --git a/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/messages/AdvertiseRequest.java b/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/messages/AdvertiseRequest.java index 78bcfb8..f47fe61 100644 --- a/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/messages/AdvertiseRequest.java +++ b/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/messages/AdvertiseRequest.java @@ -41,6 +41,15 @@ public void addService(String serviceName, int cost) { services.add(new Service(serviceName, cost)); } + @Override + public String toString() { + return String.format( + "<%s services=%s>", + this.getClass().getSimpleName(), + this.services + ); + } + // Inner class representing a service object in a hyperbahn message. public class Service { private final String serviceName; @@ -50,6 +59,16 @@ public Service(String serviceName, int cost) { this.serviceName = serviceName; this.cost = cost; } + + @Override + public String toString() { + return String.format( + "<%s serviceName=%s cost=%d>", + this.getClass().getSimpleName(), + this.serviceName, + this.cost + ); + } } } diff --git a/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/messages/AdvertiseResponse.java b/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/messages/AdvertiseResponse.java index 09e5461..4a9b1dc 100644 --- a/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/messages/AdvertiseResponse.java +++ b/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/messages/AdvertiseResponse.java @@ -28,4 +28,14 @@ public class AdvertiseResponse { public AdvertiseResponse(int connectionCount) { this.connectionCount = connectionCount; } + + @Override + public String toString() { + return String.format( + "<%s connectionCount=%d>", + this.getClass().getSimpleName(), + this.connectionCount + ); + } + } diff --git a/tchannel-hyperbahn/src/test/java/com/uber/tchannel/hyperbahn/api/HyperbahnClientTest.java b/tchannel-hyperbahn/src/test/java/com/uber/tchannel/hyperbahn/api/HyperbahnClientTest.java new file mode 100644 index 0000000..36ffeb4 --- /dev/null +++ b/tchannel-hyperbahn/src/test/java/com/uber/tchannel/hyperbahn/api/HyperbahnClientTest.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2015 Uber Technologies, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package com.uber.tchannel.hyperbahn.api; + +import com.uber.tchannel.api.TChannel; + +public class HyperbahnClientTest { + + @org.junit.Test + public void testAdvertise() throws Exception { + TChannel tchannel = new TChannel.Builder("hyperbahn-service").build(); + + HyperbahnClient hyperbahnClient = new HyperbahnClient(tchannel); + + } +} From 06b872f04dedce4c7d08c703ffe96296877c43af Mon Sep 17 00:00:00 2001 From: Juncao Li Date: Mon, 19 Oct 2015 14:08:32 -0700 Subject: [PATCH 2/5] should response BadRequest when request arg2 or arg3 are malformed --- .../com/uber/tchannel/handlers/RequestRouter.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tchannel-core/src/main/java/com/uber/tchannel/handlers/RequestRouter.java b/tchannel-core/src/main/java/com/uber/tchannel/handlers/RequestRouter.java index 830cb98..a000791 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/handlers/RequestRouter.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/handlers/RequestRouter.java @@ -30,8 +30,10 @@ import com.uber.tchannel.api.handlers.RequestHandler; import com.uber.tchannel.errors.BadRequestError; import com.uber.tchannel.errors.BusyError; +import com.uber.tchannel.errors.ErrorType; import com.uber.tchannel.headers.ArgScheme; import com.uber.tchannel.headers.TransportHeaders; +import com.uber.tchannel.messages.ErrorMessage; import com.uber.tchannel.schemes.JSONSerializer; import com.uber.tchannel.schemes.RawRequest; import com.uber.tchannel.schemes.RawResponse; @@ -127,7 +129,14 @@ public void onSuccess(RawResponse response) { @Override public void onFailure(Throwable throwable) { queuedRequests.decrementAndGet(); - // TODO handle the failure case + + // TODO better interface for sending errors + ErrorMessage error = new ErrorMessage( + rawRequest.getId(), + ErrorType.BadRequest, + new Trace(0, 0, 0, (byte) 0x00), + throwable.getMessage()); + ctx.writeAndFlush(error); } }); } From 36ee4378fa7ce2af9cfcd2d0a38fe6acac7273eb Mon Sep 17 00:00:00 2001 From: Juncao Li Date: Mon, 19 Oct 2015 14:10:21 -0700 Subject: [PATCH 3/5] compatible with other languages when application header is empty --- .../main/java/com/uber/tchannel/schemes/JSONSerializer.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tchannel-core/src/main/java/com/uber/tchannel/schemes/JSONSerializer.java b/tchannel-core/src/main/java/com/uber/tchannel/schemes/JSONSerializer.java index f448f24..99310f3 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/schemes/JSONSerializer.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/schemes/JSONSerializer.java @@ -50,6 +50,10 @@ public String decodeEndpoint(ByteBuf arg1) { public Map decodeHeaders(ByteBuf arg2) { String headerJSON = arg2.toString(CharsetUtil.UTF_8); arg2.release(); + if (headerJSON == null || headerJSON.isEmpty() || headerJSON.equals("\"\"")) { + headerJSON = "{}"; + } + Map headers = new Gson().fromJson(headerJSON, HEADER_TYPE); return (headers == null) ? new HashMap() : headers; } From a43eda1435d90dd0b4f6d23ed6271d75a898c0cc Mon Sep 17 00:00:00 2001 From: Juncao Li Date: Mon, 19 Oct 2015 14:11:42 -0700 Subject: [PATCH 4/5] asyc connect & error handling for connection failures --- .../java/com/uber/tchannel/api/TChannel.java | 14 +++++---- .../api/errors/TChannelConnectionFailure.java | 29 +++++++++++++++++++ .../tchannel/api/errors/TChannelError.java | 7 +++-- .../api/errors/TChannelInterrupted.java | 29 +++++++++++++++++++ .../uber/tchannel/channels/Connection.java | 17 +++++++++-- .../java/com/uber/tchannel/channels/Peer.java | 21 ++++++++++++-- .../uber/tchannel/channels/PeerManager.java | 3 +- .../uber/tchannel/api/PeerManagerTest.java | 2 +- 8 files changed, 108 insertions(+), 14 deletions(-) create mode 100644 tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelConnectionFailure.java create mode 100644 tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelInterrupted.java diff --git a/tchannel-core/src/main/java/com/uber/tchannel/api/TChannel.java b/tchannel-core/src/main/java/com/uber/tchannel/api/TChannel.java index e3fef90..91ebfb8 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/api/TChannel.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/api/TChannel.java @@ -27,6 +27,7 @@ import com.uber.tchannel.api.errors.TChannelError; import com.uber.tchannel.api.errors.TChannelConnectionTimeout; import com.uber.tchannel.api.handlers.RequestHandler; +import com.uber.tchannel.channels.Connection; import com.uber.tchannel.channels.PeerManager; import com.uber.tchannel.channels.ChannelRegistrar; import com.uber.tchannel.codecs.MessageCodec; @@ -48,7 +49,6 @@ import com.uber.tchannel.schemes.ThriftSerializer; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -211,14 +211,18 @@ public ListenableFuture call( } // Get an outbound channel - Channel ch = this.peerManager.findOrNew(new InetSocketAddress(host, port), this.clientBootstrap).channel(); + Connection conn = this.peerManager.findOrNew(new InetSocketAddress(host, port), this.clientBootstrap); - if (!this.peerManager.waitForIdentified(ch, this.initTimeout)) { - throw new TChannelConnectionTimeout(); + if (!conn.waitForIdentified(this.initTimeout)) { + if (conn.lastError() != null) { + throw conn.lastError(); + } else { + throw new TChannelConnectionTimeout(); + } } // Get a response router for our outbound channel - ResponseRouter responseRouter = ch.pipeline().get(ResponseRouter.class); + ResponseRouter responseRouter = conn.channel().pipeline().get(ResponseRouter.class); // Ask the router to make a call on our behalf, and return its promise return responseRouter.expectResponse(request); diff --git a/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelConnectionFailure.java b/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelConnectionFailure.java new file mode 100644 index 0000000..2331103 --- /dev/null +++ b/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelConnectionFailure.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2015 Uber Technologies, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package com.uber.tchannel.api.errors; + +public class TChannelConnectionFailure extends TChannelError { + public TChannelConnectionFailure(Throwable ex) { + super("Failed to connect to the host", TChannelError.ERROR_CONNECTION_FAILURE, ex); + } +} diff --git a/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelError.java b/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelError.java index e6a7af7..a24d821 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelError.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelError.java @@ -23,12 +23,15 @@ package com.uber.tchannel.api.errors; public class TChannelError extends Exception { + public static final String ERROR_INTERRUPTED = "tchannel.interrupted"; + public static final String ERROR_INIT_TIMEOUT = "tchannel.connection.timeout"; + public static final String ERROR_CONNECTION_FAILURE = "tchannel.socket"; public final String type; - public final Exception subError; + public final Throwable subError; - public TChannelError(String message, String type, Exception subError) { + public TChannelError(String message, String type, Throwable subError) { super(message); this.type = type; this.subError = subError; diff --git a/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelInterrupted.java b/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelInterrupted.java new file mode 100644 index 0000000..5a75f5f --- /dev/null +++ b/tchannel-core/src/main/java/com/uber/tchannel/api/errors/TChannelInterrupted.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2015 Uber Technologies, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package com.uber.tchannel.api.errors; + +public class TChannelInterrupted extends TChannelError { + public TChannelInterrupted(Throwable ex) { + super("Interrupted Error", TChannelError.ERROR_INTERRUPTED, ex); + } +} diff --git a/tchannel-core/src/main/java/com/uber/tchannel/channels/Connection.java b/tchannel-core/src/main/java/com/uber/tchannel/channels/Connection.java index 23153f2..3ec2b9f 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/channels/Connection.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/channels/Connection.java @@ -21,6 +21,7 @@ */ package com.uber.tchannel.channels; +import com.uber.tchannel.api.errors.TChannelError; import com.uber.tchannel.messages.InitMessage; import io.netty.channel.Channel; @@ -35,6 +36,7 @@ public class Connection { private final Channel channel; private String remoteAddress = null; + private TChannelError lastError = null; public Connection(Channel channel, Direction direction) { this.channel = channel; @@ -47,6 +49,9 @@ public Connection(Channel channel, Direction direction) { public Channel channel() { return this.channel; } + public TChannelError lastError() { + return this.lastError; + } public synchronized boolean satisfy(ConnectionState preferedState) { ConnectionState connState = this.state; @@ -65,7 +70,8 @@ public synchronized boolean satisfy(ConnectionState preferedState) { public synchronized void setState(ConnectionState state) { this.state = state; - if (state == ConnectionState.IDENTIFIED) { + if (state == ConnectionState.IDENTIFIED || ( + state == ConnectionState.UNCONNECTED && this.lastError != null)) { this.notifyAll(); } } @@ -81,6 +87,12 @@ public synchronized void setIndentified(Map headers) { this.setState(ConnectionState.IDENTIFIED); } + public synchronized void setIndentified(TChannelError error) { + this.remoteAddress = null; + this.lastError = error; + this.setState(ConnectionState.UNCONNECTED); + } + public synchronized boolean isEphemeral() { return this.remoteAddress.equals("0.0.0.0:0"); } @@ -99,6 +111,7 @@ public synchronized boolean waitForIdentified(long timeout) { // TODO reap connections/peers on init timeout try { if (this.state != ConnectionState.IDENTIFIED) { + this.lastError = null; this.wait(timeout); } } catch (InterruptedException ex) { @@ -109,7 +122,7 @@ public synchronized boolean waitForIdentified(long timeout) { } public synchronized void close() throws InterruptedException { - channel.close().sync(); + channel.close(); this.state = ConnectionState.DESTROYED; } diff --git a/tchannel-core/src/main/java/com/uber/tchannel/channels/Peer.java b/tchannel-core/src/main/java/com/uber/tchannel/channels/Peer.java index 984f349..466298f 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/channels/Peer.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/channels/Peer.java @@ -27,6 +27,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.HashMap; +import com.uber.tchannel.api.errors.TChannelConnectionFailure; +import com.uber.tchannel.api.errors.TChannelError; import com.uber.tchannel.messages.InitMessage; import com.uber.tchannel.messages.InitRequest; import io.netty.bootstrap.Bootstrap; @@ -99,14 +101,27 @@ public Connection remove(Channel channel) { return conn; } - public Connection connect(Bootstrap bootstrap) throws InterruptedException { + public Connection connect(Bootstrap bootstrap) throws TChannelError { Connection conn = getConnection(ConnectionState.IDENTIFIED); if (conn != null) { return conn; } - Channel channel = bootstrap.connect(remoteAddress).sync().channel(); - return add(channel, Connection.Direction.OUT); + final ChannelFuture f = bootstrap.connect(remoteAddress); + Channel channel = f.channel(); + final Connection connection = add(channel, Connection.Direction.OUT); + + // handle connection errors + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + connection.setIndentified(new TChannelConnectionFailure(future.cause())); + } + } + }); + + return connection; } public Connection getConnection(ConnectionState preferedState) { diff --git a/tchannel-core/src/main/java/com/uber/tchannel/channels/PeerManager.java b/tchannel-core/src/main/java/com/uber/tchannel/channels/PeerManager.java index 92a2d65..ee19241 100644 --- a/tchannel-core/src/main/java/com/uber/tchannel/channels/PeerManager.java +++ b/tchannel-core/src/main/java/com/uber/tchannel/channels/PeerManager.java @@ -22,6 +22,7 @@ package com.uber.tchannel.channels; +import com.uber.tchannel.api.errors.TChannelError; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -38,7 +39,7 @@ public class PeerManager { private final ConcurrentHashMap peers = new ConcurrentHashMap<>(); private String hostPort = "0.0.0.0:0"; - public Connection findOrNew(SocketAddress address, Bootstrap bootstrap) throws InterruptedException { + public Connection findOrNew(SocketAddress address, Bootstrap bootstrap) throws TChannelError { Peer peer = peers.get(address); if (peer == null) { peer = new Peer(this, address); diff --git a/tchannel-core/src/test/java/com/uber/tchannel/api/PeerManagerTest.java b/tchannel-core/src/test/java/com/uber/tchannel/api/PeerManagerTest.java index dff16a2..3303cbe 100644 --- a/tchannel-core/src/test/java/com/uber/tchannel/api/PeerManagerTest.java +++ b/tchannel-core/src/test/java/com/uber/tchannel/api/PeerManagerTest.java @@ -60,7 +60,7 @@ public void testPeerAndConnections() throws Exception { int port = server.getListeningPort(); // create client - final TChannel client = new TChannel.Builder("json-server") + final TChannel client = new TChannel.Builder("server") .setServerHost(host) .build(); client.listen(); From 5e883316cfc057b458352902c78a3c112cb34be9 Mon Sep 17 00:00:00 2001 From: Juncao Li Date: Mon, 19 Oct 2015 14:12:29 -0700 Subject: [PATCH 5/5] Hyperbahn client: code & test & example fix the review comments --- tchannel-example/README.md | 11 + .../tchannel/hyperbahn/HyperbahnExample.java | 61 +++++- .../tchannel/ping/PingRequestHandler.java | 6 +- .../hyperbahn/api/HyperbahnClient.java | 200 +++++++++++++----- .../hyperbahn/api/HyperbahnClientTest.java | 61 +++++- 5 files changed, 275 insertions(+), 64 deletions(-) diff --git a/tchannel-example/README.md b/tchannel-example/README.md index 83aba5c..b97368d 100644 --- a/tchannel-example/README.md +++ b/tchannel-example/README.md @@ -25,4 +25,15 @@ java -cp tchannel-example/target/tchannel-example.jar com.uber.tchannel.ping.Pin #Stopping Client... ``` +#### HyperbahnExample +```bash +mvn package +Run Hyperbahn: node server.js --port 21300 2>&1 | jq . +java -cp tchannel-example/target/tchannel-example.jar com.uber.tchannel.hyperbahn.HyperbahnExample +tcurl -p 127.0.0.1:21300 javaServer ping -j -2 "{}" -3 '{"request":"hello"}' | jq . +``` + + + + ## MIT Licenced diff --git a/tchannel-example/src/main/java/com/uber/tchannel/hyperbahn/HyperbahnExample.java b/tchannel-example/src/main/java/com/uber/tchannel/hyperbahn/HyperbahnExample.java index c7218f1..4c8812d 100644 --- a/tchannel-example/src/main/java/com/uber/tchannel/hyperbahn/HyperbahnExample.java +++ b/tchannel-example/src/main/java/com/uber/tchannel/hyperbahn/HyperbahnExample.java @@ -21,30 +21,77 @@ */ package com.uber.tchannel.hyperbahn; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ListenableFuture; import com.uber.tchannel.api.Response; import com.uber.tchannel.api.TChannel; +import com.uber.tchannel.api.errors.TChannelError; import com.uber.tchannel.hyperbahn.api.HyperbahnClient; import com.uber.tchannel.hyperbahn.messages.AdvertiseResponse; import com.uber.tchannel.ping.PingRequestHandler; +// Instructions: +// 1. run Hyperbahn: node server.js --port 21300 2>&1 | jq . +// 2. run HyperbahnExample.java +// 3. tcurl -p 127.0.0.1:21300 javaServer ping -j -2 "{}" -3 '{"request":"hello"}' | jq . + public class HyperbahnExample { public static void main(String[] args) throws Exception { - TChannel tchannel = new TChannel.Builder("hyperbahn-example") - .register("ping", new PingRequestHandler()) - .build(); + TChannel tchannel = new TChannel.Builder("javaServer") + .register("ping", new PingRequestHandler()) + .setServerHost(InetAddress.getByName("127.0.0.1")) + .setServerPort(8888) + .build(); tchannel.listen(); - HyperbahnClient hyperbahn = new HyperbahnClient(tchannel); + List routers = new ArrayList() { + { + add(new InetSocketAddress("127.0.0.1", 21300)); + } + }; + + HyperbahnClient hyperbahn = new HyperbahnClient.Builder(tchannel.getServiceName(), tchannel) + .setRouters(routers) + .build(); + + final ListenableFuture> responseFuture; - Response response = hyperbahn.advertise(tchannel.getServiceName(), 0); + try { + responseFuture = hyperbahn.advertise(); + } catch (TChannelError ex) { + System.out.println("Advertise failure: " + ex.toString()); + tchannel.shutdown(); + hyperbahn.shutdown(); + return; + } - System.err.println(response); + responseFuture.addListener(new Runnable() { + @Override + public void run() { + try { + Response response = responseFuture.get(); + System.out.println("Got response. All set: " + response.getBody().toString()); + } catch (Exception ex) { + System.out.println("Error happened: " + ex.getMessage()); + } + } + }, new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }); - Thread.sleep(TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES)); + Thread.sleep(TimeUnit.MILLISECONDS.convert(600, TimeUnit.SECONDS)); + tchannel.shutdown(); hyperbahn.shutdown(); } } diff --git a/tchannel-example/src/main/java/com/uber/tchannel/ping/PingRequestHandler.java b/tchannel-example/src/main/java/com/uber/tchannel/ping/PingRequestHandler.java index 928d543..b84f6e5 100644 --- a/tchannel-example/src/main/java/com/uber/tchannel/ping/PingRequestHandler.java +++ b/tchannel-example/src/main/java/com/uber/tchannel/ping/PingRequestHandler.java @@ -32,11 +32,7 @@ public class PingRequestHandler extends JSONRequestHandler { @Override public Response handleImpl(Request request) { - return new Response.Builder<>( - new Pong("pong!"), - request.getEndpoint(), - ResponseCode.OK - ) + return new Response.Builder<>(new Pong("pong!"), request.getEndpoint(), ResponseCode.OK) .setHeaders(request.getHeaders()) .setTransportHeaders(request.getTransportHeaders()) .build(); diff --git a/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/api/HyperbahnClient.java b/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/api/HyperbahnClient.java index edaf200..c9ac5a9 100644 --- a/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/api/HyperbahnClient.java +++ b/tchannel-hyperbahn/src/main/java/com/uber/tchannel/hyperbahn/api/HyperbahnClient.java @@ -26,12 +26,14 @@ import java.io.IOException; import java.io.Reader; import java.net.InetSocketAddress; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.concurrent.ExecutionException; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import com.google.common.util.concurrent.ListenableFuture; @@ -39,84 +41,114 @@ import com.uber.tchannel.api.Request; import com.uber.tchannel.api.Response; import com.uber.tchannel.api.TChannel; +import com.uber.tchannel.api.errors.TChannelError; import com.uber.tchannel.hyperbahn.messages.AdvertiseRequest; import com.uber.tchannel.hyperbahn.messages.AdvertiseResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HyperbahnClient { - - private static final String HOSTS_FILE_PATH = "/etc/uber/hyperbahn/hosts.json"; +public final class HyperbahnClient { private static final String HYPERBAHN_SERVICE_NAME = "hyperbahn"; private static final String HYPERBAHN_ADVERTISE_ENDPOINT = "ad"; - - private final TChannel tchannel; + public AtomicBoolean destroyed = new AtomicBoolean(false); private final Logger logger = LoggerFactory.getLogger(HyperbahnClient.class); - private List routers; - - public HyperbahnClient(TChannel tchannel) throws IOException { - this(tchannel, HOSTS_FILE_PATH); - } - - public HyperbahnClient(TChannel tchannel, String hostsFilePath) throws IOException { - this.tchannel = tchannel; - this.routers = HyperbahnClient.loadRouters(hostsFilePath); - } - - private static List loadRouters(String hostsFilePath) throws IOException { - - List hostPorts; - try (Reader reader = new FileReader(hostsFilePath)) { - hostPorts = new Gson().fromJson(reader, List.class); - } - - List routers = new LinkedList<>(); - for (String hostPort : hostPorts) { - String[] hostPortPair = hostPort.split(Pattern.quote(":")); - String host = hostPortPair[0]; - int port = Integer.parseInt(hostPortPair[1]); - InetSocketAddress router = new InetSocketAddress(host, port); - routers.add(router); - } - - return routers; + private final String service; + private final TChannel tchannel; + private final List routers; + private final long advertiseTimeout; + private final long advertiseInterval; + + private Timer advertiseTimer = new Timer(true); + + private HyperbahnClient(Builder builder) { + this.service = builder.service; + this.tchannel = builder.channel; + this.routers = builder.routers; + this.advertiseTimeout = builder.advertiseTimeout; + this.advertiseInterval = builder.advertiseInterval; } - public Response advertise( - String service, - int cost - ) throws InterruptedException, TimeoutException, ExecutionException { + public ListenableFuture> advertise() + throws InterruptedException, TChannelError { final AdvertiseRequest advertiseRequest = new AdvertiseRequest(); - advertiseRequest.addService(service, cost); + advertiseRequest.addService(service, 0); + // TODO: options for timeout, hard fail, etc. final Request request = new Request.Builder<>( advertiseRequest, HYPERBAHN_SERVICE_NAME, HYPERBAHN_ADVERTISE_ENDPOINT ) - .setTTL(1, TimeUnit.SECONDS) + .setTTL(advertiseTimeout, TimeUnit.SECONDS) .build(); + // TODO: should be part of tchannel peer selection final InetSocketAddress router = this.routers.get(new Random().nextInt(this.routers.size())); + ListenableFuture> responseFuture = null; + try { + responseFuture = this.tchannel.callJSON( + router.getAddress(), + router.getPort(), + request, + AdvertiseResponse.class + ); + } catch (TChannelError ex) { + // TODO: should be moved into tchanne as retry ... + this.logger.error("Advertise failure: " + ex.toString()); + + // re-throw for now + // TODO: should really handle internally + throw ex; + } - ListenableFuture> responseFuture = this.tchannel.callJSON( - router.getAddress(), - router.getPort(), - request, - AdvertiseResponse.class - ); + responseFuture.addListener(new Runnable() { + @Override + public void run() { + if (destroyed.get()) { + return; + } - return responseFuture.get(2, TimeUnit.SECONDS); + scheduleAdvertise(); + } + }, new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }); + + return responseFuture; } - public void stopAdvertising() { + private void scheduleAdvertise() { + if (destroyed.get()) { + return; + } + advertiseTimer.schedule(new TimerTask() { + @Override + public void run() { + try { + ListenableFuture> responseFuture = advertise(); + } catch (Exception ex) { + logger.error(service + " failed to advertise. " + ex.getMessage()); + } + } + }, advertiseInterval); + } + + public void stopAdvertising() { + advertiseTimer.cancel(); } public void shutdown() throws InterruptedException { + if (!destroyed.compareAndSet(false, true)) { + return; + } + this.logger.info("Shutting down HyperbahnClient and TChannel."); this.stopAdvertising(); this.tchannel.shutdown(); @@ -124,4 +156,72 @@ public void shutdown() throws InterruptedException { this.logger.info("HyperbahnClient shutdown complete."); } + public static class Builder { + + private final String service; + private final TChannel channel; + + private List routers; + private long advertiseTimeout = 5000; + private long advertiseInterval = 60 * 1000; + + public Builder(String service, TChannel channel) { + if (service == null) { + throw new NullPointerException("`service` cannot be null"); + } + + if (channel == null) { + throw new NullPointerException("`channel` cannot be null"); + } + + this.service = service; + this.channel = channel; + } + + public Builder setAdvertiseTimeout(long advertiseTimeout) { + this.advertiseTimeout = advertiseTimeout; + return this; + } + + public Builder advertiseInterval(long advertiseInterval) { + this.advertiseInterval = advertiseInterval; + return this; + } + + public Builder setRouters(List routers) { + this.routers = routers; + return this; + } + + public Builder setRouterFile(String routerFile) throws IOException { + this.routers = loadRouters(routerFile); + return this; + } + + private static List loadRouters(String hostsFilePath) throws IOException { + + List hostPorts; + try (Reader reader = new FileReader(hostsFilePath)) { + hostPorts = new Gson().fromJson(reader, List.class); + } + + List routers = new ArrayList<>(); + for (String hostPort : hostPorts) { + String[] hostPortPair = hostPort.split(Pattern.quote(":")); + String host = hostPortPair[0]; + int port = Integer.parseInt(hostPortPair[1]); + InetSocketAddress router = new InetSocketAddress(host, port); + routers.add(router); + } + + return routers; + + } + + public HyperbahnClient build() { + return new HyperbahnClient(this); + } + + } + } diff --git a/tchannel-hyperbahn/src/test/java/com/uber/tchannel/hyperbahn/api/HyperbahnClientTest.java b/tchannel-hyperbahn/src/test/java/com/uber/tchannel/hyperbahn/api/HyperbahnClientTest.java index 36ffeb4..55aaaf1 100644 --- a/tchannel-hyperbahn/src/test/java/com/uber/tchannel/hyperbahn/api/HyperbahnClientTest.java +++ b/tchannel-hyperbahn/src/test/java/com/uber/tchannel/hyperbahn/api/HyperbahnClientTest.java @@ -22,15 +22,72 @@ package com.uber.tchannel.hyperbahn.api; +import com.google.common.util.concurrent.ListenableFuture; +import com.uber.tchannel.api.Request; +import com.uber.tchannel.api.Response; +import com.uber.tchannel.api.ResponseCode; +import com.uber.tchannel.api.handlers.JSONRequestHandler; +import io.netty.channel.ChannelFuture; +import org.junit.Test; + import com.uber.tchannel.api.TChannel; +import com.uber.tchannel.hyperbahn.messages.AdvertiseRequest; +import com.uber.tchannel.hyperbahn.messages.AdvertiseResponse; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class HyperbahnClientTest { - @org.junit.Test + @Test public void testAdvertise() throws Exception { + AdvertiseResponseHandler responseHandler = new AdvertiseResponseHandler(); + TChannel server = createMockHyperbahn(responseHandler); + TChannel tchannel = new TChannel.Builder("hyperbahn-service").build(); - HyperbahnClient hyperbahnClient = new HyperbahnClient(tchannel); + List routers = new ArrayList() {{ + add(new InetSocketAddress("127.0.0.1", 8888)); + }}; + + HyperbahnClient hyperbahnClient = new HyperbahnClient.Builder("service", tchannel) + .setRouters(routers) + .build(); + ListenableFuture> responseFuture = hyperbahnClient.advertise(); + + Response response = responseFuture.get(1000, TimeUnit.MILLISECONDS); + + assertNotNull(response); + assertEquals(responseHandler.requestReceived, true); + + tchannel.shutdown(); + server.shutdown(); + } + + public static TChannel createMockHyperbahn(AdvertiseResponseHandler adHandler) throws Exception { + final TChannel server = new TChannel.Builder("autobahn") + .register("ad", adHandler) + .setServerHost(InetAddress.getByName("127.0.0.1")) + .setServerPort(8888) + .build(); + + ChannelFuture f = server.listen(); + return server; + } + + public class AdvertiseResponseHandler extends JSONRequestHandler { + public boolean requestReceived = false; + @Override + public Response handleImpl(Request request) { + requestReceived = true; + return new Response.Builder<>(new AdvertiseResponse(10), request.getEndpoint(), ResponseCode.OK).build(); + } } }