This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/registry/protocols.py
Joseph Schorr f86c087b3b Prevent registry operations against disabled namespaces
Allows admins to completely wall off a namespace by disabling it

Fixes https://jira.coreos.com/browse/QUAY-869
2018-05-22 18:36:04 -04:00

112 lines
3.7 KiB
Python

import json
import tarfile
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from cStringIO import StringIO
from enum import Enum, unique
from six import add_metaclass
# Lightweight record types shared by the protocol implementations.

# Image: the two right-most fields (`size` and `config`) are optional and default to None.
Image = namedtuple('Image', 'id parent_id bytes size config')
Image.__new__.__defaults__ = (None,) * 2

# Result records for push and pull operations; all fields are required.
PushResult = namedtuple('PushResult', 'checksums manifests headers')
PullResult = namedtuple('PullResult', 'manifests image_ids')
def layer_bytes_for_contents(contents, mode='|gz'):
    """ Returns the bytes of an in-memory tar archive holding a single regular file, named
        `contents`, whose data is the given `contents`.

        `mode` is appended to 'w' to form the mode handed to `tarfile.open`; the default
        ('|gz') therefore produces a gzip-compressed tar stream, while '' produces a plain
        uncompressed tar.
    """
    layer_data = StringIO()
    try:
        tar_file_info = tarfile.TarInfo(name='contents')
        tar_file_info.type = tarfile.REGTYPE
        tar_file_info.size = len(contents)
        # Constant timestamp, so the member header does not depend on when it was built.
        tar_file_info.mtime = 1

        # The context manager closes the archive even if adding the member fails; on
        # success, closing is what flushes the trailing blocks into `layer_data`.
        with tarfile.open(fileobj=layer_data, mode='w' + mode) as tar_file:
            tar_file.addfile(tar_file_info, StringIO(contents))

        return layer_data.getvalue()
    finally:
        # Release the buffer on both the success and the failure path.
        layer_data.close()
@unique
class Failures(Enum):
    """ Defines the various forms of expected failure. """
    # Every member carries a distinct string value; @unique makes a duplicated value an
    # error at class-creation time.
    # NOTE(review): presumably these are the values passed as `expected_failure` to
    # RegistryProtocol.pull/push and used as keys in FAILURE_CODES - confirm against callers.
    UNAUTHENTICATED = 'unauthenticated'
    UNAUTHORIZED = 'unauthorized'
    INVALID_REGISTRY = 'invalid-registry'
    INVALID_REPOSITORY = 'invalid-repository'
    APP_REPOSITORY = 'app-repository'
    UNKNOWN_TAG = 'unknown-tag'
    ANONYMOUS_NOT_ALLOWED = 'anonymous-not-allowed'
    DISALLOWED_LIBRARY_NAMESPACE = 'disallowed-library-namespace'
    MISSING_TAG = 'missing-tag'
    INVALID_TAG = 'invalid-tag'
    INVALID_IMAGES = 'invalid-images'
    UNSUPPORTED_CONTENT_TYPE = 'unsupported-content-type'
    INVALID_BLOB = 'invalid-blob'
    NAMESPACE_DISABLED = 'namespace-disabled'
class ProtocolOptions(object):
    """ Mutable bag of settings; every attribute starts at an "off" default (False or None)
        and is meant to be overridden by assignment after construction.

        NOTE(review): presumably supplied as the `options` argument of
        RegistryProtocol.pull/push. The meaning of each setting is defined by the protocol
        implementations, which are not visible in this file - confirm there.
    """
    def __init__(self):
        self.munge_shas = False
        self.scopes = None
        self.cancel_blob_upload = False
        self.manifest_invalid_blob_references = False
        self.chunks_for_upload = None
        self.skip_head_checks = False
        self.manifest_content_type = None
@add_metaclass(ABCMeta)
class RegistryProtocol(object):
    """ Interface for protocols. """

    # Maps a protocol step to a dict of {expected failure: HTTP status code}. conduct()
    # consults it to decide which status to expect when a failure is anticipated. Empty
    # here; concrete protocols are expected to supply their own table.
    FAILURE_CODES = {}

    @abstractmethod
    def login(self, session, username, password, scopes, expect_success):
        """ Performs the login flow with the given credentials, over the given scopes. """

    @abstractmethod
    def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
             expected_failure=None, options=None):
        """ Pulls the given tag via the given session, using the given credentials, and
            ensures the given images match.
        """

    @abstractmethod
    def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
             expected_failure=None, options=None):
        """ Pushes the specified images as the given tag via the given session, using
            the given credentials.
        """

    def repo_name(self, namespace, repo_name):
        """ Returns `namespace/repo_name`, or just `repo_name` when the namespace is empty
            or None.
        """
        if namespace:
            return '%s/%s' % (namespace, repo_name)

        return repo_name

    def conduct(self, session, method, url, expected_status=200, params=None, data=None,
                json_data=None, headers=None, auth=None):
        """ Issues `method` against `url` via `session.request` and asserts that the
            response carries the expected status code.

            `expected_status` is either a status code or a tuple of
            (status code, expected failure, protocol step). In the tuple form, when the
            expected failure is not None, the status registered for that failure under that
            step in FAILURE_CODES is used; if none is registered, the given status code is
            kept.

            When `json_data` is not None it is serialized with `json.dumps` and sent as the
            request body (replacing `data`), with the Content-Type header set to
            application/json. The caller's `headers` dict is never modified.

            Returns the response object. Raises AssertionError if the response status does
            not equal the expected status.
        """
        if json_data is not None:
            data = json.dumps(json_data)
            # Work on a copy: writing the Content-Type into the caller's dict would leak
            # it into any later request that reuses the same headers object.
            headers = dict(headers or {})
            headers['Content-Type'] = 'application/json'

        if isinstance(expected_status, tuple):
            expected_status, expected_failure, protocol_step = expected_status
            if expected_failure is not None:
                # Fall back to the explicitly given status when this protocol has no
                # specific code registered for the step/failure pair.
                failures = self.__class__.FAILURE_CODES.get(protocol_step, {})
                expected_status = failures.get(expected_failure, expected_status)

        result = session.request(method, url, params=params, data=data, headers=headers,
                                 auth=auth)
        msg = "Expected response %s, got %s" % (expected_status, result.status_code)
        assert result.status_code == expected_status, msg
        return result