Merge branch 'master' into no-signing-whitelist
This commit is contained in:
commit
45bf7efc84
434 changed files with 10877 additions and 11061 deletions
Binary file not shown.
|
@ -15,6 +15,7 @@ from data.model.user import LoginWrappedDBUser
|
|||
from endpoints.api import api_bp
|
||||
from endpoints.appr import appr_bp
|
||||
from endpoints.web import web
|
||||
from endpoints.verbs import verbs as verbs_bp
|
||||
|
||||
from initdb import initialize_database, populate_database
|
||||
|
||||
|
@ -125,6 +126,11 @@ def initialized_db(appconfig):
|
|||
# Configure the database.
|
||||
configure(appconfig)
|
||||
|
||||
# Initialize caches.
|
||||
model._basequery._lookup_team_roles()
|
||||
model._basequery.get_public_repo_visibility()
|
||||
model.log.get_log_entry_kinds()
|
||||
|
||||
# If under a test *real* database, setup a savepoint.
|
||||
under_test_real_database = bool(os.environ.get('TEST_DATABASE_URI'))
|
||||
if under_test_real_database:
|
||||
|
@ -166,6 +172,7 @@ def app(appconfig, initialized_db):
|
|||
app.register_blueprint(api_bp, url_prefix='/api')
|
||||
app.register_blueprint(appr_bp, url_prefix='/cnr')
|
||||
app.register_blueprint(web, url_prefix='/')
|
||||
app.register_blueprint(verbs_bp, url_prefix='/c1')
|
||||
|
||||
app.config.update(appconfig)
|
||||
return app
|
||||
|
|
|
@ -1015,6 +1015,8 @@ class RegistryTestsMixin(object):
|
|||
|
||||
self.assertEquals(1, len(logs))
|
||||
self.assertEquals('push_repo', logs[0]['kind'])
|
||||
self.assertEquals('public', logs[0]['metadata']['namespace'])
|
||||
self.assertEquals('newrepo', logs[0]['metadata']['repo'])
|
||||
self.assertEquals('public', logs[0]['performer']['name'])
|
||||
|
||||
# Pull the repository.
|
||||
|
@ -1044,6 +1046,8 @@ class RegistryTestsMixin(object):
|
|||
|
||||
self.assertEquals(1, len(logs))
|
||||
self.assertEquals('push_repo', logs[0]['kind'])
|
||||
self.assertEquals('buynlarge', logs[0]['metadata']['namespace'])
|
||||
self.assertEquals('newrepo', logs[0]['metadata']['repo'])
|
||||
self.assertEquals('buynlarge+ownerbot', logs[0]['performer']['name'])
|
||||
|
||||
# Pull the repository.
|
||||
|
@ -1055,6 +1059,8 @@ class RegistryTestsMixin(object):
|
|||
|
||||
self.assertEquals(2, len(logs))
|
||||
self.assertEquals('pull_repo', logs[0]['kind'])
|
||||
self.assertEquals('buynlarge', logs[0]['metadata']['namespace'])
|
||||
self.assertEquals('newrepo', logs[0]['metadata']['repo'])
|
||||
self.assertEquals('buynlarge+ownerbot', logs[0]['performer']['name'])
|
||||
|
||||
|
||||
|
@ -1074,6 +1080,8 @@ class RegistryTestsMixin(object):
|
|||
logs = result.json()['logs']
|
||||
|
||||
self.assertEquals('pull_repo', logs[0]['kind'])
|
||||
self.assertEquals('devtable', logs[0]['metadata']['namespace'])
|
||||
self.assertEquals('newrepo', logs[0]['metadata']['repo'])
|
||||
self.assertEquals('my-new-token', logs[0]['metadata']['token'])
|
||||
|
||||
|
||||
|
@ -1091,6 +1099,8 @@ class RegistryTestsMixin(object):
|
|||
|
||||
self.assertEquals(2, len(logs))
|
||||
self.assertEquals('pull_repo', logs[0]['kind'])
|
||||
self.assertEquals('devtable', logs[0]['metadata']['namespace'])
|
||||
self.assertEquals('newrepo', logs[0]['metadata']['repo'])
|
||||
|
||||
self.assertEquals('devtable', logs[0]['performer']['name'])
|
||||
self.assertEquals(1, logs[0]['metadata']['oauth_token_id'])
|
||||
|
@ -1792,35 +1802,60 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
|
|||
def test_one_five_blacklist(self):
|
||||
self.conduct('GET', '/v2/', expected_code=404, user_agent='Go 1.1 package http')
|
||||
|
||||
def test_catalog(self):
|
||||
def test_normal_catalog(self):
|
||||
# Look for public repositories and ensure all are public.
|
||||
response = self.conduct('GET', '/v2/_catalog')
|
||||
data = response.json()
|
||||
self.assertTrue(len(data['repositories']) > 0)
|
||||
with TestFeature(self, 'PUBLIC_CATALOG', False):
|
||||
response = self.conduct('GET', '/v2/_catalog')
|
||||
data = response.json()
|
||||
self.assertTrue(len(data['repositories']) == 0)
|
||||
|
||||
for reponame in data['repositories']:
|
||||
self.assertTrue(reponame.find('public/') == 0)
|
||||
# Perform auth and lookup the catalog again.
|
||||
self.do_auth('devtable', 'password', 'devtable', 'simple')
|
||||
all_repos = []
|
||||
|
||||
# Perform auth and lookup the catalog again.
|
||||
self.do_auth('devtable', 'password', 'devtable', 'simple')
|
||||
response = self.conduct('GET', '/v2/_catalog', params=dict(n=2), auth='jwt')
|
||||
data = response.json()
|
||||
self.assertEquals(len(data['repositories']), 2)
|
||||
|
||||
response = self.conduct('GET', '/v2/_catalog', params=dict(n=2), auth='jwt')
|
||||
data = response.json()
|
||||
self.assertEquals(len(data['repositories']), 2)
|
||||
def test_public_catalog(self):
|
||||
# Look for public repositories and ensure all are public.
|
||||
with TestFeature(self, 'PUBLIC_CATALOG', True):
|
||||
response = self.conduct('GET', '/v2/_catalog')
|
||||
data = response.json()
|
||||
self.assertTrue(len(data['repositories']) > 0)
|
||||
|
||||
# Ensure we have a next link.
|
||||
self.assertIsNotNone(response.headers.get('Link'))
|
||||
for reponame in data['repositories']:
|
||||
self.assertTrue(reponame.find('public/') == 0)
|
||||
|
||||
# Request with the next link.
|
||||
link_url = response.headers.get('Link')[1:].split(';')[0][:-1]
|
||||
v2_index = link_url.find('/v2/')
|
||||
relative_url = link_url[v2_index:]
|
||||
# Perform auth and lookup the catalog again.
|
||||
self.do_auth('devtable', 'password', 'devtable', 'simple')
|
||||
all_repos = []
|
||||
|
||||
next_response = self.conduct('GET', relative_url, auth='jwt')
|
||||
next_data = next_response.json()
|
||||
response = self.conduct('GET', '/v2/_catalog', params=dict(n=2), auth='jwt')
|
||||
data = response.json()
|
||||
self.assertEquals(len(data['repositories']), 2)
|
||||
all_repos.extend(data['repositories'])
|
||||
|
||||
self.assertEquals(len(next_data['repositories']), 2)
|
||||
self.assertNotEquals(next_data['repositories'], data['repositories'])
|
||||
# Ensure we have a next link.
|
||||
self.assertIsNotNone(response.headers.get('Link'))
|
||||
|
||||
# Request with the next link.
|
||||
while response.headers.get('Link'):
|
||||
link_url = response.headers.get('Link')[1:].split(';')[0][:-1]
|
||||
v2_index = link_url.find('/v2/')
|
||||
relative_url = link_url[v2_index:]
|
||||
|
||||
next_response = self.conduct('GET', relative_url, auth='jwt')
|
||||
next_data = next_response.json()
|
||||
all_repos.extend(next_data['repositories'])
|
||||
|
||||
self.assertTrue(len(next_data['repositories']) <= 2)
|
||||
self.assertNotEquals(next_data['repositories'], data['repositories'])
|
||||
response = next_response
|
||||
|
||||
# Ensure the authed request has the public repository.
|
||||
public = [reponame for reponame in all_repos if reponame.find('/publicrepo') >= 0]
|
||||
self.assertTrue(bool(public))
|
||||
|
||||
|
||||
class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
|
||||
|
|
|
@ -509,100 +509,3 @@ def build_v2_index_specs():
|
|||
request_status(401, 401, 401, 401, 404),
|
||||
]
|
||||
|
||||
|
||||
class VerbTestSpec(object):
|
||||
def __init__(self, index_name, method_name, repo_name, rpath=False, **kwargs):
|
||||
self.index_name = index_name
|
||||
self.repo_name = repo_name
|
||||
self.method_name = method_name
|
||||
self.single_repository_path = rpath
|
||||
|
||||
self.kwargs = kwargs
|
||||
|
||||
self.anon_code = 401
|
||||
self.no_access_code = 403
|
||||
self.read_code = 200
|
||||
self.admin_code = 200
|
||||
self.creator_code = 200
|
||||
|
||||
def request_status(self, anon_code=401, no_access_code=403, read_code=200, creator_code=200,
|
||||
admin_code=200):
|
||||
self.anon_code = anon_code
|
||||
self.no_access_code = no_access_code
|
||||
self.read_code = read_code
|
||||
self.creator_code = creator_code
|
||||
self.admin_code = admin_code
|
||||
return self
|
||||
|
||||
def get_url(self):
|
||||
if self.single_repository_path:
|
||||
return url_for(self.index_name, repository=self.repo_name, **self.kwargs)
|
||||
else:
|
||||
(namespace, repo_name) = self.repo_name.split('/')
|
||||
return url_for(self.index_name, namespace=namespace, repository=repo_name, **self.kwargs)
|
||||
|
||||
def gen_basic_auth(self, username, password):
|
||||
encoded = b64encode('%s:%s' % (username, password))
|
||||
return 'basic %s' % encoded
|
||||
|
||||
ACI_ARGS = {
|
||||
'server': 'someserver',
|
||||
'tag': 'fake',
|
||||
'os': 'linux',
|
||||
'arch': 'x64',
|
||||
}
|
||||
|
||||
def build_verbs_specs():
|
||||
return [
|
||||
# get_aci_signature
|
||||
VerbTestSpec('verbs.get_aci_signature', 'GET', PUBLIC_REPO, **ACI_ARGS).
|
||||
request_status(404, 404, 404, 404, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_signature', 'GET', PRIVATE_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_signature', 'GET', ORG_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_signature', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 403, 403, 404),
|
||||
|
||||
# get_aci_image
|
||||
VerbTestSpec('verbs.get_aci_image', 'GET', PUBLIC_REPO, **ACI_ARGS).
|
||||
request_status(404, 404, 404, 404, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_image', 'GET', PRIVATE_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_image', 'GET', ORG_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_aci_image', 'GET', ANOTHER_ORG_REPO, **ACI_ARGS).
|
||||
request_status(403, 403, 403, 403, 404),
|
||||
|
||||
# get_squashed_tag
|
||||
VerbTestSpec('verbs.get_squashed_tag', 'GET', PUBLIC_REPO, tag='fake').
|
||||
request_status(404, 404, 404, 404, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_squashed_tag', 'GET', PRIVATE_REPO, tag='fake').
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_squashed_tag', 'GET', ORG_REPO, tag='fake').
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_squashed_tag', 'GET', ANOTHER_ORG_REPO, tag='fake').
|
||||
request_status(403, 403, 403, 403, 404),
|
||||
|
||||
# get_tag_torrent
|
||||
VerbTestSpec('verbs.get_tag_torrent', 'GET', PUBLIC_REPO, digest='sha256:1234', rpath=True).
|
||||
request_status(404, 404, 404, 404, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_tag_torrent', 'GET', PRIVATE_REPO, digest='sha256:1234', rpath=True).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_tag_torrent', 'GET', ORG_REPO, digest='sha256:1234', rpath=True).
|
||||
request_status(403, 403, 404, 403, 404),
|
||||
|
||||
VerbTestSpec('verbs.get_tag_torrent', 'GET', ANOTHER_ORG_REPO, digest='sha256:1234', rpath=True).
|
||||
request_status(403, 403, 403, 403, 404),
|
||||
]
|
||||
|
|
|
@ -4318,7 +4318,7 @@ class TestSuperUserMessages(ApiTestCase):
|
|||
def setUp(self):
|
||||
ApiTestCase.setUp(self)
|
||||
self._set_url(GlobalUserMessages)
|
||||
self.message = {'message': {'content': '', 'severity': 'info', 'media_type': 'text/plain'}}
|
||||
self.message = {'message': {'content': 'msg', 'severity': 'info', 'media_type': 'text/plain'}}
|
||||
|
||||
def test_post_anonymous(self):
|
||||
self._run_test('POST', 401, None, None)
|
||||
|
|
|
@ -4834,6 +4834,20 @@ class TestRepositoryManifestLabels(ApiTestCase):
|
|||
|
||||
self.assertEquals(0, len(json['labels']))
|
||||
|
||||
self.postJsonResponse(RepositoryManifestLabels,
|
||||
params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
data=dict(key='bad_label', value='world',
|
||||
media_type='text/plain'),
|
||||
expected_code=400)
|
||||
|
||||
self.postJsonResponse(RepositoryManifestLabels,
|
||||
params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
data=dict(key='hello', value='world',
|
||||
media_type='bad_media_type'),
|
||||
expected_code=400)
|
||||
|
||||
# Add some labels to the manifest.
|
||||
with assert_action_logged('manifest_label_add'):
|
||||
label1 = self.postJsonResponse(RepositoryManifestLabels,
|
||||
|
|
|
@ -84,6 +84,9 @@ class TestCloudStorage(unittest.TestCase):
|
|||
self.engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type')
|
||||
self.assertEquals(new_data, self.engine.get_content(_TEST_PATH))
|
||||
|
||||
def test_chunked_upload_no_chunks(self):
|
||||
self._chunk_upload_test(0)
|
||||
|
||||
def test_chunked_upload_single_chunk(self):
|
||||
self._chunk_upload_test(1)
|
||||
|
||||
|
@ -114,7 +117,7 @@ class TestCloudStorage(unittest.TestCase):
|
|||
self.assertEquals(index + 1, len(metadata[_CHUNKS_KEY]))
|
||||
|
||||
# Complete the chunked upload.
|
||||
self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata,
|
||||
self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', metadata,
|
||||
force_client_side=force_client_side)
|
||||
|
||||
# Ensure the file contents are valid.
|
||||
|
|
552
test/test_gc.py
552
test/test_gc.py
|
@ -1,552 +0,0 @@
|
|||
import unittest
|
||||
import time
|
||||
import hashlib
|
||||
|
||||
from contextlib import contextmanager
|
||||
from playhouse.test_utils import assert_query_count
|
||||
|
||||
from app import app, storage
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from data import model, database
|
||||
from data.database import Image, ImageStorage, DerivedStorageForImage, Label, TagManifestLabel, Blob
|
||||
|
||||
|
||||
ADMIN_ACCESS_USER = 'devtable'
|
||||
PUBLIC_USER = 'public'
|
||||
|
||||
REPO = 'somerepo'
|
||||
|
||||
|
||||
class TestGarbageCollection(unittest.TestCase):
|
||||
@staticmethod
|
||||
def _set_tag_expiration_policy(namespace, expiration_s):
|
||||
namespace_user = model.user.get_user(namespace)
|
||||
model.user.change_user_tag_expiration(namespace_user, expiration_s)
|
||||
|
||||
def setUp(self):
|
||||
setup_database_for_testing(self)
|
||||
|
||||
self._set_tag_expiration_policy(ADMIN_ACCESS_USER, 0)
|
||||
self._set_tag_expiration_policy(PUBLIC_USER, 0)
|
||||
|
||||
self.app = app.test_client()
|
||||
self.ctx = app.test_request_context()
|
||||
self.ctx.__enter__()
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
self.ctx.__exit__(True, None, None)
|
||||
|
||||
@staticmethod
|
||||
def createImage(docker_image_id, repository_obj, username):
|
||||
preferred = storage.preferred_locations[0]
|
||||
image = model.image.find_create_or_link_image(docker_image_id, repository_obj, username, {},
|
||||
preferred)
|
||||
image.storage.uploading = False
|
||||
image.storage.save()
|
||||
|
||||
# Create derived images as well.
|
||||
model.image.find_or_create_derived_storage(image, 'squash', preferred)
|
||||
model.image.find_or_create_derived_storage(image, 'aci', preferred)
|
||||
|
||||
# Add some torrent info.
|
||||
try:
|
||||
database.TorrentInfo.get(storage=image.storage)
|
||||
except database.TorrentInfo.DoesNotExist:
|
||||
model.storage.save_torrent_info(image.storage, 1, 'helloworld')
|
||||
|
||||
# Add some additional placements to the image.
|
||||
for location_name in ['local_eu']:
|
||||
location = database.ImageStorageLocation.get(name=location_name)
|
||||
|
||||
try:
|
||||
database.ImageStoragePlacement.get(location=location, storage=image.storage)
|
||||
except:
|
||||
continue
|
||||
|
||||
database.ImageStoragePlacement.create(location=location, storage=image.storage)
|
||||
|
||||
return image.storage
|
||||
|
||||
def createRepository(self, namespace=ADMIN_ACCESS_USER, name=REPO, **kwargs):
|
||||
user = model.user.get_user(namespace)
|
||||
repo = model.repository.create_repository(namespace, name, user)
|
||||
|
||||
# Populate the repository with the tags.
|
||||
image_map = {}
|
||||
for tag_name in kwargs:
|
||||
image_ids = kwargs[tag_name]
|
||||
parent = None
|
||||
|
||||
for image_id in image_ids:
|
||||
if not image_id in image_map:
|
||||
image_map[image_id] = self.createImage(image_id, repo, namespace)
|
||||
|
||||
v1_metadata = {
|
||||
'id': image_id,
|
||||
}
|
||||
if parent is not None:
|
||||
v1_metadata['parent'] = parent.docker_image_id
|
||||
|
||||
# Set the ancestors for the image.
|
||||
parent = model.image.set_image_metadata(image_id, namespace, name, '', '', '', v1_metadata,
|
||||
parent=parent)
|
||||
|
||||
# Set the tag for the image.
|
||||
tag_manifest, _ = model.tag.store_tag_manifest(namespace, name, tag_name, image_ids[-1],
|
||||
'sha:someshahere', '{}')
|
||||
|
||||
# Add some labels to the tag.
|
||||
model.label.create_manifest_label(tag_manifest, 'foo', 'bar', 'manifest')
|
||||
model.label.create_manifest_label(tag_manifest, 'meh', 'grah', 'manifest')
|
||||
|
||||
return repo
|
||||
|
||||
def gcNow(self, repository):
|
||||
self.assertTrue(model.repository.garbage_collect_repo(repository))
|
||||
|
||||
def deleteTag(self, repository, tag, perform_gc=True):
|
||||
model.tag.delete_tag(repository.namespace_user.username, repository.name, tag)
|
||||
if perform_gc:
|
||||
self.assertTrue(model.repository.garbage_collect_repo(repository))
|
||||
|
||||
def moveTag(self, repository, tag, docker_image_id):
|
||||
model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag,
|
||||
docker_image_id)
|
||||
self.assertTrue(model.repository.garbage_collect_repo(repository))
|
||||
|
||||
def assertNotDeleted(self, repository, *args):
|
||||
for docker_image_id in args:
|
||||
self.assertTrue(bool(model.image.get_image_by_id(repository.namespace_user.username,
|
||||
repository.name, docker_image_id)))
|
||||
|
||||
def assertDeleted(self, repository, *args):
|
||||
for docker_image_id in args:
|
||||
try:
|
||||
# Verify the image is missing when accessed by the repository.
|
||||
model.image.get_image_by_id(repository.namespace_user.username, repository.name,
|
||||
docker_image_id)
|
||||
except model.DataModelException:
|
||||
return
|
||||
|
||||
self.fail('Expected image %s to be deleted' % docker_image_id)
|
||||
|
||||
@staticmethod
|
||||
def _get_dangling_storage_count():
|
||||
storage_ids = set([current.id for current in ImageStorage.select()])
|
||||
referenced_by_image = set([image.storage_id for image in Image.select()])
|
||||
referenced_by_derived = set([derived.derivative_id
|
||||
for derived in DerivedStorageForImage.select()])
|
||||
|
||||
return len(storage_ids - referenced_by_image - referenced_by_derived)
|
||||
|
||||
@staticmethod
|
||||
def _get_dangling_label_count():
|
||||
label_ids = set([current.id for current in Label.select()])
|
||||
referenced_by_manifest = set([mlabel.label_id for mlabel in TagManifestLabel.select()])
|
||||
return len(label_ids - referenced_by_manifest)
|
||||
|
||||
@contextmanager
|
||||
def assert_gc_integrity(self, expect_storage_removed=True):
|
||||
""" Specialized assertion for ensuring that GC cleans up all dangling storages
|
||||
and labels, invokes the callback for images removed and doesn't invoke the
|
||||
callback for images *not* removed.
|
||||
"""
|
||||
# TODO: Consider also asserting the number of DB queries being performed.
|
||||
|
||||
# Add a callback for when images are removed.
|
||||
removed_image_storages = []
|
||||
model.config.register_image_cleanup_callback(removed_image_storages.extend)
|
||||
|
||||
# Store the number of dangling storages and labels.
|
||||
existing_storage_count = self._get_dangling_storage_count()
|
||||
existing_label_count = self._get_dangling_label_count()
|
||||
yield
|
||||
|
||||
# Ensure the number of dangling storages and labels has not changed.
|
||||
updated_storage_count = self._get_dangling_storage_count()
|
||||
self.assertEqual(updated_storage_count, existing_storage_count)
|
||||
|
||||
updated_label_count = self._get_dangling_label_count()
|
||||
self.assertEqual(updated_label_count, existing_label_count)
|
||||
|
||||
# Ensure that for each call to the image+storage cleanup callback, the image and its
|
||||
# storage is not found *anywhere* in the database.
|
||||
for removed_image_and_storage in removed_image_storages:
|
||||
with self.assertRaises(Image.DoesNotExist):
|
||||
Image.get(id=removed_image_and_storage.id)
|
||||
|
||||
with self.assertRaises(ImageStorage.DoesNotExist):
|
||||
ImageStorage.get(id=removed_image_and_storage.storage_id)
|
||||
|
||||
with self.assertRaises(ImageStorage.DoesNotExist):
|
||||
ImageStorage.get(uuid=removed_image_and_storage.storage.uuid)
|
||||
|
||||
self.assertEquals(expect_storage_removed, bool(removed_image_storages))
|
||||
|
||||
# Ensure all CAS storage is in the storage engine.
|
||||
preferred = storage.preferred_locations[0]
|
||||
for storage_row in ImageStorage.select():
|
||||
if storage_row.cas_path:
|
||||
storage.get_content({preferred}, storage.blob_path(storage_row.content_checksum))
|
||||
|
||||
for blob_row in Blob.select():
|
||||
storage.get_content({preferred}, storage.blob_path(blob_row.digest))
|
||||
|
||||
def test_has_garbage(self):
|
||||
""" Remove all existing repositories, then add one without garbage, check, then add one with
|
||||
garbage, and check again.
|
||||
"""
|
||||
# Delete all existing repos.
|
||||
for repo in database.Repository.select().order_by(database.Repository.id):
|
||||
self.assertTrue(model.repository.purge_repository(repo.namespace_user.username, repo.name))
|
||||
|
||||
# Change the time machine expiration on the namespace.
|
||||
(database.User
|
||||
.update(removed_tag_expiration_s=1000000000)
|
||||
.where(database.User.username == ADMIN_ACCESS_USER)
|
||||
.execute())
|
||||
|
||||
# Create a repository without any garbage.
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'])
|
||||
|
||||
# Ensure that no repositories are returned by the has garbage check.
|
||||
self.assertIsNone(model.repository.find_repository_with_garbage(1000000000))
|
||||
|
||||
# Delete a tag.
|
||||
self.deleteTag(repository, 'latest', perform_gc=False)
|
||||
|
||||
# There should still not be any repositories with garbage, due to time machine.
|
||||
self.assertIsNone(model.repository.find_repository_with_garbage(1000000000))
|
||||
|
||||
# Change the time machine expiration on the namespace.
|
||||
(database.User
|
||||
.update(removed_tag_expiration_s=0)
|
||||
.where(database.User.username == ADMIN_ACCESS_USER)
|
||||
.execute())
|
||||
|
||||
# Now we should find the repository for GC.
|
||||
repository = model.repository.find_repository_with_garbage(0)
|
||||
self.assertIsNotNone(repository)
|
||||
self.assertEquals(REPO, repository.name)
|
||||
|
||||
# GC the repository.
|
||||
self.assertTrue(model.repository.garbage_collect_repo(repository))
|
||||
|
||||
# There should now be no repositories with garbage.
|
||||
self.assertIsNone(model.repository.find_repository_with_garbage(0))
|
||||
|
||||
def test_find_garbage_policy_functions(self):
|
||||
with assert_query_count(1):
|
||||
one_policy = model.repository.get_random_gc_policy()
|
||||
all_policies = model.repository._get_gc_expiration_policies()
|
||||
self.assertIn(one_policy, all_policies)
|
||||
|
||||
def test_one_tag(self):
|
||||
""" Create a repository with a single tag, then remove that tag and verify that the repository
|
||||
is now empty. """
|
||||
with self.assert_gc_integrity():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'])
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i1', 'i2', 'i3')
|
||||
|
||||
def test_two_tags_unshared_images(self):
|
||||
""" Repository has two tags with no shared images between them. """
|
||||
with self.assert_gc_integrity():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['f1', 'f2'])
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i1', 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'f1', 'f2')
|
||||
|
||||
def test_two_tags_shared_images(self):
|
||||
""" Repository has two tags with shared images. Deleting the tag should only remove the
|
||||
unshared images.
|
||||
"""
|
||||
with self.assert_gc_integrity():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
def test_unrelated_repositories(self):
|
||||
""" Two repositories with different images. Removing the tag from one leaves the other's
|
||||
images intact.
|
||||
"""
|
||||
with self.assert_gc_integrity():
|
||||
repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
|
||||
repository2 = self.createRepository(latest=['j1', 'j2', 'j3'], name='repo2')
|
||||
|
||||
self.deleteTag(repository1, 'latest')
|
||||
|
||||
self.assertDeleted(repository1, 'i1', 'i2', 'i3')
|
||||
self.assertNotDeleted(repository2, 'j1', 'j2', 'j3')
|
||||
|
||||
def test_related_repositories(self):
|
||||
""" Two repositories with shared images. Removing the tag from one leaves the other's
|
||||
images intact.
|
||||
"""
|
||||
with self.assert_gc_integrity():
|
||||
repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
|
||||
repository2 = self.createRepository(latest=['i1', 'i2', 'j1'], name='repo2')
|
||||
|
||||
self.deleteTag(repository1, 'latest')
|
||||
|
||||
self.assertDeleted(repository1, 'i3')
|
||||
self.assertNotDeleted(repository2, 'i1', 'i2', 'j1')
|
||||
|
||||
def test_inaccessible_repositories(self):
|
||||
""" Two repositories under different namespaces should result in the images being deleted
|
||||
but not completely removed from the database.
|
||||
"""
|
||||
with self.assert_gc_integrity():
|
||||
repository1 = self.createRepository(namespace=ADMIN_ACCESS_USER, latest=['i1', 'i2', 'i3'])
|
||||
repository2 = self.createRepository(namespace=PUBLIC_USER, latest=['i1', 'i2', 'i3'])
|
||||
|
||||
self.deleteTag(repository1, 'latest')
|
||||
self.assertDeleted(repository1, 'i1', 'i2', 'i3')
|
||||
self.assertNotDeleted(repository2, 'i1', 'i2', 'i3')
|
||||
|
||||
def test_multiple_shared_images(self):
|
||||
""" Repository has multiple tags with shared images. Selectively deleting the tags, and
|
||||
verifying at each step.
|
||||
"""
|
||||
with self.assert_gc_integrity():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
|
||||
third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
|
||||
|
||||
# Delete tag other. Should delete f2, since it is not shared.
|
||||
self.deleteTag(repository, 'other')
|
||||
self.assertDeleted(repository, 'f2')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1')
|
||||
|
||||
# Move tag fourth to i3. This should remove f1 since it is no longer referenced.
|
||||
self.moveTag(repository, 'fourth', 'i3')
|
||||
self.assertDeleted(repository, 'f1')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
|
||||
|
||||
# Delete tag 'latest'. This should do nothing since fourth is on the same branch.
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
|
||||
|
||||
# Delete tag 'third'. This should remove t1->t3.
|
||||
self.deleteTag(repository, 'third')
|
||||
self.assertDeleted(repository, 't1', 't2', 't3')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
|
||||
|
||||
# Add tag to i1.
|
||||
self.moveTag(repository, 'newtag', 'i1')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
|
||||
|
||||
# Delete tag 'fourth'. This should remove i2 and i3.
|
||||
self.deleteTag(repository, 'fourth')
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1')
|
||||
|
||||
# Delete tag 'newtag'. This should remove the remaining image.
|
||||
self.deleteTag(repository, 'newtag')
|
||||
self.assertDeleted(repository, 'i1')
|
||||
|
||||
def test_empty_gc(self):
|
||||
with self.assert_gc_integrity(expect_storage_removed=False):
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
|
||||
third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
|
||||
|
||||
self.gcNow(repository)
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1', 'f2')
|
||||
|
||||
def test_time_machine_no_gc(self):
|
||||
""" Repository has two tags with shared images. Deleting the tag should not remove any images
|
||||
"""
|
||||
with self.assert_gc_integrity(expect_storage_removed=False):
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
self._set_tag_expiration_policy(repository.namespace_user.username, 60*60*24)
|
||||
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertNotDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
def test_time_machine_gc(self):
|
||||
""" Repository has two tags with shared images. Deleting the second tag should cause the images
|
||||
for the first deleted tag to gc.
|
||||
"""
|
||||
with self.assert_gc_integrity():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
|
||||
self._set_tag_expiration_policy(repository.namespace_user.username, 1)
|
||||
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertNotDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
self.deleteTag(repository, 'other') # This will cause the images associated with latest to gc
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
def test_images_shared_storage(self):
|
||||
""" Repository with two tags, both with the same shared storage. Deleting the first
|
||||
tag should delete the first image, but *not* its storage.
|
||||
"""
|
||||
with self.assert_gc_integrity(expect_storage_removed=False):
|
||||
repository = self.createRepository()
|
||||
|
||||
# Add two tags, each with their own image, but with the same storage.
|
||||
image_storage = model.storage.create_v1_storage(storage.preferred_locations[0])
|
||||
|
||||
first_image = Image.create(docker_image_id='i1',
|
||||
repository=repository, storage=image_storage,
|
||||
ancestors='/')
|
||||
|
||||
second_image = Image.create(docker_image_id='i2',
|
||||
repository=repository, storage=image_storage,
|
||||
ancestors='/')
|
||||
|
||||
model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
|
||||
'first', first_image.docker_image_id,
|
||||
'sha:someshahere', '{}')
|
||||
|
||||
model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
|
||||
'second', second_image.docker_image_id,
|
||||
'sha:someshahere', '{}')
|
||||
|
||||
# Delete the first tag.
|
||||
self.deleteTag(repository, 'first')
|
||||
self.assertDeleted(repository, 'i1')
|
||||
self.assertNotDeleted(repository, 'i2')
|
||||
|
||||
def test_image_with_cas(self):
|
||||
""" A repository with a tag pointing to an image backed by CAS. Deleting and GCing the tag
|
||||
should result in the storage and its CAS data being removed.
|
||||
"""
|
||||
with self.assert_gc_integrity(expect_storage_removed=True):
|
||||
repository = self.createRepository()
|
||||
|
||||
# Create an image storage record under CAS.
|
||||
content = 'hello world'
|
||||
digest = 'sha256:' + hashlib.sha256(content).hexdigest()
|
||||
preferred = storage.preferred_locations[0]
|
||||
storage.put_content({preferred}, storage.blob_path(digest), content)
|
||||
|
||||
image_storage = database.ImageStorage.create(content_checksum=digest, uploading=False)
|
||||
location = database.ImageStorageLocation.get(name=preferred)
|
||||
database.ImageStoragePlacement.create(location=location, storage=image_storage)
|
||||
|
||||
# Ensure the CAS path exists.
|
||||
self.assertTrue(storage.exists({preferred}, storage.blob_path(digest)))
|
||||
|
||||
# Create the image and the tag.
|
||||
first_image = Image.create(docker_image_id='i1',
|
||||
repository=repository, storage=image_storage,
|
||||
ancestors='/')
|
||||
|
||||
model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
|
||||
'first', first_image.docker_image_id,
|
||||
'sha:someshahere1', '{}')
|
||||
|
||||
self.assertNotDeleted(repository, 'i1')
|
||||
|
||||
# Delete the tag.
|
||||
self.deleteTag(repository, 'first')
|
||||
self.assertDeleted(repository, 'i1')
|
||||
|
||||
# Ensure the CAS path is gone.
|
||||
self.assertFalse(storage.exists({preferred}, storage.blob_path(digest)))
|
||||
|
||||
def test_images_shared_cas(self):
|
||||
""" A repository, each two tags, pointing to the same image, which has image storage
|
||||
with the same *CAS path*, but *distinct records*. Deleting the first tag should delete the
|
||||
first image, and its storage, but not the file in storage, as it shares its CAS path.
|
||||
"""
|
||||
with self.assert_gc_integrity(expect_storage_removed=True):
|
||||
repository = self.createRepository()
|
||||
|
||||
# Create two image storage records with the same content checksum.
|
||||
content = 'hello world'
|
||||
digest = 'sha256:' + hashlib.sha256(content).hexdigest()
|
||||
preferred = storage.preferred_locations[0]
|
||||
storage.put_content({preferred}, storage.blob_path(digest), content)
|
||||
|
||||
is1 = database.ImageStorage.create(content_checksum=digest, uploading=False)
|
||||
is2 = database.ImageStorage.create(content_checksum=digest, uploading=False)
|
||||
|
||||
location = database.ImageStorageLocation.get(name=preferred)
|
||||
|
||||
database.ImageStoragePlacement.create(location=location, storage=is1)
|
||||
database.ImageStoragePlacement.create(location=location, storage=is2)
|
||||
|
||||
# Ensure the CAS path exists.
|
||||
self.assertTrue(storage.exists({preferred}, storage.blob_path(digest)))
|
||||
|
||||
# Create two images in the repository, and two tags, each pointing to one of the storages.
|
||||
first_image = Image.create(docker_image_id='i1',
|
||||
repository=repository, storage=is1,
|
||||
ancestors='/')
|
||||
|
||||
second_image = Image.create(docker_image_id='i2',
|
||||
repository=repository, storage=is2,
|
||||
ancestors='/')
|
||||
|
||||
model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
|
||||
'first', first_image.docker_image_id,
|
||||
'sha:someshahere1', '{}')
|
||||
|
||||
model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
|
||||
'second', second_image.docker_image_id,
|
||||
'sha:someshahere2', '{}')
|
||||
|
||||
self.assertNotDeleted(repository, 'i1', 'i2')
|
||||
|
||||
# Delete the first tag.
|
||||
self.deleteTag(repository, 'first')
|
||||
self.assertDeleted(repository, 'i1')
|
||||
self.assertNotDeleted(repository, 'i2')
|
||||
|
||||
# Ensure the CAS path still exists.
|
||||
self.assertTrue(storage.exists({preferred}, storage.blob_path(digest)))
|
||||
|
||||
def test_images_shared_cas_with_new_blob_table(self):
|
||||
""" A repository with a tag and image that shares its CAS path with a record in the new Blob
|
||||
table. Deleting the first tag should delete the first image, and its storage, but not the
|
||||
file in storage, as it shares its CAS path with the blob row.
|
||||
"""
|
||||
with self.assert_gc_integrity(expect_storage_removed=True):
|
||||
repository = self.createRepository()
|
||||
|
||||
# Create two image storage records with the same content checksum.
|
||||
content = 'hello world'
|
||||
digest = 'sha256:' + hashlib.sha256(content).hexdigest()
|
||||
preferred = storage.preferred_locations[0]
|
||||
storage.put_content({preferred}, storage.blob_path(digest), content)
|
||||
|
||||
media_type = database.MediaType.get(name='text/plain')
|
||||
|
||||
is1 = database.ImageStorage.create(content_checksum=digest, uploading=False)
|
||||
is2 = database.Blob.create(digest=digest, size=0, media_type=media_type)
|
||||
|
||||
location = database.ImageStorageLocation.get(name=preferred)
|
||||
database.ImageStoragePlacement.create(location=location, storage=is1)
|
||||
|
||||
# Ensure the CAS path exists.
|
||||
self.assertTrue(storage.exists({preferred}, storage.blob_path(digest)))
|
||||
|
||||
# Create the image in the repository, and the tag.
|
||||
first_image = Image.create(docker_image_id='i1',
|
||||
repository=repository, storage=is1,
|
||||
ancestors='/')
|
||||
|
||||
model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
|
||||
'first', first_image.docker_image_id,
|
||||
'sha:someshahere1', '{}')
|
||||
|
||||
self.assertNotDeleted(repository, 'i1')
|
||||
|
||||
# Delete the tag.
|
||||
self.deleteTag(repository, 'first')
|
||||
self.assertDeleted(repository, 'i1')
|
||||
|
||||
# Ensure the CAS path still exists, as it is referenced by the Blob table.
|
||||
self.assertTrue(storage.exists({preferred}, storage.blob_path(digest)))
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -8,7 +8,7 @@ from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION
|
|||
from endpoints.notificationevent import VulnerabilityFoundEvent
|
||||
from endpoints.v2 import v2_bp
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from util.secscan.api import SecurityScannerAPI
|
||||
from util.secscan.api import SecurityScannerAPI, APIRequestFailure
|
||||
from util.secscan.analyzer import LayerAnalyzer
|
||||
from util.secscan.fake import fake_security_scanner
|
||||
from util.secscan.notifier import SecurityNotificationHandler, ProcessNotificationPageResult
|
||||
|
@ -160,7 +160,8 @@ class TestSecurityScanner(unittest.TestCase):
|
|||
security_scanner.set_internal_error_layer_id(security_scanner.layer_id(layer))
|
||||
|
||||
analyzer = LayerAnalyzer(app.config, self.api)
|
||||
analyzer.analyze_recursively(layer)
|
||||
with self.assertRaises(APIRequestFailure):
|
||||
analyzer.analyze_recursively(layer)
|
||||
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
self.assertAnalyzed(layer, security_scanner, False, -1)
|
||||
|
@ -184,6 +185,27 @@ class TestSecurityScanner(unittest.TestCase):
|
|||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
self.assertAnalyzed(layer, security_scanner, False, 1)
|
||||
|
||||
def test_analyze_layer_unexpected_status(self):
|
||||
""" Tests that a response from a scanner with an unexpected status code fails correctly. """
|
||||
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||
self.assertFalse(layer.security_indexed)
|
||||
self.assertEquals(-1, layer.security_indexed_engine)
|
||||
|
||||
with fake_security_scanner() as security_scanner:
|
||||
# Make is so trying to analyze the parent will fail with an error.
|
||||
security_scanner.set_unexpected_status_layer_id(security_scanner.layer_id(layer.parent))
|
||||
|
||||
# Try to the layer and its parents, but with one request causing an error.
|
||||
analyzer = LayerAnalyzer(app.config, self.api)
|
||||
with self.assertRaises(APIRequestFailure):
|
||||
analyzer.analyze_recursively(layer)
|
||||
|
||||
# Make sure it isn't analyzed.
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
self.assertAnalyzed(layer, security_scanner, False, -1)
|
||||
|
||||
|
||||
def test_analyze_layer_missing_parent_handled(self):
|
||||
""" Tests that a missing parent causes an automatic reanalysis, which succeeds. """
|
||||
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
import unittest
|
||||
|
||||
import endpoints.decorated # Register the various exceptions via decorators.
|
||||
|
||||
from app import app
|
||||
from endpoints.verbs import verbs
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from test.specs import build_verbs_specs
|
||||
|
||||
app.register_blueprint(verbs, url_prefix='/c1')
|
||||
|
||||
NO_ACCESS_USER = 'freshuser'
|
||||
READ_ACCESS_USER = 'reader'
|
||||
ADMIN_ACCESS_USER = 'devtable'
|
||||
CREATOR_ACCESS_USER = 'creator'
|
||||
|
||||
|
||||
class EndpointTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
setup_database_for_testing(self)
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
|
||||
|
||||
class _SpecTestBuilder(type):
|
||||
@staticmethod
|
||||
def _test_generator(url, test_spec, attrs):
|
||||
def test(self):
|
||||
with app.test_client() as c:
|
||||
headers = {}
|
||||
|
||||
if attrs['auth_username']:
|
||||
headers['Authorization'] = test_spec.gen_basic_auth(attrs['auth_username'], 'password')
|
||||
|
||||
expected_status = getattr(test_spec, attrs['result_attr'])
|
||||
|
||||
rv = c.open(url, headers=headers, method=test_spec.method_name)
|
||||
msg = '%s %s: got %s, expected: %s (auth: %s | headers %s)' % (test_spec.method_name,
|
||||
test_spec.index_name, rv.status_code, expected_status, attrs['auth_username'],
|
||||
headers)
|
||||
|
||||
self.assertEqual(rv.status_code, expected_status, msg)
|
||||
|
||||
return test
|
||||
|
||||
|
||||
def __new__(cls, name, bases, attrs):
|
||||
with app.test_request_context() as ctx:
|
||||
specs = attrs['spec_func']()
|
||||
for test_spec in specs:
|
||||
test_name = '%s_%s_%s_%s_%s' % (test_spec.index_name, test_spec.method_name,
|
||||
test_spec.repo_name, attrs['auth_username'] or 'anon',
|
||||
attrs['result_attr'])
|
||||
test_name = test_name.replace('/', '_').replace('-', '_')
|
||||
|
||||
test_name = 'test_' + test_name.lower().replace('verbs.', 'verbs_')
|
||||
url = test_spec.get_url()
|
||||
attrs[test_name] = _SpecTestBuilder._test_generator(url, test_spec, attrs)
|
||||
|
||||
return type(name, bases, attrs)
|
||||
|
||||
|
||||
class TestAnonymousAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'anon_code'
|
||||
auth_username = None
|
||||
|
||||
|
||||
class TestNoAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'no_access_code'
|
||||
auth_username = NO_ACCESS_USER
|
||||
|
||||
|
||||
class TestReadAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'read_code'
|
||||
auth_username = READ_ACCESS_USER
|
||||
|
||||
|
||||
class TestCreatorAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'creator_code'
|
||||
auth_username = CREATOR_ACCESS_USER
|
||||
|
||||
|
||||
class TestAdminAccess(EndpointTestCase):
|
||||
__metaclass__ = _SpecTestBuilder
|
||||
spec_func = build_verbs_specs
|
||||
result_attr = 'admin_code'
|
||||
auth_username = ADMIN_ACCESS_USER
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Reference in a new issue