| # Copyright 2014 OpenStack Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| import json |
| import logging |
| import os |
| import re |
| import shutil |
| import signal |
| import socket |
| import subprocess |
| import tempfile |
| import threading |
| import time |
| import traceback |
| import Queue |
| import uuid |
| |
| import gear |
| import yaml |
| import jenkins_jobs.builder |
| import jenkins_jobs.formatter |
| import zmq |
| |
| import zuul.ansible.library |
| import zuul.ansible.plugins.callback_plugins |
| from zuul.lib import commandsocket |
| |
# All timeout values are expressed in seconds.
ANSIBLE_WATCHDOG_GRACE = 5 * 60
ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
| |
| |
| COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful', |
| 'verbose', 'unverbose'] |
| |
| |
def boolify(x):
    # bool('0') is True, so numeric strings are converted via int()
    # first.  Check basestring so that unicode values decoded from
    # JSON job arguments are handled as well.
    if isinstance(x, basestring):
        return bool(int(x))
    return bool(x)
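
# For example (illustrative values): bool('0') is True, whereas
# boolify('0') is False and boolify('1') is True.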
| |
| |
| class LaunchGearWorker(gear.Worker): |
| def __init__(self, *args, **kw): |
| self.__launch_server = kw.pop('launch_server') |
| super(LaunchGearWorker, self).__init__(*args, **kw) |
| |
    def handleNoop(self, packet):
        # Delay NOOP processing in proportion to the square of the
        # number of active node workers, so that busier launchers are
        # slower to pick up new jobs and work spreads across the pool.
        workers = len(self.__launch_server.node_workers)
        delay = (workers ** 2) / 1000.0
        time.sleep(delay)
        return super(LaunchGearWorker, self).handleNoop(packet)
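
    # For example (illustrative worker counts): with 10 active node
    # workers the noop reply is delayed (10 ** 2) / 1000.0 = 0.1s;
    # with 100 workers it is 10s, so heavily loaded launchers pick up
    # new jobs more slowly.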
| |
| |
class NodeGearWorker(gear.Worker):
    # Custom Gearman packet type used to register many functions in a
    # single broadcast rather than one CAN_DO packet per function.
    MASS_DO = 101
| |
| def sendMassDo(self, functions): |
| data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions]) |
| self.broadcast_lock.acquire() |
| try: |
| p = gear.Packet(gear.constants.REQ, self.MASS_DO, data) |
| self.broadcast(p) |
| finally: |
| self.broadcast_lock.release() |
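
    # For example (hypothetical function names):
    #
    #   worker.sendMassDo(['build:foo', 'build:foo:trusty'])
    #
    # broadcasts a single MASS_DO packet whose data is
    # b'build:foo\x00build:foo:trusty'.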
| |
| |
class Watchdog(object):
    """Invoke a function if a timeout expires before stop() is called.

    The check loop wakes every ten seconds, so the function may fire
    up to ten seconds after the nominal timeout.
    """

    def __init__(self, timeout, function, args):
| self.timeout = timeout |
| self.function = function |
| self.args = args |
| self.thread = threading.Thread(target=self._run) |
| self.thread.daemon = True |
| |
| def _run(self): |
| while self._running and time.time() < self.end: |
| time.sleep(10) |
| if self._running: |
| self.function(*self.args) |
| |
| def start(self): |
| self._running = True |
| self.end = time.time() + self.timeout |
| self.thread.start() |
| |
| def stop(self): |
| self._running = False |
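
# A minimal usage sketch for Watchdog (hypothetical process and
# callback; kill_proc and proc are not part of this module):
#
#   def kill_proc(proc):
#       os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
#
#   watchdog = Watchdog(300, kill_proc, (proc,))
#   watchdog.start()
#   try:
#       proc.wait()
#   finally:
#       watchdog.stop()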
| |
| |
class JobDir(object):
    """Temporary directory tree holding the Ansible files for one job.

    The tree is removed on exit from the context manager unless keep
    is set.
    """

    def __init__(self, keep=False):
| self.keep = keep |
| self.root = tempfile.mkdtemp() |
| self.ansible_root = os.path.join(self.root, 'ansible') |
| os.makedirs(self.ansible_root) |
| self.known_hosts = os.path.join(self.ansible_root, 'known_hosts') |
| self.inventory = os.path.join(self.ansible_root, 'inventory') |
| self.playbook = os.path.join(self.ansible_root, 'playbook') |
| self.post_playbook = os.path.join(self.ansible_root, 'post_playbook') |
| self.config = os.path.join(self.ansible_root, 'ansible.cfg') |
| self.script_root = os.path.join(self.ansible_root, 'scripts') |
| self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt') |
| os.makedirs(self.script_root) |
| self.staging_root = os.path.join(self.root, 'staging') |
| os.makedirs(self.staging_root) |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, etype, value, tb): |
| if not self.keep: |
| shutil.rmtree(self.root) |
| |
| |
| class LaunchServer(object): |
| log = logging.getLogger("zuul.LaunchServer") |
| site_section_re = re.compile('site "(.*?)"') |
| node_section_re = re.compile('node "(.*?)"') |
| |
| def __init__(self, config, keep_jobdir=False): |
| self.config = config |
| self.options = dict( |
| verbose=False |
| ) |
| self.keep_jobdir = keep_jobdir |
| self.hostname = socket.gethostname() |
| self.registered_functions = set() |
| self.node_workers = {} |
| self.jobs = {} |
| self.builds = {} |
| self.zmq_send_queue = Queue.Queue() |
| self.termination_queue = Queue.Queue() |
| self.sites = {} |
| self.static_nodes = {} |
| self.command_map = dict( |
| reconfigure=self.reconfigure, |
| stop=self.stop, |
| pause=self.pause, |
| unpause=self.unpause, |
| release=self.release, |
| graceful=self.graceful, |
| verbose=self.verboseOn, |
| unverbose=self.verboseOff, |
| ) |
| |
| if config.has_option('launcher', 'accept_nodes'): |
| self.accept_nodes = config.getboolean('launcher', |
| 'accept_nodes') |
| else: |
| self.accept_nodes = True |
| |
| if self.config.has_option('zuul', 'state_dir'): |
| state_dir = os.path.expanduser( |
| self.config.get('zuul', 'state_dir')) |
| else: |
| state_dir = '/var/lib/zuul' |
| path = os.path.join(state_dir, 'launcher.socket') |
| self.command_socket = commandsocket.CommandSocket(path) |
| ansible_dir = os.path.join(state_dir, 'ansible') |
| plugins_dir = os.path.join(ansible_dir, 'plugins') |
| self.callback_dir = os.path.join(plugins_dir, 'callback_plugins') |
| if not os.path.exists(self.callback_dir): |
| os.makedirs(self.callback_dir) |
| self.library_dir = os.path.join(ansible_dir, 'library') |
| if not os.path.exists(self.library_dir): |
| os.makedirs(self.library_dir) |
| |
| callback_path = os.path.dirname(os.path.abspath( |
| zuul.ansible.plugins.callback_plugins.__file__)) |
| for fn in os.listdir(callback_path): |
| shutil.copy(os.path.join(callback_path, fn), self.callback_dir) |
| |
| library_path = os.path.dirname(os.path.abspath( |
| zuul.ansible.library.__file__)) |
| for fn in os.listdir(library_path): |
| shutil.copy(os.path.join(library_path, fn), self.library_dir) |
| |
| for section in config.sections(): |
| m = self.site_section_re.match(section) |
| if m: |
| sitename = m.group(1) |
| d = {} |
| d['host'] = config.get(section, 'host') |
| d['user'] = config.get(section, 'user') |
| if config.has_option(section, 'pass'): |
| d['pass'] = config.get(section, 'pass') |
| else: |
| d['pass'] = '' |
| if config.has_option(section, 'root'): |
| d['root'] = config.get(section, 'root') |
| else: |
| d['root'] = '/' |
| self.sites[sitename] = d |
| continue |
| m = self.node_section_re.match(section) |
| if m: |
| nodename = m.group(1) |
| d = {} |
| d['name'] = nodename |
| d['host'] = config.get(section, 'host') |
| if config.has_option(section, 'description'): |
| d['description'] = config.get(section, 'description') |
| else: |
| d['description'] = '' |
| if config.has_option(section, 'labels'): |
| d['labels'] = config.get(section, 'labels').split(',') |
| else: |
| d['labels'] = [] |
| self.static_nodes[nodename] = d |
| continue |
| |
| def start(self): |
| self._gearman_running = True |
| self._zmq_running = True |
| self._reaper_running = True |
| self._command_running = True |
| |
| # Setup ZMQ |
| self.zcontext = zmq.Context() |
| self.zsocket = self.zcontext.socket(zmq.PUB) |
| self.zsocket.bind("tcp://*:8888") |
| |
| # Setup Gearman |
| server = self.config.get('gearman', 'server') |
| if self.config.has_option('gearman', 'port'): |
| port = self.config.get('gearman', 'port') |
| else: |
| port = 4730 |
| self.worker = LaunchGearWorker('Zuul Launch Server', |
| launch_server=self) |
| self.worker.addServer(server, port) |
| self.log.debug("Waiting for server") |
| self.worker.waitForServer() |
| self.log.debug("Registering") |
| self.register() |
| |
| # Start command socket |
| self.log.debug("Starting command processor") |
| self.command_socket.start() |
| self.command_thread = threading.Thread(target=self.runCommand) |
| self.command_thread.daemon = True |
| self.command_thread.start() |
| |
| # Load JJB config |
| self.loadJobs() |
| |
| # Start ZMQ worker thread |
| self.log.debug("Starting ZMQ processor") |
| self.zmq_thread = threading.Thread(target=self.runZMQ) |
| self.zmq_thread.daemon = True |
| self.zmq_thread.start() |
| |
| # Start node worker reaper thread |
| self.log.debug("Starting reaper") |
| self.reaper_thread = threading.Thread(target=self.runReaper) |
| self.reaper_thread.daemon = True |
| self.reaper_thread.start() |
| |
| # Start Gearman worker thread |
| self.log.debug("Starting worker") |
| self.gearman_thread = threading.Thread(target=self.run) |
| self.gearman_thread.daemon = True |
| self.gearman_thread.start() |
| |
| # Start static workers |
| for node in self.static_nodes.values(): |
| self.log.debug("Creating static node with arguments: %s" % (node,)) |
| self._launchWorker(node) |
| |
| def loadJobs(self): |
| self.log.debug("Loading jobs") |
| builder = JJB() |
| path = self.config.get('launcher', 'jenkins_jobs') |
| builder.load_files([path]) |
| builder.parser.expandYaml() |
| unseen = set(self.jobs.keys()) |
| for job in builder.parser.jobs: |
| builder.expandMacros(job) |
| self.jobs[job['name']] = job |
| unseen.discard(job['name']) |
| for name in unseen: |
| del self.jobs[name] |
| |
| def register(self): |
| new_functions = set() |
| if self.accept_nodes: |
| new_functions.add("node_assign:zuul") |
| new_functions.add("stop:%s" % self.hostname) |
| new_functions.add("set_description:%s" % self.hostname) |
| new_functions.add("node_revoke:%s" % self.hostname) |
| |
| for function in new_functions - self.registered_functions: |
| self.worker.registerFunction(function) |
| for function in self.registered_functions - new_functions: |
| self.worker.unRegisterFunction(function) |
| self.registered_functions = new_functions |
| |
| def reconfigure(self): |
| self.log.debug("Reconfiguring") |
| self.loadJobs() |
| for node in self.node_workers.values(): |
| try: |
| if node.isAlive(): |
| node.queue.put(dict(action='reconfigure')) |
| except Exception: |
| self.log.exception("Exception sending reconfigure command " |
| "to worker:") |
| self.log.debug("Reconfiguration complete") |
| |
| def pause(self): |
| self.log.debug("Pausing") |
| self.accept_nodes = False |
| self.register() |
| for node in self.node_workers.values(): |
| try: |
| if node.isAlive(): |
| node.queue.put(dict(action='pause')) |
| except Exception: |
| self.log.exception("Exception sending pause command " |
| "to worker:") |
| self.log.debug("Paused") |
| |
| def unpause(self): |
| self.log.debug("Unpausing") |
| self.accept_nodes = True |
| self.register() |
| for node in self.node_workers.values(): |
| try: |
| if node.isAlive(): |
| node.queue.put(dict(action='unpause')) |
| except Exception: |
| self.log.exception("Exception sending unpause command " |
| "to worker:") |
| self.log.debug("Unpaused") |
| |
| def release(self): |
| self.log.debug("Releasing idle nodes") |
| for node in self.node_workers.values(): |
| if node.name in self.static_nodes: |
| continue |
| try: |
| if node.isAlive(): |
| node.queue.put(dict(action='release')) |
| except Exception: |
| self.log.exception("Exception sending release command " |
| "to worker:") |
| self.log.debug("Finished releasing idle nodes") |
| |
| def graceful(self): |
| # Note: this is run in the command processing thread; no more |
| # external commands will be processed after this. |
| self.log.debug("Gracefully stopping") |
| self.pause() |
| self.release() |
| self.log.debug("Waiting for all builds to finish") |
| while self.builds: |
| time.sleep(5) |
| self.log.debug("All builds are finished") |
| self.stop() |
| |
| def stop(self): |
| self.log.debug("Stopping") |
| # First, stop accepting new jobs |
| self._gearman_running = False |
| self._reaper_running = False |
| self.worker.shutdown() |
| # Then stop all of the workers |
| for node in self.node_workers.values(): |
| try: |
| if node.isAlive(): |
| node.stop() |
| except Exception: |
| self.log.exception("Exception sending stop command to worker:") |
        # Stop ZMQ afterwards so that the send queue is flushed
| self._zmq_running = False |
| self.zmq_send_queue.put(None) |
| self.zmq_send_queue.join() |
| # Stop command processing |
| self._command_running = False |
| self.command_socket.stop() |
| # Join the gearman thread which was stopped earlier. |
| self.gearman_thread.join() |
| # The command thread is joined in the join() method of this |
| # class, which is called by the command shell. |
| self.log.debug("Stopped") |
| |
| def verboseOn(self): |
| self.log.debug("Enabling verbose mode") |
| self.options['verbose'] = True |
| |
| def verboseOff(self): |
| self.log.debug("Disabling verbose mode") |
| self.options['verbose'] = False |
| |
| def join(self): |
| self.command_thread.join() |
| |
| def runCommand(self): |
| while self._command_running: |
| try: |
| command = self.command_socket.get() |
| self.command_map[command]() |
| except Exception: |
| self.log.exception("Exception while processing command") |
| |
| def runZMQ(self): |
| while self._zmq_running or not self.zmq_send_queue.empty(): |
| try: |
| item = self.zmq_send_queue.get() |
| self.log.debug("Got ZMQ event %s" % (item,)) |
| if item is None: |
| continue |
| self.zsocket.send(item) |
| except Exception: |
| self.log.exception("Exception while processing ZMQ events") |
| finally: |
| self.zmq_send_queue.task_done() |
| |
| def run(self): |
| while self._gearman_running: |
| try: |
| job = self.worker.getJob() |
| try: |
| if job.name.startswith('node_assign:'): |
| self.log.debug("Got node_assign job: %s" % job.unique) |
| self.assignNode(job) |
| elif job.name.startswith('stop:'): |
| self.log.debug("Got stop job: %s" % job.unique) |
| self.stopJob(job) |
                    elif job.name.startswith('set_description:'):
                        self.log.debug("Got set_description job: %s" %
                                       job.unique)
                        # Descriptions are not used here; acknowledge
                        # the job so the caller does not block.
                        job.sendWorkComplete()
| elif job.name.startswith('node_revoke:'): |
| self.log.debug("Got node_revoke job: %s" % job.unique) |
| self.revokeNode(job) |
| else: |
| self.log.error("Unable to handle job %s" % job.name) |
| job.sendWorkFail() |
| except Exception: |
| self.log.exception("Exception while running job") |
| job.sendWorkException(traceback.format_exc()) |
| except gear.InterruptedError: |
| return |
| except Exception: |
| self.log.exception("Exception while getting job") |
| |
| def assignNode(self, job): |
| args = json.loads(job.arguments) |
| self.log.debug("Assigned node with arguments: %s" % (args,)) |
| self._launchWorker(args) |
| data = dict(manager=self.hostname) |
| job.sendWorkData(json.dumps(data)) |
| job.sendWorkComplete() |
| |
| def _launchWorker(self, args): |
| worker = NodeWorker(self.config, self.jobs, self.builds, |
| self.sites, args['name'], args['host'], |
| args['description'], args['labels'], |
| self.hostname, self.zmq_send_queue, |
| self.termination_queue, self.keep_jobdir, |
| self.callback_dir, self.library_dir, |
| self.options) |
| self.node_workers[worker.name] = worker |
| |
| worker.thread = threading.Thread(target=worker.run) |
| worker.thread.start() |
| |
| def revokeNode(self, job): |
| try: |
| args = json.loads(job.arguments) |
| self.log.debug("Revoke job with arguments: %s" % (args,)) |
| name = args['name'] |
| node = self.node_workers.get(name) |
| if not node: |
| self.log.debug("Unable to find worker %s" % (name,)) |
| return |
| try: |
| if node.isAlive(): |
| node.queue.put(dict(action='stop')) |
| else: |
| self.log.debug("Node %s is not alive while revoking node" % |
| (node.name,)) |
| except Exception: |
| self.log.exception("Exception sending stop command " |
| "to worker:") |
| finally: |
| job.sendWorkComplete() |
| |
| def stopJob(self, job): |
| try: |
| args = json.loads(job.arguments) |
| self.log.debug("Stop job with arguments: %s" % (args,)) |
| unique = args['number'] |
| build_worker_name = self.builds.get(unique) |
| if not build_worker_name: |
| self.log.debug("Unable to find build for job %s" % (unique,)) |
| return |
| node = self.node_workers.get(build_worker_name) |
| if not node: |
| self.log.debug("Unable to find worker for job %s" % (unique,)) |
| return |
| try: |
| if node.isAlive(): |
| node.queue.put(dict(action='abort')) |
| else: |
| self.log.debug("Node %s is not alive while aborting job" % |
| (node.name,)) |
| except Exception: |
| self.log.exception("Exception sending abort command " |
| "to worker:") |
| finally: |
| job.sendWorkComplete() |
| |
| def runReaper(self): |
| # We don't actually care if all the events are processed |
| while self._reaper_running: |
| try: |
| item = self.termination_queue.get() |
| self.log.debug("Got termination event %s" % (item,)) |
| if item is None: |
| continue |
| worker = self.node_workers[item] |
| self.log.debug("Joining %s" % (item,)) |
| worker.thread.join() |
| self.log.debug("Joined %s" % (item,)) |
| del self.node_workers[item] |
| except Exception: |
| self.log.exception("Exception while processing " |
| "termination events:") |
| finally: |
| self.termination_queue.task_done() |
| |
| |
| class NodeWorker(object): |
| def __init__(self, config, jobs, builds, sites, name, host, |
| description, labels, manager_name, zmq_send_queue, |
| termination_queue, keep_jobdir, callback_dir, |
| library_dir, options): |
| self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,)) |
| self.log.debug("Creating node worker %s" % (name,)) |
| self.config = config |
| self.jobs = jobs |
| self.builds = builds |
| self.sites = sites |
| self.name = name |
| self.host = host |
| self.description = description |
| if not isinstance(labels, list): |
| labels = [labels] |
| self.labels = labels |
| self.thread = None |
| self.registered_functions = set() |
| # If the unpaused Event is set, that means we should run jobs. |
| # If it is clear, then we are paused and should not run jobs. |
| self.unpaused = threading.Event() |
| self.unpaused.set() |
| self._running = True |
| self.queue = Queue.Queue() |
| self.manager_name = manager_name |
| self.zmq_send_queue = zmq_send_queue |
| self.termination_queue = termination_queue |
| self.keep_jobdir = keep_jobdir |
| self.running_job_lock = threading.Lock() |
| self.pending_registration = False |
| self.registration_lock = threading.Lock() |
| self._get_job_lock = threading.Lock() |
| self._got_job = False |
| self._job_complete_event = threading.Event() |
| self._running_job = False |
| self._aborted_job = False |
| self._sent_complete_event = False |
| self.ansible_job_proc = None |
| self.ansible_post_proc = None |
| self.workspace_root = config.get('launcher', 'workspace_root') |
| if self.config.has_option('launcher', 'private_key_file'): |
| self.private_key_file = config.get('launcher', 'private_key_file') |
| else: |
| self.private_key_file = '~/.ssh/id_rsa' |
| if self.config.has_option('launcher', 'username'): |
| self.username = config.get('launcher', 'username') |
| else: |
| self.username = 'zuul' |
| self.callback_dir = callback_dir |
| self.library_dir = library_dir |
| self.options = options |
| |
| def isAlive(self): |
| # Meant to be called from the manager |
| if self.thread and self.thread.is_alive(): |
| return True |
| return False |
| |
| def run(self): |
| self.log.debug("Node worker %s starting" % (self.name,)) |
| server = self.config.get('gearman', 'server') |
| if self.config.has_option('gearman', 'port'): |
| port = self.config.get('gearman', 'port') |
| else: |
| port = 4730 |
| self.worker = NodeGearWorker(self.name) |
| self.worker.addServer(server, port) |
| self.log.debug("Waiting for server") |
| self.worker.waitForServer() |
| self.log.debug("Registering") |
| self.register() |
| |
| self.gearman_thread = threading.Thread(target=self.runGearman) |
| self.gearman_thread.daemon = True |
| self.gearman_thread.start() |
| |
| self.log.debug("Started") |
| |
| while self._running or not self.queue.empty(): |
| try: |
| self._runQueue() |
| except Exception: |
| self.log.exception("Exception in queue manager:") |
| |
| def stop(self): |
        # If this is called locally, setting _running here is
        # effective; if it's called remotely it is not, but the queue
        # thread will set it when it processes the stop request.
| self.log.debug("Submitting stop request") |
| self._running = False |
| self.unpaused.set() |
| self.queue.put(dict(action='stop')) |
| self.queue.join() |
| |
| def pause(self): |
| self.unpaused.clear() |
| self.worker.stopWaitingForJobs() |
| |
| def unpause(self): |
| self.unpaused.set() |
| |
| def release(self): |
| # If this node is idle, stop it. |
| old_unpaused = self.unpaused.is_set() |
| if old_unpaused: |
| self.pause() |
| with self._get_job_lock: |
| if self._got_job: |
| self.log.debug("This worker is not idle") |
| if old_unpaused: |
| self.unpause() |
| return |
| self.log.debug("Stopping due to release command") |
| self.queue.put(dict(action='stop')) |
| |
| def _runQueue(self): |
| item = self.queue.get() |
| try: |
| if item['action'] == 'stop': |
| self.log.debug("Received stop request") |
| self._running = False |
| self.termination_queue.put(self.name) |
| if not self.abortRunningJob(): |
| self.sendFakeCompleteEvent() |
| else: |
| self._job_complete_event.wait() |
| self.worker.shutdown() |
            elif item['action'] == 'pause':
                self.log.debug("Received pause request")
                self.pause()
            elif item['action'] == 'unpause':
                self.log.debug("Received unpause request")
                self.unpause()
            elif item['action'] == 'release':
                self.log.debug("Received release request")
                self.release()
| elif item['action'] == 'reconfigure': |
| self.log.debug("Received reconfigure request") |
| self.register() |
| elif item['action'] == 'abort': |
| self.log.debug("Received abort request") |
| self.abortRunningJob() |
| finally: |
| self.queue.task_done() |
| |
| def runGearman(self): |
| while self._running: |
| try: |
| self.unpaused.wait() |
| if self._running: |
| self._runGearman() |
| except Exception: |
| self.log.exception("Exception in gearman manager:") |
| with self._get_job_lock: |
| self._got_job = False |
| |
| def _runGearman(self): |
| if self.pending_registration: |
| self.register() |
| with self._get_job_lock: |
| try: |
| job = self.worker.getJob() |
| self._got_job = True |
| except gear.InterruptedError: |
| return |
| self.log.debug("Node worker %s got job %s" % (self.name, job.name)) |
| try: |
| if job.name not in self.registered_functions: |
| self.log.error("Unable to handle job %s" % job.name) |
| job.sendWorkFail() |
| return |
| self.launch(job) |
| except Exception: |
| self.log.exception("Exception while running job") |
| job.sendWorkException(traceback.format_exc()) |
| |
| def generateFunctionNames(self, job): |
| # This only supports "node: foo" and "node: foo || bar" |
| ret = set() |
| job_labels = job.get('node') |
| matching_labels = set() |
| if job_labels: |
| job_labels = [x.strip() for x in job_labels.split('||')] |
| matching_labels = set(self.labels) & set(job_labels) |
| if not matching_labels: |
| return ret |
| ret.add('build:%s' % (job['name'],)) |
| for label in matching_labels: |
| ret.add('build:%s:%s' % (job['name'], label)) |
| return ret |
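
    # For example (illustrative data): a worker with labels ['trusty']
    # and a job {'name': 'myjob', 'node': 'precise || trusty'} yields
    # set(['build:myjob', 'build:myjob:trusty']).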
| |
| def register(self): |
| if not self.registration_lock.acquire(False): |
| self.log.debug("Registration already in progress") |
| return |
| try: |
| if self._running_job: |
| self.pending_registration = True |
| self.log.debug("Ignoring registration due to running job") |
| return |
| self.log.debug("Updating registration") |
| self.pending_registration = False |
| new_functions = set() |
| for job in self.jobs.values(): |
| new_functions |= self.generateFunctionNames(job) |
| self.worker.sendMassDo(new_functions) |
| self.registered_functions = new_functions |
| finally: |
| self.registration_lock.release() |
| |
| def abortRunningJob(self): |
| self._aborted_job = True |
| return self.abortRunningProc(self.ansible_job_proc) |
| |
| def abortRunningProc(self, proc): |
| aborted = False |
| self.log.debug("Abort: acquiring job lock") |
| with self.running_job_lock: |
| if self._running_job: |
| self.log.debug("Abort: a job is running") |
| if proc: |
| self.log.debug("Abort: sending kill signal to job " |
| "process group") |
| try: |
| pgid = os.getpgid(proc.pid) |
| os.killpg(pgid, signal.SIGKILL) |
| aborted = True |
| except Exception: |
| self.log.exception("Exception while killing " |
| "ansible process:") |
| else: |
| self.log.debug("Abort: no job is running") |
| |
| return aborted |
| |
| def launch(self, job): |
| self.log.info("Node worker %s launching job %s" % |
| (self.name, job.name)) |
| |
| # Make sure we can parse what we need from the job first |
| args = json.loads(job.arguments) |
| offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False)) |
| job_name = job.name.split(':')[1] |
| |
| # Initialize the result so we have something regardless of |
| # whether the job actually runs |
| result = None |
| self._sent_complete_event = False |
| self._aborted_job = False |
| |
| try: |
| self.sendStartEvent(job_name, args) |
| except Exception: |
| self.log.exception("Exception while sending job start event") |
| |
| try: |
| result = self.runJob(job, args) |
| except Exception: |
| self.log.exception("Exception while launching job thread") |
| |
| self._running_job = False |
| if not result: |
| result = b'' |
| |
| try: |
| job.sendWorkComplete(result) |
| except Exception: |
| self.log.exception("Exception while sending job completion packet") |
| |
| try: |
| self.sendCompleteEvent(job_name, result, args) |
| except Exception: |
| self.log.exception("Exception while sending job completion event") |
| |
| try: |
| del self.builds[job.unique] |
| except Exception: |
| self.log.exception("Exception while clearing build record") |
| |
| self._job_complete_event.set() |
| if offline and self._running: |
| self.stop() |
| |
| def sendStartEvent(self, name, parameters): |
| build = dict(node_name=self.name, |
| host_name=self.manager_name, |
| parameters=parameters) |
| |
| event = dict(name=name, |
| build=build) |
| |
| item = "onStarted %s" % json.dumps(event) |
| self.log.debug("Sending over ZMQ: %s" % (item,)) |
| self.zmq_send_queue.put(item) |
| |
| def sendCompleteEvent(self, name, status, parameters): |
| build = dict(status=status, |
| node_name=self.name, |
| host_name=self.manager_name, |
| parameters=parameters) |
| |
| event = dict(name=name, |
| build=build) |
| |
| item = "onFinalized %s" % json.dumps(event) |
| self.log.debug("Sending over ZMQ: %s" % (item,)) |
| self.zmq_send_queue.put(item) |
| self._sent_complete_event = True |
| |
| def sendFakeCompleteEvent(self): |
| if self._sent_complete_event: |
| return |
| self.sendCompleteEvent('zuul:launcher-shutdown', |
| 'SUCCESS', {}) |
| |
| def runJob(self, job, args): |
| self.ansible_job_proc = None |
| self.ansible_post_proc = None |
| result = None |
| with self.running_job_lock: |
| if not self._running: |
| return result |
| self._running_job = True |
| self._job_complete_event.clear() |
| |
| self.log.debug("Job %s: beginning" % (job.unique,)) |
| self.builds[job.unique] = self.name |
| with JobDir(self.keep_jobdir) as jobdir: |
| self.log.debug("Job %s: job root at %s" % |
| (job.unique, jobdir.root)) |
| timeout = self.prepareAnsibleFiles(jobdir, job, args) |
| |
| data = { |
| 'manager': self.manager_name, |
| 'number': job.unique, |
| 'url': 'telnet://%s:8088' % self.host, |
| } |
| job.sendWorkData(json.dumps(data)) |
| job.sendWorkStatus(0, 100) |
| |
| job_status = self.runAnsiblePlaybook(jobdir, timeout) |
| if job_status is None: |
| # The result of the job is indeterminate. Zuul will |
| # run it again. |
| return result |
| |
| post_status = self.runAnsiblePostPlaybook(jobdir, job_status) |
| if not post_status: |
| status = 'POST_FAILURE' |
| elif job_status: |
| status = 'SUCCESS' |
| else: |
| status = 'FAILURE' |
| |
            if not self._aborted_job:
                # If the job was aborted, leave the result as None; a
                # null result causes Zuul to relaunch the job if it
                # needs to.
                result = json.dumps(dict(result=status))
| result = json.dumps(dict(result=status)) |
| |
| return result |
| |
| def getHostList(self): |
| return [('node', dict( |
| ansible_host=self.host, ansible_user=self.username))] |
| |
| def _substituteVariables(self, text, variables): |
| def lookup(match): |
| return variables.get(match.group(1), '') |
        return re.sub(r'\$([A-Za-z0-9_]+)', lookup, text)
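
    # For example (illustrative values):
    #
    #   self._substituteVariables('logs/$LOG_PATH', {'LOG_PATH': '1/2'})
    #
    # returns 'logs/1/2'; variables missing from the mapping are
    # replaced with the empty string.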
| |
| def _getRsyncOptions(self, source, parameters): |
        # Treat the publisher source as a filter; Ant and rsync filters
        # behave fairly similarly, except for leading directories.
| source = self._substituteVariables(source, parameters) |
| # If the source starts with ** then we want to match any |
| # number of directories, so don't anchor the include filter. |
| # If it does not start with **, then the intent is likely to |
| # at least start by matching an immediate file or subdirectory |
| # (even if later we have a ** in the middle), so in this case, |
| # anchor it to the root of the transfer (the workspace). |
| if not source.startswith('**'): |
| source = os.path.join('/', source) |
| # These options mean: include the thing we want, include any |
| # directories (so that we continue to search for the thing we |
| # want no matter how deep it is), exclude anything that |
| # doesn't match the thing we want or is a directory, then get |
| # rid of empty directories left over at the end. |
| rsync_opts = ['--include="%s"' % source, |
| '--include="*/"', |
| '--exclude="*"', |
| '--prune-empty-dirs'] |
| return rsync_opts |
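
    # For example (illustrative sources): 'logs/output.txt' is anchored
    # as '/logs/output.txt', while '**/*.log' is left unanchored; the
    # latter yields:
    #
    #   --include="**/*.log" --include="*/" --exclude="*"
    #   --prune-empty-dirs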
| |
| def _makeSCPTask(self, jobdir, publisher, parameters): |
| tasks = [] |
| for scpfile in publisher['scp']['files']: |
| scproot = tempfile.mkdtemp(dir=jobdir.staging_root) |
| os.chmod(scproot, 0o755) |
| |
| site = publisher['scp']['site'] |
| if scpfile.get('copy-console'): |
| # Include the local ansible directory in the console |
| # upload. This uploads the playbook and ansible logs. |
| copyargs = dict(src=jobdir.ansible_root + '/', |
| dest=os.path.join(scproot, '_zuul_ansible')) |
| task = dict(copy=copyargs, |
| delegate_to='127.0.0.1') |
| tasks.append(task) |
| |
| # Fetch the console log from the remote host. |
| src = '/tmp/console.html' |
| rsync_opts = [] |
| else: |
| src = parameters['WORKSPACE'] |
| if not src.endswith('/'): |
| src = src + '/' |
| rsync_opts = self._getRsyncOptions(scpfile['source'], |
| parameters) |
| |
| syncargs = dict(src=src, |
| dest=scproot, |
| copy_links='yes', |
| mode='pull') |
| if rsync_opts: |
| syncargs['rsync_opts'] = rsync_opts |
| task = dict(synchronize=syncargs) |
| if not scpfile.get('copy-after-failure'): |
| task['when'] = 'success' |
| tasks.append(task) |
| |
| task = self._makeSCPTaskLocalAction( |
| site, scpfile, scproot, parameters) |
| tasks.append(task) |
| return tasks |
| |
| def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters): |
| if site not in self.sites: |
| raise Exception("Undefined SCP site: %s" % (site,)) |
| site = self.sites[site] |
| dest = scpfile['target'].lstrip('/') |
| dest = self._substituteVariables(dest, parameters) |
| dest = os.path.join(site['root'], dest) |
| dest = os.path.normpath(dest) |
| if not dest.startswith(site['root']): |
| raise Exception("Target path %s is not below site root" % |
| (dest,)) |
| |
| rsync_cmd = [ |
| '/usr/bin/rsync', '--delay-updates', '-F', |
| '--compress', '-rt', '--safe-links', |
| '--rsync-path="mkdir -p {dest} && rsync"', |
| '--rsh="/usr/bin/ssh -i {private_key_file} -S none ' |
| '-o StrictHostKeyChecking=no -q"', |
| '--out-format="<<CHANGED>>%i %n%L"', |
| '{source}', '"{user}@{host}:{dest}"' |
| ] |
| if scpfile.get('keep-hierarchy'): |
| source = '"%s/"' % scproot |
| else: |
| source = '`/usr/bin/find "%s" -type f`' % scproot |
| shellargs = ' '.join(rsync_cmd).format( |
| source=source, |
| dest=dest, |
| private_key_file=self.private_key_file, |
| host=site['host'], |
| user=site['user']) |
| task = dict(shell=shellargs, |
| delegate_to='127.0.0.1') |
| if not scpfile.get('copy-after-failure'): |
| task['when'] = 'success' |
| |
| return task |
| |
| def _makeFTPTask(self, jobdir, publisher, parameters): |
| tasks = [] |
| ftp = publisher['ftp'] |
| site = ftp['site'] |
| if site not in self.sites: |
| raise Exception("Undefined FTP site: %s" % site) |
| site = self.sites[site] |
| |
| ftproot = tempfile.mkdtemp(dir=jobdir.staging_root) |
| ftpcontent = os.path.join(ftproot, 'content') |
| os.makedirs(ftpcontent) |
| ftpscript = os.path.join(ftproot, 'script') |
| |
| src = parameters['WORKSPACE'] |
| if not src.endswith('/'): |
| src = src + '/' |
| rsync_opts = self._getRsyncOptions(ftp['source'], |
| parameters) |
| syncargs = dict(src=src, |
| dest=ftpcontent, |
| copy_links='yes', |
| mode='pull') |
| if rsync_opts: |
| syncargs['rsync_opts'] = rsync_opts |
| task = dict(synchronize=syncargs, |
| when='success') |
| tasks.append(task) |
| task = dict(shell='lftp -f %s' % ftpscript, |
| when='success', |
| delegate_to='127.0.0.1') |
| ftpsource = ftpcontent |
| if ftp.get('remove-prefix'): |
| ftpsource = os.path.join(ftpcontent, ftp['remove-prefix']) |
| while ftpsource[-1] == '/': |
| ftpsource = ftpsource[:-1] |
| ftptarget = ftp['target'].lstrip('/') |
| ftptarget = self._substituteVariables(ftptarget, parameters) |
| ftptarget = os.path.join(site['root'], ftptarget) |
| ftptarget = os.path.normpath(ftptarget) |
| if not ftptarget.startswith(site['root']): |
| raise Exception("Target path %s is not below site root" % |
| (ftptarget,)) |
| while ftptarget[-1] == '/': |
| ftptarget = ftptarget[:-1] |
| with open(ftpscript, 'w') as script: |
| script.write('open %s\n' % site['host']) |
| script.write('user %s %s\n' % (site['user'], site['pass'])) |
| script.write('mirror -R %s %s\n' % (ftpsource, ftptarget)) |
| tasks.append(task) |
| return tasks |
| |
| def _makeBuilderTask(self, jobdir, builder, parameters): |
| tasks = [] |
        script_fn = '%s.sh' % uuid.uuid4().hex
        script_path = os.path.join(jobdir.script_root, script_fn)
        with open(script_path, 'w') as script:
            data = builder['shell']
            if not data.startswith('#!'):
                data = '#!/bin/bash -x\n%s' % (data,)
| script.write(data) |
| |
| remote_path = os.path.join('/tmp', script_fn) |
| copy = dict(src=script_path, |
| dest=remote_path, |
| mode=0o555) |
| task = dict(copy=copy) |
| tasks.append(task) |
| |
| runner = dict(command=remote_path, |
| cwd=parameters['WORKSPACE'], |
| parameters=parameters) |
| task = dict(zuul_runner=runner) |
| task['name'] = ('zuul_runner with {{ timeout | int - elapsed_time }} ' |
| 'second timeout') |
| task['when'] = '{{ elapsed_time < timeout | int }}' |
| task['async'] = '{{ timeout | int - elapsed_time }}' |
| task['poll'] = 5 |
| tasks.append(task) |
| |
| filetask = dict(path=remote_path, |
| state='absent') |
| task = dict(file=filetask) |
| tasks.append(task) |
| |
| return tasks |
| |
| def _transformPublishers(self, jjb_job): |
| early_publishers = [] |
| late_publishers = [] |
| old_publishers = jjb_job.get('publishers', []) |
| for publisher in old_publishers: |
| early_scpfiles = [] |
| late_scpfiles = [] |
| if 'scp' not in publisher: |
| early_publishers.append(publisher) |
| continue |
| copy_console = False |
| for scpfile in publisher['scp']['files']: |
| if scpfile.get('copy-console'): |
| scpfile['keep-hierarchy'] = True |
| late_scpfiles.append(scpfile) |
| copy_console = True |
| else: |
| early_scpfiles.append(scpfile) |
| publisher['scp']['files'] = early_scpfiles + late_scpfiles |
| if copy_console: |
| late_publishers.append(publisher) |
| else: |
| early_publishers.append(publisher) |
| publishers = early_publishers + late_publishers |
| if old_publishers != publishers: |
| self.log.debug("Transformed job publishers") |
| return early_publishers, late_publishers |
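
    # For example (illustrative publishers): an scp publisher whose only
    # file sets copy-console ends up in late_publishers, so the console
    # log is uploaded after every other artifact; scp files without
    # copy-console remain in early_publishers.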
| |
| def prepareAnsibleFiles(self, jobdir, gearman_job, args): |
| job_name = gearman_job.name.split(':')[1] |
| jjb_job = self.jobs[job_name] |
| |
| parameters = args.copy() |
| parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name) |
| |
| with open(jobdir.inventory, 'w') as inventory: |
| for host_name, host_vars in self.getHostList(): |
| inventory.write(host_name) |
| for k, v in host_vars.items(): |
| inventory.write(' %s=%s' % (k, v)) |
| inventory.write('\n') |
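
        # The inventory holds one line per host, e.g. (illustrative
        # values):
        #
        #   node ansible_host=192.0.2.10 ansible_user=zuul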
| |
        timeout = None
        timeout_var = None
        for wrapper in jjb_job.get('wrappers', []):
            if isinstance(wrapper, dict):
                build_timeout = wrapper.get('timeout')
                if isinstance(build_timeout, dict):
                    timeout_var = build_timeout.get('timeout-var')
                    # JJB timeouts are expressed in minutes; convert
                    # to seconds.
                    timeout = build_timeout.get('timeout')
                    if timeout is not None:
                        timeout = int(timeout) * 60
        if not timeout:
            timeout = ANSIBLE_DEFAULT_TIMEOUT
        if timeout_var:
            # Expose the timeout to the job in milliseconds.
            parameters[timeout_var] = str(timeout * 1000)
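
        # For example (illustrative wrapper): a JJB timeout of 30
        # (minutes) yields timeout = 1800 seconds, and the timeout
        # variable, if named, is set to '1800000' (milliseconds).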
| |
| with open(jobdir.playbook, 'w') as playbook: |
| pre_tasks = [] |
| tasks = [] |
| main_block = [] |
| error_block = [] |
| variables = [] |
| |
| shellargs = "ssh-keyscan %s > %s" % ( |
| self.host, jobdir.known_hosts) |
| pre_tasks.append(dict(shell=shellargs, |
| delegate_to='127.0.0.1')) |
| |
| tasks.append(dict(block=main_block, |
| rescue=error_block)) |
| |
| task = dict(file=dict(path='/tmp/console.html', state='absent')) |
| main_block.append(task) |
| |
| task = dict(zuul_console=dict(path='/tmp/console.html', port=8088)) |
| main_block.append(task) |
| |
| task = dict(file=dict(path=parameters['WORKSPACE'], |
| state='directory')) |
| main_block.append(task) |
| |
| msg = [ |
| "Launched by %s" % self.manager_name, |
| "Building remotely on %s in workspace %s" % ( |
| self.name, parameters['WORKSPACE'])] |
| task = dict(zuul_log=dict(msg=msg)) |
| main_block.append(task) |
| |
| for builder in jjb_job.get('builders', []): |
| if 'shell' in builder: |
| main_block.extend( |
| self._makeBuilderTask(jobdir, builder, parameters)) |
| task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS")) |
| main_block.append(task) |
| |
| task = dict(zuul_log=dict(msg="Job complete, result: FAILURE")) |
| error_block.append(task) |
| error_block.append(dict(fail=dict(msg='FAILURE'))) |
| |
| variables.append(dict(timeout=timeout)) |
| play = dict(hosts='node', name='Job body', vars=variables, |
| pre_tasks=pre_tasks, tasks=tasks) |
| playbook.write(yaml.safe_dump([play], default_flow_style=False)) |
| |
| early_publishers, late_publishers = self._transformPublishers(jjb_job) |
| |
| with open(jobdir.post_playbook, 'w') as playbook: |
| blocks = [] |
| for publishers in [early_publishers, late_publishers]: |
| block = [] |
| for publisher in publishers: |
| if 'scp' in publisher: |
| block.extend(self._makeSCPTask(jobdir, publisher, |
| parameters)) |
| if 'ftp' in publisher: |
| block.extend(self._makeFTPTask(jobdir, publisher, |
| parameters)) |
| blocks.append(block) |
| |
| # The 'always' section contains the log publishing tasks, |
| # the 'block' contains all the other publishers. This way |
| # we run the log publisher regardless of whether the rest |
| # of the publishers succeed. |
| tasks = [] |
| tasks.append(dict(block=blocks[0], |
| always=blocks[1])) |
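
            # Illustrative shape of the generated post playbook:
            #
            #   - hosts: node
            #     name: Publishers
            #     tasks:
            #       - block:    # early (non-console) publishers
            #           - ...
            #         always:   # late (console-log) publishers
            #           - ...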
| |
| play = dict(hosts='node', name='Publishers', |
| tasks=tasks) |
| playbook.write(yaml.safe_dump([play], default_flow_style=False)) |
| |
| with open(jobdir.config, 'w') as config: |
| config.write('[defaults]\n') |
| config.write('hostfile = %s\n' % jobdir.inventory) |
| config.write('keep_remote_files = True\n') |
| config.write('local_tmp = %s/.ansible/tmp\n' % jobdir.root) |
| config.write('private_key_file = %s\n' % self.private_key_file) |
| config.write('retry_files_enabled = False\n') |
| config.write('log_path = %s\n' % jobdir.ansible_log) |
| config.write('gathering = explicit\n') |
| config.write('callback_plugins = %s\n' % self.callback_dir) |
| config.write('library = %s\n' % self.library_dir) |
| |
| config.write('[ssh_connection]\n') |
| ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \ |
| "-o UserKnownHostsFile=%s" % jobdir.known_hosts |
| config.write('ssh_args = %s\n' % ssh_args) |
| |
| return timeout |
| |
| def _ansibleTimeout(self, proc, msg): |
| self.log.warning(msg) |
| self.abortRunningProc(proc) |
| |
| def runAnsiblePlaybook(self, jobdir, timeout): |
| # Set LOGNAME env variable so Ansible log_path log reports |
| # the correct user. |
| env_copy = os.environ.copy() |
| env_copy['LOGNAME'] = 'zuul' |
| |
| if self.options['verbose']: |
| verbose = '-vvv' |
| else: |
| verbose = '-v' |
| |
| cmd = ['ansible-playbook', jobdir.playbook, verbose] |
| self.log.debug("Ansible command: %s" % (cmd,)) |
| |
| self.ansible_job_proc = subprocess.Popen( |
| cmd, |
| cwd=jobdir.ansible_root, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| preexec_fn=os.setsid, |
| env=env_copy, |
| ) |
| ret = None |
| watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE, |
| self._ansibleTimeout, |
| (self.ansible_job_proc, |
| "Ansible timeout exceeded")) |
| watchdog.start() |
| try: |
| for line in iter(self.ansible_job_proc.stdout.readline, b''): |
| line = line[:1024].rstrip() |
| self.log.debug("Ansible output: %s" % (line,)) |
| ret = self.ansible_job_proc.wait() |
| finally: |
| watchdog.stop() |
| self.log.debug("Ansible exit code: %s" % (ret,)) |
| self.ansible_job_proc = None |
| if ret == 3: |
| # AnsibleHostUnreachable: We had a network issue connecting to |
| # our zuul-worker. |
| return None |
| elif ret == -9: |
| # Received abort request. |
| return None |
| return ret == 0 |
| |
| def runAnsiblePostPlaybook(self, jobdir, success): |
| # Set LOGNAME env variable so Ansible log_path log reports |
| # the correct user. |
| env_copy = os.environ.copy() |
| env_copy['LOGNAME'] = 'zuul' |
| |
| if self.options['verbose']: |
| verbose = '-vvv' |
| else: |
| verbose = '-v' |
| |
| cmd = ['ansible-playbook', jobdir.post_playbook, |
| '-e', 'success=%s' % success, verbose] |
| self.log.debug("Ansible post command: %s" % (cmd,)) |
| |
| self.ansible_post_proc = subprocess.Popen( |
| cmd, |
| cwd=jobdir.ansible_root, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| preexec_fn=os.setsid, |
| env=env_copy, |
| ) |
| ret = None |
| watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT, |
| self._ansibleTimeout, |
| (self.ansible_post_proc, |
| "Ansible post timeout exceeded")) |
| watchdog.start() |
| try: |
| for line in iter(self.ansible_post_proc.stdout.readline, b''): |
| line = line[:1024].rstrip() |
| self.log.debug("Ansible post output: %s" % (line,)) |
| ret = self.ansible_post_proc.wait() |
| finally: |
| watchdog.stop() |
| self.log.debug("Ansible post exit code: %s" % (ret,)) |
| self.ansible_post_proc = None |
| return ret == 0 |
| |
| |
class JJB(jenkins_jobs.builder.Builder):
    def __init__(self):
        # Deliberately skip the parent constructor; only the pieces
        # of Builder needed for parsing and macro expansion are set
        # up here.
        self.global_config = None
        self._plugins_list = []
| |
| def expandComponent(self, component_type, component, template_data): |
| component_list_type = component_type + 's' |
| new_components = [] |
| if isinstance(component, dict): |
| name, component_data = next(iter(component.items())) |
| if template_data: |
| component_data = jenkins_jobs.formatter.deep_format( |
| component_data, template_data, True) |
| else: |
| name = component |
| component_data = {} |
| |
| new_component = self.parser.data.get(component_type, {}).get(name) |
| if new_component: |
| for new_sub_component in new_component[component_list_type]: |
| new_components.extend( |
| self.expandComponent(component_type, |
| new_sub_component, component_data)) |
| else: |
| new_components.append({name: component_data}) |
| return new_components |
| |
| def expandMacros(self, job): |
| for component_type in ['builder', 'publisher', 'wrapper']: |
| component_list_type = component_type + 's' |
| new_components = [] |
| for new_component in job.get(component_list_type, []): |
| new_components.extend(self.expandComponent(component_type, |
| new_component, {})) |
| job[component_list_type] = new_components |