Add pipelines to data model.
This is a refactoring of the data model with the following goals:
* Call top-level queues pipelines -- because too many other things
are already called queues. Pipelines convey the idea that there
are a number of tasks to be performed (jobs), and that those
tasks can be applied to different changes in parallel.
* Eliminate references to queue_name from within a Change.
Instead, methods that need to understand the pipeline that were
accessed previously via the change are now located in the
Pipeline class, taking a change as an argument. Essentially,
many methods involving changes (and builds, jobs, etc) must now
be called in the context of a pipeline.
* Add a changeish object to encompass the things that change and
ref events have in common.
Change-Id: Iaf8ed0991f3c5b2bf7ded2c340a60725f7f98eaf
Reviewed-on: https://review.openstack.org/10757
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 9f00375..5e2049b 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -1,6 +1,6 @@
-queues:
+pipelines:
- name: check
- manager: IndependentQueueManager
+ manager: IndependentPipelineManager
trigger:
- event: patchset-uploaded
success:
@@ -9,13 +9,13 @@
verified: -1
- name: post
- manager: IndependentQueueManager
+ manager: IndependentPipelineManager
trigger:
- event: ref-updated
ref: ^(?!refs/).*$
- name: gate
- manager: DependentQueueManager
+ manager: DependentPipelineManager
trigger:
- event: comment-added
approval:
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index a61e13c..fae05d0 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -916,15 +916,16 @@
"Test that whether a change is ready to merge"
# TODO: move to test_gerrit (this is a unit test!)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
- a = self.sched.trigger.getChange(1, 2, 'gate')
- assert not a.can_merge
+ a = self.sched.trigger.getChange(1, 2)
+ mgr = self.sched.pipelines['gate'].manager
+ assert not self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
A.addApproval('CRVW', 2)
- a = self.sched.trigger.getChange(1, 2, 'gate')
- assert not a.can_merge
+ a = self.sched.trigger.getChange(1, 2)
+ assert not self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
A.addApproval('APRV', 1)
- a = self.sched.trigger.getChange(1, 2, 'gate')
- assert a.can_merge
+ a = self.sched.trigger.getChange(1, 2)
+ assert self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
return True
diff --git a/zuul/launcher/jenkins.py b/zuul/launcher/jenkins.py
index 805657e..542b1c4 100644
--- a/zuul/launcher/jenkins.py
+++ b/zuul/launcher/jenkins.py
@@ -211,13 +211,13 @@
uuid = str(uuid1())
params = dict(UUID=uuid,
GERRIT_PROJECT=change.project.name)
- if change.refspec:
+ if hasattr(change, 'refspec'):
changes_str = '^'.join(
['%s:%s:%s' % (c.project.name, c.branch, c.refspec)
for c in dependent_changes + [change]])
params['GERRIT_BRANCH'] = change.branch
params['GERRIT_CHANGES'] = changes_str
- if change.ref:
+ if hasattr(change, 'ref'):
params['GERRIT_REFNAME'] = change.ref
params['GERRIT_OLDREV'] = change.oldrev
params['GERRIT_NEWREV'] = change.newrev
diff --git a/zuul/model.py b/zuul/model.py
index b0c47c8..0839668 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -16,16 +16,283 @@
import time
+class Pipeline(object):
+ """A top-level pipeline such as check, gate, post, etc."""
+ def __init__(self, name):
+ self.name = name
+ self.job_trees = {} # project -> JobTree
+ self.manager = None
+
+ def setManager(self, manager):
+ self.manager = manager
+
+ def addProject(self, project):
+ job_tree = JobTree(None) # Null job == job tree root
+ self.job_trees[project] = job_tree
+ return job_tree
+
+ def getProjects(self):
+ return self.job_trees.keys()
+
+ def getJobTree(self, project):
+ tree = self.job_trees.get(project)
+ return tree
+
+ def getJobs(self, changeish):
+ tree = self.job_trees[changeish.project]
+ if not tree:
+ return []
+ return changeish.filterJobs(tree.getJobs())
+
+ def _findJobsToRun(self, job_trees, changeish):
+ torun = []
+ if changeish.change_ahead:
+ # Only run jobs if any 'hold' jobs on the change ahead
+ # have completed successfully.
+ if self.isHoldingFollowingChanges(changeish.change_ahead):
+ return []
+ for tree in job_trees:
+ job = tree.job
+ result = None
+ if job:
+ if not job.changeMatches(changeish):
+ continue
+ build = changeish.current_build_set.getBuild(job.name)
+ if build:
+ result = build.result
+ else:
+ # There is no build for the root of this job tree,
+ # so we should run it.
+ torun.append(job)
+ # If there is no job, this is a null job tree, and we should
+ # run all of its jobs.
+ if result == 'SUCCESS' or not job:
+ torun.extend(self._findJobsToRun(tree.job_trees, changeish))
+ return torun
+
+ def findJobsToRun(self, changeish):
+ tree = self.job_trees[changeish.project]
+ if not tree:
+ return []
+ return self._findJobsToRun(tree.job_trees, changeish)
+
+ def areAllJobsComplete(self, changeish):
+ for job in self.getJobs(changeish):
+ build = changeish.current_build_set.getBuild(job.name)
+ if not build or not build.result:
+ return False
+ return True
+
+ def didAllJobsSucceed(self, changeish):
+ for job in self.getJobs(changeish):
+ build = changeish.current_build_set.getBuild(job.name)
+ if not build:
+ return False
+ if build.result != 'SUCCESS':
+ return False
+ return True
+
+ def didAnyJobFail(self, changeish):
+ for job in self.getJobs(changeish):
+ build = changeish.current_build_set.getBuild(job.name)
+ if build and build.result == 'FAILURE':
+ return True
+ return False
+
+ def isHoldingFollowingChanges(self, changeish):
+ for job in self.getJobs(changeish):
+ if not job.hold_following_changes:
+ continue
+ build = changeish.current_build_set.getBuild(job.name)
+ if not build:
+ return True
+ if build.result != 'SUCCESS':
+ return True
+ if not changeish.change_ahead:
+ return False
+ return self.isHoldingFollowingChanges(changeish.change_ahead)
+
+ def formatStatus(self, changeish, indent=0, html=False):
+ indent_str = ' ' * indent
+ ret = ''
+ if html and changeish.url is not None:
+ ret += '%sProject %s change <a href="%s">%s</a>\n' % (
+ indent_str,
+ changeish.project.name,
+ changeish.url,
+ changeish._id())
+ else:
+ ret += '%sProject %s change %s\n' % (indent_str,
+ changeish.project.name,
+ changeish._id())
+ for job in self.getJobs(changeish):
+ build = changeish.current_build_set.getBuild(job.name)
+ if build:
+ result = build.result
+ else:
+ result = None
+ job_name = job.name
+ if html:
+ if build:
+ url = build.url
+ else:
+ url = None
+ if url is not None:
+ job_name = '<a href="%s">%s</a>' % (url, job_name)
+ ret += '%s %s: %s' % (indent_str, job_name, result)
+ ret += '\n'
+ if changeish.change_ahead:
+ ret += '%sWaiting on:\n' % (indent_str)
+ ret += self.formatStatus(changeish.change_ahead,
+ indent + 2, html)
+ return ret
+
+ def formatReport(self, changeish):
+ ret = ''
+ if self.didAllJobsSucceed(changeish):
+ ret += 'Build successful\n\n'
+ else:
+ ret += 'Build failed\n\n'
+
+ for job in self.getJobs(changeish):
+ build = changeish.current_build_set.getBuild(job.name)
+ result = build.result
+ if result == 'SUCCESS' and job.success_message:
+ result = job.success_message
+ elif result == 'FAILURE' and job.failure_message:
+ result = job.failure_message
+ url = build.url
+ if not url:
+ url = job.name
+ ret += '- %s : %s\n' % (url, result)
+ return ret
+
+ def formatDescription(self, build):
+ concurrent_changes = ''
+ concurrent_builds = ''
+ other_builds = ''
+
+ for change in build.build_set.other_changes:
+ concurrent_changes += '<li><a href="{change.url}">\
+ {change.number},{change.patchset}</a></li>'.format(
+ change=change)
+
+ change = build.build_set.change
+
+ for build in build.build_set.getBuilds():
+ if build.base_url:
+ concurrent_builds += """\
+<li>
+ <a href="{build.base_url}">
+ {build.job.name} #{build.number}</a>: {build.result}
+</li>
+""".format(build=build)
+ else:
+ concurrent_builds += """\
+<li>
+ {build.job.name}: {build.result}
+</li>""".format(build=build)
+
+ if build.build_set.previous_build_set:
+ other_build = build.build_set.previous_build_set.getBuild(
+ build.job.name)
+ if other_build:
+ other_builds += """\
+<li>
+ Preceded by: <a href="{build.base_url}">
+ {build.job.name} #{build.number}</a>
+</li>
+""".format(build=other_build)
+
+ if build.build_set.next_build_set:
+ other_build = build.build_set.next_build_set.getBuild(
+ build.job.name)
+ if other_build:
+ other_builds += """\
+<li>
+ Succeeded by: <a href="{build.base_url}">
+ {build.job.name} #{build.number}</a>
+</li>
+""".format(build=other_build)
+
+ result = build.build_set.result
+
+ if change.number:
+ ret = """\
+<p>
+ Triggered by change:
+ <a href="{change.url}">{change.number},{change.patchset}</a><br/>
+ Branch: <b>{change.branch}</b><br/>
+ Pipeline: <b>{self.name}</b>
+</p>"""
+ else:
+ ret = """\
+<p>
+ Triggered by reference:
+ {change.ref}</a><br/>
+ Old revision: <b>{change.oldrev}</b><br/>
+ New revision: <b>{change.newrev}</b><br/>
+ Pipeline: <b>{self.name}</b>
+</p>"""
+
+ if concurrent_changes:
+ ret += """\
+<p>
+ Other changes tested concurrently with this change:
+ <ul>{concurrent_changes}</ul>
+</p>
+"""
+ if concurrent_builds:
+ ret += """\
+<p>
+ All builds for this change set:
+ <ul>{concurrent_builds}</ul>
+</p>
+"""
+
+ if other_builds:
+ ret += """\
+<p>
+ Other build sets for this change:
+ <ul>{other_builds}</ul>
+</p>
+"""
+ if result:
+ ret += """\
+<p>
+ Reported result: <b>{result}</b>
+</p>
+"""
+
+ ret = ret.format(**locals())
+ return ret
+
+ def setResult(self, changeish, build):
+ if build.result != 'SUCCESS':
+ # Get a JobTree from a Job so we can find only its dependent jobs
+ root = self.getJobTree(changeish.project)
+ tree = root.getJobTreeForJob(build.job)
+ for job in tree.getJobs():
+ fakebuild = Build(job, None)
+ fakebuild.result = 'SKIPPED'
+ changeish.addBuild(fakebuild)
+
+
class ChangeQueue(object):
- def __init__(self, queue_name):
+ """DependentPipelines have multiple parallel queues shared by
+ different projects; this is one of them. For instance, there may
+ a queue shared by interrelated projects foo and bar, and a second
+ queue for independent project baz. Pipelines have one or more
+ PipelineQueues."""
+ def __init__(self, pipeline):
+ self.pipeline = pipeline
self.name = ''
- self.queue_name = queue_name
self.projects = []
self._jobs = set()
self.queue = []
def __repr__(self):
- return '<ChangeQueue %s: %s>' % (self.queue_name, self.name)
+ return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
def getJobs(self):
return self._jobs
@@ -36,7 +303,7 @@
names = [x.name for x in self.projects]
names.sort()
self.name = ', '.join(names)
- self._jobs |= set(project.getJobs(self.queue_name))
+ self._jobs |= set(self.pipeline.getJobTree(project).getJobs())
def enqueueChange(self, change):
if self.queue:
@@ -54,6 +321,17 @@
self.addProject(project)
+class Project(object):
+ def __init__(self, name):
+ self.name = name
+
+ def __str__(self):
+ return self.name
+
+ def __repr__(self):
+ return '<Project %s>' % (self.name)
+
+
class Job(object):
def __init__(self, name):
# If you add attributes here, be sure to add them to the copy method.
@@ -88,119 +366,6 @@
return False
-class Build(object):
- def __init__(self, job, uuid):
- self.job = job
- self.uuid = uuid
- self.base_url = None
- self.url = None
- self.number = None
- self.result = None
- self.build_set = None
- self.launch_time = time.time()
-
- def __repr__(self):
- return '<Build %s of %s>' % (self.uuid, self.job.name)
-
- def formatDescription(self):
- concurrent_changes = ''
- concurrent_builds = ''
- other_builds = ''
-
- for change in self.build_set.other_changes:
- concurrent_changes += '<li><a href="{change.url}">\
- {change.number},{change.patchset}</a></li>'.format(
- change=change)
-
- change = self.build_set.change
-
- for build in self.build_set.getBuilds():
- if build.base_url:
- concurrent_builds += """\
-<li>
- <a href="{build.base_url}">
- {build.job.name} #{build.number}</a>: {build.result}
-</li>
-""".format(build=build)
- else:
- concurrent_builds += """\
-<li>
- {build.job.name}: {build.result}
-</li>""".format(build=build)
-
- if self.build_set.previous_build_set:
- build = self.build_set.previous_build_set.getBuild(self.job.name)
- if build:
- other_builds += """\
-<li>
- Preceded by: <a href="{build.base_url}">
- {build.job.name} #{build.number}</a>
-</li>
-""".format(build=build)
-
- if self.build_set.next_build_set:
- build = self.build_set.next_build_set.getBuild(self.job.name)
- if build:
- other_builds += """\
-<li>
- Succeeded by: <a href="{build.base_url}">
- {build.job.name} #{build.number}</a>
-</li>
-""".format(build=build)
-
- result = self.build_set.result
-
- if change.number:
- ret = """\
-<p>
- Triggered by change:
- <a href="{change.url}">{change.number},{change.patchset}</a><br/>
- Branch: <b>{change.branch}</b><br/>
- Pipeline: <b>{change.queue_name}</b>
-</p>"""
- else:
- ret = """\
-<p>
- Triggered by reference:
- {change.ref}</a><br/>
- Old revision: <b>{change.oldrev}</b><br/>
- New revision: <b>{change.newrev}</b><br/>
- Pipeline: <b>{change.queue_name}</b>
-</p>"""
-
- if concurrent_changes:
- ret += """\
-<p>
- Other changes tested concurrently with this change:
- <ul>{concurrent_changes}</ul>
-</p>
-"""
- if concurrent_builds:
- ret += """\
-<p>
- All builds for this change set:
- <ul>{concurrent_builds}</ul>
-</p>
-"""
-
- if other_builds:
- ret += """\
-<p>
- Other build sets for this change:
- <ul>{other_builds}</ul>
-</p>
-"""
- if result:
- ret += """\
-<p>
- Reported result: <b>{result}</b>
-</p>
-"""
-
- ret = ret.format(**locals())
- return ret
-
-
class JobTree(object):
""" A JobTree represents an instance of one Job, and holds JobTrees
whose jobs should be run if that Job succeeds. A root node of a
@@ -233,34 +398,19 @@
return None
-class Project(object):
- def __init__(self, name):
- self.name = name
- self.job_trees = {} # Queue -> JobTree
-
- def __str__(self):
- return self.name
+class Build(object):
+ def __init__(self, job, uuid):
+ self.job = job
+ self.uuid = uuid
+ self.base_url = None
+ self.url = None
+ self.number = None
+ self.result = None
+ self.build_set = None
+ self.launch_time = time.time()
def __repr__(self):
- return '<Project %s>' % (self.name)
-
- def addQueue(self, name):
- self.job_trees[name] = JobTree(None)
- return self.job_trees[name]
-
- def hasQueue(self, name):
- if name in self.job_trees:
- return True
- return False
-
- def getJobTreeForQueue(self, name):
- return self.job_trees.get(name, None)
-
- def getJobs(self, queue_name):
- tree = self.getJobTreeForQueue(queue_name)
- if not tree:
- return []
- return tree.getJobs()
+ return '<Build %s of %s>' % (self.uuid, self.job.name)
class BuildSet(object):
@@ -294,111 +444,24 @@
return [self.builds.get(x) for x in keys]
-class Change(object):
- def __init__(self, queue_name, project):
- self.queue_name = queue_name
- self.project = project
- self.branch = None
- self.number = None
- self.url = None
- self.patchset = None
- self.refspec = None
- self.ref = None
- self.oldrev = None
- self.newrev = None
- self.reported = False
- self.needs_change = None
- self.needed_by_changes = []
- self.is_current_patchset = True
- self.can_merge = False
- self.is_merged = False
+class Changeish(object):
+ """Something like a change; either a change or a ref"""
+ is_reportable = False
+ def __init__(self, project):
+ self.project = project
self.build_sets = []
self.change_ahead = None
self.change_behind = None
self.current_build_set = BuildSet(self)
self.build_sets.append(self.current_build_set)
- def _id(self):
- if self.number:
- return '%s,%s' % (self.number, self.patchset)
- return self.newrev
-
- def __repr__(self):
- return '<Change 0x%x %s>' % (id(self), self._id())
-
def equals(self, other):
- if self.number:
- if (self.number == other.number and
- self.patchset == other.patchset):
- return True
- return False
- if self.ref:
- if (self.ref == other.ref and
- self.newrev == other.newrev):
- return True
- return False
- return False
+ raise NotImplementedError()
- def _filterJobs(self, jobs):
+ def filterJobs(self, jobs):
return filter(lambda job: job.changeMatches(self), jobs)
- def formatStatus(self, indent=0, html=False):
- indent_str = ' ' * indent
- ret = ''
- if html and self.url is not None:
- ret += '%sProject %s change <a href="%s">%s</a>\n' % (indent_str,
- self.project.name,
- self.url,
- self._id())
- else:
- ret += '%sProject %s change %s\n' % (indent_str,
- self.project.name,
- self._id())
- for job in self._filterJobs(self.project.getJobs(self.queue_name)):
- build = self.current_build_set.getBuild(job.name)
- if build:
- result = build.result
- else:
- result = None
- job_name = job.name
- if html:
- if build:
- url = build.url
- else:
- url = None
- if url is not None:
- job_name = '<a href="%s">%s</a>' % (url, job_name)
- ret += '%s %s: %s' % (indent_str, job_name, result)
- ret += '\n'
- if self.change_ahead:
- ret += '%sWaiting on:\n' % (indent_str)
- ret += self.change_ahead.formatStatus(indent + 2, html)
- return ret
-
- def formatReport(self):
- ret = ''
- if self.didAllJobsSucceed():
- ret += 'Build successful\n\n'
- else:
- ret += 'Build failed\n\n'
-
- for job in self._filterJobs(self.project.getJobs(self.queue_name)):
- build = self.current_build_set.getBuild(job.name)
- result = build.result
- if result == 'SUCCESS' and job.success_message:
- result = job.success_message
- elif result == 'FAILURE' and job.failure_message:
- result = job.failure_message
- url = build.url
- if not url:
- url = job.name
- ret += '- %s : %s\n' % (url, result)
- return ret
-
- def setReportedResult(self, result):
- self.current_build_set.result = result
-
def resetAllBuilds(self):
old = self.current_build_set
self.current_build_set.result = 'CANCELED'
@@ -410,88 +473,6 @@
def addBuild(self, build):
self.current_build_set.addBuild(build)
- def setResult(self, build):
- if build.result != 'SUCCESS':
- # Get a JobTree from a Job so we can find only its dependent jobs
- root = self.project.getJobTreeForQueue(self.queue_name)
- tree = root.getJobTreeForJob(build.job)
- for job in tree.getJobs():
- fakebuild = Build(job, None)
- fakebuild.result = 'SKIPPED'
- self.addBuild(fakebuild)
-
- def isHoldingFollowingChanges(self):
- tree = self.project.getJobTreeForQueue(self.queue_name)
- for job in self._filterJobs(tree.getJobs()):
- if not job.hold_following_changes:
- continue
- build = self.current_build_set.getBuild(job.name)
- if not build:
- return True
- if build.result != 'SUCCESS':
- return True
- if not self.change_ahead:
- return False
- return self.change_ahead.isHoldingFollowingChanges()
-
- def _findJobsToRun(self, job_trees):
- torun = []
- if self.change_ahead:
- # Only run our jobs if any 'hold' jobs on the change ahead
- # have completed successfully.
- if self.change_ahead.isHoldingFollowingChanges():
- return []
- for tree in job_trees:
- job = tree.job
- if not job.changeMatches(self):
- continue
- result = None
- if job:
- build = self.current_build_set.getBuild(job.name)
- if build:
- result = build.result
- else:
- # There is no build for the root of this job tree,
- # so we should run it.
- torun.append(job)
- # If there is no job, this is a null job tree, and we should
- # run all of its jobs.
- if result == 'SUCCESS' or not job:
- torun.extend(self._findJobsToRun(tree.job_trees))
- return torun
-
- def findJobsToRun(self):
- tree = self.project.getJobTreeForQueue(self.queue_name)
- if not tree:
- return []
- return self._findJobsToRun(tree.job_trees)
-
- def areAllJobsComplete(self):
- tree = self.project.getJobTreeForQueue(self.queue_name)
- for job in self._filterJobs(tree.getJobs()):
- build = self.current_build_set.getBuild(job.name)
- if not build or not build.result:
- return False
- return True
-
- def didAllJobsSucceed(self):
- tree = self.project.getJobTreeForQueue(self.queue_name)
- for job in self._filterJobs(tree.getJobs()):
- build = self.current_build_set.getBuild(job.name)
- if not build:
- return False
- if build.result != 'SUCCESS':
- return False
- return True
-
- def didAnyJobFail(self):
- tree = self.project.getJobTreeForQueue(self.queue_name)
- for job in self._filterJobs(tree.getJobs()):
- build = self.current_build_set.getBuild(job.name)
- if build and build.result == 'FAILURE':
- return True
- return False
-
def delete(self):
if self.change_behind:
self.change_behind.change_ahead = None
@@ -499,6 +480,58 @@
self.queue.dequeueChange(self)
+class Change(Changeish):
+ is_reportable = True
+
+ def __init__(self, project):
+ super(Change, self).__init__(project)
+ self.branch = None
+ self.number = None
+ self.url = None
+ self.patchset = None
+ self.refspec = None
+
+ self.reported = False
+ self.needs_change = None
+ self.needed_by_changes = []
+ self.is_current_patchset = True
+ self.can_merge = False
+ self.is_merged = False
+
+ def _id(self):
+ if self.number:
+ return '%s,%s' % (self.number, self.patchset)
+ return self.newrev
+
+ def __repr__(self):
+ return '<Change 0x%x %s>' % (id(self), self._id())
+
+ def equals(self, other):
+ if (self.number == other.number and
+ self.patchset == other.patchset):
+ return True
+ return False
+
+ def setReportedResult(self, result):
+ self.current_build_set.result = result
+
+
+class Ref(Changeish):
+ is_reportable = False
+
+ def __init__(self, project):
+ super(Change, self).__init__(project)
+ self.ref = None
+ self.oldrev = None
+ self.newrev = None
+
+ def equals(self, other):
+ if (self.ref == other.ref and
+ self.newrev == other.newrev):
+ return True
+ return False
+
+
class TriggerEvent(object):
def __init__(self):
self.data = None
@@ -532,16 +565,15 @@
return ret
- def getChange(self, manager_name, project, trigger):
+ def getChange(self, project, trigger):
# TODO: make the scheduler deal with events (which may have
# changes) rather than changes so that we don't have to create
# "fake" changes for events that aren't associated with changes.
if self.change_number:
- change = trigger.getChange(self.change_number, self.patch_number,
- manager_name)
+ change = trigger.getChange(self.change_number, self.patch_number)
if self.ref:
- change = Change(manager_name, project)
+ change = Ref(project)
change.ref = self.ref
change.oldrev = self.oldrev
change.newrev = self.newrev
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 1aad0ff..ee32d31 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -20,7 +20,7 @@
import threading
import yaml
-from model import Job, Project, ChangeQueue, EventFilter
+from model import Pipeline, Job, Project, ChangeQueue, EventFilter
class Scheduler(threading.Thread):
@@ -43,7 +43,7 @@
self._init()
def _init(self):
- self.queue_managers = {}
+ self.pipelines = {}
self.jobs = {}
self.projects = {}
self.metajobs = {}
@@ -78,14 +78,16 @@
fn = os.path.expanduser(fn)
execfile(fn, self._config_env)
- for config_queue in data['queues']:
- manager = globals()[config_queue['manager']](self,
- config_queue['name'])
- self.queue_managers[config_queue['name']] = manager
- manager.success_action = config_queue.get('success')
- manager.failure_action = config_queue.get('failure')
- manager.start_action = config_queue.get('start')
- for trigger in toList(config_queue['trigger']):
+ for conf_pipeline in data.get('pipelines', []):
+ pipeline = Pipeline(conf_pipeline['name'])
+ manager = globals()[conf_pipeline['manager']](self, pipeline)
+ pipeline.setManager(manager)
+
+ self.pipelines[conf_pipeline['name']] = pipeline
+ manager.success_action = conf_pipeline.get('success')
+ manager.failure_action = conf_pipeline.get('failure')
+ manager.start_action = conf_pipeline.get('start')
+ for trigger in toList(conf_pipeline['trigger']):
approvals = {}
for approval_dict in toList(trigger.get('approval')):
for k, v in approval_dict.items():
@@ -95,7 +97,7 @@
refs=toList(trigger.get('ref')),
approvals=approvals,
comment_filters=toList(
- trigger.get('comment_filter')))
+ trigger.get('comment_filter')))
manager.event_filters.append(f)
for config_job in data['jobs']:
@@ -137,10 +139,10 @@
for config_project in data['projects']:
project = Project(config_project['name'])
self.projects[config_project['name']] = project
- for qname in self.queue_managers.keys():
- if qname in config_project:
- job_tree = project.addQueue(qname)
- config_jobs = config_project[qname]
+ for pipeline in self.pipelines.values():
+ if pipeline.name in config_project:
+ job_tree = pipeline.addProject(project)
+ config_jobs = config_project[pipeline.name]
add_jobs(job_tree, config_jobs)
# All jobs should be defined at this point, get rid of
@@ -149,8 +151,8 @@
# TODO(jeblair): check that we don't end up with jobs like
# "foo - bar" because a ':' is missing in the yaml for a dependent job
- for manager in self.queue_managers.values():
- manager._postConfig()
+ for pipeline in self.pipelines.values():
+ pipeline.manager._postConfig()
def getJob(self, name):
if name in self.jobs:
@@ -275,9 +277,9 @@
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
- for manager in self.queue_managers.values():
- for build in manager.building_jobs.keys():
- self.log.debug("%s waiting on %s" % (manager, build))
+ for pipeline in self.pipelines.values():
+ for build in pipeline.manager.building_jobs.keys():
+ self.log.debug("%s waiting on %s" % (pipeline.manager, build))
waiting = True
if not waiting:
self.log.debug("All builds are complete")
@@ -325,49 +327,49 @@
self.log.warning("Project %s not found" % event.project_name)
return
- for manager in self.queue_managers.values():
- if not manager.eventMatches(event):
- self.log.debug("Event %s ignored by %s" % (event, manager))
+ for pipeline in self.pipelines.values():
+ if not pipeline.manager.eventMatches(event):
+ self.log.debug("Event %s ignored by %s" % (event, pipeline))
continue
- change = event.getChange(manager.name, project, self.trigger)
+ change = event.getChange(project, self.trigger)
self.log.info("Adding %s, %s to %s" %
- (project, change, manager))
- manager.addChange(change)
+ (project, change, pipeline))
+ pipeline.manager.addChange(change)
def process_result_queue(self):
self.log.debug("Fetching result event")
event_type, build = self.result_event_queue.get()
self.log.debug("Processing result event %s" % build)
- for manager in self.queue_managers.values():
+ for pipeline in self.pipelines.values():
if event_type == 'started':
- if manager.onBuildStarted(build):
+ if pipeline.manager.onBuildStarted(build):
return
elif event_type == 'completed':
- if manager.onBuildCompleted(build):
+ if pipeline.manager.onBuildCompleted(build):
return
self.log.warning("Build %s not found by any queue manager" % (build))
def formatStatusHTML(self):
ret = '<html><pre>'
- keys = self.queue_managers.keys()
+ keys = self.pipelines.keys()
keys.sort()
for key in keys:
- manager = self.queue_managers[key]
- s = 'Queue: %s' % manager.name
+ pipeline = self.pipelines[key]
+ s = 'Pipeline: %s' % pipeline.name
ret += s + '\n'
ret += '-' * len(s) + '\n'
- ret += manager.formatStatusHTML()
+ ret += pipeline.formatStatusHTML()
ret += '\n'
ret += '</pre></html>'
return ret
-class BaseQueueManager(object):
- log = logging.getLogger("zuul.BaseQueueManager")
+class BasePipelineManager(object):
+ log = logging.getLogger("zuul.BasePipelineManager")
- def __init__(self, sched, name):
+ def __init__(self, sched, pipeline):
self.sched = sched
- self.name = name
+ self.pipeline = pipeline
self.building_jobs = {}
self.event_filters = []
self.success_action = {}
@@ -378,7 +380,7 @@
return "<%s %s>" % (self.__class__.__name__, self.name)
def _postConfig(self):
- self.log.info("Configured Queue Manager %s" % self.name)
+ self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
self.log.info(" Events:")
for e in self.event_filters:
self.log.info(" %s" % e)
@@ -403,9 +405,10 @@
log_jobs(x, indent + 2)
for p in self.sched.projects.values():
- if p.hasQueue(self.name):
+ tree = self.pipeline.getJobTree(p)
+ if tree:
self.log.info(" %s" % p)
- log_jobs(p.getJobTreeForQueue(self.name))
+ log_jobs(tree)
if self.start_action:
self.log.info(" On start:")
self.log.info(" %s" % self.start_action)
@@ -421,7 +424,10 @@
# "needed" in the submit records for a change, with respect
# to this queue. In other words, the list of review labels
# this queue itself is likely to set before submitting.
- return self.success_action.keys()
+ if self.success_action:
+ return self.success_action.keys()
+ else:
+ return {}
def eventMatches(self, event):
for ef in self.event_filters:
@@ -447,7 +453,7 @@
try:
self.log.info("Reporting start, action %s change %s" %
(self.start_action, change))
- msg = "Starting %s jobs." % self.name
+ msg = "Starting %s jobs." % self.pipeline.name
ret = self.sched.trigger.report(change, msg, self.start_action)
if ret:
self.log.error("Reporting change start %s received: %s" %
@@ -458,7 +464,7 @@
def launchJobs(self, change):
self.log.debug("Launching jobs for change %s" % change)
- for job in change.findJobsToRun():
+ for job in self.pipeline.findJobsToRun(change):
self.log.debug("Found job %s for change %s" % (job, change))
try:
build = self.sched.launcher.launch(job, change)
@@ -472,12 +478,12 @@
def updateBuildDescriptions(self, build_set):
for build in build_set.getBuilds():
- desc = build.formatDescription()
+ desc = self.pipeline.formatDescription(build)
self.sched.launcher.setBuildDescription(build, desc)
if build_set.previous_build_set:
for build in build_set.previous_build_set.getBuilds():
- desc = build.formatDescription()
+ desc = self.pipeline.formatDescription(build)
self.sched.launcher.setBuildDescription(build, desc)
def onBuildStarted(self, build):
@@ -504,11 +510,11 @@
del self.building_jobs[build]
- change.setResult(build)
+ self.pipeline.setResult(change, build)
self.log.info("Change %s status is now:\n %s" %
- (change, change.formatStatus()))
+ (change, self.pipeline.formatStatus(change)))
- if change.areAllJobsComplete():
+ if self.pipeline.areAllJobsComplete(change):
self.log.debug("All jobs for change %s are complete" % change)
self.possiblyReportChange(change)
else:
@@ -525,11 +531,13 @@
self.reportChange(change)
def reportChange(self, change):
- self.log.debug("Reporting change %s" % change)
+ if not change.is_reportable:
+ return False
if change.reported:
return 0
+ self.log.debug("Reporting change %s" % change)
ret = None
- if change.didAllJobsSucceed():
+ if self.pipeline.didAllJobsSucceed(change):
action = self.success_action
change.setReportedResult('SUCCESS')
else:
@@ -539,7 +547,7 @@
self.log.info("Reporting change %s, action: %s" %
(change, action))
ret = self.sched.trigger.report(change,
- change.formatReport(),
+ self.pipeline.formatReport(change),
action)
if ret:
self.log.error("Reporting change %s received: %s" %
@@ -563,35 +571,34 @@
changes = self.getChangesInQueue()
ret = ''
for change in changes:
- ret += change.formatStatus(html=True)
+ ret += self.pipeline.formatStatus(change, html=True)
return ret
-class IndependentQueueManager(BaseQueueManager):
- log = logging.getLogger("zuul.IndependentQueueManager")
+class IndependentPipelineManager(BasePipelineManager):
+ log = logging.getLogger("zuul.IndependentPipelineManager")
-class DependentQueueManager(BaseQueueManager):
- log = logging.getLogger("zuul.DependentQueueManager")
+class DependentPipelineManager(BasePipelineManager):
+ log = logging.getLogger("zuul.DependentPipelineManager")
def __init__(self, *args, **kwargs):
- super(DependentQueueManager, self).__init__(*args, **kwargs)
+ super(DependentPipelineManager, self).__init__(*args, **kwargs)
self.change_queues = []
def _postConfig(self):
- super(DependentQueueManager, self)._postConfig()
+ super(DependentPipelineManager, self)._postConfig()
self.buildChangeQueues()
def buildChangeQueues(self):
self.log.debug("Building shared change queues")
change_queues = []
- for project in self.sched.projects.values():
- if project.hasQueue(self.name):
- change_queue = ChangeQueue(self.name)
- change_queue.addProject(project)
- change_queues.append(change_queue)
- self.log.debug("Created queue: %s" % change_queue)
+ for project in self.pipeline.getProjects():
+ change_queue = ChangeQueue(self.pipeline)
+ change_queue.addProject(project)
+ change_queues.append(change_queue)
+ self.log.debug("Created queue: %s" % change_queue)
self.log.debug("Combining shared queues")
new_change_queues = []
@@ -622,6 +629,9 @@
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
+ if not hasattr(change, 'needs_change'):
+ self.log.debug(" Changeish does not support dependencies")
+ return True
if not change.needs_change:
self.log.debug(" No changes needed")
return True
@@ -635,7 +645,8 @@
if change.needs_change in change_queue.queue:
self.log.debug(" Needed change is already ahead in the queue")
return True
- if change.needs_change.can_merge:
+ if self.sched.trigger.canMerge(change.needs_change,
+ self.getSubmitAllowNeeds()):
# It can merge, so attempt to enqueue it _ahead_ of this change.
# If that works we can enqueue this change, otherwise, we can't.
self.log.debug(" Change %s must be merged ahead of %s" %
@@ -649,8 +660,12 @@
def _checkForChangesNeeding(self, change):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
+ if not hasattr(change, 'needed_by_changes'):
+ self.log.debug(" Changeish does not support dependencies")
+ return to_enqueue
for needs in change.needed_by_changes:
- if needs.can_merge:
+ if self.sched.trigger.canMerge(needs,
+ self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(needs, change))
to_enqueue.append(needs)
@@ -664,7 +679,8 @@
self.log.debug("Change %s is already in queue, ignoring" % change)
return True
- if not change.can_merge:
+ if not self.sched.trigger.canMerge(change,
+ self.getSubmitAllowNeeds()):
self.log.debug("Change %s can not merge, ignoring" % change)
return False
@@ -705,7 +721,7 @@
def launchJobs(self, change):
self.log.debug("Launching jobs for change %s" % change)
dependent_changes = self._getDependentChanges(change)
- for job in change.findJobsToRun():
+ for job in self.pipeline.findJobsToRun(change):
self.log.debug("Found job %s for change %s" % (job, change))
try:
build = self.sched.launcher.launch(job,
@@ -749,9 +765,10 @@
def onBuildCompleted(self, build):
change = self.building_jobs.get(build)
- if not super(DependentQueueManager, self).onBuildCompleted(build):
+ if not super(DependentPipelineManager, self).onBuildCompleted(build):
return False
- if change and change.change_behind and change.didAnyJobFail():
+ if (change and change.change_behind and
+ self.pipeline.didAnyJobFail(change)):
# This or some other build failed. All changes behind this change
# will need to be retested. To free up resources cancel the builds
# behind this one as they will be rerun anyways.
@@ -775,7 +792,7 @@
merged = (not ret)
if merged:
merged = self.sched.trigger.isMerged(change, change.branch)
- succeeded = change.didAllJobsSucceed()
+ succeeded = self.pipeline.didAllJobsSucceed(change)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (change, succeeded, merged))
@@ -790,7 +807,7 @@
self.cancelJobs(change_behind)
self.launchJobs(change_behind)
# If the change behind this is ready, notify
- if change_behind and change_behind.areAllJobsComplete():
+ if change_behind and self.pipeline.areAllJobsComplete(change_behind):
self.log.info("Change %s behind change %s is ready, "
"possibly reporting" % (change_behind, change))
self.possiblyReportChange(change_behind)
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 721be7b..003e96a 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -170,7 +170,7 @@
if status == 'MERGED' or status == 'SUBMITTED':
return True
- def _canMerge(self, change, allow_needs):
+ def canMerge(self, change, allow_needs):
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
@@ -206,15 +206,13 @@
return False
return True
- def getChange(self, number, patchset, queue_name, changes=None):
- # TODO: queue_name is screwing up the data model, refactor
- # the queue context so it isn't necessary.
+ def getChange(self, number, patchset, changes=None):
self.log.info("Getting information for %s,%s" % (number, patchset))
if changes is None:
changes = {}
data = self.gerrit.query(number)
project = self.sched.projects[data['project']]
- change = Change(queue_name, project)
+ change = Change(project)
change._data = data
change.number = number
@@ -233,9 +231,6 @@
else:
change.is_current_patchset = False
- manager = self.sched.queue_managers[queue_name]
- change.can_merge = self._canMerge(change,
- manager.getSubmitAllowNeeds())
change.is_merged = self._isMerged(change)
if change.is_merged:
# This change is merged, so we don't need to look any further
@@ -249,7 +244,7 @@
key = '%s,%s' % (num, ps)
if key in changes:
return changes.get(key)
- c = self.getChange(num, ps, queue_name, changes)
+ c = self.getChange(num, ps, changes)
return c
if 'dependsOn' in data: