diff --git a/endpoints/verbs/__init__.py b/endpoints/verbs/__init__.py index 7be39fded..7a13c54d3 100644 --- a/endpoints/verbs/__init__.py +++ b/endpoints/verbs/__init__.py @@ -21,6 +21,7 @@ from util.http import exact_abort from util.registry.filelike import wrap_with_handler from util.registry.queuefile import QueueFile from util.registry.queueprocess import QueueProcess +from util.registry.tarlayerformat import TarLayerFormatterReporter from util.registry.torrent import ( make_torrent, per_user_torrent_filename, public_torrent_filename, PieceHasher) @@ -32,7 +33,15 @@ license_validator.enforce_license_before_request(verbs) LAYER_MIMETYPE = 'binary/octet-stream' -def _open_stream(formatter, repo_image, tag, derived_image_id, handlers): +class VerbReporter(TarLayerFormatterReporter): + def __init__(self, kind): + self.kind = kind + + def report_pass(self, pass_count): + metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count]) + + +def _open_stream(formatter, repo_image, tag, derived_image_id, handlers, reporter): """ This method generates a stream of data which will be replicated and read from the queue files. This method runs in a separate process. @@ -47,14 +56,14 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers): yield current_image def image_stream_getter(store, current_image): - def func(): + def get_stream_for_storage(): current_image_path = model.get_blob_path(current_image.blob) current_image_stream = store.stream_read_file(current_image.blob.locations, current_image_path) logger.debug('Returning image layer %s: %s', current_image.image_id, current_image_path) return current_image_stream - return func + return get_stream_for_storage def tar_stream_getter_iterator(): # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) @@ -63,7 +72,7 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers): yield image_stream_getter(store, current_image) stream = formatter.build_stream(repo_image, tag, derived_image_id, get_next_image, - tar_stream_getter_iterator) + tar_stream_getter_iterator, reporter=reporter) for handler_fn in handlers: stream = wrap_with_handler(stream, handler_fn) @@ -286,7 +295,8 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker= # Create a queue process to generate the data. The queue files will read from the process # and send the results to the client and storage. handlers = [hasher.update] - args = (formatter, repo_image, tag, derived_image_id, handlers) + reporter = VerbReporter(verb) + args = (formatter, repo_image, tag, derived_image_id, handlers, reporter) queue_process = QueueProcess( _open_stream, 8 * 1024, diff --git a/image/appc/__init__.py b/image/appc/__init__.py index b9ac26247..91730f923 100644 --- a/image/appc/__init__.py +++ b/image/appc/__init__.py @@ -19,7 +19,7 @@ class AppCImageFormatter(TarImageFormatter): """ def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator, - tar_stream_getter_iterator): + tar_stream_getter_iterator, reporter=None): image_mtime = 0 created = next(get_image_iterator()).v1_metadata.created if created is not None: @@ -40,7 +40,8 @@ class AppCImageFormatter(TarImageFormatter): # Yield the merged layer dtaa. yield self.tar_folder('rootfs', mtime=image_mtime) - layer_merger = StreamLayerMerger(tar_stream_getter_iterator, path_prefix='rootfs/') + layer_merger = StreamLayerMerger(tar_stream_getter_iterator, path_prefix='rootfs/', + reporter=reporter) for entry in layer_merger.get_generator(): yield entry diff --git a/image/common.py b/image/common.py index b075aa0ff..356288316 100644 --- a/image/common.py +++ b/image/common.py @@ -8,13 +8,13 @@ class TarImageFormatter(object): """ def build_stream(self, repo_image, tag, synthetic_image_id, get_image_iterator, - tar_stream_getter_iterator): + tar_stream_getter_iterator, reporter=None): """ Builds and streams a synthetic .tar.gz that represents the formatted tar created by this class's implementation. """ return GzipWrap(self.stream_generator(repo_image, tag, synthetic_image_id, get_image_iterator, - tar_stream_getter_iterator)) + tar_stream_getter_iterator, reporter=reporter)) def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator, tar_stream_getter_iterator): diff --git a/image/docker/squashed.py b/image/docker/squashed.py index df41d6e6b..5a52c7109 100644 --- a/image/docker/squashed.py +++ b/image/docker/squashed.py @@ -29,7 +29,7 @@ class SquashedDockerImageFormatter(TarImageFormatter): SIZE_MULTIPLIER = 1.2 def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator, - tar_stream_getter_iterator): + tar_stream_getter_iterator, reporter=None): image_mtime = 0 created = next(get_image_iterator()).v1_metadata.created if created is not None: @@ -42,7 +42,7 @@ class SquashedDockerImageFormatter(TarImageFormatter): # json - The layer JSON # layer.tar - The tarballed contents of the layer # VERSION - The docker import version: '1.0' - layer_merger = StreamLayerMerger(tar_stream_getter_iterator) + layer_merger = StreamLayerMerger(tar_stream_getter_iterator, reporter=reporter) # Yield the repositories file: synthetic_layer_info = {} diff --git a/util/metrics/metricqueue.py b/util/metrics/metricqueue.py index 6348ec6d2..2ebf95e1c 100644 --- a/util/metrics/metricqueue.py +++ b/util/metrics/metricqueue.py @@ -114,6 +114,9 @@ class MetricQueue(object): 'Invalid registry instance key count', labelnames=['key_id']) + self.verb_action_passes = prom.create_counter('verb_action_passes', 'Verb Pass Count', + labelnames=['kind', 'pass_count']) + # Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another # provider. self._queue = None diff --git a/util/registry/streamlayerformat.py b/util/registry/streamlayerformat.py index d21c61401..f6330ef4c 100644 --- a/util/registry/streamlayerformat.py +++ b/util/registry/streamlayerformat.py @@ -8,8 +8,8 @@ from util.registry.tarlayerformat import TarLayerFormat class StreamLayerMerger(TarLayerFormat): """ Class which creates a generator of the combined TAR data for a set of Docker layers. """ - def __init__(self, get_tar_stream_iterator, path_prefix=None): - super(StreamLayerMerger, self).__init__(get_tar_stream_iterator, path_prefix) + def __init__(self, get_tar_stream_iterator, path_prefix=None, reporter=None): + super(StreamLayerMerger, self).__init__(get_tar_stream_iterator, path_prefix, reporter=reporter) self.path_trie = marisa_trie.Trie() self.path_encountered = set() diff --git a/util/registry/tarlayerformat.py b/util/registry/tarlayerformat.py index e6c02e551..5c114519f 100644 --- a/util/registry/tarlayerformat.py +++ b/util/registry/tarlayerformat.py @@ -6,6 +6,7 @@ from abc import ABCMeta, abstractmethod from collections import defaultdict from six import add_metaclass +from util.abchelpers import nooper class TarLayerReadException(Exception): """ Exception raised when reading a layer has failed. """ @@ -15,13 +16,26 @@ class TarLayerReadException(Exception): # 9MB (+ padding below) so that it matches the 10MB expected by Gzip. CHUNK_SIZE = 1024 * 1024 * 9 +@add_metaclass(ABCMeta) +class TarLayerFormatterReporter(object): + @abstractmethod + def report_pass(self, stream_count): + """ Reports a formatting pass. """ + pass + + +@nooper +class NoopReporter(TarLayerFormatterReporter): + pass + @add_metaclass(ABCMeta) class TarLayerFormat(object): """ Class which creates a generator of the combined TAR data. """ - def __init__(self, tar_stream_getter_iterator, path_prefix=None): + def __init__(self, tar_stream_getter_iterator, path_prefix=None, reporter=None): self.tar_stream_getter_iterator = tar_stream_getter_iterator self.path_prefix = path_prefix or '' + self.reporter = reporter or NoopReporter() def get_generator(self): for stream_getter in self.tar_stream_getter_iterator(): @@ -115,6 +129,7 @@ class TarLayerFormat(object): # Conduct any post-tar work. self.after_tar_layer() + self.reporter.report_pass(2 if len(dangling_hard_links) > 0 else 1) # Last two records are empty in TAR spec. yield '\0' * 512