Merge "Add a promote client command"
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 91d0913..c15d70c 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -2976,8 +2976,7 @@
         r = client.enqueue(pipeline='gate',
                            project='org/project',
                            trigger='gerrit',
-                           change='1',
-                           patchset='1')
+                           change='1,1')
         self.waitUntilSettled()
         self.assertEqual(self.getJobFromHistory('project-merge').result,
                          'SUCCESS')
@@ -2998,8 +2997,7 @@
             r = client.enqueue(pipeline='gate',
                                project='project-does-not-exist',
                                trigger='gerrit',
-                               change='1',
-                               patchset='1')
+                               change='1,1')
             client.shutdown()
             self.assertEqual(r, False)
 
@@ -3008,8 +3006,7 @@
             r = client.enqueue(pipeline='pipeline-does-not-exist',
                                project='org/project',
                                trigger='gerrit',
-                               change='1',
-                               patchset='1')
+                               change='1,1')
             client.shutdown()
             self.assertEqual(r, False)
 
@@ -3018,8 +3015,7 @@
             r = client.enqueue(pipeline='gate',
                                project='org/project',
                                trigger='trigger-does-not-exist',
-                               change='1',
-                               patchset='1')
+                               change='1,1')
             client.shutdown()
             self.assertEqual(r, False)
 
@@ -3028,11 +3024,164 @@
             r = client.enqueue(pipeline='gate',
                                project='org/project',
                                trigger='gerrit',
-                               change='1',
-                               patchset='1')
+                               change='1,1')
             client.shutdown()
             self.assertEqual(r, False)
 
         self.waitUntilSettled()
         self.assertEqual(len(self.history), 0)
         self.assertEqual(len(self.builds), 0)
+
+    def test_client_promote(self):
+        "Test that the RPC client can promote a change"
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+
+        self.waitUntilSettled()
+        # Promote '2,1' and '3,1' (presumably B and C) ahead of A.
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        r = client.promote(pipeline='gate',
+                           change_ids=['2,1', '3,1'])
+        # Release held jobs matching '.*-merge', three times over.
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        # Expected order: B first, then C on top of B, then A on top of both.
+        self.assertEqual(len(self.builds), 6)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+        self.assertEqual(self.builds[4].name, 'project-test1')
+        self.assertEqual(self.builds[5].name, 'project-test2')
+
+        self.assertTrue(self.job_has_changes(self.builds[0], B))
+        self.assertFalse(self.job_has_changes(self.builds[0], A))
+        self.assertFalse(self.job_has_changes(self.builds[0], C))
+
+        self.assertTrue(self.job_has_changes(self.builds[2], B))
+        self.assertTrue(self.job_has_changes(self.builds[2], C))
+        self.assertFalse(self.job_has_changes(self.builds[2], A))
+
+        self.assertTrue(self.job_has_changes(self.builds[4], B))
+        self.assertTrue(self.job_has_changes(self.builds[4], C))
+        self.assertTrue(self.job_has_changes(self.builds[4], A))
+        # Release the remaining builds; every change should merge.
+        self.worker.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(B.data['status'], 'MERGED')
+        self.assertEqual(B.reported, 2)
+        self.assertEqual(C.data['status'], 'MERGED')
+        self.assertEqual(C.reported, 2)
+
+        client.shutdown()
+        self.assertEqual(r, True)
+
+    def test_client_promote_dependent(self):
+        "Test that the RPC client can promote a dependent change"
+        # C (depends on B) -> B -> A ; then promote C to get:
+        # A -> C (depends on B) -> B
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+        C.setDependsOn(B, 1)
+
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+
+        self.waitUntilSettled()
+        # Only '3,1' (C, going by the comment at the top) is requested.
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        r = client.promote(pipeline='gate',
+                           change_ids=['3,1'])
+        # Release held jobs matching '.*-merge', three times over.
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        # Expected order: B, then C (which depends on B), then A.
+        self.assertEqual(len(self.builds), 6)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+        self.assertEqual(self.builds[4].name, 'project-test1')
+        self.assertEqual(self.builds[5].name, 'project-test2')
+
+        self.assertTrue(self.job_has_changes(self.builds[0], B))
+        self.assertFalse(self.job_has_changes(self.builds[0], A))
+        self.assertFalse(self.job_has_changes(self.builds[0], C))
+
+        self.assertTrue(self.job_has_changes(self.builds[2], B))
+        self.assertTrue(self.job_has_changes(self.builds[2], C))
+        self.assertFalse(self.job_has_changes(self.builds[2], A))
+
+        self.assertTrue(self.job_has_changes(self.builds[4], B))
+        self.assertTrue(self.job_has_changes(self.builds[4], C))
+        self.assertTrue(self.job_has_changes(self.builds[4], A))
+        # Release the remaining builds; every change should merge.
+        self.worker.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(B.data['status'], 'MERGED')
+        self.assertEqual(B.reported, 2)
+        self.assertEqual(C.data['status'], 'MERGED')
+        self.assertEqual(C.reported, 2)
+
+        client.shutdown()
+        self.assertEqual(r, True)
+
+    def test_client_promote_negative(self):
+        "Test that the RPC client returns errors for promotion"
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+
+        # A pipeline that does not exist must surface as an RPCFailure.
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
+            client.promote(pipeline='nonexistent',
+                           change_ids=['2,1', '3,1'])
+
+        # So must a change id that was never enqueued.
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
+            client.promote(pipeline='gate',
+                           change_ids=['4,1'])
+
+        # Kept outside the blocks: code after the raising call never runs.
+        client.shutdown()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index a5327a2..8f55c3d 100644
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -55,10 +55,16 @@
                                  required=True)
         cmd_enqueue.add_argument('--change', help='change id',
                                  required=True)
-        cmd_enqueue.add_argument('--patchset', help='patchset number',
-                                 required=True)
         cmd_enqueue.set_defaults(func=self.enqueue)
 
+        cmd_promote = subparsers.add_parser('promote',
+                                            help='promote one or more changes')
+        cmd_promote.add_argument('--pipeline', help='pipeline name',
+                                 required=True)
+        cmd_promote.add_argument('--changes', help='change ids',
+                                 required=True, nargs='+')
+        cmd_promote.set_defaults(func=self.promote)
+
         self.args = parser.parse_args()
 
     def read_config(self):
@@ -104,8 +110,13 @@
         r = client.enqueue(pipeline=self.args.pipeline,
                            project=self.args.project,
                            trigger=self.args.trigger,
-                           change=self.args.change,
-                           patchset=self.args.patchset)
+                           change=self.args.change)
+        return r
+
+    def promote(self):
+        client = zuul.rpcclient.RPCClient(self.server, self.port)
+        r = client.promote(pipeline=self.args.pipeline,
+                           change_ids=self.args.changes)
         return r
 
 
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 63c726b..69390c0 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -48,14 +48,19 @@
         self.log.debug("Job complete, success: %s" % (not job.failure))
         return (not job.failure)
 
-    def enqueue(self, pipeline, project, trigger, change, patchset):
+    def enqueue(self, pipeline, project, trigger, change):
         data = {'pipeline': pipeline,
                 'project': project,
                 'trigger': trigger,
                 'change': change,
-                'patchset': patchset,
                 }
         return self.submitJob('zuul:enqueue', data)
 
+    def promote(self, pipeline, change_ids):
+        # change_ids: list of "number,patchset" strings, e.g. "1234,1"
+        request = {'pipeline': pipeline, 'change_ids': change_ids}
+        outcome = self.submitJob('zuul:promote', request)
+        return outcome
+
     def shutdown(self):
         self.gearman.shutdown()
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index d70ab63..c1b9216 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -39,13 +39,15 @@
             port = 4730
         self.worker = gear.Worker('Zuul RPC Listener')
         self.worker.addServer(server, port)
-        self.register()
         self.thread = threading.Thread(target=self.run)
         self.thread.daemon = True
         self.thread.start()
+        self.worker.waitForServer()
+        self.register()
 
     def register(self):
         self.worker.registerFunction("zuul:enqueue")
+        self.worker.registerFunction("zuul:promote")
 
     def stop(self):
         self.log.debug("Stopping")
@@ -57,10 +59,12 @@
         self.thread.join()
 
     def run(self):
+        self.log.debug("Starting RPC listener")
         while self._running:
             try:
                 job = self.worker.getJob()
                 z, jobname = job.name.split(':')
+                self.log.debug("Received job %s" % job.name)
                 attrname = 'handle_' + jobname
                 if hasattr(self, attrname):
                     f = getattr(self, attrname)
@@ -86,31 +90,36 @@
         if trigger:
             event.trigger_name = args['trigger']
         else:
-            errors += 'Invalid trigger: %s\n' % args['trigger']
+            errors += 'Invalid trigger: %s\n' % (args['trigger'],)
 
         project = self.sched.layout.projects.get(args['project'])
         if project:
             event.project_name = args['project']
         else:
-            errors += 'Invalid project: %s\n' % args['project']
+            errors += 'Invalid project: %s\n' % (args['project'],)
 
         pipeline = self.sched.layout.pipelines.get(args['pipeline'])
         if pipeline:
             event.forced_pipeline = args['pipeline']
         else:
-            errors += 'Invalid pipeline: %s\n' % args['pipeline']
+            errors += 'Invalid pipeline: %s\n' % (args['pipeline'],)
 
         if not errors:
-            event.change_number = args['change']
-            event.patch_number = args['patchset']
+            event.change_number, event.patch_number = args['change'].split(',')
             try:
                 event.getChange(project, trigger)
             except Exception:
-                errors += 'Invalid change: %s,%s\n' % (
-                    args['change'], args['patchset'])
+                errors += 'Invalid change: %s\n' % (args['change'],)
 
         if errors:
             job.sendWorkException(errors.encode('utf8'))
         else:
             self.sched.addEvent(event)
             job.sendWorkComplete()
+
+    def handle_promote(self, job):
+        # Report completion only after sched.promote() has returned.
+        request = json.loads(job.arguments)
+        self.sched.promote(request['pipeline'],
+                           request['change_ids'])
+        job.sendWorkComplete()
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 31b4429..f8dd6e1 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -22,6 +22,7 @@
 import pickle
 import Queue
 import re
+import sys
 import threading
 import time
 import yaml
@@ -64,12 +65,21 @@
     """An event that should be processed within the main queue run loop"""
     def __init__(self):
         self._wait_event = threading.Event()
+        self._exception = None
+        self._traceback = None
 
-    def setComplete(self):
+    def exception(self, e, tb):
+        # Record the failure first, then release anyone blocked in wait().
+        self._exception, self._traceback = e, tb
+        self._wait_event.set()
+
+    def done(self):
         self._wait_event.set()
 
     def wait(self, timeout=None):
         self._wait_event.wait(timeout)
+        if self._exception:
+            raise self._exception, None, self._traceback
         return self._wait_event.is_set()
 
 
@@ -84,6 +94,20 @@
         self.config = config
 
 
+class PromoteEvent(ManagementEvent):
+    """Request to move one or more changes to the head of their queue.
+
+    :arg str pipeline_name: the name of the pipeline to reorder
+    :arg list change_ids: change ids as "number,patchset" strings (for
+        example "1234,1"), in the order they should appear at the head
+    """
+
+    def __init__(self, pipeline_name, change_ids):
+        super(PromoteEvent, self).__init__()
+        self.pipeline_name = pipeline_name
+        self.change_ids = change_ids
+
+
 class Scheduler(threading.Thread):
     log = logging.getLogger("zuul.Scheduler")
 
@@ -396,6 +420,14 @@
         event.wait()
         self.log.debug("Reconfiguration complete")
 
+    def promote(self, pipeline_name, change_ids):
+        event = PromoteEvent(pipeline_name, change_ids)
+        self.management_event_queue.put(event)
+        self.wake_event.set()
+        self.log.debug("Waiting for promotion")
+        event.wait()  # blocks; re-raises an exception recorded on the event
+        self.log.debug("Promotion complete")
+
     def exit(self):
         self.log.debug("Prepare to exit")
         self._pause = True
@@ -520,6 +552,43 @@
         finally:
             self.layout_lock.release()
 
+    def _doPromoteEvent(self, event):
+        # Move the requested changes, in order, to the head of their queue.
+        pipeline = self.layout.pipelines[event.pipeline_name]
+        if not event.change_ids:
+            raise Exception("No change ids given to promote")
+        wanted = [tuple(c.split(',')) for c in event.change_ids]
+
+        def find(queue, key):
+            # Item in queue matching key (number, patchset), or None.
+            for item in queue.queue:
+                if (item.change.number, item.change.patchset) == key:
+                    return item
+
+        for change_queue in pipeline.queues:
+            if find(change_queue, wanted[0]) is not None:
+                break
+        else:
+            raise Exception("Unable to find shared change queue for %s" %
+                            event.change_ids[0])
+        items_to_enqueue = []
+        for change_id, key in zip(event.change_ids, wanted):
+            item = find(change_queue, key)
+            if item is None:
+                raise Exception("Unable to find %s in queue %s" %
+                                (change_id, change_queue))
+            items_to_enqueue.append(item)
+        # Requeue everything; iterate a copy as items are dequeued below.
+        for item in change_queue.queue[:]:
+            if item not in items_to_enqueue:
+                items_to_enqueue.append(item)
+            pipeline.manager.cancelJobs(item)
+            pipeline.manager.dequeueItem(item)
+        for item in items_to_enqueue:
+            pipeline.manager.addChange(item.change, quiet=True)
+        while pipeline.manager.processQueue():
+            pass
+
     def _areAllBuildsComplete(self):
         self.log.debug("Checking if all builds are complete")
         waiting = False
@@ -625,9 +694,16 @@
         self.log.debug("Fetching management event")
         event = self.management_event_queue.get()
         self.log.debug("Processing management event %s" % event)
-        if isinstance(event, ReconfigureEvent):
-            self._doReconfigureEvent(event)
-        event.setComplete()
+        try:
+            if isinstance(event, ReconfigureEvent):
+                self._doReconfigureEvent(event)
+            elif isinstance(event, PromoteEvent):
+                self._doPromoteEvent(event)
+            else:
+                self.log.error("Unable to handle event %s" % event)
+            event.done()
+        except Exception as e:
+            event.exception(e, sys.exc_info()[2])
         self.management_event_queue.task_done()
 
     def process_result_queue(self):
@@ -815,10 +891,10 @@
     def isChangeReadyToBeEnqueued(self, change):
         return True
 
-    def enqueueChangesAhead(self, change):
+    def enqueueChangesAhead(self, change, quiet):
         return True
 
-    def enqueueChangesBehind(self, change):
+    def enqueueChangesBehind(self, change, quiet):
         return True
 
     def checkForChangesNeededBy(self, change):
@@ -872,7 +948,7 @@
                            item.change.project)
             return False
 
-    def addChange(self, change):
+    def addChange(self, change, quiet=False):
         self.log.debug("Considering adding change %s" % change)
         if self.isChangeAlreadyInQueue(change):
             self.log.debug("Change %s is already in queue, ignoring" % change)
@@ -883,7 +959,7 @@
                            change)
             return False
 
-        if not self.enqueueChangesAhead(change):
+        if not self.enqueueChangesAhead(change, quiet):
             self.log.debug("Failed to enqueue changes ahead of %s" % change)
             return False
 
@@ -895,11 +971,12 @@
         if change_queue:
             self.log.debug("Adding change %s to queue %s" %
                            (change, change_queue))
-            if len(self.pipeline.start_actions) > 0:
-                self.reportStart(change)
+            if not quiet:
+                if len(self.pipeline.start_actions) > 0:
+                    self.reportStart(change)
             item = change_queue.enqueueChange(change)
             self.reportStats(item)
-            self.enqueueChangesBehind(change)
+            self.enqueueChangesBehind(change, quiet)
         else:
             self.log.error("Unable to find change queue for project %s" %
                            change.project)
@@ -970,7 +1047,7 @@
         self.log.debug("Cancel jobs for change %s" % item.change)
         canceled = False
         to_remove = []
-        if prime and item.current_build_set.builds:
+        if prime and item.current_build_set.ref:
             item.resetAllBuilds()
         for build, build_item in self.building_jobs.items():
             if build_item == item:
@@ -1440,7 +1517,7 @@
             return False
         return True
 
-    def enqueueChangesBehind(self, change):
+    def enqueueChangesBehind(self, change, quiet):
         to_enqueue = []
         self.log.debug("Checking for changes needing %s:" % change)
         if not hasattr(change, 'needed_by_changes'):
@@ -1456,15 +1533,15 @@
             self.log.debug("  No changes need %s" % change)
 
         for other_change in to_enqueue:
-            self.addChange(other_change)
+            self.addChange(other_change, quiet)
 
-    def enqueueChangesAhead(self, change):
+    def enqueueChangesAhead(self, change, quiet):
         ret = self.checkForChangesNeededBy(change)
         if ret in [True, False]:
             return ret
         self.log.debug("  Change %s must be merged ahead of %s" %
                        (ret, change))
-        return self.addChange(ret)
+        return self.addChange(ret, quiet)
 
     def checkForChangesNeededBy(self, change):
         self.log.debug("Checking for changes needed by %s:" % change)