From 41be5b8536ebf94062f036dd4f92d2b2aa0b10e1 Mon Sep 17 00:00:00 2001
From: Prithvi Raj
Date: Tue, 30 Oct 2018 15:27:40 -0400
Subject: [PATCH] Add a metric for number of partitions held
ref #1126
Signed-off-by: Prithvi Raj
---
cmd/ingester/app/consumer/consumer.go | 4 ++++
cmd/ingester/app/consumer/consumer_metrics.go | 8 ++++++-
cmd/ingester/app/consumer/consumer_test.go | 21 ++++++++++++++-----
3 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go
index d1ed78671ba..03ca9ecf7a1 100644
--- a/cmd/ingester/app/consumer/consumer.go
+++ b/cmd/ingester/app/consumer/consumer.go
@@ -46,6 +46,7 @@ type Consumer struct {
deadlockDetector deadlockDetector
partitionIDToState map[int32]*consumerState
+ partitionsHeld metrics.Counter
}
type consumerState struct {
@@ -63,6 +64,7 @@ func New(params Params) (*Consumer, error) {
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
+ partitionsHeld: partitionsHeld(params.MetricsFactory),
}, nil
}
@@ -100,6 +102,8 @@ func (c *Consumer) Close() error {
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
+ c.partitionsHeld.Inc(1)
+ defer c.partitionsHeld.Inc(-1)
c.partitionIDToState[pc.Partition()].wg.Add(1)
defer c.partitionIDToState[pc.Partition()].wg.Done()
defer c.closePartition(pc)
diff --git a/cmd/ingester/app/consumer/consumer_metrics.go b/cmd/ingester/app/consumer/consumer_metrics.go
index 279a3f95911..e847af6e04f 100644
--- a/cmd/ingester/app/consumer/consumer_metrics.go
+++ b/cmd/ingester/app/consumer/consumer_metrics.go
@@ -20,6 +20,8 @@ import (
"github.com/uber/jaeger-lib/metrics"
)
+const consumerNamespace = "sarama-consumer"
+
type msgMetrics struct {
counter metrics.Counter
offsetGauge metrics.Gauge
@@ -36,7 +38,7 @@ type partitionMetrics struct {
}
func (c *Consumer) namespace(partition int32) metrics.Factory {
- return c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
+ return c.metricsFactory.Namespace(consumerNamespace, map[string]string{"partition": strconv.Itoa(int(partition))})
}
func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
@@ -58,3 +60,7 @@ func (c *Consumer) partitionMetrics(partition int32) partitionMetrics {
closeCounter: f.Counter("partition-close", nil),
startCounter: f.Counter("partition-start", nil)}
}
+
+func partitionsHeld(metricsFactory metrics.Factory) metrics.Counter {
+ return metricsFactory.Namespace(consumerNamespace, nil).Counter("partitions-held", nil)
+}
diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go
index 8db11895d32..479e347661d 100644
--- a/cmd/ingester/app/consumer/consumer_test.go
+++ b/cmd/ingester/app/consumer/consumer_test.go
@@ -47,7 +47,7 @@ const (
)
func TestConstructor(t *testing.T) {
- newConsumer, err := New(Params{})
+ newConsumer, err := New(Params{MetricsFactory: metrics.NullFactory})
assert.NoError(t, err)
assert.NotNil(t, newConsumer)
}
@@ -83,23 +83,24 @@ func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer)
}
func newConsumer(
- factory metrics.Factory,
+ metricsFactory metrics.Factory,
topic string,
processor processor.SpanProcessor,
consumer consumer.Consumer) *Consumer {
logger, _ := zap.NewDevelopment()
return &Consumer{
- metricsFactory: factory,
+ metricsFactory: metricsFactory,
logger: logger,
internalConsumer: consumer,
partitionIDToState: make(map[int32]*consumerState),
- deadlockDetector: newDeadlockDetector(factory, logger, time.Second),
+ partitionsHeld: partitionsHeld(metricsFactory),
+ deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second),
processorFactory: ProcessorFactory{
topic: topic,
consumer: consumer,
- metricsFactory: factory,
+ metricsFactory: metricsFactory,
logger: logger,
baseProcessor: processor,
parallelism: 1,
@@ -152,12 +153,22 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
mc.YieldMessage(msg)
isProcessed.Wait()
+ testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
+ Name: "sarama-consumer.partitions-held",
+ Value: 1,
+ })
+
mp.AssertExpectations(t)
// Ensure that the partition consumer was updated in the map
assert.Equal(t, saramaPartitionConsumer.HighWaterMarkOffset(),
undertest.partitionIDToState[partition].partitionConsumer.HighWaterMarkOffset())
undertest.Close()
+ testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
+ Name: "sarama-consumer.partitions-held",
+ Value: 0,
+ })
+
partitionTag := map[string]string{"partition": fmt.Sprint(partition)}
testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.messages",