remove feature flag DISABLE_GGML_COMPUTE_SPIN_V2

mqy 2023-04-10 02:37:31 +08:00
parent 9603f7f5bf
commit faa3dde7b8
2 changed files with 3 additions and 179 deletions

Makefile

@@ -31,7 +31,7 @@ endif
 #
 # keep standard at C11 and C++11
-CFLAGS = -I. -O3 -DNDEBUG -std=c11 -fPIC #-DDISABLE_GGML_COMPUTE_SPIN_V2
+CFLAGS = -I. -O3 -DNDEBUG -std=c11 -fPIC
 CXXFLAGS = -I. -I./examples -O3 -DNDEBUG -std=c++11 -fPIC
 LDFLAGS =

ggml.c

@@ -9239,28 +9239,15 @@ typedef pthread_t ggml_thread_t;
 #endif

-// To rollback quickly, set `-DDISABLE_GGML_COMPUTE_SPIN_V2` to `CFLAGS` in Makefile.
-// TODO(mqy): cleanup feature flag DISABLE_GGML_COMPUTE_SPIN_V2.
 struct ggml_compute_state_shared {
-#ifdef DISABLE_GGML_COMPUTE_SPIN_V2
-    ggml_lock_t spin;
-#endif
     int n_threads;

     // synchronization primitives
-#ifdef DISABLE_GGML_COMPUTE_SPIN_V2
-    atomic_int n_ready;
-    atomic_bool has_work;
-    atomic_bool stop; // stop all threads
-#else
     // The `flag` works as work counter + stop indicator.
-    // > 0: main thread store initial value, every worker decrease it by 1.
+    // > 0: main thread stores initial value, every worker decreases it by 1.
     // = 0: all done.
     // < 0: stop now.
     atomic_int flag;
-#endif
 };

 struct ggml_compute_state {
@@ -9275,61 +9262,16 @@ struct ggml_compute_state {
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
-#ifdef DISABLE_GGML_COMPUTE_SPIN_V2
-    const int n_threads = state->shared->n_threads;
-#endif
     while (true) {
-#ifndef DISABLE_GGML_COMPUTE_SPIN_V2
         int flag = atomic_load(&state->shared->flag);
         if (flag < 0) return NULL; // stop
         if (flag > 0) { // pending works
             if (state->node) { // my work
-                GGML_ASSERT (state->params.ith < state->params.nth);
                 ggml_compute_forward(&state->params, state->node);
                 state->node = NULL;
                 atomic_fetch_sub(&state->shared->flag, 1); // done
             }
         }
-#else
-        if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) {
-            atomic_store(&state->shared->has_work, false);
-        } else {
-            while (atomic_load(&state->shared->has_work)) {
-                if (atomic_load(&state->shared->stop)) {
-                    return 0;
-                }
-                ggml_lock_lock (&state->shared->spin);
-                ggml_lock_unlock(&state->shared->spin);
-            }
-        }
-        atomic_fetch_sub(&state->shared->n_ready, 1);
-        // wait for work
-        while (!atomic_load(&state->shared->has_work)) {
-            if (atomic_load(&state->shared->stop)) {
-                return 0;
-            }
-            ggml_lock_lock (&state->shared->spin);
-            ggml_lock_unlock(&state->shared->spin);
-        }
-        // check if we should stop
-        if (atomic_load(&state->shared->stop)) {
-            break;
-        }
-        if (state->node) {
-            if (state->params.ith < state->params.nth) {
-                ggml_compute_forward(&state->params, state->node);
-            }
-            state->node = NULL;
-        } else {
-            break;
-        }
-#endif
     }
     return 0;
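
For context, the worker loop kept by this commit reduces to a single atomic counter. The following is a minimal, self-contained sketch of that pattern, not ggml code: demo_shared, demo_state and demo_worker are invented names, and the real ggml worker also carries per-task compute params.

#include <stdatomic.h>
#include <stddef.h>

struct demo_shared {
    // > 0: number of pending worker tasks, each worker decrements it by 1
    // = 0: all work finished
    // < 0: shut down
    atomic_int flag;
};

struct demo_state {
    struct demo_shared * shared;
    void * node; // task assigned by the main thread, NULL when idle
};

static void * demo_worker(void * data) {
    struct demo_state * state = (struct demo_state *) data;
    while (1) {
        int flag = atomic_load(&state->shared->flag);
        if (flag < 0) {
            return NULL; // stop request
        }
        if (flag > 0 && state->node != NULL) {
            // ... run the task attached to state->node here ...
            state->node = NULL;
            atomic_fetch_sub(&state->shared->flag, 1); // report completion
        }
    }
}
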
@@ -9339,28 +9281,13 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
     const int n_threads = cgraph->n_threads;

     struct ggml_compute_state_shared state_shared = {
-#ifdef DISABLE_GGML_COMPUTE_SPIN_V2
-        /*.spin =*/ GGML_LOCK_INITIALIZER,
-#endif
         /*.n_threads =*/ n_threads,
-#ifndef DISABLE_GGML_COMPUTE_SPIN_V2
         /*.flag =*/ 0,
-#else
-        /*.n_ready =*/ 0,
-        /*.has_work =*/ false,
-        /*.stop =*/ false,
-#endif
     };
     struct ggml_compute_state * workers = n_threads > 1 ? alloca(sizeof(struct ggml_compute_state)*(n_threads - 1)) : NULL;

     // create thread pool
     if (n_threads > 1) {
-#ifdef DISABLE_GGML_COMPUTE_SPIN_V2
-        ggml_lock_init(&state_shared.spin);
-        atomic_store(&state_shared.has_work, true);
-#endif
         for (int j = 0; j < n_threads - 1; j++) {
             workers[j] = (struct ggml_compute_state) {
                 .thrd = 0,
@@ -9615,23 +9542,8 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
         // COMPUTE
         if (node->n_tasks > 1) {
-#ifdef DISABLE_GGML_COMPUTE_SPIN_V2
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-#endif
             // launch thread pool
-#ifndef DISABLE_GGML_COMPUTE_SPIN_V2
             for (int j = 0; j < node->n_tasks - 1; j++) {
-#else
-            for (int j = 0; j < n_threads - 1; j++) {
-#endif
                 workers[j].params = (struct ggml_compute_params) {
                     .type = GGML_TASK_COMPUTE,
                     .ith = j + 1,
@@ -9642,18 +9554,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
                 workers[j].node = node;
             }

-#ifndef DISABLE_GGML_COMPUTE_SPIN_V2
-            atomic_store(&state_shared.flag, node->n_tasks-1);
-#else
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-            while (atomic_load(&state_shared.n_ready) > 0) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-            atomic_store(&state_shared.has_work, true);
-#endif
+            atomic_store(&state_shared.flag, node->n_tasks - 1);
         }

         params.type = GGML_TASK_COMPUTE;
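
The main-thread half of the protocol is symmetric: assign the node to each worker, publish the task count, then spin until it drains to zero. Below is a hedged sketch under the same assumptions as the worker sketch above; demo_dispatch_and_wait, worker_node and n_tasks are illustrative names, not ggml identifiers.

#include <stdatomic.h>

static void demo_dispatch_and_wait(atomic_int * flag, void ** worker_node,
                                   void * node, int n_tasks) {
    // hand the node to each of the n_tasks - 1 workers before publishing work
    for (int j = 0; j < n_tasks - 1; j++) {
        worker_node[j] = node;
    }
    // publish the counter; each worker decrements it by 1 when done
    atomic_store(flag, n_tasks - 1);

    // ... the main thread would process its own slice (ith == 0) here ...

    // spin until every worker has reported completion
    while (atomic_load(flag) != 0) {
        // busy-wait, like the kept `while (atomic_load(...) != 0) {}` below
    }
}
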
@@ -9661,66 +9562,16 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
         // wait for thread pool
         if (node->n_tasks > 1) {
-#ifndef DISABLE_GGML_COMPUTE_SPIN_V2
             while (atomic_load(&state_shared.flag) != 0) {}
-#else
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-            while (atomic_load(&state_shared.n_ready) != 0) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-#endif
         }

         // FINALIZE
         if (node->n_tasks > 1) {
-#ifndef DISABLE_GGML_COMPUTE_SPIN_V2
             for (int j = 0; j < node->n_tasks-1; j++) {
                 workers[j].params.type = GGML_TASK_FINALIZE;
                 workers[j].node = node;
             }
             atomic_store(&state_shared.flag, node->n_tasks-1);
-#else
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-            // launch thread pool
-            for (int j = 0; j < n_threads - 1; j++) {
-                workers[j].params = (struct ggml_compute_params) {
-                    .type = GGML_TASK_FINALIZE,
-                    .ith = j + 1,
-                    .nth = node->n_tasks,
-                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
-                    .wdata = cgraph->work ? cgraph->work->data : NULL,
-                };
-                workers[j].node = node;
-            }
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-            while (atomic_load(&state_shared.n_ready) > 0) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-            atomic_store(&state_shared.has_work, true);
-#endif
         }

         params.type = GGML_TASK_FINALIZE;
@@ -9728,25 +9579,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
         // wait for thread pool
         if (node->n_tasks > 1) {
-#ifndef DISABLE_GGML_COMPUTE_SPIN_V2
             while (atomic_load(&state_shared.flag) != 0) {}
-#else
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
-            }
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-            atomic_fetch_sub(&state_shared.n_ready, 1);
-            while (atomic_load(&state_shared.n_ready) != 0) {
-                ggml_lock_lock (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
-            }
-#endif
         }

         // performance stats (node)
@@ -9762,22 +9595,13 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
     // join thread pool
     if (n_threads > 1) {
-#ifndef DISABLE_GGML_COMPUTE_SPIN_V2
         atomic_store(&state_shared.flag, -1);
-#else
-        atomic_store(&state_shared.stop, true);
-        atomic_store(&state_shared.has_work, true);
-#endif
         for (int j = 0; j < n_threads - 1; j++) {
             int rc = ggml_thread_join(workers[j].thrd, NULL);
             GGML_ASSERT(rc == 0);
             UNUSED(rc);
         }
-#ifdef DISABLE_GGML_COMPUTE_SPIN_V2
-        ggml_lock_destroy(&state_shared.spin);
-#endif
     }

     // performance stats (graph)
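
Shutdown reuses the same counter: a negative value makes every worker return, after which the pool is joined. A small illustrative sketch assuming POSIX threads (demo_shutdown and threads are invented names; on POSIX builds ggml_thread_join maps to pthread_join):

#include <pthread.h>
#include <stdatomic.h>

static void demo_shutdown(atomic_int * flag, pthread_t * threads, int n_threads) {
    atomic_store(flag, -1);              // < 0 tells every worker to return
    for (int j = 0; j < n_threads - 1; j++) {
        pthread_join(threads[j], NULL);  // wait for each worker to exit
    }
}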