Hash v1 uploads for torrent chunks

Jake Moshenko 2016-01-05 12:14:52 -05:00
parent 44fcc7e44b
commit 8f80d7064b
6 changed files with 98 additions and 69 deletions

View file

@@ -1,8 +1,9 @@
-from peewee import JOIN_LEFT_OUTER, Clause, SQL
+from peewee import Clause, SQL, fn
 from cachetools import lru_cache
+from data.model import DataModelException
 from data.database import (Repository, User, Team, TeamMember, RepositoryPermission, TeamRole,
-                           Namespace, Visibility, db_for_update)
+                           Namespace, Visibility, ImageStorage, Image, db_for_update)
 
 
 def prefix_search(field, prefix_query):
@@ -97,3 +98,30 @@ def get_user_organizations(username):
           .join(TeamMember)
           .join(UserAlias, on=(UserAlias.id == TeamMember.user))
           .where(User.organization == True, UserAlias.username == username))
+
+
+def calculate_image_aggregate_size(ancestors_str, image_size, parent_image):
+  ancestors = ancestors_str.split('/')[1:-1]
+  if not ancestors:
+    return image_size
+
+  if parent_image is None:
+    raise DataModelException('Could not load parent image')
+
+  ancestor_size = parent_image.aggregate_size
+  if ancestor_size is not None:
+    return ancestor_size + image_size
+
+  # Fallback to a slower path if the parent doesn't have an aggregate size saved.
+  # TODO: remove this code if/when we do a full backfill.
+  ancestor_size = (ImageStorage
+                   .select(fn.Sum(ImageStorage.image_size))
+                   .join(Image)
+                   .where(Image.id << ancestors)
+                   .scalar())
+  if ancestor_size is None:
+    return None
+
+  return ancestor_size + image_size
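
As a note on the helper added above: the ancestry string is a '/'-delimited list of ancestor image ids, and on the fast path the aggregate size is just the parent's stored aggregate plus this image's own size. A minimal standalone sketch of that logic (hypothetical helper name and values, not part of this commit):

def aggregate_size_sketch(ancestors_str, image_size, parent_aggregate_size):
  # '/1/7/42/' -> ['1', '7', '42']; a root image has no ancestors
  ancestors = ancestors_str.split('/')[1:-1]
  if not ancestors:
    return image_size
  if parent_aggregate_size is not None:
    # Fast path: the parent already carries the aggregated ancestor size
    return parent_aggregate_size + image_size
  # The real helper falls back to summing ImageStorage.image_size over the ancestors
  return None

assert aggregate_size_sketch('/', 100, None) == 100
assert aggregate_size_sketch('/1/7/42/', 100, 350) == 450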

View file

@@ -2,7 +2,7 @@ import logging
 import dateutil.parser
 import random
 
-from peewee import JOIN_LEFT_OUTER, fn, SQL
+from peewee import JOIN_LEFT_OUTER, SQL
 from datetime import datetime
 
 from data.model import (DataModelException, db_transaction, _basequery, storage,
@@ -296,6 +296,8 @@ def find_create_or_link_image(docker_image_id, repo_obj, username, translations,
 
 def set_image_metadata(docker_image_id, namespace_name, repository_name, created_date_str, comment,
                        command, v1_json_metadata, parent=None):
+  """ Sets metadata that is specific to how a binary piece of storage fits into the layer tree.
+  """
   with db_transaction():
     query = (Image
              .select(Image, ImageStorage)
@@ -322,6 +324,7 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
     # We cleanup any old checksum in case it's a retry after a fail
     fetched.v1_checksum = None
     fetched.storage.content_checksum = None
+    fetched.storage.save()
 
     fetched.comment = comment
     fetched.command = command
@@ -335,59 +338,6 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
     return fetched
 
 
-def set_image_size(docker_image_id, namespace_name, repository_name, image_size, uncompressed_size):
-  if image_size is None:
-    raise DataModelException('Empty image size field')
-
-  try:
-    image = (Image
-             .select(Image, ImageStorage)
-             .join(Repository)
-             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
-             .switch(Image)
-             .join(ImageStorage, JOIN_LEFT_OUTER)
-             .where(Repository.name == repository_name, Namespace.username == namespace_name,
-                    Image.docker_image_id == docker_image_id)
-             .get())
-  except Image.DoesNotExist:
-    raise DataModelException('No image with specified id and repository')
-
-  image.storage.image_size = image_size
-  image.storage.uncompressed_size = uncompressed_size
-  image.storage.save()
-
-  image.aggregate_size = calculate_image_aggregate_size(image.ancestors, image.storage,
-                                                        image.parent)
-  image.save()
-
-  return image
-
-
-def calculate_image_aggregate_size(ancestors_str, image_storage, parent_image):
-  ancestors = ancestors_str.split('/')[1:-1]
-  if not ancestors:
-    return image_storage.image_size
-
-  if parent_image is None:
-    raise DataModelException('Could not load parent image')
-
-  ancestor_size = parent_image.aggregate_size
-  if ancestor_size is not None:
-    return ancestor_size + image_storage.image_size
-
-  # Fallback to a slower path if the parent doesn't have an aggregate size saved.
-  # TODO: remove this code if/when we do a full backfill.
-  ancestor_size = (ImageStorage
-                   .select(fn.Sum(ImageStorage.image_size))
-                   .join(Image)
-                   .where(Image.id << ancestors)
-                   .scalar())
-  if ancestor_size is None:
-    return None
-
-  return ancestor_size + image_storage.image_size
-
-
 def get_image(repo, docker_image_id):
   try:
     return Image.get(Image.docker_image_id == docker_image_id, Image.repository == repo)
@@ -452,7 +402,8 @@ def synthesize_v1_image(repo, image_storage, docker_image_id, created_date_str,
     pass
 
   # Get the aggregate size for the image.
-  aggregate_size = calculate_image_aggregate_size(ancestors, image_storage, parent_image)
+  aggregate_size = _basequery.calculate_image_aggregate_size(ancestors, image_storage.image_size,
+                                                             parent_image)
 
   return Image.create(docker_image_id=docker_image_id, ancestors=ancestors, comment=comment,
                       command=command, v1_json_metadata=v1_json_metadata, created=created,

View file

@@ -2,10 +2,12 @@ import logging
 
 from peewee import JOIN_LEFT_OUTER, fn, SQL
 
-from data.model import config, db_transaction, InvalidImageException, TorrentInfoDoesNotExist
-from data.database import (ImageStorage, Image, DerivedStorageForImage, ImageStoragePlacement,
-                           ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature,
-                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo)
+from data.model import (config, db_transaction, InvalidImageException, TorrentInfoDoesNotExist,
+                        DataModelException, _basequery)
+from data.database import (ImageStorage, Image, ImageStoragePlacement, ImageStorageLocation,
+                           ImageStorageTransformation, ImageStorageSignature,
+                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo,
+                           db_for_update)
 
 logger = logging.getLogger(__name__)
@@ -203,6 +205,40 @@ def _reduce_as_tree(queries_to_reduce):
   return to_reduce_left.union_all(to_reduce_right)
 
 
+def set_image_storage_metadata(docker_image_id, namespace_name, repository_name, image_size,
+                               uncompressed_size):
+  """ Sets metadata that is specific to the binary storage of the data, irrespective of how it
+      is used in the layer tree.
+  """
+  if image_size is None:
+    raise DataModelException('Empty image size field')
+
+  query = (Image
+           .select(Image, ImageStorage)
+           .join(Repository)
+           .join(Namespace, on=(Repository.namespace_user == Namespace.id))
+           .switch(Image)
+           .join(ImageStorage, JOIN_LEFT_OUTER)
+           .where(Repository.name == repository_name, Namespace.username == namespace_name,
+                  Image.docker_image_id == docker_image_id))
+
+  try:
+    image = db_for_update(query).get()
+  except ImageStorage.DoesNotExist:
+    raise InvalidImageException('No image with specified id and repository')
+
+  # We MUST do this here, it can't be done in the corresponding image call because the storage
+  # has not yet been pushed
+  image.aggregate_size = _basequery.calculate_image_aggregate_size(image.ancestors, image_size,
+                                                                   image.parent)
+  image.save()
+
+  image.storage.image_size = image_size
+  image.storage.uncompressed_size = uncompressed_size
+  image.storage.save()
+
+  return image.storage
+
+
 def get_storage_locations(uuid):
   query = (ImageStoragePlacement
            .select()

View file

@@ -18,6 +18,7 @@ from auth.permissions import (ReadRepositoryPermission,
                               ModifyRepositoryPermission)
 from data import model, database
 from util.registry import gzipstream
+from util.registry.torrent import PieceHasher
 from endpoints.v1 import v1_bp
 from endpoints.decorators import anon_protect
@@ -214,6 +215,10 @@ def put_image_layer(namespace, repository, image_id):
   size_info, size_hndlr = gzipstream.calculate_size_handler()
   sr.add_handler(size_hndlr)
 
+  # Add a handler to hash the chunks of the upload for torrenting
+  piece_hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE'])
+  sr.add_handler(piece_hasher.update)
+
   # Add a handler which computes the checksum.
   h, sum_hndlr = checksums.simple_checksum_handler(json_data)
   sr.add_handler(sum_hndlr)
@@ -231,8 +236,11 @@ def put_image_layer(namespace, repository, image_id):
     abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id)
 
   # Save the size of the image.
-  model.image.set_image_size(image_id, namespace, repository, size_info.compressed_size,
-                             size_info.uncompressed_size)
+  updated_storage = model.storage.set_image_storage_metadata(image_id, namespace, repository,
+                                                             size_info.compressed_size,
+                                                             size_info.uncompressed_size)
+  pieces_bytes = piece_hasher.piece_hashes + piece_hasher.hash_fragment.digest()
+  model.storage.save_torrent_info(updated_storage, app.config['TORRENT_PIECE_SIZE'], pieces_bytes)
 
   # Append the computed checksum.
   csums = []
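
For context on what put_image_layer now persists: a torrent 'pieces' blob is the concatenation of 20-byte SHA-1 digests, one per fixed-size piece of the layer, with the trailing short piece hashed as-is; the streaming PieceHasher accumulates the same bytes incrementally, which is why the digest of the in-progress fragment is appended at the end. A one-shot equivalent using plain hashlib (hypothetical payload and piece size, not the repository's code):

import hashlib

def pieces_blob(data, piece_size):
  # Concatenate the SHA-1 digest of each fixed-size piece; the last piece may be short.
  digests = []
  for offset in range(0, len(data), piece_size):
    digests.append(hashlib.sha1(data[offset:offset + piece_size]).digest())
  return b''.join(digests)

blob = pieces_blob(b'x' * 2500, 1024)
assert len(blob) == 3 * 20  # three pieces -> three 20-byte SHA-1 digests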

View file

@@ -81,10 +81,8 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map):
     checksum = __gen_checksum(docker_image_id)
 
     new_image = model.image.find_create_or_link_image(docker_image_id, repo, None, {}, 'local_us')
-    new_image_locations = new_image.storage.locations
     new_image.storage.uuid = __gen_image_uuid(repo, image_num)
     new_image.storage.uploading = False
-    new_image.storage.content_checksum = checksum
     new_image.storage.save()
 
     # Write some data for the storage.
@@ -113,10 +111,12 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map):
 
     new_image = model.image.set_image_metadata(docker_image_id, repo.namespace_user.username,
                                                repo.name, str(creation_time), 'no comment', command,
                                                json.dumps(v1_metadata), parent)
+    new_image.storage.content_checksum = checksum
+    new_image.storage.save()
 
     compressed_size = random.randrange(1, 1024 * 1024 * 1024)
-    model.image.set_image_size(docker_image_id, repo.namespace_user.username, repo.name,
-                               compressed_size, int(compressed_size * 1.4))
+    model.storage.set_image_storage_metadata(docker_image_id, repo.namespace_user.username,
+                                             repo.name, compressed_size, int(compressed_size * 1.4))
 
     parent = new_image

View file

@@ -35,7 +35,8 @@ def make_torrent(name, webseed, length, piece_length, pieces):
 
 
 class PieceHasher(object):
-  def __init__(self, piece_size, starting_offset, starting_piece_hash_bytes, hash_fragment_to_resume):
+  def __init__(self, piece_size, starting_offset=0, starting_piece_hash_bytes='',
+               hash_fragment_to_resume=None):
     if not isinstance(starting_offset, (int, long)):
       raise TypeError('starting_offset must be an integer')
     elif not isinstance(piece_size, (int, long)):
@@ -43,9 +44,14 @@ class PieceHasher(object):
 
     self._current_offset = starting_offset
     self._piece_size = piece_size
-    self._hash_fragment = hash_fragment_to_resume
     self._piece_hashes = bytearray(starting_piece_hash_bytes)
 
+    if hash_fragment_to_resume is None:
+      self._hash_fragment = resumablehashlib.sha1()
+    else:
+      self._hash_fragment = hash_fragment_to_resume
+
   def update(self, buf):
     buf_offset = 0
     while buf_offset < len(buf):
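
With the new defaults, code that starts a fresh upload only needs the piece size, while an interrupted upload can be resumed by passing back the saved offset, piece hashes, and hash fragment. A usage sketch (hypothetical driver code; it assumes only the attributes already used by put_image_layer above):

from util.registry.torrent import PieceHasher

hasher = PieceHasher(512 * 1024)  # a TORRENT_PIECE_SIZE-style value; resume arguments take their defaults
for chunk in ('first chunk of the layer', 'second chunk'):  # stand-ins for streamed upload data
  hasher.update(chunk)

# Completed piece hashes plus the digest of the trailing partial piece,
# mirroring what put_image_layer passes to save_torrent_info.
pieces_bytes = hasher.piece_hashes + hasher.hash_fragment.digest()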