Add functional tests.

Add support for testing, and some initial functional tests.

Some threads in Zuul now have proper shutdown methods, since
that's needed for testing.  The threads that are faked in the
testing infrastructure still lack shutdown methods.

Fix some bugs uncovered by tests:

Reverse the order of commits that are given to Jenkins (the
commits that preceded the one being tested were listed in
revers order, this corrects that).

Return 0 in reportChange if the change has already been
reported.  The old return value of True was misinterpreted
as an error.

Change-Id: I4979d0efd8581061a2b856b892d4ecdb75284a1b
Reviewed-on: https://review.openstack.org/10572
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
new file mode 100644
index 0000000..e01ebfd
--- /dev/null
+++ b/tests/fixtures/layout.yaml
@@ -0,0 +1,47 @@
+queues:
+  - name: check
+    manager: IndependentQueueManager
+    trigger:
+      - event: patchset-uploaded
+    success:
+      verified: 1
+    failure:
+      verified: -1
+
+  - name: post
+    manager: IndependentQueueManager
+    trigger:
+      - event: ref-updated
+        ref: ^(?!refs/).*$
+
+  - name: gate
+    manager: DependentQueueManager
+    trigger:
+      - event: comment-added
+        approval: 
+          - approved: 1
+    success:
+      verified: 2
+      submit: true
+    failure:
+      verified: -2
+    start:
+      verified: 0
+
+jobs:
+  - name: ^.*-merge$
+    failure-message: Unable to merge change
+    hold-following-changes: true
+
+projects:
+  - name: org/project
+    check:
+      - project-merge:
+        - project-test1
+        - project-test2
+    gate:
+      - project-merge:
+        - project-test1
+        - project-test2
+    post:
+      - project-post
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
new file mode 100644
index 0000000..b66b489
--- /dev/null
+++ b/tests/fixtures/zuul.conf
@@ -0,0 +1,12 @@
+[jenkins]
+server=https://jenkins.example.com
+user=jenkins
+apikey=1234
+
+[gerrit]
+server=review.example.com
+user=jenkins
+sshkey=none
+
+[zuul]
+layout_config=layout.yaml
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
new file mode 100644
index 0000000..3d9355f
--- /dev/null
+++ b/tests/test_scheduler.py
@@ -0,0 +1,466 @@
+#!/usr/bin/env python
+
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import ConfigParser
+import os
+import Queue
+import logging
+import json
+import threading
+import time
+import pprint
+import re
+
+import zuul
+import zuul.scheduler
+import zuul.launcher.jenkins
+import zuul.trigger.gerrit
+
+FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
+                           'fixtures')
+CONFIG = ConfigParser.ConfigParser()
+CONFIG.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
+
+CONFIG.set('zuul', 'layout_config',
+           os.path.join(FIXTURE_DIR, "layout.yaml"))
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+class FakeChange(object):
+    categories = {'APRV': 'Approved',
+                  'CRVW': 'Code-Review',
+                  'VRFY': 'Verified'}
+
+    def __init__(self, number, project, branch, subject, status='NEW'):
+        self.patchsets = []
+        self.submit_records = []
+        self.number = number
+        self.project = project
+        self.branch = branch
+        self.subject = subject
+        self.latest_patchset = 0
+        self.data = {
+            'branch': branch,
+            'comments': [],
+            'commitMessage': subject,
+            'createdOn': time.time(),
+            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
+            'lastUpdated': time.time(),
+            'number': str(number),
+            'open': True,
+            'owner': {'email': 'user@example.com',
+                      'name': 'User Name',
+                      'username': 'username'},
+            'patchSets': self.patchsets,
+            'project': project,
+            'status': status,
+            'subject': subject,
+            'submitRecords': self.submit_records,
+            'url': 'https://hostname/%s' % number}
+
+        self.addPatchset()
+
+    def addPatchset(self, files=None):
+        self.latest_patchset += 1
+        d = {'approvals': [],
+             'createdOn': time.time(),
+             'files': [{'file': '/COMMIT_MSG',
+                        'type': 'ADDED'},
+                       {'file': 'README',
+                        'type': 'MODIFIED'}],
+             'number': self.latest_patchset,
+             'ref': 'refs/changes/1/%s/%s' % (self.number,
+                                              self.latest_patchset),
+             'revision':
+                 'aa69c46accf97d0598111724a38250ae76a22c87',
+             'uploader': {'email': 'user@example.com',
+                          'name': 'User name',
+                          'username': 'user'}}
+        self.data['currentPatchSet'] = d
+        self.patchsets.append(d)
+
+    def addApproval(self, category, value):
+        event = {'approvals': [{'description': self.categories[category],
+                                'type': category,
+                                'value': str(value)}],
+                 'author': {'email': 'user@example.com',
+                            'name': 'User Name',
+                            'username': 'username'},
+                 'change': {'branch': self.branch,
+                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
+                            'number': str(self.number),
+                            'owner': {'email': 'user@example.com',
+                                      'name': 'User Name',
+                                      'username': 'username'},
+                            'project': self.project,
+                            'subject': self.subject,
+                            'topic': 'master',
+                            'url': 'https://hostname/459'},
+                 'comment': '',
+                 'patchSet': self.patchsets[-1],
+                 'type': 'comment-added'}
+        return json.loads(json.dumps(event))
+
+    def query(self):
+        return json.loads(json.dumps(self.data))
+
+    def setMerged(self):
+        self.data['status'] = 'MERGED'
+        self.open = False
+
+
+class FakeGerrit(object):
+    def __init__(self, *args, **kw):
+        self.event_queue = Queue.Queue()
+        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
+        self.change_number = 0
+        self.changes = {}
+
+    def addFakeChange(self, project, branch, subject):
+        self.change_number += 1
+        c = FakeChange(self.change_number, project, branch, subject)
+        self.changes[self.change_number] = c
+        return c
+
+    def addEvent(self, data):
+        return self.event_queue.put(data)
+
+    def getEvent(self):
+        return self.event_queue.get()
+
+    def eventDone(self):
+        self.event_queue.task_done()
+
+    def review(self, project, changeid, message, action):
+        if 'submit' in action:
+            number, ps = changeid.split(',')
+            change = self.changes[int(number)]
+            change.setMerged()
+
+    def query(self, number):
+        change = self.changes[int(number)]
+        return change.query()
+
+    def startWatching(self, *args, **kw):
+        pass
+
+
+class FakeJenkinsEvent(object):
+    def __init__(self, name, number, parameters, phase, status=None):
+        data = {'build':
+                     {'full_url': 'https://server/job/%s/%s/' % (name, number),
+                      'number': number,
+                      'parameters': parameters,
+                      'phase': phase,
+                      'url': 'job/%s/%s/' % (name, number)},
+                     'name': name,
+                     'url': 'job/%s/' % name}
+        if status:
+            data['build']['status'] = status
+        self.body = json.dumps(data)
+
+
+class FakeJenkinsJob(threading.Thread):
+    log = logging.getLogger("zuul.test")
+
+    def __init__(self, jenkins, callback, name, number, parameters):
+        threading.Thread.__init__(self)
+        self.jenkins = jenkins
+        self.callback = callback
+        self.name = name
+        self.number = number
+        self.parameters = parameters
+        self.wait_condition = threading.Condition()
+        self.waiting = False
+
+    def release(self):
+        self.wait_condition.acquire()
+        self.wait_condition.notify()
+        self.waiting = False
+        self.log.debug("Job %s released" % (self.parameters['UUID']))
+        self.wait_condition.release()
+
+    def isWaiting(self):
+        self.wait_condition.acquire()
+        if self.waiting:
+            ret = True
+        else:
+            ret = False
+        self.wait_condition.release()
+        return ret
+
+    def _wait(self):
+        self.wait_condition.acquire()
+        self.waiting = True
+        self.log.debug("Job %s waiting" % (self.parameters['UUID']))
+        self.wait_condition.wait()
+        self.wait_condition.release()
+
+    def run(self):
+        self.jenkins.fakeEnqueue(self)
+        if self.jenkins.hold_jobs_in_queue:
+            self._wait()
+        self.jenkins.fakeDequeue(self)
+        self.callback.jenkins_endpoint(FakeJenkinsEvent(
+                self.name, self.number, self.parameters,
+                'STARTED'))
+        if self.jenkins.hold_jobs_in_build:
+            self._wait()
+        self.log.debug("Job %s continuing" % (self.parameters['UUID']))
+        self.jenkins.fakeAddHistory(name=self.name, number=self.number,
+                                    result='SUCCESS')
+        self.callback.jenkins_endpoint(FakeJenkinsEvent(
+                self.name, self.number, self.parameters,
+                'COMPLETED', 'SUCCESS'))
+        self.callback.jenkins_endpoint(FakeJenkinsEvent(
+                self.name, self.number, self.parameters,
+                'FINISHED', 'SUCCESS'))
+        self.jenkins.all_jobs.remove(self)
+
+
+class FakeJenkins(object):
+    log = logging.getLogger("zuul.test")
+
+    def __init__(self, *args, **kw):
+        self.queue = []
+        self.all_jobs = []
+        self.job_counter = {}
+        self.job_history = []
+        self.hold_jobs_in_queue = False
+        self.hold_jobs_in_build = False
+
+    def fakeEnqueue(self, job):
+        self.queue.append(job)
+
+    def fakeDequeue(self, job):
+        self.queue.remove(job)
+
+    def fakeAddHistory(self, **kw):
+        self.job_history.append(kw)
+
+    def fakeRelease(self, regex=None):
+        all_jobs = self.all_jobs[:]
+        self.log.debug("releasing jobs %s (%s)" % (regex, len(self.all_jobs)))
+        for job in all_jobs:
+            if not regex or re.match(regex, job.name):
+                self.log.debug("releasing job %s" % (job.parameters['UUID']))
+                job.release()
+            else:
+                self.log.debug("not releasing job %s" % (
+                        job.parameters['UUID']))
+        self.log.debug("done releasing jobs %s (%s)" % (regex,
+                                                        len(self.all_jobs)))
+
+    def fakeAllWaiting(self, regex=None):
+        all_jobs = self.all_jobs[:]
+        for job in all_jobs:
+            self.log.debug("job %s %s" % (job.parameters['UUID'],
+                                          job.isWaiting()))
+            if not job.isWaiting():
+                return False
+        return True
+
+    def build_job(self, name, parameters):
+        count = self.job_counter.get(name, 0)
+        count += 1
+        self.job_counter[name] = count
+        job = FakeJenkinsJob(self, self.callback, name, count, parameters)
+        self.all_jobs.append(job)
+        job.start()
+
+    def set_build_description(self, *args, **kw):
+        pass
+
+
+class FakeJenkinsCallback(zuul.launcher.jenkins.JenkinsCallback):
+    def start(self):
+        pass
+
+
+class testScheduler(unittest.TestCase):
+    log = logging.getLogger("zuul.test")
+
+    def setUp(self):
+        self.config = CONFIG
+        self.sched = zuul.scheduler.Scheduler()
+
+        def jenkinsFactory(*args, **kw):
+            self.fake_jenkins = FakeJenkins()
+            return self.fake_jenkins
+
+        def jenkinsCallbackFactory(*args, **kw):
+            self.fake_jenkins_callback = FakeJenkinsCallback(*args, **kw)
+            return self.fake_jenkins_callback
+
+        zuul.launcher.jenkins.ExtendedJenkins = jenkinsFactory
+        zuul.launcher.jenkins.JenkinsCallback = jenkinsCallbackFactory
+        self.jenkins = zuul.launcher.jenkins.Jenkins(self.config, self.sched)
+        self.fake_jenkins.callback = self.fake_jenkins_callback
+
+        zuul.lib.gerrit.Gerrit = FakeGerrit
+
+        self.gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
+        self.fake_gerrit = self.gerrit.gerrit
+
+        self.sched.setLauncher(self.jenkins)
+        self.sched.setTrigger(self.gerrit)
+
+        self.sched.start()
+        self.sched.reconfigure(self.config)
+        self.sched.resume()
+
+    def tearDown(self):
+        self.jenkins.stop()
+        self.gerrit.stop()
+        self.sched.stop()
+        self.sched.join()
+
+    def waitUntilSettled(self):
+        self.log.debug("Waiting until settled...")
+        start = time.time()
+        while True:
+            if time.time() - start > 10:
+                print 'queue status:',
+                print self.sched.trigger_event_queue.empty(),
+                print self.sched.result_event_queue.empty(),
+                print self.fake_gerrit.event_queue.empty(),
+                raise Exception("Timeout waiting for Zuul to settle")
+            self.fake_gerrit.event_queue.join()
+            self.sched.queue_lock.acquire()
+            if (self.sched.trigger_event_queue.empty() and
+                self.sched.result_event_queue.empty() and
+                self.fake_gerrit.event_queue.empty() and
+                self.fake_jenkins.fakeAllWaiting()):
+                self.sched.queue_lock.release()
+                self.log.debug("...settled.")
+                return
+            self.sched.queue_lock.release()
+            self.sched.wake_event.wait(0.1)
+
+    def test_jobs_launched(self):
+        "Test that jobs are launched and a change is merged"
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        jobs = self.fake_jenkins.job_history
+        job_names = [x['name'] for x in jobs]
+        assert 'project-merge' in job_names
+        assert 'project-test1' in job_names
+        assert 'project-test2' in job_names
+        assert jobs[0]['result'] == 'SUCCESS'
+        assert jobs[1]['result'] == 'SUCCESS'
+        assert jobs[2]['result'] == 'SUCCESS'
+        assert A.data['status'] == 'MERGED'
+
+    def test_parallel_changes(self):
+        "Test that changes are tested in parallel and merged in series"
+        self.fake_jenkins.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+
+        self.waitUntilSettled()
+        jobs = self.fake_jenkins.all_jobs
+        assert len(jobs) == 1
+        assert jobs[0].name == 'project-merge'
+        assert (jobs[0].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1')
+
+        self.fake_jenkins.fakeRelease('.*-merge')
+        self.waitUntilSettled()
+        assert len(jobs) == 3
+        assert jobs[0].name == 'project-test1'
+        assert (jobs[0].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1')
+        assert jobs[1].name == 'project-test2'
+        assert (jobs[1].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1')
+        assert jobs[2].name == 'project-merge'
+        assert (jobs[2].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1^'
+                'org/project:master:refs/changes/1/2/1')
+
+        self.fake_jenkins.fakeRelease('.*-merge')
+        self.waitUntilSettled()
+        assert len(jobs) == 5
+        assert jobs[0].name == 'project-test1'
+        assert (jobs[0].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1')
+        assert jobs[1].name == 'project-test2'
+        assert (jobs[1].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1')
+
+        assert jobs[2].name == 'project-test1'
+        assert (jobs[2].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1^'
+                'org/project:master:refs/changes/1/2/1')
+        assert jobs[3].name == 'project-test2'
+        assert (jobs[3].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1^'
+                'org/project:master:refs/changes/1/2/1')
+
+        assert jobs[4].name == 'project-merge'
+        assert (jobs[4].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1^'
+                'org/project:master:refs/changes/1/2/1^'
+                'org/project:master:refs/changes/1/3/1')
+
+        self.fake_jenkins.fakeRelease('.*-merge')
+        self.waitUntilSettled()
+        assert len(jobs) == 6
+        assert jobs[0].name == 'project-test1'
+        assert (jobs[0].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1')
+        assert jobs[1].name == 'project-test2'
+        assert (jobs[1].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1')
+
+        assert jobs[2].name == 'project-test1'
+        assert (jobs[2].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1^'
+                'org/project:master:refs/changes/1/2/1')
+        assert jobs[3].name == 'project-test2'
+        assert (jobs[3].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1^'
+                'org/project:master:refs/changes/1/2/1')
+
+        assert jobs[4].name == 'project-test1'
+        assert (jobs[4].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1^'
+                'org/project:master:refs/changes/1/2/1^'
+                'org/project:master:refs/changes/1/3/1')
+        assert jobs[5].name == 'project-test2'
+        assert (jobs[5].parameters['GERRIT_CHANGES'] ==
+                'org/project:master:refs/changes/1/1/1^'
+                'org/project:master:refs/changes/1/2/1^'
+                'org/project:master:refs/changes/1/3/1')
+
+        self.fake_jenkins.hold_jobs_in_build = False
+        self.fake_jenkins.fakeRelease()
+        self.waitUntilSettled()
+        assert len(jobs) == 0
+
+        jobs = self.fake_jenkins.job_history
+        assert len(jobs) == 9
+        assert A.data['status'] == 'MERGED'
+        assert B.data['status'] == 'MERGED'
+        assert C.data['status'] == 'MERGED'
diff --git a/tools/pip-requires b/tools/pip-requires
new file mode 100644
index 0000000..9dcc275
--- /dev/null
+++ b/tools/pip-requires
@@ -0,0 +1,5 @@
+PyYAML
+python-jenkins
+Paste
+webob
+paramiko
diff --git a/tools/test-requires b/tools/test-requires
new file mode 100644
index 0000000..f3c7e8e
--- /dev/null
+++ b/tools/test-requires
@@ -0,0 +1 @@
+nose
diff --git a/tox.ini b/tox.ini
index 39d864d..d4a0074 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,10 @@
 [tox]
-envlist = pep8, pyflakes
+envlist = pep8, pyflakes, py27
+
+[testenv]
+deps = -r{toxinidir}/tools/pip-requires
+       -r{toxinidir}/tools/test-requires
+commands = nosetests {posargs}
 
 [tox:jenkins]
 downloadcache = ~/cache/pip
diff --git a/zuul/launcher/jenkins.py b/zuul/launcher/jenkins.py
index 1b11c5d..805657e 100644
--- a/zuul/launcher/jenkins.py
+++ b/zuul/launcher/jenkins.py
@@ -91,10 +91,18 @@
     def __init__(self, jenkins):
         threading.Thread.__init__(self)
         self.jenkins = jenkins
+        self.wake_event = threading.Event()
+        self._stopped = False
+
+    def stop(self):
+        self._stopped = True
+        self.wake_event.set()
 
     def run(self):
         while True:
-            time.sleep(180)
+            self.wake_event.wait(180)
+            if self._stopped:
+                return
             try:
                 self.jenkins.lookForLostBuilds()
             except:
@@ -191,9 +199,15 @@
         self.cleanup_thread = JenkinsCleanup(self)
         self.cleanup_thread.start()
 
+    def stop(self):
+        self.cleanup_thread.stop()
+        self.cleanup_thread.join()
+
     def launch(self, job, change, dependent_changes=[]):
         self.log.info("Launch job %s for change %s with dependent changes %s" %
                       (job, change, dependent_changes))
+        dependent_changes = dependent_changes[:]
+        dependent_changes.reverse()
         uuid = str(uuid1())
         params = dict(UUID=uuid,
                       GERRIT_PROJECT=change.project.name)
diff --git a/zuul/lib/gerrit.py b/zuul/lib/gerrit.py
index fd5ebe4..ca626f0 100644
--- a/zuul/lib/gerrit.py
+++ b/zuul/lib/gerrit.py
@@ -108,6 +108,9 @@
     def getEvent(self):
         return self.event_queue.get()
 
+    def eventDone(self):
+        self.event_queue.task_done()
+
     def review(self, project, change, message, action={}):
         cmd = 'gerrit review --project %s' % project
         if message:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e42ae26..ff79d4f 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,9 +30,11 @@
         threading.Thread.__init__(self)
         self.wake_event = threading.Event()
         self.reconfigure_complete_event = threading.Event()
+        self.queue_lock = threading.Lock()
         self._pause = False
         self._reconfigure = False
         self._exit = False
+        self._stopped = False
         self.launcher = None
         self.trigger = None
 
@@ -46,6 +48,10 @@
         self.projects = {}
         self.metajobs = {}
 
+    def stop(self):
+        self._stopped = True
+        self.wake_event.set()
+
     def _parseConfig(self, config_path):
         def toList(item):
             if not item:
@@ -170,17 +176,23 @@
 
     def addEvent(self, event):
         self.log.debug("Adding trigger event: %s" % event)
+        self.queue_lock.acquire()
         self.trigger_event_queue.put(event)
+        self.queue_lock.release()
         self.wake_event.set()
 
     def onBuildStarted(self, build):
         self.log.debug("Adding start event for build: %s" % build)
+        self.queue_lock.acquire()
         self.result_event_queue.put(('started', build))
+        self.queue_lock.release()
         self.wake_event.set()
 
     def onBuildCompleted(self, build):
         self.log.debug("Adding complete event for build: %s" % build)
+        self.queue_lock.acquire()
         self.result_event_queue.put(('completed', build))
+        self.queue_lock.release()
         self.wake_event.set()
 
     def reconfigure(self, config):
@@ -278,7 +290,10 @@
             self.log.debug("Run handler sleeping")
             self.wake_event.wait()
             self.wake_event.clear()
+            if self._stopped:
+                return
             self.log.debug("Run handler awake")
+            self.queue_lock.acquire()
             try:
                 if not self._pause:
                     if not self.trigger_event_queue.empty():
@@ -299,6 +314,7 @@
                         self.wake_event.set()
             except:
                 self.log.exception("Exception in run handler:")
+            self.queue_lock.release()
 
     def process_event_queue(self):
         self.log.debug("Fetching trigger event")
@@ -511,7 +527,7 @@
     def reportChange(self, change):
         self.log.debug("Reporting change %s" % change)
         if change.reported:
-            return True
+            return 0
         ret = None
         if change.didAllJobsSucceed():
             action = self.success_action
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 9e57e56..2c4574f 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -27,9 +27,16 @@
         super(GerritEventConnector, self).__init__()
         self.gerrit = gerrit
         self.sched = sched
+        self._stopped = False
+
+    def stop(self):
+        self._stopped = True
+        self.gerrit.addEvent(None)
 
     def _handleEvent(self):
         data = self.gerrit.getEvent()
+        if self._stopped:
+            return
         event = TriggerEvent()
         event.type = data.get('type')
         change = data.get('change')
@@ -51,9 +58,12 @@
             event.oldrev = refupdate.get('oldRev')
             event.newrev = refupdate.get('newRev')
         self.sched.addEvent(event)
+        self.gerrit.eventDone()
 
     def run(self):
         while True:
+            if self._stopped:
+                return
             try:
                 self._handleEvent()
             except:
@@ -81,6 +91,10 @@
             self.gerrit, sched)
         self.gerrit_connector.start()
 
+    def stop(self):
+        self.gerrit_connector.stop()
+        self.gerrit_connector.join()
+
     def report(self, change, message, action):
         self.log.debug("Report change %s, action %s, message: %s" %
                        (change, action, message))