Add support to list running jobs to zuul client
Change-Id: I16ccc02aa1a3b0cd8648b6ea05fc20c89c92a571
diff --git a/requirements.txt b/requirements.txt
index bb48290..1c5587b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,3 +16,5 @@
apscheduler>=2.1.1,<3.0
python-swiftclient>=1.6
python-keystoneclient>=0.4.2
+PrettyTable>=0.6,<0.8
+babel
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 7e1416f..d191357 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -3870,3 +3870,62 @@
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
+
+ def test_client_get_running_jobs(self):
+ "Test that the RPC client can get a list of running jobs"
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ client = zuul.rpcclient.RPCClient('127.0.0.1',
+ self.gearman_server.port)
+
+ # Wait for gearman server to send the initial workData back to zuul
+ start = time.time()
+ while True:
+ if time.time() - start > 10:
+ raise Exception("Timeout waiting for gearman server to report "
+ + "back to the client")
+ build = self.launcher.builds.values()[0]
+ if build.worker.name == "My Worker":
+ break
+ else:
+ time.sleep(0)
+
+ running_items = client.get_running_jobs()
+
+ self.assertEqual(1, len(running_items))
+ running_item = running_items[0]
+ self.assertEqual([], running_item['failing_reasons'])
+ self.assertEqual([], running_item['items_behind'])
+ self.assertEqual('https://hostname/1', running_item['url'])
+ self.assertEqual(None, running_item['item_ahead'])
+ self.assertEqual('org/project', running_item['project'])
+ self.assertEqual(None, running_item['remaining_time'])
+ self.assertEqual(True, running_item['active'])
+ self.assertEqual('1,1', running_item['id'])
+
+ self.assertEqual(3, len(running_item['jobs']))
+ for job in running_item['jobs']:
+ if job['name'] == 'project-merge':
+ self.assertEqual('project-merge', job['name'])
+ self.assertEqual('gate', job['pipeline'])
+ self.assertEqual(False, job['retry'])
+ self.assertEqual(13, len(job['parameters']))
+ self.assertEqual('https://server/job/project-merge/0/',
+ job['url'])
+ self.assertEqual(7, len(job['worker']))
+ self.assertEqual(False, job['canceled'])
+ self.assertEqual(True, job['voting'])
+ self.assertEqual(None, job['result'])
+ self.assertEqual('gate', job['pipeline'])
+ break
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ running_items = client.get_running_jobs()
+ self.assertEqual(0, len(running_items))
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index a334bff..147fade 100644
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -15,11 +15,15 @@
# under the License.
import argparse
+import babel.dates
import ConfigParser
+import datetime
import logging
import logging.config
import os
+import prettytable
import sys
+import time
import zuul.rpcclient
@@ -66,6 +70,23 @@
required=True, nargs='+')
cmd_promote.set_defaults(func=self.promote)
+ cmd_show = subparsers.add_parser('show',
+ help='valid show subcommands')
+ show_subparsers = cmd_show.add_subparsers(title='show')
+ show_running_jobs = show_subparsers.add_parser(
+ 'running-jobs',
+ help='show the running jobs'
+ )
+ show_running_jobs.add_argument(
+ '--columns',
+ help="comma separated list of columns to display (or 'ALL')",
+ choices=self._show_running_jobs_columns().keys().append('ALL'),
+ default='name, worker.name, start_time, result'
+ )
+
+ # TODO: add filters such as queue, project, changeid etc
+ show_running_jobs.set_defaults(func=self.show_running_jobs)
+
self.args = parser.parse_args()
def _get_version(self):
@@ -119,6 +140,147 @@
change_ids=self.args.changes)
return r
+ def show_running_jobs(self):
+ client = zuul.rpcclient.RPCClient(self.server, self.port)
+ running_items = client.get_running_jobs()
+
+ if len(running_items) == 0:
+ print "No jobs currently running"
+ return True
+
+ all_fields = self._show_running_jobs_columns()
+ if self.args.columns.upper() == 'ALL':
+ fields = all_fields.keys()
+ else:
+ fields = [f.strip().lower() for f in self.args.columns.split(',')
+ if f.strip().lower() in all_fields.keys()]
+
+ table = prettytable.PrettyTable(
+ field_names=[all_fields[f]['title'] for f in fields])
+ for item in running_items:
+ for job in item['jobs']:
+ values = []
+ for f in fields:
+ v = job
+ for part in f.split('.'):
+ if hasattr(v, 'get'):
+ v = v.get(part, '')
+ if ('transform' in all_fields[f]
+ and callable(all_fields[f]['transform'])):
+ v = all_fields[f]['transform'](v)
+ if 'append' in all_fields[f]:
+ v += all_fields[f]['append']
+ values.append(v)
+ table.add_row(values)
+ print table
+ return True
+
+ def _epoch_to_relative_time(self, epoch):
+ if epoch:
+ delta = datetime.timedelta(seconds=(time.time() - int(epoch)))
+ return babel.dates.format_timedelta(delta, locale='en_US')
+ else:
+ return "Unknown"
+
+ def _boolean_to_yes_no(self, value):
+ return 'Yes' if value else 'No'
+
+ def _boolean_to_pass_fail(self, value):
+ return 'Pass' if value else 'Fail'
+
+ def _format_list(self, l):
+ return ', '.join(l) if isinstance(l, list) else ''
+
+ def _show_running_jobs_columns(self):
+ """A helper function to get the list of available columns for
+ `zuul show running-jobs`. Also describes how to convert particular
+ values (for example epoch to time string)"""
+
+ return {
+ 'name': {
+ 'title': 'Job Name',
+ },
+ 'elapsed_time': {
+ 'title': 'Elapsed Time',
+ 'transform': self._epoch_to_relative_time
+ },
+ 'remaining_time': {
+ 'title': 'Remaining Time',
+ 'transform': self._epoch_to_relative_time
+ },
+ 'url': {
+ 'title': 'URL'
+ },
+ 'result': {
+ 'title': 'Result'
+ },
+ 'voting': {
+ 'title': 'Voting',
+ 'transform': self._boolean_to_yes_no
+ },
+ 'uuid': {
+ 'title': 'UUID'
+ },
+ 'launch_time': {
+ 'title': 'Launch Time',
+ 'transform': self._epoch_to_relative_time,
+ 'append': ' ago'
+ },
+ 'start_time': {
+ 'title': 'Start Time',
+ 'transform': self._epoch_to_relative_time,
+ 'append': ' ago'
+ },
+ 'end_time': {
+ 'title': 'End Time',
+ 'transform': self._epoch_to_relative_time,
+ 'append': ' ago'
+ },
+ 'estimated_time': {
+ 'title': 'Estimated Time',
+ 'transform': self._epoch_to_relative_time,
+ 'append': ' to go'
+ },
+ 'pipeline': {
+ 'title': 'Pipeline'
+ },
+ 'canceled': {
+ 'title': 'Canceled',
+ 'transform': self._boolean_to_yes_no
+ },
+ 'retry': {
+ 'title': 'Retry'
+ },
+ 'number': {
+ 'title': 'Number'
+ },
+ 'parameters': {
+ 'title': 'Parameters'
+ },
+ 'worker.name': {
+ 'title': 'Worker'
+ },
+ 'worker.hostname': {
+ 'title': 'Worker Hostname'
+ },
+ 'worker.ips': {
+ 'title': 'Worker IPs',
+ 'transform': self._format_list
+ },
+ 'worker.fqdn': {
+ 'title': 'Worker Domain'
+ },
+ 'worker.progam': {
+ 'title': 'Worker Program'
+ },
+ 'worker.version': {
+ 'title': 'Worker Version'
+ },
+ 'worker.extra': {
+ 'title': 'Worker Extra'
+ },
+ }
+
def main():
client = Client()
diff --git a/zuul/model.py b/zuul/model.py
index 9028577..82ce9d0 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -269,7 +269,7 @@
if j_changes:
j_queue['heads'].append(j_changes)
j_changes = []
- j_changes.append(self.formatItemJSON(e))
+ j_changes.append(e.formatJSON())
if (len(j_changes) > 1 and
(j_changes[-2]['remaining_time'] is not None) and
(j_changes[-1]['remaining_time'] is not None)):
@@ -280,101 +280,6 @@
j_queue['heads'].append(j_changes)
return j_pipeline
- def formatStatus(self, item, indent=0, html=False):
- changeish = item.change
- indent_str = ' ' * indent
- ret = ''
- if html and hasattr(changeish, 'url') and changeish.url is not None:
- ret += '%sProject %s change <a href="%s">%s</a>\n' % (
- indent_str,
- changeish.project.name,
- changeish.url,
- changeish._id())
- else:
- ret += '%sProject %s change %s based on %s\n' % (
- indent_str,
- changeish.project.name,
- changeish._id(),
- item.item_ahead)
- for job in self.getJobs(changeish):
- build = item.current_build_set.getBuild(job.name)
- if build:
- result = build.result
- else:
- result = None
- job_name = job.name
- if not job.voting:
- voting = ' (non-voting)'
- else:
- voting = ''
- if html:
- if build:
- url = build.url
- else:
- url = None
- if url is not None:
- job_name = '<a href="%s">%s</a>' % (url, job_name)
- ret += '%s %s: %s%s' % (indent_str, job_name, result, voting)
- ret += '\n'
- return ret
-
- def formatItemJSON(self, item):
- changeish = item.change
- ret = {}
- ret['active'] = item.active
- if hasattr(changeish, 'url') and changeish.url is not None:
- ret['url'] = changeish.url
- else:
- ret['url'] = None
- ret['id'] = changeish._id()
- if item.item_ahead:
- ret['item_ahead'] = item.item_ahead.change._id()
- else:
- ret['item_ahead'] = None
- ret['items_behind'] = [i.change._id() for i in item.items_behind]
- ret['failing_reasons'] = item.current_build_set.failing_reasons
- ret['zuul_ref'] = item.current_build_set.ref
- ret['project'] = changeish.project.name
- ret['enqueue_time'] = int(item.enqueue_time * 1000)
- ret['jobs'] = []
- max_remaining = 0
- for job in self.getJobs(changeish):
- now = time.time()
- build = item.current_build_set.getBuild(job.name)
- elapsed = None
- remaining = None
- result = None
- url = None
- if build:
- result = build.result
- url = build.url
- if build.start_time:
- if build.end_time:
- elapsed = int((build.end_time -
- build.start_time) * 1000)
- remaining = 0
- else:
- elapsed = int((now - build.start_time) * 1000)
- if build.estimated_time:
- remaining = max(
- int(build.estimated_time * 1000) - elapsed,
- 0)
- if remaining and remaining > max_remaining:
- max_remaining = remaining
- ret['jobs'].append(
- dict(
- name=job.name,
- elapsed_time=elapsed,
- remaining_time=remaining,
- url=url,
- result=result,
- voting=job.voting))
- if self.haveAllJobsStarted(item):
- ret['remaining_time'] = max_remaining
- else:
- ret['remaining_time'] = None
- return ret
-
class ActionReporter(object):
"""An ActionReporter has a reporter and its configured paramaters"""
@@ -760,6 +665,124 @@
def setReportedResult(self, result):
self.current_build_set.result = result
+ def formatJSON(self):
+ changeish = self.change
+ ret = {}
+ ret['active'] = self.active
+ if hasattr(changeish, 'url') and changeish.url is not None:
+ ret['url'] = changeish.url
+ else:
+ ret['url'] = None
+ ret['id'] = changeish._id()
+ if self.item_ahead:
+ ret['item_ahead'] = self.item_ahead.change._id()
+ else:
+ ret['item_ahead'] = None
+ ret['items_behind'] = [i.change._id() for i in self.items_behind]
+ ret['failing_reasons'] = self.current_build_set.failing_reasons
+ ret['zuul_ref'] = self.current_build_set.ref
+ ret['project'] = changeish.project.name
+ ret['enqueue_time'] = int(self.enqueue_time * 1000)
+ ret['jobs'] = []
+ max_remaining = 0
+ for job in self.pipeline.getJobs(changeish):
+ now = time.time()
+ build = self.current_build_set.getBuild(job.name)
+ elapsed = None
+ remaining = None
+ result = None
+ url = None
+ worker = None
+ if build:
+ result = build.result
+ url = build.url
+ if build.start_time:
+ if build.end_time:
+ elapsed = int((build.end_time -
+ build.start_time) * 1000)
+ remaining = 0
+ else:
+ elapsed = int((now - build.start_time) * 1000)
+ if build.estimated_time:
+ remaining = max(
+ int(build.estimated_time * 1000) - elapsed,
+ 0)
+ worker = {
+ 'name': build.worker.name,
+ 'hostname': build.worker.hostname,
+ 'ips': build.worker.ips,
+ 'fqdn': build.worker.fqdn,
+ 'program': build.worker.program,
+ 'version': build.worker.version,
+ 'extra': build.worker.extra
+ }
+ if remaining and remaining > max_remaining:
+ max_remaining = remaining
+
+ ret['jobs'].append({
+ 'name': job.name,
+ 'elapsed_time': elapsed,
+ 'remaining_time': remaining,
+ 'url': url,
+ 'result': result,
+ 'voting': job.voting,
+ 'uuid': build.uuid if build else None,
+ 'launch_time': build.launch_time if build else None,
+ 'start_time': build.start_time if build else None,
+ 'end_time': build.end_time if build else None,
+ 'estimated_time': build.estimated_time if build else None,
+ 'pipeline': build.pipeline.name if build else None,
+ 'canceled': build.canceled if build else None,
+ 'retry': build.retry if build else None,
+ 'number': build.number if build else None,
+ 'parameters': build.parameters if build else None,
+ 'worker': worker
+ })
+
+ if self.pipeline.haveAllJobsStarted(self):
+ ret['remaining_time'] = max_remaining
+ else:
+ ret['remaining_time'] = None
+ return ret
+
+ def formatStatus(self, indent=0, html=False):
+ changeish = self.change
+ indent_str = ' ' * indent
+ ret = ''
+ if html and hasattr(changeish, 'url') and changeish.url is not None:
+ ret += '%sProject %s change <a href="%s">%s</a>\n' % (
+ indent_str,
+ changeish.project.name,
+ changeish.url,
+ changeish._id())
+ else:
+ ret += '%sProject %s change %s based on %s\n' % (
+ indent_str,
+ changeish.project.name,
+ changeish._id(),
+ self.item_ahead)
+ for job in self.pipeline.getJobs(changeish):
+ build = self.current_build_set.getBuild(job.name)
+ if build:
+ result = build.result
+ else:
+ result = None
+ job_name = job.name
+ if not job.voting:
+ voting = ' (non-voting)'
+ else:
+ voting = ''
+ if html:
+ if build:
+ url = build.url
+ else:
+ url = None
+ if url is not None:
+ job_name = '<a href="%s">%s</a>' % (url, job_name)
+ ret += '%s %s: %s%s' % (indent_str, job_name, result, voting)
+ ret += '\n'
+ return ret
+
class Changeish(object):
"""Something like a change; either a change or a ref"""
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 69390c0..7f572be 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -46,7 +46,7 @@
if job.exception:
raise RPCFailure(job.exception)
self.log.debug("Job complete, success: %s" % (not job.failure))
- return (not job.failure)
+ return job
def enqueue(self, pipeline, project, trigger, change):
data = {'pipeline': pipeline,
@@ -54,13 +54,21 @@
'trigger': trigger,
'change': change,
}
- return self.submitJob('zuul:enqueue', data)
+ return not self.submitJob('zuul:enqueue', data).failure
def promote(self, pipeline, change_ids):
data = {'pipeline': pipeline,
'change_ids': change_ids,
}
- return self.submitJob('zuul:promote', data)
+ return not self.submitJob('zuul:promote', data).failure
+
+ def get_running_jobs(self):
+ data = {}
+ job = self.submitJob('zuul:get_running_jobs', data)
+ if job.failure:
+ return False
+ else:
+ return json.loads(job.data[0])
def shutdown(self):
self.gearman.shutdown()
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index c1b9216..a5a5b5d 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -48,6 +48,7 @@
def register(self):
self.worker.registerFunction("zuul:enqueue")
self.worker.registerFunction("zuul:promote")
+ self.worker.registerFunction("zuul:get_running_jobs")
def stop(self):
self.log.debug("Stopping")
@@ -123,3 +124,14 @@
change_ids = args['change_ids']
self.sched.promote(pipeline_name, change_ids)
job.sendWorkComplete()
+
+ def handle_get_running_jobs(self, job):
+ # args = json.loads(job.arguments)
+ # TODO: use args to filter by pipeline etc
+ running_items = []
+ for pipeline_name, pipeline in self.sched.layout.pipelines.iteritems():
+ for queue in pipeline.queues:
+ for item in queue.queue:
+ running_items.append(item.formatJSON())
+
+ job.sendWorkComplete(json.dumps(running_items))
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 18f44db..f5d6629 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1305,7 +1305,7 @@
changed = True
status = ''
for item in queue.queue:
- status += self.pipeline.formatStatus(item)
+ status += item.formatStatus()
if status:
self.log.debug("Queue %s status is now:\n %s" %
(queue.name, status))
@@ -1334,7 +1334,7 @@
self.pipeline.setResult(item, build)
self.log.debug("Item %s status is now:\n %s" %
- (item, self.pipeline.formatStatus(item)))
+ (item, item.formatStatus()))
self.updateBuildDescriptions(build.build_set)
return True