diff --git a/config.py b/config.py index 8c30b2acb..45e6ae0bb 100644 --- a/config.py +++ b/config.py @@ -97,7 +97,7 @@ class DefaultConfig(object): # Userfiles USERFILES_TYPE = 'LocalUserfiles' - USERFILES_PATH = 'test/data/userfiles' + USERFILES_PATH = 'test/data/registry/userfiles' # Analytics ANALYTICS_TYPE = "FakeAnalytics" diff --git a/data/database.py b/data/database.py index 6fe6760ec..c4a96c0d0 100644 --- a/data/database.py +++ b/data/database.py @@ -275,7 +275,7 @@ class RepositoryBuild(BaseModel): class QueueItem(BaseModel): - queue_name = CharField(index=True) + queue_name = CharField(index=True, max_length=1024) body = TextField() available_after = DateTimeField(default=datetime.now, index=True) available = BooleanField(default=True, index=True) diff --git a/data/queue.py b/data/queue.py index 09e90f1a1..b4b53faa5 100644 --- a/data/queue.py +++ b/data/queue.py @@ -8,17 +8,26 @@ transaction_factory = app.config['DB_TRANSACTION_FACTORY'] class WorkQueue(object): - def __init__(self, queue_name): + def __init__(self, queue_name, canonical_name_match_list=None): self.queue_name = queue_name - def put(self, message, available_after=0, retries_remaining=5): + if canonical_name_match_list is None: + self.canonical_name_match_list = [] + else: + self.canonical_name_match_list = canonical_name_match_list + + @staticmethod + def _canonical_name(name_list): + return '/'.join(name_list) + '/' + + def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): """ Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. """ params = { - 'queue_name': self.queue_name, + 'queue_name': self._canonical_name([self.queue_name] + canonical_name_list), 'body': message, 'retries_remaining': retries_remaining, } @@ -35,16 +44,25 @@ class WorkQueue(object): minutes. """ now = datetime.now() - available_or_expired = ((QueueItem.available == True) | - (QueueItem.processing_expires <= now)) + + name_match_query = '%s%%' % self._canonical_name([self.queue_name] + + self.canonical_name_match_list) with transaction_factory(db): - avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, - QueueItem.available_after <= now, - available_or_expired, - QueueItem.retries_remaining > 0) + running = (QueueItem + .select(QueueItem.queue_name) + .where(QueueItem.available == False, + QueueItem.processing_expires > now, + QueueItem.queue_name ** name_match_query)) - found = list(avail.limit(1).order_by(QueueItem.available_after)) + avail = QueueItem.select().where(QueueItem.queue_name ** name_match_query, + QueueItem.available_after <= now, + ((QueueItem.available == True) | + (QueueItem.processing_expires <= now)), + QueueItem.retries_remaining > 0, + ~(QueueItem.queue_name << running)) + + found = list(avail.limit(1).order_by(QueueItem.id)) if found: item = found[0] @@ -57,16 +75,24 @@ class WorkQueue(object): return None - def complete(self, completed_item): + @staticmethod + def complete(completed_item): completed_item.delete_instance() - def incomplete(self, incomplete_item, retry_after=300): + @staticmethod + def incomplete(incomplete_item, retry_after=300): retry_date = datetime.now() + timedelta(seconds=retry_after) incomplete_item.available_after = retry_date incomplete_item.available = True incomplete_item.save() + @staticmethod + def extend_processing(queue_item, seconds_from_now): + new_expiration = datetime.now() + timedelta(seconds=seconds_from_now) + queue_item.processing_expires = new_expiration + queue_item.save() + image_diff_queue = WorkQueue('imagediff') -dockerfile_build_queue = WorkQueue('dockerfilebuild3') +dockerfile_build_queue = WorkQueue('dockerfilebuild') webhook_queue = WorkQueue('webhook') diff --git a/data/userfiles.py b/data/userfiles.py index 7d1f8b69b..8722803ca 100644 --- a/data/userfiles.py +++ b/data/userfiles.py @@ -2,10 +2,12 @@ import boto import os import logging import hashlib +import magic from boto.s3.key import Key from uuid import uuid4 -from flask import url_for +from flask import url_for, request, send_file +from flask.views import View logger = logging.getLogger(__name__) @@ -88,43 +90,84 @@ class S3Userfiles(object): return k.etag[1:-1][:7] -def upload_userfile_endpoint(file_id): - raise NotImplementedError() +class UserfilesHandlers(View): + methods = ['GET', 'PUT'] + def __init__(self, local_userfiles): + self._userfiles = local_userfiles + self._magic = magic.Magic(mime=True) -def download_userfile_endpoint(file_id): - raise NotImplementedError() + def get(self, file_id): + path = self._userfiles.file_path(file_id) + logger.debug('Sending path: %s' % path) + return send_file(path, mimetype=self._magic.from_file(path)) + + def put(self, file_id): + input_stream = request.stream + if request.headers.get('transfer-encoding') == 'chunked': + # Careful, might work only with WSGI servers supporting chunked + # encoding (Gunicorn) + input_stream = request.environ['wsgi.input'] + + self._userfiles.store_stream(input_stream, file_id) + + def dispatch_request(self, file_id): + if request.method == 'GET': + return self.get(file_id) + elif request.method == 'PUT': + return self.put(file_id) class LocalUserfiles(object): - def __init__(self, path): + def __init__(self, app, path): self._root_path = path self._buffer_size = 64 * 1024 # 64 KB + self._app = app + + def _build_url_adapter(self): + return self._app.url_map.bind(self._app.config['SERVER_HOSTNAME'], + script_name=self._app.config['APPLICATION_ROOT'] or '/', + url_scheme=self._app.config['PREFERRED_URL_SCHEME']) def prepare_for_drop(self, mime_type): file_id = str(uuid4()) - return (url_for('upload_userfile_endpoint', file_id=file_id), file_id) + with self._app.app_context() as ctx: + ctx.url_adapter = self._build_url_adapter() + return (url_for('userfiles_handlers', file_id=file_id, _external=True), file_id) + + def file_path(self, file_id): + if '..' in file_id or file_id.startswith('/'): + raise RuntimeError('Invalid Filename') + return os.path.join(self._root_path, file_id) + + def store_stream(self, stream, file_id): + path = self.file_path(file_id) + dirname = os.path.dirname(path) + if not os.path.exists(dirname): + os.makedirs(dirname) - def store_file(self, file_like_obj, content_type): - file_id = str(uuid4()) - path = os.path.join(self._root_path, file_id) with open(path, 'w') as to_write: while True: try: - buf = file_like_obj.read(self._buffer_size) + buf = stream.read(self._buffer_size) if not buf: break to_write.write(buf) except IOError: break + def store_file(self, file_like_obj, content_type): + file_id = str(uuid4()) + self.store_stream(file_like_obj, content_type) return file_id def get_file_url(self, file_id, expires_in=300): - return url_for('download_userfile_endpoint', file_id=file_id) + with self._app.app_context() as ctx: + ctx.url_adapter = self._build_url_adapter() + return url_for('userfiles_handlers', file_id=file_id, _external=True) def get_file_checksum(self, file_id): - path = os.path.join(self._root_path, file_id) + path = self.file_path(file_id) sha_hash = hashlib.sha256() with open(path, 'r') as to_hash: while True: @@ -148,11 +191,10 @@ class Userfiles(object): path = app.config.get('USERFILES_PATH', '') if storage_type == 'LocalUserfiles': - app.add_url_rule('/userfiles/', 'upload_userfile_endpoint', - upload_userfile_endpoint, methods=['PUT']) - app.add_url_rule('/userfiles/', 'download_userfile_endpoint', - download_userfile_endpoint, methods=['GET']) - userfiles = LocalUserfiles(path) + userfiles = LocalUserfiles(app, path) + app.add_url_rule('/userfiles/', + view_func=UserfilesHandlers.as_view('userfiles_handlers', + local_userfiles=userfiles)) elif storage_type == 'S3Userfiles': access_key = app.config.get('USERFILES_AWS_ACCESS_KEY', '') diff --git a/endpoints/api/discovery.py b/endpoints/api/discovery.py index a9dc48298..6e7eb3f5a 100644 --- a/endpoints/api/discovery.py +++ b/endpoints/api/discovery.py @@ -28,8 +28,7 @@ SERVER_HOSTNAME = app.config['SERVER_HOSTNAME'] def fully_qualified_name(method_view_class): - inst = method_view_class() - return '%s.%s' % (inst.__module__, inst.__class__.__name__) + return '%s.%s' % (method_view_class.__module__, method_view_class.__name__) def swagger_route_data(include_internal=False, compact=False): diff --git a/endpoints/common.py b/endpoints/common.py index 6d201d911..33986fd8d 100644 --- a/endpoints/common.py +++ b/endpoints/common.py @@ -160,7 +160,7 @@ def start_build(repository, dockerfile_id, tags, build_name, subdir, manual, dockerfile_id, build_name, trigger, pull_robot_name = pull_robot_name) - dockerfile_build_queue.put(json.dumps({ + dockerfile_build_queue.put([repository.namespace, repository.name], json.dumps({ 'build_uuid': build_request.uuid, 'namespace': repository.namespace, 'repository': repository.name, diff --git a/endpoints/index.py b/endpoints/index.py index 0ff64709b..8271dda73 100644 --- a/endpoints/index.py +++ b/endpoints/index.py @@ -315,7 +315,7 @@ def update_images(namespace, repository): 'pushed_image_count': len(image_with_checksums), 'pruned_image_count': num_removed, } - webhook_queue.put(json.dumps(webhook_data)) + webhook_queue.put([namespace, repository], json.dumps(webhook_data)) return make_response('Updated', 204) diff --git a/endpoints/registry.py b/endpoints/registry.py index 8e381a1bd..9d981ba08 100644 --- a/endpoints/registry.py +++ b/endpoints/registry.py @@ -179,7 +179,7 @@ def put_image_layer(namespace, repository, image_id): # The layer is ready for download, send a job to the work queue to # process it. logger.debug('Queing diffs job for image: %s' % image_id) - image_diff_queue.put(json.dumps({ + image_diff_queue.put([namespace, repository, image_id], json.dumps({ 'namespace': namespace, 'repository': repository, 'image_id': image_id, @@ -232,7 +232,7 @@ def put_image_checksum(namespace, repository, image_id): # The layer is ready for download, send a job to the work queue to # process it. logger.debug('Queing diffs job for image: %s' % image_id) - image_diff_queue.put(json.dumps({ + image_diff_queue.put([namespace, repository, image_id], json.dumps({ 'namespace': namespace, 'repository': repository, 'image_id': image_id, diff --git a/endpoints/webhooks.py b/endpoints/webhooks.py index df5988750..4bba9a25f 100644 --- a/endpoints/webhooks.py +++ b/endpoints/webhooks.py @@ -5,7 +5,6 @@ from flask import request, make_response, Blueprint from app import billing as stripe from data import model -from data.queue import dockerfile_build_queue from auth.auth import process_auth from auth.permissions import ModifyRepositoryPermission from util.invoice import renderInvoiceToHtml diff --git a/requirements-nover.txt b/requirements-nover.txt index 439bf714d..2d0d91c34 100644 --- a/requirements-nover.txt +++ b/requirements-nover.txt @@ -28,4 +28,5 @@ flask-restful jsonschema git+https://github.com/NateFerrero/oauth2lib.git alembic -sqlalchemy \ No newline at end of file +sqlalchemy +python-magic diff --git a/requirements.txt b/requirements.txt index 06f341226..45afa158b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -42,6 +42,7 @@ pycrypto==2.6.1 python-daemon==1.6 python-dateutil==2.2 python-digitalocean==0.7 +python-magic==0.4.6 pytz==2014.2 redis==2.9.1 reportlab==2.7 diff --git a/test/data/test.db b/test/data/test.db index 6471bc3e4..fc0078c61 100644 Binary files a/test/data/test.db and b/test/data/test.db differ diff --git a/workers/dockerfilebuild.py b/workers/dockerfilebuild.py index 4d7b0e306..b16008ba1 100644 --- a/workers/dockerfilebuild.py +++ b/workers/dockerfilebuild.py @@ -397,6 +397,9 @@ class DockerfileBuildWorker(Worker): docker_resource = requests.get(resource_url, stream=True) c_type = docker_resource.headers['content-type'] + if ';' in c_type: + c_type = c_type.split(';')[0] + filetype_msg = ('Request to build type: %s with repo: %s and tags: %s' % (c_type, repo, tag_names)) logger.info(filetype_msg)