Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: WebSocket subscriptions support go-bexpr expressions #22835

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/22835.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
events: WebSocket subscriptions add support for boolean filter expressions
```
17 changes: 15 additions & 2 deletions command/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var (
type EventsSubscribeCommands struct {
*BaseCommand

namespaces []string
namespaces []string
bexprFilter string
}

func (c *EventsSubscribeCommands) Synopsis() string {
Expand All @@ -34,7 +35,7 @@ func (c *EventsSubscribeCommands) Synopsis() string {

func (c *EventsSubscribeCommands) Help() string {
helpText := `
Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] eventType
Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] [-filter=filterExpression] eventType

Subscribe to events of the given event type (topic), which may be a glob
pattern (with "*"" treated as a wildcard). The events will be sent to
Expand All @@ -49,6 +50,14 @@ Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] eventType
func (c *EventsSubscribeCommands) Flags() *FlagSets {
set := c.flagSet(FlagSetHTTP)
f := set.NewFlagSet("Subscribe Options")
f.StringVar(&StringVar{
Name: "filter",
Usage: `A boolean expression to use to filter events. Only events matching
the filter will be subscribed to. This is applied after any filtering
by event type or namespace.`,
Default: "",
Target: &c.bexprFilter,
})
f.StringSliceVar(&StringSliceVar{
Name: "namespaces",
Usage: `Specifies one or more patterns of additional child namespaces
Expand Down Expand Up @@ -133,6 +142,10 @@ func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path stri
if len(c.namespaces) > 0 {
q["namespaces"] = cleanNamespaces(c.namespaces)
}
bexprFilter := strings.TrimSpace(c.bexprFilter)
if bexprFilter != "" {
q.Set("filter", bexprFilter)
}
u.RawQuery = q.Encode()
client.AddHeader("X-Vault-Token", client.Token())
client.AddHeader("X-Vault-Namespace", client.Namespace())
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ require (
github.com/hashicorp/consul/api v1.23.0
github.com/hashicorp/errwrap v1.1.0
github.com/hashicorp/eventlogger v0.2.3
github.com/hashicorp/go-bexpr v0.1.12
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
github.com/hashicorp/go-gcp-common v0.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1955,6 +1955,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/eventlogger v0.2.3 h1:HhtM4tGEqd5H3bcI4SdppQBHA8Y5QF8Aje7HODp8/TA=
github.com/hashicorp/eventlogger v0.2.3/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
github.com/hashicorp/go-bexpr v0.1.12 h1:XrdVhmwu+9iYxIUWxsGVG7NQwrhzJZ0vR6nbN5bLgrA=
github.com/hashicorp/go-bexpr v0.1.12/go.mod h1:ACktpcSySkFNpcxWSClFrut7wicd9WzisnvHuw+g9K8=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down
5 changes: 4 additions & 1 deletion http/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type eventSubscriber struct {
events *eventbus.EventBus
namespacePatterns []string
pattern string
bexprFilter string
conn *websocket.Conn
json bool
checkCache *cache.Cache
Expand All @@ -48,7 +49,7 @@ type eventSubscriber struct {
func (sub *eventSubscriber) handleEventsSubscribeWebsocket() (websocket.StatusCode, string, error) {
ctx := sub.ctx
logger := sub.logger
ch, cancel, err := sub.events.SubscribeMultipleNamespaces(ctx, sub.namespacePatterns, sub.pattern)
ch, cancel, err := sub.events.SubscribeMultipleNamespaces(ctx, sub.namespacePatterns, sub.pattern, sub.bexprFilter)
if err != nil {
logger.Info("Error subscribing", "error", err)
return websocket.StatusUnsupportedData, "Error subscribing", nil
Expand Down Expand Up @@ -217,6 +218,7 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
}
}

bexprFilter := strings.TrimSpace(r.URL.Query().Get("filter"))
namespacePatterns := r.URL.Query()["namespaces"]
namespacePatterns = prependNamespacePatterns(namespacePatterns, ns)
conn, err := websocket.Accept(w, r, nil)
Expand Down Expand Up @@ -251,6 +253,7 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
events: core.Events(),
namespacePatterns: namespacePatterns,
pattern: pattern,
bexprFilter: bexprFilter,
conn: conn,
json: json,
checkCache: cache.New(webSocketRevalidationTime, webSocketRevalidationTime),
Expand Down
89 changes: 87 additions & 2 deletions http/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -88,8 +89,8 @@ func TestEventsSubscribe(t *testing.T) {
}{{true}, {false}}

for _, testCase := range testCases {
url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?namespaces=ns1&namespaces=ns*&json=%v", wsAddr, eventType, testCase.json)
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
location := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?namespaces=ns1&namespaces=ns*&json=%v", wsAddr, eventType, testCase.json)
conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
Expand Down Expand Up @@ -135,6 +136,90 @@ func TestEventsSubscribe(t *testing.T) {
}
}

// TestBexprFilters tests that go-bexpr filters are used to filter events.
func TestBexprFilters(t *testing.T) {
core := vault.TestCoreWithConfig(t, &vault.CoreConfig{
Experiments: []string{experiments.VaultExperimentEventsAlpha1},
})

ln, addr := TestServer(t, core)
defer ln.Close()

// unseal the core
keys, token := vault.TestCoreInit(t, core)
for _, key := range keys {
_, err := core.Unseal(key)
if err != nil {
t.Fatal(err)
}
}

sendEvent := func(eventType string) error {
pluginInfo := &logical.EventPluginInfo{
MountPath: "secret",
}
ns := namespace.RootNamespace
id, err := uuid.GenerateUUID()
if err != nil {
core.Logger().Info("Error generating UUID, exiting sender", "error", err)
return err
}
err = core.Events().SendEventInternal(namespace.RootContext(context.Background()), ns, pluginInfo, logical.EventType(eventType), &logical.EventData{
Id: id,
Metadata: nil,
EntityIds: nil,
Note: "testing",
})
if err != nil {
core.Logger().Info("Error sending event, exiting sender", "error", err)
return err
}
return nil
}
ctx := context.Background()
wsAddr := strings.Replace(addr, "http", "ws", 1)
bexprFilter := url.QueryEscape("event_type == abc")

location := fmt.Sprintf("%s/v1/sys/events/subscribe/*?json=true&filter=%s", wsAddr, bexprFilter)
conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
defer conn.Close(websocket.StatusNormalClosure, "")
err = sendEvent("def")
if err != nil {
t.Fatal(err)
}
err = sendEvent("xyz")
if err != nil {
t.Fatal(err)
}
err = sendEvent("abc")
if err != nil {
t.Fatal(err)
}

// we should get the abc message
_, msg, err := conn.Read(context.Background())
if err != nil {
t.Fatal(err)
}
event := map[string]interface{}{}
err = json.Unmarshal(msg, &event)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "abc", event["data"].(map[string]interface{})["event_type"].(string))

// and no other messages
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, _, err = conn.Read(ctx)
assert.ErrorContains(t, err, "context deadline exceeded")
}

func TestNamespacePrepend(t *testing.T) {
testCases := []struct {
requestNs string
Expand Down
36 changes: 36 additions & 0 deletions sdk/logical/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,39 @@ func SendEvent(ctx context.Context, sender EventSender, eventType string, metada
}
return sender.SendEvent(ctx, EventType(eventType), ev)
}

// EventReceivedBexpr is used for evaluating boolean expressions with go-bexpr.
type EventReceivedBexpr struct {
EventType string `bexpr:"event_type"`
Operation string `bexpr:"operation"`
SourcePluginMount string `bexpr:"source_plugin_mount"`
DataPath string `bexpr:"data_path"`
Namespace string `bexpr:"namespace"`
}

// BexprDatum returns a copy of EventReceived formatted for use in evaluating go-bexpr boolean expressions.
func (x *EventReceived) BexprDatum() any {
operation := ""
dataPath := ""

if x.Event != nil {
if x.Event.Metadata != nil {
operationValue := x.Event.Metadata.Fields[EventMetadataOperation]
if operationValue != nil {
operation = operationValue.GetStringValue()
}
dataPathValue := x.Event.Metadata.Fields[EventMetadataDataPath]
if dataPathValue != nil {
dataPath = dataPathValue.GetStringValue()
}
}
}

return &EventReceivedBexpr{
EventType: x.EventType,
Operation: operation,
SourcePluginMount: x.PluginInfo.MountPath,
DataPath: dataPath,
Namespace: x.Namespace,
}
}
32 changes: 26 additions & 6 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/eventlogger/formatter_filters/cloudevents"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/namespace"
Expand Down Expand Up @@ -201,11 +202,15 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
}, nil
}

func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.SubscribeMultipleNamespaces(ctx, []string{strings.Trim(ns.Path, "/")}, pattern)
// Subscribe subscribes to events in the given namespace matching the event type pattern and after
// applying the optional go-bexpr filter.
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string, bexprFilter string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.SubscribeMultipleNamespaces(ctx, []string{strings.Trim(ns.Path, "/")}, pattern, bexprFilter)
}

func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// SubscribeMultipleNamespaces subscribes to events in the given namespace matching the event type
// pattern and after applying the optional go-bexpr filter.
func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string, bexprFilter string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscriptions are still stored even if the bus has not been started
pipelineID, err := uuid.GenerateUUID()
if err != nil {
Expand All @@ -217,7 +222,10 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
return nil, nil, err
}

filterNode := newFilterNode(namespacePathPatterns, pattern)
filterNode, err := newFilterNode(namespacePathPatterns, pattern, bexprFilter)
if err != nil {
return nil, nil, err
}
err = bus.broker.RegisterNode(eventlogger.NodeID(filterNodeID), filterNode)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -263,7 +271,15 @@ func (bus *EventBus) SetSendTimeout(timeout time.Duration) {
bus.timeout = timeout
}

func newFilterNode(namespacePatterns []string, pattern string) *eventlogger.Filter {
func newFilterNode(namespacePatterns []string, pattern string, bexprFilter string) (*eventlogger.Filter, error) {
var evaluator *bexpr.Evaluator
if bexprFilter != "" {
var err error
evaluator, err = bexpr.CreateEvaluator(bexprFilter)
if err != nil {
return nil, err
}
}
return &eventlogger.Filter{
Predicate: func(e *eventlogger.Event) (bool, error) {
eventRecv := e.Payload.(*logical.EventReceived)
Expand All @@ -287,9 +303,13 @@ func newFilterNode(namespacePatterns []string, pattern string) *eventlogger.Filt
return false, nil
}

// apply go-bexpr filter
if evaluator != nil {
return evaluator.Evaluate(eventRecv.BexprDatum())
}
return true, nil
},
}
}, nil
}

func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
Expand Down
Loading
Loading