Merge pull request #3059 from quay/joseph.schorr/QUAY-906/reg-tests

Move registry integration tests to py.test
This commit is contained in:
Joseph Schorr 2018-05-22 17:09:11 -04:00 committed by GitHub
commit 6ffafe44d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 2149 additions and 204 deletions

View file

@ -85,7 +85,7 @@ local jobs = {
}, },
'registry-tests': unittest_stage { 'registry-tests': unittest_stage {
script: [ pytest_cmd + ' ./test/registry_tests.py'], script: [ pytest_cmd + ' ./test/registry/registry_tests.py'],
coverage: @"/^TOTAL.*\s+(\d+\%)\s*$/", coverage: @"/^TOTAL.*\s+(\d+\%)\s*$/",
}, },

View file

@ -292,7 +292,7 @@ registry-tests:
coverage: /^TOTAL.*\s+(\d+\%)\s*$/ coverage: /^TOTAL.*\s+(\d+\%)\s*$/
image: quay.io/quay/quay-ci:${CI_COMMIT_REF_SLUG} image: quay.io/quay/quay-ci:${CI_COMMIT_REF_SLUG}
script: script:
- py.test --cov="." --cov-report=html --cov-report=term-missing --timeout=3600 --verbose -x --color=no --show-count ./test/registry_tests.py - py.test --cov="." --cov-report=html --cov-report=term-missing --timeout=3600 --verbose -x --color=no --show-count ./test/registry/registry_tests.py
stage: tests stage: tests
tags: tags:
- kubernetes - kubernetes

View file

@ -1,189 +0,0 @@
# vim:ft=dockerfile
FROM phusion/baseimage:0.9.19
ENV DEBIAN_FRONTEND noninteractive
ENV HOME /root
ENV QUAYCONF /quay/conf
ENV QUAYDIR /quay
ENV QUAYPATH "."
RUN mkdir $QUAYDIR
WORKDIR $QUAYDIR
# This is so we don't break http golang/go#17066
# When Ubuntu has nginx >= 1.11.0 we can switch back.
RUN add-apt-repository ppa:nginx/development
# Add Yarn repository until it is officially added to Ubuntu
RUN curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add -
RUN echo "deb https://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list
# Install system packages
RUN apt-get update && apt-get upgrade -y # 26MAY2017
RUN apt-get install -y \
dnsmasq \
g++ \
gdb \
gdebi-core \
git \
jpegoptim \
libevent-2.0.5 \
libevent-dev \
libffi-dev \
libfreetype6-dev \
libgpgme11 \
libgpgme11-dev \
libjpeg62 \
libjpeg62-dev \
libjpeg8 \
libldap-2.4-2 \
libldap2-dev \
libmagic1 \
libpq-dev \
libpq5 \
libsasl2-dev \
libsasl2-modules \
monit \
nginx \
nodejs \
optipng \
openssl \
python-dbg \
python-dev \
python-pip \
python-virtualenv \
yarn=0.22.0-1 \
w3m
# Install python dependencies
ADD requirements.txt requirements.txt
RUN virtualenv --distribute venv
RUN venv/bin/pip install -r requirements.txt # 07SEP2016
RUN venv/bin/pip freeze
# Check python dependencies for the GPL
# Due to the following bug, pip results must be piped to a file before grepping:
# https://github.com/pypa/pip/pull/3304
RUN cat requirements.txt | grep -v "^-e" | awk -F'==' '{print $1}' | xargs venv/bin/pip --disable-pip-version-check show > pipinfo.txt && \
test -z $(cat pipinfo.txt | grep GPL | grep -v LGPL) && \
rm pipinfo.txt
# Install cfssl
RUN mkdir /gocode
ENV GOPATH /gocode
RUN curl -O https://storage.googleapis.com/golang/go1.6.linux-amd64.tar.gz && \
tar -xvf go1.6.linux-amd64.tar.gz && \
mv go /usr/local && \
rm -rf go1.6.linux-amd64.tar.gz && \
/usr/local/go/bin/go get -u github.com/cloudflare/cfssl/cmd/cfssl && \
/usr/local/go/bin/go get -u github.com/cloudflare/cfssl/cmd/cfssljson && \
cp /gocode/bin/cfssljson /bin/cfssljson && \
cp /gocode/bin/cfssl /bin/cfssl && \
rm -rf /gocode && rm -rf /usr/local/go
# Install jwtproxy
RUN curl -L -o /usr/local/bin/jwtproxy https://github.com/coreos/jwtproxy/releases/download/v0.0.1/jwtproxy-linux-x64
RUN chmod +x /usr/local/bin/jwtproxy
# Install prometheus-aggregator
RUN curl -L -o /usr/local/bin/prometheus-aggregator https://github.com/coreos/prometheus-aggregator/releases/download/v0.0.1-alpha/prometheus-aggregator
RUN chmod +x /usr/local/bin/prometheus-aggregator
# Install front-end dependencies
RUN ln -s /usr/bin/nodejs /usr/bin/node
ADD package.json package.json
ADD tsconfig.json tsconfig.json
ADD webpack.config.js webpack.config.js
ADD yarn.lock yarn.lock
RUN yarn install --ignore-engines
# Add static files
ADD static static
# Run Webpack
RUN yarn build
# Optimize our images
ADD static/img static/img
RUN jpegoptim static/img/**/*.jpg
RUN optipng -clobber -quiet static/img/**/*.png
RUN apt-get remove -y --auto-remove python-dev g++ libjpeg62-dev libevent-dev libldap2-dev libsasl2-dev libpq-dev libffi-dev libgpgme11-dev nodejs jpegoptim optipng w3m
RUN apt-get autoremove -y
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Set up the init system
ADD conf/init/copy_config_files.sh /etc/my_init.d/
ADD conf/init/doupdatelimits.sh /etc/my_init.d/
ADD conf/init/copy_syslog_config.sh /etc/my_init.d/
ADD conf/init/certs_create.sh /etc/my_init.d/
ADD conf/init/certs_install.sh /etc/my_init.d/
ADD conf/init/nginx_conf_create.sh /etc/my_init.d/
ADD conf/init/runmigration.sh /etc/my_init.d/
ADD conf/init/syslog-ng.conf /etc/syslog-ng/
ADD conf/init/zz_boot.sh /etc/my_init.d/
ADD conf/init/service/ /etc/service/
RUN rm -rf /etc/service/syslog-forwarder
ADD conf/kill-buildmanager.sh /usr/local/bin/kill-buildmanager.sh
ADD conf/monitrc /etc/monit/monitrc
RUN chmod 0600 /etc/monit/monitrc
# remove after phusion/baseimage-docker#338 is fixed
ADD conf/init/logrotate.conf /etc/logrotate.conf
# TODO(ssewell): only works on a detached head, make work with ref
ADD .git/HEAD GIT_HEAD
# Add all of the files!
ADD . .
RUN mkdir static/fonts static/ldn
# Download any external libs.
RUN venv/bin/python -m external_libraries
RUN mkdir -p /usr/local/nginx/logs/
RUN pyclean .
# Cleanup any NPM-related stuff.
RUN rm -rf /root/.npm
RUN rm -rf .npm
RUN rm -rf /usr/local/lib/node_modules
RUN rm -rf /usr/share/yarn/node_modules
RUN rm -rf /root/node_modules
RUN rm -rf node_modules
RUN rm -rf grunt
RUN rm package.json yarn.lock
# Run the tests
ARG RUN_TESTS=true
ENV RUN_TESTS ${RUN_TESTS}
ENV RUN_ACI_TESTS False
ADD requirements-tests.txt requirements-tests.txt
RUN if [ "$RUN_TESTS" = true ]; then \
venv/bin/pip install -r requirements-tests.txt ;\
fi
RUN if [ "$RUN_TESTS" = true ]; then \
TEST=true PYTHONPATH="." venv/bin/py.test --timeout=7200 --verbose \
--show-count -x --color=no ./ && rm -rf /var/tmp/; \
fi
RUN if [ "$RUN_TESTS" = true ]; then \
TEST=true PYTHONPATH="." venv/bin/py.test --timeout=7200 --verbose \
--show-count -x --color=no test/registry_tests.py && rm -rf /var/tmp/;\
fi
RUN rm -rf /root/.cache
RUN PYTHONPATH=. venv/bin/alembic heads | grep -E '^[0-9a-f]+ \(head\)$' > ALEMBIC_HEAD
VOLUME ["/conf/stack", "/var/log", "/datastorage", "/tmp", "/conf/etcd"]
EXPOSE 443 8443 80

View file

@ -50,7 +50,7 @@ registry-test:
TEST=true PYTHONPATH="." py.test \ TEST=true PYTHONPATH="." py.test \
--cov="." --cov-report=html --cov-report=term-missing \ --cov="." --cov-report=html --cov-report=term-missing \
--timeout=3600 --verbose --show-count -x \ --timeout=3600 --verbose --show-count -x \
test/registry_tests.py test/registry/registry_tests.py
test: unit-test registry-test test: unit-test registry-test

View file

@ -274,7 +274,7 @@ TEST=true python -m test.test_api_usage -f SuiteName
TEST=true PYTHONPATH="." py.test --verbose TEST=true PYTHONPATH="." py.test --verbose
# To run a specific test module # To run a specific test module
TEST=true PYTHONPATH="." py.test --verbose test/registry_tests.py TEST=true PYTHONPATH="." py.test --verbose test/registry/registry_tests.py
# To run a specific test unique test # To run a specific test unique test
TEST=true PYTHONPATH="." py.test --verbose test/test_api_usage.py::TestDeleteNamespace TEST=true PYTHONPATH="." py.test --verbose test/test_api_usage.py::TestDeleteNamespace

View file

@ -13,4 +13,4 @@ run:
pip install --quiet -r quay-pull-request/requirements-tests.txt pip install --quiet -r quay-pull-request/requirements-tests.txt
cd quay-pull-request cd quay-pull-request
PYTHONPATH="." py.test --timeout=7200 --verbose \ PYTHONPATH="." py.test --timeout=7200 --verbose \
--show-count -x test/registry_tests.py --show-count -x test/registry/registry_tests.py

2
data/cache/impl.py vendored
View file

@ -40,7 +40,7 @@ class NoopDataModelCache(DataModelCache):
class InMemoryDataModelCache(DataModelCache): class InMemoryDataModelCache(DataModelCache):
""" Implementation of the data model cache backed by an in-memory dictionary. """ """ Implementation of the data model cache backed by an in-memory dictionary. """
def __init__(self): def __init__(self):
self.cache = ExpiresDict(rebuilder=lambda: {}) self.cache = ExpiresDict()
def retrieve(self, cache_key, loader, should_cache=is_not_none): def retrieve(self, cache_key, loader, should_cache=is_not_none):
not_found = [None] not_found = [None]

View file

@ -9,7 +9,7 @@ import features
from app import app from app import app
from auth.auth_context import get_authenticated_context from auth.auth_context import get_authenticated_context
from util.names import parse_namespace_repository from util.names import parse_namespace_repository, ImplicitLibraryNamespaceNotAllowed
from util.http import abort from util.http import abort
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -26,9 +26,14 @@ def parse_repository_name(include_tag=False,
def inner(func): def inner(func):
@wraps(func) @wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
repo_name_components = parse_namespace_repository(kwargs[incoming_repo_kwarg], try:
app.config['LIBRARY_NAMESPACE'], repo_name_components = parse_namespace_repository(kwargs[incoming_repo_kwarg],
include_tag=include_tag) app.config['LIBRARY_NAMESPACE'],
include_tag=include_tag,
allow_library=features.LIBRARY_SUPPORT)
except ImplicitLibraryNamespaceNotAllowed:
abort(400)
del kwargs[incoming_repo_kwarg] del kwargs[incoming_repo_kwarg]
kwargs[ns_kwarg_name] = repo_name_components[0] kwargs[ns_kwarg_name] = repo_name_components[0]
kwargs[repo_kwarg_name] = repo_name_components[1] kwargs[repo_kwarg_name] = repo_name_components[1]

View file

@ -6,15 +6,18 @@ import pytest
import shutil import shutil
from flask import Flask, jsonify from flask import Flask, jsonify
from flask_login import LoginManager from flask_login import LoginManager
from flask_principal import identity_loaded, Permission, Identity, identity_changed, Principal
from peewee import SqliteDatabase, savepoint, InternalError from peewee import SqliteDatabase, savepoint, InternalError
from app import app as application from app import app as application
from auth.permissions import on_identity_loaded
from data import model from data import model
from data.database import close_db_filter, db, configure from data.database import close_db_filter, db, configure
from data.model.user import LoginWrappedDBUser from data.model.user import LoginWrappedDBUser
from endpoints.api import api_bp from endpoints.api import api_bp
from endpoints.appr import appr_bp from endpoints.appr import appr_bp
from endpoints.web import web from endpoints.web import web
from endpoints.v1 import v1_bp
from endpoints.v2 import v2_bp from endpoints.v2 import v2_bp
from endpoints.verbs import verbs as verbs_bp from endpoints.verbs import verbs as verbs_bp
from endpoints.webhooks import webhooks from endpoints.webhooks import webhooks
@ -118,6 +121,9 @@ def appconfig(database_uri):
'autorollback': True, 'autorollback': True,
}, },
"DB_TRANSACTION_FACTORY": _create_transaction, "DB_TRANSACTION_FACTORY": _create_transaction,
"DATA_MODEL_CACHE_CONFIG": {
'engine': 'inmemory',
},
} }
return conf return conf
@ -167,6 +173,12 @@ def app(appconfig, initialized_db):
def load_user(user_uuid): def load_user(user_uuid):
return LoginWrappedDBUser(user_uuid) return LoginWrappedDBUser(user_uuid)
@identity_loaded.connect_via(app)
def on_identity_loaded_for_test(sender, identity):
on_identity_loaded(sender, identity)
Principal(app, use_sessions=False)
app.url_map.converters['regex'] = RegexConverter app.url_map.converters['regex'] = RegexConverter
app.url_map.converters['apirepopath'] = APIRepositoryPathConverter app.url_map.converters['apirepopath'] = APIRepositoryPathConverter
app.url_map.converters['repopath'] = RepositoryPathConverter app.url_map.converters['repopath'] = RepositoryPathConverter
@ -175,6 +187,7 @@ def app(appconfig, initialized_db):
app.register_blueprint(appr_bp, url_prefix='/cnr') app.register_blueprint(appr_bp, url_prefix='/cnr')
app.register_blueprint(web, url_prefix='/') app.register_blueprint(web, url_prefix='/')
app.register_blueprint(verbs_bp, url_prefix='/c1') app.register_blueprint(verbs_bp, url_prefix='/c1')
app.register_blueprint(v1_bp, url_prefix='/v1')
app.register_blueprint(v2_bp, url_prefix='/v2') app.register_blueprint(v2_bp, url_prefix='/v2')
app.register_blueprint(webhooks, url_prefix='/webhooks') app.register_blueprint(webhooks, url_prefix='/webhooks')

View file

200
test/registry/fixtures.py Normal file
View file

@ -0,0 +1,200 @@
import copy
import logging.config
import json
import os
import shutil
from tempfile import NamedTemporaryFile
import pytest
from Crypto import Random
from flask import jsonify, g
from flask_principal import Identity
from app import storage
from data.database import close_db_filter, configure, DerivedStorageForImage, QueueItem, Image
from data import model
from endpoints.csrf import generate_csrf_token
from util.log import logfile_path
from test.registry.liveserverfixture import LiveServerExecutor
@pytest.fixture()
def registry_server_executor(app):
def generate_csrf():
return generate_csrf_token()
def set_supports_direct_download(enabled):
storage.put_content(['local_us'], 'supports_direct_download', 'true' if enabled else 'false')
return 'OK'
def delete_image(image_id):
image = Image.get(docker_image_id=image_id)
image.docker_image_id = 'DELETED'
image.save()
return 'OK'
def get_storage_replication_entry(image_id):
image = Image.get(docker_image_id=image_id)
QueueItem.select().where(QueueItem.queue_name ** ('%' + image.storage.uuid + '%')).get()
return 'OK'
def set_feature(feature_name, value):
import features
old_value = features._FEATURES[feature_name].value
features._FEATURES[feature_name].value = value
return jsonify({'old_value': old_value})
def clear_derived_cache():
DerivedStorageForImage.delete().execute()
return 'OK'
def clear_uncompressed_size(image_id):
image = model.image.get_image_by_id('devtable', 'newrepo', image_id)
image.storage.uncompressed_size = None
image.storage.save()
return 'OK'
def add_token():
another_token = model.token.create_delegate_token('devtable', 'newrepo', 'my-new-token',
'write')
another_token.code = 'somecooltokencode'
another_token.save()
return another_token.code
def break_database():
# Close any existing connection.
close_db_filter(None)
# Reload the database config with an invalid connection.
config = copy.copy(app.config)
config['DB_URI'] = 'sqlite:///not/a/valid/database'
configure(config)
return 'OK'
def reload_app(server_hostname):
# Close any existing connection.
close_db_filter(None)
# Reload the database config.
app.config['SERVER_HOSTNAME'] = server_hostname[len('http://'):]
configure(app.config)
# Reload random after the process split, as it cannot be used uninitialized across forks.
Random.atfork()
# Required for anonymous calls to not exception.
g.identity = Identity(None, 'none')
if os.environ.get('DEBUGLOG') == 'true':
logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False)
return 'OK'
def create_app_repository(namespace, name):
user = model.user.get_user(namespace)
model.repository.create_repository(namespace, name, user, repo_kind='application')
return 'OK'
executor = LiveServerExecutor()
executor.register('generate_csrf', generate_csrf)
executor.register('set_supports_direct_download', set_supports_direct_download)
executor.register('delete_image', delete_image)
executor.register('get_storage_replication_entry', get_storage_replication_entry)
executor.register('set_feature', set_feature)
executor.register('clear_derived_cache', clear_derived_cache)
executor.register('clear_uncompressed_size', clear_uncompressed_size)
executor.register('add_token', add_token)
executor.register('break_database', break_database)
executor.register('reload_app', reload_app)
executor.register('create_app_repository', create_app_repository)
return executor
@pytest.fixture()
def liveserver_app(app, registry_server_executor, init_db_path):
registry_server_executor.apply_blueprint_to_app(app)
if os.environ.get('DEBUG', 'false').lower() == 'true':
app.config['DEBUG'] = True
# Copy the clean database to a new path. We cannot share the DB created by the
# normal app fixture, as it is already open in the local process.
local_db_file = NamedTemporaryFile(delete=True)
local_db_file.close()
shutil.copy2(init_db_path, local_db_file.name)
app.config['DB_URI'] = 'sqlite:///{0}'.format(local_db_file.name)
return app
@pytest.fixture()
def app_reloader(liveserver, registry_server_executor):
registry_server_executor.on(liveserver).reload_app(liveserver.url)
yield
class FeatureFlagValue(object):
""" Helper object which temporarily sets the value of a feature flag.
Usage:
with FeatureFlagValue('ANONYMOUS_ACCESS', False, registry_server_executor.on(liveserver)):
... Features.ANONYMOUS_ACCESS is False in this context ...
"""
def __init__(self, feature_flag, test_value, executor):
self.feature_flag = feature_flag
self.test_value = test_value
self.executor = executor
self.old_value = None
def __enter__(self):
result = self.executor.set_feature(self.feature_flag, self.test_value)
self.old_value = result.json()['old_value']
def __exit__(self, type, value, traceback):
self.executor.set_feature(self.feature_flag, self.old_value)
class ApiCaller(object):
def __init__(self, liveserver_session, registry_server_executor):
self.liveserver_session = liveserver_session
self.csrf_token = registry_server_executor.on_session(liveserver_session).generate_csrf()
def conduct_auth(self, username, password):
r = self.post('/api/v1/signin',
data=json.dumps(dict(username=username, password=password)),
headers={'Content-Type': 'application/json'})
assert r.status_code == 200
def _adjust_params(self, kwargs):
if 'params' not in kwargs:
kwargs['params'] = {}
kwargs['params'].update({
'_csrf_token': self.csrf_token,
})
return kwargs
def get(self, url, **kwargs):
kwargs = self._adjust_params(kwargs)
return self.liveserver_session.get(url, **kwargs)
def post(self, url, **kwargs):
kwargs = self._adjust_params(kwargs)
return self.liveserver_session.post(url, **kwargs)
def change_repo_visibility(self, namespace, repository, visibility):
self.post('/api/v1/repository/%s/%s/changevisibility' % (namespace, repository),
data=json.dumps(dict(visibility=visibility)),
headers={'Content-Type': 'application/json'})
@pytest.fixture(scope="function")
def api_caller(liveserver, registry_server_executor):
return ApiCaller(liveserver.new_session(), registry_server_executor)

View file

@ -0,0 +1,272 @@
import inspect
import json
import multiprocessing
import socket
import socketserver
import time
from contextlib import contextmanager
from urlparse import urlparse, urljoin
import pytest
import requests
from flask import request
from flask.blueprints import Blueprint
class liveFlaskServer(object):
""" Helper class for spawning a live Flask server for testing.
Based on https://github.com/jarus/flask-testing/blob/master/flask_testing/utils.py#L421
"""
def __init__(self, app, port_value):
self.app = app
self._port_value = port_value
self._process = None
def get_server_url(self):
"""
Return the url of the test server
"""
return 'http://localhost:%s' % self._port_value.value
def terminate_live_server(self):
if self._process:
self._process.terminate()
def spawn_live_server(self):
self._process = None
port_value = self._port_value
def worker(app, port):
# Based on solution: http://stackoverflow.com/a/27598916
# Monkey-patch the server_bind so we can determine the port bound by Flask.
# This handles the case where the port specified is `0`, which means that
# the OS chooses the port. This is the only known way (currently) of getting
# the port out of Flask once we call `run`.
original_socket_bind = socketserver.TCPServer.server_bind
def socket_bind_wrapper(self):
ret = original_socket_bind(self)
# Get the port and save it into the port_value, so the parent process
# can read it.
(_, port) = self.socket.getsockname()
port_value.value = port
socketserver.TCPServer.server_bind = original_socket_bind
return ret
socketserver.TCPServer.server_bind = socket_bind_wrapper
app.run(port=port, use_reloader=False)
self._process = multiprocessing.Process(target=worker, args=(self.app, 0))
self._process.start()
# We must wait for the server to start listening, but give up
# after a specified maximum timeout
timeout = self.app.config.get('LIVESERVER_TIMEOUT', 5)
start_time = time.time()
while True:
time.sleep(0.1)
elapsed_time = (time.time() - start_time)
if elapsed_time > timeout:
raise RuntimeError("Failed to start the server after %d seconds. " % timeout)
if self._can_connect():
break
def _can_connect(self):
host, port = self._get_server_address()
if port == 0:
# Port specified by the user was 0, and the OS has not yet assigned
# the proper port.
return False
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((host, port))
except socket.error:
success = False
else:
success = True
finally:
sock.close()
return success
def _get_server_address(self):
"""
Gets the server address used to test the connection with a socket.
Respects both the LIVESERVER_PORT config value and overriding
get_server_url()
"""
parts = urlparse(self.get_server_url())
host = parts.hostname
port = parts.port
if port is None:
if parts.scheme == 'http':
port = 80
elif parts.scheme == 'https':
port = 443
else:
raise RuntimeError("Unsupported server url scheme: %s" % parts.scheme)
return host, port
class LiveFixtureServerSession(object):
""" Helper class for calling the live server via a single requests Session. """
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
def _get_url(self, url):
return urljoin(self.base_url, url)
def get(self, url, **kwargs):
return self.session.get(self._get_url(url), **kwargs)
def post(self, url, **kwargs):
return self.session.post(self._get_url(url), **kwargs)
def put(self, url, **kwargs):
return self.session.put(self._get_url(url), **kwargs)
def delete(self, url, **kwargs):
return self.session.delete(self._get_url(url), **kwargs)
def request(self, method, url, **kwargs):
return self.session.request(method, self._get_url(url), **kwargs)
class LiveFixtureServer(object):
""" Helper for interacting with a live server. """
def __init__(self, url):
self.url = url
@contextmanager
def session(self):
""" Yields a session for speaking to the live server. """
yield LiveFixtureServerSession(self.url)
def new_session(self):
""" Returns a new session for speaking to the live server. """
return LiveFixtureServerSession(self.url)
@pytest.fixture(scope='function')
def liveserver(liveserver_app):
""" Runs a live Flask server for the app for the duration of the test.
Based on https://github.com/jarus/flask-testing/blob/master/flask_testing/utils.py#L421
"""
context = liveserver_app.test_request_context()
context.push()
port = multiprocessing.Value('i', 0)
live_server = liveFlaskServer(liveserver_app, port)
try:
live_server.spawn_live_server()
yield LiveFixtureServer(live_server.get_server_url())
finally:
context.pop()
live_server.terminate_live_server()
@pytest.fixture(scope='function')
def liveserver_session(liveserver, liveserver_app):
""" Fixtures which instantiates a liveserver and returns a single session for
interacting with that server.
"""
return LiveFixtureServerSession(liveserver.url)
class LiveServerExecutor(object):
""" Helper class which can be used to register functions to be executed in the
same process as the live server. This is necessary because the live server
runs in a different process and, therefore, in order to execute state changes
outside of the server's normal flows (i.e. via code), it must be executed
*in-process* via an HTTP call. The LiveServerExecutor class abstracts away
all the setup for this process.
Usage:
def _perform_operation(first_param, second_param):
... do some operation in the app ...
return 'some value'
@pytest.fixture(scope="session")
def my_server_executor():
executor = LiveServerExecutor()
executor.register('performoperation', _perform_operation)
return executor
@pytest.fixture()
def liveserver_app(app, my_server_executor):
... other app setup here ...
my_server_executor.apply_blueprint_to_app(app)
return app
def test_mytest(liveserver, my_server_executor):
# Invokes 'performoperation' in the liveserver's process.
my_server_executor.on(liveserver).performoperation('first', 'second')
"""
def __init__(self):
self.funcs = {}
def register(self, fn_name, fn):
""" Registers the given function under the given name. """
self.funcs[fn_name] = fn
def apply_blueprint_to_app(self, app):
""" Applies a blueprint to the app, to support invocation from this executor. """
testbp = Blueprint('testbp', __name__)
def build_invoker(fn_name, fn):
path = '/' + fn_name
@testbp.route(path, methods=['POST'], endpoint=fn_name)
def _(**kwargs):
arg_values = request.get_json()['args']
return fn(*arg_values)
for fn_name, fn in self.funcs.iteritems():
build_invoker(fn_name, fn)
app.register_blueprint(testbp, url_prefix='/__test')
def on(self, server):
""" Returns an invoker for the given live server. """
return liveServerExecutorInvoker(self.funcs, server)
def on_session(self, server_session):
""" Returns an invoker for the given live server session. """
return liveServerExecutorInvoker(self.funcs, server_session)
class liveServerExecutorInvoker(object):
def __init__(self, funcs, server_or_session):
self._funcs = funcs
self._server_or_session = server_or_session
def __getattribute__(self, name):
if name.startswith('_'):
return object.__getattribute__(self, name)
if name not in self._funcs:
raise AttributeError('Unknown function: %s' % name)
def invoker(*args):
path = '/__test/%s' % name
headers = {'Content-Type': 'application/json'}
if isinstance(self._server_or_session, LiveFixtureServerSession):
return self._server_or_session.post(path, data=json.dumps({'args': args}), headers=headers)
else:
with self._server_or_session.session() as session:
return session.post(path, data=json.dumps({'args': args}), headers=headers)
return invoker

View file

@ -0,0 +1,77 @@
import random
import string
import pytest
from Crypto.PublicKey import RSA
from jwkest.jwk import RSAKey
from test.registry.protocols import Image, layer_bytes_for_contents
from test.registry.protocol_v1 import V1Protocol
from test.registry.protocol_v2 import V2Protocol
@pytest.fixture(scope="session")
def basic_images():
""" Returns basic images for push and pull testing. """
# Note: order is from base layer down to leaf.
parent_bytes = layer_bytes_for_contents('parent contents')
image_bytes = layer_bytes_for_contents('some contents')
return [
Image(id='parentid', bytes=parent_bytes, parent_id=None),
Image(id='someid', bytes=image_bytes, parent_id='parentid'),
]
@pytest.fixture(scope="session")
def sized_images():
""" Returns basic images (with sizes) for push and pull testing. """
# Note: order is from base layer down to leaf.
parent_bytes = layer_bytes_for_contents('parent contents', mode='')
image_bytes = layer_bytes_for_contents('some contents', mode='')
return [
Image(id='parentid', bytes=parent_bytes, parent_id=None, size=len(parent_bytes)),
Image(id='someid', bytes=image_bytes, parent_id='parentid', size=len(image_bytes)),
]
@pytest.fixture(scope="session")
def jwk():
return RSAKey(key=RSA.generate(2048))
@pytest.fixture(params=[V2Protocol])
def v2_protocol(request, jwk):
return request.param(jwk)
@pytest.fixture(params=[V1Protocol])
def v1_protocol(request, jwk):
return request.param(jwk)
@pytest.fixture(params=[V2Protocol])
def manifest_protocol(request, jwk):
return request.param(jwk)
@pytest.fixture(params=[V1Protocol, V2Protocol])
def loginer(request, jwk):
return request.param(jwk)
@pytest.fixture(params=[V1Protocol, V2Protocol])
def pusher(request, jwk):
return request.param(jwk)
@pytest.fixture(params=[V1Protocol, V2Protocol])
def puller(request, jwk):
return request.param(jwk)
@pytest.fixture(scope="session")
def random_layer_data():
size = 4096
contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size))
return layer_bytes_for_contents(contents)

View file

@ -0,0 +1,179 @@
import json
from cStringIO import StringIO
from enum import Enum, unique
from digest.checksums import compute_simple
from test.registry.protocols import (RegistryProtocol, Failures, ProtocolOptions, PushResult,
PullResult)
@unique
class V1ProtocolSteps(Enum):
""" Defines the various steps of the protocol, for matching failures. """
PUT_IMAGES = 'put-images'
GET_IMAGES = 'get-images'
PUT_TAG = 'put-tag'
class V1Protocol(RegistryProtocol):
FAILURE_CODES = {
V1ProtocolSteps.PUT_IMAGES: {
Failures.UNAUTHENTICATED: 401,
Failures.UNAUTHORIZED: 403,
Failures.APP_REPOSITORY: 405,
Failures.INVALID_REPOSITORY: 404,
Failures.DISALLOWED_LIBRARY_NAMESPACE: 400,
},
V1ProtocolSteps.GET_IMAGES: {
Failures.UNAUTHENTICATED: 403,
Failures.UNAUTHORIZED: 403,
Failures.APP_REPOSITORY: 405,
Failures.ANONYMOUS_NOT_ALLOWED: 401,
Failures.DISALLOWED_LIBRARY_NAMESPACE: 400,
},
V1ProtocolSteps.PUT_TAG: {
Failures.MISSING_TAG: 404,
Failures.INVALID_TAG: 400,
Failures.INVALID_IMAGES: 400,
},
}
def __init__(self, jwk):
pass
def _auth_for_credentials(self, credentials):
if credentials is None:
return None
return credentials
def ping(self, session):
assert session.get('/v1/_ping').status_code == 200
def login(self, session, username, password, scopes, expect_success):
data = {
'username': username,
'password': password,
}
response = self.conduct(session, 'POST', '/v1/users/', json_data=data, expected_status=400)
assert (response.text == '"Username or email already exists"') == expect_success
def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
options = options or ProtocolOptions()
auth = self._auth_for_credentials(credentials)
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
prefix = '/v1/repositories/%s/' % self.repo_name(namespace, repo_name)
# Ping!
self.ping(session)
# GET /v1/repositories/{namespace}/{repository}/images
headers = {'X-Docker-Token': 'true'}
result = self.conduct(session, 'GET', prefix + 'images', auth=auth, headers=headers,
expected_status=(200, expected_failure, V1ProtocolSteps.GET_IMAGES))
if expected_failure is not None:
return
headers = {}
if credentials is not None:
headers['Authorization'] = 'token ' + result.headers['www-authenticate']
else:
assert not 'www-authenticate' in result.headers
# GET /v1/repositories/{namespace}/{repository}/tags
image_ids = self.conduct(session, 'GET', prefix + 'tags', headers=headers).json()
assert len(image_ids.values()) == len(tag_names)
for tag_name in tag_names:
tag_image_id = image_ids[tag_name]
if not options.munge_shas:
# Ensure we have a matching image ID.
known_ids = {image.id for image in images}
assert tag_image_id in known_ids
# Retrieve the ancestry of the tagged image.
image_prefix = '/v1/images/%s/' % tag_image_id
ancestors = self.conduct(session, 'GET', image_prefix + 'ancestry', headers=headers).json()
assert len(ancestors) == len(images)
for index, image_id in enumerate(reversed(ancestors)):
# /v1/images/{imageID}/{ancestry, json, layer}
image_prefix = '/v1/images/%s/' % image_id
self.conduct(session, 'GET', image_prefix + 'ancestry', headers=headers)
result = self.conduct(session, 'GET', image_prefix + 'json', headers=headers)
assert result.json()['id'] == image_id
# Ensure we can HEAD the image layer.
self.conduct(session, 'HEAD', image_prefix + 'layer', headers=headers)
# And retrieve the layer data.
result = self.conduct(session, 'GET', image_prefix + 'layer', headers=headers)
assert result.content == images[index].bytes
return PullResult(manifests=None, image_ids=image_ids)
def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
auth = self._auth_for_credentials(credentials)
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
# Ping!
self.ping(session)
# PUT /v1/repositories/{namespace}/{repository}/
result = self.conduct(session, 'PUT',
'/v1/repositories/%s/' % self.repo_name(namespace, repo_name),
expected_status=(201, expected_failure, V1ProtocolSteps.PUT_IMAGES),
json_data={},
auth=auth)
if expected_failure is not None:
return
headers = {}
headers['Authorization'] = 'token ' + result.headers['www-authenticate']
for image in images:
# PUT /v1/images/{imageID}/json
image_json_data = {'id': image.id}
if image.size is not None:
image_json_data['Size'] = image.size
if image.parent_id is not None:
image_json_data['parent'] = image.parent_id
if image.config is not None:
image_json_data['config'] = image.config
self.conduct(session, 'PUT', '/v1/images/%s/json' % image.id,
json_data=image_json_data, headers=headers)
# PUT /v1/images/{imageID}/layer
self.conduct(session, 'PUT', '/v1/images/%s/layer' % image.id,
data=StringIO(image.bytes), headers=headers)
# PUT /v1/images/{imageID}/checksum
checksum = compute_simple(StringIO(image.bytes), json.dumps(image_json_data))
checksum_headers = {'X-Docker-Checksum-Payload': checksum}
checksum_headers.update(headers)
self.conduct(session, 'PUT', '/v1/images/%s/checksum' % image.id,
headers=checksum_headers)
# PUT /v1/repositories/{namespace}/{repository}/tags/latest
for tag_name in tag_names:
self.conduct(session, 'PUT',
'/v1/repositories/%s/tags/%s' % (self.repo_name(namespace, repo_name), tag_name),
data='"%s"' % images[-1].id,
headers=headers,
expected_status=(200, expected_failure, V1ProtocolSteps.PUT_TAG))
# PUT /v1/repositories/{namespace}/{repository}/images
self.conduct(session, 'PUT',
'/v1/repositories/%s/images' % self.repo_name(namespace, repo_name),
expected_status=204, headers=headers)
return PushResult(checksums=None, manifests=None, headers=headers)

View file

@ -0,0 +1,371 @@
import hashlib
import json
from enum import Enum, unique
from image.docker.schema1 import DockerSchema1ManifestBuilder, DockerSchema1Manifest
from test.registry.protocols import (RegistryProtocol, Failures, ProtocolOptions, PushResult,
PullResult)
@unique
class V2ProtocolSteps(Enum):
""" Defines the various steps of the protocol, for matching failures. """
AUTH = 'auth'
BLOB_HEAD_CHECK = 'blob-head-check'
GET_MANIFEST = 'get-manifest'
PUT_MANIFEST = 'put-manifest'
class V2Protocol(RegistryProtocol):
FAILURE_CODES = {
V2ProtocolSteps.AUTH: {
Failures.UNAUTHENTICATED: 401,
Failures.INVALID_REGISTRY: 400,
Failures.APP_REPOSITORY: 405,
Failures.ANONYMOUS_NOT_ALLOWED: 401,
Failures.INVALID_REPOSITORY: 400,
},
V2ProtocolSteps.GET_MANIFEST: {
Failures.UNKNOWN_TAG: 404,
Failures.UNAUTHORIZED: 403,
Failures.DISALLOWED_LIBRARY_NAMESPACE: 400,
},
V2ProtocolSteps.PUT_MANIFEST: {
Failures.DISALLOWED_LIBRARY_NAMESPACE: 400,
Failures.MISSING_TAG: 404,
Failures.INVALID_TAG: 400,
Failures.INVALID_IMAGES: 400,
Failures.INVALID_BLOB: 400,
Failures.UNSUPPORTED_CONTENT_TYPE: 415,
},
}
def __init__(self, jwk):
self.jwk = jwk
def ping(self, session):
result = session.get('/v2/')
assert result.status_code == 401
assert result.headers['Docker-Distribution-API-Version'] == 'registry/2.0'
def login(self, session, username, password, scopes, expect_success):
scopes = scopes if isinstance(scopes, list) else [scopes]
params = {
'account': username,
'service': 'localhost:5000',
'scope': scopes,
}
auth = (username, password)
if not username or not password:
auth = None
response = session.get('/v2/auth', params=params, auth=auth)
if expect_success:
assert response.status_code / 100 == 2
else:
assert response.status_code / 100 == 4
return response
def auth(self, session, credentials, namespace, repo_name, scopes=None,
expected_failure=None):
"""
Performs the V2 Auth flow, returning the token (if any) and the response.
Spec: https://docs.docker.com/registry/spec/auth/token/
"""
scopes = scopes or []
auth = None
username = None
if credentials is not None:
username, _ = credentials
auth = credentials
params = {
'account': username,
'service': 'localhost:5000',
}
if scopes:
params['scope'] = 'repository:%s:%s' % (self.repo_name(namespace, repo_name),
','.join(scopes))
response = self.conduct(session, 'GET', '/v2/auth', params=params, auth=auth,
expected_status=(200, expected_failure, V2ProtocolSteps.AUTH))
if expected_failure is None:
assert response.json().get('token') is not None
return response.json().get('token'), response
return None, response
def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
options = options or ProtocolOptions()
scopes = options.scopes or ['push', 'pull']
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
# Ping!
self.ping(session)
# Perform auth and retrieve a token.
token, _ = self.auth(session, credentials, namespace, repo_name, scopes=scopes,
expected_failure=expected_failure)
if token is None:
return
headers = {
'Authorization': 'Bearer ' + token,
}
# Build fake manifests.
manifests = {}
for tag_name in tag_names:
builder = DockerSchema1ManifestBuilder(namespace, repo_name, tag_name)
for image in reversed(images):
checksum = 'sha256:' + hashlib.sha256(image.bytes).hexdigest()
# If invalid blob references were requested, just make it up.
if options.manifest_invalid_blob_references:
checksum = 'sha256:' + hashlib.sha256('notarealthing').hexdigest()
layer_dict = {'id': image.id, 'parent': image.parent_id}
if image.config is not None:
layer_dict['config'] = image.config
if image.size is not None:
layer_dict['Size'] = image.size
builder.add_layer(checksum, json.dumps(layer_dict))
# Build the manifest.
manifests[tag_name] = builder.build(self.jwk)
# Push the layer data.
checksums = {}
for image in reversed(images):
checksum = 'sha256:' + hashlib.sha256(image.bytes).hexdigest()
checksums[image.id] = checksum
if not options.skip_head_checks:
# Layer data should not yet exist.
self.conduct(session, 'HEAD',
'/v2/%s/blobs/%s' % (self.repo_name(namespace, repo_name), checksum),
expected_status=(404, expected_failure, V2ProtocolSteps.BLOB_HEAD_CHECK),
headers=headers)
# Start a new upload of the layer data.
response = self.conduct(session, 'POST',
'/v2/%s/blobs/uploads/' % self.repo_name(namespace, repo_name),
expected_status=202,
headers=headers)
upload_uuid = response.headers['Docker-Upload-UUID']
new_upload_location = response.headers['Location']
assert new_upload_location.startswith('http://localhost:5000')
# We need to make this relative just for the tests because the live server test
# case modifies the port.
location = response.headers['Location'][len('http://localhost:5000'):]
# PATCH the image data into the layer.
if options.chunks_for_upload is None:
self.conduct(session, 'PATCH', location, data=image.bytes, expected_status=204,
headers=headers)
else:
# If chunked upload is requested, upload the data as a series of chunks, checking
# status at every point.
for chunk_data in options.chunks_for_upload:
if len(chunk_data) == 3:
(start_byte, end_byte, expected_code) = chunk_data
else:
(start_byte, end_byte) = chunk_data
expected_code = 204
patch_headers = {'Range': 'bytes=%s-%s' % (start_byte, end_byte)}
patch_headers.update(headers)
contents_chunk = image.bytes[start_byte:end_byte]
self.conduct(session, 'PATCH', location, data=contents_chunk,
expected_status=expected_code,
headers=patch_headers)
if expected_code != 204:
return
# Retrieve the upload status at each point, and ensure it is valid.
status_url = '/v2/%s/blobs/uploads/%s' % (self.repo_name(namespace, repo_name),
upload_uuid)
response = self.conduct(session, 'GET', status_url, expected_status=204, headers=headers)
assert response.headers['Docker-Upload-UUID'] == upload_uuid
assert response.headers['Range'] == "bytes=0-%s" % end_byte
if options.cancel_blob_upload:
self.conduct(session, 'DELETE', location, params=dict(digest=checksum), expected_status=204,
headers=headers)
# Ensure the upload was canceled.
status_url = '/v2/%s/blobs/uploads/%s' % (self.repo_name(namespace, repo_name), upload_uuid)
self.conduct(session, 'GET', status_url, expected_status=404, headers=headers)
return
# Finish the layer upload with a PUT.
response = self.conduct(session, 'PUT', location, params=dict(digest=checksum),
expected_status=201, headers=headers)
assert response.headers['Docker-Content-Digest'] == checksum
# Ensure the layer exists now.
response = self.conduct(session, 'HEAD',
'/v2/%s/blobs/%s' % (self.repo_name(namespace, repo_name), checksum),
expected_status=200, headers=headers)
assert response.headers['Docker-Content-Digest'] == checksum
assert response.headers['Content-Length'] == str(len(image.bytes))
# And retrieve the layer data.
result = self.conduct(session, 'GET',
'/v2/%s/blobs/%s' % (self.repo_name(namespace, repo_name), checksum),
headers=headers, expected_status=200)
assert result.content == image.bytes
# Write a manifest for each tag.
for tag_name in tag_names:
manifest = manifests[tag_name]
# Write the manifest. If we expect it to be invalid, we expect a 404 code. Otherwise, we
# expect a 202 response for success.
put_code = 404 if options.manifest_invalid_blob_references else 202
manifest_headers = {'Content-Type': 'application/json'}
manifest_headers.update(headers)
if options.manifest_content_type is not None:
manifest_headers['Content-Type'] = options.manifest_content_type
self.conduct(session, 'PUT',
'/v2/%s/manifests/%s' % (self.repo_name(namespace, repo_name), tag_name),
data=manifest.bytes,
expected_status=(put_code, expected_failure, V2ProtocolSteps.PUT_MANIFEST),
headers=manifest_headers)
return PushResult(checksums=checksums, manifests=manifests, headers=headers)
def delete(self, session, namespace, repo_name, tag_names, credentials=None,
expected_failure=None, options=None):
options = options or ProtocolOptions()
scopes = options.scopes or ['*']
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
# Ping!
self.ping(session)
# Perform auth and retrieve a token.
token, _ = self.auth(session, credentials, namespace, repo_name, scopes=scopes,
expected_failure=expected_failure)
if token is None:
return None
headers = {
'Authorization': 'Bearer ' + token,
}
for tag_name in tag_names:
self.conduct(session, 'DELETE',
'/v2/%s/manifests/%s' % (self.repo_name(namespace, repo_name), tag_name),
headers=headers,
expected_status=202)
def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
options = options or ProtocolOptions()
scopes = options.scopes or ['pull']
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
# Ping!
self.ping(session)
# Perform auth and retrieve a token.
token, _ = self.auth(session, credentials, namespace, repo_name, scopes=scopes,
expected_failure=expected_failure)
if token is None:
return None
headers = {
'Authorization': 'Bearer ' + token,
}
manifests = {}
image_ids = {}
for tag_name in tag_names:
# Retrieve the manifest for the tag or digest.
response = self.conduct(session, 'GET',
'/v2/%s/manifests/%s' % (self.repo_name(namespace, repo_name),
tag_name),
expected_status=(200, expected_failure, V2ProtocolSteps.GET_MANIFEST),
headers=headers)
if expected_failure is not None:
return None
# Ensure the manifest returned by us is valid.
manifest = DockerSchema1Manifest(response.text)
manifests[tag_name] = manifest
image_ids[tag_name] = manifest.leaf_layer.v1_metadata.image_id
# Verify the layers.
for index, layer in enumerate(manifest.layers):
result = self.conduct(session, 'GET',
'/v2/%s/blobs/%s' % (self.repo_name(namespace, repo_name),
layer.digest),
expected_status=200,
headers=headers)
assert result.content == images[index].bytes
return PullResult(manifests=manifests, image_ids=image_ids)
def catalog(self, session, page_size=2, credentials=None, options=None, expected_failure=None,
namespace=None, repo_name=None):
options = options or ProtocolOptions()
scopes = options.scopes or []
# Ping!
self.ping(session)
# Perform auth and retrieve a token.
headers = {}
if credentials is not None:
token, _ = self.auth(session, credentials, namespace, repo_name, scopes=scopes,
expected_failure=expected_failure)
if token is None:
return None
headers = {
'Authorization': 'Bearer ' + token,
}
results = []
url = '/v2/_catalog'
params = {}
if page_size is not None:
params['n'] = page_size
while True:
response = self.conduct(session, 'GET', url, headers=headers, params=params)
data = response.json()
assert len(data['repositories']) <= page_size
results.extend(data['repositories'])
if not response.headers.get('Link'):
return results
link_url = response.headers['Link']
v2_index = link_url.find('/v2/')
url = link_url[v2_index:]
return results

111
test/registry/protocols.py Normal file
View file

@ -0,0 +1,111 @@
import json
import tarfile
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from cStringIO import StringIO
from enum import Enum, unique
from six import add_metaclass
Image = namedtuple('Image', ['id', 'parent_id', 'bytes', 'size', 'config'])
Image.__new__.__defaults__ = (None, None)
PushResult = namedtuple('PushResult', ['checksums', 'manifests', 'headers'])
PullResult = namedtuple('PullResult', ['manifests', 'image_ids'])
def layer_bytes_for_contents(contents, mode='|gz'):
layer_data = StringIO()
def add_file(name, contents):
tar_file_info = tarfile.TarInfo(name=name)
tar_file_info.type = tarfile.REGTYPE
tar_file_info.size = len(contents)
tar_file_info.mtime = 1
tar_file = tarfile.open(fileobj=layer_data, mode='w' + mode)
tar_file.addfile(tar_file_info, StringIO(contents))
tar_file.close()
add_file('contents', contents)
layer_bytes = layer_data.getvalue()
layer_data.close()
return layer_bytes
@unique
class Failures(Enum):
""" Defines the various forms of expected failure. """
UNAUTHENTICATED = 'unauthenticated'
UNAUTHORIZED = 'unauthorized'
INVALID_REGISTRY = 'invalid-registry'
INVALID_REPOSITORY = 'invalid-repository'
APP_REPOSITORY = 'app-repository'
UNKNOWN_TAG = 'unknown-tag'
ANONYMOUS_NOT_ALLOWED = 'anonymous-not-allowed'
DISALLOWED_LIBRARY_NAMESPACE = 'disallowed-library-namespace'
MISSING_TAG = 'missing-tag'
INVALID_TAG = 'invalid-tag'
INVALID_IMAGES = 'invalid-images'
UNSUPPORTED_CONTENT_TYPE = 'unsupported-content-type'
INVALID_BLOB = 'invalid-blob'
class ProtocolOptions(object):
def __init__(self):
self.munge_shas = False
self.scopes = None
self.cancel_blob_upload = False
self.manifest_invalid_blob_references = False
self.chunks_for_upload = None
self.skip_head_checks = False
self.manifest_content_type = None
@add_metaclass(ABCMeta)
class RegistryProtocol(object):
""" Interface for protocols. """
FAILURE_CODES = {}
@abstractmethod
def login(self, session, username, password, scopes, expect_success):
""" Performs the login flow with the given credentials, over the given scopes. """
@abstractmethod
def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
""" Pulls the given tag via the given session, using the given credentials, and
ensures the given images match.
"""
@abstractmethod
def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
expected_failure=None, options=None):
""" Pushes the specified images as the given tag via the given session, using
the given credentials.
"""
def repo_name(self, namespace, repo_name):
if namespace:
return '%s/%s' % (namespace, repo_name)
return repo_name
def conduct(self, session, method, url, expected_status=200, params=None, data=None,
json_data=None, headers=None, auth=None):
if json_data is not None:
data = json.dumps(json_data)
headers = headers or {}
headers['Content-Type'] = 'application/json'
if isinstance(expected_status, tuple):
expected_status, expected_failure, protocol_step = expected_status
if expected_failure is not None:
failures = self.__class__.FAILURE_CODES.get(protocol_step, {})
expected_status = failures.get(expected_failure, expected_status)
result = session.request(method, url, params=params, data=data, headers=headers, auth=auth)
msg = "Expected response %s, got %s" % (expected_status, result.status_code)
assert result.status_code == expected_status, msg
return result

View file

@ -0,0 +1,892 @@
# pylint: disable=W0401, W0621, W0613, W0614, R0913
import hashlib
import tarfile
from cStringIO import StringIO
import binascii
import bencode
import resumablehashlib
from test.fixtures import *
from test.registry.liveserverfixture import *
from test.registry.fixtures import *
from test.registry.protocol_fixtures import *
from test.registry.protocols import Failures, Image, layer_bytes_for_contents, ProtocolOptions
from app import instance_keys
from util.security.registry_jwt import decode_bearer_header
from util.timedeltastring import convert_to_timedelta
def test_basic_push_pull(pusher, puller, basic_images, liveserver_session, app_reloader):
""" Test: Basic push and pull of an image to a new repository. """
credentials = ('devtable', 'password')
# Push a new repository.
pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=credentials)
# Pull the repository to verify.
puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=credentials)
def test_basic_push_pull_by_manifest(manifest_protocol, basic_images, liveserver_session,
app_reloader):
""" Test: Basic push and pull-by-manifest of an image to a new repository. """
credentials = ('devtable', 'password')
# Push a new repository.
result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=credentials)
# Pull the repository by digests to verify.
digests = [str(manifest.digest) for manifest in result.manifests.values()]
manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', digests, basic_images,
credentials=credentials)
def test_push_invalid_credentials(pusher, basic_images, liveserver_session, app_reloader):
""" Test: Ensure we get auth errors when trying to push with invalid credentials. """
invalid_credentials = ('devtable', 'notcorrectpassword')
pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=invalid_credentials, expected_failure=Failures.UNAUTHENTICATED)
def test_pull_invalid_credentials(puller, basic_images, liveserver_session, app_reloader):
""" Test: Ensure we get auth errors when trying to pull with invalid credentials. """
invalid_credentials = ('devtable', 'notcorrectpassword')
puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=invalid_credentials, expected_failure=Failures.UNAUTHENTICATED)
def test_push_pull_formerly_bad_repo_name(pusher, puller, basic_images, liveserver_session,
app_reloader):
""" Test: Basic push and pull of an image to a new repository with a name that formerly
failed. """
credentials = ('devtable', 'password')
# Push a new repository.
pusher.push(liveserver_session, 'devtable', 'foo.bar', 'latest', basic_images,
credentials=credentials)
# Pull the repository to verify.
puller.pull(liveserver_session, 'devtable', 'foo.bar', 'latest', basic_images,
credentials=credentials)
def test_application_repo(pusher, puller, basic_images, liveserver_session, app_reloader,
registry_server_executor, liveserver):
""" Test: Attempting to push or pull from an *application* repository raises a 405. """
credentials = ('devtable', 'password')
registry_server_executor.on(liveserver).create_app_repository('devtable', 'someapprepo')
# Attempt to push to the repository.
pusher.push(liveserver_session, 'devtable', 'someapprepo', 'latest', basic_images,
credentials=credentials, expected_failure=Failures.APP_REPOSITORY)
# Attempt to pull from the repository.
puller.pull(liveserver_session, 'devtable', 'someapprepo', 'latest', basic_images,
credentials=credentials, expected_failure=Failures.APP_REPOSITORY)
def test_middle_layer_different_sha(manifest_protocol, v1_protocol, liveserver_session,
app_reloader):
""" Test: Pushing of a 3-layer image with the *same* V1 ID's, but the middle layer having
different bytes, must result in new IDs being generated for the leaf layer, as
they point to different "images".
"""
credentials = ('devtable', 'password')
first_images = [
Image(id='baseimage', parent_id=None, size=None, bytes=layer_bytes_for_contents('base')),
Image(id='middleimage', parent_id='baseimage', size=None,
bytes=layer_bytes_for_contents('middle')),
Image(id='leafimage', parent_id='middleimage', size=None,
bytes=layer_bytes_for_contents('leaf')),
]
# First push and pull the images, to ensure we have the basics setup and working.
manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', first_images,
credentials=credentials)
first_pull_result = manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest',
first_images, credentials=credentials)
first_manifest = first_pull_result.manifests['latest']
assert set([image.id for image in first_images]) == set(first_manifest.image_ids)
assert first_pull_result.image_ids['latest'] == 'leafimage'
# Next, create an image list with the middle image's *bytes* changed.
second_images = list(first_images)
second_images[1] = Image(id='middleimage', parent_id='baseimage', size=None,
bytes=layer_bytes_for_contents('different middle bytes'))
# Push and pull the image, ensuring that the produced ID for the middle and leaf layers
# are synthesized.
options = ProtocolOptions()
options.munge_shas = True
options.skip_head_checks = True
manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', second_images,
credentials=credentials, options=options)
second_pull_result = v1_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest',
second_images, credentials=credentials, options=options)
assert second_pull_result.image_ids['latest'] != 'leafimage'
def add_robot(api_caller, _):
api_caller.conduct_auth('devtable', 'password')
resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot')
return ('buynlarge+ownerbot', resp.json()['token'])
def add_token(_, executor):
return ('$token', executor.add_token().text)
@pytest.mark.parametrize('credentials, namespace, expected_performer', [
(('devtable', 'password'), 'devtable', 'devtable'),
(add_robot, 'buynlarge', 'buynlarge+ownerbot'),
(('$oauthtoken', 'test'), 'devtable', 'devtable'),
(('$app', 'test'), 'devtable', 'devtable'),
(add_token, 'devtable', None),
])
def test_push_pull_logging(credentials, namespace, expected_performer, pusher, puller, basic_images,
liveserver_session, liveserver, api_caller, app_reloader,
registry_server_executor):
""" Test: Basic push and pull, ensuring that logs are added for each operation. """
# Create the repository before the test push.
start_images = [Image(id='startimage', parent_id=None, size=None,
bytes=layer_bytes_for_contents('start image'))]
pusher.push(liveserver_session, namespace, 'newrepo', 'latest', start_images,
credentials=('devtable', 'password'))
# Retrieve the credentials to use. This must be after the repo is created, because
# some credentials creation code adds the new entity to the repository.
if not isinstance(credentials, tuple):
credentials = credentials(api_caller, registry_server_executor.on(liveserver))
# Push to the repository with the specified credentials.
pusher.push(liveserver_session, namespace, 'newrepo', 'latest', basic_images,
credentials=credentials)
# Check the logs for the push.
api_caller.conduct_auth('devtable', 'password')
result = api_caller.get('/api/v1/repository/%s/newrepo/logs' % namespace)
logs = result.json()['logs']
assert len(logs) == 2
assert logs[0]['kind'] == 'push_repo'
assert logs[0]['metadata']['namespace'] == namespace
assert logs[0]['metadata']['repo'] == 'newrepo'
if expected_performer is not None:
assert logs[0]['performer']['name'] == expected_performer
# Pull the repository to verify.
puller.pull(liveserver_session, namespace, 'newrepo', 'latest', basic_images,
credentials=credentials)
# Check the logs for the pull.
result = api_caller.get('/api/v1/repository/%s/newrepo/logs' % namespace)
logs = result.json()['logs']
assert len(logs) == 3
assert logs[0]['kind'] == 'pull_repo'
assert logs[0]['metadata']['namespace'] == namespace
assert logs[0]['metadata']['repo'] == 'newrepo'
if expected_performer is not None:
assert logs[0]['performer']['name'] == expected_performer
def test_pull_publicrepo_anonymous(pusher, puller, basic_images, liveserver_session,
app_reloader, api_caller, liveserver):
""" Test: Pull a public repository anonymously. """
# Add a new repository under the public user, so we have a repository to pull.
pusher.push(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
credentials=('public', 'password'))
# First try to pull the (currently private) repo anonymously, which should fail (since it is
# private)
puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
expected_failure=Failures.UNAUTHORIZED)
# Using a non-public user should also fail.
puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
credentials=('devtable', 'password'),
expected_failure=Failures.UNAUTHORIZED)
# Make the repository public.
api_caller.conduct_auth('public', 'password')
api_caller.change_repo_visibility('public', 'newrepo', 'public')
# Pull the repository anonymously, which should succeed because the repository is public.
puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images)
def test_pull_publicrepo_no_anonymous_access(pusher, puller, basic_images, liveserver_session,
app_reloader, api_caller, liveserver,
registry_server_executor):
""" Test: Attempts to pull a public repository anonymously, with the feature flag disabled. """
# Add a new repository under the public user, so we have a repository to pull.
pusher.push(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
credentials=('public', 'password'))
# First try to pull the (currently private) repo anonymously, which should fail (since it is
# private)
puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
expected_failure=Failures.UNAUTHORIZED)
# Using a non-public user should also fail.
puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
credentials=('devtable', 'password'),
expected_failure=Failures.UNAUTHORIZED)
# Make the repository public.
api_caller.conduct_auth('public', 'password')
api_caller.change_repo_visibility('public', 'newrepo', 'public')
with FeatureFlagValue('ANONYMOUS_ACCESS', False, registry_server_executor.on(liveserver)):
# Attempt again to pull the (now public) repo anonymously, which should fail since
# the feature flag for anonymous access is turned off.
puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
expected_failure=Failures.ANONYMOUS_NOT_ALLOWED)
# Using a non-public user should now succeed.
puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
credentials=('devtable', 'password'))
def test_basic_organization_flow(pusher, puller, basic_images, liveserver_session, app_reloader):
""" Test: Basic push and pull of an image to a new repository by members of an organization. """
# Add a new repository under the organization via the creator user.
pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
credentials=('creator', 'password'))
# Ensure that the creator can pull it.
puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
credentials=('creator', 'password'))
# Ensure that the admin can pull it.
puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
credentials=('devtable', 'password'))
# Ensure that the reader *cannot* pull it.
puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
credentials=('reader', 'password'),
expected_failure=Failures.UNAUTHORIZED)
def test_library_support(pusher, puller, basic_images, liveserver_session, app_reloader):
""" Test: Pushing and pulling from the implicit library namespace. """
credentials = ('devtable', 'password')
# Push a new repository.
pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images,
credentials=credentials)
# Pull the repository to verify.
puller.pull(liveserver_session, '', 'newrepo', 'latest', basic_images,
credentials=credentials)
# Pull the repository from the library namespace to verify.
puller.pull(liveserver_session, 'library', 'newrepo', 'latest', basic_images,
credentials=credentials)
def test_library_namespace_with_support_disabled(pusher, puller, basic_images, liveserver_session,
app_reloader, liveserver,
registry_server_executor):
""" Test: Pushing and pulling from the explicit library namespace, even when the
implicit one is disabled.
"""
credentials = ('devtable', 'password')
with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)):
# Push a new repository.
pusher.push(liveserver_session, 'library', 'newrepo', 'latest', basic_images,
credentials=credentials)
# Pull the repository from the library namespace to verify.
puller.pull(liveserver_session, 'library', 'newrepo', 'latest', basic_images,
credentials=credentials)
def test_push_library_with_support_disabled(pusher, basic_images, liveserver_session,
app_reloader, liveserver,
registry_server_executor):
""" Test: Pushing to the implicit library namespace, when disabled,
should fail.
"""
credentials = ('devtable', 'password')
with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)):
# Attempt to push a new repository.
pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images,
credentials=credentials,
expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE)
def test_pull_library_with_support_disabled(puller, basic_images, liveserver_session,
app_reloader, liveserver,
registry_server_executor):
""" Test: Pushing to the implicit library namespace, when disabled,
should fail.
"""
credentials = ('devtable', 'password')
with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)):
# Attempt to pull the repository from the library namespace.
puller.pull(liveserver_session, '', 'newrepo', 'latest', basic_images,
credentials=credentials,
expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE)
def test_image_replication(pusher, basic_images, liveserver_session, app_reloader, liveserver,
registry_server_executor):
""" Test: Ensure that entries are created for replication of the images pushed. """
credentials = ('devtable', 'password')
with FeatureFlagValue('STORAGE_REPLICATION', True, registry_server_executor.on(liveserver)):
pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=credentials)
# Ensure that entries were created for each image.
for image in basic_images:
r = registry_server_executor.on(liveserver).get_storage_replication_entry(image.id)
assert r.text == 'OK'
def test_push_reponame_with_slashes(pusher, basic_images, liveserver_session, app_reloader):
""" Test: Attempt to add a repository name with slashes. This should fail as we do not
support it.
"""
credentials = ('devtable', 'password')
pusher.push(liveserver_session, 'devtable', 'some/slash', 'latest', basic_images,
credentials=credentials,
expected_failure=Failures.INVALID_REPOSITORY)
@pytest.mark.parametrize('tag_name, expected_failure', [
('l', None),
('1', None),
('x' * 128, None),
('', Failures.MISSING_TAG),
('x' * 129, Failures.INVALID_TAG),
('.fail', Failures.INVALID_TAG),
('-fail', Failures.INVALID_TAG),
])
def test_tag_validaton(tag_name, expected_failure, pusher, basic_images, liveserver_session,
app_reloader):
""" Test: Various forms of tags and whether they succeed or fail as expected. """
credentials = ('devtable', 'password')
pusher.push(liveserver_session, 'devtable', 'newrepo', tag_name, basic_images,
credentials=credentials,
expected_failure=expected_failure)
def test_invalid_parent(pusher, liveserver_session, app_reloader):
""" Test: Attempt to push an image with an invalid/missing parent. """
images = [
Image(id='childimage', parent_id='parentimage', size=None,
bytes=layer_bytes_for_contents('child')),
]
credentials = ('devtable', 'password')
pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', images,
credentials=credentials,
expected_failure=Failures.INVALID_IMAGES)
def test_wrong_image_order(pusher, liveserver_session, app_reloader):
""" Test: Attempt to push an image with its layers in the wrong order. """
images = [
Image(id='childimage', parent_id='parentimage', size=None,
bytes=layer_bytes_for_contents('child')),
Image(id='parentimage', parent_id=None, size=None,
bytes=layer_bytes_for_contents('parent')),
]
credentials = ('devtable', 'password')
pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', images,
credentials=credentials,
expected_failure=Failures.INVALID_IMAGES)
@pytest.mark.parametrize('labels', [
# Basic labels.
[('foo', 'bar', 'text/plain'), ('baz', 'meh', 'text/plain')],
# Theoretically invalid, but allowed when pushed via registry protocol.
[('theoretically-invalid--label', 'foo', 'text/plain')],
# JSON label.
[('somejson', '{"some": "json"}', 'application/json'), ('plain', '', 'text/plain')],
# JSON-esque (but not valid JSON) labels.
[('foo', '[hello world]', 'text/plain'), ('bar', '{wassup?!}', 'text/plain')],
])
def test_labels(labels, manifest_protocol, liveserver_session, api_caller, app_reloader):
""" Test: Image pushed with labels has those labels found in the database after the
push succeeds.
"""
images = [
Image(id='theimage', parent_id=None, bytes=layer_bytes_for_contents('image'),
config={'Labels': {key: value for (key, value, _) in labels}}),
]
credentials = ('devtable', 'password')
result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', images,
credentials=credentials)
digest = result.manifests['latest'].digest
api_caller.conduct_auth('devtable', 'password')
data = api_caller.get('/api/v1/repository/devtable/newrepo/manifest/%s/labels' % digest).json()
labels_found = data['labels']
assert len(labels_found) == len(labels)
labels_found_map = {l['key']: l for l in labels_found}
assert set(images[0].config['Labels'].keys()) == set(labels_found_map.keys())
for key, _, media_type in labels:
assert labels_found_map[key]['source_type'] == 'manifest'
assert labels_found_map[key]['media_type'] == media_type
@pytest.mark.parametrize('label_value, expected_expiration', [
('1d', True),
('1h', True),
('2w', True),
('1g', False),
('something', False),
])
def test_expiration_label(label_value, expected_expiration, manifest_protocol, liveserver_session,
api_caller, app_reloader):
""" Test: Tag pushed with a valid `quay.expires-after` will have its expiration set to its
start time plus the duration specified. If the duration is invalid, no expiration will
be set.
"""
images = [
Image(id='theimage', parent_id=None, bytes=layer_bytes_for_contents('image'),
config={'Labels': {'quay.expires-after': label_value}}),
]
credentials = ('devtable', 'password')
manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', images,
credentials=credentials)
api_caller.conduct_auth('devtable', 'password')
tag_data = api_caller.get('/api/v1/repository/devtable/newrepo/tag').json()['tags'][0]
if expected_expiration:
diff = convert_to_timedelta(label_value).total_seconds()
assert tag_data['end_ts'] == tag_data['start_ts'] + diff
else:
assert tag_data.get('end_ts') is None
@pytest.mark.parametrize('content_type', [
'application/vnd.oci.image.manifest.v1+json',
'application/vnd.docker.distribution.manifest.v2+json',
'application/vnd.foo.bar',
])
def test_unsupported_manifest_content_type(content_type, manifest_protocol, basic_images,
liveserver_session, app_reloader):
""" Test: Attempt to push a manifest with an unsupported media type. """
credentials = ('devtable', 'password')
options = ProtocolOptions()
options.manifest_content_type = content_type
# Attempt to push a new repository.
manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=credentials,
options=options,
expected_failure=Failures.UNSUPPORTED_CONTENT_TYPE)
def test_invalid_blob_reference(manifest_protocol, basic_images, liveserver_session, app_reloader):
""" Test: Attempt to push a manifest with an invalid blob reference. """
credentials = ('devtable', 'password')
options = ProtocolOptions()
options.manifest_invalid_blob_references = True
# Attempt to push a new repository.
manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=credentials,
options=options,
expected_failure=Failures.INVALID_BLOB)
def test_delete_tag(manifest_protocol, puller, basic_images, liveserver_session,
app_reloader):
""" Test: Push a repository, delete a tag, and attempt to pull. """
credentials = ('devtable', 'password')
# Push the tags.
result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', ['one', 'two'],
basic_images, credentials=credentials)
# Delete tag `one` by digest.
manifest_protocol.delete(liveserver_session, 'devtable', 'newrepo',
result.manifests['one'].digest,
credentials=credentials)
# Attempt to pull tag `one` and ensure it doesn't work.
puller.pull(liveserver_session, 'devtable', 'newrepo', 'one', basic_images,
credentials=credentials,
expected_failure=Failures.UNKNOWN_TAG)
# Pull tag `two` to verify it works.
puller.pull(liveserver_session, 'devtable', 'newrepo', 'two', basic_images,
credentials=credentials)
def test_cancel_upload(manifest_protocol, basic_images, liveserver_session, app_reloader):
""" Test: Cancelation of blob uploads. """
credentials = ('devtable', 'password')
options = ProtocolOptions()
options.cancel_blob_upload = True
manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
basic_images, credentials=credentials)
def test_blob_caching(manifest_protocol, basic_images, liveserver_session, app_reloader,
liveserver, registry_server_executor):
""" Test: Pulling of blobs after initially pulled will result in the blobs being cached. """
credentials = ('devtable', 'password')
# Push a tag.
result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
basic_images, credentials=credentials)
# Conduct the initial pull to prime the cache.
manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=credentials)
# Disconnect the server from the database.
registry_server_executor.on(liveserver).break_database()
# Pull each blob, which should succeed due to caching. If caching is broken, this will
# fail when it attempts to hit the database.
for layer in result.manifests['latest'].layers:
blob_id = str(layer.digest)
r = liveserver_session.get('/v2/devtable/newrepo/blobs/%s' % blob_id, headers=result.headers)
assert r.status_code == 200
@pytest.mark.parametrize('chunks', [
# Two chunks.
[(0, 100), (100, None)],
# Multiple chunks.
[(0, 10), (10, 20), (20, None)],
[(0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, None)],
# Overlapping chunks.
[(0, 1024), (10, None)],
])
def test_chunked_blob_uploading(chunks, random_layer_data, manifest_protocol, puller,
liveserver_session, app_reloader):
""" Test: Uploading of blobs as chunks. """
credentials = ('devtable', 'password')
adjusted_chunks = []
for start_byte, end_byte in chunks:
adjusted_chunks.append((start_byte, end_byte if end_byte else len(random_layer_data)))
images = [
Image(id='theimage', parent_id=None, bytes=random_layer_data),
]
options = ProtocolOptions()
options.chunks_for_upload = adjusted_chunks
# Push the image, using the specified chunking.
manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
images, credentials=credentials, options=options)
# Pull to verify the image was created.
puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', images,
credentials=credentials)
def test_chunked_uploading_mismatched_chunks(manifest_protocol, random_layer_data,
liveserver_session, app_reloader):
""" Test: Attempt to upload chunks with data missing. """
credentials = ('devtable', 'password')
images = [
Image(id='theimage', parent_id=None, bytes=random_layer_data),
]
# Note: Byte #100 is missing.
options = ProtocolOptions()
options.chunks_for_upload = [(0, 100), (101, len(random_layer_data), 416)]
# Attempt to push, with the chunked upload failing.
manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
images, credentials=credentials, options=options)
@pytest.mark.parametrize('public_catalog, credentials, expected_repos', [
# No public access and no credentials => No results.
(False, None, None),
# Public access and no credentials => public repositories.
(True, None, ['public/publicrepo']),
# Private creds => private repositories.
(False, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']),
(True, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']),
])
@pytest.mark.parametrize('page_size', [
1,
2,
10,
50,
100,
])
def test_catalog(public_catalog, credentials, expected_repos, page_size, v2_protocol,
liveserver_session, app_reloader, liveserver, registry_server_executor):
""" Test: Retrieving results from the V2 catalog. """
with FeatureFlagValue('PUBLIC_CATALOG', public_catalog, registry_server_executor.on(liveserver)):
results = v2_protocol.catalog(liveserver_session, page_size=page_size, credentials=credentials,
namespace='devtable', repo_name='simple')
if expected_repos is None:
assert len(results) == 0
else:
assert set(expected_repos).issubset(set(results))
def test_pull_torrent(pusher, basic_images, liveserver_session, liveserver,
registry_server_executor, app_reloader):
""" Test: Retrieve a torrent for pulling the image via the Quay CLI. """
credentials = ('devtable', 'password')
# Push an image to download.
pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
credentials=credentials)
# Required for torrent.
registry_server_executor.on(liveserver).set_supports_direct_download(True)
# For each layer, retrieve a torrent for the blob.
for image in basic_images:
digest = 'sha256:' + hashlib.sha256(image.bytes).hexdigest()
response = liveserver_session.get('/c1/torrent/devtable/newrepo/blobs/%s' % digest,
auth=credentials)
torrent_info = bencode.bdecode(response.content)
# Check the announce URL.
assert torrent_info['url-list'] == 'http://somefakeurl?goes=here'
# Check the metadata.
assert torrent_info.get('info', {}).get('pieces') is not None
assert torrent_info.get('announce') is not None
# Check the pieces.
sha = resumablehashlib.sha1()
sha.update(image.bytes)
expected = binascii.hexlify(sha.digest())
found = binascii.hexlify(torrent_info['info']['pieces'])
assert expected == found
@pytest.mark.parametrize('use_estimates', [
False,
True,
])
def test_squashed_images(use_estimates, pusher, sized_images, liveserver_session,
liveserver, registry_server_executor, app_reloader):
""" Test: Pulling of squashed images. """
credentials = ('devtable', 'password')
# Push an image to download.
pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', sized_images,
credentials=credentials)
if use_estimates:
# Clear the uncompressed size stored for the images, to ensure that we estimate instead.
for image in sized_images:
registry_server_executor.on(liveserver).clear_uncompressed_size(image.id)
# Pull the squashed version.
response = liveserver_session.get('/c1/squash/devtable/newrepo/latest', auth=credentials)
tar = tarfile.open(fileobj=StringIO(response.content))
# Verify the squashed image.
expected_image_id = '13a2d9711a3e242fcd50a7627c02d86901ac801b78dcea3147e8ff640078de52'
expected_names = ['repositories',
expected_image_id,
'%s/json' % expected_image_id,
'%s/VERSION' % expected_image_id,
'%s/layer.tar' % expected_image_id]
assert tar.getnames() == expected_names
# Verify the JSON image data.
json_data = (tar.extractfile(tar.getmember('%s/json' % expected_image_id)).read())
# Ensure the JSON loads and parses.
result = json.loads(json_data)
assert result['id'] == expected_image_id
# Ensure that squashed layer tar can be opened.
tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id)))
def get_robot_password(api_caller):
api_caller.conduct_auth('devtable', 'password')
resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot')
return resp.json()['token']
def get_encrypted_password(api_caller):
api_caller.conduct_auth('devtable', 'password')
resp = api_caller.post('/api/v1/user/clientkey', data=json.dumps(dict(password='password')),
headers={'Content-Type': 'application/json'})
return resp.json()['key']
@pytest.mark.parametrize('username, password, expect_success', [
# Invalid username.
('invaliduser', 'somepassword', False),
# Invalid password.
('devtable', 'invalidpassword', False),
# Invalid OAuth token.
('$oauthtoken', 'unknown', False),
# Invalid CLI token.
('$app', 'invalid', False),
# Valid.
('devtable', 'password', True),
# Robot.
('buynlarge+ownerbot', get_robot_password, True),
# Encrypted password.
('devtable', get_encrypted_password, True),
# OAuth.
('$oauthtoken', 'test', True),
# CLI Token.
('$app', 'test', True),
])
def test_login(username, password, expect_success, loginer, liveserver_session,
api_caller, app_reloader):
""" Test: Login flow. """
if not isinstance(password, str):
password = password(api_caller)
loginer.login(liveserver_session, username, password, [], expect_success)
@pytest.mark.parametrize('username, password, scopes, expected_access, expect_success', [
# No scopes.
('devtable', 'password', [], [], True),
# Basic pull.
('devtable', 'password', ['repository:devtable/simple:pull'], [
{'type': 'repository', 'name': 'devtable/simple', 'actions': ['pull']},
], True),
# Basic push.
('devtable', 'password', ['repository:devtable/simple:push'], [
{'type': 'repository', 'name': 'devtable/simple', 'actions': ['push']},
], True),
# Basic push/pull.
('devtable', 'password', ['repository:devtable/simple:push,pull'], [
{'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull']},
], True),
# Admin.
('devtable', 'password', ['repository:devtable/simple:push,pull,*'], [
{'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
], True),
# Basic pull with endpoint.
('devtable', 'password', ['repository:localhost:5000/devtable/simple:pull'], [
{'type': 'repository', 'name': 'localhost:5000/devtable/simple', 'actions': ['pull']},
], True),
# Basic pull with invalid endpoint.
('devtable', 'password', ['repository:someinvalid/devtable/simple:pull'], [], False),
# Pull with no access.
('public', 'password', ['repository:devtable/simple:pull'], [
{'type': 'repository', 'name': 'devtable/simple', 'actions': []},
], True),
# Anonymous push and pull on a private repository.
('', '', ['repository:devtable/simple:pull,push'], [
{'type': 'repository', 'name': 'devtable/simple', 'actions': []},
], True),
# Pull and push with no push access.
('reader', 'password', ['repository:buynlarge/orgrepo:pull,push'], [
{'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']},
], True),
# OAuth.
('$oauthtoken', 'test', ['repository:public/publicrepo:pull,push'], [
{'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
], True),
# Anonymous public repo.
('', '', ['repository:public/publicrepo:pull,push'], [
{'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
], True),
# Multiple scopes.
('devtable', 'password',
['repository:devtable/simple:push,pull,*', 'repository:buynlarge/orgrepo:pull'], [
{'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
{'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']},
], True),
# Multiple scopes.
('devtable', 'password',
['repository:devtable/simple:push,pull,*', 'repository:public/publicrepo:push,pull'], [
{'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
{'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
], True),
])
def test_login_scopes(username, password, scopes, expected_access, expect_success, v2_protocol,
liveserver_session, api_caller, app_reloader):
""" Test: Login via the V2 auth protocol reacts correctly to requested scopes. """
if not isinstance(password, str):
password = password(api_caller)
response = v2_protocol.login(liveserver_session, username, password, scopes, expect_success)
if not expect_success:
return
# Validate the returned token.
encoded = response.json()['token']
payload = decode_bearer_header('Bearer ' + encoded, instance_keys,
{'SERVER_HOSTNAME': 'localhost:5000'})
assert payload is not None
assert payload['access'] == expected_access

View file

@ -1,5 +1,5 @@
[tox] [tox]
envlist = {py27}-{unittest,registry} envlist = {py27}-{unittest,registry,registry_old}
skipsdist = True skipsdist = True
[testenv] [testenv]
@ -11,7 +11,8 @@ deps =
setenv = setenv =
PYTHONPATH = {toxinidir}:{toxinidir} PYTHONPATH = {toxinidir}:{toxinidir}
TEST=true TEST=true
registry: FILE="test/registry_tests.py" registry: FILE="test/registry/registry_tests.py"
registry_old: FILE="test/registry_tests.py"
unittest: FILE="" unittest: FILE=""
commands = commands =
py.test --verbose {env:FILE} -vv {posargs} py.test --verbose {env:FILE} -vv {posargs}

View file

@ -17,9 +17,9 @@ class ExpiresEntry(object):
class ExpiresDict(object): class ExpiresDict(object):
""" ExpiresDict defines a dictionary-like class whose keys have expiration. The rebuilder is """ ExpiresDict defines a dictionary-like class whose keys have expiration. The rebuilder is
a function that returns the full contents of the cached dictionary as a dict of the keys a function that returns the full contents of the cached dictionary as a dict of the keys
and whose values are TTLEntry's. and whose values are TTLEntry's. If the rebuilder is None, then no rebuilding is performed.
""" """
def __init__(self, rebuilder): def __init__(self, rebuilder=None):
self._rebuilder = rebuilder self._rebuilder = rebuilder
self._items = {} self._items = {}
@ -49,6 +49,9 @@ class ExpiresDict(object):
return self.get(key) is not None return self.get(key) is not None
def _rebuild(self): def _rebuild(self):
if self._rebuilder is None:
return self._items
items = self._rebuilder() items = self._rebuilder()
self._items = items self._items = items
return items return items

View file

@ -14,6 +14,13 @@ TAG_REGEX = re.compile(FULL_TAG_PATTERN)
TAG_ERROR = ('Invalid tag: must match [A-Za-z0-9_.-], NOT start with "." or "-", ' TAG_ERROR = ('Invalid tag: must match [A-Za-z0-9_.-], NOT start with "." or "-", '
'and can contain 1-128 characters') 'and can contain 1-128 characters')
class ImplicitLibraryNamespaceNotAllowed(Exception):
""" Exception raised if the implicit library namespace was specified but is
not allowed. """
pass
def escape_tag(tag, default='latest'): def escape_tag(tag, default='latest'):
""" Escapes a Docker tag, ensuring it matches the tag regular expression. """ """ Escapes a Docker tag, ensuring it matches the tag regular expression. """
if not tag: if not tag:
@ -24,13 +31,16 @@ def escape_tag(tag, default='latest'):
return tag[0:127] return tag[0:127]
def parse_namespace_repository(repository, library_namespace, include_tag=False): def parse_namespace_repository(repository, library_namespace, include_tag=False,
allow_library=True):
repository = repository.encode('unidecode', 'ignore') repository = repository.encode('unidecode', 'ignore')
parts = repository.rstrip('/').split('/', 1) parts = repository.rstrip('/').split('/', 1)
if len(parts) < 2: if len(parts) < 2:
namespace = library_namespace namespace = library_namespace
repository = parts[0] repository = parts[0]
if not allow_library:
raise ImplicitLibraryNamespaceNotAllowed()
else: else:
(namespace, repository) = parts (namespace, repository) = parts