Allows admins to completely wall off a namespace by disabling it Fixes https://jira.coreos.com/browse/QUAY-869
		
			
				
	
	
		
			207 lines
		
	
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			207 lines
		
	
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import copy
 | |
| import logging.config
 | |
| import json
 | |
| import os
 | |
| import shutil
 | |
| 
 | |
| from tempfile import NamedTemporaryFile
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from Crypto import Random
 | |
| from flask import jsonify, g
 | |
| from flask_principal import Identity
 | |
| 
 | |
| from app import storage
 | |
| from data.database import close_db_filter, configure, DerivedStorageForImage, QueueItem, Image
 | |
| from data import model
 | |
| from endpoints.csrf import generate_csrf_token
 | |
| from util.log import logfile_path
 | |
| 
 | |
| from test.registry.liveserverfixture import LiveServerExecutor
 | |
| 
 | |
| 
 | |
| @pytest.fixture()
 | |
| def registry_server_executor(app):
 | |
|   def generate_csrf():
 | |
|     return generate_csrf_token()
 | |
| 
 | |
|   def set_supports_direct_download(enabled):
 | |
|     storage.put_content(['local_us'], 'supports_direct_download', 'true' if enabled else 'false')
 | |
|     return 'OK'
 | |
| 
 | |
|   def delete_image(image_id):
 | |
|     image = Image.get(docker_image_id=image_id)
 | |
|     image.docker_image_id = 'DELETED'
 | |
|     image.save()
 | |
|     return 'OK'
 | |
| 
 | |
|   def get_storage_replication_entry(image_id):
 | |
|     image = Image.get(docker_image_id=image_id)
 | |
|     QueueItem.select().where(QueueItem.queue_name ** ('%' + image.storage.uuid + '%')).get()
 | |
|     return 'OK'
 | |
| 
 | |
|   def set_feature(feature_name, value):
 | |
|     import features
 | |
|     old_value = features._FEATURES[feature_name].value
 | |
|     features._FEATURES[feature_name].value = value
 | |
|     return jsonify({'old_value': old_value})
 | |
| 
 | |
|   def clear_derived_cache():
 | |
|     DerivedStorageForImage.delete().execute()
 | |
|     return 'OK'
 | |
| 
 | |
|   def clear_uncompressed_size(image_id):
 | |
|     image = model.image.get_image_by_id('devtable', 'newrepo', image_id)
 | |
|     image.storage.uncompressed_size = None
 | |
|     image.storage.save()
 | |
|     return 'OK'
 | |
| 
 | |
|   def add_token():
 | |
|     another_token = model.token.create_delegate_token('devtable', 'newrepo', 'my-new-token',
 | |
|                                                       'write')
 | |
|     another_token.code = 'somecooltokencode'
 | |
|     another_token.save()
 | |
|     return another_token.code
 | |
| 
 | |
|   def break_database():
 | |
|     # Close any existing connection.
 | |
|     close_db_filter(None)
 | |
| 
 | |
|     # Reload the database config with an invalid connection.
 | |
|     config = copy.copy(app.config)
 | |
|     config['DB_URI'] = 'sqlite:///not/a/valid/database'
 | |
|     configure(config)
 | |
| 
 | |
|     return 'OK'
 | |
| 
 | |
|   def reload_app(server_hostname):
 | |
|     # Close any existing connection.
 | |
|     close_db_filter(None)
 | |
| 
 | |
|     # Reload the database config.
 | |
|     app.config['SERVER_HOSTNAME'] = server_hostname[len('http://'):]
 | |
|     configure(app.config)
 | |
| 
 | |
|     # Reload random after the process split, as it cannot be used uninitialized across forks.
 | |
|     Random.atfork()
 | |
| 
 | |
|     # Required for anonymous calls to not exception.
 | |
|     g.identity = Identity(None, 'none')
 | |
| 
 | |
|     if os.environ.get('DEBUGLOG') == 'true':
 | |
|       logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False)
 | |
| 
 | |
|     return 'OK'
 | |
| 
 | |
|   def create_app_repository(namespace, name):
 | |
|     user = model.user.get_user(namespace)
 | |
|     model.repository.create_repository(namespace, name, user, repo_kind='application')
 | |
|     return 'OK'
 | |
| 
 | |
|   def disable_namespace(namespace):
 | |
|     namespace_obj = model.user.get_namespace_user(namespace)
 | |
|     namespace_obj.enabled = False
 | |
|     namespace_obj.save()
 | |
|     return 'OK'
 | |
| 
 | |
|   executor = LiveServerExecutor()
 | |
|   executor.register('generate_csrf', generate_csrf)
 | |
|   executor.register('set_supports_direct_download', set_supports_direct_download)
 | |
|   executor.register('delete_image', delete_image)
 | |
|   executor.register('get_storage_replication_entry', get_storage_replication_entry)
 | |
|   executor.register('set_feature', set_feature)
 | |
|   executor.register('clear_derived_cache', clear_derived_cache)
 | |
|   executor.register('clear_uncompressed_size', clear_uncompressed_size)
 | |
|   executor.register('add_token', add_token)
 | |
|   executor.register('break_database', break_database)
 | |
|   executor.register('reload_app', reload_app)
 | |
|   executor.register('create_app_repository', create_app_repository)
 | |
|   executor.register('disable_namespace', disable_namespace)
 | |
|   return executor
 | |
| 
 | |
| 
 | |
| @pytest.fixture()
 | |
| def liveserver_app(app, registry_server_executor, init_db_path):
 | |
|   registry_server_executor.apply_blueprint_to_app(app)
 | |
| 
 | |
|   if os.environ.get('DEBUG', 'false').lower() == 'true':
 | |
|     app.config['DEBUG'] = True
 | |
| 
 | |
|   # Copy the clean database to a new path. We cannot share the DB created by the
 | |
|   # normal app fixture, as it is already open in the local process.
 | |
|   local_db_file = NamedTemporaryFile(delete=True)
 | |
|   local_db_file.close()
 | |
| 
 | |
|   shutil.copy2(init_db_path, local_db_file.name)
 | |
|   app.config['DB_URI'] = 'sqlite:///{0}'.format(local_db_file.name)
 | |
|   return app
 | |
| 
 | |
| 
 | |
| @pytest.fixture()
 | |
| def app_reloader(liveserver, registry_server_executor):
 | |
|   registry_server_executor.on(liveserver).reload_app(liveserver.url)
 | |
|   yield
 | |
| 
 | |
| 
 | |
| class FeatureFlagValue(object):
 | |
|   """ Helper object which temporarily sets the value of a feature flag.
 | |
| 
 | |
|       Usage:
 | |
| 
 | |
|       with FeatureFlagValue('ANONYMOUS_ACCESS', False, registry_server_executor.on(liveserver)):
 | |
|         ... Features.ANONYMOUS_ACCESS is False in this context ...
 | |
|   """
 | |
| 
 | |
|   def __init__(self, feature_flag, test_value, executor):
 | |
|     self.feature_flag = feature_flag
 | |
|     self.test_value = test_value
 | |
|     self.executor = executor
 | |
| 
 | |
|     self.old_value = None
 | |
| 
 | |
|   def __enter__(self):
 | |
|     result = self.executor.set_feature(self.feature_flag, self.test_value)
 | |
|     self.old_value = result.json()['old_value']
 | |
| 
 | |
|   def __exit__(self, type, value, traceback):
 | |
|     self.executor.set_feature(self.feature_flag, self.old_value)
 | |
| 
 | |
| 
 | |
| class ApiCaller(object):
 | |
|   def __init__(self, liveserver_session, registry_server_executor):
 | |
|     self.liveserver_session = liveserver_session
 | |
|     self.csrf_token = registry_server_executor.on_session(liveserver_session).generate_csrf()
 | |
| 
 | |
|   def conduct_auth(self, username, password):
 | |
|     r = self.post('/api/v1/signin',
 | |
|                   data=json.dumps(dict(username=username, password=password)),
 | |
|                   headers={'Content-Type': 'application/json'})
 | |
|     assert r.status_code == 200
 | |
| 
 | |
|   def _adjust_params(self, kwargs):
 | |
|     if 'params' not in kwargs:
 | |
|       kwargs['params'] = {}
 | |
| 
 | |
|     kwargs['params'].update({
 | |
|       '_csrf_token': self.csrf_token,
 | |
|     })
 | |
|     return kwargs
 | |
| 
 | |
|   def get(self, url, **kwargs):
 | |
|     kwargs = self._adjust_params(kwargs)
 | |
|     return self.liveserver_session.get(url, **kwargs)
 | |
| 
 | |
|   def post(self, url, **kwargs):
 | |
|     kwargs = self._adjust_params(kwargs)
 | |
|     return self.liveserver_session.post(url, **kwargs)
 | |
| 
 | |
|   def change_repo_visibility(self, namespace, repository, visibility):
 | |
|     self.post('/api/v1/repository/%s/%s/changevisibility' % (namespace, repository),
 | |
|               data=json.dumps(dict(visibility=visibility)),
 | |
|               headers={'Content-Type': 'application/json'})
 | |
| 
 | |
| 
 | |
| @pytest.fixture(scope="function")
 | |
| def api_caller(liveserver, registry_server_executor):
 | |
|   return ApiCaller(liveserver.new_session(), registry_server_executor)
 |