Skip to content

Commit

Permalink
Merge pull request #115 from uber/perf
Browse files Browse the repository at this point in the history
Perf improvement: unblocking call, aggregated flush, and shared thread pool
  • Loading branch information
ShanniLi committed Dec 5, 2015
2 parents d7036e8 + 6d372e7 commit a38de37
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 56 deletions.
5 changes: 5 additions & 0 deletions tchannel-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ THE POSSIBILITY OF SUCH DAMAGE.
<artifactId>tchannel-core</artifactId>
<version>0.2.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.13</version>
</dependency>
</dependencies>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
import com.uber.tchannel.messages.Response;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.LogManager;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -68,25 +73,30 @@ public class LargePayloadBenchmark {
private InetAddress host;
private ByteBuf payload;

// @Param({ "0", "1", "10" })
@Param({ "0"})
private int sleepTime;
private NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
private NioEventLoopGroup childGroup = new NioEventLoopGroup();

public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(".*" + LargePayloadBenchmark.class.getSimpleName() + ".*")
.warmupIterations(5)
.measurementIterations(10)
.warmupIterations(30)
.measurementIterations(50)
.forks(1)
.build();
new Runner(options).run();
}

@Setup(Level.Trial)
public void setup() throws Exception {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
BasicConfigurator.configure();
LogManager.getRootLogger().setLevel(org.apache.log4j.Level.INFO);

this.host = InetAddress.getByName("127.0.0.1");
this.channel = new TChannel.Builder("ping-server")
.setServerHost(host)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
channel.makeSubChannel("ping-server").register("ping", new PingDefaultRequestHandler());
channel.listen();
Expand All @@ -95,6 +105,8 @@ public void setup() throws Exception {
this.client = new TChannel.Builder("ping-client")
// .setResetOnTimeoutLimit(100)
.setClientMaxPendingRequests(200000)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
this.subClient = this.client.makeSubChannel("ping-server");
this.client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
import com.uber.tchannel.channels.Connection;
import com.uber.tchannel.messages.JsonRequest;
import com.uber.tchannel.messages.JsonResponse;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.LogManager;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -50,6 +55,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Thread.sleep;
Expand All @@ -58,25 +64,36 @@
public class PingPongMultiServerBenchmark {

private List<TChannel> servers = new ArrayList<>();
TChannel client;
SubChannel subClient;
private TChannel client;
private SubChannel subClient;

int connections = 2;
private int connections = 3;

private NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
private NioEventLoopGroup childGroup = new NioEventLoopGroup();

public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(".*" + PingPongMultiServerBenchmark.class.getSimpleName() + ".*")
.warmupIterations(5)
.measurementIterations(10)
.warmupIterations(30)
.measurementIterations(50)
.forks(1)
.build();
new Runner(options).run();
}

@Setup(Level.Trial)
public void setup() throws Exception {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
BasicConfigurator.configure();
LogManager.getRootLogger().setLevel(org.apache.log4j.Level.INFO);

createServers();
this.client = new TChannel.Builder("ping-client").build();
this.client = new TChannel.Builder("ping-client")
.setClientMaxPendingRequests(150000)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
this.subClient = this.client.makeSubChannel("ping-server");
List<InetSocketAddress> peers = new ArrayList<>();
List<Connection> conns = new ArrayList<>();
Expand All @@ -97,6 +114,8 @@ protected void createServers() throws Exception {
for (int i = 0; i < connections; i++) {
TChannel server = new TChannel.Builder("ping-server")
.setServerHost(InetAddress.getByName("127.0.0.1"))
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
server.makeSubChannel("ping-server").register("ping", new PingDefaultRequestHandler());
server.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
import com.uber.tchannel.api.handlers.JSONRequestHandler;
import com.uber.tchannel.messages.JsonRequest;
import com.uber.tchannel.messages.JsonResponse;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.LogManager;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -46,6 +51,7 @@
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.net.InetAddress;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Thread.sleep;
Expand All @@ -59,29 +65,40 @@ public class PingPongServerBenchmark {
private int port;
private InetAddress host;

private NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
private NioEventLoopGroup childGroup = new NioEventLoopGroup();

public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(".*" + PingPongServerBenchmark.class.getSimpleName() + ".*")
.warmupIterations(5)
.measurementIterations(150)
.warmupIterations(30)
.measurementIterations(50)
.forks(1)
.build();
new Runner(options).run();
}

@Setup(Level.Trial)
public void setup() throws Exception {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
BasicConfigurator.configure();
LogManager.getRootLogger().setLevel(org.apache.log4j.Level.INFO);

this.host = InetAddress.getByName("127.0.0.1");
this.channel = new TChannel.Builder("ping-server")
.setServerHost(host)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
channel.makeSubChannel("ping-server").register("ping", new PingDefaultRequestHandler());
channel.listen();
this.port = this.channel.getListeningPort();

this.client = new TChannel.Builder("ping-client")
// .setResetOnTimeoutLimit(100)
.setClientMaxPendingRequests(200000)
.setClientMaxPendingRequests(100000)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
this.subClient = this.client.makeSubChannel("ping-server");
this.client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public int getClientMaxPendingRequests() {
public static class Builder {

private final HashedWheelTimer timer;
private ExecutorService executorService;
private static ExecutorService executorService = new ForkJoinPool();
private EventLoopGroup bossGroup;
private EventLoopGroup childGroup;

Expand All @@ -216,13 +216,12 @@ public Builder(String service) {
}

timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS);
executorService = new ForkJoinPool();
bossGroup = new NioEventLoopGroup(1);
childGroup = new NioEventLoopGroup();
}

public Builder setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
Builder.executorService = executorService;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ public void setChannelFuture(ChannelFuture channelFuture) {
}

public void flushWrite() {
if (channelFuture == null) {
return;
}

try {
this.channelFuture.sync();
} catch (InterruptedException ie) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@
import com.uber.tchannel.headers.ArgScheme;
import com.uber.tchannel.headers.TransportHeaders;
import com.uber.tchannel.messages.Request;
import com.uber.tchannel.messages.Response;
import com.uber.tchannel.messages.ResponseMessage;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.uber.tchannel.frames.ErrorFrame.sendError;

Expand All @@ -56,7 +58,8 @@ public class RequestRouter extends SimpleChannelInboundHandler<Request> {

private final ListeningExecutorService listeningExecutorService;

private final List<ResponseMessage> responseQueue = new LinkedList<ResponseMessage>();
private final AtomicBoolean busy = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<Response> responseQueue = new ConcurrentLinkedQueue<>();

public RequestRouter(TChannel topChannel, ExecutorService executorService) {
this.topChannel = topChannel;
Expand Down Expand Up @@ -123,24 +126,19 @@ protected void messageReceived(final ChannelHandlerContext ctx, final Request re
}

// Handle the request in a separate thread and get a future to it
ListenableFuture<ResponseMessage> responseFuture = listeningExecutorService.submit(
ListenableFuture<Response> responseFuture = listeningExecutorService.submit(
new CallableHandler(handler, request));

Futures.addCallback(responseFuture, new FutureCallback<ResponseMessage>() {
Futures.addCallback(responseFuture, new FutureCallback<Response>() {
@Override
public void onSuccess(ResponseMessage response) {
public void onSuccess(Response response) {
if (!ctx.channel().isActive()) {
response.release();
return;
}

if (!ctx.channel().isWritable()) {
synchronized (responseQueue) {
responseQueue.add(response);
}
} else {
ctx.writeAndFlush(response);
}
responseQueue.offer(response);
sendResponse(ctx);
}

@Override
Expand All @@ -156,15 +154,38 @@ public void onFailure(Throwable throwable) {

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
if (!ctx.channel().isWritable()) {
sendResponse(ctx);
}

protected void sendResponse(ChannelHandlerContext ctx) {

if (!busy.compareAndSet(false, true)) {
return;
}

synchronized (responseQueue) {
while (!responseQueue.isEmpty() && ctx.channel().isWritable()) {
ResponseMessage res = responseQueue.remove(0);
ctx.writeAndFlush(res);
Channel channel = ctx.channel();
try {
boolean flush = false;
while (channel.isWritable()) {
Response res = responseQueue.poll();
if (res == null) {
break;
}

channel.write(res);
flush = true;
}

if (flush) {
channel.flush();
}
} finally {
busy.set(false);
}

// in case there are new response added
if (channel.isWritable() && !responseQueue.isEmpty()) {
sendResponse(ctx);
}
}

Expand All @@ -173,16 +194,14 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);

// clean up the queue
synchronized (responseQueue) {
while (!responseQueue.isEmpty()) {
ResponseMessage res = responseQueue.remove(0);
res.release();
}
while (!responseQueue.isEmpty()) {
ResponseMessage res = responseQueue.poll();
res.release();
}
}


private class CallableHandler implements Callable<ResponseMessage> {
private class CallableHandler implements Callable<Response> {
private final Request request;
private final RequestHandler handler;

Expand All @@ -192,8 +211,8 @@ public CallableHandler(RequestHandler handler, Request request) {
}

@Override
public ResponseMessage call() throws Exception {
ResponseMessage response = handler.handle(request);
public Response call() throws Exception {
Response response = handler.handle(request);
request.release();
return response;
}
Expand Down
Loading

0 comments on commit a38de37

Please sign in to comment.