Merge "Document zuul_url breaking change"
diff --git a/setup.cfg b/setup.cfg
index 45f8e42..9ff62d6 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -22,6 +22,7 @@
[entry_points]
console_scripts =
zuul-server = zuul.cmd.server:main
+ zuul = zuul.cmd.client:main
[build_sphinx]
source-dir = doc/source
diff --git a/tests/fixtures/layout-merge-queues.yaml b/tests/fixtures/layout-merge-queues.yaml
new file mode 100644
index 0000000..be39a1c
--- /dev/null
+++ b/tests/fixtures/layout-merge-queues.yaml
@@ -0,0 +1,25 @@
+pipelines:
+ - name: gate
+ manager: DependentPipelineManager
+ precedence: low
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - approved: 1
+
+projects:
+ - name: projectA
+ gate:
+ - test-only-a
+ - common-test1
+
+ - name: projectB
+ gate:
+ - test-only-b
+ - common-test2
+
+ - name: projectC
+ gate:
+ - common-test1
+ - common-test2
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 4832af9..91d0913 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -44,6 +44,8 @@
import zuul.scheduler
import zuul.webapp
+import zuul.rpclistener
+import zuul.rpcclient
import zuul.launcher.gearman
import zuul.reporter.gerrit
import zuul.reporter.smtp
@@ -351,8 +353,10 @@
change.setReported()
def query(self, number):
- change = self.changes[int(number)]
- return change.query()
+ # Look the change up with .get() so that a number with no entry in
+ # self.changes (or a falsy entry) yields an empty dict instead of
+ # calling .query() on it.
+ change = self.changes.get(int(number))
+ if change:
+ return change.query()
+ return {}
def startWatching(self, *args, **kw):
pass
@@ -806,6 +810,7 @@
self.fake_gerrit.upstream_root = self.upstream_root
self.webapp = zuul.webapp.WebApp(self.sched, port=0)
+ self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.setLauncher(self.launcher)
self.sched.registerTrigger(self.gerrit)
@@ -824,6 +829,7 @@
self.sched.reconfigure(self.config)
self.sched.resume()
self.webapp.start()
+ self.rpc.start()
self.launcher.gearman.waitForServer()
self.registerJobs()
self.builds = self.worker.running_builds
@@ -857,6 +863,8 @@
self.statsd.join()
self.webapp.stop()
self.webapp.join()
+ self.rpc.stop()
+ self.rpc.join()
threads = threading.enumerate()
if len(threads) > 1:
self.log.error("More than one thread is running: %s" % threads)
@@ -956,12 +964,14 @@
while True:
done = True
for connection in self.gearman_server.active_connections:
- if connection.functions:
+ if (connection.functions and
+ connection.client_id != 'Zuul RPC Listener'):
done = False
if done:
break
time.sleep(0)
self.gearman_server.functions = set()
+ self.rpc.register()
def haveAllBuildsReported(self):
# See if Zuul is waiting on a meta job to complete
@@ -2747,6 +2757,13 @@
self.assertIn('project-test1', status_jobs)
self.assertIn('project-test2', status_jobs)
+    def test_merging_queues(self):
+        "Test that transitively-connected change queues are merged"
+        # The fixture defines three projects in the gate pipeline:
+        # projectA and projectB share no job with each other, but each
+        # shares one with projectC.
+        layout_path = 'tests/fixtures/layout-merge-queues.yaml'
+        self.config.set('zuul', 'layout_config', layout_path)
+        self.sched.reconfigure(self.config)
+        gate = self.sched.layout.pipelines['gate']
+        self.assertEqual(len(gate.queues), 1)
+
def test_node_label(self):
"Test that a job runs on a specific node label"
self.worker.registerFunction('build:node-project-test1:debian')
@@ -2947,3 +2964,75 @@
FakeSMTP.messages[1]['to_email'])
self.assertEqual(A.messages[0],
FakeSMTP.messages[1]['body'])
+
+    def test_client_enqueue(self):
+        "Test that the RPC client can enqueue a change"
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        A.addApproval('APRV', 1)
+
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        try:
+            r = client.enqueue(pipeline='gate',
+                               project='org/project',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+        finally:
+            # Always release the client's gearman connection, even when
+            # enqueue() raises, so it does not outlive the test.
+            client.shutdown()
+        self.waitUntilSettled()
+        self.assertEqual(self.getJobFromHistory('project-merge').result,
+                         'SUCCESS')
+        self.assertEqual(self.getJobFromHistory('project-test1').result,
+                         'SUCCESS')
+        self.assertEqual(self.getJobFromHistory('project-test2').result,
+                         'SUCCESS')
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(r, True)
+
+    def test_client_enqueue_negative(self):
+        "Test that the RPC client returns errors"
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        # Each enqueue() below is expected to raise, so nothing may follow
+        # it inside its ExpectedException block; the client is shut down
+        # once, in the finally clause, whether or not an expectation fails.
+        try:
+            with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                             "Invalid project"):
+                client.enqueue(pipeline='gate',
+                               project='project-does-not-exist',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+
+            with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                             "Invalid pipeline"):
+                client.enqueue(pipeline='pipeline-does-not-exist',
+                               project='org/project',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+
+            with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                             "Invalid trigger"):
+                client.enqueue(pipeline='gate',
+                               project='org/project',
+                               trigger='trigger-does-not-exist',
+                               change='1',
+                               patchset='1')
+
+            # No fake change has been added in this test, so change 1
+            # cannot be looked up.
+            with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                             "Invalid change"):
+                client.enqueue(pipeline='gate',
+                               project='org/project',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+        finally:
+            client.shutdown()
+
+        self.waitUntilSettled()
+        self.assertEqual(len(self.history), 0)
+        self.assertEqual(len(self.builds), 0)
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
new file mode 100644
index 0000000..a5327a2
--- /dev/null
+++ b/zuul/cmd/client.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import ConfigParser
+import logging
+import logging.config
+import os
+import sys
+
+import zuul.rpcclient
+
+
+class Client(object):
+    """Command-line client that sends administrative commands to Zuul.
+
+    Commands are submitted through zuul.rpcclient.RPCClient to the Gearman
+    server named in the ``[gearman]`` section of the Zuul config file.
+    """
+    log = logging.getLogger("zuul.Client")
+
+    def __init__(self):
+        self.args = None
+        self.config = None
+        self.gear_server_pid = None
+
+    def _get_version(self):
+        """Return the banner printed by ``--version``."""
+        from zuul.version import version_info as zuul_version_info
+        return "Zuul version: %s" % zuul_version_info.version_string()
+
+    def parse_arguments(self):
+        """Parse the command line and store the result in ``self.args``.
+
+        Every sub-command sets ``func`` to the bound method implementing it.
+        """
+        parser = argparse.ArgumentParser(
+            description='Zuul Project Gating System Client.')
+        parser.add_argument('-c', dest='config',
+                            help='specify the config file')
+        parser.add_argument('-v', dest='verbose', action='store_true',
+                            help='verbose output')
+        # Let argparse handle --version itself so that it works without a
+        # config file and without naming a sub-command.
+        parser.add_argument('--version', dest='version', action='version',
+                            version=self._get_version(),
+                            help='show zuul version')
+
+        subparsers = parser.add_subparsers(title='commands',
+                                           description='valid commands',
+                                           help='additional help')
+
+        cmd_enqueue = subparsers.add_parser('enqueue', help='enqueue a change')
+        cmd_enqueue.add_argument('--trigger', help='trigger name',
+                                 required=True)
+        cmd_enqueue.add_argument('--pipeline', help='pipeline name',
+                                 required=True)
+        cmd_enqueue.add_argument('--project', help='project name',
+                                 required=True)
+        cmd_enqueue.add_argument('--change', help='change id',
+                                 required=True)
+        cmd_enqueue.add_argument('--patchset', help='patchset number',
+                                 required=True)
+        cmd_enqueue.set_defaults(func=self.enqueue)
+
+        self.args = parser.parse_args()
+
+    def read_config(self):
+        """Load the first config file that exists into ``self.config``.
+
+        Only the path given with ``-c`` is tried when one was supplied;
+        otherwise /etc/zuul/zuul.conf and ~/zuul.conf are tried in order.
+
+        Raises Exception when none of the candidate files exists.
+        """
+        self.config = ConfigParser.ConfigParser()
+        if self.args.config:
+            locations = [self.args.config]
+        else:
+            locations = ['/etc/zuul/zuul.conf',
+                         '~/zuul.conf']
+        for fp in locations:
+            path = os.path.expanduser(fp)
+            if os.path.exists(path):
+                self.config.read(path)
+                return
+        raise Exception("Unable to locate config file in %s" % locations)
+
+    def setup_logging(self):
+        """Enable DEBUG logging when ``-v`` was given."""
+        if self.args.verbose:
+            logging.basicConfig(level=logging.DEBUG)
+
+    def main(self):
+        """Parse arguments, load the config and run the chosen command.
+
+        Exits the process with status 0 when the command handler returns a
+        true value and with status 1 otherwise.
+        """
+        self.parse_arguments()
+        self.read_config()
+        self.setup_logging()
+
+        self.server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            # get() would return a string; use getint() so the port has the
+            # same type as the default below.
+            self.port = self.config.getint('gearman', 'port')
+        else:
+            self.port = 4730
+
+        if self.args.func():
+            sys.exit(0)
+        else:
+            sys.exit(1)
+
+    def enqueue(self):
+        """Ask Zuul to enqueue the change named on the command line.
+
+        Returns whatever RPCClient.enqueue() returns; exceptions it raises
+        propagate to the caller.
+        """
+        client = zuul.rpcclient.RPCClient(self.server, self.port)
+        try:
+            return client.enqueue(pipeline=self.args.pipeline,
+                                  project=self.args.project,
+                                  trigger=self.args.trigger,
+                                  change=self.args.change,
+                                  patchset=self.args.patchset)
+        finally:
+            # Release the gearman connection on success and on failure.
+            client.shutdown()
+
+
+def main():
+    """Console-script entry point: build a Client and run it."""
+    Client().main()
+
+
+if __name__ == "__main__":
+ sys.path.insert(0, '.')
+ main()
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 710f35d..3a51b1c 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -172,6 +172,7 @@
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.webapp
+ import zuul.rpclistener
if (self.config.has_option('gearman_server', 'start') and
self.config.getboolean('gearman_server', 'start')):
@@ -185,6 +186,7 @@
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
timer = zuul.trigger.timer.Timer(self.config, self.sched)
webapp = zuul.webapp.WebApp(self.sched)
+ rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
gerrit_reporter = zuul.reporter.gerrit.Reporter(gerrit)
smtp_reporter = zuul.reporter.smtp.Reporter(
self.config.get('smtp', 'default_from')
@@ -207,6 +209,7 @@
self.sched.reconfigure(self.config)
self.sched.resume()
webapp.start()
+ rpc.start()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
signal.signal(signal.SIGUSR1, self.exit_handler)
diff --git a/zuul/model.py b/zuul/model.py
index 0c69430..b71552d 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -72,7 +72,7 @@
return job_tree
def getProjects(self):
-        return self.job_trees.keys()
+        # Return the projects (the keys of job_trees) ordered by name so
+        # callers iterate over them in a deterministic order.  A key
+        # function gives the same ordering as a cmp() comparator on .name
+        # without relying on the Python-2-only comparator argument.
+        return sorted(self.job_trees.keys(), key=lambda p: p.name)
def addQueue(self, queue):
self.queues.append(queue)
@@ -802,6 +802,9 @@
self.newrev = None
# timer
self.timespec = None
+ # For events that arrive with a destination pipeline (eg, from
+ # an admin command, etc):
+ self.forced_pipeline = None
def __repr__(self):
ret = '<TriggerEvent %s %s' % (self.type, self.project_name)
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
new file mode 100644
index 0000000..63c726b
--- /dev/null
+++ b/zuul/rpcclient.py
@@ -0,0 +1,61 @@
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import time
+
+import gear
+
+
+class RPCFailure(Exception):
+ """Raised by RPCClient.submitJob when the job reports an exception."""
+ pass
+
+
+class RPCClient(object):
+    """Blocking client that submits Zuul RPC requests as Gearman jobs."""
+    log = logging.getLogger("zuul.RPCClient")
+
+    def __init__(self, server, port):
+        """Register the Gearman server and call waitForServer() on it."""
+        self.log.debug("Connecting to gearman at %s:%s" % (server, port))
+        self.gearman = gearman = gear.Client()
+        gearman.addServer(server, port)
+        self.log.debug("Waiting for gearman")
+        gearman.waitForServer()
+
+    def submitJob(self, name, data):
+        """Submit *data*, JSON-encoded, as job *name* and wait for it.
+
+        The job is polled every 0.1 seconds until it is marked complete.
+
+        Returns True unless the job is marked as failed.
+        Raises RPCFailure when the job carries an exception.
+        """
+        self.log.debug("Submitting job %s with data %s" % (name, data))
+        payload = json.dumps(data)
+        job = gear.Job(name, payload, unique=str(time.time()))
+        self.gearman.submitJob(job)
+
+        self.log.debug("Waiting for job completion")
+        while True:
+            if job.complete:
+                break
+            time.sleep(0.1)
+        if job.exception:
+            raise RPCFailure(job.exception)
+        succeeded = not job.failure
+        self.log.debug("Job complete, success: %s" % succeeded)
+        return succeeded
+
+    def enqueue(self, pipeline, project, trigger, change, patchset):
+        """Submit a ``zuul:enqueue`` job for the given change.
+
+        Returns the result of submitJob().
+        """
+        request = {'pipeline': pipeline,
+                   'project': project,
+                   'trigger': trigger,
+                   'change': change,
+                   'patchset': patchset,
+                   }
+        return self.submitJob('zuul:enqueue', request)
+
+    def shutdown(self):
+        """Shut down the underlying Gearman client."""
+        self.gearman.shutdown()
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
new file mode 100644
index 0000000..d70ab63
--- /dev/null
+++ b/zuul/rpclistener.py
@@ -0,0 +1,116 @@
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import threading
+import traceback
+
+import gear
+
+import model
+
+
+class RPCListener(object):
+    """Gearman worker that exposes scheduler commands as RPC jobs.
+
+    A job named ``zuul:<command>`` is dispatched to the method
+    ``handle_<command>`` of this object.
+    """
+    log = logging.getLogger("zuul.RPCListener")
+
+    def __init__(self, config, sched):
+        self.config = config
+        self.sched = sched
+        # Set up by start().
+        self._running = False
+        self.worker = None
+        self.thread = None
+
+    def start(self):
+        """Connect the worker, register functions and start the thread."""
+        self._running = True
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            # get() would return a string; use getint() so the port has the
+            # same type as the default below.
+            port = self.config.getint('gearman', 'port')
+        else:
+            port = 4730
+        self.worker = gear.Worker('Zuul RPC Listener')
+        self.worker.addServer(server, port)
+        self.register()
+        self.thread = threading.Thread(target=self.run)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def register(self):
+        """Register the Gearman functions this listener serves."""
+        self.worker.registerFunction("zuul:enqueue")
+
+    def stop(self):
+        """Clear the running flag and shut the worker down."""
+        self.log.debug("Stopping")
+        self._running = False
+        self.worker.shutdown()
+        self.log.debug("Stopped")
+
+    def join(self):
+        """Wait for the listener thread to finish."""
+        self.thread.join()
+
+    def run(self):
+        """Thread body: fetch jobs and dispatch them until stopped."""
+        while self._running:
+            try:
+                job = self.worker.getJob()
+            except Exception:
+                if not self._running:
+                    # stop() clears the flag before shutting the worker
+                    # down, so an error surfacing here afterwards is
+                    # presumably just the interrupted wait; exit quietly.
+                    break
+                self.log.exception("Exception while getting job")
+                continue
+            try:
+                self._dispatch(job)
+            except Exception:
+                self.log.exception("Exception while dispatching job")
+
+    def _dispatch(self, job):
+        """Run the ``handle_*`` method for *job*, or fail the job."""
+        # Use partition() rather than unpacking split(':') so that a name
+        # without exactly one colon is answered with a failure instead of
+        # raising before any reply is sent.
+        prefix, sep, jobname = job.name.partition(':')
+        handler = None
+        if sep:
+            handler = getattr(self, 'handle_' + jobname, None)
+        if not callable(handler):
+            job.sendWorkFail()
+            return
+        try:
+            handler(job)
+        except Exception:
+            self.log.exception("Exception while running job")
+            job.sendWorkException(traceback.format_exc())
+
+    def handle_enqueue(self, job):
+        """Validate an enqueue request and hand it to the scheduler.
+
+        The job arguments are a JSON object with the keys ``trigger``,
+        ``project``, ``pipeline``, ``change`` and ``patchset``.  Every
+        problem found is reported back through sendWorkException(); when
+        there is none, a TriggerEvent forced onto the requested pipeline
+        is added to the scheduler and the job is completed.
+        """
+        args = json.loads(job.arguments)
+        event = model.TriggerEvent()
+        errors = []
+
+        trigger = self.sched.triggers.get(args['trigger'])
+        if trigger:
+            event.trigger_name = args['trigger']
+        else:
+            errors.append('Invalid trigger: %s\n' % args['trigger'])
+
+        project = self.sched.layout.projects.get(args['project'])
+        if project:
+            event.project_name = args['project']
+        else:
+            errors.append('Invalid project: %s\n' % args['project'])
+
+        pipeline = self.sched.layout.pipelines.get(args['pipeline'])
+        if pipeline:
+            event.forced_pipeline = args['pipeline']
+        else:
+            errors.append('Invalid pipeline: %s\n' % args['pipeline'])
+
+        if not errors:
+            # The change is only looked up once the trigger and project
+            # are known to be valid, because getChange() needs both.
+            event.change_number = args['change']
+            event.patch_number = args['patchset']
+            try:
+                event.getChange(project, trigger)
+            except Exception:
+                errors.append('Invalid change: %s,%s\n' % (
+                    args['change'], args['patchset']))
+
+        if errors:
+            job.sendWorkException(''.join(errors).encode('utf8'))
+        else:
+            self.sched.addEvent(event)
+            job.sendWorkComplete()
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 8b6c20c..ea7f61f 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -731,6 +731,11 @@
return allow_needs
def eventMatches(self, event):
+ if event.forced_pipeline:
+ if event.forced_pipeline == self.pipeline.name:
+ return True
+ else:
+ return False
for ef in self.event_filters:
if ef.matches(event):
return True
@@ -1368,6 +1373,21 @@
change_queues.append(change_queue)
self.log.debug("Created queue: %s" % change_queue)
+ # Iterate over all queues trying to combine them, and keep doing
+ # so until they can not be combined further.
+ last_change_queues = change_queues
+ while True:
+ new_change_queues = self.combineChangeQueues(last_change_queues)
+ if len(last_change_queues) == len(new_change_queues):
+ break
+ last_change_queues = new_change_queues
+
+ self.log.info(" Shared change queues:")
+ for queue in new_change_queues:
+ self.pipeline.addQueue(queue)
+ self.log.info(" %s" % queue)
+
+ def combineChangeQueues(self, change_queues):
self.log.debug("Combining shared queues")
new_change_queues = []
for a in change_queues:
@@ -1381,11 +1401,7 @@
if not merged_a:
self.log.debug("Keeping queue %s" % (a))
new_change_queues.append(a)
-
- self.log.info(" Shared change queues:")
- for queue in new_change_queues:
- self.pipeline.addQueue(queue)
- self.log.info(" %s" % queue)
+ return new_change_queues
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.trigger.canMerge(change,
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 976849c..3a8644a 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -302,7 +302,11 @@
change.patchset = patchset
key = '%s,%s' % (change.number, change.patchset)
self._change_cache[key] = change
- self.updateChange(change)
+ try:
+ self.updateChange(change)
+ except Exception:
+ del self._change_cache[key]
+ raise
return change
def updateChange(self, change):
@@ -314,6 +318,9 @@
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
+ if 'project' not in data:
+ raise Exception("Change %s,%s not found" % (change.number,
+ change.patchset))
change.project = self.sched.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']