Merge pull request #1117 from jakedt/torrent
Add torrent hashes for v1 uploads
Commit 9496c69663
8 changed files with 133 additions and 103 deletions

@@ -1,18 +1,19 @@
-from peewee import JOIN_LEFT_OUTER, Clause, SQL
+from peewee import Clause, SQL, fn
 from cachetools import lru_cache
 
+from data.model import DataModelException
 from data.database import (Repository, User, Team, TeamMember, RepositoryPermission, TeamRole,
-                           Namespace, Visibility, db_for_update)
+                           Namespace, Visibility, ImageStorage, Image, db_for_update)
 
 
 def prefix_search(field, prefix_query):
   """ Returns the wildcard match for searching for the given prefix query. """
   # Escape the known wildcard characters.
   prefix_query = (prefix_query
-    .replace('!', '!!')
-    .replace('%', '!%')
-    .replace('_', '!_')
-    .replace('[', '!['))
+                  .replace('!', '!!')
+                  .replace('%', '!%')
+                  .replace('_', '!_')
+                  .replace('[', '!['))
 
   return field ** Clause(prefix_query + '%', SQL("ESCAPE '!'"))

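For reference, the escaping in prefix_search keeps any wildcard characters the user typed literal, so only the % appended at the end acts as a wildcard in the resulting LIKE match; the SQL("ESCAPE '!'") clause declares ! as the escape character. A standalone check of the transformation, using a made-up query string:

escaped = ('my_repo[v1]'
           .replace('!', '!!')
           .replace('%', '!%')
           .replace('_', '!_')
           .replace('[', '!['))
print(escaped + '%')  # my!_repo![v1]% -- only the trailing % is a live wildcard
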
@@ -46,8 +47,9 @@ def filter_to_repos_for_user(query, username=None, namespace=None, include_publi
     where_clause = (Namespace.username == namespace)
 
   if include_public:
-    queries.append(query.clone()
-                   .where(Repository.visibility == get_public_repo_visibility(), where_clause))
+    queries.append(query
+                   .clone()
+                   .where(Repository.visibility == get_public_repo_visibility(), where_clause))
 
   if username:
     UserThroughTeam = User.alias()

@@ -57,29 +59,32 @@ def filter_to_repos_for_user(query, username=None, namespace=None, include_publi
     AdminUser = User.alias()
 
     # Add repositories in which the user has permission.
-    queries.append(query.clone()
-                   .switch(RepositoryPermission)
-                   .join(User)
-                   .where(User.username == username, where_clause))
+    queries.append(query
+                   .clone()
+                   .switch(RepositoryPermission)
+                   .join(User)
+                   .where(User.username == username, where_clause))
 
     # Add repositories in which the user is a member of a team that has permission.
-    queries.append(query.clone()
-                   .switch(RepositoryPermission)
-                   .join(Team)
-                   .join(TeamMember)
-                   .join(UserThroughTeam, on=(UserThroughTeam.id == TeamMember.user))
-                   .where(UserThroughTeam.username == username, where_clause))
+    queries.append(query
+                   .clone()
+                   .switch(RepositoryPermission)
+                   .join(Team)
+                   .join(TeamMember)
+                   .join(UserThroughTeam, on=(UserThroughTeam.id == TeamMember.user))
+                   .where(UserThroughTeam.username == username, where_clause))
 
     # Add repositories under namespaces in which the user is the org admin.
-    queries.append(query.clone()
-                   .switch(Repository)
-                   .join(Org, on=(Repository.namespace_user == Org.id))
-                   .join(AdminTeam, on=(Org.id == AdminTeam.organization))
-                   .join(TeamRole, on=(AdminTeam.role == TeamRole.id))
-                   .switch(AdminTeam)
-                   .join(AdminTeamMember, on=(AdminTeam.id == AdminTeamMember.team))
-                   .join(AdminUser, on=(AdminTeamMember.user == AdminUser.id))
-                   .where(AdminUser.username == username, where_clause))
+    queries.append(query
+                   .clone()
+                   .switch(Repository)
+                   .join(Org, on=(Repository.namespace_user == Org.id))
+                   .join(AdminTeam, on=(Org.id == AdminTeam.organization))
+                   .join(TeamRole, on=(AdminTeam.role == TeamRole.id))
+                   .switch(AdminTeam)
+                   .join(AdminTeamMember, on=(AdminTeam.id == AdminTeamMember.team))
+                   .join(AdminUser, on=(AdminTeamMember.user == AdminUser.id))
+                   .where(AdminUser.username == username, where_clause))
 
   return reduce(lambda l, r: l | r, queries)

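For context, filter_to_repos_for_user builds one clone of the base query per access path (public visibility, direct permission, team membership, org admin) and folds them together with |, which peewee composes into a single compound query. A minimal stand-in for the fold, using sets in place of the cloned queries:

from functools import reduce  # imported so the snippet runs on Python 3; the code above uses the Python 2 builtin

queries = [{'public'}, {'direct'}, {'team'}, {'org-admin'}]
combined = reduce(lambda l, r: l | r, queries)
print(combined)  # all four access paths merged into one result
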
@@ -93,3 +98,30 @@ def get_user_organizations(username):
           .join(TeamMember)
           .join(UserAlias, on=(UserAlias.id == TeamMember.user))
           .where(User.organization == True, UserAlias.username == username))
+
+
+def calculate_image_aggregate_size(ancestors_str, image_size, parent_image):
+  ancestors = ancestors_str.split('/')[1:-1]
+  if not ancestors:
+    return image_size
+
+  if parent_image is None:
+    raise DataModelException('Could not load parent image')
+
+  ancestor_size = parent_image.aggregate_size
+  if ancestor_size is not None:
+    return ancestor_size + image_size
+
+  # Fallback to a slower path if the parent doesn't have an aggregate size saved.
+  # TODO: remove this code if/when we do a full backfill.
+  ancestor_size = (ImageStorage
+                   .select(fn.Sum(ImageStorage.image_size))
+                   .join(Image)
+                   .where(Image.id << ancestors)
+                   .scalar())
+  if ancestor_size is None:
+    return None
+
+  return ancestor_size + image_size

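The ancestry string handed to the new calculate_image_aggregate_size helper is the slash-delimited chain of ancestor image IDs (something like '/1/2/7/'), so split('/')[1:-1] drops the empty ends and yields just the IDs. A small self-contained sketch of the fast path, with an invented stand-in for the parent row:

class ParentRow(object):
  aggregate_size = 150  # pretend the parent's aggregate size was already backfilled

ancestors_str = '/1/2/7/'
image_size = 50

ancestor_ids = ancestors_str.split('/')[1:-1]      # ['1', '2', '7']
if not ancestor_ids:
  total = image_size                               # base image: just its own size
else:
  total = ParentRow.aggregate_size + image_size    # parent already accumulated the chain
print(ancestor_ids, total)                         # ['1', '2', '7'] 200

When the parent row has no precomputed aggregate_size, the helper falls back to summing ImageStorage.image_size over all ancestors, the slower query the TODO above wants to retire after a full backfill.
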
@@ -2,7 +2,7 @@ import logging
 import dateutil.parser
 import random
 
-from peewee import JOIN_LEFT_OUTER, fn, SQL
+from peewee import JOIN_LEFT_OUTER, SQL
 from datetime import datetime
 
 from data.model import (DataModelException, db_transaction, _basequery, storage,

@@ -21,11 +21,11 @@ def get_repository_image_and_deriving(docker_image_id, storage_uuid):
   """
   try:
     image_found = (Image
-        .select()
-        .join(ImageStorage)
-        .where(Image.docker_image_id == docker_image_id,
-               ImageStorage.uuid == storage_uuid)
-        .get())
+                   .select()
+                   .join(ImageStorage)
+                   .where(Image.docker_image_id == docker_image_id,
+                          ImageStorage.uuid == storage_uuid)
+                   .get())
   except Image.DoesNotExist:
     return Image.select().where(Image.id < 0)  # Empty query

@@ -296,6 +296,8 @@ def find_create_or_link_image(docker_image_id, repo_obj, username, translations,
 
 def set_image_metadata(docker_image_id, namespace_name, repository_name, created_date_str, comment,
                        command, v1_json_metadata, parent=None):
+  """ Sets metadata that is specific to how a binary piece of storage fits into the layer tree.
+  """
   with db_transaction():
     query = (Image
              .select(Image, ImageStorage)

@@ -322,6 +324,7 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
     # We cleanup any old checksum in case it's a retry after a fail
     fetched.v1_checksum = None
+    fetched.storage.content_checksum = None
     fetched.storage.save()
 
     fetched.comment = comment
     fetched.command = command

@@ -335,59 +338,6 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
     return fetched
 
 
-def set_image_size(docker_image_id, namespace_name, repository_name, image_size, uncompressed_size):
-  if image_size is None:
-    raise DataModelException('Empty image size field')
-
-  try:
-    image = (Image
-             .select(Image, ImageStorage)
-             .join(Repository)
-             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
-             .switch(Image)
-             .join(ImageStorage, JOIN_LEFT_OUTER)
-             .where(Repository.name == repository_name, Namespace.username == namespace_name,
-                    Image.docker_image_id == docker_image_id)
-             .get())
-  except Image.DoesNotExist:
-    raise DataModelException('No image with specified id and repository')
-
-  image.storage.image_size = image_size
-  image.storage.uncompressed_size = uncompressed_size
-  image.storage.save()
-
-  image.aggregate_size = calculate_image_aggregate_size(image.ancestors, image.storage,
-                                                        image.parent)
-  image.save()
-
-  return image
-
-
-def calculate_image_aggregate_size(ancestors_str, image_storage, parent_image):
-  ancestors = ancestors_str.split('/')[1:-1]
-  if not ancestors:
-    return image_storage.image_size
-
-  if parent_image is None:
-    raise DataModelException('Could not load parent image')
-
-  ancestor_size = parent_image.aggregate_size
-  if ancestor_size is not None:
-    return ancestor_size + image_storage.image_size
-
-  # Fallback to a slower path if the parent doesn't have an aggregate size saved.
-  # TODO: remove this code if/when we do a full backfill.
-  ancestor_size = (ImageStorage
-                   .select(fn.Sum(ImageStorage.image_size))
-                   .join(Image)
-                   .where(Image.id << ancestors)
-                   .scalar())
-  if ancestor_size is None:
-    return None
-
-  return ancestor_size + image_storage.image_size
-
-
 def get_image(repo, docker_image_id):
   try:
     return Image.get(Image.docker_image_id == docker_image_id, Image.repository == repo)

@@ -452,7 +402,8 @@ def synthesize_v1_image(repo, image_storage, docker_image_id, created_date_str,
     pass
 
   # Get the aggregate size for the image.
-  aggregate_size = calculate_image_aggregate_size(ancestors, image_storage, parent_image)
+  aggregate_size = _basequery.calculate_image_aggregate_size(ancestors, image_storage.image_size,
+                                                             parent_image)
 
   return Image.create(docker_image_id=docker_image_id, ancestors=ancestors, comment=comment,
                       command=command, v1_json_metadata=v1_json_metadata, created=created,

@@ -2,10 +2,12 @@ import logging
 
 from peewee import JOIN_LEFT_OUTER, fn, SQL
 
-from data.model import config, db_transaction, InvalidImageException, TorrentInfoDoesNotExist
-from data.database import (ImageStorage, Image, DerivedStorageForImage, ImageStoragePlacement,
-                           ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature,
-                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo)
+from data.model import (config, db_transaction, InvalidImageException, TorrentInfoDoesNotExist,
+                        DataModelException, _basequery)
+from data.database import (ImageStorage, Image, ImageStoragePlacement, ImageStorageLocation,
+                           ImageStorageTransformation, ImageStorageSignature,
+                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo,
+                           db_for_update)
 
 
 logger = logging.getLogger(__name__)

@@ -203,6 +205,40 @@ def _reduce_as_tree(queries_to_reduce):
   return to_reduce_left.union_all(to_reduce_right)
 
 
+def set_image_storage_metadata(docker_image_id, namespace_name, repository_name, image_size,
+                               uncompressed_size):
+  """ Sets metadata that is specific to the binary storage of the data, irrespective of how it
+      is used in the layer tree.
+  """
+  if image_size is None:
+    raise DataModelException('Empty image size field')
+
+  query = (Image
+           .select(Image, ImageStorage)
+           .join(Repository)
+           .join(Namespace, on=(Repository.namespace_user == Namespace.id))
+           .switch(Image)
+           .join(ImageStorage)
+           .where(Repository.name == repository_name, Namespace.username == namespace_name,
+                  Image.docker_image_id == docker_image_id))
+
+  try:
+    image = db_for_update(query).get()
+  except ImageStorage.DoesNotExist:
+    raise InvalidImageException('No image with specified id and repository')
+
+  # We MUST do this here, it can't be done in the corresponding image call because the storage
+  # has not yet been pushed
+  image.aggregate_size = _basequery.calculate_image_aggregate_size(image.ancestors, image_size,
+                                                                   image.parent)
+  image.save()
+
+  image.storage.image_size = image_size
+  image.storage.uncompressed_size = uncompressed_size
+  image.storage.save()
+  return image.storage
+
+
 def get_storage_locations(uuid):
   query = (ImageStoragePlacement
            .select()

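A hedged usage sketch of the new set_image_storage_metadata helper; the identifiers and sizes below are invented for illustration, while in the registry endpoint further down the values come from the upload stream's size handler:

updated_storage = model.storage.set_image_storage_metadata(
    'deadbeef' * 8,          # docker_image_id (hypothetical 64-character id)
    'devtable',              # namespace_name (hypothetical)
    'simple',                # repository_name (hypothetical)
    image_size=1451,         # compressed byte count reported by the size handler
    uncompressed_size=4096)
assert updated_storage.image_size == 1451

As the in-code comment stresses, the aggregate size has to be computed in this storage-side call rather than in the corresponding image-metadata call, because the storage has not yet been pushed when that call runs.
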
@@ -18,6 +18,7 @@ from auth.permissions import (ReadRepositoryPermission,
                               ModifyRepositoryPermission)
 from data import model, database
 from util.registry import gzipstream
+from util.registry.torrent import PieceHasher
 from endpoints.v1 import v1_bp
 from endpoints.decorators import anon_protect

@@ -214,6 +215,10 @@ def put_image_layer(namespace, repository, image_id):
   size_info, size_hndlr = gzipstream.calculate_size_handler()
   sr.add_handler(size_hndlr)
 
+  # Add a handler to hash the chunks of the upload for torrenting
+  piece_hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE'])
+  sr.add_handler(piece_hasher.update)
+
   # Add a handler which computes the checksum.
   h, sum_hndlr = checksums.simple_checksum_handler(json_data)
   sr.add_handler(sum_hndlr)

@@ -231,8 +236,11 @@ def put_image_layer(namespace, repository, image_id):
     abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id)
 
   # Save the size of the image.
-  model.image.set_image_size(image_id, namespace, repository, size_info.compressed_size,
-                             size_info.uncompressed_size)
+  updated_storage = model.storage.set_image_storage_metadata(image_id, namespace, repository,
+                                                             size_info.compressed_size,
+                                                             size_info.uncompressed_size)
+  pieces_bytes = piece_hasher.piece_hashes + piece_hasher.hash_fragment.digest()
+  model.storage.save_torrent_info(updated_storage, app.config['TORRENT_PIECE_SIZE'], pieces_bytes)
 
   # Append the computed checksum.
   csums = []

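The torrent info saved above is the standard BitTorrent pieces blob: one 20-byte SHA-1 per completed piece, concatenated, with the digest of the trailing partial piece appended when the stream ends. A rough standalone sketch of that layout using hashlib (the real code accumulates it incrementally through PieceHasher and a resumable SHA-1):

import hashlib

piece_size = 4 * 1024 * 1024            # stand-in for app.config['TORRENT_PIECE_SIZE']
data = b'x' * (2 * piece_size + 100)    # two full pieces plus a 100-byte tail

pieces = bytearray()
for offset in range(0, len(data), piece_size):
    pieces += hashlib.sha1(data[offset:offset + piece_size]).digest()

# Comparable in spirit to: piece_hasher.piece_hashes + piece_hasher.hash_fragment.digest()
assert len(pieces) == 3 * 20            # two full pieces + the partial tail
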
@@ -224,7 +224,7 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
 
   piece_hasher = None
   # TODO remove this when all in-progress blob uploads reliably contain piece hashes
-  if start_offset == 0 and found.piece_sha_state is not None:
+  if start_offset == 0 or found.piece_sha_state is not None:
     piece_hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE'], start_offset, found.piece_hashes,
                                found.piece_sha_state)
     input_fp = wrap_with_handler(input_fp, piece_hasher.update)

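The one-character change matters for the very first chunk of a brand-new upload, where no SHA state has been persisted yet: with and, that chunk never got a PieceHasher and its piece hashes were lost; with or, hashing starts at offset zero and later chunks resume from the saved state. A quick check of both conditions:

start_offset = 0
piece_sha_state = None   # nothing persisted yet for a fresh upload

old_condition = (start_offset == 0 and piece_sha_state is not None)  # False: no hasher created
new_condition = (start_offset == 0 or piece_sha_state is not None)   # True: hashing begins
print(old_condition, new_condition)
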
@@ -81,10 +81,8 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map):
     checksum = __gen_checksum(docker_image_id)
 
     new_image = model.image.find_create_or_link_image(docker_image_id, repo, None, {}, 'local_us')
     new_image_locations = new_image.storage.locations
     new_image.storage.uuid = __gen_image_uuid(repo, image_num)
     new_image.storage.uploading = False
-    new_image.storage.content_checksum = checksum
     new_image.storage.save()
 
     # Write some data for the storage.

@@ -113,10 +111,12 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map):
     new_image = model.image.set_image_metadata(docker_image_id, repo.namespace_user.username,
                                                repo.name, str(creation_time), 'no comment', command,
                                                json.dumps(v1_metadata), parent)
+    new_image.storage.content_checksum = checksum
+    new_image.storage.save()
 
     compressed_size = random.randrange(1, 1024 * 1024 * 1024)
-    model.image.set_image_size(docker_image_id, repo.namespace_user.username, repo.name,
-                               compressed_size, int(compressed_size * 1.4))
+    model.storage.set_image_storage_metadata(docker_image_id, repo.namespace_user.username,
+                                             repo.name, compressed_size, int(compressed_size * 1.4))
 
     parent = new_image

@@ -1,6 +1,3 @@
-from multiprocessing import Queue
-import os
-
 class QueueFile(object):
   """ Class which implements a file-like interface and reads from a blocking
       multiprocessing queue.

@@ -35,7 +35,8 @@ def make_torrent(name, webseed, length, piece_length, pieces):
 
 
 class PieceHasher(object):
-  def __init__(self, piece_size, starting_offset, starting_piece_hash_bytes, hash_fragment_to_resume):
+  def __init__(self, piece_size, starting_offset=0, starting_piece_hash_bytes='',
+               hash_fragment_to_resume=None):
     if not isinstance(starting_offset, (int, long)):
       raise TypeError('starting_offset must be an integer')
     elif not isinstance(piece_size, (int, long)):

@@ -43,9 +44,14 @@ class PieceHasher(object):
 
     self._current_offset = starting_offset
     self._piece_size = piece_size
-    self._hash_fragment = hash_fragment_to_resume
     self._piece_hashes = bytearray(starting_piece_hash_bytes)
 
+    if hash_fragment_to_resume is None:
+      self._hash_fragment = resumablehashlib.sha1()
+    else:
+      self._hash_fragment = hash_fragment_to_resume
+
   def update(self, buf):
     buf_offset = 0
     while buf_offset < len(buf):

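With the new default arguments, a caller that starts from scratch (like put_image_layer above) only needs the piece size, while a resuming caller (like _upload_chunk) still passes the offset, the saved piece-hash bytes and the resumable SHA-1 state. A hedged usage sketch; the piece_hashes and hash_fragment attribute names are taken from the call sites in this diff:

from util.registry.torrent import PieceHasher

hasher = PieceHasher(1024 * 1024)                # fresh stream: the defaults cover the rest
for chunk in (b'first chunk', b'second chunk'):
    hasher.update(chunk)                         # feed data as it arrives from the upload

# Completed 20-byte piece hashes plus the digest of the piece still in flight,
# which is what put_image_layer hands to model.storage.save_torrent_info.
pieces_bytes = hasher.piece_hashes + hasher.hash_fragment.digest()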