Split BPE and SentencePiece vocabularies

This commit is contained in:
goerch 2023-08-08 07:23:01 +02:00
parent 38fbb74038
commit f1f85de815

View file

@ -238,22 +238,58 @@ class Params:
return params
class SentencePieceVocab:
def __init__(self, fname_tokenizer: Path, fname_added_tokens: Optional[Path], vocabtype: Optional[str]) -> None:
self.vocabtype = vocabtype
if self.vocabtype == "bpe":
self.sentencepiece_tokenizer = json.loads(open(str(fname_tokenizer), encoding="utf-8").read())
else:
self.sentencepiece_tokenizer = SentencePieceProcessor(str(fname_tokenizer))
class BpeVocab:
def __init__(self, fname_tokenizer: Path, fname_added_tokens: Optional[Path]) -> None:
self.bpe_tokenizer = json.loads(open(str(fname_tokenizer), encoding="utf-8").read())
added_tokens: Dict[str, int]
if fname_added_tokens is not None:
added_tokens = json.load(open(fname_added_tokens, encoding="utf-8"))
else:
added_tokens = {}
if self.vocabtype == "bpe":
vocab_size: int = len(self.sentencepiece_tokenizer)
vocab_size: int = len(self.bpe_tokenizer)
expected_ids = list(range(vocab_size, vocab_size + len(added_tokens)))
actual_ids = sorted(added_tokens.values())
if expected_ids != actual_ids:
raise Exception(f"Expected added token IDs to be sequential and start at {len(added_tokens)}; got {actual_ids}")
items = sorted(added_tokens.items(), key=lambda text_idx: text_idx[1])
self.added_tokens_list = [text for (text, idx) in items]
self.vocab_size_base: int = vocab_size
self.vocab_size: int = self.vocab_size_base + len(self.added_tokens_list)
self.fname_tokenizer = fname_tokenizer
self.fname_added_tokens = fname_added_tokens
def bpe_tokens(self) -> Iterable[Tuple[bytes, float]]:
tokenizer = self.bpe_tokenizer
from transformers.models.gpt2 import tokenization_gpt2
byte_encoder = tokenization_gpt2.bytes_to_unicode()
byte_decoder = {v: k for k, v in byte_encoder.items()}
for i, item in enumerate(tokenizer):
text: bytes = item.encode("utf-8")
score: float = -i
yield text, score
def added_tokens(self) -> Iterable[Tuple[bytes, float]]:
for text in self.added_tokens_list:
score = -1000.0
yield text.encode("utf-8"), score
def all_tokens(self) -> Iterable[Tuple[bytes, float]]:
yield from self.bpe_tokens()
yield from self.added_tokens()
def __repr__(self) -> str:
return f"BpeVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"
class SentencePieceVocab:
def __init__(self, fname_tokenizer: Path, fname_added_tokens: Optional[Path]) -> None:
self.sentencepiece_tokenizer = SentencePieceProcessor(str(fname_tokenizer))
added_tokens: Dict[str, int]
if fname_added_tokens is not None:
added_tokens = json.load(open(fname_added_tokens, encoding="utf-8"))
else:
vocab_size: int = self.sentencepiece_tokenizer.vocab_size()
added_tokens = {}
vocab_size: int = self.sentencepiece_tokenizer.vocab_size()
expected_ids = list(range(vocab_size, vocab_size + len(added_tokens)))
actual_ids = sorted(added_tokens.values())
if expected_ids != actual_ids:
@ -267,20 +303,11 @@ class SentencePieceVocab:
def sentencepiece_tokens(self) -> Iterable[Tuple[bytes, float]]:
tokenizer = self.sentencepiece_tokenizer
if self.vocabtype == "bpe":
from transformers.models.gpt2 import tokenization_gpt2
byte_encoder = tokenization_gpt2.bytes_to_unicode()
byte_decoder = {v: k for k, v in byte_encoder.items()}
for i, item in enumerate(tokenizer):
text: bytes = item.encode("utf-8")
score: float = -i
for i in range(tokenizer.vocab_size()):
piece = tokenizer.id_to_piece(i)
text: bytes = piece.encode("utf-8")
score: float = tokenizer.get_score(i)
yield text, score
else:
for i in range(tokenizer.vocab_size()):
piece = tokenizer.id_to_piece(i)
text: bytes = piece.encode("utf-8")
score: float = tokenizer.get_score(i)
yield text, score
def added_tokens(self) -> Iterable[Tuple[bytes, float]]:
for text in self.added_tokens_list:
@ -307,7 +334,7 @@ class GGMLVocab:
return f"<GGMLVocab with {self.vocab_size} tokens>"
Vocab = Union[SentencePieceVocab, GGMLVocab]
Vocab = Union[BpeVocab, SentencePieceVocab, GGMLVocab]
def permute(weights: NDArray, n_head: int, n_kv_head: Optional[int] = None) -> NDArray:
@ -1032,7 +1059,7 @@ def bounded_parallel_map(func: Callable[[In], Out], iterable: Iterable[In], conc
def check_vocab_size(params: Params, vocab: Vocab) -> None:
if params.n_vocab != vocab.vocab_size:
# GGMLVocab comes from the same file as the model so shouldn't mismatch:
assert isinstance(vocab, SentencePieceVocab)
assert isinstance(vocab, BpeVocab) or isinstance(vocab, SentencePieceVocab)
if params.n_vocab == vocab.vocab_size_base:
print("Ignoring added_tokens.json since model matches vocab size without it.")
vocab.added_tokens_list = []
@ -1081,7 +1108,7 @@ class OutputFile:
@staticmethod
def write_vocab_only(fname_out: Path, vocab: Vocab) -> None:
of = OutputFile(fname_out)
params = Params(n_vocab=vocab.vocab_size, n_embd=0, n_mult=0, n_head=1, n_layer=0)
params = Params(n_vocab=vocab.vocab_size, n_embd=0, n_mult=0, n_head=1, n_layer=0, n_kv_head=None)
of = OutputFile(fname_out)
of.write_file_header(params, file_type=GGMLFileType.AllF32)
of.write_vocab(vocab)
@ -1216,7 +1243,7 @@ def filter_and_sort_tensors(model: LazyModel) -> LazyModel:
return {name: model[name] for name in TENSORS_LIST if name in model}
def load_vocab(path: Path, vocabtype: Optional[str]) -> SentencePieceVocab:
def load_vocab(path: Path, vocabtype: Optional[str]) -> Union[BpeVocab, SentencePieceVocab]:
print(f"vocabtype: {vocabtype}")
# Be extra-friendly and accept either a file or a directory. Also, if it's
# a directory, it might be the model directory, and tokenizer.model might
@ -1238,8 +1265,12 @@ def load_vocab(path: Path, vocabtype: Optional[str]) -> SentencePieceVocab:
"if it's in another directory, pass the directory as --vocab-dir")
added_tokens_path = path.parent / "added_tokens.json"
print(f"Loading vocab file {path}")
return SentencePieceVocab(path, added_tokens_path if added_tokens_path.exists() else None,
vocabtype)
if vocabtype == "bpe":
return BpeVocab(path, added_tokens_path if added_tokens_path.exists() else None)
elif vocabtype == "spm":
return SentencePieceVocab(path, added_tokens_path if added_tokens_path.exists() else None)
else:
raise ValueError(f"Unsupported vocabulary type {vocabtype}")
def default_outfile(model_paths: List[Path], file_type: GGMLFileType) -> Path: