blob: bcda89fdc09cc7d51cc0c8745ed9a12c5566b396 [file] [log] [blame]
James E. Blair83005782015-12-11 14:46:03 -08001# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5# http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
12
James E. Blair83005782015-12-11 14:46:03 -080013import logging
14
15from zuul import exceptions
James E. Blair83005782015-12-11 14:46:03 -080016
James E. Blair83005782015-12-11 14:46:03 -080017
18class DynamicChangeQueueContextManager(object):
19 def __init__(self, change_queue):
20 self.change_queue = change_queue
21
22 def __enter__(self):
23 return self.change_queue
24
25 def __exit__(self, etype, value, tb):
26 if self.change_queue and not self.change_queue.queue:
27 self.change_queue.pipeline.removeQueue(self.change_queue)
28
29
30class StaticChangeQueueContextManager(object):
31 def __init__(self, change_queue):
32 self.change_queue = change_queue
33
34 def __enter__(self):
35 return self.change_queue
36
37 def __exit__(self, etype, value, tb):
38 pass
39
40
Monty Taylorc75478c2016-07-29 12:04:21 -070041class PipelineManager(object):
Monty Taylor82dfd412016-07-29 12:01:28 -070042 """Abstract Base Class for enqueing and processing Changes in a Pipeline"""
43
Monty Taylorc75478c2016-07-29 12:04:21 -070044 log = logging.getLogger("zuul.PipelineManager")
James E. Blair83005782015-12-11 14:46:03 -080045
46 def __init__(self, sched, pipeline):
47 self.sched = sched
48 self.pipeline = pipeline
49 self.event_filters = []
50 self.changeish_filters = []
51
52 def __str__(self):
53 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
54
55 def _postConfig(self, layout):
56 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blair83005782015-12-11 14:46:03 -080057 self.log.info(" Requirements:")
58 for f in self.changeish_filters:
59 self.log.info(" %s" % f)
60 self.log.info(" Events:")
61 for e in self.event_filters:
62 self.log.info(" %s" % e)
63 self.log.info(" Projects:")
64
Fredrik Medleyf8aec832015-09-28 13:40:20 +020065 def log_jobs(job_list):
66 for job_name, job_variants in job_list.jobs.items():
67 for variant in job_variants:
68 # TODOv3(jeblair): represent matchers
69 efilters = ''
70 # for b in tree.job._branches:
71 # efilters += str(b)
72 # for f in tree.job._files:
73 # efilters += str(f)
74 # if tree.job.skip_if_matcher:
75 # efilters += str(tree.job.skip_if_matcher)
76 # if efilters:
77 # efilters = ' ' + efilters
78 tags = []
79 if variant.hold_following_changes:
80 tags.append('[hold]')
81 if not variant.voting:
82 tags.append('[nonvoting]')
Tobias Henkel9a0e1942017-03-20 16:16:02 +010083 if variant.semaphore:
84 tags.append('[semaphore: %s]' % variant.semaphore)
Fredrik Medleyf8aec832015-09-28 13:40:20 +020085 tags = ' '.join(tags)
86 self.log.info(" %s%s %s" % (repr(variant),
87 efilters, tags))
James E. Blair83005782015-12-11 14:46:03 -080088
James E. Blairae76ac52017-02-02 10:03:30 -080089 for project_name in layout.project_configs.keys():
James E. Blairf59f3cf2017-02-19 14:50:26 -080090 project_config = layout.project_configs.get(project_name)
91 if project_config:
92 project_pipeline_config = project_config.pipelines.get(
93 self.pipeline.name)
94 if project_pipeline_config:
95 self.log.info(" %s" % project_name)
Fredrik Medleyf8aec832015-09-28 13:40:20 +020096 log_jobs(project_pipeline_config.job_list)
James E. Blair83005782015-12-11 14:46:03 -080097 self.log.info(" On start:")
98 self.log.info(" %s" % self.pipeline.start_actions)
99 self.log.info(" On success:")
100 self.log.info(" %s" % self.pipeline.success_actions)
101 self.log.info(" On failure:")
102 self.log.info(" %s" % self.pipeline.failure_actions)
103 self.log.info(" On merge-failure:")
104 self.log.info(" %s" % self.pipeline.merge_failure_actions)
105 self.log.info(" When disabled:")
106 self.log.info(" %s" % self.pipeline.disabled_actions)
107
108 def getSubmitAllowNeeds(self):
109 # Get a list of code review labels that are allowed to be
110 # "needed" in the submit records for a change, with respect
111 # to this queue. In other words, the list of review labels
112 # this queue itself is likely to set before submitting.
113 allow_needs = set()
114 for action_reporter in self.pipeline.success_actions:
115 allow_needs.update(action_reporter.getSubmitAllowNeeds())
116 return allow_needs
117
118 def eventMatches(self, event, change):
119 if event.forced_pipeline:
120 if event.forced_pipeline == self.pipeline.name:
121 self.log.debug("Event %s for change %s was directly assigned "
122 "to pipeline %s" % (event, change, self))
123 return True
124 else:
125 return False
126 for ef in self.event_filters:
127 if ef.matches(event, change):
128 self.log.debug("Event %s for change %s matched %s "
129 "in pipeline %s" % (event, change, ef, self))
130 return True
131 return False
132
133 def isChangeAlreadyInPipeline(self, change):
134 # Checks live items in the pipeline
135 for item in self.pipeline.getAllItems():
136 if item.live and change.equals(item.change):
137 return True
138 return False
139
140 def isChangeAlreadyInQueue(self, change, change_queue):
141 # Checks any item in the specified change queue
142 for item in change_queue.queue:
143 if change.equals(item.change):
144 return True
145 return False
146
147 def reportStart(self, item):
148 if not self.pipeline._disabled:
James E. Blair6053de42017-04-05 11:27:11 -0700149 source = item.change.project.source
James E. Blair83005782015-12-11 14:46:03 -0800150 try:
151 self.log.info("Reporting start, action %s item %s" %
152 (self.pipeline.start_actions, item))
153 ret = self.sendReport(self.pipeline.start_actions,
James E. Blair6053de42017-04-05 11:27:11 -0700154 source, item)
James E. Blair83005782015-12-11 14:46:03 -0800155 if ret:
156 self.log.error("Reporting item start %s received: %s" %
157 (item, ret))
158 except:
159 self.log.exception("Exception while reporting start:")
160
161 def sendReport(self, action_reporters, source, item,
162 message=None):
163 """Sends the built message off to configured reporters.
164
165 Takes the action_reporters, item, message and extra options and
166 sends them to the pluggable reporters.
167 """
168 report_errors = []
169 if len(action_reporters) > 0:
170 for reporter in action_reporters:
171 ret = reporter.report(source, self.pipeline, item)
172 if ret:
173 report_errors.append(ret)
174 if len(report_errors) == 0:
175 return
176 return report_errors
177
178 def isChangeReadyToBeEnqueued(self, change):
179 return True
180
181 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
Tobias Henkel6b9390f2017-03-28 11:23:21 +0200182 change_queue, history=None):
James E. Blair83005782015-12-11 14:46:03 -0800183 return True
184
185 def enqueueChangesBehind(self, change, quiet, ignore_requirements,
186 change_queue):
187 return True
188
189 def checkForChangesNeededBy(self, change, change_queue):
190 return True
191
192 def getFailingDependentItems(self, item):
193 return None
194
195 def getDependentItems(self, item):
196 orig_item = item
197 items = []
198 while item.item_ahead:
199 items.append(item.item_ahead)
200 item = item.item_ahead
201 self.log.info("Change %s depends on changes %s" %
202 (orig_item.change,
203 [x.change for x in items]))
204 return items
205
206 def getItemForChange(self, change):
207 for item in self.pipeline.getAllItems():
208 if item.change.equals(change):
209 return item
210 return None
211
212 def findOldVersionOfChangeAlreadyInQueue(self, change):
213 for item in self.pipeline.getAllItems():
214 if not item.live:
215 continue
216 if change.isUpdateOf(item.change):
217 return item
218 return None
219
220 def removeOldVersionsOfChange(self, change):
221 if not self.pipeline.dequeue_on_new_patchset:
222 return
223 old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
224 if old_item:
225 self.log.debug("Change %s is a new version of %s, removing %s" %
226 (change, old_item.change, old_item))
227 self.removeItem(old_item)
228
229 def removeAbandonedChange(self, change):
230 self.log.debug("Change %s abandoned, removing." % change)
231 for item in self.pipeline.getAllItems():
232 if not item.live:
233 continue
234 if item.change.equals(change):
235 self.removeItem(item)
236
237 def reEnqueueItem(self, item, last_head):
238 with self.getChangeQueue(item.change, last_head.queue) as change_queue:
239 if change_queue:
240 self.log.debug("Re-enqueing change %s in queue %s" %
241 (item.change, change_queue))
242 change_queue.enqueueItem(item)
243
James E. Blair3b5ff3b2016-07-21 10:08:24 -0700244 # Get an updated copy of the layout if necessary.
245 # This will return one of the following:
246 # 1) An existing layout from the item ahead or pipeline.
247 # 2) A newly created layout from the cached pipeline
248 # layout config plus the previously returned
249 # in-repo files stored in the buildset.
250 # 3) None in the case that a fetch of the files from
251 # the merger is still pending.
252 item.current_build_set.layout = self.getLayout(item)
253
254 # Rebuild the frozen job tree from the new layout, if
255 # we have one. If not, it will be built later.
256 if item.current_build_set.layout:
Fredrik Medleyf8aec832015-09-28 13:40:20 +0200257 item.freezeJobGraph()
James E. Blair3b5ff3b2016-07-21 10:08:24 -0700258
James E. Blair83005782015-12-11 14:46:03 -0800259 # Re-set build results in case any new jobs have been
260 # added to the tree.
261 for build in item.current_build_set.getBuilds():
262 if build.result:
James E. Blairdbfd3282016-07-21 10:46:19 -0700263 item.setResult(build)
James E. Blair83005782015-12-11 14:46:03 -0800264 # Similarly, reset the item state.
265 if item.current_build_set.unable_to_merge:
James E. Blairdbfd3282016-07-21 10:46:19 -0700266 item.setUnableToMerge()
James E. Blaire53250c2017-03-01 14:34:36 -0800267 if item.current_build_set.config_error:
268 item.setConfigError(item.current_build_set.config_error)
James E. Blair83005782015-12-11 14:46:03 -0800269 if item.dequeued_needing_change:
James E. Blairdbfd3282016-07-21 10:46:19 -0700270 item.setDequeuedNeedingChange()
James E. Blair83005782015-12-11 14:46:03 -0800271
272 self.reportStats(item)
273 return True
274 else:
275 self.log.error("Unable to find change queue for project %s" %
276 item.change.project)
277 return False
278
279 def addChange(self, change, quiet=False, enqueue_time=None,
280 ignore_requirements=False, live=True,
Tobias Henkel6b9390f2017-03-28 11:23:21 +0200281 change_queue=None, history=None):
James E. Blair83005782015-12-11 14:46:03 -0800282 self.log.debug("Considering adding change %s" % change)
283
284 # If we are adding a live change, check if it's a live item
285 # anywhere in the pipeline. Otherwise, we will perform the
286 # duplicate check below on the specific change_queue.
287 if live and self.isChangeAlreadyInPipeline(change):
288 self.log.debug("Change %s is already in pipeline, "
289 "ignoring" % change)
290 return True
291
292 if not self.isChangeReadyToBeEnqueued(change):
293 self.log.debug("Change %s is not ready to be enqueued, ignoring" %
294 change)
295 return False
296
297 if not ignore_requirements:
298 for f in self.changeish_filters:
299 if not f.matches(change):
300 self.log.debug("Change %s does not match pipeline "
301 "requirement %s" % (change, f))
302 return False
303
304 with self.getChangeQueue(change, change_queue) as change_queue:
305 if not change_queue:
306 self.log.debug("Unable to find change queue for "
307 "change %s in project %s" %
308 (change, change.project))
309 return False
310
311 if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
Tobias Henkel6b9390f2017-03-28 11:23:21 +0200312 change_queue, history=history):
James E. Blair83005782015-12-11 14:46:03 -0800313 self.log.debug("Failed to enqueue changes "
314 "ahead of %s" % change)
315 return False
316
317 if self.isChangeAlreadyInQueue(change, change_queue):
318 self.log.debug("Change %s is already in queue, "
319 "ignoring" % change)
320 return True
321
James E. Blair487000f2017-02-14 14:54:32 -0800322 self.log.info("Adding change %s to queue %s in %s" %
323 (change, change_queue, self.pipeline))
James E. Blair83005782015-12-11 14:46:03 -0800324 item = change_queue.enqueueChange(change)
325 if enqueue_time:
326 item.enqueue_time = enqueue_time
327 item.live = live
328 self.reportStats(item)
329 if not quiet:
330 if len(self.pipeline.start_actions) > 0:
331 self.reportStart(item)
332 self.enqueueChangesBehind(change, quiet, ignore_requirements,
333 change_queue)
James E. Blaire511d2f2016-12-08 15:22:26 -0800334 zuul_driver = self.sched.connections.drivers['zuul']
335 tenant = self.pipeline.layout.tenant
336 zuul_driver.onChangeEnqueued(tenant, item.change, self.pipeline)
James E. Blair83005782015-12-11 14:46:03 -0800337 return True
338
339 def dequeueItem(self, item):
340 self.log.debug("Removing change %s from queue" % item.change)
341 item.queue.dequeueItem(item)
342
343 def removeItem(self, item):
344 # Remove an item from the queue, probably because it has been
345 # superseded by another change.
346 self.log.debug("Canceling builds behind change: %s "
347 "because it is being removed." % item.change)
348 self.cancelJobs(item)
349 self.dequeueItem(item)
350 self.reportStats(item)
351
James E. Blair8d692392016-04-08 17:47:58 -0700352 def provisionNodes(self, item):
James E. Blairdbfd3282016-07-21 10:46:19 -0700353 jobs = item.findJobsToRequest()
James E. Blair8d692392016-04-08 17:47:58 -0700354 if not jobs:
355 return False
356 build_set = item.current_build_set
357 self.log.debug("Requesting nodes for change %s" % item.change)
358 for job in jobs:
359 req = self.sched.nodepool.requestNodes(build_set, job)
360 self.log.debug("Adding node request %s for job %s to item %s" %
361 (req, job, item))
362 build_set.setJobNodeRequest(job.name, req)
363 return True
364
Paul Belanger174a8272017-03-14 13:20:10 -0400365 def _executeJobs(self, item, jobs):
366 self.log.debug("Executing jobs for change %s" % item.change)
James E. Blair83005782015-12-11 14:46:03 -0800367 dependent_items = self.getDependentItems(item)
368 for job in jobs:
369 self.log.debug("Found job %s for change %s" % (job, item.change))
370 try:
James E. Blaircacdf2b2017-01-04 13:14:37 -0800371 nodeset = item.current_build_set.getJobNodeSet(job.name)
James E. Blair1511bc32017-01-18 09:25:31 -0800372 self.sched.nodepool.useNodeSet(nodeset)
Paul Belanger174a8272017-03-14 13:20:10 -0400373 build = self.sched.executor.execute(job, item,
374 self.pipeline,
375 dependent_items)
James E. Blair83005782015-12-11 14:46:03 -0800376 self.log.debug("Adding build %s of job %s to item %s" %
377 (build, job, item))
378 item.addBuild(build)
379 except:
Paul Belanger174a8272017-03-14 13:20:10 -0400380 self.log.exception("Exception while executing job %s "
James E. Blair83005782015-12-11 14:46:03 -0800381 "for change %s:" % (job, item.change))
382
Paul Belanger174a8272017-03-14 13:20:10 -0400383 def executeJobs(self, item):
James E. Blair83005782015-12-11 14:46:03 -0800384 # TODO(jeblair): This should return a value indicating a job
Paul Belanger174a8272017-03-14 13:20:10 -0400385 # was executed. Appears to be a longstanding bug.
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700386 if not item.current_build_set.layout:
387 return False
388
Tobias Henkel9a0e1942017-03-20 16:16:02 +0100389 jobs = item.findJobsToRun(
390 item.pipeline.layout.tenant.semaphore_handler)
James E. Blair83005782015-12-11 14:46:03 -0800391 if jobs:
Paul Belanger174a8272017-03-14 13:20:10 -0400392 self._executeJobs(item, jobs)
James E. Blair83005782015-12-11 14:46:03 -0800393
394 def cancelJobs(self, item, prime=True):
395 self.log.debug("Cancel jobs for change %s" % item.change)
396 canceled = False
397 old_build_set = item.current_build_set
398 if prime and item.current_build_set.ref:
399 item.resetAllBuilds()
James E. Blair8d692392016-04-08 17:47:58 -0700400 for req in old_build_set.node_requests.values():
401 self.sched.nodepool.cancelRequest(req)
402 old_build_set.node_requests = {}
James E. Blaire18d4602017-01-05 11:17:28 -0800403 canceled_jobs = set()
James E. Blair83005782015-12-11 14:46:03 -0800404 for build in old_build_set.getBuilds():
James E. Blair98095cf2017-02-02 14:13:03 -0800405 if build.result:
406 canceled_jobs.add(build.job.name)
407 continue
James E. Blair01695c32017-01-04 17:29:25 -0800408 was_running = False
James E. Blair83005782015-12-11 14:46:03 -0800409 try:
Paul Belanger174a8272017-03-14 13:20:10 -0400410 was_running = self.sched.executor.cancel(build)
James E. Blair83005782015-12-11 14:46:03 -0800411 except:
412 self.log.exception("Exception while canceling build %s "
413 "for change %s" % (build, item.change))
Tobias Henkelfb91a492017-02-15 07:29:43 +0100414 finally:
Tobias Henkel9a0e1942017-03-20 16:16:02 +0100415 old_build_set.layout.tenant.semaphore_handler.release(
416 old_build_set.item, build.job)
Tobias Henkelfb91a492017-02-15 07:29:43 +0100417
James E. Blair01695c32017-01-04 17:29:25 -0800418 if not was_running:
419 try:
420 nodeset = build.build_set.getJobNodeSet(build.job.name)
James E. Blair1511bc32017-01-18 09:25:31 -0800421 self.sched.nodepool.returnNodeSet(nodeset)
James E. Blair01695c32017-01-04 17:29:25 -0800422 except Exception:
423 self.log.exception("Unable to return nodeset %s for "
424 "canceled build request %s" %
425 (nodeset, build))
James E. Blair83005782015-12-11 14:46:03 -0800426 build.result = 'CANCELED'
427 canceled = True
James E. Blaire18d4602017-01-05 11:17:28 -0800428 canceled_jobs.add(build.job.name)
429 for jobname, nodeset in old_build_set.nodesets.items()[:]:
430 if jobname in canceled_jobs:
431 continue
James E. Blair1511bc32017-01-18 09:25:31 -0800432 self.sched.nodepool.returnNodeSet(nodeset)
James E. Blair83005782015-12-11 14:46:03 -0800433 for item_behind in item.items_behind:
434 self.log.debug("Canceling jobs for change %s, behind change %s" %
435 (item_behind.change, item.change))
436 if self.cancelJobs(item_behind, prime=prime):
437 canceled = True
438 return canceled
439
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700440 def _makeMergerItem(self, item):
441 # Create a dictionary with all info about the item needed by
442 # the merger.
443 number = None
444 patchset = None
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700445 refspec = None
446 branch = None
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700447 oldrev = None
448 newrev = None
449 if hasattr(item.change, 'number'):
450 number = item.change.number
451 patchset = item.change.patchset
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700452 refspec = item.change.refspec
453 branch = item.change.branch
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700454 elif hasattr(item.change, 'newrev'):
455 oldrev = item.change.oldrev
456 newrev = item.change.newrev
James E. Blair6053de42017-04-05 11:27:11 -0700457 source = item.change.project.source
458 connection_name = source.connection.connection_name
Adam Gandelman8bd57102016-12-02 12:58:42 -0800459
460 project = item.change.project.name
461 return dict(project=project,
James E. Blair6053de42017-04-05 11:27:11 -0700462 url=source.getGitUrl(item.change.project),
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700463 connection_name=connection_name,
James E. Blair0ffa0102017-03-30 13:11:33 -0700464 merge_mode=item.current_build_set.getMergeMode(),
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700465 refspec=refspec,
466 branch=branch,
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700467 ref=item.current_build_set.ref,
468 number=number,
469 patchset=patchset,
470 oldrev=oldrev,
471 newrev=newrev,
472 )
473
James E. Blair149b69c2017-03-02 10:48:16 -0800474 def _loadDynamicLayout(self, item):
475 # Load layout
476 # Late import to break an import loop
477 import zuul.configloader
478 loader = zuul.configloader.ConfigLoader()
479
480 build_set = item.current_build_set
481 self.log.debug("Load dynamic layout with %s" % build_set.files)
482 try:
Monty Taylor4eeb9b12017-03-06 12:43:16 -0600483 # First parse the config as it will land with the
James E. Blair149b69c2017-03-02 10:48:16 -0800484 # full set of config and project repos. This lets us
485 # catch syntax errors in config repos even though we won't
486 # actually run with that config.
487 loader.createDynamicLayout(
488 item.pipeline.layout.tenant,
489 build_set.files,
James E. Blair109da3f2017-04-04 14:39:43 -0700490 include_config_projects=True)
James E. Blair149b69c2017-03-02 10:48:16 -0800491
492 # Then create the config a second time but without changes
493 # to config repos so that we actually use this config.
494 layout = loader.createDynamicLayout(
495 item.pipeline.layout.tenant,
496 build_set.files,
James E. Blair109da3f2017-04-04 14:39:43 -0700497 include_config_projects=False)
James E. Blair149b69c2017-03-02 10:48:16 -0800498 except zuul.configloader.ConfigurationSyntaxError as e:
499 self.log.info("Configuration syntax error "
500 "in dynamic layout %s" %
501 build_set.files)
502 item.setConfigError(str(e))
503 return None
504 except Exception:
505 self.log.exception("Error in dynamic layout %s" %
506 build_set.files)
507 item.setConfigError("Unknown configuration error")
508 return None
509 return layout
510
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700511 def getLayout(self, item):
512 if not item.change.updatesConfig():
513 if item.item_ahead:
514 return item.item_ahead.current_build_set.layout
515 else:
516 return item.queue.pipeline.layout
517 # This item updates the config, ask the merger for the result.
518 build_set = item.current_build_set
519 if build_set.merge_state == build_set.PENDING:
520 return None
521 if build_set.merge_state == build_set.COMPLETE:
522 if build_set.unable_to_merge:
523 return None
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700524 self.log.debug("Preparing dynamic layout for: %s" % item.change)
James E. Blair149b69c2017-03-02 10:48:16 -0800525 return self._loadDynamicLayout(item)
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700526
527 def scheduleMerge(self, item, files=None):
528 build_set = item.current_build_set
529
530 if not hasattr(item.change, 'branch'):
531 self.log.debug("Change %s does not have an associated branch, "
532 "not scheduling a merge job for item %s" %
533 (item.change, item))
534 build_set.merge_state = build_set.COMPLETE
535 return True
536
537 self.log.debug("Scheduling merge for item %s (files: %s)" %
538 (item, files))
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700539 dependent_items = self.getDependentItems(item)
540 dependent_items.reverse()
541 all_items = dependent_items + [item]
542 merger_items = map(self._makeMergerItem, all_items)
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700543 build_set = item.current_build_set
544 build_set.merge_state = build_set.PENDING
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700545 self.sched.merger.mergeChanges(merger_items,
546 item.current_build_set,
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700547 files,
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700548 self.pipeline.precedence)
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700549 return False
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700550
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700551 def prepareItem(self, item):
552 # This runs on every iteration of _processOneItem
553 # Returns True if the item is ready, false otherwise
554 build_set = item.current_build_set
555 if not build_set.ref:
556 build_set.setConfiguration()
557 if build_set.merge_state == build_set.NEW:
558 return self.scheduleMerge(item, ['zuul.yaml', '.zuul.yaml'])
559 if build_set.config_error:
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700560 return False
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700561 return True
562
563 def prepareJobs(self, item):
564 # This only runs once the item is in the pipeline's action window
565 # Returns True if the item is ready, false otherwise
566 build_set = item.current_build_set
567 if not build_set.layout:
568 build_set.layout = self.getLayout(item)
569 if not build_set.layout:
Fredrik Medleyf8aec832015-09-28 13:40:20 +0200570 return False
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700571
Fredrik Medleyf8aec832015-09-28 13:40:20 +0200572 if not item.job_graph:
573 try:
574 item.freezeJobGraph()
575 except Exception as e:
576 # TODOv3(jeblair): nicify this exception as it will be reported
577 self.log.exception("Error freezing job graph for %s" %
578 item)
579 item.setConfigError("Unable to freeze job graph: %s" %
580 (str(e)))
581 return False
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700582 return True
583
James E. Blair83005782015-12-11 14:46:03 -0800584 def _processOneItem(self, item, nnfi):
585 changed = False
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700586 ready = False
587 failing_reasons = [] # Reasons this item is failing
588
James E. Blair83005782015-12-11 14:46:03 -0800589 item_ahead = item.item_ahead
590 if item_ahead and (not item_ahead.live):
591 item_ahead = None
592 change_queue = item.queue
James E. Blair83005782015-12-11 14:46:03 -0800593
594 if self.checkForChangesNeededBy(item.change, change_queue) is not True:
595 # It's not okay to enqueue this change, we should remove it.
596 self.log.info("Dequeuing change %s because "
597 "it can no longer merge" % item.change)
598 self.cancelJobs(item)
599 self.dequeueItem(item)
James E. Blairdbfd3282016-07-21 10:46:19 -0700600 item.setDequeuedNeedingChange()
James E. Blair83005782015-12-11 14:46:03 -0800601 if item.live:
602 try:
603 self.reportItem(item)
604 except exceptions.MergeFailure:
605 pass
606 return (True, nnfi)
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700607
James E. Blair83005782015-12-11 14:46:03 -0800608 actionable = change_queue.isActionable(item)
609 item.active = actionable
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700610
611 dep_items = self.getFailingDependentItems(item)
James E. Blair83005782015-12-11 14:46:03 -0800612 if dep_items:
613 failing_reasons.append('a needed change is failing')
614 self.cancelJobs(item, prime=False)
615 else:
616 item_ahead_merged = False
617 if (item_ahead and item_ahead.change.is_merged):
618 item_ahead_merged = True
619 if (item_ahead != nnfi and not item_ahead_merged):
620 # Our current base is different than what we expected,
621 # and it's not because our current base merged. Something
622 # ahead must have failed.
623 self.log.info("Resetting builds for change %s because the "
624 "item ahead, %s, is not the nearest non-failing "
625 "item, %s" % (item.change, item_ahead, nnfi))
626 change_queue.moveItem(item, nnfi)
627 changed = True
628 self.cancelJobs(item)
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700629 if actionable:
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700630 ready = self.prepareItem(item) and self.prepareJobs(item)
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700631 if item.current_build_set.unable_to_merge:
632 failing_reasons.append("it has a merge conflict")
James E. Blaire53250c2017-03-01 14:34:36 -0800633 if item.current_build_set.config_error:
634 failing_reasons.append("it has an invalid configuration")
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700635 if ready and self.provisionNodes(item):
636 changed = True
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700637 if ready and self.executeJobs(item):
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700638 changed = True
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700639
James E. Blairdbfd3282016-07-21 10:46:19 -0700640 if item.didAnyJobFail():
James E. Blair83005782015-12-11 14:46:03 -0800641 failing_reasons.append("at least one job failed")
642 if (not item.live) and (not item.items_behind):
643 failing_reasons.append("is a non-live item with no items behind")
644 self.dequeueItem(item)
645 changed = True
James E. Blairdbfd3282016-07-21 10:46:19 -0700646 if ((not item_ahead) and item.areAllJobsComplete() and item.live):
James E. Blair83005782015-12-11 14:46:03 -0800647 try:
648 self.reportItem(item)
649 except exceptions.MergeFailure:
650 failing_reasons.append("it did not merge")
651 for item_behind in item.items_behind:
652 self.log.info("Resetting builds for change %s because the "
653 "item ahead, %s, failed to merge" %
654 (item_behind.change, item))
655 self.cancelJobs(item_behind)
656 self.dequeueItem(item)
657 changed = True
658 elif not failing_reasons and item.live:
659 nnfi = item
660 item.current_build_set.failing_reasons = failing_reasons
661 if failing_reasons:
662 self.log.debug("%s is a failing item because %s" %
663 (item, failing_reasons))
664 return (changed, nnfi)
665
666 def processQueue(self):
667 # Do whatever needs to be done for each change in the queue
668 self.log.debug("Starting queue processor: %s" % self.pipeline.name)
669 changed = False
670 for queue in self.pipeline.queues:
671 queue_changed = False
672 nnfi = None # Nearest non-failing item
673 for item in queue.queue[:]:
674 item_changed, nnfi = self._processOneItem(
675 item, nnfi)
676 if item_changed:
677 queue_changed = True
678 self.reportStats(item)
679 if queue_changed:
680 changed = True
681 status = ''
682 for item in queue.queue:
683 status += item.formatStatus()
684 if status:
685 self.log.debug("Queue %s status is now:\n %s" %
686 (queue.name, status))
687 self.log.debug("Finished queue processor: %s (changed: %s)" %
688 (self.pipeline.name, changed))
689 return changed
690
James E. Blair83005782015-12-11 14:46:03 -0800691 def onBuildStarted(self, build):
692 self.log.debug("Build %s started" % build)
693 return True
694
695 def onBuildCompleted(self, build):
696 self.log.debug("Build %s completed" % build)
697 item = build.build_set.item
698
James E. Blairdbfd3282016-07-21 10:46:19 -0700699 item.setResult(build)
Tobias Henkel9a0e1942017-03-20 16:16:02 +0100700 item.pipeline.layout.tenant.semaphore_handler.release(item, build.job)
James E. Blair83005782015-12-11 14:46:03 -0800701 self.log.debug("Item %s status is now:\n %s" %
702 (item, item.formatStatus()))
James E. Blair62295d32017-01-04 13:27:58 -0800703
James E. Blaire18d4602017-01-05 11:17:28 -0800704 if build.retry:
705 build.build_set.removeJobNodeSet(build.job.name)
706
James E. Blair83005782015-12-11 14:46:03 -0800707 return True
708
709 def onMergeCompleted(self, event):
710 build_set = event.build_set
711 item = build_set.item
712 build_set.merge_state = build_set.COMPLETE
713 build_set.zuul_url = event.zuul_url
714 if event.merged:
715 build_set.commit = event.commit
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700716 build_set.files.setFiles(event.files)
James E. Blair83005782015-12-11 14:46:03 -0800717 elif event.updated:
Clint Byrum6251f722017-03-25 06:21:05 -0700718 build_set.commit = item.change.newrev
719 if not build_set.commit:
James E. Blair83005782015-12-11 14:46:03 -0800720 self.log.info("Unable to merge change %s" % item.change)
James E. Blairdbfd3282016-07-21 10:46:19 -0700721 item.setUnableToMerge()
James E. Blair83005782015-12-11 14:46:03 -0800722
James E. Blair8d692392016-04-08 17:47:58 -0700723 def onNodesProvisioned(self, event):
James E. Blaira38c28e2017-01-04 10:33:20 -0800724 # TODOv3(jeblair): handle provisioning failure here
James E. Blair8d692392016-04-08 17:47:58 -0700725 request = event.request
726 build_set = request.build_set
727 build_set.jobNodeRequestComplete(request.job.name, request,
James E. Blair0eaad552016-09-02 12:09:54 -0700728 request.nodeset)
James E. Blair6ab79e02017-01-06 10:10:17 -0800729 if request.failed or not request.fulfilled:
730 self.log.info("Node request failure for %s" %
731 (request.job.name,))
732 build_set.item.setNodeRequestFailure(request.job)
James E. Blair34776ee2016-08-25 13:53:54 -0700733 self.log.info("Completed node request %s for job %s of item %s "
734 "with nodes %s" %
735 (request, request.job, build_set.item,
James E. Blair0eaad552016-09-02 12:09:54 -0700736 request.nodeset))
James E. Blair8d692392016-04-08 17:47:58 -0700737
James E. Blair83005782015-12-11 14:46:03 -0800738 def reportItem(self, item):
739 if not item.reported:
740 # _reportItem() returns True if it failed to report.
741 item.reported = not self._reportItem(item)
742 if self.changes_merge:
James E. Blairdbfd3282016-07-21 10:46:19 -0700743 succeeded = item.didAllJobsSucceed()
James E. Blair83005782015-12-11 14:46:03 -0800744 merged = item.reported
James E. Blair6053de42017-04-05 11:27:11 -0700745 source = item.change.project.source
James E. Blair83005782015-12-11 14:46:03 -0800746 if merged:
James E. Blair6053de42017-04-05 11:27:11 -0700747 merged = source.isMerged(item.change, item.change.branch)
James E. Blair83005782015-12-11 14:46:03 -0800748 self.log.info("Reported change %s status: all-succeeded: %s, "
749 "merged: %s" % (item.change, succeeded, merged))
750 change_queue = item.queue
751 if not (succeeded and merged):
752 self.log.debug("Reported change %s failed tests or failed "
753 "to merge" % (item.change))
754 change_queue.decreaseWindowSize()
755 self.log.debug("%s window size decreased to %s" %
756 (change_queue, change_queue.window))
757 raise exceptions.MergeFailure(
758 "Change %s failed to merge" % item.change)
759 else:
760 change_queue.increaseWindowSize()
761 self.log.debug("%s window size increased to %s" %
762 (change_queue, change_queue.window))
763
James E. Blaire511d2f2016-12-08 15:22:26 -0800764 zuul_driver = self.sched.connections.drivers['zuul']
765 tenant = self.pipeline.layout.tenant
James E. Blair6053de42017-04-05 11:27:11 -0700766 zuul_driver.onChangeMerged(tenant, item.change, source)
James E. Blair83005782015-12-11 14:46:03 -0800767
768 def _reportItem(self, item):
769 self.log.debug("Reporting change %s" % item.change)
James E. Blair6053de42017-04-05 11:27:11 -0700770 source = item.change.project.source
James E. Blair83005782015-12-11 14:46:03 -0800771 ret = True # Means error as returned by trigger.report
James E. Blaire53250c2017-03-01 14:34:36 -0800772 if item.getConfigError():
773 self.log.debug("Invalid config for change %s" % item.change)
774 # TODOv3(jeblair): consider a new reporter action for this
775 actions = self.pipeline.merge_failure_actions
776 item.setReportedResult('CONFIG_ERROR')
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700777 elif item.didMergerFail():
778 actions = self.pipeline.merge_failure_actions
779 item.setReportedResult('MERGER_FAILURE')
James E. Blaire53250c2017-03-01 14:34:36 -0800780 elif not item.getJobs():
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700781 # We don't send empty reports with +1
James E. Blair83005782015-12-11 14:46:03 -0800782 self.log.debug("No jobs for change %s" % item.change)
783 actions = []
James E. Blairdbfd3282016-07-21 10:46:19 -0700784 elif item.didAllJobsSucceed():
James E. Blair83005782015-12-11 14:46:03 -0800785 self.log.debug("success %s" % (self.pipeline.success_actions))
786 actions = self.pipeline.success_actions
787 item.setReportedResult('SUCCESS')
788 self.pipeline._consecutive_failures = 0
James E. Blair83005782015-12-11 14:46:03 -0800789 else:
790 actions = self.pipeline.failure_actions
791 item.setReportedResult('FAILURE')
792 self.pipeline._consecutive_failures += 1
793 if self.pipeline._disabled:
794 actions = self.pipeline.disabled_actions
795 # Check here if we should disable so that we only use the disabled
796 # reporters /after/ the last disable_at failure is still reported as
797 # normal.
798 if (self.pipeline.disable_at and not self.pipeline._disabled and
799 self.pipeline._consecutive_failures >= self.pipeline.disable_at):
800 self.pipeline._disabled = True
801 if actions:
802 try:
803 self.log.info("Reporting item %s, actions: %s" %
804 (item, actions))
James E. Blair6053de42017-04-05 11:27:11 -0700805 ret = self.sendReport(actions, source, item)
James E. Blair83005782015-12-11 14:46:03 -0800806 if ret:
807 self.log.error("Reporting item %s received: %s" %
808 (item, ret))
809 except:
810 self.log.exception("Exception while reporting:")
811 item.setReportedResult('ERROR')
James E. Blair83005782015-12-11 14:46:03 -0800812 return ret
813
814 def reportStats(self, item):
James E. Blair552b54f2016-07-22 13:55:32 -0700815 if not self.sched.statsd:
James E. Blair83005782015-12-11 14:46:03 -0800816 return
817 try:
818 # Update the gauge on enqueue and dequeue, but timers only
819 # when dequeing.
820 if item.dequeue_time:
821 dt = int((item.dequeue_time - item.enqueue_time) * 1000)
822 else:
823 dt = None
824 items = len(self.pipeline.getAllItems())
825
826 # stats.timers.zuul.pipeline.NAME.resident_time
827 # stats_counts.zuul.pipeline.NAME.total_changes
828 # stats.gauges.zuul.pipeline.NAME.current_changes
829 key = 'zuul.pipeline.%s' % self.pipeline.name
James E. Blair552b54f2016-07-22 13:55:32 -0700830 self.sched.statsd.gauge(key + '.current_changes', items)
James E. Blair83005782015-12-11 14:46:03 -0800831 if dt:
James E. Blair552b54f2016-07-22 13:55:32 -0700832 self.sched.statsd.timing(key + '.resident_time', dt)
833 self.sched.statsd.incr(key + '.total_changes')
James E. Blair83005782015-12-11 14:46:03 -0800834
835 # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
836 # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
837 project_name = item.change.project.name.replace('/', '.')
838 key += '.%s' % project_name
839 if dt:
James E. Blair552b54f2016-07-22 13:55:32 -0700840 self.sched.statsd.timing(key + '.resident_time', dt)
841 self.sched.statsd.incr(key + '.total_changes')
James E. Blair83005782015-12-11 14:46:03 -0800842 except:
843 self.log.exception("Exception reporting pipeline stats")