913 lines
31 KiB
Python
913 lines
31 KiB
Python
import unittest
|
|
import requests
|
|
import os
|
|
import math
|
|
import random
|
|
import string
|
|
|
|
import Crypto.Random
|
|
from flask import request, jsonify
|
|
from flask.blueprints import Blueprint
|
|
from flask.ext.testing import LiveServerTestCase
|
|
|
|
from app import app
|
|
from data.database import close_db_filter, configure
|
|
from endpoints.v1 import v1_bp
|
|
from endpoints.v2 import v2_bp
|
|
from endpoints.v2.manifest import SignedManifestBuilder
|
|
from endpoints.api import api_bp
|
|
from initdb import wipe_database, initialize_database, populate_database
|
|
from endpoints.csrf import generate_csrf_token
|
|
from tempfile import NamedTemporaryFile
|
|
from jsonschema import validate as validate_schema
|
|
|
|
import endpoints.decorated
|
|
import json
|
|
import features
|
|
import hashlib
|
|
import logging
|
|
|
|
import tarfile
|
|
import shutil
|
|
|
|
from jwkest.jws import SIGNER_ALGS
|
|
from jwkest.jwk import RSAKey
|
|
from Crypto.PublicKey import RSA
|
|
|
|
from cStringIO import StringIO
|
|
from digest.checksums import compute_simple
|
|
|
|
try:
|
|
app.register_blueprint(v1_bp, url_prefix='/v1')
|
|
app.register_blueprint(v2_bp, url_prefix='/v2')
|
|
app.register_blueprint(api_bp, url_prefix='/api')
|
|
except ValueError:
|
|
# Blueprint was already registered
|
|
pass
|
|
|
|
|
|
# Add a test blueprint for generating CSRF tokens, setting feature flags and reloading the
|
|
# DB connection.
|
|
|
|
testbp = Blueprint('testbp', __name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@testbp.route('/csrf', methods=['GET'])
|
|
def generate_csrf():
|
|
return generate_csrf_token()
|
|
|
|
@testbp.route('/feature/<feature_name>', methods=['POST'])
|
|
def set_feature(feature_name):
|
|
import features
|
|
old_value = features._FEATURES[feature_name].value
|
|
features._FEATURES[feature_name].value = request.get_json()['value']
|
|
return jsonify({'old_value': old_value})
|
|
|
|
@testbp.route('/reloadapp', methods=['POST'])
|
|
def reload_app():
|
|
# Close any existing connection.
|
|
close_db_filter(None)
|
|
|
|
# Reload the database config.
|
|
configure(app.config)
|
|
|
|
# Reload random after the process split, as it cannot be used uninitialized across forks.
|
|
Crypto.Random.atfork()
|
|
return 'OK'
|
|
|
|
app.register_blueprint(testbp, url_prefix='/__test')
|
|
|
|
|
|
class TestFeature(object):
|
|
""" Helper object which temporarily sets the value of a feature flag.
|
|
"""
|
|
|
|
def __init__(self, test_case, feature_flag, test_value):
|
|
self.test_case = test_case
|
|
self.feature_flag = feature_flag
|
|
self.test_value = test_value
|
|
self.old_value = None
|
|
|
|
def __enter__(self):
|
|
result = self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
|
|
data=json.dumps(dict(value=self.test_value)),
|
|
headers={'Content-Type': 'application/json'})
|
|
|
|
result_data = json.loads(result.text)
|
|
self.old_value = result_data['old_value']
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
|
|
data=json.dumps(dict(value=self.old_value)),
|
|
headers={'Content-Type': 'application/json'})
|
|
|
|
|
|
_PORT_NUMBER = 5001
|
|
_CLEAN_DATABASE_PATH = None
|
|
_JWK = RSAKey(key=RSA.generate(2048))
|
|
|
|
|
|
def get_new_database_uri():
|
|
# If a clean copy of the database has not yet been created, create one now.
|
|
global _CLEAN_DATABASE_PATH
|
|
if not _CLEAN_DATABASE_PATH:
|
|
wipe_database()
|
|
initialize_database()
|
|
populate_database()
|
|
close_db_filter(None)
|
|
|
|
# Save the path of the clean database.
|
|
_CLEAN_DATABASE_PATH = app.config['TEST_DB_FILE'].name
|
|
|
|
# Create a new temp file to be used as the actual backing database for the test.
|
|
# Note that we have the close() the file to ensure we can copy to it via shutil.
|
|
local_db_file = NamedTemporaryFile(delete=True)
|
|
local_db_file.close()
|
|
|
|
# Copy the clean database to the path.
|
|
shutil.copy2(_CLEAN_DATABASE_PATH, local_db_file.name)
|
|
return 'sqlite:///{0}'.format(local_db_file.name)
|
|
|
|
|
|
class RegistryTestCaseMixin(LiveServerTestCase):
|
|
def create_app(self):
|
|
global _PORT_NUMBER
|
|
_PORT_NUMBER = _PORT_NUMBER + 1
|
|
|
|
if os.environ.get('DEBUG') == 'true':
|
|
app.config['DEBUG'] = True
|
|
|
|
app.config['TESTING'] = True
|
|
app.config['LIVESERVER_PORT'] = _PORT_NUMBER
|
|
app.config['DB_URI'] = get_new_database_uri()
|
|
return app
|
|
|
|
def setUp(self):
|
|
self.clearSession()
|
|
|
|
# Tell the remote running app to reload the database and app. By default, the app forks from the
|
|
# current context and has already loaded the DB config with the *original* DB URL. We call
|
|
# the remote reload method to force it to pick up the changes to DB_URI set in the create_app
|
|
# method.
|
|
self.conduct('POST', '/__test/reloadapp')
|
|
|
|
def clearSession(self):
|
|
self.session = requests.Session()
|
|
self.signature = None
|
|
self.docker_token = 'true'
|
|
self.jwt = None
|
|
|
|
# Load the CSRF token.
|
|
self.csrf_token = ''
|
|
self.csrf_token = self.conduct('GET', '/__test/csrf').text
|
|
|
|
def do_tag(self, namespace, repository, tag, image_id, expected_code=200):
|
|
self.conduct('PUT', '/v1/repositories/%s/%s/tags/%s' % (namespace, repository, tag),
|
|
data='"%s"' % image_id, expected_code=expected_code, auth='sig')
|
|
|
|
def conduct_api_login(self, username, password):
|
|
self.conduct('POST', '/api/v1/signin',
|
|
data=json.dumps(dict(username=username, password=password)),
|
|
headers={'Content-Type': 'application/json'})
|
|
|
|
def change_repo_visibility(self, repository, namespace, visibility):
|
|
self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace),
|
|
data=json.dumps(dict(visibility=visibility)),
|
|
headers={'Content-Type': 'application/json'})
|
|
|
|
|
|
class BaseRegistryMixin(object):
|
|
def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200):
|
|
params = params or {}
|
|
params['_csrf_token'] = self.csrf_token
|
|
|
|
headers = headers or {}
|
|
auth_tuple = None
|
|
|
|
if self.docker_token:
|
|
headers['X-Docker-Token'] = self.docker_token
|
|
|
|
if auth == 'sig':
|
|
if self.signature:
|
|
headers['Authorization'] = 'token ' + self.signature
|
|
elif auth == 'jwt':
|
|
if self.jwt:
|
|
headers['Authorization'] = 'Bearer ' + self.jwt
|
|
elif auth:
|
|
auth_tuple = auth
|
|
|
|
response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
|
|
auth=auth_tuple, params=params)
|
|
if response.status_code != expected_code:
|
|
print response.text
|
|
|
|
if 'www-authenticate' in response.headers:
|
|
self.signature = response.headers['www-authenticate']
|
|
|
|
if 'X-Docker-Token' in response.headers:
|
|
self.docker_token = response.headers['X-Docker-Token']
|
|
|
|
self.assertEquals(response.status_code, expected_code)
|
|
return response
|
|
|
|
def _get_default_images(self):
|
|
return [{'id': 'someid', 'contents': 'somecontent'}]
|
|
|
|
|
|
class V1RegistryMixin(BaseRegistryMixin):
|
|
def v1_ping(self):
|
|
self.conduct('GET', '/v1/_ping')
|
|
|
|
|
|
class V1RegistryPushMixin(V1RegistryMixin):
|
|
def do_push(self, namespace, repository, username, password, images=None, expected_code=201):
|
|
images = images or self._get_default_images()
|
|
auth = (username, password)
|
|
|
|
# Ping!
|
|
self.v1_ping()
|
|
|
|
# PUT /v1/repositories/{namespace}/{repository}/
|
|
self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
|
|
data=json.dumps(images), auth=auth,
|
|
expected_code=expected_code)
|
|
|
|
if expected_code != 201:
|
|
return
|
|
|
|
last_image_id = None
|
|
for image_data in images:
|
|
image_id = image_data['id']
|
|
last_image_id = image_id
|
|
|
|
# PUT /v1/images/{imageID}/json
|
|
self.conduct('PUT', '/v1/images/%s/json' % image_id,
|
|
data=json.dumps({'id': image_id}), auth='sig')
|
|
|
|
# PUT /v1/images/{imageID}/layer
|
|
tar_file_info = tarfile.TarInfo(name='image_name')
|
|
tar_file_info.type = tarfile.REGTYPE
|
|
tar_file_info.size = len(image_id)
|
|
|
|
layer_data = StringIO()
|
|
|
|
tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
|
|
tar_file.addfile(tar_file_info, StringIO(image_id))
|
|
tar_file.close()
|
|
|
|
layer_bytes = layer_data.getvalue()
|
|
layer_data.close()
|
|
|
|
self.conduct('PUT', '/v1/images/%s/layer' % image_id,
|
|
data=StringIO(layer_bytes), auth='sig')
|
|
|
|
# PUT /v1/images/{imageID}/checksum
|
|
checksum = compute_simple(StringIO(layer_bytes), json.dumps({'id': image_id}))
|
|
self.conduct('PUT', '/v1/images/%s/checksum' % image_id,
|
|
headers={'X-Docker-Checksum-Payload': checksum},
|
|
auth='sig')
|
|
|
|
|
|
# PUT /v1/repositories/{namespace}/{repository}/tags/latest
|
|
self.do_tag(namespace, repository, 'latest', images[0]['id'])
|
|
|
|
# PUT /v1/repositories/{namespace}/{repository}/images
|
|
self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
|
|
expected_code=204,
|
|
auth='sig')
|
|
|
|
|
|
class V1RegistryPullMixin(V1RegistryMixin):
|
|
def do_pull(self, namespace, repository, username=None, password='password', expected_code=200,
|
|
images=None):
|
|
images = images or self._get_default_images()
|
|
|
|
auth = None
|
|
if username:
|
|
auth = (username, password)
|
|
|
|
# Ping!
|
|
self.v1_ping()
|
|
|
|
prefix = '/v1/repositories/%s/%s/' % (namespace, repository)
|
|
|
|
# GET /v1/repositories/{namespace}/{repository}/
|
|
self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
|
|
if expected_code != 200:
|
|
return
|
|
|
|
# GET /v1/repositories/{namespace}/{repository}/
|
|
result = json.loads(self.conduct('GET', prefix + 'tags', auth='sig').text)
|
|
|
|
self.assertEquals(len(images), len(result.values()))
|
|
|
|
for image_data in images:
|
|
image_id = image_data['id']
|
|
self.assertIn(image_id, result.values())
|
|
|
|
# /v1/images/{imageID}/{ancestry, json, layer}
|
|
image_prefix = '/v1/images/%s/' % image_id
|
|
self.conduct('GET', image_prefix + 'ancestry', auth='sig')
|
|
self.conduct('GET', image_prefix + 'json', auth='sig')
|
|
self.conduct('GET', image_prefix + 'layer', auth='sig')
|
|
|
|
|
|
|
|
class V2RegistryMixin(BaseRegistryMixin):
|
|
MANIFEST_SCHEMA = {
|
|
'type': 'object',
|
|
'properties': {
|
|
'name': {
|
|
'type': 'string',
|
|
},
|
|
'tag': {
|
|
'type': 'string',
|
|
},
|
|
'signatures': {
|
|
'type': 'array',
|
|
'itemType': {
|
|
'type': 'object',
|
|
},
|
|
},
|
|
'fsLayers': {
|
|
'type': 'array',
|
|
'itemType': {
|
|
'type': 'object',
|
|
'properties': {
|
|
'blobSum': {
|
|
'type': 'string',
|
|
},
|
|
},
|
|
'required': 'blobSum',
|
|
},
|
|
},
|
|
'history': {
|
|
'type': 'array',
|
|
'itemType': {
|
|
'type': 'object',
|
|
'properties': {
|
|
'v1Compatibility': {
|
|
'type': 'object',
|
|
},
|
|
},
|
|
'required': ['v1Compatibility'],
|
|
},
|
|
},
|
|
},
|
|
'required': ['name', 'tag', 'fsLayers', 'history', 'signatures'],
|
|
}
|
|
|
|
def v2_ping(self):
|
|
response = self.conduct('GET', '/v2/', expected_code=200 if self.jwt else 401, auth='jwt')
|
|
self.assertEquals(response.headers['Docker-Distribution-API-Version'], 'registry/2.0')
|
|
|
|
|
|
def do_auth(self, username, password, namespace, repository, expected_code=200, scopes=[]):
|
|
auth = (username, password)
|
|
params = {
|
|
'account': username,
|
|
'scope': 'repository:%s/%s:%s' % (namespace, repository, ','.join(scopes)),
|
|
'service': app.config['SERVER_HOSTNAME'],
|
|
}
|
|
|
|
response = self.conduct('GET', '/v2/auth', params=params, auth=(username, password),
|
|
expected_code=expected_code)
|
|
|
|
if expected_code == 200:
|
|
response_json = json.loads(response.text)
|
|
self.assertIsNotNone(response_json.get('token'))
|
|
self.jwt = response_json['token']
|
|
|
|
return response
|
|
|
|
|
|
class V2RegistryPushMixin(V2RegistryMixin):
|
|
def do_push(self, namespace, repository, username, password, images=None, tag_name=None,
|
|
cancel=False, invalid=False, expected_manifest_code=202, expected_auth_code=200):
|
|
images = images or self._get_default_images()
|
|
|
|
# Ping!
|
|
self.v2_ping()
|
|
|
|
# Auth.
|
|
self.do_auth(username, password, namespace, repository, scopes=['push', 'pull'],
|
|
expected_code=expected_auth_code)
|
|
|
|
if expected_auth_code != 200:
|
|
return
|
|
|
|
# Build a fake manifest.
|
|
tag_name = tag_name or 'latest'
|
|
builder = SignedManifestBuilder(namespace, repository, tag_name)
|
|
for image_data in images:
|
|
checksum = 'sha256:' + hashlib.sha256(image_data['contents']).hexdigest()
|
|
if invalid:
|
|
checksum = 'sha256:' + hashlib.sha256('foobarbaz').hexdigest()
|
|
|
|
builder.add_layer(checksum, json.dumps(image_data))
|
|
|
|
# Build the manifest.
|
|
manifest = builder.build(_JWK)
|
|
|
|
# Push the image's layers.
|
|
checksums = {}
|
|
for image_data in images:
|
|
image_id = image_data['id']
|
|
full_contents = image_data['contents']
|
|
chunks = image_data.get('chunks')
|
|
|
|
# Layer data should not yet exist.
|
|
checksum = 'sha256:' + hashlib.sha256(full_contents).hexdigest()
|
|
self.conduct('HEAD', '/v2/%s/%s/blobs/%s' % (namespace, repository, checksum),
|
|
expected_code=404, auth='jwt')
|
|
|
|
# Start a new upload of the layer data.
|
|
response = self.conduct('POST', '/v2/%s/%s/blobs/uploads/' % (namespace, repository),
|
|
expected_code=202, auth='jwt')
|
|
|
|
upload_uuid = response.headers['Docker-Upload-UUID']
|
|
location = response.headers['Location'][len(self.get_server_url()):]
|
|
|
|
# PATCH the image data into the layer.
|
|
if chunks is None:
|
|
self.conduct('PATCH', location, data=full_contents, expected_code=204, auth='jwt')
|
|
else:
|
|
for chunk in chunks:
|
|
if len(chunk) == 3:
|
|
(start_byte, end_byte, expected_code) = chunk
|
|
else:
|
|
(start_byte, end_byte) = chunk
|
|
expected_code = 204
|
|
|
|
contents_chunk = full_contents[start_byte:end_byte]
|
|
self.conduct('PATCH', location, data=contents_chunk, expected_code=expected_code, auth='jwt',
|
|
headers={'Range': 'bytes=%s-%s' % (start_byte, end_byte)})
|
|
|
|
if expected_code != 204:
|
|
return
|
|
|
|
# Retrieve the upload status at each point.
|
|
status_url = '/v2/%s/%s/blobs/uploads/%s' % (namespace, repository, upload_uuid)
|
|
response = self.conduct('GET', status_url, expected_code=204, auth='jwt',
|
|
headers=dict(host=self.get_server_url()))
|
|
self.assertEquals(response.headers['Docker-Upload-UUID'], upload_uuid)
|
|
self.assertEquals(response.headers['Range'], "bytes=0-%s" % end_byte)
|
|
|
|
if cancel:
|
|
self.conduct('DELETE', location, params=dict(digest=checksum), expected_code=204,
|
|
auth='jwt')
|
|
|
|
# Ensure the upload was canceled.
|
|
status_url = '/v2/%s/%s/blobs/uploads/%s' % (namespace, repository, upload_uuid)
|
|
self.conduct('GET', status_url, expected_code=404, auth='jwt',
|
|
headers=dict(host=self.get_server_url()))
|
|
return
|
|
|
|
# Finish the layer upload with a PUT.
|
|
response = self.conduct('PUT', location, params=dict(digest=checksum), expected_code=201,
|
|
auth='jwt')
|
|
|
|
self.assertEquals(response.headers['Docker-Content-Digest'], checksum)
|
|
checksums[image_id] = checksum
|
|
|
|
# Ensure the layer exists now.
|
|
response = self.conduct('HEAD', '/v2/%s/%s/blobs/%s' % (namespace, repository, checksum),
|
|
expected_code=200, auth='jwt')
|
|
self.assertEquals(response.headers['Docker-Content-Digest'], checksum)
|
|
self.assertEquals(response.headers['Content-Length'], str(len(full_contents)))
|
|
|
|
# Write the manifest.
|
|
put_code = 404 if invalid else expected_manifest_code
|
|
self.conduct('PUT', '/v2/%s/%s/manifests/%s' % (namespace, repository, tag_name),
|
|
data=manifest.bytes, expected_code=put_code,
|
|
headers={'Content-Type': 'application/json'}, auth='jwt')
|
|
|
|
return checksums, manifest.digest
|
|
|
|
|
|
class V2RegistryPullMixin(V2RegistryMixin):
|
|
def do_pull(self, namespace, repository, username=None, password='password', expected_code=200,
|
|
manifest_id=None, expected_manifest_code=200, images=None):
|
|
images = images or self._get_default_images()
|
|
|
|
# Ping!
|
|
self.v2_ping()
|
|
|
|
# Auth.
|
|
self.do_auth(username, password, namespace, repository, scopes=['pull'],
|
|
expected_code=expected_code)
|
|
if expected_code != 200:
|
|
return
|
|
|
|
# Retrieve the manifest for the tag or digest.
|
|
manifest_id = manifest_id or 'latest'
|
|
response = self.conduct('GET', '/v2/%s/%s/manifests/%s' % (namespace, repository, manifest_id),
|
|
auth='jwt', expected_code=expected_manifest_code)
|
|
if expected_manifest_code != 200:
|
|
return
|
|
|
|
manifest_data = json.loads(response.text)
|
|
|
|
# Ensure the manifest returned by us is valid.
|
|
validate_schema(manifest_data, V2RegistryMixin.MANIFEST_SCHEMA)
|
|
|
|
# Verify the layers.
|
|
blobs = {}
|
|
for layer in manifest_data['fsLayers']:
|
|
blob_id = layer['blobSum']
|
|
result = self.conduct('GET', '/v2/%s/%s/blobs/%s' % (namespace, repository, blob_id),
|
|
expected_code=200, auth='jwt')
|
|
|
|
blobs[blob_id] = result.text
|
|
|
|
# Verify the V1 metadata is present for each expected image.
|
|
found_v1_layers = set()
|
|
history = manifest_data['history']
|
|
for entry in history:
|
|
v1_history = json.loads(entry['v1Compatibility'])
|
|
found_v1_layers.add(v1_history['id'])
|
|
|
|
for image in images:
|
|
self.assertIn(image['id'], found_v1_layers)
|
|
|
|
return blobs
|
|
|
|
|
|
class RegistryTestsMixin(object):
|
|
def test_pull_publicrepo_anonymous(self):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
self.do_push('public', 'newrepo', 'public', 'password')
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo anonymously, which should fail (since it is
|
|
# private)
|
|
self.do_pull('public', 'newrepo', expected_code=403)
|
|
|
|
# Make the repository public.
|
|
self.conduct_api_login('public', 'password')
|
|
self.change_repo_visibility('public', 'newrepo', 'public')
|
|
self.clearSession()
|
|
|
|
# Pull the repository anonymously, which should succeed because the repository is public.
|
|
self.do_pull('public', 'newrepo')
|
|
|
|
|
|
def test_pull_publicrepo_devtable(self):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
self.do_push('public', 'newrepo', 'public', 'password')
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
|
|
# to public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
|
|
|
|
# Make the repository public.
|
|
self.conduct_api_login('public', 'password')
|
|
self.change_repo_visibility('public', 'newrepo', 'public')
|
|
self.clearSession()
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_pull_private_repo(self):
|
|
# Add a new repository under the devtable user, so we have a real repository to pull.
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password')
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as public, which should fail as it belongs
|
|
# to devtable.
|
|
self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is owned by
|
|
# devtable.
|
|
self.do_pull('devtable', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_public_no_anonymous_access_with_auth(self):
|
|
# Turn off anonymous access.
|
|
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
self.do_push('public', 'newrepo', 'public', 'password')
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
|
|
# to public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
|
|
|
|
# Make the repository public.
|
|
self.conduct_api_login('public', 'password')
|
|
self.change_repo_visibility('public', 'newrepo', 'public')
|
|
self.clearSession()
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_private_no_anonymous_access(self):
|
|
# Turn off anonymous access.
|
|
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
self.do_push('public', 'newrepo', 'public', 'password')
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
|
|
# to public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
|
|
|
|
# Pull the repository as public, which should succeed because the repository is owned by public.
|
|
self.do_pull('public', 'newrepo', 'public', 'password')
|
|
|
|
|
|
def test_public_no_anonymous_access_no_auth(self):
|
|
# Turn off anonymous access.
|
|
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
self.do_push('public', 'newrepo', 'public', 'password')
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as anonymous, which should fail as it
|
|
# is private.
|
|
self.do_pull('public', 'newrepo', expected_code=401)
|
|
|
|
# Make the repository public.
|
|
self.conduct_api_login('public', 'password')
|
|
self.change_repo_visibility('public', 'newrepo', 'public')
|
|
self.clearSession()
|
|
|
|
# Try again to pull the (currently public) repo as anonymous, which should fail as
|
|
# anonymous access is disabled.
|
|
self.do_pull('public', 'newrepo', expected_code=401)
|
|
|
|
# Pull the repository as public, which should succeed because the repository is owned by public.
|
|
self.do_pull('public', 'newrepo', 'public', 'password')
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_create_repo_creator_user(self):
|
|
self.do_push('buynlarge', 'newrepo', 'creator', 'password')
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is owned by the
|
|
# org.
|
|
self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_create_repo_robot_owner(self):
|
|
# Lookup the robot's password.
|
|
self.conduct_api_login('devtable', 'password')
|
|
resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot')
|
|
robot_token = json.loads(resp.text)['token']
|
|
|
|
self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token)
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is owned by the
|
|
# org.
|
|
self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_create_repo_robot_creator(self):
|
|
# Lookup the robot's password.
|
|
self.conduct_api_login('devtable', 'password')
|
|
resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/creatorbot')
|
|
robot_token = json.loads(resp.text)['token']
|
|
|
|
self.do_push('buynlarge', 'newrepo', 'buynlarge+creatorbot', robot_token)
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is owned by the
|
|
# org.
|
|
self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
class V1RegistryTests(V1RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
|
|
RegistryTestCaseMixin, LiveServerTestCase):
|
|
""" Tests for V1 registry. """
|
|
def test_push_reponame_with_slashes(self):
|
|
# Attempt to add a repository name with slashes. This should fail as we do not support it.
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('public', 'newrepo/somesubrepo', 'public', 'password', images, expected_code=400)
|
|
|
|
def test_tag_validation(self):
|
|
image_id = 'onlyimagehere'
|
|
images = [{
|
|
'id': image_id
|
|
}]
|
|
self.do_push('public', 'newrepo', 'public', 'password', images)
|
|
self.do_tag('public', 'newrepo', '1', image_id)
|
|
self.do_tag('public', 'newrepo', 'x' * 128, image_id)
|
|
self.do_tag('public', 'newrepo', '', image_id, expected_code=400)
|
|
self.do_tag('public', 'newrepo', 'x' * 129, image_id, expected_code=400)
|
|
self.do_tag('public', 'newrepo', '.fail', image_id, expected_code=400)
|
|
self.do_tag('public', 'newrepo', '-fail', image_id, expected_code=400)
|
|
|
|
|
|
class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin,
|
|
RegistryTestCaseMixin, LiveServerTestCase):
|
|
""" Tests for V2 registry. """
|
|
|
|
def test_push_reponame_with_slashes(self):
|
|
# Attempt to add a repository name with slashes. This should fail as we do not support it.
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('public', 'newrepo/somesubrepo', 'devtable', 'password', images,
|
|
expected_auth_code=400)
|
|
|
|
def test_invalid_push(self):
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', invalid=True)
|
|
|
|
def test_cancel_push(self):
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True)
|
|
|
|
|
|
def test_pull_by_checksum(self):
|
|
# Add a new repository under the user, so we have a real repository to pull.
|
|
_, digest = self.do_push('devtable', 'newrepo', 'devtable', 'password')
|
|
|
|
# Attempt to pull by digest.
|
|
self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id=digest)
|
|
|
|
|
|
def test_pull_invalid_image_tag(self):
|
|
# Add a new repository under the user, so we have a real repository to pull.
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password')
|
|
self.clearSession()
|
|
|
|
# Attempt to pull the invalid tag.
|
|
self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id='invalid',
|
|
expected_manifest_code=404)
|
|
|
|
|
|
def test_partial_upload_below_5mb(self):
|
|
chunksize = 1024 * 1024 * 2
|
|
size = chunksize * 3
|
|
contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size))
|
|
|
|
chunk_count = int(math.ceil((len(contents) * 1.0) / chunksize))
|
|
chunks = [(index * chunksize, (index + 1)*chunksize) for index in range(chunk_count)]
|
|
|
|
images = [
|
|
{
|
|
'id':'someid',
|
|
'contents': contents,
|
|
'chunks': chunks
|
|
}
|
|
]
|
|
|
|
# Push the chunked upload.
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images)
|
|
|
|
# Pull the image back and verify the contents.
|
|
blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', images=images)
|
|
self.assertEquals(len(blobs.items()), 1)
|
|
self.assertEquals(blobs.items()[0][1], contents)
|
|
|
|
def test_partial_upload_way_below_5mb(self):
|
|
size = 1024
|
|
contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size))
|
|
chunks = [(0, 100), (100, size)]
|
|
|
|
images = [
|
|
{
|
|
'id':'someid',
|
|
'contents': contents,
|
|
'chunks': chunks
|
|
}
|
|
]
|
|
|
|
# Push the chunked upload.
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images)
|
|
|
|
# Pull the image back and verify the contents.
|
|
blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', images=images)
|
|
self.assertEquals(len(blobs.items()), 1)
|
|
self.assertEquals(blobs.items()[0][1], contents)
|
|
|
|
def test_partial_upload_resend_below_5mb(self):
|
|
size = 150
|
|
contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size))
|
|
|
|
chunks = [(0, 100), (10, size)]
|
|
|
|
images = [
|
|
{
|
|
'id':'someid',
|
|
'contents': contents,
|
|
'chunks': chunks
|
|
}
|
|
]
|
|
|
|
# Push the chunked upload.
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images)
|
|
|
|
# Pull the image back and verify the contents.
|
|
blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', images=images)
|
|
self.assertEquals(len(blobs.items()), 1)
|
|
self.assertEquals(blobs.items()[0][1], contents)
|
|
|
|
def test_partial_upload_try_resend_with_gap(self):
|
|
size = 150
|
|
contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size))
|
|
|
|
chunks = [(0, 100), (101, size, 416)]
|
|
|
|
images = [
|
|
{
|
|
'id':'someid',
|
|
'contents': contents,
|
|
'chunks': chunks
|
|
}
|
|
]
|
|
|
|
# Attempt to push the chunked upload, which should fail.
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images)
|
|
|
|
def test_multiple_layers_invalid(self):
|
|
# Attempt to push a manifest with an image depending on an unknown base layer.
|
|
images = [
|
|
{
|
|
'id': 'latestid',
|
|
'contents': 'the latest image',
|
|
'parent': 'baseid',
|
|
}
|
|
]
|
|
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images,
|
|
expected_manifest_code=400)
|
|
|
|
def test_multiple_layers(self):
|
|
# Push a manifest with multiple layers.
|
|
images = [
|
|
{
|
|
'id': 'latestid',
|
|
'contents': 'the latest image',
|
|
'parent': 'baseid',
|
|
},
|
|
{
|
|
'id': 'baseid',
|
|
'contents': 'The base image',
|
|
}
|
|
]
|
|
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images)
|
|
|
|
def test_invalid_regname(self):
|
|
self.do_push('devtable', 'this/is/a/repo', 'devtable', 'password', expected_auth_code=400)
|
|
|
|
def test_multiple_tags(self):
|
|
latest_images = [
|
|
{
|
|
'id': 'latestid',
|
|
'contents': 'the latest image'
|
|
}
|
|
]
|
|
|
|
foobar_images = [
|
|
{
|
|
'id': 'foobarid',
|
|
'contents': 'the foobar image',
|
|
}
|
|
]
|
|
|
|
# Create the repo.
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=latest_images,
|
|
tag_name='latest')
|
|
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=foobar_images,
|
|
tag_name='foobar')
|
|
|
|
# Retrieve the tags.
|
|
response = self.conduct('GET', '/v2/devtable/newrepo/tags/list', auth='jwt', expected_code=200)
|
|
data = json.loads(response.text)
|
|
self.assertEquals(data['name'], "devtable/newrepo")
|
|
self.assertIn('latest', data['tags'])
|
|
self.assertIn('foobar', data['tags'])
|
|
|
|
# Retrieve the tags with pagination.
|
|
response = self.conduct('GET', '/v2/devtable/newrepo/tags/list', auth='jwt',
|
|
params=dict(n=1), expected_code=200)
|
|
|
|
data = json.loads(response.text)
|
|
self.assertEquals(data['name'], "devtable/newrepo")
|
|
self.assertEquals(len(data['tags']), 1)
|
|
self.assertTrue(response.headers['Link'].find('n=1&last=2') > 0)
|
|
|
|
# Try to get tags before a repo exists.
|
|
self.conduct('GET', '/v2/devtable/doesnotexist/tags/list', auth='jwt', expected_code=401)
|
|
|
|
|
|
|
|
class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
|
|
RegistryTestCaseMixin, LiveServerTestCase):
|
|
""" Tests for V1 push, V2 pull registry. """
|
|
|
|
class V1PullV2PushRegistryTests(V1RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin,
|
|
RegistryTestCaseMixin, LiveServerTestCase):
|
|
""" Tests for V1 pull, V2 push registry. """
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|