forked from ipfs/go-peertaskqueue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
peertaskqueue.go
312 lines (268 loc) · 8.75 KB
/
peertaskqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
package peertaskqueue
import (
"fmt"
"sync"
pq "github.com/ipfs/go-ipfs-pq"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertracker"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// peerTaskQueueEvent identifies the kind of peer lifecycle event that is
// passed to registered hooks.
type peerTaskQueueEvent int
const (
// peerAdded is passed to hooks when PushTasks creates a tracker for a peer
// that was not yet known to the queue.
peerAdded = peerTaskQueueEvent(1)
// peerRemoved is passed to hooks when PopTasks drops the tracker of a peer
// that has become idle.
peerRemoved = peerTaskQueueEvent(2)
defaultRoundSize = 10000 // total data to distribute among peers per round
)
// hookFunc is a callback invoked with the affected peer and the event kind.
type hookFunc func(p peer.ID, event peerTaskQueueEvent)
// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// Tasks are added to the queue, then popped off alternately between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
// lock is acquired by SetWeight, PushTasks, PopTasks, TasksDone, Remove,
// FullThaw and ThawRound for the duration of each call.
lock sync.Mutex
// pQueue holds the peer trackers, ordered by peertracker.PeerCompare.
pQueue pq.PQ
// peerTrackers maps each known peer to its tracker.
peerTrackers map[peer.ID]*peertracker.PeerTracker
// frozenPeers is the set of peers frozen by Remove and not yet thawed.
frozenPeers map[peer.ID]struct{}
// hooks are called, in order, whenever a peer is added or removed.
hooks []hookFunc
// ignoreFreezing, when true, makes Remove skip freezing the peer.
ignoreFreezing bool
// taskMerger is handed to every newly created peer tracker.
taskMerger peertracker.TaskMerger
// roundSize is the total amount of work split among peers, proportionally
// to their weights, each time a new round starts.
roundSize int
}
// Option is a function that configures the peer task queue.
// Applying an Option returns another Option; the options defined in this
// file return one that is meant to reverse the change just made.
type Option func(*PeerTaskQueue) Option
// chain combines two options into one. The combined option applies
// firstOption and then secondOption, and returns an option that applies the
// two resulting reversals in the opposite order (second's reversal first).
func chain(firstOption Option, secondOption Option) Option {
	var combined Option = func(queue *PeerTaskQueue) Option {
		undoFirst := firstOption(queue)
		undoSecond := secondOption(queue)
		// Undo in reverse order of application.
		return chain(undoSecond, undoFirst)
	}
	return combined
}
// IgnoreFreezing returns an option that sets whether the task queue ignores
// freezing and unfreezing. Applying the option returns another option that
// restores the value that was in effect beforehand.
func IgnoreFreezing(ignoreFreezing bool) Option {
	return func(queue *PeerTaskQueue) Option {
		// Capture the current setting before overwriting it so it can be restored.
		restore := IgnoreFreezing(queue.ignoreFreezing)
		queue.ignoreFreezing = ignoreFreezing
		return restore
	}
}
// TaskMerger returns an option that installs tmfp as the merger used when a
// task is pushed with the same Topic as an existing task. Applying the option
// returns another option that reinstalls the previously configured merger.
func TaskMerger(tmfp peertracker.TaskMerger) Option {
	return func(queue *PeerTaskQueue) Option {
		// Capture the current merger before overwriting it so it can be restored.
		restore := TaskMerger(queue.taskMerger)
		queue.taskMerger = tmfp
		return restore
	}
}
// removeHook returns an option intended to remove hook from the queue's hook
// list. Applying the option returns an option that adds the hook (again).
//
// NOTE(review): the comparison below takes the address of this closure's own
// `hook` variable and of the loop variable `testHook`. Those are two distinct
// variables, so the addresses never compare equal and no hook is ever removed.
// Go function values cannot be compared with ==, so a real fix needs hooks to
// carry a comparable identity (e.g. storing pointers in ptq.hooks) — TODO.
func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
if &hook == &testHook {
// Drop element i by splicing the tail over it.
ptq.hooks = append(ptq.hooks[:i], ptq.hooks[i+1:]...)
break
}
}
return addHook(hook)
}
}
// addHook returns an option that appends hook to the queue's hook list.
// Applying the option returns the matching removeHook option.
func addHook(hook hookFunc) Option {
	return func(queue *PeerTaskQueue) Option {
		undo := removeHook(hook)
		queue.hooks = append(queue.hooks, hook)
		return undo
	}
}
// OnPeerAddedHook returns an option that registers onPeerAddedHook to be
// called whenever the ptq adds a new peer.
func OnPeerAddedHook(onPeerAddedHook func(p peer.ID)) Option {
	return addHook(func(id peer.ID, ev peerTaskQueueEvent) {
		// Ignore every event other than a peer being added.
		if ev != peerAdded {
			return
		}
		onPeerAddedHook(id)
	})
}
// OnPeerRemovedHook returns an option that registers onPeerRemovedHook to be
// called whenever the ptq removes a peer.
func OnPeerRemovedHook(onPeerRemovedHook func(p peer.ID)) Option {
	return addHook(func(id peer.ID, ev peerTaskQueueEvent) {
		// Ignore every event other than a peer being removed.
		if ev != peerRemoved {
			return
		}
		onPeerRemovedHook(id)
	})
}
// New creates a new PeerTaskQueue that uses defaultRoundSize as its round
// size and is configured with the given options.
func New(options ...Option) *PeerTaskQueue {
	queue := newWithRoundSize(defaultRoundSize, options...)
	return queue
}
// newWithRoundSize creates a PeerTaskQueue whose rounds distribute roundSize
// units of work among peers, then applies the given options to it.
//
// The queue starts with no peers, the default task merger, and a priority
// queue ordered by peertracker.PeerCompare.
func newWithRoundSize(roundSize int, options ...Option) *PeerTaskQueue {
	ptq := &PeerTaskQueue{
		peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
		frozenPeers:  make(map[peer.ID]struct{}),
		// The priority queue is built once here; it does not need to be
		// constructed a second time after the literal.
		pQueue:     pq.New(peertracker.PeerCompare),
		taskMerger: &peertracker.DefaultTaskMerger{},
		roundSize:  roundSize,
	}
	ptq.Options(options...)
	return ptq
}
// Options applies the given configuration functions to the peer task queue,
// in the order supplied. It returns an Option that, when applied, runs the
// individual reversals in the opposite order. With no options it returns nil.
func (ptq *PeerTaskQueue) Options(options ...Option) Option {
	if len(options) == 0 {
		return nil
	}
	undo := options[0](ptq)
	for _, opt := range options[1:] {
		// The most recently applied option must be undone first.
		undo = chain(opt(ptq), undo)
	}
	return undo
}
// callHooks invokes every registered hook, in registration order, with the
// given peer and event.
func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) {
	registered := ptq.hooks
	for i := 0; i < len(registered); i++ {
		registered[i](to, event)
	}
}
// SetWeight sets the weight of the tracker belonging to peer id. The weight
// is used by newRound to apportion the round size among peers.
// It returns an error if the queue has no tracker for that peer.
func (ptq *PeerTaskQueue) SetWeight(id peer.ID, weight int) error {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	if tracker, known := ptq.peerTrackers[id]; known {
		tracker.SetWeight(weight)
		return nil
	}
	return fmt.Errorf("Peer with id %s not found", id)
}
// PushTasks queues the given tasks for peer `to`. If the peer is not yet
// known, a tracker is created for it, inserted into the priority queue, and
// the peerAdded hooks are fired before the tasks are pushed.
func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	tracker, known := ptq.peerTrackers[to]
	if !known {
		// First time we see this peer: register a fresh tracker.
		tracker = peertracker.New(to, ptq.taskMerger)
		ptq.pQueue.Push(tracker)
		ptq.peerTrackers[to] = tracker
		ptq.callHooks(to, peerAdded)
	}

	tracker.PushTasks(tasks...)
	// The new tasks may change the peer's position in the priority queue.
	ptq.pQueue.Update(tracker.Index())
}
// newRound starts a new distribution round: every tracked peer's remaining
// work is reset to its weight-proportional share of ptq.roundSize, and its
// position in the priority queue is refreshed.
//
// It is called from PopTasks while ptq.lock is held.
//
// If the weights sum to zero, no proportional share can be computed; every
// peer is then given a share of zero instead of dividing by zero (which
// would panic).
func (ptq *PeerTaskQueue) newRound() {
	totalWeight := 0
	for _, tracker := range ptq.peerTrackers {
		totalWeight += tracker.Weight()
	}
	for _, tracker := range ptq.peerTrackers {
		share := 0
		if totalWeight != 0 {
			share = (ptq.roundSize * tracker.Weight()) / totalWeight
		}
		tracker.SetWorkRemaining(share)
		ptq.pQueue.Update(tracker.Index())
	}
}
// PopTasks finds the peer with the highest priority and pops as many tasks
// off the peer's queue as necessary to cover targetMinWork, in priority order.
// If there are not enough tasks to cover targetMinWork it just returns
// whatever is in the peer's queue.
//   - Peers with the most "active" work are deprioritized.
//     This heuristic is for fairness, we try to keep all peers "busy".
//   - Peers with the most "pending" work are prioritized.
//     This heuristic is so that peers with a lot to do get asked for work first.
//
// The third response argument is pending work: the amount of work in the
// queue for this peer.
//
// When the priority queue yields no peer tracker, PopTasks returns
// ("", nil, -1).
func (ptq *PeerTaskQueue) PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	// Choose the highest priority peer. The comma-ok form is used so that a
	// nil result from Peek (presumably an empty queue) takes the "nothing to
	// pop" path below instead of panicking in the type assertion.
	peerTracker, ok := ptq.pQueue.Peek().(*peertracker.PeerTracker)
	if !ok || peerTracker == nil {
		return "", nil, -1
	}

	// The head peer has no work left in this round (remaining <= 0), so
	// start a new round and pick the head again.
	// NOTE(review): this presumes PeerCompare orders peers such that the head
	// having nothing left implies no peer has — confirm against peertracker.
	if peerTracker.WorkRemaining() <= 0 {
		ptq.newRound()
		peerTracker, ok = ptq.pQueue.Peek().(*peertracker.PeerTracker)
		if !ok || peerTracker == nil { // shouldn't happen since we got a peerTracker last time
			return "", nil, -1
		}
	}

	// Get the highest priority tasks for the given peer and charge the
	// popped work against the peer's allowance for this round.
	out, pendingWork, poppedWork := peerTracker.PopTasks(targetMinWork)
	peerTracker.SetWorkRemaining(peerTracker.WorkRemaining() - poppedWork)

	// If the peer has no more tasks, remove its peer tracker
	if peerTracker.IsIdle() {
		ptq.pQueue.Pop()
		target := peerTracker.Target()
		delete(ptq.peerTrackers, target)
		delete(ptq.frozenPeers, target)
		ptq.callHooks(target, peerRemoved)
	} else {
		// We may have modified the peer tracker's state (by popping tasks), so
		// update its position in the priority queue
		ptq.pQueue.Update(peerTracker.Index())
	}

	return peerTracker.Target(), out, pendingWork
}
// TasksDone reports that the given tasks have finished for peer `to`.
// It is a no-op if the queue has no tracker for that peer.
func (ptq *PeerTaskQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	if tracker, known := ptq.peerTrackers[to]; known {
		// Mark each task as completed on the peer's tracker.
		for i := range tasks {
			tracker.TaskDone(tasks[i])
		}
		// Completed work may change the peer's position in the peer queue.
		ptq.pQueue.Update(tracker.Index())
	}
}
// Remove removes the task with the given topic from peer p's queue.
// If a task was actually removed and freezing is not ignored, the peer is
// also frozen.
func (ptq *PeerTaskQueue) Remove(topic peertask.Topic, p peer.ID) {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	tracker, known := ptq.peerTrackers[p]
	if !known || !tracker.Remove(topic) {
		return
	}

	// Freeze the partner: if they sent us a cancel for a block we were about
	// to send them, we should wait a short period of time to make sure we
	// receive any other in-flight cancels before sending them a block they
	// already potentially have.
	if !ptq.ignoreFreezing {
		if !tracker.IsFrozen() {
			ptq.frozenPeers[p] = struct{}{}
		}
		tracker.Freeze()
	}
	ptq.pQueue.Update(tracker.Index())
}
// FullThaw completely thaws every frozen peer that still has a tracker, so
// it can execute tasks again, and removes it from the frozen set.
func (ptq *PeerTaskQueue) FullThaw() {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	for id := range ptq.frozenPeers {
		tracker, known := ptq.peerTrackers[id]
		if !known {
			continue
		}
		tracker.FullThaw()
		delete(ptq.frozenPeers, id)
		ptq.pQueue.Update(tracker.Index())
	}
}
// ThawRound thaws frozen peers incrementally, so that those that have been
// frozen the least become unfrozen and able to execute tasks first.
// A peer leaves the frozen set once its tracker's Thaw reports true.
func (ptq *PeerTaskQueue) ThawRound() {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	for id := range ptq.frozenPeers {
		tracker, known := ptq.peerTrackers[id]
		if !known {
			continue
		}
		if thawed := tracker.Thaw(); thawed {
			delete(ptq.frozenPeers, id)
		}
		ptq.pQueue.Update(tracker.Index())
	}
}