Skip to content

Commit

Permalink
Introduce grpc-1.30+ compatible client/v3/naming API.
Browse files Browse the repository at this point in the history
This is not yet implementation, just API and tests to be filled
with implementation in next CLs,
tracked by: etcd-io#12652

We propose here 3 packages:
 - clientv3/naming/endpoints ->
    That is abstraction layer over etcd that allows to write, read &
    watch Endpoints information. It's independent from GRPC API. It hides
    the storage details.

 - clientv3/naming/endpoints/internal ->
    That contains the grpc's compatible Update class to preserve the
    internal JSON mashalling format.

 - clientv3/naming/resolver ->
   That implements the GRPC resolver API, such that etcd can be
   used for connection.Dial in grpc.

Please see the grpc_naming.md document changes & grpcproxy/cluster.go
new integration, to see how the new abstractions work.

Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
ptabor authored and chaochn47 committed Oct 18, 2023
1 parent 3663ae1 commit 3298840
Show file tree
Hide file tree
Showing 11 changed files with 587 additions and 34 deletions.
48 changes: 34 additions & 14 deletions Documentation/dev-guide/grpc_naming.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,54 +6,56 @@ etcd provides a gRPC resolver to support an alternative name system that fetches

## Using etcd discovery with go-grpc

The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution:
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client:

```go
import (
"go.etcd.io/etcd/clientv3"
etcdnaming "go.etcd.io/etcd/clientv3/naming"
resolver "go.etcd.io/etcd/clientv3/naming/resolver"

"google.golang.org/grpc"
)

...

cli, cerr := clientv3.NewFromURL("http://localhost:2379")
r := &etcdnaming.GRPCResolver{Client: cli}
b := grpc.RoundRobin(r)
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b), grpc.WithBlock(), ...)
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
```

## Managing service endpoints

The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "foo/bar/my-service/")
with JSON-encoded (historically go-grpc `naming.Update`) values as potential service endpoints.
Endpoints are added to the service by creating new keys and removed from the service by deleting keys.

### Adding an endpoint

New endpoints can be added to the service through `etcdctl`:

```sh
ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
ETCDCTL_API=3 etcdctl put foo/bar/my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
```

The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`:
The etcd client's `endpoints.Manager` method can also register new endpoints with a key matching the `Addr`:

```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."})

em := endpoints.NewManager(client, "foo/bar/my-service")
err := em.AddEndpoint(context.TODO(),"foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
```

### Deleting an endpoint

Hosts can be deleted from the service through `etcdctl`:

```sh
ETCDCTL_API=3 etcdctl del my-service/1.2.3.4
ETCDCTL_API=3 etcdctl del foo/bar/my-service/1.2.3.4
```

The etcd client's `GRPCResolver.Update` method also supports deleting endpoints:
The etcd client's `endpoints.Manager` method also supports deleting endpoints:

```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
em := endpoints.NewManager(client, "foo/bar/my-service")
err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1");
```

### Registering an endpoint with a lease
Expand All @@ -65,3 +67,21 @@ lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '`
ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
ETCDCTL_API=3 etcdctl lease keep-alive $lease
```
In the golang:

```go
em := endpoints.NewManager(client, "foo/bar/my-service")
err := endpoints.AddEndpoint(context.TODO(), "foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
```

### Atomically updating endpoints

If it's desired to modify multiple endpoints in a single transaction, `endpoints.Manager` can be used directly:

```
em := endpoints.NewManager(c, "foo")
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
```
31 changes: 17 additions & 14 deletions clientv3/naming/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,47 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services.
// Package naming provides:
// - subpackage endpoints: an abstraction layer to store and read endpoints
// information from etcd.
// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC
// services based on the endpoints configuration
//
// To use, first import the packages:
//
// import (
// "go.etcd.io/etcd/clientv3"
// etcdnaming "go.etcd.io/etcd/clientv3/naming"
//
// "go.etcd.io/etcd/clientv3/naming/endpoints"
// "go.etcd.io/etcd/clientv3/naming/resolver"
// "google.golang.org/grpc"
// "google.golang.org/grpc/naming"
// )
//
// First, register new endpoint addresses for a service:
//
// func etcdAdd(c *clientv3.Client, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr})
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr})
// }
//
// Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer:
//
// func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) {
// r := &etcdnaming.GRPCResolver{Client: c}
// b := grpc.RoundRobin(r)
// return grpc.Dial(service, grpc.WithBalancer(b))
// etcdResolver, err := resolver.NewBuilder(c);
// if err { return nil, err }
// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
// }
//
// Optionally, force delete an endpoint:
//
// func etcdDelete(c *clientv3, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
// em := endpoints.NewManager(c, service)
// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr)
// }
//
// Or register an expiring endpoint with a lease:
//
// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid))
// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid))
// }
package naming
96 changes: 96 additions & 0 deletions clientv3/naming/endpoints/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoints

import (
"context"

"go.etcd.io/etcd/clientv3"
)

// Endpoint represents a single address the connection can be established with.
//
// Inspired by: https://pkg.go.dev/google.golang.org/grpc/resolver#Address.
// Please document etcd version since which version each field is supported.
type Endpoint struct {
// Addr is the server address on which a connection will be established.
// Since etcd 3.1
Addr string

// Metadata is the information associated with Addr, which may be used
// to make load balancing decision.
// Since etcd 3.1
Metadata interface{}
}

type Operation uint8

const (
// Add indicates an Endpoint is added.
Add Operation = iota
// Delete indicates an existing address is deleted.
Delete
)

// Update describes a single edit action of an Endpoint.
type Update struct {
// Op - action Add or Delete.
Op Operation
Key string
Endpoint Endpoint
}

// WatchChannel is used to deliver notifications about endpoints updates.
type WatchChannel chan []*Update

// Key2EndpointMap maps etcd key into struct describing the endpoint.
type Key2EndpointMap map[string]Endpoint

// UpdateWithOpts describes endpoint update (add or delete) together
// with etcd options (e.g. to attach an endpoint to a lease).
type UpdateWithOpts struct {
Update
Opts []clientv3.OpOption
}

// NewAddUpdateOpts constructs UpdateWithOpts for endpoint registration.
func NewAddUpdateOpts(key string, endpoint Endpoint, opts ...clientv3.OpOption) *UpdateWithOpts {
return &UpdateWithOpts{Update: Update{Op: Add, Key: key, Endpoint: endpoint}, Opts: opts}
}

// NewDeleteUpdateOpts constructs UpdateWithOpts for endpoint deletion.
func NewDeleteUpdateOpts(key string, opts ...clientv3.OpOption) *UpdateWithOpts {
return &UpdateWithOpts{Update: Update{Op: Delete, Key: key}, Opts: opts}
}

// Manager can be used to add/remove & inspect endpoints stored in etcd for
// a particular target.
type Manager interface {
// Update allows to atomically add/remove a few endpoints from etcd.
Update(ctx context.Context, updates []*UpdateWithOpts) error

// AddEndpoint registers a single endpoint in etcd.
// For more advanced use-cases use the Update method.
AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error
// DeleteEndpoint deletes a single endpoint stored in etcd.
// For more advanced use-cases use the Update method.
DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error

// List returns all the endpoints for the current target as a map.
List(ctx context.Context) (Key2EndpointMap, error)
// NewWatchChannel creates a channel that populates or endpoint updates.
// Cancel the 'ctx' to close the watcher.
NewWatchChannel(ctx context.Context) (WatchChannel, error)
}
135 changes: 135 additions & 0 deletions clientv3/naming/endpoints/endpoints_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoints

// TODO: The API is not yet implemented.

import (
"context"
"fmt"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/naming/endpoints/internal"
)

type endpointManager struct {
// TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652
}

func NewManager(client *clientv3.Client, target string) (Manager, error) {
// To be implemented (https://github.com/etcd-io/etcd/issues/12652)
return nil, fmt.Errorf("Not implemented yet")
}

func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error {
// TODO: For loop in a single transaction:
internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format.
switch internalUpdate.Op {
//case internal.Add:
// var v []byte
// if v, err = json.Marshal(internalUpdate); err != nil {
// return status.Error(codes.InvalidArgument, err.Error())
// }
// _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
//case internal.Delete:
// _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
//default:
// return status.Error(codes.InvalidArgument, "naming: bad naming op")
}
return fmt.Errorf("Not implemented yet")
}

func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)})
}

func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error {
return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)})
}

func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
return nil, fmt.Errorf("Not implemented yet")

// TODO: Implementation to be inspired by:
// Next gets the next set of updates from the etcd resolver.
//// Calls to Next should be serialized; concurrent calls are not safe since
//// there is no way to reconcile the update ordering.
//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
// if gw.wch == nil {
// // first Next() returns all addresses
// return gw.firstNext()
// }
// if gw.err != nil {
// return nil, gw.err
// }
//
// // process new events on target/*
// wr, ok := <-gw.wch
// if !ok {
// gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
// return nil, gw.err
// }
// if gw.err = wr.Err(); gw.err != nil {
// return nil, gw.err
// }
//
// updates := make([]*naming.Update, 0, len(wr.Events))
// for _, e := range wr.Events {
// var jupdate naming.Update
// var err error
// switch e.Type {
// case etcd.EventTypePut:
// err = json.Unmarshal(e.Kv.Value, &jupdate)
// jupdate.Op = naming.Add
// case etcd.EventTypeDelete:
// err = json.Unmarshal(e.PrevKv.Value, &jupdate)
// jupdate.Op = naming.Delete
// default:
// continue
// }
// if err == nil {
// updates = append(updates, &jupdate)
// }
// }
// return updates, nil
//}
//
//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
// // Use serialized request so resolution still works if the target etcd
// // server is partitioned away from the quorum.
// resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
// if gw.err = err; err != nil {
// return nil, err
// }
//
// updates := make([]*naming.Update, 0, len(resp.Kvs))
// for _, kv := range resp.Kvs {
// var jupdate naming.Update
// if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
// continue
// }
// updates = append(updates, &jupdate)
// }
//
// opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
// gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
// return updates, nil
//}
}

func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
// TODO: Implementation
return nil, fmt.Errorf("Not implemented yet")
}
Loading

0 comments on commit 3298840

Please sign in to comment.