Merge "New client command for printing autohold requests" into feature/zuulv3
diff --git a/tests/base.py b/tests/base.py
index 035ff0c..797bfe6 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1392,12 +1392,12 @@
         self.recordResult(result)
         return result
 
-    def runAnsible(self, cmd, timeout, playbook):
+    def runAnsible(self, cmd, timeout, playbook, wrapped=True):
         build = self.executor_server.job_builds[self.job.unique]
 
         if self.executor_server._run_ansible:
             result = super(RecordingAnsibleJob, self).runAnsible(
-                cmd, timeout, playbook)
+                cmd, timeout, playbook, wrapped)
         else:
             if playbook.path:
                 result = build.run()
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 6efc43f..9090421 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -875,7 +875,8 @@
         # already (without approvals), we need to clear the cache
         # first.
         for connection in self.connections.connections.values():
-            connection.maintainCache([])
+            if hasattr(connection, '_change_cache'):
+                connection._change_cache.clear()
 
         self.executor_server.hold_jobs_in_build = True
         A.addApproval('Approved', 1)
@@ -945,7 +946,8 @@
 
         self.log.debug("len %s" % self.fake_gerrit._change_cache.keys())
         # there should still be changes in the cache
-        self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0)
+        self.assertNotEqual(len(list(self.fake_gerrit._change_cache.keys())),
+                            0)
 
         self.executor_server.hold_jobs_in_build = False
         self.executor_server.release()
@@ -3931,7 +3933,8 @@
         self.assertEqual(B.data['status'], 'NEW')
 
         for connection in self.connections.connections.values():
-            connection.maintainCache([])
+            if hasattr(connection, '_change_cache'):
+                connection._change_cache.clear()
 
         self.executor_server.hold_jobs_in_build = True
         B.addApproval('Approved', 1)
diff --git a/zuul/ansible/filter/zuul_filters.py b/zuul/ansible/filter/zuul_filters.py
index de25548..4092c06 100644
--- a/zuul/ansible/filter/zuul_filters.py
+++ b/zuul/ansible/filter/zuul_filters.py
@@ -36,7 +36,7 @@
             ['%s:%s:refs/changes/%s/%s/%s' % (
                 i['project']['name'],
                 i['branch'],
-                str(i['change'])[:-2:],
+                str(i['change'])[-2:],
                 i['change'],
                 i['patchset'])
              for i in zuul['items']])
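The corrected slice above matters because Gerrit shards change refs by the last two digits of the change number: change 12345 lives under refs/changes/45/12345/<patchset>, and the old [:-2:] slice produced everything except those two digits. A minimal sketch of the string the filter now builds, using made-up values:

    # Illustrative only; project, branch, change and patchset are invented.
    item = {'project': {'name': 'org/project'}, 'branch': 'master',
            'change': 12345, 'patchset': 2}
    ref = '%s:%s:refs/changes/%s/%s/%s' % (
        item['project']['name'],
        item['branch'],
        str(item['change'])[-2:],   # last two digits: '45'
        item['change'],
        item['patchset'])
    # -> 'org/project:master:refs/changes/45/12345/2'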
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index b44fa46..ca10f21 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -68,13 +68,6 @@
     def registerScheduler(self, sched):
         self.sched = sched
 
-    def maintainCache(self, relevant):
-        """Make cache contain relevant changes.
-
-        This lets the user supply a list of change objects that are
-        still in use.  Anything in our cache that isn't in the supplied
-        list should be safe to remove from the cache."""
-
     def registerWebapp(self, webapp):
         self.webapp = webapp
 
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 83871e3..343c305 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -23,9 +23,10 @@
 import pprint
 import shlex
 import queue
-import voluptuous as v
+import weakref
 
 from typing import Dict, List
+import voluptuous as v
 
 from zuul.connection import BaseConnection
 from zuul.model import Ref, Tag, Branch, Project
@@ -142,7 +143,8 @@
         # cache as it may be a dependency
         if event.change_number:
             refresh = True
-            if event.change_number not in self.connection._change_cache:
+            if ((event.change_number, event.patch_number) not in
+                self.connection._change_cache):
                 refresh = False
                 for tenant in self.connection.sched.abide.tenants.values():
                     # TODO(fungi): it would be better to have some simple means
@@ -300,7 +302,7 @@
         self.baseurl = self.connection_config.get('baseurl',
                                                   'https://%s' % self.server)
 
-        self._change_cache = {}
+        self._change_cache = weakref.WeakValueDictionary()
         self.projects = {}
         self.gerrit_event_connector = None
         self.source = driver.getSource(self)
@@ -311,22 +313,6 @@
     def addProject(self, project: Project) -> None:
         self.projects[project.name] = project
 
-    def maintainCache(self, relevant):
-        # This lets the user supply a list of change objects that are
-        # still in use.  Anything in our cache that isn't in the supplied
-        # list should be safe to remove from the cache.
-        remove = {}
-        for change_number, patchsets in self._change_cache.items():
-            for patchset, change in patchsets.items():
-                if change not in relevant:
-                    remove.setdefault(change_number, [])
-                    remove[change_number].append(patchset)
-        for change_number, patchsets in remove.items():
-            for patchset in patchsets:
-                del self._change_cache[change_number][patchset]
-            if not self._change_cache[change_number]:
-                del self._change_cache[change_number]
-
     def getChange(self, event, refresh=False):
         if event.change_number:
             change = self._getChange(event.change_number, event.patch_number,
@@ -371,22 +357,19 @@
         return change
 
     def _getChange(self, number, patchset, refresh=False, history=None):
-        change = self._change_cache.get(number, {}).get(patchset)
+        change = self._change_cache.get((number, patchset))
         if change and not refresh:
             return change
         if not change:
             change = GerritChange(None)
             change.number = number
             change.patchset = patchset
-        self._change_cache.setdefault(change.number, {})
-        self._change_cache[change.number][change.patchset] = change
+        self._change_cache[(change.number, change.patchset)] = change
         try:
             self._updateChange(change, history)
         except Exception:
-            if self._change_cache.get(change.number, {}).get(change.patchset):
-                del self._change_cache[change.number][change.patchset]
-                if not self._change_cache[change.number]:
-                    del self._change_cache[change.number]
+            if self._change_cache.get((change.number, change.patchset)):
+                del self._change_cache[(change.number, change.patchset)]
             raise
         return change
 
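Keying the cache by (number, patchset) tuples in a weakref.WeakValueDictionary is what makes the explicit maintainCache() walk above (and the one removed from the GitHub driver below) unnecessary: an entry lives only as long as something else, such as a pipeline item, holds a strong reference to the change. A small self-contained sketch of that behavior, using a stand-in class rather than GerritChange:

    import weakref

    class Change(object):
        # Stand-in for GerritChange; only needs to support weak references.
        def __init__(self, number, patchset):
            self.number = number
            self.patchset = patchset

    cache = weakref.WeakValueDictionary()
    change = Change(1000, 1)
    cache[(1000, 1)] = change
    assert (1000, 1) in cache      # kept alive by the strong 'change' reference
    del change                     # drop the last strong reference
    assert (1000, 1) not in cache  # gone immediately on CPython; after GC elsewhere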
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 3d0eb37..46c8ee5 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -21,6 +21,7 @@
 import threading
 import time
 import re
+import weakref
 
 import cachecontrol
 from cachecontrol.cache import DictCache
@@ -394,7 +395,7 @@
     def __init__(self, driver, connection_name, connection_config):
         super(GithubConnection, self).__init__(
             driver, connection_name, connection_config)
-        self._change_cache = {}
+        self._change_cache = weakref.WeakValueDictionary()
         self._project_branch_cache = {}
         self.projects = {}
         self.git_ssh_key = self.connection_config.get('sshkey')
@@ -567,11 +568,6 @@
         # authenticated, if not then anonymous is the best we have.
         return self._github
 
-    def maintainCache(self, relevant):
-        for key, change in self._change_cache.items():
-            if change not in relevant:
-                del self._change_cache[key]
-
     def getChange(self, event, refresh=False):
         """Get the change representing an event."""
 
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index d4e6736..5998922 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -47,6 +47,11 @@
 DEFAULT_FINGER_PORT = 79
 
 
+class StopException(Exception):
+    """An exception raised when an inner loop is asked to stop."""
+    pass
+
+
 class ExecutorError(Exception):
     """A non-transient run-time executor error
 
@@ -286,6 +291,7 @@
         #     inventory.yaml
         #   .ansible (mounted in bwrap read-write)
         #     fact-cache/localhost
+        #     cp
         #   playbook_0 (mounted in bwrap for each playbook read-only)
         #     secrets.yaml
         #     project -> ../trusted/project_0/...
@@ -326,6 +332,8 @@
         self.ansible_cache_root = os.path.join(self.root, '.ansible')
         self.fact_cache = os.path.join(self.ansible_cache_root, 'fact-cache')
         os.makedirs(self.fact_cache)
+        self.control_path = os.path.join(self.ansible_cache_root, 'cp')
+        os.makedirs(self.control_path)
         localhost_facts = os.path.join(self.fact_cache, 'localhost')
         # NOTE(pabelanger): We do not want to leak zuul-executor facts to other
         # playbooks now that smart fact gathering is enabled by default.  We
@@ -555,6 +563,8 @@
         self.aborted = True
         self.aborted_reason = reason
         self.abortRunningProc()
+
+    def wait(self):
         if self.thread:
             self.thread.join()
 
@@ -701,6 +711,11 @@
         self.job.sendWorkStatus(0, 100)
 
         result = self.runPlaybooks(args)
+
+        # Stop the persistent SSH connections.
+        setup_status, setup_code = self.runAnsibleCleanup(
+            self.jobdir.setup_playbook)
+
         if self.aborted_reason == self.RESULT_DISK_FULL:
             result = 'DISK_FULL'
         data = self.getResultData()
@@ -1198,6 +1213,7 @@
             # command which expects interactive input on a tty (such
             # as sudo) it does not hang.
             config.write('pipelining = True\n')
+            config.write('control_path_dir = %s\n' % self.jobdir.control_path)
             ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
                 "-o UserKnownHostsFile=%s" % self.jobdir.known_hosts
             config.write('ssh_args = %s\n' % ssh_args)
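Pointing control_path_dir at the per-job .ansible/cp directory created above keeps Ansible's ControlPersist sockets inside the build tree, so the persistent SSH connections belong to the job and can be cleaned up with it (the disabled runAnsibleCleanup() below is meant to reset them explicitly once the Ansible fix lands). Assuming these writes land in the [ssh_connection] section, the generated ansible.cfg ends up containing roughly the following, with paths illustrative:

    [ssh_connection]
    pipelining = True
    control_path_dir = <jobdir>/.ansible/cp
    ssh_args = -o ControlMaster=auto -o ControlPersist=60s -o UserKnownHostsFile=<jobdir>/known_hosts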
@@ -1219,7 +1235,7 @@
             except Exception:
                 self.log.exception("Exception while killing ansible process:")
 
-    def runAnsible(self, cmd, timeout, playbook):
+    def runAnsible(self, cmd, timeout, playbook, wrapped=True):
         config_file = playbook.ansible_config
         env_copy = os.environ.copy()
         env_copy.update(self.ssh_agent.env)
@@ -1260,8 +1276,12 @@
         if playbook.secrets_content:
             secrets[playbook.secrets] = playbook.secrets_content
 
-        context = self.executor_server.execution_wrapper.getExecutionContext(
-            ro_paths, rw_paths, secrets)
+        if wrapped:
+            wrapper = self.executor_server.execution_wrapper
+        else:
+            wrapper = self.executor_server.connections.drivers['nullwrap']
+
+        context = wrapper.getExecutionContext(ro_paths, rw_paths, secrets)
 
         popen = context.getPopen(
             work_dir=self.jobdir.work_root,
@@ -1280,6 +1300,7 @@
             self.proc = popen(
                 cmd,
                 cwd=self.jobdir.work_root,
+                stdin=subprocess.DEVNULL,
                 stdout=subprocess.PIPE,
                 stderr=subprocess.STDOUT,
                 preexec_fn=os.setsid,
@@ -1376,7 +1397,28 @@
                '-a', 'gather_subset=!all']
 
         result, code = self.runAnsible(
-            cmd=cmd, timeout=60, playbook=playbook)
+            cmd=cmd, timeout=60, playbook=playbook,
+            wrapped=False)
+        self.log.debug("Ansible complete, result %s code %s" % (
+            self.RESULT_MAP[result], code))
+        return result, code
+
+    def runAnsibleCleanup(self, playbook):
+        # TODO(jeblair): This requires a bugfix in Ansible 2.4
+        # Once this is used, increase the controlpersist timeout.
+        return (self.RESULT_NORMAL, 0)
+
+        if self.executor_server.verbose:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+
+        cmd = ['ansible', '*', verbose, '-m', 'meta',
+               '-a', 'reset_connection']
+
+        result, code = self.runAnsible(
+            cmd=cmd, timeout=60, playbook=playbook,
+            wrapped=False)
         self.log.debug("Ansible complete, result %s code %s" % (
             self.RESULT_MAP[result], code))
         return result, code
@@ -1496,6 +1538,7 @@
         self.hostname = socket.gethostname()
         self.log_streaming_port = log_streaming_port
         self.merger_lock = threading.Lock()
+        self.run_lock = threading.Lock()
         self.verbose = False
         self.command_map = dict(
             stop=self.stop,
@@ -1631,8 +1674,9 @@
         self.merger_worker.registerFunction("merger:refstate")
 
     def register_work(self):
-        self.accepting_work = True
-        self.executor_worker.registerFunction("executor:execute")
+        if self._running:
+            self.accepting_work = True
+            self.executor_worker.registerFunction("executor:execute")
 
     def unregister_work(self):
         self.accepting_work = False
@@ -1641,26 +1685,57 @@
     def stop(self):
         self.log.debug("Stopping")
         self.disk_accountant.stop()
+        # The governor can change function registration, so make sure
+        # it has stopped.
         self.governor_stop_event.set()
-        self._running = False
-        self._command_running = False
+        self.governor_thread.join()
+        # Stop accepting new jobs
+        self.merger_worker.setFunctions([])
+        self.executor_worker.setFunctions([])
+        # Tell the executor worker to abort any jobs it just accepted,
+        # and grab the list of currently running job workers.
+        with self.run_lock:
+            self._running = False
+            self._command_running = False
+            workers = list(self.job_workers.values())
         self.command_socket.stop()
-        self.update_queue.put(None)
 
-        for job_worker in list(self.job_workers.values()):
+        for job_worker in workers:
             try:
                 job_worker.stop()
             except Exception:
                 self.log.exception("Exception sending stop command "
                                    "to worker:")
+        for job_worker in workers:
+            try:
+                job_worker.wait()
+            except Exception:
+                self.log.exception("Exception waiting for worker "
+                                   "to stop:")
+
+        # Now that we aren't accepting any new jobs, and all of the
+        # running jobs have stopped, tell the update processor to
+        # stop.
+        self.update_queue.put(None)
+
+        # All job results should have been sent by now, shutdown the
+        # gearman workers.
         self.merger_worker.shutdown()
         self.executor_worker.shutdown()
+
         if self.statsd:
             base_key = 'zuul.executor.%s' % self.hostname
             self.statsd.gauge(base_key + '.load_average', 0)
             self.statsd.gauge(base_key + '.running_builds', 0)
+
         self.log.debug("Stopped")
 
+    def join(self):
+        self.governor_thread.join()
+        self.update_thread.join()
+        self.merger_thread.join()
+        self.executor_thread.join()
+
     def pause(self):
         # TODOv3: implement
         pass
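The comments above describe a deliberate shutdown ordering: silence the governor so it cannot re-register work, withdraw the gearman functions, flip _running under run_lock so late dispatches fail fast, abort and then wait for the running job workers, and only afterwards stop the update processor and shut the gearman workers down. Condensed into one place, with the Zuul-specific details elided, the sequence looks like this sketch (server stands in for the executor server object):

    def orderly_stop(server):
        server.governor_stop_event.set()         # 1. governor can no longer (de)register work
        server.governor_thread.join()
        server.executor_worker.setFunctions([])  # 2. stop accepting new jobs
        with server.run_lock:                    # 3. make late dispatches fail fast
            server._running = False
            workers = list(server.job_workers.values())
        for w in workers:                        # 4. abort every running job...
            w.stop()
        for w in workers:                        # ...then wait for each to finish
            w.wait()
        server.update_queue.put(None)            # 5. now the update processor may exit
        server.executor_worker.shutdown()        # 6. results sent; shut gearman down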
@@ -1685,12 +1760,6 @@
     def nokeep(self):
         self.keep_jobdir = False
 
-    def join(self):
-        self.update_thread.join()
-        self.merger_thread.join()
-        self.executor_thread.join()
-        self.governor_thread.join()
-
     def runCommand(self):
         while self._command_running:
             try:
@@ -1701,10 +1770,12 @@
                 self.log.exception("Exception while processing command")
 
     def _updateLoop(self):
-        while self._running:
+        while True:
             try:
                 self._innerUpdateLoop()
-            except:
+            except StopException:
+                return
+            except Exception:
                 self.log.exception("Exception in update thread:")
 
     def _innerUpdateLoop(self):
@@ -1712,7 +1783,7 @@
         task = self.update_queue.get()
         if task is None:
             # We are asked to stop
-            return
+            raise StopException()
         with self.merger_lock:
             self.log.info("Updating repo %s/%s" % (
                 task.connection_name, task.project_name))
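Raising StopException on the None sentinel, instead of checking self._running around the loop, lets the update thread keep its broad exception handler (a failed repo update must not kill the thread) while still giving stop() a deterministic way to end it. The pattern in isolation, with a toy task standing in for the real repo update:

    import queue
    import threading

    class StopException(Exception):
        pass

    def update_loop(q):
        while True:
            try:
                task = q.get()
                if task is None:           # sentinel pushed by stop()
                    raise StopException()
                print("updating", task)    # stand-in for the real repo update
            except StopException:
                return
            except Exception:
                pass                       # log and keep looping

    q = queue.Queue()
    t = threading.Thread(target=update_loop, args=(q,))
    t.start()
    q.put('review.example.org/org/project')
    q.put(None)                            # ask the loop to stop
    t.join()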
@@ -1733,18 +1804,7 @@
             try:
                 job = self.merger_worker.getJob()
                 try:
-                    if job.name == 'merger:cat':
-                        self.log.debug("Got cat job: %s" % job.unique)
-                        self.cat(job)
-                    elif job.name == 'merger:merge':
-                        self.log.debug("Got merge job: %s" % job.unique)
-                        self.merge(job)
-                    elif job.name == 'merger:refstate':
-                        self.log.debug("Got refstate job: %s" % job.unique)
-                        self.refstate(job)
-                    else:
-                        self.log.error("Unable to handle job %s" % job.name)
-                        job.sendWorkFail()
+                    self.mergerJobDispatch(job)
                 except Exception:
                     self.log.exception("Exception while running job")
                     job.sendWorkException(
@@ -1754,21 +1814,28 @@
             except Exception:
                 self.log.exception("Exception while getting job")
 
+    def mergerJobDispatch(self, job):
+        with self.run_lock:
+            if job.name == 'merger:cat':
+                self.log.debug("Got cat job: %s" % job.unique)
+                self.cat(job)
+            elif job.name == 'merger:merge':
+                self.log.debug("Got merge job: %s" % job.unique)
+                self.merge(job)
+            elif job.name == 'merger:refstate':
+                self.log.debug("Got refstate job: %s" % job.unique)
+                self.refstate(job)
+            else:
+                self.log.error("Unable to handle job %s" % job.name)
+                job.sendWorkFail()
+
     def run_executor(self):
         self.log.debug("Starting executor listener")
         while self._running:
             try:
                 job = self.executor_worker.getJob()
                 try:
-                    if job.name == 'executor:execute':
-                        self.log.debug("Got execute job: %s" % job.unique)
-                        self.executeJob(job)
-                    elif job.name.startswith('executor:stop'):
-                        self.log.debug("Got stop job: %s" % job.unique)
-                        self.stopJob(job)
-                    else:
-                        self.log.error("Unable to handle job %s" % job.name)
-                        job.sendWorkFail()
+                    self.executorJobDispatch(job)
                 except Exception:
                     self.log.exception("Exception while running job")
                     job.sendWorkException(
@@ -1778,9 +1845,20 @@
             except Exception:
                 self.log.exception("Exception while getting job")
 
-    def run_governor(self):
-        while not self.governor_stop_event.wait(30):
-            self.manageLoad()
+    def executorJobDispatch(self, job):
+        with self.run_lock:
+            if not self._running:
+                job.sendWorkFail()
+                return
+            if job.name == 'executor:execute':
+                self.log.debug("Got execute job: %s" % job.unique)
+                self.executeJob(job)
+            elif job.name.startswith('executor:stop'):
+                self.log.debug("Got stop job: %s" % job.unique)
+                self.stopJob(job)
+            else:
+                self.log.error("Unable to handle job %s" % job.name)
+                job.sendWorkFail()
 
     def executeJob(self, job):
         if self.statsd:
@@ -1789,6 +1867,10 @@
         self.job_workers[job.unique] = self._job_class(self, job)
         self.job_workers[job.unique].run()
 
+    def run_governor(self):
+        while not self.governor_stop_event.wait(30):
+            self.manageLoad()
+
     def manageLoad(self):
         ''' Apply some heuristics to decide whether or not we should
             be asking for more jobs '''
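Dispatching merger and executor jobs under run_lock closes a race with the reworked stop(): stop() takes the same lock while it sets _running to False and snapshots job_workers, so a job that gearman hands over mid-shutdown is either fully dispatched before the flip or rejected cleanly with sendWorkFail() afterwards, never left half-started. A toy model of that handshake (not the real executor API):

    import threading

    class MiniExecutor(object):
        def __init__(self):
            self.run_lock = threading.Lock()
            self._running = True
            self.job_workers = {}

        def dispatch(self, job):
            with self.run_lock:
                if not self._running:
                    job.sendWorkFail()     # arrived during shutdown
                    return
                self.job_workers[job.unique] = job
                job.start()                # stand-in for executeJob()

        def stop(self):
            with self.run_lock:
                self._running = False
                workers = list(self.job_workers.values())
            for worker in workers:
                worker.stop()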
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e5924f8..026763b 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -595,8 +595,6 @@
 
             self._reenqueueTenant(old_tenant, tenant)
 
-        # TODOv3(jeblair): update for tenants
-        # self.maintainConnectionCache()
         self.connections.reconfigureDrivers(tenant)
 
         # TODOv3(jeblair): remove postconfig calls?
@@ -728,23 +726,6 @@
             finally:
                 self.run_handler_lock.release()
 
-    def maintainConnectionCache(self):
-        # TODOv3(jeblair): update for tenants
-        relevant = set()
-        for tenant in self.abide.tenants.values():
-            for pipeline in tenant.layout.pipelines.values():
-                self.log.debug("Gather relevant cache items for: %s" %
-                               pipeline)
-
-                for item in pipeline.getAllItems():
-                    relevant.add(item.change)
-                    relevant.update(item.change.getRelatedChanges())
-        for connection in self.connections.values():
-            connection.maintainCache(relevant)
-            self.log.debug(
-                "End maintain connection cache for: %s" % connection)
-        self.log.debug("Connection cache size: %s" % len(relevant))
-
     def process_event_queue(self):
         self.log.debug("Fetching trigger event")
         event = self.trigger_event_queue.get()