Merge pull request #2909 from coreos-inc/joseph.schorr/QS-56/bad-service-key
Better handling of instance key failure in Quay
This commit is contained in:
commit
0486fe6314
10 changed files with 99 additions and 21 deletions
|
@ -6,7 +6,7 @@ from jsonschema import validate, ValidationError
|
|||
from flask import request, url_for
|
||||
from flask_principal import identity_changed, Identity
|
||||
|
||||
from app import app, get_app_url, instance_keys
|
||||
from app import app, get_app_url, instance_keys, metric_queue
|
||||
from auth.auth_context import (set_grant_context, get_grant_context)
|
||||
from auth.permissions import repository_read_grant, repository_write_grant, repository_admin_grant
|
||||
from util.http import abort
|
||||
|
@ -157,7 +157,8 @@ def identity_from_bearer_token(bearer_header):
|
|||
logger.debug('Validating auth header: %s', bearer_header)
|
||||
|
||||
try:
|
||||
payload = decode_bearer_header(bearer_header, instance_keys, app.config)
|
||||
payload = decode_bearer_header(bearer_header, instance_keys, app.config,
|
||||
metric_queue=metric_queue)
|
||||
except InvalidBearerTokenException as bte:
|
||||
logger.exception('Invalid bearer token: %s', bte)
|
||||
raise InvalidJWTException(bte)
|
||||
|
|
23
boot.py
23
boot.py
|
@ -6,16 +6,22 @@ from urlparse import urlunparse
|
|||
from jinja2 import Template
|
||||
from cachetools import lru_cache
|
||||
|
||||
import logging
|
||||
import release
|
||||
import os.path
|
||||
|
||||
from app import app
|
||||
from data.model import ServiceKeyDoesNotExist
|
||||
from data.model.release import set_region_release
|
||||
from data.model.service_keys import get_service_key
|
||||
from util.config.database import sync_database_with_config
|
||||
from util.generatepresharedkey import generate_key
|
||||
from _init import CONF_DIR
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_audience():
|
||||
audience = app.config.get('JWTPROXY_AUDIENCE')
|
||||
|
@ -44,8 +50,21 @@ def setup_jwt_proxy():
|
|||
Creates a service key for quay to use in the jwtproxy and generates the JWT proxy configuration.
|
||||
"""
|
||||
if os.path.exists(os.path.join(CONF_DIR, 'jwtproxy_conf.yaml')):
|
||||
# Proxy is already setup.
|
||||
return
|
||||
# Proxy is already setup. Make sure the service key is still valid.
|
||||
try:
|
||||
with open(app.config['INSTANCE_SERVICE_KEY_KID_LOCATION']) as f:
|
||||
quay_key_id = f.read()
|
||||
|
||||
try:
|
||||
get_service_key(quay_key_id, approved_only=False)
|
||||
return
|
||||
except ServiceKeyDoesNotExist:
|
||||
logger.exception('Could not find non-expired existing service key %s; creating a new one',
|
||||
quay_key_id)
|
||||
|
||||
# Found a valid service key, so exiting.
|
||||
except IOError:
|
||||
logger.exception('Could not load existing service key; creating a new one')
|
||||
|
||||
# Generate the key for this Quay instance to use.
|
||||
minutes_until_expiration = app.config.get('INSTANCE_SERVICE_KEY_EXPIRATION', 120)
|
||||
|
|
|
@ -119,7 +119,7 @@ class RedisBuildLogs(object):
|
|||
args.update({'socket_connect_timeout': 1, 'socket_timeout': 1})
|
||||
|
||||
connection = redis.StrictRedis(**args)
|
||||
if not connection.ping() == True:
|
||||
if not connection.ping():
|
||||
return (False, 'Could not ping redis')
|
||||
|
||||
# Ensure we can write and read a key.
|
||||
|
|
|
@ -16,6 +16,7 @@ def check_health(app_config):
|
|||
|
||||
# We will connect to the db, check that it contains some team role kinds
|
||||
try:
|
||||
return (bool(list(TeamRole.select().limit(1))), 'Could not connect to the database')
|
||||
okay = bool(list(TeamRole.select().limit(1)))
|
||||
return (okay, 'Could not connect to the database' if not okay else None)
|
||||
except Exception as ex:
|
||||
return (False, 'Could not connect to the database: %s' % ex.message)
|
||||
|
|
|
@ -53,7 +53,7 @@ class HealthCheck(object):
|
|||
service_status_expanded = {}
|
||||
|
||||
for service_name in service_statuses:
|
||||
status, err = service_statuses[service_name]
|
||||
status, message = service_statuses[service_name]
|
||||
|
||||
service_statuses_bools[service_name] = status
|
||||
service_status_expanded[service_name] = {
|
||||
|
@ -61,7 +61,9 @@ class HealthCheck(object):
|
|||
}
|
||||
|
||||
if not status:
|
||||
service_status_expanded[service_name]['failure'] = err
|
||||
service_status_expanded[service_name]['failure'] = message
|
||||
elif message:
|
||||
service_status_expanded[service_name]['message'] = message
|
||||
|
||||
if skip and service_name in skip:
|
||||
notes.append('%s skipped in compute health' % service_name)
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
import logging
|
||||
from app import build_logs, storage, authentication
|
||||
from app import build_logs, storage, authentication, instance_keys
|
||||
from health.models_pre_oci import pre_oci_model as model
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _check_gunicorn(endpoint):
|
||||
def fn(app):
|
||||
""" Returns the status of the gunicorn workers. """
|
||||
|
@ -23,7 +22,9 @@ def _check_gunicorn(endpoint):
|
|||
registry_url = '%s://localhost%s/%s' % (scheme, port, endpoint)
|
||||
try:
|
||||
status_code = client.get(registry_url, verify=False, timeout=2).status_code
|
||||
return (status_code == 200, 'Got non-200 response for worker: %s' % status_code)
|
||||
okay = status_code == 200
|
||||
message = 'Got non-200 response for worker: %s' % status_code if not okay else None
|
||||
return (okay, message)
|
||||
except Exception as ex:
|
||||
logger.exception('Exception when checking worker health: %s', registry_url)
|
||||
return (False, 'Exception when checking worker health: %s' % registry_url)
|
||||
|
@ -50,11 +51,37 @@ def _check_storage(app):
|
|||
logger.exception('Storage check failed with exception %s', ex)
|
||||
return (False, 'Storage check failed with exception %s' % ex.message)
|
||||
|
||||
|
||||
def _check_auth(app):
|
||||
""" Returns the status of the auth engine, as accessed from this instance. """
|
||||
return authentication.ping()
|
||||
|
||||
|
||||
def _check_service_key(app):
|
||||
""" Returns the status of the service key for this instance. If the key has disappeared or
|
||||
has expired, then will return False.
|
||||
"""
|
||||
if not app.config.get('SETUP_COMPLETE', False):
|
||||
return (True, 'Stack not fully setup; skipping check')
|
||||
|
||||
try:
|
||||
kid = instance_keys.local_key_id
|
||||
except IOError as ex:
|
||||
# Key has not been created yet.
|
||||
return (True, 'Stack not fully setup; skipping check')
|
||||
|
||||
try:
|
||||
key_is_valid = bool(instance_keys.get_service_key_public_key(kid))
|
||||
message = 'Could not find valid instance service key %s' % kid if not key_is_valid else None
|
||||
return (key_is_valid, message)
|
||||
except Exception as ex:
|
||||
logger.exception('Got exception when trying to retrieve the instance key')
|
||||
|
||||
# NOTE: We return *True* here if there was an exception when retrieving the key, as it means
|
||||
# the database is down, which will be handled by the database health check.
|
||||
return (True, 'Failed to get instance key due to a database issue; skipping check')
|
||||
|
||||
|
||||
_SERVICES = {
|
||||
'registry_gunicorn': _check_gunicorn('v1/_internal_ping'),
|
||||
'web_gunicorn': _check_gunicorn('_internal_ping'),
|
||||
|
@ -63,6 +90,7 @@ _SERVICES = {
|
|||
'redis': _check_redis,
|
||||
'storage': _check_storage,
|
||||
'auth': _check_auth,
|
||||
'service_key': _check_service_key,
|
||||
}
|
||||
|
||||
def check_all_services(app, skip):
|
||||
|
|
|
@ -30,7 +30,7 @@ from jwkest.jwk import RSAKey
|
|||
|
||||
import endpoints.decorated # required for side effect
|
||||
|
||||
from app import app, storage, instance_keys, get_app_url
|
||||
from app import app, storage, instance_keys, get_app_url, metric_queue
|
||||
from data.database import close_db_filter, configure, DerivedStorageForImage, QueueItem, Image
|
||||
from data import model
|
||||
from digest.checksums import compute_simple
|
||||
|
@ -2491,7 +2491,7 @@ class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, Base
|
|||
encoded = response.json()['token']
|
||||
header = 'Bearer ' + encoded
|
||||
|
||||
payload = decode_bearer_header(header, instance_keys, app.config)
|
||||
payload = decode_bearer_header(header, instance_keys, app.config, metric_queue=metric_queue)
|
||||
self.assertIsNotNone(payload)
|
||||
|
||||
if scope is None:
|
||||
|
|
|
@ -102,6 +102,18 @@ class MetricQueue(object):
|
|||
self.org_count = prom.create_gauge('org_count', 'Number of Organizations')
|
||||
self.robot_count = prom.create_gauge('robot_count', 'Number of robot accounts')
|
||||
|
||||
self.instance_key_renewal_success = prom.create_counter('instance_key_renewal_success',
|
||||
'Instance Key Renewal Success Count',
|
||||
labelnames=['key_id'])
|
||||
|
||||
self.instance_key_renewal_failure = prom.create_counter('instance_key_renewal_failure',
|
||||
'Instance Key Renewal Failure Count',
|
||||
labelnames=['key_id'])
|
||||
|
||||
self.invalid_instance_key_count = prom.create_counter('invalid_registry_instance_key_count',
|
||||
'Invalid registry instance key count',
|
||||
labelnames=['key_id'])
|
||||
|
||||
# Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another
|
||||
# provider.
|
||||
self._queue = None
|
||||
|
|
|
@ -22,7 +22,7 @@ class InvalidBearerTokenException(Exception):
|
|||
pass
|
||||
|
||||
|
||||
def decode_bearer_header(bearer_header, instance_keys, config):
|
||||
def decode_bearer_header(bearer_header, instance_keys, config, metric_queue=None):
|
||||
""" decode_bearer_header decodes the given bearer header that contains an encoded JWT with both
|
||||
a Key ID as well as the signed JWT and returns the decoded and validated JWT. On any error,
|
||||
raises an InvalidBearerTokenException with the reason for failure.
|
||||
|
@ -34,10 +34,10 @@ def decode_bearer_header(bearer_header, instance_keys, config):
|
|||
|
||||
encoded_jwt = match.group(1)
|
||||
logger.debug('encoded JWT: %s', encoded_jwt)
|
||||
return decode_bearer_token(encoded_jwt, instance_keys, config)
|
||||
return decode_bearer_token(encoded_jwt, instance_keys, config, metric_queue=metric_queue)
|
||||
|
||||
|
||||
def decode_bearer_token(bearer_token, instance_keys, config):
|
||||
def decode_bearer_token(bearer_token, instance_keys, config, metric_queue=None):
|
||||
""" decode_bearer_token decodes the given bearer token that contains both a Key ID as well as the
|
||||
encoded JWT and returns the decoded and validated JWT. On any error, raises an
|
||||
InvalidBearerTokenException with the reason for failure.
|
||||
|
@ -52,6 +52,9 @@ def decode_bearer_token(bearer_token, instance_keys, config):
|
|||
# Find the matching public key.
|
||||
public_key = instance_keys.get_service_key_public_key(kid)
|
||||
if public_key is None:
|
||||
if metric_queue is not None:
|
||||
metric_queue.invalid_instance_key_count.Inc(labelvalues=[kid])
|
||||
|
||||
logger.error('Could not find requested service key %s with encoded JWT: %s', kid, bearer_token)
|
||||
raise InvalidBearerTokenException('Unknown service key')
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from app import app, instance_keys
|
||||
from app import app, instance_keys, metric_queue
|
||||
from workers.servicekeyworker.models_pre_oci import pre_oci_model as model
|
||||
from workers.worker import Worker
|
||||
|
||||
|
@ -18,10 +18,22 @@ class ServiceKeyWorker(Worker):
|
|||
"""
|
||||
Refreshes the instance's active service key so it doesn't get garbage collected.
|
||||
"""
|
||||
expiration = timedelta(minutes=instance_keys.service_key_expiration)
|
||||
logger.debug('Starting refresh of automatic service keys')
|
||||
model.set_key_expiration(instance_keys.local_key_id, datetime.utcnow() + expiration)
|
||||
logger.debug('Finished refresh of automatic service keys')
|
||||
expiration_time = timedelta(minutes=instance_keys.service_key_expiration)
|
||||
new_expiration = datetime.utcnow() + expiration_time
|
||||
|
||||
logger.debug('Starting automatic refresh of service key %s to new expiration %s',
|
||||
instance_keys.local_key_id, new_expiration)
|
||||
try:
|
||||
model.set_key_expiration(instance_keys.local_key_id, new_expiration)
|
||||
except Exception as ex:
|
||||
logger.exception('Failure for automatic refresh of service key %s with new expiration %s',
|
||||
instance_keys.local_key_id, new_expiration)
|
||||
metric_queue.instance_key_renewal_failure.Inc(labelvalues=[instance_keys.local_key_id])
|
||||
raise ex
|
||||
|
||||
logger.debug('Finished automatic refresh of service key %s with new expiration %s',
|
||||
instance_keys.local_key_id, new_expiration)
|
||||
metric_queue.instance_key_renewal_success.Inc(labelvalues=[instance_keys.local_key_id])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Reference in a new issue