diff --git a/doc/releasenotes/15_0_0_summary.md b/doc/releasenotes/15_0_0_summary.md index d52da3adb37..4a61c408d0e 100644 --- a/doc/releasenotes/15_0_0_summary.md +++ b/doc/releasenotes/15_0_0_summary.md @@ -66,6 +66,13 @@ The following VTTablet flags were deprecated in 7.0. They have now been deleted - --enable-query-plan-field-caching is now deprecated. It will be removed in v16. - --enable_semi_sync is now deprecated. It will be removed in v16. Instead, set the correct durability policy using `SetKeyspaceDurabilityPolicy` +### New command line flags and behavior + +#### vtgate --mysql-server-pool-conn-read-buffers + +`--mysql-server-pool-conn-read-buffers` enables pooling of buffers used to read from incoming +connections, similar to the way pooling happens for write buffers. Defaults to off. + ### VDiff2 We introduced the ability to resume a VDiff2 workflow: diff --git a/go/flags/endtoend/vtexplain.txt b/go/flags/endtoend/vtexplain.txt index 032a77ddf6b..0c1fcf20549 100644 --- a/go/flags/endtoend/vtexplain.txt +++ b/go/flags/endtoend/vtexplain.txt @@ -64,6 +64,7 @@ Usage of vtexplain: --max_memory_rows int Maximum number of rows that will be held in memory for intermediate results as well as the final result. (default 300000) --max_payload_size int The threshold for query payloads in bytes. A payload greater than this threshold will result in a failure to handle the query. --message_stream_grace_period duration the amount of time to give for a vttablet to resume if it ends a message stream, usually because of a reparent. (default 30s) + --mysql-server-pool-conn-read-buffers If set, the server will pool incoming connection read buffers --mysql_allow_clear_text_without_tls If set, the server will allow the use of a clear text password over non-SSL connections. --mysql_auth_server_impl string Which auth server implementation to use. Options: none, ldap, clientcert, static, vault. (default "static") --mysql_default_workload string Default session workload (OLTP, OLAP, DBA) (default "OLTP") diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index cfa3e3c7a1e..f6e82121219 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -78,6 +78,7 @@ Usage of vtgate: --max_payload_size int The threshold for query payloads in bytes. A payload greater than this threshold will result in a failure to handle the query. --message_stream_grace_period duration the amount of time to give for a vttablet to resume if it ends a message stream, usually because of a reparent. (default 30s) --min_number_serving_vttablets int The minimum number of vttablets for each replicating tablet_type (e.g. replica, rdonly) that will be continue to be used even with replication lag above discovery_low_replication_lag, but still below discovery_high_replication_lag_minimum_serving. (default 2) + --mysql-server-pool-conn-read-buffers If set, the server will pool incoming connection read buffers --mysql_allow_clear_text_without_tls If set, the server will allow the use of a clear text password over non-SSL connections. --mysql_auth_server_impl string Which auth server implementation to use. Options: none, ldap, clientcert, static, vault. (default "static") --mysql_auth_server_static_file string JSON File to read the users/passwords from. diff --git a/go/mysql/auth_server_clientcert_test.go b/go/mysql/auth_server_clientcert_test.go index 31e9585826c..7cceb1396b2 100644 --- a/go/mysql/auth_server_clientcert_test.go +++ b/go/mysql/auth_server_clientcert_test.go @@ -42,7 +42,7 @@ func TestValidCert(t *testing.T) { authServer := newAuthServerClientCert() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) if err != nil { t.Fatalf("NewListener failed: %v", err) } @@ -123,7 +123,7 @@ func TestNoCert(t *testing.T) { authServer := newAuthServerClientCert() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) if err != nil { t.Fatalf("NewListener failed: %v", err) } diff --git a/go/mysql/client_test.go b/go/mysql/client_test.go index f6e522b87bd..ddc1c6e379b 100644 --- a/go/mysql/client_test.go +++ b/go/mysql/client_test.go @@ -149,7 +149,7 @@ func TestTLSClientDisabled(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() @@ -221,7 +221,7 @@ func TestTLSClientPreferredDefault(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() @@ -294,7 +294,7 @@ func TestTLSClientRequired(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() @@ -341,7 +341,7 @@ func TestTLSClientVerifyCA(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() @@ -424,7 +424,7 @@ func TestTLSClientVerifyIdentity(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() diff --git a/go/mysql/conn.go b/go/mysql/conn.go index f6d87344546..fd6f4d29fef 100644 --- a/go/mysql/conn.go +++ b/go/mysql/conn.go @@ -232,6 +232,8 @@ var bufPool = bucketpool.New(connBufferSize, MaxPacketSize) // writersPool is used for pooling bufio.Writer objects. var writersPool = sync.Pool{New: func() any { return bufio.NewWriterSize(nil, connBufferSize) }} +var readersPool = sync.Pool{New: func() any { return bufio.NewReaderSize(nil, connBufferSize) }} + // newConn is an internal method to create a Conn. Used by client and server // side for common creation code. func newConn(conn net.Conn) *Conn { @@ -254,9 +256,19 @@ func newServerConn(conn net.Conn, listener *Listener) *Conn { closed: sync2.NewAtomicBool(false), PrepareData: make(map[uint32]*PrepareData), } + if listener.connReadBufferSize > 0 { - c.bufferedReader = bufio.NewReaderSize(conn, listener.connReadBufferSize) + var buf *bufio.Reader + if listener.connBufferPooling { + buf = readersPool.Get().(*bufio.Reader) + buf.Reset(conn) + } else { + buf = bufio.NewReaderSize(conn, listener.connReadBufferSize) + } + + c.bufferedReader = buf } + return c } @@ -289,6 +301,14 @@ func (c *Conn) endWriterBuffering() error { return c.bufferedWriter.Flush() } +func (c *Conn) returnReader() { + if c.bufferedReader == nil { + return + } + c.bufferedReader.Reset(nil) + readersPool.Put(c.bufferedReader) +} + // getWriter returns the current writer. It may be either // the original connection or a wrapper. The returned unget // function must be invoked after the writing is finished. diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index a4275d14207..92d97a0a1ba 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -182,7 +182,7 @@ func New(t testing.TB) *DB { authServer := mysql.NewAuthServerNone() // Start listening. - db.listener, err = mysql.NewListener("unix", socketFile, authServer, db, 0, 0, false) + db.listener, err = mysql.NewListener("unix", socketFile, authServer, db, 0, 0, false, false) if err != nil { t.Fatalf("NewListener failed: %v", err) } diff --git a/go/mysql/handshake_test.go b/go/mysql/handshake_test.go index 58921ddbc3d..3948fe851bf 100644 --- a/go/mysql/handshake_test.go +++ b/go/mysql/handshake_test.go @@ -42,7 +42,7 @@ func TestClearTextClientAuth(t *testing.T) { defer authServer.close() // Create the listener. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) if err != nil { t.Fatalf("NewListener failed: %v", err) } @@ -100,7 +100,7 @@ func TestSSLConnection(t *testing.T) { defer authServer.close() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) if err != nil { t.Fatalf("NewListener failed: %v", err) } diff --git a/go/mysql/query_benchmark_test.go b/go/mysql/query_benchmark_test.go index db056398500..e777d8c34f5 100644 --- a/go/mysql/query_benchmark_test.go +++ b/go/mysql/query_benchmark_test.go @@ -34,18 +34,31 @@ func init() { const benchmarkQueryPrefix = "benchmark " -func benchmarkQuery(b *testing.B, threads int, query string) { - th := &testHandler{} - - authServer := NewAuthServerNone() +type mkListenerCfg func(AuthServer, Handler) ListenerConfig - lCfg := ListenerConfig{ +func mkDefaultListenerCfg(authServer AuthServer, handler Handler) ListenerConfig { + return ListenerConfig{ Protocol: "tcp", Address: "127.0.0.1:", AuthServer: authServer, - Handler: th, + Handler: handler, ConnReadBufferSize: testReadConnBufferSize, } +} + +func mkReadBufferPoolingCfg(authServer AuthServer, handler Handler) ListenerConfig { + cfg := mkDefaultListenerCfg(authServer, handler) + cfg.ConnBufferPooling = true + return cfg +} + +func benchmarkQuery(b *testing.B, threads int, query string, mkCfg mkListenerCfg) { + th := &testHandler{} + + authServer := NewAuthServerNone() + + lCfg := mkCfg(authServer, th) + l, err := NewListenerWithConfig(lCfg) if err != nil { b.Fatalf("NewListener failed: %v", err) @@ -107,13 +120,35 @@ func benchmarkQuery(b *testing.B, threads int, query string) { // executes M queries on them, then closes them. // It is meant as a somewhat real load test. func BenchmarkParallelShortQueries(b *testing.B) { - benchmarkQuery(b, 10, benchmarkQueryPrefix+"select rows") + benchmarkQuery(b, 10, benchmarkQueryPrefix+"select rows", mkDefaultListenerCfg) } func BenchmarkParallelMediumQueries(b *testing.B) { - benchmarkQuery(b, 10, benchmarkQueryPrefix+"select"+strings.Repeat("x", connBufferSize)) + benchmarkQuery( + b, + 10, + benchmarkQueryPrefix+"select"+strings.Repeat("x", connBufferSize), + mkDefaultListenerCfg, + ) } func BenchmarkParallelRandomQueries(b *testing.B) { - benchmarkQuery(b, 10, "") + benchmarkQuery(b, 10, "", mkDefaultListenerCfg) +} + +func BenchmarkParallelShortQueriesWithReadBufferPooling(b *testing.B) { + benchmarkQuery(b, 10, benchmarkQueryPrefix+"select rows", mkReadBufferPoolingCfg) +} + +func BenchmarkParallelMediumQueriesWithReadBufferPooling(b *testing.B) { + benchmarkQuery( + b, + 10, + benchmarkQueryPrefix+"select"+strings.Repeat("x", connBufferSize), + mkReadBufferPoolingCfg, + ) +} + +func BenchmarkParallelRandomQueriesWithReadBufferPooling(b *testing.B) { + benchmarkQuery(b, 10, "", mkReadBufferPoolingCfg) } diff --git a/go/mysql/server.go b/go/mysql/server.go index f59598b90f4..c32f0d6d85f 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -188,6 +188,9 @@ type Listener struct { // Reads are unbuffered if it's <=0. connReadBufferSize int + // connBufferPooling configures if vtgate server pools connection buffers + connBufferPooling bool + // shutdown indicates that Shutdown method was called. shutdown sync2.AtomicBool @@ -203,7 +206,14 @@ type Listener struct { } // NewFromListener creates a new mysql listener from an existing net.Listener -func NewFromListener(l net.Listener, authServer AuthServer, handler Handler, connReadTimeout time.Duration, connWriteTimeout time.Duration) (*Listener, error) { +func NewFromListener( + l net.Listener, + authServer AuthServer, + handler Handler, + connReadTimeout time.Duration, + connWriteTimeout time.Duration, + connBufferPooling bool, +) (*Listener, error) { cfg := ListenerConfig{ Listener: l, AuthServer: authServer, @@ -211,22 +221,31 @@ func NewFromListener(l net.Listener, authServer AuthServer, handler Handler, con ConnReadTimeout: connReadTimeout, ConnWriteTimeout: connWriteTimeout, ConnReadBufferSize: connBufferSize, + ConnBufferPooling: connBufferPooling, } return NewListenerWithConfig(cfg) } // NewListener creates a new Listener. -func NewListener(protocol, address string, authServer AuthServer, handler Handler, connReadTimeout time.Duration, connWriteTimeout time.Duration, proxyProtocol bool) (*Listener, error) { +func NewListener( + protocol, address string, + authServer AuthServer, + handler Handler, + connReadTimeout time.Duration, + connWriteTimeout time.Duration, + proxyProtocol bool, + connBufferPooling bool, +) (*Listener, error) { listener, err := net.Listen(protocol, address) if err != nil { return nil, err } if proxyProtocol { proxyListener := &proxyproto.Listener{Listener: listener} - return NewFromListener(proxyListener, authServer, handler, connReadTimeout, connWriteTimeout) + return NewFromListener(proxyListener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling) } - return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout) + return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling) } // ListenerConfig should be used with NewListenerWithConfig to specify listener parameters. @@ -240,6 +259,7 @@ type ListenerConfig struct { ConnReadTimeout time.Duration ConnWriteTimeout time.Duration ConnReadBufferSize int + ConnBufferPooling bool } // NewListenerWithConfig creates new listener using provided config. There are @@ -265,6 +285,7 @@ func NewListenerWithConfig(cfg ListenerConfig) (*Listener, error) { connReadTimeout: cfg.ConnReadTimeout, connWriteTimeout: cfg.ConnWriteTimeout, connReadBufferSize: cfg.ConnReadBufferSize, + connBufferPooling: cfg.ConnBufferPooling, }, nil } @@ -325,6 +346,10 @@ func (l *Listener) handle(conn net.Conn, connectionID uint32, acceptTime time.Ti // startWriterBuffering is called c.endWriterBuffering() + if l.connBufferPooling { + c.returnReader() + } + conn.Close() }() diff --git a/go/mysql/server_flaky_test.go b/go/mysql/server_flaky_test.go index 6ec95aef594..c5a07ac2d51 100644 --- a/go/mysql/server_flaky_test.go +++ b/go/mysql/server_flaky_test.go @@ -256,7 +256,7 @@ func TestConnectionFromListener(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:") require.NoError(t, err, "net.Listener failed") - l, err := NewFromListener(listener, authServer, th, 0, 0) + l, err := NewFromListener(listener, authServer, th, 0, 0, false) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -285,7 +285,7 @@ func TestConnectionWithoutSourceHost(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -318,7 +318,7 @@ func TestConnectionWithSourceHost(t *testing.T) { } defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -351,7 +351,7 @@ func TestConnectionUseMysqlNativePasswordWithSourceHost(t *testing.T) { } defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -389,7 +389,7 @@ func TestConnectionUnixSocket(t *testing.T) { os.Remove(unixSocket.Name()) - l, err := NewListener("unix", unixSocket.Name(), authServer, th, 0, 0, false) + l, err := NewListener("unix", unixSocket.Name(), authServer, th, 0, 0, false, false) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -415,7 +415,7 @@ func TestClientFoundRows(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -464,7 +464,7 @@ func TestConnCounts(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -521,7 +521,7 @@ func TestServer(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) l.SlowConnectWarnThreshold.Set(time.Nanosecond * 1) defer l.Close() @@ -620,7 +620,7 @@ func TestServerStats(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) l.SlowConnectWarnThreshold.Set(time.Nanosecond * 1) defer l.Close() @@ -693,7 +693,7 @@ func TestClearTextServer(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() go l.Accept() @@ -766,7 +766,7 @@ func TestDialogServer(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) l.AllowClearTextWithoutTLS.Set(true) defer l.Close() @@ -809,7 +809,7 @@ func TestTLSServer(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() @@ -907,7 +907,7 @@ func TestTLSRequired(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() @@ -996,7 +996,7 @@ func TestCachingSha2PasswordAuthWithTLS(t *testing.T) { defer authServer.close() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) if err != nil { t.Fatalf("NewListener failed: %v", err) } @@ -1095,7 +1095,7 @@ func TestCachingSha2PasswordAuthWithMoreData(t *testing.T) { defer authServer.close() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) if err != nil { t.Fatalf("NewListener failed: %v", err) } @@ -1169,7 +1169,7 @@ func TestCachingSha2PasswordAuthWithoutTLS(t *testing.T) { defer authServer.close() // Create the listener. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) if err != nil { t.Fatalf("NewListener failed: %v", err) } @@ -1213,7 +1213,7 @@ func TestErrorCodes(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() go l.Accept() @@ -1391,7 +1391,7 @@ func TestListenerShutdown(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) require.NoError(t, err) defer l.Close() go l.Accept() @@ -1464,7 +1464,7 @@ func TestServerFlush(t *testing.T) { th := &testHandler{} - l, err := NewListener("tcp", "127.0.0.1:", NewAuthServerNone(), th, 0, 0, false) + l, err := NewListener("tcp", "127.0.0.1:", NewAuthServerNone(), th, 0, 0, false, false) require.NoError(t, err) defer l.Close() go l.Accept() diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index bbb15ac4dbc..d590caa7e04 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -75,6 +75,8 @@ var ( mysqlConnWriteTimeout = flag.Duration("mysql_server_write_timeout", 0, "connection write timeout") mysqlQueryTimeout = flag.Duration("mysql_server_query_timeout", 0, "mysql query timeout") + mysqlConnBufferPooling = flag.Bool("mysql-server-pool-conn-read-buffers", false, "If set, the server will pool incoming connection read buffers") + mysqlDefaultWorkloadName = flag.String("mysql_default_workload", "OLTP", "Default session workload (OLTP, OLAP, DBA)") mysqlDefaultWorkload int32 @@ -433,7 +435,16 @@ func initMySQLProtocol() { var err error vtgateHandle = newVtgateHandler(rpcVTGate) if *mysqlServerPort >= 0 { - mysqlListener, err = mysql.NewListener(*mysqlTCPVersion, net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, vtgateHandle, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, *mysqlProxyProtocol) + mysqlListener, err = mysql.NewListener( + *mysqlTCPVersion, + net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), + authServer, + vtgateHandle, + *mysqlConnReadTimeout, + *mysqlConnWriteTimeout, + *mysqlProxyProtocol, + *mysqlConnBufferPooling, + ) if err != nil { log.Exitf("mysql.NewListener failed: %v", err) } @@ -476,7 +487,17 @@ func initMySQLProtocol() { // newMysqlUnixSocket creates a new unix socket mysql listener. If a socket file already exists, attempts // to clean it up. func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mysql.Handler) (*mysql.Listener, error) { - listener, err := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false) + listener, err := mysql.NewListener( + "unix", + address, + authServer, + handler, + *mysqlConnReadTimeout, + *mysqlConnWriteTimeout, + false, + *mysqlConnBufferPooling, + ) + switch err := err.(type) { case nil: return listener, nil @@ -497,7 +518,16 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys log.Errorf("Couldn't remove existent socket file: %s", address) return nil, err } - listener, listenerErr := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false) + listener, listenerErr := mysql.NewListener( + "unix", + address, + authServer, + handler, + *mysqlConnReadTimeout, + *mysqlConnWriteTimeout, + false, + *mysqlConnBufferPooling, + ) return listener, listenerErr default: return nil, err