Merge "add forge author identity privilege to support pushing refs to gerrit"
diff --git a/NEWS.rst b/NEWS.rst
index db269a4..c4901a8 100644
--- a/NEWS.rst
+++ b/NEWS.rst
@@ -25,12 +25,6 @@
triggers later). See the sample layout.yaml and Zuul section of the
documentation.
-* The default behavior is now to immediately dequeue changes that have
- merge conflicts, even those not at the head of the queue. To enable
- the old behavior (which would wait until the conflicting change was
- at the head before dequeuing it), see the new "dequeue-on-conflict"
- option.
-
* Some statsd keys have changed in a backwards incompatible way:
* The counters and timers of the form zuul.job.{name} is now split
into several keys of the form:
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index f8e070c..6adfa30 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -346,14 +346,6 @@
well. To suppress this behavior (and allow jobs to continue
running), set this to ``false``. Default: ``true``.
-**dequeue-on-conflict**
- Normally, if there is a merge conflict or similar error with a
- change, Zuul will immediately remove it from the queue, even if the
- error is only due to a change that happened to be enqueued ahead of
- it. If you would like to keep the change in the queue until it is
- at the head to be certain that the merge conflict is intrinsic to
- the change, set this to ``false``. Default: ``true``.
-
**success**
Describes where Zuul should report to if all the jobs complete
successfully.
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 1cb8688..dc659fb 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -75,7 +75,6 @@
verified: -1
- name: conflict
- dequeue-on-conflict: false
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 70b68c5..395ff25 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -1068,9 +1068,6 @@
print 'pipeline %s queue %s contents %s' % (
pipeline.name, queue.name, queue.queue)
self.assertEqual(len(queue.queue), 0)
- if len(queue.severed_heads) != 0:
- print 'heads', queue.severed_heads
- self.assertEqual(len(queue.severed_heads), 0)
def assertReportedStat(self, key, value=None, kind=None):
start = time.time()
@@ -1403,11 +1400,20 @@
self.release(self.builds[2])
self.waitUntilSettled()
- # project-test1 and project-test2 for A, project-test2 for B
- self.assertEqual(len(self.builds), 3)
+ # project-test1 and project-test2 for A
+ # project-test2 for B
+ # project-merge for C (without B)
+ self.assertEqual(len(self.builds), 4)
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2)
- # check that build status of aborted jobs are masked ('CANCELED')
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # project-test1 and project-test2 for A
+ # project-test2 for B
+ # project-test1 and project-test2 for C
+ self.assertEqual(len(self.builds), 5)
+
items = self.sched.layout.pipelines['gate'].getAllItems()
builds = items[0].current_build_set.getBuilds()
self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
@@ -1418,7 +1424,7 @@
self.assertEqual(self.countJobResults(builds, None), 1)
builds = items[2].current_build_set.getBuilds()
self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
- self.assertEqual(self.countJobResults(builds, 'CANCELED'), 2)
+ self.assertEqual(self.countJobResults(builds, None), 2)
self.worker.hold_jobs_in_build = False
self.worker.release()
@@ -1667,8 +1673,9 @@
self.waitUntilSettled()
self.gearman_server.release('.*-merge')
self.waitUntilSettled()
- queue = self.gearman_server.getQueue()
- self.getParameter(queue[-1], 'ZUUL_REF')
+
+ self.assertEqual(len(self.history), 2) # A and C merge jobs
+
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
@@ -1679,32 +1686,7 @@
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
-
- def test_dequeue_conflict(self):
- "Test that the option to dequeue merge conflicts works"
-
- self.gearman_server.hold_jobs_in_queue = True
- A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
- A.addPatchset(['conflict'])
- B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
- B.addPatchset(['conflict'])
- A.addApproval('CRVW', 2)
- B.addApproval('CRVW', 2)
- self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
- self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
- self.waitUntilSettled()
-
- self.assertEqual(A.reported, 1)
- self.assertEqual(B.reported, 2)
-
- self.gearman_server.hold_jobs_in_queue = False
- self.gearman_server.release()
- self.waitUntilSettled()
-
- self.assertEqual(A.data['status'], 'MERGED')
- self.assertEqual(B.data['status'], 'NEW')
- self.assertEqual(A.reported, 2)
- self.assertEqual(B.reported, 2)
+ self.assertEqual(len(self.history), 6)
def test_post(self):
"Test that post jobs run"
@@ -1888,6 +1870,66 @@
self.assertEqual(C.reported, 2)
self.assertEqual(len(self.history), 1)
+ def test_failing_dependent_changes(self):
+ "Test that failing dependent patches are taken out of stream"
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+ E = self.fake_gerrit.addFakeChange('org/project', 'master', 'E')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+ D.addApproval('CRVW', 2)
+ E.addApproval('CRVW', 2)
+
+ # E, D -> C -> B, A
+
+ D.setDependsOn(C, 1)
+ C.setDependsOn(B, 1)
+
+ self.worker.addFailTest('project-test1', B)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(E.addApproval('APRV', 1))
+
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.worker.hold_jobs_in_build = False
+ for build in self.builds:
+ if build.parameters['ZUUL_CHANGE'] != '1':
+ build.release()
+ self.waitUntilSettled()
+
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.data['status'], 'NEW')
+ self.assertEqual(C.reported, 2)
+ self.assertEqual(D.data['status'], 'NEW')
+ self.assertEqual(D.reported, 2)
+ self.assertEqual(E.data['status'], 'MERGED')
+ self.assertEqual(E.reported, 2)
+ self.assertEqual(len(self.history), 18)
+
def test_head_is_dequeued_once(self):
"Test that if a change at the head fails it is dequeued only once"
# If it's dequeued more than once, we should see extra
diff --git a/tox.ini b/tox.ini
index 8e0ede6..06b37df 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,4 +1,5 @@
[tox]
+minversion = 1.6
envlist = pep8, pyflakes, py27
[testenv]
@@ -6,6 +7,7 @@
setenv = STATSD_HOST=localhost
STATSD_PORT=8125
VIRTUAL_ENV={envdir}
+install_command = pip install {opts} {packages}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands =
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 00900a0..0d08f1b 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -64,7 +64,6 @@
'success-message': str,
'failure-message': str,
'dequeue-on-new-patchset': bool,
- 'dequeue-on-conflict': bool,
'trigger': trigger,
'success': report_actions,
'failure': report_actions,
diff --git a/zuul/model.py b/zuul/model.py
index d68ac91..056f41d 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -51,7 +51,6 @@
self.failure_message = None
self.success_message = None
self.dequeue_on_new_patchset = True
- self.dequeue_on_conflict = True
self.job_trees = {} # project -> JobTree
self.manager = None
self.queues = []
@@ -169,6 +168,7 @@
return True
if build.result != 'SUCCESS':
return True
+
if not item.item_ahead:
return False
return self.isHoldingFollowingChanges(item.item_ahead)
@@ -212,7 +212,6 @@
items = []
for shared_queue in self.queues:
items.extend(shared_queue.queue)
- items.extend(shared_queue.severed_heads)
return items
def formatStatusHTML(self):
@@ -222,8 +221,8 @@
s = 'Change queue: %s' % queue.name
ret += s + '\n'
ret += '-' * len(s) + '\n'
- for head in queue.getHeads():
- ret += self.formatStatus(head, html=True)
+ for item in queue.queue:
+ ret += self.formatStatus(item, html=True)
return ret
def formatStatusJSON(self):
@@ -235,18 +234,21 @@
j_queue = dict(name=queue.name)
j_queues.append(j_queue)
j_queue['heads'] = []
- for head in queue.getHeads():
- j_changes = []
- e = head
- while e:
- j_changes.append(self.formatItemJSON(e))
- if (len(j_changes) > 1 and
- (j_changes[-2]['remaining_time'] is not None) and
- (j_changes[-1]['remaining_time'] is not None)):
- j_changes[-1]['remaining_time'] = max(
- j_changes[-2]['remaining_time'],
- j_changes[-1]['remaining_time'])
- e = e.item_behind
+
+ j_changes = []
+ for e in queue.queue:
+ if not e.item_ahead:
+ if j_changes:
+ j_queue['heads'].append(j_changes)
+ j_changes = []
+ j_changes.append(self.formatItemJSON(e))
+ if (len(j_changes) > 1 and
+ (j_changes[-2]['remaining_time'] is not None) and
+ (j_changes[-1]['remaining_time'] is not None)):
+ j_changes[-1]['remaining_time'] = max(
+ j_changes[-2]['remaining_time'],
+ j_changes[-1]['remaining_time'])
+ if j_changes:
j_queue['heads'].append(j_changes)
return j_pipeline
@@ -261,9 +263,11 @@
changeish.url,
changeish._id())
else:
- ret += '%sProject %s change %s\n' % (indent_str,
- changeish.project.name,
- changeish._id())
+ ret += '%sProject %s change %s based on %s\n' % (
+ indent_str,
+ changeish.project.name,
+ changeish._id(),
+ item.item_ahead)
for job in self.getJobs(changeish):
build = item.current_build_set.getBuild(job.name)
if build:
@@ -284,9 +288,6 @@
job_name = '<a href="%s">%s</a>' % (url, job_name)
ret += '%s %s: %s%s' % (indent_str, job_name, result, voting)
ret += '\n'
- if item.item_behind:
- ret += '%sFollowed by:\n' % (indent_str)
- ret += self.formatStatus(item.item_behind, indent + 2, html)
return ret
def formatItemJSON(self, item):
@@ -297,6 +298,12 @@
else:
ret['url'] = None
ret['id'] = changeish._id()
+ if item.item_ahead:
+ ret['item_ahead'] = item.item_ahead.change._id()
+ else:
+ ret['item_ahead'] = None
+ ret['items_behind'] = [i.change._id() for i in item.items_behind]
+ ret['failing_reasons'] = item.current_build_set.failing_reasons
ret['project'] = changeish.project.name
ret['enqueue_time'] = int(item.enqueue_time * 1000)
ret['jobs'] = []
@@ -333,19 +340,7 @@
result=result,
voting=job.voting))
if self.haveAllJobsStarted(item):
- # if a change ahead has failed, we are unknown.
- item_ahead_failed = False
- i = item.item_ahead
- while i:
- if self.didAnyJobFail(i):
- item_ahead_failed = True
- i = None # safe to stop looking
- else:
- i = i.item_ahead
- if item_ahead_failed:
- ret['remaining_time'] = None
- else:
- ret['remaining_time'] = max_remaining
+ ret['remaining_time'] = max_remaining
else:
ret['remaining_time'] = None
return ret
@@ -385,7 +380,6 @@
self.projects = []
self._jobs = set()
self.queue = []
- self.severed_heads = []
self.dependent = dependent
def __repr__(self):
@@ -411,45 +405,45 @@
def enqueueItem(self, item):
if self.dependent and self.queue:
item.item_ahead = self.queue[-1]
- item.item_ahead.item_behind = item
+ item.item_ahead.items_behind.append(item)
self.queue.append(item)
def dequeueItem(self, item):
if item in self.queue:
self.queue.remove(item)
- if item in self.severed_heads:
- self.severed_heads.remove(item)
if item.item_ahead:
- item.item_ahead.item_behind = item.item_behind
- if item.item_behind:
- item.item_behind.item_ahead = item.item_ahead
+ item.item_ahead.items_behind.remove(item)
+ for item_behind in item.items_behind:
+ if item.item_ahead:
+ item.item_ahead.items_behind.append(item_behind)
+ item_behind.item_ahead = item.item_ahead
item.item_ahead = None
- item.item_behind = None
+ item.items_behind = []
item.dequeue_time = time.time()
- def addSeveredHead(self, item):
- self.severed_heads.append(item)
+ def moveItem(self, item, item_ahead):
+ if not self.dependent:
+ return False
+ if item.item_ahead == item_ahead:
+ return False
+ # Remove from current location
+ if item.item_ahead:
+ item.item_ahead.items_behind.remove(item)
+ for item_behind in item.items_behind:
+ if item.item_ahead:
+ item.item_ahead.items_behind.append(item_behind)
+ item_behind.item_ahead = item.item_ahead
+ # Add to new location
+ item.item_ahead = item_ahead
+ item.items_behind = []
+ if item.item_ahead:
+ item.item_ahead.items_behind.append(item)
+ return True
def mergeChangeQueue(self, other):
for project in other.projects:
self.addProject(project)
- def getHead(self):
- if not self.queue:
- return None
- return self.queue[0]
-
- def getHeads(self):
- heads = []
- if self.dependent:
- h = self.getHead()
- if h:
- heads.append(h)
- else:
- heads.extend(self.queue)
- heads.extend(self.severed_heads)
- return heads
-
class Project(object):
def __init__(self, name):
@@ -592,6 +586,7 @@
self.commit = None
self.unable_to_merge = False
self.unable_to_merge_message = None
+ self.failing_reasons = []
def setConfiguration(self):
# The change isn't enqueued until after it's created
@@ -632,11 +627,19 @@
self.current_build_set = BuildSet(self)
self.build_sets.append(self.current_build_set)
self.item_ahead = None
- self.item_behind = None
+ self.items_behind = []
self.enqueue_time = None
self.dequeue_time = None
self.reported = False
+ def __repr__(self):
+ if self.pipeline:
+ pipeline = self.pipeline.name
+ else:
+ pipeline = None
+ return '<QueueItem 0x%x for %s in %s>' % (
+ id(self), self.change, pipeline)
+
def resetAllBuilds(self):
old = self.current_build_set
self.current_build_set.result = 'CANCELED'
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 8a4d942..514be2f 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -133,8 +133,6 @@
"Build succeeded.")
pipeline.dequeue_on_new_patchset = conf_pipeline.get(
'dequeue-on-new-patchset', True)
- pipeline.dequeue_on_conflict = conf_pipeline.get(
- 'dequeue-on-conflict', True)
action_reporters = {}
for action in ['start', 'success', 'failure']:
@@ -456,10 +454,9 @@
name)
items_to_remove = []
for shared_queue in old_pipeline.queues:
- for item in (shared_queue.queue +
- shared_queue.severed_heads):
+ for item in shared_queue.queue:
item.item_ahead = None
- item.item_behind = None
+ item.items_behind = []
item.pipeline = None
project = layout.projects.get(item.change.project.name)
if not project:
@@ -470,9 +467,7 @@
items_to_remove.append(item)
continue
item.change.project = project
- severed = item in shared_queue.severed_heads
- if not new_pipeline.manager.reEnqueueItem(
- item, severed=severed):
+ if not new_pipeline.manager.reEnqueueItem(item):
items_to_remove.append(item)
builds_to_remove = []
for build, item in old_pipeline.manager.building_jobs.items():
@@ -794,6 +789,9 @@
def checkForChangesNeededBy(self, change):
return True
+ def getFailingDependentItem(self, item):
+ return None
+
def getDependentItems(self, item):
orig_item = item
items = []
@@ -805,6 +803,12 @@
[x.change for x in items]))
return items
+ def getItemForChange(self, change):
+ for item in self.pipeline.getAllItems():
+ if item.change.equals(change):
+ return item
+ return None
+
def findOldVersionOfChangeAlreadyInQueue(self, change):
for c in self.pipeline.getChangesInQueue():
if change.isUpdateOf(c):
@@ -820,15 +824,12 @@
(change, old_change, old_change))
self.removeChange(old_change)
- def reEnqueueItem(self, item, severed=False):
+ def reEnqueueItem(self, item):
change_queue = self.pipeline.getQueue(item.change.project)
if change_queue:
self.log.debug("Re-enqueing change %s in queue %s" %
(item.change, change_queue))
- if severed:
- change_queue.addSeveredHead(item)
- else:
- change_queue.enqueueItem(item)
+ change_queue.enqueueItem(item)
self.reportStats(item)
return True
else:
@@ -869,15 +870,10 @@
change.project)
return False
- def dequeueItem(self, item, keep_severed_heads=True):
+ def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change)
- item_ahead = item.item_ahead
change_queue = self.pipeline.getQueue(item.change.project)
change_queue.dequeueItem(item)
- if (keep_severed_heads and not item_ahead and
- (item.change.is_reportable and not item.reported)):
- self.log.debug("Adding %s as a severed head" % item.change)
- change_queue.addSeveredHead(item)
self.sched._maintain_trigger_cache = True
def removeChange(self, change):
@@ -888,7 +884,7 @@
self.log.debug("Canceling builds behind change: %s "
"because it is being removed." % item.change)
self.cancelJobs(item)
- self.dequeueItem(item, keep_severed_heads=False)
+ self.dequeueItem(item)
self.reportStats(item)
def prepareRef(self, item):
@@ -901,29 +897,14 @@
ref = item.current_build_set.ref
dependent_items = self.getDependentItems(item)
dependent_items.reverse()
- dependent_str = ', '.join(
- ['%s' % i.change.number for i in dependent_items
- if i.change.project == item.change.project])
- if dependent_str:
- msg = \
- "This change was unable to be automatically merged "\
- "with the current state of the repository and the "\
- "following changes which were enqueued ahead of it: "\
- "%s. Please rebase your change and upload a new "\
- "patchset." % dependent_str
- else:
- msg = "This change was unable to be automatically merged "\
- "with the current state of the repository. Please "\
- "rebase your change and upload a new patchset."
all_items = dependent_items + [item]
- if (dependent_items and
- not dependent_items[-1].current_build_set.commit):
- self.pipeline.setUnableToMerge(item, msg)
- return True
commit = self.sched.merger.mergeChanges(all_items, ref)
item.current_build_set.commit = commit
if not commit:
self.log.info("Unable to merge change %s" % item.change)
+ msg = ("This change was unable to be automatically merged "
+ "with the current state of the repository. Please "
+ "rebase your change and upload a new patchset.")
self.pipeline.setUnableToMerge(item, msg)
return True
return False
@@ -971,74 +952,94 @@
self.log.debug("Removing build %s from running builds" % build)
build.result = 'CANCELED'
del self.building_jobs[build]
- if item.item_behind:
+ for item_behind in item.items_behind:
self.log.debug("Canceling jobs for change %s, behind change %s" %
- (item.item_behind.change, item.change))
- if self.cancelJobs(item.item_behind, prime=prime):
+ (item_behind.change, item.change))
+ if self.cancelJobs(item_behind, prime=prime):
canceled = True
return canceled
- def _processOneItem(self, item):
+ def _processOneItem(self, item, nnfi):
changed = False
item_ahead = item.item_ahead
- item_behind = item.item_behind
- if self.prepareRef(item):
- changed = True
- if self.pipeline.dequeue_on_conflict:
- self.log.info("Dequeuing change %s because "
- "of a git merge error" % item.change)
- self.dequeueItem(item, keep_severed_heads=False)
- try:
- self.reportItem(item)
- except MergeFailure:
- pass
- return changed
+ change_queue = self.pipeline.getQueue(item.change.project)
+ failing_reasons = [] # Reasons this item is failing
+
if self.checkForChangesNeededBy(item.change) is not True:
# It's not okay to enqueue this change, we should remove it.
self.log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
self.cancelJobs(item)
- self.dequeueItem(item, keep_severed_heads=False)
+ self.dequeueItem(item)
self.pipeline.setDequeuedNeedingChange(item)
try:
self.reportItem(item)
except MergeFailure:
pass
- changed = True
- return changed
- if not item_ahead:
- merge_failed = False
- if self.pipeline.areAllJobsComplete(item):
- try:
- self.reportItem(item)
- except MergeFailure:
- merge_failed = True
- self.dequeueItem(item)
- changed = True
- if merge_failed or self.pipeline.didAnyJobFail(item):
- if item_behind:
- self.cancelJobs(item_behind)
- changed = True
- self.dequeueItem(item)
+ return (True, nnfi)
+ dep_item = self.getFailingDependentItem(item)
+ if dep_item:
+ failing_reasons.append('a needed change is failing')
+ self.cancelJobs(item, prime=False)
else:
- if self.pipeline.didAnyJobFail(item):
- if item_behind:
- if self.cancelJobs(item_behind, prime=False):
- changed = True
- # don't restart yet; this change will eventually become
- # the head
+ if (item_ahead and item_ahead != nnfi and
+ not item_ahead.change.is_merged):
+ # Our current base is different than what we expected,
+ # and it's not because our current base merged. Something
+ # ahead must have failed.
+ self.log.info("Resetting builds for change %s because the "
+ "item ahead, %s, is not the nearest non-failing "
+ "item, %s" % (item.change, item_ahead, nnfi))
+ change_queue.moveItem(item, nnfi)
+ changed = True
+ self.cancelJobs(item)
+ self.prepareRef(item)
+ if item.current_build_set.unable_to_merge:
+ failing_reasons.append("merge conflict")
if self.launchJobs(item):
changed = True
- return changed
+ if self.pipeline.didAnyJobFail(item):
+ failing_reasons.append("at least one job failed")
+ if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
+ try:
+ self.reportItem(item)
+ except MergeFailure:
+ failing_reasons.append("did not merge")
+ for item_behind in item.items_behind:
+ self.log.info("Resetting builds for change %s because the "
+ "item ahead, %s, failed to merge" %
+ (item_behind.change, item))
+ self.cancelJobs(item_behind)
+ self.dequeueItem(item)
+ changed = True
+ elif not failing_reasons:
+ nnfi = item
+ item.current_build_set.failing_reasons = failing_reasons
+ if failing_reasons:
+ self.log.debug("%s is a failing item because %s" %
+ (item, failing_reasons))
+ return (changed, nnfi)
def processQueue(self):
# Do whatever needs to be done for each change in the queue
self.log.debug("Starting queue processor: %s" % self.pipeline.name)
changed = False
- for item in self.pipeline.getAllItems():
- if self._processOneItem(item):
+ for queue in self.pipeline.queues:
+ queue_changed = False
+ nnfi = None # Nearest non-failing item
+ for item in queue.queue[:]:
+ item_changed, nnfi = self._processOneItem(item, nnfi)
+ if item_changed:
+ queue_changed = True
+ self.reportStats(item)
+ if queue_changed:
changed = True
- self.reportStats(item)
+ status = ''
+ for item in queue.queue:
+ status += self.pipeline.formatStatus(item)
+ if status:
+ self.log.debug("Queue %s status is now:\n %s" %
+ (queue.name, status))
self.log.debug("Finished queue processor: %s (changed: %s)" %
(self.pipeline.name, changed))
return changed
@@ -1079,8 +1080,8 @@
del self.building_jobs[build]
self.pipeline.setResult(change, build)
- self.log.info("Change %s status is now:\n %s" %
- (change, self.pipeline.formatStatus(change)))
+ self.log.debug("Change %s status is now:\n %s" %
+ (change, self.pipeline.formatStatus(change)))
self.updateBuildDescriptions(build.build_set)
while self.processQueue():
pass
@@ -1444,3 +1445,15 @@
self.log.debug(" Change %s is needed but can not be merged" %
change.needs_change)
return False
+
+ def getFailingDependentItem(self, item):
+ if not hasattr(item.change, 'needs_change'):
+ return None
+ if not item.change.needs_change:
+ return None
+ needs_item = self.getItemForChange(item.change.needs_change)
+ if not needs_item:
+ return None
+ if needs_item.current_build_set.failing_reasons:
+ return needs_item
+ return None