Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtimevar: add URLOpeners for etcdvar, filevar, paramstore, and runtimeconfigurator #1463

Merged
merged 9 commits into from
Mar 7, 2019
89 changes: 89 additions & 0 deletions runtimevar/etcdvar/etcdvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@
// Package etcdvar provides a runtimevar implementation with variables
// backed by etcd. Use New to construct a *runtimevar.Variable.
//
// URLs
//
// For runtimevar.OpenVariable URLs, etcdvar registers for the scheme "etcd".
// The host+path is used as the blob key. etcdvar supports the following URL
// parameters:
// - client: The URL to be passed to etcd's
// go.etcd.io/etcd/clientv3.NewFromURL (required).
// NewFromURL will be called once per unique client URL.
// - decoder: The decoder to use. Defaults to runtimevar.BytesDecoder.
// See runtimevar.DecoderByName for supported values.
// Example URL: "blob://myvar?client=http://my.etcd.server:8080&decoder=string".
//
// As
//
// etcdvar exposes the following types for As:
Expand All @@ -26,6 +38,9 @@ import (
"context"
"errors"
"fmt"
"net/url"
"path"
"sync"
"time"

"go.etcd.io/etcd/clientv3"
Expand All @@ -36,6 +51,80 @@ import (
"google.golang.org/grpc/codes"
)

func init() {
runtimevar.DefaultURLMux().RegisterVariable(Scheme, &lazyClientOpener{})
}

// Scheme is the URL scheme etcdvar registers its URLOpener under on runtimevar.DefaultMux.
const Scheme = "etcd"

type lazyClientOpener struct {
mu sync.Mutex
clients map[string]*clientv3.Client
}

func (o *lazyClientOpener) OpenVariableURL(ctx context.Context, u *url.URL) (*runtimevar.Variable, error) {
clientURL := u.Query().Get("client")
if clientURL == "" {
return nil, fmt.Errorf("open variable %q: URL parameter \"client\" is required", u)
}
o.mu.Lock()
defer o.mu.Unlock()
if o.clients == nil {
o.clients = map[string]*clientv3.Client{}
}
cli := o.clients[clientURL]
if cli == nil {
var err error
cli, err = clientv3.NewFromURL(clientURL)
if err != nil {
return nil, fmt.Errorf("open variable %q: NewFromURL failed: %v", u, err)
}
o.clients[clientURL] = cli
}
opener := URLOpener{Client: cli}
return opener.OpenVariableURL(ctx, u)
}

// URLOpener opens Variable URLs like "etcd://mykey?decoder=string".
// It supports the URL parameters:
// - decoder: The decoder to use. Defaults to runtimevar.BytesDecoder.
// See runtimevar.DecoderByName for supported values.
type URLOpener struct {
// The Client to use; required.
Client *clientv3.Client

// Decoder and Options can be specified at URLOpener construction time,
// or provided/overridden via URL parameters.
Decoder *runtimevar.Decoder
Options Options
}

// OpenVariableURL opens the variable at the URL's path. See the package doc
// for more details.
func (o *URLOpener) OpenVariableURL(ctx context.Context, u *url.URL) (*runtimevar.Variable, error) {
if o.Client == nil {
return nil, fmt.Errorf("open variable %q: a client is required", u)
}
q := u.Query()
if decoderName := q.Get("decoder"); decoderName != "" || o.Decoder == nil {
vangent marked this conversation as resolved.
Show resolved Hide resolved
var err error
o.Decoder, err = runtimevar.DecoderByName(decoderName)
if err != nil {
return nil, fmt.Errorf("open variable %q: invalid \"decoder\": %v", u, err)
}
}
for param := range q {
switch param {
case "decoder", "client":
// processed elsewhere
default:
return nil, fmt.Errorf("open variable %q: invalid query parameter %q", u, param)
}
}
return New(o.Client, path.Join(u.Host, u.Path), o.Decoder, &o.Options)
}

// Options sets options.
type Options struct {
// Timeout controls the timeout on RPCs to etcd; timeouts will result in
Expand Down
56 changes: 56 additions & 0 deletions runtimevar/etcdvar/etcdvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
Expand Down Expand Up @@ -156,3 +157,58 @@ func TestNoConnectionError(t *testing.T) {
t.Error("got nil want error")
}
}

func TestOpenVariable(t *testing.T) {
h, err := newHarness(t)
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
if err := h.CreateVariable(ctx, "string-var", []byte("hello world")); err != nil {
t.Fatal(err)
}
if err := h.CreateVariable(ctx, "json-var", []byte(`{"Foo": "Bar"}`)); err != nil {
t.Fatal(err)
}

tests := []struct {
URL string
WantErr bool
WantWatchErr bool
Want interface{}
}{
// Variable construction succeeds, but nonexistentvar does not exist
// so we get an error from Watch.
{"etcd://nonexistentvar?client=http://localhost:2379", false, true, nil},
// Variable construction fails due to missing client arg.
{"etcd://string-var", true, false, nil},
// Variable construction fails due to invalid decoder arg.
{"etcd://string-var?client=http://localhost:2379&decoder=notadecoder", true, false, nil},
// Variable construction fails due to invalid arg.
{"etcd://string-var?client=http://localhost:2379&param=value", true, false, nil},
// Working example with string decoder.
{"etcd://string-var?client=http://localhost:2379&decoder=string", false, false, "hello world"},
// Working example with JSON decoder.
{"etcd://json-var?client=http://localhost:2379&decoder=jsonmap", false, false, &map[string]interface{}{"Foo": "Bar"}},
}

for _, test := range tests {
v, err := runtimevar.OpenVariable(ctx, test.URL)
if (err != nil) != test.WantErr {
t.Errorf("%s: got error %v, want error %v", test.URL, err, test.WantErr)
}
if err != nil {
continue
}
snapshot, err := v.Watch(ctx)
if (err != nil) != test.WantWatchErr {
t.Errorf("%s: got Watch error %v, want error %v", test.URL, err, test.WantWatchErr)
}
if err != nil {
continue
}
if !cmp.Equal(snapshot.Value, test.Want) {
t.Errorf("%s: got snapshot value\n%v\n want\n%v", test.URL, snapshot.Value, test.Want)
}
}
}
13 changes: 13 additions & 0 deletions runtimevar/etcdvar/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,16 @@ func Example() {
cfg := snapshot.Value.(MyConfig)
_ = cfg
}

func Example_openVariable() {
// OpenVariable creates a *runtimevar.Variable from a URL.
// This example watches a variable based on a file-based blob.Bucket with JSON.
ctx := context.Background()
v, err := runtimevar.OpenVariable(ctx, "etcd://myvarname?client=my.etcd.server:9999")
if err != nil {
log.Fatal(err)
}

snapshot, err := v.Watch(ctx)
_, _ = snapshot, err
}
12 changes: 12 additions & 0 deletions runtimevar/filevar/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,15 @@ func Example() {
// Output:
// foo.com running on port 80
}

func Example_openVariable() {
// OpenVariable creates a *runtimevar.Variable from a URL.
ctx := context.Background()
v, err := runtimevar.OpenVariable(ctx, "file:///path/to/config.json?decoder=json")
if err != nil {
log.Fatal(err)
}

snapshot, err := v.Watch(ctx)
_, _ = snapshot, err
}
71 changes: 71 additions & 0 deletions runtimevar/filevar/filevar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
// * On macOS, if an empty file is copied into a configuration file,
// filevar will not detect the change.
//
// URLs
//
// For runtimevar.OpenVariable URLs, filevar registers for the scheme
// "file". For details on the format of the URL, see URLOpener.
// Example URL: file:///path/to/config.json?decoder=json
//
// As
//
// filevar does not support any types for As.
Expand All @@ -35,9 +41,12 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/fsnotify/fsnotify"
Expand All @@ -46,6 +55,68 @@ import (
"gocloud.dev/runtimevar/driver"
)

func init() {
runtimevar.DefaultURLMux().RegisterVariable(Scheme, &URLOpener{})
}

// Scheme is the URL scheme filevar registers its URLOpener under on runtimevar.DefaultMux.
const Scheme = "file"

// URLOpener opens filevar URLs like "file:///path/to/config.json?decoder=json".
type URLOpener struct {
// Decoder and Options can be specified at URLOpener construction time,
// or provided/overridden via URL parameters.
Decoder *runtimevar.Decoder
Options Options
}

// OpenVariableURL opens the file variable at the URL's path.
// The URL's host+path is used as the path to the file to watch.
// If os.PathSeparator != "/", any leading "/" from the path is dropped
// and remaining '/' characters are converted to os.PathSeparator.
//
// In addition, the following URL parameters are supported:
// - decoder: The decoder to use. Defaults to URLOpener.Decoder, or
// runtimevar.BytesDecoder if URLOpener.Decoder is nil.
// See runtimevar.DecoderByName for supported values.
// - wait: The poll interval; supported values are from time.ParseDuration.
// Defaults to 30s.
func (o *URLOpener) OpenVariableURL(ctx context.Context, u *url.URL) (*runtimevar.Variable, error) {
q := u.Query()
if decoderName := q.Get("decoder"); decoderName != "" || o.Decoder == nil {
var err error
o.Decoder, err = runtimevar.DecoderByName(q.Get("decoder"))
if err != nil {
return nil, fmt.Errorf("open variable %q: invalid \"decoder\": %v", u, err)
}
q.Del("decoder")
}

if wait := q.Get("wait"); wait != "" {
var err error
o.Options.WaitDuration, err = time.ParseDuration(wait)
if err != nil {
return nil, fmt.Errorf("open variable %q: invalid \"wait\": %v", u, err)
}
q.Del("wait")
}
for param := range q {
return nil, fmt.Errorf("open bucket %q: invalid query parameter %q", u, param)
}
return New(mungeURLPath(u.Path, os.PathSeparator), o.Decoder, &o.Options)
}

func mungeURLPath(path string, pathSeparator byte) string {
if pathSeparator != '/' {
path = strings.TrimPrefix(path, "/")
// TODO: use filepath.FromSlash instead; and remove the pathSeparator arg
// from this function. Test Windows behavior by opening a bucket on Windows.
// See #1075 for why Windows is disabled.
return strings.Replace(path, "/", string(pathSeparator), -1)
}
return path
}

// Options sets options.
type Options struct {
// WaitDuration controls the frequency of retries after an error. For example,
Expand Down
61 changes: 61 additions & 0 deletions runtimevar/filevar/filevar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package filevar
import (
"context"
"errors"
"github.com/google/go-cmp/cmp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move the import line.

"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -170,3 +171,63 @@ func TestNew(t *testing.T) {
})
}
}

func TestOpenVariable(t *testing.T) {
dir, err := ioutil.TempDir("", "gcdk-filevar-example")
if err != nil {
t.Fatal(err)
}
jsonPath := filepath.Join(dir, "myvar.json")
if err := ioutil.WriteFile(jsonPath, []byte(`{"Foo": "Bar"}`), 0666); err != nil {
t.Fatal(err)
}
txtPath := filepath.Join(dir, "myvar.txt")
if err := ioutil.WriteFile(txtPath, []byte("hello world!"), 0666); err != nil {
t.Fatal(err)
}
nonexistentPath := filepath.Join(dir, "filenotfound")
defer os.RemoveAll(dir)

tests := []struct {
URL string
WantErr bool
WantWatchErr bool
Want interface{}
}{
// Variable construction succeeds, but the file does not exist.
{"file://" + nonexistentPath, false, true, nil},
// Variable construction fails due to invalid wait arg.
{"file://" + txtPath + "?decoder=string&wait=notaduration", true, false, nil},
// Variable construction fails due to invalid decoder arg.
{"file://" + txtPath + "?decoder=notadecoder", true, false, nil},
// Variable construction fails due to invalid arg.
{"file://" + txtPath + "?param=value", true, false, nil},
// Working example with default decoder.
{"file://" + txtPath, false, false, []byte("hello world!")},
// Working example with string decoder and wait.
{"file://" + txtPath + "?decoder=string&wait=5s", false, false, "hello world!"},
// Working example with JSON decoder.
{"file://" + jsonPath + "?decoder=jsonmap", false, false, &map[string]interface{}{"Foo": "Bar"}},
vangent marked this conversation as resolved.
Show resolved Hide resolved
}

ctx := context.Background()
for _, test := range tests {
v, err := runtimevar.OpenVariable(ctx, test.URL)
if (err != nil) != test.WantErr {
t.Errorf("%s: got error %v, want error %v", test.URL, err, test.WantErr)
}
if err != nil {
continue
}
snapshot, err := v.Watch(ctx)
if (err != nil) != test.WantWatchErr {
t.Errorf("%s: got Watch error %v, want error %v", test.URL, err, test.WantWatchErr)
}
if err != nil {
continue
}
if !cmp.Equal(snapshot.Value, test.Want) {
t.Errorf("%s: got snapshot value\n%v\n want\n%v", test.URL, snapshot.Value, test.Want)
}
}
}
12 changes: 12 additions & 0 deletions runtimevar/paramstore/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,15 @@ func Example() {
cfg := snapshot.Value.(MyConfig)
_ = cfg
}

func Example_openVariable() {
// OpenVariable creates a *runtimevar.Variable from a URL.
ctx := context.Background()
v, err := runtimevar.OpenVariable(ctx, "paramstore://myvar?region=us-west-1")
if err != nil {
log.Fatal(err)
}

snapshot, err := v.Watch(ctx)
_, _ = snapshot, err
}
Loading