Merge pull request #191 from coreos-inc/carmen

Add automatic storage replication
josephschorr 2015-09-01 15:04:36 -04:00
commit 62ea4a6cf4
18 changed files with 259 additions and 35 deletions

app.py

@@ -136,6 +136,7 @@ google_login = GoogleOAuthConfig(app.config, 'GOOGLE_LOGIN_CONFIG')
oauth_apps = [github_login, github_trigger, gitlab_trigger, google_login]
image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'], tf)
image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf)
dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf,
reporter=MetricQueueReporter(metric_queue))
notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf)

conf/init/service/storagereplication/log/run

@@ -0,0 +1,2 @@
#!/bin/sh
exec logger -i -t storagereplication

conf/init/service/storagereplication/run

@@ -0,0 +1,8 @@
#! /bin/bash
echo 'Starting storage replication worker'
cd /
venv/bin/python -m workers.storagereplication 2>&1
echo 'Repository storage replication exited'

config.py

@@ -130,6 +130,7 @@ class DefaultConfig(object):
NOTIFICATION_QUEUE_NAME = 'notification'
DIFFS_QUEUE_NAME = 'imagediff'
DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild'
REPLICATION_QUEUE_NAME = 'imagestoragereplication'
# Super user config. Note: This MUST BE an empty list for the default config.
SUPER_USERS = []
@@ -180,6 +181,9 @@ class DefaultConfig(object):
# basic auth.
FEATURE_REQUIRE_ENCRYPTED_BASIC_AUTH = False
# Feature Flag: Whether to automatically replicate between storage engines.
FEATURE_STORAGE_REPLICATION = False
BUILD_MANAGER = ('enterprise', {})
DISTRIBUTED_STORAGE_CONFIG = {
@@ -188,6 +192,7 @@
}
DISTRIBUTED_STORAGE_PREFERENCE = ['local_us']
DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS = ['local_us']
# Health checker.
HEALTH_CHECKER = ('LocalHealthCheck', {})
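
A minimal sketch of a deployment config exercising the new flag; the keys mirror those added above, but the engine names, classes, and paths are illustrative, not values from this commit:

FEATURE_STORAGE_REPLICATION = True

# Two hypothetical storage engines; replication copies layer data between them.
DISTRIBUTED_STORAGE_CONFIG = {
  'local_us': ['LocalStorage', {'storage_path': '/datastorage/us'}],
  'local_eu': ['LocalStorage', {'storage_path': '/datastorage/eu'}],
}
DISTRIBUTED_STORAGE_PREFERENCE = ['local_us']
DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS = ['local_us', 'local_eu']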

data/database.py

@@ -539,6 +539,15 @@ class ImageStoragePlacement(BaseModel):
)
class UserRegion(BaseModel):
user = QuayUserField(index=True, allows_robots=False)
location = ForeignKeyField(ImageStorageLocation)
indexes = (
(('user', 'location'), True),
)
class Image(BaseModel):
# This class is intentionally denormalized. Even though images are supposed
# to be globally unique we can't treat them as such for permissions and
@@ -751,4 +760,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage,
TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind,
AccessTokenKind, Star, RepositoryActionCount, TagManifest]
AccessTokenKind, Star, RepositoryActionCount, TagManifest, UserRegion]
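
A small usage sketch for the new UserRegion model, assuming an existing user and location row (both names here are hypothetical); the composite unique index on (user, location) turns a duplicate preference into an integrity error rather than a silent double insert:

from data.database import User, ImageStorageLocation, UserRegion

user = User.get(User.username == 'someuser')          # hypothetical user
location = ImageStorageLocation.get(name='local_eu')  # hypothetical location row

# Record that this namespace wants its storage replicated to local_eu.
UserRegion.create(user=user, location=location)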

data/migrations/versions/9512773a4a2_add_userregion_table.py

@@ -0,0 +1,35 @@
"""Add UserRegion table
Revision ID: 9512773a4a2
Revises: 499f6f08de3
Create Date: 2015-09-01 14:17:08.628052
"""
# revision identifiers, used by Alembic.
revision = '9512773a4a2'
down_revision = '499f6f08de3'
from alembic import op
import sqlalchemy as sa
def upgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.create_table('userregion',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('location_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['location_id'], ['imagestoragelocation.id'], name=op.f('fk_userregion_location_id_imagestoragelocation')),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_userregion_user_id_user')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_userregion'))
)
op.create_index('userregion_location_id', 'userregion', ['location_id'], unique=False)
op.create_index('userregion_user_id', 'userregion', ['user_id'], unique=False)
### end Alembic commands ###
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.drop_table('userregion')
### end Alembic commands ###

data/model/storage.py

@@ -11,6 +11,12 @@ from data.database import (ImageStorage, Image, DerivedImageStorage, ImageStorag
logger = logging.getLogger(__name__)
def add_storage_placement(storage, location_name):
""" Adds a storage placement for the given storage at the given location. """
location = ImageStorageLocation.get(name=location_name)
ImageStoragePlacement.create(location=location, storage=storage)
def find_or_create_derived_storage(source, transformation_name, preferred_location):
existing = find_derived_storage(source, transformation_name)
if existing is not None:

data/model/user.py

@@ -8,7 +8,8 @@ from datetime import datetime, timedelta
from data.database import (User, LoginService, FederatedLogin, RepositoryPermission, TeamMember,
Team, Repository, TupleSelector, TeamRole, Namespace, Visibility,
EmailConfirmation, Role, db_for_update, random_string_generator)
EmailConfirmation, Role, db_for_update, random_string_generator,
UserRegion, ImageStorageLocation)
from data.model import (DataModelException, InvalidPasswordException, InvalidRobotException,
InvalidUsernameException, InvalidEmailAddressException,
TooManyUsersException, TooManyLoginAttemptsException, db_transaction,
@@ -463,6 +464,13 @@ def get_user_by_id(user_db_id):
return None
def get_namespace_user_by_user_id(namespace_user_db_id):
try:
return User.get(User.id == namespace_user_db_id, User.robot == False)
except User.DoesNotExist:
raise InvalidUsernameException('User with id does not exist: %s' % namespace_user_db_id)
def get_namespace_by_user_id(namespace_user_db_id):
try:
return User.get(User.id == namespace_user_db_id, User.robot == False).username
@@ -664,3 +672,8 @@ def get_pull_credentials(robotname):
'registry': '%s://%s/v1/' % (config.app_config['PREFERRED_URL_SCHEME'],
config.app_config['SERVER_HOSTNAME']),
}
def get_region_locations(user):
""" Returns the locations defined as preferred storage for the given user. """
query = UserRegion.select().join(ImageStorageLocation).where(UserRegion.user == user)
return set([region.location.name for region in query])
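
Because get_region_locations returns a plain set, callers can combine it with deployment-wide defaults using set arithmetic; a sketch of the pattern the replication worker below relies on (variable names are illustrative):

user_locations = get_region_locations(namespace_user)       # e.g. {'local_eu'}
required = user_locations | set(storage.default_locations)  # union with deploy defaults
missing = required - set(image_storage.locations)           # copies still needed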

endpoints/registry.py

@@ -1,12 +1,13 @@
import logging
import json
import features
from flask import make_response, request, session, Response, redirect, abort as flask_abort
from functools import wraps
from datetime import datetime
from time import time
from app import storage as store, image_diff_queue, app
from app import storage as store, image_diff_queue, image_replication_queue, app
from auth.auth import process_auth, extract_namespace_repo_from_session
from auth.auth_context import get_authenticated_user, get_grant_user_context
from digest import checksums
@@ -55,6 +56,30 @@ def set_uploading_flag(repo_image, is_image_uploading):
repo_image.storage.save()
def _finish_image(namespace, repository, repo_image):
# Checksum is ok, we remove the marker
set_uploading_flag(repo_image, False)
image_id = repo_image.docker_image_id
# The layer is ready for download, send a job to the work queue to
# process it.
logger.debug('Adding layer to diff queue')
repo = model.repository.get_repository(namespace, repository)
image_diff_queue.put([repo.namespace_user.username, repository, image_id], json.dumps({
'namespace_user_id': repo.namespace_user.id,
'repository': repository,
'image_id': image_id,
}))
# Send a job to the work queue to replicate the image layer.
if features.STORAGE_REPLICATION:
image_replication_queue.put([repo_image.storage.uuid], json.dumps({
'namespace_user_id': repo.namespace_user.id,
'storage_id': repo_image.storage.uuid,
}))
def require_completion(f):
"""This make sure that the image push correctly finished."""
@wraps(f)
@@ -260,18 +285,8 @@ def put_image_layer(namespace, repository, image_id):
abort(400, 'Checksum mismatch; ignoring the layer for image %(image_id)s',
issue='checksum-mismatch', image_id=image_id)
# Checksum is ok, we remove the marker
set_uploading_flag(repo_image, False)
# The layer is ready for download, send a job to the work queue to
# process it.
logger.debug('Adding layer to diff queue')
repo = model.repository.get_repository(namespace, repository)
image_diff_queue.put([repo.namespace_user.username, repository, image_id], json.dumps({
'namespace_user_id': repo.namespace_user.id,
'repository': repository,
'image_id': image_id,
}))
# Mark the image as uploaded.
_finish_image(namespace, repository, repo_image)
return make_response('true', 200)
@@ -335,18 +350,8 @@ def put_image_checksum(namespace, repository, image_id):
abort(400, 'Checksum mismatch for image: %(image_id)s',
issue='checksum-mismatch', image_id=image_id)
# Checksum is ok, we remove the marker
set_uploading_flag(repo_image, False)
# The layer is ready for download, send a job to the work queue to
# process it.
logger.debug('Adding layer to diff queue')
repo = model.repository.get_repository(namespace, repository)
image_diff_queue.put([repo.namespace_user.username, repository, image_id], json.dumps({
'namespace_user_id': repo.namespace_user.id,
'repository': repository,
'image_id': image_id,
}))
# Mark the image as uploaded.
_finish_image(namespace, repository, repo_image)
return make_response('true', 200)
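
For reference, a sketch of the two queue payloads _finish_image now emits; the field names come from the code above, while the concrete values are invented:

# Diff queue item (existing behavior, now built in one place):
json.dumps({
  'namespace_user_id': 42,               # illustrative id
  'repository': 'myrepo',
  'image_id': 'ab12cd34...',             # docker image id
})

# Replication queue item (new; only when FEATURE_STORAGE_REPLICATION is on):
json.dumps({
  'namespace_user_id': 42,
  'storage_id': repo_image.storage.uuid, # image storage uuid
})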

initdb.py

@@ -19,6 +19,7 @@ from data.database import (db, all_models, Role, TeamRole, Visibility, LoginServ
ExternalNotificationEvent, ExternalNotificationMethod, NotificationKind)
from data import model
from app import app, storage as store
from storage.basestorage import StoragePaths
from workers import repositoryactioncounter
@@ -84,6 +85,17 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map):
new_image.storage.checksum = checksum
new_image.storage.save()
# Write some data for the storage.
if os.environ.get('WRITE_STORAGE_FILES'):
storage_paths = StoragePaths()
paths = [storage_paths.image_json_path,
storage_paths.image_ancestry_path,
storage_paths.image_layer_path]
for path_builder in paths:
path = path_builder(new_image.storage.uuid)
store.put_content('local_us', path, checksum)
creation_time = REFERENCE_DATE + timedelta(weeks=image_num) + timedelta(days=model_num)
command_list = SAMPLE_CMDS[image_num % len(SAMPLE_CMDS)]
command = json.dumps(command_list) if command_list else None
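
The WRITE_STORAGE_FILES check is an opt-in for tests that need real file content behind each storage record; a sketch of enabling it (the flag name is from the code above, the invocation is illustrative):

import os
os.environ['WRITE_STORAGE_FILES'] = 'true'  # any non-empty value enables the branch
# ...then run the initdb population, which writes the checksum bytes to each path.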

storage/__init__.py

@@ -39,7 +39,8 @@ class Storage(object):
if not preference:
preference = storages.keys()
d_storage = DistributedStorage(storages, preference)
default_locations = app.config.get('DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS') or []
d_storage = DistributedStorage(storages, preference, default_locations)
# register extension with app
app.extensions = getattr(app, 'extensions', {})

storage/basestorage.py

@@ -98,6 +98,9 @@ class BaseStorage(StoragePaths):
def get_checksum(self, path):
raise NotImplementedError
def copy_to(self, destination, path):
raise NotImplementedError
class DigestInvalidException(RuntimeError):
pass
@@ -119,6 +122,3 @@ class BaseStorageV2(BaseStorage):
""" Complete the chunked upload and store the final results in the path indicated.
"""
raise NotImplementedError
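
Since BaseStorage.copy_to raises NotImplementedError, every engine that participates in replication must supply its own; a minimal sketch of the streamed form an engine could use (LocalStorage below does exactly this):

from storage.basestorage import BaseStorage

class SketchStorage(BaseStorage):
  """ Hypothetical engine, for illustration only. """
  def copy_to(self, destination, path):
    # Read the file out of this engine and stream it into the destination engine.
    with self.stream_read_file(path) as fp:
      destination.stream_write(path, fp)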

storage/cloud.py

@@ -222,6 +222,28 @@ class _CloudStorage(BaseStorage):
return k.etag[1:-1][:7]
def copy_to(self, destination, path):
# First try to copy directly via boto, but only if the storages are the
# same type, with the same access information.
if (self.__class__ == destination.__class__ and
self._access_key == destination._access_key and
self._secret_key == destination._secret_key):
logger.debug('Copying file from %s to %s via a direct boto copy', self._cloud_bucket,
destination._cloud_bucket)
source_path = self._init_path(path)
source_key = self._key_class(self._cloud_bucket, source_path)
dest_path = destination._init_path(path)
source_key.copy(destination._cloud_bucket, dest_path)
return
# Fallback to a slower, default copy.
logger.debug('Copying file from %s to %s via a streamed copy', self._cloud_bucket,
destination)
with self.stream_read_file(path) as fp:
destination.stream_write(path, fp)
class S3Storage(_CloudStorage):
def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
@@ -252,7 +274,6 @@ class S3Storage(_CloudStorage):
</CORSRule>
</CORSConfiguration>""")
class GoogleCloudStorage(_CloudStorage):
def __init__(self, storage_path, access_key, secret_key, bucket_name):
upload_params = {}
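
The direct boto copy is attempted only when both engines are the same class with identical credentials; a sketch of the fallback case, using the S3Storage constructor shown above with made-up credentials and buckets:

src = S3Storage('/registry', 'ACCESS-KEY-US', 'secret-us', 'quay-bucket-us')
dst = S3Storage('/registry', 'ACCESS-KEY-EU', 'secret-eu', 'quay-bucket-eu')

# Different access keys, so copy_to streams the object through this process
# rather than issuing a server-side boto copy.
src.copy_to(dst, 'sha256/ab/ab12cd34')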

storage/distributedstorage.py

@@ -26,9 +26,15 @@ def _location_aware(unbound_func):
class DistributedStorage(StoragePaths):
def __init__(self, storages, preferred_locations=[]):
def __init__(self, storages, preferred_locations=[], default_locations=[]):
self._storages = dict(storages)
self.preferred_locations = list(preferred_locations)
self.default_locations = list(default_locations)
@property
def locations(self):
""" Returns the names of the locations supported. """
return list(self._storages.keys())
get_direct_download_url = _location_aware(BaseStorage.get_direct_download_url)
get_direct_upload_url = _location_aware(BaseStorage.get_direct_upload_url)
@@ -42,6 +48,14 @@ class DistributedStorage(StoragePaths):
remove = _location_aware(BaseStorage.remove)
get_checksum = _location_aware(BaseStorage.get_checksum)
get_supports_resumable_downloads = _location_aware(BaseStorage.get_supports_resumable_downloads)
initiate_chunked_upload = _location_aware(BaseStorageV2.initiate_chunked_upload)
stream_upload_chunk = _location_aware(BaseStorageV2.stream_upload_chunk)
complete_chunked_upload = _location_aware(BaseStorageV2.complete_chunked_upload)
def copy_between(self, path, source_location, destination_location):
""" Copies a file between the source location and the destination location. """
source_storage = self._storages[source_location]
destination_storage = self._storages[destination_location]
source_storage.copy_to(destination_storage, path)
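
A usage sketch for copy_between, wiring two engines into a DistributedStorage; the location names and paths are illustrative, and LocalStorage is assumed to take its root path as the constructor argument:

from storage.distributedstorage import DistributedStorage
from storage.local import LocalStorage

storage = DistributedStorage({'local_us': LocalStorage('/tmp/us'),
                              'local_eu': LocalStorage('/tmp/eu')},
                             preferred_locations=['local_us'],
                             default_locations=['local_us', 'local_eu'])

# Stream one object out of local_us and into local_eu.
storage.copy_between('sha256/ab/ab12cd34', 'local_us', 'local_eu')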

storage/fakestorage.py

@@ -1,4 +1,5 @@
from storage.basestorage import BaseStorage
from cStringIO import StringIO
_FAKE_STORAGE_MAP = {}
@@ -18,6 +19,9 @@ class FakeStorage(BaseStorage):
def stream_read(self, path):
yield _FAKE_STORAGE_MAP[path]
def stream_read_file(self, path):
return StringIO(_FAKE_STORAGE_MAP[path])
def stream_write(self, path, fp, content_type=None, content_encoding=None):
_FAKE_STORAGE_MAP[path] = fp.read()

storage/local.py

@@ -112,11 +112,9 @@ class LocalStorage(BaseStorageV2):
sha_hash.update(buf)
return sha_hash.hexdigest()[:7]
def _rel_upload_path(self, uuid):
return 'uploads/{0}'.format(uuid)
def initiate_chunked_upload(self):
new_uuid = str(uuid4())
@@ -162,3 +160,7 @@ class LocalStorage(BaseStorageV2):
raise Exception('Storage path %s is not under a mounted volume.\n\n'
'Registry data must be stored under a mounted volume '
'to prevent data loss' % self._root_path)
def copy_to(self, destination, path):
with self.stream_read_file(path) as fp:
destination.stream_write(path, fp)

Binary file not shown.

workers/storagereplication.py

@@ -0,0 +1,86 @@
import logging
import features
import time
from app import app, storage, image_replication_queue
from data.database import UseThenDisconnect, CloseForLongOperation
from data import model
from storage.basestorage import StoragePaths
from workers.queueworker import QueueWorker
logger = logging.getLogger(__name__)
POLL_PERIOD_SECONDS = 10
class StorageReplicationWorker(QueueWorker):
def process_queue_item(self, job_details):
storage_uuid = job_details['storage_id']
logger.debug('Starting replication of image storage %s', storage_uuid)
namespace = model.user.get_namespace_user_by_user_id(job_details['namespace_user_id'])
# Lookup the namespace and its associated regions.
if not namespace:
logger.debug('Unknown namespace: %s', namespace)
return True
locations = model.user.get_region_locations(namespace)
# Lookup the image storage.
partial_storage = model.storage.get_storage_by_uuid(storage_uuid)
if not partial_storage:
logger.debug('Unknown storage: %s', storage_uuid)
return True
# Check to see if the image is at all the required locations.
locations_required = locations | set(storage.default_locations)
locations_missing = locations_required - set(partial_storage.locations)
if not locations_missing:
logger.debug('No missing locations for storage %s under namespace %s',
storage_uuid, namespace.username)
return True
# For any missing storage locations, initiate a copy.
storage_paths = StoragePaths()
existing_location = list(partial_storage.locations)[0]
for location in locations_missing:
logger.debug('Copying image storage %s to location %s', partial_storage.uuid, location)
# Copy the various paths.
paths = [storage_paths.image_json_path,
storage_paths.image_ancestry_path,
storage_paths.image_layer_path]
try:
for path_builder in paths:
current_path = path_builder(partial_storage.uuid)
with CloseForLongOperation(app.config):
storage.copy_between(current_path, existing_location, location)
except:
logger.exception('Exception when copying path %s of image storage %s to location %s',
current_path, partial_storage.uuid, location)
return False
# Create the storage location record for the storage now that the copies have
# completed.
model.storage.add_storage_placement(partial_storage, location)
logger.debug('Finished copy of image storage %s to location %s',
partial_storage.uuid, location)
logger.debug('Completed replication of image storage %s to locations %s',
partial_storage.uuid, locations_missing)
return True
if __name__ == "__main__":
if not features.STORAGE_REPLICATION:
logger.debug('Full storage replication disabled; skipping')
while True:
time.sleep(10000)
logger.debug('Starting replication worker')
worker = StorageReplicationWorker(image_replication_queue,
poll_period_seconds=POLL_PERIOD_SECONDS)
worker.start()
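
For testing, a replication job could be enqueued by hand; a sketch assuming the item format _finish_image uses, with an invented uuid:

import json
from app import image_replication_queue

image_replication_queue.put(['some-storage-uuid'], json.dumps({
  'namespace_user_id': 42,
  'storage_id': 'some-storage-uuid',
}))

Note that process_queue_item returns True both on success and when there is nothing to do (unknown namespace or storage), but False when a copy fails, so the queue can retry the job later.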