Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[vtgate] Add flag to pool connection read buffers #11167

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions doc/releasenotes/15_0_0_summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ The following VTTablet flags were deprecated in 7.0. They have now been deleted
- --enable-query-plan-field-caching is now deprecated. It will be removed in v16.
- --enable_semi_sync is now deprecated. It will be removed in v16. Instead, set the correct durability policy using `SetKeyspaceDurabilityPolicy`

### New command line flags and behavior

#### vtgate --mysql-server-pool-conn-read-buffers

`--mysql-server-pool-conn-read-buffers` enables pooling of buffers used to read from incoming
connections, similar to the way pooling happens for write buffers. Defaults to off.

### VDiff2

We introduced the ability to resume a VDiff2 workflow:
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtexplain.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Usage of vtexplain:
--max_memory_rows int Maximum number of rows that will be held in memory for intermediate results as well as the final result. (default 300000)
--max_payload_size int The threshold for query payloads in bytes. A payload greater than this threshold will result in a failure to handle the query.
--message_stream_grace_period duration the amount of time to give for a vttablet to resume if it ends a message stream, usually because of a reparent. (default 30s)
--mysql-server-pool-conn-read-buffers If set, the server will pool incoming connection read buffers
--mysql_allow_clear_text_without_tls If set, the server will allow the use of a clear text password over non-SSL connections.
--mysql_auth_server_impl string Which auth server implementation to use. Options: none, ldap, clientcert, static, vault. (default "static")
--mysql_default_workload string Default session workload (OLTP, OLAP, DBA) (default "OLTP")
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Usage of vtgate:
--max_payload_size int The threshold for query payloads in bytes. A payload greater than this threshold will result in a failure to handle the query.
--message_stream_grace_period duration the amount of time to give for a vttablet to resume if it ends a message stream, usually because of a reparent. (default 30s)
--min_number_serving_vttablets int The minimum number of vttablets for each replicating tablet_type (e.g. replica, rdonly) that will be continue to be used even with replication lag above discovery_low_replication_lag, but still below discovery_high_replication_lag_minimum_serving. (default 2)
--mysql-server-pool-conn-read-buffers If set, the server will pool incoming connection read buffers
--mysql_allow_clear_text_without_tls If set, the server will allow the use of a clear text password over non-SSL connections.
--mysql_auth_server_impl string Which auth server implementation to use. Options: none, ldap, clientcert, static, vault. (default "static")
--mysql_auth_server_static_file string JSON File to read the users/passwords from.
Expand Down
4 changes: 2 additions & 2 deletions go/mysql/auth_server_clientcert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestValidCert(t *testing.T) {
authServer := newAuthServerClientCert()

// Create the listener, so we can get its host.
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false)
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false)
if err != nil {
t.Fatalf("NewListener failed: %v", err)
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestNoCert(t *testing.T) {
authServer := newAuthServerClientCert()

// Create the listener, so we can get its host.
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false)
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false)
if err != nil {
t.Fatalf("NewListener failed: %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions go/mysql/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestTLSClientDisabled(t *testing.T) {
// Below, we are enabling --ssl-verify-server-cert, which adds
// a check that the common name of the certificate matches the
// server host name we connect to.
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false)
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false)
require.NoError(t, err)
defer l.Close()

Expand Down Expand Up @@ -221,7 +221,7 @@ func TestTLSClientPreferredDefault(t *testing.T) {
// Below, we are enabling --ssl-verify-server-cert, which adds
// a check that the common name of the certificate matches the
// server host name we connect to.
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false)
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false)
require.NoError(t, err)
defer l.Close()

Expand Down Expand Up @@ -294,7 +294,7 @@ func TestTLSClientRequired(t *testing.T) {
// Below, we are enabling --ssl-verify-server-cert, which adds
// a check that the common name of the certificate matches the
// server host name we connect to.
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false)
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false)
require.NoError(t, err)
defer l.Close()

Expand Down Expand Up @@ -341,7 +341,7 @@ func TestTLSClientVerifyCA(t *testing.T) {
// Below, we are enabling --ssl-verify-server-cert, which adds
// a check that the common name of the certificate matches the
// server host name we connect to.
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false)
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false)
require.NoError(t, err)
defer l.Close()

Expand Down Expand Up @@ -424,7 +424,7 @@ func TestTLSClientVerifyIdentity(t *testing.T) {
// Below, we are enabling --ssl-verify-server-cert, which adds
// a check that the common name of the certificate matches the
// server host name we connect to.
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false)
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false)
require.NoError(t, err)
defer l.Close()

Expand Down
22 changes: 21 additions & 1 deletion go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ var bufPool = bucketpool.New(connBufferSize, MaxPacketSize)
// writersPool is used for pooling bufio.Writer objects.
var writersPool = sync.Pool{New: func() any { return bufio.NewWriterSize(nil, connBufferSize) }}

var readersPool = sync.Pool{New: func() any { return bufio.NewReaderSize(nil, connBufferSize) }}

// newConn is an internal method to create a Conn. Used by client and server
// side for common creation code.
func newConn(conn net.Conn) *Conn {
Expand All @@ -254,9 +256,19 @@ func newServerConn(conn net.Conn, listener *Listener) *Conn {
closed: sync2.NewAtomicBool(false),
PrepareData: make(map[uint32]*PrepareData),
}

if listener.connReadBufferSize > 0 {
c.bufferedReader = bufio.NewReaderSize(conn, listener.connReadBufferSize)
var buf *bufio.Reader
if listener.connBufferPooling {
buf = readersPool.Get().(*bufio.Reader)
buf.Reset(conn)
} else {
buf = bufio.NewReaderSize(conn, listener.connReadBufferSize)
}

c.bufferedReader = buf
}

return c
}

Expand Down Expand Up @@ -289,6 +301,14 @@ func (c *Conn) endWriterBuffering() error {
return c.bufferedWriter.Flush()
}

func (c *Conn) returnReader() {
if c.bufferedReader == nil {
return
}
c.bufferedReader.Reset(nil)
readersPool.Put(c.bufferedReader)
}

// getWriter returns the current writer. It may be either
// the original connection or a wrapper. The returned unget
// function must be invoked after the writing is finished.
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func New(t testing.TB) *DB {
authServer := mysql.NewAuthServerNone()

// Start listening.
db.listener, err = mysql.NewListener("unix", socketFile, authServer, db, 0, 0, false)
db.listener, err = mysql.NewListener("unix", socketFile, authServer, db, 0, 0, false, false)
if err != nil {
t.Fatalf("NewListener failed: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions go/mysql/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestClearTextClientAuth(t *testing.T) {
defer authServer.close()

// Create the listener.
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false)
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false)
if err != nil {
t.Fatalf("NewListener failed: %v", err)
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestSSLConnection(t *testing.T) {
defer authServer.close()

// Create the listener, so we can get its host.
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false)
l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false)
if err != nil {
t.Fatalf("NewListener failed: %v", err)
}
Expand Down
53 changes: 44 additions & 9 deletions go/mysql/query_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,31 @@ func init() {

const benchmarkQueryPrefix = "benchmark "

func benchmarkQuery(b *testing.B, threads int, query string) {
th := &testHandler{}

authServer := NewAuthServerNone()
type mkListenerCfg func(AuthServer, Handler) ListenerConfig

lCfg := ListenerConfig{
func mkDefaultListenerCfg(authServer AuthServer, handler Handler) ListenerConfig {
return ListenerConfig{
Protocol: "tcp",
Address: "127.0.0.1:",
AuthServer: authServer,
Handler: th,
Handler: handler,
ConnReadBufferSize: testReadConnBufferSize,
}
}

func mkReadBufferPoolingCfg(authServer AuthServer, handler Handler) ListenerConfig {
cfg := mkDefaultListenerCfg(authServer, handler)
cfg.ConnBufferPooling = true
return cfg
}

func benchmarkQuery(b *testing.B, threads int, query string, mkCfg mkListenerCfg) {
th := &testHandler{}

authServer := NewAuthServerNone()

lCfg := mkCfg(authServer, th)

l, err := NewListenerWithConfig(lCfg)
if err != nil {
b.Fatalf("NewListener failed: %v", err)
Expand Down Expand Up @@ -107,13 +120,35 @@ func benchmarkQuery(b *testing.B, threads int, query string) {
// executes M queries on them, then closes them.
// It is meant as a somewhat real load test.
func BenchmarkParallelShortQueries(b *testing.B) {
benchmarkQuery(b, 10, benchmarkQueryPrefix+"select rows")
benchmarkQuery(b, 10, benchmarkQueryPrefix+"select rows", mkDefaultListenerCfg)
}

func BenchmarkParallelMediumQueries(b *testing.B) {
benchmarkQuery(b, 10, benchmarkQueryPrefix+"select"+strings.Repeat("x", connBufferSize))
benchmarkQuery(
b,
10,
benchmarkQueryPrefix+"select"+strings.Repeat("x", connBufferSize),
mkDefaultListenerCfg,
)
}

func BenchmarkParallelRandomQueries(b *testing.B) {
benchmarkQuery(b, 10, "")
benchmarkQuery(b, 10, "", mkDefaultListenerCfg)
}

func BenchmarkParallelShortQueriesWithReadBufferPooling(b *testing.B) {
benchmarkQuery(b, 10, benchmarkQueryPrefix+"select rows", mkReadBufferPoolingCfg)
}

func BenchmarkParallelMediumQueriesWithReadBufferPooling(b *testing.B) {
benchmarkQuery(
b,
10,
benchmarkQueryPrefix+"select"+strings.Repeat("x", connBufferSize),
mkReadBufferPoolingCfg,
)
}

func BenchmarkParallelRandomQueriesWithReadBufferPooling(b *testing.B) {
benchmarkQuery(b, 10, "", mkReadBufferPoolingCfg)
}
33 changes: 29 additions & 4 deletions go/mysql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ type Listener struct {
// Reads are unbuffered if it's <=0.
connReadBufferSize int

// connBufferPooling configures if vtgate server pools connection buffers
connBufferPooling bool

// shutdown indicates that Shutdown method was called.
shutdown sync2.AtomicBool

Expand All @@ -203,30 +206,46 @@ type Listener struct {
}

// NewFromListener creates a new mysql listener from an existing net.Listener
func NewFromListener(l net.Listener, authServer AuthServer, handler Handler, connReadTimeout time.Duration, connWriteTimeout time.Duration) (*Listener, error) {
func NewFromListener(
l net.Listener,
authServer AuthServer,
handler Handler,
connReadTimeout time.Duration,
connWriteTimeout time.Duration,
connBufferPooling bool,
) (*Listener, error) {
cfg := ListenerConfig{
Listener: l,
AuthServer: authServer,
Handler: handler,
ConnReadTimeout: connReadTimeout,
ConnWriteTimeout: connWriteTimeout,
ConnReadBufferSize: connBufferSize,
ConnBufferPooling: connBufferPooling,
}
return NewListenerWithConfig(cfg)
}

// NewListener creates a new Listener.
func NewListener(protocol, address string, authServer AuthServer, handler Handler, connReadTimeout time.Duration, connWriteTimeout time.Duration, proxyProtocol bool) (*Listener, error) {
func NewListener(
protocol, address string,
authServer AuthServer,
handler Handler,
connReadTimeout time.Duration,
connWriteTimeout time.Duration,
proxyProtocol bool,
connBufferPooling bool,
) (*Listener, error) {
listener, err := net.Listen(protocol, address)
if err != nil {
return nil, err
}
if proxyProtocol {
proxyListener := &proxyproto.Listener{Listener: listener}
return NewFromListener(proxyListener, authServer, handler, connReadTimeout, connWriteTimeout)
return NewFromListener(proxyListener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling)
}

return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout)
return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling)
}

// ListenerConfig should be used with NewListenerWithConfig to specify listener parameters.
Expand All @@ -240,6 +259,7 @@ type ListenerConfig struct {
ConnReadTimeout time.Duration
ConnWriteTimeout time.Duration
ConnReadBufferSize int
ConnBufferPooling bool
}

// NewListenerWithConfig creates new listener using provided config. There are
Expand All @@ -265,6 +285,7 @@ func NewListenerWithConfig(cfg ListenerConfig) (*Listener, error) {
connReadTimeout: cfg.ConnReadTimeout,
connWriteTimeout: cfg.ConnWriteTimeout,
connReadBufferSize: cfg.ConnReadBufferSize,
connBufferPooling: cfg.ConnBufferPooling,
}, nil
}

Expand Down Expand Up @@ -325,6 +346,10 @@ func (l *Listener) handle(conn net.Conn, connectionID uint32, acceptTime time.Ti
// startWriterBuffering is called
c.endWriterBuffering()

if l.connBufferPooling {
c.returnReader()
}

conn.Close()
}()

Expand Down
Loading