Skip to content

Commit

Permalink
Add a metric for number of partitions held (jaegertracing#1154)
Browse files Browse the repository at this point in the history
ref jaegertracing#1126

Signed-off-by: Prithvi Raj <p.r@uber.com>
  • Loading branch information
vprithvi authored and Louis-Etienne Dorval committed Nov 1, 2018
1 parent e0fcab0 commit bb47a9c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
4 changes: 4 additions & 0 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Consumer struct {
deadlockDetector deadlockDetector

partitionIDToState map[int32]*consumerState
partitionsHeld metrics.Counter
}

type consumerState struct {
Expand All @@ -63,6 +64,7 @@ func New(params Params) (*Consumer, error) {
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeld: partitionsHeld(params.MetricsFactory),
}, nil
}

Expand Down Expand Up @@ -100,6 +102,8 @@ func (c *Consumer) Close() error {

func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionsHeld.Inc(1)
defer c.partitionsHeld.Inc(-1)
c.partitionIDToState[pc.Partition()].wg.Add(1)
defer c.partitionIDToState[pc.Partition()].wg.Done()
defer c.closePartition(pc)
Expand Down
8 changes: 7 additions & 1 deletion cmd/ingester/app/consumer/consumer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/uber/jaeger-lib/metrics"
)

const consumerNamespace = "sarama-consumer"

type msgMetrics struct {
counter metrics.Counter
offsetGauge metrics.Gauge
Expand All @@ -36,7 +38,7 @@ type partitionMetrics struct {
}

func (c *Consumer) namespace(partition int32) metrics.Factory {
return c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return c.metricsFactory.Namespace(consumerNamespace, map[string]string{"partition": strconv.Itoa(int(partition))})
}

func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
Expand All @@ -58,3 +60,7 @@ func (c *Consumer) partitionMetrics(partition int32) partitionMetrics {
closeCounter: f.Counter("partition-close", nil),
startCounter: f.Counter("partition-start", nil)}
}

func partitionsHeld(metricsFactory metrics.Factory) metrics.Counter {
return metricsFactory.Namespace(consumerNamespace, nil).Counter("partitions-held", nil)
}
21 changes: 16 additions & 5 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
)

func TestConstructor(t *testing.T) {
newConsumer, err := New(Params{})
newConsumer, err := New(Params{MetricsFactory: metrics.NullFactory})
assert.NoError(t, err)
assert.NotNil(t, newConsumer)
}
Expand Down Expand Up @@ -83,23 +83,24 @@ func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer)
}

func newConsumer(
factory metrics.Factory,
metricsFactory metrics.Factory,
topic string,
processor processor.SpanProcessor,
consumer consumer.Consumer) *Consumer {

logger, _ := zap.NewDevelopment()
return &Consumer{
metricsFactory: factory,
metricsFactory: metricsFactory,
logger: logger,
internalConsumer: consumer,
partitionIDToState: make(map[int32]*consumerState),
deadlockDetector: newDeadlockDetector(factory, logger, time.Second),
partitionsHeld: partitionsHeld(metricsFactory),
deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second),

processorFactory: ProcessorFactory{
topic: topic,
consumer: consumer,
metricsFactory: factory,
metricsFactory: metricsFactory,
logger: logger,
baseProcessor: processor,
parallelism: 1,
Expand Down Expand Up @@ -152,12 +153,22 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
mc.YieldMessage(msg)
isProcessed.Wait()

testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.partitions-held",
Value: 1,
})

mp.AssertExpectations(t)
// Ensure that the partition consumer was updated in the map
assert.Equal(t, saramaPartitionConsumer.HighWaterMarkOffset(),
undertest.partitionIDToState[partition].partitionConsumer.HighWaterMarkOffset())
undertest.Close()

testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.partitions-held",
Value: 0,
})

partitionTag := map[string]string{"partition": fmt.Sprint(partition)}
testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.messages",
Expand Down

0 comments on commit bb47a9c

Please sign in to comment.