from datetime import datetime, timedelta import pytest from playhouse.test_utils import assert_query_count from app import docker_v2_signing_key from data import model from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob, ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image, TagManifestLabel, TagManifest, TagManifestLabel) from data.registry_model.registry_pre_oci_model import PreOCIModel from data.registry_model.datatypes import RepositoryReference from test.fixtures import * @pytest.fixture() def pre_oci_model(initialized_db): return PreOCIModel() @pytest.mark.parametrize('names, expected', [ (['unknown'], None), (['latest'], {'latest'}), (['latest', 'prod'], {'latest', 'prod'}), (['latest', 'prod', 'another'], {'latest', 'prod'}), (['foo', 'prod'], {'prod'}), ]) def test_find_matching_tag(names, expected, pre_oci_model): repo = model.repository.get_repository('devtable', 'simple') repository_ref = RepositoryReference.for_repo_obj(repo) found = pre_oci_model.find_matching_tag(repository_ref, names) if expected is None: assert found is None else: assert found.name in expected @pytest.mark.parametrize('repo_namespace, repo_name, expected', [ ('devtable', 'simple', {'latest', 'prod'}), ('buynlarge', 'orgrepo', {'latest', 'prod'}), ]) def test_get_most_recent_tag(repo_namespace, repo_name, expected, pre_oci_model): repo = model.repository.get_repository(repo_namespace, repo_name) repository_ref = RepositoryReference.for_repo_obj(repo) found = pre_oci_model.get_most_recent_tag(repository_ref) if expected is None: assert found is None else: assert found.name in expected @pytest.mark.parametrize('repo_namespace, repo_name, expected', [ ('devtable', 'simple', True), ('buynlarge', 'orgrepo', True), ('buynlarge', 'unknownrepo', False), ]) def test_lookup_repository(repo_namespace, repo_name, expected, pre_oci_model): repo_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name) if expected: assert repo_ref else: assert repo_ref is None @pytest.mark.parametrize('repo_namespace, repo_name', [ ('devtable', 'simple'), ('buynlarge', 'orgrepo'), ]) def test_lookup_manifests(repo_namespace, repo_name, pre_oci_model): repo = model.repository.get_repository(repo_namespace, repo_name) repository_ref = RepositoryReference.for_repo_obj(repo) found_tag = pre_oci_model.find_matching_tag(repository_ref, ['latest']) found_manifest = pre_oci_model.get_manifest_for_tag(found_tag) found = pre_oci_model.lookup_manifest_by_digest(repository_ref, found_manifest.digest, include_legacy_image=True) assert found._db_id == found_manifest._db_id assert found.digest == found_manifest.digest assert found.legacy_image def test_lookup_unknown_manifest(pre_oci_model): repo = model.repository.get_repository('devtable', 'simple') repository_ref = RepositoryReference.for_repo_obj(repo) found = pre_oci_model.lookup_manifest_by_digest(repository_ref, 'sha256:deadbeef') assert found is None @pytest.mark.parametrize('repo_namespace, repo_name', [ ('devtable', 'simple'), ('devtable', 'complex'), ('devtable', 'history'), ('buynlarge', 'orgrepo'), ]) def test_legacy_images(repo_namespace, repo_name, pre_oci_model): repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name) legacy_images = pre_oci_model.get_legacy_images(repository_ref) assert len(legacy_images) found_tags = set() for image in legacy_images: found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id, include_parents=True) with assert_query_count(4 if found_image.parents else 3): found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id, include_parents=True) assert found_image.docker_image_id == image.docker_image_id assert found_image.parents == image.parents # Check that the tags list can be retrieved. assert image.tags is not None found_tags.update({tag.name for tag in image.tags}) # Check against the actual DB row. model_image = model.image.get_image(repository_ref._db_id, found_image.docker_image_id) assert model_image.id == found_image._db_id assert ([pid for pid in reversed(model_image.ancestor_id_list())] == [p._db_id for p in found_image.parents]) # Try without parents and ensure it raises an exception. found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id, include_parents=False) with pytest.raises(Exception): assert not found_image.parents assert found_tags unknown = pre_oci_model.get_legacy_image(repository_ref, 'unknown', include_parents=True) assert unknown is None def test_manifest_labels(pre_oci_model): repo = model.repository.get_repository('devtable', 'simple') repository_ref = RepositoryReference.for_repo_obj(repo) found_tag = pre_oci_model.find_matching_tag(repository_ref, ['latest']) found_manifest = pre_oci_model.get_manifest_for_tag(found_tag) # Create a new label. created = pre_oci_model.create_manifest_label(found_manifest, 'foo', 'bar', 'api') assert created.key == 'foo' assert created.value == 'bar' assert created.source_type_name == 'api' assert created.media_type_name == 'text/plain' # Ensure we can look it up. assert pre_oci_model.get_manifest_label(found_manifest, created.uuid) == created # Ensure it is in our list of labels. assert created in pre_oci_model.list_manifest_labels(found_manifest) assert created in pre_oci_model.list_manifest_labels(found_manifest, key_prefix='fo') # Ensure it is *not* in our filtered list. assert created not in pre_oci_model.list_manifest_labels(found_manifest, key_prefix='ba') # Delete the label and ensure it is gone. assert pre_oci_model.delete_manifest_label(found_manifest, created.uuid) assert pre_oci_model.get_manifest_label(found_manifest, created.uuid) is None assert created not in pre_oci_model.list_manifest_labels(found_manifest) @pytest.mark.parametrize('repo_namespace, repo_name', [ ('devtable', 'simple'), ('devtable', 'complex'), ('devtable', 'history'), ('buynlarge', 'orgrepo'), ]) def test_repository_tags(repo_namespace, repo_name, pre_oci_model): repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name) with assert_query_count(1): tags = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True) assert len(tags) for tag in tags: with assert_query_count(2): found_tag = pre_oci_model.get_repo_tag(repository_ref, tag.name, include_legacy_image=True) assert found_tag == tag if found_tag.legacy_image is None: continue with assert_query_count(2): found_image = pre_oci_model.get_legacy_image(repository_ref, found_tag.legacy_image.docker_image_id) assert found_image == found_tag.legacy_image def test_repository_tag_history(pre_oci_model): repository_ref = pre_oci_model.lookup_repository('devtable', 'history') with assert_query_count(2): history, has_more = pre_oci_model.list_repository_tag_history(repository_ref) assert not has_more assert len(history) == 2 @pytest.mark.parametrize('repo_namespace, repo_name', [ ('devtable', 'simple'), ('devtable', 'complex'), ('devtable', 'history'), ('buynlarge', 'orgrepo'), ]) def test_delete_tags(repo_namespace, repo_name, pre_oci_model): repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name) tags = pre_oci_model.list_repository_tags(repository_ref) assert len(tags) # Save history before the deletions. previous_history, _ = pre_oci_model.list_repository_tag_history(repository_ref, size=1000) assert len(previous_history) >= len(tags) # Delete every tag in the repository. for tag in tags: assert pre_oci_model.delete_tag(repository_ref, tag.name) # Make sure the tag is no longer found. with assert_query_count(1): found_tag = pre_oci_model.get_repo_tag(repository_ref, tag.name, include_legacy_image=True) assert found_tag is None # Ensure all tags have been deleted. tags = pre_oci_model.list_repository_tags(repository_ref) assert not len(tags) # Ensure that the tags all live in history. history, _ = pre_oci_model.list_repository_tag_history(repository_ref, size=1000) assert len(history) == len(previous_history) @pytest.mark.parametrize('use_manifest', [ True, False, ]) def test_retarget_tag_history(use_manifest, pre_oci_model): repository_ref = pre_oci_model.lookup_repository('devtable', 'history') history, _ = pre_oci_model.list_repository_tag_history(repository_ref) if use_manifest: manifest_or_legacy_image = pre_oci_model.lookup_manifest_by_digest(repository_ref, history[1].manifest_digest, allow_dead=True) else: manifest_or_legacy_image = history[1].legacy_image # Retarget the tag. assert manifest_or_legacy_image updated_tag = pre_oci_model.retarget_tag(repository_ref, 'latest', manifest_or_legacy_image, is_reversion=True) # Ensure the tag has changed targets. if use_manifest: assert updated_tag.manifest_digest == manifest_or_legacy_image.digest else: assert updated_tag.legacy_image == manifest_or_legacy_image # Ensure history has been updated. new_history, _ = pre_oci_model.list_repository_tag_history(repository_ref) assert len(new_history) == len(history) + 1 def test_retarget_tag(pre_oci_model): repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') history, _ = pre_oci_model.list_repository_tag_history(repository_ref) prod_tag = pre_oci_model.get_repo_tag(repository_ref, 'prod', include_legacy_image=True) # Retarget the tag. updated_tag = pre_oci_model.retarget_tag(repository_ref, 'latest', prod_tag.legacy_image) # Ensure the tag has changed targets. assert updated_tag.legacy_image == prod_tag.legacy_image # Ensure history has been updated. new_history, _ = pre_oci_model.list_repository_tag_history(repository_ref) assert len(new_history) == len(history) + 1 def test_change_repository_tag_expiration(pre_oci_model): repository_ref = pre_oci_model.lookup_repository('devtable', 'simple') tag = pre_oci_model.get_repo_tag(repository_ref, 'latest') assert tag.lifetime_end_ts is None new_datetime = datetime.utcnow() + timedelta(days=2) previous, okay = pre_oci_model.change_repository_tag_expiration(tag, new_datetime) assert okay assert previous is None tag = pre_oci_model.get_repo_tag(repository_ref, 'latest') assert tag.lifetime_end_ts is not None @pytest.mark.parametrize('repo_namespace, repo_name, expected_non_empty', [ ('devtable', 'simple', []), ('devtable', 'complex', ['prod', 'v2.0']), ('devtable', 'history', ['latest']), ('buynlarge', 'orgrepo', []), ('devtable', 'gargantuan', ['v2.0', 'v3.0', 'v4.0', 'v5.0', 'v6.0']), ]) def test_get_legacy_images_owned_by_tag(repo_namespace, repo_name, expected_non_empty, pre_oci_model): repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name) tags = pre_oci_model.list_repository_tags(repository_ref) assert len(tags) non_empty = set() for tag in tags: if pre_oci_model.get_legacy_images_owned_by_tag(tag): non_empty.add(tag.name) assert non_empty == set(expected_non_empty) def test_get_security_status(pre_oci_model): repository_ref = pre_oci_model.lookup_repository('devtable', 'simple') tags = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True) assert len(tags) for tag in tags: assert pre_oci_model.get_security_status(tag.legacy_image) @pytest.fixture() def clear_rows(initialized_db): # Remove all new-style rows so we can backfill. TagManifestLabelMap.delete().execute() ManifestLabel.delete().execute() ManifestBlob.delete().execute() ManifestLegacyImage.delete().execute() TagManifestToManifest.delete().execute() Manifest.delete().execute() TagManifestLabel.delete().execute() TagManifest.delete().execute() @pytest.mark.parametrize('repo_namespace, repo_name', [ ('devtable', 'simple'), ('devtable', 'complex'), ('devtable', 'history'), ('buynlarge', 'orgrepo'), ]) def test_backfill_manifest_for_tag(repo_namespace, repo_name, clear_rows, pre_oci_model): repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name) tags = pre_oci_model.list_repository_tags(repository_ref) assert tags for tag in tags: assert not tag.manifest_digest assert pre_oci_model.backfill_manifest_for_tag(tag) tags = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True) assert tags for tag in tags: assert tag.manifest_digest manifest = pre_oci_model.get_manifest_for_tag(tag) assert manifest legacy_image = pre_oci_model.get_legacy_image(repository_ref, tag.legacy_image.docker_image_id, include_parents=True) parsed_manifest = manifest.get_parsed_manifest() assert parsed_manifest.leaf_layer_v1_image_id == legacy_image.docker_image_id assert parsed_manifest.parent_image_ids == {p.docker_image_id for p in legacy_image.parents}