Add the basics of geographic data distribution and get the tests to work.

Jake Moshenko 2014-06-17 16:03:43 -04:00
parent 2bf12996f5
commit bf98575feb
23 changed files with 198 additions and 100 deletions

View file

@@ -72,10 +72,6 @@ class DefaultConfig(object):
# copies.
USE_CDN = True
# Data storage
STORAGE_TYPE = 'LocalStorage'
STORAGE_PATH = 'test/data/registry'
# Authentication
AUTHENTICATION_TYPE = 'Database'
@@ -149,3 +145,10 @@ class DefaultConfig(object):
# Feature Flag: Whether to support GitHub build triggers.
FEATURE_GITHUB_BUILD = False
DISTRIBUTED_STORAGE_CONFIG = {
'local_eu': ['LocalStorage', 'test/data/registry/eu'],
'local_us': ['LocalStorage', 'test/data/registry/us'],
}
DISTRIBUTED_STORAGE_PREFERENCE = ['local_us']
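For context, each key in DISTRIBUTED_STORAGE_CONFIG names a storage location and each value lists a driver class name followed by its constructor arguments, which Storage.init_app (later in this diff) unpacks into the driver. A hedged sketch of what a non-test config might look like; the S3 values are placeholders rather than anything from this commit, and S3Storage's argument order (path, access_key, secret_key, bucket) is taken from the code being replaced in storage/__init__.py:

DISTRIBUTED_STORAGE_CONFIG = {
    'local_eu': ['LocalStorage', 'test/data/registry/eu'],
    's3_us': ['S3Storage', '/registry', 'ACCESS_KEY', 'SECRET_KEY', 'some-bucket'],
}
# Reads prefer the first listed location that holds the data; new uploads
# are placed in the first entry (see storage.preferred_locations[0] below).
DISTRIBUTED_STORAGE_PREFERENCE = ['s3_us', 'local_eu']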

View file

@@ -222,6 +222,22 @@ class ImageStorage(BaseModel):
uploading = BooleanField(default=True, null=True)
class ImageStorageLocation(BaseModel):
name = CharField(unique=True, index=True)
class ImageStoragePlacement(BaseModel):
storage = ForeignKeyField(ImageStorage)
location = ForeignKeyField(ImageStorageLocation)
class Meta:
database = db
indexes = (
# An image can only be placed in the same place once
(('storage', 'location'), True),
)
class Image(BaseModel):
# This class is intentionally denormalized. Even though images are supposed
# to be globally unique we can't treat them as such for permissions and
@@ -341,4 +357,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
RepositoryBuild, Team, TeamMember, TeamRole, Webhook, LogEntryKind, LogEntry,
PermissionPrototype, ImageStorage, BuildTriggerService, RepositoryBuildTrigger,
OAuthApplication, OAuthAuthorizationCode, OAuthAccessToken, NotificationKind,
Notification]
Notification, ImageStorageLocation, ImageStoragePlacement]
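ImageStoragePlacement is a plain join table between ImageStorage and ImageStorageLocation, with the unique (storage, location) index guaranteeing at most one placement row per replica. The model layer collapses those rows into a set of location names; a minimal sketch mirroring the comprehension used later in this commit (Peewee's reverse accessor imagestorageplacement_set walks the join table):

# some_image_storage is any ImageStorage row (placeholder name)
locations = {placement.location.name
             for placement in some_image_storage.imagestorageplacement_set}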

View file

@@ -845,20 +845,28 @@ def get_repository(namespace_name, repository_name):
def get_repo_image(namespace_name, repository_name, image_id):
query = (Image
.select(Image, ImageStorage)
location_list = list((ImageStoragePlacement
.select(ImageStoragePlacement, Image, ImageStorage, ImageStorageLocation)
.join(ImageStorageLocation)
.switch(ImageStoragePlacement)
.join(ImageStorage)
.join(Image)
.join(Repository)
.switch(Image)
.join(ImageStorage, JOIN_LEFT_OUTER)
.where(Repository.name == repository_name,
Repository.namespace == namespace_name,
Image.docker_image_id == image_id))
Image.docker_image_id == image_id)))
try:
return query.get()
except Image.DoesNotExist:
if not location_list:
return None
location_names = {location.location.name for location in location_list}
image = location_list[0].storage.image
image.storage = location_list[0].storage
image.storage.locations = location_names
return image
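The net effect is that get_repo_image now returns the Image with storage.locations pre-populated as a set of location names, ready to be handed to the location-aware storage API. A hypothetical caller (the repository and image id below are illustrative, not from this commit):

from data import model

image = model.get_repo_image('devtable', 'somerepo', 'some-docker-image-id')
if image is not None:
    print(image.storage.locations)  # e.g. set(['local_us'])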
def repository_is_public(namespace_name, repository_name):
joined = Repository.select().join(Visibility)
@@ -940,7 +948,7 @@ def create_repository(namespace, name, creating_user, visibility='private'):
return repo
def __translate_ancestry(old_ancestry, translations, repository, username):
def __translate_ancestry(old_ancestry, translations, repository, username, preferred_location):
if old_ancestry == '/':
return '/'
@@ -950,9 +958,8 @@ def __translate_ancestry(old_ancestry, translations, repository, username):
# Figure out which docker_image_id the old id refers to, then find a
# local one
old = Image.select(Image.docker_image_id).where(Image.id == old_id).get()
image_in_repo = find_create_or_link_image(old.docker_image_id,
repository, username,
translations)
image_in_repo = find_create_or_link_image(old.docker_image_id, repository, username,
translations, preferred_location)
translations[old_id] = image_in_repo.id
return translations[old_id]
@@ -962,8 +969,8 @@ def __translate_ancestry(old_ancestry, translations, repository, username):
return '/%s/' % '/'.join(new_ids)
def find_create_or_link_image(docker_image_id, repository, username,
translations):
def find_create_or_link_image(docker_image_id, repository, username, translations,
preferred_location):
with config.app_config['DB_TRANSACTION_FACTORY'](db):
repo_image = get_repo_image(repository.namespace, repository.name,
docker_image_id)
@@ -990,20 +997,29 @@ def find_create_or_link_image(docker_image_id, repository, username,
msg = 'Linking image to existing storage with docker id: %s and uuid: %s'
logger.debug(msg, docker_image_id, to_copy.storage.uuid)
new_image_ancestry = __translate_ancestry(to_copy.ancestors,
translations, repository,
username)
new_image_ancestry = __translate_ancestry(to_copy.ancestors, translations, repository,
username, preferred_location)
storage = to_copy.storage
storage.locations = {placement.location.name
for placement in storage.imagestorageplacement_set}
origin_image_id = to_copy.id
except Image.DoesNotExist:
logger.debug('Creating new storage for docker id: %s', docker_image_id)
storage = ImageStorage.create()
location = ImageStorageLocation.get(name=preferred_location)
ImageStoragePlacement.create(location=location, storage=storage)
storage.locations = {preferred_location}
logger.debug('Storage locations: %s', storage.locations)
new_image = Image.create(docker_image_id=docker_image_id,
repository=repository, storage=storage,
ancestors=new_image_ancestry)
logger.debug('new_image storage locations: %s', new_image.storage.locations)
if origin_image_id:
logger.debug('Storing translation %s -> %s', origin_image_id, new_image.id)
translations[origin_image_id] = new_image.id
@@ -1130,9 +1146,14 @@ def garbage_collect_repository(namespace_name, repository_name):
for storage in storage_to_remove:
logger.debug('Garbage collecting image storage: %s', storage.uuid)
storage.delete_instance()
image_path = config.store.image_path(storage.uuid)
config.store.remove(image_path)
for placement in storage.imagestorageplacement_set:
location_name = placement.location.name
placement.delete_instance()
config.store.remove(location_name, image_path)
storage.delete_instance()
return len(to_remove)
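Note the new shape of garbage collection: the file is removed from every location that holds it (one placement row per replica) before the ImageStorage row itself is dropped. One caveat worth flagging: remove() here receives a bare location name, while the _location_aware wrapper added below in storage/distributedstorage.py treats its locations argument as a collection to search, so a one-element set would match every other call site in this commit; a hedged variant:

config.store.remove({location_name}, image_path)  # a set, like the other call sites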

View file

@@ -82,7 +82,7 @@ class RepositoryImageChanges(RepositoryParamResource):
diffs_path = store.image_file_diffs_path(image.storage.uuid)
try:
response_json = json.loads(store.get_content(diffs_path))
response_json = json.loads(store.get_content(image.storage.locations, diffs_path))
return response_json
except IOError:
raise NotFound()

View file

@@ -8,7 +8,7 @@ from collections import OrderedDict
from data import model
from data.model import oauth
from app import analytics, app, webhook_queue, authentication, userevents
from app import analytics, app, webhook_queue, authentication, userevents, storage
from auth.auth import process_auth
from auth.auth_context import get_authenticated_user, get_validated_token, get_validated_oauth_token
from util.names import parse_repository_name
@@ -228,7 +228,7 @@ def create_repository(namespace, repository):
translations = {}
for image_description in added_images.values():
model.find_create_or_link_image(image_description['id'], repo, username,
translations)
translations, storage.preferred_locations[0])
profile.debug('Created images')

View file

@@ -106,14 +106,14 @@ def get_image_layer(namespace, repository, image_id, headers):
path = store.image_layer_path(repo_image.storage.uuid)
profile.debug('Looking up the direct download URL')
direct_download_url = store.get_direct_download_url(path)
direct_download_url = store.get_direct_download_url(repo_image.storage.locations, path)
if direct_download_url:
profile.debug('Returning direct download URL')
return redirect(direct_download_url)
profile.debug('Streaming layer data')
return Response(store.stream_read(path), headers=headers)
return Response(store.stream_read(repo_image.storage.locations, path), headers=headers)
except (IOError, AttributeError):
profile.debug('Image not found')
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
@@ -136,7 +136,7 @@ def put_image_layer(namespace, repository, image_id):
try:
profile.debug('Retrieving image data')
uuid = repo_image.storage.uuid
json_data = store.get_content(store.image_json_path(uuid))
json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
except (IOError, AttributeError):
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
image_id=image_id)
@@ -144,7 +144,7 @@ def put_image_layer(namespace, repository, image_id):
profile.debug('Retrieving image path info')
layer_path = store.image_layer_path(uuid)
if (store.exists(layer_path) and not
if (store.exists(repo_image.storage.locations, layer_path) and not
image_is_uploading(repo_image)):
exact_abort(409, 'Image already exists')
@@ -163,7 +163,7 @@ def put_image_layer(namespace, repository, image_id):
sr.add_handler(store_hndlr)
h, sum_hndlr = checksums.simple_checksum_handler(json_data)
sr.add_handler(sum_hndlr)
store.stream_write(layer_path, sr)
store.stream_write(repo_image.storage.locations, layer_path, sr)
csums.append('sha256:{0}'.format(h.hexdigest()))
try:
@@ -231,7 +231,7 @@ def put_image_checksum(namespace, repository, image_id):
uuid = repo_image.storage.uuid
profile.debug('Looking up repo layer data')
if not store.exists(store.image_json_path(uuid)):
if not store.exists(repo_image.storage.locations, store.image_json_path(uuid)):
abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
profile.debug('Marking image path')
@@ -283,7 +283,8 @@ def get_image_json(namespace, repository, image_id, headers):
profile.debug('Looking up repo layer data')
try:
data = store.get_content(store.image_json_path(repo_image.storage.uuid))
uuid = repo_image.storage.uuid
data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
except (IOError, AttributeError):
flask_abort(404)
@@ -313,7 +314,8 @@ def get_image_ancestry(namespace, repository, image_id, headers):
profile.debug('Looking up image data')
try:
data = store.get_content(store.image_ancestry_path(repo_image.storage.uuid))
uuid = repo_image.storage.uuid
data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
except (IOError, AttributeError):
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
image_id=image_id)
@@ -326,17 +328,15 @@ def get_image_ancestry(namespace, repository, image_id, headers):
return response
def generate_ancestry(image_id, uuid, parent_id=None,
parent_uuid=None):
def generate_ancestry(image_id, uuid, locations, parent_id=None, parent_uuid=None,
parent_locations=None):
if not parent_id:
store.put_content(store.image_ancestry_path(uuid),
json.dumps([image_id]))
store.put_content(locations, store.image_ancestry_path(uuid), json.dumps([image_id]))
return
data = store.get_content(store.image_ancestry_path(parent_uuid))
data = store.get_content(parent_locations, store.image_ancestry_path(parent_uuid))
data = json.loads(data)
data.insert(0, image_id)
store.put_content(store.image_ancestry_path(uuid),
json.dumps(data))
store.put_content(locations, store.image_ancestry_path(uuid), json.dumps(data))
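The ancestry file is a JSON list of docker image ids ordered leaf to root: a root image stores just [image_id], while a child reads its parent's list from the parent's locations, prepends its own id, and writes the result to its own locations. A worked example with placeholder names:

# Parent "p2" ancestry on disk: ["p2", "p1", "base"]
# generate_ancestry('child', child_uuid, child_locations,
#                   parent_id='p2', parent_uuid=parent_uuid,
#                   parent_locations=parent_locations)
# writes ["child", "p2", "p1", "base"] to the child's ancestry path.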
def store_checksum(image_storage, checksum):
@@ -393,7 +393,7 @@ def put_image_json(namespace, repository, image_id):
profile.debug('Looking up parent image data')
if (parent_id and not
store.exists(store.image_json_path(parent_uuid))):
store.exists(parent_image.storage.locations, store.image_json_path(parent_uuid))):
abort(400, 'Image %(image_id)s depends on non existing parent image %(parent_id)s',
issue='invalid-request', image_id=image_id, parent_id=parent_id)
@@ -401,7 +401,7 @@ def put_image_json(namespace, repository, image_id):
json_path = store.image_json_path(uuid)
profile.debug('Checking if image already exists')
if (store.exists(json_path) and not
if (store.exists(repo_image.storage.locations, json_path) and not
image_is_uploading(repo_image)):
exact_abort(409, 'Image already exists')
@@ -424,10 +424,11 @@ def put_image_json(namespace, repository, image_id):
parent_image)
profile.debug('Putting json path')
store.put_content(json_path, request.data)
store.put_content(repo_image.storage.locations, json_path, request.data)
profile.debug('Generating image ancestry')
generate_ancestry(image_id, uuid, parent_id, parent_uuid)
generate_ancestry(image_id, uuid, repo_image.storage.locations, parent_id, parent_uuid,
parent_image.storage.locations)
profile.debug('Done')
return make_response('true', 200)
@@ -442,7 +443,7 @@ def process_image_changes(namespace, repository, image_id):
image_diffs_path = store.image_file_diffs_path(uuid)
image_trie_path = store.image_file_trie_path(uuid)
if store.exists(image_diffs_path):
if store.exists(repo_image.storage.locations, image_diffs_path):
logger.debug('Diffs already exist for image: %s' % image_id)
return image_trie_path
@@ -452,18 +453,18 @@ def process_image_changes(namespace, repository, image_id):
# Compute the diffs and fs for the parent first if necessary
parent_trie_path = None
if parents:
parent_trie_path = process_image_changes(namespace, repository,
parent_trie_path, parent_locations = process_image_changes(namespace, repository,
parents[-1].docker_image_id)
# Read in the collapsed layer state of the filesystem for the parent
parent_trie = changes.empty_fs()
if parent_trie_path:
parent_trie_bytes = store.get_content(parent_trie_path)
parent_trie_bytes = store.get_content(parent_locations, parent_trie_path)
parent_trie.frombytes(parent_trie_bytes)
# Read in the file entries from the layer tar file
layer_path = store.image_layer_path(uuid)
with store.stream_read_file(layer_path) as layer_tar_stream:
with store.stream_read_file(image.storage.locations, layer_path) as layer_tar_stream:
removed_files = set()
layer_files = changes.files_and_dirs_from_tar(layer_tar_stream,
removed_files)
@@ -473,7 +474,7 @@ def process_image_changes(namespace, repository, image_id):
(new_trie, added, changed, removed) = new_metadata
# Write out the new trie
store.put_content(image_trie_path, new_trie.tobytes())
store.put_content(image.storage.locations, image_trie_path, new_trie.tobytes())
# Write out the diffs
diffs = {}
@@ -481,6 +482,6 @@ def process_image_changes(namespace, repository, image_id):
for section, source_trie in zip(sections, new_metadata[1:]):
diffs[section] = list(source_trie)
diffs[section].sort()
store.put_content(image_diffs_path, json.dumps(diffs, indent=2))
store.put_content(image.storage.locations, image_diffs_path, json.dumps(diffs, indent=2))
return image_trie_path
return image_trie_path, image.storage.locations
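Because diffs are computed recursively, process_image_changes now returns a (trie_path, locations) pair instead of a bare path: the recursive call for the parent layer needs to know where the parent's trie actually lives before it can read it back with store.get_content(parent_locations, parent_trie_path), as shown above. A sketch of the updated contract, with the arguments as in the function itself:

# Updated contract, as exercised by the recursion above:
trie_path, trie_locations = process_image_changes(namespace, repository, image_id)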

View file

@@ -67,8 +67,8 @@ def __create_subtree(repo, structure, creator_username, parent):
logger.debug('new docker id: %s' % docker_image_id)
checksum = __gen_checksum(docker_image_id)
new_image = model.find_create_or_link_image(docker_image_id, repo, None,
{})
new_image = model.find_create_or_link_image(docker_image_id, repo, None, {}, 'local_us')
new_image_locations = new_image.storage.locations
new_image.storage.uuid = IMAGE_UUIDS[image_num % len(IMAGE_UUIDS)]
new_image.storage.uploading = False
new_image.storage.checksum = checksum
@@ -89,7 +89,7 @@ def __create_subtree(repo, structure, creator_username, parent):
source_diff = SAMPLE_DIFFS[image_num % len(SAMPLE_DIFFS)]
with open(source_diff, 'r') as source_file:
store.stream_write(diff_path, source_file)
store.stream_write(new_image_locations, diff_path, source_file)
parent = new_image
@@ -235,6 +235,9 @@ def initialize_database():
NotificationKind.create(name='test_notification')
ImageStorageLocation.create(name='local_eu')
ImageStorageLocation.create(name='local_us')
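The seeded location names must line up with the keys of DISTRIBUTED_STORAGE_CONFIG, since find_create_or_link_image resolves the preferred location with ImageStorageLocation.get(name=preferred_location) and raises DoesNotExist for an unknown name. A hedged sketch of wiring in a third region (the name is illustrative):

ImageStorageLocation.create(name='local_apac')
# Only useful once DISTRIBUTED_STORAGE_CONFIG gains a matching
# 'local_apac' entry (and, optionally, a preference ordering).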
def wipe_database():
logger.debug('Wiping all data from the DB.')

View file

@@ -1,6 +1,7 @@
from storage.local import LocalStorage
from storage.s3 import S3Storage
from storage.fakestorage import FakeStorage
from storage.distributedstorage import DistributedStorage
class Storage(object):
@@ -12,25 +13,32 @@ class Storage(object):
self.state = None
def init_app(self, app):
storage_type = app.config.get('STORAGE_TYPE', 'LocalStorage')
path = app.config.get('STORAGE_PATH', '')
# storage_type = app.config.get('STORAGE_TYPE', 'LocalStorage')
# path = app.config.get('STORAGE_PATH', '')
if storage_type == 'LocalStorage':
storage = LocalStorage(path)
elif storage_type == 'S3Storage':
access_key = app.config.get('STORAGE_AWS_ACCESS_KEY', '')
secret_key = app.config.get('STORAGE_AWS_SECRET_KEY', '')
bucket = app.config.get('STORAGE_S3_BUCKET', '')
storage = S3Storage(path, access_key, secret_key, bucket)
storages = {}
for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
driver = storage_params[0]
if driver == 'LocalStorage':
storage = LocalStorage(*storage_params[1:])
elif driver == 'S3Storage':
storage = S3Storage(*storage_params[1:])
else:
storage = FakeStorage()
storages[location] = storage
preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)
if not preference:
preference = storages.keys()
d_storage = DistributedStorage(storages, preference)
# register extension with app
app.extensions = getattr(app, 'extensions', {})
app.extensions['storage'] = storage
return storage
app.extensions['storage'] = d_storage
return d_storage
def __getattr__(self, name):
return getattr(self.state, name, None)
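The Storage proxy keeps the Flask extension pattern but now fronts a DistributedStorage, so every IO call takes the candidate locations as its first argument while the path helpers keep their old signatures. A hedged usage sketch, assuming the app assigns init_app's return value to storage.state as the extension pattern here implies; the uuid is a placeholder:

from app import storage

locations = set(['local_us'])  # normally image.storage.locations
json_path = storage.image_json_path('some-uuid')
data = storage.get_content(locations, json_path)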

View file

@@ -1,33 +1,8 @@
import tempfile
class BaseStorage(object):
"""Storage is organized as follow:
$ROOT/images/<image_id>/json
$ROOT/images/<image_id>/layer
$ROOT/repositories/<namespace>/<repository_name>/<tag_name>
"""
# Useful if we want to change those locations later without rewriting
# the code which uses Storage
repositories = 'repositories'
images = 'images'
class StoragePaths(object):
shared_images = 'sharedimages'
# Set the IO buffer to 64kB
buffer_size = 64 * 1024
@staticmethod
def temp_store_handler():
tmpf = tempfile.TemporaryFile()
def fn(buf):
try:
tmpf.write(buf)
except IOError:
pass
return tmpf, fn
def image_path(self, storage_uuid):
return '{0}/{1}/'.format(self.shared_images, storage_uuid)
@@ -52,6 +27,33 @@ class BaseStorage(object):
base_path = self.image_path(storage_uuid)
return '{0}diffs.json'.format(base_path)
class BaseStorage(StoragePaths):
"""Storage is organized as follow:
$ROOT/images/<image_id>/json
$ROOT/images/<image_id>/layer
$ROOT/repositories/<namespace>/<repository_name>/<tag_name>
"""
# Useful if we want to change those locations later without rewriting
# the code which uses Storage
repositories = 'repositories'
images = 'images'
# Set the IO buffer to 64kB
buffer_size = 64 * 1024
@staticmethod
def temp_store_handler():
tmpf = tempfile.TemporaryFile()
def fn(buf):
try:
tmpf.write(buf)
except IOError:
pass
return tmpf, fn
def get_direct_download_url(self, path, expires_in=60):
return None
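Splitting the path helpers out into StoragePaths is what lets DistributedStorage (next file) compute paths without binding to any single backend: BaseStorage now inherits the helpers and adds the IO surface on top. Path computation is therefore driver-free; a minimal sketch using the uuid as a placeholder:

from storage.basestorage import StoragePaths

paths = StoragePaths()
print(paths.image_path('some-uuid'))  # -> 'sharedimages/some-uuid/'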

View file

@@ -0,0 +1,41 @@
import random
import logging
from functools import wraps
from storage.basestorage import StoragePaths, BaseStorage
logger = logging.getLogger(__name__)
def _location_aware(unbound_func):
@wraps(unbound_func)
def wrapper(self, locations, *args, **kwargs):
storage = None
for preferred in self.preferred_locations:
if preferred in locations:
storage = self._storages[preferred]
if not storage:
storage = self._storages[random.sample(locations, 1)[0]]
storage_func = getattr(storage, unbound_func.__name__)
return storage_func(*args, **kwargs)
return wrapper
class DistributedStorage(StoragePaths):
def __init__(self, storages, preferred_locations=[]):
self._storages = dict(storages)
self.preferred_locations = list(preferred_locations)
get_direct_download_url = _location_aware(BaseStorage.get_direct_download_url)
get_content = _location_aware(BaseStorage.get_content)
put_content = _location_aware(BaseStorage.put_content)
stream_read = _location_aware(BaseStorage.stream_read)
stream_read_file = _location_aware(BaseStorage.stream_read_file)
stream_write = _location_aware(BaseStorage.stream_write)
list_directory = _location_aware(BaseStorage.list_directory)
exists = _location_aware(BaseStorage.exists)
remove = _location_aware(BaseStorage.remove)
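_location_aware turns each unbound BaseStorage method into a routing wrapper: the caller passes the set of locations that hold the data, the wrapper picks a preferred location that is present (falling back to a random member), and forwards to the chosen backend's method of the same name. A minimal usage sketch under the test config's LocalStorage paths; the blob path is a placeholder:

from storage.local import LocalStorage
from storage.distributedstorage import DistributedStorage

ds = DistributedStorage({
    'local_us': LocalStorage('test/data/registry/us'),
    'local_eu': LocalStorage('test/data/registry/eu'),
}, preferred_locations=['local_us'])

# Routed to 'local_us' since it is both preferred and present:
found = ds.exists(set(['local_us', 'local_eu']), 'sharedimages/some-uuid/json')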

Binary file not shown.

View file

@@ -3,7 +3,7 @@ import json as py_json
from flask import url_for
from endpoints.api import api
from app import app
from app import app, storage
from initdb import setup_database_for_testing, finished_database_for_testing
from data import model
@@ -43,7 +43,9 @@ class TestImageSharing(unittest.TestCase):
def createStorage(self, docker_image_id, repository=REPO, username=ADMIN_ACCESS_USER):
repository_obj = model.get_repository(repository.split('/')[0], repository.split('/')[1])
image = model.find_create_or_link_image(docker_image_id, repository_obj, username, {})
preferred = storage.preferred_locations[0]
image = model.find_create_or_link_image(docker_image_id, repository_obj, username, {},
preferred)
return image.storage.id
def assertSameStorage(self, docker_image_id, storage_id, repository=REPO, username=ADMIN_ACCESS_USER):

View file

@@ -24,7 +24,8 @@ class TestConfig(DefaultConfig):
DB_TRANSACTION_FACTORY = create_transaction
STORAGE_TYPE = 'FakeStorage'
DISTRIBUTED_STORAGE_CONFIG = {'local_us': ['FakeStorage']}
DISTRIBUTED_STORAGE_PREFERENCE = ['local_us']
BUILDLOGS_MODULE_AND_CLASS = ('test.testlogs', 'testlogs.TestBuildLogs')
BUILDLOGS_OPTIONS = ['devtable', 'building', 'deadbeef-dead-beef-dead-beefdeadbeef']