Support dynamic dependent pipeline additions

Previously one could not speculatively add a project to a dependent
pipeline if the desired queue did not exist. This change handles this
special case by creating a dynamic queue for the use of the speculative
change. If the change merges, then the queue will exist for any future
change.

Introduces a new ChangeQueue attribute, 'dynamic', to help
DependentPipelineManager objects determine whether a ChangeQueue was
created dynamically or not, during a clean up phase. Only used in
DependentPipelineManager pipelines.

Note that this doesn't necessarily support a newly added named queue, or
adding a project to an existing named queue. That will be follow up
work.

Change-Id: I51ab7fb113f1bcbcef5f4f6c96ca046d0c76fdd9
Story: 2000898
Task: 3528
diff --git a/tests/fixtures/config/in-repo/git/common-config/zuul.yaml b/tests/fixtures/config/in-repo/git/common-config/zuul.yaml
index 1fdaf2e..fce086e 100644
--- a/tests/fixtures/config/in-repo/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/in-repo/git/common-config/zuul.yaml
@@ -32,6 +32,26 @@
         verified: 0
     precedence: high
 
+- pipeline:
+    name: gate
+    manager: dependent
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - code-review: 2
+    success:
+      gerrit:
+        verified: 2
+        submit: true
+    failure:
+      gerrit:
+        verified: -2
+    start:
+      gerrit:
+        verified: 0
+    precedence: high
+
 - job:
     name: common-config-test
 
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 5d49d11..57f7947 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -199,6 +199,52 @@
         self.executor_server.release()
         self.waitUntilSettled()
 
+    def test_dynamic_dependent_pipeline(self):
+        # Test dynamically adding a project to a
+        # dependent pipeline for the first time
+        self.executor_server.hold_jobs_in_build = True
+
+        tenant = self.sched.abide.tenants.get('tenant-one')
+        gate_pipeline = tenant.layout.pipelines['gate']
+
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test2
+
+            - project:
+                name: org/project
+                gate:
+                  jobs:
+                    - project-test2
+            """)
+
+        in_repo_playbook = textwrap.dedent(
+            """
+            - hosts: all
+              tasks: []
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf,
+                     'playbooks/project-test2.yaml': in_repo_playbook}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('approved', 1)
+        self.fake_gerrit.addEvent(A.addApproval('code-review', 2))
+        self.waitUntilSettled()
+
+        items = gate_pipeline.getAllItems()
+        self.assertEqual(items[0].change.number, '1')
+        self.assertEqual(items[0].change.patchset, '1')
+        self.assertTrue(items[0].live)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        # Make sure the dynamic queue got cleaned up
+        self.assertEqual(gate_pipeline.queues, [])
+
     def test_in_repo_branch(self):
         in_repo_conf = textwrap.dedent(
             """
diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py
index ada3491..411894e 100644
--- a/zuul/manager/dependent.py
+++ b/zuul/manager/dependent.py
@@ -14,6 +14,7 @@
 
 from zuul import model
 from zuul.manager import PipelineManager, StaticChangeQueueContextManager
+from zuul.manager import DynamicChangeQueueContextManager
 
 
 class DependentPipelineManager(PipelineManager):
@@ -75,8 +76,17 @@
     def getChangeQueue(self, change, existing=None):
         if existing:
             return StaticChangeQueueContextManager(existing)
-        return StaticChangeQueueContextManager(
-            self.pipeline.getQueue(change.project))
+        queue = self.pipeline.getQueue(change.project)
+        if queue:
+            return StaticChangeQueueContextManager(queue)
+        else:
+            # There is no existing queue for this change. Create a
+            # dynamic one for this one change's use
+            change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
+            change_queue.addProject(change.project)
+            self.pipeline.addQueue(change_queue)
+            self.log.debug("Dynamically created queue %s", change_queue)
+            return DynamicChangeQueueContextManager(change_queue)
 
     def isChangeReadyToBeEnqueued(self, change):
         source = change.project.source
@@ -201,3 +211,11 @@
         if failing_items:
             return failing_items
         return None
+
+    def dequeueItem(self, item):
+        super(DependentPipelineManager, self).dequeueItem(item)
+        # If this was a dynamic queue from a speculative change,
+        # remove the queue (if empty)
+        if item.queue.dynamic:
+            if not item.queue.queue:
+                self.pipeline.removeQueue(item.queue)
diff --git a/zuul/model.py b/zuul/model.py
index ffbb70c..f5cbdac 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -208,11 +208,14 @@
     be processed. If a Change succeeds, the Window is increased by
     `window_increase_factor`. If a Change fails, the Window is decreased by
     `window_decrease_factor`.
+
+    A ChangeQueue may be a dynamically created queue, which may be removed
+    from a DependentPipelineManager once empty.
     """
     def __init__(self, pipeline, window=0, window_floor=1,
                  window_increase_type='linear', window_increase_factor=1,
                  window_decrease_type='exponential', window_decrease_factor=2,
-                 name=None):
+                 name=None, dynamic=False):
         self.pipeline = pipeline
         if name:
             self.name = name
@@ -227,6 +230,7 @@
         self.window_increase_factor = window_increase_factor
         self.window_decrease_type = window_decrease_type
         self.window_decrease_factor = window_decrease_factor
+        self.dynamic = dynamic
 
     def __repr__(self):
         return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)