diff --git a/config.py b/config.py
index 175efd3bc..84cdb4534 100644
--- a/config.py
+++ b/config.py
@@ -273,3 +273,6 @@ class DefaultConfig(object):
     'API_VERSION': 'v1',
     'API_TIMEOUT_SECONDS': 10,
   }
+
+  # Torrent management flags
+  TORRENT_PIECE_SIZE = 512 * 1024
diff --git a/data/database.py b/data/database.py
index 08bd54003..bdd4bd7c4 100644
--- a/data/database.py
+++ b/data/database.py
@@ -9,7 +9,7 @@
 from random import SystemRandom
 from datetime import datetime
 from peewee import *
 from data.read_slave import ReadSlaveModel
-from data.fields import ResumableSHAField, JSONField
+from data.fields import ResumableSHA256Field, ResumableSHA1Field, JSONField
 from sqlalchemy.engine.url import make_url
 from collections import defaultdict
@@ -803,12 +803,14 @@ class BlobUpload(BaseModel):
   repository = ForeignKeyField(Repository, index=True)
   uuid = CharField(index=True, unique=True)
   byte_count = IntegerField(default=0)
-  sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
+  sha_state = ResumableSHA256Field(null=True, default=resumablehashlib.sha256)
   location = ForeignKeyField(ImageStorageLocation)
   storage_metadata = JSONField(null=True, default={})
   chunk_count = IntegerField(default=0)
   uncompressed_byte_count = IntegerField(null=True)
   created = DateTimeField(default=datetime.now, index=True)
+  piece_sha_state = ResumableSHA1Field(null=True, default=resumablehashlib.sha1)
+  piece_hashes = TextField(default='')
 
   class Meta:
     database = db
@@ -846,6 +848,18 @@ class QuayRelease(BaseModel):
     )
 
+
+class TorrentInfo(BaseModel):
+  storage = ForeignKeyField(ImageStorage)
+  piece_length = IntegerField()
+  pieces = TextField()
+
+  class Meta:
+    database = db
+    indexes = (
+      (('storage', 'piece_length'), True),
+    )
+
 
 all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility,
               RepositoryTag, EmailConfirmation, FederatedLogin, LoginService, QueueItem,
               RepositoryBuild, Team, TeamMember, TeamRole, LogEntryKind, LogEntry,
@@ -856,4 +870,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
               RepositoryAuthorizedEmail, ImageStorageTransformation, TeamMemberInvite,
               ImageStorageSignature, ImageStorageSignatureKind, AccessTokenKind, Star,
               RepositoryActionCount, TagManifest, UserRegion,
-              QuayService, QuayRegion, QuayRelease, BlobUpload, DerivedStorageForImage]
+              QuayService, QuayRegion, QuayRelease, BlobUpload, DerivedStorageForImage, TorrentInfo]
diff --git a/data/fields.py b/data/fields.py
index 123811ccd..cd68af474 100644
--- a/data/fields.py
+++ b/data/fields.py
@@ -5,7 +5,7 @@ import json
 from peewee import TextField
 
 
-class ResumableSHAField(TextField):
+class ResumableSHA256Field(TextField):
   def db_value(self, value):
     sha_state = value.state()
 
@@ -28,6 +28,29 @@
     return to_resume
 
 
+class ResumableSHA1Field(TextField):
+  def db_value(self, value):
+    sha_state = value.state()
+
+    # One of the fields is a byte string, let's base64 encode it to make sure
+    # we can store and fetch it regardless of default collation.
+    sha_state[3] = base64.b64encode(sha_state[3])
+
+    return json.dumps(sha_state)
+
+  def python_value(self, value):
+    to_resume = resumablehashlib.sha1()
+    if value is None:
+      return to_resume
+
+    sha_state = json.loads(value)
+
+    # We need to base64 decode the data bytestring.
+    sha_state[3] = base64.b64decode(sha_state[3])
+    to_resume.set_state(sha_state)
+    return to_resume
+
+
 class JSONField(TextField):
   def db_value(self, value):
     return json.dumps(value)
diff --git a/data/model/blob.py b/data/model/blob.py
index 263978ec1..329daf7a2 100644
--- a/data/model/blob.py
+++ b/data/model/blob.py
@@ -51,6 +51,8 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
     image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo)
     tag.create_temporary_hidden_tag(repo, image, link_expiration_s)
 
+  return storage
+
 
 def get_blob_upload(namespace, repo_name, upload_uuid):
   """ Load the upload which is already in progress.
diff --git a/data/model/storage.py b/data/model/storage.py
index ea626ef6d..efca90365 100644
--- a/data/model/storage.py
+++ b/data/model/storage.py
@@ -5,7 +5,7 @@ from peewee import JOIN_LEFT_OUTER, fn, SQL
 from data.model import config, db_transaction, InvalidImageException
 from data.database import (ImageStorage, Image, DerivedStorageForImage, ImageStoragePlacement,
                            ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature,
-                           ImageStorageSignatureKind, Repository, Namespace)
+                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo)
 
 
 logger = logging.getLogger(__name__)
@@ -212,3 +212,7 @@ def get_storage_locations(uuid):
            .where(ImageStorage.uuid == uuid))
 
   return [location.location.name for location in query]
+
+
+def save_torrent_info(storage_object, piece_length, pieces):
+  TorrentInfo.create(storage=storage_object, piece_length=piece_length, pieces=pieces)
diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py
index 0f8feadde..cfb9387e2 100644
--- a/endpoints/v2/blob.py
+++ b/endpoints/v2/blob.py
@@ -14,6 +14,7 @@
 from endpoints.decorators import anon_protect
 from util.cache import cache_control
 from util.registry.filelike import wrap_with_handler, StreamSlice
 from util.registry.gzipstream import calculate_size_handler
+from util.registry.torrent import PieceHasher
 
 from storage.basestorage import InvalidChunkException
@@ -221,6 +222,13 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
 
   input_fp = wrap_with_handler(input_fp, found.sha_state.update)
 
+  piece_hasher = None
+  # TODO remove this when all in-progress blob uploads reliably contain piece hashes
+  if start_offset == 0 and found.piece_sha_state is not None:
+    piece_hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE'], start_offset, found.piece_hashes,
+                               found.piece_sha_state)
+    input_fp = wrap_with_handler(input_fp, piece_hasher.update)
+
   # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the
   # stream so we can determine the uncompressed size. We'll throw out this data if another chunk
   # comes in, but in the common case Docker only sends one chunk.
@@ -231,7 +239,8 @@
 
   try:
     length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid,
-                                                                      start_offset, length, input_fp,
+                                                                      start_offset, length,
+                                                                      input_fp,
                                                                       found.storage_metadata,
                                                                       content_type=BLOB_CONTENT_TYPE)
   except InvalidChunkException:
@@ -246,6 +255,10 @@
     # know the uncompressed size.
     found.uncompressed_byte_count = None
 
+  if piece_hasher is not None:
+    found.piece_hashes = piece_hasher.piece_hashes
+    found.piece_sha_state = piece_hasher.hash_fragment
+
   found.storage_metadata = new_metadata
   found.byte_count += length_written
   found.chunk_count += 1
@@ -274,10 +287,15 @@
                                   final_blob_location, upload_obj.storage_metadata)
 
   # Mark the blob as uploaded.
-  model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
-                                             upload_obj.location, upload_obj.byte_count,
-                                             app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
-                                             upload_obj.uncompressed_byte_count)
+  blob_storage = model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
+                                                            upload_obj.location,
+                                                            upload_obj.byte_count,
+                                                            app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
+                                                            upload_obj.uncompressed_byte_count)
+
+  if upload_obj.piece_sha_state is not None:
+    piece_string = upload_obj.piece_hashes + upload_obj.piece_sha_state.hexdigest()
+    model.storage.save_torrent_info(blob_storage, app.config['TORRENT_PIECE_SIZE'], piece_string)
 
   # Delete the upload tracking row.
   upload_obj.delete_instance()
diff --git a/test/data/test.db b/test/data/test.db
index 1738e57c4..16a8c3581 100644
Binary files a/test/data/test.db and b/test/data/test.db differ
diff --git a/util/registry/torrent.py b/util/registry/torrent.py
new file mode 100644
index 000000000..315683d70
--- /dev/null
+++ b/util/registry/torrent.py
@@ -0,0 +1,43 @@
+import resumablehashlib
+
+
+class PieceHasher(object):
+  def __init__(self, piece_size, starting_offset, starting_piece_hash_str, hash_fragment_to_resume):
+    if not isinstance(starting_offset, (int, long)):
+      raise TypeError('starting_offset must be an integer')
+    elif not isinstance(piece_size, (int, long)):
+      raise TypeError('piece_size must be an integer')
+
+    self._current_offset = starting_offset
+    self._piece_size = piece_size
+    self._hash_fragment = hash_fragment_to_resume
+    self._piece_hashes = [starting_piece_hash_str]
+
+  def update(self, buf):
+    buf_offset = 0
+    while buf_offset < len(buf):
+      buf_bytes_to_hash = buf[buf_offset:buf_offset + self._piece_length_remaining()]
+      to_hash_len = len(buf_bytes_to_hash)
+
+      if self._piece_offset() == 0 and to_hash_len > 0 and self._current_offset > 0:
+        # We are opening a new piece
+        self._piece_hashes.append(self._hash_fragment.hexdigest())
+        self._hash_fragment = resumablehashlib.sha1()
+
+      self._hash_fragment.update(buf_bytes_to_hash)
+      self._current_offset += to_hash_len
+      buf_offset += to_hash_len
+
+  def _piece_length_remaining(self):
+    return self._piece_size - (self._current_offset % self._piece_size)
+
+  def _piece_offset(self):
+    return self._current_offset % self._piece_size
+
+  @property
+  def piece_hashes(self):
+    return ''.join(self._piece_hashes)
+
+  @property
+  def hash_fragment(self):
+    return self._hash_fragment
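
Illustrative usage sketch (not part of the diff): how the new PieceHasher and the _finish_upload logic combine to produce the pieces string that save_torrent_info stores. It assumes resumablehashlib.sha1() exposes hashlib-style update()/hexdigest(), which the code above already relies on; the helper name, piece-size constant, and sample data are invented for the example (Python 2, matching the codebase).

import resumablehashlib

from util.registry.torrent import PieceHasher

PIECE_SIZE = 512 * 1024  # mirrors the new TORRENT_PIECE_SIZE config value


def piece_string_for(chunks, piece_size=PIECE_SIZE):
  # Fresh upload: no accumulated hex digests and a brand-new SHA-1 fragment,
  # matching the defaults on the new BlobUpload columns.
  hasher = PieceHasher(piece_size, 0, '', resumablehashlib.sha1())

  # Chunks may arrive in arbitrary sizes; the hasher tracks piece boundaries.
  for chunk in chunks:
    hasher.update(chunk)

  # Close the final (possibly short) piece, as _finish_upload does before
  # calling model.storage.save_torrent_info.
  return hasher.piece_hashes + hasher.hash_fragment.hexdigest()


# Two full pieces plus a partial third piece -> three 40-character hex digests.
example = piece_string_for(['a' * PIECE_SIZE, 'b' * PIECE_SIZE, 'c' * 1024])
assert len(example) == 3 * 40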