
Added support for dynamic queue sizes #1985

Merged 1 commit · Jan 23, 2020
5 changes: 5 additions & 0 deletions cmd/collector/app/builder/builder_flags.go
@@ -27,6 +27,7 @@ import (
)

const (
collectorDynQueueSizeMemory = "collector.queue-size-memory"
collectorQueueSize = "collector.queue-size"
collectorNumWorkers = "collector.num-workers"
collectorPort = "collector.port"
@@ -46,6 +47,8 @@ var tlsFlagsConfig = tlscfg.ServerFlagsConfig{

// CollectorOptions holds configuration for collector
type CollectorOptions struct {
// DynQueueSizeMemory determines how much memory to use for the queue
DynQueueSizeMemory uint
// QueueSize is the size of collector's queue
QueueSize int
// NumWorkers is the number of internal workers in a collector
@@ -70,6 +73,7 @@ type CollectorOptions struct {

// AddFlags adds flags for CollectorOptions
func AddFlags(flags *flag.FlagSet) {
flags.Uint(collectorDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.")
flags.Int(collectorQueueSize, app.DefaultQueueSize, "The queue size of the collector")
flags.Int(collectorNumWorkers, app.DefaultNumWorkers, "The number of workers pulling items from the queue")
flags.Int(collectorPort, ports.CollectorTChannel, "The TChannel port for the collector service")
@@ -84,6 +88,7 @@ func AddFlags(flags *flag.FlagSet) {

// InitFromViper initializes CollectorOptions with properties from viper
func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes
cOpts.QueueSize = v.GetInt(collectorQueueSize)
cOpts.NumWorkers = v.GetInt(collectorNumWorkers)
cOpts.CollectorPort = v.GetInt(collectorPort)
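For reference, the flag is supplied in MiB and stored internally in bytes (see InitFromViper above). Below is a minimal, standalone sketch of that flow; it assumes spf13/pflag and spf13/viper, which Jaeger's flag handling builds on, and the bridging code is illustrative rather than the PR's actual wiring.

```go
package main

import (
    "flag"
    "fmt"

    "github.com/spf13/pflag"
    "github.com/spf13/viper"
)

func main() {
    // Register the flag on a standard flag.FlagSet, as AddFlags does.
    fs := flag.NewFlagSet("collector", flag.ExitOnError)
    fs.Uint("collector.queue-size-memory", 0,
        "(experimental) The max memory size in MiB to use for the dynamic queue.")

    // Bridge the flag set into viper (illustrative; not the PR's plumbing).
    pfs := pflag.NewFlagSet("collector", pflag.ExitOnError)
    pfs.AddGoFlagSet(fs)
    _ = pfs.Parse([]string{"--collector.queue-size-memory=512"})

    v := viper.New()
    _ = v.BindPFlags(pfs)

    // The flag is given in MiB; the collector stores it in bytes.
    dynQueueSizeMemory := v.GetUint("collector.queue-size-memory") * 1024 * 1024
    fmt.Println(dynQueueSizeMemory) // 536870912
}
```

With `--collector.queue-size-memory=512`, the collector would budget roughly 512 MiB (536,870,912 bytes) for the dynamic queue.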
2 changes: 2 additions & 0 deletions cmd/collector/app/builder/span_handler_builder.go
@@ -54,6 +54,8 @@ func (b *SpanHandlerBuilder) BuildHandlers() (
app.Options.NumWorkers(b.CollectorOpts.NumWorkers),
app.Options.QueueSize(b.CollectorOpts.QueueSize),
app.Options.CollectorTags(b.CollectorOpts.CollectorTags),
app.Options.DynQueueSizeWarmup(uint(b.CollectorOpts.QueueSize)), // same as queue size for now
app.Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory),
)

return app.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)),
8 changes: 7 additions & 1 deletion cmd/collector/app/metrics.go
@@ -58,9 +58,13 @@ type SpanProcessorMetrics struct {
InQueueLatency metrics.Timer
// SpansDropped measures the number of spans we discarded because the queue was full
SpansDropped metrics.Counter
// SpansBytes records how many bytes were processed
SpansBytes metrics.Gauge
Member: From the comment it sounds like this should be a counter.

Contributor (author): Sounds like it, but I'm keeping track of this in a local var, and this metric reflects the value of that var.

// BatchSize measures the span batch size
BatchSize metrics.Gauge // size of span batch
// QueueLength measures the size of the internal span queue
// QueueCapacity measures the capacity of the internal span queue
QueueCapacity metrics.Gauge
// QueueLength measures the current number of elements in the internal span queue
QueueLength metrics.Gauge
// SavedOkBySvc contains span and trace counts by service
SavedOkBySvc metricsBySvc // spans actually saved
@@ -150,7 +154,9 @@ func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics
InQueueLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "in-queue-latency", Tags: nil}),
SpansDropped: hostMetrics.Counter(metrics.Options{Name: "spans.dropped", Tags: nil}),
BatchSize: hostMetrics.Gauge(metrics.Options{Name: "batch-size", Tags: nil}),
QueueCapacity: hostMetrics.Gauge(metrics.Options{Name: "queue-capacity", Tags: nil}),
QueueLength: hostMetrics.Gauge(metrics.Options{Name: "queue-length", Tags: nil}),
SpansBytes: hostMetrics.Gauge(metrics.Options{Name: "spans.bytes", Tags: nil}),
SavedOkBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "ok"}}), "saved-by-svc"),
SavedErrBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "err"}}), "saved-by-svc"),
spanCounts: spanCounts,
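To illustrate the pattern discussed in the review thread above (the spans.bytes gauge mirrors an internal counter that is incremented on the hot path), here is a minimal, self-contained sketch; the gauge stub and the numbers are illustrative, not taken from the PR.

```go
package main

import (
    "fmt"
    "sync/atomic"
)

// gauge stands in for metrics.Gauge; in the PR it comes from the metrics
// factory. This stub just remembers the last value it was given.
type gauge struct{ last int64 }

func (g *gauge) Update(v int64) { g.last = v }

func main() {
    var bytesProcessed uint64 // hot-path counter, incremented once per span
    spansBytes := &gauge{}    // "spans.bytes" gauge that mirrors the counter

    // Hot path: cheap atomic adds only, no metrics calls.
    for i := 0; i < 100; i++ {
        atomic.AddUint64(&bytesProcessed, 512)
    }

    // Background step (in the PR this runs on a 1-second ticker in
    // updateGauges): copy the counter's current value into the gauge.
    spansBytes.Update(int64(atomic.LoadUint64(&bytesProcessed)))

    fmt.Println("spans.bytes gauge:", spansBytes.last) // 51200
}
```

Keeping the hot path to atomic increments and emitting the value from a single background goroutine is presumably why the metric is exposed as a gauge rather than a counter, as the author notes above.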
42 changes: 29 additions & 13 deletions cmd/collector/app/options.go
@@ -31,19 +31,21 @@ const (
)

type options struct {
logger *zap.Logger
serviceMetrics metrics.Factory
hostMetrics metrics.Factory
preProcessSpans ProcessSpans
sanitizer sanitizer.SanitizeSpan
preSave ProcessSpan
spanFilter FilterSpan
numWorkers int
blockingSubmit bool
queueSize int
reportBusy bool
extraFormatTypes []SpanFormat
collectorTags map[string]string
logger *zap.Logger
serviceMetrics metrics.Factory
hostMetrics metrics.Factory
preProcessSpans ProcessSpans
sanitizer sanitizer.SanitizeSpan
preSave ProcessSpan
spanFilter FilterSpan
numWorkers int
blockingSubmit bool
queueSize int
dynQueueSizeWarmup uint
dynQueueSizeMemory uint
reportBusy bool
extraFormatTypes []SpanFormat
collectorTags map[string]string
}

// Option is a function that sets some option on StorageBuilder.
@@ -122,6 +124,20 @@ func (options) QueueSize(queueSize int) Option {
}
}

// DynQueueSizeWarmup creates an Option that sets how many spans must be processed before the queue size is adjusted dynamically
func (options) DynQueueSizeWarmup(dynQueueSizeWarmup uint) Option {
return func(b *options) {
b.dynQueueSizeWarmup = dynQueueSizeWarmup
}
}

// DynQueueSizeMemory creates an Option that sets the maximum memory (in bytes) to be used by the dynamic queue
func (options) DynQueueSizeMemory(dynQueueSizeMemory uint) Option {
return func(b *options) {
b.dynQueueSizeMemory = dynQueueSizeMemory
}
}

// ReportBusy creates an Option that initializes the reportBusy boolean
func (options) ReportBusy(reportBusy bool) Option {
return func(b *options) {
5 changes: 5 additions & 0 deletions cmd/collector/app/options_test.go
@@ -39,12 +39,16 @@ func TestAllOptionSet(t *testing.T) {
Options.PreProcessSpans(func(spans []*model.Span) {}),
Options.Sanitizer(func(span *model.Span) *model.Span { return span }),
Options.QueueSize(10),
Options.DynQueueSizeWarmup(1000),
Options.DynQueueSizeMemory(1024),
Options.PreSave(func(span *model.Span) {}),
Options.CollectorTags(map[string]string{"extra": "tags"}),
)
assert.EqualValues(t, 5, opts.numWorkers)
assert.EqualValues(t, 10, opts.queueSize)
assert.EqualValues(t, map[string]string{"extra": "tags"}, opts.collectorTags)
assert.EqualValues(t, 1000, opts.dynQueueSizeWarmup)
assert.EqualValues(t, 1024, opts.dynQueueSizeMemory)
}

func TestNoOptionsSet(t *testing.T) {
@@ -59,4 +63,5 @@ func TestNoOptionsSet(t *testing.T) {
assert.True(t, opts.spanFilter(nil))
span := model.Span{}
assert.EqualValues(t, &span, opts.sanitizer(&span))
assert.EqualValues(t, 0, opts.dynQueueSizeWarmup)
}
156 changes: 129 additions & 27 deletions cmd/collector/app/span_processor.go
@@ -16,9 +16,11 @@
package app

import (
"sync"
"time"

tchannel "github.com/uber/tchannel-go"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
@@ -27,6 +29,14 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
// if this proves to be too low, we can increase it
maxQueueSize = 1_000_000

// if the new queue size isn't 20% bigger than the previous one, don't change
minRequiredChange = 1.2
)

// ProcessSpansOptions additional options passed to processor along with the spans.
type ProcessSpansOptions struct {
SpanFormat SpanFormat
@@ -40,17 +50,23 @@ type SpanProcessor interface {
}

type spanProcessor struct {
queue *queue.BoundedQueue
metrics *SpanProcessorMetrics
preProcessSpans ProcessSpans
filterSpan FilterSpan // filter is called before the sanitizer but after preProcessSpans
sanitizer sanitizer.SanitizeSpan // sanitizer is called before processSpan
processSpan ProcessSpan
logger *zap.Logger
spanWriter spanstore.Writer
reportBusy bool
numWorkers int
collectorTags map[string]string
queue *queue.BoundedQueue
queueResizeMu sync.Mutex
metrics *SpanProcessorMetrics
preProcessSpans ProcessSpans
filterSpan FilterSpan // filter is called before the sanitizer but after preProcessSpans
sanitizer sanitizer.SanitizeSpan // sanitizer is called before processSpan
processSpan ProcessSpan
logger *zap.Logger
spanWriter spanstore.Writer
reportBusy bool
numWorkers int
collectorTags map[string]string
dynQueueSizeWarmup uint
dynQueueSizeMemory uint
bytesProcessed *atomic.Uint64
spansProcessed *atomic.Uint64
stopCh chan struct{}
}

type queueItem struct {
@@ -70,7 +86,11 @@ func NewSpanProcessor(
sp.processItemFromQueue(value)
})

sp.queue.StartLengthReporting(1*time.Second, sp.metrics.QueueLength)
sp.background(1*time.Second, sp.updateGauges)

if sp.dynQueueSizeWarmup > 0 {
sp.background(1*time.Minute, sp.updateQueueSize)
}

return sp
}
@@ -87,27 +107,39 @@ func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcessor {
boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler)

sp := spanProcessor{
queue: boundedQueue,
metrics: handlerMetrics,
logger: options.logger,
preProcessSpans: options.preProcessSpans,
filterSpan: options.spanFilter,
sanitizer: options.sanitizer,
reportBusy: options.reportBusy,
numWorkers: options.numWorkers,
spanWriter: spanWriter,
collectorTags: options.collectorTags,
}
sp.processSpan = ChainedProcessSpan(
options.preSave,
sp.saveSpan,
)
queue: boundedQueue,
metrics: handlerMetrics,
logger: options.logger,
preProcessSpans: options.preProcessSpans,
filterSpan: options.spanFilter,
sanitizer: options.sanitizer,
reportBusy: options.reportBusy,
numWorkers: options.numWorkers,
spanWriter: spanWriter,
collectorTags: options.collectorTags,
stopCh: make(chan struct{}),
dynQueueSizeMemory: options.dynQueueSizeMemory,
dynQueueSizeWarmup: options.dynQueueSizeWarmup,
bytesProcessed: atomic.NewUint64(0),
spansProcessed: atomic.NewUint64(0),
}

processSpanFuncs := []ProcessSpan{options.preSave, sp.saveSpan}
if options.dynQueueSizeWarmup > 0 {
// add to processSpanFuncs
options.logger.Info("Dynamically adjusting the queue size at runtime.",
zap.Uint("memory-mib", options.dynQueueSizeMemory/1024/1024),
zap.Uint("queue-size-warmup", options.dynQueueSizeWarmup))
processSpanFuncs = append(processSpanFuncs, sp.countSpan)
}

sp.processSpan = ChainedProcessSpan(processSpanFuncs...)
return &sp
}

// Stop halts the span processor and all its go-routines.
func (sp *spanProcessor) Stop() {
close(sp.stopCh)
sp.queue.Stop()
}

@@ -130,6 +162,11 @@ func (sp *spanProcessor) saveSpan(span *model.Span) {
sp.metrics.SaveLatency.Record(time.Since(startTime))
}

func (sp *spanProcessor) countSpan(span *model.Span) {
sp.bytesProcessed.Add(uint64(span.Size()))
sp.spansProcessed.Inc()
}

func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) {
sp.preProcessSpans(mSpans)
sp.metrics.BatchSize.Update(int64(len(mSpans)))
@@ -177,3 +214,68 @@ func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat SpanFormat
}
return sp.queue.Produce(item)
}

func (sp *spanProcessor) background(reportPeriod time.Duration, callback func()) {
go func() {
ticker := time.NewTicker(reportPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
callback()
case <-sp.stopCh:
return
}
}
}()
}

func (sp *spanProcessor) updateQueueSize() {
if sp.dynQueueSizeWarmup == 0 {
return
}

if sp.dynQueueSizeMemory == 0 {
sp.logger.Warn("The dynamic queue size warmup value is set, but not the amount of memory to use. Skipping.")
return
}

if sp.spansProcessed.Load() < uint64(sp.dynQueueSizeWarmup) {
return
}

sp.queueResizeMu.Lock()
defer sp.queueResizeMu.Unlock()

// first, we get the average size of a span, by dividing the bytes processed by num of spans
average := sp.bytesProcessed.Load() / sp.spansProcessed.Load()

// finally, we divide the available memory by the average size of a span
idealQueueSize := float64(sp.dynQueueSizeMemory / uint(average))

// cap the queue size, just to be safe...
if idealQueueSize > maxQueueSize {
idealQueueSize = maxQueueSize
}

var diff float64
current := float64(sp.queue.Capacity())
if idealQueueSize > current {
diff = idealQueueSize / current
} else {
diff = current / idealQueueSize
}

// resizing is a costly operation, we only perform it if we are at least n% apart from the desired value
if diff > minRequiredChange {
s := int(idealQueueSize)
sp.logger.Info("Resizing the internal span queue", zap.Int("new-size", s), zap.Uint64("average-span-size-bytes", average))
sp.queue.Resize(s)
}
}

func (sp *spanProcessor) updateGauges() {
sp.metrics.SpansBytes.Update(int64(sp.bytesProcessed.Load()))
sp.metrics.QueueLength.Update(int64(sp.queue.Size()))
sp.metrics.QueueCapacity.Update(int64(sp.queue.Capacity()))
}
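
For a rough, standalone walkthrough of the resize heuristic in updateQueueSize above (the figures below are illustrative, not taken from the PR):

```go
package main

import "fmt"

const (
    maxQueueSize      = 1_000_000
    minRequiredChange = 1.2
)

func main() {
    // Illustrative figures: a 2 GiB budget, 10 MiB of spans counted over
    // 5,000 spans, and a current queue capacity of 2,000 items.
    dynQueueSizeMemory := uint(2 * 1024 * 1024 * 1024) // bytes
    bytesProcessed := uint64(10 * 1024 * 1024)
    spansProcessed := uint64(5000)
    currentCapacity := 2000

    // Average span size: 10 MiB / 5,000 spans ≈ 2,097 bytes.
    average := bytesProcessed / spansProcessed

    // Ideal queue size: memory budget / average span size ≈ 1,024,074 items.
    idealQueueSize := float64(dynQueueSizeMemory / uint(average))

    // Capped at the safety limit of 1,000,000 items.
    if idealQueueSize > maxQueueSize {
        idealQueueSize = maxQueueSize
    }

    // Compare ideal vs. current capacity as a ratio.
    var diff float64
    current := float64(currentCapacity)
    if idealQueueSize > current {
        diff = idealQueueSize / current
    } else {
        diff = current / idealQueueSize
    }

    // 1,000,000 / 2,000 = 500x apart, far above the 20% threshold,
    // so the queue would be resized.
    if diff > minRequiredChange {
        fmt.Printf("resize queue from %d to %d (avg span %d bytes)\n",
            currentCapacity, int(idealQueueSize), average)
    }
}
```

Because resizing the bounded queue is a costly operation, the PR only applies the change when the ideal and current capacities differ by more than 20%.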