blob: 190d7b487a5aae3f6c76a8939d4f7402e41cce42 [file] [log] [blame]
James E. Blairdce6cea2016-12-20 16:45:32 -08001#!/usr/bin/env python
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import json
16import logging
17import six
18import time
19from kazoo.client import KazooClient, KazooState
James E. Blairdce6cea2016-12-20 16:45:32 -080020
# Lifecycle states a model object can be in.
# The node is still being built and cannot be used yet.
BUILDING = 'building'
# The node is ready for use.
READY = 'ready'
# The node should be deleted.
DELETING = 'deleting'

# The complete set of values the ``state`` attribute may be assigned.
STATES = {BUILDING, READY, DELETING}
30
31
class ZooKeeperConnectionConfig(object):
    '''
    Represents the connection parameters for a ZooKeeper server.

    Two instances compare equal when all of their attributes (host, port
    and chroot) are equal.
    '''

    def __init__(self, host, port=2181, chroot=None):
        '''Initialize the ZooKeeperConnectionConfig object.

        :param str host: The hostname of the ZooKeeper server.
        :param int port: The port on which ZooKeeper is listening.
            Optional, default: 2181.
        :param str chroot: A chroot for this connection.  All
            ZooKeeper nodes will be underneath this root path.
            Optional, default: None.
        '''
        self.host = host
        self.port = port
        # A missing chroot is stored as '' so that it can be appended
        # verbatim after "host:port" when a connection string is built.
        self.chroot = chroot or ''

    def __eq__(self, other):
        '''
        Return True if ``other`` is a ZooKeeperConnectionConfig holding
        the same attribute values, False otherwise.
        '''
        if isinstance(other, ZooKeeperConnectionConfig):
            return other.__dict__ == self.__dict__
        return False

    def __ne__(self, other):
        '''
        Return the negation of :py:meth:`__eq__`.

        Python 2 does not derive ``!=`` from ``__eq__``; without this
        method two equal configurations would also compare as unequal.
        '''
        return not self.__eq__(other)
60
61
def buildZooKeeperHosts(host_list):
    '''
    Assemble the comma-separated host string used for client connections.

    Each entry contributes ``host:port`` immediately followed by its
    chroot, and the entries are joined with commas in list order.

    :param list host_list: A list of
        :py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects (one
        per server) defining the ZooKeeper cluster servers.

    :returns: The connection string; empty if ``host_list`` is empty.
    :raises Exception: If ``host_list`` is not a list.
    '''
    if isinstance(host_list, list):
        entries = [
            '%s:%s%s' % (server.host, server.port, server.chroot)
            for server in host_list
        ]
        return ",".join(entries)
    raise Exception("'host_list' must be a list")
77
78
class BaseModel(object):
    '''
    Base class holding the id, state and state timestamp shared by the
    model objects in this module.
    '''

    def __init__(self, o_id):
        '''
        :param str o_id: The object ID.  A falsy value (None, '') leaves
            the ID unset, in which case ``id`` reads as None.
        '''
        # Always define _id so that reading ``id`` on an object created
        # without one yields None instead of raising AttributeError.
        self._id = None
        if o_id:
            self.id = o_id
        self._state = None
        self.state_time = None
        self.stat = None

    @property
    def id(self):
        '''The object ID, or None if it has not been assigned.'''
        return self._id

    @id.setter
    def id(self, value):
        '''
        :raises TypeError: If ``value`` is not a string type.
        '''
        if not isinstance(value, six.string_types):
            raise TypeError("'id' attribute must be a string type")
        self._id = value

    @property
    def state(self):
        '''The current state, or None if it has not been assigned.'''
        return self._state

    @state.setter
    def state(self, value):
        '''
        Set the state and record the time of the change in ``state_time``.

        :raises TypeError: If ``value`` is not one of ``STATES``.
        '''
        if value not in STATES:
            raise TypeError("'%s' is not a valid state" % value)
        self._state = value
        self.state_time = time.time()

    def toDict(self):
        '''
        Convert a BaseModel object's attributes to a dictionary.

        :returns: A dict with the keys ``state`` and ``state_time``.
        '''
        return {
            'state': self.state,
            'state_time': self.state_time,
        }

    def fromDict(self, d):
        '''
        Set base attributes based on the given dict.

        Unlike the derived classes, this should NOT return an object as it
        assumes self has already been instantiated.

        :param dict d: May contain the keys ``state`` and ``state_time``;
            keys that are absent leave the attribute unchanged.
        '''
        if 'state' in d:
            self.state = d['state']
        # Must come after the state assignment: the state setter stamps
        # state_time with the current time, and a stored timestamp has to
        # override that.
        if 'state_time' in d:
            self.state_time = d['state_time']
128
129
class NodeRequest(BaseModel):
    '''
    Class representing a node request.
    '''

    def __init__(self, id=None):
        '''
        :param str id: The request ID.  Optional, default: None.
        '''
        super(NodeRequest, self).__init__(id)

    def __repr__(self):
        d = self.toDict()
        # Fall back to None rather than failing when no ID has been
        # assigned yet; a repr should never raise.
        d['id'] = getattr(self, 'id', None)
        d['stat'] = self.stat
        return '<NodeRequest %s>' % d

    def toDict(self):
        '''
        Convert a NodeRequest object's attributes to a dictionary.
        '''
        d = super(NodeRequest, self).toDict()
        return d

    def updateFromDict(self, d):
        '''
        Update this existing NodeRequest in place from a dictionary.

        This is the method invoked on a request when watched data for it
        changes; it applies the base attributes found in ``d``.

        :param dict d: The dictionary.
        '''
        # fromDict is overridden below as a static factory, so the
        # instance-level implementation is reached through super().
        super(NodeRequest, self).fromDict(d)

    @staticmethod
    def fromDict(d, o_id=None):
        '''
        Create a NodeRequest object from a dictionary.

        :param dict d: The dictionary.
        :param str o_id: The object ID.

        :returns: An initialized NodeRequest object.
        '''
        o = NodeRequest(o_id)
        super(NodeRequest, o).fromDict(d)
        return o
163 return o
164
165
class ZooKeeper(object):
    '''
    Class implementing the ZooKeeper interface.

    This class uses the facade design pattern to keep common interaction
    with the ZooKeeper API simple and consistent for the caller, and
    limits coupling between objects. It allows for more complex interactions
    by providing direct access to the client connection when needed (though
    that is discouraged). It also provides for a convenient entry point for
    testing only ZooKeeper interactions.
    '''

    log = logging.getLogger("zuul.zk.ZooKeeper")

    REQUEST_ROOT = '/nodepool/requests'

    def __init__(self):
        '''
        Initialize the ZooKeeper object.
        '''
        self.client = None
        self._became_lost = False

    def _dictToStr(self, data):
        '''
        Serialize ``data`` to JSON, encoded as UTF-8 bytes.

        Bytes are returned because Kazoo only accepts byte strings as node
        values.  On Python 2 the result is an ordinary ``str``.
        '''
        return json.dumps(data).encode('utf8')

    def _strToDict(self, data):
        '''
        Parse JSON node data into a Python object.

        :param data: JSON as bytes (as delivered by ZooKeeper) or text.
        '''
        if isinstance(data, bytes):
            data = data.decode('utf8')
        return json.loads(data)

    def _connection_listener(self, state):
        '''
        Listener method for Kazoo connection state changes.

        Logs every transition and remembers whether the connection was
        ever lost (see :py:attr:`didLoseConnection`).

        .. warning:: This method must not block.
        '''
        if state == KazooState.LOST:
            self.log.debug("ZooKeeper connection: LOST")
            self._became_lost = True
        elif state == KazooState.SUSPENDED:
            self.log.debug("ZooKeeper connection: SUSPENDED")
        else:
            # Any state other than LOST or SUSPENDED is reported as
            # connected.
            self.log.debug("ZooKeeper connection: CONNECTED")

    @property
    def connected(self):
        '''True if a client exists and its state is CONNECTED.'''
        return (self.client is not None and
                self.client.state == KazooState.CONNECTED)

    @property
    def suspended(self):
        '''True if a client exists and its state is SUSPENDED.'''
        return (self.client is not None and
                self.client.state == KazooState.SUSPENDED)

    @property
    def lost(self):
        '''True if a client exists and its state is LOST.'''
        return (self.client is not None and
                self.client.state == KazooState.LOST)

    @property
    def didLoseConnection(self):
        '''
        True if the connection listener has seen a LOST state since the
        flag was last cleared with :py:meth:`resetLostFlag`.
        '''
        return self._became_lost

    def resetLostFlag(self):
        '''Clear the flag reported by :py:attr:`didLoseConnection`.'''
        self._became_lost = False

    def connect(self, host_list, read_only=False):
        '''
        Establish a connection with ZooKeeper cluster.

        Convenience method if a pre-existing ZooKeeper connection is not
        supplied to the ZooKeeper object at instantiation time.  Does
        nothing if a client already exists.

        :param list host_list: A list of
            :py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects
            (one per server) defining the ZooKeeper cluster servers.
        :param bool read_only: If True, establishes a read-only connection.

        '''
        if self.client is None:
            hosts = buildZooKeeperHosts(host_list)
            self.client = KazooClient(hosts=hosts, read_only=read_only)
            self.client.add_listener(self._connection_listener)
            self.client.start()

    def disconnect(self):
        '''
        Close the ZooKeeper cluster connection.

        You should call this method if you used connect() to establish a
        cluster connection.
        '''
        # NOTE(review): a client that exists but is not currently
        # connected is neither stopped nor dropped here, so a later
        # connect() will not create a new one -- confirm this is intended.
        if self.client is not None and self.client.connected:
            self.client.stop()
            self.client.close()
            self.client = None

    def resetHosts(self, host_list):
        '''
        Reset the ZooKeeper cluster connection host list.

        Does nothing if there is no client.

        :param list host_list: A list of
            :py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects
            (one per server) defining the ZooKeeper cluster servers.
        '''
        if self.client is not None:
            hosts = buildZooKeeperHosts(host_list)
            self.client.set_hosts(hosts=hosts)

    def submitNodeRequest(self, node_request, watcher):
        '''
        Submit a request for nodes to Nodepool.

        The request is stored as an ephemeral, sequential node under
        ``REQUEST_ROOT``; the final path component of the created node is
        assigned to ``node_request.id``.

        :param NodeRequest node_request: A NodeRequest with the
            contents of the request.

        :param callable watcher: A callable object that will be
            invoked each time the request is updated.  It is called
            with two arguments: (node_request, deleted) where
            node_request is the same argument passed to this method,
            and deleted is a boolean which is True if the node no
            longer exists (notably, this will happen on disconnection
            from ZooKeeper).  The watcher should return False when
            further updates are no longer necessary.
        '''
        priority = 100  # TODO(jeblair): integrate into nodereq

        data = node_request.toDict()
        data['created_time'] = time.time()

        path = '%s/%s-' % (self.REQUEST_ROOT, priority)
        path = self.client.create(path, self._dictToStr(data),
                                  makepath=True,
                                  sequence=True, ephemeral=True)
        reqid = path.split("/")[-1]
        node_request.id = reqid

        def callback(data, stat):
            # Decide on the raw payload, before it is parsed: only a
            # missing payload means the node is gone.
            deleted = (data is None)
            if data:
                node_request.updateFromDict(self._strToDict(data))
            return watcher(node_request, deleted)

        self.client.DataWatch(path, callback)