Add a zuul client
Add a command line client called 'zuul' that supports one command
to start with: 'enqueue'. It allows an operator (one with access
to the gearman server) to enqueue an arbitrary change in a specified
pipeline. It uses gearman to communicate with the Zuul server, which
now has an added RPC listener component to answer such requests via
gearman.
Add tests for the client RPC interface.
Raise an exception if a Gerrit query does not produce a change. Unlike
events from Gerrit, user (or admin) submitted events over the RPC bus
are more likely to reference invalid changes. To validate those, the
Gerrit trigger will raise an exception (and remove from its cache) changes
which prove to be invalid.
Change-Id: Ife07683a736c15f4db44a0f9881f3f71b78716b2
diff --git a/setup.cfg b/setup.cfg
index 45f8e42..9ff62d6 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -22,6 +22,7 @@
[entry_points]
console_scripts =
zuul-server = zuul.cmd.server:main
+ zuul = zuul.cmd.client:main
[build_sphinx]
source-dir = doc/source
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 4832af9..63ab94f 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -44,6 +44,8 @@
import zuul.scheduler
import zuul.webapp
+import zuul.rpclistener
+import zuul.rpcclient
import zuul.launcher.gearman
import zuul.reporter.gerrit
import zuul.reporter.smtp
@@ -351,8 +353,10 @@
change.setReported()
def query(self, number):
- change = self.changes[int(number)]
- return change.query()
+ change = self.changes.get(int(number))
+ if change:
+ return change.query()
+ return {}
def startWatching(self, *args, **kw):
pass
@@ -806,6 +810,7 @@
self.fake_gerrit.upstream_root = self.upstream_root
self.webapp = zuul.webapp.WebApp(self.sched, port=0)
+ self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.setLauncher(self.launcher)
self.sched.registerTrigger(self.gerrit)
@@ -824,6 +829,7 @@
self.sched.reconfigure(self.config)
self.sched.resume()
self.webapp.start()
+ self.rpc.start()
self.launcher.gearman.waitForServer()
self.registerJobs()
self.builds = self.worker.running_builds
@@ -857,6 +863,8 @@
self.statsd.join()
self.webapp.stop()
self.webapp.join()
+ self.rpc.stop()
+ self.rpc.join()
threads = threading.enumerate()
if len(threads) > 1:
self.log.error("More than one thread is running: %s" % threads)
@@ -956,12 +964,14 @@
while True:
done = True
for connection in self.gearman_server.active_connections:
- if connection.functions:
+ if (connection.functions and
+ connection.client_id != 'Zuul RPC Listener'):
done = False
if done:
break
time.sleep(0)
self.gearman_server.functions = set()
+ self.rpc.register()
def haveAllBuildsReported(self):
# See if Zuul is waiting on a meta job to complete
@@ -2947,3 +2957,75 @@
FakeSMTP.messages[1]['to_email'])
self.assertEqual(A.messages[0],
FakeSMTP.messages[1]['body'])
+
+ def test_client_enqueue(self):
+ "Test that the RPC client can enqueue a change"
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ A.addApproval('APRV', 1)
+
+ client = zuul.rpcclient.RPCClient('127.0.0.1',
+ self.gearman_server.port)
+ r = client.enqueue(pipeline='gate',
+ project='org/project',
+ trigger='gerrit',
+ change='1',
+ patchset='1')
+ self.waitUntilSettled()
+ self.assertEqual(self.getJobFromHistory('project-merge').result,
+ 'SUCCESS')
+ self.assertEqual(self.getJobFromHistory('project-test1').result,
+ 'SUCCESS')
+ self.assertEqual(self.getJobFromHistory('project-test2').result,
+ 'SUCCESS')
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(r, True)
+
+ def test_client_enqueue_negative(self):
+ "Test that the RPC client returns errors"
+ client = zuul.rpcclient.RPCClient('127.0.0.1',
+ self.gearman_server.port)
+ with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+ "Invalid project"):
+ r = client.enqueue(pipeline='gate',
+ project='project-does-not-exist',
+ trigger='gerrit',
+ change='1',
+ patchset='1')
+ client.shutdown()
+ self.assertEqual(r, False)
+
+ with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+ "Invalid pipeline"):
+ r = client.enqueue(pipeline='pipeline-does-not-exist',
+ project='org/project',
+ trigger='gerrit',
+ change='1',
+ patchset='1')
+ client.shutdown()
+ self.assertEqual(r, False)
+
+ with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+ "Invalid trigger"):
+ r = client.enqueue(pipeline='gate',
+ project='org/project',
+ trigger='trigger-does-not-exist',
+ change='1',
+ patchset='1')
+ client.shutdown()
+ self.assertEqual(r, False)
+
+ with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+ "Invalid change"):
+ r = client.enqueue(pipeline='gate',
+ project='org/project',
+ trigger='gerrit',
+ change='1',
+ patchset='1')
+ client.shutdown()
+ self.assertEqual(r, False)
+
+ self.waitUntilSettled()
+ self.assertEqual(len(self.history), 0)
+ self.assertEqual(len(self.builds), 0)
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
new file mode 100644
index 0000000..a5327a2
--- /dev/null
+++ b/zuul/cmd/client.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import ConfigParser
+import logging
+import logging.config
+import os
+import sys
+
+import zuul.rpcclient
+
+
+class Client(object):
+ log = logging.getLogger("zuul.Client")
+
+ def __init__(self):
+ self.args = None
+ self.config = None
+ self.gear_server_pid = None
+
+ def parse_arguments(self):
+ parser = argparse.ArgumentParser(
+ description='Zuul Project Gating System Client.')
+ parser.add_argument('-c', dest='config',
+ help='specify the config file')
+ parser.add_argument('-v', dest='verbose', action='store_true',
+ help='verbose output')
+ parser.add_argument('--version', dest='version', action='store_true',
+ help='show zuul version')
+
+ subparsers = parser.add_subparsers(title='commands',
+ description='valid commands',
+ help='additional help')
+
+ cmd_enqueue = subparsers.add_parser('enqueue', help='enqueue a change')
+ cmd_enqueue.add_argument('--trigger', help='trigger name',
+ required=True)
+ cmd_enqueue.add_argument('--pipeline', help='pipeline name',
+ required=True)
+ cmd_enqueue.add_argument('--project', help='project name',
+ required=True)
+ cmd_enqueue.add_argument('--change', help='change id',
+ required=True)
+ cmd_enqueue.add_argument('--patchset', help='patchset number',
+ required=True)
+ cmd_enqueue.set_defaults(func=self.enqueue)
+
+ self.args = parser.parse_args()
+
+ def read_config(self):
+ self.config = ConfigParser.ConfigParser()
+ if self.args.config:
+ locations = [self.args.config]
+ else:
+ locations = ['/etc/zuul/zuul.conf',
+ '~/zuul.conf']
+ for fp in locations:
+ if os.path.exists(os.path.expanduser(fp)):
+ self.config.read(os.path.expanduser(fp))
+ return
+ raise Exception("Unable to locate config file in %s" % locations)
+
+ def setup_logging(self):
+ if self.args.verbose:
+ logging.basicConfig(level=logging.DEBUG)
+
+ def main(self):
+ self.parse_arguments()
+ self.read_config()
+ self.setup_logging()
+
+ if self.args.version:
+ from zuul.version import version_info as zuul_version_info
+ print "Zuul version: %s" % zuul_version_info.version_string()
+ sys.exit(0)
+
+ self.server = self.config.get('gearman', 'server')
+ if self.config.has_option('gearman', 'port'):
+ self.port = self.config.get('gearman', 'port')
+ else:
+ self.port = 4730
+
+ if self.args.func():
+ sys.exit(0)
+ else:
+ sys.exit(1)
+
+ def enqueue(self):
+ client = zuul.rpcclient.RPCClient(self.server, self.port)
+ r = client.enqueue(pipeline=self.args.pipeline,
+ project=self.args.project,
+ trigger=self.args.trigger,
+ change=self.args.change,
+ patchset=self.args.patchset)
+ return r
+
+
+def main():
+ client = Client()
+ client.main()
+
+
+if __name__ == "__main__":
+ sys.path.insert(0, '.')
+ main()
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 710f35d..3a51b1c 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -172,6 +172,7 @@
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.webapp
+ import zuul.rpclistener
if (self.config.has_option('gearman_server', 'start') and
self.config.getboolean('gearman_server', 'start')):
@@ -185,6 +186,7 @@
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
timer = zuul.trigger.timer.Timer(self.config, self.sched)
webapp = zuul.webapp.WebApp(self.sched)
+ rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
gerrit_reporter = zuul.reporter.gerrit.Reporter(gerrit)
smtp_reporter = zuul.reporter.smtp.Reporter(
self.config.get('smtp', 'default_from')
@@ -207,6 +209,7 @@
self.sched.reconfigure(self.config)
self.sched.resume()
webapp.start()
+ rpc.start()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
signal.signal(signal.SIGUSR1, self.exit_handler)
diff --git a/zuul/model.py b/zuul/model.py
index 0c69430..9a076d6 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -802,6 +802,9 @@
self.newrev = None
# timer
self.timespec = None
+ # For events that arrive with a destination pipeline (eg, from
+ # an admin command, etc):
+ self.forced_pipeline = None
def __repr__(self):
ret = '<TriggerEvent %s %s' % (self.type, self.project_name)
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
new file mode 100644
index 0000000..63c726b
--- /dev/null
+++ b/zuul/rpcclient.py
@@ -0,0 +1,61 @@
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import time
+
+import gear
+
+
+class RPCFailure(Exception):
+ pass
+
+
+class RPCClient(object):
+ log = logging.getLogger("zuul.RPCClient")
+
+ def __init__(self, server, port):
+ self.log.debug("Connecting to gearman at %s:%s" % (server, port))
+ self.gearman = gear.Client()
+ self.gearman.addServer(server, port)
+ self.log.debug("Waiting for gearman")
+ self.gearman.waitForServer()
+
+ def submitJob(self, name, data):
+ self.log.debug("Submitting job %s with data %s" % (name, data))
+ job = gear.Job(name,
+ json.dumps(data),
+ unique=str(time.time()))
+ self.gearman.submitJob(job)
+
+ self.log.debug("Waiting for job completion")
+ while not job.complete:
+ time.sleep(0.1)
+ if job.exception:
+ raise RPCFailure(job.exception)
+ self.log.debug("Job complete, success: %s" % (not job.failure))
+ return (not job.failure)
+
+ def enqueue(self, pipeline, project, trigger, change, patchset):
+ data = {'pipeline': pipeline,
+ 'project': project,
+ 'trigger': trigger,
+ 'change': change,
+ 'patchset': patchset,
+ }
+ return self.submitJob('zuul:enqueue', data)
+
+ def shutdown(self):
+ self.gearman.shutdown()
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
new file mode 100644
index 0000000..d70ab63
--- /dev/null
+++ b/zuul/rpclistener.py
@@ -0,0 +1,116 @@
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import threading
+import traceback
+
+import gear
+
+import model
+
+
+class RPCListener(object):
+ log = logging.getLogger("zuul.RPCListener")
+
+ def __init__(self, config, sched):
+ self.config = config
+ self.sched = sched
+
+ def start(self):
+ self._running = True
+ server = self.config.get('gearman', 'server')
+ if self.config.has_option('gearman', 'port'):
+ port = self.config.get('gearman', 'port')
+ else:
+ port = 4730
+ self.worker = gear.Worker('Zuul RPC Listener')
+ self.worker.addServer(server, port)
+ self.register()
+ self.thread = threading.Thread(target=self.run)
+ self.thread.daemon = True
+ self.thread.start()
+
+ def register(self):
+ self.worker.registerFunction("zuul:enqueue")
+
+ def stop(self):
+ self.log.debug("Stopping")
+ self._running = False
+ self.worker.shutdown()
+ self.log.debug("Stopped")
+
+ def join(self):
+ self.thread.join()
+
+ def run(self):
+ while self._running:
+ try:
+ job = self.worker.getJob()
+ z, jobname = job.name.split(':')
+ attrname = 'handle_' + jobname
+ if hasattr(self, attrname):
+ f = getattr(self, attrname)
+ if callable(f):
+ try:
+ f(job)
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(traceback.format_exc())
+ else:
+ job.sendWorkFail()
+ else:
+ job.sendWorkFail()
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def handle_enqueue(self, job):
+ args = json.loads(job.arguments)
+ event = model.TriggerEvent()
+ errors = ''
+
+ trigger = self.sched.triggers.get(args['trigger'])
+ if trigger:
+ event.trigger_name = args['trigger']
+ else:
+ errors += 'Invalid trigger: %s\n' % args['trigger']
+
+ project = self.sched.layout.projects.get(args['project'])
+ if project:
+ event.project_name = args['project']
+ else:
+ errors += 'Invalid project: %s\n' % args['project']
+
+ pipeline = self.sched.layout.pipelines.get(args['pipeline'])
+ if pipeline:
+ event.forced_pipeline = args['pipeline']
+ else:
+ errors += 'Invalid pipeline: %s\n' % args['pipeline']
+
+ if not errors:
+ event.change_number = args['change']
+ event.patch_number = args['patchset']
+ try:
+ event.getChange(project, trigger)
+ except Exception:
+ errors += 'Invalid change: %s,%s\n' % (
+ args['change'], args['patchset'])
+
+ if errors:
+ job.sendWorkException(errors.encode('utf8'))
+ else:
+ self.sched.addEvent(event)
+ job.sendWorkComplete()
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 8b6c20c..28e7eb1 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -731,6 +731,11 @@
return allow_needs
def eventMatches(self, event):
+ if event.forced_pipeline:
+ if event.forced_pipeline == self.pipeline.name:
+ return True
+ else:
+ return False
for ef in self.event_filters:
if ef.matches(event):
return True
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 976849c..3a8644a 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -302,7 +302,11 @@
change.patchset = patchset
key = '%s,%s' % (change.number, change.patchset)
self._change_cache[key] = change
- self.updateChange(change)
+ try:
+ self.updateChange(change)
+ except Exception:
+ del self._change_cache[key]
+ raise
return change
def updateChange(self, change):
@@ -314,6 +318,9 @@
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
+ if 'project' not in data:
+ raise Exception("Change %s,%s not found" % (change.number,
+ change.patchset))
change.project = self.sched.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']