diff --git a/cmd/collector/app/handler/testdata/zipkin_v1_merged_spans.json b/cmd/collector/app/handler/testdata/zipkin_v1_merged_spans.json new file mode 100644 index 00000000000..8a714836ad5 --- /dev/null +++ b/cmd/collector/app/handler/testdata/zipkin_v1_merged_spans.json @@ -0,0 +1,53 @@ +[ + { + "traceId": "4ed11df465275600", + "id": "4ed11df465275600", + "name": "get /", + "timestamp": 1633073248674949, + "duration": 14861, + "localEndpoint": { + "serviceName": "first_service", + "ipv4": "10.0.2.15" + }, + "annotations": [ + { + "timestamp": 1633073248674949, + "value": "sr" + }, + { + "timestamp": 163307324868981, + "value": "ss" + } + ] + }, + { + "traceId": "4ed11df465275600", + "parentId": "4ed11df465275600", + "id": "c943743e25dc2cdf", + "name": "get /api", + "timestamp": 1633073248678309, + "duration": 3360, + "localEndpoint": { + "serviceName": "first_service", + "ipv4": "10.0.2.15" + }, + "annotations": [ + { + "timestamp": 1633073248678309, + "value": "cs" + }, + { + "timestamp": 1633073248681669, + "value": "sr" + }, + { + "timestamp": 1633073248685029, + "value": "cr" + }, + { + "timestamp": 1633073248688388, + "value": "ss" + } + ] + } +] diff --git a/cmd/collector/app/handler/thrift_span_handler.go b/cmd/collector/app/handler/thrift_span_handler.go index fd232a846f0..3448a3fbc62 100644 --- a/cmd/collector/app/handler/thrift_span_handler.go +++ b/cmd/collector/app/handler/thrift_span_handler.go @@ -108,9 +108,13 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler processor.SpanProcess // SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format. func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) { mSpans := make([]*model.Span, 0, len(spans)) - for _, span := range spans { + convCount := make([]int, len(spans)) + for i, span := range spans { sanitized := h.sanitizer.Sanitize(span) - mSpans = append(mSpans, convertZipkinToModel(sanitized, h.logger)...) + // conversion may return more than one span, e.g. when the input Zipkin span represents both client & server spans + converted := convertZipkinToModel(sanitized, h.logger) + convCount[i] = len(converted) + mSpans = append(mSpans, converted...) } bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{ InboundTransport: options.InboundTransport, @@ -121,13 +125,23 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options return nil, err } responses := make([]*zipkincore.Response, len(spans)) - for i, ok := range bools { + // at this point we may have len(spans) < len(bools) if conversion results in more spans + b := 0 // index through bools which we advance by convCount[i] for each iteration + for i := range spans { res := zipkincore.NewResponse() - res.Ok = ok + res.Ok = true + for j := 0; j < convCount[i]; j++ { + res.Ok = res.Ok && bools[b] + b++ + } responses[i] = res } - h.logger.Debug("Zipkin span batch processed by the collector.", zap.Int("span-count", len(spans))) + h.logger.Debug( + "Zipkin span batch processed by the collector.", + zap.Int("received-span-count", len(spans)), + zap.Int("processed-span-count", len(mSpans)), + ) return responses, nil } diff --git a/cmd/collector/app/handler/thrift_span_handler_test.go b/cmd/collector/app/handler/thrift_span_handler_test.go index 88f712db596..5fb1b79e0cc 100644 --- a/cmd/collector/app/handler/thrift_span_handler_test.go +++ b/cmd/collector/app/handler/thrift_span_handler_test.go @@ -17,13 +17,16 @@ package handler import ( "errors" + "os" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" - "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" + zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" + "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin/zipkindeser" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" @@ -82,31 +85,57 @@ func (s *shouldIErrorProcessor) Close() error { } func TestZipkinSpanHandler(t *testing.T) { - testChunks := []struct { + tests := []struct { + name string expectedErr error + filename string }{ { + name: "good case", expectedErr: nil, }, { + name: "bad case", expectedErr: errTestError, }, + { + name: "dual client-server span", + expectedErr: nil, + filename: "testdata/zipkin_v1_merged_spans.json", + }, } - for _, tc := range testChunks { - logger := zap.NewNop() - h := NewZipkinSpanHandler(logger, &shouldIErrorProcessor{tc.expectedErr != nil}, zipkin.NewParentIDSanitizer()) - res, err := h.SubmitZipkinBatch([]*zipkincore.Span{ - { - ID: 12345, - }, - }, SubmitBatchOptions{}) - if tc.expectedErr != nil { - assert.Nil(t, res) - assert.Equal(t, tc.expectedErr, err) - } else { - assert.Len(t, res, 1) - assert.NoError(t, err) - assert.True(t, res[0].Ok) - } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + logger := zap.NewNop() + h := NewZipkinSpanHandler( + logger, + &shouldIErrorProcessor{tc.expectedErr != nil}, + zipkinsanitizer.NewChainedSanitizer(zipkinsanitizer.NewStandardSanitizers()...), + ) + var spans []*zipkincore.Span + if tc.filename != "" { + data, err := os.ReadFile(tc.filename) + require.NoError(t, err) + spans, err = zipkindeser.DeserializeJSON(data) + require.NoError(t, err) + } else { + spans = []*zipkincore.Span{ + { + ID: 12345, + }, + } + } + res, err := h.SubmitZipkinBatch(spans, SubmitBatchOptions{}) + if tc.expectedErr != nil { + assert.Nil(t, res) + assert.Equal(t, tc.expectedErr, err) + } else { + assert.Len(t, res, len(spans)) + assert.NoError(t, err) + for i := range res { + assert.True(t, res[i].Ok) + } + } + }) } } diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 864baf47993..00c87f871ff 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -30,7 +30,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" - zipkinSanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" + zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -100,7 +100,8 @@ func TestBySvcMetrics(t *testing.T) { switch test.format { case processor.ZipkinSpanFormat: span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug) - zHandler := handler.NewZipkinSpanHandler(logger, sp, zipkinSanitizer.NewParentIDSanitizer()) + sanitizer := zipkinsanitizer.NewChainedSanitizer(zipkinsanitizer.NewStandardSanitizers()...) + zHandler := handler.NewZipkinSpanHandler(logger, sp, sanitizer) zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, handler.SubmitBatchOptions{}) metricPrefix = "service" format = "zipkin" diff --git a/cmd/collector/app/zipkin/http_handler.go b/cmd/collector/app/zipkin/http_handler.go index a6e6b06aa77..cd41fcf101d 100644 --- a/cmd/collector/app/zipkin/http_handler.go +++ b/cmd/collector/app/zipkin/http_handler.go @@ -32,6 +32,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" + "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin/zipkindeser" "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" zipkinProto "github.com/jaegertracing/jaeger/proto-gen/zipkin" "github.com/jaegertracing/jaeger/swagger-gen/models" @@ -93,7 +94,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) { case "application/x-thrift": tSpans, err = zipkin.DeserializeThrift(bodyBytes) case "application/json": - tSpans, err = DeserializeJSON(bodyBytes) + tSpans, err = zipkindeser.DeserializeJSON(bodyBytes) default: http.Error(w, "Unsupported Content-Type", http.StatusBadRequest) return @@ -170,7 +171,7 @@ func jsonToThriftSpansV2(bodyBytes []byte, zipkinV2Formats strfmt.Registry) ([]* return nil, err } - tSpans, err := spansV2ToThrift(spans) + tSpans, err := zipkindeser.SpansV2ToThrift(spans) if err != nil { return nil, err } @@ -183,7 +184,7 @@ func protoToThriftSpansV2(bodyBytes []byte) ([]*zipkincore.Span, error) { return nil, err } - tSpans, err := protoSpansV2ToThrift(&spans) + tSpans, err := zipkindeser.ProtoSpansV2ToThrift(&spans) if err != nil { return nil, err } diff --git a/cmd/collector/app/zipkin/http_handler_test.go b/cmd/collector/app/zipkin/http_handler_test.go index 43a1edc16ea..4a78cb49251 100644 --- a/cmd/collector/app/zipkin/http_handler_test.go +++ b/cmd/collector/app/zipkin/http_handler_test.go @@ -18,6 +18,7 @@ package zipkin import ( "bytes" "compress/gzip" + "crypto/rand" "encoding/json" "fmt" "io" @@ -35,6 +36,7 @@ import ( zipkinTransport "github.com/uber/jaeger-client-go/transport/zipkin" "github.com/jaegertracing/jaeger/cmd/collector/app/handler" + zm "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin/zipkindeser/zipkindesermocks" zipkinTrift "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" zipkinProto "github.com/jaegertracing/jaeger/proto-gen/zipkin" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" @@ -91,19 +93,17 @@ func TestViaClient(t *testing.T) { } func waitForSpans(t *testing.T, handler *mockZipkinHandler, expecting int) { - deadline := time.Now().Add(2 * time.Second) - for { - if time.Now().After(deadline) { - t.Error("never received a span") - return - } - if have := len(handler.getSpans()); expecting != have { - time.Sleep(time.Millisecond) - continue - } - break - } - assert.Len(t, handler.getSpans(), expecting) + assert.Eventuallyf( + t, + func() bool { + return len(handler.getSpans()) == expecting + }, + 2*time.Second, + time.Millisecond, + "expecting to receive %d span(s), have %d span(s)", + expecting, + len(handler.getSpans()), + ) } func TestThriftFormat(t *testing.T) { @@ -116,66 +116,86 @@ func TestThriftFormat(t *testing.T) { assert.EqualValues(t, "", resBodyStr) } -func TestJsonFormat(t *testing.T) { +func TestZipkinJsonV1Format(t *testing.T) { server, handler := initializeTestServer(nil) + mockHandler := handler.zipkinSpansHandler.(*mockZipkinHandler) defer server.Close() - endpJSON := createEndpoint("foo", "127.0.0.1", "2001:db8::c001", 65535) - annoJSON := createAnno("cs", 1515, endpJSON) - binAnnoJSON := createBinAnno("http.status_code", "200", endpJSON) - spanJSON := createSpan("bar", "1234567891234565", "1234567891234567", "1234567891234568", 156, 15145, false, - annoJSON, binAnnoJSON) - statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, []byte(spanJSON), createHeader("application/json")) - assert.NoError(t, err) - assert.EqualValues(t, http.StatusAccepted, statusCode) - assert.EqualValues(t, "", resBodyStr) - waitForSpans(t, handler.zipkinSpansHandler.(*mockZipkinHandler), 1) - recdSpan := handler.zipkinSpansHandler.(*mockZipkinHandler).getSpans()[0] - require.Len(t, recdSpan.Annotations, 1) - require.NotNil(t, recdSpan.Annotations[0].Host) - assert.EqualValues(t, -1, recdSpan.Annotations[0].Host.Port, "Port 65535 must be represented as -1 in zipkin.thrift") - - statusCode, resBodyStr, err = postBytes(server.URL+`/api/v1/spans`, []byte(spanJSON), createHeader("application/json; charset=utf-8")) - assert.NoError(t, err) - assert.EqualValues(t, http.StatusAccepted, statusCode) - assert.EqualValues(t, "", resBodyStr) + var ( + endpJSON = zm.CreateEndpoint("foo", "127.0.0.1", "2001:db8::c001", 65535) + annoJSON = zm.CreateAnno("cs", 1515, endpJSON) + binAnnoJSON = zm.CreateBinAnno("http.status_code", "200", endpJSON) + spanJSON = zm.CreateSpan("bar", "1234567891234565", "1234567891234567", "1234567891234568", 156, 15145, false, + annoJSON, binAnnoJSON) + endpErrJSON = zm.CreateEndpoint("", "127.0.0.A", "", 80) + ) - endpErrJSON := createEndpoint("", "127.0.0.A", "", 80) + t.Run("good span", func(t *testing.T) { + statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, []byte(spanJSON), createHeader("application/json")) + assert.NoError(t, err) + assert.EqualValues(t, http.StatusAccepted, statusCode) + assert.EqualValues(t, "", resBodyStr) + waitForSpans(t, handler.zipkinSpansHandler.(*mockZipkinHandler), 1) + recdSpan := mockHandler.getSpans()[0] + require.Len(t, recdSpan.Annotations, 1) + require.NotNil(t, recdSpan.Annotations[0].Host) + assert.EqualValues(t, -1, recdSpan.Annotations[0].Host.Port, "Port 65535 must be represented as -1 in zipkin.thrift") + }) + + t.Run("good span with utf8", func(t *testing.T) { + statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, []byte(spanJSON), createHeader("application/json; charset=utf-8")) + assert.NoError(t, err) + assert.EqualValues(t, http.StatusAccepted, statusCode) + assert.EqualValues(t, "", resBodyStr) + }) + + // give distinct error string to the handler which actually indicates that processing until then was successful + fakeErr := fmt.Errorf("processing till this point was good!") + handler.zipkinSpansHandler.(*mockZipkinHandler).err = fakeErr - // error zipkinSpanHandler - handler.zipkinSpansHandler.(*mockZipkinHandler).err = fmt.Errorf("Bad times ahead") tests := []struct { + name string payload string expected string statusCode int }{ { + name: "good span", payload: spanJSON, - expected: "Cannot submit Zipkin batch: Bad times ahead\n", + expected: fakeErr.Error(), statusCode: http.StatusInternalServerError, }, { - payload: createSpan("bar", "", "1", "1", 156, 15145, false, annoJSON, binAnnoJSON), + name: "empty span id", + payload: zm.CreateSpan("bar", "", "1", "1", 156, 15145, false, annoJSON, binAnnoJSON), expected: "Unable to process request body: strconv.ParseUint: parsing "": invalid syntax\n", statusCode: http.StatusBadRequest, }, { - payload: createSpan("bar", "ZTA", "1", "1", 156, 15145, false, "", ""), + name: "invalid span id", + payload: zm.CreateSpan("bar", "ZTA", "1", "1", 156, 15145, false, "", ""), expected: "Unable to process request body: strconv.ParseUint: parsing "ZTA": invalid syntax\n", statusCode: http.StatusBadRequest, }, { - payload: createSpan("bar", "1", "", "1", 156, 15145, false, "", createAnno("cs", 1, endpErrJSON)), + name: "invalid ipv4 address", + payload: zm.CreateSpan("bar", "1", "", "1", 156, 15145, false, "", zm.CreateAnno("cs", 1, endpErrJSON)), expected: "Unable to process request body: wrong ipv4\n", statusCode: http.StatusBadRequest, }, } for _, test := range tests { - statusCode, resBodyStr, err = postBytes(server.URL+`/api/v1/spans`, []byte(test.payload), createHeader("application/json")) - require.NoError(t, err) - assert.EqualValues(t, test.statusCode, statusCode) - assert.EqualValues(t, test.expected, resBodyStr) + t.Run(test.name, func(t *testing.T) { + statusCode, resBodyStr, err := postBytes( + server.URL+`/api/v1/spans`, + []byte(test.payload), + createHeader("application/json"), + ) + require.NoError(t, err) + assert.EqualValues(t, test.statusCode, statusCode) + assert.Contains(t, resBodyStr, test.expected) + }) } } @@ -287,18 +307,18 @@ func TestSaveProtoSpansV2(t *testing.T) { server, handler := initializeTestServer(nil) defer server.Close() - validID := randBytesOfLen(8) - validTraceID := randBytesOfLen(16) + validID := randBytesOfLen(t, 8) + validTraceID := randBytesOfLen(t, 16) tests := []struct { Span zipkinProto.Span StatusCode int resBody string }{ - {Span: zipkinProto.Span{Id: validID, TraceId: validTraceID, LocalEndpoint: &zipkinProto.Endpoint{Ipv4: randBytesOfLen(4)}, Kind: zipkinProto.Span_CLIENT}, StatusCode: http.StatusAccepted}, - {Span: zipkinProto.Span{Id: randBytesOfLen(4)}, StatusCode: http.StatusBadRequest, resBody: "Unable to process request body: invalid length for SpanID\n"}, - {Span: zipkinProto.Span{Id: validID, TraceId: randBytesOfLen(32)}, StatusCode: http.StatusBadRequest, resBody: "Unable to process request body: invalid length for TraceID\n"}, - {Span: zipkinProto.Span{Id: validID, TraceId: validTraceID, ParentId: randBytesOfLen(16)}, StatusCode: http.StatusBadRequest, resBody: "Unable to process request body: invalid length for SpanID\n"}, - {Span: zipkinProto.Span{Id: validID, TraceId: validTraceID, LocalEndpoint: &zipkinProto.Endpoint{Ipv4: randBytesOfLen(2)}}, StatusCode: http.StatusBadRequest, resBody: "Unable to process request body: wrong Ipv4\n"}, + {Span: zipkinProto.Span{Id: validID, TraceId: validTraceID, LocalEndpoint: &zipkinProto.Endpoint{Ipv4: randBytesOfLen(t, 4)}, Kind: zipkinProto.Span_CLIENT}, StatusCode: http.StatusAccepted}, + {Span: zipkinProto.Span{Id: randBytesOfLen(t, 4)}, StatusCode: http.StatusBadRequest, resBody: "Unable to process request body: invalid length for SpanID\n"}, + {Span: zipkinProto.Span{Id: validID, TraceId: randBytesOfLen(t, 32)}, StatusCode: http.StatusBadRequest, resBody: "Unable to process request body: invalid length for TraceID\n"}, + {Span: zipkinProto.Span{Id: validID, TraceId: validTraceID, ParentId: randBytesOfLen(t, 16)}, StatusCode: http.StatusBadRequest, resBody: "Unable to process request body: invalid length for SpanID\n"}, + {Span: zipkinProto.Span{Id: validID, TraceId: validTraceID, LocalEndpoint: &zipkinProto.Endpoint{Ipv4: randBytesOfLen(t, 2)}}, StatusCode: http.StatusBadRequest, resBody: "Unable to process request body: wrong Ipv4\n"}, } for _, test := range tests { l := zipkinProto.ListOfSpans{ @@ -398,3 +418,11 @@ func postBytes(urlStr string, bytesBody []byte, header *http.Header) (int, strin } return res.StatusCode, string(body), nil } + +func randBytesOfLen(t *testing.T, n int) []byte { + b := make([]byte, n) + nn, err := rand.Read(b) + require.NoError(t, err) + require.Equal(t, n, nn) + return b +} diff --git a/cmd/collector/app/zipkin/annotation.go b/cmd/collector/app/zipkin/zipkindeser/annotation.go similarity index 98% rename from cmd/collector/app/zipkin/annotation.go rename to cmd/collector/app/zipkin/zipkindeser/annotation.go index c4be6e087e0..05798f402a7 100644 --- a/cmd/collector/app/zipkin/annotation.go +++ b/cmd/collector/app/zipkin/zipkindeser/annotation.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package zipkin +package zipkindeser import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" diff --git a/cmd/collector/app/zipkin/annotation_test.go b/cmd/collector/app/zipkin/zipkindeser/annotation_test.go similarity index 99% rename from cmd/collector/app/zipkin/annotation_test.go rename to cmd/collector/app/zipkin/zipkindeser/annotation_test.go index 5f3d388dc5e..86d57a4579e 100644 --- a/cmd/collector/app/zipkin/annotation_test.go +++ b/cmd/collector/app/zipkin/zipkindeser/annotation_test.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package zipkin +package zipkindeser import ( "testing" diff --git a/cmd/collector/app/zipkin/fixtures/zipkin_01.json b/cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_01.json similarity index 100% rename from cmd/collector/app/zipkin/fixtures/zipkin_01.json rename to cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_01.json diff --git a/cmd/collector/app/zipkin/fixtures/zipkin_02.json b/cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_02.json similarity index 100% rename from cmd/collector/app/zipkin/fixtures/zipkin_02.json rename to cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_02.json diff --git a/cmd/collector/app/zipkin/fixtures/zipkin_03.json b/cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_03.json similarity index 100% rename from cmd/collector/app/zipkin/fixtures/zipkin_03.json rename to cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_03.json diff --git a/cmd/collector/app/zipkin/fixtures/zipkin_proto_01.json b/cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_proto_01.json similarity index 100% rename from cmd/collector/app/zipkin/fixtures/zipkin_proto_01.json rename to cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_proto_01.json diff --git a/cmd/collector/app/zipkin/fixtures/zipkin_proto_02.json b/cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_proto_02.json similarity index 100% rename from cmd/collector/app/zipkin/fixtures/zipkin_proto_02.json rename to cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_proto_02.json diff --git a/cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_v1_merged_spans.json b/cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_v1_merged_spans.json new file mode 100644 index 00000000000..8a714836ad5 --- /dev/null +++ b/cmd/collector/app/zipkin/zipkindeser/fixtures/zipkin_v1_merged_spans.json @@ -0,0 +1,53 @@ +[ + { + "traceId": "4ed11df465275600", + "id": "4ed11df465275600", + "name": "get /", + "timestamp": 1633073248674949, + "duration": 14861, + "localEndpoint": { + "serviceName": "first_service", + "ipv4": "10.0.2.15" + }, + "annotations": [ + { + "timestamp": 1633073248674949, + "value": "sr" + }, + { + "timestamp": 163307324868981, + "value": "ss" + } + ] + }, + { + "traceId": "4ed11df465275600", + "parentId": "4ed11df465275600", + "id": "c943743e25dc2cdf", + "name": "get /api", + "timestamp": 1633073248678309, + "duration": 3360, + "localEndpoint": { + "serviceName": "first_service", + "ipv4": "10.0.2.15" + }, + "annotations": [ + { + "timestamp": 1633073248678309, + "value": "cs" + }, + { + "timestamp": 1633073248681669, + "value": "sr" + }, + { + "timestamp": 1633073248685029, + "value": "cr" + }, + { + "timestamp": 1633073248688388, + "value": "ss" + } + ] + } +] diff --git a/cmd/collector/app/zipkin/json.go b/cmd/collector/app/zipkin/zipkindeser/json.go similarity index 99% rename from cmd/collector/app/zipkin/json.go rename to cmd/collector/app/zipkin/zipkindeser/json.go index 0c3842d5058..c28faecef70 100644 --- a/cmd/collector/app/zipkin/json.go +++ b/cmd/collector/app/zipkin/zipkindeser/json.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package zipkin +package zipkindeser import ( "bytes" diff --git a/cmd/collector/app/zipkin/json_test.go b/cmd/collector/app/zipkin/zipkindeser/json_test.go similarity index 82% rename from cmd/collector/app/zipkin/json_test.go rename to cmd/collector/app/zipkin/zipkindeser/json_test.go index 3ac8edb18ee..2f58a9b4652 100644 --- a/cmd/collector/app/zipkin/json_test.go +++ b/cmd/collector/app/zipkin/zipkindeser/json_test.go @@ -13,45 +13,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -package zipkin +package zipkindeser import ( "encoding/json" "errors" - "fmt" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + m "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin/zipkindeser/zipkindesermocks" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -var ( - endpointFmt = `{"serviceName": "%s", "ipv4": "%s", "ipv6": "%s", "port": %d}` - annoFmt = `{"value": "%s", "timestamp": %d, "endpoint": %s}` - binaAnnoFmt = `{"key": "%s", "value": "%s", "endpoint": %s}` - spanFmt = `[{"name": "%s", "id": "%s", "parentId": "%s", "traceId": "%s", "timestamp": %d, "duration": %d, "debug": %t, "annotations": [%s], "binaryAnnotations": [%s]}]` -) - -func createEndpoint(serviveName string, ipv4 string, ipv6 string, port int) string { - return fmt.Sprintf(endpointFmt, serviveName, ipv4, ipv6, port) -} - -func createAnno(val string, ts int, endpoint string) string { - return fmt.Sprintf(annoFmt, val, ts, endpoint) -} - -func createBinAnno(key string, val string, endpoint string) string { - return fmt.Sprintf(binaAnnoFmt, key, val, endpoint) -} - -func createSpan(name string, id string, parentID string, traceID string, ts int64, duration int64, debug bool, - anno string, binAnno string, -) string { - return fmt.Sprintf(spanFmt, name, id, parentID, traceID, ts, duration, debug, anno, binAnno) -} - func TestDecodeWrongJson(t *testing.T) { spans, err := DeserializeJSON([]byte("")) require.Error(t, err) @@ -60,7 +35,7 @@ func TestDecodeWrongJson(t *testing.T) { func TestUnmarshalEndpoint(t *testing.T) { endp := &endpoint{} - err := json.Unmarshal([]byte(createEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66)), endp) + err := json.Unmarshal([]byte(m.CreateEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66)), endp) require.NoError(t, err) assert.Equal(t, "foo", endp.ServiceName) assert.Equal(t, "127.0.0.1", endp.IPv4) @@ -69,8 +44,8 @@ func TestUnmarshalEndpoint(t *testing.T) { func TestUnmarshalAnnotation(t *testing.T) { anno := &annotation{} - endpointJSON := createEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66) - err := json.Unmarshal([]byte(createAnno("bar", 154, endpointJSON)), anno) + endpointJSON := m.CreateEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66) + err := json.Unmarshal([]byte(m.CreateAnno("bar", 154, endpointJSON)), anno) require.NoError(t, err) assert.Equal(t, "bar", anno.Value) assert.Equal(t, int64(154), anno.Timestamp) @@ -79,8 +54,8 @@ func TestUnmarshalAnnotation(t *testing.T) { func TestUnmarshalBinAnnotation(t *testing.T) { binAnno := &binaryAnnotation{} - endpointJSON := createEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66) - err := json.Unmarshal([]byte(createBinAnno("foo", "bar", endpointJSON)), binAnno) + endpointJSON := m.CreateEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66) + err := json.Unmarshal([]byte(m.CreateBinAnno("foo", "bar", endpointJSON)), binAnno) require.NoError(t, err) assert.Equal(t, "foo", binAnno.Key) assert.Equal(t, "bar", binAnno.Value.(string)) @@ -153,10 +128,10 @@ func TestUnmarshalBinAnnotationNumberValue(t *testing.T) { } func TestUnmarshalSpan(t *testing.T) { - endpJSON := createEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66) - annoJSON := createAnno("cs", 1515, endpJSON) - binAnnoJSON := createBinAnno("http.status_code", "200", endpJSON) - spanJSON := createSpan("bar", "1234567891234567", "1234567891234567", "1234567891234567", 156, 15145, false, + endpJSON := m.CreateEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66) + annoJSON := m.CreateAnno("cs", 1515, endpJSON) + binAnnoJSON := m.CreateBinAnno("http.status_code", "200", endpJSON) + spanJSON := m.CreateSpan("bar", "1234567891234567", "1234567891234567", "1234567891234567", 156, 15145, false, annoJSON, binAnnoJSON) spans, err := decode([]byte(spanJSON)) @@ -173,7 +148,7 @@ func TestUnmarshalSpan(t *testing.T) { assert.Equal(t, 1, len(spans[0].Annotations)) assert.Equal(t, 1, len(spans[0].BinaryAnnotations)) - spans, err = decode([]byte(createSpan("bar", "1234567891234567", "1234567891234567", "1234567891234567", + spans, err = decode([]byte(m.CreateSpan("bar", "1234567891234567", "1234567891234567", "1234567891234567", 156, 15145, false, "", ""))) require.NoError(t, err) assert.Equal(t, 1, len(spans)) @@ -183,34 +158,31 @@ func TestUnmarshalSpan(t *testing.T) { func TestIncorrectSpanIds(t *testing.T) { // id missing - spanJSON := createSpan("bar", "", "1", "2", 156, 15145, false, "", "") + spanJSON := m.CreateSpan("bar", "", "1", "2", 156, 15145, false, "", "") spans, err := DeserializeJSON([]byte(spanJSON)) require.Error(t, err) assert.Equal(t, "strconv.ParseUint: parsing \"\": invalid syntax", err.Error()) assert.Nil(t, spans) // id longer than 32 - spanJSON = createSpan("bar", "123456789123456712345678912345678", "1", "2", + spanJSON = m.CreateSpan("bar", "123456789123456712345678912345678", "1", "2", 156, 15145, false, "", "") spans, err = DeserializeJSON([]byte(spanJSON)) require.Error(t, err) assert.Equal(t, "SpanID cannot be longer than 16 hex characters: 123456789123456712345678912345678", err.Error()) assert.Nil(t, spans) // traceId missing - spanJSON = createSpan("bar", "2", "1", "", 156, 15145, false, - "", "") + spanJSON = m.CreateSpan("bar", "2", "1", "", 156, 15145, false, "", "") spans, err = DeserializeJSON([]byte(spanJSON)) require.Error(t, err) assert.Equal(t, "strconv.ParseUint: parsing \"\": invalid syntax", err.Error()) assert.Nil(t, spans) // 128 bit traceId - spanJSON = createSpan("bar", "2", "1", "12345678912345671234567891234567", 156, 15145, false, - "", "") + spanJSON = m.CreateSpan("bar", "2", "1", "12345678912345671234567891234567", 156, 15145, false, "", "") spans, err = DeserializeJSON([]byte(spanJSON)) require.NoError(t, err) assert.NotNil(t, spans) // wrong 128 bit traceId - spanJSON = createSpan("bar", "22", "12", "#2345678912345671234567891234562", 156, 15145, false, - "", "") + spanJSON = m.CreateSpan("bar", "22", "12", "#2345678912345671234567891234562", 156, 15145, false, "", "") spans, err = DeserializeJSON([]byte(spanJSON)) require.Error(t, err) assert.Nil(t, spans) diff --git a/cmd/collector/app/zipkin/jsonv2.go b/cmd/collector/app/zipkin/zipkindeser/jsonv2.go similarity index 97% rename from cmd/collector/app/zipkin/jsonv2.go rename to cmd/collector/app/zipkin/zipkindeser/jsonv2.go index 35a15f70073..d4cf454e873 100644 --- a/cmd/collector/app/zipkin/jsonv2.go +++ b/cmd/collector/app/zipkin/zipkindeser/jsonv2.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package zipkin +package zipkindeser import ( "github.com/jaegertracing/jaeger/model" @@ -20,7 +20,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -func spansV2ToThrift(spans models.ListOfSpans) ([]*zipkincore.Span, error) { +// SpansV2ToThrift converts Zipkin V2 JSON to Zipkin Thrift. +func SpansV2ToThrift(spans models.ListOfSpans) ([]*zipkincore.Span, error) { tSpans := make([]*zipkincore.Span, 0, len(spans)) for _, span := range spans { tSpan, err := spanV2ToThrift(span) diff --git a/cmd/collector/app/zipkin/jsonv2_test.go b/cmd/collector/app/zipkin/zipkindeser/jsonv2_test.go similarity index 97% rename from cmd/collector/app/zipkin/jsonv2_test.go rename to cmd/collector/app/zipkin/zipkindeser/jsonv2_test.go index 610fa292def..bb72507932a 100644 --- a/cmd/collector/app/zipkin/jsonv2_test.go +++ b/cmd/collector/app/zipkin/zipkindeser/jsonv2_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package zipkin +package zipkindeser import ( "os" @@ -31,7 +31,7 @@ import ( func TestFixtures(t *testing.T) { var spans models.ListOfSpans loadJSON(t, "fixtures/zipkin_01.json", &spans) - tSpans, err := spansV2ToThrift(spans) + tSpans, err := SpansV2ToThrift(spans) require.NoError(t, err) assert.Equal(t, len(tSpans), 1) var pid int64 = 1 @@ -58,7 +58,7 @@ func TestFixtures(t *testing.T) { func TestLCFromLocalEndpoint(t *testing.T) { var spans models.ListOfSpans loadJSON(t, "fixtures/zipkin_02.json", &spans) - tSpans, err := spansV2ToThrift(spans) + tSpans, err := SpansV2ToThrift(spans) require.NoError(t, err) assert.Equal(t, len(tSpans), 1) var ts int64 = 1 @@ -78,7 +78,7 @@ func TestLCFromLocalEndpoint(t *testing.T) { func TestMissingKafkaEndpoint(t *testing.T) { var spans models.ListOfSpans loadJSON(t, "fixtures/zipkin_03.json", &spans) - tSpans, err := spansV2ToThrift(spans) + tSpans, err := SpansV2ToThrift(spans) require.NoError(t, err) assert.Equal(t, 1, len(tSpans)) var ts int64 = 1597704629675602 @@ -197,7 +197,7 @@ func TestErrEndpoints(t *testing.T) { func TestErrSpans(t *testing.T) { id := "z" - tSpans, err := spansV2ToThrift(models.ListOfSpans{&models.Span{ID: &id}}) + tSpans, err := SpansV2ToThrift(models.ListOfSpans{&models.Span{ID: &id}}) require.Error(t, err) require.Nil(t, tSpans) assert.Equal(t, err.Error(), "strconv.ParseUint: parsing \"z\": invalid syntax") diff --git a/cmd/collector/app/zipkin/protov2.go b/cmd/collector/app/zipkin/zipkindeser/protov2.go similarity index 97% rename from cmd/collector/app/zipkin/protov2.go rename to cmd/collector/app/zipkin/zipkindeser/protov2.go index bce0298a752..47dbf5c1ba1 100644 --- a/cmd/collector/app/zipkin/protov2.go +++ b/cmd/collector/app/zipkin/zipkindeser/protov2.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package zipkin +package zipkindeser import ( "encoding/binary" @@ -24,8 +24,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -// Converts Zipkin Protobuf spans to Thrift model -func protoSpansV2ToThrift(listOfSpans *zipkinProto.ListOfSpans) ([]*zipkincore.Span, error) { +// ProtoSpansV2ToThrift converts Zipkin Protobuf spans to Thrift model +func ProtoSpansV2ToThrift(listOfSpans *zipkinProto.ListOfSpans) ([]*zipkincore.Span, error) { tSpans := make([]*zipkincore.Span, 0, len(listOfSpans.Spans)) for _, span := range listOfSpans.Spans { tSpan, err := protoSpanV2ToThrift(span) diff --git a/cmd/collector/app/zipkin/protov2_test.go b/cmd/collector/app/zipkin/zipkindeser/protov2_test.go similarity index 97% rename from cmd/collector/app/zipkin/protov2_test.go rename to cmd/collector/app/zipkin/zipkindeser/protov2_test.go index eec8c8590e5..1e4fdf1aa2e 100644 --- a/cmd/collector/app/zipkin/protov2_test.go +++ b/cmd/collector/app/zipkin/zipkindeser/protov2_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package zipkin +package zipkindeser import ( "crypto/rand" @@ -30,7 +30,7 @@ import ( func TestProtoSpanFixtures(t *testing.T) { var spans zipkinProto.ListOfSpans loadJSON(t, "fixtures/zipkin_proto_01.json", &spans) - tSpans, err := protoSpansV2ToThrift(&spans) + tSpans, err := ProtoSpansV2ToThrift(&spans) require.NoError(t, err) assert.Equal(t, len(tSpans), 1) var pid int64 = 1 @@ -57,7 +57,7 @@ func TestProtoSpanFixtures(t *testing.T) { func TestLCFromProtoSpanLocalEndpoint(t *testing.T) { var spans zipkinProto.ListOfSpans loadProto(t, "fixtures/zipkin_proto_02.json", &spans) - tSpans, err := protoSpansV2ToThrift(&spans) + tSpans, err := ProtoSpansV2ToThrift(&spans) require.NoError(t, err) assert.Equal(t, len(tSpans), 1) var ts int64 = 1 diff --git a/cmd/collector/app/zipkin/zipkindeser/zipkindesermocks/mocks.go b/cmd/collector/app/zipkin/zipkindeser/zipkindesermocks/mocks.go new file mode 100644 index 00000000000..2466a92e4b2 --- /dev/null +++ b/cmd/collector/app/zipkin/zipkindeser/zipkindesermocks/mocks.go @@ -0,0 +1,49 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkindesermocks + +import "fmt" + +var ( + endpointFmt = `{"serviceName": "%s", "ipv4": "%s", "ipv6": "%s", "port": %d}` + annoFmt = `{"value": "%s", "timestamp": %d, "endpoint": %s}` + binaAnnoFmt = `{"key": "%s", "value": "%s", "endpoint": %s}` + spanFmt = `[{"name": "%s", "id": "%s", "parentId": "%s", "traceId": "%s", "timestamp": %d, "duration": %d, "debug": %t, "annotations": [%s], "binaryAnnotations": [%s]}]` +) + +// CreateEndpoint builds endpoint JSON. +func CreateEndpoint(serviceName string, ipv4 string, ipv6 string, port int) string { + return fmt.Sprintf(endpointFmt, serviceName, ipv4, ipv6, port) +} + +// CreateAnno builds annotation JSON. +func CreateAnno(val string, ts int, endpoint string) string { + return fmt.Sprintf(annoFmt, val, ts, endpoint) +} + +// CreateBinAnno builds binary annotation JSON. +func CreateBinAnno(key string, val string, endpoint string) string { + return fmt.Sprintf(binaAnnoFmt, key, val, endpoint) +} + +// CreateSpan builds span JSON. +func CreateSpan( + name, id, parentID, traceID string, + ts, duration int64, + debug bool, + anno, binAnno string, +) string { + return fmt.Sprintf(spanFmt, name, id, parentID, traceID, ts, duration, debug, anno, binAnno) +} diff --git a/cmd/collector/app/zipkin/zipkindeser/zipkindesermocks/mocks_test.go b/cmd/collector/app/zipkin/zipkindeser/zipkindesermocks/mocks_test.go new file mode 100644 index 00000000000..4557043e1a6 --- /dev/null +++ b/cmd/collector/app/zipkin/zipkindeser/zipkindesermocks/mocks_test.go @@ -0,0 +1,36 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkindesermocks + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// Fake test to provide test coverage. +func TestMocks(t *testing.T) { + s := CreateEndpoint("serviceName", "ipv4", "ipv6", 1) + assert.Equal(t, `{"serviceName": "serviceName", "ipv4": "ipv4", "ipv6": "ipv6", "port": 1}`, s) + + s = CreateAnno("val", 100, `{"endpoint": "x"}`) + assert.Equal(t, `{"value": "val", "timestamp": 100, "endpoint": {"endpoint": "x"}}`, s) + + s = CreateBinAnno("key", "val", `{"endpoint": "x"}`) + assert.Equal(t, `{"key": "key", "value": "val", "endpoint": {"endpoint": "x"}}`, s) + + s = CreateSpan("name", "id", "pid", "tid", 100, 100, false, "anno", "binAnno") + assert.Equal(t, `[{"name": "name", "id": "id", "parentId": "pid", "traceId": "tid", "timestamp": 100, "duration": 100, "debug": false, "annotations": [anno], "binaryAnnotations": [binAnno]}]`, s) +}