Merge "Add a promote client command"
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 91d0913..c15d70c 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -2976,8 +2976,7 @@
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='gerrit',
- change='1',
- patchset='1')
+ change='1,1')
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-merge').result,
'SUCCESS')
@@ -2998,8 +2997,7 @@
r = client.enqueue(pipeline='gate',
project='project-does-not-exist',
trigger='gerrit',
- change='1',
- patchset='1')
+ change='1,1')
client.shutdown()
self.assertEqual(r, False)
@@ -3008,8 +3006,7 @@
r = client.enqueue(pipeline='pipeline-does-not-exist',
project='org/project',
trigger='gerrit',
- change='1',
- patchset='1')
+ change='1,1')
client.shutdown()
self.assertEqual(r, False)
@@ -3018,8 +3015,7 @@
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='trigger-does-not-exist',
- change='1',
- patchset='1')
+ change='1,1')
client.shutdown()
self.assertEqual(r, False)
@@ -3028,11 +3024,164 @@
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='gerrit',
- change='1',
- patchset='1')
+ change='1,1')
client.shutdown()
self.assertEqual(r, False)
self.waitUntilSettled()
self.assertEqual(len(self.history), 0)
self.assertEqual(len(self.builds), 0)
+
+ def test_client_promote(self):
+ "Test that the RPC client can promote a change"
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+
+ self.waitUntilSettled()
+
+ client = zuul.rpcclient.RPCClient('127.0.0.1',
+ self.gearman_server.port)
+ r = client.promote(pipeline='gate',
+ change_ids=['2,1', '3,1'])
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 6)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+ self.assertEqual(self.builds[2].name, 'project-test1')
+ self.assertEqual(self.builds[3].name, 'project-test2')
+ self.assertEqual(self.builds[4].name, 'project-test1')
+ self.assertEqual(self.builds[5].name, 'project-test2')
+
+ self.assertTrue(self.job_has_changes(self.builds[0], B))
+ self.assertFalse(self.job_has_changes(self.builds[0], A))
+ self.assertFalse(self.job_has_changes(self.builds[0], C))
+
+ self.assertTrue(self.job_has_changes(self.builds[2], B))
+ self.assertTrue(self.job_has_changes(self.builds[2], C))
+ self.assertFalse(self.job_has_changes(self.builds[2], A))
+
+ self.assertTrue(self.job_has_changes(self.builds[4], B))
+ self.assertTrue(self.job_has_changes(self.builds[4], C))
+ self.assertTrue(self.job_has_changes(self.builds[4], A))
+
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.data['status'], 'MERGED')
+ self.assertEqual(C.reported, 2)
+
+ client.shutdown()
+ self.assertEqual(r, True)
+
+ def test_client_promote_dependent(self):
+ "Test that the RPC client can promote a dependent change"
+ # Queue before (head on the right): C (depends on B) -> B -> A.
+ # Promoting only C must bring B ahead too: A -> C (depends on B) -> B.
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+ C.setDependsOn(B, 1)
+
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+
+ self.waitUntilSettled()
+
+ client = zuul.rpcclient.RPCClient('127.0.0.1',
+ self.gearman_server.port)
+ r = client.promote(pipeline='gate',
+ change_ids=['3,1'])
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 6)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+ self.assertEqual(self.builds[2].name, 'project-test1')
+ self.assertEqual(self.builds[3].name, 'project-test2')
+ self.assertEqual(self.builds[4].name, 'project-test1')
+ self.assertEqual(self.builds[5].name, 'project-test2')
+
+ self.assertTrue(self.job_has_changes(self.builds[0], B))
+ self.assertFalse(self.job_has_changes(self.builds[0], A))
+ self.assertFalse(self.job_has_changes(self.builds[0], C))
+
+ self.assertTrue(self.job_has_changes(self.builds[2], B))
+ self.assertTrue(self.job_has_changes(self.builds[2], C))
+ self.assertFalse(self.job_has_changes(self.builds[2], A))
+
+ self.assertTrue(self.job_has_changes(self.builds[4], B))
+ self.assertTrue(self.job_has_changes(self.builds[4], C))
+ self.assertTrue(self.job_has_changes(self.builds[4], A))
+
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.data['status'], 'MERGED')
+ self.assertEqual(C.reported, 2)
+
+ client.shutdown()
+ self.assertEqual(r, True)
+
+ def test_client_promote_negative(self):
+ "Test that the RPC client returns errors for promotion"
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ client = zuul.rpcclient.RPCClient('127.0.0.1',
+ self.gearman_server.port)
+
+ with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
+ r = client.promote(pipeline='nonexistent',
+ change_ids=['2,1', '3,1'])
+ client.shutdown()
+ self.assertEqual(r, False)
+
+ with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
+ r = client.promote(pipeline='gate',
+ change_ids=['4,1'])
+ client.shutdown()
+ self.assertEqual(r, False)
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index a5327a2..8f55c3d 100644
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -55,10 +55,16 @@
required=True)
cmd_enqueue.add_argument('--change', help='change id',
required=True)
- cmd_enqueue.add_argument('--patchset', help='patchset number',
- required=True)
cmd_enqueue.set_defaults(func=self.enqueue)
+ cmd_promote = subparsers.add_parser('promote',
+ help='promote one or more changes')
+ cmd_promote.add_argument('--pipeline', help='pipeline name',
+ required=True)
+ cmd_promote.add_argument('--changes', help='change ids',
+ required=True, nargs='+')
+ cmd_promote.set_defaults(func=self.promote)
+
self.args = parser.parse_args()
def read_config(self):
@@ -104,8 +110,13 @@
r = client.enqueue(pipeline=self.args.pipeline,
project=self.args.project,
trigger=self.args.trigger,
- change=self.args.change,
- patchset=self.args.patchset)
+ change=self.args.change)
+ return r
+
+ def promote(self):  # handler for the "promote" subcommand
+ client = zuul.rpcclient.RPCClient(self.server, self.port)
+ r = client.promote(pipeline=self.args.pipeline,
+ change_ids=self.args.changes)  # the --changes values, one or more
return r
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 63c726b..69390c0 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -48,14 +48,19 @@
self.log.debug("Job complete, success: %s" % (not job.failure))
return (not job.failure)
- def enqueue(self, pipeline, project, trigger, change, patchset):
+ def enqueue(self, pipeline, project, trigger, change):
data = {'pipeline': pipeline,
'project': project,
'trigger': trigger,
'change': change,
- 'patchset': patchset,
}
return self.submitJob('zuul:enqueue', data)
+ def promote(self, pipeline, change_ids):
+ data = {'pipeline': pipeline,
+ 'change_ids': change_ids,
+ }
+ return self.submitJob('zuul:promote', data)
+
def shutdown(self):
self.gearman.shutdown()
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index d70ab63..c1b9216 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -39,13 +39,15 @@
port = 4730
self.worker = gear.Worker('Zuul RPC Listener')
self.worker.addServer(server, port)
- self.register()
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
+ self.worker.waitForServer()
+ self.register()
def register(self):
self.worker.registerFunction("zuul:enqueue")
+ self.worker.registerFunction("zuul:promote")
def stop(self):
self.log.debug("Stopping")
@@ -57,10 +59,12 @@
self.thread.join()
def run(self):
+ self.log.debug("Starting RPC listener")
while self._running:
try:
job = self.worker.getJob()
z, jobname = job.name.split(':')
+ self.log.debug("Received job %s" % job.name)
attrname = 'handle_' + jobname
if hasattr(self, attrname):
f = getattr(self, attrname)
@@ -86,31 +90,36 @@
if trigger:
event.trigger_name = args['trigger']
else:
- errors += 'Invalid trigger: %s\n' % args['trigger']
+ errors += 'Invalid trigger: %s\n' % (args['trigger'],)
project = self.sched.layout.projects.get(args['project'])
if project:
event.project_name = args['project']
else:
- errors += 'Invalid project: %s\n' % args['project']
+ errors += 'Invalid project: %s\n' % (args['project'],)
pipeline = self.sched.layout.pipelines.get(args['pipeline'])
if pipeline:
event.forced_pipeline = args['pipeline']
else:
- errors += 'Invalid pipeline: %s\n' % args['pipeline']
+ errors += 'Invalid pipeline: %s\n' % (args['pipeline'],)
if not errors:
- event.change_number = args['change']
- event.patch_number = args['patchset']
+ event.change_number, event.patch_number = args['change'].split(',')
try:
event.getChange(project, trigger)
except Exception:
- errors += 'Invalid change: %s,%s\n' % (
- args['change'], args['patchset'])
+ errors += 'Invalid change: %s\n' % (args['change'],)
if errors:
job.sendWorkException(errors.encode('utf8'))
else:
self.sched.addEvent(event)
job.sendWorkComplete()
+
+ def handle_promote(self, job):
+ args = json.loads(job.arguments)
+ pipeline_name = args['pipeline']
+ change_ids = args['change_ids']
+ self.sched.promote(pipeline_name, change_ids)
+ job.sendWorkComplete()
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 31b4429..f8dd6e1 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -22,6 +22,7 @@
import pickle
import Queue
import re
+import sys
import threading
import time
import yaml
@@ -64,12 +65,21 @@
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
+ self._exception = None
+ self._traceback = None
- def setComplete(self):
+ def exception(self, e, tb):  # record a failure (exception, traceback)
+ self._exception = e
+ self._traceback = tb
+ self._wait_event.set()  # wake the waiter; wait() will re-raise e
+
+ def done(self):
self._wait_event.set()
def wait(self, timeout=None):
self._wait_event.wait(timeout)
+ if self._exception:  # stored by exception() when processing failed
+ raise self._exception, None, self._traceback  # Python 2: keep traceback
return self._wait_event.is_set()
@@ -84,6 +94,20 @@
self.config = config
+class PromoteEvent(ManagementEvent):
+ """Promote one or more changes to the head of the queue.
+
+ :arg str pipeline_name: the name of the pipeline
+ :arg list change_ids: a list of strings of change ids in the form
+ 1234,1
+ """
+
+ def __init__(self, pipeline_name, change_ids):
+ super(PromoteEvent, self).__init__()
+ self.pipeline_name = pipeline_name
+ self.change_ids = change_ids
+
+
class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler")
@@ -396,6 +420,14 @@
event.wait()
self.log.debug("Reconfiguration complete")
+ def promote(self, pipeline_name, change_ids):  # blocks until processed
+ event = PromoteEvent(pipeline_name, change_ids)
+ self.management_event_queue.put(event)
+ self.wake_event.set()
+ self.log.debug("Waiting for promotion")
+ event.wait()  # re-raises any exception recorded on the event
+ self.log.debug("Promotion complete")
+
def exit(self):
self.log.debug("Prepare to exit")
self._pause = True
@@ -520,6 +552,43 @@
finally:
self.layout_lock.release()
+ def _doPromoteEvent(self, event):
+ pipeline = self.layout.pipelines[event.pipeline_name]
+ change_ids = [c.split(',') for c in event.change_ids]
+ items_to_enqueue = []
+ change_queue = None
+ for shared_queue in pipeline.queues:
+ if change_queue:
+ break
+ for item in shared_queue.queue:
+ if (item.change.number == change_ids[0][0] and
+ item.change.patchset == change_ids[0][1]):
+ change_queue = shared_queue
+ break
+ if not change_queue:
+ raise Exception("Unable to find shared change queue for %s" %
+ event.change_ids[0])
+ for number, patchset in change_ids:
+ found = False
+ for item in change_queue.queue:
+ if (item.change.number == number and
+ item.change.patchset == patchset):
+ found = True
+ items_to_enqueue.append(item)
+ break
+ if not found:
+ raise Exception("Unable to find %s,%s in queue %s" %
+ (number, patchset, change_queue))
+ for item in change_queue.queue[:]:
+ if item not in items_to_enqueue:
+ items_to_enqueue.append(item)
+ pipeline.manager.cancelJobs(item)
+ pipeline.manager.dequeueItem(item)
+ for item in items_to_enqueue:
+ pipeline.manager.addChange(item.change, quiet=True)
+ while pipeline.manager.processQueue():
+ pass
+
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
@@ -625,9 +694,16 @@
self.log.debug("Fetching management event")
event = self.management_event_queue.get()
self.log.debug("Processing management event %s" % event)
- if isinstance(event, ReconfigureEvent):
- self._doReconfigureEvent(event)
- event.setComplete()
+ try:
+ if isinstance(event, ReconfigureEvent):
+ self._doReconfigureEvent(event)
+ elif isinstance(event, PromoteEvent):
+ self._doPromoteEvent(event)
+ else:
+ self.log.error("Unable to handle event %s" % event)
+ event.done()
+ except Exception as e:
+ event.exception(e, sys.exc_info()[2])
self.management_event_queue.task_done()
def process_result_queue(self):
@@ -815,10 +891,10 @@
def isChangeReadyToBeEnqueued(self, change):
return True
- def enqueueChangesAhead(self, change):
+ def enqueueChangesAhead(self, change, quiet):
return True
- def enqueueChangesBehind(self, change):
+ def enqueueChangesBehind(self, change, quiet):
return True
def checkForChangesNeededBy(self, change):
@@ -872,7 +948,7 @@
item.change.project)
return False
- def addChange(self, change):
+ def addChange(self, change, quiet=False):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@@ -883,7 +959,7 @@
change)
return False
- if not self.enqueueChangesAhead(change):
+ if not self.enqueueChangesAhead(change, quiet):
self.log.debug("Failed to enqueue changes ahead of %s" % change)
return False
@@ -895,11 +971,12 @@
if change_queue:
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
- if len(self.pipeline.start_actions) > 0:
- self.reportStart(change)
+ if not quiet:
+ if len(self.pipeline.start_actions) > 0:
+ self.reportStart(change)
item = change_queue.enqueueChange(change)
self.reportStats(item)
- self.enqueueChangesBehind(change)
+ self.enqueueChangesBehind(change, quiet)
else:
self.log.error("Unable to find change queue for project %s" %
change.project)
@@ -970,7 +1047,7 @@
self.log.debug("Cancel jobs for change %s" % item.change)
canceled = False
to_remove = []
- if prime and item.current_build_set.builds:
+ if prime and item.current_build_set.ref:
item.resetAllBuilds()
for build, build_item in self.building_jobs.items():
if build_item == item:
@@ -1440,7 +1517,7 @@
return False
return True
- def enqueueChangesBehind(self, change):
+ def enqueueChangesBehind(self, change, quiet):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
@@ -1456,15 +1533,15 @@
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
- self.addChange(other_change)
+ self.addChange(other_change, quiet)
- def enqueueChangesAhead(self, change):
+ def enqueueChangesAhead(self, change, quiet):
ret = self.checkForChangesNeededBy(change)
if ret in [True, False]:
return ret
self.log.debug(" Change %s must be merged ahead of %s" %
(ret, change))
- return self.addChange(ret)
+ return self.addChange(ret, quiet)
def checkForChangesNeededBy(self, change):
self.log.debug("Checking for changes needed by %s:" % change)