make llama_decode async, sync on get_logits

This commit is contained in:
slaren 2024-01-14 05:20:24 +01:00
parent af789e7e93
commit 0068da7fef
5 changed files with 123 additions and 63 deletions

View file

@ -1041,16 +1041,22 @@ struct sql_printer : public printer {
};
// Run a warm-up/benchmark decode over `n_prompt` BOS tokens.
// NOTE: the per-n_batch splitting loop is temporarily disabled in this WIP
// commit; the whole prompt is submitted as a single batch and n_batch is
// ignored.
static void test_prompt(llama_context * ctx, int n_prompt, int n_past, int n_batch, int n_threads) {
    GGML_UNUSED(n_batch);

    llama_set_n_threads(ctx, n_threads, n_threads);

    const llama_token bos = llama_token_bos(llama_get_model(ctx));
    std::vector<llama_token> prompt_tokens(n_prompt, bos);
    llama_decode(ctx, llama_batch_get_one(prompt_tokens.data(), n_prompt, n_past, 0));
}
static void test_gen(llama_context * ctx, int n_gen, int n_past, int n_threads) {
@ -1149,12 +1155,12 @@ int main(int argc, char ** argv) {
// warmup run
if (t.n_prompt > 0) {
//test_prompt(ctx, std::min(2, t.n_batch), 0, t.n_batch, t.n_threads);
test_prompt(ctx, std::min(t.n_prompt, 32), 0, t.n_batch, t.n_threads);
test_prompt(ctx, std::min(t.n_batch, std::min(t.n_prompt, 32)), 0, t.n_batch, t.n_threads);
}
if (t.n_gen > 0) {
test_gen(ctx, 1, 0, t.n_threads);
}
llama_get_logits(ctx); // force sync
for (int i = 0; i < params.reps; i++) {
llama_kv_cache_clear(ctx);
@ -1166,6 +1172,8 @@ int main(int argc, char ** argv) {
if (t.n_gen > 0) {
test_gen(ctx, t.n_gen, t.n_prompt, t.n_threads);
}
llama_get_logits(ctx); // force sync
uint64_t t_ns = get_time_ns() - t_start;
t.samples_ns.push_back(t_ns);
}

View file

@ -194,21 +194,21 @@ void ggml_backend_tensor_get_async(ggml_backend_t backend, const struct ggml_ten
// Synchronously write `size` bytes from `data` into `tensor` starting at byte
// `offset`.
// Fix: the pasted diff left both the removed and the added lines in place,
// so the NULL-data assert appeared twice and the copy was dispatched twice —
// once through tensor->buffer and once through buf. Only the reconstructed
// post-commit body is kept: for views, dispatch through the buffer that owns
// the actual storage (view_src->buffer).
void ggml_backend_tensor_set(struct ggml_tensor * tensor, const void * data, size_t offset, size_t size) {
    ggml_backend_buffer_t buf = tensor->view_src ? tensor->view_src->buffer : tensor->buffer;

    GGML_ASSERT(buf != NULL && "tensor buffer not set");
    GGML_ASSERT(tensor->data != NULL && "tensor not allocated");
    GGML_ASSERT(offset + size <= ggml_nbytes(tensor) && "tensor write out of bounds");

    buf->iface.set_tensor(buf, tensor, data, offset, size);
}
// Synchronously read `size` bytes from `tensor` starting at byte `offset`
// into `data`.
// Fix: the pasted diff left both the removed and the added lines in place —
// the stale assert on tensor->buffer (replaced by the buf-based assert) and a
// second get_tensor dispatch through tensor->buffer. Only the reconstructed
// post-commit body is kept: for views, dispatch through the buffer that owns
// the actual storage (view_src->buffer).
void ggml_backend_tensor_get(const struct ggml_tensor * tensor, void * data, size_t offset, size_t size) {
    ggml_backend_buffer_t buf = tensor->view_src ? tensor->view_src->buffer : tensor->buffer;

    GGML_ASSERT(buf != NULL && "tensor buffer not set");
    GGML_ASSERT(tensor->data != NULL && "tensor not allocated");
    GGML_ASSERT(offset + size <= ggml_nbytes(tensor) && "tensor read out of bounds");

    buf->iface.get_tensor(buf, tensor, data, offset, size);
}
void ggml_backend_synchronize(ggml_backend_t backend) {
@ -1432,6 +1432,12 @@ void ggml_backend_sched_reset(ggml_backend_sched_t sched) {
sched_reset(sched);
}
// Block until all pending (async) work on every backend managed by the
// scheduler has completed.
void ggml_backend_sched_synchronize(ggml_backend_sched_t sched) {
    const int n_backends = sched->n_backends;
    for (int b = 0; b < n_backends; b++) {
        ggml_backend_synchronize(sched->backends[b]);
    }
}
// Returns the number of splits the graph was partitioned into by the
// scheduler (diagnostic; see the commented-out fprintf at the compute site).
int ggml_backend_sched_get_n_splits(ggml_backend_sched_t sched) {
    return sched->n_splits;
}

View file

@ -168,6 +168,9 @@ extern "C" {
// Reset all assignments and allocators - must be called before using the sched allocators to allocate inputs
GGML_API void ggml_backend_sched_reset(ggml_backend_sched_t sched);
// Synchronize all backends
GGML_API void ggml_backend_sched_synchronize(ggml_backend_sched_t sched);
//
// Utils
//

View file

@ -10724,6 +10724,10 @@ ggml_backend_buffer_type_t ggml_backend_cuda_host_buffer_type() {
return &ggml_backend_cuda_buffer_type_host;
}
// Returns true if `buffer` was allocated from the CUDA host (pinned) buffer
// type, identified by comparing the buffer type's get_name function pointer.
static bool ggml_backend_buffer_is_cuda_host(ggml_backend_buffer_t buffer) {
    return buffer->buft->iface.get_name == ggml_backend_cuda_host_buffer_type_name;
}
// backend
static const char * ggml_backend_cuda_name(ggml_backend_t backend) {
@ -10747,8 +10751,9 @@ static ggml_backend_buffer_type_t ggml_backend_cuda_get_default_buffer_type(ggml
static void ggml_backend_cuda_set_tensor_async(ggml_backend_t backend, ggml_tensor * tensor, const void * data, size_t offset, size_t size) {
ggml_backend_cuda_context * cuda_ctx = (ggml_backend_cuda_context *)backend->context;
ggml_backend_buffer_t buf = tensor->view_src ? tensor->view_src->buffer : tensor->buffer;
GGML_ASSERT(tensor->buffer->buft == ggml_backend_cuda_buffer_type(cuda_ctx->device) && "unsupported buffer type");
GGML_ASSERT(buf->buft == ggml_backend_cuda_buffer_type(cuda_ctx->device) && "unsupported buffer type");
GGML_ASSERT(tensor->backend == GGML_BACKEND_GPU);
CUDA_CHECK(cudaMemcpyAsync((char *)tensor->data + offset, data, size, cudaMemcpyHostToDevice, g_cudaStreams[cuda_ctx->device][0]));
@ -10756,43 +10761,64 @@ static void ggml_backend_cuda_set_tensor_async(ggml_backend_t backend, ggml_tens
// Asynchronously copy `size` bytes of `tensor` (device) into `data` (host) on
// the backend's CUDA stream. The caller must synchronize before reading
// `data`.
// Fix: the pasted diff kept both the removed assert (on tensor->buffer) and
// the added assert (on buf); only the reconstructed post-commit body is kept,
// asserting on the storage-owning buffer so views are handled correctly.
static void ggml_backend_cuda_get_tensor_async(ggml_backend_t backend, const ggml_tensor * tensor, void * data, size_t offset, size_t size) {
    ggml_backend_cuda_context * cuda_ctx = (ggml_backend_cuda_context *)backend->context;
    ggml_backend_buffer_t buf = tensor->view_src ? tensor->view_src->buffer : tensor->buffer;

    GGML_ASSERT(buf->buft == ggml_backend_cuda_buffer_type(cuda_ctx->device) && "unsupported buffer type");
    GGML_ASSERT(tensor->backend == GGML_BACKEND_GPU);

    CUDA_CHECK(cudaMemcpyAsync(data, (const char *)tensor->data + offset, size, cudaMemcpyDeviceToHost, g_cudaStreams[cuda_ctx->device][0]));
}
// Attempt an asynchronous tensor copy between backends; returns true if the
// copy was enqueued, false if this function cannot handle the pair (the
// caller falls back to a synchronous path).
// Handles: pinned host -> device, device -> pinned host, device -> device
// (same backend, and cross-device via an event instead of a full
// cudaDeviceSynchronize).
// Fix: the pasted diff interleaved the removed lines (old `||` guard, old
// `backend_src == backend_dst` if/else, cudaDeviceSynchronize, trailing
// `return false;`) with the added event-based code; only the reconstructed
// post-commit body is kept. NOTE(review): reconstruction of a WIP commit —
// verify against the upstream diff.
static bool ggml_backend_cuda_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, const ggml_tensor * src, ggml_tensor * dst) {
    if (!ggml_backend_is_cuda(backend_src) && !ggml_backend_is_cuda(backend_dst)) {
        printf("not cuda either %s -> %s\n", src->name, dst->name);
        return false;
    }

    // host -> device
    if (ggml_backend_buffer_is_cuda_host(src->buffer) && ggml_backend_buffer_is_cuda(dst->buffer)) {
        ggml_backend_cuda_context * cuda_ctx_dst = (ggml_backend_cuda_context *)backend_dst->context;
        CUDA_CHECK(cudaMemcpyAsync(dst->data, src->data, ggml_nbytes(dst), cudaMemcpyHostToDevice, g_cudaStreams[cuda_ctx_dst->device][0]));
        return true;
    }

    // device -> host
    if (ggml_backend_buffer_is_cuda_host(dst->buffer) && ggml_backend_buffer_is_cuda(src->buffer)) {
        ggml_backend_cuda_context * cuda_ctx_src = (ggml_backend_cuda_context *)backend_src->context;
        CUDA_CHECK(cudaMemcpyAsync(dst->data, src->data, ggml_nbytes(dst), cudaMemcpyDeviceToHost, g_cudaStreams[cuda_ctx_src->device][0]));
        return true;
    }

    if (!ggml_backend_buffer_is_cuda(src->buffer)) {
        return false;
    }

    if (!ggml_backend_buffer_is_cuda(dst->buffer)) {
        return false;
    }

    // device -> device
    ggml_backend_cuda_context * cuda_ctx_src = (ggml_backend_cuda_context *)backend_src->context;
    ggml_backend_cuda_context * cuda_ctx_dst = (ggml_backend_cuda_context *)backend_dst->context;

    if (backend_src != backend_dst) {
        //printf("async copy between devices %s, %d -> %d\n", src->name, cuda_ctx_src->device, cuda_ctx_dst->device);
        // synchronize the dst stream with the src stream via an event instead
        // of a full device synchronization
        // TODO: reuse event?
        cudaEvent_t event;
        CUDA_CHECK(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));

        // record event on src stream
        CUDA_CHECK(cudaEventRecord(event, g_cudaStreams[cuda_ctx_src->device][0]));

        // wait on dst stream
        CUDA_CHECK(cudaStreamWaitEvent(g_cudaStreams[cuda_ctx_dst->device][0], event, 0));

        // copy
        CUDA_CHECK(cudaMemcpyAsync(dst->data, src->data, ggml_nbytes(dst), cudaMemcpyDeviceToDevice, g_cudaStreams[cuda_ctx_dst->device][0]));

        CUDA_CHECK(cudaEventDestroy(event));
        return true;
    }

    // same backend: copy on the backend's own stream
    CUDA_CHECK(cudaMemcpyAsync(dst->data, src->data, ggml_nbytes(dst), cudaMemcpyDeviceToDevice, g_cudaStreams[cuda_ctx_dst->device][0]));
    return true;
}
static void ggml_backend_cuda_synchronize(ggml_backend_t backend) {

101
llama.cpp
View file

@ -1619,6 +1619,8 @@ struct llama_context {
for (ggml_backend_t backend : backends) {
ggml_backend_free(backend);
}
ggml_backend_buffer_free(buf_logits);
}
llama_cparams cparams;
@ -1649,7 +1651,12 @@ struct llama_context {
int32_t n_eval = 0; // number of eval calls
// decode output (2-dimensional array: [n_tokens][n_vocab])
std::vector<float> logits;
//std::vector<float> logits;
ggml_backend_buffer_t buf_logits = nullptr;
size_t logits_size = 0;
float * logits = nullptr;
#ifndef NDEBUG
// guard against access to unset logits
std::vector<bool> logits_valid;
@ -1666,6 +1673,7 @@ struct llama_context {
ggml_tallocr * alloc_cpu = nullptr;
std::vector<ggml_backend_buffer_t> buf_cpu_ub;
size_t buf_cpu_ub_cur = 0;
// temporary buffer for copying data to/from the backend
std::vector<no_init<uint8_t>> buf_copy;
@ -6197,8 +6205,9 @@ static int llama_decode_internal(
const int64_t n_vocab = hparams.n_vocab;
auto & logits_out = lctx.logits;
auto * logits_out = lctx.logits;
/*
if (all_batch.logits) {
logits_out.resize(n_vocab * n_tokens_all);
} else if (lctx.logits_all) {
@ -6206,6 +6215,7 @@ static int llama_decode_internal(
} else {
logits_out.resize(n_vocab);
}
*/
#ifndef NDEBUG
auto & logits_valid = lctx.logits_valid;
@ -6215,7 +6225,9 @@ static int llama_decode_internal(
logits_out.clear();
#endif
const uint32_t n_microbatch = 256;
const uint32_t n_microbatch = cparams.n_batch;
//const uint32_t n_microbatch = 256;
for (uint32_t cur_token = 0; cur_token < n_tokens_all; cur_token += n_microbatch) {
const uint32_t n_tokens = std::min(n_microbatch, n_tokens_all - cur_token);
@ -6287,18 +6299,16 @@ static int llama_decode_internal(
//printf("kv_self.n = %5d, kv_self.used = %5d, kv_self.head = %5d\n", kv_self.n, kv_self.used, kv_self.head);
int i_ub = cur_token / n_microbatch;
size_t n_buf = lctx.buf_cpu_ub.size();
if (i_ub != 0 && i_ub % n_buf == 0) {
// sync all backends
// change the CPU compute buffer to avoid overwriting inputs
size_t buf_cpu_ub_cur = lctx.buf_cpu_ub_cur;
lctx.buf_cpu_ub_cur = (lctx.buf_cpu_ub_cur + 1) % lctx.buf_cpu_ub.size();
if (buf_cpu_ub_cur == 0 && cur_token > 0) {
// sync all backends to ensure that the current buffer is not in use
printf("not enough buffers, syncing now\n");
// TODO: ggml_backend_sched_synchronize()
for (auto * backend : lctx.backends) {
ggml_backend_synchronize(backend);
}
ggml_backend_sched_synchronize(lctx.sched);
}
ggml_tallocr_set_buffer(lctx.alloc_cpu, lctx.buf_cpu_ub[i_ub % n_buf]);
ggml_tallocr_set_buffer(lctx.alloc_cpu, lctx.buf_cpu_ub.at(buf_cpu_ub_cur));
ggml_backend_sched_reset(lctx.sched);
@ -6343,8 +6353,6 @@ static int llama_decode_internal(
ggml_backend_sched_graph_compute(lctx.sched, gf);
// fprintf(stderr, "splits: %d\n", ggml_backend_sched_get_n_splits(lctx.sched));
#ifdef GGML_USE_MPI
ggml_mpi_graph_compute_post(lctx.ctx_mpi, gf, n_layer);
#endif
@ -6384,34 +6392,28 @@ static int llama_decode_internal(
ggml_backend_t res_backend = ggml_backend_sched_get_node_backend(lctx.sched, res);
GGML_ASSERT(res_backend != nullptr);
if (batch.logits) {
//logits_out.resize(n_vocab * n_tokens);
for (uint32_t i = 0; i < n_tokens; i++) {
if (batch.logits[i] == 0) {
continue;
}
ggml_backend_tensor_get_async(res_backend, res, logits_out.data() + n_vocab*(cur_token + i), n_vocab*i*sizeof(float), n_vocab*sizeof(float));
ggml_backend_tensor_get_async(res_backend, res, logits_out + n_vocab*(cur_token + i), n_vocab*i*sizeof(float), n_vocab*sizeof(float));
#ifndef NDEBUG
logits_valid[i] = true;
logits_valid[cur_token + i] = true;
#endif
}
} else if (lctx.logits_all) {
//logits_out.resize(n_vocab * n_tokens);
//ggml_backend_tensor_get_async(res_backend, res, logits_out.data(), 0, n_vocab*n_tokens*sizeof(float));
ggml_backend_tensor_get_async(res_backend, res, logits_out.data() + cur_token*n_vocab, 0, n_vocab*n_tokens*sizeof(float));
ggml_backend_tensor_get_async(res_backend, res, logits_out + n_vocab*cur_token, 0, n_vocab*n_tokens*sizeof(float));
#ifndef NDEBUG
std::fill(logits_valid.begin(), logits_valid.end(), true);
std::fill(logits_valid.begin() + cur_token, logits_valid.begin() + cur_token + n_tokens, true);
#endif
} else {
if (cur_token + n_tokens >= n_tokens_all) {
//logits_out.resize(n_vocab);
ggml_backend_tensor_get_async(res_backend, res, logits_out.data(), n_vocab*(n_tokens - 1)*sizeof(float), n_vocab*sizeof(float));
}
//ggml_backend_tensor_get_async(res_backend, res, logits_out.data(), n_vocab*(n_tokens - 1)*sizeof(float), n_vocab*sizeof(float));
ggml_backend_tensor_get_async(res_backend, res, logits_out, n_vocab*(n_tokens - 1)*sizeof(float), n_vocab*sizeof(float));
#ifndef NDEBUG
logits_valid[0] = true;
logits_valid[0] = true;
#endif
}
}
//ggml_backend_synchronize(res_backend);
}
// FIXME
@ -6423,15 +6425,9 @@ static int llama_decode_internal(
embedding_out.resize(n_embd);
ggml_backend_t embeddings_backend = ggml_backend_sched_get_node_backend(lctx.sched, embeddings);
ggml_backend_tensor_get_async(embeddings_backend, embeddings, embedding_out.data(), (n_embd*(n_tokens - 1))*sizeof(float), n_embd*sizeof(float));
//ggml_backend_synchronize(embeddings_backend);
}
}
// TODO: ggml_backend_sched_synchronize()
for (auto * backend : lctx.backends) {
ggml_backend_synchronize(backend);
}
// measure the performance only for the single-token evals
if (n_tokens_all == 1) {
lctx.t_eval_us += ggml_time_us() - t_start_us;
@ -9433,7 +9429,8 @@ struct llama_context * llama_new_context_with_model(
}
// resized during inference, reserve maximum
ctx->logits.reserve(hparams.n_vocab*cparams.n_batch);
//ctx->logits.reserve(hparams.n_vocab*cparams.n_batch);
ctx->logits_size = hparams.n_vocab*cparams.n_ctx;
if (params.embedding){
ctx->embedding.resize(hparams.n_embd);
@ -9479,6 +9476,18 @@ struct llama_context * llama_new_context_with_model(
ggml_backend_buffer_t buf = ggml_backend_buft_alloc_buffer(llama_default_buffer_type_cpu(true), buf_size);
ctx->buf_cpu_ub.push_back(buf);
}
// allocate buffer for logits output
ctx->buf_logits = ggml_backend_buft_alloc_buffer(llama_default_buffer_type_cpu(true), hparams.n_vocab*cparams.n_ctx*sizeof(float));
if (ctx->buf_logits == nullptr) {
LLAMA_LOG_ERROR("%s: failed to allocate logits buffer\n", __func__);
llama_free(ctx);
return nullptr;
}
ctx->logits = (float *) ggml_backend_buffer_get_base(ctx->buf_logits);
ggml_backend_buffer_clear(ctx->buf_logits, 0);
LLAMA_LOG_INFO("%s: logits buffer size = %8.2f MiB, type = %s\n", __func__,
ggml_backend_buffer_get_size(ctx->buf_logits) / 1024.0 / 1024.0,
ggml_backend_buffer_name(ctx->buf_logits));
for (ggml_backend_t backend : ctx->backends) {
ggml_backend_buffer_t buf = ggml_backend_sched_get_buffer(ctx->sched, backend);
@ -9792,7 +9801,7 @@ size_t llama_get_state_size(const struct llama_context * ctx) {
const size_t s_rng = LLAMA_MAX_RNG_STATE;
const size_t s_logits_size = sizeof(size_t);
// assume worst case for logits although only currently set ones are serialized
const size_t s_logits = ctx->logits.capacity() * sizeof(float);
const size_t s_logits = ctx->logits_size * sizeof(float);
const size_t s_embedding_size = sizeof(size_t);
const size_t s_embedding = ctx->embedding.size() * sizeof(float);
const size_t s_kv_size = sizeof(size_t);
@ -9884,12 +9893,12 @@ static void llama_copy_state_data_internal(struct llama_context * ctx, llama_dat
// copy logits
{
const size_t logits_size = ctx->logits.size();
const size_t logits_size = ctx->logits_size;
data_ctx->write(&logits_size, sizeof(logits_size));
if (logits_size) {
data_ctx->write(ctx->logits.data(), logits_size * sizeof(float));
data_ctx->write(ctx->logits, logits_size * sizeof(float));
}
}
@ -9991,12 +10000,12 @@ size_t llama_set_state_data(struct llama_context * ctx, uint8_t * src) {
memcpy(&logits_size, inp, sizeof(logits_size)); inp += sizeof(logits_size);
GGML_ASSERT(ctx->logits.capacity() >= logits_size);
GGML_ASSERT(ctx->logits_size >= logits_size);
if (logits_size) {
ctx->logits.resize(logits_size);
//ctx->logits.resize(logits_size);
memcpy(ctx->logits.data(), inp, logits_size * sizeof(float));
memcpy(ctx->logits, inp, logits_size * sizeof(float));
inp += logits_size * sizeof(float);
}
}
@ -10271,15 +10280,23 @@ int32_t llama_decode(
}
// Returns a pointer to the logits written by llama_decode.
// llama_decode is now asynchronous, so synchronize all backends first to
// ensure the pending async copy of the logits into the host buffer has
// completed before the caller reads them.
// Fix: the pasted diff kept the removed `return ctx->logits.data();` line
// before the new body, which would return immediately (skipping the sync) and
// no longer type-checks now that `logits` is a raw float pointer.
float * llama_get_logits(struct llama_context * ctx) {
    ggml_backend_sched_synchronize(ctx->sched);
    // all backends are idle again, so the CPU micro-batch buffers can be reused
    ctx->buf_cpu_ub_cur = 0;
    return ctx->logits;
}
// Returns a pointer to the logits of token `i` of the last decoded batch.
// Synchronizes all backends first so the async logits copy has completed.
// Fix: the pasted diff kept both the removed `.data()`-based return and the
// new pointer-arithmetic return; only the latter is valid now that `logits`
// is a raw float pointer into buf_logits.
float * llama_get_logits_ith(struct llama_context * ctx, int32_t i) {
    ggml_backend_sched_synchronize(ctx->sched);
    // all backends are idle again, so the CPU micro-batch buffers can be reused
    ctx->buf_cpu_ub_cur = 0;
    // debug-only guard against reading logits that were never written
    assert(ctx->logits_valid.at(i));
    return ctx->logits + i*ctx->model.hparams.n_vocab;
}
// Returns the embeddings computed by the last decode.
// Synchronizes all backends first so any pending async copy of the embeddings
// into host memory has completed before the caller reads them.
float * llama_get_embeddings(struct llama_context * ctx) {
    ggml_backend_sched_synchronize(ctx->sched);
    // all backends are idle again, so the CPU micro-batch buffers can be reused
    ctx->buf_cpu_ub_cur = 0;
    return ctx->embedding.data();
}