Skip to content

Commit

Permalink
Osquerybeat: Improve osquery client connect code (#28848) (#28854)
Browse files Browse the repository at this point in the history
(cherry picked from commit d2e3b99)

Co-authored-by: Aleksandr Maus <aleksandr.maus@elastic.co>
  • Loading branch information
mergify[bot] and aleksmaus committed Nov 6, 2021
1 parent 693f41c commit dafb447
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 30 deletions.
7 changes: 6 additions & 1 deletion x-pack/osquerybeat/internal/osqd/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ var protectedFlags = Flags{

// The delimiter for a full query name that is concatenated as "pack_" + {{pack name}} + "_" + {{query name}} by default
"pack_delimiter": "_",
"config_refresh": 10,

// Refresh config every 60 seconds
// The previous setting was 10 seconds which is unnecessary frequent.
// Osquery does not expect that frequent policy/configuration changes
// and can tolerate non real-time configuration change application.
"config_refresh": 60,
}

func init() {
Expand Down
41 changes: 12 additions & 29 deletions x-pack/osquerybeat/internal/osqdcli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"golang.org/x/sync/semaphore"
"gotest.tools/gotestsum/log"

"github.com/osquery/osquery-go"
genosquery "github.com/osquery/osquery-go/gen/osquery"
Expand Down Expand Up @@ -125,28 +126,21 @@ func (c *Client) Connect(ctx context.Context) error {
func (c *Client) reconnect(ctx context.Context) error {
c.close()

for i := 0; i < c.connectRetries; i++ {
attempt := i + 1
llog := c.log.With("attempt", attempt)
llog.Debug("connecting")
r := retry{
maxRetry: c.connectRetries,
retryWait: retryWait,
log: c.log.With("context", "osquery client connect"),
}

return r.Run(ctx, func(ctx context.Context) error {
cli, err := osquery.NewClient(c.socketPath, c.timeout)
if err != nil {
llog.Errorf("failed to connect: %v", err)
if i < c.connectRetries-1 {
llog.Infof("wait before next connect attempt: retry_wait: %v", retryWait)
if werr := waitWithContext(ctx, retryWait); werr != nil {
err = werr
break // Context cancelled, exit loop
}
} else {
return err
}
continue
log.Errorf("failed to connect: %v", err)
return err
}
c.cli = cli
break
}
return nil
return nil
})
}

func (c *Client) Close() {
Expand Down Expand Up @@ -287,17 +281,6 @@ func (c *Client) queryColumnTypes(ctx context.Context, sql string) (map[string]s
return colTypes, nil
}

func waitWithContext(ctx context.Context, to time.Duration) error {
t := time.NewTimer(to)
defer t.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
}
return nil
}

func resolveTypes(hits []map[string]string, colTypes map[string]string) []map[string]interface{} {
resolved := make([]map[string]interface{}, 0, len(hits))
for _, hit := range hits {
Expand Down
62 changes: 62 additions & 0 deletions x-pack/osquerybeat/internal/osqdcli/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package osqdcli

import (
"context"
"time"

"github.com/elastic/beats/v7/libbeat/logp"
)

type retry struct {
maxRetry int
retryWait time.Duration
log *logp.Logger
}

type tryFunc func(ctx context.Context) error

func (r *retry) Run(ctx context.Context, fn tryFunc) (err error) {
maxAttempts := r.maxRetry + 1
for i := 0; i < maxAttempts; i++ {
attempt := i + 1
r.log.Debugf("attempt %v out of %v", attempt, maxAttempts)

err = fn(ctx)

if err != nil {
r.log.Debugf("attempt %v out of %v failed, err: %v", attempt, maxAttempts, err)
if i != maxAttempts {
if r.retryWait > 0 {
r.log.Debugf("wait for %v before next retry", r.retryWait)
err = waitWithContext(ctx, retryWait)
if err != nil {
r.log.Debugf("wait returned err: %v", err)
return err
}
}
} else {
r.log.Debugf("no more attempts, return err: %v", err)
return err
}
} else {
r.log.Debugf("attempt %v out of %v succeeded", attempt, maxAttempts)
return nil
}
}
return err
}

func waitWithContext(ctx context.Context, to time.Duration) error {
t := time.NewTimer(to)
defer t.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
}
return nil
}
139 changes: 139 additions & 0 deletions x-pack/osquerybeat/internal/osqdcli/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package osqdcli

import (
"context"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

func TestRetryRun(t *testing.T) {
logp.Configure(logp.Config{
Level: logp.DebugLevel,
ToStderr: true,
Selectors: []string{"*"},
})

log := logp.NewLogger("retry_test").With("context", "osquery client connect")
ctx := context.Background()

type fields struct {
maxRetry int
retryWait time.Duration
log *logp.Logger
}

type args struct {
ctx context.Context
fn tryFunc
}

argsWithFunc := func(fn tryFunc) args {
return args{
ctx: ctx,
fn: fn,
}
}

funcSucceedsOnNAttempt := func(attempt int) func(context.Context) error {
curAttempt := 1
return func(ctx context.Context) error {
if curAttempt == attempt {
return nil
}
curAttempt++
return ErrAlreadyConnected
}
}

tests := []struct {
name string
fields fields
args args
wantErr error
}{
{
name: "no retries, no wait, success",
fields: fields{
log: log,
},
args: argsWithFunc(func(ctx context.Context) error {
return nil
}),
},
{
name: "no retries, no wait, error",
fields: fields{
log: log,
},
args: argsWithFunc(func(ctx context.Context) error {
return ErrAlreadyConnected
}),
wantErr: ErrAlreadyConnected,
},
{
name: "retries, no wait, no more retries fails",
fields: fields{
maxRetry: 3,
log: log,
},
args: argsWithFunc(funcSucceedsOnNAttempt(8)),
wantErr: ErrAlreadyConnected,
},
{
name: "retries, no wait, success",
fields: fields{
maxRetry: 3,
log: log,
},
args: argsWithFunc(funcSucceedsOnNAttempt(4)),
},
{
name: "retries, with wait, success",
fields: fields{
maxRetry: 3,
retryWait: 1 * time.Millisecond,
log: log,
},
args: argsWithFunc(funcSucceedsOnNAttempt(4)),
},
{
name: "retries, with wait, success sooner",
fields: fields{
maxRetry: 3,
retryWait: 1 * time.Millisecond,
log: log,
},
args: argsWithFunc(funcSucceedsOnNAttempt(2)),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &retry{
maxRetry: tt.fields.maxRetry,
retryWait: tt.fields.retryWait,
log: tt.fields.log,
}
err := r.Run(tt.args.ctx, tt.args.fn)
if err != nil {
if tt.wantErr != nil {
diff := cmp.Diff(tt.wantErr, err, cmpopts.EquateErrors())
if diff != "" {
t.Error(diff)
}
} else {
t.Errorf("got err: %v, wantErr: nil", err)
}
} else if tt.wantErr != nil {
t.Errorf("got err: nil, wantErr: %v", tt.wantErr)
}
})
}
}

0 comments on commit dafb447

Please sign in to comment.