Skip to content

Commit

Permalink
wip: new k/groups logger
Browse files Browse the repository at this point in the history
  • Loading branch information
travisdowns committed Apr 26, 2022
1 parent 2d7406a commit ccf8cf9
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 108 deletions.
162 changes: 82 additions & 80 deletions src/v/kafka/server/group.cc

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/v/kafka/server/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ class group {
_offsets;
model::violation_recovery_policy _recovery_policy;
ctx_log _ctxlog;
ctx_log _ctx_glog;
ctx_log _ctx_txlog;
group_metadata_serializer _md_serializer;

Expand Down
38 changes: 19 additions & 19 deletions src/v/kafka/server/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ ss::future<> group_manager::stop() {
}

void group_manager::detach_partition(const model::ntp& ntp) {
klog.debug("detaching group metadata partition {}", ntp);
kgrouplog.debug("detaching group metadata partition {}", ntp);
ssx::spawn_with_gate(_gate, [this, _ntp{ntp}]() -> ss::future<> {
auto ntp(_ntp);
auto it = _partitions.find(ntp);
Expand All @@ -154,7 +154,7 @@ void group_manager::detach_partition(const model::ntp& ntp) {
}

void group_manager::attach_partition(ss::lw_shared_ptr<cluster::partition> p) {
klog.debug("attaching group metadata partition {}", p->ntp());
kgrouplog.debug("attaching group metadata partition {}", p->ntp());
auto attached = ss::make_lw_shared<attached_partition>(p);
auto res = _partitions.try_emplace(p->ntp(), attached);
// TODO: this is not a forever assertion. this should just generally never
Expand Down Expand Up @@ -191,7 +191,7 @@ ss::future<> group_manager::cleanup_removed_topic_partitions(
if (it->second != g) {
return ss::now();
}
vlog(klog.trace, "Removed group {}", g);
vlog(kgrouplog.trace, "Removed group {}", g);
_groups.erase(it);
_groups.rehash(0);
return ss::now();
Expand Down Expand Up @@ -228,7 +228,7 @@ void group_manager::handle_topic_delta(
});
})
.handle_exception([](std::exception_ptr e) {
vlog(klog.warn, "Topic clean-up encountered error: {}", e);
vlog(kgrouplog.warn, "Topic clean-up encountered error: {}", e);
});
}

Expand Down Expand Up @@ -402,7 +402,7 @@ ss::future<> group_manager::recover_partition(
if (group_stm.has_data()) {
auto group = get_group(group_id);
vlog(
klog.info,
kgrouplog.info,
"Recovering {} - {}",
group_id,
group_stm.get_metadata());
Expand Down Expand Up @@ -468,7 +468,7 @@ ss::future<> group_manager::recover_partition(
for (auto& [group_id, group_stm] : ctx.groups) {
if (group_stm.is_removed()) {
if (_groups.contains(group_id) && group_stm.offsets().size() > 0) {
klog.warn(
kgrouplog.warn(
"Unexpected active group unload {} loading {}",
group_id,
p->partition->ntp());
Expand All @@ -493,7 +493,7 @@ group_manager::join_group(join_group_request&& r) {
r.data.session_timeout_ms < _conf.group_min_session_timeout_ms()
|| r.data.session_timeout_ms > _conf.group_max_session_timeout_ms()) {
vlog(
klog.trace,
kgrouplog.trace,
"Join group {} rejected for invalid session timeout {} valid range "
"[{},{}]. Request {}",
r.data.group_id,
Expand All @@ -513,7 +513,7 @@ group_manager::join_group(join_group_request&& r) {
// not exist we should reject the request.</kafka>
if (r.data.member_id != unknown_member_id) {
vlog(
klog.trace,
kgrouplog.trace,
"Join group {} rejected for known member {} joining unknown "
"group. Request {}",
r.data.group_id,
Expand All @@ -529,7 +529,7 @@ group_manager::join_group(join_group_request&& r) {
// gone. this is generally not going to be a scenario that can
// happen until we have rebalancing / partition deletion feature.
vlog(
klog.trace,
kgrouplog.trace,
"Join group {} rejected for unavailable ntp {}",
r.data.group_id,
r.ntp);
Expand All @@ -548,7 +548,7 @@ group_manager::join_group(join_group_request&& r) {
_groups.emplace(r.data.group_id, group);
_groups.rehash(0);
is_new_group = true;
vlog(klog.trace, "Created new group {} while joining", r.data.group_id);
vlog(kgrouplog.trace, "Created new group {} while joining", r.data.group_id);
}

return group->handle_join_group(std::move(r), is_new_group)
Expand All @@ -558,7 +558,7 @@ group_manager::join_group(join_group_request&& r) {
ss::future<sync_group_response>
group_manager::sync_group(sync_group_request&& r) {
if (r.data.group_instance_id) {
vlog(klog.trace, "Static group membership is not supported");
vlog(kgrouplog.trace, "Static group membership is not supported");
return make_sync_error(error_code::unsupported_version);
}

Expand All @@ -583,7 +583,7 @@ group_manager::sync_group(sync_group_request&& r) {
return group->handle_sync_group(std::move(r)).finally([group] {});
} else {
vlog(
klog.trace,
kgrouplog.trace,
"Cannot handle sync group request for unknown group {}",
r.data.group_id);
return make_sync_error(error_code::unknown_member_id);
Expand All @@ -592,7 +592,7 @@ group_manager::sync_group(sync_group_request&& r) {

ss::future<heartbeat_response> group_manager::heartbeat(heartbeat_request&& r) {
if (r.data.group_instance_id) {
vlog(klog.trace, "Static group membership is not supported");
vlog(kgrouplog.trace, "Static group membership is not supported");
return make_heartbeat_error(error_code::unsupported_version);
}

Expand All @@ -613,7 +613,7 @@ ss::future<heartbeat_response> group_manager::heartbeat(heartbeat_request&& r) {
}

vlog(
klog.trace,
kgrouplog.trace,
"Cannot handle heartbeat request for unknown group {}",
r.data.group_id);

Expand All @@ -633,7 +633,7 @@ group_manager::leave_group(leave_group_request&& r) {
return group->handle_leave_group(std::move(r)).finally([group] {});
} else {
vlog(
klog.trace,
kgrouplog.trace,
"Cannot handle leave group request for unknown group {}",
r.data.group_id);
return make_leave_error(error_code::unknown_member_id);
Expand Down Expand Up @@ -1009,14 +1009,14 @@ error_code group_manager::validate_group_status(
const model::ntp& ntp, const group_id& group, api_key api) {
if (!valid_group_id(group, api)) {
vlog(
klog.trace, "Group name {} is invalid for operation {}", group, api);
kgrouplog.trace, "Group name {} is invalid for operation {}", group, api);
return error_code::invalid_group_id;
}

if (const auto it = _partitions.find(ntp); it != _partitions.end()) {
if (!it->second->partition->is_leader()) {
vlog(
klog.trace,
kgrouplog.trace,
"Group {} operation {} sent to non-leader coordinator {}",
group,
api,
Expand All @@ -1026,7 +1026,7 @@ error_code group_manager::validate_group_status(

if (it->second->loading) {
vlog(
klog.trace,
kgrouplog.trace,
"Group {} operation {} sent to loading coordinator {}",
group,
api,
Expand All @@ -1051,7 +1051,7 @@ error_code group_manager::validate_group_status(
}

vlog(
klog.trace,
kgrouplog.trace,
"Group {} operation {} misdirected to non-coordinator {}",
group,
api,
Expand Down
8 changes: 4 additions & 4 deletions src/v/kafka/server/group_recovery_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ group_recovery_consumer::operator()(model::record_batch batch) {
group_it->second.try_set_fence(cmd.pid.get_id(), cmd.pid.get_epoch());
co_return ss::stop_iteration::no;
} else {
vlog(klog.trace, "ignoring batch with type {}", batch.header().type);
vlog(kgrouplog.trace, "ignoring batch with type {}", batch.header().type);
co_return ss::stop_iteration::no;
}
}
Expand All @@ -140,14 +140,14 @@ void group_recovery_consumer::handle_record(model::record r) {
__builtin_unreachable();
} catch (...) {
vlog(
klog.error,
kgrouplog.error,
"error handling record record - {}",
std::current_exception());
}
}

void group_recovery_consumer::handle_group_metadata(group_metadata_kv md) {
vlog(klog.trace, "Recovering group metadata {}", md.key.group_id);
vlog(kgrouplog.trace, "Recovering group metadata {}", md.key.group_id);

if (md.value) {
// until we switch over to a compacted topic or use raft snapshots,
Expand All @@ -166,7 +166,7 @@ void group_recovery_consumer::handle_offset_metadata(offset_metadata_kv md) {
model::topic_partition tp(md.key.topic, md.key.partition);
if (md.value) {
vlog(
klog.trace,
kgrouplog.trace,
"Recovering offset {}/{} with metadata {}",
md.key.topic,
md.key.partition,
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ static ss::future<std::vector<read_result>> fetch_ntps_in_parallel(
if (total_max_bytes > max_bytes_per_fetch) {
auto per_partition = max_bytes_per_fetch / ntp_fetch_configs.size();
vlog(
klog.debug,
klog.warn,
"Fetch requested very large response ({}), clamping each partition's "
"max_bytes to {} bytes",
total_max_bytes,
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/handlers/list_groups.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ss::future<response_ptr> list_groups_handler::handle(
request_context ctx, [[maybe_unused]] ss::smp_service_group g) {
list_groups_request request{};
request.decode(ctx.reader(), ctx.header().version);
vlog(klog.trace, "Handling request {}", request);
vlog(kgrouplog.trace, "Handling request {}", request);

auto&& [error, groups] = co_await ctx.groups().list_groups();

Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/handlers/list_offsets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ list_offsets_handler::handle(request_context ctx, ss::smp_service_group ssg) {
list_offsets_request request;
request.decode(ctx.reader(), ctx.header().version);
request.compute_duplicate_topics();
vlog(klog.trace, "Handling request {}", request);
vlog(kgrouplog.trace, "Handling request {}", request);

auto unauthorized_it = std::partition(
request.data.topics.begin(),
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@

namespace kafka {
ss::logger klog("kafka");
ss::logger kgrouplog("k/groups");
} // namespace kafka
3 changes: 3 additions & 0 deletions src/v/kafka/server/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@
#include <seastar/util/log.hh>

namespace kafka {
// logger for kafka requests other than group membership
extern ss::logger klog;
// the logger for group membership
extern ss::logger kgrouplog;
} // namespace kafka
4 changes: 2 additions & 2 deletions src/v/kafka/server/rm_group_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ss::future<bool> try_create_consumer_group_topic(
vassert(res.size() == 1, "expected exactly one result");
if (res[0].ec != cluster::errc::success) {
vlog(
klog.warn,
kgrouplog.warn,
"can not create {}/{} topic - error: {}",
mapper.ns()(),
mapper.topic()(),
Expand All @@ -69,7 +69,7 @@ ss::future<bool> try_create_consumer_group_topic(
})
.handle_exception([&mapper](const std::exception_ptr& e) {
vlog(
klog.warn,
kgrouplog.warn,
"can not create {}/{} topic - exception: {}",
mapper.ns()(),
mapper.topic()(),
Expand Down

0 comments on commit ccf8cf9

Please sign in to comment.