This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/digest/digest_tools.py
Jake Moshenko e1b3e9e6ae Another huge batch of registry v2 changes
Add patch support and resumable sha
Implement all actual registry methods
Add a simple database generation option
2015-08-12 16:41:12 -04:00

96 lines
2.7 KiB
Python

import re
import os.path
import hashlib
# Regex source for a digest string. Capture groups:
#   1: optional tarsum prefix, i.e. "tarsum.<version>+"
#   2: the tarsum version inside that prefix ("v" followed by word characters)
#   3: the hash algorithm name (word characters)
#   4: the hash value (lowercase hexadecimal digits)
# The pattern carries no anchors of its own.
DIGEST_PATTERN = r'(tarsum\.(v[\w]+)\+)?([\w]+):([0-9a-f]+)'
class InvalidDigestException(RuntimeError):
    """ Raised when a string cannot be parsed as a digest. """
class Digest(object):
    """ A digest split into its components: hash algorithm, hash value and, optionally,
        a tarsum version.

        The string form is '<alg>:<hash>' or, for tarsum digests,
        'tarsum.<version>+<alg>:<hash>'. Two instances are equal (and hash equally) when
        their string forms are identical.
    """

    # Compiled once at class creation and shared by every parse_digest call.
    DIGEST_REGEX = re.compile(DIGEST_PATTERN)

    def __init__(self, hash_alg, hash_bytes, is_tarsum=False, tarsum_version=None):
        """ Stores the components as given; no validation is performed here. """
        self._hash_alg = hash_alg
        self._hash_bytes = hash_bytes
        self._is_tarsum = is_tarsum
        self._tarsum_version = tarsum_version

    def __str__(self):
        """ Returns the canonical string form of the digest. """
        if self._is_tarsum:
            return 'tarsum.{0}+{1}:{2}'.format(
                self._tarsum_version, self._hash_alg, self._hash_bytes)
        return '{0}:{1}'.format(self._hash_alg, self._hash_bytes)

    def __repr__(self):
        """ Returns a debugging representation listing every component. """
        return 'Digest({0!r}, {1!r}, is_tarsum={2!r}, tarsum_version={3!r})'.format(
            self._hash_alg, self._hash_bytes, self._is_tarsum, self._tarsum_version)

    def __eq__(self, rhs):
        """ Returns True when rhs is a Digest with the same string form. """
        return isinstance(rhs, Digest) and str(self) == str(rhs)

    def __ne__(self, rhs):
        """ Inverse of __eq__. Python 2 does not derive != from ==, so it is spelled out. """
        return not self.__eq__(rhs)

    def __hash__(self):
        """ Hashes the string form so that equal digests share a hash value. """
        # Defining __eq__ alone makes instances unhashable on Python 3, and on Python 2
        # leaves an identity-based hash that disagrees with __eq__.
        return hash(str(self))

    @staticmethod
    def parse_digest(digest):
        """ Returns the digest parsed out to its components, as a Digest instance.

            Raises InvalidDigestException when the pattern does not match at the start of
            the string or does not consume the whole string.
        """
        match = Digest.DIGEST_REGEX.match(digest)
        # match() only anchors at the start, so also require that all input was consumed.
        if match is None or match.end() != len(digest):
            # Exception constructors do not apply %-formatting to their arguments, so the
            # message has to be interpolated explicitly.
            raise InvalidDigestException('Not a valid digest: %s' % (digest,))

        # Group 1 is the optional "tarsum.<version>+" prefix; group 2 is its version.
        is_tarsum = match.group(1) is not None
        return Digest(match.group(3), match.group(4), is_tarsum, match.group(2))

    @property
    def is_tarsum(self):
        """ True when the digest carries a tarsum prefix. """
        return self._is_tarsum

    @property
    def tarsum_version(self):
        """ The tarsum version, or None when none was supplied. """
        return self._tarsum_version

    @property
    def hash_alg(self):
        """ The hash algorithm component of the digest. """
        return self._hash_alg

    @property
    def hash_bytes(self):
        """ The hash value component of the digest. """
        return self._hash_bytes
def content_path(digest):
    """ Builds the relative path under which the content for a digest string lives.

        The components, joined with os.path.join, are: 'tarsum' and the tarsum version
        (tarsum digests only), the hash algorithm, a two character prefix, and the full
        hash value. Any error raised by Digest.parse_digest propagates to the caller.
    """
    parsed = Digest.parse_digest(digest)
    hash_value = parsed.hash_bytes

    # The prefix is always exactly two characters: the first two characters of the hash
    # value, left-padded with zeros when the value is shorter. e.g. ABC -> AB, A -> 0A
    prefix = hash_value[:2].zfill(2)

    leading = ['tarsum', parsed.tarsum_version] if parsed.is_tarsum else []
    return os.path.join(*(leading + [parsed.hash_alg, prefix, hash_value]))
def sha256_digest(content):
    """ Returns the sha256 digest string for a single blob of content bytes. """
    # A one-element tuple serves as the single-chunk stream expected by the helper.
    return sha256_digest_from_generator((content,))
def sha256_digest_from_generator(content_generator):
    """ Consumes every chunk produced by the iterable, feeding each into one sha256 hash,
        and returns the result formatted as 'sha256:<hexdigest>'.
    """
    hasher = hashlib.sha256()
    for piece in content_generator:
        hasher.update(piece)

    hex_value = hasher.hexdigest()
    return 'sha256:{0}'.format(hex_value)
def sha256_digest_from_hashlib(sha256_hash_obj):
    """ Returns 'sha256:<hexdigest>' using the hexdigest() of the supplied hash object. """
    hex_value = sha256_hash_obj.hexdigest()
    return 'sha256:{0}'.format(hex_value)
def digests_equal(lhs_digest_string, rhs_digest_string):
    """ Parses both digest strings and returns True when the parsed digests compare equal,
        False otherwise. Any error raised by Digest.parse_digest propagates to the caller.
    """
    left = Digest.parse_digest(lhs_digest_string)
    right = Digest.parse_digest(rhs_digest_string)
    return left == right