Skip to content

Commit

Permalink
separate metrics and use errgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
gamoutatsumi committed Apr 24, 2024
1 parent 917f608 commit cd412d0
Showing 1 changed file with 65 additions and 31 deletions.
96 changes: 65 additions & 31 deletions pool-agent/cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
slm "github.com/whywaita/shoes-lxd-multi/server/pkg/api"
"golang.org/x/sync/errgroup"
)

// Agent is an agent for pool mode.
Expand Down Expand Up @@ -119,41 +120,64 @@ func (a *Agent) reloadConfig() error {

// Run runs the agent.
func (a *Agent) Run(ctx context.Context, sigHupCh chan os.Signal) error {
ticker := time.NewTicker(a.CheckInterval)
defer ticker.Stop()

slog.Info("Started agent")
quit := make(chan struct{})

go func() {
L:
for {
select {
case <-ticker.C:
if err := prometheus.WriteToTextfile(metricsPath, a.registry); err != nil {
slog.Error("failed to write metrics: %+v", "err", err.Error())
for {
slog.Info("Started agent")
eg, _egCtx := errgroup.WithContext(ctx)
egCtx, egCancel := context.WithCancel(_egCtx)
eg.Go(func() error {
ticker := time.NewTicker(a.CheckInterval)
defer ticker.Stop()
L:
for {
select {
case <-egCtx.Done():
break L
case <-ticker.C:
slog.Debug("Collecting metrics")
if err := a.collectMetrics(); err != nil {
egCancel()
return fmt.Errorf("collect metrics: %w", err)
}
if err := prometheus.WriteToTextfile(metricsPath, a.registry); err != nil {
egCancel()
return fmt.Errorf("write metrics: %w", err)
}
}
case <-ctx.Done():
slog.Info("Stopping agent...")
break L
}
}
quit <- struct{}{}
}()

for {
select {
case <-sigHupCh:
slog.Info("Received SIGHUP. Reloading config...")
a.reloadConfig()
case <-ticker.C:
if err := a.adjustInstancePool(); err != nil {
slog.Error("failed to check instances", "err", err.Error())
return nil
})
eg.Go(func() error {
ticker := time.NewTicker(a.CheckInterval)
defer ticker.Stop()
L:
for {
select {
case <-sigHupCh:
slog.Info("Received SIGHUP. Reloading config...")
if err := a.reloadConfig(); err != nil {
return fmt.Errorf("reload config: %w", err)
}
case <-egCtx.Done():
slog.Info("Stopping agent...")
break L
case <-ticker.C:
if err := a.adjustInstancePool(); err != nil {
egCancel()
return fmt.Errorf("adjust instances: %w", err)
}
}
}
case <-quit:
return nil
})

if err := eg.Wait(); err != nil {
slog.Error("Error occurred", "err", err.Error())
continue
} else {
break
}
}
return nil
}

func (a *Agent) countPooledInstances(instances []api.Instance, resourceTypeName string) int {
Expand Down Expand Up @@ -227,10 +251,8 @@ func (a *Agent) adjustInstancePool() error {
}
}

lxdInstances.Reset()
for _, i := range s {
l := slog.With("instance", i.Name)
lxdInstances.WithLabelValues(i.Status, i.Config[configKeyResourceType]).Inc()
if _, ok := a.ResourceTypesCounts[i.Config[configKeyResourceType]]; !ok {
toDelete = append(toDelete, i.Config[configKeyResourceType])
}
Expand Down Expand Up @@ -266,6 +288,18 @@ func (a *Agent) adjustInstancePool() error {
return nil
}

// collectMetrics refreshes the lxdInstances metric from the current instance
// list.
//
// It fetches all instances through a.Client, resets lxdInstances, and then
// increments the series labelled with each instance's status and its
// configKeyResourceType config value. If fetching the instances fails, the
// metric is left untouched and the wrapped error is returned.
func (a *Agent) collectMetrics() error {
	instances, getErr := a.Client.GetInstances(api.InstanceTypeAny)
	if getErr != nil {
		return fmt.Errorf("get instances: %w", getErr)
	}

	// Start from a clean slate so series for instances that no longer exist
	// are dropped rather than carried over.
	lxdInstances.Reset()
	for idx := range instances {
		status := instances[idx].Status
		resourceType := instances[idx].Config[configKeyResourceType]
		lxdInstances.WithLabelValues(status, resourceType).Inc()
	}
	return nil
}

func (a *Agent) isZombieInstance(i api.Instance) bool {
if i.StatusCode == api.Frozen {
return false
Expand Down

0 comments on commit cd412d0

Please sign in to comment.