Skip to content

Commit

Permalink
Parallel connect to LXD
Browse files Browse the repository at this point in the history
  • Loading branch information
whywaita committed Dec 24, 2021
1 parent f058545 commit 03e7b1b
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 25 deletions.
32 changes: 26 additions & 6 deletions server/pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,35 @@ func (s *ShoesLXDMultiServer) Run(listenPort int) error {

// isExistInstance search created instance in same name
func (s *ShoesLXDMultiServer) isExistInstance(targetLXDHosts []lxdclient.LXDHost, instanceName string) *lxdclient.LXDHost {
ch := make(chan lxdclient.LXDHost)
count := len(targetLXDHosts)

for _, lxdHost := range targetLXDHosts {
_, _, err := lxdHost.Client.GetInstance(instanceName)
if err == nil {
// found LXD worker
return &lxdHost
}
lxdHost := lxdHost
go func() {
defer func() {
count--
}()

_, _, err := lxdHost.Client.GetInstance(instanceName)
if err == nil {
// found LXD worker
ch <- lxdHost
return
}
}()
}

return nil
for {
select {
case found := <-ch:
return &found
default:
if count == 0 {
return nil
}
}
}
}

func (s *ShoesLXDMultiServer) validateTargetHosts(targetHosts []string) ([]lxdclient.LXDHost, error) {
Expand Down
12 changes: 9 additions & 3 deletions server/pkg/api/server_add_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"errors"
"fmt"
"log"
"math/rand"
Expand All @@ -10,12 +11,13 @@ import (
"strconv"
"strings"

"github.com/whywaita/shoes-lxd-multi/server/pkg/lxdclient"

lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/shared/api"

"github.com/whywaita/myshoes/pkg/runner"
pb "github.com/whywaita/shoes-lxd-multi/proto.go"
"github.com/whywaita/shoes-lxd-multi/server/pkg/lxdclient"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -122,8 +124,12 @@ func (s *ShoesLXDMultiServer) scheduleHost(targetLXDHosts []lxdclient.LXDHost) (
var targets []targetHost

for _, t := range targetLXDHosts {
resources, err := lxdclient.GetResource(t.Client)
resources, err := lxdclient.GetResource(t)
if err != nil {
if errors.Is(err, lxdclient.ErrTimeoutConnectLXD) {
continue
}

return nil, fmt.Errorf("failed to get resource (host: %s): %w", t.HostConfig.LxdHost, err)
}

Expand Down
41 changes: 30 additions & 11 deletions server/pkg/lxdclient/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"errors"
"fmt"
"log"
"sync"
"time"

"golang.org/x/sync/errgroup"

lxd "github.com/lxc/lxd/client"
"github.com/whywaita/shoes-lxd-multi/server/pkg/config"
)
Expand All @@ -20,20 +23,34 @@ type LXDHost struct {
func ConnectLXDs(hostConfigs []config.HostConfig) ([]LXDHost, error) {
var targetLXDHosts []LXDHost

eg := errgroup.Group{}
mu := sync.Mutex{}

for _, hc := range hostConfigs {
conn, err := connectLXDWithTimeout(hc.LxdHost, hc.LxdClientCert, hc.LxdClientKey)
if err != nil && !errors.Is(err, ErrTimeoutConnectLXD) {
return nil, fmt.Errorf("failed to connect LXD with timeout: %w", err)
} else if errors.Is(err, ErrTimeoutConnectLXD) {
log.Printf("failed to connect LXD, So ignore host (host: %s)\n", hc.LxdHost)
continue
}
targetLXDHosts = append(targetLXDHosts, LXDHost{
Client: *conn,
HostConfig: hc,
hc := hc
eg.Go(func() error {
conn, err := ConnectLXDWithTimeout(hc.LxdHost, hc.LxdClientCert, hc.LxdClientKey)
if err != nil && !errors.Is(err, ErrTimeoutConnectLXD) {
return fmt.Errorf("failed to connect LXD with timeout: %w", err)
} else if errors.Is(err, ErrTimeoutConnectLXD) {
log.Printf("failed to connect LXD, So ignore host (host: %s)\n", hc.LxdHost)
return nil
}

mu.Lock()
targetLXDHosts = append(targetLXDHosts, LXDHost{
Client: *conn,
HostConfig: hc,
})
mu.Unlock()
return nil
})
}

if err := eg.Wait(); err != nil {
return nil, fmt.Errorf("failed to connect LXD servers: %w", err)
}

return targetLXDHosts, nil
}

Expand All @@ -42,7 +59,9 @@ var (
ErrTimeoutConnectLXD = fmt.Errorf("timeout of ConnectLXD")
)

func connectLXDWithTimeout(host, clientCert, clientKey string) (*lxd.InstanceServer, error) {
// ConnectLXDWithTimeout connect LXD API with timeout
// lxd.ConnectLXD is not support context yet. So ConnectLXDWithTimeout occurred goroutine leak if timeout.
func ConnectLXDWithTimeout(host, clientCert, clientKey string) (*lxd.InstanceServer, error) {
type resultConnectLXD struct {
client lxd.InstanceServer
err error
Expand Down
36 changes: 31 additions & 5 deletions server/pkg/lxdclient/resource.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package lxdclient

import (
"errors"
"fmt"
"log"
"strconv"

"github.com/whywaita/shoes-lxd-multi/server/pkg/config"

"github.com/docker/go-units"
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/shared/api"
Expand All @@ -24,12 +27,26 @@ func GetCPUOverCommitPercent(in Resource) uint64 {
}

// GetResource get Resource
func GetResource(client lxd.InstanceServer) (*Resource, error) {
cpuTotal, memoryTotal, _, err := ScrapeLXDHostResources(client)
func GetResource(hostConfig config.HostConfig) (*Resource, error) {
status, err := GetStatusCache(hostConfig.LxdHost)
if err == nil {
// found from cache
return &status.Resource, nil
}
if err != nil && !errors.Is(err, ErrCacheNotFound) {
return nil, fmt.Errorf("failed to get status from cache: %w", err)
}

client, err := ConnectLXDWithTimeout(hostConfig.LxdHost, hostConfig.LxdClientCert, hostConfig.LxdClientKey)
if err != nil {
return nil, fmt.Errorf("failed to connect lxd: %w", err)
}

cpuTotal, memoryTotal, _, err := ScrapeLXDHostResources(*client)
if err != nil {
return nil, fmt.Errorf("failed to scrape total resource: %w", err)
}
instances, err := GetAnyInstances(client)
instances, err := GetAnyInstances(*client)
if err != nil {
return nil, fmt.Errorf("failed to retrieve list of instance: %w", err)
}
Expand All @@ -38,12 +55,21 @@ func GetResource(client lxd.InstanceServer) (*Resource, error) {
return nil, fmt.Errorf("failed to scrape allocated resource: %w", err)
}

return &Resource{
r := Resource{
CPUTotal: cpuTotal,
MemoryTotal: memoryTotal,
CPUUsed: cpuUsed,
MemoryUsed: memoryUsed,
}, nil
}
s := LXDStatus{
Resource: r,
HostConfig: hostConfig,
}
if err := SetStatusCache(hostConfig.LxdHost, s); err != nil {
return nil, fmt.Errorf("failed to set status to cache: %w", err)
}

return &r, nil
}

// ScrapeLXDHostResources scrape all resources
Expand Down
49 changes: 49 additions & 0 deletions server/pkg/lxdclient/resource_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package lxdclient

import (
"fmt"
"time"

"github.com/patrickmn/go-cache"
"github.com/whywaita/shoes-lxd-multi/server/pkg/config"
)

// LXDStatus is status for LXD
type LXDStatus struct {
IsGood bool

Resource Resource
HostConfig config.HostConfig
}

var (
inmemoryCache = cache.New(10*time.Minute, cache.NoExpiration)

// ErrCacheNotFound is error message for cache not found
ErrCacheNotFound = fmt.Errorf("cache not found")
)

// GetCacheKey get a key of cache
func GetCacheKey(hostname string) string {
return fmt.Sprintf("host-%s", hostname)
}

// GetStatusCache get a cache
func GetStatusCache(hostname string) (LXDStatus, error) {
resp, ok := inmemoryCache.Get(GetCacheKey(hostname))
if !ok {
return LXDStatus{}, ErrCacheNotFound
}

status, ok := resp.(LXDStatus)
if !ok {
return LXDStatus{}, fmt.Errorf("failed to cast status")
}
return status, nil
}

// SetStatusCache set cache
func SetStatusCache(hostname string, status LXDStatus) error {
inmemoryCache.Set(GetCacheKey(hostname), status, cache.DefaultExpiration)
return nil
}
13 changes: 13 additions & 0 deletions server/pkg/metric/scrape_lxd.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ func scrapeLXDHost(ctx context.Context, hostConfigs []config.HostConfig, ch chan
lxdUsageCPU, prometheus.GaugeValue, float64(allocatedCPU), hostname)
ch <- prometheus.MustNewConstMetric(
lxdUsageMemory, prometheus.GaugeValue, float64(allocatedMemory), hostname)

s := lxdclient.LXDStatus{
Resource: lxdclient.Resource{
CPUTotal: allCPU,
MemoryTotal: allMemory,
CPUUsed: allocatedCPU,
MemoryUsed: allocatedMemory,
},
HostConfig: host.HostConfig,
}
if err := lxdclient.SetStatusCache(host.HostConfig.LxdHost, s); err != nil {
return fmt.Errorf("failed to set status cache: %w", err)
}
}

return nil
Expand Down

0 comments on commit 03e7b1b

Please sign in to comment.