import json
import tarfile

from abc import ABCMeta, abstractmethod
from collections import namedtuple
from enum import Enum, unique

from six import add_metaclass

try:
    from cStringIO import StringIO
except ImportError:
    # Python 3 has no cStringIO. Tar archives are binary, so BytesIO is the
    # equivalent in-memory buffer there.
    from io import BytesIO as StringIO


# Record describing a single image. `size` and `config` are optional and
# default to None (the defaults apply to the two right-most fields).
Image = namedtuple('Image', ['id', 'parent_id', 'bytes', 'size', 'config'])
Image.__new__.__defaults__ = (None, None)

# Result records for push and pull operations.
PushResult = namedtuple('PushResult', ['checksums', 'manifests', 'headers'])
PullResult = namedtuple('PullResult', ['manifests', 'image_ids'])


def layer_bytes_for_contents(contents, mode='|gz'):
    """ Builds an in-memory tar archive holding a single regular file named
        `contents` whose data is the given contents, and returns the raw
        archive bytes.

        `contents` is the file payload; text is encoded as UTF-8 first so that
        the size recorded in the tar header matches the bytes written.
        `mode` is the suffix appended to 'w' when opening the archive (the
        default '|gz' produces a gzip-compressed stream).
    """
    if isinstance(contents, type(u'')):
        # len() of text counts characters, not bytes; encode before measuring.
        contents = contents.encode('utf-8')

    tar_file_info = tarfile.TarInfo(name='contents')
    tar_file_info.type = tarfile.REGTYPE
    tar_file_info.size = len(contents)
    tar_file_info.mtime = 1  # Fixed mtime, so the member header is stable.

    layer_data = StringIO()
    try:
        # The context manager closes the archive even if addfile() raises.
        with tarfile.open(fileobj=layer_data, mode='w' + mode) as tar_file:
            tar_file.addfile(tar_file_info, StringIO(contents))

        return layer_data.getvalue()
    finally:
        layer_data.close()


@unique
class Failures(Enum):
    """ Defines the various forms of expected failure. """
    UNAUTHENTICATED = 'unauthenticated'
    UNAUTHORIZED = 'unauthorized'
    INVALID_REGISTRY = 'invalid-registry'
    INVALID_REPOSITORY = 'invalid-repository'
    APP_REPOSITORY = 'app-repository'
    UNKNOWN_TAG = 'unknown-tag'
    ANONYMOUS_NOT_ALLOWED = 'anonymous-not-allowed'
    DISALLOWED_LIBRARY_NAMESPACE = 'disallowed-library-namespace'
    MISSING_TAG = 'missing-tag'
    INVALID_TAG = 'invalid-tag'
    INVALID_IMAGES = 'invalid-images'
    UNSUPPORTED_CONTENT_TYPE = 'unsupported-content-type'
    INVALID_BLOB = 'invalid-blob'
    NAMESPACE_DISABLED = 'namespace-disabled'
    UNAUTHORIZED_FOR_MOUNT = 'unauthorized-for-mount'


class ProtocolOptions(object):
    """ Holder for optional protocol settings; every option starts out
        disabled (False) or unset (None).
    """
    # NOTE(review): the meaning of each option is defined by the protocol
    # implementations that read it, which are not part of this module.
    def __init__(self):
        self.munge_shas = False
        self.scopes = None
        self.cancel_blob_upload = False
        self.manifest_invalid_blob_references = False
        self.chunks_for_upload = None
        self.skip_head_checks = False
        self.manifest_content_type = None
        self.mount_blobs = None


@add_metaclass(ABCMeta)
class RegistryProtocol(object):
    """ Interface for protocols. """

    # Maps protocol step -> {expected failure -> HTTP status code}. Consulted
    # by conduct() when it is given a tuple as `expected_status`.
    FAILURE_CODES = {}

    @abstractmethod
    def login(self, session, username, password, scopes, expect_success):
        """ Performs the login flow with the given credentials, over the given scopes. """

    @abstractmethod
    def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
             expected_failure=None, options=None):
        """ Pulls the given tag via the given session, using the given credentials, and
            ensures the given images match.
        """

    @abstractmethod
    def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
             expected_failure=None, options=None):
        """ Pushes the specified images as the given tag via the given session, using
            the given credentials.
        """

    def repo_name(self, namespace, repo_name):
        """ Returns 'namespace/repo_name', or just repo_name when the namespace is empty. """
        if namespace:
            return '%s/%s' % (namespace, repo_name)

        return repo_name

    def conduct(self, session, method, url, expected_status=200, params=None, data=None,
                json_data=None, headers=None, auth=None):
        """ Issues a request through the session and asserts on the response status.

            When `json_data` is given it is serialized and sent as the request body
            (replacing `data`) with a JSON Content-Type header.

            `expected_status` is either a status code, or a tuple of
            (default status, expected failure, protocol step). With a tuple and a
            non-None expected failure, the status is looked up in FAILURE_CODES for
            that step, falling back to the default status when there is no entry.

            Returns the response object. Raises AssertionError if the response status
            differs from the expected one.
        """
        if json_data is not None:
            data = json.dumps(json_data)
            # Copy before adding the header so the caller's dict is never mutated.
            headers = dict(headers) if headers else {}
            headers['Content-Type'] = 'application/json'

        if isinstance(expected_status, tuple):
            expected_status, expected_failure, protocol_step = expected_status
            if expected_failure is not None:
                failures = self.__class__.FAILURE_CODES.get(protocol_step, {})
                expected_status = failures.get(expected_failure, expected_status)

        result = session.request(method, url, params=params, data=data, headers=headers,
                                 auth=auth)
        msg = "Expected response %s, got %s" % (expected_status, result.status_code)
        assert result.status_code == expected_status, msg
        return result