Skip to content

Commit

Permalink
events: WebSocket subscriptions support go-bexpr expressions (#22835)
Browse files Browse the repository at this point in the history
Subscribing to events through a WebSocket now support boolean
expressions to filter only the events wanted based on the fields

* `event_type`
* `operation`
* `source_plugin_mount`
* `data_path`
* `namespace`

Example expressions:

These can be passed to `vault events subscribe`, e.g.,:
* `event_type == abc`
* `source_plugin_mount == secret/`
* `event_type != def and operation != write`

```sh
vault events subscribe -filter='source_plugin_mount == secret/' 'kv*'
```

The docs for the `vault events subscribe` command and API endpoint
will be coming shortly in a different PR, and will include a better
specification for these expressions, similar to (or linking to)
https://developer.hashicorp.com/boundary/docs/concepts/filtering
  • Loading branch information
Christopher Swenson committed Sep 7, 2023
1 parent 3130e8b commit 022469d
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 22 deletions.
3 changes: 3 additions & 0 deletions changelog/22835.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
events: WebSocket subscriptions add support for boolean filter expressions
```
17 changes: 15 additions & 2 deletions command/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var (
type EventsSubscribeCommands struct {
*BaseCommand

namespaces []string
namespaces []string
bexprFilter string
}

func (c *EventsSubscribeCommands) Synopsis() string {
Expand All @@ -34,7 +35,7 @@ func (c *EventsSubscribeCommands) Synopsis() string {

func (c *EventsSubscribeCommands) Help() string {
helpText := `
Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] eventType
Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] [-filter=filterExpression] eventType
Subscribe to events of the given event type (topic), which may be a glob
pattern (with "*"" treated as a wildcard). The events will be sent to
Expand All @@ -49,6 +50,14 @@ Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] eventType
func (c *EventsSubscribeCommands) Flags() *FlagSets {
set := c.flagSet(FlagSetHTTP)
f := set.NewFlagSet("Subscribe Options")
f.StringVar(&StringVar{
Name: "filter",
Usage: `A boolean expression to use to filter events. Only events matching
the filter will be subscribed to. This is applied after any filtering
by event type or namespace.`,
Default: "",
Target: &c.bexprFilter,
})
f.StringSliceVar(&StringSliceVar{
Name: "namespaces",
Usage: `Specifies one or more patterns of additional child namespaces
Expand Down Expand Up @@ -133,6 +142,10 @@ func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path stri
if len(c.namespaces) > 0 {
q["namespaces"] = cleanNamespaces(c.namespaces)
}
bexprFilter := strings.TrimSpace(c.bexprFilter)
if bexprFilter != "" {
q.Set("filter", bexprFilter)
}
u.RawQuery = q.Encode()
client.AddHeader("X-Vault-Token", client.Token())
client.AddHeader("X-Vault-Namespace", client.Namespace())
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ require (
github.com/hashicorp/consul/api v1.23.0
github.com/hashicorp/errwrap v1.1.0
github.com/hashicorp/eventlogger v0.2.3
github.com/hashicorp/go-bexpr v0.1.12
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
github.com/hashicorp/go-gcp-common v0.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1955,6 +1955,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/eventlogger v0.2.3 h1:HhtM4tGEqd5H3bcI4SdppQBHA8Y5QF8Aje7HODp8/TA=
github.com/hashicorp/eventlogger v0.2.3/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
github.com/hashicorp/go-bexpr v0.1.12 h1:XrdVhmwu+9iYxIUWxsGVG7NQwrhzJZ0vR6nbN5bLgrA=
github.com/hashicorp/go-bexpr v0.1.12/go.mod h1:ACktpcSySkFNpcxWSClFrut7wicd9WzisnvHuw+g9K8=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down
5 changes: 4 additions & 1 deletion http/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type eventSubscriber struct {
events *eventbus.EventBus
namespacePatterns []string
pattern string
bexprFilter string
conn *websocket.Conn
json bool
checkCache *cache.Cache
Expand All @@ -48,7 +49,7 @@ type eventSubscriber struct {
func (sub *eventSubscriber) handleEventsSubscribeWebsocket() (websocket.StatusCode, string, error) {
ctx := sub.ctx
logger := sub.logger
ch, cancel, err := sub.events.SubscribeMultipleNamespaces(ctx, sub.namespacePatterns, sub.pattern)
ch, cancel, err := sub.events.SubscribeMultipleNamespaces(ctx, sub.namespacePatterns, sub.pattern, sub.bexprFilter)
if err != nil {
logger.Info("Error subscribing", "error", err)
return websocket.StatusUnsupportedData, "Error subscribing", nil
Expand Down Expand Up @@ -217,6 +218,7 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
}
}

bexprFilter := strings.TrimSpace(r.URL.Query().Get("filter"))
namespacePatterns := r.URL.Query()["namespaces"]
namespacePatterns = prependNamespacePatterns(namespacePatterns, ns)
conn, err := websocket.Accept(w, r, nil)
Expand Down Expand Up @@ -251,6 +253,7 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
events: core.Events(),
namespacePatterns: namespacePatterns,
pattern: pattern,
bexprFilter: bexprFilter,
conn: conn,
json: json,
checkCache: cache.New(webSocketRevalidationTime, webSocketRevalidationTime),
Expand Down
89 changes: 87 additions & 2 deletions http/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -84,8 +85,8 @@ func TestEventsSubscribe(t *testing.T) {
}{{true}, {false}}

for _, testCase := range testCases {
url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?namespaces=ns1&namespaces=ns*&json=%v", wsAddr, eventType, testCase.json)
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
location := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?namespaces=ns1&namespaces=ns*&json=%v", wsAddr, eventType, testCase.json)
conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
Expand Down Expand Up @@ -131,6 +132,90 @@ func TestEventsSubscribe(t *testing.T) {
}
}

// TestBexprFilters tests that go-bexpr filters are used to filter events.
func TestBexprFilters(t *testing.T) {
core := vault.TestCoreWithConfig(t, &vault.CoreConfig{
Experiments: []string{experiments.VaultExperimentEventsAlpha1},

Check failure on line 138 in http/events_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (6)

undefined: experiments

Check failure on line 138 in http/events_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (6)

undefined: experiments

Check failure on line 138 in http/events_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests / test-go (6)

undefined: experiments

Check failure on line 138 in http/events_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests / test-go (6)

undefined: experiments
})

ln, addr := TestServer(t, core)
defer ln.Close()

// unseal the core
keys, token := vault.TestCoreInit(t, core)
for _, key := range keys {
_, err := core.Unseal(key)
if err != nil {
t.Fatal(err)
}
}

sendEvent := func(eventType string) error {
pluginInfo := &logical.EventPluginInfo{
MountPath: "secret",
}
ns := namespace.RootNamespace
id, err := uuid.GenerateUUID()
if err != nil {
core.Logger().Info("Error generating UUID, exiting sender", "error", err)
return err
}
err = core.Events().SendEventInternal(namespace.RootContext(context.Background()), ns, pluginInfo, logical.EventType(eventType), &logical.EventData{
Id: id,
Metadata: nil,
EntityIds: nil,
Note: "testing",
})
if err != nil {
core.Logger().Info("Error sending event, exiting sender", "error", err)
return err
}
return nil
}
ctx := context.Background()
wsAddr := strings.Replace(addr, "http", "ws", 1)
bexprFilter := url.QueryEscape("event_type == abc")

location := fmt.Sprintf("%s/v1/sys/events/subscribe/*?json=true&filter=%s", wsAddr, bexprFilter)
conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
defer conn.Close(websocket.StatusNormalClosure, "")
err = sendEvent("def")
if err != nil {
t.Fatal(err)
}
err = sendEvent("xyz")
if err != nil {
t.Fatal(err)
}
err = sendEvent("abc")
if err != nil {
t.Fatal(err)
}

// we should get the abc message
_, msg, err := conn.Read(context.Background())
if err != nil {
t.Fatal(err)
}
event := map[string]interface{}{}
err = json.Unmarshal(msg, &event)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "abc", event["data"].(map[string]interface{})["event_type"].(string))

// and no other messages
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, _, err = conn.Read(ctx)
assert.ErrorContains(t, err, "context deadline exceeded")
}

func TestNamespacePrepend(t *testing.T) {
testCases := []struct {
requestNs string
Expand Down
36 changes: 36 additions & 0 deletions sdk/logical/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,39 @@ func SendEvent(ctx context.Context, sender EventSender, eventType string, metada
}
return sender.SendEvent(ctx, EventType(eventType), ev)
}

// EventReceivedBexpr is used for evaluating boolean expressions with go-bexpr.
type EventReceivedBexpr struct {
EventType string `bexpr:"event_type"`
Operation string `bexpr:"operation"`
SourcePluginMount string `bexpr:"source_plugin_mount"`
DataPath string `bexpr:"data_path"`
Namespace string `bexpr:"namespace"`
}

// BexprDatum returns a copy of EventReceived formatted for use in evaluating go-bexpr boolean expressions.
func (x *EventReceived) BexprDatum() any {
operation := ""
dataPath := ""

if x.Event != nil {
if x.Event.Metadata != nil {
operationValue := x.Event.Metadata.Fields[EventMetadataOperation]
if operationValue != nil {
operation = operationValue.GetStringValue()
}
dataPathValue := x.Event.Metadata.Fields[EventMetadataDataPath]
if dataPathValue != nil {
dataPath = dataPathValue.GetStringValue()
}
}
}

return &EventReceivedBexpr{
EventType: x.EventType,
Operation: operation,
SourcePluginMount: x.PluginInfo.MountPath,
DataPath: dataPath,
Namespace: x.Namespace,
}
}
32 changes: 26 additions & 6 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/eventlogger/formatter_filters/cloudevents"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/namespace"
Expand Down Expand Up @@ -201,11 +202,15 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
}, nil
}

func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.SubscribeMultipleNamespaces(ctx, []string{strings.Trim(ns.Path, "/")}, pattern)
// Subscribe subscribes to events in the given namespace matching the event type pattern and after
// applying the optional go-bexpr filter.
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string, bexprFilter string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.SubscribeMultipleNamespaces(ctx, []string{strings.Trim(ns.Path, "/")}, pattern, bexprFilter)
}

func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// SubscribeMultipleNamespaces subscribes to events in the given namespace matching the event type
// pattern and after applying the optional go-bexpr filter.
func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string, bexprFilter string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscriptions are still stored even if the bus has not been started
pipelineID, err := uuid.GenerateUUID()
if err != nil {
Expand All @@ -217,7 +222,10 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
return nil, nil, err
}

filterNode := newFilterNode(namespacePathPatterns, pattern)
filterNode, err := newFilterNode(namespacePathPatterns, pattern, bexprFilter)
if err != nil {
return nil, nil, err
}
err = bus.broker.RegisterNode(eventlogger.NodeID(filterNodeID), filterNode)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -263,7 +271,15 @@ func (bus *EventBus) SetSendTimeout(timeout time.Duration) {
bus.timeout = timeout
}

func newFilterNode(namespacePatterns []string, pattern string) *eventlogger.Filter {
func newFilterNode(namespacePatterns []string, pattern string, bexprFilter string) (*eventlogger.Filter, error) {
var evaluator *bexpr.Evaluator
if bexprFilter != "" {
var err error
evaluator, err = bexpr.CreateEvaluator(bexprFilter)
if err != nil {
return nil, err
}
}
return &eventlogger.Filter{
Predicate: func(e *eventlogger.Event) (bool, error) {
eventRecv := e.Payload.(*logical.EventReceived)
Expand All @@ -287,9 +303,13 @@ func newFilterNode(namespacePatterns []string, pattern string) *eventlogger.Filt
return false, nil
}

// apply go-bexpr filter
if evaluator != nil {
return evaluator.Evaluate(eventRecv.BexprDatum())
}
return true, nil
},
}
}, nil
}

func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
Expand Down
Loading

0 comments on commit 022469d

Please sign in to comment.