blob: 5cd7bee27363f661e55335fbd8bf1d04ed2764c8 [file] [log] [blame]
James E. Blairdce6cea2016-12-20 16:45:32 -08001#!/usr/bin/env python
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import json
16import logging
James E. Blairdce6cea2016-12-20 16:45:32 -080017import time
18from kazoo.client import KazooClient, KazooState
James E. Blaira38c28e2017-01-04 10:33:20 -080019from kazoo import exceptions as kze
20from kazoo.recipe.lock import Lock
James E. Blairdce6cea2016-12-20 16:45:32 -080021
# Node states.
BUILDING = 'building'  # The node is being built and is not yet usable.
READY = 'ready'  # The node is ready for use.
DELETING = 'deleting'  # The node should be deleted.

# The complete set of valid node states.
STATES = {BUILDING, READY, DELETING}
31
32
class LockException(Exception):
    '''Raised when a node lock cannot be acquired or is not held.'''
35
36
class ZooKeeper(object):
    '''
    Class implementing the ZooKeeper interface.

    This class uses the facade design pattern to keep common interaction
    with the ZooKeeper API simple and consistent for the caller, and
    limits coupling between objects. It allows for more complex interactions
    by providing direct access to the client connection when needed (though
    that is discouraged). It also provides for a convenient entry point for
    testing only ZooKeeper interactions.
    '''

    log = logging.getLogger("zuul.zk.ZooKeeper")

    # Parent znode under which node requests are created.
    REQUEST_ROOT = '/nodepool/requests'
    # Parent znode under which node records are read and written.
    NODE_ROOT = '/nodepool/nodes'

    def __init__(self):
        '''
        Initialize the ZooKeeper object.

        No connection is made here; call connect() to establish one.
        '''
        self.client = None
        self._became_lost = False

    def _dictToStr(self, data):
        '''
        Serialize a dictionary for storage in a znode.

        :param dict data: JSON-serializable data.

        :returns: The JSON document encoded as a UTF-8 byte string.
        '''
        # Kazoo only accepts byte strings as znode values, so the JSON
        # text must be encoded before it is handed to create()/set().
        return json.dumps(data).encode('utf8')

    def _strToDict(self, data):
        '''
        Deserialize znode contents produced by _dictToStr().

        :param data: JSON document, as a byte string or text string.

        :returns: The decoded data.
        '''
        # Decode explicitly rather than relying on json.loads() accepting
        # bytes, which is only true on newer Python 3 releases.
        if isinstance(data, bytes):
            data = data.decode('utf8')
        return json.loads(data)

    def _connection_listener(self, state):
        '''
        Listener method for Kazoo connection state changes.

        .. warning:: This method must not block.

        :param state: The new connection state reported by Kazoo.
        '''
        if state == KazooState.LOST:
            self.log.debug("ZooKeeper connection: LOST")
            # Remember the loss until resetLostFlag() is called.
            self._became_lost = True
        elif state == KazooState.SUSPENDED:
            self.log.debug("ZooKeeper connection: SUSPENDED")
        else:
            self.log.debug("ZooKeeper connection: CONNECTED")

    @property
    def connected(self):
        '''True if the client state is currently CONNECTED.'''
        return self.client.state == KazooState.CONNECTED

    @property
    def suspended(self):
        '''True if the client state is currently SUSPENDED.'''
        return self.client.state == KazooState.SUSPENDED

    @property
    def lost(self):
        '''True if the client state is currently LOST.'''
        return self.client.state == KazooState.LOST

    @property
    def didLoseConnection(self):
        '''True if a LOST state was seen since the flag was last reset.'''
        return self._became_lost

    def resetLostFlag(self):
        '''Clear the flag reported by didLoseConnection.'''
        self._became_lost = False

    def connect(self, hosts, read_only=False):
        '''
        Establish a connection with ZooKeeper cluster.

        Convenience method if a pre-existing ZooKeeper connection is not
        supplied to the ZooKeeper object at instantiation time. This is
        a no-op if a client already exists.

        :param str hosts: Comma-separated list of hosts to connect to (e.g.
            127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
        :param bool read_only: If True, establishes a read-only connection.

        '''
        if self.client is None:
            self.client = KazooClient(hosts=hosts, read_only=read_only)
            self.client.add_listener(self._connection_listener)
            self.client.start()

    def disconnect(self):
        '''
        Close the ZooKeeper cluster connection.

        You should call this method if you used connect() to establish a
        cluster connection. This is a no-op if there is no client.
        '''
        if self.client is None:
            return
        # Stop and close regardless of the current connection state: a
        # suspended or lost client still owns resources, and leaving it
        # assigned would turn a later connect() into a no-op.
        self.client.stop()
        self.client.close()
        self.client = None

    def resetHosts(self, hosts):
        '''
        Reset the ZooKeeper cluster connection host list.

        This is a no-op if there is no client.

        :param str hosts: Comma-separated list of hosts to connect to (e.g.
            127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
        '''
        if self.client is not None:
            self.client.set_hosts(hosts=hosts)

    def submitNodeRequest(self, node_request, watcher):
        '''
        Submit a request for nodes to Nodepool.

        The request is stored as an ephemeral, sequenced znode under
        REQUEST_ROOT, and node_request.id is set to the name of the
        created znode.

        :param NodeRequest node_request: A NodeRequest with the
            contents of the request.

        :param callable watcher: A callable object that will be
            invoked each time the request is updated.  It is called
            with two arguments: (node_request, deleted) where
            node_request is the same argument passed to this method,
            and deleted is a boolean which is True if the node no
            longer exists (notably, this will happen on disconnection
            from ZooKeeper).  The watcher should return False when
            further updates are no longer necessary.
        '''
        priority = 100  # TODO(jeblair): integrate into nodereq

        data = node_request.toDict()
        data['created_time'] = time.time()

        path = '%s/%s-' % (self.REQUEST_ROOT, priority)
        path = self.client.create(path, self._dictToStr(data),
                                  makepath=True,
                                  sequence=True, ephemeral=True)
        # The request id is the final path component of the created znode.
        reqid = path.split("/")[-1]
        node_request.id = reqid

        def callback(data, stat):
            if data:
                data = self._strToDict(data)
                node_request.updateFromDict(data)
                request_nodes = node_request.nodeset.getNodes()
                # Pair each node id listed in the request data with the
                # node at the same position in the request's nodeset.
                for i, nodeid in enumerate(data.get('nodes', [])):
                    node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
                    node_data, node_stat = self.client.get(node_path)
                    node_data = self._strToDict(node_data)
                    request_nodes[i].id = nodeid
                    request_nodes[i].updateFromDict(node_data)
            deleted = (data is None)  # data *are* none
            return watcher(node_request, deleted)

        self.client.DataWatch(path, callback)

    def deleteNodeRequest(self, node_request):
        '''
        Delete a request for nodes.

        It is not an error if the request no longer exists.

        :param NodeRequest node_request: A NodeRequest with the
            contents of the request.
        '''

        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
        try:
            self.client.delete(path)
        except kze.NoNodeError:
            # Already gone; nothing left to delete.
            pass

    def storeNode(self, node):
        '''Store the node.

        The node is expected to already exist and is updated in its
        entirety.

        :param Node node: The node to update.
        '''

        path = '%s/%s' % (self.NODE_ROOT, node.id)
        self.client.set(path, self._dictToStr(node.toDict()))

    def lockNode(self, node, blocking=True, timeout=None):
        '''
        Lock a node.

        This should be called as soon as a request is fulfilled and
        the lock held for as long as the node is in-use.  It can be
        used by nodepool to detect if Zuul has gone offline and the
        node should be reclaimed.

        On success the acquired lock is stored in node.lock.

        :param Node node: The node which should be locked.
        :param bool blocking: Passed through to the Kazoo lock's
            acquire() call.
        :param timeout: Passed through to the Kazoo lock's acquire()
            call.

        :raises LockException: If acquiring the lock timed out, or the
            lock was not obtained.
        '''

        lock_path = '%s/%s/lock' % (self.NODE_ROOT, node.id)
        try:
            lock = Lock(self.client, lock_path)
            have_lock = lock.acquire(blocking, timeout)
        except kze.LockTimeout:
            raise LockException(
                "Timeout trying to acquire lock %s" % lock_path)

        # If we aren't blocking, it's possible we didn't get the lock
        # because someone else has it.
        if not have_lock:
            raise LockException("Did not get lock on %s" % lock_path)

        node.lock = lock

    def unlockNode(self, node):
        '''
        Unlock a node.

        The node must already have been locked.

        :param Node node: The node which should be unlocked.

        :raises LockException: If node.lock is None.
        '''

        if node.lock is None:
            raise LockException("Node %s does not hold a lock" % (node,))
        node.lock.release()
        node.lock = None