Skip to content

Commit

Permalink
setup collector receiver
Browse files Browse the repository at this point in the history
Co-authored-by: Tim Rühsen <tim.ruhsen@elastic.co>
  • Loading branch information
dmathieu and rockdaboot committed Sep 19, 2024
1 parent 0a8979a commit a639d0a
Show file tree
Hide file tree
Showing 7 changed files with 821 additions and 27 deletions.
45 changes: 45 additions & 0 deletions collector/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package collector

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverprofiles"

"github.com/open-telemetry/opentelemetry-ebpf-profiler/collector/internal"
)

var (
typeStr = component.MustNewType("otelreceiver")

errInvalidConfig = errors.New("invalid config")
)

// NewFactory creates a factory for the receiver.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
typeStr,
internal.CreateDefaultConfig,
receiverprofiles.WithProfiles(createProfilesReceiver, component.StabilityLevelAlpha))
}

func createProfilesReceiver(
_ context.Context,
params receiver.Settings, //nolint:gocritic // we must respect the collector API
baseCfg component.Config,
nextConsumer consumerprofiles.Profiles) (receiverprofiles.Profiles, error) {
logger := params.Logger
cfg, ok := baseCfg.(*internal.Config)
if !ok {
return nil, errInvalidConfig
}

rcvr := internal.NewController(logger, nextConsumer, cfg)
return rcvr, nil
}
50 changes: 50 additions & 0 deletions collector/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package collector

import (
"context"
"testing"

"github.com/open-telemetry/opentelemetry-ebpf-profiler/collector/internal"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
)

func TestNewFactory(t *testing.T) {
f := NewFactory()
require.NotNil(t, f)
}

func TestCreateProfilesReceiver(t *testing.T) {
for _, tt := range []struct {
name string
config component.Config

wantError error
}{
{
name: "Default config",
config: internal.CreateDefaultConfig(),
},
{
name: "Nil config",
wantError: errInvalidConfig,
},
} {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

_, err := createProfilesReceiver(
context.Background(),
receivertest.NewNopSettings(),
tt.config,
consumertest.NewNop(),
)
require.ErrorIs(t, err, tt.wantError)
})
}
}
95 changes: 95 additions & 0 deletions collector/internal/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal

import (
"errors"
"fmt"
"time"

log "github.com/sirupsen/logrus"
"github.com/tklauser/numcpus"
"go.opentelemetry.io/collector/component"

tracertypes "github.com/open-telemetry/opentelemetry-ebpf-profiler/tracer/types"
)

const (
defaultProjectID = "1"
defaultHostID = 0x1234
)

// Config represents the receiver config settings within the collector's config.yaml
type Config struct {
ProjectID string `mapstructure:"project-id"`
HostID uint64 `mapstructure:"host-id"`
SecretToken string `mapstructure:"secret-token"`
CollectionAgent string `mapstructure:"collection-agent"`
Tracers string `mapstructure:"tracers"`
Tags string `mapstructure:"tags"`
BpfVerifierLogSize int `mapstructure:"bpf-log-size"`
BpfVerifierLogLevel uint `mapstructure:"bpf-log-level"`
MapScaleFactor uint8 `mapstructure:"map-scale-factor"`
Verbose bool `mapstructure:"verbose"`
DisableTLS bool `mapstructure:"disable-tls"`
NoKernelVersionCheck bool `mapstructure:"no-kernel-version-check"`
ProbabilisticThreshold uint `mapstructure:"probabilistic-threshold"`
ProbabilisticInterval time.Duration `mapstructure:"probabilistic-interval"`
EnvironmentType string `mapstructure:"environment-type"`
ReporterInterval time.Duration `mapstructure:"reporter-interval"`
MonitorInterval time.Duration `mapstructure:"monitor-interval"`
SamplesPerSecond int `mapstructure:"samples-per-second"`
SendErrorFrames bool `mapstructure:"send-error-frames"`

// Written in CreateDefaultConfig()
PresentCPUCores int
}

// Validate checks if the receiver configuration is valid.
func (cfg *Config) Validate() error {
if cfg.ReporterInterval.Seconds() < 1 {
return errors.New("the interval has to be set to at least 1 second (1s)")
}

if cfg.ProjectID == "" {
return errors.New("projectid must be set")
}

if cfg.PresentCPUCores <= 0 {
return errors.New("failed to determine number of CPUs")
}

if cfg.SamplesPerSecond <= 0 {
return errors.New("samples per second must be > 0")
}

if _, err := tracertypes.Parse(cfg.Tracers); err != nil {
return fmt.Errorf("failed to parse tracers '%s': %v", cfg.Tracers, err)
}

// todo: Add more validation

return nil
}

func CreateDefaultConfig() component.Config {
presentCores, err := numcpus.GetPresent()
if err != nil {
log.Errorf("Failed to read CPU file: %v", err)
}

// todo: export default values (currently in main.go)
return &Config{
ProjectID: defaultProjectID,
HostID: defaultHostID,
ReporterInterval: 5 * time.Second,
MonitorInterval: 5 * time.Second,
SamplesPerSecond: 20,
ProbabilisticInterval: 1 * time.Minute,
ProbabilisticThreshold: 100,
CollectionAgent: "127.0.0.1:11000", // devfiler
Tracers: "all",
PresentCPUCores: presentCores,
}
}
Loading

0 comments on commit a639d0a

Please sign in to comment.