Skip to content

Commit

Permalink
bulk audit event export api (#46399)
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall committed Sep 19, 2024
1 parent 6d79fc8 commit 2aebc3a
Show file tree
Hide file tree
Showing 24 changed files with 1,408 additions and 67 deletions.
52 changes: 52 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2732,6 +2732,58 @@ func (c *Client) SearchUnstructuredEvents(ctx context.Context, fromUTC, toUTC ti
return response.Items, response.LastKey, nil
}

// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
func (c *Client) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] {
// set up cancelable context so that Stream.Done can close the stream if the caller
// halts early.
ctx, cancel := context.WithCancel(ctx)

events, err := c.grpc.ExportUnstructuredEvents(ctx, req)
if err != nil {
cancel()
return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.Wrap(err))
}

return stream.Func[*auditlogpb.ExportEventUnstructured](func() (*auditlogpb.ExportEventUnstructured, error) {
event, err := events.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// io.EOF signals that stream has completed successfully
return nil, io.EOF
}
return nil, trace.Wrap(err)
}
return event, nil
}, cancel)
}

// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
func (c *Client) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] {
// set up cancelable context so that Stream.Done can close the stream if the caller
// halts early.
ctx, cancel := context.WithCancel(ctx)

chunks, err := c.grpc.GetEventExportChunks(ctx, req)
if err != nil {
cancel()
return stream.Fail[*auditlogpb.EventExportChunk](trace.Wrap(err))
}

return stream.Func[*auditlogpb.EventExportChunk](func() (*auditlogpb.EventExportChunk, error) {
chunk, err := chunks.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// io.EOF signals that stream has completed successfully
return nil, io.EOF
}
return nil, trace.Wrap(err)
}
return chunk, nil
}, cancel)
}

// StreamUnstructuredSessionEvents streams audit events from a given session recording in an unstructured format.
// This method is used by the Teleport event-handler plugin to receive events
// from the auth server wihout having to support the Protobuf event schema.
Expand Down
446 changes: 384 additions & 62 deletions api/gen/proto/go/teleport/auditlog/v1/auditlog.pb.go

Large diffs are not rendered by default.

136 changes: 136 additions & 0 deletions api/gen/proto/go/teleport/auditlog/v1/auditlog_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions api/proto/teleport/auditlog/v1/auditlog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ service AuditLogService {
// GetUnstructuredEvents gets events from the audit log in an unstructured format.
// This endpoint is used by the event handler to retrieve the events as JSON.
rpc GetUnstructuredEvents(GetUnstructuredEventsRequest) returns (EventsUnstructured);
// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
rpc ExportUnstructuredEvents(ExportUnstructuredEventsRequest) returns (stream ExportEventUnstructured);
// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
rpc GetEventExportChunks(GetEventExportChunksRequest) returns (stream EventExportChunk);
}

// StreamUnstructuredSessionEventsRequest is a request containing data needed to fetch a session recording.
Expand Down Expand Up @@ -77,6 +83,25 @@ message EventsUnstructured {
string last_key = 2;
}

// ExportUnstructuredEventsRequest is a request with the needed data to export events.
message ExportUnstructuredEventsRequest {
// date is the target date from which to export events. note that only the UTC date of the
// timestamp value is used. use of a specific local timestamp may produce confusing results.
google.protobuf.Timestamp date = 1;
// chunk is the chunk to export events from.
string chunk = 2;
// cursor is an optional mechanism to resume interrupted streams for a given chunk.
string cursor = 3;
}

// ExportEventUnstructured is the stream item of the ExportUnstructuredEvents method.
message ExportEventUnstructured {
// event is the unstructured representation of the event payload.
EventUnstructured event = 1;
// cursor is the cursor to resume the stream after this point.
string cursor = 2;
}

// EventUnstructured represents a single events.AuditEvent in an unstructured format.
message EventUnstructured {
// type is the type of the event.
Expand All @@ -92,3 +117,15 @@ message EventUnstructured {
// unstructured is the unstructured representation of the event payload.
google.protobuf.Struct unstructured = 5;
}

// GetEventExportChunksRequest is used to request the next set of event chunks to export.
message GetEventExportChunksRequest {
// date is the date for which to list export shards.
google.protobuf.Timestamp date = 1;
}

// EventExportChunk represents a chunk of events to export.
message EventExportChunk {
// chunk is the chunk to export.
string chunk = 1;
}
21 changes: 21 additions & 0 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
mfav1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/mfa/v1"
trustpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/trust/v1"
"github.com/gravitational/teleport/api/internalutils/stream"
Expand Down Expand Up @@ -5807,6 +5808,26 @@ func (a *ServerWithRoles) SearchEvents(ctx context.Context, req events.SearchEve
return outEvents, lastKey, nil
}

// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
func (a *ServerWithRoles) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] {
if err := a.action(apidefaults.Namespace, types.KindEvent, types.VerbList); err != nil {
return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.Wrap(err))
}

return a.alog.ExportUnstructuredEvents(ctx, req)
}

// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
func (a *ServerWithRoles) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] {
if err := a.action(apidefaults.Namespace, types.KindEvent, types.VerbList); err != nil {
return stream.Fail[*auditlogpb.EventExportChunk](trace.Wrap(err))
}

return a.alog.GetEventExportChunks(ctx, req)
}

// SearchSessionEvents allows searching session audit events with pagination support.
func (a *ServerWithRoles) SearchSessionEvents(ctx context.Context, req events.SearchSessionEventsRequest) (outEvents []apievents.AuditEvent, lastKey string, err error) {
if req.Cond != nil {
Expand Down
41 changes: 40 additions & 1 deletion lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4043,7 +4043,6 @@ func (g *GRPCServer) StreamSessionEvents(req *authpb.StreamSessionEventsRequest,
}

c, e := auth.ServerWithRoles.StreamSessionEvents(stream.Context(), session.ID(req.SessionID), int64(req.StartIndex))

for {
select {
case event, more := <-c:
Expand Down Expand Up @@ -6125,6 +6124,46 @@ func (g *GRPCServer) GetUnstructuredEvents(ctx context.Context, req *auditlogv1p
}, nil
}

// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
func (g *GRPCServer) ExportUnstructuredEvents(req *auditlogpb.ExportUnstructuredEventsRequest, stream auditlogpb.AuditLogService_ExportUnstructuredEventsServer) error {
auth, err := g.authenticate(stream.Context())
if err != nil {
return trace.Wrap(err)
}

events := auth.ServerWithRoles.ExportUnstructuredEvents(stream.Context(), req)

for events.Next() {
if err := stream.Send(events.Item()); err != nil {
events.Done()
return trace.Wrap(err)
}
}

return trace.Wrap(events.Done())
}

// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
func (g *GRPCServer) GetEventExportChunks(req *auditlogpb.GetEventExportChunksRequest, stream auditlogpb.AuditLogService_GetEventExportChunksServer) error {
auth, err := g.authenticate(stream.Context())
if err != nil {
return trace.Wrap(err)
}

chunks := auth.ServerWithRoles.GetEventExportChunks(stream.Context(), req)

for chunks.Next() {
if err := stream.Send(chunks.Item()); err != nil {
chunks.Done()
return trace.Wrap(err)
}
}

return trace.Wrap(chunks.Done())
}

// StreamUnstructuredSessionEvents streams all events from a given session recording as an unstructured format.
func (g *GRPCServer) StreamUnstructuredSessionEvents(req *auditlogv1pb.StreamUnstructuredSessionEventsRequest, stream auditlogv1pb.AuditLogService_StreamUnstructuredSessionEventsServer) error {
auth, err := g.authenticate(stream.Context())
Expand Down
Loading

0 comments on commit 2aebc3a

Please sign in to comment.