This repository has been archived by the owner on Nov 21, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
OutgoingPhysicalMessageDiagnostics.cs
80 lines (71 loc) · 2.96 KB
/
OutgoingPhysicalMessageDiagnostics.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus.Pipeline;
namespace NServiceBus.Extensions.Diagnostics
{
public class OutgoingPhysicalMessageDiagnostics : Behavior<IOutgoingPhysicalMessageContext>
{
private readonly IActivityEnricher _activityEnricher;
private readonly DiagnosticListener _diagnosticListener;
private const string EventName = ActivityNames.OutgoingPhysicalMessage + ".Sent";
public OutgoingPhysicalMessageDiagnostics(IActivityEnricher activityEnricher)
: this(new DiagnosticListener(ActivityNames.OutgoingPhysicalMessage), activityEnricher)
{
}
public OutgoingPhysicalMessageDiagnostics(DiagnosticListener diagnosticListener,
IActivityEnricher activityEnricher)
{
_diagnosticListener = diagnosticListener;
_activityEnricher = activityEnricher;
}
public override async Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
{
var activity = context.Extensions.TryGet<ICurrentActivity>(out var currentActivity)
? currentActivity.Current
: Activity.Current;
if (activity != null)
{
_activityEnricher.Enrich(activity, context);
InjectHeaders(activity, context);
}
await next().ConfigureAwait(false);
if (_diagnosticListener.IsEnabled(EventName))
{
_diagnosticListener.Write(EventName, context);
}
}
private static void InjectHeaders(Activity activity, IOutgoingPhysicalMessageContext context)
{
if (activity.IdFormat == ActivityIdFormat.W3C)
{
if (!context.Headers.ContainsKey(Headers.TraceParentHeaderName))
{
context.Headers[Headers.TraceParentHeaderName] = activity.Id;
if (activity.TraceStateString != null)
{
context.Headers[Headers.TraceStateHeaderName] = activity.TraceStateString;
}
}
}
else
{
if (!context.Headers.ContainsKey(Headers.RequestIdHeaderName))
{
context.Headers[Headers.RequestIdHeaderName] = activity.Id;
}
}
if (!context.Headers.ContainsKey(Headers.CorrelationContextHeaderName)
&& !context.Headers.ContainsKey(Headers.BaggageHeaderName))
{
var baggage = string.Join(",", activity.Baggage.Select(item => $"{item.Key}={item.Value}"));
if (!string.IsNullOrEmpty(baggage))
{
context.Headers[Headers.CorrelationContextHeaderName] = baggage;
context.Headers[Headers.BaggageHeaderName] = baggage;
}
}
}
}
}