From bb997650be33b49519a23927d5e354f7defb4943 Mon Sep 17 00:00:00 2001
From: Dmitry Wolf
Date: Wed, 15 Mar 2023 22:38:28 +0300
Subject: [PATCH] separate threads for r/w ops

---
 convert-pth-to-ggml.py | 60 ++++++++++++++++++++++++------------------
 1 file changed, 35 insertions(+), 25 deletions(-)

diff --git a/convert-pth-to-ggml.py b/convert-pth-to-ggml.py
index 75e182cc1..055614958 100644
--- a/convert-pth-to-ggml.py
+++ b/convert-pth-to-ggml.py
@@ -1,6 +1,5 @@
 # Convert a LLaMA model checkpoint to a ggml compatible file
 #
-# Load the model using Torch
 # Iterate over all variables and write them to a binary file.
 #
 # For each variable, write the following:
@@ -26,6 +25,9 @@ from tqdm import tqdm
 import zipfile
 import pickle
 import concurrent.futures
+import io
+import threading
+import queue
 
 from sentencepiece import SentencePieceProcessor
 
@@ -138,24 +140,22 @@ def process_part(p):
                 self.shape = shape
                 self.dtype = dtype
                 self.loadinfo = loadinfo
-                # print(shape, dtype)
 
             def numpy(self):
-                fname_model, base_name, storage_offset, k, shape, dtype = self.loadinfo
-                with zipfile.ZipFile(fname_model, 'r') as myzip:
-                    with myzip.open(f'{base_name}/data/{k}') as myfile:
-                        bytes_size = np.dtype(self.dtype).itemsize
-                        myfile.seek(storage_offset * bytes_size, 1)
-                        ret = np.empty(shape, dtype=dtype)
-                        myfile.readinto(ret.data)
-                        return ret
+                myzip, base_name, storage_offset, k, shape, dtype = self.loadinfo
+                with myzip.open(f'{base_name}/data/{k}') as myfile:
+                    bytes_size = np.dtype(self.dtype).itemsize
+                    myfile.seek(storage_offset * bytes_size, 1)
+                    ret = np.empty(shape, dtype=dtype)
+                    myfile.readinto(ret.data)
+                    return ret
 
-        def my_unpickle(datapkl, fname_model, base_name):
+        def my_unpickle(datapkl, myzip, base_name):
            def my_rebuild_tensor(storage, storage_offset, size, stride, requires_grad, backward_hooks, metadata=None):
                storage_type = storage[1]
                obj_key = storage[2]
                return Tensor(shape=size, dtype=storage_type, loadinfo=(
-                    fname_model, base_name, storage_offset,
+                    myzip, base_name, storage_offset,
                    obj_key, size, storage_type
                ))
 
@@ -172,15 +172,24 @@ def process_part(p):
 
            return MyUnpickler(datapkl).load()
 
-        with zipfile.ZipFile(fname, 'r') as myzip:
-            base_name = myzip.namelist()[0].split('/', 1)[0]
-            # print(myzip.namelist())
-            with myzip.open(f'{base_name}/data.pkl') as myfile:
-                model = my_unpickle(myfile, fname, base_name)
+        myzip = zipfile.ZipFile(fname, 'r')
+        base_name = myzip.namelist()[0].split('/', 1)[0]
+        with myzip.open(f'{base_name}/data.pkl') as myfile:
+            model = my_unpickle(myfile, myzip, base_name)
        return model
 
    model = load_model(fname_model)
 
+    q = queue.Queue(maxsize=2)
+
+    def writer():
+        while True:
+            item = q.get()
+            fout.write(item.getvalue())
+            q.task_done()
+
+    threading.Thread(target=writer, daemon=True).start()
+
    for k, v in (t := tqdm(model.items())):
        t.set_description(f"Processing {k} with shape {tuple(v.shape)} and type {np.dtype(v.dtype)}")
        name = k
@@ -190,8 +199,6 @@ def process_part(p):
        if name[-5:] == "freqs":
            continue
 
-        # print("Processing variable: " + name + " with shape: ", shape, " and type: ", v.dtype)
-        #data = tf.train.load_variable(dir_model, name).squeeze()
        data = v.numpy().squeeze()
        n_dims = len(data.shape)
 
@@ -217,23 +224,26 @@ def process_part(p):
            data = data.astype(np.float32)
            ftype_cur = 0
 
+        memout = io.BytesIO()
        # header
        sname = name.encode('utf-8')
-        fout.write(struct.pack("iii", n_dims, len(sname), ftype_cur))
+        memout.write(struct.pack("iii", n_dims, len(sname), ftype_cur))
        for i in range(n_dims):
-            fout.write(struct.pack("i", dshape[n_dims - 1 - i]))
-        fout.write(sname)
+            memout.write(struct.pack("i", dshape[n_dims - 1 - i]))
+        memout.write(sname)
 
        # data
-        data.tofile(fout)
+        memout.write(data.tobytes())
+        # data.tofile(memout)
+        q.put(memout)
+
+    q.join()
 
-    # I hope this deallocates the memory ..
    model = None
 
    fout.close()
 
    print("Done. Output file: " + fname_out + ", (part ", p, ")")
-    print("")
 
 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(process_part, p) for p in range(n_parts)}
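
The core of the patch is a single-producer/single-consumer pipeline: the main thread serializes each tensor into an in-memory buffer while a daemon thread drains those buffers to disk, and the bounded queue (maxsize=2) keeps read-ahead from outrunning the writer. Below is a minimal standalone sketch of that pattern; the tensor dict, output path, and ftype value are placeholders, not the converter's real inputs.

```python
import io
import queue
import struct
import threading

import numpy as np

q = queue.Queue(maxsize=2)  # bounded: at most two buffers in flight

def writer(fout):
    # Consumer: block until a BytesIO buffer arrives, then flush it to disk.
    while True:
        buf = q.get()
        fout.write(buf.getvalue())
        q.task_done()

tensors = {"tok_embeddings.weight": np.zeros((4, 4), dtype=np.float32)}  # placeholder data

with open("ggml-model.bin", "wb") as fout:  # placeholder output path
    threading.Thread(target=writer, args=(fout,), daemon=True).start()
    for name, data in tensors.items():
        memout = io.BytesIO()  # producer: serialize into memory, not straight to fout
        sname = name.encode("utf-8")
        memout.write(struct.pack("iii", data.ndim, len(sname), 0))  # 0 = f32 here
        for i in range(data.ndim):
            memout.write(struct.pack("i", data.shape[data.ndim - 1 - i]))
        memout.write(sname)
        memout.write(data.tobytes())
        q.put(memout)  # blocks if the writer falls two buffers behind
    q.join()  # wait for every queued buffer to hit the file before closing
```

Because q.put() blocks once maxsize is reached, memory stays bounded even when tensor conversion runs much faster than disk I/O.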
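The other change worth noting: numpy() used to reopen the checkpoint zip for every tensor, and the patch instead opens it once in load_model and threads the handle through loadinfo (which is also why the ZipFile no longer lives in a `with` block: the handle must outlive load_model for the lazy reads). A sketch of that direct-from-zip read, assuming torch.save's zip layout ({base}/data/{storage_key}) with uncompressed storages, and a placeholder checkpoint path, key, and shape:

```python
import zipfile
import numpy as np

def open_checkpoint(path):
    # Open once and reuse: reopening per tensor re-parses the zip's
    # central directory on every read.
    myzip = zipfile.ZipFile(path, "r")
    base_name = myzip.namelist()[0].split("/", 1)[0]
    return myzip, base_name

def read_tensor(myzip, base_name, key, shape, dtype, storage_offset=0):
    with myzip.open(f"{base_name}/data/{key}") as f:
        # Skip storage_offset elements (cheap for stored, uncompressed
        # members), then fill a preallocated array in place.
        f.seek(storage_offset * np.dtype(dtype).itemsize, 1)
        ret = np.empty(shape, dtype=dtype)
        f.readinto(ret.data)
        return ret

myzip, base = open_checkpoint("consolidated.00.pth")           # placeholder path
w = read_tensor(myzip, base, "0", (4096, 4096), np.float16)    # placeholder key/shape
```

Since load_model runs inside process_part, each model part gets its own ZipFile, so the two executor workers never contend on a shared handle.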