Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "index out of range" when receiving a dual client/server Zipkin span #4160

Merged
merged 5 commits into from
Jan 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions cmd/collector/app/handler/testdata/zipkin_v1_merged_spans.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
[
{
"traceId": "4ed11df465275600",
"id": "4ed11df465275600",
"name": "get /",
"timestamp": 1633073248674949,
"duration": 14861,
"localEndpoint": {
"serviceName": "first_service",
"ipv4": "10.0.2.15"
},
"annotations": [
{
"timestamp": 1633073248674949,
"value": "sr"
},
{
"timestamp": 163307324868981,
"value": "ss"
}
]
},
{
"traceId": "4ed11df465275600",
"parentId": "4ed11df465275600",
"id": "c943743e25dc2cdf",
"name": "get /api",
"timestamp": 1633073248678309,
"duration": 3360,
"localEndpoint": {
"serviceName": "first_service",
"ipv4": "10.0.2.15"
},
"annotations": [
{
"timestamp": 1633073248678309,
"value": "cs"
},
{
"timestamp": 1633073248681669,
"value": "sr"
},
{
"timestamp": 1633073248685029,
"value": "cr"
},
{
"timestamp": 1633073248688388,
"value": "ss"
}
]
}
]
24 changes: 19 additions & 5 deletions cmd/collector/app/handler/thrift_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,13 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler processor.SpanProcess
// SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format.
func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) {
mSpans := make([]*model.Span, 0, len(spans))
for _, span := range spans {
convCount := make([]int, len(spans))
for i, span := range spans {
sanitized := h.sanitizer.Sanitize(span)
mSpans = append(mSpans, convertZipkinToModel(sanitized, h.logger)...)
// conversion may return more than one span, e.g. when the input Zipkin span represents both client & server spans
converted := convertZipkinToModel(sanitized, h.logger)
convCount[i] = len(converted)
mSpans = append(mSpans, converted...)
}
bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
Expand All @@ -121,13 +125,23 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options
return nil, err
}
responses := make([]*zipkincore.Response, len(spans))
for i, ok := range bools {
// at this point we may have len(spans) < len(bools) if conversion results in more spans
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
b := 0 // index through bools which we advance by convCount[i] for each iteration
for i := range spans {
res := zipkincore.NewResponse()
res.Ok = ok
res.Ok = true
for j := 0; j < convCount[i]; j++ {
res.Ok = res.Ok && bools[b]
b++
}
responses[i] = res
}

h.logger.Debug("Zipkin span batch processed by the collector.", zap.Int("span-count", len(spans)))
h.logger.Debug(
"Zipkin span batch processed by the collector.",
zap.Int("received-span-count", len(spans)),
zap.Int("processed-span-count", len(mSpans)),
)
return responses, nil
}

Expand Down
65 changes: 47 additions & 18 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package handler

import (
"errors"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/cmd/collector/app/zipkin/zipkindeser"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
Expand Down Expand Up @@ -82,31 +85,57 @@ func (s *shouldIErrorProcessor) Close() error {
}

func TestZipkinSpanHandler(t *testing.T) {
testChunks := []struct {
tests := []struct {
name string
expectedErr error
filename string
}{
{
name: "good case",
expectedErr: nil,
},
{
name: "bad case",
expectedErr: errTestError,
},
{
name: "dual client-server span",
expectedErr: nil,
filename: "testdata/zipkin_v1_merged_spans.json",
},
}
for _, tc := range testChunks {
logger := zap.NewNop()
h := NewZipkinSpanHandler(logger, &shouldIErrorProcessor{tc.expectedErr != nil}, zipkin.NewParentIDSanitizer())
res, err := h.SubmitZipkinBatch([]*zipkincore.Span{
{
ID: 12345,
},
}, SubmitBatchOptions{})
if tc.expectedErr != nil {
assert.Nil(t, res)
assert.Equal(t, tc.expectedErr, err)
} else {
assert.Len(t, res, 1)
assert.NoError(t, err)
assert.True(t, res[0].Ok)
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
logger := zap.NewNop()
h := NewZipkinSpanHandler(
logger,
&shouldIErrorProcessor{tc.expectedErr != nil},
zipkinsanitizer.NewChainedSanitizer(zipkinsanitizer.NewStandardSanitizers()...),
)
var spans []*zipkincore.Span
if tc.filename != "" {
data, err := os.ReadFile(tc.filename)
require.NoError(t, err)
spans, err = zipkindeser.DeserializeJSON(data)
require.NoError(t, err)
} else {
spans = []*zipkincore.Span{
{
ID: 12345,
},
}
}
res, err := h.SubmitZipkinBatch(spans, SubmitBatchOptions{})
if tc.expectedErr != nil {
assert.Nil(t, res)
assert.Equal(t, tc.expectedErr, err)
} else {
assert.Len(t, res, len(spans))
assert.NoError(t, err)
for i := range res {
assert.True(t, res[i].Ok)
}
}
})
}
}
5 changes: 3 additions & 2 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
zipkinSanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -100,7 +100,8 @@ func TestBySvcMetrics(t *testing.T) {
switch test.format {
case processor.ZipkinSpanFormat:
span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug)
zHandler := handler.NewZipkinSpanHandler(logger, sp, zipkinSanitizer.NewParentIDSanitizer())
sanitizer := zipkinsanitizer.NewChainedSanitizer(zipkinsanitizer.NewStandardSanitizers()...)
zHandler := handler.NewZipkinSpanHandler(logger, sp, sanitizer)
zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, handler.SubmitBatchOptions{})
metricPrefix = "service"
format = "zipkin"
Expand Down
7 changes: 4 additions & 3 deletions cmd/collector/app/zipkin/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/zipkin/zipkindeser"
"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
zipkinProto "github.com/jaegertracing/jaeger/proto-gen/zipkin"
"github.com/jaegertracing/jaeger/swagger-gen/models"
Expand Down Expand Up @@ -93,7 +94,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
case "application/x-thrift":
tSpans, err = zipkin.DeserializeThrift(bodyBytes)
case "application/json":
tSpans, err = DeserializeJSON(bodyBytes)
tSpans, err = zipkindeser.DeserializeJSON(bodyBytes)
default:
http.Error(w, "Unsupported Content-Type", http.StatusBadRequest)
return
Expand Down Expand Up @@ -170,7 +171,7 @@ func jsonToThriftSpansV2(bodyBytes []byte, zipkinV2Formats strfmt.Registry) ([]*
return nil, err
}

tSpans, err := spansV2ToThrift(spans)
tSpans, err := zipkindeser.SpansV2ToThrift(spans)
if err != nil {
return nil, err
}
Expand All @@ -183,7 +184,7 @@ func protoToThriftSpansV2(bodyBytes []byte) ([]*zipkincore.Span, error) {
return nil, err
}

tSpans, err := protoSpansV2ToThrift(&spans)
tSpans, err := zipkindeser.ProtoSpansV2ToThrift(&spans)
if err != nil {
return nil, err
}
Expand Down
Loading