Skip to content

Commit

Permalink
dns: Added miekgdns resolver as a hidden option to query and ruler.
Browse files Browse the repository at this point in the history
Fixes: #1015

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Apr 10, 2019
1 parent c0bfe35 commit 2ac8ea3
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ New tracing span:
:warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query.

- [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` for `Ruler`.
- [#1016](https://github.com/improbable-eng/thanos/pull/1016) Added option for another DNS resolver (miekg/dns client).
This to have SRV resolution working on [Golang 1.11+ with KubeDNS below v1.14](https://github.com/golang/go/issues/27546)

### Changed
- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated partial_response_disabled proto field. Added partial_response_strategy instead. Both in gRPC and Query API.
Expand Down
7 changes: 7 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,13 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
fileSDInterval := modelDuration(cmd.Flag("store.sd-interval", "Refresh interval to re-read file SD files. It is used as a resync fallback.").
Default("5m"))

// TODO(bwplotka): Grab this from TTL at some point.
dnsSDInterval := modelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions.").
Default("30s"))

dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)).
Default(string(dns.GolangResolverType)).String()

unhealthyStoreTimeout := modelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m"))

enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified.").
Expand Down Expand Up @@ -152,6 +156,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*enablePartialResponse,
fileSD,
time.Duration(*dnsSDInterval),
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
)
}
Expand Down Expand Up @@ -269,6 +274,7 @@ func runQuery(
enablePartialResponse bool,
fileSD *file.Discovery,
dnsSDInterval time.Duration,
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
Expand All @@ -287,6 +293,7 @@ func runQuery(
dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_store_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

var (
Expand Down
12 changes: 9 additions & 3 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
dnsSDInterval := modelDuration(cmd.Flag("query.sd-dns-interval", "Interval between DNS resolutions.").
Default("30s"))

dnsSDResolver := cmd.Flag("query.sd-dns-resolver", "Resolver to use. Possible options: [golang, miekgdns]").
Default("golang").Hidden().String()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
Expand Down Expand Up @@ -173,6 +176,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
*queries,
fileSD,
time.Duration(*dnsSDInterval),
*dnsSDResolver,
)
}
}
Expand Down Expand Up @@ -206,6 +210,7 @@ func runRule(
queryAddrs []string,
fileSD *file.Discovery,
dnsSDInterval time.Duration,
dnsSDResolver string,
) error {
configSuccess := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_rule_config_last_reload_successful",
Expand Down Expand Up @@ -272,11 +277,12 @@ func runRule(
dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

// Run rule evaluation and alert notifications.
var (
alertmgrs = newAlertmanagerSet(alertmgrURLs)
alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver))
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgrs = thanosrule.Managers{}
)
Expand Down Expand Up @@ -641,9 +647,9 @@ type alertmanagerSet struct {
current []*url.URL
}

func newAlertmanagerSet(addrs []string) *alertmanagerSet {
func newAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet {
return &alertmanagerSet{
resolver: dns.NewResolver(),
resolver: dns.NewResolver(dnsSDResolver.ToResolver(logger)),
addrs: addrs,
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/hashicorp/memberlist v0.1.0
github.com/julienschmidt/httprouter v1.1.0 // indirect
github.com/lovoo/gcloud-opentracing v0.3.0
github.com/miekg/dns v1.0.8 // indirect
github.com/miekg/dns v1.0.8
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mozillazg/go-cos v0.11.0
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c h1:Y5u
github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/go-autorest v10.8.1+incompatible h1:u0jVQf+a6k6x8A+sT60l6EY9XZu+kHdnZVPAYqpVRo0=
github.com/Azure/go-autorest v10.8.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0=
github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
Expand Down Expand Up @@ -272,6 +270,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opencensus.io v0.19.0 h1:+jrnNy8MR4GZXvwF9PEuSyHxA4NaTf6601oNRwCSXq0=
Expand Down
149 changes: 149 additions & 0 deletions pkg/discovery/dns/miekgdns/lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package miekgdns

import (
"bytes"
"net"

"github.com/miekg/dns"
"github.com/pkg/errors"
)

// Copied and slightly adjusted from Prometheus DNS SD:
// https://github.com/prometheus/prometheus/blob/be3c082539d85908ce03b6d280f83343e7c930eb/discovery/dns/dns.go#L212

// lookupWithSearchPath tries to get an answer for various permutations of
// the given name, appending the system-configured search path as necessary.
//
// There are three possible outcomes:
//
// 1. One of the permutations of the given name is recognized as
// "valid" by the DNS, in which case we consider ourselves "done"
// and that answer is returned. Note that, due to the way the DNS
// handles "name has resource records, but none of the specified type",
// the answer received may have an empty set of results.
//
// 2. All of the permutations of the given name are responded to by one of
// the servers in the "nameservers" list with the answer "that name does
// not exist" (NXDOMAIN). In that case, it can be considered
// pseudo-authoritative that there are no records for that name.
//
// 3. One or more of the names was responded to by all servers with some
// sort of error indication. In that case, we can't know if, in fact,
// there are records for the name or not, so whatever state the
// configuration is in, we should keep it that way until we know for
// sure (by, presumably, all the names getting answers in the future).
//
// Outcomes 1 and 2 are indicated by a valid response message (possibly an
// empty one) and no error. Outcome 3 is indicated by an error return. The
// error will be generic-looking, because trying to return all the errors
// returned by the combination of all name permutations and servers is a
// nightmare.
func (r *Resolver) lookupWithSearchPath(name string, qtype dns.Type) (*dns.Msg, error) {
conf, err := dns.ClientConfigFromFile(r.ResolvConf)
if err != nil {
return nil, errors.Wrapf(err, "could not load resolv.conf: %s", err)
}

var errs []error
for _, lname := range conf.NameList(name) {
response, err := lookupFromAnyServer(lname, qtype, conf)
if err != nil {
// We can't go home yet, because a later name
// may give us a valid, successful answer. However
// we can no longer say "this name definitely doesn't
// exist", because we did not get that answer for
// at least one name.
errs = append(errs, err)
continue
}

if response.Rcode == dns.RcodeSuccess {
// Outcome 1: GOLD!
return response, nil
}
}

if len(errs) == 0 {
// Outcome 2: everyone says NXDOMAIN.
return &dns.Msg{}, nil
}
// Outcome 3: boned.
return nil, errors.Errorf("could not resolve %q: all servers responded with errors to at least one search domain. Errs %s", name, fmtErrs(errs))
}

// lookupFromAnyServer uses all configured servers to try and resolve a specific
// name. If a viable answer is received from a server, then it is
// immediately returned, otherwise the other servers in the config are
// tried, and if none of them return a viable answer, an error is returned.
//
// A "viable answer" is one which indicates either:
//
// 1. "yes, I know that name, and here are its records of the requested type"
// (RCODE==SUCCESS, ANCOUNT > 0);
// 2. "yes, I know that name, but it has no records of the requested type"
// (RCODE==SUCCESS, ANCOUNT==0); or
// 3. "I know that name doesn't exist" (RCODE==NXDOMAIN).
//
// A non-viable answer is "anything else", which encompasses both various
// system-level problems (like network timeouts) and also
// valid-but-unexpected DNS responses (SERVFAIL, REFUSED, etc).
func lookupFromAnyServer(name string, qtype dns.Type, conf *dns.ClientConfig) (*dns.Msg, error) {
client := &dns.Client{}

var errs []error

// TODO(bwplotka): Worth to do fanout and grab fastest as golang native lib?
for _, server := range conf.Servers {
servAddr := net.JoinHostPort(server, conf.Port)
msg, err := askServerForName(name, qtype, client, servAddr, true)
if err != nil {
errs = append(errs, errors.Wrapf(err, "resolution against server %s for %s", server, name))
continue
}

if msg.Rcode == dns.RcodeSuccess || msg.Rcode == dns.RcodeNameError {
return msg, nil
}
}

return nil, errors.Errorf("could not resolve %s: no servers returned a viable answer. Errs %v", name, fmtErrs(errs))
}

func fmtErrs(errs []error) string {
b := bytes.Buffer{}
for _, err := range errs {
b.WriteString(";")
b.WriteString(err.Error())
}
return b.String()
}

// askServerForName makes a request to a specific DNS server for a specific
// name (and qtype). Retries with TCP in the event of response truncation,
// but otherwise just sends back whatever the server gave, whether that be a
// valid-looking response, or an error.
func askServerForName(name string, qType dns.Type, client *dns.Client, servAddr string, edns bool) (*dns.Msg, error) {
msg := &dns.Msg{}

msg.SetQuestion(dns.Fqdn(name), uint16(qType))
if edns {
msg.SetEdns0(dns.DefaultMsgSize, false)
}

response, _, err := client.Exchange(msg, servAddr)
if err != nil {
return nil, errors.Wrapf(err, "exchange")
}

if response.Truncated {
if client.Net == "tcp" {
return nil, errors.Errorf("got truncated message on TCP (64kiB limit exceeded?)")
}

// TCP fallback.
client.Net = "tcp"
return askServerForName(name, qType, client, servAddr, false)
}

return response, nil
}
71 changes: 71 additions & 0 deletions pkg/discovery/dns/miekgdns/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package miekgdns

import (
"context"
"net"

"github.com/miekg/dns"
"github.com/pkg/errors"
)

// DefaultResolvConfPath is a common, default resolv.conf file present on linux server.
const DefaultResolvConfPath = "/etc/resolv.conf"

// Resolver is a drop-in Resolver for *part* of std lib Golang net.DefaultResolver methods.
type Resolver struct {
ResolvConf string
}

func (r *Resolver) LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error) {
var target string
if service == "" && proto == "" {
target = name
} else {
target = "_" + service + "._" + proto + "." + name
}

response, err := r.lookupWithSearchPath(target, dns.Type(dns.TypeSRV))
if err != nil {
return "", nil, err
}

for _, record := range response.Answer {
switch addr := record.(type) {
case *dns.SRV:
addrs = append(addrs, &net.SRV{
Weight: addr.Weight,
Target: addr.Target,
Priority: addr.Priority,
Port: addr.Port,
})
default:
return "", nil, errors.Errorf("invalid SRV response record %s", record)
}
}

return "", addrs, nil
}

func (r *Resolver) LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error) {
response, err := r.lookupWithSearchPath(host, dns.Type(dns.TypeAAAA))
if err != nil || len(response.Answer) == 0 {
// Ugly fallback to A lookup.
response, err = r.lookupWithSearchPath(host, dns.Type(dns.TypeA))
if err != nil {
return nil, err
}
}

var resp []net.IPAddr
for _, record := range response.Answer {
switch addr := record.(type) {
case *dns.A:
resp = append(resp, net.IPAddr{IP: addr.A})
case *dns.AAAA:
resp = append(resp, net.IPAddr{IP: addr.AAAA})
default:
return nil, errors.Errorf("invalid A or AAAA response record %s", record)
}
}
return resp, nil
}
Loading

0 comments on commit 2ac8ea3

Please sign in to comment.