blob: 30a6c814372b4d9a5d32bed4dc2e4302e5219336 [file] [log] [blame]
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Christian Berendt12d4d722014-06-07 21:03:45 +020023from six.moves import queue as Queue
Zhongyue Luo1c860d72012-07-19 11:03:56 +080024import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
Joshua Heskethde958652015-11-10 19:19:50 +110032from model import Pipeline, Project, ChangeQueue
Joshua Hesketh70b13492015-08-11 23:40:42 +100033from model import ChangeishFilter, NullChange
Joshua Hesketh352264b2015-08-11 23:42:08 +100034from zuul import change_matcher, exceptions
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040035from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070036
James E. Blair71e94122012-12-24 17:53:08 -080037statsd = extras.try_import('statsd.statsd')
38
James E. Blair1e8dd892012-05-30 09:15:05 -070039
Antoine Musso80edd5a2013-02-13 15:37:53 +010040def deep_format(obj, paramdict):
41 """Apply the paramdict via str.format() to all string objects found within
42 the supplied obj. Lists and dicts are traversed recursively.
43
44 Borrowed from Jenkins Job Builder project"""
45 if isinstance(obj, str):
46 ret = obj.format(**paramdict)
47 elif isinstance(obj, list):
48 ret = []
49 for item in obj:
50 ret.append(deep_format(item, paramdict))
51 elif isinstance(obj, dict):
52 ret = {}
53 for item in obj:
54 exp_item = item.format(**paramdict)
55
56 ret[exp_item] = deep_format(obj[item], paramdict)
57 else:
58 ret = obj
59 return ret
60
61
James E. Blairaf17a972016-02-03 15:07:18 -080062class MutexHandler(object):
63 log = logging.getLogger("zuul.MutexHandler")
64
65 def __init__(self):
66 self.mutexes = {}
67
68 def acquire(self, item, job):
69 if not job.mutex:
70 return True
71 mutex_name = job.mutex
72 m = self.mutexes.get(mutex_name)
73 if not m:
74 # The mutex is not held, acquire it
75 self._acquire(mutex_name, item, job.name)
76 return True
77 held_item, held_job_name = m
78 if held_item is item and held_job_name == job.name:
79 # This item already holds the mutex
80 return True
81 held_build = held_item.current_build_set.getBuild(held_job_name)
82 if held_build and held_build.result:
83 # The build that held the mutex is complete, release it
84 # and let the new item have it.
85 self.log.error("Held mutex %s being released because "
86 "the build that holds it is complete" %
87 (mutex_name,))
88 self._release(mutex_name, item, job.name)
89 self._acquire(mutex_name, item, job.name)
90 return True
91 return False
92
93 def release(self, item, job):
94 if not job.mutex:
95 return
96 mutex_name = job.mutex
97 m = self.mutexes.get(mutex_name)
98 if not m:
99 # The mutex is not held, nothing to do
100 self.log.error("Mutex can not be released for %s "
101 "because the mutex is not held" %
102 (item,))
103 return
104 held_item, held_job_name = m
105 if held_item is item and held_job_name == job.name:
106 # This item holds the mutex
107 self._release(mutex_name, item, job.name)
108 return
109 self.log.error("Mutex can not be released for %s "
110 "which does not hold it" %
111 (item,))
112
113 def _acquire(self, mutex_name, item, job_name):
114 self.log.debug("Job %s of item %s acquiring mutex %s" %
115 (job_name, item, mutex_name))
116 self.mutexes[mutex_name] = (item, job_name)
117
118 def _release(self, mutex_name, item, job_name):
119 self.log.debug("Job %s of item %s releasing mutex %s" %
120 (job_name, item, mutex_name))
121 del self.mutexes[mutex_name]
122
123
James E. Blair468c8512013-12-06 13:27:19 -0800124class ManagementEvent(object):
125 """An event that should be processed within the main queue run loop"""
126 def __init__(self):
127 self._wait_event = threading.Event()
James E. Blair36658cf2013-12-06 17:53:48 -0800128 self._exception = None
129 self._traceback = None
James E. Blair468c8512013-12-06 13:27:19 -0800130
James E. Blair36658cf2013-12-06 17:53:48 -0800131 def exception(self, e, tb):
132 self._exception = e
133 self._traceback = tb
134 self._wait_event.set()
135
136 def done(self):
James E. Blair468c8512013-12-06 13:27:19 -0800137 self._wait_event.set()
138
139 def wait(self, timeout=None):
140 self._wait_event.wait(timeout)
James E. Blair36658cf2013-12-06 17:53:48 -0800141 if self._exception:
142 raise self._exception, None, self._traceback
James E. Blair468c8512013-12-06 13:27:19 -0800143 return self._wait_event.is_set()
144
145
146class ReconfigureEvent(ManagementEvent):
147 """Reconfigure the scheduler. The layout will be (re-)loaded from
148 the path specified in the configuration.
149
150 :arg ConfigParser config: the new configuration
151 """
152 def __init__(self, config):
153 super(ReconfigureEvent, self).__init__()
154 self.config = config
155
156
James E. Blair36658cf2013-12-06 17:53:48 -0800157class PromoteEvent(ManagementEvent):
158 """Promote one or more changes to the head of the queue.
159
160 :arg str pipeline_name: the name of the pipeline
161 :arg list change_ids: a list of strings of change ids in the form
162 1234,1
163 """
164
165 def __init__(self, pipeline_name, change_ids):
166 super(PromoteEvent, self).__init__()
167 self.pipeline_name = pipeline_name
168 self.change_ids = change_ids
169
170
James E. Blaird27a96d2014-07-10 13:25:13 -0700171class EnqueueEvent(ManagementEvent):
172 """Enqueue a change into a pipeline
173
174 :arg TriggerEvent trigger_event: a TriggerEvent describing the
175 trigger, pipeline, and change to enqueue
176 """
177
178 def __init__(self, trigger_event):
179 super(EnqueueEvent, self).__init__()
180 self.trigger_event = trigger_event
181
182
James E. Blaira84f0e42014-02-06 07:09:22 -0800183class ResultEvent(object):
184 """An event that needs to modify the pipeline state due to a
185 result from an external system."""
186
187 pass
188
189
190class BuildStartedEvent(ResultEvent):
191 """A build has started.
192
193 :arg Build build: The build which has started.
194 """
195
196 def __init__(self, build):
197 self.build = build
198
199
200class BuildCompletedEvent(ResultEvent):
201 """A build has completed
202
203 :arg Build build: The build which has completed.
204 """
205
206 def __init__(self, build):
207 self.build = build
208
209
James E. Blair4076e2b2014-01-28 12:42:20 -0800210class MergeCompletedEvent(ResultEvent):
211 """A remote merge operation has completed
212
213 :arg BuildSet build_set: The build_set which is ready.
214 :arg str zuul_url: The URL of the Zuul Merger.
215 :arg bool merged: Whether the merge succeeded (changes with refs).
216 :arg bool updated: Whether the repo was updated (changes without refs).
217 :arg str commit: The SHA of the merged commit (changes with refs).
218 """
219
220 def __init__(self, build_set, zuul_url, merged, updated, commit):
221 self.build_set = build_set
222 self.zuul_url = zuul_url
223 self.merged = merged
224 self.updated = updated
225 self.commit = commit
226
227
Maru Newby3fe5f852015-01-13 04:22:14 +0000228def toList(item):
229 if not item:
230 return []
231 if isinstance(item, list):
232 return item
233 return [item]
234
235
James E. Blaire9d45c32012-05-31 09:56:45 -0700236class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700237 log = logging.getLogger("zuul.Scheduler")
238
James E. Blaire4d229c2016-05-25 15:25:41 -0700239 def __init__(self, config, testonly=False):
James E. Blaire9d45c32012-05-31 09:56:45 -0700240 threading.Thread.__init__(self)
James E. Blair8a6f0c22013-07-01 12:31:34 -0400241 self.daemon = True
James E. Blairee743612012-05-29 14:49:32 -0700242 self.wake_event = threading.Event()
James E. Blaircdccd972013-07-01 12:10:22 -0700243 self.layout_lock = threading.Lock()
James E. Blaira84f0e42014-02-06 07:09:22 -0800244 self.run_handler_lock = threading.Lock()
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700245 self._pause = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700246 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -0700247 self._stopped = False
James E. Blairee743612012-05-29 14:49:32 -0700248 self.launcher = None
James E. Blair4076e2b2014-01-28 12:42:20 -0800249 self.merger = None
James E. Blairaf17a972016-02-03 15:07:18 -0800250 self.mutex = MutexHandler()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000251 self.connections = dict()
252 # Despite triggers being part of the pipeline, there is one trigger set
253 # per scheduler. The pipeline handles the trigger filters but since
254 # the events are handled by the scheduler itself it needs to handle
255 # the loading of the triggers.
256 # self.triggers['connection_name'] = triggerObject
James E. Blair6c358e72013-07-29 17:06:47 -0700257 self.triggers = dict()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000258 self.config = config
James E. Blairee743612012-05-29 14:49:32 -0700259
260 self.trigger_event_queue = Queue.Queue()
261 self.result_event_queue = Queue.Queue()
James E. Blair468c8512013-12-06 13:27:19 -0800262 self.management_event_queue = Queue.Queue()
James E. Blaireff88162013-07-01 12:44:14 -0400263 self.layout = model.Layout()
James E. Blairee743612012-05-29 14:49:32 -0700264
James E. Blaire4d229c2016-05-25 15:25:41 -0700265 if not testonly:
266 time_dir = self._get_time_database_dir()
267 self.time_database = model.TimeDataBase(time_dir)
James E. Blairce8a2132016-05-19 15:21:52 -0700268
Jeremy Stanley98b38de2015-06-04 21:20:43 +0000269 self.zuul_version = zuul_version.version_info.release_string()
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400270 self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400271
Joshua Heskethde958652015-11-10 19:19:50 +1100272 # A set of reporter configuration keys to action mapping
273 self._reporter_actions = {
274 'start': 'start_actions',
275 'success': 'success_actions',
276 'failure': 'failure_actions',
277 'merge-failure': 'merge_failure_actions',
278 'disabled': 'disabled_actions',
279 }
280
James E. Blairb0fcae42012-07-17 11:12:10 -0700281 def stop(self):
282 self._stopped = True
Joshua Hesketh352264b2015-08-11 23:42:08 +1000283 self._unloadDrivers()
284 self.stopConnections()
James E. Blairb0fcae42012-07-17 11:12:10 -0700285 self.wake_event.set()
286
Joshua Hesketh352264b2015-08-11 23:42:08 +1000287 def testConfig(self, config_path, connections):
288 # Take the list of set up connections directly here rather than with
289 # registerConnections as we don't want to do the onLoad event yet.
290 return self._parseConfig(config_path, connections)
James E. Blair47958382013-01-10 17:26:02 -0800291
Maru Newby3fe5f852015-01-13 04:22:14 +0000292 def _parseSkipIf(self, config_job):
293 cm = change_matcher
294 skip_matchers = []
295
296 for config_skip in config_job.get('skip-if', []):
297 nested_matchers = []
298
299 project_regex = config_skip.get('project')
300 if project_regex:
301 nested_matchers.append(cm.ProjectMatcher(project_regex))
302
303 branch_regex = config_skip.get('branch')
304 if branch_regex:
305 nested_matchers.append(cm.BranchMatcher(branch_regex))
306
307 file_regexes = toList(config_skip.get('all-files-match-any'))
308 if file_regexes:
309 file_matchers = [cm.FileMatcher(x) for x in file_regexes]
310 all_files_matcher = cm.MatchAllFiles(file_matchers)
311 nested_matchers.append(all_files_matcher)
312
313 # All patterns need to match a given skip-if predicate
314 skip_matchers.append(cm.MatchAll(nested_matchers))
315
316 if skip_matchers:
317 # Any skip-if predicate can be matched to trigger a skip
318 return cm.MatchAny(skip_matchers)
319
Joshua Hesketh352264b2015-08-11 23:42:08 +1000320 def registerConnections(self, connections):
321 self.connections = connections
322 for connection_name, connection in self.connections.items():
323 connection.registerScheduler(self)
324 connection.onLoad()
325
326 def stopConnections(self):
327 for connection_name, connection in self.connections.items():
328 connection.onStop()
329
330 def _unloadDrivers(self):
331 for trigger in self.triggers.values():
332 trigger.stop()
Joshua Hesketh90b61db2016-02-03 14:22:15 +1100333 self.triggers = {}
Joshua Hesketh352264b2015-08-11 23:42:08 +1000334 for pipeline in self.layout.pipelines.values():
335 pipeline.source.stop()
Joshua Heskethde958652015-11-10 19:19:50 +1100336 for action in self._reporter_actions.values():
337 for reporter in pipeline.__getattribute__(action):
338 reporter.stop()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000339
340 def _getDriver(self, dtype, connection_name, driver_config={}):
341 # Instantiate a driver such as a trigger, source or reporter
342 # TODO(jhesketh): Make this list dynamic or use entrypoints etc.
343 # Stevedore was not a good fit here due to the nature of triggers.
344 # Specifically we don't want to load a trigger per a pipeline as one
345 # trigger can listen to a stream (from gerrit, for example) and the
346 # scheduler decides which eventfilter to use. As such we want to load
347 # trigger+connection pairs uniquely.
348 drivers = {
349 'source': {
350 'gerrit': 'zuul.source.gerrit:GerritSource',
351 },
352 'trigger': {
353 'gerrit': 'zuul.trigger.gerrit:GerritTrigger',
354 'timer': 'zuul.trigger.timer:TimerTrigger',
355 'zuul': 'zuul.trigger.zuultrigger:ZuulTrigger',
356 },
357 'reporter': {
358 'gerrit': 'zuul.reporter.gerrit:GerritReporter',
359 'smtp': 'zuul.reporter.smtp:SMTPReporter',
360 },
361 }
362
363 # TODO(jhesketh): Check the connection_name exists
364 if connection_name in self.connections.keys():
365 driver_name = self.connections[connection_name].driver_name
366 connection = self.connections[connection_name]
367 else:
368 # In some cases a driver may not be related to a connection. For
369 # example, the 'timer' or 'zuul' triggers.
370 driver_name = connection_name
371 connection = None
372 driver = drivers[dtype][driver_name].split(':')
373 driver_instance = getattr(
374 __import__(driver[0], fromlist=['']), driver[1])(
375 driver_config, self, connection
376 )
377
Joshua Hesketh811e2e92015-12-08 09:55:05 +1100378 if connection:
379 connection.registerUse(dtype, driver_instance)
380
Joshua Hesketh352264b2015-08-11 23:42:08 +1000381 return driver_instance
382
383 def _getSourceDriver(self, connection_name):
384 return self._getDriver('source', connection_name)
385
386 def _getReporterDriver(self, connection_name, driver_config={}):
387 return self._getDriver('reporter', connection_name, driver_config)
388
389 def _getTriggerDriver(self, connection_name, driver_config={}):
390 return self._getDriver('trigger', connection_name, driver_config)
391
392 def _parseConfig(self, config_path, connections):
James E. Blaireff88162013-07-01 12:44:14 -0400393 layout = model.Layout()
394 project_templates = {}
395
James E. Blaire5a847f2012-07-10 15:29:14 -0700396 if config_path:
397 config_path = os.path.expanduser(config_path)
398 if not os.path.exists(config_path):
399 raise Exception("Unable to read layout config file at %s" %
400 config_path)
Einst Crazyff9837b2015-11-17 17:32:37 +0800401 with open(config_path) as config_file:
402 data = yaml.load(config_file)
James E. Blaire5a847f2012-07-10 15:29:14 -0700403
James E. Blair47958382013-01-10 17:26:02 -0800404 validator = layoutvalidator.LayoutValidator()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000405 validator.validate(data, connections)
James E. Blair47958382013-01-10 17:26:02 -0800406
James E. Blaireff88162013-07-01 12:44:14 -0400407 config_env = {}
James E. Blaire5a847f2012-07-10 15:29:14 -0700408 for include in data.get('includes', []):
409 if 'python-file' in include:
410 fn = include['python-file']
411 if not os.path.isabs(fn):
Antoine Musso9adc6d42014-11-14 15:37:48 +0100412 base = os.path.dirname(os.path.realpath(config_path))
James E. Blaire5a847f2012-07-10 15:29:14 -0700413 fn = os.path.join(base, fn)
414 fn = os.path.expanduser(fn)
James E. Blaireff88162013-07-01 12:44:14 -0400415 execfile(fn, config_env)
James E. Blair1e8dd892012-05-30 09:15:05 -0700416
James E. Blair4aea70c2012-07-26 14:23:24 -0700417 for conf_pipeline in data.get('pipelines', []):
418 pipeline = Pipeline(conf_pipeline['name'])
James E. Blair8dbd56a2012-12-22 10:55:10 -0800419 pipeline.description = conf_pipeline.get('description')
James E. Blairc0dedf82014-08-06 09:37:52 -0700420 # TODO(jeblair): remove backwards compatibility:
Joshua Hesketh352264b2015-08-11 23:42:08 +1000421 pipeline.source = self._getSourceDriver(
422 conf_pipeline.get('source', 'gerrit'))
James E. Blair64ed6f22013-07-10 14:07:23 -0700423 precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
424 pipeline.precedence = precedence
James E. Blair56370192013-01-14 15:47:28 -0800425 pipeline.failure_message = conf_pipeline.get('failure-message',
426 "Build failed.")
Joshua Heskethb7179772014-01-30 23:30:46 +1100427 pipeline.merge_failure_message = conf_pipeline.get(
Jeremy Stanley1c2c3c22015-06-15 21:23:19 +0000428 'merge-failure-message', "Merge Failed.\n\nThis change or one "
429 "of its cross-repo dependencies was unable to be "
430 "automatically merged with the current state of its "
431 "repository. Please rebase the change and upload a new "
Joshua Heskethb7179772014-01-30 23:30:46 +1100432 "patchset.")
James E. Blair56370192013-01-14 15:47:28 -0800433 pipeline.success_message = conf_pipeline.get('success-message',
434 "Build succeeded.")
Joshua Hesketh3979e3e2014-03-04 11:21:10 +1100435 pipeline.footer_message = conf_pipeline.get('footer-message', "")
James E. Blair2fa50962013-01-30 21:50:41 -0800436 pipeline.dequeue_on_new_patchset = conf_pipeline.get(
James E. Blair6736beb2013-07-11 15:18:15 -0700437 'dequeue-on-new-patchset', True)
James E. Blair17dd6772015-02-09 14:45:18 -0800438 pipeline.ignore_dependencies = conf_pipeline.get(
439 'ignore-dependencies', False)
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000440
Joshua Heskethde958652015-11-10 19:19:50 +1100441 for conf_key, action in self._reporter_actions.items():
442 reporter_set = []
443 if conf_pipeline.get(conf_key):
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000444 for reporter_name, params \
Joshua Heskethde958652015-11-10 19:19:50 +1100445 in conf_pipeline.get(conf_key).items():
Joshua Hesketh352264b2015-08-11 23:42:08 +1000446 reporter = self._getReporterDriver(reporter_name,
447 params)
Joshua Hesketh385d11e2015-09-14 14:50:01 -0600448 reporter.setAction(conf_key)
Joshua Heskethde958652015-11-10 19:19:50 +1100449 reporter_set.append(reporter)
450 setattr(pipeline, action, reporter_set)
451
452 # If merge-failure actions aren't explicit, use the failure actions
453 if not pipeline.merge_failure_actions:
454 pipeline.merge_failure_actions = pipeline.failure_actions
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000455
Joshua Hesketh89e829d2015-02-10 16:29:45 +1100456 pipeline.disable_at = conf_pipeline.get(
457 'disable-after-consecutive-failures', None)
458
Clark Boylan7603a372014-01-21 11:43:20 -0800459 pipeline.window = conf_pipeline.get('window', 20)
460 pipeline.window_floor = conf_pipeline.get('window-floor', 3)
461 pipeline.window_increase_type = conf_pipeline.get(
462 'window-increase-type', 'linear')
463 pipeline.window_increase_factor = conf_pipeline.get(
464 'window-increase-factor', 1)
465 pipeline.window_decrease_type = conf_pipeline.get(
466 'window-decrease-type', 'exponential')
467 pipeline.window_decrease_factor = conf_pipeline.get(
468 'window-decrease-factor', 2)
469
James E. Blair4aea70c2012-07-26 14:23:24 -0700470 manager = globals()[conf_pipeline['manager']](self, pipeline)
471 pipeline.setManager(manager)
James E. Blaireff88162013-07-01 12:44:14 -0400472 layout.pipelines[conf_pipeline['name']] = pipeline
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000473
Joshua Hesketh66c8e522014-06-26 15:30:08 +1000474 if 'require' in conf_pipeline or 'reject' in conf_pipeline:
475 require = conf_pipeline.get('require', {})
476 reject = conf_pipeline.get('reject', {})
Clark Boylana9702ad2014-05-08 17:17:24 -0700477 f = ChangeishFilter(
478 open=require.get('open'),
479 current_patchset=require.get('current-patchset'),
480 statuses=toList(require.get('status')),
Joshua Hesketh66c8e522014-06-26 15:30:08 +1000481 required_approvals=toList(require.get('approval')),
482 reject_approvals=toList(reject.get('approval'))
483 )
James E. Blair11041d22014-05-02 14:49:53 -0700484 manager.changeish_filters.append(f)
485
Joshua Hesketh352264b2015-08-11 23:42:08 +1000486 for trigger_name, trigger_config\
487 in conf_pipeline.get('trigger').items():
488 if trigger_name not in self.triggers.keys():
489 self.triggers[trigger_name] = \
490 self._getTriggerDriver(trigger_name, trigger_config)
491
492 for trigger_name, trigger in self.triggers.items():
493 if trigger_name in conf_pipeline['trigger']:
494 manager.event_filters += trigger.getEventFilters(
495 conf_pipeline['trigger'][trigger_name])
James E. Blairee743612012-05-29 14:49:32 -0700496
Antoine Musso80edd5a2013-02-13 15:37:53 +0100497 for project_template in data.get('project-templates', []):
498 # Make sure the template only contains valid pipelines
499 tpl = dict(
500 (pipe_name, project_template.get(pipe_name))
James E. Blaireff88162013-07-01 12:44:14 -0400501 for pipe_name in layout.pipelines.keys()
Antoine Musso80edd5a2013-02-13 15:37:53 +0100502 if pipe_name in project_template
503 )
James E. Blaireff88162013-07-01 12:44:14 -0400504 project_templates[project_template.get('name')] = tpl
Antoine Musso80edd5a2013-02-13 15:37:53 +0100505
James E. Blair47958382013-01-10 17:26:02 -0800506 for config_job in data.get('jobs', []):
James E. Blaireff88162013-07-01 12:44:14 -0400507 job = layout.getJob(config_job['name'])
James E. Blairb0954652012-06-01 11:32:01 -0700508 # Be careful to only set attributes explicitly present on
509 # this job, to avoid squashing attributes set by a meta-job.
James E. Blairc8a1e052014-02-25 09:29:26 -0800510 m = config_job.get('queue-name', None)
511 if m:
512 job.queue_name = m
James E. Blairb0954652012-06-01 11:32:01 -0700513 m = config_job.get('failure-message', None)
514 if m:
515 job.failure_message = m
516 m = config_job.get('success-message', None)
517 if m:
518 job.success_message = m
James E. Blair6aea36d2012-12-17 13:03:24 -0800519 m = config_job.get('failure-pattern', None)
520 if m:
521 job.failure_pattern = m
522 m = config_job.get('success-pattern', None)
523 if m:
524 job.success_pattern = m
James E. Blair222d4982012-07-16 09:31:19 -0700525 m = config_job.get('hold-following-changes', False)
526 if m:
527 job.hold_following_changes = True
James E. Blair4ec821f2012-08-23 15:28:28 -0700528 m = config_job.get('voting', None)
529 if m is not None:
530 job.voting = m
James E. Blairaf17a972016-02-03 15:07:18 -0800531 m = config_job.get('mutex', None)
532 if m is not None:
533 job.mutex = m
James E. Blair456f2fb2016-02-09 09:29:33 -0800534 tags = toList(config_job.get('tags'))
535 if tags:
536 # Tags are merged via a union rather than a
537 # destructive copy because they are intended to
538 # accumulate onto any previously applied tags from
539 # metajobs.
540 job.tags = job.tags.union(set(tags))
James E. Blaire5a847f2012-07-10 15:29:14 -0700541 fname = config_job.get('parameter-function', None)
542 if fname:
James E. Blaireff88162013-07-01 12:44:14 -0400543 func = config_env.get(fname, None)
James E. Blaire5a847f2012-07-10 15:29:14 -0700544 if not func:
545 raise Exception("Unable to find function %s" % fname)
546 job.parameter_function = func
James E. Blairee743612012-05-29 14:49:32 -0700547 branches = toList(config_job.get('branch'))
548 if branches:
James E. Blaire421a232012-07-25 16:59:21 -0700549 job._branches = branches
550 job.branches = [re.compile(x) for x in branches]
James E. Blair70c71582013-03-06 08:50:50 -0800551 files = toList(config_job.get('files'))
552 if files:
553 job._files = files
554 job.files = [re.compile(x) for x in files]
Maru Newby3fe5f852015-01-13 04:22:14 +0000555 skip_if_matcher = self._parseSkipIf(config_job)
556 if skip_if_matcher:
557 job.skip_if_matcher = skip_if_matcher
Joshua Hesketh36c3fa52014-01-22 11:40:52 +1100558 swift = toList(config_job.get('swift'))
559 if swift:
560 for s in swift:
561 job.swift[s['name']] = s
James E. Blairee743612012-05-29 14:49:32 -0700562
563 def add_jobs(job_tree, config_jobs):
564 for job in config_jobs:
565 if isinstance(job, list):
566 for x in job:
567 add_jobs(job_tree, x)
568 if isinstance(job, dict):
569 for parent, children in job.items():
James E. Blaireff88162013-07-01 12:44:14 -0400570 parent_tree = job_tree.addJob(layout.getJob(parent))
James E. Blairee743612012-05-29 14:49:32 -0700571 add_jobs(parent_tree, children)
572 if isinstance(job, str):
James E. Blaireff88162013-07-01 12:44:14 -0400573 job_tree.addJob(layout.getJob(job))
James E. Blairee743612012-05-29 14:49:32 -0700574
James E. Blair47958382013-01-10 17:26:02 -0800575 for config_project in data.get('projects', []):
James E. Blairee743612012-05-29 14:49:32 -0700576 project = Project(config_project['name'])
James E. Blairaea6cf62013-12-16 15:38:12 -0800577 shortname = config_project['name'].split('/')[-1]
Antoine Musso80edd5a2013-02-13 15:37:53 +0100578
James E. Blair3e98c022013-12-16 15:25:38 -0800579 # This is reversed due to the prepend operation below, so
580 # the ultimate order is templates (in order) followed by
581 # statically defined jobs.
582 for requested_template in reversed(
583 config_project.get('template', [])):
Antoine Musso80edd5a2013-02-13 15:37:53 +0100584 # Fetch the template from 'project-templates'
James E. Blaireff88162013-07-01 12:44:14 -0400585 tpl = project_templates.get(
Antoine Musso80edd5a2013-02-13 15:37:53 +0100586 requested_template.get('name'))
587 # Expand it with the project context
James E. Blairaea6cf62013-12-16 15:38:12 -0800588 requested_template['name'] = shortname
Antoine Musso80edd5a2013-02-13 15:37:53 +0100589 expanded = deep_format(tpl, requested_template)
James E. Blair3e98c022013-12-16 15:25:38 -0800590 # Finally merge the expansion with whatever has been
591 # already defined for this project. Prepend our new
592 # jobs to existing ones (which may have been
593 # statically defined or defined by other templates).
594 for pipeline in layout.pipelines.values():
595 if pipeline.name in expanded:
596 config_project.update(
597 {pipeline.name: expanded[pipeline.name] +
598 config_project.get(pipeline.name, [])})
Antoine Musso80edd5a2013-02-13 15:37:53 +0100599
James E. Blaireff88162013-07-01 12:44:14 -0400600 layout.projects[config_project['name']] = project
James E. Blair19deff22013-08-25 13:17:35 -0700601 mode = config_project.get('merge-mode', 'merge-resolve')
602 project.merge_mode = model.MERGER_MAP[mode]
James E. Blaireff88162013-07-01 12:44:14 -0400603 for pipeline in layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700604 if pipeline.name in config_project:
605 job_tree = pipeline.addProject(project)
606 config_jobs = config_project[pipeline.name]
James E. Blairee743612012-05-29 14:49:32 -0700607 add_jobs(job_tree, config_jobs)
James E. Blairee743612012-05-29 14:49:32 -0700608
James E. Blairb0954652012-06-01 11:32:01 -0700609 # All jobs should be defined at this point, get rid of
610 # metajobs so that getJob isn't doing anything weird.
James E. Blairc28d1b02013-07-19 11:37:06 -0700611 layout.metajobs = []
James E. Blairb0954652012-06-01 11:32:01 -0700612
James E. Blaireff88162013-07-01 12:44:14 -0400613 for pipeline in layout.pipelines.values():
614 pipeline.manager._postConfig(layout)
615
616 return layout
James E. Blairee743612012-05-29 14:49:32 -0700617
James E. Blairee743612012-05-29 14:49:32 -0700618 def setLauncher(self, launcher):
619 self.launcher = launcher
620
James E. Blair4076e2b2014-01-28 12:42:20 -0800621 def setMerger(self, merger):
622 self.merger = merger
623
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +0000624 def getProject(self, name, create_foreign=False):
James E. Blaircdccd972013-07-01 12:10:22 -0700625 self.layout_lock.acquire()
626 p = None
627 try:
628 p = self.layout.projects.get(name)
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +0000629 if p is None and create_foreign:
630 self.log.info("Registering foreign project: %s" % name)
631 p = Project(name, foreign=True)
632 self.layout.projects[name] = p
James E. Blaircdccd972013-07-01 12:10:22 -0700633 finally:
634 self.layout_lock.release()
635 return p
636
James E. Blairee743612012-05-29 14:49:32 -0700637 def addEvent(self, event):
638 self.log.debug("Adding trigger event: %s" % event)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800639 try:
640 if statsd:
641 statsd.incr('gerrit.event.%s' % event.type)
642 except:
643 self.log.exception("Exception reporting event stats")
James E. Blairee743612012-05-29 14:49:32 -0700644 self.trigger_event_queue.put(event)
645 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800646 self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700647
James E. Blair11700c32012-07-05 17:50:05 -0700648 def onBuildStarted(self, build):
649 self.log.debug("Adding start event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800650 build.start_time = time.time()
James E. Blaira84f0e42014-02-06 07:09:22 -0800651 event = BuildStartedEvent(build)
652 self.result_event_queue.put(event)
James E. Blair11700c32012-07-05 17:50:05 -0700653 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800654 self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700655
James E. Blairf0358662015-07-20 15:19:12 -0700656 def onBuildCompleted(self, build, result):
657 self.log.debug("Adding complete event for build: %s result: %s" % (
658 build, result))
James E. Blair71e94122012-12-24 17:53:08 -0800659 build.end_time = time.time()
James E. Blairf0358662015-07-20 15:19:12 -0700660 # Note, as soon as the result is set, other threads may act
661 # upon this, even though the event hasn't been fully
662 # processed. Ensure that any other data from the event (eg,
663 # timing) is recorded before setting the result.
664 build.result = result
James E. Blair23ec1ba2013-01-04 18:06:10 -0800665 try:
James E. Blair66eeebf2013-07-27 17:44:32 -0700666 if statsd and build.pipeline:
667 jobname = build.job.name.replace('.', '_')
Timothy Chavezb2332082015-08-07 20:08:04 -0500668 key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
Bruno Tavaresf564b282015-10-15 15:20:51 -0300669 statsd.incr(key)
Timothy Chavezb2332082015-08-07 20:08:04 -0500670 for label in build.node_labels:
671 # Jenkins includes the node name in its list of labels, so
672 # we filter it out here, since that is not statistically
673 # interesting.
674 if label == build.node_name:
675 continue
676 dt = int((build.start_time - build.launch_time) * 1000)
James E. Blair50aacbc2015-11-17 14:09:59 -0800677 key = 'zuul.pipeline.%s.label.%s.wait_time' % (
678 build.pipeline.name, label)
Timothy Chavezb2332082015-08-07 20:08:04 -0500679 statsd.timing(key, dt)
James E. Blair66eeebf2013-07-27 17:44:32 -0700680 key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
681 jobname, build.result)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800682 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
683 dt = int((build.end_time - build.start_time) * 1000)
684 statsd.timing(key, dt)
685 statsd.incr(key)
James E. Blair50aacbc2015-11-17 14:09:59 -0800686
687 key = 'zuul.pipeline.%s.job.%s.wait_time' % (
688 build.pipeline.name, jobname)
689 dt = int((build.start_time - build.launch_time) * 1000)
690 statsd.timing(key, dt)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800691 except:
692 self.log.exception("Exception reporting runtime stats")
James E. Blaira84f0e42014-02-06 07:09:22 -0800693 event = BuildCompletedEvent(build)
694 self.result_event_queue.put(event)
James E. Blairee743612012-05-29 14:49:32 -0700695 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800696 self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700697
James E. Blair4076e2b2014-01-28 12:42:20 -0800698 def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
699 self.log.debug("Adding merge complete event for build set: %s" %
700 build_set)
701 event = MergeCompletedEvent(build_set, zuul_url,
702 merged, updated, commit)
703 self.result_event_queue.put(event)
704 self.wake_event.set()
705
James E. Blaire9d45c32012-05-31 09:56:45 -0700706 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700707 self.log.debug("Prepare to reconfigure")
James E. Blair468c8512013-12-06 13:27:19 -0800708 event = ReconfigureEvent(config)
709 self.management_event_queue.put(event)
James E. Blaire9d45c32012-05-31 09:56:45 -0700710 self.wake_event.set()
711 self.log.debug("Waiting for reconfiguration")
James E. Blair468c8512013-12-06 13:27:19 -0800712 event.wait()
James E. Blaire9d45c32012-05-31 09:56:45 -0700713 self.log.debug("Reconfiguration complete")
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400714 self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700715
James E. Blair36658cf2013-12-06 17:53:48 -0800716 def promote(self, pipeline_name, change_ids):
717 event = PromoteEvent(pipeline_name, change_ids)
718 self.management_event_queue.put(event)
719 self.wake_event.set()
720 self.log.debug("Waiting for promotion")
721 event.wait()
722 self.log.debug("Promotion complete")
723
James E. Blaird27a96d2014-07-10 13:25:13 -0700724 def enqueue(self, trigger_event):
725 event = EnqueueEvent(trigger_event)
726 self.management_event_queue.put(event)
727 self.wake_event.set()
728 self.log.debug("Waiting for enqueue")
729 event.wait()
730 self.log.debug("Enqueue complete")
731
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700732 def exit(self):
733 self.log.debug("Prepare to exit")
734 self._pause = True
735 self._exit = True
736 self.wake_event.set()
737 self.log.debug("Waiting for exit")
738
739 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700740 if self.config.has_option('zuul', 'state_dir'):
741 state_dir = os.path.expanduser(self.config.get('zuul',
742 'state_dir'))
743 else:
744 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700745 return os.path.join(state_dir, 'queue.pickle')
746
James E. Blairce8a2132016-05-19 15:21:52 -0700747 def _get_time_database_dir(self):
748 if self.config.has_option('zuul', 'state_dir'):
749 state_dir = os.path.expanduser(self.config.get('zuul',
750 'state_dir'))
751 else:
752 state_dir = '/var/lib/zuul'
753 d = os.path.join(state_dir, 'times')
754 if not os.path.exists(d):
755 os.mkdir(d)
756 return d
757
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700758 def _save_queue(self):
759 pickle_file = self._get_queue_pickle_file()
760 events = []
761 while not self.trigger_event_queue.empty():
762 events.append(self.trigger_event_queue.get())
763 self.log.debug("Queue length is %s" % len(events))
764 if events:
765 self.log.debug("Saving queue")
766 pickle.dump(events, open(pickle_file, 'wb'))
767
768 def _load_queue(self):
769 pickle_file = self._get_queue_pickle_file()
770 if os.path.exists(pickle_file):
771 self.log.debug("Loading queue")
772 events = pickle.load(open(pickle_file, 'rb'))
773 self.log.debug("Queue length is %s" % len(events))
774 for event in events:
775 self.trigger_event_queue.put(event)
776 else:
777 self.log.debug("No queue file found")
778
779 def _delete_queue(self):
780 pickle_file = self._get_queue_pickle_file()
781 if os.path.exists(pickle_file):
782 self.log.debug("Deleting saved queue")
783 os.unlink(pickle_file)
784
785 def resume(self):
786 try:
787 self._load_queue()
788 except:
789 self.log.exception("Unable to load queue")
790 try:
791 self._delete_queue()
792 except:
793 self.log.exception("Unable to delete saved queue")
794 self.log.debug("Resuming queue processing")
795 self.wake_event.set()
796
797 def _doPauseEvent(self):
798 if self._exit:
799 self.log.debug("Exiting")
800 self._save_queue()
801 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700802
James E. Blair468c8512013-12-06 13:27:19 -0800803 def _doReconfigureEvent(self, event):
804 # This is called in the scheduler loop after another thread submits
805 # a request
James E. Blaircdccd972013-07-01 12:10:22 -0700806 self.layout_lock.acquire()
James E. Blair468c8512013-12-06 13:27:19 -0800807 self.config = event.config
James E. Blaircdccd972013-07-01 12:10:22 -0700808 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700809 self.log.debug("Performing reconfiguration")
Joshua Hesketh352264b2015-08-11 23:42:08 +1000810 self._unloadDrivers()
James E. Blaircdccd972013-07-01 12:10:22 -0700811 layout = self._parseConfig(
Joshua Hesketh352264b2015-08-11 23:42:08 +1000812 self.config.get('zuul', 'layout_config'), self.connections)
James E. Blaircdccd972013-07-01 12:10:22 -0700813 for name, new_pipeline in layout.pipelines.items():
814 old_pipeline = self.layout.pipelines.get(name)
815 if not old_pipeline:
816 if self.layout.pipelines:
817 # Don't emit this warning on startup
818 self.log.warning("No old pipeline matching %s found "
819 "when reconfiguring" % name)
820 continue
James E. Blairdad52252014-02-07 16:59:17 -0800821 self.log.debug("Re-enqueueing changes for pipeline %s" % name)
James E. Blaircdccd972013-07-01 12:10:22 -0700822 items_to_remove = []
James E. Blair400e8fd2015-07-30 17:44:45 -0700823 builds_to_cancel = []
James E. Blairbfb8e042014-12-30 17:01:44 -0800824 last_head = None
James E. Blaircdccd972013-07-01 12:10:22 -0700825 for shared_queue in old_pipeline.queues:
James E. Blair972e3c72013-08-29 12:04:55 -0700826 for item in shared_queue.queue:
James E. Blairbfb8e042014-12-30 17:01:44 -0800827 if not item.item_ahead:
828 last_head = item
James E. Blaircdccd972013-07-01 12:10:22 -0700829 item.item_ahead = None
James E. Blair972e3c72013-08-29 12:04:55 -0700830 item.items_behind = []
James E. Blaircdccd972013-07-01 12:10:22 -0700831 item.pipeline = None
James E. Blairbfb8e042014-12-30 17:01:44 -0800832 item.queue = None
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +0000833 project_name = item.change.project.name
834 item.change.project = layout.projects.get(project_name)
835 if not item.change.project:
836 self.log.debug("Project %s not defined, "
837 "re-instantiating as foreign" %
838 project_name)
839 project = Project(project_name, foreign=True)
840 layout.projects[project_name] = project
841 item.change.project = project
James E. Blairfe707d12015-08-05 15:18:15 -0700842 item_jobs = new_pipeline.getJobs(item)
James E. Blairdad52252014-02-07 16:59:17 -0800843 for build in item.current_build_set.getBuilds():
James E. Blair6b077942014-02-07 17:45:55 -0800844 job = layout.jobs.get(build.job.name)
James E. Blairfe707d12015-08-05 15:18:15 -0700845 if job and job in item_jobs:
James E. Blair6b077942014-02-07 17:45:55 -0800846 build.job = job
847 else:
James E. Blair400e8fd2015-07-30 17:44:45 -0700848 item.removeBuild(build)
849 builds_to_cancel.append(build)
James E. Blairbfb8e042014-12-30 17:01:44 -0800850 if not new_pipeline.manager.reEnqueueItem(item,
851 last_head):
James E. Blaircdccd972013-07-01 12:10:22 -0700852 items_to_remove.append(item)
James E. Blair6b077942014-02-07 17:45:55 -0800853 for item in items_to_remove:
854 for build in item.current_build_set.getBuilds():
James E. Blair400e8fd2015-07-30 17:44:45 -0700855 builds_to_cancel.append(build)
856 for build in builds_to_cancel:
James E. Blair6b077942014-02-07 17:45:55 -0800857 self.log.warning(
858 "Canceling build %s during reconfiguration" % (build,))
James E. Blairdad52252014-02-07 16:59:17 -0800859 try:
860 self.launcher.cancel(build)
861 except Exception:
862 self.log.exception(
863 "Exception while canceling build %s "
864 "for change %s" % (build, item.change))
James E. Blaircdccd972013-07-01 12:10:22 -0700865 self.layout = layout
Joshua Hesketh4bd7da32016-02-17 20:58:47 +1100866 self.maintainConnectionCache()
James E. Blair63bb0ef2013-07-29 17:14:51 -0700867 for trigger in self.triggers.values():
868 trigger.postConfig()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000869 for pipeline in self.layout.pipelines.values():
870 pipeline.source.postConfig()
Joshua Heskethde958652015-11-10 19:19:50 +1100871 for action in self._reporter_actions.values():
872 for reporter in pipeline.__getattribute__(action):
873 reporter.postConfig()
James E. Blair3cb10702013-08-24 08:56:03 -0700874 if statsd:
875 try:
876 for pipeline in self.layout.pipelines.values():
877 items = len(pipeline.getAllItems())
878 # stats.gauges.zuul.pipeline.NAME.current_changes
879 key = 'zuul.pipeline.%s' % pipeline.name
880 statsd.gauge(key + '.current_changes', items)
881 except Exception:
882 self.log.exception("Exception reporting initial "
883 "pipeline stats:")
James E. Blaircdccd972013-07-01 12:10:22 -0700884 finally:
885 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700886
James E. Blair36658cf2013-12-06 17:53:48 -0800887 def _doPromoteEvent(self, event):
888 pipeline = self.layout.pipelines[event.pipeline_name]
889 change_ids = [c.split(',') for c in event.change_ids]
890 items_to_enqueue = []
891 change_queue = None
892 for shared_queue in pipeline.queues:
893 if change_queue:
894 break
895 for item in shared_queue.queue:
896 if (item.change.number == change_ids[0][0] and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000897 item.change.patchset == change_ids[0][1]):
James E. Blair36658cf2013-12-06 17:53:48 -0800898 change_queue = shared_queue
899 break
900 if not change_queue:
901 raise Exception("Unable to find shared change queue for %s" %
902 event.change_ids[0])
903 for number, patchset in change_ids:
904 found = False
905 for item in change_queue.queue:
906 if (item.change.number == number and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000907 item.change.patchset == patchset):
James E. Blair36658cf2013-12-06 17:53:48 -0800908 found = True
909 items_to_enqueue.append(item)
910 break
911 if not found:
912 raise Exception("Unable to find %s,%s in queue %s" %
913 (number, patchset, change_queue))
914 for item in change_queue.queue[:]:
915 if item not in items_to_enqueue:
916 items_to_enqueue.append(item)
917 pipeline.manager.cancelJobs(item)
918 pipeline.manager.dequeueItem(item)
919 for item in items_to_enqueue:
Sean Daguef39b9ca2014-01-10 21:34:35 -0500920 pipeline.manager.addChange(
921 item.change,
922 enqueue_time=item.enqueue_time,
James E. Blairf9ab8842014-07-10 13:12:07 -0700923 quiet=True,
924 ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800925
James E. Blaird27a96d2014-07-10 13:25:13 -0700926 def _doEnqueueEvent(self, event):
927 project = self.layout.projects.get(event.project_name)
928 pipeline = self.layout.pipelines[event.forced_pipeline]
James E. Blairc0dedf82014-08-06 09:37:52 -0700929 change = pipeline.source.getChange(event, project)
James E. Blaird27a96d2014-07-10 13:25:13 -0700930 self.log.debug("Event %s for change %s was directly assigned "
931 "to pipeline %s" % (event, change, self))
932 self.log.info("Adding %s, %s to %s" %
933 (project, change, pipeline))
934 pipeline.manager.addChange(change, ignore_requirements=True)
935
James E. Blaire9d45c32012-05-31 09:56:45 -0700936 def _areAllBuildsComplete(self):
937 self.log.debug("Checking if all builds are complete")
938 waiting = False
James E. Blair4076e2b2014-01-28 12:42:20 -0800939 if self.merger.areMergesOutstanding():
940 waiting = True
James E. Blaireff88162013-07-01 12:44:14 -0400941 for pipeline in self.layout.pipelines.values():
James E. Blair6b077942014-02-07 17:45:55 -0800942 for item in pipeline.getAllItems():
943 for build in item.current_build_set.getBuilds():
944 if build.result is None:
945 self.log.debug("%s waiting on %s" %
946 (pipeline.manager, build))
947 waiting = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700948 if not waiting:
949 self.log.debug("All builds are complete")
950 return True
951 self.log.debug("All builds are not complete")
952 return False
953
James E. Blairee743612012-05-29 14:49:32 -0700954 def run(self):
James E. Blair71e94122012-12-24 17:53:08 -0800955 if statsd:
956 self.log.debug("Statsd enabled")
957 else:
958 self.log.debug("Statsd disabled because python statsd "
959 "package not found")
James E. Blairee743612012-05-29 14:49:32 -0700960 while True:
961 self.log.debug("Run handler sleeping")
962 self.wake_event.wait()
963 self.wake_event.clear()
James E. Blairb0fcae42012-07-17 11:12:10 -0700964 if self._stopped:
James E. Blair4076e2b2014-01-28 12:42:20 -0800965 self.log.debug("Run handler stopping")
James E. Blairb0fcae42012-07-17 11:12:10 -0700966 return
James E. Blairee743612012-05-29 14:49:32 -0700967 self.log.debug("Run handler awake")
James E. Blaira84f0e42014-02-06 07:09:22 -0800968 self.run_handler_lock.acquire()
James E. Blairee743612012-05-29 14:49:32 -0700969 try:
James E. Blaira84f0e42014-02-06 07:09:22 -0800970 while not self.management_event_queue.empty():
James E. Blair468c8512013-12-06 13:27:19 -0800971 self.process_management_queue()
James E. Blaircdccd972013-07-01 12:10:22 -0700972
James E. Blair263fba92013-02-27 13:07:19 -0800973 # Give result events priority -- they let us stop builds,
974 # whereas trigger evensts cause us to launch builds.
James E. Blaira84f0e42014-02-06 07:09:22 -0800975 while not self.result_event_queue.empty():
James E. Blairee743612012-05-29 14:49:32 -0700976 self.process_result_queue()
James E. Blaira84f0e42014-02-06 07:09:22 -0800977
978 if not self._pause:
979 while not self.trigger_event_queue.empty():
James E. Blair263fba92013-02-27 13:07:19 -0800980 self.process_event_queue()
James E. Blaire9d45c32012-05-31 09:56:45 -0700981
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700982 if self._pause and self._areAllBuildsComplete():
983 self._doPauseEvent()
James E. Blaire9d45c32012-05-31 09:56:45 -0700984
James E. Blaira84f0e42014-02-06 07:09:22 -0800985 for pipeline in self.layout.pipelines.values():
986 while pipeline.manager.processQueue():
987 pass
James E. Blair0e933c52013-07-11 10:18:52 -0700988
James E. Blaira84f0e42014-02-06 07:09:22 -0800989 except Exception:
James E. Blairee743612012-05-29 14:49:32 -0700990 self.log.exception("Exception in run handler:")
James E. Blaira84f0e42014-02-06 07:09:22 -0800991 # There may still be more events to process
992 self.wake_event.set()
993 finally:
994 self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700995
Joshua Hesketh4bd7da32016-02-17 20:58:47 +1100996 def maintainConnectionCache(self):
James E. Blair0e933c52013-07-11 10:18:52 -0700997 relevant = set()
998 for pipeline in self.layout.pipelines.values():
Joshua Hesketh4bd7da32016-02-17 20:58:47 +1100999 self.log.debug("Gather relevant cache items for: %s" % pipeline)
James E. Blair0e933c52013-07-11 10:18:52 -07001000 for item in pipeline.getAllItems():
1001 relevant.add(item.change)
1002 relevant.update(item.change.getRelatedChanges())
Joshua Hesketh4bd7da32016-02-17 20:58:47 +11001003 for connection in self.connections.values():
1004 connection.maintainCache(relevant)
1005 self.log.debug(
1006 "End maintain connection cache for: %s" % connection)
1007 self.log.debug("Connection cache size: %s" % len(relevant))
James E. Blair0e933c52013-07-11 10:18:52 -07001008
James E. Blairee743612012-05-29 14:49:32 -07001009 def process_event_queue(self):
1010 self.log.debug("Fetching trigger event")
1011 event = self.trigger_event_queue.get()
1012 self.log.debug("Processing trigger event %s" % event)
James E. Blaira84f0e42014-02-06 07:09:22 -08001013 try:
1014 project = self.layout.projects.get(event.project_name)
James E. Blaira84f0e42014-02-06 07:09:22 -08001015
James E. Blaira84f0e42014-02-06 07:09:22 -08001016 for pipeline in self.layout.pipelines.values():
Joshua Hesketh352264b2015-08-11 23:42:08 +10001017 # Get the change even if the project is unknown to us for the
1018 # use of updating the cache if there is another change
1019 # depending on this foreign one.
1020 try:
1021 change = pipeline.source.getChange(event, project)
1022 except exceptions.ChangeNotFound as e:
1023 self.log.debug("Unable to get change %s from source %s. "
1024 "(most likely looking for a change from "
1025 "another connection trigger)",
1026 e.change, pipeline.source)
1027 continue
1028 if not project or project.foreign:
1029 self.log.debug("Project %s not found" % event.project_name)
1030 continue
James E. Blaira84f0e42014-02-06 07:09:22 -08001031 if event.type == 'patchset-created':
1032 pipeline.manager.removeOldVersionsOfChange(change)
Antoine Mussobd86a312014-01-08 14:51:33 +01001033 elif event.type == 'change-abandoned':
1034 pipeline.manager.removeAbandonedChange(change)
James E. Blaira84f0e42014-02-06 07:09:22 -08001035 if pipeline.manager.eventMatches(event, change):
1036 self.log.info("Adding %s, %s to %s" %
1037 (project, change, pipeline))
1038 pipeline.manager.addChange(change)
1039 finally:
James E. Blairff791972013-01-09 11:45:43 -08001040 self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -07001041
James E. Blair468c8512013-12-06 13:27:19 -08001042 def process_management_queue(self):
1043 self.log.debug("Fetching management event")
1044 event = self.management_event_queue.get()
1045 self.log.debug("Processing management event %s" % event)
James E. Blair36658cf2013-12-06 17:53:48 -08001046 try:
1047 if isinstance(event, ReconfigureEvent):
1048 self._doReconfigureEvent(event)
1049 elif isinstance(event, PromoteEvent):
1050 self._doPromoteEvent(event)
James E. Blaird27a96d2014-07-10 13:25:13 -07001051 elif isinstance(event, EnqueueEvent):
1052 self._doEnqueueEvent(event.trigger_event)
James E. Blair36658cf2013-12-06 17:53:48 -08001053 else:
1054 self.log.error("Unable to handle event %s" % event)
1055 event.done()
1056 except Exception as e:
1057 event.exception(e, sys.exc_info()[2])
James E. Blair468c8512013-12-06 13:27:19 -08001058 self.management_event_queue.task_done()
1059
James E. Blairee743612012-05-29 14:49:32 -07001060 def process_result_queue(self):
1061 self.log.debug("Fetching result event")
James E. Blaira84f0e42014-02-06 07:09:22 -08001062 event = self.result_event_queue.get()
1063 self.log.debug("Processing result event %s" % event)
1064 try:
1065 if isinstance(event, BuildStartedEvent):
1066 self._doBuildStartedEvent(event)
1067 elif isinstance(event, BuildCompletedEvent):
1068 self._doBuildCompletedEvent(event)
James E. Blair4076e2b2014-01-28 12:42:20 -08001069 elif isinstance(event, MergeCompletedEvent):
1070 self._doMergeCompletedEvent(event)
James E. Blaira84f0e42014-02-06 07:09:22 -08001071 else:
1072 self.log.error("Unable to handle event %s" % event)
1073 finally:
1074 self.result_event_queue.task_done()
1075
1076 def _doBuildStartedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -08001077 build = event.build
1078 if build.build_set is not build.build_set.item.current_build_set:
1079 self.log.warning("Build %s is not in the current build set" %
1080 (build,))
1081 return
1082 pipeline = build.build_set.item.pipeline
1083 if not pipeline:
1084 self.log.warning("Build %s is not associated with a pipeline" %
1085 (build,))
1086 return
James E. Blairce8a2132016-05-19 15:21:52 -07001087 try:
1088 build.estimated_time = float(self.time_database.getEstimatedTime(
1089 build.job.name))
1090 except Exception:
1091 self.log.exception("Exception estimating build time:")
James E. Blair4076e2b2014-01-28 12:42:20 -08001092 pipeline.manager.onBuildStarted(event.build)
James E. Blaira84f0e42014-02-06 07:09:22 -08001093
1094 def _doBuildCompletedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -08001095 build = event.build
1096 if build.build_set is not build.build_set.item.current_build_set:
1097 self.log.warning("Build %s is not in the current build set" %
1098 (build,))
1099 return
1100 pipeline = build.build_set.item.pipeline
1101 if not pipeline:
1102 self.log.warning("Build %s is not associated with a pipeline" %
1103 (build,))
1104 return
James E. Blairce8a2132016-05-19 15:21:52 -07001105 if build.end_time and build.start_time and build.result:
1106 duration = build.end_time - build.start_time
1107 try:
1108 self.time_database.update(build.job.name, duration, build.result)
1109 except Exception:
1110 self.log.exception("Exception recording build time:")
James E. Blair4076e2b2014-01-28 12:42:20 -08001111 pipeline.manager.onBuildCompleted(event.build)
1112
1113 def _doMergeCompletedEvent(self, event):
1114 build_set = event.build_set
1115 if build_set is not build_set.item.current_build_set:
1116 self.log.warning("Build set %s is not current" % (build_set,))
1117 return
1118 pipeline = build_set.item.pipeline
1119 if not pipeline:
1120 self.log.warning("Build set %s is not associated with a pipeline" %
1121 (build_set,))
1122 return
1123 pipeline.manager.onMergeCompleted(event)
James E. Blairee743612012-05-29 14:49:32 -07001124
James E. Blair8dbd56a2012-12-22 10:55:10 -08001125 def formatStatusJSON(self):
James E. Blairb7273ef2016-04-19 08:58:51 -07001126 if self.config.has_option('zuul', 'url_pattern'):
1127 url_pattern = self.config.get('zuul', 'url_pattern')
1128 else:
1129 url_pattern = None
1130
James E. Blair8dbd56a2012-12-22 10:55:10 -08001131 data = {}
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +04001132
1133 data['zuul_version'] = self.zuul_version
1134
James E. Blair8dbd56a2012-12-22 10:55:10 -08001135 if self._pause:
1136 ret = '<p><b>Queue only mode:</b> preparing to '
James E. Blair8dbd56a2012-12-22 10:55:10 -08001137 if self._exit:
1138 ret += 'exit'
1139 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
1140 ret += '</p>'
1141 data['message'] = ret
1142
James E. Blairfb682cc2013-02-26 15:23:27 -08001143 data['trigger_event_queue'] = {}
1144 data['trigger_event_queue']['length'] = \
1145 self.trigger_event_queue.qsize()
1146 data['result_event_queue'] = {}
1147 data['result_event_queue']['length'] = \
1148 self.result_event_queue.qsize()
1149
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +04001150 if self.last_reconfigured:
1151 data['last_reconfigured'] = self.last_reconfigured * 1000
1152
James E. Blair8dbd56a2012-12-22 10:55:10 -08001153 pipelines = []
1154 data['pipelines'] = pipelines
Alex Gaynorfda4c352014-06-04 11:15:26 -07001155 for pipeline in self.layout.pipelines.values():
James E. Blairb7273ef2016-04-19 08:58:51 -07001156 pipelines.append(pipeline.formatStatusJSON(url_pattern))
James E. Blair8dbd56a2012-12-22 10:55:10 -08001157 return json.dumps(data)
1158
James E. Blair1e8dd892012-05-30 09:15:05 -07001159
James E. Blair4aea70c2012-07-26 14:23:24 -07001160class BasePipelineManager(object):
1161 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -07001162
James E. Blair4aea70c2012-07-26 14:23:24 -07001163 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -07001164 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -07001165 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -07001166 self.event_filters = []
James E. Blair11041d22014-05-02 14:49:53 -07001167 self.changeish_filters = []
James E. Blairee743612012-05-29 14:49:32 -07001168
1169 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -07001170 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -07001171
James E. Blaireff88162013-07-01 12:44:14 -04001172 def _postConfig(self, layout):
James E. Blair4aea70c2012-07-26 14:23:24 -07001173 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairc0dedf82014-08-06 09:37:52 -07001174 self.log.info(" Source: %s" % self.pipeline.source)
James E. Blair11041d22014-05-02 14:49:53 -07001175 self.log.info(" Requirements:")
1176 for f in self.changeish_filters:
1177 self.log.info(" %s" % f)
James E. Blairee743612012-05-29 14:49:32 -07001178 self.log.info(" Events:")
1179 for e in self.event_filters:
1180 self.log.info(" %s" % e)
1181 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -07001182
James E. Blairee743612012-05-29 14:49:32 -07001183 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -07001184 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -07001185 if tree.job:
1186 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -07001187 for b in tree.job._branches:
1188 efilters += str(b)
James E. Blair70c71582013-03-06 08:50:50 -08001189 for f in tree.job._files:
1190 efilters += str(f)
Maru Newby3fe5f852015-01-13 04:22:14 +00001191 if tree.job.skip_if_matcher:
1192 efilters += str(tree.job.skip_if_matcher)
James E. Blairee743612012-05-29 14:49:32 -07001193 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -07001194 efilters = ' ' + efilters
James E. Blairaf17a972016-02-03 15:07:18 -08001195 tags = []
James E. Blair222d4982012-07-16 09:31:19 -07001196 if tree.job.hold_following_changes:
James E. Blairaf17a972016-02-03 15:07:18 -08001197 tags.append('[hold]')
James E. Blair4ec821f2012-08-23 15:28:28 -07001198 if not tree.job.voting:
James E. Blairaf17a972016-02-03 15:07:18 -08001199 tags.append('[nonvoting]')
1200 if tree.job.mutex:
1201 tags.append('[mutex: %s]' % tree.job.mutex)
1202 tags = ' '.join(tags)
1203 self.log.info("%s%s%s %s" % (istr, repr(tree.job),
1204 efilters, tags))
James E. Blairee743612012-05-29 14:49:32 -07001205 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -07001206 log_jobs(x, indent + 2)
1207
James E. Blaireff88162013-07-01 12:44:14 -04001208 for p in layout.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -07001209 tree = self.pipeline.getJobTree(p)
1210 if tree:
James E. Blairee743612012-05-29 14:49:32 -07001211 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -07001212 log_jobs(tree)
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001213 self.log.info(" On start:")
1214 self.log.info(" %s" % self.pipeline.start_actions)
1215 self.log.info(" On success:")
1216 self.log.info(" %s" % self.pipeline.success_actions)
1217 self.log.info(" On failure:")
1218 self.log.info(" %s" % self.pipeline.failure_actions)
Joshua Heskethb7179772014-01-30 23:30:46 +11001219 self.log.info(" On merge-failure:")
1220 self.log.info(" %s" % self.pipeline.merge_failure_actions)
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001221 self.log.info(" When disabled:")
1222 self.log.info(" %s" % self.pipeline.disabled_actions)
James E. Blairee743612012-05-29 14:49:32 -07001223
James E. Blaire421a232012-07-25 16:59:21 -07001224 def getSubmitAllowNeeds(self):
1225 # Get a list of code review labels that are allowed to be
1226 # "needed" in the submit records for a change, with respect
1227 # to this queue. In other words, the list of review labels
1228 # this queue itself is likely to set before submitting.
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001229 allow_needs = set()
1230 for action_reporter in self.pipeline.success_actions:
1231 allow_needs.update(action_reporter.getSubmitAllowNeeds())
1232 return allow_needs
James E. Blaire421a232012-07-25 16:59:21 -07001233
James E. Blairc053d022014-01-22 14:57:33 -08001234 def eventMatches(self, event, change):
James E. Blairad28e912013-11-27 10:43:22 -08001235 if event.forced_pipeline:
1236 if event.forced_pipeline == self.pipeline.name:
James E. Blair1b265312014-06-24 09:35:21 -07001237 self.log.debug("Event %s for change %s was directly assigned "
1238 "to pipeline %s" % (event, change, self))
James E. Blairad28e912013-11-27 10:43:22 -08001239 return True
1240 else:
1241 return False
James E. Blairee743612012-05-29 14:49:32 -07001242 for ef in self.event_filters:
James E. Blairc053d022014-01-22 14:57:33 -08001243 if ef.matches(event, change):
James E. Blair1b265312014-06-24 09:35:21 -07001244 self.log.debug("Event %s for change %s matched %s "
1245 "in pipeline %s" % (event, change, ef, self))
James E. Blairee743612012-05-29 14:49:32 -07001246 return True
1247 return False
1248
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001249 def isChangeAlreadyInPipeline(self, change):
1250 # Checks live items in the pipeline
1251 for item in self.pipeline.getAllItems():
1252 if item.live and change.equals(item.change):
1253 return True
1254 return False
1255
1256 def isChangeAlreadyInQueue(self, change, change_queue):
1257 # Checks any item in the specified change queue
1258 for item in change_queue.queue:
1259 if change.equals(item.change):
James E. Blair0dc8ba92012-07-16 14:23:52 -07001260 return True
1261 return False
1262
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001263 def reportStart(self, item):
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001264 if not self.pipeline._disabled:
1265 try:
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001266 self.log.info("Reporting start, action %s item %s" %
1267 (self.pipeline.start_actions, item))
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001268 ret = self.sendReport(self.pipeline.start_actions,
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001269 self.pipeline.source, item)
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001270 if ret:
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001271 self.log.error("Reporting item start %s received: %s" %
1272 (item, ret))
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001273 except:
1274 self.log.exception("Exception while reporting start:")
James E. Blaire0487072012-08-29 17:38:31 -07001275
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001276 def sendReport(self, action_reporters, source, item,
1277 message=None):
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001278 """Sends the built message off to configured reporters.
1279
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001280 Takes the action_reporters, item, message and extra options and
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001281 sends them to the pluggable reporters.
1282 """
1283 report_errors = []
1284 if len(action_reporters) > 0:
Joshua Heskethde958652015-11-10 19:19:50 +11001285 for reporter in action_reporters:
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001286 ret = reporter.report(source, self.pipeline, item)
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001287 if ret:
1288 report_errors.append(ret)
1289 if len(report_errors) == 0:
1290 return
1291 return report_errors
1292
James E. Blaire0487072012-08-29 17:38:31 -07001293 def isChangeReadyToBeEnqueued(self, change):
1294 return True
1295
James E. Blair5ee24252014-12-30 10:12:29 -08001296 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
1297 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001298 return True
1299
James E. Blair5ee24252014-12-30 10:12:29 -08001300 def enqueueChangesBehind(self, change, quiet, ignore_requirements,
1301 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001302 return True
1303
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001304 def checkForChangesNeededBy(self, change, change_queue):
James E. Blairfee8d652013-06-07 08:57:52 -07001305 return True
1306
James E. Blair6965a4b2014-12-16 17:19:04 -08001307 def getFailingDependentItems(self, item):
James E. Blair972e3c72013-08-29 12:04:55 -07001308 return None
1309
James E. Blairfee8d652013-06-07 08:57:52 -07001310 def getDependentItems(self, item):
1311 orig_item = item
1312 items = []
1313 while item.item_ahead:
1314 items.append(item.item_ahead)
1315 item = item.item_ahead
1316 self.log.info("Change %s depends on changes %s" %
1317 (orig_item.change,
1318 [x.change for x in items]))
1319 return items
1320
James E. Blair972e3c72013-08-29 12:04:55 -07001321 def getItemForChange(self, change):
1322 for item in self.pipeline.getAllItems():
1323 if item.change.equals(change):
1324 return item
1325 return None
1326
James E. Blair2fa50962013-01-30 21:50:41 -08001327 def findOldVersionOfChangeAlreadyInQueue(self, change):
James E. Blairba437362015-02-07 11:41:52 -08001328 for item in self.pipeline.getAllItems():
1329 if not item.live:
1330 continue
1331 if change.isUpdateOf(item.change):
1332 return item
James E. Blair2fa50962013-01-30 21:50:41 -08001333 return None
1334
1335 def removeOldVersionsOfChange(self, change):
1336 if not self.pipeline.dequeue_on_new_patchset:
1337 return
James E. Blairba437362015-02-07 11:41:52 -08001338 old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
1339 if old_item:
James E. Blair2fa50962013-01-30 21:50:41 -08001340 self.log.debug("Change %s is a new version of %s, removing %s" %
James E. Blairba437362015-02-07 11:41:52 -08001341 (change, old_item.change, old_item))
1342 self.removeItem(old_item)
James E. Blair2fa50962013-01-30 21:50:41 -08001343
Antoine Mussobd86a312014-01-08 14:51:33 +01001344 def removeAbandonedChange(self, change):
1345 self.log.debug("Change %s abandoned, removing." % change)
James E. Blairba437362015-02-07 11:41:52 -08001346 for item in self.pipeline.getAllItems():
1347 if not item.live:
1348 continue
1349 if item.change.equals(change):
1350 self.removeItem(item)
Antoine Mussobd86a312014-01-08 14:51:33 +01001351
James E. Blairbfb8e042014-12-30 17:01:44 -08001352 def reEnqueueItem(self, item, last_head):
James E. Blair0577cd62015-02-07 11:42:12 -08001353 with self.getChangeQueue(item.change, last_head.queue) as change_queue:
1354 if change_queue:
1355 self.log.debug("Re-enqueing change %s in queue %s" %
1356 (item.change, change_queue))
1357 change_queue.enqueueItem(item)
James E. Blair6bc782d2015-07-17 16:20:21 -07001358
1359 # Re-set build results in case any new jobs have been
1360 # added to the tree.
1361 for build in item.current_build_set.getBuilds():
1362 if build.result:
1363 self.pipeline.setResult(item, build)
1364 # Similarly, reset the item state.
1365 if item.current_build_set.unable_to_merge:
1366 self.pipeline.setUnableToMerge(item)
1367 if item.dequeued_needing_change:
1368 self.pipeline.setDequeuedNeedingChange(item)
1369
James E. Blair0577cd62015-02-07 11:42:12 -08001370 self.reportStats(item)
1371 return True
1372 else:
1373 self.log.error("Unable to find change queue for project %s" %
1374 item.change.project)
1375 return False
James E. Blaircdccd972013-07-01 12:10:22 -07001376
James E. Blairf9ab8842014-07-10 13:12:07 -07001377 def addChange(self, change, quiet=False, enqueue_time=None,
James E. Blairbfb8e042014-12-30 17:01:44 -08001378 ignore_requirements=False, live=True,
1379 change_queue=None):
James E. Blaire0487072012-08-29 17:38:31 -07001380 self.log.debug("Considering adding change %s" % change)
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001381
1382 # If we are adding a live change, check if it's a live item
1383 # anywhere in the pipeline. Otherwise, we will perform the
1384 # duplicate check below on the specific change_queue.
1385 if live and self.isChangeAlreadyInPipeline(change):
1386 self.log.debug("Change %s is already in pipeline, "
1387 "ignoring" % change)
James E. Blaire0487072012-08-29 17:38:31 -07001388 return True
James E. Blair692c6b32012-07-17 11:16:35 -07001389
James E. Blaire0487072012-08-29 17:38:31 -07001390 if not self.isChangeReadyToBeEnqueued(change):
1391 self.log.debug("Change %s is not ready to be enqueued, ignoring" %
1392 change)
1393 return False
1394
James E. Blairf9ab8842014-07-10 13:12:07 -07001395 if not ignore_requirements:
1396 for f in self.changeish_filters:
1397 if not f.matches(change):
1398 self.log.debug("Change %s does not match pipeline "
1399 "requirement %s" % (change, f))
1400 return False
James E. Blair11041d22014-05-02 14:49:53 -07001401
James E. Blair0577cd62015-02-07 11:42:12 -08001402 with self.getChangeQueue(change, change_queue) as change_queue:
James E. Blair5ee24252014-12-30 10:12:29 -08001403 if not change_queue:
1404 self.log.debug("Unable to find change queue for "
1405 "change %s in project %s" %
1406 (change, change.project))
1407 return False
1408
James E. Blair0577cd62015-02-07 11:42:12 -08001409 if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
1410 change_queue):
1411 self.log.debug("Failed to enqueue changes "
1412 "ahead of %s" % change)
1413 return False
James E. Blaire0487072012-08-29 17:38:31 -07001414
James E. Blair0577cd62015-02-07 11:42:12 -08001415 if self.isChangeAlreadyInQueue(change, change_queue):
1416 self.log.debug("Change %s is already in queue, "
1417 "ignoring" % change)
1418 return True
1419
1420 self.log.debug("Adding change %s to queue %s" %
1421 (change, change_queue))
James E. Blair0577cd62015-02-07 11:42:12 -08001422 item = change_queue.enqueueChange(change)
1423 if enqueue_time:
1424 item.enqueue_time = enqueue_time
1425 item.live = live
1426 self.reportStats(item)
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001427 if not quiet:
1428 if len(self.pipeline.start_actions) > 0:
1429 self.reportStart(item)
James E. Blair0577cd62015-02-07 11:42:12 -08001430 self.enqueueChangesBehind(change, quiet, ignore_requirements,
1431 change_queue)
Joshua Hesketh352264b2015-08-11 23:42:08 +10001432 for trigger in self.sched.triggers.values():
1433 trigger.onChangeEnqueued(item.change, self.pipeline)
James E. Blaire0487072012-08-29 17:38:31 -07001434 return True
1435
James E. Blair972e3c72013-08-29 12:04:55 -07001436 def dequeueItem(self, item):
James E. Blairfee8d652013-06-07 08:57:52 -07001437 self.log.debug("Removing change %s from queue" % item.change)
James E. Blairbfb8e042014-12-30 17:01:44 -08001438 item.queue.dequeueItem(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001439
James E. Blairba437362015-02-07 11:41:52 -08001440 def removeItem(self, item):
1441 # Remove an item from the queue, probably because it has been
Alex Gaynor813d39b2014-05-17 16:17:16 -07001442 # superseded by another change.
James E. Blairba437362015-02-07 11:41:52 -08001443 self.log.debug("Canceling builds behind change: %s "
1444 "because it is being removed." % item.change)
1445 self.cancelJobs(item)
1446 self.dequeueItem(item)
1447 self.reportStats(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001448
James E. Blairac2c3242014-01-24 13:38:51 -08001449 def _makeMergerItem(self, item):
1450 # Create a dictionary with all info about the item needed by
1451 # the merger.
Clark Boylan4c6566b2014-03-10 11:02:01 -07001452 number = None
1453 patchset = None
1454 oldrev = None
1455 newrev = None
1456 if hasattr(item.change, 'number'):
1457 number = item.change.number
1458 patchset = item.change.patchset
1459 elif hasattr(item.change, 'newrev'):
1460 oldrev = item.change.oldrev
1461 newrev = item.change.newrev
Joshua Hesketh352264b2015-08-11 23:42:08 +10001462 connection_name = self.pipeline.source.connection.connection_name
James E. Blairac2c3242014-01-24 13:38:51 -08001463 return dict(project=item.change.project.name,
James E. Blairc0dedf82014-08-06 09:37:52 -07001464 url=self.pipeline.source.getGitUrl(
James E. Blairac2c3242014-01-24 13:38:51 -08001465 item.change.project),
Joshua Hesketh352264b2015-08-11 23:42:08 +10001466 connection_name=connection_name,
James E. Blairac2c3242014-01-24 13:38:51 -08001467 merge_mode=item.change.project.merge_mode,
1468 refspec=item.change.refspec,
1469 branch=item.change.branch,
1470 ref=item.current_build_set.ref,
Clark Boylan4c6566b2014-03-10 11:02:01 -07001471 number=number,
1472 patchset=patchset,
1473 oldrev=oldrev,
1474 newrev=newrev,
James E. Blairac2c3242014-01-24 13:38:51 -08001475 )
1476
James E. Blairfee8d652013-06-07 08:57:52 -07001477 def prepareRef(self, item):
James E. Blair4076e2b2014-01-28 12:42:20 -08001478 # Returns True if the ref is ready, false otherwise
1479 build_set = item.current_build_set
1480 if build_set.merge_state == build_set.COMPLETE:
1481 return True
1482 if build_set.merge_state == build_set.PENDING:
1483 return False
James E. Blair4076e2b2014-01-28 12:42:20 -08001484 ref = build_set.ref
James E. Blairfee8d652013-06-07 08:57:52 -07001485 if hasattr(item.change, 'refspec') and not ref:
1486 self.log.debug("Preparing ref for: %s" % item.change)
1487 item.current_build_set.setConfiguration()
James E. Blairfee8d652013-06-07 08:57:52 -07001488 dependent_items = self.getDependentItems(item)
1489 dependent_items.reverse()
1490 all_items = dependent_items + [item]
James E. Blairac2c3242014-01-24 13:38:51 -08001491 merger_items = map(self._makeMergerItem, all_items)
James E. Blair4076e2b2014-01-28 12:42:20 -08001492 self.sched.merger.mergeChanges(merger_items,
James E. Blaire9a81842014-09-24 13:37:45 -07001493 item.current_build_set,
1494 self.pipeline.precedence)
James E. Blair4076e2b2014-01-28 12:42:20 -08001495 else:
1496 self.log.debug("Preparing update repo for: %s" % item.change)
James E. Blairc0dedf82014-08-06 09:37:52 -07001497 url = self.pipeline.source.getGitUrl(item.change.project)
James E. Blair4076e2b2014-01-28 12:42:20 -08001498 self.sched.merger.updateRepo(item.change.project.name,
James E. Blaire9a81842014-09-24 13:37:45 -07001499 url, build_set,
1500 self.pipeline.precedence)
Antoine Mussofe20ccf2014-10-16 15:32:14 +02001501 # merge:merge has been emitted properly:
1502 build_set.merge_state = build_set.PENDING
James E. Blairfee8d652013-06-07 08:57:52 -07001503 return False
1504
1505 def _launchJobs(self, item, jobs):
1506 self.log.debug("Launching jobs for change %s" % item.change)
1507 dependent_items = self.getDependentItems(item)
1508 for job in jobs:
1509 self.log.debug("Found job %s for change %s" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001510 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001511 build = self.sched.launcher.launch(job, item,
1512 self.pipeline,
1513 dependent_items)
James E. Blairfee8d652013-06-07 08:57:52 -07001514 self.log.debug("Adding build %s of job %s to item %s" %
1515 (build, job, item))
1516 item.addBuild(build)
James E. Blairee743612012-05-29 14:49:32 -07001517 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001518 self.log.exception("Exception while launching job %s "
James E. Blairfee8d652013-06-07 08:57:52 -07001519 "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001520
James E. Blairfee8d652013-06-07 08:57:52 -07001521 def launchJobs(self, item):
James E. Blairaf17a972016-02-03 15:07:18 -08001522 jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
James E. Blairdaabed22012-08-15 15:38:57 -07001523 if jobs:
James E. Blairfee8d652013-06-07 08:57:52 -07001524 self._launchJobs(item, jobs)
1525
1526 def cancelJobs(self, item, prime=True):
1527 self.log.debug("Cancel jobs for change %s" % item.change)
1528 canceled = False
James E. Blair6b077942014-02-07 17:45:55 -08001529 old_build_set = item.current_build_set
James E. Blair36658cf2013-12-06 17:53:48 -08001530 if prime and item.current_build_set.ref:
James E. Blairfee8d652013-06-07 08:57:52 -07001531 item.resetAllBuilds()
James E. Blair6b077942014-02-07 17:45:55 -08001532 for build in old_build_set.getBuilds():
1533 try:
1534 self.sched.launcher.cancel(build)
1535 except:
1536 self.log.exception("Exception while canceling build %s "
1537 "for change %s" % (build, item.change))
James E. Blairfee8d652013-06-07 08:57:52 -07001538 build.result = 'CANCELED'
James E. Blair6b077942014-02-07 17:45:55 -08001539 canceled = True
James E. Blair82a42d12015-07-20 13:50:09 -07001540 self.updateBuildDescriptions(old_build_set)
James E. Blair972e3c72013-08-29 12:04:55 -07001541 for item_behind in item.items_behind:
James E. Blairfee8d652013-06-07 08:57:52 -07001542 self.log.debug("Canceling jobs for change %s, behind change %s" %
James E. Blair972e3c72013-08-29 12:04:55 -07001543 (item_behind.change, item.change))
1544 if self.cancelJobs(item_behind, prime=prime):
James E. Blairfee8d652013-06-07 08:57:52 -07001545 canceled = True
1546 return canceled
1547
Dmitry Teselkin116fef32015-04-20 14:32:14 +03001548 def _processOneItem(self, item, nnfi):
James E. Blairfee8d652013-06-07 08:57:52 -07001549 changed = False
1550 item_ahead = item.item_ahead
James E. Blairbfb8e042014-12-30 17:01:44 -08001551 if item_ahead and (not item_ahead.live):
1552 item_ahead = None
1553 change_queue = item.queue
James E. Blair972e3c72013-08-29 12:04:55 -07001554 failing_reasons = [] # Reasons this item is failing
1555
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001556 if self.checkForChangesNeededBy(item.change, change_queue) is not True:
James E. Blairfee8d652013-06-07 08:57:52 -07001557 # It's not okay to enqueue this change, we should remove it.
1558 self.log.info("Dequeuing change %s because "
1559 "it can no longer merge" % item.change)
1560 self.cancelJobs(item)
James E. Blair972e3c72013-08-29 12:04:55 -07001561 self.dequeueItem(item)
James E. Blairfee8d652013-06-07 08:57:52 -07001562 self.pipeline.setDequeuedNeedingChange(item)
James E. Blairf8b42fb2015-02-18 09:23:36 -08001563 if item.live:
1564 try:
1565 self.reportItem(item)
Joshua Hesketh352264b2015-08-11 23:42:08 +10001566 except exceptions.MergeFailure:
James E. Blairf8b42fb2015-02-18 09:23:36 -08001567 pass
Dmitry Teselkin116fef32015-04-20 14:32:14 +03001568 return (True, nnfi)
James E. Blair6965a4b2014-12-16 17:19:04 -08001569 dep_items = self.getFailingDependentItems(item)
Clark Boylanaf2476f2014-01-23 14:47:36 -08001570 actionable = change_queue.isActionable(item)
1571 item.active = actionable
James E. Blair4076e2b2014-01-28 12:42:20 -08001572 ready = False
James E. Blair6965a4b2014-12-16 17:19:04 -08001573 if dep_items:
James E. Blair972e3c72013-08-29 12:04:55 -07001574 failing_reasons.append('a needed change is failing')
1575 self.cancelJobs(item, prime=False)
James E. Blairfee8d652013-06-07 08:57:52 -07001576 else:
James E. Blairfef71632013-09-23 11:15:47 -07001577 item_ahead_merged = False
James E. Blairbfb8e042014-12-30 17:01:44 -08001578 if (item_ahead and item_ahead.change.is_merged):
James E. Blairfef71632013-09-23 11:15:47 -07001579 item_ahead_merged = True
1580 if (item_ahead != nnfi and not item_ahead_merged):
James E. Blair972e3c72013-08-29 12:04:55 -07001581 # Our current base is different than what we expected,
1582 # and it's not because our current base merged. Something
1583 # ahead must have failed.
1584 self.log.info("Resetting builds for change %s because the "
1585 "item ahead, %s, is not the nearest non-failing "
1586 "item, %s" % (item.change, item_ahead, nnfi))
1587 change_queue.moveItem(item, nnfi)
1588 changed = True
1589 self.cancelJobs(item)
Clark Boylanaf2476f2014-01-23 14:47:36 -08001590 if actionable:
James E. Blair4076e2b2014-01-28 12:42:20 -08001591 ready = self.prepareRef(item)
Clark Boylan3d2f7a72014-01-23 11:07:42 -08001592 if item.current_build_set.unable_to_merge:
1593 failing_reasons.append("it has a merge conflict")
James E. Blair4076e2b2014-01-28 12:42:20 -08001594 ready = False
Dmitry Teselkin116fef32015-04-20 14:32:14 +03001595 if actionable and ready and self.launchJobs(item):
James E. Blairfee8d652013-06-07 08:57:52 -07001596 changed = True
James E. Blair972e3c72013-08-29 12:04:55 -07001597 if self.pipeline.didAnyJobFail(item):
1598 failing_reasons.append("at least one job failed")
James E. Blairbfb8e042014-12-30 17:01:44 -08001599 if (not item.live) and (not item.items_behind):
1600 failing_reasons.append("is a non-live item with no items behind")
1601 self.dequeueItem(item)
1602 changed = True
James E. Blairec2e1562015-02-05 10:45:54 -08001603 if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
1604 and item.live):
James E. Blair972e3c72013-08-29 12:04:55 -07001605 try:
1606 self.reportItem(item)
Joshua Hesketh352264b2015-08-11 23:42:08 +10001607 except exceptions.MergeFailure:
James E. Blair062c4fb2013-09-26 07:46:00 -07001608 failing_reasons.append("it did not merge")
James E. Blair972e3c72013-08-29 12:04:55 -07001609 for item_behind in item.items_behind:
1610 self.log.info("Resetting builds for change %s because the "
1611 "item ahead, %s, failed to merge" %
1612 (item_behind.change, item))
1613 self.cancelJobs(item_behind)
1614 self.dequeueItem(item)
1615 changed = True
James E. Blairbfb8e042014-12-30 17:01:44 -08001616 elif not failing_reasons and item.live:
James E. Blair972e3c72013-08-29 12:04:55 -07001617 nnfi = item
1618 item.current_build_set.failing_reasons = failing_reasons
1619 if failing_reasons:
1620 self.log.debug("%s is a failing item because %s" %
1621 (item, failing_reasons))
Dmitry Teselkin116fef32015-04-20 14:32:14 +03001622 return (changed, nnfi)
James E. Blairfee8d652013-06-07 08:57:52 -07001623
1624 def processQueue(self):
1625 # Do whatever needs to be done for each change in the queue
1626 self.log.debug("Starting queue processor: %s" % self.pipeline.name)
1627 changed = False
James E. Blair972e3c72013-08-29 12:04:55 -07001628 for queue in self.pipeline.queues:
1629 queue_changed = False
1630 nnfi = None # Nearest non-failing item
Clark Boylan3d2f7a72014-01-23 11:07:42 -08001631 for item in queue.queue[:]:
Dmitry Teselkin116fef32015-04-20 14:32:14 +03001632 item_changed, nnfi = self._processOneItem(
1633 item, nnfi)
James E. Blair972e3c72013-08-29 12:04:55 -07001634 if item_changed:
1635 queue_changed = True
1636 self.reportStats(item)
1637 if queue_changed:
James E. Blairfee8d652013-06-07 08:57:52 -07001638 changed = True
James E. Blair972e3c72013-08-29 12:04:55 -07001639 status = ''
1640 for item in queue.queue:
Joshua Hesketh85af4e92014-02-21 08:28:58 -08001641 status += item.formatStatus()
James E. Blair972e3c72013-08-29 12:04:55 -07001642 if status:
1643 self.log.debug("Queue %s status is now:\n %s" %
1644 (queue.name, status))
James E. Blairfadc6e12013-08-21 18:23:15 -07001645 self.log.debug("Finished queue processor: %s (changed: %s)" %
1646 (self.pipeline.name, changed))
James E. Blairfee8d652013-06-07 08:57:52 -07001647 return changed
James E. Blairdaabed22012-08-15 15:38:57 -07001648
James E. Blair11700c32012-07-05 17:50:05 -07001649 def updateBuildDescriptions(self, build_set):
1650 for build in build_set.getBuilds():
Richard Hedlind13cb40c2015-12-18 13:45:58 -07001651 try:
1652 desc = self.formatDescription(build)
1653 self.sched.launcher.setBuildDescription(build, desc)
1654 except:
1655 # Log the failure and let loop continue
1656 self.log.error("Failed to update description for build %s" %
1657 (build))
James E. Blair11700c32012-07-05 17:50:05 -07001658
1659 if build_set.previous_build_set:
1660 for build in build_set.previous_build_set.getBuilds():
Richard Hedlind13cb40c2015-12-18 13:45:58 -07001661 try:
1662 desc = self.formatDescription(build)
1663 self.sched.launcher.setBuildDescription(build, desc)
1664 except:
1665 # Log the failure and let loop continue
1666 self.log.error("Failed to update description for "
1667 "build %s in previous build set" % (build))
James E. Blair11700c32012-07-05 17:50:05 -07001668
1669 def onBuildStarted(self, build):
James E. Blairfee8d652013-06-07 08:57:52 -07001670 self.log.debug("Build %s started" % build)
James E. Blair11700c32012-07-05 17:50:05 -07001671 return True
1672
James E. Blairee743612012-05-29 14:49:32 -07001673 def onBuildCompleted(self, build):
James E. Blairfee8d652013-06-07 08:57:52 -07001674 self.log.debug("Build %s completed" % build)
James E. Blair6b077942014-02-07 17:45:55 -08001675 item = build.build_set.item
James E. Blairee743612012-05-29 14:49:32 -07001676
James E. Blair6b077942014-02-07 17:45:55 -08001677 self.pipeline.setResult(item, build)
James E. Blairaf17a972016-02-03 15:07:18 -08001678 self.sched.mutex.release(item, build.job)
James E. Blair6b077942014-02-07 17:45:55 -08001679 self.log.debug("Item %s status is now:\n %s" %
Joshua Hesketh85af4e92014-02-21 08:28:58 -08001680 (item, item.formatStatus()))
James E. Blairee743612012-05-29 14:49:32 -07001681 return True
1682
James E. Blair4076e2b2014-01-28 12:42:20 -08001683 def onMergeCompleted(self, event):
1684 build_set = event.build_set
1685 item = build_set.item
1686 build_set.merge_state = build_set.COMPLETE
1687 build_set.zuul_url = event.zuul_url
1688 if event.merged:
1689 build_set.commit = event.commit
1690 elif event.updated:
Evgeny Antyshevb078bb42016-01-25 14:44:21 +00001691 if not isinstance(item.change, NullChange):
Yolanda Robla276996c2015-11-10 10:41:18 +01001692 build_set.commit = item.change.newrev
Evgeny Antyshevb078bb42016-01-25 14:44:21 +00001693 if not build_set.commit and not isinstance(item.change, NullChange):
James E. Blair4076e2b2014-01-28 12:42:20 -08001694 self.log.info("Unable to merge change %s" % item.change)
Joshua Heskethb7179772014-01-30 23:30:46 +11001695 self.pipeline.setUnableToMerge(item)
James E. Blair4076e2b2014-01-28 12:42:20 -08001696
James E. Blairfee8d652013-06-07 08:57:52 -07001697 def reportItem(self, item):
Clark Boylanf7dc4da2014-07-28 10:12:25 -07001698 if not item.reported:
1699 # _reportItem() returns True if it failed to report.
1700 item.reported = not self._reportItem(item)
James E. Blairfee8d652013-06-07 08:57:52 -07001701 if self.changes_merge:
1702 succeeded = self.pipeline.didAllJobsSucceed(item)
Clark Boylanf7dc4da2014-07-28 10:12:25 -07001703 merged = item.reported
James E. Blairfee8d652013-06-07 08:57:52 -07001704 if merged:
James E. Blairc0dedf82014-08-06 09:37:52 -07001705 merged = self.pipeline.source.isMerged(item.change,
1706 item.change.branch)
James E. Blairfee8d652013-06-07 08:57:52 -07001707 self.log.info("Reported change %s status: all-succeeded: %s, "
1708 "merged: %s" % (item.change, succeeded, merged))
James E. Blairbfb8e042014-12-30 17:01:44 -08001709 change_queue = item.queue
James E. Blairfee8d652013-06-07 08:57:52 -07001710 if not (succeeded and merged):
1711 self.log.debug("Reported change %s failed tests or failed "
1712 "to merge" % (item.change))
James E. Blair4a035d92014-01-23 13:10:48 -08001713 change_queue.decreaseWindowSize()
Clark Boylan7603a372014-01-21 11:43:20 -08001714 self.log.debug("%s window size decreased to %s" %
James E. Blair4a035d92014-01-23 13:10:48 -08001715 (change_queue, change_queue.window))
Joshua Hesketh352264b2015-08-11 23:42:08 +10001716 raise exceptions.MergeFailure(
1717 "Change %s failed to merge" % item.change)
Clark Boylan7603a372014-01-21 11:43:20 -08001718 else:
James E. Blair4a035d92014-01-23 13:10:48 -08001719 change_queue.increaseWindowSize()
Clark Boylan7603a372014-01-21 11:43:20 -08001720 self.log.debug("%s window size increased to %s" %
James E. Blair4a035d92014-01-23 13:10:48 -08001721 (change_queue, change_queue.window))
Joshua Hesketh352264b2015-08-11 23:42:08 +10001722
1723 for trigger in self.sched.triggers.values():
1724 trigger.onChangeMerged(item.change, self.pipeline.source)
James E. Blaire0487072012-08-29 17:38:31 -07001725
James E. Blairfee8d652013-06-07 08:57:52 -07001726 def _reportItem(self, item):
James E. Blairfee8d652013-06-07 08:57:52 -07001727 self.log.debug("Reporting change %s" % item.change)
James E. Blairb98fcdb2013-08-26 18:23:09 -07001728 ret = True # Means error as returned by trigger.report
Evgeny Antyshev88db9cb2015-06-04 12:51:40 +00001729 if not self.pipeline.getJobs(item):
1730 # We don't send empty reports with +1,
1731 # and the same for -1's (merge failures or transient errors)
1732 # as they cannot be followed by +1's
1733 self.log.debug("No jobs for change %s" % item.change)
1734 actions = []
1735 elif self.pipeline.didAllJobsSucceed(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001736 self.log.debug("success %s" % (self.pipeline.success_actions))
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001737 actions = self.pipeline.success_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001738 item.setReportedResult('SUCCESS')
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001739 self.pipeline._consecutive_failures = 0
Joshua Heskethb7179772014-01-30 23:30:46 +11001740 elif not self.pipeline.didMergerSucceed(item):
1741 actions = self.pipeline.merge_failure_actions
1742 item.setReportedResult('MERGER_FAILURE')
James E. Blairee743612012-05-29 14:49:32 -07001743 else:
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001744 actions = self.pipeline.failure_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001745 item.setReportedResult('FAILURE')
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001746 self.pipeline._consecutive_failures += 1
1747 if self.pipeline._disabled:
1748 actions = self.pipeline.disabled_actions
1749 # Check here if we should disable so that we only use the disabled
1750 # reporters /after/ the last disable_at failure is still reported as
1751 # normal.
1752 if (self.pipeline.disable_at and not self.pipeline._disabled and
1753 self.pipeline._consecutive_failures >= self.pipeline.disable_at):
1754 self.pipeline._disabled = True
James E. Blaire5910202013-12-27 09:50:31 -08001755 if actions:
James E. Blaire5910202013-12-27 09:50:31 -08001756 try:
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001757 self.log.info("Reporting item %s, actions: %s" %
1758 (item, actions))
1759 ret = self.sendReport(actions, self.pipeline.source, item)
James E. Blaire5910202013-12-27 09:50:31 -08001760 if ret:
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001761 self.log.error("Reporting item %s received: %s" %
1762 (item, ret))
James E. Blaire5910202013-12-27 09:50:31 -08001763 except:
1764 self.log.exception("Exception while reporting:")
1765 item.setReportedResult('ERROR')
James E. Blairfee8d652013-06-07 08:57:52 -07001766 self.updateBuildDescriptions(item.current_build_set)
James E. Blairee743612012-05-29 14:49:32 -07001767 return ret
1768
James E. Blair8b0d4c42012-08-23 16:03:05 -07001769 def formatDescription(self, build):
1770 concurrent_changes = ''
1771 concurrent_builds = ''
1772 other_builds = ''
1773
1774 for change in build.build_set.other_changes:
1775 concurrent_changes += '<li><a href="{change.url}">\
1776 {change.number},{change.patchset}</a></li>'.format(
1777 change=change)
1778
James E. Blairfee8d652013-06-07 08:57:52 -07001779 change = build.build_set.item.change
James E. Blair8b0d4c42012-08-23 16:03:05 -07001780
1781 for build in build.build_set.getBuilds():
Ori Livneh7191ee82013-05-02 19:13:53 -07001782 if build.url:
James E. Blair8b0d4c42012-08-23 16:03:05 -07001783 concurrent_builds += """\
1784<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001785 <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001786 {build.job.name} #{build.number}</a>: {build.result}
1787</li>
1788""".format(build=build)
1789 else:
1790 concurrent_builds += """\
1791<li>
1792 {build.job.name}: {build.result}
1793</li>""".format(build=build)
1794
1795 if build.build_set.previous_build_set:
1796 other_build = build.build_set.previous_build_set.getBuild(
1797 build.job.name)
1798 if other_build:
1799 other_builds += """\
1800<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001801 Preceded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001802 {build.job.name} #{build.number}</a>
1803</li>
1804""".format(build=other_build)
1805
1806 if build.build_set.next_build_set:
1807 other_build = build.build_set.next_build_set.getBuild(
1808 build.job.name)
1809 if other_build:
1810 other_builds += """\
1811<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001812 Succeeded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001813 {build.job.name} #{build.number}</a>
1814</li>
1815""".format(build=other_build)
1816
1817 result = build.build_set.result
1818
1819 if hasattr(change, 'number'):
1820 ret = """\
1821<p>
1822 Triggered by change:
1823 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
1824 Branch: <b>{change.branch}</b><br/>
1825 Pipeline: <b>{self.pipeline.name}</b>
1826</p>"""
James E. Blair63bb0ef2013-07-29 17:14:51 -07001827 elif hasattr(change, 'ref'):
James E. Blair8b0d4c42012-08-23 16:03:05 -07001828 ret = """\
1829<p>
1830 Triggered by reference:
1831 {change.ref}</a><br/>
1832 Old revision: <b>{change.oldrev}</b><br/>
1833 New revision: <b>{change.newrev}</b><br/>
1834 Pipeline: <b>{self.pipeline.name}</b>
1835</p>"""
James E. Blair63bb0ef2013-07-29 17:14:51 -07001836 else:
1837 ret = ""
James E. Blair8b0d4c42012-08-23 16:03:05 -07001838
1839 if concurrent_changes:
1840 ret += """\
1841<p>
1842 Other changes tested concurrently with this change:
1843 <ul>{concurrent_changes}</ul>
1844</p>
1845"""
1846 if concurrent_builds:
1847 ret += """\
1848<p>
1849 All builds for this change set:
1850 <ul>{concurrent_builds}</ul>
1851</p>
1852"""
1853
1854 if other_builds:
1855 ret += """\
1856<p>
1857 Other build sets for this change:
1858 <ul>{other_builds}</ul>
1859</p>
1860"""
1861 if result:
1862 ret += """\
1863<p>
1864 Reported result: <b>{result}</b>
1865</p>
1866"""
1867
1868 ret = ret.format(**locals())
James E. Blair268d9342012-06-13 18:24:29 -07001869 return ret
1870
James E. Blairfee8d652013-06-07 08:57:52 -07001871 def reportStats(self, item):
James E. Blair8fa16972013-01-15 16:57:20 -08001872 if not statsd:
1873 return
1874 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001875 # Update the gauge on enqueue and dequeue, but timers only
James E. Blair8fa16972013-01-15 16:57:20 -08001876 # when dequeing.
James E. Blairfee8d652013-06-07 08:57:52 -07001877 if item.dequeue_time:
1878 dt = int((item.dequeue_time - item.enqueue_time) * 1000)
James E. Blair8fa16972013-01-15 16:57:20 -08001879 else:
1880 dt = None
James E. Blairfee8d652013-06-07 08:57:52 -07001881 items = len(self.pipeline.getAllItems())
James E. Blair8fa16972013-01-15 16:57:20 -08001882
1883 # stats.timers.zuul.pipeline.NAME.resident_time
1884 # stats_counts.zuul.pipeline.NAME.total_changes
1885 # stats.gauges.zuul.pipeline.NAME.current_changes
1886 key = 'zuul.pipeline.%s' % self.pipeline.name
James E. Blairfee8d652013-06-07 08:57:52 -07001887 statsd.gauge(key + '.current_changes', items)
James E. Blair8fa16972013-01-15 16:57:20 -08001888 if dt:
1889 statsd.timing(key + '.resident_time', dt)
1890 statsd.incr(key + '.total_changes')
1891
1892 # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
1893 # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
James E. Blairfee8d652013-06-07 08:57:52 -07001894 project_name = item.change.project.name.replace('/', '.')
James E. Blair8fa16972013-01-15 16:57:20 -08001895 key += '.%s' % project_name
1896 if dt:
1897 statsd.timing(key + '.resident_time', dt)
1898 statsd.incr(key + '.total_changes')
1899 except:
1900 self.log.exception("Exception reporting pipeline stats")
1901
James E. Blair1e8dd892012-05-30 09:15:05 -07001902
James E. Blair0577cd62015-02-07 11:42:12 -08001903class DynamicChangeQueueContextManager(object):
1904 def __init__(self, change_queue):
1905 self.change_queue = change_queue
1906
1907 def __enter__(self):
1908 return self.change_queue
1909
1910 def __exit__(self, etype, value, tb):
1911 if self.change_queue and not self.change_queue.queue:
1912 self.change_queue.pipeline.removeQueue(self.change_queue.queue)
1913
1914
James E. Blair4aea70c2012-07-26 14:23:24 -07001915class IndependentPipelineManager(BasePipelineManager):
1916 log = logging.getLogger("zuul.IndependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001917 changes_merge = False
1918
James E. Blaireff88162013-07-01 12:44:14 -04001919 def _postConfig(self, layout):
1920 super(IndependentPipelineManager, self)._postConfig(layout)
James E. Blaire0487072012-08-29 17:38:31 -07001921
James E. Blair0577cd62015-02-07 11:42:12 -08001922 def getChangeQueue(self, change, existing=None):
James E. Blairbfb8e042014-12-30 17:01:44 -08001923 # creates a new change queue for every change
James E. Blair0577cd62015-02-07 11:42:12 -08001924 if existing:
1925 return DynamicChangeQueueContextManager(existing)
James E. Blairbfb8e042014-12-30 17:01:44 -08001926 if change.project not in self.pipeline.getProjects():
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +00001927 self.pipeline.addProject(change.project)
James E. Blairbfb8e042014-12-30 17:01:44 -08001928 change_queue = ChangeQueue(self.pipeline)
1929 change_queue.addProject(change.project)
1930 self.pipeline.addQueue(change_queue)
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +00001931 self.log.debug("Dynamically created queue %s", change_queue)
James E. Blair0577cd62015-02-07 11:42:12 -08001932 return DynamicChangeQueueContextManager(change_queue)
James E. Blairbfb8e042014-12-30 17:01:44 -08001933
1934 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
1935 change_queue):
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001936 ret = self.checkForChangesNeededBy(change, change_queue)
James E. Blairbfb8e042014-12-30 17:01:44 -08001937 if ret in [True, False]:
1938 return ret
1939 self.log.debug(" Changes %s must be merged ahead of %s" %
1940 (ret, change))
1941 for needed_change in ret:
1942 # This differs from the dependent pipeline by enqueuing
1943 # changes ahead as "not live", that is, not intended to
1944 # have jobs run. Also, pipeline requirements are always
1945 # ignored (which is safe because the changes are not
1946 # live).
1947 r = self.addChange(needed_change, quiet=True,
1948 ignore_requirements=True,
1949 live=False, change_queue=change_queue)
1950 if not r:
1951 return False
1952 return True
1953
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001954 def checkForChangesNeededBy(self, change, change_queue):
James E. Blair17dd6772015-02-09 14:45:18 -08001955 if self.pipeline.ignore_dependencies:
1956 return True
James E. Blairbfb8e042014-12-30 17:01:44 -08001957 self.log.debug("Checking for changes needed by %s:" % change)
1958 # Return true if okay to proceed enqueing this change,
1959 # false if the change should not be enqueued.
1960 if not hasattr(change, 'needs_changes'):
1961 self.log.debug(" Changeish does not support dependencies")
1962 return True
1963 if not change.needs_changes:
1964 self.log.debug(" No changes needed")
1965 return True
1966 changes_needed = []
1967 for needed_change in change.needs_changes:
1968 self.log.debug(" Change %s needs change %s:" % (
1969 change, needed_change))
1970 if needed_change.is_merged:
1971 self.log.debug(" Needed change is merged")
1972 continue
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001973 if self.isChangeAlreadyInQueue(needed_change, change_queue):
James E. Blairbfb8e042014-12-30 17:01:44 -08001974 self.log.debug(" Needed change is already ahead in the queue")
1975 continue
1976 self.log.debug(" Change %s is needed" % needed_change)
1977 if needed_change not in changes_needed:
1978 changes_needed.append(needed_change)
1979 continue
1980 # This differs from the dependent pipeline check in not
1981 # verifying that the dependent change is mergable.
1982 if changes_needed:
1983 return changes_needed
1984 return True
1985
1986 def dequeueItem(self, item):
1987 super(IndependentPipelineManager, self).dequeueItem(item)
1988 # An independent pipeline manager dynamically removes empty
1989 # queues
1990 if not item.queue.queue:
1991 self.pipeline.removeQueue(item.queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001992
James E. Blair1e8dd892012-05-30 09:15:05 -07001993
James E. Blair0577cd62015-02-07 11:42:12 -08001994class StaticChangeQueueContextManager(object):
1995 def __init__(self, change_queue):
1996 self.change_queue = change_queue
1997
1998 def __enter__(self):
1999 return self.change_queue
2000
2001 def __exit__(self, etype, value, tb):
2002 pass
2003
2004
James E. Blair4aea70c2012-07-26 14:23:24 -07002005class DependentPipelineManager(BasePipelineManager):
2006 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07002007 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07002008
2009 def __init__(self, *args, **kwargs):
James E. Blair4aea70c2012-07-26 14:23:24 -07002010 super(DependentPipelineManager, self).__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07002011
James E. Blaireff88162013-07-01 12:44:14 -04002012 def _postConfig(self, layout):
2013 super(DependentPipelineManager, self)._postConfig(layout)
James E. Blairee743612012-05-29 14:49:32 -07002014 self.buildChangeQueues()
2015
2016 def buildChangeQueues(self):
2017 self.log.debug("Building shared change queues")
2018 change_queues = []
2019
James E. Blair4aea70c2012-07-26 14:23:24 -07002020 for project in self.pipeline.getProjects():
Clark Boylan7603a372014-01-21 11:43:20 -08002021 change_queue = ChangeQueue(
2022 self.pipeline,
2023 window=self.pipeline.window,
2024 window_floor=self.pipeline.window_floor,
2025 window_increase_type=self.pipeline.window_increase_type,
2026 window_increase_factor=self.pipeline.window_increase_factor,
2027 window_decrease_type=self.pipeline.window_decrease_type,
2028 window_decrease_factor=self.pipeline.window_decrease_factor)
James E. Blair4aea70c2012-07-26 14:23:24 -07002029 change_queue.addProject(project)
2030 change_queues.append(change_queue)
2031 self.log.debug("Created queue: %s" % change_queue)
James E. Blairee743612012-05-29 14:49:32 -07002032
James E. Blairc3d428e2013-12-03 15:06:48 -08002033 # Iterate over all queues trying to combine them, and keep doing
2034 # so until they can not be combined further.
2035 last_change_queues = change_queues
2036 while True:
2037 new_change_queues = self.combineChangeQueues(last_change_queues)
2038 if len(last_change_queues) == len(new_change_queues):
2039 break
2040 last_change_queues = new_change_queues
2041
2042 self.log.info(" Shared change queues:")
2043 for queue in new_change_queues:
2044 self.pipeline.addQueue(queue)
James E. Blairc8a1e052014-02-25 09:29:26 -08002045 self.log.info(" %s containing %s" % (
2046 queue, queue.generated_name))
James E. Blairc3d428e2013-12-03 15:06:48 -08002047
2048 def combineChangeQueues(self, change_queues):
James E. Blairee743612012-05-29 14:49:32 -07002049 self.log.debug("Combining shared queues")
2050 new_change_queues = []
2051 for a in change_queues:
2052 merged_a = False
2053 for b in new_change_queues:
2054 if not a.getJobs().isdisjoint(b.getJobs()):
2055 self.log.debug("Merging queue %s into %s" % (a, b))
2056 b.mergeChangeQueue(a)
2057 merged_a = True
2058 break # this breaks out of 'for b' and continues 'for a'
2059 if not merged_a:
2060 self.log.debug("Keeping queue %s" % (a))
2061 new_change_queues.append(a)
James E. Blairc3d428e2013-12-03 15:06:48 -08002062 return new_change_queues
James E. Blairee743612012-05-29 14:49:32 -07002063
James E. Blair0577cd62015-02-07 11:42:12 -08002064 def getChangeQueue(self, change, existing=None):
2065 if existing:
2066 return StaticChangeQueueContextManager(existing)
2067 return StaticChangeQueueContextManager(
2068 self.pipeline.getQueue(change.project))
James E. Blair5ee24252014-12-30 10:12:29 -08002069
James E. Blaire0487072012-08-29 17:38:31 -07002070 def isChangeReadyToBeEnqueued(self, change):
James E. Blairc0dedf82014-08-06 09:37:52 -07002071 if not self.pipeline.source.canMerge(change,
2072 self.getSubmitAllowNeeds()):
James E. Blaire0487072012-08-29 17:38:31 -07002073 self.log.debug("Change %s can not merge, ignoring" % change)
2074 return False
2075 return True
James E. Blair1e8dd892012-05-30 09:15:05 -07002076
James E. Blair5ee24252014-12-30 10:12:29 -08002077 def enqueueChangesBehind(self, change, quiet, ignore_requirements,
2078 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07002079 to_enqueue = []
2080 self.log.debug("Checking for changes needing %s:" % change)
2081 if not hasattr(change, 'needed_by_changes'):
2082 self.log.debug(" Changeish does not support dependencies")
2083 return
James E. Blair5ee24252014-12-30 10:12:29 -08002084 for other_change in change.needed_by_changes:
James E. Blair0577cd62015-02-07 11:42:12 -08002085 with self.getChangeQueue(other_change) as other_change_queue:
2086 if other_change_queue != change_queue:
2087 self.log.debug(" Change %s in project %s can not be "
2088 "enqueued in the target queue %s" %
2089 (other_change, other_change.project,
2090 change_queue))
2091 continue
James E. Blair5ee24252014-12-30 10:12:29 -08002092 if self.pipeline.source.canMerge(other_change,
James E. Blairc0dedf82014-08-06 09:37:52 -07002093 self.getSubmitAllowNeeds()):
James E. Blaire0487072012-08-29 17:38:31 -07002094 self.log.debug(" Change %s needs %s and is ready to merge" %
James E. Blair5ee24252014-12-30 10:12:29 -08002095 (other_change, change))
2096 to_enqueue.append(other_change)
2097
James E. Blaire0487072012-08-29 17:38:31 -07002098 if not to_enqueue:
2099 self.log.debug(" No changes need %s" % change)
2100
2101 for other_change in to_enqueue:
James E. Blairf9ab8842014-07-10 13:12:07 -07002102 self.addChange(other_change, quiet=quiet,
James E. Blair5ee24252014-12-30 10:12:29 -08002103 ignore_requirements=ignore_requirements,
2104 change_queue=change_queue)
James E. Blaire0487072012-08-29 17:38:31 -07002105
James E. Blair5ee24252014-12-30 10:12:29 -08002106 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
2107 change_queue):
James E. Blairdbfe1cd2015-02-07 11:41:19 -08002108 ret = self.checkForChangesNeededBy(change, change_queue)
James E. Blaire0487072012-08-29 17:38:31 -07002109 if ret in [True, False]:
2110 return ret
James E. Blair5ee24252014-12-30 10:12:29 -08002111 self.log.debug(" Changes %s must be merged ahead of %s" %
James E. Blaire0487072012-08-29 17:38:31 -07002112 (ret, change))
James E. Blair6965a4b2014-12-16 17:19:04 -08002113 for needed_change in ret:
2114 r = self.addChange(needed_change, quiet=quiet,
James E. Blair5ee24252014-12-30 10:12:29 -08002115 ignore_requirements=ignore_requirements,
2116 change_queue=change_queue)
James E. Blair6965a4b2014-12-16 17:19:04 -08002117 if not r:
2118 return False
2119 return True
James E. Blaire0487072012-08-29 17:38:31 -07002120
James E. Blairdbfe1cd2015-02-07 11:41:19 -08002121 def checkForChangesNeededBy(self, change, change_queue):
James E. Blaire421a232012-07-25 16:59:21 -07002122 self.log.debug("Checking for changes needed by %s:" % change)
2123 # Return true if okay to proceed enqueing this change,
2124 # false if the change should not be enqueued.
James E. Blair6965a4b2014-12-16 17:19:04 -08002125 if not hasattr(change, 'needs_changes'):
James E. Blair4aea70c2012-07-26 14:23:24 -07002126 self.log.debug(" Changeish does not support dependencies")
2127 return True
James E. Blair6965a4b2014-12-16 17:19:04 -08002128 if not change.needs_changes:
James E. Blaire421a232012-07-25 16:59:21 -07002129 self.log.debug(" No changes needed")
2130 return True
James E. Blair6965a4b2014-12-16 17:19:04 -08002131 changes_needed = []
James E. Blair0577cd62015-02-07 11:42:12 -08002132 # Ignore supplied change_queue
2133 with self.getChangeQueue(change) as change_queue:
2134 for needed_change in change.needs_changes:
2135 self.log.debug(" Change %s needs change %s:" % (
2136 change, needed_change))
2137 if needed_change.is_merged:
2138 self.log.debug(" Needed change is merged")
James E. Blair6965a4b2014-12-16 17:19:04 -08002139 continue
James E. Blair0577cd62015-02-07 11:42:12 -08002140 with self.getChangeQueue(needed_change) as needed_change_queue:
2141 if needed_change_queue != change_queue:
2142 self.log.debug(" Change %s in project %s does not "
2143 "share a change queue with %s "
2144 "in project %s" %
2145 (needed_change, needed_change.project,
2146 change, change.project))
2147 return False
2148 if not needed_change.is_current_patchset:
2149 self.log.debug(" Needed change is not the "
2150 "current patchset")
2151 return False
2152 if self.isChangeAlreadyInQueue(needed_change, change_queue):
2153 self.log.debug(" Needed change is already ahead "
2154 "in the queue")
2155 continue
2156 if self.pipeline.source.canMerge(needed_change,
2157 self.getSubmitAllowNeeds()):
2158 self.log.debug(" Change %s is needed" % needed_change)
2159 if needed_change not in changes_needed:
2160 changes_needed.append(needed_change)
2161 continue
2162 # The needed change can't be merged.
2163 self.log.debug(" Change %s is needed but can not be merged" %
2164 needed_change)
2165 return False
James E. Blair6965a4b2014-12-16 17:19:04 -08002166 if changes_needed:
2167 return changes_needed
2168 return True
James E. Blair972e3c72013-08-29 12:04:55 -07002169
James E. Blair6965a4b2014-12-16 17:19:04 -08002170 def getFailingDependentItems(self, item):
2171 if not hasattr(item.change, 'needs_changes'):
James E. Blair972e3c72013-08-29 12:04:55 -07002172 return None
James E. Blair6965a4b2014-12-16 17:19:04 -08002173 if not item.change.needs_changes:
James E. Blair972e3c72013-08-29 12:04:55 -07002174 return None
James E. Blair6965a4b2014-12-16 17:19:04 -08002175 failing_items = set()
2176 for needed_change in item.change.needs_changes:
2177 needed_item = self.getItemForChange(needed_change)
2178 if not needed_item:
2179 continue
2180 if needed_item.current_build_set.failing_reasons:
2181 failing_items.add(needed_item)
2182 if failing_items:
2183 return failing_items
James E. Blair972e3c72013-08-29 12:04:55 -07002184 return None