| #!/usr/bin/env python |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| import json |
| import logging |
| import time |
| from kazoo.client import KazooClient, KazooState |
| from kazoo import exceptions as kze |
| from kazoo.recipe.lock import Lock |
| |
| # States: |
| # We are building this node but it is not ready for use. |
| BUILDING = 'building' |
| # The node is ready for use. |
| READY = 'ready' |
| # The node should be deleted. |
| DELETING = 'deleting' |
| |
| STATES = set([BUILDING, READY, DELETING]) |
| |
| |
| class LockException(Exception): |
| pass |
| |
| |
| class ZooKeeperConnectionConfig(object): |
| ''' |
| Represents the connection parameters for a ZooKeeper server. |
| ''' |
| |
| def __eq__(self, other): |
| if isinstance(other, ZooKeeperConnectionConfig): |
| if other.__dict__ == self.__dict__: |
| return True |
| return False |
| |
| def __init__(self, host, port=2181, chroot=None): |
| '''Initialize the ZooKeeperConnectionConfig object. |
| |
| :param str host: The hostname of the ZooKeeper server. |
| :param int port: The port on which ZooKeeper is listening. |
| Optional, default: 2181. |
| :param str chroot: A chroot for this connection. All |
| ZooKeeper nodes will be underneath this root path. |
| Optional, default: None. |
| |
| (one per server) defining the ZooKeeper cluster servers. Only |
| the 'host' attribute is required.'. |
| |
| ''' |
| self.host = host |
| self.port = port |
| self.chroot = chroot or '' |
| |
| |
| class ZooKeeper(object): |
| ''' |
| Class implementing the ZooKeeper interface. |
| |
| This class uses the facade design pattern to keep common interaction |
| with the ZooKeeper API simple and consistent for the caller, and |
| limits coupling between objects. It allows for more complex interactions |
| by providing direct access to the client connection when needed (though |
| that is discouraged). It also provides for a convenient entry point for |
| testing only ZooKeeper interactions. |
| ''' |
| |
| log = logging.getLogger("zuul.zk.ZooKeeper") |
| |
| REQUEST_ROOT = '/nodepool/requests' |
| NODE_ROOT = '/nodepool/nodes' |
| |
| def __init__(self): |
| ''' |
| Initialize the ZooKeeper object. |
| ''' |
| self.client = None |
| self._became_lost = False |
| |
| def _dictToStr(self, data): |
| return json.dumps(data) |
| |
| def _strToDict(self, data): |
| return json.loads(data) |
| |
| def _connection_listener(self, state): |
| ''' |
| Listener method for Kazoo connection state changes. |
| |
| .. warning:: This method must not block. |
| ''' |
| if state == KazooState.LOST: |
| self.log.debug("ZooKeeper connection: LOST") |
| self._became_lost = True |
| elif state == KazooState.SUSPENDED: |
| self.log.debug("ZooKeeper connection: SUSPENDED") |
| else: |
| self.log.debug("ZooKeeper connection: CONNECTED") |
| |
| @property |
| def connected(self): |
| return self.client.state == KazooState.CONNECTED |
| |
| @property |
| def suspended(self): |
| return self.client.state == KazooState.SUSPENDED |
| |
| @property |
| def lost(self): |
| return self.client.state == KazooState.LOST |
| |
| @property |
| def didLoseConnection(self): |
| return self._became_lost |
| |
| def resetLostFlag(self): |
| self._became_lost = False |
| |
| def connect(self, host_list, read_only=False): |
| ''' |
| Establish a connection with ZooKeeper cluster. |
| |
| Convenience method if a pre-existing ZooKeeper connection is not |
| supplied to the ZooKeeper object at instantiation time. |
| |
| :param list host_list: A list of |
| :py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects |
| (one per server) defining the ZooKeeper cluster servers. |
| :param bool read_only: If True, establishes a read-only connection. |
| |
| ''' |
| if self.client is None: |
| self.client = KazooClient(hosts=host_list, read_only=read_only) |
| self.client.add_listener(self._connection_listener) |
| self.client.start() |
| |
| def disconnect(self): |
| ''' |
| Close the ZooKeeper cluster connection. |
| |
| You should call this method if you used connect() to establish a |
| cluster connection. |
| ''' |
| if self.client is not None and self.client.connected: |
| self.client.stop() |
| self.client.close() |
| self.client = None |
| |
| def resetHosts(self, host_list): |
| ''' |
| Reset the ZooKeeper cluster connection host list. |
| |
| :param list host_list: A list of |
| :py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects |
| (one per server) defining the ZooKeeper cluster servers. |
| ''' |
| if self.client is not None: |
| self.client.set_hosts(hosts=host_list) |
| |
| def submitNodeRequest(self, node_request, watcher): |
| ''' |
| Submit a request for nodes to Nodepool. |
| |
| :param NodeRequest node_request: A NodeRequest with the |
| contents of the request. |
| |
| :param callable watcher: A callable object that will be |
| invoked each time the request is updated. It is called |
| with two arguments: (node_request, deleted) where |
| node_request is the same argument passed to this method, |
| and deleted is a boolean which is True if the node no |
| longer exists (notably, this will happen on disconnection |
| from ZooKeeper). The watcher should return False when |
| further updates are no longer necessary. |
| ''' |
| priority = 100 # TODO(jeblair): integrate into nodereq |
| |
| data = node_request.toDict() |
| data['created_time'] = time.time() |
| |
| path = '%s/%s-' % (self.REQUEST_ROOT, priority) |
| path = self.client.create(path, self._dictToStr(data), |
| makepath=True, |
| sequence=True, ephemeral=True) |
| reqid = path.split("/")[-1] |
| node_request.id = reqid |
| |
| def callback(data, stat): |
| if data: |
| data = self._strToDict(data) |
| node_request.updateFromDict(data) |
| request_nodes = node_request.nodeset.getNodes() |
| for i, nodeid in enumerate(data.get('nodes', [])): |
| node_path = '%s/%s' % (self.NODE_ROOT, nodeid) |
| node_data, node_stat = self.client.get(node_path) |
| node_data = self._strToDict(node_data) |
| request_nodes[i].id = nodeid |
| request_nodes[i].updateFromDict(node_data) |
| deleted = (data is None) # data *are* none |
| return watcher(node_request, deleted) |
| |
| self.client.DataWatch(path, callback) |
| |
| def deleteNodeRequest(self, node_request): |
| ''' |
| Delete a request for nodes. |
| |
| :param NodeRequest node_request: A NodeRequest with the |
| contents of the request. |
| ''' |
| |
| path = '%s/%s' % (self.REQUEST_ROOT, node_request.id) |
| try: |
| self.client.delete(path) |
| except kze.NoNodeError: |
| pass |
| |
| def storeNode(self, node): |
| '''Store the node. |
| |
| The node is expected to already exist and is updated in its |
| entirety. |
| |
| :param Node node: The node to update. |
| ''' |
| |
| path = '%s/%s' % (self.NODE_ROOT, node.id) |
| self.client.set(path, self._dictToStr(node.toDict())) |
| |
| def lockNode(self, node, blocking=True, timeout=None): |
| ''' |
| Lock a node. |
| |
| This should be called as soon as a request is fulfilled and |
| the lock held for as long as the node is in-use. It can be |
| used by nodepool to detect if Zuul has gone offline and the |
| node should be reclaimed. |
| |
| :param Node node: The node which should be locked. |
| ''' |
| |
| lock_path = '%s/%s/lock' % (self.NODE_ROOT, node.id) |
| try: |
| lock = Lock(self.client, lock_path) |
| have_lock = lock.acquire(blocking, timeout) |
| except kze.LockTimeout: |
| raise LockException( |
| "Timeout trying to acquire lock %s" % lock_path) |
| |
| # If we aren't blocking, it's possible we didn't get the lock |
| # because someone else has it. |
| if not have_lock: |
| raise LockException("Did not get lock on %s" % lock_path) |
| |
| node.lock = lock |
| |
| def unlockNode(self, node): |
| ''' |
| Unlock a node. |
| |
| The node must already have been locked. |
| |
| :param Node node: The node which should be unlocked. |
| ''' |
| |
| if node.lock is None: |
| raise LockException("Node %s does not hold a lock" % (node,)) |
| node.lock.release() |
| node.lock = None |