Skip to content

Commit

Permalink
Add support to convert loggregator events to logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jriguera committed Jul 15, 2024
1 parent 0096c24 commit 9867125
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 7 deletions.
45 changes: 44 additions & 1 deletion src/pkg/otelcolclient/otelcolclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,16 @@ type Client struct {
b *SignalBatcher
// Forward timers as traces
emitTraces bool
// Forward events as logs
emitEvents bool
}

// New creates a new Client that will batch metrics and logs.
func New(w Writer, emitTraces bool) *Client {
func New(w Writer, emitTraces bool, emitEvents bool) *Client {
return &Client{
b: NewSignalBatcher(100, 100*time.Millisecond, w),
emitTraces: emitTraces,
emitEvents: emitEvents,
}
}

Expand All @@ -138,6 +141,8 @@ func (c *Client) Write(e *loggregator_v2.Envelope) error {
c.writeTimer(e)
case *loggregator_v2.Envelope_Log:
c.writeLog(e)
case *loggregator_v2.Envelope_Event:
c.writeEvent(e)
}
return nil
}
Expand Down Expand Up @@ -180,6 +185,44 @@ func (c *Client) writeLog(e *loggregator_v2.Envelope) {
})
}

func (c *Client) writeEvent(e *loggregator_v2.Envelope) {
if !c.emitEvents {
return
}
atts := attributes(e)
body := e.GetEvent().GetBody()
title := e.GetEvent().GetTitle()
c.b.WriteLog(&logspb.ResourceLogs{
ScopeLogs: []*logspb.ScopeLogs{
{
LogRecords: []*logspb.LogRecord{
{
TimeUnixNano: uint64(e.GetTimestamp()),
Attributes: atts,
ObservedTimeUnixNano: uint64(time.Now().UnixNano()),
Body: &commonpb.AnyValue{
Value: &commonpb.AnyValue_KvlistValue{
KvlistValue: &commonpb.KeyValueList{
Values: []*commonpb.KeyValue{
{
Key: "title",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: title}},
},
{
Key: "body",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: body}},
},
},
},
},
},
},
},
},
},
})
}

// writeCounter translates a loggregator v2 Counter to OTLP and adds the metric to the pending batch.
func (c *Client) writeCounter(e *loggregator_v2.Envelope) {
atts := attributes(e)
Expand Down
78 changes: 72 additions & 6 deletions src/pkg/otelcolclient/otelcolclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ var _ = Describe("Client", func() {
100*time.Millisecond,
w,
)
c = Client{b: b, emitTraces: true}
c = Client{b: b, emitTraces: true, emitEvents: true}
})

AfterEach(func() {
Expand Down Expand Up @@ -878,18 +878,84 @@ var _ = Describe("Client", func() {
})
})

Context("when given an event", func() {
Context("when given event", func() {
BeforeEach(func() {
envelope = &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Event{}}
envelope = &loggregator_v2.Envelope{
Timestamp: 1257894000000000000,
SourceId: "fake-source-id",
InstanceId: "fake-instance-id",
Tags: map[string]string{
"origin": "fake-origin.some-vm",
},
Message: &loggregator_v2.Envelope_Event{
Event: &loggregator_v2.Event{
Title: "event title",
Body: "event body",
},
},
}
})

It("returns nil", func() {
Expect(returnedErr).NotTo(HaveOccurred())
})

It("does nothing", func() {
Expect(spyMSC.requests).NotTo(Receive())
Consistently(buf.Contents()).Should(HaveLen(0))
It("emits an event log", func() {
var lsr *collogspb.ExportLogsServiceRequest
Expect(spyLSC.requests).To(Receive(&lsr))

expectedReq := &collogspb.ExportLogsServiceRequest{
ResourceLogs: []*logspb.ResourceLogs{
{
ScopeLogs: []*logspb.ScopeLogs{
{
LogRecords: []*logspb.LogRecord{
{
ObservedTimeUnixNano: uint64(time.Now().UnixNano()),
TimeUnixNano: uint64(1257894000000000000),
Body: &commonpb.AnyValue{
Value: &commonpb.AnyValue_KvlistValue{
KvlistValue: &commonpb.KeyValueList{
Values: []*commonpb.KeyValue{
{
Key: "title",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "event title"}},
},
{
Key: "body",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "event body"}},
},
},
},
},
},
Attributes: []*commonpb.KeyValue{
{
Key: "instance_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-instance-id"}},
},
{
Key: "origin",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-origin.some-vm"}},
},
{
Key: "source_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-source-id"}},
},
},
},
},
},
},
},
},
}
dict := protocmp.SortRepeated(func(x *commonpb.KeyValue, y *commonpb.KeyValue) bool {
return x.Key < y.Key
})
Expect(lsr.ResourceLogs[0].ScopeLogs[0].LogRecords[0].ObservedTimeUnixNano).NotTo(BeZero())
expectedReq.ResourceLogs[0].ScopeLogs[0].LogRecords[0].ObservedTimeUnixNano = lsr.ResourceLogs[0].ScopeLogs[0].LogRecords[0].ObservedTimeUnixNano
Expect(cmp.Diff(lsr, expectedReq, protocmp.Transform(), dict)).To(BeEmpty())
})
})

Expand Down

0 comments on commit 9867125

Please sign in to comment.