Skip to content

Commit

Permalink
Merge pull request redpanda-data#7909 from rystsov/issue-7827-fix
Browse files Browse the repository at this point in the history
Increase timeout in graceful_leadership_transfer_test to fix 7827
  • Loading branch information
rystsov committed Dec 31, 2022
2 parents 106db47 + 7018b7a commit a6a021b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 19 deletions.
57 changes: 41 additions & 16 deletions src/v/cluster/tx_gateway_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1706,6 +1706,19 @@ tx_gateway_frontend::do_end_txn(

checked<cluster::tm_transaction, tx_errc> r(tx_errc::unknown_server_error);
if (request.committed) {
if (tx.status == tm_transaction::tx_status::killed) {
vlog(
txlog.warn,
"can't commit an expired tx:{} pid:{} etag:{} tx_seq:{} in "
"term:{}",
tx.id,
tx.pid,
tx.etag,
tx.tx_seq,
term);
outcome->set_value(tx_errc::fenced);
co_return tx_errc::fenced;
}
bool is_status_ok = false;
if (is_fetch_tx_supported()) {
is_status_ok = tx.status == tm_transaction::tx_status::ongoing
Expand Down Expand Up @@ -1746,6 +1759,19 @@ tx_gateway_frontend::do_end_txn(
co_return tx_errc::unknown_server_error;
}
} else {
if (tx.status == tm_transaction::tx_status::killed) {
vlog(
txlog.warn,
"can't abort an expired tx:{} pid:{} etag:{} tx_seq:{} in "
"term:{}",
tx.id,
tx.pid,
tx.etag,
tx.tx_seq,
term);
outcome->set_value(tx_errc::fenced);
co_return tx_errc::fenced;
}
try {
r = co_await do_abort_tm_tx(term, stm, tx, timeout);
} catch (...) {
Expand Down Expand Up @@ -1887,17 +1913,11 @@ tx_gateway_frontend::do_abort_tm_tx(
}

if (!is_fetch_tx_supported()) {
if (
tx.status != tm_transaction::tx_status::ongoing
&& tx.status != tm_transaction::tx_status::killed) {
if (tx.status != tm_transaction::tx_status::ongoing) {
co_return tx_errc::invalid_txn_state;
}
} else if (
tx.status == tm_transaction::tx_status::aborting
|| tx.status == tm_transaction::tx_status::killed) {
if (tx.etag == expected_term) {
// a retried abort
} else {
} else if (tx.status == tm_transaction::tx_status::aborting) {
if (tx.etag != expected_term) {
vlog(
txlog.trace,
"abort encountered old aborted tx:{} etag:{} pid:{} tx_seq:{} "
Expand Down Expand Up @@ -1927,9 +1947,7 @@ tx_gateway_frontend::do_abort_tm_tx(
tx.pid);
co_return tx_errc::unknown_server_error;
}
if (
old_tx.status == tm_transaction::tx_status::aborting
|| tx.status == tm_transaction::tx_status::killed) {
if (old_tx.status == tm_transaction::tx_status::aborting) {
if (old_tx.tx_seq != tx.tx_seq) {
vlog(
txlog.warn,
Expand Down Expand Up @@ -1960,7 +1978,7 @@ tx_gateway_frontend::do_abort_tm_tx(
} else {
vlog(
txlog.warn,
"fetched status:{} isn't aborting, killed nor ongoing",
"fetched status:{} isn't aborting nor ongoing",
old_tx.status);
co_return tx_errc::unknown_server_error;
}
Expand Down Expand Up @@ -2104,8 +2122,7 @@ tx_gateway_frontend::do_abort_tm_tx(

if (
tx.status != tm_transaction::tx_status::ongoing
&& tx.status != tm_transaction::tx_status::aborting
&& tx.status != tm_transaction::tx_status::killed) {
&& tx.status != tm_transaction::tx_status::aborting) {
vlog(
txlog.warn,
"abort encontered a tx with unexpected status:{} (tx:{} etag:{} "
Expand Down Expand Up @@ -2932,7 +2949,15 @@ tx_gateway_frontend::get_ongoing_tx(
// from the client's perspective it will look as if the tx hadn't
// failed at all, but in fact the second part of the tx will
// start a new transaction
co_return tx_errc::invalid_txn_state;
vlog(
txlog.warn,
"can't modify an expired tx:{} pid:{} etag:{} tx_seq:{} in term:{}",
tx.id,
tx.pid,
tx.etag,
tx.tx_seq,
expected_term);
co_return tx_errc::fenced;
} else {
// A previous transaction has failed after its status has been
// decided, rolling it forward.
Expand Down
33 changes: 30 additions & 3 deletions tests/rptest/tests/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from rptest.services.cluster import cluster
from rptest.util import wait_until_result
from rptest.clients.types import TopicSpec
from time import sleep

import uuid

Expand Down Expand Up @@ -194,15 +195,15 @@ def rejoin_member_test(self):
assert False, "send_offsetes should fail"
except ck.cimpl.KafkaException as e:
kafka_error = e.args[0]
assert kafka_error.code() == ck.cimpl.KafkaError.INVALID_TXN_STATE
assert kafka_error.code() == ck.cimpl.KafkaError._FENCED

try:
# if abort fails, an app should recreate the producer; otherwise
# it may continue to use the original producer
producer.abort_transaction()
except ck.cimpl.KafkaException as e:
kafka_error = e.args[0]
assert kafka_error.code() == ck.cimpl.KafkaError.INVALID_TXN_STATE
assert kafka_error.code() == ck.cimpl.KafkaError._FENCED

@cluster(num_nodes=3)
def change_static_member_test(self):
Expand Down Expand Up @@ -258,13 +259,39 @@ def change_static_member_test(self):

producer.abort_transaction()

@cluster(num_nodes=3)
def expired_tx_test(self):
# Checks that committing a transaction after waiting longer than its
# configured timeout is rejected with a _FENCED error.
producer = ck.Producer({
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': '0',
# 5000 ms transaction timeout; the sleep below (10 s) is longer than this.
'transaction.timeout.ms': 5000,
})

producer.init_transactions()
producer.begin_transaction()

# Produce ten messages to partition 0 of self.input_t within the open
# transaction; str(i) is passed for both positional payload arguments.
for i in range(0, 10):
producer.produce(self.input_t.name,
str(i),
str(i),
partition=0,
on_delivery=self.on_delivery)
producer.flush()
# Wait 10 s, i.e. longer than the 5000 ms transaction timeout set above.
sleep(10)
try:
producer.commit_transaction()
# Reaching this line means the commit did not raise, which fails the test.
assert False, "tx is expected to be expired"
except ck.cimpl.KafkaException as e:
# The first exception argument is the error object whose code is checked.
kafka_error = e.args[0]
assert kafka_error.code() == ck.cimpl.KafkaError._FENCED

@cluster(num_nodes=3)
def graceful_leadership_transfer_test(self):

producer = ck.Producer({
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': '0',
'transaction.timeout.ms': 10000,
'transaction.timeout.ms': 60000,
})

producer.init_transactions()
Expand Down

0 comments on commit a6a021b

Please sign in to comment.