Skip to content

Commit

Permalink
Collector tags. jaegertracing#1844
Browse files Browse the repository at this point in the history
  • Loading branch information
radekg committed Oct 9, 2019
1 parent 1616539 commit 3eeb346
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 42 deletions.
45 changes: 3 additions & 42 deletions cmd/agent/app/reporter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ package reporter
import (
"flag"
"fmt"
"os"
"strings"

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/flags"
)

const (
Expand Down Expand Up @@ -52,45 +52,6 @@ func AddFlags(flags *flag.FlagSet) {
// InitFromViper initializes Options with properties retrieved from Viper.
func (b *Options) InitFromViper(v *viper.Viper) *Options {
b.ReporterType = Type(v.GetString(reporterType))
b.AgentTags = parseAgentTags(v.GetString(agentTags))
b.AgentTags = flags.ParseJaegerTags(v.GetString(agentTags))
return b
}

// Parsing logic borrowed from jaegertracing/jaeger-client-go
func parseAgentTags(agentTags string) map[string]string {
if agentTags == "" {
return nil
}
tagPairs := strings.Split(string(agentTags), ",")
tags := make(map[string]string)
for _, p := range tagPairs {
kv := strings.SplitN(p, "=", 2)
k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])

if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") {
skipWhenEmpty := false

ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2)
if len(ed) == 1 {
// no default value specified, set to empty
skipWhenEmpty = true
ed = append(ed, "")
}

e, d := ed[0], ed[1]
v = os.Getenv(e)
if v == "" && d != "" {
v = d
}

// no value is set, skip this entry
if v == "" && skipWhenEmpty {
continue
}
}

tags[k] = v
}

return tags
}
7 changes: 7 additions & 0 deletions cmd/collector/app/builder/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/ports"

"github.com/jaegertracing/jaeger/cmd/flags"
)

const (
Expand All @@ -31,6 +33,7 @@ const (
collectorPort = "collector.port"
collectorHTTPPort = "collector.http-port"
collectorGRPCPort = "collector.grpc-port"
collectorTags = "jaeger.tags"
collectorZipkinHTTPort = "collector.zipkin.http-port"
collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins"
collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
Expand All @@ -56,6 +59,8 @@ type CollectorOptions struct {
CollectorGRPCPort int
// TLS configures secure transport
TLS tlscfg.Options
// CollectorTags is the string representing collector tags to append to each and every span
CollectorTags map[string]string
// CollectorZipkinHTTPPort is the port that the Zipkin collector service listens in on for http requests
CollectorZipkinHTTPPort int
// CollectorZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from
Expand All @@ -71,6 +76,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorPort, ports.CollectorTChannel, "The TChannel port for the collector service")
flags.Int(collectorHTTPPort, ports.CollectorHTTP, "The HTTP port for the collector service")
flags.Int(collectorGRPCPort, ports.CollectorGRPC, "The gRPC port for the collector service")
flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}")
flags.Int(collectorZipkinHTTPort, 0, "The HTTP port for the Zipkin collector service e.g. 9411")
flags.String(collectorZipkinAllowedOrigins, "*", "Comma separated list of allowed origins for the Zipkin collector service, default accepts all")
flags.String(collectorZipkinAllowedHeaders, "content-type", "Comma separated list of allowed headers for the Zipkin collector service, default content-type")
Expand All @@ -84,6 +90,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.CollectorPort = v.GetInt(collectorPort)
cOpts.CollectorHTTPPort = v.GetInt(collectorHTTPPort)
cOpts.CollectorGRPCPort = v.GetInt(collectorGRPCPort)
cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags))
cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPort)
cOpts.CollectorZipkinAllowedOrigins = v.GetString(collectorZipkinAllowedOrigins)
cOpts.CollectorZipkinAllowedHeaders = v.GetString(collectorZipkinAllowedHeaders)
Expand Down
1 change: 1 addition & 0 deletions cmd/collector/app/builder/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (spanHb *SpanHandlerBuilder) BuildHandlers() (
app.Options.SpanFilter(defaultSpanFilter),
app.Options.NumWorkers(spanHb.collectorOpts.NumWorkers),
app.Options.QueueSize(spanHb.collectorOpts.QueueSize),
app.Options.CollectorTags(spanHb.collectorOpts.CollectorTags),
)

return app.NewZipkinSpanHandler(spanHb.logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)),
Expand Down
8 changes: 8 additions & 0 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type options struct {
queueSize int
reportBusy bool
extraFormatTypes []SpanFormat
collectorTags map[string]string
}

// Option is a function that sets some option on StorageBuilder.
Expand Down Expand Up @@ -135,6 +136,13 @@ func (options) ExtraFormatTypes(extraFormatTypes []SpanFormat) Option {
}
}

// CollectorTags creates an Option that initializes the extra tags to append to the spans flowing through this collector
func (options) CollectorTags(extraTags map[string]string) Option {
return func(b *options) {
b.collectorTags = extraTags
}
}

func (o options) apply(opts ...Option) options {
ret := options{}
for _, opt := range opts {
Expand Down
7 changes: 7 additions & 0 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type spanProcessor struct {
spanWriter spanstore.Writer
reportBusy bool
numWorkers int
collectorTags map[string]string
}

type queueItem struct {
Expand Down Expand Up @@ -95,6 +96,7 @@ func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcesso
reportBusy: options.reportBusy,
numWorkers: options.numWorkers,
spanWriter: spanWriter,
collectorTags: options.collectorTags,
}
sp.processSpan = ChainedProcessSpan(
options.preSave,
Expand Down Expand Up @@ -159,6 +161,11 @@ func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat SpanFormat
//add format tag
span.Tags = append(span.Tags, model.String("internal.span.format", string(originalFormat)))

// append the collector tags
for k, v := range sp.collectorTags {
span.Tags = append(span.Tags, model.String(k, v))
}

item := &queueItem{
queuedTime: time.Now(),
span: span,
Expand Down
41 changes: 41 additions & 0 deletions cmd/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package flags
import (
"flag"
"fmt"
"os"
"strings"

"github.com/pkg/errors"
"github.com/spf13/viper"
Expand Down Expand Up @@ -50,6 +52,45 @@ func TryLoadConfigFile(v *viper.Viper) error {
return nil
}

// ParseJaegerTags parses the Jaeger tags string into a map.
func ParseJaegerTags(agentTags string) map[string]string {
if agentTags == "" {
return nil
}
tagPairs := strings.Split(string(agentTags), ",")
tags := make(map[string]string)
for _, p := range tagPairs {
kv := strings.SplitN(p, "=", 2)
k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])

if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") {
skipWhenEmpty := false

ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2)
if len(ed) == 1 {
// no default value specified, set to empty
skipWhenEmpty = true
ed = append(ed, "")
}

e, d := ed[0], ed[1]
v = os.Getenv(e)
if v == "" && d != "" {
v = d
}

// no value is set, skip this entry
if v == "" && skipWhenEmpty {
continue
}
}

tags[k] = v
}

return tags
}

// SharedFlags holds flags configuration
type SharedFlags struct {
// Logging holds logging configuration
Expand Down

0 comments on commit 3eeb346

Please sign in to comment.