forked from armon/go-chord
-
Notifications
You must be signed in to change notification settings - Fork 2
/
chord.go
223 lines (187 loc) · 5.81 KB
/
chord.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/*
Package chord provides an implementation of the Chord network protocol.
*/
package chord
import (
"crypto/sha1"
"fmt"
"hash"
"time"
)
// Transport is the communication layer a Chord ring is built on. It is
// used both to issue requests against vnodes and to register local vnodes
// so that RPCs can be invoked on them.
type Transport interface {
	// ListVnodes returns the vnodes that live on the given host.
	ListVnodes(host string) ([]*Vnode, error)

	// Ping checks the given vnode for liveness.
	Ping(vn *Vnode) (bool, error)

	// GetPredecessor requests the predecessor of the given vnode.
	GetPredecessor(vn *Vnode) (*Vnode, error)

	// Notify informs target (our successor) about self. The meaning of
	// the returned vnodes is defined by the implementation.
	Notify(target, self *Vnode) ([]*Vnode, error)

	// FindSuccessors asks vn for up to n successors of key.
	FindSuccessors(vn *Vnode, n int, key []byte) ([]*Vnode, error)

	// ClearPredecessor clears target's predecessor if it matches self.
	// Used when leaving the ring.
	ClearPredecessor(target, self *Vnode) error

	// SkipSuccessor instructs target to skip self as a successor.
	// Used when leaving the ring.
	SkipSuccessor(target, self *Vnode) error

	// Register associates rpc with vn so that RPC callbacks for that
	// vnode can be delivered to it.
	Register(vn *Vnode, rpc VnodeRPC)
}
// VnodeRPC is the set of operations that can be invoked on a vnode that
// has been registered with a Transport. Each method mirrors the Transport
// method of the same name, minus the leading target-vnode argument.
type VnodeRPC interface {
	// GetPredecessor returns this vnode's predecessor.
	GetPredecessor() (*Vnode, error)

	// Notify tells this vnode about the given vnode.
	Notify(vn *Vnode) ([]*Vnode, error)

	// FindSuccessors returns up to n successors of key.
	FindSuccessors(n int, key []byte) ([]*Vnode, error)

	// ClearPredecessor clears this vnode's predecessor if it matches vn.
	ClearPredecessor(vn *Vnode) error

	// SkipSuccessor makes this vnode skip vn as a successor.
	SkipSuccessor(vn *Vnode) error
}
// Delegate receives notifications about ring events. It is optional: a
// Config may carry a nil Delegate.
//
// NOTE(review): the exact moment each callback fires is decided by code
// outside this part of the file; the descriptions below follow the method
// and parameter names and should be confirmed against the call sites.
type Delegate interface {
	// NewPredecessor reports that the local vnode has the predecessor
	// remoteNew, where remotePrev is presumably the one it replaced.
	NewPredecessor(local, remoteNew, remotePrev *Vnode)

	// PredecessorLeaving reports that remote, presumably the predecessor
	// of local, is leaving the ring.
	PredecessorLeaving(local, remote *Vnode)

	// SuccessorLeaving reports that remote, presumably a successor of
	// local, is leaving the ring.
	SuccessorLeaving(local, remote *Vnode)

	// Leaving reports that the local vnode itself is leaving; pred and
	// succ are presumably its predecessor and successor at that time.
	Leaving(local, pred, succ *Vnode)

	// Shutdown reports that the ring is shutting down.
	Shutdown()
}
// Config holds the settings for the Chord nodes run by one process.
// Use DefaultConfig to obtain a populated value. Create and Join write to
// the unexported hashBits field of the Config they are given.
type Config struct {
Hostname string // Local host name
NumVnodes int // Number of vnodes per physical node
HashFunc func() hash.Hash `json:"-"` // Constructor for the hash applied to keys (see Lookup); excluded from JSON
StabilizeMin time.Duration // Minimum stabilization time
StabilizeMax time.Duration // Maximum stabilization time
NumSuccessors int // Number of successors to maintain; Lookup rejects n above this
Delegate Delegate `json:"-"` // Invoked to handle ring events, may be nil; excluded from JSON
hashBits int // Bit size of the hash function; set by Create and Join to HashFunc().Size() * 8
}
// localVnode is a Vnode hosted by this process together with the
// per-vnode state kept for it.
//
// NOTE(review): most fields are only declared in this part of the file.
// Descriptions marked "presumably" are inferred from the field names and
// must be confirmed against the code that uses them.
type localVnode struct {
Vnode // embedded vnode; its Id is used by Join as the lookup key
ring *Ring // presumably the ring that owns this vnode - confirm
successors []*Vnode // successor list; Join fills it from FindSuccessors results
finger []*Vnode // presumably the Chord finger table - confirm
lastFinger int // presumably the index of the finger processed last - confirm
predecessor *Vnode // presumably the currently known predecessor - confirm
stabilized time.Time // presumably the time of the last stabilization - confirm
timer *time.Timer // presumably the timer driving the next stabilization - confirm
}
// Ring stores the state required for a Chord ring: the configuration and
// transport it was built with and the vnodes hosted locally. Instances
// are obtained from Create or Join.
type Ring struct {
config *Config // configuration; Lookup reads NumSuccessors and HashFunc from it
transport Transport // transport used for remote calls (see ListVnodes)
vnodes []*localVnode // locally hosted vnodes; iterated by Join and Leave
delegateCh chan func() // presumably queues Delegate callbacks for execution - confirm
shutdown chan bool // presumably signals shutdown to background work - confirm
}
// DefaultConfig returns the default Ring configuration for the given
// local host name:
//
//   - 8 vnodes per physical node
//   - SHA-1 as the hash function (160-bit identifiers)
//   - stabilization between 15 and 45 seconds
//   - 8 successors maintained per vnode
//   - no delegate
//
// The returned Config is freshly allocated and may be modified by the
// caller before being handed to Create or Join.
func DefaultConfig(hostname string) *Config {
	// Keyed fields keep this literal correct even if Config's fields are
	// reordered or new ones are added.
	return &Config{
		Hostname:      hostname,
		NumVnodes:     8,
		HashFunc:      sha1.New,
		StabilizeMin:  15 * time.Second,
		StabilizeMax:  45 * time.Second,
		NumSuccessors: 8,
		Delegate:      nil,
		// Derived from the hash above rather than hard-coded: 20 bytes = 160 bits.
		hashBits: sha1.Size * 8,
	}
}
// Create starts a new Chord ring from the given config and transport.
//
// conf must be non-nil and carry a HashFunc; otherwise an error is
// returned instead of panicking. conf is modified: its internal hash bit
// size is recomputed from conf.HashFunc. trans is handed to the ring's
// initialization unchanged.
//
// On success the ring has been initialized, its local successors have
// been set, and its vnodes have been scheduled.
func Create(conf *Config, trans Transport) (*Ring, error) {
	if conf == nil || conf.HashFunc == nil {
		return nil, fmt.Errorf("chord: config with a hash function is required")
	}

	// Initialize the hash bits (Size is in bytes).
	conf.hashBits = conf.HashFunc().Size() * 8

	// Create and initialize a ring
	ring := &Ring{}
	ring.init(conf, trans)
	ring.setLocalSuccessors()
	ring.schedule()
	return ring, nil
}
// Join creates a local ring and attaches it to an existing Chord ring.
//
// existing is the host that is asked, via trans.ListVnodes, for its
// vnodes. For every local vnode the nearest of those remote vnodes is
// queried for up to conf.NumSuccessors successors of the local vnode's
// Id, and the answer seeds that vnode's successor list.
//
// conf must be non-nil and carry a HashFunc, and trans must be non-nil;
// otherwise an error is returned. conf is modified: its internal hash
// bit size is recomputed from conf.HashFunc.
//
// An error is also returned when the remote host cannot be queried,
// reports no vnodes, or when no successors can be found for a local vnode.
func Join(conf *Config, trans Transport, existing string) (*Ring, error) {
	if conf == nil || conf.HashFunc == nil {
		return nil, fmt.Errorf("chord: config with a hash function is required")
	}
	if trans == nil {
		return nil, fmt.Errorf("chord: transport is required to join a ring")
	}

	// Initialize the hash bits (Size is in bytes).
	conf.hashBits = conf.HashFunc().Size() * 8

	// Request a list of Vnodes from the remote host
	hosts, err := trans.ListVnodes(existing)
	if err != nil {
		return nil, err
	}
	if len(hosts) == 0 {
		return nil, fmt.Errorf("Remote host has no vnodes!")
	}

	// Create a ring
	ring := &Ring{}
	ring.init(conf, trans)

	// Acquire a live successor for each Vnode
	for _, vn := range ring.vnodes {
		// Get the nearest remote vnode
		nearest := nearestVnodeToKey(hosts, vn.Id)

		// Query for a list of successors to this Vnode
		succs, err := trans.FindSuccessors(nearest, conf.NumSuccessors, vn.Id)
		if err != nil {
			return nil, fmt.Errorf("Failed to find successor for vnodes! Got %s", err)
		}
		if len(succs) == 0 {
			return nil, fmt.Errorf("Failed to find successor for vnodes! Got no vnodes!")
		}

		// Assign the successors. copy stops at the shorter of the two
		// slices, so a remote that answers with more entries than the
		// local successor list can hold cannot trigger an out-of-range
		// panic; the surplus is dropped.
		copy(vn.successors, succs)
	}

	// Do not fast stabilize - This allows the joining node to initialize vnodes
	// if needed and register services. Performing a fast stabilization will
	// result in errors on downstream services as internal states and structures
	// have not yet been initialized due to their dependency on the ring. A normal
	// stabilization allows for services to initialize state before any calls
	// to the delegate are made.
	ring.schedule()
	return ring, nil
}
// Leave takes every local vnode out of the ring and shuts the ring's
// local processing down.
//
// The vnodes are stopped before any of them is told to leave, so that no
// further stabilization runs happen in between. Every vnode is asked to
// leave even if an earlier one failed; the individual errors are combined
// with mergeErrors and returned after the delegate has been stopped.
func (r *Ring) Leave() error {
	// Stop first: prevents further stabilization runs.
	r.stopVnodes()

	// Ask each vnode to leave, accumulating whatever goes wrong.
	var combined error
	for i := range r.vnodes {
		combined = mergeErrors(combined, r.vnodes[i].leave())
	}

	// Let the pending delegate callbacks finish before returning.
	r.stopDelegate()
	return combined
}
// Shutdown shuts down the local processes in a given Chord ring.
// Unlike Leave, it does not call leave on the vnodes: it only stops the
// vnodes and then the delegate, in that order.
// The original documentation states that it blocks until all the vnodes
// terminate; that waiting presumably happens inside stopVnodes and
// stopDelegate, which are defined elsewhere - confirm there.
func (r *Ring) Shutdown() {
r.stopVnodes()
r.stopDelegate()
}
// Lookup finds up to n successors of key.
//
// key is hashed with the ring's configured hash function, the nearest
// local vnode for that hash is selected, and that vnode is asked for the
// successors. Trailing nil entries are removed from the answer, so the
// returned slice may hold fewer than n vnodes and may be empty.
//
// An error is returned when n exceeds the configured NumSuccessors or
// when the successor query itself fails.
func (r *Ring) Lookup(n int, key []byte) ([]*Vnode, error) {
	// Ensure that n is sane
	if n > r.config.NumSuccessors {
		return nil, fmt.Errorf("Cannot ask for more successors than NumSuccessors!")
	}

	// Hash the key. hash.Hash.Write is documented to never return an
	// error, so its result is not checked.
	h := r.config.HashFunc()
	h.Write(key)
	keyHash := h.Sum(nil)

	// Find the nearest local vnode
	nearest := r.nearestVnode(keyHash)

	// Use the nearest node for the lookup
	successors, err := nearest.FindSuccessors(n, keyHash)
	if err != nil {
		return nil, err
	}

	// Trim the nil successors. The length guard matters: without it an
	// empty or all-nil result would index position -1 and panic.
	for len(successors) > 0 && successors[len(successors)-1] == nil {
		successors = successors[:len(successors)-1]
	}
	return successors, nil
}
// ListVnodes returns the vnodes reported for the given host.
// It is a thin pass-through to the ring's transport: the result and the
// error are exactly what Transport.ListVnodes returns for host.
func (r *Ring) ListVnodes(host string) ([]*Vnode, error) {
return r.transport.ListVnodes(host)
}