-
Notifications
You must be signed in to change notification settings - Fork 0
/
promise.go
140 lines (115 loc) · 2.41 KB
/
promise.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package ring
import (
"errors"
)
var (
ErrRingShutdown = errors.New("ring is closed")
)
type Promise interface {
Error() error
Response() interface{}
}
type promiseError struct {
err error
errCh chan error
responded bool
ShutdownCh chan struct{}
}
func (p *promiseError) init() {
p.errCh = make(chan error, 1)
}
func (p *promiseError) Error() error {
if p.err != nil {
return p.err
}
if p.errCh == nil {
panic("waiting for response on nil channel")
}
select {
case p.err = <-p.errCh:
case <-p.ShutdownCh:
p.err = ErrRingShutdown
}
return p.err
}
func (p *promiseError) respondError(err error) {
if p.responded {
return
}
if p.errCh != nil {
p.errCh <- err
close(p.errCh)
}
p.responded = true
}
type Message struct {
Data *proposal
MsgType PaxosMessageType
From ServerAddress
}
type proposalPromise struct {
promiseError
req *Message
resp *Message
}
func (c *proposalPromise) init() {
c.promiseError.init()
}
func (c *proposalPromise) Request() *Message {
return c.req
}
func (c *proposalPromise) Response() interface{} {
return c.resp
}
func (c *proposalPromise) respond(resp interface{}) *proposalPromise {
c.resp = resp.(*Message)
return c
}
type ConfigurationRequest struct {
}
// ConfigurationNode has the reusable router information.
type ConfigurationNode struct {
NodeKey string
VirtualNodes map[uint64]int
Tags map[string]string
}
type ConfigurationResponse struct {
StartupConfig *StartupConfig
BindAddr string
}
type configurationPromise struct {
promiseError
req *ConfigurationRequest
resp *ConfigurationResponse
}
func (c *configurationPromise) init() {
c.promiseError.init()
}
func (c *configurationPromise) Request() *ConfigurationRequest {
return c.req
}
func (c *configurationPromise) Response() interface{} {
return c.resp
}
func (c *configurationPromise) respond(resp interface{}) *configurationPromise {
c.resp = resp.(*ConfigurationResponse)
return c
}
// rpcResponse captures both a response and a potential error.
type rpcResponse struct {
Response interface{}
Error error
}
type RPCInfo struct {
From ServerAddress
}
// rpc has a command, and provides a response mechanism.
type rpc struct {
Command interface{}
RespChan chan<- rpcResponse
*RPCInfo
}
// Respond is used to respondError with a response, error or both
func (r *rpc) Respond(resp interface{}, err error) {
r.RespChan <- rpcResponse{resp, err}
}