Session streams structured events. #4045

Merged 1 commit on Sep 29, 2020

Conversation

klizhentas
Contributor

@klizhentas klizhentas commented Jul 15, 2020

This commit refactors the events subsystem in Teleport in a backwards-compatible way and fixes several issues.

fixes #3549 (Session streaming design), fixes #3800 (Events overwritten in DynamoDB), fixes #3182 (Teleport consuming all disk space with multipart uploads)

Structured Events

Events have been refactored from unstructured to structured definitions generated from a protobuf spec.

Each event embeds common required metadata:

// Metadata is a common event metadata
message Metadata {
    // Index is a monotonically incremented index in the event sequence
    int64 Index = 1;

    // Type is the event type
    string Type = 2;

    // ID is a unique event identifier
    string ID = 3;

    // Code is a unique event code
    string Code = 4;

    // Time is event time
    google.protobuf.Timestamp Time = 5;
}

This metadata is accompanied by common event fields:

// GetType returns event type
func (m *Metadata) GetType() string {
	return m.Type
}

// SetType sets unique type
func (m *Metadata) SetType(etype string) {
	m.Type = etype
}

This allows every event to comply with the common interface:

// AuditEvent represents audit event
type AuditEvent interface {
	// ProtoMarshaler implements efficient
	// protobuf marshaling methods
	ProtoMarshaler

	// GetID returns unique event ID
	GetID() string
	// SetID sets unique event ID
	SetID(id string)
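
For illustration, here is a hedged sketch (not from this diff) of handling any concrete event through this interface; the Emitter type and its EmitAuditEvent method are assumed names:

// emitWithMetadata works for any concrete event type, because all
// generated events embed Metadata and satisfy AuditEvent. Emitter and
// EmitAuditEvent are assumptions for illustration.
func emitWithMetadata(ctx context.Context, emitter events.Emitter, event events.AuditEvent, index int64) error {
	event.SetIndex(index)           // position in the event sequence
	event.SetTime(time.Now().UTC()) // event timestamp
	return emitter.EmitAuditEvent(ctx, event)
}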

Session events

Session events embed session metadata:

// SessionMetadata is a common session event metadata
message SessionMetadata {
    // SessionID is a unique UUID of the session.
    string SessionID = 1 [ (gogoproto.jsontag) = "sid" ];
}

And implement extended interfaces:

// ServerMetadataGetter represents an interface
// that provides information about its server ID
type ServerMetadataGetter interface {
	// GetServerID returns event server ID
	GetServerID() string

	// GetServerNamespace returns event server namespace
	GetServerNamespace() string
}

This approach allows the common event interface to be converted to other event classes without casting to a specific type:

	getter, ok := in.(events.SessionMetadataGetter)
	if ok && getter.GetSessionID() != "" {
		sessionID = getter.GetSessionID()
	} else {
		// ...
	}

Other event types

Other event types, such as events dealing with connections, embed other common types of metadata, introducing different event classes. For example, connection metadata events:

// ConnectionMetadata contains connection info
message ConnectionMetadata {
    // LocalAddr is a target address on the host
    string LocalAddr = 1;

    // RemoteAddr is a client (user's) address
    string RemoteAddr = 2;

    // Protocol specifies protocol that was captured
    string Protocol = 3;
}

Streams

Streamer is the new interface for nodes and proxies to send session events to the auth server as a continuous sequence of events:

// Streamer creates and resumes event streams for session IDs
type Streamer interface {
	// CreateAuditStream creates event stream
	CreateAuditStream(context.Context, session.ID) (Stream, error)
	// ResumeAuditStream resumes the stream for session upload that
	// has not been completed yet.
	ResumeAuditStream(ctx context.Context, sid session.ID, uploadID string) (Stream, error)
}

Nodes and proxies can resume streams that were interrupted, using the upload ID.

Streams represent a continuous sequence of events associated with a session.

// Stream used to create continuous ordered sequence of events
// associated with a session.
type Stream interface {
	// Status returns channel receiving updates about stream status
	// last event index that was uploaded and upload ID
	Status() <-chan StreamStatus
	// ...
}

Clients can use the stream status to create back-pressure (stop sending until the stream reports events as uploaded) or to resume the upload without re-sending all events.
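
For illustration, a hedged sketch of a client loop built on this interface (EmitAuditEvent, Complete, and Close on Stream, and the StreamStatus field names, are assumptions based on the snippets above):

// streamSession drives a Stream, using Status() for back-pressure and
// resume bookkeeping. Method and field names are assumptions.
func streamSession(ctx context.Context, streamer events.Streamer, sid session.ID, in <-chan events.AuditEvent) error {
	stream, err := streamer.CreateAuditStream(ctx, sid)
	if err != nil {
		return trace.Wrap(err)
	}
	for {
		select {
		case status := <-stream.Status():
			// Persist status.UploadID and status.LastEventIndex: the upload
			// ID feeds ResumeAuditStream, and the index tells the client
			// which buffered events are safe to drop.
			log.Printf("uploaded through index %v (upload %v)", status.LastEventIndex, status.UploadID)
		case event, ok := <-in:
			if !ok {
				// Input finished: complete the upload.
				return trace.Wrap(stream.Complete(ctx))
			}
			if err := stream.EmitAuditEvent(ctx, event); err != nil {
				return trace.Wrap(err)
			}
		case <-ctx.Done():
			return trace.Wrap(stream.Close(ctx))
		}
	}
}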

Uploaders

Uploaders provide an abstraction over the multipart upload API, specifically S3 for AWS and GCS for Google.

The stream's on-disk format is optimized to support parallel uploads of events to S3 and resumption of uploads.

Session events storage format

The storage format for session recordings is optimized around fast marshal/unmarshal using protobuf, compression using gzip and support for parallel uploads to S3 or GCS storage.

Unlike v0 (a JSON-based, multiple-file recording format), an individual session is represented by a continuous, globally ordered sequence of events serialized to the binary protobuf format.

//
// Each slice is composed of three parts:
//
// 1. Slice starts with a 24-byte version header
//
// * 8 bytes for the format version (used for future expansion)
// * 8 bytes for meaningful size of the part
// * 8 bytes for padding at the end of the slice (if present)
//
// 2. V1 body of the slice is gzipped protobuf messages in binary format.
//
// 3. Optional padding (if specified in the header), required
// to bring slices to minimum slice size.
//
// The slice size is determined by S3 multipart upload requirements:
//
// https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
//
// This design allows the streamer to upload slices to S3-compatible APIs
// in parallel without buffering to disk.
//
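
To make the layout concrete, a hedged sketch of decoding the 24-byte header (big-endian byte order is an assumption of this illustration):

// readSliceHeader decodes the three 8-byte fields of the V1 slice
// header described above.
func readSliceHeader(r io.Reader) (version, partSize, padding uint64, err error) {
	var header [24]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		return 0, 0, 0, err
	}
	version = binary.BigEndian.Uint64(header[0:8])   // format version
	partSize = binary.BigEndian.Uint64(header[8:16]) // meaningful size of the part
	padding = binary.BigEndian.Uint64(header[16:24]) // trailing padding, if any
	return version, partSize, padding, nil
}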

Multipart Uploader storage backends

  • The design centers on maximum compatibility with the most popular S3 multipart upload format, supported by AWS, IBM, and third-party cloud providers
  • GCS is supported for Google Cloud storage
  • Local file storage supports multipart uploads by storing each uploaded part in a separate file, using the same gzip-compressed slice format (a sketch of the shared uploader abstraction follows this list)
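
A hedged sketch of the shared abstraction these backends implement (type and method names are illustrative, not the exact interface in this diff):

// StreamUpload describes an initiated multipart upload.
type StreamUpload struct {
	ID        string     // backend-assigned upload ID
	SessionID session.ID // session this upload belongs to
	Initiated time.Time  // when the upload was started
}

// StreamPart identifies one uploaded slice.
type StreamPart struct {
	Number int64  // part number within the upload
	ETag   string // backend identifier/checksum for the part
}

// MultipartUploader is the shape of the uploader that S3, GCS and
// local-file backends can implement.
type MultipartUploader interface {
	// CreateUpload starts a multipart upload for a session.
	CreateUpload(ctx context.Context, sessionID session.ID) (*StreamUpload, error)
	// UploadPart uploads a single slice of the stream.
	UploadPart(ctx context.Context, upload StreamUpload, partNumber int64, partBody io.ReadSeeker) (*StreamPart, error)
	// ListParts lists the parts uploaded so far for a given upload.
	ListParts(ctx context.Context, upload StreamUpload) ([]StreamPart, error)
	// ListUploads lists uploads that were initiated but never completed.
	ListUploads(ctx context.Context) ([]StreamUpload, error)
	// CompleteUpload assembles the uploaded parts into the final recording.
	CompleteUpload(ctx context.Context, upload StreamUpload, parts []StreamPart) error
}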

Backwards compatibility

  • A converter is implemented to avoid rewriting the player code and for other possible backwards-compatibility tasks.
  • In some cases, such as DynamoDB and on-disk storage of global events, events are marshaled to JSON in a backwards-compatible format. The test suite verifies that serialization is backwards compatible with the JSON format.

gRPC

gRPC streaming

Nodes and proxies use the gRPC interface implementation to submit individual global events and to create and resume streams.
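
As a hedged illustration of the wrapping step (ToOneOf and FromOneOf exist in lib/events/convert.go; the exact signatures here are assumptions):

// roundTrip shows how a concrete event is wrapped in the OneOf protobuf
// container for transport and recovered as a typed event on the other side.
func roundTrip(event events.AuditEvent) (events.AuditEvent, error) {
	oneOf, err := events.ToOneOf(event) // wrap the concrete event for the wire
	if err != nil {
		return nil, trace.Wrap(err)
	}
	// ...oneOf is what actually crosses the gRPC stream...
	return events.FromOneOf(*oneOf) // recover the typed event on receipt
}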

gRPC/HTTPS protocol switching

The ServeHTTP compatibility handler that allowed serving gRPC over an HTTPS connection had to be replaced with native gRPC because of the problems with backpressure described here.

This in turn required protocol switching to be done at the TLS level using NextProto. This approach takes advantage of the fact that all (Golang) clients use HTTP/1.1, so old clients talking to the new auth server will not be affected: gRPC will continue to work as expected using HTTP/2, and the legacy API is still served over HTTP/1.1 until deprecated. We are not aware of any clients accessing the server's legacy HTTPS API over HTTP/2, but any that exist would be broken.
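
A hedged sketch of the ALPN-based switch (the hand-off details are illustrative, not Teleport's actual wiring):

// acceptLoop negotiates the protocol via NextProtos: gRPC clients speak
// HTTP/2 ("h2"), legacy clients stay on HTTP/1.1.
func acceptLoop(listener net.Listener, cert tls.Certificate) error {
	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
		NextProtos:   []string{"h2", "http/1.1"},
	}
	tlsListener := tls.NewListener(listener, tlsConfig)
	for {
		conn, err := tlsListener.Accept()
		if err != nil {
			return err
		}
		go func(conn net.Conn) {
			tlsConn := conn.(*tls.Conn)
			// The handshake must complete before the negotiated protocol is known.
			if err := tlsConn.Handshake(); err != nil {
				conn.Close()
				return
			}
			switch tlsConn.ConnectionState().NegotiatedProtocol {
			case "h2":
				// hand the connection to the gRPC (HTTP/2) server
			default:
				// hand the connection to the legacy HTTP/1.1 API
			}
		}(conn)
	}
}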

Sync and async streams

The existing 4.3 stream implementation is async: sessions are streamed to the disk of the proxy or node and then uploaded as a single tarball. This created performance and stability problems for large uploads (#3182, Teleport consuming all disk space with multipart uploads) and security concerns: storage on disk required disk encryption to support FedRAMP mode.

Sync stream

This commit introduces experimental proxy-sync and node-sync recording modes. In these modes, the proxy or node sends events directly to the auth server, which in turn sends the recordings to S3 or other storage without storing the records on disk at all.

This creates a potential problem of resuming the session stream. The new audit writer takes advantage of stream status reporting and the new option to resume a stream in order to replay the events that have not been uploaded to storage.

Because the auth server never stores any local data about the stream, and instead initiates a multipart upload that can be resumed on any auth server, the loss of a single auth server will not cause problems for sync sessions as long as another auth server is available to resume the stream.
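
A hedged sketch of that resume path (field and method names are assumptions based on the snippets above):

// resumeStream resumes an interrupted upload on any available auth server
// and replays only the events above the last acknowledged index.
func resumeStream(ctx context.Context, streamer events.Streamer, sid session.ID, last events.StreamStatus, buffered []events.AuditEvent) error {
	stream, err := streamer.ResumeAuditStream(ctx, sid, last.UploadID)
	if err != nil {
		return trace.Wrap(err)
	}
	for _, event := range buffered {
		if event.GetIndex() <= last.LastEventIndex {
			continue // already uploaded before the interruption
		}
		if err := stream.EmitAuditEvent(ctx, event); err != nil {
			return trace.Wrap(err)
		}
	}
	return nil
}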

Async streams

The default mode in Teleport 4.4 remains async, with the file uploader used to store events on disk in the new protobuf format.

The disk uploader now attempts to resume the upload to the auth server based on the last reported status when possible, solving the problem of very large uploads being interrupted by server overload or intermittent network problems.

Unified stream interface

The sync and async streams use the same gRPC API to implement their functionality; the session chooses the appropriate emitter based on the cluster configuration.

Completing broken streams

In Teleport 4.3, some streams and sessions were never uploaded to the auth server. For example, when a node or proxy crashed before marking the session on disk as complete, the session would stay on the proxy or node without being uploaded.

Switching to the S3-based multipart upload API made it possible to implement the upload completer: a special service that watches for uploads that have not been completed within a grace period (> 12 hours) and completes them.
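
A hedged sketch of such a completer, in terms of the uploader abstraction sketched earlier (the grace-period check and method names are illustrative):

// runUploadCompleter periodically finds multipart uploads older than the
// grace period and completes them with whatever parts reached storage.
func runUploadCompleter(ctx context.Context, uploader MultipartUploader, grace, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			uploads, err := uploader.ListUploads(ctx)
			if err != nil {
				continue // transient error: retry on the next tick
			}
			for _, upload := range uploads {
				if time.Since(upload.Initiated) < grace {
					continue // still within the grace period
				}
				parts, err := uploader.ListParts(ctx, upload)
				if err != nil {
					continue
				}
				// Best-effort completion; failures are retried on later ticks.
				_ = uploader.CompleteUpload(ctx, upload, parts)
			}
		}
	}
}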

P.S.

@awly @fspmarshall @russjones please check out this review guide with direct links to the diff; hopefully it simplifies the review of this lengthy PR.

I have added the same content in RFD format here.

@webvictim
Contributor

webvictim commented Jul 15, 2020

Builds are failing because build.assets/Dockerfile in your branch is out of date - if you can rebase that file on master then they should work.

@klizhentas klizhentas changed the title from "Merge in progress" to "Teleport Streaming" on Jul 22, 2020
@klizhentas klizhentas force-pushed the sasha/streaming branch 3 times, most recently from 7709c4e to 90d9088, on July 28, 2020 16:46
@klizhentas klizhentas force-pushed the sasha/streaming branch 4 times, most recently from c32d957 to c4a6bd2, on July 31, 2020 02:38
@klizhentas klizhentas force-pushed the sasha/streaming branch 4 times, most recently from 2dfb89f to 7db575f, on August 7, 2020 03:04
@klizhentas klizhentas force-pushed the sasha/streaming branch 2 times, most recently from 1a33194 to f5aae70, on August 20, 2020 18:24
@benarent benarent added this to the 4.4 "Rome" milestone on Aug 25, 2020
@klizhentas klizhentas force-pushed the sasha/streaming branch 3 times, most recently from ae3930c to 22ae84e, on August 27, 2020 20:59
@klizhentas klizhentas force-pushed the sasha/streaming branch 3 times, most recently from f223bef to 278c9d7, on September 3, 2020 01:17
@klizhentas
Contributor Author

retest this please

@klizhentas klizhentas marked this pull request as ready for review September 3, 2020 17:17
@klizhentas
Contributor Author

retest this please

@benarent
Contributor

I know we plan to roll this out as experimental, but I wanted to add more content for the file config for alpha / beta testers of this. Would this be correct?

# This section configures the 'auth service':
auth_service:
    # Optional setting for configuring session recording. Possible values are:
    #    "node"  : sessions will be recorded on the node level (the default)
    #    "proxy" : recording on the proxy level, see "recording proxy mode" section.
    #    "off"   : session recording is turned off
    #
    #    EXPERIMENTAL: in the *-sync modes, the proxy or node sends logs directly
    #    to S3 or other storage without storing the records on disk at all. These
    #    modes will kill a connection if network connectivity is lost.
    #
    #    "node-sync"  : session recordings will be streamed from node -> auth -> storage service
    #    "proxy-sync" : session recordings will be streamed from proxy -> auth -> storage service
    #
    session_recording: "node"

@klizhentas klizhentas force-pushed the sasha/streaming branch 7 times, most recently from e7fc9b8 to 1dd84c2, on September 18, 2020 01:09
@webvictim
Contributor

@gravitational-jenkins retest this please

Contributor

@awly awly left a comment

Skimmed through some code in lib/events (not subdirectories).
I think we're adding some performance overhead by not using the proto types directly (see ToOneOf and FromOneOf in lib/events/convert.go, or any code that uses JSON encode/decode as conversion). Not sure how much, though.

@klizhentas did you happen to look at any CPU or memory profiles for a server under load?

Comment on lines 231 to 232
// This first drain is necessary to give status updates a priority
// in the event processing loop.
Contributor

why do status updates need priority?

Contributor Author

If you don't give updates priority, then you may never receive the update in time: the sending part of the loop can block before it gets a chance to receive the status update.

Contributor

To clarify: the select below may pop an item off of a.eventsCh and block on EmitAuditEvents for too long without processing status updates?
How does this problem manifest at a higher level: is a.buffer growing too large and spiking the memory usage, or something else?

Contributor Author

In practice, when there are too many events, the select on the status channel will happen much later, yes, which leads to an increased buffer.

Comment on lines 311 to 342
// retry is created on the first failure to resume
if retry == nil {
	var rerr error
	retry, rerr = utils.NewLinear(utils.LinearConfig{
		Step: defaults.NetworkRetryDuration,
		Max:  defaults.NetworkBackoffDuration,
	})
	if rerr != nil {
		return nil, trace.Wrap(err)
	}
}
Contributor

why not create this before the loop?

Contributor Author

✔️

@klizhentas
Contributor Author

@awly ToOneOf and FromOneOf are using proto types directly; they are just wrapping the types in the OneOf proto container:

https://github.com/gravitational/teleport/pull/4045/files/1dd84c2004fc5374441614cc2e6cb5ffe73c3003#diff-55ef4c77efa7a2b0a04acff3280aea37R227

@awly
Contributor

awly commented Sep 22, 2020

@klizhentas gotcha. I'm still curious to see some CPU/memory profiles if you have a chance.

Contributor

@fspmarshall fspmarshall left a comment

Partially reviewed (mostly auth package). Will resume tomorrow.

Contributor

@webvictim webvictim left a comment

Mostly spelling/grammar fixes.

Comment on lines 373 to 398
// GetID returns unique event ID
GetID() string
// SetID sets unique event ID
SetID(id string)

// GetCode returns event short diagnostic code
GetCode() string
// SetCode sets unique event diagnostic code
SetCode(string)

// GetType returns event type
GetType() string
// SetType sets unique type
SetType(string)

// GetTime returns event time
GetTime() time.Time
// SetTime sets event time
SetTime(time.Time)

// GetIndex gets event index - a non-unique
// monotonically incremented number
// in the event sequence
GetIndex() int64
// SetIndex sets event index
SetIndex(idx int64)
Contributor

This interface can be significantly simplified by just exposing *Metadata. Ex:

type AuditEvent interface {
    ProtoMarshaler
    Meta() *Metadata 
}

// Set ID
event.Meta().ID = "some-id"

// Get Index
index := event.Meta().Index

// etc...

Not sure if that is desirable, but it's worth noting.

edit: Same kind of simplification could be applied to the other getter/setter style interfaces in this file.

Contributor Author

I think I will keep it for now, but noted for future refactorings.

if err != nil {
	return trace.ConvertSystemError(err)
}
if format.Proto == true {
Contributor

Suggested change
if format.Proto == true {
if format.Proto {

Contributor Author

I've noticed that @awly prefers == true and == false; I wonder what you folks think about this.

Contributor

I think @russjones likes == true/false, I don't ;)

@klizhentas
Contributor Author

klizhentas commented Sep 27, 2020

@awly check out the node profiles:

09-26-20.tar.gz

this is a 10 second workload that produces ~4MB compressed recording in a loop on a node.

@klizhentas
Contributor Author

klizhentas commented Sep 28, 2020

@awly @fspmarshall if you guys don't have any outstanding requests, I'm ready to merge and cut beta.1 today. I will continue testing for the next 2 weeks until Oct 7th release.

Contributor

@awly awly left a comment

Thanks for the profiles, all numbers look reasonable to me

This commit introduces a gRPC API for streaming sessions.

It adds structured events and sync streaming
that avoids storing events on disk.

You can find the design in the rfd/0002-streaming.md RFD.
@klizhentas
Contributor Author

retest this please
