Skip to content

Commit

Permalink
Implement pool mode
Browse files Browse the repository at this point in the history
  • Loading branch information
takonomura authored and gamoutatsumi committed Apr 16, 2024
1 parent 6df6153 commit aae239b
Show file tree
Hide file tree
Showing 16 changed files with 1,197 additions and 39 deletions.
5 changes: 5 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ builds:
binary: shoes-lxd-multi-server
env:
- CGO_ENABLED=0
- id: "shoes-lxd-multi-pool-agent"
dir: pool-agent
binary: shoes-lxd-multi-pool-agent
env:
- CGO_ENABLED=0
archives:
- format: binary
name_template: "{{ .Binary }}-{{ .Os }}-{{ .Arch }}"
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@

- shoes-lxd-multi: shoes-provider
- server: Server-side
- pool-agent: stadium agent for pool mode
46 changes: 46 additions & 0 deletions pool-agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Pool Agent

Stadium Agent for pool mode

## Setup

- `LXD_MULTI_RESOURCE_TYPES`

```json
[
{
"name": "nano",
"cpu": 1,
"memory": "1GB",
"count": 3
},
{
"name": "micro",
"cpu": 2,
"memory": "2GB",
"count": 1
},
...
]
```

- `LXD_MULTI_IMAGE_ALIAS`
- Image to pool

### Optional values

- `LXD_SOCKET`
- Path to LXD socket
- default: `/var/lib/lxd/unix.socket`
- `LXD_MULTI_CHECK_INTERVAL`
- Interval to check instances
- default: `2s`
- `LXD_MULTI_CONCURRENT_CREATE_LIMIT`
- Limit concurrency for creating instance
- default: `3`
- `LXD_MULTI_WAIT_IDLE_TIME`
- Duration to wait instance idle after `systemctl is-system-running --wait`
- default: `5s`
- `LXD_MULTI_ZOMBIE_ALLOW_TIME`
- Duration to delete zombie instances after instance created
- default: `5m`
201 changes: 201 additions & 0 deletions pool-agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package main

import (
"context"
"crypto/rand"
"fmt"
"log"
"sync"
"time"

lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/shared/api"
"golang.org/x/sync/semaphore"
)

type Agent struct {
ImageAlias string
InstanceSource api.InstanceSource

ResourceTypes []ResourceType
Client lxd.InstanceServer

CheckInterval time.Duration
ConcurrentCreateLimit int64
WaitIdleTime time.Duration
ZombieAllowTime time.Duration

wg *sync.WaitGroup
createLimit *semaphore.Weighted
creatingInstances map[string]map[string]struct{}
deletingInstances map[string]struct{}
}

func (a *Agent) Run(ctx context.Context) error {
ticker := time.NewTicker(a.CheckInterval)

a.wg = new(sync.WaitGroup)
a.createLimit = semaphore.NewWeighted(a.ConcurrentCreateLimit)
a.deletingInstances = make(map[string]struct{})

a.creatingInstances = make(map[string]map[string]struct{}, len(a.ResourceTypes))
for _, rt := range a.ResourceTypes {
a.creatingInstances[rt.Name] = make(map[string]struct{})
}

log.Printf("Started agent")

for {
select {
case <-ticker.C:
if err := a.checkInstances(ctx); err != nil {
log.Printf("failed to check instances: %+v", err)
}
case <-ctx.Done():
log.Printf("Stopping agent...")
a.wg.Wait()
return ctx.Err()
}
}
}

func (a *Agent) countPooledInstances(instances []api.Instance, resourceTypeName string) int {
var count int
for _, i := range instances {
if i.StatusCode != api.Frozen {
continue
}
if i.Config[configKeyImageAlias] != a.ImageAlias {
continue
}
if i.Config[configKeyResourceType] != resourceTypeName {
continue
}
if _, ok := i.Config[configKeyRunnerName]; ok {
continue
}
count++
}
return count
}

func (a *Agent) generateInstanceName() (string, error) {
var b [4]byte
_, err := rand.Read(b[:])
if err != nil {
return "", fmt.Errorf("generate random id: %w", err)
}
return fmt.Sprintf("myshoes-runner-%x", b), nil
}

func (a *Agent) checkInstances(ctx context.Context) error {
s, err := a.Client.GetInstances(api.InstanceTypeAny)
if err != nil {
return fmt.Errorf("get instances: %w", err)
}

for _, rt := range a.ResourceTypes {
current := a.countPooledInstances(s, rt.Name)
creating := len(a.creatingInstances[rt.Name])
createCount := rt.PoolCount - current - creating
if createCount < 1 {
continue
}
log.Printf("Create %d instances for %q", createCount, rt.Name)
for i := 0; i < createCount; i++ {
name, err := a.generateInstanceName()
if err != nil {
return fmt.Errorf("generate instance name: %w", err)
}
a.creatingInstances[rt.Name][name] = struct{}{}
go func(name string, rt ResourceType) {
a.createLimit.Acquire(context.Background(), 1)
defer a.createLimit.Release(1)

defer delete(a.creatingInstances[rt.Name], name)

a.wg.Add(1)
defer a.wg.Done()
select {
case <-ctx.Done():
// context cancelled, stop creating immediately
return
default:
// context is not cancelled, continue
}

if err := a.createInstance(name, rt); err != nil {
log.Printf("failed to create instance %q: %+v", name, err)
}
}(name, rt)
}
}

for _, i := range s {
if a.isZombieInstance(i) {
if _, ok := a.deletingInstances[i.Name]; ok {
continue
}
log.Printf("Deleting zombie instance %q...", i.Name)
a.deletingInstances[i.Name] = struct{}{}
a.wg.Add(1)
go func(i api.Instance) {
defer a.wg.Done()
defer delete(a.deletingInstances, i.Name)

if err := a.deleteZombieInstance(i); err != nil {
log.Printf("failed to delete zombie instance %q: %+v", i.Name, err)
}
log.Printf("Deleted zombie instance %q", i.Name)
}(i)
}
}

return nil
}

func (a *Agent) isZombieInstance(i api.Instance) bool {
if i.StatusCode == api.Frozen {
return false
}
if _, ok := i.Config[configKeyRunnerName]; ok {
return false
}
if i.Config[configKeyImageAlias] != a.ImageAlias {
return false
}
if i.CreatedAt.Add(a.ZombieAllowTime).After(time.Now()) {
return false
}
if rt, ok := i.Config[configKeyResourceType]; !ok {
return false
} else if _, ok := a.creatingInstances[rt][i.Name]; ok {
return false
}
return true
}

func (a *Agent) deleteZombieInstance(i api.Instance) error {
if i.StatusCode == api.Running {
op, err := a.Client.UpdateInstanceState(i.Name, api.InstanceStatePut{
Action: "stop",
Timeout: -1,
}, "")
if err != nil {
return fmt.Errorf("stop: %w", err)
}
if err := op.Wait(); err != nil {
return fmt.Errorf("stop operation: %w", err)
}
}

op, err := a.Client.DeleteInstance(i.Name)
if err != nil {
return fmt.Errorf("delete: %w", err)
}
if err := op.Wait(); err != nil {
return fmt.Errorf("delete operation: %w", err)
}

return nil
}
83 changes: 83 additions & 0 deletions pool-agent/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package main

import (
"encoding/json"
"fmt"
"os"
"strconv"
"time"

"github.com/lxc/lxd/shared/api"
slm "github.com/whywaita/shoes-lxd-multi/server/pkg/api"
)

type ResourceType struct {
Name string `json:"name"`

CPUCore int `json:"cpu"`
Memory string `json:"memory"`

PoolCount int `json:"count"`
}

func LoadResourceTypes() ([]ResourceType, error) {
env := os.Getenv("LXD_MULTI_RESOURCE_TYPES")
if env == "" {
return nil, fmt.Errorf("LXD_MULTI_RESOURCE_TYPES is not set")
}
var s []ResourceType
if err := json.Unmarshal([]byte(env), &s); err != nil {
return nil, fmt.Errorf("parse LXD_MULTI_RESOURCE_TYPES: %w", err)
}
return s, nil
}

func LoadImageAlias() (string, api.InstanceSource, error) {
env := os.Getenv("LXD_MULTI_IMAGE_ALIAS")
if env == "" {
return "", api.InstanceSource{}, fmt.Errorf("LXD_MULTI_IMAGE_ALIAS is not set")
}
source, err := slm.ParseAlias(env)
if err != nil {
return "", api.InstanceSource{}, fmt.Errorf("parse LXD_MULTI_IMAGE_ALIAS: %w", err)
}
return env, *source, nil
}

func LoadParams() (checkInterval time.Duration, concurrentCreateLimit int64, waitIdleTime time.Duration, zombieAllowTime time.Duration, err error) {
checkInterval, err = loadDurationEnv("LXD_MULTI_CHECK_INTERVAL", 2*time.Second)
if err != nil {
return
}
waitIdleTime, err = loadDurationEnv("LXD_MULTI_WAIT_IDLE_TIME", 5*time.Second)
if err != nil {
return
}
zombieAllowTime, err = loadDurationEnv("LXD_MULTI_ZOMBIE_ALLOW_TIME", 5*time.Minute)
if err != nil {
return
}

if env := os.Getenv("LXD_MULTI_CONCURRENT_CREATE_LIMIT"); env != "" {
concurrentCreateLimit, err = strconv.ParseInt(env, 10, 64)
if err != nil {
return
}
} else {
concurrentCreateLimit = 3
}

return
}

func loadDurationEnv(name string, def time.Duration) (time.Duration, error) {
env := os.Getenv(name)
if env == "" {
return def, nil
}
d, err := time.ParseDuration(env)
if err != nil {
return 0, fmt.Errorf("parse %s: %w", name, err)
}
return d, nil
}
7 changes: 7 additions & 0 deletions pool-agent/config_keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

const (
configKeyResourceType = "user.myshoes_resource_type"
configKeyImageAlias = "user.myshoes_image_alias"
configKeyRunnerName = "user.myshoes_runner_name"
)
Loading

0 comments on commit aae239b

Please sign in to comment.