From 022469da45fcd27f15dd836a942f69557a560009 Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Thu, 7 Sep 2023 13:11:53 -0700 Subject: [PATCH] events: WebSocket subscriptions support go-bexpr expressions (#22835) Subscribing to events through a WebSocket now support boolean expressions to filter only the events wanted based on the fields * `event_type` * `operation` * `source_plugin_mount` * `data_path` * `namespace` Example expressions: These can be passed to `vault events subscribe`, e.g.,: * `event_type == abc` * `source_plugin_mount == secret/` * `event_type != def and operation != write` ```sh vault events subscribe -filter='source_plugin_mount == secret/' 'kv*' ``` The docs for the `vault events subscribe` command and API endpoint will be coming shortly in a different PR, and will include a better specification for these expressions, similar to (or linking to) https://developer.hashicorp.com/boundary/docs/concepts/filtering --- changelog/22835.txt | 3 ++ command/events.go | 17 ++++++- go.mod | 1 + go.sum | 2 + http/events.go | 5 +- http/events_test.go | 89 +++++++++++++++++++++++++++++++- sdk/logical/events.go | 36 +++++++++++++ vault/eventbus/bus.go | 32 +++++++++--- vault/eventbus/bus_test.go | 101 +++++++++++++++++++++++++++++++++---- vault/events_test.go | 2 +- 10 files changed, 266 insertions(+), 22 deletions(-) create mode 100644 changelog/22835.txt diff --git a/changelog/22835.txt b/changelog/22835.txt new file mode 100644 index 000000000000..c8e3d46cea36 --- /dev/null +++ b/changelog/22835.txt @@ -0,0 +1,3 @@ +```release-note:improvement +events: WebSocket subscriptions add support for boolean filter expressions +``` diff --git a/command/events.go b/command/events.go index d85955865798..0ed974fcb91d 100644 --- a/command/events.go +++ b/command/events.go @@ -25,7 +25,8 @@ var ( type EventsSubscribeCommands struct { *BaseCommand - namespaces []string + namespaces []string + bexprFilter string } func (c *EventsSubscribeCommands) Synopsis() string { @@ -34,7 +35,7 @@ func (c *EventsSubscribeCommands) Synopsis() string { func (c *EventsSubscribeCommands) Help() string { helpText := ` -Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] eventType +Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] [-filter=filterExpression] eventType Subscribe to events of the given event type (topic), which may be a glob pattern (with "*"" treated as a wildcard). The events will be sent to @@ -49,6 +50,14 @@ Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] eventType func (c *EventsSubscribeCommands) Flags() *FlagSets { set := c.flagSet(FlagSetHTTP) f := set.NewFlagSet("Subscribe Options") + f.StringVar(&StringVar{ + Name: "filter", + Usage: `A boolean expression to use to filter events. Only events matching + the filter will be subscribed to. This is applied after any filtering + by event type or namespace.`, + Default: "", + Target: &c.bexprFilter, + }) f.StringSliceVar(&StringSliceVar{ Name: "namespaces", Usage: `Specifies one or more patterns of additional child namespaces @@ -133,6 +142,10 @@ func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path stri if len(c.namespaces) > 0 { q["namespaces"] = cleanNamespaces(c.namespaces) } + bexprFilter := strings.TrimSpace(c.bexprFilter) + if bexprFilter != "" { + q.Set("filter", bexprFilter) + } u.RawQuery = q.Encode() client.AddHeader("X-Vault-Token", client.Token()) client.AddHeader("X-Vault-Namespace", client.Namespace()) diff --git a/go.mod b/go.mod index 17276d834ebe..4d79d1f3c8c8 100644 --- a/go.mod +++ b/go.mod @@ -79,6 +79,7 @@ require ( github.com/hashicorp/consul/api v1.23.0 github.com/hashicorp/errwrap v1.1.0 github.com/hashicorp/eventlogger v0.2.3 + github.com/hashicorp/go-bexpr v0.1.12 github.com/hashicorp/go-cleanhttp v0.5.2 github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192 github.com/hashicorp/go-gcp-common v0.8.0 diff --git a/go.sum b/go.sum index e08f775287b6..c3a1c3ccb14a 100644 --- a/go.sum +++ b/go.sum @@ -1955,6 +1955,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/eventlogger v0.2.3 h1:HhtM4tGEqd5H3bcI4SdppQBHA8Y5QF8Aje7HODp8/TA= github.com/hashicorp/eventlogger v0.2.3/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM= +github.com/hashicorp/go-bexpr v0.1.12 h1:XrdVhmwu+9iYxIUWxsGVG7NQwrhzJZ0vR6nbN5bLgrA= +github.com/hashicorp/go-bexpr v0.1.12/go.mod h1:ACktpcSySkFNpcxWSClFrut7wicd9WzisnvHuw+g9K8= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= diff --git a/http/events.go b/http/events.go index fac54a6f1d17..8060b65288a8 100644 --- a/http/events.go +++ b/http/events.go @@ -37,6 +37,7 @@ type eventSubscriber struct { events *eventbus.EventBus namespacePatterns []string pattern string + bexprFilter string conn *websocket.Conn json bool checkCache *cache.Cache @@ -48,7 +49,7 @@ type eventSubscriber struct { func (sub *eventSubscriber) handleEventsSubscribeWebsocket() (websocket.StatusCode, string, error) { ctx := sub.ctx logger := sub.logger - ch, cancel, err := sub.events.SubscribeMultipleNamespaces(ctx, sub.namespacePatterns, sub.pattern) + ch, cancel, err := sub.events.SubscribeMultipleNamespaces(ctx, sub.namespacePatterns, sub.pattern, sub.bexprFilter) if err != nil { logger.Info("Error subscribing", "error", err) return websocket.StatusUnsupportedData, "Error subscribing", nil @@ -217,6 +218,7 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler } } + bexprFilter := strings.TrimSpace(r.URL.Query().Get("filter")) namespacePatterns := r.URL.Query()["namespaces"] namespacePatterns = prependNamespacePatterns(namespacePatterns, ns) conn, err := websocket.Accept(w, r, nil) @@ -251,6 +253,7 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler events: core.Events(), namespacePatterns: namespacePatterns, pattern: pattern, + bexprFilter: bexprFilter, conn: conn, json: json, checkCache: cache.New(webSocketRevalidationTime, webSocketRevalidationTime), diff --git a/http/events_test.go b/http/events_test.go index d08cd9cae3aa..b89acae8d37f 100644 --- a/http/events_test.go +++ b/http/events_test.go @@ -9,6 +9,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "strings" "sync" "sync/atomic" @@ -84,8 +85,8 @@ func TestEventsSubscribe(t *testing.T) { }{{true}, {false}} for _, testCase := range testCases { - url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?namespaces=ns1&namespaces=ns*&json=%v", wsAddr, eventType, testCase.json) - conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{ + location := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?namespaces=ns1&namespaces=ns*&json=%v", wsAddr, eventType, testCase.json) + conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{ HTTPHeader: http.Header{"x-vault-token": []string{token}}, }) if err != nil { @@ -131,6 +132,90 @@ func TestEventsSubscribe(t *testing.T) { } } +// TestBexprFilters tests that go-bexpr filters are used to filter events. +func TestBexprFilters(t *testing.T) { + core := vault.TestCoreWithConfig(t, &vault.CoreConfig{ + Experiments: []string{experiments.VaultExperimentEventsAlpha1}, + }) + + ln, addr := TestServer(t, core) + defer ln.Close() + + // unseal the core + keys, token := vault.TestCoreInit(t, core) + for _, key := range keys { + _, err := core.Unseal(key) + if err != nil { + t.Fatal(err) + } + } + + sendEvent := func(eventType string) error { + pluginInfo := &logical.EventPluginInfo{ + MountPath: "secret", + } + ns := namespace.RootNamespace + id, err := uuid.GenerateUUID() + if err != nil { + core.Logger().Info("Error generating UUID, exiting sender", "error", err) + return err + } + err = core.Events().SendEventInternal(namespace.RootContext(context.Background()), ns, pluginInfo, logical.EventType(eventType), &logical.EventData{ + Id: id, + Metadata: nil, + EntityIds: nil, + Note: "testing", + }) + if err != nil { + core.Logger().Info("Error sending event, exiting sender", "error", err) + return err + } + return nil + } + ctx := context.Background() + wsAddr := strings.Replace(addr, "http", "ws", 1) + bexprFilter := url.QueryEscape("event_type == abc") + + location := fmt.Sprintf("%s/v1/sys/events/subscribe/*?json=true&filter=%s", wsAddr, bexprFilter) + conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{ + HTTPHeader: http.Header{"x-vault-token": []string{token}}, + }) + if err != nil { + t.Fatal(err) + } + defer conn.Close(websocket.StatusNormalClosure, "") + err = sendEvent("def") + if err != nil { + t.Fatal(err) + } + err = sendEvent("xyz") + if err != nil { + t.Fatal(err) + } + err = sendEvent("abc") + if err != nil { + t.Fatal(err) + } + + // we should get the abc message + _, msg, err := conn.Read(context.Background()) + if err != nil { + t.Fatal(err) + } + event := map[string]interface{}{} + err = json.Unmarshal(msg, &event) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "abc", event["data"].(map[string]interface{})["event_type"].(string)) + + // and no other messages + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + _, _, err = conn.Read(ctx) + assert.ErrorContains(t, err, "context deadline exceeded") +} + func TestNamespacePrepend(t *testing.T) { testCases := []struct { requestNs string diff --git a/sdk/logical/events.go b/sdk/logical/events.go index d341f9749949..5bd9717f7ff1 100644 --- a/sdk/logical/events.go +++ b/sdk/logical/events.go @@ -67,3 +67,39 @@ func SendEvent(ctx context.Context, sender EventSender, eventType string, metada } return sender.SendEvent(ctx, EventType(eventType), ev) } + +// EventReceivedBexpr is used for evaluating boolean expressions with go-bexpr. +type EventReceivedBexpr struct { + EventType string `bexpr:"event_type"` + Operation string `bexpr:"operation"` + SourcePluginMount string `bexpr:"source_plugin_mount"` + DataPath string `bexpr:"data_path"` + Namespace string `bexpr:"namespace"` +} + +// BexprDatum returns a copy of EventReceived formatted for use in evaluating go-bexpr boolean expressions. +func (x *EventReceived) BexprDatum() any { + operation := "" + dataPath := "" + + if x.Event != nil { + if x.Event.Metadata != nil { + operationValue := x.Event.Metadata.Fields[EventMetadataOperation] + if operationValue != nil { + operation = operationValue.GetStringValue() + } + dataPathValue := x.Event.Metadata.Fields[EventMetadataDataPath] + if dataPathValue != nil { + dataPath = dataPathValue.GetStringValue() + } + } + } + + return &EventReceivedBexpr{ + EventType: x.EventType, + Operation: operation, + SourcePluginMount: x.PluginInfo.MountPath, + DataPath: dataPath, + Namespace: x.Namespace, + } +} diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index 4e54400f7f81..a9909fb56eda 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -17,6 +17,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/eventlogger" "github.com/hashicorp/eventlogger/formatter_filters/cloudevents" + "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/hashicorp/vault/helper/namespace" @@ -201,11 +202,15 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) { }, nil } -func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) { - return bus.SubscribeMultipleNamespaces(ctx, []string{strings.Trim(ns.Path, "/")}, pattern) +// Subscribe subscribes to events in the given namespace matching the event type pattern and after +// applying the optional go-bexpr filter. +func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string, bexprFilter string) (<-chan *eventlogger.Event, context.CancelFunc, error) { + return bus.SubscribeMultipleNamespaces(ctx, []string{strings.Trim(ns.Path, "/")}, pattern, bexprFilter) } -func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) { +// SubscribeMultipleNamespaces subscribes to events in the given namespace matching the event type +// pattern and after applying the optional go-bexpr filter. +func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string, bexprFilter string) (<-chan *eventlogger.Event, context.CancelFunc, error) { // subscriptions are still stored even if the bus has not been started pipelineID, err := uuid.GenerateUUID() if err != nil { @@ -217,7 +222,10 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP return nil, nil, err } - filterNode := newFilterNode(namespacePathPatterns, pattern) + filterNode, err := newFilterNode(namespacePathPatterns, pattern, bexprFilter) + if err != nil { + return nil, nil, err + } err = bus.broker.RegisterNode(eventlogger.NodeID(filterNodeID), filterNode) if err != nil { return nil, nil, err @@ -263,7 +271,15 @@ func (bus *EventBus) SetSendTimeout(timeout time.Duration) { bus.timeout = timeout } -func newFilterNode(namespacePatterns []string, pattern string) *eventlogger.Filter { +func newFilterNode(namespacePatterns []string, pattern string, bexprFilter string) (*eventlogger.Filter, error) { + var evaluator *bexpr.Evaluator + if bexprFilter != "" { + var err error + evaluator, err = bexpr.CreateEvaluator(bexprFilter) + if err != nil { + return nil, err + } + } return &eventlogger.Filter{ Predicate: func(e *eventlogger.Event) (bool, error) { eventRecv := e.Payload.(*logical.EventReceived) @@ -287,9 +303,13 @@ func newFilterNode(namespacePatterns []string, pattern string) *eventlogger.Filt return false, nil } + // apply go-bexpr filter + if evaluator != nil { + return evaluator.Evaluate(eventRecv.BexprDatum()) + } return true, nil }, - } + }, nil } func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode { diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go index 54d1009c0701..e480fecc7810 100644 --- a/vault/eventbus/bus_test.go +++ b/vault/eventbus/bus_test.go @@ -6,6 +6,7 @@ package eventbus import ( "context" "encoding/json" + "errors" "fmt" "sync/atomic" "testing" @@ -13,6 +14,7 @@ import ( "github.com/hashicorp/eventlogger" "github.com/hashicorp/go-secure-stdlib/strutil" + "github.com/hashicorp/go-uuid" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" "github.com/stretchr/testify/assert" @@ -35,7 +37,7 @@ func TestBusBasics(t *testing.T) { } err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event) - if err != ErrNotStarted { + if !errors.Is(err, ErrNotStarted) { t.Errorf("Expected not started error but got: %v", err) } @@ -46,7 +48,7 @@ func TestBusBasics(t *testing.T) { t.Errorf("Expected no error sending: %v", err) } - ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType)) + ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType), "") if err != nil { t.Fatal(err) } @@ -90,7 +92,7 @@ func TestSubscribeNonRootNamespace(t *testing.T) { Path: "abc/", } - ch, cancel, err := bus.Subscribe(ctx, ns, string(eventType)) + ch, cancel, err := bus.Subscribe(ctx, ns, string(eventType), "") if err != nil { t.Fatal(err) } @@ -133,7 +135,7 @@ func TestNamespaceFiltering(t *testing.T) { t.Fatal(err) } - ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType)) + ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType), "") if err != nil { t.Fatal(err) } @@ -189,13 +191,13 @@ func TestBus2Subscriptions(t *testing.T) { eventType2 := logical.EventType("someType2") bus.Start() - ch1, cancel1, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType1)) + ch1, cancel1, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType1), "") if err != nil { t.Fatal(err) } defer cancel1() - ch2, cancel2, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType2)) + ch2, cancel2, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType2), "") if err != nil { t.Fatal(err) } @@ -274,7 +276,7 @@ func TestBusSubscriptionsCancel(t *testing.T) { received := atomic.Int32{} for i := 0; i < create; i++ { - ch, cancelFunc, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType)) + ch, cancelFunc, err := bus.Subscribe(ctx, namespace.RootNamespace, string(eventType), "") if err != nil { t.Fatal(err) } @@ -363,13 +365,13 @@ func TestBusWildcardSubscriptions(t *testing.T) { barEventType := logical.EventType("kv/bar") bus.Start() - ch1, cancel1, err := bus.Subscribe(ctx, namespace.RootNamespace, "kv/*") + ch1, cancel1, err := bus.Subscribe(ctx, namespace.RootNamespace, "kv/*", "") if err != nil { t.Fatal(err) } defer cancel1() - ch2, cancel2, err := bus.Subscribe(ctx, namespace.RootNamespace, "*/bar") + ch2, cancel2, err := bus.Subscribe(ctx, namespace.RootNamespace, "*/bar", "") if err != nil { t.Fatal(err) } @@ -437,7 +439,7 @@ func TestDataPathIsPrependedWithMount(t *testing.T) { fooEventType := logical.EventType("kv/foo") bus.Start() - ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, "kv/*") + ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, "kv/*", "") if err != nil { t.Fatal(err) } @@ -545,3 +547,82 @@ func TestDataPathIsPrependedWithMount(t *testing.T) { t.Error("Timeout waiting for event") } } + +// TestBexpr tests go-bexpr filters are evaluated on an event. +func TestBexpr(t *testing.T) { + bus, err := NewEventBus(nil) + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + bus.Start() + + sendEvent := func(eventType string) error { + event, err := logical.NewEvent() + if err != nil { + return err + } + metadata := map[string]string{ + logical.EventMetadataDataPath: "my/secret/path", + logical.EventMetadataOperation: "write", + } + metadataBytes, err := json.Marshal(metadata) + if err != nil { + return err + } + event.Metadata = &structpb.Struct{} + if err := event.Metadata.UnmarshalJSON(metadataBytes); err != nil { + return err + } + // send with a secrets plugin mounted + pluginInfo := logical.EventPluginInfo{ + MountClass: "secrets", + MountAccessor: "kv_abc", + MountPath: "secret/", + Plugin: "kv", + PluginVersion: "v1.13.1+builtin", + Version: "2", + } + return bus.SendEventInternal(ctx, namespace.RootNamespace, &pluginInfo, logical.EventType(eventType), event) + } + + testCases := []struct { + name string + filter string + shouldPassFilter bool + }{ + {"empty expression", "", true}, + {"non-matching expression", "data_path == nothing", false}, + {"matching expression", "data_path == secret/my/secret/path", true}, + {"full matching expression", "data_path == secret/my/secret/path and operation != read and source_plugin_mount == secret/ and source_plugin_mount != somethingelse", true}, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + eventType, err := uuid.GenerateUUID() + if err != nil { + t.Fatal(err) + } + ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType, testCase.filter) + if err != nil { + t.Fatal(err) + } + defer cancel() + err = sendEvent(eventType) + if err != nil { + t.Fatal(err) + } + + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + got := false + select { + case <-ch: + got = true + case <-timer.C: + } + assert.Equal(t, testCase.shouldPassFilter, got) + }) + } +} diff --git a/vault/events_test.go b/vault/events_test.go index 33f465261879..e21fdec89105 100644 --- a/vault/events_test.go +++ b/vault/events_test.go @@ -21,7 +21,7 @@ func TestCanSendEventsFromBuiltinPlugin(t *testing.T) { if err != nil { t.Fatal(err) } - ch, cancel, err := c.events.Subscribe(ctx, namespace.RootNamespace, eventType) + ch, cancel, err := c.events.Subscribe(ctx, namespace.RootNamespace, eventType, "") if err != nil { t.Fatal(err) }