threadpool: skip polling for unused threads
Currently all threads do N polling rounds even if only 1 thread is active (n_threads_cur == 1). This commit adds a check to skip the polling for unused threads (ith >= n_threads_cur). n_threads_cur is now an atomic_int to explicitly tell thread sanitizer that it is written from one thread and read from other threads (not a race conditions).
This commit is contained in:
parent
23e0d70bac
commit
2bd9f47800
1 changed files with 34 additions and 17 deletions
|
@ -2016,7 +2016,7 @@ struct ggml_threadpool {
|
||||||
|
|
||||||
struct ggml_compute_state * workers; // per thread state
|
struct ggml_compute_state * workers; // per thread state
|
||||||
int n_threads_max; // number of threads in the pool
|
int n_threads_max; // number of threads in the pool
|
||||||
int n_threads_cur; // number of threads used in the current graph
|
atomic_int n_threads_cur; // number of threads used in the current graph
|
||||||
|
|
||||||
int32_t prio; // Scheduling priority
|
int32_t prio; // Scheduling priority
|
||||||
uint32_t poll; // Polling level (0 - no polling)
|
uint32_t poll; // Polling level (0 - no polling)
|
||||||
|
@ -3180,7 +3180,8 @@ inline static void ggml_critical_section_start(void) {
|
||||||
|
|
||||||
#ifdef GGML_USE_OPENMP
|
#ifdef GGML_USE_OPENMP
|
||||||
static void ggml_barrier(struct ggml_threadpool * threadpool) {
|
static void ggml_barrier(struct ggml_threadpool * threadpool) {
|
||||||
if (threadpool->n_threads_cur == 1) {
|
int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
|
||||||
|
if (n_threads == 1) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3188,14 +3189,14 @@ static void ggml_barrier(struct ggml_threadpool * threadpool) {
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
static void ggml_barrier(struct ggml_threadpool * threadpool) {
|
static void ggml_barrier(struct ggml_threadpool * threadpool) {
|
||||||
if (threadpool->n_threads_cur == 1) {
|
int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
|
||||||
|
if (n_threads == 1) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_int * n_barrier = &threadpool->n_barrier;
|
atomic_int * n_barrier = &threadpool->n_barrier;
|
||||||
atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
|
atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
|
||||||
|
|
||||||
int n_threads = threadpool->n_threads_cur;
|
|
||||||
int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed);
|
int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed);
|
||||||
|
|
||||||
if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
|
if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
|
||||||
|
@ -19968,7 +19969,13 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
|
||||||
|
|
||||||
#ifndef GGML_USE_OPENMP
|
#ifndef GGML_USE_OPENMP
|
||||||
|
|
||||||
static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
|
static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
|
||||||
|
struct ggml_threadpool * threadpool = state->threadpool;
|
||||||
|
int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
|
||||||
|
return (state->ith < n_threads);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
|
||||||
struct ggml_threadpool * threadpool = state->threadpool;
|
struct ggml_threadpool * threadpool = state->threadpool;
|
||||||
|
|
||||||
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
|
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
|
||||||
|
@ -19976,7 +19983,7 @@ static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
|
||||||
// check for new graph/work
|
// check for new graph/work
|
||||||
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
|
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
|
||||||
if (new_graph != state->last_graph) {
|
if (new_graph != state->last_graph) {
|
||||||
state->pending = (state->ith < threadpool->n_threads_cur);
|
state->pending = ggml_graph_compute_thread_active(state);
|
||||||
state->last_graph = new_graph;
|
state->last_graph = new_graph;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19986,11 +19993,16 @@ static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
|
||||||
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
|
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
|
||||||
struct ggml_threadpool * threadpool = state->threadpool;
|
struct ggml_threadpool * threadpool = state->threadpool;
|
||||||
|
|
||||||
|
// Skip polling for unused threads
|
||||||
|
if (!ggml_graph_compute_thread_active(state)) {
|
||||||
|
return state->pending;
|
||||||
|
}
|
||||||
|
|
||||||
// This seems to make 0 ... 100 a decent range for polling level across modern processors.
|
// This seems to make 0 ... 100 a decent range for polling level across modern processors.
|
||||||
// Perhaps, we can adjust it dynamically based on load and things.
|
// Perhaps, we can adjust it dynamically based on load and things.
|
||||||
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
|
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
|
||||||
|
|
||||||
for (uint64_t i=0; !ggml_graph_compute_ready(state) && i<n_rounds; i++) {
|
for (uint64_t i=0; !ggml_graph_compute_thread_ready(state) && i < n_rounds; i++) {
|
||||||
// No new work. Keep polling.
|
// No new work. Keep polling.
|
||||||
ggml_thread_cpu_relax();
|
ggml_thread_cpu_relax();
|
||||||
}
|
}
|
||||||
|
@ -20006,9 +20018,9 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *
|
||||||
}
|
}
|
||||||
|
|
||||||
ggml_mutex_lock_shared(&threadpool->mutex);
|
ggml_mutex_lock_shared(&threadpool->mutex);
|
||||||
while (!ggml_graph_compute_ready(state)) {
|
while (!ggml_graph_compute_thread_ready(state)) {
|
||||||
// No new work. Wait for the signal.
|
// No new work. Wait for the signal.
|
||||||
GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
|
GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping)\n", state->ith);
|
||||||
ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
|
ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
|
||||||
}
|
}
|
||||||
ggml_mutex_unlock_shared(&threadpool->mutex);
|
ggml_mutex_unlock_shared(&threadpool->mutex);
|
||||||
|
@ -20055,12 +20067,17 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start processing new graph
|
// Start processing new graph
|
||||||
static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
|
static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads)
|
||||||
{
|
{
|
||||||
// always take the mutex here because the worker threads are doing hybrid poll/wait
|
// always take the mutex here because the worker threads are doing hybrid poll/wait
|
||||||
|
|
||||||
ggml_mutex_lock(&threadpool->mutex);
|
ggml_mutex_lock(&threadpool->mutex);
|
||||||
|
|
||||||
|
GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
|
||||||
|
|
||||||
|
// Update the number of active threads
|
||||||
|
atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
|
||||||
|
|
||||||
atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
|
atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
|
||||||
|
|
||||||
if (threadpool->pause) {
|
if (threadpool->pause) {
|
||||||
|
@ -20195,15 +20212,10 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
|
||||||
// No worker threads should be accessing the parameters below at this stage
|
// No worker threads should be accessing the parameters below at this stage
|
||||||
threadpool->cgraph = cgraph;
|
threadpool->cgraph = cgraph;
|
||||||
threadpool->cplan = cplan;
|
threadpool->cplan = cplan;
|
||||||
threadpool->n_threads_cur = n_threads;
|
|
||||||
threadpool->current_chunk = 0;
|
threadpool->current_chunk = 0;
|
||||||
threadpool->ec = GGML_STATUS_SUCCESS;
|
threadpool->ec = GGML_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (n_threads > threadpool->n_threads_max) {
|
|
||||||
GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef GGML_USE_OPENMP
|
#ifdef GGML_USE_OPENMP
|
||||||
if (n_threads > 1) {
|
if (n_threads > 1) {
|
||||||
#pragma omp parallel num_threads(n_threads)
|
#pragma omp parallel num_threads(n_threads)
|
||||||
|
@ -20212,7 +20224,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
|
||||||
{
|
{
|
||||||
// update the number of threads from the actual number of threads that we got from OpenMP
|
// update the number of threads from the actual number of threads that we got from OpenMP
|
||||||
n_threads = omp_get_num_threads();
|
n_threads = omp_get_num_threads();
|
||||||
threadpool->n_threads_cur = n_threads;
|
atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
|
ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
|
||||||
|
@ -20221,8 +20233,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
|
||||||
ggml_graph_compute_thread(&threadpool->workers[0]);
|
ggml_graph_compute_thread(&threadpool->workers[0]);
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
if (n_threads > threadpool->n_threads_max) {
|
||||||
|
GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
|
||||||
|
n_threads = threadpool->n_threads_max;
|
||||||
|
}
|
||||||
|
|
||||||
// Kick all threads to start the new graph
|
// Kick all threads to start the new graph
|
||||||
ggml_graph_compute_kickoff(threadpool);
|
ggml_graph_compute_kickoff(threadpool, n_threads);
|
||||||
|
|
||||||
// This is a work thread too
|
// This is a work thread too
|
||||||
ggml_graph_compute_thread(&threadpool->workers[0]);
|
ggml_graph_compute_thread(&threadpool->workers[0]);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue