Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust heartbeat behavior #180

Open
wants to merge 6 commits into
base: v1.10.2+RAI
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -3734,6 +3734,9 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
return recollect;
}

extern int jl_heartbeat_pause(void);
extern int jl_heartbeat_resume(void);

JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
{
JL_PROBE_GC_BEGIN(collection);
Expand Down Expand Up @@ -3775,6 +3778,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
// existence of the thread in the jl_n_threads count.
//
// TODO: concurrently queue objects
jl_heartbeat_pause();
jl_fence();
gc_n_threads = jl_atomic_load_acquire(&jl_n_threads);
gc_all_tls_states = jl_atomic_load_relaxed(&jl_all_tls_states);
Expand Down Expand Up @@ -3806,6 +3810,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)

gc_n_threads = 0;
gc_all_tls_states = NULL;
jl_heartbeat_resume();
jl_safepoint_end_gc();
jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING);
JL_PROBE_GC_END();
Expand Down
16 changes: 15 additions & 1 deletion src/stackwalk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,10 +1166,20 @@ JL_DLLEXPORT void jl_print_backtrace(void) JL_NOTSAFEPOINT
}

extern int gc_first_tid;
extern int jl_inside_heartbeat_thread(void);
extern int jl_heartbeat_pause(void);
extern int jl_heartbeat_resume(void);

// Print backtraces for all live tasks, for all threads, to jl_safe_printf stderr
// Print backtraces for all live tasks, for all threads, to jl_safe_printf
// stderr. This can take a _long_ time!
JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
{
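// Disable heartbeats to prevent heartbeat loss while running this, unless
// this is called from the heartbeat thread. No pause is needed in that
// case: the heartbeat thread itself determines whether heartbeats were
// lost, so while it is busy here the missed-heartbeat counter is unchanged.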
if (!jl_inside_heartbeat_thread()) {
jl_heartbeat_pause();
}
Comment on lines +1181 to +1183
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 Why the if-check?

If a heartbeat loss causes us to print backtraces, could the backtraces we're printing cause another heartbeat loss?

I think I remember you said the answer is "no" when we discussed over zoom, but it would be nice to clarify that in the comments, here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought I'd written that up. Sorry, I'll add a comment. If this is called from the heartbeat thread, there will be no additional heartbeat loss because it is the heartbeat thread itself that determines whether there is heartbeat loss. So when it returns from here, the counter for heartbeats missed is unchanged.


size_t nthreads = jl_atomic_load_acquire(&jl_n_threads);
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
int ctid = -1;
Expand Down Expand Up @@ -1232,6 +1242,10 @@ JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
jl_safe_printf("thread (%d) ==== End thread %d\n", ctid, ptls2->tid + 1);
}
jl_safe_printf("thread (%d) ++++ Done\n", ctid);

if (!jl_inside_heartbeat_thread()) {
jl_heartbeat_resume();
}
}

#ifdef __cplusplus
Expand Down
54 changes: 50 additions & 4 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,40 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n,
return 0;
}

// Temporarily pause the heartbeat thread by clearing `heartbeat_enabled`.
//
// Returns 0 if heartbeats were enabled and have now been paused, or -1 if
// they were not enabled (in which case nothing is modified).
JL_DLLEXPORT int jl_heartbeat_pause(void)
{
    if (heartbeat_enabled) {
        heartbeat_enabled = 0;
        return 0;
    }
    return -1;
}

// Resume the paused heartbeat thread.
//
// Returns 0 on success, or -1 (changing nothing) when any of these holds:
//   - heartbeats are already enabled, so the thread is already running;
//   - `heartbeat_interval_s` is 0, i.e. we were not paused
//     (disabled != paused);
//   - `heartbeat_off_sem` cannot be taken without blocking, i.e. the
//     heartbeat thread is not ready.
// On success the missed/received heartbeat counters are zeroed,
// `heartbeat_enabled` is set, and `heartbeat_on_sem` is posted.
JL_DLLEXPORT int jl_heartbeat_resume(void)
{
    // The conditions short-circuit in this order, so the semaphore is only
    // tried once both state checks have passed.
    if (heartbeat_enabled ||
        heartbeat_interval_s == 0 ||
        uv_sem_trywait(&heartbeat_off_sem) != 0) {
        return -1;
    }

    // reset the heartbeat counters before re-enabling
    n_hbs_missed = 0;
    n_hbs_recvd = 0;

    heartbeat_enabled = 1;
    uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread
    return 0;
}

// heartbeat
JL_DLLEXPORT void jl_heartbeat(void)
{
Expand Down Expand Up @@ -1099,7 +1133,7 @@ void jl_heartbeat_threadfun(void *arg)
uv_sem_post(&heartbeat_off_sem);

// sleep the thread here; this semaphore is posted in
// jl_heartbeat_enable()
// jl_heartbeat_enable() or jl_heartbeat_resume()
uv_sem_wait(&heartbeat_on_sem);

// Set the sleep duration.
Expand All @@ -1111,7 +1145,7 @@ void jl_heartbeat_threadfun(void *arg)
// heartbeat is enabled; sleep, waiting for the desired interval
sleep_for(s, ns);

// if heartbeats were turned off while we were sleeping, reset
// if heartbeats were turned off/paused while we were sleeping, reset
if (!heartbeat_enabled) {
continue;
}
Expand All @@ -1122,13 +1156,15 @@ void jl_heartbeat_threadfun(void *arg)
tchb = jl_hrtime() - t0;

// adjust the next sleep duration based on how long the heartbeat
// check took
// check took, but if it took too long then use the normal duration
rs = 1;
while (tchb > 1e9) {
rs++;
tchb -= 1e9;
}
s = heartbeat_interval_s - rs;
if (rs < heartbeat_interval_s) {
s = heartbeat_interval_s - rs;
}
ns = 1e9 - tchb;
}
}
Expand All @@ -1150,6 +1186,16 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n,
return -1;
}

// Stub variant of jl_heartbeat_pause(): does nothing and always returns -1.
// NOTE(review): presumably the fallback used when the heartbeat thread is
// not compiled in; the guarding preprocessor conditional is not visible
// here -- confirm.
JL_DLLEXPORT int jl_heartbeat_pause(void)
{
return -1;
}

// Stub variant of jl_heartbeat_resume(): does nothing and always returns -1.
// NOTE(review): presumably the fallback used when the heartbeat thread is
// not compiled in; the guarding preprocessor conditional is not visible
// here -- confirm.
JL_DLLEXPORT int jl_heartbeat_resume(void)
{
return -1;
}

// Stub variant of jl_heartbeat(): intentionally empty, so calling it has no
// effect.
// NOTE(review): presumably the fallback used when the heartbeat thread is
// not compiled in; the guarding preprocessor conditional is not visible
// here -- confirm.
JL_DLLEXPORT void jl_heartbeat(void)
{
}
Expand Down
Loading