Skip to content

Commit

Permalink
Change heartbeat thread controls
Browse files Browse the repository at this point in the history
When enabling heartbeats, the user must specify:
- heartbeat_s: jl_heartbeat() must be called at least once every heartbeat_s; if it
  isn't, a one-line heartbeat loss report is printed
- show_tasks_after_n: after these many heartbeat_s have passed without jl_heartbeat()
  being called, print task backtraces and stop all reporting
- reset_after_n: after these many heartbeat_s have passed with jl_heartbeat()
  being called, print a heartbeats recovered message and reset reporting
  • Loading branch information
kpamnany authored and RAI CI (GitHub Action Automation) committed Feb 20, 2024
1 parent d34f6f7 commit 5c6f1a1
Showing 1 changed file with 46 additions and 49 deletions.
95 changes: 46 additions & 49 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -977,9 +977,9 @@ volatile int heartbeat_enabled;
uv_sem_t heartbeat_on_sem, // jl_heartbeat_enable -> thread
heartbeat_off_sem; // thread -> jl_heartbeat_enable
int heartbeat_interval_s,
n_loss_reports,
reset_reporting_s;
int last_report_s, report_interval_s, n_reported;
tasks_after_n,
reset_tasks_after_n;
int tasks_showed, n_hbs_missed, n_hbs_recvd;
_Atomic(int) heartbeats;

JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT;
Expand All @@ -998,21 +998,19 @@ void jl_init_heartbeat(void)

// enable/disable heartbeats
// heartbeat_s: interval within which jl_heartbeat() must be called
// n_reports: for one heartbeat loss interval, how many times to report
// reset_reporting_after_s: how long to wait after a heartbeat loss
// interval and a return to steady heartbeats, before resetting
// reporting behavior
// show_tasks_after_n: number of heartbeats missed before printing task backtraces
// reset_after_n: number of heartbeats after which to reset
//
// When disabling heartbeats, the heartbeat thread must wake up,
// find out that heartbeats are now diabled, and reset. For now, we

Check warning on line 1005 in src/threading.c

View workflow job for this annotation

GitHub Actions / Check for new typos

perhaps "diabled" should be "disabled".
// handle this by preventing re-enabling of heartbeats until this
// completes.
JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports,
int reset_reporting_after_s)
JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n,
int reset_after_n)
{
if (heartbeat_s <= 0) {
heartbeat_enabled = 0;
heartbeat_interval_s = n_loss_reports = reset_reporting_s = 0;
heartbeat_interval_s = tasks_after_n = reset_tasks_after_n = 0;
}
else {
// must disable before enabling
Expand All @@ -1026,10 +1024,11 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports,

jl_atomic_store_relaxed(&heartbeats, 0);
heartbeat_interval_s = heartbeat_s;
n_loss_reports = n_reports;
reset_reporting_s = reset_reporting_after_s;
last_report_s = 0;
report_interval_s = heartbeat_interval_s;
tasks_after_n = show_tasks_after_n;
reset_tasks_after_n = reset_after_n;
tasks_showed = 0;
n_hbs_missed = 0;
n_hbs_recvd = 0;
heartbeat_enabled = 1;
uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread
}
Expand Down Expand Up @@ -1065,44 +1064,42 @@ void sleep_for(int secs, int nsecs)
uint8_t check_heartbeats(uint8_t gc_state)
{
int hb = jl_atomic_exchange(&heartbeats, 0);
uint64_t curr_s = jl_hrtime() / 1e9;

if (hb <= 0) {
// we didn't get a heartbeat in the last interval; should we report?
if (n_reported < n_loss_reports &&
curr_s - last_report_s >= report_interval_s) {
jl_task_t *ct = jl_current_task;
jl_ptls_t ptls = ct->ptls;

// exit GC-safe region to report then re-enter
jl_gc_safe_leave(ptls, gc_state);
jl_safe_printf("==== heartbeat loss ====\n");
jl_print_task_backtraces(0);
gc_state = jl_gc_safe_enter(ptls);

// we've reported
n_reported++;

// record the reporting time _after_ the report
last_report_s = jl_hrtime() / 1e9;

// double the reporting interval up to a maximum
if (report_interval_s < 60 * heartbeat_interval_s) {
report_interval_s *= 2;
// we didn't get a heartbeat
n_hbs_recvd = 0;
n_hbs_missed++;

// if we've printed task backtraces already, do nothing
if (!tasks_showed) {
// otherwise, at least show this message
jl_safe_printf("==== heartbeat loss (%ds) ====\n",
n_hbs_missed * heartbeat_interval_s);
// if we've missed enough heartbeats, print task backtraces
if (n_hbs_missed >= tasks_after_n) {
jl_task_t *ct = jl_current_task;
jl_ptls_t ptls = ct->ptls;

// exit GC-safe region to report then re-enter
jl_gc_safe_leave(ptls, gc_state);
jl_print_task_backtraces(0);
gc_state = jl_gc_safe_enter(ptls);

// we printed task backtraces
tasks_showed = 1;
}
}
// no heartbeats, don't change reporting state
return gc_state;
}
else {
// we got a heartbeat; reset the report count
n_reported = 0;
}

// reset the reporting interval only once we're steadily getting
// heartbeats for the requested reset interval
if (curr_s - reset_reporting_s > last_report_s) {
report_interval_s = heartbeat_interval_s;
// got a heartbeat
n_hbs_recvd++;
// if we'd printed task backtraces, check for reset
if (tasks_showed && n_hbs_recvd >= reset_tasks_after_n) {
tasks_showed = 0;
jl_safe_printf("==== heartbeats recovered (lost for %ds) ====\n",
n_hbs_missed * heartbeat_interval_s);
}
n_hbs_missed = 0;
}

return gc_state;
Expand All @@ -1111,7 +1108,7 @@ uint8_t check_heartbeats(uint8_t gc_state)
// heartbeat thread function
void jl_heartbeat_threadfun(void *arg)
{
int s, ns = 1e9 - 1, rs;
int s = 59, ns = 1e9 - 1, rs;
uint64_t t0, tchb;

// We need a TLS because backtraces are accumulated into ptls->bt_size
Expand Down Expand Up @@ -1169,8 +1166,8 @@ void jl_init_heartbeat(void)
{
}

JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports,
int reset_reporting_after_s)
JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n,
int reset_after_n)
{
return -1;
}
Expand Down

0 comments on commit 5c6f1a1

Please sign in to comment.