Skip to content

Commit

Permalink
Fix duplicate logs across resources (#5803)
Browse files Browse the repository at this point in the history
1. Create scope map with resource key to map the correct log record. 
2. Add test case with different resource and scope combination

Fixes #5782 

### Benchmarks

```
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/transform
               │   old.txt   │               new.txt               │
               │   sec/op    │   sec/op     vs base                │
ResourceLogs-8   3.266µ ± 3%   1.100µ ± 5%  -66.33% (p=0.000 n=10)

               │   old.txt    │               new.txt                │
               │     B/op     │     B/op      vs base                │
ResourceLogs-8   8.297Ki ± 0%   2.430Ki ± 0%  -70.72% (p=0.000 n=10)

               │   old.txt   │              new.txt               │
               │  allocs/op  │ allocs/op   vs base                │
ResourceLogs-8   178.00 ± 0%   52.00 ± 0%  -70.79% (p=0.000 n=10)
```

---------

Co-authored-by: Sam Xie <sam@samxie.me>
  • Loading branch information
pree-dew and XSAM committed Sep 17, 2024
1 parent 42fd8fe commit 534ce5a
Show file tree
Hide file tree
Showing 7 changed files with 691 additions and 606 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- The race condition for multiple `FixedSize` exemplar reservoirs identified in #5814 is resolved. (#5819)
- Fix log records duplication in case of heterogeneous resource attributes by correctly mapping each log record to it's resource and scope. (#5803)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
99 changes: 38 additions & 61 deletions exporters/otlp/otlplog/otlploggrpc/internal/transform/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/transform"

import (
"sync"
"time"

cpb "go.opentelemetry.io/proto/otlp/common/v1"
Expand All @@ -28,71 +27,25 @@ func ResourceLogs(records []log.Record) []*lpb.ResourceLogs {
return nil
}

resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs)
defer func() {
clear(resMap)
resourceLogsMapPool.Put(resMap)
}()
resourceLogsMap(&resMap, records)
resMap := make(map[attribute.Distinct]*lpb.ResourceLogs)

out := make([]*lpb.ResourceLogs, 0, len(resMap))
for _, rl := range resMap {
out = append(out, rl)
type key struct {
r attribute.Distinct
is instrumentation.Scope
}
return out
}

var resourceLogsMapPool = sync.Pool{
New: func() any {
return make(map[attribute.Distinct]*lpb.ResourceLogs)
},
}
scopeMap := make(map[key]*lpb.ScopeLogs)

func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) {
var resources int
for _, r := range records {
res := r.Resource()
rl, ok := (*dst)[res.Equivalent()]
if !ok {
rl = new(lpb.ResourceLogs)
if res.Len() > 0 {
rl.Resource = &rpb.Resource{
Attributes: AttrIter(res.Iter()),
}
}
rl.SchemaUrl = res.SchemaURL()
(*dst)[res.Equivalent()] = rl
}
rl.ScopeLogs = ScopeLogs(records)
}
}

// ScopeLogs returns a slice of OTLP ScopeLogs generated from records.
func ScopeLogs(records []log.Record) []*lpb.ScopeLogs {
scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs)
defer func() {
clear(scopeMap)
scopeLogsMapPool.Put(scopeMap)
}()
scopeLogsMap(&scopeMap, records)

out := make([]*lpb.ScopeLogs, 0, len(scopeMap))
for _, sl := range scopeMap {
out = append(out, sl)
}
return out
}

var scopeLogsMapPool = sync.Pool{
New: func() any {
return make(map[instrumentation.Scope]*lpb.ScopeLogs)
},
}

func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) {
for _, r := range records {
rKey := res.Equivalent()
scope := r.InstrumentationScope()
sl, ok := (*dst)[scope]
if !ok {
k := key{
r: rKey,
is: scope,
}
sl, iOk := scopeMap[k]
if !iOk {
sl = new(lpb.ScopeLogs)
var emptyScope instrumentation.Scope
if scope != emptyScope {
Expand All @@ -102,10 +55,34 @@ func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.R
}
sl.SchemaUrl = scope.SchemaURL
}
(*dst)[scope] = sl
scopeMap[k] = sl
}

sl.LogRecords = append(sl.LogRecords, LogRecord(r))
rl, rOk := resMap[rKey]
if !rOk {
resources++
rl = new(lpb.ResourceLogs)
if res.Len() > 0 {
rl.Resource = &rpb.Resource{
Attributes: AttrIter(res.Iter()),
}
}
rl.SchemaUrl = res.SchemaURL()
resMap[rKey] = rl
}
if !iOk {
rl.ScopeLogs = append(rl.ScopeLogs, sl)
}
}

// Transform the categorized map into a slice
resLogs := make([]*lpb.ResourceLogs, 0, resources)
for _, rl := range resMap {
resLogs = append(resLogs, rl)
}

return resLogs
}

// LogRecord returns an OTLP LogRecord generated from record.
Expand Down
Loading

0 comments on commit 534ce5a

Please sign in to comment.