Enable caching of blobs in the V2 registry protocol, to avoid DB connections once the cache has been loaded.
This should help with bursty pull traffic, as it will avoid DB connections on a large percentage of requests.
This commit is contained in:
parent
db6007cb37
commit
b2485934ed
5 changed files with 112 additions and 13 deletions
12
app.py
12
app.py
|
@ -23,6 +23,7 @@ from data import model
|
||||||
from data.archivedlogs import LogArchive
|
from data.archivedlogs import LogArchive
|
||||||
from data.billing import Billing
|
from data.billing import Billing
|
||||||
from data.buildlogs import BuildLogs
|
from data.buildlogs import BuildLogs
|
||||||
|
from data.cache import InMemoryDataModelCache
|
||||||
from data.model.user import LoginWrappedDBUser
|
from data.model.user import LoginWrappedDBUser
|
||||||
from data.queue import WorkQueue, BuildMetricQueueReporter
|
from data.queue import WorkQueue, BuildMetricQueueReporter
|
||||||
from data.userevent import UserEventsBuilderModule
|
from data.userevent import UserEventsBuilderModule
|
||||||
|
@ -160,16 +161,7 @@ def _request_end(resp):
|
||||||
if request.user_agent is not None:
|
if request.user_agent is not None:
|
||||||
extra["user-agent"] = request.user_agent.string
|
extra["user-agent"] = request.user_agent.string
|
||||||
|
|
||||||
user = get_authenticated_user()
|
|
||||||
|
|
||||||
if user:
|
|
||||||
extra['user'] = {'email': user.email,
|
|
||||||
'uuid': user.uuid,
|
|
||||||
'org': user.organization,
|
|
||||||
'robot': user.robot}
|
|
||||||
|
|
||||||
logger.info("request-end", extra=extra)
|
logger.info("request-end", extra=extra)
|
||||||
|
|
||||||
logger.debug('Ending request: %s', request.path)
|
logger.debug('Ending request: %s', request.path)
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
|
@ -188,6 +180,8 @@ Principal(app, use_sessions=False)
|
||||||
|
|
||||||
tf = app.config['DB_TRANSACTION_FACTORY']
|
tf = app.config['DB_TRANSACTION_FACTORY']
|
||||||
|
|
||||||
|
# TODO(jschorr): make this configurable
|
||||||
|
model_cache = InMemoryDataModelCache()
|
||||||
|
|
||||||
avatar = Avatar(app)
|
avatar = Avatar(app)
|
||||||
login_manager = LoginManager(app)
|
login_manager = LoginManager(app)
|
||||||
|
|
|
@ -7,9 +7,10 @@ from flask import url_for, request, redirect, Response, abort as flask_abort
|
||||||
import bitmath
|
import bitmath
|
||||||
import resumablehashlib
|
import resumablehashlib
|
||||||
|
|
||||||
from app import storage, app, get_app_url, metric_queue
|
from app import storage, app, get_app_url, metric_queue, model_cache
|
||||||
from auth.registry_jwt_auth import process_registry_jwt_auth
|
from auth.registry_jwt_auth import process_registry_jwt_auth
|
||||||
from data import database
|
from data import database
|
||||||
|
from data.cache import cache_key
|
||||||
from digest import digest_tools
|
from digest import digest_tools
|
||||||
from endpoints.decorators import anon_protect, parse_repository_name
|
from endpoints.decorators import anon_protect, parse_repository_name
|
||||||
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
|
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
|
||||||
|
@ -33,6 +34,18 @@ class _InvalidRangeHeader(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _get_repository_blob(namespace_name, repo_name, digest):
    """ Looks up the blob with the given digest in the named repository, going
        through the shared model cache rather than calling the data model directly.

        The lookup key comes from `cache_key.for_repository_blob`, and the cache is
        handed a zero-argument loader that performs `model.get_blob_by_digest`.
        Returns None if no such blob exists (or it is still uploading).
    """
    key = cache_key.for_repository_blob(namespace_name, repo_name, digest)

    # The loader closes over the arguments so the cache can invoke it without any.
    return model_cache.retrieve(
        key, lambda: model.get_blob_by_digest(namespace_name, repo_name, digest))
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
|
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
|
||||||
@parse_repository_name()
|
@parse_repository_name()
|
||||||
@process_registry_jwt_auth(scopes=['pull'])
|
@process_registry_jwt_auth(scopes=['pull'])
|
||||||
|
@ -41,7 +54,7 @@ class _InvalidRangeHeader(Exception):
|
||||||
@cache_control(max_age=31436000)
|
@cache_control(max_age=31436000)
|
||||||
def check_blob_exists(namespace_name, repo_name, digest):
|
def check_blob_exists(namespace_name, repo_name, digest):
|
||||||
# Find the blob.
|
# Find the blob.
|
||||||
blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
|
blob = _get_repository_blob(namespace_name, repo_name, digest)
|
||||||
if blob is None:
|
if blob is None:
|
||||||
raise BlobUnknown()
|
raise BlobUnknown()
|
||||||
|
|
||||||
|
@ -49,7 +62,8 @@ def check_blob_exists(namespace_name, repo_name, digest):
|
||||||
headers = {
|
headers = {
|
||||||
'Docker-Content-Digest': digest,
|
'Docker-Content-Digest': digest,
|
||||||
'Content-Length': blob.size,
|
'Content-Length': blob.size,
|
||||||
'Content-Type': BLOB_CONTENT_TYPE,}
|
'Content-Type': BLOB_CONTENT_TYPE,
|
||||||
|
}
|
||||||
|
|
||||||
# If our storage supports range requests, let the client know.
|
# If our storage supports range requests, let the client know.
|
||||||
if storage.get_supports_resumable_downloads(blob.locations):
|
if storage.get_supports_resumable_downloads(blob.locations):
|
||||||
|
@ -67,7 +81,7 @@ def check_blob_exists(namespace_name, repo_name, digest):
|
||||||
@cache_control(max_age=31536000)
|
@cache_control(max_age=31536000)
|
||||||
def download_blob(namespace_name, repo_name, digest):
|
def download_blob(namespace_name, repo_name, digest):
|
||||||
# Find the blob.
|
# Find the blob.
|
||||||
blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
|
blob = _get_repository_blob(namespace_name, repo_name, digest)
|
||||||
if blob is None:
|
if blob is None:
|
||||||
raise BlobUnknown()
|
raise BlobUnknown()
|
||||||
|
|
||||||
|
|
59
endpoints/v2/test/test_blob.py
Normal file
59
endpoints/v2/test/test_blob.py
Normal file
|
@ -0,0 +1,59 @@
|
||||||
|
import hashlib
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from mock import patch
|
||||||
|
from flask import url_for
|
||||||
|
from playhouse.test_utils import assert_query_count
|
||||||
|
|
||||||
|
from app import instance_keys, app as realapp
|
||||||
|
from data import model
|
||||||
|
from data.cache import InMemoryDataModelCache
|
||||||
|
from data.database import ImageStorageLocation
|
||||||
|
from endpoints.test.shared import conduct_call
|
||||||
|
from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
|
||||||
|
from test.fixtures import *
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('method, endpoint', [
    ('GET', 'download_blob'),
    ('HEAD', 'check_blob_exists'),
])
def test_blob_caching(method, endpoint, client, app):
    """ Checks that the V2 blob endpoints issue a single DB query on the first
        request against a fresh cache and no queries on the request after it.
    """
    # Register a blob record in devtable/simple for the endpoints to find.
    blob_digest = 'sha256:%s' % hashlib.sha256("a").hexdigest()
    storage_location = ImageStorageLocation.get(name='local_us')
    model.blob.store_blob_record_and_temp_link('devtable', 'simple', blob_digest,
                                               storage_location, 1, 10000000)

    # Build a bearer token granting pull access on the repository.
    devtable = model.user.get_user('devtable')
    granted_access = [{
        'type': 'repository',
        'name': 'devtable/simple',
        'actions': ['pull'],
    }]
    context, subject = build_context_and_subject(user=devtable)
    bearer = generate_bearer_token(realapp.config['SERVER_HOSTNAME'], subject, context,
                                   granted_access, 600, instance_keys)

    route = 'v2.' + endpoint
    request_params = {'repository': 'devtable/simple', 'digest': blob_digest}
    request_headers = {'Authorization': 'Bearer %s' % bearer}

    def issue_request():
        # Every phase of the test performs exactly the same call and expects a 200.
        conduct_call(client, route, url_for, method, request_params, expected_code=200,
                     headers=request_headers)

    # Run without caching to make sure the request works. This also preloads some of
    # our global model caches.
    issue_request()

    # Swap in a fresh, empty cache so the query counts below start from a known state.
    with patch('endpoints.v2.blob.model_cache', InMemoryDataModelCache()):
        # First request should make a DB query to retrieve the blob.
        with assert_query_count(1):
            issue_request()

        # Subsequent requests should use the cached blob.
        with assert_query_count(0):
            issue_request()
|
|
@ -15,6 +15,7 @@ from data.model.user import LoginWrappedDBUser
|
||||||
from endpoints.api import api_bp
|
from endpoints.api import api_bp
|
||||||
from endpoints.appr import appr_bp
|
from endpoints.appr import appr_bp
|
||||||
from endpoints.web import web
|
from endpoints.web import web
|
||||||
|
from endpoints.v2 import v2_bp
|
||||||
from endpoints.verbs import verbs as verbs_bp
|
from endpoints.verbs import verbs as verbs_bp
|
||||||
|
|
||||||
from initdb import initialize_database, populate_database
|
from initdb import initialize_database, populate_database
|
||||||
|
@ -173,6 +174,7 @@ def app(appconfig, initialized_db):
|
||||||
app.register_blueprint(appr_bp, url_prefix='/cnr')
|
app.register_blueprint(appr_bp, url_prefix='/cnr')
|
||||||
app.register_blueprint(web, url_prefix='/')
|
app.register_blueprint(web, url_prefix='/')
|
||||||
app.register_blueprint(verbs_bp, url_prefix='/c1')
|
app.register_blueprint(verbs_bp, url_prefix='/c1')
|
||||||
|
app.register_blueprint(v2_bp, url_prefix='/v2')
|
||||||
|
|
||||||
app.config.update(appconfig)
|
app.config.update(appconfig)
|
||||||
return app
|
return app
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import binascii
|
import binascii
|
||||||
|
import copy
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
@ -118,6 +119,19 @@ def addtoken():
|
||||||
return 'OK'
|
return 'OK'
|
||||||
|
|
||||||
|
|
||||||
|
@testbp.route('/breakdatabase', methods=['POST'])
def break_database():
    """ Test-only endpoint: closes any existing database connection and then
        reconfigures the database layer with an invalid DB_URI.

        Responds with the literal body 'OK'.
    """
    # Close any existing connection before swapping the configuration.
    close_db_filter(None)

    # Work on a copy so the application's own config object is not modified.
    broken_config = copy.copy(app.config)
    broken_config['DB_URI'] = 'sqlite:///not/a/valid/database'
    configure(broken_config)

    return 'OK'
|
||||||
|
|
||||||
|
|
||||||
@testbp.route('/reloadapp', methods=['POST'])
|
@testbp.route('/reloadapp', methods=['POST'])
|
||||||
def reload_app():
|
def reload_app():
|
||||||
# Close any existing connection.
|
# Close any existing connection.
|
||||||
|
@ -1674,6 +1688,22 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
|
||||||
def test_cancel_push(self):
|
def test_cancel_push(self):
|
||||||
self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True)
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True)
|
||||||
|
|
||||||
|
def test_with_blob_caching(self):
    """ Pushes and pulls a repository, breaks the database through the test
        endpoint, and then expects every pushed layer blob to still download
        with a 200.
    """
    # Add a repository and do a pull, to prime the cache.
    _, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password')
    self.do_pull('devtable', 'newrepo', 'devtable', 'password')

    # Purposefully break the database so that we can check if caching works.
    self.conduct('POST', '/__test/breakdatabase')

    # Lazily walk every layer of every pushed manifest; the generator keeps the
    # digest lookups interleaved with the requests below.
    layer_digests = (str(layer.digest)
                     for tag_name in manifests
                     for layer in manifests[tag_name].layers)

    # Since the database is broken, these requests will only succeed if caching is
    # working and no additional queries/connections are made.
    repo_name = 'devtable/newrepo'
    for blob_id in layer_digests:
        self.conduct('GET', '/v2/%s/blobs/%s' % (repo_name, blob_id), expected_code=200,
                     auth='jwt')
|
||||||
|
|
||||||
def test_pull_by_checksum(self):
|
def test_pull_by_checksum(self):
|
||||||
# Add a new repository under the user, so we have a real repository to pull.
|
# Add a new repository under the user, so we have a real repository to pull.
|
||||||
_, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password')
|
_, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password')
|
||||||
|
|
Reference in a new issue