diff --git a/convert-falcon-hf-to-gguf.py b/convert-falcon-hf-to-gguf.py
index be37ac073..b572e3c90 100755
--- a/convert-falcon-hf-to-gguf.py
+++ b/convert-falcon-hf-to-gguf.py
@@ -9,7 +9,7 @@ import os
 import struct
 import sys
 from pathlib import Path
-from typing import Any, List
+from typing import Any
 
 import gguf
 import numpy as np
@@ -117,9 +117,9 @@ gguf_writer.add_file_type(ftype)
 
 print("gguf: get tokenizer metadata")
 
-tokens: List[bytearray] = []
-scores: List[float] = []
-toktypes: List[int] = []
+tokens: list[bytearray] = []
+scores: list[float] = []
+toktypes: list[int] = []
 
 tokenizer_json_file = dir_model / 'tokenizer.json'
 if not tokenizer_json_file.is_file():
diff --git a/convert-gptneox-hf-to-gguf.py b/convert-gptneox-hf-to-gguf.py
index bc70d4af9..b1027922a 100755
--- a/convert-gptneox-hf-to-gguf.py
+++ b/convert-gptneox-hf-to-gguf.py
@@ -9,7 +9,7 @@ import os
 import struct
 import sys
 from pathlib import Path
-from typing import Any, List
+from typing import Any
 
 import gguf
 import numpy as np
@@ -114,7 +114,7 @@ gguf_writer.add_layer_norm_eps(hparams["layer_norm_eps"])
 
 print("gguf: get tokenizer metadata")
 
-tokens: List[bytearray] = []
+tokens: list[bytearray] = []
 
 tokenizer_json_file = dir_model / 'tokenizer.json'
 if not tokenizer_json_file.is_file():
diff --git a/convert-llama-7b-pth-to-gguf.py b/convert-llama-7b-pth-to-gguf.py
index c043a198e..556e7cb47 100755
--- a/convert-llama-7b-pth-to-gguf.py
+++ b/convert-llama-7b-pth-to-gguf.py
@@ -11,7 +11,7 @@ import os
 import struct
 import sys
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, List
+from typing import TYPE_CHECKING, Any
 
 import gguf
 import numpy as np
@@ -132,9 +132,9 @@ if "rope_scaling" in hparams and hparams["rope_scaling"] != None and "factor" in
 
 print("gguf: get tokenizer metadata")
 
-tokens: List[bytes] = []
-scores: List[float] = []
-toktypes: List[int] = []
+tokens: list[bytes] = []
+scores: list[float] = []
+toktypes: list[int] = []
 
 tokenizer_model_file = dir_model / 'tokenizer.model'
 if not tokenizer_model_file.is_file():
diff --git a/convert-llama-hf-to-gguf.py b/convert-llama-hf-to-gguf.py
index c608ab7cb..5e00eec1a 100755
--- a/convert-llama-hf-to-gguf.py
+++ b/convert-llama-hf-to-gguf.py
@@ -9,7 +9,7 @@ import os
 import struct
 import sys
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, List, Optional
+from typing import TYPE_CHECKING, Any
 
 import gguf
 import numpy as np
@@ -25,7 +25,7 @@ NDArray: TypeAlias = 'np.ndarray[Any, Any]'
 # https://github.com/huggingface/transformers/blob/main/src/transformers/models/llama/convert_llama_weights_to_hf.py
 
 
-def reverse_hf_permute(weights: NDArray, n_head: int, n_kv_head: Optional[int] = None) -> NDArray:
+def reverse_hf_permute(weights: NDArray, n_head: int, n_kv_head: int | None = None) -> NDArray:
     if n_kv_head is not None and n_head != n_kv_head:
         n_head //= n_kv_head
 
@@ -139,9 +139,9 @@ if "rope_scaling" in hparams and hparams["rope_scaling"] != None and "factor" in
 
 print("gguf: get tokenizer metadata")
 
-tokens: List[bytes] = []
-scores: List[float] = []
-toktypes: List[int] = []
+tokens: list[bytes] = []
+scores: list[float] = []
+toktypes: list[int] = []
 
 tokenizer_model_file = dir_model / 'tokenizer.model'
 if not tokenizer_model_file.is_file():
diff --git a/convert-lora-to-ggml.py b/convert-lora-to-ggml.py
index 73543e000..a937410dd 100755
--- a/convert-lora-to-ggml.py
+++ b/convert-lora-to-ggml.py
@@ -6,12 +6,12 @@ import os
 import re
 import struct
 import sys
-from typing import Any, BinaryIO, Dict, Sequence
+from typing import Any, BinaryIO, Sequence
 
 import numpy as np
 import torch
 
-NUMPY_TYPE_TO_FTYPE: Dict[str, int] = {"float32": 0, "float16": 1}
+NUMPY_TYPE_TO_FTYPE: dict[str, int] = {"float32": 0, "float16": 1}
 
 
 HF_SUBLAYER_TO_GGML = {
@@ -48,7 +48,7 @@ def translate_tensor_name(t: str) -> str:
         sys.exit(1)
 
 
-def write_file_header(fout: BinaryIO, params: Dict[str, Any]) -> None:
+def write_file_header(fout: BinaryIO, params: dict[str, Any]) -> None:
     fout.write(b"ggla"[::-1]) # magic (ggml lora)
     fout.write(struct.pack("i", 1)) # file version
     fout.write(struct.pack("i", params["r"]))
diff --git a/convert.py b/convert.py
index 30d38e17c..38b3c73a8 100755
--- a/convert.py
+++ b/convert.py
@@ -23,8 +23,7 @@ from abc import ABCMeta, abstractmethod
 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from dataclasses import dataclass
 from pathlib import Path
-from typing import (IO, TYPE_CHECKING, Any, Callable, Dict, Generator, Iterable, List, Literal, Optional, Sequence, Set,
-                    Tuple, Type, TypeVar, Union)
+from typing import IO, TYPE_CHECKING, Any, Callable, Generator, Iterable, Literal, Sequence, TypeVar
 
 import gguf
 import numpy as np
@@ -50,7 +49,7 @@ DEFAULT_CONCURRENCY = 8
 class DataType:
     name: str
     dtype: np.dtype[Any]
-    valid_conversions: List[str]
+    valid_conversions: list[str]
 
     def elements_to_bytes(self, n_elements: int) -> int:
         return n_elements * self.dtype.itemsize
@@ -86,7 +85,7 @@ class Q8_0QuantizedDataType(QuantizedDataType):
         n_blocks = arr.size // self.block_size
         blocks = arr.reshape((n_blocks, self.block_size))
         # Much faster implementation of block quantization contributed by @Cebtenzzre
-        def quantize_blocks_q8_0(blocks: NDArray) -> Iterable[Tuple[Any, Any]]:
+        def quantize_blocks_q8_0(blocks: NDArray) -> Iterable[tuple[Any, Any]]:
             d = abs(blocks).max(axis = 1) / np.float32(127)
             with np.errstate(divide = 'ignore'):
                 qs = (blocks / d[:, None]).round()
@@ -100,13 +99,13 @@ DT_Q8_0 = Q8_0QuantizedDataType('Q8_0',
     quantized_dtype = np.dtype([('d', '<f2'), ('qs', 'i1', (32,))]))
 
 # Quantized types skipped here because they may also map to np.float32
-NUMPY_TYPE_TO_DATA_TYPE: Dict[np.dtype[Any], DataType] = {}
+NUMPY_TYPE_TO_DATA_TYPE: dict[np.dtype[Any], DataType] = {}
 for dt in (DT_BF16, DT_F16, DT_F32, DT_I32):
     if dt.dtype in NUMPY_TYPE_TO_DATA_TYPE:
         raise ValueError(f'Invalid duplicate data type {dt}')
     NUMPY_TYPE_TO_DATA_TYPE[dt.dtype] = dt
 
-SAFETENSORS_DATA_TYPES: Dict[str, DataType] = {
+SAFETENSORS_DATA_TYPES: dict[str, DataType] = {
     'BF16': DT_BF16,
     'F16': DT_F16,
     'F32': DT_F32,
@@ -132,7 +131,7 @@ class GGMLFileType(enum.IntEnum):
         # 1D tensors are always F32.
         return dt if len(tensor.shape) > 1 else DT_F32
 
-GGML_FILE_TYPE_TO_DATA_TYPE: Dict[GGMLFileType, DataType] = {
+GGML_FILE_TYPE_TO_DATA_TYPE: dict[GGMLFileType, DataType] = {
    GGMLFileType.AllF32 : DT_F32,
    GGMLFileType.MostlyF16 : DT_F16,
    GGMLFileType.MostlyQ8_0: DT_Q8_0,
@@ -150,13 +149,13 @@ class Params:
     n_head_kv: int
     f_norm_eps: float
 
-    f_rope_freq_base: Optional[float] = None
-    f_rope_scale: Optional[float] = None
+    f_rope_freq_base: float | None = None
+    f_rope_scale: float | None = None
 
-    ftype: Optional[GGMLFileType] = None
+    ftype: GGMLFileType | None = None
 
     # path to the directory containing the model files
-    path_model: Optional[Path] = None
+    path_model: Path | None = None
 
     @staticmethod
     def find_n_mult(n_ff: int, n_embd: int) -> int:
@@ -316,9 +315,9 @@ class Params:
 #
 
 class BpeVocab:
-    def __init__(self, fname_tokenizer: Path, fname_added_tokens: Optional[Path]) -> None:
+    def __init__(self, fname_tokenizer: Path, fname_added_tokens: Path | None) -> None:
         self.bpe_tokenizer = json.loads(open(str(fname_tokenizer), encoding="utf-8").read())
-        added_tokens: Dict[str, int]
+        added_tokens: dict[str, int]
         if fname_added_tokens is not None:
             added_tokens = json.load(open(fname_added_tokens, encoding="utf-8"))
         else:
@@ -337,7 +336,7 @@ class BpeVocab:
         self.fname_tokenizer = fname_tokenizer
         self.fname_added_tokens = fname_added_tokens
 
-    def bpe_tokens(self) -> Iterable[Tuple[bytes, float, gguf.TokenType]]:
+    def bpe_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
         tokenizer = self.bpe_tokenizer
         from transformers.models.gpt2 import tokenization_gpt2
         byte_encoder = tokenization_gpt2.bytes_to_unicode()
@@ -347,12 +346,12 @@ class BpeVocab:
             score: float = -i
             yield text, score, gguf.TokenType.USER_DEFINED
 
-    def added_tokens(self) -> Iterable[Tuple[bytes, float, gguf.TokenType]]:
+    def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
         for text in self.added_tokens_list:
            score = -1000.0
            yield text.encode("utf-8"), score, gguf.TokenType.USER_DEFINED
 
-    def all_tokens(self) -> Iterable[Tuple[bytes, float, gguf.TokenType]]:
+    def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
         yield from self.bpe_tokens()
         yield from self.added_tokens()
 
@@ -361,9 +360,9 @@ class SentencePieceVocab:
-    def __init__(self, fname_tokenizer: Path, fname_added_tokens: Optional[Path]) -> None:
+    def __init__(self, fname_tokenizer: Path, fname_added_tokens: Path | None) -> None:
         self.sentencepiece_tokenizer = SentencePieceProcessor(str(fname_tokenizer))
-        added_tokens: Dict[str, int]
+        added_tokens: dict[str, int]
         if fname_added_tokens is not None:
             added_tokens = json.load(open(fname_added_tokens, encoding="utf-8"))
         else:
@@ -382,7 +381,7 @@ class SentencePieceVocab:
         self.fname_tokenizer = fname_tokenizer
         self.fname_added_tokens = fname_added_tokens
 
-    def sentencepiece_tokens(self) -> Iterable[Tuple[bytes, float, gguf.TokenType]]:
+    def sentencepiece_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
         tokenizer = self.sentencepiece_tokenizer
         for i in range(tokenizer.vocab_size()):
             piece = tokenizer.id_to_piece(i)
@@ -406,19 +405,19 @@ class SentencePieceVocab:
             yield text, score, toktype
 
-    def added_tokens(self) -> Iterable[Tuple[bytes, float, gguf.TokenType]]:
+    def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
         for text in self.added_tokens_list:
             score = -1000.0
             yield text.encode("utf-8"), score, gguf.TokenType.USER_DEFINED
 
-    def all_tokens(self) -> Iterable[Tuple[bytes, float, gguf.TokenType]]:
+    def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
         yield from self.sentencepiece_tokens()
         yield from self.added_tokens()
 
     def __repr__(self) -> str:
         return f"<SentencePieceVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"
 
-Vocab = Union[BpeVocab, SentencePieceVocab]
+Vocab: TypeAlias = 'BpeVocab | SentencePieceVocab'
 
 #
 # data loading
 #
@@ -498,13 +497,13 @@ def load_unquantized(lazy_tensor: LazyTensor, expected_dtype: Any = None, conver
     return tensor.ndarray
 
 
-GGMLCompatibleTensor = Union[UnquantizedTensor]
+GGMLCompatibleTensor = UnquantizedTensor
 
 
 @dataclass
 class LazyTensor:
     _load: Callable[[], Tensor]
-    shape: List[int]
+    shape: list[int]
     data_type: DataType
     description: str
@@ -527,24 +526,24 @@ class LazyTensor:
             raise ValueError(f'Cannot validate conversion from {self.data_type} to {data_type}.')
 
 
-LazyModel = Dict[str, LazyTensor]
+LazyModel = dict[str, LazyTensor]
 
 
 @dataclass
 class ModelPlus:
     model: LazyModel
-    paths: List[Path] # Where this was read from.
+    paths: list[Path] # Where this was read from.
     format: Literal['ggml', 'torch', 'safetensors', 'none']
-    vocab: Optional[Vocab] # For GGML models (which have vocab built in), the vocab.
+    vocab: Vocab | None # For GGML models (which have vocab built in), the vocab.
 
 
-def merge_sharded(models: List[LazyModel]) -> LazyModel:
+def merge_sharded(models: list[LazyModel]) -> LazyModel:
     # Original LLaMA models have each file contain one part of each tensor.
     # Use a dict instead of a set to preserve order.
     names = {name: None for model in models for name in model}
 
     def convert(name: str) -> LazyTensor:
-        lazy_tensors: List[LazyTensor] = [model[name] for model in models]
+        lazy_tensors: list[LazyTensor] = [model[name] for model in models]
         if len(lazy_tensors) == 1:
             # only one file; don't go through this procedure since there might
             # be quantized tensors
@@ -572,7 +571,7 @@ def merge_sharded(models: List[LazyModel]) -> LazyModel:
     return {name: convert(name) for name in names}
 
 
-def merge_multifile_models(models_plus: List[ModelPlus]) -> ModelPlus:
+def merge_multifile_models(models_plus: list[ModelPlus]) -> ModelPlus:
     formats = set(mp.format for mp in models_plus)
     assert len(formats) == 1, "different formats?"
     format = formats.pop()
@@ -676,7 +675,7 @@ class LazyUnpickler(pickle.Unpickler):
     def rebuild_from_type_v2(func, new_type, args, state):
         return func(*args)
 
-    CLASSES: Dict[Tuple[str, str], Any] = {
+    CLASSES: dict[tuple[str, str], Any] = {
         # getattr used here as a workaround for mypy not being smart enough to detrmine
         # the staticmethods have a __func__ attribute.
         ('torch._tensor', '_rebuild_from_type_v2'): getattr(rebuild_from_type_v2, '__func__'),
@@ -709,15 +708,15 @@ def lazy_load_torch_file(outer_fp: IO[bytes], path: Path) -> ModelPlus:
 
 def lazy_load_safetensors_file(fp: IO[bytes], path: Path) -> ModelPlus:
     header_size, = struct.unpack('<Q', must_read(fp, 8))
-    header: Dict[str, Any] = json.loads(must_read(fp, header_size))
+    header: dict[str, Any] = json.loads(must_read(fp, header_size))
     # Use mmap for the actual data to avoid race conditions with the file offset.
     mapped = memoryview(mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ))
     byte_buf = mapped[8 + header_size:]
 
-    def convert(info: Dict[str, Any]) -> LazyTensor:
+    def convert(info: dict[str, Any]) -> LazyTensor:
         data_type = SAFETENSORS_DATA_TYPES[info['dtype']]
         numpy_dtype = data_type.dtype
-        shape: List[int] = info['shape']
+        shape: list[int] = info['shape']
         begin, end = info['data_offsets']
         assert 0 <= begin <= end <= len(byte_buf)
         assert end - begin == math.prod(shape) * numpy_dtype.itemsize
@@ -756,7 +755,7 @@ def lazy_load_file(path: Path) -> ModelPlus:
 In = TypeVar('In')
 Out = TypeVar('Out')
 
-def bounded_parallel_map(func: Callable[[In], Out], iterable: Iterable[In], concurrency: int, max_workers: Optional[int] = None, use_processpool_executor: bool = False) -> Iterable[Out]:
+def bounded_parallel_map(func: Callable[[In], Out], iterable: Iterable[In], concurrency: int, max_workers: int | None = None, use_processpool_executor: bool = False) -> Iterable[Out]:
     '''Parallel map, but with backpressure.  If the caller doesn't call `next`
     fast enough, this will stop calling `func` at some point rather than
     letting results pile up in memory.  Specifically, there is a max of one
@@ -765,13 +764,13 @@ def bounded_parallel_map(func: Callable[[In], Out], iterable: Iterable[In], conc
         yield from map(func, iterable) # Not reached.
     iterable = iter(iterable)
-    executor_class: Union[Type[ThreadPoolExecutor], Type[ProcessPoolExecutor]]
+    executor_class: type[ThreadPoolExecutor] | type[ProcessPoolExecutor]
     if use_processpool_executor:
         executor_class = ProcessPoolExecutor
     else:
         executor_class = ThreadPoolExecutor
     with executor_class(max_workers = max_workers) as executor:
-        futures: List[concurrent.futures.Future[Out]] = []
+        futures: list[concurrent.futures.Future[Out]] = []
         done = False
         for _ in range(concurrency):
             try:
@@ -895,13 +894,13 @@ class OutputFile:
         of.close()
 
     @staticmethod
-    def do_item(item: Tuple[str, LazyTensor]) -> Tuple[DataType, NDArray]:
+    def do_item(item: tuple[str, LazyTensor]) -> tuple[DataType, NDArray]:
         name, lazy_tensor = item
         tensor = lazy_tensor.load().to_ggml()
         return (lazy_tensor.data_type, tensor.ndarray)
 
     @staticmethod
-    def maybe_do_quantize(item: Tuple[DataType, NDArray]) -> NDArray:
+    def maybe_do_quantize(item: tuple[DataType, NDArray]) -> NDArray:
         dt, arr = item
         if not isinstance(dt, QuantizedDataType):
             return arr
@@ -942,7 +941,7 @@ class OutputFile:
     of.close()
 
 
-def pick_output_type(model: LazyModel, output_type_str: Optional[str]) -> GGMLFileType:
+def pick_output_type(model: LazyModel, output_type_str: str | None) -> GGMLFileType:
     wq_type = model[NAMES[gguf.MODEL_TENSOR.ATTN_Q].format(bid=0)+".weight"].data_type
 
     if output_type_str == "f32" or (output_type_str is None and wq_type == DT_F32):
@@ -962,7 +961,7 @@ def convert_to_output_type(model: LazyModel, output_type: GGMLFileType) -> LazyM
 
 def convert_model_names(model: LazyModel, params: Params) -> LazyModel:
     tmap = gguf.TensorNameMap(ARCH, params.n_layer)
-    should_skip: Set[gguf.MODEL_TENSOR] = set(gguf.MODEL_TENSOR_SKIP.get(ARCH, []))
+    should_skip: set[gguf.MODEL_TENSOR] = set(gguf.MODEL_TENSOR_SKIP.get(ARCH, []))
 
     tmp = model
@@ -997,12 +996,12 @@ def convert_model_names(model: LazyModel, params: Params) -> LazyModel:
     return out
 
 
-def nth_multifile_path(path: Path, n: int) -> Optional[Path]:
+def nth_multifile_path(path: Path, n: int) -> Path | None:
     '''Given any path belonging to a multi-file model (e.g. foo.bin.1), return
     the nth path in the model.
     '''
     # Support the following patterns:
-    patterns: List[Tuple[str, str]] = [
+    patterns: list[tuple[str, str]] = [
         # - x.00.pth, x.01.pth, etc.
         (r'\.[0-9]{2}\.pth$', f'.{n:02}.pth'),
         # - x-00001-of-00002.bin, x-00002-of-00002.bin, etc.
@@ -1018,11 +1017,11 @@ def nth_multifile_path(path: Path, n: int) -> Optional[Path]:
     return None
 
 
-def find_multifile_paths(path: Path) -> List[Path]:
+def find_multifile_paths(path: Path) -> list[Path]:
     '''Given any path belonging to a multi-file model (e.g. foo.bin.1), return
     the whole list of paths in the model.
     '''
-    ret: List[Path] = []
+    ret: list[Path] = []
     for i in itertools.count():
         nth_path = nth_multifile_path(path, i)
         if nth_path is None:
@@ -1053,7 +1052,7 @@ def load_some_model(path: Path) -> ModelPlus:
         path = files[0]
 
     paths = find_multifile_paths(path)
-    models_plus: List[ModelPlus] = []
+    models_plus: list[ModelPlus] = []
     for path in paths:
         print(f"Loading model file {path}")
         models_plus.append(lazy_load_file(path))
@@ -1062,7 +1061,7 @@ def load_some_model(path: Path) -> ModelPlus:
     return model_plus
 
 
-def load_vocab(path: Path, vocabtype: Optional[str]) -> Union[BpeVocab, SentencePieceVocab]:
+def load_vocab(path: Path, vocabtype: str | None) -> Vocab:
     # Be extra-friendly and accept either a file or a directory.  Also, if it's
     # a directory, it might be the model directory, and tokenizer.model might
     # be in the parent of that.
@@ -1093,7 +1092,7 @@ def load_vocab(path: Path, vocabtype: Optional[str]) -> Union[BpeVocab, Sentence raise ValueError(f"Unsupported vocabulary type {vocabtype}") -def default_outfile(model_paths: List[Path], file_type: GGMLFileType) -> Path: +def default_outfile(model_paths: list[Path], file_type: GGMLFileType) -> Path: namestr = { GGMLFileType.AllF32: "f32", GGMLFileType.MostlyF16: "f16", @@ -1116,7 +1115,7 @@ def do_dump_model(model_plus: ModelPlus) -> None: print(f"{name}: shape={lazy_tensor.shape} type={lazy_tensor.data_type}; {lazy_tensor.description}") -def main(args_in: Optional[List[str]] = None) -> None: +def main(args_in: list[str] | None = None) -> None: parser = argparse.ArgumentParser(description="Convert a LLaMa model to a GGML compatible file") parser.add_argument("--dump", action="store_true", help="don't convert, just show what's in the model") parser.add_argument("--dump-single", action="store_true", help="don't convert, just show what's in a single model file") diff --git a/gguf-py/gguf/gguf.py b/gguf-py/gguf/gguf.py index c8b4e24ca..7a8e5afc1 100644 --- a/gguf-py/gguf/gguf.py +++ b/gguf-py/gguf/gguf.py @@ -10,7 +10,7 @@ import tempfile from enum import IntEnum, auto from io import BufferedWriter from pathlib import Path -from typing import IO, Any, BinaryIO, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import IO, Any, BinaryIO, Callable, Sequence import numpy as np @@ -105,7 +105,7 @@ class MODEL_TENSOR(IntEnum): FFN_NORM : int = auto() -MODEL_ARCH_NAMES: Dict[MODEL_ARCH, str] = { +MODEL_ARCH_NAMES: dict[MODEL_ARCH, str] = { MODEL_ARCH.LLAMA: "llama", MODEL_ARCH.FALCON: "falcon", MODEL_ARCH.GPT2: "gpt2", @@ -114,7 +114,7 @@ MODEL_ARCH_NAMES: Dict[MODEL_ARCH, str] = { MODEL_ARCH.MPT: "mpt", } -MODEL_TENSOR_NAMES: Dict[MODEL_ARCH, Dict[MODEL_TENSOR, str]] = { +MODEL_TENSOR_NAMES: dict[MODEL_ARCH, dict[MODEL_TENSOR, str]] = { MODEL_ARCH.LLAMA: { MODEL_TENSOR.TOKEN_EMBD: "token_embd", MODEL_TENSOR.OUTPUT_NORM: "output_norm", @@ -160,7 +160,7 @@ MODEL_TENSOR_NAMES: Dict[MODEL_ARCH, Dict[MODEL_TENSOR, str]] = { } # tensors that will not be serialized -MODEL_TENSOR_SKIP: Dict[MODEL_ARCH, List[MODEL_TENSOR]] = { +MODEL_TENSOR_SKIP: dict[MODEL_ARCH, list[MODEL_TENSOR]] = { MODEL_ARCH.LLAMA: [ MODEL_TENSOR.ROPE_FREQS, MODEL_TENSOR.ATTN_ROT_EMBD, @@ -169,7 +169,7 @@ MODEL_TENSOR_SKIP: Dict[MODEL_ARCH, List[MODEL_TENSOR]] = { class TensorNameMap: - mappings_cfg: Dict[MODEL_TENSOR, Tuple[str, ...]] = { + mappings_cfg: dict[MODEL_TENSOR, tuple[str, ...]] = { # Token embeddings MODEL_TENSOR.TOKEN_EMBD: ( "gpt_neox.embed_in", # gptneox @@ -205,7 +205,7 @@ class TensorNameMap: ), } - block_mappings_cfg: Dict[MODEL_TENSOR, Tuple[str, ...]] = { + block_mappings_cfg: dict[MODEL_TENSOR, tuple[str, ...]] = { # Attention norm MODEL_TENSOR.ATTN_NORM: ( "gpt_neox.layers.{bid}.input_layernorm", # gptneox @@ -300,9 +300,9 @@ class TensorNameMap: ), } - mapping: Dict[str, Tuple[MODEL_TENSOR, str]] + mapping: dict[str, tuple[MODEL_TENSOR, str]] - tensor_names: Dict[MODEL_TENSOR, str] + tensor_names: dict[MODEL_TENSOR, str] def __init__(self, arch: MODEL_ARCH, n_blocks: int): mapping = self.mapping = {} @@ -323,7 +323,7 @@ class TensorNameMap: key = key.format(bid = bid) mapping[key] = (tensor, tensor_name) - def get_type_and_name(self, key: str, try_suffixes: Sequence[str]) -> Optional[Tuple[MODEL_TENSOR, str]]: + def get_type_and_name(self, key: str, try_suffixes: Sequence[str]) -> tuple[MODEL_TENSOR, str] | None: result = self.mapping.get(key) if result is not None: 
             return result
@@ -334,13 +334,13 @@ class TensorNameMap:
                 return (result[0], result[1] + suffix)
         return None
 
-    def get_name(self, key: str, try_suffixes: Sequence[str]) -> Optional[str]:
+    def get_name(self, key: str, try_suffixes: Sequence[str]) -> str | None:
         result = self.get_type_and_name(key, try_suffixes = try_suffixes)
         if result is None:
             return None
         return result[1]
 
-    def get_type(self, key: str, try_suffixes: Sequence[str]) -> Optional[MODEL_TENSOR]:
+    def get_type(self, key: str, try_suffixes: Sequence[str]) -> MODEL_TENSOR | None:
         result = self.get_type_and_name(key, try_suffixes = try_suffixes)
         if result is None:
             return None
@@ -434,10 +434,10 @@ class GGUFWriter:
     ti_data = b""
     ti_data_count = 0
     use_temp_file: bool
-    temp_file: Optional[tempfile.SpooledTemporaryFile[bytes]] = None
-    tensors: List[Tuple[np.ndarray[Any, Any], int]]
+    temp_file: tempfile.SpooledTemporaryFile[bytes] | None = None
+    tensors: list[tuple[np.ndarray[Any, Any], int]]
 
-    def __init__(self, path: Union[os.PathLike[str], str], arch: str, use_temp_file = True):
+    def __init__(self, path: os.PathLike[str] | str, arch: str, use_temp_file = True):
         self.fout = open(path, "wb")
         self.arch = arch
         self.add_architecture()
@@ -533,7 +533,7 @@ class GGUFWriter:
         GGUFValueType.FLOAT64: "<d",
         GGUFValueType.BOOL: "?" ,
     }
-    def add_val(self, val: Any, vtype: Optional[GGUFValueType] = None, add_vtype: bool = True):
+    def add_val(self, val: Any, vtype: GGUFValueType | None = None, add_vtype: bool = True):
         if vtype is None:
             vtype = GGUFValueType.get_type(val)
 
@@ -565,7 +565,7 @@ class GGUFWriter:
     def ggml_pad(x: int, n: int) -> int:
         return ((x + n - 1) // n) * n
 
-    def add_tensor_info(self, name: str, tensor_shape: Sequence[int], tensor_dtype: Union[np.dtype[np.float16], np.dtype[np.float32]], tensor_nbytes: int, raw_dtype: Optional[GGMLQuantizationType] = None):
+    def add_tensor_info(self, name: str, tensor_shape: Sequence[int], tensor_dtype: np.dtype[np.float16] | np.dtype[np.float32], tensor_nbytes: int, raw_dtype: GGMLQuantizationType | None = None):
         assert raw_dtype is not None or tensor_dtype in (np.float32, np.float16), "Only F32 and F16 tensors are supported for now"
 
         encoded_name = name.encode("utf8")
@@ -582,7 +582,7 @@ class GGUFWriter:
         self.offset_tensor += GGUFWriter.ggml_pad(tensor_nbytes, self.data_alignment)
         self.ti_data_count += 1
 
-    def add_tensor(self, name: str, tensor: np.ndarray[Any, Any], raw_shape: Optional[Sequence[int]] = None, raw_dtype: Optional[GGMLQuantizationType] = None):
+    def add_tensor(self, name: str, tensor: np.ndarray[Any, Any], raw_shape: Sequence[int] | None = None, raw_dtype: GGMLQuantizationType | None = None):
         if self.use_temp_file and self.temp_file is None:
             fp = tempfile.SpooledTemporaryFile(mode="w+b", max_size=256*1024*1024)
             fp.seek(0)
@@ -602,7 +602,7 @@ class GGUFWriter:
         if pad != 0:
             self.temp_file.write(bytes([0] * pad))
 
-    def write_padding(self, fp: BinaryIO, n: int, align: Optional[int] = None):
+    def write_padding(self, fp: BinaryIO, n: int, align: int | None = None):
         pad = GGUFWriter.ggml_pad(n, align if align is not None else self.data_alignment) - n
         if pad != 0:
             fp.write(bytes([0] * pad))
@@ -728,13 +728,13 @@ class GGUFWriter:
     def add_tokenizer_model(self, model: str):
         self.add_string(KEY_TOKENIZER_MODEL, model)
 
-    def add_token_list(self, tokens: Union[Sequence[str], Sequence[bytes], Sequence[bytearray]]):
+    def add_token_list(self, tokens: Sequence[str] | Sequence[bytes] | Sequence[bytearray]):
         self.add_array(KEY_TOKENIZER_LIST, tokens)
 
-    def add_token_merges(self, merges: Union[Sequence[str], Sequence[bytes], Sequence[bytearray]]):
+    def add_token_merges(self, merges: Sequence[str] | Sequence[bytes] | Sequence[bytearray]):
         self.add_array(KEY_TOKENIZER_MERGES, merges)
 
-    def add_token_types(self, types: Union[Sequence[TokenType], Sequence[int]]):
+    def add_token_types(self, types: Sequence[TokenType] | Sequence[int]):
         self.add_array(KEY_TOKENIZER_TOKEN_TYPE, types)
 
     def add_token_scores(self, scores: Sequence[float]):
@@ -758,11 +758,11 @@ class GGUFWriter:
 
 class SpecialVocab:
     load_merges: bool = False
-    merges: List[str] = []
-    special_token_types: Tuple[str, ...] = tuple(('bos', 'eos', 'unk', 'sep', 'pad'))
-    special_token_ids: Dict[str, int] = {}
+    merges: list[str] = []
+    special_token_types: tuple[str, ...] = tuple(('bos', 'eos', 'unk', 'sep', 'pad'))
+    special_token_ids: dict[str, int] = {}
 
-    def __init__(self, path: Path, load_merges: bool = False, special_token_types: Optional[Tuple[str, ...]] = None):
+    def __init__(self, path: Path, load_merges: bool = False, special_token_types: tuple[str, ...] | None = None):
         self.special_token_ids = {}
         self.load_merges = load_merges
         if special_token_types is not None:
@@ -823,7 +823,7 @@ class SpecialVocab:
             print(f'gguf: Adding {len(self.merges)} merge(s).')
             gw.add_token_merges(self.merges)
         for typ, tokid in self.special_token_ids.items():
-            handler: Optional[Callable[[int], None]] = getattr(gw, f'add_{typ}_token_id', None)
+            handler: Callable[[int], None] | None = getattr(gw, f'add_{typ}_token_id', None)
             if handler is None:
                 print(f'gguf: WARNING: No handler for special token type {typ} with id {tokid} - skipping')
                 continue
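
Note for reviewers: nearly every hunk above applies the same mechanical change, namely PEP 585 built-in generics (`list[...]`, `dict[...]`, `tuple[...]`, `type[...]`, `set[...]`) in place of `typing.List`/`Dict`/`Tuple`/`Type`/`Set`, and PEP 604 unions (`X | Y`, `X | None`) in place of `typing.Union`/`Optional`. The sketch below is illustrative only and is not code from this diff; the `from __future__ import annotations` line and the `load_added_tokens` helper are assumptions used to show the style, since on older interpreters the new spellings are only safe inside annotations that are never evaluated at runtime.

```python
# Illustrative sketch, not part of the PR: the annotation style the converters move to.
from __future__ import annotations  # assumed: keeps annotations unevaluated, so they also work on Python 3.8

import json
from pathlib import Path

tokens: list[bytes] = []        # previously: List[bytes]
scores: dict[str, float] = {}   # previously: Dict[str, float]


def load_added_tokens(path: Path | None) -> dict[str, int]:
    # Hypothetical helper, loosely modelled on how the converters read added-token files;
    # the signature shows Optional[Path] -> Path | None and Dict[str, int] -> dict[str, int].
    if path is None or not path.is_file():
        return {}
    return json.loads(path.read_text(encoding="utf-8"))
```

Quoted aliases such as `Vocab: TypeAlias = 'BpeVocab | SentencePieceVocab'` in convert.py work the same way: the union stays a string at runtime, so the `|` syntax never needs to be evaluated by interpreters older than 3.10.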