"""Tests for ``PreOCIModel``, the registry data model wrapper.

Every test runs against the database prepared by the ``initialized_db``
fixture and exercises one area of the model's interface: repository and tag
lookup, manifests and their labels, legacy images, tag history and
retargeting, manifest backfill, derived images, torrent info and blob uploads.
"""

from datetime import datetime, timedelta

import pytest

from playhouse.test_utils import assert_query_count

# NOTE(review): ``docker_v2_signing_key`` is not referenced in this module; the
# import is kept in case importing ``app`` is required for its side effects.
from app import docker_v2_signing_key
from data import model
from data.database import (
    DerivedStorageForImage,
    Image,
    Manifest,
    ManifestBlob,
    ManifestLabel,
    ManifestLegacyImage,
    RepositoryTag,
    TagManifest,
    TagManifestLabel,
    TagManifestLabelMap,
    TagManifestToManifest,
    TorrentInfo,
)
from data.registry_model.registry_pre_oci_model import PreOCIModel
from data.registry_model.datatypes import RepositoryReference

# The star import is deliberate: pytest resolves fixtures by name, and this
# presumably supplies ``initialized_db`` (it is not defined in this module).
from test.fixtures import *  # noqa: F401,F403


# (namespace, repository) pairs shared by the tests that sweep the standard
# fixture repositories.
_STANDARD_REPOS = [
    ('devtable', 'simple'),
    ('devtable', 'complex'),
    ('devtable', 'history'),
    ('buynlarge', 'orgrepo'),
]


def _repository_ref_for(namespace_name, repo_name):
    """Load a repository through ``model.repository`` and wrap it in a ``RepositoryReference``."""
    repo_row = model.repository.get_repository(namespace_name, repo_name)
    return RepositoryReference.for_repo_obj(repo_row)


@pytest.fixture()
def pre_oci_model(initialized_db):
    """Return a new ``PreOCIModel``; requests ``initialized_db`` so it is set up first."""
    return PreOCIModel()


@pytest.mark.parametrize('names, expected', [
    (['unknown'], None),
    (['latest'], {'latest'}),
    (['latest', 'prod'], {'latest', 'prod'}),
    (['latest', 'prod', 'another'], {'latest', 'prod'}),
    (['foo', 'prod'], {'prod'}),
])
def test_find_matching_tag(names, expected, pre_oci_model):
    """``find_matching_tag`` yields a tag whose name is in ``expected``, or ``None``."""
    repository_ref = _repository_ref_for('devtable', 'simple')
    match = pre_oci_model.find_matching_tag(repository_ref, names)

    if expected is None:
        assert match is None
        return

    assert match.name in expected
    assert match.repository.namespace_name == 'devtable'
    assert match.repository.name == 'simple'


@pytest.mark.parametrize('repo_namespace, repo_name, expected', [
    ('devtable', 'simple', {'latest', 'prod'}),
    ('buynlarge', 'orgrepo', {'latest', 'prod'}),
])
def test_get_most_recent_tag(repo_namespace, repo_name, expected, pre_oci_model):
    """``get_most_recent_tag`` yields a tag whose name is in ``expected``, or ``None``."""
    repository_ref = _repository_ref_for(repo_namespace, repo_name)
    most_recent = pre_oci_model.get_most_recent_tag(repository_ref)

    if expected is None:
        assert most_recent is None
        return

    assert most_recent.name in expected


@pytest.mark.parametrize('repo_namespace, repo_name, expected', [
    ('devtable', 'simple', True),
    ('buynlarge', 'orgrepo', True),
    ('buynlarge', 'unknownrepo', False),
])
def test_lookup_repository(repo_namespace, repo_name, expected, pre_oci_model):
    """``lookup_repository`` is truthy for listed repositories and ``None`` otherwise."""
    repo_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)

    if not expected:
        assert repo_ref is None
        return

    assert repo_ref


@pytest.mark.parametrize('repo_namespace, repo_name', [
    ('devtable', 'simple'),
    ('buynlarge', 'orgrepo'),
])
def test_lookup_manifests(repo_namespace, repo_name, pre_oci_model):
    """A manifest looked up by digest matches the manifest reached through its tag."""
    repository_ref = _repository_ref_for(repo_namespace, repo_name)
    latest_tag = pre_oci_model.find_matching_tag(repository_ref, ['latest'])
    tag_manifest = pre_oci_model.get_manifest_for_tag(latest_tag)

    by_digest = pre_oci_model.lookup_manifest_by_digest(repository_ref, tag_manifest.digest,
                                                        include_legacy_image=True)
    assert by_digest._db_id == tag_manifest._db_id
    assert by_digest.digest == tag_manifest.digest
    assert by_digest.legacy_image


def test_lookup_unknown_manifest(pre_oci_model):
    """Looking up the digest ``sha256:deadbeef`` returns ``None``."""
    repository_ref = _repository_ref_for('devtable', 'simple')
    missing = pre_oci_model.lookup_manifest_by_digest(repository_ref, 'sha256:deadbeef')
    assert missing is None


@pytest.mark.parametrize('repo_namespace, repo_name', _STANDARD_REPOS)
def test_legacy_images(repo_namespace, repo_name, pre_oci_model):
    """Each listed legacy image can be re-fetched and agrees with its database row."""
    repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
    legacy_images = pre_oci_model.get_legacy_images(repository_ref)
    assert len(legacy_images)

    seen_tag_names = set()
    for listed in legacy_images:
        # Uncounted first fetch: only used to learn whether the image has
        # parents, which selects the query budget for the counted fetch.
        fetched = pre_oci_model.get_legacy_image(repository_ref, listed.docker_image_id,
                                                 include_parents=True)
        query_budget = 5 if fetched.parents else 4

        with assert_query_count(query_budget):
            fetched = pre_oci_model.get_legacy_image(repository_ref, listed.docker_image_id,
                                                     include_parents=True, include_blob=True)
            assert fetched.docker_image_id == listed.docker_image_id
            assert fetched.parents == listed.parents
            assert fetched.blob
            assert fetched.blob.placements

        # Check that the tags list can be retrieved.
        assert listed.tags is not None
        seen_tag_names.update(tag.name for tag in listed.tags)

        # Check against the actual DB row.
        image_row = model.image.get_image(repository_ref._db_id, fetched.docker_image_id)
        assert image_row.id == fetched._db_id

        ancestor_ids = list(reversed(image_row.ancestor_id_list()))
        parent_ids = [parent._db_id for parent in fetched.parents]
        assert ancestor_ids == parent_ids

        # Try without parents and ensure it raises an exception.
        fetched = pre_oci_model.get_legacy_image(repository_ref, listed.docker_image_id,
                                                 include_parents=False)
        with pytest.raises(Exception):
            assert not fetched.parents

    assert seen_tag_names

    unknown = pre_oci_model.get_legacy_image(repository_ref, 'unknown', include_parents=True)
    assert unknown is None


def test_manifest_labels(pre_oci_model):
    """A manifest label can be created, looked up, listed (with prefix filter) and deleted."""
    repository_ref = _repository_ref_for('devtable', 'simple')
    latest_tag = pre_oci_model.find_matching_tag(repository_ref, ['latest'])
    manifest = pre_oci_model.get_manifest_for_tag(latest_tag)

    # Create a new label.
    label = pre_oci_model.create_manifest_label(manifest, 'foo', 'bar', 'api')
    assert label.key == 'foo'
    assert label.value == 'bar'
    assert label.source_type_name == 'api'
    assert label.media_type_name == 'text/plain'

    # Ensure we can look it up.
    assert pre_oci_model.get_manifest_label(manifest, label.uuid) == label

    # Ensure it is in our list of labels.
    assert label in pre_oci_model.list_manifest_labels(manifest)
    assert label in pre_oci_model.list_manifest_labels(manifest, key_prefix='fo')

    # Ensure it is *not* in our filtered list.
    assert label not in pre_oci_model.list_manifest_labels(manifest, key_prefix='ba')

    # Delete the label and ensure it is gone.
    assert pre_oci_model.delete_manifest_label(manifest, label.uuid)
    assert pre_oci_model.get_manifest_label(manifest, label.uuid) is None
    assert label not in pre_oci_model.list_manifest_labels(manifest)


@pytest.mark.parametrize('repo_namespace, repo_name', _STANDARD_REPOS)
def test_repository_tags(repo_namespace, repo_name, pre_oci_model):
    """Tags listed for a repository round-trip through ``get_repo_tag`` within fixed query counts."""
    repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)

    with assert_query_count(1):
        tags = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True)
        assert len(tags)

    for listed_tag in tags:
        with assert_query_count(2):
            fetched_tag = pre_oci_model.get_repo_tag(repository_ref, listed_tag.name,
                                                     include_legacy_image=True)
            assert fetched_tag == listed_tag

        # Tags with no legacy image have nothing further to compare.
        if fetched_tag.legacy_image is None:
            continue

        with assert_query_count(2):
            fetched_image = pre_oci_model.get_legacy_image(
                repository_ref, fetched_tag.legacy_image.docker_image_id)
            assert fetched_image == fetched_tag.legacy_image


def test_repository_tag_history(pre_oci_model):
    """The ``devtable/history`` repository reports two history entries and no further page."""
    repository_ref = pre_oci_model.lookup_repository('devtable', 'history')

    with assert_query_count(2):
        entries, has_more = pre_oci_model.list_repository_tag_history(repository_ref)
        assert not has_more
        assert len(entries) == 2


@pytest.mark.parametrize('repo_namespace, repo_name', _STANDARD_REPOS)
def test_delete_tags(repo_namespace, repo_name, pre_oci_model):
    """Deleting every tag empties the tag list while the history length is unchanged."""
    repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
    tags = pre_oci_model.list_repository_tags(repository_ref)
    assert len(tags)

    # Save history before the deletions.
    history_before, _ = pre_oci_model.list_repository_tag_history(repository_ref, size=1000)
    assert len(history_before) >= len(tags)

    # Delete every tag in the repository.
    for doomed in tags:
        assert pre_oci_model.delete_tag(repository_ref, doomed.name)

        # Make sure the tag is no longer found.
        with assert_query_count(1):
            lookup = pre_oci_model.get_repo_tag(repository_ref, doomed.name,
                                                include_legacy_image=True)
            assert lookup is None

    # Ensure all tags have been deleted.
    remaining = pre_oci_model.list_repository_tags(repository_ref)
    assert not len(remaining)

    # Ensure that the tags all live in history.
    history_after, _ = pre_oci_model.list_repository_tag_history(repository_ref, size=1000)
    assert len(history_after) == len(history_before)


@pytest.mark.parametrize('use_manifest', [
    True,
    False,
])
def test_retarget_tag_history(use_manifest, pre_oci_model):
    """Reverting ``latest`` to the second history entry's target adds one history entry."""
    repository_ref = pre_oci_model.lookup_repository('devtable', 'history')
    history, _ = pre_oci_model.list_repository_tag_history(repository_ref)
    older_entry = history[1]

    if use_manifest:
        target = pre_oci_model.lookup_manifest_by_digest(repository_ref,
                                                         older_entry.manifest_digest,
                                                         allow_dead=True)
    else:
        target = older_entry.legacy_image

    # Retarget the tag.
    assert target
    updated_tag = pre_oci_model.retarget_tag(repository_ref, 'latest', target,
                                             is_reversion=True)

    # Ensure the tag has changed targets.
    if use_manifest:
        assert updated_tag.manifest_digest == target.digest
    else:
        assert updated_tag.legacy_image == target

    # Ensure history has been updated.
    new_history, _ = pre_oci_model.list_repository_tag_history(repository_ref)
    assert len(new_history) == len(history) + 1


def test_retarget_tag(pre_oci_model):
    """Pointing ``latest`` at the legacy image of ``prod`` updates the tag and its history."""
    repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
    history, _ = pre_oci_model.list_repository_tag_history(repository_ref)

    prod_tag = pre_oci_model.get_repo_tag(repository_ref, 'prod', include_legacy_image=True)
    prod_image = prod_tag.legacy_image

    # Retarget the tag.
    updated_tag = pre_oci_model.retarget_tag(repository_ref, 'latest', prod_image)

    # Ensure the tag has changed targets.
    assert updated_tag.legacy_image == prod_tag.legacy_image

    # Ensure history has been updated.
    new_history, _ = pre_oci_model.list_repository_tag_history(repository_ref)
    assert len(new_history) == len(history) + 1


def test_change_repository_tag_expiration(pre_oci_model):
    """Setting an expiration on ``latest`` reports success and fills ``lifetime_end_ts``."""
    repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')

    tag_before = pre_oci_model.get_repo_tag(repository_ref, 'latest')
    assert tag_before.lifetime_end_ts is None

    new_datetime = datetime.utcnow() + timedelta(days=2)
    previous, okay = pre_oci_model.change_repository_tag_expiration(tag_before, new_datetime)
    assert okay
    assert previous is None

    tag_after = pre_oci_model.get_repo_tag(repository_ref, 'latest')
    assert tag_after.lifetime_end_ts is not None


@pytest.mark.parametrize('repo_namespace, repo_name, expected_non_empty', [
    ('devtable', 'simple', []),
    ('devtable', 'complex', ['prod', 'v2.0']),
    ('devtable', 'history', ['latest']),
    ('buynlarge', 'orgrepo', []),
    ('devtable', 'gargantuan', ['v2.0', 'v3.0', 'v4.0', 'v5.0', 'v6.0']),
])
def test_get_legacy_images_owned_by_tag(repo_namespace, repo_name, expected_non_empty,
                                        pre_oci_model):
    """Exactly the tags named in ``expected_non_empty`` report owned legacy images."""
    repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
    tags = pre_oci_model.list_repository_tags(repository_ref)
    assert len(tags)

    owning_tag_names = {
        tag.name for tag in tags if pre_oci_model.get_legacy_images_owned_by_tag(tag)
    }
    assert owning_tag_names == set(expected_non_empty)


def test_get_security_status(pre_oci_model):
    """A truthy security status is returned for the legacy image of every tag."""
    repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
    tags = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True)
    assert len(tags)

    for tag in tags:
        assert pre_oci_model.get_security_status(tag.legacy_image)


@pytest.fixture()
def clear_rows(initialized_db):
    """Delete all rows from the manifest-related tables listed below."""
    # Remove all new-style rows so we can backfill.
    # NOTE(review): the deletion order is kept exactly as written; presumably it
    # respects foreign-key dependencies between these tables -- confirm before
    # reordering.
    tables_in_delete_order = (
        TagManifestLabelMap,
        ManifestLabel,
        ManifestBlob,
        ManifestLegacyImage,
        TagManifestToManifest,
        Manifest,
        TagManifestLabel,
        TagManifest,
    )
    for table in tables_in_delete_order:
        table.delete().execute()


@pytest.mark.parametrize('repo_namespace, repo_name', _STANDARD_REPOS)
def test_backfill_manifest_for_tag(repo_namespace, repo_name, clear_rows, pre_oci_model):
    """Backfilling gives each tag a manifest consistent with its legacy image chain."""
    repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
    tags = pre_oci_model.list_repository_tags(repository_ref)
    assert tags

    for bare_tag in tags:
        assert not bare_tag.manifest_digest
        assert pre_oci_model.backfill_manifest_for_tag(bare_tag)

    tags = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True)
    assert tags

    for filled_tag in tags:
        assert filled_tag.manifest_digest

        manifest = pre_oci_model.get_manifest_for_tag(filled_tag)
        assert manifest

        legacy_image = pre_oci_model.get_legacy_image(repository_ref,
                                                      filled_tag.legacy_image.docker_image_id,
                                                      include_parents=True)

        parsed = manifest.get_parsed_manifest()
        assert parsed.leaf_layer_v1_image_id == legacy_image.docker_image_id

        parent_docker_ids = {parent.docker_image_id for parent in legacy_image.parents}
        assert parsed.parent_image_ids == parent_docker_ids


@pytest.mark.parametrize('repo_namespace, repo_name', _STANDARD_REPOS)
def test_backfill_manifest_on_lookup(repo_namespace, repo_name, clear_rows, pre_oci_model):
    """``get_manifest_for_tag`` only produces a manifest when ``backfill_if_necessary`` is set."""
    repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
    tags = pre_oci_model.list_repository_tags(repository_ref)
    assert tags

    for bare_tag in tags:
        assert not bare_tag.manifest_digest
        assert not pre_oci_model.get_manifest_for_tag(bare_tag)

        manifest = pre_oci_model.get_manifest_for_tag(bare_tag, backfill_if_necessary=True)
        assert manifest

        refreshed_tag = pre_oci_model.get_repo_tag(repository_ref, bare_tag.name)
        assert refreshed_tag.manifest_digest == manifest.digest


@pytest.mark.parametrize('namespace, expect_enabled', [
    ('devtable', True),
    ('buynlarge', True),
    ('disabled', False),
])
def test_is_namespace_enabled(namespace, expect_enabled, pre_oci_model):
    """``is_namespace_enabled`` matches the expected flag for each namespace."""
    assert pre_oci_model.is_namespace_enabled(namespace) == expect_enabled


@pytest.mark.parametrize('repo_namespace, repo_name', _STANDARD_REPOS)
def test_list_manifest_layers(repo_namespace, repo_name, pre_oci_model):
    """Manifest layers line up one-to-one with the layers of the parsed manifest."""
    repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
    tags = pre_oci_model.list_repository_tags(repository_ref)
    assert tags

    for tag in tags:
        manifest = pre_oci_model.get_manifest_for_tag(tag)
        assert manifest

        with assert_query_count(4):
            layers = pre_oci_model.list_manifest_layers(manifest)
            assert layers

        layers = pre_oci_model.list_manifest_layers(manifest, include_placements=True)
        assert layers

        parsed_layers = list(manifest.get_parsed_manifest().layers)
        assert len(layers) == len(parsed_layers)

        # The lengths were just asserted equal, so pairing the two sequences
        # visits the same elements as indexing them in step.
        for manifest_layer, parsed_layer in zip(layers, parsed_layers):
            layer_blob = manifest_layer.blob
            assert manifest_layer.layer_info == parsed_layer
            assert layer_blob.digest == str(parsed_layer.digest)
            assert layer_blob.storage_path
            assert layer_blob.placements

            repo_blob = pre_oci_model.get_repo_blob_by_digest(repository_ref,
                                                              manifest_layer.blob.digest)
            assert repo_blob.digest == manifest_layer.blob.digest

            assert manifest_layer.estimated_size(1) is not None


def test_derived_image(pre_oci_model):
    """Derived images can be created, sized, told apart by metadata, and deleted."""
    # Clear all existing derived storage.
    DerivedStorageForImage.delete().execute()

    repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
    tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
    manifest = pre_oci_model.get_manifest_for_tag(tag)

    # Ensure the squashed image doesn't exist.
    assert pre_oci_model.lookup_derived_image(manifest, 'squash', {}) is None

    # Create a new one.
    squashed = pre_oci_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us', {})
    repeated = pre_oci_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us', {})
    assert repeated == squashed
    assert squashed.unique_id

    # Check and set the size.
    assert squashed.blob.compressed_size is None
    pre_oci_model.set_derived_image_size(squashed, 1234)

    sized = pre_oci_model.lookup_derived_image(manifest, 'squash', {})
    assert sized.blob.compressed_size == 1234

    same_id = pre_oci_model.lookup_derived_image(manifest, 'squash', {})
    assert same_id.unique_id == squashed.unique_id

    # Ensure it's returned now.
    assert pre_oci_model.lookup_derived_image(manifest, 'squash', {}) == squashed

    # Ensure different metadata results in a different derived image.
    assert pre_oci_model.lookup_derived_image(manifest, 'squash', {'foo': 'bar'}) is None

    squashed_foo = pre_oci_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us',
                                                                {'foo': 'bar'})
    assert squashed_foo != squashed

    found_foo = pre_oci_model.lookup_derived_image(manifest, 'squash', {'foo': 'bar'})
    assert found_foo == squashed_foo

    assert squashed.unique_id != squashed_foo.unique_id

    # Lookup with placements.
    squashed = pre_oci_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us', {},
                                                            include_placements=True)
    assert squashed.blob.placements

    # Delete the derived image.
    pre_oci_model.delete_derived_image(squashed)
    assert pre_oci_model.lookup_derived_image(manifest, 'squash', {}) is None


def test_derived_image_signatures(pre_oci_model):
    """A ``gpg2`` signature is absent at first and is returned once it has been set."""
    repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
    tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
    manifest = pre_oci_model.get_manifest_for_tag(tag)

    derived = pre_oci_model.lookup_derived_image(manifest, 'squash', {})
    assert derived

    initial_signature = pre_oci_model.get_derived_image_signature(derived, 'gpg2')
    assert initial_signature is None

    pre_oci_model.set_derived_image_signature(derived, 'gpg2', 'foo')
    assert pre_oci_model.get_derived_image_signature(derived, 'gpg2') == 'foo'


def test_torrent_info(pre_oci_model):
    """Torrent info is stored on the first set; a second set leaves it unchanged."""
    # Remove all existing info.
    TorrentInfo.delete().execute()

    repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
    tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
    manifest = pre_oci_model.get_manifest_for_tag(tag)

    layers = pre_oci_model.list_manifest_layers(manifest)
    assert layers

    first_blob = layers[0].blob
    assert pre_oci_model.get_torrent_info(first_blob) is None

    pre_oci_model.set_torrent_info(first_blob, 2, 'foo')
    stored = pre_oci_model.get_torrent_info(first_blob)
    assert stored is not None
    assert stored.piece_length == 2
    assert stored.pieces == 'foo'

    # Try setting it again. Nothing should happen.
    pre_oci_model.set_torrent_info(first_blob, 3, 'bar')
    stored = pre_oci_model.get_torrent_info(first_blob)
    assert stored is not None
    assert stored.piece_length == 2
    assert stored.pieces == 'foo'


def test_blob_uploads(pre_oci_model):
    """A blob upload can be created, found, updated and deleted."""
    repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')

    upload = pre_oci_model.create_blob_upload(repository_ref, 'local_us', {'some': 'metadata'})
    assert upload
    assert upload.storage_metadata == {'some': 'metadata'}
    assert upload.location_name == 'local_us'

    # Ensure we can find the blob upload.
    assert pre_oci_model.lookup_blob_upload(repository_ref, upload.upload_id) == upload

    # Update and ensure the changes are saved.
    assert pre_oci_model.update_blob_upload(upload, 1, 'the-pieces_hash',
                                            upload.piece_sha_state,
                                            {'new': 'metadata'}, 2, 3, upload.sha_state)

    updated = pre_oci_model.lookup_blob_upload(repository_ref, upload.upload_id)
    assert updated
    assert updated.uncompressed_byte_count == 1
    assert updated.piece_hashes == 'the-pieces_hash'
    assert updated.storage_metadata == {'new': 'metadata'}
    assert updated.byte_count == 2
    assert updated.chunk_count == 3

    # Delete the upload.
    pre_oci_model.delete_blob_upload(upload)

    # Ensure it can no longer be found.
    assert not pre_oci_model.lookup_blob_upload(repository_ref, upload.upload_id)