Merge "Add a zuul client"
diff --git a/setup.cfg b/setup.cfg
index 45f8e42..9ff62d6 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -22,6 +22,7 @@
 [entry_points]
 console_scripts =
     zuul-server = zuul.cmd.server:main
+    zuul = zuul.cmd.client:main
 
 [build_sphinx]
 source-dir = doc/source
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 1be4721..91d0913 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -44,6 +44,8 @@
 
 import zuul.scheduler
 import zuul.webapp
+import zuul.rpclistener
+import zuul.rpcclient
 import zuul.launcher.gearman
 import zuul.reporter.gerrit
 import zuul.reporter.smtp
@@ -351,8 +353,10 @@
             change.setReported()
 
     def query(self, number):
-        change = self.changes[int(number)]
-        return change.query()
+        change = self.changes.get(int(number))
+        if change:
+            return change.query()
+        return {}
 
     def startWatching(self, *args, **kw):
         pass
@@ -806,6 +810,7 @@
         self.fake_gerrit.upstream_root = self.upstream_root
 
         self.webapp = zuul.webapp.WebApp(self.sched, port=0)
+        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
 
         self.sched.setLauncher(self.launcher)
         self.sched.registerTrigger(self.gerrit)
@@ -824,6 +829,7 @@
         self.sched.reconfigure(self.config)
         self.sched.resume()
         self.webapp.start()
+        self.rpc.start()
         self.launcher.gearman.waitForServer()
         self.registerJobs()
         self.builds = self.worker.running_builds
@@ -857,6 +863,8 @@
         self.statsd.join()
         self.webapp.stop()
         self.webapp.join()
+        self.rpc.stop()
+        self.rpc.join()
         threads = threading.enumerate()
         if len(threads) > 1:
             self.log.error("More than one thread is running: %s" % threads)
@@ -956,12 +964,14 @@
         while True:
             done = True
             for connection in self.gearman_server.active_connections:
-                if connection.functions:
+                if (connection.functions and
+                    connection.client_id != 'Zuul RPC Listener'):
                     done = False
             if done:
                 break
             time.sleep(0)
         self.gearman_server.functions = set()
+        self.rpc.register()
 
     def haveAllBuildsReported(self):
         # See if Zuul is waiting on a meta job to complete
@@ -2954,3 +2964,75 @@
                          FakeSMTP.messages[1]['to_email'])
         self.assertEqual(A.messages[0],
                          FakeSMTP.messages[1]['body'])
+
+    def test_client_enqueue(self):
+        "Test that the RPC client can enqueue a change"
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        A.addApproval('APRV', 1)
+
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        r = client.enqueue(pipeline='gate',
+                           project='org/project',
+                           trigger='gerrit',
+                           change='1',
+                           patchset='1')
+        self.waitUntilSettled()
+        self.assertEqual(self.getJobFromHistory('project-merge').result,
+                         'SUCCESS')
+        self.assertEqual(self.getJobFromHistory('project-test1').result,
+                         'SUCCESS')
+        self.assertEqual(self.getJobFromHistory('project-test2').result,
+                         'SUCCESS')
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(r, True)
+
+    def test_client_enqueue_negative(self):
+        "Test that the RPC client returns errors"
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                         "Invalid project"):
+            r = client.enqueue(pipeline='gate',
+                               project='project-does-not-exist',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+            client.shutdown()
+            self.assertEqual(r, False)
+
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                         "Invalid pipeline"):
+            r = client.enqueue(pipeline='pipeline-does-not-exist',
+                               project='org/project',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+            client.shutdown()
+            self.assertEqual(r, False)
+
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                         "Invalid trigger"):
+            r = client.enqueue(pipeline='gate',
+                               project='org/project',
+                               trigger='trigger-does-not-exist',
+                               change='1',
+                               patchset='1')
+            client.shutdown()
+            self.assertEqual(r, False)
+
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                         "Invalid change"):
+            r = client.enqueue(pipeline='gate',
+                               project='org/project',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+            client.shutdown()
+            self.assertEqual(r, False)
+
+        self.waitUntilSettled()
+        self.assertEqual(len(self.history), 0)
+        self.assertEqual(len(self.builds), 0)
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
new file mode 100644
index 0000000..a5327a2
--- /dev/null
+++ b/zuul/cmd/client.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import ConfigParser
+import logging
+import logging.config
+import os
+import sys
+
+import zuul.rpcclient
+
+
+class Client(object):
+    log = logging.getLogger("zuul.Client")
+
+    def __init__(self):
+        self.args = None
+        self.config = None
+        self.gear_server_pid = None
+
+    def parse_arguments(self):
+        parser = argparse.ArgumentParser(
+            description='Zuul Project Gating System Client.')
+        parser.add_argument('-c', dest='config',
+                            help='specify the config file')
+        parser.add_argument('-v', dest='verbose', action='store_true',
+                            help='verbose output')
+        parser.add_argument('--version', dest='version', action='store_true',
+                            help='show zuul version')
+
+        subparsers = parser.add_subparsers(title='commands',
+                                           description='valid commands',
+                                           help='additional help')
+
+        cmd_enqueue = subparsers.add_parser('enqueue', help='enqueue a change')
+        cmd_enqueue.add_argument('--trigger', help='trigger name',
+                                 required=True)
+        cmd_enqueue.add_argument('--pipeline', help='pipeline name',
+                                 required=True)
+        cmd_enqueue.add_argument('--project', help='project name',
+                                 required=True)
+        cmd_enqueue.add_argument('--change', help='change id',
+                                 required=True)
+        cmd_enqueue.add_argument('--patchset', help='patchset number',
+                                 required=True)
+        cmd_enqueue.set_defaults(func=self.enqueue)
+
+        self.args = parser.parse_args()
+
+    def read_config(self):
+        self.config = ConfigParser.ConfigParser()
+        if self.args.config:
+            locations = [self.args.config]
+        else:
+            locations = ['/etc/zuul/zuul.conf',
+                         '~/zuul.conf']
+        for fp in locations:
+            if os.path.exists(os.path.expanduser(fp)):
+                self.config.read(os.path.expanduser(fp))
+                return
+        raise Exception("Unable to locate config file in %s" % locations)
+
+    def setup_logging(self):
+        if self.args.verbose:
+            logging.basicConfig(level=logging.DEBUG)
+
+    def main(self):
+        self.parse_arguments()
+        self.read_config()
+        self.setup_logging()
+
+        if self.args.version:
+            from zuul.version import version_info as zuul_version_info
+            print "Zuul version: %s" % zuul_version_info.version_string()
+            sys.exit(0)
+
+        self.server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            self.port = self.config.get('gearman', 'port')
+        else:
+            self.port = 4730
+
+        if self.args.func():
+            sys.exit(0)
+        else:
+            sys.exit(1)
+
+    def enqueue(self):
+        client = zuul.rpcclient.RPCClient(self.server, self.port)
+        r = client.enqueue(pipeline=self.args.pipeline,
+                           project=self.args.project,
+                           trigger=self.args.trigger,
+                           change=self.args.change,
+                           patchset=self.args.patchset)
+        return r
+
+
+def main():
+    client = Client()
+    client.main()
+
+
+if __name__ == "__main__":
+    sys.path.insert(0, '.')
+    main()
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 710f35d..3a51b1c 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -172,6 +172,7 @@
         import zuul.trigger.gerrit
         import zuul.trigger.timer
         import zuul.webapp
+        import zuul.rpclistener
 
         if (self.config.has_option('gearman_server', 'start') and
             self.config.getboolean('gearman_server', 'start')):
@@ -185,6 +186,7 @@
         gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
         timer = zuul.trigger.timer.Timer(self.config, self.sched)
         webapp = zuul.webapp.WebApp(self.sched)
+        rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
         gerrit_reporter = zuul.reporter.gerrit.Reporter(gerrit)
         smtp_reporter = zuul.reporter.smtp.Reporter(
             self.config.get('smtp', 'default_from')
@@ -207,6 +209,7 @@
         self.sched.reconfigure(self.config)
         self.sched.resume()
         webapp.start()
+        rpc.start()
 
         signal.signal(signal.SIGHUP, self.reconfigure_handler)
         signal.signal(signal.SIGUSR1, self.exit_handler)
diff --git a/zuul/model.py b/zuul/model.py
index 5fc8f6f..b71552d 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -802,6 +802,9 @@
         self.newrev = None
         # timer
         self.timespec = None
+        # For events that arrive with a destination pipeline (eg, from
+        # an admin command, etc):
+        self.forced_pipeline = None
 
     def __repr__(self):
         ret = '<TriggerEvent %s %s' % (self.type, self.project_name)
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
new file mode 100644
index 0000000..63c726b
--- /dev/null
+++ b/zuul/rpcclient.py
@@ -0,0 +1,61 @@
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import time
+
+import gear
+
+
+class RPCFailure(Exception):
+    pass
+
+
+class RPCClient(object):
+    log = logging.getLogger("zuul.RPCClient")
+
+    def __init__(self, server, port):
+        self.log.debug("Connecting to gearman at %s:%s" % (server, port))
+        self.gearman = gear.Client()
+        self.gearman.addServer(server, port)
+        self.log.debug("Waiting for gearman")
+        self.gearman.waitForServer()
+
+    def submitJob(self, name, data):
+        self.log.debug("Submitting job %s with data %s" % (name, data))
+        job = gear.Job(name,
+                       json.dumps(data),
+                       unique=str(time.time()))
+        self.gearman.submitJob(job)
+
+        self.log.debug("Waiting for job completion")
+        while not job.complete:
+            time.sleep(0.1)
+        if job.exception:
+            raise RPCFailure(job.exception)
+        self.log.debug("Job complete, success: %s" % (not job.failure))
+        return (not job.failure)
+
+    def enqueue(self, pipeline, project, trigger, change, patchset):
+        data = {'pipeline': pipeline,
+                'project': project,
+                'trigger': trigger,
+                'change': change,
+                'patchset': patchset,
+                }
+        return self.submitJob('zuul:enqueue', data)
+
+    def shutdown(self):
+        self.gearman.shutdown()
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
new file mode 100644
index 0000000..d70ab63
--- /dev/null
+++ b/zuul/rpclistener.py
@@ -0,0 +1,116 @@
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import threading
+import traceback
+
+import gear
+
+import model
+
+
+class RPCListener(object):
+    log = logging.getLogger("zuul.RPCListener")
+
+    def __init__(self, config, sched):
+        self.config = config
+        self.sched = sched
+
+    def start(self):
+        self._running = True
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.worker = gear.Worker('Zuul RPC Listener')
+        self.worker.addServer(server, port)
+        self.register()
+        self.thread = threading.Thread(target=self.run)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def register(self):
+        self.worker.registerFunction("zuul:enqueue")
+
+    def stop(self):
+        self.log.debug("Stopping")
+        self._running = False
+        self.worker.shutdown()
+        self.log.debug("Stopped")
+
+    def join(self):
+        self.thread.join()
+
+    def run(self):
+        while self._running:
+            try:
+                job = self.worker.getJob()
+                z, jobname = job.name.split(':')
+                attrname = 'handle_' + jobname
+                if hasattr(self, attrname):
+                    f = getattr(self, attrname)
+                    if callable(f):
+                        try:
+                            f(job)
+                        except Exception:
+                            self.log.exception("Exception while running job")
+                            job.sendWorkException(traceback.format_exc())
+                    else:
+                        job.sendWorkFail()
+                else:
+                    job.sendWorkFail()
+            except Exception:
+                self.log.exception("Exception while getting job")
+
+    def handle_enqueue(self, job):
+        args = json.loads(job.arguments)
+        event = model.TriggerEvent()
+        errors = ''
+
+        trigger = self.sched.triggers.get(args['trigger'])
+        if trigger:
+            event.trigger_name = args['trigger']
+        else:
+            errors += 'Invalid trigger: %s\n' % args['trigger']
+
+        project = self.sched.layout.projects.get(args['project'])
+        if project:
+            event.project_name = args['project']
+        else:
+            errors += 'Invalid project: %s\n' % args['project']
+
+        pipeline = self.sched.layout.pipelines.get(args['pipeline'])
+        if pipeline:
+            event.forced_pipeline = args['pipeline']
+        else:
+            errors += 'Invalid pipeline: %s\n' % args['pipeline']
+
+        if not errors:
+            event.change_number = args['change']
+            event.patch_number = args['patchset']
+            try:
+                event.getChange(project, trigger)
+            except Exception:
+                errors += 'Invalid change: %s,%s\n' % (
+                    args['change'], args['patchset'])
+
+        if errors:
+            job.sendWorkException(errors.encode('utf8'))
+        else:
+            self.sched.addEvent(event)
+            job.sendWorkComplete()
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 308c790..ea7f61f 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -731,6 +731,11 @@
         return allow_needs
 
     def eventMatches(self, event):
+        if event.forced_pipeline:
+            if event.forced_pipeline == self.pipeline.name:
+                return True
+            else:
+                return False
         for ef in self.event_filters:
             if ef.matches(event):
                 return True
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 976849c..3a8644a 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -302,7 +302,11 @@
             change.patchset = patchset
         key = '%s,%s' % (change.number, change.patchset)
         self._change_cache[key] = change
-        self.updateChange(change)
+        try:
+            self.updateChange(change)
+        except Exception:
+            del self._change_cache[key]
+            raise
         return change
 
     def updateChange(self, change):
@@ -314,6 +318,9 @@
         if change.patchset is None:
             change.patchset = data['currentPatchSet']['number']
 
+        if 'project' not in data:
+            raise Exception("Change %s,%s not found" % (change.number,
+                                                        change.patchset))
         change.project = self.sched.getProject(data['project'])
         change.branch = data['branch']
         change.url = data['url']