Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v16] [kube] return Kubernetes API errors when using Websocket API #46811

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion lib/kube/proxy/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"golang.org/x/exp/maps"
authzapi "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachineryversion "k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes"
authztypes "k8s.io/client-go/kubernetes/typed/authorization/v1"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -212,7 +213,11 @@ current-context: foo
kubeClient: &kubernetes.Clientset{},
clientRestCfg: &rest.Config{},
},
kubeCluster: mustCreateKubernetesClusterV3(t, "foo"),
kubeCluster: mustCreateKubernetesClusterV3(t, "foo"),
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
},
rbacSupportedTypes: rbacSupportedTypes,
},
"bar": {
Expand All @@ -222,6 +227,10 @@ current-context: foo
kubeClient: &kubernetes.Clientset{},
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "bar"),
rbacSupportedTypes: rbacSupportedTypes,
},
Expand All @@ -232,6 +241,10 @@ current-context: foo
kubeClient: &kubernetes.Clientset{},
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "baz"),
rbacSupportedTypes: rbacSupportedTypes,
},
Expand All @@ -257,6 +270,10 @@ current-context: foo
kubeClient: &kubernetes.Clientset{},
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
},
kubeCluster: mustCreateKubernetesClusterV3(t, teleClusterName),
rbacSupportedTypes: rbacSupportedTypes,
},
Expand All @@ -275,6 +292,10 @@ current-context: foo
kubeClient: &kubernetes.Clientset{},
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "foo"),
rbacSupportedTypes: rbacSupportedTypes,
},
Expand All @@ -285,6 +306,10 @@ current-context: foo
kubeClient: &kubernetes.Clientset{},
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "bar"),
rbacSupportedTypes: rbacSupportedTypes,
},
Expand All @@ -295,6 +320,10 @@ current-context: foo
kubeClient: &kubernetes.Clientset{},
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "baz"),
rbacSupportedTypes: rbacSupportedTypes,
},
Expand Down
144 changes: 144 additions & 0 deletions lib/kube/proxy/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apimachineryversion "k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
Expand Down Expand Up @@ -455,3 +456,146 @@ func TestExecMissingGETPermissionError(t *testing.T) {
})
}
}

func TestExecWebsocketEndToEndErrReturn(t *testing.T) {
t.Parallel()

const (
errorMessage = "pods \"api-1\" is forbidden: User \"bar\" cannot %s resource " +
"\"pods/exec\" in API group \"\" in the namespace \"ns\""
)

const errorCode = http.StatusForbidden

kubeMock, err := testingkubemock.NewKubeAPIMock(
testingkubemock.WithExecError(
metav1.Status{
Status: metav1.StatusFailure,
Message: fmt.Sprintf(errorMessage, "get"),
Reason: metav1.StatusReasonForbidden,
Code: errorCode,
},
),
testingkubemock.WithVersion(
&apimachineryversion.Info{
Major: "1",
Minor: "31",
GitVersion: "v1.31.0",
}),
)
require.NoError(t, err)
t.Cleanup(func() {
require.EqualValues(t, 0, kubeMock.KubeExecRequests.SPDY.Load(), "expected no SPDY requests")
require.EqualValues(t, 2, kubeMock.KubeExecRequests.Websocket.Load(), "expected one websocket request")
kubeMock.Close()
})
var (
execEvent *apievents.Exec
eventsLock sync.Mutex
)

// creates a Kubernetes service with a configured cluster pointing to mock api server
testCtx := SetupTestContext(
context.Background(),
t,
TestConfig{
Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}},
OnEvent: func(evt apievents.AuditEvent) {
eventsLock.Lock()
defer eventsLock.Unlock()
if exec, ok := evt.(*apievents.Exec); ok {
execEvent = exec
}
},
},
)

t.Cleanup(func() { require.NoError(t, testCtx.Close()) })

// create a user with access to kubernetes (kubernetes_user and kubernetes_groups specified)
user, _ := testCtx.CreateUserAndRole(
testCtx.Context,
t,
username,
RoleSpec{
Name: roleName,
KubeUsers: roleKubeUsers,
KubeGroups: roleKubeGroups,
})

// generate a kube client with user certs for auth
_, userRestConfig := testCtx.GenTestKubeClientTLSCert(
t,
user.GetName(),
kubeCluster,
)

tests := []struct {
name string
interactive bool
}{
{
name: "error propagation in non-interactive session",
},
{
name: "error propgation in interactive session",
interactive: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

var streamOpts remotecommand.StreamOptions
if !tt.interactive {
streamOpts = remotecommand.StreamOptions{
Stdin: nil,
Stdout: &bytes.Buffer{},
Stderr: &bytes.Buffer{},
Tty: false,
}
} else {
stdinReader, _ := io.Pipe()
t.Cleanup(func() { stdinReader.Close() })
streamOpts = remotecommand.StreamOptions{
Stdin: stdinReader,
Stdout: &bytes.Buffer{},
Stderr: nil,
Tty: true,
}
}
req, err := generateExecRequest(
generateExecRequestConfig{
addr: testCtx.KubeProxyAddress(),
podName: podName,
podNamespace: podNamespace,
containerName: podContainerName,
cmd: containerCommmandExecute, // placeholder for commands to execute in the dummy pod
options: streamOpts,
},
)
require.NoError(t, err)

exec, err := remotecommand.NewSPDYExecutor(userRestConfig, http.MethodPost, req.URL())
require.NoError(t, err)
err = exec.StreamWithContext(context.Background(), streamOpts)
require.Error(t, err)
require.Contains(t, err.Error(), kubernetes130BreakingChangeHint)

require.Eventually(t, func() bool {
eventsLock.Lock()
defer eventsLock.Unlock()
return execEvent != nil
}, 5*time.Second, 100*time.Millisecond, "expected exec event to be recorded")

eventsLock.Lock()
require.Equal(t, events.ExecFailureCode, execEvent.Code)
require.Equal(t, "403", execEvent.ExitCode)
require.NotEmpty(t, execEvent.Error)
eventsLock.Unlock()

})
}

}
39 changes: 22 additions & 17 deletions lib/kube/proxy/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,23 +259,7 @@ func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connec
connectionHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderConnection))
upgradeHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderUpgrade))
if (resp.StatusCode != http.StatusSwitchingProtocols) || !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(streamspdy.HeaderSpdy31)) {
defer resp.Body.Close()
responseError := ""
responseErrorBytes, err := io.ReadAll(resp.Body)
if err != nil {
responseError = "unable to read error from server response"
} else {
// TODO: I don't belong here, I should be abstracted from this class
if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil {
if status, ok := obj.(*metav1.Status); ok {
return nil, &apierrors.StatusError{ErrStatus: *status}
}
}
responseError = string(responseErrorBytes)
responseError = strings.TrimSpace(responseError)
}

return nil, fmt.Errorf("unable to upgrade connection: %s", responseError)
return nil, trace.Wrap(extractKubeAPIStatusFromReq(resp))
}

return streamspdy.NewClientConnectionWithPings(s.conn, s.pingPeriod)
Expand All @@ -292,3 +276,24 @@ func init() {
&metav1.Status{},
)
}

// extractKubeAPIStatusFromReq extracts the status from the response body and returns it as an error.
func extractKubeAPIStatusFromReq(rsp *http.Response) error {
defer func() {
_ = rsp.Body.Close()
}()
responseError := ""
responseErrorBytes, err := io.ReadAll(rsp.Body)
if err != nil {
responseError = "unable to read error from server response"
} else {
if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil {
if status, ok := obj.(*metav1.Status); ok {
return &apierrors.StatusError{ErrStatus: *status}
}
}
responseError = string(responseErrorBytes)
responseError = strings.TrimSpace(responseError)
}
return fmt.Errorf("unable to upgrade connection: %s", responseError)
}
3 changes: 3 additions & 0 deletions lib/kube/proxy/roundtrip_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (w *WebsocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, er

wsConn, wsResp, err := wsDialer.DialContext(w.ctx, clone.URL.String(), clone.Header)
if err != nil {
if wsResp != nil {
return nil, trace.Wrap(extractKubeAPIStatusFromReq(wsResp))
}
return nil, &httpstream.UpgradeFailureError{Cause: err}
}
w.conn = wsConn
Expand Down
39 changes: 39 additions & 0 deletions lib/kube/proxy/testing/kube_server/kube_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gravitational/trace"
Expand All @@ -44,6 +45,7 @@ import (
spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
apiremotecommand "k8s.io/apimachinery/pkg/util/remotecommand"
apimachineryversion "k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/client-go/tools/remotecommand"

Expand Down Expand Up @@ -116,10 +118,26 @@ func WithExecError(status metav1.Status) Option {
}
}

// WithVersion sets the version of the server
func WithVersion(version *apimachineryversion.Info) Option {
return func(s *KubeMockServer) {
s.version = version
}
}

type deletedResource struct {
requestID string
kind string
}

// KubeExecRequests keeps track of the number of exec requests
type KubeExecRequests struct {
// SPDY is the number of SPDY exec requests
SPDY atomic.Int32
// Websocket is the number of Websocket exec requests
Websocket atomic.Int32
}

type KubeMockServer struct {
router *httprouter.Router
log *log.Entry
Expand All @@ -132,6 +150,8 @@ type KubeMockServer struct {
getPodError *metav1.Status
execPodError *metav1.Status
mu sync.Mutex
version *apimachineryversion.Info
KubeExecRequests
}

// NewKubeAPIMock creates Kubernetes API server for handling exec calls.
Expand All @@ -146,6 +166,10 @@ func NewKubeAPIMock(opts ...Option) (*KubeMockServer, error) {
router: httprouter.New(),
log: log.NewEntry(log.New()),
deletedResources: make(map[deletedResource][]string),
version: &apimachineryversion.Info{
Major: "1",
Minor: "20",
},
}

for _, o := range opts {
Expand Down Expand Up @@ -191,6 +215,8 @@ func (s *KubeMockServer) setup() {
s.router.GET("/apis/resources.teleport.dev/v6/namespaces/:namespace/teleportroles/:name", s.withWriter(s.getTeleportRole))
s.router.DELETE("/apis/resources.teleport.dev/v6/namespaces/:namespace/teleportroles/:name", s.withWriter(s.deleteTeleportRole))

s.router.GET("/version", s.withWriter(s.versionEndpoint))

for _, endpoint := range []string{"/api", "/api/:ver", "/apis", "/apis/resources.teleport.dev/v6"} {
s.router.GET(endpoint, s.withWriter(s.discoveryEndpoint))
}
Expand Down Expand Up @@ -238,6 +264,12 @@ func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr erro
}

func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp any, err error) {
if wsstream.IsWebSocketRequest(req) {
s.KubeExecRequests.Websocket.Add(1)
} else {
s.KubeExecRequests.SPDY.Add(1)
}

q := req.URL.Query()
if s.execPodError != nil {
s.writeResponseError(w, nil, s.execPodError)
Expand Down Expand Up @@ -778,3 +810,10 @@ func httpStreamReceived(ctx context.Context, streams chan httpstream.Stream) fun
}
}
}

func (s *KubeMockServer) versionEndpoint(_ http.ResponseWriter, _ *http.Request, _ httprouter.Params) (resp any, err error) {
if s.version == nil {
return nil, trace.BadParameter("version not set")
}
return s.version, nil
}
Loading