Merge "Add support for a skip-if filter on jobs"
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 4cc897c..9be4deb 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -569,6 +569,15 @@
well. To suppress this behavior (and allow jobs to continue
running), set this to ``false``. Default: ``true``.
+**ignore-dependencies**
+ In any kind of pipeline (dependent or independent), Zuul will
+ attempt to enqueue all dependencies ahead of the current change so
+ that they are tested together (independent pipelines report the
+ results of each change regardless of the results of changes ahead).
+ To ignore dependencies completely in an independent pipeline, set
+ this to ``true``. This option is ignored by dependent pipelines.
+ The default is: ``false``.
+
**success**
Describes where Zuul should report to if all the jobs complete
successfully.
@@ -1091,9 +1100,11 @@
./tools/zuul-changes.py --review-host=review.openstack.org \
http://zuul.openstack.org/ check 'recheck'
-If you send a SIGUSR2 to the zuul-server process, Zuul will dump a stack
-trace for each running thread into its debug log. This is useful for
-tracking down deadlock or otherwise slow threads.
+If you send a SIGUSR2 to the zuul-server process, or the forked process
+that runs the Gearman daemon, Zuul will dump a stack trace for each
+running thread into its debug log. It is written under the log bucket
+``zuul.stack_dump``. This is useful for tracking down deadlock or
+otherwise slow threads.
When `yappi <https://code.google.com/p/yappi/>`_ (Yet Another Python
Profiler) is available, additional functions' and threads' stats are
diff --git a/etc/status/.gitignore b/etc/status/.gitignore
index 8b94cad..1ecdbed 100644
--- a/etc/status/.gitignore
+++ b/etc/status/.gitignore
@@ -1,4 +1,4 @@
public_html/jquery.min.js
-public_html/jquery-visibility.min.js
+public_html/jquery-visibility.js
public_html/bootstrap
public_html/jquery.graphite.js
diff --git a/etc/status/fetch-dependencies.sh b/etc/status/fetch-dependencies.sh
index 4868310..b31d0de 100755
--- a/etc/status/fetch-dependencies.sh
+++ b/etc/status/fetch-dependencies.sh
@@ -3,10 +3,10 @@
echo "Destination: $BASE_DIR/public_html"
echo "Fetching jquery.min.js..."
-curl --silent http://code.jquery.com/jquery.min.js > $BASE_DIR/public_html/jquery.min.js
+curl -L --silent http://code.jquery.com/jquery.min.js > $BASE_DIR/public_html/jquery.min.js
echo "Fetching jquery-visibility.min.js..."
-curl --silent https://raw.githubusercontent.com/mathiasbynens/jquery-visibility/master/jquery-visibility.js > $BASE_DIR/public_html/jquery-visibility.min.js
+curl -L --silent https://raw.githubusercontent.com/mathiasbynens/jquery-visibility/master/jquery-visibility.js > $BASE_DIR/public_html/jquery-visibility.js
echo "Fetching jquery.graphite.js..."
curl -L --silent https://github.com/prestontimmons/graphitejs/archive/master.zip > jquery-graphite.zip
diff --git a/etc/status/public_html/index.html b/etc/status/public_html/index.html
index d77470b..3bd7a12 100644
--- a/etc/status/public_html/index.html
+++ b/etc/status/public_html/index.html
@@ -20,7 +20,6 @@
<head>
<title>Zuul Status</title>
<link rel="stylesheet" href="bootstrap/css/bootstrap.min.css">
- <link rel="stylesheet" href="bootstrap/css/bootstrap-responsive.min.css">
<link rel="stylesheet" href="styles/zuul.css" />
</head>
<body>
@@ -28,7 +27,7 @@
<div id="zuul_container"></div>
<script src="jquery.min.js"></script>
- <script src="jquery-visibility.min.js"></script>
+ <script src="jquery-visibility.js"></script>
<script src="jquery.graphite.js"></script>
<script src="jquery.zuul.js"></script>
<script src="zuul.app.js"></script>
diff --git a/etc/status/public_html/jquery.zuul.js b/etc/status/public_html/jquery.zuul.js
index 5d155af..c13e48c 100644
--- a/etc/status/public_html/jquery.zuul.js
+++ b/etc/status/public_html/jquery.zuul.js
@@ -370,6 +370,11 @@
icon_title = 'Waiting until closer to head of queue to' +
' start jobs';
}
+ else if (change.live !== true) {
+ // Grey icon
+ icon_name = 'grey.png';
+ icon_title = 'Dependent change independently tested';
+ }
else if (change.failing_reasons &&
change.failing_reasons.length > 0) {
var reason = change.failing_reasons.join(', ');
diff --git a/requirements.txt b/requirements.txt
index 81257ab..b24d171 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,7 @@
GitPython>=0.3.2.1
lockfile>=0.8
ordereddict
-python-daemon<2.0
+python-daemon>=2.0.4
extras
statsd>=1.0.0,<3.0
voluptuous>=0.7
diff --git a/tests/base.py b/tests/base.py
index ec3b74d..18d5f5a 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -418,11 +418,15 @@
return {}
def simpleQuery(self, query):
- # This is currently only used to return all open changes for a
- # project
self.queries.append(query)
- l = [change.query() for change in self.changes.values()]
- l.append({"type": "stats", "rowCount": 1, "runTimeMilliseconds": 3})
+ if query.startswith('change:'):
+ # Query a specific changeid
+ changeid = query[len('change:'):]
+ l = [change.query() for change in self.changes.values()
+ if change.data['id'] == changeid]
+ else:
+ # Query all open changes
+ l = [change.query() for change in self.changes.values()]
return l
def startWatching(self, *args, **kw):
@@ -982,6 +986,10 @@
repos.append(obj)
self.assertEqual(len(repos), 0)
self.assertEmptyQueues()
+ for pipeline in self.sched.layout.pipelines.values():
+ if isinstance(pipeline.manager,
+ zuul.scheduler.IndependentPipelineManager):
+ self.assertEqual(len(pipeline.queues), 0)
def shutdown(self):
self.log.debug("Shutting down after tests")
@@ -1215,10 +1223,10 @@
self.sched.trigger_event_queue.join()
self.sched.result_event_queue.join()
self.sched.run_handler_lock.acquire()
- if (self.sched.trigger_event_queue.empty() and
+ if (not self.merge_client.build_sets and
+ self.sched.trigger_event_queue.empty() and
self.sched.result_event_queue.empty() and
self.fake_gerrit.event_queue.empty() and
- not self.merge_client.build_sets and
self.haveAllBuildsReported() and
self.areAllBuildsWaiting()):
self.sched.run_handler_lock.release()
diff --git a/tests/fixtures/layout-ignore-dependencies.yaml b/tests/fixtures/layout-ignore-dependencies.yaml
new file mode 100644
index 0000000..5c0257c
--- /dev/null
+++ b/tests/fixtures/layout-ignore-dependencies.yaml
@@ -0,0 +1,28 @@
+pipelines:
+ - name: check
+ manager: IndependentPipelineManager
+ ignore-dependencies: true
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ verified: 1
+ failure:
+ gerrit:
+ verified: -1
+
+projects:
+ - name: org/project1
+ check:
+ - project1-merge:
+ - project1-test1
+ - project1-test2
+ - project1-project2-integration
+
+ - name: org/project2
+ check:
+ - project2-merge:
+ - project2-test1
+ - project2-test2
+ - project1-project2-integration
diff --git a/tests/fixtures/layout-zuultrigger-merged.yaml b/tests/fixtures/layout-zuultrigger-merged.yaml
index 657700d..bb06dde 100644
--- a/tests/fixtures/layout-zuultrigger-merged.yaml
+++ b/tests/fixtures/layout-zuultrigger-merged.yaml
@@ -36,6 +36,7 @@
- name: merge-check
manager: IndependentPipelineManager
source: gerrit
+ ignore-dependencies: true
trigger:
zuul:
- event: project-change-merged
diff --git a/tests/fixtures/layouts/good_layout.yaml b/tests/fixtures/layouts/good_layout.yaml
index 4bd5e70..fc2effd 100644
--- a/tests/fixtures/layouts/good_layout.yaml
+++ b/tests/fixtures/layouts/good_layout.yaml
@@ -43,6 +43,17 @@
verified: -2
workinprogress: true
+ - name: merge-check
+ manager: IndependentPipelineManager
+ source: gerrit
+ ignore-dependencies: true
+ trigger:
+ zuul:
+ - event: project-change-merged
+ merge-failure:
+ gerrit:
+ verified: -1
+
jobs:
- name: ^.*-merge$
failure-message: Unable to merge change
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index a4e47be..4c8c832 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -641,6 +641,91 @@
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
+ def test_needed_changes_enqueue(self):
+ "Test that a needed change is enqueued ahead"
+ # A Given a git tree like this, if we enqueue
+ # / \ change C, we should walk up and down the tree
+ # B G and enqueue changes in the order ABCDEFG.
+ # /|\ This is also the order that you would get if
+ # *C E F you enqueued changes in the order ABCDEFG, so
+ # / the ordering is stable across re-enqueue events.
+ # D
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+ E = self.fake_gerrit.addFakeChange('org/project', 'master', 'E')
+ F = self.fake_gerrit.addFakeChange('org/project', 'master', 'F')
+ G = self.fake_gerrit.addFakeChange('org/project', 'master', 'G')
+ B.setDependsOn(A, 1)
+ C.setDependsOn(B, 1)
+ D.setDependsOn(C, 1)
+ E.setDependsOn(B, 1)
+ F.setDependsOn(B, 1)
+ G.setDependsOn(A, 1)
+
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+ D.addApproval('CRVW', 2)
+ E.addApproval('CRVW', 2)
+ F.addApproval('CRVW', 2)
+ G.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(C.data['status'], 'NEW')
+ self.assertEqual(D.data['status'], 'NEW')
+ self.assertEqual(E.data['status'], 'NEW')
+ self.assertEqual(F.data['status'], 'NEW')
+ self.assertEqual(G.data['status'], 'NEW')
+
+ # We're about to add approvals to changes without adding the
+ # triggering events to Zuul, so that we can be sure that it is
+ # enqueuing the changes based on dependencies, not because of
+ # triggering events. Since it will have the changes cached
+ # already (without approvals), we need to clear the cache
+ # first.
+ source = self.sched.layout.pipelines['gate'].source
+ source.maintainCache([])
+
+ self.worker.hold_jobs_in_build = True
+ A.addApproval('APRV', 1)
+ B.addApproval('APRV', 1)
+ D.addApproval('APRV', 1)
+ E.addApproval('APRV', 1)
+ F.addApproval('APRV', 1)
+ G.addApproval('APRV', 1)
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+
+ for x in range(8):
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(C.data['status'], 'MERGED')
+ self.assertEqual(D.data['status'], 'MERGED')
+ self.assertEqual(E.data['status'], 'MERGED')
+ self.assertEqual(F.data['status'], 'MERGED')
+ self.assertEqual(G.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.reported, 2)
+ self.assertEqual(D.reported, 2)
+ self.assertEqual(E.reported, 2)
+ self.assertEqual(F.reported, 2)
+ self.assertEqual(G.reported, 2)
+ self.assertEqual(self.history[6].changes,
+ '1,1 2,1 3,1 4,1 5,1 6,1 7,1')
+
def test_trigger_cache(self):
"Test that the trigger cache operates correctly"
self.worker.hold_jobs_in_build = True
@@ -1452,35 +1537,178 @@
self.assertEqual(D.reported, 2)
self.assertEqual(len(self.history), 9) # 3 each for A, B, D.
- def test_abandoned_change_dequeues(self):
- "Test that an abandoned change is dequeued"
+ def test_new_patchset_check(self):
+ "Test a new patchset in check"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ check_pipeline = self.sched.layout.pipelines['check']
+
+ # Add two git-dependent changes
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
+
+ # A live item, and a non-live/live pair
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 3)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertEqual(items[0].change.patchset, '1')
+ self.assertFalse(items[0].live)
+
+ self.assertEqual(items[1].change.number, '2')
+ self.assertEqual(items[1].change.patchset, '1')
+ self.assertTrue(items[1].live)
+
+ self.assertEqual(items[2].change.number, '1')
+ self.assertEqual(items[2].change.patchset, '1')
+ self.assertTrue(items[2].live)
+
+ # Add a new patchset to A
+ A.addPatchset()
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ # The live copy of A,1 should be gone, but the non-live and B
+ # should continue, and we should have a new A,2
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 3)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertEqual(items[0].change.patchset, '1')
+ self.assertFalse(items[0].live)
+
+ self.assertEqual(items[1].change.number, '2')
+ self.assertEqual(items[1].change.patchset, '1')
+ self.assertTrue(items[1].live)
+
+ self.assertEqual(items[2].change.number, '1')
+ self.assertEqual(items[2].change.patchset, '2')
+ self.assertTrue(items[2].live)
+
+ # Add a new patchset to B
+ B.addPatchset()
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ # The live copy of B,1 and its non-live copy of A,1 should be gone,
+ # but we should have a new B,2 (still based on A,1)
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 3)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertEqual(items[0].change.patchset, '2')
+ self.assertTrue(items[0].live)
+
+ self.assertEqual(items[1].change.number, '1')
+ self.assertEqual(items[1].change.patchset, '1')
+ self.assertFalse(items[1].live)
+
+ self.assertEqual(items[2].change.number, '2')
+ self.assertEqual(items[2].change.patchset, '2')
+ self.assertTrue(items[2].live)
+
+ self.builds[0].release()
+ self.waitUntilSettled()
+ self.builds[0].release()
+ self.waitUntilSettled()
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+ self.assertEqual(self.history[0].result, 'ABORTED')
+ self.assertEqual(self.history[0].changes, '1,1')
+ self.assertEqual(self.history[1].result, 'ABORTED')
+ self.assertEqual(self.history[1].changes, '1,1 2,1')
+ self.assertEqual(self.history[2].result, 'SUCCESS')
+ self.assertEqual(self.history[2].changes, '1,2')
+ self.assertEqual(self.history[3].result, 'SUCCESS')
+ self.assertEqual(self.history[3].changes, '1,1 2,2')
+
+ def test_abandoned_gate(self):
+ "Test that an abandoned change is dequeued from gate"
+
+ self.worker.hold_jobs_in_build = True
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
self.assertEqual(len(self.builds), 1, "One job being built (on hold)")
self.assertEqual(self.builds[0].name, 'project-merge')
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
self.waitUntilSettled()
- # For debugging purposes...
- # for pipeline in self.sched.layout.pipelines.values():
- # for queue in pipeline.queues:
- # self.log.info("pipepline %s queue %s contents %s" % (
- # pipeline.name, queue.name, queue.queue))
-
self.worker.release('.*-merge')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0, "No job running")
- self.assertEmptyQueues()
self.assertEqual(len(self.history), 1, "Only one build in history")
self.assertEqual(self.history[0].result, 'ABORTED',
+ "Build should have been aborted")
+ self.assertEqual(A.reported, 1,
+ "Abandoned gate change should report only start")
+
+ def test_abandoned_check(self):
+ "Test that an abandoned change is dequeued from check"
+
+ self.worker.hold_jobs_in_build = True
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ check_pipeline = self.sched.layout.pipelines['check']
+
+ # Add two git-dependent changes
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ # A live item, and a non-live/live pair
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 3)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertFalse(items[0].live)
+
+ self.assertEqual(items[1].change.number, '2')
+ self.assertTrue(items[1].live)
+
+ self.assertEqual(items[2].change.number, '1')
+ self.assertTrue(items[2].live)
+
+ # Abandon A
+ self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
+ self.waitUntilSettled()
+
+ # The live copy of A should be gone, but the non-live and B
+ # should continue
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 2)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertFalse(items[0].live)
+
+ self.assertEqual(items[1].change.number, '2')
+ self.assertTrue(items[1].live)
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.history), 4)
+ self.assertEqual(self.history[0].result, 'ABORTED',
'Build should have been aborted')
self.assertEqual(A.reported, 0, "Abandoned change should not report")
+ self.assertEqual(B.reported, 1, "Change should report")
def test_zuul_url_return(self):
"Test if ZUUL_URL is returning when zuul_url is set in zuul.conf"
@@ -1903,7 +2131,7 @@
status_jobs = set()
for p in data['pipelines']:
for q in p['change_queues']:
- if q['dependent']:
+ if p['name'] in ['gate', 'conflict']:
self.assertEqual(q['window'], 20)
else:
self.assertEqual(q['window'], 0)
@@ -2872,3 +3100,405 @@
self.getJobFromHistory('experimental-project-test').result,
'SUCCESS')
self.assertEqual(A.reported, 1)
+
+ def test_crd_gate(self):
+ "Test cross-repo dependencies"
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+
+ AM2 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'AM2')
+ AM1 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'AM1')
+ AM2.setMerged()
+ AM1.setMerged()
+
+ BM2 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'BM2')
+ BM1 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'BM1')
+ BM2.setMerged()
+ BM1.setMerged()
+
+ # A -> AM1 -> AM2
+ # B -> BM1 -> BM2
+ # A Depends-On: B
+ # M2 is here to make sure it is never queried. If it is, it
+ # means zuul is walking down the entire history of merged
+ # changes.
+
+ B.setDependsOn(BM1, 1)
+ BM1.setDependsOn(BM2, 1)
+
+ A.setDependsOn(AM1, 1)
+ AM1.setDependsOn(AM2, 1)
+
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+
+ source = self.sched.layout.pipelines['gate'].source
+ source.maintainCache([])
+
+ self.worker.hold_jobs_in_build = True
+ B.addApproval('APRV', 1)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(AM2.queried, 0)
+ self.assertEqual(BM2.queried, 0)
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.reported, 2)
+
+ self.assertEqual(self.getJobFromHistory('project1-merge').changes,
+ '2,1 1,1')
+
+ def test_crd_branch(self):
+ "Test cross-repo dependencies in multiple branches"
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C')
+ C.data['id'] = B.data['id']
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+
+ # A Depends-On: B+C
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+
+ self.worker.hold_jobs_in_build = True
+ B.addApproval('APRV', 1)
+ C.addApproval('APRV', 1)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(C.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.reported, 2)
+
+ self.assertEqual(self.getJobFromHistory('project1-merge').changes,
+ '2,1 3,1 1,1')
+
+ def test_crd_multiline(self):
+ "Test multiple depends-on lines in commit"
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+
+ # A Depends-On: B+C
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\nDepends-On: %s\n' % (
+ A.subject, B.data['id'], C.data['id'])
+
+ self.worker.hold_jobs_in_build = True
+ B.addApproval('APRV', 1)
+ C.addApproval('APRV', 1)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(C.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.reported, 2)
+
+ self.assertEqual(self.getJobFromHistory('project1-merge').changes,
+ '2,1 3,1 1,1')
+
+ def test_crd_unshared_gate(self):
+ "Test cross-repo dependencies in unshared gate queues"
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+
+ # A Depends-On: B
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+
+ # A and B do not share a queue; make sure that A is unable to
+ # enqueue B (and therefore, A is unable to be enqueued).
+ B.addApproval('APRV', 1)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 0)
+ self.assertEqual(B.reported, 0)
+ self.assertEqual(len(self.history), 0)
+
+ # Enqueue and merge B alone.
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(B.reported, 2)
+
+ # Now that B is merged, A should be able to be enqueued and
+ # merged.
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+
+ def test_crd_cycle(self):
+ "Test cross-repo dependency cycles"
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+
+ # A -> B -> A (via commit-depends)
+
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+ B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ B.subject, A.data['id'])
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.reported, 0)
+ self.assertEqual(B.reported, 0)
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+
+ def test_crd_check(self):
+ "Test cross-repo dependencies in independent pipelines"
+
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+
+ # A Depends-On: B
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ queue = self.gearman_server.getQueue()
+ ref = self.getParameter(queue[-1], 'ZUUL_REF')
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ path = os.path.join(self.git_root, "org/project1")
+ repo = git.Repo(path)
+ repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
+ repo_messages.reverse()
+ correct_messages = ['initial commit', 'A-1']
+ self.assertEqual(repo_messages, correct_messages)
+
+ path = os.path.join(self.git_root, "org/project2")
+ repo = git.Repo(path)
+ repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
+ repo_messages.reverse()
+ correct_messages = ['initial commit', 'B-1']
+ self.assertEqual(repo_messages, correct_messages)
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 0)
+
+ self.assertEqual(self.history[0].changes, '2,1 1,1')
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+
+ def test_crd_check_git_depends(self):
+ "Test single-repo dependencies in independent pipelines"
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+
+ # Add two git-dependent changes and make sure they both report
+ # success.
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+
+ self.assertEqual(self.history[0].changes, '1,1')
+ self.assertEqual(self.history[-1].changes, '1,1 2,1')
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+
+ self.assertIn('Build succeeded', A.messages[0])
+ self.assertIn('Build succeeded', B.messages[0])
+
+ def test_crd_check_duplicate(self):
+ "Test duplicate check in independent pipelines"
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+ check_pipeline = self.sched.layout.pipelines['check']
+
+ # Add two git-dependent changes...
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(check_pipeline.getAllItems()), 2)
+
+ # ...make sure the live one is not duplicated...
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(check_pipeline.getAllItems()), 2)
+
+ # ...but the non-live one is able to be.
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(check_pipeline.getAllItems()), 3)
+
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release('.*-merge')
+ self.waitUntilSettled()
+ self.gearman_server.release('.*-merge')
+ self.waitUntilSettled()
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+
+ self.assertEqual(self.history[0].changes, '1,1 2,1')
+ self.assertEqual(self.history[1].changes, '1,1')
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+
+ self.assertIn('Build succeeded', A.messages[0])
+ self.assertIn('Build succeeded', B.messages[0])
+
+ def test_crd_check_reconfiguration(self):
+ "Test cross-repo dependencies re-enqueued in independent pipelines"
+
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+
+ # A Depends-On: B
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.sched.reconfigure(self.config)
+
+ # Make sure the items still share a change queue, and the
+ # first one is not live.
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 1)
+ queue = self.sched.layout.pipelines['check'].queues[0]
+ first_item = queue.queue[0]
+ for item in queue.queue:
+ self.assertEqual(item.queue, first_item.queue)
+ self.assertFalse(first_item.live)
+ self.assertTrue(queue.queue[1].live)
+
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 0)
+
+ self.assertEqual(self.history[0].changes, '2,1 1,1')
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+
+ def test_crd_check_ignore_dependencies(self):
+ "Test cross-repo dependencies can be ignored"
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-ignore-dependencies.yaml')
+ self.sched.reconfigure(self.config)
+ self.registerJobs()
+
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
+
+ # A Depends-On: B
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+ # C git-depends on B
+ C.setDependsOn(B, 1)
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Make sure none of the items share a change queue, and all
+ # are live.
+ check_pipeline = self.sched.layout.pipelines['check']
+ self.assertEqual(len(check_pipeline.queues), 3)
+ self.assertEqual(len(check_pipeline.getAllItems()), 3)
+ for item in check_pipeline.getAllItems():
+ self.assertTrue(item.live)
+
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(C.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+ self.assertEqual(C.reported, 1)
+
+ # Each job should have tested exactly one change
+ for job in self.history:
+ self.assertEqual(len(job.changes.split()), 1)
diff --git a/tests/test_zuultrigger.py b/tests/test_zuultrigger.py
index 3f339be..a26fa86 100644
--- a/tests/test_zuultrigger.py
+++ b/tests/test_zuultrigger.py
@@ -65,7 +65,7 @@
for job in self.history:
if job.changes == '1,1':
self.assertEqual(job.name, 'project-gate')
- elif job.changes == '2,1':
+ elif job.changes == '1,1 2,1':
self.assertEqual(job.name, 'project-check')
elif job.changes == '1,1 3,1':
self.assertEqual(job.name, 'project-gate')
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 9da300e..832eae4 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -150,6 +150,7 @@
import zuul.webapp
import zuul.rpclistener
+ signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
if (self.config.has_option('gearman_server', 'start') and
self.config.getboolean('gearman_server', 'start')):
self.start_gear_server()
@@ -204,7 +205,6 @@
signal.signal(signal.SIGHUP, self.reconfigure_handler)
signal.signal(signal.SIGUSR1, self.exit_handler)
- signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
signal.signal(signal.SIGTERM, self.term_handler)
while True:
try:
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 68abbf5..88d10e2 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -106,6 +106,7 @@
'merge-failure-message': str,
'footer-message': str,
'dequeue-on-new-patchset': bool,
+ 'ignore-dependencies': bool,
'trigger': trigger,
'success': report_actions,
'failure': report_actions,
diff --git a/zuul/lib/gerrit.py b/zuul/lib/gerrit.py
index 5aad953..9aeff3d 100644
--- a/zuul/lib/gerrit.py
+++ b/zuul/lib/gerrit.py
@@ -120,7 +120,7 @@
if v is True:
cmd += ' --%s' % k
else:
- cmd += ' --%s %s' % (k, v)
+ cmd += ' --label %s=%s' % (k, v)
cmd += ' %s' % change
out, err = self._ssh(cmd)
return err
diff --git a/zuul/model.py b/zuul/model.py
index 5ac6f53..1786fd9 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -73,6 +73,7 @@
self.success_message = None
self.footer_message = None
self.dequeue_on_new_patchset = True
+ self.ignore_dependencies = False
self.job_trees = {} # project -> JobTree
self.manager = None
self.queues = []
@@ -111,15 +112,20 @@
return queue
return None
+ def removeQueue(self, queue):
+ # Drop a change queue from this pipeline's list of queues.
+ # list.remove() raises ValueError if the queue is not in the list.
+ self.queues.remove(queue)
+
def getJobTree(self, project):
tree = self.job_trees.get(project)
return tree
- def getJobs(self, changeish):
- tree = self.getJobTree(changeish.project)
+ def getJobs(self, item):
+ if not item.live:
+ return []
+ tree = self.getJobTree(item.change.project)
if not tree:
return []
- return changeish.filterJobs(tree.getJobs())
+ return item.change.filterJobs(tree.getJobs())
def _findJobsToRun(self, job_trees, item):
torun = []
@@ -148,27 +154,29 @@
return torun
def findJobsToRun(self, item):
+ if not item.live:
+ return []
tree = self.getJobTree(item.change.project)
if not tree:
return []
return self._findJobsToRun(tree.job_trees, item)
def haveAllJobsStarted(self, item):
- for job in self.getJobs(item.change):
+ for job in self.getJobs(item):
build = item.current_build_set.getBuild(job.name)
if not build or not build.start_time:
return False
return True
def areAllJobsComplete(self, item):
- for job in self.getJobs(item.change):
+ for job in self.getJobs(item):
build = item.current_build_set.getBuild(job.name)
if not build or not build.result:
return False
return True
def didAllJobsSucceed(self, item):
- for job in self.getJobs(item.change):
+ for job in self.getJobs(item):
if not job.voting:
continue
build = item.current_build_set.getBuild(job.name)
@@ -184,7 +192,7 @@
return True
def didAnyJobFail(self, item):
- for job in self.getJobs(item.change):
+ for job in self.getJobs(item):
if not job.voting:
continue
build = item.current_build_set.getBuild(job.name)
@@ -193,7 +201,9 @@
return False
def isHoldingFollowingChanges(self, item):
- for job in self.getJobs(item.change):
+ if not item.live:
+ return False
+ for job in self.getJobs(item):
if not job.hold_following_changes:
continue
build = item.current_build_set.getBuild(job.name)
@@ -256,7 +266,6 @@
j_queues.append(j_queue)
j_queue['heads'] = []
j_queue['window'] = queue.window
- j_queue['dependent'] = queue.dependent
j_changes = []
for e in queue.queue:
@@ -303,8 +312,8 @@
different projects; this is one of them. For instance, there may
a queue shared by interrelated projects foo and bar, and a second
queue for independent project baz. Pipelines have one or more
- PipelineQueues."""
- def __init__(self, pipeline, dependent=True, window=0, window_floor=1,
+ ChangeQueues."""
+ def __init__(self, pipeline, window=0, window_floor=1,
window_increase_type='linear', window_increase_factor=1,
window_decrease_type='exponential', window_decrease_factor=2):
self.pipeline = pipeline
@@ -314,7 +323,6 @@
self.projects = []
self._jobs = set()
self.queue = []
- self.dependent = dependent
self.window = window
self.window_floor = window_floor
self.window_increase_type = window_increase_type
@@ -348,14 +356,15 @@
self.name = self.assigned_name or self.generated_name
def enqueueChange(self, change):
- item = QueueItem(self.pipeline, change)
+ item = QueueItem(self, change)
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
def enqueueItem(self, item):
item.pipeline = self.pipeline
- if self.dependent and self.queue:
+ item.queue = self
+ if self.queue:
item.item_ahead = self.queue[-1]
item.item_ahead.items_behind.append(item)
self.queue.append(item)
@@ -374,8 +383,6 @@
item.dequeue_time = time.time()
def moveItem(self, item, item_ahead):
- if not self.dependent:
- return False
if item.item_ahead == item_ahead:
return False
# Remove from current location
@@ -399,20 +406,20 @@
# TODO merge semantics
def isActionable(self, item):
- if self.dependent and self.window:
+ if self.window:
return item in self.queue[:self.window]
else:
return True
def increaseWindowSize(self):
- if self.dependent:
+ if self.window:
if self.window_increase_type == 'linear':
self.window += self.window_increase_factor
elif self.window_increase_type == 'exponential':
self.window *= self.window_increase_factor
def decreaseWindowSize(self):
- if self.dependent:
+ if self.window:
if self.window_decrease_type == 'linear':
self.window = max(
self.window_floor,
@@ -656,8 +663,9 @@
class QueueItem(object):
"""A changish inside of a Pipeline queue"""
- def __init__(self, pipeline, change):
- self.pipeline = pipeline
+ def __init__(self, queue, change):
+ self.pipeline = queue.pipeline
+ self.queue = queue
self.change = change # a changeish
self.build_sets = []
self.dequeued_needing_change = False
@@ -668,7 +676,8 @@
self.enqueue_time = None
self.dequeue_time = None
self.reported = False
- self.active = False
+ self.active = False # Whether an item is within an active window
+ self.live = True # Whether an item is intended to be processed at all
def __repr__(self):
if self.pipeline:
@@ -700,6 +709,7 @@
changeish = self.change
ret = {}
ret['active'] = self.active
+ ret['live'] = self.live
if hasattr(changeish, 'url') and changeish.url is not None:
ret['url'] = changeish.url
else:
@@ -720,7 +730,7 @@
else:
ret['owner'] = None
max_remaining = 0
- for job in self.pipeline.getJobs(changeish):
+ for job in self.pipeline.getJobs(self):
now = time.time()
build = self.current_build_set.getBuild(job.name)
elapsed = None
@@ -796,7 +806,7 @@
changeish.project.name,
changeish._id(),
self.item_ahead)
- for job in self.pipeline.getJobs(changeish):
+ for job in self.pipeline.getJobs(self):
build = self.current_build_set.getBuild(job.name)
if build:
result = build.result
@@ -858,7 +868,7 @@
self.refspec = None
self.files = []
- self.needs_change = None
+ self.needs_changes = []
self.needed_by_changes = []
self.is_current_patchset = True
self.can_merge = False
@@ -891,8 +901,8 @@
def getRelatedChanges(self):
related = set()
- if self.needs_change:
- related.add(self.needs_change)
+ for c in self.needs_changes:
+ related.add(c)
for c in self.needed_by_changes:
related.add(c)
related.update(c.getRelatedChanges())
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index fc21bf1..25f8192 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2014 Hewlett-Packard Development Company, L.P.
+# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Antoine "hashar" Musso
# Copyright 2013 Wikimedia Foundation Inc.
@@ -281,6 +281,8 @@
pipeline.footer_message = conf_pipeline.get('footer-message', "")
pipeline.dequeue_on_new_patchset = conf_pipeline.get(
'dequeue-on-new-patchset', True)
+ pipeline.ignore_dependencies = conf_pipeline.get(
+ 'ignore-dependencies', False)
action_reporters = {}
for action in ['start', 'success', 'failure', 'merge-failure']:
@@ -671,11 +673,15 @@
self.log.debug("Re-enqueueing changes for pipeline %s" % name)
items_to_remove = []
builds_to_remove = []
+ last_head = None
for shared_queue in old_pipeline.queues:
for item in shared_queue.queue:
+ if not item.item_ahead:
+ last_head = item
item.item_ahead = None
item.items_behind = []
item.pipeline = None
+ item.queue = None
project = layout.projects.get(item.change.project.name)
if not project:
self.log.warning("Unable to find project for "
@@ -691,7 +697,8 @@
build.job = job
else:
builds_to_remove.append(build)
- if not new_pipeline.manager.reEnqueueItem(item):
+ if not new_pipeline.manager.reEnqueueItem(item,
+ last_head):
items_to_remove.append(item)
for item in items_to_remove:
for build in item.current_build_set.getBuilds():
@@ -1059,9 +1066,17 @@
return True
return False
- def isChangeAlreadyInQueue(self, change):
- for c in self.pipeline.getChangesInQueue():
- if change.equals(c):
+ def isChangeAlreadyInPipeline(self, change):
+ # Return True if some live item returned by the pipeline's
+ # getAllItems() holds a change equal to this one. Non-live
+ # items are skipped, so a change that is only present as a
+ # non-live item does not count as already being in the pipeline.
+ for item in self.pipeline.getAllItems():
+ if item.live and change.equals(item.change):
+ return True
+ return False
+
+ def isChangeAlreadyInQueue(self, change, change_queue):
+ # Checks any item in the specified change queue
+ for item in change_queue.queue:
+ if change.equals(item.change):
return True
return False
@@ -1099,16 +1114,18 @@
def isChangeReadyToBeEnqueued(self, change):
return True
- def enqueueChangesAhead(self, change, quiet, ignore_requirements):
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements,
+ change_queue):
return True
- def enqueueChangesBehind(self, change, quiet, ignore_requirements):
+ def enqueueChangesBehind(self, change, quiet, ignore_requirements,
+ change_queue):
return True
- def checkForChangesNeededBy(self, change):
+ def checkForChangesNeededBy(self, change, change_queue):
return True
- def getFailingDependentItem(self, item):
+ def getFailingDependentItems(self, item):
return None
def getDependentItems(self, item):
@@ -1129,42 +1146,54 @@
return None
def findOldVersionOfChangeAlreadyInQueue(self, change):
- for c in self.pipeline.getChangesInQueue():
- if change.isUpdateOf(c):
- return c
+ for item in self.pipeline.getAllItems():
+ if not item.live:
+ continue
+ if change.isUpdateOf(item.change):
+ return item
return None
def removeOldVersionsOfChange(self, change):
if not self.pipeline.dequeue_on_new_patchset:
return
- old_change = self.findOldVersionOfChangeAlreadyInQueue(change)
- if old_change:
+ old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
+ if old_item:
self.log.debug("Change %s is a new version of %s, removing %s" %
- (change, old_change, old_change))
- self.removeChange(old_change)
+ (change, old_item.change, old_item))
+ self.removeItem(old_item)
def removeAbandonedChange(self, change):
self.log.debug("Change %s abandoned, removing." % change)
- self.removeChange(change)
+ for item in self.pipeline.getAllItems():
+ if not item.live:
+ continue
+ if item.change.equals(change):
+ self.removeItem(item)
- def reEnqueueItem(self, item):
- change_queue = self.pipeline.getQueue(item.change.project)
- if change_queue:
- self.log.debug("Re-enqueing change %s in queue %s" %
- (item.change, change_queue))
- change_queue.enqueueItem(item)
- self.reportStats(item)
- return True
- else:
- self.log.error("Unable to find change queue for project %s" %
- item.change.project)
- return False
+ def reEnqueueItem(self, item, last_head):
+ with self.getChangeQueue(item.change, last_head.queue) as change_queue:
+ if change_queue:
+ self.log.debug("Re-enqueing change %s in queue %s" %
+ (item.change, change_queue))
+ change_queue.enqueueItem(item)
+ self.reportStats(item)
+ return True
+ else:
+ self.log.error("Unable to find change queue for project %s" %
+ item.change.project)
+ return False
def addChange(self, change, quiet=False, enqueue_time=None,
- ignore_requirements=False):
+ ignore_requirements=False, live=True,
+ change_queue=None):
self.log.debug("Considering adding change %s" % change)
- if self.isChangeAlreadyInQueue(change):
- self.log.debug("Change %s is already in queue, ignoring" % change)
+
+ # If we are adding a live change, check if it's a live item
+ # anywhere in the pipeline. Otherwise, we will perform the
+ # duplicate check below on the specific change_queue.
+ if live and self.isChangeAlreadyInPipeline(change):
+ self.log.debug("Change %s is already in pipeline, "
+ "ignoring" % change)
return True
if not self.isChangeReadyToBeEnqueued(change):
@@ -1179,16 +1208,24 @@
"requirement %s" % (change, f))
return False
- if not self.enqueueChangesAhead(change, quiet, ignore_requirements):
- self.log.debug("Failed to enqueue changes ahead of %s" % change)
- return False
+ with self.getChangeQueue(change, change_queue) as change_queue:
+ if not change_queue:
+ self.log.debug("Unable to find change queue for "
+ "change %s in project %s" %
+ (change, change.project))
+ return False
- if self.isChangeAlreadyInQueue(change):
- self.log.debug("Change %s is already in queue, ignoring" % change)
- return True
+ if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
+ change_queue):
+ self.log.debug("Failed to enqueue changes "
+ "ahead of %s" % change)
+ return False
- change_queue = self.pipeline.getQueue(change.project)
- if change_queue:
+ if self.isChangeAlreadyInQueue(change, change_queue):
+ self.log.debug("Change %s is already in queue, "
+ "ignoring" % change)
+ return True
+
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
if not quiet:
@@ -1197,30 +1234,26 @@
item = change_queue.enqueueChange(change)
if enqueue_time:
item.enqueue_time = enqueue_time
+ item.live = live
self.reportStats(item)
- self.enqueueChangesBehind(change, quiet, ignore_requirements)
+ self.enqueueChangesBehind(change, quiet, ignore_requirements,
+ change_queue)
self.sched.triggers['zuul'].onChangeEnqueued(item.change,
self.pipeline)
- else:
- self.log.error("Unable to find change queue for project %s" %
- change.project)
- return False
+ return True
def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change)
- change_queue = self.pipeline.getQueue(item.change.project)
- change_queue.dequeueItem(item)
+ item.queue.dequeueItem(item)
- def removeChange(self, change):
- # Remove a change from the queue, probably because it has been
+ def removeItem(self, item):
+ # Remove an item from the queue, probably because it has been
# superseded by another change.
- for item in self.pipeline.getAllItems():
- if item.change == change:
- self.log.debug("Canceling builds behind change: %s "
- "because it is being removed." % item.change)
- self.cancelJobs(item)
- self.dequeueItem(item)
- self.reportStats(item)
+ self.log.debug("Canceling builds behind change: %s "
+ "because it is being removed." % item.change)
+ self.cancelJobs(item)
+ self.dequeueItem(item)
+ self.reportStats(item)
def _makeMergerItem(self, item):
# Create a dictionary with all info about the item needed by
@@ -1320,10 +1353,12 @@
def _processOneItem(self, item, nnfi, ready_ahead):
changed = False
item_ahead = item.item_ahead
- change_queue = self.pipeline.getQueue(item.change.project)
+ if item_ahead and (not item_ahead.live):
+ item_ahead = None
+ change_queue = item.queue
failing_reasons = [] # Reasons this item is failing
- if self.checkForChangesNeededBy(item.change) is not True:
+ if self.checkForChangesNeededBy(item.change, change_queue) is not True:
# It's not okay to enqueue this change, we should remove it.
self.log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
@@ -1335,17 +1370,16 @@
except MergeFailure:
pass
return (True, nnfi, ready_ahead)
- dep_item = self.getFailingDependentItem(item)
+ dep_items = self.getFailingDependentItems(item)
actionable = change_queue.isActionable(item)
item.active = actionable
ready = False
- if dep_item:
+ if dep_items:
failing_reasons.append('a needed change is failing')
self.cancelJobs(item, prime=False)
else:
item_ahead_merged = False
- if ((item_ahead and item_ahead.change.is_merged) or
- not change_queue.dependent):
+ if (item_ahead and item_ahead.change.is_merged):
item_ahead_merged = True
if (item_ahead != nnfi and not item_ahead_merged):
# Our current base is different than what we expected,
@@ -1368,7 +1402,12 @@
changed = True
if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
- if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
+ if (not item.live) and (not item.items_behind):
+ failing_reasons.append("is a non-live item with no items behind")
+ self.dequeueItem(item)
+ changed = True
+ if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
+ and item.live):
try:
self.reportItem(item)
except MergeFailure:
@@ -1380,7 +1419,7 @@
self.cancelJobs(item_behind)
self.dequeueItem(item)
changed = True
- elif not failing_reasons:
+ elif not failing_reasons and item.live:
nnfi = item
item.current_build_set.failing_reasons = failing_reasons
if failing_reasons:
@@ -1464,7 +1503,7 @@
item.change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (item.change, succeeded, merged))
- change_queue = self.pipeline.getQueue(item.change.project)
+ change_queue = item.queue
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (item.change))
@@ -1534,7 +1573,7 @@
else:
url_pattern = None
- for job in self.pipeline.getJobs(item.change):
+ for job in self.pipeline.getJobs(item):
build = item.current_build_set.getBuild(job.name)
result = build.result
pattern = url_pattern
@@ -1714,6 +1753,18 @@
self.log.exception("Exception reporting pipeline stats")
+class DynamicChangeQueueContextManager(object):
+    """Context manager wrapping a change queue that may be discarded.
+
+    Entering the block yields the wrapped change queue, which may be
+    None.  On exit, a wrapped queue that holds no items is removed
+    from its pipeline, so a queue to which nothing was enqueued does
+    not linger in the pipeline.
+    """
+
+    def __init__(self, change_queue):
+        self.change_queue = change_queue
+
+    def __enter__(self):
+        return self.change_queue
+
+    def __exit__(self, etype, value, tb):
+        change_queue = self.change_queue
+        # change_queue.queue is the list of items in the queue; an
+        # empty list means there is nothing left to process.
+        if not change_queue or change_queue.queue:
+            return
+        pipeline = change_queue.pipeline
+        # Pass the ChangeQueue itself: the pipeline's list holds
+        # ChangeQueue objects, not their item lists.  Several managers
+        # may wrap the same existing queue, so skip the removal if the
+        # queue is already gone.
+        if change_queue in pipeline.queues:
+            pipeline.removeQueue(change_queue)
+
+
class IndependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.IndependentPipelineManager")
changes_merge = False
@@ -1721,11 +1772,86 @@
def _postConfig(self, layout):
super(IndependentPipelineManager, self)._postConfig(layout)
- change_queue = ChangeQueue(self.pipeline, dependent=False)
- for project in self.pipeline.getProjects():
- change_queue.addProject(project)
-
+ def getChangeQueue(self, change, existing=None):
+ # creates a new change queue for every change
+ if existing:
+ return DynamicChangeQueueContextManager(existing)
+ if change.project not in self.pipeline.getProjects():
+ return DynamicChangeQueueContextManager(None)
+ change_queue = ChangeQueue(self.pipeline)
+ change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
+ return DynamicChangeQueueContextManager(change_queue)
+
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements,
+ change_queue):
+ # Enqueue into change_queue, as non-live items, every change
+ # that checkForChangesNeededBy reports as still needed by this
+ # change. Returns True when nothing is needed or all needed
+ # changes were added, and False as soon as one addChange call
+ # fails. The quiet and ignore_requirements arguments are not
+ # consulted here; needed changes are always added with
+ # quiet=True and ignore_requirements=True.
+ ret = self.checkForChangesNeededBy(change, change_queue)
+ # A boolean is a final verdict rather than a list of changes;
+ # pass it straight through.
+ if ret in [True, False]:
+ return ret
+ self.log.debug(" Changes %s must be merged ahead of %s" %
+ (ret, change))
+ for needed_change in ret:
+ # This differs from the dependent pipeline by enqueuing
+ # changes ahead as "not live", that is, not intended to
+ # have jobs run. Also, pipeline requirements are always
+ # ignored (which is safe because the changes are not
+ # live).
+ r = self.addChange(needed_change, quiet=True,
+ ignore_requirements=True,
+ live=False, change_queue=change_queue)
+ if not r:
+ return False
+ return True
+
+ def checkForChangesNeededBy(self, change, change_queue):
+ # With ignore-dependencies set on the pipeline, every change is
+ # treated as having no outstanding dependencies.
+ if self.pipeline.ignore_dependencies:
+ return True
+ self.log.debug("Checking for changes needed by %s:" % change)
+ # Return True if it is okay to proceed enqueuing this change,
+ # or a list of the changes that must be enqueued ahead of it.
+ # This implementation never returns False.
+ if not hasattr(change, 'needs_changes'):
+ self.log.debug(" Changeish does not support dependencies")
+ return True
+ if not change.needs_changes:
+ self.log.debug(" No changes needed")
+ return True
+ changes_needed = []
+ for needed_change in change.needs_changes:
+ self.log.debug(" Change %s needs change %s:" % (
+ change, needed_change))
+ if needed_change.is_merged:
+ self.log.debug(" Needed change is merged")
+ continue
+ # Only the supplied change queue is searched, not the whole
+ # pipeline.
+ if self.isChangeAlreadyInQueue(needed_change, change_queue):
+ self.log.debug(" Needed change is already ahead in the queue")
+ continue
+ self.log.debug(" Change %s is needed" % needed_change)
+ if needed_change not in changes_needed:
+ changes_needed.append(needed_change)
+ continue
+ # This differs from the dependent pipeline check in not
+ # verifying that the dependent change is mergeable.
+ if changes_needed:
+ return changes_needed
+ return True
+
+ def dequeueItem(self, item):
+ # Dequeue the item via the base manager, then drop its change
+ # queue from the pipeline if that left the queue empty.
+ super(IndependentPipelineManager, self).dequeueItem(item)
+ # An independent pipeline manager dynamically removes empty
+ # queues
+ # item.queue is the ChangeQueue; item.queue.queue is its list of
+ # items.
+ # NOTE(review): relies on item.queue still being set after the
+ # base dequeue -- confirm against ChangeQueue.dequeueItem.
+ if not item.queue.queue:
+ self.pipeline.removeQueue(item.queue)
+
+
+class StaticChangeQueueContextManager(object):
+ # Context manager that hands back the change queue it was given
+ # (possibly None) and performs no cleanup when the block exits.
+ def __init__(self, change_queue):
+ self.change_queue = change_queue
+
+ def __enter__(self):
+ return self.change_queue
+
+ def __exit__(self, etype, value, tb):
+ # Nothing to release; the implicit None return lets any
+ # exception raised in the block propagate.
+ pass
class DependentPipelineManager(BasePipelineManager):
@@ -1787,6 +1913,12 @@
new_change_queues.append(a)
return new_change_queues
+ def getChangeQueue(self, change, existing=None):
+ # Return a context manager yielding the change queue to use for
+ # this change: the caller-supplied existing queue if there is
+ # one, otherwise whatever the pipeline's getQueue returns for the
+ # change's project (which may be None). No queue is created or
+ # removed here.
+ if existing:
+ return StaticChangeQueueContextManager(existing)
+ return StaticChangeQueueContextManager(
+ self.pipeline.getQueue(change.project))
+
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.source.canMerge(change,
self.getSubmitAllowNeeds()):
@@ -1794,71 +1926,111 @@
return False
return True
- def enqueueChangesBehind(self, change, quiet, ignore_requirements):
+ def enqueueChangesBehind(self, change, quiet, ignore_requirements,
+ change_queue):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
self.log.debug(" Changeish does not support dependencies")
return
- for needs in change.needed_by_changes:
- if self.pipeline.source.canMerge(needs,
+ for other_change in change.needed_by_changes:
+ with self.getChangeQueue(other_change) as other_change_queue:
+ if other_change_queue != change_queue:
+ self.log.debug(" Change %s in project %s can not be "
+ "enqueued in the target queue %s" %
+ (other_change, other_change.project,
+ change_queue))
+ continue
+ if self.pipeline.source.canMerge(other_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
- (needs, change))
- to_enqueue.append(needs)
+ (other_change, change))
+ to_enqueue.append(other_change)
+
if not to_enqueue:
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
self.addChange(other_change, quiet=quiet,
- ignore_requirements=ignore_requirements)
+ ignore_requirements=ignore_requirements,
+ change_queue=change_queue)
- def enqueueChangesAhead(self, change, quiet, ignore_requirements):
- ret = self.checkForChangesNeededBy(change)
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements,
+ change_queue):
+ ret = self.checkForChangesNeededBy(change, change_queue)
if ret in [True, False]:
return ret
- self.log.debug(" Change %s must be merged ahead of %s" %
+ self.log.debug(" Changes %s must be merged ahead of %s" %
(ret, change))
- return self.addChange(ret, quiet=quiet,
- ignore_requirements=ignore_requirements)
+ for needed_change in ret:
+ r = self.addChange(needed_change, quiet=quiet,
+ ignore_requirements=ignore_requirements,
+ change_queue=change_queue)
+ if not r:
+ return False
+ return True
- def checkForChangesNeededBy(self, change):
+ def checkForChangesNeededBy(self, change, change_queue):
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
- if not hasattr(change, 'needs_change'):
+ if not hasattr(change, 'needs_changes'):
self.log.debug(" Changeish does not support dependencies")
return True
- if not change.needs_change:
+ if not change.needs_changes:
self.log.debug(" No changes needed")
return True
- if change.needs_change.is_merged:
- self.log.debug(" Needed change is merged")
- return True
- if not change.needs_change.is_current_patchset:
- self.log.debug(" Needed change is not the current patchset")
- return False
- if self.isChangeAlreadyInQueue(change.needs_change):
- self.log.debug(" Needed change is already ahead in the queue")
- return True
- if self.pipeline.source.canMerge(change.needs_change,
- self.getSubmitAllowNeeds()):
- self.log.debug(" Change %s is needed" %
- change.needs_change)
- return change.needs_change
- # The needed change can't be merged.
- self.log.debug(" Change %s is needed but can not be merged" %
- change.needs_change)
- return False
+ changes_needed = []
+ # Ignore supplied change_queue
+ with self.getChangeQueue(change) as change_queue:
+ for needed_change in change.needs_changes:
+ self.log.debug(" Change %s needs change %s:" % (
+ change, needed_change))
+ if needed_change.is_merged:
+ self.log.debug(" Needed change is merged")
+ continue
+ with self.getChangeQueue(needed_change) as needed_change_queue:
+ if needed_change_queue != change_queue:
+ self.log.debug(" Change %s in project %s does not "
+ "share a change queue with %s "
+ "in project %s" %
+ (needed_change, needed_change.project,
+ change, change.project))
+ return False
+ if not needed_change.is_current_patchset:
+ self.log.debug(" Needed change is not the "
+ "current patchset")
+ return False
+ if self.isChangeAlreadyInQueue(needed_change, change_queue):
+ self.log.debug(" Needed change is already ahead "
+ "in the queue")
+ continue
+ if self.pipeline.source.canMerge(needed_change,
+ self.getSubmitAllowNeeds()):
+ self.log.debug(" Change %s is needed" % needed_change)
+ if needed_change not in changes_needed:
+ changes_needed.append(needed_change)
+ continue
+ # The needed change can't be merged.
+ self.log.debug(" Change %s is needed but can not be merged" %
+ needed_change)
+ return False
+ if changes_needed:
+ return changes_needed
+ return True
- def getFailingDependentItem(self, item):
- if not hasattr(item.change, 'needs_change'):
+ def getFailingDependentItems(self, item):
+ if not hasattr(item.change, 'needs_changes'):
return None
- if not item.change.needs_change:
+ if not item.change.needs_changes:
return None
- needs_item = self.getItemForChange(item.change.needs_change)
- if not needs_item:
- return None
- if needs_item.current_build_set.failing_reasons:
- return needs_item
+ failing_items = set()
+ for needed_change in item.change.needs_changes:
+ needed_item = self.getItemForChange(needed_change)
+ if not needed_item:
+ continue
+ if needed_item.current_build_set.failing_reasons:
+ failing_items.add(needed_item)
+ if failing_items:
+ return failing_items
return None
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 6782534..c5fdf9a 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -13,6 +13,7 @@
# under the License.
import logging
+import re
import threading
import time
import urllib2
@@ -93,7 +94,6 @@
refresh=True)
self.sched.addEvent(event)
- self.gerrit.eventDone()
def run(self):
while True:
@@ -103,6 +103,8 @@
self._handleEvent()
except:
self.log.exception("Exception moving Gerrit event:")
+ finally:
+ self.gerrit.eventDone()
class Gerrit(object):
@@ -111,6 +113,9 @@
replication_timeout = 300
replication_retry_interval = 5
+ depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
+ re.MULTILINE | re.IGNORECASE)
+
def __init__(self, config, sched):
self._change_cache = {}
self.sched = sched
@@ -304,7 +309,7 @@
change = NullChange(project)
return change
- def _getChange(self, number, patchset, refresh=False):
+ def _getChange(self, number, patchset, refresh=False, history=None):
key = '%s,%s' % (number, patchset)
change = None
if key in self._change_cache:
@@ -318,7 +323,7 @@
key = '%s,%s' % (change.number, change.patchset)
self._change_cache[key] = change
try:
- self.updateChange(change)
+ self.updateChange(change, history)
except Exception:
del self._change_cache[key]
raise
@@ -342,7 +347,22 @@
(record.get('number'),))
return changes
- def updateChange(self, change):
+ def _getDependsOnFromCommit(self, message):
+ # Collect the Gerrit query results for every Change-Id named in
+ # a "Depends-On:" line of the given commit message. Each distinct
+ # id is queried once and all results are returned in one list.
+ records = []
+ seen = set()
+ # depends_on_re has a single group, so findall yields the
+ # Change-Id strings themselves.
+ for match in self.depends_on_re.findall(message):
+ # The duplicate check is an exact string comparison, while the
+ # pattern is compiled with re.IGNORECASE: ids differing only
+ # in letter case are queried separately.
+ if match in seen:
+ self.log.debug("Ignoring duplicate Depends-On: %s" %
+ (match,))
+ continue
+ seen.add(match)
+ query = "change:%s" % (match,)
+ self.log.debug("Running query %s to find needed changes" %
+ (query,))
+ records.extend(self.gerrit.simpleQuery(query))
+ return records
+
+ def updateChange(self, change, history=None):
self.log.info("Updating information for %s,%s" %
(change.number, change.patchset))
data = self.gerrit.query(change.number)
@@ -382,13 +402,36 @@
# for dependencies.
return change
- change.needs_change = None
+ if history is None:
+ history = []
+ else:
+ history = history[:]
+ history.append(change.number)
+
+ change.needs_changes = []
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
- dep = self._getChange(dep_num, dep_ps)
- if not dep.is_merged:
- change.needs_change = dep
+ if dep_num in history:
+ raise Exception("Dependency cycle detected: %s in %s" % (
+ dep_num, history))
+ self.log.debug("Getting git-dependent change %s,%s" %
+ (dep_num, dep_ps))
+ dep = self._getChange(dep_num, dep_ps, history=history)
+ if (not dep.is_merged) and dep not in change.needs_changes:
+ change.needs_changes.append(dep)
+
+ for record in self._getDependsOnFromCommit(data['commitMessage']):
+ dep_num = record['number']
+ dep_ps = record['currentPatchSet']['number']
+ if dep_num in history:
+ raise Exception("Dependency cycle detected: %s in %s" % (
+ dep_num, history))
+ self.log.debug("Getting commit-dependent change %s,%s" %
+ (dep_num, dep_ps))
+ dep = self._getChange(dep_num, dep_ps, history=history)
+ if (not dep.is_merged) and dep not in change.needs_changes:
+ change.needs_changes.append(dep)
change.needed_by_changes = []
if 'neededBy' in data:
@@ -396,7 +439,7 @@
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self._getChange(dep_num, dep_ps)
- if not dep.is_merged and dep.is_current_patchset:
+ if (not dep.is_merged) and dep.is_current_patchset:
change.needed_by_changes.append(dep)
return change