Skip to content

Commit

Permalink
Stop waiting for signals on closed outleters (#11263)
Browse files Browse the repository at this point in the history
Outleters start a goroutine to handle the finalization of filebeat. If the
outleter is closed by other means the goroutine will be kept running
even if it has nothing to do, leaking goroutines.

Stop this goroutine if the outleter is closed.
  • Loading branch information
jsoriano committed Mar 19, 2019
1 parent 57f8b5c commit 57c9891
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix errors in filebeat Zeek dashboard and README files. Add notice.log support. {pull}10916[10916]
- Fix a bug when converting NetFlow fields to snake_case. {pull}10950[10950]
- Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test case. {issue}11004[11004] {pull}11105[11105]
- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263]
- Fix issue preventing docker container events to be stored if the container has a network interface without ip address. {issue}11225[11225] {pull}11247[11247]
- Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test
case. {issue}11004[11004] {pull}11105[11105]
Expand Down
1 change: 1 addition & 0 deletions filebeat/channel/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
// Outleter is the outlet for an input
type Outleter interface {
Close() error
Done() <-chan struct{}
OnEvent(data *util.Data) bool
}
7 changes: 7 additions & 0 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,32 @@ type outlet struct {
wg eventCounter
client beat.Client
isOpen atomic.Bool
done chan struct{}
}

func newOutlet(client beat.Client, wg eventCounter) *outlet {
o := &outlet{
wg: wg,
client: client,
isOpen: atomic.MakeBool(true),
done: make(chan struct{}),
}
return o
}

func (o *outlet) Close() error {
isOpen := o.isOpen.Swap(false)
if isOpen {
close(o.done)
return o.client.Close()
}
return nil
}

func (o *outlet) Done() <-chan struct{} {
return o.done
}

func (o *outlet) OnEvent(d *util.Data) bool {
if !o.isOpen.Load() {
return false
Expand Down
12 changes: 10 additions & 2 deletions filebeat/channel/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func (o *subOutlet) Close() error {
return nil
}

func (o *subOutlet) Done() <-chan struct{} {
return o.done
}

func (o *subOutlet) OnEvent(d *util.Data) bool {

o.mutex.Lock()
Expand Down Expand Up @@ -114,8 +118,12 @@ func (o *subOutlet) OnEvent(d *util.Data) bool {
func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter {
if sig != nil {
go func() {
<-sig
outlet.Close()
select {
case <-outlet.Done():
return
case <-sig:
outlet.Close()
}
}()
}
return outlet
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/log/input_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,4 @@ type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *util.Data) bool { return true }
func (o TestOutlet) Close() error { return nil }
func (o TestOutlet) Done() <-chan struct{} { return nil }

0 comments on commit 57c9891

Please sign in to comment.