From b2cd80921b859af04ac7d88e957050039570a63d Mon Sep 17 00:00:00 2001
From: Julien Duchesne
Date: Fri, 4 Aug 2023 12:03:58 -0400
Subject: [PATCH] Reuse VMs when possible

Whenever ext code and imports are unchanged, VMs have a cache that can be
useful for future evaluations.
This PR adds a dumb pool that keeps the latest VM that was used for that
same eval path.
I also added a shuffling of envs, so that the likelihood of reusing a VM is
higher.
When envs are done evaluating, their VM is put back in the pool (with an
enriched cache from their eval).

An important thing to note is that VMs cannot be used concurrently, so we
need to remove them from the pool when they are used.
---
Review amendments folded into this patch: Get no longer panics on empty
ImportPaths, TLAs left over from a previous evaluation are reset when a VM
is reused, the pool key quotes its inputs, and the Opts value in Lint is
named vmOpts so that it does not shadow the *LintOpts parameter. The blob
hashes on the "index" lines predate these amendments.

 pkg/jsonnet/eval.go         | 91 +++++++++++++++++++++++++++++++------
 pkg/jsonnet/imports_test.go |  2 +-
 pkg/jsonnet/lint.go         |  6 +--
 pkg/process/data_test.go    |  2 +-
 pkg/tanka/parallel.go       |  8 +++-
 5 files changed, 87 insertions(+), 22 deletions(-)

diff --git a/pkg/jsonnet/eval.go b/pkg/jsonnet/eval.go
index cbf647801..671c6301d 100644
--- a/pkg/jsonnet/eval.go
+++ b/pkg/jsonnet/eval.go
@@ -1,8 +1,12 @@
 package jsonnet
 
 import (
+	"crypto/md5"
+	"encoding/hex"
+	"fmt"
 	"os"
 	"regexp"
+	"sync"
 	"time"
 
 	jsonnet "github.com/google/go-jsonnet"
@@ -75,32 +79,88 @@ func (o Opts) Clone() Opts {
 	}
 }
 
-// MakeVM returns a Jsonnet VM with some extensions of Tanka, including:
+// vmPool keeps idle Jsonnet VMs so that later evaluations can reuse them
+// together with the cache they have built up. VMs are grouped by the key
+// computed by optsHash.
+type vmPool struct {
+	mutex     sync.Mutex
+	available map[string][]*jsonnet.VM
+}
+
+// VMPool is the package-level pool of reusable Jsonnet VMs.
+var VMPool = vmPool{
+	available: map[string][]*jsonnet.VM{},
+}
+
+// optsHash returns the pool key for opts. Only ExtCode and ImportPaths are
+// part of the key. Both are written with %q so that element boundaries stay
+// unambiguous: ["a b"] and ["a", "b"] must not share a key.
+func optsHash(opts Opts) string {
+	hash := md5.New()
+	fmt.Fprintf(hash, "%q\n%q", opts.ExtCode, opts.ImportPaths)
+	return hex.EncodeToString(hash.Sum(nil))
+}
+
+// Get returns a Jsonnet VM with some extensions of Tanka, including:
 //   - extended importer
 //   - extCode and tlaCode applied
 //   - native functions registered
-func MakeVM(opts Opts) *jsonnet.VM {
-	vm := jsonnet.MakeVM()
-	vm.Importer(NewExtendedImporter(opts.ImportPaths))
+//
+// If a VM is available in the pool, it will be reused. Otherwise a new one will be created.
+// VMs cannot be used concurrently, so a VM stays out of the pool until it is
+// handed back with Release.
+func (p *vmPool) Get(opts Opts) *jsonnet.VM {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	// The last import path is only used for logging; ImportPaths may be empty.
+	var lastImport string
+	if n := len(opts.ImportPaths); n > 0 {
+		lastImport = opts.ImportPaths[n-1]
+	}
+
+	var vm *jsonnet.VM
+	hash := optsHash(opts)
+	if cached := p.available[hash]; len(cached) > 0 {
+		log.Trace().Str("path", lastImport).Msg("reusing Jsonnet VM")
+		vm = cached[0]
+		p.available[hash] = cached[1:]
+		// TLAs are not part of the pool key: drop the ones left over from the
+		// previous evaluation before the current ones are applied below.
+		vm.TLAReset()
+	} else {
+		log.Trace().Str("path", lastImport).Msg("creating new Jsonnet VM")
+		vm = jsonnet.MakeVM()
+		if opts.MaxStack > 0 {
+			vm.MaxStack = opts.MaxStack
+		}
+		for _, nf := range native.Funcs() {
+			vm.NativeFunction(nf)
+		}
+		vm.Importer(NewExtendedImporter(opts.ImportPaths))
 
-	for k, v := range opts.ExtCode {
-		vm.ExtCode(k, v)
+		for k, v := range opts.ExtCode {
+			vm.ExtCode(k, v)
+		}
 	}
+
 	for k, v := range opts.TLACode {
 		vm.TLACode(k, v)
 	}
 
-	for _, nf := range native.Funcs() {
-		vm.NativeFunction(nf)
-	}
-
-	if opts.MaxStack > 0 {
-		vm.MaxStack = opts.MaxStack
-	}
-
 	return vm
 }
 
+// Release returns a Jsonnet VM to the pool. opts must be the options the VM
+// was obtained with, as they determine the pool key.
+func (p *vmPool) Release(vm *jsonnet.VM, opts Opts) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	hash := optsHash(opts)
+	p.available[hash] = append(p.available[hash], vm)
+}
+
 // EvaluateFile evaluates the Jsonnet code in the given file and returns the
 // result in JSON form. It disregards opts.ImportPaths in favor of automatically
 // resolving these according to the specified file.
@@ -139,7 +199,8 @@ func evaluateSnippet(evalFunc evalFunc, path, data string, opts Opts) (string, e
 		return "", errors.Wrap(err, "resolving import paths")
 	}
 	opts.ImportPaths = jpath
-	vm := MakeVM(opts)
+	vm := VMPool.Get(opts)
+	defer VMPool.Release(vm, opts)
 
 	var hash string
 	if cache != nil {
diff --git a/pkg/jsonnet/imports_test.go b/pkg/jsonnet/imports_test.go
index 7c7787d32..0792a88ef 100644
--- a/pkg/jsonnet/imports_test.go
+++ b/pkg/jsonnet/imports_test.go
@@ -53,7 +53,7 @@ func BenchmarkGetSnippetHash(b *testing.B) {
 	// Create a VM. It's important to reuse the same VM
 	// While there is a caching mechanism that normally shouldn't be shared in a benchmark iteration,
 	// it's useful to evaluate its impact here, because the caching will also improve the evaluation performance afterwards.
-	vm := MakeVM(Opts{ImportPaths: []string{tempDir}})
+	vm := VMPool.Get(Opts{ImportPaths: []string{tempDir}})
 	content, err := os.ReadFile(filepath.Join(tempDir, "main.jsonnet"))
 	require.NoError(b, err)
 
diff --git a/pkg/jsonnet/lint.go b/pkg/jsonnet/lint.go
index 31a45da58..d6b81364c 100644
--- a/pkg/jsonnet/lint.go
+++ b/pkg/jsonnet/lint.go
@@ -56,18 +56,18 @@ func Lint(fds []string, opts *LintOpts) error {
 				log.Debug().Str("file", file).Msg("linting file")
 				startTime := time.Now()
 
-				vm := MakeVM(Opts{})
 				jpaths, _, _, err := jpath.Resolve(file, true)
 				if err != nil {
 					fmt.Fprintf(buf, "got an error getting JPATH for %s: %v\n\n", file, err)
 					resultCh <- result{failed: true, output: buf.String()}
 					continue
 				}
-
-				vm.Importer(NewExtendedImporter(jpaths))
+				vmOpts := Opts{ImportPaths: jpaths}
+				vm := VMPool.Get(vmOpts)
 
 				content, _ := os.ReadFile(file)
 				failed := linter.LintSnippet(vm, buf, []linter.Snippet{{FileName: file, Code: string(content)}})
+				VMPool.Release(vm, vmOpts)
 				resultCh <- result{failed: failed, output: buf.String()}
 				log.Debug().Str("file", file).Dur("duration_ms", time.Since(startTime)).Msg("linted file")
 			}
diff --git a/pkg/process/data_test.go b/pkg/process/data_test.go
index ea226570b..e186bc699 100644
--- a/pkg/process/data_test.go
+++ b/pkg/process/data_test.go
@@ -18,7 +18,7 @@ type testData struct {
 func loadFixture(name string) testData {
 	filename := filepath.Join("./testdata", name)
 
-	vm := jsonnet.MakeVM(jsonnet.Opts{
+	vm := jsonnet.VMPool.Get(jsonnet.Opts{
 		ImportPaths: []string{"./testdata"},
 	})
 
diff --git a/pkg/tanka/parallel.go b/pkg/tanka/parallel.go
index 9c8ef23ae..bdbc63683 100644
--- a/pkg/tanka/parallel.go
+++ b/pkg/tanka/parallel.go
@@ -2,6 +2,7 @@ package tanka
 
 import (
 	"fmt"
+	"math/rand"
 	"path/filepath"
 	"time"
 
@@ -34,8 +35,11 @@ func parallelLoadEnvironments(envs []*v1alpha1.Environment, opts parallelOpts) (
 		go parallelWorker(jobsCh, outCh)
 	}
 
-	for _, env := range envs {
+	// shuffle the environments to increase the likelihood of reusing VMs
+	randInts := rand.Perm(len(envs))
+	for i := range envs {
 		o := opts.Opts
+		env := envs[randInts[i]]
 
 		// TODO: This is required because the map[string]string in here is not
 		// concurrency-safe. Instead of putting this burden on the caller, find
@@ -89,7 +93,7 @@ type parallelOut struct {
 
 func parallelWorker(jobsCh <-chan parallelJob, outCh chan parallelOut) {
 	for job := range jobsCh {
-		log.Debug().Str("name", job.opts.Name).Str("path", job.path).Msg("Loading environment")
+		log.Trace().Str("name", job.opts.Name).Str("path", job.path).Msg("Loading environment")
 		startTime := time.Now()
 
 		env, err := LoadEnvironment(job.path, job.opts)