Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receive: Reload tenant limit configuration on file change #5673

Merged
merged 37 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fb52a9c
Create a PathOrContent reloader
douglascamata Sep 5, 2022
72fbfdb
Add docs to staticPathContent.Rewrite
douglascamata Sep 5, 2022
f1828c8
Run goimports
douglascamata Sep 5, 2022
caaf733
Properly cancel the context in the test
douglascamata Sep 7, 2022
103aab0
Watch parent directory of file
douglascamata Sep 7, 2022
a4981f6
Remove useless ctx.Done()
douglascamata Sep 7, 2022
b95bd4f
Add a debounce timer to config reload
douglascamata Sep 7, 2022
168818e
Fix event.Op bitmask check
douglascamata Sep 7, 2022
b2d9022
Update lastReload
douglascamata Sep 7, 2022
54d6314
Fix debouncer for path content reloader
douglascamata Sep 7, 2022
c21e084
Improve documentation of the PathContentRealoder
douglascamata Sep 7, 2022
ce4663f
Dain reload timer before resetting
douglascamata Sep 7, 2022
e072f02
Run tests in parallel
douglascamata Sep 7, 2022
066ae0a
Simplify debouncing logic
douglascamata Sep 7, 2022
bc78cc9
Add more tests to file reloader
douglascamata Sep 7, 2022
2711539
Simplify condition for triggering reload
douglascamata Sep 7, 2022
bf4c80b
Use absolute path to config file
douglascamata Sep 7, 2022
14b87ef
Get rid of parallel test
douglascamata Sep 7, 2022
a2600a3
Put back 2s wait between fs operations
douglascamata Sep 7, 2022
9813d35
Remove useless sleep
douglascamata Sep 7, 2022
8d86e7d
Stop reloadTimer when context cancelled
douglascamata Sep 7, 2022
030848c
Remove unused fucntion
douglascamata Sep 8, 2022
871b396
Add missing copyright to test file
douglascamata Sep 8, 2022
e0bf6a2
Merge branch 'main' of github.com:thanos-io/thanos into add-path-cont…
douglascamata Sep 8, 2022
80a53b1
Auto-reload tenant limit config on file changes
douglascamata Sep 8, 2022
5522234
Merge branch 'main' of github.com:thanos-io/thanos into add-path-cont…
douglascamata Sep 27, 2022
caf3bf3
Wrap error when reloading config
douglascamata Oct 7, 2022
62af70f
Move limiter config reloader and update logs
douglascamata Oct 7, 2022
fba7571
Get rid of useless types and allocations
douglascamata Oct 7, 2022
b3aaa1a
Merge branch 'main' of github.com:thanos-io/thanos into add-path-cont…
douglascamata Oct 7, 2022
e983a70
Remove errorChan from config reload starter
douglascamata Oct 10, 2022
b7632d7
Retrigger CI
douglascamata Oct 11, 2022
67ea81b
Merge branch 'main' of github.com:thanos-io/thanos into add-path-cont…
douglascamata Oct 11, 2022
fd98829
Use UnRegisterer in the Limiter
douglascamata Oct 11, 2022
ca56664
Better guard against nil registerer in the limiter
douglascamata Oct 11, 2022
315c0ea
Remove wrong nil guard
douglascamata Oct 11, 2022
cb21419
Retrigger CI
douglascamata Oct 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5674](https://github.com/thanos-io/thanos/pull/5674) Query Frontend/Store: Add support connecting to redis using TLS.
- [#5734](https://github.com/thanos-io/thanos/pull/5734) Store: Support disable block viewer UI.
- [#5411](https://github.com/thanos-io/thanos/pull/5411) Tracing: Add OpenTelemetry Protocol exporter.
- [#5673](https://github.com/thanos-io/thanos/pull/5673) Receive: Reload tenant limit configuration on file change.

### Changed

Expand Down
46 changes: 33 additions & 13 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,19 +193,6 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
limitsContentYaml, err := conf.limitsConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of limit configuration")
}
limitsConfig, err = receive.ParseRootLimitConfig(limitsContentYaml)
if err != nil {
return errors.Wrap(err, "parse limit configuration")
}
}
limiter := receive.NewLimiter(limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))

dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
Expand All @@ -218,6 +205,23 @@ func runReceive(
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
limitsContentYaml, err := conf.limitsConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of limit configuration")
}
limitsConfig, err = receive.ParseRootLimitConfig(limitsContentYaml)
if err != nil {
return errors.Wrap(err, "parse limit configuration")
}
}
limiter, err := receive.NewLimiter(conf.limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))
if err != nil {
return errors.Wrap(err, "creating limiter")
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Expand Down Expand Up @@ -400,6 +404,22 @@ func runReceive(
})
}

{
if limiter.CanReload() {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Debug(logger).Log("msg", "limits config initialized with file watcher.")
if err := limiter.StartConfigReloader(ctx); err != nil {
return err
}
<-ctx.Done()
return nil
}, func(err error) {
cancel()
})
}
}

level.Info(logger).Log("msg", "starting receiver")
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Thanos Receive has some limits and gates that can be configured to control resou

To configure the gates and limits you can use one of the two options:

- `--receive.limits-config-file=<file-path>`: where `<file-path>` is the path to the YAML file.
- `--receive.limits-config-file=<file-path>`: where `<file-path>` is the path to the YAML file. Any modification to the indicated file triggers a configuration reload. If the updated configuration is invalid, an error is logged and the previous valid configuration stays in effect.
- `--receive.limits-config=<content>`: where `<content>` is the content of YAML file.

By default all the limits and gates are **disabled**.
Expand Down
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/dustin/go-humanize v1.0.0
github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d
github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fatih/structtag v1.2.0
github.com/felixge/fgprof v0.9.2
Expand Down Expand Up @@ -108,6 +108,7 @@ require (

require (
github.com/efficientgo/core v1.0.0-rc.0
github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd
github.com/minio/sha256-simd v1.0.0
)

Expand All @@ -127,10 +128,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.9.0
)

require (
github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd
go.opentelemetry.io/contrib/propagators/autoprop v0.34.0
)
require go.opentelemetry.io/contrib/propagators/autoprop v0.34.0

require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a h1:cnJajqeh/Hjv
github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a/go.mod h1:Hi+sz0REtlhVZ8zcdeTC3j6LUEEpJpPtNjOaOKuNcgI=
github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd h1:svR6KxSP1xiPw10RN4Pd7g6BAVkEcNN628PAqZH31mM=
github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M=
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d h1:WZV/mrUyKS9w9r+Jdw+zq/tdGAb5LwB+H37EkMLhEMA=
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd h1:VaYzzXeUbC5fVheskcKVNOyJMEYD+HgrJNzIAg/mRIM=
github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4=
github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM=
Expand Down
128 changes: 128 additions & 0 deletions pkg/extkingpin/path_content_reloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extkingpin

import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"time"

"github.com/fsnotify/fsnotify"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
)

// fileContent is the source of configuration consumed by PathContentReloader: some content plus the path of the
// file backing it.
type fileContent interface {
// Content returns the content as raw bytes, or an error if it cannot be obtained.
Content() ([]byte, error)
// Path returns the path of the backing file. PathContentReloader resolves it to an absolute path and watches
// that file for changes.
Path() string
}

// PathContentReloader starts a file watcher that monitors the file indicated by fileContent.Path() and runs
// reloadFunc whenever that file is written or created. Chmod, remove and rename events for the file, as well as
// events for any other file in the same directory, are ignored.
//
// debounceTime coalesces bursts of events (for example many "write" events in a row, or a "create" followed by a
// "write"): reloadFunc runs once debounceTime has elapsed without a new relevant event. When debounceTime is zero
// or negative no debouncing is done and reloadFunc runs synchronously for every relevant event.
//
// To ensure renames and deletes are properly handled, the file watcher is put on the file's parent folder. See
// https://github.com/fsnotify/fsnotify/issues/214 for more details.
//
// The watcher runs in a background goroutine until ctx is cancelled. An error is returned only when the watcher
// cannot be set up; in that case no goroutine is left running. If fileContent.Path() is empty there is nothing to
// watch: this is logged at debug level and nil is returned.
func PathContentReloader(ctx context.Context, fileContent fileContent, logger log.Logger, reloadFunc func(), debounceTime time.Duration) error {
	configPath := fileContent.Path()
	if configPath == "" {
		// This has to be checked before filepath.Abs, which resolves "" to the current working directory.
		level.Debug(logger).Log("msg", "no path detected for config reload")
		return nil
	}
	filePath, err := filepath.Abs(configPath)
	if err != nil {
		return errors.Wrap(err, "getting absolute file path")
	}

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return errors.Wrap(err, "creating file watcher")
	}
	// We watch the file's parent folder and not the file itself to better handle DELETE and RENAME events. Check
	// https://github.com/fsnotify/fsnotify/issues/214 for more details.
	// The watch is registered before the goroutine starts so that a failure here leaks neither the watcher nor
	// a goroutine.
	// NOTE(review): path.Dir assumes "/" separators; filepath.Dir would be the OS-aware choice.
	if err := watcher.Add(path.Dir(filePath)); err != nil {
		// Best-effort cleanup; the Add error is the one worth reporting.
		_ = watcher.Close()
		return errors.Wrapf(err, "adding path %s to file watcher", filePath)
	}

	go func() {
		defer watcher.Close()

		// The debounce timer is created lazily on the first relevant event. time.AfterFunc arms the timer
		// immediately, so creating it up front would trigger a reload even though nothing changed.
		var reloadTimer *time.Timer
		defer func() {
			if reloadTimer != nil {
				reloadTimer.Stop()
			}
		}()

		for {
			select {
			case <-ctx.Done():
				return
			case event, ok := <-watcher.Events:
				if !ok {
					// The watcher was closed; nothing more will ever arrive.
					return
				}
				// fsnotify sometimes sends a bunch of events without name or operation.
				// It's unclear what they are and why they are sent - filter them out.
				if event.Name == "" {
					continue
				}
				// We are watching the file's parent folder, but are only interested in changes to the
				// target file. Discard every other file as quickly as possible.
				if event.Name != filePath {
					continue
				}
				// We only react to files being written or created.
				// On chmod or remove we have nothing to do.
				// On rename we have the old file name (not useful). A create event for the new file will come later.
				if event.Op&(fsnotify.Write|fsnotify.Create) == 0 {
					continue
				}
				level.Debug(logger).Log("msg", fmt.Sprintf("change detected for %s", filePath), "eventName", event.Name, "eventOp", event.Op)

				switch {
				case debounceTime <= 0:
					// Debouncing disabled: reload right away.
					reloadFunc()
					level.Debug(logger).Log("msg", "configuration reloaded")
				case reloadTimer == nil:
					reloadTimer = time.AfterFunc(debounceTime, func() {
						reloadFunc()
						level.Debug(logger).Log("msg", "configuration reloaded after debouncing")
					})
				default:
					// Push the pending (or re-arm the already fired) reload debounceTime into the future.
					reloadTimer.Reset(debounceTime)
				}
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				level.Error(logger).Log("msg", "watcher error", "error", err)
			}
		}
	}()
	return nil
}

// staticPathContent is a fileContent implementation that keeps the content in memory together with the path of
// the file backing it.
type staticPathContent struct {
// content is the cached content handed out by Content.
content []byte
// path is the location of the backing file handed out by Path.
path string
}

// Compile-time check that *staticPathContent satisfies fileContent.
var _ fileContent = (*staticPathContent)(nil)

// Content returns the cached content. It does not read the backing file and always returns a nil error.
func (t *staticPathContent) Content() ([]byte, error) {
return t.content, nil
}

// Path returns the path to the file that contains the content.
func (t *staticPathContent) Path() string {
return t.path
}

// NewStaticPathContent reads the file at fromPath once and returns a staticPathContent that caches the bytes read
// and keeps fromPath as the path of its backing file. It returns an error if the file cannot be read.
func NewStaticPathContent(fromPath string) (*staticPathContent, error) {
	data, readErr := os.ReadFile(fromPath)
	if readErr != nil {
		return nil, errors.Wrapf(readErr, "could not load test content: %s", fromPath)
	}

	loaded := &staticPathContent{
		content: data,
		path:    fromPath,
	}
	return loaded, nil
}

// Rewrite replaces the cached content with newContent and then writes newContent to the backing file. Writing the
// file is what lets a file system monitor notice the change. The error from the write, if any, is returned.
func (t *staticPathContent) Rewrite(newContent []byte) error {
	// The cache is swapped before the file is touched, so it already holds newContent by the time the write
	// happens. Note that the cache keeps newContent even if the write fails.
	t.content = newContent

	// Write the file to ensure possible file watcher reloaders get triggered.
	if writeErr := os.WriteFile(t.path, newContent, 0o666); writeErr != nil {
		return writeErr
	}
	return nil
}
105 changes: 105 additions & 0 deletions pkg/extkingpin/path_content_reloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extkingpin

import (
"context"
"os"
"path"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestPathContentReloader runs a sequence of file system operations against a watched file and checks how many
// times the reload callback is invoked. Each case waits for exactly wantReloads callbacks (with a timeout, so a
// missing reload fails the test instead of hanging it) and then compares the observed count.
func TestPathContentReloader(t *testing.T) {
	type args struct {
		runSteps func(t *testing.T, testFile string, pathContent *staticPathContent)
	}
	tests := []struct {
		name        string
		args        args
		wantReloads int
	}{
		{
			name: "Many operations, only rewrite triggers one reload",
			args: args{
				runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
					testutil.Ok(t, os.Chmod(testFile, 0777))
					testutil.Ok(t, os.Remove(testFile))
					testutil.Ok(t, pathContent.Rewrite([]byte("test modified")))
				},
			},
			wantReloads: 1,
		},
		{
			name: "Many operations, only rename triggers one reload",
			args: args{
				runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
					testutil.Ok(t, os.Chmod(testFile, 0777))
					testutil.Ok(t, os.Rename(testFile, testFile+".tmp"))
					testutil.Ok(t, os.Rename(testFile+".tmp", testFile))
				},
			},
			wantReloads: 1,
		},
		{
			name: "Many operations, two rewrites trigger two reloads",
			args: args{
				runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
					testutil.Ok(t, os.Chmod(testFile, 0777))
					testutil.Ok(t, os.Remove(testFile))
					testutil.Ok(t, pathContent.Rewrite([]byte("test modified")))
					// Wait well past the debounce time so the second rewrite is a separate reload.
					time.Sleep(2 * time.Second)
					testutil.Ok(t, pathContent.Rewrite([]byte("test modified again")))
				},
			},
			// The rewrites are 2s apart while the debounce time is 100ms, so each one must reload.
			wantReloads: 2,
		},
		{
			name: "Chmod doesn't trigger reload",
			args: args{
				runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
					testutil.Ok(t, os.Chmod(testFile, 0777))
				},
			},
			wantReloads: 0,
		},
		{
			name: "Remove doesn't trigger reload",
			args: args{
				runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
					testutil.Ok(t, os.Remove(testFile))
				},
			},
			wantReloads: 0,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			testFile := path.Join(t.TempDir(), "test")
			testutil.Ok(t, os.WriteFile(testFile, []byte("test"), 0666))
			pathContent, err := NewStaticPathContent(testFile)
			testutil.Ok(t, err)

			wg := &sync.WaitGroup{}
			wg.Add(tt.wantReloads)

			// reloadCount is written by the reload callback, which runs on another goroutine, so it is
			// guarded by a mutex.
			var (
				mu          sync.Mutex
				reloadCount int
			)

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			err = PathContentReloader(ctx, pathContent, log.NewLogfmtLogger(os.Stdout), func() {
				mu.Lock()
				reloadCount++
				mu.Unlock()
				wg.Done()
			}, 100*time.Millisecond)
			testutil.Ok(t, err)

			tt.args.runSteps(t, testFile, pathContent)

			// Wait for the expected reloads, but never forever.
			reloaded := make(chan struct{})
			go func() {
				wg.Wait()
				close(reloaded)
			}()
			select {
			case <-reloaded:
			case <-time.After(5 * time.Second):
				t.Fatalf("timed out waiting for %d reload(s)", tt.wantReloads)
			}

			mu.Lock()
			gotReloads := reloadCount
			mu.Unlock()
			testutil.Equals(t, tt.wantReloads, gotReloads)
		})
	}
}
Loading