Add new metrics as requested by some customers
Note that the `status` field on the pull and push metrics will eventually be set to False for failed pulls and pushes in a followup PR
This commit is contained in:
parent
7fc4aa7afd
commit
4b926ae189
7 changed files with 57 additions and 21 deletions
|
@ -12,7 +12,7 @@ from flask_principal import identity_changed, Identity
|
||||||
|
|
||||||
import scopes
|
import scopes
|
||||||
|
|
||||||
from app import app, authentication
|
from app import app, authentication, metric_queue
|
||||||
from auth_context import (set_authenticated_user, set_validated_token, set_grant_context,
|
from auth_context import (set_authenticated_user, set_validated_token, set_grant_context,
|
||||||
set_validated_oauth_token)
|
set_validated_oauth_token)
|
||||||
from data import model
|
from data import model
|
||||||
|
@ -52,14 +52,17 @@ def _validate_and_apply_oauth_token(token):
|
||||||
validated = model.oauth.validate_access_token(token)
|
validated = model.oauth.validate_access_token(token)
|
||||||
if not validated:
|
if not validated:
|
||||||
logger.warning('OAuth access token could not be validated: %s', token)
|
logger.warning('OAuth access token could not be validated: %s', token)
|
||||||
|
metric_queue.authentication_count.Inc(labelvalues=['oauth', False])
|
||||||
raise InvalidToken('OAuth access token could not be validated: {token}'.format(token=token))
|
raise InvalidToken('OAuth access token could not be validated: {token}'.format(token=token))
|
||||||
elif validated.expires_at <= datetime.utcnow():
|
elif validated.expires_at <= datetime.utcnow():
|
||||||
logger.info('OAuth access with an expired token: %s', token)
|
logger.info('OAuth access with an expired token: %s', token)
|
||||||
|
metric_queue.authentication_count.Inc(labelvalues=['oauth', False])
|
||||||
raise ExpiredToken('OAuth access token has expired: {token}'.format(token=token))
|
raise ExpiredToken('OAuth access token has expired: {token}'.format(token=token))
|
||||||
|
|
||||||
# Don't allow disabled users to login.
|
# Don't allow disabled users to login.
|
||||||
if not validated.authorized_user.enabled:
|
if not validated.authorized_user.enabled:
|
||||||
return None
|
metric_queue.authentication_count.Inc(labelvalues=['oauth', False])
|
||||||
|
return False
|
||||||
|
|
||||||
# We have a valid token
|
# We have a valid token
|
||||||
scope_set = scopes.scopes_from_scope_string(validated.scope)
|
scope_set = scopes.scopes_from_scope_string(validated.scope)
|
||||||
|
@ -71,6 +74,8 @@ def _validate_and_apply_oauth_token(token):
|
||||||
|
|
||||||
new_identity = QuayDeferredPermissionUser.for_user(validated.authorized_user, scope_set)
|
new_identity = QuayDeferredPermissionUser.for_user(validated.authorized_user, scope_set)
|
||||||
identity_changed.send(app, identity=new_identity)
|
identity_changed.send(app, identity=new_identity)
|
||||||
|
metric_queue.authentication_count.Inc(labelvalues=['oauth', True])
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def _parse_basic_auth_header(auth):
|
def _parse_basic_auth_header(auth):
|
||||||
|
@ -105,14 +110,16 @@ def _process_basic_auth(auth):
|
||||||
logger.debug('Successfully validated token: %s', credentials[1])
|
logger.debug('Successfully validated token: %s', credentials[1])
|
||||||
set_validated_token(token)
|
set_validated_token(token)
|
||||||
identity_changed.send(app, identity=Identity(token.code, 'token'))
|
identity_changed.send(app, identity=Identity(token.code, 'token'))
|
||||||
return
|
metric_queue.authentication_count.Inc(labelvalues=['token', True])
|
||||||
|
return True
|
||||||
|
|
||||||
except model.DataModelException:
|
except model.DataModelException:
|
||||||
logger.debug('Invalid token: %s', credentials[1])
|
logger.debug('Invalid token: %s', credentials[1])
|
||||||
|
metric_queue.authentication_count.Inc(labelvalues=['token', False])
|
||||||
|
|
||||||
elif credentials[0] == '$oauthtoken':
|
elif credentials[0] == '$oauthtoken':
|
||||||
oauth_token = credentials[1]
|
oauth_token = credentials[1]
|
||||||
_validate_and_apply_oauth_token(oauth_token)
|
return _validate_and_apply_oauth_token(oauth_token)
|
||||||
|
|
||||||
elif '+' in credentials[0]:
|
elif '+' in credentials[0]:
|
||||||
logger.debug('Trying robot auth with credentials %s', str(credentials))
|
logger.debug('Trying robot auth with credentials %s', str(credentials))
|
||||||
|
@ -124,9 +131,11 @@ def _process_basic_auth(auth):
|
||||||
|
|
||||||
deferred_robot = QuayDeferredPermissionUser.for_user(robot)
|
deferred_robot = QuayDeferredPermissionUser.for_user(robot)
|
||||||
identity_changed.send(app, identity=deferred_robot)
|
identity_changed.send(app, identity=deferred_robot)
|
||||||
return
|
metric_queue.authentication_count.Inc(labelvalues=['robot', True])
|
||||||
|
return True
|
||||||
except model.InvalidRobotException:
|
except model.InvalidRobotException:
|
||||||
logger.debug('Invalid robot or password for robot: %s', credentials[0])
|
logger.debug('Invalid robot or password for robot: %s', credentials[0])
|
||||||
|
metric_queue.authentication_count.Inc(labelvalues=['robot', False])
|
||||||
|
|
||||||
else:
|
else:
|
||||||
(authenticated, _) = authentication.verify_and_link_user(credentials[0], credentials[1],
|
(authenticated, _) = authentication.verify_and_link_user(credentials[0], credentials[1],
|
||||||
|
@ -137,10 +146,14 @@ def _process_basic_auth(auth):
|
||||||
|
|
||||||
new_identity = QuayDeferredPermissionUser.for_user(authenticated)
|
new_identity = QuayDeferredPermissionUser.for_user(authenticated)
|
||||||
identity_changed.send(app, identity=new_identity)
|
identity_changed.send(app, identity=new_identity)
|
||||||
return
|
metric_queue.authentication_count.Inc(labelvalues=['user', True])
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
metric_queue.authentication_count.Inc(labelvalues=['user', False])
|
||||||
|
|
||||||
# We weren't able to authenticate via basic auth.
|
# We weren't able to authenticate via basic auth.
|
||||||
logger.debug('Basic auth present but could not be validated.')
|
logger.debug('Basic auth present but could not be validated.')
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def has_basic_auth(username):
|
def has_basic_auth(username):
|
||||||
|
@ -175,11 +188,11 @@ def _process_signed_grant(auth):
|
||||||
normalized = [part.strip() for part in auth.split(' ') if part]
|
normalized = [part.strip() for part in auth.split(' ') if part]
|
||||||
if normalized[0].lower() != 'token' or len(normalized) != 2:
|
if normalized[0].lower() != 'token' or len(normalized) != 2:
|
||||||
logger.debug('Not a token: %s', auth)
|
logger.debug('Not a token: %s', auth)
|
||||||
return
|
return False
|
||||||
|
|
||||||
if not normalized[1].startswith(SIGNATURE_PREFIX):
|
if not normalized[1].startswith(SIGNATURE_PREFIX):
|
||||||
logger.debug('Not a signed grant token: %s', auth)
|
logger.debug('Not a signed grant token: %s', auth)
|
||||||
return
|
return False
|
||||||
|
|
||||||
encrypted = normalized[1][len(SIGNATURE_PREFIX):]
|
encrypted = normalized[1][len(SIGNATURE_PREFIX):]
|
||||||
ser = SecureCookieSessionInterface().get_signing_serializer(app)
|
ser = SecureCookieSessionInterface().get_signing_serializer(app)
|
||||||
|
@ -188,6 +201,7 @@ def _process_signed_grant(auth):
|
||||||
token_data = ser.loads(encrypted, max_age=app.config['SIGNED_GRANT_EXPIRATION_SEC'])
|
token_data = ser.loads(encrypted, max_age=app.config['SIGNED_GRANT_EXPIRATION_SEC'])
|
||||||
except BadSignature:
|
except BadSignature:
|
||||||
logger.warning('Signed grant could not be validated: %s', encrypted)
|
logger.warning('Signed grant could not be validated: %s', encrypted)
|
||||||
|
metric_queue.authentication_count.Inc(labelvalues=['signed', False])
|
||||||
abort(401, message='Signed grant could not be validated: %(auth)s', issue='invalid-auth-token',
|
abort(401, message='Signed grant could not be validated: %(auth)s', issue='invalid-auth-token',
|
||||||
auth=auth)
|
auth=auth)
|
||||||
|
|
||||||
|
@ -203,6 +217,8 @@ def _process_signed_grant(auth):
|
||||||
|
|
||||||
loaded_identity.provides.update(token_data['grants'])
|
loaded_identity.provides.update(token_data['grants'])
|
||||||
identity_changed.send(app, identity=loaded_identity)
|
identity_changed.send(app, identity=loaded_identity)
|
||||||
|
metric_queue.authentication_count.Inc(labelvalues=['signed', True])
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def process_oauth(func):
|
def process_oauth(func):
|
||||||
|
|
|
@ -234,7 +234,7 @@ def update_images(namespace_name, repo_name):
|
||||||
|
|
||||||
track_and_log('push_repo', repo)
|
track_and_log('push_repo', repo)
|
||||||
spawn_notification(repo, 'repo_push', event_data)
|
spawn_notification(repo, 'repo_push', event_data)
|
||||||
metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v1'])
|
metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v1', True])
|
||||||
return make_response('Updated', 204)
|
return make_response('Updated', 204)
|
||||||
|
|
||||||
abort(403)
|
abort(403)
|
||||||
|
@ -261,7 +261,7 @@ def get_repository_images(namespace_name, repo_name):
|
||||||
resp.mimetype = 'application/json'
|
resp.mimetype = 'application/json'
|
||||||
|
|
||||||
track_and_log('pull_repo', repo, analytics_name='pull_repo_100x', analytics_sample=0.01)
|
track_and_log('pull_repo', repo, analytics_name='pull_repo_100x', analytics_sample=0.01)
|
||||||
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v1'])
|
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v1', True])
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
|
@ -7,7 +7,7 @@ from time import time
|
||||||
|
|
||||||
from flask import make_response, request, session, Response, redirect, abort as flask_abort
|
from flask import make_response, request, session, Response, redirect, abort as flask_abort
|
||||||
|
|
||||||
from app import storage as store, app
|
from app import storage as store, app, metric_queue
|
||||||
from auth.auth_context import get_authenticated_user
|
from auth.auth_context import get_authenticated_user
|
||||||
from auth.permissions import (ReadRepositoryPermission,
|
from auth.permissions import (ReadRepositoryPermission,
|
||||||
ModifyRepositoryPermission)
|
ModifyRepositoryPermission)
|
||||||
|
@ -197,7 +197,10 @@ def put_image_layer(namespace, repository, image_id):
|
||||||
locations, path = model.placement_locations_and_path_docker_v1(namespace, repository, image_id)
|
locations, path = model.placement_locations_and_path_docker_v1(namespace, repository, image_id)
|
||||||
with database.CloseForLongOperation(app.config):
|
with database.CloseForLongOperation(app.config):
|
||||||
try:
|
try:
|
||||||
|
start_time = time()
|
||||||
store.stream_write(locations, path, sr)
|
store.stream_write(locations, path, sr)
|
||||||
|
metric_queue.chunk_upload_time.Observe(time() - start_time,
|
||||||
|
labelvalues=[size_info.compressed_size, list(locations)[0]])
|
||||||
except IOError:
|
except IOError:
|
||||||
logger.exception('Exception when writing image data')
|
logger.exception('Exception when writing image data')
|
||||||
abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id)
|
abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id)
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
from flask import url_for, request, redirect, Response, abort as flask_abort
|
from flask import url_for, request, redirect, Response, abort as flask_abort
|
||||||
|
|
||||||
import resumablehashlib
|
import resumablehashlib
|
||||||
|
|
||||||
from app import storage, app, get_app_url
|
from app import storage, app, get_app_url, metric_queue
|
||||||
from auth.registry_jwt_auth import process_registry_jwt_auth
|
from auth.registry_jwt_auth import process_registry_jwt_auth
|
||||||
from data import database
|
from data import database
|
||||||
from data.interfaces.v2 import pre_oci_model as model
|
from data.interfaces.v2 import pre_oci_model as model
|
||||||
|
@ -398,6 +399,7 @@ def _upload_chunk(blob_upload, range_header):
|
||||||
size_info, fn = calculate_size_handler()
|
size_info, fn = calculate_size_handler()
|
||||||
input_fp = wrap_with_handler(input_fp, fn)
|
input_fp = wrap_with_handler(input_fp, fn)
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
length_written, new_metadata, upload_error = storage.stream_upload_chunk(
|
length_written, new_metadata, upload_error = storage.stream_upload_chunk(
|
||||||
location_set,
|
location_set,
|
||||||
blob_upload.uuid,
|
blob_upload.uuid,
|
||||||
|
@ -412,6 +414,10 @@ def _upload_chunk(blob_upload, range_header):
|
||||||
logger.error('storage.stream_upload_chunk returned error %s', upload_error)
|
logger.error('storage.stream_upload_chunk returned error %s', upload_error)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Update the chunk upload time metric.
|
||||||
|
metric_queue.chunk_upload_time.Observe(time.time() - start_time,
|
||||||
|
labelvalues=[length_written, list(location_set)[0]])
|
||||||
|
|
||||||
# If we determined an uncompressed size and this is the first chunk, add it to the blob.
|
# If we determined an uncompressed size and this is the first chunk, add it to the blob.
|
||||||
# Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
|
# Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
|
||||||
if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid:
|
if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid:
|
||||||
|
@ -428,6 +434,7 @@ def _upload_chunk(blob_upload, range_header):
|
||||||
blob_upload.storage_metadata = new_metadata
|
blob_upload.storage_metadata = new_metadata
|
||||||
blob_upload.byte_count += length_written
|
blob_upload.byte_count += length_written
|
||||||
blob_upload.chunk_count += 1
|
blob_upload.chunk_count += 1
|
||||||
|
|
||||||
return blob_upload
|
return blob_upload
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
|
||||||
repo = model.get_repository(namespace_name, repo_name)
|
repo = model.get_repository(namespace_name, repo_name)
|
||||||
if repo is not None:
|
if repo is not None:
|
||||||
track_and_log('pull_repo', repo, analytics_name='pull_repo_100x', analytics_sample=0.01)
|
track_and_log('pull_repo', repo, analytics_name='pull_repo_100x', analytics_sample=0.01)
|
||||||
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2'])
|
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True])
|
||||||
|
|
||||||
return Response(
|
return Response(
|
||||||
manifest.json,
|
manifest.json,
|
||||||
|
@ -74,7 +74,7 @@ def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref):
|
||||||
repo = model.get_repository(namespace_name, repo_name)
|
repo = model.get_repository(namespace_name, repo_name)
|
||||||
if repo is not None:
|
if repo is not None:
|
||||||
track_and_log('pull_repo', repo)
|
track_and_log('pull_repo', repo)
|
||||||
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2'])
|
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True])
|
||||||
|
|
||||||
return Response(manifest.json, status=200, headers={'Content-Type': manifest.media_type,
|
return Response(manifest.json, status=200, headers={'Content-Type': manifest.media_type,
|
||||||
'Docker-Content-Digest': manifest.digest})
|
'Docker-Content-Digest': manifest.digest})
|
||||||
|
@ -204,7 +204,7 @@ def _write_manifest_and_log(namespace_name, repo_name, manifest):
|
||||||
|
|
||||||
track_and_log('push_repo', repo, tag=manifest.tag)
|
track_and_log('push_repo', repo, tag=manifest.tag)
|
||||||
spawn_notification(repo, 'repo_push', {'updated_tags': [manifest.tag]})
|
spawn_notification(repo, 'repo_push', {'updated_tags': [manifest.tag]})
|
||||||
metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v2'])
|
metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v2', True])
|
||||||
|
|
||||||
return Response(
|
return Response(
|
||||||
'OK',
|
'OK',
|
||||||
|
|
|
@ -212,12 +212,12 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
|
||||||
# Check for torrent. If found, we return a torrent for the repo verb image (if the derived
|
# Check for torrent. If found, we return a torrent for the repo verb image (if the derived
|
||||||
# image already exists).
|
# image already exists).
|
||||||
if request.accept_mimetypes.best == 'application/x-bittorrent':
|
if request.accept_mimetypes.best == 'application/x-bittorrent':
|
||||||
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb + '+torrent'])
|
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb + '+torrent', True])
|
||||||
return _torrent_repo_verb(repo_image, tag, verb, **kwargs)
|
return _torrent_repo_verb(repo_image, tag, verb, **kwargs)
|
||||||
|
|
||||||
# Log the action.
|
# Log the action.
|
||||||
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)
|
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)
|
||||||
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb])
|
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb, True])
|
||||||
|
|
||||||
# Lookup/create the derived image for the verb and repo image.
|
# Lookup/create the derived image for the verb and repo image.
|
||||||
derived_image = model.lookup_or_create_derived_image(repo_image, verb,
|
derived_image = model.lookup_or_create_derived_image(repo_image, verb,
|
||||||
|
@ -360,5 +360,5 @@ def get_tag_torrent(namespace_name, repo_name, digest):
|
||||||
if blob is None:
|
if blob is None:
|
||||||
abort(404)
|
abort(404)
|
||||||
|
|
||||||
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'torrent'])
|
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'torrent', True])
|
||||||
return _torrent_for_blob(blob, public_repo)
|
return _torrent_for_blob(blob, public_repo)
|
||||||
|
|
|
@ -41,7 +41,7 @@ class MetricQueue(object):
|
||||||
'Time from triggering to starting a builder.',
|
'Time from triggering to starting a builder.',
|
||||||
labelnames=['builder_type'],
|
labelnames=['builder_type'],
|
||||||
buckets=BUILDER_START_TIME_BUCKETS)
|
buckets=BUILDER_START_TIME_BUCKETS)
|
||||||
self.builder_time_to_build = prom.create_histogram('builder_ttb',
|
self.builder_time_to_build = prom.create_histogram('builder_ttb',
|
||||||
'Time from triggering to actually starting a build',
|
'Time from triggering to actually starting a build',
|
||||||
labelnames=['builder_type'],
|
labelnames=['builder_type'],
|
||||||
buckets=BUILDER_START_TIME_BUCKETS)
|
buckets=BUILDER_START_TIME_BUCKETS)
|
||||||
|
@ -61,16 +61,26 @@ class MetricQueue(object):
|
||||||
labelnames=['queue_name'])
|
labelnames=['queue_name'])
|
||||||
|
|
||||||
self.repository_pull = prom.create_counter('repository_pull', 'Repository Pull Count',
|
self.repository_pull = prom.create_counter('repository_pull', 'Repository Pull Count',
|
||||||
labelnames=['namespace', 'repo_name', 'protocol'])
|
labelnames=['namespace', 'repo_name', 'protocol',
|
||||||
|
'status'])
|
||||||
|
|
||||||
self.repository_push = prom.create_counter('repository_push', 'Repository Push Count',
|
self.repository_push = prom.create_counter('repository_push', 'Repository Push Count',
|
||||||
labelnames=['namespace', 'repo_name', 'protocol'])
|
labelnames=['namespace', 'repo_name', 'protocol',
|
||||||
|
'status'])
|
||||||
|
|
||||||
self.repository_build_completed = prom.create_counter('repository_build_completed',
|
self.repository_build_completed = prom.create_counter('repository_build_completed',
|
||||||
'Repository Build Complete Count',
|
'Repository Build Complete Count',
|
||||||
labelnames=['namespace', 'repo_name',
|
labelnames=['namespace', 'repo_name',
|
||||||
'status', 'executor'])
|
'status', 'executor'])
|
||||||
|
|
||||||
|
self.chunk_upload_time = prom.create_histogram('chunk_upload_time',
|
||||||
|
'Registry blob chunk upload time',
|
||||||
|
labelnames=['chunk_size', 'storage_region'])
|
||||||
|
|
||||||
|
self.authentication_count = prom.create_counter('authenication_count',
|
||||||
|
'Authentication count',
|
||||||
|
labelnames=['kind', 'status'])
|
||||||
|
|
||||||
self.repository_count = prom.create_gauge('repository_count', 'Number of repositories')
|
self.repository_count = prom.create_gauge('repository_count', 'Number of repositories')
|
||||||
self.user_count = prom.create_gauge('user_count', 'Number of users')
|
self.user_count = prom.create_gauge('user_count', 'Number of users')
|
||||||
self.org_count = prom.create_gauge('org_count', 'Number of Organizations')
|
self.org_count = prom.create_gauge('org_count', 'Number of Organizations')
|
||||||
|
|
Reference in a new issue