blob: 0836ab40626ad6d0f125fc79775cb73d28a22697 [file] [log] [blame]
# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Antoine "hashar" Musso
# Copyright 2013 Wikimedia Foundation Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Christian Berendt12d4d722014-06-07 21:03:45 +020023from six.moves import queue as Queue
Zhongyue Luo1c860d72012-07-19 11:03:56 +080024import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
James E. Blair11041d22014-05-02 14:49:53 -070032from model import ActionReporter, Pipeline, Project, ChangeQueue
Joshua Hesketh70b13492015-08-11 23:40:42 +100033from model import ChangeishFilter, NullChange
Joshua Hesketh352264b2015-08-11 23:42:08 +100034from zuul import change_matcher, exceptions
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040035from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070036
James E. Blair71e94122012-12-24 17:53:08 -080037statsd = extras.try_import('statsd.statsd')
38
James E. Blair1e8dd892012-05-30 09:15:05 -070039
def deep_format(obj, paramdict):
    """Apply the paramdict via str.format() to all string objects found within
    the supplied obj. Lists and dicts are traversed recursively.

    Dictionary keys are expanded as well as values. Keys that are not
    strings (for example integers) are kept unchanged.

    Borrowed from Jenkins Job Builder project

    :arg obj: the object to expand.
    :arg dict paramdict: the values made available to str.format().
    :returns: a new string, list or dict with every string expanded; any
        other kind of object is returned as is.
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        # Keys are routed through deep_format() too, so that a non-string
        # key is passed through instead of failing on a missing .format().
        return dict(
            (deep_format(key, paramdict), deep_format(value, paramdict))
            for key, value in obj.items())
    return obj
60
61
class ManagementEvent(object):
    """An event that should be processed within the main queue run loop

    The submitting thread blocks in wait() until the processing thread
    calls either done() or exception().
    """
    def __init__(self):
        self._wait_event = threading.Event()
        self._exception = None
        self._traceback = None

    def exception(self, e, tb):
        """Record a failure and release any thread blocked in wait().

        :arg Exception e: the exception raised while processing the event.
        :arg tb: the traceback to attach when the exception is re-raised.
        """
        self._exception = e
        self._traceback = tb
        self._wait_event.set()

    def done(self):
        """Mark the event as processed and release any waiting thread."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event has been processed.

        :arg float timeout: maximum number of seconds to wait, or None to
            wait indefinitely.
        :returns: True if the event was processed, False if the timeout
            elapsed first.
        :raises: the exception recorded with exception(), carrying the
            recorded traceback.
        """
        self._wait_event.wait(timeout)
        if self._exception:
            exc, tb = self._exception, self._traceback
            if hasattr(exc, 'with_traceback'):
                # Python 3: the three-argument raise statement is a
                # syntax error there.
                raise exc.with_traceback(tb)
            # Python 2: exceptions have no with_traceback(); six.reraise
            # performs the equivalent of "raise exc, None, tb".
            import six
            six.reraise(type(exc), exc, tb)
        return self._wait_event.is_set()
82
83
class ReconfigureEvent(ManagementEvent):
    """Ask the scheduler to reconfigure itself. The layout is (re-)loaded
    from the path given in the configuration.

    :arg ConfigParser config: the new configuration
    """
    def __init__(self, config):
        ManagementEvent.__init__(self)
        self.config = config
93
94
class PromoteEvent(ManagementEvent):
    """Move one or more changes to the head of a pipeline's queue.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, pipeline_name, change_ids):
        ManagementEvent.__init__(self)
        self.pipeline_name = pipeline_name
        self.change_ids = change_ids
106 self.change_ids = change_ids
107
108
class EnqueueEvent(ManagementEvent):
    """Request that a change be enqueued into a pipeline

    :arg TriggerEvent trigger_event: a TriggerEvent describing the
        trigger, pipeline, and change to enqueue
    """

    def __init__(self, trigger_event):
        ManagementEvent.__init__(self)
        self.trigger_event = trigger_event
118 self.trigger_event = trigger_event
119
120
class ResultEvent(object):
    """Base class for events that need to modify the pipeline state
    because of a result coming from an external system."""
125 pass
126
127
class BuildStartedEvent(ResultEvent):
    """Result event reporting that a build has started.

    :arg Build build: The build which has started.
    """

    def __init__(self, build):
        # The build is kept as is for whoever processes the event.
        self.build = build
135 self.build = build
136
137
class BuildCompletedEvent(ResultEvent):
    """Result event reporting that a build has completed.

    :arg Build build: The build which has completed.
    """

    def __init__(self, build):
        # The build is kept as is for whoever processes the event.
        self.build = build
145 self.build = build
146
147
class MergeCompletedEvent(ResultEvent):
    """Result event reporting that a remote merge operation has completed

    :arg BuildSet build_set: The build_set which is ready.
    :arg str zuul_url: The URL of the Zuul Merger.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    """

    def __init__(self, build_set, zuul_url, merged, updated, commit):
        # What was merged, and where.
        self.build_set = build_set
        self.zuul_url = zuul_url
        self.commit = commit
        # Outcome flags.
        self.merged = merged
        self.updated = updated
163 self.commit = commit
164
165
def toList(item):
    """Coerce item into a list.

    A falsy item yields a new empty list, a list is returned unchanged
    (the same object), and anything else is wrapped in a one-element list.
    """
    if not item:
        return []
    return item if isinstance(item, list) else [item]
172
173
James E. Blaire9d45c32012-05-31 09:56:45 -0700174class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700175 log = logging.getLogger("zuul.Scheduler")
176
def __init__(self, config):
    """Set up the scheduler thread; nothing is started here.

    :arg config: the configuration object, stored as self.config.
    """
    threading.Thread.__init__(self)
    self.daemon = True
    self.config = config

    # Run-state flags.
    self._pause = False
    self._exit = False
    self._stopped = False

    # Synchronisation primitives.
    self.wake_event = threading.Event()
    self.layout_lock = threading.Lock()
    self.run_handler_lock = threading.Lock()

    # Collaborators that are attached later on.
    self.launcher = None
    self.merger = None
    self.connections = dict()
    # Despite triggers being part of the pipeline, there is one trigger set
    # per scheduler. The pipeline handles the trigger filters but since
    # the events are handled by the scheduler itself it needs to handle
    # the loading of the triggers.
    # self.triggers['connection_name'] = triggerObject
    self.triggers = dict()

    # One queue per kind of event.
    self.trigger_event_queue = Queue.Queue()
    self.result_event_queue = Queue.Queue()
    self.management_event_queue = Queue.Queue()

    # Start from an empty layout.
    self.layout = model.Layout()

    self.zuul_version = zuul_version.version_info.release_string()
    self.last_reconfigured = None
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400203 self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400204
def stop(self):
    """Flag the scheduler as stopped, shut down drivers and connections,
    then set the wake event."""
    self._stopped = True
    # Drivers are unloaded before the connections are stopped.
    self._unloadDrivers()
    self.stopConnections()
    self.wake_event.set()
James E. Blairb0fcae42012-07-17 11:12:10 -0700209 self.wake_event.set()
210
Joshua Hesketh352264b2015-08-11 23:42:08 +1000211 def testConfig(self, config_path, connections):
212 # Take the list of set up connections directly here rather than with
213 # registerConnections as we don't want to do the onLoad event yet.
214 return self._parseConfig(config_path, connections)
James E. Blair47958382013-01-10 17:26:02 -0800215
def _parseSkipIf(self, config_job):
    """Build the change matcher for a job's 'skip-if' configuration.

    :arg dict config_job: the job configuration.
    :returns: a MatchAny over one MatchAll per 'skip-if' entry, or None
        when the job has no 'skip-if' entries.
    """
    matchers = change_matcher
    predicates = []

    for skip_conf in config_job.get('skip-if', []):
        conditions = []

        project_pattern = skip_conf.get('project')
        if project_pattern:
            conditions.append(matchers.ProjectMatcher(project_pattern))

        branch_pattern = skip_conf.get('branch')
        if branch_pattern:
            conditions.append(matchers.BranchMatcher(branch_pattern))

        file_patterns = toList(skip_conf.get('all-files-match-any'))
        if file_patterns:
            per_file = [matchers.FileMatcher(pattern)
                        for pattern in file_patterns]
            conditions.append(matchers.MatchAllFiles(per_file))

        # All patterns need to match a given skip-if predicate
        predicates.append(matchers.MatchAll(conditions))

    if not predicates:
        return None
    # Any skip-if predicate can be matched to trigger a skip
    return matchers.MatchAny(predicates)
243
def registerConnections(self, connections):
    """Adopt connections and initialise each of them.

    :arg dict connections: connection objects keyed by connection name.
    """
    self.connections = connections
    for connection in self.connections.values():
        # Each connection is handed the scheduler, then told to load.
        connection.registerScheduler(self)
        connection.onLoad()
249
def stopConnections(self):
    """Call onStop() on every registered connection."""
    for connection in self.connections.values():
        connection.onStop()
253
def _unloadDrivers(self):
    """Stop every trigger, pipeline source and reporter that is loaded.

    Reporters are reached through each pipeline's action lists, including
    the 'disabled' actions that the layout parser also populates. A
    reporter that appears in more than one list is stopped once per
    appearance.
    """
    for trigger in self.triggers.values():
        trigger.stop()
    for pipeline in self.layout.pipelines.values():
        pipeline.source.stop()
        for action in ('start_actions', 'success_actions',
                       'failure_actions', 'merge_failure_actions',
                       'disabled_actions'):
            # A pipeline lacking one of the lists is treated as having
            # no reporters for that action.
            for action_reporter in getattr(pipeline, action, ()):
                action_reporter.reporter.stop()
263
def _getDriver(self, dtype, connection_name, driver_config=None):
    """Instantiate a driver such as a trigger, source or reporter.

    :arg str dtype: the kind of driver: 'source', 'trigger' or 'reporter'.
    :arg str connection_name: the name of a registered connection, or the
        name of a driver that needs no connection (for example 'timer').
    :arg dict driver_config: the driver configuration; a new empty dict is
        used for each call when omitted.
    :returns: the driver instance, constructed with
        (driver_config, scheduler, connection).
    :raises KeyError: if no driver is known for the requested type/name.
    """
    # TODO(jhesketh): Make this list dynamic or use entrypoints etc.
    # Stevedore was not a good fit here due to the nature of triggers.
    # Specifically we don't want to load a trigger per a pipeline as one
    # trigger can listen to a stream (from gerrit, for example) and the
    # scheduler decides which eventfilter to use. As such we want to load
    # trigger+connection pairs uniquely.
    drivers = {
        'source': {
            'gerrit': 'zuul.source.gerrit:GerritSource',
        },
        'trigger': {
            'gerrit': 'zuul.trigger.gerrit:GerritTrigger',
            'timer': 'zuul.trigger.timer:TimerTrigger',
            'zuul': 'zuul.trigger.zuultrigger:ZuulTrigger',
        },
        'reporter': {
            'gerrit': 'zuul.reporter.gerrit:GerritReporter',
            'smtp': 'zuul.reporter.smtp:SMTPReporter',
        },
    }

    if driver_config is None:
        # The config is handed to the driver, so every call gets its own
        # dict instead of one default object shared between drivers.
        driver_config = {}

    # TODO(jhesketh): Check the connection_name exists
    if connection_name in self.connections:
        connection = self.connections[connection_name]
        driver_name = connection.driver_name
    else:
        # In some cases a driver may not be related to a connection. For
        # example, the 'timer' or 'zuul' triggers.
        driver_name = connection_name
        connection = None

    module_name, class_name = drivers[dtype][driver_name].split(':')
    # A non-empty fromlist makes __import__ return the leaf module rather
    # than the top-level package.
    driver_class = getattr(__import__(module_name, fromlist=['']),
                           class_name)
    return driver_class(driver_config, self, connection)
303
def _getSourceDriver(self, connection_name):
    """Return a 'source' driver for connection_name, built by _getDriver."""
    source = self._getDriver('source', connection_name)
    return source
306
def _getReporterDriver(self, connection_name, driver_config=None):
    """Return a 'reporter' driver for connection_name, built by _getDriver.

    :arg str connection_name: the connection (or driver) name.
    :arg dict driver_config: the reporter configuration; a new empty dict
        is used for each call when omitted.
    """
    if driver_config is None:
        # Avoid sharing one default dict between every reporter.
        driver_config = {}
    return self._getDriver('reporter', connection_name, driver_config)
309
def _getTriggerDriver(self, connection_name, driver_config=None):
    """Return a 'trigger' driver for connection_name, built by _getDriver.

    :arg str connection_name: the connection (or driver) name.
    :arg dict driver_config: the trigger configuration; a new empty dict
        is used for each call when omitted.
    """
    if driver_config is None:
        # Avoid sharing one default dict between every trigger.
        driver_config = {}
    return self._getDriver('trigger', connection_name, driver_config)
312
def _parseConfig(self, config_path, connections):
    """Build a new layout from the layout configuration file.

    The file is validated, then its 'includes', 'pipelines',
    'project-templates', 'jobs' and 'projects' sections are processed in
    that order.

    :arg str config_path: path of the YAML layout file ('~' is expanded).
    :arg connections: handed to the layout validator together with the
        parsed data.
    :returns: the populated model.Layout.
    :raises Exception: if no path is given, if the file does not exist, or
        if a job names a 'parameter-function' that none of the included
        python files defines.
    """
    layout = model.Layout()
    project_templates = {}

    if not config_path:
        # There is nothing to parse without a path; fail with a clear
        # message rather than an incidental error further down.
        raise Exception("No layout config file specified")
    config_path = os.path.expanduser(config_path)
    if not os.path.exists(config_path):
        raise Exception("Unable to read layout config file at %s" %
                        config_path)
    with open(config_path) as config_file:
        # NOTE(review): yaml.load can construct arbitrary Python objects.
        # That is only acceptable while the layout file is trusted -
        # confirm before pointing this at operator-external input.
        data = yaml.load(config_file)

    validator = layoutvalidator.LayoutValidator()
    validator.validate(data, connections)

    # Namespace filled in by the included python files; jobs look their
    # parameter functions up in it.
    config_env = {}
    for include in data.get('includes', []):
        if 'python-file' in include:
            fn = include['python-file']
            if not os.path.isabs(fn):
                # Relative includes are resolved against the directory of
                # the (symlink-resolved) layout file.
                base = os.path.dirname(os.path.realpath(config_path))
                fn = os.path.join(base, fn)
            fn = os.path.expanduser(fn)
            # NOTE(review): this runs arbitrary code from the included
            # file, and execfile only exists on Python 2.
            execfile(fn, config_env)

    for conf_pipeline in data.get('pipelines', []):
        pipeline = Pipeline(conf_pipeline['name'])
        pipeline.description = conf_pipeline.get('description')
        # TODO(jeblair): remove backwards compatibility:
        pipeline.source = self._getSourceDriver(
            conf_pipeline.get('source', 'gerrit'))
        precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
        pipeline.precedence = precedence
        pipeline.failure_message = conf_pipeline.get('failure-message',
                                                     "Build failed.")
        pipeline.merge_failure_message = conf_pipeline.get(
            'merge-failure-message', "Merge Failed.\n\nThis change or one "
            "of its cross-repo dependencies was unable to be "
            "automatically merged with the current state of its "
            "repository. Please rebase the change and upload a new "
            "patchset.")
        pipeline.success_message = conf_pipeline.get('success-message',
                                                     "Build succeeded.")
        pipeline.footer_message = conf_pipeline.get('footer-message', "")
        pipeline.dequeue_on_new_patchset = conf_pipeline.get(
            'dequeue-on-new-patchset', True)
        pipeline.ignore_dependencies = conf_pipeline.get(
            'ignore-dependencies', False)

        # One list of ActionReporter objects per reportable action.
        action_reporters = {}
        for action in ['start', 'success', 'failure', 'merge-failure',
                       'disabled']:
            action_reporters[action] = []
            if conf_pipeline.get(action):
                for reporter_name, params \
                        in conf_pipeline.get(action).items():
                    reporter = self._getReporterDriver(reporter_name,
                                                       params)
                    action_reporters[action].append(ActionReporter(
                        reporter))
        pipeline.start_actions = action_reporters['start']
        pipeline.success_actions = action_reporters['success']
        pipeline.failure_actions = action_reporters['failure']
        pipeline.disabled_actions = action_reporters['disabled']
        if len(action_reporters['merge-failure']) > 0:
            pipeline.merge_failure_actions = \
                action_reporters['merge-failure']
        else:
            # Without dedicated merge-failure reporters the failure
            # reporters are used.
            pipeline.merge_failure_actions = action_reporters['failure']

        pipeline.disable_at = conf_pipeline.get(
            'disable-after-consecutive-failures', None)

        pipeline.window = conf_pipeline.get('window', 20)
        pipeline.window_floor = conf_pipeline.get('window-floor', 3)
        pipeline.window_increase_type = conf_pipeline.get(
            'window-increase-type', 'linear')
        pipeline.window_increase_factor = conf_pipeline.get(
            'window-increase-factor', 1)
        pipeline.window_decrease_type = conf_pipeline.get(
            'window-decrease-type', 'exponential')
        pipeline.window_decrease_factor = conf_pipeline.get(
            'window-decrease-factor', 2)

        # The manager class is looked up by name among this module's
        # globals.
        manager = globals()[conf_pipeline['manager']](self, pipeline)
        pipeline.setManager(manager)
        layout.pipelines[conf_pipeline['name']] = pipeline

        if 'require' in conf_pipeline or 'reject' in conf_pipeline:
            require = conf_pipeline.get('require', {})
            reject = conf_pipeline.get('reject', {})
            f = ChangeishFilter(
                open=require.get('open'),
                current_patchset=require.get('current-patchset'),
                statuses=toList(require.get('status')),
                required_approvals=toList(require.get('approval')),
                reject_approvals=toList(reject.get('approval'))
            )
            manager.changeish_filters.append(f)

        # Triggers are created once per scheduler, the first time any
        # pipeline names them.
        for trigger_name, trigger_config \
                in conf_pipeline.get('trigger').items():
            if trigger_name not in self.triggers:
                self.triggers[trigger_name] = \
                    self._getTriggerDriver(trigger_name, trigger_config)

        for trigger_name, trigger in self.triggers.items():
            if trigger_name in conf_pipeline['trigger']:
                manager.event_filters += trigger.getEventFilters(
                    conf_pipeline['trigger'][trigger_name])

    for project_template in data.get('project-templates', []):
        # Make sure the template only contains valid pipelines
        tpl = dict(
            (pipe_name, project_template.get(pipe_name))
            for pipe_name in layout.pipelines.keys()
            if pipe_name in project_template
        )
        project_templates[project_template.get('name')] = tpl

    for config_job in data.get('jobs', []):
        job = layout.getJob(config_job['name'])
        # Be careful to only set attributes explicitly present on
        # this job, to avoid squashing attributes set by a meta-job.
        m = config_job.get('queue-name', None)
        if m:
            job.queue_name = m
        m = config_job.get('failure-message', None)
        if m:
            job.failure_message = m
        m = config_job.get('success-message', None)
        if m:
            job.success_message = m
        m = config_job.get('failure-pattern', None)
        if m:
            job.failure_pattern = m
        m = config_job.get('success-pattern', None)
        if m:
            job.success_pattern = m
        m = config_job.get('hold-following-changes', False)
        if m:
            job.hold_following_changes = True
        m = config_job.get('voting', None)
        # 'voting: false' is meaningful, so only None means "not set".
        if m is not None:
            job.voting = m
        fname = config_job.get('parameter-function', None)
        if fname:
            func = config_env.get(fname, None)
            if not func:
                raise Exception("Unable to find function %s" % fname)
            job.parameter_function = func
        branches = toList(config_job.get('branch'))
        if branches:
            job._branches = branches
            job.branches = [re.compile(x) for x in branches]
        files = toList(config_job.get('files'))
        if files:
            job._files = files
            job.files = [re.compile(x) for x in files]
        skip_if_matcher = self._parseSkipIf(config_job)
        if skip_if_matcher:
            job.skip_if_matcher = skip_if_matcher
        swift = toList(config_job.get('swift'))
        if swift:
            for s in swift:
                job.swift[s['name']] = s

    def add_jobs(job_tree, config_jobs):
        # Recursively add the configured jobs to job_tree: a dict maps a
        # parent job to its children, a string names a single job.
        for job in config_jobs:
            if isinstance(job, list):
                for x in job:
                    add_jobs(job_tree, x)
            if isinstance(job, dict):
                for parent, children in job.items():
                    parent_tree = job_tree.addJob(layout.getJob(parent))
                    add_jobs(parent_tree, children)
            if isinstance(job, str):
                job_tree.addJob(layout.getJob(job))

    for config_project in data.get('projects', []):
        project = Project(config_project['name'])
        shortname = config_project['name'].split('/')[-1]

        # This is reversed due to the prepend operation below, so
        # the ultimate order is templates (in order) followed by
        # statically defined jobs.
        for requested_template in reversed(
                config_project.get('template', [])):
            # Fetch the template from 'project-templates'
            tpl = project_templates.get(
                requested_template.get('name'))
            # Expand it with the project context
            requested_template['name'] = shortname
            expanded = deep_format(tpl, requested_template)
            # Finally merge the expansion with whatever has been
            # already defined for this project. Prepend our new
            # jobs to existing ones (which may have been
            # statically defined or defined by other templates).
            for pipeline in layout.pipelines.values():
                if pipeline.name in expanded:
                    config_project.update(
                        {pipeline.name: expanded[pipeline.name] +
                         config_project.get(pipeline.name, [])})

        layout.projects[config_project['name']] = project
        mode = config_project.get('merge-mode', 'merge-resolve')
        project.merge_mode = model.MERGER_MAP[mode]
        for pipeline in layout.pipelines.values():
            if pipeline.name in config_project:
                job_tree = pipeline.addProject(project)
                config_jobs = config_project[pipeline.name]
                add_jobs(job_tree, config_jobs)

    # All jobs should be defined at this point, get rid of
    # metajobs so that getJob isn't doing anything weird.
    layout.metajobs = []

    for pipeline in layout.pipelines.values():
        pipeline.manager._postConfig(layout)

    return layout
James E. Blairee743612012-05-29 14:49:32 -0700534
def setLauncher(self, launcher):
    """Store launcher as the scheduler's launcher attribute."""
    self.launcher = launcher
537
def setMerger(self, merger):
    """Store merger as the scheduler's merger attribute."""
    self.merger = merger
540
def getProject(self, name, create_foreign=False):
    """Look up a project of the current layout by name.

    :arg str name: the project name.
    :arg bool create_foreign: when the project is unknown, create it as a
        foreign project and add it to the layout.
    :returns: the project, or None if it is unknown and was not created.
    """
    # The lookup and the optional registration happen under the layout
    # lock.
    with self.layout_lock:
        project = self.layout.projects.get(name)
        if create_foreign and project is None:
            self.log.info("Registering foreign project: %s" % name)
            project = Project(name, foreign=True)
            self.layout.projects[name] = project
    return project
553
def addEvent(self, event):
    """Queue a trigger event and set the wake event.

    Reporting the event to statsd is best effort: a failure there is
    logged and the event is queued regardless.

    :arg event: the trigger event; its 'type' attribute names the statsd
        counter.
    """
    self.log.debug("Adding trigger event: %s" % event)
    try:
        if statsd:
            statsd.incr('gerrit.event.%s' % event.type)
    except Exception:
        # Only ordinary errors are swallowed; KeyboardInterrupt and
        # SystemExit must still propagate.
        self.log.exception("Exception reporting event stats")
    self.trigger_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700564
def onBuildStarted(self, build):
    """Stamp the build's start time and queue a BuildStartedEvent.

    :arg build: the build which has started.
    """
    self.log.debug("Adding start event for build: %s" % build)
    build.start_time = time.time()
    self.result_event_queue.put(BuildStartedEvent(build))
    self.wake_event.set()
    self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700572
def onBuildCompleted(self, build, result):
    """Record a build result and queue a BuildCompletedEvent.

    Reporting to statsd is best effort: a failure there is logged and the
    completion event is queued regardless.

    :arg build: the build which has completed.
    :arg result: the result to store on the build.
    """
    self.log.debug("Adding complete event for build: %s result: %s" % (
        build, result))
    build.end_time = time.time()
    # Note, as soon as the result is set, other threads may act
    # upon this, even though the event hasn't been fully
    # processed. Ensure that any other data from the event (eg,
    # timing) is recorded before setting the result.
    build.result = result
    try:
        if statsd and build.pipeline:
            # Dots in the job name are replaced before it is embedded in
            # the dotted statsd key.
            jobname = build.job.name.replace('.', '_')
            key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
            statsd.incr(key)
            for label in build.node_labels:
                # Jenkins includes the node name in its list of labels, so
                # we filter it out here, since that is not statistically
                # interesting.
                if label == build.node_name:
                    continue
                dt = int((build.start_time - build.launch_time) * 1000)
                key = 'zuul.pipeline.%s.label.%s.wait_time' % (
                    build.pipeline.name, label)
                statsd.timing(key, dt)
            key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
                                                  jobname, build.result)
            if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                dt = int((build.end_time - build.start_time) * 1000)
                statsd.timing(key, dt)
            statsd.incr(key)

            key = 'zuul.pipeline.%s.job.%s.wait_time' % (
                build.pipeline.name, jobname)
            dt = int((build.start_time - build.launch_time) * 1000)
            statsd.timing(key, dt)
    except Exception:
        # Only ordinary errors are swallowed; KeyboardInterrupt and
        # SystemExit must still propagate.
        self.log.exception("Exception reporting runtime stats")
    event = BuildCompletedEvent(build)
    self.result_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700614
def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
    """Queue a MergeCompletedEvent built from the arguments and set the
    wake event.

    :arg build_set: the build set the merge was performed for.
    :arg zuul_url: forwarded to the event.
    :arg merged: forwarded to the event.
    :arg updated: forwarded to the event.
    :arg commit: forwarded to the event.
    """
    self.log.debug("Adding merge complete event for build set: %s" %
                   build_set)
    self.result_event_queue.put(
        MergeCompletedEvent(build_set, zuul_url, merged, updated, commit))
    self.wake_event.set()
621 self.wake_event.set()
622
def reconfigure(self, config):
    """Submit a ReconfigureEvent and wait for it, then record the time.

    :arg config: the new configuration carried by the event.
    """
    self.log.debug("Prepare to reconfigure")
    reconfigure_event = ReconfigureEvent(config)
    self.management_event_queue.put(reconfigure_event)
    self.wake_event.set()
    self.log.debug("Waiting for reconfiguration")
    reconfigure_event.wait()
    self.log.debug("Reconfiguration complete")
    # Whole seconds since the epoch.
    self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700632
def promote(self, pipeline_name, change_ids):
    """Submit a PromoteEvent and wait for it.

    :arg str pipeline_name: the name of the pipeline.
    :arg list change_ids: the change ids carried by the event.
    """
    promote_event = PromoteEvent(pipeline_name, change_ids)
    self.management_event_queue.put(promote_event)
    self.wake_event.set()
    self.log.debug("Waiting for promotion")
    promote_event.wait()
    self.log.debug("Promotion complete")
640
def enqueue(self, trigger_event):
    """Submit an EnqueueEvent and wait for it.

    :arg trigger_event: the trigger event carried by the EnqueueEvent.
    """
    enqueue_event = EnqueueEvent(trigger_event)
    self.management_event_queue.put(enqueue_event)
    self.wake_event.set()
    self.log.debug("Waiting for enqueue")
    enqueue_event.wait()
    self.log.debug("Enqueue complete")
648
def exit(self):
    """Set the pause and exit flags, then set the wake event.

    This method only raises the flags; it does not wait for anything.
    """
    log = self.log
    log.debug("Prepare to exit")
    self._pause = True
    self._exit = True
    self.wake_event.set()
    log.debug("Waiting for exit")
655
656 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700657 if self.config.has_option('zuul', 'state_dir'):
658 state_dir = os.path.expanduser(self.config.get('zuul',
659 'state_dir'))
660 else:
661 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700662 return os.path.join(state_dir, 'queue.pickle')
663
664 def _save_queue(self):
665 pickle_file = self._get_queue_pickle_file()
666 events = []
667 while not self.trigger_event_queue.empty():
668 events.append(self.trigger_event_queue.get())
669 self.log.debug("Queue length is %s" % len(events))
670 if events:
671 self.log.debug("Saving queue")
672 pickle.dump(events, open(pickle_file, 'wb'))
673
674 def _load_queue(self):
675 pickle_file = self._get_queue_pickle_file()
676 if os.path.exists(pickle_file):
677 self.log.debug("Loading queue")
678 events = pickle.load(open(pickle_file, 'rb'))
679 self.log.debug("Queue length is %s" % len(events))
680 for event in events:
681 self.trigger_event_queue.put(event)
682 else:
683 self.log.debug("No queue file found")
684
685 def _delete_queue(self):
686 pickle_file = self._get_queue_pickle_file()
687 if os.path.exists(pickle_file):
688 self.log.debug("Deleting saved queue")
689 os.unlink(pickle_file)
690
def resume(self):
    """Restore any saved trigger events and wake the run handler.

    Loading the saved queue and deleting the saved file are both
    best-effort: an ordinary error in either step is logged and the
    method carries on.  Interpreter-level exits (``KeyboardInterrupt``,
    ``SystemExit``) are deliberately not caught and propagate.
    """
    try:
        self._load_queue()
    except Exception:
        self.log.exception("Unable to load queue")
    try:
        self._delete_queue()
    except Exception:
        self.log.exception("Unable to delete saved queue")
    self.log.debug("Resuming queue processing")
    self.wake_event.set()
702
703 def _doPauseEvent(self):
704 if self._exit:
705 self.log.debug("Exiting")
706 self._save_queue()
707 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700708
def _doReconfigureEvent(self, event):
    """Load the layout from ``event.config`` and migrate queued items.

    While holding ``self.layout_lock`` this:

    * stores ``event.config`` as ``self.config``, unloads the drivers and
      parses the layout file named by the ``zuul/layout_config`` option;
    * for each new pipeline that has an old pipeline of the same name,
      detaches every queued item from the old pipeline, re-points its
      project and builds at the new layout and re-enqueues it through the
      new pipeline's manager;
    * cancels builds whose job is no longer run for the item, and all
      builds of items that could not be re-enqueued;
    * installs the new layout, maintains the trigger cache, runs the
      ``postConfig()`` hooks and, when statsd is available, reports the
      item count of every pipeline.

    :param event: the reconfigure request; only ``event.config`` is read.
    """
    # This is called in the scheduler loop after another thread submits
    # a request
    with self.layout_lock:
        self.config = event.config
        self.log.debug("Performing reconfiguration")
        self._unloadDrivers()
        layout = self._parseConfig(
            self.config.get('zuul', 'layout_config'), self.connections)
        for name, new_pipeline in layout.pipelines.items():
            old_pipeline = self.layout.pipelines.get(name)
            if not old_pipeline:
                if self.layout.pipelines:
                    # Don't emit this warning on startup
                    self.log.warning("No old pipeline matching %s found "
                                     "when reconfiguring" % name)
                continue
            self.log.debug("Re-enqueueing changes for pipeline %s" % name)
            items_to_remove = []
            # (build, item) pairs, so that a failed cancel can be logged
            # against the item the build actually belongs to.
            builds_to_cancel = []
            last_head = None
            for shared_queue in old_pipeline.queues:
                for item in shared_queue.queue:
                    # An item with nothing ahead of it is the head of its
                    # queue; remember it before the links are cleared.
                    if not item.item_ahead:
                        last_head = item
                    item.item_ahead = None
                    item.items_behind = []
                    item.pipeline = None
                    item.queue = None
                    project_name = item.change.project.name
                    item.change.project = layout.projects.get(project_name)
                    if not item.change.project:
                        self.log.debug("Project %s not defined, "
                                       "re-instantiating as foreign" %
                                       project_name)
                        project = Project(project_name, foreign=True)
                        layout.projects[project_name] = project
                        item.change.project = project
                    item_jobs = new_pipeline.getJobs(item)
                    for build in item.current_build_set.getBuilds():
                        job = layout.jobs.get(build.job.name)
                        if job and job in item_jobs:
                            build.job = job
                        else:
                            item.removeBuild(build)
                            builds_to_cancel.append((build, item))
                    if not new_pipeline.manager.reEnqueueItem(item,
                                                              last_head):
                        items_to_remove.append(item)
            for item in items_to_remove:
                for build in item.current_build_set.getBuilds():
                    builds_to_cancel.append((build, item))
            for build, build_item in builds_to_cancel:
                self.log.warning(
                    "Canceling build %s during reconfiguration" % (build,))
                try:
                    self.launcher.cancel(build)
                except Exception:
                    self.log.exception(
                        "Exception while canceling build %s "
                        "for change %s" % (build, build_item.change))
        self.layout = layout
        self.maintainTriggerCache()
        for trigger in self.triggers.values():
            trigger.postConfig()
        for pipeline in self.layout.pipelines.values():
            pipeline.source.postConfig()
            # NOTE(review): 'disabled_actions' is not in this list even
            # though pipelines expose it -- confirm whether those
            # reporters need postConfig() as well.
            for action in ['start_actions', 'success_actions',
                           'failure_actions', 'merge_failure_actions']:
                for action_reporter in getattr(pipeline, action):
                    action_reporter.reporter.postConfig()
        if statsd:
            try:
                for pipeline in self.layout.pipelines.values():
                    items = len(pipeline.getAllItems())
                    # stats.gauges.zuul.pipeline.NAME.current_changes
                    key = 'zuul.pipeline.%s' % pipeline.name
                    statsd.gauge(key + '.current_changes', items)
            except Exception:
                self.log.exception("Exception reporting initial "
                                   "pipeline stats:")
James E. Blaire9d45c32012-05-31 09:56:45 -0700793
James E. Blair36658cf2013-12-06 17:53:48 -0800794 def _doPromoteEvent(self, event):
795 pipeline = self.layout.pipelines[event.pipeline_name]
796 change_ids = [c.split(',') for c in event.change_ids]
797 items_to_enqueue = []
798 change_queue = None
799 for shared_queue in pipeline.queues:
800 if change_queue:
801 break
802 for item in shared_queue.queue:
803 if (item.change.number == change_ids[0][0] and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000804 item.change.patchset == change_ids[0][1]):
James E. Blair36658cf2013-12-06 17:53:48 -0800805 change_queue = shared_queue
806 break
807 if not change_queue:
808 raise Exception("Unable to find shared change queue for %s" %
809 event.change_ids[0])
810 for number, patchset in change_ids:
811 found = False
812 for item in change_queue.queue:
813 if (item.change.number == number and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000814 item.change.patchset == patchset):
James E. Blair36658cf2013-12-06 17:53:48 -0800815 found = True
816 items_to_enqueue.append(item)
817 break
818 if not found:
819 raise Exception("Unable to find %s,%s in queue %s" %
820 (number, patchset, change_queue))
821 for item in change_queue.queue[:]:
822 if item not in items_to_enqueue:
823 items_to_enqueue.append(item)
824 pipeline.manager.cancelJobs(item)
825 pipeline.manager.dequeueItem(item)
826 for item in items_to_enqueue:
Sean Daguef39b9ca2014-01-10 21:34:35 -0500827 pipeline.manager.addChange(
828 item.change,
829 enqueue_time=item.enqueue_time,
James E. Blairf9ab8842014-07-10 13:12:07 -0700830 quiet=True,
831 ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800832
James E. Blaird27a96d2014-07-10 13:25:13 -0700833 def _doEnqueueEvent(self, event):
834 project = self.layout.projects.get(event.project_name)
835 pipeline = self.layout.pipelines[event.forced_pipeline]
James E. Blairc0dedf82014-08-06 09:37:52 -0700836 change = pipeline.source.getChange(event, project)
James E. Blaird27a96d2014-07-10 13:25:13 -0700837 self.log.debug("Event %s for change %s was directly assigned "
838 "to pipeline %s" % (event, change, self))
839 self.log.info("Adding %s, %s to %s" %
840 (project, change, pipeline))
841 pipeline.manager.addChange(change, ignore_requirements=True)
842
James E. Blaire9d45c32012-05-31 09:56:45 -0700843 def _areAllBuildsComplete(self):
844 self.log.debug("Checking if all builds are complete")
845 waiting = False
James E. Blair4076e2b2014-01-28 12:42:20 -0800846 if self.merger.areMergesOutstanding():
847 waiting = True
James E. Blaireff88162013-07-01 12:44:14 -0400848 for pipeline in self.layout.pipelines.values():
James E. Blair6b077942014-02-07 17:45:55 -0800849 for item in pipeline.getAllItems():
850 for build in item.current_build_set.getBuilds():
851 if build.result is None:
852 self.log.debug("%s waiting on %s" %
853 (pipeline.manager, build))
854 waiting = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700855 if not waiting:
856 self.log.debug("All builds are complete")
857 return True
858 self.log.debug("All builds are not complete")
859 return False
860
def run(self):
    """Main scheduler loop: sleep on ``wake_event`` and process queues.

    Each time it is woken the loop returns if ``self._stopped`` is set.
    Otherwise, holding ``run_handler_lock``, it drains the management
    queue, then the result queue, then (unless paused) the trigger queue.
    It performs the pause/exit action once paused with all builds
    complete, and finally lets every pipeline manager process its queue
    until it reports no further work.  Any exception is logged and the
    loop is re-woken.
    """
    if statsd:
        self.log.debug("Statsd enabled")
    else:
        self.log.debug("Statsd disabled because python statsd "
                       "package not found")
    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            self.log.debug("Run handler stopping")
            return
        self.log.debug("Run handler awake")
        with self.run_handler_lock:
            try:
                while not self.management_event_queue.empty():
                    self.process_management_queue()

                # Give result events priority -- they let us stop builds,
                # whereas trigger events cause us to launch builds.
                while not self.result_event_queue.empty():
                    self.process_result_queue()

                if not self._pause:
                    while not self.trigger_event_queue.empty():
                        self.process_event_queue()

                if self._pause and self._areAllBuildsComplete():
                    self._doPauseEvent()

                for pipeline in self.layout.pipelines.values():
                    manager = pipeline.manager
                    while manager.processQueue():
                        pass
            except Exception:
                self.log.exception("Exception in run handler:")
                # There may still be more events to process
                self.wake_event.set()
James E. Blairee743612012-05-29 14:49:32 -0700902
def maintainTriggerCache(self):
    """Tell every pipeline source which changes are still in use.

    The changes of all items in all pipelines, plus their related
    changes, are gathered first; only then is ``maintainCache()`` called
    on each pipeline's source with the complete set.  Collecting before
    calling matters when several pipelines share one source: each call
    must see the changes of every pipeline, not just of those visited so
    far.
    """
    pipelines = list(self.layout.pipelines.values())
    relevant = set()
    for pipeline in pipelines:
        for item in pipeline.getAllItems():
            relevant.add(item.change)
            relevant.update(item.change.getRelatedChanges())
    for pipeline in pipelines:
        self.log.debug("Start maintain trigger cache for: %s" % pipeline)
        # NOTE(review): presumably the source drops cached changes that
        # are missing from this set -- confirm against the source class.
        pipeline.source.maintainCache(relevant)
        self.log.debug("End maintain trigger cache for: %s" % pipeline)
    self.log.debug("Trigger cache size: %s" % len(relevant))
James E. Blair0e933c52013-07-11 10:18:52 -0700913
def process_event_queue(self):
    """Take one trigger event off the queue and offer it to each pipeline.

    For every pipeline the change is fetched from the pipeline's source;
    a ``ChangeNotFound`` error skips that pipeline.  Events for unknown
    or foreign projects are skipped after the fetch.  Otherwise old
    versions or abandoned changes are removed according to the event
    type, and the change is added when the pipeline manager says the
    event matches.  ``task_done()`` is always called on the queue.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    self.log.debug("Processing trigger event %s" % event)
    try:
        project = self.layout.projects.get(event.project_name)

        for pipeline in self.layout.pipelines.values():
            manager = pipeline.manager
            # Get the change even if the project is unknown to us for the
            # use of updating the cache if there is another change
            # depending on this foreign one.
            try:
                change = pipeline.source.getChange(event, project)
            except exceptions.ChangeNotFound as e:
                self.log.debug("Unable to get change %s from source %s. "
                               "(most likely looking for a change from "
                               "another connection trigger)",
                               e.change, pipeline.source)
                continue
            if not project or project.foreign:
                self.log.debug("Project %s not found" % event.project_name)
                continue
            if event.type == 'patchset-created':
                manager.removeOldVersionsOfChange(change)
            elif event.type == 'change-abandoned':
                manager.removeAbandonedChange(change)
            if not manager.eventMatches(event, change):
                continue
            self.log.info("Adding %s, %s to %s" %
                          (project, change, pipeline))
            manager.addChange(change)
    finally:
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700946
def process_management_queue(self):
    """Take one management event off the queue and dispatch it.

    Reconfigure, promote and enqueue events go to their ``_do*Event``
    handler (the enqueue handler receives the wrapped trigger event);
    any other event is logged as an error.  ``event.done()`` is called
    afterwards.  If anything raises, the exception and its traceback are
    passed to ``event.exception()`` instead.  ``task_done()`` is called
    on the queue in either case.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        if isinstance(event, ReconfigureEvent):
            self._doReconfigureEvent(event)
        elif isinstance(event, PromoteEvent):
            self._doPromoteEvent(event)
        elif isinstance(event, EnqueueEvent):
            self._doEnqueueEvent(event.trigger_event)
        else:
            self.log.error("Unable to handle event %s" % event)
        event.done()
    except Exception as err:
        traceback = sys.exc_info()[2]
        event.exception(err, traceback)
    self.management_event_queue.task_done()
964
def process_result_queue(self):
    """Take one result event off the queue and dispatch it by type.

    Build-started, build-completed and merge-completed events go to
    their ``_do*Event`` handler; any other event is logged as an error.
    ``task_done()`` is always called on the queue.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    self.log.debug("Processing result event %s" % event)
    try:
        # Checked in order; the first matching type wins.
        dispatch = (
            (BuildStartedEvent, self._doBuildStartedEvent),
            (BuildCompletedEvent, self._doBuildCompletedEvent),
            (MergeCompletedEvent, self._doMergeCompletedEvent),
        )
        for event_type, handler in dispatch:
            if isinstance(event, event_type):
                handler(event)
                break
        else:
            self.log.error("Unable to handle event %s" % event)
    finally:
        self.result_event_queue.task_done()
980
981 def _doBuildStartedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800982 build = event.build
983 if build.build_set is not build.build_set.item.current_build_set:
984 self.log.warning("Build %s is not in the current build set" %
985 (build,))
986 return
987 pipeline = build.build_set.item.pipeline
988 if not pipeline:
989 self.log.warning("Build %s is not associated with a pipeline" %
990 (build,))
991 return
992 pipeline.manager.onBuildStarted(event.build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800993
994 def _doBuildCompletedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800995 build = event.build
996 if build.build_set is not build.build_set.item.current_build_set:
997 self.log.warning("Build %s is not in the current build set" %
998 (build,))
999 return
1000 pipeline = build.build_set.item.pipeline
1001 if not pipeline:
1002 self.log.warning("Build %s is not associated with a pipeline" %
1003 (build,))
1004 return
1005 pipeline.manager.onBuildCompleted(event.build)
1006
1007 def _doMergeCompletedEvent(self, event):
1008 build_set = event.build_set
1009 if build_set is not build_set.item.current_build_set:
1010 self.log.warning("Build set %s is not current" % (build_set,))
1011 return
1012 pipeline = build_set.item.pipeline
1013 if not pipeline:
1014 self.log.warning("Build set %s is not associated with a pipeline" %
1015 (build_set,))
1016 return
1017 pipeline.manager.onMergeCompleted(event)
James E. Blairee743612012-05-29 14:49:32 -07001018
def formatStatusJSON(self):
    """Return the scheduler status serialized as a JSON string.

    The document contains ``zuul_version``, the lengths of the trigger
    and result event queues, and one entry per pipeline as produced by
    that pipeline's own ``formatStatusJSON()``.  While paused it also
    carries an HTML ``message``; when a reconfiguration time is recorded
    it carries ``last_reconfigured`` in milliseconds.
    """
    data = {'zuul_version': self.zuul_version}

    if self._pause:
        parts = ['<p><b>Queue only mode:</b> preparing to ']
        if self._exit:
            parts.append('exit')
        parts.append(', queue length: %s' %
                     self.trigger_event_queue.qsize())
        parts.append('</p>')
        data['message'] = ''.join(parts)

    data['trigger_event_queue'] = {
        'length': self.trigger_event_queue.qsize()}
    data['result_event_queue'] = {
        'length': self.result_event_queue.qsize()}

    if self.last_reconfigured:
        # Stored as epoch seconds; published as milliseconds.
        data['last_reconfigured'] = self.last_reconfigured * 1000

    data['pipelines'] = [pipeline.formatStatusJSON()
                         for pipeline in self.layout.pipelines.values()]
    return json.dumps(data)
1047
James E. Blair1e8dd892012-05-30 09:15:05 -07001048
James E. Blair4aea70c2012-07-26 14:23:24 -07001049class BasePipelineManager(object):
1050 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -07001051
def __init__(self, sched, pipeline):
    """Bind the manager to its scheduler and pipeline.

    The event and changeish filter lists start out empty.
    ``report_times`` comes from the boolean ``zuul/report_times`` option
    of the scheduler's config when that option is present, and is True
    otherwise (including when the scheduler has no config).
    """
    self.sched = sched
    self.pipeline = pipeline
    self.event_filters = []
    self.changeish_filters = []
    config = self.sched.config
    self.report_times = True
    if config and config.has_option('zuul', 'report_times'):
        self.report_times = config.getboolean('zuul', 'report_times')
James E. Blairee743612012-05-29 14:49:32 -07001063
def __str__(self):
    """Return ``<ClassName pipeline-name>`` for log messages."""
    class_name = self.__class__.__name__
    return "<%s %s>" % (class_name, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -07001066
James E. Blaireff88162013-07-01 12:44:14 -04001067 def _postConfig(self, layout):
James E. Blair4aea70c2012-07-26 14:23:24 -07001068 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairc0dedf82014-08-06 09:37:52 -07001069 self.log.info(" Source: %s" % self.pipeline.source)
James E. Blair11041d22014-05-02 14:49:53 -07001070 self.log.info(" Requirements:")
1071 for f in self.changeish_filters:
1072 self.log.info(" %s" % f)
James E. Blairee743612012-05-29 14:49:32 -07001073 self.log.info(" Events:")
1074 for e in self.event_filters:
1075 self.log.info(" %s" % e)
1076 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -07001077
James E. Blairee743612012-05-29 14:49:32 -07001078 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -07001079 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -07001080 if tree.job:
1081 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -07001082 for b in tree.job._branches:
1083 efilters += str(b)
James E. Blair70c71582013-03-06 08:50:50 -08001084 for f in tree.job._files:
1085 efilters += str(f)
Maru Newby3fe5f852015-01-13 04:22:14 +00001086 if tree.job.skip_if_matcher:
1087 efilters += str(tree.job.skip_if_matcher)
James E. Blairee743612012-05-29 14:49:32 -07001088 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -07001089 efilters = ' ' + efilters
James E. Blair222d4982012-07-16 09:31:19 -07001090 hold = ''
1091 if tree.job.hold_following_changes:
1092 hold = ' [hold]'
James E. Blair4ec821f2012-08-23 15:28:28 -07001093 voting = ''
1094 if not tree.job.voting:
1095 voting = ' [nonvoting]'
1096 self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
1097 efilters, hold, voting))
James E. Blairee743612012-05-29 14:49:32 -07001098 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -07001099 log_jobs(x, indent + 2)
1100
James E. Blaireff88162013-07-01 12:44:14 -04001101 for p in layout.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -07001102 tree = self.pipeline.getJobTree(p)
1103 if tree:
James E. Blairee743612012-05-29 14:49:32 -07001104 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -07001105 log_jobs(tree)
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001106 self.log.info(" On start:")
1107 self.log.info(" %s" % self.pipeline.start_actions)
1108 self.log.info(" On success:")
1109 self.log.info(" %s" % self.pipeline.success_actions)
1110 self.log.info(" On failure:")
1111 self.log.info(" %s" % self.pipeline.failure_actions)
Joshua Heskethb7179772014-01-30 23:30:46 +11001112 self.log.info(" On merge-failure:")
1113 self.log.info(" %s" % self.pipeline.merge_failure_actions)
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001114 self.log.info(" When disabled:")
1115 self.log.info(" %s" % self.pipeline.disabled_actions)
James E. Blairee743612012-05-29 14:49:32 -07001116
def getSubmitAllowNeeds(self):
    """Return the set of review labels this pipeline may still "need".

    These are the code review labels allowed to appear as "needed" in a
    change's submit records with respect to this queue, i.e. the labels
    this queue itself is likely to set before submitting.  The set is
    the union of what each success reporter reports.
    """
    reported = [reporter.getSubmitAllowNeeds()
                for reporter in self.pipeline.success_actions]
    return set().union(*reported)
James E. Blaire421a232012-07-25 16:59:21 -07001126
def eventMatches(self, event, change):
    """Return True if ``event`` should enqueue ``change`` in this pipeline.

    An event with a forced pipeline matches exactly when that pipeline
    is this one; the event filters are not consulted.  Otherwise the
    event matches when any of the event filters matches it.
    """
    forced = event.forced_pipeline
    if forced:
        if forced != self.pipeline.name:
            return False
        self.log.debug("Event %s for change %s was directly assigned "
                       "to pipeline %s" % (event, change, self))
        return True
    for event_filter in self.event_filters:
        if not event_filter.matches(event, change):
            continue
        self.log.debug("Event %s for change %s matched %s "
                       "in pipeline %s" % (event, change, event_filter,
                                           self))
        return True
    return False
1141
def isChangeAlreadyInPipeline(self, change):
    """Return True if a live item anywhere in the pipeline holds change."""
    # Only live items count; non-live items are ignored.
    return any(item.live and change.equals(item.change)
               for item in self.pipeline.getAllItems())
1148
def isChangeAlreadyInQueue(self, change, change_queue):
    """Return True if any item in ``change_queue`` holds ``change``.

    Unlike the pipeline-wide check, this considers every item of the
    given queue, live or not.
    """
    return any(change.equals(item.change) for item in change_queue.queue)
1155
def reportStart(self, change):
    """Send the pipeline's start report for ``change``, best-effort.

    Nothing is sent while the pipeline is disabled.  The message is
    ``"Starting <pipeline> jobs."`` followed, on a new line, by the
    ``zuul/status_url`` option when it is configured.  Errors returned
    by the reporters are logged, as is any ordinary exception raised
    along the way; neither is passed on to the caller.
    Interpreter-level exits (``KeyboardInterrupt``, ``SystemExit``) are
    not caught.
    """
    if self.pipeline._disabled:
        return
    try:
        self.log.info("Reporting start, action %s change %s" %
                      (self.pipeline.start_actions, change))
        msg = "Starting %s jobs." % self.pipeline.name
        if self.sched.config.has_option('zuul', 'status_url'):
            msg += "\n" + self.sched.config.get('zuul', 'status_url')
        ret = self.sendReport(self.pipeline.start_actions,
                              self.pipeline.source, change, msg)
        if ret:
            self.log.error("Reporting change start %s received: %s" %
                           (change, ret))
    except Exception:
        self.log.exception("Exception while reporting start:")
James E. Blaire0487072012-08-29 17:38:31 -07001171
def sendReport(self, action_reporters, source, change, message):
    """Send the built message off to the configured reporters.

    Each reporter in ``action_reporters`` is called, in order, with
    ``source``, ``change`` and ``message``.

    Returns None when there are no reporters or none of them returned a
    true value; otherwise the list of true values returned, in reporter
    order.
    """
    if len(action_reporters) == 0:
        return None
    outcomes = [reporter.report(source, change, message)
                for reporter in action_reporters]
    report_errors = [outcome for outcome in outcomes if outcome]
    return report_errors or None
1187
def isChangeReadyToBeEnqueued(self, change):
    """Report whether ``change`` may be enqueued; always True here.

    NOTE(review): presumably a hook for subclasses to override.
    """
    return True
1190
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                        change_queue):
    """Enqueue whatever must precede ``change``; a no-op that returns True.

    ``addChange`` gives up on the change when this returns a false value.
    NOTE(review): presumably a hook for subclasses to override.
    """
    return True
1194
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                         change_queue):
    """Enqueue whatever must follow ``change``; a no-op that returns True.

    NOTE(review): presumably a hook for subclasses to override.
    """
    return True
1198
def checkForChangesNeededBy(self, change, change_queue):
    """Check the changes ``change`` needs; this version always returns True.

    NOTE(review): presumably a hook for subclasses to override.
    """
    return True
1201
def getFailingDependentItems(self, item):
    """Return the failing items ``item`` depends on; always None here.

    NOTE(review): presumably a hook for subclasses to override.
    """
    return None
1204
def getDependentItems(self, item):
    """Return the items ahead of ``item``, nearest first.

    The ``item_ahead`` links are followed until one is false; the chain
    found is logged together with the change of ``item``.
    """
    ahead = []
    cursor = item.item_ahead
    while cursor:
        ahead.append(cursor)
        cursor = cursor.item_ahead
    self.log.info("Change %s depends on changes %s" %
                  (item.change,
                   [x.change for x in ahead]))
    return ahead
1215
def getItemForChange(self, change):
    """Return the first pipeline item whose change equals ``change``.

    Returns None when no item matches.
    """
    matching = (item for item in self.pipeline.getAllItems()
                if item.change.equals(change))
    return next(matching, None)
1221
def findOldVersionOfChangeAlreadyInQueue(self, change):
    """Return the first live item that ``change`` is an update of.

    Non-live items are skipped.  Returns None when there is no such item.
    """
    outdated = (item for item in self.pipeline.getAllItems()
                if item.live and change.isUpdateOf(item.change))
    return next(outdated, None)
1229
def removeOldVersionsOfChange(self, change):
    """Remove the queued older version of ``change``, if any.

    Does nothing unless the pipeline has ``dequeue_on_new_patchset``
    set.  At most one item is removed: the one returned by
    ``findOldVersionOfChangeAlreadyInQueue()``.
    """
    if not self.pipeline.dequeue_on_new_patchset:
        return
    old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
    if not old_item:
        return
    self.log.debug("Change %s is a new version of %s, removing %s" %
                   (change, old_item.change, old_item))
    self.removeItem(old_item)
James E. Blair2fa50962013-01-30 21:50:41 -08001238
def removeAbandonedChange(self, change):
    """Remove every live pipeline item whose change equals ``change``."""
    self.log.debug("Change %s abandoned, removing." % change)
    for item in self.pipeline.getAllItems():
        if item.live and item.change.equals(change):
            self.removeItem(item)
Antoine Mussobd86a312014-01-08 14:51:33 +01001246
def reEnqueueItem(self, item, last_head):
    """Put an existing item back into this pipeline's change queue.

    The queue is obtained from ``getChangeQueue()`` using the item's
    change and the queue of ``last_head``.  On success the item is
    enqueued, its completed build results and its unable-to-merge /
    dequeued-needing-change state are re-applied to the pipeline, stats
    are reported and True is returned.  When no queue is found an error
    is logged and False is returned.
    """
    with self.getChangeQueue(item.change, last_head.queue) as change_queue:
        if not change_queue:
            self.log.error("Unable to find change queue for project %s" %
                           item.change.project)
            return False

        self.log.debug("Re-enqueing change %s in queue %s" %
                       (item.change, change_queue))
        change_queue.enqueueItem(item)

        build_set = item.current_build_set
        # Re-set build results in case any new jobs have been
        # added to the tree.
        for build in build_set.getBuilds():
            if build.result:
                self.pipeline.setResult(item, build)
        # Similarly, reset the item state.
        if build_set.unable_to_merge:
            self.pipeline.setUnableToMerge(item)
        if item.dequeued_needing_change:
            self.pipeline.setDequeuedNeedingChange(item)

        self.reportStats(item)
        return True
James E. Blaircdccd972013-07-01 12:10:22 -07001271
def addChange(self, change, quiet=False, enqueue_time=None,
              ignore_requirements=False, live=True,
              change_queue=None):
    """Try to enqueue ``change`` in this pipeline.

    :param quiet: when true, no start report is sent.
    :param enqueue_time: if given (and true), overrides the enqueue time
        of the newly created item.
    :param ignore_requirements: when true, the pipeline's changeish
        filters are not checked.
    :param live: stored on the new item; also selects whether the
        pipeline-wide duplicate check is performed.
    :param change_queue: passed to ``getChangeQueue()`` as the existing
        queue.
    :returns: True if the change was enqueued or was already present;
        False if it is not ready, fails a requirement, has no change
        queue, or the changes ahead of it could not be enqueued.
    """
    log = self.log
    log.debug("Considering adding change %s" % change)

    # If we are adding a live change, check if it's a live item
    # anywhere in the pipeline. Otherwise, we will perform the
    # duplicate check below on the specific change_queue.
    if live and self.isChangeAlreadyInPipeline(change):
        log.debug("Change %s is already in pipeline, "
                  "ignoring" % change)
        return True

    if not self.isChangeReadyToBeEnqueued(change):
        log.debug("Change %s is not ready to be enqueued, ignoring" %
                  change)
        return False

    if not ignore_requirements:
        for requirement in self.changeish_filters:
            if requirement.matches(change):
                continue
            log.debug("Change %s does not match pipeline "
                      "requirement %s" % (change, requirement))
            return False

    with self.getChangeQueue(change, change_queue) as change_queue:
        if not change_queue:
            log.debug("Unable to find change queue for "
                      "change %s in project %s" %
                      (change, change.project))
            return False

        enqueued_ahead = self.enqueueChangesAhead(
            change, quiet, ignore_requirements, change_queue)
        if not enqueued_ahead:
            log.debug("Failed to enqueue changes "
                      "ahead of %s" % change)
            return False

        if self.isChangeAlreadyInQueue(change, change_queue):
            log.debug("Change %s is already in queue, "
                      "ignoring" % change)
            return True

        log.debug("Adding change %s to queue %s" %
                  (change, change_queue))
        if not quiet and len(self.pipeline.start_actions) > 0:
            self.reportStart(change)
        item = change_queue.enqueueChange(change)
        if enqueue_time:
            item.enqueue_time = enqueue_time
        item.live = live
        self.reportStats(item)
        self.enqueueChangesBehind(change, quiet, ignore_requirements,
                                  change_queue)
        for trigger in self.sched.triggers.values():
            trigger.onChangeEnqueued(item.change, self.pipeline)
        return True
1330
def dequeueItem(self, item):
    """Log the removal and ask the item's own queue to dequeue it."""
    message = "Removing change %s from queue" % item.change
    self.log.debug(message)
    owning_queue = item.queue
    owning_queue.dequeueItem(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001334
def removeItem(self, item):
    """Drop ``item`` from its queue.

    Used when an item has to leave the queue, probably because it has
    been superseded by another change.  Jobs are canceled first, then
    the item is dequeued and the pipeline stats are reported.
    """
    notice = ("Canceling builds behind change: %s "
              "because it is being removed." % item.change)
    self.log.debug(notice)
    for step in (self.cancelJobs, self.dequeueItem, self.reportStats):
        step(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001343
James E. Blairac2c3242014-01-24 13:38:51 -08001344 def _makeMergerItem(self, item):
1345 # Create a dictionary with all info about the item needed by
1346 # the merger.
Clark Boylan4c6566b2014-03-10 11:02:01 -07001347 number = None
1348 patchset = None
1349 oldrev = None
1350 newrev = None
1351 if hasattr(item.change, 'number'):
1352 number = item.change.number
1353 patchset = item.change.patchset
1354 elif hasattr(item.change, 'newrev'):
1355 oldrev = item.change.oldrev
1356 newrev = item.change.newrev
Joshua Hesketh352264b2015-08-11 23:42:08 +10001357 connection_name = self.pipeline.source.connection.connection_name
James E. Blairac2c3242014-01-24 13:38:51 -08001358 return dict(project=item.change.project.name,
James E. Blairc0dedf82014-08-06 09:37:52 -07001359 url=self.pipeline.source.getGitUrl(
James E. Blairac2c3242014-01-24 13:38:51 -08001360 item.change.project),
Joshua Hesketh352264b2015-08-11 23:42:08 +10001361 connection_name=connection_name,
James E. Blairac2c3242014-01-24 13:38:51 -08001362 merge_mode=item.change.project.merge_mode,
1363 refspec=item.change.refspec,
1364 branch=item.change.branch,
1365 ref=item.current_build_set.ref,
Clark Boylan4c6566b2014-03-10 11:02:01 -07001366 number=number,
1367 patchset=patchset,
1368 oldrev=oldrev,
1369 newrev=newrev,
James E. Blairac2c3242014-01-24 13:38:51 -08001370 )
1371
def prepareRef(self, item):
    """Kick off (or check on) the merger work needed before jobs run.

    Returns True if the ref is ready (the build set's merge state is
    COMPLETE), False otherwise.  When no merger work has been requested
    yet, the build set is marked PENDING and either a merge of the item
    plus its dependent items or a plain repo update is submitted to the
    merger; the method still returns False in that case because the
    result is not available yet.
    """
    build_set = item.current_build_set
    if build_set.merge_state == build_set.COMPLETE:
        return True
    if build_set.merge_state == build_set.PENDING:
        return False
    build_set.merge_state = build_set.PENDING
    # Read the ref before setConfiguration() is called below.
    ref = build_set.ref
    if hasattr(item.change, 'refspec') and not ref:
        self.log.debug("Preparing ref for: %s" % item.change)
        item.current_build_set.setConfiguration()
        dependent_items = self.getDependentItems(item)
        dependent_items.reverse()
        all_items = dependent_items + [item]
        # Build a real list: on Python 3 map() returns a one-shot
        # iterator, which is not what a list-consuming merger expects.
        merger_items = [self._makeMergerItem(i) for i in all_items]
        self.sched.merger.mergeChanges(merger_items,
                                       item.current_build_set,
                                       self.pipeline.precedence)
    else:
        self.log.debug("Preparing update repo for: %s" % item.change)
        url = self.pipeline.source.getGitUrl(item.change.project)
        self.sched.merger.updateRepo(item.change.project.name,
                                     url, build_set,
                                     self.pipeline.precedence)
    return False
1398
1399 def _launchJobs(self, item, jobs):
1400 self.log.debug("Launching jobs for change %s" % item.change)
1401 dependent_items = self.getDependentItems(item)
1402 for job in jobs:
1403 self.log.debug("Found job %s for change %s" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001404 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001405 build = self.sched.launcher.launch(job, item,
1406 self.pipeline,
1407 dependent_items)
James E. Blairfee8d652013-06-07 08:57:52 -07001408 self.log.debug("Adding build %s of job %s to item %s" %
1409 (build, job, item))
1410 item.addBuild(build)
James E. Blairee743612012-05-29 14:49:32 -07001411 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001412 self.log.exception("Exception while launching job %s "
James E. Blairfee8d652013-06-07 08:57:52 -07001413 "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001414
def launchJobs(self, item):
    """Launch whatever jobs the pipeline says are runnable for ``item``.

    Nothing is launched when the pipeline finds no jobs to run.  The
    method has no explicit return value, so callers always see None.
    """
    runnable = self.pipeline.findJobsToRun(item)
    if not runnable:
        return
    self._launchJobs(item, runnable)
1419
def cancelJobs(self, item, prime=True):
    """Cancel the builds of ``item`` and of every item behind it.

    When ``prime`` is true and the current build set already has a ref,
    the item's builds are reset first; the builds that get canceled are
    those of the build set that was current on entry.  Each such build
    is marked 'CANCELED' even if asking the launcher to cancel it
    failed.  Returns True if at least one build was canceled for this
    item or for any item behind it, False otherwise.
    """
    self.log.debug("Cancel jobs for change %s" % item.change)
    canceled = False
    # Capture the build set before resetAllBuilds() is called.
    old_build_set = item.current_build_set
    if prime and item.current_build_set.ref:
        item.resetAllBuilds()
    for build in old_build_set.getBuilds():
        try:
            self.sched.launcher.cancel(build)
        except Exception:
            # Catch Exception rather than everything so that
            # KeyboardInterrupt/SystemExit are not swallowed here.
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, item.change))
        build.result = 'CANCELED'
        canceled = True
    self.updateBuildDescriptions(old_build_set)
    for item_behind in item.items_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s" %
                       (item_behind.change, item.change))
        if self.cancelJobs(item_behind, prime=prime):
            canceled = True
    return canceled
1441
def _processOneItem(self, item, nnfi):
    # Process a single queue item: dequeue it if its dependencies can
    # no longer be satisfied, reset it if the item ahead changed,
    # prepare its ref, launch its jobs and report it when appropriate.
    #
    # ``nnfi`` is the nearest non-failing item seen so far in the queue.
    # Returns a (changed, nnfi) tuple where ``changed`` says whether
    # anything was modified and ``nnfi`` is the possibly updated
    # nearest non-failing item to hand to the next call.
    changed = False
    item_ahead = item.item_ahead
    # A non-live item ahead is treated as if nothing were ahead.
    if item_ahead and (not item_ahead.live):
        item_ahead = None
    change_queue = item.queue
    failing_reasons = []  # Reasons this item is failing

    # Anything other than the literal True (False, or a list of needed
    # changes) means the item cannot stay in the queue.
    if self.checkForChangesNeededBy(item.change, change_queue) is not True:
        # It's not okay to enqueue this change, we should remove it.
        self.log.info("Dequeuing change %s because "
                      "it can no longer merge" % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.pipeline.setDequeuedNeedingChange(item)
        if item.live:
            try:
                self.reportItem(item)
            except exceptions.MergeFailure:
                # The item is already dequeued; a merge failure raised
                # while reporting it is deliberately ignored.
                pass
        return (True, nnfi)
    dep_items = self.getFailingDependentItems(item)
    actionable = change_queue.isActionable(item)
    item.active = actionable
    ready = False
    if dep_items:
        failing_reasons.append('a needed change is failing')
        # prime=False is passed through to cancelJobs() here, unlike
        # the other cancelJobs() calls in this method.
        self.cancelJobs(item, prime=False)
    else:
        item_ahead_merged = False
        if (item_ahead and item_ahead.change.is_merged):
            item_ahead_merged = True
        if (item_ahead != nnfi and not item_ahead_merged):
            # Our current base is different than what we expected,
            # and it's not because our current base merged.  Something
            # ahead must have failed.
            self.log.info("Resetting builds for change %s because the "
                          "item ahead, %s, is not the nearest non-failing "
                          "item, %s" % (item.change, item_ahead, nnfi))
            change_queue.moveItem(item, nnfi)
            changed = True
            self.cancelJobs(item)
        if actionable:
            ready = self.prepareRef(item)
            if item.current_build_set.unable_to_merge:
                failing_reasons.append("it has a merge conflict")
                ready = False
    # NOTE(review): launchJobs() as defined in this class has no return
    # statement, so this condition looks like it can never be true even
    # when jobs were launched -- confirm whether that is intended.
    if actionable and ready and self.launchJobs(item):
        changed = True
    if self.pipeline.didAnyJobFail(item):
        failing_reasons.append("at least one job failed")
    if (not item.live) and (not item.items_behind):
        failing_reasons.append("is a non-live item with no items behind")
        self.dequeueItem(item)
        changed = True
    # Only a live item with nothing (live) ahead of it and all jobs
    # complete gets reported and dequeued.
    if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
            and item.live):
        try:
            self.reportItem(item)
        except exceptions.MergeFailure:
            failing_reasons.append("it did not merge")
            for item_behind in item.items_behind:
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, failed to merge" %
                              (item_behind.change, item))
                self.cancelJobs(item_behind)
        self.dequeueItem(item)
        changed = True
    elif not failing_reasons and item.live:
        # A live item with no failing reasons becomes the new nearest
        # non-failing item for the items processed after it.
        nnfi = item
    item.current_build_set.failing_reasons = failing_reasons
    if failing_reasons:
        self.log.debug("%s is a failing item because %s" %
                       (item, failing_reasons))
    return (changed, nnfi)
James E. Blairfee8d652013-06-07 08:57:52 -07001517
def processQueue(self):
    """Do whatever needs to be done for each change in every queue.

    Every item of every queue in the pipeline is run through
    _processOneItem() and has its stats reported.  Returns True if any
    item in any queue reported a change, False otherwise.
    """
    pipeline_name = self.pipeline.name
    self.log.debug("Starting queue processor: %s" % pipeline_name)
    any_changed = False
    for change_queue in self.pipeline.queues:
        touched = False
        nearest_ok = None  # Nearest non-failing item
        # Iterate over a snapshot because processing may dequeue items.
        for entry in list(change_queue.queue):
            entry_changed, nearest_ok = self._processOneItem(
                entry, nearest_ok)
            if entry_changed:
                touched = True
            self.reportStats(entry)
        if not touched:
            continue
        any_changed = True
        summary = ''.join(entry.formatStatus()
                          for entry in change_queue.queue)
        if summary:
            self.log.debug("Queue %s status is now:\n %s" %
                           (change_queue.name, summary))
    self.log.debug("Finished queue processor: %s (changed: %s)" %
                   (self.pipeline.name, any_changed))
    return any_changed
James E. Blairdaabed22012-08-15 15:38:57 -07001542
def updateBuildDescriptions(self, build_set):
    """Refresh the launcher-side description of every build in
    ``build_set`` and, if there is one, in its previous build set."""
    launcher = self.sched.launcher

    def refresh(builds):
        for entry in builds:
            launcher.setBuildDescription(entry,
                                         self.formatDescription(entry))

    refresh(build_set.getBuilds())

    previous = build_set.previous_build_set
    if previous:
        refresh(previous.getBuilds())
1552
def onBuildStarted(self, build):
    """Log that ``build`` has started; always returns True."""
    message = "Build %s started" % build
    self.log.debug(message)
    return True
1556
def onBuildCompleted(self, build):
    """Record the result of a finished build on the pipeline.

    The owning item is looked up through the build's build set, the
    pipeline is told about the result, and the item's status is logged.
    Always returns True.
    """
    log = self.log
    log.debug("Build %s completed" % build)

    owner = build.build_set.item
    self.pipeline.setResult(owner, build)

    status = owner.formatStatus()
    log.debug("Item %s status is now:\n %s" %
              (owner, status))
    return True
1565
def onMergeCompleted(self, event):
    """Record the outcome of a merger operation on its build set.

    The build set's merge state becomes COMPLETE and its zuul_url is
    taken from the event.  On a successful merge the resulting commit
    is stored; on a repo update the change's ``newrev`` is stored.  If
    no commit ends up recorded, the item is flagged as unable to merge.

    NullChange items are excluded from both the ``newrev`` lookup and
    the unable-to-merge flagging.
    """
    build_set = event.build_set
    item = build_set.item
    build_set.merge_state = build_set.COMPLETE
    build_set.zuul_url = event.zuul_url
    if event.merged:
        build_set.commit = event.commit
    elif event.updated:
        # The NullChange test has to look at the change, not at the
        # queue item wrapping it; testing the item never matches.
        if not isinstance(item.change, NullChange):
            build_set.commit = item.change.newrev
    # The first clause short-circuits, so NullChange is only consulted
    # when no commit was recorded.
    if not build_set.commit and not isinstance(item.change, NullChange):
        self.log.info("Unable to merge change %s" % item.change)
        self.pipeline.setUnableToMerge(item)
James E. Blair4076e2b2014-01-28 12:42:20 -08001579
def reportItem(self, item):
    """Report ``item`` once and, for merging pipelines, verify the merge.

    When ``changes_merge`` is set, the change queue window is shrunk
    and exceptions.MergeFailure is raised unless all jobs succeeded and
    the source confirms the change merged; otherwise the window is
    grown and every trigger is notified of the merge.
    """
    if not item.reported:
        # _reportItem() returns True if it failed to report.
        failed_to_report = self._reportItem(item)
        item.reported = not failed_to_report
    if not self.changes_merge:
        return

    succeeded = self.pipeline.didAllJobsSucceed(item)
    merged = item.reported
    if merged:
        merged = self.pipeline.source.isMerged(item.change,
                                               item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))
    window_owner = item.queue
    if succeeded and merged:
        window_owner.increaseWindowSize()
        self.log.debug("%s window size increased to %s" %
                       (window_owner, window_owner.window))

        for trigger in self.sched.triggers.values():
            trigger.onChangeMerged(item.change, self.pipeline.source)
        return

    self.log.debug("Reported change %s failed tests or failed "
                   "to merge" % (item.change))
    window_owner.decreaseWindowSize()
    self.log.debug("%s window size decreased to %s" %
                   (window_owner, window_owner.window))
    raise exceptions.MergeFailure(
        "Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001608
James E. Blairfee8d652013-06-07 08:57:52 -07001609 def _reportItem(self, item):
James E. Blairfee8d652013-06-07 08:57:52 -07001610 self.log.debug("Reporting change %s" % item.change)
James E. Blairb98fcdb2013-08-26 18:23:09 -07001611 ret = True # Means error as returned by trigger.report
Evgeny Antyshev88db9cb2015-06-04 12:51:40 +00001612 if not self.pipeline.getJobs(item):
1613 # We don't send empty reports with +1,
1614 # and the same for -1's (merge failures or transient errors)
1615 # as they cannot be followed by +1's
1616 self.log.debug("No jobs for change %s" % item.change)
1617 actions = []
1618 elif self.pipeline.didAllJobsSucceed(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001619 self.log.debug("success %s" % (self.pipeline.success_actions))
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001620 actions = self.pipeline.success_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001621 item.setReportedResult('SUCCESS')
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001622 self.pipeline._consecutive_failures = 0
Joshua Heskethb7179772014-01-30 23:30:46 +11001623 elif not self.pipeline.didMergerSucceed(item):
1624 actions = self.pipeline.merge_failure_actions
1625 item.setReportedResult('MERGER_FAILURE')
James E. Blairee743612012-05-29 14:49:32 -07001626 else:
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001627 actions = self.pipeline.failure_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001628 item.setReportedResult('FAILURE')
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001629 self.pipeline._consecutive_failures += 1
1630 if self.pipeline._disabled:
1631 actions = self.pipeline.disabled_actions
1632 # Check here if we should disable so that we only use the disabled
1633 # reporters /after/ the last disable_at failure is still reported as
1634 # normal.
1635 if (self.pipeline.disable_at and not self.pipeline._disabled and
1636 self.pipeline._consecutive_failures >= self.pipeline.disable_at):
1637 self.pipeline._disabled = True
James E. Blaire5910202013-12-27 09:50:31 -08001638 if actions:
1639 report = self.formatReport(item)
1640 try:
1641 self.log.info("Reporting change %s, actions: %s" %
1642 (item.change, actions))
Joshua Hesketh850ccb62014-11-27 11:31:02 +11001643 ret = self.sendReport(actions, self.pipeline.source,
1644 item.change, report)
James E. Blaire5910202013-12-27 09:50:31 -08001645 if ret:
1646 self.log.error("Reporting change %s received: %s" %
1647 (item.change, ret))
1648 except:
1649 self.log.exception("Exception while reporting:")
1650 item.setReportedResult('ERROR')
James E. Blairfee8d652013-06-07 08:57:52 -07001651 self.updateBuildDescriptions(item.current_build_set)
James E. Blairee743612012-05-29 14:49:32 -07001652 return ret
1653
def formatReport(self, item):
    """Build the text of the report for ``item``.

    The body is a fixed notice for items dequeued because of a needed
    change, the pipeline's merge-failure message when the merger did
    not succeed, or otherwise the success/failure message followed by
    the per-job lines.  The pipeline footer, if any, is appended last.
    """
    pipeline = self.pipeline
    pieces = []

    if item.dequeued_needing_change:
        pieces.append(
            'This change depends on a change that failed to merge.\n')
    elif not pipeline.didMergerSucceed(item):
        pieces.append(pipeline.merge_failure_message)
    else:
        if pipeline.didAllJobsSucceed(item):
            headline = pipeline.success_message
        else:
            headline = pipeline.failure_message
        pieces.append(headline + '\n\n')
        pieces.append(self._formatReportJobs(item))

    if pipeline.footer_message:
        pieces.append('\n' + pipeline.footer_message)

    return ''.join(pieces)
1672
1673 def _formatReportJobs(self, item):
1674 # Return the list of jobs portion of the report
1675 ret = ''
James E. Blair8b0d4c42012-08-23 16:03:05 -07001676
Joshua Heskethb7179772014-01-30 23:30:46 +11001677 if self.sched.config.has_option('zuul', 'url_pattern'):
1678 url_pattern = self.sched.config.get('zuul', 'url_pattern')
James E. Blair8b0d4c42012-08-23 16:03:05 -07001679 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001680 url_pattern = None
1681
James E. Blair107c3852015-02-07 08:23:10 -08001682 for job in self.pipeline.getJobs(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001683 build = item.current_build_set.getBuild(job.name)
1684 result = build.result
1685 pattern = url_pattern
1686 if result == 'SUCCESS':
1687 if job.success_message:
1688 result = job.success_message
1689 if job.success_pattern:
1690 pattern = job.success_pattern
1691 elif result == 'FAILURE':
1692 if job.failure_message:
1693 result = job.failure_message
1694 if job.failure_pattern:
1695 pattern = job.failure_pattern
1696 if pattern:
Jeremy Stanleyc6d4bc82014-08-01 19:11:29 +00001697 url = pattern.format(change=item.change,
1698 pipeline=self.pipeline,
1699 job=job,
1700 build=build)
James E. Blaira35fcce2012-08-24 10:46:01 -07001701 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001702 url = build.url or job.name
1703 if not job.voting:
1704 voting = ' (non-voting)'
1705 else:
1706 voting = ''
1707 if self.report_times and build.end_time and build.start_time:
1708 dt = int(build.end_time - build.start_time)
1709 m, s = divmod(dt, 60)
1710 h, m = divmod(m, 60)
1711 if h:
1712 elapsed = ' in %dh %02dm %02ds' % (h, m, s)
1713 elif m:
1714 elapsed = ' in %dm %02ds' % (m, s)
Ori Livneh7191ee82013-05-02 19:13:53 -07001715 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001716 elapsed = ' in %ds' % (s)
1717 else:
1718 elapsed = ''
1719 name = ''
1720 if self.sched.config.has_option('zuul', 'job_name_in_report'):
1721 if self.sched.config.getboolean('zuul',
1722 'job_name_in_report'):
1723 name = job.name + ' '
1724 ret += '- %s%s : %s%s%s\n' % (name, url, result, elapsed,
1725 voting)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001726 return ret
1727
def formatDescription(self, build):
    """Return the HTML description shown for ``build``.

    The description names what triggered the build (a change or a
    ref), lists the other changes tested concurrently, every build of
    the same build set, the builds of the same job in the previous and
    next build sets, and the reported result when there is one.

    Values are interpolated into the HTML as-is, without escaping.
    """
    concurrent_changes = ''
    concurrent_builds = ''
    other_builds = ''

    build_set = build.build_set

    for other_change in build_set.other_changes:
        concurrent_changes += '<li><a href="{change.url}">\
              {change.number},{change.patchset}</a></li>'.format(
            change=other_change)

    change = build_set.item.change

    # Use a distinct loop variable: reusing ``build`` here would leave
    # it bound to the last build of the set, and the previous/next
    # lookups below would then be done for the wrong job.
    for set_build in build_set.getBuilds():
        if set_build.url:
            concurrent_builds += """\
<li>
  <a href="{build.url}">
  {build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=set_build)
        else:
            concurrent_builds += """\
<li>
  {build.job.name}: {build.result}
</li>""".format(build=set_build)

    if build_set.previous_build_set:
        other_build = build_set.previous_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
  Preceded by: <a href="{build.url}">
  {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    if build_set.next_build_set:
        other_build = build_set.next_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
  Succeeded by: <a href="{build.url}">
  {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    result = build_set.result

    if hasattr(change, 'number'):
        ret = """\
<p>
  Triggered by change:
    <a href="{change.url}">{change.number},{change.patchset}</a><br/>
  Branch: <b>{change.branch}</b><br/>
  Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    elif hasattr(change, 'ref'):
        # No anchor is opened for a ref, so none is closed either.
        ret = """\
<p>
  Triggered by reference:
    {change.ref}<br/>
  Old revision: <b>{change.oldrev}</b><br/>
  New revision: <b>{change.newrev}</b><br/>
  Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    else:
        ret = ""

    if concurrent_changes:
        ret += """\
<p>
  Other changes tested concurrently with this change:
  <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
  All builds for this change set:
  <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
  Other build sets for this change:
  <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
  Reported result: <b>{result}</b>
</p>
"""

    # Pass the template fields explicitly instead of format(**locals())
    # so the template does not depend on local variable names.
    ret = ret.format(self=self,
                     change=change,
                     concurrent_changes=concurrent_changes,
                     concurrent_builds=concurrent_builds,
                     other_builds=other_builds,
                     result=result)
    return ret
1829
def reportStats(self, item):
    """Send pipeline and per-project statistics for ``item`` to statsd.

    Does nothing when statsd is not configured.  The current-changes
    gauge is updated on every call; the resident-time timer and the
    total-changes counter are only updated once the item has a dequeue
    time.  Any failure is logged and otherwise ignored.
    """
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeing.
        if item.dequeue_time:
            dt = int((item.dequeue_time - item.enqueue_time) * 1000)
        else:
            dt = None
        items = len(self.pipeline.getAllItems())
        # Compare against None so that a resident time that rounds
        # down to 0 ms is still counted.
        dequeued = dt is not None

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(key + '.current_changes', items)
        if dequeued:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        project_name = item.change.project.name.replace('/', '.')
        key += '.%s' % project_name
        if dequeued:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')
    except Exception:
        # Stats are best-effort, but KeyboardInterrupt/SystemExit must
        # not be swallowed, hence Exception rather than a bare except.
        self.log.exception("Exception reporting pipeline stats")
1860
James E. Blair1e8dd892012-05-30 09:15:05 -07001861
class DynamicChangeQueueContextManager(object):
    """Context manager around a dynamically created change queue.

    Entering yields the wrapped change queue (which may be None).  On
    exit, a change queue that holds no items is removed from its
    pipeline again.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        if self.change_queue and not self.change_queue.queue:
            # Remove the ChangeQueue object itself, not its (empty)
            # list of items.
            self.change_queue.pipeline.removeQueue(self.change_queue)
1872
1873
James E. Blair4aea70c2012-07-26 14:23:24 -07001874class IndependentPipelineManager(BasePipelineManager):
1875 log = logging.getLogger("zuul.IndependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001876 changes_merge = False
1877
James E. Blaireff88162013-07-01 12:44:14 -04001878 def _postConfig(self, layout):
1879 super(IndependentPipelineManager, self)._postConfig(layout)
James E. Blaire0487072012-08-29 17:38:31 -07001880
James E. Blair0577cd62015-02-07 11:42:12 -08001881 def getChangeQueue(self, change, existing=None):
James E. Blairbfb8e042014-12-30 17:01:44 -08001882 # creates a new change queue for every change
James E. Blair0577cd62015-02-07 11:42:12 -08001883 if existing:
1884 return DynamicChangeQueueContextManager(existing)
James E. Blairbfb8e042014-12-30 17:01:44 -08001885 if change.project not in self.pipeline.getProjects():
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +00001886 self.pipeline.addProject(change.project)
James E. Blairbfb8e042014-12-30 17:01:44 -08001887 change_queue = ChangeQueue(self.pipeline)
1888 change_queue.addProject(change.project)
1889 self.pipeline.addQueue(change_queue)
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +00001890 self.log.debug("Dynamically created queue %s", change_queue)
James E. Blair0577cd62015-02-07 11:42:12 -08001891 return DynamicChangeQueueContextManager(change_queue)
James E. Blairbfb8e042014-12-30 17:01:44 -08001892
1893 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
1894 change_queue):
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001895 ret = self.checkForChangesNeededBy(change, change_queue)
James E. Blairbfb8e042014-12-30 17:01:44 -08001896 if ret in [True, False]:
1897 return ret
1898 self.log.debug(" Changes %s must be merged ahead of %s" %
1899 (ret, change))
1900 for needed_change in ret:
1901 # This differs from the dependent pipeline by enqueuing
1902 # changes ahead as "not live", that is, not intended to
1903 # have jobs run. Also, pipeline requirements are always
1904 # ignored (which is safe because the changes are not
1905 # live).
1906 r = self.addChange(needed_change, quiet=True,
1907 ignore_requirements=True,
1908 live=False, change_queue=change_queue)
1909 if not r:
1910 return False
1911 return True
1912
def checkForChangesNeededBy(self, change, change_queue):
    """Work out which changes must be enqueued ahead of `change`.

    :arg change: the change being considered for enqueuing.
    :arg change_queue: the queue consulted (via isChangeAlreadyInQueue)
        to see whether a needed change is already present.
    :returns: True if it is okay to proceed enqueuing this change
        without adding anything ahead of it, otherwise the list of
        needed changes (without duplicates, in first-seen order).
    """
    if self.pipeline.ignore_dependencies:
        return True
    self.log.debug("Checking for changes needed by %s:", change)
    # Return true if okay to proceed enqueuing this change,
    # false if the change should not be enqueued.
    if not hasattr(change, 'needs_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return True
    if not change.needs_changes:
        self.log.debug(" No changes needed")
        return True
    changes_needed = []
    for needed_change in change.needs_changes:
        self.log.debug(" Change %s needs change %s:",
                       change, needed_change)
        if needed_change.is_merged:
            self.log.debug(" Needed change is merged")
            continue
        if self.isChangeAlreadyInQueue(needed_change, change_queue):
            self.log.debug(" Needed change is already ahead in the queue")
            continue
        self.log.debug(" Change %s is needed", needed_change)
        # This differs from the dependent pipeline check in not
        # verifying that the dependent change is mergable.
        if needed_change not in changes_needed:
            changes_needed.append(needed_change)
    if changes_needed:
        return changes_needed
    return True
1944
def dequeueItem(self, item):
    """Dequeue `item`, then drop its queue from the pipeline if empty.

    :arg item: the queue item to remove; its `queue` attribute is the
        change queue it belongs to.
    """
    super(IndependentPipelineManager, self).dequeueItem(item)
    owning_queue = item.queue
    if owning_queue.queue:
        # Other items are still in this queue; keep it.
        return
    # An independent pipeline manager dynamically removes empty
    # queues
    self.pipeline.removeQueue(owning_queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001951
James E. Blair1e8dd892012-05-30 09:15:05 -07001952
class StaticChangeQueueContextManager(object):
    """Context manager that hands out a change queue and does nothing on exit."""

    def __init__(self, change_queue):
        # The queue yielded by ``with ... as``.
        self.change_queue = change_queue

    def __enter__(self):
        """Return the wrapped change queue."""
        return self.change_queue

    def __exit__(self, etype, value, tb):
        """Perform no cleanup; exceptions are not suppressed."""
        return None
1962
1963
James E. Blair4aea70c2012-07-26 14:23:24 -07001964class DependentPipelineManager(BasePipelineManager):
1965 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001966 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001967
def __init__(self, *args, **kwargs):
    """Forward all arguments unchanged to the parent initializer."""
    super(DependentPipelineManager, self).__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001970
def _postConfig(self, layout):
    """Run the parent's post-configuration step, then build the queues.

    :arg layout: passed through to the parent implementation.
    """
    parent = super(DependentPipelineManager, self)
    parent._postConfig(layout)
    self.buildChangeQueues()
1974
def buildChangeQueues(self):
    """Create the pipeline's shared change queues.

    One queue is created per pipeline project using the pipeline's
    window settings; the queues are then combined repeatedly until a
    pass no longer reduces their number, and the result is registered
    on the pipeline.
    """
    self.log.debug("Building shared change queues")
    change_queues = []

    for project in self.pipeline.getProjects():
        change_queue = ChangeQueue(
            self.pipeline,
            window=self.pipeline.window,
            window_floor=self.pipeline.window_floor,
            window_increase_type=self.pipeline.window_increase_type,
            window_increase_factor=self.pipeline.window_increase_factor,
            window_decrease_type=self.pipeline.window_decrease_type,
            window_decrease_factor=self.pipeline.window_decrease_factor)
        change_queue.addProject(project)
        change_queues.append(change_queue)
        self.log.debug("Created queue: %s", change_queue)

    # Iterate over all queues trying to combine them, and keep doing
    # so until they can not be combined further.
    last_change_queues = change_queues
    while True:
        new_change_queues = self.combineChangeQueues(last_change_queues)
        if len(last_change_queues) == len(new_change_queues):
            break
        last_change_queues = new_change_queues

    self.log.info(" Shared change queues:")
    for shared_queue in new_change_queues:
        self.pipeline.addQueue(shared_queue)
        self.log.info(" %s containing %s",
                      shared_queue, shared_queue.generated_name)
James E. Blairc3d428e2013-12-03 15:06:48 -08002006
def combineChangeQueues(self, change_queues):
    """Perform one pass of merging queues that have jobs in common.

    Each queue is compared, in order, with the queues already kept
    during this pass; it is merged into the first kept queue whose job
    set is not disjoint from its own, and kept as-is otherwise.

    :arg change_queues: the queues to combine.
    :returns: a new list holding the kept queues (the input list is
        not modified, but kept queues absorb merged ones).
    """
    self.log.debug("Combining shared queues")
    new_change_queues = []
    for a in change_queues:
        # The candidate's jobs do not change while it is being
        # compared, so look them up once.
        a_jobs = a.getJobs()
        for b in new_change_queues:
            if not a_jobs.isdisjoint(b.getJobs()):
                self.log.debug("Merging queue %s into %s", a, b)
                b.mergeChangeQueue(a)
                break
        else:
            # No kept queue shares a job with this one.
            self.log.debug("Keeping queue %s", a)
            new_change_queues.append(a)
    return new_change_queues
James E. Blairee743612012-05-29 14:49:32 -07002022
def getChangeQueue(self, change, existing=None):
    """Return a context manager yielding the queue for `change`.

    :arg change: the change whose project selects the queue.
    :arg existing: if truthy, this queue is used instead of looking
        one up on the pipeline.
    """
    # The pipeline lookup only happens when no usable queue was given.
    chosen = existing if existing else self.pipeline.getQueue(
        change.project)
    return StaticChangeQueueContextManager(chosen)
James E. Blair5ee24252014-12-30 10:12:29 -08002028
def isChangeReadyToBeEnqueued(self, change):
    """Report whether the pipeline source says `change` can merge.

    :arg change: the change to check.
    :returns: True if the source's canMerge check (called with the
        result of getSubmitAllowNeeds) passes, False otherwise.
    """
    if not self.pipeline.source.canMerge(change,
                                         self.getSubmitAllowNeeds()):
        self.log.debug("Change %s can not merge, ignoring", change)
        return False
    return True
James E. Blair1e8dd892012-05-30 09:15:05 -07002035
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                         change_queue):
    """Enqueue the changes that need `change` and can merge.

    A dependent change is only considered when getChangeQueue maps it
    to `change_queue` and the pipeline source reports it as mergeable.

    :arg change: the change that was just enqueued.
    :arg quiet: passed through to addChange.
    :arg ignore_requirements: passed through to addChange.
    :arg change_queue: the target queue for the dependent changes.
    :returns: None.
    """
    to_enqueue = []
    self.log.debug("Checking for changes needing %s:", change)
    if not hasattr(change, 'needed_by_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return
    for other_change in change.needed_by_changes:
        with self.getChangeQueue(other_change) as other_change_queue:
            if other_change_queue != change_queue:
                self.log.debug(" Change %s in project %s can not be "
                               "enqueued in the target queue %s",
                               other_change, other_change.project,
                               change_queue)
                continue
        if self.pipeline.source.canMerge(other_change,
                                         self.getSubmitAllowNeeds()):
            self.log.debug(" Change %s needs %s and is ready to merge",
                           other_change, change)
            to_enqueue.append(other_change)

    if not to_enqueue:
        self.log.debug(" No changes need %s", change)

    # Collect first, add afterwards, so every candidate is checked
    # before any addChange call runs.
    for other_change in to_enqueue:
        self.addChange(other_change, quiet=quiet,
                       ignore_requirements=ignore_requirements,
                       change_queue=change_queue)
James E. Blaire0487072012-08-29 17:38:31 -07002064
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                        change_queue):
    """Enqueue the changes that must be merged ahead of `change`.

    :arg change: the change whose dependencies should be enqueued.
    :arg quiet: passed through to addChange.
    :arg ignore_requirements: passed through to addChange.
    :arg change_queue: passed to checkForChangesNeededBy and addChange.
    :returns: the boolean produced by checkForChangesNeededBy when it
        returns one; otherwise False as soon as one addChange call
        reports failure, and True once every needed change was added.
    """
    ret = self.checkForChangesNeededBy(change, change_queue)
    # A boolean is a final verdict; anything else is the list of
    # changes that still have to be enqueued ahead of this one.
    if isinstance(ret, bool):
        return ret
    self.log.debug(" Changes %s must be merged ahead of %s", ret, change)
    for needed_change in ret:
        r = self.addChange(needed_change, quiet=quiet,
                           ignore_requirements=ignore_requirements,
                           change_queue=change_queue)
        if not r:
            return False
    return True
James E. Blaire0487072012-08-29 17:38:31 -07002079
def checkForChangesNeededBy(self, change, change_queue):
    """Work out which changes must be enqueued ahead of `change`.

    :arg change: the change being considered for enqueuing.
    :arg change_queue: ignored; the queue is looked up again from
        `change` via getChangeQueue.
    :returns: True if it is okay to proceed without enqueuing anything
        ahead, False if the change should not be enqueued (a needed
        change is in a different queue, is not the current patchset,
        or can not merge), otherwise the list of needed changes
        (without duplicates, in first-seen order).
    """
    self.log.debug("Checking for changes needed by %s:", change)
    # Return true if okay to proceed enqueuing this change,
    # false if the change should not be enqueued.
    if not hasattr(change, 'needs_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return True
    if not change.needs_changes:
        self.log.debug(" No changes needed")
        return True
    changes_needed = []
    # Ignore supplied change_queue
    with self.getChangeQueue(change) as change_queue:
        for needed_change in change.needs_changes:
            self.log.debug(" Change %s needs change %s:",
                           change, needed_change)
            if needed_change.is_merged:
                self.log.debug(" Needed change is merged")
                continue
            with self.getChangeQueue(needed_change) as needed_change_queue:
                if needed_change_queue != change_queue:
                    self.log.debug(" Change %s in project %s does not "
                                   "share a change queue with %s "
                                   "in project %s",
                                   needed_change, needed_change.project,
                                   change, change.project)
                    return False
            if not needed_change.is_current_patchset:
                self.log.debug(" Needed change is not the "
                               "current patchset")
                return False
            if self.isChangeAlreadyInQueue(needed_change, change_queue):
                self.log.debug(" Needed change is already ahead "
                               "in the queue")
                continue
            if self.pipeline.source.canMerge(needed_change,
                                             self.getSubmitAllowNeeds()):
                self.log.debug(" Change %s is needed", needed_change)
                if needed_change not in changes_needed:
                    changes_needed.append(needed_change)
                # Move on for every mergeable change, including one
                # that was already recorded, so a repeated entry is
                # never reported as unmergeable below.
                continue
            # The needed change can't be merged.
            self.log.debug(" Change %s is needed but can not be merged",
                           needed_change)
            return False
    if changes_needed:
        return changes_needed
    return True
James E. Blair972e3c72013-08-29 12:04:55 -07002128
def getFailingDependentItems(self, item):
    """Collect the items `item` depends on that are currently failing.

    :arg item: a queue item; its change's `needs_changes` (if present)
        lists the changes it depends on.
    :returns: a set of the needed items, as found by getItemForChange,
        whose current build set has failing reasons; None when there
        are no dependencies or none of them is failing.
    """
    change = item.change
    if not hasattr(change, 'needs_changes') or not change.needs_changes:
        return None
    # Needed changes with no corresponding item are skipped.
    needed_items = (self.getItemForChange(needed_change)
                    for needed_change in change.needs_changes)
    failing_items = set(
        needed_item for needed_item in needed_items
        if needed_item and needed_item.current_build_set.failing_reasons)
    return failing_items or None