From 068d0793c47a901f569e76a2e41908c19efe8c56 Mon Sep 17 00:00:00 2001 From: Galunid Date: Fri, 24 May 2024 14:10:55 +0200 Subject: [PATCH] Move vocab thing to vocab.py --- examples/convert-no-torch.py | 304 +---------------------------------- gguf-py/gguf/vocab.py | 304 ++++++++++++++++++++++++++++++++++- 2 files changed, 305 insertions(+), 303 deletions(-) diff --git a/examples/convert-no-torch.py b/examples/convert-no-torch.py index da1247957..0b7cf7120 100755 --- a/examples/convert-no-torch.py +++ b/examples/convert-no-torch.py @@ -24,10 +24,10 @@ from abc import ABC, abstractmethod from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from dataclasses import dataclass from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, ClassVar, IO, Iterable, Literal, Protocol, TypeVar, runtime_checkable, Optional +from typing import TYPE_CHECKING, Any, Callable, IO, Iterable, Literal, TypeVar, Optional import numpy as np -from sentencepiece import SentencePieceProcessor +from gguf import BaseVocab, Vocab, NoVocab, BpeVocab, SentencePieceVocab, LlamaHfVocab if 'NO_LOCAL_GGUF' not in os.environ: sys.path.insert(1, str(Path(__file__).parent / 'gguf-py')) @@ -380,306 +380,6 @@ class Metadata: return metadata -# -# vocab -# - - -@runtime_checkable -class BaseVocab(Protocol): - tokenizer_model: ClassVar[str] - name: ClassVar[str] - - -class NoVocab(BaseVocab): - tokenizer_model = "no_vocab" - name = "no_vocab" - - def __repr__(self) -> str: - return "" - - -@runtime_checkable -class Vocab(BaseVocab, Protocol): - vocab_size: int - added_tokens_dict: dict[str, int] - added_tokens_list: list[str] - fname_tokenizer: Path - - def __init__(self, base_path: Path): ... - def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: ... - - -class BpeVocab(Vocab): - tokenizer_model = "gpt2" - name = "bpe" - - def __init__(self, base_path: Path): - added_tokens: dict[str, int] = {} - - if (fname_tokenizer := base_path / 'vocab.json').exists(): - # "slow" tokenizer - with open(fname_tokenizer, encoding="utf-8") as f: - self.vocab = json.load(f) - - try: - # FIXME: Verify that added tokens here _cannot_ overlap with the main vocab. - with open(base_path / ADDED_TOKENS_FILE, encoding="utf-8") as f: - added_tokens = json.load(f) - except FileNotFoundError: - pass - else: - # "fast" tokenizer - fname_tokenizer = base_path / FAST_TOKENIZER_FILE - - # if this fails, FileNotFoundError propagates to caller - with open(fname_tokenizer, encoding="utf-8") as f: - tokenizer_json = json.load(f) - - tokenizer_model: dict[str, Any] = tokenizer_json['model'] - if ( - tokenizer_model['type'] != 'BPE' or tokenizer_model.get('byte_fallback', False) - or tokenizer_json['decoder']['type'] != 'ByteLevel' - ): - raise FileNotFoundError('Cannot find GPT-2 BPE tokenizer') - - self.vocab = tokenizer_model["vocab"] - - if (added := tokenizer_json.get('added_tokens')) is not None: - # Added tokens here can be duplicates of the main vocabulary. - added_tokens = {item['content']: item['id'] - for item in added - if item['content'] not in self.vocab} - - vocab_size = len(self.vocab) - expected_ids = list(range(vocab_size, vocab_size + len(added_tokens))) - actual_ids = sorted(added_tokens.values()) - if expected_ids != actual_ids: - expected_end_id = vocab_size + len(actual_ids) - 1 - raise ValueError(f"Expected the {len(actual_ids)} added token ID(s) to be sequential in the range " - f"{vocab_size} - {expected_end_id}; got {actual_ids}") - - items = sorted(added_tokens.items(), key=lambda text_idx: text_idx[1]) - self.added_tokens_dict = added_tokens - self.added_tokens_list = [text for (text, idx) in items] - self.vocab_size_base = vocab_size - self.vocab_size = self.vocab_size_base + len(self.added_tokens_list) - self.fname_tokenizer = fname_tokenizer - - def bpe_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: - reverse_vocab = {id: encoded_tok for encoded_tok, id in self.vocab.items()} - - for i, _ in enumerate(self.vocab): - yield reverse_vocab[i], 0.0, gguf.TokenType.NORMAL - - def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: - for text in self.added_tokens_list: - score = -1000.0 - yield text.encode("utf-8"), score, gguf.TokenType.CONTROL - - def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: - yield from self.bpe_tokens() - yield from self.added_tokens() - - def __repr__(self) -> str: - return f"" - - -class SentencePieceVocab(Vocab): - tokenizer_model = "llama" - name = "spm" - - def __init__(self, base_path: Path): - added_tokens: dict[str, int] = {} - if (fname_tokenizer := base_path / 'tokenizer.model').exists(): - # normal location - try: - with open(base_path / ADDED_TOKENS_FILE, encoding="utf-8") as f: - added_tokens = json.load(f) - except FileNotFoundError: - pass - elif not (fname_tokenizer := base_path.parent / 'tokenizer.model').exists(): - # not found in alternate location either - raise FileNotFoundError('Cannot find tokenizer.model') - - self.sentencepiece_tokenizer = SentencePieceProcessor() - self.sentencepiece_tokenizer.LoadFromFile(str(fname_tokenizer)) - vocab_size = self.sentencepiece_tokenizer.vocab_size() - - new_tokens = {id: piece for piece, id in added_tokens.items() if id >= vocab_size} - expected_new_ids = list(range(vocab_size, vocab_size + len(new_tokens))) - actual_new_ids = sorted(new_tokens.keys()) - - if expected_new_ids != actual_new_ids: - raise ValueError(f"Expected new token IDs {expected_new_ids} to be sequential; got {actual_new_ids}") - - # Token pieces that were added to the base vocabulary. - self.added_tokens_dict = added_tokens - self.added_tokens_list = [new_tokens[id] for id in actual_new_ids] - self.vocab_size_base = vocab_size - self.vocab_size = self.vocab_size_base + len(self.added_tokens_list) - self.fname_tokenizer = fname_tokenizer - - def sentencepiece_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: - tokenizer = self.sentencepiece_tokenizer - for i in range(tokenizer.vocab_size()): - piece = tokenizer.IdToPiece(i) - text = piece.encode("utf-8") - score: float = tokenizer.GetScore(i) - - toktype = gguf.TokenType.NORMAL - if tokenizer.IsUnknown(i): - toktype = gguf.TokenType.UNKNOWN - if tokenizer.IsControl(i): - toktype = gguf.TokenType.CONTROL - - # NOTE: I think added_tokens are user defined. - # ref: https://github.com/google/sentencepiece/blob/master/src/sentencepiece_model.proto - # if tokenizer.is_user_defined(i): toktype = gguf.TokenType.USER_DEFINED - - if tokenizer.IsUnused(i): - toktype = gguf.TokenType.UNUSED - if tokenizer.IsByte(i): - toktype = gguf.TokenType.BYTE - - yield text, score, toktype - - def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: - for text in self.added_tokens_list: - score = -1000.0 - yield text.encode("utf-8"), score, gguf.TokenType.USER_DEFINED - - def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: - yield from self.sentencepiece_tokens() - yield from self.added_tokens() - - def __repr__(self) -> str: - return f"" - - -class LlamaHfVocab(Vocab): - tokenizer_model = "llama" - name = "hfft" - - def __init__(self, base_path: Path): - fname_tokenizer = base_path / FAST_TOKENIZER_FILE - # if this fails, FileNotFoundError propagates to caller - with open(fname_tokenizer, encoding='utf-8') as f: - tokenizer_json = json.load(f) - - # pre-check so we know if we need transformers - tokenizer_model: dict[str, Any] = tokenizer_json['model'] - is_llama3 = ( - tokenizer_model['type'] == 'BPE' and tokenizer_model.get('ignore_merges', False) - and not tokenizer_model.get('byte_fallback', True) - ) - if is_llama3: - raise TypeError('Llama 3 must be converted with BpeVocab') - - if not is_llama3 and ( - tokenizer_model['type'] != 'BPE' or not tokenizer_model.get('byte_fallback', False) - or tokenizer_json['decoder']['type'] != 'Sequence' - ): - raise FileNotFoundError('Cannot find Llama BPE tokenizer') - - try: - from transformers import AutoTokenizer - except ImportError as e: - raise ImportError( - "To use LlamaHfVocab, please install the `transformers` package. " - "You can install it with `pip install transformers`." - ) from e - - # Allow the tokenizer to default to slow or fast versions. - # Explicitly set tokenizer to use local paths. - self.tokenizer = AutoTokenizer.from_pretrained( - base_path, - cache_dir=base_path, - local_files_only=True, - ) - assert self.tokenizer.is_fast # assume tokenizer.json is used - - # Initialize lists and dictionaries for added tokens - self.added_tokens_list = [] - self.added_tokens_dict = dict() - self.added_tokens_ids = set() - - # Process added tokens - for tok, tokidx in sorted( - self.tokenizer.get_added_vocab().items(), key=lambda x: x[1] - ): - # Only consider added tokens that are not in the base vocabulary - if tokidx >= self.tokenizer.vocab_size: - self.added_tokens_list.append(tok) - self.added_tokens_dict[tok] = tokidx - self.added_tokens_ids.add(tokidx) - - # Store special tokens and their IDs - self.specials = { - tok: self.tokenizer.get_vocab()[tok] - for tok in self.tokenizer.all_special_tokens - } - self.special_ids = set(self.tokenizer.all_special_ids) - - # Set vocabulary sizes - self.vocab_size_base = self.tokenizer.vocab_size - self.vocab_size = self.vocab_size_base + len(self.added_tokens_list) - - self.fname_tokenizer = fname_tokenizer - - def hf_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: - reverse_vocab = { - id: encoded_tok for encoded_tok, id in self.tokenizer.get_vocab().items() - } - - for token_id in range(self.vocab_size_base): - # Skip processing added tokens here - if token_id in self.added_tokens_ids: - continue - - # Convert token text to bytes - token_text = reverse_vocab[token_id].encode("utf-8") - - # Yield token text, score, and type - yield token_text, self.get_token_score(token_id), self.get_token_type( - token_id, token_text, self.special_ids # Reuse already stored special IDs - ) - - def get_token_type(self, token_id: int, token_text: bytes, special_ids: set[int]) -> gguf.TokenType: - # Special case for byte tokens - if re.fullmatch(br"<0x[0-9A-Fa-f]{2}>", token_text): - return gguf.TokenType.BYTE - - # Determine token type based on whether it's a special token - return gguf.TokenType.CONTROL if token_id in special_ids else gguf.TokenType.NORMAL - - def get_token_score(self, token_id: int) -> float: - # Placeholder for actual logic to determine the token's score - # This needs to be implemented based on specific requirements - return -1000.0 # Default score - - def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: - for text in self.added_tokens_list: - if text in self.specials: - toktype = self.get_token_type(self.specials[text], b'', self.special_ids) - score = self.get_token_score(self.specials[text]) - else: - toktype = gguf.TokenType.USER_DEFINED - score = -1000.0 - - yield text.encode("utf-8"), score, toktype - - def has_newline_token(self): - return "<0x0A>" in self.tokenizer.vocab or "\n" in self.tokenizer.vocab - - def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: - yield from self.hf_tokens() - yield from self.added_tokens() - - def __repr__(self) -> str: - return f"" - - # # data loading # TODO: reuse (probably move to gguf.py?) diff --git a/gguf-py/gguf/vocab.py b/gguf-py/gguf/vocab.py index 3ba99be4f..9854b6607 100644 --- a/gguf-py/gguf/vocab.py +++ b/gguf-py/gguf/vocab.py @@ -1,15 +1,22 @@ from __future__ import annotations +import re import logging import json import os from pathlib import Path -from typing import Any, Callable, Sequence, Mapping, Iterable +from typing import Any, Callable, Sequence, Mapping, Iterable, Protocol, ClassVar, runtime_checkable + +from sentencepiece import SentencePieceProcessor + +import gguf from .gguf_writer import GGUFWriter logger = logging.getLogger(__name__) +ADDED_TOKENS_FILE = 'added_tokens.json' +FAST_TOKENIZER_FILE = 'tokenizer.json' class SpecialVocab: merges: list[str] @@ -163,3 +170,298 @@ class SpecialVocab: for typ in self.special_token_types: self._set_special_token(typ, config.get(f'{typ}_token_id')) return True + + +@runtime_checkable +class BaseVocab(Protocol): + tokenizer_model: ClassVar[str] + name: ClassVar[str] + + +@runtime_checkable +class Vocab(BaseVocab, Protocol): + vocab_size: int + added_tokens_dict: dict[str, int] + added_tokens_list: list[str] + fname_tokenizer: Path + + def __init__(self, base_path: Path): ... + def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: ... + + +class NoVocab(BaseVocab): + tokenizer_model = "no_vocab" + name = "no_vocab" + + def __repr__(self) -> str: + return "" + + +class BpeVocab(Vocab): + tokenizer_model = "gpt2" + name = "bpe" + + def __init__(self, base_path: Path): + added_tokens: dict[str, int] = {} + + if (fname_tokenizer := base_path / 'vocab.json').exists(): + # "slow" tokenizer + with open(fname_tokenizer, encoding="utf-8") as f: + self.vocab = json.load(f) + + try: + # FIXME: Verify that added tokens here _cannot_ overlap with the main vocab. + with open(base_path / ADDED_TOKENS_FILE, encoding="utf-8") as f: + added_tokens = json.load(f) + except FileNotFoundError: + pass + else: + # "fast" tokenizer + fname_tokenizer = base_path / FAST_TOKENIZER_FILE + + # if this fails, FileNotFoundError propagates to caller + with open(fname_tokenizer, encoding="utf-8") as f: + tokenizer_json = json.load(f) + + tokenizer_model: dict[str, Any] = tokenizer_json['model'] + if ( + tokenizer_model['type'] != 'BPE' or tokenizer_model.get('byte_fallback', False) + or tokenizer_json['decoder']['type'] != 'ByteLevel' + ): + raise FileNotFoundError('Cannot find GPT-2 BPE tokenizer') + + self.vocab = tokenizer_model["vocab"] + + if (added := tokenizer_json.get('added_tokens')) is not None: + # Added tokens here can be duplicates of the main vocabulary. + added_tokens = {item['content']: item['id'] + for item in added + if item['content'] not in self.vocab} + + vocab_size = len(self.vocab) + expected_ids = list(range(vocab_size, vocab_size + len(added_tokens))) + actual_ids = sorted(added_tokens.values()) + if expected_ids != actual_ids: + expected_end_id = vocab_size + len(actual_ids) - 1 + raise ValueError(f"Expected the {len(actual_ids)} added token ID(s) to be sequential in the range " + f"{vocab_size} - {expected_end_id}; got {actual_ids}") + + items = sorted(added_tokens.items(), key=lambda text_idx: text_idx[1]) + self.added_tokens_dict = added_tokens + self.added_tokens_list = [text for (text, idx) in items] + self.vocab_size_base = vocab_size + self.vocab_size = self.vocab_size_base + len(self.added_tokens_list) + self.fname_tokenizer = fname_tokenizer + + def bpe_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: + reverse_vocab = {id: encoded_tok for encoded_tok, id in self.vocab.items()} + + for i, _ in enumerate(self.vocab): + yield reverse_vocab[i], 0.0, gguf.TokenType.NORMAL + + def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: + for text in self.added_tokens_list: + score = -1000.0 + yield text.encode("utf-8"), score, gguf.TokenType.CONTROL + + def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: + yield from self.bpe_tokens() + yield from self.added_tokens() + + def __repr__(self) -> str: + return f"" + + +class SentencePieceVocab(Vocab): + tokenizer_model = "llama" + name = "spm" + + def __init__(self, base_path: Path): + added_tokens: dict[str, int] = {} + if (fname_tokenizer := base_path / 'tokenizer.model').exists(): + # normal location + try: + with open(base_path / ADDED_TOKENS_FILE, encoding="utf-8") as f: + added_tokens = json.load(f) + except FileNotFoundError: + pass + elif not (fname_tokenizer := base_path.parent / 'tokenizer.model').exists(): + # not found in alternate location either + raise FileNotFoundError('Cannot find tokenizer.model') + + self.sentencepiece_tokenizer = SentencePieceProcessor() + self.sentencepiece_tokenizer.LoadFromFile(str(fname_tokenizer)) + vocab_size = self.sentencepiece_tokenizer.vocab_size() + + new_tokens = {id: piece for piece, id in added_tokens.items() if id >= vocab_size} + expected_new_ids = list(range(vocab_size, vocab_size + len(new_tokens))) + actual_new_ids = sorted(new_tokens.keys()) + + if expected_new_ids != actual_new_ids: + raise ValueError(f"Expected new token IDs {expected_new_ids} to be sequential; got {actual_new_ids}") + + # Token pieces that were added to the base vocabulary. + self.added_tokens_dict = added_tokens + self.added_tokens_list = [new_tokens[id] for id in actual_new_ids] + self.vocab_size_base = vocab_size + self.vocab_size = self.vocab_size_base + len(self.added_tokens_list) + self.fname_tokenizer = fname_tokenizer + + def sentencepiece_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: + tokenizer = self.sentencepiece_tokenizer + for i in range(tokenizer.vocab_size()): + piece = tokenizer.IdToPiece(i) + text = piece.encode("utf-8") + score: float = tokenizer.GetScore(i) + + toktype = gguf.TokenType.NORMAL + if tokenizer.IsUnknown(i): + toktype = gguf.TokenType.UNKNOWN + if tokenizer.IsControl(i): + toktype = gguf.TokenType.CONTROL + + # NOTE: I think added_tokens are user defined. + # ref: https://github.com/google/sentencepiece/blob/master/src/sentencepiece_model.proto + # if tokenizer.is_user_defined(i): toktype = gguf.TokenType.USER_DEFINED + + if tokenizer.IsUnused(i): + toktype = gguf.TokenType.UNUSED + if tokenizer.IsByte(i): + toktype = gguf.TokenType.BYTE + + yield text, score, toktype + + def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: + for text in self.added_tokens_list: + score = -1000.0 + yield text.encode("utf-8"), score, gguf.TokenType.USER_DEFINED + + def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: + yield from self.sentencepiece_tokens() + yield from self.added_tokens() + + def __repr__(self) -> str: + return f"" + + +class LlamaHfVocab(Vocab): + tokenizer_model = "llama" + name = "hfft" + + def __init__(self, base_path: Path): + fname_tokenizer = base_path / FAST_TOKENIZER_FILE + # if this fails, FileNotFoundError propagates to caller + with open(fname_tokenizer, encoding='utf-8') as f: + tokenizer_json = json.load(f) + + # pre-check so we know if we need transformers + tokenizer_model: dict[str, Any] = tokenizer_json['model'] + is_llama3 = ( + tokenizer_model['type'] == 'BPE' and tokenizer_model.get('ignore_merges', False) + and not tokenizer_model.get('byte_fallback', True) + ) + if is_llama3: + raise TypeError('Llama 3 must be converted with BpeVocab') + + if not is_llama3 and ( + tokenizer_model['type'] != 'BPE' or not tokenizer_model.get('byte_fallback', False) + or tokenizer_json['decoder']['type'] != 'Sequence' + ): + raise FileNotFoundError('Cannot find Llama BPE tokenizer') + + try: + from transformers import AutoTokenizer + except ImportError as e: + raise ImportError( + "To use LlamaHfVocab, please install the `transformers` package. " + "You can install it with `pip install transformers`." + ) from e + + # Allow the tokenizer to default to slow or fast versions. + # Explicitly set tokenizer to use local paths. + self.tokenizer = AutoTokenizer.from_pretrained( + base_path, + cache_dir=base_path, + local_files_only=True, + ) + assert self.tokenizer.is_fast # assume tokenizer.json is used + + # Initialize lists and dictionaries for added tokens + self.added_tokens_list = [] + self.added_tokens_dict = dict() + self.added_tokens_ids = set() + + # Process added tokens + for tok, tokidx in sorted( + self.tokenizer.get_added_vocab().items(), key=lambda x: x[1] + ): + # Only consider added tokens that are not in the base vocabulary + if tokidx >= self.tokenizer.vocab_size: + self.added_tokens_list.append(tok) + self.added_tokens_dict[tok] = tokidx + self.added_tokens_ids.add(tokidx) + + # Store special tokens and their IDs + self.specials = { + tok: self.tokenizer.get_vocab()[tok] + for tok in self.tokenizer.all_special_tokens + } + self.special_ids = set(self.tokenizer.all_special_ids) + + # Set vocabulary sizes + self.vocab_size_base = self.tokenizer.vocab_size + self.vocab_size = self.vocab_size_base + len(self.added_tokens_list) + + self.fname_tokenizer = fname_tokenizer + + def hf_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: + reverse_vocab = { + id: encoded_tok for encoded_tok, id in self.tokenizer.get_vocab().items() + } + + for token_id in range(self.vocab_size_base): + # Skip processing added tokens here + if token_id in self.added_tokens_ids: + continue + + # Convert token text to bytes + token_text = reverse_vocab[token_id].encode("utf-8") + + # Yield token text, score, and type + yield token_text, self.get_token_score(token_id), self.get_token_type( + token_id, token_text, self.special_ids # Reuse already stored special IDs + ) + + def get_token_type(self, token_id: int, token_text: bytes, special_ids: set[int]) -> gguf.TokenType: + # Special case for byte tokens + if re.fullmatch(br"<0x[0-9A-Fa-f]{2}>", token_text): + return gguf.TokenType.BYTE + + # Determine token type based on whether it's a special token + return gguf.TokenType.CONTROL if token_id in special_ids else gguf.TokenType.NORMAL + + def get_token_score(self, token_id: int) -> float: + # Placeholder for actual logic to determine the token's score + # This needs to be implemented based on specific requirements + return -1000.0 # Default score + + def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: + for text in self.added_tokens_list: + if text in self.specials: + toktype = self.get_token_type(self.specials[text], b'', self.special_ids) + score = self.get_token_score(self.specials[text]) + else: + toktype = gguf.TokenType.USER_DEFINED + score = -1000.0 + + yield text.encode("utf-8"), score, toktype + + def has_newline_token(self): + return "<0x0A>" in self.tokenizer.vocab or "\n" in self.tokenizer.vocab + + def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: + yield from self.hf_tokens() + yield from self.added_tokens() + + def __repr__(self) -> str: + return f""