Split the app into separate backends, which can use different worker types and different timeouts.

This commit is contained in:
Jake Moshenko 2014-10-14 13:58:08 -04:00
parent adc915a5eb
commit 328db8b660
24 changed files with 178 additions and 117 deletions

View file

@ -34,7 +34,9 @@ ADD conf/init/doupdatelimits.sh /etc/my_init.d/
ADD conf/init/preplogsdir.sh /etc/my_init.d/
ADD conf/init/runmigration.sh /etc/my_init.d/
ADD conf/init/gunicorn /etc/service/gunicorn
ADD conf/init/gunicorn_web /etc/service/gunicorn_web
ADD conf/init/gunicorn_registry /etc/service/gunicorn_registry
ADD conf/init/gunicorn_verbs /etc/service/gunicorn_verbs
ADD conf/init/nginx /etc/service/nginx
ADD conf/init/diffsworker /etc/service/diffsworker
ADD conf/init/notificationworker /etc/service/notificationworker

39
app.py
View file

@ -3,7 +3,7 @@ import os
import json
import yaml
from flask import Flask as BaseFlask, Config as BaseConfig
from flask import Flask as BaseFlask, Config as BaseConfig, request, Request
from flask.ext.principal import Principal
from flask.ext.login import LoginManager
from flask.ext.mail import Mail
@ -18,12 +18,12 @@ from data.users import UserAuthentication
from util.analytics import Analytics
from util.exceptionlog import Sentry
from util.queuemetrics import QueueMetrics
from util.names import urn_generator
from data.billing import Billing
from data.buildlogs import BuildLogs
from data.archivedlogs import LogArchive
from data.queue import WorkQueue
from data.userevent import UserEventsBuilderModule
from datetime import datetime
class Config(BaseConfig):
@ -60,6 +60,7 @@ LICENSE_FILENAME = 'conf/stack/license.enc'
app = Flask(__name__)
logger = logging.getLogger(__name__)
profile = logging.getLogger('profile')
if 'TEST' in os.environ:
@ -82,6 +83,37 @@ else:
environ_config = json.loads(os.environ.get(OVERRIDE_CONFIG_KEY, '{}'))
app.config.update(environ_config)
app.teardown_request(database.close_db_filter)
class RequestWithId(Request):
request_gen = staticmethod(urn_generator(['request']))
def __init__(self, *args, **kwargs):
super(RequestWithId, self).__init__(*args, **kwargs)
self.request_id = self.request_gen()
@app.before_request
def _request_start():
profile.debug('Starting request: %s', request.path)
@app.after_request
def _request_end(r):
profile.debug('Ending request: %s', request.path)
return r
class InjectingFilter(logging.Filter):
def filter(self, record):
record.msg = '[%s] %s' % (request.request_id, record.msg)
return True
profile.addFilter(InjectingFilter())
app.request_class = RequestWithId
features.import_features(app.config)
Principal(app, use_sessions=False)
@ -105,9 +137,6 @@ dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf
reporter=queue_metrics.report)
notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf)
# TODO: Remove this in the prod push following the notifications change.
webhook_queue = WorkQueue(app.config['WEBHOOK_QUEUE_NAME'], tf)
database.configure(app.config)
model.config.app_config = app.config
model.config.store = storage

View file

@ -1,90 +1,14 @@
import logging
import logging.config
import uuid
from peewee import Proxy
from app import app as application
from flask import request, Request
from util.names import urn_generator
from data.database import db as model_db, read_slave
# Turn off debug logging for boto
logging.getLogger('boto').setLevel(logging.CRITICAL)
from endpoints.api import api_bp
from endpoints.index import index
from endpoints.web import web
from endpoints.tags import tags
from endpoints.registry import registry
from endpoints.verbs import verbs
from endpoints.webhooks import webhooks
from endpoints.realtime import realtime
from endpoints.callbacks import callback
from logentries import LogentriesHandler
logger = logging.getLogger(__name__)
# Bind all of the blueprints
import web
import verbs
import registry
werkzeug = logging.getLogger('werkzeug')
werkzeug.setLevel(logging.DEBUG)
profile = logging.getLogger('profile')
profile.setLevel(logging.DEBUG)
logentries_key = application.config.get('LOGENTRIES_KEY', None)
if logentries_key:
logger.debug('Initializing logentries with key: %s' % logentries_key)
werkzeug.addHandler(LogentriesHandler(logentries_key))
profile.addHandler(LogentriesHandler(logentries_key))
application.register_blueprint(web)
application.register_blueprint(callback, url_prefix='/oauth2')
application.register_blueprint(index, url_prefix='/v1')
application.register_blueprint(tags, url_prefix='/v1')
application.register_blueprint(registry, url_prefix='/v1')
application.register_blueprint(verbs, url_prefix='/c1')
application.register_blueprint(api_bp, url_prefix='/api')
application.register_blueprint(webhooks, url_prefix='/webhooks')
application.register_blueprint(realtime, url_prefix='/realtime')
class RequestWithId(Request):
request_gen = staticmethod(urn_generator(['request']))
def __init__(self, *args, **kwargs):
super(RequestWithId, self).__init__(*args, **kwargs)
self.request_id = self.request_gen()
@application.before_request
def _request_start():
profile.debug('Starting request: %s', request.path)
@application.after_request
def _request_end(r):
profile.debug('Ending request: %s', request.path)
return r
class InjectingFilter(logging.Filter):
def filter(self, record):
record.msg = '[%s] %s' % (request.request_id, record.msg)
return True
profile.addFilter(InjectingFilter())
def close_db(exc):
db = model_db
if not db.is_closed():
logger.debug('Disconnecting from database.')
db.close()
if read_slave.obj is not None and not read_slave.is_closed():
logger.debug('Disconnecting from read slave.')
read_slave.close()
application.teardown_request(close_db)
application.request_class = RequestWithId
if __name__ == '__main__':
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)

View file

@ -1,4 +1,4 @@
bind = 'unix:/tmp/gunicorn.sock'
bind = 'unix:/tmp/gunicorn_registry.sock'
workers = 16
worker_class = 'gevent'
timeout = 2000

6
conf/gunicorn_verbs.py Normal file
View file

@ -0,0 +1,6 @@
bind = 'unix:/tmp/gunicorn_verbs.sock'
workers = 8
timeout = 2000
logconfig = 'conf/logging.conf'
pythonpath = '.'
preload_app = True

7
conf/gunicorn_web.py Normal file
View file

@ -0,0 +1,7 @@
bind = 'unix:/tmp/gunicorn_web.sock'
workers = 2
worker_class = 'gevent'
timeout = 30
logconfig = 'conf/logging.conf'
pythonpath = '.'
preload_app = True

View file

@ -14,8 +14,12 @@ gzip_types text/plain text/xml text/css
text/javascript application/x-javascript
application/octet-stream;
upstream app_server {
server unix:/tmp/gunicorn.sock fail_timeout=0;
# For a TCP configuration:
# server 192.168.0.7:8000 fail_timeout=0;
upstream web_app_server {
server unix:/tmp/gunicorn_web.sock fail_timeout=0;
}
upstream verbs_app_server {
server unix:/tmp/gunicorn_verbs.sock fail_timeout=0;
}
upstream registry_app_server {
server unix:/tmp/gunicorn_registry.sock fail_timeout=0;
}

View file

@ -1,2 +0,0 @@
#!/bin/sh
exec svlogd /var/log/gunicorn/

View file

@ -1,8 +0,0 @@
#! /bin/bash
echo 'Starting gunicon'
cd /
venv/bin/gunicorn -c conf/gunicorn_config.py application:application
echo 'Gunicorn exited'

View file

@ -0,0 +1,2 @@
#!/bin/sh
exec svlogd /var/log/gunicorn_registry/

View file

@ -0,0 +1,8 @@
#! /bin/bash
echo 'Starting gunicon'
cd /
venv/bin/gunicorn -c conf/gunicorn_registry.py registry:application
echo 'Gunicorn exited'

View file

@ -0,0 +1,2 @@
#!/bin/sh
exec svlogd /var/log/gunicorn_verbs/

8
conf/init/gunicorn_verbs/run Executable file
View file

@ -0,0 +1,8 @@
#! /bin/bash
echo 'Starting gunicon'
cd /
nice -10 venv/bin/gunicorn -c conf/gunicorn_verbs.py verbs:application
echo 'Gunicorn exited'

2
conf/init/gunicorn_web/log/run Executable file
View file

@ -0,0 +1,2 @@
#!/bin/sh
exec svlogd /var/log/gunicorn_web/

8
conf/init/gunicorn_web/run Executable file
View file

@ -0,0 +1,8 @@
#! /bin/bash
echo 'Starting gunicon'
cd /
venv/bin/gunicorn -c conf/gunicorn_web.py web:application
echo 'Gunicorn exited'

View file

@ -17,6 +17,14 @@ qualname=application.profiler
level=DEBUG
handlers=console
[logger_boto]
level=INFO
handlers=console
[logger_werkzeug]
level=DEBUG
handlers=console
[logger_gunicorn.error]
level=INFO
handlers=console

View file

@ -1,4 +1,3 @@
client_max_body_size 20G;
client_body_temp_path /var/log/nginx/client_body 1 2;
server_name _;
@ -11,17 +10,35 @@ if ($args ~ "_escaped_fragment_") {
rewrite ^ /snapshot$uri;
}
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Host $http_host;
proxy_redirect off;
proxy_set_header Transfer-Encoding $http_transfer_encoding;
location / {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Host $http_host;
proxy_redirect off;
proxy_pass http://web_app_server;
}
location /v1/ {
proxy_buffering off;
proxy_request_buffering off;
proxy_set_header Transfer-Encoding $http_transfer_encoding;
proxy_pass http://app_server;
proxy_pass http://registry_app_server;
proxy_read_timeout 2000;
proxy_temp_path /var/log/nginx/proxy_temp 1 2;
client_max_body_size 20G;
}
location /c1/ {
proxy_buffering off;
proxy_request_buffering off;
proxy_pass http://verbs_app_server;
proxy_read_timeout 2000;
proxy_temp_path /var/log/nginx/proxy_temp 1 2;
}

View file

@ -132,9 +132,6 @@ class DefaultConfig(object):
DIFFS_QUEUE_NAME = 'imagediff'
DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild'
# TODO: Remove this in the prod push following the notifications change.
WEBHOOK_QUEUE_NAME = 'webhook'
# Super user config. Note: This MUST BE an empty list for the default config.
SUPER_USERS = []

View file

@ -7,9 +7,9 @@ from datetime import datetime
from peewee import *
from data.read_slave import ReadSlaveModel
from sqlalchemy.engine.url import make_url
from urlparse import urlparse
from util.names import urn_generator
logger = logging.getLogger(__name__)
@ -80,6 +80,16 @@ def uuid_generator():
return str(uuid.uuid4())
def close_db_filter(_):
if not db.is_closed():
logger.debug('Disconnecting from database.')
db.close()
if read_slave.obj is not None and not read_slave.is_closed():
logger.debug('Disconnecting from read slave.')
read_slave.close()
class BaseModel(ReadSlaveModel):
class Meta:
database = db

13
registry.py Normal file
View file

@ -0,0 +1,13 @@
import logging
import logging.config
from app import app as application
from endpoints.index import index
from endpoints.tags import tags
from endpoints.registry import registry
application.register_blueprint(index, url_prefix='/v1')
application.register_blueprint(tags, url_prefix='/v1')
application.register_blueprint(registry, url_prefix='/v1')

View file

@ -1,7 +1,6 @@
from datetime import datetime, timedelta
from config import DefaultConfig
from test.testlogs import TestBuildLogs
class FakeTransaction(object):

View file

@ -3,7 +3,6 @@ import logging
import multiprocessing
import os
import time
import gipc
import sys
import traceback
@ -31,7 +30,7 @@ class QueueProcess(object):
@staticmethod
def run_process(target, args):
gipc.start_process(target=target, args=args)
Process(target=target, args=args).start()
def run(self):
# Important! gipc is used here because normal multiprocessing does not work
@ -50,9 +49,9 @@ def _run(get_producer, queues, chunk_size, args):
for queue in queues:
try:
queue.put(data, block=True, timeout=10)
queue.put(data, block=True)
except Exception as ex:
# One of the listeners stopped listening.
logger.exception('Exception writing to queue.')
return
if data is None or isinstance(data, Exception):

9
verbs.py Normal file
View file

@ -0,0 +1,9 @@
import logging
import logging.config
from app import app as application
from endpoints.verbs import verbs
application.register_blueprint(verbs, url_prefix='/c1')

17
web.py Normal file
View file

@ -0,0 +1,17 @@
import logging
import logging.config
from app import app as application
from endpoints.api import api_bp
from endpoints.web import web
from endpoints.webhooks import webhooks
from endpoints.realtime import realtime
from endpoints.callbacks import callback
application.register_blueprint(web)
application.register_blueprint(callback, url_prefix='/oauth2')
application.register_blueprint(api_bp, url_prefix='/api')
application.register_blueprint(webhooks, url_prefix='/webhooks')
application.register_blueprint(realtime, url_prefix='/realtime')