From 19e78d29b0c079ef6251a78e2a3475940b2ac9ec Mon Sep 17 00:00:00 2001
From: Branden Butler
Date: Mon, 20 May 2024 22:37:14 -0500
Subject: [PATCH] Fix non-CPU backend wrapping

---
 ggml-backend-impl.h | 2 +-
 ggml-backend.c | 22 ++-
 ggml-cuda.cu | 2 +-
 ggml-mpi.cpp | 359 +++++++++++++++++++++++++++++++-------------
 ggml-mpi.h | 3 +
 llama.cpp | 37 +++--
 6 files changed, 305 insertions(+), 120 deletions(-)

diff --git a/ggml-backend-impl.h b/ggml-backend-impl.h
index f121e1de4..15e703200 100644
--- a/ggml-backend-impl.h
+++ b/ggml-backend-impl.h
@@ -86,7 +86,7 @@ extern "C" {
// (optional) asynchronous tensor data access
void (*GGML_CALL set_tensor_async)(ggml_backend_t backend, struct ggml_tensor * tensor, const void * data, size_t offset, size_t size);
void (*GGML_CALL get_tensor_async)(ggml_backend_t backend, const struct ggml_tensor * tensor, void * data, size_t offset, size_t size);
- bool (*GGML_CALL cpy_tensor_async)(ggml_backend_t backend_src, ggml_backend_t backend_dst, const struct ggml_tensor * src, struct ggml_tensor * dst);
+ bool (*GGML_CALL cpy_tensor_async)(ggml_backend_t backend_src, ggml_backend_t backend_dst, struct ggml_tensor * src, struct ggml_tensor * dst);
// (optional) complete all pending operations
void (*GGML_CALL synchronize)(ggml_backend_t backend);
diff --git a/ggml-backend.c b/ggml-backend.c
index 7e07d5d71..c7069836c 100644
--- a/ggml-backend.c
+++ b/ggml-backend.c
@@ -41,6 +41,8 @@ GGML_CALL size_t ggml_backend_buft_get_alloc_size(ggml_backend_buffer_type_t buf
assert(size >= ggml_nbytes(tensor));
return size;
}
+// fprintf(stderr, "Getting alloc size of buft (%s)\n", ggml_backend_buft_name(buft));
+// fprintf(stderr, "Getting alloc size of tensor (%s)\n", tensor->name);
return ggml_nbytes(tensor);
}
@@ -313,6 +315,8 @@ static bool ggml_are_same_layout(const struct ggml_tensor * a, const struct ggml
void ggml_backend_tensor_copy(struct ggml_tensor * src, struct ggml_tensor * dst) {
GGML_ASSERT(ggml_are_same_layout(src, dst) && "cannot copy tensors with different layouts");
+// fprintf(stderr, "Attempting copy from %s to %s\n", src->name, dst->name);
+
if (src == dst) {
return;
}
@@ -340,6 +344,9 @@ void ggml_backend_tensor_copy_async(ggml_backend_t backend_src, ggml_backend_t b
return;
}
+// fprintf(stderr, "Attempting async copy from %s to %s with src backend %s and dst backend %s\n", src->name, dst->name, ggml_backend_name(backend_src), ggml_backend_name(backend_dst));
+
+
if (backend_dst->iface.cpy_tensor_async != NULL) {
if (backend_dst->iface.cpy_tensor_async(backend_src, backend_dst, src, dst)) {
return;
}
}
@@ -1101,8 +1108,10 @@ static int ggml_backend_sched_backend_from_buffer(ggml_backend_sched_t sched, co
// find highest prio backend that supports the buffer type
for (int i = 0; i < sched->n_backends; i++) {
- if (ggml_backend_buft_supports_backend(buffer->buft, sched->backends[i])) {
+ if (ggml_backend_buft_supports_backend(buffer->buft, sched->backends[i]) && ggml_backend_supports_op(sched->backends[i], tensor)) {
return i;
+ } else {
+// fprintf(stderr, "Buffer type %s does not support backend %s\n", ggml_backend_buft_name(buffer->buft), ggml_backend_name(sched->backends[i]));
}
}
@@ -1204,7 +1213,7 @@ static void ggml_backend_sched_print_assignments(ggml_backend_sched_t sched, str
continue;
}
ggml_backend_t tensor_backend = ggml_backend_sched_get_tensor_backend(sched, node);
- fprintf(stderr, "node #%3d (%10.10s): %20.20s (%5.5s) [%5.5s %8.8s]:", i, ggml_op_name(node->op), node->name,
+ fprintf(stderr, "node #%3d (%10.10s): %30.30s (%5.5s) [%30.30s %8.8s]:", i, ggml_op_name(node->op), node->name,
fmt_size(ggml_nbytes(node)), tensor_backend ? ggml_backend_name(tensor_backend) : "NULL", GET_CAUSE(node));
for (int j = 0; j < GGML_MAX_SRC; j++) {
struct ggml_tensor * src = node->src[j];
@@ -1212,7 +1221,7 @@ static void ggml_backend_sched_print_assignments(ggml_backend_sched_t sched, str
continue;
}
ggml_backend_t src_backend = ggml_backend_sched_get_tensor_backend(sched, src);
- fprintf(stderr, " %20.20s (%5.5s) [%5.5s %8.8s]", src->name,
+ fprintf(stderr, " %40.40s (%5.5s) [%30.30s %8.8s]", src->name,
fmt_size(ggml_nbytes(src)), src_backend ? ggml_backend_name(src_backend) : "NULL", GET_CAUSE(src));
}
fprintf(stderr, "\n");
@@ -1552,12 +1561,17 @@ static void ggml_backend_sched_split_graph(ggml_backend_sched_t sched, struct gg
const size_t input_id = hash_id(input);
struct ggml_tensor * input_cpy = sched->tensor_copies[input_id][split->backend_id][sched->cur_copy];
+// fprintf(stderr, "Creating input copy for input %s and copy %s\n", input->name, input_cpy->name);
+
// add a dependency to the input source so that it is not freed before the copy is done
struct ggml_tensor * input_dep = ggml_view_tensor(sched->ctx, input);
input_dep->src[0] = input;
sched->node_backend_ids[graph_copy->n_nodes] = sched->tensor_backend_id[input_id];
graph_copy->nodes[graph_copy->n_nodes++] = input_dep;
+// fprintf(stderr, "Input dep src: %s\n", input_dep->src[0]->name);
+
+
// add a dependency to the input copy so that it is allocated at the start of the split
sched->node_backend_ids[graph_copy->n_nodes] = split->backend_id;
graph_copy->nodes[graph_copy->n_nodes++] = input_cpy;
@@ -1640,6 +1654,8 @@ static enum ggml_status ggml_backend_sched_compute_splits(ggml_backend_sched_t s
struct ggml_tensor * input = split->inputs[j];
struct ggml_tensor * input_cpy = sched->tensor_copies[hash_id(input)][split_backend_id][sched->cur_copy];
+// fprintf(stderr, "Split input: %s, input copy: %s\n", input->name, input_cpy->name);
+
if (input->flags & GGML_TENSOR_FLAG_INPUT) {
// inputs from the user must be copied immediately to prevent the user overwriting the data before the copy is done
if (sched->events[split_backend_id][sched->cur_copy] != NULL) {
diff --git a/ggml-cuda.cu b/ggml-cuda.cu
index 139025588..815484b98 100644
--- a/ggml-cuda.cu
+++ b/ggml-cuda.cu
@@ -11154,7 +11154,7 @@ GGML_CALL static void ggml_backend_cuda_get_tensor_async(ggml_backend_t backend,
CUDA_CHECK(cudaMemcpyAsync(data, (const char *)tensor->data + offset, size, cudaMemcpyDeviceToHost, g_cudaStreams[cuda_ctx->device][0]));
}
-GGML_CALL static bool ggml_backend_cuda_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, const ggml_tensor * src, ggml_tensor * dst) {
+GGML_CALL static bool ggml_backend_cuda_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, ggml_tensor * src, ggml_tensor * dst) {
GGML_ASSERT(ggml_backend_is_cuda(backend_src) || ggml_backend_is_cuda(backend_dst));
ggml_backend_buffer_t buf_src = src->view_src ? src->view_src->buffer : src->buffer;
diff --git a/ggml-mpi.cpp b/ggml-mpi.cpp
index f8c87f2d6..14d305cea 100644
--- a/ggml-mpi.cpp
+++ b/ggml-mpi.cpp
@@ -44,7 +44,7 @@ void ggml_mpi_backend_init(void) {
have_init = true;
const int buffer_size = 128*1024*1024*8;
send_buffer = calloc(1, buffer_size); // 128MB buffer
-// fprintf(stderr, "BUFFER ATTACH RETCODE=%d\n", MPI_Buffer_attach(send_buffer, buffer_size));
+ fprintf(stderr, "BUFFER ATTACH RETCODE=%d\n", MPI_Buffer_attach(send_buffer, buffer_size));
}
void ggml_mpi_sync_pipelined(
@@ -230,7 +230,6 @@ void ggml_mpi_eval_init(
GGML_ASSERT(n_tokens != nullptr);
- // FIXME Syncing n_seq_ids causes MPI to throw an invalid buffer error in Bsend
ggml_mpi_sync_pipelined(ctx_mpi, *n_seq_ids, *n_tokens, MPI_INT32_T, GGML_MPI_N_SEQ_IDS);
// We need to know the total number of sequence
@@ -308,7 +307,9 @@ static void ggml_mpi_tensor_send(const struct ggml_tensor * t, const void* data,
MPI_Comm_rank(comm, &rank);
// fprintf(stderr, "Sending tensor %s (buffer %s) from %d to %d\n", t->name, ggml_backend_buffer_name(t->buffer), rank, mpi_rank_dst);
- const int retval = MPI_Send(data, ggml_nelements(t), mpi_type, mpi_rank_dst, 0, comm);
+ GGML_ASSERT(rank != mpi_rank_dst);
+
+ const int retval = MPI_Bsend(data, ggml_nelements(t), mpi_type, mpi_rank_dst, 0, comm);
GGML_ASSERT(retval == MPI_SUCCESS);
}
@@ -330,6 +331,9 @@ static void ggml_mpi_tensor_recv(const struct ggml_tensor * t, void * data, int
int rank;
MPI_Comm_rank(comm, &rank);
// fprintf(stderr, "Receiving tensor %s (buffer %s) from %d at %d\n", t->name, ggml_backend_buffer_name(t->buffer), mpi_rank_src, rank);
+
+ GGML_ASSERT(rank != mpi_rank_src);
+
const int retval = MPI_Recv(data, ggml_nelements(t), mpi_type, mpi_rank_src, MPI_ANY_TAG, comm, &status);
GGML_ASSERT(retval == MPI_SUCCESS);
}
@@ -358,7 +362,7 @@ uint16_t** ggml_mpi_split_range(
for (int i=0; i < ctx_mpi->size; i++) {
ranges[i][0] = next_layer;
ranges[i][1] = MIN(end, ranges[i][0] + (node_weights[i] * range_length) + start);
- next_layer = ranges[i][1];
+ next_layer = ranges[i][1]+1;
}
// ranges[0][0] = next_layer;
@@ -460,28 +464,44 @@ int ggml_backend_mpi_rank(ggml_backend_t backend) {
return ctx->rank;
}
-ggml_backend_buffer_t ggml_backend_mpi_buffer_unwrap(ggml_backend_buffer_t buffer) {
- auto * ctx = (ggml_backend_mpi_buffer_context *) buffer->context;
-
- ggml_backend_buffer_t wrapped_buffer = ctx->wrapped_buffer;
- wrapped_buffer->usage = buffer->usage;
- wrapped_buffer->size = buffer->size;
- return wrapped_buffer;
-
-}
+GGML_CALL static const char * ggml_backend_mpi_buffer_name(ggml_backend_buffer_t buffer);
ggml_backend_buffer_type_t ggml_backend_mpi_buffer_type_unwrap(ggml_backend_buffer_type_t buft) {
+ GGML_ASSERT(buft != nullptr);
auto * ctx = (ggml_backend_mpi_buffer_type_context *) buft->context;
+ GGML_ASSERT(ctx != nullptr);
+
ggml_backend_buffer_type_t wrapped_buffer_type = ctx->wrapped_buffer_type;
return wrapped_buffer_type;
}
+ggml_backend_buffer_t ggml_backend_mpi_buffer_unwrap(ggml_backend_buffer_t buffer) {
+ GGML_ASSERT(buffer != nullptr);
+// fprintf(stderr, "Attempting unwrap of %s\n", ggml_backend_buffer_name(buffer));
+// if(buffer->iface.get_name != ggml_backend_mpi_buffer_name) {
+// return buffer;
+// }
+ auto * ctx = (ggml_backend_mpi_buffer_context *) buffer->context;
+ GGML_ASSERT(ctx != nullptr);
+ ggml_backend_buffer_t wrapped_buffer = ctx->wrapped_buffer;
+ GGML_ASSERT(wrapped_buffer != nullptr);
+ wrapped_buffer->usage = buffer->usage;
+ wrapped_buffer->size = buffer->size;
+ if (wrapped_buffer->buft->iface.get_name == ggml_backend_mpi_buffer_type_name) {
+ wrapped_buffer->buft = ggml_backend_mpi_buffer_type_unwrap(wrapped_buffer->buft);
+ }
+ return wrapped_buffer;
+
+}
+
+
+
GGML_CALL static const char * ggml_backend_mpi_buffer_name(ggml_backend_buffer_t buffer) {
-
-
+ GGML_ASSERT(buffer != nullptr);
+ GGML_ASSERT(ggml_backend_mpi_buffer_unwrap(buffer) != nullptr && ggml_backend_mpi_buffer_unwrap(buffer)->iface.get_name != ggml_backend_mpi_buffer_name);
return strdup(
(
@@ -529,20 +549,37 @@ GGML_CALL void ggml_backend_mpi_buffer_copy_ctx_from_type(ggml_backend_buffer_ty
}
}
+ggml_backend_buffer_type_t ggml_backend_mpi_buffer_type_set_wrapped_buffer_type(ggml_backend_buffer_type_t orig, ggml_backend_buffer_type_t buft) {
+ if (orig->iface.get_name == ggml_backend_mpi_buffer_type_name) {
+ ((ggml_backend_mpi_buffer_type_context*)(orig->context))->wrapped_buffer_type = buft;
+ } else {
+ GGML_ASSERT(!"Original buffer type must be an MPI buffer type.");
+ }
+
+ return orig;
+
+}
+
+ggml_backend_buffer_t ggml_backend_mpi_set_wrapped_buffer(ggml_backend_buffer_t orig, ggml_backend_buffer_t buf) {
+ GGML_ASSERT(buf != nullptr);
+ GGML_ASSERT(buf->iface.get_name != ggml_backend_mpi_buffer_name);
+ if (orig->iface.get_name == ggml_backend_mpi_buffer_name) {
+ ((ggml_backend_mpi_buffer_context*)(orig->context))->wrapped_buffer = buf;
+ if (orig->buft != nullptr) {
+ ggml_backend_mpi_buffer_type_set_wrapped_buffer_type(orig->buft, buf->buft);
+ }
+ } else {
+ fprintf(stderr, "Original buffer name: %s\n", ggml_backend_buffer_name(orig));
+ GGML_ASSERT(!"Original buffer must be an MPI buffer.");
+
+ }
+ return orig;
+}
+
GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t backend, ggml_cgraph * cgraph) {
struct ggml_mpi_context * ctx = (ggml_mpi_context *) backend->context;
- std::vector backend_buft;
- for (auto *curr_backend: ctx->backends) {
- if (ggml_backend_is_cpu(curr_backend)) {
- // use host buffers for the CPU backend compute buffer
- backend_buft.push_back(ggml_backend_cpu_buffer_type());
- } else {
- backend_buft.push_back(ggml_backend_get_default_buffer_type(curr_backend));
- }
- }
-
std::vector>> old_buffs(
cgraph->n_nodes);
@@ -557,6 +594,7 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
if (src == nullptr) {
break;
}
+// fprintf(stderr, "Previous source: %s\n", src->name);
old_buffs[i].second.push_back(src->buffer);
}
@@ -584,6 +622,8 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
if (src->buffer->buft->iface.get_name == ggml_backend_mpi_buffer_type_name) {
n_srcs++;
src->buffer = ggml_backend_mpi_buffer_unwrap(src->buffer);
+// fprintf(stderr, "After unwrapping source: %s\n", src->name);
+
}
}
@@ -611,19 +651,19 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
if (!ctx->remote) {
- ggml_backend_sched_t sched = ggml_backend_sched_new(ctx->backends.data(), backend_buft.data(),
+ ggml_backend_sched_t sched = ggml_backend_sched_new(ctx->backends.data(), nullptr,
(int) ctx->backends.size(), cgraph->n_nodes + cgraph->n_leafs + n_srcs, false);
- ggml_backend_sched_reserve(sched, cgraph);
- ggml_backend_sched_graph_compute_async(sched, cgraph);
+// ggml_backend_sched_reserve(sched, cgraph);
+ ggml_backend_sched_graph_compute(sched, cgraph);
ggml_backend_sched_free(sched);
}
for (int i = 0; i < cgraph->n_nodes; i++) {
- cgraph->nodes[i]->buffer = ggml_backend_mpi_wrap_buffer(cgraph->nodes[i]->buffer);
-
- ggml_backend_mpi_buffer_set_rank(cgraph->nodes[i]->buffer, ggml_backend_mpi_buffer_rank(old_buffs[i].first));
+ if (cgraph->nodes[i]->buffer->iface.get_name != ggml_backend_mpi_buffer_name) {
+ cgraph->nodes[i]->buffer = ggml_backend_mpi_set_wrapped_buffer(old_buffs[i].first, cgraph->nodes[i]->buffer);
+ }
for (int iter = 0; iter < GGML_MAX_SRC; iter++) {
@@ -632,20 +672,22 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
break;
}
+// fprintf(stderr, "After compute src: %s\n", src_node->name);
+
if (src_node->buffer->iface.get_name == ggml_backend_mpi_buffer_name) {
continue;
}
- src_node->buffer = ggml_backend_mpi_wrap_buffer(src_node->buffer);
+ src_node->buffer = ggml_backend_mpi_set_wrapped_buffer(old_buffs[i].second[iter], src_node->buffer);
+
+// fprintf(stderr, "After setting wrapped buffer src: %s\n", src_node->name);
- ggml_backend_mpi_buffer_set_rank(src_node->buffer, ggml_backend_mpi_buffer_rank(old_buffs[i].second[iter]));
}
if(cgraph->nodes[i]->view_src != nullptr && cgraph->nodes[i]->view_src->buffer->buft != nullptr) {
if (old_view_buffs[i] != nullptr) {
- if (old_view_buffs[i]->iface.get_name == ggml_backend_mpi_buffer_name) {
- ggml_backend_mpi_buffer_set_rank(cgraph->nodes[i]->view_src->buffer,
- ggml_backend_mpi_buffer_rank(old_view_buffs[i]));
+ if (old_view_buffs[i]->iface.get_name == ggml_backend_mpi_buffer_name && cgraph->nodes[i]->view_src->buffer->iface.get_name != ggml_backend_mpi_buffer_name) {
+ cgraph->nodes[i]->view_src->buffer = ggml_backend_mpi_set_wrapped_buffer(old_view_buffs[i], cgraph->nodes[i]->view_src->buffer);
}
}
}
@@ -655,6 +697,7 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
// FIXME check if this is correct or not (it's probably not)
for (int i = 0; i < cgraph->n_leafs; i++) {
+ GGML_ASSERT(false);
cgraph->leafs[i]->buffer = ggml_backend_mpi_wrap_buffer(cgraph->leafs[i]->buffer);
ggml_backend_mpi_buffer_type_set_rank(cgraph->leafs[i]->buffer->buft, ctx->rank);
}
@@ -681,28 +724,15 @@ static void ggml_backend_mpi_free(ggml_backend_t backend) {
static ggml_backend_buffer_type_t ggml_backend_mpi_get_default_buffer_type(ggml_backend_t backend) {
auto * ctx = static_cast(backend->context);
- if (ctx->backends.empty()) {
- auto * buff = ggml_backend_mpi_wrap_buffer_type(ggml_backend_cpu_buffer_type());
- ggml_backend_mpi_buffer_type_set_rank(buff, ctx->rank);
- return buff;
- }
- auto * buff = ggml_backend_mpi_wrap_buffer_type(ctx->backends.back()->iface.get_default_buffer_type(ctx->backends.back()));
+
+ auto * buff = ggml_backend_mpi_wrap_buffer_type(ctx->backends.front()->iface.get_default_buffer_type(ctx->backends.front()));
ggml_backend_mpi_buffer_type_set_rank(buff, ctx->rank);
return buff;
}
GGML_CALL static bool ggml_backend_mpi_supports_op(ggml_backend_t backend, const struct ggml_tensor * op) {
- switch (op->op) {
- case GGML_OP_CPY:
- return op->type != GGML_TYPE_IQ2_XXS && op->type != GGML_TYPE_IQ2_XS; // missing type_traits.from_float
- case GGML_OP_MUL_MAT:
- return op->src[1]->type == GGML_TYPE_F32 || op->src[1]->type == ggml_internal_get_type_traits(op->src[0]->type).vec_dot_type;
- default:
- return true;
- }
-
- GGML_UNUSED(backend);
+ return ggml_backend_supports_op(((ggml_mpi_context *) backend->context)->backends.front(),op);
}
@@ -760,11 +790,13 @@ GGML_CALL static size_t ggml_backend_mpi_buffer_type_get_max_size(ggml_backend_b
GGML_CALL static size_t ggml_backend_mpi_buffer_type_get_alloc_size(ggml_backend_buffer_type_t buft, const struct ggml_tensor * tensor) {
// Have to do this instead of calling ggml_backend_type_get_alloc_size because that signature doesn't have const on tensor
- return ggml_backend_mpi_buffer_type_unwrap(buft)->iface.get_alloc_size(ggml_backend_mpi_buffer_type_unwrap(buft), tensor);
+ size_t ret = ggml_backend_mpi_buffer_type_unwrap(buft)->iface.get_alloc_size(ggml_backend_mpi_buffer_type_unwrap(buft), tensor);
+ return ret;
}
GGML_CALL static bool ggml_backend_mpi_buffer_type_supports_backend(ggml_backend_buffer_type_t buft, ggml_backend_t backend) {
- return backend != nullptr && ggml_backend_is_mpi(backend) && ggml_backend_mpi_buffer_type_rank(buft) == ggml_backend_mpi_rank(backend);
+ return backend != nullptr && ggml_backend_is_mpi(backend) && ggml_backend_mpi_buffer_type_rank(buft) == ggml_backend_mpi_rank(backend)
+ && ggml_backend_buft_supports_backend(ggml_backend_mpi_buffer_type_unwrap(buft), ((ggml_mpi_context*)backend->context)->backends.front());
}
GGML_CALL static bool ggml_backend_mpi_buffer_type_is_host(ggml_backend_buffer_type_t buft) {
@@ -779,15 +811,12 @@ static std::map cached_buffer_wrap
static std::map cached_backends;
+
+
GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type(ggml_backend_buffer_type_t buft) {
-// if (cached_wrappers.find(buft) != cached_wrappers.end()) {
-// fprintf(stderr, "Returning cached buffer type with name %s\n", cached_wrappers[buft]->iface.get_name(cached_wrappers[buft]));
-//
-// auto * ret = new ggml_backend_buffer_type;
-// *ret = *cached_wrappers[buft];
-// return ret;
-// }
+ GGML_ASSERT(buft->iface.get_name != ggml_backend_mpi_buffer_type_name);
+
ggml_backend_buffer_type_i ggml_backend_mpi_buffer_type_interface = {
/* .get_name = */ ggml_backend_mpi_buffer_type_name,
@@ -813,11 +842,25 @@ GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type(ggml_back
// Set rank to 0 as default
ggml_backend_mpi_buffer_type_set_rank(ggml_backend_wrapped_buffer_type, 0);
- cached_wrappers[buft] = ggml_backend_wrapped_buffer_type;
return ggml_backend_wrapped_buffer_type;
}
+GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type_cached(ggml_backend_buffer_type_t buft) {
+ if (cached_wrappers.find(buft) != cached_wrappers.end()) {
+// fprintf(stderr, "Returning cached buffer type with name %s\n",
+// cached_wrappers[buft]->iface.get_name(cached_wrappers[buft]));
+
+
+ return cached_wrappers[buft];
+ }
+
+ auto * ggml_backend_wrapped_buffer_type = ggml_backend_mpi_wrap_buffer_type(buft);
+ cached_wrappers[buft] = ggml_backend_wrapped_buffer_type;
+ return ggml_backend_wrapped_buffer_type;
+
+}
+
GGML_CALL static void * ggml_backend_mpi_buffer_get_base(ggml_backend_buffer_t buffer) {
@@ -846,8 +889,12 @@ GGML_CALL static void ggml_backend_mpi_buffer_get_tensor(ggml_backend_buffer_t b
int src_rank = ggml_backend_mpi_buffer_rank(tensor->buffer);
- if (rank != src_rank) {
+// if (ggml_backend_mpi_buffer_rank(buffer) != ggml_backend_mpi_buffer_local_rank(buffer)) {
+// return;
+// }
+ if (rank != src_rank) {
+// fprintf(stderr, "Getting tensor: %s, buffer %s\n", tensor->name, ggml_backend_buffer_name(buffer));
ggml_mpi_tensor_recv(tensor, data, ggml_backend_mpi_buffer_rank(tensor->buffer), ggml_backend_mpi_buffer_get_comm(tensor->buffer));
return;
}
@@ -856,7 +903,7 @@ GGML_CALL static void ggml_backend_mpi_buffer_get_tensor(ggml_backend_buffer_t b
}
GGML_CALL static bool ggml_backend_mpi_buffer_cpy_tensor(ggml_backend_buffer_t buffer, const struct ggml_tensor * src, struct ggml_tensor * dst) {
- if (ggml_backend_mpi_buffer_rank(src->buffer) == ggml_backend_mpi_buffer_rank(dst->buffer)) {
+ if (ggml_backend_mpi_buffer_rank(src->buffer) == ggml_backend_mpi_buffer_rank(dst->buffer) && ggml_backend_mpi_buffer_local_rank(buffer) == ggml_backend_mpi_buffer_rank(src->buffer)) {
return ggml_backend_mpi_buffer_unwrap(buffer)->iface.cpy_tensor(ggml_backend_mpi_buffer_unwrap(buffer), src, dst);
}
@@ -868,20 +915,66 @@ GGML_CALL static void ggml_backend_mpi_buffer_clear(ggml_backend_buffer_t buffer
return ggml_backend_mpi_buffer_unwrap(buffer)->iface.clear(ggml_backend_mpi_buffer_unwrap(buffer), value);
}
-static struct ggml_backend_buffer_i mpi_backend_buffer_i = {
- /* .get_name = */ ggml_backend_mpi_buffer_name,
- /* .free_buffer = */ ggml_backend_mpi_buffer_free_buffer,
- /* .get_base = */ ggml_backend_mpi_buffer_get_base,
- /* .init_tensor = */ NULL, // no initialization required
- /* .set_tensor = */ ggml_backend_mpi_buffer_set_tensor,
- /* .get_tensor = */ ggml_backend_mpi_buffer_get_tensor,
- /* .cpy_tensor = */ ggml_backend_mpi_buffer_cpy_tensor,
- /* .clear = */ ggml_backend_mpi_buffer_clear,
- /* .reset = */ NULL,
-};
+GGML_CALL static void ggml_backend_mpi_buffer_init_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor) {
+// fprintf(stderr, "Init tensor with buffer %s, tensor %s, tensor buffer %s, tensor view src %s, tensor vs buff %s\n",
+// ggml_backend_buffer_name(buffer), tensor->name, ggml_backend_buffer_name(tensor->buffer), tensor->view_src !=
+// nullptr ? tensor->view_src->name : "", tensor->view_src != nullptr ? ggml_backend_buffer_name(tensor->view_src->buffer) : "");
+ auto *orig_buffer = tensor->buffer;
+ tensor->buffer = ggml_backend_mpi_buffer_unwrap(tensor->buffer);
+
+ bool view_src_null = tensor->view_src == nullptr;
+ ggml_backend_buffer_t orig_view_src_buffer = nullptr;
+ if (!view_src_null) {
+ orig_view_src_buffer = tensor->view_src->buffer;
+ tensor->view_src->buffer = ggml_backend_mpi_buffer_unwrap(tensor->view_src->buffer);
+ }
+
+ std::vector orig_src_buffers(0);
+ for (auto & src : tensor->src) {
+ if (src == nullptr) {
+ break;
+ }
+
+
+ orig_src_buffers.push_back(src->buffer);
+
+ if (src->buffer != nullptr && src->buffer->iface.get_name == ggml_backend_mpi_buffer_name) {
+ src->buffer = ggml_backend_mpi_buffer_unwrap(src->buffer);
+ }
+ }
+
+
+ ggml_backend_buffer_init_tensor(ggml_backend_mpi_buffer_unwrap(buffer), tensor);
+ tensor->buffer = ggml_backend_mpi_set_wrapped_buffer(orig_buffer, tensor->buffer);
+ if (!view_src_null) {
+ tensor->view_src->buffer = ggml_backend_mpi_set_wrapped_buffer(orig_view_src_buffer, tensor->view_src->buffer);
+ }
+
+ for (size_t i = 0; i < orig_src_buffers.size(); i++) {
+ if (orig_src_buffers[i]->iface.get_name == ggml_backend_mpi_buffer_name) {
+ tensor->src[i]->buffer = ggml_backend_mpi_set_wrapped_buffer(orig_src_buffers[i], tensor->src[i]->buffer);
+ }
+ }
+}
+
+
+
+
GGML_CALL ggml_backend_buffer_t ggml_backend_mpi_wrap_buffer(ggml_backend_buffer_t buf) {
+ struct ggml_backend_buffer_i mpi_backend_buffer_i = {
+ /* .get_name = */ ggml_backend_mpi_buffer_name,
+ /* .free_buffer = */ ggml_backend_mpi_buffer_free_buffer,
+ /* .get_base = */ ggml_backend_mpi_buffer_get_base,
+ /* .init_tensor = */ (buf->iface.init_tensor != nullptr) ? ggml_backend_mpi_buffer_init_tensor : nullptr,
+ /* .set_tensor = */ ggml_backend_mpi_buffer_set_tensor,
+ /* .get_tensor = */ ggml_backend_mpi_buffer_get_tensor,
+ /* .cpy_tensor = */ ggml_backend_mpi_buffer_cpy_tensor,
+ /* .clear = */ ggml_backend_mpi_buffer_clear,
+ /* .reset = */ nullptr,
+ };
+
// if (cached_buffer_wrappers.find(buf) != cached_buffer_wrappers.end()) {
// fprintf(stderr, "Returning cached buffer with name %s\n", cached_buffer_wrappers[buf]->iface.get_name(cached_buffer_wrappers[buf]));
// auto * ret = new ggml_backend_buffer;
@@ -892,6 +985,15 @@ GGML_CALL ggml_backend_buffer_t ggml_backend_mpi_wrap_buffer(ggml_backend_buffer
// return ret;
// }
+ int rank;
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+// fprintf(stderr, "Wrapping buffer %s at rank %d\n", ggml_backend_buffer_name(buf), rank);
+
+ if (buf->iface.get_name == ggml_backend_mpi_buffer_name) {
+ fprintf(stderr, "WRAPPING AN ALREADY WRAPPED BUFFER: %s\n", ggml_backend_buffer_name(buf));
+ GGML_ASSERT(false);
+ }
ggml_backend_buffer_type_t t = ggml_backend_mpi_wrap_buffer_type(buf->buft);
@@ -914,28 +1016,48 @@ GGML_CALL ggml_backend_buffer_t ggml_backend_mpi_wrap_buffer(ggml_backend_buffer
return buffer;
}
-bool ggml_backend_mpi_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, const struct ggml_tensor * src, struct ggml_tensor * dst) {
-// int src_rank = ggml_backend_mpi_buffer_rank(src->buffer);
-// int dst_rank = ggml_backend_mpi_buffer_rank(dst->buffer);
-//
-// auto * ctx = static_cast(backend->context);
-//
-// if (ctx->remote) {
-// return true;
-// }
-//
-// if (src_rank == dst_rank) {
-//// src->buffer->iface.cpy_tensor(src->buffer, src, dst);
-// return true;
-// }
-//
-// if (src_rank == ggml_backend_mpi_local_rank(backend)) {
-// ggml_mpi_tensor_send(src, dst_rank, ctx->comm);
-// } else if (dst_rank == ggml_backend_mpi_local_rank(backend)){
-// ggml_mpi_tensor_recv(dst, src_rank, ctx->comm);
-// }
+bool ggml_backend_mpi_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, struct ggml_tensor * src, struct ggml_tensor * dst) {
+ int src_rank = ggml_backend_mpi_buffer_rank(src->buffer);
+ int dst_rank = ggml_backend_mpi_buffer_rank(dst->buffer);
+
+ auto * src_ctx = static_cast(backend_src->context);
+ auto * dst_ctx = static_cast(backend_dst->context);
+
+
+ if (src_ctx->remote && dst_ctx->remote) {
+ return true;
+ }
+
+ if (src_rank == dst_rank) {
+ src->buffer = ggml_backend_mpi_buffer_unwrap(src->buffer);
+ if (src->view_src) {
+ src->view_src->buffer = ggml_backend_mpi_buffer_unwrap(src->view_src->buffer);
+ }
+ dst->buffer = ggml_backend_mpi_buffer_unwrap(dst->buffer);
+ if (dst->view_src) {
+ dst->view_src->buffer = ggml_backend_mpi_buffer_unwrap(dst->view_src->buffer);
+ }
+ ggml_backend_tensor_copy_async(((ggml_mpi_context *) backend_src->context)->backends.front(),((ggml_mpi_context *) backend_dst->context)->backends.front(), src, dst);
+
+ src->buffer = ggml_backend_mpi_wrap_buffer(src->buffer);
+ if (src->view_src) {
+ src->view_src->buffer = ggml_backend_mpi_wrap_buffer(src->view_src->buffer);
+ }
+ dst->buffer = ggml_backend_mpi_wrap_buffer(dst->buffer);
+ if (dst->view_src) {
+ dst->view_src->buffer = ggml_backend_mpi_wrap_buffer(dst->view_src->buffer);
+ }
+// src->buffer->iface.cpy_tensor(src->buffer, src, dst);
+ return true;
+ }
+
+ if (src_rank == ggml_backend_mpi_local_rank(backend_src)) {
+ ggml_mpi_tensor_send(src, dst_rank, dst_ctx->comm);
+ } else if (dst_rank == ggml_backend_mpi_local_rank(backend_dst)){
+ ggml_mpi_tensor_recv(dst, src_rank, src_ctx->comm);
+ }
// fprintf(stderr, "ATTEMPTING ASYNC COPY FOR SRC TENSOR %s TO DST TENSOR %s WITH SRC BACKEND %s AND DST BACKEND %s\n", src->name, dst->name, ggml_backend_name(backend_src), ggml_backend_name(backend_dst));
- return false;
+ return true;
}
@@ -947,11 +1069,31 @@ void ggml_backend_mpi_set_tensor_async(ggml_backend_t backend, struct ggml_tenso
GGML_ASSERT(ctx->rank == dst_rank);
- ggml_mpi_tensor_send(dst, data, ctx->rank, ctx->comm);
+ if (dst_rank == ggml_backend_mpi_buffer_local_rank(dst->buffer)) {
+ auto * old_buffer = dst->buffer;
+ dst->buffer = ggml_backend_mpi_buffer_unwrap(dst->buffer);
+ if (dst->view_src) {
+ dst->view_src->buffer = ggml_backend_mpi_buffer_unwrap(dst->view_src->buffer);
+ }
+ ggml_backend_tensor_set_async(((ggml_mpi_context *) backend->context)->backends.front(), dst, data, offset, size);
+ dst->buffer = ggml_backend_mpi_wrap_buffer(dst->buffer);
+ if (dst->view_src) {
+ dst->view_src->buffer = ggml_backend_mpi_wrap_buffer(dst->view_src->buffer);
+ }
+// dst->buffer = old_buffer;
+ } else {
+
+ ggml_mpi_tensor_send(dst, data, ctx->rank, ctx->comm);
+ }
}
+GGML_CALL static void ggml_backend_mpi_synchronize(ggml_backend_t backend) {
+ if (!((ggml_mpi_context*)backend->context)->remote) {
+ ggml_backend_synchronize(((ggml_mpi_context*)backend->context)->backends.front());
+ }
+}
ggml_backend_t ggml_backend_mpi_init(ggml_backend_t * wrapped_backends, size_t num_backends, int rank) {
@@ -960,10 +1102,11 @@ ggml_backend_t ggml_backend_mpi_init(ggml_backend_t * wrapped_backends, size_t n
ggml_mpi_context * ctx = ggml_mpi_init();
std::vector wrapped_backends_v;
+ for (size_t i = 0; i < num_backends; i++) {
+ wrapped_backends_v.push_back(wrapped_backends[i]);
+ }
if (ctx->rank == rank) {
- for (size_t i = 0; i < num_backends; i++) {
- wrapped_backends_v.push_back(wrapped_backends[i]);
- }
+
} else {
ctx->remote = true;
}
@@ -974,14 +1117,20 @@ ggml_backend_t ggml_backend_mpi_init(ggml_backend_t * wrapped_backends, size_t n
/* .free = */ ggml_backend_mpi_free,
/* .get_default_buffer_type = */ ggml_backend_mpi_get_default_buffer_type,
/* .set_tensor_async = */ ggml_backend_mpi_set_tensor_async,
- /* .get_tensor_async = */ NULL,
+ /* .get_tensor_async = */ nullptr,
/* .cpy_tensor_async = */ ggml_backend_mpi_cpy_tensor_async,
- /* .synchronize = */ NULL,
- /* .graph_plan_create = */ NULL,
- /* .graph_plan_free = */ NULL,
- /* .graph_plan_compute = */ NULL,
+ /* .synchronize = */ ggml_backend_mpi_synchronize,
+ /* .graph_plan_create = */ nullptr,
+ /* .graph_plan_free = */ nullptr,
+ /* .graph_plan_compute = */ nullptr,
/* .graph_compute = */ ggml_backend_mpi_graph_compute,
/* .supports_op = */ ggml_backend_mpi_supports_op,
+ /* .offload_op = */ nullptr,
+ /* .event_new = */ nullptr,
+ /* .event_free = */ nullptr,
+ /* .event_record = */ nullptr,
+ /* .event_wait = */ nullptr,
+ /* .event_synchronize = */ nullptr,
};
auto *mpi_backend = new ggml_backend {
diff --git a/ggml-mpi.h b/ggml-mpi.h
index d988a81e4..d4bcc533a 100644
--- a/ggml-mpi.h
+++ b/ggml-mpi.h
@@ -99,6 +99,9 @@ struct ggml_mpi_context * ggml_mpi_init(void);
GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type(ggml_backend_buffer_type_t buft);
+GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type_cached(ggml_backend_buffer_type_t buft);
+
+
GGML_API GGML_CALL ggml_backend_buffer_t ggml_backend_mpi_wrap_buffer(ggml_backend_buffer_t buf);
diff --git a/llama.cpp b/llama.cpp
index a6392237b..1f04b8fb7 100644
--- a/llama.cpp
+++ b/llama.cpp
@@ -1524,6 +1524,12 @@ static ggml_backend_buffer_type_t llama_default_buffer_type_offload(int gpu) {
if (buft == nullptr) {
buft = llama_default_buffer_type_cpu(true);
}
+
+#ifdef GGML_USE_MPI
+ else {
+ buft = ggml_backend_mpi_wrap_buffer_type(buft);
+ }
+#endif
return buft;
GGML_UNUSED(gpu);
@@ -4154,14 +4160,14 @@ static bool llm_load_tensors(
// assign the repeating layers
for (int64_t i = i_gpu_start; i < n_layer; ++i) {
model.buft_layer[i] = {
- split_buft,
+ llama_default_buffer_type_offload(main_gpu),
llama_default_buffer_type_offload(main_gpu)
};
}
// assign the output layer
if (n_gpu_layers > n_layer) {
model.buft_output = {
- split_buft,
+ llama_default_buffer_type_offload(main_gpu),
llama_default_buffer_type_offload(main_gpu)
};
} else {
@@ -4170,13 +4176,20 @@ static bool llm_load_tensors(
}
#ifdef GGML_USE_MPI
- uint16_t** ranges = ggml_mpi_split_range(model.ctx_mpi, 0, n_layer - 1, node_split);
+ model.buft_output = llama_default_buffer_type_cpu(true);
+
+ uint16_t** ranges = ggml_mpi_split_range(model.ctx_mpi, 0, n_layer-1, node_split);
size_t size = ggml_mpi_size(model.ctx_mpi);
for (size_t i = 0; i < size; i++) {
- for (uint16_t j = ranges[i][0]; j < ranges[i][1]; j++) {
+ model.buft_layer[ranges[i][0]].buft = llama_default_buffer_type_cpu(true);
+ model.buft_layer[ranges[i][0]].buft_matrix = llama_default_buffer_type_cpu(true);
+
+ model.buft_layer[ranges[i][1]].buft = llama_default_buffer_type_cpu(true);
+ model.buft_layer[ranges[i][1]].buft_matrix = llama_default_buffer_type_cpu(true);
+ for (uint16_t j = ranges[i][0]; j <= ranges[i][1]; j++) {
printf("Setting buffer rank for i %zu and j %d\n", i, j);
ggml_backend_mpi_buffer_type_set_rank(model.buft_layer[j].buft, (int)i);
ggml_backend_mpi_buffer_type_set_rank(model.buft_layer[j].buft_matrix, (int)i);
@@ -5093,6 +5106,9 @@ static bool llm_load_tensors(
ggml_backend_buffer_get_base(buf),
ggml_backend_buffer_get_size(buf));
}
+#endif
+#ifdef GGML_USE_MPI
+ buf = ggml_backend_mpi_wrap_buffer(buf);
#endif
}
#ifdef GGML_USE_METAL
@@ -5104,6 +5120,7 @@ static bool llm_load_tensors(
}
#endif
else {
+ fprintf(stderr, "Allocating context tensors from buffer type %s\n", ggml_backend_buft_name(buft));
buf = ggml_backend_alloc_ctx_tensors_from_buft(ctx, buft);
if (buf != nullptr && use_mlock && ggml_backend_buffer_is_host(buf)) {
model.mlock_bufs.emplace_back(new llama_mlock);
@@ -5117,9 +5134,7 @@ static bool llm_load_tensors(
throw std::runtime_error("failed to allocate buffer");
}
- #ifdef GGML_USE_MPI
- buf = ggml_backend_mpi_wrap_buffer(buf);
- #endif
+
// indicate that this buffer contains weights
// this is used by ggml_backend_sched to improve op scheduling -> ops that use a weight are preferably scheduled to the backend that contains the weight
ggml_backend_buffer_set_usage(buf, GGML_BACKEND_BUFFER_USAGE_WEIGHTS);
@@ -13330,7 +13345,9 @@ struct llama_context * llama_new_context_with_model(
std::vector new_backends;
for (size_t i = 0; i < ggml_mpi_size(model->ctx_mpi); i++) {
- new_backends.push_back(ggml_backend_mpi_init(ctx->backends.data(), ctx->backends.size(), (int) i));
+ for (auto & backend : ctx->backends) {
+ new_backends.push_back(ggml_backend_mpi_init(std::vector{backend, ctx->backend_cpu}.data(), 2, (int) i));
+ }
}
ctx->backends = new_backends;
@@ -14507,7 +14524,7 @@ int llama_process_mpi_transaction(
llama_kv_cache_seq_div(ctx, 0, 0, 0, 0);
break;
default:
- printf("Unknown operation, exiting\n");
+ printf("Unknown operation %d, exiting\n", tag);
exit(1);
break;
}
@@ -14545,7 +14562,7 @@ int llama_process_mpi_worker(
// }
// break;
default:
- printf("Unknown operation, exiting\n");
+ printf("Unknown non-transaction operation %d, exiting\n", tag);
exit(1);
break;
}
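
Note on the wrapping scheme this patch relies on: each MPI buffer is a thin proxy whose context stores the concrete (CPU, CUDA, ...) buffer it wraps plus the MPI rank that owns the data. Around calls into the wrapped backend, the proxy is temporarily unwrapped, and afterwards the inner buffer is re-attached with ggml_backend_mpi_set_wrapped_buffer() instead of being wrapped a second time (double wrapping now triggers an assert in ggml_backend_mpi_wrap_buffer). The sketch below is a simplified, self-contained model of that unwrap/rewrap round trip; the names (toy_buffer, mpi_proxy, unwrap, rewrap) are invented for illustration and are not the real ggml types or APIs.

    // Simplified model (assumed names, not the real ggml-mpi types) of the
    // unwrap -> compute -> rewrap pattern used in ggml_backend_mpi_graph_compute.
    #include <cassert>
    #include <cstdio>

    struct toy_buffer {          // stands in for a concrete ggml_backend_buffer
        const char * name;
        size_t       size;
    };

    struct mpi_proxy {           // stands in for the MPI wrapper buffer + its context
        toy_buffer * wrapped;    // inner CPU/CUDA buffer
        int          rank;       // MPI rank that owns the tensor data
        size_t       size;
    };

    // Hand the inner buffer to the wrapped backend, keeping sizes in sync
    // (rough analogue of ggml_backend_mpi_buffer_unwrap).
    static toy_buffer * unwrap(mpi_proxy * p) {
        assert(p && p->wrapped);
        p->wrapped->size = p->size;
        return p->wrapped;
    }

    // Re-attach the (possibly replaced) inner buffer to the existing proxy
    // instead of wrapping it again (rough analogue of ggml_backend_mpi_set_wrapped_buffer).
    static mpi_proxy * rewrap(mpi_proxy * p, toy_buffer * inner) {
        assert(p && inner);
        p->wrapped = inner;
        return p;
    }

    int main() {
        toy_buffer inner = { "CUDA0", 1024 };
        mpi_proxy  proxy = { &inner, /*rank=*/1, 1024 };

        toy_buffer * raw = unwrap(&proxy);   // wrapped backend computes with the real buffer
        rewrap(&proxy, raw);                 // restore the proxy; no double wrapping
        std::printf("rank %d wraps %s (%zu bytes)\n",
                    proxy.rank, proxy.wrapped->name, proxy.wrapped->size);
        return 0;
    }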