from data.types import (
    Repository,
    Tag,
    ManifestJSON,
    DockerV1Metadata,
)

# NOTE(review): `model`, `RepositoryTag` and `IntegrityError` are referenced
# below but are not imported in the visible source. Confirm they are brought
# into scope before this module is used; their import paths are not shown here.


def get_repository(namespace_name, repo_name):
    """Look up a repository and wrap it in a ``Repository`` value.

    Returns ``None`` when ``model.repository.get_repository`` finds nothing.
    """
    repo = model.repository.get_repository(namespace_name, repo_name)
    if repo is None:
        return None

    return Repository(
        id=repo.id,
        name=repo.name,
        namespace_name=repo.namespace_user.username,
    )


def get_active_tag(namespace_name, repo_name, tag_name):
    """Return the active tag with the given name, or ``None`` if it is missing."""
    try:
        return model.tag.get_active_tag(namespace_name, repo_name, tag_name)
    except RepositoryTag.DoesNotExist:
        return None


def get_manifest_by_tag(namespace_name, repo_name, tag_name):
    """Return the ``ManifestJSON`` for the manifest a tag points at.

    Returns ``None`` when the model layer raises ``InvalidManifestException``.
    """
    try:
        # The original passed an undefined `manifest_ref`; the tag name is the
        # only reference this function receives.
        manifest = model.tag.load_tag_manifest(namespace_name, repo_name, tag_name)
        # NOTE(review): the original used an undefined `digest` here. The
        # loaded manifest is the only available source for it; this assumes
        # the manifest row exposes a `digest` attribute -- TODO confirm.
        return ManifestJSON(digest=manifest.digest, json=manifest.json_data)
    except model.InvalidManifestException:
        return None


def get_manifest_by_digest(namespace_name, repo_name, digest):
    """Return the ``ManifestJSON`` for the manifest with the given digest.

    Returns ``None`` when the model layer raises ``InvalidManifestException``.
    """
    try:
        # The original passed an undefined `manifest_ref` instead of `digest`.
        manifest = model.tag.load_manifest_by_digest(namespace_name, repo_name, digest)
        return ManifestJSON(digest=digest, json=manifest.json_data)
    except model.InvalidManifestException:
        return None


def get_tag_by_manifest_digest(namespace_name, repo_name, digest):
    """Placeholder: currently ignores its arguments and returns an empty ``Tag()``."""
    # TODO: implement the real lookup of a tag by manifest digest.
    return Tag()


def delete_tag(namespace_name, repo_name, tag_name):
    """Delete the named tag through the model layer; always returns ``True``."""
    # The original referenced `tag.name`, but no `tag` object exists here.
    model.tag.delete_tag(namespace_name, repo_name, tag_name)
    return True


def docker_v1_metadata_by_tag(namespace_name, repo_name, tag_name):
    """Return the ``DockerV1Metadata`` for the image behind a tag, or ``None``.

    TODO: `repo_image` is never assigned in this function, so calling it
    raises ``NameError`` as written. The lookup that resolves `tag_name` to
    its image record still has to be supplied; the visible source does not
    show which model call provides it.
    """
    if not repo_image:
        return None

    return DockerV1Metadata(
        namespace_name=namespace_name,
        repo_name=repo_name,
        # The original used an undefined `image_id`; this mirrors
        # docker_v1_metadata_by_image_id, which reads `docker_image_id` from
        # the image record. NOTE(review): confirm once `repo_image` is wired.
        image_id=repo_image.docker_image_id,
        checksum=repo_image.v1_checksum,
        content_checksum=repo_image.content_checksum,
        compat_json=repo_image.v1_json_metadata,
    )


def docker_v1_metadata_by_image_id(namespace_name, repo_name, image_ids):
    """Return a list of ``DockerV1Metadata``, one per image found for `image_ids`.

    Returns an empty list when the repository cannot be found.
    """
    # The original used undefined `repo` and `all_image_ids`. Resolve the
    # repository the same way get_repository does and pass the caller's ids.
    # NOTE(review): presumably lookup_repository_images expects the model's
    # repository row -- confirm against the model layer.
    repo = model.repository.get_repository(namespace_name, repo_name)
    if repo is None:
        return []

    images_query = model.image.lookup_repository_images(repo, image_ids)
    return [
        DockerV1Metadata(
            namespace_name=namespace_name,
            repo_name=repo_name,
            image_id=image.docker_image_id,
            checksum=image.v1_checksum,
            content_checksum=image.content_checksum,
            compat_json=image.v1_json_metadata,
        )
        for image in images_query
    ]


def get_parents_docker_v1_metadata(namespace_name, repo_name, image_id):
    """Placeholder: currently ignores its arguments and returns an empty list."""
    # Old implementation:
    #   parents = model.image.get_parent_images(namespace_name, repo_name, image)
    #
    # Desired:
    #   return a list of the AttrDict in docker_v1_metadata
    return []


def create_manifest_and_update_tag(namespace_name, repo_name, tag_name, manifest_digest,
                                   manifest_bytes):
    """Associate a generated manifest with a tag.

    An ``IntegrityError`` from the model layer is deliberately ignored; the
    original treats it as "the association already exists".
    """
    try:
        # The original read `manifest.digest` / `manifest.bytes` from an
        # undefined `manifest`; the values arrive as parameters.
        model.tag.associate_generated_tag_manifest(namespace_name, repo_name, tag_name,
                                                   manifest_digest, manifest_bytes)
    except IntegrityError:
        # It's already there!
        pass


def synthesize_v1_image(repo, storage, image_id, created, comment, command, compat_json,
                        parent_image_id):
    """Forward all arguments to ``model.image.synthesize_v1_image``; returns ``None``."""
    model.image.synthesize_v1_image(repo, storage, image_id, created, comment, command,
                                    compat_json, parent_image_id)


def save_manifest(namespace_name, repo_name, tag_name, leaf_layer_id, manifest_digest,
                  manifest_bytes):
    """Forward all arguments to ``model.tag.store_tag_manifest``; returns ``None``."""
    model.tag.store_tag_manifest(namespace_name, repo_name, tag_name, leaf_layer_id,
                                 manifest_digest, manifest_bytes)


def repository_tags(namespace_name, repo_name, limit, offset):
    """Placeholder: currently ignores its arguments and returns ``[Tag()]``."""
    # TODO: implement paginated tag listing using `limit` and `offset`.
    return [Tag()]


def get_visible_repositories(username, limit, offset):
    """Placeholder: currently ignores its arguments and returns ``[Repository()]``."""
    # TODO: implement the paginated lookup of repositories visible to `username`.
    return [Repository()]