Merge "New client command for printing autohold requests" into feature/zuulv3
diff --git a/tests/base.py b/tests/base.py
index 035ff0c..797bfe6 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1392,12 +1392,12 @@
self.recordResult(result)
return result
- def runAnsible(self, cmd, timeout, playbook):
+ def runAnsible(self, cmd, timeout, playbook, wrapped=True):
build = self.executor_server.job_builds[self.job.unique]
if self.executor_server._run_ansible:
result = super(RecordingAnsibleJob, self).runAnsible(
- cmd, timeout, playbook)
+ cmd, timeout, playbook, wrapped)
else:
if playbook.path:
result = build.run()
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 6efc43f..9090421 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -875,7 +875,8 @@
# already (without approvals), we need to clear the cache
# first.
for connection in self.connections.connections.values():
- connection.maintainCache([])
+ if hasattr(connection, '_change_cache'):
+ connection._change_cache.clear()
self.executor_server.hold_jobs_in_build = True
A.addApproval('Approved', 1)
@@ -945,7 +946,8 @@
self.log.debug("len %s" % self.fake_gerrit._change_cache.keys())
# there should still be changes in the cache
- self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0)
+ self.assertNotEqual(len(list(self.fake_gerrit._change_cache.keys())),
+ 0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@@ -3931,7 +3933,8 @@
self.assertEqual(B.data['status'], 'NEW')
for connection in self.connections.connections.values():
- connection.maintainCache([])
+ if hasattr(connection, '_change_cache'):
+ connection._change_cache.clear()
self.executor_server.hold_jobs_in_build = True
B.addApproval('Approved', 1)
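The two test hunks above replace the old connection.maintainCache([]) call with a direct clear of the per-connection change cache, guarded by hasattr because not every connection type keeps one. A minimal sketch of that pattern (the fake connection classes here are illustrative stand-ins, not the real test fixtures):

    import weakref

    class FakeGerritConnection:            # keeps a change cache
        def __init__(self):
            self._change_cache = weakref.WeakValueDictionary()

    class FakeTimerConnection:             # keeps no change cache at all
        pass

    connections = {'gerrit': FakeGerritConnection(),
                   'timer': FakeTimerConnection()}
    for connection in connections.values():
        if hasattr(connection, '_change_cache'):
            connection._change_cache.clear()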
diff --git a/zuul/ansible/filter/zuul_filters.py b/zuul/ansible/filter/zuul_filters.py
index de25548..4092c06 100644
--- a/zuul/ansible/filter/zuul_filters.py
+++ b/zuul/ansible/filter/zuul_filters.py
@@ -36,7 +36,7 @@
['%s:%s:refs/changes/%s/%s/%s' % (
i['project']['name'],
i['branch'],
- str(i['change'])[:-2:],
+ str(i['change'])[-2:],
i['change'],
i['patchset'])
for i in zuul['items']])
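The one-character slice fix above matters because Gerrit shards its refs/changes namespace by the last two digits of the change number; [:-2:] dropped those digits instead of selecting them. A worked example:

    change, patchset = 471234, 3
    broken = str(change)[:-2:]   # '4712' -- everything except the last two digits
    fixed = str(change)[-2:]     # '34'   -- the shard directory Gerrit expects
    ref = 'refs/changes/%s/%s/%s' % (fixed, change, patchset)
    assert ref == 'refs/changes/34/471234/3'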
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index b44fa46..ca10f21 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -68,13 +68,6 @@
def registerScheduler(self, sched):
self.sched = sched
- def maintainCache(self, relevant):
- """Make cache contain relevant changes.
-
- This lets the user supply a list of change objects that are
- still in use. Anything in our cache that isn't in the supplied
- list should be safe to remove from the cache."""
-
def registerWebapp(self, webapp):
self.webapp = webapp
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 83871e3..343c305 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -23,9 +23,10 @@
import pprint
import shlex
import queue
-import voluptuous as v
+import weakref
from typing import Dict, List
+import voluptuous as v
from zuul.connection import BaseConnection
from zuul.model import Ref, Tag, Branch, Project
@@ -142,7 +143,8 @@
# cache as it may be a dependency
if event.change_number:
refresh = True
- if event.change_number not in self.connection._change_cache:
+ if ((event.change_number, event.patch_number) not in
+ self.connection._change_cache):
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
# TODO(fungi): it would be better to have some simple means
@@ -300,7 +302,7 @@
self.baseurl = self.connection_config.get('baseurl',
'https://%s' % self.server)
- self._change_cache = {}
+ self._change_cache = weakref.WeakValueDictionary()
self.projects = {}
self.gerrit_event_connector = None
self.source = driver.getSource(self)
@@ -311,22 +313,6 @@
def addProject(self, project: Project) -> None:
self.projects[project.name] = project
- def maintainCache(self, relevant):
- # This lets the user supply a list of change objects that are
- # still in use. Anything in our cache that isn't in the supplied
- # list should be safe to remove from the cache.
- remove = {}
- for change_number, patchsets in self._change_cache.items():
- for patchset, change in patchsets.items():
- if change not in relevant:
- remove.setdefault(change_number, [])
- remove[change_number].append(patchset)
- for change_number, patchsets in remove.items():
- for patchset in patchsets:
- del self._change_cache[change_number][patchset]
- if not self._change_cache[change_number]:
- del self._change_cache[change_number]
-
def getChange(self, event, refresh=False):
if event.change_number:
change = self._getChange(event.change_number, event.patch_number,
@@ -371,22 +357,19 @@
return change
def _getChange(self, number, patchset, refresh=False, history=None):
- change = self._change_cache.get(number, {}).get(patchset)
+ change = self._change_cache.get((number, patchset))
if change and not refresh:
return change
if not change:
change = GerritChange(None)
change.number = number
change.patchset = patchset
- self._change_cache.setdefault(change.number, {})
- self._change_cache[change.number][change.patchset] = change
+ self._change_cache[(change.number, change.patchset)] = change
try:
self._updateChange(change, history)
except Exception:
- if self._change_cache.get(change.number, {}).get(change.patchset):
- del self._change_cache[change.number][change.patchset]
- if not self._change_cache[change.number]:
- del self._change_cache[change.number]
+ if self._change_cache.get((change.number, change.patchset)):
+ del self._change_cache[(change.number, change.patchset)]
raise
return change
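With the cache now a weakref.WeakValueDictionary keyed by (change_number, patchset), an entry falls out of the cache automatically once nothing else (for example a pipeline item) holds a strong reference to the change, which is why the explicit maintainCache pruning above can be deleted. A small sketch of that behaviour (on CPython the collection is immediate thanks to reference counting):

    import weakref

    class Change:
        def __init__(self, number, patchset):
            self.number = number
            self.patchset = patchset

    cache = weakref.WeakValueDictionary()
    change = Change(1234, 2)
    cache[(1234, 2)] = change
    assert cache.get((1234, 2)) is change   # cached while referenced elsewhere
    del change                              # last strong reference goes away
    assert cache.get((1234, 2)) is None     # entry disappeared on its own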
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 3d0eb37..46c8ee5 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -21,6 +21,7 @@
import threading
import time
import re
+import weakref
import cachecontrol
from cachecontrol.cache import DictCache
@@ -394,7 +395,7 @@
def __init__(self, driver, connection_name, connection_config):
super(GithubConnection, self).__init__(
driver, connection_name, connection_config)
- self._change_cache = {}
+ self._change_cache = weakref.WeakValueDictionary()
self._project_branch_cache = {}
self.projects = {}
self.git_ssh_key = self.connection_config.get('sshkey')
@@ -567,11 +568,6 @@
# authenticated, if not then anonymous is the best we have.
return self._github
- def maintainCache(self, relevant):
- for key, change in self._change_cache.items():
- if change not in relevant:
- del self._change_cache[key]
-
def getChange(self, event, refresh=False):
"""Get the change representing an event."""
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index d4e6736..5998922 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -47,6 +47,11 @@
DEFAULT_FINGER_PORT = 79
+class StopException(Exception):
+ """An exception raised when an inner loop is asked to stop."""
+ pass
+
+
class ExecutorError(Exception):
"""A non-transient run-time executor error
@@ -286,6 +291,7 @@
# inventory.yaml
# .ansible (mounted in bwrap read-write)
# fact-cache/localhost
+ # cp
# playbook_0 (mounted in bwrap for each playbook read-only)
# secrets.yaml
# project -> ../trusted/project_0/...
@@ -326,6 +332,8 @@
self.ansible_cache_root = os.path.join(self.root, '.ansible')
self.fact_cache = os.path.join(self.ansible_cache_root, 'fact-cache')
os.makedirs(self.fact_cache)
+ self.control_path = os.path.join(self.ansible_cache_root, 'cp')
+ os.makedirs(self.control_path)
localhost_facts = os.path.join(self.fact_cache, 'localhost')
# NOTE(pabelanger): We do not want to leak zuul-executor facts to other
# playbooks now that smart fact gathering is enabled by default. We
@@ -555,6 +563,8 @@
self.aborted = True
self.aborted_reason = reason
self.abortRunningProc()
+
+ def wait(self):
if self.thread:
self.thread.join()
@@ -701,6 +711,11 @@
self.job.sendWorkStatus(0, 100)
result = self.runPlaybooks(args)
+
+ # Stop the persistent SSH connections.
+ setup_status, setup_code = self.runAnsibleCleanup(
+ self.jobdir.setup_playbook)
+
if self.aborted_reason == self.RESULT_DISK_FULL:
result = 'DISK_FULL'
data = self.getResultData()
@@ -1198,6 +1213,7 @@
# command which expects interactive input on a tty (such
# as sudo) it does not hang.
config.write('pipelining = True\n')
+ config.write('control_path_dir = %s\n' % self.jobdir.control_path)
ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
"-o UserKnownHostsFile=%s" % self.jobdir.known_hosts
config.write('ssh_args = %s\n' % ssh_args)
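The control_path_dir line above complements the existing ControlMaster/ControlPersist ssh_args: master SSH sockets now live under the per-job .ansible/cp directory created earlier in this diff, so they are removed along with the job dir. A rough, self-contained sketch of the resulting [ssh_connection] section (paths are made up for illustration):

    import os

    jobdir_root = '/tmp/zuul-jobdir-example'
    control_path = os.path.join(jobdir_root, '.ansible', 'cp')
    os.makedirs(control_path, exist_ok=True)

    with open(os.path.join(jobdir_root, 'ansible.cfg'), 'w') as config:
        config.write('[ssh_connection]\n')
        config.write('pipelining = True\n')
        config.write('control_path_dir = %s\n' % control_path)
        config.write('ssh_args = -o ControlMaster=auto -o ControlPersist=60s\n')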
@@ -1219,7 +1235,7 @@
except Exception:
self.log.exception("Exception while killing ansible process:")
- def runAnsible(self, cmd, timeout, playbook):
+ def runAnsible(self, cmd, timeout, playbook, wrapped=True):
config_file = playbook.ansible_config
env_copy = os.environ.copy()
env_copy.update(self.ssh_agent.env)
@@ -1260,8 +1276,12 @@
if playbook.secrets_content:
secrets[playbook.secrets] = playbook.secrets_content
- context = self.executor_server.execution_wrapper.getExecutionContext(
- ro_paths, rw_paths, secrets)
+ if wrapped:
+ wrapper = self.executor_server.execution_wrapper
+ else:
+ wrapper = self.executor_server.connections.drivers['nullwrap']
+
+ context = wrapper.getExecutionContext(ro_paths, rw_paths, secrets)
popen = context.getPopen(
work_dir=self.jobdir.work_root,
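The wrapped flag above lets trusted housekeeping runs (the setup play, and the cleanup play added further down) bypass the bubblewrap execution wrapper and use the nullwrap driver instead, which executes the command directly. A toy model of that selection, with stand-in classes rather than the real drivers:

    import subprocess

    class NullwrapContext:
        """No sandbox: hand back plain subprocess.Popen."""
        def getPopen(self, **kwargs):
            return subprocess.Popen

    class BwrapContext:
        """Sandbox: prefix the command with a (simplified) bwrap invocation."""
        def getPopen(self, **kwargs):
            def popen(cmd, **popen_kwargs):
                return subprocess.Popen(['bwrap', '--unshare-all'] + cmd,
                                        **popen_kwargs)
            return popen

    def get_execution_context(wrapped=True):
        return BwrapContext() if wrapped else NullwrapContext()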
@@ -1280,6 +1300,7 @@
self.proc = popen(
cmd,
cwd=self.jobdir.work_root,
+ stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
@@ -1376,7 +1397,28 @@
'-a', 'gather_subset=!all']
result, code = self.runAnsible(
- cmd=cmd, timeout=60, playbook=playbook)
+ cmd=cmd, timeout=60, playbook=playbook,
+ wrapped=False)
+ self.log.debug("Ansible complete, result %s code %s" % (
+ self.RESULT_MAP[result], code))
+ return result, code
+
+ def runAnsibleCleanup(self, playbook):
+ # TODO(jeblair): This requires a bugfix in Ansible 2.4
+ # Once this is used, increase the controlpersist timeout.
+ return (self.RESULT_NORMAL, 0)
+
+ if self.executor_server.verbose:
+ verbose = '-vvv'
+ else:
+ verbose = '-v'
+
+ cmd = ['ansible', '*', verbose, '-m', 'meta',
+ '-a', 'reset_connection']
+
+ result, code = self.runAnsible(
+ cmd=cmd, timeout=60, playbook=playbook,
+ wrapped=False)
self.log.debug("Ansible complete, result %s code %s" % (
self.RESULT_MAP[result], code))
return result, code
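runAnsibleCleanup is wired in but short-circuited by the early return until the referenced Ansible 2.4 fix is available; once enabled it would run Ansible's meta: reset_connection against every host to tear down the persistent SSH control masters left behind by ControlPersist. The eventual command, roughly (in the real code it goes through runAnsible with wrapped=False rather than a bare subprocess call):

    verbose = '-v'   # or '-vvv' when the executor is in verbose mode
    cmd = ['ansible', '*', verbose, '-m', 'meta', '-a', 'reset_connection']
    # e.g. subprocess.run(cmd, stdin=subprocess.DEVNULL, timeout=60)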
@@ -1496,6 +1538,7 @@
self.hostname = socket.gethostname()
self.log_streaming_port = log_streaming_port
self.merger_lock = threading.Lock()
+ self.run_lock = threading.Lock()
self.verbose = False
self.command_map = dict(
stop=self.stop,
@@ -1631,8 +1674,9 @@
self.merger_worker.registerFunction("merger:refstate")
def register_work(self):
- self.accepting_work = True
- self.executor_worker.registerFunction("executor:execute")
+ if self._running:
+ self.accepting_work = True
+ self.executor_worker.registerFunction("executor:execute")
def unregister_work(self):
self.accepting_work = False
@@ -1641,26 +1685,57 @@
def stop(self):
self.log.debug("Stopping")
self.disk_accountant.stop()
+ # The governor can change function registration, so make sure
+ # it has stopped.
self.governor_stop_event.set()
- self._running = False
- self._command_running = False
+ self.governor_thread.join()
+ # Stop accepting new jobs
+ self.merger_worker.setFunctions([])
+ self.executor_worker.setFunctions([])
+ # Tell the executor worker to abort any jobs it just accepted,
+ # and grab the list of currently running job workers.
+ with self.run_lock:
+ self._running = False
+ self._command_running = False
+ workers = list(self.job_workers.values())
self.command_socket.stop()
- self.update_queue.put(None)
- for job_worker in list(self.job_workers.values()):
+ for job_worker in workers:
try:
job_worker.stop()
except Exception:
self.log.exception("Exception sending stop command "
"to worker:")
+ for job_worker in workers:
+ try:
+ job_worker.wait()
+ except Exception:
+ self.log.exception("Exception waiting for worker "
+ "to stop:")
+
+ # Now that we aren't accepting any new jobs, and all of the
+ # running jobs have stopped, tell the update processor to
+ # stop.
+ self.update_queue.put(None)
+
+ # All job results should have been sent by now, shutdown the
+ # gearman workers.
self.merger_worker.shutdown()
self.executor_worker.shutdown()
+
if self.statsd:
base_key = 'zuul.executor.%s' % self.hostname
self.statsd.gauge(base_key + '.load_average', 0)
self.statsd.gauge(base_key + '.running_builds', 0)
+
self.log.debug("Stopped")
+ def join(self):
+ self.governor_thread.join()
+ self.update_thread.join()
+ self.merger_thread.join()
+ self.executor_thread.join()
+
def pause(self):
# TODOv3: implement
pass
@@ -1685,12 +1760,6 @@
def nokeep(self):
self.keep_jobdir = False
- def join(self):
- self.update_thread.join()
- self.merger_thread.join()
- self.executor_thread.join()
- self.governor_thread.join()
-
def runCommand(self):
while self._command_running:
try:
@@ -1701,10 +1770,12 @@
self.log.exception("Exception while processing command")
def _updateLoop(self):
- while self._running:
+ while True:
try:
self._innerUpdateLoop()
- except:
+ except StopException:
+ return
+ except Exception:
self.log.exception("Exception in update thread:")
def _innerUpdateLoop(self):
@@ -1712,7 +1783,7 @@
task = self.update_queue.get()
if task is None:
# We are asked to stop
- return
+ raise StopException()
with self.merger_lock:
self.log.info("Updating repo %s/%s" % (
task.connection_name, task.project_name))
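The StopException introduced above turns the None sentinel on the update queue into the only thing that can end the update thread: ordinary exceptions are logged and the loop continues, while the sentinel propagates out as StopException and returns. A simplified model of the pattern (single consumer thread assumed):

    import queue

    class StopException(Exception):
        pass

    update_queue = queue.Queue()

    def inner_update_loop():
        task = update_queue.get()
        if task is None:
            raise StopException()
        # ... update the repo described by task ...

    def update_loop():
        while True:
            try:
                inner_update_loop()
            except StopException:
                return
            except Exception:
                pass  # log and keep going; the thread must not die on errors

    update_queue.put(None)
    update_loop()   # returns immediately after consuming the sentinel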
@@ -1733,18 +1804,7 @@
try:
job = self.merger_worker.getJob()
try:
- if job.name == 'merger:cat':
- self.log.debug("Got cat job: %s" % job.unique)
- self.cat(job)
- elif job.name == 'merger:merge':
- self.log.debug("Got merge job: %s" % job.unique)
- self.merge(job)
- elif job.name == 'merger:refstate':
- self.log.debug("Got refstate job: %s" % job.unique)
- self.refstate(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
+ self.mergerJobDispatch(job)
except Exception:
self.log.exception("Exception while running job")
job.sendWorkException(
@@ -1754,21 +1814,28 @@
except Exception:
self.log.exception("Exception while getting job")
+ def mergerJobDispatch(self, job):
+ with self.run_lock:
+ if job.name == 'merger:cat':
+ self.log.debug("Got cat job: %s" % job.unique)
+ self.cat(job)
+ elif job.name == 'merger:merge':
+ self.log.debug("Got merge job: %s" % job.unique)
+ self.merge(job)
+ elif job.name == 'merger:refstate':
+ self.log.debug("Got refstate job: %s" % job.unique)
+ self.refstate(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+
def run_executor(self):
self.log.debug("Starting executor listener")
while self._running:
try:
job = self.executor_worker.getJob()
try:
- if job.name == 'executor:execute':
- self.log.debug("Got execute job: %s" % job.unique)
- self.executeJob(job)
- elif job.name.startswith('executor:stop'):
- self.log.debug("Got stop job: %s" % job.unique)
- self.stopJob(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
+ self.executorJobDispatch(job)
except Exception:
self.log.exception("Exception while running job")
job.sendWorkException(
@@ -1778,9 +1845,20 @@
except Exception:
self.log.exception("Exception while getting job")
- def run_governor(self):
- while not self.governor_stop_event.wait(30):
- self.manageLoad()
+ def executorJobDispatch(self, job):
+ with self.run_lock:
+ if not self._running:
+ job.sendWorkFail()
+ return
+ if job.name == 'executor:execute':
+ self.log.debug("Got execute job: %s" % job.unique)
+ self.executeJob(job)
+ elif job.name.startswith('executor:stop'):
+ self.log.debug("Got stop job: %s" % job.unique)
+ self.stopJob(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
def executeJob(self, job):
if self.statsd:
@@ -1789,6 +1867,10 @@
self.job_workers[job.unique] = self._job_class(self, job)
self.job_workers[job.unique].run()
+ def run_governor(self):
+ while not self.governor_stop_event.wait(30):
+ self.manageLoad()
+
def manageLoad(self):
''' Apply some heuristics to decide whether or not we should
be asking for more jobs '''
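The dispatch refactor above (mergerJobDispatch/executorJobDispatch plus the new run_lock) closes a shutdown race: stop() flips _running and snapshots the worker list while holding the lock, so any job accepted around the same moment either runs to completion under the lock or is failed immediately and never left half-started. A condensed model of the handshake (class and method names simplified):

    import threading

    class MiniExecutorServer:
        def __init__(self):
            self.run_lock = threading.Lock()
            self._running = True
            self.job_workers = {}

        def executor_job_dispatch(self, job):
            with self.run_lock:
                if not self._running:
                    job.sendWorkFail()      # accepted after stop(): fail it
                    return
                self.job_workers[job.unique] = object()  # start a real worker here

        def stop(self):
            with self.run_lock:
                self._running = False
                workers = list(self.job_workers.values())
            # stop() then asks each worker to abort and waits for it to finish
            return workers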
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e5924f8..026763b 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -595,8 +595,6 @@
self._reenqueueTenant(old_tenant, tenant)
- # TODOv3(jeblair): update for tenants
- # self.maintainConnectionCache()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
@@ -728,23 +726,6 @@
finally:
self.run_handler_lock.release()
- def maintainConnectionCache(self):
- # TODOv3(jeblair): update for tenants
- relevant = set()
- for tenant in self.abide.tenants.values():
- for pipeline in tenant.layout.pipelines.values():
- self.log.debug("Gather relevant cache items for: %s" %
- pipeline)
-
- for item in pipeline.getAllItems():
- relevant.add(item.change)
- relevant.update(item.change.getRelatedChanges())
- for connection in self.connections.values():
- connection.maintainCache(relevant)
- self.log.debug(
- "End maintain connection cache for: %s" % connection)
- self.log.debug("Connection cache size: %s" % len(relevant))
-
def process_event_queue(self):
self.log.debug("Fetching trigger event")
event = self.trigger_event_queue.get()