This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/test/test_manifestbackfillworker.py
Joseph Schorr 96e0fc4ad6 Fix manifest backfill for manifests pointing to V1 images
V1 images don't have checksums, so we just always use the full storage set for the manifest, rather than a checksum map
2018-08-13 15:51:18 -04:00

70 lines
2.7 KiB
Python

from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob,
ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
TagManifestLabel)
from workers.manifestbackfillworker import backfill_manifest
from test.fixtures import *
@pytest.fixture()
def clear_rows(initialized_db):
  """Wipe all new-style manifest rows so the tests can backfill from scratch.

  Depends on ``initialized_db`` so the tables exist and are populated before
  the rows are removed.
  """
  # The deletion order is significant and kept exactly as written: Manifest is
  # removed last (presumably because the other tables reference it -- confirm
  # against the schema before reordering).
  new_style_models = (
    TagManifestLabelMap,
    ManifestLabel,
    ManifestBlob,
    ManifestLegacyImage,
    TagManifestToManifest,
    Manifest,
  )
  for model in new_style_models:
    model.delete().execute()
def test_manifestbackfillworker(clear_rows, initialized_db):
  """Backfill every TagManifest and verify the new-style rows that result.

  For each tag manifest this checks that the first backfill call reports
  success, that a second call is skipped, and that the created Manifest,
  ManifestLegacyImage and ManifestBlob rows agree with the source data.
  """
  for tag_manifest in TagManifest.select():
    # The first call must perform the backfill.
    assert backfill_manifest(tag_manifest)

    # A repeated call must be a no-op that reports a falsy result.
    assert not backfill_manifest(tag_manifest)

    # The mapping row must exist and must not be flagged as broken.
    mapping = TagManifestToManifest.get(tag_manifest=tag_manifest)
    assert not mapping.broken

    # The new manifest row mirrors the payload, digest and repository.
    backfilled = mapping.manifest
    assert backfilled.manifest_bytes == tag_manifest.json_data
    assert backfilled.digest == tag_manifest.digest
    assert backfilled.repository == tag_manifest.tag.repository

    # The legacy image link points at the image the tag references.
    linked_image = ManifestLegacyImage.get(manifest=backfilled).image
    assert tag_manifest.tag.image == linked_image

    # Expected blobs: the storage of the tag's image plus the storage of
    # every ancestor image.
    expected_storages = {tag_manifest.tag.image.storage.id}
    expected_storages.update(
      Image.get(id=ancestor_id).storage_id
      for ancestor_id in tag_manifest.tag.image.ancestor_id_list()
    )

    blob_query = ManifestBlob.select().where(ManifestBlob.manifest == backfilled)
    found_storages = {blob_row.blob_id for blob_row in blob_query}
    assert expected_storages == found_storages
def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db):
  """Backfill a manifest with unparseable JSON and verify it is marked broken.

  The backfill is still expected to succeed and to create the Manifest and
  ManifestLegacyImage rows; only the mapping row carries the broken flag.
  """
  # Clear the existing tag manifests (labels first) so a tag can be reused.
  for model in (TagManifestLabel, TagManifest):
    model.delete().execute()

  # Attach a manifest with invalid contents to whichever tag is returned.
  reused_tag = RepositoryTag.get()
  broken_manifest = TagManifest.create(json_data='wat?', digest='sha256:foobar',
                                       tag=reused_tag)

  # The backfill must still report success.
  assert backfill_manifest(broken_manifest)

  # The mapping row records that the source manifest was broken.
  mapping = TagManifestToManifest.get(tag_manifest=broken_manifest)
  assert mapping.broken

  # The raw bytes, digest and repository are carried over unchanged.
  backfilled = mapping.manifest
  assert backfilled.manifest_bytes == broken_manifest.json_data
  assert backfilled.digest == broken_manifest.digest
  assert backfilled.repository == broken_manifest.tag.repository

  # The legacy image link still points at the tag's image.
  linked_image = ManifestLegacyImage.get(manifest=backfilled).image
  assert broken_manifest.tag.image == linked_image