blob: 0d11316497ca16f075174fd6cf58f3e82b4fb67e [file] [log] [blame]
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
12
James E. Blair83005782015-12-11 14:46:03 -080013import logging
14
15from zuul import exceptions
16from zuul.model import NullChange
17
James E. Blair83005782015-12-11 14:46:03 -080018
class DynamicChangeQueueContextManager(object):
    """Context manager that drops its change queue on exit if it is empty.

    Entering the context yields the wrapped change queue.  On exit, a
    truthy change queue whose ``queue`` attribute is empty is removed
    from its pipeline via ``pipeline.removeQueue``.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        managed = self.change_queue
        # Nothing to clean up when no queue was supplied.
        if not managed:
            return
        # A queue that still holds items must stay in the pipeline.
        if managed.queue:
            return
        managed.pipeline.removeQueue(managed)
29
30
class StaticChangeQueueContextManager(object):
    """Context manager that hands out its change queue and never cleans up.

    Entering the context yields the wrapped change queue; leaving it
    does nothing.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        # Intentionally a no-op: the queue is left untouched on exit.
        return None
40
41
class PipelineManager(object):
    """Abstract Base Class for enqueuing and processing Changes in a Pipeline

    This class uses ``self.getChangeQueue(...)`` and ``self.changes_merge``
    but defines neither; presumably concrete subclasses supply them
    (NOTE(review): confirm against the subclasses).
    """

    log = logging.getLogger("zuul.PipelineManager")

    def __init__(self, sched, pipeline):
        """Bind the manager to a scheduler and the pipeline it manages."""
        self.sched = sched
        self.pipeline = pipeline
        self.event_filters = []
        self.changeish_filters = []

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)

    def _postConfig(self, layout):
        """Log a summary of this pipeline's configuration.

        Logs the source, requirement filters, event filters, the job
        tree of every project in ``layout`` that has one, and the
        configured reporter actions.
        """
        self.log.info("Configured Pipeline Manager %s", self.pipeline.name)
        self.log.info(" Source: %s", self.pipeline.source)
        self.log.info(" Requirements:")
        for f in self.changeish_filters:
            self.log.info(" %s", f)
        self.log.info(" Events:")
        for e in self.event_filters:
            self.log.info(" %s", e)
        self.log.info(" Projects:")

        def log_jobs(tree, indent=0):
            # Recursively log one line per job, indented by tree depth.
            istr = ' ' + ' ' * indent
            if tree.job:
                job = tree.job
                parts = [str(b) for b in job._branches]
                parts.extend(str(f) for f in job._files)
                if job.skip_if_matcher:
                    parts.append(str(job.skip_if_matcher))
                efilters = ''.join(parts)
                if efilters:
                    efilters = ' ' + efilters
                tags = []
                if job.hold_following_changes:
                    tags.append('[hold]')
                if not job.voting:
                    tags.append('[nonvoting]')
                if job.mutex:
                    tags.append('[mutex: %s]' % job.mutex)
                tags = ' '.join(tags)
                self.log.info("%s%s%s %s", istr, repr(job),
                              efilters, tags)
            for x in tree.job_trees:
                log_jobs(x, indent + 2)

        for p in layout.projects.values():
            tree = self.pipeline.getJobTree(p)
            if tree:
                self.log.info(" %s", p)
                log_jobs(tree)
        self.log.info(" On start:")
        self.log.info(" %s", self.pipeline.start_actions)
        self.log.info(" On success:")
        self.log.info(" %s", self.pipeline.success_actions)
        self.log.info(" On failure:")
        self.log.info(" %s", self.pipeline.failure_actions)
        self.log.info(" On merge-failure:")
        self.log.info(" %s", self.pipeline.merge_failure_actions)
        self.log.info(" When disabled:")
        self.log.info(" %s", self.pipeline.disabled_actions)

    def getSubmitAllowNeeds(self):
        """Return the set of review labels this queue may itself set.

        Get a list of code review labels that are allowed to be
        "needed" in the submit records for a change, with respect
        to this queue.  In other words, the list of review labels
        this queue itself is likely to set before submitting.
        """
        allow_needs = set()
        for action_reporter in self.pipeline.success_actions:
            allow_needs.update(action_reporter.getSubmitAllowNeeds())
        return allow_needs

    def eventMatches(self, event, change):
        """Return True if ``event`` should be handled by this pipeline.

        An event with ``forced_pipeline`` set matches only the pipeline
        of that name; otherwise the event matches if any configured
        event filter matches it.
        """
        if event.forced_pipeline:
            if event.forced_pipeline == self.pipeline.name:
                self.log.debug("Event %s for change %s was directly assigned "
                               "to pipeline %s", event, change, self)
                return True
            else:
                return False
        for ef in self.event_filters:
            if ef.matches(event, change):
                self.log.debug("Event %s for change %s matched %s "
                               "in pipeline %s", event, change, ef, self)
                return True
        return False

    def isChangeAlreadyInPipeline(self, change):
        """Return True if ``change`` is a live item anywhere in the pipeline."""
        # Checks live items in the pipeline
        return any(item.live and change.equals(item.change)
                   for item in self.pipeline.getAllItems())

    def isChangeAlreadyInQueue(self, change, change_queue):
        """Return True if ``change`` is any item in ``change_queue``."""
        # Checks any item in the specified change queue
        return any(change.equals(item.change)
                   for item in change_queue.queue)

    def reportStart(self, item):
        """Send the pipeline's start actions for ``item``.

        Does nothing when the pipeline is disabled.  Reporting errors
        and exceptions are logged, never raised.
        """
        if not self.pipeline._disabled:
            try:
                self.log.info("Reporting start, action %s item %s",
                              self.pipeline.start_actions, item)
                ret = self.sendReport(self.pipeline.start_actions,
                                      self.pipeline.source, item)
                if ret:
                    self.log.error("Reporting item start %s received: %s",
                                   item, ret)
            except Exception:
                self.log.exception("Exception while reporting start:")

    def sendReport(self, action_reporters, source, item,
                   message=None):
        """Sends the built message off to configured reporters.

        Takes the action_reporters, item, message and extra options and
        sends them to the pluggable reporters.

        Returns a list of the truthy values returned by the reporters
        (treated by callers as errors).  The result is falsy when
        nothing went wrong: ``None`` if reporters ran without error, or
        an empty list if there were no reporters.  ``message`` is
        accepted but not used here.
        """
        report_errors = []
        if len(action_reporters) > 0:
            for reporter in action_reporters:
                ret = reporter.report(source, self.pipeline, item)
                if ret:
                    report_errors.append(ret)
            if not report_errors:
                return None
        return report_errors

    def isChangeReadyToBeEnqueued(self, change):
        """Default hook: every change is ready to be enqueued."""
        return True

    def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                            change_queue):
        """Default hook: nothing to enqueue ahead; always succeeds."""
        return True

    def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                             change_queue):
        """Default hook: nothing to enqueue behind; always succeeds."""
        return True

    def checkForChangesNeededBy(self, change, change_queue):
        """Default hook: the change needs nothing else; returns True."""
        return True

    def getFailingDependentItems(self, item):
        """Default hook: no failing dependent items; returns None."""
        return None

    def getDependentItems(self, item):
        """Return the items ahead of ``item``, nearest first."""
        orig_item = item
        items = []
        while item.item_ahead:
            items.append(item.item_ahead)
            item = item.item_ahead
        self.log.info("Change %s depends on changes %s",
                      orig_item.change,
                      [x.change for x in items])
        return items

    def getItemForChange(self, change):
        """Return the first pipeline item whose change equals ``change``."""
        for item in self.pipeline.getAllItems():
            if item.change.equals(change):
                return item
        return None

    def findOldVersionOfChangeAlreadyInQueue(self, change):
        """Return the first live item that ``change`` is an update of."""
        for item in self.pipeline.getAllItems():
            if not item.live:
                continue
            if change.isUpdateOf(item.change):
                return item
        return None

    def removeOldVersionsOfChange(self, change):
        """Remove an older version of ``change`` if the pipeline asks for it.

        Only acts when ``pipeline.dequeue_on_new_patchset`` is set.
        """
        if not self.pipeline.dequeue_on_new_patchset:
            return
        old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
        if old_item:
            self.log.debug("Change %s is a new version of %s, removing %s",
                           change, old_item.change, old_item)
            self.removeItem(old_item)

    def removeAbandonedChange(self, change):
        """Remove every live item whose change equals ``change``."""
        self.log.debug("Change %s abandoned, removing.", change)
        for item in self.pipeline.getAllItems():
            if not item.live:
                continue
            if item.change.equals(change):
                self.removeItem(item)

    def reEnqueueItem(self, item, last_head):
        """Put an existing ``item`` back into a change queue.

        Returns True when a change queue was found and the item was
        re-enqueued, False otherwise.
        """
        with self.getChangeQueue(item.change, last_head.queue) as change_queue:
            if change_queue:
                self.log.debug("Re-enqueing change %s in queue %s",
                               item.change, change_queue)
                change_queue.enqueueItem(item)

                # Get an updated copy of the layout if necessary.
                # This will return one of the following:
                # 1) An existing layout from the item ahead or pipeline.
                # 2) A newly created layout from the cached pipeline
                #    layout config plus the previously returned
                #    in-repo files stored in the buildset.
                # 3) None in the case that a fetch of the files from
                #    the merger is still pending.
                item.current_build_set.layout = self.getLayout(item)

                # Rebuild the frozen job tree from the new layout, if
                # we have one.  If not, it will be built later.
                if item.current_build_set.layout:
                    item.freezeJobTree()

                # Re-set build results in case any new jobs have been
                # added to the tree.
                for build in item.current_build_set.getBuilds():
                    if build.result:
                        item.setResult(build)
                # Similarly, reset the item state.
                if item.current_build_set.unable_to_merge:
                    item.setUnableToMerge()
                if item.dequeued_needing_change:
                    item.setDequeuedNeedingChange()

                self.reportStats(item)
                return True
            else:
                self.log.error("Unable to find change queue for project %s",
                               item.change.project)
                return False

    def addChange(self, change, quiet=False, enqueue_time=None,
                  ignore_requirements=False, live=True,
                  change_queue=None):
        """Try to enqueue ``change`` in this pipeline.

        Returns True if the change was enqueued or was already present,
        and False if it was rejected (not ready, requirement filter
        mismatch, no change queue, or changes ahead could not be
        enqueued).  Unless ``quiet`` is set, start actions are reported
        for the new item.
        """
        self.log.debug("Considering adding change %s", change)

        # If we are adding a live change, check if it's a live item
        # anywhere in the pipeline.  Otherwise, we will perform the
        # duplicate check below on the specific change_queue.
        if live and self.isChangeAlreadyInPipeline(change):
            self.log.debug("Change %s is already in pipeline, "
                           "ignoring", change)
            return True

        if not self.isChangeReadyToBeEnqueued(change):
            self.log.debug("Change %s is not ready to be enqueued, ignoring",
                           change)
            return False

        if not ignore_requirements:
            for f in self.changeish_filters:
                if not f.matches(change):
                    self.log.debug("Change %s does not match pipeline "
                                   "requirement %s", change, f)
                    return False

        with self.getChangeQueue(change, change_queue) as change_queue:
            if not change_queue:
                self.log.debug("Unable to find change queue for "
                               "change %s in project %s",
                               change, change.project)
                return False

            if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
                                            change_queue):
                self.log.debug("Failed to enqueue changes "
                               "ahead of %s", change)
                return False

            if self.isChangeAlreadyInQueue(change, change_queue):
                self.log.debug("Change %s is already in queue, "
                               "ignoring", change)
                return True

            self.log.debug("Adding change %s to queue %s",
                           change, change_queue)
            item = change_queue.enqueueChange(change)
            if enqueue_time:
                item.enqueue_time = enqueue_time
            item.live = live
            self.reportStats(item)
            if not quiet:
                if len(self.pipeline.start_actions) > 0:
                    self.reportStart(item)
            self.enqueueChangesBehind(change, quiet, ignore_requirements,
                                      change_queue)
            for trigger in self.sched.triggers.values():
                trigger.onChangeEnqueued(item.change, self.pipeline)
            return True

    def dequeueItem(self, item):
        """Remove ``item`` from its change queue."""
        self.log.debug("Removing change %s from queue", item.change)
        item.queue.dequeueItem(item)

    def removeItem(self, item):
        """Cancel ``item``'s jobs, dequeue it and update stats."""
        # Remove an item from the queue, probably because it has been
        # superseded by another change.
        self.log.debug("Canceling builds behind change: %s "
                       "because it is being removed.", item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.reportStats(item)

    def provisionNodes(self, item):
        """Request nodes for every job of ``item`` that needs them.

        Returns True if at least one node request was issued, False if
        there were no jobs to request nodes for.
        """
        jobs = item.findJobsToRequest()
        if not jobs:
            return False
        build_set = item.current_build_set
        self.log.debug("Requesting nodes for change %s", item.change)
        for job in jobs:
            req = self.sched.nodepool.requestNodes(build_set, job)
            self.log.debug("Adding node request %s for job %s to item %s",
                           req, job, item)
            build_set.setJobNodeRequest(job.name, req)
        return True

    def _launchJobs(self, item, jobs):
        """Launch each of ``jobs`` for ``item`` and record the builds.

        A failure to launch one job is logged and does not prevent the
        remaining jobs from being launched.
        """
        self.log.debug("Launching jobs for change %s", item.change)
        dependent_items = self.getDependentItems(item)
        for job in jobs:
            self.log.debug("Found job %s for change %s", job, item.change)
            try:
                nodeset = item.current_build_set.getJobNodeSet(job.name)
                self.sched.nodepool.useNodeset(nodeset)
                build = self.sched.launcher.launch(job, item,
                                                   self.pipeline,
                                                   dependent_items)
                self.log.debug("Adding build %s of job %s to item %s",
                               build, job, item)
                item.addBuild(build)
            except Exception:
                self.log.exception("Exception while launching job %s "
                                   "for change %s:", job, item.change)

    def launchJobs(self, item):
        """Launch any jobs of ``item`` that are ready to run.

        Returns False when the item has no layout yet; otherwise returns
        None (falsy) whether or not jobs were launched.
        """
        # TODO(jeblair): This should return a value indicating a job
        # was launched.  Appears to be a longstanding bug.
        if not item.current_build_set.layout:
            return False

        jobs = item.findJobsToRun(self.sched.mutex)
        if jobs:
            self._launchJobs(item, jobs)

    def cancelJobs(self, item, prime=True):
        """Cancel the builds of ``item`` and of every item behind it.

        Outstanding node requests are cancelled and nodesets are handed
        back to nodepool.  When ``prime`` is true and the current build
        set has a ref, the item's builds are reset first.

        Returns True if any build was canceled, False otherwise.
        """
        self.log.debug("Cancel jobs for change %s", item.change)
        canceled = False
        old_build_set = item.current_build_set
        if prime and item.current_build_set.ref:
            item.resetAllBuilds()
        for req in old_build_set.node_requests.values():
            self.sched.nodepool.cancelRequest(req)
        old_build_set.node_requests = {}
        canceled_jobs = set()
        for build in old_build_set.getBuilds():
            was_running = False
            try:
                was_running = self.sched.launcher.cancel(build)
            except Exception:
                self.log.exception("Exception while canceling build %s "
                                   "for change %s", build, item.change)
            if not was_running:
                # Bound up front so the error message below never sees
                # an unbound name or a nodeset from a previous build.
                nodeset = None
                try:
                    nodeset = build.build_set.getJobNodeSet(build.job.name)
                    self.sched.nodepool.returnNodeset(nodeset)
                except Exception:
                    self.log.exception("Unable to return nodeset %s for "
                                       "canceled build request %s",
                                       nodeset, build)
            build.result = 'CANCELED'
            canceled = True
            canceled_jobs.add(build.job.name)
        # Snapshot the items: a dict view cannot be sliced on Python 3.
        for jobname, nodeset in list(old_build_set.nodesets.items()):
            if jobname in canceled_jobs:
                continue
            self.sched.nodepool.returnNodeset(nodeset)
        for item_behind in item.items_behind:
            self.log.debug("Canceling jobs for change %s, behind change %s",
                           item_behind.change, item.change)
            if self.cancelJobs(item_behind, prime=prime):
                canceled = True
        return canceled

    def _makeMergerItem(self, item):
        """Return a dict with all info about ``item`` needed by the merger."""
        # Create a dictionary with all info about the item needed by
        # the merger.
        number = None
        patchset = None
        oldrev = None
        newrev = None
        if hasattr(item.change, 'number'):
            number = item.change.number
            patchset = item.change.patchset
        elif hasattr(item.change, 'newrev'):
            oldrev = item.change.oldrev
            newrev = item.change.newrev
        connection_name = self.pipeline.source.connection.connection_name

        project = item.change.project.name
        return dict(project=project,
                    url=self.pipeline.source.getGitUrl(
                        item.change.project),
                    connection_name=connection_name,
                    merge_mode=item.current_build_set.getMergeMode(project),
                    refspec=item.change.refspec,
                    branch=item.change.branch,
                    ref=item.current_build_set.ref,
                    number=number,
                    patchset=patchset,
                    oldrev=oldrev,
                    newrev=newrev,
                    )

    def getLayout(self, item):
        """Return the layout that applies to ``item``, or None.

        If the change does not update the config, the layout of the
        item ahead (or of the pipeline) is returned.  Otherwise a
        dynamic layout is built from the merged files once the merge
        has completed.  None is returned while a merge is pending, when
        the change was unable to merge, and right after a merge has
        been requested by this call.
        """
        if not item.change.updatesConfig():
            if item.item_ahead:
                return item.item_ahead.current_build_set.layout
            else:
                return item.queue.pipeline.layout
        # This item updates the config, ask the merger for the result.
        build_set = item.current_build_set
        if build_set.merge_state == build_set.PENDING:
            return None
        if build_set.merge_state == build_set.COMPLETE:
            if build_set.unable_to_merge:
                return None
            # Load layout
            # Late import to break an import loop
            import zuul.configloader
            loader = zuul.configloader.ConfigLoader()
            self.log.debug("Load dynamic layout with %s", build_set.files)
            layout = loader.createDynamicLayout(item.pipeline.layout.tenant,
                                                build_set.files)
            return layout
        build_set.merge_state = build_set.PENDING
        self.log.debug("Preparing dynamic layout for: %s", item.change)
        dependent_items = self.getDependentItems(item)
        dependent_items.reverse()
        all_items = dependent_items + [item]
        # Build a real list: map() is a lazy iterator on Python 3.
        merger_items = [self._makeMergerItem(i) for i in all_items]
        self.sched.merger.mergeChanges(merger_items,
                                       item.current_build_set,
                                       ['.zuul.yaml'],
                                       self.pipeline.precedence)

    def prepareLayout(self, item):
        """Make sure ``item`` has a layout and a frozen job tree.

        Returns True once the layout is available, False otherwise.
        """
        # Get a copy of the layout in the context of the current
        # queue.
        build_set = item.current_build_set
        if not build_set.ref:
            build_set.setConfiguration()
        if not build_set.layout:
            build_set.layout = self.getLayout(item)
        if not build_set.layout:
            return False
        if not item.job_tree:
            item.freezeJobTree()
        return True

    def _processOneItem(self, item, nnfi):
        """Advance a single queue item by one processing step.

        ``nnfi`` is the nearest non-failing item found so far while
        walking the queue.  Returns a ``(changed, nnfi)`` tuple where
        ``changed`` tells whether anything was modified and ``nnfi`` is
        the possibly updated nearest non-failing item.
        """
        changed = False
        item_ahead = item.item_ahead
        if item_ahead and (not item_ahead.live):
            item_ahead = None
        change_queue = item.queue
        failing_reasons = []  # Reasons this item is failing

        if self.checkForChangesNeededBy(item.change, change_queue) is not True:
            # It's not okay to enqueue this change, we should remove it.
            self.log.info("Dequeuing change %s because "
                          "it can no longer merge", item.change)
            self.cancelJobs(item)
            self.dequeueItem(item)
            item.setDequeuedNeedingChange()
            if item.live:
                try:
                    self.reportItem(item)
                except exceptions.MergeFailure:
                    # The item is being removed anyway, so a merge
                    # failure needs no further handling here.
                    pass
            return (True, nnfi)
        dep_items = self.getFailingDependentItems(item)
        actionable = change_queue.isActionable(item)
        item.active = actionable
        ready = False
        if dep_items:
            failing_reasons.append('a needed change is failing')
            self.cancelJobs(item, prime=False)
        else:
            item_ahead_merged = False
            if (item_ahead and item_ahead.change.is_merged):
                item_ahead_merged = True
            if (item_ahead != nnfi and not item_ahead_merged):
                # Our current base is different than what we expected,
                # and it's not because our current base merged.  Something
                # ahead must have failed.
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, is not the nearest non-failing "
                              "item, %s", item.change, item_ahead, nnfi)
                change_queue.moveItem(item, nnfi)
                changed = True
                self.cancelJobs(item)
            if actionable:
                ready = self.prepareLayout(item)
                if item.current_build_set.unable_to_merge:
                    failing_reasons.append("it has a merge conflict")
                if ready and self.provisionNodes(item):
                    changed = True
        if actionable and ready and self.launchJobs(item):
            changed = True
        if item.didAnyJobFail():
            failing_reasons.append("at least one job failed")
        if (not item.live) and (not item.items_behind):
            failing_reasons.append("is a non-live item with no items behind")
            self.dequeueItem(item)
            changed = True
        if ((not item_ahead) and item.areAllJobsComplete() and item.live):
            try:
                self.reportItem(item)
            except exceptions.MergeFailure:
                failing_reasons.append("it did not merge")
                for item_behind in item.items_behind:
                    self.log.info("Resetting builds for change %s because the "
                                  "item ahead, %s, failed to merge",
                                  item_behind.change, item)
                    self.cancelJobs(item_behind)
            self.dequeueItem(item)
            changed = True
        elif not failing_reasons and item.live:
            nnfi = item
        item.current_build_set.failing_reasons = failing_reasons
        if failing_reasons:
            self.log.debug("%s is a failing item because %s",
                           item, failing_reasons)
        return (changed, nnfi)

    def processQueue(self):
        """Process every item of every queue in the pipeline once.

        Returns True if any item changed during this pass.
        """
        # Do whatever needs to be done for each change in the queue
        self.log.debug("Starting queue processor: %s", self.pipeline.name)
        changed = False
        for queue in self.pipeline.queues:
            queue_changed = False
            nnfi = None  # Nearest non-failing item
            # Iterate over a copy: processing may dequeue items.
            for item in queue.queue[:]:
                item_changed, nnfi = self._processOneItem(
                    item, nnfi)
                if item_changed:
                    queue_changed = True
                self.reportStats(item)
            if queue_changed:
                changed = True
                status = ''.join(item.formatStatus()
                                 for item in queue.queue)
                if status:
                    self.log.debug("Queue %s status is now:\n %s",
                                   queue.name, status)
        self.log.debug("Finished queue processor: %s (changed: %s)",
                       self.pipeline.name, changed)
        return changed

    def onBuildStarted(self, build):
        """Handle a build-started notification; always returns True."""
        self.log.debug("Build %s started", build)
        return True

    def onBuildCompleted(self, build):
        """Record a finished build and release what it was holding.

        Stores the result on the item, releases the job's mutex, drops
        the job's nodeset when the build is to be retried, and returns
        the nodesets of builds in the same build set whose result is
        'SKIPPED'.  Always returns True.
        """
        self.log.debug("Build %s completed", build)
        item = build.build_set.item

        item.setResult(build)
        self.sched.mutex.release(item, build.job)
        self.log.debug("Item %s status is now:\n %s",
                       item, item.formatStatus())

        if build.retry:
            build.build_set.removeJobNodeSet(build.job.name)

        # If any jobs were skipped as a result of this build, return
        # their nodes.
        for other in build.build_set.getBuilds():
            if other.result == 'SKIPPED':
                nodeset = other.build_set.getJobNodeSet(other.job.name)
                self.sched.nodepool.returnNodeset(nodeset)

        return True

    def onMergeCompleted(self, event):
        """Store the outcome of a merge on the event's build set.

        Marks the item as unable to merge when no commit resulted and
        the change is not a NullChange.
        """
        build_set = event.build_set
        item = build_set.item
        build_set.merge_state = build_set.COMPLETE
        build_set.zuul_url = event.zuul_url
        if event.merged:
            build_set.commit = event.commit
            build_set.files.setFiles(event.files)
        elif event.updated:
            if not isinstance(item.change, NullChange):
                build_set.commit = item.change.newrev
        if not build_set.commit and not isinstance(item.change, NullChange):
            self.log.info("Unable to merge change %s", item.change)
            item.setUnableToMerge()

    def onNodesProvisioned(self, event):
        """Record a completed node request on its build set."""
        # TODOv3(jeblair): handle provisioning failure here
        request = event.request
        build_set = request.build_set
        build_set.jobNodeRequestComplete(request.job.name, request,
                                         request.nodeset)
        self.log.info("Completed node request %s for job %s of item %s "
                      "with nodes %s",
                      request, request.job, build_set.item,
                      request.nodeset)

    def reportItem(self, item):
        """Report the result of ``item`` and adjust the queue window.

        When ``self.changes_merge`` is true, the change queue window is
        shrunk and ``exceptions.MergeFailure`` is raised unless all jobs
        succeeded and the source confirms the change merged; otherwise
        the window is grown and triggers are notified of the merge.
        """
        if not item.reported:
            # _reportItem() returns True if it failed to report.
            item.reported = not self._reportItem(item)
        if self.changes_merge:
            succeeded = item.didAllJobsSucceed()
            merged = item.reported
            if merged:
                merged = self.pipeline.source.isMerged(item.change,
                                                       item.change.branch)
            self.log.info("Reported change %s status: all-succeeded: %s, "
                          "merged: %s", item.change, succeeded, merged)
            change_queue = item.queue
            if not (succeeded and merged):
                self.log.debug("Reported change %s failed tests or failed "
                               "to merge", item.change)
                change_queue.decreaseWindowSize()
                self.log.debug("%s window size decreased to %s",
                               change_queue, change_queue.window)
                raise exceptions.MergeFailure(
                    "Change %s failed to merge" % item.change)
            else:
                change_queue.increaseWindowSize()
                self.log.debug("%s window size increased to %s",
                               change_queue, change_queue.window)

                for trigger in self.sched.triggers.values():
                    trigger.onChangeMerged(item.change, self.pipeline.source)

    def _reportItem(self, item):
        """Pick the reporter actions for ``item`` and send the report.

        Also maintains the pipeline's consecutive-failure counter and
        disables the pipeline once ``disable_at`` is reached.

        Returns a truthy value if reporting failed (or nothing was
        reported) and a falsy value if the report was sent cleanly.
        """
        self.log.debug("Reporting change %s", item.change)
        ret = True  # Means error as returned by trigger.report
        if not item.getJobs():
            # We don't send empty reports with +1,
            # and the same for -1's (merge failures or transient errors)
            # as they cannot be followed by +1's
            self.log.debug("No jobs for change %s", item.change)
            actions = []
        elif item.didAllJobsSucceed():
            self.log.debug("success %s", self.pipeline.success_actions)
            actions = self.pipeline.success_actions
            item.setReportedResult('SUCCESS')
            self.pipeline._consecutive_failures = 0
        elif item.didMergerFail():
            actions = self.pipeline.merge_failure_actions
            item.setReportedResult('MERGER_FAILURE')
        else:
            actions = self.pipeline.failure_actions
            item.setReportedResult('FAILURE')
            self.pipeline._consecutive_failures += 1
        if self.pipeline._disabled:
            actions = self.pipeline.disabled_actions
        # Check here if we should disable so that we only use the disabled
        # reporters /after/ the last disable_at failure is still reported as
        # normal.
        if (self.pipeline.disable_at and not self.pipeline._disabled and
                self.pipeline._consecutive_failures >=
                self.pipeline.disable_at):
            self.pipeline._disabled = True
        if actions:
            try:
                self.log.info("Reporting item %s, actions: %s",
                              item, actions)
                ret = self.sendReport(actions, self.pipeline.source, item)
                if ret:
                    self.log.error("Reporting item %s received: %s",
                                   item, ret)
            except Exception:
                self.log.exception("Exception while reporting:")
                item.setReportedResult('ERROR')
        return ret

    def reportStats(self, item):
        """Emit statsd metrics for the pipeline and the item's project.

        Does nothing when the scheduler has no statsd client.  Any
        exception raised while reporting is logged and suppressed.
        """
        if not self.sched.statsd:
            return
        try:
            # Update the gauge on enqueue and dequeue, but timers only
            # when dequeing.
            if item.dequeue_time:
                dt = int((item.dequeue_time - item.enqueue_time) * 1000)
            else:
                dt = None
            items = len(self.pipeline.getAllItems())

            # stats.timers.zuul.pipeline.NAME.resident_time
            # stats_counts.zuul.pipeline.NAME.total_changes
            # stats.gauges.zuul.pipeline.NAME.current_changes
            key = 'zuul.pipeline.%s' % self.pipeline.name
            self.sched.statsd.gauge(key + '.current_changes', items)
            # Compare against None: a resident time that rounds to 0 ms
            # is still a dequeue and must be counted.
            if dt is not None:
                self.sched.statsd.timing(key + '.resident_time', dt)
                self.sched.statsd.incr(key + '.total_changes')

            # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
            # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
            project_name = item.change.project.name.replace('/', '.')
            key += '.%s' % project_name
            if dt is not None:
                self.sched.statsd.timing(key + '.resident_time', dt)
                self.sched.statsd.incr(key + '.total_changes')
        except Exception:
            self.log.exception("Exception reporting pipeline stats")