Merge remote-tracking branch 'origin/laffa' into nomenclature
Conflicts: test/data/test.db
This commit is contained in:
commit
a0d94f9d59
26 changed files with 1088 additions and 83 deletions
|
@ -17,6 +17,7 @@ from endpoints.index import index
|
||||||
from endpoints.web import web
|
from endpoints.web import web
|
||||||
from endpoints.tags import tags
|
from endpoints.tags import tags
|
||||||
from endpoints.registry import registry
|
from endpoints.registry import registry
|
||||||
|
from endpoints.verbs import verbs
|
||||||
from endpoints.webhooks import webhooks
|
from endpoints.webhooks import webhooks
|
||||||
from endpoints.realtime import realtime
|
from endpoints.realtime import realtime
|
||||||
from endpoints.callbacks import callback
|
from endpoints.callbacks import callback
|
||||||
|
@ -43,6 +44,7 @@ application.register_blueprint(callback, url_prefix='/oauth2')
|
||||||
application.register_blueprint(index, url_prefix='/v1')
|
application.register_blueprint(index, url_prefix='/v1')
|
||||||
application.register_blueprint(tags, url_prefix='/v1')
|
application.register_blueprint(tags, url_prefix='/v1')
|
||||||
application.register_blueprint(registry, url_prefix='/v1')
|
application.register_blueprint(registry, url_prefix='/v1')
|
||||||
|
application.register_blueprint(verbs, url_prefix='/c1')
|
||||||
application.register_blueprint(api_bp, url_prefix='/api')
|
application.register_blueprint(api_bp, url_prefix='/api')
|
||||||
application.register_blueprint(webhooks, url_prefix='/webhooks')
|
application.register_blueprint(webhooks, url_prefix='/webhooks')
|
||||||
application.register_blueprint(realtime, url_prefix='/realtime')
|
application.register_blueprint(realtime, url_prefix='/realtime')
|
||||||
|
|
|
@ -168,7 +168,7 @@ class Visibility(BaseModel):
|
||||||
|
|
||||||
|
|
||||||
class Repository(BaseModel):
|
class Repository(BaseModel):
|
||||||
namespace_user = ForeignKeyField(User)
|
namespace_user = ForeignKeyField(User, null=True)
|
||||||
name = CharField()
|
name = CharField()
|
||||||
visibility = ForeignKeyField(Visibility)
|
visibility = ForeignKeyField(Visibility)
|
||||||
description = TextField(null=True)
|
description = TextField(null=True)
|
||||||
|
@ -221,7 +221,6 @@ class PermissionPrototype(BaseModel):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AccessToken(BaseModel):
|
class AccessToken(BaseModel):
|
||||||
friendly_name = CharField(null=True)
|
friendly_name = CharField(null=True)
|
||||||
code = CharField(default=random_string_generator(length=64), unique=True,
|
code = CharField(default=random_string_generator(length=64), unique=True,
|
||||||
|
@ -267,6 +266,23 @@ class ImageStorage(BaseModel):
|
||||||
uploading = BooleanField(default=True, null=True)
|
uploading = BooleanField(default=True, null=True)
|
||||||
|
|
||||||
|
|
||||||
|
class ImageStorageTransformation(BaseModel):
|
||||||
|
name = CharField(index=True, unique=True)
|
||||||
|
|
||||||
|
|
||||||
|
class DerivedImageStorage(BaseModel):
|
||||||
|
source = ForeignKeyField(ImageStorage, null=True, related_name='source')
|
||||||
|
derivative = ForeignKeyField(ImageStorage, related_name='derivative')
|
||||||
|
transformation = ForeignKeyField(ImageStorageTransformation)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
read_slaves = (read_slave,)
|
||||||
|
indexes = (
|
||||||
|
(('source', 'transformation'), True),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ImageStorageLocation(BaseModel):
|
class ImageStorageLocation(BaseModel):
|
||||||
name = CharField(unique=True, index=True)
|
name = CharField(unique=True, index=True)
|
||||||
|
|
||||||
|
@ -451,4 +467,5 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
|
||||||
OAuthApplication, OAuthAuthorizationCode, OAuthAccessToken, NotificationKind,
|
OAuthApplication, OAuthAuthorizationCode, OAuthAccessToken, NotificationKind,
|
||||||
Notification, ImageStorageLocation, ImageStoragePlacement,
|
Notification, ImageStorageLocation, ImageStoragePlacement,
|
||||||
ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
|
ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
|
||||||
RepositoryAuthorizedEmail, TeamMemberInvite]
|
RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage,
|
||||||
|
TeamMemberInvite]
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
from __future__ import with_statement
|
from __future__ import with_statement
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
from alembic import context
|
from alembic import context
|
||||||
from sqlalchemy import engine_from_config, pool
|
from sqlalchemy import engine_from_config, pool
|
||||||
from logging.config import fileConfig
|
from logging.config import fileConfig
|
||||||
|
@ -12,8 +15,17 @@ from util.morecollections import AttrDict
|
||||||
|
|
||||||
# this is the Alembic Config object, which provides
|
# this is the Alembic Config object, which provides
|
||||||
# access to the values within the .ini file in use.
|
# access to the values within the .ini file in use.
|
||||||
|
db_uri = unquote(app.config['DB_URI'])
|
||||||
|
if 'GENMIGRATE' in os.environ:
|
||||||
|
docker_host = os.environ.get('DOCKER_HOST')
|
||||||
|
docker_host_ip = docker_host[len('tcp://'):].split(':')[0]
|
||||||
|
if os.environ.get('GENMIGRATE') == 'mysql':
|
||||||
|
db_uri = 'mysql+pymysql://root:password@%s/genschema' % (docker_host_ip)
|
||||||
|
else:
|
||||||
|
db_uri = 'postgresql://postgres@%s/genschema' % (docker_host_ip)
|
||||||
|
|
||||||
config = context.config
|
config = context.config
|
||||||
config.set_main_option('sqlalchemy.url', unquote(app.config['DB_URI']))
|
config.set_main_option('sqlalchemy.url', db_uri)
|
||||||
|
|
||||||
# Interpret the config file for Python logging.
|
# Interpret the config file for Python logging.
|
||||||
# This line sets up loggers basically.
|
# This line sets up loggers basically.
|
||||||
|
@ -57,7 +69,7 @@ def run_migrations_online():
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if isinstance(db.obj, SqliteDatabase):
|
if isinstance(db.obj, SqliteDatabase) and not 'GENMIGRATE' in os.environ:
|
||||||
print ('Skipping Sqlite migration!')
|
print ('Skipping Sqlite migration!')
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
82
data/migrations/migration.sh
Executable file
82
data/migrations/migration.sh
Executable file
|
@ -0,0 +1,82 @@
|
||||||
|
set -e
|
||||||
|
|
||||||
|
up_mysql() {
|
||||||
|
# Run a SQL database on port 3306 inside of Docker.
|
||||||
|
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=password -d mysql
|
||||||
|
|
||||||
|
# Sleep for 5s to get MySQL get started.
|
||||||
|
echo 'Sleeping for 5...'
|
||||||
|
sleep 5
|
||||||
|
|
||||||
|
# Add the database to mysql.
|
||||||
|
docker run --rm --link mysql:mysql mysql sh -c 'echo "create database genschema" | mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -ppassword'
|
||||||
|
}
|
||||||
|
|
||||||
|
down_mysql() {
|
||||||
|
docker kill mysql
|
||||||
|
docker rm mysql
|
||||||
|
}
|
||||||
|
|
||||||
|
up_postgres() {
|
||||||
|
# Run a SQL database on port 5432 inside of Docker.
|
||||||
|
docker run --name postgres -p 5432:5432 -d postgres
|
||||||
|
|
||||||
|
# Sleep for 5s to get SQL get started.
|
||||||
|
echo 'Sleeping for 5...'
|
||||||
|
sleep 5
|
||||||
|
|
||||||
|
# Add the database to postgres.
|
||||||
|
docker run --rm --link postgres:postgres postgres sh -c 'echo "create database genschema" | psql -h "$POSTGRES_PORT_5432_TCP_ADDR" -p "$POSTGRES_PORT_5432_TCP_PORT" -U postgres'
|
||||||
|
}
|
||||||
|
|
||||||
|
down_postgres() {
|
||||||
|
docker kill postgres
|
||||||
|
docker rm postgres
|
||||||
|
}
|
||||||
|
|
||||||
|
gen_migrate() {
|
||||||
|
# Generate the migration to the current model.
|
||||||
|
GENMIGRATE=$1 PYTHONPATH=. alembic revision --autogenerate -m "$@"
|
||||||
|
|
||||||
|
# Generate a SQLite database with the schema as defined by the existing alembic model.
|
||||||
|
GENMIGRATE=$1 PYTHONPATH=. alembic upgrade head
|
||||||
|
}
|
||||||
|
|
||||||
|
test_migrate() {
|
||||||
|
# Generate a SQLite database with the schema as defined by the existing alembic model.
|
||||||
|
GENMIGRATE=$1 PYTHONPATH=. alembic upgrade head
|
||||||
|
|
||||||
|
# Downgrade to verify it works in both directions.
|
||||||
|
COUNT=`ls data/migrations/versions/*.py | wc -l | tr -d ' '`
|
||||||
|
GENMIGRATE=$1 PYTHONPATH=. alembic downgrade "-$COUNT"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Test (and generate, if requested) via MySQL.
|
||||||
|
echo '> Starting MySQL'
|
||||||
|
up_mysql
|
||||||
|
|
||||||
|
if [ ! -z "$@" ]
|
||||||
|
then
|
||||||
|
set +e
|
||||||
|
echo '> Generating Migration'
|
||||||
|
gen_migrate "mysql"
|
||||||
|
set -e
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo '> Testing Migration (mysql)'
|
||||||
|
set +e
|
||||||
|
test_migrate "mysql"
|
||||||
|
set -e
|
||||||
|
down_mysql
|
||||||
|
|
||||||
|
# Test via Postgres.
|
||||||
|
echo '> Starting Postgres'
|
||||||
|
up_postgres
|
||||||
|
|
||||||
|
echo '> Testing Migration (postgres)'
|
||||||
|
set +e
|
||||||
|
test_migrate "postgres"
|
||||||
|
set -e
|
||||||
|
down_postgres
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
"""Calculate uncompressed sizes for all images
|
||||||
|
|
||||||
|
Revision ID: 2430f55c41d5
|
||||||
|
Revises: 3b4d3a4461dc
|
||||||
|
Create Date: 2014-10-07 14:50:04.660315
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '2430f55c41d5'
|
||||||
|
down_revision = '3b4d3a4461dc'
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from util.uncompressedsize import backfill_sizes_from_data
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade(tables):
|
||||||
|
backfill_sizes_from_data()
|
||||||
|
|
||||||
|
def downgrade(tables):
|
||||||
|
pass
|
|
@ -0,0 +1,57 @@
|
||||||
|
"""Add support for squashed images
|
||||||
|
|
||||||
|
Revision ID: 3b4d3a4461dc
|
||||||
|
Revises: b1d41e2071b
|
||||||
|
Create Date: 2014-10-07 14:49:13.105746
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '3b4d3a4461dc'
|
||||||
|
down_revision = 'b1d41e2071b'
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
def upgrade(tables):
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.create_table('imagestoragetransformation',
|
||||||
|
sa.Column('id', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('name', sa.String(length=255), nullable=False),
|
||||||
|
sa.PrimaryKeyConstraint('id', name=op.f('pk_imagestoragetransformation'))
|
||||||
|
)
|
||||||
|
op.create_index('imagestoragetransformation_name', 'imagestoragetransformation', ['name'], unique=True)
|
||||||
|
op.create_table('derivedimagestorage',
|
||||||
|
sa.Column('id', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('source_id', sa.Integer(), nullable=True),
|
||||||
|
sa.Column('derivative_id', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('transformation_id', sa.Integer(), nullable=False),
|
||||||
|
sa.ForeignKeyConstraint(['derivative_id'], ['imagestorage.id'], name=op.f('fk_derivedimagestorage_derivative_id_imagestorage')),
|
||||||
|
sa.ForeignKeyConstraint(['source_id'], ['imagestorage.id'], name=op.f('fk_derivedimagestorage_source_id_imagestorage')),
|
||||||
|
sa.ForeignKeyConstraint(['transformation_id'], ['imagestoragetransformation.id'], name=op.f('fk_dis_transformation_id_ist')),
|
||||||
|
sa.PrimaryKeyConstraint('id', name=op.f('pk_derivedimagestorage'))
|
||||||
|
)
|
||||||
|
op.create_index('derivedimagestorage_derivative_id', 'derivedimagestorage', ['derivative_id'], unique=False)
|
||||||
|
op.create_index('derivedimagestorage_source_id', 'derivedimagestorage', ['source_id'], unique=False)
|
||||||
|
op.create_index('derivedimagestorage_source_id_transformation_id', 'derivedimagestorage', ['source_id', 'transformation_id'], unique=True)
|
||||||
|
op.create_index('derivedimagestorage_transformation_id', 'derivedimagestorage', ['transformation_id'], unique=False)
|
||||||
|
op.drop_index('image_repository_id_docker_image_id', table_name='image')
|
||||||
|
op.create_index('image_repository_id_docker_image_id', 'image', ['repository_id', 'docker_image_id'], unique=True)
|
||||||
|
op.drop_index('imagestorage_uuid', table_name='imagestorage')
|
||||||
|
op.create_index('imagestorage_uuid', 'imagestorage', ['uuid'], unique=False)
|
||||||
|
op.drop_column(u'repository', 'namespace')
|
||||||
|
op.create_index('repository_namespace_user_id', 'repository', ['namespace_user_id'], unique=False)
|
||||||
|
### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade(tables):
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.drop_index('repository_namespace_user_id', table_name='repository')
|
||||||
|
op.add_column(u'repository', sa.Column('namespace', sa.String(length=255), nullable=True))
|
||||||
|
op.drop_index('imagestorage_uuid', table_name='imagestorage')
|
||||||
|
op.create_index('imagestorage_uuid', 'imagestorage', ['uuid'], unique=True)
|
||||||
|
op.drop_index('image_repository_id_docker_image_id', table_name='image')
|
||||||
|
op.create_index('image_repository_id_docker_image_id', 'image', ['repository_id', 'docker_image_id'], unique=False)
|
||||||
|
op.drop_table('derivedimagestorage')
|
||||||
|
op.drop_table('imagestoragetransformation')
|
||||||
|
### end Alembic commands ###
|
|
@ -13,7 +13,7 @@ from data.database import (User, Repository, Image, AccessToken, Role, Repositor
|
||||||
Notification, ImageStorageLocation, ImageStoragePlacement,
|
Notification, ImageStorageLocation, ImageStoragePlacement,
|
||||||
ExternalNotificationEvent, ExternalNotificationMethod,
|
ExternalNotificationEvent, ExternalNotificationMethod,
|
||||||
RepositoryNotification, RepositoryAuthorizedEmail, TeamMemberInvite,
|
RepositoryNotification, RepositoryAuthorizedEmail, TeamMemberInvite,
|
||||||
random_string_generator, db, BUILD_PHASE)
|
DerivedImageStorage, random_string_generator, db, BUILD_PHASE)
|
||||||
from peewee import JOIN_LEFT_OUTER, fn
|
from peewee import JOIN_LEFT_OUTER, fn
|
||||||
from util.validation import (validate_username, validate_email, validate_password,
|
from util.validation import (validate_username, validate_email, validate_password,
|
||||||
INVALID_PASSWORD_MESSAGE)
|
INVALID_PASSWORD_MESSAGE)
|
||||||
|
@ -1161,6 +1161,14 @@ def __translate_ancestry(old_ancestry, translations, repository, username, prefe
|
||||||
return '/%s/' % '/'.join(new_ids)
|
return '/%s/' % '/'.join(new_ids)
|
||||||
|
|
||||||
|
|
||||||
|
def _create_storage(location_name):
|
||||||
|
storage = ImageStorage.create()
|
||||||
|
location = ImageStorageLocation.get(name=location_name)
|
||||||
|
ImageStoragePlacement.create(location=location, storage=storage)
|
||||||
|
storage.locations = {location_name}
|
||||||
|
return storage
|
||||||
|
|
||||||
|
|
||||||
def find_create_or_link_image(docker_image_id, repository, username, translations,
|
def find_create_or_link_image(docker_image_id, repository, username, translations,
|
||||||
preferred_location):
|
preferred_location):
|
||||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||||
|
@ -1201,10 +1209,7 @@ def find_create_or_link_image(docker_image_id, repository, username, translation
|
||||||
origin_image_id = to_copy.id
|
origin_image_id = to_copy.id
|
||||||
except Image.DoesNotExist:
|
except Image.DoesNotExist:
|
||||||
logger.debug('Creating new storage for docker id: %s', docker_image_id)
|
logger.debug('Creating new storage for docker id: %s', docker_image_id)
|
||||||
storage = ImageStorage.create()
|
storage = _create_storage(preferred_location)
|
||||||
location = ImageStorageLocation.get(name=preferred_location)
|
|
||||||
ImageStoragePlacement.create(location=location, storage=storage)
|
|
||||||
storage.locations = {preferred_location}
|
|
||||||
|
|
||||||
logger.debug('Storage locations: %s', storage.locations)
|
logger.debug('Storage locations: %s', storage.locations)
|
||||||
|
|
||||||
|
@ -1222,6 +1227,26 @@ def find_create_or_link_image(docker_image_id, repository, username, translation
|
||||||
return new_image
|
return new_image
|
||||||
|
|
||||||
|
|
||||||
|
def find_or_create_derived_storage(source, transformation_name, preferred_location):
|
||||||
|
try:
|
||||||
|
found = (ImageStorage
|
||||||
|
.select(ImageStorage, DerivedImageStorage)
|
||||||
|
.join(DerivedImageStorage, on=(ImageStorage.id == DerivedImageStorage.derivative))
|
||||||
|
.join(ImageStorageTransformation)
|
||||||
|
.where(DerivedImageStorage.source == source,
|
||||||
|
ImageStorageTransformation.name == transformation_name)
|
||||||
|
.get())
|
||||||
|
|
||||||
|
found.locations = {placement.location.name for placement in found.imagestorageplacement_set}
|
||||||
|
return found
|
||||||
|
except ImageStorage.DoesNotExist:
|
||||||
|
logger.debug('Creating storage dervied from source: %s', source.uuid)
|
||||||
|
trans = ImageStorageTransformation.get(name=transformation_name)
|
||||||
|
new_storage = _create_storage(preferred_location)
|
||||||
|
DerivedImageStorage.create(source=source, derivative=new_storage, transformation=trans)
|
||||||
|
return new_storage
|
||||||
|
|
||||||
|
|
||||||
def get_storage_by_uuid(storage_uuid):
|
def get_storage_by_uuid(storage_uuid):
|
||||||
placements = list(ImageStoragePlacement
|
placements = list(ImageStoragePlacement
|
||||||
.select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)
|
.select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)
|
||||||
|
@ -1377,15 +1402,8 @@ def garbage_collect_repository(namespace_name, repository_name):
|
||||||
|
|
||||||
image_to_remove.delete_instance()
|
image_to_remove.delete_instance()
|
||||||
|
|
||||||
if uuids_to_check_for_gc:
|
def remove_storages(query):
|
||||||
storage_to_remove = (ImageStorage
|
for storage in query:
|
||||||
.select()
|
|
||||||
.join(Image, JOIN_LEFT_OUTER)
|
|
||||||
.group_by(ImageStorage)
|
|
||||||
.where(ImageStorage.uuid << list(uuids_to_check_for_gc))
|
|
||||||
.having(fn.Count(Image.id) == 0))
|
|
||||||
|
|
||||||
for storage in storage_to_remove:
|
|
||||||
logger.debug('Garbage collecting image storage: %s', storage.uuid)
|
logger.debug('Garbage collecting image storage: %s', storage.uuid)
|
||||||
|
|
||||||
image_path = config.store.image_path(storage.uuid)
|
image_path = config.store.image_path(storage.uuid)
|
||||||
|
@ -1394,7 +1412,24 @@ def garbage_collect_repository(namespace_name, repository_name):
|
||||||
placement.delete_instance()
|
placement.delete_instance()
|
||||||
config.store.remove({location_name}, image_path)
|
config.store.remove({location_name}, image_path)
|
||||||
|
|
||||||
storage.delete_instance()
|
storage.delete_instance(recursive=True)
|
||||||
|
|
||||||
|
if uuids_to_check_for_gc:
|
||||||
|
storage_to_remove = (ImageStorage
|
||||||
|
.select()
|
||||||
|
.join(Image, JOIN_LEFT_OUTER)
|
||||||
|
.group_by(ImageStorage)
|
||||||
|
.where(ImageStorage.uuid << list(uuids_to_check_for_gc))
|
||||||
|
.having(fn.Count(Image.id) == 0))
|
||||||
|
|
||||||
|
remove_storages(storage_to_remove)
|
||||||
|
|
||||||
|
# Now remove any derived image storages whose sources have been removed
|
||||||
|
derived_storages_to_remove = (ImageStorage
|
||||||
|
.select()
|
||||||
|
.join(DerivedImageStorage, on=(ImageStorage.id == DerivedImageStorage.derivative))
|
||||||
|
.where(DerivedImageStorage.source >> None))
|
||||||
|
remove_storages(derived_storages_to_remove)
|
||||||
|
|
||||||
return len(to_remove)
|
return len(to_remove)
|
||||||
|
|
||||||
|
|
115
endpoints/verbs.py
Normal file
115
endpoints/verbs.py
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
from flask import redirect, Blueprint, abort, send_file
|
||||||
|
|
||||||
|
from app import storage as store, app
|
||||||
|
from auth.auth import process_auth
|
||||||
|
from auth.permissions import ReadRepositoryPermission
|
||||||
|
from data import model
|
||||||
|
from data import database
|
||||||
|
|
||||||
|
from util.queuefile import QueueFile
|
||||||
|
from util.queueprocess import QueueProcess
|
||||||
|
from util.gzipwrap import GzipWrap
|
||||||
|
from util.dockerloadformat import build_docker_load_stream
|
||||||
|
|
||||||
|
|
||||||
|
verbs = Blueprint('verbs', __name__)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, image_list):
|
||||||
|
def get_next_image():
|
||||||
|
for current_image_id in image_list:
|
||||||
|
yield model.get_repo_image(namespace, repository, current_image_id)
|
||||||
|
|
||||||
|
def get_next_layer():
|
||||||
|
for current_image_id in image_list:
|
||||||
|
current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
|
||||||
|
current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
|
||||||
|
current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
|
||||||
|
current_image_path)
|
||||||
|
|
||||||
|
logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
|
||||||
|
yield current_image_stream
|
||||||
|
|
||||||
|
database.configure(app.config)
|
||||||
|
stream = build_docker_load_stream(namespace, repository, tag, synthetic_image_id, image_json,
|
||||||
|
get_next_image, get_next_layer)
|
||||||
|
|
||||||
|
return stream.read
|
||||||
|
|
||||||
|
|
||||||
|
def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, queue_file):
|
||||||
|
image_path = store.image_layer_path(linked_storage_uuid)
|
||||||
|
store.stream_write(linked_locations, image_path, queue_file)
|
||||||
|
queue_file.close()
|
||||||
|
|
||||||
|
database.configure(app.config)
|
||||||
|
done_uploading = model.get_storage_by_uuid(linked_storage_uuid)
|
||||||
|
done_uploading.uploading = False
|
||||||
|
done_uploading.save()
|
||||||
|
|
||||||
|
|
||||||
|
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
|
||||||
|
@process_auth
|
||||||
|
def get_squashed_tag(namespace, repository, tag):
|
||||||
|
permission = ReadRepositoryPermission(namespace, repository)
|
||||||
|
if permission.can() or model.repository_is_public(namespace, repository):
|
||||||
|
# Lookup the requested tag.
|
||||||
|
tag_image = model.get_tag_image(namespace, repository, tag)
|
||||||
|
if not tag_image:
|
||||||
|
abort(404)
|
||||||
|
|
||||||
|
# Lookup the tag's image and storage.
|
||||||
|
repo_image = model.get_repo_image(namespace, repository, tag_image.docker_image_id)
|
||||||
|
if not repo_image:
|
||||||
|
abort(404)
|
||||||
|
|
||||||
|
derived = model.find_or_create_derived_storage(repo_image.storage, 'squash',
|
||||||
|
store.preferred_locations[0])
|
||||||
|
if not derived.uploading:
|
||||||
|
logger.debug('Derived image %s exists in storage', derived.uuid)
|
||||||
|
derived_layer_path = store.image_layer_path(derived.uuid)
|
||||||
|
download_url = store.get_direct_download_url(derived.locations, derived_layer_path)
|
||||||
|
if download_url:
|
||||||
|
logger.debug('Redirecting to download URL for derived image %s', derived.uuid)
|
||||||
|
return redirect(download_url)
|
||||||
|
|
||||||
|
logger.debug('Sending cached derived image %s', derived.uuid)
|
||||||
|
return send_file(store.stream_read_file(derived.locations, derived_layer_path))
|
||||||
|
|
||||||
|
# Load the ancestry for the image.
|
||||||
|
logger.debug('Building and returning derived image %s', derived.uuid)
|
||||||
|
uuid = repo_image.storage.uuid
|
||||||
|
ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
|
||||||
|
full_image_list = json.loads(ancestry_data)
|
||||||
|
|
||||||
|
# Load the image's JSON layer.
|
||||||
|
image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
|
||||||
|
image_json = json.loads(image_json_data)
|
||||||
|
|
||||||
|
# Calculate a synthetic image ID.
|
||||||
|
synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':squash').hexdigest()
|
||||||
|
|
||||||
|
# Create a queue process to generate the data. The queue files will read from the process
|
||||||
|
# and send the results to the client and storage.
|
||||||
|
args = (namespace, repository, tag, synthetic_image_id, image_json, full_image_list)
|
||||||
|
queue_process = QueueProcess(_open_stream, 8 * 1024, 10 * 1024 * 1024, args) # 8K/10M chunk/max
|
||||||
|
|
||||||
|
client_queue_file = QueueFile(queue_process.create_queue(), 'client')
|
||||||
|
storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
|
||||||
|
|
||||||
|
# Start building.
|
||||||
|
queue_process.run()
|
||||||
|
|
||||||
|
# Start the storage saving.
|
||||||
|
storage_args = (derived.uuid, derived.locations, storage_queue_file)
|
||||||
|
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args)
|
||||||
|
|
||||||
|
# Return the client's data.
|
||||||
|
return send_file(client_queue_file)
|
||||||
|
|
||||||
|
abort(403)
|
|
@ -247,6 +247,8 @@ def initialize_database():
|
||||||
ImageStorageLocation.create(name='local_eu')
|
ImageStorageLocation.create(name='local_eu')
|
||||||
ImageStorageLocation.create(name='local_us')
|
ImageStorageLocation.create(name='local_us')
|
||||||
|
|
||||||
|
ImageStorageTransformation.create(name='squash')
|
||||||
|
|
||||||
# NOTE: These MUST be copied over to NotificationKind, since every external
|
# NOTE: These MUST be copied over to NotificationKind, since every external
|
||||||
# notification can also generate a Quay.io notification.
|
# notification can also generate a Quay.io notification.
|
||||||
ExternalNotificationEvent.create(name='repo_push')
|
ExternalNotificationEvent.create(name='repo_push')
|
||||||
|
|
|
@ -36,3 +36,4 @@ psycopg2
|
||||||
pyyaml
|
pyyaml
|
||||||
git+https://github.com/DevTable/aniso8601-fake.git
|
git+https://github.com/DevTable/aniso8601-fake.git
|
||||||
git+https://github.com/DevTable/anunidecode.git
|
git+https://github.com/DevTable/anunidecode.git
|
||||||
|
gipc
|
|
@ -2215,37 +2215,57 @@ p.editable:hover i {
|
||||||
font-size: 0.8em;
|
font-size: 0.8em;
|
||||||
position: relative;
|
position: relative;
|
||||||
margin-top: 30px;
|
margin-top: 30px;
|
||||||
margin-right: 26px;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
.repo .pull-container {
|
.repo .pull-container {
|
||||||
display: inline-block;
|
display: inline-block;
|
||||||
width: 300px;
|
width: 460px;
|
||||||
margin-left: 10px;
|
margin-left: 10px;
|
||||||
margin-right: 10px;
|
margin-right: 10px;
|
||||||
vertical-align: middle;
|
vertical-align: middle;
|
||||||
|
position: relative;
|
||||||
}
|
}
|
||||||
|
|
||||||
.repo .pull-container input {
|
.repo .pull-container .pull-selector {
|
||||||
cursor: default;
|
|
||||||
background: white;
|
|
||||||
color: #666;
|
|
||||||
padding: 4px;
|
|
||||||
border: 1px solid #ddd;
|
|
||||||
width: 300px;
|
|
||||||
}
|
|
||||||
|
|
||||||
.repo-image-view .id-container {
|
|
||||||
display: inline-block;
|
display: inline-block;
|
||||||
margin-top: 10px;
|
width: 114px;
|
||||||
|
font-size: 14px;
|
||||||
|
height: 36px;
|
||||||
|
vertical-align: top;
|
||||||
|
border: 1px solid #ddd;
|
||||||
|
margin-right: -3px;
|
||||||
|
background: #f8f8f8;
|
||||||
|
outline: none;
|
||||||
|
border-top-left-radius: 4px;
|
||||||
|
border-bottom-left-radius: 4px;
|
||||||
}
|
}
|
||||||
|
|
||||||
.repo-image-view .id-container input {
|
.repo .pull-container .pull-selector i {
|
||||||
background: #fefefe;
|
display: inline-block;
|
||||||
|
margin-right: 6px;
|
||||||
}
|
}
|
||||||
|
|
||||||
.repo-image-view .id-container .input-group {
|
|
||||||
width: 542px;
|
.repo .pull-container .copy-box {
|
||||||
|
width: 340px;
|
||||||
|
display: inline-block;
|
||||||
|
}
|
||||||
|
|
||||||
|
.repo .pull-container .copy-box .copy-container {
|
||||||
|
border-top-left-radius: 0px !important;
|
||||||
|
border-bottom-left-radius: 0px !important;
|
||||||
|
border-left: 0px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.repo .pull-container .dropdown-menu li i.fa {
|
||||||
|
text-align: center;
|
||||||
|
width: 12px;
|
||||||
|
display: inline-block;
|
||||||
|
}
|
||||||
|
|
||||||
|
.repo .pull-container sup {
|
||||||
|
margin-left: 4px;
|
||||||
|
color: red;
|
||||||
}
|
}
|
||||||
|
|
||||||
.repo-image-view #clipboardCopied {
|
.repo-image-view #clipboardCopied {
|
||||||
|
@ -2281,25 +2301,45 @@ p.editable:hover i {
|
||||||
position: relative;
|
position: relative;
|
||||||
}
|
}
|
||||||
|
|
||||||
.copy-box-element.disabled .input-group-addon {
|
.copy-box-element .copy-container {
|
||||||
display: none;
|
border-radius: 4px !important;
|
||||||
|
border: 1px solid #ddd;
|
||||||
|
position: relative;
|
||||||
|
}
|
||||||
|
|
||||||
|
.copy-box-element input {
|
||||||
|
border: 0px;
|
||||||
|
padding-right: 32px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.copy-box-element .copy-container .copy-icon {
|
||||||
|
position: absolute;
|
||||||
|
top: 8px;
|
||||||
|
right: 10px;
|
||||||
|
display: inline-block;
|
||||||
|
color: #ddd;
|
||||||
|
font-size: 16px;
|
||||||
|
cursor: pointer;
|
||||||
|
transition: color 0.5s ease-in-out;
|
||||||
|
}
|
||||||
|
|
||||||
|
.copy-box-element .copy-container .copy-icon.zeroclipboard-is-hover {
|
||||||
|
color: #444;
|
||||||
}
|
}
|
||||||
|
|
||||||
.copy-box-element.disabled input {
|
.copy-box-element.disabled input {
|
||||||
border-radius: 4px !important;
|
margin-right: 0px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.copy-box-element.disabled .copy-icon {
|
||||||
|
display: none;
|
||||||
}
|
}
|
||||||
|
|
||||||
.global-zeroclipboard-container embed {
|
.global-zeroclipboard-container embed {
|
||||||
cursor: pointer;
|
cursor: pointer;
|
||||||
}
|
}
|
||||||
|
|
||||||
#copyClipboard.zeroclipboard-is-hover, .copy-box-element .zeroclipboard-is-hover {
|
.copy-box-element .hovering {
|
||||||
background: #428bca;
|
|
||||||
color: white;
|
|
||||||
cursor: pointer !important;
|
|
||||||
}
|
|
||||||
|
|
||||||
#clipboardCopied.hovering, .copy-box-element .hovering {
|
|
||||||
position: absolute;
|
position: absolute;
|
||||||
right: 0px;
|
right: 0px;
|
||||||
top: 40px;
|
top: 40px;
|
||||||
|
@ -2307,16 +2347,11 @@ p.editable:hover i {
|
||||||
z-index: 100;
|
z-index: 100;
|
||||||
}
|
}
|
||||||
|
|
||||||
.copy-box-element .id-container {
|
|
||||||
display: inline-block;
|
|
||||||
vertical-align: middle;
|
|
||||||
}
|
|
||||||
|
|
||||||
.copy-box-element input {
|
.copy-box-element input {
|
||||||
background-color: white !important;
|
background-color: white !important;
|
||||||
}
|
}
|
||||||
|
|
||||||
#clipboardCopied, .clipboard-copied-message {
|
.clipboard-copied-message {
|
||||||
font-size: 0.8em;
|
font-size: 0.8em;
|
||||||
display: inline-block;
|
display: inline-block;
|
||||||
margin-right: 10px;
|
margin-right: 10px;
|
||||||
|
@ -2327,7 +2362,7 @@ p.editable:hover i {
|
||||||
border-radius: 4px;
|
border-radius: 4px;
|
||||||
}
|
}
|
||||||
|
|
||||||
#clipboardCopied.animated, .clipboard-copied-message {
|
.clipboard-copied-message {
|
||||||
-webkit-animation: fadeOut 4s ease-in-out 0s 1 forwards;
|
-webkit-animation: fadeOut 4s ease-in-out 0s 1 forwards;
|
||||||
-moz-animation: fadeOut 4s ease-in-out 0s 1 forwards;
|
-moz-animation: fadeOut 4s ease-in-out 0s 1 forwards;
|
||||||
-ms-animation: fadeOut 4s ease-in-out 0s 1 forwards;
|
-ms-animation: fadeOut 4s ease-in-out 0s 1 forwards;
|
||||||
|
|
|
@ -1,9 +1,12 @@
|
||||||
<div class="copy-box-element" ng-class="disabled ? 'disabled' : ''">
|
<div class="copy-box-element" ng-class="disabled ? 'disabled' : ''">
|
||||||
<div class="id-container">
|
<div class="id-container">
|
||||||
<div class="input-group">
|
<div class="copy-container">
|
||||||
<input type="text" class="form-control" value="{{ value }}" readonly>
|
<input type="text" class="form-control" value="{{ value }}" readonly>
|
||||||
<span class="input-group-addon" data-title="Copy to Clipboard">
|
<span class="copy-icon" data-title="Copy to Clipboard"
|
||||||
<i class="fa fa-copy"></i>
|
data-container="body"
|
||||||
|
data-placement="bottom"
|
||||||
|
bs-tooltip>
|
||||||
|
<i class="fa fa-clipboard"></i>
|
||||||
</span>
|
</span>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
@ -843,6 +843,15 @@ quayApp = angular.module('quay', quayDependencies, function($provide, cfpLoading
|
||||||
return config['SERVER_HOSTNAME'];
|
return config['SERVER_HOSTNAME'];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
config.getHost = function(opt_auth) {
|
||||||
|
var auth = opt_auth;
|
||||||
|
if (auth) {
|
||||||
|
auth = auth + '@';
|
||||||
|
}
|
||||||
|
|
||||||
|
return config['PREFERRED_URL_SCHEME'] + '://' + auth + config['SERVER_HOSTNAME'];
|
||||||
|
};
|
||||||
|
|
||||||
config.getUrl = function(opt_path) {
|
config.getUrl = function(opt_path) {
|
||||||
var path = opt_path || '';
|
var path = opt_path || '';
|
||||||
return config['PREFERRED_URL_SCHEME'] + '://' + config['SERVER_HOSTNAME'] + path;
|
return config['PREFERRED_URL_SCHEME'] + '://' + config['SERVER_HOSTNAME'] + path;
|
||||||
|
@ -2589,7 +2598,7 @@ quayApp.directive('copyBox', function () {
|
||||||
restrict: 'C',
|
restrict: 'C',
|
||||||
scope: {
|
scope: {
|
||||||
'value': '=value',
|
'value': '=value',
|
||||||
'hoveringMessage': '=hoveringMessage'
|
'hoveringMessage': '=hoveringMessage',
|
||||||
},
|
},
|
||||||
controller: function($scope, $element, $rootScope) {
|
controller: function($scope, $element, $rootScope) {
|
||||||
$scope.disabled = false;
|
$scope.disabled = false;
|
||||||
|
@ -2598,7 +2607,7 @@ quayApp.directive('copyBox', function () {
|
||||||
$rootScope.__copyBoxIdCounter = number + 1;
|
$rootScope.__copyBoxIdCounter = number + 1;
|
||||||
$scope.inputId = "copy-box-input-" + number;
|
$scope.inputId = "copy-box-input-" + number;
|
||||||
|
|
||||||
var button = $($element).find('.input-group-addon');
|
var button = $($element).find('.copy-icon');
|
||||||
var input = $($element).find('input');
|
var input = $($element).find('input');
|
||||||
|
|
||||||
input.attr('id', $scope.inputId);
|
input.attr('id', $scope.inputId);
|
||||||
|
|
|
@ -365,6 +365,9 @@ function RepoCtrl($scope, $sanitize, Restangular, ImageMetadataService, ApiServi
|
||||||
var namespace = $routeParams.namespace;
|
var namespace = $routeParams.namespace;
|
||||||
var name = $routeParams.name;
|
var name = $routeParams.name;
|
||||||
|
|
||||||
|
$scope.pullCommands = [];
|
||||||
|
$scope.currentPullCommand = null;
|
||||||
|
|
||||||
$rootScope.title = 'Loading...';
|
$rootScope.title = 'Loading...';
|
||||||
|
|
||||||
// Watch for the destruction of the scope.
|
// Watch for the destruction of the scope.
|
||||||
|
@ -399,6 +402,46 @@ function RepoCtrl($scope, $sanitize, Restangular, ImageMetadataService, ApiServi
|
||||||
$scope.buildDialogShowCounter = 0;
|
$scope.buildDialogShowCounter = 0;
|
||||||
$scope.getFormattedCommand = ImageMetadataService.getFormattedCommand;
|
$scope.getFormattedCommand = ImageMetadataService.getFormattedCommand;
|
||||||
|
|
||||||
|
$scope.setCurrentPullCommand = function(pullCommand) {
|
||||||
|
$scope.currentPullCommand = pullCommand;
|
||||||
|
};
|
||||||
|
|
||||||
|
$scope.updatePullCommand = function() {
|
||||||
|
$scope.pullCommands = [];
|
||||||
|
|
||||||
|
if ($scope.currentTag) {
|
||||||
|
$scope.pullCommands.push({
|
||||||
|
'title': 'docker pull (Tag ' + $scope.currentTag.name + ')',
|
||||||
|
'shortTitle': 'Pull Tag',
|
||||||
|
'icon': 'fa-tag',
|
||||||
|
'command': 'docker pull ' + Config.getDomain() + '/' + namespace + '/' + name + ':' + $scope.currentTag.name
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
$scope.pullCommands.push({
|
||||||
|
'title': 'docker pull (Full Repository)',
|
||||||
|
'shortTitle': 'Pull Repo',
|
||||||
|
'icon': 'fa-code-fork',
|
||||||
|
'command': 'docker pull ' + Config.getDomain() + '/' + namespace + '/' + name
|
||||||
|
});
|
||||||
|
|
||||||
|
if ($scope.currentTag) {
|
||||||
|
var squash = 'curl -f ' + Config.getHost('ACCOUNTNAME:PASSWORDORTOKEN');
|
||||||
|
squash += '/c1/squash/' + namespace + '/' + name + '/' + $scope.currentTag.name;
|
||||||
|
squash += ' | docker load';
|
||||||
|
|
||||||
|
$scope.pullCommands.push({
|
||||||
|
'title': 'Squashed image (Tag ' + $scope.currentTag.name + ')',
|
||||||
|
'shortTitle': 'Squashed',
|
||||||
|
'icon': 'fa-file-archive-o',
|
||||||
|
'command': squash,
|
||||||
|
'experimental': true
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
$scope.currentPullCommand = $scope.pullCommands[0];
|
||||||
|
};
|
||||||
|
|
||||||
$scope.showNewBuildDialog = function() {
|
$scope.showNewBuildDialog = function() {
|
||||||
$scope.buildDialogShowCounter++;
|
$scope.buildDialogShowCounter++;
|
||||||
};
|
};
|
||||||
|
@ -593,6 +636,8 @@ function RepoCtrl($scope, $sanitize, Restangular, ImageMetadataService, ApiServi
|
||||||
$location.search('tag', null);
|
$location.search('tag', null);
|
||||||
$location.search('image', imageId.substr(0, 12));
|
$location.search('image', imageId.substr(0, 12));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$scope.updatePullCommand();
|
||||||
};
|
};
|
||||||
|
|
||||||
$scope.setTag = function(tagName, opt_updateURL) {
|
$scope.setTag = function(tagName, opt_updateURL) {
|
||||||
|
@ -627,6 +672,8 @@ function RepoCtrl($scope, $sanitize, Restangular, ImageMetadataService, ApiServi
|
||||||
$scope.currentTag = null;
|
$scope.currentTag = null;
|
||||||
$scope.currentImage = null;
|
$scope.currentImage = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$scope.updatePullCommand();
|
||||||
};
|
};
|
||||||
|
|
||||||
$scope.getFirstTextLine = getFirstTextLine;
|
$scope.getFirstTextLine = getFirstTextLine;
|
||||||
|
|
|
@ -56,10 +56,21 @@
|
||||||
|
|
||||||
<!-- Pull Command -->
|
<!-- Pull Command -->
|
||||||
<span class="pull-command visible-md-inline">
|
<span class="pull-command visible-md-inline">
|
||||||
<div class="pull-container" data-title="Pull repository" bs-tooltip="tooltip.title">
|
<div class="pull-container" ng-show="currentPullCommand">
|
||||||
<div class="input-group">
|
<button class="pull-selector dropdown-toggle" data-toggle="dropdown">
|
||||||
<div class="copy-box" hovering-message="true" value="'docker pull ' + Config.getDomain() + '/' + repo.namespace + '/' + repo.name"></div>
|
<i class="fa" ng-class="currentPullCommand.icon"></i>
|
||||||
</div>
|
{{ currentPullCommand.shortTitle }}
|
||||||
|
<b class="caret"></b>
|
||||||
|
</button>
|
||||||
|
<ul class="dropdown-menu">
|
||||||
|
<li ng-repeat="pullCommand in pullCommands">
|
||||||
|
<a href="javascript:void(0)" ng-click="setCurrentPullCommand(pullCommand)"><i class="fa" ng-class="pullCommand.icon"></i>
|
||||||
|
{{ pullCommand.title }}
|
||||||
|
<sup ng-if="pullCommand.experimental">Experimental</sup>
|
||||||
|
</a>
|
||||||
|
</li>
|
||||||
|
</ul>
|
||||||
|
<div class="copy-box" hovering-message="true" value="currentPullCommand.command"></div>
|
||||||
</div>
|
</div>
|
||||||
</span>
|
</span>
|
||||||
</div>
|
</div>
|
||||||
|
|
Binary file not shown.
143
test/test_streamlayerformat.py
Normal file
143
test/test_streamlayerformat.py
Normal file
|
@ -0,0 +1,143 @@
|
||||||
|
import unittest
|
||||||
|
import tarfile
|
||||||
|
|
||||||
|
from StringIO import StringIO
|
||||||
|
from util.streamlayerformat import StreamLayerMerger, AUFS_WHITEOUT
|
||||||
|
|
||||||
|
class TestStreamLayerMerger(unittest.TestCase):
|
||||||
|
def create_layer(self, **kwargs):
|
||||||
|
output = StringIO()
|
||||||
|
with tarfile.open(fileobj=output, mode='w:gz') as tar:
|
||||||
|
for filename in kwargs:
|
||||||
|
current_filename = filename
|
||||||
|
current_contents = kwargs[filename]
|
||||||
|
|
||||||
|
if current_contents is None:
|
||||||
|
# This is a deleted file.
|
||||||
|
current_filename = AUFS_WHITEOUT + current_filename
|
||||||
|
current_contents = ''
|
||||||
|
|
||||||
|
info = tarfile.TarInfo(name=current_filename)
|
||||||
|
info.size = len(current_contents)
|
||||||
|
tar.addfile(info, fileobj=StringIO(current_contents))
|
||||||
|
|
||||||
|
return output.getvalue()
|
||||||
|
|
||||||
|
def squash_layers(self, layers):
|
||||||
|
def get_layers():
|
||||||
|
return [StringIO(layer) for layer in layers]
|
||||||
|
|
||||||
|
merger = StreamLayerMerger(get_layers)
|
||||||
|
merged_data = ''.join(merger.get_generator())
|
||||||
|
return merged_data
|
||||||
|
|
||||||
|
def assertHasFile(self, squashed, filename, contents):
|
||||||
|
with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
|
||||||
|
member = tar.getmember(filename)
|
||||||
|
self.assertEquals(contents, '\n'.join(tar.extractfile(member).readlines()))
|
||||||
|
|
||||||
|
def assertDoesNotHaveFile(self, squashed, filename):
|
||||||
|
with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
|
||||||
|
try:
|
||||||
|
member = tar.getmember(filename)
|
||||||
|
self.fail('Filename %s found' % filename)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def test_single_layer(self):
|
||||||
|
tar_layer = self.create_layer(
|
||||||
|
some_file = 'foo',
|
||||||
|
another_file = 'bar',
|
||||||
|
third_file = 'meh')
|
||||||
|
|
||||||
|
squashed = self.squash_layers([tar_layer])
|
||||||
|
|
||||||
|
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||||
|
self.assertHasFile(squashed, 'another_file', 'bar')
|
||||||
|
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||||
|
|
||||||
|
def test_multiple_layers(self):
|
||||||
|
second_layer = self.create_layer(
|
||||||
|
some_file = 'foo',
|
||||||
|
another_file = 'bar',
|
||||||
|
third_file = 'meh')
|
||||||
|
|
||||||
|
first_layer = self.create_layer(
|
||||||
|
top_file = 'top')
|
||||||
|
|
||||||
|
squashed = self.squash_layers([first_layer, second_layer])
|
||||||
|
|
||||||
|
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||||
|
self.assertHasFile(squashed, 'another_file', 'bar')
|
||||||
|
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||||
|
self.assertHasFile(squashed, 'top_file', 'top')
|
||||||
|
|
||||||
|
def test_multiple_layers_overwrite(self):
|
||||||
|
second_layer = self.create_layer(
|
||||||
|
some_file = 'foo',
|
||||||
|
another_file = 'bar',
|
||||||
|
third_file = 'meh')
|
||||||
|
|
||||||
|
first_layer = self.create_layer(
|
||||||
|
another_file = 'top')
|
||||||
|
|
||||||
|
squashed = self.squash_layers([first_layer, second_layer])
|
||||||
|
|
||||||
|
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||||
|
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||||
|
self.assertHasFile(squashed, 'another_file', 'top')
|
||||||
|
|
||||||
|
def test_deleted_file(self):
|
||||||
|
second_layer = self.create_layer(
|
||||||
|
some_file = 'foo',
|
||||||
|
another_file = 'bar',
|
||||||
|
third_file = 'meh')
|
||||||
|
|
||||||
|
first_layer = self.create_layer(
|
||||||
|
another_file = None)
|
||||||
|
|
||||||
|
squashed = self.squash_layers([first_layer, second_layer])
|
||||||
|
|
||||||
|
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||||
|
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||||
|
self.assertDoesNotHaveFile(squashed, 'another_file')
|
||||||
|
|
||||||
|
def test_deleted_readded_file(self):
|
||||||
|
third_layer = self.create_layer(
|
||||||
|
another_file = 'bar')
|
||||||
|
|
||||||
|
second_layer = self.create_layer(
|
||||||
|
some_file = 'foo',
|
||||||
|
another_file = None,
|
||||||
|
third_file = 'meh')
|
||||||
|
|
||||||
|
first_layer = self.create_layer(
|
||||||
|
another_file = 'newagain')
|
||||||
|
|
||||||
|
squashed = self.squash_layers([first_layer, second_layer, third_layer])
|
||||||
|
|
||||||
|
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||||
|
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||||
|
self.assertHasFile(squashed, 'another_file', 'newagain')
|
||||||
|
|
||||||
|
def test_deleted_in_lower_layer(self):
|
||||||
|
third_layer = self.create_layer(
|
||||||
|
another_file = 'bar')
|
||||||
|
|
||||||
|
second_layer = self.create_layer(
|
||||||
|
some_file = 'foo',
|
||||||
|
another_file = None,
|
||||||
|
third_file = 'meh')
|
||||||
|
|
||||||
|
first_layer = self.create_layer(
|
||||||
|
top_file = 'top')
|
||||||
|
|
||||||
|
squashed = self.squash_layers([first_layer, second_layer, third_layer])
|
||||||
|
|
||||||
|
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||||
|
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||||
|
self.assertHasFile(squashed, 'top_file', 'top')
|
||||||
|
self.assertDoesNotHaveFile(squashed, 'another_file')
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
31
util/aufs.py
Normal file
31
util/aufs.py
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
AUFS_METADATA = u'.wh..wh.'
|
||||||
|
AUFS_WHITEOUT = u'.wh.'
|
||||||
|
AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
|
||||||
|
|
||||||
|
def is_aufs_metadata(filepath):
|
||||||
|
""" Returns whether the given filepath references an AUFS metadata file. """
|
||||||
|
filename = os.path.basename(filepath)
|
||||||
|
return filename.startswith(AUFS_METADATA) or filepath.startswith(AUFS_METADATA)
|
||||||
|
|
||||||
|
def get_deleted_filename(filepath):
|
||||||
|
""" Returns the name of the deleted file referenced by the AUFS whiteout file at
|
||||||
|
the given path or None if the file path does not reference a whiteout file.
|
||||||
|
"""
|
||||||
|
filename = os.path.basename(filepath)
|
||||||
|
if not filename.startswith(AUFS_WHITEOUT):
|
||||||
|
return None
|
||||||
|
|
||||||
|
return filename[AUFS_WHITEOUT_PREFIX_LENGTH:]
|
||||||
|
|
||||||
|
def get_deleted_prefix(filepath):
|
||||||
|
""" Returns the path prefix of the deleted file referenced by the AUFS whiteout file at
|
||||||
|
the given path or None if the file path does not reference a whiteout file.
|
||||||
|
"""
|
||||||
|
deleted_filename = get_deleted_filename(filepath)
|
||||||
|
if deleted_filename is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
dirname = os.path.dirname(filepath)
|
||||||
|
return os.path.join('/', dirname, deleted_filename)
|
|
@ -1,16 +1,10 @@
|
||||||
import marisa_trie
|
import marisa_trie
|
||||||
import os
|
import os
|
||||||
import tarfile
|
import tarfile
|
||||||
|
from aufs import is_aufs_metadata, get_deleted_prefix
|
||||||
|
|
||||||
AUFS_METADATA = u'.wh..wh.'
|
|
||||||
|
|
||||||
AUFS_WHITEOUT = u'.wh.'
|
|
||||||
AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
|
|
||||||
|
|
||||||
ALLOWED_TYPES = {tarfile.REGTYPE, tarfile.AREGTYPE}
|
ALLOWED_TYPES = {tarfile.REGTYPE, tarfile.AREGTYPE}
|
||||||
|
|
||||||
|
|
||||||
def files_and_dirs_from_tar(source_stream, removed_prefix_collector):
|
def files_and_dirs_from_tar(source_stream, removed_prefix_collector):
|
||||||
try:
|
try:
|
||||||
tar_stream = tarfile.open(mode='r|*', fileobj=source_stream)
|
tar_stream = tarfile.open(mode='r|*', fileobj=source_stream)
|
||||||
|
@ -20,22 +14,19 @@ def files_and_dirs_from_tar(source_stream, removed_prefix_collector):
|
||||||
|
|
||||||
for tar_info in tar_stream:
|
for tar_info in tar_stream:
|
||||||
absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
|
absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
|
||||||
dirname = os.path.dirname(absolute)
|
|
||||||
filename = os.path.basename(absolute)
|
|
||||||
|
|
||||||
# Skip directories and metadata
|
# Skip metadata.
|
||||||
if (filename.startswith(AUFS_METADATA) or
|
if is_aufs_metadata(absolute):
|
||||||
absolute.startswith(AUFS_METADATA)):
|
|
||||||
# Skip
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
elif filename.startswith(AUFS_WHITEOUT):
|
# Add prefixes of removed paths to the collector.
|
||||||
removed_filename = filename[AUFS_WHITEOUT_PREFIX_LENGTH:]
|
deleted_prefix = get_deleted_prefix(absolute)
|
||||||
removed_prefix = os.path.join('/', dirname, removed_filename)
|
if deleted_prefix is not None:
|
||||||
removed_prefix_collector.add(removed_prefix)
|
deleted_prefix.add(deleted_prefix)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
elif tar_info.type in ALLOWED_TYPES:
|
# Otherwise, yield the path if it is in the allowed types.
|
||||||
|
if tar_info.type in ALLOWED_TYPES:
|
||||||
yield '/' + absolute
|
yield '/' + absolute
|
||||||
|
|
||||||
|
|
||||||
|
|
128
util/dockerloadformat.py
Normal file
128
util/dockerloadformat.py
Normal file
|
@ -0,0 +1,128 @@
|
||||||
|
from util.gzipwrap import GzipWrap
|
||||||
|
from util.streamlayerformat import StreamLayerMerger
|
||||||
|
from app import app
|
||||||
|
|
||||||
|
import copy
|
||||||
|
import json
|
||||||
|
import tarfile
|
||||||
|
|
||||||
|
class FileEstimationException(Exception):
|
||||||
|
""" Exception raised by build_docker_load_stream if the estimated size of the layer TAR
|
||||||
|
was lower than the actual size. This means the sent TAR header is wrong, and we have
|
||||||
|
to fail.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def build_docker_load_stream(namespace, repository, tag, synthetic_image_id,
|
||||||
|
layer_json, get_image_iterator, get_layer_iterator):
|
||||||
|
""" Builds and streams a synthetic .tar.gz that represents a squashed version
|
||||||
|
of the given layers, in `docker load` V1 format.
|
||||||
|
"""
|
||||||
|
return GzipWrap(_import_format_generator(namespace, repository, tag,
|
||||||
|
synthetic_image_id, layer_json,
|
||||||
|
get_image_iterator, get_layer_iterator))
|
||||||
|
|
||||||
|
|
||||||
|
def _import_format_generator(namespace, repository, tag, synthetic_image_id,
|
||||||
|
layer_json, get_image_iterator, get_layer_iterator):
|
||||||
|
|
||||||
|
# Docker import V1 Format (.tar):
|
||||||
|
# repositories - JSON file containing a repo -> tag -> image map
|
||||||
|
# {image ID folder}:
|
||||||
|
# json - The layer JSON
|
||||||
|
# layer.tar - The TARed contents of the layer
|
||||||
|
# VERSION - The docker import version: '1.0'
|
||||||
|
layer_merger = StreamLayerMerger(get_layer_iterator)
|
||||||
|
|
||||||
|
# Yield the repositories file:
|
||||||
|
synthetic_layer_info = {}
|
||||||
|
synthetic_layer_info[tag + '.squash'] = synthetic_image_id
|
||||||
|
|
||||||
|
hostname = app.config['SERVER_HOSTNAME']
|
||||||
|
repositories = {}
|
||||||
|
repositories[hostname + '/' + namespace + '/' + repository] = synthetic_layer_info
|
||||||
|
|
||||||
|
yield _tar_file('repositories', json.dumps(repositories))
|
||||||
|
|
||||||
|
# Yield the image ID folder.
|
||||||
|
yield _tar_folder(synthetic_image_id)
|
||||||
|
|
||||||
|
# Yield the JSON layer data.
|
||||||
|
layer_json = _build_layer_json(layer_json, synthetic_image_id)
|
||||||
|
yield _tar_file(synthetic_image_id + '/json', json.dumps(layer_json))
|
||||||
|
|
||||||
|
# Yield the VERSION file.
|
||||||
|
yield _tar_file(synthetic_image_id + '/VERSION', '1.0')
|
||||||
|
|
||||||
|
# Yield the merged layer data's header.
|
||||||
|
estimated_file_size = 0
|
||||||
|
for image in get_image_iterator():
|
||||||
|
estimated_file_size += image.storage.uncompressed_size
|
||||||
|
|
||||||
|
yield _tar_file_header(synthetic_image_id + '/layer.tar', estimated_file_size)
|
||||||
|
|
||||||
|
# Yield the contents of the merged layer.
|
||||||
|
yielded_size = 0
|
||||||
|
for entry in layer_merger.get_generator():
|
||||||
|
yield entry
|
||||||
|
yielded_size += len(entry)
|
||||||
|
|
||||||
|
# If the yielded size is more than the estimated size (which is unlikely but possible), then
|
||||||
|
# raise an exception since the tar header will be wrong.
|
||||||
|
if yielded_size > estimated_file_size:
|
||||||
|
raise FileEstimationException()
|
||||||
|
|
||||||
|
# If the yielded size is less than the estimated size (which is likely), fill the rest with
|
||||||
|
# zeros.
|
||||||
|
if yielded_size < estimated_file_size:
|
||||||
|
yield '\0' * (estimated_file_size - yielded_size)
|
||||||
|
|
||||||
|
# Yield any file padding to 512 bytes that is necessary.
|
||||||
|
yield _tar_file_padding(estimated_file_size)
|
||||||
|
|
||||||
|
# Last two records are empty in TAR spec.
|
||||||
|
yield '\0' * 512
|
||||||
|
yield '\0' * 512
|
||||||
|
|
||||||
|
|
||||||
|
def _build_layer_json(layer_json, synthetic_image_id):
|
||||||
|
updated_json = copy.deepcopy(layer_json)
|
||||||
|
updated_json['id'] = synthetic_image_id
|
||||||
|
|
||||||
|
if 'parent' in updated_json:
|
||||||
|
del updated_json['parent']
|
||||||
|
|
||||||
|
if 'config' in updated_json and 'Image' in updated_json['config']:
|
||||||
|
updated_json['config']['Image'] = synthetic_image_id
|
||||||
|
|
||||||
|
if 'container_config' in updated_json and 'Image' in updated_json['container_config']:
|
||||||
|
updated_json['container_config']['Image'] = synthetic_image_id
|
||||||
|
|
||||||
|
return updated_json
|
||||||
|
|
||||||
|
|
||||||
|
def _tar_file(name, contents):
|
||||||
|
length = len(contents)
|
||||||
|
tar_data = _tar_file_header(name, length)
|
||||||
|
tar_data += contents
|
||||||
|
tar_data += _tar_file_padding(length)
|
||||||
|
return tar_data
|
||||||
|
|
||||||
|
|
||||||
|
def _tar_file_padding(length):
|
||||||
|
if length % 512 != 0:
|
||||||
|
return '\0' * (512 - (length % 512))
|
||||||
|
|
||||||
|
|
||||||
|
def _tar_file_header(name, file_size):
|
||||||
|
info = tarfile.TarInfo(name=name)
|
||||||
|
info.type = tarfile.REGTYPE
|
||||||
|
info.size = file_size
|
||||||
|
return info.tobuf()
|
||||||
|
|
||||||
|
|
||||||
|
def _tar_folder(name):
|
||||||
|
info = tarfile.TarInfo(name=name)
|
||||||
|
info.type = tarfile.DIRTYPE
|
||||||
|
return info.tobuf()
|
|
@ -9,6 +9,8 @@ import zlib
|
||||||
# http://stackoverflow.com/questions/3122145/zlib-error-error-3-while-decompressing-incorrect-header-check/22310760#22310760
|
# http://stackoverflow.com/questions/3122145/zlib-error-error-3-while-decompressing-incorrect-header-check/22310760#22310760
|
||||||
ZLIB_GZIP_WINDOW = zlib.MAX_WBITS | 32
|
ZLIB_GZIP_WINDOW = zlib.MAX_WBITS | 32
|
||||||
|
|
||||||
|
CHUNK_SIZE = 5 * 1024 * 1024
|
||||||
|
|
||||||
class SizeInfo(object):
|
class SizeInfo(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.size = 0
|
self.size = 0
|
||||||
|
@ -23,6 +25,11 @@ def calculate_size_handler():
|
||||||
decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
|
decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
|
||||||
|
|
||||||
def fn(buf):
|
def fn(buf):
|
||||||
size_info.size += len(decompressor.decompress(buf))
|
# Note: We set a maximum CHUNK_SIZE to prevent the decompress from taking too much
|
||||||
|
# memory. As a result, we have to loop until the unconsumed tail is empty.
|
||||||
|
current_data = buf
|
||||||
|
while len(current_data) > 0:
|
||||||
|
size_info.size += len(decompressor.decompress(current_data, CHUNK_SIZE))
|
||||||
|
current_data = decompressor.unconsumed_tail
|
||||||
|
|
||||||
return size_info, fn
|
return size_info, fn
|
||||||
|
|
50
util/gzipwrap.py
Normal file
50
util/gzipwrap.py
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
from gzip import GzipFile
|
||||||
|
|
||||||
|
class GzipWrap(object):
|
||||||
|
def __init__(self, input, filename=None, compresslevel=1):
|
||||||
|
self.input = iter(input)
|
||||||
|
self.buffer = ''
|
||||||
|
self.zipper = GzipFile(filename, mode='wb', fileobj=self, compresslevel=compresslevel)
|
||||||
|
|
||||||
|
def read(self, size=-1):
|
||||||
|
# If the buffer already has enough bytes, then simply pop them off of
|
||||||
|
# the beginning and return them.
|
||||||
|
if len(self.buffer) >= size:
|
||||||
|
ret = self.buffer[0:size]
|
||||||
|
self.buffer = self.buffer[size:]
|
||||||
|
return ret
|
||||||
|
|
||||||
|
# Otherwise, zip the input until we have enough bytes.
|
||||||
|
while True:
|
||||||
|
# Attempt to retrieve the next bytes to write.
|
||||||
|
is_done = False
|
||||||
|
|
||||||
|
input_size = 0
|
||||||
|
input_buffer = ''
|
||||||
|
while input_size < 1024 * 256: # 256K buffer to Gzip
|
||||||
|
try:
|
||||||
|
s = self.input.next()
|
||||||
|
input_buffer += s
|
||||||
|
input_size = input_size + len(s)
|
||||||
|
except StopIteration:
|
||||||
|
is_done = True
|
||||||
|
break
|
||||||
|
|
||||||
|
self.zipper.write(input_buffer)
|
||||||
|
|
||||||
|
if is_done:
|
||||||
|
self.zipper.flush()
|
||||||
|
|
||||||
|
if len(self.buffer) >= size or is_done:
|
||||||
|
ret = self.buffer[0:size]
|
||||||
|
self.buffer = self.buffer[size:]
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
self.buffer += data
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.input.close()
|
41
util/queuefile.py
Normal file
41
util/queuefile.py
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
from multiprocessing import Queue
|
||||||
|
import os
|
||||||
|
|
||||||
|
class QueueFile(object):
|
||||||
|
""" Class which implements a file-like interface and reads from a blocking
|
||||||
|
multiprocessing queue.
|
||||||
|
"""
|
||||||
|
def __init__(self, queue, name=None):
|
||||||
|
self._queue = queue
|
||||||
|
self._closed = False
|
||||||
|
self._done = False
|
||||||
|
self._buffer = ''
|
||||||
|
self._total_size = 0
|
||||||
|
self._name = name
|
||||||
|
|
||||||
|
def read(self, size=8192):
|
||||||
|
if self._closed or self._done:
|
||||||
|
return None
|
||||||
|
|
||||||
|
while len(self._buffer) < size:
|
||||||
|
result = self._queue.get(block=True)
|
||||||
|
if result is None:
|
||||||
|
self._done = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
self._closed = True
|
||||||
|
raise result
|
||||||
|
|
||||||
|
self._buffer += result
|
||||||
|
self._total_size += len(result)
|
||||||
|
|
||||||
|
buf = self._buffer[0:size]
|
||||||
|
self._buffer = self._buffer[size:]
|
||||||
|
return buf
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self._closed = True
|
61
util/queueprocess.py
Normal file
61
util/queueprocess.py
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
from multiprocessing import Process, Queue
|
||||||
|
import logging
|
||||||
|
import multiprocessing
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import gipc
|
||||||
|
|
||||||
|
logger = multiprocessing.log_to_stderr()
|
||||||
|
logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
class QueueProcess(object):
|
||||||
|
""" Helper class which invokes a worker in a process to produce
|
||||||
|
data for one (or more) queues.
|
||||||
|
"""
|
||||||
|
def __init__(self, get_producer, chunk_size, max_size, args):
|
||||||
|
self._get_producer = get_producer
|
||||||
|
self._queues = []
|
||||||
|
self._chunk_size = chunk_size
|
||||||
|
self._max_size = max_size
|
||||||
|
self._args = args or []
|
||||||
|
|
||||||
|
def create_queue(self):
|
||||||
|
""" Adds a multiprocessing queue to the list of queues. Any queues added
|
||||||
|
will have the data produced appended.
|
||||||
|
"""
|
||||||
|
queue = Queue(self._max_size / self._chunk_size)
|
||||||
|
self._queues.append(queue)
|
||||||
|
return queue
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def run_process(target, args):
|
||||||
|
gipc.start_process(target=target, args=args)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
# Important! gipc is used here because normal multiprocessing does not work
|
||||||
|
# correctly with gevent when we sleep.
|
||||||
|
args = (self._get_producer, self._queues, self._chunk_size, self._args)
|
||||||
|
QueueProcess.run_process(_run, args)
|
||||||
|
|
||||||
|
def _run(get_producer, queues, chunk_size, args):
|
||||||
|
producer = get_producer(*args)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
data = producer(chunk_size) or None
|
||||||
|
except Exception as ex:
|
||||||
|
data = ex
|
||||||
|
|
||||||
|
for queue in queues:
|
||||||
|
try:
|
||||||
|
queue.put(data, block=True, timeout=10)
|
||||||
|
except Exception as ex:
|
||||||
|
# One of the listeners stopped listening.
|
||||||
|
return
|
||||||
|
|
||||||
|
if data is None or isinstance(data, Exception):
|
||||||
|
break
|
||||||
|
|
||||||
|
# Important! This allows the thread that writes the queue data to the pipe
|
||||||
|
# to do so. Otherwise, this hangs.
|
||||||
|
time.sleep(0)
|
||||||
|
|
91
util/streamlayerformat.py
Normal file
91
util/streamlayerformat.py
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
import marisa_trie
|
||||||
|
import os
|
||||||
|
import tarfile
|
||||||
|
from aufs import is_aufs_metadata, get_deleted_prefix
|
||||||
|
|
||||||
|
|
||||||
|
AUFS_METADATA = u'.wh..wh.'
|
||||||
|
|
||||||
|
AUFS_WHITEOUT = u'.wh.'
|
||||||
|
AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
|
||||||
|
|
||||||
|
class StreamLayerMerger(object):
|
||||||
|
""" Class which creates a generator of the combined TAR data for a set of Docker layers. """
|
||||||
|
def __init__(self, layer_iterator):
|
||||||
|
self.trie = marisa_trie.Trie()
|
||||||
|
self.layer_iterator = layer_iterator
|
||||||
|
self.encountered = []
|
||||||
|
|
||||||
|
def get_generator(self):
|
||||||
|
for current_layer in self.layer_iterator():
|
||||||
|
# Read the current layer as TAR. If it is empty, we just continue
|
||||||
|
# to the next layer.
|
||||||
|
try:
|
||||||
|
tar_file = tarfile.open(mode='r|*', fileobj=current_layer)
|
||||||
|
except tarfile.ReadError as re:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# For each of the tar entries, yield them IF and ONLY IF we have not
|
||||||
|
# encountered the path before.
|
||||||
|
|
||||||
|
# 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
|
||||||
|
chunk_size = 1024 * 1024 * 9
|
||||||
|
|
||||||
|
for tar_info in tar_file:
|
||||||
|
if not self.check_tar_info(tar_info):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Yield the tar header.
|
||||||
|
yield tar_info.tobuf()
|
||||||
|
|
||||||
|
# Try to extract any file contents for the tar. If found, we yield them as well.
|
||||||
|
if tar_info.isreg():
|
||||||
|
file_stream = tar_file.extractfile(tar_info)
|
||||||
|
if file_stream is not None:
|
||||||
|
length = 0
|
||||||
|
while True:
|
||||||
|
current_block = file_stream.read(chunk_size)
|
||||||
|
if not len(current_block):
|
||||||
|
break
|
||||||
|
|
||||||
|
yield current_block
|
||||||
|
length += len(current_block)
|
||||||
|
|
||||||
|
file_stream.close()
|
||||||
|
|
||||||
|
# Files must be padding to 512 byte multiples.
|
||||||
|
if length % 512 != 0:
|
||||||
|
yield '\0' * (512 - (length % 512))
|
||||||
|
|
||||||
|
# Close the layer stream now that we're done with it.
|
||||||
|
tar_file.close()
|
||||||
|
|
||||||
|
# Update the trie with the new encountered entries.
|
||||||
|
self.trie = marisa_trie.Trie(self.encountered)
|
||||||
|
|
||||||
|
# Last two records are empty in TAR spec.
|
||||||
|
yield '\0' * 512
|
||||||
|
yield '\0' * 512
|
||||||
|
|
||||||
|
|
||||||
|
def check_tar_info(self, tar_info):
|
||||||
|
absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
|
||||||
|
|
||||||
|
# Skip metadata.
|
||||||
|
if is_aufs_metadata(absolute):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Add any prefix of deleted paths to the prefix list.
|
||||||
|
deleted_prefix = get_deleted_prefix(absolute)
|
||||||
|
if deleted_prefix is not None:
|
||||||
|
self.encountered.append(deleted_prefix)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check if this file has already been encountered somewhere. If so,
|
||||||
|
# skip it.
|
||||||
|
if unicode(absolute) in self.trie:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Otherwise, add the path to the encountered list and return it.
|
||||||
|
self.encountered.append(absolute)
|
||||||
|
return True
|
|
@ -1,5 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
import zlib
|
import zlib
|
||||||
|
import sys
|
||||||
|
|
||||||
from data import model
|
from data import model
|
||||||
from data.database import ImageStorage
|
from data.database import ImageStorage
|
||||||
|
@ -15,6 +16,15 @@ CHUNK_SIZE = 5 * 1024 * 1024
|
||||||
|
|
||||||
|
|
||||||
def backfill_sizes_from_data():
|
def backfill_sizes_from_data():
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
logger.debug('Starting uncompressed image size backfill')
|
||||||
|
|
||||||
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
|
ch = logging.StreamHandler(sys.stdout)
|
||||||
|
ch.setFormatter(formatter)
|
||||||
|
logger.addHandler(ch)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# Load the record from the DB.
|
# Load the record from the DB.
|
||||||
batch_ids = list(ImageStorage
|
batch_ids = list(ImageStorage
|
||||||
|
@ -47,7 +57,9 @@ def backfill_sizes_from_data():
|
||||||
if len(current_data) == 0:
|
if len(current_data) == 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
uncompressed_size += len(decompressor.decompress(current_data))
|
while current_data:
|
||||||
|
uncompressed_size += len(decompressor.decompress(current_data, CHUNK_SIZE))
|
||||||
|
current_data = decompressor.unconsumed_tail
|
||||||
|
|
||||||
# Write the size to the image storage. We do so under a transaction AFTER checking to
|
# Write the size to the image storage. We do so under a transaction AFTER checking to
|
||||||
# make sure the image storage still exists and has not changed.
|
# make sure the image storage still exists and has not changed.
|
Reference in a new issue