kv/ti data are still wrong

Christian Zhou-Zheng 2024-06-09 00:34:36 -04:00
parent 03cc9bcbe8
commit 97dd416903
4 changed files with 229 additions and 310 deletions

View file

@@ -59,7 +59,7 @@ class Model:
     tensor_map: gguf.TensorNameMap
     tensor_names: set[str] | None
     fname_out: Path
-    gguf_writer: gguf.GGUFWriterSplit
+    gguf_writer: gguf.GGUFWriter

     # subclasses should define this!
     model_arch: gguf.MODEL_ARCH
@@ -95,8 +95,8 @@ class Model:
         ftype_lw: str = ftype_up.lower()
         # allow templating the file name with the output ftype, useful with the "auto" ftype
         self.fname_out = fname_out.parent / fname_out.name.format(ftype_lw, outtype=ftype_lw, ftype=ftype_lw, OUTTYPE=ftype_up, FTYPE=ftype_up)
-        self.gguf_writer = gguf.GGUFWriterSplit(self.fname_out, gguf.MODEL_ARCH_NAMES[self.model_arch], split_arguments,
+        self.gguf_writer = gguf.GGUFWriter(None, gguf.MODEL_ARCH_NAMES[self.model_arch], split_arguments,
                                            endianess=self.endianess, use_temp_file=self.use_temp_file)

     @classmethod
     def __init_subclass__(cls):
@@ -326,16 +326,14 @@ class Model:
     def write(self):
         self.write_tensors()
-        self.gguf_writer.init_shards()
         self.gguf_writer.write_header_to_file(self.fname_out)
         self.gguf_writer.write_kv_data_to_file()
         self.gguf_writer.write_tensors_to_file(progress=True)
         self.gguf_writer.close()

     def write_vocab(self):
-        if self.gguf_writer.split_arguments.split:
+        if self.gguf_writer.split_arguments.split_style != gguf.SplitStyle.NONE:
             raise ValueError('Splitting the vocabulary is not supported')
-        self.gguf_writer.init_shards()
         self.gguf_writer.write_header_to_file(self.fname_out)
         self.gguf_writer.write_kv_data_to_file()
         self.gguf_writer.close()
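Note (not part of this commit): a minimal sketch of how a conversion script could feed the reworked writer. The CLI flag names below are assumptions; SplitArguments only requires that the parsed Namespace carries the split_max_tensors, split_max_size, dry_run and no_tensor_first_split attributes it reads above.

# Hypothetical usage sketch, assuming the flag names map 1:1 to the Namespace
# attributes consumed by SplitArguments.
import argparse
import gguf

parser = argparse.ArgumentParser()
parser.add_argument("--split-max-tensors", type=int)
parser.add_argument("--split-max-size", type=str)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--no-tensor-first-split", action="store_true")
args = parser.parse_args(["--split-max-tensors", "128"])

split_arguments = gguf.SplitArguments(args)
writer = gguf.GGUFWriter(None, gguf.MODEL_ARCH_NAMES[gguf.MODEL_ARCH.LLAMA],
                         split_arguments, endianess=gguf.GGUFEndian.LITTLE)
# ... add_key_value()/add_tensor() calls go here, then the same sequence as
# Model.write() above: write_header_to_file(path), write_kv_data_to_file(),
# write_tensors_to_file(progress=True), close().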

View file

@@ -2,7 +2,6 @@ from .constants import *
 from .lazy import *
 from .gguf_reader import *
 from .gguf_writer import *
-from .gguf_writer_split import *
 from .quants import *
 from .tensor_mapping import *
 from .vocab import *

View file

@ -5,10 +5,13 @@ import os
import shutil import shutil
import struct import struct
import tempfile import tempfile
from argparse import Namespace
from collections import deque
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, auto from enum import Enum, auto
from pathlib import Path
from io import BufferedWriter from io import BufferedWriter
from typing import IO, Any, Sequence, Mapping from typing import IO, Any, Sequence, Mapping, TypeAlias
from string import ascii_letters, digits from string import ascii_letters, digits
import numpy as np import numpy as np
@@ -27,10 +30,19 @@ from .constants import (
 )
 from .quants import quant_shape_from_byte_shape
+from .constants import Keys

 logger = logging.getLogger(__name__)

+SHARD_NAME_FORMAT = "{:s}-{:05d}-of-{:05d}.gguf"
+NUM_SHARD_KV_DATA = 6
+METADATA_ONLY_INDICATOR = -1
+
+KVTempData: TypeAlias = dict[str, tuple[Any, GGUFValueType | None]]  # {key: (value, type)}
+TensorTempData: TypeAlias = tuple[str, np.ndarray[Any, Any], GGMLQuantizationType | None]  # (tensor name, tensor data, tensor dtype)
+

 @dataclass
 class TensorInfo:
     shape: Sequence[int]
@@ -45,6 +57,25 @@ class GGUFValue:
     type: GGUFValueType


+@dataclass
+class Shard:
+    path: Path
+    tensor_count: int
+    size: int
+    tensors: deque[TensorTempData]
+
+
+class SplitArguments:
+    def __init__(self, args: Namespace) -> None:
+        self.split_max_tensors = args.split_max_tensors if args.split_max_tensors else 0
+        self.split_max_size = GGUFWriter.split_str_to_n_bytes(args.split_max_size) if args.split_max_size else 0
+        self.split_style = SplitStyle.TENSORS if self.split_max_tensors \
+            else SplitStyle.SIZE if self.split_max_size \
+            else SplitStyle.NONE
+        self.dry_run = args.dry_run
+        self.small_first_shard = args.no_tensor_first_split
+
+
 class WriterState(Enum):
     NO_FILE = auto()
     EMPTY = auto()
@@ -54,11 +85,17 @@ class WriterState(Enum):
     WEIGHTS = auto()


+class SplitStyle(Enum):
+    NONE = auto()
+    TENSORS = auto()
+    SIZE = auto()
+
+
 class GGUFWriter:
-    fout: BufferedWriter | None
+    fout: list[BufferedWriter | None]
     path: os.PathLike[str] | str | None
     temp_file: tempfile.SpooledTemporaryFile[bytes] | None
-    tensors: dict[str, TensorInfo]
+    tensors: list[dict[str, TensorInfo]]
     kv_data: dict[str, GGUFValue]
     state: WriterState

     _simple_value_packing = {
@@ -76,25 +113,55 @@ class GGUFWriter:
     }

     def __init__(
-        self, path: os.PathLike[str] | str | None, arch: str, use_temp_file: bool = False,
-        endianess: GGUFEndian = GGUFEndian.LITTLE, add_architecture: bool = True
+        self, path: os.PathLike[str] | str | None, arch: str, split_arguments: SplitArguments,
+        use_temp_file: bool = False, endianess: GGUFEndian = GGUFEndian.LITTLE
     ):
-        self.fout = None
+        self.fout = []
         self.path = path
         self.arch = arch
         self.endianess = endianess
         self.data_alignment = GGUF_DEFAULT_ALIGNMENT
+        self.split_arguments = split_arguments
         self.use_temp_file = use_temp_file
         self.temp_file = None
-        self.tensors = dict()
+        self.tensors = []
         self.kv_data = dict()
         logger.info("gguf: This GGUF file is for {0} Endian only".format(
             "Big" if self.endianess == GGUFEndian.BIG else "Little",
         ))
         self.state = WriterState.NO_FILE

-        if add_architecture:
-            self.add_architecture()
+        if self.split_arguments.small_first_shard:
+            self.tensors.append(dict())
+
+        self.add_architecture()
+
+    def verify_arguments(self) -> None:
+        total_tensors = sum(len(ti) for ti in self.tensors)
+        total_size = sum(sum(GGUFWriter.get_tensor_size(ti) for ti in t.values()) for t in self.tensors)
+
+        if self.split_arguments.split_max_tensors and total_tensors < self.split_arguments.split_max_tensors:
+            logger.warning("Model has fewer tensors than the split threshold, not splitting")
+            self.split_style = SplitStyle.NONE
+
+        if self.split_arguments.split_max_size and total_size < self.split_arguments.split_max_size:
+            logger.warning("Model has smaller size than the split threshold, not splitting")
+            self.split_style = SplitStyle.NONE
+
+        # no shards are created when writing vocab so make one
+        if not self.tensors or len(self.tensors) == 0:
+            self.tensors.append(dict())
+
+    def format_shard_names(self) -> list[os.PathLike[str]]:
+        pathobj = Path(self.path)
+        if self.split_arguments.split_style == SplitStyle.NONE:
+            return [pathobj]
+        shard_names = []
+        for i in range(len(self.tensors)):
+            shard_names.append(pathobj.with_name(SHARD_NAME_FORMAT.format(pathobj.stem, i + 1, len(self.tensors))))
+        return shard_names

     def open_output_file(self, path: os.PathLike[str] | str | None = None) -> None:
         if self.state is WriterState.EMPTY and self.fout is not None and (path is None or path == self.path):
@@ -107,24 +174,52 @@ class GGUFWriter:
             self.path = path

         if self.path is not None:
-            if self.fout is not None:
-                self.fout.close()
-            self.fout = open(self.path, "wb")
+            self.fout = []
+            for fout in self.format_shard_names():
+                self.fout.append(open(fout, "wb"))
             self.state = WriterState.EMPTY

+    def print_plan(self) -> None:
+        logger.info("Writing the following files:")
+        for i in range(len(self.fout)):
+            logger.info(f" {self.fout[i].name}: n_tensors = {len(self.tensors[i])}, total_size = {GGUFWriter.format_n_bytes_to_str(GGUFWriter.get_tensors_total_size(self.tensors[i].values()))}")
+
+        if self.split_arguments.dry_run:
+            logger.info("Dry run, not writing files")
+            exit()
+
     def write_header_to_file(self, path: os.PathLike[str] | str | None = None) -> None:
+        self.verify_arguments()
         self.open_output_file(path)
+        self.print_plan()

         if self.state is not WriterState.EMPTY:
             raise ValueError(f'Expected output file to be empty, got {self.state}')

-        self._write_packed("<I", GGUF_MAGIC, skip_pack_prefix = True)
-        self._write_packed("I", GGUF_VERSION)
-        self._write_packed("Q", len(self.tensors))
-        self._write_packed("Q", len(self.kv_data))
-        self.flush()
+        assert len(self.fout) == len(self.tensors)
+
+        for i in range(len(self.fout)):
+            fout = self.fout[i]
+            self._write_packed(fout, "<I", GGUF_MAGIC, skip_pack_prefix = True)
+            self._write_packed(fout, "I", GGUF_VERSION)
+            self._write_packed(fout, "Q", len(self.tensors[i]))
+            kv_data_len = len(self.kv_data) if i == 0 else 0
+            if self.split_arguments.split_style != SplitStyle.NONE or self.split_arguments.small_first_shard:
+                kv_data_len += NUM_SHARD_KV_DATA
+            self._write_packed(fout, "Q", kv_data_len)
+            self.fout[i].flush()
         self.state = WriterState.HEADER

+    def add_shard_kv_data(self, kv_data: bytearray, shard_no: int) -> bytearray:
+        total_tensors = sum(len(t) for t in self.tensors)
+        kv_data += self._pack_val(Keys.Split.LLM_KV_SPLIT_NO, GGUFValueType.STRING, add_vtype=False)
+        kv_data += self._pack_val(shard_no, GGUFValueType.UINT16, add_vtype=True)
+        kv_data += self._pack_val(Keys.Split.LLM_KV_SPLIT_COUNT, GGUFValueType.STRING, add_vtype=False)
+        kv_data += self._pack_val(len(self.fout), GGUFValueType.UINT16, add_vtype=True)
+        kv_data += self._pack_val(Keys.Split.LLM_KV_SPLIT_TENSORS_COUNT, GGUFValueType.STRING, add_vtype=False)
+        kv_data += self._pack_val(total_tensors, GGUFValueType.INT32, add_vtype=True)
+        return kv_data
+
     def write_kv_data_to_file(self) -> None:
         if self.state is not WriterState.HEADER:
             raise ValueError(f'Expected output file to contain the header, got {self.state}')
@@ -136,8 +231,16 @@ class GGUFWriter:
             kv_data += self._pack_val(key, GGUFValueType.STRING, add_vtype=False)
             kv_data += self._pack_val(val.value, val.type, add_vtype=True)

-        self.fout.write(kv_data)
-        self.flush()
+        if len(self.fout) > 1:
+            kv_data = self.add_shard_kv_data(kv_data, 0)
+
+        # only the first shard needs kv data
+        self.fout[0].write(kv_data)
+        self.fout[0].flush()
+
+        for i in range(1, len(self.fout)):
+            self.fout[i].write(self.add_shard_kv_data(bytearray(), i))
+            self.fout[i].flush()
+
         self.state = WriterState.KV_DATA

     def write_ti_data_to_file(self) -> None:
@@ -145,21 +248,23 @@ class GGUFWriter:
             raise ValueError(f'Expected output file to contain KV data, got {self.state}')
         assert self.fout is not None

-        ti_data = bytearray()
-        offset_tensor = 0
-
-        for name, ti in self.tensors.items():
-            ti_data += self._pack_val(name, GGUFValueType.STRING, add_vtype=False)
-            n_dims = len(ti.shape)
-            ti_data += self._pack("I", n_dims)
-            for i in range(n_dims):
-                ti_data += self._pack("Q", ti.shape[n_dims - 1 - i])
-            ti_data += self._pack("I", ti.dtype)
-            ti_data += self._pack("Q", offset_tensor)
-            offset_tensor += GGUFWriter.ggml_pad(ti.nbytes, self.data_alignment)
-
-        self.fout.write(ti_data)
-        self.flush()
+        for i in range(len(self.fout)):
+            assert self.fout[i] is not None
+            ti_data = bytearray()
+            offset_tensor = 0
+
+            for name, ti in self.tensors[i].items():
+                ti_data += self._pack_val(name, GGUFValueType.STRING, add_vtype=False)
+                n_dims = len(ti.shape)
+                ti_data += self._pack("I", n_dims)
+                for i in range(n_dims):
+                    ti_data += self._pack("Q", ti.shape[n_dims - 1 - i])
+                ti_data += self._pack("I", ti.dtype)
+                ti_data += self._pack("Q", offset_tensor)
+                offset_tensor += GGUFWriter.ggml_pad(ti.nbytes, self.data_alignment)
+
+            self.fout[i].write(ti_data)
+            self.fout[i].flush()
         self.state = WriterState.TI_DATA

     def add_key_value(self, key: str, val: Any, vtype: GGUFValueType) -> None:
@@ -248,7 +353,18 @@ class GGUFWriter:
         if tensor_dtype == np.uint8:
             tensor_shape = quant_shape_from_byte_shape(tensor_shape, raw_dtype)

-        self.tensors[name] = TensorInfo(shape=tensor_shape, dtype=dtype, nbytes=tensor_nbytes)
+        # create splits as necessary, such as to start it off
+        if (len(self.tensors) == self.split_arguments.small_first_shard \
+            # or split when over tensor limit
+            or (self.split_arguments.split_style == SplitStyle.TENSORS \
+                and len(self.tensors[-1]) >= self.split_arguments.split_max_tensors) \
+            # or split when over size limit
+            or (self.split_arguments.split_style == SplitStyle.SIZE \
+                and GGUFWriter.get_tensors_total_size(self.tensors[-1].values()) + tensor_nbytes > self.split_arguments.split_max_size)):
+            self.tensors.append(dict())
+
+        self.tensors[-1][name] = TensorInfo(shape=tensor_shape, dtype=dtype, nbytes=tensor_nbytes)

     def add_tensor(
         self, name: str, tensor: np.ndarray[Any, Any], raw_shape: Sequence[int] | None = None,
@@ -265,7 +381,7 @@ class GGUFWriter:
         self.add_tensor_info(name, shape, tensor.dtype, tensor.nbytes, raw_dtype=raw_dtype)

         if self.temp_file is None:
-            self.tensors[name].tensor = tensor
+            self.tensors[-1][name].tensor = tensor
             return

         tensor.tofile(self.temp_file)
@@ -283,9 +399,12 @@ class GGUFWriter:
         if self.endianess == GGUFEndian.BIG:
             tensor.byteswap(inplace=True)

-        self.write_padding(self.fout, self.fout.tell())
-        tensor.tofile(self.fout)
-        self.write_padding(self.fout, tensor.nbytes)
+        for fout in self.fout:
+            assert fout is not None
+            self.write_padding(fout, fout.tell())
+            tensor.tofile(fout)
+            self.write_padding(fout, tensor.nbytes)

         self.state = WriterState.WEIGHTS
@@ -294,27 +413,31 @@ class GGUFWriter:
         assert self.fout is not None

-        self.write_padding(self.fout, self.fout.tell())
+        for fout in self.fout:
+            assert fout is not None
+            self.write_padding(fout, fout.tell())

         if self.temp_file is None:
-            bar = None
-
-            if progress:
-                from tqdm import tqdm
-
-                total_bytes = sum(t.nbytes for t in self.tensors.values())
-
-                bar = tqdm(desc="Writing", total=total_bytes, unit="byte", unit_scale=True)
-
-            # relying on the fact that Python dicts preserve insertion order (since 3.7)
-            for ti in self.tensors.values():
-                assert ti.tensor is not None  # can only iterate once over the tensors
-                assert ti.tensor.nbytes == ti.nbytes
-                ti.tensor.tofile(self.fout)
-                if bar is not None:
-                    bar.update(ti.nbytes)
-                self.write_padding(self.fout, ti.nbytes)
-                ti.tensor = None
+            for i in range(len(self.fout)):
+                assert self.fout[i] is not None
+                bar = None
+
+                if progress:
+                    from tqdm import tqdm
+
+                    total_bytes = GGUFWriter.get_tensors_total_size(self.tensors[i].values())
+
+                    bar = tqdm(desc="Writing", total=total_bytes, unit="byte", unit_scale=True)
+
+                # relying on the fact that Python dicts preserve insertion order (since 3.7)
+                for ti in self.tensors[i].values():
+                    assert ti.tensor is not None  # can only iterate once over the tensors
+                    assert ti.tensor.nbytes == ti.nbytes
+                    ti.tensor.tofile(self.fout[i])
+                    if bar is not None:
+                        bar.update(ti.nbytes)
+                    self.write_padding(self.fout[i], ti.nbytes)
+                    ti.tensor = None
         else:
             self.temp_file.seek(0)
@@ -326,12 +449,16 @@ class GGUFWriter:
     def flush(self) -> None:
         assert self.fout is not None
-        self.fout.flush()
+        for fout in self.fout:
+            assert fout is not None
+            fout.flush()

     def close(self) -> None:
         if self.fout is not None:
-            self.fout.close()
-            self.fout = None
+            for fout in self.fout:
+                if fout is not None:
+                    fout.close()
+            self.fout = []

     def add_architecture(self) -> None:
         self.add_string(Keys.General.ARCHITECTURE, self.arch)
@@ -609,6 +736,46 @@ class GGUFWriter:
         return kv_data

-    def _write_packed(self, fmt: str, value: Any, skip_pack_prefix: bool = False) -> None:
-        assert self.fout is not None
-        self.fout.write(self._pack(fmt, value, skip_pack_prefix))
+    def _write_packed(self, fout: BufferedWriter, fmt: str, value: Any, skip_pack_prefix: bool = False) -> None:
+        assert fout is not None
+        fout.write(self._pack(fmt, value, skip_pack_prefix))
+
+    @staticmethod
+    def get_tensor_size(tensor) -> int:
+        try:
+            return tensor.data_type.elements_to_bytes(np.prod(tensor.shape))
+        except AttributeError:  # numpy ndarray[Any, Any]
+            return tensor.nbytes
+
+    @staticmethod
+    def get_tensors_total_size(tensors) -> int:
+        return sum(GGUFWriter.get_tensor_size(ti) for ti in tensors)
+
+    @staticmethod
+    def split_str_to_n_bytes(split_str: str) -> int:
+        if split_str.endswith("K"):
+            n = int(split_str[:-1]) * 1000
+        elif split_str.endswith("M"):
+            n = int(split_str[:-1]) * 1000 * 1000
+        elif split_str.endswith("G"):
+            n = int(split_str[:-1]) * 1000 * 1000 * 1000
+        elif split_str.isnumeric():
+            n = int(split_str)
+        else:
+            raise ValueError(f"Invalid split size: {split_str}, must be a number, optionally followed by K, M, or G")
+
+        if n <= 0:
+            raise ValueError(f"Invalid split size: {split_str}, must be positive")
+
+        return n
+
+    @staticmethod
+    def format_n_bytes_to_str(num: int) -> str:
+        if num == METADATA_ONLY_INDICATOR:
+            return "negligible - metadata only"
+        fnum = float(num)
+        for unit in ("", "K", "M", "G"):
+            if abs(fnum) < 1000.0:
+                return f"{fnum:3.1f}{unit}"
+            fnum /= 1000.0
+        return f"{fnum:.1f}T - over 1TB, --split recommended"

View file

@@ -1,245 +0,0 @@
-from __future__ import annotations
-
-import os
-import logging
-from enum import IntEnum
-from typing import TYPE_CHECKING, Any, Sequence
-from argparse import Namespace
-from collections import deque
-from dataclasses import dataclass
-from pathlib import Path
-
-import numpy as np
-
-if TYPE_CHECKING:
-    from typing_extensions import TypeAlias
-
-from .constants import (
-    GGMLQuantizationType,
-    GGUFEndian,
-    GGUFValueType
-)
-from .gguf_writer import GGUFWriter, WriterState
-from .constants import Keys
-
-logger = logging.getLogger(__name__)
-
-SHARD_NAME_FORMAT = "{:s}-{:05d}-of-{:05d}.gguf"
-METADATA_ONLY_INDICATOR = -1
-
-KVTempData: TypeAlias = dict[str, tuple[Any, GGUFValueType | None]]  # {key: (value, type)}
-TensorTempData: TypeAlias = tuple[str, np.ndarray[Any, Any], GGMLQuantizationType | None]  # (tensor name, tensor data, tensor dtype)
-
-
-@dataclass
-class Shard:
-    path: Path
-    tensor_count: int
-    size: int
-    tensors: deque[TensorTempData]
-
-
-class SplitStyle(IntEnum):
-    NONE = 0
-    TENSORS = 1
-    SIZE = 2
-
-
-class SplitArguments:
-    def __init__(self, args: Namespace) -> None:
-        self.split_max_tensors = args.split_max_tensors if args.split_max_tensors else 0
-        self.split_max_size = GGUFWriterSplit.split_str_to_n_bytes(args.split_max_size) if args.split_max_size else 0
-        self.split_style = SplitStyle.TENSORS if self.split_max_tensors \
-            else SplitStyle.SIZE if self.split_max_size \
-            else SplitStyle.NONE
-        self.dry_run = args.dry_run
-        self.small_first_shard = args.no_tensor_first_split
-
-
-class GGUFWriterSplit(GGUFWriter):
-    kv_data: KVTempData
-    split_arguments: SplitArguments
-    shards: list[Shard]
-    shard_writers: list[tuple[GGUFWriter, os.PathLike[str]]]
-
-    def __init__(self, path: os.PathLike[str] | str, arch: str, split_arguments: SplitArguments,
-                 use_temp_file: bool = True, endianess: GGUFEndian = GGUFEndian.LITTLE
-                 ) -> None:
-        # we intentionally don't call superclass constructor
-        self.arch = arch
-        self.path = Path(path)
-        self.endianess = endianess
-        self.kv_data = {}
-        self.shards = []
-        self.shard_writers = []
-        self.total_tensors = 0
-        self.use_temp_file = use_temp_file
-        self.split_arguments = split_arguments
-        self.recent_key = None
-        self.state = WriterState.EMPTY
-
-        if self.split_arguments.small_first_shard:
-            self.shards.append(Shard(Path(), 0, METADATA_ONLY_INDICATOR, deque()))
-
-    def init_shards(self) -> None:
-        self.total_tensors = sum(shard.tensor_count for shard in self.shards)
-        total_size = sum(shard.size for shard in self.shards)
-
-        # check if we need to split
-        if self.split_arguments.split_max_tensors and self.total_tensors < self.split_arguments.split_max_tensors:
-            logger.warning("Model has fewer tensors than the split threshold, not splitting")
-            self.split_style = SplitStyle.NONE
-
-        if self.split_arguments.split_max_size and total_size < self.split_arguments.split_max_size:
-            logger.warning("Model has smaller size than the split threshold, not splitting")
-            self.split_style = SplitStyle.NONE
-
-        # no shards are created when writing vocab so make one
-        if not self.shards:
-            self.shards.append(Shard(Path(), 0, METADATA_ONLY_INDICATOR, deque()))
-
-        # format shard names
-        if len(self.shards) == 1:
-            self.shards[0].path = self.path
-        else:
-            for i in range(len(self.shards)):
-                self.shards[i].path = self.path.with_name(SHARD_NAME_FORMAT.format(self.path.stem, i + 1, len(self.shards)))
-
-        # print shard info
-        logger.info("Writing the following files:")
-        for shard in self.shards:
-            logger.info(f" {shard.path}: n_tensors = {shard.tensor_count}, total_size = {GGUFWriterSplit.format_n_bytes_to_str(shard.size)}")
-
-        if self.split_arguments.dry_run:
-            logger.info("Dry run, not writing files")
-            exit()
-
-        for i, shard in enumerate(self.shards):
-            # add_architecture is used for consistency - examples/gguf_split doesn't add arch to all shards
-            writer = GGUFWriter(None, self.arch, use_temp_file=self.use_temp_file,
-                                endianess=self.endianess, add_architecture=(i == 0))
-
-            # only the first shard needs all the KV data
-            if i == 0:
-                for key, (value, etype) in self.kv_data.items():
-                    writer.add_key_value(key, value, etype)
-
-            # add split metadata unless it's one file - small first shard splits even with SplitStyle.NONE
-            if self.split_arguments.split_style != SplitStyle.NONE or self.split_arguments.small_first_shard:
-                writer.add_uint16(Keys.Split.LLM_KV_SPLIT_NO, i)
-                writer.add_uint16(Keys.Split.LLM_KV_SPLIT_COUNT, len(self.shards))
-                writer.add_int32(Keys.Split.LLM_KV_SPLIT_TENSORS_COUNT, self.total_tensors)
-
-            # add tensors, deque popleft() ensures references to eager tensors are not kept
-            while True:
-                try:
-                    (name, tensor, dtype) = shard.tensors.popleft()
-                    writer.add_tensor(name, tensor, raw_dtype=dtype)
-                except IndexError:
-                    break
-
-            self.shard_writers.append((writer, shard.path))
-
-    def write_header_to_file(self, path: os.PathLike[str] | str | None = None) -> None:
-        if self.state is not WriterState.EMPTY:
-            raise ValueError(f'Expected GGUFWriterSplit state to be EMPTY, got {self.state}')
-
-        for (writer, path) in self.shard_writers:
-            writer.write_header_to_file(path)
-
-        self.state = WriterState.HEADER
-
-    def write_kv_data_to_file(self) -> None:
-        if self.state is not WriterState.HEADER:
-            raise ValueError(f'Expected GGUFWriterSplit state to be HEADER, got {self.state}')
-
-        for (writer, _) in self.shard_writers:
-            writer.write_kv_data_to_file()
-
-        self.state = WriterState.KV_DATA
-
-    def write_tensors_to_file(self, *, progress: bool = False) -> None:
-        if self.state is not WriterState.KV_DATA:
-            raise ValueError(f'Expected GGUFWriterSplit state to be KV_DATA, got {self.state}')
-
-        running_total = self.total_tensors
-        for i in range(len(self.shard_writers)):
-            writer = self.shard_writers[i][0]
-            is_metadata = len(writer.tensors) == 0
-            if is_metadata:
-                logger.info(f"Writing to shard {i + 1}/{len(self.shards)} with metadata only")
-            else:
-                logger.info(f"Writing to shard {i + 1}/{len(self.shards)} with {len(writer.tensors)}/{running_total} remaining tensors (of {self.total_tensors} total)")
-            running_total -= len(writer.tensors)
-            writer.write_tensors_to_file(progress=(progress and not is_metadata))
-            del writer
-
-        self.state = WriterState.TI_DATA
-
-    # override add_key_value to handle kv data separately
-    def add_key_value(self, key: str, val: Any, vtype: GGUFValueType) -> None:
-        self.kv_data[key] = (val, vtype)
-
-    def add_tensor(
-        self, name: str, tensor: np.ndarray[Any, Any], raw_shape: Sequence[int] | None = None,
-        raw_dtype: GGMLQuantizationType | None = None,
-    ) -> None:
-        # we build splits as tensors are added so we need logic to figure out when to split
-        # logic is all in the conditional because it short-circuits, otherwise accessing self.shards[-1] would throw an error
-
-        # create a first shard to start it off
-        if (len(self.shards) == self.split_arguments.small_first_shard \
-            # or split when over tensor limit
-            or (self.split_arguments.split_style == SplitStyle.TENSORS \
-                and self.shards[-1].tensor_count >= self.split_arguments.split_max_tensors) \
-            # or split when over size limit
-            or (self.split_arguments.split_style == SplitStyle.SIZE \
-                and self.shards[-1].size + GGUFWriterSplit.get_tensor_size(tensor) > self.split_arguments.split_max_size)):
-            # we fill in the name later when we know how many shards there are
-            self.shards.append(Shard(Path(), 1, GGUFWriterSplit.get_tensor_size(tensor), deque([(name, tensor, raw_dtype)])))
-        else:
-            self.shards[-1].tensor_count += 1
-            self.shards[-1].size += GGUFWriterSplit.get_tensor_size(tensor)
-            self.shards[-1].tensors.append((name, tensor, raw_dtype))
-
-    def close(self) -> None:
-        for (writer, _) in self.shard_writers:
-            writer.close()
-
-    @staticmethod
-    def get_tensor_size(tensor) -> int:
-        try:
-            return tensor.data_type.elements_to_bytes(np.prod(tensor.shape))
-        except AttributeError:  # numpy ndarray[Any, Any]
-            return tensor.nbytes
-
-    @staticmethod
-    def split_str_to_n_bytes(split_str: str) -> int:
-        if split_str.endswith("K"):
-            n = int(split_str[:-1]) * 1000
-        elif split_str.endswith("M"):
-            n = int(split_str[:-1]) * 1000 * 1000
-        elif split_str.endswith("G"):
-            n = int(split_str[:-1]) * 1000 * 1000 * 1000
-        elif split_str.isnumeric():
-            n = int(split_str)
-        else:
-            raise ValueError(f"Invalid split size: {split_str}, must be a number, optionally followed by K, M, or G")
-
-        if n <= 0:
-            raise ValueError(f"Invalid split size: {split_str}, must be positive")
-
-        return n
-
-    @staticmethod
-    def format_n_bytes_to_str(num: int) -> str:
-        if num == METADATA_ONLY_INDICATOR:
-            return "negligible - metadata only"
-        fnum = float(num)
-        for unit in ("", "K", "M", "G"):
-            if abs(fnum) < 1000.0:
-                return f"{fnum:3.1f}{unit}"
-            fnum /= 1000.0
-        return f"{fnum:.1f}T - over 1TB, --split recommended"