From 086a87864ecbbb79744f628e32a41f7716a09ae7 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 3 Oct 2018 11:57:48 -0400 Subject: [PATCH] Simplify the logic in the opencensus interceptor. (#58) --- interceptor/opencensus/opencensus.go | 49 +++++++++++++--------------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/interceptor/opencensus/opencensus.go b/interceptor/opencensus/opencensus.go index b04d719a3c8..1184930fc03 100644 --- a/interceptor/opencensus/opencensus.go +++ b/interceptor/opencensus/opencensus.go @@ -62,17 +62,6 @@ var errTraceExportProtocolViolation = errors.New("protocol violation: Export's f // Export is the gRPC method that receives streamed traces from // OpenCensus-traceproto compatible libraries/applications. func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) error { - // The first message MUST have a non-nil Node. - firstMessage, err := tes.Recv() - if err != nil { - return err - } - - if firstMessage.Node == nil { - return errTraceExportProtocolViolation - } - - // Now that we've got the node, we can start to receive streamed up spans. // The bundler will receive batches of spans i.e. []*tracepb.Span traceBundler := bundler.NewBundler((*spansAndNode)(nil), oci.batchSpanExporting) spanBufferPeriod := oci.spanBufferPeriod @@ -88,30 +77,36 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err traceBundler.DelayThreshold = spanBufferPeriod traceBundler.BundleCountThreshold = spanBufferCount - var lastNonNilNode *commonpb.Node = firstMessage.Node + // The first message MUST have a non-nil Node. + recv, err := tes.Recv() + if err != nil { + return err + } - // If the firstMessage has spans, we MUST add them - // See https://github.com/census-instrumentation/opencensus-service/issues/51 - if len(firstMessage.Spans) > 0 { - firstPayload := &spansAndNode{node: lastNonNilNode, spans: firstMessage.Spans} - traceBundler.Add(firstPayload, len(firstPayload.spans)) + // Check the condition that the first message has a non-nil Node. + if recv.Node == nil { + return errTraceExportProtocolViolation } - for { - recv, err := tes.Recv() - if err != nil { - return err - } + var lastNonNilNode *commonpb.Node + // Now that we've got the first message with a Node, we can start to receive streamed up spans. + for { // If a Node has been sent from downstream, save and use it. - node := recv.Node - if node != nil { - lastNonNilNode = node + if recv.Node != nil { + lastNonNilNode = recv.Node } // Otherwise add them to the bundler. - bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans} - traceBundler.Add(bundlerPayload, len(bundlerPayload.spans)) + if len(recv.Spans) > 0 { + bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans} + traceBundler.Add(bundlerPayload, len(bundlerPayload.spans)) + } + + recv, err = tes.Recv() + if err != nil { + return err + } } }