Make GC of repositories fully async for whitelisted namespaces

This change adds a worker that, every 10 seconds, finds a repository containing garbage and garbage collects it.

Fixes #144
This commit is contained in:
Joseph Schorr 2015-06-19 14:55:44 -04:00 committed by Joseph Schorr
parent acd86008c8
commit 70de107268
6 changed files with 111 additions and 10 deletions

View file

@ -0,0 +1,2 @@
#!/bin/sh
# Log service for the gcworker: forward this service's stdin to syslog,
# tagging each entry with "gcworker" (-t) and the logger PID (-i) so the
# worker's output can be traced in the system log.
exec logger -i -t gcworker

8
conf/init/service/gcworker/run Executable file
View file

@ -0,0 +1,8 @@
#! /bin/bash
# Service entrypoint for the repository garbage-collection worker.
echo 'Starting GC worker'
cd /
# Run the worker module under the application virtualenv; merge stderr into
# stdout so the companion log service captures all output.
venv/bin/python -m workers.gcworker 2>&1
# Reached only if the worker process exits; the supervisor will restart it.
echo 'Repository GC exited'

View file

@ -231,3 +231,6 @@ class DefaultConfig(object):
'#7f7f7f', '#c7c7c7', '#bcbd22', '#1f77b4', '#17becf', '#9edae5', '#393b79', '#7f7f7f', '#c7c7c7', '#bcbd22', '#1f77b4', '#17becf', '#9edae5', '#393b79',
'#5254a3', '#6b6ecf', '#9c9ede', '#9ecae1', '#31a354', '#b5cf6b', '#a1d99b', '#5254a3', '#6b6ecf', '#9c9ede', '#9ecae1', '#31a354', '#b5cf6b', '#a1d99b',
'#8c6d31', '#ad494a', '#e7ba52', '#a55194'] '#8c6d31', '#ad494a', '#e7ba52', '#a55194']
# Experiment: Async garbage collection
EXP_ASYNC_GARBAGE_COLLECTION = []

View file

@ -4,7 +4,7 @@ from peewee import JOIN_LEFT_OUTER, fn
from datetime import timedelta, datetime from datetime import timedelta, datetime
from data.model import (DataModelException, tag, db_transaction, storage, image, permission, from data.model import (DataModelException, tag, db_transaction, storage, image, permission,
_basequery) _basequery, config)
from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User, from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User,
Visibility, RepositoryPermission, TupleSelector, RepositoryActionCount, Visibility, RepositoryPermission, TupleSelector, RepositoryActionCount,
Role, RepositoryAuthorizedEmail, db_for_update, get_epoch_timestamp, Role, RepositoryAuthorizedEmail, db_for_update, get_epoch_timestamp,
@ -58,7 +58,11 @@ def purge_repository(namespace_name, repository_name):
fetched.delete_instance(recursive=True, delete_nullable=False) fetched.delete_instance(recursive=True, delete_nullable=False)
def find_repository_with_garbage(): def find_repository_with_garbage(filter_list=None):
# TODO(jschorr): Remove the filter once we have turned the experiment on for everyone.
if filter_list is not None and not filter_list:
return None
epoch_timestamp = get_epoch_timestamp() epoch_timestamp = get_epoch_timestamp()
try: try:
@ -72,16 +76,19 @@ def find_repository_with_garbage():
.limit(500) .limit(500)
.alias('candidates')) .alias('candidates'))
if filter_list:
candidates = candidates.where(Namespace.username << filter_list)
found = (RepositoryTag found = (RepositoryTag
.select(candidates.c.repository) .select(candidates.c.repository_id)
.from_(candidates) .from_(candidates)
.order_by(db_random_func()) .order_by(db_random_func())
.get()) .get())
if not found:
if found is None:
return return
return Repository.get(Repository.id == found) return Repository.get(Repository.id == found.repository_id)
except RepositoryTag.DoesNotExist: except RepositoryTag.DoesNotExist:
return None return None
except Repository.DoesNotExist: except Repository.DoesNotExist:
@ -89,11 +96,19 @@ def find_repository_with_garbage():
def garbage_collect_repository(namespace_name, repository_name): def garbage_collect_repository(namespace_name, repository_name):
# If the namespace is the async experiment, don't perform garbage collection here.
# TODO(jschorr): Remove this check once we have turned the experiment on for everyone.
if namespace_name in config.app_config.get('EXP_ASYNC_GARBAGE_COLLECTION', []):
return
repo = get_repository(namespace_name, repository_name) repo = get_repository(namespace_name, repository_name)
garbage_collect_repo(repo) if repo is not None:
garbage_collect_repo(repo)
def garbage_collect_repo(repo): def garbage_collect_repo(repo):
logger.debug('Garbage collecting repository %s', repo.id)
storage_id_whitelist = {} storage_id_whitelist = {}
tag.garbage_collect_tags(repo) tag.garbage_collect_tags(repo)

View file

@ -10,7 +10,7 @@ PUBLIC_USER = 'public'
REPO = 'somerepo' REPO = 'somerepo'
class TestGarbageColection(unittest.TestCase): class TestGarbageCollection(unittest.TestCase):
@staticmethod @staticmethod
def _set_tag_expiration_policy(namespace, expiration_s): def _set_tag_expiration_policy(namespace, expiration_s):
namespace_user = model.user.get_user(namespace) namespace_user = model.user.get_user(namespace)
@ -80,9 +80,11 @@ class TestGarbageColection(unittest.TestCase):
def gcNow(self, repository): def gcNow(self, repository):
model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name)
def deleteTag(self, repository, tag): def deleteTag(self, repository, tag, perform_gc=True):
model.tag.delete_tag(repository.namespace_user.username, repository.name, tag) model.tag.delete_tag(repository.namespace_user.username, repository.name, tag)
model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) if perform_gc:
model.repository.garbage_collect_repository(repository.namespace_user.username,
repository.name)
def moveTag(self, repository, tag, docker_image_id): def moveTag(self, repository, tag, docker_image_id):
model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag, model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag,
@ -105,6 +107,47 @@ class TestGarbageColection(unittest.TestCase):
self.fail('Expected image %s to be deleted' % docker_image_id) self.fail('Expected image %s to be deleted' % docker_image_id)
def test_has_garbage(self):
  """ Remove all existing repositories, then add one without garbage, check, then add one with
      garbage, and check again.
  """
  # Delete all existing repos so find_repository_with_garbage can only ever
  # return the repository created below.
  for repo in database.Repository.select():
    repo.delete_instance(recursive=True)

  # Change the time machine expiration on the namespace to effectively-infinite,
  # so deleted tags are not yet considered garbage.
  (database.User.update(removed_tag_expiration_s=1000000000)
   .where(database.User.username == ADMIN_ACCESS_USER)
   .execute())

  # Create a repository without any garbage.
  repository = self.createRepository(latest=['i1', 'i2', 'i3'])

  # Ensure that no repositories are returned by the has garbage check.
  self.assertIsNone(model.repository.find_repository_with_garbage())

  # Delete a tag, but skip inline GC so the deleted tag lingers.
  self.deleteTag(repository, 'latest', perform_gc=False)

  # There should still not be any repositories with garbage, due to time machine.
  self.assertIsNone(model.repository.find_repository_with_garbage())

  # Drop the time machine expiration on the namespace to zero, making the
  # deleted tag immediately collectible.
  (database.User.update(removed_tag_expiration_s=0)
   .where(database.User.username == ADMIN_ACCESS_USER)
   .execute())

  # Now we should find the repository for GC.
  repository = model.repository.find_repository_with_garbage()
  self.assertIsNotNone(repository)
  # NOTE: assertEqual replaces the deprecated assertEquals alias.
  self.assertEqual(REPO, repository.name)

  # GC the repository.
  model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name)

  # There should now be no repositories with garbage.
  self.assertIsNone(model.repository.find_repository_with_garbage())
def test_one_tag(self): def test_one_tag(self):
""" Create a repository with a single tag, then remove that tag and verify that the repository """ Create a repository with a single tag, then remove that tag and verify that the repository

30
workers/gcworker.py Normal file
View file

@ -0,0 +1,30 @@
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from app import app
from data.database import UseThenDisconnect
from data.model.repository import find_repository_with_garbage, garbage_collect_repo
logger = logging.getLogger(__name__)

# Blocking (foreground, single-threaded) scheduler that drives the periodic
# GC job below; sched.start() never returns.
sched = BlockingScheduler()
@sched.scheduled_job(trigger='interval', seconds=10)
def garbage_collect_repositories():
  """ Picks a single repository containing garbage, if any, and garbage collects it.

      Only namespaces in the EXP_ASYNC_GARBAGE_COLLECTION whitelist are considered;
      an empty whitelist means no repository is ever selected.
  """
  whitelist = app.config.get('EXP_ASYNC_GARBAGE_COLLECTION', [])
  with UseThenDisconnect(app.config):
    candidate = find_repository_with_garbage(whitelist)
    if candidate is None:
      logger.debug('No repository with garbage found')
      return False

    logger.debug('Starting GC of repository #%s (%s)', candidate.id, candidate.name)
    garbage_collect_repo(candidate)
    logger.debug('Finished GC of repository #%s (%s)', candidate.id, candidate.name)
    return True
if __name__ == "__main__":
  # Enable verbose logging when run directly as a worker process, then hand
  # control to the scheduler (blocks forever).
  logging.basicConfig(level=logging.DEBUG)
  sched.start()