Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #8091 to 6.x: Publisher pipeline: pass logger and metrics registry #8147

Merged
merged 1 commit into from
Oct 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Report number of open file handles on Windows. {pull}8329[8329]
- Added the `add_process_metadata` processor to enrich events with process information. {pull}6789[6789]
- Add Beats Central Management {pull}8559[8559]
- Report configured queue type. {pull}8091[8091]

*Auditbeat*

Expand Down
9 changes: 8 additions & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,14 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}

debugf("Initializing output plugins")
pipeline, err := pipeline.Load(b.Info, reg, b.Config.Pipeline, b.Config.Output)
pipeline, err := pipeline.Load(b.Info,
pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
},
b.Config.Pipeline,
b.Config.Output)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %+v", err)
}
Expand Down
40 changes: 26 additions & 14 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (
)

type reporter struct {
done *stopper
done *stopper
logger *logp.Logger

checkRetry time.Duration

Expand All @@ -58,7 +59,9 @@ type reporter struct {
out []outputs.NetworkClient
}

var debugf = logp.MakeDebug("monitoring")
const selector = "monitoring"

var debugf = logp.MakeDebug(selector)

var errNoMonitoring = errors.New("xpack monitoring not available")

Expand Down Expand Up @@ -104,7 +107,9 @@ func defaultConfig(settings report.Settings) config {
}

func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) (report.Reporter, error) {
log := logp.L().Named(selector)
config := defaultConfig(settings)

if err := cfg.Unpack(&config); err != nil {
return nil, err
}
Expand All @@ -121,7 +126,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
return nil, err
}
if proxyURL != nil {
logp.Info("Using proxy URL: %s", proxyURL)
log.Infof("Using proxy URL: %s", proxyURL)
}
tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
Expand Down Expand Up @@ -154,10 +159,11 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
return memqueue.NewBroker(memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
return memqueue.NewBroker(log,
memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
}

monitoring := monitoring.Default.GetRegistry("xpack.monitoring")
Expand All @@ -167,6 +173,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)

pipeline, err := pipeline.New(
beat,
pipeline.Monitors{},
monitoring,
queueFactory,
outputs.Group{
Expand All @@ -189,6 +196,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
}

r := &reporter{
logger: log,
done: newStopper(),
beatMeta: makeMeta(beat),
tags: config.Tags,
Expand All @@ -211,18 +219,20 @@ func (r *reporter) initLoop(c config) {
debugf("Start monitoring endpoint init loop.")
defer debugf("Finish monitoring endpoint init loop.")

log := r.logger

logged := false

for {
// Select one configured endpoint by random and check if xpack is available
client := r.out[rand.Intn(len(r.out))]
err := client.Connect()
if err == nil {
closing(client)
closing(log, client)
break
} else {
if !logged {
logp.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.")
log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.")
logged = true
}
debugf("Monitoring could not connect to elasticsearch, failed with %v", err)
Expand All @@ -235,7 +245,7 @@ func (r *reporter) initLoop(c config) {
}
}

logp.Info("Successfully connected to X-Pack Monitoring endpoint.")
log.Info("Successfully connected to X-Pack Monitoring endpoint.")

// Start collector and send loop if monitoring endpoint has been found.
go r.snapshotLoop("state", "state", c.StatePeriod)
Expand All @@ -247,8 +257,10 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration)
ticker := time.NewTicker(period)
defer ticker.Stop()

logp.Info("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer logp.Info("Stop monitoring %s metrics snapshot loop.", namespace)
log := r.logger

log.Infof("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer log.Infof("Stop monitoring %s metrics snapshot loop.", namespace)

for {
var ts time.Time
Expand Down Expand Up @@ -317,9 +329,9 @@ func makeClient(
return newPublishClient(esClient, params), nil
}

func closing(c io.Closer) {
func closing(log *logp.Logger, c io.Closer) {
if err := c.Close(); err != nil {
logp.Warn("Closed failed with: %v", err)
log.Warnf("Closed failed with: %v", err)
}
}

Expand Down
10 changes: 5 additions & 5 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher/queue"
)
Expand All @@ -33,9 +32,10 @@ import (
// - reload
type outputController struct {
beat beat.Info
monitors Monitors

logger *logp.Logger
observer outputObserver
reg *monitoring.Registry

queue queue.Queue

Expand Down Expand Up @@ -63,14 +63,14 @@ type outputWorker interface {

func newOutputController(
beat beat.Info,
reg *monitoring.Registry,
monitors Monitors,
log *logp.Logger,
observer outputObserver,
b queue.Queue,
) *outputController {
c := &outputController{
beat: beat,
reg: reg,
monitors: monitors,
logger: log,
observer: observer,
queue: b,
Expand Down Expand Up @@ -158,7 +158,7 @@ func (c *outputController) Reload(cfg *reload.ConfigWithMeta) error {
return err
}

output, err := loadOutput(c.beat, c.reg, outputCfg)
output, err := loadOutput(c.beat, c.monitors, outputCfg)
if err != nil {
return err
}
Expand Down
72 changes: 46 additions & 26 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ var publishDisabled = false

const defaultQueueType = "mem"

// Monitors configures visibility for observing state and progress of the
// pipeline.
type Monitors struct {
Metrics *monitoring.Registry
Telemetry *monitoring.Registry
Logger *logp.Logger
}

func init() {
flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing")
}
Expand All @@ -46,12 +54,17 @@ func init() {
// configured queue and outputs.
func Load(
beatInfo beat.Info,
reg *monitoring.Registry,
monitors Monitors,
config Config,
outcfg common.ConfigNamespace,
) (*Pipeline, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
logp.Info("Dry run mode. All output types except the file based one are disabled.")
log.Info("Dry run mode. All output types except the file based one are disabled.")
}

processors, err := processors.New(config.Processors)
Expand Down Expand Up @@ -80,72 +93,74 @@ func Load(
},
}

queueBuilder, err := createQueueBuilder(config.Queue)
queueBuilder, err := createQueueBuilder(config.Queue, monitors)
if err != nil {
return nil, err
}

out, err := loadOutput(beatInfo, reg, outcfg)
out, err := loadOutput(beatInfo, monitors, outcfg)
if err != nil {
return nil, err
}

p, err := New(beatInfo, reg, queueBuilder, out, settings)
p, err := New(beatInfo, monitors, monitors.Metrics, queueBuilder, out, settings)
if err != nil {
return nil, err
}

logp.Info("Beat name: %s", name)
log.Info("Beat name: %s", name)
return p, err
}

func loadOutput(
beatInfo beat.Info,
reg *monitoring.Registry,
monitors Monitors,
outcfg common.ConfigNamespace,
) (outputs.Group, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
return outputs.Group{}, nil
}

if !outcfg.IsSet() {
msg := "No outputs are defined. Please define one under the output section."
logp.Info(msg)
log.Info(msg)
return outputs.Fail(errors.New(msg))
}

var (
outReg *monitoring.Registry
metrics *monitoring.Registry
outStats outputs.Observer
)

if reg != nil {
outReg = reg.GetRegistry("output")
if outReg != nil {
outReg.Clear()
} else {
outReg = reg.NewRegistry("output")
}
outStats = outputs.NewStats(outReg)
if monitors.Metrics != nil {
metrics = monitors.Metrics.NewRegistry("output")
outStats = outputs.NewStats(metrics)
}

out, err := outputs.Load(beatInfo, outStats, outcfg.Name(), outcfg.Config())
if err != nil {
return outputs.Fail(err)
}

if outReg != nil {
monitoring.NewString(outReg, "type").Set(outcfg.Name())
if metrics != nil {
monitoring.NewString(metrics, "type").Set(outcfg.Name())
}
if monitors.Telemetry != nil {
telemetry := monitors.Telemetry.NewRegistry("output")
monitoring.NewString(telemetry, "name").Set(outcfg.Name())
}

stateRegistry := monitoring.GetNamespace("state").GetRegistry()
outputRegistry := stateRegistry.NewRegistry("output")
monitoring.NewString(outputRegistry, "name").Set(outcfg.Name())

return out, nil
}

func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (queue.Queue, error), error) {
func createQueueBuilder(
config common.ConfigNamespace,
monitors Monitors,
) (func(queue.Eventer) (queue.Queue, error), error) {
queueType := defaultQueueType
if b := config.Name(); b != "" {
queueType = b
Expand All @@ -161,7 +176,12 @@ func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (que
queueConfig = common.NewConfig()
}

if monitors.Telemetry != nil {
queueReg := monitors.Telemetry.NewRegistry("queue")
monitoring.NewString(queueReg, "name").Set(queueType)
}

return func(eventer queue.Eventer) (queue.Queue, error) {
return queueFactory(eventer, queueConfig)
return queueFactory(eventer, monitors.Logger, queueConfig)
}, nil
}
5 changes: 3 additions & 2 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"time"

"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/monitoring"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher"
Expand Down Expand Up @@ -153,6 +153,7 @@ type queueFactory func(queue.Eventer) (queue.Queue, error)
// queue and outputs will be closed.
func New(
beat beat.Info,
monitors Monitors,
metrics *monitoring.Registry,
queueFactory queueFactory,
out outputs.Group,
Expand Down Expand Up @@ -205,7 +206,7 @@ func New(
}
p.eventSema = newSema(maxEvents)

p.output = newOutputController(beat, metrics, log, p.observer, p.queue)
p.output = newOutputController(beat, monitors, log, p.observer, p.queue)
p.output.Set(out)

return p, nil
Expand Down
9 changes: 7 additions & 2 deletions libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ func RunTests(
return fmt.Errorf("unpacking config failed: %v", err)
}

// reg := monitoring.NewRegistry()
pipeline, err := pipeline.Load(info, nil, config.Pipeline, config.Output)
pipeline, err := pipeline.Load(info, pipeline.Monitors{
Metrics: nil,
Telemetry: nil,
Logger: logp.L(),
},
config.Pipeline,
config.Output)
if err != nil {
return fmt.Errorf("loading pipeline failed: %+v", err)
}
Expand Down
Loading