-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
index_update_incr_task.go
72 lines (62 loc) · 1.84 KB
/
index_update_incr_task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package index
import (
"database/sql"
log "github.com/Sirupsen/logrus"
"time"
nsema "github.com/toolkits/concurrent/semaphore"
ntime "github.com/toolkits/time"
"github.com/open-falcon/falcon-plus/modules/graph/g"
proc "github.com/open-falcon/falcon-plus/modules/graph/proc"
)
const (
	// IndexUpdateIncrTaskSleepInterval is the pause between two consecutive
	// incremental index update passes. Its value is 1 second.
	// NOTE(review): the previous comment claimed a default of 30s, which does
	// not match the value — confirm which one is intended.
	IndexUpdateIncrTaskSleepInterval = time.Second
)
// semaUpdateIndexIncr is the semaphore used to control the concurrency of
// MySQL operations during incremental index updates. It is created with a
// capacity argument of 2.
var semaUpdateIndexIncr = nsema.NewSemaphore(2)
// StartIndexUpdateIncrTask runs the asynchronous, incremental index update
// task: after every IndexUpdateIncrTaskSleepInterval it performs one update
// pass that flushes cached data to the database, then records statistics
// about that pass. It loops forever and never returns.
func StartIndexUpdateIncrTask() {
	for {
		time.Sleep(IndexUpdateIncrTaskSleepInterval)

		// Timestamps are taken in whole Unix seconds, so the reported
		// duration has one-second resolution.
		began := time.Now().Unix()
		updated := updateIndexIncr()
		elapsed := time.Now().Unix() - began

		// statistics
		proc.IndexUpdateIncrCnt.SetCnt(int64(updated))
		proc.IndexUpdateIncr.Incr()
		proc.IndexUpdateIncr.PutOther("lastStartTs", ntime.FormatTs(began))
		proc.IndexUpdateIncr.PutOther("lastTimeConsumingInSec", elapsed)
	}
}
// updateIndexIncr performs one incremental index update pass.
//
// Every key currently listed by unIndexedItemCache is removed from that cache
// and its item is handed to updateIndexFromOneItem in a separate goroutine.
// semaUpdateIndexIncr is acquired before each goroutine is started and
// released when it finishes. An item that is written successfully is put into
// IndexedItemCache; a failed write only increments
// proc.IndexUpdateIncrErrorCnt (the item is not put back into
// unIndexedItemCache).
//
// It returns the number of items dispatched for update. It returns 0 when the
// cache is nil or empty, or when no database connection could be obtained.
// The function does not wait for goroutines that are still running when the
// loop ends.
func updateIndexIncr() int {
	if unIndexedItemCache == nil || unIndexedItemCache.Size() <= 0 {
		return 0
	}

	dbConn, err := g.GetDbConn("UpdateIndexIncrTask")
	if err != nil {
		// Sprint-style logging inserts no space after a string operand, so
		// the separator has to be part of the message itself.
		log.Error("[ERROR] get dbConn fail: ", err)
		return 0
	}

	ret := 0
	for _, key := range unIndexedItemCache.Keys() {
		cached := unIndexedItemCache.Get(key)
		unIndexedItemCache.Remove(key)

		// Skip missing entries, entries of an unexpected type and typed-nil
		// pointers instead of panicking on them.
		icitem, ok := cached.(*IndexCacheItem)
		if !ok || icitem == nil {
			continue
		}

		// Update MySQL concurrently; Acquire is called before the goroutine
		// is started and Release when it finishes.
		semaUpdateIndexIncr.Acquire()
		go func(key string, icitem *IndexCacheItem, dbConn *sql.DB) {
			defer semaUpdateIndexIncr.Release()

			if err := updateIndexFromOneItem(icitem.Item, dbConn); err != nil {
				proc.IndexUpdateIncrErrorCnt.Incr()
				return
			}
			IndexedItemCache.Put(key, icitem)
		}(key, icitem, dbConn)
		ret++
	}

	return ret
}