diff --git a/fixtures/ci-teleport-rbac/ci-teleport.yaml b/fixtures/ci-teleport-rbac/ci-teleport.yaml index 8d941dc36181..15dc0721cc2d 100644 --- a/fixtures/ci-teleport-rbac/ci-teleport.yaml +++ b/fixtures/ci-teleport-rbac/ci-teleport.yaml @@ -46,7 +46,7 @@ rules: resourceNames: ["test-pod"] - apiGroups: [""] resources: ["pods/portforward"] - verbs: ["create"] + verbs: ["create", "get"] resourceNames: ["test-pod"] - apiGroups: [""] resources: ["pods/ephemeralcontainers"] diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go index 574274e96ab9..19a0e8451d7b 100644 --- a/integration/kube_integration_test.go +++ b/integration/kube_integration_test.go @@ -507,34 +507,6 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) { }) require.NoError(t, err) - // forward local port to target port 80 of the nginx container - localPort := newPortValue() - - forwarder, err := newPortForwarder(proxyClientConfig, kubePortForwardArgs{ - ports: []string{fmt.Sprintf("%v:80", localPort)}, - podName: testPod, - podNamespace: testNamespace, - }) - require.NoError(t, err) - - forwarderCh := make(chan error) - go func() { forwarderCh <- forwarder.ForwardPorts() }() - defer func() { - assert.NoError(t, <-forwarderCh, "Forward ports exited with error") - }() - - select { - case <-time.After(5 * time.Second): - t.Fatalf("Timeout waiting for port forwarding.") - case <-forwarder.readyC: - } - defer close(forwarder.stopC) - - resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort)) - require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.StatusCode) - require.NoError(t, resp.Body.Close()) - // impersonating client requests will bse denied _, impersonatingProxyClientConfig, err := kube.ProxyClient(kube.ProxyConfig{ T: teleport, @@ -544,18 +516,72 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) { }) require.NoError(t, err) - localPort = newPortValue() - impersonatingForwarder, err := newPortForwarder(impersonatingProxyClientConfig, kubePortForwardArgs{ - ports: []string{fmt.Sprintf("%v:80", localPort)}, - podName: testPod, - podNamespace: testNamespace, - }) - require.NoError(t, err) + tests := []struct { + name string + builder func(*rest.Config, kubePortForwardArgs) (*kubePortForwarder, error) + }{ + { + name: "SPDY portForwarder", + builder: newPortForwarder, + }, + { + name: "SPDY over Websocket portForwarder", + builder: newPortForwarderSPDYOverWebsocket, + }, + } + + for _, tt := range tests { + t.Run(tt.name, + func(t *testing.T) { + // forward local port to target port 80 of the nginx container + listener, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, listener.Close()) + }) + + localPort := listener.Addr().(*net.TCPAddr).Port + + forwarder, err := tt.builder(proxyClientConfig, kubePortForwardArgs{ + ports: []string{fmt.Sprintf("%v:80", localPort)}, + podName: testPod, + podNamespace: testNamespace, + }) + require.NoError(t, err) + + forwarderCh := make(chan error) + go func() { forwarderCh <- forwarder.ForwardPorts() }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("Timeout waiting for port forwarding.") + case <-forwarder.readyC: + } + t.Cleanup(func() {}) + + resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort)) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + close(forwarder.stopC) + require.NoError(t, <-forwarderCh, "Forward ports exited with error") + + impersonatingForwarder, err := tt.builder(impersonatingProxyClientConfig, kubePortForwardArgs{ + ports: []string{fmt.Sprintf("%v:80", localPort)}, + podName: testPod, + podNamespace: testNamespace, + }) + require.NoError(t, err) + + // This request should be denied + err = impersonatingForwarder.ForwardPorts() + require.Error(t, err) + require.Regexp(t, ".*impersonation request has been denied.*|.*403 Forbidden.*", err.Error()) + }, + ) + } - // This request should be denied - err = impersonatingForwarder.ForwardPorts() - require.Error(t, err) - require.Regexp(t, ".*impersonation request has been denied.*", err.Error()) } // TestKubeTrustedClustersClientCert tests scenario with trusted clusters @@ -1744,6 +1770,27 @@ type kubePortForwarder struct { readyC chan struct{} } +func newPortForwarderSPDYOverWebsocket(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) { + u, err := url.Parse(kubeConfig.Host) + if err != nil { + return nil, trace.Wrap(err) + } + u.Scheme = "https" + u.Path = fmt.Sprintf("/api/v1/namespaces/%v/pods/%v/portforward", args.podNamespace, args.podName) + + tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(u, kubeConfig) + if err != nil { + return nil, trace.Wrap(err) + } + + stopC, readyC := make(chan struct{}), make(chan struct{}) + fwd, err := portforward.New(tunnelingDialer, args.ports, stopC, readyC, nil, nil) + if err != nil { + return nil, trace.Wrap(err) + } + return &kubePortForwarder{PortForwarder: fwd, stopC: stopC, readyC: readyC}, nil +} + func newPortForwarder(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) { u, err := url.Parse(kubeConfig.Host) if err != nil { diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index 544952da0e6b..0648a80a1a67 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -49,6 +49,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/httpstream" + httpstreamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/client-go/tools/remotecommand" @@ -1791,10 +1792,14 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req // Go client uses SPDY while other clients still require WebSockets. // This function will run until the end of the execution of the request. func runPortForwarding(req portForwardRequest) error { - if wsstream.IsWebSocketRequest(req.httpRequest) { + switch { + case wsstream.IsWebSocketRequestWithTunnelingProtocol(req.httpRequest): + return trace.Wrap(runPortForwardingTunneledHTTPStreams(req)) + case wsstream.IsWebSocketRequest(req.httpRequest): return trace.Wrap(runPortForwardingWebSocket(req)) + default: + return trace.Wrap(runPortForwardingHTTPStreams(req)) } - return trace.Wrap(runPortForwardingHTTPStreams(req)) } const ( @@ -2071,6 +2076,7 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http return nil, trace.Wrap(err) } + req = createSPDYRequest(req, PortForwardProtocolV1Name) upgradeRoundTripper := NewSpdyRoundTripperWithDialer(roundTripperConfig{ ctx: req.Context(), sess: sess, @@ -2096,6 +2102,27 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http return spdy.NewDialer(upgradeRoundTripper, client, req.Method, req.URL), nil } +// createSPDYRequest modifies the passed request to remove +// WebSockets headers and add SPDY upgrade information, including +// spdy protocols acceptable to the client. +func createSPDYRequest(req *http.Request, spdyProtocols ...string) *http.Request { + clone := req.Clone(req.Context()) + // Clean up the websocket headers from the http request. + clone.Header.Del(wsstream.WebSocketProtocolHeader) + clone.Header.Del("Sec-Websocket-Key") + clone.Header.Del("Sec-Websocket-Version") + clone.Header.Del(httpstream.HeaderUpgrade) + // Update the http request for an upstream SPDY upgrade. + clone.Method = "POST" + clone.Body = nil // Remove the request body which is unused. + clone.Header.Set(httpstream.HeaderUpgrade, httpstreamspdy.HeaderSpdy31) + clone.Header.Del(httpstream.HeaderProtocolVersion) + for i := range spdyProtocols { + clone.Header.Add(httpstream.HeaderProtocolVersion, spdyProtocols[i]) + } + return clone +} + // clusterSession contains authenticated user session to the target cluster: // x509 short lived credentials, forwarding proxies and other data type clusterSession struct { diff --git a/lib/kube/proxy/portforward_websocket.go b/lib/kube/proxy/portforward_websocket.go index ec3e54606c2d..d4ffcdc4b329 100644 --- a/lib/kube/proxy/portforward_websocket.go +++ b/lib/kube/proxy/portforward_websocket.go @@ -27,10 +27,14 @@ import ( "strings" "sync" + gwebsocket "github.com/gorilla/websocket" "github.com/gravitational/trace" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/httpstream" + spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" + portforwardconstants "k8s.io/apimachinery/pkg/util/portforward" + "k8s.io/client-go/tools/portforward" "github.com/gravitational/teleport" "github.com/gravitational/teleport/lib/events" @@ -297,3 +301,59 @@ func (h *websocketPortforwardHandler) forwardStreamPair(p *websocketChannelPair) h.Debugf("Port forwarding pair completed.") } + +// runPortForwardingTunneledHTTPStreams handles a port-forwarding request that uses SPDY protocol +// over WebSockets. +func runPortForwardingTunneledHTTPStreams(req portForwardRequest) error { + targetConn, _, err := req.targetDialer.Dial(PortForwardProtocolV1Name) + if err != nil { + return trace.Wrap(err, "error upgrading target connection") + } + defer targetConn.Close() + + // Try to upgrade the websocket connection. + // Beyond this point, we don't need to write errors to the response. + upgrader := gwebsocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + Subprotocols: []string{portforwardconstants.WebsocketsSPDYTunnelingPortForwardV1}, + } + conn, err := upgrader.Upgrade(req.httpResponseWriter, req.httpRequest, nil) + if err != nil { + return trace.Wrap(err) + } + + tunneledConn := portforward.NewTunnelingConnection("server", conn) + + streamChan := make(chan httpstream.Stream, 1) + spdyConn, err := spdystream.NewServerConnectionWithPings( + tunneledConn, + httpStreamReceived(req.context, streamChan), + req.pingPeriod, + ) + if err != nil { + return trace.Wrap(err, "error upgrading connection") + } + + if conn == nil { + return trace.ConnectionProblem(nil, "Unable to upgrade websocket connection") + } + defer conn.Close() + + h := &portForwardProxy{ + Entry: logrus.WithFields(logrus.Fields{ + teleport.ComponentKey: teleport.Component(teleport.ComponentProxyKube), + events.RemoteAddr: req.httpRequest.RemoteAddr, + }), + portForwardRequest: req, + sourceConn: spdyConn, + streamChan: streamChan, + streamPairs: make(map[string]*httpStreamPair), + streamCreationTimeout: DefaultStreamCreationTimeout, + targetConn: targetConn, + } + defer h.Close() + h.Debugf("Setting port forwarding streaming connection idle timeout to %v", IdleTimeout) + spdyConn.SetIdleTimeout(IdleTimeout) + h.run() + return nil +}