Merge "On null changes serialize the id as null"
diff --git a/NEWS.rst b/NEWS.rst
index f6500f0..c4901a8 100644
--- a/NEWS.rst
+++ b/NEWS.rst
@@ -25,15 +25,14 @@
triggers later). See the sample layout.yaml and Zuul section of the
documentation.
-* The default behavior is now to immediately dequeue changes that have
- merge conflicts, even those not at the head of the queue. To enable
- the old behavior (which would wait until the conflicting change was
- at the head before dequeuing it), see the new "dequeue-on-conflict"
- option.
-
* Some statsd keys have changed in a backwards incompatible way:
* The counters and timers of the form zuul.job.{name} is now split
into several keys of the form:
zuul.pipeline.{pipeline-name}.job.{job-name}.{result}
* Job names in statsd keys now have the '_' character substituted
for the '.' character.
+
+* The layout.yaml structure has changed to introduce configurable
+ reporters. This requires restructuring the start/success/failure
+ actions to include a dictionary of reporters and their parameters.
+ See reporters in the docs and layout.yaml-sample.
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 039cffa..5a0c7b9 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -21,6 +21,7 @@
gating
triggers
launchers
+ reporters
zuul
Indices and tables
diff --git a/doc/source/reporters.rst b/doc/source/reporters.rst
new file mode 100644
index 0000000..18d35a1
--- /dev/null
+++ b/doc/source/reporters.rst
@@ -0,0 +1,59 @@
+:title: Reporters
+
+Reporters
+=========
+
+Zuul can communicate results and progress back over configurable
+protocols. For example, after a build succeeds, a pipeline can be
+configured to post a positive review back to Gerrit.
+
+A report can be sent at three stages: start, success, or failure.
+Each stage can have multiple reporters. For example, you can set
+Verified on Gerrit and also send an email.
+
+Gerrit
+------
+
+Zuul works with standard versions of Gerrit by invoking the
+``gerrit`` command over an SSH connection, and it reports back
+to Gerrit over that same SSH connection.
+
+The dictionary passed to the gerrit reporter is used for ``gerrit
+review`` arguments, with the boolean value of ``true`` simply
+indicating that the argument should be present without following it
+with a value. For example, ``verified: 1`` becomes ``gerrit review
+--verified 1`` and ``submit: true`` becomes ``gerrit review
+--submit``.
+
+Gerrit Configuration
+~~~~~~~~~~~~~~~~~~~~
+
+The configuration for posting back to gerrit is shared with the gerrit
+trigger in zuul.conf as described in :ref:`zuulconf`.
+
+SMTP
+----
+
+A simple email reporter is also available.
+
+SMTP Configuration
+~~~~~~~~~~~~~~~~~~
+
+zuul.conf contains the SMTP server and the default to/from addresses
+as described in :ref:`zuulconf`.
+
+Each pipeline can override the to or from address by providing
+alternatives as arguments to the reporter. For example, ::
+
+ pipelines:
+ - name: post-merge
+ manager: IndependentPipelineManager
+ trigger:
+ - event: change-merged
+ success:
+ smtp:
+ to: you@example.com
+ failure:
+ smtp:
+ to: you@example.com
+ from: alternative@example.com
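The argument mapping the reporters documentation describes above (a boolean ``true`` meaning a bare flag, anything else a flag followed by its value) can be sketched as follows. This is a minimal illustration only; ``to_gerrit_args`` is a hypothetical helper name, not Zuul's actual implementation:

```python
def to_gerrit_args(params):
    """Render a reporter parameter dict as ``gerrit review`` arguments.

    A boolean True emits the flag alone; any other value follows the
    flag, e.g. {'verified': 1} -> '--verified 1'.
    """
    args = []
    for key, value in sorted(params.items()):
        if value is True:
            args.append('--%s' % key)
        else:
            args.append('--%s %s' % (key, value))
    return ' '.join(args)

print(to_gerrit_args({'verified': 1}))   # --verified 1
print(to_gerrit_args({'submit': True}))  # --submit
```

Note that ``1 is True`` is false in Python, so ``verified: 1`` correctly keeps its value rather than becoming a bare flag.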
diff --git a/doc/source/triggers.rst b/doc/source/triggers.rst
index 0bd07de..17c93ab 100644
--- a/doc/source/triggers.rst
+++ b/doc/source/triggers.rst
@@ -69,6 +69,7 @@
create = group CI Tools
push = +force CI Tools
pushMerge = group CI Tools
+ forgeAuthor = group CI Tools
[access "refs/for/refs/zuul/*"]
pushMerge = group CI Tools
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 73ebf71..6adfa30 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -139,6 +139,23 @@
is included). Defaults to ``false``.
``job_name_in_report=true``
+smtp
+""""
+
+**server**
+ SMTP server hostname or address to use.
+ ``server=localhost``
+
+**default_from**
+ Who the email should appear to be sent from when emailing the report.
+ This can be overridden by individual pipelines.
+ ``default_from=zuul@example.com``
+
+**default_to**
+ Who the report should be emailed to by default.
+ This can be overridden by individual pipelines.
+ ``default_to=you@example.com``
+
layout.yaml
~~~~~~~~~~~
@@ -329,25 +346,15 @@
well. To suppress this behavior (and allow jobs to continue
running), set this to ``false``. Default: ``true``.
-**dequeue-on-conflict**
- Normally, if there is a merge conflict or similar error with a
- change, Zuul will immediately remove it from the queue, even if the
- error is only due to a change that happened to be enqueued ahead of
- it. If you would like to keep the change in the queue until it is
- at the head to be certain that the merge conflict is intrinsic to
- the change, set this to ``false``. Default: ``true``.
-
**success**
- Describes what Zuul should do if all the jobs complete successfully.
+ Describes where Zuul should report to if all the jobs complete
+ successfully.
This section is optional; if it is omitted, Zuul will run jobs and
do nothing on success; it will not even report a message to Gerrit.
- If the section is present, it will leave a message on the Gerrit
- review. Each additional argument is assumed to be an argument to
- ``gerrit review``, with the boolean value of ``true`` simply
- indicating that the argument should be present without following it
- with a value. For example, ``verified: 1`` becomes ``gerrit
- review --verified 1`` and ``submit: true`` becomes ``gerrit review
- --submit``.
+ If the section is present, the listed reporter plugins will be
+ asked to report on the jobs.
+  Each reporter's value dictionary is interpreted by that reporter;
+  see the reporters documentation for more details.
**failure**
Uses the same syntax as **success**, but describes what Zuul should
@@ -373,9 +380,11 @@
trigger:
- event: patchset-created
success:
- verified: 1
+ gerrit:
+ verified: 1
failure:
- verified: -1
+ gerrit:
+ verified: -1
This will trigger jobs each time a new patchset (or change) is
uploaded to Gerrit, and report +/-1 values to Gerrit in the
@@ -388,10 +397,12 @@
approval:
- approved: 1
success:
- verified: 2
- submit: true
+ gerrit:
+ verified: 2
+ submit: true
failure:
- verified: -2
+ gerrit:
+ verified: -2
This will trigger jobs whenever a reviewer leaves a vote of ``1`` in the
``approved`` review category in Gerrit (a non-standard category).
@@ -425,9 +436,11 @@
trigger:
- event: change-merged
success:
- force-message: True
+ gerrit:
+ force-message: True
failure:
- force-message: True
+ gerrit:
+ force-message: True
The ``change-merged`` events happen when a change has been merged in the git
repository. The change is thus closed and Gerrit will not accept modifications
@@ -558,6 +571,22 @@
**name**
The name of the project (as known by Gerrit).
+**merge-mode (optional)**
+ An optional value that indicates what strategy should be used to
+ merge changes to this project. Supported values are:
+
+  **merge-resolve**
+ Equivalent to 'git merge -s resolve'. This corresponds closely to
+ what Gerrit performs (using JGit) for a project if the "Merge if
+ necessary" merge mode is selected and "Automatically resolve
+ conflicts" is checked. This is the default.
+
+  **merge**
+ Equivalent to 'git merge'.
+
+  **cherry-pick**
+ Equivalent to 'git cherry-pick'.
+
This is followed by a section for each of the pipelines defined above.
Pipelines may be omitted if no jobs should run for this project in a
given pipeline. Within the pipeline section, the jobs that should be
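The smtp options added to zuul.rst above (``server``, ``default_from``, ``default_to``) imply a reporter interaction with ``smtplib`` along these lines. This is a rough sketch under stated assumptions, not Zuul's actual reporter; ``build_report`` and ``send_report`` are hypothetical names:

```python
import smtplib


def build_report(from_email, to_email, subject, body):
    # An RFC 2822 message: header lines, a blank line, then the body.
    return 'From: %s\nTo: %s\nSubject: %s\n\n%s' % (
        from_email, to_email, subject, body)


def send_report(server, port, from_email, to_email, subject, body):
    # default_from / default_to from zuul.conf would be used here
    # unless the pipeline supplies alternative addresses.
    msg = build_report(from_email, to_email, subject, body)
    conn = smtplib.SMTP(server, port)
    conn.sendmail(from_email, [to_email], msg)
    conn.quit()
```

``sendmail`` takes the envelope sender, a list of recipients, and the flat message string; the per-pipeline ``to``/``from`` overrides would simply replace the defaults in these arguments.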
diff --git a/etc/layout.yaml-sample b/etc/layout.yaml-sample
index b49f5d5..30a3352 100644
--- a/etc/layout.yaml-sample
+++ b/etc/layout.yaml-sample
@@ -5,9 +5,11 @@
gerrit:
- event: patchset-created
success:
- verified: 1
+ gerrit:
+ verified: 1
failure:
- verified: -1
+ gerrit:
+ verified: -1
- name: tests
manager: IndependentPipelineManager
@@ -16,9 +18,11 @@
- event: patchset-created
email_filter: ^.*@example.org$
success:
- verified: 1
+ gerrit:
+ verified: 1
failure:
- verified: -1
+ gerrit:
+ verified: -1
- name: post
manager: IndependentPipelineManager
@@ -35,12 +39,14 @@
approval:
- approved: 1
start:
- verified: 0
+ gerrit:
+ verified: 0
success:
- verified: 2
- submit: true
+ gerrit:
+ verified: 1
failure:
- verified: -2
+ gerrit:
+ verified: -1
jobs:
- name: ^.*-merge$
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index edf1044..c193727 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -19,3 +19,9 @@
;git_user_email=zuul@example.com
;git_user_name=zuul
status_url=https://jenkins.example.com/zuul/status
+
+[smtp]
+server=localhost
+port=25
+default_from=zuul@example.com
+default_to=you@example.com
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 9760a79..ddee844 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,9 +9,10 @@
paramiko
GitPython>=0.3.2.RC1
lockfile
+ordereddict
python-daemon
extras
statsd>=1.0.0,<3.0
voluptuous>=0.6,<0.7
-gear>=0.3.1,<0.4.0
+gear>=0.4.0,<1.0.0
apscheduler>=2.1.1,<3.0
diff --git a/tests/fixtures/layout-delayed-repo-init.yaml b/tests/fixtures/layout-delayed-repo-init.yaml
index e0613f1..6caf622 100644
--- a/tests/fixtures/layout-delayed-repo-init.yaml
+++ b/tests/fixtures/layout-delayed-repo-init.yaml
@@ -5,9 +5,11 @@
gerrit:
- event: patchset-created
success:
- verified: 1
+ gerrit:
+ verified: 1
failure:
- verified: -1
+ gerrit:
+ verified: -1
- name: post
manager: IndependentPipelineManager
@@ -25,12 +27,15 @@
approval:
- approved: 1
success:
- verified: 2
- submit: true
+ gerrit:
+ verified: 2
+ submit: true
failure:
- verified: -2
+ gerrit:
+ verified: -2
start:
- verified: 0
+ gerrit:
+ verified: 0
precedence: high
projects:
diff --git a/tests/fixtures/layout-live-reconfiguration-functions.yaml b/tests/fixtures/layout-live-reconfiguration-functions.yaml
index f477a12..e261a88 100644
--- a/tests/fixtures/layout-live-reconfiguration-functions.yaml
+++ b/tests/fixtures/layout-live-reconfiguration-functions.yaml
@@ -11,12 +11,15 @@
approval:
- approved: 1
success:
- verified: 2
- submit: true
+ gerrit:
+ verified: 2
+ submit: true
failure:
- verified: -2
+ gerrit:
+ verified: -2
start:
- verified: 0
+ gerrit:
+ verified: 0
precedence: high
jobs:
diff --git a/tests/fixtures/layout-smtp.yaml b/tests/fixtures/layout-smtp.yaml
new file mode 100644
index 0000000..813857b
--- /dev/null
+++ b/tests/fixtures/layout-smtp.yaml
@@ -0,0 +1,25 @@
+pipelines:
+ - name: check
+ manager: IndependentPipelineManager
+ trigger:
+ gerrit:
+ - event: patchset-created
+ start:
+ smtp:
+ to: you@example.com
+ success:
+ gerrit:
+ verified: 1
+ smtp:
+ to: alternative_me@example.com
+ from: zuul_from@example.com
+ failure:
+ gerrit:
+ verified: -1
+
+projects:
+ - name: org/project
+ check:
+ - project-merge:
+ - project-test1
+ - project-test2
diff --git a/tests/fixtures/layout-timer.yaml b/tests/fixtures/layout-timer.yaml
index e24fb13..9e0f66b 100644
--- a/tests/fixtures/layout-timer.yaml
+++ b/tests/fixtures/layout-timer.yaml
@@ -5,9 +5,11 @@
gerrit:
- event: patchset-created
success:
- verified: 1
+ gerrit:
+ verified: 1
failure:
- verified: -1
+ gerrit:
+ verified: -1
- name: periodic
manager: IndependentPipelineManager
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 675d351..dc659fb 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -8,9 +8,11 @@
gerrit:
- event: patchset-created
success:
- verified: 1
+ gerrit:
+ verified: 1
failure:
- verified: -1
+ gerrit:
+ verified: -1
- name: post
manager: IndependentPipelineManager
@@ -28,12 +30,15 @@
approval:
- approved: 1
success:
- verified: 2
- submit: true
+ gerrit:
+ verified: 2
+ submit: true
failure:
- verified: -2
+ gerrit:
+ verified: -2
start:
- verified: 0
+ gerrit:
+ verified: 0
precedence: high
- name: unused
@@ -51,9 +56,11 @@
gerrit:
- event: change-restored
success:
- verified: 1
+ gerrit:
+ verified: 1
failure:
- verified: -1
+ gerrit:
+ verified: -1
- name: dup2
manager: IndependentPipelineManager
@@ -61,12 +68,13 @@
gerrit:
- event: change-restored
success:
- verified: 1
+ gerrit:
+ verified: 1
failure:
- verified: -1
+ gerrit:
+ verified: -1
- name: conflict
- dequeue-on-conflict: false
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
@@ -75,12 +83,15 @@
approval:
- approved: 1
success:
- verified: 2
- submit: true
+ gerrit:
+ verified: 2
+ submit: true
failure:
- verified: -2
+ gerrit:
+ verified: -2
start:
- verified: 0
+ gerrit:
+ verified: 0
jobs:
- name: ^.*-merge$
diff --git a/tests/fixtures/layouts/good_layout.yaml b/tests/fixtures/layouts/good_layout.yaml
index 15be6ef..4bd5e70 100644
--- a/tests/fixtures/layouts/good_layout.yaml
+++ b/tests/fixtures/layouts/good_layout.yaml
@@ -8,9 +8,11 @@
gerrit:
- event: patchset-created
success:
- verified: 1
+ gerrit:
+ verified: 1
failure:
- verified: -1
+ gerrit:
+ verified: -1
- name: post
manager: IndependentPipelineManager
@@ -28,15 +30,18 @@
- event: comment-added
approval:
- approved: 1
- success:
- verified: 2
- code-review: 1
- submit: true
- failure:
- verified: -2
- workinprogress: true
start:
- verified: 0
+ gerrit:
+ verified: 0
+ success:
+ gerrit:
+ verified: 2
+ code-review: 1
+ submit: true
+ failure:
+ gerrit:
+ verified: -2
+ workinprogress: true
jobs:
- name: ^.*-merge$
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index 57eca51..081258a 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -14,3 +14,9 @@
push_change_refs=true
url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
job_name_in_report=true
+
+[smtp]
+server=localhost
+port=25
+default_from=zuul@example.com
+default_to=you@example.com
\ No newline at end of file
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index a92efea..70956b4 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -44,6 +44,8 @@
import zuul.scheduler
import zuul.webapp
import zuul.launcher.gearman
+import zuul.reporter.gerrit
+import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
@@ -388,6 +390,8 @@
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
+ name = 'gerrit'
+
def __init__(self, upstream_root, *args):
super(FakeGerritTrigger, self).__init__(*args)
self.upstream_root = upstream_root
@@ -443,6 +447,7 @@
self.aborted = False
self.created = time.time()
self.description = ''
+ self.run_error = False
def release(self):
self.wait_condition.acquire()
@@ -492,7 +497,13 @@
if self.aborted:
result = 'ABORTED'
- data = {'result': result}
+ if self.run_error:
+ work_fail = True
+ result = 'RUN_ERROR'
+ else:
+ data['result'] = result
+ work_fail = False
+
changes = None
if 'ZUUL_CHANGE_IDS' in self.parameters:
changes = self.parameters['ZUUL_CHANGE_IDS']
@@ -504,7 +515,11 @@
pipeline=self.parameters['ZUUL_PIPELINE'])
)
- self.job.sendWorkComplete(json.dumps(data))
+ self.job.sendWorkData(json.dumps(data))
+ if work_fail:
+ self.job.sendWorkFail()
+ else:
+ self.job.sendWorkComplete(json.dumps(data))
del self.worker.gearman_jobs[self.job.unique]
self.worker.running_builds.remove(self)
self.worker.lock.release()
@@ -668,6 +683,35 @@
self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
+class FakeSMTP(object):
+ log = logging.getLogger('zuul.FakeSMTP')
+ messages = []
+
+ def __init__(self, server, port):
+ self.server = server
+ self.port = port
+
+ def sendmail(self, from_email, to_email, msg):
+ self.log.info("Sending email from %s, to %s, with msg %s" % (
+ from_email, to_email, msg))
+
+ headers = msg.split('\n\n', 1)[0]
+ body = msg.split('\n\n', 1)[1]
+
+ FakeSMTP.messages.append(dict(
+ from_email=from_email,
+ to_email=to_email,
+ msg=msg,
+ headers=headers,
+ body=body,
+ ))
+
+ return True
+
+ def quit(self):
+ return True
+
+
class TestScheduler(testtools.TestCase):
log = logging.getLogger("zuul.test")
@@ -751,6 +795,7 @@
self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
zuul.lib.gerrit.Gerrit = FakeGerrit
+ self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTP))
self.gerrit = FakeGerritTrigger(
self.upstream_root, self.config, self.sched)
@@ -766,6 +811,14 @@
self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
self.sched.registerTrigger(self.timer)
+ self.sched.registerReporter(
+ zuul.reporter.gerrit.Reporter(self.gerrit))
+ self.smtp_reporter = zuul.reporter.smtp.Reporter(
+ self.config.get('smtp', 'default_from'),
+ self.config.get('smtp', 'default_to'),
+ self.config.get('smtp', 'server'))
+ self.sched.registerReporter(self.smtp_reporter)
+
self.sched.start()
self.sched.reconfigure(self.config)
self.sched.resume()
@@ -1015,9 +1068,6 @@
print 'pipeline %s queue %s contents %s' % (
pipeline.name, queue.name, queue.queue)
self.assertEqual(len(queue.queue), 0)
- if len(queue.severed_heads) != 0:
- print 'heads', queue.severed_heads
- self.assertEqual(len(queue.severed_heads), 0)
def assertReportedStat(self, key, value=None, kind=None):
start = time.time()
@@ -1070,6 +1120,14 @@
self.assertReportedStat(
'zuul.pipeline.gate.org.project.total_changes', value='1|c')
+ def test_initial_pipeline_gauges(self):
+ "Test that each pipeline reported its length on start"
+ pipeline_names = self.sched.layout.pipelines.keys()
+ self.assertNotEqual(len(pipeline_names), 0)
+ for name in pipeline_names:
+ self.assertReportedStat('zuul.pipeline.%s.current_changes' % name,
+ value='0|g')
+
def test_duplicate_pipelines(self):
"Test that a change matching multiple pipelines works"
@@ -1305,6 +1363,82 @@
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
+ def test_failed_change_in_middle(self):
+ "Test a failed change in the middle of the queue"
+
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+
+ self.worker.addFailTest('project-test1', B)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+
+ self.waitUntilSettled()
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 6)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+ self.assertEqual(self.builds[2].name, 'project-test1')
+ self.assertEqual(self.builds[3].name, 'project-test2')
+ self.assertEqual(self.builds[4].name, 'project-test1')
+ self.assertEqual(self.builds[5].name, 'project-test2')
+
+ self.release(self.builds[2])
+ self.waitUntilSettled()
+
+ # project-test1 and project-test2 for A
+ # project-test2 for B
+ # project-merge for C (without B)
+ self.assertEqual(len(self.builds), 4)
+ self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2)
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # project-test1 and project-test2 for A
+ # project-test2 for B
+ # project-test1 and project-test2 for C
+ self.assertEqual(len(self.builds), 5)
+
+ items = self.sched.layout.pipelines['gate'].getAllItems()
+ builds = items[0].current_build_set.getBuilds()
+ self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
+ self.assertEqual(self.countJobResults(builds, None), 2)
+ builds = items[1].current_build_set.getBuilds()
+ self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
+ self.assertEqual(self.countJobResults(builds, 'FAILURE'), 1)
+ self.assertEqual(self.countJobResults(builds, None), 1)
+ builds = items[2].current_build_set.getBuilds()
+ self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
+ self.assertEqual(self.countJobResults(builds, None), 2)
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 0)
+ self.assertEqual(len(self.history), 12)
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(C.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.reported, 2)
+
def test_failed_change_at_head_with_queue(self):
"Test that if a change at the head fails, queued jobs are canceled"
@@ -1367,6 +1501,111 @@
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
+ def test_two_failed_changes_at_head(self):
+ "Test that changes are reparented correctly if 2 fail at head"
+
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+
+ self.worker.addFailTest('project-test1', A)
+ self.worker.addFailTest('project-test1', B)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 6)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+ self.assertEqual(self.builds[2].name, 'project-test1')
+ self.assertEqual(self.builds[3].name, 'project-test2')
+ self.assertEqual(self.builds[4].name, 'project-test1')
+ self.assertEqual(self.builds[5].name, 'project-test2')
+
+ self.assertTrue(self.job_has_changes(self.builds[0], A))
+ self.assertTrue(self.job_has_changes(self.builds[2], A))
+ self.assertTrue(self.job_has_changes(self.builds[2], B))
+ self.assertTrue(self.job_has_changes(self.builds[4], A))
+ self.assertTrue(self.job_has_changes(self.builds[4], B))
+ self.assertTrue(self.job_has_changes(self.builds[4], C))
+
+ # Fail change B first
+ self.release(self.builds[2])
+ self.waitUntilSettled()
+
+ # restart of C after B failure
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 5)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+ self.assertEqual(self.builds[2].name, 'project-test2')
+ self.assertEqual(self.builds[3].name, 'project-test1')
+ self.assertEqual(self.builds[4].name, 'project-test2')
+
+ self.assertTrue(self.job_has_changes(self.builds[1], A))
+ self.assertTrue(self.job_has_changes(self.builds[2], A))
+ self.assertTrue(self.job_has_changes(self.builds[2], B))
+ self.assertTrue(self.job_has_changes(self.builds[4], A))
+ self.assertFalse(self.job_has_changes(self.builds[4], B))
+ self.assertTrue(self.job_has_changes(self.builds[4], C))
+
+ # Finish running all passing jobs for change A
+ self.release(self.builds[1])
+ self.waitUntilSettled()
+ # Fail and report change A
+ self.release(self.builds[0])
+ self.waitUntilSettled()
+
+ # restart of B,C after A failure
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 4)
+ self.assertEqual(self.builds[0].name, 'project-test1') # B
+ self.assertEqual(self.builds[1].name, 'project-test2') # B
+ self.assertEqual(self.builds[2].name, 'project-test1') # C
+ self.assertEqual(self.builds[3].name, 'project-test2') # C
+
+ self.assertFalse(self.job_has_changes(self.builds[1], A))
+ self.assertTrue(self.job_has_changes(self.builds[1], B))
+ self.assertFalse(self.job_has_changes(self.builds[1], C))
+
+ self.assertFalse(self.job_has_changes(self.builds[2], A))
+ # After A failed and B and C restarted, B should be back in
+ # C's tests because it has not failed yet.
+ self.assertTrue(self.job_has_changes(self.builds[2], B))
+ self.assertTrue(self.job_has_changes(self.builds[2], C))
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 0)
+ self.assertEqual(len(self.history), 21)
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(C.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.reported, 2)
+
def test_patch_order(self):
"Test that dependent patches are tested in the right order"
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -1539,8 +1778,9 @@
self.waitUntilSettled()
self.gearman_server.release('.*-merge')
self.waitUntilSettled()
- queue = self.gearman_server.getQueue()
- self.getParameter(queue[-1], 'ZUUL_REF')
+
+ self.assertEqual(len(self.history), 2) # A and C merge jobs
+
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
@@ -1551,32 +1791,7 @@
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
-
- def test_dequeue_conflict(self):
- "Test that the option to dequeue merge conflicts works"
-
- self.gearman_server.hold_jobs_in_queue = True
- A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
- A.addPatchset(['conflict'])
- B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
- B.addPatchset(['conflict'])
- A.addApproval('CRVW', 2)
- B.addApproval('CRVW', 2)
- self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
- self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
- self.waitUntilSettled()
-
- self.assertEqual(A.reported, 1)
- self.assertEqual(B.reported, 2)
-
- self.gearman_server.hold_jobs_in_queue = False
- self.gearman_server.release()
- self.waitUntilSettled()
-
- self.assertEqual(A.data['status'], 'MERGED')
- self.assertEqual(B.data['status'], 'NEW')
- self.assertEqual(A.reported, 2)
- self.assertEqual(B.reported, 2)
+ self.assertEqual(len(self.history), 6)
def test_post(self):
"Test that post jobs run"
@@ -1760,6 +1975,66 @@
self.assertEqual(C.reported, 2)
self.assertEqual(len(self.history), 1)
+ def test_failing_dependent_changes(self):
+ "Test that failing dependent patches are taken out of stream"
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+ E = self.fake_gerrit.addFakeChange('org/project', 'master', 'E')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+ D.addApproval('CRVW', 2)
+ E.addApproval('CRVW', 2)
+
+ # E, D -> C -> B, A
+
+ D.setDependsOn(C, 1)
+ C.setDependsOn(B, 1)
+
+ self.worker.addFailTest('project-test1', B)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(E.addApproval('APRV', 1))
+
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.worker.hold_jobs_in_build = False
+ for build in self.builds:
+ if build.parameters['ZUUL_CHANGE'] != '1':
+ build.release()
+ self.waitUntilSettled()
+
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.data['status'], 'NEW')
+ self.assertEqual(C.reported, 2)
+ self.assertEqual(D.data['status'], 'NEW')
+ self.assertEqual(D.reported, 2)
+ self.assertEqual(E.data['status'], 'MERGED')
+ self.assertEqual(E.reported, 2)
+ self.assertEqual(len(self.history), 18)
+
def test_head_is_dequeued_once(self):
"Test that if a change at the head fails it is dequeued only once"
# If it's dequeued more than once, we should see extra
@@ -2314,6 +2589,21 @@
self.assertEqual(D.data['status'], 'MERGED')
self.assertEqual(D.reported, 2)
+ def test_rerun_on_error(self):
+ "Test that if a worker fails to run a job, it is run again"
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.builds[0].run_error = True
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+ self.assertEqual(self.countJobResults(self.history, 'RUN_ERROR'), 1)
+ self.assertEqual(self.countJobResults(self.history, 'SUCCESS'), 3)
+
def test_statsd(self):
"Test each of the statsd methods used in the scheduler"
import extras
@@ -2563,3 +2853,34 @@
status_jobs.add(job['name'])
self.assertIn('project-bitrot-stable-old', status_jobs)
self.assertIn('project-bitrot-stable-older', status_jobs)
+
+ def test_check_smtp_pool(self):
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-smtp.yaml')
+ self.sched.reconfigure(self.config)
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.waitUntilSettled()
+
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(len(FakeSMTP.messages), 2)
+
+        # A.messages only holds what FakeGerrit places in it.  Thus we
+        # rely on knowing that the first message, from the start stage,
+        # is only configured to go to SMTP.
+
+ self.assertEqual('zuul@example.com',
+ FakeSMTP.messages[0]['from_email'])
+ self.assertEqual(['you@example.com'],
+ FakeSMTP.messages[0]['to_email'])
+ self.assertEqual('Starting check jobs.',
+ FakeSMTP.messages[0]['body'])
+
+ self.assertEqual('zuul_from@example.com',
+ FakeSMTP.messages[1]['from_email'])
+ self.assertEqual(['alternative_me@example.com'],
+ FakeSMTP.messages[1]['to_email'])
+ self.assertEqual(A.messages[0],
+ FakeSMTP.messages[1]['body'])
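The FakeSMTP fixture above records each message after splitting it on the first blank line, which is the RFC 2822 boundary between headers and body. The idea in isolation:

```python
# The first '\n\n' separates header lines from the body; FakeSMTP's
# sendmail splits on it once so the body may contain blank lines too.
msg = 'From: zuul@example.com\nSubject: report\n\nStarting check jobs.'
headers, body = msg.split('\n\n', 1)
print(headers)  # the two header lines
print(body)     # Starting check jobs.
```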
diff --git a/tox.ini b/tox.ini
index 8e0ede6..a92fdb2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,11 +1,15 @@
[tox]
-envlist = pep8, pyflakes, py27
+minversion = 1.6
+skipsdist = True
+envlist = pep8, py27
[testenv]
# Set STATSD env variables so that statsd code paths are tested.
setenv = STATSD_HOST=localhost
STATSD_PORT=8125
VIRTUAL_ENV={envdir}
+usedevelop = True
+install_command = pip install {opts} {packages}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands =
@@ -21,11 +25,6 @@
commands =
python setup.py testr --coverage
-[testenv:pyflakes]
-deps = pyflakes
- -r{toxinidir}/requirements.txt
-commands = pyflakes zuul setup.py
-
[testenv:venv]
commands = {posargs}
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 75f7dd9..6a699d3 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -103,8 +103,9 @@
def exit_handler(self, signum, frame):
signal.signal(signal.SIGUSR1, signal.SIG_IGN)
- self.stop_gear_server()
self.sched.exit()
+ self.sched.join()
+ self.stop_gear_server()
def term_handler(self, signum, frame):
self.stop_gear_server()
@@ -164,6 +165,8 @@
# See comment at top of file about zuul imports
import zuul.scheduler
import zuul.launcher.gearman
+ import zuul.reporter.gerrit
+ import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.webapp
@@ -180,10 +183,23 @@
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
timer = zuul.trigger.timer.Timer(self.config, self.sched)
webapp = zuul.webapp.WebApp(self.sched)
+ gerrit_reporter = zuul.reporter.gerrit.Reporter(gerrit)
+ smtp_reporter = zuul.reporter.smtp.Reporter(
+ self.config.get('smtp', 'default_from')
+ if self.config.has_option('smtp', 'default_from') else 'zuul',
+ self.config.get('smtp', 'default_to')
+ if self.config.has_option('smtp', 'default_to') else 'zuul',
+ self.config.get('smtp', 'server')
+ if self.config.has_option('smtp', 'server') else 'localhost',
+ self.config.get('smtp', 'port')
+ if self.config.has_option('smtp', 'port') else 25
+ )
self.sched.setLauncher(gearman)
self.sched.registerTrigger(gerrit)
self.sched.registerTrigger(timer)
+ self.sched.registerReporter(gerrit_reporter)
+ self.sched.registerReporter(smtp_reporter)
self.sched.start()
self.sched.reconfigure(self.config)
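The reporter construction above repeats the same config-with-fallback pattern four times (`config.get(...) if config.has_option(...) else default`). A minimal sketch of that pattern as a helper; the helper name and the sample `[smtp]` section are hypothetical, not part of the patch, and Python 3's `configparser` stands in for the Python 2 module Zuul used at the time:

```python
import configparser

def get_option(config, section, option, default):
    """Return the option's value if the section/option exists, else the default."""
    if config.has_option(section, option):
        return config.get(section, option)
    return default

# Hypothetical zuul.conf snippet: only 'server' is set, so the
# other SMTP options fall back to their defaults.
config = configparser.ConfigParser()
config.read_string("[smtp]\nserver = mail.example.com\n")

smtp_from = get_option(config, 'smtp', 'default_from', 'zuul')
smtp_server = get_option(config, 'smtp', 'server', 'localhost')
```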
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 62683f4..2580397 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -324,6 +324,7 @@
def cancel(self, build):
self.log.info("Cancel build %s for job %s" % (build, build.job))
+ build.canceled = True
if build.number is not None:
self.log.debug("Build %s has already started" % build)
self.cancelRunningBuild(build)
@@ -353,15 +354,16 @@
build = self.builds.get(job.unique)
if build:
- if result is None:
- data = getJobData(job)
- result = data.get('result')
- if result is None:
- result = 'LOST'
- self.log.info("Build %s complete, result %s" %
- (job, result))
- build.result = result
- self.sched.onBuildCompleted(build)
+ if not build.canceled:
+ if result is None:
+ data = getJobData(job)
+ result = data.get('result')
+ if result is None:
+ build.retry = True
+ self.log.info("Build %s complete, result %s" %
+ (job, result))
+ build.result = result
+ self.sched.onBuildCompleted(build)
# The test suite expects the build to be removed from the
# internal dict after it's added to the report queue.
del self.builds[job.unique]
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 7f80d64..0d08f1b 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -54,6 +54,9 @@
trigger = v.Required(v.Any({'gerrit': toList(gerrit_trigger)},
{'timer': toList(timer_trigger)}))
+ report_actions = {'gerrit': variable_dict,
+ 'smtp': variable_dict}
+
pipeline = {v.Required('name'): str,
v.Required('manager'): manager,
'precedence': precedence,
@@ -61,11 +64,10 @@
'success-message': str,
'failure-message': str,
'dequeue-on-new-patchset': bool,
- 'dequeue-on-conflict': bool,
'trigger': trigger,
- 'success': variable_dict,
- 'failure': variable_dict,
- 'start': variable_dict,
+ 'success': report_actions,
+ 'failure': report_actions,
+ 'start': report_actions,
}
pipelines = [pipeline]
@@ -161,7 +163,8 @@
self.templates_schemas[t_name] = v.Schema(schema)
project = {'name': str,
- 'merge-mode': v.Any('cherry-pick'),
+ 'merge-mode': v.Any('merge', 'merge-resolve',
+ 'cherry-pick'),
'template': self.validateTemplateCalls,
}
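The schema change above means `start`/`success`/`failure` now hold a dictionary of reporter names, each mapped to that reporter's parameters, instead of a flat parameter dict. A rough sketch of the shape being validated, written as a plain-Python check rather than the voluptuous schema the patch actually uses (the function and sample pipeline are illustrative only):

```python
KNOWN_REPORTERS = {'gerrit', 'smtp'}

def validate_report_actions(pipeline):
    """Check that start/success/failure map reporter names to parameter
    dicts, matching the new layout.yaml structure."""
    for action in ('start', 'success', 'failure'):
        reporters = pipeline.get(action, {})
        if not isinstance(reporters, dict):
            raise ValueError('%s must be a dict of reporters' % action)
        for name, params in reporters.items():
            if name not in KNOWN_REPORTERS:
                raise ValueError('unknown reporter: %s' % name)
            if not isinstance(params, dict):
                raise ValueError('%s params must be a dict' % name)

pipeline = {
    'name': 'check',
    'success': {'gerrit': {'verified': 1}},
    'failure': {'gerrit': {'verified': -1},
                'smtp': {'to': 'alternative_me@example.com'}},
}
validate_report_actions(pipeline)  # passes silently
```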
diff --git a/zuul/merger.py b/zuul/merger.py
index 94db499..218f7f2 100644
--- a/zuul/merger.py
+++ b/zuul/merger.py
@@ -88,11 +88,15 @@
self.fetch(ref)
self.repo.git.cherry_pick("FETCH_HEAD")
- def merge(self, ref):
+ def merge(self, ref, strategy=None):
self._ensure_cloned()
- self.log.debug("Merging %s" % ref)
+ args = []
+ if strategy:
+ args += ['-s', strategy]
+ args.append('FETCH_HEAD')
self.fetch(ref)
- self.repo.git.merge("FETCH_HEAD")
+ self.log.debug("Merging %s with args %s" % (ref, args))
+ self.repo.git.merge(*args)
def fetch(self, ref):
self._ensure_cloned()
@@ -186,7 +190,7 @@
except:
self.log.exception("Unable to update %s", project)
- def _mergeChange(self, change, ref, target_ref, mode):
+ def _mergeChange(self, change, ref, target_ref):
repo = self.getRepo(change.project)
try:
repo.checkout(ref)
@@ -195,13 +199,16 @@
return False
try:
- if not mode:
- mode = change.project.merge_mode
- if mode == model.MERGE_IF_NECESSARY:
+ mode = change.project.merge_mode
+ if mode == model.MERGER_MERGE:
repo.merge(change.refspec)
- elif mode == model.CHERRY_PICK:
+ elif mode == model.MERGER_MERGE_RESOLVE:
+ repo.merge(change.refspec, 'resolve')
+ elif mode == model.MERGER_CHERRY_PICK:
repo.cherryPick(change.refspec)
- except:
+ else:
+ raise Exception("Unsupported merge mode: %s" % mode)
+ except Exception:
# Log exceptions at debug level because they are
# usually benign merge conflicts
self.log.debug("Unable to merge %s" % change, exc_info=True)
@@ -219,7 +226,7 @@
return False
return commit
- def mergeChanges(self, items, target_ref=None, mode=None):
+ def mergeChanges(self, items, target_ref=None):
# Merge shortcuts:
# if this is the only change just merge it against its branch.
# elif there are changes ahead of us that are from the same project and
@@ -244,13 +251,13 @@
return False
commit = self._mergeChange(item.change,
repo.getBranchHead(item.change.branch),
- target_ref=target_ref, mode=mode)
+ target_ref=target_ref)
# Sibling changes exist. Merge current change against newest sibling.
elif (len(sibling_items) >= 2 and
sibling_items[-2].current_build_set.commit):
last_commit = sibling_items[-2].current_build_set.commit
commit = self._mergeChange(item.change, last_commit,
- target_ref=target_ref, mode=mode)
+ target_ref=target_ref)
# Either change did not merge or we did not need to merge as there were
# previous merge conflicts.
if not commit:
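The merger change replaces the per-call `mode` argument with the project's configured merge mode, and `Repo.merge` now accepts an optional git strategy. The mapping from mode to git invocation can be sketched like this (a standalone illustration; `merge_args` is a hypothetical helper, the constants mirror those added to `zuul/model.py`):

```python
MERGER_MERGE = 1          # "git merge"
MERGER_MERGE_RESOLVE = 2  # "git merge -s resolve"
MERGER_CHERRY_PICK = 3    # "git cherry-pick"

def merge_args(mode):
    """Return the git arguments implied by a merge mode, mirroring
    how _mergeChange dispatches to repo.merge/repo.cherryPick."""
    if mode == MERGER_MERGE:
        return ['merge', 'FETCH_HEAD']
    elif mode == MERGER_MERGE_RESOLVE:
        return ['merge', '-s', 'resolve', 'FETCH_HEAD']
    elif mode == MERGER_CHERRY_PICK:
        return ['cherry-pick', 'FETCH_HEAD']
    raise Exception("Unsupported merge mode: %s" % mode)
```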
diff --git a/zuul/model.py b/zuul/model.py
index 440f3ba..44780f7 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -15,12 +15,21 @@
import re
import time
from uuid import uuid4
+import extras
+
+OrderedDict = extras.try_imports(['collections.OrderedDict',
+ 'ordereddict.OrderedDict'])
-FAST_FORWARD_ONLY = 1
-MERGE_ALWAYS = 2
-MERGE_IF_NECESSARY = 3
-CHERRY_PICK = 4
+MERGER_MERGE = 1 # "git merge"
+MERGER_MERGE_RESOLVE = 2 # "git merge -s resolve"
+MERGER_CHERRY_PICK = 3 # "git cherry-pick"
+
+MERGER_MAP = {
+ 'merge': MERGER_MERGE,
+ 'merge-resolve': MERGER_MERGE_RESOLVE,
+ 'cherry-pick': MERGER_CHERRY_PICK,
+}
PRECEDENCE_NORMAL = 0
PRECEDENCE_LOW = 1
@@ -42,12 +51,14 @@
self.failure_message = None
self.success_message = None
self.dequeue_on_new_patchset = True
- self.dequeue_on_conflict = True
self.job_trees = {} # project -> JobTree
self.manager = None
self.queues = []
self.precedence = PRECEDENCE_NORMAL
self.trigger = None
+ self.start_actions = None
+ self.success_actions = None
+ self.failure_actions = None
def __repr__(self):
return '<Pipeline %s>' % self.name
@@ -157,12 +168,15 @@
return True
if build.result != 'SUCCESS':
return True
+
if not item.item_ahead:
return False
return self.isHoldingFollowingChanges(item.item_ahead)
def setResult(self, item, build):
- if build.result != 'SUCCESS':
+ if build.retry:
+ item.removeBuild(build)
+ elif build.result != 'SUCCESS':
# Get a JobTree from a Job so we can find only its dependent jobs
root = self.getJobTree(item.change.project)
tree = root.getJobTreeForJob(build.job)
@@ -198,7 +212,6 @@
items = []
for shared_queue in self.queues:
items.extend(shared_queue.queue)
- items.extend(shared_queue.severed_heads)
return items
def formatStatusHTML(self):
@@ -208,8 +221,8 @@
s = 'Change queue: %s' % queue.name
ret += s + '\n'
ret += '-' * len(s) + '\n'
- for head in queue.getHeads():
- ret += self.formatStatus(head, html=True)
+ for item in queue.queue:
+ ret += self.formatStatus(item, html=True)
return ret
def formatStatusJSON(self):
@@ -221,18 +234,21 @@
j_queue = dict(name=queue.name)
j_queues.append(j_queue)
j_queue['heads'] = []
- for head in queue.getHeads():
- j_changes = []
- e = head
- while e:
- j_changes.append(self.formatItemJSON(e))
- if (len(j_changes) > 1 and
- (j_changes[-2]['remaining_time'] is not None) and
- (j_changes[-1]['remaining_time'] is not None)):
- j_changes[-1]['remaining_time'] = max(
- j_changes[-2]['remaining_time'],
- j_changes[-1]['remaining_time'])
- e = e.item_behind
+
+ j_changes = []
+ for e in queue.queue:
+ if not e.item_ahead:
+ if j_changes:
+ j_queue['heads'].append(j_changes)
+ j_changes = []
+ j_changes.append(self.formatItemJSON(e))
+ if (len(j_changes) > 1 and
+ (j_changes[-2]['remaining_time'] is not None) and
+ (j_changes[-1]['remaining_time'] is not None)):
+ j_changes[-1]['remaining_time'] = max(
+ j_changes[-2]['remaining_time'],
+ j_changes[-1]['remaining_time'])
+ if j_changes:
j_queue['heads'].append(j_changes)
return j_pipeline
@@ -247,9 +263,11 @@
changeish.url,
changeish._id())
else:
- ret += '%sProject %s change %s\n' % (indent_str,
- changeish.project.name,
- changeish._id())
+ ret += '%sProject %s change %s based on %s\n' % (
+ indent_str,
+ changeish.project.name,
+ changeish._id(),
+ item.item_ahead)
for job in self.getJobs(changeish):
build = item.current_build_set.getBuild(job.name)
if build:
@@ -270,9 +288,6 @@
job_name = '<a href="%s">%s</a>' % (url, job_name)
ret += '%s %s: %s%s' % (indent_str, job_name, result, voting)
ret += '\n'
- if item.item_behind:
- ret += '%sFollowed by:\n' % (indent_str)
- ret += self.formatStatus(item.item_behind, indent + 2, html)
return ret
def formatItemJSON(self, item):
@@ -283,6 +298,13 @@
else:
ret['url'] = None
ret['id'] = changeish._id()
+ if item.item_ahead:
+ ret['item_ahead'] = item.item_ahead.change._id()
+ else:
+ ret['item_ahead'] = None
+ ret['items_behind'] = [i.change._id() for i in item.items_behind]
+ ret['failing_reasons'] = item.current_build_set.failing_reasons
+ ret['zuul_ref'] = item.current_build_set.ref
ret['project'] = changeish.project.name
ret['enqueue_time'] = int(item.enqueue_time * 1000)
ret['jobs'] = []
@@ -319,24 +341,34 @@
result=result,
voting=job.voting))
if self.haveAllJobsStarted(item):
- # if a change ahead has failed, we are unknown.
- item_ahead_failed = False
- i = item.item_ahead
- while i:
- if self.didAnyJobFail(i):
- item_ahead_failed = True
- i = None # safe to stop looking
- else:
- i = i.item_ahead
- if item_ahead_failed:
- ret['remaining_time'] = None
- else:
- ret['remaining_time'] = max_remaining
+ ret['remaining_time'] = max_remaining
else:
ret['remaining_time'] = None
return ret
+class ActionReporter(object):
+ """An ActionReporter has a reporter and its configured parameters"""
+
+ def __repr__(self):
+ return '<ActionReporter %s, %s>' % (self.reporter, self.params)
+
+ def __init__(self, reporter, params):
+ self.reporter = reporter
+ self.params = params
+
+ def report(self, change, message):
+ """Sends the built message off to the configured reporter.
+ Takes the change and message and adds the configured parameters.
+ """
+ return self.reporter.report(change, message, self.params)
+
+ def getSubmitAllowNeeds(self):
+ """Gets the submit allow needs from the reporter based on the
+ parameters."""
+ return self.reporter.getSubmitAllowNeeds(self.params)
+
+
class ChangeQueue(object):
"""DependentPipelines have multiple parallel queues shared by
different projects; this is one of them. For instance, there may
@@ -349,7 +381,6 @@
self.projects = []
self._jobs = set()
self.queue = []
- self.severed_heads = []
self.dependent = dependent
def __repr__(self):
@@ -375,50 +406,50 @@
def enqueueItem(self, item):
if self.dependent and self.queue:
item.item_ahead = self.queue[-1]
- item.item_ahead.item_behind = item
+ item.item_ahead.items_behind.append(item)
self.queue.append(item)
def dequeueItem(self, item):
if item in self.queue:
self.queue.remove(item)
- if item in self.severed_heads:
- self.severed_heads.remove(item)
if item.item_ahead:
- item.item_ahead.item_behind = item.item_behind
- if item.item_behind:
- item.item_behind.item_ahead = item.item_ahead
+ item.item_ahead.items_behind.remove(item)
+ for item_behind in item.items_behind:
+ if item.item_ahead:
+ item.item_ahead.items_behind.append(item_behind)
+ item_behind.item_ahead = item.item_ahead
item.item_ahead = None
- item.item_behind = None
+ item.items_behind = []
item.dequeue_time = time.time()
- def addSeveredHead(self, item):
- self.severed_heads.append(item)
+ def moveItem(self, item, item_ahead):
+ if not self.dependent:
+ return False
+ if item.item_ahead == item_ahead:
+ return False
+ # Remove from current location
+ if item.item_ahead:
+ item.item_ahead.items_behind.remove(item)
+ for item_behind in item.items_behind:
+ if item.item_ahead:
+ item.item_ahead.items_behind.append(item_behind)
+ item_behind.item_ahead = item.item_ahead
+ # Add to new location
+ item.item_ahead = item_ahead
+ item.items_behind = []
+ if item.item_ahead:
+ item.item_ahead.items_behind.append(item)
+ return True
def mergeChangeQueue(self, other):
for project in other.projects:
self.addProject(project)
- def getHead(self):
- if not self.queue:
- return None
- return self.queue[0]
-
- def getHeads(self):
- heads = []
- if self.dependent:
- h = self.getHead()
- if h:
- heads.append(h)
- else:
- heads.extend(self.queue)
- heads.extend(self.severed_heads)
- return heads
-
class Project(object):
def __init__(self, name):
self.name = name
- self.merge_mode = MERGE_IF_NECESSARY
+ self.merge_mode = MERGER_MERGE_RESOLVE
def __str__(self):
return self.name
@@ -536,6 +567,8 @@
self.end_time = None
self.estimated_time = None
self.pipeline = None
+ self.canceled = False
+ self.retry = False
self.parameters = {}
def __repr__(self):
@@ -554,6 +587,7 @@
self.commit = None
self.unable_to_merge = False
self.unable_to_merge_message = None
+ self.failing_reasons = []
def setConfiguration(self):
# The change isn't enqueued until after it's created
@@ -571,6 +605,9 @@
self.builds[build.job.name] = build
build.build_set = self
+ def removeBuild(self, build):
+ del self.builds[build.job.name]
+
def getBuild(self, job_name):
return self.builds.get(job_name)
@@ -591,11 +628,19 @@
self.current_build_set = BuildSet(self)
self.build_sets.append(self.current_build_set)
self.item_ahead = None
- self.item_behind = None
+ self.items_behind = []
self.enqueue_time = None
self.dequeue_time = None
self.reported = False
+ def __repr__(self):
+ if self.pipeline:
+ pipeline = self.pipeline.name
+ else:
+ pipeline = None
+ return '<QueueItem 0x%x for %s in %s>' % (
+ id(self), self.change, pipeline)
+
def resetAllBuilds(self):
old = self.current_build_set
self.current_build_set.result = 'CANCELED'
@@ -608,6 +653,9 @@
self.current_build_set.addBuild(build)
build.pipeline = self.pipeline
+ def removeBuild(self, build):
+ self.current_build_set.removeBuild(build)
+
def setReportedResult(self, result):
self.current_build_set.result = result
@@ -898,7 +946,7 @@
class Layout(object):
def __init__(self):
self.projects = {}
- self.pipelines = {}
+ self.pipelines = OrderedDict()
self.jobs = {}
self.metajobs = []
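The model change replaces the single `item_behind` pointer with an `items_behind` list, turning the queue into a tree of items. The relinking that `dequeueItem` performs, reattaching everything behind a removed item to the item ahead of it, can be sketched in isolation (the `Item` class and `dequeue` function here are simplified stand-ins, not the real model classes):

```python
class Item(object):
    def __init__(self, name):
        self.name = name
        self.item_ahead = None
        self.items_behind = []

def dequeue(item):
    """Unlink an item, reattaching the items behind it to the item
    ahead (mirrors ChangeQueue.dequeueItem in the patch)."""
    if item.item_ahead:
        item.item_ahead.items_behind.remove(item)
    for item_behind in item.items_behind:
        if item.item_ahead:
            item.item_ahead.items_behind.append(item_behind)
        item_behind.item_ahead = item.item_ahead
    item.item_ahead = None
    item.items_behind = []

# A <- B <- C; removing B leaves C directly behind A.
a, b, c = Item('A'), Item('B'), Item('C')
b.item_ahead = a; a.items_behind.append(b)
c.item_ahead = b; b.items_behind.append(c)
dequeue(b)
```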
diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/reporter/__init__.py
diff --git a/zuul/reporter/gerrit.py b/zuul/reporter/gerrit.py
new file mode 100644
index 0000000..cceacca
--- /dev/null
+++ b/zuul/reporter/gerrit.py
@@ -0,0 +1,49 @@
+# Copyright 2013 Rackspace Australia
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+
+class Reporter(object):
+ """Sends off reports to Gerrit."""
+
+ name = 'gerrit'
+ log = logging.getLogger("zuul.reporter.gerrit.Reporter")
+
+ def __init__(self, trigger):
+ """Set up the reporter."""
+ self.gerrit = trigger.gerrit
+ self.trigger = trigger
+
+ def report(self, change, message, params):
+ """Send a message to gerrit."""
+ self.log.debug("Report change %s, params %s, message: %s" %
+ (change, params, message))
+ if not params:
+ self.log.debug("Not reporting change %s: No params specified." %
+ change)
+ return
+ changeid = '%s,%s' % (change.number, change.patchset)
+ change._ref_sha = self.trigger.getRefSha(change.project.name,
+ 'refs/heads/' + change.branch)
+ return self.gerrit.review(change.project.name, changeid, message,
+ params)
+
+ def getSubmitAllowNeeds(self, params):
+ """Get a list of code review labels that are allowed to be
+ "needed" in the submit records for a change, with respect
+ to this queue. In other words, the list of review labels
+ this reporter itself is likely to set before submitting.
+ """
+ return params
diff --git a/zuul/reporter/smtp.py b/zuul/reporter/smtp.py
new file mode 100644
index 0000000..66dcd45
--- /dev/null
+++ b/zuul/reporter/smtp.py
@@ -0,0 +1,67 @@
+# Copyright 2013 Rackspace Australia
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import smtplib
+
+from email.mime.text import MIMEText
+
+
+class Reporter(object):
+ """Sends off reports to emails via SMTP."""
+
+ name = 'smtp'
+ log = logging.getLogger("zuul.reporter.smtp.Reporter")
+
+ def __init__(self, smtp_default_from, smtp_default_to,
+ smtp_server='localhost', smtp_port=25):
+ """Set up the reporter.
+
+ Takes parameters for the smtp server.
+ """
+ self.smtp_server = smtp_server
+ self.smtp_port = smtp_port
+ self.smtp_default_from = smtp_default_from
+ self.smtp_default_to = smtp_default_to
+
+ def report(self, change, message, params):
+ """Send the compiled report message via smtp."""
+ self.log.debug("Report change %s, params %s, message: %s" %
+ (change, params, message))
+
+ # Create a text/plain email message
+ from_email = params.get('from', self.smtp_default_from)
+ to_email = params.get('to', self.smtp_default_to)
+ msg = MIMEText(message)
+ msg['Subject'] = "Report change %s" % change
+ msg['From'] = from_email
+ msg['To'] = to_email
+
+ try:
+ s = smtplib.SMTP(self.smtp_server, self.smtp_port)
+ s.sendmail(from_email, to_email.split(','), msg.as_string())
+ s.quit()
+ except Exception:
+ return "Could not send email via SMTP"
+ return
+
+ def getSubmitAllowNeeds(self, params):
+ """Get a list of code review labels that are allowed to be
+ "needed" in the submit records for a change, with respect
+ to this queue. In other words, the list of review labels
+ this reporter itself is likely to set before submitting.
+ """
+ return []
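The SMTP reporter builds a plain-text email whose From/To come from the pipeline's reporter parameters, falling back to the configured defaults. A minimal sketch of just the message construction, without sending (the helper name, default addresses, and sample values are hypothetical):

```python
from email.mime.text import MIMEText

def build_report_email(change, message, params,
                       default_from='zuul@example.com',
                       default_to='zuul@example.com'):
    """Build the report email the SMTP reporter would send; 'from' and
    'to' entries in params override the configured defaults."""
    msg = MIMEText(message)
    msg['Subject'] = "Report change %s" % change
    msg['From'] = params.get('from', default_from)
    msg['To'] = params.get('to', default_to)
    return msg

# Pipeline params override only the recipient; the sender falls back.
msg = build_report_email('1,1', 'Starting check jobs.',
                         {'to': 'you@example.com'})
```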
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 27c5544..8b6c20c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -28,7 +28,7 @@
import layoutvalidator
import model
-from model import Pipeline, Project, ChangeQueue, EventFilter
+from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
import merger
statsd = extras.try_import('statsd.statsd')
@@ -75,6 +75,7 @@
self._stopped = False
self.launcher = None
self.triggers = dict()
+ self.reporters = dict()
self.config = None
self._maintain_trigger_cache = False
@@ -132,15 +133,27 @@
"Build succeeded.")
pipeline.dequeue_on_new_patchset = conf_pipeline.get(
'dequeue-on-new-patchset', True)
- pipeline.dequeue_on_conflict = conf_pipeline.get(
- 'dequeue-on-conflict', True)
+
+ action_reporters = {}
+ for action in ['start', 'success', 'failure']:
+ action_reporters[action] = []
+ if conf_pipeline.get(action):
+ for reporter_name, params \
+ in conf_pipeline.get(action).items():
+ if reporter_name in self.reporters.keys():
+ action_reporters[action].append(ActionReporter(
+ self.reporters[reporter_name], params))
+ else:
+ self.log.error('Invalid reporter name %s' %
+ reporter_name)
+ pipeline.start_actions = action_reporters['start']
+ pipeline.success_actions = action_reporters['success']
+ pipeline.failure_actions = action_reporters['failure']
+
manager = globals()[conf_pipeline['manager']](self, pipeline)
pipeline.setManager(manager)
-
layout.pipelines[conf_pipeline['name']] = pipeline
- manager.success_action = conf_pipeline.get('success')
- manager.failure_action = conf_pipeline.get('failure')
- manager.start_action = conf_pipeline.get('start')
+
# TODO: move this into triggers (may require pluggable
# configuration)
if 'gerrit' in conf_pipeline['trigger']:
@@ -238,9 +251,8 @@
config_project.update(expanded)
layout.projects[config_project['name']] = project
- mode = config_project.get('merge-mode')
- if mode and mode == 'cherry-pick':
- project.merge_mode = model.CHERRY_PICK
+ mode = config_project.get('merge-mode', 'merge-resolve')
+ project.merge_mode = model.MERGER_MAP[mode]
for pipeline in layout.pipelines.values():
if pipeline.name in config_project:
job_tree = pipeline.addProject(project)
@@ -300,6 +312,11 @@
name = trigger.name
self.triggers[name] = trigger
+ def registerReporter(self, reporter, name=None):
+ if name is None:
+ name = reporter.name
+ self.reporters[name] = reporter
+
def getProject(self, name):
self.layout_lock.acquire()
p = None
@@ -339,6 +356,8 @@
dt = int((build.end_time - build.start_time) * 1000)
statsd.timing(key, dt)
statsd.incr(key)
+ key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
+ statsd.incr(key)
except:
self.log.exception("Exception reporting runtime stats")
self.result_event_queue.put(('completed', build))
@@ -435,10 +454,9 @@
name)
items_to_remove = []
for shared_queue in old_pipeline.queues:
- for item in (shared_queue.queue +
- shared_queue.severed_heads):
+ for item in shared_queue.queue:
item.item_ahead = None
- item.item_behind = None
+ item.items_behind = []
item.pipeline = None
project = layout.projects.get(item.change.project.name)
if not project:
@@ -449,9 +467,7 @@
items_to_remove.append(item)
continue
item.change.project = project
- severed = item in shared_queue.severed_heads
- if not new_pipeline.manager.reEnqueueItem(
- item, severed=severed):
+ if not new_pipeline.manager.reEnqueueItem(item):
items_to_remove.append(item)
builds_to_remove = []
for build, item in old_pipeline.manager.building_jobs.items():
@@ -468,6 +484,16 @@
self._setupMerger()
for trigger in self.triggers.values():
trigger.postConfig()
+ if statsd:
+ try:
+ for pipeline in self.layout.pipelines.values():
+ items = len(pipeline.getAllItems())
+ # stats.gauges.zuul.pipeline.NAME.current_changes
+ key = 'zuul.pipeline.%s' % pipeline.name
+ statsd.gauge(key + '.current_changes', items)
+ except Exception:
+ self.log.exception("Exception reporting initial "
+ "pipeline stats:")
self._reconfigure = False
self.reconfigure_complete_event.set()
finally:
@@ -532,9 +558,11 @@
def maintainTriggerCache(self):
relevant = set()
for pipeline in self.layout.pipelines.values():
+ self.log.debug("Start maintain trigger cache for: %s" % pipeline)
for item in pipeline.getAllItems():
relevant.add(item.change)
relevant.update(item.change.getRelatedChanges())
+ self.log.debug("End maintain trigger cache for: %s" % pipeline)
self.log.debug("Trigger cache size: %s" % len(relevant))
for trigger in self.triggers.values():
trigger.maintainCache(relevant)
@@ -550,7 +578,7 @@
return
# Preprocessing for ref-update events
- if hasattr(event, 'refspec'):
+ if event.ref:
# Make sure the local git repo is up-to-date with the remote one.
# We better have the new ref before enqueuing the changes.
# This is done before enqueuing the changes to avoid calling an
@@ -598,7 +626,6 @@
ret += '</p>'
keys = self.layout.pipelines.keys()
- keys.sort()
for key in keys:
pipeline = self.layout.pipelines[key]
s = 'Pipeline: %s' % pipeline.name
@@ -629,7 +656,6 @@
pipelines = []
data['pipelines'] = pipelines
keys = self.layout.pipelines.keys()
- keys.sort()
for key in keys:
pipeline = self.layout.pipelines[key]
pipelines.append(pipeline.formatStatusJSON())
@@ -644,9 +670,6 @@
self.pipeline = pipeline
self.building_jobs = {}
self.event_filters = []
- self.success_action = {}
- self.failure_action = {}
- self.start_action = {}
if self.sched.config and self.sched.config.has_option(
'zuul', 'report_times'):
self.report_times = self.sched.config.getboolean(
@@ -690,25 +713,22 @@
if tree:
self.log.info(" %s" % p)
log_jobs(tree)
- if self.start_action:
- self.log.info(" On start:")
- self.log.info(" %s" % self.start_action)
- if self.success_action:
- self.log.info(" On success:")
- self.log.info(" %s" % self.success_action)
- if self.failure_action:
- self.log.info(" On failure:")
- self.log.info(" %s" % self.failure_action)
+ self.log.info(" On start:")
+ self.log.info(" %s" % self.pipeline.start_actions)
+ self.log.info(" On success:")
+ self.log.info(" %s" % self.pipeline.success_actions)
+ self.log.info(" On failure:")
+ self.log.info(" %s" % self.pipeline.failure_actions)
def getSubmitAllowNeeds(self):
# Get a list of code review labels that are allowed to be
# "needed" in the submit records for a change, with respect
# to this queue. In other words, the list of review labels
# this queue itself is likely to set before submitting.
- if self.success_action:
- return self.success_action.keys()
- else:
- return {}
+ allow_needs = set()
+ for action_reporter in self.pipeline.success_actions:
+ allow_needs.update(action_reporter.getSubmitAllowNeeds())
+ return allow_needs
def eventMatches(self, event):
for ef in self.event_filters:
@@ -725,17 +745,38 @@
def reportStart(self, change):
try:
self.log.info("Reporting start, action %s change %s" %
- (self.start_action, change))
+ (self.pipeline.start_actions, change))
msg = "Starting %s jobs." % self.pipeline.name
if self.sched.config.has_option('zuul', 'status_url'):
msg += "\n" + self.sched.config.get('zuul', 'status_url')
- ret = self.pipeline.trigger.report(change, msg, self.start_action)
+ ret = self.sendReport(self.pipeline.start_actions,
+ change, msg)
if ret:
self.log.error("Reporting change start %s received: %s" %
(change, ret))
except:
self.log.exception("Exception while reporting start:")
+ def sendReport(self, action_reporters, change, message):
+ """Sends the built message off to configured reporters.
+
+ Takes the action_reporters, change, message and extra options and
+ sends them to the pluggable reporters.
+ """
+ report_errors = []
+ if len(action_reporters) > 0:
+ if not change.number:
+ self.log.debug("Not reporting change %s: No number present."
+ % change)
+ return
+ for action_reporter in action_reporters:
+ ret = action_reporter.report(change, message)
+ if ret:
+ report_errors.append(ret)
+ if len(report_errors) == 0:
+ return
+ return report_errors
+
def isChangeReadyToBeEnqueued(self, change):
return True
@@ -748,6 +789,9 @@
def checkForChangesNeededBy(self, change):
return True
+ def getFailingDependentItem(self, item):
+ return None
+
def getDependentItems(self, item):
orig_item = item
items = []
@@ -759,6 +803,12 @@
[x.change for x in items]))
return items
+ def getItemForChange(self, change):
+ for item in self.pipeline.getAllItems():
+ if item.change.equals(change):
+ return item
+ return None
+
def findOldVersionOfChangeAlreadyInQueue(self, change):
for c in self.pipeline.getChangesInQueue():
if change.isUpdateOf(c):
@@ -774,15 +824,12 @@
(change, old_change, old_change))
self.removeChange(old_change)
- def reEnqueueItem(self, item, severed=False):
+ def reEnqueueItem(self, item):
change_queue = self.pipeline.getQueue(item.change.project)
if change_queue:
self.log.debug("Re-enqueing change %s in queue %s" %
(item.change, change_queue))
- if severed:
- change_queue.addSeveredHead(item)
- else:
- change_queue.enqueueItem(item)
+ change_queue.enqueueItem(item)
self.reportStats(item)
return True
else:
@@ -813,7 +860,7 @@
if change_queue:
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
- if self.start_action:
+ if len(self.pipeline.start_actions) > 0:
self.reportStart(change)
item = change_queue.enqueueChange(change)
self.reportStats(item)
@@ -823,15 +870,10 @@
change.project)
return False
- def dequeueItem(self, item, keep_severed_heads=True):
+ def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change)
- item_ahead = item.item_ahead
change_queue = self.pipeline.getQueue(item.change.project)
change_queue.dequeueItem(item)
- if (keep_severed_heads and not item_ahead and
- (item.change.is_reportable and not item.reported)):
- self.log.debug("Adding %s as a severed head" % item.change)
- change_queue.addSeveredHead(item)
self.sched._maintain_trigger_cache = True
def removeChange(self, change):
@@ -842,7 +884,8 @@
self.log.debug("Canceling builds behind change: %s "
"because it is being removed." % item.change)
self.cancelJobs(item)
- self.dequeueItem(item, keep_severed_heads=False)
+ self.dequeueItem(item)
+ self.reportStats(item)
def prepareRef(self, item):
# Returns False on success.
@@ -854,29 +897,14 @@
ref = item.current_build_set.ref
dependent_items = self.getDependentItems(item)
dependent_items.reverse()
- dependent_str = ', '.join(
- ['%s' % i.change.number for i in dependent_items
- if i.change.project == item.change.project])
- if dependent_str:
- msg = \
- "This change was unable to be automatically merged "\
- "with the current state of the repository and the "\
- "following changes which were enqueued ahead of it: "\
- "%s. Please rebase your change and upload a new "\
- "patchset." % dependent_str
- else:
- msg = "This change was unable to be automatically merged "\
- "with the current state of the repository. Please "\
- "rebase your change and upload a new patchset."
all_items = dependent_items + [item]
- if (dependent_items and
- not dependent_items[-1].current_build_set.commit):
- self.pipeline.setUnableToMerge(item, msg)
- return True
commit = self.sched.merger.mergeChanges(all_items, ref)
item.current_build_set.commit = commit
if not commit:
self.log.info("Unable to merge change %s" % item.change)
+ msg = ("This change was unable to be automatically merged "
+ "with the current state of the repository. Please "
+ "rebase your change and upload a new patchset.")
self.pipeline.setUnableToMerge(item, msg)
return True
return False
@@ -924,74 +952,99 @@
self.log.debug("Removing build %s from running builds" % build)
build.result = 'CANCELED'
del self.building_jobs[build]
- if item.item_behind:
+ for item_behind in item.items_behind:
self.log.debug("Canceling jobs for change %s, behind change %s" %
- (item.item_behind.change, item.change))
- if self.cancelJobs(item.item_behind, prime=prime):
+ (item_behind.change, item.change))
+ if self.cancelJobs(item_behind, prime=prime):
canceled = True
return canceled
- def _processOneItem(self, item):
+ def _processOneItem(self, item, nnfi):
changed = False
item_ahead = item.item_ahead
- item_behind = item.item_behind
- if self.prepareRef(item):
- changed = True
- if self.pipeline.dequeue_on_conflict:
- self.log.info("Dequeuing change %s because "
- "of a git merge error" % item.change)
- self.dequeueItem(item, keep_severed_heads=False)
- try:
- self.reportItem(item)
- except MergeFailure:
- pass
- return changed
+ change_queue = self.pipeline.getQueue(item.change.project)
+ failing_reasons = [] # Reasons this item is failing
+
if self.checkForChangesNeededBy(item.change) is not True:
# It's not okay to enqueue this change, we should remove it.
self.log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
self.cancelJobs(item)
- self.dequeueItem(item, keep_severed_heads=False)
+ self.dequeueItem(item)
self.pipeline.setDequeuedNeedingChange(item)
try:
self.reportItem(item)
except MergeFailure:
pass
- changed = True
- return changed
- if not item_ahead:
- merge_failed = False
- if self.pipeline.areAllJobsComplete(item):
- try:
- self.reportItem(item)
- except MergeFailure:
- merge_failed = True
- self.dequeueItem(item)
- changed = True
- if merge_failed or self.pipeline.didAnyJobFail(item):
- if item_behind:
- self.cancelJobs(item_behind)
- changed = True
- self.dequeueItem(item)
+ return (True, nnfi)
+ dep_item = self.getFailingDependentItem(item)
+ if dep_item:
+ failing_reasons.append('a needed change is failing')
+ self.cancelJobs(item, prime=False)
else:
- if self.pipeline.didAnyJobFail(item):
- if item_behind:
- if self.cancelJobs(item_behind, prime=False):
- changed = True
- # don't restart yet; this change will eventually become
- # the head
+ item_ahead_merged = False
+ if ((item_ahead and item_ahead.change.is_merged) or
+ not change_queue.dependent):
+ item_ahead_merged = True
+ if (item_ahead != nnfi and not item_ahead_merged):
+ # Our current base is different than what we expected,
+ # and it's not because our current base merged. Something
+ # ahead must have failed.
+ self.log.info("Resetting builds for change %s because the "
+ "item ahead, %s, is not the nearest non-failing "
+ "item, %s" % (item.change, item_ahead, nnfi))
+ change_queue.moveItem(item, nnfi)
+ changed = True
+ self.cancelJobs(item)
+ self.prepareRef(item)
+ if item.current_build_set.unable_to_merge:
+ failing_reasons.append("it has a merge conflict")
if self.launchJobs(item):
changed = True
- return changed
+ if self.pipeline.didAnyJobFail(item):
+ failing_reasons.append("at least one job failed")
+ if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
+ try:
+ self.reportItem(item)
+ except MergeFailure:
+ failing_reasons.append("it did not merge")
+ for item_behind in item.items_behind:
+ self.log.info("Resetting builds for change %s because the "
+ "item ahead, %s, failed to merge" %
+ (item_behind.change, item))
+ self.cancelJobs(item_behind)
+ self.dequeueItem(item)
+ changed = True
+ elif not failing_reasons:
+ nnfi = item
+ item.current_build_set.failing_reasons = failing_reasons
+ if failing_reasons:
+ self.log.debug("%s is a failing item because %s" %
+ (item, failing_reasons))
+ return (changed, nnfi)
def processQueue(self):
# Do whatever needs to be done for each change in the queue
self.log.debug("Starting queue processor: %s" % self.pipeline.name)
changed = False
- for item in self.pipeline.getAllItems():
- if self._processOneItem(item):
+ for queue in self.pipeline.queues:
+ queue_changed = False
+ nnfi = None # Nearest non-failing item
+ for item in queue.queue[:]:
+ item_changed, nnfi = self._processOneItem(item, nnfi)
+ if item_changed:
+ queue_changed = True
+ self.reportStats(item)
+ if queue_changed:
changed = True
- self.reportStats(item)
+ status = ''
+ for item in queue.queue:
+ status += self.pipeline.formatStatus(item)
+ if status:
+ self.log.debug("Queue %s status is now:\n %s" %
+ (queue.name, status))
+ self.log.debug("Finished queue processor: %s (changed: %s)" %
+ (self.pipeline.name, changed))
return changed
def updateBuildDescriptions(self, build_set):
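The hunk above replaces the linked-list walk (`item.item_behind`) with a per-queue scan that threads an `nnfi` ("nearest non-failing item") through `_processOneItem`: any item whose base differs from the current nnfi is moved behind it and its builds reset. A minimal sketch of that scan, using stand-in classes rather than Zuul's real models (the simplification ignores `item_ahead_merged` and independent queues):

```python
# Minimal sketch of the nearest-non-failing-item (nnfi) scan: walk a queue
# head-to-tail, track the last item with no failing reasons, and reset any
# item whose current base is not that nnfi.

from dataclasses import dataclass, field


@dataclass
class Item:
    name: str
    failing_reasons: list = field(default_factory=list)
    item_ahead: "Item | None" = None


def scan_queue(queue):
    """Return names of items that would be moved behind the nnfi."""
    moved = []
    nnfi = None  # Nearest non-failing item seen so far
    for item in queue:
        if item.item_ahead is not nnfi:
            # Base changed because something ahead failed; rebase onto nnfi.
            item.item_ahead = nnfi
            moved.append(item.name)
        if not item.failing_reasons:
            nnfi = item  # This item is the new nearest non-failing item.
    return moved


a = Item("A", failing_reasons=["at least one job failed"])
b = Item("B", item_ahead=a)
c = Item("C", item_ahead=b)
print(scan_queue([a, b, c]))  # B is rebased past the failing A; C keeps B
```

Note how B is moved in front of the failing A while C is untouched, matching the patch's behavior of only resetting builds whose base actually changed.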
@@ -1030,8 +1083,8 @@
del self.building_jobs[build]
self.pipeline.setResult(change, build)
- self.log.info("Change %s status is now:\n %s" %
- (change, self.pipeline.formatStatus(change)))
+ self.log.debug("Change %s status is now:\n %s" %
+ (change, self.pipeline.formatStatus(change)))
self.updateBuildDescriptions(build.build_set)
while self.processQueue():
pass
@@ -1060,21 +1113,21 @@
if item.change.is_reportable and item.reported:
return 0
self.log.debug("Reporting change %s" % item.change)
- ret = None
+ ret = True # Means error as returned by trigger.report
if self.pipeline.didAllJobsSucceed(item):
- self.log.debug("success %s %s" % (self.success_action,
- self.failure_action))
- action = self.success_action
+ self.log.debug("success %s %s" % (self.pipeline.success_actions,
+ self.pipeline.failure_actions))
+ actions = self.pipeline.success_actions
item.setReportedResult('SUCCESS')
else:
- action = self.failure_action
+ actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE')
report = self.formatReport(item)
item.reported = True
try:
- self.log.info("Reporting change %s, action: %s" %
- (item.change, action))
- ret = self.pipeline.trigger.report(item.change, report, action)
+ self.log.info("Reporting change %s, actions: %s" %
+ (item.change, actions))
+ ret = self.sendReport(actions, item.change, report)
if ret:
self.log.error("Reporting change %s received: %s" %
(item.change, ret))
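In this hunk `reportItem` stops calling `self.pipeline.trigger.report` directly and dispatches through `self.sendReport(actions, ...)`, where each pipeline stage now carries a dictionary of reporters and their parameters. A hedged sketch of what such a dispatcher could look like; the reporter names and signatures here are illustrative stand-ins, not Zuul's real API (note `reportItem` treats a truthy return as an error, hence `ret = True` as the default above):

```python
# Illustrative dispatcher: invoke every configured reporter for a stage,
# returning a truthy error string on the first failure, None on success.

def send_report(actions, change, message):
    """Call each reporter with its configured parameters."""
    for reporter, params in actions.items():
        try:
            reporter.report(change, message, params)
        except Exception as e:
            return "Error from reporter %s: %s" % (reporter, e)
    return None


class FakeReporter:
    """Stand-in for a configured reporter (e.g. gerrit or smtp)."""

    def __init__(self, name, fail=False):
        self.name, self.fail, self.calls = name, fail, []

    def report(self, change, message, params):
        if self.fail:
            raise RuntimeError("unreachable server")
        self.calls.append((change, message, params))

    def __repr__(self):
        return self.name


ok = FakeReporter("gerrit")
print(send_report({ok: {"verified": 1}}, "12345,2", "Build succeeded."))
```

This mirrors the NEWS entry: start/success/failure actions become a mapping of reporters to parameters, so a single stage can, for example, both vote in Gerrit and send mail.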
@@ -1395,3 +1448,15 @@
self.log.debug(" Change %s is needed but can not be merged" %
change.needs_change)
return False
+
+ def getFailingDependentItem(self, item):
+ if not hasattr(item.change, 'needs_change'):
+ return None
+ if not item.change.needs_change:
+ return None
+ needs_item = self.getItemForChange(item.change.needs_change)
+ if not needs_item:
+ return None
+ if needs_item.current_build_set.failing_reasons:
+ return needs_item
+ return None
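The new `getFailingDependentItem` above lets a change that declares a Gerrit "needs" dependency inherit failure from the item it depends on, so its jobs can be canceled early instead of running against a doomed base. A rough model of the same check with stand-in classes (not Zuul's models):

```python
# Model of the dependency-failure check: if the change this item needs is
# itself failing, return that item so the caller can cancel our jobs.

class Change:
    def __init__(self, number, needs_change=None):
        self.number = number
        self.needs_change = needs_change


class QueueItem:
    def __init__(self, change, failing_reasons=()):
        self.change = change
        self.failing_reasons = list(failing_reasons)


def get_failing_dependent_item(item, items_by_change):
    needs = getattr(item.change, 'needs_change', None)
    if not needs:
        return None
    needs_item = items_by_change.get(needs)
    if needs_item and needs_item.failing_reasons:
        return needs_item
    return None


parent = Change(1000)
child = Change(1001, needs_change=parent)
parent_item = QueueItem(parent, failing_reasons=["it has a merge conflict"])
child_item = QueueItem(child)
index = {parent: parent_item}
print(get_failing_dependent_item(child_item, index) is parent_item)
```

As in the patch, the lookup is tolerant: changes without a `needs_change` attribute, or whose needed change is not enqueued, simply return `None`.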
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 65dd7fa..976849c 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -138,22 +138,6 @@
self.gerrit_connector.stop()
self.gerrit_connector.join()
- def report(self, change, message, action):
- self.log.debug("Report change %s, action %s, message: %s" %
- (change, action, message))
- if not change.number:
- self.log.debug("Change has no number; not reporting")
- return
- if not action:
- self.log.debug("No action specified; not reporting")
- return
- changeid = '%s,%s' % (change.number, change.patchset)
- ref = 'refs/heads/' + change.branch
- change._ref_sha = self.getRefSha(change.project.name,
- ref)
- return self.gerrit.review(change.project.name, changeid,
- message, action)
-
def _getInfoRefs(self, project):
url = "%s/p/%s/info/refs?service=git-upload-pack" % (
self.baseurl, project)
@@ -334,6 +318,7 @@
change.branch = data['branch']
change.url = data['url']
max_ps = 0
+ change.files = []
for ps in data['patchSets']:
if ps['number'] == change.patchset:
change.refspec = ps['ref']
@@ -352,6 +337,7 @@
# for dependencies.
return change
+ change.needs_change = None
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
@@ -359,6 +345,7 @@
if not dep.is_merged:
change.needs_change = dep
+ change.needed_by_changes = []
if 'neededBy' in data:
for needed in data['neededBy']:
parts = needed['ref'].split('/')
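The `gerrit.py` hunks pre-initialize `change.files`, `change.needs_change`, and `change.needed_by_changes` before inspecting the query data, so consumers such as `getFailingDependentItem` can read these fields without `hasattr` guards. A minimal illustration of the pattern, using a plain object in place of Zuul's Change model and a simplified view of Gerrit's query JSON:

```python
# Default the dependency fields unconditionally, then fill them in only
# when Gerrit's query output actually contains dependency data.

class Change:
    pass


def update_change(change, data):
    """Populate a change from Gerrit query data, defaulting the
    dependency fields even when Gerrit omits them."""
    change.branch = data.get('branch')
    change.files = []           # always a list, even with no patchsets
    change.needs_change = None  # default before inspecting 'dependsOn'
    change.needed_by_changes = []
    if 'dependsOn' in data:
        change.needs_change = data['dependsOn'][0]['ref']
    for needed in data.get('neededBy', []):
        change.needed_by_changes.append(needed['ref'])
    return change


c = update_change(Change(), {'branch': 'master'})
print(c.needs_change, c.needed_by_changes)
```

Defaulting up front keeps the "is this change blocked by a failing dependency" logic free of attribute-existence checks.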
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index e67af01..b75a165 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -40,9 +40,6 @@
def stop(self):
self.apsched.shutdown()
- def report(self, change, message, action):
- raise Exception("Timer trigger does not support reporting.")
-
def isMerged(self, change, head=None):
raise Exception("Timer trigger does not support checking if "
"a change is merged.")