Another huge batch of registry v2 changes

Add PATCH support and resumable SHA
Implement all actual registry methods
Add a simple database generation option
Jake Moshenko 2015-08-12 16:39:32 -04:00
parent 5ba3521e67
commit e1b3e9e6ae
29 changed files with 1095 additions and 430 deletions

15
TODO.md Normal file
View file

@ -0,0 +1,15 @@
- Convert the flattened image generator to use the database ancestry instead of the json file
- Convert verbs to load json from either db or storage
- Convert verbs to work with v1 and cas layer storage locations
- Fix all tests
- Fix uncompressed size backfill
- File issue to move queries out of uncompressed size backfill and use subquery random
- Consider removing the new jwkest dependency
- Update the max fresh on registry tokens; 300s is not long enough to complete all registry actions
- Fix the sizes stored in the db
- Make sure we handle more of the v2 API than just what is required to push and pull
- Handle registry API error conditions
- Fill in the registry v2 methods on other storage engines
- Write a script to backfill the json metadata
- Verify the manifest, and throw the proper error if unverified
- Convert uploads to get locked to a placement, i.e. once an upload starts, all communication goes through that replica

2
app.py
View file

@ -43,10 +43,10 @@ CONFIG_PROVIDER = FileConfigProvider(OVERRIDE_CONFIG_DIRECTORY, 'config.yaml', '
app = Flask(__name__)
logger = logging.getLogger(__name__)
class RegexConverter(BaseConverter):
def __init__(self, url_map, *items):
super(RegexConverter, self).__init__(url_map)
logger.debug('Installing regex converter with regex: %s', items[0])
self.regex = items[0]

View file

@ -42,7 +42,6 @@ class DefaultConfig(object):
LOGGING_LEVEL = 'DEBUG'
SEND_FILE_MAX_AGE_DEFAULT = 0
POPULATE_DB_TEST_DATA = True
PREFERRED_URL_SCHEME = 'http'
SERVER_HOSTNAME = 'localhost:5000'

View file

@ -3,6 +3,9 @@ import logging
import uuid
import time
import toposort
import base64
import resumablehashlib
import json
from random import SystemRandom
from datetime import datetime
@ -343,7 +346,7 @@ class Repository(BaseModel):
# These models don't need to use transitive deletes, because the referenced objects
# are cleaned up directly
skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger}
skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger, BlobUpload}
# We need to sort the ops so that models get cleaned in order of their dependencies
ops = reversed(list(self.dependencies(delete_nullable)))
@ -485,6 +488,7 @@ class ImageStorage(BaseModel):
uncompressed_size = BigIntegerField(null=True)
aggregate_size = BigIntegerField(null=True)
uploading = BooleanField(default=True, null=True)
cas_path = BooleanField(default=True)
class ImageStorageTransformation(BaseModel):
@ -552,6 +556,12 @@ class Image(BaseModel):
storage = ForeignKeyField(ImageStorage, index=True, null=True)
created = DateTimeField(null=True)
comment = TextField(null=True)
command = TextField(null=True)
aggregate_size = BigIntegerField(null=True)
v1_json_metadata = TextField(null=True)
class Meta:
database = db
read_slaves = (read_slave,)
@ -740,6 +750,44 @@ class RepositoryAuthorizedEmail(BaseModel):
)
class ResumableSHAField(TextField):
def db_value(self, value):
sha_state = value.state()
# One of the fields is a byte string, let's base64 encode it to make sure
# we can store and fetch it regardless of the default collation
sha_state[3] = base64.b64encode(sha_state[3])
return json.dumps(sha_state)
def python_value(self, value):
to_resume = resumablehashlib.sha256()
if value is None:
return to_resume
sha_state = json.loads(value)
# We need to base64 decode the data bytestring
sha_state[3] = base64.b64decode(sha_state[3])
to_resume.set_state(sha_state)
return to_resume
class BlobUpload(BaseModel):
repository = ForeignKeyField(Repository, index=True)
uuid = CharField(index=True, unique=True)
byte_count = IntegerField(default=0)
sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
location = ForeignKeyField(ImageStorageLocation)
class Meta:
database = db
read_slaves = (read_slave,)
indexes = (
# create a unique index on repository and uuid
(('repository', 'uuid'), True),
)
all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility,
RepositoryTag, EmailConfirmation, FederatedLogin, LoginService, QueueItem,
@ -750,4 +798,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage,
TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind,
AccessTokenKind, Star, RepositoryActionCount, TagManifest]
AccessTokenKind, Star, RepositoryActionCount, TagManifest, BlobUpload]
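The new BlobUpload.sha_state column uses ResumableSHAField to persist the internal state of a resumablehashlib.sha256 object between requests, so a chunked upload can resume hashing exactly where the previous chunk stopped. Below is a minimal round-trip sketch of the same encode/decode logic; it assumes resumablehashlib exposes state()/set_state() as used above plus hashlib-style update()/hexdigest().

# Hypothetical round trip mirroring ResumableSHAField.db_value/python_value.
import json
import base64
import resumablehashlib

hasher = resumablehashlib.sha256()
hasher.update('first chunk of blob data')

# Serialize the hash state the way db_value does before the row is stored.
state = hasher.state()
state[3] = base64.b64encode(state[3])
stored = json.dumps(state)

# On the next request, restore the state and keep hashing.
resumed = resumablehashlib.sha256()
state = json.loads(stored)
state[3] = base64.b64decode(state[3])
resumed.set_state(state)
resumed.update('second chunk of blob data')
print(resumed.hexdigest())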

View file

@ -9,6 +9,10 @@ class BlobDoesNotExist(DataModelException):
pass
class InvalidBlobUpload(DataModelException):
pass
class InvalidEmailAddressException(DataModelException):
pass
@ -65,6 +69,10 @@ class InvalidTeamMemberException(DataModelException):
pass
class InvalidManifestException(DataModelException):
pass
class TooManyLoginAttemptsException(Exception):
def __init__(self, message, retry_after):
super(TooManyLoginAttemptsException, self).__init__(message)

View file

@ -1,8 +1,8 @@
from uuid import uuid4
from data.model import tag, _basequery, BlobDoesNotExist, db_transaction
from data.model import tag, _basequery, BlobDoesNotExist, InvalidBlobUpload, db_transaction
from data.database import (Repository, Namespace, ImageStorage, Image, ImageStorageLocation,
ImageStoragePlacement)
ImageStoragePlacement, BlobUpload)
def get_repo_blob_by_digest(namespace, repo_name, blob_digest):
@ -15,9 +15,9 @@ def get_repo_blob_by_digest(namespace, repo_name, blob_digest):
.join(ImageStorage)
.join(Image)
.join(Repository)
.join(Namespace)
.join(Namespace, on=(Namespace.id == Repository.namespace_user))
.where(Repository.name == repo_name, Namespace.username == namespace,
ImageStorage.checksum == blob_digest))
ImageStorage.checksum == blob_digest, ImageStorage.uploading == False))
if not placements:
raise BlobDoesNotExist('Blob does not exist with digest: {0}'.format(blob_digest))
@ -26,24 +26,45 @@ def get_repo_blob_by_digest(namespace, repo_name, blob_digest):
return found
def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_name,
def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_obj,
link_expiration_s):
""" Store a record of the blob and temporarily link it to the specified repository.
"""
random_image_name = str(uuid4())
with db_transaction:
with db_transaction():
repo = _basequery.get_existing_repository(namespace, repo_name)
try:
storage = ImageStorage.get(checksum=blob_digest)
location = ImageStorageLocation.get(name=location_name)
ImageStoragePlacement.get(storage=storage, location=location)
ImageStoragePlacement.get(storage=storage, location=location_obj)
except ImageStorage.DoesNotExist:
storage = ImageStorage.create(checksum=blob_digest)
storage = ImageStorage.create(checksum=blob_digest, uploading=False)
ImageStoragePlacement.create(storage=storage, location=location_obj)
except ImageStoragePlacement.DoesNotExist:
ImageStoragePlacement.create(storage=storage, location=location)
ImageStoragePlacement.create(storage=storage, location=location_obj)
# Create a temporary link into the repository (to be replaced by the v1 metadata later)
# and a temporary tag to reference it
image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo)
tag.create_temporary_hidden_tag(repo, image, link_expiration_s)
def get_blob_upload(namespace, repo_name, upload_uuid):
""" Load the upload which is already in progress.
"""
try:
return (BlobUpload
.select()
.join(Repository)
.join(Namespace, on=(Namespace.id == Repository.namespace_user))
.where(Repository.name == repo_name, Namespace.username == namespace,
BlobUpload.uuid == upload_uuid)
.get())
except BlobUpload.DoesNotExist:
raise InvalidBlobUpload()
def initiate_upload(namespace, repo_name, uuid, location_name):
repo = _basequery.get_existing_repository(namespace, repo_name)
location = ImageStorageLocation.get(name=location_name)
return BlobUpload.create(repository=repo, location=location, uuid=uuid)
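
Taken together, initiate_upload and get_blob_upload bracket a chunked push: the first records the upload row (and its storage placement) when a client opens an upload, and the second re-loads that row by UUID on every subsequent chunk. A minimal usage sketch follows; the namespace, repository, and location names are placeholders.

# Hypothetical use of the new blob-upload bookkeeping helpers.
from uuid import uuid4
from data.model import blob

upload_uuid = str(uuid4())
blob.initiate_upload('devtable', 'simple', upload_uuid, 'local_us')

# Later, when the next chunk arrives for this upload:
upload = blob.get_blob_upload('devtable', 'simple', upload_uuid)
print(upload.byte_count)  # 0 until chunks are appended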

View file

@ -4,7 +4,8 @@ import dateutil.parser
from peewee import JOIN_LEFT_OUTER, fn
from datetime import datetime
from data.model import DataModelException, db_transaction, _basequery, storage
from data.model import (DataModelException, db_transaction, _basequery, storage,
InvalidImageException)
from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage,
ImageStorageLocation, RepositoryPermission, db_for_update)
@ -247,7 +248,7 @@ def find_create_or_link_image(docker_image_id, repo_obj, username, translations,
return repo_image
logger.debug('Creating new storage for docker id: %s', docker_image_id)
new_storage = storage.create_storage(preferred_location)
new_storage = storage.create_v1_storage(preferred_location)
return Image.create(docker_image_id=docker_image_id,
repository=repo_obj, storage=new_storage,
@ -255,7 +256,7 @@ def find_create_or_link_image(docker_image_id, repo_obj, username, translations,
def set_image_metadata(docker_image_id, namespace_name, repository_name, created_date_str, comment,
command, parent=None):
command, v1_json_metadata, parent=None):
with db_transaction():
query = (Image
.select(Image, ImageStorage)
@ -273,7 +274,7 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
# We cleanup any old checksum in case it's a retry after a fail
fetched.storage.checksum = None
fetched.storage.created = datetime.now()
fetched.created = datetime.now()
if created_date_str is not None:
try:
@ -282,14 +283,14 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
# parse raises different exceptions, so we cannot use a specific kind of handler here.
pass
fetched.storage.comment = comment
fetched.storage.command = command
fetched.comment = comment
fetched.command = command
fetched.v1_json_metadata = v1_json_metadata
if parent:
fetched.ancestors = '%s%s/' % (parent.ancestors, parent.id)
fetched.save()
fetched.storage.save()
return fetched
@ -334,8 +335,65 @@ def set_image_size(docker_image_id, namespace_name, repository_name, image_size,
return image
def get_image(repo, dockerfile_id):
def get_image(repo, docker_image_id):
try:
return Image.get(Image.docker_image_id == dockerfile_id, Image.repository == repo)
return Image.get(Image.docker_image_id == docker_image_id, Image.repository == repo)
except Image.DoesNotExist:
return None
def get_repo_image_by_storage_checksum(namespace, repository_name, storage_checksum):
try:
return (Image
.select()
.join(ImageStorage)
.switch(Image)
.join(Repository)
.join(Namespace, on=(Namespace.id == Repository.namespace_user))
.where(Repository.name == repository_name, Namespace.username == namespace,
ImageStorage.checksum == storage_checksum, ImageStorage.uploading == False)
.get())
except Image.DoesNotExist:
msg = 'Image with storage checksum {0} does not exist in repo {1}/{2}'.format(storage_checksum,
namespace,
repository_name)
raise InvalidImageException(msg)
def synthesize_v1_image(namespace, repository_name, storage_checksum, docker_image_id,
created_date_str, comment, command, v1_json_metadata, parent_docker_id):
""" Find an existing image with this docker image id, and if none exists, write one with the
specified metadata.
"""
repo = _basequery.get_existing_repository(namespace, repository_name)
# Sometimes the manifest may reference an image that already exists
found = get_image(repo, docker_image_id)
if found is not None:
# The image already exists, nothing to do
return found
the_bits = storage.get_repo_storage_by_checksum(namespace, repository_name, storage_checksum)
ancestors = '/'
if parent_docker_id is not None:
parent = get_repo_image(namespace, repository_name, parent_docker_id)
if parent is None:
msg = 'Parent not found with docker image id {0} in repo {1}/{2}'.format(parent_docker_id,
namespace,
repository_name)
raise InvalidImageException(msg)
ancestors = '{0}{1}/'.format(parent.ancestors, parent.id)
created = None
if created_date_str is not None:
try:
created = dateutil.parser.parse(created_date_str).replace(tzinfo=None)
except:
# parse raises different exceptions, so we cannot use a specific kind of handler here.
pass
return Image.create(docker_image_id=docker_image_id, ancestors=ancestors, comment=comment,
command=command, v1_json_metadata=v1_json_metadata, created=created,
storage=the_bits, repository=repo)

View file

@ -5,7 +5,7 @@ from peewee import JOIN_LEFT_OUTER, fn
from data.model import config, db_transaction, InvalidImageException
from data.database import (ImageStorage, Image, DerivedImageStorage, ImageStoragePlacement,
ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature,
ImageStorageSignatureKind)
ImageStorageSignatureKind, Repository, Namespace)
logger = logging.getLogger(__name__)
@ -18,7 +18,7 @@ def find_or_create_derived_storage(source, transformation_name, preferred_locati
logger.debug('Creating storage derived from source: %s', source.uuid)
trans = ImageStorageTransformation.get(name=transformation_name)
new_storage = create_storage(preferred_location)
new_storage = create_v1_storage(preferred_location)
DerivedImageStorage.create(source=source, derivative=new_storage, transformation=trans)
return new_storage
@ -117,8 +117,8 @@ def garbage_collect_storage(storage_id_whitelist):
config.store.remove({location_name}, image_path)
def create_storage(location_name):
storage = ImageStorage.create()
def create_v1_storage(location_name):
storage = ImageStorage.create(cas_path=False)
location = ImageStorageLocation.get(name=location_name)
ImageStoragePlacement.create(location=location, storage=storage)
storage.locations = {location_name}
@ -138,10 +138,9 @@ def lookup_storage_signature(storage, signature_kind):
kind = ImageStorageSignatureKind.get(name=signature_kind)
try:
return (ImageStorageSignature
.select()
.where(ImageStorageSignature.storage == storage,
ImageStorageSignature.kind == kind)
.get())
.select()
.where(ImageStorageSignature.storage == storage, ImageStorageSignature.kind == kind)
.get())
except ImageStorageSignature.DoesNotExist:
return None
@ -149,12 +148,12 @@ def lookup_storage_signature(storage, signature_kind):
def find_derived_storage(source, transformation_name):
try:
found = (ImageStorage
.select(ImageStorage, DerivedImageStorage)
.join(DerivedImageStorage, on=(ImageStorage.id == DerivedImageStorage.derivative))
.join(ImageStorageTransformation)
.where(DerivedImageStorage.source == source,
ImageStorageTransformation.name == transformation_name)
.get())
.select(ImageStorage, DerivedImageStorage)
.join(DerivedImageStorage, on=(ImageStorage.id == DerivedImageStorage.derivative))
.join(ImageStorageTransformation)
.where(DerivedImageStorage.source == source,
ImageStorageTransformation.name == transformation_name)
.get())
found.locations = {placement.location.name for placement in found.imagestorageplacement_set}
return found
@ -176,16 +175,17 @@ def delete_derived_storage_by_uuid(storage_uuid):
image_storage.delete_instance(recursive=True)
def get_storage_by_uuid(storage_uuid):
placements = list(ImageStoragePlacement
.select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)
.join(ImageStorageLocation)
.switch(ImageStoragePlacement)
.join(ImageStorage)
.where(ImageStorage.uuid == storage_uuid))
def _get_storage(query_modifier):
query = (ImageStoragePlacement
.select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)
.join(ImageStorageLocation)
.switch(ImageStoragePlacement)
.join(ImageStorage))
placements = list(query_modifier(query))
if not placements:
raise InvalidImageException('No storage found with uuid: %s', storage_uuid)
raise InvalidImageException()
found = placements[0].storage
found.locations = {placement.location.name for placement in placements}
@ -193,3 +193,26 @@ def get_storage_by_uuid(storage_uuid):
return found
def get_storage_by_uuid(storage_uuid):
def filter_to_uuid(query):
return query.where(ImageStorage.uuid == storage_uuid)
try:
return _get_storage(filter_to_uuid)
except InvalidImageException:
raise InvalidImageException('No storage found with uuid: %s', storage_uuid)
def get_repo_storage_by_checksum(namespace, repository_name, checksum):
def filter_to_repo_and_checksum(query):
return (query
.join(Image)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Repository.name == repository_name, Namespace.username == namespace,
ImageStorage.checksum == checksum))
try:
return _get_storage(filter_to_repo_and_checksum)
except InvalidImageException:
raise InvalidImageException('No storage found with checksum {0}'.format(checksum))
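
Both lookups above now funnel through _get_storage, which takes a callable that narrows the shared placement query; each public helper only supplies its filter and its own error message. A hedged sketch of how a further lookup could reuse the pattern (this helper is illustrative and not part of the commit):

# Hypothetical third lookup built on the _get_storage(query_modifier) pattern.
def get_storage_by_checksum_only(checksum):
  def filter_to_checksum(query):
    return query.where(ImageStorage.checksum == checksum)

  try:
    return _get_storage(filter_to_checksum)
  except InvalidImageException:
    raise InvalidImageException('No storage found with checksum {0}'.format(checksum))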

View file

@ -1,7 +1,8 @@
from uuid import uuid4
from data.model import image, db_transaction, DataModelException, _basequery
from data.database import (RepositoryTag, Repository, Image, ImageStorage, Namespace,
from data.model import (image, db_transaction, DataModelException, _basequery,
InvalidManifestException)
from data.database import (RepositoryTag, Repository, Image, ImageStorage, Namespace, TagManifest,
get_epoch_timestamp, db_for_update)
@ -36,8 +37,8 @@ def list_repository_tags(namespace_name, repository_name, include_hidden=False,
return query
def create_or_update_tag(namespace_name, repository_name, tag_name,
tag_docker_image_id, reversion=False):
def create_or_update_tag(namespace_name, repository_name, tag_name, tag_docker_image_id,
reversion=False):
try:
repo = _basequery.get_existing_repository(namespace_name, repository_name)
except Repository.DoesNotExist:
@ -160,3 +161,55 @@ def revert_tag(repo_obj, tag_name, docker_image_id):
return create_or_update_tag(repo_obj.namespace_user.username, repo_obj.name, tag_name,
docker_image_id, reversion=True)
def store_tag_manifest(namespace, repo_name, tag_name, docker_image_id, manifest_digest,
manifest_data):
tag = create_or_update_tag(namespace, repo_name, tag_name, docker_image_id)
return TagManifest.create(tag=tag, digest=manifest_digest, json_data=manifest_data)
def _get_active_tag(namespace, repo_name, tag_name):
return _tag_alive(RepositoryTag
.select()
.join(Image)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(RepositoryTag.name == tag_name, Repository.name == repo_name,
Namespace.username == namespace)).get()
def associate_generated_tag_manifest(namespace, repo_name, tag_name, manifest_digest,
manifest_data):
tag = _get_active_tag(namespace, repo_name, tag_name)
return TagManifest.create(tag=tag, digest=manifest_digest, json_data=manifest_data)
def load_tag_manifest(namespace, repo_name, tag_name):
try:
return (_load_repo_manifests(namespace, repo_name)
.where(RepositoryTag.name == tag_name)
.get())
except TagManifest.DoesNotExist:
msg = 'Manifest not found for tag {0} in repo {1}/{2}'.format(tag_name, namespace, repo_name)
raise InvalidManifestException(msg)
def load_manifest_by_digest(namespace, repo_name, digest):
try:
return (_load_repo_manifests(namespace, repo_name)
.where(TagManifest.digest == digest)
.get())
except TagManifest.DoesNotExist:
msg = 'Manifest not found with digest {0} in repo {1}/{2}'.format(digest, namespace, repo_name)
raise InvalidManifestException(msg)
def _load_repo_manifests(namespace, repo_name):
return (TagManifest
.select(TagManifest, RepositoryTag)
.join(RepositoryTag)
.join(Image)
.join(Repository)
.join(Namespace, on=(Namespace.id == Repository.namespace_user))
.where(Repository.name == repo_name, Namespace.username == namespace))
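
store_tag_manifest moves (or creates) the tag and records the manifest row in one step, while load_tag_manifest and load_manifest_by_digest both ride the shared _load_repo_manifests query. A minimal usage sketch, with placeholder repository, tag, digest, and manifest values:

# Hypothetical calls against the new tag-manifest helpers.
from data.model import tag

manifest_json = '{"schemaVersion": 1}'  # placeholder manifest body
tag.store_tag_manifest('devtable', 'simple', 'latest', 'abc123dockerid', 'sha256:deadbeef',
                       manifest_json)

by_tag = tag.load_tag_manifest('devtable', 'simple', 'latest')
by_digest = tag.load_manifest_by_digest('devtable', 'simple', 'sha256:deadbeef')
assert by_tag.json_data == by_digest.json_data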

View file

@ -68,7 +68,7 @@ def compute_tarsum(fp, json_data):
def simple_checksum_handler(json_data):
h = hashlib.sha256(json_data + '\n')
h = hashlib.sha256(json_data.encode('utf8') + '\n')
def fn(buf):
h.update(buf)

View file

@ -2,33 +2,61 @@ import re
import os.path
import hashlib
from collections import namedtuple
Digest = namedtuple('Digest', ['is_tarsum', 'tarsum_version', 'hash_alg', 'hash_bytes'])
DIGEST_PATTERN = r'(tarsum\.(v[\w]+)\+)?([\w]+):([0-9a-f]+)'
DIGEST_REGEX = re.compile(DIGEST_PATTERN)
class InvalidDigestException(RuntimeError):
pass
def parse_digest(digest):
""" Returns the digest parsed out to its components. """
match = DIGEST_REGEX.match(digest)
if match is None or match.end() != len(digest):
raise InvalidDigestException('Not a valid digest: %s', digest)
class Digest(object):
DIGEST_REGEX = re.compile(DIGEST_PATTERN)
is_tarsum = match.group(1) is not None
return Digest(is_tarsum, match.group(2), match.group(3), match.group(4))
def __init__(self, hash_alg, hash_bytes, is_tarsum=False, tarsum_version=None):
self._hash_alg = hash_alg
self._hash_bytes = hash_bytes
self._is_tarsum = is_tarsum
self._tarsum_version = tarsum_version
def __str__(self):
if self._is_tarsum:
return 'tarsum.{0}+{1}:{2}'.format(self._tarsum_version, self._hash_alg, self._hash_bytes)
return '{0}:{1}'.format(self._hash_alg, self._hash_bytes)
def __eq__(self, rhs):
return isinstance(rhs, Digest) and str(self) == str(rhs)
@staticmethod
def parse_digest(digest):
""" Returns the digest parsed out to its components. """
match = Digest.DIGEST_REGEX.match(digest)
if match is None or match.end() != len(digest):
raise InvalidDigestException('Not a valid digest: %s', digest)
is_tarsum = match.group(1) is not None
return Digest(match.group(3), match.group(4), is_tarsum, match.group(2))
@property
def is_tarsum(self):
return self._is_tarsum
@property
def tarsum_version(self):
return self._tarsum_version
@property
def hash_alg(self):
return self._hash_alg
@property
def hash_bytes(self):
return self._hash_bytes
def content_path(digest):
""" Returns a relative path to the parsed digest. """
parsed = parse_digest(digest)
parsed = Digest.parse_digest(digest)
components = []
if parsed.is_tarsum:
@ -58,7 +86,11 @@ def sha256_digest_from_generator(content_generator):
return 'sha256:{0}'.format(digest.hexdigest())
def sha256_digest_from_hashlib(sha256_hash_obj):
return 'sha256:{0}'.format(sha256_hash_obj.hexdigest())
def digests_equal(lhs_digest_string, rhs_digest_string):
""" Parse and compare the two digests, returns True if the digests are equal, False otherwise.
"""
return parse_digest(lhs_digest_string) == parse_digest(rhs_digest_string)
return Digest.parse_digest(lhs_digest_string) == Digest.parse_digest(rhs_digest_string)
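
parse_digest is now a static method on Digest, and equality is defined over the canonical string form, so plain and tarsum digests compare uniformly. A short sketch of the behavior implied by DIGEST_PATTERN (digest values are placeholders):

# Examples of parsing and comparing digests with the new Digest class.
from digest.digest_tools import Digest, digests_equal

plain = Digest.parse_digest('sha256:e3b0c44298fc1c149afbf4c8996fb924')
assert plain.hash_alg == 'sha256' and not plain.is_tarsum

tarsum = Digest.parse_digest('tarsum.v1+sha256:e3b0c44298fc1c149afbf4c8996fb924')
assert tarsum.is_tarsum and tarsum.tarsum_version == 'v1'
assert str(tarsum) == 'tarsum.v1+sha256:e3b0c44298fc1c149afbf4c8996fb924'

assert digests_equal('sha256:abcd', 'sha256:abcd')
assert not digests_equal('sha256:abcd', 'sha512:abcd')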

View file

@ -12,6 +12,7 @@ from auth.auth_context import get_authenticated_user, get_grant_user_context
from digest import checksums
from util.registry import changes
from util.http import abort, exact_abort
from util.registry.filelike import SocketReader
from auth.permissions import (ReadRepositoryPermission,
ModifyRepositoryPermission)
from data import model, database
@ -23,26 +24,6 @@ from endpoints.decorators import anon_protect
logger = logging.getLogger(__name__)
class SocketReader(object):
def __init__(self, fp):
self._fp = fp
self.handlers = []
def add_handler(self, handler):
self.handlers.append(handler)
def read(self, n=-1):
buf = self._fp.read(n)
if not buf:
return ''
for handler in self.handlers:
handler(buf)
return buf
def tell(self):
raise IOError('Stream is not seekable.')
def image_is_uploading(repo_image):
if repo_image is None:
return False
@ -145,9 +126,11 @@ def get_image_layer(namespace, repository, image_id, headers):
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
image_id=image_id)
logger.debug('Looking up the layer path')
try:
path = store.image_layer_path(repo_image.storage.uuid)
path = store.blob_path(repo_image.storage.checksum)
if not repo_image.storage.cas_path:
path = store.v1_image_layer_path(repo_image.storage.uuid)
logger.info('Serving legacy v1 image from path: %s', path)
logger.debug('Looking up the direct download URL')
direct_download_url = store.get_direct_download_url(repo_image.storage.locations, path)
@ -186,14 +169,15 @@ def put_image_layer(namespace, repository, image_id):
try:
logger.debug('Retrieving image data')
uuid = repo_image.storage.uuid
json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
json_data = (repo_image.v1_json_metadata or
store.get_content(repo_image.storage.locations, store.image_json_path(uuid)))
except (IOError, AttributeError):
logger.exception('Exception when retrieving image data')
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
image_id=image_id)
logger.debug('Retrieving image path info')
layer_path = store.image_layer_path(uuid)
layer_path = store.v1_image_layer_path(uuid)
logger.info('Storing layer at v1 path: %s', layer_path)
if (store.exists(repo_image.storage.locations, layer_path) and not
image_is_uploading(repo_image)):
@ -315,7 +299,8 @@ def put_image_checksum(namespace, repository, image_id):
uuid = repo_image.storage.uuid
logger.debug('Looking up repo layer data')
if not store.exists(repo_image.storage.locations, store.image_json_path(uuid)):
if (repo_image.v1_json_metadata is None and
not store.exists(repo_image.storage.locations, store.image_json_path(uuid))):
abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
logger.debug('Marking image path')
@ -369,13 +354,17 @@ def get_image_json(namespace, repository, image_id, headers):
logger.debug('Looking up repo layer data')
try:
uuid = repo_image.storage.uuid
data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
data = (repo_image.v1_json_metadata or
store.get_content(repo_image.storage.locations, store.image_json_path(uuid)))
except (IOError, AttributeError):
flask_abort(404)
logger.debug('Looking up repo layer size')
size = repo_image.storage.image_size
headers['X-Docker-Size'] = str(size)
headers['Content-Type'] = 'application/json'
if size is not None:
headers['X-Docker-Size'] = str(size)
response = make_response(data, 200)
response.headers.extend(headers)
@ -394,37 +383,18 @@ def get_image_ancestry(namespace, repository, image_id, headers):
if not permission.can() and not model.repository.repository_is_public(namespace, repository):
abort(403)
logger.debug('Looking up repo image')
repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)
image = model.image.get_image_by_id(namespace, repository, image_id)
parents = model.image.get_parent_images(namespace, repository, image)
logger.debug('Looking up image data')
try:
uuid = repo_image.storage.uuid
data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
except (IOError, AttributeError):
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
image_id=image_id)
ancestry_docker_ids = [image.docker_image_id]
ancestry_docker_ids.extend([parent.docker_image_id for parent in reversed(parents)])
logger.debug('Converting to <-> from JSON')
response = make_response(json.dumps(json.loads(data)), 200)
# We cannot use jsonify here because we are returning a list, not an object
response = make_response(json.dumps(ancestry_docker_ids), 200)
response.headers.extend(headers)
logger.debug('Done')
return response
def generate_ancestry(image_id, uuid, locations, parent_id=None, parent_uuid=None,
parent_locations=None):
if not parent_id:
store.put_content(locations, store.image_ancestry_path(uuid), json.dumps([image_id]))
return
data = store.get_content(parent_locations, store.image_ancestry_path(parent_uuid))
data = json.loads(data)
data.insert(0, image_id)
store.put_content(locations, store.image_ancestry_path(uuid), json.dumps(data))
def store_checksum(image_storage, checksum):
checksum_parts = checksum.split(':')
if len(checksum_parts) != 2:
@ -447,7 +417,8 @@ def put_image_json(namespace, repository, image_id):
logger.debug('Parsing image JSON')
try:
data = json.loads(request.data.decode('utf8'))
v1_metadata = request.data
data = json.loads(v1_metadata.decode('utf8'))
except ValueError:
pass
@ -479,63 +450,38 @@ def put_image_json(namespace, repository, image_id):
model.tag.create_temporary_hidden_tag(repo, repo_image,
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
uuid = repo_image.storage.uuid
if image_id != data['id']:
abort(400, 'JSON data contains invalid id for image: %(image_id)s',
issue='invalid-request', image_id=image_id)
parent_id = data.get('parent')
parent_id = data.get('parent', None)
parent_image = None
if parent_id:
logger.debug('Looking up parent image')
parent_image = model.image.get_repo_image_extended(namespace, repository, parent_id)
parent_uuid = parent_image and parent_image.storage.uuid
parent_locations = parent_image and parent_image.storage.locations
if not parent_image or parent_image.storage.uploading:
abort(400, 'Image %(image_id)s depends on non existing parent image %(parent_id)s',
issue='invalid-request', image_id=image_id, parent_id=parent_id)
if parent_id:
logger.debug('Looking up parent image data')
if (parent_id and not
store.exists(parent_locations, store.image_json_path(parent_uuid))):
abort(400, 'Image %(image_id)s depends on non existing parent image %(parent_id)s',
issue='invalid-request', image_id=image_id, parent_id=parent_id)
logger.debug('Looking up image storage paths')
json_path = store.image_json_path(uuid)
logger.debug('Checking if image already exists')
if (store.exists(repo_image.storage.locations, json_path) and not
image_is_uploading(repo_image)):
json_path = store.image_json_path(repo_image.storage.uuid)
if (not image_is_uploading(repo_image) and
(repo_image.v1_json_metadata is not None or
store.exists(repo_image.storage.locations, json_path))):
exact_abort(409, 'Image already exists')
set_uploading_flag(repo_image, True)
# If we reach that point, it means that this is a new image or a retry
# on a failed push
# save the metadata
# on a failed push, save the metadata
command_list = data.get('container_config', {}).get('Cmd', None)
command = json.dumps(command_list) if command_list else None
logger.debug('Setting image metadata')
model.image.set_image_metadata(image_id, namespace, repository, data.get('created'),
data.get('comment'), command, parent_image)
data.get('comment'), command, v1_metadata, parent_image)
logger.debug('Putting json path')
store.put_content(repo_image.storage.locations, json_path, request.data)
logger.debug('Generating image ancestry')
try:
generate_ancestry(image_id, uuid, repo_image.storage.locations, parent_id, parent_uuid,
parent_locations)
except IOError as ioe:
logger.debug('Error when generating ancestry: %s', ioe.message)
abort(404)
logger.debug('Done')
return make_response('true', 200)
@ -572,7 +518,11 @@ def process_image_changes(namespace, repository, image_id):
parent_trie.frombytes(parent_trie_bytes)
# Read in the file entries from the layer tar file
layer_path = store.image_layer_path(uuid)
layer_path = store.blob_path(repo_image.storage.checksum)
if not repo_image.storage.cas_path:
layer_path = store.v1_image_layer_path(uuid)
logger.info('Processing diffs for newly stored v1 image at %s', layer_path)
with store.stream_read_file(image.storage.locations, layer_path) as layer_tar_stream:
removed_files = set()
layer_files = changes.files_and_dirs_from_tar(layer_tar_stream,

View file

@ -3,11 +3,12 @@
import logging
from flask import Blueprint, make_response, url_for, request
from flask import Blueprint, make_response, url_for, request, jsonify
from functools import wraps
from urlparse import urlparse
from endpoints.decorators import anon_protect, anon_allowed
from endpoints.v2.errors import V2RegistryException
from auth.jwt_auth import process_jwt_auth
from auth.auth_context import get_grant_user_context
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission,
@ -21,6 +22,16 @@ logger = logging.getLogger(__name__)
v2_bp = Blueprint('v2', __name__)
@v2_bp.app_errorhandler(V2RegistryException)
def handle_registry_v2_exception(error):
response = jsonify({
'errors': [error.as_dict()]
})
response.status_code = error.http_status_code
logger.debug('sending response: %s', response.get_data())
return response
def _require_repo_permission(permission_class, allow_public=False):
def wrapper(func):
@wraps(func)
@ -67,3 +78,4 @@ def v2_support_enabled():
from endpoints.v2 import v2auth
from endpoints.v2 import manifest
from endpoints.v2 import blob
from endpoints.v2 import tag

View file

@ -2,16 +2,20 @@
# XXX time as this notice is removed.
import logging
import re
from flask import make_response, url_for, request
from flask import make_response, url_for, request, redirect, Response, abort as flask_abort
from app import storage, app
from data import model
from data import model, database
from digest import digest_tools
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
from endpoints.v2.errors import BlobUnknown, BlobUploadInvalid, BlobUploadUnknown
from auth.jwt_auth import process_jwt_auth
from endpoints.decorators import anon_protect
from util.http import abort
from util.cache import cache_control
from util.registry.filelike import wrap_with_hash
from storage.basestorage import InvalidChunkException
logger = logging.getLogger(__name__)
@ -19,6 +23,11 @@ logger = logging.getLogger(__name__)
BASE_BLOB_ROUTE = '/<namespace>/<repo_name>/blobs/<regex("{0}"):digest>'
BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN)
RANGE_HEADER_REGEX = re.compile(r'^bytes=([0-9]+)-([0-9]+)$')
class _InvalidRangeHeader(Exception):
pass
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
@ -27,21 +36,81 @@ BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN)
@anon_protect
def check_blob_existence(namespace, repo_name, digest):
try:
found = model.blob.get_repo_blob_by_digest(namespace, repo_name, digest)
model.image.get_repo_image_by_storage_checksum(namespace, repo_name, digest)
# The response body must be empty for a successful HEAD request
return make_response('')
except model.InvalidImageException:
raise BlobUnknown()
def _base_blob_fetch(namespace, repo_name, digest):
""" Some work that is common to both GET and HEAD requests. Callers MUST check for proper
authorization before calling this method.
"""
try:
found = model.blob.get_repo_blob_by_digest(namespace, repo_name, digest)
except model.BlobDoesNotExist:
abort(404)
raise BlobUnknown()
headers = {
'Docker-Content-Digest': digest,
}
# Add the Accept-Ranges header if the storage engine supports resumable
# downloads.
if storage.get_supports_resumable_downloads(found.storage.locations):
logger.debug('Storage supports resumable downloads')
headers['Accept-Ranges'] = 'bytes'
return found, headers
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
@process_jwt_auth
@require_repo_read
@anon_protect
@cache_control(max_age=31536000)
def check_blob_exists(namespace, repo_name, digest):
_, headers = _base_blob_fetch(namespace, repo_name, digest)
response = make_response('')
response.headers.extend(headers)
return response
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['GET'])
@process_jwt_auth
@require_repo_read
@anon_protect
@cache_control(max_age=31536000)
def download_blob(namespace, repo_name, digest):
# TODO Implement this
return make_response('')
found, headers = _base_blob_fetch(namespace, repo_name, digest)
path = storage.blob_path(digest)
if not found.cas_path:
logger.info('Generating legacy v1 path for image: %s', digest)
path = storage.v1_image_layer_path(found.uuid)
logger.debug('Looking up the direct download URL')
direct_download_url = storage.get_direct_download_url(found.locations, path)
if direct_download_url:
logger.debug('Returning direct download URL')
resp = redirect(direct_download_url)
resp.headers.extend(headers)
return resp
logger.debug('Streaming layer data')
# Close the database handle here for this process before we send the long download.
database.close_db_filter(None)
return Response(storage.stream_read(found.locations, path), headers=headers)
def _render_range(end_byte):
return 'bytes=0-{0}'.format(end_byte)
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/', methods=['POST'])
@ -49,12 +118,135 @@ def download_blob(namespace, repo_name, digest):
@require_repo_write
@anon_protect
def start_blob_upload(namespace, repo_name):
new_upload_uuid = storage.initiate_chunked_upload(storage.preferred_locations[0])
location_name = storage.preferred_locations[0]
new_upload_uuid = storage.initiate_chunked_upload(location_name)
model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name)
digest = request.args.get('digest', None)
if digest is None:
# The user will send the blob data in another request
accepted = make_response('', 202)
accepted.headers['Location'] = url_for('v2.upload_chunk', namespace=namespace,
repo_name=repo_name, upload_uuid=new_upload_uuid)
accepted.headers['Range'] = _render_range(0)
accepted.headers['Docker-Upload-UUID'] = new_upload_uuid
return accepted
else:
# The user plans to send us the entire body right now
uploaded = _upload_chunk(namespace, repo_name, new_upload_uuid, range_required=False)
uploaded.save()
return _finish_upload(namespace, repo_name, uploaded, digest)
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/<upload_uuid>', methods=['GET'])
@process_jwt_auth
@require_repo_write
@anon_protect
def fetch_existing_upload(namespace, repo_name, upload_uuid):
try:
found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid)
except model.InvalidBlobUpload:
raise BlobUploadUnknown()
accepted = make_response('', 204)
accepted.headers['Range'] = _render_range(found.byte_count)
accepted.headers['Docker-Upload-UUID'] = upload_uuid
return accepted
def _current_request_path():
return '{0}{1}'.format(request.script_root, request.path)
def _range_not_satisfiable(valid_end):
invalid_range = make_response('', 416)
invalid_range.headers['Location'] = _current_request_path()
invalid_range.headers['Range'] = '0-{0}'.format(valid_end)
invalid_range.headers['Docker-Upload-UUID'] = request.view_args['upload_uuid']
flask_abort(invalid_range)
def _parse_range_header(range_header_text, valid_start):
""" Parses the range header, and returns a tuple of the start offset and the length,
or raises an _InvalidRangeHeader exception.
"""
found = RANGE_HEADER_REGEX.match(range_header_text)
if found is None:
raise _InvalidRangeHeader()
start = int(found.group(1))
length = int(found.group(2)) - start
if start != valid_start or length <= 0:
raise _InvalidRangeHeader()
return (start, length)
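
_parse_range_header only accepts a chunk that starts exactly at the number of bytes already received; anything else falls into the 416 handling above. A small worked example of the arithmetic (the byte values are illustrative):

# With 1024 bytes already uploaded, a header of 'bytes=1024-4096' yields:
#   start  = 1024
#   length = 4096 - 1024 = 3072
start, length = _parse_range_header('bytes=1024-4096', 1024)
assert (start, length) == (1024, 3072)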
def _upload_chunk(namespace, repo_name, upload_uuid, range_required):
""" Common code among the various uploading paths for appending data to blobs.
Callers MUST call .save() or .delete_instance() on the returned database object.
"""
try:
found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid)
except model.InvalidBlobUpload:
raise BlobUploadUnknown()
start_offset, length = 0, -1
range_header = request.headers.get('range', None)
if range_required and range_header is None:
_range_not_satisfiable(found.byte_count)
if range_header is not None:
try:
start_offset, length = _parse_range_header(range_header, found.byte_count)
except _InvalidRangeHeader:
_range_not_satisfiable(found.byte_count)
input_fp = wrap_with_hash(get_input_stream(request), found.sha_state)
try:
storage.stream_upload_chunk({found.location.name}, upload_uuid, start_offset, length, input_fp)
except InvalidChunkException:
_range_not_satisfiable(found.byte_count)
found.byte_count += length
return found
def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
computed_digest = digest_tools.sha256_digest_from_hashlib(upload_obj.sha_state)
if not digest_tools.digests_equal(computed_digest, expected_digest):
raise BlobUploadInvalid()
final_blob_location = digest_tools.content_path(expected_digest)
storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, final_blob_location)
model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
upload_obj.location,
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
upload_obj.delete_instance()
response = make_response('', 201)
response.headers['Docker-Content-Digest'] = expected_digest
response.headers['Location'] = url_for('v2.download_blob', namespace=namespace,
repo_name=repo_name, digest=expected_digest)
return response
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
@process_jwt_auth
@require_repo_write
@anon_protect
def upload_chunk(namespace, repo_name, upload_uuid):
upload = _upload_chunk(namespace, repo_name, upload_uuid, range_required=True)
upload.save()
accepted = make_response('', 202)
accepted.headers['Location'] = url_for('v2.upload_chunk', namespace=namespace,
repo_name=repo_name, upload_uuid=new_upload_uuid)
accepted.headers['Range'] = 'bytes=0-0'
accepted.headers['Docker-Upload-UUID'] = new_upload_uuid
accepted.headers['Location'] = _current_request_path()
accepted.headers['Range'] = _render_range(upload.byte_count)
accepted.headers['Docker-Upload-UUID'] = upload_uuid
return accepted
@ -62,22 +254,28 @@ def start_blob_upload(namespace, repo_name):
@process_jwt_auth
@require_repo_write
@anon_protect
def upload_chunk(namespace, repo_name, upload_uuid):
def monolithic_upload_or_last_chunk(namespace, repo_name, upload_uuid):
digest = request.args.get('digest', None)
upload_location = storage.preferred_locations[0]
bytes_written = storage.stream_upload_chunk(upload_location, upload_uuid, 0, -1,
get_input_stream(request))
if digest is None:
raise BlobUploadInvalid()
if digest is not None:
final_blob_location = digest_tools.content_path(digest)
storage.complete_chunked_upload(upload_location, upload_uuid, final_blob_location, digest)
model.blob.store_blob_record_and_temp_link(namespace, repo_name, digest, upload_location,
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
found = _upload_chunk(namespace, repo_name, upload_uuid, range_required=False)
return _finish_upload(namespace, repo_name, found, digest)
response = make_response('', 201)
response.headers['Docker-Content-Digest'] = digest
response.headers['Location'] = url_for('v2.download_blob', namespace=namespace,
repo_name=repo_name, digest=digest)
return response
return make_response('', 202)
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
@process_jwt_auth
@require_repo_write
@anon_protect
def cancel_upload(namespace, repo_name, upload_uuid):
try:
found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid)
except model.InvalidBlobUpload:
raise BlobUploadUnknown()
# We delete the record for the upload first, since if the partial upload in
# storage fails to delete, it doesn't break anything
found.delete_instance()
storage.cancel_chunked_upload({found.location.name}, found.uuid)
return make_response('', 204)
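
End to end, the routes above implement the docker distribution chunked-upload flow: POST to open an upload, PATCH to append ranges, PUT with ?digest= to finish, DELETE to abandon. A hedged client-side sketch of that sequence using the requests library; the registry URL, repository, and bearer token are placeholders, and the Location header is assumed to be relative to the registry root.

import hashlib
import requests

registry = 'https://registry.example.com'     # placeholder registry
repo = registry + '/v2/devtable/simple'       # placeholder repository
auth = {'Authorization': 'Bearer <token>'}    # placeholder JWT grant
blob = b'layer bytes go here'

# 1. Open an upload; the response Location points at the upload UUID.
resp = requests.post(repo + '/blobs/uploads/', headers=auth)
upload_url = registry + resp.headers['Location']

# 2. Append a chunk; Range must start where the previous chunk ended.
chunk_headers = dict(auth)
chunk_headers['Range'] = 'bytes=0-{0}'.format(len(blob))
requests.patch(upload_url, data=blob, headers=chunk_headers)

# 3. Close the upload with the expected digest; the server checks it against
#    the resumable SHA it has been maintaining in BlobUpload.sha_state.
digest = 'sha256:{0}'.format(hashlib.sha256(blob).hexdigest())
requests.put(upload_url, params={'digest': digest}, headers=auth)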

118
endpoints/v2/errors.py Normal file
View file

@ -0,0 +1,118 @@
class V2RegistryException(Exception):
def __init__(self, error_code_str, message, detail, http_status_code=400):
super(V2RegistryException, self).__init__(message)
self.http_status_code = http_status_code
self._error_code_str = error_code_str
self._detail = detail
def as_dict(self):
return {
'code': self._error_code_str,
'message': self.message,
'detail': self._detail if self._detail is not None else {},
}
class BlobUnknown(V2RegistryException):
def __init__(self, detail=None):
super(BlobUnknown, self).__init__('BLOB_UNKNOWN',
'blob unknown to registry',
detail,
404)
class BlobUploadInvalid(V2RegistryException):
def __init__(self, detail=None):
super(BlobUploadInvalid, self).__init__('BLOB_UPLOAD_INVALID',
'blob upload invalid',
detail)
class BlobUploadUnknown(V2RegistryException):
def __init__(self, detail=None):
super(BlobUploadUnknown, self).__init__('BLOB_UPLOAD_UNKNOWN',
'blob upload unknown to registry',
detail,
404)
class DigestInvalid(V2RegistryException):
def __init__(self, detail=None):
super(DigestInvalid, self).__init__('DIGEST_INVALID',
'provided digest did not match uploaded content',
detail)
class ManifestBlobUnknown(V2RegistryException):
def __init__(self, detail=None):
super(ManifestBlobUnknown, self).__init__('MANIFEST_BLOB_UNKNOWN',
'blob unknown to registry',
detail)
class ManifestInvalid(V2RegistryException):
def __init__(self, detail=None):
super(ManifestInvalid, self).__init__('MANIFEST_INVALID',
'manifest invalid',
detail)
class ManifestUnknown(V2RegistryException):
def __init__(self, detail=None):
super(ManifestUnknown, self).__init__('MANIFEST_UNKNOWN',
'manifest unknown',
detail,
404)
class ManifestUnverified(V2RegistryException):
def __init__(self, detail=None):
super(ManifestUnverified, self).__init__('MANIFEST_UNVERIFIED',
'manifest failed signature verification',
detail)
class NameInvalid(V2RegistryException):
def __init__(self, detail=None):
super(NameInvalid, self).__init__('NAME_INVALID',
'invalid repository name',
detail)
class NameUnknown(V2RegistryException):
def __init__(self, detail=None):
super(NameUnknown, self).__init__('NAME_UNKNOWN',
'repository name not known to registry',
detail,
404)
class SizeInvalid(V2RegistryException):
def __init__(self, detail=None):
super(SizeInvalid, self).__init__('SIZE_INVALID',
'provided length did not match content length',
detail)
class TagInvalid(V2RegistryException):
def __init__(self, detail=None):
super(TagInvalid, self).__init__('TAG_INVALID',
'manifest tag did not match URI',
detail)
class Unauthorized(V2RegistryException):
def __init__(self, detail=None):
super(Unauthorized, self).__init__('UNAUTHORIZED',
'access to the requested resource is not authorized',
detail,
401)
class Unsupported(V2RegistryException):
def __init__(self, detail=None):
super(Unsupported, self).__init__('UNSUPPORTED',
'The operation is unsupported.',
detail,
405)
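
Combined with the app_errorhandler registered in endpoints/v2/__init__.py, raising any of these classes produces a spec-shaped JSON error body with the matching HTTP status. For example (values taken from the BlobUnknown definition above):

# Hypothetical illustration of the error payload serialization.
error = BlobUnknown()
print(error.http_status_code)  # 404
body = error.as_dict()
# body == {'code': 'BLOB_UNKNOWN', 'message': 'blob unknown to registry', 'detail': {}}
# handle_registry_v2_exception then responds with {"errors": [body]} and status 404.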

View file

@ -5,38 +5,67 @@ import logging
import re
import jwt.utils
import yaml
import json
from flask import make_response, request
from collections import namedtuple, OrderedDict
from jwkest.jws import SIGNER_ALGS
from jwkest.jwk import RSAKey
from Crypto.PublicKey import RSA
from datetime import datetime
from app import storage
from auth.jwt_auth import process_jwt_auth
from endpoints.decorators import anon_protect
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write
from endpoints.v2.errors import (ManifestBlobUnknown, ManifestInvalid, ManifestUnverified,
ManifestUnknown, TagInvalid, NameInvalid)
from digest import digest_tools
from data import model
logger = logging.getLogger(__name__)
VALID_TAG_PATTERN = r'[\w][\w.-]{0,127}'
VALID_TAG_REGEX = re.compile(VALID_TAG_PATTERN)
BASE_MANIFEST_ROUTE = '/<namespace>/<repo_name>/manifests/<regex("{0}"):manifest_ref>'
MANIFEST_DIGEST_ROUTE = BASE_MANIFEST_ROUTE.format(digest_tools.DIGEST_PATTERN)
MANIFEST_TAGNAME_ROUTE = BASE_MANIFEST_ROUTE.format(VALID_TAG_PATTERN)
ISO_DATETIME_FORMAT_ZULU = '%Y-%m-%dT%H:%M:%SZ'
JWS_ALGORITHM = 'RS256'
ImageMetadata = namedtuple('ImageMetadata', ['digest', 'v1_metadata', 'v1_metadata_str'])
ExtractedV1Metadata = namedtuple('ExtractedV1Metadata', ['docker_id', 'parent', 'created',
'comment', 'command'])
_SIGNATURES_KEY = 'signatures'
_PROTECTED_KEY = 'protected'
_FORMAT_LENGTH_KEY = 'formatLength'
_FORMAT_TAIL_KEY = 'formatTail'
_REPO_NAME_KEY = 'name'
_REPO_TAG_KEY = 'tag'
_FS_LAYERS_KEY = 'fsLayers'
_HISTORY_KEY = 'history'
_BLOB_SUM_KEY = 'blobSum'
_V1_COMPAT_KEY = 'v1Compatibility'
_ARCH_KEY = 'architecture'
_SCHEMA_VER = 'schemaVersion'
class SignedManifest(object):
SIGNATURES_KEY = 'signatures'
PROTECTED_KEY = 'protected'
FORMAT_LENGTH_KEY = 'formatLength'
FORMAT_TAIL_KEY = 'formatTail'
REPO_NAME_KEY = 'name'
REPO_TAG_KEY = 'tag'
def __init__(self, manifest_bytes):
self._bytes = manifest_bytes
parsed = yaml.safe_load(manifest_bytes)
self._parsed = yaml.safe_load(manifest_bytes)
self._signatures = parsed[self.SIGNATURES_KEY]
self._namespace, self._repo_name = parsed[self.REPO_NAME_KEY].split('/')
self._tag = parsed[self.REPO_TAG_KEY]
self._signatures = self._parsed[_SIGNATURES_KEY]
self._namespace, self._repo_name = self._parsed[_REPO_NAME_KEY].split('/')
self._tag = self._parsed[_REPO_TAG_KEY]
self._validate()
@ -59,36 +88,195 @@ class SignedManifest(object):
def tag(self):
return self._tag
@property
def bytes(self):
return self._bytes
@property
def digest(self):
return digest_tools.sha256_digest(self.payload)
@property
def layers(self):
""" Returns a generator of objects that have the blobSum and v1Compatibility keys in them,
starting from the root image and working toward the leaf node.
"""
for blob_sum_obj, history_obj in reversed(zip(self._parsed[_FS_LAYERS_KEY],
self._parsed[_HISTORY_KEY])):
image_digest = digest_tools.Digest.parse_digest(blob_sum_obj[_BLOB_SUM_KEY])
metadata_string = history_obj[_V1_COMPAT_KEY]
v1_metadata = yaml.safe_load(metadata_string)
command_list = v1_metadata.get('container_config', {}).get('Cmd', None)
command = json.dumps(command_list) if command_list else None
extracted = ExtractedV1Metadata(v1_metadata['id'], v1_metadata.get('parent'),
v1_metadata.get('created'), v1_metadata.get('comment'),
command)
yield ImageMetadata(image_digest, extracted, metadata_string)
@property
def payload(self):
protected = self._signatures[0][self.PROTECTED_KEY]
protected = self._signatures[0][_PROTECTED_KEY]
parsed_protected = yaml.safe_load(jwt.utils.base64url_decode(protected))
logger.debug('parsed_protected: %s', parsed_protected)
signed_content_head = self._bytes[:parsed_protected[self.FORMAT_LENGTH_KEY]]
signed_content_head = self._bytes[:parsed_protected[_FORMAT_LENGTH_KEY]]
logger.debug('signed content head: %s', signed_content_head)
signed_content_tail = jwt.utils.base64url_decode(parsed_protected[self.FORMAT_TAIL_KEY])
signed_content_tail = jwt.utils.base64url_decode(parsed_protected[_FORMAT_TAIL_KEY])
logger.debug('signed content tail: %s', signed_content_tail)
return signed_content_head + signed_content_tail
@v2_bp.route('/<namespace>/<repo_name>/manifests/<regex("' + VALID_TAG_PATTERN + '"):tag_name>',
methods=['GET'])
class SignedManifestBuilder(object):
""" Class which represents a manifest which is currently being built.
"""
def __init__(self, namespace, repo_name, tag, architecture='amd64', schema_ver=1):
self._base_payload = {
_REPO_TAG_KEY: tag,
_REPO_NAME_KEY: '{0}/{1}'.format(namespace, repo_name),
_ARCH_KEY: architecture,
_SCHEMA_VER: schema_ver,
}
self._fs_layer_digests = []
self._history = []
def add_layer(self, layer_digest, v1_json_metadata):
self._fs_layer_digests.append({
_BLOB_SUM_KEY: layer_digest,
})
self._history.append({
_V1_COMPAT_KEY: v1_json_metadata,
})
def build(self, json_web_key):
""" Build the payload and sign it, returning a SignedManifest object.
"""
payload = OrderedDict(self._base_payload)
payload.update({
_HISTORY_KEY: self._history,
_FS_LAYERS_KEY: self._fs_layer_digests,
})
payload_str = json.dumps(payload, indent=3)
split_point = payload_str.rfind('\n}')
protected_payload = {
'formatTail': jwt.utils.base64url_encode(payload_str[split_point:]),
'formatLength': split_point,
'time': datetime.utcnow().strftime(ISO_DATETIME_FORMAT_ZULU),
}
protected = jwt.utils.base64url_encode(json.dumps(protected_payload))
logger.debug('Generated protected block: %s', protected)
bytes_to_sign = '{0}.{1}'.format(protected, jwt.utils.base64url_encode(payload_str))
signer = SIGNER_ALGS[JWS_ALGORITHM]
signature = jwt.utils.base64url_encode(signer.sign(bytes_to_sign, json_web_key.get_key()))
logger.debug('Generated signature: %s', signature)
signature_block = {
'header': {
'jwk': json_web_key.to_dict(),
'alg': JWS_ALGORITHM,
},
'signature': signature,
_PROTECTED_KEY: protected,
}
logger.debug('Encoded signature block: %s', json.dumps(signature_block))
payload.update({
_SIGNATURES_KEY: [signature_block],
})
return SignedManifest(json.dumps(payload, indent=3))
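
A sketch of driving the builder directly, mirroring _generate_and_store_manifest further down; the repository, tag, layer digest, and v1 metadata are placeholders, and the throwaway 2048-bit key matches the TODO about publishing a stable signing key.

# Hypothetical standalone use of SignedManifestBuilder.
from Crypto.PublicKey import RSA
from jwkest.jwk import RSAKey

builder = SignedManifestBuilder('devtable', 'simple', 'latest')
builder.add_layer('sha256:e3b0c44298fc1c149afbf4c8996fb924', '{"id": "abc123dockerid"}')

jwk = RSAKey(key=RSA.generate(2048))  # throwaway signing key, as in the code below
signed = builder.build(jwk)
print(signed.digest)  # content-addressable digest of the signed payload
print(signed.tag)     # 'latest'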
@v2_bp.route(MANIFEST_TAGNAME_ROUTE, methods=['GET'])
@process_jwt_auth
@require_repo_read
@anon_protect
def fetch_manifest_by_tagname(namespace, repo_name, tag_name):
logger.debug('Fetching tag manifest with name: %s', tag_name)
return make_response('Manifest {0}'.format(tag_name))
def fetch_manifest_by_tagname(namespace, repo_name, manifest_ref):
try:
manifest = model.tag.load_tag_manifest(namespace, repo_name, manifest_ref)
except model.InvalidManifestException:
try:
manifest = _generate_and_store_manifest(namespace, repo_name, manifest_ref)
except model.DataModelException:
logger.exception('Exception when generating manifest for %s/%s:%s', namespace, repo_name,
manifest_ref)
raise ManifestUnknown()
return make_response(manifest.json_data, 200)
@v2_bp.route('/<namespace>/<repo_name>/manifests/<regex("' + VALID_TAG_PATTERN + '"):tag_name>',
methods=['PUT'])
@v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['GET'])
@process_jwt_auth
@require_repo_read
@anon_protect
def fetch_manifest_by_digest(namespace, repo_name, manifest_ref):
try:
manifest = model.tag.load_manifest_by_digest(namespace, repo_name, manifest_ref)
except model.InvalidManifestException:
# Without a tag name to reference, we can't make an attempt to generate the manifest
raise ManifestUnknown()
return make_response(manifest.json_data, 200)
@v2_bp.route(MANIFEST_TAGNAME_ROUTE, methods=['PUT'])
@process_jwt_auth
@require_repo_write
@anon_protect
def write_manifest_by_tagname(namespace, repo_name, tag_name):
def write_manifest_by_tagname(namespace, repo_name, manifest_ref):
manifest = SignedManifest(request.data)
manifest_digest = digest_tools.sha256_digest(manifest.payload)
if manifest.tag != manifest_ref:
raise TagInvalid()
return _write_manifest(namespace, repo_name, manifest)
@v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['PUT'])
@process_jwt_auth
@require_repo_write
@anon_protect
def write_manifest_by_digest(namespace, repo_name, manifest_ref):
manifest = SignedManifest(request.data)
if manifest.digest != manifest_ref:
raise ManifestInvalid()
return _write_manifest(namespace, repo_name, manifest)
def _write_manifest(namespace, repo_name, manifest):
if manifest.namespace != namespace or manifest.repo_name != repo_name:
raise NameInvalid()
manifest_digest = manifest.digest
tag_name = manifest.tag
leaf_layer = None
try:
for mdata in manifest.layers:
# Store the v1 metadata in the db
v1_mdata = mdata.v1_metadata
digest_str = str(mdata.digest)
model.image.synthesize_v1_image(namespace, repo_name, digest_str, v1_mdata.docker_id,
v1_mdata.created, v1_mdata.comment, v1_mdata.command,
mdata.v1_metadata_str, v1_mdata.parent)
leaf_layer = mdata
except model.InvalidImageException:
raise ManifestBlobUnknown(detail={'missing': digest_str})
if leaf_layer is None:
# The manifest doesn't actually reference any layers!
raise ManifestInvalid(detail={'message': 'manifest does not reference any layers'})
model.tag.store_tag_manifest(namespace, repo_name, tag_name, leaf_layer.v1_metadata.docker_id,
manifest_digest, request.data)
response = make_response('OK', 202)
response.headers['Docker-Content-Digest'] = manifest_digest
@ -96,15 +284,61 @@ def write_manifest_by_tagname(namespace, repo_name, tag_name):
return response
# @v2_bp.route('/<namespace>/<repo_name>/manifests/<regex("' + digest_tools.DIGEST_PATTERN + '"):tag_digest>',
# methods=['PUT'])
# @process_jwt_auth
# @require_repo_write
# @anon_protect
# def write_manifest(namespace, repo_name, tag_digest):
# logger.debug('Writing tag manifest with name: %s', tag_digest)
@v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['DELETE'])
@process_jwt_auth
@require_repo_write
@anon_protect
def delete_manifest_by_digest(namespace, repo_name, manifest_ref):
""" Delete the manifest specified by the digest. Note: there is no equivalent
method for deleting by tag name because it is forbidden by the spec.
"""
try:
manifest = model.tag.load_manifest_by_digest(namespace, repo_name, manifest_ref)
except model.InvalidManifestException:
# Without a tag name to reference, we can't make an attempt to generate the manifest
raise ManifestUnknown()
# manifest_path = digest_tools.content_path(tag_digest)
# storage.stream_write('local_us', manifest_path, get_input_stream(request))
manifest.delete_instance()
# return make_response('Manifest {0}'.format(tag_digest))
return make_response('', 202)
def _generate_and_store_manifest(namespace, repo_name, tag_name):
# First look up the tag object and its ancestors
image = model.tag.get_tag_image(namespace, repo_name, tag_name)
parents = model.image.get_parent_images(namespace, repo_name, image)
# Create and populate the manifest builder
builder = SignedManifestBuilder(namespace, repo_name, tag_name)
# Add the leaf layer
builder.add_layer(image.storage.checksum, __get_and_backfill_image_metadata(image))
for parent in parents:
builder.add_layer(parent.storage.checksum, __get_and_backfill_image_metadata(parent))
# TODO: stop generating a new key every time we sign a manifest; publish our key
new_key = RSA.generate(2048)
jwk = RSAKey(key=new_key)
manifest = builder.build(jwk)
manifest_row = model.tag.associate_generated_tag_manifest(namespace, repo_name, tag_name,
manifest.digest, manifest.bytes)
return manifest_row
def __get_and_backfill_image_metadata(image):
image_metadata = image.v1_json_metadata
if image_metadata is None:
logger.warning('Loading metadata from storage for image id: %s', image.id)
metadata_path = storage.image_json_path(image.storage.uuid)
image_metadata = storage.get_content(image.storage.locations, metadata_path)
image.v1_json_metadata = image_metadata
logger.info('Saving backfilled metadata for image id: %s', image.id)
image.save()
return image_metadata

19
endpoints/v2/tag.py Normal file
View file

@ -0,0 +1,19 @@
# XXX This code is not yet ready to be run in production, and should remain disabled until such
# XXX time as this notice is removed.
from flask import jsonify
from endpoints.v2 import v2_bp, require_repo_read
from auth.jwt_auth import process_jwt_auth
from endpoints.decorators import anon_protect
from data import model
@v2_bp.route('/<namespace>/<repo_name>/tags/list', methods=['GET'])
@process_jwt_auth
@require_repo_read
@anon_protect
def list_all_tags(namespace, repo_name):
return jsonify({
'name': '{0}/{1}'.format(namespace, repo_name),
'tags': [tag.name for tag in model.tag.list_repository_tags(namespace, repo_name)],
})
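# Illustrative sketch only (not part of this commit): fetching the tag list exposed by
# list_all_tags above, assuming a registry at localhost:5000 and a valid bearer token
# (both hypothetical). The body is expected to look like
# {"name": "devtable/simple", "tags": ["latest", ...]}.
import requests

def fetch_tags(host, namespace, repo_name, token):
    url = 'http://{0}/v2/{1}/{2}/tags/list'.format(host, namespace, repo_name)
    resp = requests.get(url, headers={'Authorization': 'Bearer ' + token})
    return resp.json()['tags']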

View file

@ -4,6 +4,7 @@ import hashlib
import random
import calendar
import os
import argparse
from datetime import datetime, timedelta
from peewee import (SqliteDatabase, create_model_tables, drop_model_tables, savepoint_sqlite,
@ -87,9 +88,16 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map):
creation_time = REFERENCE_DATE + timedelta(weeks=image_num) + timedelta(days=model_num)
command_list = SAMPLE_CMDS[image_num % len(SAMPLE_CMDS)]
command = json.dumps(command_list) if command_list else None
v1_metadata = {
'id': docker_image_id,
}
if parent is not None:
v1_metadata['parent'] = parent.docker_image_id
new_image = model.image.set_image_metadata(docker_image_id, repo.namespace_user.username,
repo.name, str(creation_time), 'no comment', command,
parent)
v1_metadata, parent)
compressed_size = random.randrange(1, 1024 * 1024 * 1024)
model.image.set_image_size(docker_image_id, repo.namespace_user.username, repo.name,
@ -324,7 +332,7 @@ def wipe_database():
drop_model_tables(all_models, fail_silently=True)
def populate_database():
def populate_database(minimal=False):
logger.debug('Populating the DB with test data.')
new_user_1 = model.user.create_user('devtable', 'password', 'jschorr@devtable.com')
@ -332,6 +340,10 @@ def populate_database():
new_user_1.stripe_id = TEST_STRIPE_ID
new_user_1.save()
if minimal:
logger.debug('Skipping most db population because user requested minimal db')
return
disabled_user = model.user.create_user('disabled', 'password', 'jschorr+disabled@devtable.com')
disabled_user.verified = True
disabled_user.enabled = False
@ -380,7 +392,8 @@ def populate_database():
'to_date': formatdate(calendar.timegm(to_date.utctimetuple())),
'reason': 'database migration'
}
model.notification.create_notification('maintenance', new_user_1, metadata=notification_metadata)
model.notification.create_notification('maintenance', new_user_1,
metadata=notification_metadata)
__generate_repository(new_user_4, 'randomrepo', 'Random repo repository.', False,
@ -618,7 +631,12 @@ def populate_database():
while repositoryactioncounter.count_repository_actions():
pass
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Initialize the test database.')
parser.add_argument('--simple', action='store_true')
args = parser.parse_args()
log_level = getattr(logging, app.config['LOGGING_LEVEL'])
logging.basicConfig(level=log_level)
@ -627,5 +645,4 @@ if __name__ == '__main__':
initialize_database()
if app.config.get('POPULATE_DB_TEST_DATA', False):
populate_database()
populate_database(args.simple)

View file

@ -7,7 +7,7 @@ from app import app as application
import endpoints.decorated
from endpoints.v1 import v1_bp
# from endpoints.v2 import v2_bp
from endpoints.v2 import v2_bp
application.register_blueprint(v1_bp, url_prefix='/v1')
# application.register_blueprint(v2_bp, url_prefix='/v2')
application.register_blueprint(v2_bp, url_prefix='/v2')

View file

@ -36,9 +36,10 @@ git+https://github.com/DevTable/aniso8601-fake.git
git+https://github.com/DevTable/anunidecode.git
git+https://github.com/DevTable/pygithub.git
git+https://github.com/DevTable/container-cloud-config.git
git+https://github.com/coreos/mockldap.git
git+https://github.com/coreos/py-bitbucket.git
git+https://github.com/coreos/pyapi-gitlab.git
git+https://github.com/coreos/mockldap.git
git+https://github.com/coreos/resumablehashlib.git
git+https://github.com/DevTable/python-etcd.git@sslfix
gipc
pyOpenSSL

View file

@ -93,8 +93,9 @@ git+https://github.com/DevTable/aniso8601-fake.git
git+https://github.com/DevTable/anunidecode.git
git+https://github.com/DevTable/pygithub.git
git+https://github.com/DevTable/container-cloud-config.git
git+https://github.com/coreos/mockldap.git
git+https://github.com/coreos/py-bitbucket.git
git+https://github.com/coreos/pyapi-gitlab.git
git+https://github.com/coreos/mockldap.git
git+https://github.com/coreos/resumablehashlib.git
git+https://github.com/DevTable/python-etcd.git@sslfix
git+https://github.com/NateFerrero/oauth2lib.git

View file

@ -1,5 +1,7 @@
import tempfile
from digest.digest_tools import content_path
class StoragePaths(object):
shared_images = 'sharedimages'
@ -23,13 +25,12 @@ class StoragePaths(object):
base_path = self.image_path(storage_uuid)
return '{0}json'.format(base_path)
def image_layer_path(self, storage_uuid):
def v1_image_layer_path(self, storage_uuid):
base_path = self.image_path(storage_uuid)
return '{0}layer'.format(base_path)
def image_ancestry_path(self, storage_uuid):
base_path = self.image_path(storage_uuid)
return '{0}ancestry'.format(base_path)
def blob_path(self, digest_str):
return content_path(digest_str)
def image_file_trie_path(self, storage_uuid):
base_path = self.image_path(storage_uuid)
@ -99,26 +100,30 @@ class BaseStorage(StoragePaths):
raise NotImplementedError
class DigestInvalidException(RuntimeError):
class InvalidChunkException(RuntimeError):
pass
class BaseStorageV2(BaseStorage):
def initiate_chunked_upload(self):
""" Start a new chunked upload, and return a handle with which the upload can be referenced.
def initiate_chunked_upload(self, upload_uuid):
""" Start a new chunked upload
"""
raise NotImplementedError
def stream_upload_chunk(self, uuid, offset, length, in_fp):
def stream_upload_chunk(self, uuid, offset, length, in_fp, hash_obj):
""" Upload the specified amount of data from the given file pointer to the chunked destination
specified, starting at the given offset. Returns the number of bytes written.
specified, starting at the given offset. Raises InvalidChunkException if the offset or
length cannot be accepted.
"""
raise NotImplementedError
def complete_chunked_upload(self, uuid, final_path, digest_to_verify):
def complete_chunked_upload(self, uuid, final_path):
""" Complete the chunked upload and store the final results in the path indicated.
"""
raise NotImplementedError
def cancel_chunked_upload(self, uuid):
""" Cancel the chunked upload and clean up any outstanding partially uploaded data.
"""
raise NotImplementedError
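# Illustrative sketch only (not part of this commit): one way a caller might drive the
# chunked upload interface above. The upload uuid, chunk iterable, and final path are
# hypothetical; an ordinary hashlib.sha256 stands in for the resumable hash object that
# the new hash_obj parameter is meant to carry.
import hashlib
from uuid import uuid4

from storage.basestorage import InvalidChunkException

def upload_blob(storage, chunks, final_path):
    # chunks is assumed to be an iterable of (offset, length, file_like) tuples
    upload_uuid = str(uuid4())
    hash_obj = hashlib.sha256()
    storage.initiate_chunked_upload(upload_uuid)
    try:
        for offset, length, in_fp in chunks:
            storage.stream_upload_chunk(upload_uuid, offset, length, in_fp, hash_obj)
    except InvalidChunkException:
        # Clean up the partially uploaded data if a chunk is rejected
        storage.cancel_chunked_upload(upload_uuid)
        raise
    storage.complete_chunked_upload(upload_uuid, final_path)
    return 'sha256:' + hash_obj.hexdigest()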

View file

@ -45,3 +45,4 @@ class DistributedStorage(StoragePaths):
initiate_chunked_upload = _location_aware(BaseStorageV2.initiate_chunked_upload)
stream_upload_chunk = _location_aware(BaseStorageV2.stream_upload_chunk)
complete_chunked_upload = _location_aware(BaseStorageV2.complete_chunked_upload)
cancel_chunked_upload = _location_aware(BaseStorageV2.cancel_chunked_upload)

View file

@ -8,7 +8,6 @@ import psutil
from uuid import uuid4
from storage.basestorage import BaseStorageV2
from digest import digest_tools
logger = logging.getLogger(__name__)
@ -64,8 +63,9 @@ class LocalStorage(BaseStorageV2):
bytes_copied = 0
bytes_remaining = num_bytes
while bytes_remaining > 0 or num_bytes < 0:
size_to_read = min(bytes_remaining, self.buffer_size)
try:
buf = in_fp.read(self.buffer_size)
buf = in_fp.read(size_to_read)
if not buf:
break
out_fp.write(buf)
@ -112,11 +112,9 @@ class LocalStorage(BaseStorageV2):
sha_hash.update(buf)
return sha_hash.hexdigest()[:7]
def _rel_upload_path(self, uuid):
return 'uploads/{0}'.format(uuid)
def initiate_chunked_upload(self):
new_uuid = str(uuid4())
@ -131,14 +129,8 @@ class LocalStorage(BaseStorageV2):
upload_storage.seek(offset)
return self._stream_write_to_fp(in_fp, upload_storage, length)
def complete_chunked_upload(self, uuid, final_path, digest_to_verify):
def complete_chunked_upload(self, uuid, final_path):
content_path = self._rel_upload_path(uuid)
content_digest = digest_tools.sha256_digest_from_generator(self.stream_read(content_path))
if not digest_tools.digests_equal(content_digest, digest_to_verify):
msg = 'Given: {0} Computed: {1}'.format(digest_to_verify, content_digest)
raise digest_tools.InvalidDigestException(msg)
final_path_abs = self._init_path(final_path, create=True)
if not self.exists(final_path_abs):
logger.debug('Moving content into place at path: %s', final_path_abs)
@ -146,6 +138,10 @@ class LocalStorage(BaseStorageV2):
else:
logger.debug('Content already exists at path: %s', final_path_abs)
def cancel_chunked_upload(self, uuid):
content_path = self._init_path(self._rel_upload_path(uuid))
os.remove(content_path)
def validate(self):
# Load the set of disk mounts.
try:

Binary file not shown.

View file

@ -1,20 +1,23 @@
import unittest
from digest.digest_tools import parse_digest, content_path, InvalidDigestException
from digest.digest_tools import Digest, content_path, InvalidDigestException
class TestParseDigest(unittest.TestCase):
def test_parse_good(self):
examples = [
('tarsum.v123123+sha1:123deadbeef', (True, 'v123123', 'sha1', '123deadbeef')),
('tarsum.v1+sha256:123123', (True, 'v1', 'sha256', '123123')),
('tarsum.v0+md5:abc', (True, 'v0', 'md5', 'abc')),
('sha1:123deadbeef', (False, None, 'sha1', '123deadbeef')),
('sha256:123123', (False, None, 'sha256', '123123')),
('md5:abc', (False, None, 'md5', 'abc')),
('tarsum.v123123+sha1:123deadbeef', ('sha1', '123deadbeef', True, 'v123123')),
('tarsum.v1+sha256:123123', ('sha256', '123123', True, 'v1')),
('tarsum.v0+md5:abc', ('md5', 'abc', True, 'v0')),
('sha1:123deadbeef', ('sha1', '123deadbeef', False, None)),
('sha256:123123', ('sha256', '123123', False, None)),
('md5:abc', ('md5', 'abc', False, None)),
]
for digest, output in examples:
self.assertEquals(parse_digest(digest), output)
for digest, output_args in examples:
self.assertEquals(Digest.parse_digest(digest), Digest(*output_args))
# Test the string method
self.assertEquals(str(Digest.parse_digest(digest)), digest)
def test_parse_fail(self):
examples = [
@ -29,7 +32,7 @@ class TestParseDigest(unittest.TestCase):
for bad_digest in examples:
with self.assertRaises(InvalidDigestException):
parse_digest(bad_digest)
Digest.parse_digest(bad_digest)
class TestDigestPath(unittest.TestCase):

View file

@ -1,104 +0,0 @@
import logging
import json
from data.database import Image, ImageStorage, Repository, User, configure
from data import model
from app import app, storage as store
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
configure(app.config)
# Turn off debug logging for boto
logging.getLogger('boto').setLevel(logging.CRITICAL)
query = (Image
.select(Image, ImageStorage, Repository, User)
.join(ImageStorage)
.switch(Image)
.join(Repository)
.join(User)
.where(ImageStorage.uploading == False))
bad_count = 0
good_count = 0
def resolve_or_create(repo, docker_image_id, new_ancestry):
existing = model.image.get_repo_image_extended(repo.namespace_user.username, repo.name,
docker_image_id)
if existing:
logger.debug('Found existing image: %s, %s', existing.id, docker_image_id)
return existing
else:
# we need to find some storage to link it to
try:
to_link = (ImageStorage
.select()
.join(Image)
.where(Image.docker_image_id == docker_image_id)
.get())
logger.debug('Linking to storage: %s' % to_link.uuid)
created = Image.create(docker_image_id=docker_image_id, repository=repo,
storage=to_link, ancestors=new_ancestry)
logger.debug('Created image: %s' % created)
return created
except ImageStorage.DoesNotExist:
msg = 'No image available anywhere for storage: %s in namespace: %s'
logger.error(msg, docker_image_id, repo.namespace_user.username)
raise RuntimeError()
def all_ancestors_exist(ancestors):
if not ancestors:
return True
found_count = len(list(Image
.select()
.where(Image.id << ancestors)))
return found_count == len(ancestors)
cant_fix = []
for img in query:
try:
with_locations = model.image.get_repo_image_extended(img.repository.namespace_user.username,
img.repository.name, img.docker_image_id)
ancestry_storage = store.image_ancestry_path(img.storage.uuid)
if store.exists(with_locations.storage.locations, ancestry_storage):
full_ancestry = json.loads(store.get_content(with_locations.storage.locations,
ancestry_storage))[1:]
full_ancestry.reverse()
ancestor_dbids = [int(anc_id) for anc_id in img.ancestors.split('/')[1:-1]]
if len(full_ancestry) != len(ancestor_dbids) or not all_ancestors_exist(ancestor_dbids):
logger.error('Image has incomplete ancestry: %s, %s, %s, %s', img.id, img.docker_image_id,
full_ancestry, ancestor_dbids)
fixed_ancestry = '/'
for ancestor in full_ancestry:
ancestor_img = resolve_or_create(img.repository, ancestor,
fixed_ancestry)
fixed_ancestry += str(ancestor_img.id) + '/'
img.ancestors = fixed_ancestry
img.save()
bad_count += 1
else:
good_count += 1
else:
bad_count += 1
except RuntimeError:
cant_fix.append(img)
logger.debug('Bad: %s Good: %s Can\'t Fix: %s', bad_count, good_count,
len(cant_fix))
for cant in cant_fix:
logger.error('Unable to fix %s in repo %s/%s', cant.id, cant.repository.namespace_user.username,
cant.repository.name)

View file

@ -1,67 +0,0 @@
import argparse
import logging
from data import model
from data.database import ImageStoragePlacement, ImageStorageLocation
from app import storage
logger = logging.getLogger(__name__)
PATHSPECS = [
(storage.image_json_path, True),
(storage.image_layer_path, True),
(storage.image_ancestry_path, True),
(storage.image_file_trie_path, False),
(storage.image_file_diffs_path, False),
]
def migrate_image(image, destination_location):
logger.debug('Migrating image: %s -> %s', image.docker_image_id, destination_location.name)
destination_location_set = {destination_location.name}
for path_func, required in PATHSPECS:
path = path_func(image.storage.uuid)
if storage.exists(image.storage.locations, path):
if not storage.exists(destination_location_set, path):
logger.debug('Migrating path: %s', path)
with storage.stream_read_file(image.storage.locations, path) as file_to_migrate:
storage.stream_write(destination_location_set, path, file_to_migrate)
else:
logger.debug('File already present in destination: %s', path)
elif required:
raise RuntimeError('Required file not present in image to migrate: %s', path)
# Successfully migrated, now write the placement
ImageStoragePlacement.create(location=destination_location, storage=image.storage)
parser = argparse.ArgumentParser(description='Replicate an image storage.')
parser.add_argument('--namespace', type=str, required=True,
help='Namespace for the repository containing the image to be replicated')
parser.add_argument('--repository', type=str, required=True,
help='Name for the repository containing the image to be replicated')
parser.add_argument('--imageid', type=str, default=None,
help='Specific image to migrate, entire repo will be migrated if omitted')
parser.add_argument('--to', type=str, required=True,
help='Storage region to which the data should be replicated')
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('boto').setLevel(logging.CRITICAL)
args = parser.parse_args()
location = ImageStorageLocation.get(name=args.to)
images = []
if args.imageid is not None:
images = [model.image.get_image_by_id(args.namespace, args.repository, args.imageid)]
else:
images = model.image.get_repository_images(args.namespace, args.repository)
for img in images:
migrate_image(img, location)

24
util/registry/filelike.py Normal file
View file

@ -0,0 +1,24 @@
class SocketReader(object):
def __init__(self, fp):
self._fp = fp
self.handlers = []
def add_handler(self, handler):
self.handlers.append(handler)
def read(self, n=-1):
buf = self._fp.read(n)
if not buf:
return ''
for handler in self.handlers:
handler(buf)
return buf
def tell(self):
raise IOError('Stream is not seekable.')
def wrap_with_hash(in_fp, hash_obj):
wrapper = SocketReader(in_fp)
wrapper.add_handler(hash_obj.update)
return wrapper
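# Illustrative sketch only (not part of this commit): wrapping an incoming stream so a
# sha256 is computed as the bytes are consumed. hashlib stands in for resumablehashlib,
# and the StringIO source is purely hypothetical.
import hashlib
from StringIO import StringIO

source = StringIO('some blob bytes')
hasher = hashlib.sha256()
wrapped = wrap_with_hash(source, hasher)
while True:
    chunk = wrapped.read(4096)
    if not chunk:
        break
print 'Streamed digest: sha256:%s' % hasher.hexdigest()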