diff --git a/ggml.c b/ggml.c
index 1e0713f82..5abd6180e 100644
--- a/ggml.c
+++ b/ggml.c
@@ -1719,6 +1719,59 @@ static inline void __lsx_f16x4_store(ggml_fp16_t *x, __m128 y) {
 #define GGML_F16_ARR (GGML_F16_STEP/GGML_F16_EPR)
 #endif

+//
+// synchronization primitives
+//
+
+struct ggml_once {
+    atomic_int state;
+};
+
+struct ggml_barrier {
+    atomic_uint phase;
+    atomic_int count;
+};
+
+void ggml_once(struct ggml_once * once, void init(void)) {
+    uint32_t old = atomic_load_explicit(&once->state, memory_order_acquire);
+    if (!old && atomic_compare_exchange_strong_explicit(&once->state, &old, 1,
+                                                        memory_order_acquire,
+                                                        memory_order_relaxed)) {
+        init();
+        atomic_store_explicit(&once->state, 2, memory_order_release);
+        return;
+    }
+    while (old == 1) {
+        old = atomic_load_explicit(&once->state, memory_order_acquire);
+    }
+}
+
+int ggml_delay(int backoff) {
+    if (backoff < 12) {
+        volatile int i;
+        for (i = 0; i != 1 << backoff; i++) {
+        }
+        backoff++;
+    } else {
+        sched_yield();
+    }
+    return backoff;
+}
+
+// creates barrier and blocks until all threads call this
+void ggml_syncthreads(struct ggml_barrier * b, int nth) {
+    unsigned phase = atomic_load_explicit(&b->phase, memory_order_relaxed);
+    if (atomic_fetch_add_explicit(&b->count, 1, memory_order_acq_rel) + 1 == nth) {
+        atomic_store_explicit(&b->count, 0, memory_order_relaxed);
+        atomic_store_explicit(&b->phase, phase + 1, memory_order_release);
+    } else {
+        int backoff = 0;
+        while (atomic_load_explicit(&b->phase, memory_order_acquire) == phase) {
+            backoff = ggml_delay(backoff);
+        }
+    }
+}
+
 //
 // fundamental operations
 //
@@ -2783,7 +2836,6 @@ static void ggml_setup_op_has_task_pass(void) {
         bool * p = GGML_OP_HAS_INIT;

         p[GGML_OP_ACC                    ] = true;
-        p[GGML_OP_MUL_MAT                ] = true;
         p[GGML_OP_MUL_MAT_ID             ] = true;
         p[GGML_OP_OUT_PROD               ] = true;
         p[GGML_OP_SET                    ] = true;
@@ -12321,7 +12373,7 @@ static void ggml_compute_forward_mul_mat(

 #if defined(GGML_USE_CLBLAST)
     if (ggml_cl_can_mul_mat(src0, src1, dst)) {
-        if (params->ith == 0 && params->type == GGML_TASK_TYPE_COMPUTE) {
+        if (params->ith == 0) {
             ggml_cl_mul_mat(src0, src1, dst, params->wdata, params->wsize);
         }
         return;
@@ -12334,31 +12386,25 @@ static void ggml_compute_forward_mul_mat(
         const size_t desired_wsize = ne13*ne12*ne_plane*sizeof(float);
         UNUSED(desired_wsize);

-        if (params->type == GGML_TASK_TYPE_INIT) {
-            if (type != GGML_TYPE_F32) {
-                assert(params->wsize >= desired_wsize);
-                // parallelize by src0 rows
-                for (int64_t i13 = 0; i13 < ne13; i13++) {
-                    for (int64_t i12 = 0; i12 < ne12; i12++) {
-                        // broadcast src0 into src1 across 2nd,3rd dimension
-                        const int64_t i03 = i13/r3;
-                        const int64_t i02 = i12/r2;
+        if (type != GGML_TYPE_F32) {
+            assert(params->wsize >= desired_wsize);
+            // parallelize by src0 rows
+            for (int64_t i13 = 0; i13 < ne13; i13++) {
+                for (int64_t i12 = 0; i12 < ne12; i12++) {
+                    // broadcast src0 into src1 across 2nd,3rd dimension
+                    const int64_t i03 = i13/r3;
+                    const int64_t i02 = i12/r2;

-                        const void * x = (char *) src0->data + i02*nb02 + i03*nb03;
-                        float * const wdata = (float *) params->wdata + i13*ne12*ne_plane + i12*ne_plane;
-                        ggml_to_float_t const to_float = type_traits[type].to_float;
+                    const void * x = (char *) src0->data + i02*nb02 + i03*nb03;
+                    float * const wdata = (float *) params->wdata + i13*ne12*ne_plane + i12*ne_plane;
+                    ggml_to_float_t const to_float = type_traits[type].to_float;

-                        for (int64_t i01 = ith; i01 < ne01; i01 += nth) {
-                            to_float((const char *) x + i01*nb01, wdata + i01*ne00, ne00);
-                        }
+                    for (int64_t i01 = ith; i01 < ne01; i01 += nth) {
+                        to_float((const char *) x + i01*nb01, wdata + i01*ne00, ne00);
                     }
                 }
             }
-            return;
-        }
-
-        if (params->type == GGML_TASK_TYPE_FINALIZE) {
-            return;
+            ggml_syncthreads(params->barrier, params->nth);
         }

         // perform sgemm, parallelization controlled by blas lib
@@ -12417,32 +12463,29 @@ static void ggml_compute_forward_mul_mat(
 UseGgmlGemm1:;
 #endif

-    if (params->type == GGML_TASK_TYPE_INIT) {
-        if (ith != 0) {
-            return;
-        }
-        if (src1->type != vec_dot_type) {
-            char * wdata = params->wdata;
-            const size_t row_size = ggml_row_size(vec_dot_type, ne10);
+    if (src1->type != vec_dot_type) {
+        char * wdata = params->wdata;
+        const size_t row_size = ggml_row_size(vec_dot_type, ne10);

-            assert(params->wsize >= ne11*ne12*ne13*row_size);
-            GGML_ASSERT(src1->type == GGML_TYPE_F32);
+        assert(params->wsize >= ne11*ne12*ne13*row_size);
+        GGML_ASSERT(src1->type == GGML_TYPE_F32);

-            for (int64_t i13 = 0; i13 < ne13; ++i13) {
-                for (int64_t i12 = 0; i12 < ne12; ++i12) {
-                    for (int64_t i11 = 0; i11 < ne11; ++i11) {
+        int chore = 0;
+        for (int64_t i13 = 0; i13 < ne13; ++i13) {
+            for (int64_t i12 = 0; i12 < ne12; ++i12) {
+                for (int64_t i11 = 0; i11 < ne11; ++i11) {
+                    if (chore == ith) {
                         from_float_to_vec_dot((float *)((char *) src1->data + i13*nb13 + i12*nb12 + i11*nb11), (void *) wdata, ne10);
-                        wdata += row_size;
                     }
+                    if (++chore == nth) {
+                        chore = 0;
+                    }
+                    wdata += row_size;
                 }
             }
         }
-        return;
-    }
-
-    if (params->type == GGML_TASK_TYPE_FINALIZE) {
-        return;
+        ggml_syncthreads(params->barrier, params->nth);
     }

     const void * wdata = (src1->type == vec_dot_type) ? src1->data : params->wdata;
@@ -19557,6 +19600,7 @@ struct ggml_compute_state_shared {
     atomic_int n_active;  // num active threads
     atomic_int node_n;    // active graph node
     atomic_int node_task; // active graph node task phase
+    struct ggml_barrier barrier;

     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void * abort_callback_data;
@@ -19882,11 +19926,12 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
             // all other threads are finished and spinning
             // do finalize and init here so we don't have synchronize again
             struct ggml_compute_params params = {
-                /*.type  =*/ GGML_TASK_TYPE_FINALIZE,
-                /*.ith   =*/ 0,
-                /*.nth   =*/ 0,
-                /*.wsize =*/ cplan->work_size,
-                /*.wdata =*/ cplan->work_data,
+                /*.type    =*/ GGML_TASK_TYPE_FINALIZE,
+                /*.ith     =*/ 0,
+                /*.nth     =*/ 0,
+                /*.wsize   =*/ cplan->work_size,
+                /*.wdata   =*/ cplan->work_data,
+                /*.barrier =*/ &state->shared->barrier,
             };

             if (node_n != -1) {
@@ -19954,11 +19999,12 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
         const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);

         struct ggml_compute_params params = {
-            /*.type  =*/ GGML_TASK_TYPE_INIT,
-            /*.ith   =*/ state->ith,
-            /*.nth   =*/ n_tasks,
-            /*.wsize =*/ cplan->work_size,
-            /*.wdata =*/ cplan->work_data,
+            /*.type    =*/ GGML_TASK_TYPE_INIT,
+            /*.ith     =*/ state->ith,
+            /*.nth     =*/ n_tasks,
+            /*.wsize   =*/ cplan->work_size,
+            /*.wdata   =*/ cplan->work_data,
+            /*.barrier =*/ &state->shared->barrier,
         };

         if (state->ith < n_tasks) {
@@ -20233,6 +20279,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         /*.n_active            =*/ n_threads,
         /*.node_n              =*/ -1,
         /*.node_task           =*/ GGML_TASK_TYPE_FINALIZE,
+        /*.barrier             =*/ {0, 0},
         /*.abort_callback      =*/ NULL,
         /*.abort_callback_data =*/ NULL,
     };
diff --git a/ggml.h b/ggml.h
index 35ac9110c..c9e6dc738 100644
--- a/ggml.h
+++ b/ggml.h
@@ -680,6 +680,12 @@ extern "C" {
         GGML_TASK_TYPE_FINALIZE,
     };

+    struct ggml_once;
+    struct ggml_barrier;
+    int ggml_delay(int backoff);
+    void ggml_syncthreads(struct ggml_barrier * b, int nth);
+    void ggml_once(struct ggml_once * once, void init(void));
+
     struct ggml_compute_params {
         enum ggml_task_type type;

@@ -689,6 +695,8 @@ extern "C" {
         // work buffer for all threads
         size_t wsize;
         void * wdata;
+
+        struct ggml_barrier *barrier;
     };

     // numa strategies
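
For reference, here is a minimal, self-contained sketch (not part of the patch) of the same counter-and-phase barrier technique that ggml_syncthreads() implements above: every arriving thread increments a shared counter, the last arrival resets the counter and advances the phase, and the other threads spin until they observe the phase change. All names here (toy_barrier, toy_sync, worker, NTHREADS, ROUNDS) are illustrative only; the sketch assumes a C11 compiler with <stdatomic.h> and POSIX threads (build with e.g. cc -std=c11 -pthread).

// toy_barrier.c - standalone illustration of a reusable phase/count barrier
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NTHREADS 4
#define ROUNDS   3

struct toy_barrier {
    atomic_uint phase;
    atomic_int  count;
};

static struct toy_barrier g_bar;   // zero-initialized, like the {0, 0} initializer in the patch
static int g_sum[ROUNDS];          // written by one thread before the barrier, read by all after it

static void toy_sync(struct toy_barrier *b, int nth) {
    unsigned phase = atomic_load_explicit(&b->phase, memory_order_relaxed);
    if (atomic_fetch_add_explicit(&b->count, 1, memory_order_acq_rel) + 1 == nth) {
        // last thread to arrive: reset the counter and release the others
        atomic_store_explicit(&b->count, 0, memory_order_relaxed);
        atomic_store_explicit(&b->phase, phase + 1, memory_order_release);
    } else {
        // everyone else spins on the phase, not the counter
        while (atomic_load_explicit(&b->phase, memory_order_acquire) == phase) {
            // a production version would back off here, cf. ggml_delay()
        }
    }
}

static void *worker(void *arg) {
    int ith = (int)(long)arg;
    for (int r = 0; r < ROUNDS; r++) {
        if (ith == 0) {
            g_sum[r] = r * 100;    // "init"-style work done by a single thread
        }
        toy_sync(&g_bar, NTHREADS);                      // all threads now see g_sum[r]
        printf("thread %d sees g_sum[%d] = %d\n", ith, r, g_sum[r]);
        toy_sync(&g_bar, NTHREADS);                      // barrier is reusable across rounds
    }
    return NULL;
}

int main(void) {
    pthread_t t[NTHREADS];
    for (long i = 0; i < NTHREADS; i++) {
        pthread_create(&t[i], NULL, worker, (void *)i);
    }
    for (int i = 0; i < NTHREADS; i++) {
        pthread_join(t[i], NULL);
    }
    return 0;
}

Because waiters key off the phase rather than the counter, the barrier can be reused immediately in the next round; this appears to be what lets the MUL_MAT paths in the patch replace the old INIT/FINALIZE handshake with a single ggml_syncthreads() call between the conversion pass and the sgemm/vec_dot pass.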