Skip to content

Commit

Permalink
Simplify the logic in the opencensus interceptor. (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdan Drutu committed Oct 3, 2018
1 parent d524edb commit 086a878
Showing 1 changed file with 22 additions and 27 deletions.
49 changes: 22 additions & 27 deletions interceptor/opencensus/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,6 @@ var errTraceExportProtocolViolation = errors.New("protocol violation: Export's f
// Export is the gRPC method that receives streamed traces from
// OpenCensus-traceproto compatible libraries/applications.
func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) error {
// The first message MUST have a non-nil Node.
firstMessage, err := tes.Recv()
if err != nil {
return err
}

if firstMessage.Node == nil {
return errTraceExportProtocolViolation
}

// Now that we've got the node, we can start to receive streamed up spans.
// The bundler will receive batches of spans i.e. []*tracepb.Span
traceBundler := bundler.NewBundler((*spansAndNode)(nil), oci.batchSpanExporting)
spanBufferPeriod := oci.spanBufferPeriod
Expand All @@ -88,30 +77,36 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
traceBundler.DelayThreshold = spanBufferPeriod
traceBundler.BundleCountThreshold = spanBufferCount

var lastNonNilNode *commonpb.Node = firstMessage.Node
// The first message MUST have a non-nil Node.
recv, err := tes.Recv()
if err != nil {
return err
}

// If the firstMessage has spans, we MUST add them
// See https://github.com/census-instrumentation/opencensus-service/issues/51
if len(firstMessage.Spans) > 0 {
firstPayload := &spansAndNode{node: lastNonNilNode, spans: firstMessage.Spans}
traceBundler.Add(firstPayload, len(firstPayload.spans))
// Check the condition that the first message has a non-nil Node.
if recv.Node == nil {
return errTraceExportProtocolViolation
}

for {
recv, err := tes.Recv()
if err != nil {
return err
}
var lastNonNilNode *commonpb.Node

// Now that we've got the first message with a Node, we can start to receive streamed up spans.
for {
// If a Node has been sent from downstream, save and use it.
node := recv.Node
if node != nil {
lastNonNilNode = node
if recv.Node != nil {
lastNonNilNode = recv.Node
}

// Otherwise add them to the bundler.
bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans}
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
if len(recv.Spans) > 0 {
bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans}
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
}

recv, err = tes.Recv()
if err != nil {
return err
}
}
}

Expand Down

0 comments on commit 086a878

Please sign in to comment.