Merge "Provide error message on malformed job list" into feature/zuulv3
diff --git a/.zuul.yaml b/.zuul.yaml
index ff1a523..c081235 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -45,8 +45,7 @@
name: openstack-infra/zuul
infra-check:
jobs:
- - build-openstack-infra-sphinx-docs:
- success-url: 'html/feature/zuulv3/'
+ - build-openstack-sphinx-docs:
irrelevant-files:
- zuul/cmd/migrate.py
- playbooks/zuul-migrate/.*
@@ -67,8 +66,10 @@
- playbooks/zuul-migrate/.*
infra-gate:
jobs:
- - build-openstack-infra-sphinx-docs:
- success-url: 'html/feature/zuulv3/'
+ - build-openstack-sphinx-docs:
+ irrelevant-files:
+ - zuul/cmd/migrate.py
+ - playbooks/zuul-migrate/.*
- tox-pep8
- tox-py35:
irrelevant-files:
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 5e7e0e1..464cb60 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -265,6 +265,22 @@
Directory in which Zuul should clone git repositories.
+ .. attr:: git_http_low_speed_limit
+ :default: 1000
+
+ If the HTTP transfer speed is less than git_http_low_speed_limit for
+ longer than git_http_low_speed_time, the transfer is aborted.
+
+ Value in bytes per second; setting it to 0 disables the check.
+
+ .. attr:: git_http_low_speed_time
+ :default: 30
+
+ If the HTTP transfer speed is less than git_http_low_speed_limit for
+ longer than git_http_low_speed_time, the transfer is aborted.
+
+ Value in seconds; setting it to 0 disables the check.
+
.. attr:: git_user_email
Value to pass to `git config user.email
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index 8aafabf..67be273 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -49,7 +49,7 @@
msg='.git file in submodule should be a file')
work_repo = Repo(parent_path, self.workspace_root,
- 'none@example.org', 'User Name')
+ 'none@example.org', 'User Name', '0', '0')
self.assertTrue(
os.path.isdir(os.path.join(self.workspace_root, 'subdir')),
msg='Cloned repository has a submodule placeholder directory')
@@ -60,7 +60,7 @@
sub_repo = Repo(
os.path.join(self.upstream_root, 'org/project2'),
os.path.join(self.workspace_root, 'subdir'),
- 'none@example.org', 'User Name')
+ 'none@example.org', 'User Name', '0', '0')
self.assertTrue(os.path.exists(
os.path.join(self.workspace_root, 'subdir', '.git')),
msg='Cloned over the submodule placeholder')
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index ba7523c..d51898b 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -76,7 +76,7 @@
self.assertEqual(request.state, 'fulfilled')
# Accept the nodes
- self.nodepool.acceptNodes(request)
+ self.nodepool.acceptNodes(request, request.id)
nodeset = request.nodeset
for node in nodeset.getNodes():
@@ -125,3 +125,47 @@
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 0)
+
+ def test_accept_nodes_resubmitted(self):
+ # Test that a resubmitted request would not lock nodes
+
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+ nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+ request = self.nodepool.requestNodes(None, job)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ # Accept the nodes, passing a different ID
+ self.nodepool.acceptNodes(request, "invalid")
+ nodeset = request.nodeset
+
+ for node in nodeset.getNodes():
+ self.assertIsNone(node.lock)
+ self.assertEqual(node.state, 'ready')
+
+ def test_accept_nodes_lost_request(self):
+ # Test that a lost request would not lock nodes
+
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+ nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+ request = self.nodepool.requestNodes(None, job)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ self.zk.deleteNodeRequest(request)
+
+ # Accept the nodes
+ self.nodepool.acceptNodes(request, request.id)
+ nodeset = request.nodeset
+
+ for node in nodeset.getNodes():
+ self.assertIsNone(node.lock)
+ self.assertEqual(node.state, 'ready')
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 68ea74f..4b0bfc5 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -17,6 +17,8 @@
import json
import os
import textwrap
+import gc
+from unittest import skip
import testtools
@@ -170,6 +172,39 @@
self.assertIn('tenant-one-gate', A.messages[1],
"A should transit tenant-one gate")
+ @skip("This test is useful, but not reliable")
+ def test_full_and_dynamic_reconfig(self):
+ self.executor_server.hold_jobs_in_build = True
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+
+ - project:
+ name: org/project
+ tenant-one-gate:
+ jobs:
+ - project-test1
+ """)
+
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.sched.reconfigure(self.config)
+ self.waitUntilSettled()
+
+ gc.collect()
+ pipelines = [obj for obj in gc.get_objects()
+ if isinstance(obj, zuul.model.Pipeline)]
+ self.assertEqual(len(pipelines), 4)
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
def test_dynamic_config(self):
in_repo_conf = textwrap.dedent(
"""
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 82921fb..28ac5a5 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -562,6 +562,10 @@
'disk_limit_per_job', 250))
self.merge_email = get_default(self.config, 'merger', 'git_user_email')
self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+ self.merge_speed_limit = get_default(
+ config, 'merger', 'git_http_low_speed_limit', '1000')
+ self.merge_speed_time = get_default(
+ config, 'merger', 'git_http_low_speed_time', '30')
execution_wrapper_name = get_default(self.config, 'executor',
'execution_wrapper', 'bubblewrap')
load_multiplier = float(get_default(self.config, 'executor',
@@ -618,9 +622,9 @@
cache_root = self.merge_root
else:
cache_root = None
- return zuul.merger.merger.Merger(root, self.connections,
- self.merge_email, self.merge_name,
- cache_root, logger)
+ return zuul.merger.merger.Merger(
+ root, self.connections, self.merge_email, self.merge_name,
+ self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
def start(self):
self._running = True
@@ -1693,6 +1697,13 @@
elif ret == -9:
# Received abort request.
return (self.RESULT_ABORTED, None)
+ elif ret == 1:
+ if syntax_buffer[0].startswith('ERROR!'):
+ with open(self.jobdir.job_output_file, 'a') as job_output:
+ for line in syntax_buffer:
+ job_output.write("{now} | {line}\n".format(
+ now=datetime.datetime.now(),
+ line=line.decode('utf-8').rstrip()))
elif ret == 4:
# Ansible could not parse the yaml.
self.log.debug("Ansible parse error")
@@ -1759,15 +1770,15 @@
if result is not None:
result = self.RESULT_MAP[result]
msg = "{phase} {step} {result}: [{trusted} : {playbook}@{branch}]"
- msg.format(phase=phase, step=step, result=result,
- trusted=trusted, playbook=playbook, branch=branch)
+ msg = msg.format(phase=phase, step=step, result=result,
+ trusted=trusted, playbook=playbook, branch=branch)
else:
msg = "{phase} {step}: [{trusted} : {playbook}@{branch}]"
- msg.format(phase=phase, step=step, trusted=trusted,
- playbook=playbook, branch=branch)
+ msg = msg.format(phase=phase, step=step, trusted=trusted,
+ playbook=playbook, branch=branch)
with open(self.jobdir.job_output_file, 'a') as job_output:
- job_output.write("{now} | {msg}".format(
+ job_output.write("{now} | {msg}\n".format(
now=datetime.datetime.now(),
msg=msg))
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 0d53fc8..740e29f 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -430,27 +430,39 @@
import zuul.configloader
loader = zuul.configloader.ConfigLoader()
+ self.log.debug("Loading dynamic layout")
+ (trusted_updates, untrusted_updates) = item.includesConfigUpdates()
build_set = item.current_build_set
try:
# First parse the config as it will land with the
# full set of config and project repos. This lets us
# catch syntax errors in config repos even though we won't
# actually run with that config.
- self.log.debug("Loading dynamic layout (phase 1)")
- loader.createDynamicLayout(
- item.pipeline.layout.tenant,
- build_set.files,
- include_config_projects=True,
- scheduler=self.sched,
- connections=self.sched.connections)
+ if trusted_updates:
+ self.log.debug("Loading dynamic layout (phase 1)")
+ loader.createDynamicLayout(
+ item.pipeline.layout.tenant,
+ build_set.files,
+ include_config_projects=True,
+ scheduler=self.sched,
+ connections=self.sched.connections)
# Then create the config a second time but without changes
# to config repos so that we actually use this config.
- self.log.debug("Loading dynamic layout (phase 2)")
- layout = loader.createDynamicLayout(
- item.pipeline.layout.tenant,
- build_set.files,
- include_config_projects=False)
+ if untrusted_updates:
+ self.log.debug("Loading dynamic layout (phase 2)")
+ layout = loader.createDynamicLayout(
+ item.pipeline.layout.tenant,
+ build_set.files,
+ include_config_projects=False)
+ else:
+ # We're a change to a config repo (with no untrusted
+ # items ahead), so just use the most recently
+ # generated layout.
+ if item.item_ahead:
+ return item.item_ahead.layout
+ else:
+ return item.queue.pipeline.layout
self.log.debug("Loading dynamic layout complete")
except zuul.configloader.ConfigurationSyntaxError as e:
self.log.info("Configuration syntax error "
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 7b732c7..fbacbee 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -44,16 +44,15 @@
class Repo(object):
- def __init__(self, remote, local, email, username, sshkey=None,
- cache_path=None, logger=None):
+ def __init__(self, remote, local, email, username, speed_limit, speed_time,
+ sshkey=None, cache_path=None, logger=None):
if logger is None:
self.log = logging.getLogger("zuul.Repo")
else:
self.log = logger
- # TODO(pabelanger): Expose to user via zuul.conf.
self.env = {
- 'GIT_HTTP_LOW_SPEED_LIMIT': '1000',
- 'GIT_HTTP_LOW_SPEED_TIME': '30',
+ 'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
+ 'GIT_HTTP_LOW_SPEED_TIME': speed_time,
}
if sshkey:
self.env['GIT_SSH_COMMAND'] = 'ssh -i %s' % (sshkey,)
@@ -297,7 +296,7 @@
class Merger(object):
def __init__(self, working_root, connections, email, username,
- cache_root=None, logger=None):
+ speed_limit, speed_time, cache_root=None, logger=None):
self.logger = logger
if logger is None:
self.log = logging.getLogger("zuul.Merger")
@@ -310,6 +309,8 @@
self.connections = connections
self.email = email
self.username = username
+ self.speed_limit = speed_limit
+ self.speed_time = speed_time
self.cache_root = cache_root
def _addProject(self, hostname, project_name, url, sshkey):
@@ -322,8 +323,9 @@
project_name)
else:
cache_path = None
- repo = Repo(url, path, self.email, self.username,
- sshkey, cache_path, self.logger)
+ repo = Repo(
+ url, path, self.email, self.username, self.speed_limit,
+ self.speed_time, sshkey, cache_path, self.logger)
self.repos[key] = repo
except Exception:
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 881209d..765d9e0 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -33,9 +33,13 @@
'/var/lib/zuul/merger-git')
merge_email = get_default(self.config, 'merger', 'git_user_email')
merge_name = get_default(self.config, 'merger', 'git_user_name')
-
- self.merger = merger.Merger(merge_root, connections, merge_email,
- merge_name)
+ speed_limit = get_default(
+ config, 'merger', 'git_http_low_speed_limit', '1000')
+ speed_time = get_default(
+ config, 'merger', 'git_http_low_speed_time', '30')
+ self.merger = merger.Merger(
+ merge_root, connections, merge_email, merge_name, speed_limit,
+ speed_time)
def start(self):
self._running = True
diff --git a/zuul/model.py b/zuul/model.py
index c95a169..6eebbfb 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1157,7 +1157,6 @@
self.start_time = None
self.end_time = None
self.estimated_time = None
- self.pipeline = None
self.canceled = False
self.retry = False
self.parameters = {}
@@ -1169,6 +1168,10 @@
return ('<Build %s of %s on %s>' %
(self.uuid, self.job.name, self.worker))
+ @property
+ def pipeline(self):
+ return self.build_set.item.pipeline
+
def getSafeAttributes(self):
return Attributes(uuid=self.uuid,
result=self.result,
@@ -1432,7 +1435,6 @@
def addBuild(self, build):
self.current_build_set.addBuild(build)
- build.pipeline = self.pipeline
def removeBuild(self, build):
self.current_build_set.removeBuild(build)
@@ -1518,6 +1520,25 @@
def wasDequeuedNeedingChange(self):
return self.dequeued_needing_change
+ def includesConfigUpdates(self):
+ includes_trusted = False
+ includes_untrusted = False
+ tenant = self.pipeline.layout.tenant
+ item = self
+ while item:
+ if item.change.updatesConfig():
+ (trusted, project) = tenant.getProject(
+ item.change.project.canonical_name)
+ if trusted:
+ includes_trusted = True
+ else:
+ includes_untrusted = True
+ if includes_trusted and includes_untrusted:
+ # We're done early
+ return (includes_trusted, includes_untrusted)
+ item = item.item_ahead
+ return (includes_trusted, includes_untrusted)
+
def isHoldingFollowingChanges(self):
if not self.live:
return False
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9a125ce..f4c850d 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -111,16 +111,19 @@
except Exception:
self.log.exception("Error unlocking node:")
- def lockNodeSet(self, nodeset):
- self._lockNodes(nodeset.getNodes())
+ def lockNodeSet(self, nodeset, request_id):
+ self._lockNodes(nodeset.getNodes(), request_id)
- def _lockNodes(self, nodes):
+ def _lockNodes(self, nodes, request_id):
# Try to lock all of the supplied nodes. If any lock fails,
# try to unlock any which have already been locked before
# re-raising the error.
locked_nodes = []
try:
for node in nodes:
+ if node.allocated_to != request_id:
+ raise Exception("Node %s allocated to %s, not %s" %
+ (node.id, node.allocated_to, request_id))
self.log.debug("Locking node %s" % (node,))
self.sched.zk.lockNode(node, timeout=30)
locked_nodes.append(node)
@@ -141,7 +144,12 @@
del self.requests[request.uid]
return False
- if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+ # TODOv3(jeblair): handle allocation failure
+ if deleted:
+ self.log.debug("Resubmitting lost node request %s" % (request,))
+ request.id = None
+ self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+ elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
self.log.info("Node request %s %s" % (request, request.state))
# Give our results to the scheduler.
@@ -150,18 +158,29 @@
# Stop watching this request node.
return False
- # TODOv3(jeblair): handle allocation failure
- elif deleted:
- self.log.debug("Resubmitting lost node request %s" % (request,))
- self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+
return True
- def acceptNodes(self, request):
+ def acceptNodes(self, request, request_id):
# Called by the scheduler when it wants to accept and lock
# nodes for (potential) use.
self.log.info("Accepting node request %s" % (request,))
+ if request_id != request.id:
+ self.log.info("Skipping node accept for %s (resubmitted as %s)",
+ request_id, request.id)
+ return
+
+ # Make sure the request still exists. It's possible it could have
+ # disappeared if we lost the ZK session between when the fulfillment
+ # response was added to our queue, and when we actually get around to
+ # processing it. Nodepool will automatically reallocate the assigned
+ # nodes in that situation.
+ if not self.sched.zk.nodeRequestExists(request):
+ self.log.info("Request %s no longer exists", request.id)
+ return
+
if request.canceled:
self.log.info("Ignoring canceled node request %s" % (request,))
# The request was already deleted when it was canceled
@@ -171,7 +190,7 @@
if request.fulfilled:
# If the request suceeded, try to lock the nodes.
try:
- self.lockNodeSet(request.nodeset)
+ self.lockNodeSet(request.nodeset, request.id)
locked = True
except Exception:
self.log.exception("Error locking nodes:")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a926f6e..ab147ba 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -164,6 +164,7 @@
def __init__(self, request):
self.request = request
+ self.request_id = request.id
def toList(item):
@@ -889,9 +890,10 @@
def _doNodesProvisionedEvent(self, event):
request = event.request
+ request_id = event.request_id
build_set = request.build_set
- self.nodepool.acceptNodes(request)
+ self.nodepool.acceptNodes(request, request_id)
if request.canceled:
return
diff --git a/zuul/zk.py b/zuul/zk.py
index a3efef2..2fca749 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -160,7 +160,6 @@
def callback(data, stat):
if data:
data = self._strToDict(data)
- node_request.updateFromDict(data)
request_nodes = list(node_request.nodeset.getNodes())
for i, nodeid in enumerate(data.get('nodes', [])):
node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
@@ -168,6 +167,7 @@
node_data = self._strToDict(node_data)
request_nodes[i].id = nodeid
request_nodes[i].updateFromDict(node_data)
+ node_request.updateFromDict(data)
deleted = (data is None) # data *are* none
return watcher(node_request, deleted)
@@ -187,6 +187,19 @@
except kze.NoNodeError:
pass
+ def nodeRequestExists(self, node_request):
+ '''
+ See if a NodeRequest exists in ZooKeeper.
+
+ :param NodeRequest node_request: A NodeRequest to verify.
+
+ :returns: True if the request exists, False otherwise.
+ '''
+ path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+ if self.client.exists(path):
+ return True
+ return False
+
def storeNode(self, node):
'''Store the node.