diff --git a/cmd/agent/app/reporter/flags.go b/cmd/agent/app/reporter/flags.go index f45029ca58d..3fdfe904024 100644 --- a/cmd/agent/app/reporter/flags.go +++ b/cmd/agent/app/reporter/flags.go @@ -17,10 +17,10 @@ package reporter import ( "flag" "fmt" - "os" - "strings" "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/cmd/flags" ) const ( @@ -52,45 +52,6 @@ func AddFlags(flags *flag.FlagSet) { // InitFromViper initializes Options with properties retrieved from Viper. func (b *Options) InitFromViper(v *viper.Viper) *Options { b.ReporterType = Type(v.GetString(reporterType)) - b.AgentTags = parseAgentTags(v.GetString(agentTags)) + b.AgentTags = flags.ParseJaegerTags(v.GetString(agentTags)) return b } - -// Parsing logic borrowed from jaegertracing/jaeger-client-go -func parseAgentTags(agentTags string) map[string]string { - if agentTags == "" { - return nil - } - tagPairs := strings.Split(string(agentTags), ",") - tags := make(map[string]string) - for _, p := range tagPairs { - kv := strings.SplitN(p, "=", 2) - k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1]) - - if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") { - skipWhenEmpty := false - - ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2) - if len(ed) == 1 { - // no default value specified, set to empty - skipWhenEmpty = true - ed = append(ed, "") - } - - e, d := ed[0], ed[1] - v = os.Getenv(e) - if v == "" && d != "" { - v = d - } - - // no value is set, skip this entry - if v == "" && skipWhenEmpty { - continue - } - } - - tags[k] = v - } - - return tags -} diff --git a/cmd/collector/app/builder/builder_flags.go b/cmd/collector/app/builder/builder_flags.go index 04c80a4d2bc..e2775be6c3e 100644 --- a/cmd/collector/app/builder/builder_flags.go +++ b/cmd/collector/app/builder/builder_flags.go @@ -23,6 +23,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/ports" + + "github.com/jaegertracing/jaeger/cmd/flags" ) const ( @@ -31,6 +33,7 @@ const ( collectorPort = "collector.port" collectorHTTPPort = "collector.http-port" collectorGRPCPort = "collector.grpc-port" + collectorTags = "jaeger.tags" collectorZipkinHTTPort = "collector.zipkin.http-port" collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins" collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers" @@ -56,6 +59,8 @@ type CollectorOptions struct { CollectorGRPCPort int // TLS configures secure transport TLS tlscfg.Options + // CollectorTags is the string representing collector tags to append to each and every span + CollectorTags map[string]string // CollectorZipkinHTTPPort is the port that the Zipkin collector service listens in on for http requests CollectorZipkinHTTPPort int // CollectorZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from @@ -71,6 +76,7 @@ func AddFlags(flags *flag.FlagSet) { flags.Int(collectorPort, ports.CollectorTChannel, "The TChannel port for the collector service") flags.Int(collectorHTTPPort, ports.CollectorHTTP, "The HTTP port for the collector service") flags.Int(collectorGRPCPort, ports.CollectorGRPC, "The gRPC port for the collector service") + flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}") flags.Int(collectorZipkinHTTPort, 0, "The HTTP port for the Zipkin collector service e.g. 9411") flags.String(collectorZipkinAllowedOrigins, "*", "Comma separated list of allowed origins for the Zipkin collector service, default accepts all") flags.String(collectorZipkinAllowedHeaders, "content-type", "Comma separated list of allowed headers for the Zipkin collector service, default content-type") @@ -84,6 +90,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions { cOpts.CollectorPort = v.GetInt(collectorPort) cOpts.CollectorHTTPPort = v.GetInt(collectorHTTPPort) cOpts.CollectorGRPCPort = v.GetInt(collectorGRPCPort) + cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags)) cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPort) cOpts.CollectorZipkinAllowedOrigins = v.GetString(collectorZipkinAllowedOrigins) cOpts.CollectorZipkinAllowedHeaders = v.GetString(collectorZipkinAllowedHeaders) diff --git a/cmd/collector/app/builder/span_handler_builder.go b/cmd/collector/app/builder/span_handler_builder.go index 88779939da6..fe7de2c9419 100644 --- a/cmd/collector/app/builder/span_handler_builder.go +++ b/cmd/collector/app/builder/span_handler_builder.go @@ -67,6 +67,7 @@ func (spanHb *SpanHandlerBuilder) BuildHandlers() ( app.Options.SpanFilter(defaultSpanFilter), app.Options.NumWorkers(spanHb.collectorOpts.NumWorkers), app.Options.QueueSize(spanHb.collectorOpts.QueueSize), + app.Options.CollectorTags(spanHb.collectorOpts.CollectorTags), ) return app.NewZipkinSpanHandler(spanHb.logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)), diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 6d94a6c52bc..a43ed32205f 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -43,6 +43,7 @@ type options struct { queueSize int reportBusy bool extraFormatTypes []SpanFormat + collectorTags map[string]string } // Option is a function that sets some option on StorageBuilder. @@ -135,6 +136,13 @@ func (options) ExtraFormatTypes(extraFormatTypes []SpanFormat) Option { } } +// CollectorTags creates an Option that initializes the extra tags to append to the spans flowing through this collector +func (options) CollectorTags(extraTags map[string]string) Option { + return func(b *options) { + b.collectorTags = extraTags + } +} + func (o options) apply(opts ...Option) options { ret := options{} for _, opt := range opts { diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 709d984e79a..2c335723540 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -50,6 +50,7 @@ type spanProcessor struct { spanWriter spanstore.Writer reportBusy bool numWorkers int + collectorTags map[string]string } type queueItem struct { @@ -95,6 +96,7 @@ func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcesso reportBusy: options.reportBusy, numWorkers: options.numWorkers, spanWriter: spanWriter, + collectorTags: options.collectorTags, } sp.processSpan = ChainedProcessSpan( options.preSave, @@ -159,6 +161,11 @@ func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat SpanFormat //add format tag span.Tags = append(span.Tags, model.String("internal.span.format", string(originalFormat))) + // append the collector tags + for k, v := range sp.collectorTags { + span.Tags = append(span.Tags, model.String(k, v)) + } + item := &queueItem{ queuedTime: time.Now(), span: span, diff --git a/cmd/flags/flags.go b/cmd/flags/flags.go index 78522f23579..f70c628ab99 100644 --- a/cmd/flags/flags.go +++ b/cmd/flags/flags.go @@ -18,6 +18,8 @@ package flags import ( "flag" "fmt" + "os" + "strings" "github.com/pkg/errors" "github.com/spf13/viper" @@ -50,6 +52,45 @@ func TryLoadConfigFile(v *viper.Viper) error { return nil } +// ParseJaegerTags parses the Jaeger tags string into a map. +func ParseJaegerTags(agentTags string) map[string]string { + if agentTags == "" { + return nil + } + tagPairs := strings.Split(string(agentTags), ",") + tags := make(map[string]string) + for _, p := range tagPairs { + kv := strings.SplitN(p, "=", 2) + k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1]) + + if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") { + skipWhenEmpty := false + + ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2) + if len(ed) == 1 { + // no default value specified, set to empty + skipWhenEmpty = true + ed = append(ed, "") + } + + e, d := ed[0], ed[1] + v = os.Getenv(e) + if v == "" && d != "" { + v = d + } + + // no value is set, skip this entry + if v == "" && skipWhenEmpty { + continue + } + } + + tags[k] = v + } + + return tags +} + // SharedFlags holds flags configuration type SharedFlags struct { // Logging holds logging configuration