# blob: 4e92336cc66eee3318101305acbcb19c2e7f9cd7 [file] [log] [blame]
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import pickle
import Queue
import re
import threading
import yaml
from model import Pipeline, Job, Project, ChangeQueue, EventFilter
class Scheduler(threading.Thread):
    """Thread that feeds trigger and build-result events to pipelines.

    Other threads enqueue work through addEvent(), onBuildStarted() and
    onBuildCompleted(); each of those sets ``wake_event`` so that run()
    wakes up and processes one item from each queue per iteration.

    reconfigure() and exit() set ``_pause``; while paused, no new trigger
    events are processed, and once no pipeline manager has builds
    outstanding the pending pause action is performed by _doPauseEvent().
    """
    log = logging.getLogger("zuul.Scheduler")

    def __init__(self):
        threading.Thread.__init__(self)
        # Set whenever there may be work for run() to do.
        self.wake_event = threading.Event()
        # Set by _doPauseEvent() when a requested reconfiguration is done.
        self.reconfigure_complete_event = threading.Event()
        # Held while queues are appended to and while run() processes them.
        self.queue_lock = threading.Lock()
        self._pause = False
        self._reconfigure = False
        self._exit = False
        self._stopped = False
        self.launcher = None
        self.trigger = None

        self.trigger_event_queue = Queue.Queue()
        self.result_event_queue = Queue.Queue()
        self._init()

    def _init(self):
        """Reset all state that is derived from the layout configuration."""
        self.pipelines = {}
        self.jobs = {}
        self.projects = {}
        self.metajobs = {}

    def stop(self):
        """Ask run() to return the next time it wakes up."""
        self._stopped = True
        self.wake_event.set()

    def _parseConfig(self, config_path):
        """Read the layout file and populate pipelines, jobs and projects.

        :param config_path: path of the YAML layout file; ``~`` is expanded.
        :raises Exception: if the file does not exist, or if a job names a
            ``parameter-function`` that none of the included python files
            defined.
        """
        def toList(item):
            # Normalize an optional scalar-or-list value: falsy -> [],
            # list -> unchanged, anything else -> one-element list.
            if not item:
                return []
            if isinstance(item, list):
                return item
            return [item]

        if config_path:
            config_path = os.path.expanduser(config_path)
            if not os.path.exists(config_path):
                raise Exception("Unable to read layout config file at %s" %
                                config_path)
        # NOTE(review): yaml.load() without an explicit safe loader can
        # construct arbitrary Python objects; consider yaml.safe_load()
        # if the layout file can ever come from an untrusted source.
        # The context manager makes sure the file handle is closed.
        with open(config_path) as config_file:
            data = yaml.load(config_file)

        # Namespace shared by all included python files; job
        # 'parameter-function' names are looked up in it below.
        self._config_env = {}
        for include in data.get('includes', []):
            if 'python-file' in include:
                fn = include['python-file']
                if not os.path.isabs(fn):
                    # Relative includes are resolved against the
                    # directory holding the layout file.
                    base = os.path.dirname(config_path)
                    fn = os.path.join(base, fn)
                fn = os.path.expanduser(fn)
                execfile(fn, self._config_env)

        for conf_pipeline in data.get('pipelines', []):
            pipeline = Pipeline(conf_pipeline['name'])
            # The manager class is looked up by name among this
            # module's globals.
            manager = globals()[conf_pipeline['manager']](self, pipeline)
            pipeline.setManager(manager)

            self.pipelines[conf_pipeline['name']] = pipeline
            manager.success_action = conf_pipeline.get('success')
            manager.failure_action = conf_pipeline.get('failure')
            manager.start_action = conf_pipeline.get('start')
            for trigger in toList(conf_pipeline['trigger']):
                # Flatten the list of single-purpose approval dicts
                # into one dict.
                approvals = {}
                for approval_dict in toList(trigger.get('approval')):
                    for k, v in approval_dict.items():
                        approvals[k] = v
                f = EventFilter(types=toList(trigger['event']),
                                branches=toList(trigger.get('branch')),
                                refs=toList(trigger.get('ref')),
                                approvals=approvals,
                                comment_filters=toList(
                                    trigger.get('comment_filter')))
                manager.event_filters.append(f)

        for config_job in data['jobs']:
            job = self.getJob(config_job['name'])
            # Be careful to only set attributes explicitly present on
            # this job, to avoid squashing attributes set by a meta-job.
            m = config_job.get('failure-message', None)
            if m:
                job.failure_message = m
            m = config_job.get('success-message', None)
            if m:
                job.success_message = m
            m = config_job.get('hold-following-changes', False)
            if m:
                job.hold_following_changes = True
            fname = config_job.get('parameter-function', None)
            if fname:
                func = self._config_env.get(fname, None)
                if not func:
                    raise Exception("Unable to find function %s" % fname)
                job.parameter_function = func
            branches = toList(config_job.get('branch'))
            if branches:
                # Keep the raw patterns next to the compiled ones.
                job._branches = branches
                job.branches = [re.compile(x) for x in branches]

        def add_jobs(job_tree, config_jobs):
            # Recursively attach jobs to job_tree: a dict entry maps a
            # parent job name to its children, a string is a leaf job.
            for job in config_jobs:
                if isinstance(job, list):
                    for x in job:
                        add_jobs(job_tree, x)
                if isinstance(job, dict):
                    for parent, children in job.items():
                        parent_tree = job_tree.addJob(self.getJob(parent))
                        add_jobs(parent_tree, children)
                if isinstance(job, str):
                    job_tree.addJob(self.getJob(job))

        for config_project in data['projects']:
            project = Project(config_project['name'])
            self.projects[config_project['name']] = project
            for pipeline in self.pipelines.values():
                # A project takes part in a pipeline when it has a key
                # named after that pipeline.
                if pipeline.name in config_project:
                    job_tree = pipeline.addProject(project)
                    config_jobs = config_project[pipeline.name]
                    add_jobs(job_tree, config_jobs)

        # All jobs should be defined at this point, get rid of
        # metajobs so that getJob isn't doing anything weird.
        self.metajobs = {}

        # TODO(jeblair): check that we don't end up with jobs like
        # "foo - bar" because a ':' is missing in the yaml for a dependent job
        for pipeline in self.pipelines.values():
            pipeline.manager._postConfig()

    def getJob(self, name):
        """Return the Job called name, creating and registering it if needed.

        A name starting with '^' defines a meta-job: it is compiled as a
        regular expression and remembered in ``self.metajobs``.  Any other
        new job copies attributes from every meta-job whose pattern
        matches its name.
        """
        if name in self.jobs:
            return self.jobs[name]
        job = Job(name)
        if name.startswith('^'):
            # This is a meta-job
            regex = re.compile(name)
            self.metajobs[regex] = job
        else:
            # Apply attributes from matching meta-jobs
            for regex, metajob in self.metajobs.items():
                if regex.match(name):
                    job.copy(metajob)
            self.jobs[name] = job
        return job

    def setLauncher(self, launcher):
        """Set the launcher object used to start and cancel builds."""
        self.launcher = launcher

    def setTrigger(self, trigger):
        """Set the trigger object used for reporting and merge checks."""
        self.trigger = trigger

    def addEvent(self, event):
        """Queue a trigger event and wake the run loop."""
        self.log.debug("Adding trigger event: %s" % event)
        # 'with' guarantees the lock is released even if put() raises.
        with self.queue_lock:
            self.trigger_event_queue.put(event)
        self.wake_event.set()

    def onBuildStarted(self, build):
        """Queue a ('started', build) result event and wake the run loop."""
        self.log.debug("Adding start event for build: %s" % build)
        with self.queue_lock:
            self.result_event_queue.put(('started', build))
        self.wake_event.set()

    def onBuildCompleted(self, build):
        """Queue a ('completed', build) result event and wake the run loop."""
        self.log.debug("Adding complete event for build: %s" % build)
        with self.queue_lock:
            self.result_event_queue.put(('completed', build))
        self.wake_event.set()

    def reconfigure(self, config):
        """Request a layout reload using config and block until it is done.

        :param config: configuration object; its ('zuul', 'layout_config')
            option is read by _doPauseEvent() to locate the layout file.

        NOTE(review): the completion event is only set after
        _parseConfig() returns, so if parsing raises, this call appears
        to block indefinitely -- confirm whether that is intended.
        """
        self.log.debug("Prepare to reconfigure")
        self.config = config
        self._pause = True
        self._reconfigure = True
        self.wake_event.set()
        self.log.debug("Waiting for reconfiguration")
        self.reconfigure_complete_event.wait()
        self.reconfigure_complete_event.clear()
        self.log.debug("Reconfiguration complete")

    def exit(self):
        """Request that the process exit once outstanding builds finish.

        Returns immediately; the run loop saves the trigger queue and
        terminates the process from _doPauseEvent().
        """
        self.log.debug("Prepare to exit")
        self._pause = True
        self._exit = True
        self.wake_event.set()
        self.log.debug("Waiting for exit")

    def _get_queue_pickle_file(self):
        """Return the path of the file used to persist the trigger queue.

        Uses the ('zuul', 'state_dir') option of ``self.config`` when
        present, '/var/lib/zuul' otherwise.
        """
        if self.config.has_option('zuul', 'state_dir'):
            state_dir = os.path.expanduser(self.config.get('zuul',
                                                           'state_dir'))
        else:
            state_dir = '/var/lib/zuul'
        return os.path.join(state_dir, 'queue.pickle')

    def _save_queue(self):
        """Drain the trigger event queue and pickle its events to disk.

        Nothing is written when the queue is empty.
        """
        pickle_file = self._get_queue_pickle_file()
        events = []
        while not self.trigger_event_queue.empty():
            events.append(self.trigger_event_queue.get())
        self.log.debug("Queue length is %s" % len(events))
        if events:
            self.log.debug("Saving queue")
            # Close (and therefore flush) the file explicitly: the caller
            # follows up with os._exit(), which does not flush buffers.
            with open(pickle_file, 'wb') as f:
                pickle.dump(events, f)

    def _load_queue(self):
        """Re-queue trigger events saved by _save_queue(), if any exist."""
        pickle_file = self._get_queue_pickle_file()
        if os.path.exists(pickle_file):
            self.log.debug("Loading queue")
            # NOTE(review): unpickling executes code embedded in the file;
            # the state directory must only be writable by trusted users.
            with open(pickle_file, 'rb') as f:
                events = pickle.load(f)
            self.log.debug("Queue length is %s" % len(events))
            for event in events:
                self.trigger_event_queue.put(event)
        else:
            self.log.debug("No queue file found")

    def _delete_queue(self):
        """Remove the saved queue file if it exists."""
        pickle_file = self._get_queue_pickle_file()
        if os.path.exists(pickle_file):
            self.log.debug("Deleting saved queue")
            os.unlink(pickle_file)

    def resume(self):
        """Restore a saved queue (best effort) and wake the run loop.

        Failures to load or delete the saved queue are logged and
        otherwise ignored.
        """
        try:
            self._load_queue()
        except Exception:
            self.log.exception("Unable to load queue")
        try:
            self._delete_queue()
        except Exception:
            self.log.exception("Unable to delete saved queue")
        self.log.debug("Resuming queue processing")
        self.wake_event.set()

    def _doPauseEvent(self):
        """Perform the action(s) a pause was requested for.

        On exit: save the trigger queue and terminate the process.
        On reconfigure: reset layout state, re-parse the layout, unpause
        and signal reconfigure().
        """
        if self._exit:
            self.log.debug("Exiting")
            self._save_queue()
            os._exit(0)
        if self._reconfigure:
            self.log.debug("Performing reconfiguration")
            self._init()
            self._parseConfig(self.config.get('zuul', 'layout_config'))
            self._pause = False
            self.reconfigure_complete_event.set()

    def _areAllBuildsComplete(self):
        """Return True if no pipeline manager has builds outstanding.

        Every outstanding build is logged at debug level.
        """
        self.log.debug("Checking if all builds are complete")
        waiting = False
        for pipeline in self.pipelines.values():
            for build in pipeline.manager.building_jobs:
                self.log.debug("%s waiting on %s" % (pipeline.manager, build))
                waiting = True
        if not waiting:
            self.log.debug("All builds are complete")
            return True
        self.log.debug("All builds are not complete")
        return False

    def run(self):
        """Main loop: sleep until woken, then process queued events.

        Each iteration handles at most one trigger event (skipped while
        paused) and one result event, performs a pending pause action if
        all builds are complete, and re-arms ``wake_event`` if work
        remains.  Exceptions are logged and the loop continues.
        """
        while True:
            self.log.debug("Run handler sleeping")
            self.wake_event.wait()
            self.wake_event.clear()
            if self._stopped:
                return
            self.log.debug("Run handler awake")
            # The context manager releases the lock on every exit path.
            with self.queue_lock:
                try:
                    if not self._pause:
                        if not self.trigger_event_queue.empty():
                            self.process_event_queue()

                    # Results are processed even while paused so that
                    # running builds can drain.
                    if not self.result_event_queue.empty():
                        self.process_result_queue()

                    if self._pause and self._areAllBuildsComplete():
                        self._doPauseEvent()

                    if not self._pause:
                        if not (self.trigger_event_queue.empty() and
                                self.result_event_queue.empty()):
                            self.wake_event.set()
                    else:
                        if not self.result_event_queue.empty():
                            self.wake_event.set()
                except Exception:
                    self.log.exception("Exception in run handler:")

    def process_event_queue(self):
        """Take one trigger event and offer it to every pipeline.

        Events for unknown projects are logged and dropped.  For each
        pipeline whose manager matches the event, the change is built
        from the event and added to that pipeline.
        """
        self.log.debug("Fetching trigger event")
        event = self.trigger_event_queue.get()
        self.log.debug("Processing trigger event %s" % event)
        project = self.projects.get(event.project_name)
        if not project:
            self.log.warning("Project %s not found" % event.project_name)
            return

        for pipeline in self.pipelines.values():
            if not pipeline.manager.eventMatches(event):
                self.log.debug("Event %s ignored by %s" % (event, pipeline))
                continue
            change = event.getChange(project, self.trigger)
            self.log.info("Adding %s, %s to %s" %
                          (project, change, pipeline))
            pipeline.manager.addChange(change)

    def process_result_queue(self):
        """Take one result event and hand it to the manager owning the build.

        Managers are tried in turn until one reports (by returning a true
        value) that it handled the build; a warning is logged otherwise.
        """
        self.log.debug("Fetching result event")
        event_type, build = self.result_event_queue.get()
        self.log.debug("Processing result event %s" % build)
        for pipeline in self.pipelines.values():
            if event_type == 'started':
                if pipeline.manager.onBuildStarted(build):
                    return
            elif event_type == 'completed':
                if pipeline.manager.onBuildCompleted(build):
                    return
        self.log.warning("Build %s not found by any queue manager" % (build))

    def formatStatusHTML(self):
        """Return an HTML <pre> page with the status of every pipeline.

        Pipelines are listed in sorted order of their configured names.
        """
        parts = ['<html><pre>']
        for key in sorted(self.pipelines):
            pipeline = self.pipelines[key]
            s = 'Pipeline: %s' % pipeline.name
            parts.append(s + '\n')
            parts.append('-' * len(s) + '\n')
            parts.append(pipeline.manager.formatStatusHTML())
            parts.append('\n')
        parts.append('</pre></html>')
        return ''.join(parts)
class BasePipelineManager(object):
    """Common behaviour for pipeline managers.

    Tracks which change each running build belongs to
    (``building_jobs``), launches jobs for changes, and reports results
    through the scheduler's trigger.
    """
    log = logging.getLogger("zuul.BasePipelineManager")

    def __init__(self, sched, pipeline):
        """
        :param sched: the scheduler; its ``trigger``, ``launcher`` and
            ``projects`` attributes are used by this manager.
        :param pipeline: the pipeline this manager drives.
        """
        self.sched = sched
        self.pipeline = pipeline
        # Maps each outstanding build to the change it was launched for.
        self.building_jobs = {}
        self.event_filters = []
        self.success_action = {}
        self.failure_action = {}
        self.start_action = {}

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)

    def _postConfig(self):
        """Log a summary of this manager's configuration."""
        self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
        self.log.info(" Events:")
        for e in self.event_filters:
            self.log.info(" %s" % e)
        self.log.info(" Projects:")

        def log_jobs(tree, indent=0):
            # Log one job-tree node (branch patterns and [hold] flag
            # appended) and then its children, indented two further.
            istr = ' ' + ' ' * indent
            if tree.job:
                efilters = ''.join(str(b) for b in tree.job._branches)
                if efilters:
                    efilters = ' ' + efilters
                hold = ''
                if tree.job.hold_following_changes:
                    hold = ' [hold]'
                self.log.info("%s%s%s%s" % (istr,
                                            repr(tree.job),
                                            efilters,
                                            hold))
            for x in tree.job_trees:
                log_jobs(x, indent + 2)

        for p in self.sched.projects.values():
            tree = self.pipeline.getJobTree(p)
            if tree:
                self.log.info(" %s" % p)
                log_jobs(tree)
        if self.start_action:
            self.log.info(" On start:")
            self.log.info(" %s" % self.start_action)
        if self.success_action:
            self.log.info(" On success:")
            self.log.info(" %s" % self.success_action)
        if self.failure_action:
            self.log.info(" On failure:")
            self.log.info(" %s" % self.failure_action)

    def getSubmitAllowNeeds(self):
        """Return the list of labels named in this pipeline's success action.

        :returns: a list (empty when no success action is configured).
        """
        # Get a list of code review labels that are allowed to be
        # "needed" in the submit records for a change, with respect
        # to this queue.  In other words, the list of review labels
        # this queue itself is likely to set before submitting.
        if self.success_action:
            return list(self.success_action.keys())
        # Always hand back a list so both branches have the same type.
        return []

    def eventMatches(self, event):
        """Return True if any of this manager's event filters match event."""
        for ef in self.event_filters:
            if ef.matches(event):
                return True
        return False

    def isChangeAlreadyInQueue(self, change):
        """Return True if an equal change is already in this manager's queue."""
        for c in self.getChangesInQueue():
            if change.equals(c):
                return True
        return False

    def addChange(self, change):
        """Add change to the pipeline unless it is already queued."""
        if self.isChangeAlreadyInQueue(change):
            self.log.debug("Change %s is already in queue, ignoring" % change)
            return
        self.log.debug("Adding change %s" % change)
        self._addChange(change)

    def _addChange(self, change):
        """Report the start action (if configured) and launch jobs.

        A failure while reporting the start is logged and does not
        prevent the jobs from being launched.
        """
        if self.start_action:
            try:
                self.log.info("Reporting start, action %s change %s" %
                              (self.start_action, change))
                msg = "Starting %s jobs." % self.pipeline.name
                ret = self.sched.trigger.report(change, msg, self.start_action)
                if ret:
                    self.log.error("Reporting change start %s received: %s" %
                                   (change, ret))
            except Exception:
                self.log.exception("Exception while reporting start:")
        self.launchJobs(change)

    def launchJobs(self, change):
        """Launch every job the pipeline says is ready to run for change.

        A launch failure is logged and the remaining jobs are still tried.
        """
        self.log.debug("Launching jobs for change %s" % change)
        for job in self.pipeline.findJobsToRun(change):
            self.log.debug("Found job %s for change %s" % (job, change))
            try:
                build = self.sched.launcher.launch(job, change)
                self.building_jobs[build] = change
                self.log.debug("Adding build %s of job %s to change %s" %
                               (build, job, change))
                change.addBuild(build)
            except Exception:
                self.log.exception("Exception while launching job %s "
                                   "for change %s:" % (job, change))

    def updateBuildDescriptions(self, build_set):
        """Refresh the description of each build in build_set.

        The builds of ``build_set.previous_build_set`` are refreshed too
        when there is one.
        """
        for build in build_set.getBuilds():
            desc = self.pipeline.formatDescription(build)
            self.sched.launcher.setBuildDescription(build, desc)

        if build_set.previous_build_set:
            for build in build_set.previous_build_set.getBuilds():
                desc = self.pipeline.formatDescription(build)
                self.sched.launcher.setBuildDescription(build, desc)

    def onBuildStarted(self, build):
        """Handle a build-started event.

        :returns: True if the build belongs to this manager, else False.
        """
        self.log.debug("Build %s started" % build)
        if build not in self.building_jobs:
            self.log.debug("Build %s not found" % (build))
            # Or triggered externally, or triggered before zuul started,
            # or restarted
            return False

        self.updateBuildDescriptions(build.build_set)
        return True

    def onBuildCompleted(self, build):
        """Handle a build-completed event.

        Records the result, then either considers reporting the change
        (all jobs complete) or launches any jobs that are now runnable.

        :returns: True if the build belongs to this manager, else False.
        """
        self.log.debug("Build %s completed" % build)
        if build not in self.building_jobs:
            self.log.debug("Build %s not found" % (build))
            # Or triggered externally, or triggered before zuul started,
            # or restarted
            return False

        change = self.building_jobs.pop(build)
        self.log.debug("Found change %s which triggered completed build %s" %
                       (change, build))

        self.pipeline.setResult(change, build)
        self.log.info("Change %s status is now:\n %s" %
                      (change, self.pipeline.formatStatus(change)))

        if self.pipeline.areAllJobsComplete(change):
            self.log.debug("All jobs for change %s are complete" % change)
            self.possiblyReportChange(change)
        else:
            # There may be jobs that depended on jobs that are now complete
            self.log.debug("All jobs for change %s are not yet complete" %
                           (change))
            self.launchJobs(change)

        self.updateBuildDescriptions(build.build_set)
        return True

    def possiblyReportChange(self, change):
        """Report change; subclasses may apply extra conditions first."""
        self.log.debug("Possibly reporting change %s" % change)
        self.reportChange(change)

    def reportChange(self, change):
        """Report the outcome of change through the trigger.

        :returns: False if the change is not reportable, 0 if it was
            already reported, otherwise whatever the trigger's report()
            returned (None if reporting raised).  A false value from the
            trigger marks the change as reported.
        """
        if not change.is_reportable:
            return False
        if change.reported:
            return 0
        self.log.debug("Reporting change %s" % change)
        ret = None
        if self.pipeline.didAllJobsSucceed(change):
            action = self.success_action
            change.setReportedResult('SUCCESS')
        else:
            action = self.failure_action
            change.setReportedResult('FAILURE')
        try:
            self.log.info("Reporting change %s, action: %s" %
                          (change, action))
            ret = self.sched.trigger.report(change,
                                            self.pipeline.formatReport(change),
                                            action)
            if ret:
                self.log.error("Reporting change %s received: %s" %
                               (change, ret))
            else:
                change.reported = True
        except Exception:
            self.log.exception("Exception while reporting:")
            change.setReportedResult('ERROR')
        self.updateBuildDescriptions(change.current_build_set)
        return ret

    def getChangesInQueue(self):
        """Return the distinct changes that have builds outstanding."""
        changes = []
        # List membership (==) is kept on purpose: changes are not
        # assumed to be hashable here.
        for change in self.building_jobs.values():
            if change not in changes:
                changes.append(change)
        return changes

    def formatStatusHTML(self):
        """Return the concatenated HTML status of every queued change."""
        return ''.join(self.pipeline.formatStatus(change, html=True)
                       for change in self.getChangesInQueue())
class IndependentPipelineManager(BasePipelineManager):
    """Pipeline manager that uses the base behaviour unchanged.

    Only the logger name differs from BasePipelineManager.
    """
    log = logging.getLogger("zuul.IndependentPipelineManager")
class DependentPipelineManager(BasePipelineManager):
    """Pipeline manager that keeps changes in ordered, shared queues.

    Projects whose job sets overlap share one change queue.  Jobs for a
    change are launched together with the list of changes ahead of it,
    and a failure at the head of a queue causes the jobs of the changes
    behind it to be cancelled and relaunched.
    """
    log = logging.getLogger("zuul.DependentPipelineManager")

    def __init__(self, *args, **kwargs):
        super(DependentPipelineManager, self).__init__(*args, **kwargs)
        self.change_queues = []

    def _postConfig(self):
        """Log the configuration, then build the shared change queues."""
        super(DependentPipelineManager, self)._postConfig()
        self.buildChangeQueues()

    def buildChangeQueues(self):
        """Create one queue per project, then merge queues that share jobs."""
        self.log.debug("Building shared change queues")
        change_queues = []

        for project in self.pipeline.getProjects():
            change_queue = ChangeQueue(self.pipeline)
            change_queue.addProject(project)
            change_queues.append(change_queue)
            self.log.debug("Created queue: %s" % change_queue)

        self.log.debug("Combining shared queues")
        new_change_queues = []
        for a in change_queues:
            merged_a = False
            for b in new_change_queues:
                if not a.getJobs().isdisjoint(b.getJobs()):
                    self.log.debug("Merging queue %s into %s" % (a, b))
                    b.mergeChangeQueue(a)
                    merged_a = True
                    break  # this breaks out of 'for b' and continues 'for a'
            if not merged_a:
                self.log.debug("Keeping queue %s" % (a))
                new_change_queues.append(a)

        self.change_queues = new_change_queues
        self.log.info(" Shared change queues:")
        for x in self.change_queues:
            self.log.info(" %s" % x)

    def getQueue(self, project):
        """Return the change queue containing project.

        :returns: the queue, or None (after logging an error) if no
            queue contains the project.
        """
        for queue in self.change_queues:
            if project in queue.projects:
                return queue
        self.log.error("Unable to find change queue for project %s" % project)
        return None

    def _checkForChangesNeededBy(self, change):
        """Make sure the change that change depends on is dealt with.

        :returns: True if it is okay to proceed enqueuing change, False
            if the change should not be enqueued.
        """
        self.log.debug("Checking for changes needed by %s:" % change)
        # Return true if okay to proceed enqueing this change,
        # false if the change should not be enqueued.
        if not hasattr(change, 'needs_change'):
            self.log.debug(" Changeish does not support dependencies")
            return True
        if not change.needs_change:
            self.log.debug(" No changes needed")
            return True
        if change.needs_change.is_merged:
            self.log.debug(" Needed change is merged")
            return True
        if not change.needs_change.is_current_patchset:
            self.log.debug(" Needed change is not the current patchset")
            return False
        change_queue = self.getQueue(change.project)
        if change_queue is None:
            # getQueue() has already logged the error.  Without a queue
            # the change cannot be enqueued (addChange() returns False in
            # that case too), so refuse here instead of failing on
            # change_queue.queue below.
            return False
        if change.needs_change in change_queue.queue:
            self.log.debug(" Needed change is already ahead in the queue")
            return True
        if self.sched.trigger.canMerge(change.needs_change,
                                       self.getSubmitAllowNeeds()):
            # It can merge, so attempt to enqueue it _ahead_ of this change.
            # If that works we can enqueue this change, otherwise, we can't.
            self.log.debug(" Change %s must be merged ahead of %s" %
                           (change.needs_change, change))
            return self.addChange(change.needs_change)
        # The needed change can't be merged.
        self.log.debug(" Change %s is needed but can not be merged" %
                       change.needs_change)
        return False

    def _checkForChangesNeeding(self, change):
        """Return the changes that depend on change and are able to merge.

        :returns: a list, empty if change does not support dependencies
            or nothing mergeable needs it.
        """
        to_enqueue = []
        self.log.debug("Checking for changes needing %s:" % change)
        if not hasattr(change, 'needed_by_changes'):
            self.log.debug(" Changeish does not support dependencies")
            return to_enqueue
        for needs in change.needed_by_changes:
            if self.sched.trigger.canMerge(needs,
                                           self.getSubmitAllowNeeds()):
                self.log.debug(" Change %s needs %s and is ready to merge" %
                               (needs, change))
                to_enqueue.append(needs)
        if not to_enqueue:
            self.log.debug(" No changes need %s" % change)
        return to_enqueue

    def addChange(self, change):
        """Enqueue change, the change it needs, and changes needing it.

        :returns: True if added (or already queued), False if it could
            not be added.
        """
        # Returns true if added (or not needed), false if failed to add
        if self.isChangeAlreadyInQueue(change):
            self.log.debug("Change %s is already in queue, ignoring" % change)
            return True

        if not self.sched.trigger.canMerge(change,
                                           self.getSubmitAllowNeeds()):
            self.log.debug("Change %s can not merge, ignoring" % change)
            return False

        if not self._checkForChangesNeededBy(change):
            return False

        to_enqueue = self._checkForChangesNeeding(change)
        # TODO(jeblair): Consider re-ordering this so that the dependent
        # changes aren't checked until closer when they are needed.

        # The dependency checks above may have enqueued this change
        # already, so look again before adding it.
        if self.isChangeAlreadyInQueue(change):
            self.log.debug("Change %s has been added to queue, ignoring" %
                           change)
            return True

        self.log.debug("Adding change %s" % change)
        change_queue = self.getQueue(change.project)
        if change_queue:
            self.log.debug("Adding change %s to queue %s" %
                           (change, change_queue))
            change_queue.enqueueChange(change)
            self._addChange(change)
            for needs in to_enqueue:
                self.addChange(needs)
            return True
        return False

    def _getDependentChanges(self, change):
        """Return the changes ahead of change, nearest first."""
        orig_change = change
        changes = []
        while change.change_ahead:
            changes.append(change.change_ahead)
            change = change.change_ahead
        self.log.info("Change %s depends on changes %s" % (orig_change,
                                                           changes))
        return changes

    def launchJobs(self, change):
        """Launch ready jobs for change and for every change behind it.

        Each launch is given the list of changes ahead of the change.
        A launch failure is logged and the remaining jobs are still tried.
        """
        self.log.debug("Launching jobs for change %s" % change)
        dependent_changes = self._getDependentChanges(change)
        for job in self.pipeline.findJobsToRun(change):
            self.log.debug("Found job %s for change %s" % (job, change))
            try:
                build = self.sched.launcher.launch(job,
                                                   change,
                                                   dependent_changes)
                self.building_jobs[build] = change
                self.log.debug("Adding build %s of job %s to change %s" %
                               (build, job, change))
                change.addBuild(build)
            except Exception:
                self.log.exception("Exception while launching job %s "
                                   "for change %s:" % (job, change))
        if change.change_behind:
            self.log.debug("Launching jobs for change %s, behind change %s" %
                           (change.change_behind, change))
            self.launchJobs(change.change_behind)

    def cancelJobs(self, change, prime=True):
        """Cancel outstanding builds of change and of changes behind it.

        :param prime: when true, change.resetAllBuilds() is called before
            cancelling.

        A failure to cancel a build is logged; the build is still marked
        'CANCELED' and dropped from ``building_jobs``.
        """
        self.log.debug("Cancel jobs for change %s" % change)
        to_remove = []
        if prime:
            change.resetAllBuilds()
        for build, build_change in self.building_jobs.items():
            if build_change == change:
                self.log.debug("Found build %s for change %s to cancel" %
                               (build, change))
                try:
                    self.sched.launcher.cancel(build)
                except Exception:
                    self.log.exception("Exception while canceling build %s "
                                       "for change %s" % (build, change))
                to_remove.append(build)
        # Removal is deferred so the dict is not modified while iterated.
        for build in to_remove:
            self.log.debug("Removing build %s from running builds" % build)
            build.result = 'CANCELED'
            del self.building_jobs[build]
        if change.change_behind:
            self.log.debug("Canceling jobs for change %s, behind change %s" %
                           (change.change_behind, change))
            self.cancelJobs(change.change_behind, prime=prime)

    def onBuildCompleted(self, build):
        """Handle a completed build, cancelling builds behind a failure.

        :returns: True if the build belongs to this manager, else False.
        """
        # Look the change up first: the base implementation removes the
        # build from building_jobs.
        change = self.building_jobs.get(build)
        if not super(DependentPipelineManager, self).onBuildCompleted(build):
            return False
        if (change and change.change_behind and
                self.pipeline.didAnyJobFail(change)):
            # This or some other build failed. All changes behind this change
            # will need to be retested. To free up resources cancel the builds
            # behind this one as they will be rerun anyways.
            self.cancelJobs(change.change_behind, prime=False)
            self.log.debug("Canceling builds behind change: %s due to "
                           "failure." % change)
        return True

    def possiblyReportChange(self, change):
        """Report change if it is at the front of its queue.

        A change at the front is reported and removed from the queue; if
        it did not both succeed and merge, the jobs of the changes behind
        it are cancelled and relaunched.  Afterwards, if the change
        behind has all its jobs complete, it is considered in turn.
        """
        self.log.debug("Possibly reporting change %s" % change)
        if change.reported:
            self.log.debug("Change %s already reported" % change)
            return
        # Remember the follower now; change.delete() below is called
        # before it is needed.
        change_behind = change.change_behind
        if not change.change_ahead:
            self.log.debug("Change %s is at the front of the queue, "
                           "reporting" % (change))
            ret = self.reportChange(change)
            self.log.debug("Removing reported change %s from queue" % change)
            change.delete()
            # A false return from reportChange() is treated as a
            # successful report; confirm the merge with the trigger.
            merged = (not ret)
            if merged:
                merged = self.sched.trigger.isMerged(change, change.branch)
            succeeded = self.pipeline.didAllJobsSucceed(change)
            self.log.info("Reported change %s status: all-succeeded: %s, "
                          "merged: %s" % (change, succeeded, merged))

            if not (succeeded and merged):
                self.log.debug("Reported change %s failed tests or failed "
                               "to merge" % (change))
                # The merge or test failed, re-run all jobs behind this one
                if change_behind:
                    self.log.info("Canceling/relaunching jobs for change %s "
                                  "behind failed change %s" %
                                  (change_behind, change))
                    self.cancelJobs(change_behind)
                    self.launchJobs(change_behind)
        # If the change behind this is ready, notify
        if change_behind and self.pipeline.areAllJobsComplete(change_behind):
            self.log.info("Change %s behind change %s is ready, "
                          "possibly reporting" % (change_behind, change))
            self.possiblyReportChange(change_behind)

    def getChangesInQueue(self):
        """Return every change in every shared queue, in queue order."""
        changes = []
        for shared_queue in self.change_queues:
            changes.extend(shared_queue.queue)
        return changes

    def formatStatusHTML(self):
        """Return an HTML status section for each shared queue.

        For a non-empty queue, the status of its last change is included.
        """
        parts = ['\n']
        for queue in self.change_queues:
            s = 'Shared queue: %s' % queue.name
            parts.append(s + '\n')
            parts.append('-' * len(s) + '\n')
            if queue.queue:
                parts.append(self.pipeline.formatStatus(queue.queue[-1],
                                                        html=True))
        return ''.join(parts)