import re
import os.path
import hashlib


# Matches "<alg>:<hex>" with an optional "tarsum.<version>+" prefix.
# Groups: 1 = whole tarsum prefix, 2 = tarsum version, 3 = hash algorithm,
# 4 = lowercase hex hash bytes.
DIGEST_PATTERN = r'(tarsum\.(v[\w]+)\+)?([\w]+):([0-9a-f]+)'


class InvalidDigestException(RuntimeError):
  """ Raised when a string cannot be parsed as a digest. """
  pass


class Digest(object):
  """ A content digest of the form `<alg>:<hex>` or `tarsum.<version>+<alg>:<hex>`. """

  DIGEST_REGEX = re.compile(DIGEST_PATTERN)

  def __init__(self, hash_alg, hash_bytes, is_tarsum=False, tarsum_version=None):
    """ Stores the digest components as given; no validation is performed here. """
    self._hash_alg = hash_alg
    self._hash_bytes = hash_bytes
    self._is_tarsum = is_tarsum
    self._tarsum_version = tarsum_version

  def __str__(self):
    """ Returns the canonical string form of the digest. """
    if self._is_tarsum:
      return 'tarsum.{0}+{1}:{2}'.format(self._tarsum_version, self._hash_alg, self._hash_bytes)
    return '{0}:{1}'.format(self._hash_alg, self._hash_bytes)

  def __eq__(self, rhs):
    """ Two digests are equal when both are Digest instances with the same string form. """
    return isinstance(rhs, Digest) and str(self) == str(rhs)

  def __ne__(self, rhs):
    """ Inverse of __eq__; defined explicitly because Python 2 does not derive it. """
    return not self.__eq__(rhs)

  def __hash__(self):
    """ Hashes the string form so that equal digests hash equally.

        Without this, defining __eq__ leaves instances unhashable on Python 3 and hashed
        by identity on Python 2.
    """
    return hash(str(self))

  @staticmethod
  def parse_digest(digest):
    """ Returns the digest parsed out to its components.

        Raises InvalidDigestException if the entire string does not match DIGEST_PATTERN.
    """
    match = Digest.DIGEST_REGEX.match(digest)
    # match() only anchors at the start, so also require that the match consumed everything.
    if match is None or match.end() != len(digest):
      # Interpolate the message here: exceptions do not apply %-formatting to their args.
      raise InvalidDigestException('Not a valid digest: %s' % digest)

    is_tarsum = match.group(1) is not None
    return Digest(match.group(3), match.group(4), is_tarsum, match.group(2))

  @property
  def is_tarsum(self):
    """ Whether the digest carries a tarsum prefix. """
    return self._is_tarsum

  @property
  def tarsum_version(self):
    """ The tarsum version string, or None when the digest is not a tarsum. """
    return self._tarsum_version

  @property
  def hash_alg(self):
    """ The name of the hash algorithm, e.g. the part before the colon. """
    return self._hash_alg

  @property
  def hash_bytes(self):
    """ The hex string following the colon. """
    return self._hash_bytes


def content_path(digest):
  """ Returns a relative path to the parsed digest.

      The path is `[tarsum/<version>/]<alg>/<two char prefix>/<hash bytes>`. Raises
      InvalidDigestException if the digest string cannot be parsed.
  """
  parsed = Digest.parse_digest(digest)
  components = []

  if parsed.is_tarsum:
    components.extend(['tarsum', parsed.tarsum_version])

  # Generate a prefix which is always two characters, and which will be filled with leading zeros
  # if the input does not contain at least two characters. e.g. ABC -> AB, A -> 0A
  prefix = parsed.hash_bytes[0:2].zfill(2)
  components.extend([parsed.hash_alg, prefix, parsed.hash_bytes])
  return os.path.join(*components)


def sha256_digest(content):
  """ Returns a sha256 hash of the content bytes in digest form. """
  def single_chunk_generator():
    yield content

  return sha256_digest_from_generator(single_chunk_generator())


def sha256_digest_from_generator(content_generator):
  """ Reads all of the data from the iterator and creates a sha256 digest from the content. """
  digest = hashlib.sha256()
  for chunk in content_generator:
    digest.update(chunk)
  return 'sha256:{0}'.format(digest.hexdigest())


def sha256_digest_from_hashlib(sha256_hash_obj):
  """ Returns the digest string for an object exposing hexdigest(), prefixed with `sha256:`. """
  return 'sha256:{0}'.format(sha256_hash_obj.hexdigest())


def digests_equal(lhs_digest_string, rhs_digest_string):
  """ Parse and compare the two digests, returns True if the digests are equal, False otherwise.

      Raises InvalidDigestException if either string cannot be parsed.
  """
  return Digest.parse_digest(lhs_digest_string) == Digest.parse_digest(rhs_digest_string)