From 7c572fd218e811ac15cc0ddf8e5f5f6b11a81ff9 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 20 Jan 2016 16:33:10 -0500 Subject: [PATCH] Add support for torrenting verbs Fixes #1130 --- endpoints/verbs.py | 97 ++++++++++++++++++++++++++++++---------- test/registry_tests.py | 62 ++++++++++++++++++++++--- test/testconfig.py | 1 + util/registry/torrent.py | 4 ++ 4 files changed, 135 insertions(+), 29 deletions(-) diff --git a/endpoints/verbs.py b/endpoints/verbs.py index 85c46f495..568a469a9 100644 --- a/endpoints/verbs.py +++ b/endpoints/verbs.py @@ -2,7 +2,7 @@ import logging import json import hashlib -from flask import redirect, Blueprint, abort, send_file, make_response +from flask import redirect, Blueprint, abort, send_file, make_response, request import features @@ -121,6 +121,69 @@ def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_location done_uploading.save() +def _torrent_for_storage(storage_ref, is_public): + """ Returns a response containing the torrent file contents for the given storage. May abort + with an error if the state is not valid (e.g. non-public, non-user request). + """ + # Make sure the storage has a size. + if not storage_ref.image_size: + abort(404) + + # Lookup the torrent information for the storage. + try: + torrent_info = model.storage.get_torrent_info(storage_ref) + except model.TorrentInfoDoesNotExist: + abort(404) + + # Lookup the webseed path for the storage. + path = model.storage.get_layer_path(storage_ref) + webseed = storage.get_direct_download_url(storage_ref.locations, path) + if webseed is None: + # We cannot support webseeds for storages that cannot provide direct downloads. + abort(make_response('Storage engine does not support seeding.', 501)) + + # Build the filename for the torrent. + if is_public: + name = public_torrent_filename(storage_ref.uuid) + else: + user = get_authenticated_user() + if not user: + abort(403) + + name = per_user_torrent_filename(user.uuid, storage_ref.uuid) + + # Return the torrent file. + torrent_file = make_torrent(name, webseed, storage_ref.image_size, + torrent_info.piece_length, torrent_info.pieces) + + headers = {'Content-Type': 'application/x-bittorrent', + 'Content-Disposition': 'attachment; filename={0}.torrent'.format(name)} + + return make_response(torrent_file, 200, headers) + + +def _torrent_repo_verb(repo_image, tag, verb, **kwargs): + """ Handles returning a torrent for the given verb on the given image and tag. """ + if not features.BITTORRENT: + # Torrent feature is not enabled. + abort(406) + + # Lookup an *existing* derived storage for the verb. If the verb's image storage doesn't exist, + # we cannot create it here, so we 404. + derived = model.image.find_derived_storage_for_image(repo_image, verb) + if not derived: + abort(406) + + # Return the torrent. + public_repo = model.repository.is_repository_public(repo_image.repository) + torrent = _torrent_for_storage(derived, public_repo) + + # Log the action. + track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, torrent=True, **kwargs) + + return torrent + + def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None): permission = ReadRepositoryPermission(namespace, repository) @@ -179,6 +242,11 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker= result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker) (repo_image, tag_image, image_json) = result + # Check for torrent. If found, we return a torrent for the repo verb image (if the derived + # image already exists). + if request.accept_mimetypes.best == 'application/x-bittorrent': + return _torrent_repo_verb(repo_image, tag, verb, **kwargs) + # Log the action. track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs) @@ -215,10 +283,13 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker= database.close_db_filter(None) hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE']) + def _store_metadata_and_cleanup(): with database.UseThenDisconnect(app.config): model.storage.save_torrent_info(derived, app.config['TORRENT_PIECE_SIZE'], hasher.final_piece_hashes()) + derived.image_size = hasher.hashed_bytes + derived.save() # Create a queue process to generate the data. The queue files will read from the process # and send the results to the client and storage. @@ -321,26 +392,4 @@ def get_tag_torrent(namespace, repo_name, digest): except model.BlobDoesNotExist: abort(404) - path = model.storage.get_layer_path(blob) - webseed = storage.get_direct_download_url(blob.locations, path) - if webseed is None: - # We cannot support webseeds for storages that cannot provide direct downloads. - abort(make_response('Storage engine does not support seeding.', 501)) - - try: - torrent_info = model.storage.get_torrent_info(blob) - except model.TorrentInfoDoesNotExist: - abort(404) - - if public_repo: - name = public_torrent_filename(blob.uuid) - else: - name = per_user_torrent_filename(user.uuid, blob.uuid) - - torrent_file = make_torrent(name, webseed, blob.image_size, - torrent_info.piece_length, torrent_info.pieces) - - headers = {'Content-Type': 'application/x-bittorrent', - 'Content-Disposition': 'attachment; filename={0}.torrent'.format(name)} - - return make_response(torrent_file, 200, headers) + return _torrent_for_storage(blob, public_repo) diff --git a/test/registry_tests.py b/test/registry_tests.py index a4922c295..d5d801c53 100644 --- a/test/registry_tests.py +++ b/test/registry_tests.py @@ -1213,7 +1213,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC def get_squashed_image(self): response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth='sig') tar = tarfile.open(fileobj=StringIO(response.content)) - return tar + return tar, response.content def test_squashed_changes(self): initial_images = [ @@ -1228,7 +1228,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' # Pull the squashed version of the tag. - tar = self.get_squashed_image() + tar, _ = self.get_squashed_image() self.assertTrue(initial_image_id in tar.getnames()) # Change the images. @@ -1243,7 +1243,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC updated_image_id = '38df4bd4cdffc6b7d656dbd2813c73e864f2d362ad887c999ac315224ad281ac' # Pull the squashed version of the tag and ensure it has changed. - tar = self.get_squashed_image() + tar, _ = self.get_squashed_image() self.assertTrue(updated_image_id in tar.getnames()) @@ -1264,7 +1264,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC # Pull the squashed version of the tag. initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' - tar = self.get_squashed_image() + tar, _ = self.get_squashed_image() self.assertTrue(initial_image_id in tar.getnames()) @@ -1293,7 +1293,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC '%s/VERSION' % expected_image_id, '%s/layer.tar' % expected_image_id] - tar = self.get_squashed_image() + tar, _ = self.get_squashed_image() self.assertEquals(expected_names, tar.getnames()) self.assertEquals('1.0', tar.extractfile(tar.getmember('%s/VERSION' % expected_image_id)).read()) @@ -1308,6 +1308,58 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC image_contents = layer_tar.extractfile(layer_tar.getmember('contents')).read() self.assertEquals('the latest image', image_contents) + def test_squashed_torrent(self): + initial_images = [ + { + 'id': 'initialid', + 'contents': 'the initial image', + }, + ] + + # Create the repo. + self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images) + initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' + + # Try to pull the torrent of the squashed image. This should fail with a 404 since the + # squashed image doesn't yet exist. + self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth=('devtable', 'password'), + headers=dict(accept='application/x-bittorrent'), + expected_code=404) + + # Pull the squashed version of the tag. + tar, squashed = self.get_squashed_image() + self.assertTrue(initial_image_id in tar.getnames()) + + # Enable direct download URLs in fake storage. + self.conduct('POST', '/__test/fakestoragedd/true') + + # Pull the torrent. + response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest', + auth=('devtable', 'password'), + headers=dict(accept='application/x-bittorrent')) + + # Disable direct download URLs in fake storage. + self.conduct('POST', '/__test/fakestoragedd/false') + + # Ensure the torrent is valid. + contents = bencode.bdecode(response.content) + + # Ensure that there is a webseed. + self.assertEquals(contents['url-list'], 'http://somefakeurl') + + # Ensure there is an announce and some pieces. + self.assertIsNotNone(contents.get('info', {}).get('pieces')) + self.assertIsNotNone(contents.get('announce')) + + # Ensure the SHA1 matches the generated tar. + sha = resumablehashlib.sha1() + sha.update(squashed) + + expected = binascii.hexlify(sha.digest()) + found = binascii.hexlify(contents['info']['pieces']) + + self.assertEquals(expected, found) + class LoginTests(object): """ Generic tests for registry login. """ diff --git a/test/testconfig.py b/test/testconfig.py index 62646291e..0146a0988 100644 --- a/test/testconfig.py +++ b/test/testconfig.py @@ -53,6 +53,7 @@ class TestConfig(DefaultConfig): LICENSE_EXPIRATION_WARNING = datetime.now() + timedelta(weeks=520) FEATURE_GITHUB_BUILD = True + FEATURE_BITTORRENT = True CLOUDWATCH_NAMESPACE = None diff --git a/util/registry/torrent.py b/util/registry/torrent.py index 9fdc391ab..70746de68 100644 --- a/util/registry/torrent.py +++ b/util/registry/torrent.py @@ -98,6 +98,10 @@ class PieceHasher(object): self._current_offset += to_hash_len buf_offset += to_hash_len + @property + def hashed_bytes(self): + return self._current_offset + def _piece_length_remaining(self): return self._piece_size - (self._current_offset % self._piece_size)