From cd412d076dd4696cd615909f57c77d2df95c7b7d Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Wed, 24 Apr 2024 19:18:07 +0900 Subject: [PATCH] separate metrics and use errgroup --- pool-agent/cmd/agent.go | 96 ++++++++++++++++++++++++++++------------- 1 file changed, 65 insertions(+), 31 deletions(-) diff --git a/pool-agent/cmd/agent.go b/pool-agent/cmd/agent.go index bc01361..6809172 100644 --- a/pool-agent/cmd/agent.go +++ b/pool-agent/cmd/agent.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" slm "github.com/whywaita/shoes-lxd-multi/server/pkg/api" + "golang.org/x/sync/errgroup" ) // Agent is an agent for pool mode. @@ -119,41 +120,64 @@ func (a *Agent) reloadConfig() error { // Run runs the agent. func (a *Agent) Run(ctx context.Context, sigHupCh chan os.Signal) error { - ticker := time.NewTicker(a.CheckInterval) - defer ticker.Stop() - - slog.Info("Started agent") - quit := make(chan struct{}) - - go func() { - L: - for { - select { - case <-ticker.C: - if err := prometheus.WriteToTextfile(metricsPath, a.registry); err != nil { - slog.Error("failed to write metrics: %+v", "err", err.Error()) + for { + slog.Info("Started agent") + eg, _egCtx := errgroup.WithContext(ctx) + egCtx, egCancel := context.WithCancel(_egCtx) + eg.Go(func() error { + ticker := time.NewTicker(a.CheckInterval) + defer ticker.Stop() + L: + for { + select { + case <-egCtx.Done(): + break L + case <-ticker.C: + slog.Debug("Collecting metrics") + if err := a.collectMetrics(); err != nil { + egCancel() + return fmt.Errorf("collect metrics: %w", err) + } + if err := prometheus.WriteToTextfile(metricsPath, a.registry); err != nil { + egCancel() + return fmt.Errorf("write metrics: %w", err) + } } - case <-ctx.Done(): - slog.Info("Stopping agent...") - break L } - } - quit <- struct{}{} - }() - - for { - select { - case <-sigHupCh: - slog.Info("Received SIGHUP. Reloading config...") - a.reloadConfig() - case <-ticker.C: - if err := a.adjustInstancePool(); err != nil { - slog.Error("failed to check instances", "err", err.Error()) + return nil + }) + eg.Go(func() error { + ticker := time.NewTicker(a.CheckInterval) + defer ticker.Stop() + L: + for { + select { + case <-sigHupCh: + slog.Info("Received SIGHUP. Reloading config...") + if err := a.reloadConfig(); err != nil { + return fmt.Errorf("reload config: %w", err) + } + case <-egCtx.Done(): + slog.Info("Stopping agent...") + break L + case <-ticker.C: + if err := a.adjustInstancePool(); err != nil { + egCancel() + return fmt.Errorf("adjust instances: %w", err) + } + } } - case <-quit: return nil + }) + + if err := eg.Wait(); err != nil { + slog.Error("Error occurred", "err", err.Error()) + continue + } else { + break } } + return nil } func (a *Agent) countPooledInstances(instances []api.Instance, resourceTypeName string) int { @@ -227,10 +251,8 @@ func (a *Agent) adjustInstancePool() error { } } - lxdInstances.Reset() for _, i := range s { l := slog.With("instance", i.Name) - lxdInstances.WithLabelValues(i.Status, i.Config[configKeyResourceType]).Inc() if _, ok := a.ResourceTypesCounts[i.Config[configKeyResourceType]]; !ok { toDelete = append(toDelete, i.Config[configKeyResourceType]) } @@ -266,6 +288,18 @@ func (a *Agent) adjustInstancePool() error { return nil } +func (a *Agent) collectMetrics() error { + s, err := a.Client.GetInstances(api.InstanceTypeAny) + if err != nil { + return fmt.Errorf("get instances: %w", err) + } + lxdInstances.Reset() + for _, i := range s { + lxdInstances.WithLabelValues(i.Status, i.Config[configKeyResourceType]).Inc() + } + return nil +} + func (a *Agent) isZombieInstance(i api.Instance) bool { if i.StatusCode == api.Frozen { return false