Merge pull request #2538 from coreos-inc/enable-robot-cnr

Enable robot cnr
This commit is contained in:
Antoine Legrand 2017-04-24 17:32:46 +02:00 committed by GitHub
commit 8499612c4c
5 changed files with 170 additions and 177 deletions

View file

@ -7,11 +7,10 @@ import cnr.semver
from cnr.exception import raise_package_not_found, raise_channel_not_found
from six import add_metaclass
from app import storage
from app import storage, authentication
from data import model, oci_model
from data.database import Tag, Manifest, MediaType, Blob, Repository, Channel
from util.names import parse_robot_username
class BlobDescriptor(namedtuple('Blob', ['mediaType', 'size', 'digest', 'urls'])):
""" BlobDescriptor describes a blob with its mediatype, size and digest.
@ -30,31 +29,23 @@ class ChannelView(namedtuple('ChannelView', ['name', 'current'])):
"""
class ApplicationSummaryView(namedtuple('ApplicationSummaryView', ['name',
'namespace',
'visibility',
'default',
'manifests',
'channels',
'releases',
'updated_at',
'created_at'])):
class ApplicationSummaryView(
namedtuple('ApplicationSummaryView', [
'name', 'namespace', 'visibility', 'default', 'manifests', 'channels', 'releases',
'updated_at', 'created_at'
])):
""" ApplicationSummaryView is an aggregated view of an application repository.
"""
class ApplicationManifest(namedtuple('ApplicationManifest', ['mediaType',
'digest',
'content'])):
class ApplicationManifest(namedtuple('ApplicationManifest', ['mediaType', 'digest', 'content'])):
""" ApplicationManifest embed the BlobDescriptor and some metadata around it.
An ApplicationManifest is content-addressable.
"""
class ApplicationRelease(namedtuple('ApplicationRelease', ['release',
'name',
'created_at',
'manifest'])):
class ApplicationRelease(
namedtuple('ApplicationRelease', ['release', 'name', 'created_at', 'manifest'])):
""" The ApplicationRelease associates an ApplicationManifest to a repository and release.
"""
@ -221,12 +212,15 @@ class OCIAppModel(AppRegistryDataInterface):
releases = [t.name for t in repo.tag_set_prefetch]
if not releases:
continue
available_releases = [str(x) for x in sorted(cnr.semver.versions(releases, False),
reverse=True)]
available_releases = [
str(x) for x in sorted(cnr.semver.versions(releases, False), reverse=True)
]
channels = None
if with_channels:
channels = [ChannelView(name=chan.name, current=chan.linked_tag.name)
for chan in oci_model.channel.get_repo_channels(repo)]
channels = [
ChannelView(name=chan.name, current=chan.linked_tag.name)
for chan in oci_model.channel.get_repo_channels(repo)
]
app_name = _join_package_name(repo.namespace_user.username, repo.name)
manifests = self.list_manifests(app_name, available_releases[0])
@ -239,8 +233,7 @@ class OCIAppModel(AppRegistryDataInterface):
manifests=manifests,
releases=available_releases,
updated_at=_timestamp_to_iso(repo.tag_set_prefetch[-1].lifetime_start),
created_at=_timestamp_to_iso(repo.tag_set_prefetch[0].lifetime_start),
)
created_at=_timestamp_to_iso(repo.tag_set_prefetch[0].lifetime_start),)
views.append(view)
return views
@ -270,9 +263,10 @@ class OCIAppModel(AppRegistryDataInterface):
Todo:
* Filter results with readeable reposistory for the user (including visibilitys)
"""
return [_join_package_name(r.namespace_user.username, r.name)
for r in model.repository.get_app_search(lookup=query, username=username, limit=50)]
return [
_join_package_name(r.namespace_user.username, r.name)
for r in model.repository.get_app_search(lookup=query, username=username, limit=50)
]
def list_releases(self, package_name, media_type=None):
""" Return the list of all releases of an Application
@ -307,21 +301,15 @@ class OCIAppModel(AppRegistryDataInterface):
created_at = _timestamp_to_iso(tag.lifetime_start)
blob_descriptor = BlobDescriptor(digest=_strip_sha256_header(blob.digest),
mediaType=blob.media_type.name,
size=blob.size, urls=[])
mediaType=blob.media_type.name, size=blob.size, urls=[])
app_manifest = ApplicationManifest(digest=manifest.digest, mediaType=manifest.media_type.name,
content=blob_descriptor)
app_manifest = ApplicationManifest(
digest=manifest.digest, mediaType=manifest.media_type.name, content=blob_descriptor)
app_release = ApplicationRelease(release=tag.name,
created_at=created_at,
name=package_name,
app_release = ApplicationRelease(release=tag.name, created_at=created_at, name=package_name,
manifest=app_manifest)
return app_release
except (Tag.DoesNotExist,
Manifest.DoesNotExist,
Blob.DoesNotExist,
Repository.DoesNotExist,
except (Tag.DoesNotExist, Manifest.DoesNotExist, Blob.DoesNotExist, Repository.DoesNotExist,
MediaType.DoesNotExist):
raise_package_not_found(package_name, release, media_type)
@ -330,14 +318,10 @@ class OCIAppModel(AppRegistryDataInterface):
path = cnrblob.upload_url(cnrblob.digest)
locations = storage.preferred_locations
storage.stream_write(locations, path, fp, 'application/x-gzip')
db_blob = oci_model.blob.get_or_create_blob(cnrblob.digest,
cnrblob.size,
content_media_type,
db_blob = oci_model.blob.get_or_create_blob(cnrblob.digest, cnrblob.size, content_media_type,
locations)
return BlobDescriptor(mediaType=content_media_type,
digest=_strip_sha256_header(db_blob.digest),
size=db_blob.size,
urls=[])
digest=_strip_sha256_header(db_blob.digest), size=db_blob.size, urls=[])
def create_release(self, package, user, visibility, force=False):
""" Add an app-release to a repository
@ -349,8 +333,8 @@ class OCIAppModel(AppRegistryDataInterface):
repo = model.repository.get_or_create_repository(ns, name, user, visibility=visibility,
repo_kind='application')
tag_name = package.release
oci_model.release.create_app_release(repo, tag_name, package.manifest(),
data['content']['digest'], force)
oci_model.release.create_app_release(repo, tag_name,
package.manifest(), data['content']['digest'], force)
def delete_release(self, package_name, release, media_type):
""" Remove/Delete an app-release from an app-repository.
@ -386,8 +370,7 @@ class OCIAppModel(AppRegistryDataInterface):
""" Returns all AppChannel for a package """
repo = self._application(package_name)
channels = oci_model.channel.get_repo_channels(repo)
return [ChannelView(name=chan.name,
current=chan.linked_tag.name) for chan in channels]
return [ChannelView(name=chan.name, current=chan.linked_tag.name) for chan in channels]
def fetch_channel(self, package_name, channel_name, with_releases=True):
""" Returns an AppChannel """
@ -400,12 +383,11 @@ class OCIAppModel(AppRegistryDataInterface):
if with_releases:
releases = oci_model.channel.get_channel_releases(repo, channel)
chanview = ChannelReleasesView(current=channel.linked_tag.name,
name=channel.name,
releases=[channel.linked_tag.name]+[c.name for c in releases])
chanview = ChannelReleasesView(
current=channel.linked_tag.name, name=channel.name,
releases=[channel.linked_tag.name] + [c.name for c in releases])
else:
chanview = ChannelView(current=channel.linked_tag.name,
name=channel.name)
chanview = ChannelView(current=channel.linked_tag.name, name=channel.name)
return chanview
@ -424,8 +406,18 @@ class OCIAppModel(AppRegistryDataInterface):
"""
repo = self._application(package_name)
channel = oci_model.channel.create_or_update_channel(repo, channel_name, release)
return ChannelView(current=channel.linked_tag.name,
name=channel.name)
return ChannelView(current=channel.linked_tag.name, name=channel.name)
def get_user(self, username, password):
err_msg = None
if parse_robot_username(username) is not None:
try:
user = model.user.verify_robot(username, password)
except model.InvalidRobotException as exc:
return (None, exc.message)
else:
user, err_msg = authentication.verify_and_link_user(username, password)
return (user, err_msg)
def _strip_sha256_header(digest):

View file

@ -8,7 +8,7 @@ from cnr.models.package_base import PackageBase, manifest_media_type
from app import storage
from data.interfaces.appr import oci_app_model
from data.oci_model import blob # TODO these calls should be through oci_app_model
from data.oci_model import blob # TODO these calls should be through oci_app_model
class Blob(BlobBase):
@ -42,6 +42,7 @@ class Blob(BlobBase):
class Channel(ChannelBase):
""" CNR Channel model implemented against the Quay data model. """
def __init__(self, name, package, current=None):
super(Channel, self).__init__(name, package, current=current)
self._channel_data = None
@ -63,8 +64,9 @@ class Channel(ChannelBase):
@classmethod
def all(cls, package_name):
return [Channel(c.name, package_name, c.current)
for c in oci_app_model.list_channels(package_name)]
return [
Channel(c.name, package_name, c.current) for c in oci_app_model.list_channels(package_name)
]
@property
def _channel(self):
@ -83,17 +85,28 @@ class Channel(ChannelBase):
oci_app_model.delete_channel(self.package, self.name)
class User(object):
""" User in CNR models """
@classmethod
def get_user(cls, username, password):
""" Returns True if user creds is valid """
return oci_app_model.get_user(username, password)
class Package(PackageBase):
""" CNR Package model implemented against the Quay data model. """
@classmethod
def _apptuple_to_dict(cls, apptuple):
return {'release': apptuple.release,
'created_at': apptuple.created_at,
'digest': apptuple.manifest.digest,
'mediaType': apptuple.manifest.mediaType,
'package': apptuple.name,
'content': apptuple.manifest.content._asdict()}
return {
'release': apptuple.release,
'created_at': apptuple.created_at,
'digest': apptuple.manifest.digest,
'mediaType': apptuple.manifest.mediaType,
'package': apptuple.name,
'content': apptuple.manifest.content._asdict()
}
@classmethod
def create_repository(cls, package_name, visibility, owner):
@ -105,10 +118,11 @@ class Package(PackageBase):
@classmethod
def all(cls, organization=None, media_type=None, search=None, username=None, **kwargs):
return [dict(x._asdict()) for x in oci_app_model.list_applications(namespace=organization,
media_type=media_type,
search=search,
username=username)]
return [
dict(x._asdict())
for x in oci_app_model.list_applications(namespace=organization, media_type=media_type,
search=search, username=username)
]
@classmethod
def _fetch(cls, package_name, release, media_type):
@ -137,8 +151,10 @@ class Package(PackageBase):
return oci_app_model.release_exists(package, release)
def channels(self, channel_class, iscurrent=True):
return [c.name for c in oci_app_model.list_release_channels(self.package, self.release,
active=iscurrent)]
return [
c.name
for c in oci_app_model.list_release_channels(self.package, self.release, active=iscurrent)
]
@classmethod
def manifests(cls, package, release=None):

View file

@ -1,27 +1,24 @@
import logging
from base64 import b64encode
import cnr
from cnr.api.impl import registry as cnr_registry
from cnr.api.registry import repo_name, _pull
from cnr.exception import (CnrException, InvalidUsage, InvalidParams, InvalidRelease,
UnableToLockResource, UnauthorizedAccess, Unsupported, ChannelNotFound, Forbidden,
PackageAlreadyExists, PackageNotFound, PackageReleaseNotFound)
from flask import request, jsonify
from cnr.api.registry import _pull, repo_name
from cnr.exception import (
ChannelNotFound, CnrException, Forbidden, InvalidParams, InvalidRelease, InvalidUsage,
PackageAlreadyExists, PackageNotFound, PackageReleaseNotFound, UnableToLockResource,
UnauthorizedAccess, Unsupported)
from flask import jsonify, request
from app import authentication
from auth.auth_context import get_authenticated_user
from auth.decorators import process_auth
from auth.permissions import CreateRepositoryPermission, ModifyRepositoryPermission
from endpoints.appr import appr_bp, require_app_repo_read, require_app_repo_write
from auth.permissions import (CreateRepositoryPermission, ModifyRepositoryPermission)
from endpoints.appr import (appr_bp, require_app_repo_read, require_app_repo_write)
from endpoints.appr.cnr_backend import Blob, Channel, Package, User
from endpoints.appr.decorators import disallow_for_image_repository
from endpoints.appr.cnr_backend import Package, Channel, Blob
from endpoints.decorators import anon_allowed, anon_protect
from util.names import REPOSITORY_NAME_REGEX, TAG_REGEX
logger = logging.getLogger(__name__)
@ -58,7 +55,7 @@ def login():
if not username or not password:
raise InvalidUsage('Missing username or password')
user, err = authentication.verify_credentials(username, password)
user, err = User.get_user(username, password)
if err is not None:
raise UnauthorizedAccess(err)
@ -69,8 +66,7 @@ def login():
@appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/blobs/sha256/<string:digest>",
methods=['GET'],
strict_slashes=False,
)
strict_slashes=False,)
@process_auth
@require_app_repo_read
@anon_protect
@ -92,11 +88,8 @@ def list_packages():
username = None
if user:
username = user.username
result_data = cnr_registry.list_packages(namespace,
package_class=Package,
search=query,
media_type=media_type,
username=username)
result_data = cnr_registry.list_packages(namespace, package_class=Package, search=query,
media_type=media_type, username=username)
return jsonify(result_data)
@ -108,31 +101,23 @@ def list_packages():
@anon_protect
def delete_package(namespace, package_name, release, media_type):
reponame = repo_name(namespace, package_name)
result = cnr_registry.delete_package(reponame,
release,
media_type,
package_class=Package)
result = cnr_registry.delete_package(reponame, release, media_type, package_class=Package)
return jsonify(result)
@appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/<string:release>/<string:media_type>",
methods=['GET'],
strict_slashes=False
)
methods=['GET'], strict_slashes=False)
@process_auth
@require_app_repo_read
@anon_protect
def show_package(namespace, package_name, release, media_type):
reponame = repo_name(namespace, package_name)
result = cnr_registry.show_package(reponame, release,
media_type,
channel_class=Channel,
result = cnr_registry.show_package(reponame, release, media_type, channel_class=Channel,
package_class=Package)
return jsonify(result)
@appr_bp.route("/api/v1/packages/<string:namespace>/<string:package_name>", methods=['GET'],
strict_slashes=False)
@process_auth
@ -141,8 +126,7 @@ def show_package(namespace, package_name, release, media_type):
def show_package_releases(namespace, package_name):
reponame = repo_name(namespace, package_name)
media_type = request.args.get('media_type', None)
result = cnr_registry.show_package_releases(reponame,
media_type=media_type,
result = cnr_registry.show_package_releases(reponame, media_type=media_type,
package_class=Package)
return jsonify(result)
@ -154,17 +138,14 @@ def show_package_releases(namespace, package_name):
@anon_protect
def show_package_releasse_manifests(namespace, package_name, release):
reponame = repo_name(namespace, package_name)
result = cnr_registry.show_package_manifests(reponame,
release,
package_class=Package)
result = cnr_registry.show_package_manifests(reponame, release, package_class=Package)
return jsonify(result)
@appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/<string:release>/<string:media_type>/pull",
methods=['GET'],
strict_slashes=False,
)
strict_slashes=False,)
@process_auth
@require_app_repo_read
@anon_protect
@ -185,7 +166,7 @@ def push(namespace, package_name):
if not REPOSITORY_NAME_REGEX.match(package_name):
logger.debug('Found invalid repository name CNR push: %s', reponame)
raise InvalidUsage()
raise InvalidUsage('invalid repository name: %s' % reponame)
values = request.get_json(force=True, silent=True) or {}
private = values.get('visibility', 'private')
@ -194,12 +175,14 @@ def push(namespace, package_name):
if not Package.exists(reponame):
if not CreateRepositoryPermission(namespace).can():
raise Forbidden("Unauthorized access for: %s" % reponame,
{"package": reponame, "scopes": ['create']})
{"package": reponame,
"scopes": ['create']})
Package.create_repository(reponame, private, owner)
if not ModifyRepositoryPermission(namespace, package_name).can():
raise Forbidden("Unauthorized access for: %s" % reponame,
{"package": reponame, "scopes": ['push']})
{"package": reponame,
"scopes": ['push']})
if not 'release' in values:
raise InvalidUsage('Missing release')
@ -239,7 +222,9 @@ def list_channels(namespace, package_name):
return jsonify(cnr_registry.list_channels(reponame, channel_class=Channel))
@appr_bp.route("/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>", methods=['GET'], strict_slashes=False)
@appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>",
methods=['GET'], strict_slashes=False)
@process_auth
@require_app_repo_read
@anon_protect
@ -252,43 +237,41 @@ def show_channel(namespace, package_name, channel_name):
@appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>/<string:release>",
methods=['POST'],
strict_slashes=False,
)
strict_slashes=False,)
@process_auth
@require_app_repo_write
@anon_protect
def add_channel_release(namespace, package_name, channel_name, release):
if not TAG_REGEX.match(channel_name):
logger.debug('Found invalid channel name CNR add channel release: %s', channel_name)
raise InvalidUsage()
if not TAG_REGEX.match(release):
logger.debug('Found invalid release name CNR add channel release: %s', release)
raise InvalidUsage()
_check_channel_name(channel_name, release)
reponame = repo_name(namespace, package_name)
result = cnr_registry.add_channel_release(reponame, channel_name, release, channel_class=Channel,
package_class=Package)
return jsonify(result)
def _check_channel_name(channel_name, release=None):
if not TAG_REGEX.match(channel_name):
logger.debug('Found invalid channel name CNR add channel release: %s', channel_name)
raise InvalidUsage("Found invalid channelname %s" % release,
{'name': channel_name,
"release": release})
if release is not None and not TAG_REGEX.match(release):
logger.debug('Found invalid release name CNR add channel release: %s', release)
raise InvalidUsage("Found invalid channel release name %s" % release,
{'name': channel_name,
"release": release})
@appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>/<string:release>",
methods=['DELETE'],
strict_slashes=False,
)
strict_slashes=False,)
@process_auth
@require_app_repo_write
@anon_protect
def delete_channel_release(namespace, package_name, channel_name, release):
if not TAG_REGEX.match(channel_name):
logger.debug('Found invalid channel name CNR delete channel release: %s', channel_name)
raise InvalidUsage()
if not TAG_REGEX.match(release):
logger.debug('Found invalid release name CNR delete channel release: %s', release)
raise InvalidUsage()
_check_channel_name(channel_name, release)
reponame = repo_name(namespace, package_name)
result = cnr_registry.delete_channel_release(reponame, channel_name, release,
channel_class=Channel, package_class=Package)
@ -298,16 +281,12 @@ def delete_channel_release(namespace, package_name, channel_name, release):
@appr_bp.route(
"/api/v1/packages/<string:namespace>/<string:package_name>/channels/<string:channel_name>",
methods=['DELETE'],
strict_slashes=False,
)
strict_slashes=False,)
@process_auth
@require_app_repo_write
@anon_protect
def delete_channel(namespace, package_name, channel_name):
if not TAG_REGEX.match(channel_name):
logger.debug('Found invalid channel name CNR delete channel: %s', channel_name)
raise InvalidUsage()
_check_channel_name(channel_name)
reponame = repo_name(namespace, package_name)
result = cnr_registry.delete_channel(reponame, channel_name, channel_class=Channel)
return jsonify(result)

View file

@ -3,21 +3,21 @@ import shutil
import uuid
import pytest
from cnr.models.db_base import CnrDB
from cnr.tests.test_apiserver import BaseTestServer
from cnr.tests.conftest import *
from cnr.tests.test_apiserver import BaseTestServer
from cnr.tests.test_models import CnrTestModels
from peewee import SqliteDatabase
from app import app as application
from data.database import close_db_filter, User, db as database
from data.model import user, organization
from data.interfaces.appr import oci_app_model
import data.oci_model.blob as oci_blob
from app import app as application
from data.database import db as database
from data.database import User, close_db_filter
from data.interfaces.appr import oci_app_model
from data.model import organization, user
from endpoints.appr import appr_bp, registry
from endpoints.appr.cnr_backend import QuayDB, Channel, Package
from initdb import wipe_database, initialize_database, populate_database
from endpoints.appr.cnr_backend import Channel, Package, QuayDB
from initdb import initialize_database, populate_database, wipe_database
application.register_blueprint(appr_bp, url_prefix='/cnr')
@ -65,9 +65,10 @@ class PackageTest(Package):
blob = blob_cls.get(package_name, package.manifest.content.digest)
data = cls._apptuple_to_dict(package)
data.pop('digest')
data['channels'] = [x.name for x in oci_app_model.list_release_channels(package_name,
package.release,
False)]
data['channels'] = [
x.name
for x in oci_app_model.list_release_channels(package_name, package.release, False)
]
data['blob'] = blob.b64blob
result.append(data)
return result
@ -100,9 +101,7 @@ def sqlitedb_file(tmpdir):
def init_db_path(tmpdir_factory):
sqlitedb_file_loc = str(tmpdir_factory.mktemp("data").join("test.db"))
sqlitedb = 'sqlite:///{0}'.format(sqlitedb_file_loc)
conf = {"TESTING": True,
"DEBUG": True,
"DB_URI": sqlitedb}
conf = {"TESTING": True, "DEBUG": True, "DB_URI": sqlitedb}
os.environ['TEST_DATABASE_URI'] = str(sqlitedb)
os.environ['DB_URI'] = str(sqlitedb)
database.initialize(SqliteDatabase(sqlitedb_file_loc))
@ -128,9 +127,7 @@ def database_uri(monkeypatch, init_db_path, sqlitedb_file):
@pytest.fixture()
def appconfig(database_uri):
conf = {"TESTING": True,
"DEBUG": True,
"DB_URI": database_uri}
conf = {"TESTING": True, "DEBUG": True, "DB_URI": database_uri}
return conf
@ -206,8 +203,7 @@ class TestQuayModels(CnrTestModels):
assert p.package == "titi/rocketchat"
assert p.release == "2.0.1"
assert p.digest == "d3b54b7912fe770a61b59ab612a442eac52a8a5d8d05dbe92bf8f212d68aaa80"
blob = db_with_data1.Blob.get("titi/rocketchat",
p.digest)
blob = db_with_data1.Blob.get("titi/rocketchat", p.digest)
bdb = oci_blob.get_blob(p.digest)
newblob = db_with_data1.Blob("titi/app2", blob.b64blob)
p2 = db_with_data1.Package("titi/app2", "1.0.0", "helm", newblob)
@ -215,13 +211,13 @@ class TestQuayModels(CnrTestModels):
b2db = oci_blob.get_blob(p2.digest)
assert b2db.id == bdb.id
def test_push_same_blob(self, db_with_data1):
def test_force_push_different_blob(self, db_with_data1):
p = db_with_data1.Package.get("titi/rocketchat", "2.0.1", 'kpm')
assert p.package == "titi/rocketchat"
assert p.release == "2.0.1"
assert p.digest == "d3b54b7912fe770a61b59ab612a442eac52a8a5d8d05dbe92bf8f212d68aaa80"
blob = db_with_data1.Blob.get("titi/rocketchat",
"72ed15c9a65961ecd034cca098ec18eb99002cd402824aae8a674a8ae41bd0ef")
blob = db_with_data1.Blob.get(
"titi/rocketchat", "72ed15c9a65961ecd034cca098ec18eb99002cd402824aae8a674a8ae41bd0ef")
p2 = db_with_data1.Package("titi/rocketchat", "2.0.1", "kpm", blob)
p2.save(force=True)
pnew = db_with_data1.Package.get("titi/rocketchat", "2.0.1", 'kpm')

View file

@ -1,29 +1,39 @@
import json
from flask import url_for
import pytest
from data import model
from endpoints.appr.registry import appr_bp
from test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file
def test_invalid_login(app, client):
app.register_blueprint(appr_bp, url_prefix='/cnr')
@pytest.mark.parametrize('login_data, expected_code', [
({
"username": "devtable",
"password": "password"
}, 200),
({
"username": "devtable",
"password": "badpass"
}, 401),
({
"username": "devtable+dtrobot",
"password": "badpass"
}, 401),
({
"username": "devtable+dtrobot2",
"password": None
}, 200),
])
def test_login(login_data, expected_code, app, client):
if "+" in login_data['username'] and login_data['password'] is None:
username, robotname = login_data['username'].split("+")
_, login_data['password'] = model.user.create_robot(robotname, model.user.get_user(username))
app.register_blueprint(appr_bp, url_prefix='/cnr')
url = url_for('appr.login')
headers = {'Content-Type': 'application/json'}
data = {'user': {'username': 'foo', 'password': 'bar'}}
data = {'user': login_data}
rv = client.open(url, method='POST', data=json.dumps(data), headers=headers)
assert rv.status_code == 401
def test_valid_login(app, client):
app.register_blueprint(appr_bp, url_prefix='/cnr')
url = url_for('appr.login')
headers = {'Content-Type': 'application/json'}
data = {'user': {'username': 'devtable', 'password': 'password'}}
rv = client.open(url, method='POST', data=json.dumps(data), headers=headers)
assert rv.status_code == 200
assert rv.status_code == expected_code