diff --git a/ggml.c b/ggml.c index cbf2d4bdd..f02d9bfee 100644 --- a/ggml.c +++ b/ggml.c @@ -7764,6 +7764,9 @@ static void ggml_compute_forward_acc_f32( bool inplace = (bool) ((int32_t *) dst->op_params)[4]; if (!inplace && (params->type == GGML_TASK_INIT)) { + if (params->ith != 0) { + return; + } // memcpy needs to be synchronized across threads to avoid race conditions. // => do it in INIT phase memcpy( @@ -9876,6 +9879,9 @@ static void ggml_compute_forward_mul_mat( #endif if (params->type == GGML_TASK_INIT) { + if (ith != 0) { + return; + } if (src1->type != vec_dot_type) { char * wdata = params->wdata; const size_t row_size = ggml_row_size(vec_dot_type, ne10); @@ -10040,6 +10046,9 @@ static void ggml_compute_forward_mul_mat_id( #define MMID_MATRIX_ROW(row_id, i1) matrix_rows[(row_id)*ne11 + (i1)] if (params->type == GGML_TASK_INIT) { + if (ith != 0) { + return; + } char * wdata = params->wdata; if (src1->type != vec_dot_type) { const size_t row_size = ggml_row_size(vec_dot_type, ne10); @@ -10225,6 +10234,9 @@ static void ggml_compute_forward_out_prod_f32( return; } #endif + if (ith != 0) { + return; + } ggml_vec_set_f32(ne0*ne1*ne2*ne3, dst->data, 0); return; } @@ -10408,6 +10420,9 @@ static void ggml_compute_forward_out_prod_q_f32( // TODO: #if defined(GGML_USE_ACCELERATE) || defined(GGML_USE_OPENBLAS) || defined(GGML_USE_CLBLAST) if (params->type == GGML_TASK_INIT) { + if (ith != 0) { + return; + } ggml_vec_set_f32(ne0*ne1*ne2*ne3, dst->data, 0); return; } @@ -10592,6 +10607,9 @@ static void ggml_compute_forward_set_f32( bool inplace = (bool) ((int32_t *) dst->op_params)[4]; if (!inplace && (params->type == GGML_TASK_INIT)) { + if (params->ith != 0) { + return; + } // memcpy needs to be synchronized across threads to avoid race conditions. // => do it in INIT phase memcpy( @@ -10916,6 +10934,9 @@ static void ggml_compute_forward_get_rows_back_f32_f16( // ggml_compute_forward_dup_same_cont(params, opt0, dst); if (params->type == GGML_TASK_INIT) { + if (params->ith != 0) { + return; + } memset(dst->data, 0, ggml_nbytes(dst)); } @@ -10950,6 +10971,9 @@ static void ggml_compute_forward_get_rows_back_f32( // ggml_compute_forward_dup_same_cont(params, opt0, dst); if (params->type == GGML_TASK_INIT) { + if (params->ith != 0) { + return; + } memset(dst->data, 0, ggml_nbytes(dst)); } @@ -11087,6 +11111,9 @@ static void ggml_compute_forward_diag_mask_f32( GGML_ASSERT(n_past >= 0); if (!inplace && (params->type == GGML_TASK_INIT)) { + if (ith != 0) { + return; + } // memcpy needs to be synchronized across threads to avoid race conditions. // => do it in INIT phase GGML_ASSERT(ggml_nelements(dst) == ggml_nelements(src0)); @@ -12057,6 +12084,9 @@ static void ggml_compute_forward_conv_transpose_1d_f16_f32( GGML_ASSERT(nb10 == sizeof(float)); if (params->type == GGML_TASK_INIT) { + if (ith != 0) { + return; + } memset(params->wdata, 0, params->wsize); // permute kernel data (src0) from (K x Cout x Cin) to (Cin x K x Cout) @@ -12151,6 +12181,9 @@ static void ggml_compute_forward_conv_transpose_1d_f32( GGML_ASSERT(nb10 == sizeof(float)); if (params->type == GGML_TASK_INIT) { + if (ith != 0) { + return; + } memset(params->wdata, 0, params->wsize); // prepare kernel data (src0) from (K x Cout x Cin) to (Cin x K x Cout) @@ -12374,6 +12407,9 @@ static void ggml_compute_forward_conv_transpose_2d( GGML_ASSERT(nb10 == sizeof(float)); if (params->type == GGML_TASK_INIT) { + if (ith != 0) { + return; + } memset(params->wdata, 0, params->wsize); // permute kernel data (src0) from (Kw x Kh x Cout x Cin) to (Cin x Kw x Kh x Cout) @@ -13980,6 +14016,9 @@ static void ggml_compute_forward_add_rel_pos_f32( const bool inplace = (bool) ((int32_t *) dst->op_params)[0]; if (!inplace && params->type == GGML_TASK_INIT) { + if (params->ith != 0) { + return; + } memcpy((char *) dst->data, (char *) src0->data, ggml_nbytes(dst)); return; } @@ -16273,8 +16312,9 @@ struct ggml_compute_state_shared { const int n_threads; // synchronization primitives - atomic_int n_active; // num active threads - atomic_int node_n; // active graph node + atomic_int n_active; // num active threads + atomic_int node_n; // active graph node + atomic_int node_task; // active graph node task phase bool (*abort_callback)(void * data); // abort ggml_graph_compute when true void * abort_callback_data; @@ -16520,6 +16560,34 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) { return n_tasks; } +static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) { + // wait for other threads to finish + const int last_node_n = * node_n; + + while (true) { + if (do_yield) { + sched_yield(); + } + + * node_n = atomic_load(&state->shared->node_n); + if (* node_n != last_node_n) break; + } +} + +static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_compute_state * state, const bool do_yield) { + // wait for other threads to finish + const int last_task_phase = * task_phase; + + while (true) { + if (do_yield) { + sched_yield(); + } + + * task_phase = atomic_load(&state->shared->node_task); + if (* task_phase != last_task_phase) break; + } +} + static thread_ret_t ggml_graph_compute_thread(void * data) { struct ggml_compute_state * state = (struct ggml_compute_state *) data; @@ -16530,7 +16598,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { set_numa_thread_affinity(state->ith, n_threads); - int node_n = -1; + int node_n = -1; + int task_phase = GGML_TASK_FINALIZE; while (true) { if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) { @@ -16571,13 +16640,13 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { params.nth = n_tasks; - /* INIT */ - if (GGML_OP_HAS_INIT[node->op]) { - params.type = GGML_TASK_INIT; - ggml_compute_forward(¶ms, node); - } - if (n_tasks == 1) { + /* INIT */ + if (GGML_OP_HAS_INIT[node->op]) { + params.type = GGML_TASK_INIT; + ggml_compute_forward(¶ms, node); + } + // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1, // they do something more efficient than spinning (?) params.type = GGML_TASK_COMPUTE; @@ -16598,38 +16667,24 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { } } - atomic_store(&state->shared->n_active, n_threads); - atomic_store(&state->shared->node_n, node_n); + task_phase = GGML_TASK_INIT; + atomic_store(&state->shared->n_active, n_threads); + atomic_store(&state->shared->node_n, node_n); + atomic_store(&state->shared->node_task, task_phase); } else { - // wait for other threads to finish - const int last = node_n; - - const bool do_yield = last < 0 || cgraph->nodes[last]->op == GGML_OP_MUL_MAT; - - while (true) { - // TODO: this sched_yield can have significant impact on the performance - either positive or negative - // depending on the workload and the operating system. - // since it is not clear what is the best approach, it should potentially become user-configurable - // ref: https://github.com/ggerganov/ggml/issues/291 - // UPD: adding the do_yield flag seems to resolve the issue universally - if (do_yield) { - sched_yield(); - } - - node_n = atomic_load(&state->shared->node_n); - if (node_n != last) break; - }; + ggml_graph_compute_thread_sync_node(&node_n, state, false); + ggml_graph_compute_thread_sync_task(&task_phase, state, false); } // check if we should stop if (node_n >= cgraph->n_nodes) break; - /* COMPUTE */ + /* INIT & COMPUTE */ struct ggml_tensor * node = cgraph->nodes[node_n]; const int n_tasks = ggml_get_n_tasks(node, n_threads); struct ggml_compute_params params = { - /*.type =*/ GGML_TASK_COMPUTE, + /*.type =*/ GGML_TASK_INIT, /*.ith =*/ state->ith, /*.nth =*/ n_tasks, /*.wsize =*/ cplan->work_size, @@ -16637,8 +16692,39 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { }; if (state->ith < n_tasks) { + if (GGML_OP_HAS_INIT[node->op]) { + ggml_compute_forward(¶ms, node); + } + } + + if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) { + task_phase = GGML_TASK_COMPUTE; + atomic_store(&state->shared->n_active, n_threads); + atomic_store(&state->shared->node_task, task_phase); + } + else { + // TODO: this sched_yield can have significant impact on the performance - either positive or negative + // depending on the workload and the operating system. + // since it is not clear what is the best approach, it should potentially become user-configurable + // ref: https://github.com/ggerganov/ggml/issues/291 + // UPD: adding the do_yield flag seems to resolve the issue universally + const bool do_yield = node_n < 0 || cgraph->nodes[node_n]->op == GGML_OP_MUL_MAT; + ggml_graph_compute_thread_sync_task(&task_phase, state, do_yield); + } + + if (state->ith < n_tasks) { + params.type = GGML_TASK_COMPUTE; ggml_compute_forward(¶ms, node); } + + if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) { + task_phase = GGML_TASK_FINALIZE; + atomic_store(&state->shared->n_active, n_threads); + atomic_store(&state->shared->node_task, task_phase); + } + else { + ggml_graph_compute_thread_sync_task(&task_phase, state, false); + } } return GGML_EXIT_SUCCESS; @@ -16850,6 +16936,7 @@ int ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) { /*.n_threads =*/ n_threads, /*.n_active =*/ n_threads, /*.node_n =*/ -1, + /*.node_task =*/ GGML_TASK_FINALIZE, /*.abort_callback =*/ NULL, /*.abort_callback_data =*/ NULL, };