23c19bcbc1
This change implements support for registry integration tests using the new py.test-based live server test fixture. We can now parametrize the protocols we use (in preparation for V2_2), and it makes the code *much* cleaner and less hacky. Note that moving the vast majority of the tests over from the existing implementation will come in a follow-up PR.
154 lines
4.7 KiB
Python
154 lines
4.7 KiB
Python
import copy
|
|
import logging.config
|
|
import os
|
|
import shutil
|
|
|
|
from tempfile import NamedTemporaryFile
|
|
|
|
import pytest
|
|
|
|
from Crypto import Random
|
|
from flask import jsonify, g
|
|
from flask_principal import Identity
|
|
|
|
from app import storage
|
|
from data.database import close_db_filter, configure, DerivedStorageForImage, QueueItem, Image
|
|
from data import model
|
|
from endpoints.csrf import generate_csrf_token
|
|
from util.log import logfile_path
|
|
|
|
from test.registry.liveserverfixture import LiveServerExecutor
|
|
|
|
|
|
@pytest.fixture()
def registry_server_executor(app):
    """ Builds a LiveServerExecutor with the helper operations registry tests need.

        Each nested function below is registered on the executor under its own name.
        Most of them mutate state (storage, database rows, feature flags, app config)
        and return the string 'OK'; `generate_csrf` returns a CSRF token and
        `set_feature` returns a JSON response holding the previous flag value.
    """
    def generate_csrf():
        # Thin wrapper so the token is generated at call time.
        return generate_csrf_token()

    def set_supports_direct_download(enabled):
        # Writes the flag as content under the 'local_us' storage location.
        storage.put_content(['local_us'], 'supports_direct_download', enabled)
        return 'OK'

    def delete_image(image_id):
        # The row is not removed; its docker image ID is overwritten with a marker value.
        doomed_image = Image.get(docker_image_id=image_id)
        doomed_image.docker_image_id = 'DELETED'
        doomed_image.save()
        return 'OK'

    def get_storage_replication_entry(image_id):
        # The query result is discarded: only the lookup itself matters here.
        # NOTE(review): presumably `.get()` raises when no matching queue item exists,
        # which is what makes this usable as an existence check -- confirm.
        found_image = Image.get(docker_image_id=image_id)
        uuid_pattern = '%' + found_image.storage.uuid + '%'
        QueueItem.select().where(QueueItem.queue_name ** uuid_pattern).get()
        return 'OK'

    def set_feature(feature_name, value):
        # Imported locally, as in the original implementation.
        import features

        flag = features._FEATURES[feature_name]
        previous = flag.value
        flag.value = value
        return jsonify({'old_value': previous})

    def clear_derived_cache():
        # Removes every DerivedStorageForImage row.
        DerivedStorageForImage.delete().execute()
        return 'OK'

    def clear_uncompressed_size(image_id):
        # Operates on the hard-coded 'devtable/newrepo' test repository.
        repo_image = model.image.get_image_by_id('devtable', 'newrepo', image_id)
        repo_image.storage.uncompressed_size = None
        repo_image.storage.save()
        return 'OK'

    def add_token():
        # Creates a write delegate token and then pins its code to a known value.
        new_token = model.token.create_delegate_token('devtable', 'newrepo', 'my-new-token',
                                                      'write')
        new_token.code = 'somecooltokencode'
        new_token.save()
        return 'OK'

    def break_database():
        # Close any existing connection.
        close_db_filter(None)

        # Reconfigure against a copy of the config pointing at an invalid database,
        # leaving app.config itself untouched.
        broken_config = copy.copy(app.config)
        broken_config['DB_URI'] = 'sqlite:///not/a/valid/database'
        configure(broken_config)

        return 'OK'

    def reload_app(server_hostname):
        # Close any existing connection.
        close_db_filter(None)

        # Reload the database config, dropping the leading 'http://' from the hostname.
        scheme_length = len('http://')
        app.config['SERVER_HOSTNAME'] = server_hostname[scheme_length:]
        configure(app.config)

        # Reload random after the process split, as it cannot be used uninitialized
        # across forks.
        Random.atfork()

        # Required so that anonymous calls do not raise an exception.
        g.identity = Identity(None, 'none')

        debug_logging = os.environ.get('DEBUGLOG') == 'true'
        if debug_logging:
            logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False)

        return 'OK'

    # Registration order matches the order in which the operations are defined above.
    operations = (
        ('generate_csrf', generate_csrf),
        ('set_supports_direct_download', set_supports_direct_download),
        ('delete_image', delete_image),
        ('get_storage_replication_entry', get_storage_replication_entry),
        ('set_feature', set_feature),
        ('clear_derived_cache', clear_derived_cache),
        ('clear_uncompressed_size', clear_uncompressed_size),
        ('add_token', add_token),
        ('break_database', break_database),
        ('reload_app', reload_app),
    )

    executor = LiveServerExecutor()
    for operation_name, operation in operations:
        executor.register(operation_name, operation)

    return executor
|
|
|
|
|
|
@pytest.fixture()
def liveserver_app(app, registry_server_executor, init_db_path):
    """ Prepares the app for use by the live server and returns it.

        Applies the executor's blueprint to the app, turns on app.config['DEBUG'] when
        the DEBUG environment variable is 'true' (case-insensitive), and points
        app.config['DB_URI'] at a fresh copy of the database found at `init_db_path`.
    """
    registry_server_executor.apply_blueprint_to_app(app)

    if os.environ.get('DEBUG', 'false').lower() == 'true':
        app.config['DEBUG'] = True

    # Copy the clean database to a new path. We cannot share the DB created by the
    # normal app fixture, as it is already open in the local process.
    #
    # The temporary file is kept on disk (delete=False) so its path stays reserved
    # until the copy overwrites it. Creating it with delete=True and closing it
    # removes the file first, leaving a window in which the name can be reused.
    with NamedTemporaryFile(delete=False) as local_db_file:
        local_db_path = local_db_file.name

    shutil.copy2(init_db_path, local_db_path)
    app.config['DB_URI'] = 'sqlite:///{0}'.format(local_db_path)
    return app
|
|
|
|
|
|
@pytest.fixture()
def app_reloader(liveserver, registry_server_executor):
    """ Invokes `reload_app` on the live server with its own URL before the test runs.

        Nothing is yielded to the test and no teardown is performed.
    """
    bound_executor = registry_server_executor.on(liveserver)
    bound_executor.reload_app(liveserver.url)
    yield
|
|
|
|
|
|
class FeatureFlagValue(object):
    """ Helper object which temporarily sets the value of a feature flag.

        On entry the flag is set to the test value through the executor's `set_feature`
        call, and the previous value reported in the response is remembered. On exit the
        flag is set back to that previous value, whether or not the body raised.

        Usage:

        with FeatureFlagValue('ANONYMOUS_ACCESS', False, registry_server_executor.on(liveserver)):
            ... Features.ANONYMOUS_ACCESS is False in this context ...
    """

    def __init__(self, feature_flag, test_value, executor):
        self.feature_flag = feature_flag
        self.test_value = test_value
        self.executor = executor

        # Filled in by __enter__ with the value the flag had before it was changed.
        self.old_value = None

    def __enter__(self):
        result = self.executor.set_feature(self.feature_flag, self.test_value)
        self.old_value = result.json['old_value']
        # Return the helper so `with FeatureFlagValue(...) as flag:` binds a usable object.
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Parameter names avoid shadowing the `type` builtin. Nothing is returned, so
        # any exception raised in the body still propagates after the flag is restored.
        self.executor.set_feature(self.feature_flag, self.old_value)
|