diff --git a/cmd/collector/app/grpc_handler.go b/cmd/collector/app/grpc_handler.go index b3496bc2a18..dc989cb270d 100644 --- a/cmd/collector/app/grpc_handler.go +++ b/cmd/collector/app/grpc_handler.go @@ -44,8 +44,8 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } } _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{ - InboundTransport: "grpc", // TODO do we have a constant? - SpanFormat: JaegerFormatType, + InboundTransport: GRPCTransport, + SpanFormat: ProtoSpanFormat, }) if err != nil { g.logger.Error("cannot process spans", zap.Error(err)) diff --git a/cmd/collector/app/http_handler.go b/cmd/collector/app/http_handler.go index a12527d7b82..9c10dc3308a 100644 --- a/cmd/collector/app/http_handler.go +++ b/cmd/collector/app/http_handler.go @@ -85,7 +85,7 @@ func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) { return } batches := []*tJaeger.Batch{batch} - opts := SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant? + opts := SubmitBatchOptions{InboundTransport: HTTPTransport} if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil { http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError) return diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index dceb441dc11..6ceea8fdeab 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -23,8 +23,11 @@ import ( ) const ( - maxServiceNames = 2000 - otherServices = "other-services" + // TODO this needs to be configurable via CLI. + maxServiceNames = 4000 + + // otherServices is the catch-all label when number of services exceeds maxServiceNames + otherServices = "other-services" ) // SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor @@ -44,7 +47,7 @@ type SpanProcessorMetrics struct { SavedOkBySvc metricsBySvc // spans actually saved SavedErrBySvc metricsBySvc // spans failed to save serviceNames metrics.Gauge // total number of unique service name metrics reported by this collector - spanCounts map[string]CountsBySpanType + spanCounts SpanCountsByFormat } type countsBySvc struct { @@ -61,23 +64,58 @@ type metricsBySvc struct { traces countsBySvc // number of traces originated per service } -// CountsBySpanType measures received, rejected, and receivedByService metrics for a format type -type CountsBySpanType struct { - // ReceivedBySvc maintain by-service metrics for a format type +// InboundTransport identifies the transport used to receive spans. +type InboundTransport string + +const ( + // GRPCTransport indicates spans received over gRPC. + GRPCTransport InboundTransport = "grpc" + // TChannelTransport indicates spans received over TChannel. + TChannelTransport InboundTransport = "tchannel" + // HTTPTransport indicates spans received over HTTP. + HTTPTransport InboundTransport = "http" + // UnknownTransport is the fallback/catch-all category. + UnknownTransport InboundTransport = "unknown" +) + +// SpanFormat identifies the data format in which the span was originally received. +type SpanFormat string + +const ( + // JaegerSpanFormat is for Jaeger Thrift spans. + JaegerSpanFormat SpanFormat = "jaeger" + // ZipkinSpanFormat is for Zipkin Thrift spans. + ZipkinSpanFormat SpanFormat = "zipkin" + // ProtoSpanFormat is for Jaeger protobuf Spans. + ProtoSpanFormat SpanFormat = "proto" + // UnknownSpanFormat is the fallback/catch-all category. + UnknownSpanFormat SpanFormat = "unknown" +) + +// SpanCountsByFormat groups metrics by different span formats (thrift, proto, etc.) +type SpanCountsByFormat map[SpanFormat]SpanCountsByTransport + +// SpanCountsByTransport groups metrics by inbound transport (e.g http, grpc, tchannel) +type SpanCountsByTransport map[InboundTransport]SpanCounts + +// SpanCounts contains countrs for received and rejected spans. +type SpanCounts struct { + // ReceivedBySvc maintain by-service metrics. ReceivedBySvc metricsBySvc - // RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service + // RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service. RejectedBySvc metricsBySvc } // NewSpanProcessorMetrics returns a SpanProcessorMetrics -func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []string) *SpanProcessorMetrics { - spanCounts := map[string]CountsBySpanType{ - ZipkinFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": ZipkinFormatType}})), - JaegerFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": JaegerFormatType}})), - UnknownFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": UnknownFormatType}})), +func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []SpanFormat) *SpanProcessorMetrics { + spanCounts := SpanCountsByFormat{ + ZipkinSpanFormat: newCountsByTransport(serviceMetrics, ZipkinSpanFormat), + JaegerSpanFormat: newCountsByTransport(serviceMetrics, JaegerSpanFormat), + ProtoSpanFormat: newCountsByTransport(serviceMetrics, ProtoSpanFormat), + UnknownSpanFormat: newCountsByTransport(serviceMetrics, UnknownSpanFormat), } for _, otherFormatType := range otherFormatTypes { - spanCounts[otherFormatType] = newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": otherFormatType}})) + spanCounts[otherFormatType] = newCountsByTransport(serviceMetrics, otherFormatType) } m := &SpanProcessorMetrics{ SaveLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "save-latency", Tags: nil}), @@ -118,20 +156,35 @@ func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames in } } -func newCountsBySpanType(factory metrics.Factory) CountsBySpanType { - return CountsBySpanType{ +func newCountsByTransport(factory metrics.Factory, format SpanFormat) SpanCountsByTransport { + factory = factory.Namespace(metrics.NSOptions{Tags: map[string]string{"format": string(format)}}) + return SpanCountsByTransport{ + HTTPTransport: newCounts(factory, HTTPTransport), + TChannelTransport: newCounts(factory, TChannelTransport), + GRPCTransport: newCounts(factory, GRPCTransport), + UnknownTransport: newCounts(factory, UnknownTransport), + } +} + +func newCounts(factory metrics.Factory, transport InboundTransport) SpanCounts { + factory = factory.Namespace(metrics.NSOptions{Tags: map[string]string{"transport": string(transport)}}) + return SpanCounts{ RejectedBySvc: newMetricsBySvc(factory, "rejected"), ReceivedBySvc: newMetricsBySvc(factory, "received"), } } -// GetCountsForFormat gets the countsBySpanType for a given format. If none exists, we use the Unknown format. -func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat string) CountsBySpanType { +// GetCountsForFormat gets the SpanCounts for a given format and transport. If none exists, we use the Unknown format. +func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat SpanFormat, transport InboundTransport) SpanCounts { c, ok := m.spanCounts[spanFormat] if !ok { - return m.spanCounts[UnknownFormatType] + c = m.spanCounts[UnknownSpanFormat] + } + t, ok := c[transport] + if !ok { + t = c[UnknownTransport] } - return c + return t } // reportServiceNameForSpan determines the name of the service that emitted @@ -183,7 +236,8 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) { if isDebug { debugStr = "true" } - c := m.factory.Counter(metrics.Options{Name: m.category, Tags: map[string]string{"svc": serviceName, "debug": debugStr}}) + tags := map[string]string{"svc": serviceName, "debug": debugStr} + c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags}) counts[serviceName] = c counter = c } else { diff --git a/cmd/collector/app/metrics_test.go b/cmd/collector/app/metrics_test.go index 1b3ac468e5c..afcd3a7c15b 100644 --- a/cmd/collector/app/metrics_test.go +++ b/cmd/collector/app/metrics_test.go @@ -29,12 +29,15 @@ func TestProcessorMetrics(t *testing.T) { baseMetrics := metricstest.NewFactory(time.Hour) serviceMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "service", Tags: nil}) hostMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "host", Tags: nil}) - spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []string{"scruffy"}) - benderFormatMetrics := spm.GetCountsForFormat("bender") - assert.NotNil(t, benderFormatMetrics) - jFormat := spm.GetCountsForFormat(JaegerFormatType) - assert.NotNil(t, jFormat) - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ + spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []SpanFormat{SpanFormat("scruffy")}) + benderFormatHTTPMetrics := spm.GetCountsForFormat("bender", HTTPTransport) + assert.NotNil(t, benderFormatHTTPMetrics) + benderFormatGRPCMetrics := spm.GetCountsForFormat("bender", GRPCTransport) + assert.NotNil(t, benderFormatGRPCMetrics) + + jTChannelFormat := spm.GetCountsForFormat(JaegerSpanFormat, TChannelTransport) + assert.NotNil(t, jTChannelFormat) + jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ Process: &model.Process{}, }) mSpan := model.Span{ @@ -42,17 +45,17 @@ func TestProcessorMetrics(t *testing.T) { ServiceName: "fry", }, } - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) + jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) mSpan.Flags.SetDebug() - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) + jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) mSpan.ReplaceParentID(1234) - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) + jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) counters, gauges := baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry"]) - assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry"]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry"]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry"]) + assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=tchannel"]) + assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=tchannel"]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport=tchannel"]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport=tchannel"]) assert.Empty(t, gauges) } diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 6591ce55ccc..625b3c14bd2 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -41,7 +41,7 @@ type options struct { blockingSubmit bool queueSize int reportBusy bool - extraFormatTypes []string + extraFormatTypes []SpanFormat } // Option is a function that sets some option on StorageBuilder. @@ -128,7 +128,7 @@ func (options) ReportBusy(reportBusy bool) Option { } // ExtraFormatTypes creates an Option that initializes the extra list of format types -func (options) ExtraFormatTypes(extraFormatTypes []string) Option { +func (options) ExtraFormatTypes(extraFormatTypes []SpanFormat) Option { return func(b *options) { b.extraFormatTypes = extraFormatTypes } diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index 4a44de0f23c..b4c372c0ca2 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -25,7 +25,7 @@ import ( ) func TestAllOptionSet(t *testing.T) { - types := []string{"sneh"} + types := []SpanFormat{SpanFormat("sneh")} opts := Options.apply( Options.ReportBusy(true), Options.BlockingSubmit(true), diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index c0bfa7b404c..0ef20bf3040 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -17,7 +17,7 @@ package app import ( "time" - "github.com/uber/tchannel-go" + tchannel "github.com/uber/tchannel-go" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" @@ -28,8 +28,8 @@ import ( // ProcessSpansOptions additional options passed to processor along with the spans. type ProcessSpansOptions struct { - SpanFormat string - InboundTransport string + SpanFormat SpanFormat + InboundTransport InboundTransport } // SpanProcessor handles model spans @@ -126,7 +126,7 @@ func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpans sp.metrics.BatchSize.Update(int64(len(mSpans))) retMe := make([]bool, len(mSpans)) for i, mSpan := range mSpans { - ok := sp.enqueueSpan(mSpan, options.SpanFormat) + ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport) if !ok && sp.reportBusy { return nil, tchannel.ErrServerBusy } @@ -140,8 +140,8 @@ func (sp *spanProcessor) processItemFromQueue(item *queueItem) { sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime)) } -func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat string) bool { - spanCounts := sp.metrics.GetCountsForFormat(originalFormat) +func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat SpanFormat, transport InboundTransport) bool { + spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport) spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span) if !sp.filterSpan(span) { diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 4891bfbcf1f..6b143774108 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -38,13 +38,13 @@ func TestBySvcMetrics(t *testing.T) { allowedService := "bender" type TestCase struct { - format string + format SpanFormat serviceName string rootSpan bool debug bool } - spanFormat := [2]string{ZipkinFormatType, JaegerFormatType} + spanFormat := [2]SpanFormat{ZipkinSpanFormat, JaegerSpanFormat} serviceNames := [2]string{allowedService, blackListedService} rootSpanEnabled := [2]bool{true, false} debugEnabled := [2]bool{true, false} @@ -83,13 +83,13 @@ func TestBySvcMetrics(t *testing.T) { ) var metricPrefix, format string switch test.format { - case ZipkinFormatType: + case ZipkinSpanFormat: span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug) zHandler := NewZipkinSpanHandler(logger, processor, zipkinSanitizer.NewParentIDSanitizer()) zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, SubmitBatchOptions{}) metricPrefix = "service" format = "zipkin" - case JaegerFormatType: + case JaegerSpanFormat: span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug) jHandler := NewJaegerSpanHandler(logger, processor) jHandler.SubmitBatches([]*jaeger.Batch{ @@ -109,21 +109,21 @@ func TestBySvcMetrics(t *testing.T) { expected := []metricstest.ExpectedMetric{} if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } if test.rootSpan { if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } } @@ -137,7 +137,7 @@ func TestBySvcMetrics(t *testing.T) { }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } mb.AssertCounterMetrics(t, expected...) @@ -213,7 +213,7 @@ func TestSpanProcessor(t *testing.T) { ServiceName: "x", }, }, - }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) + }, ProcessSpansOptions{SpanFormat: JaegerSpanFormat}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) } @@ -236,8 +236,7 @@ func TestSpanProcessorErrors(t *testing.T) { ServiceName: "x", }, }, - }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) - + }, ProcessSpansOptions{SpanFormat: JaegerSpanFormat}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -295,7 +294,7 @@ func TestSpanProcessorBusy(t *testing.T) { ServiceName: "x", }, }, - }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) + }, ProcessSpansOptions{SpanFormat: JaegerSpanFormat}) assert.Error(t, err, "expcting busy error") assert.Nil(t, res) diff --git a/cmd/collector/app/tchannel_handler.go b/cmd/collector/app/tchannel_handler.go index 6a9ca42d2b5..80cdd798e77 100644 --- a/cmd/collector/app/tchannel_handler.go +++ b/cmd/collector/app/tchannel_handler.go @@ -43,7 +43,9 @@ func (h *TChannelHandler) SubmitZipkinBatch( _ thrift.Context, spans []*zipkincore.Span, ) ([]*zipkincore.Response, error) { - return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"}) + return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{ + InboundTransport: TChannelTransport, + }) } // SubmitBatches implements jaeger.TChanCollector. @@ -51,5 +53,7 @@ func (h *TChannelHandler) SubmitBatches( _ thrift.Context, batches []*jaeger.Batch, ) ([]*jaeger.BatchSubmitResponse, error) { - return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: "tchannel"}) + return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{ + InboundTransport: TChannelTransport, + }) } diff --git a/cmd/collector/app/thrift_span_handler.go b/cmd/collector/app/thrift_span_handler.go index 1d44359d9e3..d7d9173aa29 100644 --- a/cmd/collector/app/thrift_span_handler.go +++ b/cmd/collector/app/thrift_span_handler.go @@ -25,18 +25,9 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -const ( - // JaegerFormatType is for Jaeger Spans - JaegerFormatType = "jaeger" - // ZipkinFormatType is for zipkin Spans - ZipkinFormatType = "zipkin" - // UnknownFormatType is for spans that do not have a widely defined/well-known format type - UnknownFormatType = "unknown" -) - // SubmitBatchOptions are passed to Submit methods of the handlers. type SubmitBatchOptions struct { - InboundTransport string + InboundTransport InboundTransport } // ZipkinSpansHandler consumes and handles zipkin spans @@ -74,7 +65,7 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options } oks, err := jbh.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ InboundTransport: options.InboundTransport, - SpanFormat: JaegerFormatType, + SpanFormat: JaegerSpanFormat, }) if err != nil { jbh.logger.Error("Collector failed to process span batch", zap.Error(err)) @@ -121,7 +112,7 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options } bools, err := h.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ InboundTransport: options.InboundTransport, - SpanFormat: ZipkinFormatType, + SpanFormat: ZipkinSpanFormat, }) if err != nil { h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err)) diff --git a/cmd/collector/app/zipkin/http_handler.go b/cmd/collector/app/zipkin/http_handler.go index 8085c7ee06c..c94fe5ec97b 100644 --- a/cmd/collector/app/zipkin/http_handler.go +++ b/cmd/collector/app/zipkin/http_handler.go @@ -173,7 +173,7 @@ func gunzip(r io.ReadCloser) (*gzip.Reader, error) { func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error { if len(tSpans) > 0 { - opts := app.SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant? + opts := app.SubmitBatchOptions{InboundTransport: app.HTTPTransport} if _, err := aH.zipkinSpansHandler.SubmitZipkinBatch(tSpans, opts); err != nil { return err }