refactor: Add a custom tokenizer component and fix vocab request class

This commit is contained in:
teleprint-me 2024-05-24 01:30:29 -04:00
parent e62e09bbb1
commit 6c9ac0fc52
No known key found for this signature in database
GPG key ID: B0D11345E65C4D48

View file

@ -5,6 +5,7 @@ import pathlib
from hashlib import sha256
import requests
from sentencepiece import SentencePieceProcessor
from transformers import AutoTokenizer
from .constants import (
@ -103,6 +104,71 @@ class HFHubBase:
self._model_path = value
class HFTokenizer(HFHubBase):
def __init__(self, model_path: str, auth_token: str, logger: logging.Logger):
super().__init__(model_path, auth_token, logger)
self._model_path = model_path
@staticmethod
def get_vocab_filenames(vocab_type: VocabType) -> tuple[str]:
if vocab_type == VocabType.SPM:
return HF_TOKENIZER_SPM_FILES
# NOTE: WPM and BPE are equivalent
return HF_TOKENIZER_BPE_FILES
@staticmethod
def get_vocab_name(vocab_type: VocabType) -> str:
return VOCAB_TYPE_NAMES.get(vocab_type)
@staticmethod
def get_vocab_enum(vocab_name: str) -> VocabType:
return {
"SPM": VocabType.SPM,
"BPE": VocabType.BPE,
"WPM": VocabType.WPM,
}.get(vocab_name, VocabType.NON)
def config(self, model_repo: str) -> dict[str, object]:
path = self.model_path / model_repo / "config.json"
with path.read_text(encoding='utf-8') as file:
return json.loads(file)
def tokenizer_config(self, model_repo: str) -> dict[str, object]:
path = self.model_path / model_repo / "tokenizer_config.json"
with path.read_text(encoding='utf-8') as file:
return json.loads(file)
def tokenizer_json(self, model_repo: str) -> dict[str, object]:
path = self.model_path / model_repo / "tokenizer.json"
with path.read_text(encoding='utf-8') as file:
return json.loads(file)
def tokenizer_model(self, model_repo: str) -> SentencePieceProcessor:
path = self.model_path / model_repo / "tokenizer.model"
processor = SentencePieceProcessor()
processor.LoadFromFile(path.read_bytes())
return processor
def get_tokenizer_json_hash(self, model_repo: str) -> str:
tokenizer = self.tokenizer_json(model_repo)
tokenizer_path = self.model_path / model_repo / "tokenizer.json"
sha256sum = sha256(str(tokenizer).encode()).hexdigest()
self.logger.info(f"Hashed '{tokenizer_path}' as {sha256sum}")
return sha256sum
def log_tokenizer_json_info(self, model_repo: str) -> None:
tokenizer = self.tokenizer_json(model_repo)
self.logger.info(f"JSON:ModelRepo: {model_repo}")
for k, v in tokenizer.get("model", {}).items():
if k == "vocab":
continue # NOTE: Do not pollute the output
self.logger.info(f"JSON:Model: {k}: {json.dumps(v, indent=2)}")
for k, v in tokenizer.get("normalizer", {}).items():
self.logger.info(f"JSON:Normalizer: {k}: {json.dumps(v, indent=2)}")
for k, v in tokenizer.get("pre_tokenizer", {}).items():
self.logger.info(f"JSON:PreTokenizer: {k}: {json.dumps(v, indent=2)}")
class HFVocabRequest(HFHubBase):
def __init__(
self,
@ -111,94 +177,45 @@ class HFVocabRequest(HFHubBase):
logger: None | logging.Logger
):
super().__init__(model_path, auth_token, logger)
self._tokenizer = HFTokenizer(model_path, auth_token, logger)
@property
def tokenizer_type(self) -> VocabType:
return VocabType
@property
def tokenizer_path(self) -> pathlib.Path:
return self.model_path / "tokenizer.json"
def get_vocab_name(self, vocab_type: VocabType) -> str:
return VOCAB_TYPE_NAMES.get(vocab_type)
def get_vocab_enum(self, vocab_name: str) -> VocabType:
return {
"SPM": VocabType.SPM,
"BPE": VocabType.BPE,
"WPM": VocabType.WPM,
}.get(vocab_name, VocabType.NON)
def get_vocab_filenames(self, vocab_type: VocabType) -> tuple[str]:
if vocab_type == self.tokenizer_type.SPM:
return HF_TOKENIZER_SPM_FILES
# NOTE: WPM and BPE are equivalent
return HF_TOKENIZER_BPE_FILES
def tokenizer(self) -> HFTokenizer:
return self._tokenizer
def get_vocab_file(
self, model_repo: str, file_name: str, file_path: pathlib.Path,
) -> bool:
# NOTE: Do not use bare exceptions! They mask issues!
# Allow the exception to occur or handle it explicitly.
# Allow the exception to occur or explicitly handle it.
resolve_url = self.hub.resolve_url(model_repo, file_name)
response = self.hub.download_file(resolve_url)
self.hub.write_file(response.content, file_path)
self.logger.info(f"Downloaded tokenizer {file_name} from {model_repo}")
def get_all_vocab_files(self, model_repo: str, vocab_type: VocabType) -> None:
vocab_list = self.get_vocab_filenames(vocab_type)
vocab_list = HFTokenizer.get_vocab_filenames(vocab_type)
for vocab_file in vocab_list:
dir_path = self.model_path / model_repo
file_path = dir_path / vocab_file
os.makedirs(dir_path, exist_ok=True)
self.get_vocab_file(model_repo, vocab_file, file_path)
def get_normalizer(self) -> None | dict[str, object]:
with open(self.tokenizer_path, mode="r") as file:
tokenizer_json = json.load(file)
return tokenizer_json.get("normalizer")
def get_normalizer(self, model_repo: str) -> None | dict[str, object]:
normalizer = self.tokenizer.tokenizer_json(model_repo).get("normalizer", dict())
if normalizer:
self.logger.info(f"JSON:Normalizer: {json.dumps(normalizer, indent=2)}")
else:
self.logger.warn(f"WARN:Normalizer: {normalizer}")
return normalizer
def get_pre_tokenizer(self) -> None | dict[str, object]:
with open(self.tokenizer_path, mode="r") as file:
tokenizer_json = json.load(file)
return tokenizer_json.get("pre_tokenizer")
def generate_checksum(self) -> None:
checksums = []
for model in self.models:
mapping = {}
file_path = f"{self.model_path}/{model['repo']}"
try:
tokenizer = AutoTokenizer.from_pretrained(file_path, trust_remote=True)
except OSError as e:
self.logger.error(f"Failed to hash tokenizer {model['repo']}: {e}")
continue
mapping.update(model)
mapping['checksum'] = sha256(str(tokenizer.vocab).encode()).hexdigest()
self.logger.info(f"Hashed {mapping['repo']} as {mapping['checksum']}")
checksums.append(mapping)
with open(f"{self.model_path}/checksums.json", mode="w") as file:
json.dump(checksums, file)
def log_pre_tokenizer_info(self) -> None:
for model in self.models:
try:
with open(f"{self.model_path}/{model['repo']}/tokenizer.json", "r", encoding="utf-8") as f:
self.logger.info(f"Start: {model['repo']}")
cfg = json.load(f)
self.logger.info(f"normalizer: {json.dumps(cfg['normalizer'], indent=4)}")
self.logger.info(f"pre_tokenizer: {json.dumps(cfg['pre_tokenizer'], indent=4)}")
if "type" in cfg["model"]:
self.logger.info(f"type: {json.dumps(cfg['model']['type'])}")
if "ignore_merges" in cfg["model"]:
self.logger.info(f"ignore_merges: {json.dumps(cfg['model']['ignore_merges'], indent=4)}")
self.logger.info(f"End: {model['repo']}")
except FileNotFoundError as e:
self.logger.error(f"Failed to log tokenizer {model['repo']}: {e}")
def get_pre_tokenizer(self, model_repo: str) -> None | dict[str, object]:
pre_tokenizer = self.tokenizer.tokenizer_json(model_repo).get("pre_tokenizer", dict())
if pre_tokenizer:
self.logger.info(f"JSON:PreTokenizer: {json.dumps(pre_tokenizer, indent=2)}")
else:
self.logger.warn(f"WARN:PreTokenizer: {pre_tokenizer}")
return pre_tokenizer
# TODO: