Skip to content

Commit

Permalink
Merge pull request #102 from grafana/feat/configurable-grpc-srv
Browse files Browse the repository at this point in the history
grpcutil: Remove gRPC load balancer references
  • Loading branch information
aknuds1 committed Jan 4, 2022
2 parents 0b2e3ec + 1fb802d commit acbb881
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 42 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* [CHANGE] Remove package `math`. #104
* [CHANGE] time: Remove time package. #103
* [CHANGE] grpcutil: Convert Resolver into concrete type. #105
* [CHANGE] grpcutil.Resolver.Resolve: Take a service parameter. #102
* [CHANGE] grpcutil.Update: Remove gRPC LB related metadata. #102
* [ENHANCEMENT] Add middleware package. #38
* [ENHANCEMENT] Add the ring package #45
* [ENHANCEMENT] Add limiter package. #41
Expand Down
70 changes: 29 additions & 41 deletions grpcutil/dns_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ func parseTarget(target string) (host, port string, err error) {
return "", "", fmt.Errorf("invalid target address %v", target)
}

// Resolve creates a watcher that watches the name resolution of the target.
func (r *Resolver) Resolve(target string) (Watcher, error) {
// Resolve creates a watcher that watches the SRV/hostname record resolution of the target.
//
// If service is not empty, the watcher will first attempt to resolve an SRV record.
// If that fails, or service is empty, hostname record resolution is attempted instead.
// If target can be parsed as an IP address, the watcher will return it, and will not send any more updates afterwards.
func (r *Resolver) Resolve(target, service string) (Watcher, error) {
host, port, err := parseTarget(target)
if err != nil {
return nil, err
Expand All @@ -119,22 +123,24 @@ func (r *Resolver) Resolve(target string) (Watcher, error) {

ctx, cancel := context.WithCancel(context.Background())
return &dnsWatcher{
r: r,
logger: r.logger,
host: host,
port: port,
ctx: ctx,
cancel: cancel,
t: time.NewTimer(0),
r: r,
logger: r.logger,
host: host,
port: port,
service: service,
ctx: ctx,
cancel: cancel,
t: time.NewTimer(0),
}, nil
}

// dnsWatcher watches for the name resolution update for a specific target
type dnsWatcher struct {
r *Resolver
logger log.Logger
host string
port string
r *Resolver
logger log.Logger
host string
port string
service string
// The latest resolved address set
curAddrs map[string]*Update
ctx context.Context
Expand Down Expand Up @@ -164,26 +170,6 @@ func (i *ipWatcher) Close() {
close(i.updateChan)
}

// AddressType indicates the address type returned by name resolution.
type AddressType uint8

const (
// Backend indicates the server is a backend server.
Backend AddressType = iota
// GRPCLB indicates the server is a grpclb load balancer.
GRPCLB
)

// AddrMetadataGRPCLB contains the information the name resolver for grpclb should provide. The
// name resolver used by the grpclb balancer is required to provide this type of metadata in
// its address updates.
type AddrMetadataGRPCLB struct {
// AddrType is the type of server (grpc load balancer or backend).
AddrType AddressType
// ServerName is the name of the grpc load balancer. Used for authentication.
ServerName string
}

// compileUpdate compares the old resolved addresses and newly resolved addresses,
// and generates an update list
func (w *dnsWatcher) compileUpdate(newAddrs map[string]*Update) []*Update {
Expand All @@ -203,27 +189,30 @@ func (w *dnsWatcher) compileUpdate(newAddrs map[string]*Update) []*Update {
}

func (w *dnsWatcher) lookupSRV() map[string]*Update {
if w.service == "" {
return nil
}

newAddrs := make(map[string]*Update)
_, srvs, err := lookupSRV(w.ctx, "grpclb", "tcp", w.host)
_, srvs, err := lookupSRV(w.ctx, w.service, "tcp", w.host)
if err != nil {
level.Info(w.logger).Log("msg", "failed DNS SRV record lookup", "err", err)
return nil
}
for _, s := range srvs {
lbAddrs, err := lookupHost(w.ctx, s.Target)
addrs, err := lookupHost(w.ctx, s.Target)
if err != nil {
level.Warn(w.logger).Log("msg", "failed load balancer address DNS lookup", "err", err)
level.Warn(w.logger).Log("msg", "failed SRV target DNS lookup", "target", s.Target, "err", err)
continue
}
for _, a := range lbAddrs {
for _, a := range addrs {
a, ok := formatIP(a)
if !ok {
level.Error(w.logger).Log("failed IP parsing", "err", err)
continue
}
addr := a + ":" + strconv.Itoa(int(s.Port))
newAddrs[addr] = &Update{Addr: addr,
Metadata: AddrMetadataGRPCLB{AddrType: GRPCLB, ServerName: s.Target}}
newAddrs[addr] = &Update{Addr: addr}
}
}
return newAddrs
Expand Down Expand Up @@ -251,8 +240,7 @@ func (w *dnsWatcher) lookupHost() map[string]*Update {
func (w *dnsWatcher) lookup() []*Update {
newAddrs := w.lookupSRV()
if newAddrs == nil {
// If failed to get any balancer address (either no corresponding SRV for the
// target, or caused by failure during resolution/parsing of the balancer target),
// If we failed to get any valid addresses from SRV record lookup,
// return any A record info available.
newAddrs = w.lookupHost()
}
Expand Down
2 changes: 1 addition & 1 deletion grpcutil/naming.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Update struct {
Metadata interface{}
}

// Watcher watches for the updates on the specified target.
// Watcher watches for SRV updates on the specified target.
type Watcher interface {
// Next blocks until an update or error happens. It may return one or more
// updates. The first call should get the full set of the results. It should
Expand Down

0 comments on commit acbb881

Please sign in to comment.