From deee70d53bd551430faa0f3ba015ac7434622994 Mon Sep 17 00:00:00 2001
From: yackob03
Date: Wed, 25 Sep 2013 17:50:03 -0400
Subject: [PATCH] Add the registry code.

---
 endpoints/registry.py | 263 ++++++++++++++++++++++++++++++++++++++++++
 endpoints/tags.py     |  93 +++++++++++++++
 registry_wsgi.py      |  13 +++
 storage/__init__.py   | 138 ++++++++++++++++++++++
 storage/local.py      |  82 +++++++++++++
 storage/s3.py         | 128 ++++++++++++++++++++
 util/checksums.py     |  89 ++++++++++++++
 7 files changed, 806 insertions(+)
 create mode 100644 endpoints/registry.py
 create mode 100644 endpoints/tags.py
 create mode 100644 registry_wsgi.py
 create mode 100644 storage/__init__.py
 create mode 100644 storage/local.py
 create mode 100644 storage/s3.py
 create mode 100644 util/checksums.py

diff --git a/endpoints/registry.py b/endpoints/registry.py
new file mode 100644
index 000000000..3d80f53b5
--- /dev/null
+++ b/endpoints/registry.py
@@ -0,0 +1,263 @@
+
+import logging
+
+from flask import make_response, request, session, Response, abort
+import simplejson as json
+from functools import wraps
+from datetime import datetime
+from time import time
+
+import storage
+
+from app import app
+from auth.auth import process_auth
+from util import checksums
+
+
+store = storage.load()
+logger = logging.getLogger(__name__)
+
+
+class SocketReader(object):
+  def __init__(self, fp):
+    self._fp = fp
+    self.handlers = []
+
+  def add_handler(self, handler):
+    self.handlers.append(handler)
+
+  def read(self, n=-1):
+    buf = self._fp.read(n)
+    if not buf:
+      return ''
+    for handler in self.handlers:
+      handler(buf)
+    return buf
+
+
+def require_completion(f):
+  """This makes sure that the image push correctly finished."""
+  @wraps(f)
+  def wrapper(*args, **kwargs):
+    if store.exists(store.image_mark_path(kwargs['image_id'])):
+      abort(400)  # 'Image is being uploaded, retry later'
+    return f(*args, **kwargs)
+  return wrapper
+
+
+def set_cache_headers(f):
+  """Returns HTTP headers suitable for caching."""
+  @wraps(f)
+  def wrapper(*args, **kwargs):
+    # Set TTL to 1 year by default
+    ttl = 31536000
+    expires = datetime.fromtimestamp(int(time()) + ttl)
+    expires = expires.strftime('%a, %d %b %Y %H:%M:%S GMT')
+    headers = {
+      'Cache-Control': 'public, max-age={0}'.format(ttl),
+      'Expires': expires,
+      'Last-Modified': 'Thu, 01 Jan 1970 00:00:00 GMT',
+    }
+    if 'If-Modified-Since' in request.headers:
+      response = make_response('Not modified', 304)
+      response.headers.extend(headers)
+      return response
+    kwargs['headers'] = headers
+    # Prevent the cookie from being sent when the object is cacheable
+    session.modified = False
+    return f(*args, **kwargs)
+  return wrapper
+
+
+@app.route('/v1/images/<image_id>/layer', methods=['GET'])
+@process_auth
+@require_completion
+@set_cache_headers
+def get_image_layer(image_id, headers):
+  try:
+    return Response(store.stream_read(store.image_layer_path(image_id)),
+                    headers=headers)
+  except IOError:
+    abort(404)  # 'Image not found', 404
+
+
+@app.route('/v1/images/<image_id>/layer', methods=['PUT'])
+@process_auth
+def put_image_layer(image_id):
+  try:
+    json_data = store.get_content(store.image_json_path(image_id))
+  except IOError:
+    abort(404)  # 'Image not found', 404
+  layer_path = store.image_layer_path(image_id)
+  mark_path = store.image_mark_path(image_id)
+  if store.exists(layer_path) and not store.exists(mark_path):
+    abort(409)  # 'Image already exists', 409
+  input_stream = request.stream
+  if request.headers.get('transfer-encoding') == 'chunked':
+    # Careful, might work only with WSGI servers supporting chunked
+    # encoding (Gunicorn)
+    input_stream = request.environ['wsgi.input']
+  # compute checksums
+  csums = []
+  sr = SocketReader(input_stream)
+  tmp, store_hndlr = storage.temp_store_handler()
+  sr.add_handler(store_hndlr)
+  h, sum_hndlr = checksums.simple_checksum_handler(json_data)
+  sr.add_handler(sum_hndlr)
+  store.stream_write(layer_path, sr)
+  csums.append('sha256:{0}'.format(h.hexdigest()))
+  try:
+    tmp.seek(0)
+    csums.append(checksums.compute_tarsum(tmp, json_data))
+    tmp.close()
+  except (IOError, checksums.TarError) as e:
+    logger.debug('put_image_layer: Error when computing tarsum '
+                 '{0}'.format(e))
+  try:
+    checksum = store.get_content(store.image_checksum_path(image_id))
+  except IOError:
+    # We don't have a checksum stored yet, that's fine, we skip the check.
+    # Not removing the mark though, the image is not downloadable yet.
+    session['checksum'] = csums
+    return make_response('true', 200)
+  # We check if the stored checksum matches one of the ones we computed
+  if checksum not in csums:
+    logger.debug('put_image_layer: Wrong checksum')
+    abort(400)  # 'Checksum mismatch, ignoring the layer'
+  # Checksum is ok, we remove the marker
+  store.remove(mark_path)
+  return make_response('true', 200)
+
+
+@app.route('/v1/images/<image_id>/checksum', methods=['PUT'])
+@process_auth
+def put_image_checksum(image_id):
+  checksum = request.headers.get('X-Docker-Checksum')
+  if not checksum:
+    abort(400)  # 'Missing Image's checksum'
+  if not session.get('checksum'):
+    abort(400)  # 'Checksum not found in Cookie'
+  if not store.exists(store.image_json_path(image_id)):
+    abort(404)  # 'Image not found', 404
+  mark_path = store.image_mark_path(image_id)
+  if not store.exists(mark_path):
+    abort(409)  # 'Cannot set this image checksum', 409
+  err = store_checksum(image_id, checksum)
+  if err:
+    abort(err)
+  if checksum not in session.get('checksum', []):
+    logger.debug('put_image_checksum: Wrong checksum')
+    abort(400)  # 'Checksum mismatch'
+  # Checksum is ok, we remove the marker
+  store.remove(mark_path)
+  return make_response('true', 200)
+
+
+@app.route('/v1/images/<image_id>/json', methods=['GET'])
+@process_auth
+@require_completion
+@set_cache_headers
+def get_image_json(image_id, headers):
+  try:
+    data = store.get_content(store.image_json_path(image_id))
+  except IOError:
+    abort(404)  # 'Image not found', 404
+  try:
+    size = store.get_size(store.image_layer_path(image_id))
+    headers['X-Docker-Size'] = str(size)
+  except OSError:
+    pass
+  checksum_path = store.image_checksum_path(image_id)
+  if store.exists(checksum_path):
+    headers['X-Docker-Checksum'] = store.get_content(checksum_path)
+  response = make_response(data, 200)
+  response.headers.extend(headers)
+  return response
+
+
+@app.route('/v1/images/<image_id>/ancestry', methods=['GET'])
+@process_auth
+@require_completion
+@set_cache_headers
+def get_image_ancestry(image_id, headers):
+  try:
+    data = store.get_content(store.image_ancestry_path(image_id))
+  except IOError:
+    abort(404)  # 'Image not found', 404
+  response = make_response(json.dumps(json.loads(data)), 200)
+  response.headers.extend(headers)
+  return response
+
+
+def generate_ancestry(image_id, parent_id=None):
+  if not parent_id:
+    store.put_content(store.image_ancestry_path(image_id),
+                      json.dumps([image_id]))
+    return
+  data = store.get_content(store.image_ancestry_path(parent_id))
+  data = json.loads(data)
+  data.insert(0, image_id)
+  store.put_content(store.image_ancestry_path(image_id), json.dumps(data))
+
+
+def check_images_list(image_id):
+  full_repos_name = session.get('repository')
+  if not full_repos_name:
+    # We only enforce this check when there is a repos name in the session,
+    # otherwise it means that the auth is disabled.
+    return True
+  try:
+    path = store.images_list_path(*full_repos_name.split('/'))
+    images_list = json.loads(store.get_content(path))
+  except IOError:
+    return False
+  return (image_id in images_list)
+
+
+def store_checksum(image_id, checksum):
+  checksum_parts = checksum.split(':')
+  if len(checksum_parts) != 2:
+    return 'Invalid checksum format'
+  # We store the checksum
+  checksum_path = store.image_checksum_path(image_id)
+  store.put_content(checksum_path, checksum)
+
+
+@app.route('/v1/images/<image_id>/json', methods=['PUT'])
+@process_auth
+def put_image_json(image_id):
+  try:
+    data = json.loads(request.data)
+  except json.JSONDecodeError:
+    data = None
+  if not data or not isinstance(data, dict):
+    abort(400)  # 'Invalid JSON'
+  if 'id' not in data:
+    abort(400)  # 'Missing key `id\' in JSON'
+  # Read the checksum
+  checksum = request.headers.get('X-Docker-Checksum')
+  if checksum:
+    # Storing the checksum is optional at this stage
+    err = store_checksum(image_id, checksum)
+    if err:
+      abort(err)
+  else:
+    # We clean up any old checksum in case it's a retry after a failure
+    store.remove(store.image_checksum_path(image_id))
+  if image_id != data['id']:
+    abort(400)  # 'JSON data contains invalid id'
+  if check_images_list(image_id) is False:
+    abort(400)  # 'This image does not belong to the repository'
+  parent_id = data.get('parent')
+  if parent_id and not store.exists(store.image_json_path(data['parent'])):
+    abort(400)  # 'Image depends on a non existing parent'
+  json_path = store.image_json_path(image_id)
+  mark_path = store.image_mark_path(image_id)
+  if store.exists(json_path) and not store.exists(mark_path):
+    abort(409)  # 'Image already exists', 409
+  # If we reach that point, it means that this is a new image or a retry
+  # on a failed push
+  store.put_content(mark_path, 'true')
+  store.put_content(json_path, request.data)
+  generate_ancestry(image_id, parent_id)
+  return make_response('true', 200)
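
The three PUT handlers above are meant to be called in a fixed order: the image JSON first (which writes the _inprogress marker), then the layer (which computes checksums and stashes them in the session cookie), then the checksum (which verifies and removes the marker). For illustration only, not part of this diff, here is a rough sketch of that sequence using the requests library against the development server from registry_wsgi.py, assuming auth is disabled; the image id and checksum value are placeholders:

import requests

base = 'http://localhost:5000/v1/images/0123456789ab'
client = requests.Session()  # the server keeps the computed checksums in a session cookie

# 1. Upload the image JSON; this also writes the _inprogress marker.
client.put(base + '/json', data='{"id": "0123456789ab"}')

# 2. Stream the layer; the same Session must be reused so the checksum
#    cookie set by the server is sent back on the next request.
with open('layer.tar', 'rb') as layer:
  client.put(base + '/layer', data=layer)

# 3. Send the client-side checksum; on a match the marker is removed and
#    the image becomes downloadable.
client.put(base + '/checksum',
           headers={'X-Docker-Checksum': 'tarsum+sha256:<hex digest>'})
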
diff --git a/endpoints/tags.py b/endpoints/tags.py
new file mode 100644
index 000000000..89cbdc442
--- /dev/null
+++ b/endpoints/tags.py
@@ -0,0 +1,93 @@
+
+import logging
+
+from flask import abort, request, jsonify, make_response
+import simplejson as json
+
+import storage
+
+from app import app
+from util.names import parse_repository_name
+from auth.auth import process_auth
+from auth.permissions import (ReadRepositoryPermission,
+                              ModifyRepositoryPermission)
+from data import model
+
+
+store = storage.load()
+logger = logging.getLogger(__name__)
+
+
+@app.route('/v1/repositories/<path:repository>/tags',
+           methods=['GET'])
+@process_auth
+@parse_repository_name
+def get_tags(namespace, repository):
+  permission = ReadRepositoryPermission(namespace, repository)
+
+  if permission.can():
+    tags = model.list_repository_tags(namespace, repository)
+    tag_map = {tag.name: tag.image.image_id for tag in tags}
+    return jsonify(tag_map)
+
+  abort(403)
+
+
+@app.route('/v1/repositories/<path:repository>/tags/<tag>',
+           methods=['GET'])
+@process_auth
+@parse_repository_name
+def get_tag(namespace, repository, tag):
+  permission = ReadRepositoryPermission(namespace, repository)
+
+  if permission.can():
+    tag_image = model.get_tag_image(namespace, repository, tag)
+    return make_response(tag_image.image_id, 200)
+
+  abort(403)
+
+
+@app.route('/v1/repositories/<path:repository>/tags/<tag>',
+           methods=['PUT'])
+@process_auth
+@parse_repository_name
+def put_tag(namespace, repository, tag):
+  permission = ModifyRepositoryPermission(namespace, repository)
+
+  if permission.can():
+    image_id = json.loads(request.data)
+    model.create_or_update_tag(namespace, repository, tag, image_id)
+
+    return make_response('Created', 200)
+
+  abort(403)
+
+
+@app.route('/v1/repositories/<path:repository>/tags/<tag>',
+           methods=['DELETE'])
+@process_auth
+@parse_repository_name
+def delete_tag(namespace, repository, tag):
+  permission = ModifyRepositoryPermission(namespace, repository)
+
+  if permission.can():
+    model.delete_tag(namespace, repository, tag)
+
+    return make_response('Deleted', 204)
+
+  abort(403)
+
+
+@app.route('/v1/repositories/<path:repository>/tags',
+           methods=['DELETE'])
+@process_auth
+@parse_repository_name
+def delete_repository(namespace, repository):
+  permission = ModifyRepositoryPermission(namespace, repository)
+
+  if permission.can():
+    model.delete_all_repository_tags(namespace, repository)
+
+    return make_response('Deleted', 204)
+
+  abort(403)
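
For reference, not part of this diff, the tag endpoints speak plain JSON: a tag is written by PUTting a JSON-encoded image id and read back either as a per-repository map or as a single id. A quick sketch against the same local development server; the namespace, repository, and ids are made-up placeholders:

import requests
import simplejson as json

repo = 'http://localhost:5000/v1/repositories/devtable/testrepo'

# Point the 'latest' tag at an image id (the body is just a JSON string).
requests.put(repo + '/tags/latest', data=json.dumps('0123456789ab'))

# List all tags: returns a {tag_name: image_id} map.
print requests.get(repo + '/tags').json()

# Resolve a single tag to its image id (plain text response).
print requests.get(repo + '/tags/latest').text
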
diff --git a/registry_wsgi.py b/registry_wsgi.py
new file mode 100644
index 000000000..29e3c3112
--- /dev/null
+++ b/registry_wsgi.py
@@ -0,0 +1,13 @@
+import logging
+
+from app import app
+
+import endpoints.registry
+import endpoints.tags
+
+if __name__ == '__main__':
+  FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - ' + \
+           '%(funcName)s - %(message)s'
+  logging.basicConfig(format=FORMAT, level=logging.DEBUG)
+
+  app.run(port=5000, debug=True)
diff --git a/storage/__init__.py b/storage/__init__.py
new file mode 100644
index 000000000..7cb508176
--- /dev/null
+++ b/storage/__init__.py
@@ -0,0 +1,138 @@
+
+import contextlib
+import tempfile
+
+
+__all__ = ['load']
+
+
+class Storage(object):
+
+  """Storage is organized as follows:
+  $ROOT/images/<image_id>/json
+  $ROOT/images/<image_id>/layer
+  $ROOT/repositories/<namespace>/<repository>/<tag>
+  """
+
+  # Useful if we want to change those locations later without rewriting
+  # the code which uses Storage
+  repositories = 'repositories'
+  images = 'images'
+  # Set the IO buffer to 64kB
+  buffer_size = 64 * 1024
+
+  # FIXME(samalba): Move all path resolvers into each module (out of the base)
+  def images_list_path(self, namespace, repository):
+    return '{0}/{1}/{2}/_images_list'.format(self.repositories,
+                                             namespace,
+                                             repository)
+
+  def image_json_path(self, image_id):
+    return '{0}/{1}/json'.format(self.images, image_id)
+
+  def image_mark_path(self, image_id):
+    return '{0}/{1}/_inprogress'.format(self.images, image_id)
+
+  def image_checksum_path(self, image_id):
+    return '{0}/{1}/_checksum'.format(self.images, image_id)
+
+  def image_layer_path(self, image_id):
+    return '{0}/{1}/layer'.format(self.images, image_id)
+
+  def image_ancestry_path(self, image_id):
+    return '{0}/{1}/ancestry'.format(self.images, image_id)
+
+  def tag_path(self, namespace, repository, tagname=None):
+    if not tagname:
+      return '{0}/{1}/{2}'.format(self.repositories,
+                                  namespace,
+                                  repository)
+    return '{0}/{1}/{2}/tag_{3}'.format(self.repositories,
+                                        namespace,
+                                        repository,
+                                        tagname)
+
+  def index_images_path(self, namespace, repository):
+    return '{0}/{1}/{2}/_index_images'.format(self.repositories,
+                                              namespace,
+                                              repository)
+
+  def get_content(self, path):
+    raise NotImplementedError
+
+  def put_content(self, path, content):
+    raise NotImplementedError
+
+  def stream_read(self, path):
+    raise NotImplementedError
+
+  def stream_write(self, path, fp):
+    raise NotImplementedError
+
+  def list_directory(self, path=None):
+    raise NotImplementedError
+
+  def exists(self, path):
+    raise NotImplementedError
+
+  def remove(self, path):
+    raise NotImplementedError
+
+  def get_size(self, path):
+    raise NotImplementedError
+
+
+@contextlib.contextmanager
+def store_stream(stream):
+  """Stores the entire stream to a temporary file."""
+  tmpf = tempfile.TemporaryFile()
+  while True:
+    try:
+      buf = stream.read(4096)
+      if not buf:
+        break
+      tmpf.write(buf)
+    except IOError:
+      break
+  tmpf.seek(0)
+  yield tmpf
+  tmpf.close()
+
+
+def temp_store_handler():
+  tmpf = tempfile.TemporaryFile()
+
+  def fn(buf):
+    try:
+      tmpf.write(buf)
+    except IOError:
+      pass
+
+  return tmpf, fn
+
+
+from local import LocalStorage
+from s3 import S3Storage
+
+
+_storage = {}
+
+
+def load(kind=None):
+  """Returns the right storage class according to the configuration."""
+  global _storage
+
+  # TODO hard code to local for now
+  kind = 'local'
+  # if not kind:
+  #   kind = cfg.storage.lower()
+  if kind in _storage:
+    return _storage[kind]
+  if kind == 's3':
+    store = S3Storage('/registry', 'access_key', 'secret_key', 'bucket')
+  elif kind == 'local':
+    store = LocalStorage('/tmp/registry')
+  else:
+    raise ValueError('Not supported storage \'{0}\''.format(kind))
+  _storage[kind] = store
+  return store
diff --git a/storage/local.py b/storage/local.py
new file mode 100644
index 000000000..42980713c
--- /dev/null
+++ b/storage/local.py
@@ -0,0 +1,82 @@
+
+import os
+import shutil
+
+from . import Storage
+
+
+class LocalStorage(Storage):
+
+  def __init__(self, storage_path):
+    self._root_path = storage_path
+
+  def _init_path(self, path=None, create=False):
+    path = os.path.join(self._root_path, path) if path else self._root_path
+    if create is True:
+      dirname = os.path.dirname(path)
+      if not os.path.exists(dirname):
+        os.makedirs(dirname)
+    return path
+
+  def get_content(self, path):
+    path = self._init_path(path)
+    with open(path, mode='r') as f:
+      return f.read()
+
+  def put_content(self, path, content):
+    path = self._init_path(path, create=True)
+    with open(path, mode='w') as f:
+      f.write(content)
+    return path
+
+  def stream_read(self, path):
+    path = self._init_path(path)
+    with open(path, mode='rb') as f:
+      while True:
+        buf = f.read(self.buffer_size)
+        if not buf:
+          break
+        yield buf
+
+  def stream_write(self, path, fp):
+    # Size is mandatory
+    path = self._init_path(path, create=True)
+    with open(path, mode='wb') as f:
+      while True:
+        try:
+          buf = fp.read(self.buffer_size)
+          if not buf:
+            break
+          f.write(buf)
+        except IOError:
+          break
+
+  def list_directory(self, path=None):
+    path = self._init_path(path)
+    prefix = path[len(self._root_path) + 1:] + '/'
+    exists = False
+    for d in os.listdir(path):
+      exists = True
+      yield prefix + d
+    if exists is False:
+      # Raises OSError even when the directory is empty
+      # (to be consistent with S3)
+      raise OSError('No such directory: \'{0}\''.format(path))
+
+  def exists(self, path):
+    path = self._init_path(path)
+    return os.path.exists(path)
+
+  def remove(self, path):
+    path = self._init_path(path)
+    if os.path.isdir(path):
+      shutil.rmtree(path)
+      return
+    try:
+      os.remove(path)
+    except OSError:
+      pass
+
+  def get_size(self, path):
+    path = self._init_path(path)
+    return os.path.getsize(path)
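
A small usage sketch, not part of this diff, of the storage API above: load() currently hard-codes LocalStorage under /tmp/registry, and the path helpers keep the on-disk layout in one place. The image id below is a placeholder:

import storage

store = storage.load()  # hard-coded to LocalStorage('/tmp/registry') for now

# Path helpers centralize the layout, e.g. images/<image_id>/json
json_path = store.image_json_path('0123456789ab')
store.put_content(json_path, '{"id": "0123456789ab"}')
print store.get_content(json_path)

# The push marker only exists while an upload is in progress
print store.exists(store.image_mark_path('0123456789ab'))
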
diff --git a/storage/s3.py b/storage/s3.py
new file mode 100644
index 000000000..fb125186b
--- /dev/null
+++ b/storage/s3.py
@@ -0,0 +1,128 @@
+
+import cStringIO as StringIO
+import os
+
+import boto.s3.connection
+import boto.s3.key
+
+from . import Storage
+
+
+class S3Storage(Storage):
+
+  def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
+    self._s3_conn = \
+        boto.s3.connection.S3Connection(s3_access_key,
+                                        s3_secret_key,
+                                        is_secure=False)
+    self._s3_bucket = self._s3_conn.get_bucket(s3_bucket)
+    self._root_path = storage_path
+
+  def _debug_key(self, key):
+    """Used for debugging only."""
+    orig_meth = key.bucket.connection.make_request
+
+    def new_meth(*args, **kwargs):
+      print '#' * 16
+      print args
+      print kwargs
+      print '#' * 16
+      return orig_meth(*args, **kwargs)
+    key.bucket.connection.make_request = new_meth
+
+  def _init_path(self, path=None):
+    path = os.path.join(self._root_path, path) if path else self._root_path
+    if path and path[0] == '/':
+      return path[1:]
+    return path
+
+  def get_content(self, path):
+    path = self._init_path(path)
+    key = boto.s3.key.Key(self._s3_bucket, path)
+    if not key.exists():
+      raise IOError('No such key: \'{0}\''.format(path))
+    return key.get_contents_as_string()
+
+  def put_content(self, path, content):
+    path = self._init_path(path)
+    key = boto.s3.key.Key(self._s3_bucket, path)
+    key.set_contents_from_string(content)
+    return path
+
+  def stream_read(self, path):
+    path = self._init_path(path)
+    key = boto.s3.key.Key(self._s3_bucket, path)
+    if not key.exists():
+      raise IOError('No such key: \'{0}\''.format(path))
+    while True:
+      buf = key.read(self.buffer_size)
+      if not buf:
+        break
+      yield buf
+
+  def stream_write(self, path, fp):
+    # The minimum size of an upload part on S3 is 5MB
+    buffer_size = 5 * 1024 * 1024
+    if self.buffer_size > buffer_size:
+      buffer_size = self.buffer_size
+    path = self._init_path(path)
+    mp = self._s3_bucket.initiate_multipart_upload(path)
+    num_part = 1
+    while True:
+      try:
+        buf = fp.read(buffer_size)
+        if not buf:
+          break
+        io = StringIO.StringIO(buf)
+        mp.upload_part_from_file(io, num_part)
+        num_part += 1
+        io.close()
+      except IOError:
+        break
+    mp.complete_upload()
+
+  def list_directory(self, path=None):
+    path = self._init_path(path)
+    if not path.endswith('/'):
+      path += '/'
+    ln = 0
+    if self._root_path != '/':
+      ln = len(self._root_path)
+    exists = False
+    for key in self._s3_bucket.list(prefix=path, delimiter='/'):
+      exists = True
+      name = key.name
+      if name.endswith('/'):
+        yield name[ln:-1]
+      else:
+        yield name[ln:]
+    if exists is False:
+      # Raise OSError to stay consistent with the LocalStorage API, even
+      # though S3 has no concept of folders.
+      raise OSError('No such directory: \'{0}\''.format(path))
+
+  def exists(self, path):
+    path = self._init_path(path)
+    key = boto.s3.key.Key(self._s3_bucket, path)
+    return key.exists()
+
+  def remove(self, path):
+    path = self._init_path(path)
+    key = boto.s3.key.Key(self._s3_bucket, path)
+    if key.exists():
+      # It's a file
+      key.delete()
+      return
+    # We assume it's a directory
+    if not path.endswith('/'):
+      path += '/'
+    for key in self._s3_bucket.list(prefix=path, delimiter='/'):
+      key.delete()
+
+  def get_size(self, path):
+    path = self._init_path(path)
+    # Lookup does a HEAD HTTP request on the object
+    key = self._s3_bucket.lookup(path)
+    if not key:
+      raise OSError('No such key: \'{0}\''.format(path))
+    return key.size
diff --git a/util/checksums.py b/util/checksums.py
new file mode 100644
index 000000000..16ba05404
--- /dev/null
+++ b/util/checksums.py
@@ -0,0 +1,89 @@
+
+import hashlib
+import logging
+import tarfile
+
+
+TarError = tarfile.TarError
+logger = logging.getLogger(__name__)
+
+
+def sha256_file(fp, data=None):
+  h = hashlib.sha256(data or '')
+  if not fp:
+    return h.hexdigest()
+  while True:
+    buf = fp.read(4096)
+    if not buf:
+      break
+    h.update(buf)
+  return h.hexdigest()
+
+
+def sha256_string(s):
+  return hashlib.sha256(s).hexdigest()
+
+
+def compute_tarsum(fp, json_data):
+  header_fields = ('name', 'mode', 'uid', 'gid', 'size', 'mtime',
+                   'type', 'linkname', 'uname', 'gname', 'devmajor',
+                   'devminor')
+  tar = tarfile.open(mode='r|*', fileobj=fp)
+  hashes = []
+  for member in tar:
+    header = ''
+    for field in header_fields:
+      value = getattr(member, field)
+      if field == 'type':
+        field = 'typeflag'
+      elif field == 'name':
+        if member.isdir() and not value.endswith('/'):
+          value += '/'
+      header += '{0}{1}'.format(field, value)
+    h = None
+    try:
+      if member.size > 0:
+        f = tar.extractfile(member)
+        h = sha256_file(f, header)
+      else:
+        h = sha256_string(header)
+    except KeyError:
+      h = sha256_string(header)
+    hashes.append(h)
+    #log = '\n+ filename: {0}\n'.format(member.name)
+    #log += '+ header: {0}\n'.format(header)
+    #log += '+ hash: {0}\n'.format(h)
+    #log += '*' * 16
+    #logger.debug('checksums.compute_tarsum: \n{0}'.format(log))
+  tar.close()
+  hashes.sort()
+  data = json_data + ''.join(hashes)
+  #logger.debug('checksums.compute_tarsum: '
+  #             'Hashes: \n{0}\n{1}'.format('\n'.join(hashes), '-' * 16))
+  tarsum = 'tarsum+sha256:{0}'.format(sha256_string(data))
+  logger.debug('checksums.compute_tarsum: return {0}'.format(tarsum))
+  return tarsum
+
+
+def simple_checksum_handler(json_data):
+  h = hashlib.sha256(json_data + '\n')
+
+  def fn(buf):
+    h.update(buf)
+  return h, fn
+
+
+def compute_simple(fp, json_data):
+  data = json_data + '\n'
+  return 'sha256:{0}'.format(sha256_file(fp, data))
+
+
+if __name__ == '__main__':
+  import sys
+  if len(sys.argv) < 3:
+    print 'Usage: {0} json_file layer_file'.format(sys.argv[0])
+    sys.exit(1)
+  json_data = file(sys.argv[1]).read()
+  fp = open(sys.argv[2])
+  print compute_simple(fp, json_data)
+  print compute_tarsum(fp, json_data)
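
Beyond the command-line entry point above, here is a quick sketch, not part of this diff, of exercising both checksum flavours against a throwaway in-memory layer; the file name, contents, and image JSON are made up:

import io
import tarfile

from util.checksums import compute_simple, compute_tarsum

# Build a tiny single-file tar layer in memory.
layer = io.BytesIO()
tar = tarfile.open(mode='w', fileobj=layer)
content = 'hello world'
info = tarfile.TarInfo(name='hello.txt')
info.size = len(content)
tar.addfile(info, io.BytesIO(content))
tar.close()

json_data = '{"id": "0123456789ab"}'

layer.seek(0)
print compute_tarsum(layer, json_data)   # tarsum+sha256:...
layer.seek(0)
print compute_simple(layer, json_data)   # sha256:...
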