From 52186adcbef227c6c37e5a44242b6f5e8ca45a72 Mon Sep 17 00:00:00 2001
From: ngxson
Date: Sat, 2 Mar 2024 12:07:38 +0100
Subject: [PATCH] refactor

---
 llama.cpp | 122 +++++++++++++++++++++++++++---------------------------
 1 file changed, 62 insertions(+), 60 deletions(-)

diff --git a/llama.cpp b/llama.cpp
index d9e2cc57d..de058bb6b 100644
--- a/llama.cpp
+++ b/llama.cpp
@@ -11311,6 +11311,47 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s
     }
 }
 
+// TODO: remove this when #5830 is merged
+static int32_t llama_tensor_quantize_internal(enum ggml_type new_type, const float * f32_data, void * new_data, const int chunk_size, int nrows, int n_per_row, int64_t * hist_cur, const float * imatrix, std::vector<std::thread> & workers, const int nthread) {
+    std::mutex mutex;
+    int counter = 0;
+    size_t new_size = 0;
+    if (nthread < 2) {
+        // single-thread
+        return ggml_quantize_chunk(new_type, f32_data, new_data, 0, nrows, n_per_row, hist_cur, imatrix);
+    }
+    auto compute = [&mutex, &counter, &hist_cur, &new_size, new_type, f32_data, new_data, chunk_size,
+            nrows, n_per_row, imatrix]() {
+        std::array<int64_t, 1 << 4> local_hist = {};
+        const int nrows_per_chunk = chunk_size / n_per_row;
+        size_t local_size = 0;
+        while (true) {
+            std::unique_lock<std::mutex> lock(mutex);
+            int first_row = counter; counter += nrows_per_chunk;
+            if (first_row >= nrows) {
+                if (local_size > 0) {
+                    for (int j = 0; j < int(local_hist.size()); ++j) {
+                        hist_cur[j] += local_hist[j];
+                    }
+                    new_size += local_size;
+                }
+                break;
+            }
+            lock.unlock();
+            const int this_nrow = std::min(nrows - first_row, nrows_per_chunk);
+            local_size += ggml_quantize_chunk(new_type, f32_data, new_data, first_row * n_per_row, this_nrow, n_per_row, local_hist.data(), imatrix);
+        }
+    };
+    for (int it = 0; it < nthread - 1; ++it) {
+        workers.emplace_back(compute);
+    }
+    compute();
+    for (auto & w : workers) { w.join(); }
+    workers.clear();
+    return new_size;
+}
-            std::unique_lock<std::mutex> lock(mutex);
-            n_running--;
-        }
-        condition.notify_all();
-    };
-    auto worker_acquire = [&]() {
-        std::unique_lock<std::mutex> lock(mutex);
-        condition.wait(lock, [&]{
-            return n_running < n_threads;
-        });
-        n_running++;
-    };
-
-    // process function, to be run as thread
-    // TODO: multi-threading here is done for each tensor (instead of each row like in llama_model_quantize_internal), this is not ideal but still better than single-thread
-    const size_t n_start = n_curr;
-    auto process_output_tensor = [&]() {
-        worker_acquire();
-        std::unique_lock<std::mutex> lock(mutex);
-        struct ggml_tensor * out_tensor = output_tensors[n_curr];
-        const size_t my_number = n_curr++;
-        lock.unlock();
+    std::vector<std::thread> workers;
+    workers.reserve(n_threads);
+    // process tensors associated to layer
+    for (auto & out_tensor : output_tensors) {
         const size_t n_elements = ggml_nelements(out_tensor);
         std::vector<no_init<uint8_t>> in_buf;
         std::vector<no_init<float>> f32_in_buf; // dequant it internally
@@ -11513,6 +11532,10 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
         int i_layer_out = get_i_layer(out_name.c_str());
         auto layer = config->layers[i_layer_out];
 
+        if (i_layer_out < 0) {
+            continue; // skip non-layer tensors
+        }
+
         for (size_t i_model = 0; i_model < config->n_models; i_model++) {
             int src_layer = layer.srcs[i_model]; // source layer
             float scale = layer.scales[i_model];
@@ -11521,16 +11544,13 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
             if (in_tensor == nullptr) {
                 LLAMA_LOG_ERROR("Cannot find layer name %s from model %ld\n", src_name.c_str(), i_model + 1);
                 clean_up();
-                return; // TODO: not good for multi-threading
+                return -1; // stop
             }
             read_tensor_data(in_tensor, *mls[i_model], in_buf);
             // dequant the tensor to FP32
             if (in_tensor->type != GGML_TYPE_F32) {
                 //LLAMA_LOG_ERROR("dequant ");
-                std::vector<std::thread> workers;
-                int nthread = 4; // limit for now
-                workers.reserve(nthread);
-                llama_convert_tensor_internal(in_tensor, f32_in_buf, workers, n_elements, nthread);
+                llama_convert_tensor_internal(in_tensor, f32_in_buf, workers, n_elements, n_threads);
             } else {
                 // if we already have f32, just copy it
                 //LLAMA_LOG_ERROR("f32_copy ");
@@ -11552,27 +11572,24 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
             std::array<int64_t, 1 << 4> hist_cur = {};
             const int n_per_row = out_tensor->ne[0];
             const int n_rows = n_elements / n_per_row;
-            size_t new_size = ggml_quantize_chunk(
+            static const int min_chunk_size = 32 * 512;
+            const int chunk_size = n_per_row >= min_chunk_size ? n_per_row : n_per_row * ((min_chunk_size + n_per_row - 1)/n_per_row);
+            size_t new_size = llama_tensor_quantize_internal(
                 out_tensor->type,
                 f32_out_buf.data(),
                 out_buf.data(),
-                0, // start offset
+                chunk_size,
                 n_rows,
                 n_per_row,
                 hist_cur.data(), // unused for now
-                nullptr);
+                nullptr,
+                workers,
+                n_threads);
             GGML_ASSERT(new_size == out_buf.size());
         }
 
-        // wait until my turn
+        // write tensor to file
         {
-            std::unique_lock<std::mutex> lock(mutex);
-            // if I'm the first, no need to wait for other
-            if (my_number > n_start) {
-                condition.wait(lock, [&]{
-                    return n_done == my_number;
-                });
-            }
             LLAMA_LOG_ERROR("===> INPUT [layer %d] %f %f %f\n", i_layer_out, f32_in_buf[0].value, f32_in_buf[1].value, f32_in_buf[2].value);
             LLAMA_LOG_ERROR("===> OUTPUT [layer %d] %f %f %f\n", i_layer_out, f32_out_buf[0], f32_out_buf[1], f32_out_buf[2]);
             // my turn, write the result!
@@ -11581,21 +11598,6 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
             zeros(fout, GGML_PAD(out_buf.size(), GGUF_DEFAULT_ALIGNMENT) - out_buf.size());
             log_step(out_tensor);
         }
-        worker_release();
-    };
-
-    // spawn all thread to process
-    std::vector<std::thread> threads;
-    for (auto & out_tensor : output_tensors) {
-        std::string out_name = ggml_get_name(out_tensor);
-        int i_layer_out = get_i_layer(out_name.c_str());
-        if (i_layer_out >= 0) {
-            // tensor belongs to a layer, start worker thread for it
-            threads.emplace_back(process_output_tensor);
-        }
-    }
-    for (auto & thread : threads) {
-        thread.join();
-    }
     }
 
     // go back to beginning of file and write the updated meta data
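
Illustrative sketch (not part of the diff above): the llama_tensor_quantize_internal path added here hands out chunks of rows to worker threads through a mutex-protected counter, and each worker folds its partial result back under the lock. The standalone program below mirrors that pattern under stated assumptions: quantize_rows_mt and fake_quantize_rows are made-up names standing in for the helper and for ggml_quantize_chunk, and the chunk sizing rule copies the min_chunk_size logic added in the last hunk.

    // minimal sketch of the row-chunked, mutex-counter worker pattern
    #include <algorithm>
    #include <cstdio>
    #include <mutex>
    #include <thread>
    #include <vector>

    // stand-in for ggml_quantize_chunk(): pretend 2 values pack into 1 byte
    static size_t fake_quantize_rows(const float * data, int first_row, int nrows, int n_per_row) {
        (void) data;
        (void) first_row;
        return (size_t) nrows * (size_t) n_per_row / 2;
    }

    // chunk_size is assumed to be a multiple of n_per_row (the sizing rule below guarantees it)
    static size_t quantize_rows_mt(const float * data, int nrows, int n_per_row, int chunk_size, int nthread) {
        std::mutex mutex;
        int counter = 0;       // next row to hand out, guarded by mutex
        size_t total_size = 0; // folded result, guarded by mutex
        const int nrows_per_chunk = chunk_size / n_per_row;

        auto compute = [&]() {
            size_t local_size = 0;
            while (true) {
                std::unique_lock<std::mutex> lock(mutex);
                const int first_row = counter;
                counter += nrows_per_chunk;
                if (first_row >= nrows) {
                    total_size += local_size; // fold local result before exiting
                    break;
                }
                lock.unlock();
                const int this_nrow = std::min(nrows - first_row, nrows_per_chunk);
                local_size += fake_quantize_rows(data, first_row, this_nrow, n_per_row);
            }
        };

        std::vector<std::thread> workers;
        for (int it = 0; it < nthread - 1; ++it) {
            workers.emplace_back(compute);
        }
        compute(); // the calling thread also works
        for (auto & w : workers) { w.join(); }
        return total_size;
    }

    int main() {
        const int n_per_row = 4096;
        const int n_rows    = 1000;
        std::vector<float> data((size_t) n_per_row * n_rows, 1.0f);

        // same chunk sizing rule as the patch: at least ~32*512 elements per chunk
        const int min_chunk_size = 32 * 512;
        const int chunk_size = n_per_row >= min_chunk_size
            ? n_per_row
            : n_per_row * ((min_chunk_size + n_per_row - 1) / n_per_row);

        const size_t out = quantize_rows_mt(data.data(), n_rows, n_per_row, chunk_size, 4);
        printf("produced %zu bytes\n", out);
        return 0;
    }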