diff --git a/app.py b/app.py index d87c4e89a..b42b4f5f7 100644 --- a/app.py +++ b/app.py @@ -136,6 +136,7 @@ google_login = GoogleOAuthConfig(app.config, 'GOOGLE_LOGIN_CONFIG') oauth_apps = [github_login, github_trigger, gitlab_trigger, google_login] image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'], tf) +image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf) dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf, reporter=MetricQueueReporter(metric_queue)) notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf) diff --git a/conf/init/service/storagereplication/log/run b/conf/init/service/storagereplication/log/run new file mode 100755 index 000000000..adcd2b63f --- /dev/null +++ b/conf/init/service/storagereplication/log/run @@ -0,0 +1,2 @@ +#!/bin/sh +exec logger -i -t storagereplication \ No newline at end of file diff --git a/conf/init/service/storagereplication/run b/conf/init/service/storagereplication/run new file mode 100755 index 000000000..ed62731f8 --- /dev/null +++ b/conf/init/service/storagereplication/run @@ -0,0 +1,8 @@ +#! /bin/bash + +echo 'Starting storage replication worker' + +cd / +venv/bin/python -m workers.storagereplication 2>&1 + +echo 'Repository storage replication exited' \ No newline at end of file diff --git a/config.py b/config.py index 78d614b80..a1ecb2d52 100644 --- a/config.py +++ b/config.py @@ -130,6 +130,7 @@ class DefaultConfig(object): NOTIFICATION_QUEUE_NAME = 'notification' DIFFS_QUEUE_NAME = 'imagediff' DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild' + REPLICATION_QUEUE_NAME = 'imagestoragereplication' # Super user config. Note: This MUST BE an empty list for the default config. SUPER_USERS = [] @@ -180,6 +181,9 @@ class DefaultConfig(object): # basic auth. FEATURE_REQUIRE_ENCRYPTED_BASIC_AUTH = False + # Feature Flag: Whether to automatically replicate between storage engines. + FEATURE_STORAGE_REPLICATION = False + BUILD_MANAGER = ('enterprise', {}) DISTRIBUTED_STORAGE_CONFIG = { @@ -188,6 +192,7 @@ class DefaultConfig(object): } DISTRIBUTED_STORAGE_PREFERENCE = ['local_us'] + DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS = ['local_us'] # Health checker. HEALTH_CHECKER = ('LocalHealthCheck', {}) diff --git a/data/database.py b/data/database.py index 6309c4eb4..cc802ac59 100644 --- a/data/database.py +++ b/data/database.py @@ -539,6 +539,15 @@ class ImageStoragePlacement(BaseModel): ) +class UserRegion(BaseModel): + user = QuayUserField(index=True, allows_robots=False) + location = ForeignKeyField(ImageStorageLocation) + + indexes = ( + (('user', 'location'), True), + ) + + class Image(BaseModel): # This class is intentionally denormalized. 
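Taken together, the new service under conf/init/service/storagereplication, the REPLICATION_QUEUE_NAME queue and the FEATURE_STORAGE_REPLICATION / DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS settings are everything a deployment has to touch to turn replication on. A minimal sketch of such an override, written as a DefaultConfig subclass purely for illustration; the location names are placeholders and must match keys already defined in DISTRIBUTED_STORAGE_CONFIG:

class ReplicatedConfig(DefaultConfig):
    # Enable enqueueing and processing of replication jobs.
    FEATURE_STORAGE_REPLICATION = True

    # Preference order used when choosing a location to read or write from.
    DISTRIBUTED_STORAGE_PREFERENCE = ['local_us', 'local_eu']

    # Every completed upload is replicated to these locations, even for
    # namespaces that have no UserRegion rows of their own.
    DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS = ['local_us', 'local_eu']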
Even though images are supposed # to be globally unique we can't treat them as such for permissions and @@ -751,4 +760,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification, RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage, TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind, - AccessTokenKind, Star, RepositoryActionCount, TagManifest] + AccessTokenKind, Star, RepositoryActionCount, TagManifest, UserRegion] diff --git a/data/migrations/versions/9512773a4a2_add_userregion_table.py b/data/migrations/versions/9512773a4a2_add_userregion_table.py new file mode 100644 index 000000000..212110054 --- /dev/null +++ b/data/migrations/versions/9512773a4a2_add_userregion_table.py @@ -0,0 +1,35 @@ +"""Add UserRegion table + +Revision ID: 9512773a4a2 +Revises: 499f6f08de3 +Create Date: 2015-09-01 14:17:08.628052 + +""" + +# revision identifiers, used by Alembic. +revision = '9512773a4a2' +down_revision = '499f6f08de3' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(tables): + ### commands auto generated by Alembic - please adjust! ### + op.create_table('userregion', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('location_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['location_id'], ['imagestoragelocation.id'], name=op.f('fk_userregion_location_id_imagestoragelocation')), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_userregion_user_id_user')), + sa.PrimaryKeyConstraint('id', name=op.f('pk_userregion')) + ) + op.create_index('userregion_location_id', 'userregion', ['location_id'], unique=False) + op.create_index('userregion_user_id', 'userregion', ['user_id'], unique=False) + ### end Alembic commands ### + + +def downgrade(tables): + ### commands auto generated by Alembic - please adjust! ### + op.drop_table('userregion') + ### end Alembic commands ### diff --git a/data/model/storage.py b/data/model/storage.py index d1ab07b85..97b94ed4e 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -11,6 +11,12 @@ from data.database import (ImageStorage, Image, DerivedImageStorage, ImageStorag logger = logging.getLogger(__name__) +def add_storage_placement(storage, location_name): + """ Adds a storage placement for the given storage at the given location. 
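The userregion table this migration creates is read by get_region_locations (added further down in data/model/user.py); nothing in this change writes to it, so opting a namespace into an extra region goes through the models directly. A sketch, with the username and location name as placeholders:

from data.database import User, UserRegion, ImageStorageLocation

# Illustrative: pin the 'devtable' namespace to 'local_eu' so its pushed
# layers get replicated there in addition to the default locations.
namespace_user = User.get(User.username == 'devtable', User.robot == False)
location = ImageStorageLocation.get(name='local_eu')

# The unique (user, location) index means a duplicate pairing raises an
# IntegrityError instead of silently inserting a second row.
UserRegion.create(user=namespace_user, location=location)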
""" + location = ImageStorageLocation.get(name=location_name) + ImageStoragePlacement.create(location=location, storage=storage) + + def find_or_create_derived_storage(source, transformation_name, preferred_location): existing = find_derived_storage(source, transformation_name) if existing is not None: diff --git a/data/model/user.py b/data/model/user.py index e5b34a099..1a7709ec7 100644 --- a/data/model/user.py +++ b/data/model/user.py @@ -8,7 +8,8 @@ from datetime import datetime, timedelta from data.database import (User, LoginService, FederatedLogin, RepositoryPermission, TeamMember, Team, Repository, TupleSelector, TeamRole, Namespace, Visibility, - EmailConfirmation, Role, db_for_update, random_string_generator) + EmailConfirmation, Role, db_for_update, random_string_generator, + UserRegion, ImageStorageLocation) from data.model import (DataModelException, InvalidPasswordException, InvalidRobotException, InvalidUsernameException, InvalidEmailAddressException, TooManyUsersException, TooManyLoginAttemptsException, db_transaction, @@ -463,6 +464,13 @@ def get_user_by_id(user_db_id): return None +def get_namespace_user_by_user_id(namespace_user_db_id): + try: + return User.get(User.id == namespace_user_db_id, User.robot == False) + except User.DoesNotExist: + raise InvalidUsernameException('User with id does not exist: %s' % namespace_user_db_id) + + def get_namespace_by_user_id(namespace_user_db_id): try: return User.get(User.id == namespace_user_db_id, User.robot == False).username @@ -664,3 +672,8 @@ def get_pull_credentials(robotname): 'registry': '%s://%s/v1/' % (config.app_config['PREFERRED_URL_SCHEME'], config.app_config['SERVER_HOSTNAME']), } + +def get_region_locations(user): + """ Returns the locations defined as preferred storage for the given user. """ + query = UserRegion.select().join(ImageStorageLocation).where(UserRegion.user == user) + return set([region.location.name for region in query]) diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py index 2241a6089..7267fa0df 100644 --- a/endpoints/v1/registry.py +++ b/endpoints/v1/registry.py @@ -1,12 +1,13 @@ import logging import json +import features from flask import make_response, request, session, Response, redirect, abort as flask_abort from functools import wraps from datetime import datetime from time import time -from app import storage as store, image_diff_queue, app +from app import storage as store, image_diff_queue, image_replication_queue, app from auth.auth import process_auth, extract_namespace_repo_from_session from auth.auth_context import get_authenticated_user, get_grant_user_context from digest import checksums @@ -55,6 +56,30 @@ def set_uploading_flag(repo_image, is_image_uploading): repo_image.storage.save() +def _finish_image(namespace, repository, repo_image): + # Checksum is ok, we remove the marker + set_uploading_flag(repo_image, False) + + image_id = repo_image.docker_image_id + + # The layer is ready for download, send a job to the work queue to + # process it. + logger.debug('Adding layer to diff queue') + repo = model.repository.get_repository(namespace, repository) + image_diff_queue.put([repo.namespace_user.username, repository, image_id], json.dumps({ + 'namespace_user_id': repo.namespace_user.id, + 'repository': repository, + 'image_id': image_id, + })) + + # Send a job to the work queue to replicate the image layer. 
+ if features.STORAGE_REPLICATION: + image_replication_queue.put([repo_image.storage.uuid], json.dumps({ + 'namespace_user_id': repo.namespace_user.id, + 'storage_id': repo_image.storage.uuid, + })) + + def require_completion(f): """This make sure that the image push correctly finished.""" @wraps(f) @@ -260,18 +285,8 @@ def put_image_layer(namespace, repository, image_id): abort(400, 'Checksum mismatch; ignoring the layer for image %(image_id)s', issue='checksum-mismatch', image_id=image_id) - # Checksum is ok, we remove the marker - set_uploading_flag(repo_image, False) - - # The layer is ready for download, send a job to the work queue to - # process it. - logger.debug('Adding layer to diff queue') - repo = model.repository.get_repository(namespace, repository) - image_diff_queue.put([repo.namespace_user.username, repository, image_id], json.dumps({ - 'namespace_user_id': repo.namespace_user.id, - 'repository': repository, - 'image_id': image_id, - })) + # Mark the image as uploaded. + _finish_image(namespace, repository, repo_image) return make_response('true', 200) @@ -335,18 +350,8 @@ def put_image_checksum(namespace, repository, image_id): abort(400, 'Checksum mismatch for image: %(image_id)s', issue='checksum-mismatch', image_id=image_id) - # Checksum is ok, we remove the marker - set_uploading_flag(repo_image, False) - - # The layer is ready for download, send a job to the work queue to - # process it. - logger.debug('Adding layer to diff queue') - repo = model.repository.get_repository(namespace, repository) - image_diff_queue.put([repo.namespace_user.username, repository, image_id], json.dumps({ - 'namespace_user_id': repo.namespace_user.id, - 'repository': repository, - 'image_id': image_id, - })) + # Mark the image as uploaded. + _finish_image(namespace, repository, repo_image) return make_response('true', 200) diff --git a/initdb.py b/initdb.py index 2de817bca..be50d12c4 100644 --- a/initdb.py +++ b/initdb.py @@ -19,6 +19,7 @@ from data.database import (db, all_models, Role, TeamRole, Visibility, LoginServ ExternalNotificationEvent, ExternalNotificationMethod, NotificationKind) from data import model from app import app, storage as store +from storage.basestorage import StoragePaths from workers import repositoryactioncounter @@ -84,6 +85,17 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map): new_image.storage.checksum = checksum new_image.storage.save() + # Write some data for the storage. 
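With both put_image_layer and put_image_checksum funnelled through _finish_image, the replication job is enqueued once, from whichever endpoint completes the upload, and only when the feature flag is on. After the queue round trip, the worker added later in this patch receives the decoded body; a sketch with placeholder values, driving it directly as one might from a test:

from app import image_replication_queue
from workers.storagereplication import StorageReplicationWorker

job_details = {
    'namespace_user_id': 123,            # repo.namespace_user.id at push time
    'storage_id': 'some-storage-uuid',   # repo_image.storage.uuid
}

worker = StorageReplicationWorker(image_replication_queue, poll_period_seconds=10)
worker.process_queue_item(job_details)   # returns True unless a copy fails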
+ if os.environ.get('WRITE_STORAGE_FILES'): + storage_paths = StoragePaths() + paths = [storage_paths.image_json_path, + storage_paths.image_ancestry_path, + storage_paths.image_layer_path] + + for path_builder in paths: + path = path_builder(new_image.storage.uuid) + store.put_content('local_us', path, checksum) + creation_time = REFERENCE_DATE + timedelta(weeks=image_num) + timedelta(days=model_num) command_list = SAMPLE_CMDS[image_num % len(SAMPLE_CMDS)] command = json.dumps(command_list) if command_list else None diff --git a/storage/__init__.py b/storage/__init__.py index 69f26def4..354430603 100644 --- a/storage/__init__.py +++ b/storage/__init__.py @@ -39,7 +39,8 @@ class Storage(object): if not preference: preference = storages.keys() - d_storage = DistributedStorage(storages, preference) + default_locations = app.config.get('DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS') or [] + d_storage = DistributedStorage(storages, preference, default_locations) # register extension with app app.extensions = getattr(app, 'extensions', {}) diff --git a/storage/basestorage.py b/storage/basestorage.py index 756734241..e085a5a08 100644 --- a/storage/basestorage.py +++ b/storage/basestorage.py @@ -98,6 +98,9 @@ class BaseStorage(StoragePaths): def get_checksum(self, path): raise NotImplementedError + def copy_to(self, destination, path): + raise NotImplementedError + class DigestInvalidException(RuntimeError): pass @@ -119,6 +122,3 @@ class BaseStorageV2(BaseStorage): """ Complete the chunked upload and store the final results in the path indicated. """ raise NotImplementedError - - - diff --git a/storage/cloud.py b/storage/cloud.py index 418a930ac..89ebf53aa 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -222,6 +222,28 @@ class _CloudStorage(BaseStorage): return k.etag[1:-1][:7] + def copy_to(self, destination, path): + # First try to copy directly via boto, but only if the storages are the + # same type, with the same access information. + if (self.__class__ == destination.__class__ and + self._access_key == destination._access_key and + self._secret_key == destination._secret_key): + logger.debug('Copying file from %s to %s via a direct boto copy', self._cloud_bucket, + destination._cloud_bucket) + + source_path = self._init_path(path) + source_key = self._key_class(self._cloud_bucket, source_path) + + dest_path = destination._init_path(path) + source_key.copy(destination._cloud_bucket, dest_path) + return + + # Fallback to a slower, default copy. 
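The direct boto copy above only applies when source and destination are the same storage class using the same access and secret keys; any other pairing (S3 to GCS, different accounts, cloud to local) goes through the streamed fallback below, which pulls the object through the registry host itself. A new engine can participate in replication by implementing the same contract; a sketch with a hypothetical class:

class MyBlobStorage(BaseStorage):
    # stream_read_file / stream_write assumed implemented elsewhere.

    def copy_to(self, destination, path):
        # No provider-side copy available, so stream the object through
        # this process, exactly as the generic fallback does.
        with self.stream_read_file(path) as fp:
            destination.stream_write(path, fp)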
+ logger.debug('Copying file from %s to %s via a streamed copy', self._cloud_bucket, + destination) + with self.stream_read_file(path) as fp: + destination.stream_write(path, fp) + class S3Storage(_CloudStorage): def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket): @@ -252,7 +274,6 @@ class S3Storage(_CloudStorage): """) - class GoogleCloudStorage(_CloudStorage): def __init__(self, storage_path, access_key, secret_key, bucket_name): upload_params = {} diff --git a/storage/distributedstorage.py b/storage/distributedstorage.py index 26a0f5dbd..49f32c559 100644 --- a/storage/distributedstorage.py +++ b/storage/distributedstorage.py @@ -26,9 +26,15 @@ def _location_aware(unbound_func): class DistributedStorage(StoragePaths): - def __init__(self, storages, preferred_locations=[]): + def __init__(self, storages, preferred_locations=[], default_locations=[]): self._storages = dict(storages) self.preferred_locations = list(preferred_locations) + self.default_locations = list(default_locations) + + @property + def locations(self): + """ Returns the names of the locations supported. """ + return list(self._storages.keys()) get_direct_download_url = _location_aware(BaseStorage.get_direct_download_url) get_direct_upload_url = _location_aware(BaseStorage.get_direct_upload_url) @@ -42,6 +48,14 @@ class DistributedStorage(StoragePaths): remove = _location_aware(BaseStorage.remove) get_checksum = _location_aware(BaseStorage.get_checksum) get_supports_resumable_downloads = _location_aware(BaseStorage.get_supports_resumable_downloads) + initiate_chunked_upload = _location_aware(BaseStorageV2.initiate_chunked_upload) stream_upload_chunk = _location_aware(BaseStorageV2.stream_upload_chunk) complete_chunked_upload = _location_aware(BaseStorageV2.complete_chunked_upload) + + def copy_between(self, path, source_location, destination_location): + """ Copies a file between the source location and the destination location. 
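copy_between keeps location names as the public interface, so callers hand over two names and a path and never touch the engine objects themselves; together with the new locations property it is all the replication worker needs from the storage layer. A usage sketch against the application's DistributedStorage instance, with placeholder names:

from app import storage

path = storage.image_layer_path('some-storage-uuid')   # any StoragePaths-built path

if 'local_eu' in storage.locations:
    storage.copy_between(path, 'local_us', 'local_eu')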
""" + source_storage = self._storages[source_location] + destination_storage = self._storages[destination_location] + source_storage.copy_to(destination_storage, path) + diff --git a/storage/fakestorage.py b/storage/fakestorage.py index f351ca150..b4f27be32 100644 --- a/storage/fakestorage.py +++ b/storage/fakestorage.py @@ -1,4 +1,5 @@ from storage.basestorage import BaseStorage +from cStringIO import StringIO _FAKE_STORAGE_MAP = {} @@ -18,6 +19,9 @@ class FakeStorage(BaseStorage): def stream_read(self, path): yield _FAKE_STORAGE_MAP[path] + def stream_read_file(self, path): + return StringIO(_FAKE_STORAGE_MAP[path]) + def stream_write(self, path, fp, content_type=None, content_encoding=None): _FAKE_STORAGE_MAP[path] = fp.read() diff --git a/storage/local.py b/storage/local.py index fc2b79563..b2cbc458c 100644 --- a/storage/local.py +++ b/storage/local.py @@ -112,11 +112,9 @@ class LocalStorage(BaseStorageV2): sha_hash.update(buf) return sha_hash.hexdigest()[:7] - def _rel_upload_path(self, uuid): return 'uploads/{0}'.format(uuid) - def initiate_chunked_upload(self): new_uuid = str(uuid4()) @@ -162,3 +160,7 @@ class LocalStorage(BaseStorageV2): raise Exception('Storage path %s is not under a mounted volume.\n\n' 'Registry data must be stored under a mounted volume ' 'to prevent data loss' % self._root_path) + + def copy_to(self, destination, path): + with self.stream_read_file(path) as fp: + destination.stream_write(path, fp) diff --git a/test/data/test.db b/test/data/test.db index 45095cb99..ec9242221 100644 Binary files a/test/data/test.db and b/test/data/test.db differ diff --git a/workers/storagereplication.py b/workers/storagereplication.py new file mode 100644 index 000000000..9ddb192b7 --- /dev/null +++ b/workers/storagereplication.py @@ -0,0 +1,86 @@ +import logging +import features +import time + +from app import app, storage, image_replication_queue +from data.database import UseThenDisconnect, CloseForLongOperation +from data import model +from storage.basestorage import StoragePaths +from workers.queueworker import QueueWorker + +logger = logging.getLogger(__name__) + +POLL_PERIOD_SECONDS = 10 + +class StorageReplicationWorker(QueueWorker): + def process_queue_item(self, job_details): + storage_uuid = job_details['storage_id'] + logger.debug('Starting replication of image storage %s', storage_uuid) + + namespace = model.user.get_namespace_user_by_user_id(job_details['namespace_user_id']) + + # Lookup the namespace and its associated regions. + if not namespace: + logger.debug('Unknown namespace: %s', namespace) + return True + + locations = model.user.get_region_locations(namespace) + + # Lookup the image storage. + partial_storage = model.storage.get_storage_by_uuid(storage_uuid) + if not partial_storage: + logger.debug('Unknown storage: %s', storage_uuid) + return True + + # Check to see if the image is at all the required locations. + locations_required = locations | set(storage.default_locations) + locations_missing = locations_required - set(partial_storage.locations) + + if not locations_missing: + logger.debug('No missing locations for storage %s under namespace %s', + storage_uuid, namespace.username) + return True + + # For any missing storage locations, initiate a copy. + storage_paths = StoragePaths() + existing_location = list(partial_storage.locations)[0] + + for location in locations_missing: + logger.debug('Copying image storage %s to location %s', partial_storage.uuid, location) + + # Copy the various paths. 
+ paths = [storage_paths.image_json_path, + storage_paths.image_ancestry_path, + storage_paths.image_layer_path] + + try: + for path_builder in paths: + current_path = path_builder(partial_storage.uuid) + with CloseForLongOperation(app.config): + storage.copy_between(current_path, existing_location, location) + except: + logger.exception('Exception when copying path %s of image storage %s to location %s', + current_path, partial_storage.uuid, location) + return False + + # Create the storage location record for the storage now that the copies have + # completed. + model.storage.add_storage_placement(partial_storage, location) + logger.debug('Finished copy of image storage %s to location %s', + partial_storage.uuid, location) + + logger.debug('Completed replication of image storage %s to locations %s', + partial_storage.uuid, locations_missing) + return True + + +if __name__ == "__main__": + if not features.STORAGE_REPLICATION: + logger.debug('Full storage replication disabled; skipping') + while True: + time.sleep(10000) + + logger.debug('Starting replication worker') + worker = StorageReplicationWorker(image_replication_queue, + poll_period_seconds=POLL_PERIOD_SECONDS) + worker.start()
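The sleep loop at the bottom keeps the storagereplication service defined under conf/init resident when FEATURE_STORAGE_REPLICATION is off, rather than letting the init supervisor restart an immediately-exiting process over and over. For a manual smoke test on a deployment with the feature enabled, a job can be enqueued by hand the same way _finish_image does it; the id and uuid below are placeholders for an existing namespace and ImageStorage row:

import json
from app import image_replication_queue

image_replication_queue.put(['some-storage-uuid'], json.dumps({
    'namespace_user_id': 123,
    'storage_id': 'some-storage-uuid',
}))

The worker should then copy any missing locations for that storage and record an ImageStoragePlacement row for each one.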