threadpool: improve abort handling

Do not use threadpool->ec (exit code) to decide whether to exit the compute loop.
threadpool->ec is not atomic which makes thread-sanitizer rightfully unhappy about it.

Instead introduce atomic threadpool->abort flag used for this. This is consistent with
how we handle threadpool->stop or pause.

While at it add an explicit atomic_load for n_threads_cur for consistency.
This commit is contained in:
Max Krasnyansky 2024-09-16 15:25:20 -07:00
parent b9763b3301
commit a8095187d8

View file

@ -2013,6 +2013,7 @@ struct ggml_threadpool {
// these are atomic as an annotation for thread-sanitizer
atomic_bool stop; // Used for stopping the threadpool altogether
atomic_bool pause; // Used for pausing the threadpool or individual threads
atomic_bool abort; // Used for aborting processing of a graph
struct ggml_compute_state * workers; // per thread state
int n_threads_max; // number of threads in the pool
@ -19928,34 +19929,33 @@ struct ggml_cplan ggml_graph_plan(
static thread_ret_t ggml_graph_compute_thread(void * data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
struct ggml_threadpool * tp = state->threadpool;
const struct ggml_cgraph * cgraph = state->threadpool->cgraph;
const struct ggml_cplan * cplan = state->threadpool->cplan;
const struct ggml_cgraph * cgraph = tp->cgraph;
const struct ggml_cplan * cplan = tp->cplan;
set_numa_thread_affinity(state->ith);
struct ggml_compute_params params = {
/*.ith =*/ state->ith,
/*.nth =*/ state->threadpool->n_threads_cur,
/*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
/*.wsize =*/ cplan->work_size,
/*.wdata =*/ cplan->work_data,
/*.threadpool=*/ state->threadpool,
/*.threadpool=*/ tp,
};
for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort; node_n++) {
struct ggml_tensor * node = cgraph->nodes[node_n];
ggml_compute_forward(&params, node);
if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
state->threadpool->ec = GGML_STATUS_ABORTED;
if (state->ith == 0 && cplan->abort_callback &&
cplan->abort_callback(cplan->abort_callback_data)) {
tp->abort = true;
tp->ec = GGML_STATUS_ABORTED;
}
ggml_barrier(state->threadpool);
if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
break;
}
}
return 0;
@ -20144,6 +20144,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
threadpool->current_chunk = 0;
threadpool->stop = false;
threadpool->pause = tpp->paused;
threadpool->abort = false;
threadpool->workers = NULL;
threadpool->n_threads_max = tpp->n_threads;
threadpool->n_threads_cur = tpp->n_threads;
@ -20220,6 +20221,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
threadpool->cgraph = cgraph;
threadpool->cplan = cplan;
threadpool->current_chunk = 0;
threadpool->abort = false;
threadpool->ec = GGML_STATUS_SUCCESS;
}