diff --git a/lib/kube/proxy/auth_test.go b/lib/kube/proxy/auth_test.go index 565981138c57..ba88bdfecb60 100644 --- a/lib/kube/proxy/auth_test.go +++ b/lib/kube/proxy/auth_test.go @@ -34,6 +34,7 @@ import ( "golang.org/x/exp/maps" authzapi "k8s.io/api/authorization/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachineryversion "k8s.io/apimachinery/pkg/version" "k8s.io/client-go/kubernetes" authztypes "k8s.io/client-go/kubernetes/typed/authorization/v1" "k8s.io/client-go/rest" @@ -212,7 +213,11 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, - kubeCluster: mustCreateKubernetesClusterV3(t, "foo"), + kubeCluster: mustCreateKubernetesClusterV3(t, "foo"), + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, rbacSupportedTypes: rbacSupportedTypes, }, "bar": { @@ -222,6 +227,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, "bar"), rbacSupportedTypes: rbacSupportedTypes, }, @@ -232,6 +241,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, "baz"), rbacSupportedTypes: rbacSupportedTypes, }, @@ -257,6 +270,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, teleClusterName), rbacSupportedTypes: rbacSupportedTypes, }, @@ -275,6 +292,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, "foo"), rbacSupportedTypes: rbacSupportedTypes, }, @@ -285,6 +306,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, "bar"), rbacSupportedTypes: rbacSupportedTypes, }, @@ -295,6 +320,10 @@ current-context: foo kubeClient: &kubernetes.Clientset{}, clientRestCfg: &rest.Config{}, }, + kubeClusterVersion: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, kubeCluster: mustCreateKubernetesClusterV3(t, "baz"), rbacSupportedTypes: rbacSupportedTypes, }, diff --git a/lib/kube/proxy/exec_test.go b/lib/kube/proxy/exec_test.go index 8fc4949c4652..3aca4f095b15 100644 --- a/lib/kube/proxy/exec_test.go +++ b/lib/kube/proxy/exec_test.go @@ -34,6 +34,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + apimachineryversion "k8s.io/apimachinery/pkg/version" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubectl/pkg/scheme" @@ -455,3 +456,146 @@ func TestExecMissingGETPermissionError(t *testing.T) { }) } } + +func TestExecWebsocketEndToEndErrReturn(t *testing.T) { + t.Parallel() + + const ( + errorMessage = "pods \"api-1\" is forbidden: User \"bar\" cannot %s resource " + + "\"pods/exec\" in API group \"\" in the namespace \"ns\"" + ) + + const errorCode = http.StatusForbidden + + kubeMock, err := testingkubemock.NewKubeAPIMock( + testingkubemock.WithExecError( + metav1.Status{ + Status: metav1.StatusFailure, + Message: fmt.Sprintf(errorMessage, "get"), + Reason: metav1.StatusReasonForbidden, + Code: errorCode, + }, + ), + testingkubemock.WithVersion( + &apimachineryversion.Info{ + Major: "1", + Minor: "31", + GitVersion: "v1.31.0", + }), + ) + require.NoError(t, err) + t.Cleanup(func() { + require.EqualValues(t, 0, kubeMock.KubeExecRequests.SPDY.Load(), "expected no SPDY requests") + require.EqualValues(t, 2, kubeMock.KubeExecRequests.Websocket.Load(), "expected one websocket request") + kubeMock.Close() + }) + var ( + execEvent *apievents.Exec + eventsLock sync.Mutex + ) + + // creates a Kubernetes service with a configured cluster pointing to mock api server + testCtx := SetupTestContext( + context.Background(), + t, + TestConfig{ + Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}}, + OnEvent: func(evt apievents.AuditEvent) { + eventsLock.Lock() + defer eventsLock.Unlock() + if exec, ok := evt.(*apievents.Exec); ok { + execEvent = exec + } + }, + }, + ) + + t.Cleanup(func() { require.NoError(t, testCtx.Close()) }) + + // create a user with access to kubernetes (kubernetes_user and kubernetes_groups specified) + user, _ := testCtx.CreateUserAndRole( + testCtx.Context, + t, + username, + RoleSpec{ + Name: roleName, + KubeUsers: roleKubeUsers, + KubeGroups: roleKubeGroups, + }) + + // generate a kube client with user certs for auth + _, userRestConfig := testCtx.GenTestKubeClientTLSCert( + t, + user.GetName(), + kubeCluster, + ) + + tests := []struct { + name string + interactive bool + }{ + { + name: "error propagation in non-interactive session", + }, + { + name: "error propgation in interactive session", + interactive: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + var streamOpts remotecommand.StreamOptions + if !tt.interactive { + streamOpts = remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &bytes.Buffer{}, + Stderr: &bytes.Buffer{}, + Tty: false, + } + } else { + stdinReader, _ := io.Pipe() + t.Cleanup(func() { stdinReader.Close() }) + streamOpts = remotecommand.StreamOptions{ + Stdin: stdinReader, + Stdout: &bytes.Buffer{}, + Stderr: nil, + Tty: true, + } + } + req, err := generateExecRequest( + generateExecRequestConfig{ + addr: testCtx.KubeProxyAddress(), + podName: podName, + podNamespace: podNamespace, + containerName: podContainerName, + cmd: containerCommmandExecute, // placeholder for commands to execute in the dummy pod + options: streamOpts, + }, + ) + require.NoError(t, err) + + exec, err := remotecommand.NewSPDYExecutor(userRestConfig, http.MethodPost, req.URL()) + require.NoError(t, err) + err = exec.StreamWithContext(context.Background(), streamOpts) + require.Error(t, err) + require.Contains(t, err.Error(), kubernetes130BreakingChangeHint) + + require.Eventually(t, func() bool { + eventsLock.Lock() + defer eventsLock.Unlock() + return execEvent != nil + }, 5*time.Second, 100*time.Millisecond, "expected exec event to be recorded") + + eventsLock.Lock() + require.Equal(t, events.ExecFailureCode, execEvent.Code) + require.Equal(t, "403", execEvent.ExitCode) + require.NotEmpty(t, execEvent.Error) + eventsLock.Unlock() + + }) + } + +} diff --git a/lib/kube/proxy/roundtrip.go b/lib/kube/proxy/roundtrip.go index ed68364f5fe2..f2d172f990b0 100644 --- a/lib/kube/proxy/roundtrip.go +++ b/lib/kube/proxy/roundtrip.go @@ -259,23 +259,7 @@ func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connec connectionHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderConnection)) upgradeHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderUpgrade)) if (resp.StatusCode != http.StatusSwitchingProtocols) || !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(streamspdy.HeaderSpdy31)) { - defer resp.Body.Close() - responseError := "" - responseErrorBytes, err := io.ReadAll(resp.Body) - if err != nil { - responseError = "unable to read error from server response" - } else { - // TODO: I don't belong here, I should be abstracted from this class - if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil { - if status, ok := obj.(*metav1.Status); ok { - return nil, &apierrors.StatusError{ErrStatus: *status} - } - } - responseError = string(responseErrorBytes) - responseError = strings.TrimSpace(responseError) - } - - return nil, fmt.Errorf("unable to upgrade connection: %s", responseError) + return nil, trace.Wrap(extractKubeAPIStatusFromReq(resp)) } return streamspdy.NewClientConnectionWithPings(s.conn, s.pingPeriod) @@ -292,3 +276,24 @@ func init() { &metav1.Status{}, ) } + +// extractKubeAPIStatusFromReq extracts the status from the response body and returns it as an error. +func extractKubeAPIStatusFromReq(rsp *http.Response) error { + defer func() { + _ = rsp.Body.Close() + }() + responseError := "" + responseErrorBytes, err := io.ReadAll(rsp.Body) + if err != nil { + responseError = "unable to read error from server response" + } else { + if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil { + if status, ok := obj.(*metav1.Status); ok { + return &apierrors.StatusError{ErrStatus: *status} + } + } + responseError = string(responseErrorBytes) + responseError = strings.TrimSpace(responseError) + } + return fmt.Errorf("unable to upgrade connection: %s", responseError) +} diff --git a/lib/kube/proxy/roundtrip_websocket.go b/lib/kube/proxy/roundtrip_websocket.go index 6e232b46eb61..1ee847384c82 100644 --- a/lib/kube/proxy/roundtrip_websocket.go +++ b/lib/kube/proxy/roundtrip_websocket.go @@ -103,6 +103,9 @@ func (w *WebsocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, er wsConn, wsResp, err := wsDialer.DialContext(w.ctx, clone.URL.String(), clone.Header) if err != nil { + if wsResp != nil { + return nil, trace.Wrap(extractKubeAPIStatusFromReq(wsResp)) + } return nil, &httpstream.UpgradeFailureError{Cause: err} } w.conn = wsConn diff --git a/lib/kube/proxy/testing/kube_server/kube_mock.go b/lib/kube/proxy/testing/kube_server/kube_mock.go index ff43040d4ae3..eb1cc97edacc 100644 --- a/lib/kube/proxy/testing/kube_server/kube_mock.go +++ b/lib/kube/proxy/testing/kube_server/kube_mock.go @@ -30,6 +30,7 @@ import ( "net/http/httptest" "strings" "sync" + "sync/atomic" "time" "github.com/gravitational/trace" @@ -44,6 +45,7 @@ import ( spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" apiremotecommand "k8s.io/apimachinery/pkg/util/remotecommand" + apimachineryversion "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/endpoints/responsewriter" "k8s.io/client-go/tools/remotecommand" @@ -116,10 +118,26 @@ func WithExecError(status metav1.Status) Option { } } +// WithVersion sets the version of the server +func WithVersion(version *apimachineryversion.Info) Option { + return func(s *KubeMockServer) { + s.version = version + } +} + type deletedResource struct { requestID string kind string } + +// KubeExecRequests keeps track of the number of exec requests +type KubeExecRequests struct { + // SPDY is the number of SPDY exec requests + SPDY atomic.Int32 + // Websocket is the number of Websocket exec requests + Websocket atomic.Int32 +} + type KubeMockServer struct { router *httprouter.Router log *log.Entry @@ -132,6 +150,8 @@ type KubeMockServer struct { getPodError *metav1.Status execPodError *metav1.Status mu sync.Mutex + version *apimachineryversion.Info + KubeExecRequests } // NewKubeAPIMock creates Kubernetes API server for handling exec calls. @@ -146,6 +166,10 @@ func NewKubeAPIMock(opts ...Option) (*KubeMockServer, error) { router: httprouter.New(), log: log.NewEntry(log.New()), deletedResources: make(map[deletedResource][]string), + version: &apimachineryversion.Info{ + Major: "1", + Minor: "20", + }, } for _, o := range opts { @@ -191,6 +215,8 @@ func (s *KubeMockServer) setup() { s.router.GET("/apis/resources.teleport.dev/v6/namespaces/:namespace/teleportroles/:name", s.withWriter(s.getTeleportRole)) s.router.DELETE("/apis/resources.teleport.dev/v6/namespaces/:namespace/teleportroles/:name", s.withWriter(s.deleteTeleportRole)) + s.router.GET("/version", s.withWriter(s.versionEndpoint)) + for _, endpoint := range []string{"/api", "/api/:ver", "/apis", "/apis/resources.teleport.dev/v6"} { s.router.GET(endpoint, s.withWriter(s.discoveryEndpoint)) } @@ -238,6 +264,12 @@ func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr erro } func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp any, err error) { + if wsstream.IsWebSocketRequest(req) { + s.KubeExecRequests.Websocket.Add(1) + } else { + s.KubeExecRequests.SPDY.Add(1) + } + q := req.URL.Query() if s.execPodError != nil { s.writeResponseError(w, nil, s.execPodError) @@ -778,3 +810,10 @@ func httpStreamReceived(ctx context.Context, streams chan httpstream.Stream) fun } } } + +func (s *KubeMockServer) versionEndpoint(_ http.ResponseWriter, _ *http.Request, _ httprouter.Params) (resp any, err error) { + if s.version == nil { + return nil, trace.BadParameter("version not set") + } + return s.version, nil +}