diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b3d4e85068d..73e35599e0a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix errors in filebeat Zeek dashboard and README files. Add notice.log support. {pull}10916[10916] - Fix a bug when converting NetFlow fields to snake_case. {pull}10950[10950] - Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test case. {issue}11004[11004] {pull}11105[11105] +- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263] - Fix issue preventing docker container events to be stored if the container has a network interface without ip address. {issue}11225[11225] {pull}11247[11247] - Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test case. {issue}11004[11004] {pull}11105[11105] diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 82e5a82af37..877b818870a 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -32,5 +32,6 @@ type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error) // Outleter is the outlet for an input type Outleter interface { Close() error + Done() <-chan struct{} OnEvent(data *util.Data) bool } diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index d130cf9ceeb..c0fe2b0c9e3 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -27,6 +27,7 @@ type outlet struct { wg eventCounter client beat.Client isOpen atomic.Bool + done chan struct{} } func newOutlet(client beat.Client, wg eventCounter) *outlet { @@ -34,6 +35,7 @@ func newOutlet(client beat.Client, wg eventCounter) *outlet { wg: wg, client: client, isOpen: atomic.MakeBool(true), + done: make(chan struct{}), } return o } @@ -41,11 +43,16 @@ func newOutlet(client beat.Client, wg eventCounter) *outlet { func (o *outlet) Close() error { isOpen := o.isOpen.Swap(false) if isOpen { + close(o.done) return o.client.Close() } return nil } +func (o *outlet) Done() <-chan struct{} { + return o.done +} + func (o *outlet) OnEvent(d *util.Data) bool { if !o.isOpen.Load() { return false diff --git a/filebeat/channel/util.go b/filebeat/channel/util.go index 134765c4cd8..aec2132fa20 100644 --- a/filebeat/channel/util.go +++ b/filebeat/channel/util.go @@ -71,6 +71,10 @@ func (o *subOutlet) Close() error { return nil } +func (o *subOutlet) Done() <-chan struct{} { + return o.done +} + func (o *subOutlet) OnEvent(d *util.Data) bool { o.mutex.Lock() @@ -114,8 +118,12 @@ func (o *subOutlet) OnEvent(d *util.Data) bool { func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter { if sig != nil { go func() { - <-sig - outlet.Close() + select { + case <-outlet.Done(): + return + case <-sig: + outlet.Close() + } }() } return outlet diff --git a/filebeat/input/log/input_other_test.go b/filebeat/input/log/input_other_test.go index 3a36cb6040d..bdaba0c7d2a 100644 --- a/filebeat/input/log/input_other_test.go +++ b/filebeat/input/log/input_other_test.go @@ -169,3 +169,4 @@ type TestOutlet struct{} func (o TestOutlet) OnEvent(event *util.Data) bool { return true } func (o TestOutlet) Close() error { return nil } +func (o TestOutlet) Done() <-chan struct{} { return nil }