Move github webhook from webapp to zuul-web
We want zuul-web to handle all HTTP serving, so the GitHub webhook
handling also needs to be moved to zuul-web.
Note that this changes the URL of the GitHub webhooks from
/connection/<connection_name>/payload to /driver/github/<connection_name>/payload.
Change-Id: I6482de6c5b9655ac0b9bf353b37a59cd5406f1b7
Signed-off-by: Jesse Keating <omgjlk@us.ibm.com>
Co-Authored-by: Tobias Henkel <tobias.henkel@bmw.de>
diff --git a/tests/base.py b/tests/base.py
index c449242..e7151df 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -66,9 +66,11 @@
import zuul.merger.server
import zuul.model
import zuul.nodepool
+import zuul.rpcclient
import zuul.zk
import zuul.configloader
from zuul.exceptions import MergeFailure
+from zuul.lib.config import get_default
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
@@ -939,7 +941,7 @@
class FakeGithubConnection(githubconnection.GithubConnection):
log = logging.getLogger("zuul.test.FakeGithubConnection")
- def __init__(self, driver, connection_name, connection_config,
+ def __init__(self, driver, connection_name, connection_config, rpcclient,
changes_db=None, upstream_root=None):
super(FakeGithubConnection, self).__init__(driver, connection_name,
connection_config)
@@ -952,12 +954,16 @@
self.merge_not_allowed_count = 0
self.reports = []
self.github_client = tests.fakegithub.FakeGithub(changes_db)
+ self.rpcclient = rpcclient
def getGithubClient(self,
project=None,
user_id=None):
return self.github_client
+ def setZuulWebPort(self, port):
+ self.zuul_web_port = port
+
def openFakePullRequest(self, project, branch, subject, files=[],
body=None):
self.pr_number += 1
@@ -991,19 +997,25 @@
}
return (name, data)
- def emitEvent(self, event):
+ def emitEvent(self, event, use_zuulweb=False):
"""Emulates sending the GitHub webhook event to the connection."""
- port = self.webapp.server.socket.getsockname()[1]
name, data = event
payload = json.dumps(data).encode('utf8')
secret = self.connection_config['webhook_token']
signature = githubconnection._sign_request(payload, secret)
- headers = {'X-Github-Event': name, 'X-Hub-Signature': signature}
- req = urllib.request.Request(
- 'http://localhost:%s/connection/%s/payload'
- % (port, self.connection_name),
- data=payload, headers=headers)
- return urllib.request.urlopen(req)
+ headers = {'x-github-event': name, 'x-hub-signature': signature}
+
+ if use_zuulweb:
+ req = urllib.request.Request(
+ 'http://127.0.0.1:%s/driver/github/%s/payload'
+ % (self.zuul_web_port, self.connection_name),
+ data=payload, headers=headers)
+ return urllib.request.urlopen(req)
+ else:
+ job = self.rpcclient.submitJob(
+ 'github:%s:payload' % self.connection_name,
+ {'headers': headers, 'body': data})
+ return json.loads(job.data[0])
def addProject(self, project):
# use the original method here and additionally register it in the
@@ -1983,6 +1995,13 @@
'gearman', 'ssl_key',
os.path.join(FIXTURE_DIR, 'gearman/client.key'))
+ self.rpcclient = zuul.rpcclient.RPCClient(
+ self.config.get('gearman', 'server'),
+ self.gearman_server.port,
+ get_default(self.config, 'gearman', 'ssl_key'),
+ get_default(self.config, 'gearman', 'ssl_cert'),
+ get_default(self.config, 'gearman', 'ssl_ca'))
+
gerritsource.GerritSource.replication_timeout = 1.5
gerritsource.GerritSource.replication_retry_interval = 0.5
gerritconnection.GerritEventConnector.delay = 0.0
@@ -2000,7 +2019,7 @@
]
self.configure_connections()
- self.sched.registerConnections(self.connections, self.webapp)
+ self.sched.registerConnections(self.connections)
self.executor_server = RecordingExecutorServer(
self.config, self.connections,
@@ -2065,6 +2084,7 @@
server = config.get('server', 'github.com')
db = self.github_changes_dbs.setdefault(server, {})
con = FakeGithubConnection(driver, name, config,
+ self.rpcclient,
changes_db=db,
upstream_root=self.upstream_root)
self.event_queues.append(con.event_queue)
@@ -2297,6 +2317,7 @@
self.statsd.join()
self.webapp.stop()
self.webapp.join()
+ self.rpcclient.shutdown()
self.gearman_server.shutdown()
self.fake_nodepool.stop()
self.zk.disconnect()
diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py
index 3942b0b..7aca428 100644
--- a/tests/unit/test_github_driver.py
+++ b/tests/unit/test_github_driver.py
@@ -12,15 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
+import asyncio
+import threading
import os
import re
from testtools.matchers import MatchesRegex, StartsWith
import urllib
+import socket
import time
from unittest import skip
import git
+import zuul.web
+
from tests.base import ZuulTestCase, simple_layout, random_sha1
@@ -734,3 +739,85 @@
# project2 should have no parsed branch
self.assertEqual(0, len(project2.unparsed_branch_config.keys()))
+
+
+class TestGithubWebhook(ZuulTestCase):
+ config_file = 'zuul-github-driver.conf'
+
+ def setUp(self):
+ super(TestGithubWebhook, self).setUp()
+
+ # Start the web server
+ self.web = zuul.web.ZuulWeb(
+ listen_address='127.0.0.1', listen_port=0,
+ gear_server='127.0.0.1', gear_port=self.gearman_server.port,
+ github_connections={'github': self.fake_github})
+ loop = asyncio.new_event_loop()
+ loop.set_debug(True)
+ ws_thread = threading.Thread(target=self.web.run, args=(loop,))
+ ws_thread.start()
+ self.addCleanup(loop.close)
+ self.addCleanup(ws_thread.join)
+ self.addCleanup(self.web.stop)
+
+ host = '127.0.0.1'
+ # Wait until web server is started
+ while True:
+ time.sleep(0.1)
+ if self.web.server is None:
+ continue
+ port = self.web.server.sockets[0].getsockname()[1]
+ try:
+ with socket.create_connection((host, port)):
+ break
+ except ConnectionRefusedError:
+ pass
+
+ self.fake_github.setZuulWebPort(port)
+
+ def tearDown(self):
+ super(TestGithubWebhook, self).tearDown()
+
+ @simple_layout('layouts/basic-github.yaml', driver='github')
+ def test_webhook(self):
+ """Test that we can get github events via zuul-web."""
+
+ self.executor_server.hold_jobs_in_build = True
+
+ A = self.fake_github.openFakePullRequest('org/project', 'master', 'A')
+ self.fake_github.emitEvent(A.getPullRequestOpenedEvent(),
+ use_zuulweb=True)
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual('SUCCESS',
+ self.getJobFromHistory('project-test1').result)
+ self.assertEqual('SUCCESS',
+ self.getJobFromHistory('project-test2').result)
+
+ job = self.getJobFromHistory('project-test2')
+ zuulvars = job.parameters['zuul']
+ self.assertEqual(str(A.number), zuulvars['change'])
+ self.assertEqual(str(A.head_sha), zuulvars['patchset'])
+ self.assertEqual('master', zuulvars['branch'])
+ self.assertEqual(1, len(A.comments))
+ self.assertThat(
+ A.comments[0],
+ MatchesRegex('.*\[project-test1 \]\(.*\).*', re.DOTALL))
+ self.assertThat(
+ A.comments[0],
+ MatchesRegex('.*\[project-test2 \]\(.*\).*', re.DOTALL))
+ self.assertEqual(2, len(self.history))
+
+ # test_pull_unmatched_branch_event(self):
+ self.create_branch('org/project', 'unmatched_branch')
+ B = self.fake_github.openFakePullRequest(
+ 'org/project', 'unmatched_branch', 'B')
+ self.fake_github.emitEvent(B.getPullRequestOpenedEvent(),
+ use_zuulweb=True)
+ self.waitUntilSettled()
+
+ self.assertEqual(2, len(self.history))
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 3cffa10..a32d924 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -160,7 +160,7 @@
self.log.info('Starting scheduler')
try:
self.sched.start()
- self.sched.registerConnections(self.connections, webapp)
+ self.sched.registerConnections(self.connections)
self.sched.reconfigure(self.config)
self.sched.resume()
except Exception:
diff --git a/zuul/cmd/web.py b/zuul/cmd/web.py
index 78392db..dc4f198 100755
--- a/zuul/cmd/web.py
+++ b/zuul/cmd/web.py
@@ -23,6 +23,7 @@
import zuul.web
from zuul.driver.sql import sqlconnection
+from zuul.driver.github import githubconnection
from zuul.lib.config import get_default
@@ -73,6 +74,11 @@
sql_conn = connections[0]
params['sql_connection'] = sql_conn
+ params['github_connections'] = {}
+ for conn_name, connection in self.connections.connections.items():
+ if isinstance(connection, githubconnection.GithubConnection):
+ params['github_connections'][conn_name] = connection
+
try:
self.web = zuul.web.ZuulWeb(**params)
except Exception as e:
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index 483495d..5115154 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -74,21 +74,3 @@
This lets the user supply a list of change objects that are
still in use. Anything in our cache that isn't in the supplied
list should be safe to remove from the cache."""
-
- def registerWebapp(self, webapp):
- self.webapp = webapp
-
- def registerHttpHandler(self, path, handler):
- """Add connection handler for HTTP URI.
-
- Connection can use builtin HTTP server for listening on incoming event
- requests. The resulting path will be /connection/connection_name/path.
- """
- self.webapp.register_path(self._connectionPath(path), handler)
-
- def unregisterHttpHandler(self, path):
- """Remove the connection handler for HTTP URI."""
- self.webapp.unregister_path(self._connectionPath(path))
-
- def _connectionPath(self, path):
- return '/connection/%s/%s' % (self.connection_name, path)
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index b766c6f..41f93c4 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -21,6 +21,8 @@
import threading
import time
import re
+import json
+import traceback
import cachecontrol
from cachecontrol.cache import DictCache
@@ -28,13 +30,14 @@
import iso8601
import jwt
import requests
-import webob
-import webob.dec
import voluptuous as v
import github3
import github3.exceptions
+import gear
+
from zuul.connection import BaseConnection
+from zuul.lib.config import get_default
from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
@@ -65,71 +68,101 @@
utc = UTC()
-class GithubWebhookListener():
-
- log = logging.getLogger("zuul.GithubWebhookListener")
+class GithubGearmanWorker(object):
+ """A thread that answers gearman requests"""
+ log = logging.getLogger("zuul.GithubGearmanWorker")
def __init__(self, connection):
+ self.config = connection.sched.config
self.connection = connection
+ self.thread = threading.Thread(target=self._run,
+ name='github-gearman-worker')
+ self._running = False
+ handler = "github:%s:payload" % self.connection.connection_name
+ self.jobs = {
+ handler: self.handle_payload,
+ }
- def handle_request(self, path, tenant_name, request):
- if request.method != 'POST':
- self.log.debug("Only POST method is allowed.")
- raise webob.exc.HTTPMethodNotAllowed(
- 'Only POST method is allowed.')
+ def _run(self):
+ while self._running:
+ try:
+ job = self.gearman.getJob()
+ try:
+ if job.name not in self.jobs:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ continue
+ output = self.jobs[job.name](json.loads(job.arguments))
+ job.sendWorkComplete(json.dumps(output))
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ except gear.InterruptedError:
+ pass
+ except Exception:
+ self.log.exception("Exception while getting job")
- delivery = request.headers.get('X-GitHub-Delivery')
+ def handle_payload(self, args):
+ headers = args.get("headers")
+ body = args.get("body")
+
+ delivery = headers.get('X-GitHub-Delivery')
self.log.debug("Github Webhook Received: {delivery}".format(
delivery=delivery))
- self._validate_signature(request)
# TODO(jlk): Validate project in the request is a project we know
try:
- self.__dispatch_event(request)
+ self.__dispatch_event(body, headers)
+ output = {'return_code': 200}
except Exception:
+ output = {'return_code': 503}
self.log.exception("Exception handling Github event:")
- def __dispatch_event(self, request):
+ return output
+
+ def __dispatch_event(self, body, headers):
try:
- event = request.headers['X-Github-Event']
+ event = headers['x-github-event']
self.log.debug("X-Github-Event: " + event)
except KeyError:
self.log.debug("Request headers missing the X-Github-Event.")
- raise webob.exc.HTTPBadRequest('Please specify a X-Github-Event '
- 'header.')
+ raise Exception('Please specify a X-Github-Event header.')
try:
- json_body = request.json_body
- self.connection.addEvent(json_body, event)
+ self.connection.addEvent(body, event)
except Exception:
message = 'Exception deserializing JSON body'
self.log.exception(message)
- raise webob.exc.HTTPBadRequest(message)
+ # TODO(jlk): Raise this as something different?
+ raise Exception(message)
- def _validate_signature(self, request):
- secret = self.connection.connection_config.get('webhook_token', None)
- if secret is None:
- raise RuntimeError("webhook_token is required")
+ def start(self):
+ self._running = True
+ server = self.config.get('gearman', 'server')
+ port = get_default(self.config, 'gearman', 'port', 4730)
+ ssl_key = get_default(self.config, 'gearman', 'ssl_key')
+ ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
+ ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
+ self.gearman = gear.TextWorker('Zuul Github Connector')
+ self.log.debug("Connect to gearman")
+ self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+ self.log.debug("Waiting for server")
+ self.gearman.waitForServer()
+ self.log.debug("Registering")
+ for job in self.jobs:
+ self.gearman.registerFunction(job)
+ self.thread.start()
- body = request.body
- try:
- request_signature = request.headers['X-Hub-Signature']
- except KeyError:
- raise webob.exc.HTTPUnauthorized(
- 'Please specify a X-Hub-Signature header with secret.')
-
- payload_signature = _sign_request(body, secret)
-
- self.log.debug("Payload Signature: {0}".format(str(payload_signature)))
- self.log.debug("Request Signature: {0}".format(str(request_signature)))
- if not hmac.compare_digest(
- str(payload_signature), str(request_signature)):
- raise webob.exc.HTTPUnauthorized(
- 'Request signature does not match calculated payload '
- 'signature. Check that secret is correct.')
-
- return True
+ def stop(self):
+ self._running = False
+ self.gearman.stopWaitingForJobs()
+ # We join here to avoid whitelisting the thread -- if it takes more
+ # than 5s to stop in tests, there's a problem.
+ self.thread.join(timeout=5)
+ self.gearman.shutdown()
class GithubEventConnector(threading.Thread):
@@ -456,15 +489,18 @@
re.MULTILINE | re.IGNORECASE)
def onLoad(self):
- webhook_listener = GithubWebhookListener(self)
- self.registerHttpHandler(self.payload_path,
- webhook_listener.handle_request)
+ self.log.info('Starting GitHub connection: %s' % self.connection_name)
+ self.gearman_worker = GithubGearmanWorker(self)
+ self.log.info('Authing to GitHub')
self._authenticateGithubAPI()
self._prime_installation_map()
+ self.log.info('Starting event connector')
self._start_event_connector()
+ self.log.info('Starting GearmanWorker')
+ self.gearman_worker.start()
def onStop(self):
- self.unregisterHttpHandler(self.payload_path)
+ self.gearman_worker.stop()
self._stop_event_connector()
def _start_event_connector(self):
diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py
index 33c66f9..86e2218 100644
--- a/zuul/lib/connections.py
+++ b/zuul/lib/connections.py
@@ -66,13 +66,6 @@
if load:
connection.onLoad()
- def registerWebapp(self, webapp):
- for driver_name, driver in self.drivers.items():
- if hasattr(driver, 'registerWebapp'):
- driver.registerWebapp(webapp)
- for connection_name, connection in self.connections.items():
- connection.registerWebapp(webapp)
-
def reconfigureDrivers(self, tenant):
for driver in self.drivers.values():
if hasattr(driver, 'reconfigure'):
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a2e3b6e..1aed284 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -292,11 +292,10 @@
except Exception:
self.log.exception("Exception while processing command")
- def registerConnections(self, connections, webapp, load=True):
+ def registerConnections(self, connections, load=True):
# load: whether or not to trigger the onLoad for the connection. This
# is useful for not doing a full load during layout validation.
self.connections = connections
- self.connections.registerWebapp(webapp)
self.connections.registerScheduler(self, load)
def stopConnections(self):
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index a98a6c8..201785d 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -16,6 +16,8 @@
import asyncio
+import hashlib
+import hmac
import json
import logging
import os
@@ -33,6 +35,12 @@
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
+def _sign_request(body, secret):
+ signature = 'sha1=' + hmac.new(
+ secret.encode('utf-8'), body, hashlib.sha1).hexdigest()
+ return signature
+
+
class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")
@@ -150,8 +158,9 @@
# Tenant status cache expiry
cache_expiry = 1
- def __init__(self, rpc):
+ def __init__(self, rpc, github_connections):
self.rpc = rpc
+ self.github_connections = github_connections
self.cache = {}
self.cache_time = {}
self.controllers = {
@@ -159,13 +168,14 @@
'status_get': self.status_get,
'job_list': self.job_list,
'key_get': self.key_get,
+ 'payload_post': self.payload_post,
}
- def tenant_list(self, request):
+ async def tenant_list(self, request):
job = self.rpc.submitJob('zuul:tenant_list', {})
return web.json_response(json.loads(job.data[0]))
- def status_get(self, request):
+ async def status_get(self, request):
tenant = request.match_info["tenant"]
if tenant not in self.cache or \
(time.time() - self.cache_time[tenant]) > self.cache_expiry:
@@ -179,23 +189,82 @@
resp.last_modified = self.cache_time[tenant]
return resp
- def job_list(self, request):
+ async def job_list(self, request):
tenant = request.match_info["tenant"]
job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant})
resp = web.json_response(json.loads(job.data[0]))
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
- def key_get(self, request):
+ async def key_get(self, request):
tenant = request.match_info["tenant"]
project = request.match_info["project"]
job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
'project': project})
return web.Response(body=job.data[0])
+ def _validate_signature(self, body, headers, secret):
+ try:
+ request_signature = headers['x-hub-signature']
+ except KeyError:
+ raise web.HTTPUnauthorized(
+ reason='X-Hub-Signature header missing.')
+
+ payload_signature = _sign_request(body, secret)
+
+ self.log.debug("Payload Signature: {0}".format(str(payload_signature)))
+ self.log.debug("Request Signature: {0}".format(str(request_signature)))
+ if not hmac.compare_digest(
+ str(payload_signature), str(request_signature)):
+ raise web.HTTPUnauthorized(
+ reason=('Request signature does not match calculated payload '
+ 'signature. Check that secret is correct.'))
+
+ return True
+
+ async def github_payload(self, post):
+ connection = post.match_info["connection"]
+ github_connection = self.github_connections.get(connection)
+ token = github_connection.connection_config.get('webhook_token')
+
+ # Note(tobiash): We need to normalize the headers. Otherwise we will
+ # have trouble getting them from the dict afterwards.
+ # e.g.
+ # GitHub: sent: X-GitHub-Event received: X-GitHub-Event
+ # urllib: sent: X-GitHub-Event received: X-Github-Event
+ #
+ # We cannot easily resolve this mismatch because every HTTP library
+ # changes the header casing in its own way. By specification HTTP
+ # headers are case-insensitive, so we lowercase all header names here
+ # and do not have to care about casing later.
+ headers = dict()
+ for key, value in post.headers.items():
+ headers[key.lower()] = value
+ body = await post.read()
+ self._validate_signature(body, headers, token)
+ # We cannot send the raw body through gearman, so we decode it as
+ # utf-8 and parse it as JSON; the parsed payload is sent instead.
+ json_body = json.loads(body.decode('utf-8'))
+ job = self.rpc.submitJob('github:%s:payload' % connection,
+ {'headers': headers, 'body': json_body})
+ jobdata = json.loads(job.data[0])
+ return web.json_response(jobdata, status=jobdata['return_code'])
+
+ async def payload_post(self, post):
+ # Allow for other drivers to also accept a payload in the future,
+ # instead of hardcoding this to GitHub
+ driver = post.match_info["driver"]
+ try:
+ method = getattr(self, driver + '_payload')
+ except AttributeError as e:
+ self.log.exception("Unknown driver error:")
+ raise web.HTTPNotFound
+
+ return await method(post)
+
async def processRequest(self, request, action):
try:
- resp = self.controllers[action](request)
+ resp = await self.controllers[action](request)
except asyncio.CancelledError:
self.log.debug("request handling cancelled")
except Exception as e:
@@ -300,7 +369,8 @@
gear_server, gear_port,
ssl_key=None, ssl_cert=None, ssl_ca=None,
static_cache_expiry=3600,
- sql_connection=None):
+ sql_connection=None,
+ github_connections={}):
self.listen_address = listen_address
self.listen_port = listen_port
self.event_loop = None
@@ -311,7 +381,7 @@
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca)
self.log_streaming_handler = LogStreamingHandler(self.rpc)
- self.gearman_handler = GearmanHandler(self.rpc)
+ self.gearman_handler = GearmanHandler(self.rpc, github_connections)
if sql_connection:
self.sql_handler = SqlHandler(sql_connection)
else:
@@ -337,6 +407,10 @@
async def _handleKeyRequest(self, request):
return await self.gearman_handler.processRequest(request, 'key_get')
+ async def _handlePayloadPost(self, post):
+ return await self.gearman_handler.processRequest(post,
+ 'payload_post')
+
async def _handleStaticRequest(self, request):
fp = None
if request.path.endswith("tenants.html") or request.path.endswith("/"):
@@ -376,6 +450,8 @@
('GET', '/{tenant}/jobs.html', self._handleStaticRequest),
('GET', '/{tenant}/stream.html', self._handleStaticRequest),
('GET', '/tenants.html', self._handleStaticRequest),
+ ('POST', '/driver/{driver}/{connection}/payload',
+ self._handlePayloadPost),
('GET', '/', self._handleStaticRequest),
]