Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/ecsobserver] Write discovered targets as Prometheus file sd #3785

Merged
merged 9 commits into from
Jun 30, 2021
73 changes: 36 additions & 37 deletions extension/observer/ecsobserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
The `ecsobserver` uses the ECS/EC2 API to discover prometheus scrape targets from all running tasks and filter them
based on service names, task definitions and container labels.

NOTE: If you run collector as a sidecar, you should consider
use [ECS resource detector](../../../processor/resourcedetectionprocessor/README.md) instead. However, it does not have
service, EC2 instances etc. because it only queries local API.

## Config

The configuration is based on
Expand All @@ -14,10 +18,10 @@ The configuration is based on
```yaml
extensions:
ecs_observer:
refresh_interval: 15s
cluster_name: 'Cluster-1'
cluster_region: 'us-west-2'
result_file: '/etc/ecs_sd_targets.yaml'
refresh_interval: 60s # format is https://golang.org/pkg/time/#ParseDuration
cluster_name: 'Cluster-1' # cluster name need manual config
cluster_region: 'us-west-2' # region can be configured directly or use AWS_REGION env var
result_file: '/etc/ecs_sd_targets.yaml' # the directory for file must already exists
services:
- name_pattern: '^retail-.*$'
docker_labels:
Expand All @@ -37,11 +41,22 @@ receivers:
- job_name: "ecs-task"
file_sd_configs:
- files:
- '/etc/ecs_sd_targets.yaml'
- '/etc/ecs_sd_targets.yaml' # MUST match the file name in ecs_observer.result_file
relabel_configs: # Relabel here because label with __ prefix will be dropped by receiver.
- source_labels: [ __meta_ecs_cluster_name ] # ClusterName
action: replace
target_label: ClusterName
- source_labels: [ __meta_ecs_service_name ] # ServiceName
action: replace
target_label: ServiceName
- action: labelmap # Convert docker labels on container to metric labels
regex: ^__meta_ecs_container_labels_(.+)$ # Capture the key using regex, e.g. __meta_ecs_container_labels_Java_EMF_Metrics -> Java_EMF_Metrics
replacement: '$$1'

processors:
batch:

# Use awsemf for CloudWatch Container Insights Prometheus. The extension does not have requirement on exporter.
exporters:
awsemf:

Expand Down Expand Up @@ -249,6 +264,8 @@ prometheus instead of extension and can cause confusion.

## Output Format

[Example in unit test](testdata/ut_targets.expected.yaml).

The format is based
on [cloudwatch agent](https://github.com/aws/amazon-cloudwatch-agent/tree/master/internal/ecsservicediscovery#example-result)
, [ec2 sd](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config)
Expand Down Expand Up @@ -354,14 +371,13 @@ The implementation has two parts, core ecs service discovery logic and adapter f

### Packages

- `extension/observer/ecsobserver` adapter to implement the observer interface
- `internal/awsecs` polling AWS ECS and EC2 API and filter based on config
- `internal/awsconfig` the shared aws specific config (e.g. init sdk client), which eventually should be shared by every
package that calls AWS API (e.g. emf, xray).
- `extension/observer/ecsobserver` main logic
- [internal/ecsmock](internal/ecsmock) mock ECS cluster
- [internal/errctx](internal/errctx) structured error wrapping

### Flow

The pseudo code showing the overall flow.
The pseudocode showing the overall flow.

```
NewECSSD() {
Expand Down Expand Up @@ -414,36 +430,17 @@ otel's own /metrics.

### Error Handling

- Auth error will be logged, but the extension will not fail as the IAM role can be updated and take effect without
restarting the ECS task.
- If errors happen in the middle (e.g. rate limit), the discovery result will be merged with previous success runs to
avoid discarding active targets, though it may keep some stale targets as well.
- Auth and cluster not found error will cause the extension to stop (calling `host.ReportFatalError`). Although IAM role
can be updated at runtime without restarting the collector, it's better to fail to make the problem obvious. Same
applies to cluster not found. In the future we can add config to downgrade those errors if user want to monitor an ECS
cluster with collector running outside the cluster, the collector can run anywhere as long as it can reach scrape
targets and AWS API.
- If we have non-critical error, we overwrite existing file with whatever targets we have, we might not have all the
targets due to throttle etc.

### Unit Test

A mock ECS and EC2 server will be implemented in `internal/awsecs`. The rough implementation will be like the following:

```go
type ECSServiceMock struct {
definitions map[string]*ecs.TaskDefinition
tasks map[string]*ecs.Task
services map[string]*ecs.Service
taskToEC2 map[string]*ec2.Instance
}

// RunOnEC2 registers the task definition and instance.
// It creates a task and sets it to running.
// The returned task pointer can be modified directly and will be reflected in mocked AWS API call results.
func (e *ECSServiceMock) RunOnEC2(def *ecs.TaskDefinition, instance *ec2.Instance) *ecs.Task {
panic("impl")
}

// RunOnFargate is similar to RunOnEC2 except instance is not needed as fargate is 'serverless'.
// A unique private ip will be generated to simulate awsvpc.
func (e *ECSServiceMock) RunOnFargate(def *ecs.TaskDefinition) *ecs.Task {
panic("impl")
}
```
A mock ECS and EC2 server is in [internal/ecsmock](internal/ecsmock), see [fetcher_test](fetcher_test.go) for its usage.

### Integration Test

Expand All @@ -452,6 +449,8 @@ against actual ECS service on both EC2 and Fargate.

## Changelog

- 2021-06-02 first version that actually works on ECS by @pingleig, thanks @anuraaga @Aneurysm9 @jrcamp @mxiamxia for
reviewing (all the PRs ...)
- 2021-02-24 Updated doc by @pingleig
- 2020-12-29 Initial implementation by [Raphael](https://github.com/theRoughCode)
in [#1920](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/1920)
32 changes: 29 additions & 3 deletions extension/observer/ecsobserver/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"errors"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ecs"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand All @@ -41,6 +43,32 @@ type errWithAttributes interface {
zapFields() []zap.Field
}

// hasCriticalError returns first critical error.
// Currently only access error and cluster not found are treated as critical.
func hasCriticalError(logger *zap.Logger, err error) error {
merr := multierr.Errors(err)
if merr == nil {
merr = []error{err} // fake a multi error
}
for _, err := range merr {
var awsErr awserr.Error
if errors.As(err, &awsErr) {
// NOTE: we don't use zap.Error because the stack trace is quite useless here
// We print the error after entire fetch and match loop is done, and source
// of these error are from user config, wrong IAM, typo in cluster name etc.
switch awsErr.Code() {
case ecs.ErrCodeAccessDeniedException:
logger.Error("AccessDenied", zap.String("ErrMessage", awsErr.Message()))
return awsErr
case ecs.ErrCodeClusterNotFoundException:
logger.Error("Cluster NotFound", zap.String("ErrMessage", awsErr.Message()))
return awsErr
}
}
}
return nil
}

func printErrors(logger *zap.Logger, err error) {
merr := multierr.Errors(err)
if merr == nil {
Expand Down Expand Up @@ -82,9 +110,7 @@ func extractErrorFields(err error) ([]zap.Field, string) {
v, ok = errctx.ValueFrom(err, errKeyTarget)
if ok {
if target, ok := v.(MatchedTarget); ok {
// TODO: change to string once another PR for matcher got merged
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/3386 defines Stringer
fields = append(fields, zap.Int("MatcherType", int(target.MatcherType)))
fields = append(fields, zap.String("MatcherType", target.MatcherType.String()))
scope = "Target"
}
}
Expand Down
8 changes: 5 additions & 3 deletions extension/observer/ecsobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,23 @@ var _ component.Extension = (*ecsObserver)(nil)
// ecsObserver implements component.ServiceExtension interface.
type ecsObserver struct {
logger *zap.Logger
sd *ServiceDiscovery
sd *serviceDiscovery

// for Shutdown
cancel func()
}

// Start runs the service discovery in backeground
// Start runs the service discovery in background
func (e *ecsObserver) Start(_ context.Context, host component.Host) error {
e.logger.Info("Starting ECSDiscovery")
// Ignore the ctx parameter as it is not for long running operation
ctx, cancel := context.WithCancel(context.Background())
e.cancel = cancel
go func() {
if err := e.sd.RunAndWriteFile(ctx); err != nil {
if err := e.sd.runAndWriteFile(ctx); err != nil {
e.logger.Error("ECSDiscovery stopped by error", zap.Error(err))
// Stop the collector
host.ReportFatalError(err)
}
}()
return nil
Expand Down
88 changes: 83 additions & 5 deletions extension/observer/ecsobserver/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,98 @@ package ecsobserver

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock"
)

// inspectErrorHost implements component.Host.
// btw: I only find assertNoErrorHost in other components, seems there is no exported util struct.
type inspectErrorHost struct {
component.Host

// Why we need a mutex here? Our extension only has one go routine so it seems
// we don't need to protect the error as our extension is the only component for this 'host'.
// But without the lock the test actually fails on race detector.
// There is no actual concurrency in our test, when we read the error in test assertion,
// we know the extension has already stopped because we provided invalided config and waited long enough.
// However, (I assume) from race detector's perspective, race between a stopped goroutine and a running one
// is same as two running goroutines. A goroutine's stop condition is uncertain at runtime, and the data
// access order may varies, goroutine A can stop before B in first run and reverse in next run.
// As long as there is some read/write of one memory area without protection from multiple go routines,
// it means the code can have data race, but it does not mean this race always happen.
// In our case, the race never happens because we hard coded the sleep time of two go routines.
//
// btw: assertNoErrorHost does not have mutex because it never saves the error. Its ReportFatalError
// just call assertion and forget about nil error. For unexpected error it call helpers to fail the test
// and those helper func all have mutex. https://golang.org/src/testing/testing.go
mu sync.Mutex
err error
}

func newInspectErrorHost() component.Host {
return &inspectErrorHost{
Host: componenttest.NewNopHost(),
}
}

func (h *inspectErrorHost) ReportFatalError(err error) {
h.mu.Lock()
h.err = err
h.mu.Unlock()
}

func (h *inspectErrorHost) getError() error {
h.mu.Lock()
cp := h.err
h.mu.Unlock()
return cp
}

// Simply start and stop, the actual test logic is in sd_test.go until we implement the ListWatcher interface.
// In that case sd itself does not use timer and relies on caller to trigger List.
func TestExtensionStartStop(t *testing.T) {
ext, err := createExtension(context.TODO(), component.ExtensionCreateSettings{Logger: zap.NewExample()}, createDefaultConfig())
require.NoError(t, err)
require.IsType(t, &ecsObserver{}, ext)
require.NoError(t, ext.Start(context.TODO(), componenttest.NewNopHost()))
require.NoError(t, ext.Shutdown(context.TODO()))
settings := component.ExtensionCreateSettings{Logger: zap.NewExample()}
refreshInterval := time.Millisecond
waitDuration := 2 * refreshInterval

createTestExt := func(c *ecsmock.Cluster, output string) component.Extension {
f := newTestTaskFetcher(t, c)
cfg := createDefaultConfig()
sdCfg := cfg.(*Config)
sdCfg.RefreshInterval = refreshInterval
sdCfg.ResultFile = output
ext, err := createExtensionWithFetcher(settings, sdCfg, f)
require.NoError(t, err)
return ext
}

t.Run("noop", func(t *testing.T) {
c := ecsmock.NewCluster()
ext := createTestExt(c, "testdata/ut_ext_noop.actual.yaml")
require.IsType(t, &ecsObserver{}, ext)
host := newInspectErrorHost()
require.NoError(t, ext.Start(context.TODO(), host))
time.Sleep(waitDuration)
require.NoError(t, host.(*inspectErrorHost).getError())
require.NoError(t, ext.Shutdown(context.TODO()))
})

t.Run("critical error", func(t *testing.T) {
c := ecsmock.NewClusterWithName("different than default config")
ext := createTestExt(c, "testdata/ut_ext_critical_error.actual.yaml")
host := newInspectErrorHost()
require.NoError(t, ext.Start(context.TODO(), host))
time.Sleep(waitDuration)
err := host.(*inspectErrorHost).getError()
require.Error(t, err)
require.Error(t, hasCriticalError(zap.NewExample(), err))
})
}
12 changes: 11 additions & 1 deletion extension/observer/ecsobserver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ecsobserver

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand All @@ -42,7 +43,16 @@ func createDefaultConfig() config.Extension {

func createExtension(ctx context.Context, params component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
sdCfg := cfg.(*Config)
sd, err := NewDiscovery(*sdCfg, ServiceDiscoveryOptions{Logger: params.Logger})
fetcher, err := newTaskFetcherFromConfig(*sdCfg, params.Logger)
if err != nil {
return nil, fmt.Errorf("init fetcher failed: %w", err)
}
return createExtensionWithFetcher(params, sdCfg, fetcher)
}

// fetcher is mock in unit test or AWS API client
func createExtensionWithFetcher(params component.ExtensionCreateSettings, sdCfg *Config, fetcher *taskFetcher) (component.Extension, error) {
sd, err := newDiscovery(*sdCfg, serviceDiscoveryOptions{Logger: params.Logger, Fetcher: fetcher})
if err != nil {
return nil, err
}
Expand Down
Loading