Add a zuul client

Add a command line client called 'zuul' that supports one command
to start with: 'enqueue'.  It allows an operator (one with access
to the gearman server) to enqueue an arbitrary change in a specified
pipeline.  It uses gearman to communicate with the Zuul server, which
now has an added RPC listener component to answer such requests via
gearman.

Add tests for the client RPC interface.

Raise an exception if a Gerrit query does not produce a change.  Unlike
events from Gerrit, user (or admin) submitted events over the RPC bus
are more likely to reference invalid changes.  To validate those, the
Gerrit trigger will raise an exception (and remove from its cache) changes
which prove to be invalid.

Change-Id: Ife07683a736c15f4db44a0f9881f3f71b78716b2
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 4832af9..63ab94f 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -44,6 +44,8 @@
 
 import zuul.scheduler
 import zuul.webapp
+import zuul.rpclistener
+import zuul.rpcclient
 import zuul.launcher.gearman
 import zuul.reporter.gerrit
 import zuul.reporter.smtp
@@ -351,8 +353,10 @@
             change.setReported()
 
     def query(self, number):
-        change = self.changes[int(number)]
-        return change.query()
+        change = self.changes.get(int(number))
+        if change:
+            return change.query()
+        return {}
 
     def startWatching(self, *args, **kw):
         pass
@@ -806,6 +810,7 @@
         self.fake_gerrit.upstream_root = self.upstream_root
 
         self.webapp = zuul.webapp.WebApp(self.sched, port=0)
+        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
 
         self.sched.setLauncher(self.launcher)
         self.sched.registerTrigger(self.gerrit)
@@ -824,6 +829,7 @@
         self.sched.reconfigure(self.config)
         self.sched.resume()
         self.webapp.start()
+        self.rpc.start()
         self.launcher.gearman.waitForServer()
         self.registerJobs()
         self.builds = self.worker.running_builds
@@ -857,6 +863,8 @@
         self.statsd.join()
         self.webapp.stop()
         self.webapp.join()
+        self.rpc.stop()
+        self.rpc.join()
         threads = threading.enumerate()
         if len(threads) > 1:
             self.log.error("More than one thread is running: %s" % threads)
@@ -956,12 +964,14 @@
         while True:
             done = True
             for connection in self.gearman_server.active_connections:
-                if connection.functions:
+                if (connection.functions and
+                    connection.client_id != 'Zuul RPC Listener'):
                     done = False
             if done:
                 break
             time.sleep(0)
         self.gearman_server.functions = set()
+        self.rpc.register()
 
     def haveAllBuildsReported(self):
         # See if Zuul is waiting on a meta job to complete
@@ -2947,3 +2957,75 @@
                          FakeSMTP.messages[1]['to_email'])
         self.assertEqual(A.messages[0],
                          FakeSMTP.messages[1]['body'])
+
+    def test_client_enqueue(self):
+        "Test that the RPC client can enqueue a change"
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        A.addApproval('APRV', 1)
+
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        r = client.enqueue(pipeline='gate',
+                           project='org/project',
+                           trigger='gerrit',
+                           change='1',
+                           patchset='1')
+        self.waitUntilSettled()
+        self.assertEqual(self.getJobFromHistory('project-merge').result,
+                         'SUCCESS')
+        self.assertEqual(self.getJobFromHistory('project-test1').result,
+                         'SUCCESS')
+        self.assertEqual(self.getJobFromHistory('project-test2').result,
+                         'SUCCESS')
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(r, True)
+
+    def test_client_enqueue_negative(self):
+        "Test that the RPC client returns errors"
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                         "Invalid project"):
+            r = client.enqueue(pipeline='gate',
+                               project='project-does-not-exist',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+            client.shutdown()
+            self.assertEqual(r, False)
+
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                         "Invalid pipeline"):
+            r = client.enqueue(pipeline='pipeline-does-not-exist',
+                               project='org/project',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+            client.shutdown()
+            self.assertEqual(r, False)
+
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                         "Invalid trigger"):
+            r = client.enqueue(pipeline='gate',
+                               project='org/project',
+                               trigger='trigger-does-not-exist',
+                               change='1',
+                               patchset='1')
+            client.shutdown()
+            self.assertEqual(r, False)
+
+        with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
+                                         "Invalid change"):
+            r = client.enqueue(pipeline='gate',
+                               project='org/project',
+                               trigger='gerrit',
+                               change='1',
+                               patchset='1')
+            client.shutdown()
+            self.assertEqual(r, False)
+
+        self.waitUntilSettled()
+        self.assertEqual(len(self.history), 0)
+        self.assertEqual(len(self.builds), 0)