Lock nodes when nodepool request is fulfilled

This continues the work of implementing the Zuul<->Nodepool protocol
from the Zuulv3 spec: once nodepool marks a node request 'fulfilled',
Zuul now locks each allocated node before accepting it, and then
deletes the request.
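
For reference, the ZooKeeper layout this protocol uses (and which the
fake nodepool in the tests mimics) looks roughly like:

    /nodepool/requests/<request id>  -> request data, including the
                                        ids of the allocated nodes
    /nodepool/nodes/<node id>        -> node data (state, IPs, etc.)
    /nodepool/nodes/<node id>/lock   -> kazoo Lock held by Zuul for
                                        as long as the node is in use
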
Change-Id: Ic8477e607fd09b85a37f47cbee7da905c017c534
diff --git a/tests/base.py b/tests/base.py
index 9ee5838..9ec8e54 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -919,14 +919,45 @@
             reqs.append(data)
         return reqs

+    def makeNode(self, request_id, node_type):
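+        # Simulate nodepool satisfying one label: record a 'ready'
+        # node in ZooKeeper, allocated to the given request.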
+        now = time.time()
+        path = '/nodepool/nodes/'
+        data = dict(type=node_type,
+                    provider='test-provider',
+                    region='test-region',
+                    az=None,
+                    public_ipv4='127.0.0.1',
+                    private_ipv4=None,
+                    public_ipv6=None,
+                    allocated_to=request_id,
+                    state='ready',
+                    state_time=now,
+                    created_time=now,
+                    updated_time=now,
+                    image_id=None,
+                    launcher='fake-nodepool')
+        data = json.dumps(data)
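+        # Create a sequenced znode; the generated sequence suffix
+        # becomes the node id.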
+        path = self.client.create(path, data,
+                                  makepath=True,
+                                  sequence=True)
+        nodeid = path.split("/")[-1]
+        return nodeid
+
     def fulfillRequest(self, request):
         if request['state'] == 'fulfilled':
             return
         request = request.copy()
-        request['state'] = 'fulfilled'
-        request['state_time'] = time.time()
         oid = request['_oid']
         del request['_oid']
+
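+        # Create a node for each requested label, then attach the
+        # node ids to the request as it is marked fulfilled.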
+        nodes = []
+        for node in request['node_types']:
+            nodeid = self.makeNode(oid, node)
+            nodes.append(nodeid)
+
+        request['state'] = 'fulfilled'
+        request['state_time'] = time.time()
+        request['nodes'] = nodes
         path = self.REQUEST_ROOT + '/' + oid
         data = json.dumps(request)
         self.log.debug("Fulfilling node request: %s %s" % (oid, data))
diff --git a/tests/test_nodepool.py b/tests/test_nodepool.py
index 3fb0335..b5b9b17 100644
--- a/tests/test_nodepool.py
+++ b/tests/test_nodepool.py
@@ -70,6 +70,14 @@
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')

+        # Accept the nodes
+        self.nodepool.acceptNodes(request)
+        nodeset = request.nodeset
+
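+        # acceptNodes should have locked each node and left it ready.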
+        for node in nodeset.getNodes():
+            self.assertIsNotNone(node.lock)
+            self.assertEqual(node.state, 'ready')
+
     def test_node_request_disconnect(self):
         # Test that node requests are re-submitted after disconnect
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 7a4c7cc..213cfc4 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -615,6 +615,7 @@
             item.setUnableToMerge()

     def onNodesProvisioned(self, event):
+        # TODOv3(jeblair): handle provisioning failure here
         request = event.request
         build_set = request.build_set
         build_set.jobNodeRequestComplete(request.job.name, request,
diff --git a/zuul/model.py b/zuul/model.py
index 66f18ea..a13fcba 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -354,10 +354,36 @@
         self.name = name
         self.image = image
         self.id = None
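+        # Kazoo lock object, held by Zuul while the node is in use.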
+        self.lock = None
+        # Attributes from Nodepool
+        self._state = 'unknown'
+        self.state_time = time.time()
+        self.public_ipv4 = None
+        self.private_ipv4 = None
+        self.public_ipv6 = None
+
+    @property
+    def state(self):
+        return self._state
+
+    @state.setter
+    def state(self, value):
+        # TODOv3(jeblair): reinstate
+        # if value not in STATES:
+        #     raise TypeError("'%s' is not a valid state" % value)
+        self._state = value
+        self.state_time = time.time()

     def __repr__(self):
         return '<Node %s %s:%s>' % (self.id, self.name, self.image)
+
+    def updateFromDict(self, data):
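+        # Update our attributes from the node data in ZooKeeper.  Set
+        # _state directly so that state_time reflects what nodepool
+        # recorded rather than the time of this update.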
+        self._state = data['state']
+        self.state_time = data['state_time']
+        self.public_ipv4 = data.get('public_ipv4')
+        self.private_ipv4 = data.get('private_ipv4')
+        self.public_ipv6 = data.get('public_ipv6')
+

 class NodeSet(object):
     """A set of nodes.
@@ -407,6 +433,9 @@
         self.stat = None
         self.uid = uuid4().hex
         self.id = None
+        # Zuul internal failure flag (not stored in ZK so it's not
+        # overwritten).
+        self.failed = False

     @property
     def state(self):
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9d0d803..903d90c 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -41,6 +41,34 @@
     def returnNodes(self, nodes, used=True):
         pass

+    def unlockNodeset(self, nodeset):
+        self._unlockNodes(nodeset.getNodes())
+
+    def _unlockNodes(self, nodes):
+        for node in nodes:
+            try:
+                self.sched.zk.unlockNode(node)
+            except Exception:
+                self.log.exception("Error unlocking node:")
+
+    def lockNodeset(self, nodeset):
+        self._lockNodes(nodeset.getNodes())
+
+    def _lockNodes(self, nodes):
+        # Try to lock all of the supplied nodes.  If any lock fails,
+        # try to unlock any which have already been locked before
+        # re-raising the error.
+        locked_nodes = []
+        try:
+            for node in nodes:
+                self.log.debug("Locking node: %s" % (node,))
+                self.sched.zk.lockNode(node)
+                locked_nodes.append(node)
+        except Exception:
+            self.log.exception("Error locking nodes:")
+            self._unlockNodes(locked_nodes)
+            raise
+
     def _updateNodeRequest(self, request, deleted):
         # Return False to indicate that we should stop watching the
         # node.
@@ -50,10 +78,45 @@
             return False

         if request.state == 'fulfilled':
+            self.log.info("Node request %s fulfilled" % (request,))
+
+            # Give our results to the scheduler.
             self.sched.onNodesProvisioned(request)
             del self.requests[request.uid]
+
+            # Stop watching this request node.
             return False
+        # TODOv3(jeblair): handle allocation failure
         elif deleted:
             self.log.debug("Resubmitting lost node request %s" % (request,))
             self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
         return True
+
+    def acceptNodes(self, request):
+        # Called by the scheduler when it wants to accept and lock
+        # nodes for (potential) use.
+
+        self.log.debug("Accepting node request: %s" % (request,))
+
+        # First, try to lock the nodes.
+        locked = False
+        try:
+            self.lockNodeset(request.nodeset)
+            locked = True
+        except Exception:
+            self.log.exception("Error locking nodes:")
+            request.failed = True
+
+        # Regardless of whether locking succeeded, delete the
+        # request.
+        self.log.debug("Deleting node request: %s" % (request,))
+        try:
+            self.sched.zk.deleteNodeRequest(request)
+        except Exception:
+            self.log.exception("Error deleting node request:")
+            request.failed = True
+            # If deleting the request failed, and we did lock the
+            # nodes, unlock the nodes since we're not going to use
+            # them.
+            if locked:
+                self.unlockNodeset(request.nodeset)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 4a6cc93..270e055 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -800,6 +800,14 @@
     def _doNodesProvisionedEvent(self, event):
         request = event.request
         build_set = request.build_set
+
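+        # Accept (lock) the allocated nodes and delete the ZK request
+        # before deciding whether this build set still wants them.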
+        try:
+            self.nodepool.acceptNodes(request)
+        except Exception:
+            self.log.exception("Unable to accept nodes from request %s:"
+                               % (request,))
+            return
+
         if build_set is not build_set.item.current_build_set:
             self.log.warning("Build set %s is not current" % (build_set,))
             self.nodepool.returnNodes(request.nodes, used=False)
diff --git a/zuul/zk.py b/zuul/zk.py
index 190d7b4..4f7d736 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -17,6 +17,8 @@
 import six
 import time

 from kazoo.client import KazooClient, KazooState
+from kazoo import exceptions as kze
+from kazoo.recipe.lock import Lock
 # States:
 # We are building this node but it is not ready for use.
@@ -29,6 +31,10 @@
 STATES = set([BUILDING, READY, DELETING])


+class LockException(Exception):
+    pass
+
+
 class ZooKeeperConnectionConfig(object):
     '''
     Represents the connection parameters for a ZooKeeper server.
@@ -178,6 +184,7 @@
     log = logging.getLogger("zuul.zk.ZooKeeper")

     REQUEST_ROOT = '/nodepool/requests'
+    NODE_ROOT = '/nodepool/nodes'

     def __init__(self):
         '''
@@ -300,7 +307,69 @@
             if data:
                 data = self._strToDict(data)
                 node_request.updateFromDict(data)
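+                # The request data includes the ids of the allocated
+                # nodes; load each one's data into our Node objects.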
+                request_nodes = node_request.nodeset.getNodes()
+                for i, nodeid in enumerate(data.get('nodes', [])):
+                    node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
+                    node_data, node_stat = self.client.get(node_path)
+                    node_data = self._strToDict(node_data)
+                    request_nodes[i].id = nodeid
+                    request_nodes[i].updateFromDict(node_data)
             deleted = (data is None)  # data *are* none
             return watcher(node_request, deleted)

         self.client.DataWatch(path, callback)
+
+    def deleteNodeRequest(self, node_request):
+        '''
+        Delete a request for nodes.
+
+        :param NodeRequest node_request: A NodeRequest with the
+            contents of the request.
+        '''
+
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        try:
+            self.client.delete(path)
+        except kze.NoNodeError:
+            pass
+
+    def lockNode(self, node, blocking=True, timeout=None):
+        '''
+        Lock a node.
+
+        This should be called as soon as a request is fulfilled and
+        the lock held for as long as the node is in-use.  It can be
+        used by nodepool to detect if Zuul has gone offline and the
+        node should be reclaimed.
+
+        :param Node node: The node which should be locked.
+        '''
+
+        lock_path = '%s/%s/lock' % (self.NODE_ROOT, node.id)
+        try:
+            lock = Lock(self.client, lock_path)
+            have_lock = lock.acquire(blocking, timeout)
+        except kze.LockTimeout:
+            raise LockException(
+                "Timeout trying to acquire lock %s" % lock_path)
+
+        # If we aren't blocking, it's possible we didn't get the lock
+        # because someone else has it.
+        if not have_lock:
+            raise LockException("Did not get lock on %s" % lock_path)
+
+        node.lock = lock
+
+    def unlockNode(self, node):
+        '''
+        Unlock a node.
+
+        The node must already have been locked.
+
+        :param Node node: The node which should be unlocked.
+        '''
+
+        if node.lock is None:
+            raise LockException("Node %s does not hold a lock" % (node,))
+        node.lock.release()
+        node.lock = None