Enable storage replication for V2 and add backfill tool

Fixes #1501
This commit is contained in:
Joseph Schorr 2016-05-31 16:43:49 -04:00
parent 1ddc73416c
commit 12924784ce
6 changed files with 93 additions and 12 deletions

View file

@ -180,7 +180,8 @@ def lookup_repo_storages_by_content_checksum(repo, checksums):
for counter, checksum in enumerate(set(checksums)):
query_alias = 'q{0}'.format(counter)
candidate_subq = (ImageStorage
.select(ImageStorage.id, ImageStorage.content_checksum, ImageStorage.image_size)
.select(ImageStorage.id, ImageStorage.content_checksum,
ImageStorage.image_size, ImageStorage.uuid)
.join(Image)
.where(Image.repository == repo, ImageStorage.content_checksum == checksum)
.limit(1)

View file

@ -1,13 +1,12 @@
import logging
import json
import features
from flask import make_response, request, session, Response, redirect, abort as flask_abort
from functools import wraps
from datetime import datetime
from time import time
from app import storage as store, image_replication_queue, app
from app import storage as store, app
from auth.auth import process_auth, extract_namespace_repo_from_session
from auth.auth_context import get_authenticated_user
from auth.registry_jwt_auth import get_granted_username
@ -21,6 +20,7 @@ from util.registry import gzipstream
from util.registry.torrent import PieceHasher
from endpoints.v1 import v1_bp
from endpoints.decorators import anon_protect
from util.registry.replication import queue_storage_replication
logger = logging.getLogger(__name__)
@ -43,12 +43,7 @@ def _finish_image(namespace, repository, repo_image):
set_uploading_flag(repo_image, False)
# Send a job to the work queue to replicate the image layer.
if features.STORAGE_REPLICATION:
repo = model.repository.get_repository(namespace, repository)
image_replication_queue.put([repo_image.storage.uuid], json.dumps({
'namespace_user_id': repo.namespace_user.id,
'storage_id': repo_image.storage.uuid,
}))
queue_storage_replication(namespace, repo_image.storage)
def require_completion(f):

View file

@ -23,6 +23,7 @@ from endpoints.v2.errors import (BlobUnknown, ManifestInvalid, ManifestUnknown,
NameInvalid)
from endpoints.trackhelper import track_and_log
from endpoints.notificationhelper import spawn_notification
from util.registry.replication import queue_storage_replication
from digest import digest_tools
from data import model
from data.database import RepositoryTag
@ -441,7 +442,6 @@ def _write_manifest_itself(namespace_name, repo_name, manifest):
image = model.image.synthesize_v1_image(repo, blob_storage, working_docker_id,
v1_mdata.created, v1_mdata.comment, v1_mdata.command,
v1_metadata_json, parent_image)
images_map[v1_mdata.docker_id] = image
if not layers:
@ -453,6 +453,15 @@ def _write_manifest_itself(namespace_name, repo_name, manifest):
leaf_layer_id = images_map[layers[-1].v1_metadata.docker_id].docker_image_id
model.tag.store_tag_manifest(namespace_name, repo_name, tag_name, leaf_layer_id, manifest_digest,
manifest.bytes)
# Queue all blob manifests for replication.
# TODO(jschorr): Find a way to optimize this insertion.
if features.STORAGE_REPLICATION:
for mdata in layers:
digest_str = str(mdata.digest)
blob_storage = storage_map.get(digest_str)
queue_storage_replication(namespace_name, blob_storage)
return (repo, tag_name, manifest_digest)

View file

@ -12,14 +12,14 @@ import gpgme
import Crypto.Random
from cachetools import lru_cache
from flask import request, jsonify
from flask import request, jsonify, abort
from flask.blueprints import Blueprint
from flask.ext.testing import LiveServerTestCase
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend
from app import app, storage
from data.database import close_db_filter, configure, DerivedStorageForImage
from data.database import close_db_filter, configure, DerivedStorageForImage, QueueItem, Image
from data import model
from endpoints.v1 import v1_bp
from endpoints.v2 import v2_bp
@ -75,6 +75,13 @@ def set_fakestorage_directdownload(enabled):
return 'OK'
@testbp.route('/storagerepentry/<image_id>', methods=['GET'])
def get_storage_replication_entry(image_id):
image = Image.get(docker_image_id=image_id)
QueueItem.select().where(QueueItem.queue_name ** ('%' + image.storage.uuid + '%')).get()
return 'OK'
@testbp.route('/feature/<feature_name>', methods=['POST'])
def set_feature(feature_name):
import features
@ -1041,12 +1048,37 @@ class RegistryTestsMixin(object):
self.do_pull('', 'newrepo', 'devtable', 'password')
self.do_pull('library', 'newrepo', 'devtable', 'password')
def test_library_disabled(self):
with TestFeature(self, 'LIBRARY_SUPPORT', False):
self.do_push('library', 'newrepo', 'devtable', 'password')
self.do_pull('library', 'newrepo', 'devtable', 'password')
def test_image_replication(self):
with TestFeature(self, 'STORAGE_REPLICATION', True):
images = [
{
'id': 'baseid',
'contents': 'The base image',
},
{
'id': 'latestid',
'contents': 'The latest image',
'unicode': u'the Pawe\xc5\x82 Kami\xc5\x84ski image',
'parent': 'baseid',
},
]
# Push a new repository.
self.do_push('public', 'newrepo', 'public', 'password', images=images)
# Ensure that we have a storage replication entry for each image pushed.
self.conduct('GET', '/__test/storagerepentry/baseid', expected_code=200)
self.conduct('GET', '/__test/storagerepentry/latestid', expected_code=200)
class V1RegistryTests(V1RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
RegistryTestCaseMixin, LiveServerTestCase):
""" Tests for V1 registry. """

View file

@ -0,0 +1,30 @@
import logging
import features
from endpoints.replication import queue_storage_replication
from data.database import Image, ImageStorage, Repository, User
def backfill_replication():
encountered = set()
query = (Image.select(Image, ImageStorage, Repository, User)
.join(ImageStorage)
.switch(Image)
.join(Repository)
.join(User))
for image in query:
if image.storage.uuid in encountered:
continue
print "Enqueueing image storage %s to be replicated" % (image.storage.uuid)
encountered.add(image.storage.uuid)
queue_storage_replication(image.repository.namespace_user.username, image.storage)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
if not features.STORAGE_REPLICATION:
print "Storage replication is not enabled"
else:
backfill_replication()

View file

@ -0,0 +1,14 @@
import features
import json
from data import model
from app import image_replication_queue
def queue_storage_replication(namespace, storage):
""" Queues replication for the given image storage under the given namespace (if enabled). """
if features.STORAGE_REPLICATION:
namespace_user = model.user.get_namespace_user(namespace)
image_replication_queue.put([storage.uuid], json.dumps({
'namespace_user_id': namespace_user.id,
'storage_id': storage.uuid,
}))