Allow using webapp from connections

Allow connections to register their own handlers for HTTP URIs inside
the zuul's webapp HTTP server. That way, connections can listen for
events comming through HTTP.

Story: 2000774

Change-Id: Ic5887d00ff302f67469df5154e9df10b99f1cfcd
diff --git a/tests/base.py b/tests/base.py
index 9983103..6d3df8b 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1367,6 +1367,9 @@
 
         self.sched = zuul.scheduler.Scheduler(self.config)
 
+        self.webapp = zuul.webapp.WebApp(
+            self.sched, port=0, listen_address='127.0.0.1')
+
         self.event_queues = [
             self.sched.result_event_queue,
             self.sched.trigger_event_queue,
@@ -1374,7 +1377,7 @@
         ]
 
         self.configure_connections()
-        self.sched.registerConnections(self.connections)
+        self.sched.registerConnections(self.connections, self.webapp)
 
         def URLOpenerFactory(*args, **kw):
             if isinstance(args[0], urllib.request.Request):
@@ -1414,8 +1417,6 @@
         self.sched.setNodepool(self.nodepool)
         self.sched.setZooKeeper(self.zk)
 
-        self.webapp = zuul.webapp.WebApp(
-            self.sched, port=0, listen_address='127.0.0.1')
         self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
 
         self.sched.start()
diff --git a/tests/unit/test_webapp.py b/tests/unit/test_webapp.py
index 8791a25..4511ec7 100644
--- a/tests/unit/test_webapp.py
+++ b/tests/unit/test_webapp.py
@@ -19,6 +19,7 @@
 import json
 
 from six.moves import urllib
+import webob
 
 from tests.base import ZuulTestCase, FIXTURE_DIR
 
@@ -96,3 +97,16 @@
             self.port)
         f = urllib.request.urlopen(req)
         self.assertEqual(f.read(), public_pem)
+
+    def test_webapp_custom_handler(self):
+        def custom_handler(path, tenant_name, request):
+            return webob.Response(body='ok')
+
+        self.webapp.register_path('/custom', custom_handler)
+        req = urllib.request.Request(
+            "http://localhost:%s/custom" % self.port)
+        f = urllib.request.urlopen(req)
+        self.assertEqual('ok', f.read())
+
+        self.webapp.unregister_path('/custom')
+        self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index ff4e1f4..f1d1015 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -182,7 +182,7 @@
         self.log.info('Starting scheduler')
         try:
             self.sched.start()
-            self.sched.registerConnections(self.connections)
+            self.sched.registerConnections(self.connections, webapp)
             self.sched.reconfigure(self.config)
             self.sched.resume()
         except Exception:
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index 6913294..49624d7 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -59,3 +59,21 @@
         This lets the user supply a list of change objects that are
         still in use.  Anything in our cache that isn't in the supplied
         list should be safe to remove from the cache."""
+
+    def registerWebapp(self, webapp):
+        self.webapp = webapp
+
+    def registerHttpHandler(self, path, handler):
+        """Add connection handler for HTTP URI.
+
+        Connection can use builtin HTTP server for listening on incoming event
+        requests. The resulting path will be /connection/connection_name/path.
+        """
+        self.webapp.register_path(self._connectionPath(path), handler)
+
+    def unregisterHttpHandler(self, path):
+        """Remove the connection handler for HTTP URI."""
+        self.webapp.unregister_path(self._connectionPath(path))
+
+    def _connectionPath(self, path):
+        return '/connection/%s/%s' % (self.connection_name, path)
diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py
index 3c69286..403aca6 100644
--- a/zuul/lib/connections.py
+++ b/zuul/lib/connections.py
@@ -58,6 +58,13 @@
             if load:
                 connection.onLoad()
 
+    def registerWebapp(self, webapp):
+        for driver_name, driver in self.drivers.items():
+            if hasattr(driver, 'registerWebapp'):
+                driver.registerWebapp(webapp)
+        for connection_name, connection in self.connections.items():
+            connection.registerWebapp(webapp)
+
     def reconfigureDrivers(self, tenant):
         for driver in self.drivers.values():
             if hasattr(driver, 'reconfigure'):
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 47bd471..53ca4c1 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -232,10 +232,11 @@
         self.stopConnections()
         self.wake_event.set()
 
-    def registerConnections(self, connections, load=True):
+    def registerConnections(self, connections, webapp, load=True):
         # load: whether or not to trigger the onLoad for the connection. This
         # is useful for not doing a full load during layout validation.
         self.connections = connections
+        self.connections.registerWebapp(webapp)
         self.connections.registerScheduler(self, load)
 
     def stopConnections(self):
diff --git a/zuul/webapp.py b/zuul/webapp.py
index 4f040fa..f5a7373 100644
--- a/zuul/webapp.py
+++ b/zuul/webapp.py
@@ -45,6 +45,7 @@
 
 class WebApp(threading.Thread):
     log = logging.getLogger("zuul.WebApp")
+    change_path_regexp = '/status/change/(\d+,\d+)$'
 
     def __init__(self, scheduler, port=8001, cache_expiry=1,
                  listen_address='0.0.0.0'):
@@ -56,10 +57,16 @@
         self.cache_time = 0
         self.cache = {}
         self.daemon = True
+        self.routes = {}
+        self._init_default_routes()
         self.server = httpserver.serve(
             dec.wsgify(self.app), host=self.listen_address, port=self.port,
             start_loop=False)
 
+    def _init_default_routes(self):
+        self.register_path('/(status\.json|status)$', self.status)
+        self.register_path(self.change_path_regexp, self.change)
+
     def run(self):
         self.server.serve_forever()
 
@@ -90,14 +97,13 @@
             return change['id'] == rev
         return self._changes_by_func(func, tenant_name)
 
-    def _normalize_path(self, path):
-        # support legacy status.json as well as new /status
-        if path == '/status.json' or path == '/status':
-            return "status"
-        m = re.match('/status/change/(\d+,\d+)$', path)
-        if m:
-            return m.group(1)
-        return None
+    def register_path(self, path, handler):
+        path_re = re.compile(path)
+        self.routes[path] = (path_re, handler)
+
+    def unregister_path(self, path):
+        if self.routes.get(path):
+            del self.routes[path]
 
     def _handle_keys(self, request, path):
         m = re.match('/keys/(.*?)/(.*?).pub', path)
@@ -120,14 +126,43 @@
         return response.conditional_response_app
 
     def app(self, request):
+        # Try registered paths without a tenant_name first
+        path = request.path
+        for path_re, handler in self.routes.itervalues():
+            if path_re.match(path):
+                return handler(path, '', request)
+
+        # Now try with a tenant_name stripped
         tenant_name = request.path.split('/')[1]
         path = request.path.replace('/' + tenant_name, '')
+        # Handle keys
         if path.startswith('/keys'):
             return self._handle_keys(request, path)
-        path = self._normalize_path(path)
-        if path is None:
+        for path_re, handler in self.routes.itervalues():
+            if path_re.match(path):
+                return handler(path, tenant_name, request)
+        else:
             raise webob.exc.HTTPNotFound()
 
+    def status(self, path, tenant_name, request):
+        def func():
+            return webob.Response(body=self.cache[tenant_name],
+                                  content_type='application/json')
+        return self._response_with_status_cache(func, tenant_name)
+
+    def change(self, path, tenant_name, request):
+        def func():
+            m = re.match(self.change_path_regexp, path)
+            change_id = m.group(1)
+            status = self._status_for_change(change_id, tenant_name)
+            if status:
+                return webob.Response(body=status,
+                                      content_type='application/json')
+            else:
+                raise webob.exc.HTTPNotFound()
+        return self._response_with_status_cache(func, tenant_name)
+
+    def _refresh_status_cache(self, tenant_name):
         if (tenant_name not in self.cache or
             (time.time() - self.cache_time) > self.cache_expiry):
             try:
@@ -140,16 +175,10 @@
                 self.log.exception("Exception formatting status:")
                 raise
 
-        if path == 'status':
-            response = webob.Response(body=self.cache[tenant_name],
-                                      content_type='application/json')
-        else:
-            status = self._status_for_change(path, tenant_name)
-            if status:
-                response = webob.Response(body=status,
-                                          content_type='application/json')
-            else:
-                raise webob.exc.HTTPNotFound()
+    def _response_with_status_cache(self, func, tenant_name):
+        self._refresh_status_cache(tenant_name)
+
+        response = func()
 
         response.headers['Access-Control-Allow-Origin'] = '*'