Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client #8543

Merged
merged 3 commits into from
Jul 17, 2019

Conversation

vjsamuel
Copy link
Contributor

@vjsamuel vjsamuel commented Oct 3, 2018

This PR changes the way TCP input source is created to a ConnectionFactory based model which takes a SplitFunc, NetworkFunc and two ClientCallback funcs that can be called during a client connect and disconnect.

These hooks allow the TCP implementer to do better logging, stateful processing and also spin up splitters per client.

@elasticmachine
Copy link
Collaborator

Since this is a community submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually?

ID() string
}

// Client allows a callback to occur when a new client connects or disconnects to the server

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on exported type ClientCallback should be of the form "ClientCallback ..." (with optional leading article)

@vjsamuel vjsamuel changed the title Add ConnectionFactory to TCP input source to add factories per client Add ClientFactory to TCP input source to add factories per client Oct 3, 2018
@vjsamuel vjsamuel changed the title Add ClientFactory to TCP input source to add factories per client Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client Oct 3, 2018
@vjsamuel vjsamuel force-pushed the stateful_tcp branch 5 times, most recently from de8c1e1 to 554755b Compare October 6, 2018 06:08
@ph ph self-assigned this Oct 17, 2018
@ph ph added the Filebeat Filebeat label Oct 17, 2018
ph
ph previously requested changes Oct 17, 2018

"github.com/elastic/beats/filebeat/inputsource"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
)

// Client is a remote client.
// ClientInfo is a remote client.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the rename?

return tcp.New(&config.Config, splitFunc, cb)
factory := func() (inputsource.NetworkFunc, bufio.SplitFunc, tcp.ClientCallback, tcp.ClientCallback) {
return cb, splitFunc, nil, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this factory in the TCP package so we can reuse the same factory definition for both TCP input and syslog.

@@ -41,15 +42,32 @@ type client struct {
splitFunc bufio.SplitFunc
maxMessageSize uint64
timeout time.Duration
onConnect ClientCallback
OnDisconnect ClientCallback
id string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets keep the original type uuid.UUID here instead of using a string.

@@ -41,15 +42,32 @@ type client struct {
splitFunc bufio.SplitFunc
maxMessageSize uint64
timeout time.Duration
onConnect ClientCallback
OnDisconnect ClientCallback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the other Fields and callback, s/OnDisconnect/onDisconnect

@@ -124,11 +123,11 @@ func (s *Server) run() {

err := client.handle()
if err != nil {
s.log.Debugw("Client error", "error", err)
s.log.Debugw("ClientInfo error", "error", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We did not change the type, so let's keep Client as the log selector instead of ClientInfo

}

defer s.log.Debugw(
"Client disconnected",
"ClientInfo disconnected",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above lets keep Client.

}

// ClientFactory passes a connection ID as an input and gets back a NetworkFunc and a SplitFunc
type ClientFactory func() (inputsource.NetworkFunc, bufio.SplitFunc, ClientCallback, ClientCallback)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that we miss some code in this PR I am looking at the comments and I would expect a factory to create a new object or some new code provided by X input. Looking at this method signature I don't see any parameters but In the comment I see passes a connection ID as an input and gets back.

From what I remember from previous discussion we want to achieve the following: create a parser unique per client, that could contain states?

I think we are adding dependencies where we could split them, we could do the following:

  • Revert the changes for UUID in the TCP type, creating an ID could be implemented in the custom decoder when it get created.
  • Remove any callbacks, onConnect/onDisconnect
  • Create a new interface, this interface will receive an io.Reader which is
type TCPDecoder interface {
  func Handle(io.Reader) error
  func Close() // Maybe needed for you case to do some cleanup.
}
  • The factory return an instance of a TCPDecoder
  • Move the current logic into a SplitDecoder, the split decoder will accepts the split func and a callback, this will make it backward compatible with what we have today.

I believe this will better encapsulate your code and you will be free to do whatever you want from the io.Reader, I think this make it lot more flexible.

idx := 0
for client := range s.clients {
for client, _ := range s.clients {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should omit 2nd value from range; this loop is equivalent to for client := range ...

client := &client{
conn: conn,
log: log.With("remote_address", conn.RemoteAddr()),
) *splitClient {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported func NewSplitClient returns unexported type *tcp.splitClient, which can be annoying to use

filebeat/inputsource/tcp/client.go Outdated Show resolved Hide resolved
filebeat/inputsource/tcp/client.go Outdated Show resolved Hide resolved
filebeat/inputsource/tcp/client.go Outdated Show resolved Hide resolved
@vjsamuel vjsamuel force-pushed the stateful_tcp branch 3 times, most recently from a3308d5 to b3193c5 Compare July 11, 2019 23:41
}

// NewSplitClient allows creation of a TCP client that has splitting capabilities.
func NewSplitClient(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the constructor used somewhere else? Let's not export symbols we don't need somewhere else for sure.

conn net.Conn,
log *logp.Logger,
// ClientFactory returns a Client func
type ClientFactory func(config *Config) Client
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to documented more detailed how "client" and "server" interact.

Also I'm not sure I like the name Client here. Client suggests we have an actor connection to some remote endpoint. This is more like a Handler or ConnectionHandler.

The config parameter is a shared resource between server and all clients. Maybe better pass as value, so no handler/client gets the chance to change a value under the hood.

@@ -79,24 +95,24 @@ func (c *client) handle() error {
for scanner.Scan() {
err := scanner.Err()
if err != nil {
// we are forcing a close on the socket, lets ignore any error that could happen.
// we are forcing a Close on the socket, lets ignore any error that could happen.
select {
case <-c.done:
break
default:
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel could be closed anytime, not only if the reader fails.
The way this function is written the callback if scan could produce an 'event', but channel is closed in the meantime. Is this expected behavior or do we rather not want to publish the event on close and guarantee the handler can return immediately.

@@ -71,7 +71,7 @@ type DeadlineReader struct {
}

// NewDeadlineReader returns a new DeadlineReader
func NewDeadlineReader(c net.Conn, timeout time.Duration) *DeadlineReader {
func NewDeadlineReader(c net.Conn, timeout time.Duration) io.Reader {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change the return type here? Prefer to return concrete types, but accept interfaces.

@urso urso dismissed ph’s stale review July 12, 2019 12:22

looks like requested changes have been adressed

@urso
Copy link

urso commented Jul 12, 2019

Jenkins, test this.

@urso
Copy link

urso commented Jul 12, 2019

This CI fail seems to be related:

16:56:14 command [go test -race -cover -coverprofile /tmp/gotestcover-180254964 github.com/elastic/beats/filebeat/inputsource/tcp]: exit status 1
16:56:14 ==================
16:56:15 WARNING: DATA RACE
16:56:15 Read at 0x00c0000a6c00 by goroutine 12:
16:56:15   github.com/elastic/beats/filebeat/inputsource/tcp.(*splitHandler).Close()
16:56:15       /var/lib/jenkins/workspace/elastic+beats+pull-request+multijob-linux/beat/filebeat/label/linux-immutable/src/github.com/elastic/beats/filebeat/inputsource/tcp/client.go:126 +0x56
16:56:15   github.com/elastic/beats/filebeat/inputsource/tcp.(*Server).Stop()
16:56:15       /var/lib/jenkins/workspace/elastic+beats+pull-request+multijob-linux/beat/filebeat/label/linux-immutable/src/github.com/elastic/beats/filebeat/inputsource/tcp/server.go:142 +0x202
16:56:15   github.com/elastic/beats/filebeat/inputsource/tcp.TestReceiveEventsAndMetadata.func1()
16:56:15       /var/lib/jenkins/workspace/elastic+beats+pull-request+multijob-linux/beat/filebeat/label/linux-immutable/src/github.com/elastic/beats/filebeat/inputsource/tcp/server_test.go:201 +0xa04
16:56:15   testing.tRunner()
16:56:15       /var/lib/jenkins/.gvm/versions/go1.12.4.linux.amd64/src/testing/testing.go:865 +0x163
16:56:15 
16:56:15 Previous write at 0x00c0000a6c00 by goroutine 10:
16:56:15   github.com/elastic/beats/filebeat/inputsource/tcp.(*splitHandler).Handle()
16:56:15       /var/lib/jenkins/workspace/elastic+beats+pull-request+multijob-linux/beat/filebeat/label/linux-immutable/src/github.com/elastic/beats/filebeat/inputsource/tcp/client.go:80 +0x6d
16:56:15   github.com/elastic/beats/filebeat/inputsource/tcp.(*Server).run.func1()
16:56:15       /var/lib/jenkins/workspace/elastic+beats+pull-request+multijob-linux/beat/filebeat/label/linux-immutable/src/github.com/elastic/beats/filebeat/inputsource/tcp/server.go:120 +0x339
16:56:15 
16:56:15 Goroutine 12 (running) created at:
16:56:15   testing.(*T).Run()
16:56:15       /var/lib/jenkins/.gvm/versions/go1.12.4.linux.amd64/src/testing/testing.go:916 +0x65a
16:56:15   github.com/elastic/beats/filebeat/inputsource/tcp.TestReceiveEventsAndMetadata()
16:56:15       /var/lib/jenkins/workspace/elastic+beats+pull-request+multijob-linux/beat/filebeat/label/linux-immutable/src/github.com/elastic/beats/filebeat/inputsource/tcp/server_test.go:157 +0x1979
16:56:15   testing.tRunner()
16:56:15       /var/lib/jenkins/.gvm/versions/go1.12.4.linux.amd64/src/testing/testing.go:865 +0x163
16:56:15 
16:56:15 Goroutine 10 (running) created at:
16:56:15   github.com/elastic/beats/filebeat/inputsource/tcp.(*Server).run()
16:56:15       /var/lib/jenkins/workspace/elastic+beats+pull-request+multijob-linux/beat/filebeat/label/linux-immutable/src/github.com/elastic/beats/filebeat/inputsource/tcp/server.go:111 +0x220
16:56:15   github.com/elastic/beats/filebeat/inputsource/tcp.(*Server).Start.func1()
16:56:15       /var/lib/jenkins/workspace/elastic+beats+pull-request+multijob-linux/beat/filebeat/label/linux-immutable/src/github.com/elastic/beats/filebeat/inputsource/tcp/server.go:85 +0x80
16:56:15 ==================
16:56:15 --- FAIL: TestReceiveEventsAndMetadata (1.72s)
16:56:15     --- FAIL: TestReceiveEventsAndMetadata/MaxReadBufferReachedUserConfigured (0.01s)
16:56:15         testing.go:809: race detected during execution of test
16:56:15     testing.go:809: race detected during execution of test
16:56:15 FAIL
16:56:15 coverage: 81.6% of statements
16:56:15 FAIL	github.com/elastic/beats/filebeat/inputsource/tcp	2.018s
16:56:15 ?   	github.com/elastic/beats/filebeat/scripts/generator/fileset	[no test files]
16:56:15 ?   	github.com/elastic/beats/filebeat/scripts/generator/module	[no test files]
16:56:18 ?   	github.com/elastic/beats/filebeat/scripts/mage	[no test files]
16:56:18 ok  	github.com/elastic/beats/filebeat/registrar	1.038s	coverage: 17.3% of statements
16:56:19 ok  	github.com/elastic/beats/filebeat/scripts/tester	1.020s	coverage: 14.0% of statements
16:56:19 ok  	github.com/elastic/beats/filebeat/processor/add_kubernetes_metadata	1.024s	coverage: 88.5% of statements
16:56:20 ok  	github.com/elastic/beats/filebeat/util	1.018s	coverage: 71.4% of statements
16:56:20 test failed
16:56:20 make[1]: *** [unit-tests] Error 1
16:56:20 make[1]: Leaving directory `/var/lib/jenkins/workspace/elastic+beats+pull-request+multijob-linux/beat/filebeat/label/linux-immutable/src/github.com/elastic/beats/filebeat'
16:56:20 make: *** [testsuite] Error 2

@vjsamuel
Copy link
Contributor Author

@urso @ph i am not able to reproduce the data race on local. can you please help rerun Jenkins?

@vjsamuel
Copy link
Contributor Author

never mind i have updated the PR with the fix

@@ -231,6 +231,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...v7.0.0-rc1[Check the HEA
- Add ISO8601 timestamp support in syslog metricset. {issue}8716[8716] {pull}10736[10736]
- Add support for loading custom NetFlow and IPFIX field definitions to netflow input. {pull}10945[10945] {pull}11223[11223]
- Added categorization fields for SSH login events in the system/auth fileset. {pull}11334[11334]
- Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client. {pull}8543[8543]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changelog suggest it's an usr visible feature in 7.0.0-rc1. Please move the changelog entry to: CHANGELOG-developer.next.asciidoc

@urso urso merged commit e5c955f into elastic:master Jul 17, 2019
@vjsamuel
Copy link
Contributor Author

thanks @urso

@vjsamuel vjsamuel deleted the stateful_tcp branch July 17, 2019 20:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Filebeat Filebeat
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants