blob: 3c0c11fcdff59083becfa665ad21e018320b3ea9 [file] [log] [blame]
James E. Blair83005782015-12-11 14:46:03 -08001# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5# http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
12
James E. Blair83005782015-12-11 14:46:03 -080013import logging
14
15from zuul import exceptions
James E. Blair289f5932017-07-27 15:02:29 -070016from zuul import model
James E. Blair83005782015-12-11 14:46:03 -080017
James E. Blair83005782015-12-11 14:46:03 -080018
class DynamicChangeQueueContextManager(object):
    """Context manager that yields a change queue and prunes it when empty.

    Entering the context returns the wrapped change queue unchanged.  On
    exit, if a change queue was supplied and its ``queue`` attribute is
    empty, the change queue is removed from its pipeline through
    ``pipeline.removeQueue``.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        managed = self.change_queue
        if not managed:
            # Nothing was handed to us, so there is nothing to clean up.
            return
        if managed.queue:
            # Still holds items; leave it registered with the pipeline.
            return
        managed.pipeline.removeQueue(managed)
29
30
class StaticChangeQueueContextManager(object):
    """Context manager that yields a change queue and never cleans it up.

    Entering the context returns the wrapped change queue; leaving the
    context does nothing.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        # Intentionally a no-op: the queue is left in place on exit.
        return None
40
41
Monty Taylorc75478c2016-07-29 12:04:21 -070042class PipelineManager(object):
Monty Taylor82dfd412016-07-29 12:01:28 -070043 """Abstract Base Class for enqueing and processing Changes in a Pipeline"""
44
Monty Taylorc75478c2016-07-29 12:04:21 -070045 log = logging.getLogger("zuul.PipelineManager")
James E. Blair83005782015-12-11 14:46:03 -080046
47 def __init__(self, sched, pipeline):
48 self.sched = sched
49 self.pipeline = pipeline
50 self.event_filters = []
Monty Taylor55d0f562017-05-17 14:30:21 -050051 self.ref_filters = []
James E. Blair83005782015-12-11 14:46:03 -080052
53 def __str__(self):
54 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
55
56 def _postConfig(self, layout):
57 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blair83005782015-12-11 14:46:03 -080058 self.log.info(" Requirements:")
Monty Taylor55d0f562017-05-17 14:30:21 -050059 for f in self.ref_filters:
James E. Blair83005782015-12-11 14:46:03 -080060 self.log.info(" %s" % f)
61 self.log.info(" Events:")
62 for e in self.event_filters:
63 self.log.info(" %s" % e)
64 self.log.info(" Projects:")
65
Fredrik Medleyf8aec832015-09-28 13:40:20 +020066 def log_jobs(job_list):
67 for job_name, job_variants in job_list.jobs.items():
68 for variant in job_variants:
69 # TODOv3(jeblair): represent matchers
70 efilters = ''
71 # for b in tree.job._branches:
72 # efilters += str(b)
73 # for f in tree.job._files:
74 # efilters += str(f)
75 # if tree.job.skip_if_matcher:
76 # efilters += str(tree.job.skip_if_matcher)
77 # if efilters:
78 # efilters = ' ' + efilters
79 tags = []
80 if variant.hold_following_changes:
81 tags.append('[hold]')
82 if not variant.voting:
83 tags.append('[nonvoting]')
Tobias Henkel9a0e1942017-03-20 16:16:02 +010084 if variant.semaphore:
85 tags.append('[semaphore: %s]' % variant.semaphore)
Fredrik Medleyf8aec832015-09-28 13:40:20 +020086 tags = ' '.join(tags)
87 self.log.info(" %s%s %s" % (repr(variant),
88 efilters, tags))
James E. Blair83005782015-12-11 14:46:03 -080089
James E. Blairae76ac52017-02-02 10:03:30 -080090 for project_name in layout.project_configs.keys():
James E. Blairf59f3cf2017-02-19 14:50:26 -080091 project_config = layout.project_configs.get(project_name)
92 if project_config:
93 project_pipeline_config = project_config.pipelines.get(
94 self.pipeline.name)
95 if project_pipeline_config:
96 self.log.info(" %s" % project_name)
Fredrik Medleyf8aec832015-09-28 13:40:20 +020097 log_jobs(project_pipeline_config.job_list)
James E. Blair83005782015-12-11 14:46:03 -080098 self.log.info(" On start:")
99 self.log.info(" %s" % self.pipeline.start_actions)
100 self.log.info(" On success:")
101 self.log.info(" %s" % self.pipeline.success_actions)
102 self.log.info(" On failure:")
103 self.log.info(" %s" % self.pipeline.failure_actions)
104 self.log.info(" On merge-failure:")
105 self.log.info(" %s" % self.pipeline.merge_failure_actions)
106 self.log.info(" When disabled:")
107 self.log.info(" %s" % self.pipeline.disabled_actions)
108
109 def getSubmitAllowNeeds(self):
110 # Get a list of code review labels that are allowed to be
111 # "needed" in the submit records for a change, with respect
112 # to this queue. In other words, the list of review labels
113 # this queue itself is likely to set before submitting.
114 allow_needs = set()
115 for action_reporter in self.pipeline.success_actions:
116 allow_needs.update(action_reporter.getSubmitAllowNeeds())
117 return allow_needs
118
119 def eventMatches(self, event, change):
120 if event.forced_pipeline:
121 if event.forced_pipeline == self.pipeline.name:
122 self.log.debug("Event %s for change %s was directly assigned "
123 "to pipeline %s" % (event, change, self))
124 return True
125 else:
126 return False
127 for ef in self.event_filters:
128 if ef.matches(event, change):
129 self.log.debug("Event %s for change %s matched %s "
130 "in pipeline %s" % (event, change, ef, self))
131 return True
132 return False
133
134 def isChangeAlreadyInPipeline(self, change):
135 # Checks live items in the pipeline
136 for item in self.pipeline.getAllItems():
137 if item.live and change.equals(item.change):
138 return True
139 return False
140
141 def isChangeAlreadyInQueue(self, change, change_queue):
142 # Checks any item in the specified change queue
143 for item in change_queue.queue:
144 if change.equals(item.change):
145 return True
146 return False
147
148 def reportStart(self, item):
149 if not self.pipeline._disabled:
150 try:
151 self.log.info("Reporting start, action %s item %s" %
152 (self.pipeline.start_actions, item))
Jamie Lennox168bc8f2017-05-04 14:16:33 +1000153 ret = self.sendReport(self.pipeline.start_actions, item)
James E. Blair83005782015-12-11 14:46:03 -0800154 if ret:
155 self.log.error("Reporting item start %s received: %s" %
156 (item, ret))
157 except:
158 self.log.exception("Exception while reporting start:")
159
Jamie Lennox168bc8f2017-05-04 14:16:33 +1000160 def sendReport(self, action_reporters, item, message=None):
James E. Blair83005782015-12-11 14:46:03 -0800161 """Sends the built message off to configured reporters.
162
163 Takes the action_reporters, item, message and extra options and
164 sends them to the pluggable reporters.
165 """
166 report_errors = []
167 if len(action_reporters) > 0:
168 for reporter in action_reporters:
Jesse Keating186f14a2017-05-12 11:29:16 -0700169 ret = reporter.report(item)
James E. Blair83005782015-12-11 14:46:03 -0800170 if ret:
171 report_errors.append(ret)
172 if len(report_errors) == 0:
173 return
174 return report_errors
175
    def isChangeReadyToBeEnqueued(self, change):
        # Base implementation: every change is considered ready.
        # addChange() refuses to enqueue a change when this returns a
        # falsy value.
        # NOTE(review): presumably overridden by concrete managers -- confirm.
        return True
178
    def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                            change_queue, history=None):
        # Base implementation: nothing to enqueue ahead, report success.
        # addChange() aborts (returns False) when this returns a falsy
        # value.
        # NOTE(review): presumably overridden by concrete managers -- confirm.
        return True
182
    def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                             change_queue):
        # Base implementation: nothing to enqueue behind.  addChange()
        # calls this after enqueuing the change and ignores the result.
        # NOTE(review): presumably overridden by concrete managers -- confirm.
        return True
186
    def checkForChangesNeededBy(self, change, change_queue):
        # Base implementation: the change needs nothing.
        # _processOneItem() dequeues the item unless this returns exactly
        # True (it compares with "is not True").
        # NOTE(review): presumably overridden by concrete managers -- confirm.
        return True
189
    def getFailingDependentItems(self, item):
        # Base implementation: no failing dependencies.
        # _processOneItem() treats a truthy result as "a needed change is
        # failing" and cancels the item's jobs.
        # NOTE(review): presumably overridden by concrete managers -- confirm.
        return None
192
James E. Blair83005782015-12-11 14:46:03 -0800193 def getItemForChange(self, change):
194 for item in self.pipeline.getAllItems():
195 if item.change.equals(change):
196 return item
197 return None
198
199 def findOldVersionOfChangeAlreadyInQueue(self, change):
200 for item in self.pipeline.getAllItems():
201 if not item.live:
202 continue
203 if change.isUpdateOf(item.change):
204 return item
205 return None
206
207 def removeOldVersionsOfChange(self, change):
208 if not self.pipeline.dequeue_on_new_patchset:
209 return
210 old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
211 if old_item:
212 self.log.debug("Change %s is a new version of %s, removing %s" %
213 (change, old_item.change, old_item))
214 self.removeItem(old_item)
215
216 def removeAbandonedChange(self, change):
217 self.log.debug("Change %s abandoned, removing." % change)
218 for item in self.pipeline.getAllItems():
219 if not item.live:
220 continue
221 if item.change.equals(change):
222 self.removeItem(item)
223
    def reEnqueueItem(self, item, last_head):
        """Put an existing item back into a change queue and restore state.

        The target queue is obtained from ``self.getChangeQueue`` (not
        defined in the visible part of this class), using the change of
        *item* and the queue of *last_head*.  Its result is used as a
        context manager.

        :returns: True if the item was re-enqueued, False if no change
            queue could be found for it.
        """
        with self.getChangeQueue(item.change, last_head.queue) as change_queue:
            if change_queue:
                self.log.debug("Re-enqueing change %s in queue %s" %
                               (item.change, change_queue))
                change_queue.enqueueItem(item)

                # Get an updated copy of the layout if necessary.
                # This will return one of the following:
                # 1) An existing layout from the item ahead or pipeline.
                # 2) A newly created layout from the cached pipeline
                #    layout config plus the previously returned
                #    in-repo files stored in the buildset.
                # 3) None in the case that a fetch of the files from
                #    the merger is still pending.
                item.layout = self.getLayout(item)

                # Rebuild the frozen job tree from the new layout, if
                # we have one.  If not, it will be built later.
                if item.layout:
                    item.freezeJobGraph()

                # Re-set build results in case any new jobs have been
                # added to the tree.
                for build in item.current_build_set.getBuilds():
                    if build.result:
                        item.setResult(build)
                # Similarly, reset the item state.
                if item.current_build_set.unable_to_merge:
                    item.setUnableToMerge()
                if item.current_build_set.config_error:
                    item.setConfigError(item.current_build_set.config_error)
                if item.dequeued_needing_change:
                    item.setDequeuedNeedingChange()

                self.reportStats(item)
                return True
            else:
                self.log.error("Unable to find change queue for project %s" %
                               item.change.project)
                return False
265
    def addChange(self, change, quiet=False, enqueue_time=None,
                  ignore_requirements=False, live=True,
                  change_queue=None, history=None):
        """Try to enqueue *change* in this pipeline.

        :param change: the change to add.
        :param quiet: stored on the new item as ``item.quiet`` and passed
            on to the enqueue-ahead/behind hooks.
        :param enqueue_time: if truthy, overrides the new item's
            ``enqueue_time``.
        :param ignore_requirements: when true, ``self.ref_filters`` are
            not consulted.
        :param live: stored on the new item as ``item.live``; a live
            change that is already a live item in the pipeline is not
            added again.
        :param change_queue: passed to ``self.getChangeQueue`` (not
            defined in the visible part of this class).
        :param history: passed through to ``enqueueChangesAhead``.
        :returns: True if the change was enqueued or was already present;
            False if it is not ready, fails a pipeline requirement, has no
            change queue, or the changes ahead of it could not be
            enqueued.
        """
        self.log.debug("Considering adding change %s" % change)

        # If we are adding a live change, check if it's a live item
        # anywhere in the pipeline.  Otherwise, we will perform the
        # duplicate check below on the specific change_queue.
        if live and self.isChangeAlreadyInPipeline(change):
            self.log.debug("Change %s is already in pipeline, "
                           "ignoring" % change)
            return True

        if not self.isChangeReadyToBeEnqueued(change):
            self.log.debug("Change %s is not ready to be enqueued, ignoring" %
                           change)
            return False

        if not ignore_requirements:
            for f in self.ref_filters:
                # A filter only applies to changes from its own connection.
                if f.connection_name != change.project.connection_name:
                    self.log.debug("Filter %s skipped for change %s due "
                                   "to mismatched connections" % (f, change))
                    continue
                if not f.matches(change):
                    self.log.debug("Change %s does not match pipeline "
                                   "requirement %s" % (change, f))
                    return False

        with self.getChangeQueue(change, change_queue) as change_queue:
            if not change_queue:
                self.log.debug("Unable to find change queue for "
                               "change %s in project %s" %
                               (change, change.project))
                return False

            if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
                                            change_queue, history=history):
                self.log.debug("Failed to enqueue changes "
                               "ahead of %s" % change)
                return False

            # Duplicate check against this specific queue (covers
            # non-live items, which the pipeline-wide check above skips).
            if self.isChangeAlreadyInQueue(change, change_queue):
                self.log.debug("Change %s is already in queue, "
                               "ignoring" % change)
                return True

            self.log.info("Adding change %s to queue %s in %s" %
                          (change, change_queue, self.pipeline))
            item = change_queue.enqueueChange(change)
            if enqueue_time:
                item.enqueue_time = enqueue_time
            item.live = live
            self.reportStats(item)
            item.quiet = quiet
            self.enqueueChangesBehind(change, quiet, ignore_requirements,
                                      change_queue)
            # Notify the 'zuul' driver that the change has been enqueued.
            zuul_driver = self.sched.connections.drivers['zuul']
            tenant = self.pipeline.layout.tenant
            zuul_driver.onChangeEnqueued(tenant, item.change, self.pipeline)
            return True
327
328 def dequeueItem(self, item):
329 self.log.debug("Removing change %s from queue" % item.change)
330 item.queue.dequeueItem(item)
331
332 def removeItem(self, item):
333 # Remove an item from the queue, probably because it has been
334 # superseded by another change.
335 self.log.debug("Canceling builds behind change: %s "
336 "because it is being removed." % item.change)
337 self.cancelJobs(item)
338 self.dequeueItem(item)
339 self.reportStats(item)
340
James E. Blair8d692392016-04-08 17:47:58 -0700341 def provisionNodes(self, item):
James E. Blairdbfd3282016-07-21 10:46:19 -0700342 jobs = item.findJobsToRequest()
James E. Blair8d692392016-04-08 17:47:58 -0700343 if not jobs:
344 return False
345 build_set = item.current_build_set
346 self.log.debug("Requesting nodes for change %s" % item.change)
347 for job in jobs:
348 req = self.sched.nodepool.requestNodes(build_set, job)
349 self.log.debug("Adding node request %s for job %s to item %s" %
350 (req, job, item))
351 build_set.setJobNodeRequest(job.name, req)
352 return True
353
Paul Belanger174a8272017-03-14 13:20:10 -0400354 def _executeJobs(self, item, jobs):
355 self.log.debug("Executing jobs for change %s" % item.change)
James E. Blair1960d682017-04-28 15:44:14 -0700356 build_set = item.current_build_set
James E. Blair83005782015-12-11 14:46:03 -0800357 for job in jobs:
358 self.log.debug("Found job %s for change %s" % (job, item.change))
359 try:
James E. Blaircacdf2b2017-01-04 13:14:37 -0800360 nodeset = item.current_build_set.getJobNodeSet(job.name)
James E. Blair1511bc32017-01-18 09:25:31 -0800361 self.sched.nodepool.useNodeSet(nodeset)
Paul Belanger174a8272017-03-14 13:20:10 -0400362 build = self.sched.executor.execute(job, item,
363 self.pipeline,
James E. Blair1960d682017-04-28 15:44:14 -0700364 build_set.dependent_items,
365 build_set.merger_items)
James E. Blair83005782015-12-11 14:46:03 -0800366 self.log.debug("Adding build %s of job %s to item %s" %
367 (build, job, item))
368 item.addBuild(build)
369 except:
Paul Belanger174a8272017-03-14 13:20:10 -0400370 self.log.exception("Exception while executing job %s "
James E. Blair83005782015-12-11 14:46:03 -0800371 "for change %s:" % (job, item.change))
372
Paul Belanger174a8272017-03-14 13:20:10 -0400373 def executeJobs(self, item):
James E. Blair83005782015-12-11 14:46:03 -0800374 # TODO(jeblair): This should return a value indicating a job
Paul Belanger174a8272017-03-14 13:20:10 -0400375 # was executed. Appears to be a longstanding bug.
James E. Blair29a24fd2017-10-02 15:04:56 -0700376 if not item.layout:
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700377 return False
378
Tobias Henkel9a0e1942017-03-20 16:16:02 +0100379 jobs = item.findJobsToRun(
380 item.pipeline.layout.tenant.semaphore_handler)
James E. Blair83005782015-12-11 14:46:03 -0800381 if jobs:
Paul Belanger174a8272017-03-14 13:20:10 -0400382 self._executeJobs(item, jobs)
James E. Blair83005782015-12-11 14:46:03 -0800383
384 def cancelJobs(self, item, prime=True):
385 self.log.debug("Cancel jobs for change %s" % item.change)
386 canceled = False
387 old_build_set = item.current_build_set
388 if prime and item.current_build_set.ref:
389 item.resetAllBuilds()
James E. Blair8d692392016-04-08 17:47:58 -0700390 for req in old_build_set.node_requests.values():
391 self.sched.nodepool.cancelRequest(req)
392 old_build_set.node_requests = {}
James E. Blaire18d4602017-01-05 11:17:28 -0800393 canceled_jobs = set()
James E. Blair83005782015-12-11 14:46:03 -0800394 for build in old_build_set.getBuilds():
James E. Blair98095cf2017-02-02 14:13:03 -0800395 if build.result:
396 canceled_jobs.add(build.job.name)
397 continue
James E. Blair01695c32017-01-04 17:29:25 -0800398 was_running = False
James E. Blair83005782015-12-11 14:46:03 -0800399 try:
Paul Belanger174a8272017-03-14 13:20:10 -0400400 was_running = self.sched.executor.cancel(build)
James E. Blair83005782015-12-11 14:46:03 -0800401 except:
402 self.log.exception("Exception while canceling build %s "
403 "for change %s" % (build, item.change))
Tobias Henkelfb91a492017-02-15 07:29:43 +0100404 finally:
Tobias Henkel0f714002017-06-30 23:30:52 +0200405 tenant = old_build_set.item.pipeline.layout.tenant
406 tenant.semaphore_handler.release(
Tobias Henkel9a0e1942017-03-20 16:16:02 +0100407 old_build_set.item, build.job)
Tobias Henkelfb91a492017-02-15 07:29:43 +0100408
James E. Blair01695c32017-01-04 17:29:25 -0800409 if not was_running:
James E. Blaird43ef512017-09-29 07:46:31 -0700410 nodeset = build.build_set.getJobNodeSet(build.job.name)
411 self.sched.nodepool.returnNodeSet(nodeset)
James E. Blair83005782015-12-11 14:46:03 -0800412 build.result = 'CANCELED'
413 canceled = True
James E. Blaire18d4602017-01-05 11:17:28 -0800414 canceled_jobs.add(build.job.name)
Clint Byrum1d0c7d12017-05-10 19:40:53 -0700415 for jobname, nodeset in list(old_build_set.nodesets.items()):
James E. Blaire18d4602017-01-05 11:17:28 -0800416 if jobname in canceled_jobs:
417 continue
James E. Blair1511bc32017-01-18 09:25:31 -0800418 self.sched.nodepool.returnNodeSet(nodeset)
James E. Blair83005782015-12-11 14:46:03 -0800419 for item_behind in item.items_behind:
420 self.log.debug("Canceling jobs for change %s, behind change %s" %
421 (item_behind.change, item.change))
422 if self.cancelJobs(item_behind, prime=prime):
423 canceled = True
424 return canceled
425
    def _loadDynamicLayout(self, item):
        """Create a layout from the config files held by item's build set.

        The layout is created twice with ``ConfigLoader``: once including
        config projects (the result is discarded, the call serves as
        validation) and once excluding them (the result is returned).

        :returns: the layout from the second pass, or None if either pass
            raised.  In that case a config error is recorded on the item:
            the exception text for a ``ConfigurationSyntaxError``, a
            generic message for any other exception.
        """
        # Load layout
        # Late import to break an import loop
        import zuul.configloader
        loader = zuul.configloader.ConfigLoader()

        build_set = item.current_build_set
        try:
            # First parse the config as it will land with the
            # full set of config and project repos.  This lets us
            # catch syntax errors in config repos even though we won't
            # actually run with that config.
            self.log.debug("Loading dynamic layout (phase 1)")
            loader.createDynamicLayout(
                item.pipeline.layout.tenant,
                build_set.files,
                include_config_projects=True,
                scheduler=self.sched,
                connections=self.sched.connections)

            # Then create the config a second time but without changes
            # to config repos so that we actually use this config.
            self.log.debug("Loading dynamic layout (phase 2)")
            layout = loader.createDynamicLayout(
                item.pipeline.layout.tenant,
                build_set.files,
                include_config_projects=False)
            self.log.debug("Loading dynamic layout complete")
        except zuul.configloader.ConfigurationSyntaxError as e:
            self.log.info("Configuration syntax error "
                          "in dynamic layout")
            item.setConfigError(str(e))
            return None
        except Exception:
            # Anything unexpected is logged with its traceback and
            # reported on the item with a generic message.
            self.log.exception("Error in dynamic layout")
            item.setConfigError("Unknown configuration error")
            return None
        return layout
464
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700465 def getLayout(self, item):
466 if not item.change.updatesConfig():
467 if item.item_ahead:
James E. Blair29a24fd2017-10-02 15:04:56 -0700468 return item.item_ahead.layout
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700469 else:
470 return item.queue.pipeline.layout
471 # This item updates the config, ask the merger for the result.
472 build_set = item.current_build_set
473 if build_set.merge_state == build_set.PENDING:
474 return None
475 if build_set.merge_state == build_set.COMPLETE:
476 if build_set.unable_to_merge:
477 return None
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700478 self.log.debug("Preparing dynamic layout for: %s" % item.change)
James E. Blair149b69c2017-03-02 10:48:16 -0800479 return self._loadDynamicLayout(item)
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700480
Tristan Cacqueray829e6172017-06-13 06:49:36 +0000481 def scheduleMerge(self, item, files=None, dirs=None):
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700482 build_set = item.current_build_set
483
Tristan Cacqueray829e6172017-06-13 06:49:36 +0000484 self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" %
485 (item, files, dirs))
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700486 build_set = item.current_build_set
487 build_set.merge_state = build_set.PENDING
James E. Blair289f5932017-07-27 15:02:29 -0700488 if isinstance(item.change, model.Change):
489 self.sched.merger.mergeChanges(build_set.merger_items,
490 item.current_build_set, files, dirs,
491 precedence=self.pipeline.precedence)
492 else:
493 self.sched.merger.getRepoState(build_set.merger_items,
494 item.current_build_set,
495 precedence=self.pipeline.precedence)
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700496 return False
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700497
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700498 def prepareItem(self, item):
499 # This runs on every iteration of _processOneItem
500 # Returns True if the item is ready, false otherwise
501 build_set = item.current_build_set
502 if not build_set.ref:
503 build_set.setConfiguration()
504 if build_set.merge_state == build_set.NEW:
Tristan Cacqueray829e6172017-06-13 06:49:36 +0000505 return self.scheduleMerge(item,
506 files=['zuul.yaml', '.zuul.yaml'],
507 dirs=['zuul.d', '.zuul.d'])
James E. Blair8d144dc2017-05-05 10:13:45 -0700508 if build_set.merge_state == build_set.PENDING:
509 return False
510 if build_set.unable_to_merge:
511 return False
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700512 if build_set.config_error:
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700513 return False
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700514 return True
515
516 def prepareJobs(self, item):
517 # This only runs once the item is in the pipeline's action window
518 # Returns True if the item is ready, false otherwise
James E. Blair29a24fd2017-10-02 15:04:56 -0700519 if not item.layout:
520 item.layout = self.getLayout(item)
521 if not item.layout:
Fredrik Medleyf8aec832015-09-28 13:40:20 +0200522 return False
K Jonathan Harkerae04e4c2017-03-15 19:07:11 -0700523
Fredrik Medleyf8aec832015-09-28 13:40:20 +0200524 if not item.job_graph:
525 try:
James E. Blairc9455002017-09-06 09:22:19 -0700526 self.log.debug("Freezing job graph for %s" % (item,))
Fredrik Medleyf8aec832015-09-28 13:40:20 +0200527 item.freezeJobGraph()
528 except Exception as e:
529 # TODOv3(jeblair): nicify this exception as it will be reported
530 self.log.exception("Error freezing job graph for %s" %
James E. Blairc9455002017-09-06 09:22:19 -0700531 (item,))
Fredrik Medleyf8aec832015-09-28 13:40:20 +0200532 item.setConfigError("Unable to freeze job graph: %s" %
533 (str(e)))
534 return False
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700535 return True
536
    def _processOneItem(self, item, nnfi):
        """Advance a single queue item by one processing step.

        :param item: the queue item to process.
        :param nnfi: the nearest non-failing item found so far in this
            queue (None at the head of the queue).
        :returns: a ``(changed, nnfi)`` tuple.  ``changed`` is True when
            this step moved or dequeued the item or requested nodes for
            it.  ``nnfi`` is *item* itself when the item is live and has
            no failing reasons (and was not reported in this step),
            otherwise the value that was passed in.
        """
        changed = False
        ready = False
        dequeued = False
        failing_reasons = []  # Reasons this item is failing

        # A non-live item ahead does not count as the item ahead.
        item_ahead = item.item_ahead
        if item_ahead and (not item_ahead.live):
            item_ahead = None
        change_queue = item.queue

        if self.checkForChangesNeededBy(item.change, change_queue) is not True:
            # It's not okay to enqueue this change, we should remove it.
            self.log.info("Dequeuing change %s because "
                          "it can no longer merge" % item.change)
            self.cancelJobs(item)
            self.dequeueItem(item)
            item.setDequeuedNeedingChange()
            if item.live:
                try:
                    self.reportItem(item)
                except exceptions.MergeFailure:
                    pass
            return (True, nnfi)

        actionable = change_queue.isActionable(item)
        item.active = actionable

        dep_items = self.getFailingDependentItems(item)
        if dep_items:
            failing_reasons.append('a needed change is failing')
            # prime=False is passed through to cancelJobs, which then
            # skips resetting the item's builds.
            self.cancelJobs(item, prime=False)
        else:
            item_ahead_merged = False
            if (item_ahead and item_ahead.change.is_merged):
                item_ahead_merged = True
            if (item_ahead != nnfi and not item_ahead_merged):
                # Our current base is different than what we expected,
                # and it's not because our current base merged.  Something
                # ahead must have failed.
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, is not the nearest non-failing "
                              "item, %s" % (item.change, item_ahead, nnfi))
                change_queue.moveItem(item, nnfi)
                changed = True
                self.cancelJobs(item)
            if actionable:
                ready = self.prepareItem(item) and self.prepareJobs(item)
                # Starting jobs reporting should only be done once if there are
                # jobs to run for this item.
                if ready and len(self.pipeline.start_actions) > 0 \
                        and len(item.job_graph.jobs) > 0 \
                        and not item.reported_start \
                        and not item.quiet:
                    self.reportStart(item)
                    item.reported_start = True
                if item.current_build_set.unable_to_merge:
                    failing_reasons.append("it has a merge conflict")
                    # Non-live items are dequeued here; the "dequeued"
                    # flag prevents dequeuing the same item twice.
                    if (not item.live) and (not dequeued):
                        self.dequeueItem(item)
                        changed = dequeued = True
                if item.current_build_set.config_error:
                    failing_reasons.append("it has an invalid configuration")
                    if (not item.live) and (not dequeued):
                        self.dequeueItem(item)
                        changed = dequeued = True
                if ready and self.provisionNodes(item):
                    changed = True
        # NOTE: executeJobs() as defined in this class returns None after
        # launching jobs (see its TODO), so this branch does not set
        # "changed" in that case.
        if ready and self.executeJobs(item):
            changed = True

        if item.didAnyJobFail():
            failing_reasons.append("at least one job failed")
        if (not item.live) and (not item.items_behind) and (not dequeued):
            failing_reasons.append("is a non-live item with no items behind")
            self.dequeueItem(item)
            changed = dequeued = True
        # Only a live item with no live item ahead of it and all jobs
        # complete is reported and dequeued.
        if ((not item_ahead) and item.areAllJobsComplete() and item.live):
            try:
                self.reportItem(item)
            except exceptions.MergeFailure:
                failing_reasons.append("it did not merge")
                for item_behind in item.items_behind:
                    self.log.info("Resetting builds for change %s because the "
                                  "item ahead, %s, failed to merge" %
                                  (item_behind.change, item))
                    self.cancelJobs(item_behind)
            self.dequeueItem(item)
            changed = dequeued = True
        elif not failing_reasons and item.live:
            nnfi = item
        item.current_build_set.failing_reasons = failing_reasons
        if failing_reasons:
            self.log.debug("%s is a failing item because %s" %
                           (item, failing_reasons))
        return (changed, nnfi)
633
634 def processQueue(self):
635 # Do whatever needs to be done for each change in the queue
636 self.log.debug("Starting queue processor: %s" % self.pipeline.name)
637 changed = False
638 for queue in self.pipeline.queues:
639 queue_changed = False
640 nnfi = None # Nearest non-failing item
641 for item in queue.queue[:]:
642 item_changed, nnfi = self._processOneItem(
643 item, nnfi)
644 if item_changed:
645 queue_changed = True
646 self.reportStats(item)
647 if queue_changed:
648 changed = True
649 status = ''
650 for item in queue.queue:
651 status += item.formatStatus()
652 if status:
653 self.log.debug("Queue %s status is now:\n %s" %
654 (queue.name, status))
655 self.log.debug("Finished queue processor: %s (changed: %s)" %
656 (self.pipeline.name, changed))
657 return changed
658
James E. Blair83005782015-12-11 14:46:03 -0800659 def onBuildStarted(self, build):
660 self.log.debug("Build %s started" % build)
661 return True
662
663 def onBuildCompleted(self, build):
664 self.log.debug("Build %s completed" % build)
665 item = build.build_set.item
666
James E. Blairdbfd3282016-07-21 10:46:19 -0700667 item.setResult(build)
Tobias Henkel9a0e1942017-03-20 16:16:02 +0100668 item.pipeline.layout.tenant.semaphore_handler.release(item, build.job)
James E. Blair83005782015-12-11 14:46:03 -0800669 self.log.debug("Item %s status is now:\n %s" %
670 (item, item.formatStatus()))
James E. Blair62295d32017-01-04 13:27:58 -0800671
James E. Blaire18d4602017-01-05 11:17:28 -0800672 if build.retry:
673 build.build_set.removeJobNodeSet(build.job.name)
674
James E. Blair83005782015-12-11 14:46:03 -0800675 return True
676
def onMergeCompleted(self, event):
    """Apply the outcome of a merge event to its build set.

    Marks the build set's merge as complete and stores the event's
    repo state.  When the event reports a merge, the resulting commit
    and file list are recorded; when it only reports an update, the
    change's new revision (or an all-zero SHA if there is none) is
    used as the commit.  If the build set ends up without a commit the
    item is marked as unable to merge.
    """
    build_set = event.build_set
    item = build_set.item

    build_set.merge_state = build_set.COMPLETE
    build_set.repo_state = event.repo_state

    if event.merged:
        build_set.commit = event.commit
        build_set.files.setFiles(event.files)
    elif event.updated:
        newrev = item.change.newrev
        if newrev:
            build_set.commit = newrev
        else:
            build_set.commit = '0000000000000000000000000000000000000000'

    if build_set.commit:
        return

    self.log.info("Unable to merge change %s" % item.change)
    item.setUnableToMerge()
James E. Blair83005782015-12-11 14:46:03 -0800691
def onNodesProvisioned(self, event):
    """Handle the completion of a node request.

    Tells the request's build set that the node request for the job
    has completed, flags a node request failure on the item when the
    request failed or was not fulfilled, and logs the completion.
    """
    # TODOv3(jeblair): handle provisioning failure here
    request = event.request
    build_set = request.build_set
    job_name = request.job.name
    build_set.jobNodeRequestComplete(job_name, request, request.nodeset)

    request_ok = not request.failed and request.fulfilled
    if not request_ok:
        self.log.info("Node request failure for %s" % (request.job.name,))
        build_set.item.setNodeRequestFailure(request.job)

    # The completion message is logged whether or not the request failed.
    details = (request, request.job, build_set.item, request.nodeset)
    self.log.info("Completed node request %s for job %s of item %s "
                  "with nodes %s" % details)
James E. Blair8d692392016-04-08 17:47:58 -0700706
def reportItem(self, item):
    """Report an item and, for merging pipelines, react to the outcome.

    The item is reported (at most once) through _reportItem().  When
    this manager's changes merge, the change queue window is grown on
    success and the zuul driver is told the change merged; otherwise
    the window is shrunk and MergeFailure is raised.

    Raises:
        exceptions.MergeFailure: if the jobs did not all succeed or
            the change is not merged according to its source.
    """
    if not item.reported:
        # _reportItem() returns True if it failed to report.
        report_failed = self._reportItem(item)
        item.reported = not report_failed

    if not self.changes_merge:
        return

    succeeded = item.didAllJobsSucceed()
    merged = item.reported
    source = item.change.project.source
    if merged:
        # Only ask the source when the report itself went through.
        merged = source.isMerged(item.change, item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))

    change_queue = item.queue
    if succeeded and merged:
        change_queue.increaseWindowSize()
        self.log.debug("%s window size increased to %s" %
                       (change_queue, change_queue.window))

        zuul_driver = self.sched.connections.drivers['zuul']
        tenant = self.pipeline.layout.tenant
        zuul_driver.onChangeMerged(tenant, item.change, source)
        return

    self.log.debug("Reported change %s failed tests or failed "
                   "to merge" % item.change)
    change_queue.decreaseWindowSize()
    self.log.debug("%s window size decreased to %s" %
                   (change_queue, change_queue.window))
    raise exceptions.MergeFailure(
        "Change %s failed to merge" % item.change)
James E. Blair83005782015-12-11 14:46:03 -0800736
def _reportItem(self, item):
    """Select the reporter actions for *item* and send the report.

    The actions are chosen from the pipeline's action lists according
    to the item's state (config error, merger failure, dequeued
    needing change, no jobs, success or failure), and the matching
    result string is recorded on the item.  Success resets and failure
    increments the pipeline's consecutive-failure counter, which may
    switch the pipeline into its disabled state.

    Returns:
        The value returned by sendReport(), where a true value means
        the report failed.  True is also returned when no report was
        sent or when sending raised an exception.
    """
    self.log.debug("Reporting change %s" % item.change)
    ret = True  # Means error as returned by trigger.report

    # In the case of failure, we may not have completed an initial
    # merge which would get the layout for this item, so in order
    # to determine whether this item's project is in this
    # pipeline, use the dynamic layout if available, otherwise,
    # fall back to the current static layout as a best
    # approximation.
    layout = (item.layout or self.pipeline.layout)

    project_in_pipeline = True
    if not layout.getProjectPipelineConfig(item.change.project,
                                           self.pipeline):
        self.log.debug("Project %s not in pipeline %s for change %s" % (
            item.change.project, self.pipeline, item.change))
        project_in_pipeline = False
        actions = []
    elif item.getConfigError():
        self.log.debug("Invalid config for change %s" % item.change)
        # TODOv3(jeblair): consider a new reporter action for this
        actions = self.pipeline.merge_failure_actions
        item.setReportedResult('CONFIG_ERROR')
    elif item.didMergerFail():
        actions = self.pipeline.merge_failure_actions
        item.setReportedResult('MERGER_FAILURE')
    elif item.wasDequeuedNeedingChange():
        actions = self.pipeline.failure_actions
        item.setReportedResult('FAILURE')
    elif not item.getJobs():
        # We don't send empty reports with +1
        self.log.debug("No jobs for change %s" % (item.change,))
        actions = []
    elif item.didAllJobsSucceed():
        self.log.debug("success %s" % (self.pipeline.success_actions))
        actions = self.pipeline.success_actions
        item.setReportedResult('SUCCESS')
        self.pipeline._consecutive_failures = 0
    else:
        actions = self.pipeline.failure_actions
        item.setReportedResult('FAILURE')
        self.pipeline._consecutive_failures += 1
    if project_in_pipeline and self.pipeline._disabled:
        actions = self.pipeline.disabled_actions
    # Check here if we should disable so that we only use the disabled
    # reporters /after/ the last disable_at failure is still reported as
    # normal.
    if (self.pipeline.disable_at and not self.pipeline._disabled and
            self.pipeline._consecutive_failures >= self.pipeline.disable_at):
        self.pipeline._disabled = True
    if actions:
        try:
            self.log.info("Reporting item %s, actions: %s" %
                          (item, actions))
            ret = self.sendReport(actions, item)
            if ret:
                self.log.error("Reporting item %s received: %s" %
                               (item, ret))
        except Exception:
            # Reporter errors are logged and recorded on the item, but
            # SystemExit/KeyboardInterrupt are allowed to propagate so
            # a shutdown request is not swallowed here.
            self.log.exception("Exception while reporting:")
            item.setReportedResult('ERROR')
    return ret
800
def reportStats(self, item):
    """Emit statsd metrics for the pipeline after *item* changed.

    Does nothing when the scheduler has no statsd client.  The
    current_changes gauge is always updated; the resident_time timer
    and total_changes counter are emitted, for the pipeline and for
    the item's project, only when the item has a dequeue time.

    Any exception raised while reporting is logged and suppressed so
    that statistics problems do not interrupt queue processing.
    """
    statsd = self.sched.statsd
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeuing.
        if item.dequeue_time:
            dt = int((item.dequeue_time - item.enqueue_time) * 1000)
        else:
            dt = None
        items = len(self.pipeline.getAllItems())

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(key + '.current_changes', items)
        # Compare against None: a resident time that truncates to 0 ms
        # still belongs to a dequeued item and must be counted.
        if dt is not None:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        project_name = item.change.project.name.replace('/', '.')
        key += '.%s' % project_name
        if dt is not None:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')
    except Exception:
        self.log.exception("Exception reporting pipeline stats")