Add api for getting all signed tags, separated by delegation
This commit is contained in:
parent
6023e15274
commit
3e3ed11634
2 changed files with 199 additions and 22 deletions
|
@ -66,6 +66,22 @@ class TUFMetadataAPIInterface(object):
|
|||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_all_tags_with_expiration(self, namespace, repository, targets_file=None, targets_map=None):
|
||||
"""
|
||||
Gets the tag -> sha mappings of all delegations for a repo, as well as the expiration of the signatures.
|
||||
Does not verify the metadata, this is purely for display purposes.
|
||||
|
||||
Args:
|
||||
namespace: namespace containing the repository
|
||||
repository: the repo to get tags for
|
||||
targets_file: the specific target or delegation to read from. Default: targets.json
|
||||
|
||||
Returns:
|
||||
targets
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete_metadata(self, namespace, repository):
|
||||
"""
|
||||
|
@ -111,30 +127,49 @@ class ImplementedTUFMetadataAPI(TUFMetadataAPIInterface):
|
|||
|
||||
if not targets_file:
|
||||
targets_file = 'targets/releases.json'
|
||||
gun = self._gun(namespace, repository)
|
||||
|
||||
try:
|
||||
response = self._get(gun, targets_file)
|
||||
signed = self._parse_signed(response.json())
|
||||
targets = signed.get('targets')
|
||||
expiration = signed.get('expires')
|
||||
except requests.exceptions.Timeout:
|
||||
logger.exception('Timeout when trying to get metadata for %s', gun)
|
||||
return None, None
|
||||
except requests.exceptions.ConnectionError:
|
||||
logger.exception('Connection error when trying to get metadata for %s', gun)
|
||||
return None, None
|
||||
except (requests.exceptions.RequestException, ValueError):
|
||||
logger.exception('Failed to get metadata for %s', gun)
|
||||
return None, None
|
||||
except Non200ResponseException as ex:
|
||||
logger.exception('Failed request for %s: %s', gun, str(ex))
|
||||
return None, None
|
||||
except InvalidMetadataException as ex:
|
||||
logger.exception('Failed to parse targets from metadata', str(ex))
|
||||
signed = self._get_signed(namespace, repository, targets_file)
|
||||
if not signed:
|
||||
return None, None
|
||||
|
||||
return targets, expiration
|
||||
return signed.get('targets'), signed.get('expires')
|
||||
|
||||
def get_all_tags_with_expiration(self, namespace, repository, targets_file=None, targets_map=None):
|
||||
"""
|
||||
Gets the tag -> sha mappings of all delegations for a repo, as well as the expiration of the signatures.
|
||||
Does not verify the metadata, this is purely for display purposes.
|
||||
|
||||
Args:
|
||||
namespace: namespace containing the repository
|
||||
repository: the repo to get tags for
|
||||
targets_file: the specific target or delegation to read from. Default: targets.json
|
||||
|
||||
Returns:
|
||||
targets
|
||||
"""
|
||||
|
||||
if not targets_file:
|
||||
targets_file = 'targets.json'
|
||||
|
||||
if not targets_map:
|
||||
targets_map = {}
|
||||
|
||||
signed = self._get_signed(namespace, repository, targets_file)
|
||||
if not signed:
|
||||
return None
|
||||
|
||||
if signed.get('targets'):
|
||||
targets_map[targets_file] = {
|
||||
'targets': signed.get('targets'),
|
||||
'expiration': signed.get('expires'),
|
||||
}
|
||||
|
||||
delegation_names = [role.get('name') for role in signed.get('delegations').get('roles')]
|
||||
|
||||
for delegation in delegation_names:
|
||||
targets_map = self.get_all_tags_with_expiration(namespace, repository, targets_file=delegation, targets_map=targets_map)
|
||||
|
||||
return targets_map
|
||||
|
||||
def delete_metadata(self, namespace, repository):
|
||||
"""
|
||||
|
@ -166,6 +201,25 @@ class ImplementedTUFMetadataAPI(TUFMetadataAPIInterface):
|
|||
|
||||
def _gun(self, namespace, repository):
|
||||
return join(self._gun_prefix, namespace, repository)
|
||||
|
||||
def _get_signed(self, namespace, repository, targets_file):
|
||||
gun = self._gun(namespace, repository)
|
||||
|
||||
try:
|
||||
response = self._get(gun, targets_file)
|
||||
signed = self._parse_signed(response.json())
|
||||
return signed
|
||||
except requests.exceptions.Timeout:
|
||||
logger.exception('Timeout when trying to get metadata for %s', gun)
|
||||
except requests.exceptions.ConnectionError:
|
||||
logger.exception('Connection error when trying to get metadata for %s', gun)
|
||||
except (requests.exceptions.RequestException, ValueError):
|
||||
logger.exception('Failed to get metadata for %s', gun)
|
||||
except Non200ResponseException as ex:
|
||||
logger.exception('Failed request for %s: %s', gun, str(ex))
|
||||
except InvalidMetadataException as ex:
|
||||
logger.exception('Failed to parse targets from metadata', str(ex))
|
||||
return None
|
||||
|
||||
def _parse_signed(self, json_response):
|
||||
""" Attempts to parse the targets from a metadata response """
|
||||
|
|
Reference in a new issue