Hash and track layer file chunks for torrenting
parent 40c741f34e
commit fe87d3c796
8 changed files with 115 additions and 10 deletions
@@ -273,3 +273,6 @@ class DefaultConfig(object):
     'API_VERSION': 'v1',
     'API_TIMEOUT_SECONDS': 10,
   }
+
+  # Torrent management flags
+  TORRENT_PIECE_SIZE = 512 * 1024
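For scale: 512 * 1024 bytes is the conventional 512 KiB BitTorrent piece size, and the number of SHA-1 piece hashes a blob produces is its byte count divided by the piece size, rounded up. A minimal sketch of that arithmetic (standalone illustration, not part of the commit):

  # Each piece gets one SHA-1 hash; a trailing partial piece still counts.
  TORRENT_PIECE_SIZE = 512 * 1024

  def piece_count(byte_count):
    return (byte_count + TORRENT_PIECE_SIZE - 1) // TORRENT_PIECE_SIZE

  assert piece_count(512 * 1024) == 1      # exactly one piece
  assert piece_count(512 * 1024 + 1) == 2  # one full piece plus a tail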
@@ -9,7 +9,7 @@ from random import SystemRandom
 from datetime import datetime
 from peewee import *
 from data.read_slave import ReadSlaveModel
-from data.fields import ResumableSHAField, JSONField
+from data.fields import ResumableSHA256Field, ResumableSHA1Field, JSONField
 from sqlalchemy.engine.url import make_url
 from collections import defaultdict

@@ -803,12 +803,14 @@ class BlobUpload(BaseModel):
   repository = ForeignKeyField(Repository, index=True)
   uuid = CharField(index=True, unique=True)
   byte_count = IntegerField(default=0)
-  sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
+  sha_state = ResumableSHA256Field(null=True, default=resumablehashlib.sha256)
   location = ForeignKeyField(ImageStorageLocation)
   storage_metadata = JSONField(null=True, default={})
   chunk_count = IntegerField(default=0)
   uncompressed_byte_count = IntegerField(null=True)
   created = DateTimeField(default=datetime.now, index=True)
+  piece_sha_state = ResumableSHA1Field(null=True, default=resumablehashlib.sha1)
+  piece_hashes = TextField(default='')

   class Meta:
     database = db
@@ -846,6 +848,16 @@ class QuayRelease(BaseModel):
   )


+class TorrentInfo(BaseModel):
+  storage = ForeignKeyField(ImageStorage)
+  piece_length = IntegerField()
+  pieces = TextField()
+
+  indexes = (
+    (('storage', 'piece_length'), True),
+  )
+
+
 all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility,
               RepositoryTag, EmailConfirmation, FederatedLogin, LoginService, QueueItem,
               RepositoryBuild, Team, TeamMember, TeamRole, LogEntryKind, LogEntry,
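The trailing True in the indexes tuple marks the (storage, piece_length) index as unique in peewee, so each stored blob carries at most one pieces string per piece size. A hedged sketch of the lookup that uniqueness enables (lookup_torrent_info is a hypothetical helper, not part of this commit):

  def lookup_torrent_info(storage_object, piece_length):
    # The unique index above guarantees at most one matching row.
    return TorrentInfo.get(TorrentInfo.storage == storage_object,
                           TorrentInfo.piece_length == piece_length)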
@@ -856,4 +868,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
               RepositoryAuthorizedEmail, ImageStorageTransformation,
               TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind,
               AccessTokenKind, Star, RepositoryActionCount, TagManifest, UserRegion,
-              QuayService, QuayRegion, QuayRelease, BlobUpload, DerivedStorageForImage]
+              QuayService, QuayRegion, QuayRelease, BlobUpload, DerivedStorageForImage, TorrentInfo]

@@ -5,7 +5,7 @@ import json
 from peewee import TextField


-class ResumableSHAField(TextField):
+class ResumableSHA256Field(TextField):
   def db_value(self, value):
     sha_state = value.state()

@@ -28,6 +28,29 @@ class ResumableSHAField(TextField):
     return to_resume


+class ResumableSHA1Field(TextField):
+  def db_value(self, value):
+    sha_state = value.state()
+
+    # One of the fields is a byte string; base64 encode it so it can be
+    # stored and fetched regardless of the database's default collation.
+    sha_state[3] = base64.b64encode(sha_state[3])
+
+    return json.dumps(sha_state)
+
+  def python_value(self, value):
+    to_resume = resumablehashlib.sha1()
+    if value is None:
+      return to_resume
+
+    sha_state = json.loads(value)
+
+    # We need to base64 decode the data bytestring.
+    sha_state[3] = base64.b64decode(sha_state[3])
+    to_resume.set_state(sha_state)
+    return to_resume
+
+
 class JSONField(TextField):
   def db_value(self, value):
     return json.dumps(value)
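The two methods form a serialize/resume pair: db_value freezes the hasher's internal state into JSON text mid-stream, and python_value rehydrates it so hashing continues where it stopped. A hedged round-trip sketch (assuming resumablehashlib's state()/set_state() API exactly as the field code above uses it):

  import resumablehashlib
  from data.fields import ResumableSHA1Field

  field = ResumableSHA1Field(null=True)

  hasher = resumablehashlib.sha1()
  hasher.update('first chunk of a blob')

  frozen = field.db_value(hasher)       # stored in the upload tracking row
  resumed = field.python_value(frozen)  # loaded by a later chunk request

  resumed.update(' and the rest')
  print(resumed.hexdigest())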
@@ -51,6 +51,8 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
   image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo)
   tag.create_temporary_hidden_tag(repo, image, link_expiration_s)

+  return storage
+

 def get_blob_upload(namespace, repo_name, upload_uuid):
   """ Load the upload which is already in progress.
@@ -5,7 +5,7 @@ from peewee import JOIN_LEFT_OUTER, fn, SQL
 from data.model import config, db_transaction, InvalidImageException
 from data.database import (ImageStorage, Image, DerivedStorageForImage, ImageStoragePlacement,
                            ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature,
-                           ImageStorageSignatureKind, Repository, Namespace)
+                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo)


 logger = logging.getLogger(__name__)
@@ -212,3 +212,7 @@ def get_storage_locations(uuid):
           .where(ImageStorage.uuid == uuid))

   return [location.location.name for location in query]
+
+
+def save_torrent_info(storage_object, piece_length, pieces):
+  TorrentInfo.create(storage=storage_object, piece_length=piece_length, pieces=pieces)
@@ -14,6 +14,7 @@ from endpoints.decorators import anon_protect
 from util.cache import cache_control
 from util.registry.filelike import wrap_with_handler, StreamSlice
 from util.registry.gzipstream import calculate_size_handler
+from util.registry.torrent import PieceHasher
 from storage.basestorage import InvalidChunkException

@@ -221,6 +222,13 @@ def _upload_chunk(namespace, repo_name, upload_uuid):

   input_fp = wrap_with_handler(input_fp, found.sha_state.update)

+  piece_hasher = None
+  # TODO remove this when all in-progress blob uploads reliably contain piece hashes
+  if start_offset == 0 and found.piece_sha_state is not None:
+    piece_hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE'], start_offset, found.piece_hashes,
+                               found.piece_sha_state)
+    input_fp = wrap_with_handler(input_fp, piece_hasher.update)
+
   # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the
   # stream so we can determine the uncompressed size. We'll throw out this data if another chunk
   # comes in, but in the common case Docker only sends one chunk.
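wrap_with_handler layers observers onto the request stream: the same bytes feed both the blob's SHA-256 state and, when piece hashing is active, the SHA-1 piece hasher, without buffering the payload twice. A minimal sketch of the pattern (illustrative only; not Quay's actual util.registry.filelike implementation):

  class HandlerWrapper(object):
    """ File-like proxy that reports every chunk read to a handler. """
    def __init__(self, fileobj, handler):
      self._fileobj = fileobj
      self._handler = handler

    def read(self, size=-1):
      buf = self._fileobj.read(size)
      if buf:
        self._handler(buf)
      return buf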
@@ -231,7 +239,8 @@ def _upload_chunk(namespace, repo_name, upload_uuid):

   try:
     length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid,
-                                                                      start_offset, length, input_fp,
+                                                                      start_offset, length,
+                                                                      input_fp,
                                                                       found.storage_metadata,
                                                                       content_type=BLOB_CONTENT_TYPE)
   except InvalidChunkException:
@@ -246,6 +255,10 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
     # know the uncompressed size.
     found.uncompressed_byte_count = None

+  if piece_hasher is not None:
+    found.piece_hashes = piece_hasher.piece_hashes
+    found.piece_sha_state = piece_hasher.hash_fragment
+
   found.storage_metadata = new_metadata
   found.byte_count += length_written
   found.chunk_count += 1
@@ -274,10 +287,15 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
                        final_blob_location, upload_obj.storage_metadata)

   # Mark the blob as uploaded.
-  model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
-                                             upload_obj.location, upload_obj.byte_count,
-                                             app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
-                                             upload_obj.uncompressed_byte_count)
+  blob_storage = model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
+                                                            upload_obj.location,
+                                                            upload_obj.byte_count,
+                                                            app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
+                                                            upload_obj.uncompressed_byte_count)
+
+  if upload_obj.piece_sha_state is not None:
+    piece_string = upload_obj.piece_hashes + upload_obj.piece_sha_state.hexdigest()
+    model.storage.save_torrent_info(blob_storage, app.config['TORRENT_PIECE_SIZE'], piece_string)

   # Delete the upload tracking row.
   upload_obj.delete_instance()
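The final pieces string is the concatenation of every completed piece's hex digest plus the digest of the still-open last piece, sealed here with hexdigest(). A standalone sketch of the equivalent end result, computed offline over a finished blob (sizes are arbitrary examples):

  import hashlib

  PIECE = 512 * 1024
  blob = 'x' * (2 * PIECE + 100)  # two full pieces plus a 100-byte tail

  piece_string = ''.join(hashlib.sha1(blob[i:i + PIECE]).hexdigest()
                         for i in range(0, len(blob), PIECE))

  assert len(piece_string) == 3 * 40  # three pieces, 40 hex chars per SHA-1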
Binary file not shown.
util/registry/torrent.py (new file, 43 lines)
@@ -0,0 +1,43 @@
+import resumablehashlib
+
+
+class PieceHasher(object):
+  def __init__(self, piece_size, starting_offset, starting_piece_hash_str, hash_fragment_to_resume):
+    if not isinstance(starting_offset, (int, long)):
+      raise TypeError('starting_offset must be an integer')
+    elif not isinstance(piece_size, (int, long)):
+      raise TypeError('piece_size must be an integer')
+
+    self._current_offset = starting_offset
+    self._piece_size = piece_size
+    self._hash_fragment = hash_fragment_to_resume
+    self._piece_hashes = [starting_piece_hash_str]
+
+  def update(self, buf):
+    buf_offset = 0
+    while buf_offset < len(buf):
+      buf_bytes_to_hash = buf[buf_offset:buf_offset + self._piece_length_remaining()]
+      to_hash_len = len(buf_bytes_to_hash)
+
+      if self._piece_offset() == 0 and to_hash_len > 0 and self._current_offset > 0:
+        # We are at a piece boundary mid-stream; seal the piece and open a new one
+        self._piece_hashes.append(self._hash_fragment.hexdigest())
+        self._hash_fragment = resumablehashlib.sha1()
+
+      self._hash_fragment.update(buf_bytes_to_hash)
+      self._current_offset += to_hash_len
+      buf_offset += to_hash_len
+
+  def _piece_length_remaining(self):
+    return self._piece_size - (self._current_offset % self._piece_size)
+
+  def _piece_offset(self):
+    return self._current_offset % self._piece_size
+
+  @property
+  def piece_hashes(self):
+    return ''.join(self._piece_hashes)
+
+  @property
+  def hash_fragment(self):
+    return self._hash_fragment
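A hedged usage sketch: feed PieceHasher arbitrarily sized chunks and it accumulates one 40-char hex digest per completed 512 KiB piece, while the still-open final piece lives in hash_fragment until _finish_upload seals it (chunk sizes below are arbitrary examples):

  import resumablehashlib
  from util.registry.torrent import PieceHasher

  hasher = PieceHasher(512 * 1024, 0, '', resumablehashlib.sha1())

  # Chunk boundaries need not align with piece boundaries.
  hasher.update('a' * 300000)
  hasher.update('b' * 400000)  # crosses the first piece boundary

  print(len(hasher.piece_hashes))          # 40: one completed piece so far
  print(hasher.hash_fragment.hexdigest())  # digest of the open second piece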