This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/config_app/config_util/test/test_k8saccessor.py

69 lines
2.7 KiB
Python
Raw Normal View History

import pytest
from httmock import urlmatch, HTTMock, response
from config_app.config_util.k8saccessor import KubernetesAccessorSingleton
from config_app.config_util.k8sconfig import KubernetesConfig
@pytest.mark.parametrize('kube_config, expected_api, expected_query', [
    (
        {'api_host': 'www.customhost.com'},
        '/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments',
        'labelSelector=quay-enterprise-component%3Dapp',
    ),
    (
        {'api_host': 'www.customhost.com',
         'qe_deployment_selector': 'custom-selector'},
        '/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments',
        'labelSelector=quay-enterprise-component%3Dcustom-selector',
    ),
    (
        {'api_host': 'www.customhost.com',
         'qe_namespace': 'custom-namespace'},
        '/apis/extensions/v1beta1/namespaces/custom-namespace/deployments',
        'labelSelector=quay-enterprise-component%3Dapp',
    ),
    (
        {'api_host': 'www.customhost.com',
         'qe_namespace': 'custom-namespace',
         'qe_deployment_selector': 'custom-selector'},
        '/apis/extensions/v1beta1/namespaces/custom-namespace/deployments',
        'labelSelector=quay-enterprise-component%3Dcustom-selector',
    ),
])
def test_get_qe_deployments(kube_config, expected_api, expected_query):
    """Listing deployments requests the expected path and query string.

    Each case builds a ``KubernetesConfig`` from ``kube_config``, stubs HTTP
    traffic to ``www.customhost.com`` and verifies that
    ``get_qe_deployments`` issues a request whose URL path equals
    ``expected_api`` and whose query string equals ``expected_query``, and
    that the call returns something other than ``None``.
    """
    k8s_config = KubernetesConfig(**kube_config)
    matched_paths = []

    # httmock calls handlers with the split URL first and the request
    # second; only the URL components are inspected here.
    @urlmatch(netloc=r'www.customhost.com')
    def fake_api(url, _request):
        assert url.path == expected_api
        assert url.query == expected_query
        matched_paths.append(url.path)
        return response(200, '{}')

    # Clear the singleton's stored instance before asking for one with this
    # case's config (presumably get_instance would otherwise reuse an
    # accessor built by an earlier case -- confirm in k8saccessor).
    KubernetesAccessorSingleton._instance = None
    with HTTMock(fake_api):
        accessor = KubernetesAccessorSingleton.get_instance(k8s_config)
        deployments = accessor.get_qe_deployments()
        assert deployments is not None

    # The stub must have been reached at least once.
    assert matched_paths
@pytest.mark.parametrize('kube_config, deployment_names, expected_api_hits', [
    (
        {'api_host': 'www.customhost.com'},
        [],
        [],
    ),
    (
        {'api_host': 'www.customhost.com'},
        ['myDeployment'],
        ['/apis/extensions/v1beta1/namespaces/quay-enterprise/deployments/myDeployment'],
    ),
    (
        {'api_host': 'www.customhost.com',
         'qe_namespace': 'custom-namespace'},
        ['myDeployment', 'otherDeployment'],
        ['/apis/extensions/v1beta1/namespaces/custom-namespace/deployments/myDeployment',
         '/apis/extensions/v1beta1/namespaces/custom-namespace/deployments/otherDeployment'],
    ),
])
def test_cycle_qe_deployments(kube_config, deployment_names, expected_api_hits):
    """Cycling deployments PATCHes each expected deployment URL, in order.

    The stub only matches PATCH requests to ``www.customhost.com``. Every
    matched request must target the next path in ``expected_api_hits``; once
    ``cycle_qe_deployments`` returns, all expected paths must have been hit.
    """
    KubernetesAccessorSingleton._instance = None
    k8s_config = KubernetesConfig(**kube_config)

    # Paths PATCHed so far; its length doubles as the index of the next
    # expected hit, so a surplus request fails with an IndexError.
    patched_paths = []

    # httmock calls handlers with the split URL first and the request second.
    @urlmatch(netloc=r'www.customhost.com', method='PATCH')
    def fake_api(url, _request):
        assert url.path == expected_api_hits[len(patched_paths)]
        patched_paths.append(url.path)
        return response(200, '{}')

    with HTTMock(fake_api):
        accessor = KubernetesAccessorSingleton.get_instance(k8s_config)
        accessor.cycle_qe_deployments(deployment_names)

    # Every recorded path already matched its expected slot, so equality
    # here means no expected PATCH was skipped.
    assert patched_paths == expected_api_hits