Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add environments value for setting log level #41

Merged
merged 7 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Server-side implementation for shoes-lxd-multi
- `LXD_MULTI_MODE`
- Mode (`create` or `pool`)
- default: `create`
- `LXD_MULTI_LOG_LEVEL`
- Log level (`debug`, `info`, `warn`, `error`, `fatal`, `panic`) will set to `log/slog.Level`
- default: `info`

## Note
LXD Server can't use `zfs` in storageclass if use `--privileged`. ref: https://discuss.linuxcontainers.org/t/docker-with-overlay-driver-in-lxd-cluster-not-working/9243
Expand Down
9 changes: 6 additions & 3 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
)

func main() {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, nil)))

if err := run(); err != nil {
log.Fatal(err)
}
Expand All @@ -29,11 +27,16 @@ func main() {
func run() error {
ctx := context.Background()

hostConfigs, mapping, periodSec, listenPort, overCommitPercent, poolMode, err := config.Load()
hostConfigs, mapping, periodSec, listenPort, overCommitPercent, poolMode, logLevel, err := config.Load()
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}

slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
AddSource: true,
Level: *logLevel,
})))

go serveMetrics(context.Background(), hostConfigs)

// lxd resource cache
Expand Down
13 changes: 2 additions & 11 deletions server/pkg/api/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,6 @@ func getInstancesWithTimeout(_ctx context.Context, h lxdclient.LXDHost, d time.D
defer cancel()
go func() {
defer close(ret)
s, err := lxdclient.GetAnyInstances(h.Client)
if err != nil {
ret <- &gotInstances{
Instances: nil,
OverCommitPercent: 0,
Error: fmt.Errorf("failed to get instances: %w", err),
}
return
}
r, err := lxdclient.GetResource(ctx, h.HostConfig, l)
if err != nil {
ret <- &gotInstances{
Expand All @@ -49,7 +40,7 @@ func getInstancesWithTimeout(_ctx context.Context, h lxdclient.LXDHost, d time.D
return
}
var used uint64
for _, i := range s {
for _, i := range r.Instances {
if i.StatusCode != api.Running {
continue
}
Expand All @@ -70,7 +61,7 @@ func getInstancesWithTimeout(_ctx context.Context, h lxdclient.LXDHost, d time.D
}
overCommitPercent := uint64(float64(used) / float64(r.CPUTotal) * 100)
ret <- &gotInstances{
Instances: s,
Instances: r.Instances,
OverCommitPercent: overCommitPercent,
Error: nil,
}
Expand Down
70 changes: 36 additions & 34 deletions server/pkg/api/server_add_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *ShoesLXDMultiServer) AddInstance(ctx context.Context, req *pb.AddInstan
return nil, err
}
}
i, _, err := host.Client.GetInstance(instanceName)
i, _, err := host.Client.GetInstance(instanceName) // this line needs to assurance, So I will get instance information again from API
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve instance information: %+v", err)
}
Expand Down Expand Up @@ -87,18 +87,37 @@ func (s *ShoesLXDMultiServer) addInstanceCreateMode(ctx context.Context, targetL
}

if errors.Is(err, ErrInstanceIsNotFound) {
var reqInstance *api.InstancesPost
host, reqInstance, err = s.setLXDStatusCache(ctx, targetLXDHosts, instanceName, instanceSource, req)
scheduledHost, err := s.scheduleHost(ctx, targetLXDHosts)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to set LXD status cache: %+v", err)
return nil, "", status.Errorf(codes.InvalidArgument, "failed to schedule host: %+v", err)
}
_l := slog.With("host", scheduledHost.HostConfig.LxdHost)
_l.Info("AddInstance scheduled host", "runnerName", instanceName)

reqInstance := &api.InstancesPost{
InstancePut: api.InstancePut{
Config: s.getInstanceConfig(req.SetupScript, req.ResourceType),
Devices: s.getInstanceDevices(),
},
Name: instanceName,
Source: *instanceSource,
}
op, err := host.Client.CreateInstance(*reqInstance)

op, err := scheduledHost.Client.CreateInstance(*reqInstance)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to create instance: %+v", err)
}
if err := op.Wait(); err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to wait creating instance: %+v", err)
}
createdInstance, _, err := scheduledHost.Client.GetInstance(instanceName)
if err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to get created instance: %+v", err)
}
if err := s.setLXDStatusCache(reqInstance, *createdInstance, *scheduledHost); err != nil {
return nil, "", status.Errorf(codes.Internal, "failed to set LXD status cache: %+v", err)
}
host = scheduledHost
}
l = l.With("host", host.HostConfig.LxdHost)

Expand Down Expand Up @@ -203,51 +222,34 @@ func (s *ShoesLXDMultiServer) addInstancePoolMode(ctx context.Context, targets [
}

func (s *ShoesLXDMultiServer) setLXDStatusCache(
ctx context.Context,
targetLXDHosts []lxdclient.LXDHost,
instanceName string,
instanceSource *api.InstanceSource,
req *pb.AddInstanceRequest,
) (*lxdclient.LXDHost, *api.InstancesPost, error) {
reqInstance *api.InstancesPost,
newInstance api.Instance,
scheduledHost lxdclient.LXDHost,
) error {
s.mu.Lock()
defer s.mu.Unlock()

host, err := s.scheduleHost(ctx, targetLXDHosts)
if err != nil {
return nil, nil, status.Errorf(codes.InvalidArgument, "failed to schedule host: %+v", err)
}
l := slog.With("host", host.HostConfig.LxdHost)
l.Info("AddInstance scheduled host", "runnerName", instanceName)

reqInstance := &api.InstancesPost{
InstancePut: api.InstancePut{
Config: s.getInstanceConfig(req.SetupScript, req.ResourceType),
Devices: s.getInstanceDevices(),
},
Name: instanceName,
Source: *instanceSource,
}

cpu, err := strconv.ParseUint(reqInstance.InstancePut.Config["limits.cpu"], 10, 64)
if err != nil {
return nil, nil, fmt.Errorf("failde to parse limits.cpu: %w", err)
return fmt.Errorf("failde to parse limits.cpu: %w", err)
}

memory, err := units.FromHumanSize(reqInstance.InstancePut.Config["limits.memory"])
if err != nil {
return nil, nil, fmt.Errorf("failde to parse limits.memory: %w", err)
return fmt.Errorf("failde to parse limits.memory: %w", err)
}

cache, err := lxdclient.GetStatusCache(host.HostConfig.LxdHost)
cache, err := lxdclient.GetStatusCache(scheduledHost.HostConfig.LxdHost)
if err != nil {
return nil, nil, err
return fmt.Errorf("failed to get status cache: %w", err)
}
cache.Resource.CPUUsed += cpu
cache.Resource.MemoryUsed += uint64(memory)
if err := lxdclient.SetStatusCache(host.HostConfig.LxdHost, cache); err != nil {
return nil, nil, fmt.Errorf("failed to set status cache: %s", err)
cache.Resource.Instances = append(cache.Resource.Instances, newInstance)
if err := lxdclient.SetStatusCache(scheduledHost.HostConfig.LxdHost, cache); err != nil {
return fmt.Errorf("failed to set status cache: %s", err)
}
return host, reqInstance, nil
return nil
}

func (s *ShoesLXDMultiServer) getInstanceConfig(setupScript string, rt myshoespb.ResourceType) map[string]string {
Expand Down
28 changes: 20 additions & 8 deletions server/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"log"
"log/slog"
"os"
"strconv"

Expand All @@ -25,6 +26,8 @@ const (
EnvOverCommit = "LXD_MULTI_OVER_COMMIT_PERCENT"
// EnvMode will set running mode
EnvMode = "LXD_MULTI_MODE"

EnvLogLevel = "LXD_MULTI_LOG_LEVEL"
)

// Mapping is resource mapping
Expand All @@ -35,18 +38,18 @@ type Mapping struct {
}

// Load load config from Environment values
func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uint64, bool, error) {
func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uint64, bool, *slog.Level, error) {
hostConfigs, err := loadHostConfigs()
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to load host config: %w", err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to load host config: %w", err)
}

envMappingJSON := os.Getenv(EnvLXDResourceTypeMapping)
var m map[myshoespb.ResourceType]Mapping
if envMappingJSON != "" {
m, err = readResourceTypeMapping(envMappingJSON)
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to read %s: %w", EnvLXDResourceTypeMapping, err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to read %s: %w", EnvLXDResourceTypeMapping, err)
}
}

Expand All @@ -57,7 +60,7 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uin
} else {
periodSec, err = strconv.ParseInt(envPeriodSec, 10, 64)
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
}
}
log.Printf("periodSec: %d\n", periodSec)
Expand All @@ -69,7 +72,7 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uin
} else {
port, err = strconv.Atoi(envPort)
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to parse %s, need to int: %w", EnvPort, err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to parse %s, need to int: %w", EnvPort, err)
}
}

Expand All @@ -80,7 +83,7 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uin
} else {
overCommitPercent, err = strconv.ParseUint(envOCP, 10, 64)
if err != nil {
return nil, nil, 0, -1, 0, false, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
}
}
log.Printf("overCommitPercent: %d\n", overCommitPercent)
Expand All @@ -92,10 +95,19 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uin
case "pool":
poolMode = true
default:
return nil, nil, 0, -1, 0, false, fmt.Errorf(`unknown mode %q (expected "create" or "pool")`, os.Getenv(EnvMode))
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf(`unknown mode %q (expected "create" or "pool")`, os.Getenv(EnvMode))
}

var inLogLevel string
var level slog.Level
if os.Getenv(EnvLogLevel) == "" {
inLogLevel = "INFO"
}
if err := level.UnmarshalText([]byte(inLogLevel)); err != nil {
return nil, nil, 0, -1, 0, false, nil, fmt.Errorf("failed to parse log level (%s): %w", inLogLevel, err)
}

return hostConfigs, m, periodSec, port, overCommitPercent, poolMode, nil
return hostConfigs, m, periodSec, port, overCommitPercent, poolMode, &level, nil
}

func readResourceTypeMapping(env string) (map[myshoespb.ResourceType]Mapping, error) {
Expand Down
28 changes: 28 additions & 0 deletions server/pkg/lxdclient/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ var (
// ConnectLXDWithTimeout connect LXD API with timeout
// lxd.ConnectLXD is not support context yet. So ConnectLXDWithTimeout occurred goroutine leak if timeout.
func ConnectLXDWithTimeout(host, clientCert, clientKey string) (*lxd.InstanceServer, error) {
client, ok := loadConnectedInstance(host)
if ok {
return client, nil
}

type resultConnectLXD struct {
client lxd.InstanceServer
err error
Expand All @@ -105,9 +110,32 @@ func ConnectLXDWithTimeout(host, clientCert, clientKey string) (*lxd.InstanceSer
if result.err != nil {
return nil, fmt.Errorf("failed to connect LXD: %w", result.err)
}

storeConnectedInstance(host, result.client)

return &result.client, nil
case <-time.After(2 * time.Second):
// lxd.ConnectLXD() is not support context.Context yet. need to refactor it after support context.Context.
return nil, ErrTimeoutConnectLXD
}
}

// connectedInstances is map of connected LXD instances
// key: lxdhost value: LXDHost
var connectedInstances sync.Map

// storeConnectedInstance store connected instance
func storeConnectedInstance(host string, lh lxd.InstanceServer) {
connectedInstances.Store(host, lh)
}

// loadConnectedInstance load connected instance
func loadConnectedInstance(host string) (*lxd.InstanceServer, bool) {
v, ok := connectedInstances.Load(host)
if !ok {
return nil, false
}
i := v.(lxd.InstanceServer)

return &i, true
}
21 changes: 12 additions & 9 deletions server/pkg/lxdclient/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

// Resource is resource of lxd host
type Resource struct {
Instances []api.Instance

CPUTotal uint64
MemoryTotal uint64
CPUUsed uint64
Expand Down Expand Up @@ -53,7 +55,7 @@ func GetResource(ctx context.Context, hostConfig config.HostConfig, logger *slog

logger.Warn("failed to get status from cache, so scrape from lxd")

r, _, _, err := GetResourceFromLXD(ctx, hostConfig, logger)
r, _, err := GetResourceFromLXD(ctx, hostConfig, logger)
if err != nil {
return nil, fmt.Errorf("failed to get resource from lxd: %w", err)
}
Expand All @@ -69,44 +71,45 @@ func GetResource(ctx context.Context, hostConfig config.HostConfig, logger *slog
}

// GetResourceFromLXD get resources from LXD API
func GetResourceFromLXD(ctx context.Context, hostConfig config.HostConfig, logger *slog.Logger) (*Resource, []api.Instance, string, error) {
func GetResourceFromLXD(ctx context.Context, hostConfig config.HostConfig, logger *slog.Logger) (*Resource, string, error) {
client, err := ConnectLXDWithTimeout(hostConfig.LxdHost, hostConfig.LxdClientCert, hostConfig.LxdClientKey)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to connect lxd: %w", err)
return nil, "", fmt.Errorf("failed to connect lxd: %w", err)
}

return GetResourceFromLXDWithClient(ctx, *client, hostConfig.LxdHost, logger)
}

// GetResourceFromLXDWithClient get resources from LXD API with client
func GetResourceFromLXDWithClient(ctx context.Context, client lxd.InstanceServer, host string, logger *slog.Logger) (*Resource, []api.Instance, string, error) {
func GetResourceFromLXDWithClient(ctx context.Context, client lxd.InstanceServer, host string, logger *slog.Logger) (*Resource, string, error) {
sem := xsemaphore.Get(host, 1)
if err := sem.Acquire(ctx, 1); err != nil {
return nil, nil, "", fmt.Errorf("failed to acquire semaphore: %w", err)
return nil, "", fmt.Errorf("failed to acquire semaphore: %w", err)
}
defer sem.Release(1)

cpuTotal, memoryTotal, hostname, err := ScrapeLXDHostResources(client, host, logger)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to scrape total resource: %w", err)
return nil, "", fmt.Errorf("failed to scrape total resource: %w", err)
}
instances, err := GetAnyInstances(client)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to retrieve list of instance: %w", err)
return nil, "", fmt.Errorf("failed to retrieve list of instance: %w", err)
}
cpuUsed, memoryUsed, err := ScrapeLXDHostAllocatedResources(instances)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to scrape allocated resource: %w", err)
return nil, "", fmt.Errorf("failed to scrape allocated resource: %w", err)
}

r := Resource{
Instances: instances,
CPUTotal: cpuTotal,
MemoryTotal: memoryTotal,
CPUUsed: cpuUsed,
MemoryUsed: memoryUsed,
}

return &r, instances, hostname, nil
return &r, hostname, nil
}

var (
Expand Down
Loading
Loading