diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index a10486102e..ba14888fb2 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -487,14 +487,15 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
 	rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
 	queryable, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer)
 
+	managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine)
+	manager, err := ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util.Logger)
+	if err != nil {
+		return nil, err
+	}
+
 	t.Ruler, err = ruler.NewRuler(
 		t.Cfg.Ruler,
-		ruler.DefaultTenantManagerFactory(
-			t.Cfg.Ruler,
-			t.Distributor,
-			queryable,
-			engine,
-		),
+		manager,
 		prometheus.DefaultRegisterer,
 		util.Logger,
 		t.RulerStorage,
diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go
new file mode 100644
index 0000000000..f1a2082f7b
--- /dev/null
+++ b/pkg/ruler/manager.go
@@ -0,0 +1,223 @@
+package ruler
+
+import (
+	"context"
+	"net/http"
+	"sync"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	ot "github.com/opentracing/opentracing-go"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/notifier"
+	promRules "github.com/prometheus/prometheus/rules"
+	"github.com/weaveworks/common/user"
+	"golang.org/x/net/context/ctxhttp"
+
+	store "github.com/cortexproject/cortex/pkg/ruler/rules"
+	"github.com/cortexproject/cortex/pkg/util"
+)
+
+type DefaultMultiTenantManager struct {
+	cfg            Config
+	notifierCfg    *config.Config
+	managerFactory ManagerFactory
+
+	mapper *mapper
+
+	// Structs for holding per-user Prometheus rules Managers
+	// and a corresponding metrics struct
+	userManagerMtx     sync.Mutex
+	userManagers       map[string]*promRules.Manager
+	userManagerMetrics *ManagerMetrics
+
+	// Per-user notifiers with separate queues.
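+	// A notifier is created lazily per tenant (see getOrCreateNotifier), each with its
+	// own queue and goroutine, so a slow or unreachable Alertmanager for one tenant does
+	// not delay notifications for the others.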
+	notifiersMtx sync.Mutex
+	notifiers    map[string]*rulerNotifier
+
+	managersTotal prometheus.Gauge
+	registry      prometheus.Registerer
+	logger        log.Logger
+}
+
+func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
+	ncfg, err := buildNotifierConfig(&cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	userManagerMetrics := NewManagerMetrics()
+	if reg != nil {
+		reg.MustRegister(userManagerMetrics)
+	}
+
+	return &DefaultMultiTenantManager{
+		cfg:                cfg,
+		notifierCfg:        ncfg,
+		managerFactory:     managerFactory,
+		notifiers:          map[string]*rulerNotifier{},
+		mapper:             newMapper(cfg.RulePath, logger),
+		userManagers:       map[string]*promRules.Manager{},
+		userManagerMetrics: userManagerMetrics,
+		managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+			Namespace: "cortex",
+			Name:      "ruler_managers_total",
+			Help:      "Total number of managers registered and running in the ruler",
+		}),
+		registry: reg,
+		logger:   logger,
+	}, nil
+}
+
+func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]store.RuleGroupList) {
+	// A lock is taken so that concurrent calls are serialized: each call returns only
+	// after the rule files have been mapped and checked for updates.
+	r.userManagerMtx.Lock()
+	defer r.userManagerMtx.Unlock()
+
+	for userID, ruleGroup := range ruleGroups {
+		r.syncRulesToManager(ctx, userID, ruleGroup)
+	}
+
+	// Check for deleted users and remove them
+	for userID, mngr := range r.userManagers {
+		if _, exists := ruleGroups[userID]; !exists {
+			go mngr.Stop()
+			delete(r.userManagers, userID)
+			level.Info(r.logger).Log("msg", "deleting rule manager", "user", userID)
+		}
+	}
+
+	r.managersTotal.Set(float64(len(r.userManagers)))
+}
+
+// syncRulesToManager maps the rule files to disk, detects any changes, and creates/updates
+// the user's Prometheus rules manager.
+func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups store.RuleGroupList) {
+	// Map the files to disk and return the file names to be passed to the user's manager
+	// if they have been updated
+	update, files, err := r.mapper.MapRules(user, groups.Formatted())
+	if err != nil {
+		level.Error(r.logger).Log("msg", "unable to map rule files", "user", user, "err", err)
+		return
+	}
+
+	if update {
+		level.Debug(r.logger).Log("msg", "updating rules", "user", user)
+		configUpdatesTotal.WithLabelValues(user).Inc()
+		manager, exists := r.userManagers[user]
+		if !exists {
+			manager, err = r.newManager(ctx, user)
+			if err != nil {
+				configUpdateFailuresTotal.WithLabelValues(user, "rule-manager-creation-failure").Inc()
+				level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
+				return
+			}
+			// manager.Run() starts running the manager and blocks until Stop() is called.
+			// Hence run it as another goroutine.
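+			// The manager is stopped again either by SyncRuleGroups, once its tenant no
+			// longer has any rule groups, or by Stop when the ruler shuts down.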
+			go manager.Run()
+			r.userManagers[user] = manager
+		}
+		err = manager.Update(r.cfg.EvaluationInterval, files, nil)
+		if err != nil {
+			configUpdateFailuresTotal.WithLabelValues(user, "rules-update-failure").Inc()
+			level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
+			return
+		}
+	}
+}
+
+// newManager creates a new Prometheus rules manager for the given user, wired up with
+// user-scoped storage, appendable, notifier, and instrumentation.
+func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID string) (*promRules.Manager, error) {
+	notifier, err := r.getOrCreateNotifier(userID)
+	if err != nil {
+		return nil, err
+	}
+
+	// Create a new Prometheus registry and register it within
+	// our metrics struct for the provided user.
+	reg := prometheus.NewRegistry()
+	r.userManagerMetrics.AddUserRegistry(userID, reg)
+
+	logger := log.With(r.logger, "user", userID)
+	return r.managerFactory(ctx, userID, notifier, logger, reg), nil
+}
+
+func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string) (*notifier.Manager, error) {
+	r.notifiersMtx.Lock()
+	defer r.notifiersMtx.Unlock()
+
+	n, ok := r.notifiers[userID]
+	if ok {
+		return n.notifier, nil
+	}
+
+	reg := prometheus.WrapRegistererWith(prometheus.Labels{"user": userID}, r.registry)
+	reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)
+	n = newRulerNotifier(&notifier.Options{
+		QueueCapacity: r.cfg.NotificationQueueCapacity,
+		Registerer:    reg,
+		Do: func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
+			// Note: The passed-in context comes from the Prometheus notifier
+			// and does *not* contain the userID. So it needs to be added to the context
+			// here before using the context to inject the userID into the HTTP request.
+			ctx = user.InjectOrgID(ctx, userID)
+			if err := user.InjectOrgIDIntoHTTPRequest(ctx, req); err != nil {
+				return nil, err
+			}
+			// Jaeger complains the passed-in context has an invalid span ID, so start a new root span
+			sp := ot.GlobalTracer().StartSpan("notify", ot.Tag{Key: "organization", Value: userID})
+			defer sp.Finish()
+			ctx = ot.ContextWithSpan(ctx, sp)
+			_ = ot.GlobalTracer().Inject(sp.Context(), ot.HTTPHeaders, ot.HTTPHeadersCarrier(req.Header))
+			return ctxhttp.Do(ctx, client, req)
+		},
+	}, util.Logger)
+
+	go n.run()
+
+	// This should never fail, unless there's a programming mistake.
+	if err := n.applyConfig(r.notifierCfg); err != nil {
+		return nil, err
+	}
+
+	r.notifiers[userID] = n
+	return n.notifier, nil
+}
+
+func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
+	var groups []*promRules.Group
+	r.userManagerMtx.Lock()
+	if mngr, exists := r.userManagers[userID]; exists {
+		groups = mngr.RuleGroups()
+	}
+	r.userManagerMtx.Unlock()
+	return groups
+}
+
+func (r *DefaultMultiTenantManager) Stop() {
+	r.notifiersMtx.Lock()
+	for _, n := range r.notifiers {
+		n.stop()
+	}
+	r.notifiersMtx.Unlock()
+
+	level.Info(r.logger).Log("msg", "stopping user managers")
+	wg := sync.WaitGroup{}
+	r.userManagerMtx.Lock()
+	for user, manager := range r.userManagers {
+		level.Debug(r.logger).Log("msg", "shutting down user manager", "user", user)
+		wg.Add(1)
+		go func(manager *promRules.Manager, user string) {
+			manager.Stop()
+			wg.Done()
+			level.Debug(r.logger).Log("msg", "user manager shut down", "user", user)
+		}(manager, user)
+	}
+	wg.Wait()
+	r.userManagerMtx.Unlock()
+	level.Info(r.logger).Log("msg", "all user managers stopped")
+}
diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go
index bcf09501e7..6c8c23f4e1 100644
--- a/pkg/ruler/ruler.go
+++ b/pkg/ruler/ruler.go
@@ -9,21 +9,17 @@ import (
 	"net/url"
 	"path/filepath"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
-	ot "github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
-	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/notifier"
 	promRules "github.com/prometheus/prometheus/rules"
 	"github.com/prometheus/prometheus/util/strutil"
 	"github.com/weaveworks/common/user"
-	"golang.org/x/net/context/ctxhttp"
 	"google.golang.org/grpc"
 
 	"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -31,7 +27,6 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring/kv"
 	"github.com/cortexproject/cortex/pkg/ruler/rules"
 	store "github.com/cortexproject/cortex/pkg/ruler/rules"
-	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/cortexproject/cortex/pkg/util/tls"
@@ -53,11 +48,6 @@ var (
 		Name:      "ruler_config_update_failures_total",
 		Help:      "Total number of config update failures triggered by a user",
 	}, []string{"user", "reason"})
-	managersTotal = promauto.NewGauge(prometheus.GaugeOpts{
-		Namespace: "cortex",
-		Name:      "ruler_managers_total",
-		Help:      "Total number of managers registered and running in the ruler",
-	})
 )
 
 // Config is the configuration for the recording rules server.
@@ -149,59 +139,64 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.ResendDelay, "ruler.resend-delay", time.Minute, `Minimum amount of time to wait before resending an alert to Alertmanager.`)
 }
 
+// MultiTenantManager is the interface for interacting with a tenant-aware rules manager.
+type MultiTenantManager interface {
+	// SyncRuleGroups is used to sync the Manager with rules from the RuleStore.
+	SyncRuleGroups(ctx context.Context, ruleGroups map[string]store.RuleGroupList)
+	// GetRules fetches rules for a particular tenant (userID).
+	GetRules(userID string) []*promRules.Group
+	// Stop stops all Manager components.
+	Stop()
+}
+
 // Ruler evaluates rules.
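+//
+// In broad strokes: the ruler fetches rule groups from the store, consults the ring to keep
+// only the groups this instance owns (when sharding is enabled), and loads the result into
+// the per-tenant manager, which evaluates the rules against the configured query engine.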
+// +---------------------------------------------------------------+
+// |                                                               |
+// |                   Query       +-------------+                 |
+// |            +------------------>             |                 |
+// |            |                  |    Store    |                 |
+// |            | +----------------+             |                 |
+// |            | |     Rules      +-------------+                 |
+// |            | |                                                |
+// |            | |                                                |
+// |            | |                                                |
+// |       +----+-v----+   Filter  +------------+                  |
+// |       |           +----------->            |                  |
+// |       |   Ruler   |           |    Ring    |                  |
+// |       |           <-----------+            |                  |
+// |       +-------+---+   Rules   +------------+                  |
+// |               |                                               |
+// |               |                                               |
+// |               |                                               |
+// |               |    Load      +-----------------+              |
+// |               +-------------->                 |              |
+// |                              |     Manager     |              |
+// |                              |                 |              |
+// |                              +-----------------+              |
+// |                                                               |
+// +---------------------------------------------------------------+
 type Ruler struct {
 	services.Service
 
-	cfg            Config
-	notifierCfg    *config.Config
-	managerFactory ManagerFactory
-
+	cfg         Config
 	lifecycler  *ring.BasicLifecycler
 	ring        *ring.Ring
 	subservices *services.Manager
-
-	store  rules.RuleStore
-	mapper *mapper
-
-	// Structs for holding per-user Prometheus rules Managers
-	// and a corresponding metrics struct
-	userManagerMtx     sync.Mutex
-	userManagers       map[string]*promRules.Manager
-	userManagerMetrics *ManagerMetrics
-
-	// Per-user notifiers with separate queues.
-	notifiersMtx sync.Mutex
-	notifiers    map[string]*rulerNotifier
+	store   rules.RuleStore
+	manager MultiTenantManager
 
 	registry prometheus.Registerer
 	logger   log.Logger
 }
 
 // NewRuler creates a new ruler from a distributor and chunk store.
-func NewRuler(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger, ruleStore rules.RuleStore) (*Ruler, error) {
-	ncfg, err := buildNotifierConfig(&cfg)
-	if err != nil {
-		return nil, err
-	}
-
-	userManagerMetrics := NewManagerMetrics()
-
-	if reg != nil {
-		reg.MustRegister(userManagerMetrics)
-	}
-
+func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rules.RuleStore) (*Ruler, error) {
 	ruler := &Ruler{
-		cfg:                cfg,
-		notifierCfg:        ncfg,
-		managerFactory:     managerFactory,
-		notifiers:          map[string]*rulerNotifier{},
-		store:              ruleStore,
-		mapper:             newMapper(cfg.RulePath, logger),
-		userManagers:       map[string]*promRules.Manager{},
-		userManagerMetrics: userManagerMetrics,
-		registry:           reg,
-		logger:             logger,
+		cfg:      cfg,
+		store:    ruleStore,
+		manager:  manager,
+		registry: reg,
+		logger:   logger,
 	}
 
 	if cfg.EnableSharding {
@@ -266,32 +261,12 @@ func (r *Ruler) starting(ctx context.Context) error {
 // Stop stops the Ruler.
 // Each function of the ruler is terminated before leaving the ring
 func (r *Ruler) stopping(_ error) error {
-	r.notifiersMtx.Lock()
-	for _, n := range r.notifiers {
-		n.stop()
-	}
-	r.notifiersMtx.Unlock()
+	r.manager.Stop()
 
 	if r.subservices != nil {
 		// subservices manages ring and lifecycler, if sharding was enabled.
 		_ = services.StopManagerAndAwaitStopped(context.Background(), r.subservices)
 	}
-
-	level.Info(r.logger).Log("msg", "stopping user managers")
-	wg := sync.WaitGroup{}
-	r.userManagerMtx.Lock()
-	for user, manager := range r.userManagers {
-		level.Debug(r.logger).Log("msg", "shutting down user manager", "user", user)
-		wg.Add(1)
-		go func(manager *promRules.Manager, user string) {
-			manager.Stop()
-			wg.Done()
-			level.Debug(r.logger).Log("msg", "user manager shut down", "user", user)
-		}(manager, user)
-	}
-	wg.Wait()
-	r.userManagerMtx.Unlock()
-	level.Info(r.logger).Log("msg", "all user managers stopped")
 	return nil
 }
 
@@ -326,48 +301,6 @@ func SendAlerts(n *notifier.Manager, externalURL string) promRules.NotifyFunc {
 	}
 }
 
-func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Manager, error) {
-	r.notifiersMtx.Lock()
-	defer r.notifiersMtx.Unlock()
-
-	n, ok := r.notifiers[userID]
-	if ok {
-		return n.notifier, nil
-	}
-
-	reg := prometheus.WrapRegistererWith(prometheus.Labels{"user": userID}, r.registry)
-	reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)
-	n = newRulerNotifier(&notifier.Options{
-		QueueCapacity: r.cfg.NotificationQueueCapacity,
-		Registerer:    reg,
-		Do: func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
-			// Note: The passed-in context comes from the Prometheus notifier
-			// and does *not* contain the userID. So it needs to be added to the context
-			// here before using the context to inject the userID into the HTTP request.
-			ctx = user.InjectOrgID(ctx, userID)
-			if err := user.InjectOrgIDIntoHTTPRequest(ctx, req); err != nil {
-				return nil, err
-			}
-			// Jaeger complains the passed-in context has an invalid span ID, so start a new root span
-			sp := ot.GlobalTracer().StartSpan("notify", ot.Tag{Key: "organization", Value: userID})
-			defer sp.Finish()
-			ctx = ot.ContextWithSpan(ctx, sp)
-			_ = ot.GlobalTracer().Inject(sp.Context(), ot.HTTPHeaders, ot.HTTPHeadersCarrier(req.Header))
-			return ctxhttp.Do(ctx, client, req)
-		},
-	}, util.Logger)
-
-	go n.run()
-
-	// This should never fail, unless there's a programming mistake.
-	if err := n.applyConfig(r.notifierCfg); err != nil {
-		return nil, err
-	}
-
-	r.notifiers[userID] = n
-	return n.notifier, nil
-}
-
 func (r *Ruler) ownsRule(hash uint32) (bool, error) {
 	rlrs, err := r.ring.Get(hash, ring.Read, []ring.IngesterDesc{})
 	if err != nil {
@@ -423,9 +356,6 @@ func (r *Ruler) run(ctx context.Context) error {
 			return nil
 		case <-tick.C:
 			r.loadRules(ctx)
-			r.userManagerMtx.Lock()
-			managersTotal.Set(float64(len(r.userManagers)))
-			r.userManagerMtx.Unlock()
 		}
 	}
 }
@@ -441,8 +371,9 @@ func (r *Ruler) loadRules(ctx context.Context) {
 
 	// Iterate through each users configuration and determine if the on-disk
 	// configurations need to be updated
-	for user, cfg := range configs {
-		filteredGroups := store.RuleGroupList{}
+	filteredConfigs := make(map[string]rules.RuleGroupList)
+	for userID, cfg := range configs {
+		filteredConfigs[userID] = store.RuleGroupList{}
 
 		// If sharding is enabled, prune the rule group to only contain rules
 		// this ruler is responsible for.
@@ -452,7 +383,7 @@ func (r *Ruler) loadRules(ctx context.Context) {
 				ringHasher.Reset()
 				_, err = ringHasher.Write([]byte(id))
 				if err != nil {
-					level.Error(r.logger).Log("msg", "failed to create group for user", "user", user, "namespace", g.Namespace, "group", g.Name, "err", err)
+					level.Error(r.logger).Log("msg", "failed to create group for user", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
 					continue
 				}
 				hash := ringHasher.Sum32()
@@ -462,85 +393,15 @@ func (r *Ruler) loadRules(ctx context.Context) {
 					return
 				}
 				if owned {
-					filteredGroups = append(filteredGroups, g)
+					filteredConfigs[userID] = append(filteredConfigs[userID], g)
 				}
 			}
 		} else {
-			filteredGroups = cfg
-		}
-
-		r.syncManager(ctx, user, filteredGroups)
-	}
-
-	// Check for deleted users and remove them
-	r.userManagerMtx.Lock()
-	defer r.userManagerMtx.Unlock()
-	for user, mngr := range r.userManagers {
-		if _, exists := configs[user]; !exists {
-			go mngr.Stop()
-			delete(r.userManagers, user)
-			level.Info(r.logger).Log("msg", "deleting rule manager", "user", user)
+			filteredConfigs[userID] = cfg
 		}
 	}
-}
-
-// syncManager maps the rule files to disk, detects any changes and will create/update the
-// the users Prometheus Rules Manager.
-func (r *Ruler) syncManager(ctx context.Context, user string, groups store.RuleGroupList) {
-	// A lock is taken to ensure if syncManager is called concurrently, that each call
-	// returns after the call map files and check for updates
-	r.userManagerMtx.Lock()
-	defer r.userManagerMtx.Unlock()
-
-	// Map the files to disk and return the file names to be passed to the users manager if they
-	// have been updated
-	update, files, err := r.mapper.MapRules(user, groups.Formatted())
-	if err != nil {
-		level.Error(r.logger).Log("msg", "unable to map rule files", "user", user, "err", err)
-		return
-	}
-
-	if update {
-		level.Debug(r.logger).Log("msg", "updating rules", "user", "user")
-		configUpdatesTotal.WithLabelValues(user).Inc()
-		manager, exists := r.userManagers[user]
-		if !exists {
-			manager, err = r.newManager(ctx, user)
-			if err != nil {
-				configUpdateFailuresTotal.WithLabelValues(user, "rule-manager-creation-failure").Inc()
-				level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
-				return
-			}
-			// manager.Run() starts running the manager and blocks until Stop() is called.
-			// Hence run it as another goroutine.
-			go manager.Run()
-			r.userManagers[user] = manager
-		}
-		err = manager.Update(r.cfg.EvaluationInterval, files, nil)
-		if err != nil {
-			configUpdateFailuresTotal.WithLabelValues(user, "rules-update-failure").Inc()
-			level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
-			return
-		}
-	}
-}
-
-// newManager creates a prometheus rule manager wrapped with a user id
-// configured storage, appendable, notifier, and instrumentation
-func (r *Ruler) newManager(ctx context.Context, userID string) (*promRules.Manager, error) {
-	notifier, err := r.getOrCreateNotifier(userID)
-	if err != nil {
-		return nil, err
-	}
-
-	// Create a new Prometheus registry and register it within
-	// our metrics struct for the provided user.
-	reg := prometheus.NewRegistry()
-	r.userManagerMetrics.AddUserRegistry(userID, reg)
-
-	logger := log.With(r.logger, "user", userID)
-	return r.managerFactory(ctx, userID, notifier, logger, reg), nil
+	r.manager.SyncRuleGroups(ctx, filteredConfigs)
 }
 
 // GetRules retrieves the running rules from this ruler and all running rulers in the ring if
@@ -559,12 +420,7 @@ func (r *Ruler) GetRules(ctx context.Context) ([]*GroupStateDesc, error) {
 }
 
 func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) {
-	var groups []*promRules.Group
-	r.userManagerMtx.Lock()
-	if mngr, exists := r.userManagers[userID]; exists {
-		groups = mngr.RuleGroups()
-	}
-	r.userManagerMtx.Unlock()
+	groups := r.manager.GetRules(userID)
 
 	groupDescs := make([]*GroupStateDesc, 0, len(groups))
 	prefix := filepath.Join(r.cfg.RulePath, userID) + "/"
diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go
index 49539249dc..046d6c5d5f 100644
--- a/pkg/ruler/ruler_test.go
+++ b/pkg/ruler/ruler_test.go
@@ -57,7 +57,7 @@ func defaultRulerConfig(store rules.RuleStore) (Config, func()) {
 	return cfg, cleanup
 }
 
-func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
+func testSetup(t *testing.T, cfg Config) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, func()) {
 	dir, err := ioutil.TempDir("", t.Name())
 	testutil.Ok(t, err)
 	cleanup := func() {
@@ -82,13 +82,33 @@ func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
 
 	l := log.NewLogfmtLogger(os.Stdout)
 	l = level.NewFilter(l, level.AllowInfo())
+
+	return engine, noopQueryable, pusher, l, cleanup
+}
+
+func newManager(t *testing.T, cfg Config) (*DefaultMultiTenantManager, func()) {
+	engine, noopQueryable, pusher, logger, cleanup := testSetup(t, cfg)
+	manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, noopQueryable, engine), prometheus.NewRegistry(), logger)
+	require.NoError(t, err)
+
+	return manager, cleanup
+}
+
+func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
+	engine, noopQueryable, pusher, logger, cleanup := testSetup(t, cfg)
 	storage, err := NewRuleStorage(cfg.StoreConfig)
 	require.NoError(t, err)
+
+	reg := prometheus.NewRegistry()
+	managerFactory := DefaultTenantManagerFactory(cfg, pusher, noopQueryable, engine)
+	manager, err := NewDefaultMultiTenantManager(cfg, managerFactory, reg, util.Logger)
+	require.NoError(t, err)
+
 	ruler, err := NewRuler(
 		cfg,
-		DefaultTenantManagerFactory(cfg, pusher, noopQueryable, engine),
-		prometheus.NewRegistry(),
-		l,
+		manager,
+		reg,
+		logger,
 		storage,
 	)
 	require.NoError(t, err)
@@ -106,6 +126,8 @@ func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) {
 	return ruler, cleanup
 }
 
+var _ MultiTenantManager = &DefaultMultiTenantManager{}
+
 func TestNotifierSendsUserIDHeader(t *testing.T) {
 	var wg sync.WaitGroup
 
@@ -126,16 +148,13 @@ func TestNotifierSendsUserIDHeader(t *testing.T) {
 	cfg.AlertmanagerURL = ts.URL
 	cfg.AlertmanagerDiscovery = false
 
-	r, rcleanup := newTestRuler(t, cfg)
+	manager, rcleanup := newManager(t, cfg)
 	defer rcleanup()
-	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
+	defer manager.Stop()
 
-	n, err := r.getOrCreateNotifier("1")
+	n, err := manager.getOrCreateNotifier("1")
 	require.NoError(t, err)
-	for _, not := range r.notifiers {
-		defer not.stop()
-	}
 
 	// Loop until notifier discovery syncs up
 	for len(n.Alertmanagers()) == 0 {
 		time.Sleep(10 * time.Millisecond)
@@ -147,7 +166,7 @@ func TestNotifierSendsUserIDHeader(t *testing.T) {
 	wg.Wait()
 
 	// Ensure we have metrics in the notifier.
-	assert.NoError(t, prom_testutil.GatherAndCompare(r.registry.(*prometheus.Registry), strings.NewReader(`
+	assert.NoError(t, prom_testutil.GatherAndCompare(manager.registry.(*prometheus.Registry), strings.NewReader(`
 		# HELP cortex_prometheus_notifications_dropped_total Total number of alerts dropped due to errors when sending to Alertmanager.
 		# TYPE cortex_prometheus_notifications_dropped_total counter
 		cortex_prometheus_notifications_dropped_total{user="1"} 0