fix(endpoints): add tests

this adds tests to the analyze trigger api

[testing -> locally using pytest]

Issue: https://www.pivotaltracker.com/story/show/144661631

- [ ] It works!
- [ ] Comments provide sufficient explanations for the next contributor
- [ ] Tests cover changes and corner cases
- [ ] Follows Quay syntax patterns and format
This commit is contained in:
Charlton Austin 2017-05-12 10:36:47 -05:00
parent 3b728014ac
commit 4ec55665de
3 changed files with 298 additions and 158 deletions

View file

@ -0,0 +1,152 @@
import pytest
from mock import Mock
from auth import permissions
from data import model
from endpoints.api.trigger_analyzer import TriggerAnalyzer
from util import dockerfileparse
BAD_PATH = "\"server_hostname/\" is not a valid Quay repository path"
EMPTY_CONF = {}
GOOD_CONF = {'context': '/', 'dockerfile_path': '/file'}
BAD_CONF = {'context': 'context', 'dockerfile_path': 'dockerfile_path'}
ONE_ROBOT = {'can_read': False, 'is_robot': True, 'kind': 'user', 'name': 'name'}
DOCKERFILE_NOT_CHILD = 'Dockerfile, context, is not a child of the context, dockerfile_path.'
THE_DOCKERFILE_SPECIFIED = 'Could not parse the Dockerfile specified'
DOCKERFILE_PATH_NOT_FOUND = 'Specified Dockerfile path for the trigger was not found on the main branch. This trigger may fail.'
NO_FROM_LINE = 'No FROM line found in the Dockerfile'
REPO_NOT_FOUND = 'Repository "server_hostname/path/file" referenced by the Dockerfile was not found'
@pytest.fixture
def get_monkeypatch(monkeypatch):
return monkeypatch
def patch_permissions(monkeypatch, can_read=False):
def can_read_fn(base_namespace, base_repository):
return can_read
monkeypatch.setattr(permissions, 'ReadRepositoryPermission', can_read_fn)
def patch_list_namespace_robots(monkeypatch):
my_mock = Mock()
my_mock.configure_mock(**{'username': 'name'})
return_value = [my_mock]
def return_list_mocks(namesapce):
return return_value
monkeypatch.setattr(model.user, 'list_namespace_robots', return_list_mocks)
return return_value
def patch_get_all_repo_users_transitive(monkeypatch):
my_mock = Mock()
my_mock.configure_mock(**{'username': 'name'})
return_value = [my_mock]
def return_get_mocks(namesapce, image_repostiory):
return return_value
monkeypatch.setattr(model.user, 'get_all_repo_users_transitive', return_get_mocks)
return return_value
def patch_parse_dockerfile(monkeypatch, get_base_image):
if get_base_image is not None:
def return_return_value(content):
parse_mock = Mock()
parse_mock.configure_mock(**{'get_base_image': get_base_image})
return parse_mock
monkeypatch.setattr(dockerfileparse, "parse_dockerfile", return_return_value)
else:
def return_return_value(content):
return get_base_image
monkeypatch.setattr(dockerfileparse, "parse_dockerfile", return_return_value)
def patch_model_repository_get_repository(monkeypatch, get_repository):
if get_repository is not None:
def mock_get_repository(base_namespace, base_repository):
vis_mock = Mock()
vis_mock.name = get_repository
get_repo_mock = Mock(visibility=vis_mock)
return get_repo_mock
else:
def mock_get_repository(base_namespace, base_repository):
return None
monkeypatch.setattr(model.repository, "get_repository", mock_get_repository)
def return_none():
return None
def return_content():
return Mock()
def return_server_hostname():
return "server_hostname/"
def return_non_server_hostname():
return "slime"
def return_path():
return "server_hostname/path/file"
@pytest.mark.parametrize(
'handler_fn, config_dict, admin_org_permission, status, message, get_base_image, robots, server_hostname, get_repository, can_read, namespace, name', [
(return_none, EMPTY_CONF, False, "warning", DOCKERFILE_PATH_NOT_FOUND, None, [], None, None, False, "namespace", None),
(return_none, EMPTY_CONF, True, "warning", DOCKERFILE_PATH_NOT_FOUND, None, [ONE_ROBOT], None, None, False, "namespace", None),
(return_content, BAD_CONF, False, "error", THE_DOCKERFILE_SPECIFIED, None, [], None, None, False, "namespace", None),
(return_none, EMPTY_CONF, False, "warning", DOCKERFILE_PATH_NOT_FOUND, return_none, [], None, None, False, "namespace", None),
(return_none, EMPTY_CONF, True, "warning", DOCKERFILE_PATH_NOT_FOUND, return_none, [ONE_ROBOT], None, None, False, "namespace", None),
(return_content, BAD_CONF, False, "error", DOCKERFILE_NOT_CHILD, return_none, [], None, None, False, "namespace", None),
(return_content, GOOD_CONF, False, "warning", NO_FROM_LINE, return_none, [], None, None, False, "namespace", None),
(return_content, GOOD_CONF, False, "publicbase", None, return_non_server_hostname, [], "server_hostname", None, False, "namespace", None),
(return_content, GOOD_CONF, False, "warning", BAD_PATH, return_server_hostname, [], "server_hostname", None, False, "namespace", None),
(return_content, GOOD_CONF, False, "error", REPO_NOT_FOUND, return_path, [], "server_hostname", None, False, "namespace", None),
(return_content, GOOD_CONF, False, "error", REPO_NOT_FOUND, return_path, [], "server_hostname", "nonpublic", False, "namespace", None),
(return_content, GOOD_CONF, False, "requiresrobot", None, return_path, [], "server_hostname", "nonpublic", True, "path", "file"),
(return_content, GOOD_CONF, False, "publicbase", None, return_path, [], "server_hostname", "public", True, "path", "file"),
])
def test_trigger_analyzer(handler_fn, config_dict, admin_org_permission, status, message, get_base_image, robots,
server_hostname, get_repository, can_read, namespace, name,
get_monkeypatch):
patch_list_namespace_robots(get_monkeypatch)
patch_get_all_repo_users_transitive(get_monkeypatch)
patch_parse_dockerfile(get_monkeypatch, get_base_image)
patch_model_repository_get_repository(get_monkeypatch, get_repository)
patch_permissions(get_monkeypatch, can_read)
handler_mock = Mock()
handler_mock.configure_mock(**{'load_dockerfile_contents': handler_fn})
trigger_analyzer = TriggerAnalyzer(handler_mock, 'namespace', server_hostname, config_dict, admin_org_permission)
assert trigger_analyzer.analyze_trigger() == {'namespace': namespace,
'name': name,
'robots': robots,
'status': status,
'message': message,
'is_admin': admin_org_permission}

View file

@ -1,6 +1,5 @@
""" Create, list and manage build triggers. """
import json
import logging
from os import path
from urllib import quote
@ -22,9 +21,9 @@ from endpoints.api import (RepositoryParamResource, nickname, resource, require_
validate_json_request, api, path_param, abort,
disallow_for_app_repositories, disallow_under_trust)
from endpoints.api.build import build_status_view, trigger_view, RepositoryBuildStatus
from endpoints.api.trigger_analyzer import TriggerAnalyzer
from endpoints.building import start_build, MaximumBuildsQueuedException
from endpoints.exception import NotFound, Unauthorized, InvalidRequest
from util.dockerfileparse import parse_dockerfile
from util.names import parse_robot_username
logger = logging.getLogger(__name__)
@ -35,6 +34,13 @@ def _prepare_webhook_url(scheme, username, password, hostname, path):
return urlunparse((scheme, auth_hostname, path, '', '', ''))
def get_trigger(trigger_uuid):
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
return trigger
@resource('/v1/repository/<apirepopath:repository>/trigger/')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class BuildTriggerList(RepositoryParamResource):
@ -62,12 +68,7 @@ class BuildTrigger(RepositoryParamResource):
@nickname('getBuildTrigger')
def get(self, namespace_name, repo_name, trigger_uuid):
""" Get information for the specified build trigger. """
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
return trigger_view(trigger, can_admin=True)
return trigger_view(get_trigger(trigger_uuid), can_admin=True)
@require_repo_admin
@disallow_for_app_repositories
@ -75,10 +76,7 @@ class BuildTrigger(RepositoryParamResource):
@nickname('deleteBuildTrigger')
def delete(self, namespace_name, repo_name, trigger_uuid):
""" Delete the specified build trigger. """
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
trigger = get_trigger(trigger_uuid)
handler = BuildTriggerHandler.get_handler(trigger)
if handler.is_active():
@ -121,10 +119,7 @@ class BuildTriggerSubdirs(RepositoryParamResource):
@validate_json_request('BuildTriggerSubdirRequest')
def post(self, namespace_name, repo_name, trigger_uuid):
""" List the subdirectories available for the specified build trigger and source. """
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
trigger = get_trigger(trigger_uuid)
user_permission = UserAdminPermission(trigger.connected_user.username)
if user_permission.can():
@ -189,11 +184,7 @@ class BuildTriggerActivate(RepositoryParamResource):
@validate_json_request('BuildTriggerActivateRequest')
def post(self, namespace_name, repo_name, trigger_uuid):
""" Activate the specified build trigger. """
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
trigger = get_trigger(trigger_uuid)
handler = BuildTriggerHandler.get_handler(trigger)
if handler.is_active():
raise InvalidRequest('Trigger config is not sufficient for activation.')
@ -290,10 +281,7 @@ class BuildTriggerAnalyze(RepositoryParamResource):
@validate_json_request('BuildTriggerAnalyzeRequest')
def post(self, namespace_name, repo_name, trigger_uuid):
""" Analyze the specified build trigger configuration. """
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
trigger = get_trigger(trigger_uuid)
if trigger.repository.namespace_user.username != namespace_name:
raise NotFound()
@ -303,100 +291,14 @@ class BuildTriggerAnalyze(RepositoryParamResource):
new_config_dict = request.get_json()['config']
handler = BuildTriggerHandler.get_handler(trigger, new_config_dict)
def analyze_view(image_namespace, image_repository, status, message=None):
# Retrieve the list of robots and mark whether they have read access already.
robots = []
if AdministerOrganizationPermission(image_namespace).can():
if image_repository is not None:
perm_query = model.user.get_all_repo_users_transitive(image_namespace, image_repository)
user_ids_with_permission = set([user.id for user in perm_query])
else:
user_ids_with_permission = set()
def robot_view(robot):
return {
'name': robot.username,
'kind': 'user',
'is_robot': True,
'can_read': robot.id in user_ids_with_permission,
}
robots = [robot_view(robot) for robot in model.user.list_namespace_robots(image_namespace)]
return {
'namespace': image_namespace,
'name': image_repository,
'robots': robots,
'status': status,
'message': message,
'is_admin': AdministerOrganizationPermission(image_namespace).can(),
}
server_hostname = app.config['SERVER_HOSTNAME']
try:
# Default to the current namespace.
base_namespace = namespace_name
base_repository = None
# Load the contents of the Dockerfile.
contents = handler.load_dockerfile_contents()
if not contents:
return analyze_view(base_namespace, base_repository, 'warning',
message='Specified Dockerfile path for the trigger was not found on the main ' +
'branch. This trigger may fail.')
# Parse the contents of the Dockerfile.
parsed = parse_dockerfile(contents)
if not parsed:
return analyze_view(base_namespace, base_repository, 'error',
message='Could not parse the Dockerfile specified')
# Check whether the dockerfile_path is correct
if new_config_dict.get('context'):
if not is_parent(new_config_dict.get('context'), new_config_dict.get('dockerfile_path')):
return analyze_view(base_namespace, base_repository, 'error',
message='Dockerfile, %s, is not child of the context, %s.' %
(new_config_dict.get('context'), new_config_dict.get('dockerfile_path')))
# Determine the base image (i.e. the FROM) for the Dockerfile.
base_image = parsed.get_base_image()
if not base_image:
return analyze_view(base_namespace, base_repository, 'warning',
message='No FROM line found in the Dockerfile')
# Check to see if the base image lives in Quay.
quay_registry_prefix = '%s/' % (app.config['SERVER_HOSTNAME'])
if not base_image.startswith(quay_registry_prefix):
return analyze_view(base_namespace, base_repository, 'publicbase')
# Lookup the repository in Quay.
result = str(base_image)[len(quay_registry_prefix):].split('/', 2)
if len(result) != 2:
msg = '"%s" is not a valid Quay repository path' % (base_image)
return analyze_view(base_namespace, base_repository, 'warning', message=msg)
(base_namespace, base_repository) = result
found_repository = model.repository.get_repository(base_namespace, base_repository)
if not found_repository:
return {
'status': 'error',
'message': 'Repository "%s" referenced by the Dockerfile was not found' % (base_image)
}
# If the repository is private and the user cannot see that repo, then
# mark it as not found.
can_read = ReadRepositoryPermission(base_namespace, base_repository)
if found_repository.visibility.name != 'public' and not can_read:
return {
'status': 'error',
'message': 'Repository "%s" referenced by the Dockerfile was not found' % (base_image)
}
if found_repository.visibility.name == 'public':
return analyze_view(base_namespace, base_repository, 'publicbase')
else:
return analyze_view(base_namespace, base_repository, 'requiresrobot')
trigger_analyzer = TriggerAnalyzer(handler,
namespace_name,
server_hostname,
new_config_dict,
AdministerOrganizationPermission(namespace_name).can())
return trigger_analyzer.analyze_trigger()
except RepositoryReadException as rre:
return {
'status': 'error',
@ -407,30 +309,6 @@ class BuildTriggerAnalyze(RepositoryParamResource):
'status': 'notimplemented',
}
raise NotFound()
def is_parent(context, dockerfile_path):
""" This checks whether the context is a parent of the dockerfile_path"""
if context == "" or dockerfile_path == "":
return False
normalized_context = path.normpath(context)
if normalized_context[len(normalized_context) - 1] != path.sep:
normalized_context += path.sep
if normalized_context[0] != path.sep:
normalized_context = path.sep + normalized_context
normalized_subdir = path.normpath(path.dirname(dockerfile_path))
if normalized_subdir[0] != path.sep:
normalized_subdir = path.sep + normalized_subdir
if normalized_subdir[len(normalized_subdir) - 1] != path.sep:
normalized_subdir += path.sep
return normalized_subdir.startswith(normalized_context)
@resource('/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>/start')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@ -466,10 +344,7 @@ class ActivateBuildTrigger(RepositoryParamResource):
@validate_json_request('RunParameters')
def post(self, namespace_name, repo_name, trigger_uuid):
""" Manually start a build from the specified trigger. """
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
trigger = get_trigger(trigger_uuid)
handler = BuildTriggerHandler.get_handler(trigger)
if not handler.is_active():
@ -530,10 +405,7 @@ class BuildTriggerFieldValues(RepositoryParamResource):
@nickname('listTriggerFieldValues')
def post(self, namespace_name, repo_name, trigger_uuid, field_name):
""" List the field values for a custom run field. """
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
trigger = get_trigger(trigger_uuid)
config = request.get_json() or None
if AdministerRepositoryPermission(namespace_name, repo_name).can():
@ -578,10 +450,7 @@ class BuildTriggerSources(RepositoryParamResource):
""" List the build sources for the trigger configuration thus far. """
namespace = request.get_json()['namespace']
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
trigger = get_trigger(trigger_uuid)
user_permission = UserAdminPermission(trigger.connected_user.username)
if user_permission.can():
@ -610,10 +479,7 @@ class BuildTriggerSourceNamespaces(RepositoryParamResource):
@nickname('listTriggerBuildSourceNamespaces')
def get(self, namespace_name, repo_name, trigger_uuid):
""" List the build sources for the trigger configuration thus far. """
try:
trigger = model.build.get_build_trigger(trigger_uuid)
except model.InvalidBuildTriggerException:
raise NotFound()
trigger = get_trigger(trigger_uuid)
user_permission = UserAdminPermission(trigger.connected_user.username)
if user_permission.can():

View file

@ -0,0 +1,122 @@
from os import path
from auth import permissions
from data import model
from util import dockerfileparse
def is_parent(context, dockerfile_path):
""" This checks whether the context is a parent of the dockerfile_path"""
if context == "" or dockerfile_path == "":
return False
normalized_context = path.normpath(context)
if normalized_context[len(normalized_context) - 1] != path.sep:
normalized_context += path.sep
if normalized_context[0] != path.sep:
normalized_context = path.sep + normalized_context
normalized_subdir = path.normpath(path.dirname(dockerfile_path))
if normalized_subdir[0] != path.sep:
normalized_subdir = path.sep + normalized_subdir
if normalized_subdir[len(normalized_subdir) - 1] != path.sep:
normalized_subdir += path.sep
return normalized_subdir.startswith(normalized_context)
class TriggerAnalyzer:
""" This analyzes triggers and returns the appropriate trigger and robot view to the frontend. """
def __init__(self, handler, namespace_name, server_hostname, new_config_dict, admin_org_permission):
self.handler = handler
self.namespace_name = namespace_name
self.server_hostname = server_hostname
self.new_config_dict = new_config_dict
self.admin_org_permission = admin_org_permission
def analyze_trigger(self):
# Load the contents of the Dockerfile.
contents = self.handler.load_dockerfile_contents()
if not contents:
return self.analyze_view(self.namespace_name, None, 'warning',
message='Specified Dockerfile path for the trigger was not found on the main ' +
'branch. This trigger may fail.')
# Parse the contents of the Dockerfile.
parsed = dockerfileparse.parse_dockerfile(contents)
if not parsed:
return self.analyze_view(self.namespace_name, None, 'error', message='Could not parse the Dockerfile specified')
# Check whether the dockerfile_path is correct
if self.new_config_dict.get('context') and not is_parent(self.new_config_dict.get('context'),
self.new_config_dict.get('dockerfile_path')):
return self.analyze_view(self.namespace_name, None, 'error',
message='Dockerfile, %s, is not a child of the context, %s.' %
(self.new_config_dict.get('context'),
self.new_config_dict.get('dockerfile_path')))
# Determine the base image (i.e. the FROM) for the Dockerfile.
base_image = parsed.get_base_image()
if not base_image:
return self.analyze_view(self.namespace_name, None, 'warning', message='No FROM line found in the Dockerfile')
# Check to see if the base image lives in Quay.
quay_registry_prefix = '%s/' % self.server_hostname
if not base_image.startswith(quay_registry_prefix):
return self.analyze_view(self.namespace_name, None, 'publicbase')
# Lookup the repository in Quay.
result = str(base_image)[len(quay_registry_prefix):].split('/', 2)
if len(result) != 2:
msg = '"%s" is not a valid Quay repository path' % base_image
return self.analyze_view(self.namespace_name, None, 'warning', message=msg)
(base_namespace, base_repository) = result
found_repository = model.repository.get_repository(base_namespace, base_repository)
if not found_repository:
return self.analyze_view(self.namespace_name, None, 'error',
message='Repository "%s" referenced by the Dockerfile was not found' % base_image)
# If the repository is private and the user cannot see that repo, then
# mark it as not found.
can_read = permissions.ReadRepositoryPermission(base_namespace, base_repository)
if found_repository.visibility.name != 'public' and not can_read:
return self.analyze_view(self.namespace_name, None, 'error',
message='Repository "%s" referenced by the Dockerfile was not found' % base_image)
if found_repository.visibility.name == 'public':
return self.analyze_view(base_namespace, base_repository, 'publicbase')
return self.analyze_view(base_namespace, base_repository, 'requiresrobot')
def analyze_view(self, image_namespace, image_repository, status, message=None):
# Retrieve the list of robots and mark whether they have read access already.
robots = []
if self.admin_org_permission:
if image_repository is not None:
perm_query = model.user.get_all_repo_users_transitive(image_namespace, image_repository)
user_ids_with_permission = set([user.id for user in perm_query])
else:
user_ids_with_permission = set()
def robot_view(robot):
return {
'name': robot.username,
'kind': 'user',
'is_robot': True,
'can_read': robot.id in user_ids_with_permission,
}
robots = [robot_view(robot) for robot in model.user.list_namespace_robots(image_namespace)]
return {
'namespace': image_namespace,
'name': image_repository,
'robots': robots,
'status': status,
'message': message,
'is_admin': self.admin_org_permission,
}