diff --git a/TODO.md b/TODO.md new file mode 100644 index 000000000..53fbdc5b2 --- /dev/null +++ b/TODO.md @@ -0,0 +1,15 @@ +- Convert the flattened image generator to use the database ancestry instead of the json file +- Convert verbs to load json from either db or storage +- Convert verbs to work with v1 and cas layer storage locations +- Fix all tests +- Fix uncompressed size backfill +- File issue to move queries out of uncompressed size backfill and use subquery random +- Consider removing the new jwest dependency +- Update the max fresh on registry tokens, 300s is not long enough to complete all registry actions +- Fix the sizes stored in the db +- Make sure we handle more of the v2 api than just what is required to push and pull +- Handle registry API error conditions +- Fill in the registry v2 methods on other storage engines +- Write a script to backfill the json metadata +- Verify the manifest, and throw the proper error if unverified +- Convert uploads to get locked to a placement, e.g. once an upload starts, all communication goes through that replica diff --git a/app.py b/app.py index 4b2e71c66..36047425c 100644 --- a/app.py +++ b/app.py @@ -43,10 +43,10 @@ CONFIG_PROVIDER = FileConfigProvider(OVERRIDE_CONFIG_DIRECTORY, 'config.yaml', ' app = Flask(__name__) logger = logging.getLogger(__name__) + class RegexConverter(BaseConverter): def __init__(self, url_map, *items): super(RegexConverter, self).__init__(url_map) - logger.debug('Installing regex converter with regex: %s', items[0]) self.regex = items[0] diff --git a/config.py b/config.py index 78d614b80..7163b3cd6 100644 --- a/config.py +++ b/config.py @@ -42,7 +42,6 @@ class DefaultConfig(object): LOGGING_LEVEL = 'DEBUG' SEND_FILE_MAX_AGE_DEFAULT = 0 - POPULATE_DB_TEST_DATA = True PREFERRED_URL_SCHEME = 'http' SERVER_HOSTNAME = 'localhost:5000' diff --git a/data/database.py b/data/database.py index 225b0c11c..ccfad8b82 100644 --- a/data/database.py +++ b/data/database.py @@ -3,6 +3,9 @@ import logging import uuid import time import toposort +import base64 +import resumablehashlib +import json from random import SystemRandom from datetime import datetime @@ -343,7 +346,7 @@ class Repository(BaseModel): # These models don't need to use transitive deletes, because the referenced objects # are cleaned up directly - skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger} + skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger, BlobUpload} # We need to sort the ops so that models get cleaned in order of their dependencies ops = reversed(list(self.dependencies(delete_nullable))) @@ -485,6 +488,7 @@ class ImageStorage(BaseModel): uncompressed_size = BigIntegerField(null=True) aggregate_size = BigIntegerField(null=True) uploading = BooleanField(default=True, null=True) + cas_path = BooleanField(default=True) class ImageStorageTransformation(BaseModel): @@ -552,6 +556,12 @@ class Image(BaseModel): storage = ForeignKeyField(ImageStorage, index=True, null=True) + created = DateTimeField(null=True) + comment = TextField(null=True) + command = TextField(null=True) + aggregate_size = BigIntegerField(null=True) + v1_json_metadata = TextField(null=True) + class Meta: database = db read_slaves = (read_slave,) @@ -740,6 +750,44 @@ class RepositoryAuthorizedEmail(BaseModel): ) +class ResumableSHAField(TextField): + def db_value(self, value): + sha_state = value.state() + + # One of the fields is a byte string, let's base64 encode it to make sure + # we can store and fetch it regardless of 
default collocation + sha_state[3] = base64.b64encode(sha_state[3]) + + return json.dumps(sha_state) + + def python_value(self, value): + to_resume = resumablehashlib.sha256() + if value is None: + return to_resume + + sha_state = json.loads(value) + + # We need to base64 decode the data bytestring + sha_state[3] = base64.b64decode(sha_state[3]) + to_resume.set_state(sha_state) + return to_resume + + +class BlobUpload(BaseModel): + repository = ForeignKeyField(Repository, index=True) + uuid = CharField(index=True, unique=True) + byte_count = IntegerField(default=0) + sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256) + location = ForeignKeyField(ImageStorageLocation) + + class Meta: + database = db + read_slaves = (read_slave,) + indexes = ( + # create a unique index on email and repository + (('repository', 'uuid'), True), + ) + all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility, RepositoryTag, EmailConfirmation, FederatedLogin, LoginService, QueueItem, @@ -750,4 +798,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification, RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage, TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind, - AccessTokenKind, Star, RepositoryActionCount, TagManifest] + AccessTokenKind, Star, RepositoryActionCount, TagManifest, BlobUpload] diff --git a/data/model/__init__.py b/data/model/__init__.py index 8c1214c54..771801009 100644 --- a/data/model/__init__.py +++ b/data/model/__init__.py @@ -9,6 +9,10 @@ class BlobDoesNotExist(DataModelException): pass +class InvalidBlobUpload(DataModelException): + pass + + class InvalidEmailAddressException(DataModelException): pass @@ -65,6 +69,10 @@ class InvalidTeamMemberException(DataModelException): pass +class InvalidManifestException(DataModelException): + pass + + class TooManyLoginAttemptsException(Exception): def __init__(self, message, retry_after): super(TooManyLoginAttemptsException, self).__init__(message) diff --git a/data/model/blob.py b/data/model/blob.py index 4bad62584..e97539b66 100644 --- a/data/model/blob.py +++ b/data/model/blob.py @@ -1,8 +1,8 @@ from uuid import uuid4 -from data.model import tag, _basequery, BlobDoesNotExist, db_transaction +from data.model import tag, _basequery, BlobDoesNotExist, InvalidBlobUpload, db_transaction from data.database import (Repository, Namespace, ImageStorage, Image, ImageStorageLocation, - ImageStoragePlacement) + ImageStoragePlacement, BlobUpload) def get_repo_blob_by_digest(namespace, repo_name, blob_digest): @@ -15,9 +15,9 @@ def get_repo_blob_by_digest(namespace, repo_name, blob_digest): .join(ImageStorage) .join(Image) .join(Repository) - .join(Namespace) + .join(Namespace, on=(Namespace.id == Repository.namespace_user)) .where(Repository.name == repo_name, Namespace.username == namespace, - ImageStorage.checksum == blob_digest)) + ImageStorage.checksum == blob_digest, ImageStorage.uploading == False)) if not placements: raise BlobDoesNotExist('Blob does not exist with digest: {0}'.format(blob_digest)) @@ -26,24 +26,45 @@ def get_repo_blob_by_digest(namespace, repo_name, blob_digest): return found -def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_name, + +def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_obj, link_expiration_s): """ Store a record of the blob and temporarily link it to the specified 
repository. """ random_image_name = str(uuid4()) - with db_transaction: + with db_transaction(): repo = _basequery.get_existing_repository(namespace, repo_name) - try: storage = ImageStorage.get(checksum=blob_digest) - location = ImageStorageLocation.get(name=location_name) - ImageStoragePlacement.get(storage=storage, location=location) + ImageStoragePlacement.get(storage=storage, location=location_obj) except ImageStorage.DoesNotExist: - storage = ImageStorage.create(checksum=blob_digest) + storage = ImageStorage.create(checksum=blob_digest, uploading=False) + ImageStoragePlacement.create(storage=storage, location=location_obj) except ImageStoragePlacement.DoesNotExist: - ImageStoragePlacement.create(storage=storage, location=location) + ImageStoragePlacement.create(storage=storage, location=location_obj) # Create a temporary link into the repository, to be replaced by the v1 metadata later # and create a temporary tag to reference it image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo) tag.create_temporary_hidden_tag(repo, image, link_expiration_s) + + +def get_blob_upload(namespace, repo_name, upload_uuid): + """ Load the upload which is already in progress. + """ + try: + return (BlobUpload + .select() + .join(Repository) + .join(Namespace, on=(Namespace.id == Repository.namespace_user)) + .where(Repository.name == repo_name, Namespace.username == namespace, + BlobUpload.uuid == upload_uuid) + .get()) + except BlobUpload.DoesNotExist: + raise InvalidBlobUpload() + + +def initiate_upload(namespace, repo_name, uuid, location_name): + repo = _basequery.get_existing_repository(namespace, repo_name) + location = ImageStorageLocation.get(name=location_name) + return BlobUpload.create(repository=repo, location=location, uuid=uuid) diff --git a/data/model/image.py b/data/model/image.py index 0da208b46..c1f1eba38 100644 --- a/data/model/image.py +++ b/data/model/image.py @@ -4,7 +4,8 @@ import dateutil.parser from peewee import JOIN_LEFT_OUTER, fn from datetime import datetime -from data.model import DataModelException, db_transaction, _basequery, storage +from data.model import (DataModelException, db_transaction, _basequery, storage, + InvalidImageException) from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage, ImageStorageLocation, RepositoryPermission, db_for_update) @@ -247,7 +248,7 @@ def find_create_or_link_image(docker_image_id, repo_obj, username, translations, return repo_image logger.debug('Creating new storage for docker id: %s', docker_image_id) - new_storage = storage.create_storage(preferred_location) + new_storage = storage.create_v1_storage(preferred_location) return Image.create(docker_image_id=docker_image_id, repository=repo_obj, storage=new_storage, @@ -255,7 +256,7 @@ def find_create_or_link_image(docker_image_id, repo_obj, username, translations, def set_image_metadata(docker_image_id, namespace_name, repository_name, created_date_str, comment, - command, parent=None): + command, v1_json_metadata, parent=None): with db_transaction(): query = (Image .select(Image, ImageStorage) @@ -273,7 +274,7 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created # We cleanup any old checksum in case it's a retry after a fail fetched.storage.checksum = None - fetched.storage.created = datetime.now() + fetched.created = datetime.now() if created_date_str is not None: try: @@ -282,14 +283,14 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created # parse 
raises different exceptions, so we cannot use a specific kind of handler here. pass - fetched.storage.comment = comment - fetched.storage.command = command + fetched.comment = comment + fetched.command = command + fetched.v1_json_metadata = v1_json_metadata if parent: fetched.ancestors = '%s%s/' % (parent.ancestors, parent.id) fetched.save() - fetched.storage.save() return fetched @@ -334,8 +335,65 @@ def set_image_size(docker_image_id, namespace_name, repository_name, image_size, return image -def get_image(repo, dockerfile_id): +def get_image(repo, docker_image_id): try: - return Image.get(Image.docker_image_id == dockerfile_id, Image.repository == repo) + return Image.get(Image.docker_image_id == docker_image_id, Image.repository == repo) except Image.DoesNotExist: return None + + +def get_repo_image_by_storage_checksum(namespace, repository_name, storage_checksum): + try: + return (Image + .select() + .join(ImageStorage) + .switch(Image) + .join(Repository) + .join(Namespace, on=(Namespace.id == Repository.namespace_user)) + .where(Repository.name == repository_name, Namespace.username == namespace, + ImageStorage.checksum == storage_checksum, ImageStorage.uploading == False) + .get()) + except Image.DoesNotExist: + msg = 'Image with storage checksum {0} does not exist in repo {1}/{2}'.format(storage_checksum, + namespace, + repository_name) + raise InvalidImageException(msg) + + +def synthesize_v1_image(namespace, repository_name, storage_checksum, docker_image_id, + created_date_str, comment, command, v1_json_metadata, parent_docker_id): + """ Find an existing image with this docker image id, and if none exists, write one with the + specified metadata. + """ + + repo = _basequery.get_existing_repository(namespace, repository_name) + # Sometimes the manifest may reference an image that already exists + + found = get_image(repo, docker_image_id) + if found is not None: + # The image already exists, nothing to do + return found + + the_bits = storage.get_repo_storage_by_checksum(namespace, repository_name, storage_checksum) + + ancestors = '/' + if parent_docker_id is not None: + parent = get_repo_image(namespace, repository_name, parent_docker_id) + if parent is None: + msg = 'Parent not found with docker image id {0} in repo {1}/{2}'.format(parent_docker_id, + namespace, + repository_name) + raise InvalidImageException(msg) + ancestors = '{0}{1}/'.format(parent.ancestors, parent.id) + + created = None + if created_date_str is not None: + try: + created = dateutil.parser.parse(created_date_str).replace(tzinfo=None) + except: + # parse raises different exceptions, so we cannot use a specific kind of handler here. 
+ pass + + return Image.create(docker_image_id=docker_image_id, ancestors=ancestors, comment=comment, + command=command, v1_json_metadata=v1_json_metadata, created=created, + storage=the_bits, repository=repo) diff --git a/data/model/storage.py b/data/model/storage.py index d1ab07b85..8d0c05bdf 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -5,7 +5,7 @@ from peewee import JOIN_LEFT_OUTER, fn from data.model import config, db_transaction, InvalidImageException from data.database import (ImageStorage, Image, DerivedImageStorage, ImageStoragePlacement, ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature, - ImageStorageSignatureKind) + ImageStorageSignatureKind, Repository, Namespace) logger = logging.getLogger(__name__) @@ -18,7 +18,7 @@ def find_or_create_derived_storage(source, transformation_name, preferred_locati logger.debug('Creating storage dervied from source: %s', source.uuid) trans = ImageStorageTransformation.get(name=transformation_name) - new_storage = create_storage(preferred_location) + new_storage = create_v1_storage(preferred_location) DerivedImageStorage.create(source=source, derivative=new_storage, transformation=trans) return new_storage @@ -117,8 +117,8 @@ def garbage_collect_storage(storage_id_whitelist): config.store.remove({location_name}, image_path) -def create_storage(location_name): - storage = ImageStorage.create() +def create_v1_storage(location_name): + storage = ImageStorage.create(cas_path=False) location = ImageStorageLocation.get(name=location_name) ImageStoragePlacement.create(location=location, storage=storage) storage.locations = {location_name} @@ -138,10 +138,9 @@ def lookup_storage_signature(storage, signature_kind): kind = ImageStorageSignatureKind.get(name=signature_kind) try: return (ImageStorageSignature - .select() - .where(ImageStorageSignature.storage == storage, - ImageStorageSignature.kind == kind) - .get()) + .select() + .where(ImageStorageSignature.storage == storage, ImageStorageSignature.kind == kind) + .get()) except ImageStorageSignature.DoesNotExist: return None @@ -149,12 +148,12 @@ def lookup_storage_signature(storage, signature_kind): def find_derived_storage(source, transformation_name): try: found = (ImageStorage - .select(ImageStorage, DerivedImageStorage) - .join(DerivedImageStorage, on=(ImageStorage.id == DerivedImageStorage.derivative)) - .join(ImageStorageTransformation) - .where(DerivedImageStorage.source == source, - ImageStorageTransformation.name == transformation_name) - .get()) + .select(ImageStorage, DerivedImageStorage) + .join(DerivedImageStorage, on=(ImageStorage.id == DerivedImageStorage.derivative)) + .join(ImageStorageTransformation) + .where(DerivedImageStorage.source == source, + ImageStorageTransformation.name == transformation_name) + .get()) found.locations = {placement.location.name for placement in found.imagestorageplacement_set} return found @@ -176,16 +175,17 @@ def delete_derived_storage_by_uuid(storage_uuid): image_storage.delete_instance(recursive=True) -def get_storage_by_uuid(storage_uuid): - placements = list(ImageStoragePlacement - .select(ImageStoragePlacement, ImageStorage, ImageStorageLocation) - .join(ImageStorageLocation) - .switch(ImageStoragePlacement) - .join(ImageStorage) - .where(ImageStorage.uuid == storage_uuid)) +def _get_storage(query_modifier): + query = (ImageStoragePlacement + .select(ImageStoragePlacement, ImageStorage, ImageStorageLocation) + .join(ImageStorageLocation) + .switch(ImageStoragePlacement) + .join(ImageStorage)) + + placements 
= list(query_modifier(query)) if not placements: - raise InvalidImageException('No storage found with uuid: %s', storage_uuid) + raise InvalidImageException() found = placements[0].storage found.locations = {placement.location.name for placement in placements} @@ -193,3 +193,26 @@ def get_storage_by_uuid(storage_uuid): return found +def get_storage_by_uuid(storage_uuid): + def filter_to_uuid(query): + return query.where(ImageStorage.uuid == storage_uuid) + + try: + return _get_storage(filter_to_uuid) + except InvalidImageException: + raise InvalidImageException('No storage found with uuid: %s', storage_uuid) + + +def get_repo_storage_by_checksum(namespace, repository_name, checksum): + def filter_to_repo_and_checksum(query): + return (query + .join(Image) + .join(Repository) + .join(Namespace, on=(Repository.namespace_user == Namespace.id)) + .where(Repository.name == repository_name, Namespace.username == namespace, + ImageStorage.checksum == checksum)) + + try: + return _get_storage(filter_to_repo_and_checksum) + except InvalidImageException: + raise InvalidImageException('No storage found with checksum {0}'.format(checksum)) diff --git a/data/model/tag.py b/data/model/tag.py index 199d99704..b020f019a 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -1,7 +1,8 @@ from uuid import uuid4 -from data.model import image, db_transaction, DataModelException, _basequery -from data.database import (RepositoryTag, Repository, Image, ImageStorage, Namespace, +from data.model import (image, db_transaction, DataModelException, _basequery, + InvalidManifestException) +from data.database import (RepositoryTag, Repository, Image, ImageStorage, Namespace, TagManifest, get_epoch_timestamp, db_for_update) @@ -36,8 +37,8 @@ def list_repository_tags(namespace_name, repository_name, include_hidden=False, return query -def create_or_update_tag(namespace_name, repository_name, tag_name, - tag_docker_image_id, reversion=False): +def create_or_update_tag(namespace_name, repository_name, tag_name, tag_docker_image_id, + reversion=False): try: repo = _basequery.get_existing_repository(namespace_name, repository_name) except Repository.DoesNotExist: @@ -160,3 +161,55 @@ def revert_tag(repo_obj, tag_name, docker_image_id): return create_or_update_tag(repo_obj.namespace_user.username, repo_obj.name, tag_name, docker_image_id, reversion=True) + +def store_tag_manifest(namespace, repo_name, tag_name, docker_image_id, manifest_digest, + manifest_data): + tag = create_or_update_tag(namespace, repo_name, tag_name, docker_image_id) + return TagManifest.create(tag=tag, digest=manifest_digest, json_data=manifest_data) + + +def _get_active_tag(namespace, repo_name, tag_name): + return _tag_alive(RepositoryTag + .select() + .join(Image) + .join(Repository) + .join(Namespace, on=(Repository.namespace_user == Namespace.id)) + .where(RepositoryTag.name == tag_name, Repository.name == repo_name, + Namespace.username == namespace)).get() + + +def associate_generated_tag_manifest(namespace, repo_name, tag_name, manifest_digest, + manifest_data): + tag = _get_active_tag(namespace, repo_name, tag_name) + return TagManifest.create(tag=tag, digest=manifest_digest, json_data=manifest_data) + + +def load_tag_manifest(namespace, repo_name, tag_name): + try: + return (_load_repo_manifests(namespace, repo_name) + .where(RepositoryTag.name == tag_name) + .get()) + except TagManifest.DoesNotExist: + msg = 'Manifest not found for tag {0} in repo {1}/{2}'.format(tag_name, namespace, repo_name) + raise InvalidManifestException(msg) + + 
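# --- Editorial aside, not part of the patch: a minimal sketch of how the storage
# --- lookup helpers refactored above in data/model/storage.py are intended to be
# --- called. The namespace, repository and checksum values are hypothetical.
from data.model import storage, InvalidImageException

try:
    # .locations comes back pre-populated by _get_storage, so callers can hand the
    # result straight to the storage engine without another placement query.
    found = storage.get_repo_storage_by_checksum('devtable', 'simple',
                                                 'sha256:0123456789abcdef')
except InvalidImageException:
    found = None  # no storage with that checksum exists in this repository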
+def load_manifest_by_digest(namespace, repo_name, digest): + try: + return (_load_repo_manifests(namespace, repo_name) + .where(TagManifest.digest == digest) + .get()) + except TagManifest.DoesNotExist: + msg = 'Manifest not found with digest {0} in repo {1}/{2}'.format(digest, namespace, repo_name) + raise InvalidManifestException(msg) + + +def _load_repo_manifests(namespace, repo_name): + return (TagManifest + .select(TagManifest, RepositoryTag) + .join(RepositoryTag) + .join(Image) + .join(Repository) + .join(Namespace, on=(Namespace.id == Repository.namespace_user)) + .where(Repository.name == repo_name, Namespace.username == namespace)) + diff --git a/digest/checksums.py b/digest/checksums.py index 154907823..ea30e4dc1 100644 --- a/digest/checksums.py +++ b/digest/checksums.py @@ -68,7 +68,7 @@ def compute_tarsum(fp, json_data): def simple_checksum_handler(json_data): - h = hashlib.sha256(json_data + '\n') + h = hashlib.sha256(json_data.encode('utf8') + '\n') def fn(buf): h.update(buf) diff --git a/digest/digest_tools.py b/digest/digest_tools.py index efebac831..6c3f444dc 100644 --- a/digest/digest_tools.py +++ b/digest/digest_tools.py @@ -2,33 +2,61 @@ import re import os.path import hashlib -from collections import namedtuple - - -Digest = namedtuple('Digest', ['is_tarsum', 'tarsum_version', 'hash_alg', 'hash_bytes']) - DIGEST_PATTERN = r'(tarsum\.(v[\w]+)\+)?([\w]+):([0-9a-f]+)' -DIGEST_REGEX = re.compile(DIGEST_PATTERN) class InvalidDigestException(RuntimeError): pass -def parse_digest(digest): - """ Returns the digest parsed out to its components. """ - match = DIGEST_REGEX.match(digest) - if match is None or match.end() != len(digest): - raise InvalidDigestException('Not a valid digest: %s', digest) +class Digest(object): + DIGEST_REGEX = re.compile(DIGEST_PATTERN) - is_tarsum = match.group(1) is not None - return Digest(is_tarsum, match.group(2), match.group(3), match.group(4)) + def __init__(self, hash_alg, hash_bytes, is_tarsum=False, tarsum_version=None): + self._hash_alg = hash_alg + self._hash_bytes = hash_bytes + self._is_tarsum = is_tarsum + self._tarsum_version = tarsum_version + + def __str__(self): + if self._is_tarsum: + return 'tarsum.{0}+{1}:{2}'.format(self._tarsum_version, self._hash_alg, self._hash_bytes) + return '{0}:{1}'.format(self._hash_alg, self._hash_bytes) + + def __eq__(self, rhs): + return isinstance(rhs, Digest) and str(self) == str(rhs) + + @staticmethod + def parse_digest(digest): + """ Returns the digest parsed out to its components. """ + match = Digest.DIGEST_REGEX.match(digest) + if match is None or match.end() != len(digest): + raise InvalidDigestException('Not a valid digest: %s', digest) + + is_tarsum = match.group(1) is not None + return Digest(match.group(3), match.group(4), is_tarsum, match.group(2)) + + @property + def is_tarsum(self): + return self._is_tarsum + + @property + def tarsum_version(self): + return self._tarsum_version + + @property + def hash_alg(self): + return self._hash_alg + + @property + def hash_bytes(self): + return self._hash_bytes def content_path(digest): """ Returns a relative path to the parsed digest. 
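# --- Editorial aside, not part of the patch: round-tripping a digest string through
# --- the Digest class introduced above in digest/digest_tools.py. The digest values
# --- here are made up.
from digest import digest_tools

parsed = digest_tools.Digest.parse_digest('tarsum.v1+sha256:0123456789abcdef')
assert parsed.is_tarsum
assert parsed.tarsum_version == 'v1'
assert parsed.hash_alg == 'sha256'
assert parsed.hash_bytes == '0123456789abcdef'

# str() re-serializes to the canonical form, which is what __eq__ compares.
assert str(parsed) == 'tarsum.v1+sha256:0123456789abcdef'
assert digest_tools.digests_equal('sha256:0123456789abcdef',
                                  'sha256:0123456789abcdef')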
""" - parsed = parse_digest(digest) + parsed = Digest.parse_digest(digest) components = [] if parsed.is_tarsum: @@ -58,7 +86,11 @@ def sha256_digest_from_generator(content_generator): return 'sha256:{0}'.format(digest.hexdigest()) +def sha256_digest_from_hashlib(sha256_hash_obj): + return 'sha256:{0}'.format(sha256_hash_obj.hexdigest()) + + def digests_equal(lhs_digest_string, rhs_digest_string): """ Parse and compare the two digests, returns True if the digests are equal, False otherwise. """ - return parse_digest(lhs_digest_string) == parse_digest(rhs_digest_string) + return Digest.parse_digest(lhs_digest_string) == Digest.parse_digest(rhs_digest_string) diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py index 2241a6089..42ad34ba1 100644 --- a/endpoints/v1/registry.py +++ b/endpoints/v1/registry.py @@ -12,6 +12,7 @@ from auth.auth_context import get_authenticated_user, get_grant_user_context from digest import checksums from util.registry import changes from util.http import abort, exact_abort +from util.registry.filelike import SocketReader from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission) from data import model, database @@ -23,26 +24,6 @@ from endpoints.decorators import anon_protect logger = logging.getLogger(__name__) -class SocketReader(object): - def __init__(self, fp): - self._fp = fp - self.handlers = [] - - def add_handler(self, handler): - self.handlers.append(handler) - - def read(self, n=-1): - buf = self._fp.read(n) - if not buf: - return '' - for handler in self.handlers: - handler(buf) - return buf - - def tell(self): - raise IOError('Stream is not seekable.') - - def image_is_uploading(repo_image): if repo_image is None: return False @@ -145,9 +126,11 @@ def get_image_layer(namespace, repository, image_id, headers): abort(404, 'Image %(image_id)s not found', issue='unknown-image', image_id=image_id) - logger.debug('Looking up the layer path') try: - path = store.image_layer_path(repo_image.storage.uuid) + path = store.blob_path(repo_image.storage.checksum) + if not repo_image.storage.cas_path: + path = store.v1_image_layer_path(repo_image.storage.uuid) + logger.info('Serving legacy v1 image from path: %s', path) logger.debug('Looking up the direct download URL') direct_download_url = store.get_direct_download_url(repo_image.storage.locations, path) @@ -186,14 +169,15 @@ def put_image_layer(namespace, repository, image_id): try: logger.debug('Retrieving image data') uuid = repo_image.storage.uuid - json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid)) + json_data = (repo_image.v1_json_metadata or + store.get_content(repo_image.storage.locations, store.image_json_path(uuid))) except (IOError, AttributeError): logger.exception('Exception when retrieving image data') abort(404, 'Image %(image_id)s not found', issue='unknown-image', image_id=image_id) - logger.debug('Retrieving image path info') - layer_path = store.image_layer_path(uuid) + layer_path = store.v1_image_layer_path(uuid) + logger.info('Storing layer at v1 path: %s', layer_path) if (store.exists(repo_image.storage.locations, layer_path) and not image_is_uploading(repo_image)): @@ -315,7 +299,8 @@ def put_image_checksum(namespace, repository, image_id): uuid = repo_image.storage.uuid logger.debug('Looking up repo layer data') - if not store.exists(repo_image.storage.locations, store.image_json_path(uuid)): + if (repo_image.v1_json_metadata is None and + not store.exists(repo_image.storage.locations, store.image_json_path(uuid))): 
abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id) logger.debug('Marking image path') @@ -369,13 +354,17 @@ def get_image_json(namespace, repository, image_id, headers): logger.debug('Looking up repo layer data') try: uuid = repo_image.storage.uuid - data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid)) + data = (repo_image.v1_json_metadata or + store.get_content(repo_image.storage.locations, store.image_json_path(uuid))) except (IOError, AttributeError): flask_abort(404) logger.debug('Looking up repo layer size') size = repo_image.storage.image_size - headers['X-Docker-Size'] = str(size) + + headers['Content-Type'] = 'application/json' + if size is not None: + headers['X-Docker-Size'] = str(size) response = make_response(data, 200) response.headers.extend(headers) @@ -394,37 +383,18 @@ def get_image_ancestry(namespace, repository, image_id, headers): if not permission.can() and not model.repository.repository_is_public(namespace, repository): abort(403) - logger.debug('Looking up repo image') - repo_image = model.image.get_repo_image_extended(namespace, repository, image_id) + image = model.image.get_image_by_id(namespace, repository, image_id) + parents = model.image.get_parent_images(namespace, repository, image) - logger.debug('Looking up image data') - try: - uuid = repo_image.storage.uuid - data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid)) - except (IOError, AttributeError): - abort(404, 'Image %(image_id)s not found', issue='unknown-image', - image_id=image_id) + ancestry_docker_ids = [image.docker_image_id] + ancestry_docker_ids.extend([parent.docker_image_id for parent in reversed(parents)]) - logger.debug('Converting to <-> from JSON') - response = make_response(json.dumps(json.loads(data)), 200) + # We can not use jsonify here because we are returning a list not an object + response = make_response(json.dumps(ancestry_docker_ids), 200) response.headers.extend(headers) - - logger.debug('Done') return response -def generate_ancestry(image_id, uuid, locations, parent_id=None, parent_uuid=None, - parent_locations=None): - if not parent_id: - store.put_content(locations, store.image_ancestry_path(uuid), json.dumps([image_id])) - return - - data = store.get_content(parent_locations, store.image_ancestry_path(parent_uuid)) - data = json.loads(data) - data.insert(0, image_id) - store.put_content(locations, store.image_ancestry_path(uuid), json.dumps(data)) - - def store_checksum(image_storage, checksum): checksum_parts = checksum.split(':') if len(checksum_parts) != 2: @@ -447,7 +417,8 @@ def put_image_json(namespace, repository, image_id): logger.debug('Parsing image JSON') try: - data = json.loads(request.data.decode('utf8')) + v1_metadata = request.data + data = json.loads(v1_metadata.decode('utf8')) except ValueError: pass @@ -479,63 +450,38 @@ def put_image_json(namespace, repository, image_id): model.tag.create_temporary_hidden_tag(repo, repo_image, app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']) - uuid = repo_image.storage.uuid - if image_id != data['id']: abort(400, 'JSON data contains invalid id for image: %(image_id)s', issue='invalid-request', image_id=image_id) - parent_id = data.get('parent') + parent_id = data.get('parent', None) parent_image = None if parent_id: logger.debug('Looking up parent image') parent_image = model.image.get_repo_image_extended(namespace, repository, parent_id) - parent_uuid = parent_image and parent_image.storage.uuid - parent_locations = 
parent_image and parent_image.storage.locations + if not parent_image or parent_image.storage.uploading: + abort(400, 'Image %(image_id)s depends on non existing parent image %(parent_id)s', + issue='invalid-request', image_id=image_id, parent_id=parent_id) - if parent_id: - logger.debug('Looking up parent image data') - - if (parent_id and not - store.exists(parent_locations, store.image_json_path(parent_uuid))): - abort(400, 'Image %(image_id)s depends on non existing parent image %(parent_id)s', - issue='invalid-request', image_id=image_id, parent_id=parent_id) - - logger.debug('Looking up image storage paths') - json_path = store.image_json_path(uuid) - - logger.debug('Checking if image already exists') - if (store.exists(repo_image.storage.locations, json_path) and not - image_is_uploading(repo_image)): + json_path = store.image_json_path(repo_image.storage.uuid) + if (not image_is_uploading(repo_image) and + (repo_image.v1_json_metadata is not None or + store.exists(repo_image.storage.locations, json_path))): exact_abort(409, 'Image already exists') set_uploading_flag(repo_image, True) # If we reach that point, it means that this is a new image or a retry - # on a failed push - # save the metadata + # on a failed push, save the metadata command_list = data.get('container_config', {}).get('Cmd', None) command = json.dumps(command_list) if command_list else None logger.debug('Setting image metadata') model.image.set_image_metadata(image_id, namespace, repository, data.get('created'), - data.get('comment'), command, parent_image) + data.get('comment'), command, v1_metadata, parent_image) - logger.debug('Putting json path') - store.put_content(repo_image.storage.locations, json_path, request.data) - - logger.debug('Generating image ancestry') - - try: - generate_ancestry(image_id, uuid, repo_image.storage.locations, parent_id, parent_uuid, - parent_locations) - except IOError as ioe: - logger.debug('Error when generating ancestry: %s', ioe.message) - abort(404) - - logger.debug('Done') return make_response('true', 200) @@ -572,7 +518,11 @@ def process_image_changes(namespace, repository, image_id): parent_trie.frombytes(parent_trie_bytes) # Read in the file entries from the layer tar file - layer_path = store.image_layer_path(uuid) + layer_path = store.blob_path(repo_image.storage.checksum) + if not repo_image.storage.cas_path: + logger.info('Processing diffs for newly stored v1 image at %s', layer_path) + layer_path = store.v1_image_layer_path(uuid) + with store.stream_read_file(image.storage.locations, layer_path) as layer_tar_stream: removed_files = set() layer_files = changes.files_and_dirs_from_tar(layer_tar_stream, diff --git a/endpoints/v2/__init__.py b/endpoints/v2/__init__.py index 0de3dc539..e0d49b70c 100644 --- a/endpoints/v2/__init__.py +++ b/endpoints/v2/__init__.py @@ -3,11 +3,12 @@ import logging -from flask import Blueprint, make_response, url_for, request +from flask import Blueprint, make_response, url_for, request, jsonify from functools import wraps from urlparse import urlparse from endpoints.decorators import anon_protect, anon_allowed +from endpoints.v2.errors import V2RegistryException from auth.jwt_auth import process_jwt_auth from auth.auth_context import get_grant_user_context from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission, @@ -21,6 +22,16 @@ logger = logging.getLogger(__name__) v2_bp = Blueprint('v2', __name__) +@v2_bp.app_errorhandler(V2RegistryException) +def handle_registry_v2_exception(error): + response = jsonify({ + 
'errors': [error.as_dict()] + }) + response.status_code = error.http_status_code + logger.debug('sending response: %s', response.get_data()) + return response + + def _require_repo_permission(permission_class, allow_public=False): def wrapper(func): @wraps(func) @@ -67,3 +78,4 @@ def v2_support_enabled(): from endpoints.v2 import v2auth from endpoints.v2 import manifest from endpoints.v2 import blob +from endpoints.v2 import tag diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 1c5639ab0..78d71ec58 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -2,16 +2,20 @@ # XXX time as this notice is removed. import logging +import re -from flask import make_response, url_for, request +from flask import make_response, url_for, request, redirect, Response, abort as flask_abort from app import storage, app -from data import model +from data import model, database from digest import digest_tools from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream +from endpoints.v2.errors import BlobUnknown, BlobUploadInvalid, BlobUploadUnknown from auth.jwt_auth import process_jwt_auth from endpoints.decorators import anon_protect -from util.http import abort +from util.cache import cache_control +from util.registry.filelike import wrap_with_hash +from storage.basestorage import InvalidChunkException logger = logging.getLogger(__name__) @@ -19,6 +23,11 @@ logger = logging.getLogger(__name__) BASE_BLOB_ROUTE = '///blobs/' BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN) +RANGE_HEADER_REGEX = re.compile(r'^bytes=([0-9]+)-([0-9]+)$') + + +class _InvalidRangeHeader(Exception): + pass @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD']) @@ -27,21 +36,81 @@ BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN) @anon_protect def check_blob_existence(namespace, repo_name, digest): try: - found = model.blob.get_repo_blob_by_digest(namespace, repo_name, digest) + model.image.get_repo_image_by_storage_checksum(namespace, repo_name, digest) # The response body must be empty for a successful HEAD request return make_response('') + except model.InvalidImageException: + raise BlobUnknown() + + +def _base_blob_fetch(namespace, repo_name, digest): + """ Some work that is common to both GET and HEAD requests. Callers MUST check for proper + authorization before calling this method. + """ + try: + found = model.blob.get_repo_blob_by_digest(namespace, repo_name, digest) except model.BlobDoesNotExist: - abort(404) + raise BlobUnknown() + + headers = { + 'Docker-Content-Digest': digest, + } + + # Add the Accept-Ranges header if the storage engine supports resumable + # downloads. 
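# --- Editorial aside, not part of the patch: what the app_errorhandler registered in
# --- endpoints/v2/__init__.py renders for the V2RegistryException subclasses defined
# --- later in this diff (endpoints/v2/errors.py). For instance, the BlobUnknown
# --- raised by check_blob_existence above becomes a 404 whose body is
# --- {"errors": [{"code": "BLOB_UNKNOWN", "message": "blob unknown to registry", "detail": {}}]}.
from endpoints.v2.errors import BlobUnknown

err = BlobUnknown()
assert err.http_status_code == 404
assert err.as_dict() == {'code': 'BLOB_UNKNOWN',
                         'message': 'blob unknown to registry',
                         'detail': {}}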
+ if storage.get_supports_resumable_downloads(found.storage.locations): + logger.debug('Storage supports resumable downloads') + headers['Accept-Ranges'] = 'bytes' + + return found, headers + + +@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD']) +@process_jwt_auth +@require_repo_read +@anon_protect +@cache_control(max_age=31436000) +def check_blob_exists(namespace, repo_name, digest): + _, headers = _base_blob_fetch(namespace, repo_name, digest) + + response = make_response('') + response.headers.extend(headers) + return response @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['GET']) @process_jwt_auth @require_repo_read @anon_protect +@cache_control(max_age=31536000) def download_blob(namespace, repo_name, digest): - # TODO Implement this - return make_response('') + found, headers = _base_blob_fetch(namespace, repo_name, digest) + + path = storage.blob_path(digest) + if not found.cas_path: + logger.info('Generating legacy v1 path for image: %s', digest) + path = storage.v1_image_layer_path(found.uuid) + + logger.debug('Looking up the direct download URL') + direct_download_url = storage.get_direct_download_url(found.locations, path) + + if direct_download_url: + logger.debug('Returning direct download URL') + resp = redirect(direct_download_url) + resp.headers.extend(headers) + return resp + + logger.debug('Streaming layer data') + + # Close the database handle here for this process before we send the long download. + database.close_db_filter(None) + + return Response(storage.stream_read(found.locations, path), headers=headers) + + +def _render_range(end_byte): + return 'bytes=0-{0}'.format(end_byte) @v2_bp.route('///blobs/uploads/', methods=['POST']) @@ -49,12 +118,135 @@ def download_blob(namespace, repo_name, digest): @require_repo_write @anon_protect def start_blob_upload(namespace, repo_name): - new_upload_uuid = storage.initiate_chunked_upload(storage.preferred_locations[0]) + location_name = storage.preferred_locations[0] + new_upload_uuid = storage.initiate_chunked_upload(location_name) + model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name) + + digest = request.args.get('digest', None) + if digest is None: + # The user will send the blob data in another request + accepted = make_response('', 202) + accepted.headers['Location'] = url_for('v2.upload_chunk', namespace=namespace, + repo_name=repo_name, upload_uuid=new_upload_uuid) + accepted.headers['Range'] = _render_range(0) + accepted.headers['Docker-Upload-UUID'] = new_upload_uuid + return accepted + else: + # The user plans to send us the entire body right now + uploaded = _upload_chunk(namespace, repo_name, new_upload_uuid, range_required=False) + uploaded.save() + + return _finish_upload(namespace, repo_name, uploaded, digest) + + +@v2_bp.route('///blobs/uploads/', methods=['GET']) +@process_jwt_auth +@require_repo_write +@anon_protect +def fetch_existing_upload(namespace, repo_name, upload_uuid): + try: + found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid) + except model.InvalidBlobUpload: + raise BlobUploadUnknown() + + accepted = make_response('', 204) + accepted.headers['Range'] = _render_range(found.byte_count) + accepted.headers['Docker-Upload-UUID'] = upload_uuid + return accepted + + +def _current_request_path(): + return '{0}{1}'.format(request.script_root, request.path) + + +def _range_not_satisfiable(valid_end): + invalid_range = make_response('', 416) + invalid_range.headers['Location'] = _current_request_path() + invalid_range.headers['Range'] = '0-{0}'.format(valid_end) + 
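# --- Editorial aside, not part of _range_not_satisfiable above or of the patch: a
# --- client-side sketch of the blob upload flow the routes in this file implement.
# --- Host, repository and credentials are hypothetical; requests is used only for
# --- illustration.
import hashlib
import requests

base = 'http://localhost:5000/v2/devtable/simple/blobs'
auth = {'Authorization': 'Bearer <registry token>'}
blob = 'layer bytes go here'
digest = 'sha256:' + hashlib.sha256(blob).hexdigest()

# Monolithic path: POST the whole body with ?digest=...; start_blob_upload streams
# it through the BlobUpload record's running sha256 and answers 201 with
# Docker-Content-Digest and a Location pointing at v2.download_blob.
requests.post(base + '/uploads/', params={'digest': digest}, data=blob, headers=auth)

# Chunked path: POST without a digest (202 with Location, Range and
# Docker-Upload-UUID headers), PATCH data to that Location with a Range header
# starting at the bytes already accepted, then finish with PUT ?digest=<...>.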
invalid_range.headers['Docker-Upload-UUID'] = request.view_args['upload_uuid'] + flask_abort(invalid_range) + + +def _parse_range_header(range_header_text, valid_start): + """ Parses the range header, and returns a tuple of the start offset and the length, + or raises an _InvalidRangeHeader exception. + """ + found = RANGE_HEADER_REGEX.match(range_header_text) + if found is None: + raise _InvalidRangeHeader() + + start = int(found.group(1)) + length = int(found.group(2)) - start + + if start != valid_start or length <= 0: + raise _InvalidRangeHeader() + + return (start, length) + + +def _upload_chunk(namespace, repo_name, upload_uuid, range_required): + """ Common code among the various uploading paths for appending data to blobs. + Callers MUST call .save() or .delete_instance() on the returned database object. + """ + try: + found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid) + except model.InvalidBlobUpload: + raise BlobUploadUnknown() + + start_offset, length = 0, -1 + range_header = request.headers.get('range', None) + + if range_required and range_header is None: + _range_not_satisfiable(found.byte_count) + + if range_header is not None: + try: + start_offset, length = _parse_range_header(range_header, found.byte_count) + except _InvalidRangeHeader: + _range_not_satisfiable(found.byte_count) + + input_fp = wrap_with_hash(get_input_stream(request), found.sha_state) + + try: + storage.stream_upload_chunk({found.location.name}, upload_uuid, start_offset, length, input_fp) + except InvalidChunkException: + _range_not_satisfiable(found.byte_count) + + found.byte_count += length + return found + + +def _finish_upload(namespace, repo_name, upload_obj, expected_digest): + computed_digest = digest_tools.sha256_digest_from_hashlib(upload_obj.sha_state) + if not digest_tools.digests_equal(computed_digest, expected_digest): + raise BlobUploadInvalid() + + final_blob_location = digest_tools.content_path(expected_digest) + storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, final_blob_location) + model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest, + upload_obj.location, + app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']) + upload_obj.delete_instance() + + response = make_response('', 201) + response.headers['Docker-Content-Digest'] = expected_digest + response.headers['Location'] = url_for('v2.download_blob', namespace=namespace, + repo_name=repo_name, digest=expected_digest) + return response + + +@v2_bp.route('///blobs/uploads/', methods=['PATCH']) +@process_jwt_auth +@require_repo_write +@anon_protect +def upload_chunk(namespace, repo_name, upload_uuid): + upload = _upload_chunk(namespace, repo_name, upload_uuid, range_required=True) + upload.save() + accepted = make_response('', 202) - accepted.headers['Location'] = url_for('v2.upload_chunk', namespace=namespace, - repo_name=repo_name, upload_uuid=new_upload_uuid) - accepted.headers['Range'] = 'bytes=0-0' - accepted.headers['Docker-Upload-UUID'] = new_upload_uuid + accepted.headers['Location'] = _current_request_path() + accepted.headers['Range'] = _render_range(upload.byte_count) + accepted.headers['Docker-Upload-UUID'] = upload_uuid return accepted @@ -62,22 +254,28 @@ def start_blob_upload(namespace, repo_name): @process_jwt_auth @require_repo_write @anon_protect -def upload_chunk(namespace, repo_name, upload_uuid): +def monolithic_upload_or_last_chunk(namespace, repo_name, upload_uuid): digest = request.args.get('digest', None) - upload_location = 
storage.preferred_locations[0] - bytes_written = storage.stream_upload_chunk(upload_location, upload_uuid, 0, -1, - get_input_stream(request)) + if digest is None: + raise BlobUploadInvalid() - if digest is not None: - final_blob_location = digest_tools.content_path(digest) - storage.complete_chunked_upload(upload_location, upload_uuid, final_blob_location, digest) - model.blob.store_blob_record_and_temp_link(namespace, repo_name, digest, upload_location, - app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']) + found = _upload_chunk(namespace, repo_name, upload_uuid, range_required=False) + return _finish_upload(namespace, repo_name, found, digest) - response = make_response('', 201) - response.headers['Docker-Content-Digest'] = digest - response.headers['Location'] = url_for('v2.download_blob', namespace=namespace, - repo_name=repo_name, digest=digest) - return response - return make_response('', 202) +@v2_bp.route('///blobs/uploads/', methods=['DELETE']) +@process_jwt_auth +@require_repo_write +@anon_protect +def cancel_upload(namespace, repo_name, upload_uuid): + try: + found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid) + except model.InvalidBlobUpload: + raise BlobUploadUnknown() + + # We delete the record for the upload first, since if the partial upload in + # storage fails to delete, it doesn't break anything + found.delete_instance() + storage.cancel_chunked_upload({found.location.name}, found.uuid) + + return make_response('', 204) diff --git a/endpoints/v2/errors.py b/endpoints/v2/errors.py new file mode 100644 index 000000000..0bee322b0 --- /dev/null +++ b/endpoints/v2/errors.py @@ -0,0 +1,118 @@ +class V2RegistryException(Exception): + def __init__(self, error_code_str, message, detail, http_status_code=400): + super(V2RegistryException, self).__init__(message) + self.http_status_code = http_status_code + + self._error_code_str = error_code_str + self._detail = detail + + def as_dict(self): + return { + 'code': self._error_code_str, + 'message': self.message, + 'detail': self._detail if self._detail is not None else {}, + } + + +class BlobUnknown(V2RegistryException): + def __init__(self, detail=None): + super(BlobUnknown, self).__init__('BLOB_UNKNOWN', + 'blob unknown to registry', + detail, + 404) + + +class BlobUploadInvalid(V2RegistryException): + def __init__(self, detail=None): + super(BlobUploadInvalid, self).__init__('BLOB_UPLOAD_INVALID', + 'blob upload invalid', + detail) + + +class BlobUploadUnknown(V2RegistryException): + def __init__(self, detail=None): + super(BlobUploadUnknown, self).__init__('BLOB_UPLOAD_UNKNOWN', + 'blob upload unknown to registry', + detail, + 404) + + +class DigestInvalid(V2RegistryException): + def __init__(self, detail=None): + super(DigestInvalid, self).__init__('DIGEST_INVALID', + 'provided digest did not match uploaded content', + detail) + + +class ManifestBlobUnknown(V2RegistryException): + def __init__(self, detail=None): + super(ManifestBlobUnknown, self).__init__('MANIFEST_BLOB_UNKNOWN', + 'blob unknown to registry', + detail) + + +class ManifestInvalid(V2RegistryException): + def __init__(self, detail=None): + super(ManifestInvalid, self).__init__('MANIFEST_INVALID', + 'manifest invalid', + detail) + + +class ManifestUnknown(V2RegistryException): + def __init__(self, detail=None): + super(ManifestUnknown, self).__init__('MANIFEST_UNKNOWN', + 'manifest unknown', + detail, + 404) + + +class ManifestUnverified(V2RegistryException): + def __init__(self, detail=None): + super(ManifestUnverified, 
self).__init__('MANIFEST_UNVERIFIED', + 'manifest failed signature verification', + detail) + + +class NameInvalid(V2RegistryException): + def __init__(self, detail=None): + super(NameInvalid, self).__init__('NAME_INVALID', + 'invalid repository name', + detail) + + +class NameUnknown(V2RegistryException): + def __init__(self, detail=None): + super(NameUnknown, self).__init__('NAME_UNKNOWN', + 'repository name not known to registry', + detail, + 404) + + +class SizeInvalid(V2RegistryException): + def __init__(self, detail=None): + super(SizeInvalid, self).__init__('SIZE_INVALID', + 'provided length did not match content length', + detail) + + +class TagInvalid(V2RegistryException): + def __init__(self, detail=None): + super(TagInvalid, self).__init__('TAG_INVALID', + 'manifest tag did not match URI', + detail) + + +class Unauthorized(V2RegistryException): + def __init__(self, detail=None): + super(Unauthorized, self).__init__('UNAUTHORIZED', + 'access to the requested resource is not authorized', + detail, + 401) + + +class Unsupported(V2RegistryException): + def __init__(self, detail=None): + super(Unsupported, self).__init__('UNSUPPORTED', + 'The operation is unsupported.', + detail, + 405) diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py index 10868f3c9..1b8cf1434 100644 --- a/endpoints/v2/manifest.py +++ b/endpoints/v2/manifest.py @@ -5,38 +5,67 @@ import logging import re import jwt.utils import yaml +import json from flask import make_response, request +from collections import namedtuple, OrderedDict +from jwkest.jws import SIGNER_ALGS +from jwkest.jwk import RSAKey +from Crypto.PublicKey import RSA +from datetime import datetime from app import storage from auth.jwt_auth import process_jwt_auth from endpoints.decorators import anon_protect -from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream +from endpoints.v2 import v2_bp, require_repo_read, require_repo_write +from endpoints.v2.errors import (ManifestBlobUnknown, ManifestInvalid, ManifestUnverified, + ManifestUnknown, TagInvalid, NameInvalid) from digest import digest_tools +from data import model logger = logging.getLogger(__name__) VALID_TAG_PATTERN = r'[\w][\w.-]{0,127}' -VALID_TAG_REGEX = re.compile(VALID_TAG_PATTERN) + +BASE_MANIFEST_ROUTE = '///manifests/' +MANIFEST_DIGEST_ROUTE = BASE_MANIFEST_ROUTE.format(digest_tools.DIGEST_PATTERN) +MANIFEST_TAGNAME_ROUTE = BASE_MANIFEST_ROUTE.format(VALID_TAG_PATTERN) + + +ISO_DATETIME_FORMAT_ZULU = '%Y-%m-%dT%H:%M:%SZ' +JWS_ALGORITHM = 'RS256' + + +ImageMetadata = namedtuple('ImageMetadata', ['digest', 'v1_metadata', 'v1_metadata_str']) +ExtractedV1Metadata = namedtuple('ExtractedV1Metadata', ['docker_id', 'parent', 'created', + 'comment', 'command']) + + +_SIGNATURES_KEY = 'signatures' +_PROTECTED_KEY = 'protected' +_FORMAT_LENGTH_KEY = 'formatLength' +_FORMAT_TAIL_KEY = 'formatTail' +_REPO_NAME_KEY = 'name' +_REPO_TAG_KEY = 'tag' +_FS_LAYERS_KEY = 'fsLayers' +_HISTORY_KEY = 'history' +_BLOB_SUM_KEY = 'blobSum' +_V1_COMPAT_KEY = 'v1Compatibility' +_ARCH_KEY = 'architecture' +_SCHEMA_VER = 'schemaVersion' class SignedManifest(object): - SIGNATURES_KEY = 'signatures' - PROTECTED_KEY = 'protected' - FORMAT_LENGTH_KEY = 'formatLength' - FORMAT_TAIL_KEY = 'formatTail' - REPO_NAME_KEY = 'name' - REPO_TAG_KEY = 'tag' def __init__(self, manifest_bytes): self._bytes = manifest_bytes - parsed = yaml.safe_load(manifest_bytes) + self._parsed = yaml.safe_load(manifest_bytes) - self._signatures = parsed[self.SIGNATURES_KEY] - self._namespace, 
self._repo_name = parsed[self.REPO_NAME_KEY].split('/') - self._tag = parsed[self.REPO_TAG_KEY] + self._signatures = self._parsed[_SIGNATURES_KEY] + self._namespace, self._repo_name = self._parsed[_REPO_NAME_KEY].split('/') + self._tag = self._parsed[_REPO_TAG_KEY] self._validate() @@ -59,36 +88,195 @@ class SignedManifest(object): def tag(self): return self._tag + @property + def bytes(self): + return self._bytes + + @property + def digest(self): + return digest_tools.sha256_digest(self.payload) + + @property + def layers(self): + """ Returns a generator of objects that have the blobSum and v1Compatibility keys in them, + starting from the root image and working toward the leaf node. + """ + for blob_sum_obj, history_obj in reversed(zip(self._parsed[_FS_LAYERS_KEY], + self._parsed[_HISTORY_KEY])): + image_digest = digest_tools.Digest.parse_digest(blob_sum_obj[_BLOB_SUM_KEY]) + metadata_string = history_obj[_V1_COMPAT_KEY] + v1_metadata = yaml.safe_load(metadata_string) + + command_list = v1_metadata.get('container_config', {}).get('Cmd', None) + command = json.dumps(command_list) if command_list else None + + extracted = ExtractedV1Metadata(v1_metadata['id'], v1_metadata.get('parent'), + v1_metadata.get('created'), v1_metadata.get('comment'), + command) + yield ImageMetadata(image_digest, extracted, metadata_string) + @property def payload(self): - protected = self._signatures[0][self.PROTECTED_KEY] + protected = self._signatures[0][_PROTECTED_KEY] parsed_protected = yaml.safe_load(jwt.utils.base64url_decode(protected)) logger.debug('parsed_protected: %s', parsed_protected) - signed_content_head = self._bytes[:parsed_protected[self.FORMAT_LENGTH_KEY]] + signed_content_head = self._bytes[:parsed_protected[_FORMAT_LENGTH_KEY]] logger.debug('signed content head: %s', signed_content_head) - signed_content_tail = jwt.utils.base64url_decode(parsed_protected[self.FORMAT_TAIL_KEY]) + signed_content_tail = jwt.utils.base64url_decode(parsed_protected[_FORMAT_TAIL_KEY]) logger.debug('signed content tail: %s', signed_content_tail) return signed_content_head + signed_content_tail -@v2_bp.route('///manifests/', - methods=['GET']) +class SignedManifestBuilder(object): + """ Class which represents a manifest which is currently being built. + """ + def __init__(self, namespace, repo_name, tag, architecture='amd64', schema_ver=1): + self._base_payload = { + _REPO_TAG_KEY: tag, + _REPO_NAME_KEY: '{0}/{1}'.format(namespace, repo_name), + _ARCH_KEY: architecture, + _SCHEMA_VER: schema_ver, + } + + self._fs_layer_digests = [] + self._history = [] + + def add_layer(self, layer_digest, v1_json_metadata): + self._fs_layer_digests.append({ + _BLOB_SUM_KEY: layer_digest, + }) + self._history.append({ + _V1_COMPAT_KEY: v1_json_metadata, + }) + + def build(self, json_web_key): + """ Build the payload and sign it, returning a SignedManifest object. 
+ """ + payload = OrderedDict(self._base_payload) + payload.update({ + _HISTORY_KEY: self._history, + _FS_LAYERS_KEY: self._fs_layer_digests, + }) + + payload_str = json.dumps(payload, indent=3) + + split_point = payload_str.rfind('\n}') + + protected_payload = { + 'formatTail': jwt.utils.base64url_encode(payload_str[split_point:]), + 'formatLength': split_point, + 'time': datetime.utcnow().strftime(ISO_DATETIME_FORMAT_ZULU), + } + protected = jwt.utils.base64url_encode(json.dumps(protected_payload)) + logger.debug('Generated protected block: %s', protected) + + bytes_to_sign = '{0}.{1}'.format(protected, jwt.utils.base64url_encode(payload_str)) + + signer = SIGNER_ALGS[JWS_ALGORITHM] + signature = jwt.utils.base64url_encode(signer.sign(bytes_to_sign, json_web_key.get_key())) + logger.debug('Generated signature: %s', signature) + + signature_block = { + 'header': { + 'jwk': json_web_key.to_dict(), + 'alg': JWS_ALGORITHM, + }, + 'signature': signature, + _PROTECTED_KEY: protected, + } + + logger.debug('Encoded signature block: %s', json.dumps(signature_block)) + + payload.update({ + _SIGNATURES_KEY: [signature_block], + }) + + return SignedManifest(json.dumps(payload, indent=3)) + + +@v2_bp.route(MANIFEST_TAGNAME_ROUTE, methods=['GET']) @process_jwt_auth @require_repo_read @anon_protect -def fetch_manifest_by_tagname(namespace, repo_name, tag_name): - logger.debug('Fetching tag manifest with name: %s', tag_name) - return make_response('Manifest {0}'.format(tag_name)) +def fetch_manifest_by_tagname(namespace, repo_name, manifest_ref): + try: + manifest = model.tag.load_tag_manifest(namespace, repo_name, manifest_ref) + except model.InvalidManifestException: + try: + manifest = _generate_and_store_manifest(namespace, repo_name, manifest_ref) + except model.DataModelException: + logger.exception('Exception when generating manifest for %s/%s:%s', namespace, repo_name, + manifest_ref) + raise ManifestUnknown() + + return make_response(manifest.json_data, 200) -@v2_bp.route('///manifests/', - methods=['PUT']) +@v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['GET']) +@process_jwt_auth +@require_repo_read +@anon_protect +def fetch_manifest_by_digest(namespace, repo_name, manifest_ref): + try: + manifest = model.tag.load_manifest_by_digest(namespace, repo_name, manifest_ref) + except model.InvalidManifestException: + # Without a tag name to reference, we can't make an attempt to generate the manifest + raise ManifestUnknown() + + return make_response(manifest.json_data, 200) + + +@v2_bp.route(MANIFEST_TAGNAME_ROUTE, methods=['PUT']) @process_jwt_auth @require_repo_write @anon_protect -def write_manifest_by_tagname(namespace, repo_name, tag_name): +def write_manifest_by_tagname(namespace, repo_name, manifest_ref): manifest = SignedManifest(request.data) - manifest_digest = digest_tools.sha256_digest(manifest.payload) + if manifest.tag != manifest_ref: + raise TagInvalid() + + return _write_manifest(namespace, repo_name, manifest) + + +@v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['PUT']) +@process_jwt_auth +@require_repo_write +@anon_protect +def write_manifest_by_digest(namespace, repo_name, manifest_ref): + manifest = SignedManifest(request.data) + if manifest.digest != manifest_ref: + raise ManifestInvalid() + + return _write_manifest(namespace, repo_name, manifest) + + +def _write_manifest(namespace, repo_name, manifest): + if manifest.namespace != namespace or manifest.repo_name != repo_name: + raise NameInvalid() + + manifest_digest = manifest.digest + tag_name = manifest.tag + + leaf_layer = None 
+  try:
+    for mdata in manifest.layers:
+      # Store the v1 metadata in the db
+      v1_mdata = mdata.v1_metadata
+      digest_str = str(mdata.digest)
+      model.image.synthesize_v1_image(namespace, repo_name, digest_str, v1_mdata.docker_id,
+                                      v1_mdata.created, v1_mdata.comment, v1_mdata.command,
+                                      mdata.v1_metadata_str, v1_mdata.parent)
+      leaf_layer = mdata
+
+  except model.InvalidImageException:
+    raise ManifestBlobUnknown(detail={'missing': digest_str})
+
+  if leaf_layer is None:
+    # The manifest doesn't actually reference any layers!
+    raise ManifestInvalid(detail={'message': 'manifest does not reference any layers'})
+
+  model.tag.store_tag_manifest(namespace, repo_name, tag_name, leaf_layer.v1_metadata.docker_id,
+                               manifest_digest, request.data)
 
   response = make_response('OK', 202)
   response.headers['Docker-Content-Digest'] = manifest_digest
@@ -96,15 +284,61 @@ def write_manifest_by_tagname(namespace, repo_name, tag_name):
   return response
 
 
-# @v2_bp.route('/<namespace>/<repo_name>/manifests/<tag_digest>',
-#              methods=['PUT'])
-# @process_jwt_auth
-# @require_repo_write
-# @anon_protect
-# def write_manifest(namespace, repo_name, tag_digest):
-#   logger.debug('Writing tag manifest with name: %s', tag_digest)
+@v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['DELETE'])
+@process_jwt_auth
+@require_repo_write
+@anon_protect
+def delete_manifest_by_digest(namespace, repo_name, manifest_ref):
+  """ Delete the manifest specified by the digest. Note: there is no equivalent
+      method for deleting by tag name because it is forbidden by the spec.
+  """
+  try:
+    manifest = model.tag.load_manifest_by_digest(namespace, repo_name, manifest_ref)
+  except model.InvalidManifestException:
+    # Without a tag name to reference, we can't make an attempt to generate the manifest
+    raise ManifestUnknown()
 
-#   manifest_path = digest_tools.content_path(tag_digest)
-#   storage.stream_write('local_us', manifest_path, get_input_stream(request))
+  manifest.delete_instance()
 
-#   return make_response('Manifest {0}'.format(tag_digest))
+  return make_response('', 202)
+
+
+def _generate_and_store_manifest(namespace, repo_name, tag_name):
+  # First look up the tag object and its ancestors
+  image = model.tag.get_tag_image(namespace, repo_name, tag_name)
+  parents = model.image.get_parent_images(namespace, repo_name, image)
+
+  # Create and populate the manifest builder
+  builder = SignedManifestBuilder(namespace, repo_name, tag_name)
+
+  # Add the leaf layer
+  builder.add_layer(image.storage.checksum, __get_and_backfill_image_metadata(image))
+
+  for parent in parents:
+    builder.add_layer(parent.storage.checksum, __get_and_backfill_image_metadata(parent))
+
+  # TODO: stop generating a new key every time we sign a manifest; publish our key
+  new_key = RSA.generate(2048)
+  jwk = RSAKey(key=new_key)
+
+  manifest = builder.build(jwk)
+
+  manifest_row = model.tag.associate_generated_tag_manifest(namespace, repo_name, tag_name,
+                                                            manifest.digest, manifest.bytes)
+
+  return manifest_row
+
+
+def __get_and_backfill_image_metadata(image):
+  image_metadata = image.v1_json_metadata
+  if image_metadata is None:
+    logger.warning('Loading metadata from storage for image id: %s', image.id)
+
+    metadata_path = storage.image_json_path(image.storage.uuid)
+    image_metadata = storage.get_content(image.storage.locations, metadata_path)
+    image.v1_json_metadata = image_metadata
+
+    logger.info('Saving backfilled metadata for image id: %s', image.id)
+    image.save()
+
+  return image_metadata
diff --git a/endpoints/v2/tag.py b/endpoints/v2/tag.py
new file mode 100644
index 000000000..7a4c949ad
--- /dev/null
+++ b/endpoints/v2/tag.py
@@ -0,0 +1,19 @@
+# XXX This code is not yet ready to be run in production, and should remain disabled until such
+# XXX time as this notice is removed.
+
+from flask import jsonify
+
+from endpoints.v2 import v2_bp, require_repo_read
+from auth.jwt_auth import process_jwt_auth
+from endpoints.decorators import anon_protect
+from data import model
+
+@v2_bp.route('/<namespace>/<repo_name>/tags/list', methods=['GET'])
+@process_jwt_auth
+@require_repo_read
+@anon_protect
+def list_all_tags(namespace, repo_name):
+  return jsonify({
+    'name': '{0}/{1}'.format(namespace, repo_name),
+    'tags': [tag.name for tag in model.tag.list_repository_tags(namespace, repo_name)],
+  })
diff --git a/initdb.py b/initdb.py
index 2de817bca..5c0e86f0a 100644
--- a/initdb.py
+++ b/initdb.py
@@ -4,6 +4,7 @@ import hashlib
 import random
 import calendar
 import os
+import argparse
 
 from datetime import datetime, timedelta
 from peewee import (SqliteDatabase, create_model_tables, drop_model_tables, savepoint_sqlite,
@@ -87,9 +88,16 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map):
     creation_time = REFERENCE_DATE + timedelta(weeks=image_num) + timedelta(days=model_num)
     command_list = SAMPLE_CMDS[image_num % len(SAMPLE_CMDS)]
     command = json.dumps(command_list) if command_list else None
+
+    v1_metadata = {
+      'id': docker_image_id,
+    }
+    if parent is not None:
+      v1_metadata['parent'] = parent.docker_image_id
+
     new_image = model.image.set_image_metadata(docker_image_id, repo.namespace_user.username,
                                                repo.name, str(creation_time), 'no comment', command,
-                                               parent)
+                                               v1_metadata, parent)
 
     compressed_size = random.randrange(1, 1024 * 1024 * 1024)
     model.image.set_image_size(docker_image_id, repo.namespace_user.username, repo.name,
@@ -324,7 +332,7 @@ def wipe_database():
   drop_model_tables(all_models, fail_silently=True)
 
 
-def populate_database():
+def populate_database(minimal=False):
   logger.debug('Populating the DB with test data.')
 
   new_user_1 = model.user.create_user('devtable', 'password', 'jschorr@devtable.com')
@@ -332,6 +340,10 @@
   new_user_1.stripe_id = TEST_STRIPE_ID
   new_user_1.save()
 
+  if minimal:
+    logger.debug('Skipping most db population because user requested minimal db')
+    return
+
   disabled_user = model.user.create_user('disabled', 'password', 'jschorr+disabled@devtable.com')
   disabled_user.verified = True
   disabled_user.enabled = False
@@ -380,7 +392,8 @@ def populate_database():
     'to_date': formatdate(calendar.timegm(to_date.utctimetuple())),
     'reason': 'database migration'
   }
-  model.notification.create_notification('maintenance', new_user_1, metadata=notification_metadata)
+  model.notification.create_notification('maintenance', new_user_1,
+                                         metadata=notification_metadata)
 
 
   __generate_repository(new_user_4, 'randomrepo', 'Random repo repository.', False,
@@ -618,7 +631,12 @@ def populate_database():
   while repositoryactioncounter.count_repository_actions():
     pass
 
+
 if __name__ == '__main__':
+  parser = argparse.ArgumentParser(description='Initialize the test database.')
+  parser.add_argument('--simple', action='store_true')
+  args = parser.parse_args()
+
   log_level = getattr(logging, app.config['LOGGING_LEVEL'])
   logging.basicConfig(level=log_level)
 
@@ -627,5 +645,4 @@ if __name__ == '__main__':
 
   initialize_database()
 
-  if app.config.get('POPULATE_DB_TEST_DATA', False):
-    populate_database()
+  populate_database(args.simple)
diff --git a/registry.py b/registry.py
index 5d8dab3df..1582a2879 100644
--- a/registry.py
+++ b/registry.py
@@ -7,7 +7,7 @@ from app import
app as application import endpoints.decorated from endpoints.v1 import v1_bp -# from endpoints.v2 import v2_bp +from endpoints.v2 import v2_bp application.register_blueprint(v1_bp, url_prefix='/v1') -# application.register_blueprint(v2_bp, url_prefix='/v2') +application.register_blueprint(v2_bp, url_prefix='/v2') diff --git a/requirements-nover.txt b/requirements-nover.txt index 6fe50fc80..de0dd0763 100644 --- a/requirements-nover.txt +++ b/requirements-nover.txt @@ -36,9 +36,10 @@ git+https://github.com/DevTable/aniso8601-fake.git git+https://github.com/DevTable/anunidecode.git git+https://github.com/DevTable/pygithub.git git+https://github.com/DevTable/container-cloud-config.git +git+https://github.com/coreos/mockldap.git git+https://github.com/coreos/py-bitbucket.git git+https://github.com/coreos/pyapi-gitlab.git -git+https://github.com/coreos/mockldap.git +git+https://github.com/coreos/resumablehashlib.git git+https://github.com/DevTable/python-etcd.git@sslfix gipc pyOpenSSL diff --git a/requirements.txt b/requirements.txt index 1061d96e9..0df1eaa41 100644 --- a/requirements.txt +++ b/requirements.txt @@ -93,8 +93,9 @@ git+https://github.com/DevTable/aniso8601-fake.git git+https://github.com/DevTable/anunidecode.git git+https://github.com/DevTable/pygithub.git git+https://github.com/DevTable/container-cloud-config.git +git+https://github.com/coreos/mockldap.git git+https://github.com/coreos/py-bitbucket.git git+https://github.com/coreos/pyapi-gitlab.git -git+https://github.com/coreos/mockldap.git +git+https://github.com/coreos/resumablehashlib.git git+https://github.com/DevTable/python-etcd.git@sslfix git+https://github.com/NateFerrero/oauth2lib.git diff --git a/storage/basestorage.py b/storage/basestorage.py index 756734241..55035901c 100644 --- a/storage/basestorage.py +++ b/storage/basestorage.py @@ -1,5 +1,7 @@ import tempfile +from digest.digest_tools import content_path + class StoragePaths(object): shared_images = 'sharedimages' @@ -23,13 +25,12 @@ class StoragePaths(object): base_path = self.image_path(storage_uuid) return '{0}json'.format(base_path) - def image_layer_path(self, storage_uuid): + def v1_image_layer_path(self, storage_uuid): base_path = self.image_path(storage_uuid) return '{0}layer'.format(base_path) - def image_ancestry_path(self, storage_uuid): - base_path = self.image_path(storage_uuid) - return '{0}ancestry'.format(base_path) + def blob_path(self, digest_str): + return content_path(digest_str) def image_file_trie_path(self, storage_uuid): base_path = self.image_path(storage_uuid) @@ -99,26 +100,30 @@ class BaseStorage(StoragePaths): raise NotImplementedError -class DigestInvalidException(RuntimeError): +class InvalidChunkException(RuntimeError): pass class BaseStorageV2(BaseStorage): - def initiate_chunked_upload(self): - """ Start a new chunked upload, and return a handle with which the upload can be referenced. + def initiate_chunked_upload(self, upload_uuid): + """ Start a new chunked upload """ raise NotImplementedError - def stream_upload_chunk(self, uuid, offset, length, in_fp): + def stream_upload_chunk(self, uuid, offset, length, in_fp, hash_obj): """ Upload the specified amount of data from the given file pointer to the chunked destination - specified, starting at the given offset. Returns the number of bytes written. + specified, starting at the given offset. Raises InvalidChunkException if the offset or + length can not be accepted. 
""" raise NotImplementedError - def complete_chunked_upload(self, uuid, final_path, digest_to_verify): + def complete_chunked_upload(self, uuid, final_path): """ Complete the chunked upload and store the final results in the path indicated. """ raise NotImplementedError - + def cancel_chunked_upload(self, uuid): + """ Cancel the chunked upload and clean up any outstanding partially uploaded data. + """ + raise NotImplementedError diff --git a/storage/distributedstorage.py b/storage/distributedstorage.py index 26a0f5dbd..860f1ab45 100644 --- a/storage/distributedstorage.py +++ b/storage/distributedstorage.py @@ -45,3 +45,4 @@ class DistributedStorage(StoragePaths): initiate_chunked_upload = _location_aware(BaseStorageV2.initiate_chunked_upload) stream_upload_chunk = _location_aware(BaseStorageV2.stream_upload_chunk) complete_chunked_upload = _location_aware(BaseStorageV2.complete_chunked_upload) + cancel_chunked_upload = _location_aware(BaseStorageV2.cancel_chunked_upload) diff --git a/storage/local.py b/storage/local.py index fc2b79563..6e18004e0 100644 --- a/storage/local.py +++ b/storage/local.py @@ -8,7 +8,6 @@ import psutil from uuid import uuid4 from storage.basestorage import BaseStorageV2 -from digest import digest_tools logger = logging.getLogger(__name__) @@ -64,8 +63,9 @@ class LocalStorage(BaseStorageV2): bytes_copied = 0 bytes_remaining = num_bytes while bytes_remaining > 0 or num_bytes < 0: + size_to_read = min(bytes_remaining, self.buffer_size) try: - buf = in_fp.read(self.buffer_size) + buf = in_fp.read(size_to_read) if not buf: break out_fp.write(buf) @@ -112,11 +112,9 @@ class LocalStorage(BaseStorageV2): sha_hash.update(buf) return sha_hash.hexdigest()[:7] - def _rel_upload_path(self, uuid): return 'uploads/{0}'.format(uuid) - def initiate_chunked_upload(self): new_uuid = str(uuid4()) @@ -131,14 +129,8 @@ class LocalStorage(BaseStorageV2): upload_storage.seek(offset) return self._stream_write_to_fp(in_fp, upload_storage, length) - def complete_chunked_upload(self, uuid, final_path, digest_to_verify): + def complete_chunked_upload(self, uuid, final_path): content_path = self._rel_upload_path(uuid) - content_digest = digest_tools.sha256_digest_from_generator(self.stream_read(content_path)) - - if not digest_tools.digests_equal(content_digest, digest_to_verify): - msg = 'Given: {0} Computed: {1}'.format(digest_to_verify, content_digest) - raise digest_tools.InvalidDigestException(msg) - final_path_abs = self._init_path(final_path, create=True) if not self.exists(final_path_abs): logger.debug('Moving content into place at path: %s', final_path_abs) @@ -146,6 +138,10 @@ class LocalStorage(BaseStorageV2): else: logger.debug('Content already exists at path: %s', final_path_abs) + def cancel_chunked_upload(self, uuid): + content_path = self._init_path(self._rel_upload_path(uuid)) + os.remove(content_path) + def validate(self): # Load the set of disk mounts. 
try: diff --git a/test/data/test.db b/test/data/test.db index cd69ffaa0..e0423a2c8 100644 Binary files a/test/data/test.db and b/test/data/test.db differ diff --git a/test/test_digest_tools.py b/test/test_digest_tools.py index 954c01052..2ba2c6aec 100644 --- a/test/test_digest_tools.py +++ b/test/test_digest_tools.py @@ -1,20 +1,23 @@ import unittest -from digest.digest_tools import parse_digest, content_path, InvalidDigestException +from digest.digest_tools import Digest, content_path, InvalidDigestException class TestParseDigest(unittest.TestCase): def test_parse_good(self): examples = [ - ('tarsum.v123123+sha1:123deadbeef', (True, 'v123123', 'sha1', '123deadbeef')), - ('tarsum.v1+sha256:123123', (True, 'v1', 'sha256', '123123')), - ('tarsum.v0+md5:abc', (True, 'v0', 'md5', 'abc')), - ('sha1:123deadbeef', (False, None, 'sha1', '123deadbeef')), - ('sha256:123123', (False, None, 'sha256', '123123')), - ('md5:abc', (False, None, 'md5', 'abc')), + ('tarsum.v123123+sha1:123deadbeef', ('sha1', '123deadbeef', True, 'v123123')), + ('tarsum.v1+sha256:123123', ('sha256', '123123', True, 'v1')), + ('tarsum.v0+md5:abc', ('md5', 'abc', True, 'v0')), + ('sha1:123deadbeef', ('sha1', '123deadbeef', False, None)), + ('sha256:123123', ('sha256', '123123', False, None)), + ('md5:abc', ('md5', 'abc', False, None)), ] - for digest, output in examples: - self.assertEquals(parse_digest(digest), output) + for digest, output_args in examples: + self.assertEquals(Digest.parse_digest(digest), Digest(*output_args)) + + # Test the string method + self.assertEquals(str(Digest.parse_digest(digest)), digest) def test_parse_fail(self): examples = [ @@ -29,7 +32,7 @@ class TestParseDigest(unittest.TestCase): for bad_digest in examples: with self.assertRaises(InvalidDigestException): - parse_digest(bad_digest) + Digest.parse_digest(bad_digest) class TestDigestPath(unittest.TestCase): diff --git a/tools/auditancestry.py b/tools/auditancestry.py deleted file mode 100644 index 7464e7591..000000000 --- a/tools/auditancestry.py +++ /dev/null @@ -1,104 +0,0 @@ -import logging -import json - -from data.database import Image, ImageStorage, Repository, User, configure -from data import model -from app import app, storage as store - - -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG) - -configure(app.config) - -# Turn off debug logging for boto -logging.getLogger('boto').setLevel(logging.CRITICAL) - - -query = (Image - .select(Image, ImageStorage, Repository, User) - .join(ImageStorage) - .switch(Image) - .join(Repository) - .join(User) - .where(ImageStorage.uploading == False)) - -bad_count = 0 -good_count = 0 - -def resolve_or_create(repo, docker_image_id, new_ancestry): - existing = model.image.get_repo_image_extended(repo.namespace_user.username, repo.name, - docker_image_id) - if existing: - logger.debug('Found existing image: %s, %s', existing.id, docker_image_id) - return existing - else: - # we need to find some storage to link it to - try: - to_link = (ImageStorage - .select() - .join(Image) - .where(Image.docker_image_id == docker_image_id) - .get()) - logger.debug('Linking to storage: %s' % to_link.uuid) - created = Image.create(docker_image_id=docker_image_id, repository=repo, - storage=to_link, ancestors=new_ancestry) - logger.debug('Created image: %s' % created) - return created - except ImageStorage.DoesNotExist: - msg = 'No image available anywhere for storage: %s in namespace: %s' - logger.error(msg, docker_image_id, repo.namespace_user.username) - raise RuntimeError() - - -def 
all_ancestors_exist(ancestors): - if not ancestors: - return True - - found_count = len(list(Image - .select() - .where(Image.id << ancestors))) - return found_count == len(ancestors) - - -cant_fix = [] -for img in query: - try: - with_locations = model.image.get_repo_image_extended(img.repository.namespace_user.username, - img.repository.name, img.docker_image_id) - ancestry_storage = store.image_ancestry_path(img.storage.uuid) - if store.exists(with_locations.storage.locations, ancestry_storage): - full_ancestry = json.loads(store.get_content(with_locations.storage.locations, - ancestry_storage))[1:] - full_ancestry.reverse() - - ancestor_dbids = [int(anc_id) for anc_id in img.ancestors.split('/')[1:-1]] - - if len(full_ancestry) != len(ancestor_dbids) or not all_ancestors_exist(ancestor_dbids): - logger.error('Image has incomplete ancestry: %s, %s, %s, %s', img.id, img.docker_image_id, - full_ancestry, ancestor_dbids) - - fixed_ancestry = '/' - for ancestor in full_ancestry: - ancestor_img = resolve_or_create(img.repository, ancestor, - fixed_ancestry) - fixed_ancestry += str(ancestor_img.id) + '/' - - img.ancestors = fixed_ancestry - img.save() - - bad_count += 1 - else: - good_count += 1 - else: - bad_count += 1 - - except RuntimeError: - cant_fix.append(img) - - logger.debug('Bad: %s Good: %s Can\'t Fix: %s', bad_count, good_count, - len(cant_fix)) - -for cant in cant_fix: - logger.error('Unable to fix %s in repo %s/%s', cant.id, cant.repository.namespace_user.username, - cant.repository.name) diff --git a/tools/migrateimage.py b/tools/migrateimage.py deleted file mode 100644 index 950f171a4..000000000 --- a/tools/migrateimage.py +++ /dev/null @@ -1,67 +0,0 @@ -import argparse -import logging - -from data import model -from data.database import ImageStoragePlacement, ImageStorageLocation -from app import storage - - -logger = logging.getLogger(__name__) - - -PATHSPECS = [ - (storage.image_json_path, True), - (storage.image_layer_path, True), - (storage.image_ancestry_path, True), - (storage.image_file_trie_path, False), - (storage.image_file_diffs_path, False), -] - - -def migrate_image(image, destination_location): - logger.debug('Migrating image: %s -> %s', image.docker_image_id, destination_location.name) - destination_location_set = {destination_location.name} - - for path_func, required in PATHSPECS: - path = path_func(image.storage.uuid) - - if storage.exists(image.storage.locations, path): - if not storage.exists(destination_location_set, path): - logger.debug('Migrating path: %s', path) - - with storage.stream_read_file(image.storage.locations, path) as file_to_migrate: - storage.stream_write(destination_location_set, path, file_to_migrate) - else: - logger.debug('File already present in destination: %s', path) - elif required: - raise RuntimeError('Required file not present in image to migrate: %s', path) - - # Successfully migrated, now write the placement - ImageStoragePlacement.create(location=destination_location, storage=image.storage) - -parser = argparse.ArgumentParser(description='Replicate an image storage.') -parser.add_argument('--namespace', type=str, required=True, - help='Namespace for the repository containing the image to be replicated') -parser.add_argument('--repository', type=str, required=True, - help='Name for the repository containing the image to be replicated') -parser.add_argument('--imageid', type=str, default=None, - help='Specific image to migrate, entire repo will be migrated if omitted') -parser.add_argument('--to', type=str, required=True, - 
help='Storage region to which the data should be replicated') - -if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - logging.getLogger('boto').setLevel(logging.CRITICAL) - - args = parser.parse_args() - - location = ImageStorageLocation.get(name=args.to) - - images = [] - if args.imageid is not None: - images = [model.image.get_image_by_id(args.namespace, args.repository, args.imageid)] - else: - images = model.image.get_repository_images(args.namespace, args.repository) - - for img in images: - migrate_image(img, location) diff --git a/util/registry/filelike.py b/util/registry/filelike.py new file mode 100644 index 000000000..758c5889f --- /dev/null +++ b/util/registry/filelike.py @@ -0,0 +1,24 @@ +class SocketReader(object): + def __init__(self, fp): + self._fp = fp + self.handlers = [] + + def add_handler(self, handler): + self.handlers.append(handler) + + def read(self, n=-1): + buf = self._fp.read(n) + if not buf: + return '' + for handler in self.handlers: + handler(buf) + return buf + + def tell(self): + raise IOError('Stream is not seekable.') + + +def wrap_with_hash(in_fp, hash_obj): + wrapper = SocketReader(in_fp) + wrapper.add_handler(hash_obj.update) + return wrapper
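
A minimal usage sketch (not part of the patch) of how the new wrap_with_hash helper might be driven by a storage engine's stream_upload_chunk: every read through the wrapper also feeds the hash object, so a blob digest can be computed incrementally as chunks arrive. hashlib.sha256 and the in-memory BytesIO stream are stand-ins here for the resumable SHA state and the request body stream that the registry code would actually pass in.

  import hashlib
  from io import BytesIO

  from util.registry.filelike import wrap_with_hash

  hash_obj = hashlib.sha256()                 # stand-in for the resumable SHA state
  incoming = BytesIO(b'example layer bytes')  # stand-in for the request body stream
  wrapped = wrap_with_hash(incoming, hash_obj)

  while True:
    buf = wrapped.read(4096)                  # each read also calls hash_obj.update(buf)
    if not buf:
      break

  print(hash_obj.hexdigest())                 # digest of everything streamed so far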