Support cross-source dependencies

Additional tests and docs in later patches.

Change-Id: I3b86a1e3dd507fa5e584680fb6c86d35f9ff3e23
Story: 2001334
Task: 5885
diff --git a/doc/source/admin/drivers/zuul.rst b/doc/source/admin/drivers/zuul.rst
index d95dffc..41535ee 100644
--- a/doc/source/admin/drivers/zuul.rst
+++ b/doc/source/admin/drivers/zuul.rst
@@ -26,6 +26,12 @@
          When Zuul merges a change to a project, it generates this
          event for every open change in the project.
 
+         .. warning::
+
+            Triggering on this event can cause poor performance when
+            using the GitHub driver with a large number of
+            installations.
+
       .. value:: parent-change-enqueued
 
          When Zuul enqueues a change into any pipeline, it generates
diff --git a/tests/base.py b/tests/base.py
index 59c0d2a..e688abd 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -170,7 +170,7 @@
             'status': status,
             'subject': subject,
             'submitRecords': [],
-            'url': 'https://hostname/%s' % number}
+            'url': 'https://%s/%s' % (self.gerrit.server, number)}
 
         self.upstream_root = upstream_root
         self.addPatchset(files=files, parent=parent)
@@ -559,14 +559,13 @@
             return change.query()
         return {}
 
-    def simpleQuery(self, query):
-        self.log.debug("simpleQuery: %s" % query)
-        self.queries.append(query)
+    def _simpleQuery(self, query):
         if query.startswith('change:'):
             # Query a specific changeid
             changeid = query[len('change:'):]
             l = [change.query() for change in self.changes.values()
-                 if change.data['id'] == changeid]
+                 if (change.data['id'] == changeid or
+                     change.data['number'] == changeid)]
         elif query.startswith('message:'):
             # Query the content of a commit message
             msg = query[len('message:'):].strip()
@@ -577,6 +576,20 @@
             l = [change.query() for change in self.changes.values()]
         return l
 
+    def simpleQuery(self, query):
+        # Emulate "(a OR b)": run each term and merge results, deduplicated.
+        self.log.debug("simpleQuery: %s" % query)
+        self.queries.append(query)
+        if not (query.startswith('(') and 'OR' in query):
+            return self._simpleQuery(query)
+        results = []
+        # Strip only the enclosing parentheses so the last term stays intact.
+        for q in query[1:-1].split(' OR '):
+            for r in self._simpleQuery(q):
+                if r not in results:
+                    results.append(r)
+        return results
+
     def _start_watcher_thread(self, *args, **kw):
         pass
 
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index 054ee5f..197b525 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -119,7 +119,7 @@
         self.assertEqual('SUCCESS', buildset0['result'])
         self.assertEqual('Build succeeded.', buildset0['message'])
         self.assertEqual('tenant-one', buildset0['tenant'])
-        self.assertEqual('https://hostname/%d' % buildset0['change'],
+        self.assertEqual('https://review.example.com/%d' % buildset0['change'],
                          buildset0['ref_url'])
 
         buildset0_builds = conn.execute(
diff --git a/tests/unit/test_gerrit_crd.py b/tests/unit/test_gerrit_crd.py
index 7a61eec..732bc3d 100644
--- a/tests/unit/test_gerrit_crd.py
+++ b/tests/unit/test_gerrit_crd.py
@@ -24,9 +24,6 @@
 class TestGerritCRD(ZuulTestCase):
     tenant_config_file = 'config/single-tenant/main.yaml'
 
-    def setUp(self):
-        raise self.skipTest("Feature not yet implemented")
-
     def test_crd_gate(self):
         "Test cross-repo dependencies"
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
@@ -100,18 +97,14 @@
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
         C1 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C1')
-        C2 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C2',
-                                            status='ABANDONED')
-        C1.data['id'] = B.data['id']
-        C2.data['id'] = B.data['id']
 
         A.addApproval('Code-Review', 2)
         B.addApproval('Code-Review', 2)
         C1.addApproval('Code-Review', 2)
 
         # A Depends-On: B+C1
-        A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
-            A.subject, B.data['url'])
+        A.data['commitMessage'] = '%s\n\nDepends-On: %s\nDepends-On: %s\n' % (
+            A.subject, B.data['url'], C1.data['url'])
 
         self.executor_server.hold_jobs_in_build = True
         B.addApproval('Approved', 1)
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index d5d8e83..5db20b3 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -4196,7 +4196,7 @@
         running_item = running_items[0]
         self.assertEqual([], running_item['failing_reasons'])
         self.assertEqual([], running_item['items_behind'])
-        self.assertEqual('https://hostname/1', running_item['url'])
+        self.assertEqual('https://review.example.com/1', running_item['url'])
         self.assertIsNone(running_item['item_ahead'])
         self.assertEqual('org/project', running_item['project'])
         self.assertIsNone(running_item['remaining_time'])
diff --git a/tests/unit/test_zuultrigger.py b/tests/unit/test_zuultrigger.py
index 3954a21..5575853 100644
--- a/tests/unit/test_zuultrigger.py
+++ b/tests/unit/test_zuultrigger.py
@@ -126,5 +126,5 @@
             "dependencies was unable to be automatically merged with the "
             "current state of its repository. Please rebase the change and "
             "upload a new patchset.")
-        self.assertEqual(self.fake_gerrit.queries[1],
-                         "project:org/project status:open")
+        self.assertIn("project:org/project status:open",
+                      self.fake_gerrit.queries)
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index f4b090d..d3b3c00 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -442,8 +442,19 @@
         # In case this change is already in the history we have a
         # cyclic dependency and don't need to update ourselves again
         # as this gets done in a previous frame of the call stack.
-        # NOTE(jeblair): I don't think it's possible to hit this case
-        # anymore as all paths hit the change cache first.
+        # NOTE(jeblair): The only case where this can still be hit is
+        # when we get an event for a change with no associated
+        # patchset; for instance, when the gerrit topic is changed.
+        # In that case, we will update change 1234,None, which will be
+        # inserted into the cache as its own entry, but then we will
+        # resolve the patchset before adding it to the history list,
+        # then if there are dependencies, we can walk down and then
+        # back up to the version of this change with a patchset which
+        # will match the history list but will have bypassed the
+        # change cache because the previous object had a patchset of
+        # None.  All other paths hit the change cache first.  To be able to
+        # drop history, we need to resolve the patchset on events with
+        # no patchsets before adding the entry to the change cache.
         if (history and change.number and change.patchset and
             (change.number, change.patchset) in history):
             self.log.debug("Change %s is in history" % (change,))
@@ -461,6 +472,11 @@
         change.project = self.source.getProject(data['project'])
         change.branch = data['branch']
         change.url = data['url']
+        change.uris = [
+            '%s/%s' % (self.server, change.number),
+            '%s/#/c/%s' % (self.server, change.number),
+        ]
+
         max_ps = 0
         files = []
         for ps in data['patchSets']:
@@ -481,6 +497,7 @@
         change.open = data['open']
         change.status = data['status']
         change.owner = data['owner']
+        change.message = data['commitMessage']
 
         if change.is_merged:
             # This change is merged, so we don't need to look any further
@@ -494,7 +511,8 @@
             history = history[:]
         history.append((change.number, change.patchset))
 
-        needs_changes = []
+        needs_changes = set()
+        git_needs_changes = []
         if 'dependsOn' in data:
             parts = data['dependsOn'][0]['ref'].split('/')
             dep_num, dep_ps = parts[3], parts[4]
@@ -505,8 +523,11 @@
             # already merged. So even if it is "ABANDONED", we should not
             # ignore it.
             if (not dep.is_merged) and dep not in needs_changes:
-                needs_changes.append(dep)
+                git_needs_changes.append(dep)
+                needs_changes.add(dep)
+        change.git_needs_changes = git_needs_changes
 
+        compat_needs_changes = []
         for record in self._getDependsOnFromCommit(data['commitMessage'],
                                                    change):
             dep_num = record['number']
@@ -516,10 +537,12 @@
                            (change, dep_num, dep_ps))
             dep = self._getChange(dep_num, dep_ps, history=history)
             if dep.open and dep not in needs_changes:
-                needs_changes.append(dep)
-        change.needs_changes = needs_changes
+                compat_needs_changes.append(dep)
+                needs_changes.add(dep)
+        change.compat_needs_changes = compat_needs_changes
 
-        needed_by_changes = []
+        needed_by_changes = set()
+        git_needed_by_changes = []
         if 'neededBy' in data:
             for needed in data['neededBy']:
                 parts = needed['ref'].split('/')
@@ -527,9 +550,13 @@
                 self.log.debug("Updating %s: Getting git-needed change %s,%s" %
                                (change, dep_num, dep_ps))
                 dep = self._getChange(dep_num, dep_ps, history=history)
-                if dep.open and dep.is_current_patchset:
-                    needed_by_changes.append(dep)
+                if (dep.open and dep.is_current_patchset and
+                    dep not in needed_by_changes):
+                    git_needed_by_changes.append(dep)
+                    needed_by_changes.add(dep)
+        change.git_needed_by_changes = git_needed_by_changes
 
+        compat_needed_by_changes = []
         for record in self._getNeededByFromCommit(data['id'], change):
             dep_num = record['number']
             dep_ps = record['currentPatchSet']['number']
@@ -543,9 +570,13 @@
             refresh = (dep_num, dep_ps) not in history
             dep = self._getChange(
                 dep_num, dep_ps, refresh=refresh, history=history)
-            if dep.open and dep.is_current_patchset:
-                needed_by_changes.append(dep)
-        change.needed_by_changes = needed_by_changes
+            if (dep.open and dep.is_current_patchset
+                and dep not in needed_by_changes):
+                compat_needed_by_changes.append(dep)
+                needed_by_changes.add(dep)
+        change.compat_needed_by_changes = compat_needed_by_changes
+
+        self.sched.onChangeUpdated(change)
 
         return change
 
diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py
index 7141080..9a75b3e 100644
--- a/zuul/driver/gerrit/gerritsource.py
+++ b/zuul/driver/gerrit/gerritsource.py
@@ -12,12 +12,15 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import re
+import urllib
 import logging
 import voluptuous as vs
 from zuul.source import BaseSource
 from zuul.model import Project
 from zuul.driver.gerrit.gerritmodel import GerritRefFilter
 from zuul.driver.util import scalar_or_list, to_list
+from zuul.lib.dependson import find_dependency_headers
 
 
 class GerritSource(BaseSource):
@@ -44,6 +47,59 @@
     def getChange(self, event, refresh=False):
         return self.connection.getChange(event, refresh)
 
+    change_re = re.compile(r"/(\#\/c\/)?(\d+)[\w]*")
+
+    def getChangeByURL(self, url):
+        try:
+            parsed = urllib.parse.urlparse(url)
+        except ValueError:
+            return None
+        # "/#/c/NNN" URLs keep the change number in the fragment.
+        path = parsed.path + ('#' + parsed.fragment if parsed.fragment else '')
+        m = self.change_re.match(path)
+        if not m:
+            return None
+        try:
+            change_no = int(m.group(2))
+        except ValueError:
+            return None
+        results = self.connection.simpleQuery("change:%s" % (change_no,))
+        if not results:
+            return None
+        return self.connection._getChange(
+            results[0]['number'], results[0]['currentPatchSet']['number'])
+
+    def getChangesDependingOn(self, change, projects):
+        # Return the changes whose Depends-On headers reference one of
+        # the URIs of ``change``.  A single OR query covers every URI;
+        # ``projects`` is not consulted here.
+        queries = {'message:%s' % uri for uri in change.uris}
+        query = '(' + ' OR '.join(queries) + ')'
+        results = self.connection.simpleQuery(query)
+        seen = set()
+        changes = []
+        for result in results:
+            for match in find_dependency_headers(result['commitMessage']):
+                # Only accept a result if one of its Depends-On headers
+                # contains one of our URIs.
+                if not any(uri in match for uri in change.uris):
+                    continue
+                key = (result['number'], result['currentPatchSet']['number'])
+                if key in seen:
+                    continue
+                seen.add(key)
+                # Keep ``change`` bound to the argument: its uris are
+                # still needed to check the remaining results.
+                dep = self.connection._getChange(
+                    result['number'], result['currentPatchSet']['number'])
+                changes.append(dep)
+        return changes
+
+    def getCachedChanges(self):
+        for x in self.connection._change_cache.values():
+            for y in x.values():
+                yield y
+
     def getProject(self, name):
         p = self.connection.getProject(name)
         if not p:
diff --git a/zuul/driver/git/gitsource.py b/zuul/driver/git/gitsource.py
index 78ae04e..a7d42be 100644
--- a/zuul/driver/git/gitsource.py
+++ b/zuul/driver/git/gitsource.py
@@ -38,6 +38,15 @@
     def getChange(self, event, refresh=False):
         return self.connection.getChange(event, refresh)
 
+    def getChangeByURL(self, url):
+        return None
+
+    def getChangesDependingOn(self, change, projects):
+        return []
+
+    def getCachedChanges(self):
+        return []
+
     def getProject(self, name):
         p = self.connection.getProject(name)
         if not p:
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 4b91c18..1957cc2 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -658,6 +658,9 @@
             change = self._getChange(project, event.change_number,
                                      event.patch_number, refresh=refresh)
             change.url = event.change_url
+            change.uris = [
+                '%s/%s/pull/%s' % (self.server, project, change.number),
+            ]
             change.updated_at = self._ghTimestampToDate(event.updated_at)
             change.source_event = event
             change.is_current_patchset = (change.pr.get('head').get('sha') ==
@@ -699,58 +702,72 @@
             raise
         return change
 
-    def _getDependsOnFromPR(self, body):
-        prs = []
-        seen = set()
+    def getChangesDependingOn(self, change, projects):
+        changes = []
+        if not change.uris:
+            return changes
 
-        for match in self.depends_on_re.findall(body):
-            if match in seen:
-                self.log.debug("Ignoring duplicate Depends-On: %s" % (match,))
-                continue
-            seen.add(match)
-            # Get the github url
-            url = match.rsplit()[-1]
-            # break it into the parts we need
-            _, org, proj, _, num = url.rsplit('/', 4)
-            # Get a pull object so we can get the head sha
-            pull = self.getPull('%s/%s' % (org, proj), int(num))
-            prs.append(pull)
+        # Get a list of projects with unique installation ids
+        installation_ids = set()
+        installation_projects = set()
 
-        return prs
+        if projects:
+            # We only need to find changes in projects in the supplied
+            # ChangeQueue.  Find all of the github installations for
+            # all of those projects, and search using each of them, so
+            # that we get the right results based on the
+            # permissions granted to each of the installations.  The
+            # common case for this is likely to be just one
+            # installation -- change queues aren't likely to span more
+            # than one installation.
+            for project in projects:
+                installation_id = self.installation_map.get(project)
+                if installation_id not in installation_ids:
+                    installation_ids.add(installation_id)
+                    installation_projects.add(project)
+        else:
+            # We aren't in the context of a change queue and we just
+            # need to query all installations.  This currently only
+            # happens if certain features of the zuul trigger are
+            # used; generally it should be avoided.
+            for project, installation_id in self.installation_map.items():
+                if installation_id not in installation_ids:
+                    installation_ids.add(installation_id)
+                    installation_projects.add(project)
 
-    def _getNeededByFromPR(self, change):
-        prs = []
-        seen = set()
-        # This shouldn't return duplicate issues, but code as if it could
-
-        # This leaves off the protocol, but looks for the specific GitHub
-        # hostname, the org/project, and the pull request number.
-        pattern = 'Depends-On %s/%s/pull/%s' % (self.server,
-                                                change.project.name,
-                                                change.number)
+        keys = set()
+        pattern = ' OR '.join(change.uris)
         query = '%s type:pr is:open in:body' % pattern
-        # FIXME(tobiash): find a way to query this for different installations
-        github = self.getGithubClient(change.project.name)
-        for issue in github.search_issues(query=query):
-            pr = issue.issue.pull_request().as_dict()
-            if not pr.get('url'):
-                continue
-            if issue in seen:
-                continue
-            # the issue provides no good description of the project :\
-            org, proj, _, num = pr.get('url').split('/')[-4:]
-            self.log.debug("Found PR %s/%s/%s needs %s/%s" %
-                           (org, proj, num, change.project.name,
-                            change.number))
-            prs.append(pr)
-            seen.add(issue)
+        # Repeat the search for each installation id (project)
+        for installation_project in installation_projects:
+            github = self.getGithubClient(installation_project)
+            for issue in github.search_issues(query=query):
+                pr = issue.issue.pull_request().as_dict()
+                if not pr.get('url'):
+                    continue
+                # the issue provides no good description of the project :\
+                org, proj, _, num = pr.get('url').split('/')[-4:]
+                proj = pr.get('base').get('repo').get('full_name')
+                sha = pr.get('head').get('sha')
+                key = (proj, num, sha)
+                if key in keys:
+                    continue
+                self.log.debug("Found PR %s/%s needs %s/%s" %
+                               (proj, num, change.project.name,
+                                change.number))
+                keys.add(key)
+            self.log.debug("Ran search issues: %s", query)
+            log_rate_limit(self.log, github)
 
-        self.log.debug("Ran search issues: %s", query)
-        log_rate_limit(self.log, github)
-        return prs
+        for key in keys:
+            (proj, num, sha) = key
+            project = self.source.getProject(proj)
+            change = self._getChange(project, int(num), patchset=sha)
+            changes.append(change)
+
+        return changes
 
     def _updateChange(self, change, history=None):
-
         # If this change is already in the history, we have a cyclic
         # dependency loop and we do not need to update again, since it
         # was done in a previous frame.
@@ -770,10 +787,8 @@
         change.reviews = self.getPullReviews(change.project,
                                              change.number)
         change.labels = change.pr.get('labels')
-        change.body = change.pr.get('body')
-        # ensure body is at least an empty string
-        if not change.body:
-            change.body = ''
+        # ensure message is at least an empty string
+        change.message = change.pr.get('body') or ''
 
         if history is None:
             history = []
@@ -781,38 +796,7 @@
             history = history[:]
         history.append((change.project.name, change.number))
 
-        needs_changes = []
-
-        # Get all the PRs this may depend on
-        for pr in self._getDependsOnFromPR(change.body):
-            proj = pr.get('base').get('repo').get('full_name')
-            pull = pr.get('number')
-            self.log.debug("Updating %s: Getting dependent "
-                           "pull request %s/%s" %
-                           (change, proj, pull))
-            project = self.source.getProject(proj)
-            dep = self._getChange(project, pull,
-                                  patchset=pr.get('head').get('sha'),
-                                  history=history)
-            if (not dep.is_merged) and dep not in needs_changes:
-                needs_changes.append(dep)
-
-        change.needs_changes = needs_changes
-
-        needed_by_changes = []
-        for pr in self._getNeededByFromPR(change):
-            proj = pr.get('base').get('repo').get('full_name')
-            pull = pr.get('number')
-            self.log.debug("Updating %s: Getting needed "
-                           "pull request %s/%s" %
-                           (change, proj, pull))
-            project = self.source.getProject(proj)
-            dep = self._getChange(project, pull,
-                                  patchset=pr.get('head').get('sha'),
-                                  history=history)
-            if not dep.is_merged:
-                needed_by_changes.append(dep)
-        change.needed_by_changes = needed_by_changes
+        self.sched.onChangeUpdated(change)
 
         return change
 
diff --git a/zuul/driver/github/githubmodel.py b/zuul/driver/github/githubmodel.py
index ffd1c3f..0731dd7 100644
--- a/zuul/driver/github/githubmodel.py
+++ b/zuul/driver/github/githubmodel.py
@@ -37,7 +37,8 @@
         self.labels = []
 
     def isUpdateOf(self, other):
-        if (hasattr(other, 'number') and self.number == other.number and
+        if (self.project == other.project and
+            hasattr(other, 'number') and self.number == other.number and
             hasattr(other, 'patchset') and self.patchset != other.patchset and
             hasattr(other, 'updated_at') and
             self.updated_at > other.updated_at):
diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py
index 1e7e07a..9834727 100644
--- a/zuul/driver/github/githubsource.py
+++ b/zuul/driver/github/githubsource.py
@@ -12,6 +12,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import re
+import urllib
 import logging
 import time
 import voluptuous as v
@@ -61,6 +63,38 @@
     def getChange(self, event, refresh=False):
         return self.connection.getChange(event, refresh)
 
+    change_re = re.compile(r"/(.*?)/(.*?)/pull/(\d+)[\w]*")
+
+    def getChangeByURL(self, url):
+        try:
+            parsed = urllib.parse.urlparse(url)
+        except ValueError:
+            return None
+        m = self.change_re.match(parsed.path)
+        if not m:
+            return None
+        org = m.group(1)
+        proj = m.group(2)
+        try:
+            num = int(m.group(3))
+        except ValueError:
+            return None
+        pull = self.connection.getPull('%s/%s' % (org, proj), int(num))
+        if not pull:
+            return None
+        proj = pull.get('base').get('repo').get('full_name')
+        project = self.getProject(proj)
+        change = self.connection._getChange(
+            project, num,
+            patchset=pull.get('head').get('sha'))
+        return change
+
+    def getChangesDependingOn(self, change, projects):
+        return self.connection.getChangesDependingOn(change, projects)
+
+    def getCachedChanges(self):
+        return self.connection._change_cache.values()
+
     def getProject(self, name):
         p = self.connection.getProject(name)
         if not p:
diff --git a/zuul/driver/zuul/__init__.py b/zuul/driver/zuul/__init__.py
index 0f6ec7d..e381137 100644
--- a/zuul/driver/zuul/__init__.py
+++ b/zuul/driver/zuul/__init__.py
@@ -90,7 +90,18 @@
         if not hasattr(change, 'needed_by_changes'):
             self.log.debug("  %s does not support dependencies" % type(change))
             return
-        for needs in change.needed_by_changes:
+
+        # This is very inefficient, especially on systems with large
+        # numbers of github installations.  This can be improved later
+        # with persistent storage of dependency information.
+        needed_by_changes = set(change.needed_by_changes)
+        for source in self.sched.connections.getSources():
+            self.log.debug("  Checking source: %s", source)
+            needed_by_changes.update(
+                source.getChangesDependingOn(change, None))
+        self.log.debug("  Following changes: %s", needed_by_changes)
+
+        for needs in needed_by_changes:
             self._createParentChangeEnqueuedEvent(needs, pipeline)
 
     def _createParentChangeEnqueuedEvent(self, change, pipeline):
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 06c2087..b21a290 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -245,7 +245,7 @@
         for change in dependent_changes:
             # We have to find the project this way because it may not
             # be registered in the tenant (ie, a foreign project).
-            source = self.sched.connections.getSourceByHostname(
+            source = self.sched.connections.getSourceByCanonicalHostname(
                 change['project']['canonical_hostname'])
             project = source.getProject(change['project']['name'])
             if project not in projects:
diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py
index 262490a..33c66f9 100644
--- a/zuul/lib/connections.py
+++ b/zuul/lib/connections.py
@@ -14,6 +14,7 @@
 
 import logging
 import re
+from collections import OrderedDict
 
 import zuul.driver.zuul
 import zuul.driver.gerrit
@@ -38,7 +39,7 @@
     log = logging.getLogger("zuul.ConnectionRegistry")
 
     def __init__(self):
-        self.connections = {}
+        self.connections = OrderedDict()
         self.drivers = {}
 
         self.registerDriver(zuul.driver.zuul.ZuulDriver())
@@ -85,7 +86,7 @@
 
     def configure(self, config, source_only=False):
         # Register connections from the config
-        connections = {}
+        connections = OrderedDict()
 
         for section_name in config.sections():
             con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
@@ -154,6 +155,13 @@
         connection = self.connections[connection_name]
         return connection.driver.getSource(connection)
 
+    def getSources(self):
+        # One source per configured connection whose driver has them.
+        return [
+            connection.driver.getSource(connection)
+            for connection in self.connections.values()
+            if hasattr(connection.driver, 'getSource')]
+
     def getReporter(self, connection_name, config=None):
         connection = self.connections[connection_name]
         return connection.driver.getReporter(connection, config)
@@ -162,7 +170,7 @@
         connection = self.connections[connection_name]
         return connection.driver.getTrigger(connection, config)
 
-    def getSourceByHostname(self, canonical_hostname):
+    def getSourceByCanonicalHostname(self, canonical_hostname):
         for connection in self.connections.values():
             if hasattr(connection, 'canonical_hostname'):
                 if connection.canonical_hostname == canonical_hostname:
diff --git a/zuul/lib/dependson.py b/zuul/lib/dependson.py
new file mode 100644
index 0000000..cd0f6ef
--- /dev/null
+++ b/zuul/lib/dependson.py
@@ -0,0 +1,29 @@
+# Copyright 2018 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import re
+
+
+DEPENDS_ON_RE = re.compile(r"^Depends-On: (.*?)\s*$",
+                           re.MULTILINE | re.IGNORECASE)
+
+
+def find_dependency_headers(message):
+    # Return the value of every Depends-On header in ``message``,
+    # without duplicates, in order of first appearance.
+    found = []
+    for value in DEPENDS_ON_RE.findall(message):
+        if value not in found:
+            found.append(value)
+    return found
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index d205afc..b8a280f 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -12,9 +12,11 @@
 
 import logging
 import textwrap
+import urllib
 
 from zuul import exceptions
 from zuul import model
+from zuul.lib.dependson import find_dependency_headers
 
 
 class DynamicChangeQueueContextManager(object):
@@ -343,6 +345,32 @@
         self.dequeueItem(item)
         self.reportStats(item)
 
+    def updateCommitDependencies(self, change, change_queue):
+        # Search for Depends-On headers and find appropriate changes
+        self.log.debug("  Updating commit dependencies for %s", change)
+        change.refresh_deps = False
+        dependencies = []
+        seen = set()
+        for match in find_dependency_headers(change.message):
+            self.log.debug("  Found Depends-On header: %s", match)
+            if match in seen:
+                continue
+            seen.add(match)
+            try:
+                url = urllib.parse.urlparse(match)
+            except ValueError:
+                continue
+            source = self.sched.connections.getSourceByCanonicalHostname(
+                url.hostname)
+            if not source:
+                continue
+            self.log.debug("  Found source: %s", source)
+            dep = source.getChangeByURL(match)
+            if dep and (not dep.is_merged) and dep not in dependencies:
+                self.log.debug("  Adding dependency: %s", dep)
+                dependencies.append(dep)
+        change.commit_needs_changes = dependencies
+
     def provisionNodes(self, item):
         jobs = item.findJobsToRequest()
         if not jobs:
diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py
index 5aef453..23c2cdb 100644
--- a/zuul/manager/dependent.py
+++ b/zuul/manager/dependent.py
@@ -95,12 +95,29 @@
     def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                              change_queue):
         self.log.debug("Checking for changes needing %s:" % change)
-        to_enqueue = []
-        source = change.project.source
         if not hasattr(change, 'needed_by_changes'):
             self.log.debug("  %s does not support dependencies" % type(change))
             return
-        for other_change in change.needed_by_changes:
+
+        # Collect dependents from every source in this queue, then dedup.
+        sources = set()
+        for project in change_queue.projects:
+            sources.add(project.source)
+
+        seen = set(change.needed_by_changes)
+        needed_by_changes = change.needed_by_changes[:]
+        for source in sources:
+            self.log.debug("  Checking source: %s", source)
+            for c in source.getChangesDependingOn(change,
+                                                  change_queue.projects):
+                if c not in seen:
+                    seen.add(c)
+                    needed_by_changes.append(c)
+
+        self.log.debug("  Following changes: %s", needed_by_changes)
+
+        to_enqueue = []
+        for other_change in needed_by_changes:
             with self.getChangeQueue(other_change) as other_change_queue:
                 if other_change_queue != change_queue:
                     self.log.debug("  Change %s in project %s can not be "
@@ -108,6 +125,7 @@
                                    (other_change, other_change.project,
                                     change_queue))
                     continue
+            source = other_change.project.source
             if source.canMerge(other_change, self.getSubmitAllowNeeds()):
                 self.log.debug("  Change %s needs %s and is ready to merge" %
                                (other_change, change))
@@ -145,10 +163,13 @@
         return True
 
     def checkForChangesNeededBy(self, change, change_queue):
-        self.log.debug("Checking for changes needed by %s:" % change)
-        source = change.project.source
         # Return true if okay to proceed enqueing this change,
         # false if the change should not be enqueued.
+        self.log.debug("Checking for changes needed by %s:" % change)
+        source = change.project.source
+        if (hasattr(change, 'commit_needs_changes') and
+            (change.refresh_deps or change.commit_needs_changes is None)):
+            self.updateCommitDependencies(change, change_queue)
         if not hasattr(change, 'needs_changes'):
             self.log.debug("  %s does not support dependencies" % type(change))
             return True
diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py
index 65f5ca0..0c2baf0 100644
--- a/zuul/manager/independent.py
+++ b/zuul/manager/independent.py
@@ -70,6 +70,9 @@
         self.log.debug("Checking for changes needed by %s:" % change)
         # Return true if okay to proceed enqueing this change,
         # false if the change should not be enqueued.
+        if (hasattr(change, 'commit_needs_changes') and
+            (change.refresh_deps or change.commit_needs_changes is None)):
+            self.updateCommitDependencies(change, None)
         if not hasattr(change, 'needs_changes'):
             self.log.debug("  %s does not support dependencies" % type(change))
             return True
diff --git a/zuul/model.py b/zuul/model.py
index 16a701d..bac9e4c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -2103,11 +2103,28 @@
     def __init__(self, project):
         super(Change, self).__init__(project)
         self.number = None
+        # The gitweb url for browsing the change
         self.url = None
+        # URIs for this change which may appear in depends-on headers.
+        # Note this omits the scheme; i.e., is hostname/path.
+        self.uris = []
         self.patchset = None
 
-        self.needs_changes = []
-        self.needed_by_changes = []
+        # Changes that the source determined are needed due to the
+        # git DAG:
+        self.git_needs_changes = []
+        self.git_needed_by_changes = []
+
+        # Changes that the source determined are needed by backwards
+        # compatible processing of Depends-On headers (Gerrit only):
+        self.compat_needs_changes = []
+        self.compat_needed_by_changes = []
+
+        # Changes that the pipeline manager determined are needed due
+        # to Depends-On headers (all drivers):
+        self.commit_needs_changes = None
+        self.refresh_deps = False
+
         self.is_current_patchset = True
         self.can_merge = False
         self.is_merged = False
@@ -2116,6 +2133,11 @@
         self.status = None
         self.owner = None
 
+        # This may be the commit message, or it may be a cover message
+        # in the case of a PR.  Either way, it's the place where we
+        # look for depends-on headers.
+        self.message = None
+
         self.source_event = None
 
     def _id(self):
@@ -2129,8 +2151,18 @@
             return True
         return False
 
+    @property
+    def needs_changes(self):
+        commit = self.commit_needs_changes or []  # None until resolved
+        return self.git_needs_changes + self.compat_needs_changes + commit
+
+    @property
+    def needed_by_changes(self):
+        return self.git_needed_by_changes + self.compat_needed_by_changes
+
     def isUpdateOf(self, other):
-        if ((hasattr(other, 'number') and self.number == other.number) and
+        if (self.project == other.project and
+            (hasattr(other, 'number') and self.number == other.number) and
             (hasattr(other, 'patchset') and
              self.patchset is not None and
              other.patchset is not None and
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index c3f2f23..a2e3b6e 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1088,3 +1088,25 @@
         for pipeline in tenant.layout.pipelines.values():
             pipelines.append(pipeline.formatStatusJSON(websocket_url))
         return json.dumps(data)
+
+    def onChangeUpdated(self, change):
+        """Flag cached dependency lists that predate this update.
+
+        A new patchset can leave other cached changes (from any
+        source) referring to the old patchset in their Depends-On
+        dependencies.  Mark each of those, and the updated change
+        itself, with refresh_deps so that its dependencies are
+        resolved again the next time a pipeline manager checks it.
+        """
+
+        self.log.debug("Change %s has been updated, clearing dependent "
+                       "change caches", change)
+        for source in self.connections.getSources():
+            for cached in source.getCachedChanges():
+                deps = cached.commit_needs_changes
+                if deps is None:
+                    # Never resolved, so nothing can be stale yet.
+                    continue
+                if any(change.isUpdateOf(dep) for dep in deps):
+                    cached.refresh_deps = True
+        change.refresh_deps = True
diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py
index 0396aff..00dfc9c 100644
--- a/zuul/source/__init__.py
+++ b/zuul/source/__init__.py
@@ -52,6 +52,29 @@
         """Get the change representing an event."""
 
     @abc.abstractmethod
+    def getChangeByURL(self, url):
+        """Get the change corresponding to the supplied URL.
+
+        The URL may not correspond to this source; if it doesn't,
+        or there is no change at that URL, return None.
+
+        """
+
+    @abc.abstractmethod
+    def getChangesDependingOn(self, change, projects):
+        """Return changes which depend on the supplied change.
+
+        Search this source for changes which depend on the supplied
+        change.  Generally the Change.uris attribute should be used to
+        perform the search, as it contains a list of URLs without the
+        scheme which represent a single change.
+
+        If the projects argument is None, search across all known
+        projects.  If it is supplied, the search may optionally be
+        restricted to only those projects.
+        """
+
+    @abc.abstractmethod
     def getProjectOpenChanges(self, project):
         """Get the open changes for a project."""