Merge "Add nodepool request framework" into feature/zuulv3
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
deleted file mode 100644
index 95fc2fa..0000000
--- a/zuul/launcher/ansiblelaunchserver.py
+++ /dev/null
@@ -1,1384 +0,0 @@
-# Copyright 2014 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import json
-import logging
-import os
-import re
-import shutil
-import signal
-import socket
-import subprocess
-import tempfile
-import threading
-import time
-import traceback
-import Queue
-import uuid
-
-import gear
-import yaml
-import jenkins_jobs.builder
-import jenkins_jobs.formatter
-import zmq
-
-import zuul.ansible.library
-import zuul.ansible.plugins.callback_plugins
-from zuul.lib import commandsocket
-
-ANSIBLE_WATCHDOG_GRACE = 5 * 60
-ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
-ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
-
-
-COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful',
- 'verbose', 'unverbose']
-
-
-def boolify(x):
- if isinstance(x, str):
- return bool(int(x))
- return bool(x)
-
-
-class LaunchGearWorker(gear.Worker):
- def __init__(self, *args, **kw):
- self.__launch_server = kw.pop('launch_server')
- super(LaunchGearWorker, self).__init__(*args, **kw)
-
- def handleNoop(self, packet):
- workers = len(self.__launch_server.node_workers)
- delay = (workers ** 2) / 1000.0
- time.sleep(delay)
- return super(LaunchGearWorker, self).handleNoop(packet)
-
-
-class NodeGearWorker(gear.Worker):
- MASS_DO = 101
-
- def sendMassDo(self, functions):
- data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions])
- self.broadcast_lock.acquire()
- try:
- p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
- self.broadcast(p)
- finally:
- self.broadcast_lock.release()
-
-
-class Watchdog(object):
- def __init__(self, timeout, function, args):
- self.timeout = timeout
- self.function = function
- self.args = args
- self.thread = threading.Thread(target=self._run)
- self.thread.daemon = True
-
- def _run(self):
- while self._running and time.time() < self.end:
- time.sleep(10)
- if self._running:
- self.function(*self.args)
-
- def start(self):
- self._running = True
- self.end = time.time() + self.timeout
- self.thread.start()
-
- def stop(self):
- self._running = False
-
-
-class JobDir(object):
- def __init__(self, keep=False):
- self.keep = keep
- self.root = tempfile.mkdtemp()
- self.ansible_root = os.path.join(self.root, 'ansible')
- os.makedirs(self.ansible_root)
- self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
- self.inventory = os.path.join(self.ansible_root, 'inventory')
- self.playbook = os.path.join(self.ansible_root, 'playbook')
- self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
- self.config = os.path.join(self.ansible_root, 'ansible.cfg')
- self.script_root = os.path.join(self.ansible_root, 'scripts')
- self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
- os.makedirs(self.script_root)
- self.staging_root = os.path.join(self.root, 'staging')
- os.makedirs(self.staging_root)
-
- def __enter__(self):
- return self
-
- def __exit__(self, etype, value, tb):
- if not self.keep:
- shutil.rmtree(self.root)
-
-
-class LaunchServer(object):
- log = logging.getLogger("zuul.LaunchServer")
- site_section_re = re.compile('site "(.*?)"')
- node_section_re = re.compile('node "(.*?)"')
-
- def __init__(self, config, keep_jobdir=False):
- self.config = config
- self.options = dict(
- verbose=False
- )
- self.keep_jobdir = keep_jobdir
- self.hostname = socket.gethostname()
- self.registered_functions = set()
- self.node_workers = {}
- self.jobs = {}
- self.builds = {}
- self.zmq_send_queue = Queue.Queue()
- self.termination_queue = Queue.Queue()
- self.sites = {}
- self.static_nodes = {}
- self.command_map = dict(
- reconfigure=self.reconfigure,
- stop=self.stop,
- pause=self.pause,
- unpause=self.unpause,
- release=self.release,
- graceful=self.graceful,
- verbose=self.verboseOn,
- unverbose=self.verboseOff,
- )
-
- if config.has_option('launcher', 'accept_nodes'):
- self.accept_nodes = config.getboolean('launcher',
- 'accept_nodes')
- else:
- self.accept_nodes = True
- self.config_accept_nodes = self.accept_nodes
-
- if self.config.has_option('zuul', 'state_dir'):
- state_dir = os.path.expanduser(
- self.config.get('zuul', 'state_dir'))
- else:
- state_dir = '/var/lib/zuul'
- path = os.path.join(state_dir, 'launcher.socket')
- self.command_socket = commandsocket.CommandSocket(path)
- ansible_dir = os.path.join(state_dir, 'ansible')
- plugins_dir = os.path.join(ansible_dir, 'plugins')
- self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
- if not os.path.exists(self.callback_dir):
- os.makedirs(self.callback_dir)
- self.library_dir = os.path.join(ansible_dir, 'library')
- if not os.path.exists(self.library_dir):
- os.makedirs(self.library_dir)
-
- callback_path = os.path.dirname(os.path.abspath(
- zuul.ansible.plugins.callback_plugins.__file__))
- for fn in os.listdir(callback_path):
- shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
-
- library_path = os.path.dirname(os.path.abspath(
- zuul.ansible.library.__file__))
- for fn in os.listdir(library_path):
- shutil.copy(os.path.join(library_path, fn), self.library_dir)
-
- for section in config.sections():
- m = self.site_section_re.match(section)
- if m:
- sitename = m.group(1)
- d = {}
- d['host'] = config.get(section, 'host')
- d['user'] = config.get(section, 'user')
- if config.has_option(section, 'pass'):
- d['pass'] = config.get(section, 'pass')
- else:
- d['pass'] = ''
- if config.has_option(section, 'root'):
- d['root'] = config.get(section, 'root')
- else:
- d['root'] = '/'
- self.sites[sitename] = d
- continue
- m = self.node_section_re.match(section)
- if m:
- nodename = m.group(1)
- d = {}
- d['name'] = nodename
- d['host'] = config.get(section, 'host')
- if config.has_option(section, 'description'):
- d['description'] = config.get(section, 'description')
- else:
- d['description'] = ''
- if config.has_option(section, 'labels'):
- d['labels'] = config.get(section, 'labels').split(',')
- else:
- d['labels'] = []
- self.static_nodes[nodename] = d
- continue
-
- def start(self):
- self._gearman_running = True
- self._zmq_running = True
- self._reaper_running = True
- self._command_running = True
-
- # Setup ZMQ
- self.zcontext = zmq.Context()
- self.zsocket = self.zcontext.socket(zmq.PUB)
- self.zsocket.bind("tcp://*:8888")
-
- # Setup Gearman
- server = self.config.get('gearman', 'server')
- if self.config.has_option('gearman', 'port'):
- port = self.config.get('gearman', 'port')
- else:
- port = 4730
- self.worker = LaunchGearWorker('Zuul Launch Server',
- launch_server=self)
- self.worker.addServer(server, port)
- self.log.debug("Waiting for server")
- self.worker.waitForServer()
- self.log.debug("Registering")
- self.register()
-
- # Start command socket
- self.log.debug("Starting command processor")
- self.command_socket.start()
- self.command_thread = threading.Thread(target=self.runCommand)
- self.command_thread.daemon = True
- self.command_thread.start()
-
- # Load JJB config
- self.loadJobs()
-
- # Start ZMQ worker thread
- self.log.debug("Starting ZMQ processor")
- self.zmq_thread = threading.Thread(target=self.runZMQ)
- self.zmq_thread.daemon = True
- self.zmq_thread.start()
-
- # Start node worker reaper thread
- self.log.debug("Starting reaper")
- self.reaper_thread = threading.Thread(target=self.runReaper)
- self.reaper_thread.daemon = True
- self.reaper_thread.start()
-
- # Start Gearman worker thread
- self.log.debug("Starting worker")
- self.gearman_thread = threading.Thread(target=self.run)
- self.gearman_thread.daemon = True
- self.gearman_thread.start()
-
- # Start static workers
- for node in self.static_nodes.values():
- self.log.debug("Creating static node with arguments: %s" % (node,))
- self._launchWorker(node)
-
- def loadJobs(self):
- self.log.debug("Loading jobs")
- builder = JJB()
- path = self.config.get('launcher', 'jenkins_jobs')
- builder.load_files([path])
- builder.parser.expandYaml()
- unseen = set(self.jobs.keys())
- for job in builder.parser.jobs:
- builder.expandMacros(job)
- self.jobs[job['name']] = job
- unseen.discard(job['name'])
- for name in unseen:
- del self.jobs[name]
-
- def register(self):
- new_functions = set()
- if self.accept_nodes:
- new_functions.add("node_assign:zuul")
- new_functions.add("stop:%s" % self.hostname)
- new_functions.add("set_description:%s" % self.hostname)
- new_functions.add("node_revoke:%s" % self.hostname)
-
- for function in new_functions - self.registered_functions:
- self.worker.registerFunction(function)
- for function in self.registered_functions - new_functions:
- self.worker.unRegisterFunction(function)
- self.registered_functions = new_functions
-
- def reconfigure(self):
- self.log.debug("Reconfiguring")
- self.loadJobs()
- for node in self.node_workers.values():
- try:
- if node.isAlive():
- node.queue.put(dict(action='reconfigure'))
- except Exception:
- self.log.exception("Exception sending reconfigure command "
- "to worker:")
- self.log.debug("Reconfiguration complete")
-
- def pause(self):
- self.log.debug("Pausing")
- self.accept_nodes = False
- self.register()
- for node in self.node_workers.values():
- try:
- if node.isAlive():
- node.queue.put(dict(action='pause'))
- except Exception:
- self.log.exception("Exception sending pause command "
- "to worker:")
- self.log.debug("Paused")
-
- def unpause(self):
- self.log.debug("Unpausing")
- self.accept_nodes = self.config_accept_nodes
- self.register()
- for node in self.node_workers.values():
- try:
- if node.isAlive():
- node.queue.put(dict(action='unpause'))
- except Exception:
- self.log.exception("Exception sending unpause command "
- "to worker:")
- self.log.debug("Unpaused")
-
- def release(self):
- self.log.debug("Releasing idle nodes")
- for node in self.node_workers.values():
- if node.name in self.static_nodes:
- continue
- try:
- if node.isAlive():
- node.queue.put(dict(action='release'))
- except Exception:
- self.log.exception("Exception sending release command "
- "to worker:")
- self.log.debug("Finished releasing idle nodes")
-
- def graceful(self):
- # Note: this is run in the command processing thread; no more
- # external commands will be processed after this.
- self.log.debug("Gracefully stopping")
- self.pause()
- self.release()
- self.log.debug("Waiting for all builds to finish")
- while self.builds:
- time.sleep(5)
- self.log.debug("All builds are finished")
- self.stop()
-
- def stop(self):
- self.log.debug("Stopping")
- # First, stop accepting new jobs
- self._gearman_running = False
- self._reaper_running = False
- self.worker.shutdown()
- # Then stop all of the workers
- for node in self.node_workers.values():
- try:
- if node.isAlive():
- node.stop()
- except Exception:
- self.log.exception("Exception sending stop command to worker:")
- # Stop ZMQ afterwords so that the send queue is flushed
- self._zmq_running = False
- self.zmq_send_queue.put(None)
- self.zmq_send_queue.join()
- # Stop command processing
- self._command_running = False
- self.command_socket.stop()
- # Join the gearman thread which was stopped earlier.
- self.gearman_thread.join()
- # The command thread is joined in the join() method of this
- # class, which is called by the command shell.
- self.log.debug("Stopped")
-
- def verboseOn(self):
- self.log.debug("Enabling verbose mode")
- self.options['verbose'] = True
-
- def verboseOff(self):
- self.log.debug("Disabling verbose mode")
- self.options['verbose'] = False
-
- def join(self):
- self.command_thread.join()
-
- def runCommand(self):
- while self._command_running:
- try:
- command = self.command_socket.get()
- self.command_map[command]()
- except Exception:
- self.log.exception("Exception while processing command")
-
- def runZMQ(self):
- while self._zmq_running or not self.zmq_send_queue.empty():
- try:
- item = self.zmq_send_queue.get()
- self.log.debug("Got ZMQ event %s" % (item,))
- if item is None:
- continue
- self.zsocket.send(item)
- except Exception:
- self.log.exception("Exception while processing ZMQ events")
- finally:
- self.zmq_send_queue.task_done()
-
- def run(self):
- while self._gearman_running:
- try:
- job = self.worker.getJob()
- try:
- if job.name.startswith('node_assign:'):
- self.log.debug("Got node_assign job: %s" % job.unique)
- self.assignNode(job)
- elif job.name.startswith('stop:'):
- self.log.debug("Got stop job: %s" % job.unique)
- self.stopJob(job)
- elif job.name.startswith('set_description:'):
- self.log.debug("Got set_description job: %s" %
- job.unique)
- job.sendWorkComplete()
- elif job.name.startswith('node_revoke:'):
- self.log.debug("Got node_revoke job: %s" % job.unique)
- self.revokeNode(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(traceback.format_exc())
- except gear.InterruptedError:
- return
- except Exception:
- self.log.exception("Exception while getting job")
-
- def assignNode(self, job):
- args = json.loads(job.arguments)
- self.log.debug("Assigned node with arguments: %s" % (args,))
- self._launchWorker(args)
- data = dict(manager=self.hostname)
- job.sendWorkData(json.dumps(data))
- job.sendWorkComplete()
-
- def _launchWorker(self, args):
- worker = NodeWorker(self.config, self.jobs, self.builds,
- self.sites, args['name'], args['host'],
- args['description'], args['labels'],
- self.hostname, self.zmq_send_queue,
- self.termination_queue, self.keep_jobdir,
- self.callback_dir, self.library_dir,
- self.options)
- self.node_workers[worker.name] = worker
-
- worker.thread = threading.Thread(target=worker.run)
- worker.thread.start()
-
- def revokeNode(self, job):
- try:
- args = json.loads(job.arguments)
- self.log.debug("Revoke job with arguments: %s" % (args,))
- name = args['name']
- node = self.node_workers.get(name)
- if not node:
- self.log.debug("Unable to find worker %s" % (name,))
- return
- try:
- if node.isAlive():
- node.queue.put(dict(action='stop'))
- else:
- self.log.debug("Node %s is not alive while revoking node" %
- (node.name,))
- except Exception:
- self.log.exception("Exception sending stop command "
- "to worker:")
- finally:
- job.sendWorkComplete()
-
- def stopJob(self, job):
- try:
- args = json.loads(job.arguments)
- self.log.debug("Stop job with arguments: %s" % (args,))
- unique = args['number']
- build_worker_name = self.builds.get(unique)
- if not build_worker_name:
- self.log.debug("Unable to find build for job %s" % (unique,))
- return
- node = self.node_workers.get(build_worker_name)
- if not node:
- self.log.debug("Unable to find worker for job %s" % (unique,))
- return
- try:
- if node.isAlive():
- node.queue.put(dict(action='abort'))
- else:
- self.log.debug("Node %s is not alive while aborting job" %
- (node.name,))
- except Exception:
- self.log.exception("Exception sending abort command "
- "to worker:")
- finally:
- job.sendWorkComplete()
-
- def runReaper(self):
- # We don't actually care if all the events are processed
- while self._reaper_running:
- try:
- item = self.termination_queue.get()
- self.log.debug("Got termination event %s" % (item,))
- if item is None:
- continue
- worker = self.node_workers[item]
- self.log.debug("Joining %s" % (item,))
- worker.thread.join()
- self.log.debug("Joined %s" % (item,))
- del self.node_workers[item]
- except Exception:
- self.log.exception("Exception while processing "
- "termination events:")
- finally:
- self.termination_queue.task_done()
-
-
-class NodeWorker(object):
- def __init__(self, config, jobs, builds, sites, name, host,
- description, labels, manager_name, zmq_send_queue,
- termination_queue, keep_jobdir, callback_dir,
- library_dir, options):
- self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
- self.log.debug("Creating node worker %s" % (name,))
- self.config = config
- self.jobs = jobs
- self.builds = builds
- self.sites = sites
- self.name = name
- self.host = host
- self.description = description
- if not isinstance(labels, list):
- labels = [labels]
- self.labels = labels
- self.thread = None
- self.registered_functions = set()
- # If the unpaused Event is set, that means we should run jobs.
- # If it is clear, then we are paused and should not run jobs.
- self.unpaused = threading.Event()
- self.unpaused.set()
- self._running = True
- self.queue = Queue.Queue()
- self.manager_name = manager_name
- self.zmq_send_queue = zmq_send_queue
- self.termination_queue = termination_queue
- self.keep_jobdir = keep_jobdir
- self.running_job_lock = threading.Lock()
- self.pending_registration = False
- self.registration_lock = threading.Lock()
- self._get_job_lock = threading.Lock()
- self._got_job = False
- self._job_complete_event = threading.Event()
- self._running_job = False
- self._aborted_job = False
- self._sent_complete_event = False
- self.ansible_job_proc = None
- self.ansible_post_proc = None
- self.workspace_root = config.get('launcher', 'workspace_root')
- if self.config.has_option('launcher', 'private_key_file'):
- self.private_key_file = config.get('launcher', 'private_key_file')
- else:
- self.private_key_file = '~/.ssh/id_rsa'
- if self.config.has_option('launcher', 'username'):
- self.username = config.get('launcher', 'username')
- else:
- self.username = 'zuul'
- self.callback_dir = callback_dir
- self.library_dir = library_dir
- self.options = options
-
- def isAlive(self):
- # Meant to be called from the manager
- if self.thread and self.thread.is_alive():
- return True
- return False
-
- def run(self):
- self.log.debug("Node worker %s starting" % (self.name,))
- server = self.config.get('gearman', 'server')
- if self.config.has_option('gearman', 'port'):
- port = self.config.get('gearman', 'port')
- else:
- port = 4730
- self.worker = NodeGearWorker(self.name)
- self.worker.addServer(server, port)
- self.log.debug("Waiting for server")
- self.worker.waitForServer()
- self.log.debug("Registering")
- self.register()
-
- self.gearman_thread = threading.Thread(target=self.runGearman)
- self.gearman_thread.daemon = True
- self.gearman_thread.start()
-
- self.log.debug("Started")
-
- while self._running or not self.queue.empty():
- try:
- self._runQueue()
- except Exception:
- self.log.exception("Exception in queue manager:")
-
- def stop(self):
- # If this is called locally, setting _running will be
- # effictive, if it's called remotely, it will not be, but it
- # will be set by the queue thread.
- self.log.debug("Submitting stop request")
- self._running = False
- self.unpaused.set()
- self.queue.put(dict(action='stop'))
- self.queue.join()
-
- def pause(self):
- self.unpaused.clear()
- self.worker.stopWaitingForJobs()
-
- def unpause(self):
- self.unpaused.set()
-
- def release(self):
- # If this node is idle, stop it.
- old_unpaused = self.unpaused.is_set()
- if old_unpaused:
- self.pause()
- with self._get_job_lock:
- if self._got_job:
- self.log.debug("This worker is not idle")
- if old_unpaused:
- self.unpause()
- return
- self.log.debug("Stopping due to release command")
- self.queue.put(dict(action='stop'))
-
- def _runQueue(self):
- item = self.queue.get()
- try:
- if item['action'] == 'stop':
- self.log.debug("Received stop request")
- self._running = False
- self.termination_queue.put(self.name)
- if not self.abortRunningJob():
- self.sendFakeCompleteEvent()
- else:
- self._job_complete_event.wait()
- self.worker.shutdown()
- if item['action'] == 'pause':
- self.log.debug("Received pause request")
- self.pause()
- if item['action'] == 'unpause':
- self.log.debug("Received unpause request")
- self.unpause()
- if item['action'] == 'release':
- self.log.debug("Received release request")
- self.release()
- elif item['action'] == 'reconfigure':
- self.log.debug("Received reconfigure request")
- self.register()
- elif item['action'] == 'abort':
- self.log.debug("Received abort request")
- self.abortRunningJob()
- finally:
- self.queue.task_done()
-
- def runGearman(self):
- while self._running:
- try:
- self.unpaused.wait()
- if self._running:
- self._runGearman()
- except Exception:
- self.log.exception("Exception in gearman manager:")
- with self._get_job_lock:
- self._got_job = False
-
- def _runGearman(self):
- if self.pending_registration:
- self.register()
- with self._get_job_lock:
- try:
- job = self.worker.getJob()
- self._got_job = True
- except gear.InterruptedError:
- return
- self.log.debug("Node worker %s got job %s" % (self.name, job.name))
- try:
- if job.name not in self.registered_functions:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- return
- self.launch(job)
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(traceback.format_exc())
-
- def generateFunctionNames(self, job):
- # This only supports "node: foo" and "node: foo || bar"
- ret = set()
- job_labels = job.get('node')
- matching_labels = set()
- if job_labels:
- job_labels = [x.strip() for x in job_labels.split('||')]
- matching_labels = set(self.labels) & set(job_labels)
- if not matching_labels:
- return ret
- ret.add('build:%s' % (job['name'],))
- for label in matching_labels:
- ret.add('build:%s:%s' % (job['name'], label))
- return ret
-
- def register(self):
- if not self.registration_lock.acquire(False):
- self.log.debug("Registration already in progress")
- return
- try:
- if self._running_job:
- self.pending_registration = True
- self.log.debug("Ignoring registration due to running job")
- return
- self.log.debug("Updating registration")
- self.pending_registration = False
- new_functions = set()
- for job in self.jobs.values():
- new_functions |= self.generateFunctionNames(job)
- self.worker.sendMassDo(new_functions)
- self.registered_functions = new_functions
- finally:
- self.registration_lock.release()
-
- def abortRunningJob(self):
- self._aborted_job = True
- return self.abortRunningProc(self.ansible_job_proc)
-
- def abortRunningProc(self, proc):
- aborted = False
- self.log.debug("Abort: acquiring job lock")
- with self.running_job_lock:
- if self._running_job:
- self.log.debug("Abort: a job is running")
- if proc:
- self.log.debug("Abort: sending kill signal to job "
- "process group")
- try:
- pgid = os.getpgid(proc.pid)
- os.killpg(pgid, signal.SIGKILL)
- aborted = True
- except Exception:
- self.log.exception("Exception while killing "
- "ansible process:")
- else:
- self.log.debug("Abort: no job is running")
-
- return aborted
-
- def launch(self, job):
- self.log.info("Node worker %s launching job %s" %
- (self.name, job.name))
-
- # Make sure we can parse what we need from the job first
- args = json.loads(job.arguments)
- offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
- job_name = job.name.split(':')[1]
-
- # Initialize the result so we have something regardless of
- # whether the job actually runs
- result = None
- self._sent_complete_event = False
- self._aborted_job = False
-
- try:
- self.sendStartEvent(job_name, args)
- except Exception:
- self.log.exception("Exception while sending job start event")
-
- try:
- result = self.runJob(job, args)
- except Exception:
- self.log.exception("Exception while launching job thread")
-
- self._running_job = False
-
- try:
- data = json.dumps(dict(result=result))
- job.sendWorkComplete(data)
- except Exception:
- self.log.exception("Exception while sending job completion packet")
-
- try:
- self.sendCompleteEvent(job_name, result, args)
- except Exception:
- self.log.exception("Exception while sending job completion event")
-
- try:
- del self.builds[job.unique]
- except Exception:
- self.log.exception("Exception while clearing build record")
-
- self._job_complete_event.set()
- if offline and self._running:
- self.stop()
-
- def sendStartEvent(self, name, parameters):
- build = dict(node_name=self.name,
- host_name=self.manager_name,
- parameters=parameters)
-
- event = dict(name=name,
- build=build)
-
- item = "onStarted %s" % json.dumps(event)
- self.log.debug("Sending over ZMQ: %s" % (item,))
- self.zmq_send_queue.put(item)
-
- def sendCompleteEvent(self, name, status, parameters):
- build = dict(status=status,
- node_name=self.name,
- host_name=self.manager_name,
- parameters=parameters)
-
- event = dict(name=name,
- build=build)
-
- item = "onFinalized %s" % json.dumps(event)
- self.log.debug("Sending over ZMQ: %s" % (item,))
- self.zmq_send_queue.put(item)
- self._sent_complete_event = True
-
- def sendFakeCompleteEvent(self):
- if self._sent_complete_event:
- return
- self.sendCompleteEvent('zuul:launcher-shutdown',
- 'SUCCESS', {})
-
- def runJob(self, job, args):
- self.ansible_job_proc = None
- self.ansible_post_proc = None
- result = None
- with self.running_job_lock:
- if not self._running:
- return result
- self._running_job = True
- self._job_complete_event.clear()
-
- self.log.debug("Job %s: beginning" % (job.unique,))
- self.builds[job.unique] = self.name
- with JobDir(self.keep_jobdir) as jobdir:
- self.log.debug("Job %s: job root at %s" %
- (job.unique, jobdir.root))
- timeout = self.prepareAnsibleFiles(jobdir, job, args)
-
- data = {
- 'manager': self.manager_name,
- 'number': job.unique,
- 'url': 'telnet://%s:8088' % self.host,
- }
- job.sendWorkData(json.dumps(data))
- job.sendWorkStatus(0, 100)
-
- job_status = self.runAnsiblePlaybook(jobdir, timeout)
- if job_status is None:
- # The result of the job is indeterminate. Zuul will
- # run it again.
- return result
-
- post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
- if not post_status:
- result = 'POST_FAILURE'
- elif job_status:
- result = 'SUCCESS'
- else:
- result = 'FAILURE'
-
- if self._aborted_job:
- # A Null result will cause zuul to relaunch the job if
- # it needs to.
- result = None
-
- return result
-
- def getHostList(self):
- return [('node', dict(
- ansible_host=self.host, ansible_user=self.username))]
-
- def _substituteVariables(self, text, variables):
- def lookup(match):
- return variables.get(match.group(1), '')
- return re.sub('\$([A-Za-z0-9_]+)', lookup, text)
-
- def _getRsyncOptions(self, source, parameters):
- # Treat the publisher source as a filter; ant and rsync behave
- # fairly close in this manner, except for leading directories.
- source = self._substituteVariables(source, parameters)
- # If the source starts with ** then we want to match any
- # number of directories, so don't anchor the include filter.
- # If it does not start with **, then the intent is likely to
- # at least start by matching an immediate file or subdirectory
- # (even if later we have a ** in the middle), so in this case,
- # anchor it to the root of the transfer (the workspace).
- if not source.startswith('**'):
- source = os.path.join('/', source)
- # These options mean: include the thing we want, include any
- # directories (so that we continue to search for the thing we
- # want no matter how deep it is), exclude anything that
- # doesn't match the thing we want or is a directory, then get
- # rid of empty directories left over at the end.
- rsync_opts = ['--include="%s"' % source,
- '--include="*/"',
- '--exclude="*"',
- '--prune-empty-dirs']
- return rsync_opts
-
- def _makeSCPTask(self, jobdir, publisher, parameters):
- tasks = []
- for scpfile in publisher['scp']['files']:
- scproot = tempfile.mkdtemp(dir=jobdir.staging_root)
- os.chmod(scproot, 0o755)
-
- site = publisher['scp']['site']
- if scpfile.get('copy-console'):
- # Include the local ansible directory in the console
- # upload. This uploads the playbook and ansible logs.
- copyargs = dict(src=jobdir.ansible_root + '/',
- dest=os.path.join(scproot, '_zuul_ansible'))
- task = dict(copy=copyargs,
- delegate_to='127.0.0.1')
- tasks.append(task)
-
- # Fetch the console log from the remote host.
- src = '/tmp/console.html'
- rsync_opts = []
- else:
- src = parameters['WORKSPACE']
- if not src.endswith('/'):
- src = src + '/'
- rsync_opts = self._getRsyncOptions(scpfile['source'],
- parameters)
-
- syncargs = dict(src=src,
- dest=scproot,
- copy_links='yes',
- mode='pull')
- if rsync_opts:
- syncargs['rsync_opts'] = rsync_opts
- task = dict(synchronize=syncargs)
- if not scpfile.get('copy-after-failure'):
- task['when'] = 'success'
- tasks.append(task)
-
- task = self._makeSCPTaskLocalAction(
- site, scpfile, scproot, parameters)
- tasks.append(task)
- return tasks
-
- def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
- if site not in self.sites:
- raise Exception("Undefined SCP site: %s" % (site,))
- site = self.sites[site]
- dest = scpfile['target'].lstrip('/')
- dest = self._substituteVariables(dest, parameters)
- dest = os.path.join(site['root'], dest)
- dest = os.path.normpath(dest)
- if not dest.startswith(site['root']):
- raise Exception("Target path %s is not below site root" %
- (dest,))
-
- rsync_cmd = [
- '/usr/bin/rsync', '--delay-updates', '-F',
- '--compress', '-rt', '--safe-links',
- '--rsync-path="mkdir -p {dest} && rsync"',
- '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
- '-o StrictHostKeyChecking=no -q"',
- '--out-format="<<CHANGED>>%i %n%L"',
- '{source}', '"{user}@{host}:{dest}"'
- ]
- if scpfile.get('keep-hierarchy'):
- source = '"%s/"' % scproot
- else:
- source = '`/usr/bin/find "%s" -type f`' % scproot
- shellargs = ' '.join(rsync_cmd).format(
- source=source,
- dest=dest,
- private_key_file=self.private_key_file,
- host=site['host'],
- user=site['user'])
- task = dict(shell=shellargs,
- delegate_to='127.0.0.1')
- if not scpfile.get('copy-after-failure'):
- task['when'] = 'success'
-
- return task
-
- def _makeFTPTask(self, jobdir, publisher, parameters):
- tasks = []
- ftp = publisher['ftp']
- site = ftp['site']
- if site not in self.sites:
- raise Exception("Undefined FTP site: %s" % site)
- site = self.sites[site]
-
- ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
- ftpcontent = os.path.join(ftproot, 'content')
- os.makedirs(ftpcontent)
- ftpscript = os.path.join(ftproot, 'script')
-
- src = parameters['WORKSPACE']
- if not src.endswith('/'):
- src = src + '/'
- rsync_opts = self._getRsyncOptions(ftp['source'],
- parameters)
- syncargs = dict(src=src,
- dest=ftpcontent,
- copy_links='yes',
- mode='pull')
- if rsync_opts:
- syncargs['rsync_opts'] = rsync_opts
- task = dict(synchronize=syncargs,
- when='success')
- tasks.append(task)
- task = dict(shell='lftp -f %s' % ftpscript,
- when='success',
- delegate_to='127.0.0.1')
- ftpsource = ftpcontent
- if ftp.get('remove-prefix'):
- ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
- while ftpsource[-1] == '/':
- ftpsource = ftpsource[:-1]
- ftptarget = ftp['target'].lstrip('/')
- ftptarget = self._substituteVariables(ftptarget, parameters)
- ftptarget = os.path.join(site['root'], ftptarget)
- ftptarget = os.path.normpath(ftptarget)
- if not ftptarget.startswith(site['root']):
- raise Exception("Target path %s is not below site root" %
- (ftptarget,))
- while ftptarget[-1] == '/':
- ftptarget = ftptarget[:-1]
- with open(ftpscript, 'w') as script:
- script.write('open %s\n' % site['host'])
- script.write('user %s %s\n' % (site['user'], site['pass']))
- script.write('mirror -R %s %s\n' % (ftpsource, ftptarget))
- tasks.append(task)
- return tasks
-
- def _makeBuilderTask(self, jobdir, builder, parameters):
- tasks = []
- script_fn = '%s.sh' % str(uuid.uuid4().hex)
- script_path = os.path.join(jobdir.script_root, script_fn)
- with open(script_path, 'w') as script:
- data = builder['shell']
- if not data.startswith('#!'):
- data = '#!/bin/bash -x\n %s' % (data,)
- script.write(data)
-
- remote_path = os.path.join('/tmp', script_fn)
- copy = dict(src=script_path,
- dest=remote_path,
- mode=0o555)
- task = dict(copy=copy)
- tasks.append(task)
-
- runner = dict(command=remote_path,
- cwd=parameters['WORKSPACE'],
- parameters=parameters)
- task = dict(zuul_runner=runner)
- task['name'] = ('zuul_runner with {{ timeout | int - elapsed_time }} '
- 'second timeout')
- task['when'] = '{{ elapsed_time < timeout | int }}'
- task['async'] = '{{ timeout | int - elapsed_time }}'
- task['poll'] = 5
- tasks.append(task)
-
- filetask = dict(path=remote_path,
- state='absent')
- task = dict(file=filetask)
- tasks.append(task)
-
- return tasks
-
- def _transformPublishers(self, jjb_job):
- early_publishers = []
- late_publishers = []
- old_publishers = jjb_job.get('publishers', [])
- for publisher in old_publishers:
- early_scpfiles = []
- late_scpfiles = []
- if 'scp' not in publisher:
- early_publishers.append(publisher)
- continue
- copy_console = False
- for scpfile in publisher['scp']['files']:
- if scpfile.get('copy-console'):
- scpfile['keep-hierarchy'] = True
- late_scpfiles.append(scpfile)
- copy_console = True
- else:
- early_scpfiles.append(scpfile)
- publisher['scp']['files'] = early_scpfiles + late_scpfiles
- if copy_console:
- late_publishers.append(publisher)
- else:
- early_publishers.append(publisher)
- publishers = early_publishers + late_publishers
- if old_publishers != publishers:
- self.log.debug("Transformed job publishers")
- return early_publishers, late_publishers
-
- def prepareAnsibleFiles(self, jobdir, gearman_job, args):
- job_name = gearman_job.name.split(':')[1]
- jjb_job = self.jobs[job_name]
-
- parameters = args.copy()
- parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
-
- with open(jobdir.inventory, 'w') as inventory:
- for host_name, host_vars in self.getHostList():
- inventory.write(host_name)
- for k, v in host_vars.items():
- inventory.write(' %s=%s' % (k, v))
- inventory.write('\n')
-
- timeout = None
- timeout_var = None
- for wrapper in jjb_job.get('wrappers', []):
- if isinstance(wrapper, dict):
- build_timeout = wrapper.get('timeout')
- if isinstance(build_timeout, dict):
- timeout_var = build_timeout.get('timeout-var')
- timeout = build_timeout.get('timeout')
- if timeout is not None:
- timeout = int(timeout) * 60
- if not timeout:
- timeout = ANSIBLE_DEFAULT_TIMEOUT
- if timeout_var:
- parameters[timeout_var] = str(timeout * 1000)
-
- with open(jobdir.playbook, 'w') as playbook:
- pre_tasks = []
- tasks = []
- main_block = []
- error_block = []
- variables = []
-
- shellargs = "ssh-keyscan %s > %s" % (
- self.host, jobdir.known_hosts)
- pre_tasks.append(dict(shell=shellargs,
- delegate_to='127.0.0.1'))
-
- tasks.append(dict(block=main_block,
- rescue=error_block))
-
- task = dict(file=dict(path='/tmp/console.html', state='absent'))
- main_block.append(task)
-
- task = dict(zuul_console=dict(path='/tmp/console.html', port=8088))
- main_block.append(task)
-
- task = dict(file=dict(path=parameters['WORKSPACE'],
- state='directory'))
- main_block.append(task)
-
- msg = [
- "Launched by %s" % self.manager_name,
- "Building remotely on %s in workspace %s" % (
- self.name, parameters['WORKSPACE'])]
- task = dict(zuul_log=dict(msg=msg))
- main_block.append(task)
-
- for builder in jjb_job.get('builders', []):
- if 'shell' in builder:
- main_block.extend(
- self._makeBuilderTask(jobdir, builder, parameters))
- task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"))
- main_block.append(task)
-
- task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
- error_block.append(task)
- error_block.append(dict(fail=dict(msg='FAILURE')))
-
- variables.append(dict(timeout=timeout))
- play = dict(hosts='node', name='Job body', vars=variables,
- pre_tasks=pre_tasks, tasks=tasks)
- playbook.write(yaml.safe_dump([play], default_flow_style=False))
-
- early_publishers, late_publishers = self._transformPublishers(jjb_job)
-
- with open(jobdir.post_playbook, 'w') as playbook:
- blocks = []
- for publishers in [early_publishers, late_publishers]:
- block = []
- for publisher in publishers:
- if 'scp' in publisher:
- block.extend(self._makeSCPTask(jobdir, publisher,
- parameters))
- if 'ftp' in publisher:
- block.extend(self._makeFTPTask(jobdir, publisher,
- parameters))
- blocks.append(block)
-
- # The 'always' section contains the log publishing tasks,
- # the 'block' contains all the other publishers. This way
- # we run the log publisher regardless of whether the rest
- # of the publishers succeed.
- tasks = []
- tasks.append(dict(block=blocks[0],
- always=blocks[1]))
-
- play = dict(hosts='node', name='Publishers',
- tasks=tasks)
- playbook.write(yaml.safe_dump([play], default_flow_style=False))
-
- with open(jobdir.config, 'w') as config:
- config.write('[defaults]\n')
- config.write('hostfile = %s\n' % jobdir.inventory)
- config.write('keep_remote_files = True\n')
- config.write('local_tmp = %s/.ansible/tmp\n' % jobdir.root)
- config.write('private_key_file = %s\n' % self.private_key_file)
- config.write('retry_files_enabled = False\n')
- config.write('log_path = %s\n' % jobdir.ansible_log)
- config.write('gathering = explicit\n')
- config.write('callback_plugins = %s\n' % self.callback_dir)
- config.write('library = %s\n' % self.library_dir)
-
- config.write('[ssh_connection]\n')
- ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
- "-o UserKnownHostsFile=%s" % jobdir.known_hosts
- config.write('ssh_args = %s\n' % ssh_args)
-
- return timeout
-
- def _ansibleTimeout(self, proc, msg):
- self.log.warning(msg)
- self.abortRunningProc(proc)
-
- def runAnsiblePlaybook(self, jobdir, timeout):
- # Set LOGNAME env variable so Ansible log_path log reports
- # the correct user.
- env_copy = os.environ.copy()
- env_copy['LOGNAME'] = 'zuul'
-
- if self.options['verbose']:
- verbose = '-vvv'
- else:
- verbose = '-v'
-
- cmd = ['ansible-playbook', jobdir.playbook, verbose]
- self.log.debug("Ansible command: %s" % (cmd,))
-
- self.ansible_job_proc = subprocess.Popen(
- cmd,
- cwd=jobdir.ansible_root,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- preexec_fn=os.setsid,
- env=env_copy,
- )
- ret = None
- watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
- self._ansibleTimeout,
- (self.ansible_job_proc,
- "Ansible timeout exceeded"))
- watchdog.start()
- try:
- for line in iter(self.ansible_job_proc.stdout.readline, b''):
- line = line[:1024].rstrip()
- self.log.debug("Ansible output: %s" % (line,))
- ret = self.ansible_job_proc.wait()
- finally:
- watchdog.stop()
- self.log.debug("Ansible exit code: %s" % (ret,))
- self.ansible_job_proc = None
- if ret == 3:
- # AnsibleHostUnreachable: We had a network issue connecting to
- # our zuul-worker.
- return None
- elif ret == -9:
- # Received abort request.
- return None
- return ret == 0
-
- def runAnsiblePostPlaybook(self, jobdir, success):
- # Set LOGNAME env variable so Ansible log_path log reports
- # the correct user.
- env_copy = os.environ.copy()
- env_copy['LOGNAME'] = 'zuul'
-
- if self.options['verbose']:
- verbose = '-vvv'
- else:
- verbose = '-v'
-
- cmd = ['ansible-playbook', jobdir.post_playbook,
- '-e', 'success=%s' % success, verbose]
- self.log.debug("Ansible post command: %s" % (cmd,))
-
- self.ansible_post_proc = subprocess.Popen(
- cmd,
- cwd=jobdir.ansible_root,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- preexec_fn=os.setsid,
- env=env_copy,
- )
- ret = None
- watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
- self._ansibleTimeout,
- (self.ansible_post_proc,
- "Ansible post timeout exceeded"))
- watchdog.start()
- try:
- for line in iter(self.ansible_post_proc.stdout.readline, b''):
- line = line[:1024].rstrip()
- self.log.debug("Ansible post output: %s" % (line,))
- ret = self.ansible_post_proc.wait()
- finally:
- watchdog.stop()
- self.log.debug("Ansible post exit code: %s" % (ret,))
- self.ansible_post_proc = None
- return ret == 0
-
-
-class JJB(jenkins_jobs.builder.Builder):
- def __init__(self):
- self.global_config = None
- self._plugins_list = []
-
- def expandComponent(self, component_type, component, template_data):
- component_list_type = component_type + 's'
- new_components = []
- if isinstance(component, dict):
- name, component_data = next(iter(component.items()))
- if template_data:
- component_data = jenkins_jobs.formatter.deep_format(
- component_data, template_data, True)
- else:
- name = component
- component_data = {}
-
- new_component = self.parser.data.get(component_type, {}).get(name)
- if new_component:
- for new_sub_component in new_component[component_list_type]:
- new_components.extend(
- self.expandComponent(component_type,
- new_sub_component, component_data))
- else:
- new_components.append({name: component_data})
- return new_components
-
- def expandMacros(self, job):
- for component_type in ['builder', 'publisher', 'wrapper']:
- component_list_type = component_type + 's'
- new_components = []
- for new_component in job.get(component_list_type, []):
- new_components.extend(self.expandComponent(component_type,
- new_component, {}))
- job[component_list_type] = new_components