Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v16] bulk audit event export api #46784

Open
wants to merge 1 commit into
base: branch/v16
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2515,6 +2515,58 @@ func (c *Client) SearchUnstructuredEvents(ctx context.Context, fromUTC, toUTC ti
return response.Items, response.LastKey, nil
}

// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
func (c *Client) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] {
// set up cancelable context so that Stream.Done can close the stream if the caller
// halts early.
ctx, cancel := context.WithCancel(ctx)

events, err := c.grpc.ExportUnstructuredEvents(ctx, req)
if err != nil {
cancel()
return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.Wrap(err))
}

return stream.Func[*auditlogpb.ExportEventUnstructured](func() (*auditlogpb.ExportEventUnstructured, error) {
event, err := events.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// io.EOF signals that stream has completed successfully
return nil, io.EOF
}
return nil, trace.Wrap(err)
}
return event, nil
}, cancel)
}

// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
func (c *Client) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] {
// set up cancelable context so that Stream.Done can close the stream if the caller
// halts early.
ctx, cancel := context.WithCancel(ctx)

chunks, err := c.grpc.GetEventExportChunks(ctx, req)
if err != nil {
cancel()
return stream.Fail[*auditlogpb.EventExportChunk](trace.Wrap(err))
}

return stream.Func[*auditlogpb.EventExportChunk](func() (*auditlogpb.EventExportChunk, error) {
chunk, err := chunks.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// io.EOF signals that stream has completed successfully
return nil, io.EOF
}
return nil, trace.Wrap(err)
}
return chunk, nil
}, cancel)
}

// StreamUnstructuredSessionEvents streams audit events from a given session recording in an unstructured format.
// This method is used by the Teleport event-handler plugin to receive events
// from the auth server wihout having to support the Protobuf event schema.
Expand Down
446 changes: 384 additions & 62 deletions api/gen/proto/go/teleport/auditlog/v1/auditlog.pb.go

Large diffs are not rendered by default.

136 changes: 136 additions & 0 deletions api/gen/proto/go/teleport/auditlog/v1/auditlog_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions api/proto/teleport/auditlog/v1/auditlog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ service AuditLogService {
// GetUnstructuredEvents gets events from the audit log in an unstructured format.
// This endpoint is used by the event handler to retrieve the events as JSON.
rpc GetUnstructuredEvents(GetUnstructuredEventsRequest) returns (EventsUnstructured);
// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
rpc ExportUnstructuredEvents(ExportUnstructuredEventsRequest) returns (stream ExportEventUnstructured);
// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
rpc GetEventExportChunks(GetEventExportChunksRequest) returns (stream EventExportChunk);
}

// StreamUnstructuredSessionEventsRequest is a request containing data needed to fetch a session recording.
Expand Down Expand Up @@ -77,6 +83,25 @@ message EventsUnstructured {
string last_key = 2;
}

// ExportUnstructuredEventsRequest is a request with the needed data to export events.
message ExportUnstructuredEventsRequest {
// date is the target date from which to export events. note that only the UTC date of the
// timestamp value is used. use of a specific local timestamp may produce confusing results.
google.protobuf.Timestamp date = 1;
// chunk is the chunk to export events from.
string chunk = 2;
// cursor is an optional mechanism to resume interrupted streams for a given chunk.
string cursor = 3;
}

// ExportEventUnstructured is the stream item of the ExportUnstructuredEvents method.
message ExportEventUnstructured {
// event is the unstructured representation of the event payload.
EventUnstructured event = 1;
// cursor is the cursor to resume the stream after this point.
string cursor = 2;
}

// EventUnstructured represents a single events.AuditEvent in an unstructured format.
message EventUnstructured {
// type is the type of the event.
Expand All @@ -92,3 +117,15 @@ message EventUnstructured {
// unstructured is the unstructured representation of the event payload.
google.protobuf.Struct unstructured = 5;
}

// GetEventExportChunksRequest is used to request the next set of event chunks to export.
message GetEventExportChunksRequest {
// date is the date for which to list export shards.
google.protobuf.Timestamp date = 1;
}

// EventExportChunk represents a chunk of events to export.
message EventExportChunk {
// chunk is the chunk to export.
string chunk = 1;
}
21 changes: 21 additions & 0 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
mfav1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/mfa/v1"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -6013,6 +6014,26 @@ func (a *ServerWithRoles) SearchEvents(ctx context.Context, req events.SearchEve
return outEvents, lastKey, nil
}

// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
func (a *ServerWithRoles) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] {
if err := a.action(apidefaults.Namespace, types.KindEvent, types.VerbList); err != nil {
return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.Wrap(err))
}

return a.alog.ExportUnstructuredEvents(ctx, req)
}

// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
func (a *ServerWithRoles) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] {
if err := a.action(apidefaults.Namespace, types.KindEvent, types.VerbList); err != nil {
return stream.Fail[*auditlogpb.EventExportChunk](trace.Wrap(err))
}

return a.alog.GetEventExportChunks(ctx, req)
}

// SearchSessionEvents allows searching session audit events with pagination support.
func (a *ServerWithRoles) SearchSessionEvents(ctx context.Context, req events.SearchSessionEventsRequest) (outEvents []apievents.AuditEvent, lastKey string, err error) {
if req.Cond != nil {
Expand Down
41 changes: 40 additions & 1 deletion lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3405,7 +3405,6 @@ func (g *GRPCServer) StreamSessionEvents(req *authpb.StreamSessionEventsRequest,
}

c, e := auth.ServerWithRoles.StreamSessionEvents(stream.Context(), session.ID(req.SessionID), int64(req.StartIndex))

for {
select {
case event, more := <-c:
Expand Down Expand Up @@ -5538,6 +5537,46 @@ func (g *GRPCServer) GetUnstructuredEvents(ctx context.Context, req *auditlogpb.
}, nil
}

// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
func (g *GRPCServer) ExportUnstructuredEvents(req *auditlogpb.ExportUnstructuredEventsRequest, stream auditlogpb.AuditLogService_ExportUnstructuredEventsServer) error {
auth, err := g.authenticate(stream.Context())
if err != nil {
return trace.Wrap(err)
}

events := auth.ServerWithRoles.ExportUnstructuredEvents(stream.Context(), req)

for events.Next() {
if err := stream.Send(events.Item()); err != nil {
events.Done()
return trace.Wrap(err)
}
}

return trace.Wrap(events.Done())
}

// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
func (g *GRPCServer) GetEventExportChunks(req *auditlogpb.GetEventExportChunksRequest, stream auditlogpb.AuditLogService_GetEventExportChunksServer) error {
auth, err := g.authenticate(stream.Context())
if err != nil {
return trace.Wrap(err)
}

chunks := auth.ServerWithRoles.GetEventExportChunks(stream.Context(), req)

for chunks.Next() {
if err := stream.Send(chunks.Item()); err != nil {
chunks.Done()
return trace.Wrap(err)
}
}

return trace.Wrap(chunks.Done())
}

// StreamUnstructuredSessionEvents streams all events from a given session recording as an unstructured format.
func (g *GRPCServer) StreamUnstructuredSessionEvents(req *auditlogpb.StreamUnstructuredSessionEventsRequest, stream auditlogpb.AuditLogService_StreamUnstructuredSessionEventsServer) error {
auth, err := g.authenticate(stream.Context())
Expand Down
Loading
Loading