Skip to content

Commit

Permalink
perform realloc for memory.grow by suspending other threads
Browse files Browse the repository at this point in the history
instead of allocating the max size for shared memory on instantiation.

while wasm threads proposal requires the max size specified for shared
memory, it's almost impossible to make a good estimation of max memory
at build time, especially for interpreter-like applications.
(eg. toywasm itself)

this change allows dynamic allocation of the shared memory for
such threaded applications. (by specifying a huge max memory)

on the other hand, this approach makes memory.grow very expensive.
this commit shorten CHECK_INTERRUPT_INTERVAL_MS to mitigate it a bit.
  • Loading branch information
yamt committed Apr 17, 2023
1 parent e00c774 commit b15ffac
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 4 deletions.
5 changes: 5 additions & 0 deletions cli/repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "nbio.h"
#include "repl.h"
#include "report.h"
#include "suspend.h"
#include "toywasm_version.h"
#include "type.h"
#include "usched.h"
Expand Down Expand Up @@ -790,13 +791,17 @@ exec_func(struct exec_context *ctx, uint32_t funcidx,
struct wasi_threads_instance *wasi_threads = state->wasi_threads;
if (wasi_threads != NULL) {
ctx->intrp = wasi_threads_interrupt_pointer(wasi_threads);
ctx->cluster = wasi_threads_cluster(wasi_threads);
#if defined(TOYWASM_USE_USER_SCHED)
ctx->sched = wasi_threads_sched(wasi_threads);
#endif
}
#endif
ret = instance_execute_func(ctx, funcidx, ptype, rtype, param, result);
while (ret == ETOYWASMRESTART) {
#if defined(TOYWASM_ENABLE_WASM_THREADS)
suspend_parked(ctx->cluster);
#endif
xlog_trace("%s: restarting execution\n", __func__);
#if defined(TOYWASM_USE_USER_SCHED)
struct sched *sched = ctx->sched;
Expand Down
1 change: 1 addition & 0 deletions cmake/ToywasmConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ option(TOYWASM_USE_SEPARATE_LOCALS "Separate locals and stack" ON)
option(TOYWASM_USE_SMALL_CELLS "Use smaller stack cells" ON)
option(TOYWASM_USE_RESULTTYPE_CELLIDX "Index local lookup for resulttype" ON)
option(TOYWASM_USE_LOCALTYPE_CELLIDX "Index local lookup for localtype" ON)
option(TOYWASM_PREALLOC_SHARED_MEMORY "Preallocate shared memory" OFF)
option(TOYWASM_ENABLE_WRITER "Enable module writer" ON)
option(TOYWASM_ENABLE_WASM_EXTENDED_CONST "Enable extended-const proposal" OFF)
option(TOYWASM_ENABLE_WASM_MULTI_MEMORY "Enable multi-memory proposal" OFF)
Expand Down
1 change: 1 addition & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ set(lib_core_sources
if(TOYWASM_ENABLE_WASM_THREADS)
list(APPEND lib_core_sources
"cluster.c"
"suspend.c"
"waitlist.c")
if(TOYWASM_USE_USER_SCHED)
list(APPEND lib_core_sources
Expand Down
8 changes: 8 additions & 0 deletions lib/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ cluster_init(struct cluster *c)
toywasm_cv_init(&c->cv);
c->nrunners = 0;
atomic_init(&c->interrupt, 0);

c->suspend_state = SUSPEND_STATE_NONE;
c->nparked = 0;
toywasm_cv_init(&c->stop_cv);
}

void
Expand All @@ -31,6 +35,7 @@ cluster_join(struct cluster *c)
void
cluster_add_thread(struct cluster *c)
{
/* XXX should park on SUSPEND_STATE_STOPPING? */
assert(c->nrunners < UINT32_MAX);
c->nrunners++;
}
Expand All @@ -43,4 +48,7 @@ cluster_remove_thread(struct cluster *c)
if (c->nrunners == 0) {
toywasm_cv_signal(&c->cv, &c->lock);
}
if (c->suspend_state == SUSPEND_STATE_STOPPING) {
toywasm_cv_signal(&c->stop_cv, &c->lock);
}
}
12 changes: 12 additions & 0 deletions lib/cluster.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#include "lock.h"

enum suspend_state {
SUSPEND_STATE_NONE = 0,
SUSPEND_STATE_STOPPING,
SUSPEND_STATE_RESUMING,
};

/*
* a group of instances.
* something similar to the "agent cluster" concept in web.
Expand All @@ -12,6 +19,11 @@ struct cluster {
TOYWASM_CV_DEFINE(cv);
uint32_t nrunners;
atomic_uint interrupt;

/* suspend */
_Atomic enum suspend_state suspend_state;
uint32_t nparked;
TOYWASM_CV_DEFINE(stop_cv);
};

void cluster_init(struct cluster *c);
Expand Down
5 changes: 3 additions & 2 deletions lib/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ struct trap_info {
#define ETOYWASMTRAP -1
#define ETOYWASMRESTART -2

#if defined(TOYWASM_USE_USER_SCHED)
/* use shorter interval for userlang thread */
/* use shorter interval for userland thread */
#if defined(TOYWASM_USE_USER_SCHED) || !defined(TOYWASM_PREALLOC_SHARED_MEMORY)
#define CHECK_INTERRUPT_INTERVAL_MS 50
#else
#define CHECK_INTERRUPT_INTERVAL_MS 300
Expand Down Expand Up @@ -169,6 +169,7 @@ struct exec_context {

/* check_interrupt() */
const atomic_uint *intrp;
struct cluster *cluster;

/* scheduler */
struct sched *sched;
Expand Down
45 changes: 43 additions & 2 deletions lib/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "nbio.h"
#include "platform.h"
#include "shared_memory_impl.h"
#include "suspend.h"
#include "timeutil.h"
#include "type.h"
#include "usched.h"
Expand Down Expand Up @@ -857,6 +858,14 @@ check_interrupt(struct exec_context *ctx)
return trap_with_id(ctx, TRAP_VOLUNTARY_THREAD_EXIT,
"interrupt");
}
#if defined(TOYWASM_ENABLE_WASM_THREADS)
if (ctx->cluster != NULL) {
int ret = suspend_check_interrupt(ctx->cluster);
if (ret != 0) {
return ret;
}
}
#endif /* defined(TOYWASM_ENABLE_WASM_THREADS) */
#if defined(TOYWASM_USE_USER_SCHED)
if (ctx->sched != NULL && sched_need_resched(ctx->sched)) {
xlog_trace("%s: need resched ctx %p", __func__, ctx);
Expand Down Expand Up @@ -1321,17 +1330,49 @@ memory_grow(struct exec_context *ctx, uint32_t memidx, uint32_t sz)
const struct module *m = inst->module;
assert(memidx < m->nimportedmems + m->nmems);
struct meminst *mi = VEC_ELEM(inst->mems, memidx);
const struct memtype *mt = mi->type;
const struct limits *lim = &mt->lim;
memory_lock(mi);
uint32_t orig_size = mi->size_in_pages;
uint32_t orig_size;
#if defined(TOYWASM_ENABLE_WASM_THREADS) && \
!defined(TOYWASM_PREALLOC_SHARED_MEMORY)
retry:
#endif /* !defined(TOYWASM_PREALLOC_SHARED_MEMORY) */
orig_size = mi->size_in_pages;
uint64_t new_size = (uint64_t)orig_size + sz;
const struct limits *lim = &mi->type->lim;
assert(lim->max <= WASM_MAX_PAGES);
if (new_size > lim->max) {
memory_unlock(mi);
return (uint32_t)-1; /* fail */
}
xlog_trace("memory grow %" PRIu32 " -> %" PRIu32, mi->size_in_pages,
(uint32_t)new_size);
#if defined(TOYWASM_ENABLE_WASM_THREADS) && \
!defined(TOYWASM_PREALLOC_SHARED_MEMORY)
if (new_size != orig_size && mi->shared != NULL) {
struct cluster *c = ctx->cluster;
int ret;
assert(c != NULL);
memory_unlock(mi);
suspend_threads(c);
memory_lock(mi);
if (mi->size_in_pages != orig_size) {
goto retry;
}
uint64_t need = new_size * WASM_PAGE_SIZE;
assert(need > mi->allocated);
ret = resize_array((void **)&mi->data, 1, need);
if (ret == 0) {
mi->allocated = need;
}
resume_threads(c);
if (ret != 0) {
memory_unlock(mi);
xlog_trace("%s: realloc failed", __func__);
return (uint32_t)-1; /* fail */
}
}
#endif /* !defined(TOYWASM_PREALLOC_SHARED_MEMORY) */
mi->size_in_pages = new_size;
memory_unlock(mi);
return orig_size; /* success */
Expand Down
8 changes: 8 additions & 0 deletions lib/instance.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "instance.h"
#include "module.h"
#include "shared_memory_impl.h"
#include "suspend.h"
#include "type.h"
#include "util.h"
#include "xlog.h"
Expand Down Expand Up @@ -176,7 +177,11 @@ memory_instance_create(struct meminst **mip,
}
#if defined(TOYWASM_ENABLE_WASM_THREADS)
if ((mt->flags & MEMTYPE_FLAG_SHARED) != 0) {
#if defined(TOYWASM_PREALLOC_SHARED_MEMORY)
uint32_t need_in_pages = mt->lim.max;
#else
uint32_t need_in_pages = mt->lim.min;
#endif /* defined(TOYWASM_PREALLOC_SHARED_MEMORY) */
uint64_t need_in_bytes = need_in_pages * WASM_PAGE_SIZE;
if (need_in_bytes > UINT32_MAX) {
free(mp);
Expand Down Expand Up @@ -483,6 +488,9 @@ instance_create_execute_init(struct instance *inst, struct exec_context *ctx)
xlog_trace("%s: restarting execution of the start "
"function\n",
__func__);
#if defined(TOYWASM_ENABLE_WASM_THREADS)
suspend_parked(ctx->cluster);
#endif
ret = exec_expr_continue(ctx);
}
if (ret != 0) {
Expand Down
120 changes: 120 additions & 0 deletions lib/suspend.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include <assert.h>
#include <inttypes.h>

#include "cluster.h"
#include "context.h"
#include "suspend.h"
#include "timeutil.h"
#include "xlog.h"

#if !defined(TOYWASM_USE_USER_SCHED)
static void
parked(struct cluster *c) REQUIRES(c->lock)
{
c->nparked++;
xlog_trace("%s: parked %" PRIu32 " / %" PRIu32, __func__, c->nparked,
c->nrunners);
if (c->nrunners == c->nparked + 1) {
toywasm_cv_broadcast(&c->stop_cv, &c->lock);
}
while (c->suspend_state == SUSPEND_STATE_STOPPING) {
toywasm_cv_wait(&c->stop_cv, &c->lock);
}
xlog_trace("%s: parked %" PRIu32 " / %" PRIu32, __func__, c->nparked,
c->nrunners);
assert(c->nparked > 0);
c->nparked--;
assert(c->suspend_state == SUSPEND_STATE_RESUMING);
if (c->nparked == 0) {
c->suspend_state = SUSPEND_STATE_NONE;
toywasm_cv_broadcast(&c->stop_cv, &c->lock);
}
}
#endif /* !defined(TOYWASM_USE_USER_SCHED) */

int
suspend_check_interrupt(struct cluster *c)
{
#if !defined(TOYWASM_USE_USER_SCHED)
if (c->suspend_state == SUSPEND_STATE_STOPPING) {
xlog_trace("%s: restart", __func__);
return ETOYWASMRESTART;
}
#endif /* !defined(TOYWASM_USE_USER_SCHED) */
return 0;
}

void
suspend_parked(struct cluster *c)
{
#if !defined(TOYWASM_USE_USER_SCHED)
if (c == NULL) {
return;
}
if (c->suspend_state != SUSPEND_STATE_STOPPING) {
return;
}
xlog_trace("%s: parked", __func__);
toywasm_mutex_lock(&c->lock);
parked(c);
toywasm_mutex_unlock(&c->lock);
#endif /* !defined(TOYWASM_USE_USER_SCHED) */
}

void
suspend_threads(struct cluster *c)
{
#if !defined(TOYWASM_USE_USER_SCHED)
toywasm_mutex_lock(&c->lock);
retry:
if (c->suspend_state == SUSPEND_STATE_STOPPING) {
xlog_trace("%s: parking for the previous suspend", __func__);
parked(c);
goto retry;
}
if (c->suspend_state == SUSPEND_STATE_RESUMING) {
xlog_trace("%s: waitng for the previous suspend to complete",
__func__);
toywasm_cv_wait(&c->stop_cv, &c->lock);
goto retry;
}
assert(c->nparked == 0);
struct timespec start;
struct timespec end;
timespec_now(CLOCK_REALTIME, &start);
c->suspend_state = SUSPEND_STATE_STOPPING;
while (c->nrunners != c->nparked + 1) {
xlog_trace("%s: waiting %" PRIu32 " / %" PRIu32, __func__,
c->nparked, c->nrunners);
toywasm_cv_wait(&c->stop_cv, &c->lock);
}
timespec_now(CLOCK_REALTIME, &end);
if (timespec_cmp(&end, &start) > 0) {
struct timespec diff;
timespec_sub(&end, &start, &diff);
xlog_trace("%s: suspending %" PRIu32
" threads took %ju.%09lu seconds",
__func__, c->nrunners - 1, (uintmax_t)diff.tv_sec,
diff.tv_nsec);
}
toywasm_mutex_unlock(&c->lock);
#endif /* !defined(TOYWASM_USE_USER_SCHED) */
}

void
resume_threads(struct cluster *c)
{
#if !defined(TOYWASM_USE_USER_SCHED)
xlog_trace("%s: resuming", __func__);
toywasm_mutex_lock(&c->lock);
assert(c->suspend_state == SUSPEND_STATE_STOPPING);
assert(c->nrunners == c->nparked + 1);
if (c->nparked > 0) {
c->suspend_state = SUSPEND_STATE_RESUMING;
} else {
c->suspend_state = SUSPEND_STATE_NONE;
}
toywasm_cv_broadcast(&c->stop_cv, &c->lock);
toywasm_mutex_unlock(&c->lock);
#endif /* !defined(TOYWASM_USE_USER_SCHED) */
}
6 changes: 6 additions & 0 deletions lib/suspend.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
struct cluster;

int suspend_check_interrupt(struct cluster *c);
void suspend_parked(struct cluster *c);
void suspend_threads(struct cluster *c);
void resume_threads(struct cluster *c);
1 change: 1 addition & 0 deletions lib/toywasm_config.c.in
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const char *toywasm_config_string =
"\tTOYWASM_USE_SMALL_CELLS = @TOYWASM_USE_SMALL_CELLS@\n"
"\tTOYWASM_USE_RESULTTYPE_CELLIDX = @TOYWASM_USE_RESULTTYPE_CELLIDX@\n"
"\tTOYWASM_USE_LOCALTYPE_CELLIDX = @TOYWASM_USE_LOCALTYPE_CELLIDX@\n"
"\tTOYWASM_PREALLOC_SHARED_MEMORY = @TOYWASM_PREALLOC_SHARED_MEMORY@\n"
"\tTOYWASM_ENABLE_WRITER = @TOYWASM_ENABLE_WRITER@\n"
"\tTOYWASM_ENABLE_WASM_EXTENDED_CONST = @TOYWASM_ENABLE_WASM_EXTENDED_CONST@\n"
"\tTOYWASM_ENABLE_WASM_MULTI_MEMORY = @TOYWASM_ENABLE_WASM_MULTI_MEMORY@\n"
Expand Down
1 change: 1 addition & 0 deletions lib/toywasm_config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#cmakedefine TOYWASM_USE_SMALL_CELLS
#cmakedefine TOYWASM_USE_RESULTTYPE_CELLIDX
#cmakedefine TOYWASM_USE_LOCALTYPE_CELLIDX
#cmakedefine TOYWASM_PREALLOC_SHARED_MEMORY
#cmakedefine TOYWASM_ENABLE_WRITER
#cmakedefine TOYWASM_ENABLE_WASM_EXTENDED_CONST
#cmakedefine TOYWASM_ENABLE_WASM_MULTI_MEMORY
Expand Down
Loading

0 comments on commit b15ffac

Please sign in to comment.