diff --git a/server/pkg/api/server.go b/server/pkg/api/server.go index ff6b96c..f7139de 100644 --- a/server/pkg/api/server.go +++ b/server/pkg/api/server.go @@ -49,15 +49,35 @@ func (s *ShoesLXDMultiServer) Run(listenPort int) error { // isExistInstance search created instance in same name func (s *ShoesLXDMultiServer) isExistInstance(targetLXDHosts []lxdclient.LXDHost, instanceName string) *lxdclient.LXDHost { + ch := make(chan lxdclient.LXDHost) + count := len(targetLXDHosts) + for _, lxdHost := range targetLXDHosts { - _, _, err := lxdHost.Client.GetInstance(instanceName) - if err == nil { - // found LXD worker - return &lxdHost - } + lxdHost := lxdHost + go func() { + defer func() { + count-- + }() + + _, _, err := lxdHost.Client.GetInstance(instanceName) + if err == nil { + // found LXD worker + ch <- lxdHost + return + } + }() } - return nil + for { + select { + case found := <-ch: + return &found + default: + if count == 0 { + return nil + } + } + } } func (s *ShoesLXDMultiServer) validateTargetHosts(targetHosts []string) ([]lxdclient.LXDHost, error) { diff --git a/server/pkg/api/server_add_instance.go b/server/pkg/api/server_add_instance.go index d4848ba..786ac52 100644 --- a/server/pkg/api/server_add_instance.go +++ b/server/pkg/api/server_add_instance.go @@ -2,6 +2,7 @@ package api import ( "context" + "errors" "fmt" "log" "math/rand" @@ -10,12 +11,13 @@ import ( "strconv" "strings" - "github.com/whywaita/shoes-lxd-multi/server/pkg/lxdclient" - lxd "github.com/lxc/lxd/client" "github.com/lxc/lxd/shared/api" + "github.com/whywaita/myshoes/pkg/runner" pb "github.com/whywaita/shoes-lxd-multi/proto.go" + "github.com/whywaita/shoes-lxd-multi/server/pkg/lxdclient" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -122,8 +124,12 @@ func (s *ShoesLXDMultiServer) scheduleHost(targetLXDHosts []lxdclient.LXDHost) ( var targets []targetHost for _, t := range targetLXDHosts { - resources, err := lxdclient.GetResource(t.Client) + resources, err := 
lxdclient.GetResource(t) if err != nil { + if errors.Is(err, lxdclient.ErrTimeoutConnectLXD) { + continue + } + return nil, fmt.Errorf("failed to get resource (host: %s): %w", t.HostConfig.LxdHost, err) } diff --git a/server/pkg/lxdclient/connect.go b/server/pkg/lxdclient/connect.go index 27e35da..fbd3ea3 100644 --- a/server/pkg/lxdclient/connect.go +++ b/server/pkg/lxdclient/connect.go @@ -4,8 +4,11 @@ import ( "errors" "fmt" "log" + "sync" "time" + "golang.org/x/sync/errgroup" + lxd "github.com/lxc/lxd/client" "github.com/whywaita/shoes-lxd-multi/server/pkg/config" ) @@ -20,20 +23,34 @@ type LXDHost struct { func ConnectLXDs(hostConfigs []config.HostConfig) ([]LXDHost, error) { var targetLXDHosts []LXDHost + eg := errgroup.Group{} + mu := sync.Mutex{} + for _, hc := range hostConfigs { - conn, err := connectLXDWithTimeout(hc.LxdHost, hc.LxdClientCert, hc.LxdClientKey) - if err != nil && !errors.Is(err, ErrTimeoutConnectLXD) { - return nil, fmt.Errorf("failed to connect LXD with timeout: %w", err) - } else if errors.Is(err, ErrTimeoutConnectLXD) { - log.Printf("failed to connect LXD, So ignore host (host: %s)\n", hc.LxdHost) - continue - } - targetLXDHosts = append(targetLXDHosts, LXDHost{ - Client: *conn, - HostConfig: hc, + hc := hc + eg.Go(func() error { + conn, err := ConnectLXDWithTimeout(hc.LxdHost, hc.LxdClientCert, hc.LxdClientKey) + if err != nil && !errors.Is(err, ErrTimeoutConnectLXD) { + return fmt.Errorf("failed to connect LXD with timeout: %w", err) + } else if errors.Is(err, ErrTimeoutConnectLXD) { + log.Printf("failed to connect LXD, So ignore host (host: %s)\n", hc.LxdHost) + return nil + } + + mu.Lock() + targetLXDHosts = append(targetLXDHosts, LXDHost{ + Client: *conn, + HostConfig: hc, + }) + mu.Unlock() + return nil }) } + if err := eg.Wait(); err != nil { + return nil, fmt.Errorf("failed to connect LXD servers: %w", err) + } + return targetLXDHosts, nil } @@ -42,7 +59,9 @@ var ( ErrTimeoutConnectLXD = fmt.Errorf("timeout of ConnectLXD") ) 
-func connectLXDWithTimeout(host, clientCert, clientKey string) (*lxd.InstanceServer, error) { +// ConnectLXDWithTimeout connects to the LXD API with a timeout. +// lxd.ConnectLXD does not support context yet, so a goroutine leaks if ConnectLXDWithTimeout times out. +func ConnectLXDWithTimeout(host, clientCert, clientKey string) (*lxd.InstanceServer, error) { type resultConnectLXD struct { client lxd.InstanceServer err error diff --git a/server/pkg/lxdclient/resource.go b/server/pkg/lxdclient/resource.go index 4e437ac..420f28e 100644 --- a/server/pkg/lxdclient/resource.go +++ b/server/pkg/lxdclient/resource.go @@ -1,10 +1,13 @@ package lxdclient import ( + "errors" "fmt" "log" "strconv" + "github.com/whywaita/shoes-lxd-multi/server/pkg/config" + "github.com/docker/go-units" lxd "github.com/lxc/lxd/client" "github.com/lxc/lxd/shared/api" @@ -24,12 +27,26 @@ func GetCPUOverCommitPercent(in Resource) uint64 { } // GetResource get Resource -func GetResource(client lxd.InstanceServer) (*Resource, error) { - cpuTotal, memoryTotal, _, err := ScrapeLXDHostResources(client) +func GetResource(hostConfig config.HostConfig) (*Resource, error) { + status, err := GetStatusCache(hostConfig.LxdHost) + if err == nil { + // found from cache + return &status.Resource, nil + } + if err != nil && !errors.Is(err, ErrCacheNotFound) { + return nil, fmt.Errorf("failed to get status from cache: %w", err) + } + + client, err := ConnectLXDWithTimeout(hostConfig.LxdHost, hostConfig.LxdClientCert, hostConfig.LxdClientKey) + if err != nil { + return nil, fmt.Errorf("failed to connect lxd: %w", err) + } + + cpuTotal, memoryTotal, _, err := ScrapeLXDHostResources(*client) if err != nil { return nil, fmt.Errorf("failed to scrape total resource: %w", err) } - instances, err := GetAnyInstances(client) + instances, err := GetAnyInstances(*client) if err != nil { return nil, fmt.Errorf("failed to retrieve list of instance: %w", err) } @@ -38,12 +55,21 @@ func GetResource(client lxd.InstanceServer) (*Resource, 
error) { return nil, fmt.Errorf("failed to scrape allocated resource: %w", err) } - return &Resource{ + r := Resource{ CPUTotal: cpuTotal, MemoryTotal: memoryTotal, CPUUsed: cpuUsed, MemoryUsed: memoryUsed, - }, nil + } + s := LXDStatus{ + Resource: r, + HostConfig: hostConfig, + } + if err := SetStatusCache(hostConfig.LxdHost, s); err != nil { + return nil, fmt.Errorf("failed to set status to cache: %w", err) + } + + return &r, nil } // ScrapeLXDHostResources scrape all resources diff --git a/server/pkg/lxdclient/resource_cache.go b/server/pkg/lxdclient/resource_cache.go new file mode 100644 index 0000000..760f756 --- /dev/null +++ b/server/pkg/lxdclient/resource_cache.go @@ -0,0 +1,49 @@ +package lxdclient + +import ( + "fmt" + "time" + + "github.com/patrickmn/go-cache" + "github.com/whywaita/shoes-lxd-multi/server/pkg/config" +) + +// LXDStatus is status for LXD +type LXDStatus struct { + IsGood bool + + Resource Resource + HostConfig config.HostConfig +} + +var ( + inmemoryCache = cache.New(10*time.Minute, cache.NoExpiration) + + // ErrCacheNotFound is error message for cache not found + ErrCacheNotFound = fmt.Errorf("cache not found") +) + +// GetCacheKey get a key of cache +func GetCacheKey(hostname string) string { + return fmt.Sprintf("host-%s", hostname) +} + +// GetStatusCache get a cache +func GetStatusCache(hostname string) (LXDStatus, error) { + resp, ok := inmemoryCache.Get(GetCacheKey(hostname)) + if !ok { + return LXDStatus{}, ErrCacheNotFound + } + + status, ok := resp.(LXDStatus) + if !ok { + return LXDStatus{}, fmt.Errorf("failed to cast status") + } + return status, nil +} + +// SetStatusCache set cache +func SetStatusCache(hostname string, status LXDStatus) error { + inmemoryCache.Set(GetCacheKey(hostname), status, cache.DefaultExpiration) + return nil +} diff --git a/server/pkg/metric/scrape_lxd.go b/server/pkg/metric/scrape_lxd.go index 9a92ebf..2956452 100644 --- a/server/pkg/metric/scrape_lxd.go +++ b/server/pkg/metric/scrape_lxd.go @@ 
-107,6 +107,19 @@ func scrapeLXDHost(ctx context.Context, hostConfigs []config.HostConfig, ch chan lxdUsageCPU, prometheus.GaugeValue, float64(allocatedCPU), hostname) ch <- prometheus.MustNewConstMetric( lxdUsageMemory, prometheus.GaugeValue, float64(allocatedMemory), hostname) + + s := lxdclient.LXDStatus{ + Resource: lxdclient.Resource{ + CPUTotal: allCPU, + MemoryTotal: allMemory, + CPUUsed: allocatedCPU, + MemoryUsed: allocatedMemory, + }, + HostConfig: host.HostConfig, + } + if err := lxdclient.SetStatusCache(host.HostConfig.LxdHost, s); err != nil { + return fmt.Errorf("failed to set status cache: %w", err) + } } return nil