-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
send_tasks.go
190 lines (164 loc) · 4.64 KB
/
send_tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package sender
import (
"bytes"
cmodel "github.com/open-falcon/falcon-plus/common/model"
"github.com/open-falcon/falcon-plus/modules/transfer/g"
"github.com/open-falcon/falcon-plus/modules/transfer/proc"
nsema "github.com/toolkits/concurrent/semaphore"
"github.com/toolkits/container/list"
"log"
"time"
)
// send
const (
	DefaultSendTaskSleepInterval = time.Millisecond * 50 // default sleep interval when a send queue is empty: 50ms
)
// TODO: add control over the send tasks, e.g. a stop mechanism.

// startSendTasks spawns one forwarding goroutine per Judge node, per
// Graph node/address pair, and (if enabled) one for TSDB. Each goroutine
// drains its queue and pushes data out over the corresponding pool.
func startSendTasks() {
	cfg := g.Config()

	// Each MaxConns setting bounds the per-task send concurrency;
	// clamp to a minimum of 1 so the semaphores are always usable.
	atLeastOne := func(n int) int {
		if n < 1 {
			return 1
		}
		return n
	}
	judgeConcurrent := atLeastOne(cfg.Judge.MaxConns)
	graphConcurrent := atLeastOne(cfg.Graph.MaxConns)
	tsdbConcurrent := atLeastOne(cfg.Tsdb.MaxConns)

	// init send go-routines
	for node := range cfg.Judge.Cluster {
		go forward2JudgeTask(JudgeQueues[node], node, judgeConcurrent)
	}
	for node, nitem := range cfg.Graph.ClusterList {
		for _, addr := range nitem.Addrs {
			go forward2GraphTask(GraphQueues[node+addr], node, addr, graphConcurrent)
		}
	}
	if cfg.Tsdb.Enabled {
		go forward2TsdbTask(tsdbConcurrent)
	}
}
// forward2JudgeTask is the Judge send loop: it drains the node's send
// queue in batches and ships them to the Judge node through the RPC
// connection pool, bounding in-flight sends with a semaphore.
func forward2JudgeTask(Q *list.SafeListLimited, node string, concurrent int) {
	batch := g.Config().Judge.Batch // max items per send
	addr := g.Config().Judge.Cluster[node]
	sema := nsema.NewSemaphore(concurrent)

	for {
		popped := Q.PopBackBy(batch)
		n := len(popped)
		if n == 0 {
			// queue empty; back off briefly before polling again
			time.Sleep(DefaultSendTaskSleepInterval)
			continue
		}

		payload := make([]*cmodel.JudgeItem, n)
		for idx, raw := range popped {
			payload[idx] = raw.(*cmodel.JudgeItem)
		}

		// synchronous Call with bounded concurrency
		sema.Acquire()
		go func(addr string, payload []*cmodel.JudgeItem, n int) {
			defer sema.Release()

			resp := &cmodel.SimpleRpcResponse{}
			var err error
			delivered := false
			for attempt := 0; attempt < 3; attempt++ { // up to 3 tries
				if err = JudgeConnPools.Call(addr, "Judge.Send", payload, resp); err == nil {
					delivered = true
					break
				}
				time.Sleep(time.Millisecond * 10)
			}

			// statistics
			if delivered {
				proc.SendToJudgeCnt.IncrBy(int64(n))
			} else {
				log.Printf("send judge %s:%s fail: %v", node, addr, err)
				proc.SendToJudgeFailCnt.IncrBy(int64(n))
			}
		}(addr, payload, n)
	}
}
// forward2GraphTask is the Graph send loop: it drains the queue for one
// node/address pair in batches and ships them to Graph through the RPC
// connection pool, bounding in-flight sends with a semaphore.
func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) {
	batch := g.Config().Graph.Batch // max items per send
	sema := nsema.NewSemaphore(concurrent)

	for {
		popped := Q.PopBackBy(batch)
		n := len(popped)
		if n == 0 {
			// queue empty; back off briefly before polling again
			time.Sleep(DefaultSendTaskSleepInterval)
			continue
		}

		payload := make([]*cmodel.GraphItem, n)
		for idx, raw := range popped {
			payload[idx] = raw.(*cmodel.GraphItem)
		}

		sema.Acquire()
		go func(addr string, payload []*cmodel.GraphItem, n int) {
			defer sema.Release()

			resp := &cmodel.SimpleRpcResponse{}
			var err error
			delivered := false
			for attempt := 0; attempt < 3; attempt++ { // up to 3 tries
				if err = GraphConnPools.Call(addr, "Graph.Send", payload, resp); err == nil {
					delivered = true
					break
				}
				time.Sleep(time.Millisecond * 10)
			}

			// statistics
			if delivered {
				proc.SendToGraphCnt.IncrBy(int64(n))
			} else {
				log.Printf("send to graph %s:%s fail: %v", node, addr, err)
				proc.SendToGraphFailCnt.IncrBy(int64(n))
			}
		}(addr, payload, n)
	}
}
// forward2TsdbTask is the TSDB send loop: it drains the TSDB send queue
// in batches, serializes each batch into line format, and pushes it via
// the TSDB connection-pool helper, bounding in-flight sends with a
// semaphore. concurrent is the maximum number of simultaneous senders.
func forward2TsdbTask(concurrent int) {
	batch := g.Config().Tsdb.Batch    // max items per send
	retry := g.Config().Tsdb.MaxRetry // send attempts per batch
	// Guarantee at least one attempt: if MaxRetry were configured as 0
	// (or negative), the retry loop below would never run, err would stay
	// nil, and popped items would be dropped silently without even the
	// failure counter being bumped. Mirrors the MaxConns clamping done
	// in startSendTasks.
	if retry < 1 {
		retry = 1
	}
	sema := nsema.NewSemaphore(concurrent)
	for {
		items := TsdbQueue.PopBackBy(batch)
		if len(items) == 0 {
			// queue empty; back off briefly before polling again
			time.Sleep(DefaultSendTaskSleepInterval)
			continue
		}
		// synchronous Send with bounded concurrency
		sema.Acquire()
		go func(itemList []interface{}) {
			defer sema.Release()

			// serialize the whole batch, one line per item
			var tsdbBuffer bytes.Buffer
			for i := 0; i < len(itemList); i++ {
				tsdbItem := itemList[i].(*cmodel.TsdbItem)
				tsdbBuffer.WriteString(tsdbItem.TsdbString())
				tsdbBuffer.WriteString("\n")
			}

			var err error
			for i := 0; i < retry; i++ {
				err = TsdbConnPoolHelper.Send(tsdbBuffer.Bytes())
				if err == nil {
					proc.SendToTsdbCnt.IncrBy(int64(len(itemList)))
					break
				}
				time.Sleep(100 * time.Millisecond)
			}
			if err != nil {
				proc.SendToTsdbFailCnt.IncrBy(int64(len(itemList)))
				log.Println(err)
				return
			}
		}(items)
	}
}