Skip to content

Commit

Permalink
Keep buffer of Jdbc writer after connection loss
Browse files Browse the repository at this point in the history
Fixes #91
  • Loading branch information
pmwmedia committed May 8, 2022
1 parent 5622635 commit e234a16
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 17 deletions.
57 changes: 43 additions & 14 deletions tinylog-impl/src/main/java/org/tinylog/writers/JdbcWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ public final class JdbcWriter extends AbstractWriter {
private final Object mutex;
private final String sql;
private final List<Token> tokens;
private final List<LogEntry> entries;

private Connection connection;
private PreparedStatement statement;
private long batchCount;
private long lostCount;
private long reconnectTimestamp;

Expand Down Expand Up @@ -92,6 +92,7 @@ public JdbcWriter(final Map<String, String> properties) throws NamingException,

mutex = getBooleanValue("writingthread") ? null : new Object();

entries = new ArrayList<LogEntry>();
connection = connect(url, user, password);
sql = renderSql(properties, connection.getMetaData().getIdentifierQuoteString());
statement = connection.prepareStatement(sql);
Expand Down Expand Up @@ -154,13 +155,11 @@ public void close() throws SQLException {
private void doWrite(final LogEntry logEntry) throws SQLException {
if (checkConnection()) {
if (batch) {
batchCount += 1;
entries.add(logEntry);
}

try {
for (int i = 0; i < tokens.size(); ++i) {
tokens.get(i).apply(logEntry, statement, i + 1);
}
applyLogEntry(logEntry);
} catch (SQLException ex) {
resetConnection();
throw ex;
Expand All @@ -169,9 +168,9 @@ private void doWrite(final LogEntry logEntry) throws SQLException {
try {
if (batch) {
statement.addBatch();
if (batchCount >= MAX_BATCH_SIZE) {
if (entries.size() >= MAX_BATCH_SIZE) {
statement.executeBatch();
batchCount = 0;
entries.clear();
}
} else {
statement.executeUpdate();
Expand All @@ -180,6 +179,8 @@ private void doWrite(final LogEntry logEntry) throws SQLException {
resetConnection();
throw ex;
}
} else if (batch && entries.size() < MAX_BATCH_SIZE) {
entries.add(logEntry);
} else {
lostCount += 1;
}
Expand All @@ -192,10 +193,10 @@ private void doWrite(final LogEntry logEntry) throws SQLException {
* Database access failed
*/
private void doFlush() throws SQLException {
if (batchCount > 0) {
if (entries.size() > 0) {
try {
statement.executeBatch();
batchCount = 0;
entries.clear();
} catch (SQLException ex) {
resetConnection();
throw ex;
Expand All @@ -215,6 +216,10 @@ private void doClose() throws SQLException {
doFlush();
}
} finally {
if (!entries.isEmpty()) {
lostCount += entries.size();
}

if (lostCount > 0) {
InternalLogger.log(Level.ERROR, "Lost log entries due to broken database connection: " + lostCount);
}
Expand All @@ -237,8 +242,21 @@ private boolean checkConnection() {
try {
connection = connect(url, user, password);
statement = connection.prepareStatement(sql);
InternalLogger.log(Level.ERROR, "Lost log entries due to broken database connection: " + lostCount);
lostCount = 0;

if (!entries.isEmpty()) {
for (LogEntry entry : entries) {
applyLogEntry(entry);
statement.addBatch();
}
statement.executeBatch();
entries.clear();
}

if (lostCount > 0) {
InternalLogger.log(Level.ERROR, "Lost log entries due to broken database connection: " + lostCount);
lostCount = 0;
}

return true;
} catch (NamingException ex) {
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -266,8 +284,7 @@ private void resetConnection() {
if (reconnect) {
closeConnectionSilently();
statement = null;
lostCount = batch ? batchCount : 1;
batchCount = 0;
lostCount = batch ? 0 : 1;
reconnectTimestamp = 0;
}
}
Expand All @@ -289,13 +306,25 @@ private void closeConnectionSilently() {
}
}

/**
* Applies a log entry to the current {@link PreparedStatement}.
*
* @param logEntry Log entry to apply
* @throws SQLException Failed to apply the passed log entry
*/
private void applyLogEntry(final LogEntry logEntry) throws SQLException {
for (int i = 0; i < tokens.size(); ++i) {
tokens.get(i).apply(logEntry, statement, i + 1);
}
}

/**
* Establishes the connection to the database.
*
* @param url
* JDBC or data source URL
* @param user
* User name for login (can be {@code null} if no login is required)
* Username for login (can be {@code null} if no login is required)
* @param password
* Password for login (can be {@code null} if no login is required)
* @return Connection to the database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,10 @@ public void repairBrokenBatchedConnection() throws NamingException, SQLException
Thread.sleep(1000);

writer.write(LogEntryBuilder.empty().message("Three").create());
assertThat(systemStream.consumeErrorOutput()).containsOnlyOnce("ERROR").containsOnlyOnce("2");

writer.close();

assertThat(fetchTable(TABLE_NAME)).column("MESSAGE").containsValues("Three");
assertThat(fetchTable(TABLE_NAME)).column("MESSAGE").containsValues("One", "Two", "Three");
assertThat(systemStream.consumeErrorOutput()).isEmpty();
}

/**
Expand Down

0 comments on commit e234a16

Please sign in to comment.