/
dialer.go
93 lines (73 loc) · 2.17 KB
/
dialer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package quic
import (
"crypto/tls"
"fmt"
"net"
"github.com/SentimensRG/ctx"
quic "github.com/lucas-clemente/quic-go"
"github.com/nanomsg/mangos"
"github.com/pkg/errors"
)
type dialMux struct {
mux dialMuxer
sess *refcntSession
sock mangos.Socket
}
func newDialMux(sock mangos.Socket, m dialMuxer) *dialMux {
return &dialMux{sock: sock, mux: m}
}
func (dm *dialMux) LoadSession(n netlocator, tc *tls.Config, qc *quic.Config) error {
dm.mux.Lock()
defer dm.mux.Unlock()
var ok bool
if dm.sess, ok = dm.mux.GetSession(n); !ok {
// We don't have a session for this [ ??? ] yet, so create it
qs, err := quic.DialAddr(n.Netloc(), tc, qc)
if err != nil {
return err
}
// Init refcnt to track the Session's usage and clean up when we're done
dm.sess = newRefCntSession(qs, dm.mux)
dm.mux.AddSession(qs.RemoteAddr(), dm.sess) // don't add until it's incremented
}
dm.sess.Incr()
return nil
}
func (dm dialMux) Dial(path string) (net.Conn, error) {
stream, err := dm.sess.OpenStreamSync()
if err != nil {
return nil, errors.Wrap(err, "open stream")
}
// There's no Close method for mangos.PipeDialer, so we need to decr
// the ref counter when the stream closes.
ctx.Defer(stream.Context(), func() { _ = dm.sess.DecrAndClose() })
// this is where we do the path negotiation
var n dialNegotiator = newNegotiator(stream)
if err = n.WriteHeaders(fmt.Sprintf("%s\n", path)); err != nil {
_ = stream.Close()
return nil, errors.Wrap(err, "write headers")
}
if err = n.Ack(); err != nil {
return nil, errors.Wrap(err, "ack")
}
return &conn{Stream: stream, Session: dm.sess}, nil
}
type dialer struct {
netloc
*dialMux
opt *options
sock mangos.Socket
}
func (d dialer) Dial() (mangos.Pipe, error) {
tc, qc := getQUICCfg(d.opt)
if err := d.LoadSession(d.netloc, tc, qc); err != nil {
return nil, errors.Wrap(err, "dial quic")
}
conn, err := d.dialMux.Dial(d.Path)
if err != nil {
return nil, errors.Wrap(err, "dial path")
}
return mangos.NewConnPipe(conn, d.sock)
}
func (d dialer) GetOption(name string) (interface{}, error) { return d.opt.get(name) }
func (d dialer) SetOption(name string, v interface{}) error { return d.opt.set(name, v) }