Use named semaphores for better debuggability.
A named semaphore is an enhancement over the usual seastar::semaphore:
it is initialized with a name string that identifies which semaphore it
is, and when the semaphore raises an exception (e.g. timed out or
broken), that name is included in the error message.
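As an illustration, a minimal sketch of the before/after pattern applied
throughout this patch (the "example-service" name and the local ss alias
are illustrative, not taken from the tree):

    #include <seastar/core/semaphore.hh>

    namespace ss = seastar;

    class example_service {
        // Before: a timeout surfaced as a generic semaphore error with no
        // hint of which instance was involved.
        // ss::semaphore _mutex{1};

        // After: the name given to the exception factory is included in
        // the error text when the semaphore times out or is broken.
        ss::named_semaphore _mutex{
          1, ss::named_semaphore_exception_factory{"example-service"}};
    };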

This ended up as a single tree-wide commit, rather than picking out the
different uses and the areas their types propagate to. As the patch
shows, the coupling between the semaphore_units<> type and the named
semaphore specialization causes the choice of semaphore to leak quite a
bit, which makes a tree-wide change easier.
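As a sketch of why the types propagate: the patch adds an
ssx::named_semaphore_units alias in "ssx/sugar.h" (that header is not
shown in this excerpt); it presumably boils down to something like the
following, and every function that returns or stores units has to adopt
it:

    #include <seastar/core/future.hh>
    #include <seastar/core/semaphore.hh>

    namespace ss = seastar;

    // Assumed reconstruction of the alias from "ssx/sugar.h": units taken
    // from a named semaphore carry the exception-factory type, so they get
    // their own alias.
    namespace ssx {
    using named_semaphore_units
      = ss::semaphore_units<ss::named_semaphore_exception_factory>;
    } // namespace ssx

    // Any helper that hands out units changes its signature as well, which
    // is how the semaphore choice leaks through call chains and members.
    ss::future<ssx::named_semaphore_units>
    reserve(ss::named_semaphore& sem, size_t n) {
        return ss::get_units(sem, n);
    }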

Fixes: redpanda-data#5489
Aaron Fabbri committed Jul 15, 2022
1 parent df45155 commit 16c5baf
Showing 69 changed files with 256 additions and 168 deletions.
3 changes: 2 additions & 1 deletion src/v/archival/ntp_archiver_service.h
@@ -198,7 +198,8 @@ class ntp_archiver {
ss::lowres_clock::duration _cloud_storage_initial_backoff;
ss::lowres_clock::duration _segment_upload_timeout;
ss::lowres_clock::duration _manifest_upload_timeout;
ss::semaphore _mutex{1};
ss::named_semaphore _mutex{
1, ss::named_semaphore_exception_factory{"ntp-archiver"}};
ss::lowres_clock::duration _upload_loop_initial_backoff;
ss::lowres_clock::duration _upload_loop_max_backoff;
config::binding<std::chrono::milliseconds> _sync_manifest_timeout;
3 changes: 2 additions & 1 deletion src/v/cloud_storage/cache_service.h
@@ -124,7 +124,8 @@ class cache : public ss::peering_sharded_service<cache> {
uint64_t _total_cleaned;
/// Current size of the cache directory (only used on shard 0)
uint64_t _current_cache_size{0};
ss::semaphore _cleanup_sm{1};
ss::named_semaphore _cleanup_sm{
1, ss::named_semaphore_exception_factory{"cloud-cache"}};
std::set<std::filesystem::path> _files_in_progress;
cache_probe probe;
access_time_tracker _access_time_tracker;
4 changes: 3 additions & 1 deletion src/v/cluster/controller_backend.h
@@ -22,6 +22,7 @@

#include <seastar/core/abort_source.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>

#include <absl/container/node_hash_map.h>
@@ -351,7 +352,8 @@ class controller_backend
ss::sharded<ss::abort_source>& _as;
underlying_t _topic_deltas;
ss::timer<> _housekeeping_timer;
ss::semaphore _topics_sem{1};
ss::named_semaphore _topics_sem{
1, ss::named_semaphore_exception_factory{"controller-be"}};
ss::gate _gate;
/**
* This map is populated by backend instance on shard that given NTP is
3 changes: 2 additions & 1 deletion src/v/cluster/drain_manager.h
@@ -96,7 +96,8 @@ class drain_manager : public ss::peering_sharded_service<drain_manager> {
ss::sharded<cluster::partition_manager>& _partition_manager;
std::optional<ss::future<>> _drain;
bool _draining{false};
ss::semaphore _sem{0};
ss::named_semaphore _sem{
0, ss::named_semaphore_exception_factory{"drain-mgr"}};
drain_status _status;
ss::abort_source _abort;
};
4 changes: 3 additions & 1 deletion src/v/cluster/health_monitor_backend.cc
@@ -241,7 +241,9 @@ void health_monitor_backend::abortable_refresh_request::abort() {
}

health_monitor_backend::abortable_refresh_request::abortable_refresh_request(
model::node_id leader_id, ss::gate::holder holder, ss::semaphore_units<> u)
model::node_id leader_id,
ss::gate::holder holder,
ssx::named_semaphore_units u)
: leader_id(leader_id)
, holder(std::move(holder))
, units(std::move(u)) {}
6 changes: 3 additions & 3 deletions src/v/cluster/health_monitor_backend.h
@@ -16,8 +16,8 @@
#include "model/metadata.h"
#include "raft/consensus.h"
#include "rpc/fwd.h"
#include "ssx/sugar.h"

#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>

@@ -90,7 +90,7 @@ class health_monitor_backend {
struct abortable_refresh_request
: ss::enable_lw_shared_from_this<abortable_refresh_request> {
abortable_refresh_request(
model::node_id, ss::gate::holder, ss::semaphore_units<>);
model::node_id, ss::gate::holder, ssx::named_semaphore_units);

ss::future<std::error_code>
abortable_await(ss::future<std::error_code>);
@@ -99,7 +99,7 @@
bool finished = false;
model::node_id leader_id;
ss::gate::holder holder;
ss::semaphore_units<> units;
ssx::named_semaphore_units units;
ss::promise<std::error_code> done;
};

1 change: 0 additions & 1 deletion src/v/coproc/script_context.h
@@ -21,7 +21,6 @@
#include "utils/mutex.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/shared_ptr.hh>

namespace coproc {
2 changes: 1 addition & 1 deletion src/v/coproc/script_context_frontend.h
@@ -28,7 +28,7 @@ using input_read_results = std::vector<process_batch_request::data>;
/// Arguments to pass to 'read_from_inputs', trivially copyable
struct input_read_args {
script_id id;
ss::semaphore& read_sem;
ss::named_semaphore& read_sem;
ss::abort_source& abort_src;
routes_t& inputs;
};
5 changes: 3 additions & 2 deletions src/v/coproc/shared_script_resources.h
@@ -37,8 +37,9 @@ struct shared_script_resources {
simple_time_jitter<ss::lowres_clock> jitter{1s};

/// Max amount of requests allowed to concurrently hold data in memory
ss::semaphore read_sem{
config::shard_local_cfg().coproc_max_ingest_bytes.value()};
ss::named_semaphore read_sem{
config::shard_local_cfg().coproc_max_ingest_bytes.value(),
ss::named_semaphore_exception_factory{"coproc-ssr"}};

/// Underlying transport connection to the wasm engine
rpc::reconnect_transport transport;
3 changes: 2 additions & 1 deletion src/v/kafka/client/client.h
@@ -61,7 +61,8 @@ class wait_or_start {

private:
func _func;
ss::semaphore _lock{1};
ss::named_semaphore _lock{
1, ss::named_semaphore_exception_factory{"k-client"}};
};

class client {
6 changes: 3 additions & 3 deletions src/v/kafka/server/connection_context.cc
@@ -190,14 +190,14 @@ ss::future<session_resources> connection_context::throttle_request(
return reserve_request_units(key, request_size);
})
.then([this, delay, track, tracker = std::move(tracker)](
ss::semaphore_units<> units) mutable {
ssx::named_semaphore_units units) mutable {
return server().get_request_unit().then(
[this,
delay,
mem_units = std::move(units),
track,
tracker = std::move(tracker)](
ss::semaphore_units<> qd_units) mutable {
ssx::named_semaphore_units qd_units) mutable {
session_resources r{
.backpressure_delay = delay.duration,
.memlocks = std::move(mem_units),
@@ -212,7 +212,7 @@
});
}

ss::future<ss::semaphore_units<>>
ss::future<ssx::named_semaphore_units>
connection_context::reserve_request_units(api_key key, size_t size) {
// Defer to the handler for the request type for the memory estimate, but
// if the request isn't found, use the default estimate (although in that
13 changes: 7 additions & 6 deletions src/v/kafka/server/connection_context.h
@@ -17,12 +17,12 @@
#include "security/acl.h"
#include "security/mtls.h"
#include "security/sasl_authentication.h"
#include "ssx/sugar.h"
#include "utils/hdr_hist.h"
#include "utils/named_type.h"

#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/shared_ptr.hh>

#include <absl/container/flat_hash_map.h>
@@ -68,8 +68,8 @@ struct session_resources {
using pointer = ss::lw_shared_ptr<session_resources>;

ss::lowres_clock::duration backpressure_delay;
ss::semaphore_units<> memlocks;
ss::semaphore_units<> queue_units;
ssx::named_semaphore_units memlocks;
ssx::named_semaphore_units queue_units;
std::unique_ptr<hdr_hist::measurement> method_latency;
std::unique_ptr<request_tracker> tracker;
};
@@ -168,7 +168,7 @@ class connection_context final
// Reserve units from memory from the memory semaphore in proportion
// to the number of bytes the request processing is expected to
// take.
ss::future<ss::semaphore_units<>>
ss::future<ssx::named_semaphore_units>
reserve_request_units(api_key key, size_t size);

// Apply backpressure sequence, where the request processing may be
@@ -189,8 +189,9 @@
* The future<> returned by this method resolves when all ready *and*
* in-order responses have been processed, which is not the same as all
* ready responses. In particular, responses which are ready may not be
* processed if there are earlier (lower sequence number) responses which
* are not yet ready: they will be processed by a future invocation.
* processed if there are earlier (lower sequence number) responses
* which are not yet ready: they will be processed by a future
* invocation.
*
* @return ss::future<> a future which as described above.
*/
3 changes: 2 additions & 1 deletion src/v/kafka/server/group_manager.h
@@ -195,7 +195,8 @@ class group_manager {

struct attached_partition {
bool loading;
ss::semaphore sem{1};
ss::named_semaphore sem{
1, ss::named_semaphore_exception_factory{"k-group-mgr"}};
ss::abort_source as;
ss::lw_shared_ptr<cluster::partition> partition;
ss::basic_rwlock<> catchup_lock;
6 changes: 3 additions & 3 deletions src/v/kafka/server/protocol.h
@@ -119,12 +119,12 @@ class protocol final : public net::server::protocol {
}
}

ss::future<ss::semaphore_units<>> get_request_unit() {
ss::future<ssx::named_semaphore_units> get_request_unit() {
if (_qdc_mon) {
return _qdc_mon->qdc.get_unit();
}
return ss::make_ready_future<ss::semaphore_units<>>(
ss::semaphore_units<>());
return ss::make_ready_future<ssx::named_semaphore_units>(
ssx::named_semaphore_units());
}

cluster::controller_api& controller_api() {
3 changes: 2 additions & 1 deletion src/v/net/batched_output_stream.cc
@@ -23,7 +23,8 @@ batched_output_stream::batched_output_stream(
ss::output_stream<char> o, size_t cache)
: _out(std::move(o))
, _cache_size(cache)
, _write_sem(std::make_unique<ss::semaphore>(1)) {
, _write_sem(std::make_unique<ss::named_semaphore>(
1, ss::named_semaphore_exception_factory{"batch-ostream"})) {
// Size zero reserved for identifying default-initialized
// instances in stop()
vassert(_cache_size > 0, "Size must be > 0");
2 changes: 1 addition & 1 deletion src/v/net/batched_output_stream.h
@@ -73,7 +73,7 @@ class batched_output_stream {

ss::output_stream<char> _out;
size_t _cache_size{0};
std::unique_ptr<ss::semaphore> _write_sem;
std::unique_ptr<ss::named_semaphore> _write_sem;
size_t _unflushed_bytes{0};
bool _closed = false;
};
4 changes: 2 additions & 2 deletions src/v/net/connection_rate_counter.h
@@ -31,7 +31,7 @@ class connection_rate_counter {
explicit connection_rate_counter(int64_t max_rate) noexcept
: max_tokens(max_rate)
, one_token_time(std::max(1000 / max_tokens, 1l))
, bucket(max_rate)
, bucket(max_rate, ss::named_semaphore_exception_factory{"conn-rate"})
, last_update_time(Clock::now()) {}

void break_semaphore() { bucket.broken(); }
@@ -87,7 +87,7 @@ class connection_rate_counter {
private:
bool need_wait() { return avaiable_new_connections() == 0; }

ss::basic_semaphore<ss::semaphore_default_exception_factory, Clock> bucket;
ss::basic_semaphore<ss::named_semaphore_exception_factory, Clock> bucket;
typename Clock::time_point last_update_time;
};

5 changes: 4 additions & 1 deletion src/v/net/server.cc
@@ -25,14 +25,17 @@
#include <seastar/core/loop.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/net/api.hh>
#include <seastar/util/later.hh>

namespace net {

server::server(server_configuration c)
: cfg(std::move(c))
, _memory(cfg.max_service_memory_per_core)
, _memory(
cfg.max_service_memory_per_core,
ss::named_semaphore_exception_factory{"server-mem"})
, _public_metrics(ssx::metrics::public_metrics_handle) {}

server::server(ss::sharded<server_configuration>* s)
4 changes: 2 additions & 2 deletions src/v/net/server.h
@@ -127,7 +127,7 @@ class server {
ss::lw_shared_ptr<net::connection> conn;

server_probe& probe() { return _s->_probe; }
ss::semaphore& memory() { return _s->_memory; }
ss::named_semaphore& memory() { return _s->_memory; }
hdr_hist& hist() { return _s->_hist; }
ss::gate& conn_gate() { return _s->_conn_gate; }
ss::abort_source& abort_source() { return _s->_as; }
@@ -198,7 +198,7 @@ class server {
void setup_public_metrics();

std::unique_ptr<protocol> _proto;
ss::semaphore _memory;
ss::named_semaphore _memory;
std::vector<std::unique_ptr<listener>> _listeners;
boost::intrusive::list<net::connection> _connections;
ss::abort_source _as;
2 changes: 1 addition & 1 deletion src/v/pandaproxy/rest/proxy.cc
@@ -65,7 +65,7 @@ proxy::proxy(
size_t max_memory,
ss::sharded<kafka::client::client>& client)
: _config(config)
, _mem_sem(max_memory)
, _mem_sem(max_memory, ss::named_semaphore_exception_factory{"proxy-mem"})
, _client(client)
, _ctx{{{}, _mem_sem, {}, smp_sg}, *this}
, _server(
2 changes: 1 addition & 1 deletion src/v/pandaproxy/rest/proxy.h
@@ -42,7 +42,7 @@ class proxy {

private:
configuration _config;
ss::semaphore _mem_sem;
ss::named_semaphore _mem_sem;
ss::sharded<kafka::client::client>& _client;
ctx_server<proxy>::context_t _ctx;
ctx_server<proxy> _server;
6 changes: 4 additions & 2 deletions src/v/pandaproxy/schema_registry/seq_writer.h
@@ -136,13 +136,15 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {

/// Serialize wait_for operations, to avoid issuing
/// gratuitous number of reads to the topic on concurrent GETs.
ss::semaphore _wait_for_sem{1};
ss::named_semaphore _wait_for_sem{
1, ss::named_semaphore_exception_factory{"schema-wait"}};

/// Shard 0 only: Reads have progressed as far as this offset
model::offset _loaded_offset{-1};

/// Shard 0 only: Serialize write operations.
ss::semaphore _write_sem{1};
ss::named_semaphore _write_sem{
1, ss::named_semaphore_exception_factory{"schema-write"}};

// ======================
// End of Shard 0 state
3 changes: 2 additions & 1 deletion src/v/pandaproxy/schema_registry/service.cc
@@ -26,6 +26,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/memory.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/std-coroutine.hh>
#include <seastar/http/api_docs.hh>

@@ -220,7 +221,7 @@ service::service(
sharded_store& store,
ss::sharded<seq_writer>& sequencer)
: _config(config)
, _mem_sem(max_memory)
, _mem_sem(max_memory, ss::named_semaphore_exception_factory{"schema-svc"})
, _client(client)
, _ctx{{{}, _mem_sem, {}, smp_sg}, *this}
, _server(
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/service.h
@@ -52,7 +52,7 @@ class service {
ss::future<> create_internal_topic();
ss::future<> fetch_internal_topic();
configuration _config;
ss::semaphore _mem_sem;
ss::named_semaphore _mem_sem;
ss::gate _gate;
ss::sharded<kafka::client::client>& _client;
ctx_server<service>::context_t _ctx;
9 changes: 6 additions & 3 deletions src/v/pandaproxy/schema_registry/util.h
@@ -19,6 +19,7 @@
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/types.h"
#include "seastarx.h"
#include "ssx/sugar.h"

#include <seastar/core/semaphore.hh>

@@ -71,7 +72,7 @@
/// Successive calls to operator()() will restart the process.
class one_shot {
enum class state { empty, started, available };
using futurator = ss::futurize<ss::semaphore_units<>>;
using futurator = ss::futurize<ssx::named_semaphore_units>;

public:
explicit one_shot(ss::noncopyable_function<ss::future<>()> func)
@@ -87,7 +88,8 @@
units.release();
auto ex = f.get_exception();
_started_sem.broken(ex);
_started_sem = ss::semaphore{0};
_started_sem = ss::named_semaphore{
0, ss::named_semaphore_exception_factory{"oneshot"}};
return futurator::make_exception_future(ex);
}

@@ -98,7 +100,8 @@

private:
ss::noncopyable_function<ss::future<>()> _func;
ss::semaphore _started_sem{0};
ss::named_semaphore _started_sem{
0, ss::named_semaphore_exception_factory{"oneshot"}};
};

} // namespace pandaproxy::schema_registry

