We have to serialize our build data before sending it to etc.

This commit is contained in:
Jake Moshenko 2014-12-23 14:09:04 -05:00
parent 709e571b78
commit 4e22e22ba1
2 changed files with 15 additions and 11 deletions

View file

@ -3,6 +3,7 @@ import etcd
import uuid import uuid
import calendar import calendar
import os.path import os.path
import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from trollius import From, coroutine, Return, async from trollius import From, coroutine, Return, async
@ -74,7 +75,8 @@ class EphemeralBuilderManager(BaseManager):
if etcd_result.action == ETCD_EXPIRE_RESULT: if etcd_result.action == ETCD_EXPIRE_RESULT:
# Handle the expiration # Handle the expiration
logger.debug('Builder expired, clean up the old build node') logger.debug('Builder expired, clean up the old build node')
async(self._clean_up_old_builder(etcd_result.key, etcd_result._prev_node.value)) job_metadata = json.loads(etcd_result._prev_node.value)
async(self._clean_up_old_builder(etcd_result.key, job_metadata))
def initialize(self, manager_config): def initialize(self, manager_config):
logger.debug('Calling initialize') logger.debug('Calling initialize')
@ -149,7 +151,7 @@ class EphemeralBuilderManager(BaseManager):
} }
try: try:
yield From(self._etcd_client.write(job_key, payload, prevExist=False, ttl=ttl)) yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl))
component = self.register_component(realm, BuildComponent, token=token) component = self.register_component(realm, BuildComponent, token=token)
self._component_to_job[component] = build_job self._component_to_job[component] = build_job
except KeyError: except KeyError:
@ -163,7 +165,7 @@ class EphemeralBuilderManager(BaseManager):
# Store the builder in etcd associated with the job id # Store the builder in etcd associated with the job id
payload['builder_id'] = builder_id payload['builder_id'] = builder_id
yield From(self._etcd_client.write(job_key, payload, prevExist=True, ttl=ttl)) yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl))
raise Return(True) raise Return(True)
@ -199,9 +201,10 @@ class EphemeralBuilderManager(BaseManager):
def job_heartbeat(self, build_job): def job_heartbeat(self, build_job):
# Extend the deadline in etcd # Extend the deadline in etcd
job_key = self._etcd_job_key(build_job) job_key = self._etcd_job_key(build_job)
build_job_response = yield From(self._etcd_client.read(job_key)) build_job_metadata_response = yield From(self._etcd_client.read(job_key))
build_job_metadata = json.loads(build_job_metadata_response.value)
max_expiration = datetime.utcfromtimestamp(build_job_response.value['max_expiration']) max_expiration = datetime.utcfromtimestamp(build_job_metadata['max_expiration'])
max_expiration_remaining = max_expiration - datetime.utcnow() max_expiration_remaining = max_expiration - datetime.utcnow()
max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds())) max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))
@ -210,11 +213,11 @@ class EphemeralBuilderManager(BaseManager):
payload = { payload = {
'expiration': calendar.timegm(new_expiration.timetuple()), 'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_response.value['builder_id'], 'builder_id': build_job_metadata['builder_id'],
'max_expiration': build_job_response.value['max_expiration'], 'max_expiration': build_job_metadata['max_expiration'],
} }
yield From(self._etcd_client.write(job_key, payload, ttl=ttl)) yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))
self.job_heartbeat_callback(build_job) self.job_heartbeat_callback(build_job)

View file

@ -2,6 +2,7 @@ import unittest
import etcd import etcd
import os.path import os.path
import time import time
import json
from trollius import coroutine, get_event_loop, From, Future, sleep from trollius import coroutine, get_event_loop, From, Future, sleep
from mock import Mock from mock import Mock
@ -130,7 +131,7 @@ class TestEphemeral(unittest.TestCase):
expired_result.action = ETCD_EXPIRE_RESULT expired_result.action = ETCD_EXPIRE_RESULT
expired_result.key = self.mock_job_key expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = {'builder_id': '1234'} expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
expired_future = Future() expired_future = Future()
expired_future.set_result(expired_result) expired_future.set_result(expired_result)
@ -162,11 +163,11 @@ class TestEphemeral(unittest.TestCase):
def test_heartbeat_response(self): def test_heartbeat_response(self):
expiration_timestamp = time.time() + 60 expiration_timestamp = time.time() + 60
builder_result = Mock(spec=etcd.EtcdResult) builder_result = Mock(spec=etcd.EtcdResult)
builder_result.value = { builder_result.value = json.dumps({
'builder_id': '123', 'builder_id': '123',
'expiration': expiration_timestamp, 'expiration': expiration_timestamp,
'max_expiration': expiration_timestamp, 'max_expiration': expiration_timestamp,
} })
self.etcd_client_mock.read = Mock(return_value=builder_result) self.etcd_client_mock.read = Mock(return_value=builder_result)
yield From(self.manager.job_heartbeat(self.mock_job)) yield From(self.manager.job_heartbeat(self.mock_job))