from playhouse.test_utils import assert_query_count from app import docker_v2_signing_key from data.database import Tag, ManifestBlob, get_epoch_timestamp_ms from data.model.oci.manifest import lookup_manifest, get_or_create_manifest from data.model.oci.tag import filter_to_alive_tags, get_tag from data.model.oci.shared import get_legacy_image_for_manifest from data.model.repository import get_repository from image.docker.schema1 import DockerSchema1ManifestBuilder, DockerSchema1Manifest from test.fixtures import * def test_lookup_manifest(initialized_db): found = False for tag in filter_to_alive_tags(Tag.select()): found = True repo = tag.repository digest = tag.manifest.digest with assert_query_count(1): assert lookup_manifest(repo, digest) == tag.manifest assert found for tag in Tag.select(): repo = tag.repository digest = tag.manifest.digest with assert_query_count(1): assert lookup_manifest(repo, digest, allow_dead=True) == tag.manifest def test_lookup_manifest_dead_tag(initialized_db): dead_tag = Tag.select().where(Tag.lifetime_end_ms <= get_epoch_timestamp_ms()).get() assert dead_tag.lifetime_end_ms <= get_epoch_timestamp_ms() assert lookup_manifest(dead_tag.repository, dead_tag.manifest.digest) is None assert (lookup_manifest(dead_tag.repository, dead_tag.manifest.digest, allow_dead=True) == dead_tag.manifest) def test_get_or_create_manifest(initialized_db): repository = get_repository('devtable', 'simple') latest_tag = get_tag(repository, 'latest') legacy_image = get_legacy_image_for_manifest(latest_tag.manifest) parsed = DockerSchema1Manifest(latest_tag.manifest.manifest_bytes, validate=False) builder = DockerSchema1ManifestBuilder('devtable', 'simple', 'anothertag') builder.add_layer(parsed.blob_digests[0], '{"id": "%s"}' % legacy_image.docker_image_id) sample_manifest_instance = builder.build(docker_v2_signing_key) # Create a new manifest. created, newly_created = get_or_create_manifest(repository, sample_manifest_instance) assert newly_created assert created is not None assert created.digest == sample_manifest_instance.digest assert created.manifest_bytes == sample_manifest_instance.bytes assert get_legacy_image_for_manifest(created) is not None blob_digests = [mb.blob.content_checksum for mb in ManifestBlob.select().where(ManifestBlob.manifest == created)] assert parsed.blob_digests[0] in blob_digests # Retrieve it again and ensure it is the same manifest. created2, newly_created2 = get_or_create_manifest(repository, sample_manifest_instance) assert not newly_created2 assert created2 == created def test_get_or_create_manifest_invalid_image(initialized_db): repository = get_repository('devtable', 'simple') latest_tag = get_tag(repository, 'latest') parsed = DockerSchema1Manifest(latest_tag.manifest.manifest_bytes, validate=False) builder = DockerSchema1ManifestBuilder('devtable', 'simple', 'anothertag') builder.add_layer(parsed.blob_digests[0], '{"id": "foo", "parent": "someinvalidimageid"}') sample_manifest_instance = builder.build(docker_v2_signing_key) created, newly_created = get_or_create_manifest(repository, sample_manifest_instance) assert created is None assert newly_created is None