Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add support for socket options #12702

Merged
merged 2 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions pkg/transport/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package transport

import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
Expand All @@ -39,18 +40,34 @@ import (

// NewListener creates a new listner.
func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
if l, err = newListener(addr, scheme); err != nil {
if l, err = newListener(addr, scheme, nil); err != nil {
return nil, err
}
return wrapTLS(scheme, tlsinfo, l)
}

func newListener(addr string, scheme string) (net.Listener, error) {
// NewListenerWithSocketOpts creates new listener with support for socket options.
func NewListenerWithSocketOpts(addr, scheme string, tlsinfo *TLSInfo, sopts *SocketOpts) (net.Listener, error) {
ln, err := newListener(addr, scheme, sopts)
if err != nil {
return nil, err
}
if tlsinfo != nil {
wrapTLS(scheme, tlsinfo, ln)
}
return ln, nil
}

func newListener(addr string, scheme string, sopts *SocketOpts) (net.Listener, error) {
if scheme == "unix" || scheme == "unixs" {
// unix sockets via unix://laddr
return NewUnixListener(addr)
}
return net.Listen("tcp", addr)
config, err := newListenConfig(sopts)
if err != nil {
return nil, err
}
return config.Listen(context.TODO(), "tcp", addr)
}

func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
Expand All @@ -63,6 +80,17 @@ func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, err
return newTLSListener(l, tlsinfo, checkSAN)
}

func newListenConfig(sopts *SocketOpts) (net.ListenConfig, error) {
lc := net.ListenConfig{}
if sopts != nil {
ctls := getControls(sopts)
if len(ctls) > 0 {
lc.Control = ctls.Control
}
}
return lc, nil
}

type TLSInfo struct {
CertFile string
KeyFile string
Expand Down
70 changes: 70 additions & 0 deletions pkg/transport/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,58 @@ func TestNewListenerTLSInfo(t *testing.T) {
testNewListenerTLSInfoAccept(t, *tlsInfo)
}

func TestNewListenerWithSocketOpts(t *testing.T) {
tlsInfo, del, err := createSelfCert()
if err != nil {
t.Fatalf("unable to create cert: %v", err)
}
defer del()
tests := map[string]struct {
socketOpts *SocketOpts
expectedErr bool
}{
"nil": {
socketOpts: nil,
expectedErr: true,
},
"empty": {
socketOpts: &SocketOpts{},
expectedErr: true,
},
"reuse address": {
socketOpts: &SocketOpts{ReuseAddress: true},
expectedErr: true,
},
"reuse address and reuse port": {
socketOpts: &SocketOpts{ReuseAddress: true, ReusePort: true},
expectedErr: false,
},
"reuse port": {
socketOpts: &SocketOpts{ReusePort: true},
expectedErr: false,
},
}
for testName, test := range tests {
t.Run(testName, func(t *testing.T) {
ln, err := NewListenerWithSocketOpts("127.0.0.1:0", "https", tlsInfo, test.socketOpts)
if err != nil {
t.Fatalf("unexpected NewListenerWithSocketOpts error: %v", err)
}
defer ln.Close()
ln2, err := NewListenerWithSocketOpts(ln.Addr().String(), "https", tlsInfo, test.socketOpts)
if test.expectedErr && err == nil {
t.Fatalf("expected error")
}
if !test.expectedErr && err != nil {
t.Fatalf("unexpected NewListenerWithSocketOpts error: %v", err)
}
if ln2 != nil {
ln2.Close()
}
})
}
}

func testNewListenerTLSInfoAccept(t *testing.T, tlsInfo TLSInfo) {
ln, err := NewListener("127.0.0.1:0", "https", &tlsInfo)
if err != nil {
Expand Down Expand Up @@ -401,3 +453,21 @@ func TestIsClosedConnError(t *testing.T) {
t.Fatalf("expect true, got false (%v)", err)
}
}

func TestSocktOptsEmpty(t *testing.T) {
tests := []struct {
sopts SocketOpts
want bool
}{
{SocketOpts{}, true},
{SocketOpts{ReuseAddress: true, ReusePort: false}, false},
{SocketOpts{ReusePort: true}, false},
}

for i, tt := range tests {
got := tt.sopts.Empty()
if tt.want != got {
t.Errorf("#%d: result of Empty() incorrect: want=%t got=%t", i, tt.want, got)
}
}
}
45 changes: 45 additions & 0 deletions pkg/transport/sockopt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package transport

import (
"syscall"
)

type Controls []func(network, addr string, conn syscall.RawConn) error

func (ctls Controls) Control(network, addr string, conn syscall.RawConn) error {
for _, s := range ctls {
if err := s(network, addr, conn); err != nil {
return err
}
}
return nil
}

type SocketOpts struct {
// ReusePort enables socket option SO_REUSEPORT [1] which allows rebind of
// a port already in use. User should keep in mind that flock can fail
// in which case lock on data file could result in unexpected
// condition. User should take caution to protect against lock race.
// [1] https://man7.org/linux/man-pages/man7/socket.7.html
ReusePort bool
// ReuseAddress enables a socket option SO_REUSEADDR which allows
// binding to an address in `TIME_WAIT` state. Useful to improve MTTR
// in cases where etcd slow to restart due to excessive `TIME_WAIT`.
// [1] https://man7.org/linux/man-pages/man7/socket.7.html
ReuseAddress bool
}

func getControls(sopts *SocketOpts) Controls {
ctls := Controls{}
if sopts.ReuseAddress {
ctls = append(ctls, setReuseAddress)
}
if sopts.ReusePort {
ctls = append(ctls, setReusePort)
}
return ctls
}

func (sopts *SocketOpts) Empty() bool {
return sopts.ReuseAddress == false && sopts.ReusePort == false
}
20 changes: 20 additions & 0 deletions pkg/transport/sockopt_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// +build !windows

package transport

import (
"golang.org/x/sys/unix"
"syscall"
)

func setReusePort(network, address string, conn syscall.RawConn) error {
return conn.Control(func(fd uintptr) {
syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1)
})
}

func setReuseAddress(network, address string, conn syscall.RawConn) error {
return conn.Control(func(fd uintptr) {
syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEADDR, 1)
})
}
18 changes: 18 additions & 0 deletions pkg/transport/sockopt_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// +build windows

package transport

import (
"fmt"
"syscall"
)

func setReusePort(network, address string, c syscall.RawConn) error {
return fmt.Errorf("port reuse is not supported on Windows")
}

// Windows supports SO_REUSEADDR, but it may cause undefined behavior, as
// there is no protection against port hijacking.
func setReuseAddress(network, addr string, conn syscall.RawConn) error {
return fmt.Errorf("address reuse is not supported on Windows")
}
25 changes: 19 additions & 6 deletions pkg/transport/timeout_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,32 @@ import (
// If read/write on the accepted connection blocks longer than its time limit,
// it will return timeout error.
func NewTimeoutListener(addr string, scheme string, tlsinfo *TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (net.Listener, error) {
ln, err := newListener(addr, scheme)
ln, err := newListener(addr, scheme, nil)
if err != nil {
return nil, err
}
ln = &rwTimeoutListener{
return newTimeoutListener(ln, scheme, rdtimeoutd, wtimeoutd, tlsinfo)
}

// NewTimeoutListerWithSocketOpts returns a listener that listens on the given address.
// If read/write on the accepted connection blocks longer than its time limit,
// it will return timeout error. Socket options can be passed and will be applied to the
// ListenConfig.
func NewTimeoutListerWithSocketOpts(addr string, scheme string, tlsinfo *TLSInfo, rdtimeoutd, wtimeoutd time.Duration, sopts *SocketOpts) (net.Listener, error) {
hexfusion marked this conversation as resolved.
Show resolved Hide resolved
ln, err := newListener(addr, scheme, sopts)
if err != nil {
return nil, err
}
return newTimeoutListener(ln, scheme, rdtimeoutd, wtimeoutd, tlsinfo)
}

func newTimeoutListener(ln net.Listener, scheme string, rdtimeoutd, wtimeoutd time.Duration, tlsinfo *TLSInfo) (net.Listener, error) {
timeoutListener := &rwTimeoutListener{
Listener: ln,
rdtimeoutd: rdtimeoutd,
wtimeoutd: wtimeoutd,
}
if ln, err = wrapTLS(scheme, tlsinfo, ln); err != nil {
return nil, err
}
return ln, nil
return wrapTLS(scheme, tlsinfo, timeoutListener)
}

type rwTimeoutListener struct {
Expand Down
5 changes: 5 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ type Config struct {
// before closing a non-responsive connection. 0 to disable.
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`

// SocketOpts are socket options passed to listener config.
SocketOpts transport.SocketOpts

// PreVote is true to enable Raft Pre-Vote.
// If enabled, Raft runs an additional election phase
// to check whether it would get enough votes to win
Expand Down Expand Up @@ -398,6 +401,8 @@ func NewConfig() *Config {
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
GRPCKeepAliveTimeout: DefaultGRPCKeepAliveTimeout,

SocketOpts: transport.SocketOpts{},

TickMs: 100,
ElectionMs: 1000,
InitialElectionTickAdvance: true,
Expand Down
14 changes: 11 additions & 3 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
e = nil
}()

if !cfg.SocketOpts.Empty() {
cfg.logger.Info(
"configuring socket options",
zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
)
}
e.cfg.logger.Info(
"configuring peer listeners",
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
Expand Down Expand Up @@ -181,6 +188,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
Expand Down Expand Up @@ -458,7 +466,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
}
}
peers[i] = &peerListener{close: func(context.Context) error { return nil }}
peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
peers[i].Listener, err = rafthttp.NewListenerWithSocketOpts(u, &cfg.PeerTLSInfo, &cfg.SocketOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -565,7 +573,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
continue
}

if sctx.l, err = net.Listen(network, addr); err != nil {
if sctx.l, err = transport.NewListenerWithSocketOpts(addr, u.Scheme, nil, &cfg.SocketOpts); err != nil {
return nil, err
}
// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
Expand Down Expand Up @@ -678,7 +686,7 @@ func (e *Etcd) serveMetrics() (err error) {
if murl.Scheme == "http" {
tlsInfo = nil
}
ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsInfo)
ml, err := transport.NewListenerWithSocketOpts(murl.Host, murl.Scheme, tlsInfo, &e.cfg.SocketOpts)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.ec.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
fs.DurationVar(&cfg.ec.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.ec.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
fs.DurationVar(&cfg.ec.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.ec.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")
fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.")
fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.")

// clustering
fs.Var(
Expand Down
4 changes: 4 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ Member:
Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).
--grpc-keepalive-timeout '20s'
Additional duration of wait before closing a non-responsive connection (0 to disable).
--socket-reuse-port 'false'
Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.
--socket-reuse-address 'false'
Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in TIME_WAIT state.
hexfusion marked this conversation as resolved.
Show resolved Hide resolved

Clustering:
--initial-advertise-peer-urls 'http://localhost:2380'
Expand Down
4 changes: 4 additions & 0 deletions server/etcdserver/api/rafthttp/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error) {
return transport.NewTimeoutListener(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout)
}

func NewListenerWithSocketOpts(u url.URL, tlsinfo *transport.TLSInfo, sopts *transport.SocketOpts) (net.Listener, error) {
return transport.NewTimeoutListerWithSocketOpts(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout, sopts)
}

// NewRoundTripper returns a roundTripper used to send requests
// to rafthttp listener of remote peers.
func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
Expand Down
3 changes: 3 additions & 0 deletions server/etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ type ServerConfig struct {
// PreVote is true to enable Raft Pre-Vote.
PreVote bool

// SocketOpts are socket options passed to listener config.
SocketOpts transport.SocketOpts

// Logger logs server-side operations.
// If not nil, it disables "capnslog" and uses the given logger.
Logger *zap.Logger
Expand Down