Merge "Add POST_FAILURE status"
diff --git a/.testr.conf b/.testr.conf
index 5433c07..222ce97 100644
--- a/.testr.conf
+++ b/.testr.conf
@@ -1,4 +1,4 @@
[DEFAULT]
-test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
+test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list
diff --git a/tests/base.py b/tests/base.py
index 3c28a72..5b31eea 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -27,6 +27,7 @@
import re
import select
import shutil
+from six.moves import reload_module
import socket
import string
import subprocess
@@ -861,6 +862,28 @@
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
+ # NOTE(notmorgan): Extract logging overrides for specific libraries
+ # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+ # each. This is used to limit the output during test runs from
+ # libraries that zuul depends on such as gear.
+ log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+
+ if log_defaults_from_env:
+ for default in log_defaults_from_env.split(','):
+ try:
+ name, level_str = default.split('=', 1)
+ level = getattr(logging, level_str, logging.DEBUG)
+ self.useFixture(fixtures.FakeLogger(
+ name=name,
+ level=level,
+ format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s'))
+ except ValueError:
+ # NOTE(notmorgan): Invalid format of the log default,
+ # skip and don't try to apply a logger for the
+ # specified module
+ pass
+
class ZuulTestCase(BaseTestCase):
@@ -916,8 +939,8 @@
os.environ['STATSD_PORT'] = str(self.statsd.port)
self.statsd.start()
# the statsd client object is configured in the statsd module import
- reload(statsd)
- reload(zuul.scheduler)
+ reload_module(statsd)
+ reload_module(zuul.scheduler)
self.gearman_server = FakeGearmanServer()
diff --git a/tox.ini b/tox.ini
index 79ea939..a8767c2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,6 +9,7 @@
STATSD_PORT=8125
VIRTUAL_ENV={envdir}
OS_TEST_TIMEOUT=30
+ OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO}
passenv = ZUUL_TEST_ROOT
usedevelop = True
install_command = pip install {opts} {packages}
@@ -18,6 +19,8 @@
python setup.py testr --slowest --testr-args='{posargs}'
[testenv:pep8]
+# streamer is python3 only, so we need to run flake8 in python3
+basepython = python3
commands = flake8 {posargs}
[testenv:cover]
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index 20b5600..5a38807 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -18,7 +18,6 @@
import datetime
import getpass
import os
-import re
import subprocess
import threading
@@ -47,12 +46,16 @@
if os.path.exists(fn):
with open(fn) as f:
for line in f:
- line = re.sub('#.*', '', line).strip()
if not line:
continue
+ if line[0] == '#':
+ continue
if '=' not in line:
continue
- k, v = line.split('=')
+ k, v = line.strip().split('=')
+ for q in ["'", '"']:
+ if v[0] == q:
+ v = v.strip(q)
env[k] = v
return env
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index ae1e319..62891cd 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -32,7 +32,7 @@
"""Move events from Gerrit to the scheduler."""
log = logging.getLogger("zuul.GerritEventConnector")
- delay = 5.0
+ delay = 10.0
def __init__(self, connection):
super(GerritEventConnector, self).__init__()
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 33e00b8..4bf8da6 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -42,7 +42,7 @@
ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
-COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release']
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful']
def boolify(x):
@@ -114,16 +114,20 @@
self.termination_queue = Queue.Queue()
self.sites = {}
self.static_nodes = {}
+ self.command_map = dict(
+ reconfigure=self.reconfigure,
+ stop=self.stop,
+ pause=self.pause,
+ unpause=self.unpause,
+ release=self.release,
+ graceful=self.graceful,
+ )
+
if config.has_option('launcher', 'accept_nodes'):
self.accept_nodes = config.getboolean('launcher',
'accept_nodes')
else:
- # TODO(jeblair): remove deprecated form of option
- if config.has_option('launcher', 'accept-nodes'):
- self.accept_nodes = config.getboolean('launcher',
- 'accept-nodes')
- else:
- self.accept_nodes = True
+ self.accept_nodes = True
if self.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(
@@ -242,8 +246,6 @@
new_functions = set()
if self.accept_nodes:
new_functions.add("node_assign:zuul")
- # TODO(jeblair): remove deprecated form
- new_functions.add("node-assign:zuul")
new_functions.add("stop:%s" % self.hostname)
new_functions.add("set_description:%s" % self.hostname)
@@ -304,6 +306,18 @@
"to worker:")
self.log.debug("Finished releasing idle nodes")
+ def graceful(self):
+ # Note: this is run in the command processing thread; no more
+ # external commands will be processed after this.
+ self.log.debug("Gracefully stopping")
+ self.pause()
+ self.release()
+ self.log.debug("Waiting for all builds to finish")
+ while self.builds:
+ time.sleep(5)
+ self.log.debug("All builds are finished")
+ self.stop()
+
def stop(self):
self.log.debug("Stopping")
# First, stop accepting new jobs
@@ -337,16 +351,7 @@
while self._command_running:
try:
command = self.command_socket.get()
- if command == 'reconfigure':
- self.reconfigure()
- elif command == 'stop':
- self.stop()
- elif command == 'pause':
- self.pause()
- elif command == 'unpause':
- self.unpause()
- elif command == 'release':
- self.release()
+ self.command_map[command]()
except Exception:
self.log.exception("Exception while processing command")
@@ -371,10 +376,6 @@
if job.name.startswith('node_assign:'):
self.log.debug("Got node_assign job: %s" % job.unique)
self.assignNode(job)
- elif job.name.startswith('node-assign:'):
- # TODO(jeblair): remove deprecated form
- self.log.debug("Got node-assign job: %s" % job.unique)
- self.assignNode(job)
elif job.name.startswith('stop:'):
self.log.debug("Got stop job: %s" % job.unique)
self.stopJob(job)
@@ -784,6 +785,12 @@
job.sendWorkStatus(0, 100)
job_status = self.runAnsiblePlaybook(jobdir, timeout)
+ if job_status == 3:
+ # AnsibleHostUnreachable: We had a network issue connecting to
+ # our zuul-worker. Rather than continue, have zuul requeue the
+ # job.
+ return result
+
post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
if not post_status:
status = 'POST_FAILURE'
@@ -914,7 +921,8 @@
parameters)
syncargs = dict(src=src,
dest=ftpcontent,
- copy_links='yes')
+ copy_links='yes',
+ mode='pull')
if rsync_opts:
syncargs['rsync_opts'] = rsync_opts
task = dict(synchronize=syncargs,
@@ -991,15 +999,19 @@
inventory.write('\n')
timeout = None
+ timeout_var = None
for wrapper in jjb_job.get('wrappers', []):
if isinstance(wrapper, dict):
- timeout = wrapper.get('build-timeout', {})
- if isinstance(timeout, dict):
- timeout = timeout.get('timeout')
+ build_timeout = wrapper.get('build-timeout', {})
+ if isinstance(build_timeout, dict):
+ timeout_var = build_timeout.get('timeout-var', None)
+ timeout = build_timeout.get('timeout')
if timeout:
timeout = timeout * 60
if not timeout:
timeout = ANSIBLE_DEFAULT_TIMEOUT
+ if timeout_var:
+ parameters[timeout_var] = timeout
with open(jobdir.playbook, 'w') as playbook:
tasks = []
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 3556b45..98307ee 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -17,6 +17,7 @@
import json
import logging
import os
+import six
import time
import threading
from uuid import uuid4
@@ -236,7 +237,7 @@
s_config = {}
s_config.update((k, v.format(item=item, job=job,
change=item.change))
- if isinstance(v, basestring)
+ if isinstance(v, six.string_types)
else (k, v)
for k, v in s.items())
diff --git a/zuul/lib/clonemapper.py b/zuul/lib/clonemapper.py
index ae558cd..57ac177 100644
--- a/zuul/lib/clonemapper.py
+++ b/zuul/lib/clonemapper.py
@@ -19,6 +19,9 @@
import os
import re
+import six
+
+
OrderedDict = extras.try_imports(['collections.OrderedDict',
'ordereddict.OrderedDict'])
@@ -59,17 +62,17 @@
raise Exception("Expansion error. Check error messages above")
self.log.info("Mapping projects to workspace...")
- for project, dest in ret.iteritems():
+ for project, dest in six.iteritems(ret):
dest = os.path.normpath(os.path.join(workspace, dest[0]))
ret[project] = dest
self.log.info(" %s -> %s", project, dest)
self.log.debug("Checking overlap in destination directories...")
check = defaultdict(list)
- for project, dest in ret.iteritems():
+ for project, dest in six.iteritems(ret):
check[dest].append(project)
- dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1)
+ dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1)
if dupes:
raise Exception("Some projects share the same destination: %s",
dupes)
diff --git a/zuul/lib/cloner.py b/zuul/lib/cloner.py
index f0235a6..3155df6 100644
--- a/zuul/lib/cloner.py
+++ b/zuul/lib/cloner.py
@@ -19,6 +19,8 @@
import re
import yaml
+import six
+
from git import GitCommandError
from zuul.lib.clonemapper import CloneMapper
from zuul.merger.merger import Repo
@@ -62,7 +64,7 @@
dests = mapper.expand(workspace=self.workspace)
self.log.info("Preparing %s repositories", len(dests))
- for project, dest in dests.iteritems():
+ for project, dest in six.iteritems(dests):
self.prepareRepo(project, dest)
self.log.info("Prepared all repositories")
diff --git a/zuul/model.py b/zuul/model.py
index 3fb0577..542d0b6 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -110,7 +110,11 @@
return job_tree
def getProjects(self):
- return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
+ # cmp is not in python3, applied idiom from
+ # http://python-future.org/compatible_idioms.html#cmp
+ return sorted(
+ self.job_trees.keys(),
+ key=lambda p: p.name)
def addQueue(self, queue):
self.queues.append(queue)
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 83d119f..551dd03 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -40,11 +40,11 @@
port = 4730
self.worker = gear.Worker('Zuul RPC Listener')
self.worker.addServer(server, port)
+ self.worker.waitForServer()
+ self.register()
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
- self.worker.waitForServer()
- self.register()
def register(self):
self.worker.registerFunction("zuul:enqueue")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index dcc5f88..f08612d 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -411,7 +411,9 @@
base = os.path.dirname(os.path.realpath(config_path))
fn = os.path.join(base, fn)
fn = os.path.expanduser(fn)
- execfile(fn, config_env)
+ with open(fn) as _f:
+ code = compile(_f.read(), fn, 'exec')
+ six.exec_(code, config_env)
for conf_pipeline in data.get('pipelines', []):
pipeline = Pipeline(conf_pipeline['name'])