blob: 417444a8b0148ec08a3b1be652330fc3f25f3329 [file] [log] [blame]
James E. Blair83005782015-12-11 14:46:03 -08001# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5# http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
12
James E. Blair83005782015-12-11 14:46:03 -080013import logging
14
15from zuul import exceptions
16from zuul.model import NullChange
17
James E. Blair83005782015-12-11 14:46:03 -080018
class DynamicChangeQueueContextManager(object):
    """Context manager wrapping a change queue that is removed when empty.

    Entering yields the wrapped queue (which may be falsy).  On exit, a
    queue that holds no items is removed from its pipeline.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        queue = self.change_queue
        # Nothing to clean up without a queue, or while it still has items.
        if not queue or queue.queue:
            return
        queue.pipeline.removeQueue(queue)
29
30
class StaticChangeQueueContextManager(object):
    """Context manager wrapping a change queue that is never cleaned up.

    Entering yields the wrapped queue; leaving the block does nothing.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        # No cleanup is performed; returning None lets exceptions propagate.
        return None
40
41
Monty Taylorc75478c2016-07-29 12:04:21 -070042class PipelineManager(object):
    """Abstract Base Class to enqueue and process Changes in a Pipeline"""
44
Monty Taylorc75478c2016-07-29 12:04:21 -070045 log = logging.getLogger("zuul.PipelineManager")
James E. Blair83005782015-12-11 14:46:03 -080046
47 def __init__(self, sched, pipeline):
48 self.sched = sched
49 self.pipeline = pipeline
50 self.event_filters = []
51 self.changeish_filters = []
52
53 def __str__(self):
54 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
55
56 def _postConfig(self, layout):
57 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
58 self.log.info(" Source: %s" % self.pipeline.source)
59 self.log.info(" Requirements:")
60 for f in self.changeish_filters:
61 self.log.info(" %s" % f)
62 self.log.info(" Events:")
63 for e in self.event_filters:
64 self.log.info(" %s" % e)
65 self.log.info(" Projects:")
66
67 def log_jobs(tree, indent=0):
68 istr = ' ' + ' ' * indent
69 if tree.job:
James E. Blairae76ac52017-02-02 10:03:30 -080070 # TODOv3(jeblair): represent matchers
James E. Blair83005782015-12-11 14:46:03 -080071 efilters = ''
James E. Blairae76ac52017-02-02 10:03:30 -080072 # for b in tree.job._branches:
73 # efilters += str(b)
74 # for f in tree.job._files:
75 # efilters += str(f)
76 # if tree.job.skip_if_matcher:
77 # efilters += str(tree.job.skip_if_matcher)
78 # if efilters:
79 # efilters = ' ' + efilters
Joshua Hesketh89b67f62016-02-11 21:22:14 +110080 tags = []
James E. Blair83005782015-12-11 14:46:03 -080081 if tree.job.hold_following_changes:
Joshua Hesketh89b67f62016-02-11 21:22:14 +110082 tags.append('[hold]')
James E. Blair83005782015-12-11 14:46:03 -080083 if not tree.job.voting:
Joshua Hesketh89b67f62016-02-11 21:22:14 +110084 tags.append('[nonvoting]')
85 if tree.job.mutex:
86 tags.append('[mutex: %s]' % tree.job.mutex)
87 tags = ' '.join(tags)
88 self.log.info("%s%s%s %s" % (istr, repr(tree.job),
89 efilters, tags))
James E. Blair83005782015-12-11 14:46:03 -080090 for x in tree.job_trees:
91 log_jobs(x, indent + 2)
92
James E. Blairae76ac52017-02-02 10:03:30 -080093 for project_name in layout.project_configs.keys():
94 project = self.pipeline.source.getProject(project_name)
95 tree = self.pipeline.getJobTree(project)
James E. Blair83005782015-12-11 14:46:03 -080096 if tree:
James E. Blairae76ac52017-02-02 10:03:30 -080097 self.log.info(" %s" % project)
James E. Blair83005782015-12-11 14:46:03 -080098 log_jobs(tree)
99 self.log.info(" On start:")
100 self.log.info(" %s" % self.pipeline.start_actions)
101 self.log.info(" On success:")
102 self.log.info(" %s" % self.pipeline.success_actions)
103 self.log.info(" On failure:")
104 self.log.info(" %s" % self.pipeline.failure_actions)
105 self.log.info(" On merge-failure:")
106 self.log.info(" %s" % self.pipeline.merge_failure_actions)
107 self.log.info(" When disabled:")
108 self.log.info(" %s" % self.pipeline.disabled_actions)
109
110 def getSubmitAllowNeeds(self):
111 # Get a list of code review labels that are allowed to be
112 # "needed" in the submit records for a change, with respect
113 # to this queue. In other words, the list of review labels
114 # this queue itself is likely to set before submitting.
115 allow_needs = set()
116 for action_reporter in self.pipeline.success_actions:
117 allow_needs.update(action_reporter.getSubmitAllowNeeds())
118 return allow_needs
119
120 def eventMatches(self, event, change):
121 if event.forced_pipeline:
122 if event.forced_pipeline == self.pipeline.name:
123 self.log.debug("Event %s for change %s was directly assigned "
124 "to pipeline %s" % (event, change, self))
125 return True
126 else:
127 return False
128 for ef in self.event_filters:
129 if ef.matches(event, change):
130 self.log.debug("Event %s for change %s matched %s "
131 "in pipeline %s" % (event, change, ef, self))
132 return True
133 return False
134
135 def isChangeAlreadyInPipeline(self, change):
136 # Checks live items in the pipeline
137 for item in self.pipeline.getAllItems():
138 if item.live and change.equals(item.change):
139 return True
140 return False
141
142 def isChangeAlreadyInQueue(self, change, change_queue):
143 # Checks any item in the specified change queue
144 for item in change_queue.queue:
145 if change.equals(item.change):
146 return True
147 return False
148
149 def reportStart(self, item):
150 if not self.pipeline._disabled:
151 try:
152 self.log.info("Reporting start, action %s item %s" %
153 (self.pipeline.start_actions, item))
154 ret = self.sendReport(self.pipeline.start_actions,
155 self.pipeline.source, item)
156 if ret:
157 self.log.error("Reporting item start %s received: %s" %
158 (item, ret))
159 except:
160 self.log.exception("Exception while reporting start:")
161
162 def sendReport(self, action_reporters, source, item,
163 message=None):
164 """Sends the built message off to configured reporters.
165
166 Takes the action_reporters, item, message and extra options and
167 sends them to the pluggable reporters.
168 """
169 report_errors = []
170 if len(action_reporters) > 0:
171 for reporter in action_reporters:
172 ret = reporter.report(source, self.pipeline, item)
173 if ret:
174 report_errors.append(ret)
175 if len(report_errors) == 0:
176 return
177 return report_errors
178
179 def isChangeReadyToBeEnqueued(self, change):
180 return True
181
182 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
183 change_queue):
184 return True
185
186 def enqueueChangesBehind(self, change, quiet, ignore_requirements,
187 change_queue):
188 return True
189
190 def checkForChangesNeededBy(self, change, change_queue):
191 return True
192
193 def getFailingDependentItems(self, item):
194 return None
195
196 def getDependentItems(self, item):
197 orig_item = item
198 items = []
199 while item.item_ahead:
200 items.append(item.item_ahead)
201 item = item.item_ahead
202 self.log.info("Change %s depends on changes %s" %
203 (orig_item.change,
204 [x.change for x in items]))
205 return items
206
207 def getItemForChange(self, change):
208 for item in self.pipeline.getAllItems():
209 if item.change.equals(change):
210 return item
211 return None
212
213 def findOldVersionOfChangeAlreadyInQueue(self, change):
214 for item in self.pipeline.getAllItems():
215 if not item.live:
216 continue
217 if change.isUpdateOf(item.change):
218 return item
219 return None
220
221 def removeOldVersionsOfChange(self, change):
222 if not self.pipeline.dequeue_on_new_patchset:
223 return
224 old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
225 if old_item:
226 self.log.debug("Change %s is a new version of %s, removing %s" %
227 (change, old_item.change, old_item))
228 self.removeItem(old_item)
229
230 def removeAbandonedChange(self, change):
231 self.log.debug("Change %s abandoned, removing." % change)
232 for item in self.pipeline.getAllItems():
233 if not item.live:
234 continue
235 if item.change.equals(change):
236 self.removeItem(item)
237
238 def reEnqueueItem(self, item, last_head):
239 with self.getChangeQueue(item.change, last_head.queue) as change_queue:
240 if change_queue:
241 self.log.debug("Re-enqueing change %s in queue %s" %
242 (item.change, change_queue))
243 change_queue.enqueueItem(item)
244
James E. Blair3b5ff3b2016-07-21 10:08:24 -0700245 # Get an updated copy of the layout if necessary.
246 # This will return one of the following:
247 # 1) An existing layout from the item ahead or pipeline.
248 # 2) A newly created layout from the cached pipeline
249 # layout config plus the previously returned
250 # in-repo files stored in the buildset.
251 # 3) None in the case that a fetch of the files from
252 # the merger is still pending.
253 item.current_build_set.layout = self.getLayout(item)
254
255 # Rebuild the frozen job tree from the new layout, if
256 # we have one. If not, it will be built later.
257 if item.current_build_set.layout:
258 item.freezeJobTree()
259
James E. Blair83005782015-12-11 14:46:03 -0800260 # Re-set build results in case any new jobs have been
261 # added to the tree.
262 for build in item.current_build_set.getBuilds():
263 if build.result:
James E. Blairdbfd3282016-07-21 10:46:19 -0700264 item.setResult(build)
James E. Blair83005782015-12-11 14:46:03 -0800265 # Similarly, reset the item state.
266 if item.current_build_set.unable_to_merge:
James E. Blairdbfd3282016-07-21 10:46:19 -0700267 item.setUnableToMerge()
James E. Blair83005782015-12-11 14:46:03 -0800268 if item.dequeued_needing_change:
James E. Blairdbfd3282016-07-21 10:46:19 -0700269 item.setDequeuedNeedingChange()
James E. Blair83005782015-12-11 14:46:03 -0800270
271 self.reportStats(item)
272 return True
273 else:
274 self.log.error("Unable to find change queue for project %s" %
275 item.change.project)
276 return False
277
278 def addChange(self, change, quiet=False, enqueue_time=None,
279 ignore_requirements=False, live=True,
280 change_queue=None):
281 self.log.debug("Considering adding change %s" % change)
282
283 # If we are adding a live change, check if it's a live item
284 # anywhere in the pipeline. Otherwise, we will perform the
285 # duplicate check below on the specific change_queue.
286 if live and self.isChangeAlreadyInPipeline(change):
287 self.log.debug("Change %s is already in pipeline, "
288 "ignoring" % change)
289 return True
290
291 if not self.isChangeReadyToBeEnqueued(change):
292 self.log.debug("Change %s is not ready to be enqueued, ignoring" %
293 change)
294 return False
295
296 if not ignore_requirements:
297 for f in self.changeish_filters:
298 if not f.matches(change):
299 self.log.debug("Change %s does not match pipeline "
300 "requirement %s" % (change, f))
301 return False
302
303 with self.getChangeQueue(change, change_queue) as change_queue:
304 if not change_queue:
305 self.log.debug("Unable to find change queue for "
306 "change %s in project %s" %
307 (change, change.project))
308 return False
309
310 if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
311 change_queue):
312 self.log.debug("Failed to enqueue changes "
313 "ahead of %s" % change)
314 return False
315
316 if self.isChangeAlreadyInQueue(change, change_queue):
317 self.log.debug("Change %s is already in queue, "
318 "ignoring" % change)
319 return True
320
321 self.log.debug("Adding change %s to queue %s" %
322 (change, change_queue))
323 item = change_queue.enqueueChange(change)
324 if enqueue_time:
325 item.enqueue_time = enqueue_time
326 item.live = live
327 self.reportStats(item)
328 if not quiet:
329 if len(self.pipeline.start_actions) > 0:
330 self.reportStart(item)
331 self.enqueueChangesBehind(change, quiet, ignore_requirements,
332 change_queue)
James E. Blaire511d2f2016-12-08 15:22:26 -0800333 zuul_driver = self.sched.connections.drivers['zuul']
334 tenant = self.pipeline.layout.tenant
335 zuul_driver.onChangeEnqueued(tenant, item.change, self.pipeline)
James E. Blair83005782015-12-11 14:46:03 -0800336 return True
337
338 def dequeueItem(self, item):
339 self.log.debug("Removing change %s from queue" % item.change)
340 item.queue.dequeueItem(item)
341
342 def removeItem(self, item):
343 # Remove an item from the queue, probably because it has been
344 # superseded by another change.
345 self.log.debug("Canceling builds behind change: %s "
346 "because it is being removed." % item.change)
347 self.cancelJobs(item)
348 self.dequeueItem(item)
349 self.reportStats(item)
350
James E. Blair8d692392016-04-08 17:47:58 -0700351 def provisionNodes(self, item):
James E. Blairdbfd3282016-07-21 10:46:19 -0700352 jobs = item.findJobsToRequest()
James E. Blair8d692392016-04-08 17:47:58 -0700353 if not jobs:
354 return False
355 build_set = item.current_build_set
356 self.log.debug("Requesting nodes for change %s" % item.change)
357 for job in jobs:
358 req = self.sched.nodepool.requestNodes(build_set, job)
359 self.log.debug("Adding node request %s for job %s to item %s" %
360 (req, job, item))
361 build_set.setJobNodeRequest(job.name, req)
362 return True
363
James E. Blair83005782015-12-11 14:46:03 -0800364 def _launchJobs(self, item, jobs):
365 self.log.debug("Launching jobs for change %s" % item.change)
366 dependent_items = self.getDependentItems(item)
367 for job in jobs:
368 self.log.debug("Found job %s for change %s" % (job, item.change))
369 try:
James E. Blaircacdf2b2017-01-04 13:14:37 -0800370 nodeset = item.current_build_set.getJobNodeSet(job.name)
James E. Blair1511bc32017-01-18 09:25:31 -0800371 self.sched.nodepool.useNodeSet(nodeset)
James E. Blair83005782015-12-11 14:46:03 -0800372 build = self.sched.launcher.launch(job, item,
373 self.pipeline,
374 dependent_items)
375 self.log.debug("Adding build %s of job %s to item %s" %
376 (build, job, item))
377 item.addBuild(build)
378 except:
379 self.log.exception("Exception while launching job %s "
380 "for change %s:" % (job, item.change))
381
382 def launchJobs(self, item):
383 # TODO(jeblair): This should return a value indicating a job
384 # was launched. Appears to be a longstanding bug.
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700385 if not item.current_build_set.layout:
386 return False
387
James E. Blairdbfd3282016-07-21 10:46:19 -0700388 jobs = item.findJobsToRun(self.sched.mutex)
James E. Blair83005782015-12-11 14:46:03 -0800389 if jobs:
390 self._launchJobs(item, jobs)
391
392 def cancelJobs(self, item, prime=True):
393 self.log.debug("Cancel jobs for change %s" % item.change)
394 canceled = False
395 old_build_set = item.current_build_set
396 if prime and item.current_build_set.ref:
397 item.resetAllBuilds()
James E. Blair8d692392016-04-08 17:47:58 -0700398 for req in old_build_set.node_requests.values():
399 self.sched.nodepool.cancelRequest(req)
400 old_build_set.node_requests = {}
James E. Blaire18d4602017-01-05 11:17:28 -0800401 canceled_jobs = set()
James E. Blair83005782015-12-11 14:46:03 -0800402 for build in old_build_set.getBuilds():
James E. Blair98095cf2017-02-02 14:13:03 -0800403 if build.result:
404 canceled_jobs.add(build.job.name)
405 continue
James E. Blair01695c32017-01-04 17:29:25 -0800406 was_running = False
James E. Blair83005782015-12-11 14:46:03 -0800407 try:
James E. Blair01695c32017-01-04 17:29:25 -0800408 was_running = self.sched.launcher.cancel(build)
James E. Blair83005782015-12-11 14:46:03 -0800409 except:
410 self.log.exception("Exception while canceling build %s "
411 "for change %s" % (build, item.change))
Tobias Henkelfb91a492017-02-15 07:29:43 +0100412 finally:
413 self.sched.mutex.release(build.build_set.item, build.job)
414
James E. Blair01695c32017-01-04 17:29:25 -0800415 if not was_running:
416 try:
417 nodeset = build.build_set.getJobNodeSet(build.job.name)
James E. Blair1511bc32017-01-18 09:25:31 -0800418 self.sched.nodepool.returnNodeSet(nodeset)
James E. Blair01695c32017-01-04 17:29:25 -0800419 except Exception:
420 self.log.exception("Unable to return nodeset %s for "
421 "canceled build request %s" %
422 (nodeset, build))
James E. Blair83005782015-12-11 14:46:03 -0800423 build.result = 'CANCELED'
424 canceled = True
James E. Blaire18d4602017-01-05 11:17:28 -0800425 canceled_jobs.add(build.job.name)
426 for jobname, nodeset in old_build_set.nodesets.items()[:]:
427 if jobname in canceled_jobs:
428 continue
James E. Blair1511bc32017-01-18 09:25:31 -0800429 self.sched.nodepool.returnNodeSet(nodeset)
James E. Blair83005782015-12-11 14:46:03 -0800430 for item_behind in item.items_behind:
431 self.log.debug("Canceling jobs for change %s, behind change %s" %
432 (item_behind.change, item.change))
433 if self.cancelJobs(item_behind, prime=prime):
434 canceled = True
435 return canceled
436
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700437 def _makeMergerItem(self, item):
438 # Create a dictionary with all info about the item needed by
439 # the merger.
440 number = None
441 patchset = None
442 oldrev = None
443 newrev = None
444 if hasattr(item.change, 'number'):
445 number = item.change.number
446 patchset = item.change.patchset
447 elif hasattr(item.change, 'newrev'):
448 oldrev = item.change.oldrev
449 newrev = item.change.newrev
450 connection_name = self.pipeline.source.connection.connection_name
Adam Gandelman8bd57102016-12-02 12:58:42 -0800451
452 project = item.change.project.name
453 return dict(project=project,
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700454 url=self.pipeline.source.getGitUrl(
455 item.change.project),
456 connection_name=connection_name,
Adam Gandelman8bd57102016-12-02 12:58:42 -0800457 merge_mode=item.current_build_set.getMergeMode(project),
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700458 refspec=item.change.refspec,
459 branch=item.change.branch,
460 ref=item.current_build_set.ref,
461 number=number,
462 patchset=patchset,
463 oldrev=oldrev,
464 newrev=newrev,
465 )
466
467 def getLayout(self, item):
468 if not item.change.updatesConfig():
469 if item.item_ahead:
470 return item.item_ahead.current_build_set.layout
471 else:
472 return item.queue.pipeline.layout
473 # This item updates the config, ask the merger for the result.
474 build_set = item.current_build_set
475 if build_set.merge_state == build_set.PENDING:
476 return None
477 if build_set.merge_state == build_set.COMPLETE:
478 if build_set.unable_to_merge:
479 return None
480 # Load layout
Monty Taylor82dfd412016-07-29 12:01:28 -0700481 # Late import to break an import loop
482 import zuul.configloader
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700483 loader = zuul.configloader.ConfigLoader()
484 self.log.debug("Load dynamic layout with %s" % build_set.files)
485 layout = loader.createDynamicLayout(item.pipeline.layout.tenant,
486 build_set.files)
487 return layout
488 build_set.merge_state = build_set.PENDING
489 self.log.debug("Preparing dynamic layout for: %s" % item.change)
490 dependent_items = self.getDependentItems(item)
491 dependent_items.reverse()
492 all_items = dependent_items + [item]
493 merger_items = map(self._makeMergerItem, all_items)
494 self.sched.merger.mergeChanges(merger_items,
495 item.current_build_set,
496 ['.zuul.yaml'],
497 self.pipeline.precedence)
498
499 def prepareLayout(self, item):
500 # Get a copy of the layout in the context of the current
501 # queue.
502 # Returns True if the ref is ready, false otherwise
503 if not item.current_build_set.ref:
504 item.current_build_set.setConfiguration()
505 if not item.current_build_set.layout:
506 item.current_build_set.layout = self.getLayout(item)
507 if not item.current_build_set.layout:
508 return False
509 if not item.job_tree:
510 item.freezeJobTree()
511 return True
512
James E. Blair83005782015-12-11 14:46:03 -0800513 def _processOneItem(self, item, nnfi):
514 changed = False
515 item_ahead = item.item_ahead
516 if item_ahead and (not item_ahead.live):
517 item_ahead = None
518 change_queue = item.queue
519 failing_reasons = [] # Reasons this item is failing
520
521 if self.checkForChangesNeededBy(item.change, change_queue) is not True:
522 # It's not okay to enqueue this change, we should remove it.
523 self.log.info("Dequeuing change %s because "
524 "it can no longer merge" % item.change)
525 self.cancelJobs(item)
526 self.dequeueItem(item)
James E. Blairdbfd3282016-07-21 10:46:19 -0700527 item.setDequeuedNeedingChange()
James E. Blair83005782015-12-11 14:46:03 -0800528 if item.live:
529 try:
530 self.reportItem(item)
531 except exceptions.MergeFailure:
532 pass
533 return (True, nnfi)
534 dep_items = self.getFailingDependentItems(item)
535 actionable = change_queue.isActionable(item)
536 item.active = actionable
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700537 ready = False
James E. Blair83005782015-12-11 14:46:03 -0800538 if dep_items:
539 failing_reasons.append('a needed change is failing')
540 self.cancelJobs(item, prime=False)
541 else:
542 item_ahead_merged = False
543 if (item_ahead and item_ahead.change.is_merged):
544 item_ahead_merged = True
545 if (item_ahead != nnfi and not item_ahead_merged):
546 # Our current base is different than what we expected,
547 # and it's not because our current base merged. Something
548 # ahead must have failed.
549 self.log.info("Resetting builds for change %s because the "
550 "item ahead, %s, is not the nearest non-failing "
551 "item, %s" % (item.change, item_ahead, nnfi))
552 change_queue.moveItem(item, nnfi)
553 changed = True
554 self.cancelJobs(item)
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700555 if actionable:
556 ready = self.prepareLayout(item)
557 if item.current_build_set.unable_to_merge:
558 failing_reasons.append("it has a merge conflict")
559 if ready and self.provisionNodes(item):
560 changed = True
561 if actionable and ready and self.launchJobs(item):
562 changed = True
James E. Blairdbfd3282016-07-21 10:46:19 -0700563 if item.didAnyJobFail():
James E. Blair83005782015-12-11 14:46:03 -0800564 failing_reasons.append("at least one job failed")
565 if (not item.live) and (not item.items_behind):
566 failing_reasons.append("is a non-live item with no items behind")
567 self.dequeueItem(item)
568 changed = True
James E. Blairdbfd3282016-07-21 10:46:19 -0700569 if ((not item_ahead) and item.areAllJobsComplete() and item.live):
James E. Blair83005782015-12-11 14:46:03 -0800570 try:
571 self.reportItem(item)
572 except exceptions.MergeFailure:
573 failing_reasons.append("it did not merge")
574 for item_behind in item.items_behind:
575 self.log.info("Resetting builds for change %s because the "
576 "item ahead, %s, failed to merge" %
577 (item_behind.change, item))
578 self.cancelJobs(item_behind)
579 self.dequeueItem(item)
580 changed = True
581 elif not failing_reasons and item.live:
582 nnfi = item
583 item.current_build_set.failing_reasons = failing_reasons
584 if failing_reasons:
585 self.log.debug("%s is a failing item because %s" %
586 (item, failing_reasons))
587 return (changed, nnfi)
588
589 def processQueue(self):
590 # Do whatever needs to be done for each change in the queue
591 self.log.debug("Starting queue processor: %s" % self.pipeline.name)
592 changed = False
593 for queue in self.pipeline.queues:
594 queue_changed = False
595 nnfi = None # Nearest non-failing item
596 for item in queue.queue[:]:
597 item_changed, nnfi = self._processOneItem(
598 item, nnfi)
599 if item_changed:
600 queue_changed = True
601 self.reportStats(item)
602 if queue_changed:
603 changed = True
604 status = ''
605 for item in queue.queue:
606 status += item.formatStatus()
607 if status:
608 self.log.debug("Queue %s status is now:\n %s" %
609 (queue.name, status))
610 self.log.debug("Finished queue processor: %s (changed: %s)" %
611 (self.pipeline.name, changed))
612 return changed
613
James E. Blair83005782015-12-11 14:46:03 -0800614 def onBuildStarted(self, build):
615 self.log.debug("Build %s started" % build)
616 return True
617
618 def onBuildCompleted(self, build):
619 self.log.debug("Build %s completed" % build)
620 item = build.build_set.item
621
James E. Blairdbfd3282016-07-21 10:46:19 -0700622 item.setResult(build)
Joshua Hesketh89b67f62016-02-11 21:22:14 +1100623 self.sched.mutex.release(item, build.job)
James E. Blair83005782015-12-11 14:46:03 -0800624 self.log.debug("Item %s status is now:\n %s" %
625 (item, item.formatStatus()))
James E. Blair62295d32017-01-04 13:27:58 -0800626
James E. Blaire18d4602017-01-05 11:17:28 -0800627 if build.retry:
628 build.build_set.removeJobNodeSet(build.job.name)
629
James E. Blair83005782015-12-11 14:46:03 -0800630 return True
631
632 def onMergeCompleted(self, event):
633 build_set = event.build_set
634 item = build_set.item
635 build_set.merge_state = build_set.COMPLETE
636 build_set.zuul_url = event.zuul_url
637 if event.merged:
638 build_set.commit = event.commit
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700639 build_set.files.setFiles(event.files)
James E. Blair83005782015-12-11 14:46:03 -0800640 elif event.updated:
Joshua Hesketh89b67f62016-02-11 21:22:14 +1100641 if not isinstance(item.change, NullChange):
James E. Blair83005782015-12-11 14:46:03 -0800642 build_set.commit = item.change.newrev
Joshua Hesketh89b67f62016-02-11 21:22:14 +1100643 if not build_set.commit and not isinstance(item.change, NullChange):
James E. Blair83005782015-12-11 14:46:03 -0800644 self.log.info("Unable to merge change %s" % item.change)
James E. Blairdbfd3282016-07-21 10:46:19 -0700645 item.setUnableToMerge()
James E. Blair83005782015-12-11 14:46:03 -0800646
James E. Blair8d692392016-04-08 17:47:58 -0700647 def onNodesProvisioned(self, event):
James E. Blaira38c28e2017-01-04 10:33:20 -0800648 # TODOv3(jeblair): handle provisioning failure here
James E. Blair8d692392016-04-08 17:47:58 -0700649 request = event.request
650 build_set = request.build_set
651 build_set.jobNodeRequestComplete(request.job.name, request,
James E. Blair0eaad552016-09-02 12:09:54 -0700652 request.nodeset)
James E. Blair6ab79e02017-01-06 10:10:17 -0800653 if request.failed or not request.fulfilled:
654 self.log.info("Node request failure for %s" %
655 (request.job.name,))
656 build_set.item.setNodeRequestFailure(request.job)
James E. Blair34776ee2016-08-25 13:53:54 -0700657 self.log.info("Completed node request %s for job %s of item %s "
658 "with nodes %s" %
659 (request, request.job, build_set.item,
James E. Blair0eaad552016-09-02 12:09:54 -0700660 request.nodeset))
James E. Blair8d692392016-04-08 17:47:58 -0700661
James E. Blair83005782015-12-11 14:46:03 -0800662 def reportItem(self, item):
663 if not item.reported:
664 # _reportItem() returns True if it failed to report.
665 item.reported = not self._reportItem(item)
666 if self.changes_merge:
James E. Blairdbfd3282016-07-21 10:46:19 -0700667 succeeded = item.didAllJobsSucceed()
James E. Blair83005782015-12-11 14:46:03 -0800668 merged = item.reported
669 if merged:
670 merged = self.pipeline.source.isMerged(item.change,
671 item.change.branch)
672 self.log.info("Reported change %s status: all-succeeded: %s, "
673 "merged: %s" % (item.change, succeeded, merged))
674 change_queue = item.queue
675 if not (succeeded and merged):
676 self.log.debug("Reported change %s failed tests or failed "
677 "to merge" % (item.change))
678 change_queue.decreaseWindowSize()
679 self.log.debug("%s window size decreased to %s" %
680 (change_queue, change_queue.window))
681 raise exceptions.MergeFailure(
682 "Change %s failed to merge" % item.change)
683 else:
684 change_queue.increaseWindowSize()
685 self.log.debug("%s window size increased to %s" %
686 (change_queue, change_queue.window))
687
James E. Blaire511d2f2016-12-08 15:22:26 -0800688 zuul_driver = self.sched.connections.drivers['zuul']
689 tenant = self.pipeline.layout.tenant
690 zuul_driver.onChangeMerged(tenant, item.change,
691 self.pipeline.source)
James E. Blair83005782015-12-11 14:46:03 -0800692
693 def _reportItem(self, item):
694 self.log.debug("Reporting change %s" % item.change)
695 ret = True # Means error as returned by trigger.report
James E. Blair3b5ff3b2016-07-21 10:08:24 -0700696 if not item.getJobs():
James E. Blair83005782015-12-11 14:46:03 -0800697 # We don't send empty reports with +1,
698 # and the same for -1's (merge failures or transient errors)
699 # as they cannot be followed by +1's
700 self.log.debug("No jobs for change %s" % item.change)
701 actions = []
James E. Blairdbfd3282016-07-21 10:46:19 -0700702 elif item.didAllJobsSucceed():
James E. Blair83005782015-12-11 14:46:03 -0800703 self.log.debug("success %s" % (self.pipeline.success_actions))
704 actions = self.pipeline.success_actions
705 item.setReportedResult('SUCCESS')
706 self.pipeline._consecutive_failures = 0
James E. Blairdbfd3282016-07-21 10:46:19 -0700707 elif item.didMergerFail():
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700708 actions = self.pipeline.merge_failure_actions
709 item.setReportedResult('MERGER_FAILURE')
James E. Blair83005782015-12-11 14:46:03 -0800710 else:
711 actions = self.pipeline.failure_actions
712 item.setReportedResult('FAILURE')
713 self.pipeline._consecutive_failures += 1
714 if self.pipeline._disabled:
715 actions = self.pipeline.disabled_actions
716 # Check here if we should disable so that we only use the disabled
717 # reporters /after/ the last disable_at failure is still reported as
718 # normal.
719 if (self.pipeline.disable_at and not self.pipeline._disabled and
720 self.pipeline._consecutive_failures >= self.pipeline.disable_at):
721 self.pipeline._disabled = True
722 if actions:
723 try:
724 self.log.info("Reporting item %s, actions: %s" %
725 (item, actions))
726 ret = self.sendReport(actions, self.pipeline.source, item)
727 if ret:
728 self.log.error("Reporting item %s received: %s" %
729 (item, ret))
730 except:
731 self.log.exception("Exception while reporting:")
732 item.setReportedResult('ERROR')
James E. Blair83005782015-12-11 14:46:03 -0800733 return ret
734
735 def reportStats(self, item):
James E. Blair552b54f2016-07-22 13:55:32 -0700736 if not self.sched.statsd:
James E. Blair83005782015-12-11 14:46:03 -0800737 return
738 try:
739 # Update the gauge on enqueue and dequeue, but timers only
740 # when dequeing.
741 if item.dequeue_time:
742 dt = int((item.dequeue_time - item.enqueue_time) * 1000)
743 else:
744 dt = None
745 items = len(self.pipeline.getAllItems())
746
747 # stats.timers.zuul.pipeline.NAME.resident_time
748 # stats_counts.zuul.pipeline.NAME.total_changes
749 # stats.gauges.zuul.pipeline.NAME.current_changes
750 key = 'zuul.pipeline.%s' % self.pipeline.name
James E. Blair552b54f2016-07-22 13:55:32 -0700751 self.sched.statsd.gauge(key + '.current_changes', items)
James E. Blair83005782015-12-11 14:46:03 -0800752 if dt:
James E. Blair552b54f2016-07-22 13:55:32 -0700753 self.sched.statsd.timing(key + '.resident_time', dt)
754 self.sched.statsd.incr(key + '.total_changes')
James E. Blair83005782015-12-11 14:46:03 -0800755
756 # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
757 # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
758 project_name = item.change.project.name.replace('/', '.')
759 key += '.%s' % project_name
760 if dt:
James E. Blair552b54f2016-07-22 13:55:32 -0700761 self.sched.statsd.timing(key + '.resident_time', dt)
762 self.sched.statsd.incr(key + '.total_changes')
James E. Blair83005782015-12-11 14:46:03 -0800763 except:
764 self.log.exception("Exception reporting pipeline stats")