Merge "On null changes serialize the id as null"
diff --git a/NEWS.rst b/NEWS.rst
index f6500f0..c4901a8 100644
--- a/NEWS.rst
+++ b/NEWS.rst
@@ -25,15 +25,14 @@
   triggers later).  See the sample layout.yaml and Zuul section of the
   documentation.
 
-* The default behavior is now to immediately dequeue changes that have
-  merge conflicts, even those not at the head of the queue.  To enable
-  the old behavior (which would wait until the conflicting change was
-  at the head before dequeuing it), see the new "dequeue-on-conflict"
-  option.
-
 * Some statsd keys have changed in a backwards incompatible way:
   * The counters and timers of the form zuul.job.{name} is now split
     into several keys of the form:
     zuul.pipeline.{pipeline-name}.job.{job-name}.{result}
   * Job names in statsd keys now have the '_' character substituted
     for the '.' character.
+
+* The layout.yaml structure has changed to introduce configurable
+  reporters. This requires restructuring the start/success/failure
+  actions to include a dictionary of reporters and their parameters.
+  See reporters in the docs and layout.yaml-sample.
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 039cffa..5a0c7b9 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -21,6 +21,7 @@
    gating
    triggers
    launchers
+   reporters
    zuul
 
 Indices and tables
diff --git a/doc/source/reporters.rst b/doc/source/reporters.rst
new file mode 100644
index 0000000..18d35a1
--- /dev/null
+++ b/doc/source/reporters.rst
@@ -0,0 +1,59 @@
+:title: Reporters
+
+Reporters
+=========
+
+Zuul can communicate results and progress back to configurable
+protocols. For example, after succeeding in a build a pipeline can be
+configured to post a positive review back to gerrit.
+
+A report can be sent at three stages of a pipeline: start, success
+or failure. Each stage can be configured with multiple reporters.
+For example, you can set verified on Gerrit and also send an email.
+
+Gerrit
+------
+
+Zuul works with standard versions of Gerrit by invoking the
+``gerrit`` command over an SSH connection.  It reports back to
+Gerrit using SSH.
+
+The dictionary passed to the gerrit reporter is used for ``gerrit
+review`` arguments, with the boolean value of ``true`` simply
+indicating that the argument should be present without following it
+with a value. For example, ``verified: 1`` becomes ``gerrit review
+--verified 1`` and ``submit: true`` becomes ``gerrit review
+--submit``.
+
+Gerrit Configuration
+~~~~~~~~~~~~~~~~~~~~
+
+The configuration for posting back to gerrit is shared with the gerrit
+trigger in zuul.conf as described in :ref:`zuulconf`.
+
+SMTP
+----
+
+A simple email reporter is also available.
+
+SMTP Configuration
+~~~~~~~~~~~~~~~~~~
+
+zuul.conf contains the smtp server and default to/from as described
+in :ref:`zuulconf`.
+
+Each pipeline can override the to or from address by providing
+alternatives as arguments to the reporter. For example, ::
+
+  pipelines:
+    - name: post-merge
+      manager: IndependentPipelineManager
+      trigger:
+        - event: change-merged
+      success:
+        smtp:
+          to: you@example.com
+      failure:
+        smtp:
+          to: you@example.com
+          from: alternative@example.com
diff --git a/doc/source/triggers.rst b/doc/source/triggers.rst
index 0bd07de..17c93ab 100644
--- a/doc/source/triggers.rst
+++ b/doc/source/triggers.rst
@@ -69,6 +69,7 @@
             create = group CI Tools
             push = +force CI Tools
             pushMerge = group CI Tools
+            forgeAuthor = group CI Tools
     [access "refs/for/refs/zuul/*"]
             pushMerge = group CI Tools
 
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 73ebf71..6adfa30 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -139,6 +139,23 @@
   is included).  Defaults to ``false``.
   ``job_name_in_report=true``
 
+smtp
+""""
+
+**server**
+  SMTP server hostname or address to use.
+  ``server=localhost``
+
+**default_from**
+  Who the email should appear to be sent from when emailing the report.
+  This can be overridden by individual pipelines.
+  ``default_from=zuul@example.com``
+
+**default_to**
+  Who the report should be emailed to by default.
+  This can be overridden by individual pipelines.
+  ``default_to=you@example.com``
+
 layout.yaml
 ~~~~~~~~~~~
 
@@ -329,25 +346,15 @@
   well.  To suppress this behavior (and allow jobs to continue
   running), set this to ``false``.  Default: ``true``.
 
-**dequeue-on-conflict**
-  Normally, if there is a merge conflict or similar error with a
-  change, Zuul will immediately remove it from the queue, even if the
-  error is only due to a change that happened to be enqueued ahead of
-  it.  If you would like to keep the change in the queue until it is
-  at the head to be certain that the merge conflict is intrinsic to
-  the change, set this to ``false``.  Default: ``true``.
-
 **success**
-  Describes what Zuul should do if all the jobs complete successfully.
+  Describes where Zuul should report to if all the jobs complete
+  successfully.
   This section is optional; if it is omitted, Zuul will run jobs and
   do nothing on success; it will not even report a message to Gerrit.
-  If the section is present, it will leave a message on the Gerrit
-  review.  Each additional argument is assumed to be an argument to
-  ``gerrit review``, with the boolean value of ``true`` simply
-  indicating that the argument should be present without following it
-  with a value.  For example, ``verified: 1`` becomes ``gerrit
-  review --verified 1`` and ``submit: true`` becomes ``gerrit review
-  --submit``.
+  If the section is present, the listed reporter plugins will be
+  asked to report on the jobs.
+  Each reporter's value dictionary is handled by the reporter. See
+  reporters for more details.
 
 **failure**
   Uses the same syntax as **success**, but describes what Zuul should
@@ -373,9 +380,11 @@
     trigger:
       - event: patchset-created
     success:
-      verified: 1
+      gerrit:
+        verified: 1
     failure:
-      verified: -1
+      gerrit:
+        verified: -1
 
 This will trigger jobs each time a new patchset (or change) is
 uploaded to Gerrit, and report +/-1 values to Gerrit in the
@@ -388,10 +397,12 @@
         approval:
           - approved: 1
     success:
-      verified: 2
-      submit: true
+      gerrit:
+        verified: 2
+        submit: true
     failure:
-      verified: -2
+      gerrit:
+        verified: -2
 
 This will trigger jobs whenever a reviewer leaves a vote of ``1`` in the
 ``approved`` review category in Gerrit (a non-standard category).
@@ -425,9 +436,11 @@
       trigger:
         - event: change-merged
       success:
-        force-message: True
+        gerrit:
+          force-message: True
       failure:
-        force-message: True
+        gerrit:
+          force-message: True
 
 The ``change-merged`` events happen when a change has been merged in the git
 repository. The change is thus closed and Gerrit will not accept modifications
@@ -558,6 +571,22 @@
 **name**
   The name of the project (as known by Gerrit).
 
+**merge-mode (optional)**
+  An optional value that indicates what strategy should be used to
+  merge changes to this project.  Supported values are:
+
+  **merge-resolve**
+    Equivalent to 'git merge -s resolve'.  This corresponds closely to
+    what Gerrit performs (using JGit) for a project if the "Merge if
+    necessary" merge mode is selected and "Automatically resolve
+    conflicts" is checked.  This is the default.
+
+  **merge**
+    Equivalent to 'git merge'.
+
+  **cherry-pick**
+    Equivalent to 'git cherry-pick'.
+
 This is followed by a section for each of the pipelines defined above.
 Pipelines may be omitted if no jobs should run for this project in a
 given pipeline.  Within the pipeline section, the jobs that should be
diff --git a/etc/layout.yaml-sample b/etc/layout.yaml-sample
index b49f5d5..30a3352 100644
--- a/etc/layout.yaml-sample
+++ b/etc/layout.yaml-sample
@@ -5,9 +5,11 @@
       gerrit:
         - event: patchset-created
     success:
-      verified: 1
+      gerrit:
+        verified: 1
     failure:
-      verified: -1
+      gerrit:
+        verified: -1
 
   - name: tests
     manager: IndependentPipelineManager
@@ -16,9 +18,11 @@
         - event: patchset-created
           email_filter: ^.*@example.org$
     success:
-      verified: 1
+      gerrit:
+        verified: 1
     failure:
-      verified: -1
+      gerrit:
+        verified: -1
 
   - name: post
     manager: IndependentPipelineManager
@@ -35,12 +39,14 @@
           approval:
             - approved: 1
     start:
-      verified: 0
+      gerrit:
+        verified: 0
     success:
-      verified: 2
-      submit: true
+      gerrit:
+        verified: 1
     failure:
-      verified: -2
+      gerrit:
+        verified: -1
 
 jobs:
   - name: ^.*-merge$
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index edf1044..c193727 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -19,3 +19,9 @@
 ;git_user_email=zuul@example.com
 ;git_user_name=zuul
 status_url=https://jenkins.example.com/zuul/status
+
+[smtp]
+server=localhost
+port=25
+default_from=zuul@example.com
+default_to=you@example.com
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 9760a79..ddee844 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,9 +9,10 @@
 paramiko
 GitPython>=0.3.2.RC1
 lockfile
+ordereddict
 python-daemon
 extras
 statsd>=1.0.0,<3.0
 voluptuous>=0.6,<0.7
-gear>=0.3.1,<0.4.0
+gear>=0.4.0,<1.0.0
 apscheduler>=2.1.1,<3.0
diff --git a/tests/fixtures/layout-delayed-repo-init.yaml b/tests/fixtures/layout-delayed-repo-init.yaml
index e0613f1..6caf622 100644
--- a/tests/fixtures/layout-delayed-repo-init.yaml
+++ b/tests/fixtures/layout-delayed-repo-init.yaml
@@ -5,9 +5,11 @@
       gerrit:
         - event: patchset-created
     success:
-      verified: 1
+      gerrit:
+        verified: 1
     failure:
-      verified: -1
+      gerrit:
+        verified: -1
 
   - name: post
     manager: IndependentPipelineManager
@@ -25,12 +27,15 @@
           approval:
             - approved: 1
     success:
-      verified: 2
-      submit: true
+      gerrit:
+        verified: 2
+        submit: true
     failure:
-      verified: -2
+      gerrit:
+        verified: -2
     start:
-      verified: 0
+      gerrit:
+        verified: 0
     precedence: high
 
 projects:
diff --git a/tests/fixtures/layout-live-reconfiguration-functions.yaml b/tests/fixtures/layout-live-reconfiguration-functions.yaml
index f477a12..e261a88 100644
--- a/tests/fixtures/layout-live-reconfiguration-functions.yaml
+++ b/tests/fixtures/layout-live-reconfiguration-functions.yaml
@@ -11,12 +11,15 @@
           approval:
             - approved: 1
     success:
-      verified: 2
-      submit: true
+      gerrit:
+        verified: 2
+        submit: true
     failure:
-      verified: -2
+      gerrit:
+        verified: -2
     start:
-      verified: 0
+      gerrit:
+        verified: 0
     precedence: high
 
 jobs:
diff --git a/tests/fixtures/layout-smtp.yaml b/tests/fixtures/layout-smtp.yaml
new file mode 100644
index 0000000..813857b
--- /dev/null
+++ b/tests/fixtures/layout-smtp.yaml
@@ -0,0 +1,25 @@
+pipelines:
+  - name: check
+    manager: IndependentPipelineManager
+    trigger:
+      gerrit:
+        - event: patchset-created
+    start:
+      smtp:
+        to: you@example.com
+    success:
+      gerrit:
+        verified: 1
+      smtp:
+        to: alternative_me@example.com
+        from: zuul_from@example.com
+    failure:
+      gerrit:
+        verified: -1
+
+projects:
+  - name: org/project
+    check:
+      - project-merge:
+        - project-test1
+        - project-test2
diff --git a/tests/fixtures/layout-timer.yaml b/tests/fixtures/layout-timer.yaml
index e24fb13..9e0f66b 100644
--- a/tests/fixtures/layout-timer.yaml
+++ b/tests/fixtures/layout-timer.yaml
@@ -5,9 +5,11 @@
       gerrit:
         - event: patchset-created
     success:
-      verified: 1
+      gerrit:
+        verified: 1
     failure:
-      verified: -1
+      gerrit:
+        verified: -1
 
   - name: periodic
     manager: IndependentPipelineManager
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 675d351..dc659fb 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -8,9 +8,11 @@
       gerrit:
         - event: patchset-created
     success:
-      verified: 1
+      gerrit:
+        verified: 1
     failure:
-      verified: -1
+      gerrit:
+        verified: -1
 
   - name: post
     manager: IndependentPipelineManager
@@ -28,12 +30,15 @@
           approval:
             - approved: 1
     success:
-      verified: 2
-      submit: true
+      gerrit:
+        verified: 2
+        submit: true
     failure:
-      verified: -2
+      gerrit:
+        verified: -2
     start:
-      verified: 0
+      gerrit:
+        verified: 0
     precedence: high
 
   - name: unused
@@ -51,9 +56,11 @@
       gerrit:
         - event: change-restored
     success:
-      verified: 1
+      gerrit:
+        verified: 1
     failure:
-      verified: -1
+      gerrit:
+        verified: -1
 
   - name: dup2
     manager: IndependentPipelineManager
@@ -61,12 +68,13 @@
       gerrit:
         - event: change-restored
     success:
-      verified: 1
+      gerrit:
+        verified: 1
     failure:
-      verified: -1
+      gerrit:
+        verified: -1
 
   - name: conflict
-    dequeue-on-conflict: false
     manager: DependentPipelineManager
     failure-message: Build failed.  For information on how to proceed, see http://wiki.example.org/Test_Failures
     trigger:
@@ -75,12 +83,15 @@
           approval:
             - approved: 1
     success:
-      verified: 2
-      submit: true
+      gerrit:
+        verified: 2
+        submit: true
     failure:
-      verified: -2
+      gerrit:
+        verified: -2
     start:
-      verified: 0
+      gerrit:
+        verified: 0
 
 jobs:
   - name: ^.*-merge$
diff --git a/tests/fixtures/layouts/good_layout.yaml b/tests/fixtures/layouts/good_layout.yaml
index 15be6ef..4bd5e70 100644
--- a/tests/fixtures/layouts/good_layout.yaml
+++ b/tests/fixtures/layouts/good_layout.yaml
@@ -8,9 +8,11 @@
       gerrit:
         - event: patchset-created
     success:
-      verified: 1
+      gerrit:
+        verified: 1
     failure:
-      verified: -1
+      gerrit:
+        verified: -1
 
   - name: post
     manager: IndependentPipelineManager
@@ -28,15 +30,18 @@
         - event: comment-added
           approval:
             - approved: 1
-    success:
-      verified: 2
-      code-review: 1
-      submit: true
-    failure:
-      verified: -2
-      workinprogress: true
     start:
-      verified: 0
+      gerrit:
+        verified: 0
+    success:
+      gerrit:
+        verified: 2
+        code-review: 1
+        submit: true
+    failure:
+      gerrit:
+        verified: -2
+        workinprogress: true
 
 jobs:
   - name: ^.*-merge$
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index 57eca51..081258a 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -14,3 +14,9 @@
 push_change_refs=true
 url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
 job_name_in_report=true
+
+[smtp]
+server=localhost
+port=25
+default_from=zuul@example.com
+default_to=you@example.com
\ No newline at end of file
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index a92efea..70956b4 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -44,6 +44,8 @@
 import zuul.scheduler
 import zuul.webapp
 import zuul.launcher.gearman
+import zuul.reporter.gerrit
+import zuul.reporter.smtp
 import zuul.trigger.gerrit
 import zuul.trigger.timer
 
@@ -388,6 +390,8 @@
 
 
 class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
+    name = 'gerrit'
+
     def __init__(self, upstream_root, *args):
         super(FakeGerritTrigger, self).__init__(*args)
         self.upstream_root = upstream_root
@@ -443,6 +447,7 @@
         self.aborted = False
         self.created = time.time()
         self.description = ''
+        self.run_error = False
 
     def release(self):
         self.wait_condition.acquire()
@@ -492,7 +497,13 @@
         if self.aborted:
             result = 'ABORTED'
 
-        data = {'result': result}
+        if self.run_error:
+            work_fail = True
+            result = 'RUN_ERROR'
+        else:
+            data['result'] = result
+            work_fail = False
+
         changes = None
         if 'ZUUL_CHANGE_IDS' in self.parameters:
             changes = self.parameters['ZUUL_CHANGE_IDS']
@@ -504,7 +515,11 @@
                          pipeline=self.parameters['ZUUL_PIPELINE'])
         )
 
-        self.job.sendWorkComplete(json.dumps(data))
+        self.job.sendWorkData(json.dumps(data))
+        if work_fail:
+            self.job.sendWorkFail()
+        else:
+            self.job.sendWorkComplete(json.dumps(data))
         del self.worker.gearman_jobs[self.job.unique]
         self.worker.running_builds.remove(self)
         self.worker.lock.release()
@@ -668,6 +683,35 @@
         self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
 
 
+class FakeSMTP(object):  # test double patched over smtplib.SMTP
+    log = logging.getLogger('zuul.FakeSMTP')
+    messages = []  # class-level list: one log shared by all instances
+
+    def __init__(self, server, port):
+        self.server = server
+        self.port = port
+
+    def sendmail(self, from_email, to_email, msg):
+        self.log.info("Sending email from %s, to %s, with msg %s" % (
+                      from_email, to_email, msg))
+
+        headers = msg.split('\n\n', 1)[0]  # text before first blank line
+        body = msg.split('\n\n', 1)[1]  # everything after that blank line
+
+        FakeSMTP.messages.append(dict(  # record the mail; nothing is sent
+            from_email=from_email,
+            to_email=to_email,
+            msg=msg,
+            headers=headers,
+            body=body,
+        ))
+
+        return True
+
+    def quit(self):
+        return True
+
+
 class TestScheduler(testtools.TestCase):
     log = logging.getLogger("zuul.test")
 
@@ -751,6 +795,7 @@
         self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
 
         zuul.lib.gerrit.Gerrit = FakeGerrit
+        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTP))
 
         self.gerrit = FakeGerritTrigger(
             self.upstream_root, self.config, self.sched)
@@ -766,6 +811,14 @@
         self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
         self.sched.registerTrigger(self.timer)
 
+        self.sched.registerReporter(
+            zuul.reporter.gerrit.Reporter(self.gerrit))
+        self.smtp_reporter = zuul.reporter.smtp.Reporter(
+            self.config.get('smtp', 'default_from'),
+            self.config.get('smtp', 'default_to'),
+            self.config.get('smtp', 'server'))
+        self.sched.registerReporter(self.smtp_reporter)
+
         self.sched.start()
         self.sched.reconfigure(self.config)
         self.sched.resume()
@@ -1015,9 +1068,6 @@
                     print 'pipeline %s queue %s contents %s' % (
                         pipeline.name, queue.name, queue.queue)
                 self.assertEqual(len(queue.queue), 0)
-                if len(queue.severed_heads) != 0:
-                    print 'heads', queue.severed_heads
-                self.assertEqual(len(queue.severed_heads), 0)
 
     def assertReportedStat(self, key, value=None, kind=None):
         start = time.time()
@@ -1070,6 +1120,14 @@
         self.assertReportedStat(
             'zuul.pipeline.gate.org.project.total_changes', value='1|c')
 
+    def test_initial_pipeline_gauges(self):
+        "Test that each pipeline reported its length on start"
+        pipeline_names = self.sched.layout.pipelines.keys()
+        self.assertNotEqual(len(pipeline_names), 0)
+        for name in pipeline_names:
+            self.assertReportedStat('zuul.pipeline.%s.current_changes' % name,
+                                    value='0|g')
+
     def test_duplicate_pipelines(self):
         "Test that a change matching multiple pipelines works"
 
@@ -1305,6 +1363,82 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
 
+    def test_failed_change_in_middle(self):
+        "Test a failed change in the middle of the queue"
+
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.worker.addFailTest('project-test1', B)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+
+        self.waitUntilSettled()
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 6)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+        self.assertEqual(self.builds[4].name, 'project-test1')
+        self.assertEqual(self.builds[5].name, 'project-test2')
+
+        self.release(self.builds[2])
+        self.waitUntilSettled()
+
+        # project-test1 and project-test2 for A
+        # project-test2 for B
+        # project-merge for C (without B)
+        self.assertEqual(len(self.builds), 4)
+        self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2)
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # project-test1 and project-test2 for A
+        # project-test2 for B
+        # project-test1 and project-test2 for C
+        self.assertEqual(len(self.builds), 5)
+
+        items = self.sched.layout.pipelines['gate'].getAllItems()
+        builds = items[0].current_build_set.getBuilds()
+        self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
+        self.assertEqual(self.countJobResults(builds, None), 2)
+        builds = items[1].current_build_set.getBuilds()
+        self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
+        self.assertEqual(self.countJobResults(builds, 'FAILURE'), 1)
+        self.assertEqual(self.countJobResults(builds, None), 1)
+        builds = items[2].current_build_set.getBuilds()
+        self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
+        self.assertEqual(self.countJobResults(builds, None), 2)
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 0)
+        self.assertEqual(len(self.history), 12)
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(B.data['status'], 'NEW')
+        self.assertEqual(C.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(B.reported, 2)
+        self.assertEqual(C.reported, 2)
+
     def test_failed_change_at_head_with_queue(self):
         "Test that if a change at the head fails, queued jobs are canceled"
 
@@ -1367,6 +1501,111 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
 
+    def test_two_failed_changes_at_head(self):
+        "Test that changes are reparented correctly if 2 fail at head"
+
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.worker.addFailTest('project-test1', A)
+        self.worker.addFailTest('project-test1', B)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 6)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+        self.assertEqual(self.builds[4].name, 'project-test1')
+        self.assertEqual(self.builds[5].name, 'project-test2')
+
+        self.assertTrue(self.job_has_changes(self.builds[0], A))
+        self.assertTrue(self.job_has_changes(self.builds[2], A))
+        self.assertTrue(self.job_has_changes(self.builds[2], B))
+        self.assertTrue(self.job_has_changes(self.builds[4], A))
+        self.assertTrue(self.job_has_changes(self.builds[4], B))
+        self.assertTrue(self.job_has_changes(self.builds[4], C))
+
+        # Fail change B first
+        self.release(self.builds[2])
+        self.waitUntilSettled()
+
+        # restart of C after B failure
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 5)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test2')
+        self.assertEqual(self.builds[3].name, 'project-test1')
+        self.assertEqual(self.builds[4].name, 'project-test2')
+
+        self.assertTrue(self.job_has_changes(self.builds[1], A))
+        self.assertTrue(self.job_has_changes(self.builds[2], A))
+        self.assertTrue(self.job_has_changes(self.builds[2], B))
+        self.assertTrue(self.job_has_changes(self.builds[4], A))
+        self.assertFalse(self.job_has_changes(self.builds[4], B))
+        self.assertTrue(self.job_has_changes(self.builds[4], C))
+
+        # Finish running all passing jobs for change A
+        self.release(self.builds[1])
+        self.waitUntilSettled()
+        # Fail and report change A
+        self.release(self.builds[0])
+        self.waitUntilSettled()
+
+        # restart of B,C after A failure
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 4)
+        self.assertEqual(self.builds[0].name, 'project-test1')  # B
+        self.assertEqual(self.builds[1].name, 'project-test2')  # B
+        self.assertEqual(self.builds[2].name, 'project-test1')  # C
+        self.assertEqual(self.builds[3].name, 'project-test2')  # C
+
+        self.assertFalse(self.job_has_changes(self.builds[1], A))
+        self.assertTrue(self.job_has_changes(self.builds[1], B))
+        self.assertFalse(self.job_has_changes(self.builds[1], C))
+
+        self.assertFalse(self.job_has_changes(self.builds[2], A))
+        # After A failed and B and C restarted, B should be back in
+        # C's tests because it has not failed yet.
+        self.assertTrue(self.job_has_changes(self.builds[2], B))
+        self.assertTrue(self.job_has_changes(self.builds[2], C))
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 0)
+        self.assertEqual(len(self.history), 21)
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(B.data['status'], 'NEW')
+        self.assertEqual(C.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(B.reported, 2)
+        self.assertEqual(C.reported, 2)
+
     def test_patch_order(self):
         "Test that dependent patches are tested in the right order"
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -1539,8 +1778,9 @@
         self.waitUntilSettled()
         self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        queue = self.gearman_server.getQueue()
-        self.getParameter(queue[-1], 'ZUUL_REF')
+
+        self.assertEqual(len(self.history), 2)  # A and C merge jobs
+
         self.gearman_server.hold_jobs_in_queue = False
         self.gearman_server.release()
         self.waitUntilSettled()
@@ -1551,32 +1791,7 @@
         self.assertEqual(A.reported, 2)
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
-
-    def test_dequeue_conflict(self):
-        "Test that the option to dequeue merge conflicts works"
-
-        self.gearman_server.hold_jobs_in_queue = True
-        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
-        A.addPatchset(['conflict'])
-        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
-        B.addPatchset(['conflict'])
-        A.addApproval('CRVW', 2)
-        B.addApproval('CRVW', 2)
-        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
-        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
-        self.waitUntilSettled()
-
-        self.assertEqual(A.reported, 1)
-        self.assertEqual(B.reported, 2)
-
-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
-        self.waitUntilSettled()
-
-        self.assertEqual(A.data['status'], 'MERGED')
-        self.assertEqual(B.data['status'], 'NEW')
-        self.assertEqual(A.reported, 2)
-        self.assertEqual(B.reported, 2)
+        self.assertEqual(len(self.history), 6)
 
     def test_post(self):
         "Test that post jobs run"
@@ -1760,6 +1975,66 @@
         self.assertEqual(C.reported, 2)
         self.assertEqual(len(self.history), 1)
 
+    def test_failing_dependent_changes(self):
+        "Test that failing dependent patches are taken out of stream"
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+        E = self.fake_gerrit.addFakeChange('org/project', 'master', 'E')
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+        D.addApproval('CRVW', 2)
+        E.addApproval('CRVW', 2)
+
+        # E, D -> C -> B, A
+
+        D.setDependsOn(C, 1)
+        C.setDependsOn(B, 1)
+
+        self.worker.addFailTest('project-test1', B)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(E.addApproval('APRV', 1))
+
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        self.worker.hold_jobs_in_build = False
+        for build in self.builds:
+            if build.parameters['ZUUL_CHANGE'] != '1':
+                build.release()
+                self.waitUntilSettled()
+
+        self.worker.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(B.data['status'], 'NEW')
+        self.assertEqual(B.reported, 2)
+        self.assertEqual(C.data['status'], 'NEW')
+        self.assertEqual(C.reported, 2)
+        self.assertEqual(D.data['status'], 'NEW')
+        self.assertEqual(D.reported, 2)
+        self.assertEqual(E.data['status'], 'MERGED')
+        self.assertEqual(E.reported, 2)
+        self.assertEqual(len(self.history), 18)
+
     def test_head_is_dequeued_once(self):
         "Test that if a change at the head fails it is dequeued only once"
         # If it's dequeued more than once, we should see extra
@@ -2314,6 +2589,21 @@
         self.assertEqual(D.data['status'], 'MERGED')
         self.assertEqual(D.reported, 2)
 
+    def test_rerun_on_error(self):
+        "Test that if a worker fails to run a job, it is run again"
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        self.builds[0].run_error = True
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+        self.assertEqual(self.countJobResults(self.history, 'RUN_ERROR'), 1)
+        self.assertEqual(self.countJobResults(self.history, 'SUCCESS'), 3)
+
     def test_statsd(self):
         "Test each of the statsd methods used in the scheduler"
         import extras
@@ -2563,3 +2853,34 @@
                             status_jobs.add(job['name'])
         self.assertIn('project-bitrot-stable-old', status_jobs)
         self.assertIn('project-bitrot-stable-older', status_jobs)
+
+    def test_check_smtp_pool(self):
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-smtp.yaml')
+        self.sched.reconfigure(self.config)
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.waitUntilSettled()
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(len(FakeSMTP.messages), 2)
+
+        # A.messages only holds what FakeGerrit places in it, and the
+        # start report is configured to go to SMTP only, so the first
+        # message is checked against its known literal text instead.
+
+        self.assertEqual('zuul@example.com',
+                         FakeSMTP.messages[0]['from_email'])
+        self.assertEqual(['you@example.com'],
+                         FakeSMTP.messages[0]['to_email'])
+        self.assertEqual('Starting check jobs.',
+                         FakeSMTP.messages[0]['body'])
+
+        self.assertEqual('zuul_from@example.com',
+                         FakeSMTP.messages[1]['from_email'])
+        self.assertEqual(['alternative_me@example.com'],
+                         FakeSMTP.messages[1]['to_email'])
+        self.assertEqual(A.messages[0],
+                         FakeSMTP.messages[1]['body'])
diff --git a/tox.ini b/tox.ini
index 8e0ede6..a92fdb2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,11 +1,15 @@
 [tox]
-envlist = pep8, pyflakes, py27
+minversion = 1.6
+skipsdist = True
+envlist = pep8, py27
 
 [testenv]
 # Set STATSD env variables so that statsd code paths are tested.
 setenv = STATSD_HOST=localhost
          STATSD_PORT=8125
          VIRTUAL_ENV={envdir}
+usedevelop = True
+install_command = pip install {opts} {packages}
 deps = -r{toxinidir}/requirements.txt
        -r{toxinidir}/test-requirements.txt
 commands =
@@ -21,11 +25,6 @@
 commands =
   python setup.py testr --coverage
 
-[testenv:pyflakes]
-deps = pyflakes
-       -r{toxinidir}/requirements.txt
-commands = pyflakes zuul setup.py
-
 [testenv:venv]
 commands = {posargs}
 
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 75f7dd9..6a699d3 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -103,8 +103,9 @@
 
     def exit_handler(self, signum, frame):
         signal.signal(signal.SIGUSR1, signal.SIG_IGN)
-        self.stop_gear_server()
         self.sched.exit()
+        self.sched.join()
+        self.stop_gear_server()
 
     def term_handler(self, signum, frame):
         self.stop_gear_server()
@@ -164,6 +165,8 @@
         # See comment at top of file about zuul imports
         import zuul.scheduler
         import zuul.launcher.gearman
+        import zuul.reporter.gerrit
+        import zuul.reporter.smtp
         import zuul.trigger.gerrit
         import zuul.trigger.timer
         import zuul.webapp
@@ -180,10 +183,23 @@
         gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
         timer = zuul.trigger.timer.Timer(self.config, self.sched)
         webapp = zuul.webapp.WebApp(self.sched)
+        gerrit_reporter = zuul.reporter.gerrit.Reporter(gerrit)
+        smtp_reporter = zuul.reporter.smtp.Reporter(
+            self.config.get('smtp', 'default_from')
+            if self.config.has_option('smtp', 'default_from') else 'zuul',
+            self.config.get('smtp', 'default_to')
+            if self.config.has_option('smtp', 'default_to') else 'zuul',
+            self.config.get('smtp', 'server')
+            if self.config.has_option('smtp', 'server') else 'localhost',
+            self.config.get('smtp', 'port')
+            if self.config.has_option('smtp', 'port') else 25
+        )
 
         self.sched.setLauncher(gearman)
         self.sched.registerTrigger(gerrit)
         self.sched.registerTrigger(timer)
+        self.sched.registerReporter(gerrit_reporter)
+        self.sched.registerReporter(smtp_reporter)
 
         self.sched.start()
         self.sched.reconfigure(self.config)
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 62683f4..2580397 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -324,6 +324,7 @@
     def cancel(self, build):
         self.log.info("Cancel build %s for job %s" % (build, build.job))
 
+        build.canceled = True
         if build.number is not None:
             self.log.debug("Build %s has already started" % build)
             self.cancelRunningBuild(build)
@@ -353,15 +354,16 @@
 
         build = self.builds.get(job.unique)
         if build:
-            if result is None:
-                data = getJobData(job)
-                result = data.get('result')
-            if result is None:
-                result = 'LOST'
-            self.log.info("Build %s complete, result %s" %
-                          (job, result))
-            build.result = result
-            self.sched.onBuildCompleted(build)
+            if not build.canceled:
+                if result is None:
+                    data = getJobData(job)
+                    result = data.get('result')
+                if result is None:
+                    build.retry = True
+                self.log.info("Build %s complete, result %s" %
+                              (job, result))
+                build.result = result
+                self.sched.onBuildCompleted(build)
             # The test suite expects the build to be removed from the
             # internal dict after it's added to the report queue.
             del self.builds[job.unique]
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 7f80d64..0d08f1b 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -54,6 +54,9 @@
     trigger = v.Required(v.Any({'gerrit': toList(gerrit_trigger)},
                                {'timer': toList(timer_trigger)}))
 
+    report_actions = {'gerrit': variable_dict,
+                      'smtp': variable_dict}
+
     pipeline = {v.Required('name'): str,
                 v.Required('manager'): manager,
                 'precedence': precedence,
@@ -61,11 +64,10 @@
                 'success-message': str,
                 'failure-message': str,
                 'dequeue-on-new-patchset': bool,
-                'dequeue-on-conflict': bool,
                 'trigger': trigger,
-                'success': variable_dict,
-                'failure': variable_dict,
-                'start': variable_dict,
+                'success': report_actions,
+                'failure': report_actions,
+                'start': report_actions,
                 }
     pipelines = [pipeline]
 
@@ -161,7 +163,8 @@
             self.templates_schemas[t_name] = v.Schema(schema)
 
         project = {'name': str,
-                   'merge-mode': v.Any('cherry-pick'),
+                   'merge-mode': v.Any('merge', 'merge-resolve',
+                                       'cherry-pick'),
                    'template': self.validateTemplateCalls,
                    }
 
diff --git a/zuul/merger.py b/zuul/merger.py
index 94db499..218f7f2 100644
--- a/zuul/merger.py
+++ b/zuul/merger.py
@@ -88,11 +88,15 @@
         self.fetch(ref)
         self.repo.git.cherry_pick("FETCH_HEAD")
 
-    def merge(self, ref):
+    def merge(self, ref, strategy=None):
         self._ensure_cloned()
-        self.log.debug("Merging %s" % ref)
+        args = []
+        if strategy:
+            args += ['-s', strategy]
+        args.append('FETCH_HEAD')
         self.fetch(ref)
-        self.repo.git.merge("FETCH_HEAD")
+        self.log.debug("Merging %s with args %s" % (ref, args))
+        self.repo.git.merge(*args)
 
     def fetch(self, ref):
         self._ensure_cloned()
@@ -186,7 +190,7 @@
         except:
             self.log.exception("Unable to update %s", project)
 
-    def _mergeChange(self, change, ref, target_ref, mode):
+    def _mergeChange(self, change, ref, target_ref):
         repo = self.getRepo(change.project)
         try:
             repo.checkout(ref)
@@ -195,13 +199,16 @@
             return False
 
         try:
-            if not mode:
-                mode = change.project.merge_mode
-            if mode == model.MERGE_IF_NECESSARY:
+            mode = change.project.merge_mode
+            if mode == model.MERGER_MERGE:
                 repo.merge(change.refspec)
-            elif mode == model.CHERRY_PICK:
+            elif mode == model.MERGER_MERGE_RESOLVE:
+                repo.merge(change.refspec, 'resolve')
+            elif mode == model.MERGER_CHERRY_PICK:
                 repo.cherryPick(change.refspec)
-        except:
+            else:
+                raise Exception("Unsupported merge mode: %s" % mode)
+        except Exception:
             # Log exceptions at debug level because they are
             # usually benign merge conflicts
             self.log.debug("Unable to merge %s" % change, exc_info=True)
@@ -219,7 +226,7 @@
             return False
         return commit
 
-    def mergeChanges(self, items, target_ref=None, mode=None):
+    def mergeChanges(self, items, target_ref=None):
         # Merge shortcuts:
         # if this is the only change just merge it against its branch.
         # elif there are changes ahead of us that are from the same project and
@@ -244,13 +251,13 @@
                 return False
             commit = self._mergeChange(item.change,
                                        repo.getBranchHead(item.change.branch),
-                                       target_ref=target_ref, mode=mode)
+                                       target_ref=target_ref)
         # Sibling changes exist. Merge current change against newest sibling.
         elif (len(sibling_items) >= 2 and
               sibling_items[-2].current_build_set.commit):
             last_commit = sibling_items[-2].current_build_set.commit
             commit = self._mergeChange(item.change, last_commit,
-                                       target_ref=target_ref, mode=mode)
+                                       target_ref=target_ref)
         # Either change did not merge or we did not need to merge as there were
         # previous merge conflicts.
         if not commit:
diff --git a/zuul/model.py b/zuul/model.py
index 440f3ba..44780f7 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -15,12 +15,21 @@
 import re
 import time
 from uuid import uuid4
+import extras
+
+OrderedDict = extras.try_imports(['collections.OrderedDict',
+                                  'ordereddict.OrderedDict'])
 
 
-FAST_FORWARD_ONLY = 1
-MERGE_ALWAYS = 2
-MERGE_IF_NECESSARY = 3
-CHERRY_PICK = 4
+MERGER_MERGE = 1          # "git merge"
+MERGER_MERGE_RESOLVE = 2  # "git merge -s resolve"
+MERGER_CHERRY_PICK = 3    # "git cherry-pick"
+
+MERGER_MAP = {
+    'merge': MERGER_MERGE,
+    'merge-resolve': MERGER_MERGE_RESOLVE,
+    'cherry-pick': MERGER_CHERRY_PICK,
+}
 
 PRECEDENCE_NORMAL = 0
 PRECEDENCE_LOW = 1
@@ -42,12 +51,14 @@
         self.failure_message = None
         self.success_message = None
         self.dequeue_on_new_patchset = True
-        self.dequeue_on_conflict = True
         self.job_trees = {}  # project -> JobTree
         self.manager = None
         self.queues = []
         self.precedence = PRECEDENCE_NORMAL
         self.trigger = None
+        self.start_actions = None
+        self.success_actions = None
+        self.failure_actions = None
 
     def __repr__(self):
         return '<Pipeline %s>' % self.name
@@ -157,12 +168,15 @@
                 return True
             if build.result != 'SUCCESS':
                 return True
+
         if not item.item_ahead:
             return False
         return self.isHoldingFollowingChanges(item.item_ahead)
 
     def setResult(self, item, build):
-        if build.result != 'SUCCESS':
+        if build.retry:
+            item.removeBuild(build)
+        elif build.result != 'SUCCESS':
             # Get a JobTree from a Job so we can find only its dependent jobs
             root = self.getJobTree(item.change.project)
             tree = root.getJobTreeForJob(build.job)
@@ -198,7 +212,6 @@
         items = []
         for shared_queue in self.queues:
             items.extend(shared_queue.queue)
-            items.extend(shared_queue.severed_heads)
         return items
 
     def formatStatusHTML(self):
@@ -208,8 +221,8 @@
                 s = 'Change queue: %s' % queue.name
                 ret += s + '\n'
                 ret += '-' * len(s) + '\n'
-            for head in queue.getHeads():
-                ret += self.formatStatus(head, html=True)
+            for item in queue.queue:
+                ret += self.formatStatus(item, html=True)
         return ret
 
     def formatStatusJSON(self):
@@ -221,18 +234,21 @@
             j_queue = dict(name=queue.name)
             j_queues.append(j_queue)
             j_queue['heads'] = []
-            for head in queue.getHeads():
-                j_changes = []
-                e = head
-                while e:
-                    j_changes.append(self.formatItemJSON(e))
-                    if (len(j_changes) > 1 and
-                        (j_changes[-2]['remaining_time'] is not None) and
-                        (j_changes[-1]['remaining_time'] is not None)):
-                        j_changes[-1]['remaining_time'] = max(
-                            j_changes[-2]['remaining_time'],
-                            j_changes[-1]['remaining_time'])
-                    e = e.item_behind
+
+            j_changes = []
+            for e in queue.queue:
+                if not e.item_ahead:
+                    if j_changes:
+                        j_queue['heads'].append(j_changes)
+                    j_changes = []
+                j_changes.append(self.formatItemJSON(e))
+                if (len(j_changes) > 1 and
+                    (j_changes[-2]['remaining_time'] is not None) and
+                    (j_changes[-1]['remaining_time'] is not None)):
+                    j_changes[-1]['remaining_time'] = max(
+                        j_changes[-2]['remaining_time'],
+                        j_changes[-1]['remaining_time'])
+            if j_changes:
                 j_queue['heads'].append(j_changes)
         return j_pipeline
 
@@ -247,9 +263,11 @@
                 changeish.url,
                 changeish._id())
         else:
-            ret += '%sProject %s change %s\n' % (indent_str,
-                                                 changeish.project.name,
-                                                 changeish._id())
+            ret += '%sProject %s change %s based on %s\n' % (
+                indent_str,
+                changeish.project.name,
+                changeish._id(),
+                item.item_ahead)
         for job in self.getJobs(changeish):
             build = item.current_build_set.getBuild(job.name)
             if build:
@@ -270,9 +288,6 @@
                     job_name = '<a href="%s">%s</a>' % (url, job_name)
             ret += '%s  %s: %s%s' % (indent_str, job_name, result, voting)
             ret += '\n'
-        if item.item_behind:
-            ret += '%sFollowed by:\n' % (indent_str)
-            ret += self.formatStatus(item.item_behind, indent + 2, html)
         return ret
 
     def formatItemJSON(self, item):
@@ -283,6 +298,13 @@
         else:
             ret['url'] = None
         ret['id'] = changeish._id()
+        if item.item_ahead:
+            ret['item_ahead'] = item.item_ahead.change._id()
+        else:
+            ret['item_ahead'] = None
+        ret['items_behind'] = [i.change._id() for i in item.items_behind]
+        ret['failing_reasons'] = item.current_build_set.failing_reasons
+        ret['zuul_ref'] = item.current_build_set.ref
         ret['project'] = changeish.project.name
         ret['enqueue_time'] = int(item.enqueue_time * 1000)
         ret['jobs'] = []
@@ -319,24 +341,34 @@
                     result=result,
                     voting=job.voting))
         if self.haveAllJobsStarted(item):
-            # if a change ahead has failed, we are unknown.
-            item_ahead_failed = False
-            i = item.item_ahead
-            while i:
-                if self.didAnyJobFail(i):
-                    item_ahead_failed = True
-                    i = None  # safe to stop looking
-                else:
-                    i = i.item_ahead
-            if item_ahead_failed:
-                ret['remaining_time'] = None
-            else:
-                ret['remaining_time'] = max_remaining
+            ret['remaining_time'] = max_remaining
         else:
             ret['remaining_time'] = None
         return ret
 
 
+class ActionReporter(object):
+    """An ActionReporter has a reporter and its configured parameters"""
+
+    def __repr__(self):
+        return '<ActionReporter %s, %s>' % (self.reporter, self.params)
+
+    def __init__(self, reporter, params):
+        self.reporter = reporter
+        self.params = params
+
+    def report(self, change, message):
+        """Sends the built message off to the configured reporter.
+        Takes the change and message and adds the configured parameters.
+        """
+        return self.reporter.report(change, message, self.params)
+
+    def getSubmitAllowNeeds(self):
+        """Gets the submit allow needs from the reporter based off the
+        parameters."""
+        return self.reporter.getSubmitAllowNeeds(self.params)
+
+
 class ChangeQueue(object):
     """DependentPipelines have multiple parallel queues shared by
     different projects; this is one of them.  For instance, there may
@@ -349,7 +381,6 @@
         self.projects = []
         self._jobs = set()
         self.queue = []
-        self.severed_heads = []
         self.dependent = dependent
 
     def __repr__(self):
@@ -375,50 +406,50 @@
     def enqueueItem(self, item):
         if self.dependent and self.queue:
             item.item_ahead = self.queue[-1]
-            item.item_ahead.item_behind = item
+            item.item_ahead.items_behind.append(item)
         self.queue.append(item)
 
     def dequeueItem(self, item):
         if item in self.queue:
             self.queue.remove(item)
-        if item in self.severed_heads:
-            self.severed_heads.remove(item)
         if item.item_ahead:
-            item.item_ahead.item_behind = item.item_behind
-        if item.item_behind:
-            item.item_behind.item_ahead = item.item_ahead
+            item.item_ahead.items_behind.remove(item)
+        for item_behind in item.items_behind:
+            if item.item_ahead:
+                item.item_ahead.items_behind.append(item_behind)
+            item_behind.item_ahead = item.item_ahead
         item.item_ahead = None
-        item.item_behind = None
+        item.items_behind = []
         item.dequeue_time = time.time()
 
-    def addSeveredHead(self, item):
-        self.severed_heads.append(item)
+    def moveItem(self, item, item_ahead):
+        if not self.dependent:
+            return False
+        if item.item_ahead == item_ahead:
+            return False
+        # Remove from current location
+        if item.item_ahead:
+            item.item_ahead.items_behind.remove(item)
+        for item_behind in item.items_behind:
+            if item.item_ahead:
+                item.item_ahead.items_behind.append(item_behind)
+            item_behind.item_ahead = item.item_ahead
+        # Add to new location
+        item.item_ahead = item_ahead
+        item.items_behind = []
+        if item.item_ahead:
+            item.item_ahead.items_behind.append(item)
+        return True
 
     def mergeChangeQueue(self, other):
         for project in other.projects:
             self.addProject(project)
 
-    def getHead(self):
-        if not self.queue:
-            return None
-        return self.queue[0]
-
-    def getHeads(self):
-        heads = []
-        if self.dependent:
-            h = self.getHead()
-            if h:
-                heads.append(h)
-        else:
-            heads.extend(self.queue)
-        heads.extend(self.severed_heads)
-        return heads
-
 
 class Project(object):
     def __init__(self, name):
         self.name = name
-        self.merge_mode = MERGE_IF_NECESSARY
+        self.merge_mode = MERGER_MERGE_RESOLVE
 
     def __str__(self):
         return self.name
@@ -536,6 +567,8 @@
         self.end_time = None
         self.estimated_time = None
         self.pipeline = None
+        self.canceled = False
+        self.retry = False
         self.parameters = {}
 
     def __repr__(self):
@@ -554,6 +587,7 @@
         self.commit = None
         self.unable_to_merge = False
         self.unable_to_merge_message = None
+        self.failing_reasons = []
 
     def setConfiguration(self):
         # The change isn't enqueued until after it's created
@@ -571,6 +605,9 @@
         self.builds[build.job.name] = build
         build.build_set = self
 
+    def removeBuild(self, build):
+        del self.builds[build.job.name]
+
     def getBuild(self, job_name):
         return self.builds.get(job_name)
 
@@ -591,11 +628,19 @@
         self.current_build_set = BuildSet(self)
         self.build_sets.append(self.current_build_set)
         self.item_ahead = None
-        self.item_behind = None
+        self.items_behind = []
         self.enqueue_time = None
         self.dequeue_time = None
         self.reported = False
 
+    def __repr__(self):
+        if self.pipeline:
+            pipeline = self.pipeline.name
+        else:
+            pipeline = None
+        return '<QueueItem 0x%x for %s in %s>' % (
+            id(self), self.change, pipeline)
+
     def resetAllBuilds(self):
         old = self.current_build_set
         self.current_build_set.result = 'CANCELED'
@@ -608,6 +653,9 @@
         self.current_build_set.addBuild(build)
         build.pipeline = self.pipeline
 
+    def removeBuild(self, build):
+        self.current_build_set.removeBuild(build)
+
     def setReportedResult(self, result):
         self.current_build_set.result = result
 
@@ -898,7 +946,7 @@
 class Layout(object):
     def __init__(self):
         self.projects = {}
-        self.pipelines = {}
+        self.pipelines = OrderedDict()
         self.jobs = {}
         self.metajobs = []
 
diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/reporter/__init__.py
diff --git a/zuul/reporter/gerrit.py b/zuul/reporter/gerrit.py
new file mode 100644
index 0000000..cceacca
--- /dev/null
+++ b/zuul/reporter/gerrit.py
@@ -0,0 +1,49 @@
+# Copyright 2013 Rackspace Australia
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+
+class Reporter(object):
+    """Sends off reports to Gerrit."""
+
+    name = 'gerrit'
+    log = logging.getLogger("zuul.reporter.gerrit.Reporter")
+
+    def __init__(self, trigger):
+        """Set up the reporter."""
+        self.gerrit = trigger.gerrit
+        self.trigger = trigger
+
+    def report(self, change, message, params):
+        """Send a message to gerrit."""
+        self.log.debug("Report change %s, params %s, message: %s" %
+                       (change, params, message))
+        if not params:
+            self.log.debug("Not reporting change %s: No params specified." %
+                           change)
+            return
+        changeid = '%s,%s' % (change.number, change.patchset)
+        change._ref_sha = self.trigger.getRefSha(change.project.name,
+                                                 'refs/heads/' + change.branch)
+        return self.gerrit.review(change.project.name, changeid, message,
+                                  params)
+
+    def getSubmitAllowNeeds(self, params):
+        """Get a list of code review labels that are allowed to be
+        "needed" in the submit records for a change, with respect
+        to this queue.  In other words, the list of review labels
+        this reporter itself is likely to set before submitting.
+        """
+        return params
diff --git a/zuul/reporter/smtp.py b/zuul/reporter/smtp.py
new file mode 100644
index 0000000..66dcd45
--- /dev/null
+++ b/zuul/reporter/smtp.py
@@ -0,0 +1,67 @@
+# Copyright 2013 Rackspace Australia
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import smtplib
+
+from email.mime.text import MIMEText
+
+
+class Reporter(object):
+    """Sends reports by email via SMTP."""
+
+    name = 'smtp'
+    log = logging.getLogger("zuul.reporter.smtp.Reporter")
+
+    def __init__(self, smtp_default_from, smtp_default_to,
+                 smtp_server='localhost', smtp_port=25):
+        """Set up the reporter.
+
+        Takes parameters for the smtp server.
+        """
+        self.smtp_server = smtp_server
+        self.smtp_port = smtp_port
+        self.smtp_default_from = smtp_default_from
+        self.smtp_default_to = smtp_default_to
+
+    def report(self, change, message, params):
+        """Send the compiled report message via smtp."""
+        self.log.debug("Report change %s, params %s, message: %s" %
+                       (change, params, message))
+
+        # Create a text/plain email message
+        from_email = params['from']\
+            if 'from' in params else self.smtp_default_from
+        to_email = params['to']\
+            if 'to' in params else self.smtp_default_to
+        msg = MIMEText(message)
+        msg['Subject'] = "Report change %s" % change
+        msg['From'] = from_email
+        msg['To'] = to_email
+
+        try:
+            s = smtplib.SMTP(self.smtp_server, self.smtp_port)
+            s.sendmail(from_email, to_email.split(','), msg.as_string())
+            s.quit()
+        except:
+            return "Could not send email via SMTP"
+        return
+
+    def getSubmitAllowNeeds(self, params):
+        """Get a list of code review labels that are allowed to be
+        "needed" in the submit records for a change, with respect
+        to this queue.  In other words, the list of review labels
+        this reporter itself is likely to set before submitting.
+        """
+        return []
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 27c5544..8b6c20c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -28,7 +28,7 @@
 
 import layoutvalidator
 import model
-from model import Pipeline, Project, ChangeQueue, EventFilter
+from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
 import merger
 
 statsd = extras.try_import('statsd.statsd')
@@ -75,6 +75,7 @@
         self._stopped = False
         self.launcher = None
         self.triggers = dict()
+        self.reporters = dict()
         self.config = None
         self._maintain_trigger_cache = False
 
@@ -132,15 +133,27 @@
                                                          "Build succeeded.")
             pipeline.dequeue_on_new_patchset = conf_pipeline.get(
                 'dequeue-on-new-patchset', True)
-            pipeline.dequeue_on_conflict = conf_pipeline.get(
-                'dequeue-on-conflict', True)
+
+            action_reporters = {}
+            for action in ['start', 'success', 'failure']:
+                action_reporters[action] = []
+                if conf_pipeline.get(action):
+                    for reporter_name, params \
+                        in conf_pipeline.get(action).items():
+                        if reporter_name in self.reporters.keys():
+                            action_reporters[action].append(ActionReporter(
+                                self.reporters[reporter_name], params))
+                        else:
+                            self.log.error('Invalid reporter name %s' %
+                                           reporter_name)
+            pipeline.start_actions = action_reporters['start']
+            pipeline.success_actions = action_reporters['success']
+            pipeline.failure_actions = action_reporters['failure']
+
             manager = globals()[conf_pipeline['manager']](self, pipeline)
             pipeline.setManager(manager)
-
             layout.pipelines[conf_pipeline['name']] = pipeline
-            manager.success_action = conf_pipeline.get('success')
-            manager.failure_action = conf_pipeline.get('failure')
-            manager.start_action = conf_pipeline.get('start')
+
             # TODO: move this into triggers (may require pluggable
             # configuration)
             if 'gerrit' in conf_pipeline['trigger']:
@@ -238,9 +251,8 @@
                 config_project.update(expanded)
 
             layout.projects[config_project['name']] = project
-            mode = config_project.get('merge-mode')
-            if mode and mode == 'cherry-pick':
-                project.merge_mode = model.CHERRY_PICK
+            mode = config_project.get('merge-mode', 'merge-resolve')
+            project.merge_mode = model.MERGER_MAP[mode]
             for pipeline in layout.pipelines.values():
                 if pipeline.name in config_project:
                     job_tree = pipeline.addProject(project)
@@ -300,6 +312,11 @@
             name = trigger.name
         self.triggers[name] = trigger
 
+    def registerReporter(self, reporter, name=None):
+        if name is None:
+            name = reporter.name
+        self.reporters[name] = reporter
+
     def getProject(self, name):
         self.layout_lock.acquire()
         p = None
@@ -339,6 +356,8 @@
                     dt = int((build.end_time - build.start_time) * 1000)
                     statsd.timing(key, dt)
                 statsd.incr(key)
+                key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
+                statsd.incr(key)
         except:
             self.log.exception("Exception reporting runtime stats")
         self.result_event_queue.put(('completed', build))
@@ -435,10 +454,9 @@
                                name)
                 items_to_remove = []
                 for shared_queue in old_pipeline.queues:
-                    for item in (shared_queue.queue +
-                                 shared_queue.severed_heads):
+                    for item in shared_queue.queue:
                         item.item_ahead = None
-                        item.item_behind = None
+                        item.items_behind = []
                         item.pipeline = None
                         project = layout.projects.get(item.change.project.name)
                         if not project:
@@ -449,9 +467,7 @@
                             items_to_remove.append(item)
                             continue
                         item.change.project = project
-                        severed = item in shared_queue.severed_heads
-                        if not new_pipeline.manager.reEnqueueItem(
-                            item, severed=severed):
+                        if not new_pipeline.manager.reEnqueueItem(item):
                             items_to_remove.append(item)
                 builds_to_remove = []
                 for build, item in old_pipeline.manager.building_jobs.items():
@@ -468,6 +484,16 @@
             self._setupMerger()
             for trigger in self.triggers.values():
                 trigger.postConfig()
+            if statsd:
+                try:
+                    for pipeline in self.layout.pipelines.values():
+                        items = len(pipeline.getAllItems())
+                        # stats.gauges.zuul.pipeline.NAME.current_changes
+                        key = 'zuul.pipeline.%s' % pipeline.name
+                        statsd.gauge(key + '.current_changes', items)
+                except Exception:
+                    self.log.exception("Exception reporting initial "
+                                       "pipeline stats:")
             self._reconfigure = False
             self.reconfigure_complete_event.set()
         finally:
@@ -532,9 +558,11 @@
     def maintainTriggerCache(self):
         relevant = set()
         for pipeline in self.layout.pipelines.values():
+            self.log.debug("Start maintain trigger cache for: %s" % pipeline)
             for item in pipeline.getAllItems():
                 relevant.add(item.change)
                 relevant.update(item.change.getRelatedChanges())
+            self.log.debug("End maintain trigger cache for: %s" % pipeline)
         self.log.debug("Trigger cache size: %s" % len(relevant))
         for trigger in self.triggers.values():
             trigger.maintainCache(relevant)
@@ -550,7 +578,7 @@
             return
 
         # Preprocessing for ref-update events
-        if hasattr(event, 'refspec'):
+        if event.ref:
             # Make sure the local git repo is up-to-date with the remote one.
             # We better have the new ref before enqueuing the changes.
             # This is done before enqueuing the changes to avoid calling an
@@ -598,7 +626,6 @@
             ret += '</p>'
 
         keys = self.layout.pipelines.keys()
-        keys.sort()
         for key in keys:
             pipeline = self.layout.pipelines[key]
             s = 'Pipeline: %s' % pipeline.name
@@ -629,7 +656,6 @@
         pipelines = []
         data['pipelines'] = pipelines
         keys = self.layout.pipelines.keys()
-        keys.sort()
         for key in keys:
             pipeline = self.layout.pipelines[key]
             pipelines.append(pipeline.formatStatusJSON())
@@ -644,9 +670,6 @@
         self.pipeline = pipeline
         self.building_jobs = {}
         self.event_filters = []
-        self.success_action = {}
-        self.failure_action = {}
-        self.start_action = {}
         if self.sched.config and self.sched.config.has_option(
             'zuul', 'report_times'):
             self.report_times = self.sched.config.getboolean(
@@ -690,25 +713,22 @@
             if tree:
                 self.log.info("    %s" % p)
                 log_jobs(tree)
-        if self.start_action:
-            self.log.info("  On start:")
-            self.log.info("    %s" % self.start_action)
-        if self.success_action:
-            self.log.info("  On success:")
-            self.log.info("    %s" % self.success_action)
-        if self.failure_action:
-            self.log.info("  On failure:")
-            self.log.info("    %s" % self.failure_action)
+        self.log.info("  On start:")
+        self.log.info("    %s" % self.pipeline.start_actions)
+        self.log.info("  On success:")
+        self.log.info("    %s" % self.pipeline.success_actions)
+        self.log.info("  On failure:")
+        self.log.info("    %s" % self.pipeline.failure_actions)
 
     def getSubmitAllowNeeds(self):
         # Get a list of code review labels that are allowed to be
         # "needed" in the submit records for a change, with respect
         # to this queue.  In other words, the list of review labels
         # this queue itself is likely to set before submitting.
-        if self.success_action:
-            return self.success_action.keys()
-        else:
-            return {}
+        allow_needs = set()
+        for action_reporter in self.pipeline.success_actions:
+            allow_needs.update(action_reporter.getSubmitAllowNeeds())
+        return allow_needs
 
     def eventMatches(self, event):
         for ef in self.event_filters:
@@ -725,17 +745,38 @@
     def reportStart(self, change):
         try:
             self.log.info("Reporting start, action %s change %s" %
-                          (self.start_action, change))
+                          (self.pipeline.start_actions, change))
             msg = "Starting %s jobs." % self.pipeline.name
             if self.sched.config.has_option('zuul', 'status_url'):
                 msg += "\n" + self.sched.config.get('zuul', 'status_url')
-            ret = self.pipeline.trigger.report(change, msg, self.start_action)
+            ret = self.sendReport(self.pipeline.start_actions,
+                                  change, msg)
             if ret:
                 self.log.error("Reporting change start %s received: %s" %
                                (change, ret))
         except:
             self.log.exception("Exception while reporting start:")
 
+    def sendReport(self, action_reporters, change, message):
+        """Sends the built message off to configured reporters.
+
+        Takes the action_reporters, change and message and sends the
+        report to each of the pluggable reporters.
+        """
+        report_errors = []
+        if len(action_reporters) > 0:
+            if not change.number:
+                self.log.debug("Not reporting change %s: No number present."
+                               % change)
+                return
+            for action_reporter in action_reporters:
+                ret = action_reporter.report(change, message)
+                if ret:
+                    report_errors.append(ret)
+            if len(report_errors) == 0:
+                return
+        return report_errors
+
     def isChangeReadyToBeEnqueued(self, change):
         return True
 
@@ -748,6 +789,9 @@
     def checkForChangesNeededBy(self, change):
         return True
 
+    def getFailingDependentItem(self, item):
+        return None
+
     def getDependentItems(self, item):
         orig_item = item
         items = []
@@ -759,6 +803,12 @@
                        [x.change for x in items]))
         return items
 
+    def getItemForChange(self, change):
+        for item in self.pipeline.getAllItems():
+            if item.change.equals(change):
+                return item
+        return None
+
     def findOldVersionOfChangeAlreadyInQueue(self, change):
         for c in self.pipeline.getChangesInQueue():
             if change.isUpdateOf(c):
@@ -774,15 +824,12 @@
                            (change, old_change, old_change))
             self.removeChange(old_change)
 
-    def reEnqueueItem(self, item, severed=False):
+    def reEnqueueItem(self, item):
         change_queue = self.pipeline.getQueue(item.change.project)
         if change_queue:
             self.log.debug("Re-enqueing change %s in queue %s" %
                            (item.change, change_queue))
-            if severed:
-                change_queue.addSeveredHead(item)
-            else:
-                change_queue.enqueueItem(item)
+            change_queue.enqueueItem(item)
             self.reportStats(item)
             return True
         else:
@@ -813,7 +860,7 @@
         if change_queue:
             self.log.debug("Adding change %s to queue %s" %
                            (change, change_queue))
-            if self.start_action:
+            if len(self.pipeline.start_actions) > 0:
                 self.reportStart(change)
             item = change_queue.enqueueChange(change)
             self.reportStats(item)
@@ -823,15 +870,10 @@
                            change.project)
             return False
 
-    def dequeueItem(self, item, keep_severed_heads=True):
+    def dequeueItem(self, item):
         self.log.debug("Removing change %s from queue" % item.change)
-        item_ahead = item.item_ahead
         change_queue = self.pipeline.getQueue(item.change.project)
         change_queue.dequeueItem(item)
-        if (keep_severed_heads and not item_ahead and
-            (item.change.is_reportable and not item.reported)):
-            self.log.debug("Adding %s as a severed head" % item.change)
-            change_queue.addSeveredHead(item)
         self.sched._maintain_trigger_cache = True
 
     def removeChange(self, change):
@@ -842,7 +884,8 @@
                 self.log.debug("Canceling builds behind change: %s "
                                "because it is being removed." % item.change)
                 self.cancelJobs(item)
-                self.dequeueItem(item, keep_severed_heads=False)
+                self.dequeueItem(item)
+                self.reportStats(item)
 
     def prepareRef(self, item):
         # Returns False on success.
@@ -854,29 +897,14 @@
             ref = item.current_build_set.ref
             dependent_items = self.getDependentItems(item)
             dependent_items.reverse()
-            dependent_str = ', '.join(
-                ['%s' % i.change.number for i in dependent_items
-                 if i.change.project == item.change.project])
-            if dependent_str:
-                msg = \
-                    "This change was unable to be automatically merged "\
-                    "with the current state of the repository and the "\
-                    "following changes which were enqueued ahead of it: "\
-                    "%s. Please rebase your change and upload a new "\
-                    "patchset." % dependent_str
-            else:
-                msg = "This change was unable to be automatically merged "\
-                    "with the current state of the repository. Please "\
-                    "rebase your change and upload a new patchset."
             all_items = dependent_items + [item]
-            if (dependent_items and
-                not dependent_items[-1].current_build_set.commit):
-                self.pipeline.setUnableToMerge(item, msg)
-                return True
             commit = self.sched.merger.mergeChanges(all_items, ref)
             item.current_build_set.commit = commit
             if not commit:
                 self.log.info("Unable to merge change %s" % item.change)
+                msg = ("This change was unable to be automatically merged "
+                       "with the current state of the repository. Please "
+                       "rebase your change and upload a new patchset.")
                 self.pipeline.setUnableToMerge(item, msg)
                 return True
         return False
@@ -924,74 +952,99 @@
             self.log.debug("Removing build %s from running builds" % build)
             build.result = 'CANCELED'
             del self.building_jobs[build]
-        if item.item_behind:
+        for item_behind in item.items_behind:
             self.log.debug("Canceling jobs for change %s, behind change %s" %
-                           (item.item_behind.change, item.change))
-            if self.cancelJobs(item.item_behind, prime=prime):
+                           (item_behind.change, item.change))
+            if self.cancelJobs(item_behind, prime=prime):
                 canceled = True
         return canceled
 
-    def _processOneItem(self, item):
+    def _processOneItem(self, item, nnfi):
         changed = False
         item_ahead = item.item_ahead
-        item_behind = item.item_behind
-        if self.prepareRef(item):
-            changed = True
-            if self.pipeline.dequeue_on_conflict:
-                self.log.info("Dequeuing change %s because "
-                              "of a git merge error" % item.change)
-                self.dequeueItem(item, keep_severed_heads=False)
-                try:
-                    self.reportItem(item)
-                except MergeFailure:
-                    pass
-                return changed
+        change_queue = self.pipeline.getQueue(item.change.project)
+        failing_reasons = []  # Reasons this item is failing
+
         if self.checkForChangesNeededBy(item.change) is not True:
             # It's not okay to enqueue this change, we should remove it.
             self.log.info("Dequeuing change %s because "
                           "it can no longer merge" % item.change)
             self.cancelJobs(item)
-            self.dequeueItem(item, keep_severed_heads=False)
+            self.dequeueItem(item)
             self.pipeline.setDequeuedNeedingChange(item)
             try:
                 self.reportItem(item)
             except MergeFailure:
                 pass
-            changed = True
-            return changed
-        if not item_ahead:
-            merge_failed = False
-            if self.pipeline.areAllJobsComplete(item):
-                try:
-                    self.reportItem(item)
-                except MergeFailure:
-                    merge_failed = True
-                self.dequeueItem(item)
-                changed = True
-            if merge_failed or self.pipeline.didAnyJobFail(item):
-                if item_behind:
-                    self.cancelJobs(item_behind)
-                    changed = True
-                    self.dequeueItem(item)
+            return (True, nnfi)
+        dep_item = self.getFailingDependentItem(item)
+        if dep_item:
+            failing_reasons.append('a needed change is failing')
+            self.cancelJobs(item, prime=False)
         else:
-            if self.pipeline.didAnyJobFail(item):
-                if item_behind:
-                    if self.cancelJobs(item_behind, prime=False):
-                        changed = True
-                # don't restart yet; this change will eventually become
-                # the head
+            item_ahead_merged = False
+            if ((item_ahead and item_ahead.change.is_merged) or
+                not change_queue.dependent):
+                item_ahead_merged = True
+            if (item_ahead != nnfi and not item_ahead_merged):
+                # Our current base is different than what we expected,
+                # and it's not because our current base merged.  Something
+                # ahead must have failed.
+                self.log.info("Resetting builds for change %s because the "
+                              "item ahead, %s, is not the nearest non-failing "
+                              "item, %s" % (item.change, item_ahead, nnfi))
+                change_queue.moveItem(item, nnfi)
+                changed = True
+                self.cancelJobs(item)
+            self.prepareRef(item)
+            if item.current_build_set.unable_to_merge:
+                failing_reasons.append("it has a merge conflict")
         if self.launchJobs(item):
             changed = True
-        return changed
+        if self.pipeline.didAnyJobFail(item):
+            failing_reasons.append("at least one job failed")
+        if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
+            try:
+                self.reportItem(item)
+            except MergeFailure:
+                failing_reasons.append("it did not merge")
+                for item_behind in item.items_behind:
+                    self.log.info("Resetting builds for change %s because the "
+                                  "item ahead, %s, failed to merge" %
+                                  (item_behind.change, item))
+                    self.cancelJobs(item_behind)
+            self.dequeueItem(item)
+            changed = True
+        elif not failing_reasons:
+            nnfi = item
+        item.current_build_set.failing_reasons = failing_reasons
+        if failing_reasons:
+            self.log.debug("%s is a failing item because %s" %
+                           (item, failing_reasons))
+        return (changed, nnfi)
 
     def processQueue(self):
         # Do whatever needs to be done for each change in the queue
         self.log.debug("Starting queue processor: %s" % self.pipeline.name)
         changed = False
-        for item in self.pipeline.getAllItems():
-            if self._processOneItem(item):
+        for queue in self.pipeline.queues:
+            queue_changed = False
+            nnfi = None  # Nearest non-failing item
+            for item in queue.queue[:]:
+                item_changed, nnfi = self._processOneItem(item, nnfi)
+                if item_changed:
+                    queue_changed = True
+                self.reportStats(item)
+            if queue_changed:
                 changed = True
-            self.reportStats(item)
+                status = ''
+                for item in queue.queue:
+                    status += self.pipeline.formatStatus(item)
+                if status:
+                    self.log.debug("Queue %s status is now:\n %s" %
+                                   (queue.name, status))
+        self.log.debug("Finished queue processor: %s (changed: %s)" %
+                       (self.pipeline.name, changed))
         return changed
 
     def updateBuildDescriptions(self, build_set):
@@ -1030,8 +1083,8 @@
         del self.building_jobs[build]
 
         self.pipeline.setResult(change, build)
-        self.log.info("Change %s status is now:\n %s" %
-                      (change, self.pipeline.formatStatus(change)))
+        self.log.debug("Change %s status is now:\n %s" %
+                       (change, self.pipeline.formatStatus(change)))
         self.updateBuildDescriptions(build.build_set)
         while self.processQueue():
             pass
@@ -1060,21 +1113,21 @@
         if item.change.is_reportable and item.reported:
             return 0
         self.log.debug("Reporting change %s" % item.change)
-        ret = None
+        ret = True  # Means error as returned by sendReport
         if self.pipeline.didAllJobsSucceed(item):
-            self.log.debug("success %s %s" % (self.success_action,
-                                              self.failure_action))
-            action = self.success_action
+            self.log.debug("success %s %s" % (self.pipeline.success_actions,
+                                              self.pipeline.failure_actions))
+            actions = self.pipeline.success_actions
             item.setReportedResult('SUCCESS')
         else:
-            action = self.failure_action
+            actions = self.pipeline.failure_actions
             item.setReportedResult('FAILURE')
         report = self.formatReport(item)
         item.reported = True
         try:
-            self.log.info("Reporting change %s, action: %s" %
-                          (item.change, action))
-            ret = self.pipeline.trigger.report(item.change, report, action)
+            self.log.info("Reporting change %s, actions: %s" %
+                          (item.change, actions))
+            ret = self.sendReport(actions, item.change, report)
             if ret:
                 self.log.error("Reporting change %s received: %s" %
                                (item.change, ret))
@@ -1395,3 +1448,15 @@
         self.log.debug("  Change %s is needed but can not be merged" %
                        change.needs_change)
         return False
+
+    def getFailingDependentItem(self, item):
+        if not hasattr(item.change, 'needs_change'):
+            return None
+        if not item.change.needs_change:
+            return None
+        needs_item = self.getItemForChange(item.change.needs_change)
+        if not needs_item:
+            return None
+        if needs_item.current_build_set.failing_reasons:
+            return needs_item
+        return None
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 65dd7fa..976849c 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -138,22 +138,6 @@
         self.gerrit_connector.stop()
         self.gerrit_connector.join()
 
-    def report(self, change, message, action):
-        self.log.debug("Report change %s, action %s, message: %s" %
-                       (change, action, message))
-        if not change.number:
-            self.log.debug("Change has no number; not reporting")
-            return
-        if not action:
-            self.log.debug("No action specified; not reporting")
-            return
-        changeid = '%s,%s' % (change.number, change.patchset)
-        ref = 'refs/heads/' + change.branch
-        change._ref_sha = self.getRefSha(change.project.name,
-                                         ref)
-        return self.gerrit.review(change.project.name, changeid,
-                                  message, action)
-
     def _getInfoRefs(self, project):
         url = "%s/p/%s/info/refs?service=git-upload-pack" % (
             self.baseurl, project)
@@ -334,6 +318,7 @@
         change.branch = data['branch']
         change.url = data['url']
         max_ps = 0
+        change.files = []
         for ps in data['patchSets']:
             if ps['number'] == change.patchset:
                 change.refspec = ps['ref']
@@ -352,6 +337,7 @@
             # for dependencies.
             return change
 
+        change.needs_change = None
         if 'dependsOn' in data:
             parts = data['dependsOn'][0]['ref'].split('/')
             dep_num, dep_ps = parts[3], parts[4]
@@ -359,6 +345,7 @@
             if not dep.is_merged:
                 change.needs_change = dep
 
+        change.needed_by_changes = []
         if 'neededBy' in data:
             for needed in data['neededBy']:
                 parts = needed['ref'].split('/')
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index e67af01..b75a165 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -40,9 +40,6 @@
     def stop(self):
         self.apsched.shutdown()
 
-    def report(self, change, message, action):
-        raise Exception("Timer trigger does not support reporting.")
-
     def isMerged(self, change, head=None):
         raise Exception("Timer trigger does not support checking if "
                         "a change is merged.")