This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/registry/protocol_v1.py
2018-09-24 12:54:56 -04:00

210 lines
7.7 KiB
Python

import json
from cStringIO import StringIO
from enum import Enum, unique
from digest.checksums import compute_simple
from test.registry.protocols import (RegistryProtocol, Failures, ProtocolOptions, PushResult,
PullResult)
@unique
class V1ProtocolSteps(Enum):
""" Defines the various steps of the protocol, for matching failures. """
PUT_IMAGES = 'put-images'
GET_IMAGES = 'get-images'
PUT_TAG = 'put-tag'
PUT_IMAGE_JSON = 'put-image-json'
DELETE_TAG = 'delete-tag'
class V1Protocol(RegistryProtocol):
FAILURE_CODES = {
V1ProtocolSteps.PUT_IMAGES: {
Failures.UNAUTHENTICATED: 401,
Failures.UNAUTHORIZED: 403,
Failures.APP_REPOSITORY: 405,
Failures.SLASH_REPOSITORY: 404,
Failures.INVALID_REPOSITORY: 400,
Failures.DISALLOWED_LIBRARY_NAMESPACE: 400,
Failures.NAMESPACE_DISABLED: 400,
},
V1ProtocolSteps.GET_IMAGES: {
Failures.UNAUTHENTICATED: 403,
Failures.UNAUTHORIZED: 403,
Failures.APP_REPOSITORY: 405,
Failures.ANONYMOUS_NOT_ALLOWED: 401,
Failures.DISALLOWED_LIBRARY_NAMESPACE: 400,
Failures.NAMESPACE_DISABLED: 400,
},
V1ProtocolSteps.PUT_IMAGE_JSON: {
Failures.INVALID_IMAGES: 400,
},
V1ProtocolSteps.PUT_TAG: {
Failures.MISSING_TAG: 404,
Failures.INVALID_TAG: 400,
Failures.INVALID_IMAGES: 400,
Failures.NAMESPACE_DISABLED: 400,
},
}
def __init__(self, jwk):
pass
def _auth_for_credentials(self, credentials):
if credentials is None:
return None
return credentials
def ping(self, session):
assert session.get('/v1/_ping').status_code == 200
def login(self, session, username, password, scopes, expect_success):
data = {
'username': username,
'password': password,
}
response = self.conduct(session, 'POST', '/v1/users/', json_data=data, expected_status=400)
assert (response.text == '"Username or email already exists"') == expect_success
def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
options = options or ProtocolOptions()
auth = self._auth_for_credentials(credentials)
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
prefix = '/v1/repositories/%s/' % self.repo_name(namespace, repo_name)
# Ping!
self.ping(session)
# GET /v1/repositories/{namespace}/{repository}/images
headers = {'X-Docker-Token': 'true'}
result = self.conduct(session, 'GET', prefix + 'images', auth=auth, headers=headers,
expected_status=(200, expected_failure, V1ProtocolSteps.GET_IMAGES))
if result.status_code != 200:
return
headers = {}
if credentials is not None:
headers['Authorization'] = 'token ' + result.headers['www-authenticate']
else:
assert not 'www-authenticate' in result.headers
# GET /v1/repositories/{namespace}/{repository}/tags
image_ids = self.conduct(session, 'GET', prefix + 'tags', headers=headers).json()
assert len(image_ids.values()) == len(tag_names)
for tag_name in tag_names:
if tag_name not in image_ids:
assert expected_failure == Failures.UNKNOWN_TAG
return None
tag_image_id = image_ids[tag_name]
if not options.munge_shas:
# Ensure we have a matching image ID.
known_ids = {image.id for image in images}
assert tag_image_id in known_ids
# Retrieve the ancestry of the tagged image.
image_prefix = '/v1/images/%s/' % tag_image_id
ancestors = self.conduct(session, 'GET', image_prefix + 'ancestry', headers=headers).json()
assert len(ancestors) == len(images)
for index, image_id in enumerate(reversed(ancestors)):
# /v1/images/{imageID}/{ancestry, json, layer}
image_prefix = '/v1/images/%s/' % image_id
self.conduct(session, 'GET', image_prefix + 'ancestry', headers=headers)
result = self.conduct(session, 'GET', image_prefix + 'json', headers=headers)
assert result.json()['id'] == image_id
# Ensure we can HEAD the image layer.
self.conduct(session, 'HEAD', image_prefix + 'layer', headers=headers)
# And retrieve the layer data.
result = self.conduct(session, 'GET', image_prefix + 'layer', headers=headers)
assert result.content == images[index].bytes
return PullResult(manifests=None, image_ids=image_ids)
def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
auth = self._auth_for_credentials(credentials)
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
# Ping!
self.ping(session)
# PUT /v1/repositories/{namespace}/{repository}/
result = self.conduct(session, 'PUT',
'/v1/repositories/%s/' % self.repo_name(namespace, repo_name),
expected_status=(201, expected_failure, V1ProtocolSteps.PUT_IMAGES),
json_data={},
auth=auth)
if result.status_code != 201:
return
headers = {}
headers['Authorization'] = 'token ' + result.headers['www-authenticate']
for image in images:
# PUT /v1/images/{imageID}/json
image_json_data = {'id': image.id}
if image.size is not None:
image_json_data['Size'] = image.size
if image.parent_id is not None:
image_json_data['parent'] = image.parent_id
if image.config is not None:
image_json_data['config'] = image.config
response = self.conduct(session, 'PUT', '/v1/images/%s/json' % image.id,
json_data=image_json_data, headers=headers,
expected_status=(200, expected_failure,
V1ProtocolSteps.PUT_IMAGE_JSON))
if response.status_code != 200:
break
# PUT /v1/images/{imageID}/layer
self.conduct(session, 'PUT', '/v1/images/%s/layer' % image.id,
data=StringIO(image.bytes), headers=headers)
# PUT /v1/images/{imageID}/checksum
checksum = compute_simple(StringIO(image.bytes), json.dumps(image_json_data))
checksum_headers = {'X-Docker-Checksum-Payload': checksum}
checksum_headers.update(headers)
self.conduct(session, 'PUT', '/v1/images/%s/checksum' % image.id,
headers=checksum_headers)
# PUT /v1/repositories/{namespace}/{repository}/tags/latest
for tag_name in tag_names:
self.conduct(session, 'PUT',
'/v1/repositories/%s/tags/%s' % (self.repo_name(namespace, repo_name), tag_name),
data='"%s"' % images[-1].id,
headers=headers,
expected_status=(200, expected_failure, V1ProtocolSteps.PUT_TAG))
# PUT /v1/repositories/{namespace}/{repository}/images
self.conduct(session, 'PUT',
'/v1/repositories/%s/images' % self.repo_name(namespace, repo_name),
expected_status=204, headers=headers)
return PushResult(checksums=None, manifests=None, headers=headers)
def delete(self, session, namespace, repo_name, tag_names, credentials=None,
expected_failure=None, options=None):
auth = self._auth_for_credentials(credentials)
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
# Ping!
self.ping(session)
for tag_name in tag_names:
# DELETE /v1/repositories/{namespace}/{repository}/tags/{tag}
self.conduct(session, 'DELETE',
'/v1/repositories/%s/tags/%s' % (self.repo_name(namespace, repo_name), tag_name),
auth=auth,
expected_status=(200, expected_failure, V1ProtocolSteps.DELETE_TAG))