Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
report MetricPoint and MetricPointWithoutOrg separately
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Apr 17, 2018
1 parent a3edb46 commit a772c10
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 15 deletions.
3 changes: 2 additions & 1 deletion cmd/mt-kafka-mdm-sniff-out-of-order/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"
"gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1/msg"
)

var (
Expand Down Expand Up @@ -111,7 +112,7 @@ func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition
ip.lock.Unlock()
}

func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, partition int32) {
func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, format msg.Format, partition int32) {
now := Msg{
Part: partition,
Seen: time.Now(),
Expand Down
3 changes: 2 additions & 1 deletion cmd/mt-kafka-mdm-sniff/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"
"gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1/msg"
)

var (
Expand Down Expand Up @@ -72,7 +73,7 @@ func (ip inputPrinter) ProcessMetricData(metric *schema.MetricData, partition in
}
}

func (ip inputPrinter) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
func (ip inputPrinter) ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32) {
stdoutLock.Lock()
err := ip.tplP.Execute(os.Stdout, DataP{
partition,
Expand Down
17 changes: 13 additions & 4 deletions dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,25 @@
"yaxis": 2
},
{
"alias": "/received/",
"alias": "/metricdata.received/",
"lines": true,
"points": false,
"color": "#3f6833"
"color": "#2a4422",
"stack": "A"
},
{
"alias": "/metricpoint.received/",
"lines": true,
"points": false,
"color": "#7eb26d"
"color": "#3f6833",
"stack": "A"
},
{
"alias": "/metricpoint_no_org.received/",
"lines": true,
"points": false,
"color": "#7eb26d",
"stack": "A"
},
{
"alias": "/invalid/",
Expand Down Expand Up @@ -180,7 +189,7 @@
"msResolution": false,
"shared": true,
"sort": 0,
"value_type": "cumulative"
"value_type": "individual"
},
"type": "graph",
"xaxis": {
Expand Down
15 changes: 11 additions & 4 deletions input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"fmt"
"time"

schema "gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1/msg"

"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/mdata"
Expand All @@ -16,7 +17,7 @@ import (

type Handler interface {
ProcessMetricData(md *schema.MetricData, partition int32)
ProcessMetricPoint(point schema.MetricPoint, partition int32)
ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32)
}

// TODO: clever way to document all metrics for all different inputs
Expand All @@ -25,6 +26,7 @@ type Handler interface {
type DefaultHandler struct {
receivedMD *stats.Counter32
receivedMP *stats.Counter32
receivedMPNO *stats.Counter32
invalidMD *stats.Counter32
invalidMP *stats.Counter32
unknownMP *stats.Counter32
Expand All @@ -39,6 +41,7 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
return DefaultHandler{
receivedMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.received", input)),
receivedMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.received", input)),
receivedMPNO: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint_no_org.received", input)),
invalidMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.invalid", input)),
invalidMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.invalid", input)),
unknownMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.unknown", input)),
Expand All @@ -52,8 +55,12 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input

// ProcessMetricPoint updates the index if possible, and stores the data if we have an index entry
// concurrency-safe.
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
in.receivedMP.Inc()
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32) {
if format == msg.FormatMetricPoint {
in.receivedMP.Inc()
} else {
in.receivedMPNO.Inc()
}
if !point.Valid() {
in.invalidMP.Inc()
log.Debug("in: Invalid metric %v", point)
Expand Down
5 changes: 3 additions & 2 deletions input/kafkamdm/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,15 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
}

func (k *KafkaMdm) handleMsg(data []byte, partition int32) {
if msg.IsPointMsg(data) {
format, isPointMsg := msg.IsPointMsg(data)
if isPointMsg {
_, point, err := msg.ReadPointMsg(data, uint32(orgId))
if err != nil {
metricsDecodeErr.Inc()
log.Error(3, "kafka-mdm decode error, skipping message. %s", err)
return
}
k.Handler.ProcessMetricPoint(point, partition)
k.Handler.ProcessMetricPoint(point, format, partition)
return
}

Expand Down
12 changes: 9 additions & 3 deletions vendor/gopkg.in/raintank/schema.v1/msg/msg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a772c10

Please sign in to comment.