Skip to content

Commit

Permalink
Merge pull request #36 from whywaita/refactor/rest-api-semaphore
Browse files Browse the repository at this point in the history
Add semaphore for GetResourceFromLXDWithClient
  • Loading branch information
whywaita committed Mar 7, 2024
2 parents b94756a + 98949bf commit 58c0274
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 112 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ jobs:
go-version-file: ${{ matrix.target }}/go.mod
- name: lint
run: |
GO111MODULE=off GOBIN=$(pwd)/bin go get golang.org/x/lint/golint
bin/golint -set_exit_status ./...
cd ${{ matrix.target }}
go install honnef.co/go/tools/cmd/staticcheck@latest
staticcheck ./...
- name: go vet
run: |
cd ${{ matrix.target }}
Expand Down
2 changes: 1 addition & 1 deletion server/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21 AS builder
FROM golang:1.22 AS builder

WORKDIR /go/src/github.com/whywaita/shoes-lxd-multi/server

Expand Down
5 changes: 3 additions & 2 deletions server/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/whywaita/shoes-lxd-multi/server

go 1.21
go 1.22.0

require (
github.com/docker/go-units v0.5.0
Expand All @@ -9,7 +9,8 @@ require (
github.com/prometheus/client_golang v1.12.1
github.com/whywaita/myshoes v1.14.0
github.com/whywaita/shoes-lxd-multi/proto.go v0.0.0-20230331051154-d763b94b0dd7
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
github.com/whywaita/xsemaphore v0.0.0-20240305080042-cf6ba671d2e7
golang.org/x/sync v0.6.0
google.golang.org/grpc v1.54.0
)

Expand Down
6 changes: 4 additions & 2 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ github.com/whywaita/myshoes v1.14.0 h1:wIJoInogzSErsdVEwiKjWJuqXzg0mKpxxf5YKeaT3
github.com/whywaita/myshoes v1.14.0/go.mod h1:Fea/XeUlwjWSZdqfG/Ub3+U278kGilIG9o9f6DxRpq8=
github.com/whywaita/shoes-lxd-multi/proto.go v0.0.0-20230331051154-d763b94b0dd7 h1:MdNKtHc/T+46wn7JfEi5P72Q/GHVPHLInqLldZB4eWE=
github.com/whywaita/shoes-lxd-multi/proto.go v0.0.0-20230331051154-d763b94b0dd7/go.mod h1:2Z9+TYX1eQMUrCOnFBG71JaVcq5D8enmYJH6rnCWW88=
github.com/whywaita/xsemaphore v0.0.0-20240305080042-cf6ba671d2e7 h1:1dG8mw/qyLMKtzfkdQnsZABm07YXxOfPI+2GRIXileo=
github.com/whywaita/xsemaphore v0.0.0-20240305080042-cf6ba671d2e7/go.mod h1:DxG5uRIYgWQPhHggrUwSxLdCzhmKD91mUZYxq/det6o=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -388,8 +390,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
64 changes: 4 additions & 60 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/whywaita/shoes-lxd-multi/server/pkg/resourcecache"

"github.com/whywaita/shoes-lxd-multi/server/pkg/api"
"github.com/whywaita/shoes-lxd-multi/server/pkg/config"
"github.com/whywaita/shoes-lxd-multi/server/pkg/lxdclient"
"github.com/whywaita/shoes-lxd-multi/server/pkg/metric"
)

Expand All @@ -28,6 +27,8 @@ func main() {
}

func run() error {
ctx := context.Background()

hostConfigs, mapping, periodSec, listenPort, overCommitPercent, err := config.Load()
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
Expand All @@ -36,13 +37,12 @@ func run() error {
go serveMetrics(context.Background(), hostConfigs)

// lxd resource cache
t := time.NewTicker(time.Duration(periodSec) * time.Second)
var hcs []config.HostConfig
hostConfigs.Range(func(key string, value config.HostConfig) bool {
hcs = append(hcs, value)
return true
})
go setLXDResourceCacheWithTicker(hcs, t)
go resourcecache.RunLXDResourceCacheTicker(ctx, hcs, periodSec)

server, err := api.New(hostConfigs, mapping, overCommitPercent)
if err != nil {
Expand Down Expand Up @@ -81,59 +81,3 @@ func serveMetrics(ctx context.Context, hostConfigs *config.HostConfigMap) {
log.Fatal("failed to serve metrics (port 9090)", "err", err.Error())
}
}

func setLXDResourceCacheWithTicker(hcs []config.HostConfig, ticker *time.Ticker) {
for {
<-ticker.C
if err := setLXDResourceCache(hcs); err != nil {
log.Fatal("failed to set lxd resource cache", "err", err.Error())
}
}
}

func setLXDResourceCache(hcs []config.HostConfig) error {
hosts, _, err := lxdclient.ConnectLXDs(hcs)
if err != nil {
return fmt.Errorf("failed to connect LXD hosts: %s", err)
}

for _, host := range hosts {
l := slog.With("host", host.HostConfig.LxdHost)
if err := setLXDHostResourceCache(&host); err != nil {
l.Warn("failed to set lxd host resource cache", "err", err.Error())
continue
}
}
return nil
}

func setLXDHostResourceCache(host *lxdclient.LXDHost) error {
allCPU, allMemory, hostname, err := lxdclient.ScrapeLXDHostResources(host.Client)
if err != nil {
return fmt.Errorf("failed to scrape lxd resources: %s", err)
}

instances, err := lxdclient.GetAnyInstances(host.Client)
if err != nil {
return fmt.Errorf("failed to retrieve list of instance (host: %s): %s", hostname, err)
}

allocatedCPU, allocatedMemory, err := lxdclient.ScrapeLXDHostAllocatedResources(instances)
if err != nil {
return fmt.Errorf("failed to scrape instance info: %s", err)
}

s := lxdclient.LXDStatus{
Resource: lxdclient.Resource{
CPUTotal: allCPU,
MemoryTotal: allMemory,
CPUUsed: allocatedCPU,
MemoryUsed: allocatedMemory,
},
HostConfig: host.HostConfig,
}
if err := lxdclient.SetStatusCache(host.HostConfig.LxdHost, s); err != nil {
return fmt.Errorf("failed to set status cache: %s", err)
}
return nil
}
15 changes: 9 additions & 6 deletions server/pkg/api/server_add_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *ShoesLXDMultiServer) AddInstance(ctx context.Context, req *pb.AddInstan
var client lxd.InstanceServer
var reqInstance *api.InstancesPost
if errors.Is(err, ErrInstanceIsNotFound) {
host, reqInstance, err = s.setLXDStatusCache(targetLXDHosts, instanceName, instanceSource, req)
host, reqInstance, err = s.setLXDStatusCache(ctx, targetLXDHosts, instanceName, instanceSource, req)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to set LXD status cache: %+v", err)
}
Expand Down Expand Up @@ -84,6 +84,7 @@ func (s *ShoesLXDMultiServer) AddInstance(ctx context.Context, req *pb.AddInstan
op, err := client.DeleteInstance(instanceName)
if err != nil {
l.Warn("failed to delete instance", "err", err.Error())
return nil, status.Errorf(codes.Internal, "failed to wait starting instance: %+v", err)
}
if err := op.Wait(); err != nil {
l.Warn("failed to wait deleting instance", "err", err.Error())
Expand All @@ -97,6 +98,7 @@ func (s *ShoesLXDMultiServer) AddInstance(ctx context.Context, req *pb.AddInstan
op, err := client.DeleteInstance(instanceName)
if err != nil {
l.Warn("failed to delete instance", "err", err.Error())
return nil, status.Errorf(codes.Internal, "failed to wait starting instance: %+v", err)
}
if err := op.Wait(); err != nil {
l.Warn("failed to wait deleting instance", "err", err.Error())
Expand All @@ -120,6 +122,7 @@ func (s *ShoesLXDMultiServer) AddInstance(ctx context.Context, req *pb.AddInstan
}

func (s *ShoesLXDMultiServer) setLXDStatusCache(
ctx context.Context,
targetLXDHosts []lxdclient.LXDHost,
instanceName string,
instanceSource *api.InstanceSource,
Expand All @@ -128,7 +131,7 @@ func (s *ShoesLXDMultiServer) setLXDStatusCache(
s.mu.Lock()
defer s.mu.Unlock()

host, err := s.scheduleHost(targetLXDHosts)
host, err := s.scheduleHost(ctx, targetLXDHosts)
if err != nil {
return nil, nil, status.Errorf(codes.InvalidArgument, "failed to schedule host: %+v", err)
}
Expand Down Expand Up @@ -205,8 +208,8 @@ type targetHost struct {
percentOverCommit uint64
}

func (s *ShoesLXDMultiServer) scheduleHost(targetLXDHosts []lxdclient.LXDHost) (*lxdclient.LXDHost, error) {
targets, err := getResources(targetLXDHosts)
func (s *ShoesLXDMultiServer) scheduleHost(ctx context.Context, targetLXDHosts []lxdclient.LXDHost) (*lxdclient.LXDHost, error) {
targets, err := getResources(ctx, targetLXDHosts)
if err != nil {
return nil, fmt.Errorf("failed to get resources: %w", err)
}
Expand All @@ -218,7 +221,7 @@ func (s *ShoesLXDMultiServer) scheduleHost(targetLXDHosts []lxdclient.LXDHost) (
return &(target.host), nil
}

func getResources(targetLXDHosts []lxdclient.LXDHost) ([]targetHost, error) {
func getResources(ctx context.Context, targetLXDHosts []lxdclient.LXDHost) ([]targetHost, error) {
var targets []targetHost

eg := errgroup.Group{}
Expand All @@ -228,7 +231,7 @@ func getResources(targetLXDHosts []lxdclient.LXDHost) ([]targetHost, error) {
t := t
eg.Go(func() error {
l := slog.With("host", t.HostConfig.LxdHost)
resources, err := lxdclient.GetResource(t.HostConfig, l)
resources, err := lxdclient.GetResource(ctx, t.HostConfig, l)
if err != nil {
l.Warn("failed to get resource", "err", err.Error())
return nil
Expand Down
90 changes: 75 additions & 15 deletions server/pkg/lxdclient/resource.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package lxdclient

import (
"context"
"errors"
"fmt"
"log/slog"
"strconv"
"sync"

"github.com/whywaita/shoes-lxd-multi/server/pkg/config"

"github.com/docker/go-units"
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/shared/api"
"github.com/whywaita/xsemaphore"
)

// Resource is resource of lxd host
Expand All @@ -27,7 +30,7 @@ func GetCPUOverCommitPercent(in Resource) uint64 {
}

// GetResource get Resource
func GetResource(hostConfig config.HostConfig, logger *slog.Logger) (*Resource, error) {
func GetResource(ctx context.Context, hostConfig config.HostConfig, logger *slog.Logger) (*Resource, error) {
status, err := GetStatusCache(hostConfig.LxdHost)
if err == nil {
// found from cache
Expand All @@ -39,22 +42,50 @@ func GetResource(hostConfig config.HostConfig, logger *slog.Logger) (*Resource,

logger.Warn("failed to get status from cache, so scrape from lxd")

r, _, _, err := GetResourceFromLXD(ctx, hostConfig, logger)
if err != nil {
return nil, fmt.Errorf("failed to get resource from lxd: %w", err)
}
s := LXDStatus{
Resource: *r,
HostConfig: hostConfig,
}
if err := SetStatusCache(hostConfig.LxdHost, s); err != nil {
return nil, fmt.Errorf("failed to set status to cache: %w", err)
}

return r, nil
}

// GetResourceFromLXD get resources from LXD API
func GetResourceFromLXD(ctx context.Context, hostConfig config.HostConfig, logger *slog.Logger) (*Resource, []api.Instance, string, error) {
client, err := ConnectLXDWithTimeout(hostConfig.LxdHost, hostConfig.LxdClientCert, hostConfig.LxdClientKey)
if err != nil {
return nil, fmt.Errorf("failed to connect lxd: %w", err)
return nil, nil, "", fmt.Errorf("failed to connect lxd: %w", err)
}

cpuTotal, memoryTotal, _, err := ScrapeLXDHostResources(*client)
return GetResourceFromLXDWithClient(ctx, *client, hostConfig.LxdHost, logger)
}

// GetResourceFromLXDWithClient get resources from LXD API with client
func GetResourceFromLXDWithClient(ctx context.Context, client lxd.InstanceServer, host string, logger *slog.Logger) (*Resource, []api.Instance, string, error) {
sem := xsemaphore.Get(host, 1)
if err := sem.Acquire(ctx, 1); err != nil {
return nil, nil, "", fmt.Errorf("failed to acquire semaphore: %w", err)
}
defer sem.Release(1)

cpuTotal, memoryTotal, hostname, err := ScrapeLXDHostResources(client, host, logger)
if err != nil {
return nil, fmt.Errorf("failed to scrape total resource: %w", err)
return nil, nil, "", fmt.Errorf("failed to scrape total resource: %w", err)
}
instances, err := GetAnyInstances(*client)
instances, err := GetAnyInstances(client)
if err != nil {
return nil, fmt.Errorf("failed to retrieve list of instance: %w", err)
return nil, nil, "", fmt.Errorf("failed to retrieve list of instance: %w", err)
}
cpuUsed, memoryUsed, err := ScrapeLXDHostAllocatedResources(instances)
if err != nil {
return nil, fmt.Errorf("failed to scrape allocated resource: %w", err)
return nil, nil, "", fmt.Errorf("failed to scrape allocated resource: %w", err)
}

r := Resource{
Expand All @@ -63,19 +94,48 @@ func GetResource(hostConfig config.HostConfig, logger *slog.Logger) (*Resource,
CPUUsed: cpuUsed,
MemoryUsed: memoryUsed,
}
s := LXDStatus{
Resource: r,
HostConfig: hostConfig,

return &r, instances, hostname, nil
}

var (
// LXDHostResourceCache is cache of LXD resource
LXDHostResourceCache sync.Map
)

// LXDHostResource is resource of LXD host
type LXDHostResource struct {
CPUTotal uint64
MemoryTotal uint64
Hostname string
}

// ScrapeLXDHostResources scrape all resources
func ScrapeLXDHostResources(client lxd.InstanceServer, host string, logger *slog.Logger) (uint64, uint64, string, error) {
v, ok := LXDHostResourceCache.Load(host)
if ok {
r := v.(LXDHostResource)
return r.CPUTotal, r.MemoryTotal, r.Hostname, nil
}
if err := SetStatusCache(hostConfig.LxdHost, s); err != nil {
return nil, fmt.Errorf("failed to set status to cache: %w", err)

logger.Warn("failed to get host resource from cache, so scrape from lxd")

cpuTotal, memoryTotal, hostname, err := ScrapeLXDHostResourcesFromLXD(client)
if err != nil {
return 0, 0, "", fmt.Errorf("failed to scrape total resource: %w", err)
}

return &r, nil
LXDHostResourceCache.Store(host, LXDHostResource{
CPUTotal: cpuTotal,
MemoryTotal: memoryTotal,
Hostname: hostname,
})

return cpuTotal, memoryTotal, hostname, nil
}

// ScrapeLXDHostResources scrape all resources
func ScrapeLXDHostResources(client lxd.InstanceServer) (uint64, uint64, string, error) {
// ScrapeLXDHostResourcesFromLXD scrape all resources
func ScrapeLXDHostResourcesFromLXD(client lxd.InstanceServer) (uint64, uint64, string, error) {
resources, err := client.GetServerResources()
if err != nil {
return 0, 0, "", fmt.Errorf("failed to get server resource: %w", err)
Expand Down
Loading

0 comments on commit 58c0274

Please sign in to comment.