Cache is held and managed by connections

Add a reconfigure test case. This test previously failed due to a
regression introduced with the connections changes.

Because multiple sources share a connection, a pipeline that does not
hold, and therefore does not require, any changes in the cache may
clear a connection's cache before a pipeline that does need said change
has an opportunity to add it to the relevant list.

Allow connections to manage their cache directly rather than the source
doing it vicariously, ignorant of other pipelines/sources. Collect the
relevant changes from all pipelines and ask any connections holding a
cache for that item to keep it on reconfiguration.

Co-Authored-By: James E. Blair <jeblair@linux.vnet.ibm.com>
Change-Id: I2bf8ba6b9deda58114db9e9b96985a2a0e2a69cb
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 81c2948..b2ec5f7 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -693,8 +693,8 @@
         # triggering events.  Since it will have the changes cached
         # already (without approvals), we need to clear the cache
         # first.
-        source = self.sched.layout.pipelines['gate'].source
-        source.maintainCache([])
+        for connection in self.connections.values():
+            connection.maintainCache([])
 
         self.worker.hold_jobs_in_build = True
         A.addApproval('APRV', 1)
@@ -791,7 +791,6 @@
         A.addApproval('APRV', 1)
         a = source._getChange(1, 2, refresh=True)
         self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
-        source.maintainCache([])
 
     def test_build_configuration(self):
         "Test that zuul merges the right commits for testing"
@@ -2609,6 +2608,53 @@
         # Ensure the removed job was not included in the report.
         self.assertNotIn('project1-project2-integration', A.messages[0])
 
+    def test_double_live_reconfiguration_shared_queue(self):
+        # This was a real-world regression.  A change is added to
+        # gate; a reconfigure happens, a second change which depends
+        # on the first is added, and a second reconfiguration happens.
+        # Ensure that both changes merge.
+
+        # A failure may indicate incorrect caching or cleaning up of
+        # references during a reconfiguration.
+        self.worker.hold_jobs_in_build = True
+
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+        B.setDependsOn(A, 1)
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+
+        # Add the parent change.
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Reconfigure (with only one change in the pipeline).
+        self.sched.reconfigure(self.config)
+        self.waitUntilSettled()
+
+        # Add the child change.
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Reconfigure (with both in the pipeline).
+        self.sched.reconfigure(self.config)
+        self.waitUntilSettled()
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.history), 8)
+
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(B.data['status'], 'MERGED')
+        self.assertEqual(B.reported, 2)
+
     def test_live_reconfiguration_del_project(self):
         # Test project deletion from layout
         # while changes are enqueued
@@ -3656,8 +3702,8 @@
         self.assertEqual(A.data['status'], 'NEW')
         self.assertEqual(B.data['status'], 'NEW')
 
-        source = self.sched.layout.pipelines['gate'].source
-        source.maintainCache([])
+        for connection in self.connections.values():
+            connection.maintainCache([])
 
         self.worker.hold_jobs_in_build = True
         B.addApproval('APRV', 1)