diff --git a/llama.cpp b/llama.cpp
index eb8178b39..47d79015f 100644
--- a/llama.cpp
+++ b/llama.cpp
@@ -8471,7 +8471,7 @@ static void llama_convert_tensor_internal(
     hpx::future<void> fut = hpx::run_as_hpx_thread([&futures, nthread, qtype, block_size, block_size_bytes, blocks_per_thread, spare_blocks, &tensor, &in_buff_offs, &f32_output, &out_buff_offs]() -> hpx::future<void> {
-        for (int tnum = 0; tnum < nthread; tnum++) {
+        for (int tnum = 1; tnum < nthread; tnum++) {
             size_t thr_blocks = blocks_per_thread + (tnum == nthread - 1 ? spare_blocks : 0); // num blocks for this thread
             size_t thr_elems = thr_blocks * block_size; // number of elements for this thread
             size_t thr_block_bytes = thr_blocks * block_size_bytes; // number of input bytes for this thread
@@ -8490,6 +8490,25 @@ static void llama_convert_tensor_internal(
             out_buff_offs += thr_elems;
         }
 
+        {
+            size_t thr_blocks = blocks_per_thread + (0 == nthread - 1 ? spare_blocks : 0); // num blocks for this thread
+            size_t thr_elems = thr_blocks * block_size; // number of elements for this thread
+            size_t thr_block_bytes = thr_blocks * block_size_bytes; // number of input bytes for this thread
+
+            auto compute = [qtype] (ggml_type typ, uint8_t * inbuf, float * outbuf, int nels) {
+                if (typ == GGML_TYPE_F16) {
+                    ggml_fp16_to_fp32_row((ggml_fp16_t *)inbuf, outbuf, nels);
+                } else {
+                    qtype.to_float(inbuf, outbuf, nels);
+                }
+            };
+
+            compute(tensor->type, (uint8_t *) tensor->data + in_buff_offs, f32_output + out_buff_offs, thr_elems);
+
+            in_buff_offs += thr_block_bytes;
+            out_buff_offs += thr_elems;
+        }
+
         hpx::wait_all(futures);
         return hpx::make_ready_future();
     });
@@ -8772,12 +8791,13 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s
 #if defined(GGML_USE_HPX)
     std::vector<hpx::future<void>> futures;
-    futures.reserve(nthread);
+    futures.reserve(nthread - 1);
+    hpx::mutex mutex;
 #else
     std::vector<std::thread> workers;
     workers.reserve(nthread);
-#endif
     std::mutex mutex;
+#endif
 
     int idx = 0;
@@ -8875,7 +8895,7 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s
                 work.resize(nelements * 4); // upper bound on size
             }
             new_data = work.data();
-            std::array<int64_t, 1 << 4> hist_cur = {};
+            std::vector<int64_t> hist_cur(1 << 4, 0);
 
             static const int chunk_size = 32 * 512;
             const int nchunk = (nelements + chunk_size - 1)/chunk_size;
@@ -8885,6 +8905,51 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s
             } else {
                 size_t counter = 0;
                 new_size = 0;
+
+#if defined(GGML_USE_HPX)
+                std::vector<std::array<int64_t, 1 << 4>> thread_local_hist(nthread_use);
+                std::vector<size_t> local_sizes(nthread_use, 0);
+                std::vector<size_t> counters(nthread_use, counter);
+                std::generate(counters.begin(), counters.end(), [n = 0]() mutable { return (n++) * chunk_size; });
+
+                // each task starts at counters[thread] and strides by nthread_use chunks,
+                // accumulating its histogram and size into its own slot
+                std::function<hpx::future<void>(std::size_t, std::size_t)> computefn =
+                    [&thread_local_hist, &local_sizes, new_type, f32_data, new_data, nelements, nthread_use](const std::size_t thread, std::size_t counter) -> hpx::future<void> {
+
+                    auto & local_hist = thread_local_hist[thread];
+                    std::size_t & local_size = local_sizes[thread];
+                    std::size_t first = counter;
+
+                    while (true) {
+                        first = counter;
+                        if (first >= nelements) {
+                            break;
+                        }
+                        std::size_t last = std::min(static_cast<std::size_t>(nelements), first + chunk_size);
+                        local_size += ggml_quantize_chunk(new_type, f32_data, new_data, first, last - first, local_hist.data());
+                        counter += nthread_use * chunk_size;
+                    }
+                    return hpx::make_ready_future();
+                };
+
+                // slot 0 runs on the calling thread, the rest on HPX threads
+                hpx::future<void> this_fut = computefn(0, counters[0]);
+                for (int it = 1; it < nthread_use; ++it) {
+                    futures.push_back(hpx::run_as_hpx_thread(computefn, it, counters[it]));
+                }
+                hpx::wait_all(futures);
+                this_fut.wait();
+
+                // merge the per-thread histograms and sizes
+                for (auto & local_hist : thread_local_hist) {
+                    for (auto j = 0; j < int(local_hist.size()); ++j) {
+                        hist_cur[j] += local_hist[j];
+                    }
+                }
+
+                new_size = std::reduce(local_sizes.begin(), local_sizes.end(), new_size, std::plus{});
+                futures.clear();
+#else
                 auto compute = [&mutex, &counter, &hist_cur, &new_size, new_type, f32_data, new_data, nelements]() {
                     std::array<int64_t, 1 << 4> local_hist = {};
                     size_t local_size = 0;
@@ -8905,14 +8970,7 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s
                         local_size += ggml_quantize_chunk(new_type, f32_data, new_data, first, last - first, local_hist.data());
                     }
                 };
-#if defined(GGML_USE_HPX)
-                for (int it = 0; it < nthread_use - 1; ++it) {
-                    futures.push_back(hpx::async(compute));
-                }
-                compute();
-                hpx::wait_all(futures);
-                futures.clear();
-#else
+
                 for (int it = 0; it < nthread_use - 1; ++it) {
                     workers.emplace_back(compute);
                 }