Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[agent] Make timeout for reporting configurable #1034

Merged
merged 2 commits into from
Nov 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/agent/app/processors/thrift_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func createProcessor(t *testing.T, mFactory metrics.Factory, tFactory thrift.TPr

func initCollectorAndReporter(t *testing.T) (*metrics.LocalFactory, *testutils.MockTCollector, reporter.Reporter) {
metricsFactory, collector := testutils.InitMockCollector(t)
reporter := reporter.WrapWithMetrics(tchreporter.New("jaeger-collector", collector.Channel, nil, zap.NewNop()), metricsFactory)
reporter := reporter.WrapWithMetrics(tchreporter.New("jaeger-collector", collector.Channel, time.Second, nil, zap.NewNop()), metricsFactory)
return metricsFactory, collector, reporter
}

Expand Down
10 changes: 9 additions & 1 deletion cmd/agent/app/reporter/tchannel/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (

agentServiceName = "jaeger-agent"
defaultCollectorServiceName = "jaeger-collector"
defaultReportTimeout = time.Second
)

// Builder Struct to hold configurations
Expand All @@ -48,6 +49,9 @@ type Builder struct {
// ConnCheckTimeout is the timeout used when establishing new connections.
ConnCheckTimeout time.Duration

// ReportTimeout is the timeout used when reporting span batches.
ReportTimeout time.Duration

discoverer discovery.Discoverer
notifier discovery.Notifier
channel *tchannel.Channel
Expand Down Expand Up @@ -118,11 +122,15 @@ func (b *Builder) CreateReporter(logger *zap.Logger) (*Reporter, error) {
b = b.WithDiscoverer(d).WithDiscoveryNotifier(&discovery.Dispatcher{})
}

if b.ReportTimeout == 0 {
b.ReportTimeout = defaultReportTimeout
}

peerListMgr, err := b.enableDiscovery(b.channel, logger)
if err != nil {
return nil, errors.Wrap(err, "cannot enable service discovery")
}
return New(b.CollectorServiceName, b.channel, peerListMgr, logger), nil
return New(b.CollectorServiceName, b.channel, b.ReportTimeout, peerListMgr, logger), nil
}

func defaultInt(value int, defaultVal int) int {
Expand Down
15 changes: 13 additions & 2 deletions cmd/agent/app/reporter/tchannel/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
hostPort = "host-port"
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
discoveryMinPeers = "discovery.min-peers"
discoveryConnCheckTimeout = "discovery.conn-check-timeout"
reportTimeout = "report-timeout"
)

// AddFlags adds flags for Builder.
Expand All @@ -46,6 +47,10 @@ func AddFlags(flags *flag.FlagSet) {
tchannelPrefix+discoveryConnCheckTimeout,
defaultConnCheckTimeout,
"sets the timeout used when establishing new connections")
flags.Duration(
tchannelPrefix+reportTimeout,
time.Second,
"sets the timeout used when reporting spans")
// TODO remove deprecated in 1.9
flags.String(
collectorHostPort,
Expand Down Expand Up @@ -79,7 +84,13 @@ func (b *Builder) InitFromViper(v *viper.Viper, logger *zap.Logger) *Builder {
if len(v.GetString(tchannelPrefix+hostPort)) > 0 {
b.CollectorHostPorts = strings.Split(v.GetString(tchannelPrefix+hostPort), ",")
}
b.DiscoveryMinPeers = v.GetInt(tchannelPrefix + discoveryMinPeers)
b.ConnCheckTimeout = v.GetDuration(tchannelPrefix + discoveryConnCheckTimeout)

if value := v.GetInt(tchannelPrefix + discoveryMinPeers); value != defaultMinPeers {
b.DiscoveryMinPeers = value
}
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
if value := v.GetDuration(tchannelPrefix + discoveryConnCheckTimeout); value != defaultConnCheckTimeout {
b.ConnCheckTimeout = value
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strictly speaking this does not solve the overriding problem if the manually provided value was the same as the default value, but it's still an improvement. The key thing is that we just need to remove the legacy flags soon.

b.ReportTimeout = v.GetDuration(tchannelPrefix + reportTimeout)
return b
}
28 changes: 16 additions & 12 deletions cmd/agent/app/reporter/tchannel/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ import (
)

func TestBingFlags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
flags := &flag.FlagSet{}
AddFlags(flags)
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

tests := []struct {
flags []string
builder Builder
Expand All @@ -42,27 +35,38 @@ func TestBingFlags(t *testing.T) {
"--reporter.tchannel.host-port=1.2.3.4:555,1.2.3.4:666",
"--reporter.tchannel.discovery.min-peers=42",
"--reporter.tchannel.discovery.conn-check-timeout=85s",
}, builder: Builder{ConnCheckTimeout: time.Second * 85, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}},
"--reporter.tchannel.report-timeout=80s",
}, builder: Builder{ConnCheckTimeout: time.Second * 85, ReportTimeout: time.Second * 80, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}},
},
{flags: []string{
"--collector.host-port=1.2.3.4:555,1.2.3.4:666",
"--discovery.min-peers=42",
"--discovery.min-peers=45",
"--discovery.conn-check-timeout=85s",
},
builder: Builder{ConnCheckTimeout: time.Second * 85, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}},
builder: Builder{ConnCheckTimeout: time.Second * 85, ReportTimeout: defaultReportTimeout, DiscoveryMinPeers: 45, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}},
},
{flags: []string{
"--collector.host-port=1.2.3.4:555,1.2.3.4:666",
"--discovery.min-peers=42",
"--discovery.min-peers=46",
"--discovery.conn-check-timeout=85s",
"--reporter.tchannel.host-port=1.2.3.4:5556,1.2.3.4:6667",
"--reporter.tchannel.discovery.min-peers=43",
"--reporter.tchannel.discovery.conn-check-timeout=86s",
},
builder: Builder{ConnCheckTimeout: time.Second * 86, DiscoveryMinPeers: 43, CollectorHostPorts: []string{"1.2.3.4:5556", "1.2.3.4:6667"}},
builder: Builder{ConnCheckTimeout: time.Second * 86, ReportTimeout: defaultReportTimeout, DiscoveryMinPeers: 43, CollectorHostPorts: []string{"1.2.3.4:5556", "1.2.3.4:6667"}},
},
}
for _, test := range tests {
// Reset flags every iteration.
v := viper.New()
command := cobra.Command{}

flags := &flag.FlagSet{}
AddFlags(flags)
command.ResetFlags()
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

err := command.ParseFlags(test.flags)
require.NoError(t, err)
b := Builder{}
Expand Down
29 changes: 16 additions & 13 deletions cmd/agent/app/reporter/tchannel/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,34 @@ type batchMetrics struct {

// Reporter forwards received spans to central collector tier over TChannel.
type Reporter struct {
channel *tchannel.Channel
zClient zipkincore.TChanZipkinCollector
jClient jaeger.TChanCollector
peerListMgr *peerlistmgr.PeerListManager
logger *zap.Logger
serviceName string
channel *tchannel.Channel
zClient zipkincore.TChanZipkinCollector
jClient jaeger.TChanCollector
reportTimeout time.Duration
peerListMgr *peerlistmgr.PeerListManager
logger *zap.Logger
serviceName string
}

// New creates new TChannel-based Reporter.
func New(
collectorServiceName string,
channel *tchannel.Channel,
reportTimeout time.Duration,
peerListMgr *peerlistmgr.PeerListManager,
zlogger *zap.Logger,
) *Reporter {
thriftClient := thrift.NewClient(channel, collectorServiceName, nil)
zClient := zipkincore.NewTChanZipkinCollectorClient(thriftClient)
jClient := jaeger.NewTChanCollectorClient(thriftClient)
return &Reporter{
channel: channel,
zClient: zClient,
jClient: jClient,
peerListMgr: peerListMgr,
logger: zlogger,
serviceName: collectorServiceName,
channel: channel,
zClient: zClient,
jClient: jClient,
reportTimeout: reportTimeout,
peerListMgr: peerListMgr,
logger: zlogger,
serviceName: collectorServiceName,
}
}

Expand Down Expand Up @@ -111,7 +114,7 @@ func (r *Reporter) EmitBatch(batch *jaeger.Batch) error {
}

func (r *Reporter) submitAndReport(submissionFunc func(ctx thrift.Context) error, errMsg string, size int64) error {
ctx, cancel := tchannel.NewContextBuilder(time.Second).DisableTracing().Build()
ctx, cancel := tchannel.NewContextBuilder(r.reportTimeout).DisableTracing().Build()
defer cancel()

if err := submissionFunc(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/tchannel/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func initRequirements(t *testing.T) (*metrics.LocalFactory, *testutils.MockTCollector, *Reporter) {
metricsFactory, collector := testutils.InitMockCollector(t)
reporter := New("jaeger-collector", collector.Channel, nil, zap.NewNop())
reporter := New("jaeger-collector", collector.Channel, time.Second, nil, zap.NewNop())
return metricsFactory, collector, reporter
}

Expand Down