Merge pull request #2739 from coreos-inc/joseph.schorr/QUAY-664/verbs-interface-refactor

Refactor Verbs model definitions to match new style
Authored by josephschorr on 2017-06-29 10:07:36 +03:00 · committed by GitHub
commit 0e702c72f2
19 changed files with 375 additions and 455 deletions
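
In short, access to the Verbs data model moves out of data/interfaces/verbs.py and behind an abstract interface in endpoints/verbs/models_interface.py, with the concrete pre-OCI implementation in endpoints/verbs/models_pre_oci.py; the shared test helpers likewise move from endpoints/api/test/shared.py into endpoints/test/shared.py. A minimal sketch of the resulting import change for consumers, based on the diff below (illustrative only):

    # Before this change (import removed below):
    from data.interfaces.verbs import pre_oci_model as model

    # After this change (import added below):
    from endpoints.verbs.models_pre_oci import pre_oci_model as model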

View file

@ -1,58 +1,10 @@
import datetime
import json
from contextlib import contextmanager
from data import model
from endpoints.test.shared import conduct_call
from endpoints.api import api
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
@contextmanager
def client_with_identity(auth_username, client):
with client.session_transaction() as sess:
if auth_username and auth_username is not None:
loaded = model.user.get_user(auth_username)
sess['user_id'] = loaded.uuid
sess['login_time'] = datetime.datetime.now()
sess[CSRF_TOKEN_KEY] = CSRF_TOKEN
else:
sess['user_id'] = 'anonymous'
yield client
with client.session_transaction() as sess:
sess['user_id'] = None
sess['login_time'] = None
sess[CSRF_TOKEN_KEY] = None
def add_csrf_param(params):
""" Returns a params dict with the CSRF parameter added. """
params = params or {}
params[CSRF_TOKEN_KEY] = CSRF_TOKEN
return params
def conduct_api_call(client, resource, method, params, body=None, expected_code=200):
""" Conducts an API call to the given resource via the given client, and ensures its returned
status matches the code given.
Returns the response.
"""
params = add_csrf_param(params)
final_url = api.url_for(resource, **params)
headers = {}
headers.update({"Content-Type": "application/json"})
if body is not None:
body = json.dumps(body)
rv = client.open(final_url, method=method, data=body, headers=headers)
msg = '%s %s: got %s expected: %s | %s' % (method, final_url, rv.status_code, expected_code,
rv.data)
assert rv.status_code == expected_code, msg
return rv
return conduct_call(client, resource, api.url_for, method, params, body, expected_code)
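
With conduct_api_call now delegating to the shared conduct_call helper, API tests keep the same call shape but import client_with_identity from endpoints.test.shared. A minimal sketch of a test using the updated helpers; the resource and parameters here are illustrative assumptions, not taken from this diff:

    from endpoints.api.organization import Organization
    from endpoints.api.test.shared import conduct_api_call
    from endpoints.test.shared import client_with_identity
    from test.fixtures import *

    def test_example_org_get(client):
        # Hypothetical test for illustration: authenticate as a known test user
        # and expect the call to succeed with the default 200 status.
        with client_with_identity('devtable', client) as cl:
            conduct_api_call(cl, Organization, 'GET', {'orgname': 'buynlarge'})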

View file

@ -16,7 +16,8 @@ from endpoints.api.trigger import (BuildTriggerList, BuildTrigger, BuildTriggerS
BuildTriggerActivate, BuildTriggerAnalyze, ActivateBuildTrigger,
TriggerBuildList, BuildTriggerFieldValues, BuildTriggerSources,
BuildTriggerSourceNamespaces)
from endpoints.api.test.shared import client_with_identity, conduct_api_call
from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
from test.fixtures import *
BUILD_ARGS = {'build_uuid': '1234'}

View file

@ -2,8 +2,9 @@ import pytest
from data import model
from endpoints.api import api
from endpoints.api.test.shared import client_with_identity, conduct_api_call
from endpoints.api.test.shared import conduct_api_call
from endpoints.api.organization import Organization
from endpoints.test.shared import client_with_identity
from test.fixtures import *
@pytest.mark.parametrize('expiration, expected_code', [

View file

@ -2,8 +2,9 @@ import pytest
from mock import patch, ANY, MagicMock
from endpoints.api.test.shared import client_with_identity, conduct_api_call
from endpoints.api.test.shared import conduct_api_call
from endpoints.api.repository import RepositoryTrust, Repository
from endpoints.test.shared import client_with_identity
from features import FeatureNameValue
from test.fixtures import *
@ -52,8 +53,8 @@ def test_signing_disabled(client):
params = {'repository': 'devtable/simple'}
response = conduct_api_call(cl, Repository, 'GET', params).json
assert not response['trust_enabled']
def test_sni_support():
import ssl
assert ssl.HAS_SNI

View file

@ -4,7 +4,8 @@ from playhouse.test_utils import assert_query_count
from data.model import _basequery
from endpoints.api.search import ConductRepositorySearch, ConductSearch
from endpoints.api.test.shared import client_with_identity, conduct_api_call
from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
from test.fixtures import *
@pytest.mark.parametrize('query, expected_query_count', [

View file

@ -4,12 +4,13 @@ from flask_principal import AnonymousIdentity
from endpoints.api import api
from endpoints.api.repositorynotification import RepositoryNotification
from endpoints.api.team import OrganizationTeamSyncing
from endpoints.api.test.shared import client_with_identity, conduct_api_call
from endpoints.api.test.shared import conduct_api_call
from endpoints.api.repository import RepositoryTrust
from endpoints.api.signing import RepositorySignatures
from endpoints.api.search import ConductRepositorySearch
from endpoints.api.superuser import SuperUserRepositoryBuildLogs, SuperUserRepositoryBuildResource
from endpoints.api.superuser import SuperUserRepositoryBuildStatus
from endpoints.test.shared import client_with_identity
from test.fixtures import *

View file

@ -3,8 +3,9 @@ import pytest
from collections import Counter
from mock import patch
from endpoints.api.test.shared import client_with_identity, conduct_api_call
from endpoints.api.test.shared import conduct_api_call
from endpoints.api.signing import RepositorySignatures
from endpoints.test.shared import client_with_identity
from test.fixtures import *
@ -14,21 +15,21 @@ VALID_TARGETS_MAP = {
"latest": {
"hashes": {
"sha256": "2Q8GLEgX62VBWeL76axFuDj/Z1dd6Zhx0ZDM6kNwPkQ="
},
},
"length": 2111
}
},
},
"expiration": "2020-05-22T10:26:46.618176424-04:00"
},
},
"targets": {
"targets": {
"latest": {
"hashes": {
"sha256": "2Q8GLEgX62VBWeL76axFuDj/Z1dd6Zhx0ZDM6kNwPkQ="
},
},
"length": 2111
}
},
},
"expiration": "2020-05-22T10:26:01.953414888-04:00"}
}

View file

@ -2,8 +2,10 @@ import pytest
from mock import patch, Mock
from endpoints.api.test.shared import client_with_identity, conduct_api_call
from endpoints.api.test.shared import conduct_api_call
from endpoints.api.tag import RepositoryTag, RestoreTag
from endpoints.test.shared import client_with_identity
from features import FeatureNameValue
from test.fixtures import *

View file

@ -4,9 +4,11 @@ from mock import patch
from data import model
from endpoints.api import api
from endpoints.api.test.shared import client_with_identity, conduct_api_call
from endpoints.api.test.shared import conduct_api_call
from endpoints.api.team import OrganizationTeamSyncing, TeamMemberList
from endpoints.api.organization import Organization
from endpoints.test.shared import client_with_identity
from test.test_ldap import mock_ldap
from test.fixtures import *

View file

@ -5,7 +5,7 @@ from flask import url_for
from data import model
from endpoints.appr.registry import appr_bp, blobs
from endpoints.api.test.shared import client_with_identity
from endpoints.test.shared import client_with_identity
from test.fixtures import *
BLOB_ARGS = {'digest': 'abcd1235'}

View file

endpoints/test/shared.py (new file, 68 lines)
View file

@ -0,0 +1,68 @@
import datetime
import json
import base64
from contextlib import contextmanager
from data import model
from flask import g
from flask_principal import Identity
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
@contextmanager
def client_with_identity(auth_username, client):
with client.session_transaction() as sess:
if auth_username and auth_username is not None:
loaded = model.user.get_user(auth_username)
sess['user_id'] = loaded.uuid
sess['login_time'] = datetime.datetime.now()
sess[CSRF_TOKEN_KEY] = CSRF_TOKEN
else:
sess['user_id'] = 'anonymous'
yield client
with client.session_transaction() as sess:
sess['user_id'] = None
sess['login_time'] = None
sess[CSRF_TOKEN_KEY] = None
def add_csrf_param(params):
""" Returns a params dict with the CSRF parameter added. """
params = params or {}
if CSRF_TOKEN_KEY not in params:
params[CSRF_TOKEN_KEY] = CSRF_TOKEN
return params
def gen_basic_auth(username, password):
""" Generates a basic auth header. """
return 'Basic ' + base64.b64encode("%s:%s" % (username, password))
def conduct_call(client, resource, url_for, method, params, body=None, expected_code=200,
headers=None):
""" Conducts a call to a Flask endpoint. """
params = add_csrf_param(params)
final_url = url_for(resource, **params)
headers = headers or {}
headers.update({"Content-Type": "application/json"})
if body is not None:
body = json.dumps(body)
# Required so that anonymous calls do not raise an exception.
g.identity = Identity(None, 'none')
rv = client.open(final_url, method=method, data=body, headers=headers)
msg = '%s %s: got %s expected: %s | %s' % (method, final_url, rv.status_code, expected_code,
rv.data)
assert rv.status_code == expected_code, msg
return rv
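
For non-API endpoints, tests can call conduct_call directly with Flask's url_for and, where credentials are needed, gen_basic_auth. A minimal sketch mirroring the verbs security test added later in this change; the repository, digest, and expected status are illustrative:

    from flask import url_for
    from endpoints.test.shared import conduct_call, gen_basic_auth
    from test.fixtures import *

    def test_example_torrent(client):
        # Hypothetical test for illustration: request a torrent for a public repo
        # with basic-auth credentials; the digest does not exist, so expect a 404.
        headers = {'Authorization': gen_basic_auth('devtable', 'password')}
        params = {'repository': 'public/publicrepo', 'digest': 'sha256:1234'}
        conduct_call(client, 'verbs.get_tag_torrent', url_for, 'GET', params,
                     expected_code=404, headers=headers)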

View file

@ -10,9 +10,9 @@ from auth.auth_context import get_authenticated_user
from auth.decorators import process_auth
from auth.permissions import ReadRepositoryPermission
from data import database
from data.interfaces.verbs import pre_oci_model as model
from endpoints.common import route_show_if, parse_repository_name
from endpoints.decorators import anon_protect
from endpoints.verbs.models_pre_oci import pre_oci_model as model
from endpoints.v2.blob import BLOB_DIGEST_ROUTE
from image.appc import AppCImageFormatter
from image.docker.squashed import SquashedDockerImageFormatter
@ -22,16 +22,14 @@ from util.http import exact_abort
from util.registry.filelike import wrap_with_handler
from util.registry.queuefile import QueueFile
from util.registry.queueprocess import QueueProcess
from util.registry.torrent import (make_torrent, per_user_torrent_filename, public_torrent_filename,
PieceHasher)
from util.registry.torrent import (
make_torrent, per_user_torrent_filename, public_torrent_filename, PieceHasher)
logger = logging.getLogger(__name__)
verbs = Blueprint('verbs', __name__)
license_validator.enforce_license_before_request(verbs)
LAYER_MIMETYPE = 'binary/octet-stream'
@ -60,7 +58,8 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers):
logger.debug('Returning image layer %s: %s', current_image.image_id, current_image_path)
yield current_image_stream
stream = formatter.build_stream(repo_image, tag, derived_image_id, get_next_image, get_next_layer)
stream = formatter.build_stream(repo_image, tag, derived_image_id, get_next_image,
get_next_layer)
for handler_fn in handlers:
stream = wrap_with_handler(stream, handler_fn)
@ -89,6 +88,7 @@ def _write_derived_image_to_storage(verb, derived_image, queue_file):
""" Read from the generated stream and write it back to the storage engine. This method runs in a
separate process.
"""
def handle_exception(ex):
logger.debug('Exception when building %s derived image %s: %s', verb, derived_image.ref, ex)
@ -139,8 +139,9 @@ def _torrent_for_blob(blob, is_public):
torrent_file = make_torrent(name, webseed, blob.size, torrent_info.piece_length,
torrent_info.pieces)
headers = {'Content-Type': 'application/x-bittorrent',
'Content-Disposition': 'attachment; filename={0}.torrent'.format(name)}
headers = {
'Content-Type': 'application/x-bittorrent',
'Content-Disposition': 'attachment; filename={0}.torrent'.format(name)}
return make_response(torrent_file, 200, headers)
@ -158,8 +159,7 @@ def _torrent_repo_verb(repo_image, tag, verb, **kwargs):
abort(406)
# Return the torrent.
repo = model.get_repository(repo_image.repository.namespace_name,
repo_image.repository.name)
repo = model.get_repository(repo_image.repository.namespace_name, repo_image.repository.name)
repo_is_public = repo is not None and repo.is_public
torrent = _torrent_for_blob(derived_image.blob, repo_is_public)
@ -229,15 +229,14 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb, True])
# Lookup/create the derived image for the verb and repo image.
derived_image = model.lookup_or_create_derived_image(repo_image, verb,
storage.preferred_locations[0],
varying_metadata={'tag': tag})
derived_image = model.lookup_or_create_derived_image(
repo_image, verb, storage.preferred_locations[0], varying_metadata={'tag': tag})
if not derived_image.blob.uploading:
logger.debug('Derived %s image %s exists in storage', verb, derived_image.ref)
derived_layer_path = model.get_blob_path(derived_image.blob)
is_head_request = request.method == 'HEAD'
download_url = storage.get_direct_download_url(derived_image.blob.locations, derived_layer_path,
head=is_head_request)
download_url = storage.get_direct_download_url(derived_image.blob.locations,
derived_layer_path, head=is_head_request)
if download_url:
logger.debug('Redirecting to download URL for derived %s image %s', verb, derived_image.ref)
return redirect(download_url)
@ -246,8 +245,9 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
database.close_db_filter(None)
logger.debug('Sending cached derived %s image %s', verb, derived_image.ref)
return send_file(storage.stream_read_file(derived_image.blob.locations, derived_layer_path),
mimetype=LAYER_MIMETYPE)
return send_file(
storage.stream_read_file(derived_image.blob.locations, derived_layer_path),
mimetype=LAYER_MIMETYPE)
logger.debug('Building and returning derived %s image %s', verb, derived_image.ref)
@ -270,9 +270,12 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
# and send the results to the client and storage.
handlers = [hasher.update]
args = (formatter, repo_image, tag, derived_image_id, handlers)
queue_process = QueueProcess(_open_stream,
8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max
args, finished=_store_metadata_and_cleanup)
queue_process = QueueProcess(
_open_stream,
8 * 1024,
10 * 1024 * 1024, # 8K/10M chunk/max
args,
finished=_store_metadata_and_cleanup)
client_queue_file = QueueFile(queue_process.create_queue(), 'client')
storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
@ -336,11 +339,13 @@ def get_aci_signature(server, namespace, repository, tag, os, arch):
@route_show_if(features.ACI_CONVERSION)
@anon_protect
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci/<os>/<arch>/', methods=['GET', 'HEAD'])
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci/<os>/<arch>/', methods=[
'GET', 'HEAD'])
@process_auth
def get_aci_image(server, namespace, repository, tag, os, arch):
return _repo_verb(namespace, repository, tag, 'aci', AppCImageFormatter(),
sign=True, checker=os_arch_checker(os, arch), os=os, arch=arch)
return _repo_verb(namespace, repository, tag, 'aci',
AppCImageFormatter(), sign=True, checker=os_arch_checker(os, arch), os=os,
arch=arch)
@anon_protect

View file

@ -0,0 +1,154 @@
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from six import add_metaclass
class Repository(
namedtuple('Repository', ['id', 'name', 'namespace_name', 'description', 'is_public',
'kind'])):
"""
Repository represents a namespaced collection of tags.
:type id: int
:type name: string
:type namespace_name: string
:type description: string
:type is_public: bool
:type kind: string
"""
class DerivedImage(namedtuple('DerivedImage', ['ref', 'blob', 'internal_source_image_db_id'])):
"""
DerivedImage represents a user-facing alias for an image which was derived from another image.
"""
class RepositoryReference(namedtuple('RepositoryReference', ['id', 'name', 'namespace_name'])):
"""
RepositoryReference represents a reference to a Repository, without its full metadata.
"""
class ImageWithBlob(
namedtuple('Image', [
'image_id', 'blob', 'compat_metadata', 'repository', 'internal_db_id', 'v1_metadata'])):
"""
ImageWithBlob represents a user-facing alias for referencing an image, along with its blob.
"""
class Blob(namedtuple('Blob', ['uuid', 'size', 'uncompressed_size', 'uploading', 'locations'])):
"""
Blob represents an opaque binary blob saved to the storage system.
"""
class TorrentInfo(namedtuple('TorrentInfo', ['piece_length', 'pieces'])):
"""
TorrentInfo represents the torrent piece information associated with a blob.
"""
@add_metaclass(ABCMeta)
class VerbsDataInterface(object):
"""
Interface that represents all data store interactions required by the registry's custom HTTP
verbs.
"""
@abstractmethod
def get_repository(self, namespace_name, repo_name):
"""
Returns a repository tuple for the repository with the given name under the given namespace.
Returns None if no such repository was found.
"""
pass
@abstractmethod
def get_manifest_layers_with_blobs(self, repo_image):
"""
Returns the full set of manifest layers and their associated blobs starting at the given
repository image and working upwards to the root image.
"""
pass
@abstractmethod
def get_blob_path(self, blob):
"""
Returns the storage path for the given blob.
"""
pass
@abstractmethod
def get_derived_image_signature(self, derived_image, signer_name):
"""
Returns the signature associated with the derived image and a specific signer or None if none.
"""
pass
@abstractmethod
def set_derived_image_signature(self, derived_image, signer_name, signature):
"""
Sets the calculated signature for the given derived image and signer to that specified.
"""
pass
@abstractmethod
def delete_derived_image(self, derived_image):
"""
Deletes a derived image and all of its storage.
"""
pass
@abstractmethod
def set_blob_size(self, blob, size):
"""
Sets the size field on a blob to the value specified.
"""
pass
@abstractmethod
def get_repo_blob_by_digest(self, namespace_name, repo_name, digest):
"""
Returns the blob with the given digest under the matching repository or None if none.
"""
pass
@abstractmethod
def get_torrent_info(self, blob):
"""
Returns the torrent information associated with the given blob or None if none.
"""
pass
@abstractmethod
def set_torrent_info(self, blob, piece_length, pieces):
"""
Sets the torrent information associated with the given blob to that specified.
"""
pass
@abstractmethod
def lookup_derived_image(self, repo_image, verb, varying_metadata=None):
"""
Looks up the derived image for the given repository image, verb and optional varying metadata
and returns it or None if none.
"""
pass
@abstractmethod
def lookup_or_create_derived_image(self, repo_image, verb, location, varying_metadata=None):
"""
Looks up the derived image for the given repository image, verb and optional varying metadata
and returns it. If none exists, a new derived image is created.
"""
pass
@abstractmethod
def get_tag_image(self, namespace_name, repo_name, tag_name):
"""
Returns the image associated with the live tag with the given name under the matching repository
or None if none.
"""
pass

View file

@ -0,0 +1,203 @@
import json
from data import model
from image.docker.v1 import DockerV1Metadata
from endpoints.verbs.models_interface import (
Blob,
DerivedImage,
ImageWithBlob,
Repository,
RepositoryReference,
TorrentInfo,
VerbsDataInterface,)
class PreOCIModel(VerbsDataInterface):
"""
PreOCIModel implements the data model for the registry's custom HTTP verbs using a database schema
before it was changed to support the OCI specification.
"""
def get_repository(self, namespace_name, repo_name):
repo = model.repository.get_repository(namespace_name, repo_name)
if repo is None:
return None
return _repository_for_repo(repo)
def get_manifest_layers_with_blobs(self, repo_image):
repo_image_record = model.image.get_image_by_id(
repo_image.repository.namespace_name, repo_image.repository.name, repo_image.image_id)
parents = model.image.get_parent_images_with_placements(
repo_image.repository.namespace_name, repo_image.repository.name, repo_image_record)
yield repo_image
for parent in parents:
metadata = {}
try:
metadata = json.loads(parent.v1_json_metadata)
except ValueError:
pass
yield ImageWithBlob(
image_id=parent.docker_image_id,
blob=_blob(parent.storage),
repository=repo_image.repository,
compat_metadata=metadata,
v1_metadata=_docker_v1_metadata(repo_image.repository.namespace_name,
repo_image.repository.name, parent),
internal_db_id=parent.id,)
def get_derived_image_signature(self, derived_image, signer_name):
storage = model.storage.get_storage_by_uuid(derived_image.blob.uuid)
signature_entry = model.storage.lookup_storage_signature(storage, signer_name)
if signature_entry is None:
return None
return signature_entry.signature
def set_derived_image_signature(self, derived_image, signer_name, signature):
storage = model.storage.get_storage_by_uuid(derived_image.blob.uuid)
signature_entry = model.storage.find_or_create_storage_signature(storage, signer_name)
signature_entry.signature = signature
signature_entry.uploading = False
signature_entry.save()
def delete_derived_image(self, derived_image):
model.image.delete_derived_storage_by_uuid(derived_image.blob.uuid)
def set_blob_size(self, blob, size):
storage_entry = model.storage.get_storage_by_uuid(blob.uuid)
storage_entry.image_size = size
storage_entry.uploading = False
storage_entry.save()
def get_blob_path(self, blob):
blob_record = model.storage.get_storage_by_uuid(blob.uuid)
return model.storage.get_layer_path(blob_record)
def get_repo_blob_by_digest(self, namespace_name, repo_name, digest):
try:
blob_record = model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest)
except model.BlobDoesNotExist:
return None
return _blob(blob_record)
def get_torrent_info(self, blob):
blob_record = model.storage.get_storage_by_uuid(blob.uuid)
try:
torrent_info = model.storage.get_torrent_info(blob_record)
except model.TorrentInfoDoesNotExist:
return None
return TorrentInfo(
pieces=torrent_info.pieces,
piece_length=torrent_info.piece_length,)
def set_torrent_info(self, blob, piece_length, pieces):
blob_record = model.storage.get_storage_by_uuid(blob.uuid)
model.storage.save_torrent_info(blob_record, piece_length, pieces)
def lookup_derived_image(self, repo_image, verb, varying_metadata=None):
blob_record = model.image.find_derived_storage_for_image(repo_image.internal_db_id, verb,
varying_metadata)
if blob_record is None:
return None
return _derived_image(blob_record, repo_image)
def lookup_or_create_derived_image(self, repo_image, verb, location, varying_metadata=None):
blob_record = model.image.find_or_create_derived_storage(repo_image.internal_db_id, verb,
location, varying_metadata)
return _derived_image(blob_record, repo_image)
def get_tag_image(self, namespace_name, repo_name, tag_name):
try:
found = model.tag.get_tag_image(namespace_name, repo_name, tag_name, include_storage=True)
except model.DataModelException:
return None
metadata = {}
try:
metadata = json.loads(found.v1_json_metadata)
except ValueError:
pass
return ImageWithBlob(
image_id=found.docker_image_id,
blob=_blob(found.storage),
repository=RepositoryReference(
namespace_name=namespace_name,
name=repo_name,
id=found.repository_id,),
compat_metadata=metadata,
v1_metadata=_docker_v1_metadata(namespace_name, repo_name, found),
internal_db_id=found.id,)
pre_oci_model = PreOCIModel()
def _docker_v1_metadata(namespace_name, repo_name, repo_image):
"""
Returns a DockerV1Metadata object for the given Pre-OCI repo_image under the
repository with the given namespace and name. Note that the namespace and
name are passed here as an optimization, and are *not checked* against the
image.
"""
return DockerV1Metadata(
namespace_name=namespace_name,
repo_name=repo_name,
image_id=repo_image.docker_image_id,
checksum=repo_image.v1_checksum,
compat_json=repo_image.v1_json_metadata,
created=repo_image.created,
comment=repo_image.comment,
command=repo_image.command,
# Note: These are not needed in verbs and are expensive to load, so we just skip them.
content_checksum=None,
parent_image_id=None,)
def _derived_image(blob_record, repo_image):
"""
Returns a DerivedImage object for the given Pre-OCI data model blob and repo_image instance.
"""
return DerivedImage(
ref=repo_image.internal_db_id,
blob=_blob(blob_record),
internal_source_image_db_id=repo_image.internal_db_id,)
def _blob(blob_record):
"""
Returns a Blob object for the given Pre-OCI data model blob instance.
"""
if hasattr(blob_record, 'locations'):
locations = blob_record.locations
else:
locations = model.storage.get_storage_locations(blob_record.uuid)
return Blob(
uuid=blob_record.uuid,
size=blob_record.image_size,
uncompressed_size=blob_record.uncompressed_size,
uploading=blob_record.uploading,
locations=locations,)
def _repository_for_repo(repo):
""" Returns a Repository object representing the Pre-OCI data model repo instance given. """
return Repository(
id=repo.id,
name=repo.name,
namespace_name=repo.namespace_user.username,
description=repo.description,
is_public=model.repository.is_repository_public(repo),
kind=model.repository.get_repo_kind_name(repo),)
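
Callers reach this implementation only through the pre_oci_model alias exported at module level; a minimal sketch of the calling pattern used by the verbs endpoints above, with illustrative repository names:

    from endpoints.verbs.models_pre_oci import pre_oci_model as model

    repo = model.get_repository('devtable', 'simple')  # illustrative names
    if repo is not None and repo.is_public:
        # e.g. decide whether a derived blob's torrent may be served publicly,
        # as _torrent_repo_verb does above.
        pass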

View file

@ -0,0 +1,74 @@
import pytest
from flask import url_for
from endpoints.test.shared import conduct_call, gen_basic_auth
from test.fixtures import *
NO_ACCESS_USER = 'freshuser'
READ_ACCESS_USER = 'reader'
ADMIN_ACCESS_USER = 'devtable'
CREATOR_ACCESS_USER = 'creator'
PUBLIC_REPO = 'public/publicrepo'
PRIVATE_REPO = 'devtable/shared'
ORG_REPO = 'buynlarge/orgrepo'
ANOTHER_ORG_REPO = 'buynlarge/anotherorgrepo'
ACI_ARGS = {
'server': 'someserver',
'tag': 'fake',
'os': 'linux',
'arch': 'x64',}
@pytest.mark.parametrize('user', [
(0, None),
(1, NO_ACCESS_USER),
(2, READ_ACCESS_USER),
(3, CREATOR_ACCESS_USER),
(4, ADMIN_ACCESS_USER),])
@pytest.mark.parametrize(
'endpoint,method,repository,single_repo_path,params,expected_statuses',
[
('get_aci_signature', 'GET', PUBLIC_REPO, False, ACI_ARGS, (404, 404, 404, 404, 404)),
('get_aci_signature', 'GET', PRIVATE_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)),
('get_aci_signature', 'GET', ORG_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)),
('get_aci_signature', 'GET', ANOTHER_ORG_REPO, False, ACI_ARGS, (403, 403, 403, 403, 404)),
# get_aci_image
('get_aci_image', 'GET', PUBLIC_REPO, False, ACI_ARGS, (404, 404, 404, 404, 404)),
('get_aci_image', 'GET', PRIVATE_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)),
('get_aci_image', 'GET', ORG_REPO, False, ACI_ARGS, (403, 403, 404, 403, 404)),
('get_aci_image', 'GET', ANOTHER_ORG_REPO, False, ACI_ARGS, (403, 403, 403, 403, 404)),
# get_squashed_tag
('get_squashed_tag', 'GET', PUBLIC_REPO, False, dict(tag='fake'), (404, 404, 404, 404, 404)),
('get_squashed_tag', 'GET', PRIVATE_REPO, False, dict(tag='fake'), (403, 403, 404, 403, 404)),
('get_squashed_tag', 'GET', ORG_REPO, False, dict(tag='fake'), (403, 403, 404, 403, 404)),
('get_squashed_tag', 'GET', ANOTHER_ORG_REPO, False, dict(tag='fake'), (403, 403, 403, 403,
404)),
# get_tag_torrent
('get_tag_torrent', 'GET', PUBLIC_REPO, True, dict(digest='sha256:1234'), (404, 404, 404, 404,
404)),
('get_tag_torrent', 'GET', PRIVATE_REPO, True, dict(digest='sha256:1234'), (403, 403, 404, 403,
404)),
('get_tag_torrent', 'GET', ORG_REPO, True, dict(digest='sha256:1234'), (403, 403, 404, 403,
404)),
('get_tag_torrent', 'GET', ANOTHER_ORG_REPO, True, dict(digest='sha256:1234'), (403, 403, 403,
403, 404)),])
def test_verbs_security(user, endpoint, method, repository, single_repo_path, params,
expected_statuses, app, client):
headers = {}
if user[1] is not None:
headers['Authorization'] = gen_basic_auth(user[1], 'password')
if single_repo_path:
params['repository'] = repository
else:
(namespace, repo_name) = repository.split('/')
params['namespace'] = namespace
params['repository'] = repo_name
conduct_call(client, 'verbs.' + endpoint, url_for, method, params,
expected_code=expected_statuses[user[0]], headers=headers)