Merge remote-tracking branch 'upstream/master' into python-registry-v2

This commit is contained in:
Jake Moshenko 2015-11-11 16:41:40 -05:00
commit ab340e20ea
43 changed files with 862 additions and 248 deletions

View file

@ -1,3 +1,7 @@
### v1.13.3
- Fixed backfill for migration (#846)
### v1.13.2
- Fixed 404 API calls redirecting to 404 page (#762)

View file

@ -43,7 +43,7 @@ ADD conf/init/doupdatelimits.sh /etc/my_init.d/
ADD conf/init/copy_syslog_config.sh /etc/my_init.d/
ADD conf/init/runmigration.sh /etc/my_init.d/
ADD conf/init/syslog-ng.conf /etc/syslog-ng/
ADD conf/init/zz_release.sh /etc/my_init.d/
ADD conf/init/zz_boot.sh /etc/my_init.d/
ADD conf/init/service/ /etc/service/

View file

@ -24,7 +24,7 @@ sqlalchemy.url = sqlite:///will/be/overridden
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
keys = root,sqlalchemy,alembic,peewee,boto
[handlers]
keys = console
@ -33,10 +33,20 @@ keys = console
keys = generic
[logger_root]
level = WARN
level = DEBUG
handlers = console
qualname =
[logger_peewee]
level = WARN
handlers =
qualname = peewee
[logger_boto]
level = WARN
handlers =
qualname = boto
[logger_sqlalchemy]
level = WARN
handlers =
@ -54,5 +64,5 @@ level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

7
app.py
View file

@ -79,11 +79,14 @@ else:
config_provider.update_app_config(app.config)
# Update any configuration found in the override environment variable.
OVERRIDE_CONFIG_KEY = 'QUAY_OVERRIDE_CONFIG'
environ_config = json.loads(os.environ.get(OVERRIDE_CONFIG_KEY, '{}'))
app.config.update(environ_config)
# Allow user to define a custom storage preference for the local instance.
_distributed_storage_preference = os.environ.get('QUAY_DISTRIBUTED_STORAGE_PREFERENCE', '').split()
if _distributed_storage_preference:
app.config['DISTRIBUTED_STORAGE_PREFERENCE'] = _distributed_storage_preference
class RequestWithId(Request):
request_gen = staticmethod(urn_generator(['request']))

20
boot.py Normal file
View file

@ -0,0 +1,20 @@
#!/usr/bin/env python
import release
from app import app
from data.model.release import set_region_release
from util.config.database import sync_database_with_config
def main():
if app.config.get('SETUP_COMPLETE', False):
sync_database_with_config(app.config)
# Record deploy
if release.REGION and release.GIT_HEAD:
set_region_release(release.SERVICE, release.REGION, release.GIT_HEAD)
if __name__ == '__main__':
main()

View file

@ -6,7 +6,7 @@ ssh_authorized_keys:
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDNvw8qo9m8np7yQ/Smv/oklM8bo8VyNRZriGYBDuolWDL/mZpYCQnZJXphQo7RFdNABYistikjJlBuuwUohLf2uSq0iKoFa2TgwI43wViWzvuzU4nA02/ITD5BZdmWAFNyIoqeB50Ol4qUgDwLAZ+7Kv7uCi6chcgr9gTi99jY3GHyZjrMiXMHGVGi+FExFuzhVC2drKjbz5q6oRfQeLtNfG4psl5GU3MQU6FkX4fgoCx0r9R48/b7l4+TT7pWblJQiRfeldixu6308vyoTUEHasdkU3/X0OTaGz/h5XqTKnGQc6stvvoED3w+L3QFp0H5Z8sZ9stSsitmCBrmbcKZ jakemoshenko
- ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAgEAo/JkbGO6R7g1ZxARi0xWVM7FOfN02snRAcIO6vT9M7xMUkWVLgD+hM/o91lk+UFiYdql0CATobpFWncRL36KaUqsbw9/1BlI40wg296XHXSSnxhxZ4L7ytf6G1tyN319HXlI2kh9vAf/fy++yDvkH8dI3k1oLoW+mZPET6Pff04/6AXXrRlS5mhmGv9irGwiDHtVKpj6lU8DN/UtOrv1tiQ0pgwEJq05fLGoQfgPNaBCnW2z4Ubpn2gyMcMBMpSwo4hCqJePd349e4bLmFcT+gXYg7Mnup1DoTDlowFFN56wpxQbdp96IxWzU+jYPaIAuRo+BJzCyOS8qBv0Z4RZrgop0qp2JYiVwmViO6TZhIDz6loQJXUOIleQmNgTbiZx8Bwv5GY2jMYoVwlBp7yy5bRjxfbFsJ0vU7TVzNAG7oEJy/74HmHmWzRQlSlQjesr8gRbm9zgR8wqc/L107UOWFg7Cgh8ZNjKuADbXqYuda1Y9m2upcfS26UPz5l5PW5uFRMHZSi8pb1XV6/0Z8H8vwsh37Ur6aLi/5jruRmKhdlsNrB1IiDicBsPW3yg7HHSIdPU4oBNPC77yDCT3l4CKr4el81RrZt7FbJPfY+Ig9Q5O+05f6I8+ZOlJGyZ/Qfyl2aVm1HnlJKuBqPxeic8tMng/9B5N7uZL6Y3k5jFU8c= quentin
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDI7LtxLItapmUbt3Gs+4Oxa1i22fkx1+aJDkAjiRWPSX3+cxOzuPfHX9uFzr+qj5hy4J7ErrPp8q9alu+il9lE26GQuUxOZiaUrXu4dRCXXdCqTHARWBxGUXjkxdMp2HIzFpBxmVqcRubrgM36LBzKapdDOqQdz7XnNm5Jmf0tH/N0+TgV60P0WVY1CxmTya+JHNFVgazhd+oIGEhTyW/eszMGcFUgZet7DQFytYIQXYSwwGpGdJ+0InKAJ2SzCt/yuUlSrhrVM8vSGeami1XYmgQiyth1zjteMd8uTrc9NREH7bZTNcMFBqVYE3BYQWGRrv8pMMgP9gxgLbxtVsUl barakmich-titania
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQDUWB4aSjSRHCz5/6H9/EJhJVvRmPThvEzyHinaWPsuM9prBSLci9NF9WneVl30nczkvllA+w34kycdrS3fKpjTbODaEOLHBobWl3bccY0I6kr86q5z67NZffjCm/P/RL+dBaOiBWS8PV8oiDF1P6YdMo8Jk46n9fozmLCXHUuCw5BJ8PGjQqbsEzA3qFMeKZYdJHOizOfeIfKfCWYrrumVRY9v6SAUDoFOl4PZEM7QdGp9EoRYb9MNLgKLnZ4RjbcLoFwiqxY4KEM4zfjZPNOECiLCuJqvHM2QawwuO1klJ16HpJk+FzOTWQoZtT47LoE/XNSOcNtAOiD+OQ449ia1EArhm7+1DnLXvHXKIl1JtuqJz+wFCsbNSdB7P562OHAGRIxYK3DfE+0CZH1BeHYl7xiRBeCtZ+OZMIocqeJtq8taIS7Un5wnGcQWxFtQnr/f65EgbIi7G2dxPcjhr6K+GWYezsiReVVKnIClq2MHhABG9QOncKDIa47L3nyx3pm4ZfMbC2jmnK2pFgGGSfYDy4487JnAUOG1mzZ9vm4gDhatT+vZFSBOwv1e4CErBh/wYXooF5I0nGmE6y6zkKFqP+ZolJ6iXmXQ7Ea2oaGeyaprweBjkhHgghi4KbwKbClope4Zo9X9JJYBLQSW33sEEuy8MlSBpdZAbz9t/FvJaw== mjibson
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQCxfToKI6APRz7uJF7cmb45k3rVE2FoDHI+NgDZ5nZP/QeuP5VjPwGjRXIaAIWJjF1tH3xJ02MIMb0gzWECdPOeU7Ft26xa7VybFDmog81zxq/h64AINhjZuT0qilr1Y8ojv+m1HOdPa3LNSRTGUZNVpotV8LVcyjKGax1uWlyQUmlrNAe1iNIEghGJiihj2ctsuNLfUQ5IosvOjzNpbbOj6fI2ByVAPZVL6eBvMWggTgQxKaZKAzhAoZW/IMUCuvQB0MKLdXDBkcMZx/09pb1R20RAKRZOOlVgFvyFbgjmTtjGOjmonxNzmKRcxOvmQQtrespDghACin2mOxsdvQ8wbeOKMnay2HXSK6FITHOaoJhSBjFz2elqahd49VRdKsWHKdvAs+UeaC5lTKMJULckM2Ra6HJqI7xzyAQvCcJqjtslrzhPCA7mpW8YPygZFKe4oqcqp5yYgm7XjsUOORDmqRN+K3gttaw/rPLMhHKbInLHj0VzGk1tyNTXbsWqZL4rLmjd/oNU3XuoMYSalNKkp1y967mGAYOlT2JdrXAusuzpr5QJVTDEa7GMY7gD4WP8EOqfg7U/9xCJosPpGilA5pp6bge/I3RURsYQT/CGg0nWjjZ5nKlYh8u4oHgBfM3RGIw5bBQuer5e2L3RNVlka+MdEOVQGA4UEKl6E+Vs5Q== matt.jibson@gmail.com
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDiNawWSZL2MF99zwG9cFjGmML6agsKwaacQEoTsjcjHGixyUnqHXaLdrGma5i/uphZPkI5XRBKiuIROACY/aRoIxJUpV7AQ1Zx87cILx6fDVePvU5lW2DdhlCDUdwjuzDb/WO/c/qMWjOPqRG4q8XvB7nhuORMMgdpDXWVH4LXPmFez1iIBCKNk04l6Se7wiEOQjaBnTDiBDYlWD78r6RdiAU5eIxpq+lKBDTcET0vegwcA/WE4YOlYBbOrgtHrgwWqG/pXxUu77aapDOmfjtDrgim6XP5kEnytg5gCaN9iLvIpT8b1wD/1Z+LoNSZg6m9gkcC2yTRI0apOBa2G8lz silas@pro.local
write_files:

3
conf/init/zz_boot.sh Executable file
View file

@ -0,0 +1,3 @@
#!/bin/bash
/venv/bin/python /boot.py

View file

@ -1,8 +0,0 @@
#!/bin/bash
set -e
source venv/bin/activate
export PYTHONPATH=.
python /release.py

View file

@ -19,7 +19,7 @@ class RedisBuildLogs(object):
def __init__(self, redis_config):
args = dict(redis_config)
args.update({'socket_connect_timeout': 5})
args.update({'socket_connect_timeout': 5, 'socket_timeout': 5})
self._redis_config = redis_config
self._redis = redis.StrictRedis(**args)

View file

@ -574,7 +574,7 @@ class Image(BaseModel):
security_indexed = BooleanField(default=False)
security_indexed_engine = IntegerField(default=-1)
parent = ForeignKeyField('self', index=True, null=True, related_name='children')
parent_id = IntegerField(index=True, null=True)
class Meta:
database = db

View file

@ -1,7 +1,9 @@
"""add support for quay's security indexer
Revision ID: 57dad559ff2d
Revises: 154f2befdfbe
Create Date: 2015-07-13 16:51:41.669249
"""
# revision identifiers, used by Alembic.
@ -17,14 +19,13 @@ def upgrade(tables):
op.add_column('image', sa.Column('security_indexed', sa.Boolean(), nullable=False, default=False, server_default=sa.sql.expression.false()))
op.add_column('image', sa.Column('security_indexed_engine', sa.Integer(), nullable=False, default=-1, server_default="-1"))
op.create_index('image_parent_id', 'image', ['parent_id'], unique=False)
op.create_foreign_key(op.f('fk_image_parent_id_image'), 'image', 'image', ['parent_id'], ['id'])
### end Alembic commands ###
op.create_index('image_security_indexed_engine_security_indexed', 'image', ['security_indexed_engine', 'security_indexed'])
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.drop_index('image_security_indexed_engine_security_indexed', 'image')
op.drop_constraint(op.f('fk_image_parent_id_image'), 'image', type_='foreignkey')
op.drop_index('image_parent_id', table_name='image')
op.drop_column('image', 'security_indexed')
op.drop_column('image', 'security_indexed_engine')

View file

@ -49,7 +49,7 @@ def get_repo_image(namespace_name, repository_name, docker_image_id):
def get_repo_image_extended(namespace_name, repository_name, docker_image_id):
def limit_to_image_id(query):
return query.where(Image.docker_image_id == docker_image_id).limit(1)
return query.where(Image.docker_image_id == docker_image_id)
images = get_repository_images_base(namespace_name, repository_name, limit_to_image_id)
if not images:
@ -195,14 +195,21 @@ def _find_or_link_image(existing_image, repo_obj, username, translations, prefer
copied_storage.locations = {placement.location.name
for placement in copied_storage.imagestorageplacement_set}
translated_parent_id = None
if new_image_ancestry != '/':
translated_parent_id = int(new_image_ancestry.split('/')[-2])
new_image = Image.create(docker_image_id=existing_image.docker_image_id,
repository=repo_obj, storage=copied_storage,
repository=repo_obj,
storage=copied_storage,
ancestors=new_image_ancestry,
command=existing_image.command,
created=existing_image.created,
comment=existing_image.comment,
v1_json_metadata=existing_image.v1_json_metadata,
aggregate_size=existing_image.aggregate_size)
aggregate_size=existing_image.aggregate_size,
parent_id=translated_parent_id,
v1_checksum=existing_image.v1_checksum)
logger.debug('Storing translation %s -> %s', existing_image.id, new_image.id)
@ -301,7 +308,7 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
if parent:
fetched.ancestors = '%s%s/' % (parent.ancestors, parent.id)
fetched.parent = parent
fetched.parent_id = parent.id
fetched.save()
return fetched
@ -417,3 +424,19 @@ def synthesize_v1_image(repo, image_storage, docker_image_id, created_date_str,
return Image.create(docker_image_id=docker_image_id, ancestors=ancestors, comment=comment,
command=command, v1_json_metadata=v1_json_metadata, created=created,
storage=image_storage, repository=repo)
def ensure_image_locations(*names):
with db_transaction():
locations = ImageStorageLocation.select().where(ImageStorageLocation.name << names)
insert_names = list(names)
for location in locations:
insert_names.remove(location.name)
if not insert_names:
return
data = [{'name': name} for name in insert_names]
ImageStorageLocation.insert_many(data).execute()

View file

@ -137,12 +137,12 @@ def garbage_collect_repo(repo):
# iterable of tuples containing [(k, v), (k, v), ...]
all_repo_images = Image.select(Image.id, Image.storage).where(Image.repository == repo).tuples()
images_to_storages = dict(all_repo_images)
to_remove = set(images_to_storages.keys()).difference(referenced_ancestors)
to_remove = list(set(images_to_storages.keys()).difference(referenced_ancestors))
if len(to_remove) > 0:
logger.info('Cleaning up unreferenced images: %s', to_remove)
storage_id_whitelist = {images_to_storages[to_remove_id] for to_remove_id in to_remove}
Image.delete().where(Image.id << list(to_remove)).execute()
Image.delete().where(Image.id << to_remove).execute()
if len(to_remove) > 0:
logger.info('Garbage collecting storage for images: %s', to_remove)

View file

@ -18,15 +18,14 @@ import features
import uuid
import json
def lookup_allowed_private_repos(namespace):
""" Returns false if the given namespace has used its allotment of private repositories. """
# Lookup the namespace and verify it has a subscription.
def get_namespace_plan(namespace):
""" Returns the plan of the given namespace. """
namespace_user = model.user.get_namespace_user(namespace)
if namespace_user is None:
return False
return None
if not namespace_user.stripe_id:
return False
return None
# Ask Stripe for the subscribed plan.
# TODO: Can we cache this or make it faster somehow?
@ -36,14 +35,20 @@ def lookup_allowed_private_repos(namespace):
abort(503, message='Cannot contact Stripe')
if not cus.subscription:
return None
return get_plan(cus.subscription.plan.id)
def lookup_allowed_private_repos(namespace):
""" Returns false if the given namespace has used its allotment of private repositories. """
current_plan = get_namespace_plan(namespace)
if current_plan is None:
return False
# Find the number of private repositories used by the namespace and compare it to the
# plan subscribed.
private_repos = model.user.get_private_repo_count(namespace)
current_plan = get_plan(cus.subscription.plan.id)
if current_plan is None:
return False
return private_repos < current_plan['privateRepos']

View file

@ -17,7 +17,8 @@ from endpoints.api import (truthy_bool, format_date, nickname, log_action, valid
RepositoryParamResource, resource, query_param, parse_args, ApiResource,
request_error, require_scope, Unauthorized, NotFound, InvalidRequest,
path_param, ExceedsLicenseException)
from endpoints.api.billing import lookup_allowed_private_repos
from endpoints.api.billing import lookup_allowed_private_repos, get_namespace_plan
from endpoints.common import check_repository_usage
from auth.permissions import (ModifyRepositoryPermission, AdministerRepositoryPermission,
CreateRepositoryPermission)
@ -329,6 +330,9 @@ class Repository(RepositoryParamResource):
def delete(self, namespace, repository):
""" Delete a repository. """
model.repository.purge_repository(namespace, repository)
user = model.user.get_namespace_user(namespace)
plan = get_namespace_plan(namespace)
check_repository_usage(user, plan)
log_action('delete_repo', namespace,
{'repo': repository, 'namespace': namespace})
return 'Deleted', 204

View file

@ -16,6 +16,7 @@ from auth.permissions import SuperUserPermission
from auth.auth_context import get_authenticated_user
from data.database import User
from util.config.configutil import add_enterprise_config_defaults
from util.config.database import sync_database_with_config
from util.config.validator import validate_service_for_config, CONFIG_FILENAMES
from data.runmigration import run_alembic_migration
from data.users import get_federated_service_name
@ -216,6 +217,9 @@ class SuperUserConfig(ApiResource):
current_user = get_authenticated_user()
model.user.confirm_attached_federated_login(current_user, service_name)
# Ensure database is up-to-date with config
sync_database_with_config(config_object)
return {
'exists': True,
'config': config_object

View file

@ -643,6 +643,7 @@ class Recovery(ApiResource):
}
@nickname('requestRecoveryEmail')
@anon_allowed
@validate_json_request('RequestRecovery')
def post(self):
""" Request a password recovery email."""

View file

@ -200,10 +200,13 @@ def render_page_template(name, **kwargs):
def check_repository_usage(user_or_org, plan_found):
private_repos = model.user.get_private_repo_count(user_or_org.username)
if plan_found is None:
repos_allowed = 0
else:
repos_allowed = plan_found['privateRepos']
if private_repos > repos_allowed:
model.notification.create_notification('over_private_usage', user_or_org,
model.notification.create_unique_notification('over_private_usage', user_or_org,
{'namespace': user_or_org.username})
else:
model.notification.delete_notifications_by_kind(user_or_org, 'over_private_usage')

View file

@ -19,7 +19,7 @@ from util.invoice import renderInvoiceToPdf
from util.seo import render_snapshot
from util.cache import no_cache
from endpoints.common import common_login, render_page_template, route_show_if, param_required
from endpoints.decorators import anon_protect
from endpoints.decorators import anon_protect, anon_allowed
from endpoints.csrf import csrf_protect, generate_csrf_token, verify_csrf
from buildtrigger.customhandler import CustomBuildTrigger
@ -366,6 +366,7 @@ def confirm_repo_email():
@web.route('/confirm', methods=['GET'])
@route_show_if(features.MAILING)
@anon_allowed
def confirm_email():
code = request.values['code']
user = None
@ -386,6 +387,8 @@ def confirm_email():
@web.route('/recovery', methods=['GET'])
@route_show_if(features.MAILING)
@anon_allowed
def confirm_recovery():
code = request.values['code']
user = model.user.validate_reset_code(code)

View file

@ -6,7 +6,6 @@ import calendar
import os
import argparse
from sys import maxsize
from datetime import datetime, timedelta
from peewee import (SqliteDatabase, create_model_tables, drop_model_tables, savepoint_sqlite,
savepoint)
@ -98,7 +97,7 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map):
store.put_content('local_us', path, checksum)
new_image.security_indexed = False
new_image.security_indexed_engine = maxsize
new_image.security_indexed_engine = -1
new_image.save()
creation_time = REFERENCE_DATE + timedelta(weeks=image_num) + timedelta(days=model_num)

View file

@ -12,15 +12,3 @@ REGION = os.environ.get('QUAY_REGION')
if os.path.isfile(_GIT_HEAD_PATH):
with open(_GIT_HEAD_PATH) as f:
GIT_HEAD = f.read().strip()
def main():
from app import app
from data.model.release import set_region_release
if REGION and GIT_HEAD:
set_region_release(SERVICE, REGION, GIT_HEAD)
if __name__ == '__main__':
main()

View file

@ -56,3 +56,4 @@ toposort
pyjwkest
rfc3987
jsonpath-rw
bintrees

View file

@ -4,6 +4,7 @@ APScheduler==3.0.3
autobahn==0.9.3-3
Babel==1.3
beautifulsoup4==4.4.0
bintrees==2.0.2
blinker==1.3
boto==2.38.0
cachetools==1.0.3

View file

@ -85,16 +85,13 @@
}
.landing-content {
z-index: 4;
z-index: 1;
text-align: center;
padding-left: 20px;
padding-right: 20px;
position: relative;
}
.landing-content {
text-align: center;
}
.landing-content .works-with img {
max-height: 28px;
margin-left: 30px;

View file

@ -23,3 +23,29 @@
.super-user .user-row.disabled .avatar {
-webkit-filter: grayscale(100%);
}
.super-user td .co-alert {
margin: 16px 0 0 0;
}
.super-user .add-storage-link {
margin-top: 5px;
}
.super-user .storage-config {
border-bottom: 1px solid #eee;
padding: 0 0 10px 0;
margin: 10px 0 0 0;
}
.super-user .last {
border-bottom: none;
}
.super-user .feature-storage-replication {
margin: 15px 0 10px 0;
}
.super-user .input-util {
margin-top: 10px;
}

View file

@ -1155,7 +1155,7 @@ i.toggle-icon:hover {
form input.ng-invalid.ng-dirty,
*[ng-form] input.ng-invalid.ng-dirty {
background-color: #FDD7D9;
background-color: #FDD7D9 !important;
}
form input.ng-valid.ng-dirty,

View file

@ -194,42 +194,78 @@
<strong>A remote storage system is required for high-avaliability systems.</strong>
</p>
<div class="co-checkbox feature-storage-replication">
<input id="ftsr" type="checkbox" ng-model="config.FEATURE_STORAGE_REPLICATION">
<label for="ftsr">Enable Storage Replication</label>
<!-- TODO(josephschorr): add link to documentation -->
<div class="help-text">If enabled, replicates storage to other regions.</div>
</div>
<div class="storage-config" ng-class="$last ? 'last' : ''" ng-repeat="sc in storageConfig">
<table class="config-table">
<tr>
<td class="non-input">Location ID:</td>
<td>
<input class="form-control" ng-if="allowChangeLocationStorageConfig(sc.location)" ng-class="storageConfigError[$index].location ? 'ng-invalid' : ''" ng-model="sc.location" ng-pattern="/^[a-zA-Z0-9_-]+$/" required>
<div ng-if="!allowChangeLocationStorageConfig(sc.location)">
{{ sc.location }}
</div>
<div class="co-alert co-alert-danger" ng-show="storageConfigError[$index].location">
{{ storageConfigError[$index].location }}
</div>
<div class="input-util" ng-if="allowRemoveStorageConfig(sc.location)"><a href="javascript:void(0)" class="remove-link" ng-click="removeStorageConfig(sc)">Remove</a></div>
</td>
</tr>
<tr ng-if="config.FEATURE_STORAGE_REPLICATION">
<td class="non-input">Set Default:</td>
<td>
<div class="co-checkbox">
<input id="default-location-{{ $index }}" ng-model="sc.defaultLocation" type="checkbox">
<label for="default-location-{{ $index }}">Replicate to storage engine by default</label>
</div>
</td>
</tr>
<tr>
<td class="non-input">Storage Engine:</td>
<td>
<select ng-model="config.DISTRIBUTED_STORAGE_CONFIG.local[0]">
<select class="form-control" ng-class="storageConfigError[$index].engine ? 'ng-invalid' : ''" ng-model="sc.data[0]">
<option value="LocalStorage">Locally mounted directory</option>
<option value="S3Storage">Amazon S3</option>
<option value="GoogleCloudStorage">Google Cloud Storage</option>
<option value="RadosGWStorage">Ceph Object Gateway (RADOS)</option>
<option value="SwiftStorage">OpenStack Storage (Swift)</option>
</select>
<div class="co-alert co-alert-danger" ng-if="storageConfigError[$index].engine">
{{ storageConfigError[$index].engine }}
</div>
</td>
</tr>
<!-- Fields -->
<tr ng-repeat="field in STORAGE_CONFIG_FIELDS[config.DISTRIBUTED_STORAGE_CONFIG.local[0]]">
<tr ng-repeat="field in STORAGE_CONFIG_FIELDS[sc.data[0]]">
<td>{{ field.title }}:</td>
<td>
<span class="config-map-field"
binding="config.DISTRIBUTED_STORAGE_CONFIG.local[1][field.name]"
binding="sc.data[1][field.name]"
ng-if="field.kind == 'map'"
keys="field.keys"></span>
<span class="config-string-field"
binding="config.DISTRIBUTED_STORAGE_CONFIG.local[1][field.name]"
binding="sc.data[1][field.name]"
placeholder="{{ field.placeholder }}"
ng-if="field.kind == 'text'"
is-optional="field.optional"></span>
<div class="co-checkbox" ng-if="field.kind == 'bool'">
<input id="dsc-{{ field.name }}" type="checkbox"
ng-model="config.DISTRIBUTED_STORAGE_CONFIG.local[1][field.name]">
ng-model="sc.data[1][field.name]">
<label for="dsc-{{ field.name }}">{{ field.placeholder }}</label>
</div>
<div ng-if="field.kind == 'option'">
<select ng-model="config.DISTRIBUTED_STORAGE_CONFIG.local[1][field.name]">
<select ng-model="sc.data[1][field.name]">
<option ng-repeat="value in field.values" value="{{ value }}"
ng-selected="config.DISTRIBUTED_STORAGE_CONFIG.local[1][field.name] == value">{{ value }}</option>
ng-selected="sc.data[1][field.name] == value">{{ value }}</option>
</select>
</div>
<div class="help-text" ng-if="field.help_text">
@ -241,7 +277,11 @@
</td>
</tr>
</table>
</div>
<div class="add-storage-link" ng-if="canAddStorageConfig()">
<a href="javascript:void(0)" ng-click="addStorageConfig()">Add Additional Storage Engine</a>
</div>
</div>
</div>
</div>

View file

@ -53,7 +53,7 @@
</span>
<span class="co-filter-box">
<span class="page-controls" total-count="tags.length" current-page="options.page" page-size="50"></span>
<span class="page-controls" total-count="tags.length" current-page="options.page" page-size="tagsPerPage"></span>
<input class="form-control" type="text" ng-model="options.tagFilter" placeholder="Filter Tags...">
</span>
</div>

View file

@ -185,12 +185,17 @@ angular.module("core-config-setup", ['angularFileUpload'])
$scope.checkValidateAndSave = function() {
if ($scope.configform.$valid) {
saveStorageConfig();
$scope.validateAndSave();
return;
}
$element.find("input.ng-invalid:first")[0].scrollIntoView();
$element.find("input.ng-invalid:first").focus();
var query = $element.find("input.ng-invalid:first");
if (query && query.length) {
query[0].scrollIntoView();
query.focus();
}
};
$scope.validateAndSave = function() {
@ -277,6 +282,99 @@ angular.module("core-config-setup", ['angularFileUpload'])
}, ApiService.errorDisplay('Could not save configuration. Please report this error.'));
};
// Convert storage config to an array
var initializeStorageConfig = function($scope) {
var config = $scope.config.DISTRIBUTED_STORAGE_CONFIG || {};
var defaultLocations = $scope.config.DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS || [];
var preference = $scope.config.DISTRIBUTED_STORAGE_PREFERENCE || [];
$scope.serverStorageConfig = angular.copy(config);
$scope.storageConfig = [];
Object.keys(config).forEach(function(location) {
$scope.storageConfig.push({
location: location,
defaultLocation: defaultLocations.indexOf(location) >= 0,
data: angular.copy(config[location]),
error: {},
});
});
if (!$scope.storageConfig.length) {
$scope.addStorageConfig('default');
return;
}
// match DISTRIBUTED_STORAGE_PREFERENCE order first, remaining are
// ordered by unicode point value
$scope.storageConfig.sort(function(a, b) {
var indexA = preference.indexOf(a.location);
var indexB = preference.indexOf(b.location);
if (indexA > -1 && indexB > -1) return indexA < indexB ? -1 : 1;
if (indexA > -1) return -1;
if (indexB > -1) return 1;
return a.location < b.location ? -1 : 1;
});
};
$scope.allowChangeLocationStorageConfig = function(location) {
if (!$scope.serverStorageConfig[location]) { return true };
// allow user to change location ID if another exists with the same ID
return $scope.storageConfig.filter(function(sc) {
return sc.location === location;
}).length >= 2;
};
$scope.allowRemoveStorageConfig = function(location) {
return $scope.storageConfig.length > 1 && $scope.allowChangeLocationStorageConfig(location);
};
$scope.canAddStorageConfig = function() {
return $scope.config &&
$scope.config.FEATURE_STORAGE_REPLICATION &&
$scope.storageConfig &&
(!$scope.storageConfig.length || $scope.storageConfig.length < 10);
};
$scope.addStorageConfig = function(location) {
var storageType = 'LocalStorage';
// Use last storage type by default
if ($scope.storageConfig.length) {
storageType = $scope.storageConfig[$scope.storageConfig.length-1].data[0];
}
$scope.storageConfig.push({
location: location || '',
defaultLocation: false,
data: [storageType, {}],
error: {},
});
};
$scope.removeStorageConfig = function(sc) {
$scope.storageConfig.splice($scope.storageConfig.indexOf(sc), 1);
};
var saveStorageConfig = function() {
var config = {};
var defaultLocations = [];
var preference = [];
$scope.storageConfig.forEach(function(sc) {
config[sc.location] = sc.data;
if (sc.defaultLocation) defaultLocations.push(sc.location);
preference.push(sc.location);
});
$scope.config.DISTRIBUTED_STORAGE_CONFIG = config;
$scope.config.DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS = defaultLocations;
$scope.config.DISTRIBUTED_STORAGE_PREFERENCE = preference;
};
var gitlabSelector = function(key) {
return function(value) {
if (!value || !$scope.config) { return; }
@ -378,18 +476,11 @@ angular.module("core-config-setup", ['angularFileUpload'])
$scope.$watch('mapped.redis.port', redisSetter('port'));
$scope.$watch('mapped.redis.password', redisSetter('password'));
// Add a watch to remove any fields not allowed by the current storage configuration.
// We have to do this otherwise extra fields (which are not allowed) can end up in the
// configuration.
$scope.$watch('config.DISTRIBUTED_STORAGE_CONFIG.local[0]', function(value) {
// Remove any fields not associated with the current kind.
if (!value || !$scope.STORAGE_CONFIG_FIELDS[value]
|| !$scope.config.DISTRIBUTED_STORAGE_CONFIG
|| !$scope.config.DISTRIBUTED_STORAGE_CONFIG.local
|| !$scope.config.DISTRIBUTED_STORAGE_CONFIG.local[1]) { return; }
var allowedFields = $scope.STORAGE_CONFIG_FIELDS[value];
var configObject = $scope.config.DISTRIBUTED_STORAGE_CONFIG.local[1];
// Remove extra extra fields (which are not allowed) from storage config.
var updateFields = function(sc) {
var type = sc.data[0];
var configObject = sc.data[1];
var allowedFields = $scope.STORAGE_CONFIG_FIELDS[type];
// Remove any fields not allowed.
for (var fieldName in configObject) {
@ -412,8 +503,53 @@ angular.module("core-config-setup", ['angularFileUpload'])
configObject[allowedFields[i].name] = configObject[allowedFields[i].name] || false;
}
}
};
// Validate and update storage config on update.
var refreshStorageConfig = function() {
if (!$scope.config || !$scope.storageConfig) return;
var locationCounts = {};
var errors = [];
var valid = true;
$scope.storageConfig.forEach(function(sc) {
// remove extra fields from storage config
updateFields(sc);
if (!locationCounts[sc.location]) locationCounts[sc.location] = 0;
locationCounts[sc.location]++;
});
// validate storage config
$scope.storageConfig.forEach(function(sc) {
var error = {};
if ($scope.config.FEATURE_STORAGE_REPLICATION && sc.data[0] === 'LocalStorage') {
error.engine = 'Replication to a locally mounted directory is unsupported as it is only accessible on a single machine.';
valid = false;
}
if (locationCounts[sc.location] > 1) {
error.location = 'Location ID must be unique.';
valid = false;
}
errors.push(error);
});
$scope.storageConfigError = errors;
$scope.configform.$setValidity('storageConfig', valid);
};
$scope.$watch('config.FEATURE_STORAGE_REPLICATION', function() {
refreshStorageConfig();
});
$scope.$watch('storageConfig', function() {
refreshStorageConfig();
}, true);
$scope.$watch('config', function(value) {
$scope.mapped['$hasChanges'] = true;
}, true);
@ -424,6 +560,7 @@ angular.module("core-config-setup", ['angularFileUpload'])
ApiService.scGetConfig().then(function(resp) {
$scope.config = resp['config'] || {};
initializeMappedLogic($scope.config);
initializeStorageConfig($scope);
$scope.mapped['$hasChanges'] = false;
}, ApiService.errorDisplay('Could not load config'));
});

View file

@ -169,6 +169,7 @@ angular.module('quay').directive('repoPanelTags', function () {
$scope.trackLineClass = function(index, track_info) {
var startIndex = $.inArray(track_info.tags[0], $scope.tags);
var endIndex = $.inArray(track_info.tags[track_info.tags.length - 1], $scope.tags);
index += $scope.options.page * $scope.tagsPerPage;
if (index == startIndex) {
return 'start';

View file

@ -55,6 +55,11 @@ angular.module('quay').factory('DataFileService', [function() {
return;
}
if (plain.byteLength == 0) {
failure();
return;
}
dataFileService.tryAsTar_(plain, success, failure);
};

View file

@ -16,6 +16,7 @@ def _location_aware(unbound_func):
for preferred in self.preferred_locations:
if preferred in locations:
storage = self._storages[preferred]
break
if not storage:
storage = self._storages[random.sample(locations, 1)[0]]
@ -26,10 +27,10 @@ def _location_aware(unbound_func):
class DistributedStorage(StoragePaths):
def __init__(self, storages, preferred_locations=[], default_locations=[]):
def __init__(self, storages, preferred_locations=None, default_locations=None):
self._storages = dict(storages)
self.preferred_locations = list(preferred_locations)
self.default_locations = list(default_locations)
self.preferred_locations = list(preferred_locations or [])
self.default_locations = list(default_locations or [])
@property
def locations(self):

View file

@ -0,0 +1,157 @@
import unittest
import logging
import random
from datetime import datetime, timedelta
from util.migrate.allocator import CompletedKeys, NoAvailableKeysError, yield_random_entries
class CompletedTestCase(unittest.TestCase):
def test_merge_blocks_operations(self):
candidates = CompletedKeys(10)
candidates.mark_completed(1, 5)
self.assertTrue(candidates.is_available(5))
self.assertTrue(candidates.is_available(0))
self.assertFalse(candidates.is_available(1))
self.assertFalse(candidates.is_available(4))
self.assertFalse(candidates.is_available(11))
self.assertFalse(candidates.is_available(10))
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(5, 6)
self.assertFalse(candidates.is_available(5))
self.assertTrue(candidates.is_available(6))
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(3, 8)
self.assertTrue(candidates.is_available(9))
self.assertTrue(candidates.is_available(8))
self.assertFalse(candidates.is_available(7))
self.assertEqual(1, len(candidates._slabs))
def test_adjust_max(self):
candidates = CompletedKeys(10)
self.assertEqual(0, len(candidates._slabs))
self.assertTrue(candidates.is_available(9))
candidates.mark_completed(5, 12)
self.assertEqual(0, len(candidates._slabs))
self.assertFalse(candidates.is_available(9))
self.assertTrue(candidates.is_available(4))
def test_adjust_min(self):
candidates = CompletedKeys(10)
self.assertEqual(0, len(candidates._slabs))
self.assertTrue(candidates.is_available(2))
candidates.mark_completed(0, 3)
self.assertEqual(0, len(candidates._slabs))
self.assertFalse(candidates.is_available(2))
self.assertTrue(candidates.is_available(4))
def test_inside_block(self):
candidates = CompletedKeys(10)
candidates.mark_completed(1, 8)
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(2, 5)
self.assertEqual(1, len(candidates._slabs))
self.assertFalse(candidates.is_available(1))
self.assertFalse(candidates.is_available(5))
def test_wrap_block(self):
candidates = CompletedKeys(10)
candidates.mark_completed(2, 5)
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(1, 8)
self.assertEqual(1, len(candidates._slabs))
self.assertFalse(candidates.is_available(1))
self.assertFalse(candidates.is_available(5))
def test_non_contiguous(self):
candidates = CompletedKeys(10)
candidates.mark_completed(1, 5)
self.assertEqual(1, len(candidates._slabs))
self.assertTrue(candidates.is_available(5))
self.assertTrue(candidates.is_available(6))
candidates.mark_completed(6, 8)
self.assertEqual(2, len(candidates._slabs))
self.assertTrue(candidates.is_available(5))
self.assertFalse(candidates.is_available(6))
def test_big_merge(self):
candidates = CompletedKeys(10)
candidates.mark_completed(1, 5)
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(6, 8)
self.assertEqual(2, len(candidates._slabs))
candidates.mark_completed(5, 6)
self.assertEqual(1, len(candidates._slabs))
def test_range_limits(self):
candidates = CompletedKeys(10)
self.assertFalse(candidates.is_available(-1))
self.assertFalse(candidates.is_available(10))
self.assertTrue(candidates.is_available(9))
self.assertTrue(candidates.is_available(0))
def test_random_saturation(self):
candidates = CompletedKeys(100)
with self.assertRaises(NoAvailableKeysError):
for _ in range(101):
start = candidates.get_block_start_index(10)
self.assertTrue(candidates.is_available(start))
candidates.mark_completed(start, start + 10)
def test_huge_dataset(self):
candidates = CompletedKeys(1024 * 1024)
start_time = datetime.now()
iterations = 0
with self.assertRaises(NoAvailableKeysError):
while (datetime.now() - start_time) < timedelta(seconds=10):
start = candidates.get_block_start_index(1024)
self.assertTrue(candidates.is_available(start))
candidates.mark_completed(start, start + random.randint(512, 1024))
iterations += 1
self.assertGreater(iterations, 1024)
class FakeQuery(object):
def __init__(self, result_list):
self._result_list = result_list
def limit(self, *args, **kwargs):
return self
def where(self, *args, **kwargs):
return self
def __iter__(self):
return self._result_list.__iter__()
class QueryAllocatorTest(unittest.TestCase):
FAKE_PK_FIELD = 10 # Must be able to compare to integers
def test_no_work(self):
def create_empty_query():
return FakeQuery([])
for _ in yield_random_entries(create_empty_query, self.FAKE_PK_FIELD, 1, 10):
self.fail('There should never be any actual work!')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main()

View file

@ -32,15 +32,15 @@ def add_enterprise_config_defaults(config_obj, current_secret_key, hostname):
# Default storage configuration.
if not 'DISTRIBUTED_STORAGE_CONFIG' in config_obj:
config_obj['DISTRIBUTED_STORAGE_PREFERENCE'] = ['local']
config_obj['DISTRIBUTED_STORAGE_PREFERENCE'] = ['default']
config_obj['DISTRIBUTED_STORAGE_CONFIG'] = {
'local': ['LocalStorage', {'storage_path': '/datastorage/registry'}]
'default': ['LocalStorage', {'storage_path': '/datastorage/registry'}]
}
config_obj['USERFILES_LOCATION'] = 'local'
config_obj['USERFILES_LOCATION'] = 'default'
config_obj['USERFILES_PATH'] = 'userfiles/'
config_obj['LOG_ARCHIVE_LOCATION'] = 'local'
config_obj['LOG_ARCHIVE_LOCATION'] = 'default'
if not 'SERVER_HOSTNAME' in config_obj:
config_obj['SERVER_HOSTNAME'] = hostname

9
util/config/database.py Normal file
View file

@ -0,0 +1,9 @@
from data import model
def sync_database_with_config(config):
""" This ensures all implicitly required reference table entries exist in the database. """
location_names = config.get('DISTRIBUTED_STORAGE_CONFIG', {}).keys()
if location_names:
model.image.ensure_image_locations(*location_names)

View file

@ -30,12 +30,18 @@ JWT_FILENAMES = ['jwt-authn.cert']
CONFIG_FILENAMES = SSL_FILENAMES + DB_SSL_FILENAMES + JWT_FILENAMES
def get_storage_provider(config):
parameters = config.get('DISTRIBUTED_STORAGE_CONFIG', {}).get('local', ['LocalStorage', {}])
def get_storage_providers(config):
storage_config = config.get('DISTRIBUTED_STORAGE_CONFIG', {})
drivers = {}
try:
return get_storage_driver(parameters)
for name, parameters in storage_config.items():
drivers[name] = (parameters[0], get_storage_driver(parameters))
except TypeError:
raise Exception('Missing required storage configuration parameter(s)')
raise Exception('Missing required storage configuration parameter(s): %s' % name)
return drivers
def validate_service_for_config(service, config, password=None):
""" Attempts to validate the configuration for the given service. """
@ -80,7 +86,17 @@ def _validate_redis(config, _):
def _validate_registry_storage(config, _):
""" Validates registry storage. """
driver = get_storage_provider(config)
replication_enabled = config.get('FEATURE_STORAGE_REPLICATION', False)
providers = get_storage_providers(config).items()
if not providers:
raise Exception('Storage configuration required')
for name, (storage_type, driver) in providers:
try:
if replication_enabled and storage_type == 'LocalStorage':
raise Exception('Locally mounted directory not supported with storage replication')
# Run custom validation on the driver.
driver.validate(app.config['HTTPCLIENT'])
@ -90,10 +106,9 @@ def _validate_registry_storage(config, _):
driver.remove('_verify')
# Run setup on the driver if the read/write succeeded.
try:
driver.setup()
except Exception as ex:
raise Exception('Could not prepare storage: %s' % str(ex))
raise Exception('Invalid storage configuration: %s: %s' % (name, str(ex)))
def _validate_mailing(config, _):

View file

@ -2,7 +2,6 @@ import logging
from sqlalchemy.types import TypeDecorator, Text
from sqlalchemy.dialects.mysql import TEXT as MySQLText, LONGTEXT
from random import shuffle
logger = logging.getLogger(__name__)
@ -21,56 +20,3 @@ class UTF8LongText(TypeDecorator):
return dialect.type_descriptor(LONGTEXT(charset='utf8mb4', collation='utf8mb4_unicode_ci'))
else:
return dialect.type_descriptor(Text())
def _chance_duplication(pop_size, samples):
""" The chance of randomly selecting a duplicate when you choose the specified number of samples
from the specified population size.
"""
pairs = (samples * (samples - 1)) / 2.0
unique = (pop_size - 1.0)/pop_size
all_unique = pow(unique, pairs)
return 1 - all_unique
def _num_checks(pop_size, desired):
""" Binary search for the proper number of entries to use to get the specified collision
probability.
"""
s_max = pop_size
s_min = 0
last_test = -1
s_test = s_max
while s_max > s_min and last_test != s_test:
last_test = s_test
s_test = (s_max + s_min)/2
chance = _chance_duplication(pop_size, s_test)
if chance > desired:
s_max = s_test - 1
else:
s_min = s_test
return s_test
def yield_random_entries(batch_query, batch_size, collision_chance):
""" This method will yield semi-random items from a query in a database friendly way until no
more items match the base query modifier. It will pull batches of batch_size from the query
and yield enough items from each batch so that concurrent workers have a reduced chance of
selecting the same items. For example, if your batches return 10,000 entries, and you desire
only a .03 collision_chance, we will only use 25 random entries before going back to the db
for a new batch.
"""
# Seed with some data which will pass the condition, but will be immediately discarded
all_candidates = [1]
while len(all_candidates) > 0:
all_candidates = list(batch_query().limit(batch_size))
shuffle(all_candidates)
num_selections = max(1, _num_checks(len(all_candidates), collision_chance))
logger.debug('Found %s/%s matching entries, processing %s', len(all_candidates), batch_size,
num_selections)
candidates = all_candidates[0:num_selections]
for candidate in candidates:
yield candidate

156
util/migrate/allocator.py Normal file
View file

@ -0,0 +1,156 @@
import logging
import random
from bintrees import RBTree
from threading import Event
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class NoAvailableKeysError(ValueError):
pass
class CompletedKeys(object):
def __init__(self, max_index):
self._max_index = max_index
self._min_index = 0
self._slabs = RBTree()
def _get_previous_or_none(self, index):
try:
return self._slabs.floor_item(index)
except KeyError:
return None
def is_available(self, index):
logger.debug('Testing index %s', index)
if index >= self._max_index or index < self._min_index:
logger.debug('Index out of range')
return False
try:
prev_start, prev_length = self._slabs.floor_item(index)
logger.debug('Prev range: %s-%s', prev_start, prev_start + prev_length)
return (prev_start + prev_length) <= index
except KeyError:
return True
def mark_completed(self, start_index, past_last_index):
logger.debug('Marking the range completed: %s-%s', start_index, past_last_index)
# Find the item directly before this and see if there is overlap
to_discard = set()
try:
prev_start, prev_length = self._slabs.floor_item(start_index)
if prev_start + prev_length >= start_index:
# we are going to merge with the range before us
logger.debug('Merging with the prev range: %s-%s', prev_start, prev_start + prev_length)
to_discard.add(prev_start)
start_index = prev_start
past_last_index = max(past_last_index, prev_start + prev_length)
except KeyError:
pass
# Find all keys between the start and last index and merge them into one block
for merge_start, merge_length in self._slabs.iter_items(start_index, past_last_index + 1):
candidate_next_index = merge_start + merge_length
logger.debug('Merging with block %s-%s', merge_start, candidate_next_index)
to_discard.add(merge_start)
past_last_index = max(past_last_index, candidate_next_index)
# write the new block which is fully merged
discard = False
if past_last_index >= self._max_index:
logger.debug('Discarding block and setting new max to: %s', start_index)
self._max_index = start_index
discard = True
if start_index <= self._min_index:
logger.debug('Discarding block and setting new min to: %s', past_last_index)
self._min_index = past_last_index
discard = True
if to_discard:
logger.debug('Discarding %s obsolte blocks', len(to_discard))
self._slabs.remove_items(to_discard)
if not discard:
logger.debug('Writing new block with range: %s-%s', start_index, past_last_index)
self._slabs.insert(start_index, past_last_index - start_index)
logger.debug('Total blocks: %s', len(self._slabs))
def get_block_start_index(self, block_size_estimate):
logger.debug('Total range: %s-%s', self._min_index, self._max_index)
if self._max_index <= self._min_index:
raise NoAvailableKeysError('All indexes have been marked completed')
num_holes = len(self._slabs) + 1
random_hole = random.randint(0, num_holes - 1)
logger.debug('Selected random hole %s with %s total holes', random_hole, num_holes)
hole_start = self._min_index
past_hole_end = self._max_index
# Now that we have picked a hole, we need to define the bounds
if random_hole > 0:
# There will be a slab before this hole, find where it ends
bound_entries = self._slabs.nsmallest(random_hole + 1)[-2:]
left_index, left_len = bound_entries[0]
logger.debug('Left range %s-%s', left_index, left_index + left_len)
hole_start = left_index + left_len
if len(bound_entries) > 1:
right_index, right_len = bound_entries[1]
logger.debug('Right range %s-%s', right_index, right_index + right_len)
past_hole_end, _ = bound_entries[1]
elif not self._slabs.is_empty():
right_index, right_len = self._slabs.nsmallest(1)[0]
logger.debug('Right range %s-%s', right_index, right_index + right_len)
past_hole_end, _ = self._slabs.nsmallest(1)[0]
# Now that we have our hole bounds, select a random block from [0:len - block_size_estimate]
logger.debug('Selecting from hole range: %s-%s', hole_start, past_hole_end)
rand_max_bound = max(hole_start, past_hole_end - block_size_estimate)
logger.debug('Rand max bound: %s', rand_max_bound)
return random.randint(hole_start, rand_max_bound)
def yield_random_entries(batch_query, primary_key_field, batch_size, max_id):
""" This method will yield items from random blocks in the database. We will track metadata
about which keys are available for work, and we will complete the backfill when there is no
more work to be done. The method yields tupes of (candidate, Event), and if the work was
already done by another worker, the caller should set the event. Batch candidates must have
an "id" field which can be inspected.
"""
allocator = CompletedKeys(max_id + 1)
try:
while True:
start_index = allocator.get_block_start_index(batch_size)
all_candidates = list(batch_query()
.limit(batch_size)
.where(primary_key_field >= start_index))
if len(all_candidates) == 0:
logger.info('No candidates, new highest id: %s', start_index)
allocator.mark_completed(start_index, max_id + 1)
continue
logger.info('Found %s candidates, processing block', len(all_candidates))
for candidate in all_candidates:
abort_early = Event()
yield candidate, abort_early
if abort_early.is_set():
logger.info('Overlap with another worker, aborting')
break
completed_through = candidate.id + 1
logger.info('Marking id range as completed: %s-%s', start_index, completed_through)
allocator.mark_completed(start_index, completed_through)
except NoAvailableKeysError:
logger.info('No more work')

View file

@ -3,12 +3,15 @@ import logging
from peewee import JOIN_LEFT_OUTER
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
TextField)
TextField, fn)
from data.database import BaseModel, db, db_for_update, CloseForLongOperation
from data.database import BaseModel, CloseForLongOperation
from app import app, storage
from digest import checksums
from util.migrate import yield_random_entries
from util.migrate.allocator import yield_random_entries
BATCH_SIZE = 1000
logger = logging.getLogger(__name__)
@ -69,16 +72,19 @@ def _get_image_storage_locations(storage_id):
def backfill_content_checksums():
""" Copies metadata from image storages to their images. """
logger.debug('Image content checksum backfill: Began execution')
logger.debug('Began execution')
logger.debug('This may be a long operation!')
def batch_query():
return (ImageStorage
.select(ImageStorage.id, ImageStorage.uuid)
.where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False))
for candidate_storage in yield_random_entries(batch_query, 10000, 0.1):
logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid)
max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar()
written = 0
for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE,
max_id):
locations = _get_image_storage_locations(candidate_storage.id)
checksum = None
@ -93,16 +99,23 @@ def backfill_content_checksums():
checksum = 'unknown:{0}'.format(exc.__class__.__name__)
# Now update the ImageStorage with the checksum
with app.config['DB_TRANSACTION_FACTORY'](db):
to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id))
if to_update.content_checksum is not None:
num_updated = (ImageStorage
.update(content_checksum=checksum)
.where(ImageStorage.id == candidate_storage.id,
ImageStorage.content_checksum >> None)).execute()
if num_updated == 0:
logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
else:
logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid)
to_update.content_checksum = checksum
to_update.save()
abort.set()
if __name__ == "__main__":
written += num_updated
if (written % BATCH_SIZE) == 0:
logger.debug('%s entries written', written)
logger.debug('Completed, %s entries written', written)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
# logging.getLogger('peewee').setLevel(logging.CRITICAL)
logging.getLogger('peewee').setLevel(logging.WARNING)
logging.getLogger('boto').setLevel(logging.WARNING)
logging.getLogger('data.database').setLevel(logging.WARNING)
backfill_content_checksums()

View file

@ -1,8 +1,45 @@
import logging
from data.database import Image, ImageStorage, db, db_for_update
from data.database import BaseModel
from peewee import (fn, CharField, BigIntegerField, ForeignKeyField, BooleanField, DateTimeField,
TextField, IntegerField)
from app import app
from util.migrate import yield_random_entries
from util.migrate.allocator import yield_random_entries
BATCH_SIZE = 1000
class Repository(BaseModel):
pass
# Vendor the information from tables we will be writing to at the time of this migration
class ImageStorage(BaseModel):
uuid = CharField(index=True, unique=True)
checksum = CharField(null=True)
image_size = BigIntegerField(null=True)
uncompressed_size = BigIntegerField(null=True)
uploading = BooleanField(default=True, null=True)
cas_path = BooleanField(default=True)
content_checksum = CharField(null=True, index=True)
class Image(BaseModel):
docker_image_id = CharField(index=True)
repository = ForeignKeyField(Repository)
ancestors = CharField(index=True, default='/', max_length=64535, null=True)
storage = ForeignKeyField(ImageStorage, index=True, null=True)
created = DateTimeField(null=True)
comment = TextField(null=True)
command = TextField(null=True)
aggregate_size = BigIntegerField(null=True)
v1_json_metadata = TextField(null=True)
v1_checksum = CharField(null=True)
security_indexed = BooleanField(default=False)
security_indexed_engine = IntegerField(default=-1)
parent_id = IntegerField(index=True, null=True)
logger = logging.getLogger(__name__)
@ -18,23 +55,27 @@ def backfill_parent_id():
return (Image
.select(Image.id, Image.ancestors)
.join(ImageStorage)
.where(Image.parent >> None, Image.ancestors != '/',
.where(Image.parent_id >> None, Image.ancestors != '/',
ImageStorage.uploading == False))
for to_backfill in yield_random_entries(fetch_batch, 10000, 0.3):
with app.config['DB_TRANSACTION_FACTORY'](db):
try:
image = db_for_update(Image
.select()
.where(Image.id == to_backfill.id)).get()
image.parent = to_backfill.ancestors.split('/')[-2]
image.save()
except Image.DoesNotExist:
pass
max_id = Image.select(fn.Max(Image.id)).scalar()
logger.debug('backfill_parent_id: Completed')
written = 0
for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, BATCH_SIZE, max_id):
computed_parent = int(to_backfill.ancestors.split('/')[-2])
num_changed = (Image
.update(parent_id=computed_parent)
.where(Image.id == to_backfill.id, Image.parent_id >> None)).execute()
if num_changed == 0:
logger.info('Collision with another worker, aborting batch')
abort.set()
written += num_changed
if (written % BATCH_SIZE) == 0:
logger.debug('%s entries written', written)
if __name__ == "__main__":
logger.debug('backfill_parent_id: Completed, updated %s entries', written)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('peewee').setLevel(logging.CRITICAL)

View file

@ -1,12 +1,15 @@
import logging
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
TextField)
from data.database import BaseModel, db, db_for_update
from util.migrate import yield_random_entries
TextField, fn)
from data.database import BaseModel
from util.migrate.allocator import yield_random_entries
from app import app
BATCH_SIZE = 1000
logger = logging.getLogger(__name__)
@ -40,28 +43,30 @@ class Image(BaseModel):
def backfill_checksums():
""" Copies checksums from image storages to their images. """
logger.debug('Image v1 checksum backfill: Began execution')
logger.debug('Began execution')
logger.debug('This may be a long operation!')
def batch_query():
return (Image
.select(Image.id)
.select(Image, ImageStorage)
.join(ImageStorage)
.where(Image.v1_checksum >> None, ImageStorage.uploading == False,
~(ImageStorage.checksum >> None)))
for candidate_image in yield_random_entries(batch_query, 10000, 0.1):
logger.debug('Computing content checksum for storage: %s', candidate_image.id)
max_id = Image.select(fn.Max(Image.id)).scalar()
with app.config['DB_TRANSACTION_FACTORY'](db):
try:
image = db_for_update(Image
.select(Image, ImageStorage)
.join(ImageStorage)
.where(Image.id == candidate_image.id)).get()
written = 0
for candidate_image, abort in yield_random_entries(batch_query, Image.id, BATCH_SIZE, max_id):
num_changed = (Image
.update(v1_checksum=candidate_image.storage.checksum)
.where(Image.id == candidate_image.id, Image.v1_checksum >> None)).execute()
if num_changed == 0:
logger.info('Collision with another worker, aborting batch')
abort.set()
written += num_changed
if (written % BATCH_SIZE) == 0:
logger.debug('%s entries written', written)
image.v1_checksum = image.storage.checksum
image.save()
except Image.DoesNotExist:
pass
logger.debug('Completed, updated %s entries', written)
if __name__ == "__main__":

View file

@ -104,8 +104,8 @@ def backfill_v1_metadata():
try:
data = storage.get_content(repo_image.storage.locations, json_path)
except IOError:
data = None
logger.exception('failed to find v1 metadata, defaulting to None')
data = "{}"
logger.warning('failed to find v1 metadata, defaulting to {}')
repo_image.v1_json_metadata = data
repo_image.save()
except ImageStoragePlacement.DoesNotExist:

View file

@ -27,7 +27,7 @@ def _get_image_to_export(version):
candidates = (Image
.select(Image.docker_image_id, ImageStorage.uuid, ImageStorage.checksum)
.join(ImageStorage)
.where(Image.security_indexed_engine < version, Image.parent >> None, ImageStorage.uploading == False, ImageStorage.checksum != '')
.where(Image.security_indexed_engine < version, Image.parent_id >> None, ImageStorage.uploading == False, ImageStorage.checksum != '')
.limit(BATCH_SIZE*10)
.alias('candidates'))
@ -44,7 +44,7 @@ def _get_image_to_export(version):
# With analyzed parent
candidates = (Image
.select(Image.docker_image_id, ImageStorage.uuid, ImageStorage.checksum, Parent.docker_image_id.alias('parent_docker_image_id'), ParentImageStorage.uuid.alias('parent_storage_uuid'))
.join(Parent, on=(Image.parent == Parent.id))
.join(Parent, on=(Image.parent_id == Parent.id))
.join(ParentImageStorage, on=(ParentImageStorage.id == Parent.storage))
.switch(Image)
.join(ImageStorage)