Ansible launcher: several fixes

These fixes were developed together in situ:

* Fix tailing the console log
* Change the console log name from console.log to console.txt
  (for better automatic content-type detection)
* Expand JJB macros for builders and publishers
* Use a two-stage SCP copy (worker -> controller, then controller -> site);
  a one-stage copy is possible but would require installing a key
  on the remote site
* Substitute parameters (e.g. $LOG_SERVER) into scp/ftp site paths;
  see the sketch after this list
* Better worker logging (use the worker name in the logger name)
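
For reference, the parameter substitution applied to scp/ftp paths is a
plain "$NAME" expansion against the job's parameter dict, with unknown
names replaced by an empty string. A standalone sketch (the function
name and example values here are illustrative only):

    import re

    def substitute_variables(text, variables):
        # Replace each $NAME token with its value from the parameter
        # dict; unknown names become the empty string.
        def lookup(match):
            return variables.get(match.group(1), '')
        return re.sub(r'\$([A-Za-z0-9_]+)', lookup, text)

    substitute_variables('$LOG_SERVER/logs/$LOG_PATH/',
                         {'LOG_SERVER': 'logs.example.org:/srv/static',
                          'LOG_PATH': 'check/12345/1'})
    # -> 'logs.example.org:/srv/static/logs/check/12345/1/'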

Change-Id: I98e5603f7a3210c1322640a66ecdeadb24ce74fe
diff --git a/zuul/ansible/library/zuul_console.py b/zuul/ansible/library/zuul_console.py
index 0e3e066..e0d1d6e 100644
--- a/zuul/ansible/library/zuul_console.py
+++ b/zuul/ansible/library/zuul_console.py
@@ -109,12 +109,20 @@
 
     def followConsole(self, console, conn):
         while True:
-            r = [console.file, conn]
-            e = [console.file, conn]
-            r, w, e = select.select(r, [], e)
+            # As long as we have unread data, keep reading/sending
+            while True:
+                chunk = console.file.read(4096)
+                if chunk:
+                    conn.send(chunk)
+                else:
+                    break
 
-            if console.file in e:
-                return True
+            # At this point, we are waiting for more data to be written
+            time.sleep(0.5)
+
+            # Check whether the remote end has sent any data; if so,
+            # discard it
+            r, w, e = select.select([conn], [], [conn], 0)
             if conn in e:
                 return False
             if conn in r:
@@ -124,19 +132,15 @@
                 if not ret:
                     return False
 
-            if console.file in r:
-                line = console.file.readline()
-                if line:
-                    conn.send(line)
-                time.sleep(0.5)
-                try:
-                    st = os.stat(console.path)
-                    if (st.st_ino != console.stat.st_ino or
-                        st.st_size < console.size):
-                        return True
-                except Exception:
+            # See if the file has been truncated
+            try:
+                st = os.stat(console.path)
+                if (st.st_ino != console.stat.st_ino or
+                    st.st_size < console.size):
                     return True
-                console.size = st.st_size
+            except Exception:
+                return True
+            console.size = st.st_size
 
     def handleOneConnection(self, conn):
         # FIXME: this won't notice disconnects until it tries to send
@@ -166,14 +170,14 @@
 
 
 def test():
-    s = Server('/tmp/console.log', 8088)
+    s = Server('/tmp/console.txt', 8088)
     s.run()
 
 
 def main():
     module = AnsibleModule(
         argument_spec=dict(
-            path=dict(default='/tmp/console.log'),
+            path=dict(default='/tmp/console.txt'),
             port=dict(default=8088, type='int'),
         )
     )
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index 7554244..955469f 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -21,7 +21,7 @@
 
 class Console(object):
     def __enter__(self):
-        self.logfile = open('/tmp/console.log', 'w+')
+        self.logfile = open('/tmp/console.txt', 'w+', 0)
         return self
 
     def __exit__(self, etype, value, tb):
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 844b390..75b4911 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -29,6 +29,7 @@
 import gear
 import yaml
 import jenkins_jobs.builder
+import jenkins_jobs.formatter
 import zmq
 
 import zuul.ansible.library
@@ -136,6 +137,7 @@
         builder.parser.expandYaml()
         unseen = set(self.jobs.keys())
         for job in builder.parser.jobs:
+            builder.expandMacros(job)
             self.jobs[job['name']] = job
             unseen.discard(job['name'])
         for name in unseen:
@@ -261,6 +263,10 @@
                 self.log.debug("Got termination event %s" % (item,))
                 if item is None:
                     continue
+                worker = self.node_workers[item]
+                self.log.debug("Joining %s" % (item,))
+                worker.process.join()
+                self.log.debug("Joined %s" % (item,))
                 del self.node_workers[item]
             except Exception:
                 self.log.exception("Exception while processing "
@@ -270,11 +276,10 @@
 
 
 class NodeWorker(object):
-    log = logging.getLogger("zuul.NodeWorker")
-
     def __init__(self, config, jobs, builds, sites, name, host,
                  description, labels, manager_name, zmq_send_queue,
                  termination_queue):
+        self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
         self.log.debug("Creating node worker %s" % (name,))
         self.config = config
         self.jobs = jobs
@@ -325,12 +330,15 @@
         self.worker.addServer(server, port)
         self.log.debug("Waiting for server")
         self.worker.waitForServer()
+        self.log.debug("Registering")
         self.register()
 
         self.gearman_thread = threading.Thread(target=self.runGearman)
         self.gearman_thread.daemon = True
         self.gearman_thread.start()
 
+        self.log.debug("Started")
+
         while self._running or not self.queue.empty():
             try:
                 self._runQueue()
@@ -559,7 +567,12 @@
         return [('node', dict(
             ansible_host=self.host, ansible_user=self.username))]
 
-    def _makeSCPTask(self, publisher):
+    def _substituteVariables(self, text, variables):
+        def lookup(match):
+            return variables.get(match.group(1), '')
+        return re.sub(r'\$([A-Za-z0-9_]+)', lookup, text)
+
+    def _makeSCPTask(self, jobdir, publisher, parameters):
         tasks = []
         for scpfile in publisher['scp']['files']:
             site = publisher['scp']['site']
@@ -567,35 +580,64 @@
                 raise Exception("Undefined SCP site: %s" % (site,))
             site = self.sites[site]
             if scpfile.get('copy-console'):
-                src = '/tmp/console.log'
+                src = '/tmp/console.txt'
             else:
                 src = scpfile['source']
-            dest = os.path.join(site['root'], scpfile['target'])
+                src = self._substituteVariables(src, parameters)
+                src = os.path.join(parameters['WORKSPACE'], src)
+            scproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
+            os.chmod(scproot, 0o755)
+            syncargs = dict(src=src,
+                            dest=scproot,
+                            mode='pull')
+            task = dict(synchronize=syncargs)
+            if not scpfile.get('copy-after-failure'):
+                task['when'] = 'success'
+            tasks.append(task)
+
+            dest = scpfile['target']
+            dest = self._substituteVariables(dest, parameters)
+            dest = os.path.join(site['root'], dest)
             dest = os.path.normpath(dest)
             if not dest.startswith(site['root']):
                 raise Exception("Target path %s is not below site root" %
                                 (dest,))
-            syncargs = dict(src=src,
-                            dest=dest)
-            task = dict(synchronize=syncargs,
-                        delegate_to=site['host'])
+            local_args = [
+                'command', '/usr/bin/rsync', '--delay-updates', '-F',
+                '--compress', '-rt', '--safe-links', '--rsh',
+                '"/usr/bin/ssh -i {private_key_file} -S none '
+                '-o StrictHostKeyChecking=no"',
+                '--out-format="<<CHANGED>>%i %n%L"',
+                '"{source}/"', '"{user}@{host}:{dest}"'
+            ]
+            local_action = ' '.join(local_args).format(
+                source=scproot,
+                dest=dest,
+                private_key_file=self.private_key_file,
+                host=site['host'],
+                user=site['user'])
+            task = dict(local_action=local_action)
             if not scpfile.get('copy-after-failure'):
                 task['when'] = 'success'
             tasks.append(task)
         return tasks
 
-    def _makeFTPTask(self, jobdir, publisher):
+    def _makeFTPTask(self, jobdir, publisher, parameters):
         tasks = []
         ftp = publisher['ftp']
         site = ftp['site']
         if site not in self.sites:
             raise Exception("Undefined FTP site: %s" % site)
         site = self.sites[site]
+
         ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
         ftpcontent = os.path.join(ftproot, 'content')
         os.makedirs(ftpcontent)
         ftpscript = os.path.join(ftproot, 'script')
-        syncargs = dict(src=ftp['source'],
+        src = ftp['source']
+        src = self._substituteVariables(src, parameters)
+        src = os.path.join(parameters['WORKSPACE'], src)
+        syncargs = dict(src=src,
                         dest=ftpcontent)
         task = dict(synchronize=syncargs,
                     when='success')
@@ -608,6 +650,7 @@
         while ftpsource[-1] == '/':
             ftpsource = ftpsource[:-1]
         ftptarget = ftp['target']
-        ftptarget = os.path.join(site['root'], ftp['target'])
+        ftptarget = self._substituteVariables(ftptarget, parameters)
+        ftptarget = os.path.join(site['root'], ftptarget)
         ftptarget = os.path.normpath(ftptarget)
         if not ftptarget.startswith(site['root']):
@@ -627,7 +670,10 @@
         script_fn = '%s.sh' % str(uuid.uuid4().hex)
         script_path = os.path.join(jobdir.script_root, script_fn)
         with open(script_path, 'w') as script:
-            script.write(builder['shell'])
+            data = builder['shell']
+            if not data.startswith('#!'):
+                data = '#!/bin/bash -x\n %s' % (data,)
+            script.write(data)
 
         remote_path = os.path.join('/tmp', script_fn)
         copy = dict(src=script_path,
@@ -681,10 +727,10 @@
         with open(jobdir.playbook, 'w') as playbook:
             tasks = []
 
-            task = dict(file=dict(path='/tmp/console.log', state='absent'))
+            task = dict(file=dict(path='/tmp/console.txt', state='absent'))
             tasks.append(task)
 
-            task = dict(zuul_console=dict(path='/tmp/console.log', port=8088))
+            task = dict(zuul_console=dict(path='/tmp/console.txt', port=8088))
             tasks.append(task)
 
             task = dict(file=dict(path=parameters['WORKSPACE'],
@@ -703,9 +749,11 @@
             tasks = []
             for publisher in jjb_job.get('publishers', []):
                 if 'scp' in publisher:
-                    tasks.extend(self._makeSCPTask(publisher))
+                    tasks.extend(self._makeSCPTask(jobdir, publisher,
+                                                   parameters))
                 if 'ftp' in publisher:
-                    tasks.extend(self._makeFTPTask(jobdir, publisher))
+                    tasks.extend(self._makeFTPTask(jobdir, publisher,
+                                                   parameters))
             play = dict(hosts='node', name='Publishers',
                         tasks=tasks)
             playbook.write(yaml.dump([play]))
@@ -747,13 +795,15 @@
     def runAnsiblePostPlaybook(self, jobdir, success):
         proc = subprocess.Popen(
             ['ansible-playbook', jobdir.post_playbook,
-             '-e', 'success=%s' % success],
+             '-e', 'success=%s' % success, '-v'],
             cwd=jobdir.ansible_root,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
             preexec_fn=os.setsid,
         )
         (out, err) = proc.communicate()
+        self.log.debug("Ansible post stdout:\n%s" % out)
+        self.log.debug("Ansible post stderr:\n%s" % err)
         return proc.wait() == 0
 
 
@@ -761,3 +811,34 @@
     def __init__(self):
         self.global_config = None
         self._plugins_list = []
+
+    def expandComponent(self, component_type, component, template_data):
+        component_list_type = component_type + 's'
+        new_components = []
+        if isinstance(component, dict):
+            name, component_data = next(iter(component.items()))
+            if template_data:
+                component_data = jenkins_jobs.formatter.deep_format(
+                    component_data, template_data, True)
+        else:
+            name = component
+            component_data = {}
+
+        new_component = self.parser.data[component_type].get(name)
+        if new_component:
+            for new_sub_component in new_component[component_list_type]:
+                new_components.extend(
+                    self.expandComponent(component_type,
+                                         new_sub_component, component_data))
+        else:
+            new_components.append({name: component_data})
+        return new_components
+
+    def expandMacros(self, job):
+        for component_type in ['builder', 'publisher']:
+            component_list_type = component_type + 's'
+            new_components = []
+            for new_component in job.get(component_list_type, []):
+                new_components.extend(self.expandComponent(component_type,
+                                                           new_component, {}))
+            job[component_list_type] = new_components