Finish the implementation of local userfiles. Strip charsets from mimetypes in the build worker. Add canonical name ordering to the build queue. Port all queues to the canonical naming version.

This commit is contained in:
jakedt 2014-04-11 18:34:47 -04:00
parent 808026dc00
commit 61a6db236f
13 changed files with 112 additions and 41 deletions

View file

@ -97,7 +97,7 @@ class DefaultConfig(object):
# Userfiles # Userfiles
USERFILES_TYPE = 'LocalUserfiles' USERFILES_TYPE = 'LocalUserfiles'
USERFILES_PATH = 'test/data/userfiles' USERFILES_PATH = 'test/data/registry/userfiles'
# Analytics # Analytics
ANALYTICS_TYPE = "FakeAnalytics" ANALYTICS_TYPE = "FakeAnalytics"

View file

@ -275,7 +275,7 @@ class RepositoryBuild(BaseModel):
class QueueItem(BaseModel): class QueueItem(BaseModel):
queue_name = CharField(index=True) queue_name = CharField(index=True, max_length=1024)
body = TextField() body = TextField()
available_after = DateTimeField(default=datetime.now, index=True) available_after = DateTimeField(default=datetime.now, index=True)
available = BooleanField(default=True, index=True) available = BooleanField(default=True, index=True)

View file

@ -8,17 +8,26 @@ transaction_factory = app.config['DB_TRANSACTION_FACTORY']
class WorkQueue(object): class WorkQueue(object):
def __init__(self, queue_name): def __init__(self, queue_name, canonical_name_match_list=None):
self.queue_name = queue_name self.queue_name = queue_name
def put(self, message, available_after=0, retries_remaining=5): if canonical_name_match_list is None:
self.canonical_name_match_list = []
else:
self.canonical_name_match_list = canonical_name_match_list
@staticmethod
def _canonical_name(name_list):
return '/'.join(name_list) + '/'
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
""" """
Put an item, if it shouldn't be processed for some number of seconds, Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after. specify that amount as available_after.
""" """
params = { params = {
'queue_name': self.queue_name, 'queue_name': self._canonical_name([self.queue_name] + canonical_name_list),
'body': message, 'body': message,
'retries_remaining': retries_remaining, 'retries_remaining': retries_remaining,
} }
@ -35,16 +44,25 @@ class WorkQueue(object):
minutes. minutes.
""" """
now = datetime.now() now = datetime.now()
available_or_expired = ((QueueItem.available == True) |
(QueueItem.processing_expires <= now)) name_match_query = '%s%%' % self._canonical_name([self.queue_name] +
self.canonical_name_match_list)
with transaction_factory(db): with transaction_factory(db):
avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, running = (QueueItem
QueueItem.available_after <= now, .select(QueueItem.queue_name)
available_or_expired, .where(QueueItem.available == False,
QueueItem.retries_remaining > 0) QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query))
found = list(avail.limit(1).order_by(QueueItem.available_after)) avail = QueueItem.select().where(QueueItem.queue_name ** name_match_query,
QueueItem.available_after <= now,
((QueueItem.available == True) |
(QueueItem.processing_expires <= now)),
QueueItem.retries_remaining > 0,
~(QueueItem.queue_name << running))
found = list(avail.limit(1).order_by(QueueItem.id))
if found: if found:
item = found[0] item = found[0]
@ -57,16 +75,24 @@ class WorkQueue(object):
return None return None
def complete(self, completed_item): @staticmethod
def complete(completed_item):
completed_item.delete_instance() completed_item.delete_instance()
def incomplete(self, incomplete_item, retry_after=300): @staticmethod
def incomplete(incomplete_item, retry_after=300):
retry_date = datetime.now() + timedelta(seconds=retry_after) retry_date = datetime.now() + timedelta(seconds=retry_after)
incomplete_item.available_after = retry_date incomplete_item.available_after = retry_date
incomplete_item.available = True incomplete_item.available = True
incomplete_item.save() incomplete_item.save()
@staticmethod
def extend_processing(queue_item, seconds_from_now):
new_expiration = datetime.now() + timedelta(seconds=seconds_from_now)
queue_item.processing_expires = new_expiration
queue_item.save()
image_diff_queue = WorkQueue('imagediff') image_diff_queue = WorkQueue('imagediff')
dockerfile_build_queue = WorkQueue('dockerfilebuild3') dockerfile_build_queue = WorkQueue('dockerfilebuild')
webhook_queue = WorkQueue('webhook') webhook_queue = WorkQueue('webhook')

View file

@ -2,10 +2,12 @@ import boto
import os import os
import logging import logging
import hashlib import hashlib
import magic
from boto.s3.key import Key from boto.s3.key import Key
from uuid import uuid4 from uuid import uuid4
from flask import url_for from flask import url_for, request, send_file
from flask.views import View
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -88,43 +90,84 @@ class S3Userfiles(object):
return k.etag[1:-1][:7] return k.etag[1:-1][:7]
def upload_userfile_endpoint(file_id): class UserfilesHandlers(View):
raise NotImplementedError() methods = ['GET', 'PUT']
def __init__(self, local_userfiles):
self._userfiles = local_userfiles
self._magic = magic.Magic(mime=True)
def download_userfile_endpoint(file_id): def get(self, file_id):
raise NotImplementedError() path = self._userfiles.file_path(file_id)
logger.debug('Sending path: %s' % path)
return send_file(path, mimetype=self._magic.from_file(path))
def put(self, file_id):
input_stream = request.stream
if request.headers.get('transfer-encoding') == 'chunked':
# Careful, might work only with WSGI servers supporting chunked
# encoding (Gunicorn)
input_stream = request.environ['wsgi.input']
self._userfiles.store_stream(input_stream, file_id)
def dispatch_request(self, file_id):
if request.method == 'GET':
return self.get(file_id)
elif request.method == 'PUT':
return self.put(file_id)
class LocalUserfiles(object): class LocalUserfiles(object):
def __init__(self, path): def __init__(self, app, path):
self._root_path = path self._root_path = path
self._buffer_size = 64 * 1024 # 64 KB self._buffer_size = 64 * 1024 # 64 KB
self._app = app
def _build_url_adapter(self):
return self._app.url_map.bind(self._app.config['SERVER_HOSTNAME'],
script_name=self._app.config['APPLICATION_ROOT'] or '/',
url_scheme=self._app.config['PREFERRED_URL_SCHEME'])
def prepare_for_drop(self, mime_type): def prepare_for_drop(self, mime_type):
file_id = str(uuid4()) file_id = str(uuid4())
return (url_for('upload_userfile_endpoint', file_id=file_id), file_id) with self._app.app_context() as ctx:
ctx.url_adapter = self._build_url_adapter()
return (url_for('userfiles_handlers', file_id=file_id, _external=True), file_id)
def file_path(self, file_id):
if '..' in file_id or file_id.startswith('/'):
raise RuntimeError('Invalid Filename')
return os.path.join(self._root_path, file_id)
def store_stream(self, stream, file_id):
path = self.file_path(file_id)
dirname = os.path.dirname(path)
if not os.path.exists(dirname):
os.makedirs(dirname)
def store_file(self, file_like_obj, content_type):
file_id = str(uuid4())
path = os.path.join(self._root_path, file_id)
with open(path, 'w') as to_write: with open(path, 'w') as to_write:
while True: while True:
try: try:
buf = file_like_obj.read(self._buffer_size) buf = stream.read(self._buffer_size)
if not buf: if not buf:
break break
to_write.write(buf) to_write.write(buf)
except IOError: except IOError:
break break
def store_file(self, file_like_obj, content_type):
file_id = str(uuid4())
self.store_stream(file_like_obj, content_type)
return file_id return file_id
def get_file_url(self, file_id, expires_in=300): def get_file_url(self, file_id, expires_in=300):
return url_for('download_userfile_endpoint', file_id=file_id) with self._app.app_context() as ctx:
ctx.url_adapter = self._build_url_adapter()
return url_for('userfiles_handlers', file_id=file_id, _external=True)
def get_file_checksum(self, file_id): def get_file_checksum(self, file_id):
path = os.path.join(self._root_path, file_id) path = self.file_path(file_id)
sha_hash = hashlib.sha256() sha_hash = hashlib.sha256()
with open(path, 'r') as to_hash: with open(path, 'r') as to_hash:
while True: while True:
@ -148,11 +191,10 @@ class Userfiles(object):
path = app.config.get('USERFILES_PATH', '') path = app.config.get('USERFILES_PATH', '')
if storage_type == 'LocalUserfiles': if storage_type == 'LocalUserfiles':
app.add_url_rule('/userfiles/<file_id>', 'upload_userfile_endpoint', userfiles = LocalUserfiles(app, path)
upload_userfile_endpoint, methods=['PUT']) app.add_url_rule('/userfiles/<file_id>',
app.add_url_rule('/userfiles/<file_id>', 'download_userfile_endpoint', view_func=UserfilesHandlers.as_view('userfiles_handlers',
download_userfile_endpoint, methods=['GET']) local_userfiles=userfiles))
userfiles = LocalUserfiles(path)
elif storage_type == 'S3Userfiles': elif storage_type == 'S3Userfiles':
access_key = app.config.get('USERFILES_AWS_ACCESS_KEY', '') access_key = app.config.get('USERFILES_AWS_ACCESS_KEY', '')

View file

@ -28,8 +28,7 @@ SERVER_HOSTNAME = app.config['SERVER_HOSTNAME']
def fully_qualified_name(method_view_class): def fully_qualified_name(method_view_class):
inst = method_view_class() return '%s.%s' % (method_view_class.__module__, method_view_class.__name__)
return '%s.%s' % (inst.__module__, inst.__class__.__name__)
def swagger_route_data(include_internal=False, compact=False): def swagger_route_data(include_internal=False, compact=False):

View file

@ -160,7 +160,7 @@ def start_build(repository, dockerfile_id, tags, build_name, subdir, manual,
dockerfile_id, build_name, dockerfile_id, build_name,
trigger, pull_robot_name = pull_robot_name) trigger, pull_robot_name = pull_robot_name)
dockerfile_build_queue.put(json.dumps({ dockerfile_build_queue.put([repository.namespace, repository.name], json.dumps({
'build_uuid': build_request.uuid, 'build_uuid': build_request.uuid,
'namespace': repository.namespace, 'namespace': repository.namespace,
'repository': repository.name, 'repository': repository.name,

View file

@ -315,7 +315,7 @@ def update_images(namespace, repository):
'pushed_image_count': len(image_with_checksums), 'pushed_image_count': len(image_with_checksums),
'pruned_image_count': num_removed, 'pruned_image_count': num_removed,
} }
webhook_queue.put(json.dumps(webhook_data)) webhook_queue.put([namespace, repository], json.dumps(webhook_data))
return make_response('Updated', 204) return make_response('Updated', 204)

View file

@ -179,7 +179,7 @@ def put_image_layer(namespace, repository, image_id):
# The layer is ready for download, send a job to the work queue to # The layer is ready for download, send a job to the work queue to
# process it. # process it.
logger.debug('Queing diffs job for image: %s' % image_id) logger.debug('Queing diffs job for image: %s' % image_id)
image_diff_queue.put(json.dumps({ image_diff_queue.put([namespace, repository, image_id], json.dumps({
'namespace': namespace, 'namespace': namespace,
'repository': repository, 'repository': repository,
'image_id': image_id, 'image_id': image_id,
@ -232,7 +232,7 @@ def put_image_checksum(namespace, repository, image_id):
# The layer is ready for download, send a job to the work queue to # The layer is ready for download, send a job to the work queue to
# process it. # process it.
logger.debug('Queing diffs job for image: %s' % image_id) logger.debug('Queing diffs job for image: %s' % image_id)
image_diff_queue.put(json.dumps({ image_diff_queue.put([namespace, repository, image_id], json.dumps({
'namespace': namespace, 'namespace': namespace,
'repository': repository, 'repository': repository,
'image_id': image_id, 'image_id': image_id,

View file

@ -5,7 +5,6 @@ from flask import request, make_response, Blueprint
from app import billing as stripe from app import billing as stripe
from data import model from data import model
from data.queue import dockerfile_build_queue
from auth.auth import process_auth from auth.auth import process_auth
from auth.permissions import ModifyRepositoryPermission from auth.permissions import ModifyRepositoryPermission
from util.invoice import renderInvoiceToHtml from util.invoice import renderInvoiceToHtml

View file

@ -28,4 +28,5 @@ flask-restful
jsonschema jsonschema
git+https://github.com/NateFerrero/oauth2lib.git git+https://github.com/NateFerrero/oauth2lib.git
alembic alembic
sqlalchemy sqlalchemy
python-magic

View file

@ -42,6 +42,7 @@ pycrypto==2.6.1
python-daemon==1.6 python-daemon==1.6
python-dateutil==2.2 python-dateutil==2.2
python-digitalocean==0.7 python-digitalocean==0.7
python-magic==0.4.6
pytz==2014.2 pytz==2014.2
redis==2.9.1 redis==2.9.1
reportlab==2.7 reportlab==2.7

Binary file not shown.

View file

@ -397,6 +397,9 @@ class DockerfileBuildWorker(Worker):
docker_resource = requests.get(resource_url, stream=True) docker_resource = requests.get(resource_url, stream=True)
c_type = docker_resource.headers['content-type'] c_type = docker_resource.headers['content-type']
if ';' in c_type:
c_type = c_type.split(';')[0]
filetype_msg = ('Request to build type: %s with repo: %s and tags: %s' % filetype_msg = ('Request to build type: %s with repo: %s and tags: %s' %
(c_type, repo, tag_names)) (c_type, repo, tag_names))
logger.info(filetype_msg) logger.info(filetype_msg)