Fix issue with Docker 1.8.3 and pulling public repos with no creds

We now return the valid subset of auth scopes requested.

Adds a test for this case and adds testing of all returned JWTs in the V2 login tests
This commit is contained in:
Joseph Schorr 2016-01-22 16:49:32 -05:00
parent 566a91f003
commit 8cd38569d6
5 changed files with 202 additions and 148 deletions

View file

@ -9,7 +9,7 @@ import features
from app import metric_queue
from endpoints.decorators import anon_protect, anon_allowed
from endpoints.v2.errors import V2RegistryException
from endpoints.v2.errors import V2RegistryException, Unauthorized
from auth.auth_context import get_grant_context
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission,
AdministerRepositoryPermission)
@ -45,7 +45,7 @@ def _require_repo_permission(permission_class, allow_public=False):
(allow_public and
model.repository.repository_is_public(namespace, repo_name))):
return func(namespace, repo_name, *args, **kwargs)
raise abort(401)
raise Unauthorized()
return wrapped
return wrapper

View file

@ -61,6 +61,12 @@ def generate_registry_jwt():
oauthtoken = get_validated_oauth_token()
logger.debug('Authenticated OAuth token: %s', oauthtoken)
auth_credentials_sent = bool(request.headers.get('authorization', ''))
if auth_credentials_sent and not user and not token:
# The auth credentials sent for the user are invalid.
logger.debug('Invalid auth credentials')
abort(401)
access = []
user_event_data = {
'action': 'login',
@ -84,7 +90,7 @@ def generate_registry_jwt():
# Ensure that we are never creating an invalid repository.
if not REPOSITORY_NAME_REGEX.match(reponame):
logger.debug('Found invalid repository name in auth flow: %v', reponame)
logger.debug('Found invalid repository name in auth flow: %s', reponame)
abort(400)
final_actions = []
@ -92,38 +98,30 @@ def generate_registry_jwt():
if 'push' in actions:
# If there is no valid user or token, then the repository cannot be
# accessed.
if user is None and token is None:
logger.debug('No user and no token for requested "push" scope')
abort(401)
# Lookup the repository. If it exists, make sure the entity has modify
# permission. Otherwise, make sure the entity has create permission.
repo = model.repository.get_repository(namespace, reponame)
if repo:
if not ModifyRepositoryPermission(namespace, reponame).can():
logger.debug('No permission to modify repository %v/%v', namespace, reponame)
abort(403)
else:
if not CreateRepositoryPermission(namespace).can() or user is None:
logger.debug('No permission to create repository %v/%v', namespace, reponame)
abort(403)
logger.debug('Creating repository: %s/%s', namespace, reponame)
model.repository.create_repository(namespace, reponame, user)
final_actions.append('push')
if user is not None or token is not None:
# Lookup the repository. If it exists, make sure the entity has modify
# permission. Otherwise, make sure the entity has create permission.
repo = model.repository.get_repository(namespace, reponame)
if repo:
if ModifyRepositoryPermission(namespace, reponame).can():
final_actions.append('push')
else:
logger.debug('No permission to modify repository %s/%s', namespace, reponame)
else:
if CreateRepositoryPermission(namespace).can() and user is not None:
logger.debug('Creating repository: %s/%s', namespace, reponame)
model.repository.create_repository(namespace, reponame, user)
final_actions.append('push')
else:
logger.debug('No permission to create repository %s/%s', namespace, reponame)
if 'pull' in actions:
# Grant pull if the user can read the repo or it is public. We also
# grant it if the user already has push, as they can clearly change
# the repository.
# Grant pull if the user can read the repo or it is public.
if (ReadRepositoryPermission(namespace, reponame).can() or
model.repository.repository_is_public(namespace, reponame) or
'push' in final_actions):
model.repository.repository_is_public(namespace, reponame)):
final_actions.append('pull')
else:
logger.debug('No permission to pull repository %v/%v', namespace, reponame)
abort(403)
logger.debug('No permission to pull repository %s/%s', namespace, reponame)
# Add the access for the JWT.
access.append({
@ -137,6 +135,8 @@ def generate_registry_jwt():
user_action = 'push_start'
elif 'pull' in final_actions:
user_action = 'pull_start'
else:
user_action = 'login'
user_event_data = {
'action': user_action,

View file

@ -8,9 +8,12 @@ import resumablehashlib
import binascii
import Crypto.Random
from cachetools import lru_cache
from flask import request, jsonify
from flask.blueprints import Blueprint
from flask.ext.testing import LiveServerTestCase
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend
from app import app, storage
from data.database import close_db_filter, configure
@ -24,6 +27,7 @@ from initdb import wipe_database, initialize_database, populate_database
from endpoints.csrf import generate_csrf_token
from tempfile import NamedTemporaryFile
from jsonschema import validate as validate_schema
from util.security import strictjwt
import endpoints.decorated
import json
@ -137,6 +141,23 @@ _PORT_NUMBER = 5001
_CLEAN_DATABASE_PATH = None
_JWK = RSAKey(key=RSA.generate(2048))
class FailureCodes:
""" Defines tuples representing the HTTP status codes for various errors. The tuple
is defined as ('errordescription', V1HTTPStatusCode, V2HTTPStatusCode). """
UNAUTHENTICATED = ('unauthenticated', 401, 401)
UNAUTHORIZED = ('unauthorized', 403, 401)
INVALID_REGISTRY = ('invalidregistry', 404, 404)
DOES_NOT_EXIST = ('doesnotexist', 404, 404)
INVALID_REQUEST = ('invalidrequest', 400, 400)
def _get_expected_code(expected_failure, version, success_status_code):
""" Returns the HTTP status code for the expected failure under the specified protocol version
(1 or 2). If none, returns the success status code. """
if not expected_failure:
return success_status_code
return expected_failure[version]
def _get_repo_name(namespace, name):
if namespace == '':
@ -290,7 +311,7 @@ class V1RegistryMixin(BaseRegistryMixin):
class V1RegistryPushMixin(V1RegistryMixin):
def do_push(self, namespace, repository, username, password, images=None, expected_code=201):
def do_push(self, namespace, repository, username, password, images=None, expect_failure=None):
images = images or self._get_default_images()
auth = (username, password)
repo_name = _get_repo_name(namespace, repository)
@ -299,6 +320,7 @@ class V1RegistryPushMixin(V1RegistryMixin):
self.v1_ping()
# PUT /v1/repositories/{namespace}/{repository}/
expected_code = _get_expected_code(expect_failure, 1, 201)
self.conduct('PUT', '/v1/repositories/%s' % repo_name,
data=json.dumps(images), auth=auth,
expected_code=expected_code)
@ -341,7 +363,7 @@ class V1RegistryPushMixin(V1RegistryMixin):
class V1RegistryPullMixin(V1RegistryMixin):
def do_pull(self, namespace, repository, username=None, password='password', expected_code=200,
def do_pull(self, namespace, repository, username=None, password='password', expect_failure=None,
images=None):
images = images or self._get_default_images()
repo_name = _get_repo_name(namespace, repository)
@ -356,6 +378,7 @@ class V1RegistryPullMixin(V1RegistryMixin):
prefix = '/v1/repositories/%s/' % repo_name
# GET /v1/repositories/{namespace}/{repository}/
expected_code = _get_expected_code(expect_failure, 1, 200)
self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
if expected_code != 200:
return
@ -427,7 +450,10 @@ class V2RegistryMixin(BaseRegistryMixin):
def do_auth(self, username, password, namespace, repository, expected_code=200, scopes=[]):
auth = (username, password)
auth = None
if username and password:
auth = (username, password)
repo_name = _get_repo_name(namespace, repository)
params = {
@ -436,7 +462,7 @@ class V2RegistryMixin(BaseRegistryMixin):
'service': app.config['SERVER_HOSTNAME'],
}
response = self.conduct('GET', '/v2/auth', params=params, auth=(username, password),
response = self.conduct('GET', '/v2/auth', params=params, auth=auth,
expected_code=expected_code)
if expected_code == 200:
@ -449,18 +475,21 @@ class V2RegistryMixin(BaseRegistryMixin):
class V2RegistryPushMixin(V2RegistryMixin):
def do_push(self, namespace, repository, username, password, images=None, tag_name=None,
cancel=False, invalid=False, expected_manifest_code=202, expected_auth_code=200,
scopes=None):
cancel=False, invalid=False, expect_failure=None, scopes=None):
images = images or self._get_default_images()
repo_name = _get_repo_name(namespace, repository)
# Ping!
self.v2_ping()
# Auth.
# Auth. If the expected failure is an invalid registry, in V2 we'll receive that error from
# the auth endpoint first, rather than just the V2 requests below.
expected_auth_code = 200
if expect_failure == FailureCodes.INVALID_REGISTRY:
expected_auth_code = 400
self.do_auth(username, password, namespace, repository, scopes=scopes or ['push', 'pull'],
expected_code=expected_auth_code)
if expected_auth_code != 200:
return
@ -478,6 +507,8 @@ class V2RegistryPushMixin(V2RegistryMixin):
builder.add_layer(checksum, json.dumps(image_data))
expected_code = _get_expected_code(expect_failure, 2, 404)
# Build the manifest.
manifest = builder.build(_JWK)
@ -493,6 +524,11 @@ class V2RegistryPushMixin(V2RegistryMixin):
self.conduct('HEAD', '/v2/%s/blobs/%s' % (repo_name, checksum),
expected_code=404, auth='jwt')
# If we expected a non-404 status code, then the HEAD operation has failed and we cannot
# continue performing the push.
if expected_code != 404:
return
# Start a new upload of the layer data.
response = self.conduct('POST', '/v2/%s/blobs/uploads/' % repo_name,
expected_code=202, auth='jwt')
@ -548,8 +584,9 @@ class V2RegistryPushMixin(V2RegistryMixin):
self.assertEquals(response.headers['Docker-Content-Digest'], checksum)
self.assertEquals(response.headers['Content-Length'], str(len(layer_bytes)))
# Write the manifest.
put_code = 404 if invalid else expected_manifest_code
# Write the manifest. If we expect it to be invalid, we expect a 404 code. Otherwise, we expect
# a 202 response for success.
put_code = 404 if invalid else 202
self.conduct('PUT', '/v2/%s/manifests/%s' % (repo_name, tag_name),
data=manifest.bytes, expected_code=put_code,
headers={'Content-Type': 'application/json'}, auth='jwt')
@ -558,25 +595,32 @@ class V2RegistryPushMixin(V2RegistryMixin):
class V2RegistryPullMixin(V2RegistryMixin):
def do_pull(self, namespace, repository, username=None, password='password', expected_code=200,
manifest_id=None, expected_manifest_code=200, images=None):
def do_pull(self, namespace, repository, username=None, password='password', expect_failure=None,
manifest_id=None, images=None):
images = images or self._get_default_images()
repo_name = _get_repo_name(namespace, repository)
# Ping!
self.v2_ping()
# Auth.
# Auth. If the failure expected is unauthenticated, then the auth endpoint will 401 before
# we reach any of the registry operations.
expected_auth_code = 200
if expect_failure == FailureCodes.UNAUTHENTICATED:
expected_auth_code = 401
self.do_auth(username, password, namespace, repository, scopes=['pull'],
expected_code=expected_code)
if expected_code != 200:
expected_code=expected_auth_code)
if expected_auth_code != 200:
return
# Retrieve the manifest for the tag or digest.
manifest_id = manifest_id or 'latest'
expected_code = _get_expected_code(expect_failure, 2, 200)
response = self.conduct('GET', '/v2/%s/manifests/%s' % (repo_name, manifest_id),
auth='jwt', expected_code=expected_manifest_code)
if expected_manifest_code != 200:
auth='jwt', expected_code=expected_code)
if expected_code != 200:
return
manifest_data = json.loads(response.text)
@ -621,20 +665,23 @@ class V1RegistryLoginMixin(object):
class V2RegistryLoginMixin(object):
def do_login(self, username, password, scope, expect_success=True, expected_code=None):
def do_login(self, username, password, scope, expect_success=True):
params = {
'account': username,
'scope': scope,
'service': app.config['SERVER_HOSTNAME'],
}
if expected_code is None:
if expect_success:
expected_code = 200
else:
expected_code = 403
if expect_success:
expected_code = 200
else:
expected_code = 401
response = self.conduct('GET', '/v2/auth', params=params, auth=(username, password),
auth = None
if username and password:
auth = (username, password)
response = self.conduct('GET', '/v2/auth', params=params, auth=auth,
expected_code=expected_code)
return response
@ -739,7 +786,9 @@ class RegistryTestsMixin(object):
# First try to pull the (currently private) repo anonymously, which should fail (since it is
# private)
self.do_pull('public', 'newrepo', expected_code=403)
self.do_pull('public', 'newrepo', expect_failure=FailureCodes.UNAUTHORIZED)
self.do_pull('public', 'newrepo', 'devtable', 'password',
expect_failure=FailureCodes.UNAUTHORIZED)
# Make the repository public.
self.conduct_api_login('public', 'password')
@ -757,7 +806,8 @@ class RegistryTestsMixin(object):
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
self.do_pull('public', 'newrepo', 'devtable', 'password',
expect_failure=FailureCodes.UNAUTHORIZED)
# Make the repository public.
self.conduct_api_login('public', 'password')
@ -775,7 +825,8 @@ class RegistryTestsMixin(object):
# First try to pull the (currently private) repo as public, which should fail as it belongs
# to devtable.
self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)
self.do_pull('devtable', 'newrepo', 'public', 'password',
expect_failure=FailureCodes.UNAUTHORIZED)
# Pull the repository as devtable, which should succeed because the repository is owned by
# devtable.
@ -791,7 +842,8 @@ class RegistryTestsMixin(object):
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
self.do_pull('public', 'newrepo', 'devtable', 'password',
expect_failure=FailureCodes.UNAUTHORIZED)
# Make the repository public.
self.conduct_api_login('public', 'password')
@ -811,7 +863,8 @@ class RegistryTestsMixin(object):
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
self.do_pull('public', 'newrepo', 'devtable', 'password',
expect_failure=FailureCodes.UNAUTHORIZED)
# Pull the repository as public, which should succeed because the repository is owned by public.
self.do_pull('public', 'newrepo', 'public', 'password')
@ -826,7 +879,7 @@ class RegistryTestsMixin(object):
# First try to pull the (currently private) repo as anonymous, which should fail as it
# is private.
self.do_pull('public', 'newrepo', expected_code=401)
self.do_pull('public', 'newrepo', expect_failure=FailureCodes.UNAUTHENTICATED)
# Make the repository public.
self.conduct_api_login('public', 'password')
@ -835,7 +888,7 @@ class RegistryTestsMixin(object):
# Try again to pull the (currently public) repo as anonymous, which should fail as
# anonymous access is disabled.
self.do_pull('public', 'newrepo', expected_code=401)
self.do_pull('public', 'newrepo', expect_failure=FailureCodes.UNAUTHENTICATED)
# Pull the repository as public, which should succeed because the repository is owned by public.
self.do_pull('public', 'newrepo', 'public', 'password')
@ -898,7 +951,8 @@ class V1RegistryTests(V1RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMix
'id': 'onlyimagehere',
'contents': 'somecontents',
}]
self.do_push('public', 'newrepo/somesubrepo', 'public', 'password', images, expected_code=404)
self.do_push('public', 'newrepo/somesubrepo', 'public', 'password', images,
expect_failure=FailureCodes.INVALID_REGISTRY)
def test_push_unicode_metadata(self):
self.conduct_api_login('devtable', 'password')
@ -948,7 +1002,7 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
# Ensure the tag no longer exists.
self.do_pull('devtable', 'newrepo', 'devtable', 'password',
expected_manifest_code=404)
expect_failure=FailureCodes.DOES_NOT_EXIST)
def test_push_only_push_scope(self):
images = [{
@ -959,15 +1013,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
self.do_push('devtable', 'somenewrepo', 'devtable', 'password', images,
scopes=['push'])
def test_attempt_push_only_push_scope(self):
images = [{
'id': 'onlyimagehere',
'contents': 'foobar',
}]
self.do_push('public', 'somenewrepo', 'devtable', 'password', images,
scopes=['push'], expected_auth_code=403)
def test_push_reponame_with_slashes(self):
# Attempt to add a repository name with slashes. This should fail as we do not support it.
images = [{
@ -976,7 +1021,7 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
}]
self.do_push('public', 'newrepo/somesubrepo', 'devtable', 'password', images,
expected_auth_code=400)
expect_failure=FailureCodes.INVALID_REGISTRY)
def test_invalid_push(self):
self.do_push('devtable', 'newrepo', 'devtable', 'password', invalid=True)
@ -999,7 +1044,7 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
# Attempt to pull the invalid tag.
self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id='invalid',
expected_manifest_code=404)
expect_failure=FailureCodes.INVALID_REGISTRY)
def test_partial_upload_below_5mb(self):
@ -1097,7 +1142,7 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
]
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images,
expected_manifest_code=400)
expect_failure=FailureCodes.INVALID_REQUEST)
def test_multiple_layers(self):
# Push a manifest with multiple layers.
@ -1116,7 +1161,8 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images)
def test_invalid_regname(self):
self.do_push('devtable', 'this/is/a/repo', 'devtable', 'password', expected_auth_code=400)
self.do_push('devtable', 'this/is/a/repo', 'devtable', 'password',
expect_failure=FailureCodes.INVALID_REGISTRY)
def test_multiple_tags(self):
latest_images = [
@ -1432,49 +1478,108 @@ class V1LoginTests(V1RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, Base
class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase):
""" Tests for V2 login. """
@staticmethod
@lru_cache(maxsize=1)
def load_public_key(certificate_file_path):
with open(certificate_file_path) as cert_file:
cert_obj = load_pem_x509_certificate(cert_file.read(), default_backend())
return cert_obj.public_key()
def do_logincheck(self, username, password, scope, expected_actions=[], expect_success=True):
response = self.do_login(username, password, scope, expect_success=expect_success)
if not expect_success:
return
# Validate the returned JWT.
encoded = response.json()['token']
expected_issuer = app.config['JWT_AUTH_TOKEN_ISSUER']
audience = app.config['SERVER_HOSTNAME']
max_signed_s = app.config.get('JWT_AUTH_MAX_FRESH_S', 3660)
certificate_file_path = app.config['JWT_AUTH_CERTIFICATE_PATH']
public_key = V2LoginTests.load_public_key(certificate_file_path)
max_exp = strictjwt.exp_max_s_option(max_signed_s)
payload = strictjwt.decode(encoded, public_key, algorithms=['RS256'], audience=audience,
issuer=expected_issuer, options=max_exp)
if scope is None:
self.assertEquals(0, len(payload['access']))
else:
self.assertEquals(1, len(payload['access']))
self.assertEquals(payload['access'][0]['actions'], expected_actions)
def test_nouser_noscope(self):
self.do_login('', '', expected_code=401, scope='')
self.do_logincheck('', '', expect_success=False, scope=None)
def test_validuser_unknownrepo(self):
self.do_login('devtable', 'password', expect_success=False,
scope='repository:invalidnamespace/simple:pull')
self.do_logincheck('devtable', 'password', expect_success=True,
scope='repository:invalidnamespace/simple:pull',
expected_actions=[])
def test_validuser_unknownnamespacerepo(self):
self.do_login('devtable', 'password', expect_success=True,
scope='repository:devtable/newrepo:push')
self.do_logincheck('devtable', 'password', expect_success=True,
scope='repository:devtable/newrepo:push',
expected_actions=['push'])
def test_validuser_noaccess(self):
self.do_login('public', 'password', expect_success=False,
scope='repository:devtable/simple:pull')
self.do_logincheck('public', 'password', expect_success=True,
scope='repository:devtable/simple:pull',
expected_actions=[])
def test_validuser_noscope(self):
self.do_login('public', 'password', expect_success=True, scope=None)
self.do_logincheck('public', 'password', expect_success=True, scope=None)
def test_invaliduser_noscope(self):
self.do_login('invaliduser', 'invalidpass', expected_code=401, scope=None)
self.do_logincheck('invaliduser', 'invalidpass', expect_success=False, scope=None)
def test_invalidpassword_noscope(self):
self.do_login('public', 'invalidpass', expected_code=401, scope=None)
self.do_logincheck('public', 'invalidpass', expect_success=False, scope=None)
def test_oauth_noaccess(self):
self.do_login('$oauthtoken', 'test', expect_success=False,
scope='repository:public/publicrepo:pull,push')
self.do_logincheck('$oauthtoken', 'test', expect_success=True,
scope='repository:freshuser/unknownrepo:pull,push',
expected_actions=[])
def test_oauth_public(self):
self.do_logincheck('$oauthtoken', 'test', expect_success=True,
scope='repository:public/publicrepo:pull,push',
expected_actions=['pull'])
def test_nouser_pull_publicrepo(self):
self.do_login('', '', expect_success=True, scope='repository:public/publicrepo:pull')
self.do_logincheck('', '', expect_success=True, scope='repository:public/publicrepo:pull',
expected_actions=['pull'])
def test_nouser_push_publicrepo(self):
self.do_login('', '', expected_code=401, scope='repository:public/publicrepo:push')
self.do_logincheck('', '', expect_success=True, scope='repository:public/publicrepo:push',
expected_actions=[])
def test_library_invaliduser(self):
self.do_login('invaliduser', 'password', expected_code=401, scope='repository:librepo:pull,push')
self.do_logincheck('invaliduser', 'password', expect_success=False,
scope='repository:librepo:pull,push')
def test_library_noaccess(self):
self.do_login('freshuser', 'password', expected_code=403, scope='repository:librepo:pull,push')
self.do_logincheck('freshuser', 'password', expect_success=True,
scope='repository:librepo:pull,push',
expected_actions=[])
def test_library_access(self):
self.do_login('devtable', 'password', expect_success=200, scope='repository:librepo:pull,push')
self.do_logincheck('devtable', 'password', expect_success=True,
scope='repository:librepo:pull,push',
expected_actions=['push', 'pull'])
def test_nouser_pushpull_publicrepo(self):
# Note: Docker 1.8.3 will ask for both push and pull scopes at all times. For public pulls
# with no credentials, we were returning a 401. This test makes sure we get back just a pull
# token.
self.do_logincheck('', '', expect_success=True,
scope='repository:public/publicrepo:pull,push',
expected_actions=['pull'])
if __name__ == '__main__':

View file

@ -253,21 +253,11 @@ class IndexV2TestSpec(object):
self.kwargs = kwargs
self.auth_no_access_code = 403
self.auth_read_code = 403
self.auth_admin_code = 403
self.anon_code = 401
self.no_access_code = 403
self.read_code = 200
self.admin_code = 200
def auth_status(self, auth_no_access_code=403, auth_read_code=200, auth_admin_code=200):
self.auth_no_access_code = auth_no_access_code
self.auth_read_code = auth_read_code
self.auth_admin_code = auth_admin_code
return self
def request_status(self, anon_code=401, no_access_code=403, read_code=200, admin_code=200):
self.anon_code = anon_code
self.no_access_code = no_access_code
@ -290,170 +280,131 @@ def build_v2_index_specs():
return [
# v2.list_all_tags
IndexV2TestSpec('v2.list_all_tags', 'GET', PUBLIC_REPO).
auth_status(200, 200, 200).
request_status(200, 200, 200, 200),
IndexV2TestSpec('v2.list_all_tags', 'GET', PRIVATE_REPO).
auth_status(403, 200, 200).
request_status(401, 401, 200, 200),
IndexV2TestSpec('v2.list_all_tags', 'GET', ORG_REPO).
auth_status(403, 200, 200).
request_status(401, 401, 200, 200),
# v2.fetch_manifest_by_tagname
IndexV2TestSpec('v2.fetch_manifest_by_tagname', 'GET', PUBLIC_REPO, manifest_ref=FAKE_MANIFEST).
auth_status(200, 200, 200).
request_status(404, 404, 404, 404),
IndexV2TestSpec('v2.fetch_manifest_by_tagname', 'GET', PRIVATE_REPO, manifest_ref=FAKE_MANIFEST).
auth_status(403, 200, 200).
request_status(401, 401, 404, 404),
IndexV2TestSpec('v2.fetch_manifest_by_tagname', 'GET', ORG_REPO, manifest_ref=FAKE_MANIFEST).
auth_status(403, 200, 200).
request_status(401, 401, 404, 404),
# v2.fetch_manifest_by_digest
IndexV2TestSpec('v2.fetch_manifest_by_digest', 'GET', PUBLIC_REPO, manifest_ref=FAKE_DIGEST).
auth_status(200, 200, 200).
request_status(404, 404, 404, 404),
IndexV2TestSpec('v2.fetch_manifest_by_digest', 'GET', PRIVATE_REPO, manifest_ref=FAKE_DIGEST).
auth_status(403, 200, 200).
request_status(401, 401, 404, 404),
IndexV2TestSpec('v2.fetch_manifest_by_digest', 'GET', ORG_REPO, manifest_ref=FAKE_DIGEST).
auth_status(403, 200, 200).
request_status(401, 401, 404, 404),
# v2.write_manifest_by_tagname
IndexV2TestSpec('v2.write_manifest_by_tagname', 'PUT', PUBLIC_REPO, manifest_ref=FAKE_MANIFEST).
auth_status(403, 403, 403).
request_status(401, 401, 401, 401),
IndexV2TestSpec('v2.write_manifest_by_tagname', 'PUT', PRIVATE_REPO, manifest_ref=FAKE_MANIFEST).
auth_status(403, 403, 200).
request_status(401, 401, 401, 400),
IndexV2TestSpec('v2.write_manifest_by_tagname', 'PUT', ORG_REPO, manifest_ref=FAKE_MANIFEST).
auth_status(403, 403, 200).
request_status(401, 401, 401, 400),
# v2.write_manifest_by_digest
IndexV2TestSpec('v2.write_manifest_by_digest', 'PUT', PUBLIC_REPO, manifest_ref=FAKE_DIGEST).
auth_status(403, 403, 403).
request_status(401, 401, 401, 401),
IndexV2TestSpec('v2.write_manifest_by_digest', 'PUT', PRIVATE_REPO, manifest_ref=FAKE_DIGEST).
auth_status(403, 403, 200).
request_status(401, 401, 401, 400),
IndexV2TestSpec('v2.write_manifest_by_digest', 'PUT', ORG_REPO, manifest_ref=FAKE_DIGEST).
auth_status(403, 403, 200).
request_status(401, 401, 401, 400),
# v2.delete_manifest_by_digest
IndexV2TestSpec('v2.delete_manifest_by_digest', 'DELETE', PUBLIC_REPO, manifest_ref=FAKE_DIGEST).
auth_status(403, 403, 403).
request_status(401, 401, 401, 401),
IndexV2TestSpec('v2.delete_manifest_by_digest', 'DELETE', PRIVATE_REPO, manifest_ref=FAKE_DIGEST).
auth_status(403, 403, 200).
request_status(401, 401, 401, 404),
IndexV2TestSpec('v2.delete_manifest_by_digest', 'DELETE', ORG_REPO, manifest_ref=FAKE_DIGEST).
auth_status(403, 403, 200).
request_status(401, 401, 401, 404),
# v2.check_blob_exists
IndexV2TestSpec('v2.check_blob_exists', 'HEAD', PUBLIC_REPO, digest=FAKE_DIGEST).
auth_status(200, 200, 200).
request_status(404, 404, 404, 404),
IndexV2TestSpec('v2.check_blob_exists', 'HEAD', PRIVATE_REPO, digest=FAKE_DIGEST).
auth_status(403, 200, 200).
request_status(401, 401, 404, 404),
IndexV2TestSpec('v2.check_blob_exists', 'HEAD', ORG_REPO, digest=FAKE_DIGEST).
auth_status(403, 200, 200).
request_status(401, 401, 404, 404),
# v2.download_blob
IndexV2TestSpec('v2.download_blob', 'GET', PUBLIC_REPO, digest=FAKE_DIGEST).
auth_status(200, 200, 200).
request_status(404, 404, 404, 404),
IndexV2TestSpec('v2.download_blob', 'GET', PRIVATE_REPO, digest=FAKE_DIGEST).
auth_status(403, 200, 200).
request_status(401, 401, 404, 404),
IndexV2TestSpec('v2.download_blob', 'GET', ORG_REPO, digest=FAKE_DIGEST).
auth_status(403, 200, 200).
request_status(401, 401, 404, 404),
# v2.start_blob_upload
IndexV2TestSpec('v2.start_blob_upload', 'POST', PUBLIC_REPO).
auth_status(403, 403, 403).
request_status(401, 401, 401, 401),
IndexV2TestSpec('v2.start_blob_upload', 'POST', PRIVATE_REPO).
auth_status(403, 403, 200).
request_status(401, 401, 401, 202),
IndexV2TestSpec('v2.start_blob_upload', 'POST', ORG_REPO).
auth_status(403, 403, 200).
request_status(401, 401, 401, 202),
# v2.fetch_existing_upload
IndexV2TestSpec('v2.fetch_existing_upload', 'GET', PUBLIC_REPO, 'push,pull', upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 403).
request_status(401, 401, 401, 401),
IndexV2TestSpec('v2.fetch_existing_upload', 'GET', PRIVATE_REPO, 'push,pull', upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 200).
request_status(401, 401, 401, 404),
IndexV2TestSpec('v2.fetch_existing_upload', 'GET', ORG_REPO, 'push,pull', upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 200).
request_status(401, 401, 401, 404),
# v2.upload_chunk
IndexV2TestSpec('v2.upload_chunk', 'PATCH', PUBLIC_REPO, upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 403).
request_status(401, 401, 401, 401),
IndexV2TestSpec('v2.upload_chunk', 'PATCH', PRIVATE_REPO, upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 200).
request_status(401, 401, 401, 404),
IndexV2TestSpec('v2.upload_chunk', 'PATCH', ORG_REPO, upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 200).
request_status(401, 401, 401, 404),
# v2.monolithic_upload_or_last_chunk
IndexV2TestSpec('v2.monolithic_upload_or_last_chunk', 'PUT', PUBLIC_REPO, upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 403).
request_status(401, 401, 401, 401),
IndexV2TestSpec('v2.monolithic_upload_or_last_chunk', 'PUT', PRIVATE_REPO, upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 200).
request_status(401, 401, 401, 400),
IndexV2TestSpec('v2.monolithic_upload_or_last_chunk', 'PUT', ORG_REPO, upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 200).
request_status(401, 401, 401, 400),
# v2.cancel_upload
IndexV2TestSpec('v2.cancel_upload', 'DELETE', PUBLIC_REPO, upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 403).
request_status(401, 401, 401, 401),
IndexV2TestSpec('v2.cancel_upload', 'DELETE', PRIVATE_REPO, upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 200).
request_status(401, 401, 401, 404),
IndexV2TestSpec('v2.cancel_upload', 'DELETE', ORG_REPO, upload_uuid=FAKE_UPLOAD_ID).
auth_status(403, 403, 200).
request_status(401, 401, 401, 404),
]

View file

@ -31,7 +31,6 @@ class _SpecTestBuilder(type):
expected_index_status = getattr(test_spec, attrs['result_attr'])
if attrs['auth_username']:
expected_auth_status = getattr(test_spec, 'auth_' + attrs['result_attr'])
# Get a signed JWT.
username = attrs['auth_username']
@ -44,12 +43,11 @@ class _SpecTestBuilder(type):
headers=[('authorization', test_spec.gen_basic_auth(username, password))],
query_string=query_string)
msg = 'Auth failed for %s %s: got %s, expected: %s' % (
test_spec.method_name, test_spec.index_name, arv.status_code, expected_auth_status)
self.assertEqual(arv.status_code, expected_auth_status, msg)
msg = 'Auth failed for %s %s: got %s, expected: 200' % (
test_spec.method_name, test_spec.index_name, arv.status_code)
self.assertEqual(arv.status_code, 200, msg)
if arv.status_code == 200:
headers = [('authorization', 'Bearer ' + json.loads(arv.data)['token'])]
headers = [('authorization', 'Bearer ' + json.loads(arv.data)['token'])]
rv = c.open(url, headers=headers, method=test_spec.method_name)
msg = '%s %s: got %s, expected: %s (auth: %s | headers %s)' % (test_spec.method_name,