Skip to content

Commit

Permalink
[VAULT-17827] Rollback manager worker pool (hashicorp#22567)
Browse files Browse the repository at this point in the history
* workerpool implementation

* rollback tests

* website documentation

* add changelog

* fix failing test
  • Loading branch information
miagilepner committed Sep 4, 2023
1 parent c25e1a5 commit 4e3b91d
Show file tree
Hide file tree
Showing 11 changed files with 357 additions and 19 deletions.
3 changes: 3 additions & 0 deletions changelog/22567.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core: Use a worker pool for the rollback manager. Add new metrics for the rollback manager to track the queued tasks.
```
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ require (
github.com/fatih/color v1.15.0
github.com/fatih/structs v1.1.0
github.com/favadi/protoc-go-inject-tag v1.4.0
github.com/gammazero/workerpool v1.1.3
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-errors/errors v1.4.2
github.com/go-git/go-git/v5 v5.7.0
Expand Down Expand Up @@ -338,7 +339,6 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/gammazero/workerpool v1.1.3 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-git/go-billy/v5 v5.4.1 // indirect
Expand Down
7 changes: 7 additions & 0 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ type Core struct {
// heartbeating with the active node. Default to the current SDK version.
effectiveSDKVersion string

numRollbackWorkers int
rollbackPeriod time.Duration
rollbackMountPathMetrics bool

Expand Down Expand Up @@ -866,6 +867,8 @@ type CoreConfig struct {
// AdministrativeNamespacePath is used to configure the administrative namespace, which has access to some sys endpoints that are
// only accessible in the root namespace, currently sys/audit-hash and sys/monitor.
AdministrativeNamespacePath string

NumRollbackWorkers int
}

// SubloggerHook implements the SubloggerAdder interface. This implementation
Expand Down Expand Up @@ -954,6 +957,9 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
conf.NumExpirationWorkers = numExpirationWorkersDefault
}

if conf.NumRollbackWorkers == 0 {
conf.NumRollbackWorkers = RollbackDefaultNumWorkers
}
// Use imported logging deadlock if requested
var stateLock locking.RWMutex
if strings.Contains(conf.DetectDeadlocks, "statelock") {
Expand Down Expand Up @@ -1038,6 +1044,7 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
pendingRemovalMountsAllowed: conf.PendingRemovalMountsAllowed,
expirationRevokeRetryBase: conf.ExpirationRevokeRetryBase,
rollbackMountPathMetrics: conf.MetricSink.TelemetryConsts.RollbackMetricsIncludeMountPoint,
numRollbackWorkers: conf.NumRollbackWorkers,
impreciseLeaseRoleTracking: conf.ImpreciseLeaseRoleTracking,
}

Expand Down
84 changes: 66 additions & 18 deletions vault/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,25 @@ package vault
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"

metrics "github.com/armon/go-metrics"
"github.com/gammazero/workerpool"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
)

const (
// RollbackDefaultNumWorkers is the number of rollback workers applied by
// CreateCore when CoreConfig.NumRollbackWorkers is left at zero.
RollbackDefaultNumWorkers = 256
// RollbackWorkersEnvVar is the name of the environment variable that, when
// set to a positive integer, overrides the configured number of rollback
// workers.
RollbackWorkersEnvVar = "VAULT_ROLLBACK_WORKERS"
)

// RollbackManager is responsible for performing rollbacks of partial
// secrets within logical backends.
//
Expand Down Expand Up @@ -51,8 +60,8 @@ type RollbackManager struct {
stopTicker chan struct{}
tickerIsStopped bool
quitContext context.Context

core *Core
runner *workerpool.WorkerPool
core *Core
// This channel is used for testing
rollbacksDoneCh chan struct{}
}
Expand All @@ -63,6 +72,9 @@ type rollbackState struct {
sync.WaitGroup
cancelLockGrabCtx context.Context
cancelLockGrabCtxCancel context.CancelFunc
// scheduled is the time at which this job was created and submitted to the
// RollbackManager's worker pool (runner)
scheduled time.Time
}

// NewRollbackManager is used to create a new rollback manager
Expand All @@ -81,9 +93,26 @@ func NewRollbackManager(ctx context.Context, logger log.Logger, backendsFunc fun
rollbackMetricsMountName: core.rollbackMountPathMetrics,
rollbacksDoneCh: make(chan struct{}),
}
numWorkers := r.numRollbackWorkers()
r.logger.Info(fmt.Sprintf("Starting the rollback manager with %d workers", numWorkers))
r.runner = workerpool.New(numWorkers)
return r
}

// numRollbackWorkers resolves how many workers the rollback manager should
// use. The value configured on the core is the baseline; if the environment
// variable named by RollbackWorkersEnvVar is set to a positive integer, that
// value takes precedence. A set-but-invalid override (non-numeric or less
// than one) is reported with a warning and otherwise ignored.
func (m *RollbackManager) numRollbackWorkers() int {
	configured := m.core.numRollbackWorkers

	raw := os.Getenv(RollbackWorkersEnvVar)
	if raw == "" {
		// No override requested.
		return configured
	}

	parsed, err := strconv.Atoi(raw)
	if err == nil && parsed >= 1 {
		return parsed
	}

	// The override was present but unusable; fall back to the configured value.
	m.logger.Warn(fmt.Sprintf("%s must be a positive integer, but was %s", RollbackWorkersEnvVar, raw))
	return configured
}

// Start starts the rollback manager
func (m *RollbackManager) Start() {
go m.run()
Expand All @@ -99,7 +128,7 @@ func (m *RollbackManager) Stop() {
close(m.shutdownCh)
<-m.doneCh
}
m.inflightAll.Wait()
m.runner.StopWait()
}

// StopTicker stops the automatic Rollback manager's ticker, causing us
Expand Down Expand Up @@ -168,6 +197,8 @@ func (m *RollbackManager) triggerRollbacks() {
func (m *RollbackManager) startOrLookupRollback(ctx context.Context, fullPath string, grabStatelock bool) *rollbackState {
m.inflightLock.Lock()
defer m.inflightLock.Unlock()
defer metrics.SetGauge([]string{"rollback", "queued"}, float32(m.runner.WaitingQueueSize()))
defer metrics.SetGauge([]string{"rollback", "inflight"}, float32(len(m.inflight)))
rsInflight, ok := m.inflight[fullPath]
if ok {
return rsInflight
Expand All @@ -183,31 +214,48 @@ func (m *RollbackManager) startOrLookupRollback(ctx context.Context, fullPath st
m.inflight[fullPath] = rs
rs.Add(1)
m.inflightAll.Add(1)
go func() {
m.attemptRollback(ctx, fullPath, rs, grabStatelock)
select {
case m.rollbacksDoneCh <- struct{}{}:
default:
}
}()
rs.scheduled = time.Now()
select {
case <-m.doneCh:
// if we've already shut down, then don't submit the task to avoid a panic
// we should still call finishRollback for the rollback state in order to remove
// it from the map and decrement the waitgroup.

// we already have the inflight lock, so we can't grab it here
m.finishRollback(rs, errors.New("rollback manager is stopped"), fullPath, false)
default:
m.runner.Submit(func() {
m.attemptRollback(ctx, fullPath, rs, grabStatelock)
select {
case m.rollbacksDoneCh <- struct{}{}:
default:
}
})

}
return rs
}

// finishRollback completes the bookkeeping for a single rollback attempt: it
// stores err as the state's last error, marks the attempt done on both the
// per-rollback wait group and the manager-wide inflightAll wait group, and
// removes fullPath from the inflight map.
//
// grabInflightLock must be false when the caller already holds inflightLock;
// in that case the map entry is removed without locking. Otherwise the lock is
// taken for the duration of the removal.
func (m *RollbackManager) finishRollback(rs *rollbackState, err error, fullPath string, grabInflightLock bool) {
	// Record the outcome before signalling, so it is set by the time any
	// waiter on rs is released.
	rs.lastError = err
	rs.Done()
	m.inflightAll.Done()

	if !grabInflightLock {
		delete(m.inflight, fullPath)
		return
	}

	m.inflightLock.Lock()
	defer m.inflightLock.Unlock()
	delete(m.inflight, fullPath)
}

// attemptRollback invokes a RollbackOperation for the given path
func (m *RollbackManager) attemptRollback(ctx context.Context, fullPath string, rs *rollbackState, grabStatelock bool) (err error) {
metrics.MeasureSince([]string{"rollback", "waiting"}, rs.scheduled)
metricName := []string{"rollback", "attempt"}
if m.rollbackMetricsMountName {
metricName = append(metricName, strings.ReplaceAll(fullPath, "/", "-"))
}
defer metrics.MeasureSince(metricName, time.Now())
defer func() {
rs.lastError = err
rs.Done()
m.inflightAll.Done()
m.inflightLock.Lock()
delete(m.inflight, fullPath)
m.inflightLock.Unlock()
}()
defer m.finishRollback(rs, err, fullPath, true)

ns, err := namespace.FromContext(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit 4e3b91d

Please sign in to comment.