Merge "Handle non-syntax errors from Ansible" into feature/zuulv3
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index ba7523c..d51898b 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -76,7 +76,7 @@
self.assertEqual(request.state, 'fulfilled')
# Accept the nodes
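+        # acceptNodes needs the ID recorded when the request was submitted
+        # so it can detect a request that was resubmitted in the meantime.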
- self.nodepool.acceptNodes(request)
+ self.nodepool.acceptNodes(request, request.id)
nodeset = request.nodeset
for node in nodeset.getNodes():
@@ -125,3 +125,47 @@
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 0)
+
+ def test_accept_nodes_resubmitted(self):
+        # Test that a resubmitted request does not lock nodes
+
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+ nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+ request = self.nodepool.requestNodes(None, job)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ # Accept the nodes, passing a different ID
+ self.nodepool.acceptNodes(request, "invalid")
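+        # The mismatched ID marks this event as stale, so acceptNodes
+        # returns early and the nodes stay unlocked and ready.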
+ nodeset = request.nodeset
+
+ for node in nodeset.getNodes():
+ self.assertIsNone(node.lock)
+ self.assertEqual(node.state, 'ready')
+
+ def test_accept_nodes_lost_request(self):
+        # Test that a lost request does not lock nodes
+
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+ nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+ request = self.nodepool.requestNodes(None, job)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ self.zk.deleteNodeRequest(request)
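+        # Deleting the request simulates losing it (for example through an
+        # expired ZooKeeper session) after it was fulfilled.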
+
+ # Accept the nodes
+ self.nodepool.acceptNodes(request, request.id)
+ nodeset = request.nodeset
+
+ for node in nodeset.getNodes():
+ self.assertIsNone(node.lock)
+ self.assertEqual(node.state, 'ready')
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 1c633ba..78524f2 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -17,6 +17,8 @@
import json
import os
import textwrap
+import gc
+from unittest import skip
import testtools
@@ -170,6 +172,39 @@
self.assertIn('tenant-one-gate', A.messages[1],
"A should transit tenant-one gate")
+ @skip("This test is useful, but not reliable")
+ def test_full_and_dynamic_reconfig(self):
+ self.executor_server.hold_jobs_in_build = True
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+
+ - project:
+ name: org/project
+ tenant-one-gate:
+ jobs:
+ - project-test1
+ """)
+
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.sched.reconfigure(self.config)
+ self.waitUntilSettled()
+
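+        # Collect garbage so Pipeline objects from the superseded layout
+        # are freed and only live pipelines are counted below.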
+ gc.collect()
+ pipelines = [obj for obj in gc.get_objects()
+ if isinstance(obj, zuul.model.Pipeline)]
+ self.assertEqual(len(pipelines), 4)
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
def test_dynamic_config(self):
in_repo_conf = textwrap.dedent(
"""
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 0d53fc8..740e29f 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -430,27 +430,39 @@
import zuul.configloader
loader = zuul.configloader.ConfigLoader()
+ self.log.debug("Loading dynamic layout")
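+        # Determine whether this item, or any item ahead of it, updates
+        # trusted (config repo) or untrusted (in-repo) configuration so
+        # that each layout phase below can be skipped when unnecessary.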
+ (trusted_updates, untrusted_updates) = item.includesConfigUpdates()
build_set = item.current_build_set
try:
# First parse the config as it will land with the
# full set of config and project repos. This lets us
# catch syntax errors in config repos even though we won't
# actually run with that config.
- self.log.debug("Loading dynamic layout (phase 1)")
- loader.createDynamicLayout(
- item.pipeline.layout.tenant,
- build_set.files,
- include_config_projects=True,
- scheduler=self.sched,
- connections=self.sched.connections)
+ if trusted_updates:
+ self.log.debug("Loading dynamic layout (phase 1)")
+ loader.createDynamicLayout(
+ item.pipeline.layout.tenant,
+ build_set.files,
+ include_config_projects=True,
+ scheduler=self.sched,
+ connections=self.sched.connections)
# Then create the config a second time but without changes
# to config repos so that we actually use this config.
- self.log.debug("Loading dynamic layout (phase 2)")
- layout = loader.createDynamicLayout(
- item.pipeline.layout.tenant,
- build_set.files,
- include_config_projects=False)
+ if untrusted_updates:
+ self.log.debug("Loading dynamic layout (phase 2)")
+ layout = loader.createDynamicLayout(
+ item.pipeline.layout.tenant,
+ build_set.files,
+ include_config_projects=False)
+ else:
+ # We're a change to a config repo (with no untrusted
+ # items ahead), so just use the most recently
+ # generated layout.
+ if item.item_ahead:
+ return item.item_ahead.layout
+ else:
+ return item.queue.pipeline.layout
self.log.debug("Loading dynamic layout complete")
except zuul.configloader.ConfigurationSyntaxError as e:
self.log.info("Configuration syntax error "
diff --git a/zuul/model.py b/zuul/model.py
index c95a169..6eebbfb 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1157,7 +1157,6 @@
self.start_time = None
self.end_time = None
self.estimated_time = None
- self.pipeline = None
self.canceled = False
self.retry = False
self.parameters = {}
@@ -1169,6 +1168,10 @@
return ('<Build %s of %s on %s>' %
(self.uuid, self.job.name, self.worker))
+ @property
+ def pipeline(self):
+ return self.build_set.item.pipeline
+
def getSafeAttributes(self):
return Attributes(uuid=self.uuid,
result=self.result,
@@ -1432,7 +1435,6 @@
def addBuild(self, build):
self.current_build_set.addBuild(build)
- build.pipeline = self.pipeline
def removeBuild(self, build):
self.current_build_set.removeBuild(build)
@@ -1518,6 +1520,25 @@
def wasDequeuedNeedingChange(self):
return self.dequeued_needing_change
+ def includesConfigUpdates(self):
+ includes_trusted = False
+ includes_untrusted = False
+ tenant = self.pipeline.layout.tenant
+ item = self
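+        # Walk this item and every item ahead of it in the queue.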
+ while item:
+ if item.change.updatesConfig():
+ (trusted, project) = tenant.getProject(
+ item.change.project.canonical_name)
+ if trusted:
+ includes_trusted = True
+ else:
+ includes_untrusted = True
+ if includes_trusted and includes_untrusted:
+ # We're done early
+ return (includes_trusted, includes_untrusted)
+ item = item.item_ahead
+ return (includes_trusted, includes_untrusted)
+
def isHoldingFollowingChanges(self):
if not self.live:
return False
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9a125ce..f4c850d 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -111,16 +111,19 @@
except Exception:
self.log.exception("Error unlocking node:")
- def lockNodeSet(self, nodeset):
- self._lockNodes(nodeset.getNodes())
+ def lockNodeSet(self, nodeset, request_id):
+ self._lockNodes(nodeset.getNodes(), request_id)
- def _lockNodes(self, nodes):
+ def _lockNodes(self, nodes, request_id):
# Try to lock all of the supplied nodes. If any lock fails,
# try to unlock any which have already been locked before
# re-raising the error.
locked_nodes = []
try:
for node in nodes:
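+                # A node allocated to some other request means ours was
+                # resubmitted behind our back; refuse to lock it.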
+ if node.allocated_to != request_id:
+ raise Exception("Node %s allocated to %s, not %s" %
+ (node.id, node.allocated_to, request_id))
self.log.debug("Locking node %s" % (node,))
self.sched.zk.lockNode(node, timeout=30)
locked_nodes.append(node)
@@ -141,7 +144,12 @@
del self.requests[request.uid]
return False
- if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+ # TODOv3(jeblair): handle allocation failure
+ if deleted:
+ self.log.debug("Resubmitting lost node request %s" % (request,))
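+            # Clear the ID so a fresh znode is created on resubmission;
+            # any queued event still carrying the old ID will no longer
+            # match and will be skipped by acceptNodes.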
+ request.id = None
+ self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+ elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
self.log.info("Node request %s %s" % (request, request.state))
# Give our results to the scheduler.
@@ -150,18 +158,29 @@
# Stop watching this request node.
return False
- # TODOv3(jeblair): handle allocation failure
- elif deleted:
- self.log.debug("Resubmitting lost node request %s" % (request,))
- self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+
return True
- def acceptNodes(self, request):
+ def acceptNodes(self, request, request_id):
# Called by the scheduler when it wants to accept and lock
# nodes for (potential) use.
self.log.info("Accepting node request %s" % (request,))
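+        # A mismatched ID means the request was resubmitted (for example
+        # after a lost ZooKeeper session) and this event is stale.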
+ if request_id != request.id:
+ self.log.info("Skipping node accept for %s (resubmitted as %s)",
+ request_id, request.id)
+ return
+
+ # Make sure the request still exists. It's possible it could have
+ # disappeared if we lost the ZK session between when the fulfillment
+ # response was added to our queue, and when we actually get around to
+ # processing it. Nodepool will automatically reallocate the assigned
+ # nodes in that situation.
+ if not self.sched.zk.nodeRequestExists(request):
+ self.log.info("Request %s no longer exists", request.id)
+ return
+
if request.canceled:
self.log.info("Ignoring canceled node request %s" % (request,))
# The request was already deleted when it was canceled
@@ -171,7 +190,7 @@
if request.fulfilled:
# If the request succeeded, try to lock the nodes.
try:
- self.lockNodeSet(request.nodeset)
+ self.lockNodeSet(request.nodeset, request.id)
locked = True
except Exception:
self.log.exception("Error locking nodes:")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a926f6e..ab147ba 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -164,6 +164,7 @@
def __init__(self, request):
self.request = request
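+        # Record the ID separately: request.id may change if the request
+        # is resubmitted before this event is processed.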
+ self.request_id = request.id
def toList(item):
@@ -889,9 +890,10 @@
def _doNodesProvisionedEvent(self, event):
request = event.request
+ request_id = event.request_id
build_set = request.build_set
- self.nodepool.acceptNodes(request)
+ self.nodepool.acceptNodes(request, request_id)
if request.canceled:
return
diff --git a/zuul/zk.py b/zuul/zk.py
index a3efef2..2fca749 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -160,7 +160,6 @@
def callback(data, stat):
if data:
data = self._strToDict(data)
- node_request.updateFromDict(data)
request_nodes = list(node_request.nodeset.getNodes())
for i, nodeid in enumerate(data.get('nodes', [])):
node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
@@ -168,6 +167,7 @@
node_data = self._strToDict(node_data)
request_nodes[i].id = nodeid
request_nodes[i].updateFromDict(node_data)
+ node_request.updateFromDict(data)
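+                # Update the request itself only after its nodes have been
+                # refreshed, so a watcher never sees a fulfilled request
+                # with stale node data.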
deleted = (data is None) # data *are* none
return watcher(node_request, deleted)
@@ -187,6 +187,19 @@
except kze.NoNodeError:
pass
+ def nodeRequestExists(self, node_request):
+ '''
+ See if a NodeRequest exists in ZooKeeper.
+
+ :param NodeRequest node_request: A NodeRequest to verify.
+
+ :returns: True if the request exists, False otherwise.
+ '''
+ path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+ if self.client.exists(path):
+ return True
+ return False
+
def storeNode(self, node):
'''Store the node.