diff --git a/data/model/storage.py b/data/model/storage.py
index 0f2220f91..e22ca58a3 100644
--- a/data/model/storage.py
+++ b/data/model/storage.py
@@ -180,7 +180,8 @@ def lookup_repo_storages_by_content_checksum(repo, checksums):
   for counter, checksum in enumerate(set(checksums)):
     query_alias = 'q{0}'.format(counter)
     candidate_subq = (ImageStorage
-                      .select(ImageStorage.id, ImageStorage.content_checksum, ImageStorage.image_size)
+                      .select(ImageStorage.id, ImageStorage.content_checksum,
+                              ImageStorage.image_size, ImageStorage.uuid)
                       .join(Image)
                       .where(Image.repository == repo, ImageStorage.content_checksum == checksum)
                       .limit(1)
diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py
index 13b89913d..5bcdd89aa 100644
--- a/endpoints/v1/registry.py
+++ b/endpoints/v1/registry.py
@@ -1,13 +1,12 @@
 import logging
 import json
-import features
 
 from flask import make_response, request, session, Response, redirect, abort as flask_abort
 from functools import wraps
 from datetime import datetime
 from time import time
 
-from app import storage as store, image_replication_queue, app
+from app import storage as store, app
 from auth.auth import process_auth, extract_namespace_repo_from_session
 from auth.auth_context import get_authenticated_user
 from auth.registry_jwt_auth import get_granted_username
@@ -21,6 +20,7 @@ from util.registry import gzipstream
 from util.registry.torrent import PieceHasher
 from endpoints.v1 import v1_bp
 from endpoints.decorators import anon_protect
+from util.registry.replication import queue_storage_replication
 
 
 logger = logging.getLogger(__name__)
@@ -43,12 +43,7 @@ def _finish_image(namespace, repository, repo_image):
   set_uploading_flag(repo_image, False)
 
   # Send a job to the work queue to replicate the image layer.
-  if features.STORAGE_REPLICATION:
-    repo = model.repository.get_repository(namespace, repository)
-    image_replication_queue.put([repo_image.storage.uuid], json.dumps({
-      'namespace_user_id': repo.namespace_user.id,
-      'storage_id': repo_image.storage.uuid,
-    }))
+  queue_storage_replication(namespace, repo_image.storage)
 
 
 def require_completion(f):
diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py
index 45eae6207..43547a1a9 100644
--- a/endpoints/v2/manifest.py
+++ b/endpoints/v2/manifest.py
@@ -23,6 +23,7 @@ from endpoints.v2.errors import (BlobUnknown, ManifestInvalid, ManifestUnknown,
                                  NameInvalid)
 from endpoints.trackhelper import track_and_log
 from endpoints.notificationhelper import spawn_notification
+from util.registry.replication import queue_storage_replication
 from digest import digest_tools
 from data import model
 from data.database import RepositoryTag
@@ -441,7 +442,6 @@ def _write_manifest_itself(namespace_name, repo_name, manifest):
     image = model.image.synthesize_v1_image(repo, blob_storage, working_docker_id, v1_mdata.created,
                                             v1_mdata.comment, v1_mdata.command, v1_metadata_json,
                                             parent_image)
-
     images_map[v1_mdata.docker_id] = image
 
   if not layers:
@@ -453,6 +453,15 @@ def _write_manifest_itself(namespace_name, repo_name, manifest):
 
   leaf_layer_id = images_map[layers[-1].v1_metadata.docker_id].docker_image_id
   model.tag.store_tag_manifest(namespace_name, repo_name, tag_name, leaf_layer_id, manifest_digest,
                                manifest.bytes)
+
+  # Queue all blob manifests for replication.
+  # TODO(jschorr): Find a way to optimize this insertion.
+  if features.STORAGE_REPLICATION:
+    for mdata in layers:
+      digest_str = str(mdata.digest)
+      blob_storage = storage_map.get(digest_str)
+      queue_storage_replication(namespace_name, blob_storage)
+
   return (repo, tag_name, manifest_digest)
diff --git a/test/registry_tests.py b/test/registry_tests.py
index 1776cee81..559be2cef 100644
--- a/test/registry_tests.py
+++ b/test/registry_tests.py
@@ -12,14 +12,14 @@ import gpgme
 import Crypto.Random
 
 from cachetools import lru_cache
-from flask import request, jsonify
+from flask import request, jsonify, abort
 from flask.blueprints import Blueprint
 from flask.ext.testing import LiveServerTestCase
 from cryptography.x509 import load_pem_x509_certificate
 from cryptography.hazmat.backends import default_backend
 
 from app import app, storage
-from data.database import close_db_filter, configure, DerivedStorageForImage
+from data.database import close_db_filter, configure, DerivedStorageForImage, QueueItem, Image
 from data import model
 from endpoints.v1 import v1_bp
 from endpoints.v2 import v2_bp
@@ -75,6 +75,13 @@ def set_fakestorage_directdownload(enabled):
   return 'OK'
 
 
+@testbp.route('/storagerepentry/<image_id>', methods=['GET'])
+def get_storage_replication_entry(image_id):
+  image = Image.get(docker_image_id=image_id)
+  QueueItem.select().where(QueueItem.queue_name ** ('%' + image.storage.uuid + '%')).get()
+  return 'OK'
+
+
 @testbp.route('/feature/<feature_name>', methods=['POST'])
 def set_feature(feature_name):
   import features
@@ -1041,12 +1048,37 @@ class RegistryTestsMixin(object):
     self.do_pull('', 'newrepo', 'devtable', 'password')
     self.do_pull('library', 'newrepo', 'devtable', 'password')
 
+
   def test_library_disabled(self):
     with TestFeature(self, 'LIBRARY_SUPPORT', False):
       self.do_push('library', 'newrepo', 'devtable', 'password')
       self.do_pull('library', 'newrepo', 'devtable', 'password')
 
+  def test_image_replication(self):
+    with TestFeature(self, 'STORAGE_REPLICATION', True):
+      images = [
+        {
+          'id': 'baseid',
+          'contents': 'The base image',
+        },
+        {
+          'id': 'latestid',
+          'contents': 'The latest image',
+          'unicode': u'the Pawe\xc5\x82 Kami\xc5\x84ski image',
+          'parent': 'baseid',
+        },
+      ]
+
+      # Push a new repository.
+      self.do_push('public', 'newrepo', 'public', 'password', images=images)
+
+      # Ensure that we have a storage replication entry for each image pushed.
+      self.conduct('GET', '/__test/storagerepentry/baseid', expected_code=200)
+      self.conduct('GET', '/__test/storagerepentry/latestid', expected_code=200)
+
+
 class V1RegistryTests(V1RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
                       RegistryTestCaseMixin, LiveServerTestCase):
   """ Tests for V1 registry. """
""" diff --git a/util/backfillreplication.py b/util/backfillreplication.py new file mode 100644 index 000000000..f13c61244 --- /dev/null +++ b/util/backfillreplication.py @@ -0,0 +1,30 @@ +import logging +import features + +from endpoints.replication import queue_storage_replication +from data.database import Image, ImageStorage, Repository, User + +def backfill_replication(): + encountered = set() + query = (Image.select(Image, ImageStorage, Repository, User) + .join(ImageStorage) + .switch(Image) + .join(Repository) + .join(User)) + + for image in query: + if image.storage.uuid in encountered: + continue + + print "Enqueueing image storage %s to be replicated" % (image.storage.uuid) + encountered.add(image.storage.uuid) + queue_storage_replication(image.repository.namespace_user.username, image.storage) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + if not features.STORAGE_REPLICATION: + print "Storage replication is not enabled" + else: + backfill_replication() + diff --git a/util/registry/replication.py b/util/registry/replication.py new file mode 100644 index 000000000..b20be90f8 --- /dev/null +++ b/util/registry/replication.py @@ -0,0 +1,14 @@ +import features +import json +from data import model + +from app import image_replication_queue + +def queue_storage_replication(namespace, storage): + """ Queues replication for the given image storage under the given namespace (if enabled). """ + if features.STORAGE_REPLICATION: + namespace_user = model.user.get_namespace_user(namespace) + image_replication_queue.put([storage.uuid], json.dumps({ + 'namespace_user_id': namespace_user.id, + 'storage_id': storage.uuid, + }))