diff --git a/app.py b/app.py index 33245bee1..4e43d1cd7 100644 --- a/app.py +++ b/app.py @@ -19,7 +19,6 @@ import features from _init import (config_provider, CONF_DIR, IS_KUBERNETES, IS_TESTING, OVERRIDE_CONFIG_DIRECTORY, IS_BUILDING) -from auth.auth_context import get_authenticated_user from avatars.avatars import Avatar from buildman.manager.buildcanceller import BuildCanceller from data import database @@ -30,7 +29,7 @@ from data.billing import Billing from data.buildlogs import BuildLogs from data.cache import get_model_cache from data.model.user import LoginWrappedDBUser -from data.queue import WorkQueue, BuildMetricQueueReporter +from data.queue import WorkQueue from data.userevent import UserEventsBuilderModule from data.userfiles import Userfiles from data.users import UserAuthentication @@ -52,14 +51,13 @@ from util.names import urn_generator from util.config.configutil import generate_secret_key from util.config.superusermanager import SuperUserManager from util.label_validator import LabelValidator -from util.metrics.metricqueue import MetricQueue from util.metrics.prometheus import PrometheusPlugin -from util.saas.cloudwatch import start_cloudwatch_sender from util.secscan.api import SecurityScannerAPI from util.repomirror.api import RepoMirrorAPI from util.tufmetadata.api import TUFMetadataAPI from util.security.instancekeys import InstanceKeys from util.security.signing import Signer +from util.greenlet_tracing import enable_tracing OVERRIDE_CONFIG_YAML_FILENAME = os.path.join(CONF_DIR, 'stack/config.yaml') @@ -205,6 +203,9 @@ def _request_end(resp): return resp +if app.config.get('GREENLET_TRACING', True): + enable_tracing() + root_logger = logging.getLogger() @@ -224,11 +225,10 @@ avatar = Avatar(app) login_manager = LoginManager(app) mail = Mail(app) prometheus = PrometheusPlugin(app) -metric_queue = MetricQueue(prometheus) -chunk_cleanup_queue = WorkQueue(app.config['CHUNK_CLEANUP_QUEUE_NAME'], tf, metric_queue=metric_queue) +chunk_cleanup_queue = WorkQueue(app.config['CHUNK_CLEANUP_QUEUE_NAME'], tf) instance_keys = InstanceKeys(app) ip_resolver = IPResolver(app) -storage = Storage(app, metric_queue, chunk_cleanup_queue, instance_keys, config_provider, ip_resolver) +storage = Storage(app, chunk_cleanup_queue, instance_keys, config_provider, ip_resolver) userfiles = Userfiles(app, storage) log_archive = LogArchive(app, storage) analytics = Analytics(app) @@ -244,8 +244,6 @@ instance_keys = InstanceKeys(app) label_validator = LabelValidator(app) build_canceller = BuildCanceller(app) -start_cloudwatch_sender(metric_queue, app) - github_trigger = GithubOAuthService(app.config, 'GITHUB_TRIGGER_CONFIG') gitlab_trigger = GitLabOAuthService(app.config, 'GITLAB_TRIGGER_CONFIG') @@ -253,29 +251,24 @@ oauth_login = OAuthLoginManager(app.config) oauth_apps = [github_trigger, gitlab_trigger] image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf, - has_namespace=False, metric_queue=metric_queue) + has_namespace=False) dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf, - metric_queue=metric_queue, - reporter=BuildMetricQueueReporter(metric_queue), has_namespace=True) -notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf, has_namespace=True, - metric_queue=metric_queue) +notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf, has_namespace=True) secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf, - has_namespace=False, - metric_queue=metric_queue) + 
has_namespace=False)
 export_action_logs_queue = WorkQueue(app.config['EXPORT_ACTION_LOGS_QUEUE_NAME'], tf,
-                                     has_namespace=True,
-                                     metric_queue=metric_queue)
+                                     has_namespace=True)
 
 # Note: We set `has_namespace` to `False` here, as we explicitly want this queue to not be emptied
 # when a namespace is marked for deletion.
-namespace_gc_queue = WorkQueue(app.config['NAMESPACE_GC_QUEUE_NAME'], tf, has_namespace=False,
-                               metric_queue=metric_queue)
+namespace_gc_queue = WorkQueue(app.config['NAMESPACE_GC_QUEUE_NAME'], tf, has_namespace=False)
 
 all_queues = [image_replication_queue, dockerfile_build_queue, notification_queue,
               secscan_notification_queue, chunk_cleanup_queue, namespace_gc_queue]
 
-url_scheme_and_hostname = URLSchemeAndHostname(app.config['PREFERRED_URL_SCHEME'], app.config['SERVER_HOSTNAME'])
+url_scheme_and_hostname = URLSchemeAndHostname(app.config['PREFERRED_URL_SCHEME'],
+                                               app.config['SERVER_HOSTNAME'])
 
 secscan_api = SecurityScannerAPI(app.config, storage, app.config['SERVER_HOSTNAME'],
                                  app.config['HTTPCLIENT'],
                                  uri_creator=get_blob_download_uri_getter(app.test_request_context('/'), url_scheme_and_hostname),
                                  instance_keys=instance_keys)
@@ -296,6 +289,7 @@ else:
 if app.config.get('DATABASE_SECRET_KEY') is None and app.config.get('SETUP_COMPLETE', False):
   raise Exception('Missing DATABASE_SECRET_KEY in config; did you perhaps forget to add it?')
 
+
 database.configure(app.config)
 model.config.app_config = app.config
diff --git a/auth/decorators.py b/auth/decorators.py
index 5fc966140..be6764065 100644
--- a/auth/decorators.py
+++ b/auth/decorators.py
@@ -2,8 +2,8 @@ import logging
 from functools import wraps
 
 from flask import request, session
+from prometheus_client import Counter
 
-from app import metric_queue
 from auth.basic import validate_basic_auth
 from auth.oauth import validate_bearer_auth
 from auth.cookie import validate_session_cookie
@@ -14,6 +14,12 @@ from util.http import abort
 
 logger = logging.getLogger(__name__)
 
+
+authentication_count = Counter('quay_authentication_attempts_total',
+                               'number of authentication attempts across the registry and API',
+                               labelnames=['auth_kind', 'success'])
+
+
 def _auth_decorator(pass_result=False, handlers=None):
   """ Builds an auth decorator that runs the given handlers and, if any return successfully, sets
       up the auth context. The wrapped function will be invoked *regardless of success or
@@ -39,13 +45,13 @@ def _auth_decorator(pass_result=False, handlers=None):
         result.apply_to_context()
 
         # Log the metric.
-        metric_queue.authentication_count.Inc(labelvalues=[result.kind, True])
+        authentication_count.labels(result.kind, True).inc()
         break
 
     # Otherwise, report the error.
     if result.error_message is not None:
       # Log the failure.
- metric_queue.authentication_count.Inc(labelvalues=[result.kind, False]) + authentication_count.labels(result.kind, False).inc() break if pass_result: @@ -72,10 +78,10 @@ def require_session_login(func): result = validate_session_cookie() if result.has_nonrobot_user: result.apply_to_context() - metric_queue.authentication_count.Inc(labelvalues=[result.kind, True]) + authentication_count.labels(result.kind, True).inc() return func(*args, **kwargs) elif not result.missing: - metric_queue.authentication_count.Inc(labelvalues=[result.kind, False]) + authentication_count.labels(result.kind, False).inc() abort(401, message='Method requires login and no valid login could be loaded.') return wrapper diff --git a/auth/registry_jwt_auth.py b/auth/registry_jwt_auth.py index 75be63d73..720daaa95 100644 --- a/auth/registry_jwt_auth.py +++ b/auth/registry_jwt_auth.py @@ -6,7 +6,7 @@ from jsonschema import validate, ValidationError from flask import request, url_for from flask_principal import identity_changed, Identity -from app import app, get_app_url, instance_keys, metric_queue +from app import app, get_app_url, instance_keys from auth.auth_context import set_authenticated_context from auth.auth_context_type import SignedAuthContext from auth.permissions import repository_read_grant, repository_write_grant, repository_admin_grant @@ -89,8 +89,7 @@ def identity_from_bearer_token(bearer_header): logger.debug('Validating auth header: %s', bearer_header) try: - payload = decode_bearer_header(bearer_header, instance_keys, app.config, - metric_queue=metric_queue) + payload = decode_bearer_header(bearer_header, instance_keys, app.config) except InvalidBearerTokenException as bte: logger.exception('Invalid bearer token: %s', bte) raise InvalidJWTException(bte) diff --git a/auth/validateresult.py b/auth/validateresult.py index 3235104e0..0c4dec6a9 100644 --- a/auth/validateresult.py +++ b/auth/validateresult.py @@ -9,6 +9,9 @@ class AuthKind(Enum): signed_grant = 'signed_grant' credentials = 'credentials' + def __str__(self): + return '%s' % self.value + class ValidateResult(object): """ A result of validating auth in one form or another. """ diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 590a90dde..69c16abf2 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -8,9 +8,9 @@ from collections import namedtuple from datetime import datetime, timedelta from six import iteritems +from prometheus_client import Counter, Histogram from trollius import From, coroutine, Return, async, sleep -from app import metric_queue from buildman.orchestrator import (orchestrator_from_config, KeyEvent, OrchestratorError, OrchestratorConnectionError, ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION) @@ -26,6 +26,17 @@ from util.morecollections import AttrDict logger = logging.getLogger(__name__) +build_fallback = Counter('quay_build_fallback_total', + 'number of times a build has been retried', + labelnames=['executor']) +build_ack_duration = Histogram('quay_build_ack_duration_seconds', + 'seconds taken for the builder to acknowledge a queued build', + labelnames=['executor']) +build_duration = Histogram('quay_build_duration_seconds', + "seconds taken for a build's execution", + labelnames=['executor', 'job_status']) + + JOB_PREFIX = 'building/' LOCK_PREFIX = 'lock/' REALM_PREFIX = 'realm/' @@ -428,7 +439,7 @@ class EphemeralBuilderManager(BaseManager): # Check if we can use this executor based on the retries remaining. 
if executor.minimum_retry_threshold > build_job.retries_remaining: - metric_queue.builder_fallback.Inc() + build_fallback.labels(executor.name).inc() logger.debug('Job %s cannot use executor %s as it is below retry threshold %s (retry #%s)', build_uuid, executor.name, executor.minimum_retry_threshold, build_job.retries_remaining) @@ -440,28 +451,9 @@ class EphemeralBuilderManager(BaseManager): try: execution_id = yield From(executor.start_builder(realm, token, build_uuid)) except: - try: - metric_queue.build_start_failure.Inc(labelvalues=[executor.name]) - metric_queue.put_deprecated(('ExecutorFailure-%s' % executor.name), 1, unit='Count') - except: - logger.exception('Exception when writing failure metric for execution %s for job %s', - execution_id, build_uuid) - logger.exception('Exception when starting builder for job: %s', build_uuid) continue - try: - metric_queue.build_start_success.Inc(labelvalues=[executor.name]) - except: - logger.exception('Exception when writing success metric for execution %s for job %s', - execution_id, build_uuid) - - try: - metric_queue.ephemeral_build_workers.Inc() - except: - logger.exception('Exception when writing start metrics for execution %s for job %s', - execution_id, build_uuid) - started_with_executor = executor # Break out of the loop now that we've started a builder successfully. @@ -542,8 +534,7 @@ class EphemeralBuilderManager(BaseManager): job.build_uuid, build_component.builder_realm) yield From(build_component.start_build(job)) - yield From(self._write_duration_metric(metric_queue.builder_time_to_build, - build_component.builder_realm)) + yield From(self._write_duration_metric(build_ack_duration, build_component.builder_realm)) # Clean up the bookkeeping for allowing any manager to take the job. try: @@ -560,7 +551,8 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Calling job_completed for job %s with status: %s', build_job.build_uuid, job_status) - yield From(self._write_duration_metric(metric_queue.build_time, build_component.builder_realm)) + yield From(self._write_duration_metric(build_duration, build_component.builder_realm, + job_status=job_status)) # Mark the job as completed. Since this is being invoked from the component, we don't need # to ask for the phase to be updated as well. 
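[Reviewer note] The calls above replace MetricQueue's `metric.Observe(value, labelvalues=[...])` with prometheus_client's `metric.labels(...).observe(value)`. A minimal, self-contained sketch of that pattern, for reference while reviewing; the metric and function names here are illustrative and not part of this change:

    from time import time
    from prometheus_client import Histogram

    # Illustrative metric; mirrors the build_duration definition above.
    task_duration = Histogram('example_task_duration_seconds',
                              "seconds taken for a task's execution",
                              labelnames=['executor', 'job_status'])

    def record_task(executor_name, job_status, start_time):
      # labels() returns the child time series for this label combination;
      # observe() records a single sample into that child's buckets.
      task_duration.labels(executor_name, job_status).observe(time() - start_time)
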
@@ -660,18 +652,16 @@ class EphemeralBuilderManager(BaseManager):
       yield From(sleep(ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION))
 
   @coroutine
-  def _write_duration_metric(self, metric, realm):
-    """
-    :returns: True if the metric was written, otherwise False
-    :rtype: bool
-    """
+  def _write_duration_metric(self, metric, realm, job_status=None):
     try:
       metric_data = yield From(self._orchestrator.get_key(self._metric_key(realm)))
       parsed_metric_data = json.loads(metric_data)
       start_time = parsed_metric_data['start_time']
-      metric.Observe(time.time() - start_time,
-                     labelvalues=[parsed_metric_data.get('executor_name',
-                                                         'unknown')])
+      executor = parsed_metric_data.get('executor_name', 'unknown')
+      if job_status:
+        metric.labels(executor, job_status).observe(time.time() - start_time)
+      else:
+        metric.labels(executor).observe(time.time() - start_time)
     except Exception:
       logger.exception("Could not write metric for realm %s", realm)
diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py
index e82d7a316..ff83f45f2 100644
--- a/buildman/manager/executor.py
+++ b/buildman/manager/executor.py
@@ -5,25 +5,25 @@ import os
 import socket
 import subprocess
 import threading
+import time
 import uuid
 
-from functools import partial
+from functools import partial, wraps
 
 import boto.ec2
 import cachetools.func
 import requests
-import trollius
 
 from container_cloud_config import CloudConfigContext
 from jinja2 import FileSystemLoader, Environment
-from trollius import coroutine, From, Return, get_event_loop
+from trollius import coroutine, sleep, From, Return, get_event_loop
+from prometheus_client import Histogram
 
 import release
 
-from buildman.asyncutil import AsyncWrapper
-from app import metric_queue, app
-from util.metrics.metricqueue import duration_collector_async
 from _init import ROOT_DIR
+from app import app
+from buildman.asyncutil import AsyncWrapper
 
 logger = logging.getLogger(__name__)
 
@@ -38,6 +38,28 @@ ENV = Environment(loader=FileSystemLoader(os.path.join(ROOT_DIR, "buildman/templ
 TEMPLATE = ENV.get_template('cloudconfig.yaml')
 CloudConfigContext().populate_jinja_environment(ENV)
 
+
+build_start_duration = Histogram('quay_build_start_duration_seconds',
+                                 'seconds taken for an executor to start executing a queued build',
+                                 labelnames=['executor'],
+                                 buckets=[.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 180.0, 240.0, 300.0, 600.0])
+
+
+def async_observe(metric, *labels):
+  def decorator(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+      trigger_time = time.time()
+      try:
+        rv = func(*args, **kwargs)
+      except Return as e:
+        metric.labels(*labels).observe(time.time() - trigger_time)
+        raise e
+      return rv
+    return wrapper
+  return decorator
+
+
 class ExecutorException(Exception):
   """ Exception raised when there is a problem starting or stopping a builder.
""" @@ -160,7 +182,7 @@ class EC2Executor(BuilderExecutor): return stack_amis[ec2_region] @coroutine - @duration_collector_async(metric_queue.builder_time_to_start, ['ec2']) + @async_observe(build_start_duration, 'ec2') def start_builder(self, realm, token, build_uuid): region = self.executor_config['EC2_REGION'] channel = self.executor_config.get('COREOS_CHANNEL', 'stable') @@ -204,7 +226,6 @@ class EC2Executor(BuilderExecutor): )) except boto.exception.EC2ResponseError as ec2e: logger.exception('Unable to spawn builder instance') - metric_queue.ephemeral_build_worker_failure.Inc() raise ec2e if not reservation.instances: @@ -215,7 +236,7 @@ class EC2Executor(BuilderExecutor): launched = AsyncWrapper(reservation.instances[0]) # Sleep a few seconds to wait for AWS to spawn the instance. - yield From(trollius.sleep(_TAG_RETRY_SLEEP)) + yield From(sleep(_TAG_RETRY_SLEEP)) # Tag the instance with its metadata. for i in range(0, _TAG_RETRY_COUNT): @@ -231,7 +252,7 @@ class EC2Executor(BuilderExecutor): if i < _TAG_RETRY_COUNT - 1: logger.warning('Failed to write EC2 tags for instance %s for build %s (attempt #%s)', launched.id, build_uuid, i) - yield From(trollius.sleep(_TAG_RETRY_SLEEP)) + yield From(sleep(_TAG_RETRY_SLEEP)) continue raise ExecutorException('Unable to find builder instance.') @@ -269,7 +290,7 @@ class PopenExecutor(BuilderExecutor): """ Executor which uses Popen to fork a quay-builder process. """ @coroutine - @duration_collector_async(metric_queue.builder_time_to_start, ['fork']) + @async_observe(build_start_duration, 'fork') def start_builder(self, realm, token, build_uuid): # Now start a machine for this job, adding the machine id to the etcd information logger.debug('Forking process for build') @@ -491,7 +512,7 @@ class KubernetesExecutor(BuilderExecutor): return job_resource @coroutine - @duration_collector_async(metric_queue.builder_time_to_start, ['k8s']) + @async_observe(build_start_duration, 'k8s') def start_builder(self, realm, token, build_uuid): # generate resource channel = self.executor_config.get('COREOS_CHANNEL', 'stable') diff --git a/buildman/server.py b/buildman/server.py index 7aaf3b66b..762c589b8 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -1,21 +1,23 @@ import logging import json + +from datetime import timedelta +from threading import Event + import trollius -from threading import Event -from datetime import timedelta -from trollius.coroutines import From +from aiowsgi import create_server as create_wsgi_server from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory from autobahn.asyncio.websocket import WampWebSocketServerFactory from autobahn.wamp import types -from aiowsgi import create_server as create_wsgi_server from flask import Flask +from trollius.coroutines import From +from app import app from buildman.enums import BuildJobResult, BuildServerStatus, RESULT_PHASES -from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException +from buildman.jobutil.buildstatus import StatusHandler from data import database, model -from app import app, metric_queue logger = logging.getLogger(__name__) @@ -67,11 +69,10 @@ class BuilderServer(object): @controller_app.route('/status') def status(): - metrics = server._queue.get_metrics() - (running_count, available_not_running_count, available_count) = metrics + (running_count, _available_not_running_count, available_count) = server._queue.get_metrics() workers = [component for component in server._current_components - if 
component.kind() == 'builder'] + if component.kind() == 'builder'] data = { 'status': server._current_status, @@ -167,8 +168,6 @@ class BuilderServer(object): if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count: self._shutdown_event.set() - _report_completion_status(build_job, job_status, executor_name) - @trollius.coroutine def _work_checker(self): logger.debug('Initializing work checker') @@ -249,18 +248,3 @@ class BuilderServer(object): # Initialize the work queue checker. yield From(self._work_checker()) - -def _report_completion_status(build_job, status, executor_name): - metric_queue.build_counter.Inc(labelvalues=[status]) - metric_queue.repository_build_completed.Inc(labelvalues=[build_job.namespace, build_job.repo_name, - status, executor_name or 'executor']) - if status == BuildJobResult.COMPLETE: - status_name = 'CompleteBuilds' - elif status == BuildJobResult.ERROR: - status_name = 'FailedBuilds' - elif status == BuildJobResult.INCOMPLETE: - status_name = 'IncompletedBuilds' - else: - return - - metric_queue.put_deprecated(status_name, 1, unit='Count') diff --git a/buildman/test/test_buildman.py b/buildman/test/test_buildman.py index 49b9a20fc..b8b849841 100644 --- a/buildman/test/test_buildman.py +++ b/buildman/test/test_buildman.py @@ -6,7 +6,6 @@ from mock import Mock, ANY from six import iteritems from trollius import coroutine, get_event_loop, From, Future, Return -from app import metric_queue from buildman.asyncutil import AsyncWrapper from buildman.component.buildcomponent import BuildComponent from buildman.manager.ephemeral import (EphemeralBuilderManager, REALM_PREFIX, @@ -15,7 +14,6 @@ from buildman.manager.executor import BuilderExecutor, ExecutorException from buildman.orchestrator import KeyEvent, KeyChange from buildman.server import BuildJobResult from util import slash_join -from util.metrics.metricqueue import duration_collector_async BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' @@ -36,7 +34,6 @@ class TestExecutor(BuilderExecutor): job_stopped = None @coroutine - @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"]) def start_builder(self, realm, token, build_uuid): self.job_started = str(uuid.uuid4()) raise Return(self.job_started) @@ -48,7 +45,6 @@ class TestExecutor(BuilderExecutor): class BadExecutor(BuilderExecutor): @coroutine - @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"]) def start_builder(self, realm, token, build_uuid): raise ExecutorException('raised on purpose!') diff --git a/config.py b/config.py index ae742ece8..f0743c6af 100644 --- a/config.py +++ b/config.py @@ -607,3 +607,6 @@ class DefaultConfig(ImmutableConfig): # Feature Flag: Whether garbage collection is enabled. FEATURE_GARBAGE_COLLECTION = True + + # When enabled, sets a tracing callback to report greenlet metrics. 
+ GREENLET_TRACING = True diff --git a/data/queue.py b/data/queue.py index 289f4ad64..33bf2707f 100644 --- a/data/queue.py +++ b/data/queue.py @@ -3,36 +3,41 @@ import uuid from datetime import datetime, timedelta from contextlib import contextmanager +from prometheus_client import Counter, Gauge + from data.database import QueueItem, db, db_for_update, db_random_func from util.morecollections import AttrDict +queue_item_puts = Counter('quay_queue_item_puts_total', + 'number of items that have been added to the queue', + labelnames=['queue_name']) +queue_item_gets = Counter('quay_queue_item_gets_total', + 'number of times get() has been called on queue', + labelnames=['queue_name', 'availability']) +queue_item_deletes = Counter('quay_queue_item_deletes_total', + 'number of expired queue items that have been deleted') + +queue_items_locked = Gauge('quay_queue_items_locked', + 'number of queue items that have been acquired', + labelnames=['queue_name']) +queue_items_available = Gauge('quay_queue_items_available', + 'number of queue items that have not expired', + labelnames=['queue_name']) +queue_items_available_unlocked = Gauge('quay_queue_items_available_unlocked', + 'number of queue items that have not expired and are not locked', + labelnames=['queue_name']) + + MINIMUM_EXTENSION = timedelta(seconds=20) DEFAULT_BATCH_SIZE = 1000 -class BuildMetricQueueReporter(object): - """ Metric queue reporter for the build system. """ - def __init__(self, metric_queue): - self._metric_queue = metric_queue - - def __call__(self, currently_processing, running_count, total_count): - need_capacity_count = total_count - running_count - self._metric_queue.put_deprecated('BuildCapacityShortage', need_capacity_count, unit='Count') - self._metric_queue.build_capacity_shortage.Set(need_capacity_count) - - building_percent = 100 if currently_processing else 0 - self._metric_queue.percent_building.Set(building_percent) - - class WorkQueue(object): """ Work queue defines methods for interacting with a queue backed by the database. 
""" def __init__(self, queue_name, transaction_factory, - canonical_name_match_list=None, reporter=None, metric_queue=None, - has_namespace=False): + canonical_name_match_list=None, has_namespace=False): self._queue_name = queue_name - self._reporter = reporter - self._metric_queue = metric_queue self._transaction_factory = transaction_factory self._currently_processing = False self._has_namespaced_items = has_namespace @@ -129,21 +134,10 @@ class WorkQueue(object): return (running_count, available_not_running_count, available_count) def update_metrics(self): - if self._reporter is None and self._metric_queue is None: - return - (running_count, available_not_running_count, available_count) = self.get_metrics() - - if self._metric_queue: - self._metric_queue.work_queue_running.Set(running_count, labelvalues=[self._queue_name]) - self._metric_queue.work_queue_available.Set(available_count, labelvalues=[self._queue_name]) - self._metric_queue.work_queue_available_not_running.Set(available_not_running_count, - labelvalues=[self._queue_name]) - - - if self._reporter: - self._reporter(self._currently_processing, running_count, - running_count + available_not_running_count) + queue_items_locked.labels(self._queue_name).set(running_count) + queue_items_available.labels(self._queue_name).set(available_count) + queue_items_available_unlocked.labels(self._queue_name).set(available_not_running_count) def has_retries_remaining(self, item_id): """ Returns whether the queue item with the given id has any retries remaining. If the @@ -204,7 +198,9 @@ class WorkQueue(object): # Chunk the inserted items into batch_size chunks and insert_many remaining = list(items_to_insert) while remaining: - QueueItem.insert_many(remaining[0:batch_size]).execute() + current_batch = remaining[0:batch_size] + QueueItem.insert_many(current_batch).execute() + queue_item_puts.labels(self._queue_name).inc(current_batch) remaining = remaining[batch_size:] def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): @@ -214,6 +210,7 @@ class WorkQueue(object): """ item = QueueItem.create(**self._queue_dict(canonical_name_list, message, available_after, retries_remaining)) + queue_item_puts.labels(self._queue_name).inc() return str(item.id) def _select_available_item(self, ordering_required, now): @@ -289,15 +286,18 @@ class WorkQueue(object): db_item = self._select_available_item(ordering_required, now) if db_item is None: self._currently_processing = False + queue_item_gets.labels(self._queue_name, 'nonexistant').inc() return None # Attempt to claim the item for this instance. was_claimed = self._attempt_to_claim_item(db_item, now, processing_time) if not was_claimed: self._currently_processing = False + queue_item_gets.labels(self._queue_name, 'claimed').inc() return None self._currently_processing = True + queue_item_gets.labels(self._queue_name, 'acquired').inc() # Return a view of the queue item rather than an active db object return AttrDict({ @@ -307,8 +307,8 @@ class WorkQueue(object): }) def cancel(self, item_id): - """ Attempts to cancel the queue item with the given ID from the queue. Returns true on success - and false if the queue item could not be canceled. + """ Attempts to cancel the queue item with the given ID from the queue. + Returns true on success and false if the queue item could not be canceled. 
""" count_removed = QueueItem.delete().where(QueueItem.id == item_id).execute() return count_removed > 0 @@ -375,4 +375,5 @@ def delete_expired(expiration_threshold, deletion_threshold, batch_size): return 0 QueueItem.delete().where(QueueItem.id << to_delete).execute() + queue_item_deletes.inc(to_delete) return len(to_delete) diff --git a/data/registry_model/blobuploader.py b/data/registry_model/blobuploader.py index 5f99d3ec8..686904d0c 100644 --- a/data/registry_model/blobuploader.py +++ b/data/registry_model/blobuploader.py @@ -7,6 +7,8 @@ from collections import namedtuple import bitmath import resumablehashlib +from prometheus_client import Counter, Histogram + from data.registry_model import registry_model from data.database import CloseForLongOperation, db_transaction from digest import digest_tools @@ -18,6 +20,13 @@ from util.registry.torrent import PieceHasher logger = logging.getLogger(__name__) +chunk_upload_duration = Histogram('quay_chunk_upload_duration_seconds', + 'number of seconds for a chunk to be uploaded to the registry', + labelnames=['region']) +pushed_bytes_total = Counter('quay_registry_pushed_bytes_total', + 'number of bytes pushed to the registry') + + BLOB_CONTENT_TYPE = 'application/octet-stream' @@ -125,13 +134,10 @@ class _BlobUploadManager(object): """ Returns the unique ID for the blob upload. """ return self.blob_upload.upload_id - def upload_chunk(self, app_config, input_fp, start_offset=0, length=-1, metric_queue=None): + def upload_chunk(self, app_config, input_fp, start_offset=0, length=-1): """ Uploads a chunk of data found in the given input file-like interface. start_offset and length are optional and should match a range header if any was given. - If metric_queue is given, the upload time and chunk size are written into the metrics in - the queue. - Returns the total number of bytes uploaded after this upload has completed. Raises a BlobUploadException if the upload failed. """ @@ -207,11 +213,8 @@ class _BlobUploadManager(object): raise BlobUploadException(upload_error) # Update the chunk upload time and push bytes metrics. - if metric_queue is not None: - metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[ - length_written, list(location_set)[0]]) - - metric_queue.push_byte_count.Inc(length_written) + chunk_upload_duration.labels(list(location_set)[0]).observe(time.time() - start_time) + pushed_bytes_total.inc(length_written) # Ensure we have not gone beyond the max layer size. 
new_blob_bytes = self.blob_upload.byte_count + length_written diff --git a/endpoints/api/__init__.py b/endpoints/api/__init__.py index 8dcabe6a3..850dd9abd 100644 --- a/endpoints/api/__init__.py +++ b/endpoints/api/__init__.py @@ -10,7 +10,7 @@ from flask_restful import Resource, abort, Api, reqparse from flask_restful.utils.cors import crossdomain from jsonschema import validate, ValidationError -from app import app, metric_queue, authentication +from app import app, authentication from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission, AdministerRepositoryPermission, UserReadPermission, UserAdminPermission) @@ -25,7 +25,7 @@ from endpoints.csrf import csrf_protect from endpoints.exception import (Unauthorized, InvalidRequest, InvalidResponse, FreshLoginRequired, NotFound) from endpoints.decorators import check_anon_protection, require_xhr_from_browser, check_readonly -from util.metrics.metricqueue import time_decorator +from util.metrics.prometheus import timed_blueprint from util.names import parse_namespace_repository from util.pagination import encrypt_page_token, decrypt_page_token from util.request import get_request_ip @@ -33,7 +33,7 @@ from __init__models_pre_oci import pre_oci_model as model logger = logging.getLogger(__name__) -api_bp = Blueprint('api', __name__) +api_bp = timed_blueprint(Blueprint('api', __name__)) CROSS_DOMAIN_HEADERS = ['Authorization', 'Content-Type', 'X-Requested-With'] @@ -46,10 +46,8 @@ class ApiExceptionHandlingApi(Api): api = ApiExceptionHandlingApi() api.init_app(api_bp) -api.decorators = [csrf_protect(), - crossdomain(origin='*', headers=CROSS_DOMAIN_HEADERS), - process_oauth, time_decorator(api_bp.name, metric_queue), - require_xhr_from_browser] +api.decorators = [csrf_protect(), crossdomain(origin='*', headers=CROSS_DOMAIN_HEADERS), + process_oauth, require_xhr_from_browser] def resource(*urls, **kwargs): @@ -342,7 +340,7 @@ def max_json_size(max_size): def wrapped(self, *args, **kwargs): if request.is_json and len(request.get_data()) > max_size: raise InvalidRequest() - + return func(self, *args, **kwargs) return wrapped return wrapper diff --git a/endpoints/appr/__init__.py b/endpoints/appr/__init__.py index c998d8a95..eb283cde0 100644 --- a/endpoints/appr/__init__.py +++ b/endpoints/appr/__init__.py @@ -5,15 +5,13 @@ from functools import wraps from cnr.exception import Forbidden from flask import Blueprint -from app import metric_queue from auth.permissions import (AdministerRepositoryPermission, ReadRepositoryPermission, ModifyRepositoryPermission) from endpoints.appr.decorators import require_repo_permission -from util.metrics.metricqueue import time_blueprint +from util.metrics.prometheus import timed_blueprint -appr_bp = Blueprint('appr', __name__) -time_blueprint(appr_bp, metric_queue) +appr_bp = timed_blueprint(Blueprint('appr', __name__)) logger = logging.getLogger(__name__) diff --git a/endpoints/building.py b/endpoints/building.py index 247d0a932..7dc2befb4 100644 --- a/endpoints/building.py +++ b/endpoints/building.py @@ -5,7 +5,7 @@ from datetime import datetime, timedelta from flask import request -from app import app, dockerfile_build_queue, metric_queue +from app import app, dockerfile_build_queue from data import model from data.logs_model import logs_model from data.database import db, RepositoryState @@ -55,7 +55,7 @@ def start_build(repository, prepared_build, pull_robot_name=None): logger.debug('Prevented queueing of build under namespace %s due to reaching max: %s', 
repository.namespace_user.username, repository.namespace_user.maximum_queued_builds_count) - raise MaximumBuildsQueuedException() + raise MaximumBuildsQueuedException() host = app.config['SERVER_HOSTNAME'] repo_path = '%s/%s/%s' % (host, repository.namespace_user.username, repository.name) @@ -97,10 +97,6 @@ def start_build(repository, prepared_build, pull_robot_name=None): build_request.queue_id = queue_id build_request.save() - # Add the queueing of the build to the metrics queue. - metric_queue.repository_build_queued.Inc(labelvalues=[repository.namespace_user.username, - repository.name]) - # Add the build to the repo's log and spawn the build_queued notification. event_log_metadata = { 'build_id': build_request.uuid, diff --git a/endpoints/metrics.py b/endpoints/metrics.py new file mode 100644 index 000000000..22def202e --- /dev/null +++ b/endpoints/metrics.py @@ -0,0 +1,13 @@ +from prometheus_client import Counter + +image_pulls = Counter('quay_registry_image_pulls_total', + 'number of images that have been downloaded via the registry', + labelnames=['protocol', 'ref', 'status']) + +image_pushes = Counter('quay_registry_image_pushes_total', + 'number of images that have been uploaded via the registry', + labelnames=['protocol', 'status']) + +image_pulled_bytes = Counter('quay_registry_image_pulled_bytes_total', + 'number of bytes that have been downloaded via the registry', + labelnames=['protocol']) diff --git a/endpoints/v1/__init__.py b/endpoints/v1/__init__.py index 2248222d2..62e964ef4 100644 --- a/endpoints/v1/__init__.py +++ b/endpoints/v1/__init__.py @@ -6,14 +6,13 @@ from flask import Blueprint, make_response, jsonify import features -from app import metric_queue, app +from app import app from data.readreplica import ReadOnlyModeException from endpoints.decorators import anon_protect, anon_allowed -from util.metrics.metricqueue import time_blueprint from util.http import abort +from util.metrics.prometheus import timed_blueprint -v1_bp = Blueprint('v1', __name__) -time_blueprint(v1_bp, metric_queue) +v1_bp = timed_blueprint(Blueprint('v1', __name__)) logger = logging.getLogger(__name__) diff --git a/endpoints/v1/index.py b/endpoints/v1/index.py index 3030b20e8..b6e9cc173 100644 --- a/endpoints/v1/index.py +++ b/endpoints/v1/index.py @@ -6,7 +6,7 @@ from functools import wraps from flask import request, make_response, jsonify, session -from app import userevents, metric_queue, storage, docker_v2_signing_key +from app import userevents, storage, docker_v2_signing_key from auth.auth_context import get_authenticated_context, get_authenticated_user from auth.credentials import validate_credentials, CredentialKind from auth.decorators import process_auth @@ -19,6 +19,7 @@ from data.registry_model import registry_model from data.registry_model.manifestbuilder import create_manifest_builder, lookup_manifest_builder from endpoints.decorators import (anon_protect, anon_allowed, parse_repository_name, check_repository_state, check_readonly) +from endpoints.metrics import image_pulls, image_pushes from endpoints.v1 import v1_bp, check_v1_push_enabled from notifications import spawn_notification from util.audit import track_and_log @@ -250,11 +251,13 @@ def update_images(namespace_name, repo_name): kind_filter='image') if repository_ref is None: # Make sure the repo actually exists. 
+ image_pushes.labels('v1', 404).inc() abort(404, message='Unknown repository', issue='unknown-repo') builder = lookup_manifest_builder(repository_ref, session.get('manifest_builder'), storage, docker_v2_signing_key) if builder is None: + image_pushes.labels('v1', 400).inc() abort(400) # Generate a job for each notification that has been added to this repo @@ -267,9 +270,10 @@ def update_images(namespace_name, repo_name): track_and_log('push_repo', repository_ref) spawn_notification(repository_ref, 'repo_push', event_data) - metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v1', True]) + image_pushes.labels('v1', 204).inc() return make_response('Updated', 204) + image_pushes.labels('v1', 403).inc() abort(403) @@ -287,6 +291,7 @@ def get_repository_images(namespace_name, repo_name): if permission.can() or (repository_ref and repository_ref.is_public): # We can't rely on permissions to tell us if a repo exists anymore if repository_ref is None: + image_pulls.labels('v1', 'tag', 404).inc() abort(404, message='Unknown repository', issue='unknown-repo') logger.debug('Building repository image response') @@ -296,9 +301,10 @@ def get_repository_images(namespace_name, repo_name): track_and_log('pull_repo', repository_ref, analytics_name='pull_repo_100x', analytics_sample=0.01) - metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v1', True]) + image_pulls.labels('v1', 'tag', 200).inc() return resp + image_pulls.labels('v1', 'tag', 403).inc() abort(403) diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py index 14376cb19..045415669 100644 --- a/endpoints/v1/registry.py +++ b/endpoints/v1/registry.py @@ -7,7 +7,7 @@ from time import time from flask import make_response, request, session, Response, redirect, abort as flask_abort -from app import storage as store, app, docker_v2_signing_key, metric_queue +from app import storage as store, app, docker_v2_signing_key from auth.auth_context import get_authenticated_user from auth.decorators import extract_namespace_repo_from_session, process_auth from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission) @@ -16,14 +16,16 @@ from data.registry_model import registry_model from data.registry_model.blobuploader import upload_blob, BlobUploadSettings, BlobUploadException from data.registry_model.manifestbuilder import lookup_manifest_builder from digest import checksums +from endpoints.metrics import image_pulled_bytes from endpoints.v1 import v1_bp, check_v1_push_enabled from endpoints.v1.index import ensure_namespace_enabled from endpoints.decorators import (anon_protect, check_region_blacklisted, check_repository_state, check_readonly) -from util.http import abort, exact_abort +from util.http import abort from util.registry.replication import queue_storage_replication from util.request import get_request_ip + logger = logging.getLogger(__name__) @@ -127,7 +129,7 @@ def get_image_layer(namespace, repository, image_id, headers): abort(404, 'Image %(image_id)s not found', issue='unknown-image', image_id=image_id) path = legacy_image.blob.storage_path - metric_queue.pull_byte_count.Inc(legacy_image.blob.compressed_size, labelvalues=['v1']) + image_pulled_bytes.labels('v1').inc(legacy_image.blob.compressed_size) try: logger.debug('Looking up the direct download URL for path: %s', path) diff --git a/endpoints/v2/__init__.py b/endpoints/v2/__init__.py index 845ad258f..94a224954 100644 --- a/endpoints/v2/__init__.py +++ b/endpoints/v2/__init__.py @@ -10,10 +10,10 @@ from 
semantic_version import Spec import features -from app import app, metric_queue, get_app_url +from app import app, get_app_url from auth.auth_context import get_authenticated_context -from auth.permissions import ( - ReadRepositoryPermission, ModifyRepositoryPermission, AdministerRepositoryPermission) +from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission, + AdministerRepositoryPermission) from auth.registry_jwt_auth import process_registry_jwt_auth, get_auth_headers from data.registry_model import registry_model from data.readreplica import ReadOnlyModeException @@ -21,14 +21,15 @@ from endpoints.decorators import anon_protect, anon_allowed, route_show_if from endpoints.v2.errors import (V2RegistryException, Unauthorized, Unsupported, NameUnknown, ReadOnlyMode) from util.http import abort -from util.metrics.metricqueue import time_blueprint +from util.metrics.prometheus import timed_blueprint from util.registry.dockerver import docker_version from util.pagination import encrypt_page_token, decrypt_page_token + logger = logging.getLogger(__name__) -v2_bp = Blueprint('v2', __name__) -time_blueprint(v2_bp, metric_queue) + +v2_bp = timed_blueprint(Blueprint('v2', __name__)) @v2_bp.app_errorhandler(V2RegistryException) diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 141c37990..a7c3d302d 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -3,7 +3,7 @@ import re from flask import url_for, request, redirect, Response, abort as flask_abort -from app import storage, app, get_app_url, metric_queue, model_cache +from app import storage, app, get_app_url, model_cache from auth.registry_jwt_auth import process_registry_jwt_auth from auth.permissions import ReadRepositoryPermission from data import database @@ -15,6 +15,7 @@ from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_ from digest import digest_tools from endpoints.decorators import (anon_protect, anon_allowed, parse_repository_name, check_region_blacklisted, check_readonly) +from endpoints.metrics import image_pulled_bytes from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream from endpoints.v2.errors import ( BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge, @@ -83,7 +84,7 @@ def download_blob(namespace_name, repo_name, digest): if storage.get_supports_resumable_downloads(blob.placements): headers['Accept-Ranges'] = 'bytes' - metric_queue.pull_byte_count.Inc(blob.compressed_size, labelvalues=['v2']) + image_pulled_bytes.labels('v2').inc(blob.compressed_size) # Short-circuit by redirecting if the storage supports it. path = blob.storage_path @@ -434,7 +435,7 @@ def _upload_chunk(blob_uploader, commit_digest=None): try: # Upload the data received. - blob_uploader.upload_chunk(app.config, input_fp, start_offset, length, metric_queue) + blob_uploader.upload_chunk(app.config, input_fp, start_offset, length) if commit_digest is not None: # Commit the upload to a blob. 
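[Reviewer note] The image_pulls / image_pushes / image_pulled_bytes counters used above are registered with prometheus_client's default registry at import time, so they show up in the scrape output as soon as endpoints/metrics.py is imported. A small sketch of how such a counter surfaces, for reference; the metric name below is illustrative, and recent prometheus_client releases coerce label values to strings, which is why passing an int status code works:

    from prometheus_client import REGISTRY, Counter, generate_latest

    demo_pulls = Counter('demo_image_pulls_total',
                         'number of images that have been downloaded',
                         labelnames=['protocol', 'ref', 'status'])
    demo_pulls.labels('v2_1', 'tag', 200).inc()

    # generate_latest() renders the text exposition format that Prometheus scrapes.
    print(generate_latest(REGISTRY).decode('utf-8'))
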
diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py index b71b3bb3f..ed4a16c4b 100644 --- a/endpoints/v2/manifest.py +++ b/endpoints/v2/manifest.py @@ -6,12 +6,13 @@ from flask import request, url_for, Response import features -from app import app, metric_queue, storage +from app import app, storage from auth.registry_jwt_auth import process_registry_jwt_auth from digest import digest_tools from data.registry_model import registry_model from data.model.oci.manifest import CreateManifestException from endpoints.decorators import anon_protect, parse_repository_name, check_readonly +from endpoints.metrics import image_pulls, image_pushes from endpoints.v2 import v2_bp, require_repo_read, require_repo_write from endpoints.v2.errors import (ManifestInvalid, ManifestUnknown, NameInvalid, TagExpired, NameUnknown) @@ -41,6 +42,7 @@ MANIFEST_TAGNAME_ROUTE = BASE_MANIFEST_ROUTE.format(VALID_TAG_PATTERN) def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref): repository_ref = registry_model.lookup_repository(namespace_name, repo_name) if repository_ref is None: + image_pulls.labels('v2_1', 'tag', 404).inc() raise NameUnknown() tag = registry_model.get_repo_tag(repository_ref, manifest_ref) @@ -49,23 +51,27 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref): logger.debug('Found expired tag %s for repository %s/%s', manifest_ref, namespace_name, repo_name) msg = 'Tag %s was deleted or has expired. To pull, revive via time machine' % manifest_ref + image_pulls.labels('v2_1', 'tag', 404).inc() raise TagExpired(msg) + image_pulls.labels('v2_1', 'tag', 404).inc() raise ManifestUnknown() manifest = registry_model.get_manifest_for_tag(tag, backfill_if_necessary=True) if manifest is None: # Something went wrong. + image_pulls.labels('v2_1', 'tag', 400).inc() raise ManifestInvalid() manifest_bytes, manifest_digest, manifest_media_type = _rewrite_schema_if_necessary( namespace_name, repo_name, manifest_ref, manifest) if manifest_bytes is None: + image_pulls.labels('v2_1', 'tag', 404).inc() raise ManifestUnknown() track_and_log('pull_repo', repository_ref, analytics_name='pull_repo_100x', analytics_sample=0.01, tag=manifest_ref) - metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) + image_pulls.labels('v2_1', 'tag', 200).inc() return Response( manifest_bytes.as_unicode(), @@ -85,19 +91,22 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref): def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref): repository_ref = registry_model.lookup_repository(namespace_name, repo_name) if repository_ref is None: + image_pulls.labels('v2_1', 'manifest', 404).inc() raise NameUnknown() manifest = registry_model.lookup_manifest_by_digest(repository_ref, manifest_ref) if manifest is None: + image_pulls.labels('v2_1', 'manifest', 404).inc() raise ManifestUnknown() manifest_bytes, manifest_digest, manifest_media_type = _rewrite_schema_if_necessary( namespace_name, repo_name, '$digest', manifest) if manifest_digest is None: + image_pulls.labels('v2_1', 'manifest', 404).inc() raise ManifestUnknown() track_and_log('pull_repo', repository_ref, manifest_digest=manifest_ref) - metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) + image_pulls.labels('v2_1', 'manifest', 200).inc() return Response(manifest_bytes.as_unicode(), status=200, headers={ 'Content-Type': manifest_media_type, @@ -180,6 +189,7 @@ def write_manifest_by_tagname(namespace_name, repo_name, manifest_ref): def 
write_manifest_by_digest(namespace_name, repo_name, manifest_ref): parsed = _parse_manifest() if parsed.digest != manifest_ref: + image_pushes.labels('v2_invalid', 400).inc() raise ManifestInvalid(detail={'message': 'manifest digest mismatch'}) if parsed.schema_version != 2: @@ -190,14 +200,17 @@ def write_manifest_by_digest(namespace_name, repo_name, manifest_ref): # manifest with a temporary tag, as it is being pushed as part of a call for a manifest list. repository_ref = registry_model.lookup_repository(namespace_name, repo_name) if repository_ref is None: + image_pushes.labels('v2_2', 404).inc() raise NameUnknown() expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'] manifest = registry_model.create_manifest_with_temp_tag(repository_ref, parsed, expiration_sec, storage) if manifest is None: + image_pushes.labels('v2_2', 400).inc() raise ManifestInvalid() + image_pushes.labels('v2_2', 202).inc() return Response( 'OK', status=202, @@ -271,7 +284,7 @@ def _write_manifest_and_log(namespace_name, repo_name, tag_name, manifest_impl): track_and_log('push_repo', repository_ref, tag=tag_name) spawn_notification(repository_ref, 'repo_push', {'updated_tags': [tag_name]}) - metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) + image_pushes.labels('v2_1', 202).inc() return Response( 'OK', diff --git a/endpoints/verbs/__init__.py b/endpoints/verbs/__init__.py index 1a7898ab8..1ef512318 100644 --- a/endpoints/verbs/__init__.py +++ b/endpoints/verbs/__init__.py @@ -3,11 +3,14 @@ import json import logging import uuid +from functools import wraps + from flask import redirect, Blueprint, abort, send_file, make_response, request +from prometheus_client import Counter import features -from app import app, signer, storage, metric_queue, config_provider, ip_resolver, instance_keys +from app import app, signer, storage, config_provider, ip_resolver, instance_keys from auth.auth_context import get_authenticated_user from auth.decorators import process_auth from auth.permissions import ReadRepositoryPermission @@ -16,6 +19,7 @@ from data import model from data.registry_model import registry_model from endpoints.decorators import (anon_protect, anon_allowed, route_show_if, parse_repository_name, check_region_blacklisted) +from endpoints.metrics import image_pulls, image_pulled_bytes from endpoints.v2.blob import BLOB_DIGEST_ROUTE from image.appc import AppCImageFormatter from image.docker import ManifestException @@ -32,6 +36,10 @@ from util.registry.torrent import (make_torrent, per_user_torrent_filename, publ logger = logging.getLogger(__name__) +verb_stream_passes = Counter('quay_verb_stream_passes_total', + 'number of passes over a tar stream used by verb requests', + labelnames=['kind']) + verbs = Blueprint('verbs', __name__) LAYER_MIMETYPE = 'binary/octet-stream' @@ -42,7 +50,7 @@ class VerbReporter(TarLayerFormatterReporter): self.kind = kind def report_pass(self, pass_count): - metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count]) + verb_stream_passes.labels(self.kind).inc(pass_count) def _open_stream(formatter, tag, schema1_manifest, derived_image_id, handlers, reporter): @@ -65,7 +73,7 @@ def _open_stream(formatter, tag, schema1_manifest, derived_image_id, handlers, r def tar_stream_getter_iterator(): # Re-Initialize the storage engine because some may not respond well to forking (e.g. 
S3)
-  store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
+  store = Storage(app, config_provider=config_provider, ip_resolver=ip_resolver)
 
   # Note: We reverse because we have to start at the leaf layer and move upward,
   # as per the spec for the formatters.
@@ -112,7 +120,7 @@ def _write_derived_image_to_storage(verb, derived_image, queue_file):
   queue_file.add_exception_handler(handle_exception)
 
   # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
-  store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
+  store = Storage(app, config_provider=config_provider, ip_resolver=ip_resolver)
 
   try:
     store.stream_write(derived_image.blob.placements, derived_image.blob.storage_path, queue_file)
@@ -293,12 +301,10 @@ def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, che
   # Check for torrent. If found, we return a torrent for the repo verb image (if the derived
   # image already exists).
   if request.accept_mimetypes.best == 'application/x-bittorrent':
-    metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb + '+torrent', True])
     return _torrent_repo_verb(repo, tag, manifest, verb, **kwargs)
 
   # Log the action.
   track_and_log('repo_verb', wrap_repository(repo), tag=tag.name, verb=verb, **kwargs)
-  metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb, True])
 
   is_readonly = app.config.get('REGISTRY_STATE', 'normal') == 'readonly'
 
@@ -321,7 +327,7 @@ def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, che
     logger.debug('Derived %s image %s exists in storage', verb, derived_image)
     is_head_request = request.method == 'HEAD'
 
-    metric_queue.pull_byte_count.Inc(derived_image.blob.compressed_size, labelvalues=[verb])
+    image_pulled_bytes.labels('bittorrent').inc(derived_image.blob.compressed_size)
 
     download_url = storage.get_direct_download_url(derived_image.blob.placements,
                                                    derived_image.blob.storage_path,
@@ -435,10 +441,25 @@ def os_arch_checker(os, arch):
   return checker
 
 
+def observe_route(protocol):
+  """
+  Decorates a verb route to record the image_pulls metric into Prometheus.
+  """
+  def decorator(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+      rv = func(*args, **kwargs)
+      image_pulls.labels(protocol, 'tag', rv.status_code).inc()
+      return rv
+    return wrapper
+  return decorator
+
+
 @route_show_if(features.ACI_CONVERSION)
 @anon_protect
 @verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/sig/<os>/<arch>/', methods=['GET'])
 @verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci.asc/<os>/<arch>/', methods=['GET'])
+@observe_route('aci')
 @process_auth
 def get_aci_signature(server, namespace, repository, tag, os, arch):
   return _repo_verb_signature(namespace, repository, tag, 'aci', checker=os_arch_checker(os, arch),
@@ -449,6 +470,7 @@ def get_aci_signature(server, namespace, repository, tag, os, arch):
 @anon_protect
 @verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci/<os>/<arch>/', methods=[
   'GET', 'HEAD'])
+@observe_route('aci')
 @process_auth
 def get_aci_image(server, namespace, repository, tag, os, arch):
   return _repo_verb(namespace, repository, tag, 'aci',
@@ -458,6 +480,7 @@ def get_aci_image(server, namespace, repository, tag, os, arch):
 @anon_protect
 @verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
+@observe_route('squash')
 @process_auth
 def get_squashed_tag(namespace, repository, tag):
   return _repo_verb(namespace, repository, tag, 'squash', SquashedDockerImageFormatter())
@@ -466,6 +489,7 @@ def get_squashed_tag(namespace, repository, tag):
 @route_show_if(features.BITTORRENT)
 @anon_protect
 @verbs.route('/torrent{0}'.format(BLOB_DIGEST_ROUTE), methods=['GET'])
+@observe_route('bittorrent')
 @process_auth
 @parse_repository_name()
 @check_region_blacklisted(namespace_name_kwarg='namespace_name')
@@ -493,7 +517,6 @@ def get_tag_torrent(namespace_name, repo_name, digest):
   if blob is None:
     abort(404)
 
-  metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'torrent', True])
   return _torrent_for_blob(blob, repo_is_public)
diff --git a/pylintrc b/pylintrc
index 123b4692d..6205beb17 100644
--- a/pylintrc
+++ b/pylintrc
@@ -16,7 +16,7 @@ disable=missing-docstring,invalid-name,too-many-locals,too-few-public-methods,to
 # List of module names for which member attributes should not be checked
 # (useful for modules/projects where namespaces are manipulated during runtime
 # and thus extisting member attributes cannot be deduced by static analysis
-ignored-modules=features
+ignored-modules=features,greenlet
 
 # List of members which are set dynamically and missed by pylint inference
 # system, and so shouldn't trigger E0201 when accessed. Python regular
diff --git a/storage/__init__.py b/storage/__init__.py
index d7220333e..1b012305a 100644
--- a/storage/__init__.py
+++ b/storage/__init__.py
@@ -22,42 +22,41 @@ STORAGE_DRIVER_CLASSES = {
 }
 
 
-def get_storage_driver(location, metric_queue, chunk_cleanup_queue, config_provider, ip_resolver,
+def get_storage_driver(location, chunk_cleanup_queue, config_provider, ip_resolver,
                        storage_params):
   """ Returns a storage driver class for the given storage configuration
      (a pair of string name and a dict of parameters).
""" driver = storage_params[0] parameters = storage_params[1] driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage) - context = StorageContext(location, metric_queue, chunk_cleanup_queue, config_provider, + context = StorageContext(location, chunk_cleanup_queue, config_provider, ip_resolver) return driver_class(context, **parameters) class StorageContext(object): - def __init__(self, location, metric_queue, chunk_cleanup_queue, config_provider, ip_resolver): + def __init__(self, location, chunk_cleanup_queue, config_provider, ip_resolver): self.location = location - self.metric_queue = metric_queue self.chunk_cleanup_queue = chunk_cleanup_queue self.config_provider = config_provider self.ip_resolver = ip_resolver or NoopIPResolver() class Storage(object): - def __init__(self, app=None, metric_queue=None, chunk_cleanup_queue=None, instance_keys=None, + def __init__(self, app=None, chunk_cleanup_queue=None, instance_keys=None, config_provider=None, ip_resolver=None): self.app = app if app is not None: - self.state = self.init_app(app, metric_queue, chunk_cleanup_queue, instance_keys, + self.state = self.init_app(app, chunk_cleanup_queue, instance_keys, config_provider, ip_resolver) else: self.state = None - def init_app(self, app, metric_queue, chunk_cleanup_queue, instance_keys, config_provider, + def init_app(self, app, chunk_cleanup_queue, instance_keys, config_provider, ip_resolver): storages = {} for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items(): - storages[location] = get_storage_driver(location, metric_queue, chunk_cleanup_queue, + storages[location] = get_storage_driver(location, chunk_cleanup_queue, config_provider, ip_resolver, storage_params) preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None) diff --git a/storage/cloud.py b/storage/cloud.py index 09f34a1f3..f3bad95f6 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -3,27 +3,26 @@ import os import logging import copy -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import padding - -from cachetools.func import lru_cache -from itertools import chain - +from collections import namedtuple from datetime import datetime, timedelta +from io import BufferedIOBase +from itertools import chain +from uuid import uuid4 -from botocore.signers import CloudFrontSigner -from boto.exception import S3ResponseError import boto.s3.connection import boto.s3.multipart import boto.gs.connection import boto.s3.key import boto.gs.key -from io import BufferedIOBase -from uuid import uuid4 -from collections import namedtuple +from boto.exception import S3ResponseError +from botocore.signers import CloudFrontSigner +from cachetools.func import lru_cache +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import padding +from prometheus_client import Counter from util.registry import filelike from storage.basestorage import BaseStorageV2 @@ -31,6 +30,13 @@ from storage.basestorage import BaseStorageV2 logger = logging.getLogger(__name__) + +multipart_uploads_started = Counter('quay_multipart_uploads_started_total', + 'number of multipart uploads to Quay storage that started') +multipart_uploads_completed = Counter('quay_multipart_uploads_completed_total', + 'number of 
diff --git a/storage/cloud.py b/storage/cloud.py
index 09f34a1f3..f3bad95f6 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -3,27 +3,26 @@ import os
 import logging
 import copy
 
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives import hashes
-from cryptography.hazmat.primitives import serialization
-from cryptography.hazmat.primitives.asymmetric import padding
-
-from cachetools.func import lru_cache
-from itertools import chain
-
+from collections import namedtuple
 from datetime import datetime, timedelta
+from io import BufferedIOBase
+from itertools import chain
+from uuid import uuid4
 
-from botocore.signers import CloudFrontSigner
-from boto.exception import S3ResponseError
 import boto.s3.connection
 import boto.s3.multipart
 import boto.gs.connection
 import boto.s3.key
 import boto.gs.key
 
-from io import BufferedIOBase
-from uuid import uuid4
-from collections import namedtuple
+from boto.exception import S3ResponseError
+from botocore.signers import CloudFrontSigner
+from cachetools.func import lru_cache
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import padding
+from prometheus_client import Counter
 
 from util.registry import filelike
 from storage.basestorage import BaseStorageV2
@@ -31,6 +30,13 @@ from storage.basestorage import BaseStorageV2
 
 logger = logging.getLogger(__name__)
 
+
+multipart_uploads_started = Counter('quay_multipart_uploads_started_total',
+                                    'number of multipart uploads to Quay storage that started')
+multipart_uploads_completed = Counter('quay_multipart_uploads_completed_total',
+                                      'number of multipart uploads to Quay storage that completed')
+
+
 _PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
 _CHUNKS_KEY = 'chunks'
 
@@ -181,8 +187,7 @@ class _CloudStorage(BaseStorageV2):
     if content_encoding is not None:
       metadata['Content-Encoding'] = content_encoding
 
-    if self._context.metric_queue is not None:
-      self._context.metric_queue.multipart_upload_start.Inc()
+    multipart_uploads_started.inc()
 
     return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                         **self._upload_params)
@@ -237,8 +242,7 @@ class _CloudStorage(BaseStorageV2):
           logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e)
           write_error = e
 
-          if self._context.metric_queue is not None:
-            self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
+          # Note: failed uploads are intentionally not counted as completed.
 
           if cancel_on_error:
             try:
@@ -251,8 +255,7 @@ class _CloudStorage(BaseStorageV2):
             break
 
     if total_bytes_written > 0:
-      if self._context.metric_queue is not None:
-        self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['success'])
+      multipart_uploads_completed.inc()
 
       self._perform_action_with_retry(mp.complete_upload)
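With the old 'success'/'failure' labels gone, failed multipart uploads no longer have their own series; they remain derivable as the gap between the two counters. A sketch of the resulting semantics (metric names taken from the counters defined above; the PromQL line is an assumption about how an operator would chart it):

    from prometheus_client import Counter

    uploads_started = Counter('multipart_uploads_started_total', 'uploads started')
    uploads_completed = Counter('multipart_uploads_completed_total', 'uploads completed')

    def stream_upload(chunks, write):
      uploads_started.inc()
      for chunk in chunks:
        write(chunk)           # may raise; no completion is recorded in that case
      uploads_completed.inc()  # reached only when every chunk was written

    # Failure rate over time, e.g. in PromQL:
    #   rate(multipart_uploads_started_total[5m]) - rate(multipart_uploads_completed_total[5m])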
diff --git a/storage/test/test_cloud_storage.py b/storage/test/test_cloud_storage.py
index f9f418058..8cc62cbb0 100644
--- a/storage/test/test_cloud_storage.py
+++ b/storage/test/test_cloud_storage.py
@@ -18,7 +18,7 @@ _TEST_BUCKET = 'some_bucket'
 _TEST_USER = 'someuser'
 _TEST_PASSWORD = 'somepassword'
 _TEST_PATH = 'some/cool/path'
-_TEST_CONTEXT = StorageContext('nyc', None, None, None, None)
+_TEST_CONTEXT = StorageContext('nyc', None, None, None)
 
 
 @pytest.fixture(scope='function')
diff --git a/storage/test/test_cloudfront.py b/storage/test/test_cloudfront.py
index face652dc..3ab51c27f 100644
--- a/storage/test/test_cloudfront.py
+++ b/storage/test/test_cloudfront.py
@@ -44,7 +44,7 @@ def test_direct_download(test_aws_ip, test_empty_ip_range_cache, test_ip_range_c
   if ipranges_populated:
     ipresolver.sync_token = test_ip_range_cache['sync_token'] if ipranges_populated else test_empty_ip_range_cache['sync_token']
     ipresolver.amazon_ranges = test_ip_range_cache['all_amazon'] if ipranges_populated else test_empty_ip_range_cache['all_amazon']
-  context = StorageContext('nyc', None, None, config_provider, ipresolver)
+  context = StorageContext('nyc', None, config_provider, ipresolver)
 
   # Create a test bucket and put some test content.
   boto.connect_s3().create_bucket(_TEST_BUCKET)
@@ -68,7 +68,7 @@ def test_direct_download(test_aws_ip, test_empty_ip_range_cache, test_ip_range_c
 @mock_s3
 def test_direct_download_no_ip(test_aws_ip, aws_ip_range_data, ipranges_populated, app):
   ipresolver = IPResolver(app)
-  context = StorageContext('nyc', None, None, config_provider, ipresolver)
+  context = StorageContext('nyc', None, config_provider, ipresolver)
 
   # Create a test bucket and put some test content.
   boto.connect_s3().create_bucket(_TEST_BUCKET)
diff --git a/storage/test/test_swift.py b/storage/test/test_swift.py
index 8e0d3a77a..f4b441568 100644
--- a/storage/test/test_swift.py
+++ b/storage/test/test_swift.py
@@ -11,7 +11,7 @@ from storage.swift import SwiftStorage, _EMPTY_SEGMENTS_KEY
 from swiftclient.client import ClientException
 
 base_args = {
-  'context': StorageContext('nyc', None, None, None, None),
+  'context': StorageContext('nyc', None, None, None),
   'swift_container': 'container-name',
   'storage_path': '/basepath',
   'auth_url': 'https://auth.com',
@@ -265,7 +265,7 @@ def test_cancel_chunked_upload():
   chunk_cleanup_queue = FakeQueue()
 
   args = dict(base_args)
-  args['context'] = StorageContext('nyc', None, chunk_cleanup_queue, None, None)
+  args['context'] = StorageContext('nyc', chunk_cleanup_queue, None, None)
 
   swift = FakeSwiftStorage(**args)
   uuid, metadata = swift.initiate_chunked_upload()
@@ -288,7 +288,7 @@ def test_cancel_chunked_upload():
 def test_empty_chunks_queued_for_deletion():
   chunk_cleanup_queue = FakeQueue()
   args = dict(base_args)
-  args['context'] = StorageContext('nyc', None, chunk_cleanup_queue, None, None)
+  args['context'] = StorageContext('nyc', chunk_cleanup_queue, None, None)
 
   swift = FakeSwiftStorage(**args)
   uuid, metadata = swift.initiate_chunked_upload()
diff --git a/test/registry_tests.py b/test/registry_tests.py
index 07858fd1c..631fba600 100644
--- a/test/registry_tests.py
+++ b/test/registry_tests.py
@@ -31,7 +31,7 @@ from jwkest.jwk import RSAKey
 
 import endpoints.decorated  # required for side effect
 
-from app import app, storage, instance_keys, get_app_url, metric_queue
+from app import app, storage, instance_keys, get_app_url
 from data.database import close_db_filter, configure, DerivedStorageForImage, QueueItem, Image
 from data import model
 from digest.checksums import compute_simple
@@ -2228,7 +2228,7 @@ class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, Base
     encoded = response.json()['token']
     header = 'Bearer ' + encoded
 
-    payload = decode_bearer_header(header, instance_keys, app.config, metric_queue=metric_queue)
+    payload = decode_bearer_header(header, instance_keys, app.config)
     self.assertIsNotNone(payload)
 
     if scope is None:
+ """ + global _latest_switch + greenlet_switch.inc() + + if _latest_switch is None: + # This is the first switch. + _latest_switch = time() + return + + now = time() + greenlet_duration.observe(now - _latest_switch) + _latest_switch = now + + +def throw_callback(_args): + """ + This is a callback that is executed on execeptions from origin to target. + + This callback is running in the context of the target greenlet and any exceptions will replace + the original, as if target.throw() was used replacing the exception. + """ + greenlet_throw.inc() diff --git a/util/metrics/metricqueue.py b/util/metrics/metricqueue.py deleted file mode 100644 index 30e3974a0..000000000 --- a/util/metrics/metricqueue.py +++ /dev/null @@ -1,210 +0,0 @@ -import datetime -import logging -import time - -from functools import wraps -from Queue import Queue, Full - -from flask import g, request -from trollius import Return - - -logger = logging.getLogger(__name__) - -# Buckets for the API response times. -API_RESPONSE_TIME_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0] - -# Buckets for the builder start times. -BUILDER_START_TIME_BUCKETS = [.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 180.0, 240.0, 300.0, 600.0] - - -class MetricQueue(object): - """ Object to which various metrics are written, for distribution to metrics collection - system(s) such as Prometheus. - """ - def __init__(self, prom): - # Define the various exported metrics. - self.resp_time = prom.create_histogram('response_time', 'HTTP response time in seconds', - labelnames=['endpoint'], - buckets=API_RESPONSE_TIME_BUCKETS) - self.resp_code = prom.create_counter('response_code', 'HTTP response code', - labelnames=['endpoint', 'code']) - self.non_200 = prom.create_counter('response_non200', 'Non-200 HTTP response codes', - labelnames=['endpoint']) - self.error_500 = prom.create_counter('response_500', '5XX HTTP response codes', - labelnames=['endpoint']) - self.multipart_upload_start = prom.create_counter('multipart_upload_start', - 'Multipart upload started') - self.multipart_upload_end = prom.create_counter('multipart_upload_end', - 'Multipart upload ends.', labelnames=['type']) - self.build_capacity_shortage = prom.create_gauge('build_capacity_shortage', - 'Build capacity shortage.') - self.builder_time_to_start = prom.create_histogram('builder_tts', - 'Time from triggering to starting a builder.', - labelnames=['builder_type'], - buckets=BUILDER_START_TIME_BUCKETS) - self.builder_time_to_build = prom.create_histogram('builder_ttb', - 'Time from triggering to actually starting a build', - labelnames=['builder_type'], - buckets=BUILDER_START_TIME_BUCKETS) - self.build_time = prom.create_histogram('build_time', 'Time spent building', labelnames=['builder_type']) - self.builder_fallback = prom.create_counter('builder_fallback', 'Builder fell back to secondary executor') - self.build_start_success = prom.create_counter('build_start_success', 'Executor succeeded in starting a build', labelnames=['builder_type']) - self.build_start_failure = prom.create_counter('build_start_failure', 'Executor failed to start a build', labelnames=['builder_type']) - self.percent_building = prom.create_gauge('build_percent_building', 'Percent building.') - self.build_counter = prom.create_counter('builds', 'Number of builds', labelnames=['name']) - self.ephemeral_build_workers = prom.create_counter('ephemeral_build_workers', - 'Number of started ephemeral build workers') - self.ephemeral_build_worker_failure = prom.create_counter('ephemeral_build_worker_failure', - 
diff --git a/util/metrics/metricqueue.py b/util/metrics/metricqueue.py
deleted file mode 100644
index 30e3974a0..000000000
--- a/util/metrics/metricqueue.py
+++ /dev/null
@@ -1,210 +0,0 @@
-import datetime
-import logging
-import time
-
-from functools import wraps
-from Queue import Queue, Full
-
-from flask import g, request
-from trollius import Return
-
-
-logger = logging.getLogger(__name__)
-
-# Buckets for the API response times.
-API_RESPONSE_TIME_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]
-
-# Buckets for the builder start times.
-BUILDER_START_TIME_BUCKETS = [.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 180.0, 240.0, 300.0, 600.0]
-
-
-class MetricQueue(object):
-  """ Object to which various metrics are written, for distribution to metrics collection
-      system(s) such as Prometheus.
-  """
-  def __init__(self, prom):
-    # Define the various exported metrics.
-    self.resp_time = prom.create_histogram('response_time', 'HTTP response time in seconds',
-                                           labelnames=['endpoint'],
-                                           buckets=API_RESPONSE_TIME_BUCKETS)
-    self.resp_code = prom.create_counter('response_code', 'HTTP response code',
-                                         labelnames=['endpoint', 'code'])
-    self.non_200 = prom.create_counter('response_non200', 'Non-200 HTTP response codes',
-                                       labelnames=['endpoint'])
-    self.error_500 = prom.create_counter('response_500', '5XX HTTP response codes',
-                                         labelnames=['endpoint'])
-    self.multipart_upload_start = prom.create_counter('multipart_upload_start',
-                                                      'Multipart upload started')
-    self.multipart_upload_end = prom.create_counter('multipart_upload_end',
-                                                    'Multipart upload ends.', labelnames=['type'])
-    self.build_capacity_shortage = prom.create_gauge('build_capacity_shortage',
-                                                     'Build capacity shortage.')
-    self.builder_time_to_start = prom.create_histogram('builder_tts',
-                                                       'Time from triggering to starting a builder.',
-                                                       labelnames=['builder_type'],
-                                                       buckets=BUILDER_START_TIME_BUCKETS)
-    self.builder_time_to_build = prom.create_histogram('builder_ttb',
-                                                       'Time from triggering to actually starting a build',
-                                                       labelnames=['builder_type'],
-                                                       buckets=BUILDER_START_TIME_BUCKETS)
-    self.build_time = prom.create_histogram('build_time', 'Time spent building', labelnames=['builder_type'])
-    self.builder_fallback = prom.create_counter('builder_fallback', 'Builder fell back to secondary executor')
-    self.build_start_success = prom.create_counter('build_start_success', 'Executor succeeded in starting a build', labelnames=['builder_type'])
-    self.build_start_failure = prom.create_counter('build_start_failure', 'Executor failed to start a build', labelnames=['builder_type'])
-    self.percent_building = prom.create_gauge('build_percent_building', 'Percent building.')
-    self.build_counter = prom.create_counter('builds', 'Number of builds', labelnames=['name'])
-    self.ephemeral_build_workers = prom.create_counter('ephemeral_build_workers',
-                                                       'Number of started ephemeral build workers')
-    self.ephemeral_build_worker_failure = prom.create_counter('ephemeral_build_worker_failure',
-                                                              'Number of failed-to-start ephemeral build workers')
-
-    self.work_queue_running = prom.create_gauge('work_queue_running', 'Running items in a queue',
-                                                labelnames=['queue_name'])
-    self.work_queue_available = prom.create_gauge('work_queue_available',
-                                                  'Available items in a queue',
-                                                  labelnames=['queue_name'])
-
-    self.work_queue_available_not_running = prom.create_gauge('work_queue_available_not_running',
-                                                              'Available items that are not yet running',
-                                                              labelnames=['queue_name'])
-
-    self.repository_pull = prom.create_counter('repository_pull', 'Repository Pull Count',
-                                               labelnames=['namespace', 'repo_name', 'protocol',
-                                                           'status'])
-
-    self.repository_push = prom.create_counter('repository_push', 'Repository Push Count',
-                                               labelnames=['namespace', 'repo_name', 'protocol',
-                                                           'status'])
-
-    self.repository_build_queued = prom.create_counter('repository_build_queued',
-                                                       'Repository Build Queued Count',
-                                                       labelnames=['namespace', 'repo_name'])
-
-    self.repository_build_completed = prom.create_counter('repository_build_completed',
-                                                          'Repository Build Complete Count',
-                                                          labelnames=['namespace', 'repo_name',
-                                                                      'status', 'executor'])
-
-    self.chunk_size = prom.create_histogram('chunk_size',
-                                            'Registry blob chunk size',
-                                            labelnames=['storage_region'])
-
-    self.chunk_upload_time = prom.create_histogram('chunk_upload_time',
-                                                   'Registry blob chunk upload time',
-                                                   labelnames=['storage_region'])
-
-    self.authentication_count = prom.create_counter('authentication_count',
-                                                    'Authentication count',
-                                                    labelnames=['kind', 'status'])
-
-    self.repository_count = prom.create_gauge('repository_count', 'Number of repositories')
-    self.user_count = prom.create_gauge('user_count', 'Number of users')
-    self.org_count = prom.create_gauge('org_count', 'Number of Organizations')
-    self.robot_count = prom.create_gauge('robot_count', 'Number of robot accounts')
-
-    self.instance_key_renewal_success = prom.create_counter('instance_key_renewal_success',
-                                                            'Instance Key Renewal Success Count',
-                                                            labelnames=['key_id'])
-
-    self.instance_key_renewal_failure = prom.create_counter('instance_key_renewal_failure',
-                                                            'Instance Key Renewal Failure Count',
-                                                            labelnames=['key_id'])
-
-    self.invalid_instance_key_count = prom.create_counter('invalid_registry_instance_key_count',
-                                                          'Invalid registry instance key count',
-                                                          labelnames=['key_id'])
-
-    self.verb_action_passes = prom.create_counter('verb_action_passes', 'Verb Pass Count',
-                                                  labelnames=['kind', 'pass_count'])
-
-    self.push_byte_count = prom.create_counter('registry_push_byte_count',
-                                               'Number of bytes pushed to the registry')
-
-    self.pull_byte_count = prom.create_counter('estimated_registry_pull_byte_count',
-                                               'Number of (estimated) bytes pulled from the registry',
-                                               labelnames=['protocol_version'])
-
-    # Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another
-    # provider.
-    self._queue = None
-
-  def enable_deprecated(self, maxsize=10000):
-    self._queue = Queue(maxsize)
-
-  def put_deprecated(self, name, value, **kwargs):
-    if self._queue is None:
-      logger.debug('No metric queue %s %s %s', name, value, kwargs)
-      return
-
-    try:
-      kwargs.setdefault('timestamp', datetime.datetime.now())
-      kwargs.setdefault('dimensions', {})
-      self._queue.put_nowait((name, value, kwargs))
-    except Full:
-      logger.error('Metric queue full')
-
-  def get_deprecated(self):
-    return self._queue.get()
-
-  def get_nowait_deprecated(self):
-    return self._queue.get_nowait()
-
-
-def duration_collector_async(metric, labelvalues):
-  """ Decorates a method to have its duration time logged to the metric. """
-  def decorator(func):
-    @wraps(func)
-    def wrapper(*args, **kwargs):
-      trigger_time = time.time()
-      try:
-        rv = func(*args, **kwargs)
-      except Return as e:
-        metric.Observe(time.time() - trigger_time, labelvalues=labelvalues)
-        raise e
-      return rv
-    return wrapper
-  return decorator
-
-
-def time_decorator(name, metric_queue):
-  """ Decorates an endpoint method to have its request time logged to the metrics queue. """
-  after = _time_after_request(name, metric_queue)
-  def decorator(func):
-    @wraps(func)
-    def wrapper(*args, **kwargs):
-      _time_before_request()
-      rv = func(*args, **kwargs)
-      after(rv)
-      return rv
-    return wrapper
-  return decorator
-
-
-def time_blueprint(bp, metric_queue):
-  """ Decorates a blueprint to have its request time logged to the metrics queue. """
-  bp.before_request(_time_before_request)
-  bp.after_request(_time_after_request(bp.name, metric_queue))
-
-
-def _time_before_request():
-  g._request_start_time = time.time()
-
-
-def _time_after_request(name, metric_queue):
-  def f(r):
-    start = getattr(g, '_request_start_time', None)
-    if start is None:
-      return r
-
-    dur = time.time() - start
-
-    metric_queue.resp_time.Observe(dur, labelvalues=[request.endpoint])
-    metric_queue.resp_code.Inc(labelvalues=[request.endpoint, r.status_code])
-
-    if r.status_code >= 500:
-      metric_queue.error_500.Inc(labelvalues=[request.endpoint])
-    elif r.status_code < 200 or r.status_code >= 300:
-      metric_queue.non_200.Inc(labelvalues=[request.endpoint])
-
-    return r
-  return f
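The deleted MetricQueue wrapper maps onto prometheus_client almost mechanically; the substitutions applied throughout this diff are:

    # MetricQueue (deleted)                   prometheus_client (direct)
    # prom.create_counter(...)            ->  Counter(...)
    # prom.create_gauge(...)              ->  Gauge(...)
    # prom.create_histogram(...)          ->  Histogram(...)
    # metric.Inc(labelvalues=[a, b])      ->  metric.labels(a, b).inc()
    # metric.Set(v)                       ->  metric.set(v)
    # metric.Observe(v, labelvalues=[a])  ->  metric.labels(a).observe(v)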
""" @@ -24,145 +29,54 @@ class PrometheusPlugin(object): self.state = None def init_app(self, app): - prom_url = app.config.get('PROMETHEUS_AGGREGATOR_URL') - prom_namespace = app.config.get('PROMETHEUS_NAMESPACE') - logger.debug('Initializing prometheus with aggregator url: %s', prom_url) - prometheus = Prometheus(prom_url, prom_namespace) + pusher = ThreadPusher(app) + pusher.start() # register extension with app app.extensions = getattr(app, 'extensions', {}) - app.extensions['prometheus'] = prometheus - return prometheus + app.extensions['prometheus'] = pusher + return pusher def __getattr__(self, name): return getattr(self.state, name, None) -class Prometheus(object): - """ Aggregator for collecting stats that are reported to Prometheus. """ - def __init__(self, url=None, namespace=None): - self._metric_collectors = [] - self._url = url - self._namespace = namespace or '' - - if url is not None: - self._queue = Queue(QUEUE_MAX) - self._sender = _QueueSender(self._queue, url, self._metric_collectors) - self._sender.start() - logger.debug('Prometheus aggregator sending to %s', url) - else: - self._queue = None - logger.debug('Prometheus aggregator disabled') - - def enqueue(self, call, data): - if not self._queue: - return - - v = json.dumps({ - 'Call': call, - 'Data': data, - }) - - if call == 'register': - self._metric_collectors.append(v) - return - - try: - self._queue.put_nowait(v) - except Full: - # If the queue is full, it is because 1) no aggregator was enabled or 2) - # the aggregator is taking a long time to respond to requests. In the case - # of 1, it's probably enterprise mode and we don't care. In the case of 2, - # the response timeout error is printed inside the queue handler. In either case, - # we don't need to print an error here. - pass - - def create_gauge(self, *args, **kwargs): - return self._create_collector('Gauge', args, kwargs) - - def create_counter(self, *args, **kwargs): - return self._create_collector('Counter', args, kwargs) - - def create_summary(self, *args, **kwargs): - return self._create_collector('Summary', args, kwargs) - - def create_histogram(self, *args, **kwargs): - return self._create_collector('Histogram', args, kwargs) - - def create_untyped(self, *args, **kwargs): - return self._create_collector('Untyped', args, kwargs) - - def _create_collector(self, collector_type, args, kwargs): - kwargs['namespace'] = kwargs.get('namespace', self._namespace) - return _Collector(self.enqueue, collector_type, *args, **kwargs) - - -class _QueueSender(Thread): - """ Helper class which uses a thread to asynchronously send metrics to the local Prometheus - aggregator. """ - def __init__(self, queue, url, metric_collectors): - Thread.__init__(self) +class ThreadPusher(threading.Thread): + def __init__(self, app): + super(ThreadPusher, self).__init__() self.daemon = True - self.next_register = datetime.datetime.now() - self._queue = queue - self._url = url - self._metric_collectors = metric_collectors + self._app = app def run(self): + agg_url = self._app.config.get('PROMETHEUS_AGGREGATOR_URL') while True: - reqs = [] - reqs.append(self._queue.get()) + if agg_url is None: + # Practically disable this worker, if there is no aggregator. 
+        time.sleep(ONE_DAY_IN_SECONDS)
+        continue
 
-      while len(reqs) < MAX_BATCH_SIZE:
-        try:
-          req = self._queue.get_nowait()
-          reqs.append(req)
-        except Empty:
-          break
-
-      try:
-        resp = requests.post(self._url + '/call', '\n'.join(reqs))
-        if resp.status_code == 500 and self.next_register <= datetime.datetime.now():
-          resp = requests.post(self._url + '/call', '\n'.join(self._metric_collectors))
-          self.next_register = datetime.datetime.now() + REGISTER_WAIT
-          logger.debug('Register returned %s for %s metrics; setting next to %s', resp.status_code,
-                       len(self._metric_collectors), self.next_register)
-        elif resp.status_code != 200:
-          logger.debug('Failed sending to prometheus: %s: %s: %s', resp.status_code, resp.text,
-                       ', '.join(reqs))
-        else:
-          logger.debug('Sent %d prometheus metrics', len(reqs))
-      except:
-        logger.exception('Failed to write to prometheus aggregator: %s', reqs)
+      time.sleep(PROMETHEUS_PUSH_INTERVAL_SECONDS)
+      push_to_gateway(agg_url, job=self._app.config.get('PROMETHEUS_NAMESPACE', 'quay'),
+                      registry=REGISTRY)
 
 
-class _Collector(object):
-  """ Collector for a Prometheus metric. """
-  def __init__(self, enqueue_method, collector_type, collector_name, collector_help,
-               namespace='', subsystem='', **kwargs):
-    self._enqueue_method = enqueue_method
-    self._base_args = {
-      'Name': collector_name,
-      'Namespace': namespace,
-      'Subsystem': subsystem,
-      'Type': collector_type,
-    }
+def timed_blueprint(bp):
+  """
+  Decorates a blueprint to have its request duration tracked by Prometheus.
+  """
+  def _time_before_request():
+    g._request_start_time = time.time()
+  bp.before_request(_time_before_request)
 
-    registration_params = dict(kwargs)
-    registration_params.update(self._base_args)
-    registration_params['Help'] = collector_help
-
-    self._enqueue_method('register', registration_params)
-
-  def __getattr__(self, method):
-    def f(value=0, labelvalues=()):
-      data = dict(self._base_args)
-      data.update({
-        'Value': value,
-        'LabelValues': [str(i) for i in labelvalues],
-        'Method': method,
-      })
-
-      self._enqueue_method('put', data)
+  def _time_after_request():
+    def f(r):
+      start = getattr(g, '_request_start_time', None)
+      if start is None:
+        return r
+
+      dur = time.time() - start
+      request_duration.labels(request.method, request.endpoint, r.status_code).observe(dur)
+      return r
     return f
+  bp.after_request(_time_after_request())
+  return bp
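The push model above replaces the custom aggregator protocol: `ThreadPusher` periodically pushes the whole default registry to the address in `PROMETHEUS_AGGREGATOR_URL`, which is now implicitly expected to speak the standard Pushgateway protocol (implied by the use of `push_to_gateway`). A minimal sketch of the same loop outside the plugin (the gateway address is illustrative):

    import time

    from prometheus_client import REGISTRY, push_to_gateway

    while True:
      time.sleep(30)
      # Each push replaces all samples previously pushed for job='quay'.
      push_to_gateway('localhost:9091', job='quay', registry=REGISTRY)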
diff --git a/util/metrics/test/test_metricqueue.py b/util/metrics/test/test_metricqueue.py
deleted file mode 100644
index 62f1665d9..000000000
--- a/util/metrics/test/test_metricqueue.py
+++ /dev/null
@@ -1,58 +0,0 @@
-import time
-
-import pytest
-
-from mock import Mock
-from trollius import coroutine, Return, get_event_loop, From
-
-from util.metrics.metricqueue import duration_collector_async
-
-
-mock_histogram = Mock()
-
-class NonReturn(Exception):
-  pass
-
-
-@coroutine
-@duration_collector_async(mock_histogram, labelvalues=["testlabel"])
-def duration_decorated():
-  time.sleep(1)
-  raise Return("fin")
-
-
-@coroutine
-@duration_collector_async(mock_histogram, labelvalues=["testlabel"])
-def duration_decorated_error():
-  raise NonReturn("not a Return error")
-
-@coroutine
-def calls_decorated():
-  yield From(duration_decorated())
-
-
-def test_duration_decorator():
-  loop = get_event_loop()
-  loop.run_until_complete(duration_decorated())
-  assert mock_histogram.Observe.called
-  assert 1 - mock_histogram.Observe.call_args[0][0] < 1  # duration should be close to 1s
-  assert mock_histogram.Observe.call_args[1]["labelvalues"] == ["testlabel"]
-
-
-def test_duration_decorator_error():
-  loop = get_event_loop()
-  mock_histogram.reset_mock()
-
-  with pytest.raises(NonReturn):
-    loop.run_until_complete(duration_decorated_error())
-  assert not mock_histogram.Observe.called
-
-
-def test_duration_decorator_caller():
-  mock_histogram.reset_mock()
-
-  loop = get_event_loop()
-  loop.run_until_complete(calls_decorated())
-  assert mock_histogram.Observe.called
-  assert 1 - mock_histogram.Observe.call_args[0][0] < 1  # duration should be close to 1s
-  assert mock_histogram.Observe.call_args[1]["labelvalues"] == ["testlabel"]
diff --git a/util/registry/queueprocess.py b/util/registry/queueprocess.py
index 5cf3f20d0..1686d30ab 100644
--- a/util/registry/queueprocess.py
+++ b/util/registry/queueprocess.py
@@ -7,9 +7,11 @@ import time
 import sys
 import traceback
 
+
 logger = multiprocessing.log_to_stderr()
 logger.setLevel(logging.INFO)
 
+
 class QueueProcess(object):
   """ Helper class which invokes a worker in a process to produce
       data for one (or more) queues.
diff --git a/util/saas/cloudwatch.py b/util/saas/cloudwatch.py
index c38df655a..18ff83bb4 100644
--- a/util/saas/cloudwatch.py
+++ b/util/saas/cloudwatch.py
@@ -1,20 +1,23 @@
 import logging
-import boto
 import time
 import random
 
 from Queue import Empty
 from threading import Thread
 
+import boto
+
 logger = logging.getLogger(__name__)
 
+
 MAX_BATCH_METRICS = 20
 
 # Sleep for this much time between failed send requests.
 # This prevents hammering cloudwatch when it's not available.
 FAILED_SEND_SLEEP_SECS = 15
 
+
 def start_cloudwatch_sender(metrics, app):
   """
   Starts sending from metrics to a new CloudWatchSender.
@@ -76,10 +79,9 @@ class CloudWatchSender(Thread):
       except:
         for i in range(len(metrics['name'])):
           self._metrics.put_deprecated(metrics['name'][i], metrics['value'][i],
-                                       unit=metrics['unit'][i],
-                                       dimensions=metrics['dimensions'][i],
-                                       timestamp=metrics['timestamp'][i],
-                                       )
+                                       unit=metrics['unit'][i],
+                                       dimensions=metrics['dimensions'][i],
+                                       timestamp=metrics['timestamp'][i])
         logger.exception('Failed to write to CloudWatch: %s', metrics)
         logger.debug('Attempted to requeue %d metrics.', len(metrics['name']))
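The deleted `duration_collector_async` helper (built for trollius coroutines) has a close equivalent for plain functions in prometheus_client itself: `Histogram.time()` works both as a decorator and as a context manager. A sketch (trollius `Return`-based coroutines would still need a hand-rolled wrapper; the metric name is illustrative):

    from prometheus_client import Histogram

    handler_seconds = Histogram('handler_duration_seconds', 'time spent in a handler')

    @handler_seconds.time()          # observes elapsed seconds on each call
    def handler():
      pass

    def other_work():
      with handler_seconds.time():   # same, as a context manager
        pass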
diff --git a/util/security/registry_jwt.py b/util/security/registry_jwt.py
index 6a56c344b..492156749 100644
--- a/util/security/registry_jwt.py
+++ b/util/security/registry_jwt.py
@@ -1,11 +1,23 @@
-import time
-import jwt
 import logging
+import time
+
+from functools import wraps
+
+import jwt
+
+from prometheus_client import Counter
 
 from util.security import jwtutil
 
+
 logger = logging.getLogger(__name__)
 
+
+bearer_token_decoded = Counter('bearer_token_decoded_total',
+                               'number of times a bearer token has been validated',
+                               labelnames=['success'])
+
+
 ANONYMOUS_SUB = '(anonymous)'
 ALGORITHM = 'RS256'
 CLAIM_TUF_ROOTS = 'com.apostille.roots'
@@ -23,7 +35,7 @@ class InvalidBearerTokenException(Exception):
   pass
 
 
-def decode_bearer_header(bearer_header, instance_keys, config, metric_queue=None):
+def decode_bearer_header(bearer_header, instance_keys, config):
   """ decode_bearer_header decodes the given bearer header that contains an encoded JWT with both a
       Key ID as well as the signed JWT and returns the decoded and validated JWT. On any error,
       raises an InvalidBearerTokenException with the reason for failure.
@@ -35,10 +47,30 @@ def decode_bearer_header(bearer_header, instance_keys, config):
   encoded_jwt = match.group(1)
   logger.debug('encoded JWT: %s', encoded_jwt)
-  return decode_bearer_token(encoded_jwt, instance_keys, config, metric_queue=metric_queue)
+  return decode_bearer_token(encoded_jwt, instance_keys, config)
 
 
+def observe_decode():
+  """
+  Decorates `decode_bearer_token` to record a metric into Prometheus such that any exception
+  raised gets recorded as a failure, while the return of a payload is considered a success.
+  """
+  def decorator(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+      try:
+        rv = func(*args, **kwargs)
+      except Exception as e:
+        bearer_token_decoded.labels(False).inc()
+        raise e
+      bearer_token_decoded.labels(True).inc()
+      return rv
+    return wrapper
+  return decorator
+
+
+@observe_decode()
+def decode_bearer_token(bearer_token, instance_keys, config):
   """ decode_bearer_token decodes the given bearer token that contains both a Key ID as well as the
       encoded JWT and returns the decoded and validated JWT. On any error, raises an
       InvalidBearerTokenException with the reason for failure.
@@ -58,9 +90,6 @@ def decode_bearer_token(bearer_token, instance_keys, config):
   # Find the matching public key.
   public_key = instance_keys.get_service_key_public_key(kid)
   if public_key is None:
-    if metric_queue is not None:
-      metric_queue.invalid_instance_key_count.Inc(labelvalues=[kid])
-
     logger.error('Could not find requested service key %s with encoded JWT: %s', kid,
                  bearer_token)
     raise InvalidBearerTokenException('Unknown service key')
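One behavior of the success label above worth noting: prometheus_client stringifies label values, so `.labels(True)` and `.labels(False)` produce the series `success="True"` and `success="False"`. A runnable check (demo metric name):

    from prometheus_client import Counter, generate_latest

    decoded = Counter('demo_bearer_token_decoded_total', 'demo counter',
                      labelnames=['success'])
    decoded.labels(True).inc()
    decoded.labels(False).inc()
    print(generate_latest())  # includes success="True" and success="False" series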
diff --git a/workers/blobuploadcleanupworker/models_pre_oci.py b/workers/blobuploadcleanupworker/models_pre_oci.py
index 97db6e159..80ed2f177 100644
--- a/workers/blobuploadcleanupworker/models_pre_oci.py
+++ b/workers/blobuploadcleanupworker/models_pre_oci.py
@@ -2,8 +2,8 @@ from datetime import datetime, timedelta
 
 from data import model
 from data.database import BlobUpload as BlobUploadTable
-from workers.blobuploadcleanupworker.models_interface import (
-  BlobUpload, BlobUploadCleanupWorkerDataInterface)
+from workers.blobuploadcleanupworker.models_interface import (BlobUpload,
+                                                              BlobUploadCleanupWorkerDataInterface)
 
 
 class PreOCIModel(BlobUploadCleanupWorkerDataInterface):
diff --git a/workers/buildlogsarchiver/buildlogsarchiver.py b/workers/buildlogsarchiver/buildlogsarchiver.py
index ddc6fcc42..56bd69de5 100644
--- a/workers/buildlogsarchiver/buildlogsarchiver.py
+++ b/workers/buildlogsarchiver/buildlogsarchiver.py
@@ -10,12 +10,14 @@ from util.streamingjsonencoder import StreamingJSONEncoder
 from workers.buildlogsarchiver.models_pre_oci import pre_oci_model as model
 from workers.worker import Worker
 
-POLL_PERIOD_SECONDS = 30
-MEMORY_TEMPFILE_SIZE = 64 * 1024  # Large enough to handle approximately 99% of builds in memory
 
 logger = logging.getLogger(__name__)
 
+
+POLL_PERIOD_SECONDS = 30
+MEMORY_TEMPFILE_SIZE = 64 * 1024  # Large enough to handle approximately 99% of builds in memory
+
+
 class ArchiveBuildLogsWorker(Worker):
   def __init__(self):
     super(ArchiveBuildLogsWorker, self).__init__()
diff --git a/workers/expiredappspecifictokenworker.py b/workers/expiredappspecifictokenworker.py
index 9db1aed7f..8ad981ec4 100644
--- a/workers/expiredappspecifictokenworker.py
+++ b/workers/expiredappspecifictokenworker.py
@@ -9,10 +9,13 @@ from workers.worker import Worker
 from util.log import logfile_path
 from util.timedeltastring import convert_to_timedelta
 
-POLL_PERIOD_SECONDS = 60 * 60  # 1 hour
 
 logger = logging.getLogger(__name__)
 
+
+POLL_PERIOD_SECONDS = 60 * 60  # 1 hour
+
+
 class ExpiredAppSpecificTokenWorker(Worker):
   def __init__(self):
     super(ExpiredAppSpecificTokenWorker, self).__init__()
@@ -38,7 +41,7 @@ if __name__ == "__main__":
     logger.debug('App specific tokens disabled; skipping')
     while True:
       time.sleep(100000)
-  
+
   if app.config.get('EXPIRED_APP_SPECIFIC_TOKEN_GC') is None:
     logger.debug('GC of App specific tokens is disabled; skipping')
     while True:
diff --git a/workers/exportactionlogsworker.py b/workers/exportactionlogsworker.py
index 11b6478ea..8c37db482 100644
--- a/workers/exportactionlogsworker.py
+++ b/workers/exportactionlogsworker.py
@@ -15,10 +15,11 @@ from app import app, export_action_logs_queue, storage as app_storage, get_app_u
 from endpoints.api import format_date
 from data.logs_model import logs_model
 from data.logs_model.interface import LogsIterationTimeout
-from workers.queueworker import QueueWorker, JobException
+from workers.queueworker import QueueWorker
 from util.log import logfile_path
 from util.useremails import send_logs_exported_email
 
+
 logger = logging.getLogger(__name__)
diff --git a/workers/gc/gcworker.py b/workers/gc/gcworker.py
index 6707cf1cd..794597189 100644
--- a/workers/gc/gcworker.py
+++ b/workers/gc/gcworker.py
@@ -9,8 +9,10 @@ from data.model.repository import find_repository_with_garbage, get_random_gc_po
 from data.model.gc import garbage_collect_repo
 from workers.worker import Worker
 
+
 logger = logging.getLogger(__name__)
 
+
 class GarbageCollectionWorker(Worker):
   def __init__(self):
     super(GarbageCollectionWorker, self).__init__()
diff --git a/workers/globalpromstats/globalpromstats.py b/workers/globalpromstats/globalpromstats.py
index 9b97022c3..dc78a146a 100644
--- a/workers/globalpromstats/globalpromstats.py
+++ b/workers/globalpromstats/globalpromstats.py
@@ -1,15 +1,25 @@
 import logging
 import time
 
-from app import app, metric_queue
+from prometheus_client import Gauge
+
+from app import app
 from data.database import UseThenDisconnect
-from workers.globalpromstats.models_pre_oci import pre_oci_model as model
 from util.locking import GlobalLock, LockNotAcquiredException
 from util.log import logfile_path
+from workers.globalpromstats.models_pre_oci import pre_oci_model as model
 from workers.worker import Worker
 
+
 logger = logging.getLogger(__name__)
 
+
+repository_rows = Gauge('quay_repository_rows', 'number of repositories in the database')
+user_rows = Gauge('quay_user_rows', 'number of users in the database')
+org_rows = Gauge('quay_org_rows', 'number of organizations in the database')
+robot_rows = Gauge('quay_robot_rows', 'number of robot accounts in the database')
+
+
 WORKER_FREQUENCY = app.config.get('GLOBAL_PROMETHEUS_STATS_FREQUENCY', 60 * 60)
 
@@ -33,13 +43,10 @@ class GlobalPrometheusStatsWorker(Worker):
   def _report_stats(self):
     logger.debug('Reporting global stats')
     with UseThenDisconnect(app.config):
-      # Repository count.
-      metric_queue.repository_count.Set(model.get_repository_count())
-
-      # User counts.
-      metric_queue.user_count.Set(model.get_active_user_count())
-      metric_queue.org_count.Set(model.get_active_org_count())
-      metric_queue.robot_count.Set(model.get_robot_count())
+      repository_rows.set(model.get_repository_count())
+      user_rows.set(model.get_active_user_count())
+      org_rows.set(model.get_active_org_count())
+      robot_rows.set(model.get_robot_count())
 
 
 def main():
diff --git a/workers/globalpromstats/test/test_globalpromstats.py b/workers/globalpromstats/test/test_globalpromstats.py
deleted file mode 100644
index 3256f251f..000000000
--- a/workers/globalpromstats/test/test_globalpromstats.py
+++ /dev/null
@@ -1,15 +0,0 @@
-from mock import patch, Mock
-
-from workers.globalpromstats.globalpromstats import GlobalPrometheusStatsWorker
-
-from test.fixtures import *
-
-def test_reportstats(initialized_db):
-  mock = Mock()
-  with patch('workers.globalpromstats.globalpromstats.metric_queue', mock):
-    worker = GlobalPrometheusStatsWorker()
-    worker._report_stats()
-
-  mock.repository_count.Set.assert_called_once()
-  mock.org_count.Set.assert_called_once()
-  mock.robot_count.Set.assert_called_once()
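The deleted test patched `metric_queue`; with module-level gauges, the same assertion can read values back from the default registry instead. A sketch of a possible replacement (imports assumed as in the deleted test; the metric name comes from the gauges above):

    from prometheus_client import REGISTRY

    from workers.globalpromstats.globalpromstats import GlobalPrometheusStatsWorker

    def test_reportstats(initialized_db):
      worker = GlobalPrometheusStatsWorker()
      worker._report_stats()
      # Gauge.set() publishes straight to the default registry.
      assert REGISTRY.get_sample_value('quay_repository_rows') is not None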
diff --git a/workers/labelbackfillworker.py b/workers/labelbackfillworker.py
index b2407f606..741fb4fa1 100644
--- a/workers/labelbackfillworker.py
+++ b/workers/labelbackfillworker.py
@@ -12,10 +12,13 @@ from workers.worker import Worker
 from util.log import logfile_path
 from util.migrate.allocator import yield_random_entries
 
+
 logger = logging.getLogger(__name__)
 
+
 WORKER_TIMEOUT = 600
 
+
 class LabelBackfillWorker(Worker):
   def __init__(self):
     super(LabelBackfillWorker, self).__init__()
diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py
index c154029bb..b7a26bf20 100644
--- a/workers/logrotateworker.py
+++ b/workers/logrotateworker.py
@@ -16,8 +16,10 @@ from util.streamingjsonencoder import StreamingJSONEncoder
 from util.timedeltastring import convert_to_timedelta
 from workers.worker import Worker
 
+
 logger = logging.getLogger(__name__)
 
+
 JSON_MIMETYPE = 'application/json'
 MIN_LOGS_PER_ROTATION = 5000
 MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024
diff --git a/workers/notificationworker/test/test_notificationworker.py b/workers/notificationworker/test/test_notificationworker.py
index c414b52e1..19fb8a185 100644
--- a/workers/notificationworker/test/test_notificationworker.py
+++ b/workers/notificationworker/test/test_notificationworker.py
@@ -14,6 +14,7 @@ from test.fixtures import *
 
 from workers.notificationworker.models_pre_oci import pre_oci_model as model
 
+
 def test_basic_notification_endtoend(initialized_db):
   # Ensure the public user doesn't have any notifications.
   assert not model.user_has_local_notifications('public')
diff --git a/workers/queueworker.py b/workers/queueworker.py
index 8aed9fa45..4386fea62 100644
--- a/workers/queueworker.py
+++ b/workers/queueworker.py
@@ -7,8 +7,10 @@ from app import app
 from data.database import CloseForLongOperation
 from workers.worker import Worker
 
+
 logger = logging.getLogger(__name__)
 
+
 class JobException(Exception):
   """ A job exception is an exception that is caused by something being malformed in the job. When
       a worker raises this exception the job will be terminated and the retry will not be returned
diff --git a/workers/repomirrorworker/__init__.py b/workers/repomirrorworker/__init__.py
index ceb48f7a6..2fc404bf9 100644
--- a/workers/repomirrorworker/__init__.py
+++ b/workers/repomirrorworker/__init__.py
@@ -4,9 +4,11 @@ import traceback
 import fnmatch
 import logging.config
 
+from prometheus_client import Gauge
+
 import features
 
-from app import app, prometheus
+from app import app
 from data import database
 from data.model.repo_mirror import claim_mirror, release_mirror
 from data.logs_model import logs_model
@@ -16,12 +18,15 @@ from data.model.oci.tag import delete_tag, retarget_tag, lookup_alive_tags_shall
 from notifications import spawn_notification
 from util.audit import wrap_repository
-
 from workers.repomirrorworker.repo_mirror_model import repo_mirror_model as model
 
+
 logger = logging.getLogger(__name__)
 
-unmirrored_repositories_gauge = prometheus.create_gauge('unmirrored_repositories',
-                                                        'Number of repositories that need to be scanned.')
+
+
+unmirrored_repositories = Gauge('quay_repository_rows_unmirrored',
+                                'number of repositories in the database that have not yet been mirrored')
+
 
 class PreemptedException(Exception):
   """ Exception raised if another worker analyzed the image before this worker was able to do so.
@@ -61,7 +66,7 @@ def process_mirrors(skopeo, token=None):
     logger.exception('Repository Mirror service unavailable')
     return None
 
-  unmirrored_repositories_gauge.Set(num_remaining)
+  unmirrored_repositories.set(num_remaining)
 
   return next_token
diff --git a/workers/repositoryactioncounter.py b/workers/repositoryactioncounter.py
index e6d6b835d..fb9f2f806 100644
--- a/workers/repositoryactioncounter.py
+++ b/workers/repositoryactioncounter.py
@@ -7,10 +7,13 @@ from data import model
 from data.logs_model import logs_model
 from workers.worker import Worker, with_exponential_backoff
 
-POLL_PERIOD_SECONDS = 10
 
 logger = logging.getLogger(__name__)
 
+
+POLL_PERIOD_SECONDS = 10
+
+
 class RepositoryActionCountWorker(Worker):
   def __init__(self):
     super(RepositoryActionCountWorker, self).__init__()
diff --git a/workers/securityworker/__init__.py b/workers/securityworker/__init__.py
index 8c2bc44a7..79fceb6eb 100644
--- a/workers/securityworker/__init__.py
+++ b/workers/securityworker/__init__.py
@@ -1,14 +1,19 @@
 import logging.config
 
+from prometheus_client import Gauge
+
 from app import app, prometheus
 from data.database import UseThenDisconnect
 from workers.securityworker.models_pre_oci import pre_oci_model as model
 from util.secscan.api import APIRequestFailure
 from util.secscan.analyzer import PreemptedException
 
+
 logger = logging.getLogger(__name__)
 
-unscanned_images_gauge = prometheus.create_gauge('unscanned_images',
-                                                 'Number of images that clair needs to scan.')
+
+
+unscanned_images = Gauge('quay_security_scanning_unscanned_images_remaining',
+                         'number of images that are not scanned by the latest security scanner')
 
 
 def index_images(target_version, analyzer, token=None):
@@ -31,6 +36,6 @@ def index_images(target_version, analyzer, token=None):
     logger.exception('Security scanner service unavailable')
     return
 
-  unscanned_images_gauge.Set(num_remaining)
+  unscanned_images.set(num_remaining)
 
   return next_token
diff --git a/workers/securityworker/securityworker.py b/workers/securityworker/securityworker.py
index 100308acf..2003cbf30 100644
--- a/workers/securityworker/securityworker.py
+++ b/workers/securityworker/securityworker.py
@@ -11,8 +11,10 @@ from util.secscan.analyzer import LayerAnalyzer
 from util.log import logfile_path
 from endpoints.v2 import v2_bp
 
+
 logger = logging.getLogger(__name__)
 
+
 DEFAULT_INDEXING_INTERVAL = 30
diff --git a/workers/servicekeyworker/servicekeyworker.py b/workers/servicekeyworker/servicekeyworker.py
index d7eaecfa1..83d58a629 100644
--- a/workers/servicekeyworker/servicekeyworker.py
+++ b/workers/servicekeyworker/servicekeyworker.py
@@ -1,13 +1,21 @@
 import logging
 
 from datetime import datetime, timedelta
 
-from app import app, instance_keys, metric_queue
+from prometheus_client import Counter
+
+from app import app, instance_keys
 from workers.servicekeyworker.models_pre_oci import pre_oci_model as model
 from workers.worker import Worker
 
+
 logger = logging.getLogger(__name__)
 
+
+instance_key_renewal_self = Counter('quay_instance_key_renewal_self_total',
+                                    'number of times a Quay instance renews its own key',
+                                    labelnames=['success'])
+
+
 class ServiceKeyWorker(Worker):
   def __init__(self):
     super(ServiceKeyWorker, self).__init__()
@@ -28,12 +36,12 @@ class ServiceKeyWorker(Worker):
     except Exception as ex:
       logger.exception('Failure for automatic refresh of service key %s with new expiration %s',
                        instance_keys.local_key_id, new_expiration)
-      metric_queue.instance_key_renewal_failure.Inc(labelvalues=[instance_keys.local_key_id])
+      instance_key_renewal_self.labels(False).inc()
       raise ex
 
     logger.debug('Finished automatic refresh of service key %s with new expiration %s',
                  instance_keys.local_key_id, new_expiration)
-    metric_queue.instance_key_renewal_success.Inc(labelvalues=[instance_keys.local_key_id])
+    instance_key_renewal_self.labels(True).inc()
 
 
 if __name__ == "__main__":
""" diff --git a/workers/test/test_exportactionlogsworker.py b/workers/test/test_exportactionlogsworker.py index 0e4a728b4..8d342dd78 100644 --- a/workers/test/test_exportactionlogsworker.py +++ b/workers/test/test_exportactionlogsworker.py @@ -16,12 +16,13 @@ from workers.exportactionlogsworker import ExportActionLogsWorker, POLL_PERIOD_S from test.fixtures import * + _TEST_CONTENT = os.urandom(1024) _TEST_BUCKET = 'some_bucket' _TEST_USER = 'someuser' _TEST_PASSWORD = 'somepassword' _TEST_PATH = 'some/cool/path' -_TEST_CONTEXT = StorageContext('nyc', None, None, None, None) +_TEST_CONTEXT = StorageContext('nyc', None, None, None) @pytest.fixture(params=['test', 'mock_s3']) diff --git a/workers/test/test_logrotateworker.py b/workers/test/test_logrotateworker.py index aba8290cd..740b9c19a 100644 --- a/workers/test/test_logrotateworker.py +++ b/workers/test/test_logrotateworker.py @@ -1,4 +1,5 @@ import os.path + from datetime import datetime, timedelta from app import storage