threadpool: remove special-casing for disposable threadpools

With the efficient hybrid polling there is no need to make disposable pools any different.
This simplifies the overall logic and reduces branching.

Include n_threads in debug print for disposable threadpool.

Declare pause and stop flags as atomic_bool
This doesn't actually generate any memory barriers and simply informs
the thread sanitizer that these flags can be written & read by different
threads without locking.
This commit is contained in:
Max Krasnyansky 2024-08-12 22:18:16 -07:00 committed by fmz
parent 9d3e78c6b8
commit 538bd9f730

View file

@ -1964,16 +1964,16 @@ struct ggml_compute_threadpool {
atomic_int n_barrier_passed;
atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
volatile bool stop; // Used for stopping the threadpool altogether
volatile bool pause; // Used for pausing the threadpool or individual threads
// these are atomic as an annotation for thread-sanitizer
atomic_bool stop; // Used for stopping the threadpool altogether
atomic_bool pause; // Used for pausing the threadpool or individual threads
struct ggml_compute_state * workers; // per thread state
int32_t n_threads_max; // number of threads in the pool
int32_t n_threads_cur; // number of threads used in the current graph
int32_t prio; // Scheduling priority
bool disposable; // Doesn't initialize a conv-var
uint32_t poll; // Polling level (0 - no polling)
int32_t prio; // Scheduling priority
uint32_t poll; // Polling level (0 - no polling)
ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
void * abort_callback_data;
@ -18860,15 +18860,13 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
struct ggml_compute_state* workers = threadpool->workers;
const int32_t n_threads = threadpool->n_threads_max;
if (!threadpool->disposable) {
ggml_mutex_lock(&threadpool->mutex);
}
ggml_mutex_lock(&threadpool->mutex);
threadpool->stop = true;
threadpool->pause = false;
if (!threadpool->disposable) {
ggml_cond_broadcast(&threadpool->cond);
ggml_mutex_unlock(&threadpool->mutex);
}
ggml_cond_broadcast(&threadpool->cond);
ggml_mutex_unlock(&threadpool->mutex);
for (int32_t j = 1; j < n_threads; j++) {
int32_t rc = ggml_thread_join(workers[j].thrd, NULL);
@ -18878,10 +18876,8 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
GGML_ALIGNED_FREE(workers);
if (!threadpool->disposable) {
ggml_mutex_destroy(&threadpool->mutex);
ggml_cond_destroy(&threadpool->cond);
}
ggml_mutex_destroy(&threadpool->mutex);
ggml_cond_destroy(&threadpool->cond);
#endif // GGML_USE_OPENMP
GGML_ALIGNED_FREE(threadpool);
@ -18904,7 +18900,6 @@ static void __ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool
void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
#ifndef GGML_USE_OPENMP
GGML_ASSERT(!threadpool->disposable);
ggml_mutex_lock(&threadpool->mutex);
if (!threadpool->pause) {
__ggml_pause_threadpool(threadpool);
@ -18917,7 +18912,6 @@ void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) {
#ifndef GGML_USE_OPENMP
GGML_ASSERT(!threadpool->disposable);
ggml_mutex_lock(&threadpool->mutex);
if (threadpool->pause) {
__ggml_resume_threadpool(threadpool);
@ -18934,7 +18928,7 @@ struct ggml_cplan ggml_graph_plan(
struct ggml_compute_threadpool * threadpool) {
if (threadpool == NULL) {
GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool\n");
GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
}
if (n_threads <= 0) {
n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
@ -19143,7 +19137,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
struct ggml_compute_threadpool * threadpool = state->threadpool;
if (threadpool->stop || threadpool->pause || state->pending) { return true; }
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
// check for new graph/work
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
@ -19192,8 +19187,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
struct ggml_compute_threadpool * threadpool = state->threadpool;
GGML_ASSERT(!threadpool->disposable);
__thread_priority(threadpool->prio);
if (state->mask_specified)
__thread_affinity(state->cpumask);
@ -19209,6 +19202,7 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
GGML_PRINT_DEBUG("thread #%d resuming after wait\n", state->ith);
ggml_mutex_unlock_shared(&threadpool->mutex);
}
// This needs to be checked for after the cond_wait
if (threadpool->stop) break;
@ -19233,6 +19227,25 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
return (thread_ret_t) 0;
}
// Start processing new graph
static void ggml_graph_compute_kickoff(struct ggml_compute_threadpool * threadpool)
{
// always take the mutex here because the worker threads are doing hybrid poll/wait
ggml_mutex_lock(&threadpool->mutex);
atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
if (threadpool->pause) {
// resume does cond broadcast
__ggml_resume_threadpool(threadpool);
} else {
ggml_cond_broadcast(&threadpool->cond);
}
ggml_mutex_unlock(&threadpool->mutex);
}
#endif // GGML_USE_OPENMP
bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, const struct ggml_threadpool_params * p1) {
@ -19250,7 +19263,6 @@ bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, cons
static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
struct ggml_threadpool_params * tpp,
bool disposable,
struct ggml_cgraph * cgraph,
struct ggml_cplan * cplan) {
@ -19264,11 +19276,10 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
threadpool->n_barrier_passed = 0;
threadpool->current_chunk = 0;
threadpool->stop = false;
threadpool->pause = disposable ? false : tpp->paused;
threadpool->pause = tpp->paused;
threadpool->workers = NULL;
threadpool->n_threads_max = tpp->n_threads;
threadpool->n_threads_cur = disposable ? tpp->n_threads : 0;
threadpool->disposable = disposable;
threadpool->n_threads_cur = tpp->n_threads;
threadpool->poll = tpp->poll;
threadpool->prio = tpp->prio;
@ -19278,10 +19289,8 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
}
#ifndef GGML_USE_OPENMP
if (!disposable) {
ggml_mutex_init(&threadpool->mutex);
ggml_cond_init(&threadpool->cond);
}
ggml_mutex_init(&threadpool->mutex);
ggml_cond_init(&threadpool->cond);
#endif // GGML_USE_OPENMP
struct ggml_compute_state * workers =
@ -19316,14 +19325,12 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
__cpumask_next(tpp->cpumask, workers[j].cpumask, tpp->strict_cpu, &cpumask_iter);
}
// Disposable threadpools need to have a valid cplan and cgraph immediately.
thread_ret_t (*thread_entrypoint)(void*) = disposable ? ggml_graph_compute_thread : ggml_graph_compute_secondary_thread;
// Spin threads for all secondary workers
if (j > 0) {
int32_t rc = ggml_thread_create(
&workers[j].thrd,
NULL,
thread_entrypoint,
ggml_graph_compute_secondary_thread,
&workers[j]
);
GGML_ASSERT(rc == 0);
@ -19335,7 +19342,7 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
}
struct ggml_compute_threadpool * ggml_create_threadpool(struct ggml_threadpool_params * tpp) {
return ggml_create_threadpool_impl(tpp, false, NULL, NULL);
return ggml_create_threadpool_impl(tpp, NULL, NULL);
}
enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
@ -19349,35 +19356,35 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
bool disposable_threadpool = false;
if (threadpool == NULL) {
GGML_PRINT_DEBUG("NOTE: No threadpool was specified in this cplan. Will create a disposable threadpool\n");
GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
disposable_threadpool = true;
struct ggml_threadpool_params ttp = {
.mask_specified = false,
.n_threads = n_threads,
.prio = 0,
.poll = false,
.poll = 1,
.strict_cpu = false,
.paused = false
};
threadpool = ggml_create_threadpool_impl(&ttp, true, cgraph, cplan);
threadpool = ggml_create_threadpool_impl(&ttp, cgraph, cplan);
} else {
if (n_threads > threadpool->n_threads_max) {
GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
}
// Not a disposable threadpool:
// Reset some of the paramters that need resetting
// Reset some of the parameters that need resetting
// No worker threads should be accessing the parameters below at this stage
threadpool->cgraph = cgraph;
threadpool->cplan = cplan;
threadpool->n_threads_cur = n_threads;
threadpool->cgraph = cgraph;
threadpool->cplan = cplan;
threadpool->n_threads_cur = n_threads;
threadpool->n_barrier = 0;
threadpool->n_barrier_passed = 0;
threadpool->current_chunk = 0;
threadpool->ec = GGML_STATUS_SUCCESS;
}
if (n_threads > threadpool->n_threads_max) {
GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
}
#ifdef GGML_USE_OPENMP
if (n_threads > 1) {
#pragma omp parallel num_threads(n_threads)
@ -19403,26 +19410,15 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
ggml_graph_compute_thread(&worker);
}
#else
if (!disposable_threadpool) {
// Update main thread affinity to match the current threadpool
if (threadpool->workers[0].mask_specified) {
__thread_affinity(threadpool->workers[0].cpumask);
}
// always take the mutex here because the worker threads are doing hybrid poll/wait
ggml_mutex_lock(&threadpool->mutex);
atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
if (threadpool->pause) {
// resume does cond broadcast
__ggml_resume_threadpool(threadpool);
} else {
ggml_cond_broadcast(&threadpool->cond);
}
ggml_mutex_unlock(&threadpool->mutex);
// Update main thread affinity to match the current threadpool
if (threadpool->workers[0].mask_specified) {
__thread_affinity(threadpool->workers[0].cpumask);
}
// this is a work thread too
// Kick all threads to start the new graph
ggml_graph_compute_kickoff(threadpool);
// This is a work thread too
ggml_graph_compute_thread(&threadpool->workers[0]);
#endif