Perform live reconfiguration
Change-Id: I69563ee47dd6f3777a52b67999ff1a03247f1e1e
Reviewed-on: https://review.openstack.org/35324
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 6f70c63..3d07c13 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -2281,3 +2281,23 @@
assert self.getJobFromHistory('node-project-merge').node is None
assert self.getJobFromHistory('node-project-test1').node == 'debian'
assert self.getJobFromHistory('node-project-test2').node is None
+
+ def test_live_reconfiguration(self):
+ "Test that live reconfiguration works"
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.sched.reconfigure(self.config)
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+ assert self.getJobFromHistory('project-merge').result == 'SUCCESS'
+ assert self.getJobFromHistory('project-test1').result == 'SUCCESS'
+ assert self.getJobFromHistory('project-test2').result == 'SUCCESS'
+ assert A.data['status'] == 'MERGED'
+ assert A.reported == 2
+ self.assertEmptyQueues()
diff --git a/zuul/model.py b/zuul/model.py
index 5e653ed..f1dd0b0 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -306,12 +306,15 @@
def enqueueChange(self, change):
item = QueueItem(self.pipeline, change)
+ self.enqueueItem(item)
+ item.enqueue_time = time.time()
+ return item
+
+ def enqueueItem(self, item):
if self.dependent and self.queue:
item.item_ahead = self.queue[-1]
item.item_ahead.item_behind = item
self.queue.append(item)
- item.enqueue_time = time.time()
- return item
def dequeueItem(self, item):
if item in self.queue:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index bf5cbaa..b92f187 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -67,6 +67,7 @@
threading.Thread.__init__(self)
self.daemon = True
self.wake_event = threading.Event()
+ self.layout_lock = threading.Lock()
self.reconfigure_complete_event = threading.Event()
self._pause = False
self._reconfigure = False
@@ -78,9 +79,6 @@
self.trigger_event_queue = Queue.Queue()
self.result_event_queue = Queue.Queue()
- self._init()
-
- def _init(self):
self.layout = model.Layout()
def stop(self):
@@ -88,7 +86,6 @@
self.wake_event.set()
def testConfig(self, config_path):
- self._init()
self._parseConfig(config_path)
def _parseConfig(self, config_path):
@@ -283,6 +280,15 @@
def setTrigger(self, trigger):
self.trigger = trigger
+ def getProject(self, name):
+ self.layout_lock.acquire()
+ p = None
+ try:
+ p = self.layout.projects.get(name)
+ finally:
+ self.layout_lock.release()
+ return p
+
def addEvent(self, event):
self.log.debug("Adding trigger event: %s" % event)
try:
@@ -320,7 +326,6 @@
def reconfigure(self, config):
self.log.debug("Prepare to reconfigure")
self.config = config
- self._pause = True
self._reconfigure = True
self.wake_event.set()
self.log.debug("Waiting for reconfiguration")
@@ -387,15 +392,62 @@
self.log.debug("Exiting")
self._save_queue()
os._exit(0)
- if self._reconfigure:
+
+ def _doReconfigureEvent(self):
+ # This is called in the scheduler loop after another thread sets
+ # the reconfigure flag
+ self.layout_lock.acquire()
+ try:
self.log.debug("Performing reconfiguration")
- self._init()
- self.layout = self._parseConfig(
+ layout = self._parseConfig(
self.config.get('zuul', 'layout_config'))
+ for name, new_pipeline in layout.pipelines.items():
+ old_pipeline = self.layout.pipelines.get(name)
+ if not old_pipeline:
+ if self.layout.pipelines:
+ # Don't emit this warning on startup
+ self.log.warning("No old pipeline matching %s found "
+ "when reconfiguring" % name)
+ continue
+ self.log.debug("Re-enqueueing changes for pipeline %s" %
+ name)
+ items_to_remove = []
+ for shared_queue in old_pipeline.queues:
+ for item in (shared_queue.queue +
+ shared_queue.severed_heads):
+ item.item_ahead = None
+ item.item_behind = None
+ item.pipeline = None
+ project = layout.projects.get(item.change.project.name)
+ if not project:
+ self.log.warning("Unable to find project for "
+ "change %s while reenqueueing" %
+ item.change)
+ item.change.project = None
+ items_to_remove.append(item)
+ continue
+ item.change.project = project
+ severed = item in shared_queue.severed_heads
+ if not new_pipeline.manager.reEnqueueItem(item,
+ severed=severed):
+ items_to_remove.append(item)
+ builds_to_remove = []
+ for build, item in old_pipeline.manager.building_jobs.items():
+ if item in items_to_remove:
+ builds_to_remove.append(build)
+ self.log.warning("Deleting running build %s for "
+ "change %s while reenqueueing" % (
+ build, item.change))
+ for build in builds_to_remove:
+ del old_pipeline.manager.building_jobs[build]
+ new_pipeline.manager.building_jobs = \
+ old_pipeline.manager.building_jobs
+ self.layout = layout
self._setupMerger()
- self._pause = False
self._reconfigure = False
self.reconfigure_complete_event.set()
+ finally:
+ self.layout_lock.release()
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
@@ -424,6 +476,9 @@
return
self.log.debug("Run handler awake")
try:
+ if self._reconfigure:
+ self._doReconfigureEvent()
+
# Give result events priority -- they let us stop builds,
# whereas trigger evensts cause us to launch builds.
if not self.result_event_queue.empty():
@@ -497,8 +552,6 @@
ret = '<html><pre>'
if self._pause:
ret += '<p><b>Queue only mode:</b> preparing to '
- if self._reconfigure:
- ret += 'reconfigure'
if self._exit:
ret += 'exit'
ret += ', queue length: %s' % self.trigger_event_queue.qsize()
@@ -520,8 +573,6 @@
data = {}
if self._pause:
ret = '<p><b>Queue only mode:</b> preparing to '
- if self._reconfigure:
- ret += 'reconfigure'
if self._exit:
ret += 'exit'
ret += ', queue length: %s' % self.trigger_event_queue.qsize()
@@ -683,6 +734,22 @@
(change, old_change, old_change))
self.removeChange(old_change)
+ def reEnqueueItem(self, item, severed=False):
+ change_queue = self.pipeline.getQueue(item.change.project)
+ if change_queue:
+ self.log.debug("Re-enqueing change %s in queue %s" %
+ (item.change, change_queue))
+ if severed:
+ change_queue.addSeveredHead(item)
+ else:
+ change_queue.enqueueItem(item)
+ self.reportStats(item)
+ return True
+ else:
+ self.log.error("Unable to find change queue for project %s" %
+ item.change.project)
+ return False
+
def addChange(self, change):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index fe1a009..16b80c4 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -324,7 +324,7 @@
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
- change.project = self.sched.layout.projects[data['project']]
+ change.project = self.sched.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']
max_ps = 0