Skip to content

Commit

Permalink
Merge pull request #41 from whywaita/feat/switch-log-level
Browse files Browse the repository at this point in the history
Add environments value for setting log level
  • Loading branch information
whywaita committed Sep 18, 2024
2 parents ed5a0e6 + bc0ffb6 commit e6347dc
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 75 deletions.
3 changes: 3 additions & 0 deletions server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Server-side implementation for shoes-lxd-multi
- `LXD_MULTI_MODE`
- Mode (`create` or `pool`)
- default: `create`
- `LXD_MULTI_LOG_LEVEL`
- Log level (`debug`, `info`, `warn`, `error`, `fatal`, `panic`) will set to `log/slog.Level`
- default: `info`

## Note
LXD Server can't use `zfs` in storageclass if use `--privileged`. ref: https://discuss.linuxcontainers.org/t/docker-with-overlay-driver-in-lxd-cluster-not-working/9243
Expand Down
9 changes: 6 additions & 3 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
)

func main() {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, nil)))

if err := run(); err != nil {
log.Fatal(err)
}
Expand All @@ -29,11 +27,16 @@ func main() {
func run() error {
ctx := context.Background()

hostConfigs, mapping, periodSec, listenPort, overCommitPercent, poolMode, err := config.Load()
hostConfigs, mapping, periodSec, listenPort, overCommitPercent, poolMode, logLevel, err := config.Load()
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}

slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
AddSource: true,
Level: *logLevel,
})))

go serveMetrics(context.Background(), hostConfigs)

// lxd resource cache
Expand Down
13 changes: 2 additions & 11 deletions server/pkg/api/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,6 @@ func getInstancesWithTimeout(_ctx context.Context, h lxdclient.LXDHost, d time.D
defer cancel()
go func() {
defer close(ret)
s, err := lxdclient.GetAnyInstances(h.Client)
if err != nil {
ret <- &gotInstances{
Instances: nil,
OverCommitPercent: 0,
Error: fmt.Errorf("failed to get instances: %w", err),
}
return
}
r, err := lxdclient.GetResource(ctx, h.HostConfig, l)
if err != nil {
ret <- &gotInstances{
Expand All @@ -49,7 +40,7 @@ func getInstancesWithTimeout(_ctx context.Context, h lxdclient.LXDHost, d time.D
return
}
var used uint64
for _, i := range s {
for _, i := range r.Instances {
if i.StatusCode != api.Running {
continue
}
Expand All @@ -70,7 +61,7 @@ func getInstancesWithTimeout(_ctx context.Context, h lxdclient.LXDHost, d time.D
}
overCommitPercent := uint64(float64(used) / float64(r.CPUTotal) * 100)
ret <- &gotInstances{
Instances: s,
Instances: r.Instances,
OverCommitPercent: overCommitPercent,
Error: nil,
}
Expand Down
70 changes: 36 additions & 34 deletions server/pkg/api/server_add_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *ShoesLXDMultiServer) AddInstance(ctx context.Context, req *pb.AddInstan
return nil, err
}
}
i, _, err := host.Client.GetInstance(instanceName)
i, _, err := host.Client.GetInstance(instanceName) // this line needs to assurance, So I will get instance information again from API
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve instance information: %+v", err)
}
Expand Down Expand Up @@ -87,18 +87,37 @@ func (s *ShoesLXDMultiServer) addInstanceCreateMode(ctx context.Context, targetL
}

if errors.Is(err, ErrInstanceIsNotFound) {
var reqInstance *api.InstancesPost
host, reqInstance, err = s.setLXDStatusCache(ctx, targetLXDHosts, instanceName, instanceSource, req)
scheduledHost, err := s.scheduleHost(ctx, targetLXDHosts)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to set LXD status cache: %+v", err)
return nil, "", status.Errorf(codes.InvalidArgument, "failed to schedule host: %+v", err)
}
_l := slog.With("host", scheduledHost.HostConfig.LxdHost)
_l.Info("AddInstance scheduled host", "runnerName", instanceName)

reqInstance := &api.InstancesPost{
InstancePut: api.InstancePut{
Config: s.getInstanceConfig(req.SetupScript, req.ResourceType),
Devices: s.getInstanceDevices(),
},
Name: instanceName,
Source: *instanceSource,
}
op, err := host.Client.CreateInstance(*reqInstance)

op, err := scheduledHost.Client.CreateInstance(*reqInstance)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to create instance: %+v", err)
}
if err := op.Wait(); err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to wait creating instance: %+v", err)
}
createdInstance, _, err := scheduledHost.Client.GetInstance(instanceName)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to get created instance: %+v", err)
}
if err := s.setLXDStatusCache(reqInstance, *createdInstance, *scheduledHost); err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to set LXD status cache: %+v", err)
}
host = scheduledHost
}
l = l.With("host", host.HostConfig.LxdHost)

Expand Down Expand Up @@ -203,51 +222,34 @@ func (s *ShoesLXDMultiServer) addInstancePoolMode(ctx context.Context, targets [
}

func (s *ShoesLXDMultiServer) setLXDStatusCache(
ctx context.Context,
targetLXDHosts []lxdclient.LXDHost,
instanceName string,
instanceSource *api.InstanceSource,
req *pb.AddInstanceRequest,
) (*lxdclient.LXDHost, *api.InstancesPost, error) {
reqInstance *api.InstancesPost,
newInstance api.Instance,
scheduledHost lxdclient.LXDHost,
) error {
s.mu.Lock()
defer s.mu.Unlock()

host, err := s.scheduleHost(ctx, targetLXDHosts)
if err != nil {
return nil, nil, status.Errorf(codes.InvalidArgument, "failed to schedule host: %+v", err)
}
l := slog.With("host", host.HostConfig.LxdHost)
l.Info("AddInstance scheduled host", "runnerName", instanceName)

reqInstance := &api.InstancesPost{
InstancePut: api.InstancePut{
Config: s.getInstanceConfig(req.SetupScript, req.ResourceType),
Devices: s.getInstanceDevices(),
},
Name: instanceName,
Source: *instanceSource,
}

cpu, err := strconv.ParseUint(reqInstance.InstancePut.Config["limits.cpu"], 10, 64)
if err != nil {
return nil, nil, fmt.Errorf("failde to parse limits.cpu: %w", err)
return fmt.Errorf("failde to parse limits.cpu: %w", err)
}

memory, err := units.FromHumanSize(reqInstance.InstancePut.Config["limits.memory"])
if err != nil {
return nil, nil, fmt.Errorf("failde to parse limits.memory: %w", err)
return fmt.Errorf("failde to parse limits.memory: %w", err)
}

cache, err := lxdclient.GetStatusCache(host.HostConfig.LxdHost)
cache, err := lxdclient.GetStatusCache(scheduledHost.HostConfig.LxdHost)
if err != nil {
return nil, nil, err
return fmt.Errorf("failed to get status cache: %w", err)
}
cache.Resource.CPUUsed += cpu
cache.Resource.MemoryUsed += uint64(memory)
if err := lxdclient.SetStatusCache(host.HostConfig.LxdHost, cache); err != nil {
return nil, nil, fmt.Errorf("failed to set status cache: %s", err)
cache.Resource.Instances = append(cache.Resource.Instances, newInstance)
if err := lxdclient.SetStatusCache(scheduledHost.HostConfig.LxdHost, cache); err != nil {
return fmt.Errorf("failed to set status cache: %s", err)
}
return host, reqInstance, nil
return nil
}

func (s *ShoesLXDMultiServer) getInstanceConfig(setupScript string, rt myshoespb.ResourceType) map[string]string {
Expand Down
28 changes: 20 additions & 8 deletions server/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"log"
"log/slog"
"os"
"strconv"

Expand All @@ -25,6 +26,8 @@ const (
EnvOverCommit = "LXD_MULTI_OVER_COMMIT_PERCENT"
// EnvMode will set running mode
EnvMode = "LXD_MULTI_MODE"

EnvLogLevel = "LXD_MULTI_LOG_LEVEL"
)

// Mapping is resource mapping
Expand All @@ -35,18 +38,18 @@ type Mapping struct {
}

// Load load config from Environment values
func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uint64, bool, error) {
func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uint64, bool, *slog.Level, error) {
hostConfigs, err := loadHostConfigs()
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to load host config: %w", err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to load host config: %w", err)
}

envMappingJSON := os.Getenv(EnvLXDResourceTypeMapping)
var m map[myshoespb.ResourceType]Mapping
if envMappingJSON != "" {
m, err = readResourceTypeMapping(envMappingJSON)
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to read %s: %w", EnvLXDResourceTypeMapping, err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to read %s: %w", EnvLXDResourceTypeMapping, err)
}
}

Expand All @@ -57,7 +60,7 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uin
} else {
periodSec, err = strconv.ParseInt(envPeriodSec, 10, 64)
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
}
}
log.Printf("periodSec: %d\n", periodSec)
Expand All @@ -69,7 +72,7 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uin
} else {
port, err = strconv.Atoi(envPort)
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to parse %s, need to int: %w", EnvPort, err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to parse %s, need to int: %w", EnvPort, err)
}
}

Expand All @@ -80,7 +83,7 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uin
} else {
overCommitPercent, err = strconv.ParseUint(envOCP, 10, 64)
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
}
}
log.Printf("overCommitPercent: %d\n", overCommitPercent)
Expand All @@ -92,10 +95,19 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uin
case "pool":
poolMode = true
default:
return nil, nil, 0, -1, 0, false, fmt.Errorf(`unknown mode %q (expected "create" or "pool")`, os.Getenv(EnvMode))
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf(`unknown mode %q (expected "create" or "pool")`, os.Getenv(EnvMode))
}

var inLogLevel string
var level slog.Level
if os.Getenv(EnvLogLevel) == "" {
inLogLevel = "INFO"
}
if err := level.UnmarshalText([]byte(inLogLevel)); err != nil {
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to parse log level (%s): %w", inLogLevel, err)
}

return hostConfigs, m, periodSec, port, overCommitPercent, poolMode, nil
return hostConfigs, m, periodSec, port, overCommitPercent, poolMode, &level, nil
}

func readResourceTypeMapping(env string) (map[myshoespb.ResourceType]Mapping, error) {
Expand Down
28 changes: 28 additions & 0 deletions server/pkg/lxdclient/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ var (
// ConnectLXDWithTimeout connect LXD API with timeout
// lxd.ConnectLXD is not support context yet. So ConnectLXDWithTimeout occurred goroutine leak if timeout.
func ConnectLXDWithTimeout(host, clientCert, clientKey string) (*lxd.InstanceServer, error) {
client, ok := loadConnectedInstance(host)
if ok {
return client, nil
}

type resultConnectLXD struct {
client lxd.InstanceServer
err error
Expand All @@ -105,9 +110,32 @@ func ConnectLXDWithTimeout(host, clientCert, clientKey string) (*lxd.InstanceSer
if result.err != nil {
return nil, fmt.Errorf("failed to connect LXD: %w", result.err)
}

storeConnectedInstance(host, result.client)

return &result.client, nil
case <-time.After(2 * time.Second):
// lxd.ConnectLXD() is not support context.Context yet. need to refactor it after support context.Context.
return nil, ErrTimeoutConnectLXD
}
}

// connectedInstances is map of connected LXD instances
// key: lxdhost value: LXDHost
var connectedInstances sync.Map

// storeConnectedInstance store connected instance
func storeConnectedInstance(host string, lh lxd.InstanceServer) {
connectedInstances.Store(host, lh)
}

// loadConnectedInstance load connected instance
func loadConnectedInstance(host string) (*lxd.InstanceServer, bool) {
v, ok := connectedInstances.Load(host)
if !ok {
return nil, false
}
i := v.(lxd.InstanceServer)

return &i, true
}
21 changes: 12 additions & 9 deletions server/pkg/lxdclient/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

// Resource is resource of lxd host
type Resource struct {
Instances []api.Instance

CPUTotal uint64
MemoryTotal uint64
CPUUsed uint64
Expand Down Expand Up @@ -53,7 +55,7 @@ func GetResource(ctx context.Context, hostConfig config.HostConfig, logger *slog

logger.Warn("failed to get status from cache, so scrape from lxd")

r, _, _, err := GetResourceFromLXD(ctx, hostConfig, logger)
r, _, err := GetResourceFromLXD(ctx, hostConfig, logger)
if err != nil {
return nil, fmt.Errorf("failed to get resource from lxd: %w", err)
}
Expand All @@ -69,44 +71,45 @@ func GetResource(ctx context.Context, hostConfig config.HostConfig, logger *slog
}

// GetResourceFromLXD get resources from LXD API
func GetResourceFromLXD(ctx context.Context, hostConfig config.HostConfig, logger *slog.Logger) (*Resource, []api.Instance, string, error) {
func GetResourceFromLXD(ctx context.Context, hostConfig config.HostConfig, logger *slog.Logger) (*Resource, string, error) {
client, err := ConnectLXDWithTimeout(hostConfig.LxdHost, hostConfig.LxdClientCert, hostConfig.LxdClientKey)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to connect lxd: %w", err)
return nil, "", fmt.Errorf("failed to connect lxd: %w", err)
}

return GetResourceFromLXDWithClient(ctx, *client, hostConfig.LxdHost, logger)
}

// GetResourceFromLXDWithClient get resources from LXD API with client
func GetResourceFromLXDWithClient(ctx context.Context, client lxd.InstanceServer, host string, logger *slog.Logger) (*Resource, []api.Instance, string, error) {
func GetResourceFromLXDWithClient(ctx context.Context, client lxd.InstanceServer, host string, logger *slog.Logger) (*Resource, string, error) {
sem := xsemaphore.Get(host, 1)
if err := sem.Acquire(ctx, 1); err != nil {
return nil, nil, "", fmt.Errorf("failed to acquire semaphore: %w", err)
return nil, "", fmt.Errorf("failed to acquire semaphore: %w", err)
}
defer sem.Release(1)

cpuTotal, memoryTotal, hostname, err := ScrapeLXDHostResources(client, host, logger)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to scrape total resource: %w", err)
return nil, "", fmt.Errorf("failed to scrape total resource: %w", err)
}
instances, err := GetAnyInstances(client)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to retrieve list of instance: %w", err)
return nil, "", fmt.Errorf("failed to retrieve list of instance: %w", err)
}
cpuUsed, memoryUsed, err := ScrapeLXDHostAllocatedResources(instances)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to scrape allocated resource: %w", err)
return nil, "", fmt.Errorf("failed to scrape allocated resource: %w", err)
}

r := Resource{
Instances: instances,
CPUTotal: cpuTotal,
MemoryTotal: memoryTotal,
CPUUsed: cpuUsed,
MemoryUsed: memoryUsed,
}

return &r, instances, hostname, nil
return &r, hostname, nil
}

var (
Expand Down
Loading

0 comments on commit e6347dc

Please sign in to comment.