Fix tuf api calls

This commit is contained in:
Evan Cordell 2017-05-23 10:11:48 -04:00
parent 961ebdfe44
commit f877865e82
2 changed files with 42 additions and 27 deletions

View file

@ -8,37 +8,47 @@ from endpoints.api.signing import RepositorySignatures
from test.fixtures import *
VALID_TARGETS = {
'latest': {
'hashes': {
'sha256': 'mLmxwTyUrqIRDaz8uaBapfrp3GPERfsDg2kiMujlteo='
VALID_TARGETS_MAP = {
"targets/ci": {
"targets": {
"latest": {
"hashes": {
"sha256": "2Q8GLEgX62VBWeL76axFuDj/Z1dd6Zhx0ZDM6kNwPkQ="
},
'length': 1500
},
'test_tag': {
'hashes': {
'sha256': '1234123'
},
'length': 50
"length": 2111
}
},
"expiration": "2020-05-22T10:26:46.618176424-04:00"
},
"targets": {
"targets": {
"latest": {
"hashes": {
"sha256": "2Q8GLEgX62VBWeL76axFuDj/Z1dd6Zhx0ZDM6kNwPkQ="
},
"length": 2111
}
},
"expiration": "2020-05-22T10:26:01.953414888-04:00"}
}
def tags_equal(expected, actual):
expected_tags = expected.get('tags')
actual_tags = actual.get('tags')
expected_tags = expected.get('delegations')
actual_tags = actual.get('delegations')
if expected_tags and actual_tags:
return Counter(expected_tags) == Counter(actual_tags)
return expected == actual
@pytest.mark.parametrize('targets,expected', [
(VALID_TARGETS, {'tags': VALID_TARGETS, 'expiration': 'expires'}),
({'bad': 'tags'}, {'tags': {'bad': 'tags'}, 'expiration': 'expires'}),
({}, {'tags': {}, 'expiration': 'expires'}),
(None, {'tags': None, 'expiration': 'expires'}), # API returns None on exceptions
@pytest.mark.parametrize('targets_map,expected', [
(VALID_TARGETS_MAP, {'delegations': VALID_TARGETS_MAP}),
({'bad': 'tags'}, {'delegations': {'bad': 'tags'}}),
({}, {'delegations': {}}),
(None, {'delegations': None}), # API returns None on exceptions
])
def test_get_signatures(targets, expected, client):
def test_get_signatures(targets_map, expected, client):
with patch('endpoints.api.signing.tuf_metadata_api') as mock_tuf:
mock_tuf.get_default_tags_with_expiration.return_value = (targets, 'expires')
mock_tuf.get_all_tags_with_expiration.return_value = targets_map
with client_with_identity('devtable', client) as cl:
params = {'repository': 'devtable/trusted'}
assert tags_equal(expected, conduct_api_call(cl, RepositorySignatures, 'GET', params, None, 200).json)

View file

@ -11,7 +11,7 @@ from data.database import CloseForLongOperation
from util.abchelpers import nooper
from util.failover import failover, FailoverException
from util.security.instancekeys import InstanceKeys
from util.security.registry_jwt import build_context_and_subject, generate_bearer_token, QUAY_TUF_ROOT
from util.security.registry_jwt import build_context_and_subject, generate_bearer_token, QUAY_TUF_ROOT, SIGNER_TUF_ROOT
DEFAULT_HTTP_HEADERS = {'Connection': 'close'}
@ -151,15 +151,20 @@ class ImplementedTUFMetadataAPI(TUFMetadataAPIInterface):
if not targets_file:
targets_file = 'targets.json'
targets_name = targets_file
if targets_name.endswith('.json'):
targets_name = targets_name[:-5]
if not targets_map:
targets_map = {}
signed = self._get_signed(namespace, repository, targets_file)
if not signed:
return None
targets_map[targets_name] = None
return targets_map
if signed.get('targets'):
targets_map[targets_file] = {
targets_map[targets_name] = {
'targets': signed.get('targets'),
'expiration': signed.get('expires'),
}
@ -167,7 +172,7 @@ class ImplementedTUFMetadataAPI(TUFMetadataAPIInterface):
delegation_names = [role.get('name') for role in signed.get('delegations').get('roles')]
for delegation in delegation_names:
targets_map = self.get_all_tags_with_expiration(namespace, repository, targets_file=delegation, targets_map=targets_map)
targets_map = self.get_all_tags_with_expiration(namespace, repository, targets_file=delegation + '.json', targets_map=targets_map)
return targets_map
@ -235,7 +240,7 @@ class ImplementedTUFMetadataAPI(TUFMetadataAPIInterface):
'name': gun,
'actions': actions,
}]
context, subject = build_context_and_subject(user=None, token=None, oauthtoken=None, tuf_root=QUAY_TUF_ROOT)
context, subject = build_context_and_subject(user=None, token=None, oauthtoken=None, tuf_root=SIGNER_TUF_ROOT)
token = generate_bearer_token(self._config["SERVER_HOSTNAME"], subject, context, access,
TOKEN_VALIDITY_LIFETIME_S, self._instance_keys)
return {'Authorization': 'Bearer %s' % token}