Skip to content

Commit

Permalink
#66 I think HTTPS is almost working. Tunnel seems to be established b…
Browse files Browse the repository at this point in the history
…ut getting 400 response.
  • Loading branch information
robfletcher committed Aug 9, 2013
1 parent dab5b0c commit 3c5f9c2
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package co.freeside.betamax.proxy.netty;

import io.netty.channel.ChannelHandlerContext;

public interface CallbackNotifier {
void onSuccess(ChannelHandlerContext outboundCtx);
void onFailure(ChannelHandlerContext outboundCtx, Throwable cause);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package co.freeside.betamax.proxy.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public final class DirectClientHandler extends ChannelInboundHandlerAdapter {
private static final String name = "DIRECT_CLIENT_HANDLER";

public static String getName() {
return name;
}
private final CallbackNotifier cb;

public DirectClientHandler(CallbackNotifier cb) {
this.cb = cb;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.pipeline().remove(this);
cb.onSuccess(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
cb.onFailure(ctx, throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package co.freeside.betamax.proxy.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public final class DirectClientInitializer extends ChannelInitializer<SocketChannel> {

private final CallbackNotifier callbackNotifier;

public DirectClientInitializer(CallbackNotifier callbackNotifier) {
this.callbackNotifier = callbackNotifier;
}

@Override
public void initChannel(SocketChannel socketChannel) {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast(DirectClientHandler.getName(), new DirectClientHandler(callbackNotifier));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {

public static final int MAX_CONTENT_LENGTH = 65536;

static final String HANDLER_HTTP_DECODER = "decoder";
static final String HANDLER_HTTP_AGGREGATOR = "aggregator";
static final String HANDLER_HTTP_ENCODER = "encoder";
static final String HANDLER_CHUNKED_WRITER = "chunkedWriter";

private final ChannelHandler handler;
private final EventLoopGroup workerGroup;
protected final EventLoopGroup workerGroup;

public HttpChannelInitializer(int workerThreads, ChannelHandler handler) {
this.handler = handler;
Expand All @@ -32,10 +37,10 @@ public HttpChannelInitializer(int workerThreads, ChannelHandler handler) {
@Override
public void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast(HANDLER_HTTP_DECODER, new HttpRequestDecoder());
pipeline.addLast(HANDLER_HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_CONTENT_LENGTH));
pipeline.addLast(HANDLER_HTTP_ENCODER, new HttpResponseEncoder());
pipeline.addLast(HANDLER_CHUNKED_WRITER, new ChunkedWriteHandler());
if (workerGroup == null) {
pipeline.addLast("betamaxHandler", handler);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package co.freeside.betamax.proxy.netty;

import io.netty.buffer.*;
import io.netty.channel.*;

class NettyHelpers {
public static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}

private NettyHelpers() {}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,62 @@
package co.freeside.betamax.proxy.netty;

import java.net.*;
import io.netty.bootstrap.*;
import io.netty.channel.*;
import io.netty.channel.socket.nio.*;
import io.netty.handler.codec.http.*;
import static io.netty.handler.codec.http.HttpMethod.*;
import static io.netty.handler.codec.http.HttpVersion.*;

public class ProxyConnectHandler extends SimpleChannelInboundHandler<HttpRequest> {

private final SocketAddress proxyAddress;
private final Bootstrap bootstrap = new Bootstrap();

public ProxyConnectHandler(SocketAddress proxyAddress) {
this.proxyAddress = proxyAddress;
}

@Override
public boolean acceptInboundMessage(Object message) throws Exception {
System.err.printf("Evaluating %s%n", message);
return super.acceptInboundMessage(message) && CONNECT.equals(((HttpRequest) message).getMethod());
return super.acceptInboundMessage(message) && isConnectRequest((HttpRequest) message);
}

@Override
protected void channelRead0(ChannelHandlerContext context, HttpRequest request) {
System.err.println("I actually got a CONNECT. Now what?");
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED);
context.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
protected void channelRead0(final ChannelHandlerContext context, final HttpRequest request) {
CallbackNotifier callback = new CallbackNotifier() {
@Override
public void onSuccess(final ChannelHandlerContext outboundContext) {
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
context.channel().writeAndFlush(response)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) {
outboundContext.channel().pipeline().addLast(new RelayHandler(context.channel()));
context.channel().pipeline().addLast(new RelayHandler(outboundContext.channel()));
}
});
}

@Override
public void onFailure(ChannelHandlerContext outboundCtx, Throwable cause) {
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_GATEWAY);
context.channel().writeAndFlush(response);
NettyHelpers.closeOnFlush(context.channel());
}
};

final Channel inboundChannel = context.channel();
bootstrap.group(inboundChannel.eventLoop())
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DirectClientInitializer(callback));

bootstrap.connect(proxyAddress);
}

private boolean isConnectRequest(HttpRequest request) {
return CONNECT.equals(request.getMethod());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package co.freeside.betamax.proxy.netty;

import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.util.*;

public final class RelayHandler extends ChannelInboundHandlerAdapter {
private static final String name = "RELAY_HANDLER";

public static String getName() {
return name;
}

private final Channel relayChannel;

public RelayHandler(Channel relayChannel) {
this.relayChannel = relayChannel;
}

@Override
public void channelActive(ChannelHandlerContext context) {
context.writeAndFlush(Unpooled.EMPTY_BUFFER);
}

@Override
public void channelRead(ChannelHandlerContext context, Object message) {
if (relayChannel.isActive()) {
relayChannel.writeAndFlush(message);
} else {
ReferenceCountUtil.release(message);
}
}

@Override
public void channelInactive(ChannelHandlerContext context) {
NettyHelpers.closeOnFlush(relayChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
cause.printStackTrace();
context.close();
}

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package co.freeside.betamax.proxy.netty;

import java.net.*;
import io.netty.channel.*;
import io.netty.channel.socket.*;

public class TunnelingHttpChannelInitializer extends HttpChannelInitializer {

public TunnelingHttpChannelInitializer(int workerThreads, ChannelHandler handler) {
static final String HANDLER_HTTP_CONNECT = "connector";

private final SocketAddress proxyAddress;

public TunnelingHttpChannelInitializer(int workerThreads, ChannelHandler handler, SocketAddress proxyAddress) {
super(workerThreads, handler);
this.proxyAddress = proxyAddress;
}

@Override
public void initChannel(SocketChannel channel) throws Exception {
super.initChannel(channel);

channel.pipeline().addAfter("decoder", "connector", new ProxyConnectHandler());
channel.pipeline().addAfter("betamaxHandler", HANDLER_HTTP_CONNECT, new ProxyConnectHandler(proxyAddress));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ public String getMethod() {
@Override
public URI getUri() {
try {
return new URI(delegate.getUri());
String uri = delegate.getUri();
if (!uri.startsWith("http")) {
uri = "https://" + uri;
}
return new URI(uri);
} catch (URISyntaxException e) {
throw new IllegalStateException("Invalid URI in underlying request", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ class ProxyServer implements HttpInterceptor {
proxyHandler = new BetamaxChannelHandler()
proxyHandler << new DefaultHandlerChain(recorder, newHttpClient())

def standardInitializer = new TunnelingHttpChannelInitializer(0, proxyHandler); // TODO: correct worker threads? After all nothing in Betamax is actually async so we should probably not tie up the main thread
final int sslPort = recorder.proxyPort + 1
def standardInitializer = new TunnelingHttpChannelInitializer(0, proxyHandler, new InetSocketAddress("localhost", sslPort)); // TODO: correct worker threads? After all nothing in Betamax is actually async so we should probably not tie up the main thread
def secureInitializer = new HttpsChannelInitializer(0, proxyHandler); // TODO: correct worker threads? After all nothing in Betamax is actually async so we should probably not tie up the main thread
proxyServer = new NettyBetamaxServer(recorder.proxyPort, recorder.proxyPort + 1, standardInitializer, secureInitializer)
proxyServer = new NettyBetamaxServer(recorder.proxyPort, sslPort, standardInitializer, secureInitializer)
}

@Override
Expand Down

0 comments on commit 3c5f9c2

Please sign in to comment.