blob: 5ea4e5677605abe9622af1d9fba58205611fba65 [file] [log] [blame]
James E. Blairdce6cea2016-12-20 16:45:32 -08001#!/usr/bin/env python
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import json
16import logging
James E. Blairdce6cea2016-12-20 16:45:32 -080017import time
David Shrewsburyffab07a2017-07-24 12:45:07 -040018
James E. Blairdce6cea2016-12-20 16:45:32 -080019from kazoo.client import KazooClient, KazooState
James E. Blaira38c28e2017-01-04 10:33:20 -080020from kazoo import exceptions as kze
21from kazoo.recipe.lock import Lock
James E. Blairdce6cea2016-12-20 16:45:32 -080022
David Shrewsburyffab07a2017-07-24 12:45:07 -040023import zuul.model
24
25
class LockException(Exception):
    '''
    Error raised by ZooKeeper.lockNode and ZooKeeper.unlockNode.

    It signals that a node lock could not be acquired (timeout, or the
    lock is held elsewhere in non-blocking mode), or that an unlock was
    attempted on a node that does not hold a lock.
    '''
28
29
class ZooKeeper(object):
    '''
    Class implementing the ZooKeeper interface.

    This class uses the facade design pattern to keep common interaction
    with the ZooKeeper API simple and consistent for the caller, and
    limits coupling between objects. It allows for more complex interactions
    by providing direct access to the client connection when needed (though
    that is discouraged). It also provides for a convenient entry point for
    testing only ZooKeeper interactions.
    '''

    log = logging.getLogger("zuul.zk.ZooKeeper")

    # Parent znode under which node requests are created.
    REQUEST_ROOT = '/nodepool/requests'
    # Parent znode under which node records (and their lock znodes) live.
    NODE_ROOT = '/nodepool/nodes'

    def __init__(self):
        '''
        Initialize the ZooKeeper object.

        No connection is made here; call connect() to create and start
        the underlying KazooClient.
        '''
        # The KazooClient instance, or None when not connected via connect().
        self.client = None
        # Latched to True by _connection_listener when the session is LOST;
        # only resetLostFlag() clears it again.
        self._became_lost = False

    def _dictToStr(self, data):
        '''Serialize ``data`` to UTF-8 encoded JSON bytes for a znode.'''
        return json.dumps(data).encode('utf8')

    def _strToDict(self, data):
        '''Decode UTF-8 JSON bytes read from a znode into a Python object.'''
        return json.loads(data.decode('utf8'))

    def _connection_listener(self, state):
        '''
        Listener method for Kazoo connection state changes.

        Registered with the client in connect(). Besides logging, it
        latches the "lost" flag so callers can later detect (via
        didLoseConnection) that the session was lost at some point.

        .. warning:: This method must not block.
        '''
        if state == KazooState.LOST:
            self.log.debug("ZooKeeper connection: LOST")
            self._became_lost = True
        elif state == KazooState.SUSPENDED:
            self.log.debug("ZooKeeper connection: SUSPENDED")
        else:
            self.log.debug("ZooKeeper connection: CONNECTED")

    @property
    def connected(self):
        '''True if the client state is CONNECTED. Requires connect() first.'''
        return self.client.state == KazooState.CONNECTED

    @property
    def suspended(self):
        '''True if the client state is SUSPENDED. Requires connect() first.'''
        return self.client.state == KazooState.SUSPENDED

    @property
    def lost(self):
        '''True if the client state is LOST. Requires connect() first.'''
        return self.client.state == KazooState.LOST

    @property
    def didLoseConnection(self):
        '''
        True if the session was reported LOST at any point since this
        object was created or resetLostFlag() was last called.
        '''
        return self._became_lost

    def resetLostFlag(self):
        '''Clear the latched flag reported by didLoseConnection.'''
        self._became_lost = False

    def connect(self, hosts, read_only=False):
        '''
        Establish a connection with ZooKeeper cluster.

        A new KazooClient is created, registered with the connection
        listener and started only if this object does not already hold a
        client; otherwise the call is a no-op.

        :param str hosts: Comma-separated list of hosts to connect to (e.g.
            127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
        :param bool read_only: If True, establishes a read-only connection.

        '''
        if self.client is None:
            self.client = KazooClient(hosts=hosts, read_only=read_only)
            self.client.add_listener(self._connection_listener)
            self.client.start()

    def disconnect(self):
        '''
        Close the ZooKeeper cluster connection.

        You should call this method if you used connect() to establish a
        cluster connection.
        '''
        # NOTE(review): the client is only stopped/closed (and cleared)
        # when it is currently connected; a SUSPENDED or LOST client is
        # left in place, so a later connect() will not replace it.
        if self.client is not None and self.client.connected:
            self.client.stop()
            self.client.close()
            self.client = None

    def resetHosts(self, hosts):
        '''
        Reset the ZooKeeper cluster connection host list.

        Does nothing if there is no client (connect() not called).

        :param str hosts: Comma-separated list of hosts to connect to (e.g.
            127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
        '''
        if self.client is not None:
            self.client.set_hosts(hosts=hosts)

    def submitNodeRequest(self, node_request, watcher):
        '''
        Submit a request for nodes to Nodepool.

        The request is written as an ephemeral, sequential znode under
        REQUEST_ROOT, and ``node_request.id`` is set to the resulting
        znode name.

        :param NodeRequest node_request: A NodeRequest with the
            contents of the request.

        :param callable watcher: A callable object that will be
            invoked each time the request is updated. It is called
            with two arguments: (node_request, deleted) where
            node_request is the same argument passed to this method,
            and deleted is a boolean which is True if the node no
            longer exists (notably, this will happen on disconnection
            from ZooKeeper). The watcher should return False when
            further updates are no longer necessary.
        '''
        priority = 100  # TODO(jeblair): integrate into nodereq

        data = node_request.toDict()
        data['created_time'] = time.time()

        # The trailing '-' is followed by the sequence suffix appended by
        # ZooKeeper because sequence=True.
        path = '%s/%s-' % (self.REQUEST_ROOT, priority)
        path = self.client.create(path, self._dictToStr(data),
                                  makepath=True,
                                  sequence=True, ephemeral=True)
        reqid = path.split("/")[-1]
        node_request.id = reqid

        def callback(data, stat):
            # data is None once the request znode has been deleted.
            if data:
                data = self._strToDict(data)
                node_request.updateFromDict(data)
                # Nodes assigned to the request are matched positionally
                # against the nodes of the requested nodeset.
                request_nodes = list(node_request.nodeset.getNodes())
                for i, nodeid in enumerate(data.get('nodes', [])):
                    node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
                    node_data, _ = self.client.get(node_path)
                    node_data = self._strToDict(node_data)
                    request_nodes[i].id = nodeid
                    request_nodes[i].updateFromDict(node_data)
            deleted = (data is None)  # data *are* none
            # Returning False from the watcher stops further DataWatch calls.
            return watcher(node_request, deleted)

        self.client.DataWatch(path, callback)

    def deleteNodeRequest(self, node_request):
        '''
        Delete a request for nodes.

        A request that has already been deleted is silently ignored.

        :param NodeRequest node_request: A NodeRequest with the
            contents of the request.
        '''

        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
        try:
            self.client.delete(path)
        except kze.NoNodeError:
            pass

    def storeNode(self, node):
        '''Store the node.

        The node is expected to already exist and is updated in its
        entirety.

        :param Node node: The node to update.
        '''

        path = '%s/%s' % (self.NODE_ROOT, node.id)
        self.client.set(path, self._dictToStr(node.toDict()))

    def lockNode(self, node, blocking=True, timeout=None):
        '''
        Lock a node.

        This should be called as soon as a request is fulfilled and
        the lock held for as long as the node is in-use. It can be
        used by nodepool to detect if Zuul has gone offline and the
        node should be reclaimed.

        On success the acquired Lock object is stored in ``node.lock``.

        :param Node node: The node which should be locked.
        :param bool blocking: Whether to wait for the lock.
        :param timeout: Maximum time to wait for the lock when blocking,
            passed straight to kazoo's ``Lock.acquire``.
        :raises LockException: If the lock times out, or is not acquired
            (e.g. held elsewhere when not blocking).
        '''

        lock_path = '%s/%s/lock' % (self.NODE_ROOT, node.id)
        try:
            lock = Lock(self.client, lock_path)
            have_lock = lock.acquire(blocking, timeout)
        except kze.LockTimeout:
            raise LockException(
                "Timeout trying to acquire lock %s" % lock_path)

        # If we aren't blocking, it's possible we didn't get the lock
        # because someone else has it.
        if not have_lock:
            raise LockException("Did not get lock on %s" % lock_path)

        node.lock = lock

    def unlockNode(self, node):
        '''
        Unlock a node.

        The node must already have been locked.

        :param Node node: The node which should be unlocked.
        :raises LockException: If the node does not hold a lock.
        '''

        if node.lock is None:
            raise LockException("Node %s does not hold a lock" % (node,))
        node.lock.release()
        node.lock = None

    def heldNodeCount(self, autohold_key):
        '''
        Count the number of nodes being held for the given tenant/project/job.

        :param tuple autohold_key: An ordered sequence (e.g. a tuple) of
            the tenant, project and job names. The elements are joined
            with spaces and compared to each node's ``hold_job`` value, so
            their order matters; an unordered ``set`` would give
            unpredictable results.
        :returns: The number of nodes in the HOLD state whose ``hold_job``
            matches the key (0 if the node root does not exist).
        '''
        identifier = " ".join(autohold_key)
        try:
            nodes = self.client.get_children(self.NODE_ROOT)
        except kze.NoNodeError:
            return 0

        count = 0
        for nodeid in nodes:
            node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
            try:
                node_data, _ = self.client.get(node_path)
            except kze.NoNodeError:
                # The node was deleted after get_children() listed it;
                # a node that no longer exists is not being held.
                continue
            node_data = self._strToDict(node_data)
            if (node_data['state'] == zuul.model.STATE_HOLD and
                    node_data.get('hold_job') == identifier):
                count += 1
        return count