lower synchronization overhead

parent b71dfe637f
commit adaad10e97

1 changed file with 111 additions and 230 deletions

ggml.c | 339
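Summary of the change, as read from the diff below: the old scheme coordinated the main thread and the workers through an n_ready counter, a has_work flag, a stop flag and a spin lock, with two barrier-style handshakes per graph node. The new scheme keeps only two atomics: n_active, a countdown of threads still working on the current node, and node_n, the index of the node being computed. The last thread to finish a node runs FINALIZE for it and INIT for the next one, then publishes the new node_n; the other threads just sched_yield() and poll node_n, and the calling thread participates as worker 0. The following is a minimal, self-contained sketch of that pattern, not the ggml code itself; N_THREADS, N_NODES, do_serial_part and do_parallel_part are illustrative stand-ins.

/*
 * Sketch of the "countdown + publish the next node" pattern introduced by this commit.
 * The last thread to decrement n_active does the serial work (in ggml: FINALIZE of the
 * previous node + INIT of the next) and publishes the next node index in node_n;
 * everyone else yields and polls node_n. No locks, no separate has_work/stop flags.
 */
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define N_THREADS 4
#define N_NODES   8

static atomic_int n_active = N_THREADS; // threads still working on the current node
static atomic_int node_n   = -1;        // index of the node currently being computed

static void do_serial_part  (int node)          { printf("serial   part of node %d\n", node); }
static void do_parallel_part(int node, int ith) { printf("parallel part of node %d on thread %d\n", node, ith); }

static void * worker(void * arg) {
    const int ith = (int)(intptr_t) arg;

    int my_node = -1; // the node this thread is currently working on

    while (1) {
        if (atomic_fetch_sub(&n_active, 1) == 1) {
            // this thread was the last one to finish my_node:
            // do the serial part of the next node, then publish it
            my_node++;
            if (my_node < N_NODES) {
                do_serial_part(my_node);
            }
            atomic_store(&n_active, N_THREADS); // re-arm the countdown first ...
            atomic_store(&node_n, my_node);     // ... then release the other threads
        } else {
            // wait for the publishing thread to advance node_n (no locks: yield and poll)
            const int last = my_node;
            do {
                sched_yield();
                my_node = atomic_load(&node_n);
            } while (my_node == last);
        }

        if (my_node >= N_NODES) {
            return NULL; // all nodes done
        }

        do_parallel_part(my_node, ith);
    }
}

int main(void) {
    pthread_t threads[N_THREADS];
    for (int i = 1; i < N_THREADS; ++i) {
        pthread_create(&threads[i], NULL, worker, (void *)(intptr_t) i);
    }
    worker((void *)(intptr_t) 0); // the calling thread participates as worker 0
    for (int i = 1; i < N_THREADS; ++i) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}

Under these assumptions the sketch builds with: cc -std=c11 -pthread sketch.c -o sketch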
@@ -3702,12 +3702,6 @@ static const char * GGML_OP_SYMBOL[GGML_OP_COUNT] = {
     "f(x,y)",
 };
 
-// only send finalize op to thread pool if it actually does something
-// currently none of them?
-static const bool GGML_OP_HAS_FINALIZE[GGML_OP_COUNT] = {
-    0
-};
-
 static_assert(GGML_OP_COUNT == 51, "GGML_OP_COUNT != 51");
 
 static_assert(sizeof(struct ggml_object)%GGML_MEM_ALIGN == 0, "ggml_object size must be a multiple of GGML_MEM_ALIGN");
@@ -14099,75 +14093,114 @@ void clear_numa_thread_affinity(void)
 // TODO: Windows etc.
 // (the linux implementation may also work on BSD, someone should test)
 void set_numa_thread_affinity(int thread_n, int n_threads) { UNUSED(thread_n); UNUSED(n_threads); }
-void clear_numa_thread_affinity() {}
+void clear_numa_thread_affinity(void) {}
 #endif
 
 struct ggml_compute_state_shared {
-    ggml_lock_t spin;
+    struct ggml_cgraph * cgraph;
+
+    int64_t perf_node_start_cycles;
+    int64_t perf_node_start_time_us;
 
     int n_threads;
 
     // synchronization primitives
-    atomic_int  n_ready;
-    atomic_bool has_work;
-    atomic_bool stop; // stop all threads
+    atomic_int n_active; // num active threads
+    atomic_int node_n;   // active graph node
 };
 
 struct ggml_compute_state {
     ggml_thread_t thrd;
-
-    struct ggml_compute_params params;
-    struct ggml_tensor * node;
+    int ith;
 
     struct ggml_compute_state_shared * shared;
 };
 
+inline void ggml_graph_compute_perf_stats_node(struct ggml_tensor * node, const struct ggml_compute_state_shared * st)
+{
+    int64_t cycles_cur  = ggml_perf_cycles()  - st->perf_node_start_cycles;
+    int64_t time_us_cur = ggml_perf_time_us() - st->perf_node_start_time_us;
+
+    node->perf_runs++;
+    node->perf_cycles  += cycles_cur;
+    node->perf_time_us += time_us_cur;
+}
+
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
+    struct ggml_cgraph * cgraph = state->shared->cgraph;
 
     const int n_threads = state->shared->n_threads;
-    set_numa_thread_affinity(state->params.ith, n_threads);
+    set_numa_thread_affinity(state->ith, n_threads);
+
+    int node_n = -1;
 
     while (true) {
-        if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) {
-            atomic_store(&state->shared->has_work, false);
-        } else {
-            while (atomic_load(&state->shared->has_work)) {
-                if (atomic_load(&state->shared->stop)) {
-                    return 0;
-                }
-                ggml_lock_lock (&state->shared->spin);
-                ggml_lock_unlock(&state->shared->spin);
-            }
-        }
-
-        atomic_fetch_sub(&state->shared->n_ready, 1);
-
-        // wait for work
-        while (!atomic_load(&state->shared->has_work)) {
-            if (atomic_load(&state->shared->stop)) {
-                return 0;
-            }
-            ggml_lock_lock (&state->shared->spin);
-            ggml_lock_unlock(&state->shared->spin);
-        }
-
-        // check if we should stop
-        if (atomic_load(&state->shared->stop)) {
-            break;
-        }
-
-        if (state->node) {
-            if (state->params.ith < state->params.nth) {
-                ggml_compute_forward(&state->params, state->node);
-            }
-
-            state->node = NULL;
+        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
+            // all other threads are finished and spinning
+            // do finalize and init here so we don't have synchronize again
+            struct ggml_compute_params params = {
+                /*.type  =*/ GGML_TASK_FINALIZE,
+                /*.ith   =*/ 0,
+                /*.nth   =*/ 0,
+                /*.wsize =*/ cgraph->work ? ggml_nbytes(cgraph->work) : 0,
+                /*.wdata =*/ cgraph->work ? cgraph->work->data : NULL,
+            };
+
+            if (node_n != -1) {
+                /* FINALIZE */
+                struct ggml_tensor * node = state->shared->cgraph->nodes[node_n];
+                params.nth = node->n_tasks;
+                ggml_compute_forward(&params, node);
+                ggml_graph_compute_perf_stats_node(node, state->shared);
+            }
+
+            // distribute new work or execute it direct if 1T
+            while (++node_n < cgraph->n_nodes) {
+                GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
+
+                struct ggml_tensor * node = cgraph->nodes[node_n];
+
+                state->shared->perf_node_start_cycles  = ggml_perf_cycles();
+                state->shared->perf_node_start_time_us = ggml_perf_time_us();
+
+                /* INIT */
+                params.type = GGML_TASK_INIT;
+                params.nth  = node->n_tasks;
+                ggml_compute_forward(&params, node);
+
+                if (node->n_tasks == 1) {
+                    // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1,
+                    // they do something more efficient than spinning (?)
+                    params.type = GGML_TASK_COMPUTE;
+                    ggml_compute_forward(&params, node);
+
+                    params.type = GGML_TASK_FINALIZE;
+                    ggml_compute_forward(&params, node);
+                    ggml_graph_compute_perf_stats_node(node, state->shared);
+                } else {
+                    break;
+                }
+            }
+
+            atomic_store(&state->shared->n_active, n_threads);
+            atomic_store(&state->shared->node_n,   node_n);
+        } else {
+            // wait for other threads to finish
+            const int last = node_n;
+            do {
+                sched_yield();
+                node_n = atomic_load(&state->shared->node_n);
+            } while (node_n == last);
+        }
+
+        // check if we should stop
+        if (node_n >= cgraph->n_nodes) break;
+
+        struct ggml_tensor * node = cgraph->nodes[node_n];
+
+        /* COMPUTE */
+        struct ggml_compute_params params = {
+            /*.type  =*/ GGML_TASK_COMPUTE,
+            /*.ith   =*/ state->ith,
+            /*.nth   =*/ node->n_tasks,
+            /*.wsize =*/ cgraph->work ? ggml_nbytes(cgraph->work) : 0,
+            /*.wdata =*/ cgraph->work ? cgraph->work->data : NULL,
+        };
+
+        if (state->ith < node->n_tasks) {
+            ggml_compute_forward(&params, node);
         } else {
             break;
         }
     }
 
     return 0;
 }
 
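A note on the hunk above (observations from the diff, not an authoritative description): the old worker loop needed two barrier-style handshakes per node (n_ready plus has_work), a spin lock around every wait, and a dedicated stop flag. The new loop has a single synchronization point per node: the last thread to decrement n_active runs FINALIZE for the node just computed and INIT for the next one, so no extra barrier is needed between those phases, and termination falls out of node_n reaching cgraph->n_nodes instead of a stop flag. Nodes with n_tasks == 1 are computed entirely by that same thread, so the other workers are never woken for them; the remaining TODO in the code is about letting waiters notice this case more cheaply than by spinning.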
@@ -14175,39 +14208,14 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
     const int n_threads = cgraph->n_threads;
 
     struct ggml_compute_state_shared state_shared = {
-        /*.spin      =*/ GGML_LOCK_INITIALIZER,
-        /*.n_threads =*/ n_threads,
-        /*.n_ready   =*/ 0,
-        /*.has_work  =*/ false,
-        /*.stop      =*/ false,
+        /*.cgraph                  =*/ cgraph,
+        /*.perf_node_start_cycles  =*/ 0,
+        /*.perf_node_start_time_us =*/ 0,
+        /*.n_threads               =*/ n_threads,
+        /*.n_active                =*/ n_threads,
+        /*.node_n                  =*/ -1,
     };
-    struct ggml_compute_state * workers = n_threads > 1 ? alloca(sizeof(struct ggml_compute_state)*(n_threads - 1)) : NULL;
-
-    // create thread pool
-    if (n_threads > 1) {
-        ggml_lock_init(&state_shared.spin);
-
-        atomic_store(&state_shared.has_work, true);
-
-        for (int j = 0; j < n_threads - 1; j++) {
-            workers[j] = (struct ggml_compute_state) {
-                .thrd   = 0,
-                .params = {
-                    .type  = GGML_TASK_COMPUTE,
-                    .ith   = j + 1,
-                    .nth   = n_threads,
-                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
-                    .wdata = cgraph->work ? cgraph->work->data : NULL,
-                },
-                .node   = NULL,
-                .shared = &state_shared,
-            };
-
-            int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
-            GGML_ASSERT(rc == 0);
-            UNUSED(rc);
-        }
-    }
+    struct ggml_compute_state * workers = alloca(sizeof(struct ggml_compute_state)*n_threads);
 
     // initialize tasks + work buffer
     {
@@ -14468,166 +14476,39 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
         }
     }
 
+    // create thread pool
+    if (n_threads > 1) {
+        for (int j = 1; j < n_threads; ++j) {
+            workers[j] = (struct ggml_compute_state) {
+                .thrd   = 0,
+                .ith    = j,
+                .shared = &state_shared,
+            };
+
+            int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
+            GGML_ASSERT(rc == 0);
+            UNUSED(rc);
+        }
+    }
+    workers[0].ith    = 0;
+    workers[0].shared = &state_shared;
+
     const int64_t perf_start_cycles  = ggml_perf_cycles();
     const int64_t perf_start_time_us = ggml_perf_time_us();
 
-    for (int i = 0; i < cgraph->n_nodes; i++) {
-        GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, i, cgraph->n_nodes);
-
-        struct ggml_tensor * node = cgraph->nodes[i];
-
-        // TODO: this could be used to avoid unnecessary computations, but it needs to be improved
-        //if (node->grad == NULL && node->perf_runs > 0) {
-        //    continue;
-        //}
-
-        const int64_t perf_node_start_cycles  = ggml_perf_cycles();
-        const int64_t perf_node_start_time_us = ggml_perf_time_us();
-
-        // INIT
-        struct ggml_compute_params params = {
-            /*.type  =*/ GGML_TASK_INIT,
-            /*.ith   =*/ 0,
-            /*.nth   =*/ node->n_tasks,
-            /*.wsize =*/ cgraph->work ? ggml_nbytes(cgraph->work) : 0,
-            /*.wdata =*/ cgraph->work ? cgraph->work->data : NULL,
-        };
-
-        ggml_compute_forward(&params, node);
-
-        // COMPUTE
-        if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            // launch thread pool
-            for (int j = 0; j < n_threads - 1; j++) {
-                workers[j].params = (struct ggml_compute_params) {
-                    .type  = GGML_TASK_COMPUTE,
-                    .ith   = j + 1,
-                    .nth   = node->n_tasks,
-                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
-                    .wdata = cgraph->work ? cgraph->work->data : NULL,
-                };
-                workers[j].node = node;
-            }
-
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-
-            while (atomic_load(&state_shared.n_ready) > 0) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            atomic_store(&state_shared.has_work, true);
-        }
-
-        params.type = GGML_TASK_COMPUTE;
-        ggml_compute_forward(&params, node);
-
-        // wait for thread pool
-        if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-
-            while (atomic_load(&state_shared.n_ready) != 0) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-        }
-
-        // FINALIZE
-        if (node->n_tasks > 1 && GGML_OP_HAS_FINALIZE[node->op]) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            // launch thread pool
-            for (int j = 0; j < n_threads - 1; j++) {
-                workers[j].params = (struct ggml_compute_params) {
-                    .type  = GGML_TASK_FINALIZE,
-                    .ith   = j + 1,
-                    .nth   = node->n_tasks,
-                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
-                    .wdata = cgraph->work ? cgraph->work->data : NULL,
-                };
-                workers[j].node = node;
-            }
-
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-
-            while (atomic_load(&state_shared.n_ready) > 0) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            atomic_store(&state_shared.has_work, true);
-        }
-
-        params.type = GGML_TASK_FINALIZE;
-        ggml_compute_forward(&params, node);
-
-        // wait for thread pool
-        if (node->n_tasks > 1 && GGML_OP_HAS_FINALIZE[node->op]) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-
-            while (atomic_load(&state_shared.n_ready) != 0) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-        }
-
-        // performance stats (node)
-        {
-            int64_t perf_cycles_cur  = ggml_perf_cycles()  - perf_node_start_cycles;
-            int64_t perf_time_us_cur = ggml_perf_time_us() - perf_node_start_time_us;
-
-            node->perf_runs++;
-            node->perf_cycles  += perf_cycles_cur;
-            node->perf_time_us += perf_time_us_cur;
-        }
-    }
+    // this is a work thread too
+    ggml_graph_compute_thread(&workers[0]);
+
+    // don't leave affinity set on the main thread
+    clear_numa_thread_affinity();
 
     // join thread pool
     if (n_threads > 1) {
-        atomic_store(&state_shared.stop, true);
-        atomic_store(&state_shared.has_work, true);
-
-        for (int j = 0; j < n_threads - 1; j++) {
+        for (int j = 1; j < n_threads; j++) {
             int rc = ggml_thread_join(workers[j].thrd, NULL);
             GGML_ASSERT(rc == 0);
             UNUSED(rc);
         }
-
-        ggml_lock_destroy(&state_shared.spin);
     }
 
     // performance stats (graph)
 
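Also visible in the last two hunks: the thread pool is now created after the task and work-buffer setup rather than before it, the workers array has n_threads entries, the spawned threads fill slots 1..n_threads-1, and the calling thread fills workers[0], runs ggml_graph_compute_thread itself, and clears its NUMA affinity afterwards. Joining the pool no longer needs the stop/has_work flags or ggml_lock_destroy; the workers exit on their own once node_n reaches the end of the graph.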