This commit is contained in:
ngxson 2024-03-02 12:07:38 +01:00
parent 886e68aee9
commit 52186adcbe

122
llama.cpp
View file

@ -11311,6 +11311,47 @@ static void llama_model_quantize_internal(const std::string & fname_inp, const s
} }
} }
// TODO: remove this when #5830 is merged
static int32_t llama_tensor_quantize_internal(enum ggml_type new_type, const float * f32_data, void * new_data, const int chunk_size, int nrows, int n_per_row, int64_t * hist_cur, const float * imatrix, std::vector<std::thread> & workers, const int nthread) {
std::mutex mutex;
int counter = 0;
size_t new_size = 0;
if (nthread < 2) {
// single-thread
return ggml_quantize_chunk(new_type, f32_data, new_data, 0, nrows, n_per_row, hist_cur, imatrix);
}
auto compute = [&mutex, &counter, &hist_cur, &new_size, new_type, f32_data, new_data, chunk_size,
nrows, n_per_row, imatrix]() {
std::array<int64_t, 1 << 4> local_hist = {};
const int nrows_per_chunk = chunk_size / n_per_row;
size_t local_size = 0;
while (true) {
std::unique_lock<std::mutex> lock(mutex);
int first_row = counter; counter += nrows_per_chunk;
if (first_row >= nrows) {
if (local_size > 0) {
for (int j=0; j<int(local_hist.size()); ++j) {
hist_cur[j] += local_hist[j];
}
new_size += local_size;
}
break;
}
lock.unlock();
const int this_nrow = std::min(nrows - first_row, nrows_per_chunk);
local_size += ggml_quantize_chunk(new_type, f32_data, new_data,
first_row * n_per_row, this_nrow, n_per_row, local_hist.data(), imatrix);
}
};
for (int it = 0; it < nthread - 1; ++it) {
workers.emplace_back(compute);
}
compute();
for (auto & w : workers) { w.join(); }
workers.clear();
return new_size;
}
int32_t llama_merge_models(const struct llama_merge_config * config) { int32_t llama_merge_models(const struct llama_merge_config * config) {
#if defined(__linux__) || defined(_WIN32) #if defined(__linux__) || defined(_WIN32)
constexpr bool use_mmap = true; constexpr bool use_mmap = true;
@ -11439,10 +11480,8 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
return ggml_nbytes(tensor); return ggml_nbytes(tensor);
}; };
std::mutex mutex; size_t n_done = 0;
std::condition_variable condition; size_t n_curr = 0;
size_t n_done = 0; // protected by mutex
size_t n_curr = 0; // protected by mutex
auto log_step = [&](const struct ggml_tensor * tensor) { auto log_step = [&](const struct ggml_tensor * tensor) {
n_done++; n_done++;
LLAMA_LOG_INFO("[%4ld/%4ld] %36s - [%s], input type = %6s\n", LLAMA_LOG_INFO("[%4ld/%4ld] %36s - [%s], input type = %6s\n",
@ -11476,33 +11515,13 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
log_step(out_tensor); log_step(out_tensor);
} }
// TODO: allow user to set n_threads
const int n_threads = std::thread::hardware_concurrency(); const int n_threads = std::thread::hardware_concurrency();
int n_running = 0; // protected by mutex std::vector<std::thread> workers;
auto worker_release = [&]() { workers.reserve(n_threads);
{
std::unique_lock<std::mutex> lock(mutex);
n_running--;
}
condition.notify_all();
};
auto worker_acquire = [&]() {
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [&]{
return n_running < n_threads;
});
n_running++;
};
// process function, to be run as thread
// TODO: multi-threading here is done for each tensor (instead of each row like in llama_model_quantize_internal), this is not ideal but still better than single-thread
const size_t n_start = n_curr;
auto process_output_tensor = [&]() {
worker_acquire();
std::unique_lock<std::mutex> lock(mutex);
struct ggml_tensor * out_tensor = output_tensors[n_curr];
const size_t my_number = n_curr++;
lock.unlock();
// process tensors associated to layer
for (auto & out_tensor : output_tensors) {
const size_t n_elements = ggml_nelements(out_tensor); const size_t n_elements = ggml_nelements(out_tensor);
std::vector<no_init<uint8_t>> in_buf; std::vector<no_init<uint8_t>> in_buf;
std::vector<no_init<float>> f32_in_buf; // dequant it internally std::vector<no_init<float>> f32_in_buf; // dequant it internally
@ -11513,6 +11532,10 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
int i_layer_out = get_i_layer(out_name.c_str()); int i_layer_out = get_i_layer(out_name.c_str());
auto layer = config->layers[i_layer_out]; auto layer = config->layers[i_layer_out];
if (i_layer_out < 0) {
continue; // skip non-layer tensors
}
for (size_t i_model = 0; i_model < config->n_models; i_model++) { for (size_t i_model = 0; i_model < config->n_models; i_model++) {
int src_layer = layer.srcs[i_model]; // source layer int src_layer = layer.srcs[i_model]; // source layer
float scale = layer.scales[i_model]; float scale = layer.scales[i_model];
@ -11521,16 +11544,13 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
if (in_tensor == nullptr) { if (in_tensor == nullptr) {
LLAMA_LOG_ERROR("Cannot find layer name %s from model %ld\n", src_name.c_str(), i_model + 1); LLAMA_LOG_ERROR("Cannot find layer name %s from model %ld\n", src_name.c_str(), i_model + 1);
clean_up(); clean_up();
return; // TODO: not good for multi-threading return -1; // stop
} }
read_tensor_data(in_tensor, *mls[i_model], in_buf); read_tensor_data(in_tensor, *mls[i_model], in_buf);
// dequant the tensor to FP32 // dequant the tensor to FP32
if (in_tensor->type != GGML_TYPE_F32) { if (in_tensor->type != GGML_TYPE_F32) {
//LLAMA_LOG_ERROR("dequant "); //LLAMA_LOG_ERROR("dequant ");
std::vector<std::thread> workers; llama_convert_tensor_internal(in_tensor, f32_in_buf, workers, n_elements, n_threads);
int nthread = 4; // limit for now
workers.reserve(nthread);
llama_convert_tensor_internal(in_tensor, f32_in_buf, workers, n_elements, nthread);
} else { } else {
// if we already have f32, just copy it // if we already have f32, just copy it
//LLAMA_LOG_ERROR("f32_copy "); //LLAMA_LOG_ERROR("f32_copy ");
@ -11552,27 +11572,24 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
std::array<int64_t, 1 << 4> hist_cur = {}; std::array<int64_t, 1 << 4> hist_cur = {};
const int n_per_row = out_tensor->ne[0]; const int n_per_row = out_tensor->ne[0];
const int n_rows = n_elements / n_per_row; const int n_rows = n_elements / n_per_row;
size_t new_size = ggml_quantize_chunk( static const int min_chunk_size = 32 * 512;
const int chunk_size = n_per_row >= min_chunk_size ? n_per_row : n_per_row * ((min_chunk_size + n_per_row - 1)/n_per_row);
size_t new_size = llama_tensor_quantize_internal(
out_tensor->type, out_tensor->type,
f32_out_buf.data(), f32_out_buf.data(),
out_buf.data(), out_buf.data(),
0, // start offset chunk_size,
n_rows, n_rows,
n_per_row, n_per_row,
hist_cur.data(), // unused for now hist_cur.data(), // unused for now
nullptr); nullptr,
workers,
n_threads);
GGML_ASSERT(new_size == out_buf.size()); GGML_ASSERT(new_size == out_buf.size());
} }
// wait until my turn // write tensor to file
{ {
std::unique_lock<std::mutex> lock(mutex);
// if I'm the first, no need to wait for other
if (my_number > n_start) {
condition.wait(lock, [&]{
return n_done == my_number;
});
}
LLAMA_LOG_ERROR("===> INPUT [layer %d] %f %f %f\n", i_layer_out, f32_in_buf[0].value, f32_in_buf[1].value, f32_in_buf[2].value); LLAMA_LOG_ERROR("===> INPUT [layer %d] %f %f %f\n", i_layer_out, f32_in_buf[0].value, f32_in_buf[1].value, f32_in_buf[2].value);
LLAMA_LOG_ERROR("===> OUTPUT [layer %d] %f %f %f\n", i_layer_out, f32_out_buf[0], f32_out_buf[1], f32_out_buf[2]); LLAMA_LOG_ERROR("===> OUTPUT [layer %d] %f %f %f\n", i_layer_out, f32_out_buf[0], f32_out_buf[1], f32_out_buf[2]);
// my turn, write the result! // my turn, write the result!
@ -11581,21 +11598,6 @@ int32_t llama_merge_models(const struct llama_merge_config * config) {
zeros(fout, GGML_PAD(out_buf.size(), GGUF_DEFAULT_ALIGNMENT) - out_buf.size()); zeros(fout, GGML_PAD(out_buf.size(), GGUF_DEFAULT_ALIGNMENT) - out_buf.size());
log_step(out_tensor); log_step(out_tensor);
} }
worker_release();
};
// spawn all thread to process
std::vector<std::thread> threads;
for (auto & out_tensor : output_tensors) {
std::string out_name = ggml_get_name(out_tensor);
int i_layer_out = get_i_layer(out_name.c_str());
if (i_layer_out >= 0) {
// tensor belongs to a layer, start worker thread for it
threads.emplace_back(process_output_tensor);
}
}
for (auto & thread : threads) {
thread.join();
} }
// go back to beginning of file and write the updated meta data // go back to beginning of file and write the updated meta data