Add pipelines to data model.

This is a refactoring of the data model with the following goals:

 * Call top-level queues pipelines -- because too many other things
   are already called queues.  Pipelines convey the idea that there
   are a number of tasks to be performed (jobs), and that those
   tasks can be applied to different changes in parallel.
 * Eliminate references to queue_name from within a Change.
   Instead, methods that need to understand the pipeline and were
   previously accessed via the change now live in the Pipeline
   class and take a change as an argument.  Essentially, many
   methods involving changes (and builds, jobs, etc.) must now be
   called in the context of a pipeline.
 * Add a Changeish base class to encompass what change and ref
   events have in common (see the sketch below).

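A rough sketch of the resulting usage (illustration only, not code
from this change; sched and project stand in for an existing
Scheduler and Project):

    # The pipeline owns the project -> JobTree mapping and answers
    # questions about a change, instead of the change carrying a
    # queue_name around.
    pipeline = Pipeline('gate')
    pipeline.setManager(DependentPipelineManager(sched, pipeline))
    job_tree = pipeline.addProject(project)

    # Changeish is the common base; Change and Ref specialize it.
    change = Change(project)   # e.g. built from a comment-added event
    ref = Ref(project)         # e.g. built from a ref-updated event

    # Methods formerly on Change now take the changeish as an argument.
    jobs = pipeline.findJobsToRun(change)
    done = pipeline.areAllJobsComplete(change)
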
Change-Id: Iaf8ed0991f3c5b2bf7ded2c340a60725f7f98eaf
Reviewed-on: https://review.openstack.org/10757
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 9f00375..5e2049b 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -1,6 +1,6 @@
-queues:
+pipelines:
   - name: check
-    manager: IndependentQueueManager
+    manager: IndependentPipelineManager
     trigger:
       - event: patchset-uploaded
     success:
@@ -9,13 +9,13 @@
       verified: -1
 
   - name: post
-    manager: IndependentQueueManager
+    manager: IndependentPipelineManager
     trigger:
       - event: ref-updated
         ref: ^(?!refs/).*$
 
   - name: gate
-    manager: DependentQueueManager
+    manager: DependentPipelineManager
     trigger:
       - event: comment-added
         approval: 
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index a61e13c..fae05d0 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -916,15 +916,16 @@
         "Test that whether a change is ready to merge"
         # TODO: move to test_gerrit (this is a unit test!)
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
-        a = self.sched.trigger.getChange(1, 2, 'gate')
-        assert not a.can_merge
+        a = self.sched.trigger.getChange(1, 2)
+        mgr = self.sched.pipelines['gate'].manager
+        assert not self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
 
         A.addApproval('CRVW', 2)
-        a = self.sched.trigger.getChange(1, 2, 'gate')
-        assert not a.can_merge
+        a = self.sched.trigger.getChange(1, 2)
+        assert not self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
 
         A.addApproval('APRV', 1)
-        a = self.sched.trigger.getChange(1, 2, 'gate')
-        assert a.can_merge
+        a = self.sched.trigger.getChange(1, 2)
+        assert self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
 
         return True
diff --git a/zuul/launcher/jenkins.py b/zuul/launcher/jenkins.py
index 805657e..542b1c4 100644
--- a/zuul/launcher/jenkins.py
+++ b/zuul/launcher/jenkins.py
@@ -211,13 +211,13 @@
         uuid = str(uuid1())
         params = dict(UUID=uuid,
                       GERRIT_PROJECT=change.project.name)
-        if change.refspec:
+        if hasattr(change, 'refspec'):
             changes_str = '^'.join(
                 ['%s:%s:%s' % (c.project.name, c.branch, c.refspec)
                  for c in dependent_changes + [change]])
             params['GERRIT_BRANCH'] = change.branch
             params['GERRIT_CHANGES'] = changes_str
-        if change.ref:
+        if hasattr(change, 'ref'):
             params['GERRIT_REFNAME'] = change.ref
             params['GERRIT_OLDREV'] = change.oldrev
             params['GERRIT_NEWREV'] = change.newrev
diff --git a/zuul/model.py b/zuul/model.py
index b0c47c8..0839668 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -16,16 +16,283 @@
 import time
 
 
+class Pipeline(object):
+    """A top-level pipeline such as check, gate, post, etc."""
+    def __init__(self, name):
+        self.name = name
+        self.job_trees = {}  # project -> JobTree
+        self.manager = None
+
+    def setManager(self, manager):
+        self.manager = manager
+
+    def addProject(self, project):
+        job_tree = JobTree(None)  # Null job == job tree root
+        self.job_trees[project] = job_tree
+        return job_tree
+
+    def getProjects(self):
+        return self.job_trees.keys()
+
+    def getJobTree(self, project):
+        tree = self.job_trees.get(project)
+        return tree
+
+    def getJobs(self, changeish):
+        tree = self.job_trees[changeish.project]
+        if not tree:
+            return []
+        return changeish.filterJobs(tree.getJobs())
+
+    def _findJobsToRun(self, job_trees, changeish):
+        torun = []
+        if changeish.change_ahead:
+            # Only run jobs if all 'hold' jobs on the change ahead
+            # have completed successfully.
+            if self.isHoldingFollowingChanges(changeish.change_ahead):
+                return []
+        for tree in job_trees:
+            job = tree.job
+            result = None
+            if job:
+                if not job.changeMatches(changeish):
+                    continue
+                build = changeish.current_build_set.getBuild(job.name)
+                if build:
+                    result = build.result
+                else:
+                    # There is no build for the root of this job tree,
+                    # so we should run it.
+                    torun.append(job)
+            # If there is no job, this is a null job tree, and we should
+            # run all of its jobs.
+            if result == 'SUCCESS' or not job:
+                torun.extend(self._findJobsToRun(tree.job_trees, changeish))
+        return torun
+
+    def findJobsToRun(self, changeish):
+        tree = self.job_trees[changeish.project]
+        if not tree:
+            return []
+        return self._findJobsToRun(tree.job_trees, changeish)
+
+    def areAllJobsComplete(self, changeish):
+        for job in self.getJobs(changeish):
+            build = changeish.current_build_set.getBuild(job.name)
+            if not build or not build.result:
+                return False
+        return True
+
+    def didAllJobsSucceed(self, changeish):
+        for job in self.getJobs(changeish):
+            build = changeish.current_build_set.getBuild(job.name)
+            if not build:
+                return False
+            if build.result != 'SUCCESS':
+                return False
+        return True
+
+    def didAnyJobFail(self, changeish):
+        for job in self.getJobs(changeish):
+            build = changeish.current_build_set.getBuild(job.name)
+            if build and build.result == 'FAILURE':
+                return True
+        return False
+
+    def isHoldingFollowingChanges(self, changeish):
+        for job in self.getJobs(changeish):
+            if not job.hold_following_changes:
+                continue
+            build = changeish.current_build_set.getBuild(job.name)
+            if not build:
+                return True
+            if build.result != 'SUCCESS':
+                return True
+        if not changeish.change_ahead:
+            return False
+        return self.isHoldingFollowingChanges(changeish.change_ahead)
+
+    def formatStatus(self, changeish, indent=0, html=False):
+        indent_str = ' ' * indent
+        ret = ''
+        if html and changeish.url is not None:
+            ret += '%sProject %s change <a href="%s">%s</a>\n' % (
+                indent_str,
+                changeish.project.name,
+                changeish.url,
+                changeish._id())
+        else:
+            ret += '%sProject %s change %s\n' % (indent_str,
+                                                 changeish.project.name,
+                                                 changeish._id())
+        for job in self.getJobs(changeish):
+            build = changeish.current_build_set.getBuild(job.name)
+            if build:
+                result = build.result
+            else:
+                result = None
+            job_name = job.name
+            if html:
+                if build:
+                    url = build.url
+                else:
+                    url = None
+                if url is not None:
+                    job_name = '<a href="%s">%s</a>' % (url, job_name)
+            ret += '%s  %s: %s' % (indent_str, job_name, result)
+            ret += '\n'
+        if changeish.change_ahead:
+            ret += '%sWaiting on:\n' % (indent_str)
+            ret += self.formatStatus(changeish.change_ahead,
+                                     indent + 2, html)
+        return ret
+
+    def formatReport(self, changeish):
+        ret = ''
+        if self.didAllJobsSucceed(changeish):
+            ret += 'Build successful\n\n'
+        else:
+            ret += 'Build failed\n\n'
+
+        for job in self.getJobs(changeish):
+            build = changeish.current_build_set.getBuild(job.name)
+            result = build.result
+            if result == 'SUCCESS' and job.success_message:
+                result = job.success_message
+            elif result == 'FAILURE' and job.failure_message:
+                result = job.failure_message
+            url = build.url
+            if not url:
+                url = job.name
+            ret += '- %s : %s\n' % (url, result)
+        return ret
+
+    def formatDescription(self, build):
+        concurrent_changes = ''
+        concurrent_builds = ''
+        other_builds = ''
+
+        for change in build.build_set.other_changes:
+            concurrent_changes += '<li><a href="{change.url}">\
+              {change.number},{change.patchset}</a></li>'.format(
+                change=change)
+
+        change = build.build_set.change
+
+        for b in build.build_set.getBuilds():
+            if b.base_url:
+                concurrent_builds += """\
+<li>
+  <a href="{build.base_url}">
+  {build.job.name} #{build.number}</a>: {build.result}
+</li>
+""".format(build=b)
+            else:
+                concurrent_builds += """\
+<li>
+  {build.job.name}: {build.result}
+</li>""".format(build=b)
+
+        if build.build_set.previous_build_set:
+            other_build = build.build_set.previous_build_set.getBuild(
+                build.job.name)
+            if other_build:
+                other_builds += """\
+<li>
+  Preceded by: <a href="{build.base_url}">
+  {build.job.name} #{build.number}</a>
+</li>
+""".format(build=other_build)
+
+        if build.build_set.next_build_set:
+            other_build = build.build_set.next_build_set.getBuild(
+                build.job.name)
+            if other_build:
+                other_builds += """\
+<li>
+  Succeeded by: <a href="{build.base_url}">
+  {build.job.name} #{build.number}</a>
+</li>
+""".format(build=other_build)
+
+        result = build.build_set.result
+
+        if change.number:
+            ret = """\
+<p>
+  Triggered by change:
+    <a href="{change.url}">{change.number},{change.patchset}</a><br/>
+  Branch: <b>{change.branch}</b><br/>
+  Pipeline: <b>{self.name}</b>
+</p>"""
+        else:
+            ret = """\
+<p>
+  Triggered by reference:
+    {change.ref}</a><br/>
+  Old revision: <b>{change.oldrev}</b><br/>
+  New revision: <b>{change.newrev}</b><br/>
+  Pipeline: <b>{self.name}</b>
+</p>"""
+
+        if concurrent_changes:
+            ret += """\
+<p>
+  Other changes tested concurrently with this change:
+  <ul>{concurrent_changes}</ul>
+</p>
+"""
+        if concurrent_builds:
+            ret += """\
+<p>
+  All builds for this change set:
+  <ul>{concurrent_builds}</ul>
+</p>
+"""
+
+        if other_builds:
+            ret += """\
+<p>
+  Other build sets for this change:
+  <ul>{other_builds}</ul>
+</p>
+"""
+        if result:
+            ret += """\
+<p>
+  Reported result: <b>{result}</b>
+</p>
+"""
+
+        ret = ret.format(**locals())
+        return ret
+
+    def setResult(self, changeish, build):
+        if build.result != 'SUCCESS':
+            # Get a JobTree from a Job so we can find only its dependent jobs
+            root = self.getJobTree(changeish.project)
+            tree = root.getJobTreeForJob(build.job)
+            for job in tree.getJobs():
+                fakebuild = Build(job, None)
+                fakebuild.result = 'SKIPPED'
+                changeish.addBuild(fakebuild)
+
+
 class ChangeQueue(object):
-    def __init__(self, queue_name):
+    """DependentPipelines have multiple parallel queues shared by
+    different projects; this is one of them.  For instance, there may
+    be a queue shared by interrelated projects foo and bar, and a second
+    queue for independent project baz.  Pipelines have one or more
+    ChangeQueues."""
+    def __init__(self, pipeline):
+        self.pipeline = pipeline
         self.name = ''
-        self.queue_name = queue_name
         self.projects = []
         self._jobs = set()
         self.queue = []
 
     def __repr__(self):
-        return '<ChangeQueue %s: %s>' % (self.queue_name, self.name)
+        return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
 
     def getJobs(self):
         return self._jobs
@@ -36,7 +303,7 @@
             names = [x.name for x in self.projects]
             names.sort()
             self.name = ', '.join(names)
-            self._jobs |= set(project.getJobs(self.queue_name))
+            self._jobs |= set(self.pipeline.getJobTree(project).getJobs())
 
     def enqueueChange(self, change):
         if self.queue:
@@ -54,6 +321,17 @@
             self.addProject(project)
 
 
+class Project(object):
+    def __init__(self, name):
+        self.name = name
+
+    def __str__(self):
+        return self.name
+
+    def __repr__(self):
+        return '<Project %s>' % (self.name)
+
+
 class Job(object):
     def __init__(self, name):
         # If you add attributes here, be sure to add them to the copy method.
@@ -88,119 +366,6 @@
         return False
 
 
-class Build(object):
-    def __init__(self, job, uuid):
-        self.job = job
-        self.uuid = uuid
-        self.base_url = None
-        self.url = None
-        self.number = None
-        self.result = None
-        self.build_set = None
-        self.launch_time = time.time()
-
-    def __repr__(self):
-        return '<Build %s of %s>' % (self.uuid, self.job.name)
-
-    def formatDescription(self):
-        concurrent_changes = ''
-        concurrent_builds = ''
-        other_builds = ''
-
-        for change in self.build_set.other_changes:
-            concurrent_changes += '<li><a href="{change.url}">\
-              {change.number},{change.patchset}</a></li>'.format(
-                change=change)
-
-        change = self.build_set.change
-
-        for build in self.build_set.getBuilds():
-            if build.base_url:
-                concurrent_builds += """\
-<li>
-  <a href="{build.base_url}">
-  {build.job.name} #{build.number}</a>: {build.result}
-</li>
-""".format(build=build)
-            else:
-                concurrent_builds += """\
-<li>
-  {build.job.name}: {build.result}
-</li>""".format(build=build)
-
-        if self.build_set.previous_build_set:
-            build = self.build_set.previous_build_set.getBuild(self.job.name)
-            if build:
-                other_builds += """\
-<li>
-  Preceded by: <a href="{build.base_url}">
-  {build.job.name} #{build.number}</a>
-</li>
-""".format(build=build)
-
-        if self.build_set.next_build_set:
-            build = self.build_set.next_build_set.getBuild(self.job.name)
-            if build:
-                other_builds += """\
-<li>
-  Succeeded by: <a href="{build.base_url}">
-  {build.job.name} #{build.number}</a>
-</li>
-""".format(build=build)
-
-        result = self.build_set.result
-
-        if change.number:
-            ret = """\
-<p>
-  Triggered by change:
-    <a href="{change.url}">{change.number},{change.patchset}</a><br/>
-  Branch: <b>{change.branch}</b><br/>
-  Pipeline: <b>{change.queue_name}</b>
-</p>"""
-        else:
-            ret = """\
-<p>
-  Triggered by reference:
-    {change.ref}</a><br/>
-  Old revision: <b>{change.oldrev}</b><br/>
-  New revision: <b>{change.newrev}</b><br/>
-  Pipeline: <b>{change.queue_name}</b>
-</p>"""
-
-        if concurrent_changes:
-            ret += """\
-<p>
-  Other changes tested concurrently with this change:
-  <ul>{concurrent_changes}</ul>
-</p>
-"""
-        if concurrent_builds:
-            ret += """\
-<p>
-  All builds for this change set:
-  <ul>{concurrent_builds}</ul>
-</p>
-"""
-
-        if other_builds:
-            ret += """\
-<p>
-  Other build sets for this change:
-  <ul>{other_builds}</ul>
-</p>
-"""
-        if result:
-            ret += """\
-<p>
-  Reported result: <b>{result}</b>
-</p>
-"""
-
-        ret = ret.format(**locals())
-        return ret
-
-
 class JobTree(object):
     """ A JobTree represents an instance of one Job, and holds JobTrees
     whose jobs should be run if that Job succeeds.  A root node of a
@@ -233,34 +398,19 @@
         return None
 
 
-class Project(object):
-    def __init__(self, name):
-        self.name = name
-        self.job_trees = {}  # Queue -> JobTree
-
-    def __str__(self):
-        return self.name
+class Build(object):
+    def __init__(self, job, uuid):
+        self.job = job
+        self.uuid = uuid
+        self.base_url = None
+        self.url = None
+        self.number = None
+        self.result = None
+        self.build_set = None
+        self.launch_time = time.time()
 
     def __repr__(self):
-        return '<Project %s>' % (self.name)
-
-    def addQueue(self, name):
-        self.job_trees[name] = JobTree(None)
-        return self.job_trees[name]
-
-    def hasQueue(self, name):
-        if name in self.job_trees:
-            return True
-        return False
-
-    def getJobTreeForQueue(self, name):
-        return self.job_trees.get(name, None)
-
-    def getJobs(self, queue_name):
-        tree = self.getJobTreeForQueue(queue_name)
-        if not tree:
-            return []
-        return tree.getJobs()
+        return '<Build %s of %s>' % (self.uuid, self.job.name)
 
 
 class BuildSet(object):
@@ -294,111 +444,24 @@
         return [self.builds.get(x) for x in keys]
 
 
-class Change(object):
-    def __init__(self, queue_name, project):
-        self.queue_name = queue_name
-        self.project = project
-        self.branch = None
-        self.number = None
-        self.url = None
-        self.patchset = None
-        self.refspec = None
-        self.ref = None
-        self.oldrev = None
-        self.newrev = None
-        self.reported = False
-        self.needs_change = None
-        self.needed_by_changes = []
-        self.is_current_patchset = True
-        self.can_merge = False
-        self.is_merged = False
+class Changeish(object):
+    """Something like a change; either a change or a ref"""
+    is_reportable = False
 
+    def __init__(self, project):
+        self.project = project
         self.build_sets = []
         self.change_ahead = None
         self.change_behind = None
         self.current_build_set = BuildSet(self)
         self.build_sets.append(self.current_build_set)
 
-    def _id(self):
-        if self.number:
-            return '%s,%s' % (self.number, self.patchset)
-        return self.newrev
-
-    def __repr__(self):
-        return '<Change 0x%x %s>' % (id(self), self._id())
-
     def equals(self, other):
-        if self.number:
-            if (self.number == other.number and
-                self.patchset == other.patchset):
-                return True
-            return False
-        if self.ref:
-            if (self.ref == other.ref and
-                self.newrev == other.newrev):
-                return True
-            return False
-        return False
+        raise NotImplementedError()
 
-    def _filterJobs(self, jobs):
+    def filterJobs(self, jobs):
         return filter(lambda job: job.changeMatches(self), jobs)
 
-    def formatStatus(self, indent=0, html=False):
-        indent_str = ' ' * indent
-        ret = ''
-        if html and self.url is not None:
-            ret += '%sProject %s change <a href="%s">%s</a>\n' % (indent_str,
-                                                            self.project.name,
-                                                            self.url,
-                                                            self._id())
-        else:
-            ret += '%sProject %s change %s\n' % (indent_str,
-                                                 self.project.name,
-                                                 self._id())
-        for job in self._filterJobs(self.project.getJobs(self.queue_name)):
-            build = self.current_build_set.getBuild(job.name)
-            if build:
-                result = build.result
-            else:
-                result = None
-            job_name = job.name
-            if html:
-                if build:
-                    url = build.url
-                else:
-                    url = None
-                if url is not None:
-                    job_name = '<a href="%s">%s</a>' % (url, job_name)
-            ret += '%s  %s: %s' % (indent_str, job_name, result)
-            ret += '\n'
-        if self.change_ahead:
-            ret += '%sWaiting on:\n' % (indent_str)
-            ret += self.change_ahead.formatStatus(indent + 2, html)
-        return ret
-
-    def formatReport(self):
-        ret = ''
-        if self.didAllJobsSucceed():
-            ret += 'Build successful\n\n'
-        else:
-            ret += 'Build failed\n\n'
-
-        for job in self._filterJobs(self.project.getJobs(self.queue_name)):
-            build = self.current_build_set.getBuild(job.name)
-            result = build.result
-            if result == 'SUCCESS' and job.success_message:
-                result = job.success_message
-            elif result == 'FAILURE' and job.failure_message:
-                result = job.failure_message
-            url = build.url
-            if not url:
-                url = job.name
-            ret += '- %s : %s\n' % (url, result)
-        return ret
-
-    def setReportedResult(self, result):
-        self.current_build_set.result = result
-
     def resetAllBuilds(self):
         old = self.current_build_set
         self.current_build_set.result = 'CANCELED'
@@ -410,88 +473,6 @@
     def addBuild(self, build):
         self.current_build_set.addBuild(build)
 
-    def setResult(self, build):
-        if build.result != 'SUCCESS':
-            # Get a JobTree from a Job so we can find only its dependent jobs
-            root = self.project.getJobTreeForQueue(self.queue_name)
-            tree = root.getJobTreeForJob(build.job)
-            for job in tree.getJobs():
-                fakebuild = Build(job, None)
-                fakebuild.result = 'SKIPPED'
-                self.addBuild(fakebuild)
-
-    def isHoldingFollowingChanges(self):
-        tree = self.project.getJobTreeForQueue(self.queue_name)
-        for job in self._filterJobs(tree.getJobs()):
-            if not job.hold_following_changes:
-                continue
-            build = self.current_build_set.getBuild(job.name)
-            if not build:
-                return True
-            if build.result != 'SUCCESS':
-                return True
-        if not self.change_ahead:
-            return False
-        return self.change_ahead.isHoldingFollowingChanges()
-
-    def _findJobsToRun(self, job_trees):
-        torun = []
-        if self.change_ahead:
-            # Only run our jobs if any 'hold' jobs on the change ahead
-            # have completed successfully.
-            if self.change_ahead.isHoldingFollowingChanges():
-                return []
-        for tree in job_trees:
-            job = tree.job
-            if not job.changeMatches(self):
-                continue
-            result = None
-            if job:
-                build = self.current_build_set.getBuild(job.name)
-                if build:
-                    result = build.result
-                else:
-                    # There is no build for the root of this job tree,
-                    # so we should run it.
-                    torun.append(job)
-            # If there is no job, this is a null job tree, and we should
-            # run all of its jobs.
-            if result == 'SUCCESS' or not job:
-                torun.extend(self._findJobsToRun(tree.job_trees))
-        return torun
-
-    def findJobsToRun(self):
-        tree = self.project.getJobTreeForQueue(self.queue_name)
-        if not tree:
-            return []
-        return self._findJobsToRun(tree.job_trees)
-
-    def areAllJobsComplete(self):
-        tree = self.project.getJobTreeForQueue(self.queue_name)
-        for job in self._filterJobs(tree.getJobs()):
-            build = self.current_build_set.getBuild(job.name)
-            if not build or not build.result:
-                return False
-        return True
-
-    def didAllJobsSucceed(self):
-        tree = self.project.getJobTreeForQueue(self.queue_name)
-        for job in self._filterJobs(tree.getJobs()):
-            build = self.current_build_set.getBuild(job.name)
-            if not build:
-                return False
-            if build.result != 'SUCCESS':
-                return False
-        return True
-
-    def didAnyJobFail(self):
-        tree = self.project.getJobTreeForQueue(self.queue_name)
-        for job in self._filterJobs(tree.getJobs()):
-            build = self.current_build_set.getBuild(job.name)
-            if build and build.result == 'FAILURE':
-                return True
-        return False
-
     def delete(self):
         if self.change_behind:
             self.change_behind.change_ahead = None
@@ -499,6 +480,58 @@
         self.queue.dequeueChange(self)
 
 
+class Change(Changeish):
+    is_reportable = True
+
+    def __init__(self, project):
+        super(Change, self).__init__(project)
+        self.branch = None
+        self.number = None
+        self.url = None
+        self.patchset = None
+        self.refspec = None
+
+        self.reported = False
+        self.needs_change = None
+        self.needed_by_changes = []
+        self.is_current_patchset = True
+        self.can_merge = False
+        self.is_merged = False
+
+    def _id(self):
+        if self.number:
+            return '%s,%s' % (self.number, self.patchset)
+        return self.newrev
+
+    def __repr__(self):
+        return '<Change 0x%x %s>' % (id(self), self._id())
+
+    def equals(self, other):
+        if (self.number == other.number and
+            self.patchset == other.patchset):
+            return True
+        return False
+
+    def setReportedResult(self, result):
+        self.current_build_set.result = result
+
+
+class Ref(Changeish):
+    is_reportable = False
+
+    def __init__(self, project):
+        super(Ref, self).__init__(project)
+        self.ref = None
+        self.oldrev = None
+        self.newrev = None
+
+    def equals(self, other):
+        if (self.ref == other.ref and
+            self.newrev == other.newrev):
+            return True
+        return False
+
+
 class TriggerEvent(object):
     def __init__(self):
         self.data = None
@@ -532,16 +565,15 @@
 
         return ret
 
-    def getChange(self, manager_name, project, trigger):
+    def getChange(self, project, trigger):
         # TODO: make the scheduler deal with events (which may have
         # changes) rather than changes so that we don't have to create
         # "fake" changes for events that aren't associated with changes.
 
         if self.change_number:
-            change = trigger.getChange(self.change_number, self.patch_number,
-                                       manager_name)
+            change = trigger.getChange(self.change_number, self.patch_number)
         if self.ref:
-            change = Change(manager_name, project)
+            change = Ref(project)
             change.ref = self.ref
             change.oldrev = self.oldrev
             change.newrev = self.newrev
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 1aad0ff..ee32d31 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -20,7 +20,7 @@
 import threading
 import yaml
 
-from model import Job, Project, ChangeQueue, EventFilter
+from model import Pipeline, Job, Project, ChangeQueue, EventFilter
 
 
 class Scheduler(threading.Thread):
@@ -43,7 +43,7 @@
         self._init()
 
     def _init(self):
-        self.queue_managers = {}
+        self.pipelines = {}
         self.jobs = {}
         self.projects = {}
         self.metajobs = {}
@@ -78,14 +78,16 @@
                 fn = os.path.expanduser(fn)
                 execfile(fn, self._config_env)
 
-        for config_queue in data['queues']:
-            manager = globals()[config_queue['manager']](self,
-                                                         config_queue['name'])
-            self.queue_managers[config_queue['name']] = manager
-            manager.success_action = config_queue.get('success')
-            manager.failure_action = config_queue.get('failure')
-            manager.start_action = config_queue.get('start')
-            for trigger in toList(config_queue['trigger']):
+        for conf_pipeline in data.get('pipelines', []):
+            pipeline = Pipeline(conf_pipeline['name'])
+            manager = globals()[conf_pipeline['manager']](self, pipeline)
+            pipeline.setManager(manager)
+
+            self.pipelines[conf_pipeline['name']] = pipeline
+            manager.success_action = conf_pipeline.get('success')
+            manager.failure_action = conf_pipeline.get('failure')
+            manager.start_action = conf_pipeline.get('start')
+            for trigger in toList(conf_pipeline['trigger']):
                 approvals = {}
                 for approval_dict in toList(trigger.get('approval')):
                     for k, v in approval_dict.items():
@@ -95,7 +97,7 @@
                                 refs=toList(trigger.get('ref')),
                                 approvals=approvals,
                                 comment_filters=toList(
-                                        trigger.get('comment_filter')))
+                        trigger.get('comment_filter')))
                 manager.event_filters.append(f)
 
         for config_job in data['jobs']:
@@ -137,10 +139,10 @@
         for config_project in data['projects']:
             project = Project(config_project['name'])
             self.projects[config_project['name']] = project
-            for qname in self.queue_managers.keys():
-                if qname in config_project:
-                    job_tree = project.addQueue(qname)
-                    config_jobs = config_project[qname]
+            for pipeline in self.pipelines.values():
+                if pipeline.name in config_project:
+                    job_tree = pipeline.addProject(project)
+                    config_jobs = config_project[pipeline.name]
                     add_jobs(job_tree, config_jobs)
 
         # All jobs should be defined at this point, get rid of
@@ -149,8 +151,8 @@
 
         # TODO(jeblair): check that we don't end up with jobs like
         # "foo - bar" because a ':' is missing in the yaml for a dependent job
-        for manager in self.queue_managers.values():
-            manager._postConfig()
+        for pipeline in self.pipelines.values():
+            pipeline.manager._postConfig()
 
     def getJob(self, name):
         if name in self.jobs:
@@ -275,9 +277,9 @@
     def _areAllBuildsComplete(self):
         self.log.debug("Checking if all builds are complete")
         waiting = False
-        for manager in self.queue_managers.values():
-            for build in manager.building_jobs.keys():
-                self.log.debug("%s waiting on %s" % (manager, build))
+        for pipeline in self.pipelines.values():
+            for build in pipeline.manager.building_jobs.keys():
+                self.log.debug("%s waiting on %s" % (pipeline.manager, build))
                 waiting = True
         if not waiting:
             self.log.debug("All builds are complete")
@@ -325,49 +327,49 @@
             self.log.warning("Project %s not found" % event.project_name)
             return
 
-        for manager in self.queue_managers.values():
-            if not manager.eventMatches(event):
-                self.log.debug("Event %s ignored by %s" % (event, manager))
+        for pipeline in self.pipelines.values():
+            if not pipeline.manager.eventMatches(event):
+                self.log.debug("Event %s ignored by %s" % (event, pipeline))
                 continue
-            change = event.getChange(manager.name, project, self.trigger)
+            change = event.getChange(project, self.trigger)
             self.log.info("Adding %s, %s to %s" %
-                          (project, change, manager))
-            manager.addChange(change)
+                          (project, change, pipeline))
+            pipeline.manager.addChange(change)
 
     def process_result_queue(self):
         self.log.debug("Fetching result event")
         event_type, build = self.result_event_queue.get()
         self.log.debug("Processing result event %s" % build)
-        for manager in self.queue_managers.values():
+        for pipeline in self.pipelines.values():
             if event_type == 'started':
-                if manager.onBuildStarted(build):
+                if pipeline.manager.onBuildStarted(build):
                     return
             elif event_type == 'completed':
-                if manager.onBuildCompleted(build):
+                if pipeline.manager.onBuildCompleted(build):
                     return
         self.log.warning("Build %s not found by any queue manager" % (build))
 
     def formatStatusHTML(self):
         ret = '<html><pre>'
-        keys = self.queue_managers.keys()
+        keys = self.pipelines.keys()
         keys.sort()
         for key in keys:
-            manager = self.queue_managers[key]
-            s = 'Queue: %s' % manager.name
+            pipeline = self.pipelines[key]
+            s = 'Pipeline: %s' % pipeline.name
             ret += s + '\n'
             ret += '-' * len(s) + '\n'
-            ret += manager.formatStatusHTML()
+            ret += pipeline.formatStatusHTML()
             ret += '\n'
         ret += '</pre></html>'
         return ret
 
 
-class BaseQueueManager(object):
-    log = logging.getLogger("zuul.BaseQueueManager")
+class BasePipelineManager(object):
+    log = logging.getLogger("zuul.BasePipelineManager")
 
-    def __init__(self, sched, name):
+    def __init__(self, sched, pipeline):
         self.sched = sched
-        self.name = name
+        self.pipeline = pipeline
         self.building_jobs = {}
         self.event_filters = []
         self.success_action = {}
@@ -378,7 +380,7 @@
         return "<%s %s>" % (self.__class__.__name__, self.name)
 
     def _postConfig(self):
-        self.log.info("Configured Queue Manager %s" % self.name)
+        self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
         self.log.info("  Events:")
         for e in self.event_filters:
             self.log.info("    %s" % e)
@@ -403,9 +405,10 @@
                 log_jobs(x, indent + 2)
 
         for p in self.sched.projects.values():
-            if p.hasQueue(self.name):
+            tree = self.pipeline.getJobTree(p)
+            if tree:
                 self.log.info("    %s" % p)
-                log_jobs(p.getJobTreeForQueue(self.name))
+                log_jobs(tree)
         if self.start_action:
             self.log.info("  On start:")
             self.log.info("    %s" % self.start_action)
@@ -421,7 +424,10 @@
         # "needed" in the submit records for a change, with respect
         # to this queue.  In other words, the list of review labels
         # this queue itself is likely to set before submitting.
-        return self.success_action.keys()
+        if self.success_action:
+            return self.success_action.keys()
+        else:
+            return {}
 
     def eventMatches(self, event):
         for ef in self.event_filters:
@@ -447,7 +453,7 @@
             try:
                 self.log.info("Reporting start, action %s change %s" %
                               (self.start_action, change))
-                msg = "Starting %s jobs." % self.name
+                msg = "Starting %s jobs." % self.pipeline.name
                 ret = self.sched.trigger.report(change, msg, self.start_action)
                 if ret:
                     self.log.error("Reporting change start %s received: %s" %
@@ -458,7 +464,7 @@
 
     def launchJobs(self, change):
         self.log.debug("Launching jobs for change %s" % change)
-        for job in change.findJobsToRun():
+        for job in self.pipeline.findJobsToRun(change):
             self.log.debug("Found job %s for change %s" % (job, change))
             try:
                 build = self.sched.launcher.launch(job, change)
@@ -472,12 +478,12 @@
 
     def updateBuildDescriptions(self, build_set):
         for build in build_set.getBuilds():
-            desc = build.formatDescription()
+            desc = self.pipeline.formatDescription(build)
             self.sched.launcher.setBuildDescription(build, desc)
 
         if build_set.previous_build_set:
             for build in build_set.previous_build_set.getBuilds():
-                desc = build.formatDescription()
+                desc = self.pipeline.formatDescription(build)
                 self.sched.launcher.setBuildDescription(build, desc)
 
     def onBuildStarted(self, build):
@@ -504,11 +510,11 @@
 
         del self.building_jobs[build]
 
-        change.setResult(build)
+        self.pipeline.setResult(change, build)
         self.log.info("Change %s status is now:\n %s" %
-                      (change, change.formatStatus()))
+                      (change, self.pipeline.formatStatus(change)))
 
-        if change.areAllJobsComplete():
+        if self.pipeline.areAllJobsComplete(change):
             self.log.debug("All jobs for change %s are complete" % change)
             self.possiblyReportChange(change)
         else:
@@ -525,11 +531,13 @@
         self.reportChange(change)
 
     def reportChange(self, change):
-        self.log.debug("Reporting change %s" % change)
+        if not change.is_reportable:
+            return False
         if change.reported:
             return 0
+        self.log.debug("Reporting change %s" % change)
         ret = None
-        if change.didAllJobsSucceed():
+        if self.pipeline.didAllJobsSucceed(change):
             action = self.success_action
             change.setReportedResult('SUCCESS')
         else:
@@ -539,7 +547,7 @@
             self.log.info("Reporting change %s, action: %s" %
                           (change, action))
             ret = self.sched.trigger.report(change,
-                                            change.formatReport(),
+                                            self.pipeline.formatReport(change),
                                             action)
             if ret:
                 self.log.error("Reporting change %s received: %s" %
@@ -563,35 +571,34 @@
         changes = self.getChangesInQueue()
         ret = ''
         for change in changes:
-            ret += change.formatStatus(html=True)
+            ret += self.pipeline.formatStatus(change, html=True)
         return ret
 
 
-class IndependentQueueManager(BaseQueueManager):
-    log = logging.getLogger("zuul.IndependentQueueManager")
+class IndependentPipelineManager(BasePipelineManager):
+    log = logging.getLogger("zuul.IndependentPipelineManager")
 
 
-class DependentQueueManager(BaseQueueManager):
-    log = logging.getLogger("zuul.DependentQueueManager")
+class DependentPipelineManager(BasePipelineManager):
+    log = logging.getLogger("zuul.DependentPipelineManager")
 
     def __init__(self, *args, **kwargs):
-        super(DependentQueueManager, self).__init__(*args, **kwargs)
+        super(DependentPipelineManager, self).__init__(*args, **kwargs)
         self.change_queues = []
 
     def _postConfig(self):
-        super(DependentQueueManager, self)._postConfig()
+        super(DependentPipelineManager, self)._postConfig()
         self.buildChangeQueues()
 
     def buildChangeQueues(self):
         self.log.debug("Building shared change queues")
         change_queues = []
 
-        for project in self.sched.projects.values():
-            if project.hasQueue(self.name):
-                change_queue = ChangeQueue(self.name)
-                change_queue.addProject(project)
-                change_queues.append(change_queue)
-                self.log.debug("Created queue: %s" % change_queue)
+        for project in self.pipeline.getProjects():
+            change_queue = ChangeQueue(self.pipeline)
+            change_queue.addProject(project)
+            change_queues.append(change_queue)
+            self.log.debug("Created queue: %s" % change_queue)
 
         self.log.debug("Combining shared queues")
         new_change_queues = []
@@ -622,6 +629,9 @@
         self.log.debug("Checking for changes needed by %s:" % change)
         # Return true if okay to proceed enqueing this change,
         # false if the change should not be enqueued.
+        if not hasattr(change, 'needs_change'):
+            self.log.debug("  Changeish does not support dependencies")
+            return True
         if not change.needs_change:
             self.log.debug("  No changes needed")
             return True
@@ -635,7 +645,8 @@
         if change.needs_change in change_queue.queue:
             self.log.debug("  Needed change is already ahead in the queue")
             return True
-        if change.needs_change.can_merge:
+        if self.sched.trigger.canMerge(change.needs_change,
+                                       self.getSubmitAllowNeeds()):
             # It can merge, so attempt to enqueue it _ahead_ of this change.
             # If that works we can enqueue this change, otherwise, we can't.
             self.log.debug("  Change %s must be merged ahead of %s" %
@@ -649,8 +660,12 @@
     def _checkForChangesNeeding(self, change):
         to_enqueue = []
         self.log.debug("Checking for changes needing %s:" % change)
+        if not hasattr(change, 'needed_by_changes'):
+            self.log.debug("  Changeish does not support dependencies")
+            return to_enqueue
         for needs in change.needed_by_changes:
-            if needs.can_merge:
+            if self.sched.trigger.canMerge(needs,
+                                           self.getSubmitAllowNeeds()):
                 self.log.debug("  Change %s needs %s and is ready to merge" %
                                (needs, change))
                 to_enqueue.append(needs)
@@ -664,7 +679,8 @@
             self.log.debug("Change %s is already in queue, ignoring" % change)
             return True
 
-        if not change.can_merge:
+        if not self.sched.trigger.canMerge(change,
+                                           self.getSubmitAllowNeeds()):
             self.log.debug("Change %s can not merge, ignoring" % change)
             return False
 
@@ -705,7 +721,7 @@
     def launchJobs(self, change):
         self.log.debug("Launching jobs for change %s" % change)
         dependent_changes = self._getDependentChanges(change)
-        for job in change.findJobsToRun():
+        for job in self.pipeline.findJobsToRun(change):
             self.log.debug("Found job %s for change %s" % (job, change))
             try:
                 build = self.sched.launcher.launch(job,
@@ -749,9 +765,10 @@
 
     def onBuildCompleted(self, build):
         change = self.building_jobs.get(build)
-        if not super(DependentQueueManager, self).onBuildCompleted(build):
+        if not super(DependentPipelineManager, self).onBuildCompleted(build):
             return False
-        if change and change.change_behind and change.didAnyJobFail():
+        if (change and change.change_behind and
+            self.pipeline.didAnyJobFail(change)):
             # This or some other build failed. All changes behind this change
             # will need to be retested. To free up resources cancel the builds
             # behind this one as they will be rerun anyways.
@@ -775,7 +792,7 @@
             merged = (not ret)
             if merged:
                 merged = self.sched.trigger.isMerged(change, change.branch)
-            succeeded = change.didAllJobsSucceed()
+            succeeded = self.pipeline.didAllJobsSucceed(change)
             self.log.info("Reported change %s status: all-succeeded: %s, "
                           "merged: %s" % (change, succeeded, merged))
 
@@ -790,7 +807,7 @@
                     self.cancelJobs(change_behind)
                     self.launchJobs(change_behind)
         # If the change behind this is ready, notify
-        if change_behind and change_behind.areAllJobsComplete():
+        if change_behind and self.pipeline.areAllJobsComplete(change_behind):
             self.log.info("Change %s behind change %s is ready, "
                           "possibly reporting" % (change_behind, change))
             self.possiblyReportChange(change_behind)
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 721be7b..003e96a 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -170,7 +170,7 @@
         if status == 'MERGED' or status == 'SUBMITTED':
             return True
 
-    def _canMerge(self, change, allow_needs):
+    def canMerge(self, change, allow_needs):
         if not change.number:
             self.log.debug("Change has no number; considering it merged")
             # Good question.  It's probably ref-updated, which, ah,
@@ -206,15 +206,13 @@
             return False
         return True
 
-    def getChange(self, number, patchset, queue_name, changes=None):
-        # TODO: queue_name is screwing up the data model, refactor
-        # the queue context so it isn't necessary.
+    def getChange(self, number, patchset, changes=None):
         self.log.info("Getting information for %s,%s" % (number, patchset))
         if changes is None:
             changes = {}
         data = self.gerrit.query(number)
         project = self.sched.projects[data['project']]
-        change = Change(queue_name, project)
+        change = Change(project)
         change._data = data
 
         change.number = number
@@ -233,9 +231,6 @@
         else:
             change.is_current_patchset = False
 
-        manager = self.sched.queue_managers[queue_name]
-        change.can_merge = self._canMerge(change,
-                                          manager.getSubmitAllowNeeds())
         change.is_merged = self._isMerged(change)
         if change.is_merged:
             # This change is merged, so we don't need to look any further
@@ -249,7 +244,7 @@
             key = '%s,%s' % (num, ps)
             if key in changes:
                 return changes.get(key)
-            c = self.getChange(num, ps, queue_name, changes)
+            c = self.getChange(num, ps, changes)
             return c
 
         if 'dependsOn' in data: