diff --git a/data/database.py b/data/database.py
index c8fbc27ea..ffea9aadd 100644
--- a/data/database.py
+++ b/data/database.py
@@ -853,9 +853,13 @@ class TorrentInfo(BaseModel):
   piece_length = IntegerField()
   pieces = Base64BinaryField()
 
-  indexes = (
-    (('storage', 'piece_length'), True),
-  )
+  class Meta:
+    database = db
+    read_slaves = (read_slave,)
+    indexes = (
+      # we may want to compute the piece hashes multiple times with different piece lengths
+      (('storage', 'piece_length'), True),
+    )
 
 
 all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility,
diff --git a/data/migrations/versions/23ca04d0bc8e_add_the_torrentinfo_table_and_torrent_.py b/data/migrations/versions/23ca04d0bc8e_add_the_torrentinfo_table_and_torrent_.py
new file mode 100644
index 000000000..1e16f9838
--- /dev/null
+++ b/data/migrations/versions/23ca04d0bc8e_add_the_torrentinfo_table_and_torrent_.py
@@ -0,0 +1,38 @@
+"""Add the torrentinfo table and torrent fields on blobuploads.
+
+Revision ID: 23ca04d0bc8e
+Revises: 471caec2cb66
+Create Date: 2016-01-06 13:25:24.597037
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '23ca04d0bc8e'
+down_revision = '471caec2cb66'
+
+from alembic import op
+import sqlalchemy as sa
+
+def upgrade(tables):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('torrentinfo',
+    sa.Column('id', sa.Integer(), nullable=False),
+    sa.Column('storage_id', sa.Integer(), nullable=False),
+    sa.Column('piece_length', sa.Integer(), nullable=False),
+    sa.Column('pieces', sa.Text(), nullable=False),
+    sa.ForeignKeyConstraint(['storage_id'], ['imagestorage.id'], name=op.f('fk_torrentinfo_storage_id_imagestorage')),
+    sa.PrimaryKeyConstraint('id', name=op.f('pk_torrentinfo'))
+    )
+    op.create_index('torrentinfo_storage_id', 'torrentinfo', ['storage_id'], unique=False)
+    op.create_index('torrentinfo_storage_id_piece_length', 'torrentinfo', ['storage_id', 'piece_length'], unique=True)
+    op.add_column(u'blobupload', sa.Column('piece_hashes', sa.Text(), nullable=False))
+    op.add_column(u'blobupload', sa.Column('piece_sha_state', sa.Text(), nullable=True))
+    ### end Alembic commands ###
+
+
+def downgrade(tables):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column(u'blobupload', 'piece_sha_state')
+    op.drop_column(u'blobupload', 'piece_hashes')
+    op.drop_table('torrentinfo')
+    ### end Alembic commands ###
diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py
index d4d33c630..a378f6c5d 100644
--- a/endpoints/v1/registry.py
+++ b/endpoints/v1/registry.py
@@ -239,7 +239,7 @@ def put_image_layer(namespace, repository, image_id):
   updated_storage = model.storage.set_image_storage_metadata(image_id, namespace, repository,
                                                              size_info.compressed_size,
                                                              size_info.uncompressed_size)
-  pieces_bytes = piece_hasher.piece_hashes + piece_hasher.hash_fragment.digest()
+  pieces_bytes = piece_hasher.final_piece_hashes()
   model.storage.save_torrent_info(updated_storage, app.config['TORRENT_PIECE_SIZE'], pieces_bytes)
 
   # Append the computed checksum.
diff --git a/endpoints/verbs.py b/endpoints/verbs.py
index f91a73565..4a0c6edf5 100644
--- a/endpoints/verbs.py
+++ b/endpoints/verbs.py
@@ -15,7 +15,10 @@ from storage import Storage
 
 from util.registry.queuefile import QueueFile
 from util.registry.queueprocess import QueueProcess
-from util.registry.torrent import make_torrent, per_user_torrent_filename, public_torrent_filename
+from util.registry.torrent import (make_torrent, per_user_torrent_filename, public_torrent_filename,
+                                   PieceHasher)
+from util.registry.filelike import wrap_with_handler
+from util.registry.gzipstream import SizeInfo
 from formats.squashed import SquashedDockerImage
 from formats.aci import ACIImage
 from endpoints.v2.blob import BLOB_DIGEST_ROUTE
@@ -25,7 +28,8 @@ verbs = Blueprint('verbs', __name__)
 logger = logging.getLogger(__name__)
 
 
-def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image):
+def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image,
+                 handlers):
   store = Storage(app)
 
   # For performance reasons, we load the full image list here, cache it, then disconnect from
@@ -55,10 +59,13 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag
   stream = formatter.build_stream(namespace, repository, tag, synthetic_image_id, image_json,
                                   get_next_image, get_next_layer, get_image_json)
 
+  for handler_fn in handlers:
+    stream = wrap_with_handler(stream, handler_fn)
+
   return stream.read
 
 
-def _sign_sythentic_image(verb, linked_storage_uuid, queue_file):
+def _sign_synthetic_image(verb, linked_storage_uuid, queue_file):
   signature = None
   try:
     signature = signer.detached_sign(queue_file)
@@ -200,12 +207,20 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
     # Close any existing DB connection once the process has exited.
     database.close_db_filter(None)
 
+  hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE'])
+  def _store_metadata_and_cleanup():
+    with database.UseThenDisconnect(app.config):
+      model.storage.save_torrent_info(derived, app.config['TORRENT_PIECE_SIZE'],
+                                      hasher.final_piece_hashes())
+
   # Create a queue process to generate the data. The queue files will read from the process
   # and send the results to the client and storage.
-  args = (formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image)
+  handlers = [hasher.update]
+  args = (formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image,
+          handlers)
   queue_process = QueueProcess(_open_stream,
                                8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max
-                               args, finished=_cleanup)
+                               args, finished=_store_metadata_and_cleanup)
 
   client_queue_file = QueueFile(queue_process.create_queue(), 'client')
   storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
@@ -224,7 +239,7 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
 
   if sign and signer.name:
     signing_args = (verb, derived.uuid, signing_queue_file)
-    QueueProcess.run_process(_sign_sythentic_image, signing_args, finished=_cleanup)
+    QueueProcess.run_process(_sign_synthetic_image, signing_args, finished=_cleanup)
 
   # Close the database handle here for this process before we send the long download.
   database.close_db_filter(None)
diff --git a/requirements.txt b/requirements.txt
index cc98dad31..d6323dc2b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,7 +4,7 @@ APScheduler==3.0.3
 autobahn==0.9.3-3
 Babel==1.3
 beautifulsoup4==4.4.0
-bintrees==2.0.2
+bintrees==2.0.3
 blinker==1.3
 boto==2.38.0
 cachetools==1.0.3
diff --git a/test/data/test.db b/test/data/test.db
index 16a8c3581..e26e28a9b 100644
Binary files a/test/data/test.db and b/test/data/test.db differ
diff --git a/util/registry/torrent.py b/util/registry/torrent.py
index 0cc9cb349..18ff45a9b 100644
--- a/util/registry/torrent.py
+++ b/util/registry/torrent.py
@@ -80,3 +80,6 @@ class PieceHasher(object):
   @property
   def hash_fragment(self):
     return self._hash_fragment
+
+  def final_piece_hashes(self):
+    return self._piece_hashes + self._hash_fragment.digest()