trying to remove mutex/lock from parallel region
parent 1cdfdb34e0
commit beb28d68ab
1 changed file with 70 additions and 12 deletions
llama.cpp | 82
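The change replaces the mutex-guarded updates of the shared histogram (hist_cur) and byte counter (new_size) in the quantization loop with per-thread accumulators that are merged once all workers have finished. Below is a minimal, dependency-free sketch of that accumulate-then-reduce pattern; it uses std::thread instead of HPX, and the sizes and per-element work are placeholders, not the commit's actual quantization code.

    // Sketch only: each worker writes to its own histogram/size slot, so no
    // mutex is needed inside the parallel region; the caller merges afterwards.
    #include <array>
    #include <cstddef>
    #include <cstdint>
    #include <numeric>
    #include <thread>
    #include <vector>

    int main() {
        const std::size_t nthread   = 4;        // placeholder thread count
        const std::size_t nelements = 1 << 16;  // placeholder element count

        std::vector<std::array<int64_t, 1 << 4>> local_hist(nthread); // zero-initialized
        std::vector<std::size_t>                 local_size(nthread, 0);

        std::vector<std::thread> workers;
        workers.reserve(nthread);
        for (std::size_t t = 0; t < nthread; ++t) {
            workers.emplace_back([&, t]() {
                // stand-in for ggml_quantize_chunk(): touch only this thread's slots
                for (std::size_t i = t; i < nelements; i += nthread) {
                    local_hist[t][i & 0xF] += 1;
                    local_size[t]          += 1;
                }
            });
        }
        for (auto & w : workers) { w.join(); }

        // single-threaded reduction after the parallel region
        std::array<int64_t, 1 << 4> hist_cur = {};
        for (const auto & h : local_hist) {
            for (std::size_t j = 0; j < hist_cur.size(); ++j) { hist_cur[j] += h[j]; }
        }
        const std::size_t new_size =
            std::reduce(local_size.begin(), local_size.end(), std::size_t{0});
        (void)hist_cur;
        (void)new_size;
        return 0;
    }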
@@ -8471,7 +8471,7 @@ static void llama_convert_tensor_internal(
     hpx::future<void> fut =
         hpx::run_as_hpx_thread([&futures, nthread, qtype, block_size, block_size_bytes, blocks_per_thread, spare_blocks, &tensor, &in_buff_offs, &f32_output, &out_buff_offs]() -> hpx::future<void>
     {
-        for (int tnum = 0; tnum < nthread; tnum++) {
+        for (int tnum = 1; tnum < nthread; tnum++) {
             size_t thr_blocks = blocks_per_thread + (tnum == nthread - 1 ? spare_blocks : 0); // num blocks for this thread
             size_t thr_elems = thr_blocks * block_size; // number of elements for this thread
             size_t thr_block_bytes = thr_blocks * block_size_bytes; // number of input bytes for this thread
@@ -8490,6 +8490,25 @@ static void llama_convert_tensor_internal(
             out_buff_offs += thr_elems;
         }
+
+        {
+            size_t thr_blocks = blocks_per_thread + (0 == nthread - 1 ? spare_blocks : 0); // num blocks for this thread
+            size_t thr_elems = thr_blocks * block_size; // number of elements for this thread
+            size_t thr_block_bytes = thr_blocks * block_size_bytes; // number of input bytes for this thread
+
+            auto compute = [qtype] (ggml_type typ, uint8_t * inbuf, float * outbuf, int nels) {
+                if (typ == GGML_TYPE_F16) {
+                    ggml_fp16_to_fp32_row((ggml_fp16_t *)inbuf, outbuf, nels);
+                } else {
+                    qtype.to_float(inbuf, outbuf, nels);
+                }
+            };
+
+            compute(tensor->type, (uint8_t *) tensor->data + in_buff_offs, f32_output + out_buff_offs, thr_elems);
+
+            in_buff_offs += thr_block_bytes;
+            out_buff_offs += thr_elems;
+        }

         hpx::wait_all(futures);
         return hpx::make_ready_future<void>();
     });
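In llama_convert_tensor_internal the worker loop now starts at tnum = 1, and the new block processes the first share of blocks directly in the surrounding HPX task, so only nthread-1 extra tasks are spawned. A small sketch of that "spawn N-1, run one share inline" idiom follows, with std::async standing in for the HPX machinery; process() is a hypothetical per-chunk worker, not a function from the commit.

    #include <cstddef>
    #include <future>
    #include <vector>

    // Hypothetical per-chunk worker standing in for the dequantization of one slice.
    static void process(std::size_t chunk) { (void)chunk; }

    static void convert_all(std::size_t nthread) {
        std::vector<std::future<void>> futures;
        futures.reserve(nthread - 1);
        for (std::size_t t = 1; t < nthread; ++t) {       // chunks 1..nthread-1 run asynchronously
            futures.push_back(std::async(std::launch::async, process, t));
        }
        process(0);                                       // the calling thread handles chunk 0 itself
        for (auto & f : futures) { f.wait(); }            // then waits for the rest
    }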
@@ -8772,12 +8791,13 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s

 #if defined(GGML_USE_HPX)
     std::vector<hpx::future<void>> futures;
-    futures.reserve(nthread);
+    futures.reserve(nthread-1);
+    hpx::mutex mutex;
 #else
     std::vector<std::thread> workers;
     workers.reserve(nthread);
-#endif
     std::mutex mutex;
+#endif

     int idx = 0;

@@ -8875,7 +8895,7 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s
                 work.resize(nelements * 4); // upper bound on size
             }
             new_data = work.data();
-            std::array<int64_t, 1 << 4> hist_cur = {};
+            std::vector<std::array<int64_t, 1 << 4>> hist_cur = {};

             static const int chunk_size = 32 * 512;
             const int nchunk = (nelements + chunk_size - 1)/chunk_size;
@@ -8885,6 +8905,51 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s
            } else {
                size_t counter = 0;
                new_size = 0;
+
+#if defined(GGML_USE_HPX)
+                std::vector<std::array<int64_t, 1 << 4>> thread_local_hist(nthread_use);
+                std::vector<std::size_t> local_sizes(nthread_use, 0);
+                std::vector<std::size_t> counters(nthread_use, counter);
+                std::generate(counters.begin(), counters.end(), [n = 0]() mutable { return (++n) * chunk_size; });
+
+                std::function<hpx::future<void>(std::size_t, std::size_t)> compute =
+                    [&thread_local_hist, &local_sizes, &new_size, new_type, f32_data, new_data, nelements](const std::size_t thread, const std::size_t counter) -> hpx::future<void> {
+
+                    auto & local_hist = thread_local_hist[thread];
+                    std::size_t & local_size = local_sizes[thread];
+                    std::size_t first = counter;
+
+                    while(true) {
+                        first = counter;
+                        if (first >= nelements) {
+                            if (local_size > 0) {
+                                for (int j=0; j<int(local_hist.size()); ++j) {
+                                    local_hist[j] += local_hist[j];
+                                }
+                            }
+                            break;
+                        }
+                        size_t last = std::min(nelements, first + chunk_size);
+                        local_size += ggml_quantize_chunk(new_type, f32_data, new_data, first, last - first, local_hist.data());
+                    }
+                    return hpx::make_ready_future<void>();
+                };
+
+                hpx::future<void> this_fut = compute(0, counters[0]);
+                for (int it = 1; it < nthread_use - 1; ++it) {
+                    futures.push_back(hpx::run_as_hpx_thread(compute, it, counters[it]));
+                }
+                hpx::wait_all(futures);
+                this_fut.wait();
+                for(auto & local_hist : thread_local_hist) {
+                    for(auto j = 0; j < int(local_hist.size()); ++j) {
+                        hist_cur[j] += local_hist[j];
+                    }
+                }
+
+                new_size = std::reduce(local_sizes.begin(), local_sizes.end(), new_size, std::plus<std::size_t>{});
+                futures.clear();
+#else
                auto compute = [&mutex, &counter, &hist_cur, &new_size, new_type, f32_data, new_data, nelements]() {
                    std::array<int64_t, 1 << 4> local_hist = {};
                    size_t local_size = 0;
@@ -8905,14 +8970,7 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s
                        local_size += ggml_quantize_chunk(new_type, f32_data, new_data, first, last - first, local_hist.data());
                    }
                };
-#if defined(GGML_USE_HPX)
-                for (int it = 0; it < nthread_use - 1; ++it) {
-                    futures.push_back(hpx::async(compute));
-                }
-                compute();
-                hpx::wait_all(futures);
-                futures.clear();
-#else
                for (int it = 0; it < nthread_use - 1; ++it) {
                    workers.emplace_back(compute);
                }
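In llama_model_quantize_internal the HPX path now gives every thread its own histogram (thread_local_hist), its own size accumulator (local_sizes), and a precomputed starting offset (counters), while the old block that dispatched the mutex-based lambda via hpx::async is removed; the per-thread results are folded into hist_cur and new_size only after hpx::wait_all. One way to hand out chunks without a shared, lock-protected counter is a fixed stride per thread, sketched below; this illustrates the idea and is not the commit's exact assignment scheme.

    #include <algorithm>
    #include <cstddef>

    // Stand-in for ggml_quantize_chunk(): returns the bytes "produced" for a range.
    static std::size_t quantize_range(std::size_t first, std::size_t last) {
        return last - first;
    }

    // Thread tid processes chunks tid, tid + nthread, tid + 2*nthread, ...
    // No shared counter and therefore no mutex is needed; the caller sums the
    // returned local sizes (e.g. with std::reduce) once every worker is done.
    static std::size_t worker(std::size_t tid, std::size_t nthread,
                              std::size_t nelements, std::size_t chunk_size) {
        std::size_t local_size = 0;
        for (std::size_t first = tid * chunk_size; first < nelements;
             first += nthread * chunk_size) {
            const std::size_t last = std::min(nelements, first + chunk_size);
            local_size += quantize_range(first, last);
        }
        return local_size;
    }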