Skip to content

Commit

Permalink
beater: refactor tracerServer (#4628) (#4778)
Browse files Browse the repository at this point in the history
When apm-server is configured to receive its own
trace events, it starts a traceServer. To enable
reloadable config (starting/stopping the publisher,
HTTP server, etc. multiple times) we need to support
changing the publish.Reporter function used by the
traceServer at runtime.

We update traceServer to send events to another
channel, and then separate running traceServer from
serving the event publication requests.

Co-authored-by: Juan Álvarez <juan.alvarez@elastic.co>

Co-authored-by: Juan Álvarez <juan.alvarez@elastic.co>
  • Loading branch information
axw and jalvz committed Feb 18, 2021
1 parent 51e1d8d commit a8bfa31
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 55 deletions.
34 changes: 19 additions & 15 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,16 @@ func (bt *beater) Run(b *beat.Beat) error {
}
recordAPMServerConfig(bt.config)

tracer, tracerServer, err := bt.initTracing(b)
tracer, tracerServer, err := initTracing(b, bt.config, bt.logger)
if err != nil {
return err
}
if tracer != nil {
defer tracer.Close()
if tracerServer != nil {
defer tracerServer.Close()
}
}

runServer := runServer
if tracerServer != nil {
Expand Down Expand Up @@ -323,19 +329,26 @@ func (bt *beater) registerPipelineCallback(b *beat.Beat) error {
return err
}

func (bt *beater) initTracing(b *beat.Beat) (*apm.Tracer, *tracerServer, error) {
func initTracing(b *beat.Beat, cfg *config.Config, logger *logp.Logger) (*apm.Tracer, *tracerServer, error) {
var err error
tracer := b.Instrumentation.Tracer()
listener := b.Instrumentation.Listener()

if !tracer.Active() && bt.config != nil {
tracer, listener, err = initLegacyTracer(b.Info, bt.config)
if !tracer.Active() && cfg != nil {
tracer, listener, err = initLegacyTracer(b.Info, cfg)
if err != nil {
return nil, nil, err
}
}

tracerServer := newTracerServer(bt.config, listener)
var tracerServer *tracerServer
if listener != nil {
var err error
tracerServer, err = newTracerServer(listener, logger)
if err != nil {
return nil, nil, err
}
}
return tracer, tracerServer, nil
}

Expand Down Expand Up @@ -392,16 +405,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
return func(ctx context.Context, args ServerParams) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer tracerServer.stop()
<-ctx.Done()
// Close the tracer now to prevent the server
// from waiting for more events during graceful
// shutdown.
tracer.Close()
return nil
})
g.Go(func() error {
return tracerServer.serve(args.Reporter)
return tracerServer.serve(ctx, args.Reporter)
})
g.Go(func() error {
return runServer(ctx, args)
Expand Down
90 changes: 50 additions & 40 deletions beater/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,70 +22,80 @@ import (
"net"
"net/http"

"go.elastic.co/apm"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/apm-server/beater/api"
"github.com/elastic/apm-server/beater/config"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/publish"
)

func init() {
apm.DefaultTracer.Close()
}

type tracerServer struct {
cfg *config.Config
logger *logp.Logger
server *http.Server
listener net.Listener
logger *logp.Logger
requests <-chan tracerServerRequest
}

func newTracerServer(cfg *config.Config, listener net.Listener) *tracerServer {
if listener == nil {
return nil
func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer, error) {
requests := make(chan tracerServerRequest)
report := func(ctx context.Context, req publish.PendingReq) error {
result := make(chan error, 1)
request := tracerServerRequest{ctx: ctx, req: req, res: result}
select {
case <-ctx.Done():
return ctx.Err()
case requests <- request:
}
select {
case <-ctx.Done():
return ctx.Err()
case err := <-result:
return err
}
}
cfg := config.DefaultConfig()
mux, err := api.NewMux(cfg, report)
if err != nil {
return nil, err
}

cfgCopy := *cfg // Copy cfg so we can disable auth
cfg = &cfgCopy
cfg.SecretToken = ""
cfg.APIKeyConfig = nil

server := &http.Server{
Handler: mux,
IdleTimeout: cfg.IdleTimeout,
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
MaxHeaderBytes: cfg.MaxHeaderSize,
}

go func() {
if err := server.Serve(listener); err != http.ErrServerClosed {
logger.Error(err.Error())
}
}()
return &tracerServer{
cfg: cfg,
logger: logp.NewLogger(logs.Beater),
server: server,
listener: listener,
}
logger: logger,
requests: requests,
}, nil
}

func (s *tracerServer) serve(report publish.Reporter) error {
mux, err := api.NewMux(s.cfg, report)
if err != nil {
return err
}
s.server.Handler = mux
if err := s.server.Serve(s.listener); err != http.ErrServerClosed {
return err
}
return nil
// Close closes the tracerServer's listener.
func (s *tracerServer) Close() error {
return s.server.Shutdown(context.Background())
}

func (s *tracerServer) stop() {
err := s.server.Shutdown(context.Background())
if err != nil {
s.logger.Error(err.Error())
if err := s.server.Close(); err != nil {
s.logger.Error(err.Error())
// serve serves event publication requests for the tracer server. This may be
// called multiple times in series, but not concurrently.
func (s *tracerServer) serve(ctx context.Context, report publish.Reporter) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-s.requests:
req.res <- report(req.ctx, req.req)
}
}
}

type tracerServerRequest struct {
ctx context.Context
req publish.PendingReq
res chan<- error
}

0 comments on commit a8bfa31

Please sign in to comment.