-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
sender.go
200 lines (173 loc) · 4.5 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package sender
import (
"fmt"
backend "github.com/open-falcon/falcon-plus/common/backend_pool"
cmodel "github.com/open-falcon/falcon-plus/common/model"
"github.com/open-falcon/falcon-plus/modules/transfer/g"
"github.com/open-falcon/falcon-plus/modules/transfer/proc"
rings "github.com/toolkits/consistent/rings"
nlist "github.com/toolkits/container/list"
"log"
)
const (
DefaultSendQueueMaxSize = 102400 //10.24w
)
// 默认参数
var (
MinStep int //最小上报周期,单位sec
)
// 服务节点的一致性哈希环
// pk -> node
var (
JudgeNodeRing *rings.ConsistentHashNodeRing
GraphNodeRing *rings.ConsistentHashNodeRing
)
// 发送缓存队列
// node -> queue_of_data
var (
TsdbQueue *nlist.SafeListLimited
JudgeQueues = make(map[string]*nlist.SafeListLimited)
GraphQueues = make(map[string]*nlist.SafeListLimited)
)
// 连接池
// node_address -> connection_pool
var (
JudgeConnPools *backend.SafeRpcConnPools
TsdbConnPoolHelper *backend.TsdbConnPoolHelper
GraphConnPools *backend.SafeRpcConnPools
)
// 初始化数据发送服务, 在main函数中调用
func Start() {
// 初始化默认参数
MinStep = g.Config().MinStep
if MinStep < 1 {
MinStep = 30 //默认30s
}
//
initConnPools()
initSendQueues()
initNodeRings()
// SendTasks依赖基础组件的初始化,要最后启动
startSendTasks()
startSenderCron()
log.Println("send.Start, ok")
}
// 将数据 打入 某个Judge的发送缓存队列, 具体是哪一个Judge 由一致性哈希 决定
func Push2JudgeSendQueue(items []*cmodel.MetaData) {
for _, item := range items {
pk := item.PK()
node, err := JudgeNodeRing.GetNode(pk)
if err != nil {
log.Println("E:", err)
continue
}
// align ts
step := int(item.Step)
if step < MinStep {
step = MinStep
}
ts := alignTs(item.Timestamp, int64(step))
judgeItem := &cmodel.JudgeItem{
Endpoint: item.Endpoint,
Metric: item.Metric,
Value: item.Value,
Timestamp: ts,
JudgeType: item.CounterType,
Tags: item.Tags,
}
Q := JudgeQueues[node]
isSuccess := Q.PushFront(judgeItem)
// statistics
if !isSuccess {
proc.SendToJudgeDropCnt.Incr()
}
}
}
// 将数据 打入 某个Graph的发送缓存队列, 具体是哪一个Graph 由一致性哈希 决定
func Push2GraphSendQueue(items []*cmodel.MetaData) {
cfg := g.Config().Graph
for _, item := range items {
graphItem, err := convert2GraphItem(item)
if err != nil {
log.Println("E:", err)
continue
}
pk := item.PK()
// statistics. 为了效率,放到了这里,因此只有graph是enbale时才能trace
proc.RecvDataTrace.Trace(pk, item)
proc.RecvDataFilter.Filter(pk, item.Value, item)
node, err := GraphNodeRing.GetNode(pk)
if err != nil {
log.Println("E:", err)
continue
}
cnode := cfg.ClusterList[node]
errCnt := 0
for _, addr := range cnode.Addrs {
Q := GraphQueues[node+addr]
if !Q.PushFront(graphItem) {
errCnt += 1
}
}
// statistics
if errCnt > 0 {
proc.SendToGraphDropCnt.Incr()
}
}
}
// 打到Graph的数据,要根据rrdtool的特定 来限制 step、counterType、timestamp
func convert2GraphItem(d *cmodel.MetaData) (*cmodel.GraphItem, error) {
item := &cmodel.GraphItem{}
item.Endpoint = d.Endpoint
item.Metric = d.Metric
item.Tags = d.Tags
item.Timestamp = d.Timestamp
item.Value = d.Value
item.Step = int(d.Step)
if item.Step < MinStep {
item.Step = MinStep
}
item.Heartbeat = item.Step * 2
if d.CounterType == g.GAUGE {
item.DsType = d.CounterType
item.Min = "U"
item.Max = "U"
} else if d.CounterType == g.COUNTER {
item.DsType = g.DERIVE
item.Min = "0"
item.Max = "U"
} else if d.CounterType == g.DERIVE {
item.DsType = g.DERIVE
item.Min = "0"
item.Max = "U"
} else {
return item, fmt.Errorf("not_supported_counter_type")
}
item.Timestamp = alignTs(item.Timestamp, int64(item.Step)) //item.Timestamp - item.Timestamp%int64(item.Step)
return item, nil
}
// 将原始数据入到tsdb发送缓存队列
func Push2TsdbSendQueue(items []*cmodel.MetaData) {
for _, item := range items {
tsdbItem := convert2TsdbItem(item)
isSuccess := TsdbQueue.PushFront(tsdbItem)
if !isSuccess {
proc.SendToTsdbDropCnt.Incr()
}
}
}
// 转化为tsdb格式
func convert2TsdbItem(d *cmodel.MetaData) *cmodel.TsdbItem {
t := cmodel.TsdbItem{Tags: make(map[string]string)}
for k, v := range d.Tags {
t.Tags[k] = v
}
t.Tags["endpoint"] = d.Endpoint
t.Metric = d.Metric
t.Timestamp = d.Timestamp
t.Value = d.Value
return &t
}
func alignTs(ts int64, period int64) int64 {
return ts - ts%period
}