| # Copyright 2014 OpenStack Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
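
"""Ansible-based launch server for Zuul.

A sketch of the module's role, inferred from the code below: it
accepts node assignments over Gearman, runs a NodeWorker process per
node, translates Jenkins Job Builder job definitions into Ansible
playbooks, and publishes build events over ZMQ.
"""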
| |
| import json |
| import logging |
| import multiprocessing |
| import os |
| import re |
| import shutil |
| import signal |
| import socket |
| import subprocess |
| import tempfile |
| import threading |
| import traceback |
| import uuid |
| |
import gear
import jenkins_jobs.builder
import yaml
import zmq
| |
| import zuul.ansible.library |
| import zuul.ansible.plugins.callback_plugins |
| |
| |
| class JobDir(object): |
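    """Temporary on-disk state for a single job run.

    Creates git, ansible, plugins and script directories under a
    fresh temporary root and removes the entire tree on context
    exit.
    """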
| def __init__(self): |
| self.root = tempfile.mkdtemp() |
| self.git_root = os.path.join(self.root, 'git') |
| os.makedirs(self.git_root) |
| self.ansible_root = os.path.join(self.root, 'ansible') |
| os.makedirs(self.ansible_root) |
| self.plugins_root = os.path.join(self.ansible_root, 'plugins') |
| os.makedirs(self.plugins_root) |
| self.inventory = os.path.join(self.ansible_root, 'inventory') |
| self.playbook = os.path.join(self.ansible_root, 'playbook') |
| self.post_playbook = os.path.join(self.ansible_root, 'post_playbook') |
| self.config = os.path.join(self.ansible_root, 'ansible.cfg') |
| self.script_root = os.path.join(self.ansible_root, 'scripts') |
| os.makedirs(self.script_root) |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, etype, value, tb): |
| shutil.rmtree(self.root) |
| |
| |
| class LaunchServer(object): |
| log = logging.getLogger("zuul.LaunchServer") |
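    # Matches launcher config section names like: site "logs.example.org"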
| section_re = re.compile('site "(.*?)"') |
| |
| def __init__(self, config): |
| self.config = config |
| self.hostname = socket.gethostname() |
| self.node_workers = {} |
| self.mpmanager = multiprocessing.Manager() |
| self.jobs = self.mpmanager.dict() |
| self.builds = self.mpmanager.dict() |
| self.zmq_send_queue = multiprocessing.JoinableQueue() |
| self.termination_queue = multiprocessing.JoinableQueue() |
| self.sites = {} |
| |
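        # Publisher sites are read from config sections of the form
        # below; values here are illustrative, and 'pass' and 'root'
        # are optional:
        #   [site "logs.example.org"]
        #   host=logs.example.org
        #   user=zuul
        #   pass=secret
        #   root=/srv/static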
| for section in config.sections(): |
| m = self.section_re.match(section) |
| if m: |
| sitename = m.group(1) |
| d = {} |
| d['host'] = config.get(section, 'host') |
| d['user'] = config.get(section, 'user') |
                if config.has_option(section, 'pass'):
                    d['pass'] = config.get(section, 'pass')
                else:
                    d['pass'] = ''
                if config.has_option(section, 'root'):
                    d['root'] = config.get(section, 'root')
                else:
                    d['root'] = '/'
| self.sites[sitename] = d |
| |
| def start(self): |
| self._gearman_running = True |
| self._zmq_running = True |
| self._reaper_running = True |
| |
| # Setup ZMQ |
| self.zcontext = zmq.Context() |
| self.zsocket = self.zcontext.socket(zmq.PUB) |
| self.zsocket.bind("tcp://*:8881") |
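        # Node workers queue build start/completion events on
        # zmq_send_queue; the runZMQ thread publishes them on this
        # socket. The port is currently fixed.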
| |
| # Setup Gearman |
| server = self.config.get('gearman', 'server') |
| if self.config.has_option('gearman', 'port'): |
| port = self.config.get('gearman', 'port') |
| else: |
| port = 4730 |
| self.worker = gear.Worker('Zuul Launch Server') |
| self.worker.addServer(server, port) |
| self.log.debug("Waiting for server") |
| self.worker.waitForServer() |
| self.log.debug("Registering") |
| self.register() |
| |
| # Load JJB config |
| self.loadJobs() |
| |
| # Start ZMQ worker thread |
| self.log.debug("Starting ZMQ processor") |
| self.zmq_thread = threading.Thread(target=self.runZMQ) |
| self.zmq_thread.daemon = True |
| self.zmq_thread.start() |
| |
| # Start node worker reaper thread |
| self.log.debug("Starting reaper") |
| self.reaper_thread = threading.Thread(target=self.runReaper) |
| self.reaper_thread.daemon = True |
| self.reaper_thread.start() |
| |
| # Start Gearman worker thread |
| self.log.debug("Starting worker") |
| self.gearman_thread = threading.Thread(target=self.run) |
| self.gearman_thread.daemon = True |
| self.gearman_thread.start() |
| |
| def loadJobs(self): |
| self.log.debug("Loading jobs") |
| builder = JJB() |
| path = self.config.get('launcher', 'jenkins_jobs') |
| builder.load_files([path]) |
| builder.parser.expandYaml() |
| unseen = set(self.jobs.keys()) |
| for job in builder.parser.jobs: |
| self.jobs[job['name']] = job |
| unseen.discard(job['name']) |
| for name in unseen: |
| del self.jobs[name] |
| |
| def register(self): |
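        # "node-assign:zuul" delivers new node assignments to this
        # launcher; "stop:<hostname>" lets a caller abort a build
        # running on this specific host.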
| self.worker.registerFunction("node-assign:zuul") |
| self.worker.registerFunction("stop:%s" % self.hostname) |
| |
| def reconfigure(self, config): |
| self.log.debug("Reconfiguring") |
| self.config = config |
| self.loadJobs() |
| for node in self.node_workers.values(): |
| try: |
| if node.isAlive(): |
| node.queue.put(dict(action='reconfigure')) |
| except Exception: |
| self.log.exception("Exception sending reconfigure command " |
| "to worker:") |
| |
| def stop(self): |
| self.log.debug("Stopping") |
| self._gearman_running = False |
| self._reaper_running = False |
| self.worker.shutdown() |
| for node in self.node_workers.values(): |
| try: |
| if node.isAlive(): |
| node.stop() |
| except Exception: |
| self.log.exception("Exception sending stop command to worker:") |
| self._zmq_running = False |
| self.zmq_send_queue.put(None) |
| self.zmq_send_queue.join() |
| self.log.debug("Stopped") |
| |
| def join(self): |
| self.gearman_thread.join() |
| |
    def runZMQ(self):
        while self._zmq_running or not self.zmq_send_queue.empty():
            try:
                item = self.zmq_send_queue.get()
            except Exception:
                self.log.exception("Exception getting ZMQ event")
                continue
            try:
                self.log.debug("Got ZMQ event %s" % (item,))
                if item is not None:
                    self.zsocket.send(item)
            except Exception:
                self.log.exception("Exception while processing ZMQ events")
            finally:
                # task_done() pairs with the get() above so that the
                # queue join() in stop() can complete.
                self.zmq_send_queue.task_done()
| |
| def run(self): |
| while self._gearman_running: |
| try: |
| job = self.worker.getJob() |
| try: |
| if job.name.startswith('node-assign:'): |
| self.log.debug("Got node-assign job: %s" % job.unique) |
| self.assignNode(job) |
| elif job.name.startswith('stop:'): |
| self.log.debug("Got stop job: %s" % job.unique) |
| self.stopJob(job) |
| else: |
| self.log.error("Unable to handle job %s" % job.name) |
| job.sendWorkFail() |
| except Exception: |
| self.log.exception("Exception while running job") |
| job.sendWorkException(traceback.format_exc()) |
| except gear.InterruptedError: |
| return |
| except Exception: |
| self.log.exception("Exception while getting job") |
| |
| def assignNode(self, job): |
| args = json.loads(job.arguments) |
| self.log.debug("Assigned node with arguments: %s" % (args,)) |
| worker = NodeWorker(self.config, self.jobs, self.builds, |
| self.sites, args['name'], args['host'], |
| args['description'], args['labels'], |
| self.hostname, self.zmq_send_queue, |
| self.termination_queue) |
| self.node_workers[worker.name] = worker |
| |
| worker.process = multiprocessing.Process(target=worker.run) |
| worker.process.start() |
| |
| data = dict(manager=self.hostname) |
| job.sendWorkData(json.dumps(data)) |
| job.sendWorkComplete() |
| |
| def stopJob(self, job): |
| try: |
| args = json.loads(job.arguments) |
| self.log.debug("Stop job with arguments: %s" % (args,)) |
| unique = args['number'] |
| build_worker_name = self.builds.get(unique) |
| if not build_worker_name: |
| self.log.debug("Unable to find build for job %s" % (unique,)) |
| return |
| node = self.node_workers.get(build_worker_name) |
| if not node: |
| self.log.debug("Unable to find worker for job %s" % (unique,)) |
| return |
| try: |
| if node.isAlive(): |
| node.queue.put(dict(action='abort')) |
| else: |
| self.log.debug("Node %s is not alive while aborting job" % |
| (node.name,)) |
| except Exception: |
| self.log.exception("Exception sending abort command " |
| "to worker:") |
| finally: |
| job.sendWorkComplete() |
| |
    def runReaper(self):
        # We don't actually care if all the events are processed
        while self._reaper_running:
            try:
                item = self.termination_queue.get()
            except Exception:
                self.log.exception("Exception getting termination event")
                continue
            try:
                self.log.debug("Got termination event %s" % (item,))
                if item is not None:
                    del self.node_workers[item]
            except Exception:
                self.log.exception("Exception while processing "
                                   "termination events:")
            finally:
                self.termination_queue.task_done()
| |
| |
| class NodeWorker(object): |
| log = logging.getLogger("zuul.NodeWorker") |
| |
| def __init__(self, config, jobs, builds, sites, name, host, |
| description, labels, manager_name, zmq_send_queue, |
| termination_queue): |
| self.log.debug("Creating node worker %s" % (name,)) |
| self.config = config |
| self.jobs = jobs |
| self.builds = builds |
| self.sites = sites |
| self.name = name |
| self.host = host |
| self.description = description |
| if not isinstance(labels, list): |
| labels = [labels] |
| self.labels = labels |
| self.process = None |
| self.registered_functions = set() |
| self._running = True |
| self.queue = multiprocessing.JoinableQueue() |
| self.manager_name = manager_name |
| self.zmq_send_queue = zmq_send_queue |
| self.termination_queue = termination_queue |
| self.running_job_lock = threading.Lock() |
| self._job_complete_event = threading.Event() |
| self._running_job = False |
| self._sent_complete_event = False |
| self.workspace_root = config.get('launcher', 'workspace_root') |
| if self.config.has_option('launcher', 'private_key_file'): |
| self.private_key_file = config.get('launcher', 'private_key_file') |
| else: |
| self.private_key_file = '~/.ssh/id_rsa' |
| if self.config.has_option('launcher', 'username'): |
| self.username = config.get('launcher', 'username') |
| else: |
| self.username = 'zuul' |
| |
| def isAlive(self): |
| # Meant to be called from the manager |
| if self.process and self.process.is_alive(): |
| return True |
| return False |
| |
| def run(self): |
| signal.signal(signal.SIGINT, signal.SIG_IGN) |
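        # This runs in a child process (see LaunchServer.assignNode);
        # ignore SIGINT so an interrupt delivered to the parent
        # doesn't kill a worker mid-job.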
| self.log.debug("Node worker %s starting" % (self.name,)) |
| server = self.config.get('gearman', 'server') |
| if self.config.has_option('gearman', 'port'): |
| port = self.config.get('gearman', 'port') |
| else: |
| port = 4730 |
| self.worker = gear.Worker(self.name) |
| self.worker.addServer(server, port) |
| self.log.debug("Waiting for server") |
| self.worker.waitForServer() |
| self.register() |
| |
| self.gearman_thread = threading.Thread(target=self.runGearman) |
| self.gearman_thread.daemon = True |
| self.gearman_thread.start() |
| |
| while self._running or not self.queue.empty(): |
| try: |
| self._runQueue() |
| except Exception: |
| self.log.exception("Exception in queue manager:") |
| |
| def stop(self): |
        # If this is called locally, setting _running here takes
        # effect immediately; if it's called remotely it does not,
        # but the queue thread will set it when it processes the
        # stop request.
| self.log.debug("Submitting stop request") |
| self._running = False |
| self.queue.put(dict(action='stop')) |
| self.queue.join() |
| |
| def _runQueue(self): |
| item = self.queue.get() |
| try: |
| if item['action'] == 'stop': |
| self.log.debug("Received stop request") |
| self._running = False |
| self.termination_queue.put(self.name) |
| if not self.abortRunningJob(): |
| self.sendFakeCompleteEvent() |
| else: |
| self._job_complete_event.wait() |
| self.worker.shutdown() |
| elif item['action'] == 'reconfigure': |
| self.log.debug("Received reconfigure request") |
| self.register() |
| elif item['action'] == 'abort': |
| self.log.debug("Received abort request") |
| self.abortRunningJob() |
| finally: |
| self.queue.task_done() |
| |
| def runGearman(self): |
| while self._running: |
| try: |
| self._runGearman() |
| except Exception: |
| self.log.exception("Exception in gearman manager:") |
| |
| def _runGearman(self): |
| try: |
| job = self.worker.getJob() |
| except gear.InterruptedError: |
| return |
| self.log.debug("Node worker %s got job %s" % (self.name, job.name)) |
| try: |
| if job.name not in self.registered_functions: |
| self.log.error("Unable to handle job %s" % job.name) |
| job.sendWorkFail() |
| return |
| self.launch(job) |
| except Exception: |
| self.log.exception("Exception while running job") |
| job.sendWorkException(traceback.format_exc()) |
| |
| def generateFunctionNames(self, job): |
| # This only supports "node: foo" and "node: foo || bar" |
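        # e.g. a job named 'foo' with node: 'precise || trusty' on a
        # worker carrying the 'trusty' label registers:
        #   build:foo and build:foo:trusty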
| ret = set() |
| job_labels = job.get('node') |
| matching_labels = set() |
| if job_labels: |
| job_labels = [x.strip() for x in job_labels.split('||')] |
| matching_labels = set(self.labels) & set(job_labels) |
| if not matching_labels: |
| return ret |
| ret.add('build:%s' % (job['name'],)) |
| for label in matching_labels: |
| ret.add('build:%s:%s' % (job['name'], label)) |
| return ret |
| |
| def register(self): |
| if self._running_job: |
| return |
| new_functions = set() |
| for job in self.jobs.values(): |
| new_functions |= self.generateFunctionNames(job) |
| for function in new_functions - self.registered_functions: |
| self.worker.registerFunction(function) |
| for function in self.registered_functions - new_functions: |
| self.worker.unRegisterFunction(function) |
| self.registered_functions = new_functions |
| |
| def abortRunningJob(self): |
| aborted = False |
| self.log.debug("Abort: acquiring job lock") |
| with self.running_job_lock: |
| if self._running_job: |
| self.log.debug("Abort: a job is running") |
| proc = self.ansible_proc |
| if proc: |
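                    # runAnsiblePlaybook starts ansible-playbook with
                    # preexec_fn=os.setsid, so killing its process
                    # group takes out any children it spawned as well.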
| self.log.debug("Abort: sending kill signal to job " |
| "process group") |
| try: |
| pgid = os.getpgid(proc.pid) |
| os.killpg(pgid, signal.SIGKILL) |
| aborted = True |
| except Exception: |
| self.log.exception("Exception while killing " |
| "ansible process:") |
| else: |
| self.log.debug("Abort: no job is running") |
| |
| return aborted |
| |
| def launch(self, job): |
| self.log.info("Node worker %s launching job %s" % |
| (self.name, job.name)) |
| |
| # Make sure we can parse what we need from the job first |
| args = json.loads(job.arguments) |
| # This may be configurable later, or we may choose to honor |
| # OFFLINE_NODE_WHEN_COMPLETE |
| offline = True |
| job_name = job.name.split(':')[1] |
| |
| # Initialize the result so we have something regardless of |
| # whether the job actually runs |
| result = None |
| self._sent_complete_event = False |
| |
| try: |
| self.sendStartEvent(job_name, args) |
| except Exception: |
| self.log.exception("Exception while sending job start event") |
| |
| try: |
| result = self.runJob(job, args) |
| except Exception: |
| self.log.exception("Exception while launching job thread") |
| |
| self._running_job = False |
        if not result:
            # Keep the empty result the same (string) type as the
            # json.dumps() output returned by runJob.
            result = ''
| |
| try: |
| job.sendWorkComplete(result) |
| except Exception: |
| self.log.exception("Exception while sending job completion packet") |
| |
| try: |
| self.sendCompleteEvent(job_name, result, args) |
| except Exception: |
| self.log.exception("Exception while sending job completion event") |
| |
| try: |
| del self.builds[job.unique] |
| except Exception: |
| self.log.exception("Exception while clearing build record") |
| |
| self._job_complete_event.set() |
| if offline and self._running: |
| self.stop() |
| |
| def sendStartEvent(self, name, parameters): |
| build = dict(node_name=self.name, |
| host_name=self.manager_name, |
| parameters=parameters) |
| |
| event = dict(name=name, |
| build=build) |
| |
| item = "onStarted %s" % json.dumps(event) |
| self.log.debug("Sending over ZMQ: %s" % (item,)) |
| self.zmq_send_queue.put(item) |
| |
| def sendCompleteEvent(self, name, status, parameters): |
| build = dict(status=status, |
| node_name=self.name, |
| host_name=self.manager_name, |
| parameters=parameters) |
| |
| event = dict(name=name, |
| build=build) |
| |
| item = "onFinalized %s" % json.dumps(event) |
| self.log.debug("Sending over ZMQ: %s" % (item,)) |
| self.zmq_send_queue.put(item) |
| self._sent_complete_event = True |
| |
| def sendFakeCompleteEvent(self): |
| if self._sent_complete_event: |
| return |
| self.sendCompleteEvent('zuul:launcher-shutdown', |
| 'SUCCESS', {}) |
| |
| def runJob(self, job, args): |
| self.ansible_proc = None |
| result = None |
| with self.running_job_lock: |
| if not self._running: |
| return result |
| self._running_job = True |
| self._job_complete_event.clear() |
| |
| self.log.debug("Job %s: beginning" % (job.unique,)) |
| self.builds[job.unique] = self.name |
| with JobDir() as jobdir: |
| self.log.debug("Job %s: job root at %s" % |
| (job.unique, jobdir.root)) |
| timeout = self.prepareAnsibleFiles(jobdir, job, args) |
| |
| data = { |
| 'manager': self.manager_name, |
| 'number': job.unique, |
| 'url': 'telnet://%s:8088' % self.host, |
| } |
| job.sendWorkData(json.dumps(data)) |
| job.sendWorkStatus(0, 100) |
| |
| job_status = self.runAnsiblePlaybook(jobdir, timeout) |
| post_status = self.runAnsiblePostPlaybook(jobdir, job_status) |
| if job_status and post_status: |
| status = 'SUCCESS' |
| else: |
| status = 'FAILURE' |
| |
| result = json.dumps(dict(result=status)) |
| |
| return result |
| |
| def getHostList(self): |
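        # Single-entry inventory; prepareAnsibleFiles renders it as,
        # for example:
        #   node ansible_host=192.0.2.1 ansible_user=zuul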
| return [('node', dict( |
| ansible_host=self.host, ansible_user=self.username))] |
| |
| def _makeSCPTask(self, publisher): |
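        # 'publisher' is an expanded JJB publisher entry; the shape
        # is assumed from the keys read below, e.g.:
        #   scp:
        #     site: 'logs.example.org'
        #     files:
        #       - source: 'logs/**'
        #         target: 'logs/42'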
| tasks = [] |
| for scpfile in publisher['scp']['files']: |
| site = publisher['scp']['site'] |
| if site not in self.sites: |
| raise Exception("Undefined SCP site: %s" % (site,)) |
| site = self.sites[site] |
| if scpfile.get('copy-console'): |
| src = '/tmp/console.log' |
| else: |
| src = scpfile['source'] |
| dest = os.path.join(site['root'], scpfile['target']) |
| dest = os.path.normpath(dest) |
| if not dest.startswith(site['root']): |
| raise Exception("Target path %s is not below site root" % |
| (dest,)) |
| syncargs = dict(src=src, |
| dest=dest) |
| task = dict(synchronize=syncargs, |
| delegate_to=site['host']) |
| if not scpfile.get('copy-after-failure'): |
| task['when'] = 'success' |
| tasks.append(task) |
| return tasks |
| |
| def _makeFTPTask(self, jobdir, publisher): |
| tasks = [] |
| ftp = publisher['ftp'] |
| site = ftp['site'] |
| if site not in self.sites: |
| raise Exception("Undefined FTP site: %s" % site) |
| site = self.sites[site] |
| ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root) |
| ftpcontent = os.path.join(ftproot, 'content') |
| os.makedirs(ftpcontent) |
| ftpscript = os.path.join(ftproot, 'script') |
| syncargs = dict(src=ftp['source'], |
| dest=ftpcontent) |
| task = dict(synchronize=syncargs, |
| when='success') |
| tasks.append(task) |
| task = dict(shell='lftp -f %s' % ftpscript, |
| when='success') |
| ftpsource = ftpcontent |
| if ftp.get('remove-prefix'): |
| ftpsource = os.path.join(ftpcontent, ftp['remove-prefix']) |
| while ftpsource[-1] == '/': |
| ftpsource = ftpsource[:-1] |
        ftptarget = os.path.join(site['root'], ftp['target'])
| ftptarget = os.path.normpath(ftptarget) |
| if not ftptarget.startswith(site['root']): |
| raise Exception("Target path %s is not below site root" % |
| (ftptarget,)) |
| while ftptarget[-1] == '/': |
| ftptarget = ftptarget[:-1] |
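        # 'mirror -R' reverses lftp's mirror direction, pushing the
        # local content tree up to the remote target.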
| with open(ftpscript, 'w') as script: |
| script.write('open %s\n' % site['host']) |
| script.write('user %s %s\n' % (site['user'], site['pass'])) |
| script.write('mirror -R %s %s\n' % (ftpsource, ftptarget)) |
| tasks.append(task) |
| return tasks |
| |
| def _makeBuilderTask(self, jobdir, builder, parameters, timeout): |
| tasks = [] |
        script_fn = '%s.sh' % uuid.uuid4().hex
| script_path = os.path.join(jobdir.script_root, script_fn) |
| with open(script_path, 'w') as script: |
| script.write(builder['shell']) |
| |
| remote_path = os.path.join('/tmp', script_fn) |
| copy = dict(src=script_path, |
| dest=remote_path, |
                    mode=0o555)
| task = dict(copy=copy) |
| tasks.append(task) |
| |
| runner = dict(command=remote_path, |
| cwd=parameters['WORKSPACE'], |
| parameters=parameters) |
| task = dict(zuul_runner=runner) |
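        # Run the script via the zuul_runner module as an Ansible
        # async task so it is bounded by the job timeout (passed to
        # ansible-playbook as -e timeout=...); poll every 5 seconds.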
| if timeout: |
| task['when'] = '{{ timeout | int > 0 }}' |
| task['async'] = '{{ timeout }}' |
| else: |
| task['async'] = 2 * 60 * 60 # 2 hour default timeout |
| task['poll'] = 5 |
| tasks.append(task) |
| |
| filetask = dict(path=remote_path, |
| state='absent') |
| task = dict(file=filetask) |
| tasks.append(task) |
| |
| return tasks |
| |
| def prepareAnsibleFiles(self, jobdir, gearman_job, args): |
| job_name = gearman_job.name.split(':')[1] |
| jjb_job = self.jobs[job_name] |
| |
| parameters = args.copy() |
| parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name) |
| |
| with open(jobdir.inventory, 'w') as inventory: |
| for host_name, host_vars in self.getHostList(): |
| inventory.write(host_name) |
| for k, v in host_vars.items(): |
| inventory.write(' %s=%s' % (k, v)) |
| inventory.write('\n') |
| |
        timeout = None
        for wrapper in jjb_job.get('wrappers', []):
            if isinstance(wrapper, dict):
                build_timeout = wrapper.get('build-timeout', {})
                if isinstance(build_timeout, dict):
                    build_timeout = build_timeout.get('timeout')
                if build_timeout:
                    # JJB expresses the timeout in minutes; convert
                    # to seconds for Ansible. Only overwrite timeout
                    # when this wrapper actually defines one.
                    timeout = build_timeout * 60
| |
| with open(jobdir.playbook, 'w') as playbook: |
| tasks = [] |
| |
| task = dict(file=dict(path='/tmp/console.log', state='absent')) |
| tasks.append(task) |
| |
| task = dict(zuul_console=dict(path='/tmp/console.log', port=8088)) |
| tasks.append(task) |
| |
| task = dict(file=dict(path=parameters['WORKSPACE'], |
| state='directory')) |
| tasks.append(task) |
| |
| for builder in jjb_job.get('builders', []): |
| if 'shell' in builder: |
| tasks.extend(self._makeBuilderTask(jobdir, builder, |
| parameters, timeout)) |
| play = dict(hosts='node', name='Job body', |
| tasks=tasks) |
| playbook.write(yaml.dump([play])) |
| |
| with open(jobdir.post_playbook, 'w') as playbook: |
| tasks = [] |
| for publisher in jjb_job.get('publishers', []): |
| if 'scp' in publisher: |
| tasks.extend(self._makeSCPTask(publisher)) |
| if 'ftp' in publisher: |
| tasks.extend(self._makeFTPTask(jobdir, publisher)) |
| play = dict(hosts='node', name='Publishers', |
| tasks=tasks) |
| playbook.write(yaml.dump([play])) |
| |
| with open(jobdir.config, 'w') as config: |
| config.write('[defaults]\n') |
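            # 'hostfile' is the pre-Ansible-2.0 name for the
            # inventory setting; Ansible 2 renamed it 'inventory'.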
| config.write('hostfile = %s\n' % jobdir.inventory) |
| config.write('host_key_checking = False\n') |
| config.write('private_key_file = %s\n' % self.private_key_file) |
| |
| callback_path = zuul.ansible.plugins.callback_plugins.__file__ |
| callback_path = os.path.abspath(callback_path) |
| callback_path = os.path.dirname(callback_path) |
| config.write('callback_plugins = %s\n' % callback_path) |
| |
| library_path = zuul.ansible.library.__file__ |
| library_path = os.path.abspath(library_path) |
| library_path = os.path.dirname(library_path) |
| config.write('library = %s\n' % library_path) |
| |
| return timeout |
| |
| def runAnsiblePlaybook(self, jobdir, timeout): |
| self.ansible_proc = subprocess.Popen( |
| ['ansible-playbook', jobdir.playbook, |
| '-e', 'timeout=%s' % timeout, '-v'], |
| cwd=jobdir.ansible_root, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| preexec_fn=os.setsid, |
| ) |
| (out, err) = self.ansible_proc.communicate() |
| self.log.debug("Ansible stdout:\n%s" % out) |
| self.log.debug("Ansible stderr:\n%s" % err) |
| ret = self.ansible_proc.wait() |
| self.ansible_proc = None |
| return ret == 0 |
| |
| def runAnsiblePostPlaybook(self, jobdir, success): |
| proc = subprocess.Popen( |
| ['ansible-playbook', jobdir.post_playbook, |
| '-e', 'success=%s' % success], |
| cwd=jobdir.ansible_root, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| preexec_fn=os.setsid, |
| ) |
| (out, err) = proc.communicate() |
| return proc.wait() == 0 |
| |
| |
| class JJB(jenkins_jobs.builder.Builder): |
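    # Minimal shim: Builder.__init__ normally expects Jenkins
    # connection details that aren't needed here; this subclass only
    # uses the YAML loading and expansion machinery.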
| def __init__(self): |
| self.global_config = None |
| self._plugins_list = [] |