Skip to content

Commit

Permalink
Introduce connectorprofiles.NewProfilesRouter (#11023)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

I missed the connector router when setting up connectorprofiles.

cc @mx-psi
  • Loading branch information
dmathieu committed Sep 3, 2024
1 parent b6a4476 commit d0fde2f
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 21 deletions.
25 changes: 25 additions & 0 deletions .chloggen/connectorprofiles-router.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: connectorprofiles

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add ProfilesRouterAndConsumer interface, and NewProfilesRouter method.

# One or more tracking issues or pull requests related to the change
issues: [11023]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
5 changes: 3 additions & 2 deletions connector/connectorprofiles/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ go 1.22.0

require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.108.1
go.opentelemetry.io/collector/component v0.108.1
go.opentelemetry.io/collector/component/componentprofiles v0.108.1
go.opentelemetry.io/collector/connector v0.108.1
go.opentelemetry.io/collector/consumer v0.108.1
go.opentelemetry.io/collector/consumer/consumerprofiles v0.108.1
go.opentelemetry.io/collector/consumer/consumertest v0.108.1
go.opentelemetry.io/collector/pdata/pprofile v0.108.1
go.opentelemetry.io/collector/pdata/testdata v0.108.1
)

require (
Expand All @@ -19,10 +22,8 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector v0.108.1 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.108.1 // indirect
go.opentelemetry.io/collector/pdata v1.14.1 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.108.1 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
Expand Down
37 changes: 37 additions & 0 deletions connector/connectorprofiles/profiles_router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package connectorprofiles // import "go.opentelemetry.io/collector/connector/connectorprofiles"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector/internal"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
)

// ProfilesRouterAndConsumer feeds the first consumerprofiles.Profiles in each of the specified pipelines.
type ProfilesRouterAndConsumer interface {
consumerprofiles.Profiles
Consumer(...component.ID) (consumerprofiles.Profiles, error)
PipelineIDs() []component.ID
privateFunc()
}

type profilesRouter struct {
consumerprofiles.Profiles
internal.BaseRouter[consumerprofiles.Profiles]
}

func NewProfilesRouter(cm map[component.ID]consumerprofiles.Profiles) ProfilesRouterAndConsumer {
consumers := make([]consumerprofiles.Profiles, 0, len(cm))
for _, cons := range cm {
consumers = append(consumers, cons)
}
return &profilesRouter{
Profiles: fanoutconsumer.NewProfiles(consumers),
BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewProfiles, cm),
}
}

func (r *profilesRouter) privateFunc() {}
157 changes: 157 additions & 0 deletions connector/connectorprofiles/profiles_router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package connectorprofiles

import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/testdata"
)

type mutatingProfilesSink struct {
*consumertest.ProfilesSink
}

func (mts *mutatingProfilesSink) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

func TestProfilesRouterMultiplexing(t *testing.T) {
var max = 20
for numIDs := 1; numIDs < max; numIDs++ {
for numCons := 1; numCons < max; numCons++ {
for numProfiles := 1; numProfiles < max; numProfiles++ {
t.Run(
fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numProfiles),
fuzzProfiles(numIDs, numCons, numProfiles),
)
}
}
}
}

func fuzzProfiles(numIDs, numCons, numProfiles int) func(*testing.T) {
return func(t *testing.T) {
allIDs := make([]component.ID, 0, numCons)
allCons := make([]consumerprofiles.Profiles, 0, numCons)
allConsMap := make(map[component.ID]consumerprofiles.Profiles)

// If any consumer is mutating, the router must report mutating
for i := 0; i < numCons; i++ {
allIDs = append(allIDs, component.MustNewIDWithName("sink", strconv.Itoa(numCons)))
// Random chance for each consumer to be mutating
if (numCons+numProfiles+i)%4 == 0 {
allCons = append(allCons, &mutatingProfilesSink{ProfilesSink: new(consumertest.ProfilesSink)})
} else {
allCons = append(allCons, new(consumertest.ProfilesSink))
}
allConsMap[allIDs[i]] = allCons[i]
}

r := NewProfilesRouter(allConsMap)
td := testdata.GenerateProfiles(1)

// Keep track of how many logs each consumer should receive.
// This will be validated after every call to RouteProfiles.
expected := make(map[component.ID]int, numCons)

for i := 0; i < numProfiles; i++ {
// Build a random set of ids (no duplicates)
randCons := make(map[component.ID]bool, numIDs)
for j := 0; j < numIDs; j++ {
// This number should be pretty random and less than numCons
conNum := (numCons + numIDs + i + j) % numCons
randCons[allIDs[conNum]] = true
}

// Convert to slice, update expectations
conIDs := make([]component.ID, 0, len(randCons))
for id := range randCons {
conIDs = append(conIDs, id)
expected[id]++
}

// Route to list of consumers
fanout, err := r.Consumer(conIDs...)
assert.NoError(t, err)
assert.NoError(t, fanout.ConsumeProfiles(context.Background(), td))

// Validate expectations for all consumers
for id := range expected {
profiles := []pprofile.Profiles{}
switch con := allConsMap[id].(type) {
case *consumertest.ProfilesSink:
profiles = con.AllProfiles()
case *mutatingProfilesSink:
profiles = con.AllProfiles()
}
assert.Len(t, profiles, expected[id])
for n := 0; n < len(profiles); n++ {
assert.EqualValues(t, td, profiles[n])
}
}
}
}
}

func TestProfilessRouterConsumer(t *testing.T) {
ctx := context.Background()
td := testdata.GenerateProfiles(1)

fooID := component.MustNewID("foo")
barID := component.MustNewID("bar")

foo := new(consumertest.ProfilesSink)
bar := new(consumertest.ProfilesSink)
r := NewProfilesRouter(map[component.ID]consumerprofiles.Profiles{fooID: foo, barID: bar})

rcs := r.PipelineIDs()
assert.Len(t, rcs, 2)
assert.ElementsMatch(t, []component.ID{fooID, barID}, rcs)

assert.Len(t, foo.AllProfiles(), 0)
assert.Len(t, bar.AllProfiles(), 0)

both, err := r.Consumer(fooID, barID)
assert.NotNil(t, both)
assert.NoError(t, err)

assert.NoError(t, both.ConsumeProfiles(ctx, td))
assert.Len(t, foo.AllProfiles(), 1)
assert.Len(t, bar.AllProfiles(), 1)

fooOnly, err := r.Consumer(fooID)
assert.NotNil(t, fooOnly)
assert.NoError(t, err)

assert.NoError(t, fooOnly.ConsumeProfiles(ctx, td))
assert.Len(t, foo.AllProfiles(), 2)
assert.Len(t, bar.AllProfiles(), 1)

barOnly, err := r.Consumer(barID)
assert.NotNil(t, barOnly)
assert.NoError(t, err)

assert.NoError(t, barOnly.ConsumeProfiles(ctx, td))
assert.Len(t, foo.AllProfiles(), 2)
assert.Len(t, bar.AllProfiles(), 2)

none, err := r.Consumer()
assert.Nil(t, none)
assert.Error(t, err)

fake, err := r.Consumer(component.MustNewID("fake"))
assert.Nil(t, fake)
assert.Error(t, err)
}
20 changes: 10 additions & 10 deletions connector/router.go → connector/internal/router.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package connector // import "go.opentelemetry.io/collector/connector"
package internal // import "go.opentelemetry.io/collector/connector/internal"

import (
"fmt"
Expand All @@ -11,36 +11,36 @@ import (
"go.opentelemetry.io/collector/component"
)

type baseRouter[T any] struct {
type BaseRouter[T any] struct {
fanout func([]T) T
consumers map[component.ID]T
Consumers map[component.ID]T
}

func newBaseRouter[T any](fanout func([]T) T, cm map[component.ID]T) baseRouter[T] {
func NewBaseRouter[T any](fanout func([]T) T, cm map[component.ID]T) BaseRouter[T] {
consumers := make(map[component.ID]T, len(cm))
for k, v := range cm {
consumers[k] = v
}
return baseRouter[T]{fanout: fanout, consumers: consumers}
return BaseRouter[T]{fanout: fanout, Consumers: consumers}
}

func (r *baseRouter[T]) PipelineIDs() []component.ID {
ids := make([]component.ID, 0, len(r.consumers))
for id := range r.consumers {
func (r *BaseRouter[T]) PipelineIDs() []component.ID {
ids := make([]component.ID, 0, len(r.Consumers))
for id := range r.Consumers {
ids = append(ids, id)
}
return ids
}

func (r *baseRouter[T]) Consumer(pipelineIDs ...component.ID) (T, error) {
func (r *BaseRouter[T]) Consumer(pipelineIDs ...component.ID) (T, error) {
var ret T
if len(pipelineIDs) == 0 {
return ret, fmt.Errorf("missing consumers")
}
consumers := make([]T, 0, len(pipelineIDs))
var errors error
for _, pipelineID := range pipelineIDs {
c, ok := r.consumers[pipelineID]
c, ok := r.Consumers[pipelineID]
if ok {
consumers = append(consumers, c)
} else {
Expand Down
11 changes: 6 additions & 5 deletions connector/logs_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector/internal"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
)
Expand All @@ -23,7 +24,7 @@ type LogsRouterAndConsumer interface {

type logsRouter struct {
consumer.Logs
baseRouter[consumer.Logs]
internal.BaseRouter[consumer.Logs]
}

func NewLogsRouter(cm map[component.ID]consumer.Logs) LogsRouterAndConsumer {
Expand All @@ -33,13 +34,13 @@ func NewLogsRouter(cm map[component.ID]consumer.Logs) LogsRouterAndConsumer {
}
return &logsRouter{
Logs: fanoutconsumer.NewLogs(consumers),
baseRouter: newBaseRouter(fanoutconsumer.NewLogs, cm),
BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewLogs, cm),
}
}

func (r *logsRouter) PipelineIDs() []component.ID {
ids := make([]component.ID, 0, len(r.consumers))
for id := range r.consumers {
ids := make([]component.ID, 0, len(r.Consumers))
for id := range r.Consumers {
ids = append(ids, id)
}
return ids
Expand All @@ -52,7 +53,7 @@ func (r *logsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Logs, error
consumers := make([]consumer.Logs, 0, len(pipelineIDs))
var errors error
for _, pipelineID := range pipelineIDs {
c, ok := r.consumers[pipelineID]
c, ok := r.Consumers[pipelineID]
if ok {
consumers = append(consumers, c)
} else {
Expand Down
5 changes: 3 additions & 2 deletions connector/metrics_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package connector // import "go.opentelemetry.io/collector/connector"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector/internal"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
)
Expand All @@ -19,7 +20,7 @@ type MetricsRouterAndConsumer interface {

type metricsRouter struct {
consumer.Metrics
baseRouter[consumer.Metrics]
internal.BaseRouter[consumer.Metrics]
}

func NewMetricsRouter(cm map[component.ID]consumer.Metrics) MetricsRouterAndConsumer {
Expand All @@ -29,7 +30,7 @@ func NewMetricsRouter(cm map[component.ID]consumer.Metrics) MetricsRouterAndCons
}
return &metricsRouter{
Metrics: fanoutconsumer.NewMetrics(consumers),
baseRouter: newBaseRouter(fanoutconsumer.NewMetrics, cm),
BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewMetrics, cm),
}
}

Expand Down
5 changes: 3 additions & 2 deletions connector/traces_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package connector // import "go.opentelemetry.io/collector/connector"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector/internal"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
)
Expand All @@ -19,7 +20,7 @@ type TracesRouterAndConsumer interface {

type tracesRouter struct {
consumer.Traces
baseRouter[consumer.Traces]
internal.BaseRouter[consumer.Traces]
}

func NewTracesRouter(cm map[component.ID]consumer.Traces) TracesRouterAndConsumer {
Expand All @@ -29,7 +30,7 @@ func NewTracesRouter(cm map[component.ID]consumer.Traces) TracesRouterAndConsume
}
return &tracesRouter{
Traces: fanoutconsumer.NewTraces(consumers),
baseRouter: newBaseRouter(fanoutconsumer.NewTraces, cm),
BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewTraces, cm),
}
}

Expand Down

0 comments on commit d0fde2f

Please sign in to comment.