separate threads for r/w ops
This commit is contained in:
parent 289637a6a3
commit bb997650be
1 changed file with 35 additions and 25 deletions
@@ -1,6 +1,5 @@
 # Convert a LLaMA model checkpoint to a ggml compatible file
 #
-# Load the model using Torch
 # Iterate over all variables and write them to a binary file.
 #
 # For each variable, write the following:
@@ -26,6 +25,9 @@ from tqdm import tqdm
 import zipfile
 import pickle
 import concurrent.futures
+import io
+import threading
+import queue
 
 from sentencepiece import SentencePieceProcessor
 
@@ -138,11 +140,9 @@ def process_part(p):
             self.shape = shape
             self.dtype = dtype
             self.loadinfo = loadinfo
-            # print(shape, dtype)
 
         def numpy(self):
-            fname_model, base_name, storage_offset, k, shape, dtype = self.loadinfo
-            with zipfile.ZipFile(fname_model, 'r') as myzip:
+            myzip, base_name, storage_offset, k, shape, dtype = self.loadinfo
             with myzip.open(f'{base_name}/data/{k}') as myfile:
                 bytes_size = np.dtype(self.dtype).itemsize
                 myfile.seek(storage_offset * bytes_size, 1)
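The numpy() change above (together with the two hunks that follow) passes the already-open zipfile.ZipFile handle through loadinfo instead of the checkpoint's file name, so the archive is no longer reopened for every tensor. A minimal standalone sketch of that lazy read; read_tensor and its parameters are illustrative placeholders, not code from this commit:

import zipfile
import numpy as np

def read_tensor(archive: zipfile.ZipFile, base_name, key, offset, shape, dtype):
    # Open the zip member holding this tensor's raw storage, skip to its
    # element offset, and fill a preallocated array in place.
    with archive.open(f'{base_name}/data/{key}') as member:
        member.seek(offset * np.dtype(dtype).itemsize, 1)
        out = np.empty(shape, dtype=dtype)
        member.readinto(out.data)
        return out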
@@ -150,12 +150,12 @@ def process_part(p):
                 myfile.readinto(ret.data)
                 return ret
 
-    def my_unpickle(datapkl, fname_model, base_name):
+    def my_unpickle(datapkl, myzip, base_name):
         def my_rebuild_tensor(storage, storage_offset, size, stride, requires_grad, backward_hooks, metadata=None):
             storage_type = storage[1]
             obj_key = storage[2]
             return Tensor(shape=size, dtype=storage_type, loadinfo=(
-                fname_model, base_name, storage_offset,
+                myzip, base_name, storage_offset,
                 obj_key, size, storage_type
             ))
 
@@ -172,15 +172,24 @@ def process_part(p):
 
         return MyUnpickler(datapkl).load()
 
-    with zipfile.ZipFile(fname, 'r') as myzip:
+    myzip = zipfile.ZipFile(fname, 'r')
     base_name = myzip.namelist()[0].split('/', 1)[0]
-    # print(myzip.namelist())
     with myzip.open(f'{base_name}/data.pkl') as myfile:
-        model = my_unpickle(myfile, fname, base_name)
+        model = my_unpickle(myfile, myzip, base_name)
     return model
 
 model = load_model(fname_model)
 
+q = queue.Queue(maxsize=2)
+
+def writer():
+    while True:
+        item = q.get()
+        fout.write(item.getvalue())
+        q.task_done()
+
+threading.Thread(target=writer, daemon=True).start()
+
 for k, v in (t := tqdm(model.items())):
     t.set_description(f"Processing {k} with shape {tuple(v.shape)} and type {np.dtype(v.dtype)}")
     name = k
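The bounded queue and the daemon writer() thread added here let the loop serialize the next tensor while the previous buffer is still being flushed to fout, which is the point of the commit. A self-contained sketch of the same producer/consumer pattern; the file name and payloads are made up for illustration:

import io
import queue
import threading

q = queue.Queue(maxsize=2)            # at most two buffers in flight
fout = open('example.bin', 'wb')      # placeholder output file

def writer():
    while True:
        buf = q.get()                 # blocks until the producer enqueues a buffer
        fout.write(buf.getvalue())
        q.task_done()

threading.Thread(target=writer, daemon=True).start()

for i in range(4):                    # producer: build each record in memory, then hand it off
    buf = io.BytesIO()
    buf.write(i.to_bytes(4, 'little'))
    q.put(buf)                        # blocks if the writer falls more than two buffers behind

q.join()                              # wait until every queued buffer has been written
fout.close()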
@@ -190,8 +199,6 @@ def process_part(p):
     if name[-5:] == "freqs":
         continue
 
-    # print("Processing variable: " + name + " with shape: ", shape, " and type: ", v.dtype)
-
     #data = tf.train.load_variable(dir_model, name).squeeze()
     data = v.numpy().squeeze()
     n_dims = len(data.shape)
@@ -217,23 +224,26 @@ def process_part(p):
         data = data.astype(np.float32)
         ftype_cur = 0
 
+    memout = io.BytesIO()
     # header
     sname = name.encode('utf-8')
-    fout.write(struct.pack("iii", n_dims, len(sname), ftype_cur))
+    memout.write(struct.pack("iii", n_dims, len(sname), ftype_cur))
     for i in range(n_dims):
-        fout.write(struct.pack("i", dshape[n_dims - 1 - i]))
-    fout.write(sname)
+        memout.write(struct.pack("i", dshape[n_dims - 1 - i]))
+    memout.write(sname)
 
     # data
-    data.tofile(fout)
+    memout.write(data.tobytes())
+    # data.tofile(memout)
+    q.put(memout)
+
+q.join()
+
-# I hope this deallocates the memory ..
 model = None
 
 fout.close()
 
 print("Done. Output file: " + fname_out + ", (part ", p, ")")
-print("")
-
 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
     futures = {executor.submit(process_part, p) for p in range(n_parts)}
 
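In the last hunk each tensor record (header, reversed shape, name, raw data) is staged in an io.BytesIO buffer and handed to the writer thread, and q.join() keeps fout from being closed before everything has been flushed; the two-worker ThreadPoolExecutor then runs two model parts through this pipeline at once. A sketch of how one such record is assembled, assuming dshape in the script is the tensor's shape; build_record itself is illustrative, not part of the commit:

import io
import struct
import numpy as np

def build_record(name, data, ftype_cur):
    memout = io.BytesIO()
    sname = name.encode('utf-8')
    n_dims = len(data.shape)
    # header: dimension count, name length, ftype, then the dims in reverse order
    memout.write(struct.pack('iii', n_dims, len(sname), ftype_cur))
    for i in range(n_dims):
        memout.write(struct.pack('i', data.shape[n_dims - 1 - i]))
    memout.write(sname)
    # data: the raw tensor bytes in one piece
    memout.write(data.tobytes())
    return memout

In the commit, the resulting buffer is what gets q.put() onto the queue before the loop moves on to the next variable.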