Add inbound transport as label to collector metrics #1446

Merged: 14 commits, Apr 6, 2019
7 changes: 6 additions & 1 deletion cmd/collector/app/grpc_handler.go
@@ -22,6 +22,11 @@ import (
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

const (
// GRPCEndpoint is for gRPC endpoint
GRPCEndpoint = "GRPC"
)

// GRPCHandler implements gRPC CollectorService.
type GRPCHandler struct {
logger *zap.Logger
@@ -44,7 +49,7 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{
InboundTransport: "grpc", // TODO do we have a constant?
InboundTransport: GRPCEndpoint,
SpanFormat: JaegerFormatType,
})
if err != nil {
4 changes: 3 additions & 1 deletion cmd/collector/app/http_handler.go
@@ -29,6 +29,8 @@ import (
const (
// UnableToReadBodyErrFormat is an error message for invalid requests
UnableToReadBodyErrFormat = "Unable to process request body: %v"
// HTTPEndpoint is for HTTP endpoint
HTTPEndpoint = "HTTP"
)

var (
@@ -85,7 +87,7 @@ func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) {
return
}
batches := []*tJaeger.Batch{batch}
opts := SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant?
opts := SubmitBatchOptions{InboundTransport: HTTPEndpoint}
if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
return
61 changes: 43 additions & 18 deletions cmd/collector/app/metrics.go
@@ -23,8 +23,13 @@ import (
)

const (
maxServiceNames = 2000
otherServices = "other-services"
maxServiceNames = 2000
defaultTransportType = "Undefined"
otherServices = "other-services"
otherServicesViaHTTP = "other-services-via-http"
otherServicesViaTChannel = "other-services-via-tchannel"
otherServicesViaGRPC = "other-services-via-grpc"
otherServicesViaDefault = "other-services-via-undefined"
)

// SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor
@@ -104,20 +109,27 @@ func newMetricsBySvc(factory metrics.Factory, category string) metricsBySvc {
}

func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames int) countsBySvc {
// Add 3 to maxServiceNames threshold to compensate for extra slots taken by transport types
maxServiceNames = maxServiceNames + 3
@guanw (Contributor Author) commented on Apr 4, 2019:

Not sure if this is a good idea. I'm combining otherServices with the transport types, which makes the user-defined counter space smaller, so I'm compensating by 3 here to make the test pass. The difference is that when users have numOfServices > maxServiceNames, they might notice in a metrics dashboard such as Grafana that the real total number of serviceNames shows as maxServiceNames + 3.

(A simplified model of this slot accounting appears after the metrics.go diff below.)

return countsBySvc{
counts: map[string]metrics.Counter{
otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "false"}}),
},
debugCounts: map[string]metrics.Counter{
otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "true"}}),
},
counts: newCountsByTransport(factory, category, "false"),
debugCounts: newCountsByTransport(factory, category, "true"),
factory: factory,
lock: &sync.Mutex{},
maxServiceNames: maxServiceNames,
category: category,
}
}

func newCountsByTransport(factory metrics.Factory, category string, debugFlag string) map[string]metrics.Counter {
return map[string]metrics.Counter{
otherServicesViaHTTP: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": HTTPEndpoint}}),
otherServicesViaTChannel: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": TChannelEndpoint}}),
otherServicesViaGRPC: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": GRPCEndpoint}}),
otherServicesViaDefault: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": defaultTransportType}}),
}
}

func newCountsBySpanType(factory metrics.Factory) CountsBySpanType {
return CountsBySpanType{
RejectedBySvc: newMetricsBySvc(factory, "rejected"),
@@ -136,26 +148,26 @@ func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat string) CountsBySpa

// reportServiceNameForSpan determines the name of the service that emitted
// the span and reports a counter stat.
func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) {
func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span, endpoint string) {
serviceName := span.Process.ServiceName
if serviceName == "" {
return
}
m.countSpansByServiceName(serviceName, span.Flags.IsDebug())
m.countSpansByServiceName(serviceName, span.Flags.IsDebug(), endpoint)
if span.ParentSpanID() == 0 {
m.countTracesByServiceName(serviceName, span.Flags.IsDebug())
m.countTracesByServiceName(serviceName, span.Flags.IsDebug(), endpoint)
}
}

// countSpansByServiceName counts how many spans are received per service.
func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool) {
m.spans.countByServiceName(serviceName, isDebug)
func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool, endpoint string) {
m.spans.countByServiceName(serviceName, isDebug, endpoint)
}

// countTracesByServiceName counts how many traces are received per service,
// i.e. the counter is only incremented for the root spans.
func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool) {
m.traces.countByServiceName(serviceName, isDebug)
func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool, endpoint string) {
m.traces.countByServiceName(serviceName, isDebug, endpoint)
}

// countByServiceName maintains a map of counters for each service name it's
@@ -168,7 +180,7 @@ func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool)
// total number of stored counters, so if it exceeds say the 90% threshold
// an alert should be raised to investigate what's causing so many unique
// service names.
func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) {
func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool, endpointType string) {
serviceName = NormalizeServiceName(serviceName)
counts := m.counts
if isDebug {
@@ -183,11 +195,24 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) {
if isDebug {
debugStr = "true"
}
c := m.factory.Counter(metrics.Options{Name: m.category, Tags: map[string]string{"svc": serviceName, "debug": debugStr}})
tags := map[string]string{"svc": serviceName, "debug": debugStr, "transport": defaultTransportType}
if endpointType != "" {
tags["transport"] = endpointType
}
c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags})
counts[serviceName] = c
counter = c
} else {
counter = counts[otherServices]
switch endpointType {
case HTTPEndpoint:
counter = counts[otherServicesViaHTTP]
case TChannelEndpoint:
counter = counts[otherServicesViaTChannel]
case GRPCEndpoint:
counter = counts[otherServicesViaGRPC]
default:
counter = counts[otherServicesViaDefault]
}
}
m.lock.Unlock()
counter.Inc(1)
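The reviewer comment above points out that the pre-populated per-transport "other-services" counters now occupy slots that previously belonged to user-defined service names, which is what the `maxServiceNames + 3` compensation accounts for. The following standalone Go sketch is an illustrative model of that capping behaviour, not the PR's actual types; names such as `modelCounts` are made up for the example.

```go
package main

import "fmt"

const (
	httpTransport      = "HTTP"
	tchannelTransport  = "TChannel"
	grpcTransport      = "GRPC"
	undefinedTransport = "Undefined"
)

// modelCounts mimics countsBySvc: a counter map capped at maxServiceNames,
// pre-populated with one "other-services" overflow bucket per transport.
type modelCounts struct {
	counts          map[string]int
	maxServiceNames int
}

func newModelCounts(max int) *modelCounts {
	return &modelCounts{
		counts: map[string]int{
			"other-services|" + httpTransport:      0,
			"other-services|" + tchannelTransport:  0,
			"other-services|" + grpcTransport:      0,
			"other-services|" + undefinedTransport: 0,
		},
		// The four pre-populated buckets replace the single "other-services"
		// entry from before, hence the "+3" compensation in newCountsBySvc.
		maxServiceNames: max + 3,
	}
}

func (m *modelCounts) countByServiceName(svc, transport string) {
	if transport == "" {
		transport = undefinedTransport
	}
	if _, ok := m.counts[svc]; ok {
		m.counts[svc]++
		return
	}
	if len(m.counts) < m.maxServiceNames {
		// First sighting of a service while below the cap: it gets its own
		// counter, tagged with the transport it arrived on.
		m.counts[svc] = 1
		return
	}
	// Above the cap: roll the count into the per-transport overflow bucket.
	m.counts["other-services|"+transport]++
}

func main() {
	m := newModelCounts(3)
	for _, svc := range []string{"fry", "leela", "bender", "zoidberg"} {
		m.countByServiceName(svc, grpcTransport)
	}
	fmt.Println(m.counts)
	// fry and leela get dedicated counters; bender and zoidberg overflow into
	// other-services|GRPC because the compensated cap of 3+3 entries is reached.
}
```

Under this model the +3 keeps the number of dedicated per-service counters unchanged (the map gains three extra pre-populated buckets and the threshold grows by the same amount); the reviewer's caveat is only that dashboards summing distinct series may now show up to maxServiceNames + 3 entries.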
47 changes: 25 additions & 22 deletions cmd/collector/app/metrics_test.go
@@ -36,47 +36,50 @@ func TestProcessorMetrics(t *testing.T) {
assert.NotNil(t, jFormat)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{
Process: &model.Process{},
})
}, TChannelEndpoint)
mSpan := model.Span{
Process: &model.Process{
ServiceName: "fry",
},
}
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint)
mSpan.Flags.SetDebug()
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint)
mSpan.ReplaceParentID(1234)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint)
counters, gauges := baseMetrics.Backend.Snapshot()

assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry"])
assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry"])
assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport="+TChannelEndpoint])
assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport="+TChannelEndpoint])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport="+TChannelEndpoint])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport="+TChannelEndpoint])
assert.Empty(t, gauges)
}

func TestNewCountsBySvc(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
metrics := newCountsBySvc(baseMetrics, "not_on_my_level", 3)

metrics.countByServiceName("fry", false)
metrics.countByServiceName("leela", false)
metrics.countByServiceName("bender", false)
metrics.countByServiceName("zoidberg", false)
metrics.countByServiceName("fry", false, "")
metrics.countByServiceName("leela", false, "")
metrics.countByServiceName("bender", false, "")
metrics.countByServiceName("zoidberg", false, "")
metrics.countByServiceName("zoidberg2", false, TChannelEndpoint)
metrics.countByServiceName("zoidberg3", false, HTTPEndpoint)

counters, _ := baseMetrics.Backend.Snapshot()
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=fry"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela"])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=false|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=fry|transport="+defaultTransportType])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela|transport="+defaultTransportType])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=other-services|transport="+TChannelEndpoint])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=other-services|transport="+HTTPEndpoint])

metrics.countByServiceName("zoidberg", true)
metrics.countByServiceName("bender", true)
metrics.countByServiceName("leela", true)
metrics.countByServiceName("fry", true)
metrics.countByServiceName("zoidberg", true, GRPCEndpoint)
metrics.countByServiceName("bender", true, GRPCEndpoint)
metrics.countByServiceName("leela", true, GRPCEndpoint)
metrics.countByServiceName("fry", true, GRPCEndpoint)

counters, _ = baseMetrics.Backend.Snapshot()
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender"])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg|transport="+GRPCEndpoint])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender|transport="+GRPCEndpoint])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services|transport="+GRPCEndpoint])
}
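The updated test expectations above show the new label appearing as a "|transport=..." suffix in the flattened counter keys. As a rough, assumption-level sketch of how such keys are assembled (the actual metricstest backend may differ in details), tags are rendered as alphabetically sorted key=value pairs joined to the metric name with "|":

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// counterKey mirrors the key format seen in the test expectations:
// metric name followed by "|"-joined, alphabetically sorted key=value tags.
func counterKey(name string, tags map[string]string) string {
	pairs := make([]string, 0, len(tags))
	for k, v := range tags {
		pairs = append(pairs, k+"="+v)
	}
	sort.Strings(pairs)
	return name + "|" + strings.Join(pairs, "|")
}

func main() {
	fmt.Println(counterKey("service.spans.received", map[string]string{
		"debug":     "false",
		"format":    "jaeger",
		"svc":       "fry",
		"transport": "TChannel",
	}))
	// Output: service.spans.received|debug=false|format=jaeger|svc=fry|transport=TChannel
}
```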
14 changes: 7 additions & 7 deletions cmd/collector/app/span_processor.go
@@ -17,7 +17,7 @@ package app
import (
"time"

"github.com/uber/tchannel-go"
tchannel "github.com/uber/tchannel-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
@@ -112,11 +112,11 @@ func (sp *spanProcessor) saveSpan(span *model.Span) {
startTime := time.Now()
if err := sp.spanWriter.WriteSpan(span); err != nil {
sp.logger.Error("Failed to save span", zap.Error(err))
sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span)
sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span, "")
} else {
sp.logger.Debug("Span written to the storage by the collector",
zap.Stringer("trace-id", span.TraceID), zap.Stringer("span-id", span.SpanID))
sp.metrics.SavedOkBySvc.ReportServiceNameForSpan(span)
sp.metrics.SavedOkBySvc.ReportServiceNameForSpan(span, "")
}
sp.metrics.SaveLatency.Record(time.Since(startTime))
}
@@ -126,7 +126,7 @@ func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpans
sp.metrics.BatchSize.Update(int64(len(mSpans)))
retMe := make([]bool, len(mSpans))
for i, mSpan := range mSpans {
ok := sp.enqueueSpan(mSpan, options.SpanFormat)
ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport)
if !ok && sp.reportBusy {
return nil, tchannel.ErrServerBusy
}
@@ -140,12 +140,12 @@ func (sp *spanProcessor) processItemFromQueue(item *queueItem) {
sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime))
}

func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat string) bool {
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat, endpointType string) bool {
spanCounts := sp.metrics.GetCountsForFormat(originalFormat)
spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span)
spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span, endpointType)

if !sp.filterSpan(span) {
spanCounts.RejectedBySvc.ReportServiceNameForSpan(span)
spanCounts.RejectedBySvc.ReportServiceNameForSpan(span, endpointType)
return true // as in "not dropped", because it's actively rejected
}
item := &queueItem{
13 changes: 6 additions & 7 deletions cmd/collector/app/span_processor_test.go
@@ -109,21 +109,21 @@ func TestBySvcMetrics(t *testing.T) {
expected := []metricstest.ExpectedMetric{}
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
})
}
if test.rootSpan {
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
})
}
}
@@ -137,7 +137,7 @@ })
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
})
}
mb.AssertCounterMetrics(t, expected...)
@@ -237,7 +237,6 @@ func TestSpanProcessorErrors(t *testing.T) {
},
},
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})

assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)

@@ -250,7 +249,7 @@ }, logBuf.JSONLine(0))
}, logBuf.JSONLine(0))

expected := []metricstest.ExpectedMetric{{
Name: "service.spans.saved-by-svc|debug=false|result=err|svc=x", Value: 1,
Name: "service.spans.saved-by-svc|debug=false|result=err|svc=x|transport=" + defaultTransportType, Value: 1,
}}
mb.AssertCounterMetrics(t, expected...)
}
9 changes: 7 additions & 2 deletions cmd/collector/app/tchannel_handler.go
@@ -21,6 +21,11 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

const (
// TChannelEndpoint is for TChannel endpoint
TChannelEndpoint = "TChannel"
)

// TChannelHandler implements jaeger.TChanCollector and zipkincore.TChanZipkinCollector.
type TChannelHandler struct {
jaegerHandler JaegerBatchesHandler
@@ -43,13 +48,13 @@ func (h *TChannelHandler) SubmitZipkinBatch(
_ thrift.Context,
spans []*zipkincore.Span,
) ([]*zipkincore.Response, error) {
return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"})
return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: TChannelEndpoint})
}

// SubmitBatches implements jaeger.TChanCollector.
func (h *TChannelHandler) SubmitBatches(
_ thrift.Context,
batches []*jaeger.Batch,
) ([]*jaeger.BatchSubmitResponse, error) {
return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: "tchannel"})
return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: TChannelEndpoint})
}