Skip to content

Commit

Permalink
separate agent run and collecting metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
gamoutatsumi committed Apr 25, 2024
1 parent cd412d0 commit 3b1a73f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 55 deletions.
95 changes: 41 additions & 54 deletions pool-agent/cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
slm "github.com/whywaita/shoes-lxd-multi/server/pkg/api"
"golang.org/x/sync/errgroup"
)

// Agent is an agent for pool mode.
Expand All @@ -24,6 +23,7 @@ type Agent struct {
ResourceTypesMap []ResourceTypesMap
ResourceTypesCounts ResourceTypesCounts
Client lxd.InstanceServer
MetricsClient lxd.InstanceServer

CheckInterval time.Duration
WaitIdleTime time.Duration
Expand Down Expand Up @@ -57,6 +57,10 @@ func newAgent(ctx context.Context) (*Agent, error) {
if err != nil {
return nil, fmt.Errorf("connect lxd: %w", err)
}
mc, err := lxd.ConnectLXDUnixWithContext(ctx, "", &lxd.ConnectionArgs{})
if err != nil {
return nil, fmt.Errorf("connect lxd: %w", err)
}
checkInterval, waitIdleTime, zombieAllowTime, err := LoadParams()
if err != nil {
return nil, fmt.Errorf("load params: %w", err)
Expand All @@ -79,6 +83,7 @@ func newAgent(ctx context.Context) (*Agent, error) {
ResourceTypesMap: conf.ResourceTypesMap,
ResourceTypesCounts: conf.ResourceTypesCounts,
Client: c,
MetricsClient: mc,

CheckInterval: checkInterval,
WaitIdleTime: waitIdleTime,
Expand Down Expand Up @@ -119,65 +124,27 @@ func (a *Agent) reloadConfig() error {
}

// Run runs the agent.

func (a *Agent) Run(ctx context.Context, sigHupCh chan os.Signal) error {
ticker := time.NewTicker(a.CheckInterval)
defer ticker.Stop()
slog.Info("Started agent")
for {
slog.Info("Started agent")
eg, _egCtx := errgroup.WithContext(ctx)
egCtx, egCancel := context.WithCancel(_egCtx)
eg.Go(func() error {
ticker := time.NewTicker(a.CheckInterval)
defer ticker.Stop()
L:
for {
select {
case <-egCtx.Done():
break L
case <-ticker.C:
slog.Debug("Collecting metrics")
if err := a.collectMetrics(); err != nil {
egCancel()
return fmt.Errorf("collect metrics: %w", err)
}
if err := prometheus.WriteToTextfile(metricsPath, a.registry); err != nil {
egCancel()
return fmt.Errorf("write metrics: %w", err)
}
}
select {
case <-sigHupCh:
slog.Info("Received SIGHUP. Reloading config...")
if err := a.reloadConfig(); err != nil {
slog.Error("Failed to reload config", "err", err.Error())
}
case <-ctx.Done():
slog.Info("Stopping agent...")
return nil
})
eg.Go(func() error {
ticker := time.NewTicker(a.CheckInterval)
defer ticker.Stop()
L:
for {
select {
case <-sigHupCh:
slog.Info("Received SIGHUP. Reloading config...")
if err := a.reloadConfig(); err != nil {
return fmt.Errorf("reload config: %w", err)
}
case <-egCtx.Done():
slog.Info("Stopping agent...")
break L
case <-ticker.C:
if err := a.adjustInstancePool(); err != nil {
egCancel()
return fmt.Errorf("adjust instances: %w", err)
}
}
case <-ticker.C:
if err := a.adjustInstancePool(); err != nil {
slog.Error("Failed to adjust instances", "err", err)
}
return nil
})

if err := eg.Wait(); err != nil {
slog.Error("Error occurred", "err", err.Error())
continue
} else {
break
}
}
return nil
}

func (a *Agent) countPooledInstances(instances []api.Instance, resourceTypeName string) int {
Expand Down Expand Up @@ -288,8 +255,28 @@ func (a *Agent) adjustInstancePool() error {
return nil
}

// CollectMetrics runs the periodic metrics loop until ctx is cancelled.
//
// Every a.CheckInterval it refreshes the metrics via collectMetrics and then
// writes a.registry to the textfile at metricsPath. Failures of either step
// are logged and do not stop the loop; the write is attempted even when the
// collection step reported an error. The method returns nil once ctx is done
// and never returns a non-nil error.
func (a *Agent) CollectMetrics(ctx context.Context) error {
	tick := time.NewTicker(a.CheckInterval)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			slog.Info("Stopping metrics collection...")
			return nil
		case <-tick.C:
			// A tick arrived: leave the select and run one collection pass below.
		}

		slog.Debug("Collecting metrics...")

		collectErr := a.collectMetrics()
		if collectErr != nil {
			slog.Error("Failed to collect metrics", "err", collectErr)
		}

		writeErr := prometheus.WriteToTextfile(metricsPath, a.registry)
		if writeErr != nil {
			slog.Error("Failed to write metrics", "err", writeErr)
		}
	}
}

func (a *Agent) collectMetrics() error {
s, err := a.Client.GetInstances(api.InstanceTypeAny)
s, err := a.MetricsClient.GetInstances(api.InstanceTypeAny)
if err != nil {
return fmt.Errorf("get instances: %w", err)
}
Expand Down
19 changes: 18 additions & 1 deletion pool-agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

func init() {
Expand All @@ -26,5 +28,20 @@ var agentRunCommand = &cobra.Command{
return err
}

return agent.Run(ctx, sigHupCh)
eg, egCtx := errgroup.WithContext(ctx)

eg.Go(func() error {
if err := agent.CollectMetrics(egCtx); err != nil {
return fmt.Errorf("collect metrics: %w", err)
}
return nil
})

eg.Go(func() error {
if err := agent.Run(egCtx, sigHupCh); err != nil {
return fmt.Errorf("run agent: %w", err)
}
return nil
})
return eg.Wait()
}}

0 comments on commit 3b1a73f

Please sign in to comment.