Jimmy Zelinskie 2019-11-21 21:25:03 +00:00 committed by GitHub
commit a6749dcf51
126 changed files with 663 additions and 1916 deletions


@ -1,7 +1,9 @@
FROM centos:7
LABEL maintainer "thomasmckay@redhat.com"
ENV PYTHON_VERSION=2.7 \
ENV OS=linux \
ARCH=amd64 \
PYTHON_VERSION=2.7 \
PATH=$HOME/.local/bin/:$PATH \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8 \
@ -76,13 +78,18 @@ RUN curl --silent --location https://rpm.nodesource.com/setup_8.x | bash - && \
# TODO: Build jwtproxy in dist-git
# https://jira.coreos.com/browse/QUAY-1315
RUN curl -fsSL -o /usr/local/bin/jwtproxy https://github.com/coreos/jwtproxy/releases/download/v0.0.3/jwtproxy-linux-x64 && \
ENV JWTPROXY_VERSION=0.0.3
RUN curl -fsSL -o /usr/local/bin/jwtproxy "https://github.com/coreos/jwtproxy/releases/download/v${JWTPROXY_VERSION}/jwtproxy-${OS}-${ARCH}" && \
chmod +x /usr/local/bin/jwtproxy
# TODO: Build prometheus-aggregator in dist-git
# TODO: Build pushgateway in dist-git
# https://jira.coreos.com/browse/QUAY-1324
RUN curl -fsSL -o /usr/local/bin/prometheus-aggregator https://github.com/coreos/prometheus-aggregator/releases/download/v0.0.1-alpha/prometheus-aggregator &&\
chmod +x /usr/local/bin/prometheus-aggregator
ENV PUSHGATEWAY_VERSION=1.0.0
RUN curl -fsSL "https://github.com/prometheus/pushgateway/releases/download/v${PUSHGATEWAY_VERSION}/pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}.tar.gz" | \
tar xz "pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}/pushgateway" && \
mv "pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}/pushgateway" /usr/local/bin/pushgateway && \
rm -rf "pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}" && \
chmod +x /usr/local/bin/pushgateway
# Update local copy of AWS IP Ranges.
RUN curl -fsSL https://ip-ranges.amazonaws.com/ip-ranges.json -o util/ipresolver/aws-ip-ranges.json
@ -105,7 +112,7 @@ RUN UNINSTALL_PKGS="\
yum clean all && \
rm -rf /var/cache/yum /tmp/* /var/tmp/* /root/.cache
EXPOSE 8080 8443 7443
EXPOSE 8080 8443 7443 9091
RUN chgrp -R 0 $QUAYDIR && \
chmod -R g=u $QUAYDIR
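
The Dockerfile now installs the Prometheus pushgateway 1.0.0 binary in place of prometheus-aggregator and exposes its default port 9091 alongside the existing Quay ports. As a rough sketch (not part of this commit), a short-lived process can publish metrics to a gateway listening on localhost:9091 with prometheus_client roughly like this:

# Sketch only: pushing a metric to a local pushgateway on its default port 9091.
# Assumes prometheus_client is installed and a pushgateway is running locally.
from prometheus_client import CollectorRegistry, Counter, push_to_gateway

registry = CollectorRegistry()
jobs_done = Counter('quay_example_jobs_total',
                    'example counter pushed by a short-lived job',
                    registry=registry)
jobs_done.inc()

# The gateway holds the pushed series and exposes them on :9091/metrics for Prometheus to scrape.
push_to_gateway('localhost:9091', job='example_worker', registry=registry)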


@ -1,7 +1,9 @@
FROM registry.redhat.io/rhel7:7.7
LABEL maintainer "thomasmckay@redhat.com"
ENV PYTHON_VERSION=2.7 \
ENV OS=linux \
ARCH=amd64 \
PYTHON_VERSION=2.7 \
PATH=$HOME/.local/bin/:$PATH \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8 \
@ -81,13 +83,18 @@ RUN curl --silent --location https://rpm.nodesource.com/setup_8.x | bash - && \
# TODO: Build jwtproxy in dist-git
# https://jira.coreos.com/browse/QUAY-1315
RUN curl -fsSL -o /usr/local/bin/jwtproxy https://github.com/coreos/jwtproxy/releases/download/v0.0.3/jwtproxy-linux-x64 && \
ENV JWTPROXY_VERSION=0.0.3
RUN curl -fsSL -o /usr/local/bin/jwtproxy "https://github.com/coreos/jwtproxy/releases/download/v${JWTPROXY_VERSION}/jwtproxy-${OS}-${ARCH}" && \
chmod +x /usr/local/bin/jwtproxy
# TODO: Build prometheus-aggregator in dist-git
# TODO: Build pushgateway in dist-git
# https://jira.coreos.com/browse/QUAY-1324
RUN curl -fsSL -o /usr/local/bin/prometheus-aggregator https://github.com/coreos/prometheus-aggregator/releases/download/v0.0.1-alpha/prometheus-aggregator &&\
chmod +x /usr/local/bin/prometheus-aggregator
ENV PUSHGATEWAY_VERSION=1.0.0
RUN curl -fsSL "https://github.com/prometheus/pushgateway/releases/download/v${PUSHGATEWAY_VERSION}/pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}.tar.gz" | \
tar xz "pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}/pushgateway" && \
mv "pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}/pushgateway" /usr/local/bin/pushgateway && \
rm -rf "pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}" && \
chmod +x /usr/local/bin/pushgateway
# Update local copy of AWS IP Ranges.
RUN curl -fsSL https://ip-ranges.amazonaws.com/ip-ranges.json -o util/ipresolver/aws-ip-ranges.json
@ -110,7 +117,7 @@ RUN UNINSTALL_PKGS="\
yum clean all && \
rm -rf /var/cache/yum /tmp/* /var/tmp/* /root/.cache
EXPOSE 8080 8443 7443
EXPOSE 8080 8443 7443 9091
RUN chgrp -R 0 $QUAYDIR && \
chmod -R g=u $QUAYDIR

app.py (36 changes)

@ -19,7 +19,6 @@ import features
from _init import (config_provider, CONF_DIR, IS_KUBERNETES, IS_TESTING, OVERRIDE_CONFIG_DIRECTORY,
IS_BUILDING)
from auth.auth_context import get_authenticated_user
from avatars.avatars import Avatar
from buildman.manager.buildcanceller import BuildCanceller
from data import database
@ -30,7 +29,7 @@ from data.billing import Billing
from data.buildlogs import BuildLogs
from data.cache import get_model_cache
from data.model.user import LoginWrappedDBUser
from data.queue import WorkQueue, BuildMetricQueueReporter
from data.queue import WorkQueue
from data.userevent import UserEventsBuilderModule
from data.userfiles import Userfiles
from data.users import UserAuthentication
@ -52,14 +51,13 @@ from util.names import urn_generator
from util.config.configutil import generate_secret_key
from util.config.superusermanager import SuperUserManager
from util.label_validator import LabelValidator
from util.metrics.metricqueue import MetricQueue
from util.metrics.prometheus import PrometheusPlugin
from util.saas.cloudwatch import start_cloudwatch_sender
from util.secscan.api import SecurityScannerAPI
from util.repomirror.api import RepoMirrorAPI
from util.tufmetadata.api import TUFMetadataAPI
from util.security.instancekeys import InstanceKeys
from util.security.signing import Signer
from util.greenlet_tracing import enable_tracing
OVERRIDE_CONFIG_YAML_FILENAME = os.path.join(CONF_DIR, 'stack/config.yaml')
@ -205,6 +203,9 @@ def _request_end(resp):
return resp
if app.config.get('GREENLET_TRACING', True):
enable_tracing()
root_logger = logging.getLogger()
@ -224,11 +225,10 @@ avatar = Avatar(app)
login_manager = LoginManager(app)
mail = Mail(app)
prometheus = PrometheusPlugin(app)
metric_queue = MetricQueue(prometheus)
chunk_cleanup_queue = WorkQueue(app.config['CHUNK_CLEANUP_QUEUE_NAME'], tf, metric_queue=metric_queue)
chunk_cleanup_queue = WorkQueue(app.config['CHUNK_CLEANUP_QUEUE_NAME'], tf)
instance_keys = InstanceKeys(app)
ip_resolver = IPResolver(app)
storage = Storage(app, metric_queue, chunk_cleanup_queue, instance_keys, config_provider, ip_resolver)
storage = Storage(app, chunk_cleanup_queue, instance_keys, config_provider, ip_resolver)
userfiles = Userfiles(app, storage)
log_archive = LogArchive(app, storage)
analytics = Analytics(app)
@ -244,8 +244,6 @@ instance_keys = InstanceKeys(app)
label_validator = LabelValidator(app)
build_canceller = BuildCanceller(app)
start_cloudwatch_sender(metric_queue, app)
github_trigger = GithubOAuthService(app.config, 'GITHUB_TRIGGER_CONFIG')
gitlab_trigger = GitLabOAuthService(app.config, 'GITLAB_TRIGGER_CONFIG')
@ -253,29 +251,24 @@ oauth_login = OAuthLoginManager(app.config)
oauth_apps = [github_trigger, gitlab_trigger]
image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf,
has_namespace=False, metric_queue=metric_queue)
has_namespace=False)
dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf,
metric_queue=metric_queue,
reporter=BuildMetricQueueReporter(metric_queue),
has_namespace=True)
notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf, has_namespace=True,
metric_queue=metric_queue)
notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf, has_namespace=True)
secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf,
has_namespace=False,
metric_queue=metric_queue)
has_namespace=False)
export_action_logs_queue = WorkQueue(app.config['EXPORT_ACTION_LOGS_QUEUE_NAME'], tf,
has_namespace=True,
metric_queue=metric_queue)
has_namespace=True)
# Note: We set `has_namespace` to `False` here, as we explicitly want this queue to not be emptied
# when a namespace is marked for deletion.
namespace_gc_queue = WorkQueue(app.config['NAMESPACE_GC_QUEUE_NAME'], tf, has_namespace=False,
metric_queue=metric_queue)
namespace_gc_queue = WorkQueue(app.config['NAMESPACE_GC_QUEUE_NAME'], tf, has_namespace=False)
all_queues = [image_replication_queue, dockerfile_build_queue, notification_queue,
secscan_notification_queue, chunk_cleanup_queue, namespace_gc_queue]
url_scheme_and_hostname = URLSchemeAndHostname(app.config['PREFERRED_URL_SCHEME'], app.config['SERVER_HOSTNAME'])
url_scheme_and_hostname = URLSchemeAndHostname(app.config['PREFERRED_URL_SCHEME'],
app.config['SERVER_HOSTNAME'])
secscan_api = SecurityScannerAPI(app.config, storage, app.config['SERVER_HOSTNAME'], app.config['HTTPCLIENT'],
uri_creator=get_blob_download_uri_getter(app.test_request_context('/'), url_scheme_and_hostname),
instance_keys=instance_keys)
@ -296,6 +289,7 @@ else:
if app.config.get('DATABASE_SECRET_KEY') is None and app.config.get('SETUP_COMPLETE', False):
raise Exception('Missing DATABASE_SECRET_KEY in config; did you perhaps forget to add it?')
database.configure(app.config)
model.config.app_config = app.config
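
With MetricQueue gone, app.py no longer threads a metrics object through Storage and every WorkQueue; modules now declare plain prometheus_client collectors where they are used. A minimal sketch of that pattern, with illustrative names that are not Quay's own:

# Sketch of the module-level metric pattern this commit moves to (names are illustrative).
from prometheus_client import Counter

# Declared once at import time; nothing has to be passed through constructors.
items_processed = Counter('example_items_processed_total',
                          'items handled by this module',
                          labelnames=['outcome'])

def handle(item):
    try:
        # ... real work on `item` would happen here ...
        items_processed.labels('success').inc()
    except Exception:
        items_processed.labels('failure').inc()
        raise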


@ -2,8 +2,8 @@ import logging
from functools import wraps
from flask import request, session
from prometheus_client import Counter
from app import metric_queue
from auth.basic import validate_basic_auth
from auth.oauth import validate_bearer_auth
from auth.cookie import validate_session_cookie
@ -14,6 +14,12 @@ from util.http import abort
logger = logging.getLogger(__name__)
authentication_count = Counter('quay_authentication_attempts_total',
'number of authentication attempts across the registry and API',
labelnames=['auth_kind', 'success'])
def _auth_decorator(pass_result=False, handlers=None):
""" Builds an auth decorator that runs the given handlers and, if any return successfully,
sets up the auth context. The wrapped function will be invoked *regardless of success or
@ -39,13 +45,13 @@ def _auth_decorator(pass_result=False, handlers=None):
result.apply_to_context()
# Log the metric.
metric_queue.authentication_count.Inc(labelvalues=[result.kind, True])
authentication_count.labels(result.kind, True).inc()
break
# Otherwise, report the error.
if result.error_message is not None:
# Log the failure.
metric_queue.authentication_count.Inc(labelvalues=[result.kind, False])
authentication_count.labels(result.kind, False).inc()
break
if pass_result:
@ -72,10 +78,10 @@ def require_session_login(func):
result = validate_session_cookie()
if result.has_nonrobot_user:
result.apply_to_context()
metric_queue.authentication_count.Inc(labelvalues=[result.kind, True])
authentication_count.labels(result.kind, True).inc()
return func(*args, **kwargs)
elif not result.missing:
metric_queue.authentication_count.Inc(labelvalues=[result.kind, False])
authentication_count.labels(result.kind, False).inc()
abort(401, message='Method requires login and no valid login could be loaded.')
return wrapper
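
The authentication metric is now a labeled prometheus_client Counter rather than a MetricQueue call. A small standalone sketch (assuming only prometheus_client) of how the labeled counter behaves: each (auth_kind, success) pair becomes its own time series, and non-string label values such as booleans are stringified.

# Sketch: labeled counters create one time series per label-value combination.
from prometheus_client import CollectorRegistry, Counter, generate_latest

registry = CollectorRegistry()
authentication_count = Counter('quay_authentication_attempts_total',
                               'number of authentication attempts across the registry and API',
                               labelnames=['auth_kind', 'success'],
                               registry=registry)

authentication_count.labels('basic', True).inc()
authentication_count.labels('oauth', False).inc()

# generate_latest() renders the text exposition format that Prometheus scrapes.
print(generate_latest(registry).decode('utf-8'))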


@ -6,7 +6,7 @@ from jsonschema import validate, ValidationError
from flask import request, url_for
from flask_principal import identity_changed, Identity
from app import app, get_app_url, instance_keys, metric_queue
from app import app, get_app_url, instance_keys
from auth.auth_context import set_authenticated_context
from auth.auth_context_type import SignedAuthContext
from auth.permissions import repository_read_grant, repository_write_grant, repository_admin_grant
@ -89,8 +89,7 @@ def identity_from_bearer_token(bearer_header):
logger.debug('Validating auth header: %s', bearer_header)
try:
payload = decode_bearer_header(bearer_header, instance_keys, app.config,
metric_queue=metric_queue)
payload = decode_bearer_header(bearer_header, instance_keys, app.config)
except InvalidBearerTokenException as bte:
logger.exception('Invalid bearer token: %s', bte)
raise InvalidJWTException(bte)


@ -9,6 +9,9 @@ class AuthKind(Enum):
signed_grant = 'signed_grant'
credentials = 'credentials'
def __str__(self):
return '%s' % self.value
class ValidateResult(object):
""" A result of validating auth in one form or another. """


@ -8,9 +8,9 @@ from collections import namedtuple
from datetime import datetime, timedelta
from six import iteritems
from prometheus_client import Counter, Histogram
from trollius import From, coroutine, Return, async, sleep
from app import metric_queue
from buildman.orchestrator import (orchestrator_from_config, KeyEvent,
OrchestratorError, OrchestratorConnectionError,
ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION)
@ -26,6 +26,17 @@ from util.morecollections import AttrDict
logger = logging.getLogger(__name__)
build_fallback = Counter('quay_build_fallback_total',
'number of times a build has been retried',
labelnames=['executor'])
build_ack_duration = Histogram('quay_build_ack_duration_seconds',
'seconds taken for the builder to acknowledge a queued build',
labelnames=['executor'])
build_duration = Histogram('quay_build_duration_seconds',
"seconds taken for a build's execution",
labelnames=['executor', 'job_status'])
JOB_PREFIX = 'building/'
LOCK_PREFIX = 'lock/'
REALM_PREFIX = 'realm/'
@ -428,7 +439,7 @@ class EphemeralBuilderManager(BaseManager):
# Check if we can use this executor based on the retries remaining.
if executor.minimum_retry_threshold > build_job.retries_remaining:
metric_queue.builder_fallback.Inc()
build_fallback.labels(executor.name).inc()
logger.debug('Job %s cannot use executor %s as it is below retry threshold %s (retry #%s)',
build_uuid, executor.name, executor.minimum_retry_threshold,
build_job.retries_remaining)
@ -440,28 +451,9 @@ class EphemeralBuilderManager(BaseManager):
try:
execution_id = yield From(executor.start_builder(realm, token, build_uuid))
except:
try:
metric_queue.build_start_failure.Inc(labelvalues=[executor.name])
metric_queue.put_deprecated(('ExecutorFailure-%s' % executor.name), 1, unit='Count')
except:
logger.exception('Exception when writing failure metric for execution %s for job %s',
execution_id, build_uuid)
logger.exception('Exception when starting builder for job: %s', build_uuid)
continue
try:
metric_queue.build_start_success.Inc(labelvalues=[executor.name])
except:
logger.exception('Exception when writing success metric for execution %s for job %s',
execution_id, build_uuid)
try:
metric_queue.ephemeral_build_workers.Inc()
except:
logger.exception('Exception when writing start metrics for execution %s for job %s',
execution_id, build_uuid)
started_with_executor = executor
# Break out of the loop now that we've started a builder successfully.
@ -542,8 +534,7 @@ class EphemeralBuilderManager(BaseManager):
job.build_uuid, build_component.builder_realm)
yield From(build_component.start_build(job))
yield From(self._write_duration_metric(metric_queue.builder_time_to_build,
build_component.builder_realm))
yield From(self._write_duration_metric(build_ack_duration, build_component.builder_realm))
# Clean up the bookkeeping for allowing any manager to take the job.
try:
@ -560,7 +551,8 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Calling job_completed for job %s with status: %s',
build_job.build_uuid, job_status)
yield From(self._write_duration_metric(metric_queue.build_time, build_component.builder_realm))
yield From(self._write_duration_metric(build_duration, build_component.builder_realm,
job_status=job_status))
# Mark the job as completed. Since this is being invoked from the component, we don't need
# to ask for the phase to be updated as well.
@ -660,18 +652,16 @@ class EphemeralBuilderManager(BaseManager):
yield From(sleep(ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION))
@coroutine
def _write_duration_metric(self, metric, realm):
"""
:returns: True if the metric was written, otherwise False
:rtype: bool
"""
def _write_duration_metric(self, metric, realm, job_status=None):
try:
metric_data = yield From(self._orchestrator.get_key(self._metric_key(realm)))
parsed_metric_data = json.loads(metric_data)
start_time = parsed_metric_data['start_time']
metric.Observe(time.time() - start_time,
labelvalues=[parsed_metric_data.get('executor_name',
'unknown')])
executor = parsed_metric_data.get('executor_name', 'unknown')
if job_status:
metric.labels(executor, job_status).observe(time.time() - start_time)
else:
metric.labels(executor).observe(time.time() - start_time)
except Exception:
logger.exception("Could not write metric for realm %s", realm)


@ -5,25 +5,25 @@ import os
import socket
import subprocess
import threading
import time
import uuid
from functools import partial
from functools import partial, wraps
import boto.ec2
import cachetools.func
import requests
import trollius
from container_cloud_config import CloudConfigContext
from jinja2 import FileSystemLoader, Environment
from trollius import coroutine, From, Return, get_event_loop
from trollius import coroutine, sleep, From, Return, get_event_loop
from prometheus_client import Histogram
import release
from buildman.asyncutil import AsyncWrapper
from app import metric_queue, app
from util.metrics.metricqueue import duration_collector_async
from _init import ROOT_DIR
from app import app
from buildman.asyncutil import AsyncWrapper
logger = logging.getLogger(__name__)
@ -38,6 +38,28 @@ ENV = Environment(loader=FileSystemLoader(os.path.join(ROOT_DIR, "buildman/templ
TEMPLATE = ENV.get_template('cloudconfig.yaml')
CloudConfigContext().populate_jinja_environment(ENV)
build_start_duration = Histogram('quay_build_start_duration_seconds',
'seconds taken for an executor to start executing a queued build',
labelnames=['executor'],
buckets=[.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 180.0, 240.0, 300.0, 600.0])
def async_observe(metric, *labels):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
trigger_time = time.time()
try:
rv = func(*args, **kwargs)
except Return as e:
metric.labels(*labels).observe(time.time() - trigger_time)
raise e
return rv
return wrapper
return decorator
class ExecutorException(Exception):
""" Exception raised when there is a problem starting or stopping a builder.
"""
@ -160,7 +182,7 @@ class EC2Executor(BuilderExecutor):
return stack_amis[ec2_region]
@coroutine
@duration_collector_async(metric_queue.builder_time_to_start, ['ec2'])
@async_observe(build_start_duration, 'ec2')
def start_builder(self, realm, token, build_uuid):
region = self.executor_config['EC2_REGION']
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
@ -204,7 +226,6 @@ class EC2Executor(BuilderExecutor):
))
except boto.exception.EC2ResponseError as ec2e:
logger.exception('Unable to spawn builder instance')
metric_queue.ephemeral_build_worker_failure.Inc()
raise ec2e
if not reservation.instances:
@ -215,7 +236,7 @@ class EC2Executor(BuilderExecutor):
launched = AsyncWrapper(reservation.instances[0])
# Sleep a few seconds to wait for AWS to spawn the instance.
yield From(trollius.sleep(_TAG_RETRY_SLEEP))
yield From(sleep(_TAG_RETRY_SLEEP))
# Tag the instance with its metadata.
for i in range(0, _TAG_RETRY_COUNT):
@ -231,7 +252,7 @@ class EC2Executor(BuilderExecutor):
if i < _TAG_RETRY_COUNT - 1:
logger.warning('Failed to write EC2 tags for instance %s for build %s (attempt #%s)',
launched.id, build_uuid, i)
yield From(trollius.sleep(_TAG_RETRY_SLEEP))
yield From(sleep(_TAG_RETRY_SLEEP))
continue
raise ExecutorException('Unable to find builder instance.')
@ -269,7 +290,7 @@ class PopenExecutor(BuilderExecutor):
""" Executor which uses Popen to fork a quay-builder process.
"""
@coroutine
@duration_collector_async(metric_queue.builder_time_to_start, ['fork'])
@async_observe(build_start_duration, 'fork')
def start_builder(self, realm, token, build_uuid):
# Now start a machine for this job, adding the machine id to the etcd information
logger.debug('Forking process for build')
@ -491,7 +512,7 @@ class KubernetesExecutor(BuilderExecutor):
return job_resource
@coroutine
@duration_collector_async(metric_queue.builder_time_to_start, ['k8s'])
@async_observe(build_start_duration, 'k8s')
def start_builder(self, realm, token, build_uuid):
# generate resource
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
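
async_observe times a trollius coroutine by catching the Return exception that carries its result, recording the elapsed time, and re-raising. The following is a simplified synchronous sketch of the same idea, using an ordinary exception as a stand-in for trollius.Return so it runs without trollius:

# Simplified sketch of the async_observe idea: time a callable that reports its
# result by raising a sentinel exception (a stand-in for trollius.Return).
import time
from functools import wraps
from prometheus_client import CollectorRegistry, Histogram

registry = CollectorRegistry()
start_duration = Histogram('example_start_duration_seconds',
                           'seconds taken for an executor to start a queued build',
                           labelnames=['executor'], registry=registry)

class Result(Exception):
    """Stand-in for trollius.Return: carries the callable's return value."""
    def __init__(self, value):
        super(Result, self).__init__(value)
        self.value = value

def observe(metric, *labels):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            trigger_time = time.time()
            try:
                return func(*args, **kwargs)
            except Result:
                # Only the "returned a value" path records a duration, as in the diff above.
                metric.labels(*labels).observe(time.time() - trigger_time)
                raise
        return wrapper
    return decorator

@observe(start_duration, 'fork')
def start_builder():
    time.sleep(0.01)
    raise Result('builder-id')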


@ -1,21 +1,23 @@
import logging
import json
from datetime import timedelta
from threading import Event
import trollius
from threading import Event
from datetime import timedelta
from trollius.coroutines import From
from aiowsgi import create_server as create_wsgi_server
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
from autobahn.asyncio.websocket import WampWebSocketServerFactory
from autobahn.wamp import types
from aiowsgi import create_server as create_wsgi_server
from flask import Flask
from trollius.coroutines import From
from app import app
from buildman.enums import BuildJobResult, BuildServerStatus, RESULT_PHASES
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from buildman.jobutil.buildstatus import StatusHandler
from data import database, model
from app import app, metric_queue
logger = logging.getLogger(__name__)
@ -67,8 +69,7 @@ class BuilderServer(object):
@controller_app.route('/status')
def status():
metrics = server._queue.get_metrics()
(running_count, available_not_running_count, available_count) = metrics
(running_count, _available_not_running_count, available_count) = server._queue.get_metrics()
workers = [component for component in server._current_components
if component.kind() == 'builder']
@ -167,8 +168,6 @@ class BuilderServer(object):
if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count:
self._shutdown_event.set()
_report_completion_status(build_job, job_status, executor_name)
@trollius.coroutine
def _work_checker(self):
logger.debug('Initializing work checker')
@ -249,18 +248,3 @@ class BuilderServer(object):
# Initialize the work queue checker.
yield From(self._work_checker())
def _report_completion_status(build_job, status, executor_name):
metric_queue.build_counter.Inc(labelvalues=[status])
metric_queue.repository_build_completed.Inc(labelvalues=[build_job.namespace, build_job.repo_name,
status, executor_name or 'executor'])
if status == BuildJobResult.COMPLETE:
status_name = 'CompleteBuilds'
elif status == BuildJobResult.ERROR:
status_name = 'FailedBuilds'
elif status == BuildJobResult.INCOMPLETE:
status_name = 'IncompletedBuilds'
else:
return
metric_queue.put_deprecated(status_name, 1, unit='Count')


@ -6,7 +6,6 @@ from mock import Mock, ANY
from six import iteritems
from trollius import coroutine, get_event_loop, From, Future, Return
from app import metric_queue
from buildman.asyncutil import AsyncWrapper
from buildman.component.buildcomponent import BuildComponent
from buildman.manager.ephemeral import (EphemeralBuilderManager, REALM_PREFIX,
@ -15,7 +14,6 @@ from buildman.manager.executor import BuilderExecutor, ExecutorException
from buildman.orchestrator import KeyEvent, KeyChange
from buildman.server import BuildJobResult
from util import slash_join
from util.metrics.metricqueue import duration_collector_async
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
@ -36,7 +34,6 @@ class TestExecutor(BuilderExecutor):
job_stopped = None
@coroutine
@duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"])
def start_builder(self, realm, token, build_uuid):
self.job_started = str(uuid.uuid4())
raise Return(self.job_started)
@ -48,7 +45,6 @@ class TestExecutor(BuilderExecutor):
class BadExecutor(BuilderExecutor):
@coroutine
@duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"])
def start_builder(self, realm, token, build_uuid):
raise ExecutorException('raised on purpose!')


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t blobuploadcleanupworker


@ -1,10 +0,0 @@
#! /bin/bash
echo 'Starting Blob upload cleanup worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.blobuploadcleanupworker.blobuploadcleanupworker 2>&1
echo 'Blob upload cleanup exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t buildlogsarchiver


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting build logs archiver worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.buildlogsarchiver.buildlogsarchiver 2>&1
echo 'Diffs worker exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t buildmanager


@ -1,11 +0,0 @@
#! /bin/bash
echo 'Starting internal build manager'
# Run the build manager.
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
export PYTHONPATH=$QUAYPATH
exec venv/bin/python -m buildman.builder 2>&1
echo 'Internal build manager exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t chunkcleanupworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting chunk cleanup worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.chunkcleanupworker 2>&1
echo 'Chunk cleanup worker exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t expiredappspecifictokenworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting Expired app specific token GC worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.expiredappspecifictokenworker 2>&1
echo 'Expired app specific token GC exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t exportactionlogsworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting Export Actions Log worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.exportactionlogsworker 2>&1
echo 'Export Actions Log worker exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t gcworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting GC worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.gc.gcworker 2>&1
echo 'Repository GC exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t globalpromstats


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting global prometheus stats worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.globalpromstats.globalpromstats
echo 'Global prometheus stats exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t labelbackfillworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting label backfill worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.labelbackfillworker 2>&1
echo 'Repository label backfill exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t logrotateworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting log rotation worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.logrotateworker
echo 'Log rotation worker exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t namespacegcworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting Namespace GC worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.namespacegcworker 2>&1
echo 'Namespace GC exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t notificationworker


@ -1,10 +0,0 @@
#! /bin/bash
echo 'Starting notification worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.notificationworker.notificationworker
echo 'Notification worker exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t queuecleanupworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting Queue cleanup worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.queuecleanupworker 2>&1
echo 'Repository Queue cleanup exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t repositoryactioncounter


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting repository action count worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.repositoryactioncounter 2>&1
echo 'Repository action worker exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t security_notification_worker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting security scanner notification worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.security_notification_worker 2>&1
echo 'Security scanner notification worker exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t securityworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting security scanner worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.securityworker.securityworker 2>&1
echo 'Security scanner worker exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t storagereplication


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting storage replication worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.storagereplication 2>&1
echo 'Repository storage replication exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t tagbackfillworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting tag backfill worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.tagbackfillworker 2>&1
echo 'Repository tag backfill exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t teamsyncworker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting team synchronization worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.teamsyncworker.teamsyncworker 2>&1
echo 'Team synchronization worker exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t dnsmasq


@ -1,7 +0,0 @@
#! /bin/bash
echo 'Starting dnsmasq'
/usr/sbin/dnsmasq --no-daemon --user=root --listen-address=127.0.0.1
echo 'dnsmasq'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t gunicorn_registry


@ -1,12 +0,0 @@
#! /bin/bash
echo 'Starting gunicon'
QUAYPATH=${QUAYPATH:-"."}
QUAYCONF=${QUAYCONF:-"$QUAYPATH/conf"}
DB_CONNECTION_POOLING=${DB_CONNECTION_POOLING:-"true"}
cd ${QUAYDIR:-"/"}
DB_CONNECTION_POOLING=$DB_CONNECTION_POOLING PYTHONPATH=$QUAYPATH nice -n 10 venv/bin/gunicorn -c $QUAYCONF/gunicorn_registry.py registry:application
echo 'Gunicorn exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t gunicorn_secscan


@ -1,11 +0,0 @@
#! /bin/bash
echo 'Starting gunicon'
QUAYPATH=${QUAYPATH:-"."}
QUAYCONF=${QUAYCONF:-"$QUAYPATH/conf"}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/gunicorn -c $QUAYCONF/gunicorn_secscan.py secscan:application
echo 'Gunicorn exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t gunicorn_verbs


@ -1,11 +0,0 @@
#! /bin/bash
echo 'Starting gunicon'
QUAYPATH=${QUAYPATH:-"."}
QUAYCONF=${QUAYCONF:-"$QUAYPATH/conf"}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH nice -n 10 venv/bin/gunicorn -c $QUAYCONF/gunicorn_verbs.py verbs:application
echo 'Gunicorn exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t gunicorn_web


@ -1,11 +0,0 @@
#! /bin/bash
echo 'Starting gunicon'
QUAYPATH=${QUAYPATH:-"."}
QUAYCONF=${QUAYCONF:-"$QUAYPATH/conf"}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/gunicorn -c $QUAYCONF/gunicorn_web.py web:application
echo 'Gunicorn exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t jwtproxy


@ -1,16 +0,0 @@
#! /bin/bash
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH
QUAYCONF=${QUAYCONF:-"$QUAYPATH/conf"}
if [ -f $QUAYCONF/jwtproxy_conf.yaml ];
then
echo 'Starting jwtproxy'
/usr/local/bin/jwtproxy --config $QUAYCONF/jwtproxy_conf.yaml
rm /tmp/jwtproxy_secscan.sock
echo 'Jwtproxy exited'
else
sleep 1
fi


@ -1,7 +0,0 @@
#!/bin/sh
# Ensure dependencies start before the logger
sv check syslog-ng > /dev/null || exit 1
# Start the logger
exec logger -i -t memcached


@ -1,12 +0,0 @@
#! /bin/bash
echo 'Starting memcached'
if [ "$DEBUGLOG" == "true" ]
then
memcached -u memcached -m 64 -vv -l 127.0.0.1 -p 18080
else
memcached -u memcached -m 64 -l 127.0.0.1 -p 18080
fi
echo 'memcached exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t nginx


@ -1,12 +0,0 @@
#! /bin/bash
echo 'Starting nginx'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH
QUAYCONF=${QUAYCONF:-"$QUAYPATH/conf"}
/usr/sbin/nginx -c $QUAYCONF/nginx/nginx.conf
echo 'Nginx exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t prometheus-aggregator


@ -1,7 +0,0 @@
#! /bin/bash
echo 'Starting prometheus aggregator'
/usr/local/bin/prometheus-aggregator
echo 'Prometheus aggregator exited'


@ -1,4 +0,0 @@
#!/bin/sh
# Start the logger
exec logger -i -t service_key_worker


@ -1,9 +0,0 @@
#! /bin/bash
echo 'Starting service key worker'
QUAYPATH=${QUAYPATH:-"."}
cd ${QUAYDIR:-"/"}
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.servicekeyworker.servicekeyworker 2>&1
echo 'Service key worker exited'


@ -97,7 +97,7 @@ def default_services():
"nginx": {
"autostart": "true"
},
"prometheus-aggregator": {
"pushgateway": {
"autostart": "true"
},
"servicekey": {


@ -1,778 +1,53 @@
import os
import pytest
import json
import yaml
import jinja2
from contextlib import contextmanager
import os
import tempfile
from six import iteritems
from supervisor.options import ServerOptions
import jinja2
import pytest
from ..supervisord_conf_create import (default_services, limit_services, override_services,
QUAY_SERVICES, QUAY_OVERRIDE_SERVICES)
@contextmanager
def environ(**kwargs):
original_env = {key: os.getenv(key) for key in kwargs}
os.environ.update(**kwargs)
try:
yield
finally:
for key, value in iteritems(original_env):
if value is None:
del os.environ[key]
else:
os.environ[key] = value
from ..supervisord_conf_create import QUAYCONF_DIR, default_services, limit_services
def render_supervisord_conf(config):
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../supervisord.conf.jnj")) as f:
template = jinja2.Template(f.read())
return template.render(config=config)
def test_supervisord_conf_create_defaults():
def test_supervisord_conf_validates():
config = default_services()
limit_services(config, [])
rendered = render_supervisord_conf(config)
limit_services(config, QUAY_SERVICES)
override_services(config, QUAY_OVERRIDE_SERVICES)
rendered_config_file = render_supervisord_conf(config)
print rendered_config_file
expected = """[supervisord]
nodaemon=true
with environ(QUAYPATH='.', QUAYDIR='/', QUAYCONF='/conf', DB_CONNECTION_POOLING_REGISTRY='true'):
opts = ServerOptions()
[unix_http_server]
file=%(ENV_QUAYCONF)s/supervisord.sock
user=root
with tempfile.NamedTemporaryFile() as f:
f.write(rendered_config_file)
f.flush()
[supervisorctl]
serverurl=unix:///%(ENV_QUAYCONF)s/supervisord.sock
opts.searchpaths = [f.name]
assert opts.default_configfile() == f.name
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[eventlistener:stdout]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command = supervisor_stdout
buffer_size = 1024
events = PROCESS_LOG
result_handler = supervisor_stdout:event_handler
;;; Run batch scripts
[program:blobuploadcleanupworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.blobuploadcleanupworker.blobuploadcleanupworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:buildlogsarchiver]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.buildlogsarchiver.buildlogsarchiver
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:builder]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m buildman.builder
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:chunkcleanupworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.chunkcleanupworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:expiredappspecifictokenworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.expiredappspecifictokenworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:exportactionlogsworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.exportactionlogsworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gcworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.gc.gcworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:globalpromstats]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.globalpromstats.globalpromstats
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:labelbackfillworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.labelbackfillworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:logrotateworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.logrotateworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:namespacegcworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.namespacegcworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:notificationworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.notificationworker.notificationworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:queuecleanupworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.queuecleanupworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:repositoryactioncounter]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.repositoryactioncounter
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:security_notification_worker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.security_notification_worker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:securityworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.securityworker.securityworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:storagereplication]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.storagereplication
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:tagbackfillworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.tagbackfillworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:teamsyncworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.teamsyncworker.teamsyncworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
;;; Run interactive scripts
[program:dnsmasq]
command=/usr/sbin/dnsmasq --no-daemon --user=root --listen-address=127.0.0.1 --port=8053
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gunicorn-registry]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s,
DB_CONNECTION_POOLING=%(ENV_DB_CONNECTION_POOLING_REGISTRY)s
command=nice -n 10 gunicorn -c %(ENV_QUAYCONF)s/gunicorn_registry.py registry:application
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gunicorn-secscan]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=gunicorn -c %(ENV_QUAYCONF)s/gunicorn_secscan.py secscan:application
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gunicorn-verbs]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=nice -n 10 gunicorn -c %(ENV_QUAYCONF)s/gunicorn_verbs.py verbs:application
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gunicorn-web]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=gunicorn -c %(ENV_QUAYCONF)s/gunicorn_web.py web:application
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:jwtproxy]
command=/usr/local/bin/jwtproxy --config %(ENV_QUAYCONF)s/jwtproxy_conf.yaml
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:memcache]
command=memcached -u memcached -m 64 -l 127.0.0.1 -p 18080
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:nginx]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=nginx -c %(ENV_QUAYCONF)s/nginx/nginx.conf
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:prometheus-aggregator]
command=/usr/local/bin/prometheus-aggregator
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:servicekey]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.servicekeyworker.servicekeyworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:repomirrorworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.repomirrorworker.repomirrorworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
# EOF NO NEWLINE"""
assert rendered == expected
def test_supervisord_conf_create_all_overrides():
config = default_services()
limit_services(config, "servicekey,prometheus-aggregator")
rendered = render_supervisord_conf(config)
expected = """[supervisord]
nodaemon=true
[unix_http_server]
file=%(ENV_QUAYCONF)s/supervisord.sock
user=root
[supervisorctl]
serverurl=unix:///%(ENV_QUAYCONF)s/supervisord.sock
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[eventlistener:stdout]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command = supervisor_stdout
buffer_size = 1024
events = PROCESS_LOG
result_handler = supervisor_stdout:event_handler
;;; Run batch scripts
[program:blobuploadcleanupworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.blobuploadcleanupworker.blobuploadcleanupworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:buildlogsarchiver]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.buildlogsarchiver.buildlogsarchiver
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:builder]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m buildman.builder
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:chunkcleanupworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.chunkcleanupworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:expiredappspecifictokenworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.expiredappspecifictokenworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:exportactionlogsworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.exportactionlogsworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gcworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.gc.gcworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:globalpromstats]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.globalpromstats.globalpromstats
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:labelbackfillworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.labelbackfillworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:logrotateworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.logrotateworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:namespacegcworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.namespacegcworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:notificationworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.notificationworker.notificationworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:queuecleanupworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.queuecleanupworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:repositoryactioncounter]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.repositoryactioncounter
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:security_notification_worker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.security_notification_worker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:securityworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.securityworker.securityworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:storagereplication]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.storagereplication
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:tagbackfillworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.tagbackfillworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:teamsyncworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.teamsyncworker.teamsyncworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
;;; Run interactive scripts
[program:dnsmasq]
command=/usr/sbin/dnsmasq --no-daemon --user=root --listen-address=127.0.0.1 --port=8053
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gunicorn-registry]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s,
DB_CONNECTION_POOLING=%(ENV_DB_CONNECTION_POOLING_REGISTRY)s
command=nice -n 10 gunicorn -c %(ENV_QUAYCONF)s/gunicorn_registry.py registry:application
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gunicorn-secscan]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=gunicorn -c %(ENV_QUAYCONF)s/gunicorn_secscan.py secscan:application
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gunicorn-verbs]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=nice -n 10 gunicorn -c %(ENV_QUAYCONF)s/gunicorn_verbs.py verbs:application
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:gunicorn-web]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=gunicorn -c %(ENV_QUAYCONF)s/gunicorn_web.py web:application
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:jwtproxy]
command=/usr/local/bin/jwtproxy --config %(ENV_QUAYCONF)s/jwtproxy_conf.yaml
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:memcache]
command=memcached -u memcached -m 64 -l 127.0.0.1 -p 18080
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:nginx]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=nginx -c %(ENV_QUAYCONF)s/nginx/nginx.conf
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:prometheus-aggregator]
command=/usr/local/bin/prometheus-aggregator
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:servicekey]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.servicekeyworker.servicekeyworker
autostart = true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:repomirrorworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.repomirrorworker.repomirrorworker
autostart = false
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
# EOF NO NEWLINE"""
assert rendered == expected
opts.realize([])
opts.process_config()
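
Instead of comparing the rendered output against a multi-hundred-line expected string, the test now hands the rendered template to supervisor's own parser. A condensed sketch of that validation step on its own, assuming the environment variables the template references are set:

# Sketch: validate an arbitrary supervisord config string with supervisor's parser.
import os
import tempfile
from supervisor.options import ServerOptions

def validate_supervisord_conf(text):
    os.environ.setdefault('QUAYPATH', '.')
    os.environ.setdefault('QUAYDIR', '/')
    os.environ.setdefault('QUAYCONF', '/conf')
    os.environ.setdefault('DB_CONNECTION_POOLING_REGISTRY', 'true')
    with tempfile.NamedTemporaryFile(mode='w', suffix='.conf') as f:
        f.write(text)
        f.flush()
        opts = ServerOptions()
        opts.searchpaths = [f.name]  # make this the config file ServerOptions discovers
        opts.realize([])             # parse (empty) command-line options
        opts.process_config()        # parsing errors surface here if the config is invalid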


@ -23,7 +23,7 @@ result_handler = supervisor_stdout:event_handler
[program:blobuploadcleanupworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.blobuploadcleanupworker.blobuploadcleanupworker
command=sh -c python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.blobuploadcleanupworker.blobuploadcleanupworker
autostart = {{ config['blobuploadcleanupworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
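
Each worker's command is now wrapped so that it first waits on the pushgateway readiness endpoint (http://localhost:9091/-/ready) via util.wait_for_endpoints before launching the worker module. That helper is not shown in this excerpt; the following is a hypothetical sketch of such a readiness poll, which the real util.wait_for_endpoints may or may not resemble:

# Hypothetical sketch of a wait-for-endpoints helper (the module referenced by the
# template is not shown here and may differ): poll each URL until it returns HTTP 200.
import sys
import time
import requests

def wait_for_endpoints(urls, timeout=300, interval=2):
    deadline = time.time() + timeout
    for url in urls:
        while True:
            try:
                if requests.get(url, timeout=5).status_code == 200:
                    break  # this endpoint is ready; move on to the next one
            except requests.RequestException:
                pass  # not up yet; keep polling
            if time.time() > deadline:
                raise RuntimeError('endpoint %s never became ready' % url)
            time.sleep(interval)

if __name__ == '__main__':
    # e.g. python -m util.wait_for_endpoints http://localhost:9091/-/ready
    wait_for_endpoints(sys.argv[1:])
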
@ -35,7 +35,7 @@ stderr_events_enabled = true
[program:buildlogsarchiver]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.buildlogsarchiver.buildlogsarchiver
command=sh -c python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.buildlogsarchiver.buildlogsarchiver
autostart = {{ config['buildlogsarchiver']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -47,7 +47,7 @@ stderr_events_enabled = true
[program:builder]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m buildman.builder
command=sh -c python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m buildman.builder
autostart = {{ config['builder']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -59,7 +59,7 @@ stderr_events_enabled = true
[program:chunkcleanupworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.chunkcleanupworker
command=sh -c python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.chunkcleanupworker
autostart = {{ config['chunkcleanupworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -71,7 +71,7 @@ stderr_events_enabled = true
[program:expiredappspecifictokenworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.expiredappspecifictokenworker
command=sh -c python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.expiredappspecifictokenworker
autostart = {{ config['expiredappspecifictokenworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -95,7 +95,7 @@ stderr_events_enabled = true
[program:gcworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.gc.gcworker
command=sh -c python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.gc.gcworker
autostart = {{ config['gcworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -107,7 +107,7 @@ stderr_events_enabled = true
[program:globalpromstats]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.globalpromstats.globalpromstats
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.globalpromstats.globalpromstats'
autostart = {{ config['globalpromstats']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -119,7 +119,7 @@ stderr_events_enabled = true
[program:labelbackfillworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.labelbackfillworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.labelbackfillworker'
autostart = {{ config['labelbackfillworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -131,7 +131,7 @@ stderr_events_enabled = true
[program:logrotateworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.logrotateworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.logrotateworker'
autostart = {{ config['logrotateworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -143,7 +143,7 @@ stderr_events_enabled = true
[program:namespacegcworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.namespacegcworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.namespacegcworker'
autostart = {{ config['namespacegcworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -155,7 +155,7 @@ stderr_events_enabled = true
[program:notificationworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.notificationworker.notificationworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.notificationworker.notificationworker'
autostart = {{ config['notificationworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -167,7 +167,7 @@ stderr_events_enabled = true
[program:queuecleanupworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.queuecleanupworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.queuecleanupworker'
autostart = {{ config['queuecleanupworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -179,7 +179,7 @@ stderr_events_enabled = true
[program:repositoryactioncounter]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.repositoryactioncounter
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.repositoryactioncounter'
autostart = {{ config['repositoryactioncounter']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -191,7 +191,7 @@ stderr_events_enabled = true
[program:security_notification_worker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.security_notification_worker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.security_notification_worker'
autostart = {{ config['security_notification_worker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -203,7 +203,7 @@ stderr_events_enabled = true
[program:securityworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.securityworker.securityworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.securityworker.securityworker'
autostart = {{ config['securityworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -215,7 +215,7 @@ stderr_events_enabled = true
[program:storagereplication]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.storagereplication
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.storagereplication'
autostart = {{ config['storagereplication']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -227,7 +227,7 @@ stderr_events_enabled = true
[program:tagbackfillworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.tagbackfillworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.tagbackfillworker'
autostart = {{ config['tagbackfillworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -239,7 +239,7 @@ stderr_events_enabled = true
[program:teamsyncworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.teamsyncworker.teamsyncworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.teamsyncworker.teamsyncworker'
autostart = {{ config['teamsyncworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -263,7 +263,7 @@ stderr_events_enabled = true
environment=
PYTHONPATH=%(ENV_QUAYDIR)s,
DB_CONNECTION_POOLING=%(ENV_DB_CONNECTION_POOLING_REGISTRY)s
command=nice -n 10 gunicorn -c %(ENV_QUAYCONF)s/gunicorn_registry.py registry:application
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && nice -n 10 gunicorn -c %(ENV_QUAYCONF)s/gunicorn_registry.py registry:application'
autostart = {{ config['gunicorn-registry']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -275,7 +275,7 @@ stderr_events_enabled = true
[program:gunicorn-secscan]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=gunicorn -c %(ENV_QUAYCONF)s/gunicorn_secscan.py secscan:application
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && gunicorn -c %(ENV_QUAYCONF)s/gunicorn_secscan.py secscan:application'
autostart = {{ config['gunicorn-secscan']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -287,7 +287,7 @@ stderr_events_enabled = true
[program:gunicorn-verbs]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=nice -n 10 gunicorn -c %(ENV_QUAYCONF)s/gunicorn_verbs.py verbs:application
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && nice -n 10 gunicorn -c %(ENV_QUAYCONF)s/gunicorn_verbs.py verbs:application'
autostart = {{ config['gunicorn-verbs']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -299,7 +299,7 @@ stderr_events_enabled = true
[program:gunicorn-web]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=gunicorn -c %(ENV_QUAYCONF)s/gunicorn_web.py web:application
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && gunicorn -c %(ENV_QUAYCONF)s/gunicorn_web.py web:application'
autostart = {{ config['gunicorn-web']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -340,9 +340,9 @@ stderr_logfile_maxbytes=0
stdout_events_enabled = true
stderr_events_enabled = true
[program:prometheus-aggregator]
command=/usr/local/bin/prometheus-aggregator
autostart = {{ config['prometheus-aggregator']['autostart'] }}
[program:pushgateway]
command=/usr/local/bin/pushgateway
autostart = {{ config['pushgateway']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stdout
@ -353,7 +353,7 @@ stderr_events_enabled = true
[program:servicekey]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.servicekeyworker.servicekeyworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.servicekeyworker.servicekeyworker'
autostart = {{ config['servicekey']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
@ -365,7 +365,7 @@ stderr_events_enabled = true
[program:repomirrorworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.repomirrorworker.repomirrorworker
command=sh -c 'python -m util.wait_for_endpoints http://localhost:9091/-/ready && python -m workers.repomirrorworker.repomirrorworker'
autostart = {{ config['repomirrorworker']['autostart'] }}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
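Every worker and gunicorn entry above is gated on the pushgateway's /-/ready endpoint through util.wait_for_endpoints, whose implementation is not part of this diff. Below is only a hypothetical sketch of what such a helper could look like; the module layout, argument handling, and retry policy are assumptions rather than the committed code.

# Hypothetical sketch of util/wait_for_endpoints.py; the real module is not
# shown in this commit, so every detail below is an assumption.
import sys
import time

import requests


def wait_for_endpoints(urls, request_timeout=1, retry_interval=2):
    """ Block until every URL given answers with HTTP 200. """
    pending = list(urls)
    while pending:
        try:
            if requests.get(pending[0], timeout=request_timeout).status_code == 200:
                pending.pop(0)
                continue
        except requests.RequestException:
            pass
        time.sleep(retry_interval)


if __name__ == '__main__':
    # e.g. python -m util.wait_for_endpoints http://localhost:9091/-/ready
    wait_for_endpoints(sys.argv[1:])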

View file

@ -460,8 +460,8 @@ class DefaultConfig(ImmutableConfig):
# The whitelist of client IDs for OAuth applications that allow for direct login.
DIRECT_OAUTH_CLIENTID_WHITELIST = []
# URL that specifies the location of the prometheus stats aggregator.
PROMETHEUS_AGGREGATOR_URL = 'http://localhost:9092'
# URL that specifies the location of the prometheus pushgateway.
PROMETHEUS_PUSHGATEWAY_URL = 'http://localhost:9091'
# Namespace prefix for all prometheus metrics.
PROMETHEUS_NAMESPACE = 'quay'
@ -607,3 +607,6 @@ class DefaultConfig(ImmutableConfig):
# Feature Flag: Whether garbage collection is enabled.
FEATURE_GARBAGE_COLLECTION = True
# When enabled, sets a tracing callback to report greenlet metrics.
GREENLET_TRACING = True
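PROMETHEUS_PUSHGATEWAY_URL replaces the old aggregator URL and is consumed by the push loop in util/metrics/prometheus.py further down. For orientation, the underlying prometheus_client call looks roughly like the snippet below; the job name is a placeholder, not a value taken from this commit.

# Illustrative only: pushing the process registry to a Prometheus pushgateway
# such as the one configured by PROMETHEUS_PUSHGATEWAY_URL above.
from prometheus_client import REGISTRY, push_to_gateway

pushgateway_url = 'http://localhost:9091'  # same default as in DefaultConfig
push_to_gateway(pushgateway_url, job='quay', registry=REGISTRY)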

View file

@ -3,36 +3,41 @@ import uuid
from datetime import datetime, timedelta
from contextlib import contextmanager
from prometheus_client import Counter, Gauge
from data.database import QueueItem, db, db_for_update, db_random_func
from util.morecollections import AttrDict
queue_item_puts = Counter('quay_queue_item_puts_total',
'number of items that have been added to the queue',
labelnames=['queue_name'])
queue_item_gets = Counter('quay_queue_item_gets_total',
'number of times get() has been called on queue',
labelnames=['queue_name', 'availability'])
queue_item_deletes = Counter('quay_queue_item_deletes_total',
'number of expired queue items that have been deleted')
queue_items_locked = Gauge('quay_queue_items_locked',
'number of queue items that have been acquired',
labelnames=['queue_name'])
queue_items_available = Gauge('quay_queue_items_available',
'number of queue items that have not expired',
labelnames=['queue_name'])
queue_items_available_unlocked = Gauge('quay_queue_items_available_unlocked',
'number of queue items that have not expired and are not locked',
labelnames=['queue_name'])
MINIMUM_EXTENSION = timedelta(seconds=20)
DEFAULT_BATCH_SIZE = 1000
class BuildMetricQueueReporter(object):
""" Metric queue reporter for the build system. """
def __init__(self, metric_queue):
self._metric_queue = metric_queue
def __call__(self, currently_processing, running_count, total_count):
need_capacity_count = total_count - running_count
self._metric_queue.put_deprecated('BuildCapacityShortage', need_capacity_count, unit='Count')
self._metric_queue.build_capacity_shortage.Set(need_capacity_count)
building_percent = 100 if currently_processing else 0
self._metric_queue.percent_building.Set(building_percent)
class WorkQueue(object):
""" Work queue defines methods for interacting with a queue backed by the database. """
def __init__(self, queue_name, transaction_factory,
canonical_name_match_list=None, reporter=None, metric_queue=None,
has_namespace=False):
canonical_name_match_list=None, has_namespace=False):
self._queue_name = queue_name
self._reporter = reporter
self._metric_queue = metric_queue
self._transaction_factory = transaction_factory
self._currently_processing = False
self._has_namespaced_items = has_namespace
@ -129,21 +134,10 @@ class WorkQueue(object):
return (running_count, available_not_running_count, available_count)
def update_metrics(self):
if self._reporter is None and self._metric_queue is None:
return
(running_count, available_not_running_count, available_count) = self.get_metrics()
if self._metric_queue:
self._metric_queue.work_queue_running.Set(running_count, labelvalues=[self._queue_name])
self._metric_queue.work_queue_available.Set(available_count, labelvalues=[self._queue_name])
self._metric_queue.work_queue_available_not_running.Set(available_not_running_count,
labelvalues=[self._queue_name])
if self._reporter:
self._reporter(self._currently_processing, running_count,
running_count + available_not_running_count)
queue_items_locked.labels(self._queue_name).set(running_count)
queue_items_available.labels(self._queue_name).set(available_count)
queue_items_available_unlocked.labels(self._queue_name).set(available_not_running_count)
def has_retries_remaining(self, item_id):
""" Returns whether the queue item with the given id has any retries remaining. If the
@ -204,7 +198,9 @@ class WorkQueue(object):
# Chunk the inserted items into batch_size chunks and insert_many
remaining = list(items_to_insert)
while remaining:
QueueItem.insert_many(remaining[0:batch_size]).execute()
current_batch = remaining[0:batch_size]
QueueItem.insert_many(current_batch).execute()
queue_item_puts.labels(self._queue_name).inc(len(current_batch))
remaining = remaining[batch_size:]
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
@ -214,6 +210,7 @@ class WorkQueue(object):
"""
item = QueueItem.create(**self._queue_dict(canonical_name_list, message, available_after,
retries_remaining))
queue_item_puts.labels(self._queue_name).inc()
return str(item.id)
def _select_available_item(self, ordering_required, now):
@ -289,15 +286,18 @@ class WorkQueue(object):
db_item = self._select_available_item(ordering_required, now)
if db_item is None:
self._currently_processing = False
queue_item_gets.labels(self._queue_name, 'nonexistent').inc()
return None
# Attempt to claim the item for this instance.
was_claimed = self._attempt_to_claim_item(db_item, now, processing_time)
if not was_claimed:
self._currently_processing = False
queue_item_gets.labels(self._queue_name, 'claimed').inc()
return None
self._currently_processing = True
queue_item_gets.labels(self._queue_name, 'acquired').inc()
# Return a view of the queue item rather than an active db object
return AttrDict({
@ -307,8 +307,8 @@ class WorkQueue(object):
})
def cancel(self, item_id):
""" Attempts to cancel the queue item with the given ID from the queue. Returns true on success
and false if the queue item could not be canceled.
""" Attempts to cancel the queue item with the given ID from the queue.
Returns true on success and false if the queue item could not be canceled.
"""
count_removed = QueueItem.delete().where(QueueItem.id == item_id).execute()
return count_removed > 0
@ -375,4 +375,5 @@ def delete_expired(expiration_threshold, deletion_threshold, batch_size):
return 0
QueueItem.delete().where(QueueItem.id << to_delete).execute()
queue_item_deletes.inc(len(to_delete))
return len(to_delete)
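The counters and gauges above follow the usual prometheus_client pattern: declare the metric once at module scope with its labelnames, then select a child with .labels() and call .inc() with a numeric amount at the call site. A minimal, self-contained illustration of that pattern follows; the metric and label names are made up for the example and are not additional metrics from this commit.

from prometheus_client import Counter

# Declared once at import time, like queue_item_puts above.
demo_items_processed = Counter('demo_items_processed_total',
                               'number of items processed by the demo worker',
                               labelnames=['queue_name', 'status'])


def record_batch(queue_name, items):
    # .labels() returns the child series for this label combination; .inc()
    # takes a number, which is why the batch insert above uses len(current_batch).
    demo_items_processed.labels(queue_name, 'ok').inc(len(items))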

View file

@ -7,6 +7,8 @@ from collections import namedtuple
import bitmath
import resumablehashlib
from prometheus_client import Counter, Histogram
from data.registry_model import registry_model
from data.database import CloseForLongOperation, db_transaction
from digest import digest_tools
@ -18,6 +20,13 @@ from util.registry.torrent import PieceHasher
logger = logging.getLogger(__name__)
chunk_upload_duration = Histogram('quay_chunk_upload_duration_seconds',
'number of seconds for a chunk to be uploaded to the registry',
labelnames=['region'])
pushed_bytes_total = Counter('quay_registry_pushed_bytes_total',
'number of bytes pushed to the registry')
BLOB_CONTENT_TYPE = 'application/octet-stream'
@ -125,13 +134,10 @@ class _BlobUploadManager(object):
""" Returns the unique ID for the blob upload. """
return self.blob_upload.upload_id
def upload_chunk(self, app_config, input_fp, start_offset=0, length=-1, metric_queue=None):
def upload_chunk(self, app_config, input_fp, start_offset=0, length=-1):
""" Uploads a chunk of data found in the given input file-like interface. start_offset and
length are optional and should match a range header if any was given.
If metric_queue is given, the upload time and chunk size are written into the metrics in
the queue.
Returns the total number of bytes uploaded after this upload has completed. Raises
a BlobUploadException if the upload failed.
"""
@ -207,11 +213,8 @@ class _BlobUploadManager(object):
raise BlobUploadException(upload_error)
# Update the chunk upload time and push bytes metrics.
if metric_queue is not None:
metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[
length_written, list(location_set)[0]])
metric_queue.push_byte_count.Inc(length_written)
chunk_upload_duration.labels(list(location_set)[0]).observe(time.time() - start_time)
pushed_bytes_total.inc(length_written)
# Ensure we have not gone beyond the max layer size.
new_blob_bytes = self.blob_upload.byte_count + length_written

View file

@ -10,7 +10,7 @@ from flask_restful import Resource, abort, Api, reqparse
from flask_restful.utils.cors import crossdomain
from jsonschema import validate, ValidationError
from app import app, metric_queue, authentication
from app import app, authentication
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission,
AdministerRepositoryPermission, UserReadPermission,
UserAdminPermission)
@ -25,7 +25,7 @@ from endpoints.csrf import csrf_protect
from endpoints.exception import (Unauthorized, InvalidRequest, InvalidResponse,
FreshLoginRequired, NotFound)
from endpoints.decorators import check_anon_protection, require_xhr_from_browser, check_readonly
from util.metrics.metricqueue import time_decorator
from util.metrics.prometheus import timed_blueprint
from util.names import parse_namespace_repository
from util.pagination import encrypt_page_token, decrypt_page_token
from util.request import get_request_ip
@ -33,7 +33,7 @@ from __init__models_pre_oci import pre_oci_model as model
logger = logging.getLogger(__name__)
api_bp = Blueprint('api', __name__)
api_bp = timed_blueprint(Blueprint('api', __name__))
CROSS_DOMAIN_HEADERS = ['Authorization', 'Content-Type', 'X-Requested-With']
@ -46,10 +46,8 @@ class ApiExceptionHandlingApi(Api):
api = ApiExceptionHandlingApi()
api.init_app(api_bp)
api.decorators = [csrf_protect(),
crossdomain(origin='*', headers=CROSS_DOMAIN_HEADERS),
process_oauth, time_decorator(api_bp.name, metric_queue),
require_xhr_from_browser]
api.decorators = [csrf_protect(), crossdomain(origin='*', headers=CROSS_DOMAIN_HEADERS),
process_oauth, require_xhr_from_browser]
def resource(*urls, **kwargs):

View file

@ -5,15 +5,13 @@ from functools import wraps
from cnr.exception import Forbidden
from flask import Blueprint
from app import metric_queue
from auth.permissions import (AdministerRepositoryPermission, ReadRepositoryPermission,
ModifyRepositoryPermission)
from endpoints.appr.decorators import require_repo_permission
from util.metrics.metricqueue import time_blueprint
from util.metrics.prometheus import timed_blueprint
appr_bp = Blueprint('appr', __name__)
time_blueprint(appr_bp, metric_queue)
appr_bp = timed_blueprint(Blueprint('appr', __name__))
logger = logging.getLogger(__name__)

View file

@ -5,7 +5,7 @@ from datetime import datetime, timedelta
from flask import request
from app import app, dockerfile_build_queue, metric_queue
from app import app, dockerfile_build_queue
from data import model
from data.logs_model import logs_model
from data.database import db, RepositoryState
@ -97,10 +97,6 @@ def start_build(repository, prepared_build, pull_robot_name=None):
build_request.queue_id = queue_id
build_request.save()
# Add the queueing of the build to the metrics queue.
metric_queue.repository_build_queued.Inc(labelvalues=[repository.namespace_user.username,
repository.name])
# Add the build to the repo's log and spawn the build_queued notification.
event_log_metadata = {
'build_id': build_request.uuid,

13
endpoints/metrics.py Normal file
View file

@ -0,0 +1,13 @@
from prometheus_client import Counter
image_pulls = Counter('quay_registry_image_pulls_total',
'number of images that have been downloaded via the registry',
labelnames=['protocol', 'ref', 'status'])
image_pushes = Counter('quay_registry_image_pushes_total',
'number of images that have been uploaded via the registry',
labelnames=['protocol', 'status'])
image_pulled_bytes = Counter('quay_registry_image_pulled_bytes_total',
'number of bytes that have been downloaded via the registry',
labelnames=['protocol'])

View file

@ -6,14 +6,13 @@ from flask import Blueprint, make_response, jsonify
import features
from app import metric_queue, app
from app import app
from data.readreplica import ReadOnlyModeException
from endpoints.decorators import anon_protect, anon_allowed
from util.metrics.metricqueue import time_blueprint
from util.http import abort
from util.metrics.prometheus import timed_blueprint
v1_bp = Blueprint('v1', __name__)
time_blueprint(v1_bp, metric_queue)
v1_bp = timed_blueprint(Blueprint('v1', __name__))
logger = logging.getLogger(__name__)

View file

@ -6,7 +6,7 @@ from functools import wraps
from flask import request, make_response, jsonify, session
from app import userevents, metric_queue, storage, docker_v2_signing_key
from app import userevents, storage, docker_v2_signing_key
from auth.auth_context import get_authenticated_context, get_authenticated_user
from auth.credentials import validate_credentials, CredentialKind
from auth.decorators import process_auth
@ -19,6 +19,7 @@ from data.registry_model import registry_model
from data.registry_model.manifestbuilder import create_manifest_builder, lookup_manifest_builder
from endpoints.decorators import (anon_protect, anon_allowed, parse_repository_name,
check_repository_state, check_readonly)
from endpoints.metrics import image_pulls, image_pushes
from endpoints.v1 import v1_bp, check_v1_push_enabled
from notifications import spawn_notification
from util.audit import track_and_log
@ -250,11 +251,13 @@ def update_images(namespace_name, repo_name):
kind_filter='image')
if repository_ref is None:
# Make sure the repo actually exists.
image_pushes.labels('v1', 404).inc()
abort(404, message='Unknown repository', issue='unknown-repo')
builder = lookup_manifest_builder(repository_ref, session.get('manifest_builder'), storage,
docker_v2_signing_key)
if builder is None:
image_pushes.labels('v1', 400).inc()
abort(400)
# Generate a job for each notification that has been added to this repo
@ -267,9 +270,10 @@ def update_images(namespace_name, repo_name):
track_and_log('push_repo', repository_ref)
spawn_notification(repository_ref, 'repo_push', event_data)
metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v1', True])
image_pushes.labels('v1', 204).inc()
return make_response('Updated', 204)
image_pushes.labels('v1', 403).inc()
abort(403)
@ -287,6 +291,7 @@ def get_repository_images(namespace_name, repo_name):
if permission.can() or (repository_ref and repository_ref.is_public):
# We can't rely on permissions to tell us if a repo exists anymore
if repository_ref is None:
image_pulls.labels('v1', 'tag', 404).inc()
abort(404, message='Unknown repository', issue='unknown-repo')
logger.debug('Building repository image response')
@ -296,9 +301,10 @@ def get_repository_images(namespace_name, repo_name):
track_and_log('pull_repo', repository_ref,
analytics_name='pull_repo_100x',
analytics_sample=0.01)
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v1', True])
image_pulls.labels('v1', 'tag', 200).inc()
return resp
image_pulls.labels('v1', 'tag', 403).inc()
abort(403)

View file

@ -7,7 +7,7 @@ from time import time
from flask import make_response, request, session, Response, redirect, abort as flask_abort
from app import storage as store, app, docker_v2_signing_key, metric_queue
from app import storage as store, app, docker_v2_signing_key
from auth.auth_context import get_authenticated_user
from auth.decorators import extract_namespace_repo_from_session, process_auth
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission)
@ -16,14 +16,16 @@ from data.registry_model import registry_model
from data.registry_model.blobuploader import upload_blob, BlobUploadSettings, BlobUploadException
from data.registry_model.manifestbuilder import lookup_manifest_builder
from digest import checksums
from endpoints.metrics import image_pulled_bytes
from endpoints.v1 import v1_bp, check_v1_push_enabled
from endpoints.v1.index import ensure_namespace_enabled
from endpoints.decorators import (anon_protect, check_region_blacklisted, check_repository_state,
check_readonly)
from util.http import abort, exact_abort
from util.http import abort
from util.registry.replication import queue_storage_replication
from util.request import get_request_ip
logger = logging.getLogger(__name__)
@ -127,7 +129,7 @@ def get_image_layer(namespace, repository, image_id, headers):
abort(404, 'Image %(image_id)s not found', issue='unknown-image', image_id=image_id)
path = legacy_image.blob.storage_path
metric_queue.pull_byte_count.Inc(legacy_image.blob.compressed_size, labelvalues=['v1'])
image_pulled_bytes.labels('v1').inc(legacy_image.blob.compressed_size)
try:
logger.debug('Looking up the direct download URL for path: %s', path)

View file

@ -10,10 +10,10 @@ from semantic_version import Spec
import features
from app import app, metric_queue, get_app_url
from app import app, get_app_url
from auth.auth_context import get_authenticated_context
from auth.permissions import (
ReadRepositoryPermission, ModifyRepositoryPermission, AdministerRepositoryPermission)
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission,
AdministerRepositoryPermission)
from auth.registry_jwt_auth import process_registry_jwt_auth, get_auth_headers
from data.registry_model import registry_model
from data.readreplica import ReadOnlyModeException
@ -21,14 +21,15 @@ from endpoints.decorators import anon_protect, anon_allowed, route_show_if
from endpoints.v2.errors import (V2RegistryException, Unauthorized, Unsupported, NameUnknown,
ReadOnlyMode)
from util.http import abort
from util.metrics.metricqueue import time_blueprint
from util.metrics.prometheus import timed_blueprint
from util.registry.dockerver import docker_version
from util.pagination import encrypt_page_token, decrypt_page_token
logger = logging.getLogger(__name__)
v2_bp = Blueprint('v2', __name__)
time_blueprint(v2_bp, metric_queue)
v2_bp = timed_blueprint(Blueprint('v2', __name__))
@v2_bp.app_errorhandler(V2RegistryException)

View file

@ -3,7 +3,7 @@ import re
from flask import url_for, request, redirect, Response, abort as flask_abort
from app import storage, app, get_app_url, metric_queue, model_cache
from app import storage, app, get_app_url, model_cache
from auth.registry_jwt_auth import process_registry_jwt_auth
from auth.permissions import ReadRepositoryPermission
from data import database
@ -15,6 +15,7 @@ from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_
from digest import digest_tools
from endpoints.decorators import (anon_protect, anon_allowed, parse_repository_name,
check_region_blacklisted, check_readonly)
from endpoints.metrics import image_pulled_bytes
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
from endpoints.v2.errors import (
BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge,
@ -83,7 +84,7 @@ def download_blob(namespace_name, repo_name, digest):
if storage.get_supports_resumable_downloads(blob.placements):
headers['Accept-Ranges'] = 'bytes'
metric_queue.pull_byte_count.Inc(blob.compressed_size, labelvalues=['v2'])
image_pulled_bytes.labels('v2').inc(blob.compressed_size)
# Short-circuit by redirecting if the storage supports it.
path = blob.storage_path
@ -434,7 +435,7 @@ def _upload_chunk(blob_uploader, commit_digest=None):
try:
# Upload the data received.
blob_uploader.upload_chunk(app.config, input_fp, start_offset, length, metric_queue)
blob_uploader.upload_chunk(app.config, input_fp, start_offset, length)
if commit_digest is not None:
# Commit the upload to a blob.

View file

@ -6,12 +6,13 @@ from flask import request, url_for, Response
import features
from app import app, metric_queue, storage
from app import app, storage
from auth.registry_jwt_auth import process_registry_jwt_auth
from digest import digest_tools
from data.registry_model import registry_model
from data.model.oci.manifest import CreateManifestException
from endpoints.decorators import anon_protect, parse_repository_name, check_readonly
from endpoints.metrics import image_pulls, image_pushes
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write
from endpoints.v2.errors import (ManifestInvalid, ManifestUnknown, NameInvalid, TagExpired,
NameUnknown)
@ -41,6 +42,7 @@ MANIFEST_TAGNAME_ROUTE = BASE_MANIFEST_ROUTE.format(VALID_TAG_PATTERN)
def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repository_ref is None:
image_pulls.labels('v2_1', 'tag', 404).inc()
raise NameUnknown()
tag = registry_model.get_repo_tag(repository_ref, manifest_ref)
@ -49,23 +51,27 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
logger.debug('Found expired tag %s for repository %s/%s', manifest_ref, namespace_name,
repo_name)
msg = 'Tag %s was deleted or has expired. To pull, revive via time machine' % manifest_ref
image_pulls.labels('v2_1', 'tag', 404).inc()
raise TagExpired(msg)
image_pulls.labels('v2_1', 'tag', 404).inc()
raise ManifestUnknown()
manifest = registry_model.get_manifest_for_tag(tag, backfill_if_necessary=True)
if manifest is None:
# Something went wrong.
image_pulls.labels('v2_1', 'tag', 400).inc()
raise ManifestInvalid()
manifest_bytes, manifest_digest, manifest_media_type = _rewrite_schema_if_necessary(
namespace_name, repo_name, manifest_ref, manifest)
if manifest_bytes is None:
image_pulls.labels('v2_1', 'tag', 404).inc()
raise ManifestUnknown()
track_and_log('pull_repo', repository_ref, analytics_name='pull_repo_100x', analytics_sample=0.01,
tag=manifest_ref)
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True])
image_pulls.labels('v2_1', 'tag', 200).inc()
return Response(
manifest_bytes.as_unicode(),
@ -85,19 +91,22 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref):
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repository_ref is None:
image_pulls.labels('v2_1', 'manifest', 404).inc()
raise NameUnknown()
manifest = registry_model.lookup_manifest_by_digest(repository_ref, manifest_ref)
if manifest is None:
image_pulls.labels('v2_1', 'manifest', 404).inc()
raise ManifestUnknown()
manifest_bytes, manifest_digest, manifest_media_type = _rewrite_schema_if_necessary(
namespace_name, repo_name, '$digest', manifest)
if manifest_digest is None:
image_pulls.labels('v2_1', 'manifest', 404).inc()
raise ManifestUnknown()
track_and_log('pull_repo', repository_ref, manifest_digest=manifest_ref)
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True])
image_pulls.labels('v2_1', 'manifest', 200).inc()
return Response(manifest_bytes.as_unicode(), status=200, headers={
'Content-Type': manifest_media_type,
@ -180,6 +189,7 @@ def write_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
def write_manifest_by_digest(namespace_name, repo_name, manifest_ref):
parsed = _parse_manifest()
if parsed.digest != manifest_ref:
image_pushes.labels('v2_invalid', 400).inc()
raise ManifestInvalid(detail={'message': 'manifest digest mismatch'})
if parsed.schema_version != 2:
@ -190,14 +200,17 @@ def write_manifest_by_digest(namespace_name, repo_name, manifest_ref):
# manifest with a temporary tag, as it is being pushed as part of a call for a manifest list.
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repository_ref is None:
image_pushes.labels('v2_2', 404).inc()
raise NameUnknown()
expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
manifest = registry_model.create_manifest_with_temp_tag(repository_ref, parsed, expiration_sec,
storage)
if manifest is None:
image_pushes.labels('v2_2', 400).inc()
raise ManifestInvalid()
image_pushes.labels('v2_2', 202).inc()
return Response(
'OK',
status=202,
@ -271,7 +284,7 @@ def _write_manifest_and_log(namespace_name, repo_name, tag_name, manifest_impl):
track_and_log('push_repo', repository_ref, tag=tag_name)
spawn_notification(repository_ref, 'repo_push', {'updated_tags': [tag_name]})
metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v2', True])
image_pushes.labels('v2_1', 202).inc()
return Response(
'OK',

View file

@ -3,11 +3,14 @@ import json
import logging
import uuid
from functools import wraps
from flask import redirect, Blueprint, abort, send_file, make_response, request
from prometheus_client import Counter
import features
from app import app, signer, storage, metric_queue, config_provider, ip_resolver, instance_keys
from app import app, signer, storage, config_provider, ip_resolver, instance_keys
from auth.auth_context import get_authenticated_user
from auth.decorators import process_auth
from auth.permissions import ReadRepositoryPermission
@ -16,6 +19,7 @@ from data import model
from data.registry_model import registry_model
from endpoints.decorators import (anon_protect, anon_allowed, route_show_if, parse_repository_name,
check_region_blacklisted)
from endpoints.metrics import image_pulls, image_pulled_bytes
from endpoints.v2.blob import BLOB_DIGEST_ROUTE
from image.appc import AppCImageFormatter
from image.docker import ManifestException
@ -32,6 +36,10 @@ from util.registry.torrent import (make_torrent, per_user_torrent_filename, publ
logger = logging.getLogger(__name__)
verb_stream_passes = Counter('quay_verb_stream_passes_total',
'number of passes over a tar stream used by verb requests',
labelnames=['kind'])
verbs = Blueprint('verbs', __name__)
LAYER_MIMETYPE = 'binary/octet-stream'
@ -42,7 +50,7 @@ class VerbReporter(TarLayerFormatterReporter):
self.kind = kind
def report_pass(self, pass_count):
metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count])
verb_stream_passes.labels(self.kind).inc(pass_count)
def _open_stream(formatter, tag, schema1_manifest, derived_image_id, handlers, reporter):
@ -65,7 +73,7 @@ def _open_stream(formatter, tag, schema1_manifest, derived_image_id, handlers, r
def tar_stream_getter_iterator():
# Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
store = Storage(app, config_provider=config_provider, ip_resolver=ip_resolver)
# Note: We reverse because we have to start at the leaf layer and move upward,
# as per the spec for the formatters.
@ -112,7 +120,7 @@ def _write_derived_image_to_storage(verb, derived_image, queue_file):
queue_file.add_exception_handler(handle_exception)
# Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
store = Storage(app, config_provider=config_provider, ip_resolver=ip_resolver)
try:
store.stream_write(derived_image.blob.placements, derived_image.blob.storage_path, queue_file)
@ -293,12 +301,10 @@ def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, che
# Check for torrent. If found, we return a torrent for the repo verb image (if the derived
# image already exists).
if request.accept_mimetypes.best == 'application/x-bittorrent':
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb + '+torrent', True])
return _torrent_repo_verb(repo, tag, manifest, verb, **kwargs)
# Log the action.
track_and_log('repo_verb', wrap_repository(repo), tag=tag.name, verb=verb, **kwargs)
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb, True])
is_readonly = app.config.get('REGISTRY_STATE', 'normal') == 'readonly'
@ -321,7 +327,7 @@ def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, che
logger.debug('Derived %s image %s exists in storage', verb, derived_image)
is_head_request = request.method == 'HEAD'
metric_queue.pull_byte_count.Inc(derived_image.blob.compressed_size, labelvalues=[verb])
image_pulled_bytes.labels('bittorrent').inc(derived_image.blob.compressed_size)
download_url = storage.get_direct_download_url(derived_image.blob.placements,
derived_image.blob.storage_path,
@ -435,10 +441,25 @@ def os_arch_checker(os, arch):
return checker
def observe_route(protocol):
"""
Decorates a registry verb route to record the image_pulls metric in Prometheus.
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
rv = func(*args, **kwargs)
image_pulls.labels(protocol, 'tag', rv.status_code).inc()
return rv
return wrapper
return decorator
@route_show_if(features.ACI_CONVERSION)
@anon_protect
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/sig/<os>/<arch>/', methods=['GET'])
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci.asc/<os>/<arch>/', methods=['GET'])
@observe_route('aci')
@process_auth
def get_aci_signature(server, namespace, repository, tag, os, arch):
return _repo_verb_signature(namespace, repository, tag, 'aci', checker=os_arch_checker(os, arch),
@ -449,6 +470,7 @@ def get_aci_signature(server, namespace, repository, tag, os, arch):
@anon_protect
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci/<os>/<arch>/', methods=[
'GET', 'HEAD'])
@observe_route('aci')
@process_auth
def get_aci_image(server, namespace, repository, tag, os, arch):
return _repo_verb(namespace, repository, tag, 'aci',
@ -458,6 +480,7 @@ def get_aci_image(server, namespace, repository, tag, os, arch):
@anon_protect
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
@observe_route('squash')
@process_auth
def get_squashed_tag(namespace, repository, tag):
return _repo_verb(namespace, repository, tag, 'squash', SquashedDockerImageFormatter())
@ -466,6 +489,7 @@ def get_squashed_tag(namespace, repository, tag):
@route_show_if(features.BITTORRENT)
@anon_protect
@verbs.route('/torrent{0}'.format(BLOB_DIGEST_ROUTE), methods=['GET'])
@observe_route('bittorrent')
@process_auth
@parse_repository_name()
@check_region_blacklisted(namespace_name_kwarg='namespace_name')
@ -493,7 +517,6 @@ def get_tag_torrent(namespace_name, repo_name, digest):
if blob is None:
abort(404)
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'torrent', True])
return _torrent_for_blob(blob, repo_is_public)

View file

@ -38,11 +38,6 @@ initdb)
fulldbtest)
d bash /src/quay/test/fulldbtest.sh
;;
prom)
R=quay.io/quay/prom-monitor
docker build -t $R prom_aggregator
docker run --rm -it --net=host $R -loglevel=debug
;;
*)
echo "unknown option"
exit 1

View file

@ -16,7 +16,7 @@ disable=missing-docstring,invalid-name,too-many-locals,too-few-public-methods,to
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis
ignored-modules=features
ignored-modules=features,greenlet
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E0201 when accessed. Python regular

View file

@ -2,6 +2,8 @@
FROM phusion/baseimage:0.10.0
ENV OS linux
ENV ARCH amd64
ENV DEBIAN_FRONTEND noninteractive
ENV HOME /root
ENV QUAYDIR /quay-registry
@ -70,12 +72,17 @@ RUN curl -O https://storage.googleapis.com/golang/go1.10.linux-amd64.tar.gz && \
rm -rf /gocode && rm -rf /usr/local/go
# Install jwtproxy
RUN curl -L -o /usr/local/bin/jwtproxy https://github.com/coreos/jwtproxy/releases/download/v0.0.1/jwtproxy-linux-x64 \
&& chmod +x /usr/local/bin/jwtproxy
ENV JWTPROXY_VERSION=0.0.3
RUN curl -fsSL -o /usr/local/bin/jwtproxy "https://github.com/coreos/jwtproxy/releases/download/v${JWTPROXY_VERSION}/jwtproxy-${OS}-${ARCH}" && \
chmod +x /usr/local/bin/jwtproxy
# Install prometheus-aggregator
RUN curl -L -o /usr/local/bin/prometheus-aggregator https://github.com/coreos/prometheus-aggregator/releases/download/v0.0.1-alpha/prometheus-aggregator \
&& chmod +x /usr/local/bin/prometheus-aggregator
# Install pushgateway
ENV PUSHGATEWAY_VERSION=1.0.0
RUN curl -fsSL "https://github.com/prometheus/pushgateway/releases/download/v${PUSHGATEWAY_VERSION}/pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}.tar.gz" | \
tar xz "pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}/pushgateway" && \
mv "pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}/pushgateway" /usr/local/bin/pushgateway && \
rm -rf "pushgateway-${PUSHGATEWAY_VERSION}.${OS}-${ARCH}" && \
chmod +x /usr/local/bin/pushgateway
# Install python dependencies
COPY requirements.txt requirements-tests.txt ./

View file

@ -75,9 +75,9 @@ case "$QUAYENTRY" in
echo "Entering repository mirroring mode"
if [ -z "${QUAY_SERVICES}" ]
then
export QUAY_SERVICES=repomirrorworker,prometheus-aggregator
export QUAY_SERVICES=repomirrorworker,pushgateway
else
export QUAY_SERVICES=${QUAY_SERVICES},repomirrorworker,prometheus-aggregator
export QUAY_SERVICES=${QUAY_SERVICES},repomirrorworker,pushgateway
fi
;&
"registry")

View file

@ -22,42 +22,41 @@ STORAGE_DRIVER_CLASSES = {
}
def get_storage_driver(location, metric_queue, chunk_cleanup_queue, config_provider, ip_resolver,
def get_storage_driver(location, chunk_cleanup_queue, config_provider, ip_resolver,
storage_params):
""" Returns a storage driver class for the given storage configuration
(a pair of string name and a dict of parameters). """
driver = storage_params[0]
parameters = storage_params[1]
driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
context = StorageContext(location, metric_queue, chunk_cleanup_queue, config_provider,
context = StorageContext(location, chunk_cleanup_queue, config_provider,
ip_resolver)
return driver_class(context, **parameters)
class StorageContext(object):
def __init__(self, location, metric_queue, chunk_cleanup_queue, config_provider, ip_resolver):
def __init__(self, location, chunk_cleanup_queue, config_provider, ip_resolver):
self.location = location
self.metric_queue = metric_queue
self.chunk_cleanup_queue = chunk_cleanup_queue
self.config_provider = config_provider
self.ip_resolver = ip_resolver or NoopIPResolver()
class Storage(object):
def __init__(self, app=None, metric_queue=None, chunk_cleanup_queue=None, instance_keys=None,
def __init__(self, app=None, chunk_cleanup_queue=None, instance_keys=None,
config_provider=None, ip_resolver=None):
self.app = app
if app is not None:
self.state = self.init_app(app, metric_queue, chunk_cleanup_queue, instance_keys,
self.state = self.init_app(app, chunk_cleanup_queue, instance_keys,
config_provider, ip_resolver)
else:
self.state = None
def init_app(self, app, metric_queue, chunk_cleanup_queue, instance_keys, config_provider,
def init_app(self, app, chunk_cleanup_queue, instance_keys, config_provider,
ip_resolver):
storages = {}
for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
storages[location] = get_storage_driver(location, metric_queue, chunk_cleanup_queue,
storages[location] = get_storage_driver(location, chunk_cleanup_queue,
config_provider, ip_resolver, storage_params)
preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)

View file

@ -3,27 +3,26 @@ import os
import logging
import copy
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cachetools.func import lru_cache
from itertools import chain
from collections import namedtuple
from datetime import datetime, timedelta
from io import BufferedIOBase
from itertools import chain
from uuid import uuid4
from botocore.signers import CloudFrontSigner
from boto.exception import S3ResponseError
import boto.s3.connection
import boto.s3.multipart
import boto.gs.connection
import boto.s3.key
import boto.gs.key
from io import BufferedIOBase
from uuid import uuid4
from collections import namedtuple
from boto.exception import S3ResponseError
from botocore.signers import CloudFrontSigner
from cachetools.func import lru_cache
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from prometheus_client import Counter
from util.registry import filelike
from storage.basestorage import BaseStorageV2
@ -31,6 +30,13 @@ from storage.basestorage import BaseStorageV2
logger = logging.getLogger(__name__)
multipart_uploads_started = Counter('quay_multipart_uploads_started_total',
'number of multipart uploads to Quay storage that started')
multipart_uploads_completed = Counter('quay_multipart_uploads_completed_total',
'number of multipart uploads to Quay storage that completed')
_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
_CHUNKS_KEY = 'chunks'
@ -181,8 +187,7 @@ class _CloudStorage(BaseStorageV2):
if content_encoding is not None:
metadata['Content-Encoding'] = content_encoding
if self._context.metric_queue is not None:
self._context.metric_queue.multipart_upload_start.Inc()
multipart_uploads_started.inc()
return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
**self._upload_params)
@ -237,8 +242,7 @@ class _CloudStorage(BaseStorageV2):
logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e)
write_error = e
if self._context.metric_queue is not None:
self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
multipart_uploads_completed.inc()
if cancel_on_error:
try:
@ -251,8 +255,7 @@ class _CloudStorage(BaseStorageV2):
break
if total_bytes_written > 0:
if self._context.metric_queue is not None:
self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['success'])
multipart_uploads_completed.inc()
self._perform_action_with_retry(mp.complete_upload)

View file

@ -18,7 +18,7 @@ _TEST_BUCKET = 'some_bucket'
_TEST_USER = 'someuser'
_TEST_PASSWORD = 'somepassword'
_TEST_PATH = 'some/cool/path'
_TEST_CONTEXT = StorageContext('nyc', None, None, None, None)
_TEST_CONTEXT = StorageContext('nyc', None, None, None)
@pytest.fixture(scope='function')

View file

@ -44,7 +44,7 @@ def test_direct_download(test_aws_ip, test_empty_ip_range_cache, test_ip_range_c
if ipranges_populated:
ipresolver.sync_token = test_ip_range_cache['sync_token'] if ipranges_populated else test_empty_ip_range_cache['sync_token']
ipresolver.amazon_ranges = test_ip_range_cache['all_amazon'] if ipranges_populated else test_empty_ip_range_cache['all_amazon']
context = StorageContext('nyc', None, None, config_provider, ipresolver)
context = StorageContext('nyc', None, config_provider, ipresolver)
# Create a test bucket and put some test content.
boto.connect_s3().create_bucket(_TEST_BUCKET)
@ -68,7 +68,7 @@ def test_direct_download(test_aws_ip, test_empty_ip_range_cache, test_ip_range_c
@mock_s3
def test_direct_download_no_ip(test_aws_ip, aws_ip_range_data, ipranges_populated, app):
ipresolver = IPResolver(app)
context = StorageContext('nyc', None, None, config_provider, ipresolver)
context = StorageContext('nyc', None, config_provider, ipresolver)
# Create a test bucket and put some test content.
boto.connect_s3().create_bucket(_TEST_BUCKET)

View file

@ -11,7 +11,7 @@ from storage.swift import SwiftStorage, _EMPTY_SEGMENTS_KEY
from swiftclient.client import ClientException
base_args = {
'context': StorageContext('nyc', None, None, None, None),
'context': StorageContext('nyc', None, None, None),
'swift_container': 'container-name',
'storage_path': '/basepath',
'auth_url': 'https://auth.com',
@ -265,7 +265,7 @@ def test_cancel_chunked_upload():
chunk_cleanup_queue = FakeQueue()
args = dict(base_args)
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue, None, None)
args['context'] = StorageContext('nyc', chunk_cleanup_queue, None, None)
swift = FakeSwiftStorage(**args)
uuid, metadata = swift.initiate_chunked_upload()
@ -288,7 +288,7 @@ def test_cancel_chunked_upload():
def test_empty_chunks_queued_for_deletion():
chunk_cleanup_queue = FakeQueue()
args = dict(base_args)
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue, None, None)
args['context'] = StorageContext('nyc', chunk_cleanup_queue, None, None)
swift = FakeSwiftStorage(**args)
uuid, metadata = swift.initiate_chunked_upload()

View file

@ -31,7 +31,7 @@ from jwkest.jwk import RSAKey
import endpoints.decorated # required for side effect
from app import app, storage, instance_keys, get_app_url, metric_queue
from app import app, storage, instance_keys, get_app_url
from data.database import close_db_filter, configure, DerivedStorageForImage, QueueItem, Image
from data import model
from digest.checksums import compute_simple
@ -2228,7 +2228,7 @@ class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, Base
encoded = response.json()['token']
header = 'Bearer ' + encoded
payload = decode_bearer_header(header, instance_keys, app.config, metric_queue=metric_queue)
payload = decode_bearer_header(header, instance_keys, app.config)
self.assertIsNotNone(payload)
if scope is None:

View file

@ -78,7 +78,7 @@ class TestConfig(DefaultConfig):
INSTANCE_SERVICE_KEY_KID_LOCATION = 'test/data/test.kid'
INSTANCE_SERVICE_KEY_LOCATION = 'test/data/test.pem'
PROMETHEUS_AGGREGATOR_URL = None
PROMETHEUS_PUSHGATEWAY_URL = None
GITHUB_LOGIN_CONFIG = {}
GOOGLE_LOGIN_CONFIG = {}

View file

@ -62,7 +62,7 @@ INTERNAL_ONLY_PROPERTIES = {
'TUF_GUN_PREFIX',
'LOGGING_LEVEL',
'SIGNED_GRANT_EXPIRATION_SEC',
'PROMETHEUS_AGGREGATOR_URL',
'PROMETHEUS_PUSHGATEWAY_URL',
'DB_TRANSACTION_FACTORY',
'NOTIFICATION_SEND_TIMEOUT',
'QUEUE_METRICS_TYPE',

64
util/greenlet_tracing.py Normal file
View file

@ -0,0 +1,64 @@
from time import time
from gevent.hub import get_hub
from greenlet import settrace
from prometheus_client import Counter, Histogram
greenlet_switch = Counter('greenlet_switch_total', 'number of greenlet context switches')
greenlet_throw = Counter('greenlet_throw_total', 'number of greenlet throws')
greenlet_duration = Histogram('greenlet_duration_seconds',
'seconds in which a particular greenlet is executing',
buckets=[.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0])
_latest_switch = None
def enable_tracing():
settrace(greenlet_callback)
def greenlet_callback(event, args):
"""
This is a callback that is executed by greenlet on all events.
"""
if event in ('switch', 'throw'):
# It's only safe to unpack args under these two events.
(origin, _target) = args
if origin is get_hub():
# This greenlet is the one that manages the loop itself, thus noop.
return
if event == 'switch':
switch_callback(args)
return
if event == 'throw':
throw_callback(args)
return
def switch_callback(_args):
"""
This is a callback that is executed specifically on greenlet switches.
"""
global _latest_switch
greenlet_switch.inc()
if _latest_switch is None:
# This is the first switch.
_latest_switch = time()
return
now = time()
greenlet_duration.observe(now - _latest_switch)
_latest_switch = now
def throw_callback(_args):
"""
This is a callback that is executed on exceptions thrown from the origin greenlet to the target greenlet.
This callback runs in the context of the target greenlet, and any exception it raises will replace
the original, as if target.throw() had been used with the replacement exception.
"""
greenlet_throw.inc()
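The GREENLET_TRACING flag added to DefaultConfig presumably gates a call to enable_tracing() during application start-up; that call site is outside this section, so the following is only a hedged sketch of the wiring.

# Hypothetical wiring; the committed call site is not shown in this section.
from app import app
from util.greenlet_tracing import enable_tracing

if app.config.get('GREENLET_TRACING', False):
    # Installs the greenlet.settrace() callback defined above, which feeds the
    # greenlet_switch/greenlet_throw counters and the greenlet_duration histogram.
    enable_tracing()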

View file

@ -1,210 +0,0 @@
import datetime
import logging
import time
from functools import wraps
from Queue import Queue, Full
from flask import g, request
from trollius import Return
logger = logging.getLogger(__name__)
# Buckets for the API response times.
API_RESPONSE_TIME_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]
# Buckets for the builder start times.
BUILDER_START_TIME_BUCKETS = [.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 180.0, 240.0, 300.0, 600.0]
class MetricQueue(object):
""" Object to which various metrics are written, for distribution to metrics collection
system(s) such as Prometheus.
"""
def __init__(self, prom):
# Define the various exported metrics.
self.resp_time = prom.create_histogram('response_time', 'HTTP response time in seconds',
labelnames=['endpoint'],
buckets=API_RESPONSE_TIME_BUCKETS)
self.resp_code = prom.create_counter('response_code', 'HTTP response code',
labelnames=['endpoint', 'code'])
self.non_200 = prom.create_counter('response_non200', 'Non-200 HTTP response codes',
labelnames=['endpoint'])
self.error_500 = prom.create_counter('response_500', '5XX HTTP response codes',
labelnames=['endpoint'])
self.multipart_upload_start = prom.create_counter('multipart_upload_start',
'Multipart upload started')
self.multipart_upload_end = prom.create_counter('multipart_upload_end',
'Multipart upload ends.', labelnames=['type'])
self.build_capacity_shortage = prom.create_gauge('build_capacity_shortage',
'Build capacity shortage.')
self.builder_time_to_start = prom.create_histogram('builder_tts',
'Time from triggering to starting a builder.',
labelnames=['builder_type'],
buckets=BUILDER_START_TIME_BUCKETS)
self.builder_time_to_build = prom.create_histogram('builder_ttb',
'Time from triggering to actually starting a build',
labelnames=['builder_type'],
buckets=BUILDER_START_TIME_BUCKETS)
self.build_time = prom.create_histogram('build_time', 'Time spent building', labelnames=['builder_type'])
self.builder_fallback = prom.create_counter('builder_fallback', 'Builder fell back to secondary executor')
self.build_start_success = prom.create_counter('build_start_success', 'Executor succeeded in starting a build', labelnames=['builder_type'])
self.build_start_failure = prom.create_counter('build_start_failure', 'Executor failed to start a build', labelnames=['builder_type'])
self.percent_building = prom.create_gauge('build_percent_building', 'Percent building.')
self.build_counter = prom.create_counter('builds', 'Number of builds', labelnames=['name'])
self.ephemeral_build_workers = prom.create_counter('ephemeral_build_workers',
'Number of started ephemeral build workers')
self.ephemeral_build_worker_failure = prom.create_counter('ephemeral_build_worker_failure',
'Number of failed-to-start ephemeral build workers')
self.work_queue_running = prom.create_gauge('work_queue_running', 'Running items in a queue',
labelnames=['queue_name'])
self.work_queue_available = prom.create_gauge('work_queue_available',
'Available items in a queue',
labelnames=['queue_name'])
self.work_queue_available_not_running = prom.create_gauge('work_queue_available_not_running',
'Available items that are not yet running',
labelnames=['queue_name'])
self.repository_pull = prom.create_counter('repository_pull', 'Repository Pull Count',
labelnames=['namespace', 'repo_name', 'protocol',
'status'])
self.repository_push = prom.create_counter('repository_push', 'Repository Push Count',
labelnames=['namespace', 'repo_name', 'protocol',
'status'])
self.repository_build_queued = prom.create_counter('repository_build_queued',
'Repository Build Queued Count',
labelnames=['namespace', 'repo_name'])
self.repository_build_completed = prom.create_counter('repository_build_completed',
'Repository Build Complete Count',
labelnames=['namespace', 'repo_name',
'status', 'executor'])
self.chunk_size = prom.create_histogram('chunk_size',
'Registry blob chunk size',
labelnames=['storage_region'])
self.chunk_upload_time = prom.create_histogram('chunk_upload_time',
'Registry blob chunk upload time',
labelnames=['storage_region'])
self.authentication_count = prom.create_counter('authentication_count',
'Authentication count',
labelnames=['kind', 'status'])
self.repository_count = prom.create_gauge('repository_count', 'Number of repositories')
self.user_count = prom.create_gauge('user_count', 'Number of users')
self.org_count = prom.create_gauge('org_count', 'Number of Organizations')
self.robot_count = prom.create_gauge('robot_count', 'Number of robot accounts')
self.instance_key_renewal_success = prom.create_counter('instance_key_renewal_success',
'Instance Key Renewal Success Count',
labelnames=['key_id'])
self.instance_key_renewal_failure = prom.create_counter('instance_key_renewal_failure',
'Instance Key Renewal Failure Count',
labelnames=['key_id'])
self.invalid_instance_key_count = prom.create_counter('invalid_registry_instance_key_count',
'Invalid registry instance key count',
labelnames=['key_id'])
self.verb_action_passes = prom.create_counter('verb_action_passes', 'Verb Pass Count',
labelnames=['kind', 'pass_count'])
self.push_byte_count = prom.create_counter('registry_push_byte_count',
'Number of bytes pushed to the registry')
self.pull_byte_count = prom.create_counter('estimated_registry_pull_byte_count',
'Number of (estimated) bytes pulled from the registry',
labelnames=['protocol_version'])
# Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another
# provider.
self._queue = None
def enable_deprecated(self, maxsize=10000):
self._queue = Queue(maxsize)
def put_deprecated(self, name, value, **kwargs):
if self._queue is None:
logger.debug('No metric queue %s %s %s', name, value, kwargs)
return
try:
kwargs.setdefault('timestamp', datetime.datetime.now())
kwargs.setdefault('dimensions', {})
self._queue.put_nowait((name, value, kwargs))
except Full:
logger.error('Metric queue full')
def get_deprecated(self):
return self._queue.get()
def get_nowait_deprecated(self):
return self._queue.get_nowait()
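# Illustrative sketch only (not part of this commit): assuming `metric_queue` is an instance of
# this MetricQueue class, the deprecated in-memory path would be exercised roughly as follows:
#   metric_queue.enable_deprecated()
#   metric_queue.put_deprecated('build_failure', 1, dimensions={'executor': 'ec2'})
# The 'timestamp' and 'dimensions' keyword arguments default inside put_deprecated as shown above;
# queued samples are then drained with get_deprecated() or get_nowait_deprecated().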
def duration_collector_async(metric, labelvalues):
""" Decorates a method to have its duration time logged to the metric. """
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
trigger_time = time.time()
try:
rv = func(*args, **kwargs)
except Return as e:
metric.Observe(time.time() - trigger_time, labelvalues=labelvalues)
raise e
return rv
return wrapper
return decorator
def time_decorator(name, metric_queue):
""" Decorates an endpoint method to have its request time logged to the metrics queue. """
after = _time_after_request(name, metric_queue)
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
_time_before_request()
rv = func(*args, **kwargs)
after(rv)
return rv
return wrapper
return decorator
def time_blueprint(bp, metric_queue):
""" Decorates a blueprint to have its request time logged to the metrics queue. """
bp.before_request(_time_before_request)
bp.after_request(_time_after_request(bp.name, metric_queue))
def _time_before_request():
g._request_start_time = time.time()
def _time_after_request(name, metric_queue):
def f(r):
start = getattr(g, '_request_start_time', None)
if start is None:
return r
dur = time.time() - start
metric_queue.resp_time.Observe(dur, labelvalues=[request.endpoint])
metric_queue.resp_code.Inc(labelvalues=[request.endpoint, r.status_code])
if r.status_code >= 500:
metric_queue.error_500.Inc(labelvalues=[request.endpoint])
elif r.status_code < 200 or r.status_code >= 300:
metric_queue.non_200.Inc(labelvalues=[request.endpoint])
return r
return f
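For context, a minimal usage sketch (not part of this commit) of the decorators above, assuming `metric_queue` is an already-constructed MetricQueue instance wired up elsewhere in the application; the blueprint and endpoint names are hypothetical:

from flask import Blueprint, Flask, make_response

app = Flask(__name__)
api = Blueprint('api', __name__)

# Time every request handled by the blueprint; _time_after_request above updates the
# resp_time, resp_code, error_500 and non_200 collectors on the metric queue.
time_blueprint(api, metric_queue)

@api.route('/_ping')
def ping():
    # Returning a Response object keeps r.status_code available to the after-request hook.
    return make_response('pong', 200)

# A single view can instead be wrapped with time_decorator('api.ping', metric_queue).
app.register_blueprint(api)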

View file

@@ -1,18 +1,23 @@
import datetime
import json
import logging
import time
import threading
from Queue import Queue, Full, Empty
from threading import Thread
import requests
from flask import g, request
from prometheus_client import push_to_gateway, REGISTRY, Histogram
logger = logging.getLogger(__name__)
QUEUE_MAX = 1000
MAX_BATCH_SIZE = 100
REGISTER_WAIT = datetime.timedelta(hours=1)
request_duration = Histogram('quay_request_duration_seconds',
'seconds taken to process a request',
labelnames=['method', 'endpoint', 'status'],
buckets=[.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0])
PROMETHEUS_PUSH_INTERVAL_SECONDS = 30
ONE_DAY_IN_SECONDS = 60 * 60 * 24
class PrometheusPlugin(object):
""" Application plugin for reporting metrics to Prometheus. """
@@ -24,145 +29,54 @@ class PrometheusPlugin(object):
self.state = None
def init_app(self, app):
prom_url = app.config.get('PROMETHEUS_AGGREGATOR_URL')
prom_namespace = app.config.get('PROMETHEUS_NAMESPACE')
logger.debug('Initializing prometheus with aggregator url: %s', prom_url)
prometheus = Prometheus(prom_url, prom_namespace)
pusher = ThreadPusher(app)
pusher.start()
# register extension with app
app.extensions = getattr(app, 'extensions', {})
app.extensions['prometheus'] = prometheus
return prometheus
app.extensions['prometheus'] = pusher
return pusher
def __getattr__(self, name):
return getattr(self.state, name, None)
class Prometheus(object):
""" Aggregator for collecting stats that are reported to Prometheus. """
def __init__(self, url=None, namespace=None):
self._metric_collectors = []
self._url = url
self._namespace = namespace or ''
if url is not None:
self._queue = Queue(QUEUE_MAX)
self._sender = _QueueSender(self._queue, url, self._metric_collectors)
self._sender.start()
logger.debug('Prometheus aggregator sending to %s', url)
else:
self._queue = None
logger.debug('Prometheus aggregator disabled')
def enqueue(self, call, data):
if not self._queue:
return
v = json.dumps({
'Call': call,
'Data': data,
})
if call == 'register':
self._metric_collectors.append(v)
return
try:
self._queue.put_nowait(v)
except Full:
# If the queue is full, it is because 1) no aggregator was enabled or 2)
# the aggregator is taking a long time to respond to requests. In the case
# of 1, it's probably enterprise mode and we don't care. In the case of 2,
# the response timeout error is printed inside the queue handler. In either case,
# we don't need to print an error here.
pass
def create_gauge(self, *args, **kwargs):
return self._create_collector('Gauge', args, kwargs)
def create_counter(self, *args, **kwargs):
return self._create_collector('Counter', args, kwargs)
def create_summary(self, *args, **kwargs):
return self._create_collector('Summary', args, kwargs)
def create_histogram(self, *args, **kwargs):
return self._create_collector('Histogram', args, kwargs)
def create_untyped(self, *args, **kwargs):
return self._create_collector('Untyped', args, kwargs)
def _create_collector(self, collector_type, args, kwargs):
kwargs['namespace'] = kwargs.get('namespace', self._namespace)
return _Collector(self.enqueue, collector_type, *args, **kwargs)
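# Illustrative sketch only (not part of this commit): a hypothetical caller would construct the
# aggregator and register collectors through it, e.g.
#   prom = Prometheus('http://localhost:9092', namespace='quay')            # hypothetical URL
#   login_counter = prom.create_counter('logins', 'Login attempts', labelnames=['status'])
#   login_counter.Inc(1, labelvalues=['success'])  # proxied by _Collector.__getattr__ below
# Each call is serialized to JSON and enqueued for the _QueueSender thread to POST to the
# aggregator.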
class _QueueSender(Thread):
""" Helper class which uses a thread to asynchronously send metrics to the local Prometheus
aggregator. """
def __init__(self, queue, url, metric_collectors):
Thread.__init__(self)
class ThreadPusher(threading.Thread):
def __init__(self, app):
super(ThreadPusher, self).__init__()
self.daemon = True
self.next_register = datetime.datetime.now()
self._queue = queue
self._url = url
self._metric_collectors = metric_collectors
self._app = app
def run(self):
agg_url = self._app.config.get('PROMETHEUS_PUSHGATEWAY_URL')
while True:
reqs = []
reqs.append(self._queue.get())
if agg_url is None:
# Effectively disable this worker if there is no pushgateway configured.
time.sleep(ONE_DAY_IN_SECONDS)
continue
while len(reqs) < MAX_BATCH_SIZE:
try:
req = self._queue.get_nowait()
reqs.append(req)
except Empty:
break
try:
resp = requests.post(self._url + '/call', '\n'.join(reqs))
if resp.status_code == 500 and self.next_register <= datetime.datetime.now():
resp = requests.post(self._url + '/call', '\n'.join(self._metric_collectors))
self.next_register = datetime.datetime.now() + REGISTER_WAIT
logger.debug('Register returned %s for %s metrics; setting next to %s', resp.status_code,
len(self._metric_collectors), self.next_register)
elif resp.status_code != 200:
logger.debug('Failed sending to prometheus: %s: %s: %s', resp.status_code, resp.text,
', '.join(reqs))
else:
logger.debug('Sent %d prometheus metrics', len(reqs))
except:
logger.exception('Failed to write to prometheus aggregator: %s', reqs)
time.sleep(PROMETHEUS_PUSH_INTERVAL_SECONDS)
push_to_gateway(agg_url, job=self._app.config.get('PROMETHEUS_NAMESPACE', 'quay'),
registry=REGISTRY)
class _Collector(object):
""" Collector for a Prometheus metric. """
def __init__(self, enqueue_method, collector_type, collector_name, collector_help,
namespace='', subsystem='', **kwargs):
self._enqueue_method = enqueue_method
self._base_args = {
'Name': collector_name,
'Namespace': namespace,
'Subsystem': subsystem,
'Type': collector_type,
}
def timed_blueprint(bp):
"""
Decorates a blueprint to have its request duration tracked by Prometheus.
"""
def _time_before_request():
g._request_start_time = time.time()
bp.before_request(_time_before_request)
registration_params = dict(kwargs)
registration_params.update(self._base_args)
registration_params['Help'] = collector_help
self._enqueue_method('register', registration_params)
def __getattr__(self, method):
def f(value=0, labelvalues=()):
data = dict(self._base_args)
data.update({
'Value': value,
'LabelValues': [str(i) for i in labelvalues],
'Method': method,
})
self._enqueue_method('put', data)
def _time_after_request():
def f(r):
start = getattr(g, '_request_start_time', None)
if start is None:
return r
dur = time.time() - start
request_duration.labels(request.method, request.endpoint, r.status_code).observe(dur)
return r
return f
bp.after_request(_time_after_request())
return bp
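By way of comparison, a minimal sketch (not part of this commit) of the new push-based flow, assuming a pushgateway is reachable at a hypothetical local address and that PrometheusPlugin can be constructed without arguments and wired via init_app (its full constructor lies outside the shown hunk):

from flask import Blueprint, Flask

app = Flask(__name__)
app.config['PROMETHEUS_PUSHGATEWAY_URL'] = 'http://localhost:9091'  # hypothetical address
app.config['PROMETHEUS_NAMESPACE'] = 'quay'

prometheus = PrometheusPlugin()
prometheus.init_app(app)  # starts the daemon ThreadPusher, which pushes REGISTRY every 30 seconds

api = Blueprint('api', __name__)
timed_blueprint(api)  # records quay_request_duration_seconds for each request on this blueprint
app.register_blueprint(api)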

Some files were not shown because too many files have changed in this diff.