threading: remove the wait_on_done feature to track down the cause of a deadlock on Windows AVX

This commit is contained in: parent aac7f7cc04 · commit 08972d2aee
5 changed files with 38 additions and 82 deletions
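For context: the thread pool coordinates one main thread and n_threads - 1 workers through shared atomic flags. wait_now (kept by this commit) lets the main thread ask every worker to park on a condition variable; wait_on_done (removed here, to see whether the Windows AVX deadlock persists without it) additionally let a worker park itself right after finishing its task slice. The sketch below is a reduced, self-contained model of the surviving wait_now protocol; the type and function names are simplified stand-ins, not the actual ggml API.

#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>

// Reduced model: a main thread parks and resumes workers via wait_now.
struct shared_state {
    atomic_bool     wait_now;   // main thread asks workers to park
    atomic_int      n_waiting;  // number of workers currently parked
    pthread_mutex_t mutex;      // protects the condvar
    pthread_cond_t  cond;       // workers sleep here while parked
};

// Worker side: called between tasks.
static void worker_maybe_park(struct shared_state *s) {
    if (!atomic_load(&s->wait_now)) return;
    pthread_mutex_lock(&s->mutex);
    atomic_fetch_add(&s->n_waiting, 1);
    while (atomic_load(&s->wait_now)) {
        pthread_cond_wait(&s->cond, &s->mutex);
    }
    atomic_fetch_sub(&s->n_waiting, 1);
    pthread_mutex_unlock(&s->mutex);
}

// Main-thread side: mirrors ggml_threading_resume() below, minus the
// deadlock detection: clear the flag, then re-notify until the last
// worker has actually left the condvar.
static void resume_workers(struct shared_state *s) {
    atomic_store(&s->wait_now, false);
    while (atomic_load(&s->n_waiting) != 0) {
        pthread_mutex_lock(&s->mutex);
        pthread_cond_broadcast(&s->cond);
        pthread_mutex_unlock(&s->mutex);
        sched_yield();  // give workers time to wake and update n_waiting
    }
}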
ggml-threading.c

@@ -167,7 +167,6 @@ struct ggml_compute_state_shared {
     // commands.
     atomic_bool wait_now;
-    atomic_bool wait_on_done;
     atomic_bool stop;
 
     // Default task runner, can be overriden by node.task_profile.runner.
@@ -263,11 +262,12 @@ void ggml_threading_suspend(struct ggml_threading_context *ctx) {
 
     struct ggml_compute_state_shared *shared = &ctx->shared;
 
+    const int n_worker_threads = ctx->n_threads - 1;
+
     ggml_spin_lock(&shared->spin);
     shared->wait_now = true;
     ggml_spin_unlock(&ctx->shared.spin);
 
-    const int n_worker_threads = ctx->n_threads - 1;
     while (shared->n_waiting != n_worker_threads) {
         ggml_spin_pause();
     }
@@ -281,8 +281,8 @@ void ggml_threading_suspend(struct ggml_threading_context *ctx) {
 
 // Wakeup all workers.
 //
-// Workers takes some time to wakeup, and has to lock spin after wakeup. Yield
-// is used to avoid signal frequently. Current implementation is highly
+// Workers takes some time to wakeup.
+// Yield is used to avoid notify frequently. Current implementation is highly
 // experimental. See tests/test-ggml-threading.c for details.
 void ggml_threading_resume(struct ggml_threading_context *ctx) {
     if (ctx->n_threads == 1) {
@@ -302,14 +302,20 @@ void ggml_threading_resume(struct ggml_threading_context *ctx) {
     // Dead lock detection.
     int counter = 0;
     int64_t last_notify_ms = 0;
-    const int max_notify_count = ctx->n_threads - 1;
-    const int max_duration_ms = 100 * max_notify_count;
+    const int max_notify_count = 50;
+    const int max_duration_ms = 1000;
 
+    if (shared->n_waiting != 0 && shared->n_waiting != ctx->n_threads - 1) {
+        fprintf(stderr,
+                "[ggml-threading] expected n_waiting is 0 or %d, actual %d, abort\n",
+                ctx->n_threads - 1, shared->n_waiting);
+        abort();
+    }
+
     ggml_spin_lock(&shared->spin);
     shared->wait_now = false;
 
     while (shared->n_waiting != 0) {
 
         GGML_ASSERT(pthread_mutex_lock(&shared->mutex) == 0);
         if (shared->n_waiting == 0) {
             GGML_ASSERT(pthread_mutex_unlock(&shared->mutex) == 0);
@@ -323,7 +329,11 @@ void ggml_threading_resume(struct ggml_threading_context *ctx) {
         GGML_ASSERT(pthread_mutex_unlock(&shared->mutex) == 0);
         last_notify_ms = ggml_time_ms();
 
-        sched_yield();
+        if (counter % 1 == 0) {
+            ggml_spin_pause();
+        } else {
+            sched_yield();
+        }
 
         int elapsed = last_notify_ms > 0 ? ggml_time_ms() - last_notify_ms : 0;
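The two functions above bracket any change to shared task state. An assumed usage pattern (hypothetical caller, not part of this commit):

// Hypothetical caller of the suspend/resume pair shown above.
static void reconfigure_tasks(struct ggml_threading_context *ctx) {
    ggml_threading_suspend(ctx); // sets wait_now, spins until all workers park
    // ... safe to mutate shared task state here ...
    ggml_threading_resume(ctx);  // clears wait_now, notifies until n_waiting == 0
}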
@@ -372,24 +382,6 @@ static void ggml_threading_setup_workers(struct ggml_threading_context *ctx,
             ggml_threading_resume(ctx);
             ggml_spin_lock(&shared->spin);
         }
 
-        if ((ctx->features & GGML_THREADING_FEATURE_WAIT_ON_DONE) > 0) {
-            // Optimize energy: wait_on_done. We MAY also check following nodes,
-            // but that's a bit complicated.
-            shared->wait_on_done = false;
-            for (int i = type + 1; i <= GGML_TASK_FINALIZE; i++) {
-                struct ggml_task_stage *next = &profile->stages[i];
-                if (next->parallel) {
-                    break;
-                }
-                if (next->wait) {
-                    shared->wait_on_done = true;
-                    PRINT_DEBUG("[main] wait_on_done is enabled for "
-                                "current task stage\n");
-                    break;
-                }
-            }
-        }
     } else if (current->wait) {
         if (shared->n_waiting < n_worker_threads) {
             ggml_spin_unlock(&ctx->shared.spin);
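The block removed above decided, per task stage, whether workers were allowed to sleep immediately after finishing. A reduced, self-contained model of that scan (the enum values and struct are simplified stand-ins for the ggml task-profile types):

#include <stdbool.h>

enum { GGML_TASK_INIT, GGML_TASK_COMPUTE, GGML_TASK_FINALIZE };
struct task_stage { bool parallel; bool wait; };

// true iff workers finishing stage `type` may sleep now, because no later
// stage runs in parallel and some later stage will wait for them anyway.
static bool should_wait_on_done(const struct task_stage *stages, int type) {
    for (int i = type + 1; i <= GGML_TASK_FINALIZE; i++) {
        if (stages[i].parallel) return false; // workers needed again soon
        if (stages[i].wait)     return true;  // main thread waits; sleep now
    }
    return false;
}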
@@ -437,17 +429,7 @@ ggml_thread_ret_t ggml_threading_graph_compute_thread(void *data) {
             state->has_work = false;
             shared->n_tasks--;
 
-            bool wait = shared->wait_on_done && !state->has_work;
-            if (wait) {
-                ggml_threading_cond_wait(state);
-            }
-
             ggml_spin_unlock(&shared->spin);
-
-            // no need to pause.
-            if (wait) {
-                continue;
-            }
         }
 
         ggml_spin_pause();
@@ -594,11 +576,11 @@ ggml_threading_start(int n_threads, ggml_threading_thread_runner *thread_runner,
         .n_tasks = 0,
         .n_waiting = 0,
         .wait_now = false,
-        .wait_on_done = false,
         .stop = false,
         .task_runner = task_runner,
         .ctx = ctx,
     };
+    atomic_flag_clear(&ctx->shared.spin);
 
     PRINT_DEBUG("[main] thread start, features: %d\n", features);
ggml-threading.h

@@ -17,8 +17,7 @@ struct ggml_threading_context;
 
 // Optional (experimental) features.
 enum ggml_threading_features {
     GGML_THREADING_FEATURE_NONE = 0,
-    GGML_THREADING_FEATURE_WAIT_ON_DONE = 1 << 0,
-    GGML_THREADING_FEATURE_PERF = 1 << 1,
+    GGML_THREADING_FEATURE_PERF = 1,
 };
 
 // The thread runner to feed into OS threads.
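With WAIT_ON_DONE gone, GGML_THREADING_FEATURE_PERF moves to bit 0 and is the only optional feature left. A sketch of a call site after this change, mirroring the ones updated in ggml-tune.c and ggml.c below (n_threads is assumed to be in scope):

// Sketch only: feature selection via bitwise OR, then pool start.
enum ggml_threading_features features = GGML_THREADING_FEATURE_NONE;
// features |= GGML_THREADING_FEATURE_PERF;  // opt in to perf collection
struct ggml_threading_context *thrd_ctx = ggml_threading_start(
    n_threads, /*thread_runner=*/NULL, /*task_runner=*/ggml_compute_forward,
    features, /*stages_time=*/NULL);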
ggml-tune.c

@@ -802,7 +802,7 @@ bool ggml_mulmat_tune_bench(struct ggml_mulmat_tune *tune,
 
     struct ggml_threading_context *thrd_ctx =
         ggml_threading_start(tune->n_threads, NULL, NULL,
-                             GGML_THREADING_FEATURE_WAIT_ON_DONE, stages_time);
+                             GGML_THREADING_FEATURE_NONE, stages_time);
 
     for (int i_shape = 0; i_shape < tune->n_shapes; i_shape++) {
         const struct ggml_mulmat_tune_shape *shape = &tune->shapes[i_shape];
ggml.c

@@ -16203,14 +16203,14 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
         }
 
         // ~ 50 us
-        //printf("=== prepare computing took %d us\n", (int)(ggml_time_us() - t0));
+        // printf("=== prepare computing took %d us\n", (int)(ggml_time_us() - t0));
     }
 
     const int64_t perf_start_cycles = ggml_perf_cycles();
     const int64_t perf_start_time_us = ggml_perf_time_us();
 
     struct ggml_threading_context *thrd_ctx = ggml_threading_start(n_threads,
-        NULL, ggml_compute_forward, GGML_THREADING_FEATURE_WAIT_ON_DONE, NULL);
+        NULL, ggml_compute_forward, GGML_THREADING_FEATURE_NONE, NULL);
 
     for (int i = 0; i < cgraph->n_nodes; i++) {
         GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, i, cgraph->n_nodes);
tests/test-ggml-threading.c

@@ -71,12 +71,7 @@ static int test_driver(int id, struct ggml_tensor *node, int n_threads) {
         work_done_arr[i] = 0;
     }
 
-    bool wait_on_done = (node->task_profile.dev_flags[0] > 0u);
-
     enum ggml_threading_features features = GGML_THREADING_FEATURE_PERF;
-    if (wait_on_done) {
-        features |= GGML_THREADING_FEATURE_WAIT_ON_DONE;
-    }
 
     int t0 = (int)ggml_time_us();
@@ -118,9 +113,9 @@ static int test_driver(int id, struct ggml_tensor *node, int n_threads) {
     }
 
     printf("\tstage-0: parallel: %d, wait: %d\n\tstage-1: parallel: %d, wait: "
-           "%d, wait_on_done: %d %s\n",
+           "%d\n",
            stages[0].parallel, stages[0].wait, stages[1].parallel,
-           stages[1].wait, wait_on_done, stages[1].wait ? "<--------" : "");
+           stages[1].wait);
 
     if (actual == expect) {
         printf("\tthreading: init %6.3f ms, compute %6.3f ms, cleanup %6.3f "
@@ -214,7 +209,7 @@ lifecycle_runner(const struct ggml_compute_params *params,
 }
 
 // Test thread lifecycle: start -> suspend -> resume -> stop
-static int test_lifecycle(bool wait_on_done) {
+static int test_lifecycle() {
     struct ggml_tensor node;
     memset(&node, 0, sizeof(struct ggml_tensor));
 
@@ -243,9 +238,7 @@ static int test_lifecycle(bool wait_on_done) {
     int threads_arr_len = sizeof(threads_arr) / sizeof(threads_arr[0]);
     int n_threads = 1;
 
-    enum ggml_threading_features features =
-        wait_on_done ? GGML_THREADING_FEATURE_NONE
-                     : GGML_THREADING_FEATURE_WAIT_ON_DONE;
+    enum ggml_threading_features features = GGML_THREADING_FEATURE_NONE;
     for (int i = 0; i < threads_arr_len; i++) {
         n_threads = threads_arr[i];
         int start_time = (int)ggml_time_ms();
@@ -351,7 +344,6 @@ int main(void) {
     // physical cores:
     // - the wait/wakeup time varies much: can be up to tens or hundreds of the
     //   average time, thus greatly punishes those small workloads.
-    // - wait_on_done is general faster than wait_now, can be 10x faster.
 
     int threads_arr[] = {1, 2, 4, 6, 8, 16, 32, 64};
     int threads_arr_len = sizeof(threads_arr) / sizeof(threads_arr[0]);
@@ -408,7 +400,7 @@ int main(void) {
         }
     }
 
-    // node.task_profile.dev_flags: byte 0 for wait_on_done, byte 1 for loops.
+    // node.task_profile.dev_flags: byte 0 unused, byte 1 for loops.
 
     for (int x = 0; x < workload_arr_len; x++) {
         node.task_profile.dev_flags[1] = workload_arr[x];
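Per that comment, dev_flags is a small per-node byte array the test reuses as ad-hoc knobs; after this commit only byte 1 carries meaning. An illustrative assignment (values hypothetical):

node.task_profile.dev_flags[0] = 0u;  // unused now (was: wait_on_done on/off)
node.task_profile.dev_flags[1] = 10u; // workload: e.g. 10 busy loops per task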
@@ -423,7 +415,7 @@ int main(void) {
                "n_threads: %2d ====\n",
                workload_arr[x], n_threads);
 
-        // multi-threads: parallel + wait_now/wait_on_done
+        // multi-threads: parallel + wait
 
         if (n_threads == 1) {
             stages[0].parallel = false;
@@ -489,22 +481,11 @@ int main(void) {
                 abort();
             }
 
-            { // disable wait_on_done
-                node.task_profile.dev_flags[0] = 0u; // wait now.
+            node.task_profile.dev_flags[0] = 0u; // wait now.
 
-                n_tests++;
-                if (test_driver(n_tests, &node, n_threads) == 0) {
-                    n_passed++;
-                }
-            }
-
-            { // enable wait_on_done
-                node.task_profile.dev_flags[0] = 1u; // wait on done
-
-                n_tests++;
-                if (test_driver(n_tests, &node, n_threads) == 0) {
-                    n_passed++;
-                }
+            n_tests++;
+            if (test_driver(n_tests, &node, n_threads) == 0) {
+                n_passed++;
             }
         }
     }
@@ -548,18 +529,12 @@ int main(void) {
     }
 
     // lifecycle.
-    for (int i = 0; i < 2; i++) {
-        bool wait_on_done = (i == 1);
-        printf("[test-ggml-threading] test lifecycle (wait_on_done = %d) ...\n",
-               wait_on_done);
-        ++n_tests;
+    printf("[test-ggml-threading] test lifecycle ...\n");
+    ++n_tests;
 
-        if (test_lifecycle(wait_on_done) == 0) {
-            ++n_passed;
-            printf("[test-ggml-threading] test lifecycle (wait_on_done = %d): "
-                   "ok\n\n",
-                   wait_on_done);
-        }
+    if (test_lifecycle() == 0) {
+        ++n_passed;
+        printf("[test-ggml-threading] test lifecycle: ok\n\n");
     }
 
     printf("[test-ggml-threading] %d/%d passed.\n", n_passed, n_tests);