From 2eb57beebbf22fae08b7cf834007731cb64960b6 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Fri, 26 Jul 2019 11:33:53 +0100 Subject: [PATCH 1/2] Store less data in Cassandra prefix buckets The Cassandra physical backend relies on storing data for sys/foo/bar under sys, sys/foo, and sys/foo/bar. This is necessary so that we can list the sys bucket, get a list of all child keys, and then trim this down to find child 'folders' eg food. Right now however, we store the full value of every storage entry in all three buckets. This is unnecessary as the value will only ever be read out in the leaf bucket ie sys/foo/bar. We use the intermediary buckets simply for listing keys. We have seen some issues around compaction where certain buckets, particularly intermediary buckets that are exclusively for listing, get really clogged up with data to the point of not being listable. Buckets like sys/expire/id are huge, combining lease expiry data for all auth methods, and need to be listed for vault to successfully become leader. This PR tries to cut down on the amount of data stored in intermediary buckets. --- physical/cassandra/cassandra.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/physical/cassandra/cassandra.go b/physical/cassandra/cassandra.go index 0276e0ce39df..8fcb04cdb16a 100644 --- a/physical/cassandra/cassandra.go +++ b/physical/cassandra/cassandra.go @@ -258,10 +258,16 @@ func (c *CassandraBackend) Put(ctx context.Context, entry *physical.Entry) error stmt := fmt.Sprintf(`INSERT INTO "%s" (bucket, key, value) VALUES (?, ?, ?)`, c.table) results := make(chan error) buckets := c.buckets(entry.Key) - for _, _bucket := range buckets { - go func(bucket string) { - results <- c.sess.Query(stmt, bucket, entry.Key, entry.Value).Exec() - }(_bucket) + for i, _bucket := range buckets { + go func(i int, bucket string) { + var value []byte + if i == len(buckets) - 1 { + // Only store the full value if this is the leaf bucket where the entry will actually be read + // otherwise this write is just to allow for list operations + value = entry.Value + } + results <- c.sess.Query(stmt, bucket, entry.Key, value).Exec() + }(i, _bucket) } for i := 0; i < len(buckets); i++ { if err := <-results; err != nil { From f5aea174f676b6ea6d52651b176150c7c0d15242 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Mon, 19 Aug 2019 11:56:19 +0100 Subject: [PATCH 2/2] Avoid goroutine leak by buffering results channel up to the bucket count --- physical/cassandra/cassandra.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/physical/cassandra/cassandra.go b/physical/cassandra/cassandra.go index 8fcb04cdb16a..e1ca49ad75d4 100644 --- a/physical/cassandra/cassandra.go +++ b/physical/cassandra/cassandra.go @@ -256,12 +256,12 @@ func (c *CassandraBackend) Put(ctx context.Context, entry *physical.Entry) error // Execute inserts to each key prefix simultaneously stmt := fmt.Sprintf(`INSERT INTO "%s" (bucket, key, value) VALUES (?, ?, ?)`, c.table) - results := make(chan error) buckets := c.buckets(entry.Key) + results := make(chan error, len(buckets)) for i, _bucket := range buckets { go func(i int, bucket string) { var value []byte - if i == len(buckets) - 1 { + if i == len(buckets)-1 { // Only store the full value if this is the leaf bucket where the entry will actually be read // otherwise this write is just to allow for list operations value = entry.Value @@ -302,8 +302,8 @@ func (c *CassandraBackend) Delete(ctx context.Context, key string) error { defer metrics.MeasureSince([]string{"cassandra", "delete"}, time.Now()) stmt := fmt.Sprintf(`DELETE FROM "%s" WHERE bucket = ? AND key = ?`, c.table) - results := make(chan error) buckets := c.buckets(key) + results := make(chan error, len(buckets)) for _, bucket := range buckets { go func(bucket string) {