Fix non-CPU backend wrapping

Branden Butler 2024-05-20 22:37:14 -05:00
parent 9419190533
commit 19e78d29b0
6 changed files with 305 additions and 120 deletions

View file

@@ -86,7 +86,7 @@ extern "C" {
// (optional) asynchronous tensor data access
void (*GGML_CALL set_tensor_async)(ggml_backend_t backend, struct ggml_tensor * tensor, const void * data, size_t offset, size_t size);
void (*GGML_CALL get_tensor_async)(ggml_backend_t backend, const struct ggml_tensor * tensor, void * data, size_t offset, size_t size);
bool (*GGML_CALL cpy_tensor_async)(ggml_backend_t backend_src, ggml_backend_t backend_dst, const struct ggml_tensor * src, struct ggml_tensor * dst);
bool (*GGML_CALL cpy_tensor_async)(ggml_backend_t backend_src, ggml_backend_t backend_dst, struct ggml_tensor * src, struct ggml_tensor * dst);
// (optional) complete all pending operations
void (*GGML_CALL synchronize)(ggml_backend_t backend);

View file

@@ -41,6 +41,8 @@ GGML_CALL size_t ggml_backend_buft_get_alloc_size(ggml_backend_buffer_type_t buf
assert(size >= ggml_nbytes(tensor));
return size;
}
// fprintf(stderr, "Getting alloc size of buft (%s)\n", ggml_backend_buft_name(buft));
// fprintf(stderr, "Getting alloc size of tensor (%s)\n", tensor->name);
return ggml_nbytes(tensor);
}
@@ -313,6 +315,8 @@ static bool ggml_are_same_layout(const struct ggml_tensor * a, const struct ggml
void ggml_backend_tensor_copy(struct ggml_tensor * src, struct ggml_tensor * dst) {
GGML_ASSERT(ggml_are_same_layout(src, dst) && "cannot copy tensors with different layouts");
// fprintf(stderr, "Attempting copy from %s to %s\n", src->name, dst->name);
if (src == dst) {
return;
}
@@ -340,6 +344,9 @@ void ggml_backend_tensor_copy_async(ggml_backend_t backend_src, ggml_backend_t b
return;
}
// fprintf(stderr, "Attempting async copy from %s to %s with src backend %s and dst backend %s\n", src->name, dst->name, ggml_backend_name(backend_src), ggml_backend_name(backend_dst));
if (backend_dst->iface.cpy_tensor_async != NULL) {
if (backend_dst->iface.cpy_tensor_async(backend_src, backend_dst, src, dst)) {
return;
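
The hunk above tries an optional per-backend hook first and only falls back to the generic path when the hook is absent or declines by returning false. A minimal self-contained sketch of that dispatch pattern (all types and names below are invented for illustration and are not part of the ggml API):

#include <cstdio>

struct tensor {};
using cpy_async_fn = bool (*)(tensor * src, tensor * dst);

static void copy_blocking(tensor *, tensor *) { printf("blocking copy\n"); }

static void copy_tensor(cpy_async_fn hook, tensor * src, tensor * dst) {
    if (hook != nullptr && hook(src, dst)) {
        return;                              // the backend handled the copy asynchronously
    }
    copy_blocking(src, dst);                 // generic fallback path
}

int main() {
    tensor a, b;
    copy_tensor(nullptr, &a, &b);            // no hook -> fallback
    copy_tensor([](tensor *, tensor *) { printf("async copy\n"); return true; }, &a, &b);
}
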
@@ -1101,8 +1108,10 @@ static int ggml_backend_sched_backend_from_buffer(ggml_backend_sched_t sched, co
// find highest prio backend that supports the buffer type
for (int i = 0; i < sched->n_backends; i++) {
if (ggml_backend_buft_supports_backend(buffer->buft, sched->backends[i])) {
if (ggml_backend_buft_supports_backend(buffer->buft, sched->backends[i]) && ggml_backend_supports_op(sched->backends[i], tensor)) {
return i;
} else {
// fprintf(stderr, "Buffer type %s does not support backend %s\n", ggml_backend_buft_name(buffer->buft), ggml_backend_name(sched->backends[i]));
}
}
@@ -1204,7 +1213,7 @@ static void ggml_backend_sched_print_assignments(ggml_backend_sched_t sched, str
continue;
}
ggml_backend_t tensor_backend = ggml_backend_sched_get_tensor_backend(sched, node);
fprintf(stderr, "node #%3d (%10.10s): %20.20s (%5.5s) [%5.5s %8.8s]:", i, ggml_op_name(node->op), node->name,
fprintf(stderr, "node #%3d (%10.10s): %30.30s (%5.5s) [%30.30s %8.8s]:", i, ggml_op_name(node->op), node->name,
fmt_size(ggml_nbytes(node)), tensor_backend ? ggml_backend_name(tensor_backend) : "NULL", GET_CAUSE(node));
for (int j = 0; j < GGML_MAX_SRC; j++) {
struct ggml_tensor * src = node->src[j];
@@ -1212,7 +1221,7 @@ static void ggml_backend_sched_print_assignments(ggml_backend_sched_t sched, str
continue;
}
ggml_backend_t src_backend = ggml_backend_sched_get_tensor_backend(sched, src);
fprintf(stderr, " %20.20s (%5.5s) [%5.5s %8.8s]", src->name,
fprintf(stderr, " %40.40s (%5.5s) [%30.30s %8.8s]", src->name,
fmt_size(ggml_nbytes(src)), src_backend ? ggml_backend_name(src_backend) : "NULL", GET_CAUSE(src));
}
fprintf(stderr, "\n");
@@ -1552,12 +1561,17 @@ static void ggml_backend_sched_split_graph(ggml_backend_sched_t sched, struct gg
const size_t input_id = hash_id(input);
struct ggml_tensor * input_cpy = sched->tensor_copies[input_id][split->backend_id][sched->cur_copy];
// fprintf(stderr, "Creating input copy for input %s and copy %s\n", input->name, input_cpy->name);
// add a dependency to the input source so that it is not freed before the copy is done
struct ggml_tensor * input_dep = ggml_view_tensor(sched->ctx, input);
input_dep->src[0] = input;
sched->node_backend_ids[graph_copy->n_nodes] = sched->tensor_backend_id[input_id];
graph_copy->nodes[graph_copy->n_nodes++] = input_dep;
// fprintf(stderr, "Input dep src: %s\n", input_dep->src[0]->name);
// add a dependency to the input copy so that it is allocated at the start of the split
sched->node_backend_ids[graph_copy->n_nodes] = split->backend_id;
graph_copy->nodes[graph_copy->n_nodes++] = input_cpy;
@@ -1640,6 +1654,8 @@ static enum ggml_status ggml_backend_sched_compute_splits(ggml_backend_sched_t s
struct ggml_tensor * input = split->inputs[j];
struct ggml_tensor * input_cpy = sched->tensor_copies[hash_id(input)][split_backend_id][sched->cur_copy];
// fprintf(stderr, "Split input: %s, input copy: %s\n", input->name, input_cpy->name);
if (input->flags & GGML_TENSOR_FLAG_INPUT) {
// inputs from the user must be copied immediately to prevent the user overwriting the data before the copy is done
if (sched->events[split_backend_id][sched->cur_copy] != NULL) {

View file

@@ -11154,7 +11154,7 @@ GGML_CALL static void ggml_backend_cuda_get_tensor_async(ggml_backend_t backend,
CUDA_CHECK(cudaMemcpyAsync(data, (const char *)tensor->data + offset, size, cudaMemcpyDeviceToHost, g_cudaStreams[cuda_ctx->device][0]));
}
GGML_CALL static bool ggml_backend_cuda_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, const ggml_tensor * src, ggml_tensor * dst) {
GGML_CALL static bool ggml_backend_cuda_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, ggml_tensor * src, ggml_tensor * dst) {
GGML_ASSERT(ggml_backend_is_cuda(backend_src) || ggml_backend_is_cuda(backend_dst));
ggml_backend_buffer_t buf_src = src->view_src ? src->view_src->buffer : src->buffer;

View file

@@ -44,7 +44,7 @@ void ggml_mpi_backend_init(void) {
have_init = true;
const int buffer_size = 128*1024*1024*8;
send_buffer = calloc(1, buffer_size); // 128MB buffer
// fprintf(stderr, "BUFFER ATTACH RETCODE=%d\n", MPI_Buffer_attach(send_buffer, buffer_size));
fprintf(stderr, "BUFFER ATTACH RETCODE=%d\n", MPI_Buffer_attach(send_buffer, buffer_size));
}
void ggml_mpi_sync_pipelined(
@@ -230,7 +230,6 @@ void ggml_mpi_eval_init(
GGML_ASSERT(n_tokens != nullptr);
// FIXME Syncing n_seq_ids causes MPI to throw an invalid buffer error in Bsend
ggml_mpi_sync_pipelined(ctx_mpi, *n_seq_ids, *n_tokens, MPI_INT32_T, GGML_MPI_N_SEQ_IDS);
// We need to know the total number of sequence
@@ -308,7 +307,9 @@ static void ggml_mpi_tensor_send(const struct ggml_tensor * t, const void* data,
MPI_Comm_rank(comm, &rank);
// fprintf(stderr, "Sending tensor %s (buffer %s) from %d to %d\n", t->name, ggml_backend_buffer_name(t->buffer), rank, mpi_rank_dst);
const int retval = MPI_Send(data, ggml_nelements(t), mpi_type, mpi_rank_dst, 0, comm);
GGML_ASSERT(rank != mpi_rank_dst);
const int retval = MPI_Bsend(data, ggml_nelements(t), mpi_type, mpi_rank_dst, 0, comm);
GGML_ASSERT(retval == MPI_SUCCESS);
}
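
The two ggml-mpi hunks above re-enable the MPI_Buffer_attach call in ggml_mpi_backend_init and switch ggml_mpi_tensor_send from MPI_Send to MPI_Bsend; a buffered send copies the message into the attached buffer and returns without waiting for the matching receive, and the added GGML_ASSERT(rank != mpi_rank_dst) rules out a rank sending to itself. A minimal standalone sketch of the buffered-send pattern (buffer size and payload are arbitrary; run with at least two ranks):

#include <mpi.h>
#include <cstdio>
#include <cstdlib>

int main(int argc, char ** argv) {
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // attach a user buffer once, as ggml_mpi_backend_init does
    const int buffer_size = 1024 * 1024 + MPI_BSEND_OVERHEAD;
    MPI_Buffer_attach(malloc(buffer_size), buffer_size);

    float payload[4] = {1.0f, 2.0f, 3.0f, 4.0f};
    if (rank == 0) {
        // returns as soon as the data has been copied into the attached buffer
        MPI_Bsend(payload, 4, MPI_FLOAT, 1, 0, MPI_COMM_WORLD);
    } else if (rank == 1) {
        MPI_Recv(payload, 4, MPI_FLOAT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("rank 1 got %.1f\n", payload[0]);
    }

    // detaching drains any pending buffered sends before returning
    void * buf; int size;
    MPI_Buffer_detach(&buf, &size);
    free(buf);

    MPI_Finalize();
    return 0;
}
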
@@ -330,6 +331,9 @@ static void ggml_mpi_tensor_recv(const struct ggml_tensor * t, void * data, int
int rank;
MPI_Comm_rank(comm, &rank);
// fprintf(stderr, "Receiving tensor %s (buffer %s) from %d at %d\n", t->name, ggml_backend_buffer_name(t->buffer), mpi_rank_src, rank);
GGML_ASSERT(rank != mpi_rank_src);
const int retval = MPI_Recv(data, ggml_nelements(t), mpi_type, mpi_rank_src, MPI_ANY_TAG, comm, &status);
GGML_ASSERT(retval == MPI_SUCCESS);
}
@@ -358,7 +362,7 @@ uint16_t** ggml_mpi_split_range(
for (int i=0; i < ctx_mpi->size; i++) {
ranges[i][0] = next_layer;
ranges[i][1] = MIN(end, ranges[i][0] + (node_weights[i] * range_length) + start);
next_layer = ranges[i][1];
next_layer = ranges[i][1]+1;
}
// ranges[0][0] = next_layer;
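
With next_layer = ranges[i][1] + 1 the per-rank ranges become inclusive on both ends and adjacent ranks no longer share a boundary layer. A simplified self-contained sketch of the split (the real function works on a ggml_mpi_context and uint16_t** ranges; the types and names below are simplified for illustration):

#include <algorithm>
#include <cstdio>
#include <utility>
#include <vector>

// split the inclusive range [start, end] into one contiguous sub-range per rank,
// proportionally to the per-rank weights
static std::vector<std::pair<int, int>> split_range(int start, int end, const std::vector<float> & weights) {
    const int range_length = end - start + 1;
    std::vector<std::pair<int, int>> ranges(weights.size());
    int next_layer = start;
    for (size_t i = 0; i < weights.size(); i++) {
        ranges[i].first  = next_layer;
        ranges[i].second = std::min(end, ranges[i].first + (int)(weights[i] * range_length));
        next_layer = ranges[i].second + 1;   // the next rank starts after this rank's last layer
    }
    return ranges;
}

int main() {
    // e.g. 32 layers split across two equally weighted ranks -> [0, 16] and [17, 31]
    for (const auto & r : split_range(0, 31, {0.5f, 0.5f})) {
        printf("[%d, %d]\n", r.first, r.second);
    }
}
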
@@ -460,28 +464,44 @@ int ggml_backend_mpi_rank(ggml_backend_t backend) {
return ctx->rank;
}
ggml_backend_buffer_t ggml_backend_mpi_buffer_unwrap(ggml_backend_buffer_t buffer) {
auto * ctx = (ggml_backend_mpi_buffer_context *) buffer->context;
ggml_backend_buffer_t wrapped_buffer = ctx->wrapped_buffer;
wrapped_buffer->usage = buffer->usage;
wrapped_buffer->size = buffer->size;
return wrapped_buffer;
}
GGML_CALL static const char * ggml_backend_mpi_buffer_name(ggml_backend_buffer_t buffer);
ggml_backend_buffer_type_t ggml_backend_mpi_buffer_type_unwrap(ggml_backend_buffer_type_t buft) {
GGML_ASSERT(buft != nullptr);
auto * ctx = (ggml_backend_mpi_buffer_type_context *) buft->context;
GGML_ASSERT(ctx != nullptr);
ggml_backend_buffer_type_t wrapped_buffer_type = ctx->wrapped_buffer_type;
return wrapped_buffer_type;
}
ggml_backend_buffer_t ggml_backend_mpi_buffer_unwrap(ggml_backend_buffer_t buffer) {
GGML_ASSERT(buffer != nullptr);
// fprintf(stderr, "Attempting unwrap of %s\n", ggml_backend_buffer_name(buffer));
// if(buffer->iface.get_name != ggml_backend_mpi_buffer_name) {
// return buffer;
// }
auto * ctx = (ggml_backend_mpi_buffer_context *) buffer->context;
GGML_ASSERT(ctx != nullptr);
ggml_backend_buffer_t wrapped_buffer = ctx->wrapped_buffer;
GGML_ASSERT(wrapped_buffer != nullptr);
wrapped_buffer->usage = buffer->usage;
wrapped_buffer->size = buffer->size;
if (wrapped_buffer->buft->iface.get_name == ggml_backend_mpi_buffer_type_name) {
wrapped_buffer->buft = ggml_backend_mpi_buffer_type_unwrap(wrapped_buffer->buft);
}
return wrapped_buffer;
}
GGML_CALL static const char * ggml_backend_mpi_buffer_name(ggml_backend_buffer_t buffer) {
GGML_ASSERT(buffer != nullptr);
GGML_ASSERT(ggml_backend_mpi_buffer_unwrap(buffer) != nullptr && ggml_backend_mpi_buffer_unwrap(buffer)->iface.get_name != ggml_backend_mpi_buffer_name);
return strdup(
(
@@ -529,20 +549,37 @@ GGML_CALL void ggml_backend_mpi_buffer_copy_ctx_from_type(ggml_backend_buffer_ty
}
}
ggml_backend_buffer_type_t ggml_backend_mpi_buffer_type_set_wrapped_buffer_type(ggml_backend_buffer_type_t orig, ggml_backend_buffer_type_t buft) {
if (orig->iface.get_name == ggml_backend_mpi_buffer_type_name) {
((ggml_backend_mpi_buffer_type_context*)(orig->context))->wrapped_buffer_type = buft;
} else {
GGML_ASSERT(!"Original buffer type must be an MPI buffer type.");
}
return orig;
}
ggml_backend_buffer_t ggml_backend_mpi_set_wrapped_buffer(ggml_backend_buffer_t orig, ggml_backend_buffer_t buf) {
GGML_ASSERT(buf != nullptr);
GGML_ASSERT(buf->iface.get_name != ggml_backend_mpi_buffer_name);
if (orig->iface.get_name == ggml_backend_mpi_buffer_name) {
((ggml_backend_mpi_buffer_context*)(orig->context))->wrapped_buffer = buf;
if (orig->buft != nullptr) {
ggml_backend_mpi_buffer_type_set_wrapped_buffer_type(orig->buft, buf->buft);
}
} else {
fprintf(stderr, "Original buffer name: %s\n", ggml_backend_buffer_name(orig));
GGML_ASSERT(!"Original buffer must be an MPI buffer.");
}
return orig;
}
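
ggml_backend_mpi_set_wrapped_buffer re-attaches an existing MPI wrapper to its (possibly replaced) inner buffer instead of allocating a fresh wrapper, and together with ggml_backend_mpi_buffer_unwrap it enables the unwrap, delegate, re-wrap pattern this commit uses in graph_compute, init_tensor, cpy_tensor_async and set_tensor_async. A self-contained illustration of that pattern with invented minimal types (the real code operates on ggml_backend_buffer_t):

#include <cstdio>

struct buffer {
    const char * name;
    buffer *     wrapped;   // non-null only for the MPI wrapper
};

// peel the wrapper off, in the spirit of ggml_backend_mpi_buffer_unwrap
static buffer * unwrap(buffer * b) { return b->wrapped != nullptr ? b->wrapped : b; }

// re-attach an existing wrapper, in the spirit of ggml_backend_mpi_set_wrapped_buffer
static buffer * set_wrapped(buffer * wrapper, buffer * inner) {
    wrapper->wrapped = inner;
    return wrapper;
}

static void wrapped_backend_work(buffer * b) { printf("backend sees: %s\n", b->name); }

int main() {
    buffer inner   = { "CUDA0 buffer", nullptr };
    buffer wrapper = { "MPI(CUDA0 buffer)", &inner };

    buffer * tensor_buffer = &wrapper;

    buffer * orig = tensor_buffer;                      // remember the wrapper
    tensor_buffer = unwrap(tensor_buffer);              // hand the inner buffer to the wrapped backend
    wrapped_backend_work(tensor_buffer);
    tensor_buffer = set_wrapped(orig, tensor_buffer);   // restore the same wrapper object

    printf("restored: %s\n", tensor_buffer->name);
}
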
GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t backend, ggml_cgraph * cgraph) {
struct ggml_mpi_context * ctx = (ggml_mpi_context *) backend->context;
std::vector<ggml_backend_buffer_type_t> backend_buft;
for (auto *curr_backend: ctx->backends) {
if (ggml_backend_is_cpu(curr_backend)) {
// use host buffers for the CPU backend compute buffer
backend_buft.push_back(ggml_backend_cpu_buffer_type());
} else {
backend_buft.push_back(ggml_backend_get_default_buffer_type(curr_backend));
}
}
std::vector<std::pair<ggml_backend_buffer_t, std::vector<ggml_backend_buffer_t>>> old_buffs(
cgraph->n_nodes);
@@ -557,6 +594,7 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
if (src == nullptr) {
break;
}
// fprintf(stderr, "Previous source: %s\n", src->name);
old_buffs[i].second.push_back(src->buffer);
}
@@ -584,6 +622,8 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
if (src->buffer->buft->iface.get_name == ggml_backend_mpi_buffer_type_name) {
n_srcs++;
src->buffer = ggml_backend_mpi_buffer_unwrap(src->buffer);
// fprintf(stderr, "After unwrapping source: %s\n", src->name);
}
}
@@ -611,19 +651,19 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
if (!ctx->remote) {
ggml_backend_sched_t sched = ggml_backend_sched_new(ctx->backends.data(), backend_buft.data(),
ggml_backend_sched_t sched = ggml_backend_sched_new(ctx->backends.data(), nullptr,
(int) ctx->backends.size(), cgraph->n_nodes + cgraph->n_leafs + n_srcs, false);
ggml_backend_sched_reserve(sched, cgraph);
ggml_backend_sched_graph_compute_async(sched, cgraph);
// ggml_backend_sched_reserve(sched, cgraph);
ggml_backend_sched_graph_compute(sched, cgraph);
ggml_backend_sched_free(sched);
}
for (int i = 0; i < cgraph->n_nodes; i++) {
cgraph->nodes[i]->buffer = ggml_backend_mpi_wrap_buffer(cgraph->nodes[i]->buffer);
ggml_backend_mpi_buffer_set_rank(cgraph->nodes[i]->buffer, ggml_backend_mpi_buffer_rank(old_buffs[i].first));
if (cgraph->nodes[i]->buffer->iface.get_name != ggml_backend_mpi_buffer_name) {
cgraph->nodes[i]->buffer = ggml_backend_mpi_set_wrapped_buffer(old_buffs[i].first, cgraph->nodes[i]->buffer);
}
for (int iter = 0; iter < GGML_MAX_SRC; iter++) {
@@ -632,20 +672,22 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
break;
}
// fprintf(stderr, "After compute src: %s\n", src_node->name);
if (src_node->buffer->iface.get_name == ggml_backend_mpi_buffer_name) {
continue;
}
src_node->buffer = ggml_backend_mpi_wrap_buffer(src_node->buffer);
src_node->buffer = ggml_backend_mpi_set_wrapped_buffer(old_buffs[i].second[iter], src_node->buffer);
// fprintf(stderr, "After setting wrapped buffer src: %s\n", src_node->name);
ggml_backend_mpi_buffer_set_rank(src_node->buffer, ggml_backend_mpi_buffer_rank(old_buffs[i].second[iter]));
}
if(cgraph->nodes[i]->view_src != nullptr && cgraph->nodes[i]->view_src->buffer->buft != nullptr) {
if (old_view_buffs[i] != nullptr) {
if (old_view_buffs[i]->iface.get_name == ggml_backend_mpi_buffer_name) {
ggml_backend_mpi_buffer_set_rank(cgraph->nodes[i]->view_src->buffer,
ggml_backend_mpi_buffer_rank(old_view_buffs[i]));
if (old_view_buffs[i]->iface.get_name == ggml_backend_mpi_buffer_name && cgraph->nodes[i]->view_src->buffer->iface.get_name != ggml_backend_mpi_buffer_name) {
cgraph->nodes[i]->view_src->buffer = ggml_backend_mpi_set_wrapped_buffer(old_view_buffs[i], cgraph->nodes[i]->view_src->buffer);
}
}
}
@@ -655,6 +697,7 @@ GGML_CALL static enum ggml_status ggml_backend_mpi_graph_compute(ggml_backend_t
// FIXME check if this is correct or not (it's probably not)
for (int i = 0; i < cgraph->n_leafs; i++) {
GGML_ASSERT(false);
cgraph->leafs[i]->buffer = ggml_backend_mpi_wrap_buffer(cgraph->leafs[i]->buffer);
ggml_backend_mpi_buffer_type_set_rank(cgraph->leafs[i]->buffer->buft, ctx->rank);
}
@@ -681,28 +724,15 @@ static void ggml_backend_mpi_free(ggml_backend_t backend) {
static ggml_backend_buffer_type_t ggml_backend_mpi_get_default_buffer_type(ggml_backend_t backend) {
auto * ctx = static_cast<ggml_mpi_context *>(backend->context);
if (ctx->backends.empty()) {
auto * buff = ggml_backend_mpi_wrap_buffer_type(ggml_backend_cpu_buffer_type());
ggml_backend_mpi_buffer_type_set_rank(buff, ctx->rank);
return buff;
}
auto * buff = ggml_backend_mpi_wrap_buffer_type(ctx->backends.back()->iface.get_default_buffer_type(ctx->backends.back()));
auto * buff = ggml_backend_mpi_wrap_buffer_type(ctx->backends.front()->iface.get_default_buffer_type(ctx->backends.front()));
ggml_backend_mpi_buffer_type_set_rank(buff, ctx->rank);
return buff;
}
GGML_CALL static bool ggml_backend_mpi_supports_op(ggml_backend_t backend, const struct ggml_tensor * op) {
switch (op->op) {
case GGML_OP_CPY:
return op->type != GGML_TYPE_IQ2_XXS && op->type != GGML_TYPE_IQ2_XS; // missing type_traits.from_float
case GGML_OP_MUL_MAT:
return op->src[1]->type == GGML_TYPE_F32 || op->src[1]->type == ggml_internal_get_type_traits(op->src[0]->type).vec_dot_type;
default:
return true;
}
GGML_UNUSED(backend);
return ggml_backend_supports_op(((ggml_mpi_context *) backend->context)->backends.front(),op);
}
@@ -760,11 +790,13 @@ GGML_CALL static size_t ggml_backend_mpi_buffer_type_get_max_size(ggml_backend_b
GGML_CALL static size_t ggml_backend_mpi_buffer_type_get_alloc_size(ggml_backend_buffer_type_t buft, const struct ggml_tensor * tensor) {
// Have to do this instead of calling ggml_backend_type_get_alloc_size because that signature doesn't have const on tensor
return ggml_backend_mpi_buffer_type_unwrap(buft)->iface.get_alloc_size(ggml_backend_mpi_buffer_type_unwrap(buft), tensor);
size_t ret = ggml_backend_mpi_buffer_type_unwrap(buft)->iface.get_alloc_size(ggml_backend_mpi_buffer_type_unwrap(buft), tensor);
return ret;
}
GGML_CALL static bool ggml_backend_mpi_buffer_type_supports_backend(ggml_backend_buffer_type_t buft, ggml_backend_t backend) {
return backend != nullptr && ggml_backend_is_mpi(backend) && ggml_backend_mpi_buffer_type_rank(buft) == ggml_backend_mpi_rank(backend);
return backend != nullptr && ggml_backend_is_mpi(backend) && ggml_backend_mpi_buffer_type_rank(buft) == ggml_backend_mpi_rank(backend)
&& ggml_backend_buft_supports_backend(ggml_backend_mpi_buffer_type_unwrap(buft), ((ggml_mpi_context*)backend->context)->backends.front());
}
GGML_CALL static bool ggml_backend_mpi_buffer_type_is_host(ggml_backend_buffer_type_t buft) {
@@ -779,15 +811,12 @@ static std::map<ggml_backend_buffer_t, ggml_backend_buffer_t> cached_buffer_wrap
static std::map<ggml_backend_t *, ggml_backend_t> cached_backends;
GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type(ggml_backend_buffer_type_t buft) {
// if (cached_wrappers.find(buft) != cached_wrappers.end()) {
// fprintf(stderr, "Returning cached buffer type with name %s\n", cached_wrappers[buft]->iface.get_name(cached_wrappers[buft]));
//
// auto * ret = new ggml_backend_buffer_type;
// *ret = *cached_wrappers[buft];
// return ret;
// }
GGML_ASSERT(buft->iface.get_name != ggml_backend_mpi_buffer_type_name);
ggml_backend_buffer_type_i ggml_backend_mpi_buffer_type_interface = {
/* .get_name = */ ggml_backend_mpi_buffer_type_name,
@@ -813,11 +842,25 @@ GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type(ggml_back
// Set rank to 0 as default
ggml_backend_mpi_buffer_type_set_rank(ggml_backend_wrapped_buffer_type, 0);
cached_wrappers[buft] = ggml_backend_wrapped_buffer_type;
return ggml_backend_wrapped_buffer_type;
}
GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type_cached(ggml_backend_buffer_type_t buft) {
if (cached_wrappers.find(buft) != cached_wrappers.end()) {
// fprintf(stderr, "Returning cached buffer type with name %s\n",
// cached_wrappers[buft]->iface.get_name(cached_wrappers[buft]));
return cached_wrappers[buft];
}
auto * ggml_backend_wrapped_buffer_type = ggml_backend_mpi_wrap_buffer_type(buft);
cached_wrappers[buft] = ggml_backend_wrapped_buffer_type;
return ggml_backend_wrapped_buffer_type;
}
GGML_CALL static void * ggml_backend_mpi_buffer_get_base(ggml_backend_buffer_t buffer) {
@@ -846,8 +889,12 @@ GGML_CALL static void ggml_backend_mpi_buffer_get_tensor(ggml_backend_buffer_t b
int src_rank = ggml_backend_mpi_buffer_rank(tensor->buffer);
if (rank != src_rank) {
// if (ggml_backend_mpi_buffer_rank(buffer) != ggml_backend_mpi_buffer_local_rank(buffer)) {
// return;
// }
if (rank != src_rank) {
// fprintf(stderr, "Getting tensor: %s, buffer %s\n", tensor->name, ggml_backend_buffer_name(buffer));
ggml_mpi_tensor_recv(tensor, data, ggml_backend_mpi_buffer_rank(tensor->buffer), ggml_backend_mpi_buffer_get_comm(tensor->buffer));
return;
}
@@ -856,7 +903,7 @@ GGML_CALL static void ggml_backend_mpi_buffer_get_tensor(ggml_backend_buffer_t b
}
GGML_CALL static bool ggml_backend_mpi_buffer_cpy_tensor(ggml_backend_buffer_t buffer, const struct ggml_tensor * src, struct ggml_tensor * dst) {
if (ggml_backend_mpi_buffer_rank(src->buffer) == ggml_backend_mpi_buffer_rank(dst->buffer)) {
if (ggml_backend_mpi_buffer_rank(src->buffer) == ggml_backend_mpi_buffer_rank(dst->buffer) && ggml_backend_mpi_buffer_local_rank(buffer) == ggml_backend_mpi_buffer_rank(src->buffer)) {
return ggml_backend_mpi_buffer_unwrap(buffer)->iface.cpy_tensor(ggml_backend_mpi_buffer_unwrap(buffer), src,
dst);
}
@@ -868,19 +915,65 @@ GGML_CALL static void ggml_backend_mpi_buffer_clear(ggml_backend_buffer_t buffer
return ggml_backend_mpi_buffer_unwrap(buffer)->iface.clear(ggml_backend_mpi_buffer_unwrap(buffer), value);
}
static struct ggml_backend_buffer_i mpi_backend_buffer_i = {
GGML_CALL static void ggml_backend_mpi_buffer_init_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor) {
// fprintf(stderr, "Init tensor with buffer %s, tensor %s, tensor buffer %s, tensor view src %s, tensor vs buff %s\n",
// ggml_backend_buffer_name(buffer), tensor->name, ggml_backend_buffer_name(tensor->buffer), tensor->view_src !=
// nullptr ? tensor->view_src->name : "", tensor->view_src != nullptr ? ggml_backend_buffer_name(tensor->view_src->buffer) : "");
auto *orig_buffer = tensor->buffer;
tensor->buffer = ggml_backend_mpi_buffer_unwrap(tensor->buffer);
bool view_src_null = tensor->view_src == nullptr;
ggml_backend_buffer_t orig_view_src_buffer = nullptr;
if (!view_src_null) {
orig_view_src_buffer = tensor->view_src->buffer;
tensor->view_src->buffer = ggml_backend_mpi_buffer_unwrap(tensor->view_src->buffer);
}
std::vector<ggml_backend_buffer_t> orig_src_buffers(0);
for (auto & src : tensor->src) {
if (src == nullptr) {
break;
}
orig_src_buffers.push_back(src->buffer);
if (src->buffer != nullptr && src->buffer->iface.get_name == ggml_backend_mpi_buffer_name) {
src->buffer = ggml_backend_mpi_buffer_unwrap(src->buffer);
}
}
ggml_backend_buffer_init_tensor(ggml_backend_mpi_buffer_unwrap(buffer), tensor);
tensor->buffer = ggml_backend_mpi_set_wrapped_buffer(orig_buffer, tensor->buffer);
if (!view_src_null) {
tensor->view_src->buffer = ggml_backend_mpi_set_wrapped_buffer(orig_view_src_buffer, tensor->view_src->buffer);
}
for (size_t i = 0; i < orig_src_buffers.size(); i++) {
if (orig_src_buffers[i]->iface.get_name == ggml_backend_mpi_buffer_name) {
tensor->src[i]->buffer = ggml_backend_mpi_set_wrapped_buffer(orig_src_buffers[i], tensor->src[i]->buffer);
}
}
}
GGML_CALL ggml_backend_buffer_t ggml_backend_mpi_wrap_buffer(ggml_backend_buffer_t buf) {
struct ggml_backend_buffer_i mpi_backend_buffer_i = {
/* .get_name = */ ggml_backend_mpi_buffer_name,
/* .free_buffer = */ ggml_backend_mpi_buffer_free_buffer,
/* .get_base = */ ggml_backend_mpi_buffer_get_base,
/* .init_tensor = */ NULL, // no initialization required
/* .init_tensor = */ (buf->iface.init_tensor != nullptr) ? ggml_backend_mpi_buffer_init_tensor : nullptr,
/* .set_tensor = */ ggml_backend_mpi_buffer_set_tensor,
/* .get_tensor = */ ggml_backend_mpi_buffer_get_tensor,
/* .cpy_tensor = */ ggml_backend_mpi_buffer_cpy_tensor,
/* .clear = */ ggml_backend_mpi_buffer_clear,
/* .reset = */ NULL,
};
GGML_CALL ggml_backend_buffer_t ggml_backend_mpi_wrap_buffer(ggml_backend_buffer_t buf) {
/* .reset = */ nullptr,
};
// if (cached_buffer_wrappers.find(buf) != cached_buffer_wrappers.end()) {
// fprintf(stderr, "Returning cached buffer with name %s\n", cached_buffer_wrappers[buf]->iface.get_name(cached_buffer_wrappers[buf]));
@@ -892,6 +985,15 @@ GGML_CALL ggml_backend_buffer_t ggml_backend_mpi_wrap_buffer(ggml_backend_buffer
// return ret;
// }
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
// fprintf(stderr, "Wrapping buffer %s at rank %d\n", ggml_backend_buffer_name(buf), rank);
if (buf->iface.get_name == ggml_backend_mpi_buffer_name) {
fprintf(stderr, "WRAPPING AN ALREADY WRAPPED BUFFER: %s\n", ggml_backend_buffer_name(buf));
GGML_ASSERT(false);
}
ggml_backend_buffer_type_t t = ggml_backend_mpi_wrap_buffer_type(buf->buft);
@@ -914,28 +1016,48 @@ GGML_CALL ggml_backend_buffer_t ggml_backend_mpi_wrap_buffer(ggml_backend_buffer
return buffer;
}
bool ggml_backend_mpi_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, const struct ggml_tensor * src, struct ggml_tensor * dst) {
// int src_rank = ggml_backend_mpi_buffer_rank(src->buffer);
// int dst_rank = ggml_backend_mpi_buffer_rank(dst->buffer);
//
// auto * ctx = static_cast<ggml_mpi_context *>(backend->context);
//
// if (ctx->remote) {
// return true;
// }
//
// if (src_rank == dst_rank) {
//// src->buffer->iface.cpy_tensor(src->buffer, src, dst);
// return true;
// }
//
// if (src_rank == ggml_backend_mpi_local_rank(backend)) {
// ggml_mpi_tensor_send(src, dst_rank, ctx->comm);
// } else if (dst_rank == ggml_backend_mpi_local_rank(backend)){
// ggml_mpi_tensor_recv(dst, src_rank, ctx->comm);
// }
bool ggml_backend_mpi_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, struct ggml_tensor * src, struct ggml_tensor * dst) {
int src_rank = ggml_backend_mpi_buffer_rank(src->buffer);
int dst_rank = ggml_backend_mpi_buffer_rank(dst->buffer);
auto * src_ctx = static_cast<ggml_mpi_context *>(backend_src->context);
auto * dst_ctx = static_cast<ggml_mpi_context *>(backend_dst->context);
if (src_ctx->remote && dst_ctx->remote) {
return true;
}
if (src_rank == dst_rank) {
src->buffer = ggml_backend_mpi_buffer_unwrap(src->buffer);
if (src->view_src) {
src->view_src->buffer = ggml_backend_mpi_buffer_unwrap(src->view_src->buffer);
}
dst->buffer = ggml_backend_mpi_buffer_unwrap(dst->buffer);
if (dst->view_src) {
dst->view_src->buffer = ggml_backend_mpi_buffer_unwrap(dst->view_src->buffer);
}
ggml_backend_tensor_copy_async(((ggml_mpi_context *) backend_src->context)->backends.front(),((ggml_mpi_context *) backend_dst->context)->backends.front(), src, dst);
src->buffer = ggml_backend_mpi_wrap_buffer(src->buffer);
if (src->view_src) {
src->view_src->buffer = ggml_backend_mpi_wrap_buffer(src->view_src->buffer);
}
dst->buffer = ggml_backend_mpi_wrap_buffer(dst->buffer);
if (dst->view_src) {
dst->view_src->buffer = ggml_backend_mpi_wrap_buffer(dst->view_src->buffer);
}
// src->buffer->iface.cpy_tensor(src->buffer, src, dst);
return true;
}
if (src_rank == ggml_backend_mpi_local_rank(backend_src)) {
ggml_mpi_tensor_send(src, dst_rank, dst_ctx->comm);
} else if (dst_rank == ggml_backend_mpi_local_rank(backend_dst)){
ggml_mpi_tensor_recv(dst, src_rank, src_ctx->comm);
}
// fprintf(stderr, "ATTEMPTING ASYNC COPY FOR SRC TENSOR %s TO DST TENSOR %s WITH SRC BACKEND %s AND DST BACKEND %s\n", src->name, dst->name, ggml_backend_name(backend_src), ggml_backend_name(backend_dst));
return false;
return true;
}
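
In the same-rank branch above the tensors' buffers are temporarily unwrapped so the wrapped backends perform the copy; when source and destination live on different ranks the copy degenerates to a point-to-point transfer in which the rank owning the source sends and the rank owning the destination receives. A plain-MPI sketch of that cross-rank branch (no ggml types; ranks and sizes are arbitrary):

#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char ** argv) {
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const int src_rank = 0;               // rank that owns the source tensor
    const int dst_rank = 1;               // rank that owns the destination tensor
    std::vector<float> data(16, 0.0f);

    if (rank == src_rank) {
        data.assign(data.size(), 3.14f);  // stands in for the source tensor's contents
        MPI_Send(data.data(), (int) data.size(), MPI_FLOAT, dst_rank, 0, MPI_COMM_WORLD);
    } else if (rank == dst_rank) {
        MPI_Recv(data.data(), (int) data.size(), MPI_FLOAT, src_rank, MPI_ANY_TAG,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("dst rank received %.2f\n", data[0]);
    }
    // every other rank does nothing, mirroring the early returns above

    MPI_Finalize();
    return 0;
}
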
@@ -947,11 +1069,31 @@ void ggml_backend_mpi_set_tensor_async(ggml_backend_t backend, struct ggml_tenso
GGML_ASSERT(ctx->rank == dst_rank);
if (dst_rank == ggml_backend_mpi_buffer_local_rank(dst->buffer)) {
auto * old_buffer = dst->buffer;
dst->buffer = ggml_backend_mpi_buffer_unwrap(dst->buffer);
if (dst->view_src) {
dst->view_src->buffer = ggml_backend_mpi_buffer_unwrap(dst->view_src->buffer);
}
ggml_backend_tensor_set_async(((ggml_mpi_context *) backend->context)->backends.front(), dst, data, offset, size);
dst->buffer = ggml_backend_mpi_wrap_buffer(dst->buffer);
if (dst->view_src) {
dst->view_src->buffer = ggml_backend_mpi_wrap_buffer(dst->view_src->buffer);
}
// dst->buffer = old_buffer;
} else {
ggml_mpi_tensor_send(dst, data, ctx->rank, ctx->comm);
}
}
GGML_CALL static void ggml_backend_mpi_synchronize(ggml_backend_t backend) {
if (!((ggml_mpi_context*)backend->context)->remote) {
ggml_backend_synchronize(((ggml_mpi_context*)backend->context)->backends.front());
}
}
ggml_backend_t ggml_backend_mpi_init(ggml_backend_t * wrapped_backends, size_t num_backends, int rank) {
@@ -960,10 +1102,11 @@ ggml_backend_t ggml_backend_mpi_init(ggml_backend_t * wrapped_backends, size_t n
ggml_mpi_context * ctx = ggml_mpi_init();
std::vector<ggml_backend_t> wrapped_backends_v;
if (ctx->rank == rank) {
for (size_t i = 0; i < num_backends; i++) {
wrapped_backends_v.push_back(wrapped_backends[i]);
}
if (ctx->rank == rank) {
} else {
ctx->remote = true;
}
@@ -974,14 +1117,20 @@ ggml_backend_t ggml_backend_mpi_init(ggml_backend_t * wrapped_backends, size_t n
/* .free = */ ggml_backend_mpi_free,
/* .get_default_buffer_type = */ ggml_backend_mpi_get_default_buffer_type,
/* .set_tensor_async = */ ggml_backend_mpi_set_tensor_async,
/* .get_tensor_async = */ NULL,
/* .get_tensor_async = */ nullptr,
/* .cpy_tensor_async = */ ggml_backend_mpi_cpy_tensor_async,
/* .synchronize = */ NULL,
/* .graph_plan_create = */ NULL,
/* .graph_plan_free = */ NULL,
/* .graph_plan_compute = */ NULL,
/* .synchronize = */ ggml_backend_mpi_synchronize,
/* .graph_plan_create = */ nullptr,
/* .graph_plan_free = */ nullptr,
/* .graph_plan_compute = */ nullptr,
/* .graph_compute = */ ggml_backend_mpi_graph_compute,
/* .supports_op = */ ggml_backend_mpi_supports_op,
/* .offload_op = */ nullptr,
/* .event_new = */ nullptr,
/* .event_free = */ nullptr,
/* .event_record = */ nullptr,
/* .event_wait = */ nullptr,
/* .event_synchronize = */ nullptr,
};
auto *mpi_backend = new ggml_backend {

View file

@@ -99,6 +99,9 @@ struct ggml_mpi_context * ggml_mpi_init(void);
GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type(ggml_backend_buffer_type_t buft);
GGML_API GGML_CALL ggml_backend_buffer_type_t ggml_backend_mpi_wrap_buffer_type_cached(ggml_backend_buffer_type_t buft);
GGML_API GGML_CALL ggml_backend_buffer_t ggml_backend_mpi_wrap_buffer(ggml_backend_buffer_t buf);

View file

@@ -1524,6 +1524,12 @@ static ggml_backend_buffer_type_t llama_default_buffer_type_offload(int gpu) {
if (buft == nullptr) {
buft = llama_default_buffer_type_cpu(true);
}
#ifdef GGML_USE_MPI
else {
buft = ggml_backend_mpi_wrap_buffer_type(buft);
}
#endif
return buft;
GGML_UNUSED(gpu);
@@ -4154,14 +4160,14 @@ static bool llm_load_tensors(
// assign the repeating layers
for (int64_t i = i_gpu_start; i < n_layer; ++i) {
model.buft_layer[i] = {
split_buft,
llama_default_buffer_type_offload(main_gpu),
llama_default_buffer_type_offload(main_gpu)
};
}
// assign the output layer
if (n_gpu_layers > n_layer) {
model.buft_output = {
split_buft,
llama_default_buffer_type_offload(main_gpu),
llama_default_buffer_type_offload(main_gpu)
};
} else {
@@ -4170,13 +4176,20 @@ static bool llm_load_tensors(
}
#ifdef GGML_USE_MPI
uint16_t** ranges = ggml_mpi_split_range(model.ctx_mpi, 0, n_layer - 1, node_split);
model.buft_output = llama_default_buffer_type_cpu(true);
uint16_t** ranges = ggml_mpi_split_range(model.ctx_mpi, 0, n_layer-1, node_split);
size_t size = ggml_mpi_size(model.ctx_mpi);
for (size_t i = 0; i < size; i++) {
for (uint16_t j = ranges[i][0]; j < ranges[i][1]; j++) {
model.buft_layer[ranges[i][0]].buft = llama_default_buffer_type_cpu(true);
model.buft_layer[ranges[i][0]].buft_matrix = llama_default_buffer_type_cpu(true);
model.buft_layer[ranges[i][1]].buft = llama_default_buffer_type_cpu(true);
model.buft_layer[ranges[i][1]].buft_matrix = llama_default_buffer_type_cpu(true);
for (uint16_t j = ranges[i][0]; j <= ranges[i][1]; j++) {
printf("Setting buffer rank for i %zu and j %d\n", i, j);
ggml_backend_mpi_buffer_type_set_rank(model.buft_layer[j].buft, (int)i);
ggml_backend_mpi_buffer_type_set_rank(model.buft_layer[j].buft_matrix, (int)i);
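
A worked example of the inclusive assignment loop, assuming two ranks and 32 layers split as in the earlier sketch into [0, 16] and [17, 31]: with the new <= bound, layer 16 is tagged for rank 0 and layer 17 for rank 1, while the first and last layer of each range are the ones whose buffer types were just forced to the CPU type above.

#include <cstdio>

int main() {
    const int ranges[2][2] = { {0, 16}, {17, 31} };            // inclusive per-rank layer ranges
    int layer_rank[32];

    for (int i = 0; i < 2; i++) {
        for (int j = ranges[i][0]; j <= ranges[i][1]; j++) {   // note <=, as in the new loop
            layer_rank[j] = i;
        }
    }

    printf("layer 16 -> rank %d, layer 17 -> rank %d\n", layer_rank[16], layer_rank[17]);
    return 0;
}
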
@@ -5093,6 +5106,9 @@ static bool llm_load_tensors(
ggml_backend_buffer_get_base(buf),
ggml_backend_buffer_get_size(buf));
}
#endif
#ifdef GGML_USE_MPI
buf = ggml_backend_mpi_wrap_buffer(buf);
#endif
}
#ifdef GGML_USE_METAL
@@ -5104,6 +5120,7 @@ static bool llm_load_tensors(
}
#endif
else {
fprintf(stderr, "Allocating context tensors from buffer type %s\n", ggml_backend_buft_name(buft));
buf = ggml_backend_alloc_ctx_tensors_from_buft(ctx, buft);
if (buf != nullptr && use_mlock && ggml_backend_buffer_is_host(buf)) {
model.mlock_bufs.emplace_back(new llama_mlock);
@@ -5117,9 +5134,7 @@ static bool llm_load_tensors(
throw std::runtime_error("failed to allocate buffer");
}
#ifdef GGML_USE_MPI
buf = ggml_backend_mpi_wrap_buffer(buf);
#endif
// indicate that this buffer contains weights
// this is used by ggml_backend_sched to improve op scheduling -> ops that use a weight are preferably scheduled to the backend that contains the weight
ggml_backend_buffer_set_usage(buf, GGML_BACKEND_BUFFER_USAGE_WEIGHTS);
@@ -13330,7 +13345,9 @@ struct llama_context * llama_new_context_with_model(
std::vector<ggml_backend_t> new_backends;
for (size_t i = 0; i < ggml_mpi_size(model->ctx_mpi); i++) {
new_backends.push_back(ggml_backend_mpi_init(ctx->backends.data(), ctx->backends.size(), (int) i));
for (auto & backend : ctx->backends) {
new_backends.push_back(ggml_backend_mpi_init(std::vector<ggml_backend_t>{backend, ctx->backend_cpu}.data(), 2, (int) i));
}
}
ctx->backends = new_backends;
@@ -14507,7 +14524,7 @@ int llama_process_mpi_transaction(
llama_kv_cache_seq_div(ctx, 0, 0, 0, 0);
break;
default:
printf("Unknown operation, exiting\n");
printf("Unknown operation %d, exiting\n", tag);
exit(1);
break;
}
@@ -14545,7 +14562,7 @@ int llama_process_mpi_worker(
// }
// break;
default:
printf("Unknown operation, exiting\n");
printf("Unknown non-transaction operation %d, exiting\n", tag);
exit(1);
break;
}