Merge pull request #1119 from jakedt/torrent

Add piece hashing for synthetic images
Jake Moshenko 2016-01-06 14:07:11 -05:00
commit bc01fed9d4
7 changed files with 71 additions and 11 deletions


@@ -853,9 +853,13 @@ class TorrentInfo(BaseModel):
   piece_length = IntegerField()
   pieces = Base64BinaryField()
-  indexes = (
-    (('storage', 'piece_length'), True),
-  )
   class Meta:
     database = db
     read_slaves = (read_slave,)
+    indexes = (
+      # we may want to compute the piece hashes multiple times with different piece lengths
+      (('storage', 'piece_length'), True),
+    )
 all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility,
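For context, the indexes tuple moved into Meta above follows Peewee's ((field names), unique) convention, so one ImageStorage row can own several TorrentInfo rows as long as each uses a different piece length. A minimal standalone sketch of the same pattern, using an in-memory SQLite database and illustrative model names rather than Quay's real ones:

from peewee import ForeignKeyField, IntegerField, Model, SqliteDatabase, TextField

db = SqliteDatabase(':memory:')  # illustrative database, not Quay's

class ImageStorageSketch(Model):
  class Meta:
    database = db

class TorrentInfoSketch(Model):
  storage = ForeignKeyField(ImageStorageSketch)
  piece_length = IntegerField()
  pieces = TextField()

  class Meta:
    database = db
    indexes = (
      # ((fields), unique): at most one row per (storage, piece_length) pair,
      # so piece hashes can be recomputed later with a different piece length.
      (('storage', 'piece_length'), True),
    )

db.create_tables([ImageStorageSketch, TorrentInfoSketch])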


@@ -0,0 +1,38 @@
+"""Add the torrentinfo table and torrent fields on blobuploads.
+
+Revision ID: 23ca04d0bc8e
+Revises: 471caec2cb66
+Create Date: 2016-01-06 13:25:24.597037
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '23ca04d0bc8e'
+down_revision = '471caec2cb66'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade(tables):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('torrentinfo',
+                    sa.Column('id', sa.Integer(), nullable=False),
+                    sa.Column('storage_id', sa.Integer(), nullable=False),
+                    sa.Column('piece_length', sa.Integer(), nullable=False),
+                    sa.Column('pieces', sa.Text(), nullable=False),
+                    sa.ForeignKeyConstraint(['storage_id'], ['imagestorage.id'], name=op.f('fk_torrentinfo_storage_id_imagestorage')),
+                    sa.PrimaryKeyConstraint('id', name=op.f('pk_torrentinfo'))
+    )
+    op.create_index('torrentinfo_storage_id', 'torrentinfo', ['storage_id'], unique=False)
+    op.create_index('torrentinfo_storage_id_piece_length', 'torrentinfo', ['storage_id', 'piece_length'], unique=True)
+    op.add_column(u'blobupload', sa.Column('piece_hashes', sa.Text(), nullable=False))
+    op.add_column(u'blobupload', sa.Column('piece_sha_state', sa.Text(), nullable=True))
+    ### end Alembic commands ###
+
+def downgrade(tables):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column(u'blobupload', 'piece_sha_state')
+    op.drop_column(u'blobupload', 'piece_hashes')
+    op.drop_table('torrentinfo')
+    ### end Alembic commands ###
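Assuming a standard Alembic setup around these migration files (Quay's env.py is what supplies the tables argument that upgrade()/downgrade() receive), the revision above could be applied and rolled back programmatically roughly as follows; the alembic.ini path is an assumption:

from alembic import command
from alembic.config import Config

cfg = Config('alembic.ini')              # assumed location of the Alembic config
command.upgrade(cfg, '23ca04d0bc8e')     # apply this revision (IDs from the file above)
command.downgrade(cfg, '471caec2cb66')   # return to the prior revision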


@@ -239,7 +239,7 @@ def put_image_layer(namespace, repository, image_id):
   updated_storage = model.storage.set_image_storage_metadata(image_id, namespace, repository,
                                                               size_info.compressed_size,
                                                               size_info.uncompressed_size)
-  pieces_bytes = piece_hasher.piece_hashes + piece_hasher.hash_fragment.digest()
+  pieces_bytes = piece_hasher.final_piece_hashes()
   model.storage.save_torrent_info(updated_storage, app.config['TORRENT_PIECE_SIZE'], pieces_bytes)
 
   # Append the computed checksum.
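The pieces_bytes being saved is BitTorrent-style piece data: the stored blob is split into fixed-size pieces, each piece is hashed with SHA-1, and the 20-byte digests are concatenated. A minimal non-incremental sketch of that idea (not the PieceHasher this PR uses, which hashes incrementally as chunks stream through):

import hashlib

def naive_piece_hashes(data, piece_length):
  """Concatenated SHA-1 digests of consecutive fixed-size pieces of `data`."""
  digests = []
  for offset in range(0, len(data), piece_length):
    digests.append(hashlib.sha1(data[offset:offset + piece_length]).digest())
  return b''.join(digests)

# Two 512 KiB pieces -> two 20-byte digests (40 bytes of piece data).
assert len(naive_piece_hashes(b'\0' * (1024 * 1024), 512 * 1024)) == 40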


@@ -15,7 +15,10 @@ from storage import Storage
 from util.registry.queuefile import QueueFile
 from util.registry.queueprocess import QueueProcess
-from util.registry.torrent import make_torrent, per_user_torrent_filename, public_torrent_filename
+from util.registry.torrent import (make_torrent, per_user_torrent_filename, public_torrent_filename,
+                                   PieceHasher)
+from util.registry.filelike import wrap_with_handler
 from util.registry.gzipstream import SizeInfo
 from formats.squashed import SquashedDockerImage
 from formats.aci import ACIImage
 from endpoints.v2.blob import BLOB_DIGEST_ROUTE
@@ -25,7 +28,8 @@ verbs = Blueprint('verbs', __name__)
 logger = logging.getLogger(__name__)
 
-def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image):
+def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image,
+                 handlers):
   store = Storage(app)
 
   # For performance reasons, we load the full image list here, cache it, then disconnect from
@@ -55,10 +59,13 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag
   stream = formatter.build_stream(namespace, repository, tag, synthetic_image_id, image_json,
                                   get_next_image, get_next_layer, get_image_json)
 
+  for handler_fn in handlers:
+    stream = wrap_with_handler(stream, handler_fn)
+
   return stream.read
 
-def _sign_sythentic_image(verb, linked_storage_uuid, queue_file):
+def _sign_synthetic_image(verb, linked_storage_uuid, queue_file):
   signature = None
   try:
     signature = signer.detached_sign(queue_file)
@@ -200,12 +207,20 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
     # Close any existing DB connection once the process has exited.
     database.close_db_filter(None)
 
+  hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE'])
+
+  def _store_metadata_and_cleanup():
+    with database.UseThenDisconnect(app.config):
+      model.storage.save_torrent_info(derived, app.config['TORRENT_PIECE_SIZE'],
+                                      hasher.final_piece_hashes())
+
   # Create a queue process to generate the data. The queue files will read from the process
   # and send the results to the client and storage.
-  args = (formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image)
+  handlers = [hasher.update]
+  args = (formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image,
+          handlers)
   queue_process = QueueProcess(_open_stream,
                                8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max
-                               args, finished=_cleanup)
+                               args, finished=_store_metadata_and_cleanup)
 
   client_queue_file = QueueFile(queue_process.create_queue(), 'client')
   storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
@@ -224,7 +239,7 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
   if sign and signer.name:
     signing_args = (verb, derived.uuid, signing_queue_file)
-    QueueProcess.run_process(_sign_sythentic_image, signing_args, finished=_cleanup)
+    QueueProcess.run_process(_sign_synthetic_image, signing_args, finished=_cleanup)
 
   # Close the database handle here for this process before we send the long download.
   database.close_db_filter(None)
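The handlers list is how the piece hashes get computed for a synthetic image: _open_stream wraps the generated stream with wrap_with_handler, so every chunk the QueueProcess reads is also fed to hasher.update, and _store_metadata_and_cleanup persists the piece data once the stream has been fully consumed. A rough sketch of what such a wrapper can look like (the real util.registry.filelike.wrap_with_handler may differ in detail):

class HandlerReader(object):
  """File-like wrapper that passes every chunk read through a handler callback."""

  def __init__(self, fileobj, handler):
    self._fileobj = fileobj
    self._handler = handler

  def read(self, size=-1):
    chunk = self._fileobj.read(size)
    if chunk:
      self._handler(chunk)   # e.g. PieceHasher.update
    return chunk

def wrap_with_handler_sketch(fileobj, handler):
  return HandlerReader(fileobj, handler)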


@@ -4,7 +4,7 @@ APScheduler==3.0.3
 autobahn==0.9.3-3
 Babel==1.3
 beautifulsoup4==4.4.0
-bintrees==2.0.2
+bintrees==2.0.3
 blinker==1.3
 boto==2.38.0
 cachetools==1.0.3

Binary file not shown.


@@ -80,3 +80,6 @@ class PieceHasher(object):
   @property
   def hash_fragment(self):
     return self._hash_fragment
+
+  def final_piece_hashes(self):
+    return self._piece_hashes + self._hash_fragment.digest()
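Only the tail of PieceHasher is visible in this diff. Judging from the update, hash_fragment, and final_piece_hashes names used throughout the PR, a simplified self-contained version of the same idea could look like the sketch below; the real class lives in util/registry/torrent.py and may differ (for instance, the new blobupload.piece_sha_state column suggests it can also resume from a serialized hash state):

import hashlib

class PieceHasherSketch(object):
  """Incremental BitTorrent-style piece hasher: feed chunks of any size through
  update(), then read the concatenated SHA-1 piece digests at the end."""

  def __init__(self, piece_size):
    self._piece_size = piece_size
    self._piece_offset = 0           # bytes hashed so far in the current piece
    self._piece_hashes = b''         # digests of all completed pieces
    self._hash_fragment = hashlib.sha1()

  def update(self, buf):
    buf_offset = 0
    while buf_offset < len(buf):
      # Hash as much of the buffer as fits in the current piece.
      to_take = min(len(buf) - buf_offset, self._piece_size - self._piece_offset)
      self._hash_fragment.update(buf[buf_offset:buf_offset + to_take])
      buf_offset += to_take
      self._piece_offset += to_take
      if self._piece_offset == self._piece_size:
        # Piece complete: record its digest and start hashing the next one.
        self._piece_hashes += self._hash_fragment.digest()
        self._hash_fragment = hashlib.sha1()
        self._piece_offset = 0

  @property
  def hash_fragment(self):
    return self._hash_fragment

  def final_piece_hashes(self):
    # Same shape as the method added above: completed pieces plus the trailing fragment.
    return self._piece_hashes + self._hash_fragment.digest()

# Usage sketch: stream chunks through the hasher, then persist the piece data.
hasher = PieceHasherSketch(piece_size=512 * 1024)
hasher.update(b'first chunk of the synthetic image')
hasher.update(b'second chunk')
pieces_bytes = hasher.final_piece_hashes()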