Perform live reconfiguration

Change-Id: I69563ee47dd6f3777a52b67999ff1a03247f1e1e
Reviewed-on: https://review.openstack.org/35324
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 6f70c63..3d07c13 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -2281,3 +2281,23 @@
         assert self.getJobFromHistory('node-project-merge').node is None
         assert self.getJobFromHistory('node-project-test1').node == 'debian'
         assert self.getJobFromHistory('node-project-test2').node is None
+
+    def test_live_reconfiguration(self):
+        "Test that a change enqueued before live reconfiguration still merges"
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        self.sched.reconfigure(self.config)
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+        assert self.getJobFromHistory('project-merge').result == 'SUCCESS'
+        assert self.getJobFromHistory('project-test1').result == 'SUCCESS'
+        assert self.getJobFromHistory('project-test2').result == 'SUCCESS'
+        assert A.data['status'] == 'MERGED'
+        assert A.reported == 2
+        self.assertEmptyQueues()
diff --git a/zuul/model.py b/zuul/model.py
index 5e653ed..f1dd0b0 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -306,12 +306,15 @@
 
     def enqueueChange(self, change):
         item = QueueItem(self.pipeline, change)
+        self.enqueueItem(item)
+        item.enqueue_time = time.time()  # not touched by enqueueItem
+        return item
+
+    def enqueueItem(self, item):
         if self.dependent and self.queue:
             item.item_ahead = self.queue[-1]
             item.item_ahead.item_behind = item
         self.queue.append(item)
-        item.enqueue_time = time.time()
-        return item
 
     def dequeueItem(self, item):
         if item in self.queue:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index bf5cbaa..b92f187 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -67,6 +67,7 @@
         threading.Thread.__init__(self)
         self.daemon = True
         self.wake_event = threading.Event()
+        self.layout_lock = threading.Lock()
         self.reconfigure_complete_event = threading.Event()
         self._pause = False
         self._reconfigure = False
@@ -78,9 +79,6 @@
 
         self.trigger_event_queue = Queue.Queue()
         self.result_event_queue = Queue.Queue()
-        self._init()
-
-    def _init(self):
         self.layout = model.Layout()
 
     def stop(self):
@@ -88,7 +86,6 @@
         self.wake_event.set()
 
     def testConfig(self, config_path):
-        self._init()
         self._parseConfig(config_path)
 
     def _parseConfig(self, config_path):
@@ -283,6 +280,15 @@
     def setTrigger(self, trigger):
         self.trigger = trigger
 
+    def getProject(self, name):
+        """Return the project named *name*, or None if it is unknown.
+
+        The lookup holds layout_lock, the same lock that
+        _doReconfigureEvent holds while it swaps in a new layout.
+        """
+        with self.layout_lock:
+            return self.layout.projects.get(name)
+
     def addEvent(self, event):
         self.log.debug("Adding trigger event: %s" % event)
         try:
@@ -320,7 +326,6 @@
     def reconfigure(self, config):
         self.log.debug("Prepare to reconfigure")
         self.config = config
-        self._pause = True
         self._reconfigure = True
         self.wake_event.set()
         self.log.debug("Waiting for reconfiguration")
@@ -387,15 +392,62 @@
             self.log.debug("Exiting")
             self._save_queue()
             os._exit(0)
-        if self._reconfigure:
+
+    def _doReconfigureEvent(self):
+        # Runs in the scheduler loop after another thread sets the
+        # reconfigure flag: parse the new layout, re-enqueue old items.
+        self.layout_lock.acquire()
+        try:
             self.log.debug("Performing reconfiguration")
-            self._init()
-            self.layout = self._parseConfig(
+            layout = self._parseConfig(
                 self.config.get('zuul', 'layout_config'))
+            for name, new_pipeline in layout.pipelines.items():
+                old_pipeline = self.layout.pipelines.get(name)
+                if not old_pipeline:
+                    if self.layout.pipelines:
+                        # Stay quiet on startup, when no old pipelines exist
+                        self.log.warning("No old pipeline matching %s found "
+                                         "when reconfiguring" % name)
+                    continue
+                self.log.debug("Re-enqueueing changes for pipeline %s" %
+                               name)
+                items_to_remove = []
+                for shared_queue in old_pipeline.queues:
+                    for item in (shared_queue.queue +
+                                 shared_queue.severed_heads):
+                        item.item_ahead = None
+                        item.item_behind = None
+                        item.pipeline = None
+                        project = layout.projects.get(item.change.project.name)
+                        if not project:
+                            self.log.warning("Unable to find project for "
+                                             "change %s while reenqueueing" %
+                                             item.change)
+                            item.change.project = None
+                            items_to_remove.append(item)
+                            continue
+                        item.change.project = project
+                        severed = item in shared_queue.severed_heads
+                        if not new_pipeline.manager.reEnqueueItem(item,
+                                severed=severed):
+                            items_to_remove.append(item)
+                builds_to_remove = []
+                for build, item in old_pipeline.manager.building_jobs.items():
+                    if item in items_to_remove:
+                        builds_to_remove.append(build)
+                        self.log.warning("Deleting running build %s for "
+                                         "change %s while reenqueueing" % (
+                                         build, item.change))
+                for build in builds_to_remove:
+                    del old_pipeline.manager.building_jobs[build]
+                new_pipeline.manager.building_jobs = \
+                    old_pipeline.manager.building_jobs
+            self.layout = layout  # swapped in while still locked
             self._setupMerger()
-            self._pause = False
             self._reconfigure = False
             self.reconfigure_complete_event.set()
+        finally:
+            self.layout_lock.release()
 
     def _areAllBuildsComplete(self):
         self.log.debug("Checking if all builds are complete")
@@ -424,6 +476,9 @@
                 return
             self.log.debug("Run handler awake")
             try:
+                if self._reconfigure:
+                    self._doReconfigureEvent()
+
                 # Give result events priority -- they let us stop builds,
                 # whereas trigger evensts cause us to launch builds.
                 if not self.result_event_queue.empty():
@@ -497,8 +552,6 @@
         ret = '<html><pre>'
         if self._pause:
             ret += '<p><b>Queue only mode:</b> preparing to '
-            if self._reconfigure:
-                ret += 'reconfigure'
             if self._exit:
                 ret += 'exit'
             ret += ', queue length: %s' % self.trigger_event_queue.qsize()
@@ -520,8 +573,6 @@
         data = {}
         if self._pause:
             ret = '<p><b>Queue only mode:</b> preparing to '
-            if self._reconfigure:
-                ret += 'reconfigure'
             if self._exit:
                 ret += 'exit'
             ret += ', queue length: %s' % self.trigger_event_queue.qsize()
@@ -683,6 +734,22 @@
                            (change, old_change, old_change))
             self.removeChange(old_change)
 
+    def reEnqueueItem(self, item, severed=False):
+        "Re-add item to the queue for its project; False if no queue"
+        queue = self.pipeline.getQueue(item.change.project)
+        if not queue:
+            self.log.error("Unable to find change queue for project %s" %
+                           item.change.project)
+            return False
+        self.log.debug("Re-enqueing change %s in queue %s" %
+                       (item.change, queue))
+        if severed:
+            queue.addSeveredHead(item)
+        else:
+            queue.enqueueItem(item)
+        self.reportStats(item)
+        return True
+
     def addChange(self, change):
         self.log.debug("Considering adding change %s" % change)
         if self.isChangeAlreadyInQueue(change):
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index fe1a009..16b80c4 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -324,7 +324,7 @@
         if change.patchset is None:
             change.patchset = data['currentPatchSet']['number']
 
-        change.project = self.sched.layout.projects[data['project']]
+        change.project = self.sched.getProject(data['project'])
         change.branch = data['branch']
         change.url = data['url']
         max_ps = 0