Batch Processor: Log Support #1723

Merged: 1 commit, Sep 2, 2020
10 changes: 10 additions & 0 deletions consumer/pdata/log.go
@@ -81,6 +81,16 @@ func (ld Logs) LogRecordCount() int {
return logCount
}

// SizeBytes returns the number of bytes in the internal representation of the
// logs.
func (ld Logs) SizeBytes() int {
size := 0
for i := range *ld.orig {
size += (*ld.orig)[i].Size()
}
return size
}

func (ld Logs) ResourceLogs() ResourceLogsSlice {
return ResourceLogsSlice(ld)
}
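
For illustration only, a hypothetical in-package test of the new SizeBytes accessor (not part of this PR): a freshly created Logs reports zero, and populating a single record makes the size positive, since the method sums the protobuf Size() of each ResourceLogs entry. It assumes ResourceLogsSlice exposes the same Resize helper used by the other pdata slices in this diff.

```go
package pdata

import "testing"

// Sketch only: SizeBytes sums the protobuf-encoded Size() of every
// ResourceLogs entry, so an empty Logs reports 0 and any populated
// record yields a positive size.
func TestLogsSizeBytes_Sketch(t *testing.T) {
	ld := NewLogs()
	if ld.SizeBytes() != 0 {
		t.Fatalf("empty Logs: got %d, want 0", ld.SizeBytes())
	}

	// Build the smallest non-empty payload: one resource, one
	// instrumentation library, one named log record.
	ld.ResourceLogs().Resize(1)
	ills := ld.ResourceLogs().At(0).InstrumentationLibraryLogs()
	ills.Resize(1)
	ills.At(0).Logs().Resize(1)
	ills.At(0).Logs().At(0).SetName("example")

	if ld.SizeBytes() == 0 {
		t.Fatal("populated Logs: expected a non-zero SizeBytes")
	}
}
```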
97 changes: 97 additions & 0 deletions go.sum

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions internal/data/testdata/log.go
@@ -356,3 +356,19 @@ func generateOtlpLogThree() *otlplogs.LogRecord {
Body: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "something else happened"}},
}
}

func GenerateLogDataManyLogsSameResource(count int) pdata.Logs {
ld := GenerateLogDataOneEmptyLogs()
rs0 := ld.ResourceLogs().At(0)
rs0.InstrumentationLibraryLogs().Resize(1)
rs0.InstrumentationLibraryLogs().At(0).Logs().Resize(count)
for i := 0; i < count; i++ {
l := rs0.InstrumentationLibraryLogs().At(0).Logs().At(i)
if i%2 == 0 {
fillLogOne(l)
} else {
fillLogTwo(l)
}
}
return ld
}
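
A hypothetical companion test (not part of this PR) pins down the shape the helper above produces: one resource, one instrumentation library, and exactly `count` log records, alternating between fillLogOne and fillLogTwo.

```go
package testdata

import "testing"

// Sketch only: verify the shape produced by GenerateLogDataManyLogsSameResource.
func TestGenerateLogDataManyLogsSameResource_Shape(t *testing.T) {
	const count = 4
	ld := GenerateLogDataManyLogsSameResource(count)

	if ld.ResourceLogs().Len() != 1 {
		t.Fatalf("got %d resources, want 1", ld.ResourceLogs().Len())
	}
	logs := ld.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
	if logs.Len() != count || ld.LogRecordCount() != count {
		t.Fatalf("got %d records in slice, %d counted, want %d", logs.Len(), ld.LogRecordCount(), count)
	}
}
```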
10 changes: 5 additions & 5 deletions processor/batchprocessor/README.md
@@ -1,11 +1,11 @@
# Batch Processor

-Supported pipeline types: metric, traces
+Supported pipeline types: metric, traces, logs

-The batch processor accepts spans or metrics and places them into batches.
-Batching helps better compress the data and reduce the number of outgoing
-connections required to transmit the data. This processor supports both size and
-time based batching.
+The batch processor accepts spans, metrics, or logs and places them into
+batches. Batching helps better compress the data and reduce the number of
+outgoing connections required to transmit the data. This processor supports
+both size and time based batching.
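
For concreteness, the size- and time-based behaviors described here map onto the Timeout and SendBatchSize fields of the processor's Config; a minimal Go sketch, with field names taken from the tests later in this diff and illustrative (not default) values:

```go
package batchprocessor

import "time"

// exampleBatchConfig is a sketch only: Timeout drives time-based flushing,
// SendBatchSize drives size-based flushing. Values are illustrative.
func exampleBatchConfig() Config {
	return Config{
		Timeout:       200 * time.Millisecond, // flush at least this often
		SendBatchSize: 8192,                    // flush once this many items are pending
	}
}
```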

It is highly recommended to configure the batch processor on every collector.
The batch processor should be defined in the pipeline after the `memory_limiter`
53 changes: 53 additions & 0 deletions processor/batchprocessor/batch_processor.go
@@ -74,6 +74,7 @@ type batch interface {

var _ consumer.TraceConsumer = (*batchProcessor)(nil)
var _ consumer.MetricsConsumer = (*batchProcessor)(nil)
var _ consumer.LogsConsumer = (*batchProcessor)(nil)

func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batch batch, telemetryLevel telemetry.Level) *batchProcessor {
return &batchProcessor{
@@ -182,6 +183,12 @@ func (bp *batchProcessor) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
return nil
}

// ConsumeLogs implements LogsProcessor
func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
bp.newItem <- ld
return nil
}

// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
func newBatchTracesProcessor(params component.ProcessorCreateParams, trace consumer.TraceConsumer, cfg *Config, telemetryLevel telemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchTraces(trace), telemetryLevel)
@@ -192,6 +199,11 @@ func newBatchMetricsProcessor(params component.ProcessorCreateParams, metrics consumer.MetricsConsumer, cfg *Config, telemetryLevel telemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchMetrics(metrics), telemetryLevel)
}

// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout
func newBatchLogsProcessor(params component.ProcessorCreateParams, logs consumer.LogsConsumer, cfg *Config, telemetryLevel telemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchLogs(logs), telemetryLevel)
}

type batchTraces struct {
nextConsumer consumer.TraceConsumer
traceData pdata.Traces
@@ -274,3 +286,44 @@ func (bm *batchMetrics) add(item interface{}) {
bm.metricCount += uint32(newMetricsCount)
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
}

type batchLogs struct {
nextConsumer consumer.LogsConsumer
logData pdata.Logs
logCount uint32
}

func newBatchLogs(nextConsumer consumer.LogsConsumer) *batchLogs {
b := &batchLogs{nextConsumer: nextConsumer}
b.reset()
return b
}

func (bm *batchLogs) export(ctx context.Context) error {
return bm.nextConsumer.ConsumeLogs(ctx, bm.logData)
}

func (bm *batchLogs) itemCount() uint32 {
return bm.logCount
}

func (bm *batchLogs) size() int {
return bm.logData.SizeBytes()
}

// reset resets the current batchLogs structure with zero/empty values.
func (bm *batchLogs) reset() {
bm.logData = pdata.NewLogs()
bm.logCount = 0
}

func (bm *batchLogs) add(item interface{}) {
ld := item.(pdata.Logs)

newLogsCount := ld.LogRecordCount()
if newLogsCount == 0 {
return
}
bm.logCount += uint32(newLogsCount)
ld.ResourceLogs().MoveAndAppendTo(bm.logData.ResourceLogs())
}
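
To situate batchLogs, here is an inferred sketch of the unexported batch contract it satisfies (the actual declaration is truncated in the hunk header above); the method set is deduced from batchTraces, batchMetrics, and the implementation just added, so treat it as an approximation.

```go
// Package sketch: an inferred view of the batch processor's internal contract,
// not the verbatim declaration from batch_processor.go.
package sketch

import "context"

type batch interface {
	export(ctx context.Context) error // hand the accumulated data to the next consumer
	itemCount() uint32                // pending spans, metric points, or log records
	size() int                        // pending payload size in bytes (SizeBytes for logs)
	reset()                           // clear accumulated state after an export
	add(item interface{})             // merge an incoming pdata payload into the batch
}
```

Note that add relies on MoveAndAppendTo, so an incoming pdata.Logs is drained into the accumulating batch rather than copied.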
211 changes: 211 additions & 0 deletions processor/batchprocessor/batch_processor_test.go
@@ -513,3 +513,214 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
td.SpanCount()
}
}

func TestBatchLogProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to verify that
// data is sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
}

requestCount := 100
logsPerRequest := 5
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

logDataSlice := make([]pdata.Logs, 0, requestCount)

for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogDataManyLogsSameResource(logsPerRequest)
logs := ld.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
for logIndex := 0; logIndex < logsPerRequest; logIndex++ {
logs.At(logIndex).SetName(getTestLogName(requestNum, logIndex))
}
logDataSlice = append(logDataSlice, ld.Clone())
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}

// Also send an empty payload to cover the case of empty resources.
ld := testdata.GenerateLogDataEmpty()
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))

require.NoError(t, batcher.Shutdown(context.Background()))

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordsCount())
receivedMds := sink.AllLogs()
logsReceivedByName := logsReceivedByName(receivedMds)
for requestNum := 0; requestNum < requestCount; requestNum++ {
logs := logDataSlice[requestNum].ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
for logIndex := 0; logIndex < logsPerRequest; logIndex++ {
require.EqualValues(t,
logs.At(logIndex),
logsReceivedByName[getTestLogName(requestNum, logIndex)])
}
}
}

func TestBatchLogProcessor_BatchSize(t *testing.T) {
views := MetricViews(telemetry.Detailed)
view.Register(views...)
defer view.Unregister(views...)

// Instantiate the batch processor with low config values to verify that
// data is sent through the processor.
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 50,
}

requestCount := 100
logsPerRequest := 5
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
size := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogDataManyLogsSameResource(logsPerRequest)
size += ld.SizeBytes()
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}
require.NoError(t, batcher.Shutdown(context.Background()))

elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

expectedBatchesNum := requestCount * logsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / logsPerRequest

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordsCount())
receivedMds := sink.AllLogs()
require.Equal(t, expectedBatchesNum, len(receivedMds))
for _, ld := range receivedMds {
require.Equal(t, expectedBatchingFactor, ld.ResourceLogs().Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, logsPerRequest, ld.ResourceLogs().At(i).InstrumentationLibraryLogs().At(0).Logs().Len())
}
}

viewData, err := view.RetrieveData(statBatchSendSize.Name())
require.NoError(t, err)
assert.Equal(t, 1, len(viewData))
distData := viewData[0].Data.(*view.DistributionData)
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
assert.Equal(t, sink.LogRecordsCount(), int(distData.Sum()))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max))

viewData, err = view.RetrieveData(statBatchSendSizeBytes.Name())
require.NoError(t, err)
assert.Equal(t, 1, len(viewData))
distData = viewData[0].Data.(*view.DistributionData)
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
assert.Equal(t, size, int(distData.Sum()))
}

func TestBatchLogsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 100,
}
requestCount := 5
logsPerRequest := 10
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogDataManyLogsSameResource(logsPerRequest)
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}

// Wait for at least one batch to be sent.
for {
if sink.LogRecordsCount() != 0 {
break
}
<-time.After(cfg.Timeout)
}

elapsed := time.Since(start)
require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds())

// This should not change the results in the sink, verified by the expectedBatchesNum
require.NoError(t, batcher.Shutdown(context.Background()))

expectedBatchesNum := 1
expectedBatchingFactor := 5

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordsCount())
receivedMds := sink.AllLogs()
require.Equal(t, expectedBatchesNum, len(receivedMds))
for _, ld := range receivedMds {
require.Equal(t, expectedBatchingFactor, ld.ResourceLogs().Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, logsPerRequest, ld.ResourceLogs().At(i).InstrumentationLibraryLogs().At(0).Logs().Len())
}
}
}

func TestBatchLogProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
}
requestCount := 5
logsPerRequest := 10
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogDataManyLogsSameResource(logsPerRequest)
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}

require.NoError(t, batcher.Shutdown(context.Background()))

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordsCount())
require.Equal(t, 1, len(sink.AllLogs()))
}

func getTestLogName(requestNum, index int) string {
return fmt.Sprintf("test-log-int-%d-%d", requestNum, index)
}

func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord {
logsReceivedByName := map[string]pdata.LogRecord{}
for i := range lds {
ld := lds[i]
rls := ld.ResourceLogs()
for j := 0; j < rls.Len(); j++ {
rl := rls.At(j)
if rl.IsNil() {
continue
}
ills := rl.InstrumentationLibraryLogs()
for k := 0; k < ills.Len(); k++ {
ill := ills.At(k)
if ill.IsNil() {
continue
}
logs := ill.Logs()
for m := 0; m < logs.Len(); m++ {
log := logs.At(m)
logsReceivedByName[log.Name()] = log
}
}
}
}
return logsReceivedByName
}
14 changes: 13 additions & 1 deletion processor/batchprocessor/factory.go
@@ -39,7 +39,8 @@ func NewFactory() component.ProcessorFactory {
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor),
-processorhelper.WithMetrics(createMetricsProcessor))
+processorhelper.WithMetrics(createMetricsProcessor),
+processorhelper.WithLogs(createLogsProcessor))
}

func createDefaultConfig() configmodels.Processor {
@@ -75,3 +76,14 @@ func createMetricsProcessor(
level, _ := telemetry.GetLevel()
return newBatchMetricsProcessor(params, nextConsumer, oCfg, level), nil
}

func createLogsProcessor(
_ context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextConsumer consumer.LogsConsumer,
) (component.LogsProcessor, error) {
oCfg := cfg.(*Config)
level, _ := telemetry.GetLevel()
return newBatchLogsProcessor(params, nextConsumer, oCfg, level), nil
}
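
An end-to-end sketch condensing the factory and processor tests in this PR: build the logs processor from the factory, feed it one payload, and rely on shutdown to flush. The import paths and the CreateDefaultConfig call are assumptions based on the collector layout of this era; the (ctx, params, cfg, nextConsumer) argument order for CreateLogsProcessor follows the factory test below.

```go
package batchprocessor

import (
	"context"
	"testing"

	"go.uber.org/zap"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/exporter/exportertest"
	"go.opentelemetry.io/collector/internal/data/testdata"
)

// Sketch only: exercise the new logs path from factory creation through shutdown.
func TestLogsBatchingEndToEnd_Sketch(t *testing.T) {
	factory := NewFactory()
	params := component.ProcessorCreateParams{Logger: zap.NewNop()}
	sink := &exportertest.SinkLogsExporter{}

	lp, err := factory.CreateLogsProcessor(context.Background(), params, factory.CreateDefaultConfig(), sink)
	if err != nil {
		t.Fatal(err)
	}

	if err := lp.Start(context.Background(), componenttest.NewNopHost()); err != nil {
		t.Fatal(err)
	}
	if err := lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataManyLogsSameResource(10)); err != nil {
		t.Fatal(err)
	}
	// Shutdown flushes whatever is still pending (see TestBatchLogProcessor_Shutdown above).
	if err := lp.Shutdown(context.Background()); err != nil {
		t.Fatal(err)
	}

	if got := sink.LogRecordsCount(); got != 10 {
		t.Fatalf("sink received %d log records, want 10", got)
	}
}
```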
4 changes: 4 additions & 0 deletions processor/batchprocessor/factory_test.go
@@ -45,4 +45,8 @@ func TestCreateProcessor(t *testing.T) {
mp, err := factory.CreateMetricsProcessor(context.Background(), creationParams, nil, cfg)
assert.NotNil(t, mp)
assert.NoError(t, err, "cannot create metric processor")

lp, err := factory.CreateLogsProcessor(context.Background(), creationParams, cfg, nil)
assert.NotNil(t, lp)
assert.NoError(t, err, "cannot create logs processor")
}