From 4c93f7ebce9ee5269b77760d15f9b0115b98e032 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Tue, 17 Sep 2024 09:25:22 +0100 Subject: [PATCH] [kube] support SPDY over websocket protocol SPDY has been deprecated for several years, and most proxies are expected to lose support in the coming months. To address this issue, Kubernetes introduced SPDY over WebSocket connections. This protocol leverages a WebSocket upgrade, but once established, it functions as a simple connection with SPDY-based framing. This pull request (PR) introduces initial support for customer-facing upgrades. Future PRs will add support for teleport-to-teleport communication using the `SPDY/3.1+portforward.k8s.io` protocol. --- fixtures/ci-teleport-rbac/ci-teleport.yaml | 2 +- integration/kube_integration_test.go | 125 ++++++++++++++------- lib/kube/proxy/forwarder.go | 31 ++++- lib/kube/proxy/portforward_websocket.go | 60 ++++++++++ 4 files changed, 176 insertions(+), 42 deletions(-) diff --git a/fixtures/ci-teleport-rbac/ci-teleport.yaml b/fixtures/ci-teleport-rbac/ci-teleport.yaml index a0755caa35bf..e9346147bdd0 100644 --- a/fixtures/ci-teleport-rbac/ci-teleport.yaml +++ b/fixtures/ci-teleport-rbac/ci-teleport.yaml @@ -47,7 +47,7 @@ rules: resourceNames: ["test-pod"] - apiGroups: [""] resources: ["pods/portforward"] - verbs: ["create"] + verbs: ["create", "get"] resourceNames: ["test-pod"] - apiGroups: [""] resources: ["pods/ephemeralcontainers"] diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go index 5f4761a96a87..c6d08bc782a0 100644 --- a/integration/kube_integration_test.go +++ b/integration/kube_integration_test.go @@ -516,34 +516,6 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) { }) require.NoError(t, err) - // forward local port to target port 80 of the nginx container - localPort := newPortValue() - - forwarder, err := newPortForwarder(proxyClientConfig, kubePortForwardArgs{ - ports: []string{fmt.Sprintf("%v:80", localPort)}, - podName: testPod, - podNamespace: testNamespace, - }) - require.NoError(t, err) - - forwarderCh := make(chan error) - go func() { forwarderCh <- forwarder.ForwardPorts() }() - defer func() { - assert.NoError(t, <-forwarderCh, "Forward ports exited with error") - }() - - select { - case <-time.After(5 * time.Second): - t.Fatalf("Timeout waiting for port forwarding.") - case <-forwarder.readyC: - } - defer close(forwarder.stopC) - - resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort)) - require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.StatusCode) - require.NoError(t, resp.Body.Close()) - // impersonating client requests will bse denied _, impersonatingProxyClientConfig, err := kube.ProxyClient(kube.ProxyConfig{ T: teleport, @@ -553,18 +525,72 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) { }) require.NoError(t, err) - localPort = newPortValue() - impersonatingForwarder, err := newPortForwarder(impersonatingProxyClientConfig, kubePortForwardArgs{ - ports: []string{fmt.Sprintf("%v:80", localPort)}, - podName: testPod, - podNamespace: testNamespace, - }) - require.NoError(t, err) + tests := []struct { + name string + builder func(*rest.Config, kubePortForwardArgs) (*kubePortForwarder, error) + }{ + { + name: "SPDY portForwarder", + builder: newPortForwarder, + }, + { + name: "SPDY over Websocket portForwarder", + builder: newPortForwarderSPDYOverWebsocket, + }, + } + + for _, tt := range tests { + t.Run(tt.name, + func(t *testing.T) { + // forward local port to target port 80 of the nginx container + listener, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, listener.Close()) + }) + + localPort := listener.Addr().(*net.TCPAddr).Port + + forwarder, err := tt.builder(proxyClientConfig, kubePortForwardArgs{ + ports: []string{fmt.Sprintf("%v:80", localPort)}, + podName: testPod, + podNamespace: testNamespace, + }) + require.NoError(t, err) + + forwarderCh := make(chan error) + go func() { forwarderCh <- forwarder.ForwardPorts() }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("Timeout waiting for port forwarding.") + case <-forwarder.readyC: + } + t.Cleanup(func() {}) + + resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort)) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + close(forwarder.stopC) + require.NoError(t, <-forwarderCh, "Forward ports exited with error") + + impersonatingForwarder, err := tt.builder(impersonatingProxyClientConfig, kubePortForwardArgs{ + ports: []string{fmt.Sprintf("%v:80", localPort)}, + podName: testPod, + podNamespace: testNamespace, + }) + require.NoError(t, err) + + // This request should be denied + err = impersonatingForwarder.ForwardPorts() + require.Error(t, err) + require.Regexp(t, ".*impersonation request has been denied.*|.*403 Forbidden.*", err.Error()) + }, + ) + } - // This request should be denied - err = impersonatingForwarder.ForwardPorts() - require.Error(t, err) - require.Regexp(t, ".*impersonation request has been denied.*", err.Error()) } // TestKubeTrustedClustersClientCert tests scenario with trusted clusters @@ -1948,6 +1974,27 @@ type kubePortForwarder struct { readyC chan struct{} } +func newPortForwarderSPDYOverWebsocket(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) { + u, err := url.Parse(kubeConfig.Host) + if err != nil { + return nil, trace.Wrap(err) + } + u.Scheme = "https" + u.Path = fmt.Sprintf("/api/v1/namespaces/%v/pods/%v/portforward", args.podNamespace, args.podName) + + tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(u, kubeConfig) + if err != nil { + return nil, trace.Wrap(err) + } + + stopC, readyC := make(chan struct{}), make(chan struct{}) + fwd, err := portforward.New(tunnelingDialer, args.ports, stopC, readyC, nil, nil) + if err != nil { + return nil, trace.Wrap(err) + } + return &kubePortForwarder{PortForwarder: fwd, stopC: stopC, readyC: readyC}, nil +} + func newPortForwarder(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) { u, err := url.Parse(kubeConfig.Host) if err != nil { diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index 21f88ee50b94..5bdb3d71d243 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -50,6 +50,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/httpstream" + httpstreamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/client-go/rest" @@ -1808,10 +1809,14 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req // Go client uses SPDY while other clients still require WebSockets. // This function will run until the end of the execution of the request. func runPortForwarding(req portForwardRequest) error { - if wsstream.IsWebSocketRequest(req.httpRequest) { + switch { + case wsstream.IsWebSocketRequestWithTunnelingProtocol(req.httpRequest): + return trace.Wrap(runPortForwardingTunneledHTTPStreams(req)) + case wsstream.IsWebSocketRequest(req.httpRequest): return trace.Wrap(runPortForwardingWebSocket(req)) + default: + return trace.Wrap(runPortForwardingHTTPStreams(req)) } - return trace.Wrap(runPortForwardingHTTPStreams(req)) } const ( @@ -2164,6 +2169,7 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http return nil, trace.Wrap(err) } + req = createSPDYRequest(req, PortForwardProtocolV1Name) upgradeRoundTripper := NewSpdyRoundTripperWithDialer(roundTripperConfig{ ctx: req.Context(), sess: sess, @@ -2189,6 +2195,27 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http return spdy.NewDialer(upgradeRoundTripper, client, req.Method, req.URL), nil } +// createSPDYRequest modifies the passed request to remove +// WebSockets headers and add SPDY upgrade information, including +// spdy protocols acceptable to the client. +func createSPDYRequest(req *http.Request, spdyProtocols ...string) *http.Request { + clone := req.Clone(req.Context()) + // Clean up the websocket headers from the http request. + clone.Header.Del(wsstream.WebSocketProtocolHeader) + clone.Header.Del("Sec-Websocket-Key") + clone.Header.Del("Sec-Websocket-Version") + clone.Header.Del(httpstream.HeaderUpgrade) + // Update the http request for an upstream SPDY upgrade. + clone.Method = "POST" + clone.Body = nil // Remove the request body which is unused. + clone.Header.Set(httpstream.HeaderUpgrade, httpstreamspdy.HeaderSpdy31) + clone.Header.Del(httpstream.HeaderProtocolVersion) + for i := range spdyProtocols { + clone.Header.Add(httpstream.HeaderProtocolVersion, spdyProtocols[i]) + } + return clone +} + // clusterSession contains authenticated user session to the target cluster: // x509 short lived credentials, forwarding proxies and other data type clusterSession struct { diff --git a/lib/kube/proxy/portforward_websocket.go b/lib/kube/proxy/portforward_websocket.go index ec3e54606c2d..d4ffcdc4b329 100644 --- a/lib/kube/proxy/portforward_websocket.go +++ b/lib/kube/proxy/portforward_websocket.go @@ -27,10 +27,14 @@ import ( "strings" "sync" + gwebsocket "github.com/gorilla/websocket" "github.com/gravitational/trace" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/httpstream" + spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" + portforwardconstants "k8s.io/apimachinery/pkg/util/portforward" + "k8s.io/client-go/tools/portforward" "github.com/gravitational/teleport" "github.com/gravitational/teleport/lib/events" @@ -297,3 +301,59 @@ func (h *websocketPortforwardHandler) forwardStreamPair(p *websocketChannelPair) h.Debugf("Port forwarding pair completed.") } + +// runPortForwardingTunneledHTTPStreams handles a port-forwarding request that uses SPDY protocol +// over WebSockets. +func runPortForwardingTunneledHTTPStreams(req portForwardRequest) error { + targetConn, _, err := req.targetDialer.Dial(PortForwardProtocolV1Name) + if err != nil { + return trace.Wrap(err, "error upgrading target connection") + } + defer targetConn.Close() + + // Try to upgrade the websocket connection. + // Beyond this point, we don't need to write errors to the response. + upgrader := gwebsocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + Subprotocols: []string{portforwardconstants.WebsocketsSPDYTunnelingPortForwardV1}, + } + conn, err := upgrader.Upgrade(req.httpResponseWriter, req.httpRequest, nil) + if err != nil { + return trace.Wrap(err) + } + + tunneledConn := portforward.NewTunnelingConnection("server", conn) + + streamChan := make(chan httpstream.Stream, 1) + spdyConn, err := spdystream.NewServerConnectionWithPings( + tunneledConn, + httpStreamReceived(req.context, streamChan), + req.pingPeriod, + ) + if err != nil { + return trace.Wrap(err, "error upgrading connection") + } + + if conn == nil { + return trace.ConnectionProblem(nil, "Unable to upgrade websocket connection") + } + defer conn.Close() + + h := &portForwardProxy{ + Entry: logrus.WithFields(logrus.Fields{ + teleport.ComponentKey: teleport.Component(teleport.ComponentProxyKube), + events.RemoteAddr: req.httpRequest.RemoteAddr, + }), + portForwardRequest: req, + sourceConn: spdyConn, + streamChan: streamChan, + streamPairs: make(map[string]*httpStreamPair), + streamCreationTimeout: DefaultStreamCreationTimeout, + targetConn: targetConn, + } + defer h.Close() + h.Debugf("Setting port forwarding streaming connection idle timeout to %v", IdleTimeout) + spdyConn.SetIdleTimeout(IdleTimeout) + h.run() + return nil +}