llama : add pipeline parallelism support (#6017)

* llama : add pipeline parallelism support for batch processing with multiple CUDA GPUs

ggml-ci

* server : add -ub, --ubatch-size parameter

* fix server embedding test

* llama : fix Mamba inference for pipeline parallelism

Tested to work correctly with both `main` and `parallel` examples.

* llama : limit max batch size to n_batch

* add LLAMA_SCHED_MAX_COPIES to configure the number of input copies for pipeline parallelism
default increase to 4 (from 2)

changing this value may improve performance for some systems, but increases memory usage

* fix hip build

* fix sycl build (disable cpy_tensor_async)

* fix hip build

* llama : limit n_batch and n_ubatch to n_ctx during context creation

* llama : fix norm backend

* batched-bench : sync after decode

* swiftui : sync after decode

* ggml : allow ggml_get_rows to use multiple threads if they are available

* check n_ubatch >= n_tokens with non-casual attention

* llama : do not limit n_batch to n_ctx with non-casual attn

* server : construct batch with size of llama_n_batch

* ggml_backend_cpu_graph_compute : fix return value when alloc fails

* llama : better n_batch and n_ubatch comment

* fix merge

* small fix

* reduce default n_batch to 2048

---------

Co-authored-by: Francis Couture-Harpin <git@compilade.net>
Co-authored-by: Georgi Gerganov <ggerganov@gmail.com>
This commit is contained in:
slaren 2024-03-13 18:54:21 +01:00 committed by GitHub
parent d8fd0ccf6a
commit f30ea47a87
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1467 additions and 887 deletions

113
ggml.c
View file

@ -11560,8 +11560,6 @@ static void ggml_compute_forward_get_rows_q(
const struct ggml_tensor * src0 = dst->src[0];
const struct ggml_tensor * src1 = dst->src[1];
assert(params->ith == 0);
if (params->type == GGML_TASK_TYPE_INIT || params->type == GGML_TASK_TYPE_FINALIZE) {
return;
}
@ -11569,7 +11567,7 @@ static void ggml_compute_forward_get_rows_q(
GGML_TENSOR_BINARY_OP_LOCALS
const int64_t nc = ne00;
const int64_t nr = ggml_nelements(src1); GGML_UNUSED(nr);
const int64_t nr = ggml_nelements(src1);
const enum ggml_type type = src0->type;
ggml_to_float_t const dequantize_row_q = type_traits[type].to_float;
@ -11579,17 +11577,25 @@ static void ggml_compute_forward_get_rows_q(
assert(nb00 == ggml_type_size(type));
assert(ggml_nrows(dst) == nr);
// TODO: multi-thread
for (int64_t i12 = 0; i12 < ne12; ++i12) {
for (int64_t i11 = 0; i11 < ne11; ++i11) {
for (int64_t i10 = 0; i10 < ne10; ++i10) {
const int64_t i01 = *(int32_t *) ((char *) src1->data + i10*nb10 + i11*nb11 + i12*nb12);
const int ith = params->ith;
const int nth = params->nth;
dequantize_row_q(
(const void *) ((char *) src0->data + i01*nb01 + i11*nb02 + i12*nb03),
(float *) ((char *) dst->data + i10*nb1 + i11*nb2 + i12*nb3), nc);
}
}
// rows per thread
const int dr = (nr + nth - 1)/nth;
// row range for this thread
const int ir0 = dr*ith;
const int ir1 = MIN(ir0 + dr, nr);
for (int64_t i = ir0; i < ir1; ++i) {
const int64_t i12 = i/(ne11*ne10);
const int64_t i11 = (i - i12*ne11*ne10)/ne10;
const int64_t i10 = (i - i12*ne11*ne10 - i11*ne10);
const int64_t i01 = *(int32_t *) ((char *) src1->data + i10*nb10 + i11*nb11 + i12*nb12);
dequantize_row_q(
(const void *) ((char *) src0->data + i01*nb01 + i11*nb02 + i12*nb03),
(float *) ((char *) dst->data + i10*nb1 + i11*nb2 + i12*nb3), nc);
}
}
@ -11600,8 +11606,6 @@ static void ggml_compute_forward_get_rows_f16(
const struct ggml_tensor * src0 = dst->src[0];
const struct ggml_tensor * src1 = dst->src[1];
assert(params->ith == 0);
if (params->type == GGML_TASK_TYPE_INIT || params->type == GGML_TASK_TYPE_FINALIZE) {
return;
}
@ -11609,24 +11613,32 @@ static void ggml_compute_forward_get_rows_f16(
GGML_TENSOR_BINARY_OP_LOCALS
const int64_t nc = ne00;
const int64_t nr = ggml_nelements(src1); GGML_UNUSED(nr);
const int64_t nr = ggml_nelements(src1);
assert(ne0 == nc);
assert(ne02 == ne11);
assert(nb00 == sizeof(ggml_fp16_t));
assert(ggml_nrows(dst) == nr);
// TODO: multi-thread
for (int64_t i12 = 0; i12 < ne12; ++i12) {
for (int64_t i11 = 0; i11 < ne11; ++i11) {
for (int64_t i10 = 0; i10 < ne10; ++i10) {
const int64_t i01 = *(int32_t *) ((char *) src1->data + i10*nb10 + i11*nb11 + i12*nb12);
const int ith = params->ith;
const int nth = params->nth;
ggml_fp16_to_fp32_row(
(const void *) ((char *) src0->data + i01*nb01 + i11*nb02 + i12*nb03),
(float *) ((char *) dst->data + i10*nb1 + i11*nb2 + i12*nb3), nc);
}
}
// rows per thread
const int dr = (nr + nth - 1)/nth;
// row range for this thread
const int ir0 = dr*ith;
const int ir1 = MIN(ir0 + dr, nr);
for (int64_t i = ir0; i < ir1; ++i) {
const int64_t i12 = i/(ne11*ne10);
const int64_t i11 = (i - i12*ne11*ne10)/ne10;
const int64_t i10 = (i - i12*ne11*ne10 - i11*ne10);
const int64_t i01 = *(int32_t *) ((char *) src1->data + i10*nb10 + i11*nb11 + i12*nb12);
ggml_fp16_to_fp32_row(
(const void *) ((char *) src0->data + i01*nb01 + i11*nb02 + i12*nb03),
(float *) ((char *) dst->data + i10*nb1 + i11*nb2 + i12*nb3), nc);
}
}
@ -11637,8 +11649,6 @@ static void ggml_compute_forward_get_rows_f32(
const struct ggml_tensor * src0 = dst->src[0];
const struct ggml_tensor * src1 = dst->src[1];
assert(params->ith == 0);
if (params->type == GGML_TASK_TYPE_INIT || params->type == GGML_TASK_TYPE_FINALIZE) {
return;
}
@ -11646,24 +11656,32 @@ static void ggml_compute_forward_get_rows_f32(
GGML_TENSOR_BINARY_OP_LOCALS
const int64_t nc = ne00;
const int64_t nr = ggml_nelements(src1); GGML_UNUSED(nr);
const int64_t nr = ggml_nelements(src1);
assert(ne0 == nc);
assert(ne02 == ne11);
assert(nb00 == sizeof(float));
assert(ggml_nrows(dst) == nr);
// TODO: multi-thread
for (int64_t i12 = 0; i12 < ne12; ++i12) {
for (int64_t i11 = 0; i11 < ne11; ++i11) {
for (int64_t i10 = 0; i10 < ne10; ++i10) {
const int64_t i01 = *(int32_t *) ((char *) src1->data + i10*nb10 + i11*nb11 + i12*nb12);
const int ith = params->ith;
const int nth = params->nth;
ggml_vec_cpy_f32(nc,
(float *) ((char *) dst->data + i10*nb1 + i11*nb2 + i12*nb3),
(float *) ((char *) src0->data + i01*nb01 + i11*nb02 + i12*nb03));
}
}
// rows per thread
const int dr = (nr + nth - 1)/nth;
// row range for this thread
const int ir0 = dr*ith;
const int ir1 = MIN(ir0 + dr, nr);
for (int64_t i = ir0; i < ir1; ++i) {
const int64_t i12 = i/(ne11*ne10);
const int64_t i11 = (i - i12*ne11*ne10)/ne10;
const int64_t i10 = (i - i12*ne11*ne10 - i11*ne10);
const int64_t i01 = *(int32_t *) ((char *) src1->data + i10*nb10 + i11*nb11 + i12*nb12);
ggml_vec_cpy_f32(nc,
(float *) ((char *) dst->data + i10*nb1 + i11*nb2 + i12*nb3),
(float *) ((char *) src0->data + i01*nb01 + i11*nb02 + i12*nb03));
}
}
@ -17796,7 +17814,7 @@ static void ggml_graph_compute_perf_stats_node(struct ggml_tensor * node, const
node->perf_time_us += time_us_cur;
}
static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_threads) {
int n_tasks = 0;
switch (node->op) {
@ -17877,6 +17895,12 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
{
n_tasks = n_threads;
} break;
case GGML_OP_GET_ROWS:
{
// FIXME: the cost of launching additional threads decreases performance with GPU offloading
//n_tasks = MIN(n_threads, ggml_nelements(node->src[1]));
n_tasks = MIN(n_cur_threads, ggml_nelements(node->src[1]));
} break;
case GGML_OP_SCALE:
case GGML_OP_SET:
case GGML_OP_CONT:
@ -17884,7 +17908,6 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
case GGML_OP_VIEW:
case GGML_OP_PERMUTE:
case GGML_OP_TRANSPOSE:
case GGML_OP_GET_ROWS:
case GGML_OP_GET_ROWS_BACK:
case GGML_OP_DIAG:
{
@ -18102,7 +18125,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
/* FINALIZE */
struct ggml_tensor * node = cgraph->nodes[node_n];
if (GGML_OP_HAS_FINALIZE[node->op]) {
params.nth = ggml_get_n_tasks(node, n_threads);
params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
ggml_compute_forward(&params, node);
}
ggml_graph_compute_perf_stats_node(node, state->shared);
@ -18112,7 +18135,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
while (++node_n < cgraph->n_nodes) {
GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
struct ggml_tensor * node = cgraph->nodes[node_n];
const int n_tasks = ggml_get_n_tasks(node, n_threads);
const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
state->shared->perf_node_start_cycles = ggml_perf_cycles();
state->shared->perf_node_start_time_us = ggml_perf_time_us();
@ -18160,7 +18183,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
/* INIT & COMPUTE */
struct ggml_tensor * node = cgraph->nodes[node_n];
const int n_tasks = ggml_get_n_tasks(node, n_threads);
const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
struct ggml_compute_params params = {
/*.type =*/ GGML_TASK_TYPE_INIT,
@ -18225,7 +18248,7 @@ struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threa
for (int i = 0; i < cgraph->n_nodes; i++) {
struct ggml_tensor * node = cgraph->nodes[i];
const int n_tasks = ggml_get_n_tasks(node, n_threads);
const int n_tasks = ggml_get_n_tasks(node, n_threads, 1);
max_tasks = MAX(max_tasks, n_tasks);