Add HFVocab into convert.py
This commit is contained in:
parent
11bff29045
commit
f7e377d62f
1 changed files with 53 additions and 4 deletions
57
convert.py
57
convert.py
|
@ -414,7 +414,54 @@ class SentencePieceVocab:
|
|||
def __repr__(self) -> str:
|
||||
return f"<SentencePieceVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"
|
||||
|
||||
Vocab: TypeAlias = 'BpeVocab | SentencePieceVocab'
|
||||
class HFVocab:
|
||||
def __init__(self, fname_tokenizer: Path, fname_added_tokens: Path | None) -> None:
|
||||
self.tokenizer = AutoTokenizer.from_pretrained(str(fname_tokenizer))
|
||||
|
||||
added_tokens: dict[str, int]
|
||||
if fname_added_tokens is not None:
|
||||
added_tokens = json.load(open(fname_added_tokens, encoding="utf-8"))
|
||||
else:
|
||||
added_tokens = {}
|
||||
|
||||
vocab_size: int = self.tokenizer.vocab_size
|
||||
|
||||
expected_ids = list(range(vocab_size, vocab_size + len(added_tokens)))
|
||||
actual_ids = sorted(added_tokens.values())
|
||||
if expected_ids != actual_ids:
|
||||
raise Exception(f"Expected added token IDs to be sequential and start at {len(added_tokens)}; got {actual_ids}")
|
||||
|
||||
items = sorted(added_tokens.items(), key=lambda text_idx: text_idx[1])
|
||||
self.added_tokens_list = [text for (text, idx) in items]
|
||||
self.vocab_size_base: int = vocab_size
|
||||
self.vocab_size: int = self.vocab_size_base + len(self.added_tokens_list)
|
||||
self.fname_tokenizer = fname_tokenizer
|
||||
self.fname_added_tokens = fname_added_tokens
|
||||
|
||||
def hf_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
|
||||
tokenizer = self.tokenizer
|
||||
reverse_vocab = {id: encoded_tok for encoded_tok, id in tokenizer.vocab.items()}
|
||||
byte_encoder = bytes_to_unicode()
|
||||
byte_decoder = {v: k for k, v in byte_encoder.items()}
|
||||
print(len(byte_decoder), byte_decoder)
|
||||
|
||||
for i in range(tokenizer.vocab_size):
|
||||
text = reverse_vocab[i].encode("utf-8")
|
||||
yield text, 0.0, gguf.TokenType.NORMAL
|
||||
|
||||
def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
|
||||
for text in self.added_tokens_list:
|
||||
score = -1000.0
|
||||
yield text.encode("utf-8"), score, gguf.TokenType.USER_DEFINED
|
||||
|
||||
def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
|
||||
yield from self.hf_tokens()
|
||||
yield from self.added_tokens()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<HFVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"
|
||||
|
||||
Vocab: TypeAlias = 'BpeVocab | SentencePieceVocab | HFVocab'
|
||||
|
||||
#
|
||||
# data loading
|
||||
|
@ -1084,6 +1131,8 @@ def load_vocab(path: Path, vocabtype: str | None) -> Vocab:
|
|||
return BpeVocab(path, added_tokens_path if added_tokens_path.exists() else None)
|
||||
elif vocabtype == "spm":
|
||||
return SentencePieceVocab(path, added_tokens_path if added_tokens_path.exists() else None)
|
||||
elif vocabtype == "hf":
|
||||
return HFVocab(path, added_tokens_path if added_tokens_path.exists() else None)
|
||||
else:
|
||||
raise ValueError(f"Unsupported vocabulary type {vocabtype}")
|
||||
|
||||
|
@ -1120,7 +1169,7 @@ def main(args_in: list[str] | None = None) -> None:
|
|||
parser.add_argument("--vocab-dir", type=Path, help="directory containing tokenizer.model, if separate from model file")
|
||||
parser.add_argument("--outfile", type=Path, help="path to write to; default: based on input")
|
||||
parser.add_argument("model", type=Path, help="directory containing model file, or model file itself (*.pth, *.pt, *.bin)")
|
||||
parser.add_argument("--vocabtype", choices=["spm", "bpe"], help="vocab format (default: spm)", default="spm")
|
||||
parser.add_argument("--vocabtype", choices=["spm", "bpe", "hf"], help="vocab format (default: spm)", default="spm")
|
||||
parser.add_argument("--ctx", type=int, help="model training context (default: based on input)")
|
||||
parser.add_argument("--concurrency", type=int, help=f"concurrency used for conversion (default: {DEFAULT_CONCURRENCY})", default = DEFAULT_CONCURRENCY)
|
||||
args = parser.parse_args(args_in)
|
||||
|
@ -1162,7 +1211,7 @@ def main(args_in: list[str] | None = None) -> None:
|
|||
assert args.outfile, "need --outfile if using --vocab-only"
|
||||
# FIXME: Try to respect vocab_dir somehow?
|
||||
vocab = load_vocab(args.vocab_dir or args.model, args.vocabtype)
|
||||
special_vocab = gguf.SpecialVocab(model_plus.paths[0].parent, load_merges = args.vocabtype == 'bpe')
|
||||
special_vocab = gguf.SpecialVocab(model_plus.paths[0].parent, load_merges = args.vocabtype in ('bpe', 'hf'))
|
||||
outfile = args.outfile
|
||||
OutputFile.write_vocab_only(outfile, params, vocab, special_vocab)
|
||||
print(f"Wrote {outfile}")
|
||||
|
@ -1174,7 +1223,7 @@ def main(args_in: list[str] | None = None) -> None:
|
|||
vocab_dir = args.vocab_dir if args.vocab_dir else model_plus.paths[0].parent
|
||||
vocab = load_vocab(vocab_dir, args.vocabtype)
|
||||
# FIXME: Try to respect vocab_dir somehow?
|
||||
special_vocab = gguf.SpecialVocab(model_plus.paths[0].parent, load_merges = args.vocabtype == 'bpe')
|
||||
special_vocab = gguf.SpecialVocab(model_plus.paths[0].parent, load_merges = args.vocabtype in ('bpe', 'hf'))
|
||||
|
||||
model = model_plus.model
|
||||
model = convert_model_names(model, params)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue