Add additional logs and an additional test for verbs
This commit is contained in:
parent
b2db266747
commit
a706d99849
2 changed files with 26 additions and 2 deletions
|
@ -172,14 +172,20 @@ def _verify_repo_verb(_, namespace, repo_name, tag, verb, checker=None):
|
|||
repo = model.get_repository(namespace, repo_name)
|
||||
repo_is_public = repo is not None and repo.is_public
|
||||
if not permission.can() and not repo_is_public:
|
||||
logger.debug('No permission to read repository %s/%s for user %s with verb %s', namespace,
|
||||
repo_name, get_authenticated_user(), verb)
|
||||
abort(403)
|
||||
|
||||
# Lookup the requested tag.
|
||||
tag_image = model.get_tag_image(namespace, repo_name, tag)
|
||||
if tag_image is None:
|
||||
logger.debug('Tag %s does not exist in repository %s/%s for user %s', tag, namespace, repo_name,
|
||||
get_authenticated_user())
|
||||
abort(404)
|
||||
|
||||
if repo is not None and repo.kind != 'image':
|
||||
logger.debug('Repository %s/%s for user %s is not an image repo', namespace, repo_name,
|
||||
get_authenticated_user())
|
||||
abort(405)
|
||||
|
||||
# If there is a data checker, call it first.
|
||||
|
@ -215,6 +221,8 @@ def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwarg
|
|||
|
||||
def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None, **kwargs):
|
||||
# Verify that the image exists and that we have access to it.
|
||||
logger.debug('Verifying repo verb %s for repository %s/%s with user %s with mimetype %s',
|
||||
verb, namespace, repository, get_authenticated_user(), request.accept_mimetypes.best)
|
||||
repo_image = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
|
||||
|
||||
# Check for torrent. If found, we return a torrent for the repo verb image (if the derived
|
||||
|
|
|
@ -2180,11 +2180,27 @@ class ACIConversionTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerT
|
|||
class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestCase):
|
||||
""" Tests for registry squashing. """
|
||||
|
||||
def get_squashed_image(self):
|
||||
response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth='sig')
|
||||
def get_squashed_image(self, auth='sig'):
|
||||
response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth=auth)
|
||||
tar = tarfile.open(fileobj=StringIO(response.content))
|
||||
return tar, response.content
|
||||
|
||||
def test_squashed_with_credentials(self):
|
||||
initial_images = [
|
||||
{
|
||||
'id': 'initialid',
|
||||
'contents': 'the initial image',
|
||||
},
|
||||
]
|
||||
|
||||
# Create the repo.
|
||||
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images)
|
||||
initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc'
|
||||
|
||||
# Pull the squashed version of the tag.
|
||||
tar, _ = self.get_squashed_image(auth=('devtable', 'password'))
|
||||
self.assertTrue(initial_image_id in tar.getnames())
|
||||
|
||||
def test_squashed_changes(self):
|
||||
initial_images = [
|
||||
{
|
||||
|
|
Reference in a new issue