Introduce ggml_threadpool

Added an API to support explicit management of threadpools.
This commit is contained in:
fmz 2024-05-24 12:04:00 -07:00
parent 74f33adf5f
commit 9a4bdc8c12
33 changed files with 1282 additions and 271 deletions

View file

@ -11,6 +11,17 @@
"CMAKE_INSTALL_RPATH": "$ORIGIN;$ORIGIN/.."
}
},
{
"name": "msvc",
"hidden": true,
"generator": "Visual Studio 17 2022",
"architecture": "ARM",
"binaryDir": "${sourceDir}/build-${presetName}",
"cacheVariables": {
"CMAKE_EXPORT_COMPILE_COMMANDS": "ON",
"CMAKE_INSTALL_RPATH": "$ORIGIN;$ORIGIN/.."
}
},
{ "name": "debug", "hidden": true, "cacheVariables": { "CMAKE_BUILD_TYPE": "Debug" } },
{ "name": "release", "hidden": true, "cacheVariables": { "CMAKE_BUILD_TYPE": "RelWithDebInfo" } },
@ -38,8 +49,8 @@
{ "name": "arm64-windows-llvm-release", "inherits": [ "base", "arm64-windows-llvm", "release" ] },
{ "name": "arm64-windows-llvm+static-release", "inherits": [ "base", "arm64-windows-llvm", "release", "static" ] },
{ "name": "arm64-windows-msvc-debug" , "inherits": [ "base", "arm64-windows-msvc", "debug" ] },
{ "name": "arm64-windows-msvc-release", "inherits": [ "base", "arm64-windows-msvc", "release" ] },
{ "name": "arm64-windows-msvc+static-release", "inherits": [ "base", "arm64-windows-msvc", "release", "static" ] }
{ "name": "arm64-windows-msvc-debug" , "inherits": [ "msvc", "arm64-windows-msvc", "debug" ] },
{ "name": "arm64-windows-msvc-release", "inherits": [ "msvc", "arm64-windows-msvc", "release" ] },
{ "name": "arm64-windows-msvc+static-release", "inherits": [ "msvc", "arm64-windows-msvc", "release", "static" ] }
]
}

View file

@ -218,6 +218,34 @@ void gpt_params_handle_model_default(gpt_params & params) {
}
}
static void postprocess_cpu_params(cpu_params& cpuparams, const cpu_params* role_model = nullptr) {
int32_t n_set = 0;
if (cpuparams.n_threads < 0) {
// Assuming everything about cpuparams is invalid
if (role_model != nullptr) {
cpuparams = *role_model;
} else {
cpuparams.n_threads = cpu_get_num_math();
}
}
for (int32_t i = 0; i < GGML_N_CORES_MAX; i++) {
if (cpuparams.cpumask[i]) {
n_set++;
}
}
if (n_set == 0) {
// You hit the jackpot!
memset(&cpuparams.cpumask[0], 1, GGML_N_CORES_MAX);
n_set = GGML_N_CORES_MAX;
}
if (n_set < cpuparams.n_threads) {
// Not enough set bits, may experience performance issues.
fprintf(stderr, "warn: Not enough set bits in CPU mask (%d) to satisfy requested thread count: %d\n", n_set, cpuparams.n_threads);
}
}
bool gpt_params_parse_ex(int argc, char ** argv, gpt_params & params) {
bool invalid_param = false;
std::string arg;
@ -237,6 +265,11 @@ bool gpt_params_parse_ex(int argc, char ** argv, gpt_params & params) {
}
}
postprocess_cpu_params(params.cpuparams, nullptr);
postprocess_cpu_params(params.cpuparams_batch, &params.cpuparams);
postprocess_cpu_params(params.draft_cpuparams, &params.cpuparams);
postprocess_cpu_params(params.draft_cpuparams_batch, &params.cpuparams_batch);
if (params.prompt_cache_all &&
(params.interactive || params.interactive_first ||
params.instruct)) {
@ -280,6 +313,79 @@ bool gpt_params_parse(int argc, char ** argv, gpt_params & params) {
return result;
}
static bool parse_cpu_range(const std::string & range, bool (&boolmask)[GGML_N_CORES_MAX]) {
size_t dash_loc = range.find('-');
if (dash_loc == std::string::npos) {
fprintf(stderr, "Format of CPU range is invalid! Expected [<start>]-[<end>].\n");
return false;
}
size_t start_i;
size_t end_i;
if (dash_loc == 0) {
start_i = 0;
} else {
start_i = std::stoull(range.substr(0, dash_loc));
if (start_i >= GGML_N_CORES_MAX) {
fprintf(stderr, "Start index out of bounds!\n");
return false;
}
}
if (dash_loc == range.length() - 1) {
end_i = GGML_N_CORES_MAX - 1;
} else {
end_i = std::stoull(range.substr(dash_loc + 1));
if (end_i >= GGML_N_CORES_MAX) {
fprintf(stderr, "End index out of bounds!\n");
return false;
}
}
for (size_t i = start_i; i <= end_i; i++) {
boolmask[i] = true;
}
return true;
}
static bool parse_cpu_mask(const std::string & mask, bool (&boolmask)[GGML_N_CORES_MAX]) {
// Discard potential 0x prefix
size_t start_i = 0;
if (mask.length() >= 2 && mask.substr(0, 2) == "0x") {
start_i = 2;
}
size_t num_digits = mask.length() - start_i;
if (num_digits > 128) num_digits = 128;
size_t end_i = num_digits + start_i;
for (size_t i = start_i, n = (num_digits*4 - 1); i < end_i; i++, n-=4) {
char c = mask.at(i);
int8_t id = c;
if ((c >= '0' && c <= '9')) {
id -= '0';
} else if (c >= 'a' && c <= 'f') {
id -= 'a' - 10;
} else if (c >= 'A' && c <= 'F') {
id -= 'A' - 10;
} else {
fprintf(stderr, "Invalid hex character '%c' at position %d\n", c, int32_t(i));
return false;
}
boolmask[ n ] = boolmask[ n ] || ((id & 8) != 0);
boolmask[n - 1] = boolmask[n - 1] || ((id & 4) != 0);
boolmask[n - 2] = boolmask[n - 2] || ((id & 2) != 0);
boolmask[n - 3] = boolmask[n - 3] || ((id & 1) != 0);
}
return true;
}
bool gpt_params_find_arg(int argc, char ** argv, const std::string & arg, gpt_params & params, int & i, bool & invalid_param) {
llama_sampling_params & sparams = params.sparams;
@ -298,43 +404,187 @@ bool gpt_params_find_arg(int argc, char ** argv, const std::string & arg, gpt_pa
invalid_param = true;
return true;
}
params.n_threads = std::stoi(argv[i]);
if (params.n_threads <= 0) {
params.n_threads = std::thread::hardware_concurrency();
params.cpuparams.n_threads = std::stoi(argv[i]);
if (params.cpuparams.n_threads <= 0) {
params.cpuparams.n_threads = std::thread::hardware_concurrency();
}
return true;
}
if (arg == "-C" || arg == "--cpu-mask") {
if (++i >= argc) {
invalid_param = true;
return true;
}
std::string mask = argv[i];
params.cpuparams.mask_valid = true;
invalid_param = !parse_cpu_mask(mask, params.cpuparams.cpumask);
return true;
}
if (arg == "-Cr" || arg == "--cpu-range") {
if (++i >= argc) {
invalid_param = true;
return true;
}
std::string range = argv[i];
params.cpuparams.mask_valid = true;
invalid_param = !parse_cpu_range(range, params.cpuparams.cpumask);
return true;
}
if (arg == "--prio") {
if (++i >= argc) {
invalid_param = true;
return true;
}
params.cpuparams.priority = std::stoul(argv[i]);
return true;
}
if (arg == "--cpu-strict") {
params.cpuparams.strict_cpu = true;
return true;
}
if (arg == "--poll") {
params.cpuparams.poll = true;
return true;
}
if (arg == "-tb" || arg == "--threads-batch") {
if (++i >= argc) {
invalid_param = true;
return true;
}
params.n_threads_batch = std::stoi(argv[i]);
if (params.n_threads_batch <= 0) {
params.n_threads_batch = std::thread::hardware_concurrency();
params.cpuparams_batch.n_threads = std::stoi(argv[i]);
if (params.cpuparams_batch.n_threads <= 0) {
params.cpuparams_batch.n_threads = std::thread::hardware_concurrency();
}
return true;
}
if (arg == "-Cb" || arg == "--cpu-mask-batch") {
if (++i >= argc) {
invalid_param = true;
return true;
}
std::string mask = argv[i];
params.cpuparams_batch.mask_valid = true;
invalid_param = !parse_cpu_mask(mask, params.cpuparams_batch.cpumask);
return true;
}
if (arg == "-Crb" || arg == "--cpu-range_batch") {
if (++i >= argc) {
invalid_param = true;
return true;
}
std::string range = argv[i];
params.cpuparams_batch.mask_valid = true;
invalid_param = !parse_cpu_range(range, params.cpuparams_batch.cpumask);
return true;
}
if (arg == "--prio-batch") {
if (++i >= argc) {
invalid_param = true;
return true;
}
params.cpuparams_batch.priority = std::stoul(argv[i]);
return true;
}
if (arg == "--cpu-strict-batch") {
params.cpuparams_batch.strict_cpu = true;
return true;
}
if (arg == "--poll-batch") {
params.cpuparams_batch.poll = true;
return true;
}
if (arg == "-td" || arg == "--threads-draft") {
if (++i >= argc) {
invalid_param = true;
return true;
}
params.n_threads_draft = std::stoi(argv[i]);
if (params.n_threads_draft <= 0) {
params.n_threads_draft = std::thread::hardware_concurrency();
params.draft_cpuparams.n_threads = std::stoi(argv[i]);
if (params.draft_cpuparams.n_threads <= 0) {
params.draft_cpuparams.n_threads = std::thread::hardware_concurrency();
}
return true;
}
if (arg == "-tbd" || arg == "--threads-batch-draft") {
if (arg == "-Cd" || arg == "--cpu-mask-draft") {
if (++i >= argc) {
invalid_param = true;
return true;
}
params.n_threads_batch_draft = std::stoi(argv[i]);
if (params.n_threads_batch_draft <= 0) {
params.n_threads_batch_draft = std::thread::hardware_concurrency();
std::string mask = argv[i];
params.draft_cpuparams.mask_valid = true;
invalid_param = !parse_cpu_mask(mask, params.draft_cpuparams.cpumask);
return true;
}
if (arg == "-Crd" || arg == "--cpu-range-draft") {
if (++i >= argc) {
invalid_param = true;
return true;
}
std::string range = argv[i];
params.draft_cpuparams.mask_valid = true;
invalid_param = !parse_cpu_range(range, params.draft_cpuparams.cpumask);
return true;
}
if (arg == "--prio-draft") {
if (++i >= argc) {
invalid_param = true;
return true;
}
params.draft_cpuparams.priority = std::stoul(argv[i]);
return true;
}
if (arg == "--cpu-strict-draft") {
params.draft_cpuparams.strict_cpu = true;
return true;
}
if (arg == "--poll-draft") {
params.draft_cpuparams.poll = true;
return true;
}
if (arg == "-tdb" || arg == "--threads-draft-batch") {
if (++i >= argc) {
invalid_param = true;
return true;
}
params.draft_cpuparams_batch.n_threads = std::stoi(argv[i]);
if (params.draft_cpuparams_batch.n_threads <= 0) {
params.draft_cpuparams_batch.n_threads = std::thread::hardware_concurrency();
}
return true;
}
if (arg == "-Cdb" || arg == "--cpu-mask-draft-batch") {
if (++i >= argc) {
invalid_param = true;
return true;
}
std::string mask = argv[i];
params.draft_cpuparams_batch.mask_valid = true;
invalid_param = !parse_cpu_mask(mask, params.draft_cpuparams_batch.cpumask);
return true;
}
if (arg == "-Crdb" || arg == "--cpu-range-draft-batch") {
if (++i >= argc) {
invalid_param = true;
return true;
}
std::string range = argv[i];
params.draft_cpuparams_batch.mask_valid = true;
invalid_param = !parse_cpu_range(range, params.draft_cpuparams_batch.cpumask);
return true;
}
if (arg == "--prio-draft-batch") {
if (++i >= argc) {
invalid_param = true;
return true;
}
params.draft_cpuparams_batch.priority = std::stoul(argv[i]);
return true;
}
if (arg == "--cpu-strict-draft-batch") {
params.draft_cpuparams_batch.strict_cpu = true;
return true;
}
if (arg == "--poll-draft_batch") {
params.draft_cpuparams_batch.poll = true;
return true;
}
if (arg == "-p" || arg == "--prompt") {
@ -1373,13 +1623,41 @@ void gpt_params_print_usage(int /*argc*/, char ** argv, const gpt_params & param
printf(" (can be specified more than once for multiple prompts).\n");
printf(" --color colorise output to distinguish prompt and user input from generations\n");
printf(" -s SEED, --seed SEED RNG seed (default: -1, use random seed for < 0)\n");
printf(" -t N, --threads N number of threads to use during generation (default: %d)\n", params.n_threads);
printf(" -t N, --threads N number of threads to use during generation (default: %d)\n", params.cpuparams.n_threads);
printf(" -tb N, --threads-batch N\n");
printf(" number of threads to use during batch and prompt processing (default: same as --threads)\n");
printf(" -C M, --cpu-mask M CPU affinity mask: arbitrarily long hex. Takes precedence over cpu-range (default: \"\")\n");
printf(" -Cr --cpu-range lo-hi Ranges of CPUs for affinity (alternative to --cpu-mask)\n");
printf(" -Cb M, --cpu-mask-batch M\n");
printf(" CPU affinity mask: arbitrarily long hex. Takes precedence over cpu-range (default: \"\")\n");
printf(" -Crb --cpu-range-batch lo-hi\n");
printf(" Ranges of CPUs for affinity (alternative to --cpu-mask)\n");
printf(" --cpu-strict Use strict CPU placement (default: %u)\n", (unsigned) params.cpuparams.strict_cpu);
printf(" --cpu-strict-batch Use strict CPU placement (default: %u)\n", (unsigned) params.cpuparams.strict_cpu);
printf(" --priority N Set process/thread priority : 0-normal, 1-medium, 2-high, 3-realtime (default: %d)\n", params.cpuparams.priority);
printf(" --priority-batch N Set process/thread priority : 0-normal, 1-medium, 2-high, 3-realtime (default: %d)\n", params.cpuparams.priority);
printf(" --poll Use polling to wait for work (default: %u)\n", (unsigned) params.cpuparams.poll);
printf(" --poll-batch Use polling to wait for work (default: %u)\n", (unsigned) params.cpuparams.poll);
printf(" -td N, --threads-draft N");
printf(" number of threads to use during generation (default: same as --threads)\n");
printf(" -tbd N, --threads-batch-draft N\n");
printf(" number of threads to use during batch and prompt processing (default: same as --threads-draft)\n");
printf(" number of threads to use during generation for draft model (default: same as --threads)\n");
printf(" -tdb N, --threads-draft-batch N\n");
printf(" number of threads to use during batch and prompt processing for draft model (default: same as --threads-draft)\n");
printf(" -Cd M, --cpu-mask-draft M\n");
printf(" Draft model CPU affinity mask. Takes precedence over cpu-range-draft (default: \"\")\n");
printf(" -Crd --cpu-range-draft lo-hi\n");
printf(" Ranges of CPUs for affinity (alternative to --cpu-mask-draft)\n");
printf(" -Cdb M, --cpu-mask-draft-batch M\n");
printf(" Draft model CPU affinity mask. Takes precedence over cpu-range-draft (default: \"\")\n");
printf(" -Crdb --cpu-range-draft-batch lo-hi\n");
printf(" Ranges of CPUs for affinity (alternative to --cpu-mask-draft)\n");
printf(" --cpu-strict-draft Use strict CPU placement for draft model (default: %u)\n", (unsigned) params.draft_cpuparams.strict_cpu);
printf(" --cpu-strict-draft-batch\n");
printf(" Use strict CPU placement for draft model (default: %u)\n", (unsigned) params.draft_cpuparams.strict_cpu);
printf(" --priority-draft N Set draft process/thread priority : 0-normal, 1-medium, 2-high, 3-realtime (default: %d)\n", params.draft_cpuparams.priority);
printf(" --priority-draft-batch N\n");
printf(" Set draft process/thread priority : 0-normal, 1-medium, 2-high, 3-realtime (default: %d)\n", params.draft_cpuparams.priority);
printf(" --poll-draft Use polling to wait for draft model work (default: %u)\n", (unsigned) params.draft_cpuparams.poll);
printf(" --poll-draft-batch Use polling to wait for draft model work (default: %u)\n", (unsigned) params.draft_cpuparams.poll);
printf(" -p PROMPT, --prompt PROMPT\n");
printf(" prompt to start generation with (default: empty)\n");
printf(" -e, --escape process prompt escapes sequences (\\n, \\r, \\t, \\', \\\", \\\\)\n");
@ -1551,9 +1829,9 @@ void gpt_params_print_usage(int /*argc*/, char ** argv, const gpt_params & param
std::string gpt_params_get_system_info(const gpt_params & params) {
std::ostringstream os;
os << "system_info: n_threads = " << params.n_threads;
if (params.n_threads_batch != -1) {
os << " (n_threads_batch = " << params.n_threads_batch << ")";
os << "system_info: n_threads = " << params.cpuparams.n_threads;
if (params.cpuparams_batch.n_threads != -1) {
os << " (n_threads_batch = " << params.cpuparams_batch.n_threads << ")";
}
os << " / " << std::thread::hardware_concurrency() << " | " << llama_print_system_info();
@ -1943,7 +2221,7 @@ std::tuple<struct llama_model *, struct llama_context *> llama_init_from_gpt_par
((i > 0) || params.lora_base.empty())
? NULL
: params.lora_base.c_str(),
params.n_threads);
params.cpuparams.n_threads);
if (err != 0) {
fprintf(stderr, "%s: error: failed to apply lora adapter\n", __func__);
llama_free(lctx);
@ -2028,8 +2306,9 @@ struct llama_context_params llama_context_params_from_gpt_params(const gpt_param
cparams.n_seq_max = params.n_parallel;
cparams.n_batch = params.n_batch;
cparams.n_ubatch = params.n_ubatch;
cparams.n_threads = params.n_threads;
cparams.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch;
cparams.n_threads = params.cpuparams.n_threads;
cparams.n_threads_batch = params.cpuparams_batch.n_threads == -1 ?
params.cpuparams.n_threads : params.cpuparams_batch.n_threads;
cparams.seed = params.seed;
cparams.logits_all = params.logits_all;
cparams.embeddings = params.embedding;
@ -2054,6 +2333,22 @@ struct llama_context_params llama_context_params_from_gpt_params(const gpt_param
return cparams;
}
struct ggml_threadpool_params ggml_threadpool_params_from_cpu_params(const cpu_params & params) {
struct ggml_threadpool_params tpp;
tpp.mask_specified = params.mask_valid;
if (params.mask_valid) {
std::memcpy(&tpp.cpumask, &params.cpumask, GGML_N_CORES_MAX);
}
tpp.n_threads = params.n_threads;
tpp.prio = params.priority;
tpp.poll = params.poll;
tpp.strict_cpu = params.strict_cpu;
return tpp;
}
#ifdef LLAMA_USE_CURL
static bool starts_with(const std::string & str, const std::string & prefix) {
@ -2971,7 +3266,7 @@ void yaml_dump_non_result_info(FILE * stream, const gpt_params & params, const l
yaml_dump_vector_float(stream, "tensor_split", tensor_split_vector);
fprintf(stream, "tfs: %f # default: 1.0\n", sparams.tfs_z);
fprintf(stream, "threads: %d # default: %u\n", params.n_threads, std::thread::hardware_concurrency());
fprintf(stream, "threads: %d # default: %u\n", params.cpuparams.n_threads, std::thread::hardware_concurrency());
fprintf(stream, "top_k: %d # default: 40\n", sparams.top_k);
fprintf(stream, "top_p: %f # default: 0.95\n", sparams.top_p);
fprintf(stream, "min_p: %f # default: 0.0\n", sparams.min_p);

View file

@ -52,13 +52,18 @@ int32_t cpu_get_num_math();
// CLI argument parsing
//
struct cpu_params {
int32_t n_threads = -1;
bool cpumask[GGML_N_CORES_MAX] = {false}; // CPU affinity mask.
bool mask_valid = false; // Default: any CPU
int32_t priority = 0; // Scheduling prio : (0 - normal, 1 - medium, 2 - high, 3 - realtime)
bool strict_cpu = false; // Use strict CPU placement
bool poll = false; // Use polling (busywait) to wait for work
};
struct gpt_params {
uint32_t seed = LLAMA_DEFAULT_SEED; // RNG seed
int32_t n_threads = cpu_get_num_math();
int32_t n_threads_draft = -1;
int32_t n_threads_batch = -1; // number of threads to use for batch processing (-1 = use n_threads)
int32_t n_threads_batch_draft = -1;
int32_t n_predict = -1; // new tokens to predict
int32_t n_ctx = 512; // context size
int32_t n_batch = 2048; // logical batch size for prompt processing (must be >=32 to use BLAS)
@ -91,6 +96,11 @@ struct gpt_params {
ggml_backend_sched_eval_callback cb_eval = nullptr;
void * cb_eval_user_data = nullptr;
struct cpu_params cpuparams;
struct cpu_params cpuparams_batch;
struct cpu_params draft_cpuparams;
struct cpu_params draft_cpuparams_batch;
ggml_numa_strategy numa = GGML_NUMA_STRATEGY_DISABLED;
enum llama_rope_scaling_type rope_scaling_type = LLAMA_ROPE_SCALING_TYPE_UNSPECIFIED;
@ -220,7 +230,8 @@ std::string fs_get_cache_directory();
std::tuple<struct llama_model *, struct llama_context *> llama_init_from_gpt_params(gpt_params & params);
struct llama_model_params llama_model_params_from_gpt_params (const gpt_params & params);
struct llama_context_params llama_context_params_from_gpt_params(const gpt_params & params);
struct llama_context_params llama_context_params_from_gpt_params (const gpt_params & params);
struct ggml_threadpool_params ggml_threadpool_params_from_cpu_params(const cpu_params & params);
struct llama_model * llama_load_model_from_url(const char * model_url, const char * path_model, const struct llama_model_params & params);
struct llama_model * llama_load_model_from_hf(const char * repo, const char * file, const char * path_model, const struct llama_model_params & params);

View file

@ -19,7 +19,7 @@ constexpr float rms_norm_eps = 5e-6f;
#endif
static void ggml_graph_compute_helper(std::vector<uint8_t> & buf, ggml_cgraph * graph, int n_threads) {
struct ggml_cplan plan = ggml_graph_plan(graph, n_threads);
struct ggml_cplan plan = ggml_graph_plan(graph, n_threads, nullptr);
if (plan.work_size > 0) {
buf.resize(plan.work_size);

View file

@ -119,8 +119,9 @@ int main(int argc, char ** argv) {
ctx_params.n_ubatch = n_ubatch;
ctx_params.flash_attn = flash_attn;
ctx_params.n_threads = params.n_threads;
ctx_params.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch;
ctx_params.n_threads = params.cpuparams.n_threads;
ctx_params.n_threads_batch = params.cpuparams_batch.n_threads == -1 ?
params.cpuparams.n_threads : params.cpuparams_batch.n_threads;
// ensure enough sequences are available
ctx_params.n_seq_max = *std::max_element(n_pl.begin(), n_pl.end());

View file

@ -83,8 +83,9 @@ int main(int argc, char ** argv) {
ctx_params.n_ctx = n_kv_req;
ctx_params.n_batch = std::max(n_len, n_parallel);
ctx_params.n_seq_max = n_parallel;
ctx_params.n_threads = params.n_threads;
ctx_params.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch;
ctx_params.n_threads = params.cpuparams.n_threads;
ctx_params.n_threads_batch = params.cpuparams_batch.n_threads == -1 ?
params.cpuparams.n_threads : params.cpuparams_batch.n_threads;
llama_context * ctx = llama_new_context_with_model(model, ctx_params);

View file

@ -21,7 +21,7 @@
#endif
static void ggml_graph_compute_helper(std::vector<uint8_t> & buf, ggml_cgraph * graph, int n_threads) {
struct ggml_cplan plan = ggml_graph_plan(graph, n_threads);
struct ggml_cplan plan = ggml_graph_plan(graph, n_threads, nullptr);
if (plan.work_size > 0) {
buf.resize(plan.work_size);

View file

@ -344,7 +344,7 @@ static bool apply_lora(struct ggml_tensor * tensor, struct lora_data * lora, int
ggml_gallocr_alloc_graph(alloc, gf);
struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads);
struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, nullptr);
static std::vector<uint8_t> data_work;
data_work.resize(cplan.work_size);
cplan.work_data = data_work.data();

View file

@ -1818,7 +1818,7 @@ int main(int argc, char ** argv) {
opt_cb_data.millis_per_iter = 0.0;
// measure required memory for work buffer
size_t max_work_size = ggml_graph_plan(gb, params.common.n_threads).work_size + GGML_OBJECT_SIZE;
size_t max_work_size = ggml_graph_plan(gb, params.common.n_threads, nullptr).work_size + GGML_OBJECT_SIZE;
printf("%s: work_size = %zu bytes (%.1f MB)\n", __func__, max_work_size, (float) max_work_size / (1024.0f*1024.0f));
// context for work buffer

View file

@ -1915,7 +1915,7 @@ bool clip_image_batch_encode(clip_ctx * ctx, const int n_threads, const clip_ima
}
#endif
ggml_backend_graph_compute(ctx->backend, gf);
ggml_backend_graph_compute(ctx->backend, gf, NULL);
// the last node is the embedding tensor
struct ggml_tensor * embeddings = gf->nodes[gf->n_nodes - 1];

View file

@ -126,14 +126,14 @@ static struct llava_image_embed * load_image(llava_context * ctx_llava, gpt_para
if (!params->image.empty()) {
LOG_TEE("using base64 encoded image instead of command line image path\n");
}
embed = llava_image_embed_make_with_prompt_base64(ctx_llava->ctx_clip, params->n_threads, prompt);
embed = llava_image_embed_make_with_prompt_base64(ctx_llava->ctx_clip, params->cpuparams.n_threads, prompt);
if (!embed) {
LOG_TEE("%s: can't load image from prompt\n", __func__);
return NULL;
}
params->prompt = remove_image_from_prompt(prompt);
} else {
embed = llava_image_embed_make_with_filename(ctx_llava->ctx_clip, params->n_threads, fname.c_str());
embed = llava_image_embed_make_with_filename(ctx_llava->ctx_clip, params->cpuparams.n_threads, fname.c_str());
if (!embed) {
fprintf(stderr, "%s: is %s really an image file?\n", __func__, fname.c_str());
return NULL;

View file

@ -202,11 +202,38 @@ int main(int argc, char ** argv) {
ctx_guidance = llama_new_context_with_model(model, lparams);
}
LOG("%s: llama threadpool init = n_threads = %d\n",
__func__,
(int32_t) params.cpuparams.n_threads
);
struct ggml_threadpool_params tpp_batch =
ggml_threadpool_params_from_cpu_params(params.cpuparams_batch);
struct ggml_threadpool_params tpp =
ggml_threadpool_params_from_cpu_params(params.cpuparams);
struct ggml_compute_threadpool * threadpool_batch = ggml_create_threadpool(&tpp_batch);
if (!threadpool_batch) {
LOG_TEE("%s: batch threadpool create failed : n_threads %d\n", __func__, tpp_batch.n_threads);
exit(1);
}
struct ggml_compute_threadpool * threadpool = ggml_create_threadpool(&tpp);
if (!threadpool) {
LOG_TEE("%s: threadpool create failed : n_threads %d\n", __func__, tpp.n_threads);
exit(1);
}
if (model == NULL) {
LOG_TEE("%s: error: unable to load model\n", __func__);
return 1;
}
llama_attach_batch_threadpool(ctx, threadpool_batch);
llama_attach_threadpool(ctx, threadpool);
if (ctx_guidance) {
llama_attach_batch_threadpool(ctx_guidance, threadpool_batch);
llama_attach_threadpool(ctx_guidance, threadpool);
}
const int n_ctx_train = llama_n_ctx_train(model);
const int n_ctx = llama_n_ctx(ctx);
LOG("n_ctx: %d\n", n_ctx);
@ -955,6 +982,8 @@ int main(int argc, char ** argv) {
llama_sampling_free(ctx_sampling);
llama_backend_free();
ggml_release_threadpool(threadpool);
#ifndef LOG_DISABLE_LOGS
LOG_TEE("Log end\n");
#endif // LOG_DISABLE_LOGS

View file

@ -94,8 +94,9 @@ int main(int argc, char ** argv) {
ctx_params.seed = seed;
ctx_params.n_ctx = llama_n_ctx_train(model)*n_grp + n_keep;
ctx_params.n_batch = 512;
ctx_params.n_threads = params.n_threads;
ctx_params.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch;
ctx_params.n_threads = params.cpuparams.n_threads;
ctx_params.n_threads_batch = params.cpuparams_batch.n_threads == -1 ?
params.cpuparams.n_threads : params.cpuparams_batch.n_threads;
GGML_ASSERT(ctx_params.n_batch % n_grp == 0 && "n_batch must be divisible by n_grp");

View file

@ -2329,7 +2329,7 @@ static void server_print_usage(const char * argv0, const gpt_params & params, co
printf("options:\n");
printf(" -h, --help show this help message and exit\n");
printf(" -v, --verbose verbose output (default: %s)\n", server_verbose ? "enabled" : "disabled");
printf(" -t N, --threads N number of threads to use during computation (default: %d)\n", params.n_threads);
printf(" -t N, --threads N number of threads to use during computation (default: %d)\n", params.cpuparams.n_threads);
printf(" -tb N, --threads-batch N number of threads to use during batch and prompt processing (default: same as --threads)\n");
printf(" --threads-http N number of threads in the http server pool to process requests (default: max(hardware concurrency - 1, --parallel N + 2))\n");
printf(" -c N, --ctx-size N size of the prompt context (default: %d)\n", params.n_ctx);
@ -2612,7 +2612,7 @@ static void server_params_parse(int argc, char ** argv, server_params & sparams,
invalid_param = true;
break;
}
params.n_threads = std::stoi(argv[i]);
params.cpuparams.n_threads = std::stoi(argv[i]);
} else if (arg == "--grp-attn-n" || arg == "-gan") {
if (++i >= argc) {
invalid_param = true;
@ -2632,7 +2632,7 @@ static void server_params_parse(int argc, char ** argv, server_params & sparams,
invalid_param = true;
break;
}
params.n_threads_batch = std::stoi(argv[i]);
params.cpuparams_batch.n_threads = std::stoi(argv[i]);
} else if (arg == "--threads-http") {
if (++i >= argc) {
invalid_param = true;
@ -2943,8 +2943,8 @@ int main(int argc, char ** argv) {
});
LOG_INFO("system info", {
{"n_threads", params.n_threads},
{"n_threads_batch", params.n_threads_batch},
{"n_threads", params.cpuparams.n_threads},
{"n_threads_batch", params.cpuparams_batch.n_threads},
{"total_threads", std::thread::hardware_concurrency()},
{"system_info", llama_print_system_info()},
});

View file

@ -53,8 +53,9 @@ int main(int argc, char ** argv) {
ctx_params.seed = 1234;
ctx_params.n_ctx = 2048;
ctx_params.n_threads = params.n_threads;
ctx_params.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch;
ctx_params.n_threads = params.cpuparams.n_threads;
ctx_params.n_threads_batch = params.cpuparams_batch.n_threads == -1 ?
params.cpuparams.n_threads : params.cpuparams_batch.n_threads;
llama_context * ctx = llama_new_context_with_model(model, ctx_params);

View file

@ -24,6 +24,14 @@ struct seq_draft {
struct llama_sampling_context * ctx_sampling;
};
static void switch_active_threadpool(
ggml_compute_threadpool_t cur,
ggml_compute_threadpool_t nxt
) {
ggml_pause_threadpool(cur);
ggml_resume_threadpool(nxt);
}
int main(int argc, char ** argv) {
gpt_params params;
@ -67,13 +75,19 @@ int main(int argc, char ** argv) {
// load the target model
std::tie(model_tgt, ctx_tgt) = llama_init_from_gpt_params(params);
ggml_threadpool_params tpp_tgt = ggml_threadpool_params_from_cpu_params(params.cpuparams);
ggml_compute_threadpool * threadpool_tgt = ggml_create_threadpool(&tpp_tgt);
if (!threadpool_tgt) {
LOG_TEE("%s: target threadpool create failed : n_threads %d\n", __func__, tpp_tgt.n_threads);
exit(1);
}
// load the draft model
params.model = params.model_draft;
params.n_gpu_layers = params.n_gpu_layers_draft;
if (params.n_threads_draft > 0) {
params.n_threads = params.n_threads_draft;
if (params.draft_cpuparams.n_threads > 0) {
params.cpuparams = params.draft_cpuparams;
}
params.n_threads_batch = params.n_threads_batch_draft;
std::tie(model_dft, ctx_dft) = llama_init_from_gpt_params(params);
const bool vocab_type_tgt = llama_vocab_type(model_tgt);
@ -98,6 +112,17 @@ int main(int argc, char ** argv) {
return 1;
}
ggml_threadpool_params tpp_dft = ggml_threadpool_params_from_cpu_params(params.draft_cpuparams);
ggml_compute_threadpool * threadpool_dft = ggml_create_threadpool(&tpp_dft);
if (!threadpool_dft) {
LOG_TEE("%s: draft threadpool create failed : n_threads %d\n", __func__, tpp_dft.n_threads);
exit(1);
}
llama_attach_threadpool(ctx_tgt, threadpool_tgt);
llama_attach_threadpool(ctx_dft, threadpool_dft);
ggml_pause_threadpool(threadpool_dft);
{
const int n_vocab_tgt = llama_n_vocab(model_tgt);
const int n_vocab_dft = llama_n_vocab(model_dft);
@ -153,6 +178,7 @@ int main(int argc, char ** argv) {
// eval the prompt with both models
llama_decode(ctx_tgt, llama_batch_get_one( inp.data(), n_input - 1, 0, 0));
llama_decode(ctx_tgt, llama_batch_get_one(&inp.back(), 1, n_input - 1, 0));
switch_active_threadpool(threadpool_tgt, threadpool_dft);
llama_decode(ctx_dft, llama_batch_get_one( inp.data(), n_input, 0, 0));
const auto t_enc_end = ggml_time_us();
@ -542,6 +568,7 @@ int main(int argc, char ** argv) {
// evaluate the drafted tokens on the draft model
llama_decode(ctx_dft, batch_dft);
++n_past_cur;
++n_drafted;
@ -549,6 +576,7 @@ int main(int argc, char ** argv) {
break;
}
}
switch_active_threadpool(threadpool_dft, threadpool_tgt);
// evaluate the target model on the drafted tokens
{
@ -559,6 +587,8 @@ int main(int argc, char ** argv) {
// LOG("target batch: %s\n", LOG_BATCH_TOSTR_PRETTY(ctx_tgt, batch_tgt).c_str());
llama_decode(ctx_tgt, batch_tgt);
switch_active_threadpool(threadpool_tgt, threadpool_dft);
++n_past_tgt;
}
@ -608,6 +638,9 @@ int main(int argc, char ** argv) {
llama_backend_free();
ggml_release_threadpool(threadpool_tgt);
ggml_release_threadpool(threadpool_dft);
fprintf(stderr, "\n\n");
return 0;

View file

@ -1211,7 +1211,7 @@ int main(int argc, char ** argv) {
opt_cb_data.millis_per_iter = 0.0;
// measure required memory for work buffer
size_t max_work_size = ggml_graph_plan(gb, params.common.n_threads).work_size + GGML_OBJECT_SIZE;
size_t max_work_size = ggml_graph_plan(gb, params.common.n_threads, nullptr).work_size + GGML_OBJECT_SIZE;
printf("%s: work_size = %zu bytes (%.1f MB)\n", __func__, max_work_size, (float) max_work_size / (1024.0f*1024.0f));
// context for work buffer

View file

@ -9,6 +9,7 @@ extern "C" {
typedef struct ggml_backend_buffer_type * ggml_backend_buffer_type_t;
typedef struct ggml_backend_buffer * ggml_backend_buffer_t;
typedef struct ggml_backend * ggml_backend_t;
typedef struct ggml_compute_threadpool * ggml_compute_threadpool_t;
// Tensor allocator
struct ggml_tallocr {

View file

@ -92,13 +92,14 @@ extern "C" {
void (*GGML_CALL synchronize)(ggml_backend_t backend);
// compute graph with a plan (not used currently)
ggml_backend_graph_plan_t (*GGML_CALL graph_plan_create) (ggml_backend_t backend, const struct ggml_cgraph * cgraph);
ggml_backend_graph_plan_t (*GGML_CALL graph_plan_create) (ggml_backend_t backend, const struct ggml_cgraph * cgraph, ggml_compute_threadpool_t threadpool);
void (*GGML_CALL graph_plan_free) (ggml_backend_t backend, ggml_backend_graph_plan_t plan);
// compute graph with a plan
enum ggml_status (*GGML_CALL graph_plan_compute)(ggml_backend_t backend, ggml_backend_graph_plan_t plan);
// compute graph without a plan (async)
enum ggml_status (*GGML_CALL graph_compute) (ggml_backend_t backend, struct ggml_cgraph * cgraph);
enum ggml_status (*GGML_CALL graph_compute) (ggml_backend_t backend, struct ggml_cgraph * cgraph, ggml_compute_threadpool_t threadpool);
// check if the backend supports an operation
bool (*GGML_CALL supports_op)(ggml_backend_t backend, const struct ggml_tensor * op);

View file

@ -254,10 +254,14 @@ void ggml_backend_synchronize(ggml_backend_t backend) {
backend->iface.synchronize(backend);
}
ggml_backend_graph_plan_t ggml_backend_graph_plan_create(ggml_backend_t backend, struct ggml_cgraph * cgraph) {
ggml_backend_graph_plan_t ggml_backend_graph_plan_create(
ggml_backend_t backend,
const struct ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool
) {
GGML_ASSERT(backend->iface.graph_plan_create != NULL);
return backend->iface.graph_plan_create(backend, cgraph);
return backend->iface.graph_plan_create(backend, cgraph, threadpool);
}
void ggml_backend_graph_plan_free(ggml_backend_t backend, ggml_backend_graph_plan_t plan) {
@ -266,20 +270,31 @@ void ggml_backend_graph_plan_free(ggml_backend_t backend, ggml_backend_graph_pla
backend->iface.graph_plan_free(backend, plan);
}
enum ggml_status ggml_backend_graph_plan_compute(ggml_backend_t backend, ggml_backend_graph_plan_t plan) {
enum ggml_status ggml_backend_graph_plan_compute(
ggml_backend_t backend,
ggml_backend_graph_plan_t plan
) {
GGML_ASSERT(backend->iface.graph_plan_compute != NULL);
return backend->iface.graph_plan_compute(backend, plan);
}
enum ggml_status ggml_backend_graph_compute(ggml_backend_t backend, struct ggml_cgraph * cgraph) {
enum ggml_status err = ggml_backend_graph_compute_async(backend, cgraph);
enum ggml_status ggml_backend_graph_compute(
ggml_backend_t backend,
struct ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool
) {
enum ggml_status err = ggml_backend_graph_compute_async(backend, cgraph, threadpool);
ggml_backend_synchronize(backend);
return err;
}
enum ggml_status ggml_backend_graph_compute_async(ggml_backend_t backend, struct ggml_cgraph * cgraph) {
return backend->iface.graph_compute(backend, cgraph);
enum ggml_status ggml_backend_graph_compute_async(
ggml_backend_t backend,
struct ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool
) {
return backend->iface.graph_compute(backend, cgraph, threadpool);
}
bool ggml_backend_supports_op(ggml_backend_t backend, const struct ggml_tensor * op) {
@ -758,12 +773,16 @@ struct ggml_backend_plan_cpu {
struct ggml_cgraph cgraph;
};
GGML_CALL static ggml_backend_graph_plan_t ggml_backend_cpu_graph_plan_create(ggml_backend_t backend, const struct ggml_cgraph * cgraph) {
GGML_CALL static ggml_backend_graph_plan_t ggml_backend_cpu_graph_plan_create(
ggml_backend_t backend,
const struct ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool
) {
struct ggml_backend_cpu_context * cpu_ctx = (struct ggml_backend_cpu_context *)backend->context;
struct ggml_backend_plan_cpu * cpu_plan = malloc(sizeof(struct ggml_backend_plan_cpu));
cpu_plan->cplan = ggml_graph_plan(cgraph, cpu_ctx->n_threads);
cpu_plan->cplan = ggml_graph_plan(cgraph, cpu_ctx->n_threads, threadpool);
cpu_plan->cgraph = *cgraph; // FIXME: deep copy
if (cpu_plan->cplan.work_size > 0) {
@ -797,10 +816,14 @@ GGML_CALL static enum ggml_status ggml_backend_cpu_graph_plan_compute(ggml_backe
GGML_UNUSED(backend);
}
GGML_CALL static enum ggml_status ggml_backend_cpu_graph_compute(ggml_backend_t backend, struct ggml_cgraph * cgraph) {
GGML_CALL static enum ggml_status ggml_backend_cpu_graph_compute(
ggml_backend_t backend,
struct ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool
) {
struct ggml_backend_cpu_context * cpu_ctx = (struct ggml_backend_cpu_context *)backend->context;
struct ggml_cplan cplan = ggml_graph_plan(cgraph, cpu_ctx->n_threads);
struct ggml_cplan cplan = ggml_graph_plan(cgraph, cpu_ctx->n_threads, threadpool);
if (cpu_ctx->work_size < cplan.work_size) {
free(cpu_ctx->work_data);
@ -1630,7 +1653,10 @@ static bool ggml_backend_sched_alloc_splits(ggml_backend_sched_t sched) {
return true;
}
static enum ggml_status ggml_backend_sched_compute_splits(ggml_backend_sched_t sched) {
static enum ggml_status ggml_backend_sched_compute_splits(
ggml_backend_sched_t sched,
ggml_compute_threadpool_t threadpool
) {
struct ggml_backend_sched_split * splits = sched->splits;
for (int i = 0; i < sched->n_splits; i++) {
@ -1664,7 +1690,7 @@ static enum ggml_status ggml_backend_sched_compute_splits(ggml_backend_sched_t s
}
if (!sched->callback_eval) {
enum ggml_status ec = ggml_backend_graph_compute_async(split_backend, &split->graph);
enum ggml_status ec = ggml_backend_graph_compute_async(split_backend, &split->graph, threadpool);
if (ec != GGML_STATUS_SUCCESS) {
return ec;
}
@ -1686,7 +1712,7 @@ static enum ggml_status ggml_backend_sched_compute_splits(ggml_backend_sched_t s
struct ggml_cgraph gv = ggml_graph_view(&split->graph, j0, j1 + 1);
enum ggml_status ec = ggml_backend_graph_compute_async(split_backend, &gv);
enum ggml_status ec = ggml_backend_graph_compute_async(split_backend, &gv, threadpool);
if (ec != GGML_STATUS_SUCCESS) {
return ec;
}
@ -1825,13 +1851,21 @@ bool ggml_backend_sched_alloc_graph(ggml_backend_sched_t sched, struct ggml_cgra
return true;
}
enum ggml_status ggml_backend_sched_graph_compute(ggml_backend_sched_t sched, struct ggml_cgraph * graph) {
enum ggml_status err = ggml_backend_sched_graph_compute_async(sched, graph);
enum ggml_status ggml_backend_sched_graph_compute(
ggml_backend_sched_t sched,
struct ggml_cgraph * graph,
ggml_compute_threadpool_t threadpool
) {
enum ggml_status err = ggml_backend_sched_graph_compute_async(sched, graph, threadpool);
ggml_backend_sched_synchronize(sched);
return err;
}
enum ggml_status ggml_backend_sched_graph_compute_async(ggml_backend_sched_t sched, struct ggml_cgraph * graph) {
enum ggml_status ggml_backend_sched_graph_compute_async(
ggml_backend_sched_t sched,
struct ggml_cgraph * graph,
ggml_compute_threadpool_t threadpool
) {
if (!sched->is_reset && !sched->is_alloc) {
ggml_backend_sched_reset(sched);
}
@ -1842,7 +1876,7 @@ enum ggml_status ggml_backend_sched_graph_compute_async(ggml_backend_sched_t sch
}
}
return ggml_backend_sched_compute_splits(sched);
return ggml_backend_sched_compute_splits(sched, threadpool);
}
void ggml_backend_sched_synchronize(ggml_backend_sched_t sched) {
@ -2081,8 +2115,8 @@ bool ggml_backend_compare_graph_backend(ggml_backend_t backend1, ggml_backend_t
struct ggml_cgraph g1v = ggml_graph_view(g1, i, i + 1);
struct ggml_cgraph g2v = ggml_graph_view(g2, i, i + 1);
ggml_backend_graph_compute(backend1, &g1v);
ggml_backend_graph_compute(backend2, &g2v);
ggml_backend_graph_compute(backend1, &g1v, NULL);
ggml_backend_graph_compute(backend2, &g2v, NULL);
if (ggml_is_view_op(t1->op)) {
continue;

View file

@ -67,12 +67,24 @@ extern "C" {
GGML_API void ggml_backend_synchronize(ggml_backend_t backend);
GGML_API ggml_backend_graph_plan_t ggml_backend_graph_plan_create(ggml_backend_t backend, struct ggml_cgraph * cgraph);
GGML_API ggml_backend_graph_plan_t ggml_backend_graph_plan_create(
ggml_backend_t backend,
const struct ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool);
GGML_API void ggml_backend_graph_plan_free (ggml_backend_t backend, ggml_backend_graph_plan_t plan);
GGML_API enum ggml_status ggml_backend_graph_plan_compute (ggml_backend_t backend, ggml_backend_graph_plan_t plan);
GGML_API enum ggml_status ggml_backend_graph_compute (ggml_backend_t backend, struct ggml_cgraph * cgraph);
GGML_API enum ggml_status ggml_backend_graph_compute_async(ggml_backend_t backend, struct ggml_cgraph * cgraph);
GGML_API enum ggml_status ggml_backend_graph_plan_compute (
ggml_backend_t backend,
ggml_backend_graph_plan_t plan);
GGML_API enum ggml_status ggml_backend_graph_compute(
ggml_backend_t backend,
struct ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool);
GGML_API enum ggml_status ggml_backend_graph_compute_async(
ggml_backend_t backend,
struct ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool);
GGML_API bool ggml_backend_supports_op(ggml_backend_t backend, const struct ggml_tensor * op);
GGML_API bool ggml_backend_offload_op(ggml_backend_t backend, const struct ggml_tensor * op);
@ -193,8 +205,8 @@ extern "C" {
// Allocate and compute graph on the backend scheduler
GGML_API bool ggml_backend_sched_alloc_graph(ggml_backend_sched_t sched, struct ggml_cgraph * graph);
GGML_API enum ggml_status ggml_backend_sched_graph_compute(ggml_backend_sched_t sched, struct ggml_cgraph * graph);
GGML_API enum ggml_status ggml_backend_sched_graph_compute_async(ggml_backend_sched_t sched, struct ggml_cgraph * graph);
GGML_API enum ggml_status ggml_backend_sched_graph_compute(ggml_backend_sched_t sched, struct ggml_cgraph * graph, ggml_compute_threadpool_t threadpool);
GGML_API enum ggml_status ggml_backend_sched_graph_compute_async(ggml_backend_sched_t sched, struct ggml_cgraph * graph, ggml_compute_threadpool_t threadpool);
GGML_API void ggml_backend_sched_synchronize(ggml_backend_sched_t sched);
// Reset all assignments and allocators - must be called before changing the node backends

View file

@ -2495,9 +2495,13 @@ static bool ggml_graph_node_has_matching_properties(ggml_tensor * node, ggml_gra
return true;
}
GGML_CALL static enum ggml_status ggml_backend_cuda_graph_compute(ggml_backend_t backend, ggml_cgraph * cgraph) {
ggml_backend_cuda_context * cuda_ctx = (ggml_backend_cuda_context *)backend->context;
GGML_CALL static enum ggml_status ggml_backend_cuda_graph_compute(
ggml_backend_t backend,
ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool) {
GGML_UNUSED(threadpool);
ggml_backend_cuda_context * cuda_ctx = (ggml_backend_cuda_context *)backend->context;
ggml_cuda_set_device(cuda_ctx->device);
#ifdef USE_CUDA_GRAPH

View file

@ -1948,7 +1948,12 @@ static ggml_backend_buffer_type_t ggml_backend_kompute_get_default_buffer_type(g
return ggml_backend_kompute_buffer_type(ctx->device);
}
static ggml_status ggml_backend_kompute_graph_compute(ggml_backend_t backend, struct ggml_cgraph * cgraph) {
static ggml_status ggml_backend_kompute_graph_compute(
ggml_backend_t backend,
struct ggml_cgraph * cgraph
ggml_compute_threadpool_t threadpool) {
GGML_UNUSED(threadpool);
auto * ctx = static_cast<ggml_kompute_context *>(backend->context);
ggml_vk_graph_compute(ctx, cgraph);
return GGML_STATUS_SUCCESS;

View file

@ -3103,7 +3103,12 @@ GGML_CALL static ggml_backend_buffer_type_t ggml_backend_metal_get_default_buffe
UNUSED(backend);
}
GGML_CALL static enum ggml_status ggml_backend_metal_graph_compute(ggml_backend_t backend, struct ggml_cgraph * cgraph) {
GGML_CALL static enum ggml_status ggml_backend_metal_graph_compute(
ggml_backend_t backend,
struct ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool) {
UNUSED(threadpool);
struct ggml_metal_context * metal_ctx = (struct ggml_metal_context *)backend->context;
return ggml_metal_graph_compute(metal_ctx, cgraph);

View file

@ -2235,7 +2235,12 @@ static ggml_backend_buffer_type_t ggml_backend_opencl_get_default_buffer_type(gg
GGML_UNUSED(backend);
}
static ggml_status ggml_backend_opencl_graph_compute(ggml_backend_t backend, ggml_cgraph * graph) {
static ggml_status ggml_backend_opencl_graph_compute(
ggml_backend_t backend,
ggml_cgraph * graph,
ggml_compute_threadpool_t threadpool) {
GGML_UNUSED(threadpool);
for (int i = 0; i < graph->n_nodes; ++i) {
ggml_tensor * node = graph->nodes[i];

View file

@ -17022,7 +17022,13 @@ catch (sycl::exception const &exc) {
std::exit(1);
}
GGML_CALL static ggml_status ggml_backend_sycl_graph_compute(ggml_backend_t backend, ggml_cgraph * cgraph) {
GGML_CALL static ggml_status ggml_backend_sycl_graph_compute(
ggml_backend_t backend,
ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool) {
GGML_UNUSED(threadpool);
ggml_backend_sycl_context * sycl_ctx = (ggml_backend_sycl_context *)backend->context;
ggml_sycl_set_main_device(sycl_ctx->device);

View file

@ -6225,7 +6225,12 @@ static bool ggml_vk_is_empty(ggml_tensor * node) {
return ggml_is_empty(node) || node->op == GGML_OP_NONE || node->op == GGML_OP_RESHAPE || node->op == GGML_OP_TRANSPOSE || node->op == GGML_OP_VIEW || node->op == GGML_OP_PERMUTE;
}
GGML_CALL static ggml_status ggml_backend_vk_graph_compute(ggml_backend_t backend, ggml_cgraph * cgraph) {
GGML_CALL static ggml_status ggml_backend_vk_graph_compute(
ggml_backend_t backend,
ggml_cgraph * cgraph,
ggml_compute_threadpool_t threadpool) {
GGML_UNUSED(threadpool);
#ifdef GGML_VULKAN_DEBUG
std::cerr << "ggml_backend_vk_graph_compute(" << cgraph->n_nodes << " nodes)" << std::endl;
#endif

775
ggml.c
View file

@ -1739,30 +1739,103 @@ struct ggml_context_container {
struct ggml_context context;
};
struct ggml_compute_state_shared {
const struct ggml_cgraph* cgraph;
const struct ggml_cplan* cplan;
//
// thread data
//
typedef pthread_t ggml_thread_t;
#if defined(_WIN32)
typedef CONDITION_VARIABLE ggml_cond_t;
typedef SRWLOCK ggml_mutex_t;
#define ggml_mutex_init(m) InitializeSRWLock(m)
#define ggml_mutex_destroy(m)
#define ggml_mutex_lock(m) AcquireSRWLockExclusive(m)
#define ggml_mutex_unlock(m) ReleaseSRWLockExclusive(m)
#define ggml_mutex_lock_shared(m) AcquireSRWLockShared(m)
#define ggml_mutex_unlock_shared(m) ReleaseSRWLockShared(m)
#define ggml_cond_init(c) InitializeConditionVariable(c)
#define ggml_cond_destroy(c)
#define ggml_cond_wait(c, m) SleepConditionVariableSRW(c, m, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED)
#define ggml_cond_broadcast(c) WakeAllConditionVariable(c)
#define ggml_thread_create pthread_create
#define ggml_thread_join pthread_join
#else
typedef pthread_cond_t ggml_cond_t;
typedef pthread_mutex_t ggml_mutex_t;
#define ggml_mutex_init(m) pthread_mutex_init(m, NULL)
#define ggml_mutex_destroy(m) pthread_mutex_destroy(m)
#define ggml_mutex_lock(m) pthread_mutex_lock(m)
#define ggml_mutex_unlock(m) pthread_mutex_unlock(m)
#define ggml_mutex_lock_shared(m) pthread_mutex_lock(m)
#define ggml_mutex_unlock_shared(m) pthread_mutex_unlock(m)
#define ggml_lock_init(x) UNUSED(x)
#define ggml_lock_destroy(x) UNUSED(x)
#if defined(__x86_64__) || (defined(_MSC_VER) && defined(_M_AMD64))
#define ggml_lock_lock(x) _mm_pause()
#else
#define ggml_lock_lock(x) UNUSED(x)
#endif
#define ggml_lock_unlock(x) UNUSED(x)
#define GGML_LOCK_INITIALIZER 0
#define ggml_cond_init(c) pthread_cond_init(c, NULL)
#define ggml_cond_destroy(c) pthread_cond_destroy(c)
#define ggml_cond_wait(c, m) pthread_cond_wait(c, m)
#define ggml_cond_broadcast(c) pthread_cond_broadcast(c)
#define ggml_thread_create pthread_create
#define ggml_thread_join pthread_join
#endif
// Threadpool def
struct ggml_compute_threadpool {
// synchronization primitives
atomic_int n_ready; // number of ready threads (inter-graph sync)
atomic_int n_active; // number of active threads (intra-graph sync)
atomic_int node_n; // active graph node
atomic_int node_task; // active graph node task phase
volatile bool stop; // Used for stopping the threadpool altogether
volatile bool pause; // Used for pausing the threadpool or individual threads
struct ggml_cgraph * cgraph;
struct ggml_cplan * cplan;
struct ggml_compute_state * workers; // per thread state
int32_t n_threads_max; // number of threads in the pool
int32_t n_threads_cur; // number of threads used in the current graph
bool poll; // Use polling (busywait) // TODO
int32_t prio; // Scheduling priority
ggml_mutex_t mutex; // mutex for cond.var
ggml_cond_t cond; // cond.var for waiting for new work
int64_t perf_node_start_cycles;
int64_t perf_node_start_time_us;
const int n_threads;
// synchronization primitives
atomic_int n_active; // num active threads
atomic_int node_n; // active graph node
atomic_int node_task; // active graph node task phase
ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
void* abort_callback_data;
void * abort_callback_data;
atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
};
// Per-thread state
struct ggml_compute_state {
ggml_thread_t thrd;
bool cpumask[GGML_N_CORES_MAX];
int ith;
struct ggml_compute_state_shared* shared;
struct ggml_compute_threadpool * threadpool;
enum ggml_status ec;
};
@ -12472,7 +12545,7 @@ UseGgmlGemm1:;
return;
}
// Every thread starts at ith, so the first unprocessed chunk is nth. This save a bit of coordination right at the start.
atomic_store(&state->shared->current_chunk, nth);
atomic_store(&state->threadpool->current_chunk, nth);
if (src1->type != vec_dot_type) {
char * wdata = params->wdata;
const size_t row_size = ggml_row_size(vec_dot_type, ne10);
@ -12594,7 +12667,7 @@ UseGgmlGemm2:;
break;
}
current_chunk = atomic_fetch_add(&state->shared->current_chunk, 1);
current_chunk = atomic_fetch_add(&state->threadpool->current_chunk, 1);
}
#ifdef GGML_PERF
@ -18893,65 +18966,6 @@ void ggml_graph_clear(struct ggml_cgraph * cgraph) {
memset(cgraph->visited_hash_table.keys, 0, cgraph->visited_hash_table.size * sizeof(struct ggml_tensor *));
}
//
// thread data
//
// synchronization is done via busy loops
// I tried using spin locks, but not sure how to use them correctly - the things I tried were slower than busy loops
//
#ifdef __APPLE__
//#include <os/lock.h>
//
//typedef os_unfair_lock ggml_lock_t;
//
//#define ggml_lock_init(x) UNUSED(x)
//#define ggml_lock_destroy(x) UNUSED(x)
//#define ggml_lock_lock os_unfair_lock_lock
//#define ggml_lock_unlock os_unfair_lock_unlock
//
//#define GGML_LOCK_INITIALIZER OS_UNFAIR_LOCK_INIT
typedef int ggml_lock_t;
#define ggml_lock_init(x) UNUSED(x)
#define ggml_lock_destroy(x) UNUSED(x)
#define ggml_lock_lock(x) UNUSED(x)
#define ggml_lock_unlock(x) UNUSED(x)
#define GGML_LOCK_INITIALIZER 0
#define ggml_thread_create pthread_create
#define ggml_thread_join pthread_join
#else
//typedef pthread_spinlock_t ggml_lock_t;
//#define ggml_lock_init(x) pthread_spin_init(x, PTHREAD_PROCESS_PRIVATE)
//#define ggml_lock_destroy pthread_spin_destroy
//#define ggml_lock_lock pthread_spin_lock
//#define ggml_lock_unlock pthread_spin_unlock
typedef int ggml_lock_t;
#define ggml_lock_init(x) UNUSED(x)
#define ggml_lock_destroy(x) UNUSED(x)
#if defined(__x86_64__) || (defined(_MSC_VER) && defined(_M_AMD64))
#define ggml_lock_lock(x) _mm_pause()
#else
#define ggml_lock_lock(x) UNUSED(x)
#endif
#define ggml_lock_unlock(x) UNUSED(x)
#define GGML_LOCK_INITIALIZER 0
#define ggml_thread_create pthread_create
#define ggml_thread_join pthread_join
#endif
// Android's libc implementation "bionic" does not support setting affinity
#if defined(__gnu_linux__)
static void set_numa_thread_affinity(int thread_n) {
@ -19026,9 +19040,10 @@ static void set_numa_thread_affinity(int thread_n) { UNUSED(thread_n); }
static void clear_numa_thread_affinity(void) {}
#endif
static void ggml_graph_compute_perf_stats_node(struct ggml_tensor * node, const struct ggml_compute_state_shared * st) {
int64_t cycles_cur = ggml_perf_cycles() - st->perf_node_start_cycles;
int64_t time_us_cur = ggml_perf_time_us() - st->perf_node_start_time_us;
static void ggml_graph_compute_perf_stats_node(struct ggml_tensor * node, const struct ggml_compute_threadpool * tp) {
int64_t cycles_cur = ggml_perf_cycles() - tp->perf_node_start_cycles;
int64_t time_us_cur = ggml_perf_time_us() - tp->perf_node_start_time_us;
node->perf_runs++;
node->perf_cycles += cycles_cur;
@ -19282,16 +19297,355 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
return n_tasks;
}
static thread_ret_t ggml_graph_compute_secondary_thread(void* data);
enum {
SCHED_PRIO_NORMAL,
SCHED_PRIO_MEDIUM,
SCHED_PRIO_HIGH,
SCHED_PRIO_REALTIME
};
#if defined(_WIN32)
#include "windows.h"
// TODO: support > 64 CPUs
static bool __thread_affinity(bool * mask) {
HANDLE h = GetCurrentThread();
uint64_t bitmask = 0ULL;
assert(GGML_N_CORES_MAX >= 64);
for (int32_t i = 0; i < 8; i++) {
int32_t idx = i * 8;
uint8_t val = 0;
val |= mask[idx + 0] << 0;
val |= mask[idx + 1] << 1;
val |= mask[idx + 2] << 2;
val |= mask[idx + 3] << 3;
val |= mask[idx + 4] << 4;
val |= mask[idx + 5] << 5;
val |= mask[idx + 6] << 6;
val |= mask[idx + 7] << 7;
bitmask |= (uint64_t)val << idx;
}
for (int32_t i = 64; i < GGML_N_CORES_MAX; i++) {
if (mask[i]) {
fprintf(stderr, "warn: setting thread-affinity for > 64 CPUs isn't supported on windows!\n");
break;
}
}
DWORD_PTR m = (DWORD_PTR)mask;
m = SetThreadAffinityMask(h, m);
return m != 0;
}
static bool __process_priority(int32_t prio) {
DWORD p = NORMAL_PRIORITY_CLASS;
switch (prio) {
case SCHED_PRIO_NORMAL: p = NORMAL_PRIORITY_CLASS; break;
case SCHED_PRIO_MEDIUM: p = ABOVE_NORMAL_PRIORITY_CLASS; break;
case SCHED_PRIO_HIGH: p = HIGH_PRIORITY_CLASS; break;
case SCHED_PRIO_REALTIME: p = REALTIME_PRIORITY_CLASS; break;
}
return SetPriorityClass(GetCurrentProcess(), p);
}
static bool __thread_priority(int32_t prio) {
DWORD p = NORMAL_PRIORITY_CLASS;
switch (prio) {
case SCHED_PRIO_NORMAL: p = THREAD_PRIORITY_NORMAL; break;
case SCHED_PRIO_MEDIUM: p = THREAD_PRIORITY_ABOVE_NORMAL; break;
case SCHED_PRIO_HIGH: p = THREAD_PRIORITY_HIGHEST; break;
case SCHED_PRIO_REALTIME: p = THREAD_PRIORITY_TIME_CRITICAL; break;
}
return SetThreadPriority(GetCurrentThread(), p);
}
#elif defined(__APPLE__)
#include <sys/types.h>
#include <sys/resource.h>
static bool __thread_affinity(const bool * mask) {
UNUSED(mask);
return true;
}
static bool __process_priority(int32_t prio) {
int32_t p = 0;
switch (prio) {
case SCHED_PRIO_NORMAL: p = 0; break;
case SCHED_PRIO_MEDIUM: p = -5; break;
case SCHED_PRIO_HIGH: p = -10; break;
case SCHED_PRIO_REALTIME: p = -20; break;
}
int32_t r = setpriority(PRIO_PROCESS, 0, p);
return r != -1;
}
static bool __thread_priority(int32_t prio) {
UNUSED(prio);
return true;
}
#else // posix?
#include <sched.h>
static bool __thread_affinity(const bool * mask) {
cpu_set_t cpuset;
int32_t err;
CPU_ZERO(&cpuset);
for (uint32_t i = 0; i < GGML_N_CORES_MAX; i++) {
if (mask[i]) {
CPU_SET(i, &cpuset);
}
}
#ifdef __ANDROID__
err = sched_setaffinity(0, sizeof(cpuset), &cpuset);
if (err < 0) {
err = errno;
}
#else
err = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
#endif
if (err != 0) {
//fprintf(stderr, "warn: failed to set affinity mask 0x%llx (err %d: %s)\n", (unsigned long long)mask, err, strerror(err));
return false;
}
return true;
}
static bool __process_priority(int32_t prio) {
struct sched_param p;
int32_t policy = SCHED_OTHER;
switch (prio) {
case SCHED_PRIO_NORMAL: policy = SCHED_OTHER; p.sched_priority = 0; break;
case SCHED_PRIO_MEDIUM: policy = SCHED_FIFO; p.sched_priority = 40; break;
case SCHED_PRIO_HIGH: policy = SCHED_FIFO; p.sched_priority = 80; break;
case SCHED_PRIO_REALTIME: policy = SCHED_FIFO; p.sched_priority = 90; break;
}
int32_t err = sched_setscheduler(0, policy, &p);
if (err != 0) {
//fprintf(stderr, "warn: failed to set process priority %d (err %d)\n", prio, err);
return false;
}
return true;
}
static bool __thread_priority(int32_t prio) {
struct sched_param p;
int32_t policy = SCHED_OTHER;
switch (prio) {
case SCHED_PRIO_NORMAL: policy = SCHED_OTHER; p.sched_priority = 0; break;
case SCHED_PRIO_MEDIUM: policy = SCHED_FIFO; p.sched_priority = 40; break;
case SCHED_PRIO_HIGH: policy = SCHED_FIFO; p.sched_priority = 80; break;
case SCHED_PRIO_REALTIME: policy = SCHED_FIFO; p.sched_priority = 90; break;
}
int32_t err = pthread_setschedparam(pthread_self(), policy, &p);
if (err != 0) {
//fprintf(stderr, "warn: failed to set thread priority %d (err %d)\n", prio, err);
return false;
}
return true;
}
#endif
static void __init_stack(size_t size) {
void* ptr = alloca(size);
if (ptr) {
memset(ptr, 0, size);
}
}
#ifdef __aarch64__
static inline void __cpu_relax(void) {
__asm__ volatile("yield" ::: "memory");
}
#else
static inline void __cpu_relax(void) {
__asm__ volatile("rep; nop" ::: "memory");
}
#endif
static void __cpumask_next(const bool * global_mask, bool * local_mask, bool strict, int32_t* iter) {
if (!global_mask) {
memset(local_mask, 1, GGML_N_CORES_MAX);
return;
}
if (!strict) {
memcpy(local_mask, global_mask, GGML_N_CORES_MAX);
return;
} else {
memset(local_mask, 0, GGML_N_CORES_MAX);
int32_t base_idx = *iter;
for (int32_t i = 0; i < GGML_N_CORES_MAX; i++) {
int32_t idx = base_idx + i;
if (idx > GGML_N_CORES_MAX) {
// Just a cheaper modulo
idx -= GGML_N_CORES_MAX;
}
if (global_mask[idx]) {
local_mask[idx] = 1;
*iter = idx + 1;
return;
}
}
}
}
struct ggml_compute_threadpool * ggml_create_threadpool(struct ggml_threadpool_params * tpp) {
struct ggml_compute_threadpool * threadpool =
GGML_ALIGNED_MALLOC(sizeof(struct ggml_compute_threadpool));
{
threadpool->n_ready = 1; // the main thread is "ready"
threadpool->n_active = 0;
threadpool->node_n = 0;
threadpool->node_task = GGML_TASK_TYPE_FINALIZE;
threadpool->stop = false;
threadpool->pause = true;
threadpool->cgraph = NULL;
threadpool->cplan = NULL;
threadpool->workers = NULL;
threadpool->n_threads_max = tpp->n_threads;
threadpool->n_threads_cur = 0;
threadpool->poll = tpp->poll;
threadpool->prio = tpp->prio;
threadpool->perf_node_start_cycles = 0ULL;
threadpool->perf_node_start_time_us = 0ULL;
threadpool->abort_callback = NULL;
threadpool->abort_callback_data = NULL;
threadpool->current_chunk = 0;
}
ggml_mutex_init(&threadpool->mutex);
ggml_cond_init(&threadpool->cond);
struct ggml_compute_state * workers =
GGML_ALIGNED_MALLOC(sizeof(struct ggml_compute_state) * tpp->n_threads);
threadpool->workers = workers;
__init_stack(2ULL * 1024 * 1024);
int cpumask_iter = 0;
__process_priority(tpp->prio);
__thread_priority(tpp->prio);
for (int j = 0; j < tpp->n_threads; j++) {
workers[j] = (struct ggml_compute_state) {
.thrd = 0,
.ith = j,
.threadpool = threadpool,
.ec = GGML_STATUS_SUCCESS,
};
if (tpp->mask_specified) {
__cpumask_next(tpp->cpumask, workers[j].cpumask, tpp->strict_cpu, &cpumask_iter);
} else {
workers[j].cpumask[j] = true;
}
// Spin threads for all secondary workers
if (j > 0) {
int32_t rc = ggml_thread_create(
&workers[j].thrd,
NULL,
ggml_graph_compute_secondary_thread,
&workers[j]
);
GGML_ASSERT(rc == 0);
}
}
// Set the main-thread's affinity last
__thread_affinity(workers[0].cpumask);
// Ensure all threads entered the compute loop before returning.
while (atomic_load(&threadpool->n_ready) != threadpool->n_threads_max) { ; }
return threadpool;
}
void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
if (!threadpool) return;
struct ggml_compute_state* workers = threadpool->workers;
const int32_t n_threads = threadpool->n_threads_max;
// Don't really need to lock in the polling mode but it doesn't hurt
ggml_mutex_lock(&threadpool->mutex);
threadpool->n_threads_cur = n_threads;
threadpool->stop = true;
threadpool->pause = false;
ggml_cond_broadcast(&threadpool->cond);
ggml_mutex_unlock(&threadpool->mutex);
for (int32_t j = 1; j < n_threads; j++) {
int32_t rc = ggml_thread_join(workers[j].thrd, NULL);
GGML_ASSERT(rc == GGML_EXIT_SUCCESS || rc == GGML_EXIT_ABORTED);
UNUSED(rc);
}
GGML_ALIGNED_FREE(workers);
ggml_mutex_destroy(&threadpool->mutex);
ggml_cond_destroy(&threadpool->cond);
GGML_ALIGNED_FREE(threadpool);
}
void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
GGML_PRINT_DEBUG("Pausing threadpool\n");
threadpool->pause = true;
}
void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) {
ggml_mutex_lock(&threadpool->mutex);
GGML_PRINT_DEBUG("Resuming threadpool\n");
threadpool->pause = false;
ggml_cond_broadcast(&threadpool->cond);
ggml_mutex_unlock(&threadpool->mutex);
}
static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) {
// wait for other threads to finish
const int last_node_n = * node_n;
while (true) {
if (do_yield) {
sched_yield();
__cpu_relax();
}
* node_n = atomic_load(&state->shared->node_n);
* node_n = atomic_load(&state->threadpool->node_n);
if (* node_n != last_node_n) break;
#if defined(__SSE3__)
// Tell the processor we're spinning. It's a processor hint for spinlocks.
@ -19306,10 +19660,10 @@ static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_co
while (true) {
if (do_yield) {
sched_yield();
__cpu_relax();
}
* task_phase = atomic_load(&state->shared->node_task);
* task_phase = atomic_load(&state->threadpool->node_task);
if (* task_phase != last_task_phase) break;
#if defined(__SSE3__)
// Tell the processor we're spinning. It's a processor hint for spinlocks.
@ -19318,13 +19672,13 @@ static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_co
}
}
static thread_ret_t ggml_graph_compute_thread(void * data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
static thread_ret_t ggml_graph_compute_thread(struct ggml_compute_state * state) {
struct ggml_compute_threadpool * threadpool = state->threadpool;
const struct ggml_cgraph * cgraph = state->shared->cgraph;
const struct ggml_cplan * cplan = state->shared->cplan;
const struct ggml_cgraph * cgraph = threadpool->cgraph;
const struct ggml_cplan * cplan = threadpool->cplan;
const int n_threads = state->shared->n_threads;
const int n_threads = threadpool->n_threads_cur;
set_numa_thread_affinity(state->ith);
@ -19333,12 +19687,12 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
while (true) {
if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
state->shared->node_n += 1;
threadpool->node_n += 1;
state->ec = GGML_STATUS_ABORTED;
return 0;
}
if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
if (atomic_fetch_sub(&threadpool->n_active, 1) == 1) {
// all other threads are finished and spinning
// do finalize and init here so we don't have synchronize again
struct ggml_compute_params params = {
@ -19353,20 +19707,20 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
/* FINALIZE */
struct ggml_tensor * node = cgraph->nodes[node_n];
if (GGML_OP_HAS_FINALIZE[node->op]) {
params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
params.nth = ggml_get_n_tasks(node, n_threads, threadpool->n_threads_cur);
ggml_compute_forward(&params, node, state);
}
ggml_graph_compute_perf_stats_node(node, state->shared);
ggml_graph_compute_perf_stats_node(node, threadpool);
}
// distribute new work or execute it direct if 1T
while (++node_n < cgraph->n_nodes) {
GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
struct ggml_tensor * node = cgraph->nodes[node_n];
const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
const int n_tasks = ggml_get_n_tasks(node, n_threads, threadpool->n_threads_cur);
state->shared->perf_node_start_cycles = ggml_perf_cycles();
state->shared->perf_node_start_time_us = ggml_perf_time_us();
threadpool->perf_node_start_cycles = ggml_perf_cycles();
threadpool->perf_node_start_time_us = ggml_perf_time_us();
params.nth = n_tasks;
@ -19387,7 +19741,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
ggml_compute_forward(&params, node, state);
}
ggml_graph_compute_perf_stats_node(node, state->shared);
ggml_graph_compute_perf_stats_node(node, threadpool);
} else {
break;
}
@ -19398,9 +19752,9 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
}
task_phase = GGML_TASK_TYPE_INIT;
atomic_store(&state->shared->n_active, n_threads);
atomic_store(&state->shared->node_n, node_n);
atomic_store(&state->shared->node_task, task_phase);
atomic_store(&threadpool->n_active, n_threads);
atomic_store(&threadpool->node_n, node_n);
atomic_store(&threadpool->node_task, task_phase);
} else {
ggml_graph_compute_thread_sync_node(&node_n, state, false);
ggml_graph_compute_thread_sync_task(&task_phase, state, false);
@ -19411,7 +19765,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
/* INIT & COMPUTE */
struct ggml_tensor * node = cgraph->nodes[node_n];
const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
const int n_tasks = ggml_get_n_tasks(node, n_threads, threadpool->n_threads_cur);
struct ggml_compute_params params = {
/*.type =*/ GGML_TASK_TYPE_INIT,
@ -19427,12 +19781,11 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
}
}
if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
if (atomic_fetch_sub(&threadpool->n_active, 1) == 1) {
task_phase = GGML_TASK_TYPE_COMPUTE;
atomic_store(&state->shared->n_active, n_threads);
atomic_store(&state->shared->node_task, task_phase);
}
else {
atomic_store(&threadpool->n_active, n_threads);
atomic_store(&threadpool->node_task, task_phase);
} else {
// TODO: this sched_yield can have significant impact on the performance - either positive or negative
// depending on the workload and the operating system.
// since it is not clear what is the best approach, it should potentially become user-configurable
@ -19447,10 +19800,10 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
ggml_compute_forward(&params, node, state);
}
if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
if (atomic_fetch_sub(&threadpool->n_active, 1) == 1) {
task_phase = GGML_TASK_TYPE_FINALIZE;
atomic_store(&state->shared->n_active, n_threads);
atomic_store(&state->shared->node_task, task_phase);
atomic_store(&threadpool->n_active, n_threads);
atomic_store(&threadpool->node_task, task_phase);
}
else {
ggml_graph_compute_thread_sync_task(&task_phase, state, false);
@ -19460,9 +19813,91 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
return 0;
}
struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threads) {
if (n_threads <= 0) {
n_threads = GGML_DEFAULT_N_THREADS;
static inline int32_t ggml_graph_compute_check_for_work(struct ggml_compute_state * state) {
int32_t node_n;
struct ggml_compute_threadpool * threadpool = state->threadpool;
do {
if (threadpool->poll) {
node_n = atomic_load(&threadpool->node_n);
if (node_n != -1) {
// No new work. Yield, and keep polling.
sched_yield();
node_n = atomic_load(&threadpool->node_n);
}
} else {
ggml_mutex_lock_shared(&threadpool->mutex);
node_n = atomic_load(&threadpool->node_n);
if (node_n != -1 && !threadpool->stop && !threadpool->pause) {
// No new work. Wait for the signal.
ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
node_n = atomic_load(&threadpool->node_n);
}
ggml_mutex_unlock_shared(&threadpool->mutex);
}
} while (state->ith >= threadpool->n_threads_cur);
return node_n;
}
static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
struct ggml_compute_threadpool * threadpool = state->threadpool;
#ifndef __aarch64__
__init_stack(2ULL * 1024 * 1024);
#endif
__thread_priority(threadpool->prio);
__thread_affinity(state->cpumask);
// Indicate that we're ready to go
atomic_fetch_add(&threadpool->n_ready, 1);
while (true) {
// Check if we need to sleep
while (threadpool->pause) {
GGML_PRINT_DEBUG("thread #%d inside pause loop\n", state->ith);
ggml_mutex_lock_shared(&threadpool->mutex);
if (threadpool->pause) {
ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
}
GGML_PRINT_DEBUG("thread #%d resuming after wait\n", state->ith);
ggml_mutex_unlock_shared(&threadpool->mutex);
}
// This needs to be checked for after the cond_wait
if (threadpool->stop) break;
// Check if there is new work
// node_n == -1 means we have a fresh graph to compute on.
// Only the main thread sets node_n back to -1.
int32_t node_n = ggml_graph_compute_check_for_work(state);
if (node_n == -1) {
atomic_fetch_sub(&threadpool->n_ready, 1);
int64_t ret = (int64_t) ggml_graph_compute_thread(state);
if (ret == GGML_EXIT_ABORTED)
return (thread_ret_t) ret;
if (ret != GGML_EXIT_SUCCESS && ret != GGML_EXIT_ABORTED) {
fprintf(stderr, "ggml_graph_compute_thread exited with an unexpected error: %lld\n", (long long int) ret);
GGML_ASSERT(false);
}
atomic_fetch_add(&threadpool->n_ready, 1);
}
}
return (thread_ret_t) 0;
}
struct ggml_cplan ggml_graph_plan(
const struct ggml_cgraph * cgraph,
int32_t n_threads,
struct ggml_compute_threadpool * threadpool
) {
if (threadpool == NULL) {
//GGML_PRINT("WARNING: Threadpool is not specified. Will create a disposable threadpool\n");
}
size_t work_size = 0;
@ -19634,10 +20069,11 @@ struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threa
}
if (work_size > 0) {
work_size += CACHE_LINE_SIZE*(n_threads - 1);
work_size += CACHE_LINE_SIZE*(n_threads);
}
cplan.n_threads = MIN(max_tasks, n_threads);
cplan.threadpool = threadpool;
cplan.n_threads = n_threads;
cplan.work_size = work_size;
cplan.work_data = NULL;
@ -19654,63 +20090,78 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
}
}
const int n_threads = cplan->n_threads;
struct ggml_compute_state_shared state_shared = {
/*.cgraph =*/ cgraph,
/*.cgraph_plan =*/ cplan,
/*.perf_node_start_cycles =*/ 0,
/*.perf_node_start_time_us =*/ 0,
/*.n_threads =*/ n_threads,
/*.n_active =*/ n_threads,
/*.node_n =*/ -1,
/*.node_task =*/ GGML_TASK_TYPE_FINALIZE,
/*.abort_callback =*/ NULL,
/*.abort_callback_data =*/ NULL,
/*.current_chunk; =*/ 0,
};
struct ggml_compute_state * workers = alloca(sizeof(struct ggml_compute_state)*n_threads);
// create thread pool
if (n_threads > 1) {
for (int j = 1; j < n_threads; ++j) {
workers[j] = (struct ggml_compute_state) {
.thrd = 0,
.ith = j,
.shared = &state_shared,
.ec = GGML_STATUS_SUCCESS,
};
const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
GGML_ASSERT(rc == 0);
UNUSED(rc);
}
}
workers[0].ith = 0;
workers[0].shared = &state_shared;
workers[0].ec = GGML_STATUS_SUCCESS;
const int64_t perf_start_cycles = ggml_perf_cycles();
const int64_t perf_start_time_us = ggml_perf_time_us();
// this is a work thread too
ggml_graph_compute_thread(&workers[0]);
enum ggml_status compute_status = workers[0].ec;
const int n_threads = cplan->n_threads;
struct ggml_compute_threadpool * threadpool = cplan->threadpool;
bool disposable_threadpool = false;
if (threadpool == NULL) {
//GGML_PRINT("NOTE: Threadpool is not specified. Will create a disposable threadpool\n");
struct ggml_threadpool_params tpp = {
.mask_specified = false,
.n_threads = n_threads,
.strict_cpu = false,
.prio = 1,
.poll = false
};
threadpool = ggml_create_threadpool(&tpp);
ggml_resume_threadpool(threadpool);
disposable_threadpool = true;
} else if (n_threads > threadpool->n_threads_max) {
GGML_PRINT("WARNING: Requesting more threads that the threadpool contains. Expect a bad time.\n");
}
// Initialize worker ordering
for (int j = 0; j < n_threads; ++j) {
threadpool->workers[j].ith = j;
}
// Set up work
threadpool->cgraph = cgraph;
threadpool->cplan = cplan;
threadpool->n_active = n_threads;
threadpool->n_threads_cur = n_threads;
atomic_store(&threadpool->node_n, -1);
// Kick threadpool
if (!threadpool->poll && n_threads > 1) {
ggml_mutex_lock(&threadpool->mutex);
ggml_cond_broadcast(&threadpool->cond);
ggml_mutex_unlock(&threadpool->mutex);
}
int compute_status = GGML_STATUS_SUCCESS;
// The main-thread is a work thread too. Start computing...
atomic_fetch_sub(&threadpool->n_ready, 1);
compute_status = (size_t) ggml_graph_compute_thread(&threadpool->workers[0]);
// don't leave affinity set on the main thread
clear_numa_thread_affinity();
// join or kill thread pool
// Wait for all other threads to finish
if (n_threads > 1) {
for (int j = 1; j < n_threads; j++) {
const int rc = ggml_thread_join(workers[j].thrd, NULL);
GGML_ASSERT(rc == 0);
if (workers[j].ec != GGML_STATUS_SUCCESS)
compute_status = workers[j].ec;
atomic_fetch_add(&threadpool->n_ready, 1);
// wait for thread pool
while (atomic_load(&threadpool->n_ready) < threadpool->n_threads_max) {
__cpu_relax();
}
}
for (int j = 0; j < n_threads; ++j) {
if (threadpool->workers[j].ec != GGML_STATUS_SUCCESS) {
compute_status = threadpool->workers[j].ec;
}
}
if (disposable_threadpool) {
ggml_release_threadpool(threadpool);
}
// performance stats (graph)
{
int64_t perf_cycles_cur = ggml_perf_cycles() - perf_start_cycles;
@ -19731,8 +20182,12 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
return compute_status;
}
enum ggml_status ggml_graph_compute_with_ctx(struct ggml_context * ctx, struct ggml_cgraph * cgraph, int n_threads) {
struct ggml_cplan cplan = ggml_graph_plan(cgraph, n_threads);
enum ggml_status ggml_graph_compute_with_ctx(
struct ggml_context * ctx,
struct ggml_cgraph * cgraph,
int32_t n_threads
) {
struct ggml_cplan cplan = ggml_graph_plan(cgraph, n_threads, NULL);
struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size);
@ -20543,7 +20998,7 @@ static enum ggml_opt_result ggml_opt_adam(
float * pf = params.past > 0 ? opt->adam.pf->data : NULL; // past function values
struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads);
struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads, NULL);
struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size);
cplan.work_data = (uint8_t *)ctx->mem_buffer + obj->offs;
@ -20890,7 +21345,7 @@ static enum ggml_opt_result ggml_opt_lbfgs(
opt->iter = iter;
}
struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads);
struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads, NULL);
struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size);
cplan.work_data = (uint8_t *)ctx->mem_buffer + obj->offs;

25
ggml.h
View file

@ -274,6 +274,8 @@
#define GGML_UNREACHABLE() ((void) 0)
#endif
#define GGML_N_CORES_MAX 512
// used to copy the number of elements and stride in bytes of tensors into local variables.
// main purpose is to reduce code duplication and improve readability.
//
@ -609,6 +611,8 @@ extern "C" {
// If it returns true, the computation is aborted
typedef bool (*ggml_abort_callback)(void * data);
struct ggml_compute_threadpool;
// the compute plan that needs to be prepared for ggml_graph_compute()
// since https://github.com/ggerganov/ggml/issues/287
struct ggml_cplan {
@ -616,6 +620,7 @@ extern "C" {
uint8_t * work_data; // work buffer, to be allocated by caller before calling to `ggml_graph_compute()`
int n_threads;
struct ggml_compute_threadpool * threadpool;
// abort ggml_graph_compute when true
ggml_abort_callback abort_callback;
@ -653,6 +658,15 @@ extern "C" {
int64_t perf_time_us;
};
struct ggml_threadpool_params {
bool cpumask[GGML_N_CORES_MAX];
bool mask_specified;
int32_t n_threads;
int32_t prio;
bool poll;
bool strict_cpu;
};
// scratch buffer
struct ggml_scratch {
size_t offs;
@ -2025,9 +2039,18 @@ extern "C" {
GGML_API size_t ggml_graph_overhead(void);
GGML_API size_t ggml_graph_overhead_custom(size_t size, bool grads);
GGML_API struct ggml_compute_threadpool* ggml_create_threadpool (struct ggml_threadpool_params * params);
GGML_API void ggml_release_threadpool (struct ggml_compute_threadpool * threadpool);
GGML_API int32_t ggml_threadpool_get_n_threads(struct ggml_compute_threadpool * threadpool);
GGML_API void ggml_pause_threadpool (struct ggml_compute_threadpool * threadpool);
GGML_API void ggml_resume_threadpool (struct ggml_compute_threadpool * threadpool);
// ggml_graph_plan() has to be called before ggml_graph_compute()
// when plan.work_size > 0, caller must allocate memory for plan.work_data
GGML_API struct ggml_cplan ggml_graph_plan (const struct ggml_cgraph * cgraph, int n_threads /*= GGML_DEFAULT_N_THREADS*/);
GGML_API struct ggml_cplan ggml_graph_plan(
const struct ggml_cgraph * cgraph,
int n_threads,
struct ggml_compute_threadpool * threadpool);
GGML_API enum ggml_status ggml_graph_compute ( struct ggml_cgraph * cgraph, struct ggml_cplan * cplan);
// same as ggml_graph_compute() but the work data is allocated as a part of the context
// note: the drawback of this API is that you must have ensured that the context has enough memory for the work data

View file

@ -2198,6 +2198,9 @@ struct llama_context {
#endif
ggml_backend_t backend_cpu = nullptr;
ggml_compute_threadpool_t threadpool = nullptr;
ggml_compute_threadpool_t threadpool_batch = nullptr;
const llama_model & model;
// key + value cache for the self attention
@ -11348,7 +11351,13 @@ static size_t llama_output_reserve(llama_context & lctx, size_t n_outputs) {
static void llama_graph_compute(
llama_context & lctx,
ggml_cgraph * gf,
int n_threads) {
int n_threads,
ggml_compute_threadpool * threadpool) {
#ifdef GGML_USE_MPI
const int64_t n_layer = lctx.model.hparams.n_layer;
ggml_mpi_graph_compute_pre(lctx.ctx_mpi, gf, n_layer);
#endif
#ifdef GGML_USE_METAL
if (ggml_backend_is_metal(lctx.backend_metal)) {
ggml_backend_metal_set_n_cb(lctx.backend_metal, n_threads);
@ -11360,7 +11369,7 @@ static void llama_graph_compute(
ggml_backend_cpu_set_abort_callback(lctx.backend_cpu, lctx.abort_callback, lctx.abort_callback_data);
}
ggml_backend_sched_graph_compute_async(lctx.sched, gf);
ggml_backend_sched_graph_compute_async(lctx.sched, gf, threadpool);
// fprintf(stderr, "splits: %d\n", ggml_backend_sched_get_n_splits(lctx.sched));
}
@ -11483,7 +11492,35 @@ static int llama_decode_internal(
lctx.n_outputs = n_outputs_new;
}
int n_threads = n_tokens == 1 ? cparams.n_threads : cparams.n_threads_batch;
int n_threads;
ggml_compute_threadpool* threadpool = nullptr; // nullptr -> disposable threadpool
if (n_tokens == 1) {
if (lctx.threadpool_batch) {
ggml_pause_threadpool(lctx.threadpool_batch);
}
if (lctx.threadpool) {
ggml_resume_threadpool(lctx.threadpool);
threadpool = lctx.threadpool;
}
n_threads = cparams.n_threads;
} else {
if (lctx.threadpool && !lctx.threadpool_batch) {
ggml_pause_threadpool(lctx.threadpool);
}
if (lctx.threadpool_batch) {
ggml_resume_threadpool(lctx.threadpool_batch);
threadpool = lctx.threadpool_batch;
n_threads = cparams.n_threads_batch;
} else if (lctx.threadpool) {
ggml_resume_threadpool(lctx.threadpool);
threadpool = lctx.threadpool;
n_threads = cparams.n_threads;
} else {
n_threads = cparams.n_threads_batch;
}
}
GGML_ASSERT(n_threads > 0);
// helpers for smoother batch API transition
@ -11596,7 +11633,7 @@ static int llama_decode_internal(
llama_set_inputs(lctx, u_batch);
llama_graph_compute(lctx, gf, n_threads);
llama_graph_compute(lctx, gf, n_threads, threadpool);
// update the kv ring buffer
{
@ -11920,7 +11957,7 @@ static void llama_kv_cache_defrag_internal(struct llama_context & lctx) {
ggml_cgraph * gf = llama_build_graph_defrag(lctx, ids);
llama_graph_compute(lctx, gf, lctx.cparams.n_threads);
llama_graph_compute(lctx, gf, lctx.cparams.n_threads, lctx.threadpool);
#endif
//const int64_t t_end = ggml_time_us();
@ -11942,7 +11979,7 @@ static void llama_kv_cache_update_internal(struct llama_context & lctx) {
llama_set_k_shift(lctx);
llama_graph_compute(lctx, gf, lctx.cparams.n_threads);
llama_graph_compute(lctx, gf, lctx.cparams.n_threads, lctx.threadpool);
need_reserve = true;
}
@ -11968,7 +12005,7 @@ static void llama_kv_cache_update_internal(struct llama_context & lctx) {
llama_set_s_copy(lctx);
llama_graph_compute(lctx, gf, lctx.cparams.n_threads);
llama_graph_compute(lctx, gf, lctx.cparams.n_threads, lctx.threadpool);
need_reserve = true;
}
@ -15391,7 +15428,7 @@ static int llama_apply_lora_from_file_internal(
return 1;
}
ggml_backend_graph_compute(backend_cpu, gf);
ggml_backend_graph_compute(backend_cpu, gf, nullptr);
ggml_backend_tensor_set(model_t, r->data, 0, ggml_nbytes(r));
@ -15558,6 +15595,31 @@ void llama_numa_init(enum ggml_numa_strategy numa) {
}
}
void llama_attach_threadpool(
struct llama_context * ctx,
ggml_compute_threadpool_t threadpool) {
ctx->threadpool = threadpool;
}
void llama_attach_batch_threadpool(
struct llama_context * ctx,
ggml_compute_threadpool_t threadpool_batch) {
ctx->threadpool_batch = threadpool_batch;
}
void llama_detach_threadpool(struct llama_context * ctx) {
ctx->threadpool = nullptr;
}
void llama_detach_batch_threadpool(struct llama_context * ctx) {
ctx->threadpool = nullptr;
}
void llama_detach_threadpools(struct llama_context * ctx) {
llama_detach_threadpool(ctx);
llama_detach_batch_threadpool(ctx);
}
void llama_backend_free(void) {
ggml_quantize_free();
}

10
llama.h
View file

@ -388,6 +388,16 @@ extern "C" {
//optional:
LLAMA_API void llama_numa_init(enum ggml_numa_strategy numa);
LLAMA_API void llama_attach_threadpool(
struct llama_context * ctx,
ggml_compute_threadpool_t threadpool);
LLAMA_API void llama_attach_batch_threadpool(
struct llama_context * ctx,
ggml_compute_threadpool_t threadpool);
LLAMA_API void llama_detach_threadpool(struct llama_context * ctx);
LLAMA_API void llama_detach_batch_threadpool(struct llama_context * ctx);
LLAMA_API void llama_detach_threadpools(struct llama_context * ctx);
// Call once at the end of the program - currently only used for MPI
LLAMA_API void llama_backend_free(void);

View file

@ -587,7 +587,7 @@ struct test_case {
ggml_build_forward_expand(gf, out);
// warmup run
ggml_backend_graph_compute(backend, gf);
ggml_backend_graph_compute(backend, gf, nullptr);
// duplicate the op
size_t target_size = ggml_backend_is_cpu(backend) ? 1ULL << 33 : 1ULL << 35; // 8 GB CPU, 32 GB GPU
@ -619,7 +619,7 @@ struct test_case {
ggml_backend_synchronize(backend);
int64_t start_time = ggml_time_us();
ggml_backend_graph_compute(backend, gf);
ggml_backend_graph_compute(backend, gf, nullptr);
ggml_backend_synchronize(backend);
int64_t end_time = ggml_time_us();
double time_us = end_time - start_time;

View file

@ -113,7 +113,7 @@ static struct ggml_tensor * get_random_tensor_f32(
}
static void ggml_graph_compute_helper(std::vector<uint8_t> & buf, ggml_cgraph * graph, int n_threads) {
struct ggml_cplan plan = ggml_graph_plan(graph, n_threads);
struct ggml_cplan plan = ggml_graph_plan(graph, n_threads, nullptr);
if (plan.work_size > 0) {
buf.resize(plan.work_size);