Merge "Add support to list running jobs to zuul client"
diff --git a/requirements.txt b/requirements.txt
index bb48290..1c5587b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,3 +16,5 @@
 apscheduler>=2.1.1,<3.0
 python-swiftclient>=1.6
 python-keystoneclient>=0.4.2
+PrettyTable>=0.6,<0.8
+babel
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index a99a946..7cfea1c 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -3897,3 +3897,62 @@
         self.worker.hold_jobs_in_build = False
         self.worker.release()
         self.waitUntilSettled()
+
+    def test_client_get_running_jobs(self):
+        "Test that the RPC client can get a list of running jobs"
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+
+        # Wait for gearman server to send the initial workData back to zuul
+        start = time.time()
+        while True:
+            if time.time() - start > 10:
+                raise Exception("Timeout waiting for gearman server to report "
+                                + "back to the client")
+            build = self.launcher.builds.values()[0]
+            if build.worker.name == "My Worker":
+                break
+            else:
+                time.sleep(0)
+
+        running_items = client.get_running_jobs()
+
+        self.assertEqual(1, len(running_items))
+        running_item = running_items[0]
+        self.assertEqual([], running_item['failing_reasons'])
+        self.assertEqual([], running_item['items_behind'])
+        self.assertEqual('https://hostname/1', running_item['url'])
+        self.assertEqual(None, running_item['item_ahead'])
+        self.assertEqual('org/project', running_item['project'])
+        self.assertEqual(None, running_item['remaining_time'])
+        self.assertEqual(True, running_item['active'])
+        self.assertEqual('1,1', running_item['id'])
+
+        self.assertEqual(3, len(running_item['jobs']))
+        for job in running_item['jobs']:
+            if job['name'] == 'project-merge':
+                self.assertEqual('project-merge', job['name'])
+                self.assertEqual('gate', job['pipeline'])
+                self.assertEqual(False, job['retry'])
+                self.assertEqual(13, len(job['parameters']))
+                self.assertEqual('https://server/job/project-merge/0/',
+                                 job['url'])
+                self.assertEqual(7, len(job['worker']))
+                self.assertEqual(False, job['canceled'])
+                self.assertEqual(True, job['voting'])
+                self.assertEqual(None, job['result'])
+                self.assertEqual('gate', job['pipeline'])
+                break
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+
+        running_items = client.get_running_jobs()
+        self.assertEqual(0, len(running_items))
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index a334bff..147fade 100644
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -15,11 +15,15 @@
 # under the License.
 
 import argparse
+import babel.dates
 import ConfigParser
+import datetime
 import logging
 import logging.config
 import os
+import prettytable
 import sys
+import time
 
 import zuul.rpcclient
 
@@ -66,6 +70,23 @@
                                  required=True, nargs='+')
         cmd_promote.set_defaults(func=self.promote)
 
+        cmd_show = subparsers.add_parser('show',
+                                         help='valid show subcommands')
+        show_subparsers = cmd_show.add_subparsers(title='show')
+        show_running_jobs = show_subparsers.add_parser(
+            'running-jobs',
+            help='show the running jobs'
+        )
+        show_running_jobs.add_argument(
+            '--columns',
+            help="comma separated list of columns to display (or 'ALL')",
+            choices=self._show_running_jobs_columns().keys().append('ALL'),
+            default='name, worker.name, start_time, result'
+        )
+
+        # TODO: add filters such as queue, project, changeid etc
+        show_running_jobs.set_defaults(func=self.show_running_jobs)
+
         self.args = parser.parse_args()
 
     def _get_version(self):
@@ -119,6 +140,147 @@
                            change_ids=self.args.changes)
         return r
 
+    def show_running_jobs(self):
+        client = zuul.rpcclient.RPCClient(self.server, self.port)
+        running_items = client.get_running_jobs()
+
+        if len(running_items) == 0:
+            print "No jobs currently running"
+            return True
+
+        all_fields = self._show_running_jobs_columns()
+        if self.args.columns.upper() == 'ALL':
+            fields = all_fields.keys()
+        else:
+            fields = [f.strip().lower() for f in self.args.columns.split(',')
+                      if f.strip().lower() in all_fields.keys()]
+
+        table = prettytable.PrettyTable(
+            field_names=[all_fields[f]['title'] for f in fields])
+        for item in running_items:
+            for job in item['jobs']:
+                values = []
+                for f in fields:
+                    v = job
+                    for part in f.split('.'):
+                        if hasattr(v, 'get'):
+                            v = v.get(part, '')
+                    if ('transform' in all_fields[f]
+                        and callable(all_fields[f]['transform'])):
+                        v = all_fields[f]['transform'](v)
+                    if 'append' in all_fields[f]:
+                        v += all_fields[f]['append']
+                    values.append(v)
+                table.add_row(values)
+        print table
+        return True
+
+    def _epoch_to_relative_time(self, epoch):
+        if epoch:
+            delta = datetime.timedelta(seconds=(time.time() - int(epoch)))
+            return babel.dates.format_timedelta(delta, locale='en_US')
+        else:
+            return "Unknown"
+
+    def _boolean_to_yes_no(self, value):
+        return 'Yes' if value else 'No'
+
+    def _boolean_to_pass_fail(self, value):
+        return 'Pass' if value else 'Fail'
+
+    def _format_list(self, l):
+        return ', '.join(l) if isinstance(l, list) else ''
+
+    def _show_running_jobs_columns(self):
+        """A helper function to get the list of available columns for
+        `zuul show running-jobs`. Also describes how to convert particular
+        values (for example epoch to time string)"""
+
+        return {
+            'name': {
+                'title': 'Job Name',
+            },
+            'elapsed_time': {
+                'title': 'Elapsed Time',
+                'transform': self._epoch_to_relative_time
+            },
+            'remaining_time': {
+                'title': 'Remaining Time',
+                'transform': self._epoch_to_relative_time
+            },
+            'url': {
+                'title': 'URL'
+            },
+            'result': {
+                'title': 'Result'
+            },
+            'voting': {
+                'title': 'Voting',
+                'transform': self._boolean_to_yes_no
+            },
+            'uuid': {
+                'title': 'UUID'
+            },
+            'launch_time': {
+                'title': 'Launch Time',
+                'transform': self._epoch_to_relative_time,
+                'append': ' ago'
+            },
+            'start_time': {
+                'title': 'Start Time',
+                'transform': self._epoch_to_relative_time,
+                'append': ' ago'
+            },
+            'end_time': {
+                'title': 'End Time',
+                'transform': self._epoch_to_relative_time,
+                'append': ' ago'
+            },
+            'estimated_time': {
+                'title': 'Estimated Time',
+                'transform': self._epoch_to_relative_time,
+                'append': ' to go'
+            },
+            'pipeline': {
+                'title': 'Pipeline'
+            },
+            'canceled': {
+                'title': 'Canceled',
+                'transform': self._boolean_to_yes_no
+            },
+            'retry': {
+                'title': 'Retry'
+            },
+            'number': {
+                'title': 'Number'
+            },
+            'parameters': {
+                'title': 'Parameters'
+            },
+            'worker.name': {
+                'title': 'Worker'
+            },
+            'worker.hostname': {
+                'title': 'Worker Hostname'
+            },
+            'worker.ips': {
+                'title': 'Worker IPs',
+                'transform': self._format_list
+            },
+            'worker.fqdn': {
+                'title': 'Worker Domain'
+            },
+            'worker.progam': {
+                'title': 'Worker Program'
+            },
+            'worker.version': {
+                'title': 'Worker Version'
+            },
+            'worker.extra': {
+                'title': 'Worker Extra'
+            },
+        }
+
 
 def main():
     client = Client()
diff --git a/zuul/model.py b/zuul/model.py
index 7f496db..2f4110f 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -269,7 +269,7 @@
                     if j_changes:
                         j_queue['heads'].append(j_changes)
                     j_changes = []
-                j_changes.append(self.formatItemJSON(e))
+                j_changes.append(e.formatJSON())
                 if (len(j_changes) > 1 and
                     (j_changes[-2]['remaining_time'] is not None) and
                     (j_changes[-1]['remaining_time'] is not None)):
@@ -280,101 +280,6 @@
                 j_queue['heads'].append(j_changes)
         return j_pipeline
 
-    def formatStatus(self, item, indent=0, html=False):
-        changeish = item.change
-        indent_str = ' ' * indent
-        ret = ''
-        if html and hasattr(changeish, 'url') and changeish.url is not None:
-            ret += '%sProject %s change <a href="%s">%s</a>\n' % (
-                indent_str,
-                changeish.project.name,
-                changeish.url,
-                changeish._id())
-        else:
-            ret += '%sProject %s change %s based on %s\n' % (
-                indent_str,
-                changeish.project.name,
-                changeish._id(),
-                item.item_ahead)
-        for job in self.getJobs(changeish):
-            build = item.current_build_set.getBuild(job.name)
-            if build:
-                result = build.result
-            else:
-                result = None
-            job_name = job.name
-            if not job.voting:
-                voting = ' (non-voting)'
-            else:
-                voting = ''
-            if html:
-                if build:
-                    url = build.url
-                else:
-                    url = None
-                if url is not None:
-                    job_name = '<a href="%s">%s</a>' % (url, job_name)
-            ret += '%s  %s: %s%s' % (indent_str, job_name, result, voting)
-            ret += '\n'
-        return ret
-
-    def formatItemJSON(self, item):
-        changeish = item.change
-        ret = {}
-        ret['active'] = item.active
-        if hasattr(changeish, 'url') and changeish.url is not None:
-            ret['url'] = changeish.url
-        else:
-            ret['url'] = None
-        ret['id'] = changeish._id()
-        if item.item_ahead:
-            ret['item_ahead'] = item.item_ahead.change._id()
-        else:
-            ret['item_ahead'] = None
-        ret['items_behind'] = [i.change._id() for i in item.items_behind]
-        ret['failing_reasons'] = item.current_build_set.failing_reasons
-        ret['zuul_ref'] = item.current_build_set.ref
-        ret['project'] = changeish.project.name
-        ret['enqueue_time'] = int(item.enqueue_time * 1000)
-        ret['jobs'] = []
-        max_remaining = 0
-        for job in self.getJobs(changeish):
-            now = time.time()
-            build = item.current_build_set.getBuild(job.name)
-            elapsed = None
-            remaining = None
-            result = None
-            url = None
-            if build:
-                result = build.result
-                url = build.url
-                if build.start_time:
-                    if build.end_time:
-                        elapsed = int((build.end_time -
-                                       build.start_time) * 1000)
-                        remaining = 0
-                    else:
-                        elapsed = int((now - build.start_time) * 1000)
-                        if build.estimated_time:
-                            remaining = max(
-                                int(build.estimated_time * 1000) - elapsed,
-                                0)
-            if remaining and remaining > max_remaining:
-                max_remaining = remaining
-            ret['jobs'].append(
-                dict(
-                    name=job.name,
-                    elapsed_time=elapsed,
-                    remaining_time=remaining,
-                    url=url,
-                    result=result,
-                    voting=job.voting))
-        if self.haveAllJobsStarted(item):
-            ret['remaining_time'] = max_remaining
-        else:
-            ret['remaining_time'] = None
-        return ret
-
 
 class ActionReporter(object):
     """An ActionReporter has a reporter and its configured paramaters"""
@@ -760,6 +665,124 @@
     def setReportedResult(self, result):
         self.current_build_set.result = result
 
+    def formatJSON(self):
+        changeish = self.change
+        ret = {}
+        ret['active'] = self.active
+        if hasattr(changeish, 'url') and changeish.url is not None:
+            ret['url'] = changeish.url
+        else:
+            ret['url'] = None
+        ret['id'] = changeish._id()
+        if self.item_ahead:
+            ret['item_ahead'] = self.item_ahead.change._id()
+        else:
+            ret['item_ahead'] = None
+        ret['items_behind'] = [i.change._id() for i in self.items_behind]
+        ret['failing_reasons'] = self.current_build_set.failing_reasons
+        ret['zuul_ref'] = self.current_build_set.ref
+        ret['project'] = changeish.project.name
+        ret['enqueue_time'] = int(self.enqueue_time * 1000)
+        ret['jobs'] = []
+        max_remaining = 0
+        for job in self.pipeline.getJobs(changeish):
+            now = time.time()
+            build = self.current_build_set.getBuild(job.name)
+            elapsed = None
+            remaining = None
+            result = None
+            url = None
+            worker = None
+            if build:
+                result = build.result
+                url = build.url
+                if build.start_time:
+                    if build.end_time:
+                        elapsed = int((build.end_time -
+                                       build.start_time) * 1000)
+                        remaining = 0
+                    else:
+                        elapsed = int((now - build.start_time) * 1000)
+                        if build.estimated_time:
+                            remaining = max(
+                                int(build.estimated_time * 1000) - elapsed,
+                                0)
+                worker = {
+                    'name': build.worker.name,
+                    'hostname': build.worker.hostname,
+                    'ips': build.worker.ips,
+                    'fqdn': build.worker.fqdn,
+                    'program': build.worker.program,
+                    'version': build.worker.version,
+                    'extra': build.worker.extra
+                }
+            if remaining and remaining > max_remaining:
+                max_remaining = remaining
+
+            ret['jobs'].append({
+                'name': job.name,
+                'elapsed_time': elapsed,
+                'remaining_time': remaining,
+                'url': url,
+                'result': result,
+                'voting': job.voting,
+                'uuid': build.uuid if build else None,
+                'launch_time': build.launch_time if build else None,
+                'start_time': build.start_time if build else None,
+                'end_time': build.end_time if build else None,
+                'estimated_time': build.estimated_time if build else None,
+                'pipeline': build.pipeline.name if build else None,
+                'canceled': build.canceled if build else None,
+                'retry': build.retry if build else None,
+                'number': build.number if build else None,
+                'parameters': build.parameters if build else None,
+                'worker': worker
+            })
+
+        if self.pipeline.haveAllJobsStarted(self):
+            ret['remaining_time'] = max_remaining
+        else:
+            ret['remaining_time'] = None
+        return ret
+
+    def formatStatus(self, indent=0, html=False):
+        changeish = self.change
+        indent_str = ' ' * indent
+        ret = ''
+        if html and hasattr(changeish, 'url') and changeish.url is not None:
+            ret += '%sProject %s change <a href="%s">%s</a>\n' % (
+                indent_str,
+                changeish.project.name,
+                changeish.url,
+                changeish._id())
+        else:
+            ret += '%sProject %s change %s based on %s\n' % (
+                indent_str,
+                changeish.project.name,
+                changeish._id(),
+                self.item_ahead)
+        for job in self.pipeline.getJobs(changeish):
+            build = self.current_build_set.getBuild(job.name)
+            if build:
+                result = build.result
+            else:
+                result = None
+            job_name = job.name
+            if not job.voting:
+                voting = ' (non-voting)'
+            else:
+                voting = ''
+            if html:
+                if build:
+                    url = build.url
+                else:
+                    url = None
+                if url is not None:
+                    job_name = '<a href="%s">%s</a>' % (url, job_name)
+            ret += '%s  %s: %s%s' % (indent_str, job_name, result, voting)
+            ret += '\n'
+        return ret
+
 
 class Changeish(object):
     """Something like a change; either a change or a ref"""
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 69390c0..7f572be 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -46,7 +46,7 @@
         if job.exception:
             raise RPCFailure(job.exception)
         self.log.debug("Job complete, success: %s" % (not job.failure))
-        return (not job.failure)
+        return job
 
     def enqueue(self, pipeline, project, trigger, change):
         data = {'pipeline': pipeline,
@@ -54,13 +54,21 @@
                 'trigger': trigger,
                 'change': change,
                 }
-        return self.submitJob('zuul:enqueue', data)
+        return not self.submitJob('zuul:enqueue', data).failure
 
     def promote(self, pipeline, change_ids):
         data = {'pipeline': pipeline,
                 'change_ids': change_ids,
                 }
-        return self.submitJob('zuul:promote', data)
+        return not self.submitJob('zuul:promote', data).failure
+
+    def get_running_jobs(self):
+        data = {}
+        job = self.submitJob('zuul:get_running_jobs', data)
+        if job.failure:
+            return False
+        else:
+            return json.loads(job.data[0])
 
     def shutdown(self):
         self.gearman.shutdown()
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index c1b9216..a5a5b5d 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -48,6 +48,7 @@
     def register(self):
         self.worker.registerFunction("zuul:enqueue")
         self.worker.registerFunction("zuul:promote")
+        self.worker.registerFunction("zuul:get_running_jobs")
 
     def stop(self):
         self.log.debug("Stopping")
@@ -123,3 +124,14 @@
         change_ids = args['change_ids']
         self.sched.promote(pipeline_name, change_ids)
         job.sendWorkComplete()
+
+    def handle_get_running_jobs(self, job):
+        # args = json.loads(job.arguments)
+        # TODO: use args to filter by pipeline etc
+        running_items = []
+        for pipeline_name, pipeline in self.sched.layout.pipelines.iteritems():
+            for queue in pipeline.queues:
+                for item in queue.queue:
+                    running_items.append(item.formatJSON())
+
+        job.sendWorkComplete(json.dumps(running_items))
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 18f44db..f5d6629 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1305,7 +1305,7 @@
                 changed = True
                 status = ''
                 for item in queue.queue:
-                    status += self.pipeline.formatStatus(item)
+                    status += item.formatStatus()
                 if status:
                     self.log.debug("Queue %s status is now:\n %s" %
                                    (queue.name, status))
@@ -1334,7 +1334,7 @@
 
         self.pipeline.setResult(item, build)
         self.log.debug("Item %s status is now:\n %s" %
-                       (item, self.pipeline.formatStatus(item)))
+                       (item, item.formatStatus()))
         self.updateBuildDescriptions(build.build_set)
         return True