Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf improvement: unblocking call, aggregated flush, and shared thread pool #115

Merged
merged 4 commits into from
Dec 5, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions tchannel-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ THE POSSIBILITY OF SUCH DAMAGE.
<artifactId>tchannel-core</artifactId>
<version>0.2.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.13</version>
</dependency>
</dependencies>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
import com.uber.tchannel.messages.Response;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.LogManager;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -68,25 +73,30 @@ public class LargePayloadBenchmark {
private InetAddress host;
private ByteBuf payload;

// @Param({ "0", "1", "10" })
@Param({ "0"})
private int sleepTime;
private NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
private NioEventLoopGroup childGroup = new NioEventLoopGroup();

public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(".*" + LargePayloadBenchmark.class.getSimpleName() + ".*")
.warmupIterations(5)
.measurementIterations(10)
.warmupIterations(30)
.measurementIterations(50)
.forks(1)
.build();
new Runner(options).run();
}

@Setup(Level.Trial)
public void setup() throws Exception {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
BasicConfigurator.configure();
LogManager.getRootLogger().setLevel(org.apache.log4j.Level.INFO);

this.host = InetAddress.getByName("127.0.0.1");
this.channel = new TChannel.Builder("ping-server")
.setServerHost(host)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
channel.makeSubChannel("ping-server").register("ping", new PingDefaultRequestHandler());
channel.listen();
Expand All @@ -95,6 +105,8 @@ public void setup() throws Exception {
this.client = new TChannel.Builder("ping-client")
// .setResetOnTimeoutLimit(100)
.setClientMaxPendingRequests(200000)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
this.subClient = this.client.makeSubChannel("ping-server");
this.client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
import com.uber.tchannel.channels.Connection;
import com.uber.tchannel.messages.JsonRequest;
import com.uber.tchannel.messages.JsonResponse;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.LogManager;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -50,6 +55,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Thread.sleep;
Expand All @@ -58,25 +64,36 @@
public class PingPongMultiServerBenchmark {

private List<TChannel> servers = new ArrayList<>();
TChannel client;
SubChannel subClient;
private TChannel client;
private SubChannel subClient;

int connections = 2;
private int connections = 3;

private NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
private NioEventLoopGroup childGroup = new NioEventLoopGroup();

public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(".*" + PingPongMultiServerBenchmark.class.getSimpleName() + ".*")
.warmupIterations(5)
.measurementIterations(10)
.warmupIterations(30)
.measurementIterations(50)
.forks(1)
.build();
new Runner(options).run();
}

@Setup(Level.Trial)
public void setup() throws Exception {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
BasicConfigurator.configure();
LogManager.getRootLogger().setLevel(org.apache.log4j.Level.INFO);

createServers();
this.client = new TChannel.Builder("ping-client").build();
this.client = new TChannel.Builder("ping-client")
.setClientMaxPendingRequests(150000)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
this.subClient = this.client.makeSubChannel("ping-server");
List<InetSocketAddress> peers = new ArrayList<>();
List<Connection> conns = new ArrayList<>();
Expand All @@ -97,6 +114,8 @@ protected void createServers() throws Exception {
for (int i = 0; i < connections; i++) {
TChannel server = new TChannel.Builder("ping-server")
.setServerHost(InetAddress.getByName("127.0.0.1"))
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
server.makeSubChannel("ping-server").register("ping", new PingDefaultRequestHandler());
server.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
import com.uber.tchannel.api.handlers.JSONRequestHandler;
import com.uber.tchannel.messages.JsonRequest;
import com.uber.tchannel.messages.JsonResponse;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.LogManager;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -46,6 +51,7 @@
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.net.InetAddress;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Thread.sleep;
Expand All @@ -59,29 +65,40 @@ public class PingPongServerBenchmark {
private int port;
private InetAddress host;

private NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
private NioEventLoopGroup childGroup = new NioEventLoopGroup();

public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(".*" + PingPongServerBenchmark.class.getSimpleName() + ".*")
.warmupIterations(5)
.measurementIterations(150)
.warmupIterations(30)
.measurementIterations(50)
.forks(1)
.build();
new Runner(options).run();
}

@Setup(Level.Trial)
public void setup() throws Exception {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
BasicConfigurator.configure();
LogManager.getRootLogger().setLevel(org.apache.log4j.Level.INFO);

this.host = InetAddress.getByName("127.0.0.1");
this.channel = new TChannel.Builder("ping-server")
.setServerHost(host)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
channel.makeSubChannel("ping-server").register("ping", new PingDefaultRequestHandler());
channel.listen();
this.port = this.channel.getListeningPort();

this.client = new TChannel.Builder("ping-client")
// .setResetOnTimeoutLimit(100)
.setClientMaxPendingRequests(200000)
.setClientMaxPendingRequests(100000)
.setBossGroup(bossGroup)
.setChildGroup(childGroup)
.build();
this.subClient = this.client.makeSubChannel("ping-server");
this.client.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public int getClientMaxPendingRequests() {
public static class Builder {

private final HashedWheelTimer timer;
private ExecutorService executorService;
private static ExecutorService executorService = new ForkJoinPool();
private EventLoopGroup bossGroup;
private EventLoopGroup childGroup;

Expand All @@ -216,13 +216,12 @@ public Builder(String service) {
}

timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS);
executorService = new ForkJoinPool();
bossGroup = new NioEventLoopGroup(1);
childGroup = new NioEventLoopGroup();
}

public Builder setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
Builder.executorService = executorService;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ public void setChannelFuture(ChannelFuture channelFuture) {
}

public void flushWrite() {
if (channelFuture == null) {
return;
}

try {
this.channelFuture.sync();
} catch (InterruptedException ie) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@
import com.uber.tchannel.headers.ArgScheme;
import com.uber.tchannel.headers.TransportHeaders;
import com.uber.tchannel.messages.Request;
import com.uber.tchannel.messages.Response;
import com.uber.tchannel.messages.ResponseMessage;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.uber.tchannel.frames.ErrorFrame.sendError;

Expand All @@ -56,7 +58,8 @@ public class RequestRouter extends SimpleChannelInboundHandler<Request> {

private final ListeningExecutorService listeningExecutorService;

private final List<ResponseMessage> responseQueue = new LinkedList<ResponseMessage>();
private final AtomicBoolean busy = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<Response> responseQueue = new ConcurrentLinkedQueue<>();

public RequestRouter(TChannel topChannel, ExecutorService executorService) {
this.topChannel = topChannel;
Expand Down Expand Up @@ -123,24 +126,19 @@ protected void messageReceived(final ChannelHandlerContext ctx, final Request re
}

// Handle the request in a separate thread and get a future to it
ListenableFuture<ResponseMessage> responseFuture = listeningExecutorService.submit(
ListenableFuture<Response> responseFuture = listeningExecutorService.submit(
new CallableHandler(handler, request));

Futures.addCallback(responseFuture, new FutureCallback<ResponseMessage>() {
Futures.addCallback(responseFuture, new FutureCallback<Response>() {
@Override
public void onSuccess(ResponseMessage response) {
public void onSuccess(Response response) {
if (!ctx.channel().isActive()) {
response.release();
return;
}

if (!ctx.channel().isWritable()) {
synchronized (responseQueue) {
responseQueue.add(response);
}
} else {
ctx.writeAndFlush(response);
}
responseQueue.offer(response);
sendResponse(ctx);
}

@Override
Expand All @@ -156,15 +154,38 @@ public void onFailure(Throwable throwable) {

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
if (!ctx.channel().isWritable()) {
sendResponse(ctx);
}

protected void sendResponse(ChannelHandlerContext ctx) {

if (!busy.compareAndSet(false, true)) {
return;
}

synchronized (responseQueue) {
while (!responseQueue.isEmpty() && ctx.channel().isWritable()) {
ResponseMessage res = responseQueue.remove(0);
ctx.writeAndFlush(res);
Channel channel = ctx.channel();
try {
boolean flush = false;
while (channel.isWritable()) {
Response res = responseQueue.poll();
if (res == null) {
break;
}

channel.write(res);
flush = true;
}

if (flush) {
channel.flush();
}
} finally {
busy.set(false);
}

// in case there are new response added
if (channel.isWritable() && !responseQueue.isEmpty()) {
sendResponse(ctx);
}
}

Expand All @@ -173,16 +194,14 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);

// clean up the queue
synchronized (responseQueue) {
while (!responseQueue.isEmpty()) {
ResponseMessage res = responseQueue.remove(0);
res.release();
}
while (!responseQueue.isEmpty()) {
ResponseMessage res = responseQueue.poll();
res.release();
}
}


private class CallableHandler implements Callable<ResponseMessage> {
private class CallableHandler implements Callable<Response> {
private final Request request;
private final RequestHandler handler;

Expand All @@ -192,8 +211,8 @@ public CallableHandler(RequestHandler handler, Request request) {
}

@Override
public ResponseMessage call() throws Exception {
ResponseMessage response = handler.handle(request);
public Response call() throws Exception {
Response response = handler.handle(request);
request.release();
return response;
}
Expand Down
Loading