From 6b515403c80d5ccd79193dd0f9216409dad8e986 Mon Sep 17 00:00:00 2001
From: mqy
Date: Mon, 17 Apr 2023 14:28:39 +0800
Subject: [PATCH] threading: preemptive, local/global

---
 ggml.c | 458 +++++++++++++++++++++++++++++++--------------------------
 1 file changed, 248 insertions(+), 210 deletions(-)

diff --git a/ggml.c b/ggml.c
index 69974989c..523d4019a 100644
--- a/ggml.c
+++ b/ggml.c
@@ -3181,9 +3181,9 @@ struct ggml_context_container {
 //
 
 enum ggml_task_type {
-    GGML_TASK_INIT = 0,
-    GGML_TASK_COMPUTE,
-    GGML_TASK_FINALIZE,
+    GGML_TASK_INIT     = 1,
+    GGML_TASK_COMPUTE  = 2,
+    GGML_TASK_FINALIZE = 4,
 };
 
 struct ggml_compute_params {
@@ -9241,6 +9241,9 @@ static void ggml_compute_forward_map_binary(
 
 /////////////////////////////////
 
+
+
+
 static void ggml_compute_forward(struct ggml_compute_params * params, struct ggml_tensor * tensor) {
     GGML_ASSERT(params);
 
@@ -9867,112 +9870,169 @@ typedef pthread_t ggml_thread_t;
 
 #endif
 
-struct ggml_compute_state_shared {
-    ggml_lock_t spin;
+#define GGML_TASK_SPIN_PAUSE
+#define GGML_GLOBAL_THREADS
+#define GGML_MAX_THREADS 32
 
-    int n_threads;
+#if defined(__x86_64__)
+#include <emmintrin.h> // for _mm_pause
+#endif
 
-    // synchronization primitives
-    atomic_int n_ready;
-    atomic_bool has_work;
-    atomic_bool stop; // stop all threads
-};
+// Spin locking incurs a small performance penalty; spin pause eases contention.
+// On Intel macOS, with 10 threads, spin lock/pause was observed to be nearly on
+// par with master.
+static inline void ggml_spin_pause(void) {
+#ifdef GGML_TASK_SPIN_PAUSE
+#if defined(__x86_64__)
+    _mm_pause();
+#elif defined(__aarch64__)
+    __asm__ __volatile__ ("wfe");
+#endif
+#endif
+}
+
+static inline void ggml_compute_spin_lock(volatile atomic_flag * obj) {
+    while (atomic_flag_test_and_set(obj)) {
+        ggml_spin_pause();
+    }
+}
+
+static inline void ggml_compute_spin_unlock(volatile atomic_flag * obj) {
+    atomic_flag_clear(obj);
+}
 
 struct ggml_compute_state {
-    ggml_thread_t thrd;
-
     struct ggml_compute_params params;
     struct ggml_tensor * node;
-
-    struct ggml_compute_state_shared * shared;
 };
 
+struct ggml_compute_state_shared {
+    // spin lock.
+    atomic_flag spin;
+
+    // index of the next task to take.
+    atomic_int next;
+
+    // number of valid tasks in the task array; -1 stops all threads.
+    atomic_int n_task;
+
+    // task done counter.
+    atomic_int n_done;
+
+#ifdef GGML_GLOBAL_THREADS
+    // set by the main thread to park workers on the condition variable.
+    atomic_bool wait_cmd;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+#endif
+
+    // fixed-size task array.
+    struct ggml_compute_state tasks[GGML_MAX_THREADS];
+
+    // thread ids.
+    ggml_thread_t thread_ids[GGML_MAX_THREADS];
+};
+
+#ifdef GGML_GLOBAL_THREADS
+static struct ggml_compute_state_shared * state_shared = NULL;
+#endif
+
 static thread_ret_t ggml_graph_compute_thread(void * data) {
-    struct ggml_compute_state * state = (struct ggml_compute_state *) data;
-
-    const int n_threads = state->shared->n_threads;
-
+    struct ggml_compute_state_shared * shared = (struct ggml_compute_state_shared *) data;
+    struct ggml_compute_state * task = NULL;
+
     while (true) {
-        if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) {
-            atomic_store(&state->shared->has_work, false);
-        } else {
-            while (atomic_load(&state->shared->has_work)) {
-                if (atomic_load(&state->shared->stop)) {
-                    return 0;
-                }
-                ggml_lock_lock  (&state->shared->spin);
-                ggml_lock_unlock(&state->shared->spin);
-            }
-        }
-
-        atomic_fetch_sub(&state->shared->n_ready, 1);
-
-        // wait for work
-        while (!atomic_load(&state->shared->has_work)) {
-            if (atomic_load(&state->shared->stop)) {
-                return 0;
-            }
-            ggml_lock_lock  (&state->shared->spin);
-            ggml_lock_unlock(&state->shared->spin);
-        }
-
-        // check if we should stop
-        if (atomic_load(&state->shared->stop)) {
+        if (shared->n_task < 0) {
             break;
         }
 
-        if (state->node) {
-            if (state->params.ith < state->params.nth) {
-                ggml_compute_forward(&state->params, state->node);
+#ifdef GGML_GLOBAL_THREADS
+        if (shared->wait_cmd) {
+            pthread_mutex_lock(&shared->mutex);
+            if (shared->wait_cmd) {
+                pthread_cond_wait(&shared->cond, &shared->mutex);
             }
-
-            state->node = NULL;
-        } else {
-            break;
+            pthread_mutex_unlock(&shared->mutex);
         }
+#endif
+
+        ggml_compute_spin_lock(&shared->spin);
+        if (shared->next < shared->n_task) {
+            task = &shared->tasks[shared->next];
+            shared->next++;
+        }
+        ggml_compute_spin_unlock(&shared->spin);
+
+        if (task != NULL) {
+            ggml_compute_forward(&task->params, task->node);
+            shared->n_done++;
+            task = NULL;
+        }
+
+        ggml_spin_pause();
     }
-
     return 0;
 }
 
-void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) {
-    const int n_threads = cgraph->n_threads;
-
-    struct ggml_compute_state_shared state_shared = {
-        /*.spin      =*/ GGML_LOCK_INITIALIZER,
-        /*.n_threads =*/ n_threads,
-        /*.n_ready   =*/ 0,
-        /*.has_work  =*/ false,
-        /*.stop      =*/ false,
-    };
-    struct ggml_compute_state * workers = n_threads > 1 ? alloca(sizeof(struct ggml_compute_state)*(n_threads - 1)) : NULL;
-
-    // create thread pool
-    if (n_threads > 1) {
-        ggml_lock_init(&state_shared.spin);
-
-        atomic_store(&state_shared.has_work, true);
-
-        for (int j = 0; j < n_threads - 1; j++) {
-            workers[j] = (struct ggml_compute_state) {
-                .thrd   = 0,
-                .params = {
-                    .type  = GGML_TASK_COMPUTE,
-                    .ith   = j + 1,
-                    .nth   = n_threads,
-                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
-                    .wdata = cgraph->work ? cgraph->work->data : NULL,
-                },
-                .node   = NULL,
-                .shared = &state_shared,
-            };
-
-            int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
-            GGML_ASSERT(rc == 0);
-            UNUSED(rc);
-        }
+// Get the set of supported task types (bitwise OR) for a given forward op.
+// TODO: use a static map.
+static int ggml_forward_op_tasks(enum ggml_op op) {
+    switch (op) {
+        case GGML_OP_DUP:
+        case GGML_OP_ADD:
+        case GGML_OP_SUB:
+        case GGML_OP_MUL:
+        case GGML_OP_DIV:
+        case GGML_OP_SQR:
+        case GGML_OP_SQRT:
+        case GGML_OP_SUM:
+        case GGML_OP_MEAN:
+        case GGML_OP_REPEAT:
+        case GGML_OP_ABS:
+        case GGML_OP_SGN:
+        case GGML_OP_NEG:
+        case GGML_OP_STEP:
+        case GGML_OP_RELU:
+        case GGML_OP_GELU:
+        case GGML_OP_SILU:
+        case GGML_OP_NORM:
+        // case GGML_OP_RMS_NORM: // ??
+        case GGML_OP_SCALE:
+        case GGML_OP_CPY:
+        case GGML_OP_CONT:
+        case GGML_OP_ROPE: // ??
+            return GGML_TASK_COMPUTE;
+        case GGML_OP_RESHAPE:
+            return 0;
+        case GGML_OP_VIEW:
+            return 0;
+        case GGML_OP_PERMUTE:
+            return 0;
+        case GGML_OP_TRANSPOSE:
+            return 0;
+        case GGML_OP_GET_ROWS: // ??
+            return GGML_TASK_COMPUTE;
+        case GGML_OP_DIAG_MASK_INF:
+            return GGML_TASK_COMPUTE;
+        case GGML_OP_SOFT_MAX:
+            return GGML_TASK_COMPUTE;
+        case GGML_OP_MUL_MAT:
+        case GGML_OP_CONV_1D_1S:
+        case GGML_OP_CONV_1D_2S:
+        case GGML_OP_FLASH_ATTN:
+        case GGML_OP_FLASH_FF:
+            return GGML_TASK_COMPUTE | GGML_TASK_INIT | GGML_TASK_FINALIZE;
+        case GGML_OP_NONE:
+            return 0;
+        case GGML_OP_COUNT:
+            return 0;
+        default:
+            break;
     }
+    // unknown ops: assume all phases are needed.
+    return GGML_TASK_COMPUTE | GGML_TASK_INIT | GGML_TASK_FINALIZE;
+}
+
+void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) {
+    int n_threads = cgraph->n_threads;
+    GGML_ASSERT(n_threads <= GGML_MAX_THREADS);
 
     // initialize tasks + work buffer
     {
         size_t work_size = 0;
@@ -10184,143 +10244,116 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
         }
     }
 
+#ifndef GGML_GLOBAL_THREADS
+    struct ggml_compute_state_shared * state_shared = NULL;
+#endif
+
+    if (n_threads > 1 && state_shared == NULL) {
+        const size_t len = sizeof(struct ggml_compute_state_shared);
+        state_shared = malloc(len);
+        memset(state_shared, 0, len);
+#ifdef GGML_GLOBAL_THREADS
+        pthread_mutex_init(&state_shared->mutex, NULL);
+        pthread_cond_init(&state_shared->cond, NULL);
+        state_shared->wait_cmd = true;
+#endif
+        for (int j = 0; j < n_threads - 1; j++) {
+            int rc = ggml_thread_create(&state_shared->thread_ids[j], NULL, ggml_graph_compute_thread, state_shared);
+            GGML_ASSERT(rc == 0);
+        }
+    }
+
+#ifdef GGML_GLOBAL_THREADS
+    // wake up the worker threads.
+    if (n_threads > 1) {
+        pthread_mutex_lock(&state_shared->mutex);
+        state_shared->wait_cmd = false;
+        pthread_cond_broadcast(&state_shared->cond);
+        pthread_mutex_unlock(&state_shared->mutex);
+    }
+#endif
+
+#ifdef GGML_PERF
     const int64_t perf_start_cycles  = ggml_perf_cycles();
     const int64_t perf_start_time_us = ggml_perf_time_us();
+#endif
 
     for (int i = 0; i < cgraph->n_nodes; i++) {
         GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, i, cgraph->n_nodes);
 
         struct ggml_tensor * node = cgraph->nodes[i];
 
+        int op_task_types = ggml_forward_op_tasks(node->op);
+        if (op_task_types == 0) {
+            continue;
+        }
+
         // TODO: this could be used to avoid unnecessary computations, but it needs to be improved
         //if (node->grad == NULL && node->perf_runs > 0) {
         //    continue;
         //}
 
+#ifdef GGML_PERF
         const int64_t perf_node_start_cycles  = ggml_perf_cycles();
         const int64_t perf_node_start_time_us = ggml_perf_time_us();
+#endif
+
+        enum ggml_task_type type = GGML_TASK_INIT;
+        struct ggml_compute_params params;
 
         // INIT
-        struct ggml_compute_params params = {
-            /*.type  =*/ GGML_TASK_INIT,
-            /*.ith   =*/ 0,
-            /*.nth   =*/ node->n_tasks,
-            /*.wsize =*/ cgraph->work ? ggml_nbytes(cgraph->work) : 0,
-            /*.wdata =*/ cgraph->work ? cgraph->work->data : NULL,
-        };
-
-        ggml_compute_forward(&params, node);
-
-        // COMPUTE
-        if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            // launch thread pool
-            for (int j = 0; j < n_threads - 1; j++) {
-                workers[j].params = (struct ggml_compute_params) {
-                    .type  = GGML_TASK_COMPUTE,
-                    .ith   = j + 1,
-                    .nth   = node->n_tasks,
-                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
-                    .wdata = cgraph->work ? cgraph->work->data : NULL,
-                };
-                workers[j].node = node;
-            }
-
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-
-            while (atomic_load(&state_shared.n_ready) > 0) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            atomic_store(&state_shared.has_work, true);
+        if (op_task_types & type) {
+            params = (struct ggml_compute_params) {
+                .type  = type,
+                .ith   = 0,
+                .nth   = node->n_tasks,
+                .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
+                .wdata = cgraph->work ? cgraph->work->data : NULL,
+            };
+            ggml_compute_forward(&params, node);
         }
 
-        params.type = GGML_TASK_COMPUTE;
-        ggml_compute_forward(&params, node);
+        int n = node->n_tasks - 1;
 
-        // wait for thread pool
-        if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-
-            while (atomic_load(&state_shared.n_ready) != 0) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-        }
-
-        // FINALIZE
-        if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            // launch thread pool
-            for (int j = 0; j < n_threads - 1; j++) {
-                workers[j].params = (struct ggml_compute_params) {
-                    .type  = GGML_TASK_FINALIZE,
-                    .ith   = j + 1,
-                    .nth   = node->n_tasks,
-                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
-                    .wdata = cgraph->work ? cgraph->work->data : NULL,
-                };
-                workers[j].node = node;
-            }
-
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-
-            while (atomic_load(&state_shared.n_ready) > 0) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            atomic_store(&state_shared.has_work, true);
-        }
-
-        params.type = GGML_TASK_FINALIZE;
-        ggml_compute_forward(&params, node);
-
-        // wait for thread pool
-        if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-
-            while (atomic_load(&state_shared.n_ready) != 0) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
+        // COMPUTE and FINALIZE.
+        for (int k = 0; k < 2; k++) {
+            type = k == 0 ? GGML_TASK_COMPUTE : GGML_TASK_FINALIZE;
+            if (op_task_types & type) {
+                if (n > 0) {
+                    // fill the task array first, then publish it under the lock.
+                    for (int j = 0; j < n; j++) {
+                        state_shared->tasks[j] = (struct ggml_compute_state) {
+                            .params = {
+                                .type  = type,
+                                .ith   = j + 1,
+                                .nth   = node->n_tasks,
+                                .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
+                                .wdata = cgraph->work ? cgraph->work->data : NULL,
+                            },
+                            .node = node,
+                        };
+                    }
+
+                    ggml_compute_spin_lock(&state_shared->spin);
+                    state_shared->next   = 0;
+                    state_shared->n_task = n;
+                    ggml_compute_spin_unlock(&state_shared->spin);
+                }
+
+                // the main thread computes its own slice.
+                params.type  = type;
+                params.ith   = 0;
+                params.nth   = node->n_tasks;
+                params.wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0;
+                params.wdata = cgraph->work ? cgraph->work->data : NULL;
+                ggml_compute_forward(&params, node);
+
+                // wait for all tasks to finish.
+                if (n > 0) {
+                    while (state_shared->n_done != n) {
+                        ggml_spin_pause();
+                    }
+                    state_shared->n_done = 0;
+                }
             }
         }
 
+#ifdef GGML_PERF
         // performance stats (node)
         {
             int64_t perf_cycles_cur  = ggml_perf_cycles() - perf_node_start_cycles;
@@ -10330,22 +10363,26 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
             }
             node->perf_cycles  += perf_cycles_cur;
             node->perf_time_us += perf_time_us_cur;
         }
+#endif
     }
 
-    // join thread pool
     if (n_threads > 1) {
-        atomic_store(&state_shared.stop, true);
-        atomic_store(&state_shared.has_work, true);
-
+#ifdef GGML_GLOBAL_THREADS
+        // park the worker threads until the next graph.
+        pthread_mutex_lock(&state_shared->mutex);
+        state_shared->wait_cmd = true;
+        pthread_mutex_unlock(&state_shared->mutex);
+#else
+        // join thread pool
+        state_shared->n_task = -1;
         for (int j = 0; j < n_threads - 1; j++) {
-            int rc = ggml_thread_join(workers[j].thrd, NULL);
+            int rc = ggml_thread_join(state_shared->thread_ids[j], NULL);
             GGML_ASSERT(rc == 0);
-            UNUSED(rc);
         }
-
-        ggml_lock_destroy(&state_shared.spin);
+        free(state_shared);
+#endif
     }
 
+#ifdef GGML_PERF
     // performance stats (graph)
     {
         int64_t perf_cycles_cur  = ggml_perf_cycles() - perf_start_cycles;
@@ -10362,6 +10399,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
                 (double) perf_time_us_cur / 1000.0,
                 (double) cgraph->perf_time_us / 1000.0 / cgraph->perf_runs);
     }
+#endif
 }
 
 void ggml_graph_reset(struct ggml_cgraph * cgraph) {
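
Note on the task-type values: making ggml_task_type powers of two lets
ggml_forward_op_tasks() describe an op's supported phases as a single bitmask
that the scheduler tests per phase. A toy, standalone illustration (the enum
values mirror the patch; the two op masks are examples, not the full table):

#include <stdio.h>

enum task_type {
    TASK_INIT     = 1,
    TASK_COMPUTE  = 2,
    TASK_FINALIZE = 4,
};

int main(void) {
    const int mul_mat_tasks = TASK_INIT | TASK_COMPUTE | TASK_FINALIZE; // like GGML_OP_MUL_MAT
    const int scale_tasks   = TASK_COMPUTE;                             // like GGML_OP_SCALE

    for (int t = TASK_INIT; t <= TASK_FINALIZE; t <<= 1) {
        printf("phase %d: mul_mat=%s scale=%s\n", t,
               (mul_mat_tasks & t) ? "yes" : "no",
               (scale_tasks   & t) ? "yes" : "no");
    }
    return 0;
}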
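
Note on the preemptive queue: the heart of the patch is a fixed task array
guarded by an atomic_flag spin lock; workers claim tasks by bumping `next`,
the dispatcher publishes a batch under the lock, computes its own slice, and
then waits on the `n_done` counter. A minimal, self-contained sketch of that
pattern, assuming C11 atomics and POSIX threads (the names and the printf
"work" are invented for illustration, not taken from the patch):

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define N_WORKERS 3
#define N_TASKS   4

struct task {
    int ith;
    int nth;
};

struct shared {
    atomic_flag spin;    // guards tasks, next and n_task
    atomic_int  next;    // index of the next task to claim
    atomic_int  n_task;  // number of valid tasks; -1 stops the workers
    atomic_int  n_done;  // completed-task counter
    struct task tasks[N_TASKS];
};

static void spin_lock(volatile atomic_flag * f) {
    while (atomic_flag_test_and_set(f)) { /* spin; a real pool would pause */ }
}

static void spin_unlock(volatile atomic_flag * f) {
    atomic_flag_clear(f);
}

static void * worker(void * arg) {
    struct shared * s = arg;
    while (atomic_load(&s->n_task) >= 0) { // -1 is the stop signal
        struct task * t = NULL;
        spin_lock(&s->spin);
        if (atomic_load(&s->next) < atomic_load(&s->n_task)) {
            t = &s->tasks[atomic_fetch_add(&s->next, 1)];
        }
        spin_unlock(&s->spin);
        if (t) {
            printf("did slice %d/%d\n", t->ith, t->nth); // stand-in for ggml_compute_forward
            atomic_fetch_add(&s->n_done, 1);
        }
    }
    return NULL;
}

int main(void) {
    struct shared s = { .spin = ATOMIC_FLAG_INIT };
    pthread_t tid[N_WORKERS];
    for (int i = 0; i < N_WORKERS; i++) {
        pthread_create(&tid[i], NULL, worker, &s);
    }

    // dispatch one batch: fill the array first, then publish it under the lock
    for (int j = 0; j < N_TASKS; j++) {
        s.tasks[j] = (struct task) { .ith = j + 1, .nth = N_TASKS + 1 };
    }
    spin_lock(&s.spin);
    atomic_store(&s.next, 0);
    atomic_store(&s.n_task, N_TASKS);
    spin_unlock(&s.spin);

    // wait for the batch, as the main thread does after computing its own slice
    while (atomic_load(&s.n_done) != N_TASKS) { /* spin */ }
    atomic_store(&s.n_done, 0);

    atomic_store(&s.n_task, -1); // stop and join
    for (int i = 0; i < N_WORKERS; i++) {
        pthread_join(tid[i], NULL);
    }
    return 0;
}

Build with something like `cc -std=c11 -pthread sketch.c`. Because each batch
is drained (n_done reaches n) before the next one is published, the per-node
INIT/COMPUTE/FINALIZE phases stay ordered without a separate barrier per phase.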
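
Note on GGML_GLOBAL_THREADS: between graph computations the global pool is
parked on a condition variable instead of being joined, so repeated calls to
ggml_graph_compute() reuse the same threads. A stripped-down sketch of the
park/wake handshake, assuming POSIX threads (the `stop` flag and the usleep
timing are invented for the sketch; the patch signals shutdown through n_task
instead, and its non-global build joins the pool):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static atomic_bool wait_cmd = true;   // park command, as in the patch
static atomic_bool stop     = false;  // invented: the patch stops via n_task
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;

static void * worker(void * arg) {
    (void) arg;
    while (!atomic_load(&stop)) {
        if (atomic_load(&wait_cmd)) {
            pthread_mutex_lock(&mutex);
            // re-check under the mutex so a broadcast cannot be missed
            while (atomic_load(&wait_cmd) && !atomic_load(&stop)) {
                pthread_cond_wait(&cond, &mutex);
            }
            pthread_mutex_unlock(&mutex);
            continue;
        }
        // ... poll the task queue here, as in ggml_graph_compute_thread ...
    }
    return NULL;
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);

    // wake the pool for a computation
    pthread_mutex_lock(&mutex);
    atomic_store(&wait_cmd, false);
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mutex);

    usleep(1000); // stand-in for computing a graph

    // park the pool again until the next graph
    pthread_mutex_lock(&mutex);
    atomic_store(&wait_cmd, true);
    pthread_mutex_unlock(&mutex);

    // shut down (invented for the sketch)
    pthread_mutex_lock(&mutex);
    atomic_store(&stop, true);
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mutex);
    pthread_join(tid, NULL);

    puts("pool parked and joined");
    return 0;
}

Broadcasting while holding the mutex is what makes the wakeup race-free: a
worker either sees wait_cmd flip before sleeping, or is already inside
pthread_cond_wait and is woken by the broadcast.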