Fix trigger cache maintenance

Each pipeline was maintaining the trigger cache only based on
its own current usage, which means that pipelines were clearing
the cache of changes currently being used by other pipelines.

This considers all changes in use in all pipelines when maintaining
the cache.

Change-Id: I3ab14c69acd80ecc613b63628c837511594744d0
Reviewed-on: https://review.openstack.org/36699
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Tested-by: Jenkins
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index b67e09f..eae5370 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -1394,6 +1394,52 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
 
+    def test_trigger_cache(self):
+        "Test that the trigger cache operates correctly"
+        self.worker.hold_jobs_in_build = True
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        X = self.fake_gerrit.addFakeChange('org/project', 'master', 'X')
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+
+        M1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'M1')
+        M1.setMerged()
+
+        B.setDependsOn(A, 1)
+        A.setDependsOn(M1, 1)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(X.getPatchsetCreatedEvent(1))
+
+        self.waitUntilSettled()
+
+        for build in self.builds:
+            if build.parameters['ZUUL_PIPELINE'] == 'check':
+                build.release()
+        self.waitUntilSettled()
+        for build in self.builds:
+            if build.parameters['ZUUL_PIPELINE'] == 'check':
+                build.release()
+        self.waitUntilSettled()
+
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        self.log.debug("len %s " % self.sched.trigger._change_cache.keys())
+        # there should still be changes in the cache
+        self.assertNotEqual(len(self.sched.trigger._change_cache.keys()), 0)
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(B.data['status'], 'MERGED')
+        self.assertEqual(A.queried, 2)  # Initial and isMerged
+        self.assertEqual(B.queried, 3)  # Initial A, refresh from B, isMerged
+
     def test_can_merge(self):
         "Test whether a change is ready to merge"
         # TODO: move to test_gerrit (this is a unit test!)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 338eb63..a973877 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -76,6 +76,7 @@
         self.launcher = None
         self.trigger = None
         self.config = None
+        self._maintain_trigger_cache = False
 
         self.trigger_event_queue = Queue.Queue()
         self.result_event_queue = Queue.Queue()
@@ -499,9 +500,23 @@
                 else:
                     if not self.result_event_queue.empty():
                         self.wake_event.set()
+
+                if self._maintain_trigger_cache:
+                    self.maintainTriggerCache()
+                    self._maintain_trigger_cache = False
+
             except:
                 self.log.exception("Exception in run handler:")
 
+    def maintainTriggerCache(self):
+        relevant = set()
+        for pipeline in self.layout.pipelines.values():
+            for item in pipeline.getAllItems():
+                relevant.add(item.change)
+                relevant.update(item.change.getRelatedChanges())
+        self.log.debug("Trigger cache size: %s" % len(relevant))
+        self.trigger.maintainCache(relevant)
+
     def process_event_queue(self):
         self.log.debug("Fetching trigger event")
         event = self.trigger_event_queue.get()
@@ -794,14 +809,7 @@
             (item.change.is_reportable and not item.reported)):
             self.log.debug("Adding %s as a severed head" % item.change)
             change_queue.addSeveredHead(item)
-        self.maintainTriggerCache()
-
-    def maintainTriggerCache(self):
-        relevant = set()
-        for item in self.pipeline.getAllItems():
-            relevant.add(item.change)
-            relevant.update(item.change.getRelatedChanges())
-        self.sched.trigger.maintainCache(relevant)
+        self.sched._maintain_trigger_cache = True
 
     def removeChange(self, change):
         # Remove a change from the queue, probably because it has been