From 198dce1c24e824e61a3e9b73018801a6bc712cef Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 21 Dec 2022 18:27:07 -0800 Subject: [PATCH 1/4] txn: fail abort on expired transaction Kafka doesn't allow to about an already expired transaction, updating Redpanda to do the same --- src/v/cluster/tx_gateway_frontend.cc | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index f3bbdfb8db91..8c52af67c3b8 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -1887,17 +1887,11 @@ tx_gateway_frontend::do_abort_tm_tx( } if (!is_fetch_tx_supported()) { - if ( - tx.status != tm_transaction::tx_status::ongoing - && tx.status != tm_transaction::tx_status::killed) { + if (tx.status != tm_transaction::tx_status::ongoing) { co_return tx_errc::invalid_txn_state; } - } else if ( - tx.status == tm_transaction::tx_status::aborting - || tx.status == tm_transaction::tx_status::killed) { - if (tx.etag == expected_term) { - // a retried abort - } else { + } else if (tx.status == tm_transaction::tx_status::aborting) { + if (tx.etag != expected_term) { vlog( txlog.trace, "abort encountered old aborted tx:{} etag:{} pid:{} tx_seq:{} " @@ -1927,9 +1921,7 @@ tx_gateway_frontend::do_abort_tm_tx( tx.pid); co_return tx_errc::unknown_server_error; } - if ( - old_tx.status == tm_transaction::tx_status::aborting - || tx.status == tm_transaction::tx_status::killed) { + if (old_tx.status == tm_transaction::tx_status::aborting) { if (old_tx.tx_seq != tx.tx_seq) { vlog( txlog.warn, @@ -1960,7 +1952,7 @@ tx_gateway_frontend::do_abort_tm_tx( } else { vlog( txlog.warn, - "fetched status:{} isn't aborting, killed nor ongoing", + "fetched status:{} isn't aborting nor ongoing", old_tx.status); co_return tx_errc::unknown_server_error; } @@ -2104,8 +2096,7 @@ tx_gateway_frontend::do_abort_tm_tx( if ( tx.status != tm_transaction::tx_status::ongoing - && tx.status != tm_transaction::tx_status::aborting - && tx.status != tm_transaction::tx_status::killed) { + && tx.status != tm_transaction::tx_status::aborting) { vlog( txlog.warn, "abort encontered a tx with unexpected status:{} (tx:{} etag:{} " From 3d900af1f48cf6cd44fba2ac57fe59737e8b837e Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 21 Dec 2022 19:07:42 -0800 Subject: [PATCH 2/4] txn: make ops on an expired tx return invalid_producer_epoch Kafka returns invalid_producer_epoch when an operation is attempted on an expired transaction. Fixing Redpanda to do the same --- src/v/cluster/tx_gateway_frontend.cc | 36 +++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index 8c52af67c3b8..d8a2dd7a8d28 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -1706,6 +1706,19 @@ tx_gateway_frontend::do_end_txn( checked r(tx_errc::unknown_server_error); if (request.committed) { + if (tx.status == tm_transaction::tx_status::killed) { + vlog( + txlog.warn, + "can't commit an expired tx:{} pid:{} etag:{} tx_seq:{} in " + "term:{}", + tx.id, + tx.pid, + tx.etag, + tx.tx_seq, + term); + outcome->set_value(tx_errc::fenced); + co_return tx_errc::fenced; + } bool is_status_ok = false; if (is_fetch_tx_supported()) { is_status_ok = tx.status == tm_transaction::tx_status::ongoing @@ -1746,6 +1759,19 @@ tx_gateway_frontend::do_end_txn( co_return tx_errc::unknown_server_error; } } else { + if (tx.status == tm_transaction::tx_status::killed) { + vlog( + txlog.warn, + "can't abort an expired tx:{} pid:{} etag:{} tx_seq:{} in " + "term:{}", + tx.id, + tx.pid, + tx.etag, + tx.tx_seq, + term); + outcome->set_value(tx_errc::fenced); + co_return tx_errc::fenced; + } try { r = co_await do_abort_tm_tx(term, stm, tx, timeout); } catch (...) { @@ -2923,7 +2949,15 @@ tx_gateway_frontend::get_ongoing_tx( // from the client perspective it will look like a tx wasn't // failed at all but in fact the second part of the tx will // start a new transactions - co_return tx_errc::invalid_txn_state; + vlog( + txlog.warn, + "can't modify an expired tx:{} pid:{} etag:{} tx_seq:{} in term:{}", + tx.id, + tx.pid, + tx.etag, + tx.tx_seq, + expected_term); + co_return tx_errc::fenced; } else { // A previous transaction has failed after its status has been // decided, rolling it forward. From b048ee5b947ff47ecec06650982be84358917422 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 21 Dec 2022 21:28:33 -0800 Subject: [PATCH 3/4] ducky: add tx expiration test --- tests/rptest/tests/transactions_test.py | 31 +++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/tests/rptest/tests/transactions_test.py b/tests/rptest/tests/transactions_test.py index 85043646c9e8..41d2cfe30b20 100644 --- a/tests/rptest/tests/transactions_test.py +++ b/tests/rptest/tests/transactions_test.py @@ -10,6 +10,7 @@ from rptest.services.cluster import cluster from rptest.util import wait_until_result from rptest.clients.types import TopicSpec +from time import sleep import uuid @@ -194,7 +195,7 @@ def rejoin_member_test(self): assert False, "send_offsetes should fail" except ck.cimpl.KafkaException as e: kafka_error = e.args[0] - assert kafka_error.code() == ck.cimpl.KafkaError.INVALID_TXN_STATE + assert kafka_error.code() == ck.cimpl.KafkaError._FENCED try: # if abort fails an app should recreate a producer otherwise @@ -202,7 +203,7 @@ def rejoin_member_test(self): producer.abort_transaction() except ck.cimpl.KafkaException as e: kafka_error = e.args[0] - assert kafka_error.code() == ck.cimpl.KafkaError.INVALID_TXN_STATE + assert kafka_error.code() == ck.cimpl.KafkaError._FENCED @cluster(num_nodes=3) def change_static_member_test(self): @@ -258,6 +259,32 @@ def change_static_member_test(self): producer.abort_transaction() + @cluster(num_nodes=3) + def expired_tx_test(self): + producer = ck.Producer({ + 'bootstrap.servers': self.redpanda.brokers(), + 'transactional.id': '0', + 'transaction.timeout.ms': 5000, + }) + + producer.init_transactions() + producer.begin_transaction() + + for i in range(0, 10): + producer.produce(self.input_t.name, + str(i), + str(i), + partition=0, + on_delivery=self.on_delivery) + producer.flush() + sleep(10) + try: + producer.commit_transaction() + assert False, "tx is expected to be expired" + except ck.cimpl.KafkaException as e: + kafka_error = e.args[0] + assert kafka_error.code() == ck.cimpl.KafkaError._FENCED + @cluster(num_nodes=3) def graceful_leadership_transfer_test(self): From 7018b7ae00cb471c7fa44469aa6febf1f8d55ca0 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 21 Dec 2022 21:29:25 -0800 Subject: [PATCH 4/4] ducky: increase timeout to fix 7827 sometimes two leadership transfers take more than 10s and when it happens a tx times out causing graceful_leadership_transfer_test to fail fixes https://github.com/redpanda-data/redpanda/issues/7827 --- tests/rptest/tests/transactions_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rptest/tests/transactions_test.py b/tests/rptest/tests/transactions_test.py index 41d2cfe30b20..c7bf1fc123ae 100644 --- a/tests/rptest/tests/transactions_test.py +++ b/tests/rptest/tests/transactions_test.py @@ -291,7 +291,7 @@ def graceful_leadership_transfer_test(self): producer = ck.Producer({ 'bootstrap.servers': self.redpanda.brokers(), 'transactional.id': '0', - 'transaction.timeout.ms': 10000, + 'transaction.timeout.ms': 60000, }) producer.init_transactions()