96072a44c0
Before this change, we were neglecting to sort the tags by ID, which meant that pagination was broken
972 lines
37 KiB
Python
972 lines
37 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import hashlib
|
|
import json
|
|
import uuid
|
|
|
|
from datetime import datetime, timedelta
|
|
from io import BytesIO
|
|
|
|
import pytest
|
|
|
|
from mock import patch
|
|
from playhouse.test_utils import assert_query_count
|
|
|
|
from app import docker_v2_signing_key, storage
|
|
from data import model
|
|
from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob,
|
|
ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
|
|
TagManifestLabel, TagManifest, TagManifestLabel, DerivedStorageForImage,
|
|
TorrentInfo, Tag, TagToRepositoryTag, close_db_filter,
|
|
ImageStorageLocation)
|
|
from data.cache.impl import InMemoryDataModelCache
|
|
from data.registry_model.registry_pre_oci_model import PreOCIModel
|
|
from data.registry_model.registry_oci_model import OCIModel
|
|
from data.registry_model.datatypes import RepositoryReference
|
|
from data.registry_model.blobuploader import upload_blob, BlobUploadSettings
|
|
from data.registry_model.modelsplitter import SplitModel
|
|
from data.model.blob import store_blob_record_and_temp_link
|
|
from image.docker.types import ManifestImageLayer
|
|
from image.docker.schema1 import (DockerSchema1ManifestBuilder, DOCKER_SCHEMA1_CONTENT_TYPES,
|
|
DockerSchema1Manifest)
|
|
from image.docker.schema2.manifest import DockerSchema2ManifestBuilder
|
|
from util.bytes import Bytes
|
|
|
|
from test.fixtures import *
|
|
|
|
|
|
@pytest.fixture(params=[PreOCIModel(), OCIModel(),
|
|
SplitModel(0, {'devtable'}, {'buynlarge'})])
|
|
def registry_model(request, initialized_db):
|
|
return request.param
|
|
|
|
@pytest.fixture()
|
|
def pre_oci_model(initialized_db):
|
|
return PreOCIModel()
|
|
|
|
@pytest.fixture()
|
|
def oci_model(initialized_db):
|
|
return OCIModel()
|
|
|
|
|
|
@pytest.mark.parametrize('names, expected', [
|
|
(['unknown'], None),
|
|
(['latest'], {'latest'}),
|
|
(['latest', 'prod'], {'latest', 'prod'}),
|
|
(['latest', 'prod', 'another'], {'latest', 'prod'}),
|
|
(['foo', 'prod'], {'prod'}),
|
|
])
|
|
def test_find_matching_tag(names, expected, registry_model):
|
|
repo = model.repository.get_repository('devtable', 'simple')
|
|
repository_ref = RepositoryReference.for_repo_obj(repo)
|
|
found = registry_model.find_matching_tag(repository_ref, names)
|
|
if expected is None:
|
|
assert found is None
|
|
else:
|
|
assert found.name in expected
|
|
assert found.repository.namespace_name == 'devtable'
|
|
assert found.repository.name == 'simple'
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name, expected', [
|
|
('devtable', 'simple', {'latest', 'prod'}),
|
|
('buynlarge', 'orgrepo', {'latest', 'prod'}),
|
|
])
|
|
def test_get_most_recent_tag(repo_namespace, repo_name, expected, registry_model):
|
|
repo = model.repository.get_repository(repo_namespace, repo_name)
|
|
repository_ref = RepositoryReference.for_repo_obj(repo)
|
|
found = registry_model.get_most_recent_tag(repository_ref)
|
|
if expected is None:
|
|
assert found is None
|
|
else:
|
|
assert found.name in expected
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name, expected', [
|
|
('devtable', 'simple', True),
|
|
('buynlarge', 'orgrepo', True),
|
|
('buynlarge', 'unknownrepo', False),
|
|
])
|
|
def test_lookup_repository(repo_namespace, repo_name, expected, registry_model):
|
|
repo_ref = registry_model.lookup_repository(repo_namespace, repo_name)
|
|
if expected:
|
|
assert repo_ref
|
|
else:
|
|
assert repo_ref is None
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name', [
|
|
('devtable', 'simple'),
|
|
('buynlarge', 'orgrepo'),
|
|
])
|
|
def test_lookup_manifests(repo_namespace, repo_name, registry_model):
|
|
repo = model.repository.get_repository(repo_namespace, repo_name)
|
|
repository_ref = RepositoryReference.for_repo_obj(repo)
|
|
found_tag = registry_model.find_matching_tag(repository_ref, ['latest'])
|
|
found_manifest = registry_model.get_manifest_for_tag(found_tag)
|
|
found = registry_model.lookup_manifest_by_digest(repository_ref, found_manifest.digest,
|
|
include_legacy_image=True)
|
|
assert found._db_id == found_manifest._db_id
|
|
assert found.digest == found_manifest.digest
|
|
assert found.legacy_image
|
|
assert found.legacy_image.parents
|
|
|
|
schema1_parsed = registry_model.get_schema1_parsed_manifest(found, 'foo', 'bar', 'baz', storage)
|
|
assert schema1_parsed is not None
|
|
|
|
|
|
def test_lookup_unknown_manifest(registry_model):
|
|
repo = model.repository.get_repository('devtable', 'simple')
|
|
repository_ref = RepositoryReference.for_repo_obj(repo)
|
|
found = registry_model.lookup_manifest_by_digest(repository_ref, 'sha256:deadbeef')
|
|
assert found is None
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name', [
|
|
('devtable', 'simple'),
|
|
('devtable', 'complex'),
|
|
('devtable', 'history'),
|
|
('buynlarge', 'orgrepo'),
|
|
])
|
|
def test_legacy_images(repo_namespace, repo_name, registry_model):
|
|
repository_ref = registry_model.lookup_repository(repo_namespace, repo_name)
|
|
legacy_images = registry_model.get_legacy_images(repository_ref)
|
|
assert len(legacy_images)
|
|
|
|
found_tags = set()
|
|
for image in legacy_images:
|
|
found_image = registry_model.get_legacy_image(repository_ref, image.docker_image_id,
|
|
include_parents=True)
|
|
|
|
with assert_query_count(5 if found_image.parents else 4):
|
|
found_image = registry_model.get_legacy_image(repository_ref, image.docker_image_id,
|
|
include_parents=True, include_blob=True)
|
|
assert found_image.docker_image_id == image.docker_image_id
|
|
assert found_image.parents == image.parents
|
|
assert found_image.blob
|
|
assert found_image.blob.placements
|
|
|
|
# Check that the tags list can be retrieved.
|
|
assert image.tags is not None
|
|
found_tags.update({tag.name for tag in image.tags})
|
|
|
|
# Check against the actual DB row.
|
|
model_image = model.image.get_image(repository_ref._db_id, found_image.docker_image_id)
|
|
assert model_image.id == found_image._db_id
|
|
assert ([pid for pid in reversed(model_image.ancestor_id_list())] ==
|
|
[p._db_id for p in found_image.parents])
|
|
|
|
# Try without parents and ensure it raises an exception.
|
|
found_image = registry_model.get_legacy_image(repository_ref, image.docker_image_id,
|
|
include_parents=False)
|
|
with pytest.raises(Exception):
|
|
assert not found_image.parents
|
|
|
|
assert found_tags
|
|
|
|
unknown = registry_model.get_legacy_image(repository_ref, 'unknown', include_parents=True)
|
|
assert unknown is None
|
|
|
|
|
|
def test_manifest_labels(registry_model):
|
|
repo = model.repository.get_repository('devtable', 'simple')
|
|
repository_ref = RepositoryReference.for_repo_obj(repo)
|
|
found_tag = registry_model.find_matching_tag(repository_ref, ['latest'])
|
|
found_manifest = registry_model.get_manifest_for_tag(found_tag)
|
|
|
|
# Create a new label.
|
|
created = registry_model.create_manifest_label(found_manifest, 'foo', 'bar', 'api')
|
|
assert created.key == 'foo'
|
|
assert created.value == 'bar'
|
|
assert created.source_type_name == 'api'
|
|
assert created.media_type_name == 'text/plain'
|
|
|
|
# Ensure we can look it up.
|
|
assert registry_model.get_manifest_label(found_manifest, created.uuid) == created
|
|
|
|
# Ensure it is in our list of labels.
|
|
assert created in registry_model.list_manifest_labels(found_manifest)
|
|
assert created in registry_model.list_manifest_labels(found_manifest, key_prefix='fo')
|
|
|
|
# Ensure it is *not* in our filtered list.
|
|
assert created not in registry_model.list_manifest_labels(found_manifest, key_prefix='ba')
|
|
|
|
# Delete the label and ensure it is gone.
|
|
assert registry_model.delete_manifest_label(found_manifest, created.uuid)
|
|
assert registry_model.get_manifest_label(found_manifest, created.uuid) is None
|
|
assert created not in registry_model.list_manifest_labels(found_manifest)
|
|
|
|
|
|
def test_manifest_label_handlers(registry_model):
|
|
repo = model.repository.get_repository('devtable', 'simple')
|
|
repository_ref = RepositoryReference.for_repo_obj(repo)
|
|
found_tag = registry_model.get_repo_tag(repository_ref, 'latest')
|
|
found_manifest = registry_model.get_manifest_for_tag(found_tag)
|
|
|
|
# Ensure the tag has no expiration.
|
|
assert found_tag.lifetime_end_ts is None
|
|
|
|
# Create a new label with an expires-after.
|
|
registry_model.create_manifest_label(found_manifest, 'quay.expires-after', '2h', 'api')
|
|
|
|
# Ensure the tag now has an expiration.
|
|
updated_tag = registry_model.get_repo_tag(repository_ref, 'latest')
|
|
assert updated_tag.lifetime_end_ts == (updated_tag.lifetime_start_ts + (60 * 60 * 2))
|
|
|
|
|
|
def test_batch_labels(registry_model):
|
|
repo = model.repository.get_repository('devtable', 'history')
|
|
repository_ref = RepositoryReference.for_repo_obj(repo)
|
|
found_tag = registry_model.find_matching_tag(repository_ref, ['latest'])
|
|
found_manifest = registry_model.get_manifest_for_tag(found_tag)
|
|
|
|
with registry_model.batch_create_manifest_labels(found_manifest) as add_label:
|
|
add_label('foo', '1', 'api')
|
|
add_label('bar', '2', 'api')
|
|
add_label('baz', '3', 'api')
|
|
|
|
# Ensure we can look them up.
|
|
assert len(registry_model.list_manifest_labels(found_manifest)) == 3
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name', [
|
|
('devtable', 'simple'),
|
|
('devtable', 'complex'),
|
|
('devtable', 'history'),
|
|
('buynlarge', 'orgrepo'),
|
|
])
|
|
def test_repository_tags(repo_namespace, repo_name, registry_model):
|
|
repository_ref = registry_model.lookup_repository(repo_namespace, repo_name)
|
|
tags = registry_model.list_repository_tags(repository_ref, include_legacy_images=True)
|
|
assert len(tags)
|
|
|
|
tags_map = registry_model.get_legacy_tags_map(repository_ref, storage)
|
|
|
|
for tag in tags:
|
|
found_tag = registry_model.get_repo_tag(repository_ref, tag.name, include_legacy_image=True)
|
|
assert found_tag == tag
|
|
|
|
if found_tag.legacy_image is None:
|
|
continue
|
|
|
|
found_image = registry_model.get_legacy_image(repository_ref,
|
|
found_tag.legacy_image.docker_image_id)
|
|
assert found_image == found_tag.legacy_image
|
|
assert tag.name in tags_map
|
|
assert tags_map[tag.name] == found_image.docker_image_id
|
|
|
|
|
|
@pytest.mark.parametrize('namespace, name, expected_tag_count, has_expired', [
|
|
('devtable', 'simple', 2, False),
|
|
('devtable', 'history', 2, True),
|
|
('devtable', 'gargantuan', 8, False),
|
|
('public', 'publicrepo', 1, False),
|
|
])
|
|
def test_repository_tag_history(namespace, name, expected_tag_count, has_expired, registry_model):
|
|
# Pre-cache media type loads to ensure consistent query count.
|
|
Manifest.media_type.get_name(1)
|
|
|
|
repository_ref = registry_model.lookup_repository(namespace, name)
|
|
|
|
with assert_query_count(4 if isinstance(registry_model, SplitModel) else 2):
|
|
history, has_more = registry_model.list_repository_tag_history(repository_ref)
|
|
assert not has_more
|
|
assert len(history) == expected_tag_count
|
|
|
|
for tag in history:
|
|
# Retrieve the manifest to ensure it doesn't issue extra queries.
|
|
tag.manifest
|
|
|
|
if has_expired:
|
|
# Ensure the latest tag is marked expired, since there is an expired one.
|
|
with assert_query_count(1):
|
|
assert registry_model.has_expired_tag(repository_ref, 'latest')
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name', [
|
|
('devtable', 'simple'),
|
|
('devtable', 'complex'),
|
|
('devtable', 'history'),
|
|
('buynlarge', 'orgrepo'),
|
|
])
|
|
@pytest.mark.parametrize('via_manifest', [
|
|
False,
|
|
True,
|
|
])
|
|
def test_delete_tags(repo_namespace, repo_name, via_manifest, registry_model):
|
|
repository_ref = registry_model.lookup_repository(repo_namespace, repo_name)
|
|
tags = registry_model.list_repository_tags(repository_ref)
|
|
assert len(tags)
|
|
|
|
# Save history before the deletions.
|
|
previous_history, _ = registry_model.list_repository_tag_history(repository_ref, size=1000)
|
|
assert len(previous_history) >= len(tags)
|
|
|
|
# Delete every tag in the repository.
|
|
for tag in tags:
|
|
if via_manifest:
|
|
assert registry_model.delete_tag(repository_ref, tag.name)
|
|
else:
|
|
manifest = registry_model.get_manifest_for_tag(tag)
|
|
if manifest is not None:
|
|
assert registry_model.delete_tags_for_manifest(manifest)
|
|
|
|
# Make sure the tag is no longer found.
|
|
# TODO(jschorr): Uncomment once we're done with the SplitModel.
|
|
#with assert_query_count(1):
|
|
found_tag = registry_model.get_repo_tag(repository_ref, tag.name, include_legacy_image=True)
|
|
assert found_tag is None
|
|
|
|
# Ensure all tags have been deleted.
|
|
tags = registry_model.list_repository_tags(repository_ref)
|
|
assert not len(tags)
|
|
|
|
# Ensure that the tags all live in history.
|
|
history, _ = registry_model.list_repository_tag_history(repository_ref, size=1000)
|
|
assert len(history) == len(previous_history)
|
|
|
|
|
|
@pytest.mark.parametrize('use_manifest', [
|
|
True,
|
|
False,
|
|
])
|
|
def test_retarget_tag_history(use_manifest, registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'history')
|
|
history, _ = registry_model.list_repository_tag_history(repository_ref)
|
|
|
|
if use_manifest:
|
|
manifest_or_legacy_image = registry_model.lookup_manifest_by_digest(repository_ref,
|
|
history[0].manifest_digest,
|
|
allow_dead=True)
|
|
else:
|
|
manifest_or_legacy_image = history[0].legacy_image
|
|
|
|
# Retarget the tag.
|
|
assert manifest_or_legacy_image
|
|
updated_tag = registry_model.retarget_tag(repository_ref, 'latest', manifest_or_legacy_image,
|
|
storage, is_reversion=True)
|
|
|
|
# Ensure the tag has changed targets.
|
|
if use_manifest:
|
|
assert updated_tag.manifest_digest == manifest_or_legacy_image.digest
|
|
else:
|
|
assert updated_tag.legacy_image == manifest_or_legacy_image
|
|
|
|
# Ensure history has been updated.
|
|
new_history, _ = registry_model.list_repository_tag_history(repository_ref)
|
|
assert len(new_history) == len(history) + 1
|
|
|
|
|
|
def test_retarget_tag_schema1(oci_model):
|
|
repository_ref = oci_model.lookup_repository('devtable', 'simple')
|
|
latest_tag = oci_model.get_repo_tag(repository_ref, 'latest')
|
|
manifest = oci_model.get_manifest_for_tag(latest_tag)
|
|
|
|
existing_parsed = manifest.get_parsed_manifest()
|
|
|
|
# Retarget a new tag to the manifest.
|
|
updated_tag = oci_model.retarget_tag(repository_ref, 'somenewtag', manifest, storage)
|
|
assert updated_tag
|
|
assert updated_tag.name == 'somenewtag'
|
|
|
|
updated_manifest = oci_model.get_manifest_for_tag(updated_tag)
|
|
parsed = updated_manifest.get_parsed_manifest()
|
|
assert parsed.namespace == 'devtable'
|
|
assert parsed.repo_name == 'simple'
|
|
assert parsed.tag == 'somenewtag'
|
|
|
|
assert parsed.layers == existing_parsed.layers
|
|
|
|
# Ensure the tag has changed targets.
|
|
assert oci_model.get_repo_tag(repository_ref, 'somenewtag') == updated_tag
|
|
|
|
|
|
def test_change_repository_tag_expiration(registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
tag = registry_model.get_repo_tag(repository_ref, 'latest')
|
|
assert tag.lifetime_end_ts is None
|
|
|
|
new_datetime = datetime.utcnow() + timedelta(days=2)
|
|
previous, okay = registry_model.change_repository_tag_expiration(tag, new_datetime)
|
|
|
|
assert okay
|
|
assert previous is None
|
|
|
|
tag = registry_model.get_repo_tag(repository_ref, 'latest')
|
|
assert tag.lifetime_end_ts is not None
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name, expected_non_empty', [
|
|
('devtable', 'simple', []),
|
|
('devtable', 'complex', ['prod', 'v2.0']),
|
|
('devtable', 'history', ['latest']),
|
|
('buynlarge', 'orgrepo', []),
|
|
('devtable', 'gargantuan', ['v2.0', 'v3.0', 'v4.0', 'v5.0', 'v6.0']),
|
|
])
|
|
def test_get_legacy_images_owned_by_tag(repo_namespace, repo_name, expected_non_empty,
|
|
registry_model):
|
|
repository_ref = registry_model.lookup_repository(repo_namespace, repo_name)
|
|
tags = registry_model.list_repository_tags(repository_ref)
|
|
assert len(tags)
|
|
|
|
non_empty = set()
|
|
for tag in tags:
|
|
if registry_model.get_legacy_images_owned_by_tag(tag):
|
|
non_empty.add(tag.name)
|
|
|
|
assert non_empty == set(expected_non_empty)
|
|
|
|
|
|
def test_get_security_status(registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
tags = registry_model.list_repository_tags(repository_ref, include_legacy_images=True)
|
|
assert len(tags)
|
|
|
|
for tag in tags:
|
|
assert registry_model.get_security_status(tag.legacy_image)
|
|
|
|
|
|
@pytest.fixture()
|
|
def clear_rows(initialized_db):
|
|
# Remove all new-style rows so we can backfill.
|
|
TagToRepositoryTag.delete().execute()
|
|
Tag.delete().execute()
|
|
TagManifestLabelMap.delete().execute()
|
|
ManifestLabel.delete().execute()
|
|
ManifestBlob.delete().execute()
|
|
ManifestLegacyImage.delete().execute()
|
|
TagManifestToManifest.delete().execute()
|
|
Manifest.delete().execute()
|
|
TagManifestLabel.delete().execute()
|
|
TagManifest.delete().execute()
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name', [
|
|
('devtable', 'simple'),
|
|
('devtable', 'complex'),
|
|
('devtable', 'history'),
|
|
('buynlarge', 'orgrepo'),
|
|
])
|
|
def test_backfill_manifest_for_tag(repo_namespace, repo_name, clear_rows, pre_oci_model):
|
|
repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
|
|
tags, has_more = pre_oci_model.list_repository_tag_history(repository_ref, size=2500)
|
|
assert tags
|
|
assert not has_more
|
|
|
|
for tag in tags:
|
|
assert not tag.manifest_digest
|
|
assert pre_oci_model.backfill_manifest_for_tag(tag)
|
|
|
|
tags, _ = pre_oci_model.list_repository_tag_history(repository_ref)
|
|
assert tags
|
|
for tag in tags:
|
|
assert tag.manifest_digest
|
|
|
|
manifest = pre_oci_model.get_manifest_for_tag(tag)
|
|
assert manifest
|
|
|
|
legacy_image = pre_oci_model.get_legacy_image(repository_ref, tag.legacy_image.docker_image_id,
|
|
include_parents=True)
|
|
|
|
parsed_manifest = manifest.get_parsed_manifest()
|
|
assert parsed_manifest.leaf_layer_v1_image_id == legacy_image.docker_image_id
|
|
assert parsed_manifest.parent_image_ids == {p.docker_image_id for p in legacy_image.parents}
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name', [
|
|
('devtable', 'simple'),
|
|
('devtable', 'complex'),
|
|
('devtable', 'history'),
|
|
('buynlarge', 'orgrepo'),
|
|
])
|
|
def test_backfill_manifest_on_lookup(repo_namespace, repo_name, clear_rows, pre_oci_model):
|
|
repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
|
|
tags = pre_oci_model.list_repository_tags(repository_ref)
|
|
assert tags
|
|
|
|
for tag in tags:
|
|
assert not tag.manifest_digest
|
|
assert not pre_oci_model.get_manifest_for_tag(tag)
|
|
|
|
manifest = pre_oci_model.get_manifest_for_tag(tag, backfill_if_necessary=True)
|
|
assert manifest
|
|
|
|
updated_tag = pre_oci_model.get_repo_tag(repository_ref, tag.name)
|
|
assert updated_tag.manifest_digest == manifest.digest
|
|
|
|
|
|
@pytest.mark.parametrize('namespace, expect_enabled', [
|
|
('devtable', True),
|
|
('buynlarge', True),
|
|
|
|
('disabled', False),
|
|
])
|
|
def test_is_namespace_enabled(namespace, expect_enabled, registry_model):
|
|
assert registry_model.is_namespace_enabled(namespace) == expect_enabled
|
|
|
|
|
|
@pytest.mark.parametrize('repo_namespace, repo_name', [
|
|
('devtable', 'simple'),
|
|
('devtable', 'complex'),
|
|
('devtable', 'history'),
|
|
('buynlarge', 'orgrepo'),
|
|
])
|
|
def test_layers_and_blobs(repo_namespace, repo_name, registry_model):
|
|
repository_ref = registry_model.lookup_repository(repo_namespace, repo_name)
|
|
tags = registry_model.list_repository_tags(repository_ref)
|
|
assert tags
|
|
|
|
for tag in tags:
|
|
manifest = registry_model.get_manifest_for_tag(tag)
|
|
assert manifest
|
|
|
|
parsed = manifest.get_parsed_manifest()
|
|
assert parsed
|
|
|
|
layers = registry_model.list_parsed_manifest_layers(repository_ref, parsed, storage)
|
|
assert layers
|
|
|
|
layers = registry_model.list_parsed_manifest_layers(repository_ref, parsed, storage,
|
|
include_placements=True)
|
|
assert layers
|
|
|
|
for index, manifest_layer in enumerate(layers):
|
|
assert manifest_layer.blob.storage_path
|
|
assert manifest_layer.blob.placements
|
|
|
|
repo_blob = registry_model.get_repo_blob_by_digest(repository_ref, manifest_layer.blob.digest)
|
|
assert repo_blob.digest == manifest_layer.blob.digest
|
|
|
|
assert manifest_layer.estimated_size(1) is not None
|
|
assert isinstance(manifest_layer.layer_info, ManifestImageLayer)
|
|
|
|
blobs = registry_model.get_manifest_local_blobs(manifest, include_placements=True)
|
|
assert {b.digest for b in blobs} == set(parsed.local_blob_digests)
|
|
|
|
|
|
def test_manifest_remote_layers(oci_model):
|
|
# Create a config blob for testing.
|
|
config_json = json.dumps({
|
|
'config': {},
|
|
"rootfs": {
|
|
"type": "layers",
|
|
"diff_ids": []
|
|
},
|
|
"history": [
|
|
{
|
|
"created": "2018-04-03T18:37:09.284840891Z",
|
|
"created_by": "do something",
|
|
},
|
|
],
|
|
})
|
|
|
|
app_config = {'TESTING': True}
|
|
repository_ref = oci_model.lookup_repository('devtable', 'simple')
|
|
with upload_blob(repository_ref, storage, BlobUploadSettings(500, 500, 500)) as upload:
|
|
upload.upload_chunk(app_config, BytesIO(config_json))
|
|
blob = upload.commit_to_blob(app_config)
|
|
|
|
# Create the manifest in the repo.
|
|
builder = DockerSchema2ManifestBuilder()
|
|
builder.set_config_digest(blob.digest, blob.compressed_size)
|
|
builder.add_layer('sha256:abcd', 1234, urls=['http://hello/world'])
|
|
manifest = builder.build()
|
|
|
|
created_manifest, _ = oci_model.create_manifest_and_retarget_tag(repository_ref, manifest,
|
|
'sometag', storage)
|
|
assert created_manifest
|
|
|
|
layers = oci_model.list_parsed_manifest_layers(repository_ref,
|
|
created_manifest.get_parsed_manifest(),
|
|
storage)
|
|
assert len(layers) == 1
|
|
assert layers[0].layer_info.is_remote
|
|
assert layers[0].layer_info.urls == ['http://hello/world']
|
|
assert layers[0].blob is None
|
|
|
|
|
|
def test_derived_image(registry_model):
|
|
# Clear all existing derived storage.
|
|
DerivedStorageForImage.delete().execute()
|
|
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
tag = registry_model.get_repo_tag(repository_ref, 'latest')
|
|
manifest = registry_model.get_manifest_for_tag(tag)
|
|
|
|
# Ensure the squashed image doesn't exist.
|
|
assert registry_model.lookup_derived_image(manifest, 'squash', {}) is None
|
|
|
|
# Create a new one.
|
|
squashed = registry_model.lookup_or_create_derived_image(manifest, 'squash',
|
|
'local_us', storage, {})
|
|
assert registry_model.lookup_or_create_derived_image(manifest, 'squash',
|
|
'local_us', storage, {}) == squashed
|
|
assert squashed.unique_id
|
|
|
|
# Check and set the size.
|
|
assert squashed.blob.compressed_size is None
|
|
registry_model.set_derived_image_size(squashed, 1234)
|
|
assert registry_model.lookup_derived_image(manifest, 'squash', {}).blob.compressed_size == 1234
|
|
assert registry_model.lookup_derived_image(manifest, 'squash', {}).unique_id == squashed.unique_id
|
|
|
|
# Ensure its returned now.
|
|
assert registry_model.lookup_derived_image(manifest, 'squash', {}) == squashed
|
|
|
|
# Ensure different metadata results in a different derived image.
|
|
assert registry_model.lookup_derived_image(manifest, 'squash', {'foo': 'bar'}) is None
|
|
|
|
squashed_foo = registry_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us',
|
|
storage, {'foo': 'bar'})
|
|
assert squashed_foo != squashed
|
|
assert registry_model.lookup_derived_image(manifest, 'squash', {'foo': 'bar'}) == squashed_foo
|
|
|
|
assert squashed.unique_id != squashed_foo.unique_id
|
|
|
|
# Lookup with placements.
|
|
squashed = registry_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us',
|
|
storage, {}, include_placements=True)
|
|
assert squashed.blob.placements
|
|
|
|
# Delete the derived image.
|
|
registry_model.delete_derived_image(squashed)
|
|
assert registry_model.lookup_derived_image(manifest, 'squash', {}) is None
|
|
|
|
|
|
def test_derived_image_signatures(registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
tag = registry_model.get_repo_tag(repository_ref, 'latest')
|
|
manifest = registry_model.get_manifest_for_tag(tag)
|
|
|
|
derived = registry_model.lookup_derived_image(manifest, 'squash', {})
|
|
assert derived
|
|
|
|
signature = registry_model.get_derived_image_signature(derived, 'gpg2')
|
|
assert signature is None
|
|
|
|
registry_model.set_derived_image_signature(derived, 'gpg2', 'foo')
|
|
assert registry_model.get_derived_image_signature(derived, 'gpg2') == 'foo'
|
|
|
|
|
|
def test_torrent_info(registry_model):
|
|
# Remove all existing info.
|
|
TorrentInfo.delete().execute()
|
|
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
tag = registry_model.get_repo_tag(repository_ref, 'latest')
|
|
manifest = registry_model.get_manifest_for_tag(tag)
|
|
|
|
blobs = registry_model.get_manifest_local_blobs(manifest)
|
|
assert blobs
|
|
|
|
assert registry_model.get_torrent_info(blobs[0]) is None
|
|
registry_model.set_torrent_info(blobs[0], 2, 'foo')
|
|
|
|
# Set it again exactly, which should be a no-op.
|
|
registry_model.set_torrent_info(blobs[0], 2, 'foo')
|
|
|
|
# Check the information we've set.
|
|
torrent_info = registry_model.get_torrent_info(blobs[0])
|
|
assert torrent_info is not None
|
|
assert torrent_info.piece_length == 2
|
|
assert torrent_info.pieces == 'foo'
|
|
|
|
# Try setting it again. Nothing should happen.
|
|
registry_model.set_torrent_info(blobs[0], 3, 'bar')
|
|
|
|
torrent_info = registry_model.get_torrent_info(blobs[0])
|
|
assert torrent_info is not None
|
|
assert torrent_info.piece_length == 2
|
|
assert torrent_info.pieces == 'foo'
|
|
|
|
|
|
def test_blob_uploads(registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
|
|
blob_upload = registry_model.create_blob_upload(repository_ref, str(uuid.uuid4()),
|
|
'local_us', {'some': 'metadata'})
|
|
assert blob_upload
|
|
assert blob_upload.storage_metadata == {'some': 'metadata'}
|
|
assert blob_upload.location_name == 'local_us'
|
|
|
|
# Ensure we can find the blob upload.
|
|
assert registry_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) == blob_upload
|
|
|
|
# Update and ensure the changes are saved.
|
|
assert registry_model.update_blob_upload(blob_upload, 1, 'the-pieces_hash',
|
|
blob_upload.piece_sha_state,
|
|
{'new': 'metadata'}, 2, 3,
|
|
blob_upload.sha_state)
|
|
|
|
updated = registry_model.lookup_blob_upload(repository_ref, blob_upload.upload_id)
|
|
assert updated
|
|
assert updated.uncompressed_byte_count == 1
|
|
assert updated.piece_hashes == 'the-pieces_hash'
|
|
assert updated.storage_metadata == {'new': 'metadata'}
|
|
assert updated.byte_count == 2
|
|
assert updated.chunk_count == 3
|
|
|
|
# Delete the upload.
|
|
registry_model.delete_blob_upload(blob_upload)
|
|
|
|
# Ensure it can no longer be found.
|
|
assert not registry_model.lookup_blob_upload(repository_ref, blob_upload.upload_id)
|
|
|
|
|
|
def test_commit_blob_upload(registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
blob_upload = registry_model.create_blob_upload(repository_ref, str(uuid.uuid4()),
|
|
'local_us', {'some': 'metadata'})
|
|
|
|
# Commit the blob upload and make sure it is written as a blob.
|
|
digest = 'sha256:' + hashlib.sha256('hello').hexdigest()
|
|
blob = registry_model.commit_blob_upload(blob_upload, digest, 60)
|
|
assert blob.digest == digest
|
|
|
|
# Ensure the upload can no longer be found.
|
|
assert not registry_model.lookup_blob_upload(repository_ref, blob_upload.upload_id)
|
|
|
|
|
|
# TODO(jschorr): Re-enable for OCI model once we have a new table for temporary blobs.
|
|
def test_mount_blob_into_repository(pre_oci_model):
|
|
repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
|
|
latest_tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
|
|
manifest = pre_oci_model.get_manifest_for_tag(latest_tag)
|
|
|
|
target_repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
|
|
|
|
blobs = pre_oci_model.get_manifest_local_blobs(manifest, include_placements=True)
|
|
assert blobs
|
|
|
|
for blob in blobs:
|
|
# Ensure the blob doesn't exist under the repository.
|
|
assert not pre_oci_model.get_repo_blob_by_digest(target_repository_ref, blob.digest)
|
|
|
|
# Mount the blob into the repository.
|
|
assert pre_oci_model.mount_blob_into_repository(blob, target_repository_ref, 60)
|
|
|
|
# Ensure it now exists.
|
|
found = pre_oci_model.get_repo_blob_by_digest(target_repository_ref, blob.digest)
|
|
assert found == blob
|
|
|
|
|
|
class SomeException(Exception):
|
|
pass
|
|
|
|
|
|
def test_get_cached_repo_blob(registry_model):
|
|
model_cache = InMemoryDataModelCache()
|
|
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
latest_tag = registry_model.get_repo_tag(repository_ref, 'latest')
|
|
manifest = registry_model.get_manifest_for_tag(latest_tag)
|
|
|
|
blobs = registry_model.get_manifest_local_blobs(manifest, include_placements=True)
|
|
assert blobs
|
|
|
|
blob = blobs[0]
|
|
|
|
# Load a blob to add it to the cache.
|
|
found = registry_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest)
|
|
assert found.digest == blob.digest
|
|
assert found.uuid == blob.uuid
|
|
assert found.compressed_size == blob.compressed_size
|
|
assert found.uncompressed_size == blob.uncompressed_size
|
|
assert found.uploading == blob.uploading
|
|
assert found.placements == blob.placements
|
|
|
|
# Disconnect from the database by overwriting the connection.
|
|
def fail(x, y):
|
|
raise SomeException('Not connected!')
|
|
|
|
with patch('data.registry_model.registry_pre_oci_model.model.blob.get_repository_blob_by_digest',
|
|
fail):
|
|
with patch('data.registry_model.registry_oci_model.model.oci.blob.get_repository_blob_by_digest',
|
|
fail):
|
|
# Make sure we can load again, which should hit the cache.
|
|
cached = registry_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest)
|
|
assert cached.digest == blob.digest
|
|
assert cached.uuid == blob.uuid
|
|
assert cached.compressed_size == blob.compressed_size
|
|
assert cached.uncompressed_size == blob.uncompressed_size
|
|
assert cached.uploading == blob.uploading
|
|
assert cached.placements == blob.placements
|
|
|
|
# Try another blob, which should fail since the DB is not connected and the cache
|
|
# does not contain the blob.
|
|
with pytest.raises(SomeException):
|
|
registry_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', 'some other digest')
|
|
|
|
|
|
def test_create_manifest_and_retarget_tag(registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
latest_tag = registry_model.get_repo_tag(repository_ref, 'latest', include_legacy_image=True)
|
|
manifest = registry_model.get_manifest_for_tag(latest_tag).get_parsed_manifest()
|
|
|
|
builder = DockerSchema1ManifestBuilder('devtable', 'simple', 'anothertag')
|
|
builder.add_layer(manifest.blob_digests[0],
|
|
'{"id": "%s"}' % latest_tag.legacy_image.docker_image_id)
|
|
sample_manifest = builder.build(docker_v2_signing_key)
|
|
assert sample_manifest is not None
|
|
|
|
another_manifest, tag = registry_model.create_manifest_and_retarget_tag(repository_ref,
|
|
sample_manifest,
|
|
'anothertag',
|
|
storage)
|
|
assert another_manifest is not None
|
|
assert tag is not None
|
|
|
|
assert tag.name == 'anothertag'
|
|
assert another_manifest.get_parsed_manifest().manifest_dict == sample_manifest.manifest_dict
|
|
|
|
|
|
def test_get_schema1_parsed_manifest(registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
latest_tag = registry_model.get_repo_tag(repository_ref, 'latest', include_legacy_image=True)
|
|
manifest = registry_model.get_manifest_for_tag(latest_tag)
|
|
assert registry_model.get_schema1_parsed_manifest(manifest, '', '', '', storage)
|
|
|
|
|
|
def test_convert_manifest(registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
latest_tag = registry_model.get_repo_tag(repository_ref, 'latest', include_legacy_image=True)
|
|
manifest = registry_model.get_manifest_for_tag(latest_tag)
|
|
|
|
mediatypes = DOCKER_SCHEMA1_CONTENT_TYPES
|
|
assert registry_model.convert_manifest(manifest, '', '', '', mediatypes, storage)
|
|
|
|
mediatypes = []
|
|
assert registry_model.convert_manifest(manifest, '', '', '', mediatypes, storage) is None
|
|
|
|
|
|
def test_create_manifest_and_retarget_tag_with_labels(registry_model):
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
latest_tag = registry_model.get_repo_tag(repository_ref, 'latest', include_legacy_image=True)
|
|
manifest = registry_model.get_manifest_for_tag(latest_tag).get_parsed_manifest()
|
|
|
|
json_metadata = {
|
|
'id': latest_tag.legacy_image.docker_image_id,
|
|
'config': {
|
|
'Labels': {
|
|
'quay.expires-after': '2w',
|
|
},
|
|
},
|
|
}
|
|
|
|
builder = DockerSchema1ManifestBuilder('devtable', 'simple', 'anothertag')
|
|
builder.add_layer(manifest.blob_digests[0], json.dumps(json_metadata))
|
|
sample_manifest = builder.build(docker_v2_signing_key)
|
|
assert sample_manifest is not None
|
|
|
|
another_manifest, tag = registry_model.create_manifest_and_retarget_tag(repository_ref,
|
|
sample_manifest,
|
|
'anothertag',
|
|
storage)
|
|
assert another_manifest is not None
|
|
assert tag is not None
|
|
|
|
assert tag.name == 'anothertag'
|
|
assert another_manifest.get_parsed_manifest().manifest_dict == sample_manifest.manifest_dict
|
|
|
|
# Ensure the labels were applied.
|
|
assert tag.lifetime_end_ms is not None
|
|
|
|
|
|
|
|
def _populate_blob(digest):
|
|
location = ImageStorageLocation.get(name='local_us')
|
|
store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, 120)
|
|
|
|
|
|
def test_known_issue_schema1(registry_model):
|
|
test_dir = os.path.dirname(os.path.abspath(__file__))
|
|
path = os.path.join(test_dir, '../../../image/docker/test/validate_manifest_known_issue.json')
|
|
with open(path, 'r') as f:
|
|
manifest_bytes = f.read()
|
|
|
|
manifest = DockerSchema1Manifest(Bytes.for_string_or_unicode(manifest_bytes))
|
|
|
|
for blob_digest in manifest.local_blob_digests:
|
|
_populate_blob(blob_digest)
|
|
|
|
digest = manifest.digest
|
|
assert digest == 'sha256:44518f5a4d1cb5b7a6347763116fb6e10f6a8563b6c40bb389a0a982f0a9f47a'
|
|
|
|
# Create the manifest in the database.
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
created_manifest, _ = registry_model.create_manifest_and_retarget_tag(repository_ref, manifest,
|
|
'latest', storage)
|
|
assert created_manifest
|
|
assert created_manifest.digest == manifest.digest
|
|
assert (created_manifest.internal_manifest_bytes.as_encoded_str() ==
|
|
manifest.bytes.as_encoded_str())
|
|
|
|
# Look it up again and validate.
|
|
found = registry_model.lookup_manifest_by_digest(repository_ref, manifest.digest, allow_dead=True)
|
|
assert found
|
|
assert found.digest == digest
|
|
assert found.internal_manifest_bytes.as_encoded_str() == manifest.bytes.as_encoded_str()
|
|
assert found.get_parsed_manifest().digest == digest
|
|
|
|
|
|
def test_unicode_emoji(registry_model):
|
|
builder = DockerSchema1ManifestBuilder('devtable', 'simple', 'latest')
|
|
builder.add_layer('sha256:abcde', json.dumps({
|
|
'id': 'someid',
|
|
'author': u'😱',
|
|
}, ensure_ascii=False))
|
|
|
|
manifest = builder.build(ensure_ascii=False)
|
|
manifest._validate()
|
|
|
|
for blob_digest in manifest.local_blob_digests:
|
|
_populate_blob(blob_digest)
|
|
|
|
# Create the manifest in the database.
|
|
repository_ref = registry_model.lookup_repository('devtable', 'simple')
|
|
created_manifest, _ = registry_model.create_manifest_and_retarget_tag(repository_ref, manifest,
|
|
'latest', storage)
|
|
assert created_manifest
|
|
assert created_manifest.digest == manifest.digest
|
|
assert (created_manifest.internal_manifest_bytes.as_encoded_str() ==
|
|
manifest.bytes.as_encoded_str())
|
|
|
|
# Look it up again and validate.
|
|
found = registry_model.lookup_manifest_by_digest(repository_ref, manifest.digest, allow_dead=True)
|
|
assert found
|
|
assert found.digest == manifest.digest
|
|
assert found.internal_manifest_bytes.as_encoded_str() == manifest.bytes.as_encoded_str()
|
|
assert found.get_parsed_manifest().digest == manifest.digest
|
|
|
|
|
|
def test_list_repository_tags(oci_model):
|
|
repository_ref = oci_model.lookup_repository('devtable', 'simple')
|
|
latest_tag = oci_model.get_repo_tag(repository_ref, 'latest')
|
|
manifest = oci_model.get_manifest_for_tag(latest_tag)
|
|
|
|
tag_count = 500
|
|
|
|
# Create a bunch of tags.
|
|
tags_expected = set()
|
|
for index in range(0, tag_count):
|
|
tags_expected.add('somenewtag%s' % index)
|
|
oci_model.retarget_tag(repository_ref, 'somenewtag%s' % index, manifest, storage)
|
|
|
|
# List the tags.
|
|
tags_found = set()
|
|
tag_id = None
|
|
while True:
|
|
tags = oci_model.list_repository_tags(repository_ref, start_pagination_id=tag_id,
|
|
limit=11, sort_tags=True)
|
|
assert len(tags) <= 11
|
|
for tag in tags[0:10]:
|
|
assert tag.name not in tags_found
|
|
if tag.name in tags_expected:
|
|
tags_expected.remove(tag.name)
|
|
|
|
if len(tags) < 11:
|
|
break
|
|
|
|
tag_id = tags[10].id
|
|
|
|
# Make sure we've found all the tags.
|
|
assert not tags_expected
|