Add a reporter for verbs to track the number of storage streams needed
parent 110366f656
commit dd470bdc9d
7 changed files with 43 additions and 14 deletions
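Summary: each verb request now carries an optional reporter through the formatter stack (build_stream -> stream_generator -> StreamLayerMerger -> TarLayerFormat). The merger calls report_pass with 2 when dangling hard links forced the layer streams to be read from storage a second time, and 1 otherwise; VerbReporter feeds those counts into the new verb_action_passes Prometheus counter, labeled by verb kind and pass count.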
@@ -21,6 +21,7 @@ from util.http import exact_abort
 from util.registry.filelike import wrap_with_handler
 from util.registry.queuefile import QueueFile
 from util.registry.queueprocess import QueueProcess
+from util.registry.tarlayerformat import TarLayerFormatterReporter
 from util.registry.torrent import (
   make_torrent, per_user_torrent_filename, public_torrent_filename, PieceHasher)

@@ -32,7 +33,15 @@ license_validator.enforce_license_before_request(verbs)
 LAYER_MIMETYPE = 'binary/octet-stream'


-def _open_stream(formatter, repo_image, tag, derived_image_id, handlers):
+class VerbReporter(TarLayerFormatterReporter):
+  def __init__(self, kind):
+    self.kind = kind
+
+  def report_pass(self, pass_count):
+    metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count])
+
+
+def _open_stream(formatter, repo_image, tag, derived_image_id, handlers, reporter):
   """
   This method generates a stream of data which will be replicated and read from the queue files.
   This method runs in a separate process.
@@ -47,14 +56,14 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers):
       yield current_image

   def image_stream_getter(store, current_image):
-    def func():
+    def get_stream_for_storage():
       current_image_path = model.get_blob_path(current_image.blob)
       current_image_stream = store.stream_read_file(current_image.blob.locations,
                                                     current_image_path)

       logger.debug('Returning image layer %s: %s', current_image.image_id, current_image_path)
       return current_image_stream
-    return func
+    return get_stream_for_storage

   def tar_stream_getter_iterator():
     # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
@@ -63,7 +72,7 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers):
       yield image_stream_getter(store, current_image)

   stream = formatter.build_stream(repo_image, tag, derived_image_id, get_next_image,
-                                  tar_stream_getter_iterator)
+                                  tar_stream_getter_iterator, reporter=reporter)

   for handler_fn in handlers:
     stream = wrap_with_handler(stream, handler_fn)
@@ -286,7 +295,8 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
   # Create a queue process to generate the data. The queue files will read from the process
   # and send the results to the client and storage.
   handlers = [hasher.update]
-  args = (formatter, repo_image, tag, derived_image_id, handlers)
+  reporter = VerbReporter(verb)
+  args = (formatter, repo_image, tag, derived_image_id, handlers, reporter)
   queue_process = QueueProcess(
     _open_stream,
     8 * 1024,
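Taken together, the hunks above thread one reporter per verb request from _repo_verb, through the forked QueueProcess, into the layer merger. Below is a minimal, self-contained sketch of that flow; FakeMetric and FakeMerger are hypothetical stand-ins for metric_queue and StreamLayerMerger, not the real classes:

class FakeMetric(object):
  """ Hypothetical stand-in for metric_queue.verb_action_passes. """
  def Inc(self, labelvalues=None):
    print('verb_action_passes incremented with labels %s' % (labelvalues,))

FAKE_METRIC = FakeMetric()

class VerbReporter(object):
  """ Mirrors the reporter added above: one instance per verb request. """
  def __init__(self, kind):
    self.kind = kind

  def report_pass(self, pass_count):
    FAKE_METRIC.Inc(labelvalues=[self.kind, pass_count])

class FakeMerger(object):
  """ Hypothetical stand-in for StreamLayerMerger. """
  def __init__(self, reporter=None):
    self.reporter = reporter

  def get_generator(self):
    dangling_hard_links = []  # pretend no second pass over storage was needed
    yield b'...tar bytes...'
    self.reporter.report_pass(2 if len(dangling_hard_links) > 0 else 1)

reporter = VerbReporter('squash')
for chunk in FakeMerger(reporter=reporter).get_generator():
  pass  # in Quay each chunk is sent to both the client and storage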
@@ -19,7 +19,7 @@ class AppCImageFormatter(TarImageFormatter):
   """

   def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator,
-                       tar_stream_getter_iterator):
+                       tar_stream_getter_iterator, reporter=None):
     image_mtime = 0
     created = next(get_image_iterator()).v1_metadata.created
     if created is not None:
@@ -40,7 +40,8 @@ class AppCImageFormatter(TarImageFormatter):
     # Yield the merged layer dtaa.
     yield self.tar_folder('rootfs', mtime=image_mtime)

-    layer_merger = StreamLayerMerger(tar_stream_getter_iterator, path_prefix='rootfs/')
+    layer_merger = StreamLayerMerger(tar_stream_getter_iterator, path_prefix='rootfs/',
+                                     reporter=reporter)
     for entry in layer_merger.get_generator():
       yield entry
@@ -8,13 +8,13 @@ class TarImageFormatter(object):
   """

   def build_stream(self, repo_image, tag, synthetic_image_id, get_image_iterator,
-                   tar_stream_getter_iterator):
+                   tar_stream_getter_iterator, reporter=None):
     """
     Builds and streams a synthetic .tar.gz that represents the formatted tar created by this class's
     implementation.
     """
     return GzipWrap(self.stream_generator(repo_image, tag, synthetic_image_id, get_image_iterator,
-                                          tar_stream_getter_iterator))
+                                          tar_stream_getter_iterator, reporter=reporter))

   def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator,
                        tar_stream_getter_iterator):
@@ -29,7 +29,7 @@ class SquashedDockerImageFormatter(TarImageFormatter):
   SIZE_MULTIPLIER = 1.2

   def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator,
-                       tar_stream_getter_iterator):
+                       tar_stream_getter_iterator, reporter=None):
     image_mtime = 0
     created = next(get_image_iterator()).v1_metadata.created
     if created is not None:
@@ -42,7 +42,7 @@ class SquashedDockerImageFormatter(TarImageFormatter):
     #  json - The layer JSON
     #  layer.tar - The tarballed contents of the layer
     #  VERSION - The docker import version: '1.0'
-    layer_merger = StreamLayerMerger(tar_stream_getter_iterator)
+    layer_merger = StreamLayerMerger(tar_stream_getter_iterator, reporter=reporter)

     # Yield the repositories file:
     synthetic_layer_info = {}
@@ -114,6 +114,9 @@ class MetricQueue(object):
                                                             'Invalid registry instance key count',
                                                             labelnames=['key_id'])

+    self.verb_action_passes = prom.create_counter('verb_action_passes', 'Verb Pass Count',
+                                                  labelnames=['kind', 'pass_count'])
+
     # Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another
     # provider.
     self._queue = None
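prom.create_counter is Quay's internal wrapper, so its exact semantics are project-specific. As a hedged sketch, the equivalent counter with the standard prometheus_client library would look like this:

from prometheus_client import Counter

# Equivalent standalone counter; cardinality stays low because the pass
# count only ever takes the values 1 or 2.
verb_action_passes = Counter('verb_action_passes', 'Verb Pass Count',
                             ['kind', 'pass_count'])

verb_action_passes.labels(kind='squash', pass_count='1').inc()

Recording pass_count as a label value, rather than incrementing by it, keeps one-pass and two-pass requests as separate time series, which is what lets this metric answer how many storage streams a verb needed.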
@@ -8,8 +8,8 @@ from util.registry.tarlayerformat import TarLayerFormat

 class StreamLayerMerger(TarLayerFormat):
   """ Class which creates a generator of the combined TAR data for a set of Docker layers. """
-  def __init__(self, get_tar_stream_iterator, path_prefix=None):
-    super(StreamLayerMerger, self).__init__(get_tar_stream_iterator, path_prefix)
+  def __init__(self, get_tar_stream_iterator, path_prefix=None, reporter=None):
+    super(StreamLayerMerger, self).__init__(get_tar_stream_iterator, path_prefix, reporter=reporter)

     self.path_trie = marisa_trie.Trie()
     self.path_encountered = set()
@@ -6,6 +6,7 @@ from abc import ABCMeta, abstractmethod
 from collections import defaultdict
 from six import add_metaclass

+from util.abchelpers import nooper

 class TarLayerReadException(Exception):
   """ Exception raised when reading a layer has failed. """
@@ -15,13 +16,26 @@ class TarLayerReadException(Exception):
 # 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
 CHUNK_SIZE = 1024 * 1024 * 9


+@add_metaclass(ABCMeta)
+class TarLayerFormatterReporter(object):
+  @abstractmethod
+  def report_pass(self, stream_count):
+    """ Reports a formatting pass. """
+    pass
+
+
+@nooper
+class NoopReporter(TarLayerFormatterReporter):
+  pass
+
+
 @add_metaclass(ABCMeta)
 class TarLayerFormat(object):
   """ Class which creates a generator of the combined TAR data. """
-  def __init__(self, tar_stream_getter_iterator, path_prefix=None):
+  def __init__(self, tar_stream_getter_iterator, path_prefix=None, reporter=None):
     self.tar_stream_getter_iterator = tar_stream_getter_iterator
     self.path_prefix = path_prefix or ''
+    self.reporter = reporter or NoopReporter()

   def get_generator(self):
     for stream_getter in self.tar_stream_getter_iterator():
@@ -115,6 +129,7 @@ class TarLayerFormat(object):

       # Conduct any post-tar work.
       self.after_tar_layer()
+      self.reporter.report_pass(2 if len(dangling_hard_links) > 0 else 1)

     # Last two records are empty in TAR spec.
     yield '\0' * 512
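The @nooper decorator from util.abchelpers supplies the do-nothing implementation of the abstract methods, and six.add_metaclass keeps the real code Python 2 compatible. A minimal sketch of the same null-object pattern, with the no-op written out by hand and Python 3 metaclass syntax assumed:

from abc import ABCMeta, abstractmethod

class TarLayerFormatterReporter(metaclass=ABCMeta):
  @abstractmethod
  def report_pass(self, stream_count):
    """ Reports a formatting pass. """

class NoopReporter(TarLayerFormatterReporter):
  def report_pass(self, stream_count):
    pass  # deliberately does nothing, so callers never need a None check

reporter = None
reporter = reporter or NoopReporter()  # mirrors TarLayerFormat.__init__
reporter.report_pass(1)                # safe even with no real reporter

This is why every call site above can pass reporter=None by default: the base class silently substitutes the no-op reporter, and only the verbs endpoint wires in a real VerbReporter.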