Skip to content

Commit

Permalink
ETCD as a configuration provider #43 (#120)
Browse files Browse the repository at this point in the history
* doc(otetcd): fix comment error

* feat: add remote config provider, client use etcd

* fix: comment error

* test: improve test coverage

* test: improve test coverage

* test(remote): make sure the coroutine is run

* doc(otetcd): fix comment error

* feat: add remote config provider, client use etcd

* fix: comment error

* test: improve test coverage

* test: improve test coverage

* test(remote): make sure the coroutine is run

* docs: rename file to key

* fix: remove Put, use put in test file

* fix: change etcd config ip

Co-authored-by: 谷溪 <guxi99@gmail.com>
  • Loading branch information
GGXXLL and Reasno committed Apr 25, 2021
1 parent 46600b8 commit 7192cff
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 7 deletions.
15 changes: 15 additions & 0 deletions c.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"reflect"

"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/config/remote"
"github.com/DoNewsCode/core/config/watcher"
"github.com/DoNewsCode/core/container"
"github.com/DoNewsCode/core/contract"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/confmap"
"github.com/knadh/koanf/providers/file"
clientv3 "go.etcd.io/etcd/client/v3"
)

// C stands for the core of the application. It contains service definitions and
Expand Down Expand Up @@ -89,6 +91,19 @@ func WithYamlFile(path string) (CoreOption, CoreOption) {
WithConfigWatcher(watcher.File{Path: path})
}

// WithRemoteYamlFile is a two-in-one coreOption. It uses the remote key on etcd as the
// source of configuration, and watches the change of that key for hot reloading.
func WithRemoteYamlFile(key string, cfg clientv3.Config) (CoreOption, CoreOption) {
r := remote.Provider(key, &cfg)
return WithRemote(r)
}

// WithRemote is a two-in-one coreOption. It uses the remote key on etcd as the
// source of configuration, and watches the change of that key for hot reloading.
func WithRemote(r *remote.Remote) (CoreOption, CoreOption) {
return WithConfigStack(r, yaml.Parser()), WithConfigWatcher(r)
}

// WithInline is a CoreOption that creates a inline config in the configuration stack.
func WithInline(key, entry string) CoreOption {
return WithConfigStack(confmap.Provider(map[string]interface{}{
Expand Down
40 changes: 37 additions & 3 deletions c_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
"testing"
"time"

"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/config/remote"
"github.com/DoNewsCode/core/contract"
"github.com/DoNewsCode/core/di"
"github.com/DoNewsCode/core/events"

"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/otgorm"

"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/v3"
)

func TestC_Serve(t *testing.T) {
Expand Down Expand Up @@ -106,6 +108,10 @@ func TestC_Default(t *testing.T) {
c.AddModuleFunc(config.New)

f, _ := ioutil.TempFile("./", "*")
defer func() {
f.Close()
os.Remove(f.Name())
}()

rootCommand := &cobra.Command{}
c.ApplyRootCommand(rootCommand)
Expand All @@ -114,7 +120,21 @@ func TestC_Default(t *testing.T) {

output, _ := ioutil.ReadFile(f.Name())
assert.Contains(t, string(output), "gorm:")
os.Remove(f.Name())
}

func TestC_Remote(t *testing.T) {
cfg := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 2 * time.Second,
}
_ = remote.Provider("config.yaml", &cfg)
if err := put(cfg, "config.yaml", "name: remote"); err != nil {
t.Fatal(err)
}

c := New(WithRemoteYamlFile("config.yaml", cfg))
c.ProvideEssentials()
assert.Equal(t, "remote", c.String("name"))
}

type m1 struct {
Expand Down Expand Up @@ -142,3 +162,17 @@ func TestC_Provide(t *testing.T) {
func() m2 { return m2{} },
})
}

func put(cfg clientv3.Config, key, val string) error {
client, err := clientv3.New(cfg)
if err != nil {
return err
}
defer client.Close()

_, err = client.Put(context.Background(), key, val)
if err != nil {
return err
}
return nil
}
77 changes: 77 additions & 0 deletions config/remote/remote.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package remote

import (
"context"
"errors"
"fmt"

"go.etcd.io/etcd/client/v3"
)

// Remote is a core.ConfProvider and contract.ConfigWatcher implementation to read and watch remote config key.
// The remote client uses etcd.
type Remote struct {
key string
clientConfig *clientv3.Config
}

// Provider create a *Remote
func Provider(key string, clientConfig *clientv3.Config) *Remote {
return &Remote{
key: key,
clientConfig: clientConfig,
}
}

// ReadBytes reads the contents of a key from etcd and returns the bytes.
func (r *Remote) ReadBytes() ([]byte, error) {
client, err := clientv3.New(*r.clientConfig)
if err != nil {
return nil, err
}
defer client.Close()

resp, err := client.Get(context.Background(), r.key)
if err != nil {
return nil, err
}
if resp.Count == 0 {
return nil, fmt.Errorf("no such config key: %s", r.key)
}

return resp.Kvs[0].Value, nil
}

// Read is not supported by the remote provider.
func (r *Remote) Read() (map[string]interface{}, error) {
return nil, errors.New("remote provider does not support this method")
}

// Watch watches the change to the remote key from etcd. If the key is edited or created, the reload function
// will be called. note the reload function should not just load the changes made within this key, but rather
// it should reload the whole config stack. For example, if the flag or env takes precedence over the config
// key, they should remain to be so after the key changes.
func (r *Remote) Watch(ctx context.Context, reload func() error) error {
client, err := clientv3.New(*r.clientConfig)
if err != nil {
return err
}
defer client.Close()

rch := client.Watch(ctx, r.key)
for {
select {
case resp := <-rch:
if resp.Err() != nil {
return resp.Err()
}
// Trigger event.
if err := reload(); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}

128 changes: 128 additions & 0 deletions config/remote/remote_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package remote

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/v3"
)

func TestRemote(t *testing.T) {
cfg := &clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 2 * time.Second,
}

r := Provider("config.yaml", cfg)

var testVal = "name: app"
// PREPARE TEST DATA
if err := put(r, testVal); err != nil {
t.Fatal(err)
}

_, err := r.Read()
assert.Error(t, err)

bytes, err := r.ReadBytes()
assert.NoError(t, err)
assert.Equal(t, testVal, string(bytes))

var ch = make(chan string)
go r.Watch(context.Background(), func() error {
bytes, err := r.ReadBytes()
if err != nil {
ch <- ""
return err
}
ch <- string(bytes)
return nil
})

time.Sleep(1 * time.Second)

if err := put(r, testVal); err != nil {
t.Fatal(err)
}

newVal := <-ch
assert.Equal(t, testVal, newVal)
}

func TestError(t *testing.T) {
var (
r *Remote
err error
)

cfg := &clientv3.Config{
Endpoints: []string{},
}

r = Provider("config.yaml", cfg)
err = put(r, "test")
assert.Error(t, err)

_, err = r.ReadBytes()
assert.Error(t, err)

err = r.Watch(context.Background(), func() error {
return nil
})
assert.Error(t, err)

cfg = &clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 2 * time.Second,
}
r = Provider("config-test1", cfg)
_, err = r.ReadBytes()
assert.Error(t, err)

r = Provider("config-test2", cfg)

// Confirm that the two coroutines are finished
g := sync.WaitGroup{}
g.Add(2)
go func() {
err := r.Watch(context.Background(), func() error {
return fmt.Errorf("for test")
})
assert.Error(t, err)
g.Done()
}()

go func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := r.Watch(ctx, func() error {
return fmt.Errorf("for test")
})
assert.Error(t, err)
g.Done()
}()

time.Sleep(1 * time.Second)
if err := put(r, "name: test"); err != nil {
t.Fatal(err)
}
g.Wait()
}

func put(r *Remote, val string) error {
client, err := clientv3.New(*r.clientConfig)
if err != nil {
return err
}
defer client.Close()

_, err = client.Put(context.Background(), r.key, val)
if err != nil {
return err
}
return nil
}
3 changes: 1 addition & 2 deletions default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package core
import (
stdlog "log"

"github.com/DoNewsCode/core/di"

"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/contract"
"github.com/DoNewsCode/core/di"
"github.com/DoNewsCode/core/events"
"github.com/DoNewsCode/core/logging"
"github.com/go-kit/kit/log"
Expand Down
4 changes: 2 additions & 2 deletions otetcd/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ type Maker interface {
Make(name string) (*clientv3.Client, error)
}

// Factory is a *di.Factory that creates redis.UniversalClient using a
// Factory is a *di.Factory that creates *clientv3.Client using a
// specific configuration entry.
type Factory struct {
*di.Factory
}

// Make creates redis.UniversalClient using a specific configuration entry.
// Make creates *clientv3.Client using a specific configuration entry.
func (r Factory) Make(name string) (*clientv3.Client, error) {
client, err := r.Factory.Make(name)
if err != nil {
Expand Down

0 comments on commit 7192cff

Please sign in to comment.