blob: 739ffc594bef015c482233f2535fec769dd834c6 [file] [log] [blame]
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
14
15import collections
16import json
17import logging
James E. Blair82938472016-01-11 14:38:13 -080018import os
James E. Blairf5dbd002015-12-23 15:26:17 -080019import shutil
James E. Blair414cb672016-10-05 13:48:14 -070020import signal
James E. Blair17302972016-08-10 16:11:42 -070021import socket
James E. Blair82938472016-01-11 14:38:13 -080022import subprocess
James E. Blairf5dbd002015-12-23 15:26:17 -080023import tempfile
24import threading
James E. Blair414cb672016-10-05 13:48:14 -070025import time
James E. Blairf5dbd002015-12-23 15:26:17 -080026import traceback
James E. Blaira92cbc82017-01-23 14:56:49 -080027import yaml
James E. Blairf5dbd002015-12-23 15:26:17 -080028
29import gear
30
31import zuul.merger
James E. Blair414cb672016-10-05 13:48:14 -070032import zuul.ansible.library
James E. Blair414cb672016-10-05 13:48:14 -070033from zuul.lib import commandsocket
James E. Blairf5dbd002015-12-23 15:26:17 -080034
# Extra seconds added to a playbook's timeout before the watchdog started by
# AnsibleJob.runAnsible kills the Ansible process group.
ANSIBLE_WATCHDOG_GRACE = 5 * 60
36
37
class Watchdog(object):
    """Invoke a callback if stop() is not called before a deadline.

    A daemon thread polls the deadline in 10 second sleeps, so both the
    expiry and a stop() request may be noticed up to about 10 seconds late.

    :param timeout: seconds after start() at which the watchdog fires.
    :param function: callable invoked when the deadline passes.
    :param args: positional arguments passed to ``function``.

    After the thread finishes, ``timed_out`` is True if the deadline passed
    and the callback was invoked, or False if stop() was called first.  It
    is None while the outcome is still unknown.
    """

    def __init__(self, timeout, function, args):
        self.timeout = timeout
        self.function = function
        self.args = args
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True
        self.timed_out = None
        # Both are given their real values by start(); defined here so the
        # attributes always exist.
        self._running = False
        self.end = None

    def _run(self):
        """Thread body: wait for the deadline or for stop()."""
        while self._running and time.time() < self.end:
            time.sleep(10)
        if self._running:
            # The deadline passed while we were still armed.
            self.timed_out = True
            self.function(*self.args)
        else:
            # Only report "no timeout" when we were stopped; resetting the
            # flag unconditionally would hide every real timeout.
            self.timed_out = False

    def start(self):
        """Arm the watchdog and start its thread."""
        self._running = True
        self.end = time.time() + self.timeout
        self.thread.start()

    def stop(self):
        """Disarm the watchdog; the thread exits at its next wake-up."""
        self._running = False
James E. Blairf5dbd002015-12-23 15:26:17 -080062
# TODOv3(mordred): put git repos in a hierarchy that includes source
# hostname, eg: git.openstack.org/openstack/nova.  Also, configure
# sources to have an alias, so that the review.openstack.org source
# repos end up in git.openstack.org.
67
James E. Blair414cb672016-10-05 13:48:14 -070068
class JobDirPlaybook(object):
    """Location record for one playbook inside a job directory.

    ``root`` is the directory given at construction.  ``secure`` and
    ``path`` start as None and are assigned by the code that prepares the
    playbook.
    """

    def __init__(self, root):
        self.root = root
        # Not known yet at construction time.
        self.secure = self.path = None
74
75
class JobDir(object):
    """Temporary directory tree holding everything one job needs.

    The tree is created under a fresh ``tempfile.mkdtemp()`` root with a
    ``git`` directory and an ``ansible`` directory; the latter holds the
    inventory, vars, config, log and playbook locations exposed as
    attributes.  Usable as a context manager: leaving the block calls
    cleanup().

    :param keep: when true, cleanup() leaves the tree on disk.
    """

    def __init__(self, keep=False):
        self.keep = keep
        self.root = tempfile.mkdtemp()

        self.git_root = os.path.join(self.root, 'git')
        self.ansible_root = os.path.join(self.root, 'ansible')
        self.playbook_root = os.path.join(self.ansible_root, 'playbook')
        for directory in (self.git_root, self.ansible_root,
                          self.playbook_root):
            os.makedirs(directory)

        # Files that live directly in the ansible directory.
        in_ansible = lambda name: os.path.join(self.ansible_root, name)  # noqa
        self.known_hosts = in_ansible('known_hosts')
        self.inventory = in_ansible('inventory')
        self.vars = in_ansible('vars.yaml')
        self.config = in_ansible('ansible.cfg')
        self.ansible_log = in_ansible('ansible_log.txt')

        self.playbook = JobDirPlaybook(self.playbook_root)
        self.pre_playbooks = []
        self.post_playbooks = []

    def _newPlaybook(self, playbooks, name_format):
        # Create the next numbered playbook directory and record it in the
        # given list.
        root = os.path.join(self.ansible_root,
                            name_format % (len(playbooks),))
        os.makedirs(root)
        entry = JobDirPlaybook(root)
        playbooks.append(entry)
        return entry

    def addPrePlaybook(self):
        """Create, register and return the next pre-run playbook entry."""
        return self._newPlaybook(self.pre_playbooks, 'pre_playbook_%i')

    def addPostPlaybook(self):
        """Create, register and return the next post-run playbook entry."""
        return self._newPlaybook(self.post_playbooks, 'post_playbook_%i')

    def cleanup(self):
        """Remove the whole tree unless ``keep`` was requested."""
        if self.keep:
            return
        shutil.rmtree(self.root)

    def __enter__(self):
        return self

    def __exit__(self, etype, value, tb):
        self.cleanup()
James E. Blairf5dbd002015-12-23 15:26:17 -0800120
121
class UpdateTask(object):
    """A request to update one project's repository.

    Two tasks compare equal when they name the same project, regardless of
    URL, which is what lets a queue treat them as duplicates.  ``event`` is
    set by setComplete() and awaited by wait().

    :param project: project identifier used for equality.
    :param url: URL the repository is updated from.
    """

    def __init__(self, project, url):
        self.project = project
        self.url = url
        self.event = threading.Event()

    def __eq__(self, other):
        # Comparing against an unrelated object must not blow up with
        # AttributeError; let Python fall back to its default handling.
        if not isinstance(other, UpdateTask):
            return NotImplemented
        return other.project == self.project

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so keep the two consistent.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def wait(self):
        """Block until setComplete() has been called."""
        self.event.wait()

    def setComplete(self):
        """Mark the task finished and release every waiter."""
        self.event.set()
138
139
class DeduplicateQueue(object):
    """FIFO queue that refuses to hold two items that compare equal.

    All access to the underlying deque is guarded by one condition
    variable, which get() also uses to sleep until an item arrives.
    """

    def __init__(self):
        self.queue = collections.deque()
        self.condition = threading.Condition()

    def qsize(self):
        """Return the number of items currently queued."""
        return len(self.queue)

    def put(self, item):
        """Enqueue ``item`` unless an equal item is already waiting.

        Returns ``item`` itself when it was added, otherwise the equal item
        that was already in the queue.
        """
        with self.condition:
            existing = None
            for queued in self.queue:
                if item == queued:
                    existing = queued
            if existing is not None:
                return existing
            self.queue.append(item)
            self.condition.notify()
            return item

    def get(self):
        """Remove and return the oldest item, blocking while empty."""
        with self.condition:
            while not self.queue:
                self.condition.wait()
            return self.queue.popleft()
177
178
class LaunchServer(object):
    """Gearman worker that launches Ansible jobs and serves merger requests.

    The server registers the ``launcher:launch``,
    ``launcher:stop:<hostname>``, ``merger:merge`` and ``merger:cat``
    Gearman functions, keeps repositories up to date on a dedicated update
    thread, and listens on a command socket for administrative commands.

    :param config: config-parser style object (``get`` / ``has_option``).
    :param connections: connection registry handed to mergers and used by
        jobs to look up sources; defaults to a new empty dict.
    :param keep_jobdir: stored as ``keep_jobdir`` for use by jobs.
    """

    log = logging.getLogger("zuul.LaunchServer")

    def __init__(self, config, connections=None, keep_jobdir=False):
        # A literal {} default would be one object shared by every
        # instance; build a fresh one per server instead.
        if connections is None:
            connections = {}
        self.config = config
        self.keep_jobdir = keep_jobdir
        # TODOv3(mordred): make the launcher name more unique --
        # perhaps hostname+pid.
        self.hostname = socket.gethostname()
        self.zuul_url = config.get('merger', 'zuul_url')
        # Maps command-socket command names to their handlers.
        self.command_map = dict(
            stop=self.stop,
            pause=self.pause,
            unpause=self.unpause,
            graceful=self.graceful,
            verbose=self.verboseOn,
            unverbose=self.verboseOff,
        )

        if self.config.has_option('merger', 'git_dir'):
            self.merge_root = self.config.get('merger', 'git_dir')
        else:
            self.merge_root = '/var/lib/zuul/git'

        if self.config.has_option('merger', 'git_user_email'):
            self.merge_email = self.config.get('merger', 'git_user_email')
        else:
            self.merge_email = None

        if self.config.has_option('merger', 'git_user_name'):
            self.merge_name = self.config.get('merger', 'git_user_name')
        else:
            self.merge_name = None

        self.connections = connections
        self.merger = self._getMerger(self.merge_root)
        self.update_queue = DeduplicateQueue()

        if self.config.has_option('zuul', 'state_dir'):
            state_dir = os.path.expanduser(
                self.config.get('zuul', 'state_dir'))
        else:
            state_dir = '/var/lib/zuul'
        path = os.path.join(state_dir, 'launcher.socket')
        self.command_socket = commandsocket.CommandSocket(path)
        ansible_dir = os.path.join(state_dir, 'ansible')
        self.library_dir = os.path.join(ansible_dir, 'library')
        if not os.path.exists(self.library_dir):
            os.makedirs(self.library_dir)

        # Copy the bundled Ansible library into the state directory so the
        # generated ansible.cfg can point at it.
        library_path = os.path.dirname(os.path.abspath(
            zuul.ansible.library.__file__))
        for fn in os.listdir(library_path):
            shutil.copy(os.path.join(library_path, fn), self.library_dir)

        # Running AnsibleJob instances keyed by Gearman job unique id.
        self.job_workers = {}

    def _getMerger(self, root):
        """Return a Merger working in ``root`` with this server's identity."""
        return zuul.merger.merger.Merger(root, self.connections,
                                         self.merge_email, self.merge_name)

    def start(self):
        """Connect to Gearman, register functions and start all threads."""
        self._running = True
        self._command_running = True
        server = self.config.get('gearman', 'server')
        if self.config.has_option('gearman', 'port'):
            port = self.config.get('gearman', 'port')
        else:
            port = 4730
        self.worker = gear.Worker('Zuul Launch Server')
        self.worker.addServer(server, port)
        self.log.debug("Waiting for server")
        self.worker.waitForServer()
        self.log.debug("Registering")
        self.register()

        self.log.debug("Starting command processor")
        self.command_socket.start()
        self.command_thread = threading.Thread(target=self.runCommand)
        self.command_thread.daemon = True
        self.command_thread.start()

        self.log.debug("Starting worker")
        self.update_thread = threading.Thread(target=self._updateLoop)
        self.update_thread.daemon = True
        self.update_thread.start()
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        self.thread.start()

    def register(self):
        """Register every Gearman function this server handles."""
        self.worker.registerFunction("launcher:launch")
        self.worker.registerFunction("launcher:stop:%s" % self.hostname)
        self.worker.registerFunction("merger:merge")
        self.worker.registerFunction("merger:cat")

    def stop(self):
        """Stop the job listener and the command processor."""
        self.log.debug("Stopping")
        self._running = False
        self.worker.shutdown()
        self._command_running = False
        self.command_socket.stop()
        self.log.debug("Stopped")

    def pause(self):
        # TODOv3: implement
        pass

    def unpause(self):
        # TODOv3: implement
        pass

    def graceful(self):
        # TODOv3: implement
        pass

    def verboseOn(self):
        # TODOv3: implement
        pass

    def verboseOff(self):
        # TODOv3: implement
        pass

    def join(self):
        """Wait for the update thread and the job listener thread to exit."""
        self.update_thread.join()
        self.thread.join()

    def runCommand(self):
        """Command thread body: dispatch commands read from the socket."""
        while self._command_running:
            try:
                command = self.command_socket.get()
                if command != '_stop':
                    self.command_map[command]()
            except Exception:
                self.log.exception("Exception while processing command")

    def _updateLoop(self):
        """Update thread body: process update tasks until stopped."""
        while self._running:
            try:
                self._innerUpdateLoop()
            except Exception:
                self.log.exception("Exception in update thread:")

    def _innerUpdateLoop(self):
        """Take one task off the update queue and update its repository.

        The task is always marked complete, even when the update raises, so
        that threads blocked in ``task.wait()`` are never left hanging.  The
        exception itself still propagates to the caller.
        """
        # Inside of a loop that keeps the main repository up to date
        task = self.update_queue.get()
        try:
            self.log.info("Updating repo %s from %s" %
                          (task.project, task.url))
            self.merger.updateRepo(task.project, task.url)
            self.log.debug("Finished updating repo %s from %s" %
                           (task.project, task.url))
        finally:
            task.setComplete()

    def update(self, project, url):
        """Queue a repository update and return the task to wait on.

        The returned task may be an equal one that was already queued.
        """
        task = UpdateTask(project, url)
        task = self.update_queue.put(task)
        return task

    def run(self):
        """Job listener thread body: fetch Gearman jobs and dispatch them."""
        self.log.debug("Starting launch listener")
        while self._running:
            try:
                job = self.worker.getJob()
                try:
                    if job.name == 'launcher:launch':
                        self.log.debug("Got launch job: %s" % job.unique)
                        self.launchJob(job)
                    elif job.name.startswith('launcher:stop'):
                        self.log.debug("Got stop job: %s" % job.unique)
                        self.stopJob(job)
                    elif job.name == 'merger:cat':
                        self.log.debug("Got cat job: %s" % job.unique)
                        self.cat(job)
                    elif job.name == 'merger:merge':
                        self.log.debug("Got merge job: %s" % job.unique)
                        self.merge(job)
                    else:
                        self.log.error("Unable to handle job %s" % job.name)
                        job.sendWorkFail()
                except Exception:
                    self.log.exception("Exception while running job")
                    job.sendWorkException(traceback.format_exc())
            except Exception:
                self.log.exception("Exception while getting job")

    def launchJob(self, job):
        """Create an AnsibleJob for ``job``, record it and start it."""
        self.job_workers[job.unique] = AnsibleJob(self, job)
        self.job_workers[job.unique].run()

    def finishJob(self, unique):
        """Forget the worker registered under ``unique``."""
        del(self.job_workers[unique])

    def stopJob(self, job):
        """Handle a stop request for the job named by ``args['uuid']``.

        The Gearman stop job is always completed, whether or not a matching
        worker was found or stopping it raised.
        """
        try:
            args = json.loads(job.arguments)
            self.log.debug("Stop job with arguments: %s" % (args,))
            unique = args['uuid']
            job_worker = self.job_workers.get(unique)
            if not job_worker:
                self.log.debug("Unable to find worker for job %s" % (unique,))
                return
            try:
                job_worker.stop()
            except Exception:
                self.log.exception("Exception sending stop command "
                                   "to worker:")
        finally:
            job.sendWorkComplete()

    def cat(self, job):
        """Update a repository, then return the requested files from it."""
        args = json.loads(job.arguments)
        task = self.update(args['project'], args['url'])
        task.wait()
        files = self.merger.getFiles(args['project'], args['url'],
                                     args['branch'], args['files'])
        result = dict(updated=True,
                      files=files,
                      zuul_url=self.zuul_url)
        job.sendWorkComplete(json.dumps(result))

    def merge(self, job):
        """Merge the requested items and report the outcome.

        The result always carries ``merged``, ``zuul_url`` and ``commit``;
        ``files`` is added only when files were requested and the merge
        produced a result.
        """
        args = json.loads(job.arguments)
        ret = self.merger.mergeChanges(args['items'], args.get('files'))
        result = dict(merged=(ret is not None),
                      zuul_url=self.zuul_url)
        if ret is not None and args.get('files'):
            result['commit'], result['files'] = ret
        else:
            # Either no files were requested, or the merge failed and there
            # is nothing to unpack (ret is None).
            result['commit'] = ret
        job.sendWorkComplete(json.dumps(result))
409
410
class AnsibleJob(object):
    """Runs one Gearman launch job by driving ``ansible-playbook``.

    The job runs on its own thread: it prepares a JobDir, updates and
    merges the git repos, writes the Ansible inventory/vars/config, runs
    the pre, main and post playbooks, and reports the result back over
    Gearman.

    :param launcher_server: owning server; provides config, connections,
        mergers, the update queue and ``finishJob``.
    :param job: the Gearman job being executed.
    """

    log = logging.getLogger("zuul.AnsibleJob")

    # Status half of the (status, exit code) pairs returned by runAnsible.
    RESULT_NORMAL = 1
    RESULT_TIMED_OUT = 2
    RESULT_UNREACHABLE = 3
    RESULT_ABORTED = 4

    def __init__(self, launcher_server, job):
        self.launcher_server = launcher_server
        self.job = job
        self.jobdir = None
        # The currently running ansible-playbook process, if any;
        # proc_lock guards every read and write of it.
        self.proc = None
        self.proc_lock = threading.Lock()
        self.running = False
        self.aborted = False

        if self.launcher_server.config.has_option(
                'launcher', 'private_key_file'):
            self.private_key_file = self.launcher_server.config.get(
                'launcher', 'private_key_file')
        else:
            self.private_key_file = '~/.ssh/id_rsa'

    def run(self):
        """Start executing the job on a new thread."""
        self.running = True
        self.thread = threading.Thread(target=self.launch)
        self.thread.start()

    def stop(self):
        """Abort the job and wait for its thread to finish."""
        self.aborted = True
        self.abortRunningProc()
        self.thread.join()

    def launch(self):
        """Thread body: run the job, then clean up and deregister.

        Any exception is reported to Gearman as a work exception.
        """
        try:
            # Honor the server's keep_jobdir setting; it was previously
            # stored on the server but never passed along.
            self.jobdir = JobDir(
                keep=getattr(self.launcher_server, 'keep_jobdir', False))
            self._launch()
        except Exception:
            self.log.exception("Exception while launching job")
            self.job.sendWorkException(traceback.format_exc())
        finally:
            self.running = False
            # jobdir is still None when creating it was what failed.
            if self.jobdir is not None:
                try:
                    self.jobdir.cleanup()
                except Exception:
                    self.log.exception("Error cleaning up jobdir:")
            try:
                self.launcher_server.finishJob(self.job.unique)
            except Exception:
                self.log.exception("Error finalizing job thread:")

    def _launch(self):
        """Prepare repos and Ansible files, run playbooks, send the result."""
        self.log.debug("Job %s: beginning" % (self.job.unique,))
        self.log.debug("Job %s: job root at %s" %
                       (self.job.unique, self.jobdir.root))
        args = json.loads(self.job.arguments)
        # Queue all repo updates first, then wait, so they can overlap.
        tasks = []
        for project in args['projects']:
            self.log.debug("Job %s: updating project %s" %
                           (self.job.unique, project['name']))
            tasks.append(self.launcher_server.update(
                project['name'], project['url']))
        for task in tasks:
            task.wait()

        self.log.debug("Job %s: git updates complete" % (self.job.unique,))
        merger = self.launcher_server._getMerger(self.jobdir.git_root)
        merge_items = [i for i in args['items'] if i.get('refspec')]
        if merge_items:
            commit = merger.mergeChanges(merge_items)  # noqa
        else:
            commit = args['items'][-1]['newrev']  # noqa

        # is the playbook in a repo that we have already prepared?
        self.preparePlaybookRepos(args)

        # TODOv3: Ansible the ansible thing here.
        self.prepareAnsibleFiles(args)

        data = {
            'manager': self.launcher_server.hostname,
            'url': 'https://server/job/{}/0/'.format(args['job']),
            'worker_name': 'My Worker',
        }

        # TODOv3:
        # 'name': self.name,
        # 'manager': self.launch_server.hostname,
        # 'worker_name': 'My Worker',
        # 'worker_hostname': 'localhost',
        # 'worker_ips': ['127.0.0.1', '192.168.1.1'],
        # 'worker_fqdn': 'zuul.example.org',
        # 'worker_program': 'FakeBuilder',
        # 'worker_version': 'v1.1',
        # 'worker_extra': {'something': 'else'}

        self.job.sendWorkData(json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        result = self.runPlaybooks()

        if result is None:
            self.job.sendWorkFail()
            return
        result = dict(result=result)
        self.job.sendWorkComplete(json.dumps(result))

    def runPlaybooks(self):
        """Run the pre, main and post playbooks.

        Returns one of 'SUCCESS', 'FAILURE', 'POST_FAILURE', 'TIMED_OUT' or
        'ABORTED', or None when the outcome is indeterminate.
        """
        result = None

        for playbook in self.jobdir.pre_playbooks:
            pre_status, pre_code = self.runAnsiblePlaybook(playbook)
            if pre_status != self.RESULT_NORMAL or pre_code != 0:
                # These should really never fail, so return None and have
                # zuul try again
                return result

        job_status, job_code = self.runAnsiblePlaybook(self.jobdir.playbook)
        if job_status == self.RESULT_TIMED_OUT:
            return 'TIMED_OUT'
        if job_status == self.RESULT_ABORTED:
            return 'ABORTED'
        if job_status != self.RESULT_NORMAL:
            # The result of the job is indeterminate.  Zuul will
            # run it again.
            return result

        success = (job_code == 0)
        if success:
            result = 'SUCCESS'
        else:
            result = 'FAILURE'

        # Every post playbook runs even after an earlier one failed.
        for playbook in self.jobdir.post_playbooks:
            post_status, post_code = self.runAnsiblePlaybook(
                playbook, success)
            if post_status != self.RESULT_NORMAL or post_code != 0:
                result = 'POST_FAILURE'
        return result

    def getHostList(self, args):
        """Return ``(host name, host vars dict)`` pairs for the inventory."""
        # TODOv3: the localhost addition is temporary so we have
        # something to exercise ansible.
        hosts = [('localhost', dict(ansible_connection='local'))]
        for node in args['nodes']:
            # TODOv3: the connection should almost certainly not be
            # local.
            hosts.append((node['name'], dict(ansible_connection='local')))
        return hosts

    def findPlaybook(self, path):
        """Return ``path`` plus the first of .yaml/.yml that exists.

        Raises Exception when neither file exists.
        """
        for ext in ['.yaml', '.yml']:
            fn = path + ext
            if os.path.exists(fn):
                return fn
        raise Exception("Unable to find playbook %s" % path)

    def preparePlaybookRepos(self, args):
        """Prepare the repo and path for every pre, main and post playbook."""
        for playbook in args['pre_playbooks']:
            jobdir_playbook = self.jobdir.addPrePlaybook()
            self.preparePlaybookRepo(jobdir_playbook, playbook, args)

        jobdir_playbook = self.jobdir.playbook
        self.preparePlaybookRepo(jobdir_playbook, args['playbook'], args)

        for playbook in args['post_playbooks']:
            jobdir_playbook = self.jobdir.addPostPlaybook()
            self.preparePlaybookRepo(jobdir_playbook, playbook, args)

    def preparePlaybookRepo(self, jobdir_playbook, playbook, args):
        """Set ``jobdir_playbook.path`` to the playbook file to run.

        A non-secure playbook whose project is among ``args['items']`` is
        used from the already prepared job git tree; otherwise the
        playbook's branch is checked out under ``jobdir_playbook.root``.
        """
        # Check out the playbook repo if needed and set the path to
        # the playbook that should be run.
        jobdir_playbook.secure = playbook['secure']
        source = self.launcher_server.connections.getSource(
            playbook['connection'])
        project = source.getProject(playbook['project'])
        # TODO(jeblair): construct the url in the merger itself
        url = source.getGitUrl(project)
        if not playbook['secure']:
            # This is a project repo, so it is safe to use the already
            # checked out version (from speculative merging) of the
            # playbook
            for i in args['items']:
                if (i['connection_name'] == playbook['connection'] and
                        i['project'] == playbook['project']):
                    # We already have this repo prepared
                    path = os.path.join(self.jobdir.git_root,
                                        project.name,
                                        playbook['path'])
                    jobdir_playbook.path = self.findPlaybook(path)
                    return
        # The playbook repo is either a config repo, or it isn't in
        # the stack of changes we are testing, so check out the branch
        # tip into a dedicated space.

        merger = self.launcher_server._getMerger(jobdir_playbook.root)
        merger.checkoutBranch(project.name, url, playbook['branch'])

        path = os.path.join(jobdir_playbook.root,
                            project.name,
                            playbook['path'])
        jobdir_playbook.path = self.findPlaybook(path)

    def prepareAnsibleFiles(self, args):
        """Write the inventory, vars file and ansible.cfg for this job."""
        self._writeInventory(args)
        self._writeVars(args)
        self._writeConfig()

    def _writeInventory(self, args):
        # Write one "host k=v k=v" line per host from getHostList().
        with open(self.jobdir.inventory, 'w') as inventory:
            for host_name, host_vars in self.getHostList(args):
                inventory.write(host_name)
                inventory.write(' ')
                # Separate the variables with spaces; writing them back to
                # back would fuse "a=1" and "b=2" into "a=1b=2".
                inventory.write(' '.join(
                    '%s=%s' % (k, v) for k, v in host_vars.items()))
                inventory.write('\n')

    def _writeVars(self, args):
        # Dump args['zuul'] under a top-level "zuul" key as YAML.
        with open(self.jobdir.vars, 'w') as vars_yaml:
            zuul_vars = dict(zuul=args['zuul'])
            vars_yaml.write(
                yaml.safe_dump(zuul_vars, default_flow_style=False))

    def _writeConfig(self):
        # Write the ansible.cfg that ansible-playbook picks up for this job.
        with open(self.jobdir.config, 'w') as config:
            config.write('[defaults]\n')
            config.write('hostfile = %s\n' % self.jobdir.inventory)
            config.write('local_tmp = %s/.ansible/local_tmp\n' %
                         self.jobdir.root)
            config.write('remote_tmp = %s/.ansible/remote_tmp\n' %
                         self.jobdir.root)
            config.write('private_key_file = %s\n' % self.private_key_file)
            config.write('retry_files_enabled = False\n')
            config.write('log_path = %s\n' % self.jobdir.ansible_log)
            config.write('gathering = explicit\n')
            config.write('library = %s\n'
                         % self.launcher_server.library_dir)
            # bump the timeout because busy nodes may take more than
            # 10s to respond
            config.write('timeout = 30\n')

            config.write('[ssh_connection]\n')
            # NB: when setting pipelining = True, keep_remote_files
            # must be False (the default).  Otherwise it apparently
            # will override the pipelining option and effectively
            # disable it.  Pipelining has a side effect of running the
            # command without a tty (ie, without the -tt argument to
            # ssh).  We require this behavior so that if a job runs a
            # command which expects interactive input on a tty (such
            # as sudo) it does not hang.
            config.write('pipelining = True\n')
            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
                "-o UserKnownHostsFile=%s" % self.jobdir.known_hosts
            config.write('ssh_args = %s\n' % ssh_args)

    def _ansibleTimeout(self, msg):
        """Watchdog callback: log ``msg`` and kill the running process."""
        self.log.warning(msg)
        self.abortRunningProc()

    def abortRunningProc(self):
        """SIGKILL the process group of the running Ansible process, if any."""
        with self.proc_lock:
            if not self.proc:
                self.log.debug("Abort: no process is running")
                return
            self.log.debug("Abort: sending kill signal to job "
                           "process group")
            try:
                pgid = os.getpgid(self.proc.pid)
                os.killpg(pgid, signal.SIGKILL)
            except Exception:
                self.log.exception("Exception while killing "
                                   "ansible process:")

    def runAnsible(self, cmd, timeout):
        """Run ``cmd`` in the job's ansible directory and classify the result.

        :param cmd: argument list for the subprocess.
        :param timeout: seconds allowed before the watchdog grace period.
        :returns: ``(status, exit code)`` where status is one of the
            RESULT_* constants; the exit code is None unless the status is
            RESULT_NORMAL.
        """
        env_copy = os.environ.copy()
        env_copy['LOGNAME'] = 'zuul'

        with self.proc_lock:
            # Checked under the lock so an abort cannot slip in between
            # the check and the process being recorded.
            if self.aborted:
                return (self.RESULT_ABORTED, None)
            self.log.debug("Ansible command: %s" % (cmd,))
            self.proc = subprocess.Popen(
                cmd,
                cwd=self.jobdir.ansible_root,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                # New session, so the whole process group can be killed.
                preexec_fn=os.setsid,
                env=env_copy,
            )

        ret = None
        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
                            self._ansibleTimeout,
                            ("Ansible timeout exceeded",))
        watchdog.start()
        try:
            for line in iter(self.proc.stdout.readline, b''):
                line = line[:1024].rstrip()
                self.log.debug("Ansible output: %s" % (line,))
            ret = self.proc.wait()
        finally:
            watchdog.stop()
        self.log.debug("Ansible exit code: %s" % (ret,))

        with self.proc_lock:
            self.proc = None

        if watchdog.timed_out:
            return (self.RESULT_TIMED_OUT, None)
        if ret == 3:
            # AnsibleHostUnreachable: We had a network issue connecting to
            # our zuul-worker.
            return (self.RESULT_UNREACHABLE, None)
        elif ret == -9:
            # Received abort request.
            return (self.RESULT_ABORTED, None)

        return (self.RESULT_NORMAL, ret)

    def runAnsiblePlaybook(self, playbook, success=None):
        """Run one playbook through ``ansible-playbook``.

        :param playbook: object whose ``path`` is the playbook file.
        :param success: when not None, passed to the playbook as the
            ``success`` extra variable ('True' or 'False').
        :returns: the ``(status, exit code)`` pair from runAnsible.
        """
        # TODOv3: use '-vvv' when self.options['verbose'] is set.
        verbose = '-v'

        cmd = ['ansible-playbook', playbook.path]

        if success is not None:
            cmd.extend(['-e', 'success=%s' % str(bool(success))])

        cmd.extend(['-e@%s' % self.jobdir.vars, verbose])

        # TODOv3: get this from the job
        timeout = 60

        return self.runAnsible(cmd, timeout)
741 return self.runAnsible(cmd, timeout)