From 6c9ac0fc5211378a7eaa51769bdc20311864d082 Mon Sep 17 00:00:00 2001
From: teleprint-me <77757836+teleprint-me@users.noreply.github.com>
Date: Fri, 24 May 2024 01:30:29 -0400
Subject: [PATCH] refactor: Add a custom tokenizer component and fix vocab
 request class

---
 gguf-py/gguf/huggingface_hub.py | 153 ++++++++++++++++++--------------
 1 file changed, 85 insertions(+), 68 deletions(-)

diff --git a/gguf-py/gguf/huggingface_hub.py b/gguf-py/gguf/huggingface_hub.py
index 1e5494e97..afd012882 100644
--- a/gguf-py/gguf/huggingface_hub.py
+++ b/gguf-py/gguf/huggingface_hub.py
@@ -5,6 +5,7 @@ import pathlib
 from hashlib import sha256
 
 import requests
+from sentencepiece import SentencePieceProcessor
 from transformers import AutoTokenizer
 
 from .constants import (
@@ -103,6 +104,71 @@ class HFHubBase:
         self._model_path = value
 
 
+class HFTokenizer(HFHubBase):
+    def __init__(self, model_path: str, auth_token: str, logger: logging.Logger):
+        super().__init__(model_path, auth_token, logger)
+        self._model_path = model_path
+
+    @staticmethod
+    def get_vocab_filenames(vocab_type: VocabType) -> tuple[str, ...]:
+        if vocab_type == VocabType.SPM:
+            return HF_TOKENIZER_SPM_FILES
+        # NOTE: WPM and BPE are equivalent
+        return HF_TOKENIZER_BPE_FILES
+
+    @staticmethod
+    def get_vocab_name(vocab_type: VocabType) -> str:
+        return VOCAB_TYPE_NAMES.get(vocab_type)
+
+    @staticmethod
+    def get_vocab_enum(vocab_name: str) -> VocabType:
+        return {
+            "SPM": VocabType.SPM,
+            "BPE": VocabType.BPE,
+            "WPM": VocabType.WPM,
+        }.get(vocab_name, VocabType.NON)
+
+    def config(self, model_repo: str) -> dict[str, object]:
+        path = self.model_path / model_repo / "config.json"
+        with open(path, mode="r", encoding="utf-8") as file:
+            return json.load(file)
+
+    def tokenizer_config(self, model_repo: str) -> dict[str, object]:
+        path = self.model_path / model_repo / "tokenizer_config.json"
+        with open(path, mode="r", encoding="utf-8") as file:
+            return json.load(file)
+
+    def tokenizer_json(self, model_repo: str) -> dict[str, object]:
+        path = self.model_path / model_repo / "tokenizer.json"
+        with open(path, mode="r", encoding="utf-8") as file:
+            return json.load(file)
+
+    def tokenizer_model(self, model_repo: str) -> SentencePieceProcessor:
+        path = self.model_path / model_repo / "tokenizer.model"
+        processor = SentencePieceProcessor()
+        processor.LoadFromFile(str(path))
+        return processor
+
+    def get_tokenizer_json_hash(self, model_repo: str) -> str:
+        tokenizer = self.tokenizer_json(model_repo)
+        tokenizer_path = self.model_path / model_repo / "tokenizer.json"
+        sha256sum = sha256(str(tokenizer).encode()).hexdigest()
+        self.logger.info(f"Hashed '{tokenizer_path}' as {sha256sum}")
+        return sha256sum
+
+    def log_tokenizer_json_info(self, model_repo: str) -> None:
+        tokenizer = self.tokenizer_json(model_repo)
+        self.logger.info(f"JSON:ModelRepo: {model_repo}")
+        for k, v in tokenizer.get("model", {}).items():
+            if k == "vocab":
+                continue  # NOTE: Do not pollute the output
+            self.logger.info(f"JSON:Model: {k}: {json.dumps(v, indent=2)}")
+        for k, v in (tokenizer.get("normalizer") or {}).items():
+            self.logger.info(f"JSON:Normalizer: {k}: {json.dumps(v, indent=2)}")
+        for k, v in (tokenizer.get("pre_tokenizer") or {}).items():
+            self.logger.info(f"JSON:PreTokenizer: {k}: {json.dumps(v, indent=2)}")
+
+
 class HFVocabRequest(HFHubBase):
     def __init__(
         self,
@@ -111,94 +177,45 @@ class HFVocabRequest(HFHubBase):
         logger: None | logging.Logger
     ):
         super().__init__(model_path, auth_token, logger)
+        self._tokenizer = HFTokenizer(model_path, auth_token, logger)
 
     @property
-    def tokenizer_type(self) -> VocabType:
-        return VocabType
-
-    @property
-    def tokenizer_path(self) -> pathlib.Path:
-        return self.model_path / "tokenizer.json"
-
-    def get_vocab_name(self, vocab_type: VocabType) -> str:
-        return VOCAB_TYPE_NAMES.get(vocab_type)
-
-    def get_vocab_enum(self, vocab_name: str) -> VocabType:
-        return {
-            "SPM": VocabType.SPM,
-            "BPE": VocabType.BPE,
-            "WPM": VocabType.WPM,
-        }.get(vocab_name, VocabType.NON)
-
-    def get_vocab_filenames(self, vocab_type: VocabType) -> tuple[str]:
-        if vocab_type == self.tokenizer_type.SPM:
-            return HF_TOKENIZER_SPM_FILES
-        # NOTE: WPM and BPE are equivalent
-        return HF_TOKENIZER_BPE_FILES
+    def tokenizer(self) -> HFTokenizer:
+        return self._tokenizer
 
     def get_vocab_file(
         self, model_repo: str, file_name: str, file_path: pathlib.Path,
     ) -> bool:
         # NOTE: Do not use bare exceptions! They mask issues!
-        # Allow the exception to occur or handle it explicitly.
+        # Allow the exception to occur or explicitly handle it.
         resolve_url = self.hub.resolve_url(model_repo, file_name)
         response = self.hub.download_file(resolve_url)
         self.hub.write_file(response.content, file_path)
         self.logger.info(f"Downloaded tokenizer {file_name} from {model_repo}")
 
     def get_all_vocab_files(self, model_repo: str, vocab_type: VocabType) -> None:
-        vocab_list = self.get_vocab_filenames(vocab_type)
+        vocab_list = HFTokenizer.get_vocab_filenames(vocab_type)
         for vocab_file in vocab_list:
             dir_path = self.model_path / model_repo
             file_path = dir_path / vocab_file
             os.makedirs(dir_path, exist_ok=True)
             self.get_vocab_file(model_repo, vocab_file, file_path)
 
-    def get_normalizer(self) -> None | dict[str, object]:
-        with open(self.tokenizer_path, mode="r") as file:
-            tokenizer_json = json.load(file)
-        return tokenizer_json.get("normalizer")
+    def get_normalizer(self, model_repo: str) -> None | dict[str, object]:
+        normalizer = self.tokenizer.tokenizer_json(model_repo).get("normalizer", dict())
+        if normalizer:
+            self.logger.info(f"JSON:Normalizer: {json.dumps(normalizer, indent=2)}")
+        else:
+            self.logger.warning(f"WARN:Normalizer: {normalizer}")
+        return normalizer
 
-    def get_pre_tokenizer(self) -> None | dict[str, object]:
-        with open(self.tokenizer_path, mode="r") as file:
-            tokenizer_json = json.load(file)
-        return tokenizer_json.get("pre_tokenizer")
-
-    def generate_checksum(self) -> None:
-        checksums = []
-        for model in self.models:
-            mapping = {}
-            file_path = f"{self.model_path}/{model['repo']}"
-
-            try:
-                tokenizer = AutoTokenizer.from_pretrained(file_path, trust_remote=True)
-            except OSError as e:
-                self.logger.error(f"Failed to hash tokenizer {model['repo']}: {e}")
-                continue
-
-            mapping.update(model)
-            mapping['checksum'] = sha256(str(tokenizer.vocab).encode()).hexdigest()
-            self.logger.info(f"Hashed {mapping['repo']} as {mapping['checksum']}")
-            checksums.append(mapping)
-
-        with open(f"{self.model_path}/checksums.json", mode="w") as file:
-            json.dump(checksums, file)
-
-    def log_pre_tokenizer_info(self) -> None:
-        for model in self.models:
-            try:
-                with open(f"{self.model_path}/{model['repo']}/tokenizer.json", "r", encoding="utf-8") as f:
-                    self.logger.info(f"Start: {model['repo']}")
-                    cfg = json.load(f)
-                    self.logger.info(f"normalizer: {json.dumps(cfg['normalizer'], indent=4)}")
-                    self.logger.info(f"pre_tokenizer: {json.dumps(cfg['pre_tokenizer'], indent=4)}")
-                    if "type" in cfg["model"]:
-                        self.logger.info(f"type: {json.dumps(cfg['model']['type'])}")
-                    if "ignore_merges" in cfg["model"]:
-                        self.logger.info(f"ignore_merges: {json.dumps(cfg['model']['ignore_merges'], indent=4)}")
-                    self.logger.info(f"End: {model['repo']}")
-            except FileNotFoundError as e:
-                self.logger.error(f"Failed to log tokenizer {model['repo']}: {e}")
+    def get_pre_tokenizer(self, model_repo: str) -> None | dict[str, object]:
+        pre_tokenizer = self.tokenizer.tokenizer_json(model_repo).get("pre_tokenizer", dict())
+        if pre_tokenizer:
+            self.logger.info(f"JSON:PreTokenizer: {json.dumps(pre_tokenizer, indent=2)}")
+        else:
+            self.logger.warning(f"WARN:PreTokenizer: {pre_tokenizer}")
+        return pre_tokenizer
 
 
 # TODO:
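
Usage sketch (not part of the patch): one way the refactored classes might be driven from a conversion script after this change. The import paths, repository id, and constructor argument order below are assumptions inferred from the hunks above, not confirmed API.

    import logging

    from gguf.constants import VocabType
    from gguf.huggingface_hub import HFVocabRequest

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("vocab-request")

    # Assumes HFHubBase takes (model_path, auth_token, logger), as the diff suggests.
    # A Hugging Face access token may be required for gated repositories.
    request = HFVocabRequest("models", None, logger)

    model_repo = "some-org/some-model"  # hypothetical repository id

    # Download the tokenizer files for the vocab type, then inspect tokenizer.json.
    request.get_all_vocab_files(model_repo, VocabType.BPE)
    request.tokenizer.log_tokenizer_json_info(model_repo)
    pre_tokenizer = request.get_pre_tokenizer(model_repo)  # logged and returned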