code-stye Yapf: 5 files updated

data/interfaces/appr.py endpoints/appr/cnr_backend.py endpoints/appr/registry.py endpoints/appr/test/test_api.py endpoints/appr/test/test_registry.py
This commit is contained in:
Antoine Legrand 2017-04-13 16:00:50 +02:00
parent 578f87f94c
commit 599ce0de54
6 changed files with 112 additions and 130 deletions

View file

@ -7,7 +7,6 @@ import cnr.semver
from cnr.exception import raise_package_not_found, raise_channel_not_found from cnr.exception import raise_package_not_found, raise_channel_not_found
from six import add_metaclass from six import add_metaclass
from app import storage, authentication from app import storage, authentication
from data import model, oci_model from data import model, oci_model
from data.database import Tag, Manifest, MediaType, Blob, Repository, Channel from data.database import Tag, Manifest, MediaType, Blob, Repository, Channel
@ -30,31 +29,23 @@ class ChannelView(namedtuple('ChannelView', ['name', 'current'])):
""" """
class ApplicationSummaryView(namedtuple('ApplicationSummaryView', ['name', class ApplicationSummaryView(
'namespace', namedtuple('ApplicationSummaryView', [
'visibility', 'name', 'namespace', 'visibility', 'default', 'manifests', 'channels', 'releases',
'default', 'updated_at', 'created_at'
'manifests', ])):
'channels',
'releases',
'updated_at',
'created_at'])):
""" ApplicationSummaryView is an aggregated view of an application repository. """ ApplicationSummaryView is an aggregated view of an application repository.
""" """
class ApplicationManifest(namedtuple('ApplicationManifest', ['mediaType', class ApplicationManifest(namedtuple('ApplicationManifest', ['mediaType', 'digest', 'content'])):
'digest',
'content'])):
""" ApplicationManifest embed the BlobDescriptor and some metadata around it. """ ApplicationManifest embed the BlobDescriptor and some metadata around it.
An ApplicationManifest is content-addressable. An ApplicationManifest is content-addressable.
""" """
class ApplicationRelease(namedtuple('ApplicationRelease', ['release', class ApplicationRelease(
'name', namedtuple('ApplicationRelease', ['release', 'name', 'created_at', 'manifest'])):
'created_at',
'manifest'])):
""" The ApplicationRelease associates an ApplicationManifest to a repository and release. """ The ApplicationRelease associates an ApplicationManifest to a repository and release.
""" """
@ -221,12 +212,15 @@ class OCIAppModel(AppRegistryDataInterface):
releases = [t.name for t in repo.tag_set_prefetch] releases = [t.name for t in repo.tag_set_prefetch]
if not releases: if not releases:
continue continue
available_releases = [str(x) for x in sorted(cnr.semver.versions(releases, False), available_releases = [
reverse=True)] str(x) for x in sorted(cnr.semver.versions(releases, False), reverse=True)
]
channels = None channels = None
if with_channels: if with_channels:
channels = [ChannelView(name=chan.name, current=chan.linked_tag.name) channels = [
for chan in oci_model.channel.get_repo_channels(repo)] ChannelView(name=chan.name, current=chan.linked_tag.name)
for chan in oci_model.channel.get_repo_channels(repo)
]
app_name = _join_package_name(repo.namespace_user.username, repo.name) app_name = _join_package_name(repo.namespace_user.username, repo.name)
manifests = self.list_manifests(app_name, available_releases[0]) manifests = self.list_manifests(app_name, available_releases[0])
@ -239,8 +233,7 @@ class OCIAppModel(AppRegistryDataInterface):
manifests=manifests, manifests=manifests,
releases=available_releases, releases=available_releases,
updated_at=_timestamp_to_iso(repo.tag_set_prefetch[-1].lifetime_start), updated_at=_timestamp_to_iso(repo.tag_set_prefetch[-1].lifetime_start),
created_at=_timestamp_to_iso(repo.tag_set_prefetch[0].lifetime_start), created_at=_timestamp_to_iso(repo.tag_set_prefetch[0].lifetime_start),)
)
views.append(view) views.append(view)
return views return views
@ -270,9 +263,10 @@ class OCIAppModel(AppRegistryDataInterface):
Todo: Todo:
* Filter results with readeable reposistory for the user (including visibilitys) * Filter results with readeable reposistory for the user (including visibilitys)
""" """
return [_join_package_name(r.namespace_user.username, r.name) return [
for r in model.repository.get_app_search(lookup=query, username=username, limit=50)] _join_package_name(r.namespace_user.username, r.name)
for r in model.repository.get_app_search(lookup=query, username=username, limit=50)
]
def list_releases(self, package_name, media_type=None): def list_releases(self, package_name, media_type=None):
""" Return the list of all releases of an Application """ Return the list of all releases of an Application
@ -307,21 +301,15 @@ class OCIAppModel(AppRegistryDataInterface):
created_at = _timestamp_to_iso(tag.lifetime_start) created_at = _timestamp_to_iso(tag.lifetime_start)
blob_descriptor = BlobDescriptor(digest=_strip_sha256_header(blob.digest), blob_descriptor = BlobDescriptor(digest=_strip_sha256_header(blob.digest),
mediaType=blob.media_type.name, mediaType=blob.media_type.name, size=blob.size, urls=[])
size=blob.size, urls=[])
app_manifest = ApplicationManifest(digest=manifest.digest, mediaType=manifest.media_type.name, app_manifest = ApplicationManifest(
content=blob_descriptor) digest=manifest.digest, mediaType=manifest.media_type.name, content=blob_descriptor)
app_release = ApplicationRelease(release=tag.name, app_release = ApplicationRelease(release=tag.name, created_at=created_at, name=package_name,
created_at=created_at,
name=package_name,
manifest=app_manifest) manifest=app_manifest)
return app_release return app_release
except (Tag.DoesNotExist, except (Tag.DoesNotExist, Manifest.DoesNotExist, Blob.DoesNotExist, Repository.DoesNotExist,
Manifest.DoesNotExist,
Blob.DoesNotExist,
Repository.DoesNotExist,
MediaType.DoesNotExist): MediaType.DoesNotExist):
raise_package_not_found(package_name, release, media_type) raise_package_not_found(package_name, release, media_type)
@ -330,14 +318,10 @@ class OCIAppModel(AppRegistryDataInterface):
path = cnrblob.upload_url(cnrblob.digest) path = cnrblob.upload_url(cnrblob.digest)
locations = storage.preferred_locations locations = storage.preferred_locations
storage.stream_write(locations, path, fp, 'application/x-gzip') storage.stream_write(locations, path, fp, 'application/x-gzip')
db_blob = oci_model.blob.get_or_create_blob(cnrblob.digest, db_blob = oci_model.blob.get_or_create_blob(cnrblob.digest, cnrblob.size, content_media_type,
cnrblob.size,
content_media_type,
locations) locations)
return BlobDescriptor(mediaType=content_media_type, return BlobDescriptor(mediaType=content_media_type,
digest=_strip_sha256_header(db_blob.digest), digest=_strip_sha256_header(db_blob.digest), size=db_blob.size, urls=[])
size=db_blob.size,
urls=[])
def create_release(self, package, user, visibility, force=False): def create_release(self, package, user, visibility, force=False):
""" Add an app-release to a repository """ Add an app-release to a repository
@ -349,8 +333,8 @@ class OCIAppModel(AppRegistryDataInterface):
repo = model.repository.get_or_create_repository(ns, name, user, visibility=visibility, repo = model.repository.get_or_create_repository(ns, name, user, visibility=visibility,
repo_kind='application') repo_kind='application')
tag_name = package.release tag_name = package.release
oci_model.release.create_app_release(repo, tag_name, package.manifest(), oci_model.release.create_app_release(repo, tag_name,
data['content']['digest'], force) package.manifest(), data['content']['digest'], force)
def delete_release(self, package_name, release, media_type): def delete_release(self, package_name, release, media_type):
""" Remove/Delete an app-release from an app-repository. """ Remove/Delete an app-release from an app-repository.
@ -386,8 +370,7 @@ class OCIAppModel(AppRegistryDataInterface):
""" Returns all AppChannel for a package """ """ Returns all AppChannel for a package """
repo = self._application(package_name) repo = self._application(package_name)
channels = oci_model.channel.get_repo_channels(repo) channels = oci_model.channel.get_repo_channels(repo)
return [ChannelView(name=chan.name, return [ChannelView(name=chan.name, current=chan.linked_tag.name) for chan in channels]
current=chan.linked_tag.name) for chan in channels]
def fetch_channel(self, package_name, channel_name, with_releases=True): def fetch_channel(self, package_name, channel_name, with_releases=True):
""" Returns an AppChannel """ """ Returns an AppChannel """
@ -400,12 +383,11 @@ class OCIAppModel(AppRegistryDataInterface):
if with_releases: if with_releases:
releases = oci_model.channel.get_channel_releases(repo, channel) releases = oci_model.channel.get_channel_releases(repo, channel)
chanview = ChannelReleasesView(current=channel.linked_tag.name, chanview = ChannelReleasesView(
name=channel.name, current=channel.linked_tag.name, name=channel.name,
releases=[channel.linked_tag.name]+[c.name for c in releases]) releases=[channel.linked_tag.name] + [c.name for c in releases])
else: else:
chanview = ChannelView(current=channel.linked_tag.name, chanview = ChannelView(current=channel.linked_tag.name, name=channel.name)
name=channel.name)
return chanview return chanview
@ -424,8 +406,7 @@ class OCIAppModel(AppRegistryDataInterface):
""" """
repo = self._application(package_name) repo = self._application(package_name)
channel = oci_model.channel.create_or_update_channel(repo, channel_name, release) channel = oci_model.channel.create_or_update_channel(repo, channel_name, release)
return ChannelView(current=channel.linked_tag.name, return ChannelView(current=channel.linked_tag.name, name=channel.name)
name=channel.name)
def get_user(self, username, password): def get_user(self, username, password):
err_msg = None err_msg = None
@ -438,6 +419,7 @@ class OCIAppModel(AppRegistryDataInterface):
user, err_msg = authentication.verify_and_link_user(username, password) user, err_msg = authentication.verify_and_link_user(username, password)
return (user, err_msg) return (user, err_msg)
def _strip_sha256_header(digest): def _strip_sha256_header(digest):
if digest.startswith('sha256:'): if digest.startswith('sha256:'):
return digest.split('sha256:')[1] return digest.split('sha256:')[1]

View file

@ -42,6 +42,7 @@ class Blob(BlobBase):
class Channel(ChannelBase): class Channel(ChannelBase):
""" CNR Channel model implemented against the Quay data model. """ """ CNR Channel model implemented against the Quay data model. """
def __init__(self, name, package, current=None): def __init__(self, name, package, current=None):
super(Channel, self).__init__(name, package, current=current) super(Channel, self).__init__(name, package, current=current)
self._channel_data = None self._channel_data = None
@ -63,8 +64,9 @@ class Channel(ChannelBase):
@classmethod @classmethod
def all(cls, package_name): def all(cls, package_name):
return [Channel(c.name, package_name, c.current) return [
for c in oci_app_model.list_channels(package_name)] Channel(c.name, package_name, c.current) for c in oci_app_model.list_channels(package_name)
]
@property @property
def _channel(self): def _channel(self):
@ -97,12 +99,14 @@ class Package(PackageBase):
@classmethod @classmethod
def _apptuple_to_dict(cls, apptuple): def _apptuple_to_dict(cls, apptuple):
return {'release': apptuple.release, return {
'release': apptuple.release,
'created_at': apptuple.created_at, 'created_at': apptuple.created_at,
'digest': apptuple.manifest.digest, 'digest': apptuple.manifest.digest,
'mediaType': apptuple.manifest.mediaType, 'mediaType': apptuple.manifest.mediaType,
'package': apptuple.name, 'package': apptuple.name,
'content': apptuple.manifest.content._asdict()} 'content': apptuple.manifest.content._asdict()
}
@classmethod @classmethod
def create_repository(cls, package_name, visibility, owner): def create_repository(cls, package_name, visibility, owner):
@ -114,10 +118,11 @@ class Package(PackageBase):
@classmethod @classmethod
def all(cls, organization=None, media_type=None, search=None, username=None, **kwargs): def all(cls, organization=None, media_type=None, search=None, username=None, **kwargs):
return [dict(x._asdict()) for x in oci_app_model.list_applications(namespace=organization, return [
media_type=media_type, dict(x._asdict())
search=search, for x in oci_app_model.list_applications(namespace=organization, media_type=media_type,
username=username)] search=search, username=username)
]
@classmethod @classmethod
def _fetch(cls, package_name, release, media_type): def _fetch(cls, package_name, release, media_type):
@ -146,8 +151,10 @@ class Package(PackageBase):
return oci_app_model.release_exists(package, release) return oci_app_model.release_exists(package, release)
def channels(self, channel_class, iscurrent=True): def channels(self, channel_class, iscurrent=True):
return [c.name for c in oci_app_model.list_release_channels(self.package, self.release, return [
active=iscurrent)] c.name
for c in oci_app_model.list_release_channels(self.package, self.release, active=iscurrent)
]
@classmethod @classmethod
def manifests(cls, package, release=None): def manifests(cls, package, release=None):

View file

@ -19,7 +19,6 @@ from endpoints.appr.decorators import disallow_for_image_repository
from endpoints.decorators import anon_allowed, anon_protect from endpoints.decorators import anon_allowed, anon_protect
from util.names import REPOSITORY_NAME_REGEX, TAG_REGEX from util.names import REPOSITORY_NAME_REGEX, TAG_REGEX
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -67,8 +66,7 @@ def login():
@appr_bp.route( @appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/blobs/sha256/<string:digest>", "/api/v1/packages/<string:namespace>/<string:package_name>/blobs/sha256/<string:digest>",
methods=['GET'], methods=['GET'],
strict_slashes=False, strict_slashes=False,)
)
@process_auth @process_auth
@require_app_repo_read @require_app_repo_read
@anon_protect @anon_protect
@ -90,11 +88,8 @@ def list_packages():
username = None username = None
if user: if user:
username = user.username username = user.username
result_data = cnr_registry.list_packages(namespace, result_data = cnr_registry.list_packages(namespace, package_class=Package, search=query,
package_class=Package, media_type=media_type, username=username)
search=query,
media_type=media_type,
username=username)
return jsonify(result_data) return jsonify(result_data)
@ -106,31 +101,23 @@ def list_packages():
@anon_protect @anon_protect
def delete_package(namespace, package_name, release, media_type): def delete_package(namespace, package_name, release, media_type):
reponame = repo_name(namespace, package_name) reponame = repo_name(namespace, package_name)
result = cnr_registry.delete_package(reponame, result = cnr_registry.delete_package(reponame, release, media_type, package_class=Package)
release,
media_type,
package_class=Package)
return jsonify(result) return jsonify(result)
@appr_bp.route( @appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/<string:release>/<string:media_type>", "/api/v1/packages/<string:namespace>/<string:package_name>/<string:release>/<string:media_type>",
methods=['GET'], methods=['GET'], strict_slashes=False)
strict_slashes=False
)
@process_auth @process_auth
@require_app_repo_read @require_app_repo_read
@anon_protect @anon_protect
def show_package(namespace, package_name, release, media_type): def show_package(namespace, package_name, release, media_type):
reponame = repo_name(namespace, package_name) reponame = repo_name(namespace, package_name)
result = cnr_registry.show_package(reponame, release, result = cnr_registry.show_package(reponame, release, media_type, channel_class=Channel,
media_type,
channel_class=Channel,
package_class=Package) package_class=Package)
return jsonify(result) return jsonify(result)
@appr_bp.route("/api/v1/packages/<string:namespace>/<string:package_name>", methods=['GET'], @appr_bp.route("/api/v1/packages/<string:namespace>/<string:package_name>", methods=['GET'],
strict_slashes=False) strict_slashes=False)
@process_auth @process_auth
@ -139,8 +126,7 @@ def show_package(namespace, package_name, release, media_type):
def show_package_releases(namespace, package_name): def show_package_releases(namespace, package_name):
reponame = repo_name(namespace, package_name) reponame = repo_name(namespace, package_name)
media_type = request.args.get('media_type', None) media_type = request.args.get('media_type', None)
result = cnr_registry.show_package_releases(reponame, result = cnr_registry.show_package_releases(reponame, media_type=media_type,
media_type=media_type,
package_class=Package) package_class=Package)
return jsonify(result) return jsonify(result)
@ -152,17 +138,14 @@ def show_package_releases(namespace, package_name):
@anon_protect @anon_protect
def show_package_releasse_manifests(namespace, package_name, release): def show_package_releasse_manifests(namespace, package_name, release):
reponame = repo_name(namespace, package_name) reponame = repo_name(namespace, package_name)
result = cnr_registry.show_package_manifests(reponame, result = cnr_registry.show_package_manifests(reponame, release, package_class=Package)
release,
package_class=Package)
return jsonify(result) return jsonify(result)
@appr_bp.route( @appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/<string:release>/<string:media_type>/pull", "/api/v1/packages/<string:namespace>/<string:package_name>/<string:release>/<string:media_type>/pull",
methods=['GET'], methods=['GET'],
strict_slashes=False, strict_slashes=False,)
)
@process_auth @process_auth
@require_app_repo_read @require_app_repo_read
@anon_protect @anon_protect
@ -192,12 +175,14 @@ def push(namespace, package_name):
if not Package.exists(reponame): if not Package.exists(reponame):
if not CreateRepositoryPermission(namespace).can(): if not CreateRepositoryPermission(namespace).can():
raise Forbidden("Unauthorized access for: %s" % reponame, raise Forbidden("Unauthorized access for: %s" % reponame,
{"package": reponame, "scopes": ['create']}) {"package": reponame,
"scopes": ['create']})
Package.create_repository(reponame, private, owner) Package.create_repository(reponame, private, owner)
if not ModifyRepositoryPermission(namespace, package_name).can(): if not ModifyRepositoryPermission(namespace, package_name).can():
raise Forbidden("Unauthorized access for: %s" % reponame, raise Forbidden("Unauthorized access for: %s" % reponame,
{"package": reponame, "scopes": ['push']}) {"package": reponame,
"scopes": ['push']})
if not 'release' in values: if not 'release' in values:
raise InvalidUsage('Missing release') raise InvalidUsage('Missing release')
@ -237,7 +222,9 @@ def list_channels(namespace, package_name):
return jsonify(cnr_registry.list_channels(reponame, channel_class=Channel)) return jsonify(cnr_registry.list_channels(reponame, channel_class=Channel))
@appr_bp.route("/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>", methods=['GET'], strict_slashes=False) @appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>",
methods=['GET'], strict_slashes=False)
@process_auth @process_auth
@require_app_repo_read @require_app_repo_read
@anon_protect @anon_protect
@ -250,8 +237,7 @@ def show_channel(namespace, package_name, channel_name):
@appr_bp.route( @appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>/<string:release>", "/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>/<string:release>",
methods=['POST'], methods=['POST'],
strict_slashes=False, strict_slashes=False,)
)
@process_auth @process_auth
@require_app_repo_write @require_app_repo_write
@anon_protect @anon_protect
@ -262,6 +248,7 @@ def add_channel_release(namespace, package_name, channel_name, release):
package_class=Package) package_class=Package)
return jsonify(result) return jsonify(result)
def _check_channel_name(channel_name, release=None): def _check_channel_name(channel_name, release=None):
if not TAG_REGEX.match(channel_name): if not TAG_REGEX.match(channel_name):
logger.debug('Found invalid channel name CNR add channel release: %s', channel_name) logger.debug('Found invalid channel name CNR add channel release: %s', channel_name)
@ -275,11 +262,11 @@ def _check_channel_name(channel_name, release=None):
{'name': channel_name, {'name': channel_name,
"release": release}) "release": release})
@appr_bp.route( @appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>/<string:release>", "/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>/<string:release>",
methods=['DELETE'], methods=['DELETE'],
strict_slashes=False, strict_slashes=False,)
)
@process_auth @process_auth
@require_app_repo_write @require_app_repo_write
@anon_protect @anon_protect
@ -294,8 +281,7 @@ def delete_channel_release(namespace, package_name, channel_name, release):
@appr_bp.route( @appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>", "/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>",
methods=['DELETE'], methods=['DELETE'],
strict_slashes=False, strict_slashes=False,)
)
@process_auth @process_auth
@require_app_repo_write @require_app_repo_write
@anon_protect @anon_protect

View file

@ -3,21 +3,21 @@ import shutil
import uuid import uuid
import pytest import pytest
from cnr.models.db_base import CnrDB from cnr.models.db_base import CnrDB
from cnr.tests.test_apiserver import BaseTestServer
from cnr.tests.conftest import * from cnr.tests.conftest import *
from cnr.tests.test_apiserver import BaseTestServer
from cnr.tests.test_models import CnrTestModels from cnr.tests.test_models import CnrTestModels
from peewee import SqliteDatabase from peewee import SqliteDatabase
from app import app as application
from data.database import close_db_filter, User, db as database
from data.model import user, organization
from data.interfaces.appr import oci_app_model
import data.oci_model.blob as oci_blob import data.oci_model.blob as oci_blob
from app import app as application
from data.database import db as database
from data.database import User, close_db_filter
from data.interfaces.appr import oci_app_model
from data.model import organization, user
from endpoints.appr import appr_bp, registry from endpoints.appr import appr_bp, registry
from endpoints.appr.cnr_backend import QuayDB, Channel, Package from endpoints.appr.cnr_backend import Channel, Package, QuayDB
from initdb import wipe_database, initialize_database, populate_database from initdb import initialize_database, populate_database, wipe_database
application.register_blueprint(appr_bp, url_prefix='/cnr') application.register_blueprint(appr_bp, url_prefix='/cnr')
@ -65,9 +65,10 @@ class PackageTest(Package):
blob = blob_cls.get(package_name, package.manifest.content.digest) blob = blob_cls.get(package_name, package.manifest.content.digest)
data = cls._apptuple_to_dict(package) data = cls._apptuple_to_dict(package)
data.pop('digest') data.pop('digest')
data['channels'] = [x.name for x in oci_app_model.list_release_channels(package_name, data['channels'] = [
package.release, x.name
False)] for x in oci_app_model.list_release_channels(package_name, package.release, False)
]
data['blob'] = blob.b64blob data['blob'] = blob.b64blob
result.append(data) result.append(data)
return result return result
@ -100,9 +101,7 @@ def sqlitedb_file(tmpdir):
def init_db_path(tmpdir_factory): def init_db_path(tmpdir_factory):
sqlitedb_file_loc = str(tmpdir_factory.mktemp("data").join("test.db")) sqlitedb_file_loc = str(tmpdir_factory.mktemp("data").join("test.db"))
sqlitedb = 'sqlite:///{0}'.format(sqlitedb_file_loc) sqlitedb = 'sqlite:///{0}'.format(sqlitedb_file_loc)
conf = {"TESTING": True, conf = {"TESTING": True, "DEBUG": True, "DB_URI": sqlitedb}
"DEBUG": True,
"DB_URI": sqlitedb}
os.environ['TEST_DATABASE_URI'] = str(sqlitedb) os.environ['TEST_DATABASE_URI'] = str(sqlitedb)
os.environ['DB_URI'] = str(sqlitedb) os.environ['DB_URI'] = str(sqlitedb)
database.initialize(SqliteDatabase(sqlitedb_file_loc)) database.initialize(SqliteDatabase(sqlitedb_file_loc))
@ -128,9 +127,7 @@ def database_uri(monkeypatch, init_db_path, sqlitedb_file):
@pytest.fixture() @pytest.fixture()
def appconfig(database_uri): def appconfig(database_uri):
conf = {"TESTING": True, conf = {"TESTING": True, "DEBUG": True, "DB_URI": database_uri}
"DEBUG": True,
"DB_URI": database_uri}
return conf return conf
@ -206,8 +203,7 @@ class TestQuayModels(CnrTestModels):
assert p.package == "titi/rocketchat" assert p.package == "titi/rocketchat"
assert p.release == "2.0.1" assert p.release == "2.0.1"
assert p.digest == "d3b54b7912fe770a61b59ab612a442eac52a8a5d8d05dbe92bf8f212d68aaa80" assert p.digest == "d3b54b7912fe770a61b59ab612a442eac52a8a5d8d05dbe92bf8f212d68aaa80"
blob = db_with_data1.Blob.get("titi/rocketchat", blob = db_with_data1.Blob.get("titi/rocketchat", p.digest)
p.digest)
bdb = oci_blob.get_blob(p.digest) bdb = oci_blob.get_blob(p.digest)
newblob = db_with_data1.Blob("titi/app2", blob.b64blob) newblob = db_with_data1.Blob("titi/app2", blob.b64blob)
p2 = db_with_data1.Package("titi/app2", "1.0.0", "helm", newblob) p2 = db_with_data1.Package("titi/app2", "1.0.0", "helm", newblob)
@ -220,8 +216,8 @@ class TestQuayModels(CnrTestModels):
assert p.package == "titi/rocketchat" assert p.package == "titi/rocketchat"
assert p.release == "2.0.1" assert p.release == "2.0.1"
assert p.digest == "d3b54b7912fe770a61b59ab612a442eac52a8a5d8d05dbe92bf8f212d68aaa80" assert p.digest == "d3b54b7912fe770a61b59ab612a442eac52a8a5d8d05dbe92bf8f212d68aaa80"
blob = db_with_data1.Blob.get("titi/rocketchat", blob = db_with_data1.Blob.get(
"72ed15c9a65961ecd034cca098ec18eb99002cd402824aae8a674a8ae41bd0ef") "titi/rocketchat", "72ed15c9a65961ecd034cca098ec18eb99002cd402824aae8a674a8ae41bd0ef")
p2 = db_with_data1.Package("titi/rocketchat", "2.0.1", "kpm", blob) p2 = db_with_data1.Package("titi/rocketchat", "2.0.1", "kpm", blob)
p2.save(force=True) p2.save(force=True)
pnew = db_with_data1.Package.get("titi/rocketchat", "2.0.1", 'kpm') pnew = db_with_data1.Package.get("titi/rocketchat", "2.0.1", 'kpm')

View file

@ -9,11 +9,23 @@ from test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_f
@pytest.mark.parametrize('login_data, expected_code', [ @pytest.mark.parametrize('login_data, expected_code', [
({"username": "devtable", "password": "password"}, 200), ({
({"username": "devtable", "password": "badpass"}, 401), "username": "devtable",
({"username": "devtable+dtrobot", "password": "badpass"}, 401), "password": "password"
({"username": "devtable+dtrobot2", "password": None}, 200), }, 200),
]) ({
"username": "devtable",
"password": "badpass"
}, 401),
({
"username": "devtable+dtrobot",
"password": "badpass"
}, 401),
({
"username": "devtable+dtrobot2",
"password": None
}, 200),
])
def test_login(login_data, expected_code, app, client): def test_login(login_data, expected_code, app, client):
if "+" in login_data['username'] and login_data['password'] is None: if "+" in login_data['username'] and login_data['password'] is None:
username, robotname = login_data['username'].split("+") username, robotname = login_data['username'].split("+")
@ -25,4 +37,3 @@ def test_login(login_data, expected_code, app, client):
rv = client.open(url, method='POST', data=json.dumps(data), headers=headers) rv = client.open(url, method='POST', data=json.dumps(data), headers=headers)
assert rv.status_code == expected_code assert rv.status_code == expected_code

Binary file not shown.