From bea8b9ac53f1a9c3a9e067731c6dbb29df74468a Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Mon, 6 Jul 2015 15:00:07 -0400
Subject: [PATCH] More changes for registry-v2 in Python.

Implement the minimal changes to the local filesystem storage driver and
expose them through the distributed storage driver. Create a digest package
containing digest_tools and checksums. Fix the tests to use the new v1
endpoint locations. Fix Repository.delete_instance to filter the generated
queries so that most subquery deletes are avoided, while still generating
them when not explicitly filtered.
---
 auth/jwt_auth.py                         |  2 +-
 auth/permissions.py                      |  4 +-
 data/database.py                         | 55 ++++++++++----
 data/model/__init__.py                   | 15 +++-
 data/model/blob.py                       | 22 ++++++
 data/model/legacy.py                     | 45 +++++-------
 digest/__init__.py                       |  0
 {util => digest}/checksums.py            |  0
 {endpoints/v2 => digest}/digest_tools.py | 18 ++++-
 endpoints/v1/registry.py                 |  3 +-
 endpoints/v2/__init__.py                 |  2 +-
 endpoints/v2/blob.py                     | 77 ++++++++++++++++++++
 endpoints/v2/blobs.py                    | 20 ------
 endpoints/v2/manifest.py                 | 86 ++++++++++++++++------
 endpoints/v2/registry.py                 | 12 ----
 storage/basestorage.py                   | 25 +++++++
 storage/distributedstorage.py            |  5 +-
 storage/local.py                         | 67 ++++++++++++++---
 test/registry_tests.py                   |  8 +--
 test/specs.py                            | 92 ++++++++++++------------
 test/test_anon_checked.py                |  8 +--
 test/test_digest_tools.py                |  2 +-
 test/test_endpoint_security.py           |  8 +--
 23 files changed, 397 insertions(+), 179 deletions(-)
 create mode 100644 data/model/blob.py
 create mode 100644 digest/__init__.py
 rename {util => digest}/checksums.py (100%)
 rename {endpoints/v2 => digest}/digest_tools.py (66%)
 create mode 100644 endpoints/v2/blob.py
 delete mode 100644 endpoints/v2/blobs.py
 delete mode 100644 endpoints/v2/registry.py

diff --git a/auth/jwt_auth.py b/auth/jwt_auth.py
index e8b568691..5072bdffa 100644
--- a/auth/jwt_auth.py
+++ b/auth/jwt_auth.py
@@ -61,7 +61,7 @@ def process_jwt_auth(func):
       return (None, 'Invalid username or password')
 
     username = payload['sub']
-    loaded_identity = Identity(username, 'signed_grant')
+    loaded_identity = Identity(username, 'signed_jwt')
 
     # Process the grants from the payload
     if 'access' in payload:
diff --git a/auth/permissions.py b/auth/permissions.py
index 91a8176d0..100475cac 100644
--- a/auth/permissions.py
+++ b/auth/permissions.py
@@ -269,8 +269,8 @@ def on_identity_loaded(sender, identity):
       logger.debug('Delegate token added permission: %s', repo_grant)
       identity.provides.add(repo_grant)
 
-  elif identity.auth_type == 'signed_grant':
-    logger.debug('Loaded signed grants identity')
+  elif identity.auth_type == 'signed_grant' or identity.auth_type == 'signed_jwt':
+    logger.debug('Loaded %s identity for: %s', identity.auth_type, identity.id)
 
   else:
     logger.error('Unknown identity auth type: %s', identity.auth_type)
diff --git a/data/database.py b/data/database.py
index 069081f68..7b1183128 100644
--- a/data/database.py
+++ b/data/database.py
@@ -2,12 +2,14 @@ import string
 import logging
 import uuid
 import time
+import toposort
 
 from random import SystemRandom
 from datetime import datetime
 from peewee import *
 from data.read_slave import ReadSlaveModel
 from sqlalchemy.engine.url import make_url
+from collections import defaultdict
 
 from data.read_slave import ReadSlaveModel
 from util.names import urn_generator
@@ -297,23 +299,46 @@ class Repository(BaseModel):
   )
 
   def delete_instance(self, recursive=False, delete_nullable=False):
-    # Note: peewee generates extra nested deletion statements here that are slow and unnecessary.
-    # Therefore, we define our own deletion order here and use the dependency system to verify it.
-    ordered_dependencies = [RepositoryAuthorizedEmail, RepositoryTag, Image, LogEntry,
-                            RepositoryBuild, RepositoryBuildTrigger, RepositoryNotification,
-                            RepositoryPermission, AccessToken, Star, RepositoryActionCount]
+    if not recursive:
+      raise RuntimeError('Non-recursive delete on repository.')
 
-    for query, fk in self.dependencies(search_nullable=True):
+    # These models don't need to use transitive deletes, because the referenced objects
+    # are cleaned up directly
+    skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger}
+
+    # We need to sort the ops so that models get cleaned in order of their dependencies
+    ops = reversed(list(self.dependencies(delete_nullable)))
+    filtered_ops = []
+
+    dependencies = defaultdict(set)
+
+    for query, fk in ops:
+      if fk.model_class not in skip_transitive_deletes or query.op != 'in':
+        filtered_ops.append((query, fk))
+
+      if query.op == 'in':
+        dependencies[fk.model_class.__name__].add(query.rhs.model_class.__name__)
+      elif query.op == '=':
+        dependencies[fk.model_class.__name__].add(Repository.__name__)
+      else:
+        raise RuntimeError('Unknown operator in recursive repository delete query')
+
+    sorted_models = list(reversed(toposort.toposort_flatten(dependencies)))
+    def sorted_model_key(query_fk_tuple):
+      cmp_query, cmp_fk = query_fk_tuple
+      if cmp_query.op == 'in':
+        return -1
+      return sorted_models.index(cmp_fk.model_class.__name__)
+    filtered_ops.sort(key=sorted_model_key)
+
+    for query, fk in filtered_ops:
       model = fk.model_class
-      if not model in ordered_dependencies:
-        raise Exception('Missing repository deletion dependency: %s', model)
-
-    for model in ordered_dependencies:
-      model.delete().where(model.repository == self).execute()
-
-    # Delete the repository itself.
-    super(Repository, self).delete_instance(recursive=False, delete_nullable=False)
+      if fk.null and not delete_nullable:
+        model.update(**{fk.name: None}).where(query).execute()
+      else:
+        model.delete().where(query).execute()
+    return self.delete().where(self._pk_expr()).execute()
 
 class Star(BaseModel):
   user = ForeignKeyField(User, index=True)
@@ -679,4 +704,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
               ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
               RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage,
               TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind,
-              AccessTokenKind, Star, RepositoryActionCount]
+              AccessTokenKind, Star, RepositoryActionCount, TagManifest]
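For reviewers unfamiliar with the toposort package used above: the dependency map is a plain dict of model name to the set of model names its delete query references, and toposort_flatten linearizes it with referenced models first, so the reverse is a dependents-first deletion order. A minimal sketch with hypothetical model names (not the full Quay schema):

  from collections import defaultdict

  import toposort

  # Mirrors the `dependencies` map built in delete_instance: each model name
  # maps to the set of model names its delete query references.
  dependencies = defaultdict(set)
  dependencies['RepositoryTag'].add('Repository')
  dependencies['Image'].add('Repository')
  dependencies['TagManifest'].add('RepositoryTag')

  # toposort_flatten emits referenced models first...
  order = toposort.toposort_flatten(dependencies)
  # ...so the reverse is a safe deletion order: dependents before dependees.
  print(list(reversed(order)))
  # -> ['TagManifest', 'RepositoryTag', 'Image', 'Repository']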
diff --git a/data/model/__init__.py b/data/model/__init__.py
index f7c9ba24b..69aeb43e8 100644
--- a/data/model/__init__.py
+++ b/data/model/__init__.py
@@ -1 +1,14 @@
-from data.model.legacy import *
\ No newline at end of file
+class DataModelException(Exception):
+  pass
+
+
+class Config(object):
+  def __init__(self):
+    self.app_config = None
+    self.store = None
+
+
+config = Config()
+
+
+from data.model.legacy import *
diff --git a/data/model/blob.py b/data/model/blob.py
new file mode 100644
index 000000000..f1d110a45
--- /dev/null
+++ b/data/model/blob.py
@@ -0,0 +1,22 @@
+from data.model import config, DataModelException
+
+from data.database import ImageStorage, Image, ImageStorageLocation, ImageStoragePlacement
+
+
+class BlobDoesNotExist(DataModelException):
+  pass
+
+
+def get_blob_by_digest(blob_digest):
+  try:
+    return ImageStorage.get(checksum=blob_digest)
+  except ImageStorage.DoesNotExist:
+    raise BlobDoesNotExist('Blob does not exist with digest: {0}'.format(blob_digest))
+
+
+def store_blob_record(blob_digest, location_name):
+  storage = ImageStorage.create(checksum=blob_digest)
+  location = ImageStorageLocation.get(name=location_name)
+  ImageStoragePlacement.create(location=location, storage=storage)
+  storage.locations = {location_name}
+  return storage
diff --git a/data/model/legacy.py b/data/model/legacy.py
index 947dc188c..5f96c761c 100644
--- a/data/model/legacy.py
+++ b/data/model/legacy.py
@@ -19,6 +19,7 @@ from data.database import (User, Repository, Image, AccessToken, Role, Repositor
                            db, BUILD_PHASE, QuayUserField, ImageStorageSignature, QueueItem,
                            ImageStorageSignatureKind, validate_database_url, db_for_update,
                            AccessTokenKind, Star, get_epoch_timestamp, RepositoryActionCount)
+from data.model import config as model_config, DataModelException
 from peewee import JOIN_LEFT_OUTER, fn, SQL, IntegrityError
 from util.validation import (validate_username, validate_email, validate_password,
                              INVALID_PASSWORD_MESSAGE)
@@ -36,18 +37,6 @@ Namespace = User.alias()
 
 logger = logging.getLogger(__name__)
 
-class Config(object):
-  def __init__(self):
-    self.app_config = None
-    self.store = None
-
-config = Config()
-
-
-class DataModelException(Exception):
-  pass
-
-
 class InvalidEmailAddressException(DataModelException):
   pass
 
@@ -1211,7 +1200,7 @@ def change_username(user_id, new_username):
   if not username_valid:
     raise InvalidUsernameException('Invalid username %s: %s' % (new_username, username_issue))
 
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     # Reload the user for update
    user = db_for_update(User.select().where(User.id == user_id)).get()
 
@@ -1587,7 +1576,7 @@ def _create_storage(location_name):
 
 def _find_or_link_image(existing_image, repository, username, translations, preferred_location):
   # TODO(jake): This call is currently recursively done under a single transaction. Can we make
   # it instead be done under a set of transactions?
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     # Check for an existing image, under the transaction, to make sure it doesn't already exist.
     repo_image = get_repo_image(repository.namespace_user.username, repository.name,
                                 existing_image.docker_image_id)
@@ -1659,7 +1648,7 @@ def find_create_or_link_image(docker_image_id, repository, username, translation
       pass
 
   # Otherwise, create a new storage directly.
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     # Final check for an existing image, under the transaction.
     repo_image = get_repo_image(repository.namespace_user.username, repository.name,
                                 docker_image_id)
@@ -1796,7 +1785,7 @@ def set_image_size(docker_image_id, namespace_name, repository_name, image_size,
 
 def set_image_metadata(docker_image_id, namespace_name, repository_name, created_date_str,
                        comment, command, parent=None):
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     query = (Image
              .select(Image, ImageStorage)
              .join(Repository)
@@ -1980,7 +1969,7 @@ def garbage_collect_repository(namespace_name, repository_name):
 
   _garbage_collect_tags(namespace_name, repository_name)
 
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     # TODO (jake): We could probably select this and all the images in a single query using
     # a different kind of join.
 
@@ -2021,7 +2010,7 @@ def _garbage_collect_storage(storage_id_whitelist):
     return
 
   def placements_query_to_paths_set(placements_query):
-    return {(placement.location.name, config.store.image_path(placement.storage.uuid))
+    return {(placement.location.name, model_config.store.image_path(placement.storage.uuid))
             for placement in placements_query}
 
   def orphaned_storage_query(select_base_query, candidates, group_by):
@@ -2040,7 +2029,7 @@ def _garbage_collect_storage(storage_id_whitelist):
   # image storage being deleted for an image storage which is later reused during this time,
   # but since these are caches anyway, it isn't terrible and worth the tradeoff (for now).
   logger.debug('Garbage collecting derived storage from candidates: %s', storage_id_whitelist)
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     # Find out which derived storages will be removed, and add them to the whitelist
     # The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
     orphaned_from_candidates = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
@@ -2066,7 +2055,7 @@ def _garbage_collect_storage(storage_id_whitelist):
 
   # TODO(jake): We might want to allow for null storages on placements, which would allow us to
   # delete the storages, then delete the placements in a non-transaction.
  logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     # Track all of the data that should be removed from blob storage
     placements_to_remove = list(orphaned_storage_query(ImageStoragePlacement
                                                        .select(ImageStoragePlacement,
@@ -2107,7 +2096,7 @@ def _garbage_collect_storage(storage_id_whitelist):
   # This may end up producing garbage in s3, trading off for higher availability in the database.
   for location_name, image_path in paths_to_remove:
     logger.debug('Removing %s from %s', image_path, location_name)
-    config.store.remove({location_name}, image_path)
+    model_config.store.remove({location_name}, image_path)
 
 
 def get_tag_image(namespace_name, repository_name, tag_name):
@@ -2158,7 +2147,7 @@ def create_or_update_tag(namespace_name, repository_name, tag_name,
 
   now_ts = get_epoch_timestamp()
 
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     try:
       tag = db_for_update(_tag_alive(RepositoryTag
                                      .select()
@@ -2179,7 +2168,7 @@ def create_or_update_tag(namespace_name, repository_name, tag_name,
 def delete_tag(namespace_name, repository_name, tag_name):
   now_ts = get_epoch_timestamp()
 
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     try:
       query = _tag_alive(RepositoryTag
                          .select(RepositoryTag, Repository)
@@ -2332,7 +2321,7 @@ def purge_repository(namespace_name, repository_name):
 
   # Delete the rest of the repository metadata
   fetched = _get_repository(namespace_name, repository_name)
-  fetched.delete_instance(recursive=True, delete_nullable=True)
+  fetched.delete_instance(recursive=True, delete_nullable=False)
 
 
 def get_private_repo_count(username):
@@ -2502,8 +2491,8 @@ def get_pull_credentials(robotname):
   return {
     'username': robot.username,
     'password': login_info.service_ident,
-    'registry': '%s://%s/v1/' % (config.app_config['PREFERRED_URL_SCHEME'],
-                                 config.app_config['SERVER_HOSTNAME']),
+    'registry': '%s://%s/v1/' % (model_config.app_config['PREFERRED_URL_SCHEME'],
+                                 model_config.app_config['SERVER_HOSTNAME']),
   }
 
 
@@ -2649,7 +2638,7 @@ def create_notification(kind_name, target, metadata={}):
 
 
 def create_unique_notification(kind_name, target, metadata={}):
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     if list_notifications(target, kind_name, limit=1).count() == 0:
       create_notification(kind_name, target, metadata)
 
@@ -2897,7 +2886,7 @@ def confirm_team_invite(code, user):
   return (team, inviter)
 
 def cancel_repository_build(build, work_queue):
-  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+  with model_config.app_config['DB_TRANSACTION_FACTORY'](db):
     # Reload the build for update.
    try:
      build = db_for_update(RepositoryBuild.select().where(RepositoryBuild.id == build.id)).get()
diff --git a/digest/__init__.py b/digest/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/util/checksums.py b/digest/checksums.py
similarity index 100%
rename from util/checksums.py
rename to digest/checksums.py
diff --git a/endpoints/v2/digest_tools.py b/digest/digest_tools.py
similarity index 66%
rename from endpoints/v2/digest_tools.py
rename to digest/digest_tools.py
index 0857286fe..a9f55c2eb 100644
--- a/endpoints/v2/digest_tools.py
+++ b/digest/digest_tools.py
@@ -42,5 +42,21 @@ def content_path(digest):
 
 def sha256_digest(content):
   """ Returns a sha256 hash of the content bytes in digest form. """
-  digest = hashlib.sha256(content)
+  def single_chunk_generator():
+    yield content
+  return sha256_digest_from_generator(single_chunk_generator())
+
+
+def sha256_digest_from_generator(content_generator):
+  """ Reads all of the data from the iterator and creates a sha256 digest from the content
+  """
+  digest = hashlib.sha256()
+  for chunk in content_generator:
+    digest.update(chunk)
   return 'sha256:{0}'.format(digest.hexdigest())
+
+
+def digests_equal(lhs_digest_string, rhs_digest_string):
+  """ Parse and compare the two digests, returns True if the digests are equal, False otherwise.
+  """
+  return parse_digest(lhs_digest_string) == parse_digest(rhs_digest_string)
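A quick illustration of how the new helpers compose (a usage sketch, not part of the patch; the printed value is the standard SHA-256 of 'hello world'):

  from digest import digest_tools

  # One-shot digest of in-memory content.
  print(digest_tools.sha256_digest('hello world'))
  # -> sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9

  # Streaming digest over chunks, as the chunked-upload path uses.
  def chunks():
    yield 'hello '
    yield 'world'

  streamed = digest_tools.sha256_digest_from_generator(chunks())

  # digests_equal compares parsed digests rather than raw strings.
  assert digest_tools.digests_equal(streamed, digest_tools.sha256_digest('hello world'))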
diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py
index a425ef88e..d11bba86d 100644
--- a/endpoints/v1/registry.py
+++ b/endpoints/v1/registry.py
@@ -10,7 +10,8 @@ from time import time
 from app import storage as store, image_diff_queue, app
 from auth.auth import process_auth, extract_namespace_repo_from_session
 from auth.auth_context import get_authenticated_user, get_grant_user_context
-from util import checksums, changes
+from digest import checksums
+from util import changes
 from util.http import abort, exact_abort
 from auth.permissions import (ReadRepositoryPermission,
                               ModifyRepositoryPermission)
diff --git a/endpoints/v2/__init__.py b/endpoints/v2/__init__.py
index cfad13224..53de97ef6 100644
--- a/endpoints/v2/__init__.py
+++ b/endpoints/v2/__init__.py
@@ -59,4 +59,4 @@ def v2_support_enabled():
 
 from endpoints.v2 import v2auth
 from endpoints.v2 import manifest
-from endpoints.v2 import blobs
+from endpoints.v2 import blob
diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py
new file mode 100644
index 000000000..4fc4c0d38
--- /dev/null
+++ b/endpoints/v2/blob.py
@@ -0,0 +1,77 @@
+import logging
+
+from flask import make_response, url_for, request
+
+import data.model.blob
+
+from app import storage
+from digest import digest_tools
+from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
+from auth.jwt_auth import process_jwt_auth
+from endpoints.decorators import anon_protect
+from util.http import abort
+
+
+logger = logging.getLogger(__name__)
+
+
+@v2_bp.route('/<namespace>/<repo_name>/blobs/<digest>',
+             methods=['HEAD'])
+@process_jwt_auth
+@require_repo_read
+@anon_protect
+def check_blob_existence(namespace, repo_name, digest):
+  try:
+    found = data.model.blob.get_blob_by_digest(digest)
+
+    # The response body must be empty for a successful HEAD request
+    return make_response('')
+  except data.model.blob.BlobDoesNotExist:
+    abort(404)
+
+
+@v2_bp.route('/<namespace>/<repo_name>/blobs/<digest>',
+             methods=['GET'])
+@process_jwt_auth
+@require_repo_read
+@anon_protect
+def download_blob(namespace, repo_name, digest):
+  return make_response('')
+
+
+@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/', methods=['POST'])
+@process_jwt_auth
+@require_repo_write
+@anon_protect
+def start_blob_upload(namespace, repo_name):
+  new_upload_uuid = storage.initiate_chunked_upload(storage.preferred_locations[0])
+  accepted = make_response('', 202)
+  accepted.headers['Location'] = url_for('v2.upload_chunk', namespace=namespace,
+                                         repo_name=repo_name, upload_uuid=new_upload_uuid)
+  accepted.headers['Range'] = 'bytes=0-0'
+  accepted.headers['Docker-Upload-UUID'] = new_upload_uuid
+  return accepted
+
+
+@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/<upload_uuid>', methods=['PUT'])
+@process_jwt_auth
+@require_repo_write
+@anon_protect
+def upload_chunk(namespace, repo_name, upload_uuid):
+  digest = request.args.get('digest', None)
+  upload_location = storage.preferred_locations[0]
+  bytes_written = storage.stream_upload_chunk(upload_location, upload_uuid, 0, -1,
+                                              get_input_stream(request))
+
+  if digest is not None:
+    final_blob_location = digest_tools.content_path(digest)
+    storage.complete_chunked_upload(upload_location, upload_uuid, final_blob_location, digest)
+    data.model.blob.store_blob_record(digest, upload_location)
+
+    response = make_response('', 201)
+    response.headers['Docker-Content-Digest'] = digest
+    response.headers['Location'] = url_for('v2.download_blob', namespace=namespace,
+                                           repo_name=repo_name, digest=digest)
+    return response
+
+  return make_response('', 202)
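The two upload endpoints above implement the docker registry v2 monolithic upload flow: POST to reserve an upload UUID, then PUT the whole body with a digest query parameter. A client-side sketch using the requests library; the host, repository path, and /v2 prefix are assumptions about the deployment, and the JWT auth required by the decorators is omitted:

  import urlparse

  import requests

  from digest import digest_tools

  REGISTRY = 'http://localhost:5000'          # hypothetical registry host
  REPO = 'v2/devtable/simple'                 # hypothetical repository path

  content = 'blob-bytes-for-illustration'

  # Step 1: POST reserves an upload; 202 carries Location and Docker-Upload-UUID.
  start = requests.post('{0}/{1}/blobs/uploads/'.format(REGISTRY, REPO))
  assert start.status_code == 202

  # Step 2: PUT the body with ?digest=...; a 201 means the blob was verified,
  # moved to its content-addressed path, and recorded in the database.
  blob_digest = digest_tools.sha256_digest(content)
  finish = requests.put(urlparse.urljoin(REGISTRY, start.headers['Location']),
                        params={'digest': blob_digest}, data=content)
  assert finish.status_code == 201
  assert finish.headers['Docker-Content-Digest'] == blob_digest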
diff --git a/endpoints/v2/blobs.py b/endpoints/v2/blobs.py
deleted file mode 100644
index 8ce69beaf..000000000
--- a/endpoints/v2/blobs.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import logging
-
-from flask import make_response
-
-from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, require_repo_admin
-from auth.jwt_auth import process_jwt_auth
-from auth.permissions import ReadRepositoryPermission
-from endpoints.decorators import anon_protect
-
-
-logger = logging.getLogger(__name__)
-
-
-@v2_bp.route('/<namespace>/<repo_name>/blobs/<tarsum>', methods=['HEAD'])
-@process_jwt_auth
-@require_repo_read
-@anon_protect
-def check_blob_existence(namespace, repo_name, tarsum):
-  logger.debug('Fetching blob with tarsum: %s', tarsum)
-  return make_response('Blob {0}'.format(tarsum))
diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py
index 2d6c7b39c..a9feb3c96 100644
--- a/endpoints/v2/manifest.py
+++ b/endpoints/v2/manifest.py
@@ -1,16 +1,15 @@
 import logging
 import re
-import hashlib
+import jwt.utils
+import yaml
 
 from flask import make_response, request
 
 from app import storage
 from auth.jwt_auth import process_jwt_auth
-from auth.permissions import ReadRepositoryPermission
 from endpoints.decorators import anon_protect
-from endpoints.v2 import (v2_bp, require_repo_read, require_repo_write, require_repo_admin,
-                          get_input_stream)
-from endpoints.v2 import digest_tools
+from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
+from digest import digest_tools
 
 
 logger = logging.getLogger(__name__)
@@ -20,9 +19,53 @@ VALID_TAG_PATTERN = r'[\w][\w.-]{0,127}'
 VALID_TAG_REGEX = re.compile(VALID_TAG_PATTERN)
 
 
-def is_tag_name(reference):
-  match = VALID_TAG_REGEX.match(reference)
-  return match is not None and match.end() == len(reference)
+class SignedManifest(object):
+  SIGNATURES_KEY = 'signatures'
+  PROTECTED_KEY = 'protected'
+  FORMAT_LENGTH_KEY = 'formatLength'
+  FORMAT_TAIL_KEY = 'formatTail'
+  REPO_NAME_KEY = 'name'
+  REPO_TAG_KEY = 'tag'
+
+  def __init__(self, manifest_bytes):
+    self._bytes = manifest_bytes
+    parsed = yaml.safe_load(manifest_bytes)
+
+    self._signatures = parsed[self.SIGNATURES_KEY]
+    self._namespace, self._repo_name = parsed[self.REPO_NAME_KEY].split('/')
+    self._tag = parsed[self.REPO_TAG_KEY]
+
+    self._validate()
+
+  def _validate(self):
+    pass
+
+  @property
+  def signatures(self):
+    return self._signatures
+
+  @property
+  def namespace(self):
+    return self._namespace
+
+  @property
+  def repo_name(self):
+    return self._repo_name
+
+  @property
+  def tag(self):
+    return self._tag
+
+  @property
+  def payload(self):
+    protected = self._signatures[0][self.PROTECTED_KEY]
+    parsed_protected = yaml.safe_load(jwt.utils.base64url_decode(protected))
+    logger.debug('parsed_protected: %s', parsed_protected)
+    signed_content_head = self._bytes[:parsed_protected[self.FORMAT_LENGTH_KEY]]
+    logger.debug('signed content head: %s', signed_content_head)
+    signed_content_tail = jwt.utils.base64url_decode(parsed_protected[self.FORMAT_TAIL_KEY])
+    logger.debug('signed content tail: %s', signed_content_tail)
+    return signed_content_head + signed_content_tail
 
 
 @v2_bp.route('/<namespace>/<repo_name>/manifests/<tag_name>',
@@ -41,23 +84,24 @@ def fetch_manifest_by_tagname(namespace, repo_name, tag_name):
 @require_repo_write
 @anon_protect
 def write_manifest_by_tagname(namespace, repo_name, tag_name):
-  manifest_data = request.data
-  logger.debug('Manifest data: %s', manifest_data)
+  manifest = SignedManifest(request.data)
+  manifest_digest = digest_tools.sha256_digest(manifest.payload)
 
   response = make_response('OK', 202)
-  response.headers['Docker-Content-Digest'] = digest_tools.sha256_digest(manifest_data)
+  response.headers['Docker-Content-Digest'] = manifest_digest
   response.headers['Location'] = 'https://fun.com'
   return response
 
 
-@v2_bp.route('/<namespace>/<repo_name>/manifests/<tag_digest>',
-             methods=['PUT'])
-@process_jwt_auth
-@require_repo_write
-@anon_protect
-def write_manifest(namespace, repo_name, tag_digest):
-  logger.debug('Writing tag manifest with name: %s', tag_digest)
+# @v2_bp.route('/<namespace>/<repo_name>/manifests/<tag_digest>',
+#              methods=['PUT'])
+# @process_jwt_auth
+# @require_repo_write
+# @anon_protect
+# def write_manifest(namespace, repo_name, tag_digest):
+#   logger.debug('Writing tag manifest with name: %s', tag_digest)
 
-  manifest_path = digest_tools.content_path(tag_digest)
-  storage.stream_write('local_us', manifest_path, get_input_stream(request))
+#   manifest_path = digest_tools.content_path(tag_digest)
+#   storage.stream_write('local_us', manifest_path, get_input_stream(request))
 
-  return make_response('Manifest {0}'.format(tag_digest))
+#   return make_response('Manifest {0}'.format(tag_digest))
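The payload property reconstructs the exact byte string the JWS signatures cover: the first formatLength bytes of the posted manifest, plus the base64url-decoded formatTail recorded in the protected header. A toy round-trip showing the mechanism (hand-built values, not a real docker manifest):

  import jwt.utils

  manifest_bytes = '{"name": "devtable/simple", "tag": "latest"}'

  # The signer records how much of the body it covered (formatLength) and the
  # closing bytes it stripped (formatTail), so a verifier can rebuild the payload.
  format_length = len(manifest_bytes) - 1
  format_tail = jwt.utils.base64url_encode(manifest_bytes[format_length:])

  rebuilt = (manifest_bytes[:format_length] +
             jwt.utils.base64url_decode(format_tail))
  assert rebuilt == manifest_bytes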
diff --git a/endpoints/v2/registry.py b/endpoints/v2/registry.py
deleted file mode 100644
index a70cc3a15..000000000
--- a/endpoints/v2/registry.py
+++ /dev/null
@@ -1,12 +0,0 @@
-import logging
-
-from endpoints.v2 import v2_bp
-
-
-logging.getLogger(__name__)
-
-
-@v2_bp.route()
-
-@process_auth
-@anon_protect
diff --git a/storage/basestorage.py b/storage/basestorage.py
index da297fcf1..2560f80db 100644
--- a/storage/basestorage.py
+++ b/storage/basestorage.py
@@ -93,3 +93,28 @@ class BaseStorage(StoragePaths):
 
   def get_checksum(self, path):
     raise NotImplementedError
+
+
+class DigestInvalidException(RuntimeError):
+  pass
+
+
+class BaseStorageV2(BaseStorage):
+  def initiate_chunked_upload(self):
+    """ Start a new chunked upload, and return a handle with which the upload can be referenced.
+    """
+    raise NotImplementedError
+
+  def stream_upload_chunk(self, uuid, offset, length, in_fp):
+    """ Upload the specified amount of data from the given file pointer to the chunked destination
+        specified, starting at the given offset. Returns the number of bytes written.
+    """
+    raise NotImplementedError
+
+  def complete_chunked_upload(self, uuid, final_path, digest_to_verify):
+    """ Complete the chunked upload and store the final results in the path indicated.
+    """
+    raise NotImplementedError
+
+
+
diff --git a/storage/distributedstorage.py b/storage/distributedstorage.py
index 67650225d..26a0f5dbd 100644
--- a/storage/distributedstorage.py
+++ b/storage/distributedstorage.py
@@ -3,7 +3,7 @@ import logging
 
 from functools import wraps
 
-from storage.basestorage import StoragePaths, BaseStorage
+from storage.basestorage import StoragePaths, BaseStorage, BaseStorageV2
 
 
 logger = logging.getLogger(__name__)
@@ -42,3 +42,6 @@ class DistributedStorage(StoragePaths):
   remove = _location_aware(BaseStorage.remove)
   get_checksum = _location_aware(BaseStorage.get_checksum)
   get_supports_resumable_downloads = _location_aware(BaseStorage.get_supports_resumable_downloads)
+  initiate_chunked_upload = _location_aware(BaseStorageV2.initiate_chunked_upload)
+  stream_upload_chunk = _location_aware(BaseStorageV2.stream_upload_chunk)
+  complete_chunked_upload = _location_aware(BaseStorageV2.complete_chunked_upload)
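A usage sketch of the new V2 storage interface, against the LocalStorage implementation in the next hunk (the root path and content are hypothetical; DistributedStorage callers pass a locations set as the first argument, as blob.py does above):

  import io

  from digest import digest_tools
  from storage.local import LocalStorage

  store = LocalStorage('/tmp/registry-scratch')  # hypothetical root path
  content = 'layer bytes'

  # Reserve an upload, stream data into it at offset 0, then seal it under its
  # content-addressed path; complete_chunked_upload re-hashes and verifies.
  upload_uuid = store.initiate_chunked_upload()
  store.stream_upload_chunk(upload_uuid, 0, len(content), io.BytesIO(content))

  blob_digest = digest_tools.sha256_digest(content)
  store.complete_chunked_upload(upload_uuid, digest_tools.content_path(blob_digest),
                                blob_digest)
  # The blob now lives at digest_tools.content_path(blob_digest) under the root.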
diff --git a/storage/local.py b/storage/local.py
index 056c68c05..78b6ec7c2 100644
--- a/storage/local.py
+++ b/storage/local.py
@@ -3,10 +3,13 @@ import shutil
 import hashlib
 import io
 
-from storage.basestorage import BaseStorage
+from uuid import uuid4
+
+from storage.basestorage import BaseStorageV2
+from digest import digest_tools
 
 
-class LocalStorage(BaseStorage):
+class LocalStorage(BaseStorageV2):
 
   def __init__(self, storage_path):
     self._root_path = storage_path
@@ -46,15 +49,31 @@ class LocalStorage(BaseStorage):
   def stream_write(self, path, fp, content_type=None, content_encoding=None):
     # Size is mandatory
     path = self._init_path(path, create=True)
-    with open(path, mode='wb') as f:
-      while True:
-        try:
-          buf = fp.read(self.buffer_size)
-          if not buf:
-            break
-          f.write(buf)
-        except IOError:
+    with open(path, mode='wb') as out_fp:
+      self._stream_write_to_fp(fp, out_fp)
+
+  def _stream_write_to_fp(self, in_fp, out_fp, num_bytes=-1):
+    """ Copy the specified number of bytes from the input file stream to the output stream. If
+        num_bytes < 0, copy until the stream ends.
+    """
+    bytes_copied = 0
+    bytes_remaining = num_bytes
+    while bytes_remaining > 0 or num_bytes < 0:
+      try:
+        # Cap reads at the requested length so a positive num_bytes is honored.
+        size_to_read = self.buffer_size
+        if bytes_remaining > 0:
+          size_to_read = min(size_to_read, bytes_remaining)
+        buf = in_fp.read(size_to_read)
+        if not buf:
           break
+        out_fp.write(buf)
+        bytes_copied += len(buf)
+        bytes_remaining -= len(buf)
+      except IOError:
+        break
+
+    return bytes_copied
 
   def list_directory(self, path=None):
     path = self._init_path(path)
@@ -92,3 +106,36 @@ class LocalStorage(BaseStorage):
         break
       sha_hash.update(buf)
     return sha_hash.hexdigest()[:7]
+
+
+  def _rel_upload_path(self, uuid):
+    return 'uploads/{0}'.format(uuid)
+
+
+  def initiate_chunked_upload(self):
+    new_uuid = str(uuid4())
+
+    # Just create an empty file at the path
+    with open(self._init_path(self._rel_upload_path(new_uuid), create=True), 'w'):
+      pass
+
+    return new_uuid
+
+  def stream_upload_chunk(self, uuid, offset, length, in_fp):
+    with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
+      upload_storage.seek(offset)
+      return self._stream_write_to_fp(in_fp, upload_storage, length)
+
+  def complete_chunked_upload(self, uuid, final_path, digest_to_verify):
+    content_path = self._rel_upload_path(uuid)
+    content_digest = digest_tools.sha256_digest_from_generator(self.stream_read(content_path))
+
+    if not digest_tools.digests_equal(content_digest, digest_to_verify):
+      msg = 'Given: {0} Computed: {1}'.format(digest_to_verify, content_digest)
+      raise digest_tools.InvalidDigestException(msg)
+
+    final_path = self._init_path(final_path, create=True)
+    shutil.move(self._init_path(content_path), final_path)
+
+
+
diff --git a/test/registry_tests.py b/test/registry_tests.py
index 3f7d3da41..15b7ff983 100644
--- a/test/registry_tests.py
+++ b/test/registry_tests.py
@@ -6,9 +6,7 @@ from flask.blueprints import Blueprint
 from flask.ext.testing import LiveServerTestCase
 
 from app import app
-from endpoints.registry import registry
-from endpoints.index import index
-from endpoints.tags import tags
+from endpoints.v1 import v1_bp
 from endpoints.api import api_bp
 from initdb import wipe_database, initialize_database, populate_database
 from endpoints.csrf import generate_csrf_token
@@ -23,9 +21,7 @@ from cStringIO import StringIO
-from util.checksums import compute_simple
+from digest.checksums import compute_simple
 
 try:
-  app.register_blueprint(index, url_prefix='/v1')
-  app.register_blueprint(tags, url_prefix='/v1')
-  app.register_blueprint(registry, url_prefix='/v1')
+  app.register_blueprint(v1_bp, url_prefix='/v1')
   app.register_blueprint(api_bp, url_prefix='/api')
 except ValueError:
   # Blueprint was already registered
diff --git a/test/specs.py b/test/specs.py
index f8a4e491b..3e5b4e585 100644
--- a/test/specs.py
+++ b/test/specs.py
@@ -112,123 +112,123 @@ class IndexTestSpec(object):
 
 def build_index_specs():
   return [
-    IndexTestSpec(url_for('registry.get_image_layer', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.get_image_layer', image_id=FAKE_IMAGE_ID),
                   PUBLIC_REPO, 404, 404, 404, 404),
-    IndexTestSpec(url_for('registry.get_image_layer', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.get_image_layer', image_id=FAKE_IMAGE_ID),
                   PRIVATE_REPO, 403, 403, 404, 404),
-    IndexTestSpec(url_for('registry.get_image_layer', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.get_image_layer', image_id=FAKE_IMAGE_ID),
                   ORG_REPO, 403, 403, 404, 404),
 
-    IndexTestSpec(url_for('registry.put_image_layer', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.put_image_layer', image_id=FAKE_IMAGE_ID),
                   PUBLIC_REPO, 403, 403, 403, 403).set_method('PUT'),
-    IndexTestSpec(url_for('registry.put_image_layer', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.put_image_layer', image_id=FAKE_IMAGE_ID),
                  PRIVATE_REPO, 403, 403, 403, 404).set_method('PUT'),
-    IndexTestSpec(url_for('registry.put_image_layer', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.put_image_layer', image_id=FAKE_IMAGE_ID),
                  ORG_REPO, 403, 403, 403, 404).set_method('PUT'),
 
-    IndexTestSpec(url_for('registry.put_image_checksum',
+    IndexTestSpec(url_for('v1.put_image_checksum',
                  image_id=FAKE_IMAGE_ID),
                  PUBLIC_REPO, 403, 403, 403, 403).set_method('PUT'),
-    IndexTestSpec(url_for('registry.put_image_checksum',
+    IndexTestSpec(url_for('v1.put_image_checksum',
                  image_id=FAKE_IMAGE_ID),
                  PRIVATE_REPO, 403, 403, 403, 400).set_method('PUT'),
-    IndexTestSpec(url_for('registry.put_image_checksum',
+    IndexTestSpec(url_for('v1.put_image_checksum',
                  image_id=FAKE_IMAGE_ID),
                  ORG_REPO, 403, 403, 403, 400).set_method('PUT'),
 
-    IndexTestSpec(url_for('registry.get_image_json', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.get_image_json', image_id=FAKE_IMAGE_ID),
                  PUBLIC_REPO, 404, 404, 404, 404),
-    IndexTestSpec(url_for('registry.get_image_json', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.get_image_json', image_id=FAKE_IMAGE_ID),
                  PRIVATE_REPO, 403, 403, 404, 404),
-    IndexTestSpec(url_for('registry.get_image_json', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.get_image_json', image_id=FAKE_IMAGE_ID),
                  ORG_REPO, 403, 403, 404, 404),
 
-    IndexTestSpec(url_for('registry.get_image_ancestry',
+    IndexTestSpec(url_for('v1.get_image_ancestry',
                  image_id=FAKE_IMAGE_ID),
                  PUBLIC_REPO, 404, 404, 404, 404),
-    IndexTestSpec(url_for('registry.get_image_ancestry',
+    IndexTestSpec(url_for('v1.get_image_ancestry',
                  image_id=FAKE_IMAGE_ID),
                  PRIVATE_REPO, 403, 403, 404, 404),
-    IndexTestSpec(url_for('registry.get_image_ancestry',
+    IndexTestSpec(url_for('v1.get_image_ancestry',
                  image_id=FAKE_IMAGE_ID),
                  ORG_REPO, 403, 403, 404, 404),
 
-    IndexTestSpec(url_for('registry.put_image_json', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.put_image_json', image_id=FAKE_IMAGE_ID),
                  PUBLIC_REPO, 403, 403, 403, 403).set_method('PUT'),
-    IndexTestSpec(url_for('registry.put_image_json', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.put_image_json', image_id=FAKE_IMAGE_ID),
                  PRIVATE_REPO, 403, 403, 403, 400).set_method('PUT'),
-    IndexTestSpec(url_for('registry.put_image_json', image_id=FAKE_IMAGE_ID),
+    IndexTestSpec(url_for('v1.put_image_json', image_id=FAKE_IMAGE_ID),
                  ORG_REPO, 403, 403, 403, 400).set_method('PUT'),
 
-    IndexTestSpec(url_for('index.create_user'), NO_REPO, 400, 400, 400,
+    IndexTestSpec(url_for('v1.create_user'), NO_REPO, 400, 400, 400,
                  400).set_method('POST').set_data_from_obj(NEW_USER_DETAILS),
 
-    IndexTestSpec(url_for('index.get_user'), NO_REPO, 404, 200, 200, 200),
+    IndexTestSpec(url_for('v1.get_user'), NO_REPO, 404, 200, 200, 200),
 
-    IndexTestSpec(url_for('index.update_user', username=FAKE_USERNAME),
+    IndexTestSpec(url_for('v1.update_user', username=FAKE_USERNAME),
                  NO_REPO, 403, 403, 403, 403).set_method('PUT'),
 
-    IndexTestSpec(url_for('index.create_repository', repository=PUBLIC_REPO),
+    IndexTestSpec(url_for('v1.create_repository', repository=PUBLIC_REPO),
                  NO_REPO, 403, 403, 403, 403).set_method('PUT'),
-    IndexTestSpec(url_for('index.create_repository', repository=PRIVATE_REPO),
+    IndexTestSpec(url_for('v1.create_repository', repository=PRIVATE_REPO),
                  NO_REPO, 403, 403, 403, 201).set_method('PUT'),
-    IndexTestSpec(url_for('index.create_repository', repository=ORG_REPO),
+    IndexTestSpec(url_for('v1.create_repository', repository=ORG_REPO),
                  NO_REPO, 403, 403, 403, 201).set_method('PUT'),
 
-    IndexTestSpec(url_for('index.update_images', repository=PUBLIC_REPO),
+    IndexTestSpec(url_for('v1.update_images', repository=PUBLIC_REPO),
                  NO_REPO, 403, 403, 403, 403).set_method('PUT'),
-    IndexTestSpec(url_for('index.update_images', repository=PRIVATE_REPO),
+    IndexTestSpec(url_for('v1.update_images', repository=PRIVATE_REPO),
                  NO_REPO, 403, 403, 403, 204).set_method('PUT'),
-    IndexTestSpec(url_for('index.update_images', repository=ORG_REPO), NO_REPO,
+    IndexTestSpec(url_for('v1.update_images', repository=ORG_REPO), NO_REPO,
                  403, 403, 403, 204).set_method('PUT'),
 
-    IndexTestSpec(url_for('index.get_repository_images',
+    IndexTestSpec(url_for('v1.get_repository_images',
                  repository=PUBLIC_REPO),
                  NO_REPO, 200, 200, 200, 200),
-    IndexTestSpec(url_for('index.get_repository_images',
+    IndexTestSpec(url_for('v1.get_repository_images',
                  repository=PRIVATE_REPO)),
-    IndexTestSpec(url_for('index.get_repository_images', repository=ORG_REPO)),
+    IndexTestSpec(url_for('v1.get_repository_images', repository=ORG_REPO)),
 
-    IndexTestSpec(url_for('index.delete_repository_images',
+    IndexTestSpec(url_for('v1.delete_repository_images',
                  repository=PUBLIC_REPO),
                  NO_REPO, 501, 501, 501, 501).set_method('DELETE'),
 
-    IndexTestSpec(url_for('index.put_repository_auth', repository=PUBLIC_REPO),
+    IndexTestSpec(url_for('v1.put_repository_auth', repository=PUBLIC_REPO),
                  NO_REPO, 501, 501, 501, 501).set_method('PUT'),
 
-    IndexTestSpec(url_for('index.get_search'), NO_REPO, 200, 200, 200, 200),
+    IndexTestSpec(url_for('v1.get_search'), NO_REPO, 200, 200, 200, 200),
 
-    IndexTestSpec(url_for('index.ping'), NO_REPO, 200, 200, 200, 200),
+    IndexTestSpec(url_for('v1.ping'), NO_REPO, 200, 200, 200, 200),
 
-    IndexTestSpec(url_for('tags.get_tags', repository=PUBLIC_REPO), NO_REPO,
+    IndexTestSpec(url_for('v1.get_tags', repository=PUBLIC_REPO), NO_REPO,
                  200, 200, 200, 200),
-    IndexTestSpec(url_for('tags.get_tags', repository=PRIVATE_REPO)),
-    IndexTestSpec(url_for('tags.get_tags', repository=ORG_REPO)),
+    IndexTestSpec(url_for('v1.get_tags', repository=PRIVATE_REPO)),
+    IndexTestSpec(url_for('v1.get_tags', repository=ORG_REPO)),
 
-    IndexTestSpec(url_for('tags.get_tag', repository=PUBLIC_REPO,
+    IndexTestSpec(url_for('v1.get_tag', repository=PUBLIC_REPO,
                  tag=FAKE_TAG_NAME), NO_REPO, 400, 400, 400, 400),
-    IndexTestSpec(url_for('tags.get_tag', repository=PRIVATE_REPO,
+    IndexTestSpec(url_for('v1.get_tag', repository=PRIVATE_REPO,
                  tag=FAKE_TAG_NAME), NO_REPO, 403, 403, 400, 400),
-    IndexTestSpec(url_for('tags.get_tag', repository=ORG_REPO,
+    IndexTestSpec(url_for('v1.get_tag', repository=ORG_REPO,
                  tag=FAKE_TAG_NAME), NO_REPO, 403, 403, 400, 400),
 
-    IndexTestSpec(url_for('tags.put_tag', repository=PUBLIC_REPO,
+    IndexTestSpec(url_for('v1.put_tag', repository=PUBLIC_REPO,
                  tag=FAKE_TAG_NAME), NO_REPO,
                  403, 403, 403, 403).set_method('PUT'),
-    IndexTestSpec(url_for('tags.put_tag', repository=PRIVATE_REPO,
+    IndexTestSpec(url_for('v1.put_tag', repository=PRIVATE_REPO,
                  tag=FAKE_TAG_NAME), NO_REPO,
                  403, 403, 403, 400).set_method('PUT'),
-    IndexTestSpec(url_for('tags.put_tag', repository=ORG_REPO,
+    IndexTestSpec(url_for('v1.put_tag', repository=ORG_REPO,
                  tag=FAKE_TAG_NAME), NO_REPO,
                  403, 403, 403, 400).set_method('PUT'),
 
-    IndexTestSpec(url_for('tags.delete_tag', repository=PUBLIC_REPO,
+    IndexTestSpec(url_for('v1.delete_tag', repository=PUBLIC_REPO,
                  tag=FAKE_TAG_NAME), NO_REPO,
                  403, 403, 403, 403).set_method('DELETE'),
-    IndexTestSpec(url_for('tags.delete_tag', repository=PRIVATE_REPO,
+    IndexTestSpec(url_for('v1.delete_tag', repository=PRIVATE_REPO,
                  tag=FAKE_TAG_NAME), NO_REPO,
                  403, 403, 403, 400).set_method('DELETE'),
-    IndexTestSpec(url_for('tags.delete_tag', repository=ORG_REPO,
+    IndexTestSpec(url_for('v1.delete_tag', repository=ORG_REPO,
                  tag=FAKE_TAG_NAME), NO_REPO,
                  403, 403, 403, 400).set_method('DELETE'),
   ]
diff --git a/test/test_anon_checked.py b/test/test_anon_checked.py
index f9eed9ee8..05a6651d8 100644
--- a/test/test_anon_checked.py
+++ b/test/test_anon_checked.py
@@ -1,8 +1,6 @@
 import unittest
 
-from endpoints.tags import tags
-from endpoints.registry import registry
-from endpoints.index import index
+from endpoints.v1 import v1_bp
 from endpoints.verbs import verbs
 
 
@@ -23,9 +21,7 @@ class TestAnonymousAccessChecked(unittest.TestCase):
       deferred_function(Checker(self))
 
   def test_anonymous_access_checked(self):
-    self.verifyBlueprint(tags)
-    self.verifyBlueprint(registry)
-    self.verifyBlueprint(index)
+    self.verifyBlueprint(v1_bp)
     self.verifyBlueprint(verbs)
 
 if __name__ == '__main__':
diff --git a/test/test_digest_tools.py b/test/test_digest_tools.py
index 626eda86e..954c01052 100644
--- a/test/test_digest_tools.py
+++ b/test/test_digest_tools.py
@@ -1,6 +1,6 @@
 import unittest
 
-from endpoints.v2.digest_tools import parse_digest, content_path, InvalidDigestException
+from digest.digest_tools import parse_digest, content_path, InvalidDigestException
 
 class TestParseDigest(unittest.TestCase):
   def test_parse_good(self):
diff --git a/test/test_endpoint_security.py b/test/test_endpoint_security.py
index 0423b28dc..27b393cd7 100644
--- a/test/test_endpoint_security.py
+++ b/test/test_endpoint_security.py
@@ -4,14 +4,10 @@ from app import app
 from util.names import parse_namespace_repository
 from initdb import setup_database_for_testing, finished_database_for_testing
 from specs import build_index_specs
-from endpoints.registry import registry
-from endpoints.index import index
-from endpoints.tags import tags
+from endpoints.v1 import v1_bp
 
 
-app.register_blueprint(index, url_prefix='/v1')
-app.register_blueprint(tags, url_prefix='/v1')
-app.register_blueprint(registry, url_prefix='/v1')
+app.register_blueprint(v1_bp, url_prefix='/v1')
 
 NO_ACCESS_USER = 'freshuser'