Merge "Allow pipelines triggers to filter by username"
diff --git a/README.rst b/README.rst
index b66305e..1b227e7 100644
--- a/README.rst
+++ b/README.rst
@@ -6,7 +6,8 @@
 Contributing
 ------------
 
-To get the latest code, see: https://github.com/openstack-infra/zuul
+| To browse the latest code, see: https://git.openstack.org/cgit/openstack-infra/zuul/tree/
+| To clone the latest code, use ``git clone git://git.openstack.org/openstack-infra/zuul``
 
 Bugs are handled at: https://launchpad.net/zuul
 
diff --git a/doc/source/reporters.rst b/doc/source/reporters.rst
index 63a86c4..7c0214d 100644
--- a/doc/source/reporters.rst
+++ b/doc/source/reporters.rst
@@ -42,8 +42,8 @@
 zuul.conf contains the SMTP server and default to/from as describe
 in :ref:`zuulconf`.
 
-Each pipeline can overwrite the to or from address by providing
-alternatives as arguments to the reporter. For example, ::
+Each pipeline can override the subject or the to or from address by
+providing alternatives as arguments to the reporter. For example, ::
 
   pipelines:
     - name: post-merge
@@ -57,3 +57,4 @@
         smtp:
           to: you@example.com
           from: alternative@example.com
+          subject: Change {change} failed
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index a03cc1c..f71df22 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -160,6 +160,16 @@
   This can be overridden by individual pipelines.
   ``default_to=you@example.com``
 
+replication
+"""""""""""
+
+Zuul can push the refs it creates to any number of servers.  To do so,
+list the git push URLs in this section, one per line as follows::
+
+  [replication]
+  url1=ssh://user@host1.example.com:port/path/to/repo
+  url2=ssh://user@host2.example.com:port/path/to/repo
+
 layout.yaml
 ~~~~~~~~~~~
 
diff --git a/tests/fixtures/layout-rate-limit.yaml b/tests/fixtures/layout-rate-limit.yaml
new file mode 100644
index 0000000..9f6748c
--- /dev/null
+++ b/tests/fixtures/layout-rate-limit.yaml
@@ -0,0 +1,32 @@
+pipelines:
+  - name: gate
+    manager: DependentPipelineManager
+    failure-message: Build failed.  For information on how to proceed, see http://wiki.example.org/Test_Failures
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - approved: 1
+    start:
+      gerrit:
+        verified: 0
+    success:
+      gerrit:
+        verified: 2
+        submit: true
+    failure:
+      gerrit:
+        verified: -2
+    window: 2
+    window-floor: 1
+    window-increase-type: linear
+    window-increase-factor: 1
+    window-decrease-type: exponential
+    window-decrease-factor: 2
+
+projects:
+  - name: org/project
+    gate:
+      - project-merge:
+        - project-test1
+        - project-test2
diff --git a/tests/fixtures/layout-timer-smtp.yaml b/tests/fixtures/layout-timer-smtp.yaml
new file mode 100644
index 0000000..ac59df4
--- /dev/null
+++ b/tests/fixtures/layout-timer-smtp.yaml
@@ -0,0 +1,23 @@
+pipelines:
+  - name: periodic
+    manager: IndependentPipelineManager
+    trigger:
+      timer:
+        - time: '* * * * * */10'
+    success:
+      smtp:
+        to: alternative_me@example.com
+        from: zuul_from@example.com
+        subject: 'Periodic check for {change.project} succeeded'
+
+jobs:
+  - name: project-bitrot-stable-old
+    success-pattern: http://logs.example.com/{job.name}/{build.number}
+  - name: project-bitrot-stable-older
+    success-pattern: http://logs.example.com/{job.name}/{build.number}
+
+projects:
+  - name: org/project
+    periodic:
+      - project-bitrot-stable-old
+      - project-bitrot-stable-older
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 48f2281..0ce0f88 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -690,11 +690,11 @@
 
 class FakeSMTP(object):
     log = logging.getLogger('zuul.FakeSMTP')
-    messages = []
 
-    def __init__(self, server, port):
+    def __init__(self, messages, server, port):
         self.server = server
         self.port = port
+        self.messages = messages
 
     def sendmail(self, from_email, to_email, msg):
         self.log.info("Sending email from %s, to %s, with msg %s" % (
@@ -703,7 +703,7 @@
         headers = msg.split('\n\n', 1)[0]
         body = msg.split('\n\n', 1)[1]
 
-        FakeSMTP.messages.append(dict(
+        self.messages.append(dict(
             from_email=from_email,
             to_email=to_email,
             msg=msg,
@@ -729,7 +729,7 @@
             # If timeout value is invalid do not set a timeout.
             test_timeout = 0
         if test_timeout > 0:
-            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))
 
         if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
             os.environ.get('OS_STDOUT_CAPTURE') == '1'):
@@ -800,8 +800,14 @@
         urllib2.urlopen = URLOpenerFactory
         self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
 
+        self.smtp_messages = []
+
+        def FakeSMTPFactory(*args, **kw):
+            args = [self.smtp_messages] + list(args)
+            return FakeSMTP(*args, **kw)
+
         zuul.lib.gerrit.Gerrit = FakeGerrit
-        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTP))
+        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
 
         self.gerrit = FakeGerritTrigger(
             self.upstream_root, self.config, self.sched)
@@ -2914,6 +2920,39 @@
         self.assertEqual(B.data['status'], 'MERGED')
         self.assertEqual(B.reported, 2)
 
+    def test_push_urls(self):
+        "Test that Zuul can push refs to multiple URLs"
+        upstream_path = os.path.join(self.upstream_root, 'org/project')
+        replica1 = os.path.join(self.upstream_root, 'replica1')
+        replica2 = os.path.join(self.upstream_root, 'replica2')
+
+        self.config.add_section('replication')
+        self.config.set('replication', 'url1', 'file://%s' % replica1)
+        self.config.set('replication', 'url2', 'file://%s' % replica2)
+        self.sched.reconfigure(self.config)
+
+        r1 = git.Repo.clone_from(upstream_path, replica1 + '/org/project.git')
+        r2 = git.Repo.clone_from(upstream_path, replica2 + '/org/project.git')
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        B = self.fake_gerrit.addFakeChange('org/project', 'mp', 'B')
+        B.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        count = 0
+        for ref in r1.refs:
+            if ref.path.startswith('refs/zuul'):
+                count += 1
+        self.assertEqual(count, 3)
+
+        count = 0
+        for ref in r2.refs:
+            if ref.path.startswith('refs/zuul'):
+                count += 1
+        self.assertEqual(count, 3)
+
     def test_timer(self):
         "Test that a periodic job is triggered"
         self.worker.hold_jobs_in_build = True
@@ -2972,25 +3011,64 @@
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()
 
-        self.assertEqual(len(FakeSMTP.messages), 2)
+        self.assertEqual(len(self.smtp_messages), 2)
 
         # A.messages only holds what FakeGerrit places in it. Thus we
         # work on the knowledge of what the first message should be as
         # it is only configured to go to SMTP.
 
         self.assertEqual('zuul@example.com',
-                         FakeSMTP.messages[0]['from_email'])
+                         self.smtp_messages[0]['from_email'])
         self.assertEqual(['you@example.com'],
-                         FakeSMTP.messages[0]['to_email'])
+                         self.smtp_messages[0]['to_email'])
         self.assertEqual('Starting check jobs.',
-                         FakeSMTP.messages[0]['body'])
+                         self.smtp_messages[0]['body'])
 
         self.assertEqual('zuul_from@example.com',
-                         FakeSMTP.messages[1]['from_email'])
+                         self.smtp_messages[1]['from_email'])
         self.assertEqual(['alternative_me@example.com'],
-                         FakeSMTP.messages[1]['to_email'])
+                         self.smtp_messages[1]['to_email'])
         self.assertEqual(A.messages[0],
-                         FakeSMTP.messages[1]['body'])
+                         self.smtp_messages[1]['body'])
+
+    def test_timer_smtp(self):
+        "Test that a periodic job reports its result via SMTP"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-timer-smtp.yaml')
+        self.sched.reconfigure(self.config)
+        self.registerJobs()
+
+        start = time.time()
+        failed = True
+        while ((time.time() - start) < 30):
+            if len(self.history) == 2:
+                failed = False
+                break
+            else:
+                time.sleep(1)
+
+        if failed:
+            raise Exception("Expected jobs never ran")
+
+        self.waitUntilSettled()
+
+        self.assertEqual(self.getJobFromHistory(
+            'project-bitrot-stable-old').result, 'SUCCESS')
+        self.assertEqual(self.getJobFromHistory(
+            'project-bitrot-stable-older').result, 'SUCCESS')
+
+        self.assertEqual(len(self.smtp_messages), 1)
+
+        # The periodic pipeline in this layout only configures an SMTP
+        # reporter for success, so the single captured message must be
+        # the success report for the timer-triggered jobs.
+
+        self.assertEqual('zuul_from@example.com',
+                         self.smtp_messages[0]['from_email'])
+        self.assertEqual(['alternative_me@example.com'],
+                         self.smtp_messages[0]['to_email'])
+        self.assertIn('Subject: Periodic check for org/project succeeded',
+                      self.smtp_messages[0]['headers'])
 
     def test_client_enqueue(self):
         "Test that the RPC client can enqueue a change"
@@ -3075,11 +3153,22 @@
 
         self.waitUntilSettled()
 
+        items = self.sched.layout.pipelines['gate'].getAllItems()
+        enqueue_times = {}
+        for item in items:
+            enqueue_times[str(item.change)] = item.enqueue_time
+
         client = zuul.rpcclient.RPCClient('127.0.0.1',
                                           self.gearman_server.port)
         r = client.promote(pipeline='gate',
                            change_ids=['2,1', '3,1'])
 
+        # ensure that enqueue times are durable
+        items = self.sched.layout.pipelines['gate'].getAllItems()
+        for item in items:
+            self.assertEqual(
+                enqueue_times[str(item.change)], item.enqueue_time)
+
         self.worker.release('.*-merge')
         self.waitUntilSettled()
         self.worker.release('.*-merge')
@@ -3212,3 +3301,167 @@
         self.worker.hold_jobs_in_build = False
         self.worker.release()
         self.waitUntilSettled()
+
+    def test_queue_rate_limiting(self):
+        "Test that DependentPipelines are rate limited with dep across window"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-rate-limit.yaml')
+        self.sched.reconfigure(self.config)
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+        C.setDependsOn(B, 1)
+        self.worker.addFailTest('project-test1', A)
+
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        # Only A and B will have their merge jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+        self.assertEqual(self.builds[1].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only A and B will have their test jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 4)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        queue = self.sched.layout.pipelines['gate'].queues[0]
+        # A failed, so the window is halved (exponential, factor 2) to 1.
+        self.assertEqual(queue.window, 1)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(A.data['status'], 'NEW')
+
+        # Gate is reset and only B's merge job is queued because
+        # window shrunk to 1.
+        self.assertEqual(len(self.builds), 1)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only B's test jobs are queued because window is still 1.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        # B was successfully merged so window is increased to 2.
+        self.assertEqual(queue.window, 2)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(B.data['status'], 'MERGED')
+
+        # Only C is left and its merge job is queued.
+        self.assertEqual(len(self.builds), 1)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # After successful merge job the test jobs for C are queued.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        # C successfully merged so window is bumped to 3.
+        self.assertEqual(queue.window, 3)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(C.data['status'], 'MERGED')
+
+    def test_queue_rate_limiting_dependent(self):
+        "Test that DependentPipelines are rate limited with dep in window"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-rate-limit.yaml')
+        self.sched.reconfigure(self.config)
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+        B.setDependsOn(A, 1)
+
+        self.worker.addFailTest('project-test1', A)
+
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        # Only A and B will have their merge jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+        self.assertEqual(self.builds[1].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only A and B will have their test jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 4)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        queue = self.sched.layout.pipelines['gate'].queues[0]
+        # A failed, so the window is halved (exponential, factor 2) to 1.
+        self.assertEqual(queue.window, 1)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(B.data['status'], 'NEW')
+
+        # Gate is reset and only C's merge job is queued because
+        # window shrunk to 1 and A and B were dequeued.
+        self.assertEqual(len(self.builds), 1)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only C's test jobs are queued because window is still 1.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        # C was successfully merged so window is increased to 2.
+        self.assertEqual(queue.window, 2)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(C.data['status'], 'MERGED')
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 3a51b1c..c2a9dab 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -30,11 +30,10 @@
 import signal
 import traceback
 
-import gear
-
 # No zuul imports here because they pull in paramiko which must not be
 # imported until after the daemonization.
 # https://github.com/paramiko/paramiko/issues/59
+# Similar situation with gear and statsd.
 
 
 def stack_dump_handler(signum, frame):
@@ -149,6 +148,7 @@
         if child_pid == 0:
             os.close(pipe_write)
             self.setup_logging('gearman_server', 'log_config')
+            import gear
             gear.Server(4730)
             # Keep running until the parent dies:
             pipe_read = os.fdopen(pipe_read)
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index d4ff143..d464998 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -56,7 +56,15 @@
                                {'timer': toList(timer_trigger)}))
 
     report_actions = {'gerrit': variable_dict,
-                      'smtp': variable_dict}
+                      'smtp': {'to': str,
+                               'from': str,
+                               'subject': str,
+                               },
+                      }
+    window = v.All(int, v.Range(min=1))
+    window_floor = v.All(int, v.Range(min=1))
+    window_type = v.Any('linear', 'exponential')
+    window_factor = v.All(int, v.Range(min=1))
 
     pipeline = {v.Required('name'): str,
                 v.Required('manager'): manager,
@@ -69,6 +77,12 @@
                 'success': report_actions,
                 'failure': report_actions,
                 'start': report_actions,
+                'window': window,
+                'window-floor': window_floor,
+                'window-increase-type': window_type,
+                'window-increase-factor': window_factor,
+                'window-decrease-type': window_type,
+                'window-decrease-factor': window_factor,
                 }
     pipelines = [pipeline]
 
diff --git a/zuul/merger.py b/zuul/merger.py
index 1f3c547..09011ae 100644
--- a/zuul/merger.py
+++ b/zuul/merger.py
@@ -16,6 +16,7 @@
 import os
 import logging
 import model
+import threading
 
 
 class ZuulReference(git.Reference):
@@ -131,6 +132,11 @@
                                                  self.remote_url))
         repo.remotes.origin.push('%s:%s' % (local, remote))
 
+    def push_url(self, url, refspecs):
+        repo = self.createRepoObject()
+        self.log.debug("Pushing %s to %s" % (refspecs, url))
+        repo.git.push([url] + refspecs)
+
     def update(self):
         repo = self.createRepoObject()
         self.log.debug("Updating repository %s" % self.local_path)
@@ -142,7 +148,7 @@
     log = logging.getLogger("zuul.Merger")
 
     def __init__(self, trigger, working_root, push_refs, sshkey, email,
-                 username):
+                 username, replicate_urls):
         self.trigger = trigger
         self.repos = {}
         self.working_root = working_root
@@ -153,6 +159,7 @@
             self._makeSSHWrapper(sshkey)
         self.email = email
         self.username = username
+        self.replicate_urls = replicate_urls
 
     def _makeSSHWrapper(self, key):
         name = os.path.join(self.working_root, '.ssh_wrapper')
@@ -219,6 +226,25 @@
             return False
         return commit
 
+    def replicateRefspecs(self, refspecs):
+        threads = []
+        for url in self.replicate_urls:
+            t = threading.Thread(target=self._replicate,
+                                 args=(url, refspecs))
+            t.start()
+            threads.append(t)
+        for t in threads:
+            t.join()
+
+    def _replicate(self, url, project_refspecs):
+        try:
+            for project, refspecs in project_refspecs.items():
+                repo = self.getRepo(project)
+                repo.push_url(os.path.join(url, project.name + '.git'),
+                              refspecs)
+        except Exception:
+            self.log.exception("Exception pushing to %s" % url)
+
     def mergeChanges(self, items, target_ref=None):
         # Merge shortcuts:
         # if this is the only change just merge it against its branch.
@@ -257,6 +283,7 @@
             return commit
 
         project_branches = []
+        replicate_refspecs = {}
         for i in reversed(items):
             # Here we create all of the necessary zuul refs and potentially
             # push them back to Gerrit.
@@ -276,10 +303,13 @@
                     self.log.exception("Unable to set zuul ref %s for "
                                        "change %s" % (zuul_ref, i.change))
                     return False
+            ref = 'refs/zuul/' + i.change.branch + '/' + target_ref
+            refspecs = replicate_refspecs.get(i.change.project, [])
+            refspecs.append('%s:%s' % (ref, ref))
+            replicate_refspecs[i.change.project] = refspecs
             if self.push_refs:
                 # Push the results upstream to the zuul ref after
                 # they are created.
-                ref = 'refs/zuul/' + i.change.branch + '/' + target_ref
                 try:
                     repo.push(ref, ref)
                     complete = self.trigger.waitForRefSha(i.change.project,
@@ -291,5 +321,5 @@
                     self.log.error("Ref %s did not show up in repo" % ref)
                     return False
             project_branches.append((i.change.project, i.change.branch))
-
+        self.replicateRefspecs(replicate_refspecs)
         return commit
diff --git a/zuul/model.py b/zuul/model.py
index e62feed..a2573f7 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -59,6 +59,12 @@
         self.start_actions = None
         self.success_actions = None
         self.failure_actions = None
+        self.window = None
+        self.window_floor = None
+        self.window_increase_type = None
+        self.window_increase_factor = None
+        self.window_decrease_type = None
+        self.window_decrease_factor = None
 
     def __repr__(self):
         return '<Pipeline %s>' % self.name
@@ -375,13 +381,21 @@
     a queue shared by interrelated projects foo and bar, and a second
     queue for independent project baz.  Pipelines have one or more
     PipelineQueues."""
-    def __init__(self, pipeline, dependent=True):
+    def __init__(self, pipeline, dependent=True, window=0, window_floor=1,
+                 window_increase_type='linear', window_increase_factor=1,
+                 window_decrease_type='exponential', window_decrease_factor=2):
         self.pipeline = pipeline
         self.name = ''
         self.projects = []
         self._jobs = set()
         self.queue = []
         self.dependent = dependent
+        self.window = window
+        self.window_floor = window_floor
+        self.window_increase_type = window_increase_type
+        self.window_increase_factor = window_increase_factor
+        self.window_decrease_type = window_decrease_type
+        self.window_decrease_factor = window_decrease_factor
 
     def __repr__(self):
         return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
@@ -398,7 +412,7 @@
             self._jobs |= set(self.pipeline.getJobTree(project).getJobs())
 
     def enqueueChange(self, change):
-        item = QueueItem(self.pipeline, change)
+        item = QueueItem(self, self.pipeline, change)
         self.enqueueItem(item)
         item.enqueue_time = time.time()
         return item
@@ -444,6 +458,32 @@
     def mergeChangeQueue(self, other):
         for project in other.projects:
             self.addProject(project)
+        self.window = min(self.window, other.window)
+        # TODO: define merge semantics for the other window settings
+
+    def getActionableItems(self):
+        if self.dependent and self.window:
+            return self.queue[:self.window]
+        else:
+            return self.queue[:]
+
+    def increaseWindowSize(self):
+        if self.dependent:
+            if self.window_increase_type == 'linear':
+                self.window += self.window_increase_factor
+            elif self.window_increase_type == 'exponential':
+                self.window *= self.window_increase_factor
+
+    def decreaseWindowSize(self):
+        """Shrink window on failure (floor: window_floor); stays an int."""
+        if not self.dependent:
+            return
+        step, floor = self.window_decrease_factor, self.window_floor
+        if self.window_decrease_type == 'linear':
+            self.window = max(floor, self.window - step)
+        elif self.window_decrease_type == 'exponential':
+            # Floor division: the window is used as a slice bound.
+            self.window = max(floor, self.window // step)
 
 
 class Project(object):
@@ -619,7 +659,8 @@
 class QueueItem(object):
     """A changish inside of a Pipeline queue"""
 
-    def __init__(self, pipeline, change):
+    def __init__(self, change_queue, pipeline, change):
+        self.change_queue = change_queue
         self.pipeline = pipeline
         self.change = change  # a changeish
         self.build_sets = []
@@ -661,7 +702,6 @@
 
 class Changeish(object):
     """Something like a change; either a change or a ref"""
-    is_reportable = False
 
     def __init__(self, project):
         self.project = project
@@ -680,8 +720,6 @@
 
 
 class Change(Changeish):
-    is_reportable = True
-
     def __init__(self, project):
         super(Change, self).__init__(project)
         self.branch = None
@@ -729,8 +767,6 @@
 
 
 class Ref(Changeish):
-    is_reportable = False
-
     def __init__(self, project):
         super(Ref, self).__init__(project)
         self.ref = None
@@ -767,7 +803,8 @@
 
 
 class NullChange(Changeish):
-    is_reportable = False
+    def __repr__(self):
+        return '<NullChange for %s>' % (self.project)
 
     def _id(self):
         return None
diff --git a/zuul/reporter/smtp.py b/zuul/reporter/smtp.py
index 66dcd45..b214019 100644
--- a/zuul/reporter/smtp.py
+++ b/zuul/reporter/smtp.py
@@ -46,7 +46,11 @@
         to_email = params['to']\
             if 'to' in params else self.smtp_default_to
         msg = MIMEText(message)
-        msg['Subject'] = "Report change %s" % change
+        if 'subject' in params:
+            subject = params['subject'].format(change=change)
+        else:
+            subject = "Report for change %s" % change
+        msg['Subject'] = subject
         msg['From'] = from_email
         msg['To'] = to_email
 
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index ea25948..a2d1d34 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -31,6 +31,7 @@
 import model
 from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
 import merger
+from zuul import version as zuul_version
 
 statsd = extras.try_import('statsd.statsd')
 
@@ -130,6 +131,8 @@
         self.management_event_queue = Queue.Queue()
         self.layout = model.Layout()
 
+        self.zuul_version = zuul_version.version_info.version_string()
+
     def stop(self):
         self._stopped = True
         self.wake_event.set()
@@ -197,6 +200,17 @@
             pipeline.success_actions = action_reporters['success']
             pipeline.failure_actions = action_reporters['failure']
 
+            pipeline.window = conf_pipeline.get('window', 20)
+            pipeline.window_floor = conf_pipeline.get('window-floor', 3)
+            pipeline.window_increase_type = conf_pipeline.get(
+                'window-increase-type', 'linear')
+            pipeline.window_increase_factor = conf_pipeline.get(
+                'window-increase-factor', 1)
+            pipeline.window_decrease_type = conf_pipeline.get(
+                'window-decrease-type', 'exponential')
+            pipeline.window_decrease_factor = conf_pipeline.get(
+                'window-decrease-factor', 2)
+
             manager = globals()[conf_pipeline['manager']](self, pipeline)
             pipeline.setManager(manager)
             layout.pipelines[conf_pipeline['name']] = pipeline
@@ -355,6 +369,11 @@
         else:
             push_refs = False
 
+        replicate_urls = []
+        if self.config.has_section('replication'):
+            for k, v in self.config.items('replication'):
+                replicate_urls.append(v)
+
         if self.config.has_option('gerrit', 'sshkey'):
             sshkey = self.config.get('gerrit', 'sshkey')
         else:
@@ -365,7 +384,8 @@
         # location.
         self.merger = merger.Merger(self.triggers['gerrit'],
                                     merge_root, push_refs,
-                                    sshkey, merge_email, merge_name)
+                                    sshkey, merge_email, merge_name,
+                                    replicate_urls)
         for project in self.layout.projects.values():
             url = self.triggers['gerrit'].getGitUrl(project)
             self.merger.addProject(project, url)
@@ -604,7 +624,10 @@
             pipeline.manager.cancelJobs(item)
             pipeline.manager.dequeueItem(item)
         for item in items_to_enqueue:
-            pipeline.manager.addChange(item.change, quiet=True)
+            pipeline.manager.addChange(
+                item.change,
+                enqueue_time=item.enqueue_time,
+                quiet=True)
         while pipeline.manager.processQueue():
             pass
 
@@ -763,6 +786,9 @@
 
     def formatStatusJSON(self):
         data = {}
+
+        data['zuul_version'] = self.zuul_version
+
         if self._pause:
             ret = '<p><b>Queue only mode:</b> preparing to '
             if self._exit:
@@ -895,10 +921,6 @@
         """
         report_errors = []
         if len(action_reporters) > 0:
-            if not change.number:
-                self.log.debug("Not reporting change %s: No number present."
-                               % change)
-                return
             for action_reporter in action_reporters:
                 ret = action_reporter.report(change, message)
                 if ret:
@@ -967,7 +989,7 @@
                            item.change.project)
             return False
 
-    def addChange(self, change, quiet=False):
+    def addChange(self, change, quiet=False, enqueue_time=None):
         self.log.debug("Considering adding change %s" % change)
         if self.isChangeAlreadyInQueue(change):
             self.log.debug("Change %s is already in queue, ignoring" % change)
@@ -994,6 +1016,8 @@
                 if len(self.pipeline.start_actions) > 0:
                     self.reportStart(change)
             item = change_queue.enqueueChange(change)
+            if enqueue_time:
+                item.enqueue_time = enqueue_time
             self.reportStats(item)
             self.enqueueChangesBehind(change, quiet)
         else:
@@ -1161,7 +1185,7 @@
         for queue in self.pipeline.queues:
             queue_changed = False
             nnfi = None  # Nearest non-failing item
-            for item in queue.queue[:]:
+            for item in queue.getActionableItems():
                 item_changed, nnfi = self._processOneItem(item, nnfi)
                 if item_changed:
                     queue_changed = True
@@ -1222,7 +1246,7 @@
         return True
 
     def reportItem(self, item):
-        if item.change.is_reportable and item.reported:
+        if item.reported:
             raise Exception("Already reported change %s" % item.change)
         ret = self._reportItem(item)
         if self.changes_merge:
@@ -1236,12 +1260,19 @@
             if not (succeeded and merged):
                 self.log.debug("Reported change %s failed tests or failed "
                                "to merge" % (item.change))
+                item.change_queue.decreaseWindowSize()
+                self.log.debug("%s window size decreased to %s" %
+                               (item.change_queue,
+                                item.change_queue.window))
                 raise MergeFailure("Change %s failed to merge" % item.change)
+            else:
+                item.change_queue.increaseWindowSize()
+                self.log.debug("%s window size increased to %s" %
+                               (item.change_queue,
+                                item.change_queue.window))
 
     def _reportItem(self, item):
-        if not item.change.is_reportable:
-            return False
-        if item.change.is_reportable and item.reported:
+        if item.reported:
             return 0
         self.log.debug("Reporting change %s" % item.change)
         ret = True  # Means error as returned by trigger.report
@@ -1253,18 +1284,19 @@
         else:
             actions = self.pipeline.failure_actions
             item.setReportedResult('FAILURE')
-        report = self.formatReport(item)
         item.reported = True
-        try:
-            self.log.info("Reporting change %s, actions: %s" %
-                          (item.change, actions))
-            ret = self.sendReport(actions, item.change, report)
-            if ret:
-                self.log.error("Reporting change %s received: %s" %
-                               (item.change, ret))
-        except:
-            self.log.exception("Exception while reporting:")
-            item.setReportedResult('ERROR')
+        if actions:
+            report = self.formatReport(item)
+            try:
+                self.log.info("Reporting change %s, actions: %s" %
+                              (item.change, actions))
+                ret = self.sendReport(actions, item.change, report)
+                if ret:
+                    self.log.error("Reporting change %s received: %s" %
+                                   (item.change, ret))
+            except:
+                self.log.exception("Exception while reporting:")
+                item.setReportedResult('ERROR')
         self.updateBuildDescriptions(item.current_build_set)
         return ret
 
@@ -1494,7 +1526,14 @@
         change_queues = []
 
         for project in self.pipeline.getProjects():
-            change_queue = ChangeQueue(self.pipeline)
+            change_queue = ChangeQueue(
+                self.pipeline,
+                window=self.pipeline.window,
+                window_floor=self.pipeline.window_floor,
+                window_increase_type=self.pipeline.window_increase_type,
+                window_increase_factor=self.pipeline.window_increase_factor,
+                window_decrease_type=self.pipeline.window_decrease_type,
+                window_decrease_factor=self.pipeline.window_decrease_factor)
             change_queue.addProject(project)
             change_queues.append(change_queue)
             self.log.debug("Created queue: %s" % change_queue)
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index b75a165..f055a50 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -28,11 +28,12 @@
         self.apsched = apscheduler.scheduler.Scheduler()
         self.apsched.start()
 
-    def _onTrigger(self, timespec):
+    def _onTrigger(self, pipeline_name, timespec):
         for project in self.sched.layout.projects.values():
             event = TriggerEvent()
             event.type = 'timer'
             event.timespec = timespec
+            event.forced_pipeline = pipeline_name
             event.project_name = project.name
             self.log.debug("Adding event %s" % event)
             self.sched.addEvent(event)
@@ -78,7 +79,8 @@
                                               hour=hour,
                                               minute=minute,
                                               second=second,
-                                              args=(timespec,))
+                                              args=(pipeline.name,
+                                                    timespec,))
 
     def getChange(self, number, patchset, refresh=False):
         raise Exception("Timer trigger does not support changes.")