This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
/
fakemetrics.go
109 lines (96 loc) · 2.66 KB
/
fakemetrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package fakemetrics
import (
"fmt"
"time"
"github.com/grafana/metrictank/clock"
"github.com/grafana/metrictank/cmd/mt-fakemetrics/out"
"github.com/grafana/metrictank/cmd/mt-fakemetrics/out/carbon"
"github.com/grafana/metrictank/cmd/mt-fakemetrics/out/kafkamdm"
"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/schema"
"github.com/raintank/met"
"github.com/raintank/met/helper"
log "github.com/sirupsen/logrus"
)
// init configures the shared logrus logger for this package: info level,
// with the project's text formatter emitting millisecond-precision timestamps.
func init() {
	textFmt := new(logger.TextFormatter)
	textFmt.TimestampFormat = "2006-01-02 15:04:05.000"

	log.SetLevel(log.InfoLevel)
	log.SetFormatter(textFmt)
}
// generateMetrics builds num synthetic gauge metrics named
// "some.id.of.a.metric.<index>" (index starting at 0), all belonging to
// org 1 with interval 1, value 1 and unit "s".
// SetId is called on every metric before it is stored in the result
// (presumably deriving the metric's Id from its other fields; see schema).
func generateMetrics(num int) []*schema.MetricData {
	series := make([]*schema.MetricData, num)
	for idx := range series {
		md := &schema.MetricData{
			OrgId:    1,
			Name:     fmt.Sprintf("some.id.of.a.metric.%d", idx),
			Interval: 1,
			Value:    1,
			Unit:     "s",
			Mtype:    "gauge",
		}
		md.SetId()
		series[idx] = md
	}
	return series
}
// FakeMetrics periodically re-timestamps a fixed set of metrics and flushes
// them to an output from a background goroutine (see run).
type FakeMetrics struct {
	// o is the output that metrics are flushed to; Close also closes it.
	o out.Out
	// metrics is the set of metrics that run stamps and flushes on every tick.
	metrics []*schema.MetricData
	// close is the signal channel on which Close tells run to stop.
	close chan struct{}
	// closed, when true, makes Close return immediately without doing anything.
	closed bool
}
// NewFakeMetrics wraps the given metrics and output in a FakeMetrics and
// immediately starts its background run loop in a new goroutine.
// Call Close on the returned value to stop the loop and close the output.
//
// The stats parameter is accepted but not used by this function.
func NewFakeMetrics(metrics []*schema.MetricData, o out.Out, stats met.Backend) *FakeMetrics {
	fm := new(FakeMetrics)
	fm.o = o
	fm.metrics = metrics
	fm.close = make(chan struct{})

	go fm.run()
	return fm
}
// NewKafka returns a running FakeMetrics that publishes num generated metrics
// through a kafkamdm output created against localhost:9092.
// timeout and v2 are passed through unchanged to kafkamdm.New.
// The process exits via log.Fatalf if the output cannot be created.
func NewKafka(num int, timeout time.Duration, v2 bool) *FakeMetrics {
	// NOTE(review): the error from helper.New is discarded here; the backend
	// is created with enabled=false — confirm it cannot fail in that mode.
	stats, _ := helper.New(false, "", "standard", "", "")

	brokers := []string{"localhost:9092"}
	sink, err := kafkamdm.New("mdm", brokers, "none", timeout, stats, "lastNum", v2)
	if err != nil {
		log.Fatalf("failed to create kafka-mdm output. %s", err.Error())
	}

	data := generateMetrics(num)
	return NewFakeMetrics(data, sink, stats)
}
// NewCarbon returns a running FakeMetrics that publishes num generated metrics
// through a carbon output created against localhost:2003.
// The process exits via log.Fatalf if the output cannot be created.
func NewCarbon(num int) *FakeMetrics {
	// NOTE(review): the error from helper.New is discarded here; the backend
	// is created with enabled=false — confirm it cannot fail in that mode.
	stats, _ := helper.New(false, "", "standard", "", "")

	// Named sink rather than out to avoid shadowing the imported out package.
	sink, err := carbon.New("localhost:2003", stats)
	if err != nil {
		// The message used to say "kafka-mdm" (copied from NewKafka), which
		// pointed at the wrong output when carbon setup failed.
		log.Fatalf("failed to create carbon output. %s", err.Error())
	}
	return NewFakeMetrics(generateMetrics(num), sink, stats)
}
// Close stops the background run loop and then closes the underlying output,
// returning the output's Close error.
//
// Close is idempotent: calls after the first return nil without touching the
// run loop or the output. It is not safe to call concurrently from multiple
// goroutines, since the closed flag is not synchronized.
func (f *FakeMetrics) Close() error {
	if f.closed {
		return nil
	}
	// Record the shutdown before signalling. Previously the flag was never
	// set, so a second Close would block forever sending on a channel that
	// no longer has a receiver.
	f.closed = true

	// Unbuffered send: blocks until run has received the stop signal, so no
	// further flush is started once the output is closed below.
	f.close <- struct{}{}
	return f.o.Close()
}
// run is the background loop started by NewFakeMetrics. On every tick it
// stamps all metrics with the tick's unix timestamp and flushes them to the
// output. It returns when a value arrives on f.close, and panics if a flush
// fails.
func (f *FakeMetrics) run() {
	// Why this ticker instead of a regular one:
	// 1) no ticks are dropped: after a flushing hiccup we still produce all
	//    stats and flush them as soon as we can
	// 2) ticks arrive asap after the start of a new second, which makes it
	//    easier to measure how long it took to get the data
	ticks := clock.AlignedTickLossless(time.Second)

	for {
		select {
		case now := <-ticks:
			ts := now.Unix()
			for _, m := range f.metrics {
				m.Time = ts
			}
			if err := f.o.Flush(f.metrics); err != nil {
				panic(fmt.Sprintf("failed to send data to output: %s", err))
			}
		case <-f.close:
			return
		}
	}
}