import json import tarfile from cachetools import lru_cache from io import BytesIO from image.docker.interfaces import ContentRetriever class ContentRetrieverForTesting(ContentRetriever): def __init__(self, digests=None): self.digests = digests or {} def add_digest(self, digest, content): self.digests[digest] = content def get_manifest_bytes_with_digest(self, digest): return self.digests.get(digest) def get_blob_bytes_with_digest(self, digest): return self.digests.get(digest) @classmethod def for_config(cls, config_obj, digest, size): config_str = json.dumps(config_obj) padded_string = config_str + ' ' * (size - len(config_str)) digests = {} digests[digest] = padded_string return ContentRetrieverForTesting(digests) @lru_cache(maxsize=1) def generate_empty_layer_data(): """ Generates the layer data for an "empty" layer. """ with BytesIO() as f: tar_file = tarfile.open(fileobj=f, mode='w|gw') tar_file.close() return f.getvalue()