Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v16] [kube] support SPDY over websocket protocol for PortForward #46815

Open
wants to merge 1 commit into
base: branch/v16
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fixtures/ci-teleport-rbac/ci-teleport.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ rules:
resourceNames: ["test-pod"]
- apiGroups: [""]
resources: ["pods/portforward"]
verbs: ["create"]
verbs: ["create", "get"]
resourceNames: ["test-pod"]
- apiGroups: [""]
resources: ["pods/ephemeralcontainers"]
Expand Down
125 changes: 86 additions & 39 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,34 +516,6 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) {
})
require.NoError(t, err)

// forward local port to target port 80 of the nginx container
localPort := newPortValue()

forwarder, err := newPortForwarder(proxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)

forwarderCh := make(chan error)
go func() { forwarderCh <- forwarder.ForwardPorts() }()
defer func() {
assert.NoError(t, <-forwarderCh, "Forward ports exited with error")
}()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for port forwarding.")
case <-forwarder.readyC:
}
defer close(forwarder.stopC)

resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
require.NoError(t, resp.Body.Close())

// impersonating client requests will bse denied
_, impersonatingProxyClientConfig, err := kube.ProxyClient(kube.ProxyConfig{
T: teleport,
Expand All @@ -553,18 +525,72 @@ func testKubePortForward(t *testing.T, suite *KubeSuite) {
})
require.NoError(t, err)

localPort = newPortValue()
impersonatingForwarder, err := newPortForwarder(impersonatingProxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)
tests := []struct {
name string
builder func(*rest.Config, kubePortForwardArgs) (*kubePortForwarder, error)
}{
{
name: "SPDY portForwarder",
builder: newPortForwarder,
},
{
name: "SPDY over Websocket portForwarder",
builder: newPortForwarderSPDYOverWebsocket,
},
}

for _, tt := range tests {
t.Run(tt.name,
func(t *testing.T) {
// forward local port to target port 80 of the nginx container
listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, listener.Close())
})

localPort := listener.Addr().(*net.TCPAddr).Port

forwarder, err := tt.builder(proxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)

forwarderCh := make(chan error)
go func() { forwarderCh <- forwarder.ForwardPorts() }()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for port forwarding.")
case <-forwarder.readyC:
}
t.Cleanup(func() {})

resp, err := http.Get(fmt.Sprintf("http://localhost:%v", localPort))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
require.NoError(t, resp.Body.Close())

close(forwarder.stopC)
require.NoError(t, <-forwarderCh, "Forward ports exited with error")

impersonatingForwarder, err := tt.builder(impersonatingProxyClientConfig, kubePortForwardArgs{
ports: []string{fmt.Sprintf("%v:80", localPort)},
podName: testPod,
podNamespace: testNamespace,
})
require.NoError(t, err)

// This request should be denied
err = impersonatingForwarder.ForwardPorts()
require.Error(t, err)
require.Regexp(t, ".*impersonation request has been denied.*|.*403 Forbidden.*", err.Error())
},
)
}

// This request should be denied
err = impersonatingForwarder.ForwardPorts()
require.Error(t, err)
require.Regexp(t, ".*impersonation request has been denied.*", err.Error())
}

// TestKubeTrustedClustersClientCert tests scenario with trusted clusters
Expand Down Expand Up @@ -1948,6 +1974,27 @@ type kubePortForwarder struct {
readyC chan struct{}
}

func newPortForwarderSPDYOverWebsocket(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) {
u, err := url.Parse(kubeConfig.Host)
if err != nil {
return nil, trace.Wrap(err)
}
u.Scheme = "https"
u.Path = fmt.Sprintf("/api/v1/namespaces/%v/pods/%v/portforward", args.podNamespace, args.podName)

tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(u, kubeConfig)
if err != nil {
return nil, trace.Wrap(err)
}

stopC, readyC := make(chan struct{}), make(chan struct{})
fwd, err := portforward.New(tunnelingDialer, args.ports, stopC, readyC, nil, nil)
if err != nil {
return nil, trace.Wrap(err)
}
return &kubePortForwarder{PortForwarder: fwd, stopC: stopC, readyC: readyC}, nil
}

func newPortForwarder(kubeConfig *rest.Config, args kubePortForwardArgs) (*kubePortForwarder, error) {
u, err := url.Parse(kubeConfig.Host)
if err != nil {
Expand Down
31 changes: 29 additions & 2 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/httpstream"
httpstreamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -1808,10 +1809,14 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
// Go client uses SPDY while other clients still require WebSockets.
// This function will run until the end of the execution of the request.
func runPortForwarding(req portForwardRequest) error {
if wsstream.IsWebSocketRequest(req.httpRequest) {
switch {
case wsstream.IsWebSocketRequestWithTunnelingProtocol(req.httpRequest):
return trace.Wrap(runPortForwardingTunneledHTTPStreams(req))
case wsstream.IsWebSocketRequest(req.httpRequest):
return trace.Wrap(runPortForwardingWebSocket(req))
default:
return trace.Wrap(runPortForwardingHTTPStreams(req))
}
return trace.Wrap(runPortForwardingHTTPStreams(req))
}

const (
Expand Down Expand Up @@ -2164,6 +2169,7 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http
return nil, trace.Wrap(err)
}

req = createSPDYRequest(req, PortForwardProtocolV1Name)
upgradeRoundTripper := NewSpdyRoundTripperWithDialer(roundTripperConfig{
ctx: req.Context(),
sess: sess,
Expand All @@ -2189,6 +2195,27 @@ func (f *Forwarder) getSPDYDialer(sess *clusterSession, req *http.Request) (http
return spdy.NewDialer(upgradeRoundTripper, client, req.Method, req.URL), nil
}

// createSPDYRequest modifies the passed request to remove
// WebSockets headers and add SPDY upgrade information, including
// spdy protocols acceptable to the client.
func createSPDYRequest(req *http.Request, spdyProtocols ...string) *http.Request {
clone := req.Clone(req.Context())
// Clean up the websocket headers from the http request.
clone.Header.Del(wsstream.WebSocketProtocolHeader)
clone.Header.Del("Sec-Websocket-Key")
clone.Header.Del("Sec-Websocket-Version")
clone.Header.Del(httpstream.HeaderUpgrade)
// Update the http request for an upstream SPDY upgrade.
clone.Method = "POST"
clone.Body = nil // Remove the request body which is unused.
clone.Header.Set(httpstream.HeaderUpgrade, httpstreamspdy.HeaderSpdy31)
clone.Header.Del(httpstream.HeaderProtocolVersion)
for i := range spdyProtocols {
clone.Header.Add(httpstream.HeaderProtocolVersion, spdyProtocols[i])
}
return clone
}

// clusterSession contains authenticated user session to the target cluster:
// x509 short lived credentials, forwarding proxies and other data
type clusterSession struct {
Expand Down
60 changes: 60 additions & 0 deletions lib/kube/proxy/portforward_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ import (
"strings"
"sync"

gwebsocket "github.com/gorilla/websocket"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/httpstream"
spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
portforwardconstants "k8s.io/apimachinery/pkg/util/portforward"
"k8s.io/client-go/tools/portforward"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/events"
Expand Down Expand Up @@ -297,3 +301,59 @@ func (h *websocketPortforwardHandler) forwardStreamPair(p *websocketChannelPair)

h.Debugf("Port forwarding pair completed.")
}

// runPortForwardingTunneledHTTPStreams handles a port-forwarding request that uses SPDY protocol
// over WebSockets.
func runPortForwardingTunneledHTTPStreams(req portForwardRequest) error {
targetConn, _, err := req.targetDialer.Dial(PortForwardProtocolV1Name)
if err != nil {
return trace.Wrap(err, "error upgrading target connection")
}
defer targetConn.Close()

// Try to upgrade the websocket connection.
// Beyond this point, we don't need to write errors to the response.
upgrader := gwebsocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
Subprotocols: []string{portforwardconstants.WebsocketsSPDYTunnelingPortForwardV1},
}
conn, err := upgrader.Upgrade(req.httpResponseWriter, req.httpRequest, nil)
if err != nil {
return trace.Wrap(err)
}

tunneledConn := portforward.NewTunnelingConnection("server", conn)

streamChan := make(chan httpstream.Stream, 1)
spdyConn, err := spdystream.NewServerConnectionWithPings(
tunneledConn,
httpStreamReceived(req.context, streamChan),
req.pingPeriod,
)
if err != nil {
return trace.Wrap(err, "error upgrading connection")
}

if conn == nil {
return trace.ConnectionProblem(nil, "Unable to upgrade websocket connection")
}
defer conn.Close()

h := &portForwardProxy{
Entry: logrus.WithFields(logrus.Fields{
teleport.ComponentKey: teleport.Component(teleport.ComponentProxyKube),
events.RemoteAddr: req.httpRequest.RemoteAddr,
}),
portForwardRequest: req,
sourceConn: spdyConn,
streamChan: streamChan,
streamPairs: make(map[string]*httpStreamPair),
streamCreationTimeout: DefaultStreamCreationTimeout,
targetConn: targetConn,
}
defer h.Close()
h.Debugf("Setting port forwarding streaming connection idle timeout to %v", IdleTimeout)
spdyConn.SetIdleTimeout(IdleTimeout)
h.run()
return nil
}
Loading