Ansible launcher: several fixes
These were developed together in situ:
* Fix tailing the console log
* Change console log name from console.log to console.txt
(for better auto content typing)
* Expand JJB macros for builders and publishers
* Use a two-stage SCP copy (worker -> controller; controller -> site);
a one-stage copy is possible but will require installing a key
on the remote site
* Substitute parameters (e.g. $LOG_SERVER) into scp/ftp site paths
* Better worker logging (use the worker name in the logger name)
Change-Id: I98e5603f7a3210c1322640a66ecdeadb24ce74fe
diff --git a/zuul/ansible/library/zuul_console.py b/zuul/ansible/library/zuul_console.py
index 0e3e066..e0d1d6e 100644
--- a/zuul/ansible/library/zuul_console.py
+++ b/zuul/ansible/library/zuul_console.py
@@ -109,12 +109,20 @@
def followConsole(self, console, conn):
while True:
- r = [console.file, conn]
- e = [console.file, conn]
- r, w, e = select.select(r, [], e)
+ # As long as we have unread data, keep reading/sending
+ while True:
+ chunk = console.file.read(4096)
+ if chunk:
+ conn.send(chunk)
+ else:
+ break
- if console.file in e:
- return True
+ # At this point, we are waiting for more data to be written
+ time.sleep(0.5)
+
+ # Check to see if the remote end has sent any data, if so,
+ # discard
+ r, w, e = select.select([conn], [], [conn], 0)
if conn in e:
return False
if conn in r:
@@ -124,19 +132,15 @@
if not ret:
return False
- if console.file in r:
- line = console.file.readline()
- if line:
- conn.send(line)
- time.sleep(0.5)
- try:
- st = os.stat(console.path)
- if (st.st_ino != console.stat.st_ino or
- st.st_size < console.size):
- return True
- except Exception:
+ # See if the file has been truncated
+ try:
+ st = os.stat(console.path)
+ if (st.st_ino != console.stat.st_ino or
+ st.st_size < console.size):
return True
- console.size = st.st_size
+ except Exception:
+ return True
+ console.size = st.st_size
def handleOneConnection(self, conn):
# FIXME: this won't notice disconnects until it tries to send
@@ -166,14 +170,14 @@
def test():
- s = Server('/tmp/console.log', 8088)
+ s = Server('/tmp/console.txt', 8088)
s.run()
def main():
module = AnsibleModule(
argument_spec=dict(
- path=dict(default='/tmp/console.log'),
+ path=dict(default='/tmp/console.txt'),
port=dict(default=8088, type='int'),
)
)
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index 7554244..955469f 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -21,7 +21,7 @@
class Console(object):
def __enter__(self):
- self.logfile = open('/tmp/console.log', 'w+')
+ self.logfile = open('/tmp/console.txt', 'w+', 0)
return self
def __exit__(self, etype, value, tb):
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 844b390..75b4911 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -29,6 +29,7 @@
import gear
import yaml
import jenkins_jobs.builder
+import jenkins_jobs.formatter
import zmq
import zuul.ansible.library
@@ -136,6 +137,7 @@
builder.parser.expandYaml()
unseen = set(self.jobs.keys())
for job in builder.parser.jobs:
+ builder.expandMacros(job)
self.jobs[job['name']] = job
unseen.discard(job['name'])
for name in unseen:
@@ -261,6 +263,10 @@
self.log.debug("Got termination event %s" % (item,))
if item is None:
continue
+ worker = self.node_workers[item]
+ self.log.debug("Joining %s" % (item,))
+ worker.process.join()
+ self.log.debug("Joined %s" % (item,))
del self.node_workers[item]
except Exception:
self.log.exception("Exception while processing "
@@ -270,11 +276,10 @@
class NodeWorker(object):
- log = logging.getLogger("zuul.NodeWorker")
-
def __init__(self, config, jobs, builds, sites, name, host,
description, labels, manager_name, zmq_send_queue,
termination_queue):
+ self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
self.log.debug("Creating node worker %s" % (name,))
self.config = config
self.jobs = jobs
@@ -325,12 +330,15 @@
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()
+ self.log.debug("Registering")
self.register()
self.gearman_thread = threading.Thread(target=self.runGearman)
self.gearman_thread.daemon = True
self.gearman_thread.start()
+ self.log.debug("Started")
+
while self._running or not self.queue.empty():
try:
self._runQueue()
@@ -559,7 +567,12 @@
return [('node', dict(
ansible_host=self.host, ansible_user=self.username))]
- def _makeSCPTask(self, publisher):
+ def _substituteVariables(self, text, variables):
+ def lookup(match):
+ return variables.get(match.group(1), '')
+ return re.sub(r'\$([A-Za-z0-9_]+)', lookup, text)
+
+ def _makeSCPTask(self, jobdir, publisher, parameters):
tasks = []
for scpfile in publisher['scp']['files']:
site = publisher['scp']['site']
@@ -567,35 +580,64 @@
raise Exception("Undefined SCP site: %s" % (site,))
site = self.sites[site]
if scpfile.get('copy-console'):
- src = '/tmp/console.log'
+ src = '/tmp/console.txt'
else:
src = scpfile['source']
- dest = os.path.join(site['root'], scpfile['target'])
+ src = self._substituteVariables(src, parameters)
+ src = os.path.join(parameters['WORKSPACE'], src)
+ scproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
+ os.chmod(scproot, 0o755)
+ syncargs = dict(src=src,
+ dest=scproot,
+ mode='pull')
+ task = dict(synchronize=syncargs)
+ if not scpfile.get('copy-after-failure'):
+ task['when'] = 'success'
+ tasks.append(task)
+
+ dest = scpfile['target']
+ dest = self._substituteVariables(dest, parameters)
+ dest = os.path.join(site['root'], dest)
dest = os.path.normpath(dest)
if not dest.startswith(site['root']):
raise Exception("Target path %s is not below site root" %
(dest,))
- syncargs = dict(src=src,
- dest=dest)
- task = dict(synchronize=syncargs,
- delegate_to=site['host'])
+ local_args = [
+ 'command', '/usr/bin/rsync', '--delay-updates', '-F',
+ '--compress', '-rt', '--safe-links', '--rsh',
+ '"/usr/bin/ssh -i {private_key_file} -S none '
+ '-o StrictHostKeyChecking=no"',
+ '--out-format="<<CHANGED>>%i %n%L"',
+ '"{source}/"', '"{user}@{host}:{dest}"'
+ ]
+ local_action = ' '.join(local_args).format(
+ source=scproot,
+ dest=dest,
+ private_key_file=self.private_key_file,
+ host=site['host'],
+ user=site['user'])
+ task = dict(local_action=local_action)
if not scpfile.get('copy-after-failure'):
task['when'] = 'success'
tasks.append(task)
return tasks
- def _makeFTPTask(self, jobdir, publisher):
+ def _makeFTPTask(self, jobdir, publisher, parameters):
tasks = []
ftp = publisher['ftp']
site = ftp['site']
if site not in self.sites:
raise Exception("Undefined FTP site: %s" % site)
site = self.sites[site]
+
ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
ftpcontent = os.path.join(ftproot, 'content')
os.makedirs(ftpcontent)
ftpscript = os.path.join(ftproot, 'script')
- syncargs = dict(src=ftp['source'],
+ src = ftp['source']
+ src = self._substituteVariables(src, parameters)
+ src = os.path.join(parameters['WORKSPACE'], src)
+ syncargs = dict(src=src,
dest=ftpcontent)
task = dict(synchronize=syncargs,
when='success')
@@ -608,6 +650,7 @@
while ftpsource[-1] == '/':
ftpsource = ftpsource[:-1]
ftptarget = ftp['target']
+ ftptarget = self._substituteVariables(ftptarget, parameters)
ftptarget = os.path.join(site['root'], ftptarget)
ftptarget = os.path.normpath(ftptarget)
if not ftptarget.startswith(site['root']):
@@ -627,7 +670,10 @@
script_fn = '%s.sh' % str(uuid.uuid4().hex)
script_path = os.path.join(jobdir.script_root, script_fn)
with open(script_path, 'w') as script:
- script.write(builder['shell'])
+ data = builder['shell']
+ if not data.startswith('#!'):
+ data = '#!/bin/bash -x\n %s' % (data,)
+ script.write(data)
remote_path = os.path.join('/tmp', script_fn)
copy = dict(src=script_path,
@@ -681,10 +727,10 @@
with open(jobdir.playbook, 'w') as playbook:
tasks = []
- task = dict(file=dict(path='/tmp/console.log', state='absent'))
+ task = dict(file=dict(path='/tmp/console.txt', state='absent'))
tasks.append(task)
- task = dict(zuul_console=dict(path='/tmp/console.log', port=8088))
+ task = dict(zuul_console=dict(path='/tmp/console.txt', port=8088))
tasks.append(task)
task = dict(file=dict(path=parameters['WORKSPACE'],
@@ -703,9 +749,11 @@
tasks = []
for publisher in jjb_job.get('publishers', []):
if 'scp' in publisher:
- tasks.extend(self._makeSCPTask(publisher))
+ tasks.extend(self._makeSCPTask(jobdir, publisher,
+ parameters))
if 'ftp' in publisher:
- tasks.extend(self._makeFTPTask(jobdir, publisher))
+ tasks.extend(self._makeFTPTask(jobdir, publisher,
+ parameters))
play = dict(hosts='node', name='Publishers',
tasks=tasks)
playbook.write(yaml.dump([play]))
@@ -747,13 +795,15 @@
def runAnsiblePostPlaybook(self, jobdir, success):
proc = subprocess.Popen(
['ansible-playbook', jobdir.post_playbook,
- '-e', 'success=%s' % success],
+ '-e', 'success=%s' % success, '-v'],
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
)
(out, err) = proc.communicate()
+ self.log.debug("Ansible post stdout:\n%s" % out)
+ self.log.debug("Ansible post stderr:\n%s" % err)
return proc.wait() == 0
@@ -761,3 +811,34 @@
def __init__(self):
self.global_config = None
self._plugins_list = []
+
+ def expandComponent(self, component_type, component, template_data):
+ component_list_type = component_type + 's'
+ new_components = []
+ if isinstance(component, dict):
+ name, component_data = next(iter(component.items()))
+ if template_data:
+ component_data = jenkins_jobs.formatter.deep_format(
+ component_data, template_data, True)
+ else:
+ name = component
+ component_data = {}
+
+ new_component = self.parser.data[component_type].get(name)
+ if new_component:
+ for new_sub_component in new_component[component_list_type]:
+ new_components.extend(
+ self.expandComponent(component_type,
+ new_sub_component, component_data))
+ else:
+ new_components.append({name: component_data})
+ return new_components
+
+ def expandMacros(self, job):
+ for component_type in ['builder', 'publisher']:
+ component_list_type = component_type + 's'
+ new_components = []
+ for new_component in job.get(component_list_type, []):
+ new_components.extend(self.expandComponent(component_type,
+ new_component, {}))
+ job[component_list_type] = new_components