Merge pull request #1156 from coreos-inc/torrentverb

Add support for torrenting verbs
This commit is contained in:
josephschorr 2016-01-20 18:15:52 -05:00
commit cd81d1e4f5
4 changed files with 135 additions and 29 deletions

View file

@ -2,7 +2,7 @@ import logging
import json import json
import hashlib import hashlib
from flask import redirect, Blueprint, abort, send_file, make_response from flask import redirect, Blueprint, abort, send_file, make_response, request
import features import features
@ -121,6 +121,69 @@ def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_location
done_uploading.save() done_uploading.save()
def _torrent_for_storage(storage_ref, is_public):
""" Returns a response containing the torrent file contents for the given storage. May abort
with an error if the state is not valid (e.g. non-public, non-user request).
"""
# Make sure the storage has a size.
if not storage_ref.image_size:
abort(404)
# Lookup the torrent information for the storage.
try:
torrent_info = model.storage.get_torrent_info(storage_ref)
except model.TorrentInfoDoesNotExist:
abort(404)
# Lookup the webseed path for the storage.
path = model.storage.get_layer_path(storage_ref)
webseed = storage.get_direct_download_url(storage_ref.locations, path)
if webseed is None:
# We cannot support webseeds for storages that cannot provide direct downloads.
abort(make_response('Storage engine does not support seeding.', 501))
# Build the filename for the torrent.
if is_public:
name = public_torrent_filename(storage_ref.uuid)
else:
user = get_authenticated_user()
if not user:
abort(403)
name = per_user_torrent_filename(user.uuid, storage_ref.uuid)
# Return the torrent file.
torrent_file = make_torrent(name, webseed, storage_ref.image_size,
torrent_info.piece_length, torrent_info.pieces)
headers = {'Content-Type': 'application/x-bittorrent',
'Content-Disposition': 'attachment; filename={0}.torrent'.format(name)}
return make_response(torrent_file, 200, headers)
def _torrent_repo_verb(repo_image, tag, verb, **kwargs):
""" Handles returning a torrent for the given verb on the given image and tag. """
if not features.BITTORRENT:
# Torrent feature is not enabled.
abort(406)
# Lookup an *existing* derived storage for the verb. If the verb's image storage doesn't exist,
# we cannot create it here, so we 404.
derived = model.image.find_derived_storage_for_image(repo_image, verb)
if not derived:
abort(406)
# Return the torrent.
public_repo = model.repository.is_repository_public(repo_image.repository)
torrent = _torrent_for_storage(derived, public_repo)
# Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, torrent=True, **kwargs)
return torrent
def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None): def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None):
permission = ReadRepositoryPermission(namespace, repository) permission = ReadRepositoryPermission(namespace, repository)
@ -179,6 +242,11 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker) result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
(repo_image, tag_image, image_json) = result (repo_image, tag_image, image_json) = result
# Check for torrent. If found, we return a torrent for the repo verb image (if the derived
# image already exists).
if request.accept_mimetypes.best == 'application/x-bittorrent':
return _torrent_repo_verb(repo_image, tag, verb, **kwargs)
# Log the action. # Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs) track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)
@ -215,10 +283,13 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
database.close_db_filter(None) database.close_db_filter(None)
hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE']) hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE'])
def _store_metadata_and_cleanup(): def _store_metadata_and_cleanup():
with database.UseThenDisconnect(app.config): with database.UseThenDisconnect(app.config):
model.storage.save_torrent_info(derived, app.config['TORRENT_PIECE_SIZE'], model.storage.save_torrent_info(derived, app.config['TORRENT_PIECE_SIZE'],
hasher.final_piece_hashes()) hasher.final_piece_hashes())
derived.image_size = hasher.hashed_bytes
derived.save()
# Create a queue process to generate the data. The queue files will read from the process # Create a queue process to generate the data. The queue files will read from the process
# and send the results to the client and storage. # and send the results to the client and storage.
@ -321,26 +392,4 @@ def get_tag_torrent(namespace, repo_name, digest):
except model.BlobDoesNotExist: except model.BlobDoesNotExist:
abort(404) abort(404)
path = model.storage.get_layer_path(blob) return _torrent_for_storage(blob, public_repo)
webseed = storage.get_direct_download_url(blob.locations, path)
if webseed is None:
# We cannot support webseeds for storages that cannot provide direct downloads.
abort(make_response('Storage engine does not support seeding.', 501))
try:
torrent_info = model.storage.get_torrent_info(blob)
except model.TorrentInfoDoesNotExist:
abort(404)
if public_repo:
name = public_torrent_filename(blob.uuid)
else:
name = per_user_torrent_filename(user.uuid, blob.uuid)
torrent_file = make_torrent(name, webseed, blob.image_size,
torrent_info.piece_length, torrent_info.pieces)
headers = {'Content-Type': 'application/x-bittorrent',
'Content-Disposition': 'attachment; filename={0}.torrent'.format(name)}
return make_response(torrent_file, 200, headers)

View file

@ -1213,7 +1213,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
def get_squashed_image(self): def get_squashed_image(self):
response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth='sig') response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth='sig')
tar = tarfile.open(fileobj=StringIO(response.content)) tar = tarfile.open(fileobj=StringIO(response.content))
return tar return tar, response.content
def test_squashed_changes(self): def test_squashed_changes(self):
initial_images = [ initial_images = [
@ -1228,7 +1228,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc'
# Pull the squashed version of the tag. # Pull the squashed version of the tag.
tar = self.get_squashed_image() tar, _ = self.get_squashed_image()
self.assertTrue(initial_image_id in tar.getnames()) self.assertTrue(initial_image_id in tar.getnames())
# Change the images. # Change the images.
@ -1243,7 +1243,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
updated_image_id = '38df4bd4cdffc6b7d656dbd2813c73e864f2d362ad887c999ac315224ad281ac' updated_image_id = '38df4bd4cdffc6b7d656dbd2813c73e864f2d362ad887c999ac315224ad281ac'
# Pull the squashed version of the tag and ensure it has changed. # Pull the squashed version of the tag and ensure it has changed.
tar = self.get_squashed_image() tar, _ = self.get_squashed_image()
self.assertTrue(updated_image_id in tar.getnames()) self.assertTrue(updated_image_id in tar.getnames())
@ -1264,7 +1264,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
# Pull the squashed version of the tag. # Pull the squashed version of the tag.
initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc'
tar = self.get_squashed_image() tar, _ = self.get_squashed_image()
self.assertTrue(initial_image_id in tar.getnames()) self.assertTrue(initial_image_id in tar.getnames())
@ -1293,7 +1293,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
'%s/VERSION' % expected_image_id, '%s/VERSION' % expected_image_id,
'%s/layer.tar' % expected_image_id] '%s/layer.tar' % expected_image_id]
tar = self.get_squashed_image() tar, _ = self.get_squashed_image()
self.assertEquals(expected_names, tar.getnames()) self.assertEquals(expected_names, tar.getnames())
self.assertEquals('1.0', tar.extractfile(tar.getmember('%s/VERSION' % expected_image_id)).read()) self.assertEquals('1.0', tar.extractfile(tar.getmember('%s/VERSION' % expected_image_id)).read())
@ -1308,6 +1308,58 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
image_contents = layer_tar.extractfile(layer_tar.getmember('contents')).read() image_contents = layer_tar.extractfile(layer_tar.getmember('contents')).read()
self.assertEquals('the latest image', image_contents) self.assertEquals('the latest image', image_contents)
def test_squashed_torrent(self):
initial_images = [
{
'id': 'initialid',
'contents': 'the initial image',
},
]
# Create the repo.
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images)
initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc'
# Try to pull the torrent of the squashed image. This should fail with a 404 since the
# squashed image doesn't yet exist.
self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth=('devtable', 'password'),
headers=dict(accept='application/x-bittorrent'),
expected_code=404)
# Pull the squashed version of the tag.
tar, squashed = self.get_squashed_image()
self.assertTrue(initial_image_id in tar.getnames())
# Enable direct download URLs in fake storage.
self.conduct('POST', '/__test/fakestoragedd/true')
# Pull the torrent.
response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest',
auth=('devtable', 'password'),
headers=dict(accept='application/x-bittorrent'))
# Disable direct download URLs in fake storage.
self.conduct('POST', '/__test/fakestoragedd/false')
# Ensure the torrent is valid.
contents = bencode.bdecode(response.content)
# Ensure that there is a webseed.
self.assertEquals(contents['url-list'], 'http://somefakeurl')
# Ensure there is an announce and some pieces.
self.assertIsNotNone(contents.get('info', {}).get('pieces'))
self.assertIsNotNone(contents.get('announce'))
# Ensure the SHA1 matches the generated tar.
sha = resumablehashlib.sha1()
sha.update(squashed)
expected = binascii.hexlify(sha.digest())
found = binascii.hexlify(contents['info']['pieces'])
self.assertEquals(expected, found)
class LoginTests(object): class LoginTests(object):
""" Generic tests for registry login. """ """ Generic tests for registry login. """

View file

@ -53,6 +53,7 @@ class TestConfig(DefaultConfig):
LICENSE_EXPIRATION_WARNING = datetime.now() + timedelta(weeks=520) LICENSE_EXPIRATION_WARNING = datetime.now() + timedelta(weeks=520)
FEATURE_GITHUB_BUILD = True FEATURE_GITHUB_BUILD = True
FEATURE_BITTORRENT = True
CLOUDWATCH_NAMESPACE = None CLOUDWATCH_NAMESPACE = None

View file

@ -98,6 +98,10 @@ class PieceHasher(object):
self._current_offset += to_hash_len self._current_offset += to_hash_len
buf_offset += to_hash_len buf_offset += to_hash_len
@property
def hashed_bytes(self):
return self._current_offset
def _piece_length_remaining(self): def _piece_length_remaining(self):
return self._piece_size - (self._current_offset % self._piece_size) return self._piece_size - (self._current_offset % self._piece_size)