Skip to content

Commit

Permalink
Make agent timeout for reporting configurable (#1034)
Browse files Browse the repository at this point in the history
* agent: Make timeout for reporting configurable

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Fix tests and make sure deprecated flags work

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
  • Loading branch information
gouthamve authored and pavolloffay committed Nov 23, 2018
1 parent 4ef95af commit 2096f06
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cmd/agent/app/processors/thrift_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func createProcessor(t *testing.T, mFactory metrics.Factory, tFactory thrift.TPr

func initCollectorAndReporter(t *testing.T) (*metrics.LocalFactory, *testutils.MockTCollector, reporter.Reporter) {
metricsFactory, collector := testutils.InitMockCollector(t)
reporter := tchreporter.New("jaeger-collector", collector.Channel, nil, metricsFactory, zap.NewNop())
reporter := tchreporter.New("jaeger-collector", collector.Channel, time.Second, nil, metricsFactory, zap.NewNop())
return metricsFactory, collector, reporter
}

Expand Down
10 changes: 9 additions & 1 deletion cmd/agent/app/reporter/tchannel/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (

agentServiceName = "jaeger-agent"
defaultCollectorServiceName = "jaeger-collector"
defaultReportTimeout = time.Second
)

// Builder Struct to hold configurations
Expand All @@ -49,6 +50,9 @@ type Builder struct {
// ConnCheckTimeout is the timeout used when establishing new connections.
ConnCheckTimeout time.Duration

// ReportTimeout is the timeout used when reporting span batches.
ReportTimeout time.Duration

discoverer discovery.Discoverer
notifier discovery.Notifier
channel *tchannel.Channel
Expand Down Expand Up @@ -119,11 +123,15 @@ func (b *Builder) CreateReporter(mFactory metrics.Factory, logger *zap.Logger) (
b = b.WithDiscoverer(d).WithDiscoveryNotifier(&discovery.Dispatcher{})
}

if b.ReportTimeout == 0 {
b.ReportTimeout = defaultReportTimeout
}

peerListMgr, err := b.enableDiscovery(b.channel, logger)
if err != nil {
return nil, errors.Wrap(err, "cannot enable service discovery")
}
return New(b.CollectorServiceName, b.channel, peerListMgr, mFactory, logger), nil
return New(b.CollectorServiceName, b.channel, b.ReportTimeout, peerListMgr, mFactory, logger), nil
}

func defaultInt(value int, defaultVal int) int {
Expand Down
15 changes: 13 additions & 2 deletions cmd/agent/app/reporter/tchannel/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
hostPort = "host-port"
discoveryMinPeers = "discovery.min-peers"
discoveryConnCheckTimeout = "discovery.conn-check-timeout"
reportTimeout = "report-timeout"
)

// AddFlags adds flags for Builder.
Expand All @@ -46,6 +47,10 @@ func AddFlags(flags *flag.FlagSet) {
tchannelPrefix+discoveryConnCheckTimeout,
defaultConnCheckTimeout,
"sets the timeout used when establishing new connections")
flags.Duration(
tchannelPrefix+reportTimeout,
time.Second,
"sets the timeout used when reporting spans")
// TODO remove deprecated in 1.9
flags.String(
collectorHostPort,
Expand Down Expand Up @@ -79,7 +84,13 @@ func (b *Builder) InitFromViper(v *viper.Viper, logger *zap.Logger) *Builder {
if len(v.GetString(tchannelPrefix+hostPort)) > 0 {
b.CollectorHostPorts = strings.Split(v.GetString(tchannelPrefix+hostPort), ",")
}
b.DiscoveryMinPeers = v.GetInt(tchannelPrefix + discoveryMinPeers)
b.ConnCheckTimeout = v.GetDuration(tchannelPrefix + discoveryConnCheckTimeout)

if value := v.GetInt(tchannelPrefix + discoveryMinPeers); value != defaultMinPeers {
b.DiscoveryMinPeers = value
}
if value := v.GetDuration(tchannelPrefix + discoveryConnCheckTimeout); value != defaultConnCheckTimeout {
b.ConnCheckTimeout = value
}
b.ReportTimeout = v.GetDuration(tchannelPrefix + reportTimeout)
return b
}
28 changes: 16 additions & 12 deletions cmd/agent/app/reporter/tchannel/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ import (
)

func TestBingFlags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
flags := &flag.FlagSet{}
AddFlags(flags)
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

tests := []struct {
flags []string
builder Builder
Expand All @@ -42,27 +35,38 @@ func TestBingFlags(t *testing.T) {
"--reporter.tchannel.host-port=1.2.3.4:555,1.2.3.4:666",
"--reporter.tchannel.discovery.min-peers=42",
"--reporter.tchannel.discovery.conn-check-timeout=85s",
}, builder: Builder{ConnCheckTimeout: time.Second * 85, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}},
"--reporter.tchannel.report-timeout=80s",
}, builder: Builder{ConnCheckTimeout: time.Second * 85, ReportTimeout: time.Second * 80, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}},
},
{flags: []string{
"--collector.host-port=1.2.3.4:555,1.2.3.4:666",
"--discovery.min-peers=42",
"--discovery.min-peers=45",
"--discovery.conn-check-timeout=85s",
},
builder: Builder{ConnCheckTimeout: time.Second * 85, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}},
builder: Builder{ConnCheckTimeout: time.Second * 85, ReportTimeout: defaultReportTimeout, DiscoveryMinPeers: 45, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}},
},
{flags: []string{
"--collector.host-port=1.2.3.4:555,1.2.3.4:666",
"--discovery.min-peers=42",
"--discovery.min-peers=46",
"--discovery.conn-check-timeout=85s",
"--reporter.tchannel.host-port=1.2.3.4:5556,1.2.3.4:6667",
"--reporter.tchannel.discovery.min-peers=43",
"--reporter.tchannel.discovery.conn-check-timeout=86s",
},
builder: Builder{ConnCheckTimeout: time.Second * 86, DiscoveryMinPeers: 43, CollectorHostPorts: []string{"1.2.3.4:5556", "1.2.3.4:6667"}},
builder: Builder{ConnCheckTimeout: time.Second * 86, ReportTimeout: defaultReportTimeout, DiscoveryMinPeers: 43, CollectorHostPorts: []string{"1.2.3.4:5556", "1.2.3.4:6667"}},
},
}
for _, test := range tests {
// Reset flags every iteration.
v := viper.New()
command := cobra.Command{}

flags := &flag.FlagSet{}
AddFlags(flags)
command.ResetFlags()
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

err := command.ParseFlags(test.flags)
require.NoError(t, err)
b := Builder{}
Expand Down
5 changes: 4 additions & 1 deletion cmd/agent/app/reporter/tchannel/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Reporter struct {
channel *tchannel.Channel
zClient zipkincore.TChanZipkinCollector
jClient jaeger.TChanCollector
reportTimeout time.Duration
peerListMgr *peerlistmgr.PeerListManager
batchesMetrics map[string]batchMetrics
logger *zap.Logger
Expand All @@ -64,6 +65,7 @@ type Reporter struct {
func New(
collectorServiceName string,
channel *tchannel.Channel,
reportTimeout time.Duration,
peerListMgr *peerlistmgr.PeerListManager,
mFactory metrics.Factory,
zlogger *zap.Logger,
Expand All @@ -81,6 +83,7 @@ func New(
channel: channel,
zClient: zClient,
jClient: jClient,
reportTimeout: reportTimeout,
peerListMgr: peerListMgr,
logger: zlogger,
batchesMetrics: batchesMetrics,
Expand Down Expand Up @@ -122,7 +125,7 @@ func (r *Reporter) EmitBatch(batch *jaeger.Batch) error {
}

func (r *Reporter) submitAndReport(submissionFunc func(ctx thrift.Context) error, errMsg string, size int64, batchMetrics batchMetrics) error {
ctx, cancel := tchannel.NewContextBuilder(time.Second).DisableTracing().Build()
ctx, cancel := tchannel.NewContextBuilder(r.reportTimeout).DisableTracing().Build()
defer cancel()

if err := submissionFunc(ctx); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/agent/app/reporter/tchannel/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tchannel

import (
"fmt"
"testing"
"time"

Expand All @@ -31,7 +32,7 @@ import (

func initRequirements(t *testing.T) (*metrics.LocalFactory, *testutils.MockTCollector, *Reporter) {
metricsFactory, collector := testutils.InitMockCollector(t)
reporter := New("jaeger-collector", collector.Channel, nil, metricsFactory, zap.NewNop())
reporter := New("jaeger-collector", collector.Channel, time.Second, nil, metricsFactory, zap.NewNop())
return metricsFactory, collector, reporter
}

Expand Down Expand Up @@ -105,6 +106,7 @@ func submitTestJaegerBatch(reporter *Reporter) error {
}

func checkCounters(t *testing.T, mf *metrics.LocalFactory, batchesSubmitted, spansSubmitted, batchesFailures, spansFailures int, format string) {
fmt.Println(mf.Snapshot())
mTestutils.AssertCounterMetrics(t, mf, []mTestutils.ExpectedMetric{
{Name: "tchannel-reporter.batches.submitted", Tags: map[string]string{"format": format}, Value: batchesSubmitted},
{Name: "tchannel-reporter.spans.submitted", Tags: map[string]string{"format": format}, Value: spansSubmitted},
Expand Down

0 comments on commit 2096f06

Please sign in to comment.