Implement autohold

Adds the 'autohold' client subcommand, its implementation in the
scheduler, and a unit test for it.

The autohold request is automatically removed from the in-memory data
structure once the requested number of runs of the job has been
reached.

Story: 2000905
Change-Id: Ieac0b5fee6801313fa23cce69520eb348735ad99
diff --git a/doc/source/admin/client.rst b/doc/source/admin/client.rst
index 6b62360..5cdf919 100644
--- a/doc/source/admin/client.rst
+++ b/doc/source/admin/client.rst
@@ -22,6 +22,14 @@
 
 The following subcommands are supported:
 
+Autohold
+^^^^^^^^
+.. program-output:: zuul autohold --help
+
+Example::
+
+  zuul autohold --tenant openstack --project example_project --job example_job --count 1
+
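+A count of 0 removes an existing autohold request for the same tenant,
+project and job (this mirrors the scheduler handling added in this
+change)::
+
+  zuul autohold --tenant openstack --project example_project --job example_job --count 0
+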
 Enqueue
 ^^^^^^^
 .. program-output:: zuul enqueue --help
diff --git a/tests/fixtures/layouts/autohold.yaml b/tests/fixtures/layouts/autohold.yaml
new file mode 100644
index 0000000..015e562
--- /dev/null
+++ b/tests/fixtures/layouts/autohold.yaml
@@ -0,0 +1,24 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: project-test2
+    nodes:
+      - name: controller
+        label: label1
+
+- project:
+    name: org/project
+    check:
+      jobs:
+        - project-test2
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 5dd3f4e..e38dd84 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -1434,6 +1434,58 @@
         self.assertEqual(self.getJobFromHistory('project-test2').result,
                          'FAILURE')
 
+    @simple_layout('layouts/autohold.yaml')
+    def test_autohold(self):
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        self.addCleanup(client.shutdown)
+        r = client.autohold('tenant-one', 'org/project', 'project-test2', 1)
+        self.assertTrue(r)
+
+        self.executor_server.failJob('project-test2', A)
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(A.reported, 1)
+        self.assertEqual(self.getJobFromHistory('project-test2').result,
+                         'FAILURE')
+
+        # Check nodepool for a held node
+        held_node = None
+        for node in self.fake_nodepool.getNodes():
+            if node['state'] == zuul.model.STATE_HOLD:
+                held_node = node
+                break
+        self.assertIsNotNone(held_node)
+
+        # Validate node has recorded the failed job
+        self.assertEqual(
+            held_node['hold_job'],
+            " ".join(['tenant-one',
+                      'review.example.com/org/project',
+                      'project-test2'])
+        )
+
+        # Another failed change should not hold any more nodes
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        self.executor_server.failJob('project-test2', B)
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertEqual(B.data['status'], 'NEW')
+        self.assertEqual(B.reported, 1)
+        self.assertEqual(self.getJobFromHistory('project-test2').result,
+                         'FAILURE')
+
+        held_nodes = 0
+        for node in self.fake_nodepool.getNodes():
+            if node['state'] == zuul.model.STATE_HOLD:
+                held_nodes += 1
+        self.assertEqual(held_nodes, 1)
+
     @simple_layout('layouts/three-projects.yaml')
     def test_dependent_behind_dequeue(self):
         # This particular test does a large amount of merges and needs a little
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index b55aed8..03eabce 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -46,6 +46,19 @@
                                            description='valid commands',
                                            help='additional help')
 
+        cmd_autohold = subparsers.add_parser(
+            'autohold', help='hold nodes for failed job')
+        cmd_autohold.add_argument('--tenant', help='tenant name',
+                                  required=True)
+        cmd_autohold.add_argument('--project', help='project name',
+                                  required=True)
+        cmd_autohold.add_argument('--job', help='job name',
+                                  required=True)
+        cmd_autohold.add_argument('--count',
+                                  help='number of job runs (default: 1)',
+                                  required=False, type=int, default=1)
+        cmd_autohold.set_defaults(func=self.autohold)
+
         cmd_enqueue = subparsers.add_parser('enqueue', help='enqueue a change')
         cmd_enqueue.add_argument('--tenant', help='tenant name',
                                  required=True)
@@ -137,6 +150,15 @@
         else:
             sys.exit(1)
 
+    def autohold(self):
+        client = zuul.rpcclient.RPCClient(
+            self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
+        r = client.autohold(tenant_name=self.args.tenant,
+                            project_name=self.args.project,
+                            job_name=self.args.job,
+                            count=self.args.count)
+        return r
+
     def enqueue(self):
         client = zuul.rpcclient.RPCClient(
             self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
diff --git a/zuul/model.py b/zuul/model.py
index ed50164..e7043f3 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -356,6 +356,7 @@
         self.label = label
         self.id = None
         self.lock = None
+        self.hold_job = None
         # Attributes from Nodepool
         self._state = 'unknown'
         self.state_time = time.time()
@@ -396,6 +397,7 @@
     def toDict(self):
         d = {}
         d['state'] = self.state
+        d['hold_job'] = self.hold_job
         for k in self._keys:
             d[k] = getattr(self, k)
         return d
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 8f6489c..f677810 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -44,6 +44,34 @@
             except Exception:
                 self.log.exception("Error deleting node request:")
 
+    def holdNodeSet(self, nodeset, autohold_key):
+        '''
+        If requested, perform a hold on the given set of nodes.
+
+        :param NodeSet nodeset: The object containing the set of nodes to hold.
+        :param tuple autohold_key: A tuple of the (tenant, project, job) names
+            associated with the given NodeSet.
+        '''
+        if autohold_key not in self.sched.autohold_requests:
+            return
+
+        hold_iterations = self.sched.autohold_requests[autohold_key]
+        nodes = nodeset.getNodes()
+
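+        # Mark each node as held and record which autohold request it
+        # belongs to; hold_job ("tenant project job") is what
+        # heldNodeCount() matches against later.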
+        for node in nodes:
+            node.state = model.STATE_HOLD
+            node.hold_job = " ".join(autohold_key)
+            self.sched.zk.storeNode(node)
+
+        # Remove the autohold request once the number of held nodes is
+        # equal to or greater than the number of nodes used in a single
+        # job run times the number of requested run iterations. (It may
+        # be greater because the requested iteration count can be
+        # altered while nodes are already held.)
+        nodes_in_hold = self.sched.zk.heldNodeCount(autohold_key)
+        if nodes_in_hold >= len(nodes) * hold_iterations:
+            self.log.debug("Removing autohold for %s", autohold_key)
+            del self.sched.autohold_requests[autohold_key]
+
     def useNodeSet(self, nodeset):
         self.log.info("Setting nodeset %s in use" % (nodeset,))
         for node in nodeset.getNodes():
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index fd3517f..ee0c7d2 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -48,6 +48,13 @@
         self.log.debug("Job complete, success: %s" % (not job.failure))
         return job
 
+    def autohold(self, tenant_name, project_name, job_name, count):
+        data = {'tenant_name': tenant_name,
+                'project_name': project_name,
+                'job_name': job_name,
+                'count': count}
+        return not self.submitJob('zuul:autohold', data).failure
+
     def enqueue(self, tenant, pipeline, project, trigger, change):
         data = {'tenant': tenant,
                 'pipeline': pipeline,
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 6543c91..ae948eb 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -49,6 +49,7 @@
         self.thread.start()
 
     def register(self):
+        self.worker.registerFunction("zuul:autohold")
         self.worker.registerFunction("zuul:enqueue")
         self.worker.registerFunction("zuul:enqueue_ref")
         self.worker.registerFunction("zuul:promote")
@@ -89,6 +90,38 @@
             except Exception:
                 self.log.exception("Exception while getting job")
 
+    def handle_autohold(self, job):
+        args = json.loads(job.arguments)
+        params = {}
+
+        tenant = self.sched.abide.tenants.get(args['tenant_name'])
+        if tenant:
+            params['tenant_name'] = args['tenant_name']
+        else:
+            error = "Invalid tenant: %s" % args['tenant_name']
+            job.sendWorkException(error.encode('utf8'))
+            return
+
+        (trusted, project) = tenant.getProject(args['project_name'])
+        if project:
+            params['project_name'] = project.canonical_name
+        else:
+            error = "Invalid project: %s" % args['project_name']
+            job.sendWorkException(error.encode('utf8'))
+            return
+
+        params['job_name'] = args['job_name']
+
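+        # Negative counts are rejected; a count of 0 is allowed and is
+        # treated by the scheduler as a request to remove an existing
+        # autohold entry.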
+        if args['count'] < 0:
+            error = "Invalid count: %d" % args['count']
+            job.sendWorkException(error.encode('utf8'))
+            return
+
+        params['count'] = args['count']
+
+        self.sched.autohold(**params)
+        job.sendWorkComplete()
+
     def _common_enqueue(self, job):
         args = json.loads(job.arguments)
         event = model.TriggerEvent()
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2217b0b..7eb8a69 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -231,6 +231,7 @@
         self.zuul_version = zuul_version.version_info.release_string()
         self.last_reconfigured = None
         self.tenant_last_reconfigured = {}
+        self.autohold_requests = {}
 
     def stop(self):
         self._stopped = True
@@ -349,6 +350,15 @@
         self.last_reconfigured = int(time.time())
         # TODOv3(jeblair): reconfigure time should be per-tenant
 
+    def autohold(self, tenant_name, project_name, job_name, count):
+        key = (tenant_name, project_name, job_name)
+        if count == 0 and key in self.autohold_requests:
+            self.log.debug("Removing autohold for %s", key)
+            del self.autohold_requests[key]
+        else:
+            self.log.debug("Autohold requested for %s", key)
+            self.autohold_requests[key] = count
+
     def promote(self, tenant_name, pipeline_name, change_ids):
         event = PromoteEvent(tenant_name, pipeline_name, change_ids)
         self.management_event_queue.put(event)
@@ -828,6 +838,16 @@
         # the nodes to nodepool.
         try:
             nodeset = build.build_set.getJobNodeSet(build.job.name)
+            autohold_key = (build.pipeline.layout.tenant.name,
+                            build.build_set.item.change.project.canonical_name,
+                            build.job.name)
+
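+            # Hold the nodes if a matching autohold request exists; a
+            # failure here should not prevent the nodeset from being
+            # returned below.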
+            try:
+                self.nodepool.holdNodeSet(nodeset, autohold_key)
+            except Exception:
+                self.log.exception("Unable to process autohold for %s",
+                                   autohold_key)
+
             self.nodepool.returnNodeSet(nodeset)
         except Exception:
             self.log.exception("Unable to return nodeset %s" % (nodeset,))
diff --git a/zuul/zk.py b/zuul/zk.py
index 31b85ea..1ce0428 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -15,10 +15,14 @@
 import json
 import logging
 import time
+
 from kazoo.client import KazooClient, KazooState
 from kazoo import exceptions as kze
 from kazoo.recipe.lock import Lock
 
+import zuul.model
+
+
 # States:
 # We are building this node but it is not ready for use.
 BUILDING = 'building'
@@ -246,3 +250,25 @@
             raise LockException("Node %s does not hold a lock" % (node,))
         node.lock.release()
         node.lock = None
+
+    def heldNodeCount(self, autohold_key):
+        '''
+        Count the number of nodes being held for the given tenant/project/job.
+
+        :param tuple autohold_key: A tuple of the (tenant, project, job) names.
+
+        :returns: The number of nodes in the HOLD state matching the key.
+        '''
+        identifier = " ".join(autohold_key)
+        try:
+            nodes = self.client.get_children(self.NODE_ROOT)
+        except kze.NoNodeError:
+            return 0
+
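+        # Walk every node znode and count those in the HOLD state whose
+        # hold_job matches this autohold key.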
+        count = 0
+        for nodeid in nodes:
+            node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
+            node_data, node_stat = self.client.get(node_path)
+            node_data = self._strToDict(node_data)
+            if (node_data['state'] == zuul.model.STATE_HOLD and
+                    node_data.get('hold_job') == identifier):
+                count += 1
+        return count