Merge "Collect and report last reconfigured timestamp"
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 1a94660..b4adc4d 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -333,6 +333,13 @@
     greedy matchers and to escapes dots!
     Example: ``email_filter: ^.*?@example\.org$``.
 
+    *username_filter*
+    This is used for any event.  It takes a regex applied on the performer
+    username, i.e. Gerrit account name.  If you want to specify several
+    username filters, you must use a YAML list.  Make sure to use non greedy
+    matchers and to escapes dots!
+    Example: ``username_filter: ^jenkins$``.
+
     *comment_filter*
     This is only used for ``comment-added`` events.  It accepts a list of
     regexes that are searched for in the comment string. If any of these
@@ -341,6 +348,35 @@
     containing 'retrigger' somewhere in the comment text are added to a
     change.
 
+    *require-approval*
+    This may be used for any event.  It requires that a certain kind
+    of approval be present for the current patchset of the change (the
+    approval could be added by the event in question).  It takes
+    several sub-parameters, all of which are optional and are combined
+    together so that there must be an approval matching all specified
+    requirements.
+
+      *username*
+      If present, an approval from this username is required.
+
+      *email-filter*
+      If present, an approval with this email address is required.  It
+      is treated as a regular expression as above.
+
+      *older-than*
+      If present, the approval must be older than this amount of time
+      to match.  Provide a time interval as a number with a suffix of
+      "w" (weeks), "d" (days), "h" (hours), "m" (minutes), "s"
+      (seconds).  Example "48h" or "2d".
+
+      *newer-than*
+      If present, the approval must be newer than this amount of time
+      to match.  Same format as "older-than".
+
+      Any other field is interpreted as a review category and value
+      pair.  For example "verified: 1" would require that the approval
+      be for a +1 vote in the "Verified" column.
+
   **timer**
     This trigger will run based on a cron-style time specification.
     It will enqueue an event into its pipeline for every project
diff --git a/tests/fixtures/layout-rate-limit.yaml b/tests/fixtures/layout-rate-limit.yaml
new file mode 100644
index 0000000..9f6748c
--- /dev/null
+++ b/tests/fixtures/layout-rate-limit.yaml
@@ -0,0 +1,32 @@
+pipelines:
+  - name: gate
+    manager: DependentPipelineManager
+    failure-message: Build failed.  For information on how to proceed, see http://wiki.example.org/Test_Failures
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - approved: 1
+    start:
+      gerrit:
+        verified: 0
+    success:
+      gerrit:
+        verified: 2
+        submit: true
+    failure:
+      gerrit:
+        verified: -2
+    window: 2
+    window-floor: 1
+    window-increase-type: linear
+    window-increase-factor: 1
+    window-decrease-type: exponential
+    window-decrease-factor: 2
+
+projects:
+  - name: org/project
+    gate:
+      - project-merge:
+        - project-test1
+        - project-test2
diff --git a/tests/fixtures/layout-require-approval.yaml b/tests/fixtures/layout-require-approval.yaml
new file mode 100644
index 0000000..18eee99
--- /dev/null
+++ b/tests/fixtures/layout-require-approval.yaml
@@ -0,0 +1,58 @@
+includes:
+  - python-file: custom_functions.py
+
+pipelines:
+  - name: check
+    manager: IndependentPipelineManager
+    trigger:
+      gerrit:
+        - event: patchset-created
+        - event: comment-added
+          require-approval:
+            - email-filter: jenkins@example.com
+              older-than: 48h
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+
+  - name: gate
+    manager: DependentPipelineManager
+    failure-message: Build failed.  For information on how to proceed, see http://wiki.example.org/Test_Failures
+    trigger:
+      gerrit:
+        - event: comment-added
+          require-approval:
+            - verified: 1
+              username: jenkins
+              newer-than: 48h
+          approval:
+            - approved: 1
+        - event: comment-added
+          require-approval:
+            - verified: 1
+              username: jenkins
+              newer-than: 48h
+          approval:
+            - verified: 1
+    success:
+      gerrit:
+        verified: 2
+        submit: true
+    failure:
+      gerrit:
+        verified: -2
+    start:
+      gerrit:
+        verified: 0
+    precedence: high
+
+projects:
+  - name: org/project
+    merge-mode: cherry-pick
+    check:
+      - project-check
+    gate:
+      - project-gate
diff --git a/tests/fixtures/layouts/good_require_approvals.yaml b/tests/fixtures/layouts/good_require_approvals.yaml
new file mode 100644
index 0000000..b7993b0
--- /dev/null
+++ b/tests/fixtures/layouts/good_require_approvals.yaml
@@ -0,0 +1,36 @@
+includes:
+  - python-file: custom_functions.py
+
+pipelines:
+  - name: check
+    manager: IndependentPipelineManager
+    trigger:
+      gerrit:
+        - event: comment-added
+          require-approval:
+            - username: jenkins
+              older-than: 48h
+        - event: comment-added
+          require-approval:
+            - email-filter: jenkins@example.com
+              newer-than: 48h
+        - event: comment-added
+          require-approval:
+            - approved: 1
+        - event: comment-added
+          require-approval:
+            - approved: 1
+              username: jenkins
+              email-filter: jenkins@example.com
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+
+projects:
+  - name: org/project
+    merge-mode: cherry-pick
+    check:
+      - project-check
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 8d975aa..9787ae1 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -214,10 +214,21 @@
                  "reason": ""}
         return event
 
-    def addApproval(self, category, value):
+    def addApproval(self, category, value, username='jenkins',
+                    granted_on=None):
+        if not granted_on:
+            granted_on = time.time()
         approval = {'description': self.categories[category][0],
                     'type': category,
-                    'value': str(value)}
+                    'value': str(value),
+                    'by': {
+                        'username': username,
+                        'email': username + '@example.com',
+                    },
+                    'grantedOn': int(granted_on)}
+        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
+            if x['by']['username'] == username and x['type'] == category:
+                del self.patchsets[-1]['approvals'][i]
         self.patchsets[-1]['approvals'].append(approval)
         event = {'approvals': [approval],
                  'author': {'email': 'user@example.com',
@@ -2658,6 +2669,95 @@
         self.assertEqual(D.data['status'], 'MERGED')
         self.assertEqual(D.reported, 2)
 
+    def test_required_approval_check_and_gate(self):
+        "Test required-approval triggers both check and gate"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-require-approval.yaml')
+        self.sched.reconfigure(self.config)
+        self.registerJobs()
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        # Add a too-old +1
+        A.addApproval('VRFY', 1, granted_on=time.time() - 72 * 60 * 60)
+
+        aprv = A.addApproval('APRV', 1)
+        self.fake_gerrit.addEvent(aprv)
+        self.waitUntilSettled()
+        # Should have run a check job
+        self.assertEqual(len(self.history), 1)
+        self.assertEqual(self.history[0].name, 'project-check')
+
+        # Report the result of that check job (overrides previous vrfy)
+        # Skynet alert: this should trigger a gate job now that
+        # all reqs are met
+        self.fake_gerrit.addEvent(A.addApproval('VRFY', 1))
+        self.waitUntilSettled()
+        self.assertEqual(len(self.history), 2)
+        self.assertEqual(self.history[1].name, 'project-gate')
+
+    def test_required_approval_newer(self):
+        "Test required-approval newer trigger parameter"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-require-approval.yaml')
+        self.sched.reconfigure(self.config)
+        self.registerJobs()
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        aprv = A.addApproval('APRV', 1)
+        self.fake_gerrit.addEvent(aprv)
+        self.waitUntilSettled()
+        # No +1 from Jenkins so should not be enqueued
+        self.assertEqual(len(self.history), 0)
+
+        # Add a too-old +1, should trigger check but not gate
+        A.addApproval('VRFY', 1, granted_on=time.time() - 72 * 60 * 60)
+        self.fake_gerrit.addEvent(aprv)
+        self.waitUntilSettled()
+        self.assertEqual(len(self.history), 1)
+        self.assertEqual(self.history[0].name, 'project-check')
+
+        # Add a recent +1
+        self.fake_gerrit.addEvent(A.addApproval('VRFY', 1))
+        self.fake_gerrit.addEvent(aprv)
+        self.waitUntilSettled()
+        self.assertEqual(len(self.history), 2)
+        self.assertEqual(self.history[1].name, 'project-gate')
+
+    def test_required_approval_older(self):
+        "Test required-approval older trigger parameter"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-require-approval.yaml')
+        self.sched.reconfigure(self.config)
+        self.registerJobs()
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        crvw = A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(crvw)
+        self.waitUntilSettled()
+        # No +1 from Jenkins so should not be enqueued
+        self.assertEqual(len(self.history), 0)
+
+        # Add an old +1 and trigger check with a comment
+        A.addApproval('VRFY', 1, granted_on=time.time() - 72 * 60 * 60)
+        self.fake_gerrit.addEvent(crvw)
+        self.waitUntilSettled()
+        self.assertEqual(len(self.history), 1)
+        self.assertEqual(self.history[0].name, 'project-check')
+
+        # Add a recent +1 and make sure nothing changes
+        A.addApproval('VRFY', 1)
+        self.fake_gerrit.addEvent(crvw)
+        self.waitUntilSettled()
+        self.assertEqual(len(self.history), 1)
+
+        # The last thing we did was query a change then do nothing
+        # with a pipeline, so it will be in the cache; clean it up so
+        # it does not fail the test.
+        for pipeline in self.sched.layout.pipelines.values():
+            pipeline.trigger.maintainCache([])
+
     def test_rerun_on_error(self):
         "Test that if a worker fails to run a job, it is run again"
         self.worker.hold_jobs_in_build = True
@@ -2781,8 +2881,13 @@
         status_jobs = set()
         for p in data['pipelines']:
             for q in p['change_queues']:
+                if q['dependent']:
+                    self.assertEqual(q['window'], 20)
+                else:
+                    self.assertEqual(q['window'], 0)
                 for head in q['heads']:
                     for change in head:
+                        self.assertTrue(change['active'])
                         self.assertEqual(change['id'], '1,1')
                         for job in change['jobs']:
                             status_jobs.add(job['name'])
@@ -3153,11 +3258,22 @@
 
         self.waitUntilSettled()
 
+        items = self.sched.layout.pipelines['gate'].getAllItems()
+        enqueue_times = {}
+        for item in items:
+            enqueue_times[str(item.change)] = item.enqueue_time
+
         client = zuul.rpcclient.RPCClient('127.0.0.1',
                                           self.gearman_server.port)
         r = client.promote(pipeline='gate',
                            change_ids=['2,1', '3,1'])
 
+        # ensure that enqueue times are durable
+        items = self.sched.layout.pipelines['gate'].getAllItems()
+        for item in items:
+            self.assertEqual(
+                enqueue_times[str(item.change)], item.enqueue_time)
+
         self.worker.release('.*-merge')
         self.waitUntilSettled()
         self.worker.release('.*-merge')
@@ -3290,3 +3406,167 @@
         self.worker.hold_jobs_in_build = False
         self.worker.release()
         self.waitUntilSettled()
+
+    def test_queue_rate_limiting(self):
+        "Test that DependentPipelines are rate limited with dep across window"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-rate-limit.yaml')
+        self.sched.reconfigure(self.config)
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+        C.setDependsOn(B, 1)
+        self.worker.addFailTest('project-test1', A)
+
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        # Only A and B will have their merge jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+        self.assertEqual(self.builds[1].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only A and B will have their test jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 4)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        queue = self.sched.layout.pipelines['gate'].queues[0]
+        # A failed so window is reduced by 1 to 1.
+        self.assertEqual(queue.window, 1)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(A.data['status'], 'NEW')
+
+        # Gate is reset and only B's merge job is queued because
+        # window shrunk to 1.
+        self.assertEqual(len(self.builds), 1)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only B's test jobs are queued because window is still 1.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        # B was successfully merged so window is increased to 2.
+        self.assertEqual(queue.window, 2)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(B.data['status'], 'MERGED')
+
+        # Only C is left and its merge job is queued.
+        self.assertEqual(len(self.builds), 1)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # After successful merge job the test jobs for C are queued.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        # C successfully merged so window is bumped to 3.
+        self.assertEqual(queue.window, 3)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(C.data['status'], 'MERGED')
+
+    def test_queue_rate_limiting_dependent(self):
+        "Test that DependentPipelines are rate limited with dep in window"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-rate-limit.yaml')
+        self.sched.reconfigure(self.config)
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+        B.setDependsOn(A, 1)
+
+        self.worker.addFailTest('project-test1', A)
+
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        # Only A and B will have their merge jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+        self.assertEqual(self.builds[1].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only A and B will have their test jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 4)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        queue = self.sched.layout.pipelines['gate'].queues[0]
+        # A failed so window is reduced by 1 to 1.
+        self.assertEqual(queue.window, 1)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(B.data['status'], 'NEW')
+
+        # Gate is reset and only C's merge job is queued because
+        # window shrunk to 1 and A and B were dequeued.
+        self.assertEqual(len(self.builds), 1)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only C's test jobs are queued because window is still 1.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        # C was successfully merged so window is increased to 2.
+        self.assertEqual(queue.window, 2)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(C.data['status'], 'MERGED')
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 1c1a670..48aab03 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -35,6 +35,12 @@
 
     variable_dict = v.Schema({}, extra=True)
 
+    require_approval = v.Schema({'username': str,
+                                 'email-filter': str,
+                                 'older-than': str,
+                                 'newer-than': str,
+                                 }, extra=True)
+
     gerrit_trigger = {v.Required('event'):
                       toList(v.Any('patchset-created',
                                    'change-abandoned',
@@ -44,9 +50,11 @@
                                    'ref-updated')),
                       'comment_filter': toList(str),
                       'email_filter': toList(str),
+                      'username_filter': toList(str),
                       'branch': toList(str),
                       'ref': toList(str),
                       'approval': toList(variable_dict),
+                      'require-approval': toList(require_approval),
                       }
 
     timer_trigger = {v.Required('time'): str}
@@ -60,6 +68,10 @@
                                'subject': str,
                                },
                       }
+    window = v.All(int, v.Range(min=0))
+    window_floor = v.All(int, v.Range(min=1))
+    window_type = v.Any('linear', 'exponential')
+    window_factor = v.All(int, v.Range(min=1))
 
     pipeline = {v.Required('name'): str,
                 v.Required('manager'): manager,
@@ -72,6 +84,12 @@
                 'success': report_actions,
                 'failure': report_actions,
                 'start': report_actions,
+                'window': window,
+                'window-floor': window_floor,
+                'window-increase-type': window_type,
+                'window-increase-factor': window_factor,
+                'window-decrease-type': window_type,
+                'window-decrease-factor': window_factor,
                 }
     pipelines = [pipeline]
 
diff --git a/zuul/model.py b/zuul/model.py
index 3bc284e..5da9cef 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -43,6 +43,20 @@
 }
 
 
+def time_to_seconds(s):
+    if s.endswith('s'):
+        return int(s[:-1])
+    if s.endswith('m'):
+        return int(s[:-1]) * 60
+    if s.endswith('h'):
+        return int(s[:-1]) * 60 * 60
+    if s.endswith('d'):
+        return int(s[:-1]) * 24 * 60 * 60
+    if s.endswith('w'):
+        return int(s[:-1]) * 7 * 24 * 60 * 60
+    raise Exception("Unable to parse time value: %s" % s)
+
+
 class Pipeline(object):
     """A top-level pipeline such as check, gate, post, etc."""
     def __init__(self, name):
@@ -59,6 +73,12 @@
         self.start_actions = None
         self.success_actions = None
         self.failure_actions = None
+        self.window = None
+        self.window_floor = None
+        self.window_increase_type = None
+        self.window_increase_factor = None
+        self.window_decrease_type = None
+        self.window_decrease_factor = None
 
     def __repr__(self):
         return '<Pipeline %s>' % self.name
@@ -234,6 +254,8 @@
             j_queue = dict(name=queue.name)
             j_queues.append(j_queue)
             j_queue['heads'] = []
+            j_queue['window'] = queue.window
+            j_queue['dependent'] = queue.dependent
 
             j_changes = []
             for e in queue.queue:
@@ -293,6 +315,7 @@
     def formatItemJSON(self, item):
         changeish = item.change
         ret = {}
+        ret['active'] = item.active
         if hasattr(changeish, 'url') and changeish.url is not None:
             ret['url'] = changeish.url
         else:
@@ -375,13 +398,21 @@
     a queue shared by interrelated projects foo and bar, and a second
     queue for independent project baz.  Pipelines have one or more
     PipelineQueues."""
-    def __init__(self, pipeline, dependent=True):
+    def __init__(self, pipeline, dependent=True, window=0, window_floor=1,
+                 window_increase_type='linear', window_increase_factor=1,
+                 window_decrease_type='exponential', window_decrease_factor=2):
         self.pipeline = pipeline
         self.name = ''
         self.projects = []
         self._jobs = set()
         self.queue = []
         self.dependent = dependent
+        self.window = window
+        self.window_floor = window_floor
+        self.window_increase_type = window_increase_type
+        self.window_increase_factor = window_increase_factor
+        self.window_decrease_type = window_decrease_type
+        self.window_decrease_factor = window_decrease_factor
 
     def __repr__(self):
         return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
@@ -404,6 +435,7 @@
         return item
 
     def enqueueItem(self, item):
+        item.pipeline = self.pipeline
         if self.dependent and self.queue:
             item.item_ahead = self.queue[-1]
             item.item_ahead.items_behind.append(item)
@@ -444,6 +476,32 @@
     def mergeChangeQueue(self, other):
         for project in other.projects:
             self.addProject(project)
+        self.window = min(self.window, other.window)
+        # TODO merge semantics
+
+    def isActionable(self, item):
+        if self.dependent and self.window:
+            return item in self.queue[:self.window]
+        else:
+            return True
+
+    def increaseWindowSize(self):
+        if self.dependent:
+            if self.window_increase_type == 'linear':
+                self.window += self.window_increase_factor
+            elif self.window_increase_type == 'exponential':
+                self.window *= self.window_increase_factor
+
+    def decreaseWindowSize(self):
+        if self.dependent:
+            if self.window_decrease_type == 'linear':
+                self.window = max(
+                    self.window_floor,
+                    self.window - self.window_decrease_factor)
+            elif self.window_decrease_type == 'exponential':
+                self.window = max(
+                    self.window_floor,
+                    self.window / self.window_decrease_factor)
 
 
 class Project(object):
@@ -631,6 +689,7 @@
         self.enqueue_time = None
         self.dequeue_time = None
         self.reported = False
+        self.active = False
 
     def __repr__(self):
         if self.pipeline:
@@ -694,6 +753,7 @@
         self.can_merge = False
         self.is_merged = False
         self.failed_to_merge = False
+        self.approvals = []
 
     def _id(self):
         return '%s,%s' % (self.number, self.patchset)
@@ -832,21 +892,33 @@
 
 
 class EventFilter(object):
-    def __init__(self, types=[], branches=[], refs=[], approvals={},
-                 comment_filters=[], email_filters=[], timespecs=[]):
+    def __init__(self, types=[], branches=[], refs=[], event_approvals={},
+                 comment_filters=[], email_filters=[], username_filters=[],
+                 timespecs=[], require_approvals=[]):
         self._types = types
         self._branches = branches
         self._refs = refs
         self._comment_filters = comment_filters
         self._email_filters = email_filters
+        self._username_filters = username_filters
         self.types = [re.compile(x) for x in types]
         self.branches = [re.compile(x) for x in branches]
         self.refs = [re.compile(x) for x in refs]
         self.comment_filters = [re.compile(x) for x in comment_filters]
         self.email_filters = [re.compile(x) for x in email_filters]
-        self.approvals = approvals
+        self.username_filters = [re.compile(x) for x in username_filters]
+        self.event_approvals = event_approvals
+        self.require_approvals = require_approvals
         self.timespecs = timespecs
 
+        for a in self.require_approvals:
+            if 'older-than' in a:
+                a['older-than'] = time_to_seconds(a['older-than'])
+            if 'newer-than' in a:
+                a['newer-than'] = time_to_seconds(a['newer-than'])
+            if 'email-filter' in a:
+                a['email-filter'] = re.compile(a['email-filter'])
+
     def __repr__(self):
         ret = '<EventFilter'
 
@@ -856,20 +928,22 @@
             ret += ' branches: %s' % ', '.join(self._branches)
         if self._refs:
             ret += ' refs: %s' % ', '.join(self._refs)
-        if self.approvals:
-            ret += ' approvals: %s' % ', '.join(
-                ['%s:%s' % a for a in self.approvals.items()])
+        if self.event_approvals:
+            ret += ' event_approvals: %s' % ', '.join(
+                ['%s:%s' % a for a in self.event_approvals.items()])
         if self._comment_filters:
             ret += ' comment_filters: %s' % ', '.join(self._comment_filters)
         if self._email_filters:
             ret += ' email_filters: %s' % ', '.join(self._email_filters)
+        if self._username_filters:
+            ret += ' username_filters: %s' % ', '.join(self._username_filters)
         if self.timespecs:
             ret += ' timespecs: %s' % ', '.join(self.timespecs)
         ret += '>'
 
         return ret
 
-    def matches(self, event):
+    def matches(self, event, change):
         def normalizeCategory(name):
             name = name.lower()
             return re.sub(' ', '-', name)
@@ -920,8 +994,18 @@
             if self.email_filters and not matches_email_filter:
                 return False
 
+            # username_filters are ORed
+            account_username = event.account.get('username')
+            matches_username_filter = False
+            for username_filter in self.username_filters:
+                if (account_username is not None and
+                    username_filter.search(account_username)):
+                    matches_username_filter = True
+            if self.username_filters and not matches_username_filter:
+                return False
+
         # approvals are ANDed
-        for category, value in self.approvals.items():
+        for category, value in self.event_approvals.items():
             matches_approval = False
             for eapproval in event.approvals:
                 if (normalizeCategory(eapproval['description']) == category and
@@ -930,6 +1014,41 @@
             if not matches_approval:
                 return False
 
+        if self.require_approvals and not change.approvals:
+            # A change with no approvals can not match
+            return False
+
+        now = time.time()
+        for rapproval in self.require_approvals:
+            matches_approval = False
+            for approval in change.approvals:
+                found_approval = True
+                by = approval.get('by', {})
+                for k, v in rapproval.items():
+                    if k == 'username':
+                        if (by.get('username', '') != v):
+                            found_approval = False
+                    elif k == 'email-filter':
+                        if (not v.search(by.get('email', ''))):
+                            found_approval = False
+                    elif k == 'newer-than':
+                        t = now - v
+                        if (approval['grantedOn'] < t):
+                            found_approval = False
+                    elif k == 'older-than':
+                        t = now - v
+                        if (approval['grantedOn'] >= t):
+                            found_approval = False
+                    else:
+                        if (normalizeCategory(approval['description']) != k or
+                            int(approval['value']) != v):
+                            found_approval = False
+                if found_approval:
+                    matches_approval = True
+                    break
+            if not matches_approval:
+                return False
+
         # timespecs are ORed
         matches_timespec = False
         for timespec in self.timespecs:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 8f4d48f..9186ff3 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -201,6 +201,17 @@
             pipeline.success_actions = action_reporters['success']
             pipeline.failure_actions = action_reporters['failure']
 
+            pipeline.window = conf_pipeline.get('window', 20)
+            pipeline.window_floor = conf_pipeline.get('window-floor', 3)
+            pipeline.window_increase_type = conf_pipeline.get(
+                'window-increase-type', 'linear')
+            pipeline.window_increase_factor = conf_pipeline.get(
+                'window-increase-factor', 1)
+            pipeline.window_decrease_type = conf_pipeline.get(
+                'window-decrease-type', 'exponential')
+            pipeline.window_decrease_factor = conf_pipeline.get(
+                'window-decrease-factor', 2)
+
             manager = globals()[conf_pipeline['manager']](self, pipeline)
             pipeline.setManager(manager)
             layout.pipelines[conf_pipeline['name']] = pipeline
@@ -217,11 +228,15 @@
                     f = EventFilter(types=toList(trigger['event']),
                                     branches=toList(trigger.get('branch')),
                                     refs=toList(trigger.get('ref')),
-                                    approvals=approvals,
+                                    event_approvals=approvals,
                                     comment_filters=
                                     toList(trigger.get('comment_filter')),
                                     email_filters=
-                                    toList(trigger.get('email_filter')))
+                                    toList(trigger.get('email_filter')),
+                                    username_filters=
+                                    toList(trigger.get('username_filter')),
+                                    require_approvals=
+                                    toList(trigger.get('require-approval')))
                     manager.event_filters.append(f)
             elif 'timer' in conf_pipeline['trigger']:
                 pipeline.trigger = self.triggers['timer']
@@ -613,7 +628,10 @@
             pipeline.manager.cancelJobs(item)
             pipeline.manager.dequeueItem(item)
         for item in items_to_enqueue:
-            pipeline.manager.addChange(item.change, quiet=True)
+            pipeline.manager.addChange(
+                item.change,
+                enqueue_time=item.enqueue_time,
+                quiet=True)
         while pipeline.manager.processQueue():
             pass
 
@@ -709,7 +727,7 @@
                                      self.triggers.get(event.trigger_name))
             if event.type == 'patchset-created':
                 pipeline.manager.removeOldVersionsOfChange(change)
-            if pipeline.manager.eventMatches(event):
+            if pipeline.manager.eventMatches(event, change):
                 self.log.info("Adding %s, %s to %s" %
                               (project, change, pipeline))
                 pipeline.manager.addChange(change)
@@ -873,14 +891,14 @@
             allow_needs.update(action_reporter.getSubmitAllowNeeds())
         return allow_needs
 
-    def eventMatches(self, event):
+    def eventMatches(self, event, change):
         if event.forced_pipeline:
             if event.forced_pipeline == self.pipeline.name:
                 return True
             else:
                 return False
         for ef in self.event_filters:
-            if ef.matches(event):
+            if ef.matches(event, change):
                 return True
         return False
 
@@ -981,7 +999,7 @@
                            item.change.project)
             return False
 
-    def addChange(self, change, quiet=False):
+    def addChange(self, change, quiet=False, enqueue_time=None):
         self.log.debug("Considering adding change %s" % change)
         if self.isChangeAlreadyInQueue(change):
             self.log.debug("Change %s is already in queue, ignoring" % change)
@@ -1008,6 +1026,8 @@
                 if len(self.pipeline.start_actions) > 0:
                     self.reportStart(change)
             item = change_queue.enqueueChange(change)
+            if enqueue_time:
+                item.enqueue_time = enqueue_time
             self.reportStats(item)
             self.enqueueChangesBehind(change, quiet)
         else:
@@ -1123,6 +1143,8 @@
                 pass
             return (True, nnfi)
         dep_item = self.getFailingDependentItem(item)
+        actionable = change_queue.isActionable(item)
+        item.active = actionable
         if dep_item:
             failing_reasons.append('a needed change is failing')
             self.cancelJobs(item, prime=False)
@@ -1141,10 +1163,11 @@
                 change_queue.moveItem(item, nnfi)
                 changed = True
                 self.cancelJobs(item)
-            self.prepareRef(item)
-            if item.current_build_set.unable_to_merge:
-                failing_reasons.append("it has a merge conflict")
-        if self.launchJobs(item):
+            if actionable:
+                self.prepareRef(item)
+                if item.current_build_set.unable_to_merge:
+                    failing_reasons.append("it has a merge conflict")
+        if actionable and self.launchJobs(item):
             changed = True
         if self.pipeline.didAnyJobFail(item):
             failing_reasons.append("at least one job failed")
@@ -1247,10 +1270,18 @@
                                                         item.change.branch)
             self.log.info("Reported change %s status: all-succeeded: %s, "
                           "merged: %s" % (item.change, succeeded, merged))
+            change_queue = self.pipeline.getQueue(item.change.project)
             if not (succeeded and merged):
                 self.log.debug("Reported change %s failed tests or failed "
                                "to merge" % (item.change))
+                change_queue.decreaseWindowSize()
+                self.log.debug("%s window size decreased to %s" %
+                               (change_queue, change_queue.window))
                 raise MergeFailure("Change %s failed to merge" % item.change)
+            else:
+                change_queue.increaseWindowSize()
+                self.log.debug("%s window size increased to %s" %
+                               (change_queue, change_queue.window))
 
     def _reportItem(self, item):
         if item.reported:
@@ -1507,7 +1538,14 @@
         change_queues = []
 
         for project in self.pipeline.getProjects():
-            change_queue = ChangeQueue(self.pipeline)
+            change_queue = ChangeQueue(
+                self.pipeline,
+                window=self.pipeline.window,
+                window_floor=self.pipeline.window_floor,
+                window_increase_type=self.pipeline.window_increase_type,
+                window_increase_factor=self.pipeline.window_increase_factor,
+                window_decrease_type=self.pipeline.window_decrease_type,
+                window_decrease_factor=self.pipeline.window_decrease_factor)
             change_queue.addProject(project)
             change_queues.append(change_queue)
             self.log.debug("Created queue: %s" % change_queue)
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 3a8644a..fe9a1aa 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -361,6 +361,8 @@
                 if not dep.is_merged and dep.is_current_patchset:
                     change.needed_by_changes.append(dep)
 
+        change.approvals = data['currentPatchSet'].get('approvals', [])
+
         return change
 
     def getGitUrl(self, project):