blob: 8a8e00c0f02262d28c362489f09b33ecbff4044c [file] [log] [blame]
James E. Blairf5dbd002015-12-23 15:26:17 -08001# Copyright 2014 OpenStack Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import collections
16import json
17import logging
James E. Blair82938472016-01-11 14:38:13 -080018import os
James E. Blairf5dbd002015-12-23 15:26:17 -080019import shutil
James E. Blair414cb672016-10-05 13:48:14 -070020import signal
James E. Blair17302972016-08-10 16:11:42 -070021import socket
James E. Blair82938472016-01-11 14:38:13 -080022import subprocess
James E. Blairf5dbd002015-12-23 15:26:17 -080023import tempfile
24import threading
James E. Blair414cb672016-10-05 13:48:14 -070025import time
James E. Blairf5dbd002015-12-23 15:26:17 -080026import traceback
James E. Blaira92cbc82017-01-23 14:56:49 -080027import yaml
James E. Blairf5dbd002015-12-23 15:26:17 -080028
29import gear
30
James E. Blair29b9c962017-02-13 16:17:22 -080031import zuul.merger.merger
Monty Taylorc231d932017-02-03 09:57:15 -060032import zuul.ansible.action
James E. Blair414cb672016-10-05 13:48:14 -070033import zuul.ansible.library
James E. Blair414cb672016-10-05 13:48:14 -070034from zuul.lib import commandsocket
James E. Blairf5dbd002015-12-23 15:26:17 -080035
# Watchdog grace period: five minutes, presumably in seconds (the
# Watchdog class measures time with time.time()).  Its consumer is not
# visible in this part of the file.
ANSIBLE_WATCHDOG_GRACE = 5 * 60


# Commands accepted on the launcher command socket; these mirror the
# keys of LaunchServer.command_map.
COMMANDS = [
    'stop',
    'pause',
    'unpause',
    'graceful',
    'verbose',
    'unverbose',
]
41
42
class Watchdog(object):
    """Call ``function(*args)`` if ``timeout`` seconds pass before stop().

    start() launches a daemon thread that wakes every ``interval``
    seconds (10 by default) to check the deadline.  When that thread
    finishes, ``timed_out`` is True if the deadline expired while the
    watchdog was still running (and ``function`` was called), or False
    if stop() was called first.  Until then it is None.
    """

    def __init__(self, timeout, function, args, interval=10):
        self.timeout = timeout
        self.function = function
        self.args = args
        self.interval = interval
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True
        self.timed_out = None
        # Defined up front so the object is fully initialised even if
        # stop() is called before start().
        self._running = False
        self.end = None

    def _run(self):
        while self._running and time.time() < self.end:
            time.sleep(self.interval)
        if self._running:
            self.timed_out = True
            self.function(*self.args)
        else:
            # Only a stop() means "did not time out".  Resetting this
            # unconditionally would overwrite the True set above.
            self.timed_out = False

    def start(self):
        self._running = True
        self.end = time.time() + self.timeout
        self.thread.start()

    def stop(self):
        self._running = False
66 self._running = False
James E. Blairf5dbd002015-12-23 15:26:17 -080067
James E. Blair23161912016-07-28 15:42:14 -070068# TODOv3(mordred): put git repos in a hierarchy that includes source
69# hostname, eg: git.openstack.org/openstack/nova. Also, configure
70# sources to have an alias, so that the review.openstack.org source
71# repos end up in git.openstack.org.
72
James E. Blair414cb672016-10-05 13:48:14 -070073
class JobDirPlaybook(object):
    """One playbook slot inside a JobDir.

    ``root`` is the directory reserved for this playbook.  ``secure``
    and ``path`` start out as None and are filled in once the playbook
    repo has been prepared.
    """

    def __init__(self, root):
        self.root = root
        # Both are populated later (see AnsibleJob.preparePlaybookRepo).
        self.path = None
        self.secure = None
78 self.path = None
79
80
class JobDir(object):
    """On-disk workspace for a single job.

    A fresh directory is created with ``tempfile.mkdtemp`` (inside
    ``root`` when given).  It holds ``git/`` for the job's repos and
    ``ansible/`` for generated Ansible files, with ``ansible/secure/``
    for the secure ansible.cfg.  Numbered playbook and role directories
    are added on demand by the add* methods.  cleanup(), or leaving the
    ``with`` block, removes the whole directory unless ``keep`` is true.
    """

    def __init__(self, root=None, keep=False):
        self.keep = keep
        self.root = tempfile.mkdtemp(dir=root)
        self.git_root = os.path.join(self.root, 'git')
        self.ansible_root = os.path.join(self.root, 'ansible')
        self.secure_ansible_root = os.path.join(self.ansible_root, 'secure')
        try:
            os.makedirs(self.git_root)
            os.makedirs(self.ansible_root)
            os.makedirs(self.secure_ansible_root)
        except Exception:
            # Don't leak the freshly created temp dir if setup fails.
            shutil.rmtree(self.root, ignore_errors=True)
            raise
        self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
        self.inventory = os.path.join(self.ansible_root, 'inventory')
        self.vars = os.path.join(self.ansible_root, 'vars.yaml')
        self.playbooks = []  # The list of candidate playbooks
        self.playbook = None  # A pointer to the candidate we have chosen
        self.pre_playbooks = []
        self.post_playbooks = []
        self.roles = []
        self.roles_path = []
        self.config = os.path.join(self.ansible_root, 'ansible.cfg')
        self.secure_config = os.path.join(
            self.secure_ansible_root, 'ansible.cfg')
        self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')

    def _addNumberedDir(self, prefix, count):
        """Create and return ``<ansible_root>/<prefix>_<count>``."""
        root = os.path.join(self.ansible_root, '%s_%i' % (prefix, count))
        os.makedirs(root)
        return root

    def _addPlaybook(self, prefix, playbooks):
        """Create the next numbered playbook dir and append it to the list."""
        root = self._addNumberedDir(prefix, len(playbooks))
        playbook = JobDirPlaybook(root)
        playbooks.append(playbook)
        return playbook

    def addPrePlaybook(self):
        return self._addPlaybook('pre_playbook', self.pre_playbooks)

    def addPostPlaybook(self):
        return self._addPlaybook('post_playbook', self.post_playbooks)

    def addPlaybook(self):
        return self._addPlaybook('playbook', self.playbooks)

    def addRole(self):
        root = self._addNumberedDir('role', len(self.roles))
        self.roles.append(root)
        return root

    def cleanup(self):
        if not self.keep:
            shutil.rmtree(self.root)

    def __enter__(self):
        return self

    def __exit__(self, etype, value, tb):
        self.cleanup()
James E. Blairf5dbd002015-12-23 15:26:17 -0800145
146
class UpdateTask(object):
    """A request to refresh the cached repo for ``project`` from ``url``.

    Two tasks compare equal when they name the same project, which lets
    DeduplicateQueue collapse repeated requests.  wait() blocks until
    setComplete() has been called.
    """

    def __init__(self, project, url):
        self.project = project
        self.url = url
        self.event = threading.Event()

    def __eq__(self, other):
        # The update queue also carries a None stop sentinel (see
        # LaunchServer.stop); comparing against that, or anything else
        # that is not an UpdateTask, must not raise AttributeError.
        if not isinstance(other, UpdateTask):
            return NotImplemented
        return other.project == self.project

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def wait(self):
        self.event.wait()

    def setComplete(self):
        self.event.set()
162 self.event.set()
163
164
class DeduplicateQueue(object):
    """Thread-safe FIFO queue that does not enqueue duplicates.

    An item counts as a duplicate when it compares equal (``==``) to
    something already waiting in the queue.
    """

    def __init__(self):
        self.queue = collections.deque()
        self.condition = threading.Condition()

    def qsize(self):
        return len(self.queue)

    def put(self, item):
        """Enqueue ``item`` unless an equal item is already queued.

        Returns the already-queued equivalent when there is one;
        otherwise appends ``item``, wakes one waiting get(), and
        returns ``item`` itself.
        """
        with self.condition:
            existing = None
            for candidate in self.queue:
                if item == candidate:
                    existing = candidate
            if existing is not None:
                return existing
            self.queue.append(item)
            self.condition.notify()
            return item

    def get(self):
        """Remove and return the oldest item, blocking while empty."""
        with self.condition:
            while not self.queue:
                self.condition.wait()
            return self.queue.popleft()
201 self.condition.release()
202
203
class LaunchServer(object):
    """Gearman worker that runs Zuul launch jobs and merger requests.

    It registers ``launcher:launch``, ``launcher:stop:<hostname>``,
    ``merger:merge`` and ``merger:cat``, dispatches each incoming job to
    the matching handler, keeps the launcher's git cache up to date on a
    background thread, and reads admin commands from a local command
    socket.
    """

    log = logging.getLogger("zuul.LaunchServer")

    def __init__(self, config, connections=None, jobdir_root=None,
                 keep_jobdir=False):
        self.config = config
        self.keep_jobdir = keep_jobdir
        self.jobdir_root = jobdir_root
        # TODOv3(mordred): make the launcher name more unique --
        # perhaps hostname+pid.
        self.hostname = socket.gethostname()
        self.zuul_url = config.get('merger', 'zuul_url')
        self.command_map = dict(
            stop=self.stop,
            pause=self.pause,
            unpause=self.unpause,
            graceful=self.graceful,
            verbose=self.verboseOn,
            unverbose=self.verboseOff,
        )

        if self.config.has_option('launcher', 'git_dir'):
            self.merge_root = self.config.get('launcher', 'git_dir')
        else:
            self.merge_root = '/var/lib/zuul/launcher-git'

        if self.config.has_option('merger', 'git_user_email'):
            self.merge_email = self.config.get('merger', 'git_user_email')
        else:
            self.merge_email = None

        if self.config.has_option('merger', 'git_user_name'):
            self.merge_name = self.config.get('merger', 'git_user_name')
        else:
            self.merge_name = None

        # Avoid a shared mutable default argument: each server gets its
        # own empty mapping when no connections are supplied.
        if connections is None:
            connections = {}
        self.connections = connections
        self.merger = self._getMerger(self.merge_root)
        self.update_queue = DeduplicateQueue()

        if self.config.has_option('zuul', 'state_dir'):
            state_dir = os.path.expanduser(
                self.config.get('zuul', 'state_dir'))
        else:
            state_dir = '/var/lib/zuul'
        path = os.path.join(state_dir, 'launcher.socket')
        self.command_socket = commandsocket.CommandSocket(path)
        ansible_dir = os.path.join(state_dir, 'ansible')
        self.library_dir = os.path.join(ansible_dir, 'library')
        if not os.path.exists(self.library_dir):
            os.makedirs(self.library_dir)
        self.action_dir = os.path.join(ansible_dir, 'action')
        if not os.path.exists(self.action_dir):
            os.makedirs(self.action_dir)

        # Copy Zuul's bundled Ansible library modules and action plugins
        # into the state directory.
        library_path = os.path.dirname(os.path.abspath(
            zuul.ansible.library.__file__))
        for fn in os.listdir(library_path):
            shutil.copy(os.path.join(library_path, fn), self.library_dir)
        action_path = os.path.dirname(os.path.abspath(
            zuul.ansible.action.__file__))
        for fn in os.listdir(action_path):
            shutil.copy(os.path.join(action_path, fn), self.action_dir)

        # gearman job unique id -> AnsibleJob handling it.
        self.job_workers = {}

    def _getMerger(self, root):
        """Return a Merger rooted at ``root`` using this server's settings."""
        return zuul.merger.merger.Merger(root, self.connections,
                                         self.merge_email, self.merge_name)

    def start(self):
        """Connect to gearman and start the command, update and job threads."""
        self._running = True
        self._command_running = True
        server = self.config.get('gearman', 'server')
        if self.config.has_option('gearman', 'port'):
            port = self.config.get('gearman', 'port')
        else:
            port = 4730
        self.worker = gear.Worker('Zuul Launch Server')
        self.worker.addServer(server, port)
        self.log.debug("Waiting for server")
        self.worker.waitForServer()
        self.log.debug("Registering")
        self.register()

        self.log.debug("Starting command processor")
        self.command_socket.start()
        self.command_thread = threading.Thread(target=self.runCommand)
        self.command_thread.daemon = True
        self.command_thread.start()

        self.log.debug("Starting worker")
        self.update_thread = threading.Thread(target=self._updateLoop)
        self.update_thread.daemon = True
        self.update_thread.start()
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        self.thread.start()

    def register(self):
        self.worker.registerFunction("launcher:launch")
        self.worker.registerFunction("launcher:stop:%s" % self.hostname)
        self.worker.registerFunction("merger:merge")
        self.worker.registerFunction("merger:cat")

    def stop(self):
        self.log.debug("Stopping")
        self._running = False
        self.worker.shutdown()
        self._command_running = False
        self.command_socket.stop()
        # Wake the update thread, which may be blocked in
        # update_queue.get(); _innerUpdateLoop treats None as "stop".
        self.update_queue.put(None)
        self.log.debug("Stopped")

    def pause(self):
        # TODOv3: implement
        pass

    def unpause(self):
        # TODOv3: implement
        pass

    def graceful(self):
        # TODOv3: implement
        pass

    def verboseOn(self):
        # TODOv3: implement
        pass

    def verboseOff(self):
        # TODOv3: implement
        pass

    def join(self):
        self.update_thread.join()
        self.thread.join()

    def runCommand(self):
        """Dispatch commands read from the command socket until stopped."""
        while self._command_running:
            try:
                command = self.command_socket.get()
                # '_stop' is ignored here; presumably it is the sentinel
                # used to unblock get() on shutdown -- TODO confirm.
                if command != '_stop':
                    self.command_map[command]()
            except Exception:
                self.log.exception("Exception while processing command")

    def _updateLoop(self):
        while self._running:
            try:
                self._innerUpdateLoop()
            except Exception:
                self.log.exception("Exception in update thread:")

    def _innerUpdateLoop(self):
        # Inside of a loop that keeps the main repository up to date
        task = self.update_queue.get()
        if task is None:
            # We are asked to stop
            return
        try:
            self.log.info("Updating repo %s from %s" %
                          (task.project, task.url))
            self.merger.updateRepo(task.project, task.url)
            self.log.debug("Finished updating repo %s from %s" %
                           (task.project, task.url))
        finally:
            # Always release waiters (cat / job launch) even if the
            # update failed; otherwise they would block forever.  The
            # exception still propagates and is logged by _updateLoop.
            task.setComplete()

    def update(self, project, url):
        """Queue a repo update; return the (possibly pre-existing) task."""
        task = UpdateTask(project, url)
        task = self.update_queue.put(task)
        return task

    def run(self):
        """Main gearman loop: fetch jobs and dispatch them by name."""
        self.log.debug("Starting launch listener")
        while self._running:
            try:
                job = self.worker.getJob()
                try:
                    if job.name == 'launcher:launch':
                        self.log.debug("Got launch job: %s" % job.unique)
                        self.launchJob(job)
                    elif job.name.startswith('launcher:stop'):
                        self.log.debug("Got stop job: %s" % job.unique)
                        self.stopJob(job)
                    elif job.name == 'merger:cat':
                        self.log.debug("Got cat job: %s" % job.unique)
                        self.cat(job)
                    elif job.name == 'merger:merge':
                        self.log.debug("Got merge job: %s" % job.unique)
                        self.merge(job)
                    else:
                        self.log.error("Unable to handle job %s" % job.name)
                        job.sendWorkFail()
                except Exception:
                    self.log.exception("Exception while running job")
                    job.sendWorkException(traceback.format_exc())
            except gear.InterruptedError:
                # Presumably raised when worker.shutdown() interrupts
                # getJob() (see stop()); the loop condition then exits.
                pass
            except Exception:
                self.log.exception("Exception while getting job")

    def launchJob(self, job):
        """Start an AnsibleJob for ``job`` and track it by unique id."""
        worker = AnsibleJob(self, job)
        self.job_workers[job.unique] = worker
        worker.run()

    def finishJob(self, unique):
        """Forget the worker for ``unique`` once its thread is done."""
        del self.job_workers[unique]

    def stopJob(self, job):
        """Handle launcher:stop: stop the worker named by args['uuid'].

        WORK_COMPLETE is always sent back, even if the worker is unknown
        or stopping it fails.
        """
        try:
            args = json.loads(job.arguments)
            self.log.debug("Stop job with arguments: %s" % (args,))
            unique = args['uuid']
            job_worker = self.job_workers.get(unique)
            if not job_worker:
                self.log.debug("Unable to find worker for job %s" % (unique,))
                return
            try:
                job_worker.stop()
            except Exception:
                self.log.exception("Exception sending stop command "
                                   "to worker:")
        finally:
            job.sendWorkComplete()

    def cat(self, job):
        """Handle merger:cat: refresh the repo, then return file contents."""
        args = json.loads(job.arguments)
        task = self.update(args['project'], args['url'])
        task.wait()
        files = self.merger.getFiles(args['project'], args['url'],
                                     args['branch'], args['files'])
        result = dict(updated=True,
                      files=files,
                      zuul_url=self.zuul_url)
        job.sendWorkComplete(json.dumps(result))

    def merge(self, job):
        """Handle merger:merge: merge args['items'] and report the result.

        The reply always has ``merged`` and ``zuul_url`` plus ``commit``.
        When args['files'] is set it also has ``files``.  On a failed
        merge (mergeChanges returned None) these are None.
        """
        args = json.loads(job.arguments)
        ret = self.merger.mergeChanges(args['items'], args.get('files'))
        result = dict(merged=(ret is not None),
                      zuul_url=self.zuul_url)
        if ret is None:
            # Failed merge: don't try to unpack None into
            # (commit, files), which raised TypeError before.
            result['commit'] = None
            if args.get('files'):
                result['files'] = None
        elif args.get('files'):
            result['commit'], result['files'] = ret
        else:
            result['commit'] = ret
        job.sendWorkComplete(json.dumps(result))
448 job.sendWorkComplete(json.dumps(result))
449
450
451class AnsibleJob(object):
452 log = logging.getLogger("zuul.AnsibleJob")
453
James E. Blair412fba82017-01-26 15:00:50 -0800454 RESULT_NORMAL = 1
455 RESULT_TIMED_OUT = 2
456 RESULT_UNREACHABLE = 3
457 RESULT_ABORTED = 4
458
Joshua Hesketh50c21782016-10-13 21:34:14 +1100459 def __init__(self, launcher_server, job):
460 self.launcher_server = launcher_server
461 self.job = job
James E. Blair412fba82017-01-26 15:00:50 -0800462 self.jobdir = None
James E. Blaircaa83ad2017-01-27 08:58:07 -0800463 self.proc = None
464 self.proc_lock = threading.Lock()
Joshua Hesketh50c21782016-10-13 21:34:14 +1100465 self.running = False
James E. Blaircaa83ad2017-01-27 08:58:07 -0800466 self.aborted = False
Joshua Hesketh50c21782016-10-13 21:34:14 +1100467
468 if self.launcher_server.config.has_option(
469 'launcher', 'private_key_file'):
470 self.private_key_file = self.launcher_server.config.get(
471 'launcher', 'private_key_file')
472 else:
473 self.private_key_file = '~/.ssh/id_rsa'
474
475 def run(self):
476 self.running = True
477 self.thread = threading.Thread(target=self.launch)
478 self.thread.start()
479
480 def stop(self):
James E. Blaircaa83ad2017-01-27 08:58:07 -0800481 self.aborted = True
482 self.abortRunningProc()
Joshua Hesketh50c21782016-10-13 21:34:14 +1100483 self.thread.join()
484
485 def launch(self):
486 try:
James E. Blair14e0bb22017-02-21 12:59:41 -0500487 self.jobdir = JobDir(root=self.launcher_server.jobdir_root,
488 keep=self.launcher_server.keep_jobdir)
Joshua Hesketh50c21782016-10-13 21:34:14 +1100489 self._launch()
James E. Blair096c5cd2017-02-02 15:33:18 -0800490 except Exception:
491 self.log.exception("Exception while launching job")
492 self.job.sendWorkException(traceback.format_exc())
Joshua Hesketh50c21782016-10-13 21:34:14 +1100493 finally:
494 self.running = False
James E. Blair412fba82017-01-26 15:00:50 -0800495 try:
496 self.jobdir.cleanup()
497 except Exception:
498 self.log.exception("Error cleaning up jobdir:")
499 try:
500 self.launcher_server.finishJob(self.job.unique)
501 except Exception:
502 self.log.exception("Error finalizing job thread:")
Joshua Hesketh50c21782016-10-13 21:34:14 +1100503
504 def _launch(self):
505 self.log.debug("Job %s: beginning" % (self.job.unique,))
James E. Blaire47eb772017-02-02 17:19:40 -0800506 self.log.debug("Job %s: args: %s" % (self.job.unique,
507 self.job.arguments,))
James E. Blair412fba82017-01-26 15:00:50 -0800508 self.log.debug("Job %s: job root at %s" %
509 (self.job.unique, self.jobdir.root))
510 args = json.loads(self.job.arguments)
511 tasks = []
512 for project in args['projects']:
513 self.log.debug("Job %s: updating project %s" %
514 (self.job.unique, project['name']))
515 tasks.append(self.launcher_server.update(
516 project['name'], project['url']))
517 for task in tasks:
518 task.wait()
Joshua Hesketh50c21782016-10-13 21:34:14 +1100519
James E. Blair412fba82017-01-26 15:00:50 -0800520 self.log.debug("Job %s: git updates complete" % (self.job.unique,))
521 merger = self.launcher_server._getMerger(self.jobdir.git_root)
522 merge_items = [i for i in args['items'] if i.get('refspec')]
523 if merge_items:
524 commit = merger.mergeChanges(merge_items) # noqa
525 else:
526 commit = args['items'][-1]['newrev'] # noqa
James E. Blair82938472016-01-11 14:38:13 -0800527
James E. Blair412fba82017-01-26 15:00:50 -0800528 # is the playbook in a repo that we have already prepared?
James E. Blair66b274e2017-01-31 14:47:52 -0800529 self.preparePlaybookRepos(args)
James E. Blairc73c73a2017-01-20 15:15:15 -0800530
James E. Blair5ac93842017-01-20 06:47:34 -0800531 self.prepareRoles(args)
532
James E. Blair412fba82017-01-26 15:00:50 -0800533 # TODOv3: Ansible the ansible thing here.
534 self.prepareAnsibleFiles(args)
James E. Blairf5dbd002015-12-23 15:26:17 -0800535
James E. Blair412fba82017-01-26 15:00:50 -0800536 data = {
537 'manager': self.launcher_server.hostname,
538 'url': 'https://server/job/{}/0/'.format(args['job']),
539 'worker_name': 'My Worker',
540 }
James E. Blair17302972016-08-10 16:11:42 -0700541
James E. Blair412fba82017-01-26 15:00:50 -0800542 # TODOv3:
543 # 'name': self.name,
544 # 'manager': self.launch_server.hostname,
545 # 'worker_name': 'My Worker',
546 # 'worker_hostname': 'localhost',
547 # 'worker_ips': ['127.0.0.1', '192.168.1.1'],
548 # 'worker_fqdn': 'zuul.example.org',
549 # 'worker_program': 'FakeBuilder',
550 # 'worker_version': 'v1.1',
551 # 'worker_extra': {'something': 'else'}
James E. Blair17302972016-08-10 16:11:42 -0700552
James E. Blair412fba82017-01-26 15:00:50 -0800553 self.job.sendWorkData(json.dumps(data))
554 self.job.sendWorkStatus(0, 100)
James E. Blairf5dbd002015-12-23 15:26:17 -0800555
James E. Blair412fba82017-01-26 15:00:50 -0800556 result = self.runPlaybooks()
557
558 if result is None:
559 self.job.sendWorkFail()
560 return
561 result = dict(result=result)
562 self.job.sendWorkComplete(json.dumps(result))
563
564 def runPlaybooks(self):
565 result = None
566
James E. Blair66b274e2017-01-31 14:47:52 -0800567 for playbook in self.jobdir.pre_playbooks:
568 pre_status, pre_code = self.runAnsiblePlaybook(playbook)
569 if pre_status != self.RESULT_NORMAL or pre_code != 0:
570 # These should really never fail, so return None and have
571 # zuul try again
572 return result
James E. Blair412fba82017-01-26 15:00:50 -0800573
James E. Blair66b274e2017-01-31 14:47:52 -0800574 job_status, job_code = self.runAnsiblePlaybook(self.jobdir.playbook)
James E. Blaircaa83ad2017-01-27 08:58:07 -0800575 if job_status == self.RESULT_TIMED_OUT:
576 return 'TIMED_OUT'
577 if job_status == self.RESULT_ABORTED:
578 return 'ABORTED'
James E. Blair412fba82017-01-26 15:00:50 -0800579 if job_status != self.RESULT_NORMAL:
580 # The result of the job is indeterminate. Zuul will
581 # run it again.
582 return result
583
James E. Blair66b274e2017-01-31 14:47:52 -0800584 success = (job_code == 0)
585 if success:
James E. Blair412fba82017-01-26 15:00:50 -0800586 result = 'SUCCESS'
587 else:
588 result = 'FAILURE'
James E. Blair66b274e2017-01-31 14:47:52 -0800589
590 for playbook in self.jobdir.post_playbooks:
591 post_status, post_code = self.runAnsiblePlaybook(
592 playbook, success)
593 if post_status != self.RESULT_NORMAL or post_code != 0:
594 result = 'POST_FAILURE'
James E. Blair412fba82017-01-26 15:00:50 -0800595 return result
James E. Blair17302972016-08-10 16:11:42 -0700596
James E. Blair82938472016-01-11 14:38:13 -0800597 def getHostList(self, args):
James E. Blairad8dca02017-02-21 11:48:32 -0500598 # TODO(clarkb): This prefers v4 because we're not sure if we
599 # expect v6 to work. If we can determine how to prefer v6
600 hosts = []
James E. Blair34776ee2016-08-25 13:53:54 -0700601 for node in args['nodes']:
James E. Blairad8dca02017-02-21 11:48:32 -0500602 ip = node.get('public_ipv4')
603 if not ip:
604 ip = node.get('public_ipv6')
605 hosts.append((node['name'], dict(ansible_host=ip)))
James E. Blair34776ee2016-08-25 13:53:54 -0700606 return hosts
James E. Blair82938472016-01-11 14:38:13 -0800607
James E. Blair5ac93842017-01-20 06:47:34 -0800608 def _blockPluginDirs(self, path):
609 '''Prevent execution of playbooks or roles with plugins
Monty Taylorc231d932017-02-03 09:57:15 -0600610
James E. Blair5ac93842017-01-20 06:47:34 -0800611 Plugins are loaded from roles and also if there is a plugin
612 dir adjacent to the playbook. Throw an error if the path
613 contains a location that would cause a plugin to get loaded.
614
Monty Taylorc231d932017-02-03 09:57:15 -0600615 '''
James E. Blair5ac93842017-01-20 06:47:34 -0800616 for entry in os.listdir(path):
Monty Taylorc231d932017-02-03 09:57:15 -0600617 if os.path.isdir(entry) and entry.endswith('_plugins'):
618 raise Exception(
619 "Ansible plugin dir %s found adjacent to playbook %s in"
James E. Blair5ac93842017-01-20 06:47:34 -0800620 " non-secure repo." % (entry, path))
Monty Taylorc231d932017-02-03 09:57:15 -0600621
622 def findPlaybook(self, path, required=False, secure=False):
James E. Blaird130f712017-01-25 14:56:10 -0800623 for ext in ['.yaml', '.yml']:
624 fn = path + ext
625 if os.path.exists(fn):
Monty Taylorc231d932017-02-03 09:57:15 -0600626 if not secure:
James E. Blair5ac93842017-01-20 06:47:34 -0800627 playbook_dir = os.path.dirname(os.path.abspath(fn))
628 self._blockPluginDirs(playbook_dir)
James E. Blaird130f712017-01-25 14:56:10 -0800629 return fn
James E. Blaira7f51ca2017-02-07 16:01:26 -0800630 if required:
631 raise Exception("Unable to find playbook %s" % path)
632 return None
James E. Blaird130f712017-01-25 14:56:10 -0800633
James E. Blair66b274e2017-01-31 14:47:52 -0800634 def preparePlaybookRepos(self, args):
635 for playbook in args['pre_playbooks']:
636 jobdir_playbook = self.jobdir.addPrePlaybook()
James E. Blaira7f51ca2017-02-07 16:01:26 -0800637 self.preparePlaybookRepo(jobdir_playbook, playbook,
James E. Blair6541c1c2017-02-15 16:14:56 -0800638 args, required=True)
James E. Blair66b274e2017-01-31 14:47:52 -0800639
James E. Blaira7f51ca2017-02-07 16:01:26 -0800640 for playbook in args['playbooks']:
641 jobdir_playbook = self.jobdir.addPlaybook()
642 self.preparePlaybookRepo(jobdir_playbook, playbook,
James E. Blair6541c1c2017-02-15 16:14:56 -0800643 args, required=False)
James E. Blaira7f51ca2017-02-07 16:01:26 -0800644 if jobdir_playbook.path is not None:
645 self.jobdir.playbook = jobdir_playbook
646 break
647 if self.jobdir.playbook is None:
648 raise Exception("No valid playbook found")
James E. Blair66b274e2017-01-31 14:47:52 -0800649
650 for playbook in args['post_playbooks']:
651 jobdir_playbook = self.jobdir.addPostPlaybook()
James E. Blaira7f51ca2017-02-07 16:01:26 -0800652 self.preparePlaybookRepo(jobdir_playbook, playbook,
James E. Blair6541c1c2017-02-15 16:14:56 -0800653 args, required=True)
James E. Blair66b274e2017-01-31 14:47:52 -0800654
James E. Blair6541c1c2017-02-15 16:14:56 -0800655 def preparePlaybookRepo(self, jobdir_playbook, playbook, args, required):
James E. Blaira7f51ca2017-02-07 16:01:26 -0800656 self.log.debug("Prepare playbook repo for %s" % (playbook,))
James E. Blair66b274e2017-01-31 14:47:52 -0800657 # Check out the playbook repo if needed and set the path to
James E. Blairc73c73a2017-01-20 15:15:15 -0800658 # the playbook that should be run.
James E. Blair66b274e2017-01-31 14:47:52 -0800659 jobdir_playbook.secure = playbook['secure']
Joshua Hesketh50c21782016-10-13 21:34:14 +1100660 source = self.launcher_server.connections.getSource(
661 playbook['connection'])
James E. Blairc73c73a2017-01-20 15:15:15 -0800662 project = source.getProject(playbook['project'])
663 # TODO(jeblair): construct the url in the merger itself
664 url = source.getGitUrl(project)
James E. Blair66b274e2017-01-31 14:47:52 -0800665 if not playbook['secure']:
James E. Blairc73c73a2017-01-20 15:15:15 -0800666 # This is a project repo, so it is safe to use the already
667 # checked out version (from speculative merging) of the
668 # playbook
669 for i in args['items']:
670 if (i['connection_name'] == playbook['connection'] and
671 i['project'] == playbook['project']):
672 # We already have this repo prepared
James E. Blair412fba82017-01-26 15:00:50 -0800673 path = os.path.join(self.jobdir.git_root,
James E. Blairc73c73a2017-01-20 15:15:15 -0800674 project.name,
675 playbook['path'])
Monty Taylorc231d932017-02-03 09:57:15 -0600676 jobdir_playbook.path = self.findPlaybook(
677 path,
James E. Blair6541c1c2017-02-15 16:14:56 -0800678 required=required,
Monty Taylorc231d932017-02-03 09:57:15 -0600679 secure=playbook['secure'])
James E. Blair66b274e2017-01-31 14:47:52 -0800680 return
James E. Blairc73c73a2017-01-20 15:15:15 -0800681 # The playbook repo is either a config repo, or it isn't in
682 # the stack of changes we are testing, so check out the branch
683 # tip into a dedicated space.
684
James E. Blair66b274e2017-01-31 14:47:52 -0800685 merger = self.launcher_server._getMerger(jobdir_playbook.root)
James E. Blairc73c73a2017-01-20 15:15:15 -0800686 merger.checkoutBranch(project.name, url, playbook['branch'])
687
James E. Blair66b274e2017-01-31 14:47:52 -0800688 path = os.path.join(jobdir_playbook.root,
James E. Blairc73c73a2017-01-20 15:15:15 -0800689 project.name,
690 playbook['path'])
Monty Taylorc231d932017-02-03 09:57:15 -0600691 jobdir_playbook.path = self.findPlaybook(
692 path,
James E. Blair6541c1c2017-02-15 16:14:56 -0800693 required=required,
Monty Taylorc231d932017-02-03 09:57:15 -0600694 secure=playbook['secure'])
James E. Blairc73c73a2017-01-20 15:15:15 -0800695
James E. Blair5ac93842017-01-20 06:47:34 -0800696 def prepareRoles(self, args):
697 for role in args['roles']:
698 if role['type'] == 'zuul':
699 root = self.jobdir.addRole()
700 self.prepareZuulRole(args, role, root)
701
702 def findRole(self, path, secure=False):
703 d = os.path.join(path, 'tasks')
704 if os.path.isdir(d):
705 # This is a bare role
706 if not secure:
707 self._blockPluginDirs(path)
708 # None signifies that the repo is a bare role
709 return None
710 d = os.path.join(path, 'roles')
711 if os.path.isdir(d):
712 # This repo has a collection of roles
713 if not secure:
714 for entry in os.listdir(d):
715 self._blockPluginDirs(os.path.join(d, entry))
716 return d
717 # We assume the repository itself is a collection of roles
718 if not secure:
719 for entry in os.listdir(path):
720 self._blockPluginDirs(os.path.join(path, entry))
721 return path
722
723 def prepareZuulRole(self, args, role, root):
724 self.log.debug("Prepare zuul role for %s" % (role,))
725 # Check out the role repo if needed
726 source = self.launcher_server.connections.getSource(
727 role['connection'])
728 project = source.getProject(role['project'])
729 # TODO(jeblair): construct the url in the merger itself
730 url = source.getGitUrl(project)
731 role_repo = None
732 if not role['secure']:
733 # This is a project repo, so it is safe to use the already
734 # checked out version (from speculative merging) of the
735 # role
736
737 for i in args['items']:
738 if (i['connection_name'] == role['connection'] and
739 i['project'] == role['project']):
740 # We already have this repo prepared;
741 # copy it into location.
742
743 path = os.path.join(self.jobdir.git_root,
744 project.name)
745 link = os.path.join(root, role['name'])
746 os.symlink(path, link)
747 role_repo = link
748 break
749
750 # The role repo is either a config repo, or it isn't in
751 # the stack of changes we are testing, so check out the branch
752 # tip into a dedicated space.
753
754 if not role_repo:
755 merger = self.launcher_server._getMerger(root)
756 merger.checkoutBranch(project.name, url, 'master')
757 role_repo = os.path.join(root, project.name)
758
759 role_path = self.findRole(role_repo, secure=role['secure'])
760 if role_path is None:
761 # In the case of a bare role, add the containing directory
762 role_path = root
763 self.jobdir.roles_path.append(role_path)
764
def prepareAnsibleFiles(self, args):
    """Write the inventory, job variables and Ansible config files.

    * ``self.jobdir.inventory``: one INI inventory line per host from
      ``self.getHostList(args)``, with its host variables appended as
      space-separated ``key=value`` pairs.  For hosts that have an
      ``ansible_host`` variable, ``ssh-keyscan`` output is appended to
      ``self.jobdir.known_hosts``.
    * ``self.jobdir.vars``: ``args['zuul']`` dumped as YAML under the
      top-level key ``zuul``.
    * ``self.jobdir.config`` and ``self.jobdir.secure_config``: written
      by writeAnsibleConfig (the latter with ``secure=True``).
    """
    with open(self.jobdir.inventory, 'w') as inventory:
        for host_name, host_vars in self.getHostList(args):
            # Each variable must be separated by whitespace; writing
            # them back-to-back would merge them into one bogus value.
            fields = [host_name]
            fields.extend('%s=%s' % (k, v) for k, v in host_vars.items())
            inventory.write(' '.join(fields) + '\n')
            if 'ansible_host' in host_vars:
                # Best-effort host key collection.  Run without a shell
                # so the address is never parsed as shell syntax.
                with open(self.jobdir.known_hosts, 'a') as known_hosts:
                    try:
                        subprocess.call(
                            ['ssh-keyscan', str(host_vars['ansible_host'])],
                            stdout=known_hosts)
                    except OSError:
                        self.log.exception("Unable to run ssh-keyscan "
                                           "for %s" % (host_name,))

    with open(self.jobdir.vars, 'w') as vars_yaml:
        zuul_vars = dict(zuul=args['zuul'])
        vars_yaml.write(
            yaml.safe_dump(zuul_vars, default_flow_style=False))
    self.writeAnsibleConfig(self.jobdir.config)
    self.writeAnsibleConfig(self.jobdir.secure_config, secure=True)
784
def writeAnsibleConfig(self, config_path, secure=False):
    """Write an ansible.cfg for this job to *config_path*.

    :param config_path: Path of the config file to create (any existing
        file is overwritten).
    :param secure: When True the config is for a secure playbook: the
        launcher's action plugin directory is not configured and task
        arguments are not echoed to stdout.
    """
    with open(config_path, 'w') as config:
        config.write('[defaults]\n')
        # 'inventory' is the current name of this setting; 'hostfile'
        # is deprecated and was removed in Ansible 2.8.
        config.write('inventory = %s\n' % self.jobdir.inventory)
        config.write('local_tmp = %s/.ansible/local_tmp\n' %
                     self.jobdir.root)
        config.write('remote_tmp = %s/.ansible/remote_tmp\n' %
                     self.jobdir.root)
        config.write('private_key_file = %s\n' % self.private_key_file)
        config.write('retry_files_enabled = False\n')
        config.write('log_path = %s\n' % self.jobdir.ansible_log)
        config.write('gathering = explicit\n')
        config.write('library = %s\n'
                     % self.launcher_server.library_dir)
        if self.jobdir.roles_path:
            config.write('roles_path = %s\n' %
                         ':'.join(self.jobdir.roles_path))
        # bump the timeout because busy nodes may take more than
        # 10s to respond
        config.write('timeout = 30\n')
        if not secure:
            config.write('action_plugins = %s\n'
                         % self.launcher_server.action_dir)

        # On secure jobs, we want to prevent the printing of args,
        # since secure jobs might have access to secrets that they may
        # need to pass to a task or a role. On the other hand, there
        # should be no sensitive data in insecure jobs, and printing
        # the args could be useful for debugging.
        config.write('display_args_to_stdout = %s\n' %
                     str(not secure))

        config.write('[ssh_connection]\n')
        # NB: when setting pipelining = True, keep_remote_files
        # must be False (the default). Otherwise it apparently
        # will override the pipelining option and effectively
        # disable it. Pipelining has a side effect of running the
        # command without a tty (ie, without the -tt argument to
        # ssh). We require this behavior so that if a job runs a
        # command which expects interactive input on a tty (such
        # as sudo) it does not hang.
        config.write('pipelining = True\n')
        ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
            "-o UserKnownHostsFile=%s" % self.jobdir.known_hosts
        config.write('ssh_args = %s\n' % ssh_args)
830
def _ansibleTimeout(self, msg):
    """Watchdog callback for an overrunning ansible run.

    Logs *msg* as a warning and then kills the running ansible process
    (if any) via abortRunningProc.
    """
    self.log.warning(msg)
    self.abortRunningProc()
James E. Blair414cb672016-10-05 13:48:14 -0700834
James E. Blaircaa83ad2017-01-27 08:58:07 -0800835 def abortRunningProc(self):
836 with self.proc_lock:
837 if not self.proc:
838 self.log.debug("Abort: no process is running")
839 return
840 self.log.debug("Abort: sending kill signal to job "
841 "process group")
842 try:
843 pgid = os.getpgid(self.proc.pid)
844 os.killpg(pgid, signal.SIGKILL)
845 except Exception:
846 self.log.exception("Exception while killing "
847 "ansible process:")
James E. Blair82938472016-01-11 14:38:13 -0800848
def runAnsible(self, cmd, timeout, secure=False):
    """Run an ansible command and wait for it to finish.

    The process is started under ``self.proc_lock`` and published as
    ``self.proc`` so that abortRunningProc can kill it.  Its combined
    stdout/stderr is logged line by line at debug level.

    :param cmd: Command line (argv list) to execute.
    :param timeout: Expected run time in seconds.  The watchdog is armed
        for ``timeout + ANSIBLE_WATCHDOG_GRACE`` seconds and calls
        _ansibleTimeout when it fires.
    :param secure: If True, run from ``jobdir.secure_ansible_root``
        instead of ``jobdir.ansible_root``.
    :returns: A ``(result, code)`` tuple.  ``result`` is one of
        RESULT_ABORTED, RESULT_TIMED_OUT, RESULT_UNREACHABLE or
        RESULT_NORMAL.  ``code`` is the exit status for RESULT_NORMAL
        and None otherwise.
    """
    env_copy = os.environ.copy()
    env_copy['LOGNAME'] = 'zuul'

    if secure:
        cwd = self.jobdir.secure_ansible_root
    else:
        cwd = self.jobdir.ansible_root

    with self.proc_lock:
        # An abort may have been requested before we got here.
        if self.aborted:
            return (self.RESULT_ABORTED, None)
        self.log.debug("Ansible command: %s" % (cmd,))
        self.proc = subprocess.Popen(
            cmd,
            cwd=cwd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            # Start a new session so abortRunningProc can kill the
            # whole process group.
            preexec_fn=os.setsid,
            env=env_copy,
        )

    ret = None
    watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
                        self._ansibleTimeout,
                        ("Ansible timeout exceeded",))
    watchdog.start()
    try:
        for line in iter(self.proc.stdout.readline, b''):
            # The pipe yields bytes.  Decode so the log shows text
            # instead of a b'...' repr.  Truncation happens first, and
            # 'replace' copes with a multibyte character cut in half.
            line = line[:1024].rstrip().decode('utf-8', 'replace')
            self.log.debug("Ansible output: %s", line)
        ret = self.proc.wait()
    finally:
        watchdog.stop()
    self.log.debug("Ansible exit code: %s" % (ret,))

    with self.proc_lock:
        self.proc = None

    if watchdog.timed_out:
        return (self.RESULT_TIMED_OUT, None)
    if ret == 3:
        # AnsibleHostUnreachable: We had a network issue connecting to
        # our zuul-worker.
        return (self.RESULT_UNREACHABLE, None)
    elif ret == -9:
        # Killed by SIGKILL (negative returncode = signal number), i.e.
        # abortRunningProc handled an abort request.
        return (self.RESULT_ABORTED, None)

    return (self.RESULT_NORMAL, ret)
899
def runAnsiblePlaybook(self, playbook, success=None):
    """Run a single playbook with ``ansible-playbook``.

    :param playbook: Object with a ``path`` (the playbook file) and a
        ``secure`` flag (passed through to runAnsible).
    :param success: If not None, passed to the playbook as the extra
        variable ``success`` (rendered as ``True``/``False``).
    :returns: The ``(result, code)`` tuple returned by runAnsible.
    """
    # runAnsible builds the subprocess environment itself.
    # TODOv3: use '-vvv' when self.options['verbose'] is set.
    verbose = '-v'

    cmd = ['ansible-playbook', playbook.path]
    if success is not None:
        cmd.extend(['-e', 'success=%s' % str(bool(success))])
    cmd.extend(['-e@%s' % self.jobdir.vars, verbose])

    # TODOv3: get this from the job
    timeout = 60

    return self.runAnsible(
        cmd=cmd, timeout=timeout, secure=playbook.secure)