Return hashes and expiration when fetching signed tags
This commit is contained in:
parent
1a78722521
commit
217b4a5ab2
5 changed files with 42 additions and 44 deletions
|
@ -36,40 +36,43 @@ class TUFMetadataAPI(object):
|
|||
self._config = config
|
||||
self._client = client or config['HTTPCLIENT']
|
||||
|
||||
def get_default_tags(self, namespace, repository):
|
||||
""" Gets the tag -> sha mappings in the 'targets/releases' delegation
|
||||
def get_default_tags_with_expiration(self, namespace, repository, targets_file=None):
|
||||
""" Gets the tag -> sha mappings in the 'targets/releases' delegation
|
||||
Returns tags, their hashes, and their
|
||||
"""
|
||||
if not targets_file:
|
||||
targets_file = 'targets/releases.json'
|
||||
gun = "%s/%s" % (namespace, repository)
|
||||
|
||||
try:
|
||||
response = self._get(gun, "targets/releases.json")
|
||||
json_response = response.json()
|
||||
targets = self._parse_targets(json_response)
|
||||
response = self._get(gun, targets_file)
|
||||
signed = self._parse_signed(response.json())
|
||||
targets = signed.get('targets')
|
||||
expiration = signed.get('expires')
|
||||
except requests.exceptions.Timeout:
|
||||
logger.exception('Timeout when trying to get metadata for %s', gun)
|
||||
return None, True
|
||||
return None, None
|
||||
except requests.exceptions.ConnectionError:
|
||||
logger.exception('Connection error when trying to get metadata for %s', gun)
|
||||
return None, True
|
||||
return None, None
|
||||
except (requests.exceptions.RequestException, ValueError):
|
||||
logger.exception('Failed to get metadata for %s', gun)
|
||||
return None, False
|
||||
return None, None
|
||||
except Non200ResponseException as ex:
|
||||
return None, ex.response.status_code != 404 and ex.response.status_code != 400
|
||||
logger.exception('Failed request for %s: %s', gun, str(ex))
|
||||
return None, None
|
||||
except InvalidMetadataException as ex:
|
||||
logger.exception('Failed to parse targets from metadata', str(ex))
|
||||
return None, False
|
||||
return None, None
|
||||
|
||||
return targets, False
|
||||
return targets, expiration
|
||||
|
||||
def _parse_targets(self, json_response):
|
||||
def _parse_signed(self, json_response):
|
||||
""" Attempts to parse the targets from a metadata response """
|
||||
signed = json_response.get('signed')
|
||||
if not signed:
|
||||
raise InvalidMetadataException("Could not find `signed` in metadata: %s" % json_response)
|
||||
targets = signed.get('targets')
|
||||
if not targets:
|
||||
raise InvalidMetadataException("Could not find `targets` in metadata: %s" % json_response)
|
||||
return targets
|
||||
return signed
|
||||
|
||||
def _auth_header(self, gun):
|
||||
""" Generate a registry auth token for apostille"""
|
||||
|
|
|
@ -35,8 +35,8 @@ valid_response = {
|
|||
|
||||
|
||||
@pytest.mark.parametrize('response_code,response_body,expected', [
|
||||
(200, valid_response, valid_response['signed']['targets']),
|
||||
(200, {'garbage': 'data'}, None)
|
||||
(200, valid_response, (valid_response['signed']['targets'], '2020-03-30T18:55:26.594764859-04:00')),
|
||||
(200, {'garbage': 'data'}, (None, None))
|
||||
])
|
||||
def test_get_metadata(response_code, response_body, expected):
|
||||
app = Flask(__name__)
|
||||
|
@ -46,8 +46,8 @@ def test_get_metadata(response_code, response_body, expected):
|
|||
request.json.return_value = response_body
|
||||
client.request.return_value = request
|
||||
tuf_api = api.TUFMetadataAPI(app, app.config, client=client)
|
||||
tags, _ = tuf_api.get_default_tags('quay', 'quay')
|
||||
assert tags == expected
|
||||
response = tuf_api.get_default_tags_with_expiration('quay', 'quay')
|
||||
assert response == expected
|
||||
|
||||
@pytest.mark.parametrize('connection_error,response_code,exception', [
|
||||
(True, 200, requests.exceptions.Timeout),
|
||||
|
@ -67,5 +67,6 @@ def test_get_metadata_exception(connection_error, response_code, exception):
|
|||
client = mock.Mock(request=request)
|
||||
client.request.side_effect = exception
|
||||
tuf_api = api.TUFMetadataAPI(app, app.config, client=client)
|
||||
tags, _ = tuf_api.get_default_tags('quay', 'quay')
|
||||
assert tags == None
|
||||
tags, expiration = tuf_api.get_default_tags_with_expiration('quay', 'quay')
|
||||
assert tags == None
|
||||
assert expiration == None
|
Reference in a new issue