From 4b926ae189aa8bc728766143fac49b30fdd5083b Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 3 Nov 2016 15:28:40 -0400 Subject: [PATCH] Add new metrics as requested by some customers Note that the `status` field on the pull and push metrics will eventually be set to False for failed pulls and pushes in a followup PR --- auth/process.py | 32 ++++++++++++++++++++++++-------- endpoints/v1/index.py | 4 ++-- endpoints/v1/registry.py | 5 ++++- endpoints/v2/blob.py | 9 ++++++++- endpoints/v2/manifest.py | 6 +++--- endpoints/verbs/__init__.py | 6 +++--- util/metrics/metricqueue.py | 16 +++++++++++++--- 7 files changed, 57 insertions(+), 21 deletions(-) diff --git a/auth/process.py b/auth/process.py index a9910825f..6afe48252 100644 --- a/auth/process.py +++ b/auth/process.py @@ -12,7 +12,7 @@ from flask_principal import identity_changed, Identity import scopes -from app import app, authentication +from app import app, authentication, metric_queue from auth_context import (set_authenticated_user, set_validated_token, set_grant_context, set_validated_oauth_token) from data import model @@ -52,14 +52,17 @@ def _validate_and_apply_oauth_token(token): validated = model.oauth.validate_access_token(token) if not validated: logger.warning('OAuth access token could not be validated: %s', token) + metric_queue.authentication_count.Inc(labelvalues=['oauth', False]) raise InvalidToken('OAuth access token could not be validated: {token}'.format(token=token)) elif validated.expires_at <= datetime.utcnow(): logger.info('OAuth access with an expired token: %s', token) + metric_queue.authentication_count.Inc(labelvalues=['oauth', False]) raise ExpiredToken('OAuth access token has expired: {token}'.format(token=token)) # Don't allow disabled users to login. 
if not validated.authorized_user.enabled: - return None + metric_queue.authentication_count.Inc(labelvalues=['oauth', False]) + return False # We have a valid token scope_set = scopes.scopes_from_scope_string(validated.scope) @@ -71,6 +74,8 @@ def _validate_and_apply_oauth_token(token): new_identity = QuayDeferredPermissionUser.for_user(validated.authorized_user, scope_set) identity_changed.send(app, identity=new_identity) + metric_queue.authentication_count.Inc(labelvalues=['oauth', True]) + return True def _parse_basic_auth_header(auth): @@ -105,14 +110,16 @@ def _process_basic_auth(auth): logger.debug('Successfully validated token: %s', credentials[1]) set_validated_token(token) identity_changed.send(app, identity=Identity(token.code, 'token')) - return + metric_queue.authentication_count.Inc(labelvalues=['token', True]) + return True except model.DataModelException: logger.debug('Invalid token: %s', credentials[1]) + metric_queue.authentication_count.Inc(labelvalues=['token', False]) elif credentials[0] == '$oauthtoken': oauth_token = credentials[1] - _validate_and_apply_oauth_token(oauth_token) + return _validate_and_apply_oauth_token(oauth_token) elif '+' in credentials[0]: logger.debug('Trying robot auth with credentials %s', str(credentials)) @@ -124,9 +131,11 @@ def _process_basic_auth(auth): deferred_robot = QuayDeferredPermissionUser.for_user(robot) identity_changed.send(app, identity=deferred_robot) - return + metric_queue.authentication_count.Inc(labelvalues=['robot', True]) + return True except model.InvalidRobotException: logger.debug('Invalid robot or password for robot: %s', credentials[0]) + metric_queue.authentication_count.Inc(labelvalues=['robot', False]) else: (authenticated, _) = authentication.verify_and_link_user(credentials[0], credentials[1], @@ -137,10 +146,14 @@ def _process_basic_auth(auth): new_identity = QuayDeferredPermissionUser.for_user(authenticated) identity_changed.send(app, identity=new_identity) - return + 
metric_queue.authentication_count.Inc(labelvalues=['user', True]) + return True + else: + metric_queue.authentication_count.Inc(labelvalues=['user', False]) # We weren't able to authenticate via basic auth. logger.debug('Basic auth present but could not be validated.') + return False def has_basic_auth(username): @@ -175,11 +188,11 @@ def _process_signed_grant(auth): normalized = [part.strip() for part in auth.split(' ') if part] if normalized[0].lower() != 'token' or len(normalized) != 2: logger.debug('Not a token: %s', auth) - return + return False if not normalized[1].startswith(SIGNATURE_PREFIX): logger.debug('Not a signed grant token: %s', auth) - return + return False encrypted = normalized[1][len(SIGNATURE_PREFIX):] ser = SecureCookieSessionInterface().get_signing_serializer(app) @@ -188,6 +201,7 @@ def _process_signed_grant(auth): token_data = ser.loads(encrypted, max_age=app.config['SIGNED_GRANT_EXPIRATION_SEC']) except BadSignature: logger.warning('Signed grant could not be validated: %s', encrypted) + metric_queue.authentication_count.Inc(labelvalues=['signed', False]) abort(401, message='Signed grant could not be validated: %(auth)s', issue='invalid-auth-token', auth=auth) @@ -203,6 +217,8 @@ def _process_signed_grant(auth): loaded_identity.provides.update(token_data['grants']) identity_changed.send(app, identity=loaded_identity) + metric_queue.authentication_count.Inc(labelvalues=['signed', True]) + return True def process_oauth(func): diff --git a/endpoints/v1/index.py b/endpoints/v1/index.py index 2a3eb6d05..c4f815f98 100644 --- a/endpoints/v1/index.py +++ b/endpoints/v1/index.py @@ -234,7 +234,7 @@ def update_images(namespace_name, repo_name): track_and_log('push_repo', repo) spawn_notification(repo, 'repo_push', event_data) - metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v1']) + metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v1', True]) return make_response('Updated', 204) abort(403) @@ -261,7 
+261,7 @@ def get_repository_images(namespace_name, repo_name): resp.mimetype = 'application/json' track_and_log('pull_repo', repo, analytics_name='pull_repo_100x', analytics_sample=0.01) - metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v1']) + metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v1', True]) return resp abort(403) diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py index ea4f1f2e4..543a8f4c2 100644 --- a/endpoints/v1/registry.py +++ b/endpoints/v1/registry.py @@ -7,7 +7,7 @@ from time import time from flask import make_response, request, session, Response, redirect, abort as flask_abort -from app import storage as store, app +from app import storage as store, app, metric_queue from auth.auth_context import get_authenticated_user from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission) @@ -197,7 +197,10 @@ def put_image_layer(namespace, repository, image_id): locations, path = model.placement_locations_and_path_docker_v1(namespace, repository, image_id) with database.CloseForLongOperation(app.config): try: + start_time = time() store.stream_write(locations, path, sr) + metric_queue.chunk_upload_time.Observe(time() - start_time, + labelvalues=[size_info.compressed_size, list(locations)[0]]) except IOError: logger.exception('Exception when writing image data') abort(520, 'Image %(image_id)s could not be written. 
Please try again.', image_id=image_id) diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index fa9886af5..ba0acf9ad 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -1,11 +1,12 @@ import logging import re +import time from flask import url_for, request, redirect, Response, abort as flask_abort import resumablehashlib -from app import storage, app, get_app_url +from app import storage, app, get_app_url, metric_queue from auth.registry_jwt_auth import process_registry_jwt_auth from data import database from data.interfaces.v2 import pre_oci_model as model @@ -398,6 +399,7 @@ def _upload_chunk(blob_upload, range_header): size_info, fn = calculate_size_handler() input_fp = wrap_with_handler(input_fp, fn) + start_time = time.time() length_written, new_metadata, upload_error = storage.stream_upload_chunk( location_set, blob_upload.uuid, @@ -412,6 +414,10 @@ def _upload_chunk(blob_upload, range_header): logger.error('storage.stream_upload_chunk returned error %s', upload_error) return None + # Update the chunk upload time metric. + metric_queue.chunk_upload_time.Observe(time.time() - start_time, + labelvalues=[length_written, list(location_set)[0]]) + # If we determined an uncompressed size and this is the first chunk, add it to the blob. # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks. 
if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid: @@ -428,6 +434,7 @@ def _upload_chunk(blob_upload, range_header): blob_upload.storage_metadata = new_metadata blob_upload.byte_count += length_written blob_upload.chunk_count += 1 + return blob_upload diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py index 4c56e8701..0d220fcab 100644 --- a/endpoints/v2/manifest.py +++ b/endpoints/v2/manifest.py @@ -51,7 +51,7 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref): repo = model.get_repository(namespace_name, repo_name) if repo is not None: track_and_log('pull_repo', repo, analytics_name='pull_repo_100x', analytics_sample=0.01) - metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2']) + metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) return Response( manifest.json, @@ -74,7 +74,7 @@ def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref): repo = model.get_repository(namespace_name, repo_name) if repo is not None: track_and_log('pull_repo', repo) - metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2']) + metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) return Response(manifest.json, status=200, headers={'Content-Type': manifest.media_type, 'Docker-Content-Digest': manifest.digest}) @@ -204,7 +204,7 @@ def _write_manifest_and_log(namespace_name, repo_name, manifest): track_and_log('push_repo', repo, tag=manifest.tag) spawn_notification(repo, 'repo_push', {'updated_tags': [manifest.tag]}) - metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v2']) + metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) return Response( 'OK', diff --git a/endpoints/verbs/__init__.py b/endpoints/verbs/__init__.py index d7c8e248b..e176d6e6f 100644 --- a/endpoints/verbs/__init__.py +++ b/endpoints/verbs/__init__.py @@ -212,12 +212,12 @@ 
def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker= # Check for torrent. If found, we return a torrent for the repo verb image (if the derived # image already exists). if request.accept_mimetypes.best == 'application/x-bittorrent': - metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb + '+torrent']) + metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb + '+torrent', True]) return _torrent_repo_verb(repo_image, tag, verb, **kwargs) # Log the action. track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs) - metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb]) + metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb, True]) # Lookup/create the derived image for the verb and repo image. derived_image = model.lookup_or_create_derived_image(repo_image, verb, @@ -360,5 +360,5 @@ def get_tag_torrent(namespace_name, repo_name, digest): if blob is None: abort(404) - metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'torrent']) + metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'torrent', True]) return _torrent_for_blob(blob, public_repo) diff --git a/util/metrics/metricqueue.py b/util/metrics/metricqueue.py index 8c9785e6b..57f1e4794 100644 --- a/util/metrics/metricqueue.py +++ b/util/metrics/metricqueue.py @@ -41,7 +41,7 @@ class MetricQueue(object): 'Time from triggering to starting a builder.', labelnames=['builder_type'], buckets=BUILDER_START_TIME_BUCKETS) - self.builder_time_to_build = prom.create_histogram('builder_ttb', + self.builder_time_to_build = prom.create_histogram('builder_ttb', 'Time from triggering to actually starting a build', labelnames=['builder_type'], buckets=BUILDER_START_TIME_BUCKETS) @@ -61,16 +61,26 @@ class MetricQueue(object): labelnames=['queue_name']) self.repository_pull = prom.create_counter('repository_pull', 'Repository Pull Count', - 
labelnames=['namespace', 'repo_name', 'protocol']) + labelnames=['namespace', 'repo_name', 'protocol', + 'status']) self.repository_push = prom.create_counter('repository_push', 'Repository Push Count', - labelnames=['namespace', 'repo_name', 'protocol']) + labelnames=['namespace', 'repo_name', 'protocol', + 'status']) self.repository_build_completed = prom.create_counter('repository_build_completed', 'Repository Build Complete Count', labelnames=['namespace', 'repo_name', 'status', 'executor']) + self.chunk_upload_time = prom.create_histogram('chunk_upload_time', + 'Registry blob chunk upload time', + labelnames=['chunk_size', 'storage_region']) + + self.authentication_count = prom.create_counter('authentication_count', + 'Authentication count', + labelnames=['kind', 'status']) + self.repository_count = prom.create_gauge('repository_count', 'Number of repositories') self.user_count = prom.create_gauge('user_count', 'Number of users') self.org_count = prom.create_gauge('org_count', 'Number of Organizations')