blob: 93016aced1482b209c10c791c03294f5b329e70d [file] [log] [blame]
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Christian Berendt12d4d722014-06-07 21:03:45 +020023from six.moves import queue as Queue
Zhongyue Luo1c860d72012-07-19 11:03:56 +080024import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
Joshua Heskethde958652015-11-10 19:19:50 +110032from model import Pipeline, Project, ChangeQueue
Joshua Hesketh70b13492015-08-11 23:40:42 +100033from model import ChangeishFilter, NullChange
Joshua Hesketh352264b2015-08-11 23:42:08 +100034from zuul import change_matcher, exceptions
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040035from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070036
James E. Blair71e94122012-12-24 17:53:08 -080037statsd = extras.try_import('statsd.statsd')
38
James E. Blair1e8dd892012-05-30 09:15:05 -070039
def deep_format(obj, paramdict):
    """Apply the paramdict via str.format() to all string objects found within
    the supplied obj. Lists and dicts are traversed recursively.

    Dict keys are formatted as well as values.  Keys that are not strings
    (for example integers) are kept unchanged instead of raising
    AttributeError.  Any object that is not a str, list or dict is returned
    as-is.

    Borrowed from Jenkins Job Builder project

    :arg obj: the (possibly nested) structure to expand
    :arg dict paramdict: the substitutions handed to str.format()
    :returns: a new structure of the same shape with strings expanded
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        ret = {}
        for key, value in obj.items():
            # Only string keys can carry format placeholders.
            if isinstance(key, str):
                key = key.format(**paramdict)
            ret[key] = deep_format(value, paramdict)
        return ret
    return obj
60
61
class MutexHandler(object):
    """Track which (item, job) pair currently holds each named mutex.

    A job names its mutex through its ``mutex`` attribute; a job whose
    ``mutex`` is falsy never blocks and never holds anything.
    """
    log = logging.getLogger("zuul.MutexHandler")

    def __init__(self):
        # mutex name -> (item, job name) of the current holder
        self.mutexes = {}

    def acquire(self, item, job):
        """Try to take the job's mutex on behalf of item.

        :arg item: the queue item that wants to run the job
        :arg job: the job; its ``mutex`` and ``name`` attributes are read
        :returns: True if the job may proceed (no mutex needed, mutex
            acquired, or already held by this item/job), False otherwise
        """
        if not job.mutex:
            return True
        mutex_name = job.mutex
        m = self.mutexes.get(mutex_name)
        if not m:
            # The mutex is not held, acquire it
            self._acquire(mutex_name, item, job.name)
            return True
        held_item, held_job_name = m
        if held_item is item and held_job_name == job.name:
            # This item already holds the mutex
            return True
        held_build = held_item.current_build_set.getBuild(held_job_name)
        if held_build and held_build.result:
            # The build that held the mutex is complete, release it
            # and let the new item have it.
            self.log.error("Held mutex %s being released because "
                           "the build that holds it is complete" %
                           (mutex_name,))
            # Release on behalf of the stale holder so the log line names
            # the item/job that actually held the mutex.
            self._release(mutex_name, held_item, held_job_name)
            self._acquire(mutex_name, item, job.name)
            return True
        return False

    def release(self, item, job):
        """Release the job's mutex if item/job is the current holder.

        Attempts to release a mutex that is not held, or that is held by
        someone else, are logged as errors and otherwise ignored.
        """
        if not job.mutex:
            return
        mutex_name = job.mutex
        m = self.mutexes.get(mutex_name)
        if not m:
            # The mutex is not held, nothing to do
            self.log.error("Mutex can not be released for %s "
                           "because the mutex is not held" %
                           (item,))
            return
        held_item, held_job_name = m
        if held_item is item and held_job_name == job.name:
            # This item holds the mutex
            self._release(mutex_name, item, job.name)
            return
        self.log.error("Mutex can not be released for %s "
                       "which does not hold it" %
                       (item,))

    def _acquire(self, mutex_name, item, job_name):
        """Record (item, job_name) as the holder of mutex_name."""
        self.log.debug("Job %s of item %s acquiring mutex %s" %
                       (job_name, item, mutex_name))
        self.mutexes[mutex_name] = (item, job_name)

    def _release(self, mutex_name, item, job_name):
        """Forget the holder of mutex_name; item/job_name are only logged."""
        self.log.debug("Job %s of item %s releasing mutex %s" %
                       (job_name, item, mutex_name))
        del self.mutexes[mutex_name]
122
123
class ManagementEvent(object):
    """An event that should be processed within the main queue run loop

    The submitter blocks in wait() until the processing side calls either
    done() or exception().
    """

    def __init__(self):
        self._wait_event = threading.Event()
        self._exception = None
        self._traceback = None

    def exception(self, e, tb):
        """Record a failure and wake any waiter.

        :arg e: the exception instance to re-raise in the waiter
        :arg tb: the traceback to attach when re-raising
        """
        self._exception = e
        self._traceback = tb
        self._wait_event.set()

    def done(self):
        """Mark the event as processed and wake any waiter."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event is processed or timeout elapses.

        :arg timeout: seconds to wait, or None to wait indefinitely
        :returns: True if the event was processed, False on timeout
        :raises: the exception recorded through exception(), with its
            original traceback
        """
        self._wait_event.wait(timeout)
        if self._exception:
            # The three-argument raise statement is Python 2 only syntax;
            # six.reraise does the same thing on both major versions.
            import six
            six.reraise(type(self._exception), self._exception,
                        self._traceback)
        return self._wait_event.is_set()
144
145
class ReconfigureEvent(ManagementEvent):
    """Management event asking the scheduler to reconfigure itself.

    The layout will be (re-)loaded from the path specified in the
    configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        super(ReconfigureEvent, self).__init__()
        # Kept on the event for whoever processes it.
        self.config = config
155
156
class PromoteEvent(ManagementEvent):
    """Management event that promotes changes to the head of the queue.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, pipeline_name, change_ids):
        super(PromoteEvent, self).__init__()
        self.change_ids = change_ids
        self.pipeline_name = pipeline_name
169
170
class EnqueueEvent(ManagementEvent):
    """Management event that enqueues a change into a pipeline.

    :arg TriggerEvent trigger_event: a TriggerEvent describing the
        trigger, pipeline, and change to enqueue
    """

    def __init__(self, trigger_event):
        super(EnqueueEvent, self).__init__()
        # Stored untouched for whoever processes the event.
        self.trigger_event = trigger_event
181
182
class ResultEvent(object):
    """Base class for events that need to modify the pipeline state due
    to a result from an external system.

    It carries no behaviour of its own; subclasses add their payload.
    """
188
189
class BuildStartedEvent(ResultEvent):
    """Result event reporting that a build has started.

    :arg Build build: The build which has started.
    """

    def __init__(self, build):
        # The build object is the event's only payload.
        self.build = build
198
199
class BuildCompletedEvent(ResultEvent):
    """Result event reporting that a build has completed.

    :arg Build build: The build which has completed.
    """

    def __init__(self, build):
        # The build object is the event's only payload.
        self.build = build
208
209
class MergeCompletedEvent(ResultEvent):
    """Result event reporting that a remote merge operation has completed.

    :arg BuildSet build_set: The build_set which is ready.
    :arg str zuul_url: The URL of the Zuul Merger.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    """

    def __init__(self, build_set, zuul_url, merged, updated, commit):
        # What was merged and where
        self.build_set = build_set
        self.zuul_url = zuul_url
        self.commit = commit
        # Outcome flags
        self.merged = merged
        self.updated = updated
226
227
def toList(item):
    """Normalise item to a list.

    A non-empty list is returned unchanged (same object), any other truthy
    value is wrapped in a one-element list, and every falsy value (None,
    '', 0, an empty list, ...) becomes a new empty list.
    """
    if item and isinstance(item, list):
        return item
    return [item] if item else []
234
235
James E. Blaire9d45c32012-05-31 09:56:45 -0700236class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700237 log = logging.getLogger("zuul.Scheduler")
238
def __init__(self, config):
    """Initialise the scheduler thread and its in-memory state.

    The scheduler starts with an empty connection map, no triggers, and a
    freshly constructed model.Layout().

    :arg config: the configuration object, stored as ``self.config``
    """
    threading.Thread.__init__(self)
    self.daemon = True
    self.config = config

    # Run-state flags
    self._pause = False
    self._exit = False
    self._stopped = False

    # Synchronisation primitives
    self.wake_event = threading.Event()
    self.layout_lock = threading.Lock()
    self.run_handler_lock = threading.Lock()

    # Collaborators; launcher and merger start out unset.
    self.launcher = None
    self.merger = None
    self.mutex = MutexHandler()
    self.connections = {}
    # Despite triggers being part of the pipeline, there is one trigger set
    # per scheduler. The pipeline handles the trigger filters but since
    # the events are handled by the scheduler itself it needs to handle
    # the loading of the triggers.
    # self.triggers['connection_name'] = triggerObject
    self.triggers = {}

    # One queue per event family
    self.trigger_event_queue = Queue.Queue()
    self.result_event_queue = Queue.Queue()
    self.management_event_queue = Queue.Queue()

    self.layout = model.Layout()
    self.zuul_version = zuul_version.version_info.release_string()
    self.last_reconfigured = None

    # A set of reporter configuration keys to action mapping
    self._reporter_actions = {
        'start': 'start_actions',
        'success': 'success_actions',
        'failure': 'failure_actions',
        'merge-failure': 'merge_failure_actions',
        'disabled': 'disabled_actions',
    }
276
def stop(self):
    """Flag the scheduler as stopped, shut down drivers and connections,
    then wake the run loop.
    """
    self._stopped = True
    # Drivers are unloaded before the connections are stopped.
    self._unloadDrivers()
    self.stopConnections()
    # Set last, after the flag and the teardown above.
    self.wake_event.set()
282
def testConfig(self, config_path, connections):
    """Parse the layout at config_path and return the resulting layout.

    :arg config_path: path of the layout file to parse
    :arg connections: the already set up connections
    """
    # Take the list of set up connections directly here rather than with
    # registerConnections as we don't want to do the onLoad event yet.
    layout = self._parseConfig(config_path, connections)
    return layout
James E. Blair47958382013-01-10 17:26:02 -0800287
def _parseSkipIf(self, config_job):
    """Build the matcher for a job's 'skip-if' configuration.

    Each entry of ``config_job['skip-if']`` becomes a MatchAll over its
    'project', 'branch' and 'all-files-match-any' conditions; the entries
    are then combined with MatchAny.

    :arg dict config_job: the job's configuration
    :returns: a change_matcher.MatchAny, or None when there are no
        'skip-if' entries
    """
    predicates = []

    for skip_conf in config_job.get('skip-if', []):
        conditions = []

        project_pattern = skip_conf.get('project')
        if project_pattern:
            conditions.append(
                change_matcher.ProjectMatcher(project_pattern))

        branch_pattern = skip_conf.get('branch')
        if branch_pattern:
            conditions.append(
                change_matcher.BranchMatcher(branch_pattern))

        file_patterns = toList(skip_conf.get('all-files-match-any'))
        if file_patterns:
            conditions.append(change_matcher.MatchAllFiles(
                [change_matcher.FileMatcher(pattern)
                 for pattern in file_patterns]))

        # All patterns need to match a given skip-if predicate
        predicates.append(change_matcher.MatchAll(conditions))

    if not predicates:
        return None
    # Any skip-if predicate can be matched to trigger a skip
    return change_matcher.MatchAny(predicates)
315
def registerConnections(self, connections, load=True):
    """Adopt connections and register this scheduler with each of them.

    :arg dict connections: the connections, keyed by connection name
    :arg bool load: whether or not to trigger the onLoad for the
        connection. This is useful for not doing a full load during
        layout validation.
    """
    self.connections = connections
    for connection in self.connections.values():
        connection.registerScheduler(self)
        if not load:
            continue
        connection.onLoad()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000324
def stopConnections(self):
    """Call onStop() on every registered connection."""
    for connection in self.connections.values():
        connection.onStop()
328
def _unloadDrivers(self):
    """Stop every trigger, pipeline source and pipeline reporter.

    The trigger map is emptied once the triggers have been stopped.
    """
    for trigger in self.triggers.values():
        trigger.stop()
    self.triggers = {}

    action_attrs = self._reporter_actions.values()
    for pipeline in self.layout.pipelines.values():
        pipeline.source.stop()
        # Each action attribute on the pipeline holds the reporters
        # configured for that action.
        for attr_name in action_attrs:
            for reporter in getattr(pipeline, attr_name):
                reporter.stop()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000338
def _getDriver(self, dtype, connection_name, driver_config=None):
    """Instantiate a driver such as a trigger, source or reporter.

    :arg str dtype: the driver family: 'source', 'trigger' or 'reporter'
    :arg str connection_name: the name of a registered connection, or
        the driver name itself for drivers that have no connection
    :arg driver_config: the configuration passed to the driver; when
        omitted (or None) each call gets its own new empty dict
    :returns: the new driver instance
    :raises KeyError: if no driver of that family is known under the
        resolved driver name
    """
    import importlib

    # A shared ``{}`` default would be handed to every driver instance.
    if driver_config is None:
        driver_config = {}

    # TODO(jhesketh): Make this list dynamic or use entrypoints etc.
    # Stevedore was not a good fit here due to the nature of triggers.
    # Specifically we don't want to load a trigger per a pipeline as one
    # trigger can listen to a stream (from gerrit, for example) and the
    # scheduler decides which eventfilter to use. As such we want to load
    # trigger+connection pairs uniquely.
    drivers = {
        'source': {
            'gerrit': 'zuul.source.gerrit:GerritSource',
        },
        'trigger': {
            'gerrit': 'zuul.trigger.gerrit:GerritTrigger',
            'timer': 'zuul.trigger.timer:TimerTrigger',
            'zuul': 'zuul.trigger.zuultrigger:ZuulTrigger',
        },
        'reporter': {
            'gerrit': 'zuul.reporter.gerrit:GerritReporter',
            'smtp': 'zuul.reporter.smtp:SMTPReporter',
        },
    }

    # TODO(jhesketh): Check the connection_name exists
    if connection_name in self.connections:
        connection = self.connections[connection_name]
        driver_name = connection.driver_name
    else:
        # In some cases a driver may not be related to a connection. For
        # example, the 'timer' or 'zuul' triggers.
        driver_name = connection_name
        connection = None

    # Entries have the form 'module.path:ClassName'.
    module_name, class_name = drivers[dtype][driver_name].split(':')
    driver_class = getattr(importlib.import_module(module_name), class_name)
    driver_instance = driver_class(driver_config, self, connection)

    if connection:
        connection.registerUse(dtype, driver_instance)

    return driver_instance
381
def _getSourceDriver(self, connection_name):
    """Return a new 'source' driver for connection_name.

    No driver configuration is passed on, so _getDriver() applies its
    default.
    """
    source_driver = self._getDriver('source', connection_name)
    return source_driver
384
def _getReporterDriver(self, connection_name, driver_config=None):
    """Return a new 'reporter' driver for connection_name.

    :arg str connection_name: the connection (or driver) name
    :arg driver_config: the reporter configuration; when omitted (or
        None) a new empty dict is used for each call, so driver
        instances never share one default object
    """
    if driver_config is None:
        driver_config = {}
    return self._getDriver('reporter', connection_name, driver_config)
387
def _getTriggerDriver(self, connection_name, driver_config=None):
    """Return a new 'trigger' driver for connection_name.

    :arg str connection_name: the connection (or driver) name
    :arg driver_config: the trigger configuration; when omitted (or
        None) a new empty dict is used for each call, so driver
        instances never share one default object
    """
    if driver_config is None:
        driver_config = {}
    return self._getDriver('trigger', connection_name, driver_config)
390
def _parseConfig(self, config_path, connections):
    """Read the YAML layout file and build a model.Layout from it.

    The file is validated with layoutvalidator.LayoutValidator and then
    processed section by section: 'includes', 'pipelines',
    'project-templates', 'jobs' and 'projects'.

    :arg str config_path: path of the layout file; '~' is expanded
    :arg connections: passed to the layout validator
    :returns: the populated model.Layout
    :raises Exception: if config_path does not exist, or a job names a
        'parameter-function' that the included python files do not define
    """
    layout = model.Layout()
    project_templates = {}

    if config_path:
        config_path = os.path.expanduser(config_path)
        if not os.path.exists(config_path):
            raise Exception("Unable to read layout config file at %s" %
                            config_path)
    # NOTE(review): yaml.load can construct arbitrary Python objects;
    # presumably the layout file is trusted -- confirm, or consider
    # yaml.safe_load.
    with open(config_path) as config_file:
        data = yaml.load(config_file)

    validator = layoutvalidator.LayoutValidator()
    validator.validate(data, connections)

    # Namespace that collects whatever the included python files define;
    # 'parameter-function' names are looked up in it further down.
    config_env = {}
    for include in data.get('includes', []):
        if 'python-file' in include:
            fn = include['python-file']
            if not os.path.isabs(fn):
                # Relative includes are resolved against the directory of
                # the (symlink-resolved) layout file.
                base = os.path.dirname(os.path.realpath(config_path))
                fn = os.path.join(base, fn)
            fn = os.path.expanduser(fn)
            # NOTE(review): execfile is a Python 2 builtin and runs the
            # included file's code in this process.
            execfile(fn, config_env)

    for conf_pipeline in data.get('pipelines', []):
        pipeline = Pipeline(conf_pipeline['name'])
        pipeline.description = conf_pipeline.get('description')
        # TODO(jeblair): remove backwards compatibility:
        pipeline.source = self._getSourceDriver(
            conf_pipeline.get('source', 'gerrit'))
        precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
        pipeline.precedence = precedence
        pipeline.failure_message = conf_pipeline.get('failure-message',
                                                     "Build failed.")
        pipeline.merge_failure_message = conf_pipeline.get(
            'merge-failure-message', "Merge Failed.\n\nThis change or one "
            "of its cross-repo dependencies was unable to be "
            "automatically merged with the current state of its "
            "repository. Please rebase the change and upload a new "
            "patchset.")
        pipeline.success_message = conf_pipeline.get('success-message',
                                                     "Build succeeded.")
        pipeline.footer_message = conf_pipeline.get('footer-message', "")
        pipeline.dequeue_on_new_patchset = conf_pipeline.get(
            'dequeue-on-new-patchset', True)
        pipeline.ignore_dependencies = conf_pipeline.get(
            'ignore-dependencies', False)

        # For each reporter action key ('start', 'success', ...) build the
        # list of reporters and store it on the matching pipeline
        # attribute; the list stays empty if the key is not configured.
        for conf_key, action in self._reporter_actions.items():
            reporter_set = []
            if conf_pipeline.get(conf_key):
                for reporter_name, params \
                    in conf_pipeline.get(conf_key).items():
                    reporter = self._getReporterDriver(reporter_name,
                                                       params)
                    reporter.setAction(conf_key)
                    reporter_set.append(reporter)
            setattr(pipeline, action, reporter_set)

        # If merge-failure actions aren't explicit, use the failure actions
        if not pipeline.merge_failure_actions:
            pipeline.merge_failure_actions = pipeline.failure_actions

        pipeline.disable_at = conf_pipeline.get(
            'disable-after-consecutive-failures', None)

        pipeline.window = conf_pipeline.get('window', 20)
        pipeline.window_floor = conf_pipeline.get('window-floor', 3)
        pipeline.window_increase_type = conf_pipeline.get(
            'window-increase-type', 'linear')
        pipeline.window_increase_factor = conf_pipeline.get(
            'window-increase-factor', 1)
        pipeline.window_decrease_type = conf_pipeline.get(
            'window-decrease-type', 'exponential')
        pipeline.window_decrease_factor = conf_pipeline.get(
            'window-decrease-factor', 2)

        # The 'manager' value is looked up as a name in this module's
        # globals and called with (scheduler, pipeline).
        manager = globals()[conf_pipeline['manager']](self, pipeline)
        pipeline.setManager(manager)
        layout.pipelines[conf_pipeline['name']] = pipeline

        if 'require' in conf_pipeline or 'reject' in conf_pipeline:
            require = conf_pipeline.get('require', {})
            reject = conf_pipeline.get('reject', {})
            f = ChangeishFilter(
                open=require.get('open'),
                current_patchset=require.get('current-patchset'),
                statuses=toList(require.get('status')),
                required_approvals=toList(require.get('approval')),
                reject_approvals=toList(reject.get('approval'))
            )
            manager.changeish_filters.append(f)

        # Triggers are created once per name and kept in self.triggers
        # across pipelines.
        for trigger_name, trigger_config\
            in conf_pipeline.get('trigger').items():
            if trigger_name not in self.triggers.keys():
                self.triggers[trigger_name] = \
                    self._getTriggerDriver(trigger_name, trigger_config)

        for trigger_name, trigger in self.triggers.items():
            if trigger_name in conf_pipeline['trigger']:
                manager.event_filters += trigger.getEventFilters(
                    conf_pipeline['trigger'][trigger_name])

    for project_template in data.get('project-templates', []):
        # Make sure the template only contains valid pipelines
        tpl = dict(
            (pipe_name, project_template.get(pipe_name))
            for pipe_name in layout.pipelines.keys()
            if pipe_name in project_template
        )
        project_templates[project_template.get('name')] = tpl

    for config_job in data.get('jobs', []):
        job = layout.getJob(config_job['name'])
        # Be careful to only set attributes explicitly present on
        # this job, to avoid squashing attributes set by a meta-job.
        m = config_job.get('queue-name', None)
        if m:
            job.queue_name = m
        m = config_job.get('failure-message', None)
        if m:
            job.failure_message = m
        m = config_job.get('success-message', None)
        if m:
            job.success_message = m
        m = config_job.get('failure-pattern', None)
        if m:
            job.failure_pattern = m
        m = config_job.get('success-pattern', None)
        if m:
            job.success_pattern = m
        m = config_job.get('hold-following-changes', False)
        if m:
            job.hold_following_changes = True
        # 'voting' and 'mutex' are tested against None so that falsy
        # values such as False are still applied.
        m = config_job.get('voting', None)
        if m is not None:
            job.voting = m
        m = config_job.get('mutex', None)
        if m is not None:
            job.mutex = m
        tags = toList(config_job.get('tags'))
        if tags:
            # Tags are merged via a union rather than a
            # destructive copy because they are intended to
            # accumulate onto any previously applied tags from
            # metajobs.
            job.tags = job.tags.union(set(tags))
        fname = config_job.get('parameter-function', None)
        if fname:
            func = config_env.get(fname, None)
            if not func:
                raise Exception("Unable to find function %s" % fname)
            job.parameter_function = func
        # For branches and files the raw pattern strings are kept in the
        # underscore attribute next to the compiled regexes.
        branches = toList(config_job.get('branch'))
        if branches:
            job._branches = branches
            job.branches = [re.compile(x) for x in branches]
        files = toList(config_job.get('files'))
        if files:
            job._files = files
            job.files = [re.compile(x) for x in files]
        skip_if_matcher = self._parseSkipIf(config_job)
        if skip_if_matcher:
            job.skip_if_matcher = skip_if_matcher
        swift = toList(config_job.get('swift'))
        if swift:
            for s in swift:
                job.swift[s['name']] = s

    def add_jobs(job_tree, config_jobs):
        # Recursively add config_jobs to job_tree.  An entry may be a job
        # name (str), a dict mapping a parent job name to its children,
        # or a list of further entries.
        for job in config_jobs:
            if isinstance(job, list):
                for x in job:
                    add_jobs(job_tree, x)
            if isinstance(job, dict):
                for parent, children in job.items():
                    parent_tree = job_tree.addJob(layout.getJob(parent))
                    add_jobs(parent_tree, children)
            if isinstance(job, str):
                job_tree.addJob(layout.getJob(job))

    for config_project in data.get('projects', []):
        project = Project(config_project['name'])
        # Last path component of the project name; it becomes the 'name'
        # parameter used when expanding templates.
        shortname = config_project['name'].split('/')[-1]

        # This is reversed due to the prepend operation below, so
        # the ultimate order is templates (in order) followed by
        # statically defined jobs.
        for requested_template in reversed(
            config_project.get('template', [])):
            # Fetch the template from 'project-templates'
            tpl = project_templates.get(
                requested_template.get('name'))
            # Expand it with the project context
            requested_template['name'] = shortname
            expanded = deep_format(tpl, requested_template)
            # Finally merge the expansion with whatever has been
            # already defined for this project.  Prepend our new
            # jobs to existing ones (which may have been
            # statically defined or defined by other templates).
            for pipeline in layout.pipelines.values():
                if pipeline.name in expanded:
                    config_project.update(
                        {pipeline.name: expanded[pipeline.name] +
                         config_project.get(pipeline.name, [])})

        layout.projects[config_project['name']] = project
        mode = config_project.get('merge-mode', 'merge-resolve')
        project.merge_mode = model.MERGER_MAP[mode]
        for pipeline in layout.pipelines.values():
            if pipeline.name in config_project:
                job_tree = pipeline.addProject(project)
                config_jobs = config_project[pipeline.name]
                add_jobs(job_tree, config_jobs)

    # All jobs should be defined at this point, get rid of
    # metajobs so that getJob isn't doing anything weird.
    layout.metajobs = []

    for pipeline in layout.pipelines.values():
        pipeline.manager._postConfig(layout)

    return layout
James E. Blairee743612012-05-29 14:49:32 -0700616
def setLauncher(self, launcher):
    """Store launcher as ``self.launcher``, replacing any previous one."""
    self.launcher = launcher
619
def setMerger(self, merger):
    """Store merger as ``self.merger``, replacing any previous one."""
    self.merger = merger
622
def getProject(self, name, create_foreign=False):
    """Look up a project in the current layout while holding layout_lock.

    :arg str name: the project name
    :arg bool create_foreign: when True and the project is unknown,
        create it with ``foreign=True`` and add it to the layout
    :returns: the project, or None if it is unknown and was not created
    """
    with self.layout_lock:
        project = self.layout.projects.get(name)
        if create_foreign and project is None:
            self.log.info("Registering foreign project: %s" % name)
            project = Project(name, foreign=True)
            self.layout.projects[name] = project
    return project
635
def addEvent(self, event):
    """Queue a trigger event and wake the scheduler.

    A statsd counter named after ``event.type`` is incremented when
    statsd is available.  Reporting is best effort: failures are logged
    and never prevent the event from being queued.

    :arg event: the trigger event to queue
    """
    self.log.debug("Adding trigger event: %s" % event)
    try:
        if statsd:
            statsd.incr('gerrit.event.%s' % event.type)
    except Exception:
        # Not a bare except: KeyboardInterrupt and SystemExit must still
        # propagate.
        self.log.exception("Exception reporting event stats")
    self.trigger_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700646
def onBuildStarted(self, build):
    """Timestamp the build's start and queue a BuildStartedEvent for it.

    :arg build: the build that has started; its ``start_time`` is set to
        the current time
    """
    self.log.debug("Adding start event for build: %s" % build)
    build.start_time = time.time()
    self.result_event_queue.put(BuildStartedEvent(build))
    self.wake_event.set()
    self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700654
def onBuildCompleted(self, build, result):
    """Record a build's result, report stats and queue a completion event.

    Stats reporting is best effort: any failure is logged and never
    prevents the BuildCompletedEvent from being queued.  Wait-time
    timings are only emitted when both ``build.start_time`` and
    ``build.launch_time`` are set, so a build that never started still
    gets its result counters.

    :arg build: the completed build; ``end_time`` and ``result`` are set
    :arg result: the build result (for example 'SUCCESS' or 'FAILURE')
    """
    self.log.debug("Adding complete event for build: %s result: %s" % (
        build, result))
    build.end_time = time.time()
    # Note, as soon as the result is set, other threads may act
    # upon this, even though the event hasn't been fully
    # processed.  Ensure that any other data from the event (eg,
    # timing) is recorded before setting the result.
    build.result = result
    try:
        if statsd and build.pipeline:
            pipeline_name = build.pipeline.name
            jobname = build.job.name.replace('.', '_')
            statsd.incr('zuul.pipeline.%s.all_jobs' % pipeline_name)

            # Time between launch and start; the same value is reported
            # for every label and for the job, so compute it once.
            wait_ms = None
            if build.start_time and build.launch_time:
                wait_ms = int(
                    (build.start_time - build.launch_time) * 1000)

            if wait_ms is not None:
                for label in build.node_labels:
                    # Jenkins includes the node name in its list of
                    # labels, so we filter it out here, since that is
                    # not statistically interesting.
                    if label == build.node_name:
                        continue
                    key = 'zuul.pipeline.%s.label.%s.wait_time' % (
                        pipeline_name, label)
                    statsd.timing(key, wait_ms)

            key = 'zuul.pipeline.%s.job.%s.%s' % (pipeline_name,
                                                  jobname, build.result)
            if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                dt = int((build.end_time - build.start_time) * 1000)
                statsd.timing(key, dt)
            statsd.incr(key)

            if wait_ms is not None:
                key = 'zuul.pipeline.%s.job.%s.wait_time' % (
                    pipeline_name, jobname)
                statsd.timing(key, wait_ms)
    except Exception:
        # Not a bare except: KeyboardInterrupt and SystemExit must still
        # propagate.
        self.log.exception("Exception reporting runtime stats")
    event = BuildCompletedEvent(build)
    self.result_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700696
James E. Blair4076e2b2014-01-28 12:42:20 -0800697 def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
698 self.log.debug("Adding merge complete event for build set: %s" %
699 build_set)
700 event = MergeCompletedEvent(build_set, zuul_url,
701 merged, updated, commit)
702 self.result_event_queue.put(event)
703 self.wake_event.set()
704
James E. Blaire9d45c32012-05-31 09:56:45 -0700705 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700706 self.log.debug("Prepare to reconfigure")
James E. Blair468c8512013-12-06 13:27:19 -0800707 event = ReconfigureEvent(config)
708 self.management_event_queue.put(event)
James E. Blaire9d45c32012-05-31 09:56:45 -0700709 self.wake_event.set()
710 self.log.debug("Waiting for reconfiguration")
James E. Blair468c8512013-12-06 13:27:19 -0800711 event.wait()
James E. Blaire9d45c32012-05-31 09:56:45 -0700712 self.log.debug("Reconfiguration complete")
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400713 self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700714
James E. Blair36658cf2013-12-06 17:53:48 -0800715 def promote(self, pipeline_name, change_ids):
716 event = PromoteEvent(pipeline_name, change_ids)
717 self.management_event_queue.put(event)
718 self.wake_event.set()
719 self.log.debug("Waiting for promotion")
720 event.wait()
721 self.log.debug("Promotion complete")
722
James E. Blaird27a96d2014-07-10 13:25:13 -0700723 def enqueue(self, trigger_event):
724 event = EnqueueEvent(trigger_event)
725 self.management_event_queue.put(event)
726 self.wake_event.set()
727 self.log.debug("Waiting for enqueue")
728 event.wait()
729 self.log.debug("Enqueue complete")
730
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700731 def exit(self):
732 self.log.debug("Prepare to exit")
733 self._pause = True
734 self._exit = True
735 self.wake_event.set()
736 self.log.debug("Waiting for exit")
737
738 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700739 if self.config.has_option('zuul', 'state_dir'):
740 state_dir = os.path.expanduser(self.config.get('zuul',
741 'state_dir'))
742 else:
743 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700744 return os.path.join(state_dir, 'queue.pickle')
745
746 def _save_queue(self):
747 pickle_file = self._get_queue_pickle_file()
748 events = []
749 while not self.trigger_event_queue.empty():
750 events.append(self.trigger_event_queue.get())
751 self.log.debug("Queue length is %s" % len(events))
752 if events:
753 self.log.debug("Saving queue")
754 pickle.dump(events, open(pickle_file, 'wb'))
755
756 def _load_queue(self):
757 pickle_file = self._get_queue_pickle_file()
758 if os.path.exists(pickle_file):
759 self.log.debug("Loading queue")
760 events = pickle.load(open(pickle_file, 'rb'))
761 self.log.debug("Queue length is %s" % len(events))
762 for event in events:
763 self.trigger_event_queue.put(event)
764 else:
765 self.log.debug("No queue file found")
766
767 def _delete_queue(self):
768 pickle_file = self._get_queue_pickle_file()
769 if os.path.exists(pickle_file):
770 self.log.debug("Deleting saved queue")
771 os.unlink(pickle_file)
772
773 def resume(self):
774 try:
775 self._load_queue()
776 except:
777 self.log.exception("Unable to load queue")
778 try:
779 self._delete_queue()
780 except:
781 self.log.exception("Unable to delete saved queue")
782 self.log.debug("Resuming queue processing")
783 self.wake_event.set()
784
785 def _doPauseEvent(self):
786 if self._exit:
787 self.log.debug("Exiting")
788 self._save_queue()
789 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700790
James E. Blair468c8512013-12-06 13:27:19 -0800791 def _doReconfigureEvent(self, event):
792 # This is called in the scheduler loop after another thread submits
793 # a request
James E. Blaircdccd972013-07-01 12:10:22 -0700794 self.layout_lock.acquire()
James E. Blair468c8512013-12-06 13:27:19 -0800795 self.config = event.config
James E. Blaircdccd972013-07-01 12:10:22 -0700796 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700797 self.log.debug("Performing reconfiguration")
Joshua Hesketh352264b2015-08-11 23:42:08 +1000798 self._unloadDrivers()
James E. Blaircdccd972013-07-01 12:10:22 -0700799 layout = self._parseConfig(
Joshua Hesketh352264b2015-08-11 23:42:08 +1000800 self.config.get('zuul', 'layout_config'), self.connections)
James E. Blaircdccd972013-07-01 12:10:22 -0700801 for name, new_pipeline in layout.pipelines.items():
802 old_pipeline = self.layout.pipelines.get(name)
803 if not old_pipeline:
804 if self.layout.pipelines:
805 # Don't emit this warning on startup
806 self.log.warning("No old pipeline matching %s found "
807 "when reconfiguring" % name)
808 continue
James E. Blairdad52252014-02-07 16:59:17 -0800809 self.log.debug("Re-enqueueing changes for pipeline %s" % name)
James E. Blaircdccd972013-07-01 12:10:22 -0700810 items_to_remove = []
James E. Blair400e8fd2015-07-30 17:44:45 -0700811 builds_to_cancel = []
James E. Blairbfb8e042014-12-30 17:01:44 -0800812 last_head = None
James E. Blaircdccd972013-07-01 12:10:22 -0700813 for shared_queue in old_pipeline.queues:
James E. Blair972e3c72013-08-29 12:04:55 -0700814 for item in shared_queue.queue:
James E. Blairbfb8e042014-12-30 17:01:44 -0800815 if not item.item_ahead:
816 last_head = item
James E. Blaircdccd972013-07-01 12:10:22 -0700817 item.item_ahead = None
James E. Blair972e3c72013-08-29 12:04:55 -0700818 item.items_behind = []
James E. Blaircdccd972013-07-01 12:10:22 -0700819 item.pipeline = None
James E. Blairbfb8e042014-12-30 17:01:44 -0800820 item.queue = None
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +0000821 project_name = item.change.project.name
822 item.change.project = layout.projects.get(project_name)
823 if not item.change.project:
824 self.log.debug("Project %s not defined, "
825 "re-instantiating as foreign" %
826 project_name)
827 project = Project(project_name, foreign=True)
828 layout.projects[project_name] = project
829 item.change.project = project
James E. Blairfe707d12015-08-05 15:18:15 -0700830 item_jobs = new_pipeline.getJobs(item)
James E. Blairdad52252014-02-07 16:59:17 -0800831 for build in item.current_build_set.getBuilds():
James E. Blair6b077942014-02-07 17:45:55 -0800832 job = layout.jobs.get(build.job.name)
James E. Blairfe707d12015-08-05 15:18:15 -0700833 if job and job in item_jobs:
James E. Blair6b077942014-02-07 17:45:55 -0800834 build.job = job
835 else:
James E. Blair400e8fd2015-07-30 17:44:45 -0700836 item.removeBuild(build)
837 builds_to_cancel.append(build)
James E. Blairbfb8e042014-12-30 17:01:44 -0800838 if not new_pipeline.manager.reEnqueueItem(item,
839 last_head):
James E. Blaircdccd972013-07-01 12:10:22 -0700840 items_to_remove.append(item)
James E. Blair6b077942014-02-07 17:45:55 -0800841 for item in items_to_remove:
842 for build in item.current_build_set.getBuilds():
James E. Blair400e8fd2015-07-30 17:44:45 -0700843 builds_to_cancel.append(build)
844 for build in builds_to_cancel:
James E. Blair6b077942014-02-07 17:45:55 -0800845 self.log.warning(
846 "Canceling build %s during reconfiguration" % (build,))
James E. Blairdad52252014-02-07 16:59:17 -0800847 try:
848 self.launcher.cancel(build)
849 except Exception:
850 self.log.exception(
851 "Exception while canceling build %s "
852 "for change %s" % (build, item.change))
James E. Blaircdccd972013-07-01 12:10:22 -0700853 self.layout = layout
Joshua Hesketh4bd7da32016-02-17 20:58:47 +1100854 self.maintainConnectionCache()
James E. Blair63bb0ef2013-07-29 17:14:51 -0700855 for trigger in self.triggers.values():
856 trigger.postConfig()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000857 for pipeline in self.layout.pipelines.values():
858 pipeline.source.postConfig()
Joshua Heskethde958652015-11-10 19:19:50 +1100859 for action in self._reporter_actions.values():
860 for reporter in pipeline.__getattribute__(action):
861 reporter.postConfig()
James E. Blair3cb10702013-08-24 08:56:03 -0700862 if statsd:
863 try:
864 for pipeline in self.layout.pipelines.values():
865 items = len(pipeline.getAllItems())
866 # stats.gauges.zuul.pipeline.NAME.current_changes
867 key = 'zuul.pipeline.%s' % pipeline.name
868 statsd.gauge(key + '.current_changes', items)
869 except Exception:
870 self.log.exception("Exception reporting initial "
871 "pipeline stats:")
James E. Blaircdccd972013-07-01 12:10:22 -0700872 finally:
873 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700874
James E. Blair36658cf2013-12-06 17:53:48 -0800875 def _doPromoteEvent(self, event):
876 pipeline = self.layout.pipelines[event.pipeline_name]
877 change_ids = [c.split(',') for c in event.change_ids]
878 items_to_enqueue = []
879 change_queue = None
880 for shared_queue in pipeline.queues:
881 if change_queue:
882 break
883 for item in shared_queue.queue:
884 if (item.change.number == change_ids[0][0] and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000885 item.change.patchset == change_ids[0][1]):
James E. Blair36658cf2013-12-06 17:53:48 -0800886 change_queue = shared_queue
887 break
888 if not change_queue:
889 raise Exception("Unable to find shared change queue for %s" %
890 event.change_ids[0])
891 for number, patchset in change_ids:
892 found = False
893 for item in change_queue.queue:
894 if (item.change.number == number and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000895 item.change.patchset == patchset):
James E. Blair36658cf2013-12-06 17:53:48 -0800896 found = True
897 items_to_enqueue.append(item)
898 break
899 if not found:
900 raise Exception("Unable to find %s,%s in queue %s" %
901 (number, patchset, change_queue))
902 for item in change_queue.queue[:]:
903 if item not in items_to_enqueue:
904 items_to_enqueue.append(item)
905 pipeline.manager.cancelJobs(item)
906 pipeline.manager.dequeueItem(item)
907 for item in items_to_enqueue:
Sean Daguef39b9ca2014-01-10 21:34:35 -0500908 pipeline.manager.addChange(
909 item.change,
910 enqueue_time=item.enqueue_time,
James E. Blairf9ab8842014-07-10 13:12:07 -0700911 quiet=True,
912 ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800913
James E. Blaird27a96d2014-07-10 13:25:13 -0700914 def _doEnqueueEvent(self, event):
915 project = self.layout.projects.get(event.project_name)
916 pipeline = self.layout.pipelines[event.forced_pipeline]
James E. Blairc0dedf82014-08-06 09:37:52 -0700917 change = pipeline.source.getChange(event, project)
James E. Blaird27a96d2014-07-10 13:25:13 -0700918 self.log.debug("Event %s for change %s was directly assigned "
919 "to pipeline %s" % (event, change, self))
920 self.log.info("Adding %s, %s to %s" %
921 (project, change, pipeline))
922 pipeline.manager.addChange(change, ignore_requirements=True)
923
James E. Blaire9d45c32012-05-31 09:56:45 -0700924 def _areAllBuildsComplete(self):
925 self.log.debug("Checking if all builds are complete")
926 waiting = False
James E. Blair4076e2b2014-01-28 12:42:20 -0800927 if self.merger.areMergesOutstanding():
928 waiting = True
James E. Blaireff88162013-07-01 12:44:14 -0400929 for pipeline in self.layout.pipelines.values():
James E. Blair6b077942014-02-07 17:45:55 -0800930 for item in pipeline.getAllItems():
931 for build in item.current_build_set.getBuilds():
932 if build.result is None:
933 self.log.debug("%s waiting on %s" %
934 (pipeline.manager, build))
935 waiting = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700936 if not waiting:
937 self.log.debug("All builds are complete")
938 return True
939 self.log.debug("All builds are not complete")
940 return False
941
James E. Blairee743612012-05-29 14:49:32 -0700942 def run(self):
James E. Blair71e94122012-12-24 17:53:08 -0800943 if statsd:
944 self.log.debug("Statsd enabled")
945 else:
946 self.log.debug("Statsd disabled because python statsd "
947 "package not found")
James E. Blairee743612012-05-29 14:49:32 -0700948 while True:
949 self.log.debug("Run handler sleeping")
950 self.wake_event.wait()
951 self.wake_event.clear()
James E. Blairb0fcae42012-07-17 11:12:10 -0700952 if self._stopped:
James E. Blair4076e2b2014-01-28 12:42:20 -0800953 self.log.debug("Run handler stopping")
James E. Blairb0fcae42012-07-17 11:12:10 -0700954 return
James E. Blairee743612012-05-29 14:49:32 -0700955 self.log.debug("Run handler awake")
James E. Blaira84f0e42014-02-06 07:09:22 -0800956 self.run_handler_lock.acquire()
James E. Blairee743612012-05-29 14:49:32 -0700957 try:
James E. Blaira84f0e42014-02-06 07:09:22 -0800958 while not self.management_event_queue.empty():
James E. Blair468c8512013-12-06 13:27:19 -0800959 self.process_management_queue()
James E. Blaircdccd972013-07-01 12:10:22 -0700960
James E. Blair263fba92013-02-27 13:07:19 -0800961 # Give result events priority -- they let us stop builds,
962 # whereas trigger evensts cause us to launch builds.
James E. Blaira84f0e42014-02-06 07:09:22 -0800963 while not self.result_event_queue.empty():
James E. Blairee743612012-05-29 14:49:32 -0700964 self.process_result_queue()
James E. Blaira84f0e42014-02-06 07:09:22 -0800965
966 if not self._pause:
967 while not self.trigger_event_queue.empty():
James E. Blair263fba92013-02-27 13:07:19 -0800968 self.process_event_queue()
James E. Blaire9d45c32012-05-31 09:56:45 -0700969
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700970 if self._pause and self._areAllBuildsComplete():
971 self._doPauseEvent()
James E. Blaire9d45c32012-05-31 09:56:45 -0700972
James E. Blaira84f0e42014-02-06 07:09:22 -0800973 for pipeline in self.layout.pipelines.values():
974 while pipeline.manager.processQueue():
975 pass
James E. Blair0e933c52013-07-11 10:18:52 -0700976
James E. Blaira84f0e42014-02-06 07:09:22 -0800977 except Exception:
James E. Blairee743612012-05-29 14:49:32 -0700978 self.log.exception("Exception in run handler:")
James E. Blaira84f0e42014-02-06 07:09:22 -0800979 # There may still be more events to process
980 self.wake_event.set()
981 finally:
982 self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700983
Joshua Hesketh4bd7da32016-02-17 20:58:47 +1100984 def maintainConnectionCache(self):
James E. Blair0e933c52013-07-11 10:18:52 -0700985 relevant = set()
986 for pipeline in self.layout.pipelines.values():
Joshua Hesketh4bd7da32016-02-17 20:58:47 +1100987 self.log.debug("Gather relevant cache items for: %s" % pipeline)
James E. Blair0e933c52013-07-11 10:18:52 -0700988 for item in pipeline.getAllItems():
989 relevant.add(item.change)
990 relevant.update(item.change.getRelatedChanges())
Joshua Hesketh4bd7da32016-02-17 20:58:47 +1100991 for connection in self.connections.values():
992 connection.maintainCache(relevant)
993 self.log.debug(
994 "End maintain connection cache for: %s" % connection)
995 self.log.debug("Connection cache size: %s" % len(relevant))
James E. Blair0e933c52013-07-11 10:18:52 -0700996
James E. Blairee743612012-05-29 14:49:32 -0700997 def process_event_queue(self):
998 self.log.debug("Fetching trigger event")
999 event = self.trigger_event_queue.get()
1000 self.log.debug("Processing trigger event %s" % event)
James E. Blaira84f0e42014-02-06 07:09:22 -08001001 try:
1002 project = self.layout.projects.get(event.project_name)
James E. Blaira84f0e42014-02-06 07:09:22 -08001003
James E. Blaira84f0e42014-02-06 07:09:22 -08001004 for pipeline in self.layout.pipelines.values():
Joshua Hesketh352264b2015-08-11 23:42:08 +10001005 # Get the change even if the project is unknown to us for the
1006 # use of updating the cache if there is another change
1007 # depending on this foreign one.
1008 try:
1009 change = pipeline.source.getChange(event, project)
1010 except exceptions.ChangeNotFound as e:
1011 self.log.debug("Unable to get change %s from source %s. "
1012 "(most likely looking for a change from "
1013 "another connection trigger)",
1014 e.change, pipeline.source)
1015 continue
1016 if not project or project.foreign:
1017 self.log.debug("Project %s not found" % event.project_name)
1018 continue
James E. Blaira84f0e42014-02-06 07:09:22 -08001019 if event.type == 'patchset-created':
1020 pipeline.manager.removeOldVersionsOfChange(change)
Antoine Mussobd86a312014-01-08 14:51:33 +01001021 elif event.type == 'change-abandoned':
1022 pipeline.manager.removeAbandonedChange(change)
James E. Blaira84f0e42014-02-06 07:09:22 -08001023 if pipeline.manager.eventMatches(event, change):
1024 self.log.info("Adding %s, %s to %s" %
1025 (project, change, pipeline))
1026 pipeline.manager.addChange(change)
1027 finally:
James E. Blairff791972013-01-09 11:45:43 -08001028 self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -07001029
James E. Blair468c8512013-12-06 13:27:19 -08001030 def process_management_queue(self):
1031 self.log.debug("Fetching management event")
1032 event = self.management_event_queue.get()
1033 self.log.debug("Processing management event %s" % event)
James E. Blair36658cf2013-12-06 17:53:48 -08001034 try:
1035 if isinstance(event, ReconfigureEvent):
1036 self._doReconfigureEvent(event)
1037 elif isinstance(event, PromoteEvent):
1038 self._doPromoteEvent(event)
James E. Blaird27a96d2014-07-10 13:25:13 -07001039 elif isinstance(event, EnqueueEvent):
1040 self._doEnqueueEvent(event.trigger_event)
James E. Blair36658cf2013-12-06 17:53:48 -08001041 else:
1042 self.log.error("Unable to handle event %s" % event)
1043 event.done()
1044 except Exception as e:
1045 event.exception(e, sys.exc_info()[2])
James E. Blair468c8512013-12-06 13:27:19 -08001046 self.management_event_queue.task_done()
1047
James E. Blairee743612012-05-29 14:49:32 -07001048 def process_result_queue(self):
1049 self.log.debug("Fetching result event")
James E. Blaira84f0e42014-02-06 07:09:22 -08001050 event = self.result_event_queue.get()
1051 self.log.debug("Processing result event %s" % event)
1052 try:
1053 if isinstance(event, BuildStartedEvent):
1054 self._doBuildStartedEvent(event)
1055 elif isinstance(event, BuildCompletedEvent):
1056 self._doBuildCompletedEvent(event)
James E. Blair4076e2b2014-01-28 12:42:20 -08001057 elif isinstance(event, MergeCompletedEvent):
1058 self._doMergeCompletedEvent(event)
James E. Blaira84f0e42014-02-06 07:09:22 -08001059 else:
1060 self.log.error("Unable to handle event %s" % event)
1061 finally:
1062 self.result_event_queue.task_done()
1063
1064 def _doBuildStartedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -08001065 build = event.build
1066 if build.build_set is not build.build_set.item.current_build_set:
1067 self.log.warning("Build %s is not in the current build set" %
1068 (build,))
1069 return
1070 pipeline = build.build_set.item.pipeline
1071 if not pipeline:
1072 self.log.warning("Build %s is not associated with a pipeline" %
1073 (build,))
1074 return
1075 pipeline.manager.onBuildStarted(event.build)
James E. Blaira84f0e42014-02-06 07:09:22 -08001076
1077 def _doBuildCompletedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -08001078 build = event.build
1079 if build.build_set is not build.build_set.item.current_build_set:
1080 self.log.warning("Build %s is not in the current build set" %
1081 (build,))
1082 return
1083 pipeline = build.build_set.item.pipeline
1084 if not pipeline:
1085 self.log.warning("Build %s is not associated with a pipeline" %
1086 (build,))
1087 return
1088 pipeline.manager.onBuildCompleted(event.build)
1089
1090 def _doMergeCompletedEvent(self, event):
1091 build_set = event.build_set
1092 if build_set is not build_set.item.current_build_set:
1093 self.log.warning("Build set %s is not current" % (build_set,))
1094 return
1095 pipeline = build_set.item.pipeline
1096 if not pipeline:
1097 self.log.warning("Build set %s is not associated with a pipeline" %
1098 (build_set,))
1099 return
1100 pipeline.manager.onMergeCompleted(event)
James E. Blairee743612012-05-29 14:49:32 -07001101
James E. Blair8dbd56a2012-12-22 10:55:10 -08001102 def formatStatusJSON(self):
1103 data = {}
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +04001104
1105 data['zuul_version'] = self.zuul_version
1106
James E. Blair8dbd56a2012-12-22 10:55:10 -08001107 if self._pause:
1108 ret = '<p><b>Queue only mode:</b> preparing to '
James E. Blair8dbd56a2012-12-22 10:55:10 -08001109 if self._exit:
1110 ret += 'exit'
1111 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
1112 ret += '</p>'
1113 data['message'] = ret
1114
James E. Blairfb682cc2013-02-26 15:23:27 -08001115 data['trigger_event_queue'] = {}
1116 data['trigger_event_queue']['length'] = \
1117 self.trigger_event_queue.qsize()
1118 data['result_event_queue'] = {}
1119 data['result_event_queue']['length'] = \
1120 self.result_event_queue.qsize()
1121
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +04001122 if self.last_reconfigured:
1123 data['last_reconfigured'] = self.last_reconfigured * 1000
1124
James E. Blair8dbd56a2012-12-22 10:55:10 -08001125 pipelines = []
1126 data['pipelines'] = pipelines
Alex Gaynorfda4c352014-06-04 11:15:26 -07001127 for pipeline in self.layout.pipelines.values():
James E. Blair8dbd56a2012-12-22 10:55:10 -08001128 pipelines.append(pipeline.formatStatusJSON())
1129 return json.dumps(data)
1130
James E. Blair1e8dd892012-05-30 09:15:05 -07001131
James E. Blair4aea70c2012-07-26 14:23:24 -07001132class BasePipelineManager(object):
1133 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -07001134
James E. Blair4aea70c2012-07-26 14:23:24 -07001135 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -07001136 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -07001137 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -07001138 self.event_filters = []
James E. Blair11041d22014-05-02 14:49:53 -07001139 self.changeish_filters = []
James E. Blairee743612012-05-29 14:49:32 -07001140
1141 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -07001142 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -07001143
James E. Blaireff88162013-07-01 12:44:14 -04001144 def _postConfig(self, layout):
James E. Blair4aea70c2012-07-26 14:23:24 -07001145 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairc0dedf82014-08-06 09:37:52 -07001146 self.log.info(" Source: %s" % self.pipeline.source)
James E. Blair11041d22014-05-02 14:49:53 -07001147 self.log.info(" Requirements:")
1148 for f in self.changeish_filters:
1149 self.log.info(" %s" % f)
James E. Blairee743612012-05-29 14:49:32 -07001150 self.log.info(" Events:")
1151 for e in self.event_filters:
1152 self.log.info(" %s" % e)
1153 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -07001154
James E. Blairee743612012-05-29 14:49:32 -07001155 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -07001156 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -07001157 if tree.job:
1158 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -07001159 for b in tree.job._branches:
1160 efilters += str(b)
James E. Blair70c71582013-03-06 08:50:50 -08001161 for f in tree.job._files:
1162 efilters += str(f)
Maru Newby3fe5f852015-01-13 04:22:14 +00001163 if tree.job.skip_if_matcher:
1164 efilters += str(tree.job.skip_if_matcher)
James E. Blairee743612012-05-29 14:49:32 -07001165 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -07001166 efilters = ' ' + efilters
James E. Blairaf17a972016-02-03 15:07:18 -08001167 tags = []
James E. Blair222d4982012-07-16 09:31:19 -07001168 if tree.job.hold_following_changes:
James E. Blairaf17a972016-02-03 15:07:18 -08001169 tags.append('[hold]')
James E. Blair4ec821f2012-08-23 15:28:28 -07001170 if not tree.job.voting:
James E. Blairaf17a972016-02-03 15:07:18 -08001171 tags.append('[nonvoting]')
1172 if tree.job.mutex:
1173 tags.append('[mutex: %s]' % tree.job.mutex)
1174 tags = ' '.join(tags)
1175 self.log.info("%s%s%s %s" % (istr, repr(tree.job),
1176 efilters, tags))
James E. Blairee743612012-05-29 14:49:32 -07001177 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -07001178 log_jobs(x, indent + 2)
1179
James E. Blaireff88162013-07-01 12:44:14 -04001180 for p in layout.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -07001181 tree = self.pipeline.getJobTree(p)
1182 if tree:
James E. Blairee743612012-05-29 14:49:32 -07001183 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -07001184 log_jobs(tree)
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001185 self.log.info(" On start:")
1186 self.log.info(" %s" % self.pipeline.start_actions)
1187 self.log.info(" On success:")
1188 self.log.info(" %s" % self.pipeline.success_actions)
1189 self.log.info(" On failure:")
1190 self.log.info(" %s" % self.pipeline.failure_actions)
Joshua Heskethb7179772014-01-30 23:30:46 +11001191 self.log.info(" On merge-failure:")
1192 self.log.info(" %s" % self.pipeline.merge_failure_actions)
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001193 self.log.info(" When disabled:")
1194 self.log.info(" %s" % self.pipeline.disabled_actions)
James E. Blairee743612012-05-29 14:49:32 -07001195
James E. Blaire421a232012-07-25 16:59:21 -07001196 def getSubmitAllowNeeds(self):
1197 # Get a list of code review labels that are allowed to be
1198 # "needed" in the submit records for a change, with respect
1199 # to this queue. In other words, the list of review labels
1200 # this queue itself is likely to set before submitting.
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001201 allow_needs = set()
1202 for action_reporter in self.pipeline.success_actions:
1203 allow_needs.update(action_reporter.getSubmitAllowNeeds())
1204 return allow_needs
James E. Blaire421a232012-07-25 16:59:21 -07001205
James E. Blairc053d022014-01-22 14:57:33 -08001206 def eventMatches(self, event, change):
James E. Blairad28e912013-11-27 10:43:22 -08001207 if event.forced_pipeline:
1208 if event.forced_pipeline == self.pipeline.name:
James E. Blair1b265312014-06-24 09:35:21 -07001209 self.log.debug("Event %s for change %s was directly assigned "
1210 "to pipeline %s" % (event, change, self))
James E. Blairad28e912013-11-27 10:43:22 -08001211 return True
1212 else:
1213 return False
James E. Blairee743612012-05-29 14:49:32 -07001214 for ef in self.event_filters:
James E. Blairc053d022014-01-22 14:57:33 -08001215 if ef.matches(event, change):
James E. Blair1b265312014-06-24 09:35:21 -07001216 self.log.debug("Event %s for change %s matched %s "
1217 "in pipeline %s" % (event, change, ef, self))
James E. Blairee743612012-05-29 14:49:32 -07001218 return True
1219 return False
1220
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001221 def isChangeAlreadyInPipeline(self, change):
1222 # Checks live items in the pipeline
1223 for item in self.pipeline.getAllItems():
1224 if item.live and change.equals(item.change):
1225 return True
1226 return False
1227
1228 def isChangeAlreadyInQueue(self, change, change_queue):
1229 # Checks any item in the specified change queue
1230 for item in change_queue.queue:
1231 if change.equals(item.change):
James E. Blair0dc8ba92012-07-16 14:23:52 -07001232 return True
1233 return False
1234
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001235 def reportStart(self, item):
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001236 if not self.pipeline._disabled:
1237 try:
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001238 self.log.info("Reporting start, action %s item %s" %
1239 (self.pipeline.start_actions, item))
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001240 ret = self.sendReport(self.pipeline.start_actions,
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001241 self.pipeline.source, item)
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001242 if ret:
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001243 self.log.error("Reporting item start %s received: %s" %
1244 (item, ret))
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001245 except:
1246 self.log.exception("Exception while reporting start:")
James E. Blaire0487072012-08-29 17:38:31 -07001247
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001248 def sendReport(self, action_reporters, source, item,
1249 message=None):
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001250 """Sends the built message off to configured reporters.
1251
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001252 Takes the action_reporters, item, message and extra options and
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001253 sends them to the pluggable reporters.
1254 """
1255 report_errors = []
1256 if len(action_reporters) > 0:
Joshua Heskethde958652015-11-10 19:19:50 +11001257 for reporter in action_reporters:
Joshua Hesketh385d11e2015-09-14 14:50:01 -06001258 ret = reporter.report(source, self.pipeline, item)
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001259 if ret:
1260 report_errors.append(ret)
1261 if len(report_errors) == 0:
1262 return
1263 return report_errors
1264
James E. Blaire0487072012-08-29 17:38:31 -07001265 def isChangeReadyToBeEnqueued(self, change):
1266 return True
1267
James E. Blair5ee24252014-12-30 10:12:29 -08001268 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
1269 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001270 return True
1271
James E. Blair5ee24252014-12-30 10:12:29 -08001272 def enqueueChangesBehind(self, change, quiet, ignore_requirements,
1273 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001274 return True
1275
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001276 def checkForChangesNeededBy(self, change, change_queue):
James E. Blairfee8d652013-06-07 08:57:52 -07001277 return True
1278
James E. Blair6965a4b2014-12-16 17:19:04 -08001279 def getFailingDependentItems(self, item):
James E. Blair972e3c72013-08-29 12:04:55 -07001280 return None
1281
James E. Blairfee8d652013-06-07 08:57:52 -07001282 def getDependentItems(self, item):
1283 orig_item = item
1284 items = []
1285 while item.item_ahead:
1286 items.append(item.item_ahead)
1287 item = item.item_ahead
1288 self.log.info("Change %s depends on changes %s" %
1289 (orig_item.change,
1290 [x.change for x in items]))
1291 return items
1292
James E. Blair972e3c72013-08-29 12:04:55 -07001293 def getItemForChange(self, change):
1294 for item in self.pipeline.getAllItems():
1295 if item.change.equals(change):
1296 return item
1297 return None
1298
James E. Blair2fa50962013-01-30 21:50:41 -08001299 def findOldVersionOfChangeAlreadyInQueue(self, change):
James E. Blairba437362015-02-07 11:41:52 -08001300 for item in self.pipeline.getAllItems():
1301 if not item.live:
1302 continue
1303 if change.isUpdateOf(item.change):
1304 return item
James E. Blair2fa50962013-01-30 21:50:41 -08001305 return None
1306
def removeOldVersionsOfChange(self, change):
    """Remove the queued item that ``change`` supersedes, if any.

    Does nothing unless the pipeline has ``dequeue_on_new_patchset``
    set.
    """
    if self.pipeline.dequeue_on_new_patchset:
        stale = self.findOldVersionOfChangeAlreadyInQueue(change)
        if stale:
            self.log.debug(
                "Change %s is a new version of %s, removing %s" %
                (change, stale.change, stale))
            self.removeItem(stale)
James E. Blair2fa50962013-01-30 21:50:41 -08001315
def removeAbandonedChange(self, change):
    """Remove every live pipeline item whose change equals ``change``."""
    self.log.debug("Change %s abandoned, removing." % change)
    for queued in self.pipeline.getAllItems():
        if queued.live and queued.change.equals(change):
            self.removeItem(queued)
Antoine Mussobd86a312014-01-08 14:51:33 +01001323
def reEnqueueItem(self, item, last_head):
    """Put an existing ``item`` back into a change queue.

    The queue is looked up with ``getChangeQueue`` using the item's
    change and ``last_head.queue``.  Returns True once the item has
    been enqueued and its state re-applied, False when no queue was
    found.
    """
    with self.getChangeQueue(item.change, last_head.queue) as change_queue:
        if not change_queue:
            self.log.error("Unable to find change queue for project %s" %
                           item.change.project)
            return False

        self.log.debug("Re-enqueing change %s in queue %s" %
                       (item.change, change_queue))
        change_queue.enqueueItem(item)

        # Re-apply results of finished builds in case any new jobs have
        # been added to the tree.
        for build in item.current_build_set.getBuilds():
            if build.result:
                self.pipeline.setResult(item, build)

        # Likewise, re-apply the item-level state flags.
        if item.current_build_set.unable_to_merge:
            self.pipeline.setUnableToMerge(item)
        if item.dequeued_needing_change:
            self.pipeline.setDequeuedNeedingChange(item)

        self.reportStats(item)
        return True
James E. Blaircdccd972013-07-01 12:10:22 -07001348
def addChange(self, change, quiet=False, enqueue_time=None,
              ignore_requirements=False, live=True,
              change_queue=None):
    """Try to enqueue ``change`` in this pipeline.

    ``quiet`` suppresses the start report, ``enqueue_time`` overrides
    the new item's enqueue time when given, ``ignore_requirements``
    skips the changeish filters, ``live`` is stored on the new item and
    ``change_queue`` is handed to ``getChangeQueue``.

    Returns True when the change was enqueued or was already present,
    and False when it was rejected or could not be placed in a queue.
    """
    self.log.debug("Considering adding change %s" % change)

    # A live change is checked against the whole pipeline here; anything
    # else is only checked against its specific queue further down.
    if live and self.isChangeAlreadyInPipeline(change):
        self.log.debug("Change %s is already in pipeline, ignoring" % change)
        return True

    if not self.isChangeReadyToBeEnqueued(change):
        self.log.debug("Change %s is not ready to be enqueued, ignoring" %
                       change)
        return False

    if not ignore_requirements:
        for requirement in self.changeish_filters:
            if not requirement.matches(change):
                self.log.debug("Change %s does not match pipeline "
                               "requirement %s" % (change, requirement))
                return False

    with self.getChangeQueue(change, change_queue) as target_queue:
        if not target_queue:
            self.log.debug("Unable to find change queue for "
                           "change %s in project %s" %
                           (change, change.project))
            return False

        enqueued_ahead = self.enqueueChangesAhead(
            change, quiet, ignore_requirements, target_queue)
        if not enqueued_ahead:
            self.log.debug("Failed to enqueue changes ahead of %s" % change)
            return False

        if self.isChangeAlreadyInQueue(change, target_queue):
            self.log.debug("Change %s is already in queue, ignoring" % change)
            return True

        self.log.debug("Adding change %s to queue %s" %
                       (change, target_queue))
        item = target_queue.enqueueChange(change)
        if enqueue_time:
            item.enqueue_time = enqueue_time
        item.live = live
        self.reportStats(item)
        if not quiet and len(self.pipeline.start_actions) > 0:
            self.reportStart(item)
        self.enqueueChangesBehind(change, quiet, ignore_requirements,
                                  target_queue)
        for trigger in self.sched.triggers.values():
            trigger.onChangeEnqueued(item.change, self.pipeline)
        return True
1407
def dequeueItem(self, item):
    """Take ``item`` out of the change queue that holds it."""
    self.log.debug("Removing change %s from queue" % item.change)
    owning_queue = item.queue
    owning_queue.dequeueItem(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001411
def removeItem(self, item):
    """Drop ``item`` from the pipeline.

    Cancels its jobs, dequeues it and reports stats, in that order.
    Typically used when the item has been superseded by another change.
    """
    self.log.debug("Canceling builds behind change: %s "
                   "because it is being removed." % item.change)
    for step in (self.cancelJobs, self.dequeueItem, self.reportStats):
        step(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001420
def _makeMergerItem(self, item):
    """Build the dictionary describing ``item`` for the merger.

    ``number``/``patchset`` are filled in when the change has a
    ``number`` attribute; otherwise ``oldrev``/``newrev`` are filled in
    when it has a ``newrev`` attribute.  The unused pair stays None.
    """
    change = item.change
    number = patchset = oldrev = newrev = None
    if hasattr(change, 'number'):
        number, patchset = change.number, change.patchset
    elif hasattr(change, 'newrev'):
        oldrev, newrev = change.oldrev, change.newrev

    source = self.pipeline.source
    return {
        'project': change.project.name,
        'url': source.getGitUrl(change.project),
        'connection_name': source.connection.connection_name,
        'merge_mode': change.project.merge_mode,
        'refspec': change.refspec,
        'branch': change.branch,
        'ref': item.current_build_set.ref,
        'number': number,
        'patchset': patchset,
        'oldrev': oldrev,
        'newrev': newrev,
    }
1448
def prepareRef(self, item):
    """Make sure the git ref for ``item`` is being (or has been) prepared.

    Returns True only when the build set's merge state is already
    COMPLETE.  If a merge is PENDING, returns False without doing
    anything.  Otherwise the state is set to PENDING, a merge (or a
    plain repo update) is requested from the merger, and False is
    returned.
    """
    build_set = item.current_build_set
    if build_set.merge_state == build_set.COMPLETE:
        return True
    if build_set.merge_state == build_set.PENDING:
        return False
    build_set.merge_state = build_set.PENDING
    ref = build_set.ref
    if hasattr(item.change, 'refspec') and not ref:
        self.log.debug("Preparing ref for: %s" % item.change)
        item.current_build_set.setConfiguration()
        # getDependentItems() lists the nearest item first; the merger
        # is handed them in the opposite order, ending with this item.
        dependent_items = self.getDependentItems(item)
        dependent_items.reverse()
        all_items = dependent_items + [item]
        # Build a real list: map() is a lazy, one-shot iterator on
        # Python 3, and would be handed to the merger unevaluated.
        merger_items = [self._makeMergerItem(entry) for entry in all_items]
        self.sched.merger.mergeChanges(merger_items,
                                       item.current_build_set,
                                       self.pipeline.precedence)
    else:
        self.log.debug("Preparing update repo for: %s" % item.change)
        url = self.pipeline.source.getGitUrl(item.change.project)
        self.sched.merger.updateRepo(item.change.project.name,
                                     url, build_set,
                                     self.pipeline.precedence)
    return False
1475
def _launchJobs(self, item, jobs):
    """Launch each job in ``jobs`` for ``item``.

    A failure to launch one job is logged and does not prevent the
    remaining jobs from being launched.

    Returns True if at least one build was launched and added to the
    item, False otherwise.
    """
    self.log.debug("Launching jobs for change %s" % item.change)
    dependent_items = self.getDependentItems(item)
    launched = False
    for job in jobs:
        self.log.debug("Found job %s for change %s" % (job, item.change))
        try:
            build = self.sched.launcher.launch(job, item,
                                               self.pipeline,
                                               dependent_items)
            self.log.debug("Adding build %s of job %s to item %s" %
                           (build, job, item))
            item.addBuild(build)
            launched = True
        except Exception:
            # Narrowed from a bare except so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            self.log.exception("Exception while launching job %s "
                               "for change %s:" % (job, item.change))
    return launched

def launchJobs(self, item):
    """Launch whatever jobs the pipeline says should run for ``item``.

    Returns True if at least one build was actually launched, so that
    callers testing the result can tell that the item changed; returns
    False when there was nothing to run or nothing could be launched.
    """
    jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
    if not jobs:
        return False
    return self._launchJobs(item, jobs)
1496
def cancelJobs(self, item, prime=True):
    """Cancel the builds of ``item`` and of every item behind it.

    When ``prime`` is true and the current build set has a ref, the
    item's builds are reset before the old builds are canceled.  Each
    build of the old build set is marked 'CANCELED' even if asking the
    launcher to cancel it failed.

    Returns True if any build, here or behind, was canceled.
    """
    self.log.debug("Cancel jobs for change %s" % item.change)
    canceled = False
    # Keep a handle on the build set as it is now; resetAllBuilds() is
    # called before the builds of this set are canceled.
    old_build_set = item.current_build_set
    if prime and item.current_build_set.ref:
        item.resetAllBuilds()
    for build in old_build_set.getBuilds():
        try:
            self.sched.launcher.cancel(build)
        except Exception:
            # Narrowed from a bare except so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, item.change))
        build.result = 'CANCELED'
        canceled = True
    self.updateBuildDescriptions(old_build_set)
    for item_behind in item.items_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s" %
                       (item_behind.change, item.change))
        if self.cancelJobs(item_behind, prime=prime):
            canceled = True
    return canceled
1518
def _processOneItem(self, item, nnfi):
    """Advance a single queue item by one processing step.

    ``nnfi`` is the nearest non-failing item found so far while walking
    the queue.  Returns a ``(changed, nnfi)`` tuple: ``changed`` says
    whether this step altered the queue, and ``nnfi`` is the (possibly
    updated) nearest non-failing item to pass to the next item.
    """
    changed = False
    reasons = []  # Reasons this item is failing
    change_queue = item.queue

    # A non-live item ahead is treated as if nothing were ahead.
    ahead = item.item_ahead
    if ahead and not ahead.live:
        ahead = None

    needed = self.checkForChangesNeededBy(item.change, change_queue)
    if needed is not True:
        # This change may no longer be enqueued, so take it out.
        self.log.info("Dequeuing change %s because "
                      "it can no longer merge" % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.pipeline.setDequeuedNeedingChange(item)
        if item.live:
            try:
                self.reportItem(item)
            except exceptions.MergeFailure:
                pass
        return (True, nnfi)

    failing_deps = self.getFailingDependentItems(item)
    actionable = change_queue.isActionable(item)
    item.active = actionable
    ready = False
    if failing_deps:
        reasons.append('a needed change is failing')
        self.cancelJobs(item, prime=False)
    else:
        ahead_merged = bool(ahead and ahead.change.is_merged)
        if ahead != nnfi and not ahead_merged:
            # The item is based on something other than the nearest
            # non-failing item, and not because its base merged, so
            # something ahead must have failed.
            self.log.info("Resetting builds for change %s because the "
                          "item ahead, %s, is not the nearest non-failing "
                          "item, %s" % (item.change, ahead, nnfi))
            change_queue.moveItem(item, nnfi)
            changed = True
            self.cancelJobs(item)
        if actionable:
            ready = self.prepareRef(item)
            if item.current_build_set.unable_to_merge:
                reasons.append("it has a merge conflict")
                ready = False

    if actionable and ready and self.launchJobs(item):
        changed = True
    if self.pipeline.didAnyJobFail(item):
        reasons.append("at least one job failed")
    if not item.live and not item.items_behind:
        reasons.append("is a non-live item with no items behind")
        self.dequeueItem(item)
        changed = True

    at_head_and_done = ((not ahead) and
                        self.pipeline.areAllJobsComplete(item) and
                        item.live)
    if at_head_and_done:
        try:
            self.reportItem(item)
        except exceptions.MergeFailure:
            reasons.append("it did not merge")
            for follower in item.items_behind:
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, failed to merge" %
                              (follower.change, item))
                self.cancelJobs(follower)
        self.dequeueItem(item)
        changed = True
    elif not reasons and item.live:
        nnfi = item

    item.current_build_set.failing_reasons = reasons
    if reasons:
        self.log.debug("%s is a failing item because %s" %
                       (item, reasons))
    return (changed, nnfi)
James E. Blairfee8d652013-06-07 08:57:52 -07001594
def processQueue(self):
    """Process every item of every queue in the pipeline once.

    Returns True if processing changed any queue, False otherwise.
    """
    self.log.debug("Starting queue processor: %s" % self.pipeline.name)
    any_changed = False
    for queue in self.pipeline.queues:
        this_queue_changed = False
        nearest_ok = None  # Nearest non-failing item
        # Walk a copy of the item list, not the live list.
        for entry in list(queue.queue):
            entry_changed, nearest_ok = self._processOneItem(
                entry, nearest_ok)
            if entry_changed:
                this_queue_changed = True
            self.reportStats(entry)
        if not this_queue_changed:
            continue
        any_changed = True
        status = ''.join(entry.formatStatus() for entry in queue.queue)
        if status:
            self.log.debug("Queue %s status is now:\n %s" %
                           (queue.name, status))
    self.log.debug("Finished queue processor: %s (changed: %s)" %
                   (self.pipeline.name, any_changed))
    return any_changed
James E. Blairdaabed22012-08-15 15:38:57 -07001619
def updateBuildDescriptions(self, build_set):
    """Refresh the launcher-side description of every build in
    ``build_set`` and, when present, in its previous build set.

    A failure for one build is logged, with its traceback, and does not
    stop the remaining builds from being updated.
    """
    def refresh(builds, failure_message):
        # Push a freshly formatted description for each build.
        for build in builds:
            try:
                desc = self.formatDescription(build)
                self.sched.launcher.setBuildDescription(build, desc)
            except Exception:
                # Log the failure and let the loop continue.  Narrowed
                # from a bare except; log.exception keeps the traceback.
                self.log.exception(failure_message % (build))

    refresh(build_set.getBuilds(),
            "Failed to update description for build %s")

    if build_set.previous_build_set:
        refresh(build_set.previous_build_set.getBuilds(),
                "Failed to update description for "
                "build %s in previous build set")
James E. Blair11700c32012-07-05 17:50:05 -07001639
def onBuildStarted(self, build):
    """Handle the start of ``build``: log it and return True."""
    message = "Build %s started" % build
    self.log.debug(message)
    return True
1643
def onBuildCompleted(self, build):
    """Handle the completion of ``build``.

    Records the result on the pipeline for the owning item, releases
    the scheduler mutex for the build's job, logs the item status and
    returns True.
    """
    self.log.debug("Build %s completed" % build)
    owner = build.build_set.item

    self.pipeline.setResult(owner, build)
    self.sched.mutex.release(owner, build.job)
    status = owner.formatStatus()
    self.log.debug("Item %s status is now:\n %s" % (owner, status))
    return True
1653
def onMergeCompleted(self, event):
    """Record the outcome of a finished merge on its build set.

    Marks the merge state COMPLETE and stores the zuul URL.  The commit
    comes from the event when it merged, or from the change's
    ``newrev`` when the event only updated the repo and the change is
    not a NullChange.  If no commit ends up recorded for a
    non-NullChange, the item is flagged as unable to merge.
    """
    build_set = event.build_set
    item = build_set.item
    build_set.merge_state = build_set.COMPLETE
    build_set.zuul_url = event.zuul_url

    if event.merged:
        build_set.commit = event.commit
    elif event.updated and not isinstance(item.change, NullChange):
        build_set.commit = item.change.newrev

    if build_set.commit or isinstance(item.change, NullChange):
        return
    self.log.info("Unable to merge change %s" % item.change)
    self.pipeline.setUnableToMerge(item)
James E. Blair4076e2b2014-01-28 12:42:20 -08001667
def reportItem(self, item):
    """Report ``item`` and, for merging pipelines, verify the merge.

    The item is reported only if it has not been reported already.
    When ``changes_merge`` is set, the queue window grows and the
    triggers are notified if all jobs succeeded and the change merged;
    otherwise the window shrinks and ``exceptions.MergeFailure`` is
    raised.
    """
    if not item.reported:
        # _reportItem() returns True if it failed to report.
        report_failed = self._reportItem(item)
        item.reported = not report_failed
    if not self.changes_merge:
        return

    succeeded = self.pipeline.didAllJobsSucceed(item)
    merged = item.reported
    if merged:
        merged = self.pipeline.source.isMerged(item.change,
                                               item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))
    change_queue = item.queue

    if succeeded and merged:
        change_queue.increaseWindowSize()
        self.log.debug("%s window size increased to %s" %
                       (change_queue, change_queue.window))
        for trigger in self.sched.triggers.values():
            trigger.onChangeMerged(item.change, self.pipeline.source)
        return

    self.log.debug("Reported change %s failed tests or failed "
                   "to merge" % (item.change))
    change_queue.decreaseWindowSize()
    self.log.debug("%s window size decreased to %s" %
                   (change_queue, change_queue.window))
    raise exceptions.MergeFailure(
        "Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001696
def _reportItem(self, item):
    """Send the report for ``item`` using the matching pipeline actions.

    The actions are picked from the job results (success, merger
    failure or failure) and are replaced by the disabled actions while
    the pipeline is disabled.  Nothing is sent when the item has no
    jobs or the chosen action list is empty.

    Returns the value of ``sendReport`` (truthy means an error), or
    True when nothing was sent or reporting raised.
    """
    self.log.debug("Reporting change %s" % item.change)
    ret = True  # Means error as returned by trigger.report
    if not self.pipeline.getJobs(item):
        # We don't send empty reports with +1,
        # and the same for -1's (merge failures or transient errors)
        # as they cannot be followed by +1's
        self.log.debug("No jobs for change %s" % item.change)
        actions = []
    elif self.pipeline.didAllJobsSucceed(item):
        self.log.debug("success %s" % (self.pipeline.success_actions))
        actions = self.pipeline.success_actions
        item.setReportedResult('SUCCESS')
        self.pipeline._consecutive_failures = 0
    elif not self.pipeline.didMergerSucceed(item):
        actions = self.pipeline.merge_failure_actions
        item.setReportedResult('MERGER_FAILURE')
    else:
        actions = self.pipeline.failure_actions
        item.setReportedResult('FAILURE')
        self.pipeline._consecutive_failures += 1
    if self.pipeline._disabled:
        actions = self.pipeline.disabled_actions
    # Check here if we should disable so that we only use the disabled
    # reporters /after/ the last disable_at failure is still reported as
    # normal.
    if (self.pipeline.disable_at and not self.pipeline._disabled and
            self.pipeline._consecutive_failures >= self.pipeline.disable_at):
        self.pipeline._disabled = True
    if actions:
        try:
            self.log.info("Reporting item %s, actions: %s" %
                          (item, actions))
            ret = self.sendReport(actions, self.pipeline.source, item)
            if ret:
                self.log.error("Reporting item %s received: %s" %
                               (item, ret))
        except Exception:
            # Narrowed from a bare except so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            self.log.exception("Exception while reporting:")
            item.setReportedResult('ERROR')
    self.updateBuildDescriptions(item.current_build_set)
    return ret
1739
def formatDescription(self, build):
    """Render the HTML description for ``build``.

    The description names what triggered the build (a numbered change
    or a ref update), then lists the other changes tested concurrently,
    all builds of the same build set, the builds of the same job in the
    previous and next build sets, and the build set's reported result.
    Sections with nothing to show are left out.
    """
    build_set = build.build_set
    change = build_set.item.change

    change_entry = ('<li><a href="{change.url}">'
                    ' {change.number},{change.patchset}</a></li>')
    concurrent_changes = ''.join(
        change_entry.format(change=other)
        for other in build_set.other_changes)

    linked_entry = """\
<li>
 <a href="{build.url}">
 {build.job.name} #{build.number}</a>: {build.result}
</li>
"""
    plain_entry = """\
<li>
 {build.job.name}: {build.result}
</li>"""
    # A separate loop variable is used so that ``build`` keeps referring
    # to the build being described; it is needed for the lookups below.
    concurrent_builds = ''.join(
        (linked_entry if sibling.url else plain_entry).format(build=sibling)
        for sibling in build_set.getBuilds())

    other_builds = ''
    if build_set.previous_build_set:
        other_build = build_set.previous_build_set.getBuild(build.job.name)
        if other_build:
            other_builds += """\
<li>
 Preceded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    if build_set.next_build_set:
        other_build = build_set.next_build_set.getBuild(build.job.name)
        if other_build:
            other_builds += """\
<li>
 Succeeded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    result = build_set.result

    if hasattr(change, 'number'):
        ret = """\
<p>
 Triggered by change:
 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
 Branch: <b>{change.branch}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    elif hasattr(change, 'ref'):
        # The ref is plain text here; the original template closed an
        # anchor that was never opened.
        ret = """\
<p>
 Triggered by reference:
 {change.ref}<br/>
 Old revision: <b>{change.oldrev}</b><br/>
 New revision: <b>{change.newrev}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    else:
        ret = ""

    if concurrent_changes:
        ret += """\
<p>
 Other changes tested concurrently with this change:
 <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
 All builds for this change set:
 <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
 Other build sets for this change:
 <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
 Reported result: <b>{result}</b>
</p>
"""

    # Name the template fields explicitly instead of passing locals().
    return ret.format(self=self,
                      change=change,
                      concurrent_changes=concurrent_changes,
                      concurrent_builds=concurrent_builds,
                      other_builds=other_builds,
                      result=result)
1841
def reportStats(self, item):
    """Send pipeline statistics for ``item`` to statsd.

    The current-changes gauge is always updated.  The resident-time
    timer and total-changes counter, for both the pipeline and the
    item's project, are only sent once the item has a dequeue time.
    Does nothing when statsd is unavailable; any error while reporting
    is logged rather than raised.
    """
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeing.
        if item.dequeue_time:
            dt = int((item.dequeue_time - item.enqueue_time) * 1000)
        else:
            dt = None
        # A resident time that rounds down to 0 ms is still a dequeue,
        # so test against None rather than for truthiness.
        dequeued = dt is not None
        items = len(self.pipeline.getAllItems())

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(key + '.current_changes', items)
        if dequeued:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        project_name = item.change.project.name.replace('/', '.')
        key += '.%s' % project_name
        if dequeued:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')
    except Exception:
        # Narrowed from a bare except so that KeyboardInterrupt and
        # SystemExit are no longer swallowed here.
        self.log.exception("Exception reporting pipeline stats")
1872
James E. Blair1e8dd892012-05-30 09:15:05 -07001873
class DynamicChangeQueueContextManager(object):
    """Context manager that hands out a change queue and cleans it up.

    Entering the context yields the wrapped change queue (which may be
    None).  On exit, a queue that holds no items is removed from its
    pipeline.  Exceptions raised inside the context are not suppressed.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        change_queue = self.change_queue
        if change_queue and not change_queue.queue:
            # Remove the ChangeQueue itself.  ``change_queue.queue`` is
            # its (empty) list of items, not something the pipeline
            # holds, so passing that would remove nothing.
            change_queue.pipeline.removeQueue(change_queue)
1884
1885
James E. Blair4aea70c2012-07-26 14:23:24 -07001886class IndependentPipelineManager(BasePipelineManager):
1887 log = logging.getLogger("zuul.IndependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001888 changes_merge = False
1889
def _postConfig(self, layout):
    """Run the parent class's post-configuration step for ``layout``."""
    parent = super(IndependentPipelineManager, self)
    parent._postConfig(layout)
James E. Blaire0487072012-08-29 17:38:31 -07001892
def getChangeQueue(self, change, existing=None):
    """Return a context manager yielding the queue to use for ``change``.

    When ``existing`` is given it is reused.  Otherwise a new change
    queue is created for the change's project (adding the project to
    the pipeline first if it is not known yet) and added to the
    pipeline.
    """
    if existing:
        return DynamicChangeQueueContextManager(existing)

    project = change.project
    if project not in self.pipeline.getProjects():
        self.pipeline.addProject(project)

    # Every change gets a queue of its own.
    new_queue = ChangeQueue(self.pipeline)
    new_queue.addProject(project)
    self.pipeline.addQueue(new_queue)
    self.log.debug("Dynamically created queue %s", new_queue)
    return DynamicChangeQueueContextManager(new_queue)
James E. Blairbfb8e042014-12-30 17:01:44 -08001904
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                        change_queue):
    """Enqueue the changes ``change`` needs, ahead of it, as non-live.

    Returns the boolean result of ``checkForChangesNeededBy`` directly
    when it gives one.  Otherwise each needed change is added to
    ``change_queue``; the result is False as soon as one cannot be
    added and True when all were.
    """
    needed = self.checkForChangesNeededBy(change, change_queue)
    if needed in (True, False):
        return needed
    self.log.debug(" Changes %s must be merged ahead of %s" %
                   (needed, change))
    for dependency in needed:
        # Unlike the dependent pipeline, changes ahead are enqueued as
        # "not live", i.e. not intended to have jobs run.  Pipeline
        # requirements are always ignored, which is safe because the
        # changes are not live.
        added = self.addChange(dependency, quiet=True,
                               ignore_requirements=True,
                               live=False, change_queue=change_queue)
        if not added:
            return False
    return True
1924
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001925 def checkForChangesNeededBy(self, change, change_queue):
James E. Blair17dd6772015-02-09 14:45:18 -08001926 if self.pipeline.ignore_dependencies:
1927 return True
James E. Blairbfb8e042014-12-30 17:01:44 -08001928 self.log.debug("Checking for changes needed by %s:" % change)
1929 # Return true if okay to proceed enqueing this change,
1930 # false if the change should not be enqueued.
1931 if not hasattr(change, 'needs_changes'):
1932 self.log.debug(" Changeish does not support dependencies")
1933 return True
1934 if not change.needs_changes:
1935 self.log.debug(" No changes needed")
1936 return True
1937 changes_needed = []
1938 for needed_change in change.needs_changes:
1939 self.log.debug(" Change %s needs change %s:" % (
1940 change, needed_change))
1941 if needed_change.is_merged:
1942 self.log.debug(" Needed change is merged")
1943 continue
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001944 if self.isChangeAlreadyInQueue(needed_change, change_queue):
James E. Blairbfb8e042014-12-30 17:01:44 -08001945 self.log.debug(" Needed change is already ahead in the queue")
1946 continue
1947 self.log.debug(" Change %s is needed" % needed_change)
1948 if needed_change not in changes_needed:
1949 changes_needed.append(needed_change)
1950 continue
1951 # This differs from the dependent pipeline check in not
1952 # verifying that the dependent change is mergable.
1953 if changes_needed:
1954 return changes_needed
1955 return True
1956
1957 def dequeueItem(self, item):
1958 super(IndependentPipelineManager, self).dequeueItem(item)
1959 # An independent pipeline manager dynamically removes empty
1960 # queues
1961 if not item.queue.queue:
1962 self.pipeline.removeQueue(item.queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001963
James E. Blair1e8dd892012-05-30 09:15:05 -07001964
class StaticChangeQueueContextManager(object):
    """Context manager that yields a fixed change queue and never cleans up."""

    def __init__(self, change_queue):
        """Remember the change queue to hand out on entry."""
        self.change_queue = change_queue

    def __enter__(self):
        """Return the wrapped change queue."""
        return self.change_queue

    def __exit__(self, etype, value, tb):
        """Do nothing on exit; exceptions from the body propagate."""
        return None
1974
1975
James E. Blair4aea70c2012-07-26 14:23:24 -07001976class DependentPipelineManager(BasePipelineManager):
1977 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001978 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001979
def __init__(self, *args, **kwargs):
    """Pass every positional and keyword argument on to the base class."""
    base = super(DependentPipelineManager, self)
    base.__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001982
def _postConfig(self, layout):
    """Run the base class post-configuration, then build the change queues.

    The queues are built after the base class has handled *layout*.
    """
    base = super(DependentPipelineManager, self)
    base._postConfig(layout)
    self.buildChangeQueues()
1986
def buildChangeQueues(self):
    """Create the pipeline's shared change queues.

    One ChangeQueue is created per pipeline project, configured with the
    pipeline's window settings.  The queues are then passed through
    combineChangeQueues() repeatedly until a pass no longer reduces
    their number, and each surviving queue is added to the pipeline.
    """
    self.log.debug("Building shared change queues")
    pipeline = self.pipeline

    per_project = []
    for project in pipeline.getProjects():
        project_queue = ChangeQueue(
            pipeline,
            window=pipeline.window,
            window_floor=pipeline.window_floor,
            window_increase_type=pipeline.window_increase_type,
            window_increase_factor=pipeline.window_increase_factor,
            window_decrease_type=pipeline.window_decrease_type,
            window_decrease_factor=pipeline.window_decrease_factor)
        project_queue.addProject(project)
        per_project.append(project_queue)
        self.log.debug("Created queue: %s" % project_queue)

    # Keep combining until a pass leaves the number of queues unchanged.
    current = per_project
    combined = self.combineChangeQueues(current)
    while len(combined) != len(current):
        current = combined
        combined = self.combineChangeQueues(current)

    self.log.info(" Shared change queues:")
    for shared_queue in combined:
        pipeline.addQueue(shared_queue)
        self.log.info(" %s containing %s" % (
            shared_queue, shared_queue.generated_name))
James E. Blairc3d428e2013-12-03 15:06:48 -08002018
def combineChangeQueues(self, change_queues):
    """Make one pass merging queues whose job sets overlap.

    Each queue in *change_queues* is compared, in order, against the
    queues already kept.  It is merged into the first kept queue whose
    getJobs() result is not disjoint from its own; if there is none, it
    is kept itself.  Returns the list of kept queues.
    """
    self.log.debug("Combining shared queues")
    kept_queues = []
    for candidate in change_queues:
        for kept in kept_queues:
            if not candidate.getJobs().isdisjoint(kept.getJobs()):
                self.log.debug("Merging queue %s into %s" % (candidate, kept))
                kept.mergeChangeQueue(candidate)
                break  # merged; move on to the next candidate
        else:
            # No kept queue shares a job with this one.
            self.log.debug("Keeping queue %s" % (candidate))
            kept_queues.append(candidate)
    return kept_queues
James E. Blairee743612012-05-29 14:49:32 -07002034
def getChangeQueue(self, change, existing=None):
    """Return a context manager wrapping the queue to use for *change*.

    A truthy *existing* queue is used as is; otherwise the queue is
    looked up from the pipeline by the change's project.
    """
    if existing:
        selected = existing
    else:
        selected = self.pipeline.getQueue(change.project)
    return StaticChangeQueueContextManager(selected)
James E. Blair5ee24252014-12-30 10:12:29 -08002040
def isChangeReadyToBeEnqueued(self, change):
    """Return True if the pipeline's source says *change* can merge.

    The source's canMerge() is asked with the result of
    getSubmitAllowNeeds(); a negative answer is logged and yields False.
    """
    mergeable = self.pipeline.source.canMerge(change,
                                              self.getSubmitAllowNeeds())
    if mergeable:
        return True
    self.log.debug("Change %s can not merge, ignoring" % change)
    return False
James E. Blair1e8dd892012-05-30 09:15:05 -07002047
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                         change_queue):
    """Enqueue the changes that need *change*, behind it.

    Every change listed in ``change.needed_by_changes`` is considered.
    Those whose own change queue differs from *change_queue* are skipped,
    as are those the pipeline's source says can not merge.  The remaining
    changes are collected first and only then added to *change_queue*
    with the caller's *quiet* and *ignore_requirements* settings.

    Nothing is returned.
    """
    self.log.debug("Checking for changes needing %s:" % change)
    if not hasattr(change, 'needed_by_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return

    ready = []
    for dependent in change.needed_by_changes:
        with self.getChangeQueue(dependent) as dependent_queue:
            if dependent_queue != change_queue:
                self.log.debug(" Change %s in project %s can not be "
                               "enqueued in the target queue %s" %
                               (dependent, dependent.project,
                                change_queue))
                continue
        if not self.pipeline.source.canMerge(dependent,
                                             self.getSubmitAllowNeeds()):
            continue
        self.log.debug(" Change %s needs %s and is ready to merge" %
                       (dependent, change))
        ready.append(dependent)

    if not ready:
        self.log.debug(" No changes need %s" % change)

    for dependent in ready:
        self.addChange(dependent, quiet=quiet,
                       ignore_requirements=ignore_requirements,
                       change_queue=change_queue)
James E. Blaire0487072012-08-29 17:38:31 -07002076
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                        change_queue):
    """Enqueue the changes *change* depends on, ahead of it.

    If checkForChangesNeededBy() yields a boolean, that boolean is
    returned unchanged.  Otherwise it yields the needed changes, which
    are added one by one to *change_queue* with the caller's *quiet* and
    *ignore_requirements* settings; the result is False as soon as one
    of them can not be added (later ones are not attempted) and True if
    all were added.
    """
    outcome = self.checkForChangesNeededBy(change, change_queue)
    if outcome in (True, False):
        return outcome
    self.log.debug(" Changes %s must be merged ahead of %s" %
                   (outcome, change))
    # all() stops at the first needed change that fails to enqueue.
    return all(
        self.addChange(dependency, quiet=quiet,
                       ignore_requirements=ignore_requirements,
                       change_queue=change_queue)
        for dependency in outcome)
James E. Blaire0487072012-08-29 17:38:31 -07002091
def checkForChangesNeededBy(self, change, change_queue):
    """Work out which changes must be enqueued ahead of *change*.

    Returns True if it is okay to proceed enqueuing the change with
    nothing ahead of it, False if the change should not be enqueued, or
    a list of the not-yet-enqueued changes that must go ahead of it.

    The supplied *change_queue* is ignored; the queue is looked up again
    from the change itself via getChangeQueue().

    For each entry of ``change.needs_changes``, in order:
      * merged changes are skipped;
      * a needed change in a different change queue, or one that is not
        the current patchset, makes the result False;
      * a needed change already in the queue is skipped;
      * a needed change the source reports as unmergeable makes the
        result False;
      * anything else is recorded once as needed.  A mergeable change
        that was already recorded is simply skipped rather than being
        reported as unmergeable.
    """
    self.log.debug("Checking for changes needed by %s:" % change)
    if not hasattr(change, 'needs_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return True
    if not change.needs_changes:
        self.log.debug(" No changes needed")
        return True
    changes_needed = []
    # Ignore supplied change_queue
    with self.getChangeQueue(change) as change_queue:
        for needed_change in change.needs_changes:
            self.log.debug(" Change %s needs change %s:" % (
                change, needed_change))
            if needed_change.is_merged:
                self.log.debug(" Needed change is merged")
                continue
            with self.getChangeQueue(needed_change) as needed_change_queue:
                if needed_change_queue != change_queue:
                    self.log.debug(" Change %s in project %s does not "
                                   "share a change queue with %s "
                                   "in project %s" %
                                   (needed_change, needed_change.project,
                                    change, change.project))
                    return False
            if not needed_change.is_current_patchset:
                self.log.debug(" Needed change is not the "
                               "current patchset")
                return False
            if self.isChangeAlreadyInQueue(needed_change, change_queue):
                self.log.debug(" Needed change is already ahead "
                               "in the queue")
                continue
            if not self.pipeline.source.canMerge(needed_change,
                                                 self.getSubmitAllowNeeds()):
                # The needed change can't be merged.
                self.log.debug(" Change %s is needed but can not be merged" %
                               needed_change)
                return False
            self.log.debug(" Change %s is needed" % needed_change)
            # Record each needed change once; a repeat is not an error.
            if needed_change not in changes_needed:
                changes_needed.append(needed_change)
    if changes_needed:
        return changes_needed
    return True
James E. Blair972e3c72013-08-29 12:04:55 -07002140
def getFailingDependentItems(self, item):
    """Return the items *item* depends on that currently have failures.

    Each change in ``item.change.needs_changes`` is resolved with
    getItemForChange(); changes without an item are skipped.  An item
    counts as failing when its current build set has failing reasons.

    Returns a set of the failing items, or None when the change has no
    dependencies or none of them is failing.
    """
    change = item.change
    if not hasattr(change, 'needs_changes') or not change.needs_changes:
        return None
    needed_items = (self.getItemForChange(needed_change)
                    for needed_change in change.needs_changes)
    failing_items = {
        needed_item for needed_item in needed_items
        if needed_item and needed_item.current_build_set.failing_reasons
    }
    return failing_items or None