Upgrade to the 0.11.1 tutum version of docker. Package it as a Dockerfile using Docker in Docker. Add a status server option to the workers to utilize the new termination signal and status features of gantry.

This commit is contained in:
Jake Moshenko 2014-05-16 18:31:24 -04:00
parent 8b5c781f84
commit cc47e77156
12 changed files with 328 additions and 99 deletions

View file

@ -1,30 +1,35 @@
to prepare a new build node host starting from a 14.04 base server:
to build and upload the builder to quay
```
sudo apt-get update
sudo apt-get install -y git python-virtualenv python-dev phantomjs libjpeg8 libjpeg62-dev libfreetype6 libfreetype6-dev libevent-dev gdebi-core lxc
```
check out the code, install the kernel, custom docker, nsexec, and reboot:
```
git clone https://bitbucket.org/yackob03/quay.git
curl -s https://get.docker.io/ubuntu/ | sudo sh
sudo apt-get update && sudo apt-get install -y git
git clone git clone https://bitbucket.org/yackob03/quay.git
cd quay
sudo gdebi --n binary_dependencies/builder/nsexec_1.22ubuntu1trusty1_amd64.deb
sudo gdebi --n binary_dependencies/builder/lxc-docker-0.9.0_0.9.0-20140501212101-72572f0-dirty_amd64.deb
sudo usermod -v 100000-200000 -w 100000-200000 root
sudo chmod +x /var/lib/lxc
sudo chmod +x /var/lib/docker
cd ~
git clone https://bitbucket.org/yackob03/quayconfig.git
ln -s ../../quayconfig/production/ quay/conf/stack
rm Dockerfile
ln -s Dockerfile.buildworker Dockerfile
sudo docker build -t quay.io/quay/builder .
sudo docker push quay.io/quay/builder
```
to run the code from a fresh 14.04 server:
```
sudo apt-get update && sudo apt-get install -y git lxc linux-image-extra-`uname -r`
curl -s https://get.docker.io/ubuntu/ | sudo sh
git clone https://github.com/DevTable/gantryd.git
cd gantryd
cat requirements.system | xargs sudo apt-get install -y
virtualenv --distribute venv
venv/bin/pip install -r requirements.txt
sudo docker login -p 9Y1PX7D3IE4KPSGCIALH17EM5V3ZTMP8CNNHJNXAQ2NJGAS48BDH8J1PUOZ869ML -u 'quay+deploy' -e notused quay.io
```
start the worker
```
cd quay
virtualenv --distribute venv
venv/bin/pip install -r requirements.txt
sudo venv/bin/python -m workers.dockerfilebuild -D
cd ~
git clone https://bitbucket.org/yackob03/quayconfig.git
sudo docker pull quay.io/quay/builder
cd ~/gantryd
sudo venv/bin/python gantry.py ../quayconfig/production/gantry.json update builder
```

View file

@ -573,4 +573,4 @@ else:
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)
worker.start()
worker.start(start_status_server_port=8000)

View file

@ -1,11 +1,16 @@
import logging
import json
import signal
import sys
from threading import Event
from apscheduler.scheduler import Scheduler
from datetime import datetime, timedelta
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from threading import Thread
from time import sleep
from data.model import db
logger = logging.getLogger(__name__)
@ -23,6 +28,36 @@ class WorkerUnhealthyException(Exception):
pass
class WorkerStatusServer(HTTPServer):
def __init__(self, worker, *args, **kwargs):
HTTPServer.__init__(self, *args, **kwargs)
self.worker = worker
class WorkerStatusHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/status':
# Return the worker status
code = 200 if self.server.worker.is_healthy() else 503
self.send_response(code)
elif self.path == '/terminate':
# Return whether it is safe to terminate the worker process
code = 200 if self.server.worker.is_terminated() else 503
self.send_response(code)
else:
self.send_error(404)
def do_POST(self):
if self.path == '/terminate':
try:
self.server.worker.join()
self.send_response(200)
except:
self.send_response(500)
else:
self.send_error(404)
class Worker(object):
def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
watchdog_period_seconds=60):
@ -31,6 +66,7 @@ class Worker(object):
self._reservation_seconds = reservation_seconds
self._watchdog_period_seconds = watchdog_period_seconds
self._stop = Event()
self._terminated = Event()
self._queue = queue
self.current_queue_item = None
@ -42,6 +78,17 @@ class Worker(object):
""" Function that gets run once every watchdog_period_seconds. """
pass
def _close_db_handle(self):
if not db.is_closed():
logger.debug('Disconnecting from database.')
db.close()
def is_healthy(self):
return not self._stop.is_set()
def is_terminated(self):
return self._terminated.is_set()
def extend_processing(self, seconds_from_now):
if self.current_queue_item is not None:
self._queue.extend_processing(self.current_queue_item, seconds_from_now)
@ -51,7 +98,7 @@ class Worker(object):
self.current_queue_item = self._queue.get()
while self.current_queue_item:
logger.debug('Queue gave us some work: %s' % self.current_queue_item.body)
logger.debug('Queue gave us some work: %s', self.current_queue_item.body)
job_details = json.loads(self.current_queue_item.body)
@ -68,13 +115,24 @@ class Worker(object):
finally:
self.current_queue_item = None
# Close the db handle periodically
self._close_db_handle()
if not self._stop.is_set():
self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)
if not self._stop.is_set():
logger.debug('No more work.')
def start(self):
def start(self, start_status_server_port=None):
if start_status_server_port is not None:
# Start a status server on a thread
server_address = ('', start_status_server_port)
httpd = WorkerStatusServer(self, server_address, WorkerStatusHandler)
server_thread = Thread(target=httpd.serve_forever)
server_thread.daemon = True
server_thread.start()
logger.debug("Scheduling worker.")
soon = datetime.now() + timedelta(seconds=.001)
@ -84,8 +142,8 @@ class Worker(object):
start_date=soon)
self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds)
signal.signal(signal.SIGTERM, self.join)
signal.signal(signal.SIGINT, self.join)
signal.signal(signal.SIGTERM, self.terminate)
signal.signal(signal.SIGINT, self.terminate)
while not self._stop.wait(1):
pass
@ -94,11 +152,25 @@ class Worker(object):
self._sched.shutdown()
logger.debug('Finished.')
def join(self, signal_num=None, stack_frame=None):
logger.debug('Shutting down worker gracefully.')
self._stop.set()
self._terminated.set()
# Give back the retry that we took for this queue item so that if it were down to zero
# retries it will still be picked up by another worker
if self.current_queue_item is not None:
self._queue.incomplete(self.current_queue_item, restore_retry=True)
# Wait forever if we're running a server
while start_status_server_port is not None:
sleep(60)
def terminate(self, signal_num=None, stack_frame=None, graceful=False):
if self._terminated.is_set():
sys.exit(1)
else:
logger.debug('Shutting down worker.')
self._stop.set()
if not graceful:
# Give back the retry that we took for this queue item so that if it were down to zero
# retries it will still be picked up by another worker
if self.current_queue_item is not None:
self._queue.incomplete(self.current_queue_item, restore_retry=True)
def join(self):
self.terminate(graceful=True)