Merge "Fix AttributeError when handle periodic job with github driver"
diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index d43fd03..1c17c28 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -131,6 +131,14 @@
    component of the key will be replaced with the hostname of the
    executor.
 
+   .. stat:: merger.<result>
+      :type: counter
+
+      Incremented to represent the status of a Zuul executor's merger
+      operations. ``<result>`` can be either ``SUCCESS`` or ``FAILURE``.
+      A failed merge operation which would be accounted for as a ``FAILURE``
+      is what ends up being returned by Zuul as a ``MERGER_FAILURE``.
+
    .. stat:: builds
       :type: counter
 
@@ -148,6 +156,27 @@
       The number of builds currently running on this executor.  This
       includes starting builds.
 
+   .. stat:: phase
+
+      Subtree detailing per-phase execution statistics:
+
+      .. stat:: <phase>
+
+         ``<phase>`` represents a phase in the execution of a job.
+         This can be an *internal* phase (such as ``setup`` or ``cleanup``) as
+         well as *job* phases such as ``pre``, ``run`` or ``post``.
+
+         .. stat:: <result>
+            :type: counter
+
+            A counter for each type of result.
+            These results do not, by themselves, determine the status of a build
+            but are indicators of the exit status provided by Ansible for the
+            execution of a particular phase.
+
+            Examples of possible counters for each phase are: ``RESULT_NORMAL``,
+            ``RESULT_TIMED_OUT``, ``RESULT_UNREACHABLE``, ``RESULT_ABORTED``.
+
    .. stat:: load_average
       :type: gauge
 
diff --git a/tests/base.py b/tests/base.py
index f68f59a..70889bb 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1640,6 +1640,11 @@
         nodeid = path.split("/")[-1]
         return nodeid
 
+    def removeNode(self, node):
+        """Recursively delete the entry for *node* found under NODE_ROOT."""
+        node_path = '/'.join((self.NODE_ROOT, node["_oid"]))
+        self.client.delete(node_path, recursive=True)
+
     def addFailRequest(self, request):
         self.fail_requests.add(request['_oid'])
 
diff --git a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
index 95ab870..ba35eb0 100644
--- a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
+++ b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
@@ -1,6 +1,7 @@
 - hosts: localhost
   tasks:
     - command: dd if=/dev/zero of=toobig bs=1M count=2
+    - command: sync
     - wait_for:
         delay: 10
         path: /
diff --git a/tests/unit/test_disk_accountant.py b/tests/unit/test_disk_accountant.py
index 7081b53..e12846d 100644
--- a/tests/unit/test_disk_accountant.py
+++ b/tests/unit/test_disk_accountant.py
@@ -10,6 +10,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import fixtures
 import os
 import tempfile
 import time
@@ -32,6 +33,10 @@
 
 
 class TestDiskAccountant(BaseTestCase):
+    def setUp(self):
+        super(TestDiskAccountant, self).setUp()
+        self.useFixture(fixtures.NestedTempfile())
+
     def test_disk_accountant(self):
         jobs_dir = tempfile.mkdtemp(
             dir=os.environ.get("ZUUL_TEST_ROOT", None))
@@ -47,6 +52,8 @@
             testfile = os.path.join(jobdir, 'tfile')
             with open(testfile, 'w') as tf:
                 tf.write(2 * 1024 * 1024 * '.')
+                tf.flush()
+                os.fsync(tf.fileno())
 
             # da should catch over-limit dir within 5 seconds
             for i in range(0, 50):
diff --git a/tests/unit/test_encryption.py b/tests/unit/test_encryption.py
index b424769..0a5c0a4 100644
--- a/tests/unit/test_encryption.py
+++ b/tests/unit/test_encryption.py
@@ -12,6 +12,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import fixtures
 import os
 import subprocess
 import tempfile
@@ -26,6 +27,10 @@
     def setUp(self):
         super(TestEncryption, self).setUp()
         self.private, self.public = encryption.generate_rsa_keypair()
+        # Because we set delete to False when using NamedTemporaryFile below
+        # we need to stick our usage of temporary files in the NestedTempfile
+        # fixture ensuring everything gets cleaned up when it is done.
+        self.useFixture(fixtures.NestedTempfile())
 
     def test_serialization(self):
         "Verify key serialization"
diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py
index 8cb98ee..46e1d99 100755
--- a/tests/unit/test_executor.py
+++ b/tests/unit/test_executor.py
@@ -490,6 +490,7 @@
     def test_slow_start(self):
         self.executor_server.hold_jobs_in_build = True
         self.executor_server.max_starting_builds = 1
+        self.executor_server.min_starting_builds = 1
         self.executor_server.manageLoad()
         self.assertTrue(self.executor_server.accepting_work)
         A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A')
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9a2eb28..c833fa2 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -37,6 +37,7 @@
     ZuulTestCase,
     repack_repo,
     simple_layout,
+    iterate_timeout,
 )
 
 
@@ -1505,7 +1506,7 @@
                                           self.gearman_server.port)
         self.addCleanup(client.shutdown)
         r = client.autohold('tenant-one', 'org/project', 'project-test2',
-                            "reason text", 1)
+                            "", "", "reason text", 1)
         self.assertTrue(r)
 
         # First check that successful jobs do not autohold
@@ -1552,7 +1553,7 @@
             held_node['hold_job'],
             " ".join(['tenant-one',
                       'review.example.com/org/project',
-                      'project-test2'])
+                      'project-test2', '.*'])
         )
         self.assertEqual(held_node['comment'], "reason text")
 
@@ -1572,13 +1573,151 @@
                 held_nodes += 1
         self.assertEqual(held_nodes, 1)
 
+    def _test_autohold_scoped(self, change_obj, change, ref):
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        self.addCleanup(client.shutdown)
+
+        # create two changes on the same project, and autohold request
+        # for one of them.
+        other = self.fake_gerrit.addFakeChange(
+            'org/project', 'master', 'other'
+        )
+
+        r = client.autohold('tenant-one', 'org/project', 'project-test2',
+                            str(change), ref, "reason text", 1)
+        self.assertTrue(r)
+
+        # First, check that an unrelated job does not trigger autohold, even
+        # when it failed
+        self.executor_server.failJob('project-test2', other)
+        self.fake_gerrit.addEvent(other.getPatchsetCreatedEvent(1))
+
+        self.waitUntilSettled()
+
+        self.assertEqual(other.data['status'], 'NEW')
+        self.assertEqual(other.reported, 1)
+        # project-test2
+        self.assertEqual(self.history[0].result, 'FAILURE')
+
+        # Check nodepool for a held node
+        held_node = None
+        for node in self.fake_nodepool.getNodes():
+            if node['state'] == zuul.model.STATE_HOLD:
+                held_node = node
+                break
+        self.assertIsNone(held_node)
+
+        # And then verify that failed job for the defined change
+        # triggers the autohold
+
+        self.executor_server.failJob('project-test2', change_obj)
+        self.fake_gerrit.addEvent(change_obj.getPatchsetCreatedEvent(1))
+
+        self.waitUntilSettled()
+
+        self.assertEqual(change_obj.data['status'], 'NEW')
+        self.assertEqual(change_obj.reported, 1)
+        # project-test2
+        self.assertEqual(self.history[1].result, 'FAILURE')
+
+        # Check nodepool for a held node
+        held_node = None
+        for node in self.fake_nodepool.getNodes():
+            if node['state'] == zuul.model.STATE_HOLD:
+                held_node = node
+                break
+        self.assertIsNotNone(held_node)
+
+        # Validate node has recorded the failed job
+        if change != "":
+            ref = "refs/changes/%s/%s/.*" % (
+                str(change_obj.number)[-1:], str(change_obj.number)
+            )
+
+        self.assertEqual(
+            held_node['hold_job'],
+            " ".join(['tenant-one',
+                      'review.example.com/org/project',
+                      'project-test2', ref])
+        )
+        self.assertEqual(held_node['comment'], "reason text")
+
+    @simple_layout('layouts/autohold.yaml')
+    def test_autohold_change(self):
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+
+        self._test_autohold_scoped(A, change=A.number, ref="")
+
+    @simple_layout('layouts/autohold.yaml')
+    def test_autohold_ref(self):
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        ref = A.data['currentPatchSet']['ref']
+        self._test_autohold_scoped(A, change="", ref=ref)
+
+    @simple_layout('layouts/autohold.yaml')
+    def test_autohold_scoping(self):
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        self.addCleanup(client.shutdown)
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+
+        # create three autohold requests, scoped to job, change and
+        # a specific ref
+        change = str(A.number)
+        ref = A.data['currentPatchSet']['ref']
+        r1 = client.autohold('tenant-one', 'org/project', 'project-test2',
+                             "", "", "reason text", 1)
+        self.assertTrue(r1)
+        r2 = client.autohold('tenant-one', 'org/project', 'project-test2',
+                             change, "", "reason text", 1)
+        self.assertTrue(r2)
+        r3 = client.autohold('tenant-one', 'org/project', 'project-test2',
+                             "", ref, "reason text", 1)
+        self.assertTrue(r3)
+
+        # Fail 3 jobs for the same change, and verify that the autohold
+        # requests are fulfilled in the expected order: from the most
+        # specific towards the most generic one.
+
+        def _fail_job_and_verify_autohold_request(change_obj, ref_filter):
+            self.executor_server.failJob('project-test2', change_obj)
+            self.fake_gerrit.addEvent(change_obj.getPatchsetCreatedEvent(1))
+
+            self.waitUntilSettled()
+
+            # Check nodepool for a held node
+            held_node = None
+            for node in self.fake_nodepool.getNodes():
+                if node['state'] == zuul.model.STATE_HOLD:
+                    held_node = node
+                    break
+            self.assertIsNotNone(held_node)
+
+            self.assertEqual(
+                held_node['hold_job'],
+                " ".join(['tenant-one',
+                          'review.example.com/org/project',
+                          'project-test2', ref_filter])
+            )
+            self.assertFalse(held_node['_lock'], "Node %s is locked" %
+                             (node['_oid'],))
+            self.fake_nodepool.removeNode(held_node)
+
+        _fail_job_and_verify_autohold_request(A, ref)
+
+        ref = "refs/changes/%s/%s/.*" % (str(change)[-1:], str(change))
+        _fail_job_and_verify_autohold_request(A, ref)
+        _fail_job_and_verify_autohold_request(A, ".*")
+
     @simple_layout('layouts/autohold.yaml')
     def test_autohold_ignores_aborted_jobs(self):
         client = zuul.rpcclient.RPCClient('127.0.0.1',
                                           self.gearman_server.port)
         self.addCleanup(client.shutdown)
         r = client.autohold('tenant-one', 'org/project', 'project-test2',
-                            "reason text", 1)
+                            "", "", "reason text", 1)
         self.assertTrue(r)
 
         self.executor_server.hold_jobs_in_build = True
@@ -1622,7 +1761,7 @@
         self.addCleanup(client.shutdown)
 
         r = client.autohold('tenant-one', 'org/project', 'project-test2',
-                            "reason text", 1)
+                            "", "", "reason text", 1)
         self.assertTrue(r)
 
         autohold_requests = client.autohold_list()
@@ -1631,11 +1770,12 @@
 
         # The single dict key should be a CSV string value
         key = list(autohold_requests.keys())[0]
-        tenant, project, job = key.split(',')
+        tenant, project, job, ref_filter = key.split(',')
 
         self.assertEqual('tenant-one', tenant)
         self.assertIn('org/project', project)
         self.assertEqual('project-test2', job)
+        self.assertEqual(".*", ref_filter)
 
         # Note: the value is converted from set to list by json.
         self.assertEqual([1, "reason text"], autohold_requests[key])
@@ -4397,6 +4537,54 @@
         self.assertEqual(A.data['status'], 'MERGED')
         self.assertEqual(A.reported, 2)
 
+    def test_zookeeper_disconnect2(self):
+        "Test that jobs are executed after a zookeeper disconnect"
+
+        # This tests receiving a ZK disconnect between the arrival of
+        # a fulfilled request and when we accept its nodes.
+        self.fake_nodepool.paused = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # We're waiting on the nodepool request to complete.  Stop the
+        # scheduler from processing further events, then fulfill the
+        # nodepool request.
+        self.sched.run_handler_lock.acquire()
+
+        # Fulfill the nodepool request.
+        self.fake_nodepool.paused = False
+        requests = list(self.sched.nodepool.requests.values())
+        self.assertEqual(1, len(requests))
+        request = requests[0]
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        id1 = request.id
+
+        # The request is fulfilled, but the scheduler hasn't processed
+        # it yet.  Reconnect ZK.
+        self.zk.client.stop()
+        self.zk.client.start()
+
+        # Allow the scheduler to continue and process the (now
+        # out-of-date) notification that nodes are ready.
+        self.sched.run_handler_lock.release()
+
+        # It should resubmit the request, once it's fulfilled, we can
+        # wait for it to run jobs and settle.
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        self.waitUntilSettled()
+
+        id2 = request.id
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        # Make sure it was resubmitted (the ids should be different).
+        self.assertNotEqual(id1, id2)
+
     def test_nodepool_failure(self):
         "Test that jobs are reported after a nodepool failure"
 
diff --git a/tools/github-debugging.py b/tools/github-debugging.py
index 101fd11..da6fd0c 100755
--- a/tools/github-debugging.py
+++ b/tools/github-debugging.py
@@ -11,6 +11,8 @@
 # TODO: for real use override the following variables
 server = 'github.com'
 api_token = 'xxxx'
+appid = 2
+appkey = '/opt/project/appkey'
 
 org = 'example'
 repo = 'sandbox'
@@ -42,20 +44,36 @@
     return conn
 
 
+def create_connection_app(server, appid, appkey):
+    driver = GithubDriver()
+    connection_config = {
+        'server': server,
+        'app_id': appid,
+        'app_key': appkey,
+    }
+    conn = GithubConnection(driver, 'github', connection_config)
+    conn._authenticateGithubAPI()
+    conn._prime_installation_map()
+    return conn
+
+
 def get_change(connection: GithubConnection,
                org: str,
                repo: str,
                pull: int) -> Change:
     p = Project("%s/%s" % (org, repo), connection.source)
-    github = connection.getGithubClient(p)
+    github = connection.getGithubClient(p.name)
     pr = github.pull_request(org, repo, pull)
     sha = pr.head.sha
     return conn._getChange(p, pull, sha, True)
 
 
-# create github connection
+# create github connection with api token
 conn = create_connection(server, api_token)
 
+# create github connection with app key
+# conn = create_connection_app(server, appid, appkey)
+
 
 # Now we can do anything we want with the connection, e.g. check canMerge for
 # a pull request.
diff --git a/tools/nodepool-integration-setup.sh b/tools/nodepool-integration-setup.sh
index c02a016..58c39cf 100755
--- a/tools/nodepool-integration-setup.sh
+++ b/tools/nodepool-integration-setup.sh
@@ -3,7 +3,7 @@
 /usr/zuul-env/bin/zuul-cloner --workspace /tmp --cache-dir /opt/git \
     git://git.openstack.org openstack-infra/nodepool
 
-ln -s /tmp/nodepool/log $WORKSPACE/logs
+ln -s /tmp/nodepool/log "$HOME/logs"
 
 cd /tmp/openstack-infra/nodepool
 /usr/local/jenkins/slave_scripts/install-distro-packages.sh
diff --git a/tox.ini b/tox.ini
index 73915ad..e5035bd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -6,7 +6,7 @@
 [testenv]
 basepython = python3
 setenv = VIRTUAL_ENV={envdir}
-         OS_TEST_TIMEOUT=120
+         OS_TEST_TIMEOUT=150
 passenv = ZUUL_TEST_ROOT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE OS_LOG_DEFAULTS
 usedevelop = True
 install_command = pip install {opts} {packages}
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index ebf59b9..a7b3ef3 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -51,6 +51,11 @@
                                   required=True)
         cmd_autohold.add_argument('--job', help='job name',
                                   required=True)
+        cmd_autohold.add_argument('--change',
+                                  help='specific change to hold nodes for',
+                                  required=False, default='')
+        cmd_autohold.add_argument('--ref', help='git ref to hold nodes for',
+                                  required=False, default='')
         cmd_autohold.add_argument('--reason', help='reason for the hold',
                                   required=True)
         cmd_autohold.add_argument('--count',
@@ -173,9 +178,15 @@
     def autohold(self):
         client = zuul.rpcclient.RPCClient(
             self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
+        if self.args.change and self.args.ref:
+            print("Change and ref can't be both used for the same request")
+            return False
+
         r = client.autohold(tenant=self.args.tenant,
                             project=self.args.project,
                             job=self.args.job,
+                            change=self.args.change,
+                            ref=self.args.ref,
                             reason=self.args.reason,
                             count=self.args.count)
         return r
@@ -190,14 +201,19 @@
             return True
 
         table = prettytable.PrettyTable(
-            field_names=['Tenant', 'Project', 'Job', 'Count', 'Reason'])
+            field_names=[
+                'Tenant', 'Project', 'Job', 'Ref Filter', 'Count', 'Reason'
+            ])
 
         for key, value in autohold_requests.items():
             # The key comes to us as a CSV string because json doesn't like
             # non-str keys.
-            tenant_name, project_name, job_name = key.split(',')
+            tenant_name, project_name, job_name, ref_filter = key.split(',')
             count, reason = value
-            table.add_row([tenant_name, project_name, job_name, count, reason])
+
+            table.add_row([
+                tenant_name, project_name, job_name, ref_filter, count, reason
+            ])
         print(table)
         return True
 
diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py
index fdc1ad7..8f3408e 100644
--- a/zuul/driver/gerrit/gerritsource.py
+++ b/zuul/driver/gerrit/gerritsource.py
@@ -141,6 +141,10 @@
         )
         return [f]
 
+    def getRefForChange(self, change):
+        partial = change[-2:]
+        return "refs/changes/%s/%s/.*" % (partial, change)
+
 
 approval = vs.Schema({'username': str,
                       'email': str,
diff --git a/zuul/driver/git/gitsource.py b/zuul/driver/git/gitsource.py
index a7d42be..9f0963d 100644
--- a/zuul/driver/git/gitsource.py
+++ b/zuul/driver/git/gitsource.py
@@ -68,3 +68,8 @@
 
     def getRejectFilters(self, config):
         return []
+
+    def getRefForChange(self, change):
+        # NotImplemented is a non-callable comparison sentinel, so calling
+        # it raised TypeError; NotImplementedError is the exception class.
+        raise NotImplementedError()
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 3800ca0..6dfcdd3 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -458,6 +458,7 @@
         self._github = None
         self.app_id = None
         self.app_key = None
+        self.sched = None
 
         self.installation_map = {}
         self.installation_token_cache = {}
@@ -828,7 +829,8 @@
         change.updated_at = self._ghTimestampToDate(
             change.pr.get('updated_at'))
 
-        self.sched.onChangeUpdated(change)
+        if self.sched:
+            self.sched.onChangeUpdated(change)
 
         return change
 
diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py
index 33f8f7c..6f9b14d 100644
--- a/zuul/driver/github/githubsource.py
+++ b/zuul/driver/github/githubsource.py
@@ -144,6 +144,9 @@
         )
         return [f]
 
+    def getRefForChange(self, change):
+        return "refs/pull/%s/head" % change
+
 
 review = v.Schema({'username': str,
                    'email': str,
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index ffc083f..a831a53 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -780,8 +780,17 @@
         ret = merger.mergeChanges(items, repo_state=repo_state)
         if not ret:  # merge conflict
             result = dict(result='MERGER_FAILURE')
+            if self.executor_server.statsd:
+                base_key = ("zuul.executor.%s.merger" %
+                            self.executor_server.hostname)
+                self.executor_server.statsd.incr(base_key + ".FAILURE")
             self.job.sendWorkComplete(json.dumps(result))
             return False
+
+        if self.executor_server.statsd:
+            base_key = ("zuul.executor.%s.merger" %
+                        self.executor_server.hostname)
+            self.executor_server.statsd.incr(base_key + ".SUCCESS")
         recent = ret[3]
         for key, commit in recent.items():
             (connection, project, branch) = key
@@ -1465,6 +1474,11 @@
             wrapped=False)
         self.log.debug("Ansible complete, result %s code %s" % (
             self.RESULT_MAP[result], code))
+        if self.executor_server.statsd:
+            base_key = ("zuul.executor.%s.phase.setup" %
+                        self.executor_server.hostname)
+            self.executor_server.statsd.incr(base_key + ".%s" %
+                                             self.RESULT_MAP[result])
         return result, code
 
     def runAnsibleCleanup(self, playbook):
@@ -1485,6 +1499,11 @@
             wrapped=False)
         self.log.debug("Ansible complete, result %s code %s" % (
             self.RESULT_MAP[result], code))
+        if self.executor_server.statsd:
+            base_key = ("zuul.executor.%s.phase.cleanup" %
+                        self.executor_server.hostname)
+            self.executor_server.statsd.incr(base_key + ".%s" %
+                                             self.RESULT_MAP[result])
         return result, code
 
     def emitPlaybookBanner(self, playbook, step, phase, result=None):
@@ -1554,6 +1573,11 @@
             cmd=cmd, timeout=timeout, playbook=playbook)
         self.log.debug("Ansible complete, result %s code %s" % (
             self.RESULT_MAP[result], code))
+        if self.executor_server.statsd:
+            base_key = ("zuul.executor.%s.phase.%s" %
+                        (self.executor_server.hostname, phase or 'unknown'))
+            self.executor_server.statsd.incr(base_key + ".%s" %
+                                             self.RESULT_MAP[result])
 
         self.emitPlaybookBanner(playbook, 'END', phase, result=result)
         return result, code
@@ -1636,6 +1660,7 @@
                                             'load_multiplier', '2.5'))
         self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
         self.max_starting_builds = self.max_load_avg * 2
+        self.min_starting_builds = 4
         self.min_avail_mem = float(get_default(self.config, 'executor',
                                                'min_avail_mem', '5.0'))
         self.accepting_work = False
@@ -1975,7 +2000,7 @@
                 starting_builds += 1
         max_starting_builds = max(
             self.max_starting_builds - len(self.job_workers),
-            1)
+            self.min_starting_builds)
         if self.accepting_work:
             # Don't unregister if we don't have any active jobs.
             if load_avg > self.max_load_avg:
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index b96d1ca..6e7064c 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -165,6 +165,7 @@
         self.log.debug("Updating node request %s" % (request,))
 
         if request.uid not in self.requests:
+            self.log.debug("Request %s is unknown" % (request.uid,))
             return False
 
         if request.canceled:
@@ -193,14 +194,21 @@
 
     def acceptNodes(self, request, request_id):
         # Called by the scheduler when it wants to accept and lock
-        # nodes for (potential) use.
+        # nodes for (potential) use.  Return False if there is a
+        # problem with the request (canceled or retrying), True if it
+        # is ready to be acted upon (success or failure).
 
         self.log.info("Accepting node request %s" % (request,))
 
         if request_id != request.id:
             self.log.info("Skipping node accept for %s (resubmitted as %s)",
                           request_id, request.id)
-            return
+            return False
+
+        if request.canceled:
+            self.log.info("Ignoring canceled node request %s" % (request,))
+            # The request was already deleted when it was canceled
+            return False
 
         # Make sure the request still exists. It's possible it could have
         # disappeared if we lost the ZK session between when the fulfillment
@@ -208,13 +216,13 @@
         # processing it. Nodepool will automatically reallocate the assigned
         # nodes in that situation.
         if not self.sched.zk.nodeRequestExists(request):
-            self.log.info("Request %s no longer exists", request.id)
-            return
-
-        if request.canceled:
-            self.log.info("Ignoring canceled node request %s" % (request,))
-            # The request was already deleted when it was canceled
-            return
+            self.log.info("Request %s no longer exists, resubmitting",
+                          request.id)
+            request.id = None
+            request.state = model.STATE_REQUESTED
+            self.requests[request.uid] = request
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+            return False
 
         locked = False
         if request.fulfilled:
@@ -239,3 +247,4 @@
             # them.
             if locked:
                 self.unlockNodeSet(request.nodeset)
+        return True
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 8f2e5dc..a947ed0 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -48,10 +48,12 @@
         self.log.debug("Job complete, success: %s" % (not job.failure))
         return job
 
-    def autohold(self, tenant, project, job, reason, count):
+    def autohold(self, tenant, project, job, change, ref, reason, count):
         data = {'tenant': tenant,
                 'project': project,
                 'job': job,
+                'change': change,
+                'ref': ref,
                 'reason': reason,
                 'count': count}
         return not self.submitJob('zuul:autohold', data).failure
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index e5016df..f3f55f6 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -150,7 +150,26 @@
             job.sendWorkException(error.encode('utf8'))
             return
 
+        # A request is scoped by change or by ref, never both: report the
+        # conflict and stop, as the error path above does, rather than
+        # falling through and registering a change-scoped hold.
+        if args['change'] and args['ref']:
+            msg = ("Change and ref can't be both used "
+                   "for the same request")
+            job.sendWorkException(msg.encode('utf8'))
+            return
+
+        if args['change']:
+            # Convert change into ref based on zuul connection
+            ref_filter = project.source.getRefForChange(args['change'])
+        elif args['ref']:
+            ref_filter = "%s" % args['ref']
+        else:
+            # Neither change nor ref given: use a filter matching any ref.
+            ref_filter = ".*"
+
         params['job_name'] = args['job']
+        params['ref_filter'] = ref_filter
         params['reason'] = args['reason']
 
         if args['count'] < 0:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2130ede..1b65871 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -19,6 +19,7 @@
 import logging
 import os
 import pickle
+import re
 import queue
 import socket
 import sys
@@ -435,8 +436,9 @@
         self.last_reconfigured = int(time.time())
         # TODOv3(jeblair): reconfigure time should be per-tenant
 
-    def autohold(self, tenant_name, project_name, job_name, reason, count):
-        key = (tenant_name, project_name, job_name)
+    def autohold(self, tenant_name, project_name, job_name, ref_filter,
+                 reason, count):
+        key = (tenant_name, project_name, job_name, ref_filter)
         if count == 0 and key in self.autohold_requests:
             self.log.debug("Removing autohold for %s", key)
             del self.autohold_requests[key]
@@ -972,6 +974,66 @@
             self.log.exception("Exception estimating build time:")
         pipeline.manager.onBuildStarted(event.build)
 
+    def _getAutoholdRequestKey(self, build):
+        change = build.build_set.item.change
+
+        autohold_key_base = (build.pipeline.layout.tenant.name,
+                             change.project.canonical_name,
+                             build.job.name)
+
+        class Scope(object):
+            """Enum defining a precedence/priority of autohold requests.
+
+            Autohold requests for specific refs should be fulfilled first,
+            before those for changes, and generic jobs.
+
+            Matching algorithm goes over all existing autohold requests, and
+            returns one with the highest number (in case of duplicated
+            requests the last one wins).
+            """
+            NONE = 0
+            JOB = 1
+            CHANGE = 2
+            REF = 3
+
+        def autohold_key_base_issubset(base, request_key):
+            """check whether the given key is a subset of the build key"""
+            index = 0
+            base_len = len(base)
+            while index < base_len:
+                if base[index] != request_key[index]:
+                    return False
+                index += 1
+            return True
+
+        # Do a partial match of the autohold key against all autohold
+        # requests, ignoring the last element of the key (ref filter),
+        # and finally do a regex match between ref filter from
+        # the autohold request and the build's change ref to check
+        # if it matches. Lastly, make sure that we match the most
+        # specific autohold request by comparing "scopes"
+        # of requests - the most specific is selected.
+        autohold_key = None
+        scope = Scope.NONE
+        for request in self.autohold_requests:
+            ref_filter = request[-1]
+            if not autohold_key_base_issubset(autohold_key_base, request) \
+                or not re.match(ref_filter, change.ref):
+                continue
+
+            if ref_filter == ".*":
+                candidate_scope = Scope.JOB
+            elif ref_filter.endswith(".*"):
+                candidate_scope = Scope.CHANGE
+            else:
+                candidate_scope = Scope.REF
+
+            if candidate_scope > scope:
+                scope = candidate_scope
+                autohold_key = request
+
+        return autohold_key
+
     def _doBuildCompletedEvent(self, event):
         build = event.build
 
@@ -980,11 +1042,9 @@
         # the nodes to nodepool.
         try:
             nodeset = build.nodeset
-            autohold_key = (build.pipeline.layout.tenant.name,
-                            build.build_set.item.change.project.canonical_name,
-                            build.job.name)
-            if (build.result == "FAILURE" and
-                autohold_key in self.autohold_requests):
+            autohold_key = self._getAutoholdRequestKey(build)
+
+            if (build.result == "FAILURE" and autohold_key):
                 # We explicitly only want to hold nodes for jobs if they have
                 # failed and have an autohold request.
                 try:
@@ -1035,8 +1095,8 @@
         request_id = event.request_id
         build_set = request.build_set
 
-        self.nodepool.acceptNodes(request, request_id)
-        if request.canceled:
+        ready = self.nodepool.acceptNodes(request, request_id)
+        if not ready:
             return
 
         if build_set is not build_set.item.current_build_set: