Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport prepared queries rework #686

Merged
merged 11 commits into from
Jun 19, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class DB2PreparedStatement implements PreparedStatement {
final DB2ParamDesc paramDesc;
final DB2RowDesc rowDesc;
final Section section;
final boolean cacheable;

private final Map<String, QueryInstance> activeQueries = new HashMap<>(4);

Expand All @@ -52,12 +51,11 @@ public static class QueryInstance {
}
}

DB2PreparedStatement(String sql, DB2ParamDesc paramDesc, DB2RowDesc rowDesc, Section section, boolean cacheable) {
DB2PreparedStatement(String sql, DB2ParamDesc paramDesc, DB2RowDesc rowDesc, Section section) {
this.paramDesc = paramDesc;
this.rowDesc = rowDesc;
this.sql = sql;
this.section = section;
this.cacheable = cacheable;
}

@Override
Expand All @@ -80,11 +78,6 @@ public String prepare(TupleInternal values) {
return paramDesc.prepare(values);
}

@Override
public boolean cacheable() {
return cacheable;
}

QueryInstance getQueryInstance(String cursorId) {
cursorId = cursorId == null ? UUID.randomUUID().toString() : cursorId;
return activeQueries.computeIfAbsent(cursorId, c -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class PrepareStatementCodec extends CommandCodec<PreparedStatement, PrepareState
private static final Logger LOG = LoggerFactory.getLogger(PrepareStatementCodec.class);

private static enum CommandHandlerState {
INIT, HANDLING_PARAM_COLUMN_DEFINITION,
PARAM_DEFINITIONS_DECODING_COMPLETED,
INIT, HANDLING_PARAM_COLUMN_DEFINITION,
PARAM_DEFINITIONS_DECODING_COMPLETED,
HANDLING_COLUMN_COLUMN_DEFINITION,
COLUMN_DEFINITIONS_DECODING_COMPLETED
}
Expand Down Expand Up @@ -89,7 +89,7 @@ void decodePayload(ByteBuf payload, int payloadLength) {

private void handleReadyForQuery() {
completionHandler.handle(CommandResponse.success(new DB2PreparedStatement(cmd.sql(), new DB2ParamDesc(paramDesc),
new DB2RowDesc(rowDesc), section, cmd.cacheable())));
new DB2RowDesc(rowDesc), section)));
}

private void resetIntermediaryResult() {
Expand Down
6 changes: 2 additions & 4 deletions vertx-mysql-client/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,10 @@ Sometimes you might meet the notorious error `Can't create more than max_prepare
You can adjust the server system variable `max_prepared_stmt_count` but it has an upper bound value so you can't get rid of the error in this way.

The best way to alleviate this is enabling prepared statement caching, so the prepared statements with the same SQL string could be reused and the client does not have to create a brand new prepared statement for every request.
The prepared statement will be automatically closed when it's evicted from the cache.
The prepared statement will be automatically closed after the statement is executed.
In this way the chances of reaching the limit could be greatly reduced though it could not be totally eliminated.

Note using `SqlClient#preparedQuery` without prepared statement caching enabled will not close the prepared statement after executing!

You can also manage the lifecycle of prepared statements manually by creating a `PreparedStatement` object via `SqlConnection#prepare` interface, or even use the https://dev.mysql.com/doc/refman/8.0/en/sql-prepared-statements.html[SQL syntax prepared statement].
You can also manage the lifecycle of prepared statements manually by creating a `PreparedStatement` object via `SqlConnection#prepare` interface so that you can choose when to deallocate the statement handle, or even use the https://dev.mysql.com/doc/refman/8.0/en/sql-prepared-statements.html[SQL syntax prepared statement].

=== demystifying prepared batch

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.netty.buffer.ByteBuf;
import io.vertx.mysqlclient.impl.datatype.DataFormat;
import io.vertx.mysqlclient.impl.protocol.CommandType;
import io.vertx.sqlclient.impl.command.ExtendedQueryCommand;

import static io.vertx.mysqlclient.impl.protocol.Packets.*;
Expand Down Expand Up @@ -29,4 +30,26 @@ protected void handleInitPacket(ByteBuf payload) {
}
}

@Override
protected void handleAllResultsetDecodingCompleted() {
// Close prepare statement
MySQLPreparedStatement ps = (MySQLPreparedStatement) this.cmd.ps;
if (ps.closeAfterUsage) {
sendCloseStatementCommand(ps);
}
super.handleAllResultsetDecodingCompleted();
}

private void sendCloseStatementCommand(MySQLPreparedStatement statement) {
ByteBuf packet = allocateBuffer(9);
// encode packet header
packet.writeMediumLE(5);
packet.writeByte(0); // sequenceId set to zero

// encode packet payload
packet.writeByte(CommandType.COM_STMT_CLOSE);
packet.writeIntLE((int) statement.statementId);

sendNonSplitPacket(packet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ class MySQLPreparedStatement implements PreparedStatement {
final String sql;
final MySQLParamDesc paramDesc;
final MySQLRowDesc rowDesc;
final boolean cacheable;
final boolean closeAfterUsage;

private boolean sendTypesToServer;
private final DataType[] bindingTypes;

boolean isCursorOpen;

MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, MySQLRowDesc rowDesc, boolean cacheable) {
MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, MySQLRowDesc rowDesc, boolean closeAfterUsage) {
this.statementId = statementId;
this.paramDesc = paramDesc;
this.rowDesc = rowDesc;
this.sql = sql;
this.cacheable = cacheable;
this.closeAfterUsage = closeAfterUsage;

this.bindingTypes = new DataType[paramDesc.paramDefinitions().length];
// init param bindings
Expand Down Expand Up @@ -104,9 +104,4 @@ private String bindParameters(MySQLParamDesc paramDesc, TupleInternal params) {
sendTypesToServer = reboundParameters; // parameter must be re-bound
return null;
}

@Override
public boolean cacheable() {
return cacheable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void handleReadyForQuery() {
this.statementId,
new MySQLParamDesc(paramDescs),
new MySQLRowDesc(columnDescs, DataFormat.BINARY),
cmd.cacheable())));
!cmd.isCached())));
}

private void resetIntermediaryResult() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private void handleSingleResultsetEndPacket(int serverStatusFlags, long affected
cmd.resultHandler().addProperty(MySQLClient.LAST_INSERTED_ID, lastInsertId);
}

private void handleAllResultsetDecodingCompleted() {
protected void handleAllResultsetDecodingCompleted() {
CommandResponse<Boolean> response;
if (this.failure != null) {
response = CommandResponse.failure(this.failure);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package io.vertx.mysqlclient;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.Tuple;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.concurrent.atomic.AtomicInteger;

@RunWith(VertxUnitRunner.class)
public class MySQLPreparedStatementTest extends MySQLTestBase {
Vertx vertx;
Expand Down Expand Up @@ -54,4 +59,40 @@ public void testContinuousPreparedQueriesWithDifferentTypeParameters(TestContext
}));
}));
}

@Test
public void testMaxPreparedStatementEviction(TestContext ctx) {
testPreparedStatements(ctx, new MySQLConnectOptions(options).setCachePreparedStatements(true).setPreparedStatementCacheMaxSize(16), 128, 16);
}

@Test
public void testOneShotPreparedStatements(TestContext ctx) {
testPreparedStatements(ctx, new MySQLConnectOptions(options).setCachePreparedStatements(false), 128, 0);
}

private void testPreparedStatements(TestContext ctx, MySQLConnectOptions options, int num, int expected) {
Assume.assumeFalse(MySQLTestBase.rule.isUsingMySQL5_6() || MySQLTestBase.rule.isUsingMariaDB());
Async async = ctx.async();
MySQLConnection.connect(vertx, options.setUser("root").setPassword("password"), ctx.asyncAssertSuccess(conn -> {
conn.query("SELECT * FROM performance_schema.prepared_statements_instances").execute(ctx.asyncAssertSuccess(res1 -> {
ctx.assertEquals(0, res1.size());
AtomicInteger count = new AtomicInteger(num);
for (int i = 0;i < num;i++) {
int val = i;
conn.preparedQuery("SELECT " + i).execute(Tuple.tuple(), ctx.asyncAssertSuccess(res2 -> {
ctx.assertEquals(1, res2.size());
ctx.assertEquals(val, res2.iterator().next().getInteger(0));
if (count.decrementAndGet() == 0) {
ctx.assertEquals(num - 1, val);
conn.query("SELECT * FROM performance_schema.prepared_statements_instances").execute(ctx.asyncAssertSuccess(res3 -> {
ctx.assertEquals(expected, res3.size());
conn.close();
async.complete();
}));
}
}));
}
}));
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowIterator;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import org.junit.After;
import org.junit.Assume;
Expand All @@ -15,17 +16,11 @@
import org.junit.runner.RunWith;

import java.time.LocalDate;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

@RunWith(VertxUnitRunner.class)
public class MySQLQueryTest extends MySQLTestBase {
Expand Down Expand Up @@ -127,6 +122,74 @@ public void testCachePreparedStatementWithSameSql(TestContext ctx) {
}));
}

@Test
public void testCachePreparedStatementBatchWithSameSql(TestContext ctx) {
MySQLConnection.connect(vertx, options.setCachePreparedStatements(true), ctx.asyncAssertSuccess(conn -> {
conn.query("SHOW VARIABLES LIKE 'max_prepared_stmt_count'").execute(ctx.asyncAssertSuccess(res1 -> {
Row row = res1.iterator().next();
int maxPreparedStatementCount = Integer.parseInt(row.getString(1));
ctx.assertEquals("max_prepared_stmt_count", row.getString(0));
ctx.assertEquals(16382, maxPreparedStatementCount);

for (int i = 0; i < 20000; i++) {
int val = i * 1000;
List<Tuple> tuples = new ArrayList<>();
tuples.add(Tuple.of(val));
tuples.add(Tuple.of(val + 1));
conn.preparedQuery("Select cast(? AS CHAR)").executeBatch(tuples, ctx.asyncAssertSuccess(res2 -> {
String v1 = res2.iterator().next().getString(0);
String v2 = res2.next().iterator().next().getString(0);
ctx.assertEquals("" + val, v1);
ctx.assertEquals("" + (val + 1), v2);
}));
}
}));
}));
}

@Test
public void testAutoClosingNonCacheOneShotPreparedQueryStatement(TestContext ctx) {
MySQLConnection.connect(vertx, options.setCachePreparedStatements(false), ctx.asyncAssertSuccess(conn -> {
conn.query("SHOW VARIABLES LIKE 'max_prepared_stmt_count'").execute(ctx.asyncAssertSuccess(res1 -> {
Row row = res1.iterator().next();
int maxPreparedStatementCount = Integer.parseInt(row.getString(1));
ctx.assertEquals("max_prepared_stmt_count", row.getString(0));
ctx.assertEquals(16382, maxPreparedStatementCount);

for (int i = 0; i < 20000; i++) {
// if we don't close the statement automatically in the codec, the statement handles would leak and raise an statement limit error
conn.preparedQuery("SELECT 'test'").execute(ctx.asyncAssertSuccess(res2 -> {
ctx.assertEquals("test", res2.iterator().next().getString(0));
}));
}
}));
}));
}

@Test
public void testAutoClosingNonCacheOneShotPreparedBatchStatement(TestContext ctx) {
MySQLConnection.connect(vertx, options.setCachePreparedStatements(false), ctx.asyncAssertSuccess(conn -> {
conn.query("SHOW VARIABLES LIKE 'max_prepared_stmt_count'").execute(ctx.asyncAssertSuccess(res0 -> {
Row row = res0.iterator().next();
int maxPreparedStatementCount = Integer.parseInt(row.getString(1));
ctx.assertEquals("max_prepared_stmt_count", row.getString(0));
ctx.assertEquals(16382, maxPreparedStatementCount);

for (int i = 0; i < 20000; i++) {
// if we don't close the statement automatically in the codec, the statement handles would leak and raise an statement limit error
List<Tuple> params = Arrays.asList(Tuple.of(1), Tuple.of(2), Tuple.of(3));
conn.preparedQuery("SELECT CAST(? AS CHAR)").executeBatch(params, ctx.asyncAssertSuccess(res1 -> {
ctx.assertEquals("1", res1.iterator().next().getString(0));
RowSet<Row> res2 = res1.next();
ctx.assertEquals("2", res2.iterator().next().getString(0));
RowSet<Row> res3 = res2.next();
ctx.assertEquals("3", res3.iterator().next().getString(0));
}));
}
}));
}));
}

@Test
public void testDecodePacketSizeMoreThan16MB(TestContext ctx) {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,9 @@ class CloseStatementCommandCodec extends PgCommandCodec<Void, CloseStatementComm

@Override
public void encode(PgEncoder out) {
PgPreparedStatement statement = (PgPreparedStatement) cmd.statement();
if (statement.cacheable()) {
// we don't need to close unnamed prepared statements
CommandResponse<Void> resp = CommandResponse.success(null);
completionHandler.handle(resp);
} else {
// close the named prepared statement
out.writeClosePreparedStatement(((PgPreparedStatement) cmd.statement()).bind.statement);
out.writeSync();
}
// close the named prepared statement
out.writeClosePreparedStatement(((PgPreparedStatement) cmd.statement()).bind.statement);
out.writeSync();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ void encode(PgEncoder encoder) {
encoder.writeSync();
} else {
PgPreparedStatement ps = (PgPreparedStatement) cmd.preparedStatement();
if (ps.bind.statement == 0) {
encoder.writeParse(new Parse(ps.sql()));
}
if (cmd.isBatch()) {
if (cmd.paramsList().isEmpty()) {
// We set suspended to false as we won't get a command complete command back from Postgres
Expand All @@ -61,11 +58,6 @@ void encode(PgEncoder encoder) {
}
}

@Override
void handleRowDescription(PgRowDesc rowDescription) {
decoder = new RowResultDecoder<>(cmd.collector(), rowDescription);
}

@Override
void handleParseComplete() {
// Response to Parse
Expand All @@ -89,9 +81,13 @@ void handleBindComplete() {

@Override
public void handleErrorResponse(ErrorResponse errorResponse) {
if (cmd.preparedStatement().cacheable() && errorResponse.getMessage().matches(TABLE_SCHEMA_CHANGE_ERROR_MESSAGE_PATTERN)) {
if (((PgPreparedStatement)cmd.preparedStatement()).isCached() && isTableSchemaErrorMessage(errorResponse)) {
encoder.channelHandlerContext().fireChannelRead(new InvalidCachedStatementEvent(cmd.preparedStatement().sql()));
}
super.handleErrorResponse(errorResponse);
}

private boolean isTableSchemaErrorMessage(ErrorResponse errorResponse) {
return errorResponse.getMessage().matches(TABLE_SCHEMA_CHANGE_ERROR_MESSAGE_PATTERN) || errorResponse.getMessage().equals("cached plan must not change result type");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

class PgColumnDesc {

final String name;
public static final PgColumnDesc[] EMPTY_COLUMNS = new PgColumnDesc[0];
final String name;
final int relationId;
final DataType dataType;
final DataFormat dataFormat; // are we sure that ????
Expand Down
Loading