3c8b87e086
All registry_tests now pass
146 lines
6.7 KiB
Python
146 lines
6.7 KiB
Python
import unittest
|
|
import hashlib
|
|
|
|
from app import app, storage, docker_v2_signing_key
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
from data import model, database
|
|
from endpoints.v2.manifest import _write_manifest
|
|
from image.docker.schema1 import DockerSchema1ManifestBuilder
|
|
|
|
|
|
ADMIN_ACCESS_USER = 'devtable'
|
|
REPO = 'simple'
|
|
FIRST_TAG = 'first'
|
|
SECOND_TAG = 'second'
|
|
THIRD_TAG = 'third'
|
|
|
|
|
|
class TestManifests(unittest.TestCase):
|
|
@staticmethod
|
|
def _set_tag_expiration_policy(namespace, expiration_s):
|
|
namespace_user = model.user.get_user(namespace)
|
|
model.user.change_user_tag_expiration(namespace_user, expiration_s)
|
|
|
|
def setUp(self):
|
|
setup_database_for_testing(self)
|
|
|
|
self._set_tag_expiration_policy(ADMIN_ACCESS_USER, 0)
|
|
|
|
self.app = app.test_client()
|
|
self.ctx = app.test_request_context()
|
|
self.ctx.__enter__()
|
|
|
|
def tearDown(self):
|
|
finished_database_for_testing(self)
|
|
self.ctx.__exit__(True, None, None)
|
|
|
|
def _perform_cleanup(self):
|
|
database.RepositoryTag.delete().where(database.RepositoryTag.hidden == True).execute()
|
|
repo_object = model.repository.get_repository(ADMIN_ACCESS_USER, REPO)
|
|
model.repository.garbage_collect_repo(repo_object)
|
|
|
|
def test_missing_link(self):
|
|
""" Tests for a corner case that could result in missing a link to a blob referenced by a
|
|
manifest. The test exercises the case as follows:
|
|
|
|
1) Push a manifest of a single layer with a Docker ID `FIRST_ID`, pointing
|
|
to blob `FIRST_BLOB`. The database should contain the tag referencing the layer, with
|
|
no changed ID and the blob not being GCed.
|
|
|
|
2) Push a manifest of two layers:
|
|
|
|
Layer 1: `FIRST_ID` with blob `SECOND_BLOB`: Will result in a new synthesized ID
|
|
Layer 2: `SECOND_ID` with blob `THIRD_BLOB`: Will result in `SECOND_ID` pointing to the
|
|
`THIRD_BLOB`, with a parent pointing to the new synthesized ID's layer.
|
|
|
|
3) Push a manifest of two layers:
|
|
|
|
Layer 1: `THIRD_ID` with blob `FOURTH_BLOB`: Will result in a new `THIRD_ID` layer
|
|
Layer 2: `FIRST_ID` with blob `THIRD_BLOB`: Since `FIRST_ID` already points to `SECOND_BLOB`,
|
|
this will synthesize a new ID. With the current bug, the synthesized ID will match
|
|
that of `SECOND_ID`, leaving `THIRD_ID` unlinked and therefore, after a GC, missing
|
|
`FOURTH_BLOB`.
|
|
"""
|
|
location_name = storage.preferred_locations[0]
|
|
location = database.ImageStorageLocation.get(name=location_name)
|
|
|
|
# Create first blob.
|
|
first_blob_sha = 'sha256:' + hashlib.sha256("FIRST").hexdigest()
|
|
model.blob.store_blob_record_and_temp_link(ADMIN_ACCESS_USER, REPO, first_blob_sha, location, 0, 0, 0)
|
|
|
|
# Push the first manifest.
|
|
first_manifest = (DockerSchema1ManifestBuilder(ADMIN_ACCESS_USER, REPO, FIRST_TAG)
|
|
.add_layer(first_blob_sha, '{"id": "first"}')
|
|
.build(docker_v2_signing_key))
|
|
|
|
_write_manifest(ADMIN_ACCESS_USER, REPO, first_manifest)
|
|
|
|
# Delete all temp tags and perform GC.
|
|
self._perform_cleanup()
|
|
|
|
# Ensure that the first blob still exists, along with the first tag.
|
|
self.assertIsNotNone(model.blob.get_repo_blob_by_digest(ADMIN_ACCESS_USER, REPO, first_blob_sha))
|
|
self.assertIsNotNone(model.tag.load_tag_manifest(ADMIN_ACCESS_USER, REPO, FIRST_TAG))
|
|
self.assertEquals("first", model.tag.get_tag_image(ADMIN_ACCESS_USER, REPO, FIRST_TAG).docker_image_id)
|
|
|
|
# Create the second and third blobs.
|
|
second_blob_sha = 'sha256:' + hashlib.sha256("SECOND").hexdigest()
|
|
third_blob_sha = 'sha256:' + hashlib.sha256("THIRD").hexdigest()
|
|
|
|
model.blob.store_blob_record_and_temp_link(ADMIN_ACCESS_USER, REPO, second_blob_sha, location, 0, 0, 0)
|
|
model.blob.store_blob_record_and_temp_link(ADMIN_ACCESS_USER, REPO, third_blob_sha, location, 0, 0, 0)
|
|
|
|
# Push the second manifest.
|
|
second_manifest = (DockerSchema1ManifestBuilder(ADMIN_ACCESS_USER, REPO, SECOND_TAG)
|
|
.add_layer(third_blob_sha, '{"id": "second", "parent": "first"}')
|
|
.add_layer(second_blob_sha, '{"id": "first"}')
|
|
.build(docker_v2_signing_key))
|
|
|
|
_write_manifest(ADMIN_ACCESS_USER, REPO, second_manifest)
|
|
|
|
# Delete all temp tags and perform GC.
|
|
self._perform_cleanup()
|
|
|
|
# Ensure that the first and second blobs still exists, along with the second tag.
|
|
self.assertIsNotNone(model.blob.get_repo_blob_by_digest(ADMIN_ACCESS_USER, REPO, first_blob_sha))
|
|
self.assertIsNotNone(model.blob.get_repo_blob_by_digest(ADMIN_ACCESS_USER, REPO, second_blob_sha))
|
|
self.assertIsNotNone(model.blob.get_repo_blob_by_digest(ADMIN_ACCESS_USER, REPO, third_blob_sha))
|
|
|
|
self.assertIsNotNone(model.tag.load_tag_manifest(ADMIN_ACCESS_USER, REPO, FIRST_TAG))
|
|
self.assertIsNotNone(model.tag.load_tag_manifest(ADMIN_ACCESS_USER, REPO, SECOND_TAG))
|
|
|
|
self.assertEquals("first", model.tag.get_tag_image(ADMIN_ACCESS_USER, REPO, FIRST_TAG).docker_image_id)
|
|
|
|
# Ensure the IDs have changed.
|
|
self.assertNotEquals("first", model.tag.get_tag_image(ADMIN_ACCESS_USER, REPO, SECOND_TAG).parent.docker_image_id)
|
|
self.assertNotEquals("second", model.tag.get_tag_image(ADMIN_ACCESS_USER, REPO, SECOND_TAG).docker_image_id)
|
|
|
|
# Create the fourth blob.
|
|
fourth_blob_sha = 'sha256:' + hashlib.sha256("FOURTH").hexdigest()
|
|
model.blob.store_blob_record_and_temp_link(ADMIN_ACCESS_USER, REPO, fourth_blob_sha, location, 0, 0, 0)
|
|
|
|
# Push the third manifest.
|
|
third_manifest = (DockerSchema1ManifestBuilder(ADMIN_ACCESS_USER, REPO, THIRD_TAG)
|
|
.add_layer(third_blob_sha, '{"id": "second", "parent": "first"}')
|
|
.add_layer(fourth_blob_sha, '{"id": "first"}') # Note the change in BLOB from the second manifest.
|
|
.build(docker_v2_signing_key))
|
|
|
|
_write_manifest(ADMIN_ACCESS_USER, REPO, third_manifest)
|
|
|
|
# Delete all temp tags and perform GC.
|
|
self._perform_cleanup()
|
|
|
|
# Ensure all blobs are present.
|
|
self.assertIsNotNone(model.blob.get_repo_blob_by_digest(ADMIN_ACCESS_USER, REPO, first_blob_sha))
|
|
self.assertIsNotNone(model.blob.get_repo_blob_by_digest(ADMIN_ACCESS_USER, REPO, second_blob_sha))
|
|
self.assertIsNotNone(model.blob.get_repo_blob_by_digest(ADMIN_ACCESS_USER, REPO, third_blob_sha))
|
|
self.assertIsNotNone(model.blob.get_repo_blob_by_digest(ADMIN_ACCESS_USER, REPO, fourth_blob_sha))
|
|
|
|
# Ensure new synthesized IDs were created.
|
|
self.assertNotEquals(
|
|
model.tag.get_tag_image(ADMIN_ACCESS_USER, REPO, SECOND_TAG).docker_image_id,
|
|
model.tag.get_tag_image(ADMIN_ACCESS_USER, REPO, THIRD_TAG).docker_image_id)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|