Skip to content

Commit

Permalink
Session streaming
Browse files Browse the repository at this point in the history
This commit introduces GRPC API for streaming sessions.

It adds structured events and sync streaming
that avoids storing events on disk.

You can find design in rfd/0002-streaming.md RFD.
  • Loading branch information
klizhentas committed Sep 29, 2020
1 parent c5a855c commit d160507
Show file tree
Hide file tree
Showing 129 changed files with 36,300 additions and 4,384 deletions.
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ ifneq ("$(OS)", "windows")
zip -q -A $(BUILDDIR)/teleport
endif

#
# make full-ent - Builds Teleport enterprise binaries
#
.PHONY:full-ent
full-ent:
ifneq ("$(OS)", "windows")
@if [ -f e/Makefile ]; then $(MAKE) -C e full; fi
endif

#
# make clean - Removed all build artifacts.
#
Expand Down Expand Up @@ -386,6 +395,10 @@ buildbox-grpc:
--gofast_out=plugins=grpc:.\
*.proto

cd lib/multiplexer/test && protoc -I=.:$$PROTO_INCLUDE \
--gofast_out=plugins=grpc:.\
*.proto

.PHONY: goinstall
goinstall:
go install $(BUILDFLAGS) \
Expand Down
13 changes: 13 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ const (
SSHSessionID = "SSH_SESSION_ID"
)

const (
// HTTPNextProtoTLS is the NPN/ALPN protocol negotiated during
// HTTP/1.1.'s TLS setup.
// https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids
HTTPNextProtoTLS = "http/1.1"
)

const (
// HTTPSProxy is an environment variable pointing to a HTTPS proxy.
HTTPSProxy = "HTTPS_PROXY"
Expand Down Expand Up @@ -324,6 +331,12 @@ const (
// storage
SchemeGCS = "gs"

// GCSTestURI turns on GCS tests
GCSTestURI = "TEST_GCS_URI"

// AWSRunTests turns on tests executed against AWS directly
AWSRunTests = "TEST_AWS"

// Region is AWS region parameter
Region = "region"

Expand Down
2 changes: 1 addition & 1 deletion e
Submodule e updated from b462ec to ec2061
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/gravitational/form v0.0.0-20151109031454-c4048f792f70
github.com/gravitational/kingpin v2.1.11-0.20190130013101-742f2714c145+incompatible
github.com/gravitational/license v0.0.0-20180912170534-4f189e3bd6e3
github.com/gravitational/oxy v0.0.0-20180629203109-e4a7e35311e6
github.com/gravitational/oxy v0.0.0-20200916204440-3eb06d921a1d
github.com/gravitational/reporting v0.0.0-20180907002058-ac7b85c75c4c
github.com/gravitational/roundtrip v1.0.0
github.com/gravitational/trace v1.1.6
Expand Down Expand Up @@ -90,7 +90,7 @@ require (
google.golang.org/api v0.10.0
google.golang.org/appengine v1.6.3 // indirect
google.golang.org/genproto v0.0.0-20190916214212-f660b8655731
google.golang.org/grpc v1.23.0
google.golang.org/grpc v1.23.1
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
gopkg.in/yaml.v2 v2.2.8
gotest.tools v2.2.0+incompatible // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ github.com/gravitational/license v0.0.0-20180912170534-4f189e3bd6e3 h1:vy9WwUq3H
github.com/gravitational/license v0.0.0-20180912170534-4f189e3bd6e3/go.mod h1:jaxS7X2ouXfNd2Pxpybd01qNQK15UmkixKj4vtpp7f8=
github.com/gravitational/logrus v0.10.1-0.20171120195323-8ab1e1b91d5f h1:FeloE/ofwzo61I0npMIJlqlrQxNPpbQBoWhzRdoUIAo=
github.com/gravitational/logrus v0.10.1-0.20171120195323-8ab1e1b91d5f/go.mod h1:iMtAvwI44N8L2IBvRF4G6NccFxkSYa/Kp8jWVTg3/wQ=
github.com/gravitational/oxy v0.0.0-20180629203109-e4a7e35311e6 h1:244Hc0XnOrqZxR0Fbwt9nwlvM5HnqKWJE+r5EdG6v4A=
github.com/gravitational/oxy v0.0.0-20180629203109-e4a7e35311e6/go.mod h1:ESOxlf8BB2yG3zJ0SfZe9U6wpYu3YF3znxIICg73FYA=
github.com/gravitational/oxy v0.0.0-20200916204440-3eb06d921a1d h1:IsbTjCQ4u5mr30ceWZ4GNcrQkp/Y/J9G+s9prmJm1ac=
github.com/gravitational/oxy v0.0.0-20200916204440-3eb06d921a1d/go.mod h1:ESOxlf8BB2yG3zJ0SfZe9U6wpYu3YF3znxIICg73FYA=
github.com/gravitational/reporting v0.0.0-20180907002058-ac7b85c75c4c h1:UwN3jo2EfZSGDchLVqH/EJ2A5GWvKROx3NJNUI6/plg=
github.com/gravitational/reporting v0.0.0-20180907002058-ac7b85c75c4c/go.mod h1:rBJeI3JYVzbL7Yw2hYrp4QdKIkncb1pUHo95DyoEGns=
github.com/gravitational/roundtrip v1.0.0 h1:eb+0EABfSKC8607CQ4oOyWCm9zVIfio/wW78TjQqLSc=
Expand Down Expand Up @@ -480,6 +480,8 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0 h1:AzbTB6ux+okLTzP8Ru1Xs41C303zdcfEht7MQnYJt5A=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1 h1:q4XQuHFC6I28BKZpo6IYyb3mNO+l7lSOxRuYTCiDfXk=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
Expand Down
4 changes: 2 additions & 2 deletions integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type TeleInstance struct {
Nodes []*service.TeleportProcess

// UploadEventsC is a channel for upload events
UploadEventsC chan *events.UploadEvent
UploadEventsC chan events.UploadEvent
}

type User struct {
Expand Down Expand Up @@ -218,7 +218,7 @@ func NewInstance(cfg InstanceConfig) *TeleInstance {
i := &TeleInstance{
Ports: cfg.Ports,
Hostname: cfg.NodeName,
UploadEventsC: make(chan *events.UploadEvent, 100),
UploadEventsC: make(chan events.UploadEvent, 100),
}
secrets := InstanceSecrets{
SiteName: cfg.ClusterName,
Expand Down
13 changes: 11 additions & 2 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
inForwardAgent bool
auditSessionsURI string
}{

// normal teleport
{
inRecordLocation: services.RecordAtNode,
Expand All @@ -269,6 +268,16 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
inForwardAgent: false,
auditSessionsURI: c.MkDir(),
},
// normal teleport, sync recording
{
inRecordLocation: services.RecordAtNodeSync,
inForwardAgent: false,
},
// recording proxy, sync recording
{
inRecordLocation: services.RecordAtProxySync,
inForwardAgent: true,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -518,7 +527,7 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
// the ID of the node. If sessions are being recorded at the proxy, then
// SessionServerID should be that of the proxy.
expectedServerID := nodeProcess.Config.HostUUID
if tt.inRecordLocation == services.RecordAtProxy {
if services.IsRecordAtProxy(tt.inRecordLocation) {
expectedServerID = t.Process.Config.HostUUID
}
c.Assert(start.GetString(events.SessionServerID), check.Equals, expectedServerID)
Expand Down
20 changes: 17 additions & 3 deletions lib/auth/api.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2015 Gravitational, Inc.
Copyright 2015-2020 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -20,9 +20,11 @@ import (
"context"
"io"

"github.com/gravitational/trace"

"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/session"

"github.com/gravitational/trace"
)

// Announcer specifies interface responsible for announcing presence
Expand Down Expand Up @@ -102,6 +104,8 @@ type AccessPoint interface {
ReadAccessPoint
// Announcer adds methods used to announce presence
Announcer
// Streamer creates and manages audit streams
events.Streamer

// Semaphores provides semaphore operations
services.Semaphores
Expand Down Expand Up @@ -161,6 +165,16 @@ type Wrapper struct {
NoCache AccessPoint
}

// ResumeAuditStream resumes existing audit stream
func (w *Wrapper) ResumeAuditStream(ctx context.Context, sid session.ID, uploadID string) (events.Stream, error) {
return w.NoCache.ResumeAuditStream(ctx, sid, uploadID)
}

// CreateAuditStream creates new audit stream
func (w *Wrapper) CreateAuditStream(ctx context.Context, sid session.ID) (events.Stream, error) {
return w.NoCache.CreateAuditStream(ctx, sid)
}

// Close closes all associated resources
func (w *Wrapper) Close() error {
err := w.NoCache.Close()
Expand Down
25 changes: 20 additions & 5 deletions lib/auth/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@ type APIConfig struct {
SessionService session.Service
AuditLog events.IAuditLog
Authorizer Authorizer
Emitter events.Emitter
// KeepAlivePeriod defines period between keep alives
KeepAlivePeriod time.Duration
// KeepAliveCount specifies amount of missed keep alives
// to wait for until declaring connection as broken
KeepAliveCount int
}

// CheckAndSetDefaults checks and sets default values
func (a *APIConfig) CheckAndSetDefaults() error {
if a.KeepAlivePeriod == 0 {
a.KeepAlivePeriod = defaults.ServerKeepAliveTTL
}
if a.KeepAliveCount == 0 {
a.KeepAliveCount = defaults.KeepAliveCountMax
}
return nil
}

// APIServer implements http API server for AuthServer interface
Expand Down Expand Up @@ -260,9 +277,7 @@ func (s *APIServer) withAuth(handler HandlerWithAuthFunc) httprouter.Handle {
}
auth := &AuthWithRoles{
authServer: s.AuthServer,
user: authContext.User,
checker: authContext.Checker,
identity: authContext.Identity,
context: *authContext,
sessions: s.SessionService,
alog: s.AuthServer.IAuditLog,
}
Expand Down Expand Up @@ -1869,9 +1884,9 @@ func (s *APIServer) emitAuditEvent(auth ClientI, w http.ResponseWriter, r *http.
// For backwards compatibility, check if the full event struct has
// been sent in the request or just the event type.
if req.Event.Name != "" {
err = auth.EmitAuditEvent(req.Event, req.Fields)
err = auth.EmitAuditEventLegacy(req.Event, req.Fields)
} else {
err = auth.EmitAuditEvent(events.Event{Name: req.Type}, req.Fields)
err = auth.EmitAuditEventLegacy(events.Event{Name: req.Type}, req.Fields)
}
if err != nil {
return nil, trace.Wrap(err)
Expand Down
Loading

0 comments on commit d160507

Please sign in to comment.