This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/registry/protocol_v2.py
Joseph Schorr 23c19bcbc1 Implement support for registry integration tests via py.test
This change implements support for registry integration tests using the new py.test-based live server test fixture. We can now parametrize the protocols we use (in prep for V2_2), and it makes the code *much* cleaner and less hacky.

Note that moving the vast majority of the tests over from the existing impl will come as a followup PR
2018-05-01 13:28:24 +03:00

246 lines
9.1 KiB
Python

import hashlib
import json
from enum import Enum, unique
from image.docker.schema1 import DockerSchema1ManifestBuilder, DockerSchema1Manifest
from test.registry.protocols import (RegistryProtocol, Failures, ProtocolOptions, PushResult,
PullResult)
@unique
class V2ProtocolSteps(Enum):
""" Defines the various steps of the protocol, for matching failures. """
AUTH = 'auth'
BLOB_HEAD_CHECK = 'blob-head-check'
GET_MANIFEST = 'get-manifest'
class V2Protocol(RegistryProtocol):
FAILURE_CODES = {
V2ProtocolSteps.AUTH: {
Failures.UNAUTHENTICATED: 401,
Failures.UNAUTHORIZED: 403,
Failures.INVALID_REGISTRY: 400,
Failures.APP_REPOSITORY: 405,
},
V2ProtocolSteps.GET_MANIFEST: {
Failures.UNKNOWN_TAG: 404,
},
}
def __init__(self, jwk):
self.jwk = jwk
def ping(self, session):
result = session.get('/v2/')
assert result.status_code == 401
assert result.headers['Docker-Distribution-API-Version'] == 'registry/2.0'
def auth(self, session, credentials, namespace, repository, scopes=None,
expected_failure=None):
"""
Performs the V2 Auth flow, returning the token (if any) and the response.
Spec: https://docs.docker.com/registry/spec/auth/token/
"""
scopes = scopes or []
auth = None
if credentials is not None:
username, _ = credentials
auth = credentials
params = {
'account': username,
'service': 'localhost:5000',
}
if scopes:
params['scope'] = 'repository:%s/%s:%s' % (namespace, repository, ','.join(scopes))
response = self.conduct(session, 'GET', '/v2/auth', params=params, auth=auth,
expected_status=(200, expected_failure, V2ProtocolSteps.AUTH))
if expected_failure is None:
assert response.json().get('token') is not None
return response.json().get('token'), response
return None, response
def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
options = options or ProtocolOptions()
scopes = options.scopes or ['push', 'pull']
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
# Ping!
self.ping(session)
# Perform auth and retrieve a token.
token, _ = self.auth(session, credentials, namespace, repo_name, scopes=scopes,
expected_failure=expected_failure)
if token is None:
return
headers = {
'Authorization': 'Bearer ' + token,
}
# Build fake manifests.
manifests = {}
for tag_name in tag_names:
builder = DockerSchema1ManifestBuilder(namespace, repo_name, tag_name)
for image in reversed(images):
checksum = 'sha256:' + hashlib.sha256(image.bytes).hexdigest()
# If invalid blob references were requested, just make it up.
if options.manifest_invalid_blob_references:
checksum = 'sha256:' + hashlib.sha256('notarealthing').hexdigest()
builder.add_layer(checksum, json.dumps({'id': image.id, 'parent': image.parent_id}))
# Build the manifest.
manifests[tag_name] = builder.build(self.jwk)
# Push the layer data.
checksums = {}
for image in reversed(images):
checksum = 'sha256:' + hashlib.sha256(image.bytes).hexdigest()
checksums[image.id] = checksum
# Layer data should not yet exist.
self.conduct(session, 'HEAD', '/v2/%s/%s/blobs/%s' % (namespace, repo_name, checksum),
expected_status=(404, expected_failure, V2ProtocolSteps.BLOB_HEAD_CHECK),
headers=headers)
# Start a new upload of the layer data.
response = self.conduct(session, 'POST', '/v2/%s/%s/blobs/uploads/' % (namespace, repo_name),
expected_status=202,
headers=headers)
upload_uuid = response.headers['Docker-Upload-UUID']
new_upload_location = response.headers['Location']
assert new_upload_location.startswith('http://localhost:5000')
# We need to make this relative just for the tests because the live server test
# case modifies the port.
location = response.headers['Location'][len('http://localhost:5000'):]
# PATCH the image data into the layer.
if options.chunks_for_upload is None:
self.conduct(session, 'PATCH', location, data=image.bytes, expected_status=204,
headers=headers)
else:
# If chunked upload is requested, upload the data as a series of chunks, checking
# status at every point.
for chunk_data in options.chunks_for_upload:
if len(chunk_data) == 3:
(start_byte, end_byte, expected_code) = chunk_data
else:
(start_byte, end_byte) = chunk_data
expected_code = 204
patch_headers = {'Range': 'bytes=%s-%s' % (start_byte, end_byte)}
patch_headers.update(headers)
contents_chunk = image.bytes[start_byte:end_byte]
self.conduct(session, 'PATCH', location, data=contents_chunk,
expected_status=expected_code,
headers=patch_headers)
if expected_code != 204:
return
# Retrieve the upload status at each point, and ensure it is valid.
status_url = '/v2/%s/%s/blobs/uploads/%s' % (namespace, repo_name, upload_uuid)
response = self.conduct(session, 'GET', status_url, expected_status=204, headers=headers)
assert response.headers['Docker-Upload-UUID'] == upload_uuid
assert response.headers['Range'] == "bytes=0-%s" % end_byte
if options.cancel_blob_upload:
self.conduct(session, 'DELETE', location, params=dict(digest=checksum), expected_status=204,
headers=headers)
# Ensure the upload was canceled.
status_url = '/v2/%s/%s/blobs/uploads/%s' % (namespace, repo_name, upload_uuid)
self.conduct(session, 'GET', status_url, expected_status=404, headers=headers)
return
# Finish the layer upload with a PUT.
response = self.conduct(session, 'PUT', location, params=dict(digest=checksum),
expected_status=201, headers=headers)
assert response.headers['Docker-Content-Digest'] == checksum
# Ensure the layer exists now.
response = self.conduct(session, 'HEAD',
'/v2/%s/%s/blobs/%s' % (namespace, repo_name, checksum),
expected_status=200, headers=headers)
assert response.headers['Docker-Content-Digest'] == checksum
assert response.headers['Content-Length'] == str(len(image.bytes))
# And retrieve the layer data.
result = self.conduct(session, 'GET', '/v2/%s/%s/blobs/%s' % (namespace, repo_name, checksum),
headers=headers, expected_status=200)
assert result.content == image.bytes
# Write a manifest for each tag.
for tag_name in tag_names:
manifest = manifests[tag_name]
# Write the manifest. If we expect it to be invalid, we expect a 404 code. Otherwise, we
# expect a 202 response for success.
put_code = 404 if options.manifest_invalid_blob_references else 202
manifest_headers = {'Content-Type': 'application/json'}
manifest_headers.update(headers)
self.conduct(session, 'PUT', '/v2/%s/%s/manifests/%s' % (namespace, repo_name, tag_name),
data=manifest.bytes, expected_status=put_code,
headers=manifest_headers)
return PushResult(checksums=checksums, manifests=manifests)
def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
options = options or ProtocolOptions()
scopes = options.scopes or ['pull']
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
# Ping!
self.ping(session)
# Perform auth and retrieve a token.
token, _ = self.auth(session, credentials, namespace, repo_name, scopes=scopes,
expected_failure=expected_failure)
if token is None:
return None
headers = {
'Authorization': 'Bearer ' + token,
}
manifests = {}
for tag_name in tag_names:
# Retrieve the manifest for the tag or digest.
response = self.conduct(session, 'GET',
'/v2/%s/%s/manifests/%s' % (namespace, repo_name, tag_name),
expected_status=(200, expected_failure, V2ProtocolSteps.GET_MANIFEST),
headers=headers)
if expected_failure is not None:
return None
# Ensure the manifest returned by us is valid.
manifest = DockerSchema1Manifest(response.text)
manifests[tag_name] = manifest
# Verify the layers.
for index, layer in enumerate(manifest.layers):
result = self.conduct(session, 'GET',
'/v2/%s/%s/blobs/%s' % (namespace, repo_name, layer.digest),
expected_status=200,
headers=headers)
assert result.content == images[index].bytes
return PullResult(manifests=manifests)