Reuse VMs when possible
Whenever ext code and import paths are unchanged, a Jsonnet VM's internal cache can be reused for future evaluations.
This PR adds a dumb pool that keeps the most recently used VM for each combination of ext code and import paths.

I also added shuffling of the envs, so that the likelihood of reusing a VM is higher. When an env is done evaluating, its VM is put back in the pool (with a cache enriched by that eval).
An important thing to note is that VMs cannot be used concurrently, so we remove them from the pool while they are in use.
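
To make the pattern concrete, here is a minimal, self-contained sketch of the idea (a hypothetical illustration, not the Tanka code in the diff below — evalOpts, optsKey and pool are made-up names, and only ExtCode is applied when building a VM): a pool keyed by a hash of the evaluation options, where Get removes a VM so it is never shared between goroutines and Release puts it back once an evaluation finishes.

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sync"

	jsonnet "github.com/google/go-jsonnet"
)

// evalOpts is a hypothetical stand-in for Tanka's jsonnet.Opts.
type evalOpts struct {
	ExtCode     map[string]string
	ImportPaths []string
}

func optsKey(o evalOpts) string {
	h := md5.New()
	fmt.Fprintf(h, "%s%s", o.ExtCode, o.ImportPaths)
	return hex.EncodeToString(h.Sum(nil))
}

type pool struct {
	mu        sync.Mutex
	available map[string][]*jsonnet.VM
}

// Get removes a matching VM from the pool (VMs must never be shared between
// goroutines) or builds a fresh one when none is available for these options.
func (p *pool) Get(o evalOpts) *jsonnet.VM {
	p.mu.Lock()
	defer p.mu.Unlock()
	key := optsKey(o)
	if vms := p.available[key]; len(vms) > 0 {
		vm := vms[0]
		p.available[key] = vms[1:]
		return vm
	}
	vm := jsonnet.MakeVM()
	for k, v := range o.ExtCode {
		vm.ExtCode(k, v)
	}
	return vm
}

// Release returns the VM, with its now-warmer cache, for the next evaluation
// that uses the same options.
func (p *pool) Release(vm *jsonnet.VM, o evalOpts) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.available[optsKey(o)] = append(p.available[optsKey(o)], vm)
}

func main() {
	p := &pool{available: map[string][]*jsonnet.VM{}}
	o := evalOpts{ExtCode: map[string]string{"foo": `"bar"`}}

	vm := p.Get(o)
	defer p.Release(vm, o) // the next Get with identical options reuses this VM

	out, err := vm.EvaluateAnonymousSnippet("example.jsonnet", `std.extVar("foo")`)
	fmt.Println(out, err)
}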
julienduchesne committed Aug 4, 2023
1 parent 8e496b7 commit b2cd809
Showing 5 changed files with 67 additions and 22 deletions.
71 changes: 56 additions & 15 deletions pkg/jsonnet/eval.go
@@ -1,8 +1,12 @@
 package jsonnet
 
 import (
+	"crypto/md5"
+	"encoding/hex"
+	"fmt"
 	"os"
 	"regexp"
+	"sync"
 	"time"
 
 	jsonnet "github.com/google/go-jsonnet"
@@ -75,32 +79,68 @@ func (o Opts) Clone() Opts {
 	}
 }
 
-// MakeVM returns a Jsonnet VM with some extensions of Tanka, including:
+type vmPool struct {
+	mutex     sync.Mutex
+	available map[string][]*jsonnet.VM
+}
+
+var VMPool = vmPool{
+	available: map[string][]*jsonnet.VM{},
+}
+
+func optsHash(opts Opts) string {
+	hash := md5.New()
+	hash.Write([]byte(fmt.Sprintf("%s", opts.ExtCode)))
+	hash.Write([]byte(fmt.Sprintf("%s", opts.ImportPaths)))
+	return hex.EncodeToString(hash.Sum(nil))
+}
+
+// Get returns a Jsonnet VM with some extensions of Tanka, including:
 // - extended importer
 // - extCode and tlaCode applied
 // - native functions registered
-func MakeVM(opts Opts) *jsonnet.VM {
-	vm := jsonnet.MakeVM()
-	vm.Importer(NewExtendedImporter(opts.ImportPaths))
+// If a VM is available in the pool, it will be reused. Otherwise a new one will be created.
+func (p *vmPool) Get(opts Opts) *jsonnet.VM {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	lastImport := opts.ImportPaths[len(opts.ImportPaths)-1]
+	var vm *jsonnet.VM
+	hash := optsHash(opts)
+	if cached := p.available[hash]; len(cached) > 0 {
+		log.Trace().Str("path", lastImport).Msg("reusing Jsonnet VM")
+		vm = cached[0]
+		p.available[hash] = cached[1:]
+	} else {
+		log.Trace().Str("path", lastImport).Msg("creating new Jsonnet VM")
+		vm = jsonnet.MakeVM()
+		if opts.MaxStack > 0 {
+			vm.MaxStack = opts.MaxStack
+		}
+		for _, nf := range native.Funcs() {
+			vm.NativeFunction(nf)
+		}
+		vm.Importer(NewExtendedImporter(opts.ImportPaths))
 
-	for k, v := range opts.ExtCode {
-		vm.ExtCode(k, v)
+		for k, v := range opts.ExtCode {
+			vm.ExtCode(k, v)
+		}
 	}
+
 	for k, v := range opts.TLACode {
 		vm.TLACode(k, v)
 	}
 
-	for _, nf := range native.Funcs() {
-		vm.NativeFunction(nf)
-	}
-
-	if opts.MaxStack > 0 {
-		vm.MaxStack = opts.MaxStack
-	}
-
 	return vm
 }
 
+// Release returns a Jsonnet VM to the pool
+func (p *vmPool) Release(vm *jsonnet.VM, opts Opts) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	p.available[optsHash(opts)] = append(p.available[optsHash(opts)], vm)
+}
+
 // EvaluateFile evaluates the Jsonnet code in the given file and returns the
 // result in JSON form. It disregards opts.ImportPaths in favor of automatically
 // resolving these according to the specified file.
@@ -139,7 +179,8 @@ func evaluateSnippet(evalFunc evalFunc, path, data string, opts Opts) (string, e
 		return "", errors.Wrap(err, "resolving import paths")
 	}
 	opts.ImportPaths = jpath
-	vm := MakeVM(opts)
+	vm := VMPool.Get(opts)
+	defer VMPool.Release(vm, opts)
 
 	var hash string
 	if cache != nil {
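A side note on optsHash above: keying the pool on fmt.Sprintf("%s", ...) of a map only yields a stable key because fmt prints maps in sorted key order (since Go 1.12), so equal ExtCode contents always hash to the same pool key regardless of insertion order. A standalone check of that behavior (an illustrative example, not part of the commit):

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

func key(extCode map[string]string, importPaths []string) string {
	h := md5.New()
	// fmt prints maps in sorted key order, so identical contents
	// yield identical strings regardless of insertion order.
	h.Write([]byte(fmt.Sprintf("%s", extCode)))
	h.Write([]byte(fmt.Sprintf("%s", importPaths)))
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := map[string]string{"x": "1", "y": "2"}
	b := map[string]string{"y": "2", "x": "1"} // same contents, different order
	fmt.Println(key(a, nil) == key(b, nil))    // true
}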
2 changes: 1 addition & 1 deletion pkg/jsonnet/imports_test.go
@@ -53,7 +53,7 @@ func BenchmarkGetSnippetHash(b *testing.B) {
 	// Create a VM. It's important to reuse the same VM
 	// While there is a caching mechanism that normally shouldn't be shared in a benchmark iteration,
 	// it's useful to evaluate its impact here, because the caching will also improve the evaluation performance afterwards.
-	vm := MakeVM(Opts{ImportPaths: []string{tempDir}})
+	vm := VMPool.Get(Opts{ImportPaths: []string{tempDir}})
 	content, err := os.ReadFile(filepath.Join(tempDir, "main.jsonnet"))
 	require.NoError(b, err)
 
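The comment in the benchmark above is describing deliberate VM reuse across iterations; here is a hedged, self-contained sketch of that pattern in an ordinary go-jsonnet benchmark (hypothetical code, not from this repository):

package jsonnetbench

import (
	"testing"

	jsonnet "github.com/google/go-jsonnet"
)

// Build the VM once so its caches persist across iterations, then time only
// the evaluations themselves.
func BenchmarkReusedVM(b *testing.B) {
	vm := jsonnet.MakeVM()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := vm.EvaluateAnonymousSnippet("bench.jsonnet", `std.range(1, 100)`); err != nil {
			b.Fatal(err)
		}
	}
}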
6 changes: 3 additions & 3 deletions pkg/jsonnet/lint.go
@@ -56,18 +56,18 @@ func Lint(fds []string, opts *LintOpts) error {
 			log.Debug().Str("file", file).Msg("linting file")
 			startTime := time.Now()
-
-			vm := MakeVM(Opts{})
 			jpaths, _, _, err := jpath.Resolve(file, true)
 			if err != nil {
 				fmt.Fprintf(buf, "got an error getting JPATH for %s: %v\n\n", file, err)
 				resultCh <- result{failed: true, output: buf.String()}
 				continue
 			}
 
-			vm.Importer(NewExtendedImporter(jpaths))
+			opts := Opts{ImportPaths: jpaths}
+			vm := VMPool.Get(opts)
 
 			content, _ := os.ReadFile(file)
 			failed := linter.LintSnippet(vm, buf, []linter.Snippet{{FileName: file, Code: string(content)}})
+			VMPool.Release(vm, opts)
 			resultCh <- result{failed: failed, output: buf.String()}
 			log.Debug().Str("file", file).Dur("duration_ms", time.Since(startTime)).Msg("linted file")
 		}
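Note that the lint loop calls VMPool.Release explicitly rather than deferring it — presumably because defer runs when the surrounding function returns, not at the end of each loop iteration, so deferred releases would keep every VM out of the pool until Lint finishes. A tiny standalone illustration of that Go behavior:

package main

import "fmt"

func main() {
	func() {
		for i := 0; i < 3; i++ {
			defer fmt.Println("released", i) // deferred calls run only at function exit
		}
		fmt.Println("loop finished, nothing released yet")
	}()
}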
2 changes: 1 addition & 1 deletion pkg/process/data_test.go
@@ -18,7 +18,7 @@ type testData struct {
 func loadFixture(name string) testData {
	filename := filepath.Join("./testdata", name)
 
-	vm := jsonnet.MakeVM(jsonnet.Opts{
+	vm := jsonnet.VMPool.Get(jsonnet.Opts{
 		ImportPaths: []string{"./testdata"},
 	})
 
8 changes: 6 additions & 2 deletions pkg/tanka/parallel.go
@@ -2,6 +2,7 @@ package tanka
 
 import (
 	"fmt"
+	"math/rand"
 	"path/filepath"
 	"time"
 
@@ -34,8 +35,11 @@ func parallelLoadEnvironments(envs []*v1alpha1.Environment, opts parallelOpts) (
 		go parallelWorker(jobsCh, outCh)
 	}
 
-	for _, env := range envs {
+	// shuffle the environments to increase the likelihood of reusing VMs
+	randInts := rand.Perm(len(envs))
+	for i := range envs {
 		o := opts.Opts
+		env := envs[randInts[i]]
 
 		// TODO: This is required because the map[string]string in here is not
 		// concurrency-safe. Instead of putting this burden on the caller, find
@@ -89,7 +93,7 @@ type parallelOut struct {
 
 func parallelWorker(jobsCh <-chan parallelJob, outCh chan parallelOut) {
 	for job := range jobsCh {
-		log.Debug().Str("name", job.opts.Name).Str("path", job.path).Msg("Loading environment")
+		log.Trace().Str("name", job.opts.Name).Str("path", job.path).Msg("Loading environment")
 		startTime := time.Now()
 
 		env, err := LoadEnvironment(job.path, job.opts)
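The shuffle above uses rand.Perm, which returns a random permutation of the indices 0..n-1; visiting envs in that order presumably spreads environments with identical ext code and import paths over time, so one of them is more likely to find a VM the other has already released back into the pool. A standalone example of the iteration pattern (hypothetical data):

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	envs := []string{"dev", "dev-us", "staging", "prod", "prod-eu"}

	// Visit envs in a random order, as parallelLoadEnvironments now does.
	// Note: before Go 1.20 the global source is unseeded, so seed it if true randomness matters.
	randInts := rand.Perm(len(envs))
	for i := range envs {
		fmt.Println(envs[randInts[i]])
	}
}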
