Skip to content

Commit

Permalink
tree-wide: Use named semaphores for better debuggability.
Browse files Browse the repository at this point in the history
A named semaphore is an enhancement to the usual seastar::semaphore: it
is initialized with a name string that helps to identify which semaphore
it is, and when the semaphore raises an exception (e.g. timed out, broken),
the name string is included in the error message.

Tree-wide commit, for two reasons:

1. We need to use named semaphores widely to get the debuggability benefit.
2. The coupling between the semaphore_units<> type and the named
   semaphore specialization causes the choice of semaphore to leak quite a
   bit, making a treewide change easier.

Fixes: redpanda-data#5489
  • Loading branch information
Aaron Fabbri committed Jul 27, 2022
1 parent 5bbdb83 commit 77f28f3
Show file tree
Hide file tree
Showing 70 changed files with 203 additions and 163 deletions.
2 changes: 1 addition & 1 deletion src/v/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class ntp_archiver {
ss::lowres_clock::duration _cloud_storage_initial_backoff;
ss::lowres_clock::duration _segment_upload_timeout;
ss::lowres_clock::duration _manifest_upload_timeout;
ss::semaphore _mutex{1};
ssx::semaphore _mutex{1, "archive/ntp"};
ss::lowres_clock::duration _upload_loop_initial_backoff;
ss::lowres_clock::duration _upload_loop_max_backoff;
config::binding<std::chrono::milliseconds> _sync_manifest_timeout;
Expand Down
1 change: 0 additions & 1 deletion src/v/archival/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/smp.hh>
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/cache_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cloud_storage/recursive_directory_walker.h"
#include "resource_mgmt/io_priority.h"
#include "seastarx.h"
#include "ssx/semaphore.h"
#include "units.h"

#include <seastar/core/future.hh>
Expand Down Expand Up @@ -124,7 +125,7 @@ class cache : public ss::peering_sharded_service<cache> {
uint64_t _total_cleaned;
/// Current size of the cache directory (only used on shard 0)
uint64_t _current_cache_size{0};
ss::semaphore _cleanup_sm{1};
ssx::semaphore _cleanup_sm{1, "cloud/cache"};
std::set<std::filesystem::path> _files_in_progress;
cache_probe probe;
access_time_tracker _access_time_tracker;
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class controller_backend
ss::sharded<ss::abort_source>& _as;
underlying_t _topic_deltas;
ss::timer<> _housekeeping_timer;
ss::semaphore _topics_sem{1};
ssx::semaphore _topics_sem{1, "c/controller-be"};
ss::gate _gate;
/**
* This map is populated by backend instance on shard that given NTP is
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/drain_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "reflection/adl.h"
#include "seastarx.h"
#include "serde/serde.h"
#include "ssx/semaphore.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
Expand Down Expand Up @@ -96,7 +97,7 @@ class drain_manager : public ss::peering_sharded_service<drain_manager> {
ss::sharded<cluster::partition_manager>& _partition_manager;
std::optional<ss::future<>> _drain;
bool _draining{false};
ss::semaphore _sem{0};
ssx::semaphore _sem{0, "c/drain-mgr"};
drain_status _status;
ss::abort_source _abort;
};
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/health_monitor_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ void health_monitor_backend::abortable_refresh_request::abort() {
}

health_monitor_backend::abortable_refresh_request::abortable_refresh_request(
model::node_id leader_id, ss::gate::holder holder, ss::semaphore_units<> u)
model::node_id leader_id, ss::gate::holder holder, ssx::semaphore_units u)
: leader_id(leader_id)
, holder(std::move(holder))
, units(std::move(u)) {}
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/health_monitor_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include "model/metadata.h"
#include "raft/consensus.h"
#include "rpc/fwd.h"
#include "ssx/semaphore.h"

#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>

Expand Down Expand Up @@ -90,7 +90,7 @@ class health_monitor_backend {
struct abortable_refresh_request
: ss::enable_lw_shared_from_this<abortable_refresh_request> {
abortable_refresh_request(
model::node_id, ss::gate::holder, ss::semaphore_units<>);
model::node_id, ss::gate::holder, ssx::semaphore_units);

ss::future<std::error_code>
abortable_await(ss::future<std::error_code>);
Expand All @@ -99,7 +99,7 @@ class health_monitor_backend {
bool finished = false;
model::node_id leader_id;
ss::gate::holder holder;
ss::semaphore_units<> units;
ssx::semaphore_units units;
ss::promise<std::error_code> done;
};

Expand Down
1 change: 0 additions & 1 deletion src/v/coproc/script_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "utils/mutex.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/shared_ptr.hh>

namespace coproc {
Expand Down
2 changes: 1 addition & 1 deletion src/v/coproc/script_context_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ using input_read_results = std::vector<process_batch_request::data>;
/// Arguments to pass to 'read_from_inputs', trivially copyable
struct input_read_args {
script_id id;
ss::semaphore& read_sem;
ssx::semaphore& read_sem;
ss::abort_source& abort_src;
routes_t& inputs;
};
Expand Down
5 changes: 3 additions & 2 deletions src/v/coproc/shared_script_resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "coproc/sys_refs.h"
#include "random/simple_time_jitter.h"
#include "rpc/reconnect_transport.h"
#include "ssx/semaphore.h"
#include "utils/mutex.h"

#include <seastar/core/semaphore.hh>
Expand All @@ -37,8 +38,8 @@ struct shared_script_resources {
simple_time_jitter<ss::lowres_clock> jitter{1s};

/// Max amount of requests allowed to concurrently hold data in memory
ss::semaphore read_sem{
config::shard_local_cfg().coproc_max_ingest_bytes.value()};
ssx::semaphore read_sem{
config::shard_local_cfg().coproc_max_ingest_bytes.value(), "coproc/ssr"};

/// Underlying transport connection to the wasm engine
rpc::reconnect_transport transport;
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "kafka/protocol/list_offsets.h"
#include "kafka/types.h"
#include "net/unresolved_address.h"
#include "ssx/semaphore.h"
#include "utils/retry.h"

#include <seastar/core/condition-variable.hh>
Expand Down Expand Up @@ -61,7 +62,7 @@ class wait_or_start {

private:
func _func;
ss::semaphore _lock{1};
ssx::semaphore _lock{1, "k/client"};
};

class client {
Expand Down
6 changes: 3 additions & 3 deletions src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ ss::future<session_resources> connection_context::throttle_request(
return reserve_request_units(key, request_size);
})
.then([this, delay, track, tracker = std::move(tracker)](
ss::semaphore_units<> units) mutable {
ssx::semaphore_units units) mutable {
return server().get_request_unit().then(
[this,
delay,
mem_units = std::move(units),
track,
tracker = std::move(tracker)](
ss::semaphore_units<> qd_units) mutable {
ssx::semaphore_units qd_units) mutable {
session_resources r{
.backpressure_delay = delay.duration,
.memlocks = std::move(mem_units),
Expand All @@ -212,7 +212,7 @@ ss::future<session_resources> connection_context::throttle_request(
});
}

ss::future<ss::semaphore_units<>>
ss::future<ssx::semaphore_units>
connection_context::reserve_request_units(api_key key, size_t size) {
// Defer to the handler for the request type for the memory estimate, but
// if the request isn't found, use the default estimate (although in that
Expand Down
13 changes: 7 additions & 6 deletions src/v/kafka/server/connection_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
#include "security/acl.h"
#include "security/mtls.h"
#include "security/sasl_authentication.h"
#include "ssx/semaphore.h"
#include "utils/hdr_hist.h"
#include "utils/named_type.h"

#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/shared_ptr.hh>

#include <absl/container/flat_hash_map.h>
Expand Down Expand Up @@ -68,8 +68,8 @@ struct session_resources {
using pointer = ss::lw_shared_ptr<session_resources>;

ss::lowres_clock::duration backpressure_delay;
ss::semaphore_units<> memlocks;
ss::semaphore_units<> queue_units;
ssx::semaphore_units memlocks;
ssx::semaphore_units queue_units;
std::unique_ptr<hdr_hist::measurement> method_latency;
std::unique_ptr<request_tracker> tracker;
};
Expand Down Expand Up @@ -168,7 +168,7 @@ class connection_context final
// Reserve units from the memory semaphore in proportion
// to the number of bytes the request processing is expected to
// take.
ss::future<ss::semaphore_units<>>
ss::future<ssx::semaphore_units>
reserve_request_units(api_key key, size_t size);

// Apply backpressure sequence, where the request processing may be
Expand All @@ -189,8 +189,9 @@ class connection_context final
* The future<> returned by this method resolves when all ready *and*
* in-order responses have been processed, which is not the same as all
* ready responses. In particular, responses which are ready may not be
* processed if there are earlier (lower sequence number) responses which
* are not yet ready: they will be processed by a future invocation.
* processed if there are earlier (lower sequence number) responses
* which are not yet ready: they will be processed by a future
* invocation.
*
* @return ss::future<> a future which resolves as described above.
*/
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/server/group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "model/namespace.h"
#include "raft/group_manager.h"
#include "seastarx.h"
#include "ssx/semaphore.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
Expand Down Expand Up @@ -195,7 +196,7 @@ class group_manager {

struct attached_partition {
bool loading;
ss::semaphore sem{1};
ssx::semaphore sem{1, "k/group-mgr"};
ss::abort_source as;
ss::lw_shared_ptr<cluster::partition> partition;
ss::basic_rwlock<> catchup_lock;
Expand Down
6 changes: 3 additions & 3 deletions src/v/kafka/server/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,12 @@ class protocol final : public net::server::protocol {
}
}

ss::future<ss::semaphore_units<>> get_request_unit() {
ss::future<ssx::semaphore_units> get_request_unit() {
if (_qdc_mon) {
return _qdc_mon->qdc.get_unit();
}
return ss::make_ready_future<ss::semaphore_units<>>(
ss::semaphore_units<>());
return ss::make_ready_future<ssx::semaphore_units>(
ssx::semaphore_units());
}

cluster::controller_api& controller_api() {
Expand Down
4 changes: 3 additions & 1 deletion src/v/net/batched_output_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
#include "net/batched_output_stream.h"

#include "likely.h"
#include "ssx/semaphore.h"
#include "vassert.h"

#include <seastar/core/future.hh>
#include <seastar/core/scattered_message.hh>
#include <seastar/core/semaphore.hh>

#include <fmt/format.h>

Expand All @@ -23,7 +25,7 @@ batched_output_stream::batched_output_stream(
ss::output_stream<char> o, size_t cache)
: _out(std::move(o))
, _cache_size(cache)
, _write_sem(std::make_unique<ss::semaphore>(1)) {
, _write_sem(std::make_unique<ssx::semaphore>(1, "net/batch-ostream")) {
// Size zero reserved for identifying default-initialized
// instances in stop()
vassert(_cache_size > 0, "Size must be > 0");
Expand Down
3 changes: 2 additions & 1 deletion src/v/net/batched_output_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "seastarx.h"
#include "ssx/semaphore.h"

#include <seastar/core/iostream.hh>
#include <seastar/core/semaphore.hh>
Expand Down Expand Up @@ -73,7 +74,7 @@ class batched_output_stream {

ss::output_stream<char> _out;
size_t _cache_size{0};
std::unique_ptr<ss::semaphore> _write_sem;
std::unique_ptr<ssx::semaphore> _write_sem;
size_t _unflushed_bytes{0};
bool _closed = false;
};
Expand Down
6 changes: 3 additions & 3 deletions src/v/net/connection_rate_counter.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

#include "net/server_probe.h"
#include "seastarx.h"
#include "ssx/semaphore.h"

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/util/bool_class.hh>

using namespace std::chrono_literals;
Expand All @@ -31,7 +31,7 @@ class connection_rate_counter {
explicit connection_rate_counter(int64_t max_rate) noexcept
: max_tokens(max_rate)
, one_token_time(std::max(1000 / max_tokens, 1l))
, bucket(max_rate)
, bucket(max_rate, "conn-rate")
, last_update_time(Clock::now()) {}

void break_semaphore() { bucket.broken(); }
Expand Down Expand Up @@ -87,7 +87,7 @@ class connection_rate_counter {
private:
bool need_wait() { return avaiable_new_connections() == 0; }

ss::basic_semaphore<ss::semaphore_default_exception_factory, Clock> bucket;
ssx::named_semaphore<Clock> bucket;
typename Clock::time_point last_update_time;
};

Expand Down
3 changes: 2 additions & 1 deletion src/v/net/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "seastar/core/coroutine.hh"
#include "ssx/future-util.h"
#include "ssx/metrics.h"
#include "ssx/semaphore.h"
#include "ssx/sformat.h"
#include "vassert.h"
#include "vlog.h"
Expand All @@ -32,7 +33,7 @@ namespace net {

server::server(server_configuration c)
: cfg(std::move(c))
, _memory(cfg.max_service_memory_per_core)
, _memory{size_t{static_cast<size_t>(cfg.max_service_memory_per_core)}, "net/server-mem"}
, _public_metrics(ssx::metrics::public_metrics_handle) {}

server::server(ss::sharded<server_configuration>* s)
Expand Down
4 changes: 2 additions & 2 deletions src/v/net/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class server {
ss::lw_shared_ptr<net::connection> conn;

server_probe& probe() { return _s->_probe; }
ss::semaphore& memory() { return _s->_memory; }
ssx::semaphore& memory() { return _s->_memory; }
hdr_hist& hist() { return _s->_hist; }
ss::gate& conn_gate() { return _s->_conn_gate; }
ss::abort_source& abort_source() { return _s->_as; }
Expand Down Expand Up @@ -179,7 +179,7 @@ class server {
void setup_public_metrics();

std::unique_ptr<protocol> _proto;
ss::semaphore _memory;
ssx::semaphore _memory;
std::vector<std::unique_ptr<listener>> _listeners;
boost::intrusive::list<net::connection> _connections;
ss::abort_source _as;
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/rest/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ proxy::proxy(
size_t max_memory,
ss::sharded<kafka::client::client>& client)
: _config(config)
, _mem_sem(max_memory)
, _mem_sem(max_memory, "pproxy/mem")
, _client(client)
, _ctx{{{}, _mem_sem, {}, smp_sg}, *this}
, _server(
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/rest/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class proxy {

private:
configuration _config;
ss::semaphore _mem_sem;
ssx::semaphore _mem_sem;
ss::sharded<kafka::client::client>& _client;
ctx_server<proxy>::context_t _ctx;
ctx_server<proxy> _server;
Expand Down
5 changes: 3 additions & 2 deletions src/v/pandaproxy/schema_registry/seq_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "pandaproxy/schema_registry/sharded_store.h"
#include "pandaproxy/schema_registry/types.h"
#include "random/simple_time_jitter.h"
#include "ssx/semaphore.h"
#include "utils/retry.h"

namespace pandaproxy::schema_registry {
Expand Down Expand Up @@ -136,13 +137,13 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {

/// Serialize wait_for operations, to avoid issuing
/// gratuitous number of reads to the topic on concurrent GETs.
ss::semaphore _wait_for_sem{1};
ssx::semaphore _wait_for_sem{1, "pproxy/schema-wait"};

/// Shard 0 only: Reads have progressed as far as this offset
model::offset _loaded_offset{-1};

/// Shard 0 only: Serialize write operations.
ss::semaphore _write_sem{1};
ssx::semaphore _write_sem{1, "pproxy/schema-write"};

// ======================
// End of Shard 0 state
Expand Down
Loading

0 comments on commit 77f28f3

Please sign in to comment.