Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service] Make graph package public #8111

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/mx-psi_graph.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Makes `pipelines` package public to allow overrriding telemetry providers"

# One or more tracking issues or pull requests related to the change
issues: [4970]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 2 additions & 2 deletions service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/graph"
"go.opentelemetry.io/collector/service/pipelines"
)

var _ component.Host = (*serviceHost)(nil)
Expand All @@ -26,7 +26,7 @@ type serviceHost struct {

buildInfo component.BuildInfo

pipelines *graph.Graph
pipelines *pipelines.Pipelines
serviceExtensions *extensions.Extensions
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"
package pipelines // import "go.opentelemetry.io/collector/service/pipelines"

import (
"context"
Expand Down
122 changes: 64 additions & 58 deletions service/internal/graph/graph.go → service/pipelines/pipelines.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"
package pipelines // import "go.opentelemetry.io/collector/service/pipelines"

import (
"context"
Expand All @@ -22,74 +22,78 @@ import (
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/pipelines"
)

// Settings holds configuration for building builtPipelines.
// Settings holds configuration for building a Graph.
type Settings struct {
// Telemetry specifies the telemetry settings.
Telemetry component.TelemetrySettings
// BuildInfo provides collector start information.
BuildInfo component.BuildInfo

ReceiverBuilder *receiver.Builder
// ReceiverBuilder is a helper struct that given a set of Configs and Factories helps with creating receivers.
ReceiverBuilder *receiver.Builder
// ProcessorBuilder is a helper struct that given a set of Configs and Factories helps with creating processors.
ProcessorBuilder *processor.Builder
ExporterBuilder *exporter.Builder
// ExporterBuilder is a helper struct that given a set of Configs and Factories helps with creating exporters.
ExporterBuilder *exporter.Builder
// ConnectorBuilder is a helper struct that given a set of Configs and Factories helps with creating connectors.
ConnectorBuilder *connector.Builder

// PipelineConfigs is a map of component.ID to PipelineConfig.
PipelineConfigs pipelines.Config
}

type Graph struct {
// Pipelines of collector components.
type Pipelines struct {
// All component instances represented as nodes, with directed edges indicating data flow.
componentGraph *simple.DirectedGraph

// Keep track of how nodes relate to pipelines, so we can declare edges in the graph.
pipelines map[component.ID]*pipelineNodes
}

func Build(ctx context.Context, set Settings) (*Graph, error) {
pipelines := &Graph{
// New builds a collector pipeline set.
func New(ctx context.Context, set Settings, cfg Config) (*Pipelines, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mildly confused by the PR because the Config type is already in the new package (and it's not a struct, like most Config types are in this code base). Then, there is now a Settings alongside Config and in the context of this PR it's no longer clear why we have both a settings and a config type. Can you explain?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's no longer clear why we have both a settings and a config type

In the opentelemetry-collector codebase, Config types specify configuration that would be set by the user in the Collector YAML configuration, while Settings specifies configuration that would be set only in the context of using the Go API directly.

This change was applied because of @bogdandrutu's suggestion here #8111 (comment) to make it in line with the extensions.

it's not a struct, like most Config types are in this code base

I think the important point of Config types is that they can be unmarshaled; I am not sure if there are other examples of configuration not being a struct but in my mind this is not a problem.

pipelines := &Pipelines{
componentGraph: simple.NewDirectedGraph(),
pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)),
pipelines: make(map[component.ID]*pipelineNodes, len(cfg)),
}
for pipelineID := range set.PipelineConfigs {
for pipelineID := range cfg {
pipelines.pipelines[pipelineID] = &pipelineNodes{
receivers: make(map[int64]graph.Node),
exporters: make(map[int64]graph.Node),
}
}
if err := pipelines.createNodes(set); err != nil {
if err := pipelines.createNodes(set, cfg); err != nil {
return nil, err
}
pipelines.createEdges()
return pipelines, pipelines.buildComponents(ctx, set)
}

// Creates a node for each instance of a component and adds it to the graph
func (g *Graph) createNodes(set Settings) error {
func (p *Pipelines) createNodes(set Settings, cfg Config) error {
// Build a list of all connectors for easy reference
connectors := make(map[component.ID]struct{})

// Keep track of connectors and where they are used. (map[connectorID][]pipelineID)
connectorsAsExporter := make(map[component.ID][]component.ID)
connectorsAsReceiver := make(map[component.ID][]component.ID)

for pipelineID, pipelineCfg := range set.PipelineConfigs {
pipe := g.pipelines[pipelineID]
for pipelineID, pipelineCfg := range cfg {
pipe := p.pipelines[pipelineID]
for _, recvID := range pipelineCfg.Receivers {
if set.ConnectorBuilder.IsConfigured(recvID) {
connectors[recvID] = struct{}{}
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
continue
}
rcvrNode := g.createReceiver(pipelineID.Type(), recvID)
rcvrNode := p.createReceiver(pipelineID.Type(), recvID)
pipe.receivers[rcvrNode.ID()] = rcvrNode
}

pipe.capabilitiesNode = newCapabilitiesNode(pipelineID)

for _, procID := range pipelineCfg.Processors {
pipe.processors = append(pipe.processors, g.createProcessor(pipelineID, procID))
pipe.processors = append(pipe.processors, p.createProcessor(pipelineID, procID))
}

pipe.fanOutNode = newFanOutNode(pipelineID)
Expand All @@ -100,7 +104,7 @@ func (g *Graph) createNodes(set Settings) error {
connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
continue
}
expNode := g.createExporter(pipelineID.Type(), exprID)
expNode := p.createExporter(pipelineID.Type(), exprID)
pipe.exporters[expNode.ID()] = expNode
}
}
Expand Down Expand Up @@ -155,93 +159,93 @@ func (g *Graph) createNodes(set Settings) error {
// Connector is not supported for this combination, but we know it is used correctly elsewhere
continue
}
connNode := g.createConnector(eID, rID, connID)
g.pipelines[eID].exporters[connNode.ID()] = connNode
g.pipelines[rID].receivers[connNode.ID()] = connNode
connNode := p.createConnector(eID, rID, connID)
p.pipelines[eID].exporters[connNode.ID()] = connNode
p.pipelines[rID].receivers[connNode.ID()] = connNode
}
}
}
return nil
}

func (g *Graph) createReceiver(pipelineType component.DataType, recvID component.ID) *receiverNode {
func (p *Pipelines) createReceiver(pipelineType component.DataType, recvID component.ID) *receiverNode {
rcvrNode := newReceiverNode(pipelineType, recvID)
if node := g.componentGraph.Node(rcvrNode.ID()); node != nil {
if node := p.componentGraph.Node(rcvrNode.ID()); node != nil {
return node.(*receiverNode)
}
g.componentGraph.AddNode(rcvrNode)
p.componentGraph.AddNode(rcvrNode)
return rcvrNode
}

func (g *Graph) createProcessor(pipelineID, procID component.ID) *processorNode {
func (p *Pipelines) createProcessor(pipelineID, procID component.ID) *processorNode {
procNode := newProcessorNode(pipelineID, procID)
g.componentGraph.AddNode(procNode)
p.componentGraph.AddNode(procNode)
return procNode
}

func (g *Graph) createExporter(pipelineType component.DataType, exprID component.ID) *exporterNode {
func (p *Pipelines) createExporter(pipelineType component.DataType, exprID component.ID) *exporterNode {
expNode := newExporterNode(pipelineType, exprID)
if node := g.componentGraph.Node(expNode.ID()); node != nil {
if node := p.componentGraph.Node(expNode.ID()); node != nil {
return node.(*exporterNode)
}
g.componentGraph.AddNode(expNode)
p.componentGraph.AddNode(expNode)
return expNode
}

func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode {
func (p *Pipelines) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode {
connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID)
if node := g.componentGraph.Node(connNode.ID()); node != nil {
if node := p.componentGraph.Node(connNode.ID()); node != nil {
return node.(*connectorNode)
}
g.componentGraph.AddNode(connNode)
p.componentGraph.AddNode(connNode)
return connNode
}

func (g *Graph) createEdges() {
for _, pg := range g.pipelines {
func (p *Pipelines) createEdges() {
for _, pg := range p.pipelines {
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode))
p.componentGraph.SetEdge(p.componentGraph.NewEdge(receiver, pg.capabilitiesNode))
}

var from, to graph.Node
from = pg.capabilitiesNode
for _, processor := range pg.processors {
to = processor
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
p.componentGraph.SetEdge(p.componentGraph.NewEdge(from, to))
from = processor
}
to = pg.fanOutNode
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
p.componentGraph.SetEdge(p.componentGraph.NewEdge(from, to))

for _, exporter := range pg.exporters {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.fanOutNode, exporter))
p.componentGraph.SetEdge(p.componentGraph.NewEdge(pg.fanOutNode, exporter))
}
}
}

func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
nodes, err := topo.Sort(g.componentGraph)
func (p *Pipelines) buildComponents(ctx context.Context, set Settings) error {
nodes, err := topo.Sort(p.componentGraph)
if err != nil {
return cycleErr(err, topo.DirectedCyclesIn(g.componentGraph))
return cycleErr(err, topo.DirectedCyclesIn(p.componentGraph))
}

for i := len(nodes) - 1; i >= 0; i-- {
node := nodes[i]
switch n := node.(type) {
case *receiverNode:
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, p.nextConsumers(n.ID()))
case *processorNode:
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0])
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, p.nextConsumers(n.ID())[0])
case *exporterNode:
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder)
case *connectorNode:
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID()))
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ConnectorBuilder, p.nextConsumers(n.ID()))
case *capabilitiesNode:
capability := consumer.Capabilities{MutatesData: false}
for _, proc := range g.pipelines[n.pipelineID].processors {
for _, proc := range p.pipelines[n.pipelineID].processors {
capability.MutatesData = capability.MutatesData || proc.getConsumer().Capabilities().MutatesData
}
next := g.nextConsumers(n.ID())[0]
next := p.nextConsumers(n.ID())[0]
switch n.pipelineID.Type() {
case component.DataTypeTraces:
cc := capabilityconsumer.NewTraces(next.(consumer.Traces), capability)
Expand All @@ -257,7 +261,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
n.ConsumeLogsFunc = cc.ConsumeLogs
}
case *fanOutNode:
nexts := g.nextConsumers(n.ID())
nexts := p.nextConsumers(n.ID())
switch n.pipelineID.Type() {
case component.DataTypeTraces:
consumers := make([]consumer.Traces, 0, len(nexts))
Expand Down Expand Up @@ -288,8 +292,8 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
}

// Find all nodes
func (g *Graph) nextConsumers(nodeID int64) []baseConsumer {
nextNodes := g.componentGraph.From(nodeID)
func (p *Pipelines) nextConsumers(nodeID int64) []baseConsumer {
nextNodes := p.componentGraph.From(nodeID)
nexts := make([]baseConsumer, 0, nextNodes.Len())
for nextNodes.Next() {
nexts = append(nexts, nextNodes.Node().(consumerNode).getConsumer())
Expand All @@ -316,8 +320,9 @@ type pipelineNodes struct {
exporters map[int64]graph.Node
}

func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
nodes, err := topo.Sort(g.componentGraph)
// StartAll components in the graph in topological order.
func (p *Pipelines) StartAll(ctx context.Context, host component.Host) error {
nodes, err := topo.Sort(p.componentGraph)
if err != nil {
return err
}
Expand All @@ -338,8 +343,9 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
return nil
}

func (g *Graph) ShutdownAll(ctx context.Context) error {
nodes, err := topo.Sort(g.componentGraph)
// ShutdownAll components in the graph in topological order.
func (p *Pipelines) ShutdownAll(ctx context.Context) error {
nodes, err := topo.Sort(p.componentGraph)
if err != nil {
return err
}
Expand All @@ -366,16 +372,16 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
// connector. See https://github.com/open-telemetry/opentelemetry-collector/issues/7370 and
// https://github.com/open-telemetry/opentelemetry-collector/pull/7390#issuecomment-1483710184
// for additional information.
func (g *Graph) GetExporters() map[component.DataType]map[component.ID]component.Component {
func (p *Pipelines) GetExporters() map[component.DataType]map[component.ID]component.Component {
exportersMap := make(map[component.DataType]map[component.ID]component.Component)
exportersMap[component.DataTypeTraces] = make(map[component.ID]component.Component)
exportersMap[component.DataTypeMetrics] = make(map[component.ID]component.Component)
exportersMap[component.DataTypeLogs] = make(map[component.ID]component.Component)

for _, pg := range g.pipelines {
for _, pg := range p.pipelines {
for _, expNode := range pg.exporters {
// Skip connectors, otherwise individual components can introduce cycles
if expNode, ok := g.componentGraph.Node(expNode.ID()).(*exporterNode); ok {
if expNode, ok := p.componentGraph.Node(expNode.ID()).(*exporterNode); ok {
exportersMap[expNode.pipelineType][expNode.componentID] = expNode.Component
}
}
Expand Down
Loading
Loading