diff --git a/data/model/_basequery.py b/data/model/_basequery.py index a04df60c3..0cb0f3fee 100644 --- a/data/model/_basequery.py +++ b/data/model/_basequery.py @@ -1,8 +1,9 @@ -from peewee import JOIN_LEFT_OUTER, Clause, SQL +from peewee import Clause, SQL, fn from cachetools import lru_cache +from data.model import DataModelException from data.database import (Repository, User, Team, TeamMember, RepositoryPermission, TeamRole, - Namespace, Visibility, db_for_update) + Namespace, Visibility, ImageStorage, Image, db_for_update) def prefix_search(field, prefix_query): @@ -97,3 +98,30 @@ def get_user_organizations(username): .join(TeamMember) .join(UserAlias, on=(UserAlias.id == TeamMember.user)) .where(User.organization == True, UserAlias.username == username)) + + +def calculate_image_aggregate_size(ancestors_str, image_size, parent_image): + ancestors = ancestors_str.split('/')[1:-1] + if not ancestors: + return image_size + + if parent_image is None: + raise DataModelException('Could not load parent image') + + ancestor_size = parent_image.aggregate_size + if ancestor_size is not None: + return ancestor_size + image_size + + # Fallback to a slower path if the parent doesn't have an aggregate size saved. + # TODO: remove this code if/when we do a full backfill. + ancestor_size = (ImageStorage + .select(fn.Sum(ImageStorage.image_size)) + .join(Image) + .where(Image.id << ancestors) + .scalar()) + if ancestor_size is None: + return None + + return ancestor_size + image_size + + diff --git a/data/model/image.py b/data/model/image.py index e8227dfaa..bbe9201ad 100644 --- a/data/model/image.py +++ b/data/model/image.py @@ -2,7 +2,7 @@ import logging import dateutil.parser import random -from peewee import JOIN_LEFT_OUTER, fn, SQL +from peewee import JOIN_LEFT_OUTER, SQL from datetime import datetime from data.model import (DataModelException, db_transaction, _basequery, storage, @@ -296,6 +296,8 @@ def find_create_or_link_image(docker_image_id, repo_obj, username, translations, def set_image_metadata(docker_image_id, namespace_name, repository_name, created_date_str, comment, command, v1_json_metadata, parent=None): + """ Sets metadata that is specific to how a binary piece of storage fits into the layer tree. + """ with db_transaction(): query = (Image .select(Image, ImageStorage) @@ -322,6 +324,7 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created # We cleanup any old checksum in case it's a retry after a fail fetched.v1_checksum = None fetched.storage.content_checksum = None + fetched.storage.save() fetched.comment = comment fetched.command = command @@ -335,59 +338,6 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created return fetched -def set_image_size(docker_image_id, namespace_name, repository_name, image_size, uncompressed_size): - if image_size is None: - raise DataModelException('Empty image size field') - - try: - image = (Image - .select(Image, ImageStorage) - .join(Repository) - .join(Namespace, on=(Repository.namespace_user == Namespace.id)) - .switch(Image) - .join(ImageStorage, JOIN_LEFT_OUTER) - .where(Repository.name == repository_name, Namespace.username == namespace_name, - Image.docker_image_id == docker_image_id) - .get()) - except Image.DoesNotExist: - raise DataModelException('No image with specified id and repository') - - image.storage.image_size = image_size - image.storage.uncompressed_size = uncompressed_size - image.storage.save() - - image.aggregate_size = calculate_image_aggregate_size(image.ancestors, image.storage, - image.parent) - image.save() - - return image - - -def calculate_image_aggregate_size(ancestors_str, image_storage, parent_image): - ancestors = ancestors_str.split('/')[1:-1] - if not ancestors: - return image_storage.image_size - - if parent_image is None: - raise DataModelException('Could not load parent image') - - ancestor_size = parent_image.aggregate_size - if ancestor_size is not None: - return ancestor_size + image_storage.image_size - - # Fallback to a slower path if the parent doesn't have an aggregate size saved. - # TODO: remove this code if/when we do a full backfill. - ancestor_size = (ImageStorage - .select(fn.Sum(ImageStorage.image_size)) - .join(Image) - .where(Image.id << ancestors) - .scalar()) - if ancestor_size is None: - return None - - return ancestor_size + image_storage.image_size - - def get_image(repo, docker_image_id): try: return Image.get(Image.docker_image_id == docker_image_id, Image.repository == repo) @@ -452,7 +402,8 @@ def synthesize_v1_image(repo, image_storage, docker_image_id, created_date_str, pass # Get the aggregate size for the image. - aggregate_size = calculate_image_aggregate_size(ancestors, image_storage, parent_image) + aggregate_size = _basequery.calculate_image_aggregate_size(ancestors, image_storage.image_size, + parent_image) return Image.create(docker_image_id=docker_image_id, ancestors=ancestors, comment=comment, command=command, v1_json_metadata=v1_json_metadata, created=created, diff --git a/data/model/storage.py b/data/model/storage.py index d0122d9a8..b66b20c49 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -2,10 +2,12 @@ import logging from peewee import JOIN_LEFT_OUTER, fn, SQL -from data.model import config, db_transaction, InvalidImageException, TorrentInfoDoesNotExist -from data.database import (ImageStorage, Image, DerivedStorageForImage, ImageStoragePlacement, - ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature, - ImageStorageSignatureKind, Repository, Namespace, TorrentInfo) +from data.model import (config, db_transaction, InvalidImageException, TorrentInfoDoesNotExist, + DataModelException, _basequery) +from data.database import (ImageStorage, Image, ImageStoragePlacement, ImageStorageLocation, + ImageStorageTransformation, ImageStorageSignature, + ImageStorageSignatureKind, Repository, Namespace, TorrentInfo, + db_for_update) logger = logging.getLogger(__name__) @@ -203,6 +205,40 @@ def _reduce_as_tree(queries_to_reduce): return to_reduce_left.union_all(to_reduce_right) +def set_image_storage_metadata(docker_image_id, namespace_name, repository_name, image_size, + uncompressed_size): + """ Sets metadata that is specific to the binary storage of the data, irrespective of how it + is used in the layer tree. + """ + if image_size is None: + raise DataModelException('Empty image size field') + + query = (Image + .select(Image, ImageStorage) + .join(Repository) + .join(Namespace, on=(Repository.namespace_user == Namespace.id)) + .switch(Image) + .join(ImageStorage, JOIN_LEFT_OUTER) + .where(Repository.name == repository_name, Namespace.username == namespace_name, + Image.docker_image_id == docker_image_id)) + + try: + image = db_for_update(query).get() + except ImageStorage.DoesNotExist: + raise InvalidImageException('No image with specified id and repository') + + # We MUST do this here, it can't be done in the corresponding image call because the storage + # has not yet been pushed + image.aggregate_size = _basequery.calculate_image_aggregate_size(image.ancestors, image_size, + image.parent) + image.save() + + image.storage.image_size = image_size + image.storage.uncompressed_size = uncompressed_size + image.storage.save() + return image.storage + + def get_storage_locations(uuid): query = (ImageStoragePlacement .select() diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py index e7557468c..d4d33c630 100644 --- a/endpoints/v1/registry.py +++ b/endpoints/v1/registry.py @@ -18,6 +18,7 @@ from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission) from data import model, database from util.registry import gzipstream +from util.registry.torrent import PieceHasher from endpoints.v1 import v1_bp from endpoints.decorators import anon_protect @@ -214,6 +215,10 @@ def put_image_layer(namespace, repository, image_id): size_info, size_hndlr = gzipstream.calculate_size_handler() sr.add_handler(size_hndlr) + # Add a handler to hash the chunks of the upload for torrenting + piece_hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE']) + sr.add_handler(piece_hasher.update) + # Add a handler which computes the checksum. h, sum_hndlr = checksums.simple_checksum_handler(json_data) sr.add_handler(sum_hndlr) @@ -231,8 +236,11 @@ def put_image_layer(namespace, repository, image_id): abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id) # Save the size of the image. - model.image.set_image_size(image_id, namespace, repository, size_info.compressed_size, - size_info.uncompressed_size) + updated_storage = model.storage.set_image_storage_metadata(image_id, namespace, repository, + size_info.compressed_size, + size_info.uncompressed_size) + pieces_bytes = piece_hasher.piece_hashes + piece_hasher.hash_fragment.digest() + model.storage.save_torrent_info(updated_storage, app.config['TORRENT_PIECE_SIZE'], pieces_bytes) # Append the computed checksum. csums = [] diff --git a/initdb.py b/initdb.py index b37ec9dcd..73f5902dc 100644 --- a/initdb.py +++ b/initdb.py @@ -81,10 +81,8 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map): checksum = __gen_checksum(docker_image_id) new_image = model.image.find_create_or_link_image(docker_image_id, repo, None, {}, 'local_us') - new_image_locations = new_image.storage.locations new_image.storage.uuid = __gen_image_uuid(repo, image_num) new_image.storage.uploading = False - new_image.storage.content_checksum = checksum new_image.storage.save() # Write some data for the storage. @@ -113,10 +111,12 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map): new_image = model.image.set_image_metadata(docker_image_id, repo.namespace_user.username, repo.name, str(creation_time), 'no comment', command, json.dumps(v1_metadata), parent) + new_image.storage.content_checksum = checksum + new_image.storage.save() compressed_size = random.randrange(1, 1024 * 1024 * 1024) - model.image.set_image_size(docker_image_id, repo.namespace_user.username, repo.name, - compressed_size, int(compressed_size * 1.4)) + model.storage.set_image_storage_metadata(docker_image_id, repo.namespace_user.username, + repo.name, compressed_size, int(compressed_size * 1.4)) parent = new_image diff --git a/util/registry/torrent.py b/util/registry/torrent.py index ea92410dd..0cc9cb349 100644 --- a/util/registry/torrent.py +++ b/util/registry/torrent.py @@ -35,7 +35,8 @@ def make_torrent(name, webseed, length, piece_length, pieces): class PieceHasher(object): - def __init__(self, piece_size, starting_offset, starting_piece_hash_bytes, hash_fragment_to_resume): + def __init__(self, piece_size, starting_offset=0, starting_piece_hash_bytes='', + hash_fragment_to_resume=None): if not isinstance(starting_offset, (int, long)): raise TypeError('starting_offset must be an integer') elif not isinstance(piece_size, (int, long)): @@ -43,9 +44,14 @@ class PieceHasher(object): self._current_offset = starting_offset self._piece_size = piece_size - self._hash_fragment = hash_fragment_to_resume self._piece_hashes = bytearray(starting_piece_hash_bytes) + if hash_fragment_to_resume is None: + self._hash_fragment = resumablehashlib.sha1() + else: + self._hash_fragment = hash_fragment_to_resume + + def update(self, buf): buf_offset = 0 while buf_offset < len(buf):