Skip to content

Commit

Permalink
Merge pull request #7591 from rystsov/USE
Browse files (browse the repository at this point in the history)
Fixes Unknown Server Errors (USE) in transactions
  • Loading branch information
piyushredpanda committed Dec 10, 2022
2 parents: d3ae805 + 2c7e1be; commit: be14638
Show file tree
Hide file tree
Showing 39 changed files with 2,561 additions and 785 deletions.
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ v_cc_library(
partition_probe.cc
id_allocator_stm.cc
persisted_stm.cc
tm_stm_cache.cc
tm_stm.cc
rm_stm.cc
tx_helpers.cc
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class controller_backend;
class controller_stm_shard;
class id_allocator_frontend;
class rm_partition_frontend;
class tm_stm_cache;
class tx_gateway_frontend;
class partition_leaders_table;
class partition_allocator;
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ partition::partition(
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<features::feature_table>& feature_table,
ss::sharded<cluster::tm_stm_cache>& tm_stm_cache,
config::binding<uint64_t> max_concurrent_producer_ids,
std::optional<s3::bucket_name> read_replica_bucket)
: _raft(r)
, _probe(std::make_unique<replicated_partition_probe>(*this))
, _tx_gateway_frontend(tx_gateway_frontend)
, _feature_table(feature_table)
, _tm_stm_cache(tm_stm_cache)
, _is_tx_enabled(config::shard_local_cfg().enable_transactions.value())
, _is_idempotence_enabled(
config::shard_local_cfg().enable_idempotence.value()) {
Expand All @@ -57,7 +59,7 @@ partition::partition(

if (_is_tx_enabled) {
_tm_stm = ss::make_shared<cluster::tm_stm>(
clusterlog, _raft.get(), feature_table);
clusterlog, _raft.get(), feature_table, _tm_stm_cache);
stm_manager->add_stm(_tm_stm);
}
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class partition {
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&,
ss::sharded<features::feature_table>&,
ss::sharded<cluster::tm_stm_cache>&,
config::binding<uint64_t>,
std::optional<s3::bucket_name> read_replica_bucket = std::nullopt);

Expand Down Expand Up @@ -281,6 +282,7 @@ class partition {
partition_probe _probe;
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<cluster::tm_stm_cache>& _tm_stm_cache;
bool _is_tx_enabled{false};
bool _is_idempotence_enabled{false};
ss::shared_ptr<cloud_storage::remote_partition> _cloud_storage_partition;
Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ partition_manager::partition_manager(
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
ss::sharded<features::feature_table>& feature_table,
ss::sharded<cluster::tm_stm_cache>& tm_stm_cache,
config::binding<uint64_t> max_concurrent_producer_ids)
: _storage(storage.local())
, _raft_manager(raft)
Expand All @@ -61,6 +62,7 @@ partition_manager::partition_manager(
, _cloud_storage_api(cloud_storage_api)
, _cloud_storage_cache(cloud_storage_cache)
, _feature_table(feature_table)
, _tm_stm_cache(tm_stm_cache)
, _max_concurrent_producer_ids(max_concurrent_producer_ids) {}

partition_manager::ntp_table_container
Expand Down Expand Up @@ -157,6 +159,7 @@ ss::future<consensus_ptr> partition_manager::manage(
_cloud_storage_api,
_cloud_storage_cache,
_feature_table,
_tm_stm_cache,
_max_concurrent_producer_ids,
read_replica_bucket);

Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class partition_manager {
ss::sharded<cloud_storage::remote>&,
ss::sharded<cloud_storage::cache>&,
ss::sharded<features::feature_table>&,
ss::sharded<cluster::tm_stm_cache>&,
config::binding<uint64_t>);

using manage_cb_t
Expand Down Expand Up @@ -200,6 +201,7 @@ class partition_manager {
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
ss::sharded<cloud_storage::cache>& _cloud_storage_cache;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<cluster::tm_stm_cache>& _tm_stm_cache;
ss::gate _gate;
bool _block_new_leadership{false};

Expand Down
31 changes: 21 additions & 10 deletions src/v/cluster/rm_partition_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,25 @@ ss::future<begin_tx_reply> rm_partition_frontend::begin_tx(

begin_tx_reply result;
if (leader == _self) {
vlog(
txlog.trace,
"executing name:begin_tx, ntp:{}, pid:{}, tx_seq:{} timeout:{} "
"locally",
ntp,
pid,
tx_seq,
transaction_timeout_ms);
result = co_await begin_tx_locally(
ntp, pid, tx_seq, transaction_timeout_ms);
vlog(
txlog.trace,
"received name:begin_tx, ntp:{}, pid:{}, tx_seq:{}, ec:{}, etag: "
"{} locally",
ntp,
pid,
tx_seq,
result.ec,
result.etag);
if (
result.ec == tx_errc::leader_not_found
|| result.ec == tx_errc::shard_not_found) {
Expand Down Expand Up @@ -159,11 +176,13 @@ ss::future<begin_tx_reply> rm_partition_frontend::begin_tx(

vlog(
txlog.trace,
"dispatching name:begin_tx, ntp:{}, pid:{}, tx_seq:{}, from:{}, "
"dispatching name:begin_tx, ntp:{}, pid:{}, tx_seq:{} timeout:{}, "
"from:{}, "
"to:{}",
ntp,
pid,
tx_seq,
transaction_timeout_ms,
_self,
leader);
result = co_await dispatch_begin_tx(
Expand Down Expand Up @@ -310,15 +329,7 @@ ss::future<begin_tx_reply> rm_partition_frontend::do_begin_tx(
return stm->begin_tx(pid, tx_seq, transaction_timeout_ms)
.then([ntp, topic_revision](checked<model::term_id, tx_errc> etag) {
if (!etag.has_value()) {
vlog(
txlog.warn,
"rm_stm::begin_tx({},...) failed with {}",
ntp,
etag.error());
if (etag.error() == tx_errc::leader_not_found) {
return begin_tx_reply{ntp, tx_errc::leader_not_found};
}
return begin_tx_reply{ntp, tx_errc::unknown_server_error};
return begin_tx_reply{ntp, etag.error()};
}
return begin_tx_reply{
ntp, etag.value(), tx_errc::none, topic_revision};
Expand Down
Loading

0 comments on commit be14638

Please sign in to comment.