Skip to content

Commit

Permalink
Beatless initial PR (elastic#8485)
Browse files Browse the repository at this point in the history
## This PR provides the following

1. Plugin infra for developing providers and functions
2. A local stdin provider only used for testing, I will remove it in the
final version.
3. AWS provider and function types for:
  - Cloudwatch logs
  - SQS
  - Kinesis
  - Api web gateway proxy
4. License checker
5. Packaging of artifact
6. Runners
7. CLI infrastructure
8. CLI to push a cloudwatch logs function.
9. CLI to delete any function
10. Processors support.
11. Types to validate value from the users and the lambda function.

## What it doesn't provides

- ECS and full event extraction. (NOT for v1)
- Specifying the AWS credentials in the configuration
- CLI for SQS, Kinesis, API
- Robust CLI interaction with the API, rollback on failure / versioning.
- Removal of not supported outputs
- Removal of seccomp check
- Integration tests
- Updated build task to produce containing the user executable beat and
the linux beats.
- Concurrency / memory settings
  • Loading branch information
ph committed Oct 24, 2018
1 parent 1a0682f commit a02794d
Show file tree
Hide file tree
Showing 80 changed files with 4,884 additions and 192 deletions.
1 change: 1 addition & 0 deletions dev-tools/generate_notice.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import csv
import re
import pdb
import copy


Expand Down
13 changes: 9 additions & 4 deletions libbeat/feature/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// Registry is the global plugin registry, this variable is meant to be temporary to move all the
// internal factory to receive a context that include the current beat registry.
var Registry = newRegistry()
var registry = NewRegistry()

// Featurable implements the description of a feature.
type Featurable interface {
Expand Down Expand Up @@ -96,10 +96,15 @@ func New(namespace, name string, factory interface{}, description Describer) *Fe
}
}

// GlobalRegistry return the configured global registry.
func GlobalRegistry() *Registry {
return registry
}

// RegisterBundle registers a bundle of features.
func RegisterBundle(bundle *Bundle) error {
for _, f := range bundle.Features() {
err := Registry.Register(f)
err := GlobalRegistry().Register(f)
if err != nil {
return err
}
Expand All @@ -119,7 +124,7 @@ func MustRegisterBundle(bundle *Bundle) {
// implementation.
func OverwriteBundle(bundle *Bundle) error {
for _, f := range bundle.Features() {
err := Registry.Register(f)
err := GlobalRegistry().Register(f)
if err != nil {
return err
}
Expand All @@ -138,7 +143,7 @@ func MustOverwriteBundle(bundle *Bundle) {

// Register register a new feature on the global registry.
func Register(feature Featurable) error {
return Registry.Register(feature)
return GlobalRegistry().Register(feature)
}

// MustRegister register a new Feature on the global registry and panic on error.
Expand Down
20 changes: 10 additions & 10 deletions libbeat/feature/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,26 @@ import (

type mapper map[string]map[string]Featurable

// Registry implements a global registry for any kind of feature in beats.
// Registry implements a global FeatureRegistry for any kind of feature in beats.
// feature are grouped by namespace, a namespace is a kind of plugin like outputs, inputs, or queue.
// The feature name must be unique.
type registry struct {
type Registry struct {
sync.RWMutex
namespaces mapper
log *logp.Logger
}

// NewRegistry returns a new registry.
func newRegistry() *registry {
return &registry{
func NewRegistry() *Registry {
return &Registry{
namespaces: make(mapper),
log: logp.NewLogger("registry"),
}
}

// Register registers a new feature into a specific namespace, namespace are lazy created.
// Feature name must be unique.
func (r *registry) Register(feature Featurable) error {
func (r *Registry) Register(feature Featurable) error {
r.Lock()
defer r.Unlock()

Expand Down Expand Up @@ -97,7 +97,7 @@ func (r *registry) Register(feature Featurable) error {
}

// Unregister removes a feature from the registry.
func (r *registry) Unregister(namespace, name string) error {
func (r *Registry) Unregister(namespace, name string) error {
r.Lock()
defer r.Unlock()
ns := normalize(namespace)
Expand All @@ -117,7 +117,7 @@ func (r *registry) Unregister(namespace, name string) error {
}

// Lookup searches for a Feature by the namespace-name pair.
func (r *registry) Lookup(namespace, name string) (Featurable, error) {
func (r *Registry) Lookup(namespace, name string) (Featurable, error) {
r.RLock()
defer r.RUnlock()

Expand All @@ -138,7 +138,7 @@ func (r *registry) Lookup(namespace, name string) (Featurable, error) {
}

// LookupAll returns all the features for a specific namespace.
func (r *registry) LookupAll(namespace string) ([]Featurable, error) {
func (r *Registry) LookupAll(namespace string) ([]Featurable, error) {
r.RLock()
defer r.RUnlock()

Expand All @@ -160,7 +160,7 @@ func (r *registry) LookupAll(namespace string) ([]Featurable, error) {
}

// Overwrite allow to replace an existing feature with a new implementation.
func (r *registry) Overwrite(feature Featurable) error {
func (r *Registry) Overwrite(feature Featurable) error {
_, err := r.Lookup(feature.Namespace(), feature.Name())
if err == nil {
err := r.Unregister(feature.Namespace(), feature.Name())
Expand All @@ -173,7 +173,7 @@ func (r *registry) Overwrite(feature Featurable) error {
}

// Size returns the number of registered features in the registry.
func (r *registry) Size() int {
func (r *Registry) Size() int {
r.RLock()
defer r.RUnlock()

Expand Down
24 changes: 12 additions & 12 deletions libbeat/feature/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ func TestRegister(t *testing.T) {
f := func() {}

t.Run("when the factory is nil", func(t *testing.T) {
r := newRegistry()
r := NewRegistry()
err := r.Register(New("outputs", "null", nil, defaultDetails))
if !assert.Error(t, err) {
return
}
})

t.Run("namespace and feature doesn't exist", func(t *testing.T) {
r := newRegistry()
r := NewRegistry()
err := r.Register(New("outputs", "null", f, defaultDetails))
if !assert.NoError(t, err) {
return
Expand All @@ -47,7 +47,7 @@ func TestRegister(t *testing.T) {
})

t.Run("namespace exists and feature doesn't exist", func(t *testing.T) {
r := newRegistry()
r := NewRegistry()
r.Register(New("processor", "bar", f, defaultDetails))
err := r.Register(New("processor", "foo", f, defaultDetails))
if !assert.NoError(t, err) {
Expand All @@ -58,7 +58,7 @@ func TestRegister(t *testing.T) {
})

t.Run("namespace exists and feature exists and not the same factory", func(t *testing.T) {
r := newRegistry()
r := NewRegistry()
r.Register(New("processor", "foo", func() {}, defaultDetails))
err := r.Register(New("processor", "foo", f, defaultDetails))
if !assert.Error(t, err) {
Expand All @@ -69,7 +69,7 @@ func TestRegister(t *testing.T) {

t.Run("when the exact feature is already registered", func(t *testing.T) {
feature := New("processor", "foo", f, defaultDetails)
r := newRegistry()
r := NewRegistry()
r.Register(feature)
err := r.Register(feature)
if !assert.NoError(t, err) {
Expand All @@ -82,7 +82,7 @@ func TestRegister(t *testing.T) {
func TestFeature(t *testing.T) {
f := func() {}

r := newRegistry()
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
r.Register(New("HOLA", "fOO", f, defaultDetails))

Expand Down Expand Up @@ -112,7 +112,7 @@ func TestFeature(t *testing.T) {
func TestLookup(t *testing.T) {
f := func() {}

r := newRegistry()
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
r.Register(New("processor", "foo2", f, defaultDetails))
r.Register(New("HELLO", "fOO", f, defaultDetails))
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestUnregister(t *testing.T) {
f := func() {}

t.Run("when the namespace and the feature exists", func(t *testing.T) {
r := newRegistry()
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
assert.Equal(t, 1, r.Size())
err := r.Unregister("processor", "foo")
Expand All @@ -157,7 +157,7 @@ func TestUnregister(t *testing.T) {
})

t.Run("when the namespace exist and the feature doesn't", func(t *testing.T) {
r := newRegistry()
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
assert.Equal(t, 1, r.Size())
err := r.Unregister("processor", "bar")
Expand All @@ -168,7 +168,7 @@ func TestUnregister(t *testing.T) {
})

t.Run("when the namespace doesn't exists", func(t *testing.T) {
r := newRegistry()
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
assert.Equal(t, 1, r.Size())
err := r.Unregister("outputs", "bar")
Expand All @@ -182,15 +182,15 @@ func TestUnregister(t *testing.T) {
func TestOverwrite(t *testing.T) {
t.Run("when the feature doesn't exist", func(t *testing.T) {
f := func() {}
r := newRegistry()
r := NewRegistry()
assert.Equal(t, 0, r.Size())
r.Overwrite(New("processor", "foo", f, defaultDetails))
assert.Equal(t, 1, r.Size())
})

t.Run("overwrite when the feature exists", func(t *testing.T) {
f := func() {}
r := newRegistry()
r := NewRegistry()
r.Register(New("processor", "foo", f, defaultDetails))
assert.Equal(t, 1, r.Size())

Expand Down
2 changes: 1 addition & 1 deletion libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Register(name string, fn FactoryFunc, stability feature.Stability) {
// Factory retrieves config manager constructor. If no one is registered
// it will create a nil manager
func Factory() FactoryFunc {
factories, err := feature.Registry.LookupAll(Namespace)
factories, err := feature.GlobalRegistry().LookupAll(Namespace)
if err != nil {
return nilFactory
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/queue_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func RegisterType(name string, fn Factory) {

// FindFactory retrieves a queue types constructor. Returns nil if queue type is unknown
func FindFactory(name string) Factory {
f, err := feature.Registry.Lookup(Namespace, name)
f, err := feature.GlobalRegistry().Lookup(Namespace, name)
if err != nil {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/beatless/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
.vscode
/*/_meta/kibana.generated
beatless
beatless.test
build
data
fields.yml
Expand Down
16 changes: 16 additions & 0 deletions x-pack/beatless/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM golang:1.10.3
MAINTAINER Pier-Hugues Pellerin <ph@elastic.co>

RUN set -x && \
apt-get update && \
apt-get install -y --no-install-recommends \
netcat python-pip rsync virtualenv && \
apt-get clean

RUN pip install --upgrade setuptools

# Setup work environment
ENV BEATLESS_PATH /go/src/github.com/elastic/beats/x-pack/beatless

RUN mkdir -p $BEATLESS_PATH/build/coverage
WORKDIR $BEATLESS_PATH
8 changes: 7 additions & 1 deletion x-pack/beatless/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ LICENSE=Elastic
BEAT_TITLE?=Beatless
SYSTEM_TESTS?=true
BEAT_PATH?=github.com/elastic/beats/x-pack/${BEAT_NAME}
TEST_ENVIRONMENT?=false
TEST_ENVIRONMENT?=true
GOX_FLAGS=-arch="amd64 386 arm ppc64 ppc64le"
ES_BEATS?=../../
FIELDS_FILE_PATH=module
Expand All @@ -14,3 +14,9 @@ include $(ES_BEATS)/libbeat/scripts/Makefile
# Runs all collection steps and updates afterwards
.PHONY: collect
collect:

# TODO(ph) This is used for debugging until we change the build to create 2 artifacts,
# we will do this in another PR.
.PHONY: linux
linux:
GOOS=linux go build -o pkg/beatless
49 changes: 49 additions & 0 deletions x-pack/beatless/_meta/beat.reference.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
########################## Beatless Configuration ###########################

# This file is a full configuration example documenting all non-deprecated
# options in comments. For a shorter configuration example, that contains only
# the most common options, please see beatless.yml in the same directory.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/beatless/index.html
#
#============================ Provider ===============================
# Configure functions to run on AWS Lambda, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#
beatless.provider.aws.functions:
# Define the list of function availables, each function required to have a unique name.
- name: fn_cloudwatch_logs
type: cloudwatch_logs

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for cloudwatch logs"

# Concurrency, is the reserved number of instances for that function.
# Default is unreserved.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function, the configured size must be a factor of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
# dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# List of cloudwatch log group registered to that function.
triggers:
- log_group_name: /aws/lambda/beatless-cloudwatch_logs
filter_pattern: mylog_

# Define custom processors for this function.
#processors:
# - dissect:
# tokenizer: "%{key1} %{key2}"
38 changes: 35 additions & 3 deletions x-pack/beatless/_meta/beat.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
################### Beatless Configuration Example #########################
###################### Beatless Configuration Example #######################

############################# Beatless ######################################
# This file is an example configuration file highlighting only the most common
# options. The beatless.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/beatless/index.html
#

beatless:
#============================ Provider ===============================
# Configure functions to run on AWS Lambda, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#
beatless.provider.aws.functions:
# Accepts events from a cloudwatch log group.
- name: fn_cloudwatch_logs
type: cloudwatch_logs
# The IAM role that the lambda will take when executing your function.
role: iam
# List of cloudwatch streams registered to this function.
triggers:
- log_group_name: /aws/lambda/beatless-cloudwatch_logs
filter_name: myfiltername
filter_pattern: mylog_

# Accepts events from a SQS queue.
# - name: fn_sqs
# type: sqs
#
# Accepts events form a Kinesis stream
# - name: fn_kinesis
# type: kinesis
#
# Accepts events from an api gateway proxy call.
# - name: fn_apigateway_proxy
# type: api_gateway_proxy
Loading

0 comments on commit a02794d

Please sign in to comment.