blob: 7f64986250aa6de888edecaa181daa768a756f8e [file] [log] [blame]
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
12
James E. Blair83005782015-12-11 14:46:03 -080013import logging
14
15from zuul import exceptions
16from zuul.model import NullChange
17
James E. Blair83005782015-12-11 14:46:03 -080018
class DynamicChangeQueueContextManager(object):
    """Context manager around a change queue that is dropped once empty.

    Entering the context yields the wrapped change queue unchanged.  On
    exit, if a queue was supplied and its ``queue`` attribute is empty,
    the queue is removed from its pipeline via ``pipeline.removeQueue``.
    """

    def __init__(self, change_queue):
        # May be a falsy value (e.g. None); __exit__ tolerates that.
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        wrapped = self.change_queue
        # Nothing to clean up when no queue was supplied.
        if not wrapped:
            return
        # A queue that still holds items stays in the pipeline.
        if wrapped.queue:
            return
        wrapped.pipeline.removeQueue(wrapped)
29
30
class StaticChangeQueueContextManager(object):
    """Context manager around a change queue that is never cleaned up.

    Entering the context yields the wrapped change queue unchanged;
    leaving it performs no action and does not suppress exceptions.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        # Intentionally a no-op: returning None lets exceptions propagate.
        return None
40
41
class PipelineManager(object):
    """Abstract Base Class for enqueueing and processing Changes in a Pipeline

    The manager holds a reference to the scheduler (``sched``) and to the
    pipeline it drives.  Several methods here are trivial hooks that
    always succeed (``isChangeReadyToBeEnqueued``, ``enqueueChangesAhead``,
    ``enqueueChangesBehind``, ``checkForChangesNeededBy``,
    ``getFailingDependentItems``).

    NOTE(review): this class calls ``self.getChangeQueue(...)`` and reads
    ``self.changes_merge`` but defines neither; presumably concrete
    subclasses provide them -- confirm against the subclasses.
    """

    log = logging.getLogger("zuul.PipelineManager")

    def __init__(self, sched, pipeline):
        self.sched = sched
        self.pipeline = pipeline
        # Filters deciding which events trigger this pipeline.
        self.event_filters = []
        # Requirement filters a change must match to be enqueued.
        self.changeish_filters = []

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)

    def _postConfig(self, layout):
        """Log a human-readable summary of this pipeline's configuration.

        Logs the source, requirement filters, event filters, the job tree
        of every project in ``layout.projects`` that has one, and the
        configured reporter actions.
        """
        self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
        self.log.info(" Source: %s" % self.pipeline.source)
        self.log.info(" Requirements:")
        for f in self.changeish_filters:
            self.log.info(" %s" % f)
        self.log.info(" Events:")
        for e in self.event_filters:
            self.log.info(" %s" % e)
        self.log.info(" Projects:")

        def log_jobs(tree, indent=0):
            # Recursively log one job tree node and its children, with
            # children indented two further columns.
            istr = ' ' + ' ' * indent
            if tree.job:
                efilters = ''
                for b in tree.job._branches:
                    efilters += str(b)
                for f in tree.job._files:
                    efilters += str(f)
                if tree.job.skip_if_matcher:
                    efilters += str(tree.job.skip_if_matcher)
                if efilters:
                    efilters = ' ' + efilters
                tags = []
                if tree.job.hold_following_changes:
                    tags.append('[hold]')
                if not tree.job.voting:
                    tags.append('[nonvoting]')
                if tree.job.mutex:
                    tags.append('[mutex: %s]' % tree.job.mutex)
                tags = ' '.join(tags)
                self.log.info("%s%s%s %s" % (istr, repr(tree.job),
                                             efilters, tags))
            for x in tree.job_trees:
                log_jobs(x, indent + 2)

        for p in layout.projects.values():
            tree = self.pipeline.getJobTree(p)
            if tree:
                self.log.info(" %s" % p)
                log_jobs(tree)
        self.log.info(" On start:")
        self.log.info(" %s" % self.pipeline.start_actions)
        self.log.info(" On success:")
        self.log.info(" %s" % self.pipeline.success_actions)
        self.log.info(" On failure:")
        self.log.info(" %s" % self.pipeline.failure_actions)
        self.log.info(" On merge-failure:")
        self.log.info(" %s" % self.pipeline.merge_failure_actions)
        self.log.info(" When disabled:")
        self.log.info(" %s" % self.pipeline.disabled_actions)

    def getSubmitAllowNeeds(self):
        """Return the set of review labels this pipeline may itself set.

        Get a set of code review labels that are allowed to be "needed"
        in the submit records for a change, with respect to this queue.
        In other words, the review labels this queue itself is likely to
        set before submitting.  The set is the union of what each success
        reporter returns from its own ``getSubmitAllowNeeds()``.
        """
        allow_needs = set()
        for action_reporter in self.pipeline.success_actions:
            allow_needs.update(action_reporter.getSubmitAllowNeeds())
        return allow_needs

    def eventMatches(self, event, change):
        """Return True if ``event`` should trigger this pipeline.

        An event with ``forced_pipeline`` set matches only the pipeline
        of that name, bypassing the event filters entirely.  Otherwise
        the first matching event filter wins.
        """
        if event.forced_pipeline:
            if event.forced_pipeline == self.pipeline.name:
                self.log.debug("Event %s for change %s was directly assigned "
                               "to pipeline %s" % (event, change, self))
                return True
            else:
                return False
        for ef in self.event_filters:
            if ef.matches(event, change):
                self.log.debug("Event %s for change %s matched %s "
                               "in pipeline %s" % (event, change, ef, self))
                return True
        return False

    def isChangeAlreadyInPipeline(self, change):
        """Return True if ``change`` is a live item anywhere in the pipeline."""
        # Checks live items in the pipeline
        for item in self.pipeline.getAllItems():
            if item.live and change.equals(item.change):
                return True
        return False

    def isChangeAlreadyInQueue(self, change, change_queue):
        """Return True if ``change`` is any item (live or not) in the queue."""
        # Checks any item in the specified change queue
        for item in change_queue.queue:
            if change.equals(item.change):
                return True
        return False

    def reportStart(self, item):
        """Send the pipeline's start reports for ``item``.

        Does nothing while the pipeline is disabled.  Reporting is best
        effort: reporter errors and exceptions are logged, never raised.
        """
        if not self.pipeline._disabled:
            try:
                self.log.info("Reporting start, action %s item %s" %
                              (self.pipeline.start_actions, item))
                ret = self.sendReport(self.pipeline.start_actions,
                                      self.pipeline.source, item)
                if ret:
                    self.log.error("Reporting item start %s received: %s" %
                                   (item, ret))
            except Exception:
                self.log.exception("Exception while reporting start:")

    def sendReport(self, action_reporters, source, item,
                   message=None):
        """Sends the built message off to configured reporters.

        Takes the action_reporters, item, message and extra options and
        sends them to the pluggable reporters.

        Returns a list of the truthy values returned by the reporters
        (treated as errors by callers).  When reporters were given and
        none returned an error, returns None; when no reporters were
        given, returns an empty list.  ``message`` is accepted but not
        used here.
        """
        report_errors = []
        if len(action_reporters) > 0:
            for reporter in action_reporters:
                ret = reporter.report(source, self.pipeline, item)
                if ret:
                    report_errors.append(ret)
            if len(report_errors) == 0:
                return
        return report_errors

    def isChangeReadyToBeEnqueued(self, change):
        """Hook: base implementation always returns True."""
        return True

    def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                            change_queue):
        """Hook: base implementation does nothing and returns True."""
        return True

    def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                             change_queue):
        """Hook: base implementation does nothing and returns True."""
        return True

    def checkForChangesNeededBy(self, change, change_queue):
        """Hook: base implementation always returns True.

        ``_processOneItem`` dequeues the item for any result other than
        the literal ``True``.
        """
        return True

    def getFailingDependentItems(self, item):
        """Hook: base implementation reports no failing dependencies."""
        return None

    def getDependentItems(self, item):
        """Return the items ahead of ``item``, nearest first.

        Follows the ``item_ahead`` chain until it ends.
        """
        orig_item = item
        items = []
        while item.item_ahead:
            items.append(item.item_ahead)
            item = item.item_ahead
        self.log.info("Change %s depends on changes %s" %
                      (orig_item.change,
                       [x.change for x in items]))
        return items

    def getItemForChange(self, change):
        """Return the first pipeline item whose change equals ``change``.

        Returns None when no item matches.
        """
        for item in self.pipeline.getAllItems():
            if item.change.equals(change):
                return item
        return None

    def findOldVersionOfChangeAlreadyInQueue(self, change):
        """Return the first live item that ``change`` is an update of.

        Returns None when there is no such item.
        """
        for item in self.pipeline.getAllItems():
            if not item.live:
                continue
            if change.isUpdateOf(item.change):
                return item
        return None

    def removeOldVersionsOfChange(self, change):
        """Remove a superseded version of ``change`` from the pipeline.

        Only acts when the pipeline has ``dequeue_on_new_patchset`` set.
        """
        if not self.pipeline.dequeue_on_new_patchset:
            return
        old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
        if old_item:
            self.log.debug("Change %s is a new version of %s, removing %s" %
                           (change, old_item.change, old_item))
            self.removeItem(old_item)

    def removeAbandonedChange(self, change):
        """Remove every live item whose change equals ``change``."""
        self.log.debug("Change %s abandoned, removing." % change)
        for item in self.pipeline.getAllItems():
            if not item.live:
                continue
            if item.change.equals(change):
                self.removeItem(item)

    def reEnqueueItem(self, item, last_head):
        """Put an existing ``item`` back into a change queue.

        Returns True when a change queue was found and the item was
        re-enqueued, False otherwise.
        """
        with self.getChangeQueue(item.change, last_head.queue) as change_queue:
            if change_queue:
                self.log.debug("Re-enqueing change %s in queue %s" %
                               (item.change, change_queue))
                change_queue.enqueueItem(item)

                # Get an updated copy of the layout if necessary.
                # This will return one of the following:
                # 1) An existing layout from the item ahead or pipeline.
                # 2) A newly created layout from the cached pipeline
                #    layout config plus the previously returned
                #    in-repo files stored in the buildset.
                # 3) None in the case that a fetch of the files from
                #    the merger is still pending.
                item.current_build_set.layout = self.getLayout(item)

                # Rebuild the frozen job tree from the new layout, if
                # we have one. If not, it will be built later.
                if item.current_build_set.layout:
                    item.freezeJobTree()

                # Re-set build results in case any new jobs have been
                # added to the tree.
                for build in item.current_build_set.getBuilds():
                    if build.result:
                        item.setResult(build)
                # Similarly, reset the item state.
                if item.current_build_set.unable_to_merge:
                    item.setUnableToMerge()
                if item.dequeued_needing_change:
                    item.setDequeuedNeedingChange()

                self.reportStats(item)
                return True
            else:
                self.log.error("Unable to find change queue for project %s" %
                               item.change.project)
                return False

    def addChange(self, change, quiet=False, enqueue_time=None,
                  ignore_requirements=False, live=True,
                  change_queue=None):
        """Try to enqueue ``change`` in this pipeline.

        :param quiet: when true, start reports are not sent.
        :param enqueue_time: if truthy, overrides the new item's
            ``enqueue_time``.
        :param ignore_requirements: when true, the pipeline requirement
            filters are not consulted.
        :param live: stored on the new item as ``item.live``.
        :param change_queue: forwarded to ``getChangeQueue``.
        :returns: True if the change was enqueued or was already present;
            False if it was rejected.
        """
        self.log.debug("Considering adding change %s" % change)

        # If we are adding a live change, check if it's a live item
        # anywhere in the pipeline. Otherwise, we will perform the
        # duplicate check below on the specific change_queue.
        if live and self.isChangeAlreadyInPipeline(change):
            self.log.debug("Change %s is already in pipeline, "
                           "ignoring" % change)
            return True

        if not self.isChangeReadyToBeEnqueued(change):
            self.log.debug("Change %s is not ready to be enqueued, ignoring" %
                           change)
            return False

        if not ignore_requirements:
            for f in self.changeish_filters:
                if not f.matches(change):
                    self.log.debug("Change %s does not match pipeline "
                                   "requirement %s" % (change, f))
                    return False

        with self.getChangeQueue(change, change_queue) as change_queue:
            if not change_queue:
                self.log.debug("Unable to find change queue for "
                               "change %s in project %s" %
                               (change, change.project))
                return False

            if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
                                            change_queue):
                self.log.debug("Failed to enqueue changes "
                               "ahead of %s" % change)
                return False

            # Enqueueing the changes ahead may have pulled this change in.
            if self.isChangeAlreadyInQueue(change, change_queue):
                self.log.debug("Change %s is already in queue, "
                               "ignoring" % change)
                return True

            self.log.debug("Adding change %s to queue %s" %
                           (change, change_queue))
            item = change_queue.enqueueChange(change)
            if enqueue_time:
                item.enqueue_time = enqueue_time
            item.live = live
            self.reportStats(item)
            if not quiet:
                if len(self.pipeline.start_actions) > 0:
                    self.reportStart(item)
            self.enqueueChangesBehind(change, quiet, ignore_requirements,
                                      change_queue)
            for trigger in self.sched.triggers.values():
                trigger.onChangeEnqueued(item.change, self.pipeline)
            return True

    def dequeueItem(self, item):
        """Remove ``item`` from its queue without touching its builds."""
        self.log.debug("Removing change %s from queue" % item.change)
        item.queue.dequeueItem(item)

    def removeItem(self, item):
        """Cancel the jobs of ``item`` (and items behind) and dequeue it."""
        # Remove an item from the queue, probably because it has been
        # superseded by another change.
        self.log.debug("Canceling builds behind change: %s "
                       "because it is being removed." % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.reportStats(item)

    def provisionNodes(self, item):
        """Submit node requests for the jobs ``item`` still needs nodes for.

        Returns True if at least one request was made, False otherwise.
        """
        jobs = item.findJobsToRequest()
        if not jobs:
            return False
        build_set = item.current_build_set
        self.log.debug("Requesting nodes for change %s" % item.change)
        for job in jobs:
            req = self.sched.nodepool.requestNodes(build_set, job)
            self.log.debug("Adding node request %s for job %s to item %s" %
                           (req, job, item))
            build_set.setJobNodeRequest(job.name, req)
        return True

    def _launchJobs(self, item, jobs):
        """Launch each of ``jobs`` for ``item`` and record the builds.

        A failure to launch one job is logged and does not prevent the
        remaining jobs from being launched.
        """
        self.log.debug("Launching jobs for change %s" % item.change)
        dependent_items = self.getDependentItems(item)
        for job in jobs:
            self.log.debug("Found job %s for change %s" % (job, item.change))
            try:
                nodeset = item.current_build_set.getJobNodeSet(job.name)
                self.sched.nodepool.useNodeset(nodeset)
                build = self.sched.launcher.launch(job, item,
                                                   self.pipeline,
                                                   dependent_items)
                self.log.debug("Adding build %s of job %s to item %s" %
                               (build, job, item))
                item.addBuild(build)
            except Exception:
                self.log.exception("Exception while launching job %s "
                                   "for change %s:" % (job, item.change))

    def launchJobs(self, item):
        """Launch whatever jobs of ``item`` are currently runnable.

        Returns False when the item has no layout yet; otherwise returns
        None (see the TODO below), so callers never see a truthy result.
        """
        # TODO(jeblair): This should return a value indicating a job
        # was launched. Appears to be a longstanding bug.
        if not item.current_build_set.layout:
            return False

        jobs = item.findJobsToRun(self.sched.mutex)
        if jobs:
            self._launchJobs(item, jobs)

    def cancelJobs(self, item, prime=True):
        """Cancel node requests and builds of ``item`` and all items behind.

        :param prime: when true and the current build set has a ref, the
            item's builds are reset (``item.resetAllBuilds()``) before
            the old build set is cancelled.
        :returns: True if any build was marked canceled, here or in an
            item behind.
        """
        self.log.debug("Cancel jobs for change %s" % item.change)
        canceled = False
        # Capture the build set before a possible reset replaces it.
        old_build_set = item.current_build_set
        if prime and item.current_build_set.ref:
            item.resetAllBuilds()
        for req in old_build_set.node_requests.values():
            self.sched.nodepool.cancelRequest(req)
        old_build_set.node_requests = {}
        for build in old_build_set.getBuilds():
            was_running = False
            try:
                was_running = self.sched.launcher.cancel(build)
            except Exception:
                self.log.exception("Exception while canceling build %s "
                                   "for change %s" % (build, item.change))
            if not was_running:
                # Pre-bind so the handler can always format the message,
                # even when the nodeset lookup itself failed.
                nodeset = None
                try:
                    nodeset = build.build_set.getJobNodeSet(build.job.name)
                    # The nodepool client lives on the scheduler; this
                    # manager has no ``nodepool`` attribute of its own.
                    self.sched.nodepool.returnNodeset(nodeset)
                except Exception:
                    self.log.exception("Unable to return nodeset %s for "
                                       "canceled build request %s" %
                                       (nodeset, build))
            build.result = 'CANCELED'
            canceled = True
        for item_behind in item.items_behind:
            self.log.debug("Canceling jobs for change %s, behind change %s" %
                           (item_behind.change, item.change))
            if self.cancelJobs(item_behind, prime=prime):
                canceled = True
        return canceled

    def _makeMergerItem(self, item):
        """Return a dict describing ``item`` for the merger.

        ``number``/``patchset`` are filled in when the change has a
        ``number`` attribute; otherwise ``oldrev``/``newrev`` are filled
        in when it has a ``newrev`` attribute.  The unused pair is None.
        """
        # Create a dictionary with all info about the item needed by
        # the merger.
        number = None
        patchset = None
        oldrev = None
        newrev = None
        if hasattr(item.change, 'number'):
            number = item.change.number
            patchset = item.change.patchset
        elif hasattr(item.change, 'newrev'):
            oldrev = item.change.oldrev
            newrev = item.change.newrev
        connection_name = self.pipeline.source.connection.connection_name

        project = item.change.project.name
        return dict(project=project,
                    url=self.pipeline.source.getGitUrl(
                        item.change.project),
                    connection_name=connection_name,
                    merge_mode=item.current_build_set.getMergeMode(project),
                    refspec=item.change.refspec,
                    branch=item.change.branch,
                    ref=item.current_build_set.ref,
                    number=number,
                    patchset=patchset,
                    oldrev=oldrev,
                    newrev=newrev,
                    )

    def getLayout(self, item):
        """Return the layout that applies to ``item``, or None.

        If the change does not update the config, the layout of the item
        ahead (or of the pipeline, when there is none) is returned.
        Otherwise the result depends on the build set's merge state:

        * pending: None;
        * complete: None if unable to merge, else a dynamic layout built
          from the build set's files;
        * anything else: a merge is requested from the merger, the state
          becomes pending, and None is returned.
        """
        if not item.change.updatesConfig():
            if item.item_ahead:
                return item.item_ahead.current_build_set.layout
            else:
                return item.queue.pipeline.layout
        # This item updates the config, ask the merger for the result.
        build_set = item.current_build_set
        if build_set.merge_state == build_set.PENDING:
            return None
        if build_set.merge_state == build_set.COMPLETE:
            if build_set.unable_to_merge:
                return None
            # Load layout
            # Late import to break an import loop
            import zuul.configloader
            loader = zuul.configloader.ConfigLoader()
            self.log.debug("Load dynamic layout with %s" % build_set.files)
            layout = loader.createDynamicLayout(item.pipeline.layout.tenant,
                                                build_set.files)
            return layout
        build_set.merge_state = build_set.PENDING
        self.log.debug("Preparing dynamic layout for: %s" % item.change)
        dependent_items = self.getDependentItems(item)
        # getDependentItems() is nearest-first; the merger gets the
        # furthest-ahead item first and this item last.
        dependent_items.reverse()
        all_items = dependent_items + [item]
        # Build a real list: map() is a one-shot iterator on Python 3.
        merger_items = [self._makeMergerItem(i) for i in all_items]
        self.sched.merger.mergeChanges(merger_items,
                                       item.current_build_set,
                                       ['.zuul.yaml'],
                                       self.pipeline.precedence)

    def prepareLayout(self, item):
        """Ensure ``item`` has a layout and a frozen job tree.

        Get a copy of the layout in the context of the current queue.
        Returns True if the layout is available, False otherwise.
        """
        if not item.current_build_set.ref:
            item.current_build_set.setConfiguration()
        if not item.current_build_set.layout:
            item.current_build_set.layout = self.getLayout(item)
        if not item.current_build_set.layout:
            return False
        if not item.job_tree:
            item.freezeJobTree()
        return True

    def _processOneItem(self, item, nnfi):
        """Advance a single queue item by one processing step.

        :param nnfi: the nearest non-failing item ahead of ``item`` in
            the queue, or None.
        :returns: a ``(changed, nnfi)`` tuple; ``changed`` says whether
            anything was done, and ``nnfi`` is the value to pass for the
            next item in the queue.
        """
        changed = False
        item_ahead = item.item_ahead
        # A non-live item ahead is treated as if nothing were ahead.
        if item_ahead and (not item_ahead.live):
            item_ahead = None
        change_queue = item.queue
        failing_reasons = []  # Reasons this item is failing

        if self.checkForChangesNeededBy(item.change, change_queue) is not True:
            # It's not okay to enqueue this change, we should remove it.
            self.log.info("Dequeuing change %s because "
                          "it can no longer merge" % item.change)
            self.cancelJobs(item)
            self.dequeueItem(item)
            item.setDequeuedNeedingChange()
            if item.live:
                try:
                    self.reportItem(item)
                except exceptions.MergeFailure:
                    pass
            return (True, nnfi)
        dep_items = self.getFailingDependentItems(item)
        actionable = change_queue.isActionable(item)
        item.active = actionable
        ready = False
        if dep_items:
            failing_reasons.append('a needed change is failing')
            self.cancelJobs(item, prime=False)
        else:
            item_ahead_merged = False
            if (item_ahead and item_ahead.change.is_merged):
                item_ahead_merged = True
            if (item_ahead != nnfi and not item_ahead_merged):
                # Our current base is different than what we expected,
                # and it's not because our current base merged. Something
                # ahead must have failed.
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, is not the nearest non-failing "
                              "item, %s" % (item.change, item_ahead, nnfi))
                change_queue.moveItem(item, nnfi)
                changed = True
                self.cancelJobs(item)
            if actionable:
                ready = self.prepareLayout(item)
                if item.current_build_set.unable_to_merge:
                    failing_reasons.append("it has a merge conflict")
                if ready and self.provisionNodes(item):
                    changed = True
        if actionable and ready and self.launchJobs(item):
            changed = True
        if item.didAnyJobFail():
            failing_reasons.append("at least one job failed")
        if (not item.live) and (not item.items_behind):
            failing_reasons.append("is a non-live item with no items behind")
            self.dequeueItem(item)
            changed = True
        if ((not item_ahead) and item.areAllJobsComplete() and item.live):
            try:
                self.reportItem(item)
            except exceptions.MergeFailure:
                failing_reasons.append("it did not merge")
                for item_behind in item.items_behind:
                    self.log.info("Resetting builds for change %s because the "
                                  "item ahead, %s, failed to merge" %
                                  (item_behind.change, item))
                    self.cancelJobs(item_behind)
            self.dequeueItem(item)
            changed = True
        elif not failing_reasons and item.live:
            nnfi = item
        item.current_build_set.failing_reasons = failing_reasons
        if failing_reasons:
            self.log.debug("%s is a failing item because %s" %
                           (item, failing_reasons))
        return (changed, nnfi)

    def processQueue(self):
        """Process every item of every queue in the pipeline once.

        Returns True if any item reported a change.
        """
        # Do whatever needs to be done for each change in the queue
        self.log.debug("Starting queue processor: %s" % self.pipeline.name)
        changed = False
        for queue in self.pipeline.queues:
            queue_changed = False
            nnfi = None  # Nearest non-failing item
            # Iterate over a copy: processing may dequeue or move items.
            for item in queue.queue[:]:
                item_changed, nnfi = self._processOneItem(
                    item, nnfi)
                if item_changed:
                    queue_changed = True
                self.reportStats(item)
            if queue_changed:
                changed = True
                status = ''
                for item in queue.queue:
                    status += item.formatStatus()
                if status:
                    self.log.debug("Queue %s status is now:\n %s" %
                                   (queue.name, status))
        self.log.debug("Finished queue processor: %s (changed: %s)" %
                       (self.pipeline.name, changed))
        return changed

    def onBuildStarted(self, build):
        """Handle a build-started notification; always returns True."""
        self.log.debug("Build %s started" % build)
        return True

    def onBuildCompleted(self, build):
        """Record a finished build and release the resources it held.

        Stores the result on the item, releases the job's mutex and
        returns the build's nodeset to nodepool.  A failure to return
        the nodeset is logged, not raised.  Always returns True.
        """
        self.log.debug("Build %s completed" % build)
        item = build.build_set.item

        item.setResult(build)
        self.sched.mutex.release(item, build.job)
        self.log.debug("Item %s status is now:\n %s" %
                       (item, item.formatStatus()))

        # Pre-bind so the handler can always format the message, even
        # when the nodeset lookup itself failed.
        nodeset = None
        try:
            nodeset = build.build_set.getJobNodeSet(build.job.name)
            # The nodepool client lives on the scheduler; this manager
            # has no ``nodepool`` attribute of its own.
            self.sched.nodepool.returnNodeset(nodeset)
        except Exception:
            self.log.exception("Unable to return nodeset %s" % (nodeset,))

        return True

    def onMergeCompleted(self, event):
        """Record the merger's result on the event's build set.

        Marks the merge complete and stores the resulting commit (and
        files, when merged).  If no commit resulted and the change is not
        a ``NullChange``, the item is flagged as unable to merge.
        """
        build_set = event.build_set
        item = build_set.item
        build_set.merge_state = build_set.COMPLETE
        build_set.zuul_url = event.zuul_url
        if event.merged:
            build_set.commit = event.commit
            build_set.files.setFiles(event.files)
        elif event.updated:
            if not isinstance(item.change, NullChange):
                build_set.commit = item.change.newrev
        if not build_set.commit and not isinstance(item.change, NullChange):
            self.log.info("Unable to merge change %s" % item.change)
            item.setUnableToMerge()

    def onNodesProvisioned(self, event):
        """Record a completed node request on its build set."""
        # TODOv3(jeblair): handle provisioning failure here
        request = event.request
        build_set = request.build_set
        build_set.jobNodeRequestComplete(request.job.name, request,
                                         request.nodeset)
        self.log.info("Completed node request %s for job %s of item %s "
                      "with nodes %s" %
                      (request, request.job, build_set.item,
                       request.nodeset))

    def reportItem(self, item):
        """Report the result of ``item`` and, if applicable, verify merge.

        The item is reported at most once.  When ``self.changes_merge``
        is true, the queue window is grown on success and shrunk on
        failure.

        :raises exceptions.MergeFailure: when ``self.changes_merge`` is
            true and the item either did not pass all jobs or was not
            merged by the source.
        """
        if not item.reported:
            # _reportItem() returns True if it failed to report.
            item.reported = not self._reportItem(item)
        if self.changes_merge:
            succeeded = item.didAllJobsSucceed()
            merged = item.reported
            if merged:
                merged = self.pipeline.source.isMerged(item.change,
                                                       item.change.branch)
            self.log.info("Reported change %s status: all-succeeded: %s, "
                          "merged: %s" % (item.change, succeeded, merged))
            change_queue = item.queue
            if not (succeeded and merged):
                self.log.debug("Reported change %s failed tests or failed "
                               "to merge" % (item.change))
                change_queue.decreaseWindowSize()
                self.log.debug("%s window size decreased to %s" %
                               (change_queue, change_queue.window))
                raise exceptions.MergeFailure(
                    "Change %s failed to merge" % item.change)
            else:
                change_queue.increaseWindowSize()
                self.log.debug("%s window size increased to %s" %
                               (change_queue, change_queue.window))

                for trigger in self.sched.triggers.values():
                    trigger.onChangeMerged(item.change, self.pipeline.source)

    def _reportItem(self, item):
        """Pick the reporters matching the item's outcome and run them.

        Also maintains the pipeline's consecutive-failure counter and
        disables the pipeline once ``disable_at`` is reached.

        :returns: a truthy value if reporting failed or was not
            attempted (including when there were no actions to run);
            a falsy value when the reporters ran without error.
        """
        self.log.debug("Reporting change %s" % item.change)
        ret = True  # Means error as returned by trigger.report
        if not item.getJobs():
            # We don't send empty reports with +1,
            # and the same for -1's (merge failures or transient errors)
            # as they cannot be followed by +1's
            self.log.debug("No jobs for change %s" % item.change)
            actions = []
        elif item.didAllJobsSucceed():
            self.log.debug("success %s" % (self.pipeline.success_actions))
            actions = self.pipeline.success_actions
            item.setReportedResult('SUCCESS')
            self.pipeline._consecutive_failures = 0
        elif item.didMergerFail():
            actions = self.pipeline.merge_failure_actions
            item.setReportedResult('MERGER_FAILURE')
        else:
            actions = self.pipeline.failure_actions
            item.setReportedResult('FAILURE')
            self.pipeline._consecutive_failures += 1
        if self.pipeline._disabled:
            actions = self.pipeline.disabled_actions
        # Check here if we should disable so that we only use the disabled
        # reporters /after/ the last disable_at failure is still reported as
        # normal.
        if (self.pipeline.disable_at and not self.pipeline._disabled and
                self.pipeline._consecutive_failures >=
                self.pipeline.disable_at):
            self.pipeline._disabled = True
        if actions:
            try:
                self.log.info("Reporting item %s, actions: %s" %
                              (item, actions))
                ret = self.sendReport(actions, self.pipeline.source, item)
                if ret:
                    self.log.error("Reporting item %s received: %s" %
                                   (item, ret))
            except Exception:
                self.log.exception("Exception while reporting:")
                item.setReportedResult('ERROR')
        return ret

    def reportStats(self, item):
        """Emit statsd metrics for the pipeline and the item's project.

        Does nothing when the scheduler has no statsd client.  Any error
        while reporting is logged and swallowed.
        """
        if not self.sched.statsd:
            return
        try:
            # Update the gauge on enqueue and dequeue, but timers only
            # when dequeueing.
            if item.dequeue_time:
                dt = int((item.dequeue_time - item.enqueue_time) * 1000)
            else:
                dt = None
            items = len(self.pipeline.getAllItems())

            # stats.timers.zuul.pipeline.NAME.resident_time
            # stats_counts.zuul.pipeline.NAME.total_changes
            # stats.gauges.zuul.pipeline.NAME.current_changes
            key = 'zuul.pipeline.%s' % self.pipeline.name
            self.sched.statsd.gauge(key + '.current_changes', items)
            if dt:
                self.sched.statsd.timing(key + '.resident_time', dt)
                self.sched.statsd.incr(key + '.total_changes')

            # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
            # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
            project_name = item.change.project.name.replace('/', '.')
            key += '.%s' % project_name
            if dt:
                self.sched.statsd.timing(key + '.resident_time', dt)
                self.sched.statsd.incr(key + '.total_changes')
        except Exception:
            self.log.exception("Exception reporting pipeline stats")