From 3b1a73f2dfd5460dfe62be118271a44667c3bcab Mon Sep 17 00:00:00 2001 From: gamoutatsumi Date: Thu, 25 Apr 2024 11:53:05 +0900 Subject: [PATCH] separate agent run and collecting metrics --- pool-agent/cmd/agent.go | 95 ++++++++++++++++++----------------------- pool-agent/cmd/run.go | 19 ++++++++- 2 files changed, 59 insertions(+), 55 deletions(-) diff --git a/pool-agent/cmd/agent.go b/pool-agent/cmd/agent.go index 6809172..54ad8ad 100644 --- a/pool-agent/cmd/agent.go +++ b/pool-agent/cmd/agent.go @@ -13,7 +13,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" slm "github.com/whywaita/shoes-lxd-multi/server/pkg/api" - "golang.org/x/sync/errgroup" ) // Agent is an agent for pool mode. @@ -24,6 +23,7 @@ type Agent struct { ResourceTypesMap []ResourceTypesMap ResourceTypesCounts ResourceTypesCounts Client lxd.InstanceServer + MetricsClient lxd.InstanceServer CheckInterval time.Duration WaitIdleTime time.Duration @@ -57,6 +57,10 @@ func newAgent(ctx context.Context) (*Agent, error) { if err != nil { return nil, fmt.Errorf("connect lxd: %w", err) } + mc, err := lxd.ConnectLXDUnixWithContext(ctx, "", &lxd.ConnectionArgs{}) + if err != nil { + return nil, fmt.Errorf("connect lxd: %w", err) + } checkInterval, waitIdleTime, zombieAllowTime, err := LoadParams() if err != nil { return nil, fmt.Errorf("load params: %w", err) @@ -79,6 +83,7 @@ func newAgent(ctx context.Context) (*Agent, error) { ResourceTypesMap: conf.ResourceTypesMap, ResourceTypesCounts: conf.ResourceTypesCounts, Client: c, + MetricsClient: mc, CheckInterval: checkInterval, WaitIdleTime: waitIdleTime, @@ -119,65 +124,27 @@ func (a *Agent) reloadConfig() error { } // Run runs the agent. + func (a *Agent) Run(ctx context.Context, sigHupCh chan os.Signal) error { + ticker := time.NewTicker(a.CheckInterval) + defer ticker.Stop() + slog.Info("Started agent") for { - slog.Info("Started agent") - eg, _egCtx := errgroup.WithContext(ctx) - egCtx, egCancel := context.WithCancel(_egCtx) - eg.Go(func() error { - ticker := time.NewTicker(a.CheckInterval) - defer ticker.Stop() - L: - for { - select { - case <-egCtx.Done(): - break L - case <-ticker.C: - slog.Debug("Collecting metrics") - if err := a.collectMetrics(); err != nil { - egCancel() - return fmt.Errorf("collect metrics: %w", err) - } - if err := prometheus.WriteToTextfile(metricsPath, a.registry); err != nil { - egCancel() - return fmt.Errorf("write metrics: %w", err) - } - } + select { + case <-sigHupCh: + slog.Info("Received SIGHUP. Reloading config...") + if err := a.reloadConfig(); err != nil { + slog.Error("Failed to reload config", "err", err.Error()) } + case <-ctx.Done(): + slog.Info("Stopping agent...") return nil - }) - eg.Go(func() error { - ticker := time.NewTicker(a.CheckInterval) - defer ticker.Stop() - L: - for { - select { - case <-sigHupCh: - slog.Info("Received SIGHUP. Reloading config...") - if err := a.reloadConfig(); err != nil { - return fmt.Errorf("reload config: %w", err) - } - case <-egCtx.Done(): - slog.Info("Stopping agent...") - break L - case <-ticker.C: - if err := a.adjustInstancePool(); err != nil { - egCancel() - return fmt.Errorf("adjust instances: %w", err) - } - } + case <-ticker.C: + if err := a.adjustInstancePool(); err != nil { + slog.Error("Failed to adjust instances", "err", err) } - return nil - }) - - if err := eg.Wait(); err != nil { - slog.Error("Error occurred", "err", err.Error()) - continue - } else { - break } } - return nil } func (a *Agent) countPooledInstances(instances []api.Instance, resourceTypeName string) int { @@ -288,8 +255,28 @@ func (a *Agent) adjustInstancePool() error { return nil } +func (a *Agent) CollectMetrics(ctx context.Context) error { + ticker := time.NewTicker(a.CheckInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + slog.Info("Stopping metrics collection...") + return nil + case <-ticker.C: + slog.Debug("Collecting metrics...") + if err := a.collectMetrics(); err != nil { + slog.Error("Failed to collect metrics", "err", err) + } + if err := prometheus.WriteToTextfile(metricsPath, a.registry); err != nil { + slog.Error("Failed to write metrics", "err", err) + } + } + } +} + func (a *Agent) collectMetrics() error { - s, err := a.Client.GetInstances(api.InstanceTypeAny) + s, err := a.MetricsClient.GetInstances(api.InstanceTypeAny) if err != nil { return fmt.Errorf("get instances: %w", err) } diff --git a/pool-agent/cmd/run.go b/pool-agent/cmd/run.go index 6f8552f..5f58447 100644 --- a/pool-agent/cmd/run.go +++ b/pool-agent/cmd/run.go @@ -2,11 +2,13 @@ package cmd import ( "context" + "fmt" "os" "os/signal" "syscall" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" ) func init() { @@ -26,5 +28,20 @@ var agentRunCommand = &cobra.Command{ return err } - return agent.Run(ctx, sigHupCh) + eg, egCtx := errgroup.WithContext(ctx) + + eg.Go(func() error { + if err := agent.CollectMetrics(egCtx); err != nil { + return fmt.Errorf("collect metrics: %w", err) + } + return nil + }) + + eg.Go(func() error { + if err := agent.Run(egCtx, sigHupCh); err != nil { + return fmt.Errorf("run agent: %w", err) + } + return nil + }) + return eg.Wait() }}