Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/ecsobserver] Add exporter to convert task to target #3333

Merged
merged 7 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions extension/observer/ecsobserver/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsobserver

import (
"errors"

"github.com/aws/aws-sdk-go/aws"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/errctx"
)

// error.go defines common error interfaces and util methods for generating reports
// for log and metrics that can be used for debugging.

const (
errKeyTask = "task"
errKeyTarget = "target"
)

type errWithAttributes interface {
// message does not include attributes like task arn etc.
// and expect the caller extract them using getters.
message() string
// zapFields will be logged as json attribute and allows searching and filter backend like cloudwatch.
// For example { $.ErrScope == "Target" } list all the error whose scope is a (scrape) target.
zapFields() []zap.Field
}

func printErrors(logger *zap.Logger, err error) {
merr := multierr.Errors(err)
if merr == nil {
return
}

for _, err := range merr {
m := err.Error()
// Use the short message, this makes searching the code via error message easier
// as additional info are flushed as fields.
var errAttr errWithAttributes
if errors.As(err, &errAttr) {
m = errAttr.message()
}
fields, scope := extractErrorFields(err)
fields = append(fields, zap.String("ErrScope", scope))
logger.Error(m, fields...)
}
}

func extractErrorFields(err error) ([]zap.Field, string) {
var fields []zap.Field
scope := "Unknown"
var errAttr errWithAttributes
// Stop early because we are only attaching value for our internal errors.
if !errors.As(err, &errAttr) {
return fields, scope
}
fields = errAttr.zapFields()
v, ok := errctx.ValueFrom(err, errKeyTask)
if ok {
// Rename ok to tok because linter says it shadows outer ok.
// Though the linter seems to allow the similar block to shadow...
if task, tok := v.(*Task); tok {
fields = append(fields, zap.String("TaskArn", aws.StringValue(task.Task.TaskArn)))
scope = "Task"
}
}
v, ok = errctx.ValueFrom(err, errKeyTarget)
if ok {
if target, ok := v.(MatchedTarget); ok {
// TODO: change to string once another PR for matcher got merged
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/3386 defines Stringer
fields = append(fields, zap.Int("MatcherType", int(target.MatcherType)))
scope = "Target"
}
}
return fields, scope
}
28 changes: 28 additions & 0 deletions extension/observer/ecsobserver/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsobserver

import (
"testing"

"go.uber.org/zap"
)

func TestSetInvalidError(t *testing.T) {
printErrors(zap.NewExample(), nil) // you know, for coverage
// The actual test cen be found in the following locations:
//
// exporter_test.go where we filter logs by error scope
}
117 changes: 116 additions & 1 deletion extension/observer/ecsobserver/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,19 @@

package ecsobserver

import "fmt"
import (
"fmt"

"github.com/aws/aws-sdk-go/aws"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/errctx"
)

const (
defaultMetricsPath = "/metrics"
)

// CommonExporterConfig should be embedded into filter config.
// They set labels like job, metrics_path etc. that can override prometheus default.
Expand Down Expand Up @@ -45,3 +57,106 @@ type commonExportSetting struct {
func (s *commonExportSetting) hasContainerPort(containerPort int) bool {
return s.metricsPorts[containerPort]
}

// taskExporter converts annotated Task into PrometheusECSTarget.
type taskExporter struct {
logger *zap.Logger
cluster string
}

func newTaskExporter(logger *zap.Logger, cluster string) *taskExporter {
return &taskExporter{
logger: logger,
cluster: cluster,
}
}

// exportTasks loops a list of tasks and export prometheus scrape targets.
// It keeps track of error but does NOT stop when error occurs.
// The returned targets are valid, invalid targets are saved in a multi error.
// Caller can ignore the error because the only source is failing to get ip and port.
// The error(s) can generates debug log or metrics.
// To print the error with its task as context, use printExporterErrors.
func (e *taskExporter) exportTasks(tasks []*Task) ([]PrometheusECSTarget, error) {
var merr error
var allTargets []PrometheusECSTarget
for _, t := range tasks {
targets, err := e.exportTask(t)
multierr.AppendInto(&merr, err) // if err == nil, AppendInto does nothing
// Even if there are error, returned targets are still valid.
allTargets = append(allTargets, targets...)
}
return allTargets, merr
}

// exportTask exports all the matched container within a single task.
// One task can contain multiple containers. One container can have more than one target
// if there are multiple ports in `metrics_port`.
func (e *taskExporter) exportTask(task *Task) ([]PrometheusECSTarget, error) {
// All targets in one task shares same IP.
privateIP, err := task.PrivateIP()
if err != nil {
return nil, errctx.WithValue(err, errKeyTask, task)
}

// Base for all the containers in this task, most attributes are same.
baseTarget := PrometheusECSTarget{
Source: aws.StringValue(task.Task.TaskArn),
MetricsPath: defaultMetricsPath,
ClusterName: e.cluster,
TaskDefinitionFamily: aws.StringValue(task.Definition.Family),
TaskDefinitionRevision: int(aws.Int64Value(task.Definition.Revision)),
TaskStartedBy: aws.StringValue(task.Task.StartedBy),
TaskLaunchType: aws.StringValue(task.Task.LaunchType),
TaskGroup: aws.StringValue(task.Task.Group),
TaskTags: task.TaskTags(),
HealthStatus: aws.StringValue(task.Task.HealthStatus),
}
if task.Service != nil {
baseTarget.ServiceName = aws.StringValue(task.Service.ServiceName)
}
if task.EC2 != nil {
ec2 := task.EC2
baseTarget.EC2InstanceID = aws.StringValue(ec2.InstanceId)
baseTarget.EC2InstanceType = aws.StringValue(ec2.InstanceType)
baseTarget.EC2Tags = task.EC2Tags()
baseTarget.EC2VpcID = aws.StringValue(ec2.VpcId)
baseTarget.EC2SubnetID = aws.StringValue(ec2.SubnetId)
baseTarget.EC2PrivateIP = privateIP
baseTarget.EC2PublicIP = aws.StringValue(ec2.PublicIpAddress)
}

var targetsInTask []PrometheusECSTarget
var merr error
for _, m := range task.Matched {
container := task.Definition.ContainerDefinitions[m.ContainerIndex]
// Shallow copy task level attributes
containerTarget := baseTarget
// Add container specific info
containerTarget.ContainerName = aws.StringValue(container.Name)
containerTarget.ContainerLabels = task.ContainerLabels(m.ContainerIndex)
// Multiple targets for a single container
for _, matchedTarget := range m.Targets {
// Shallow copy from container
target := containerTarget
mappedPort, err := task.MappedPort(container, int64(matchedTarget.Port))
if err != nil {
err = errctx.WithValues(err, map[string]interface{}{
errKeyTarget: matchedTarget,
errKeyTask: task,
})
}
// Skip this target and keep track of port error, does not abort.
if multierr.AppendInto(&merr, err) {
continue
}
target.Address = fmt.Sprintf("%s:%d", privateIP, mappedPort)
if matchedTarget.MetricsPath != "" {
target.MetricsPath = matchedTarget.MetricsPath
}
target.Job = matchedTarget.Job
targetsInTask = append(targetsInTask, target)
}
}
return targetsInTask, merr
}
Loading