blob: f3ecc033fccb6862c7b660d389acdef59c43edae [file] [log] [blame]
# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Antoine "hashar" Musso
# Copyright 2013 Wikimedia Foundation Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Christian Berendt12d4d722014-06-07 21:03:45 +020023from six.moves import queue as Queue
Zhongyue Luo1c860d72012-07-19 11:03:56 +080024import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
James E. Blair11041d22014-05-02 14:49:53 -070032from model import ActionReporter, Pipeline, Project, ChangeQueue
33from model import EventFilter, ChangeishFilter
Maru Newby3fe5f852015-01-13 04:22:14 +000034from zuul import change_matcher
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040035from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070036
James E. Blair71e94122012-12-24 17:53:08 -080037statsd = extras.try_import('statsd.statsd')
38
James E. Blair1e8dd892012-05-30 09:15:05 -070039
def deep_format(obj, paramdict):
    """Apply ``paramdict`` via ``str.format()`` to all strings within obj.

    Lists and dicts are traversed recursively; for dicts both the keys
    and the values are expanded.  Any other object is returned unchanged.

    Borrowed from the Jenkins Job Builder project.

    :arg obj: a string, list, dict or any other object to expand
    :arg dict paramdict: values for the replacement fields
    :returns: a new list/dict/string with the fields expanded, or obj
        itself when it is of any other type
    :raises KeyError: when a string names a field missing from paramdict
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        # Keys are routed through deep_format as well, so that non-string
        # keys (YAML readily yields ints or booleans) are kept untouched
        # instead of failing on a missing .format() attribute.
        return dict((deep_format(key, paramdict),
                     deep_format(value, paramdict))
                    for key, value in obj.items())
    return obj
60
61
class MergeFailure(Exception):
    """Error type for merge failures; adds nothing to ``Exception``."""
64
65
class ManagementEvent(object):
    """An event that should be processed within the main queue run loop.

    A thread that submits the event can block in :meth:`wait` until
    :meth:`done` or :meth:`exception` has been called on it.
    """

    def __init__(self):
        self._wait_event = threading.Event()
        self._exception = None
        self._traceback = None

    def exception(self, e, tb):
        """Record a failure and release any thread blocked in wait().

        :arg Exception e: the exception instance to re-raise from wait()
        :arg tb: the traceback object that belongs to e
        """
        self._exception = e
        self._traceback = tb
        self._wait_event.set()

    def done(self):
        """Mark the event as completed and release any waiting thread."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event has completed or the timeout expires.

        :arg float timeout: seconds to wait; None blocks indefinitely
        :returns: True if the event completed, False if the wait timed out
        :raises: the exception stored by exception(), together with the
            traceback that was stored alongside it
        """
        self._wait_event.wait(timeout)
        if self._exception:
            # The three-expression form of ``raise`` is a syntax error on
            # Python 3; six.reraise keeps the stored traceback on both
            # major versions.  six is already a dependency of this module.
            import six
            six.reraise(type(self._exception), self._exception,
                        self._traceback)
        return self._wait_event.is_set()
86
87
class ReconfigureEvent(ManagementEvent):
    """Management event that asks the scheduler to reconfigure.

    The layout will be (re-)loaded from the path specified in the
    configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        super(ReconfigureEvent, self).__init__()
        # Kept on the event so that the handler can read it later.
        self.config = config
97
98
class PromoteEvent(ManagementEvent):
    """Management event that promotes changes to the head of the queue.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, pipeline_name, change_ids):
        super(PromoteEvent, self).__init__()
        # Both values are only stored here, not interpreted.
        self.pipeline_name = pipeline_name
        self.change_ids = change_ids
111
112
class EnqueueEvent(ManagementEvent):
    """Management event that enqueues a change into a pipeline.

    :arg TriggerEvent trigger_event: a TriggerEvent describing the
        trigger, pipeline, and change to enqueue
    """

    def __init__(self, trigger_event):
        super(EnqueueEvent, self).__init__()
        # Only stored here; nothing in this class interprets it.
        self.trigger_event = trigger_event
123
124
class ResultEvent(object):
    """Base class for events that need to modify the pipeline state due
    to a result from an external system.

    It carries no state or behaviour of its own.
    """
130
131
class BuildStartedEvent(ResultEvent):
    """Result event reporting that a build has started.

    :arg Build build: The build which has started.
    """

    def __init__(self, build):
        # The build is only stored; the event adds no behaviour.
        self.build = build
140
141
class BuildCompletedEvent(ResultEvent):
    """Result event reporting that a build has completed.

    :arg Build build: The build which has completed.
    """

    def __init__(self, build):
        # The build is only stored; the event adds no behaviour.
        self.build = build
150
151
class MergeCompletedEvent(ResultEvent):
    """Result event reporting that a remote merge operation has completed.

    :arg BuildSet build_set: The build_set which is ready.
    :arg str zuul_url: The URL of the Zuul Merger.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    """

    def __init__(self, build_set, zuul_url, merged, updated, commit):
        # What the merge was performed for, and where.
        self.build_set = build_set
        self.zuul_url = zuul_url
        # Outcome of the operation.
        self.merged = merged
        self.updated = updated
        self.commit = commit
168
169
def toList(item):
    """Coerce a configuration value into a list.

    Any falsy value yields a new empty list, a non-empty list is returned
    as-is (the very same object) and everything else is wrapped in a
    one-element list.
    """
    if isinstance(item, list):
        return item if item else []
    return [item] if item else []
176
177
James E. Blaire9d45c32012-05-31 09:56:45 -0700178class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700179 log = logging.getLogger("zuul.Scheduler")
180
def __init__(self):
    """Create the scheduler thread in its initial, unconfigured state."""
    threading.Thread.__init__(self)
    self.daemon = True

    # Synchronisation primitives.
    self.wake_event = threading.Event()
    self.layout_lock = threading.Lock()
    self.run_handler_lock = threading.Lock()

    # Run-state flags.
    self._pause = False
    self._exit = False
    self._stopped = False

    # Collaborators; these start out empty and are filled in by
    # setLauncher(), setMerger(), registerTrigger() and registerReporter().
    self.launcher = None
    self.merger = None
    self.triggers = {}
    self.reporters = {}
    self.config = None

    # One queue per kind of incoming event.
    self.trigger_event_queue = Queue.Queue()
    self.result_event_queue = Queue.Queue()
    self.management_event_queue = Queue.Queue()

    # Start with an empty layout.
    self.layout = model.Layout()

    self.zuul_version = zuul_version.version_info.release_string()
    self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400203
def stop(self):
    """Flag the scheduler as stopped and set the wake event."""
    # The flag is raised first so that it is already visible to whoever
    # reacts to the wake event.
    self._stopped = True
    self.wake_event.set()
207
James E. Blair47958382013-01-10 17:26:02 -0800208 def testConfig(self, config_path):
James E. Blair04948c72013-07-25 23:03:17 -0700209 return self._parseConfig(config_path)
James E. Blair47958382013-01-10 17:26:02 -0800210
def _parseSkipIf(self, config_job):
    """Translate a job's ``skip-if`` section into a change matcher.

    :arg dict config_job: the job definition from the layout
    :returns: a MatchAny over one MatchAll per ``skip-if`` entry, or None
        when the job defines no ``skip-if`` entries
    """
    predicates = []

    for skip_conf in config_job.get('skip-if', []):
        conditions = []

        project_pattern = skip_conf.get('project')
        if project_pattern:
            conditions.append(
                change_matcher.ProjectMatcher(project_pattern))

        branch_pattern = skip_conf.get('branch')
        if branch_pattern:
            conditions.append(change_matcher.BranchMatcher(branch_pattern))

        file_patterns = toList(skip_conf.get('all-files-match-any'))
        if file_patterns:
            per_file = [change_matcher.FileMatcher(pattern)
                        for pattern in file_patterns]
            conditions.append(change_matcher.MatchAllFiles(per_file))

        # All patterns need to match a given skip-if predicate
        predicates.append(change_matcher.MatchAll(conditions))

    if not predicates:
        return None
    # Any skip-if predicate can be matched to trigger a skip
    return change_matcher.MatchAny(predicates)
238
def _parseConfig(self, config_path):
    """Read the layout file and build a new ``model.Layout`` from it.

    The file is validated, any ``includes`` are executed, and then the
    pipelines, project templates, jobs and projects it describes are
    turned into model objects.  The scheduler's current layout is not
    touched.

    :arg str config_path: path of the YAML layout file (``~`` is expanded)
    :returns: the newly built ``model.Layout``
    :raises Exception: if the layout file does not exist or a job names
        a ``parameter-function`` that no include defined
    """
    layout = model.Layout()
    project_templates = {}

    if config_path:
        config_path = os.path.expanduser(config_path)
        if not os.path.exists(config_path):
            raise Exception("Unable to read layout config file at %s" %
                            config_path)
    # The context manager closes the layout file once it has been parsed;
    # it used to be left open until garbage collection.
    with open(config_path) as config_file:
        # NOTE(review): yaml.load() can construct arbitrary Python
        # objects, which is only acceptable while the layout file is
        # trusted.  Consider yaml.safe_load().
        data = yaml.load(config_file)

    validator = layoutvalidator.LayoutValidator()
    validator.validate(data)

    # Namespace that collects everything defined by the included files.
    config_env = {}
    for include in data.get('includes', []):
        if 'python-file' in include:
            fn = include['python-file']
            if not os.path.isabs(fn):
                # Relative includes are resolved against the layout file.
                base = os.path.dirname(os.path.realpath(config_path))
                fn = os.path.join(base, fn)
            fn = os.path.expanduser(fn)
            # NOTE(review): execfile() exists on Python 2 only and runs
            # the included file with full privileges.
            execfile(fn, config_env)

    for conf_pipeline in data.get('pipelines', []):
        pipeline = Pipeline(conf_pipeline['name'])
        pipeline.description = conf_pipeline.get('description')
        # TODO(jeblair): remove backwards compatibility:
        pipeline.source = self.triggers[conf_pipeline.get('source',
                                                          'gerrit')]
        precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
        pipeline.precedence = precedence
        pipeline.failure_message = conf_pipeline.get('failure-message',
                                                     "Build failed.")
        pipeline.merge_failure_message = conf_pipeline.get(
            'merge-failure-message', "Merge Failed.\n\nThis change was "
            "unable to be automatically merged with the current state of "
            "the repository. Please rebase your change and upload a new "
            "patchset.")
        pipeline.success_message = conf_pipeline.get('success-message',
                                                     "Build succeeded.")
        pipeline.footer_message = conf_pipeline.get('footer-message', "")
        pipeline.dequeue_on_new_patchset = conf_pipeline.get(
            'dequeue-on-new-patchset', True)
        pipeline.ignore_dependencies = conf_pipeline.get(
            'ignore-dependencies', False)

        action_reporters = {}
        for action in ['start', 'success', 'failure', 'merge-failure']:
            action_reporters[action] = []
            if conf_pipeline.get(action):
                for reporter_name, params \
                        in conf_pipeline.get(action).items():
                    if reporter_name in self.reporters:
                        action_reporters[action].append(ActionReporter(
                            self.reporters[reporter_name], params))
                    else:
                        self.log.error('Invalid reporter name %s' %
                                       reporter_name)
        pipeline.start_actions = action_reporters['start']
        pipeline.success_actions = action_reporters['success']
        pipeline.failure_actions = action_reporters['failure']
        # Without dedicated merge-failure reporters, merge failures are
        # reported through the ordinary failure reporters.
        if action_reporters['merge-failure']:
            pipeline.merge_failure_actions = \
                action_reporters['merge-failure']
        else:
            pipeline.merge_failure_actions = action_reporters['failure']

        pipeline.window = conf_pipeline.get('window', 20)
        pipeline.window_floor = conf_pipeline.get('window-floor', 3)
        pipeline.window_increase_type = conf_pipeline.get(
            'window-increase-type', 'linear')
        pipeline.window_increase_factor = conf_pipeline.get(
            'window-increase-factor', 1)
        pipeline.window_decrease_type = conf_pipeline.get(
            'window-decrease-type', 'exponential')
        pipeline.window_decrease_factor = conf_pipeline.get(
            'window-decrease-factor', 2)

        # The manager class is looked up by name among this module's
        # globals.
        manager = globals()[conf_pipeline['manager']](self, pipeline)
        pipeline.setManager(manager)
        layout.pipelines[conf_pipeline['name']] = pipeline

        if 'require' in conf_pipeline:
            require = conf_pipeline['require']
            f = ChangeishFilter(
                open=require.get('open'),
                current_patchset=require.get('current-patchset'),
                statuses=toList(require.get('status')),
                required_approvals=toList(require.get('approval')))
            manager.changeish_filters.append(f)

        # TODO: move this into triggers (may require pluggable
        # configuration)
        if 'gerrit' in conf_pipeline['trigger']:
            for trigger in toList(conf_pipeline['trigger']['gerrit']):
                approvals = {}
                for approval_dict in toList(trigger.get('approval')):
                    for k, v in approval_dict.items():
                        approvals[k] = v
                # Backwards compat for *_filter versions of these args
                comments = toList(trigger.get('comment'))
                if not comments:
                    comments = toList(trigger.get('comment_filter'))
                emails = toList(trigger.get('email'))
                if not emails:
                    emails = toList(trigger.get('email_filter'))
                usernames = toList(trigger.get('username'))
                if not usernames:
                    usernames = toList(trigger.get('username_filter'))
                f = EventFilter(
                    trigger=self.triggers['gerrit'],
                    types=toList(trigger['event']),
                    branches=toList(trigger.get('branch')),
                    refs=toList(trigger.get('ref')),
                    event_approvals=approvals,
                    comments=comments,
                    emails=emails,
                    usernames=usernames,
                    required_approvals=toList(
                        trigger.get('require-approval')
                    )
                )
                manager.event_filters.append(f)
        if 'timer' in conf_pipeline['trigger']:
            for trigger in toList(conf_pipeline['trigger']['timer']):
                f = EventFilter(trigger=self.triggers['timer'],
                                types=['timer'],
                                timespecs=toList(trigger['time']))
                manager.event_filters.append(f)
        if 'zuul' in conf_pipeline['trigger']:
            for trigger in toList(conf_pipeline['trigger']['zuul']):
                f = EventFilter(
                    trigger=self.triggers['zuul'],
                    types=toList(trigger['event']),
                    pipelines=toList(trigger.get('pipeline')),
                    required_approvals=toList(
                        trigger.get('require-approval')
                    )
                )
                manager.event_filters.append(f)

    for project_template in data.get('project-templates', []):
        # Make sure the template only contains valid pipelines
        tpl = dict(
            (pipe_name, project_template.get(pipe_name))
            for pipe_name in layout.pipelines.keys()
            if pipe_name in project_template
        )
        project_templates[project_template.get('name')] = tpl

    for config_job in data.get('jobs', []):
        job = layout.getJob(config_job['name'])
        # Be careful to only set attributes explicitly present on
        # this job, to avoid squashing attributes set by a meta-job.
        m = config_job.get('queue-name', None)
        if m:
            job.queue_name = m
        m = config_job.get('failure-message', None)
        if m:
            job.failure_message = m
        m = config_job.get('success-message', None)
        if m:
            job.success_message = m
        m = config_job.get('failure-pattern', None)
        if m:
            job.failure_pattern = m
        m = config_job.get('success-pattern', None)
        if m:
            job.success_pattern = m
        m = config_job.get('hold-following-changes', False)
        if m:
            job.hold_following_changes = True
        m = config_job.get('voting', None)
        if m is not None:
            job.voting = m
        fname = config_job.get('parameter-function', None)
        if fname:
            func = config_env.get(fname, None)
            if not func:
                raise Exception("Unable to find function %s" % fname)
            job.parameter_function = func
        branches = toList(config_job.get('branch'))
        if branches:
            job._branches = branches
            job.branches = [re.compile(x) for x in branches]
        files = toList(config_job.get('files'))
        if files:
            job._files = files
            job.files = [re.compile(x) for x in files]
        skip_if_matcher = self._parseSkipIf(config_job)
        if skip_if_matcher:
            job.skip_if_matcher = skip_if_matcher
        swift = toList(config_job.get('swift'))
        if swift:
            for s in swift:
                job.swift[s['name']] = s

    def add_jobs(job_tree, config_jobs):
        # Recursively attach the configured jobs to job_tree: a dict maps
        # a parent job to its children, a string names a single job.
        for job in config_jobs:
            if isinstance(job, list):
                for x in job:
                    add_jobs(job_tree, x)
            if isinstance(job, dict):
                for parent, children in job.items():
                    parent_tree = job_tree.addJob(layout.getJob(parent))
                    add_jobs(parent_tree, children)
            if isinstance(job, str):
                job_tree.addJob(layout.getJob(job))

    for config_project in data.get('projects', []):
        project = Project(config_project['name'])
        shortname = config_project['name'].split('/')[-1]

        # This is reversed due to the prepend operation below, so
        # the ultimate order is templates (in order) followed by
        # statically defined jobs.
        for requested_template in reversed(
                config_project.get('template', [])):
            # Fetch the template from 'project-templates'
            tpl = project_templates.get(
                requested_template.get('name'))
            # Expand it with the project context
            requested_template['name'] = shortname
            expanded = deep_format(tpl, requested_template)
            # Finally merge the expansion with whatever has been
            # already defined for this project.  Prepend our new
            # jobs to existing ones (which may have been
            # statically defined or defined by other templates).
            for pipeline in layout.pipelines.values():
                if pipeline.name in expanded:
                    config_project.update(
                        {pipeline.name: expanded[pipeline.name] +
                         config_project.get(pipeline.name, [])})
        # TODO: future enhancement -- handle the case where
        # duplicate jobs have different children and you want all
        # of the children to run after a single run of the
        # parent).

        layout.projects[config_project['name']] = project
        mode = config_project.get('merge-mode', 'merge-resolve')
        project.merge_mode = model.MERGER_MAP[mode]
        for pipeline in layout.pipelines.values():
            if pipeline.name in config_project:
                job_tree = pipeline.addProject(project)
                config_jobs = config_project[pipeline.name]
                add_jobs(job_tree, config_jobs)

    # All jobs should be defined at this point, get rid of
    # metajobs so that getJob isn't doing anything weird.
    layout.metajobs = []

    for pipeline in layout.pipelines.values():
        pipeline.manager._postConfig(layout)

    return layout
James E. Blairee743612012-05-29 14:49:32 -0700496
def setLauncher(self, launcher):
    """Store launcher as ``self.launcher``.

    :arg launcher: the launcher object to remember
    """
    self.launcher = launcher
499
def setMerger(self, merger):
    """Store merger as ``self.merger``.

    :arg merger: the merger object to remember
    """
    self.merger = merger
502
def registerTrigger(self, trigger, name=None):
    """Add trigger to ``self.triggers``.

    :arg trigger: the trigger object to register
    :arg str name: key to register it under; when None the trigger's own
        ``name`` attribute is used
    """
    key = trigger.name if name is None else name
    self.triggers[key] = trigger
James E. Blairee743612012-05-29 14:49:32 -0700507
def registerReporter(self, reporter, name=None):
    """Add reporter to ``self.reporters``.

    :arg reporter: the reporter object to register
    :arg str name: key to register it under; when None the reporter's
        own ``name`` attribute is used
    """
    key = reporter.name if name is None else name
    self.reporters[key] = reporter
512
def getProject(self, name):
    """Look up a project of the current layout while holding layout_lock.

    :arg str name: key to look up in ``self.layout.projects``
    :returns: the stored project, or None when the name is unknown
    """
    with self.layout_lock:
        return self.layout.projects.get(name)
521
def addEvent(self, event):
    """Put a trigger event on the trigger queue and set the wake event.

    When statsd is available a ``gerrit.event.<type>`` counter is
    incremented first.

    :arg event: the trigger event; its ``type`` attribute names the
        counter
    """
    self.log.debug("Adding trigger event: %s", event)
    try:
        if statsd:
            statsd.incr('gerrit.event.%s' % event.type)
    except Exception:
        # Statistics are best effort and must never prevent the event
        # from being queued.  SystemExit and KeyboardInterrupt are no
        # longer swallowed here, though.
        self.log.exception("Exception reporting event stats")
    self.trigger_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding trigger event: %s", event)
James E. Blairee743612012-05-29 14:49:32 -0700532
def onBuildStarted(self, build):
    """Stamp the build's start time and queue a BuildStartedEvent.

    When statsd is available and the build has a pipeline, the time
    between ``launch_time`` and ``start_time`` is reported in
    milliseconds.

    :arg Build build: the build which has started
    """
    self.log.debug("Adding start event for build: %s", build)
    build.start_time = time.time()
    try:
        if statsd and build.pipeline:
            # Dots in the job name are replaced before it becomes part
            # of the statsd key.
            jobname = build.job.name.replace('.', '_')
            key = 'zuul.pipeline.%s.job.%s.wait_time' % (
                build.pipeline.name, jobname)
            dt = int((build.start_time - build.launch_time) * 1000)
            statsd.timing(key, dt)
            statsd.incr(key)
    except Exception:
        # Statistics are best effort and must never prevent the event
        # from being queued.  SystemExit and KeyboardInterrupt are no
        # longer swallowed here, though.
        self.log.exception("Exception reporting runtime stats")
    event = BuildStartedEvent(build)
    self.result_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding start event for build: %s", build)
James E. Blair11700c32012-07-05 17:50:05 -0700550
def onBuildCompleted(self, build):
    """Stamp the build's end time and queue a BuildCompletedEvent.

    When statsd is available and the build has a pipeline, a wait time
    is reported per node label and a counter is incremented for the
    build result.  The run time is only reported for SUCCESS and
    FAILURE results.

    :arg Build build: the build which has completed
    """
    self.log.debug("Adding complete event for build: %s", build)
    build.end_time = time.time()
    try:
        if statsd and build.pipeline:
            # Dots in the job name are replaced before it becomes part
            # of the statsd key.
            jobname = build.job.name.replace('.', '_')
            # NOTE(review): this key is overwritten below before it is
            # ever sent -- a statsd.incr(key) looks to be missing here.
            key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
            for label in build.node_labels:
                # Jenkins includes the node name in its list of labels, so
                # we filter it out here, since that is not statistically
                # interesting.
                if label == build.node_name:
                    continue
                dt = int((build.start_time - build.launch_time) * 1000)
                key = 'zuul.node_type.%s.job.%s.wait_time' % (
                    label, jobname)
                statsd.timing(key, dt)
                statsd.incr(key)
            key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
                                                  jobname, build.result)
            if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                dt = int((build.end_time - build.start_time) * 1000)
                statsd.timing(key, dt)
            statsd.incr(key)
    except Exception:
        # Statistics are best effort and must never prevent the event
        # from being queued.  SystemExit and KeyboardInterrupt are no
        # longer swallowed here, though.
        self.log.exception("Exception reporting runtime stats")
    event = BuildCompletedEvent(build)
    self.result_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding complete event for build: %s", build)
James E. Blairee743612012-05-29 14:49:32 -0700581
def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
    """Queue a MergeCompletedEvent built from the arguments and set the
    wake event.

    The arguments are handed to MergeCompletedEvent unchanged.
    """
    self.log.debug("Adding merge complete event for build set: %s" %
                   build_set)
    self.result_event_queue.put(
        MergeCompletedEvent(build_set, zuul_url, merged, updated, commit))
    self.wake_event.set()
589
def reconfigure(self, config):
    """Submit a ReconfigureEvent and block until it has been handled.

    Once the wait returns, ``last_reconfigured`` is set to the current
    time in whole seconds.

    :arg config: the configuration handed to the ReconfigureEvent
    """
    self.log.debug("Prepare to reconfigure")
    request = ReconfigureEvent(config)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for reconfiguration")
    # wait() re-raises an exception stored on the event.
    request.wait()
    self.log.debug("Reconfiguration complete")
    self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700599
def promote(self, pipeline_name, change_ids):
    """Submit a PromoteEvent and block until it has been handled.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: the change ids handed to the PromoteEvent
    """
    request = PromoteEvent(pipeline_name, change_ids)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for promotion")
    # wait() re-raises an exception stored on the event.
    request.wait()
    self.log.debug("Promotion complete")
607
def enqueue(self, trigger_event):
    """Submit an EnqueueEvent and block until it has been handled.

    :arg trigger_event: the trigger event handed to the EnqueueEvent
    """
    request = EnqueueEvent(trigger_event)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for enqueue")
    # wait() re-raises an exception stored on the event.
    request.wait()
    self.log.debug("Enqueue complete")
615
def exit(self):
    """Raise the pause and exit flags and set the wake event.

    This method returns immediately; it does not wait for anything.
    """
    self.log.debug("Prepare to exit")
    self._pause = self._exit = True
    self.wake_event.set()
    self.log.debug("Waiting for exit")
622
def _get_queue_pickle_file(self):
    """Return the path of the file the trigger event queue is saved to.

    The file is ``queue.pickle`` inside the ``state_dir`` option of the
    ``zuul`` config section (``~`` expanded), or inside ``/var/lib/zuul``
    when that option is not set.
    """
    state_dir = '/var/lib/zuul'
    if self.config.has_option('zuul', 'state_dir'):
        configured = self.config.get('zuul', 'state_dir')
        state_dir = os.path.expanduser(configured)
    return os.path.join(state_dir, 'queue.pickle')
630
def _save_queue(self):
    """Drain the trigger event queue and pickle its events to disk.

    Nothing is written when the queue was empty.  The target path comes
    from _get_queue_pickle_file().
    """
    pickle_file = self._get_queue_pickle_file()
    events = []
    while not self.trigger_event_queue.empty():
        events.append(self.trigger_event_queue.get())
    self.log.debug("Queue length is %s" % len(events))
    if events:
        self.log.debug("Saving queue")
        # The context manager makes sure the data is flushed and the
        # file closed before this method returns.
        with open(pickle_file, 'wb') as queue_file:
            pickle.dump(events, queue_file)
640
def _load_queue(self):
    """Put the events of a saved queue file back on the trigger queue.

    The path comes from _get_queue_pickle_file(); a missing file is only
    logged.
    """
    pickle_file = self._get_queue_pickle_file()
    if os.path.exists(pickle_file):
        self.log.debug("Loading queue")
        # NOTE(review): unpickling can execute arbitrary code, so this is
        # only safe while the state directory is trusted.
        # The context manager closes the file right after reading.
        with open(pickle_file, 'rb') as queue_file:
            events = pickle.load(queue_file)
        self.log.debug("Queue length is %s" % len(events))
        for event in events:
            self.trigger_event_queue.put(event)
    else:
        self.log.debug("No queue file found")
651
def _delete_queue(self):
    """Remove the saved queue file, if there is one."""
    saved = self._get_queue_pickle_file()
    if not os.path.exists(saved):
        return
    self.log.debug("Deleting saved queue")
    os.unlink(saved)
657
def resume(self):
    """Reload a saved trigger queue, delete the file and set the wake
    event.

    Failures while loading or deleting the saved queue are logged and do
    not prevent the wake event from being set.
    """
    try:
        self._load_queue()
    except Exception:
        # Best effort; SystemExit and KeyboardInterrupt are no longer
        # swallowed here.
        self.log.exception("Unable to load queue")
    try:
        self._delete_queue()
    except Exception:
        self.log.exception("Unable to delete saved queue")
    self.log.debug("Resuming queue processing")
    self.wake_event.set()
669
def _doPauseEvent(self):
    """Save the trigger queue and end the process if exit was requested.

    Nothing happens while the ``_exit`` flag is not set.
    """
    if not self._exit:
        return
    self.log.debug("Exiting")
    self._save_queue()
    # os._exit() terminates the process immediately, without the normal
    # interpreter shutdown.
    os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700675
def _doReconfigureEvent(self, event):
    """Load a new layout and move queued items over to it.

    This is called in the scheduler loop after another thread submits
    a request.  While holding ``self.layout_lock`` it parses the layout
    named by the ``layout_config`` option, re-enqueues the items of
    every pipeline that also exists in the new layout, cancels builds
    that can not be carried over, installs the new layout and lets the
    triggers refresh themselves.

    :param event: the reconfiguration request; ``event.config`` becomes
        ``self.config``.
    """
    self.layout_lock.acquire()
    try:
        # Done inside the try block so the lock is released even when
        # the event is malformed.
        self.config = event.config
        self.log.debug("Performing reconfiguration")
        layout = self._parseConfig(
            self.config.get('zuul', 'layout_config'))
        for name, new_pipeline in layout.pipelines.items():
            old_pipeline = self.layout.pipelines.get(name)
            if not old_pipeline:
                if self.layout.pipelines:
                    # Don't emit this warning on startup
                    self.log.warning("No old pipeline matching %s found "
                                     "when reconfiguring" % name)
                continue
            self.log.debug("Re-enqueueing changes for pipeline %s" % name)
            items_to_remove = []
            # (item, build) pairs, so that a failed cancellation can be
            # reported against the change that owns the build.
            builds_to_remove = []
            last_head = None
            for shared_queue in old_pipeline.queues:
                for item in shared_queue.queue:
                    if not item.item_ahead:
                        last_head = item
                    # Detach the item from the old layout before it is
                    # handed to the new pipeline.
                    item.item_ahead = None
                    item.items_behind = []
                    item.pipeline = None
                    item.queue = None
                    project = layout.projects.get(item.change.project.name)
                    if not project:
                        self.log.warning("Unable to find project for "
                                         "change %s while reenqueueing" %
                                         item.change)
                        item.change.project = None
                        items_to_remove.append(item)
                        continue
                    item.change.project = project
                    for build in item.current_build_set.getBuilds():
                        job = layout.jobs.get(build.job.name)
                        if job:
                            build.job = job
                        else:
                            builds_to_remove.append((item, build))
                    if not new_pipeline.manager.reEnqueueItem(item,
                                                              last_head):
                        items_to_remove.append(item)
            for item in items_to_remove:
                for build in item.current_build_set.getBuilds():
                    builds_to_remove.append((item, build))
            for item, build in builds_to_remove:
                self.log.warning(
                    "Canceling build %s during reconfiguration" % (build,))
                try:
                    self.launcher.cancel(build)
                except Exception:
                    self.log.exception(
                        "Exception while canceling build %s "
                        "for change %s" % (build, item.change))
        self.layout = layout
        self.maintainTriggerCache()
        for trigger in self.triggers.values():
            trigger.postConfig()
        if statsd:
            try:
                for pipeline in self.layout.pipelines.values():
                    items = len(pipeline.getAllItems())
                    # stats.gauges.zuul.pipeline.NAME.current_changes
                    key = 'zuul.pipeline.%s' % pipeline.name
                    statsd.gauge(key + '.current_changes', items)
            except Exception:
                self.log.exception("Exception reporting initial "
                                   "pipeline stats:")
    finally:
        self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700751
def _doPromoteEvent(self, event):
    """Re-order a shared change queue so the requested changes lead it.

    ``event.change_ids`` holds "number,patchset" strings.  The shared
    queue containing the first of them is located, every item in that
    queue has its jobs canceled and is dequeued, and the items are then
    added again: requested changes first (in the requested order),
    followed by the remaining items in their previous order.

    Raises Exception when the first change is in no shared queue or
    when a later change is not in that queue.
    """
    pipeline = self.layout.pipelines[event.pipeline_name]
    wanted = [change_id.split(',') for change_id in event.change_ids]

    # Find the shared queue that holds the first requested change.
    target_queue = None
    for candidate in pipeline.queues:
        for queued in candidate.queue:
            if (queued.change.number == wanted[0][0] and
                    queued.change.patchset == wanted[0][1]):
                target_queue = candidate
                break
        if target_queue:
            break
    if not target_queue:
        raise Exception("Unable to find shared change queue for %s" %
                        event.change_ids[0])

    # Collect the requested items, in the order they were requested.
    reordered = []
    for number, patchset in wanted:
        for queued in target_queue.queue:
            if (queued.change.number == number and
                    queued.change.patchset == patchset):
                reordered.append(queued)
                break
        else:
            raise Exception("Unable to find %s,%s in queue %s" %
                            (number, patchset, target_queue))

    # Empty the queue (iterating over a copy), appending the items that
    # were not explicitly requested after the promoted ones.
    for queued in target_queue.queue[:]:
        if queued not in reordered:
            reordered.append(queued)
        pipeline.manager.cancelJobs(queued)
        pipeline.manager.dequeueItem(queued)

    for queued in reordered:
        pipeline.manager.addChange(
            queued.change,
            enqueue_time=queued.enqueue_time,
            quiet=True,
            ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800790
def _doEnqueueEvent(self, event):
    """Add the change described by a trigger event to a forced pipeline.

    :param event: trigger event carrying ``project_name`` and
        ``forced_pipeline``; the change is obtained from the pipeline's
        source and added with pipeline requirements ignored.
    """
    project = self.layout.projects.get(event.project_name)
    pipeline = self.layout.pipelines[event.forced_pipeline]
    change = pipeline.source.getChange(event, project)
    # Report the target pipeline here, not the scheduler itself.
    self.log.debug("Event %s for change %s was directly assigned "
                   "to pipeline %s" % (event, change, pipeline))
    self.log.info("Adding %s, %s to %s" %
                  (project, change, pipeline))
    pipeline.manager.addChange(change, ignore_requirements=True)
800
def _areAllBuildsComplete(self):
    """Return True when no merge and no build is still outstanding.

    Every build without a result is logged, so the scan is not cut
    short once the first pending build is found.
    """
    self.log.debug("Checking if all builds are complete")
    outstanding = bool(self.merger.areMergesOutstanding())
    for pipeline in self.layout.pipelines.values():
        for item in pipeline.getAllItems():
            for build in item.current_build_set.getBuilds():
                if build.result is not None:
                    continue
                self.log.debug("%s waiting on %s" %
                               (pipeline.manager, build))
                outstanding = True
    if outstanding:
        self.log.debug("All builds are not complete")
        return False
    self.log.debug("All builds are complete")
    return True
818
def run(self):
    """Main loop: wait to be woken, then drain the event queues.

    Each pass handles management events, then result events, then
    (unless paused) trigger events, and finally lets every pipeline
    manager process its queue until it reports nothing more to do.
    The loop returns once ``self._stopped`` is set.
    """
    if not statsd:
        self.log.debug("Statsd disabled because python statsd "
                       "package not found")
    else:
        self.log.debug("Statsd enabled")
    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            self.log.debug("Run handler stopping")
            return
        self.log.debug("Run handler awake")
        self.run_handler_lock.acquire()
        try:
            while not self.management_event_queue.empty():
                self.process_management_queue()

            # Result events go first: they let us stop builds, whereas
            # trigger events cause us to launch builds.
            while not self.result_event_queue.empty():
                self.process_result_queue()

            if not self._pause:
                while not self.trigger_event_queue.empty():
                    self.process_event_queue()

            if self._pause and self._areAllBuildsComplete():
                self._doPauseEvent()

            for pipeline in self.layout.pipelines.values():
                while pipeline.manager.processQueue():
                    pass

        except Exception:
            self.log.exception("Exception in run handler:")
            # There may still be more events to process
            self.wake_event.set()
        finally:
            self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700860
def maintainTriggerCache(self):
    """Tell every trigger which changes are still relevant.

    The relevant set holds the change of every item in every pipeline
    plus whatever each change's ``getRelatedChanges()`` returns.
    """
    relevant = set()
    for pipeline in self.layout.pipelines.values():
        self.log.debug("Start maintain trigger cache for: %s" % pipeline)
        for item in pipeline.getAllItems():
            change = item.change
            relevant.add(change)
            for related in change.getRelatedChanges():
                relevant.add(related)
        self.log.debug("End maintain trigger cache for: %s" % pipeline)
    self.log.debug("Trigger cache size: %s" % len(relevant))
    for trigger in self.triggers.values():
        trigger.maintainCache(relevant)
James E. Blair0e933c52013-07-11 10:18:52 -0700872
def process_event_queue(self):
    """Take one trigger event off the queue and offer it to each pipeline.

    Events for projects missing from the layout are dropped.  For every
    pipeline the change is looked up, older or abandoned versions are
    removed depending on the event type, and the change is added when
    the pipeline manager says the event matches.  The queue's
    ``task_done()`` is always called.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    self.log.debug("Processing trigger event %s" % event)
    try:
        project = self.layout.projects.get(event.project_name)
        if project:
            for pipeline in self.layout.pipelines.values():
                manager = pipeline.manager
                change = pipeline.source.getChange(event, project)
                if event.type == 'patchset-created':
                    manager.removeOldVersionsOfChange(change)
                elif event.type == 'change-abandoned':
                    manager.removeAbandonedChange(change)
                if manager.eventMatches(event, change):
                    self.log.info("Adding %s, %s to %s" %
                                  (project, change, pipeline))
                    manager.addChange(change)
        else:
            self.log.debug("Project %s not found" % event.project_name)
    finally:
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700895
def process_management_queue(self):
    """Take one management event off the queue and dispatch it.

    Reconfigure, promote and enqueue events go to their handlers; any
    other event is logged as an error.  On success ``event.done()`` is
    called; if handling raises, the exception and its traceback are
    passed to ``event.exception()``.  ``task_done()`` runs in a
    ``finally`` clause, matching the other queue processors, so the
    queue's unfinished-task count is decremented even when reporting
    the failure itself raises.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        if isinstance(event, ReconfigureEvent):
            self._doReconfigureEvent(event)
        elif isinstance(event, PromoteEvent):
            self._doPromoteEvent(event)
        elif isinstance(event, EnqueueEvent):
            self._doEnqueueEvent(event.trigger_event)
        else:
            self.log.error("Unable to handle event %s" % event)
        event.done()
    except Exception as e:
        event.exception(e, sys.exc_info()[2])
    finally:
        self.management_event_queue.task_done()
913
def process_result_queue(self):
    """Take one result event off the queue and dispatch it by type.

    Build-started, build-completed and merge-completed events go to
    their ``_do...Event`` handlers; anything else is logged as an
    error.  The queue's ``task_done()`` is always called.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    self.log.debug("Processing result event %s" % event)
    try:
        # Checked in order; the first matching type wins.
        handlers = (
            (BuildStartedEvent, '_doBuildStartedEvent'),
            (BuildCompletedEvent, '_doBuildCompletedEvent'),
            (MergeCompletedEvent, '_doMergeCompletedEvent'),
        )
        for event_type, handler_name in handlers:
            if isinstance(event, event_type):
                getattr(self, handler_name)(event)
                break
        else:
            self.log.error("Unable to handle event %s" % event)
    finally:
        self.result_event_queue.task_done()
929
def _doBuildStartedEvent(self, event):
    """Forward a build-started event to the owning pipeline manager.

    The event is dropped with a warning when the build does not belong
    to its item's current build set, or when that item has no pipeline.
    """
    build = event.build
    item = build.build_set.item
    if build.build_set is not item.current_build_set:
        self.log.warning("Build %s is not in the current build set" %
                         (build,))
        return
    pipeline = item.pipeline
    if not pipeline:
        self.log.warning("Build %s is not associated with a pipeline" %
                         (build,))
        return
    pipeline.manager.onBuildStarted(build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800942
def _doBuildCompletedEvent(self, event):
    """Forward a build-completed event to the owning pipeline manager.

    The event is dropped with a warning when the build does not belong
    to its item's current build set, or when that item has no pipeline.
    """
    build = event.build
    item = build.build_set.item
    if build.build_set is not item.current_build_set:
        self.log.warning("Build %s is not in the current build set" %
                         (build,))
        return
    pipeline = item.pipeline
    if not pipeline:
        self.log.warning("Build %s is not associated with a pipeline" %
                         (build,))
        return
    pipeline.manager.onBuildCompleted(build)
955
def _doMergeCompletedEvent(self, event):
    """Forward a merge-completed event to the owning pipeline manager.

    The event is dropped with a warning when its build set is no longer
    the item's current one, or when the item has no pipeline.
    """
    build_set = event.build_set
    item = build_set.item
    if build_set is not item.current_build_set:
        self.log.warning("Build set %s is not current" % (build_set,))
        return
    pipeline = item.pipeline
    if not pipeline:
        self.log.warning("Build set %s is not associated with a pipeline" %
                         (build_set,))
        return
    pipeline.manager.onMergeCompleted(event)
James E. Blairee743612012-05-29 14:49:32 -0700967
def formatStatusJSON(self):
    """Return the scheduler status as a JSON-encoded string.

    The document contains the zuul version, an HTML message while the
    scheduler is paused, the lengths of the trigger and result event
    queues, the last reconfiguration time multiplied by 1000 (when
    set) and the status of every pipeline.
    """
    data = {'zuul_version': self.zuul_version}

    if self._pause:
        message = ['<p><b>Queue only mode:</b> preparing to ']
        if self._exit:
            message.append('exit')
        message.append(', queue length: %s' %
                       self.trigger_event_queue.qsize())
        message.append('</p>')
        data['message'] = ''.join(message)

    data['trigger_event_queue'] = {
        'length': self.trigger_event_queue.qsize()}
    data['result_event_queue'] = {
        'length': self.result_event_queue.qsize()}

    if self.last_reconfigured:
        data['last_reconfigured'] = self.last_reconfigured * 1000

    data['pipelines'] = [pipeline.formatStatusJSON()
                         for pipeline in self.layout.pipelines.values()]
    return json.dumps(data)
996
James E. Blair1e8dd892012-05-30 09:15:05 -0700997
James E. Blair4aea70c2012-07-26 14:23:24 -0700998class BasePipelineManager(object):
999 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -07001000
def __init__(self, sched, pipeline):
    """Bind the manager to a scheduler and a pipeline.

    ``report_times`` is read from the ``report_times`` option of the
    ``zuul`` section when the scheduler has a config that defines it,
    and is True otherwise.
    """
    self.sched = sched
    self.pipeline = pipeline
    self.event_filters = []
    self.changeish_filters = []
    config = self.sched.config
    if config and config.has_option('zuul', 'report_times'):
        self.report_times = config.getboolean('zuul', 'report_times')
    else:
        self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -07001012
def __str__(self):
    """Return ``<ClassName pipeline-name>``."""
    manager_type = self.__class__.__name__
    return "<%s %s>" % (manager_type, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -07001015
def _postConfig(self, layout):
    """Log a summary of this pipeline's configuration.

    Logs the source, requirement and event filters, the job tree of
    every project in ``layout`` that has one in this pipeline, and the
    start, success, failure and merge-failure actions.
    """
    self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
    self.log.info(" Source: %s" % self.pipeline.source)
    self.log.info(" Requirements:")
    for requirement in self.changeish_filters:
        self.log.info(" %s" % requirement)
    self.log.info(" Events:")
    for event_filter in self.event_filters:
        self.log.info(" %s" % event_filter)
    self.log.info(" Projects:")

    def log_jobs(tree, indent=0):
        # Log the job at this node (if any), then recurse into the
        # child trees with a deeper indent.
        istr = ' ' + ' ' * indent
        job = tree.job
        if job:
            parts = [str(b) for b in job._branches]
            parts.extend(str(f) for f in job._files)
            if job.skip_if_matcher:
                parts.append(str(job.skip_if_matcher))
            efilters = ''.join(parts)
            if efilters:
                efilters = ' ' + efilters
            hold = ' [hold]' if job.hold_following_changes else ''
            voting = '' if job.voting else ' [nonvoting]'
            self.log.info("%s%s%s%s%s" % (istr, repr(job),
                                          efilters, hold, voting))
        for subtree in tree.job_trees:
            log_jobs(subtree, indent + 2)

    for project in layout.projects.values():
        tree = self.pipeline.getJobTree(project)
        if tree:
            self.log.info(" %s" % project)
            log_jobs(tree)
    self.log.info(" On start:")
    self.log.info(" %s" % self.pipeline.start_actions)
    self.log.info(" On success:")
    self.log.info(" %s" % self.pipeline.success_actions)
    self.log.info(" On failure:")
    self.log.info(" %s" % self.pipeline.failure_actions)
    self.log.info(" On merge-failure:")
    self.log.info(" %s" % self.pipeline.merge_failure_actions)
James E. Blairee743612012-05-29 14:49:32 -07001063
def getSubmitAllowNeeds(self):
    """Return the review labels this pipeline is likely to set itself.

    These are the code review labels that are allowed to be "needed"
    in the submit records for a change, with respect to this queue.
    The result is the union of ``getSubmitAllowNeeds()`` over the
    pipeline's success reporters.
    """
    needs_per_reporter = [reporter.getSubmitAllowNeeds()
                          for reporter in self.pipeline.success_actions]
    return set().union(*needs_per_reporter)
James E. Blaire421a232012-07-25 16:59:21 -07001073
def eventMatches(self, event, change):
    """Return True when the event should enqueue the change here.

    An event with a forced pipeline matches only the pipeline it names.
    Otherwise the event matches if any of this manager's event filters
    matches it.
    """
    if event.forced_pipeline:
        if event.forced_pipeline != self.pipeline.name:
            return False
        self.log.debug("Event %s for change %s was directly assigned "
                       "to pipeline %s" % (event, change, self))
        return True
    for event_filter in self.event_filters:
        if not event_filter.matches(event, change):
            continue
        self.log.debug("Event %s for change %s matched %s "
                       "in pipeline %s" % (event, change, event_filter, self))
        return True
    return False
1088
def isChangeAlreadyInPipeline(self, change):
    """Return True if a live item anywhere in the pipeline holds change."""
    return any(item.live and change.equals(item.change)
               for item in self.pipeline.getAllItems())
1095
def isChangeAlreadyInQueue(self, change, change_queue):
    """Return True if any item (live or not) in change_queue holds change."""
    return any(change.equals(item.change) for item in change_queue.queue)
1102
def reportStart(self, change):
    """Report through the start reporters that jobs are starting.

    The message names the pipeline and, when the ``status_url`` option
    is configured, includes that URL on a second line.  Reporting is
    best-effort: errors returned by reporters are logged, and any
    ``Exception`` raised is logged and swallowed.  KeyboardInterrupt
    and SystemExit are not intercepted.
    """
    try:
        self.log.info("Reporting start, action %s change %s" %
                      (self.pipeline.start_actions, change))
        msg = "Starting %s jobs." % self.pipeline.name
        if self.sched.config.has_option('zuul', 'status_url'):
            msg += "\n" + self.sched.config.get('zuul', 'status_url')
        ret = self.sendReport(self.pipeline.start_actions,
                              change, msg)
        if ret:
            self.log.error("Reporting change start %s received: %s" %
                           (change, ret))
    except Exception:
        self.log.exception("Exception while reporting start:")
1117
def sendReport(self, action_reporters, change, message):
    """Send the built message off to the configured reporters.

    :param action_reporters: iterable of reporters; each one's
        ``report(change, message)`` is called in order.
    :param change: the change being reported on.
    :param message: the message text to report.
    :returns: a list of the truthy values returned by the reporters
        (treated as errors), or None when every reporter returned a
        falsy value or there were no reporters.
    """
    report_errors = []
    for action_reporter in action_reporters:
        ret = action_reporter.report(change, message)
        if ret:
            report_errors.append(ret)
    if not report_errors:
        return None
    return report_errors
1133
def isChangeReadyToBeEnqueued(self, change):
    """Return True; this implementation treats every change as ready."""
    return True
1136
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                        change_queue):
    """Return True without enqueuing anything ahead of the change."""
    return True
1140
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                         change_queue):
    """Return True without enqueuing anything behind the change."""
    return True
1144
def checkForChangesNeededBy(self, change, change_queue):
    """Return True; this implementation performs no dependency check."""
    return True
1147
def getFailingDependentItems(self, item):
    """Return None; this implementation reports no failing dependencies."""
    return None
1150
def getDependentItems(self, item):
    """Return the items ahead of ``item``, nearest first.

    Follows the ``item_ahead`` links until one is falsy and logs the
    changes found.
    """
    ahead = []
    cursor = item.item_ahead
    while cursor:
        ahead.append(cursor)
        cursor = cursor.item_ahead
    self.log.info("Change %s depends on changes %s" %
                  (item.change,
                   [x.change for x in ahead]))
    return ahead
1161
def getItemForChange(self, change):
    """Return the first pipeline item whose change equals ``change``.

    Returns None when no item matches.
    """
    matches = (item for item in self.pipeline.getAllItems()
               if item.change.equals(change))
    return next(matches, None)
1167
def findOldVersionOfChangeAlreadyInQueue(self, change):
    """Return the first live item that ``change`` is an update of.

    Non-live items are ignored.  Returns None when there is no such
    item.
    """
    for item in self.pipeline.getAllItems():
        if item.live and change.isUpdateOf(item.change):
            return item
    return None
1175
def removeOldVersionsOfChange(self, change):
    """Remove the queued item that ``change`` supersedes, if any.

    Does nothing unless the pipeline has ``dequeue_on_new_patchset``
    set.
    """
    if not self.pipeline.dequeue_on_new_patchset:
        return
    superseded = self.findOldVersionOfChangeAlreadyInQueue(change)
    if not superseded:
        return
    self.log.debug("Change %s is a new version of %s, removing %s" %
                   (change, superseded.change, superseded))
    self.removeItem(superseded)
James E. Blair2fa50962013-01-30 21:50:41 -08001184
def removeAbandonedChange(self, change):
    """Remove every live item in the pipeline that holds ``change``."""
    self.log.debug("Change %s abandoned, removing." % change)
    for item in self.pipeline.getAllItems():
        if item.live and item.change.equals(change):
            self.removeItem(item)
Antoine Mussobd86a312014-01-08 14:51:33 +01001192
def reEnqueueItem(self, item, last_head):
    """Put an existing item back into a change queue.

    The queue is obtained from ``getChangeQueue`` using the item's
    change and ``last_head.queue``.  Returns True when the item was
    enqueued, False when no change queue could be found.
    """
    with self.getChangeQueue(item.change, last_head.queue) as change_queue:
        if not change_queue:
            self.log.error("Unable to find change queue for project %s" %
                           item.change.project)
            return False
        self.log.debug("Re-enqueing change %s in queue %s" %
                       (item.change, change_queue))
        change_queue.enqueueItem(item)
        self.reportStats(item)
        return True
James E. Blaircdccd972013-07-01 12:10:22 -07001205
def addChange(self, change, quiet=False, enqueue_time=None,
              ignore_requirements=False, live=True,
              change_queue=None):
    """Try to enqueue a change in this pipeline.

    :param change: the change to add.
    :param quiet: when true, no start report is sent.
    :param enqueue_time: if truthy, overrides the new item's
        ``enqueue_time``.
    :param ignore_requirements: when true, the pipeline requirement
        filters are not consulted.
    :param live: stored on the new item; a live change is first checked
        against the live items of the whole pipeline.
    :param change_queue: passed through to ``getChangeQueue``.
    :returns: True if the change is (or already was) enqueued, False
        if it was rejected.
    """
    self.log.debug("Considering adding change %s" % change)

    # A live change must not already be live anywhere in the pipeline.
    # For other changes the duplicate check happens further down,
    # against the specific change queue only.
    if live and self.isChangeAlreadyInPipeline(change):
        self.log.debug("Change %s is already in pipeline, "
                       "ignoring" % change)
        return True

    if not self.isChangeReadyToBeEnqueued(change):
        self.log.debug("Change %s is not ready to be enqueued, ignoring" %
                       change)
        return False

    if not ignore_requirements:
        for requirement in self.changeish_filters:
            if requirement.matches(change):
                continue
            self.log.debug("Change %s does not match pipeline "
                           "requirement %s" % (change, requirement))
            return False

    with self.getChangeQueue(change, change_queue) as change_queue:
        if not change_queue:
            self.log.debug("Unable to find change queue for "
                           "change %s in project %s" %
                           (change, change.project))
            return False

        enqueued_ahead = self.enqueueChangesAhead(
            change, quiet, ignore_requirements, change_queue)
        if not enqueued_ahead:
            self.log.debug("Failed to enqueue changes "
                           "ahead of %s" % change)
            return False

        if self.isChangeAlreadyInQueue(change, change_queue):
            self.log.debug("Change %s is already in queue, "
                           "ignoring" % change)
            return True

        self.log.debug("Adding change %s to queue %s" %
                       (change, change_queue))
        if not quiet and len(self.pipeline.start_actions) > 0:
            self.reportStart(change)
        item = change_queue.enqueueChange(change)
        if enqueue_time:
            item.enqueue_time = enqueue_time
        item.live = live
        self.reportStats(item)
        self.enqueueChangesBehind(change, quiet, ignore_requirements,
                                  change_queue)
        zuul_trigger = self.sched.triggers['zuul']
        zuul_trigger.onChangeEnqueued(item.change, self.pipeline)
        return True
1264
def dequeueItem(self, item):
    """Remove ``item`` from the change queue it is currently in."""
    self.log.debug("Removing change %s from queue" % item.change)
    owning_queue = item.queue
    owning_queue.dequeueItem(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001268
def removeItem(self, item):
    """Cancel an item's jobs, dequeue it and report stats.

    Used when an item has to leave the queue, probably because it has
    been superseded by another change.
    """
    self.log.debug("Canceling builds behind change: %s "
                   "because it is being removed." % item.change)
    for step in (self.cancelJobs, self.dequeueItem, self.reportStats):
        step(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001277
def _makeMergerItem(self, item):
    """Build the dictionary describing ``item`` for the merger.

    ``number``/``patchset`` are filled in when the change has a
    ``number`` attribute; otherwise ``oldrev``/``newrev`` are filled in
    when it has a ``newrev`` attribute.  The remaining ones stay None.
    """
    change = item.change
    number = patchset = oldrev = newrev = None
    if hasattr(change, 'number'):
        number = change.number
        patchset = change.patchset
    elif hasattr(change, 'newrev'):
        oldrev = change.oldrev
        newrev = change.newrev
    return {
        'project': change.project.name,
        'url': self.pipeline.source.getGitUrl(change.project),
        'merge_mode': change.project.merge_mode,
        'refspec': change.refspec,
        'branch': change.branch,
        'ref': item.current_build_set.ref,
        'number': number,
        'patchset': patchset,
        'oldrev': oldrev,
        'newrev': newrev,
    }
1303
def prepareRef(self, item):
    """Request preparation of the item's ref and report readiness.

    Returns True if the ref is ready (the build set's merge state is
    COMPLETE) and False otherwise.  On the first call for a build set
    the merge state becomes PENDING and the merger is asked either to
    merge the change together with the items ahead of it, or just to
    update the repository.
    """
    build_set = item.current_build_set
    if build_set.merge_state == build_set.COMPLETE:
        return True
    if build_set.merge_state == build_set.PENDING:
        return False
    build_set.merge_state = build_set.PENDING
    ref = build_set.ref
    if hasattr(item.change, 'refspec') and not ref:
        self.log.debug("Preparing ref for: %s" % item.change)
        item.current_build_set.setConfiguration()
        dependent_items = self.getDependentItems(item)
        # Oldest dependency first, the item itself last.
        dependent_items.reverse()
        all_items = dependent_items + [item]
        # Build a real list: map() is lazy on Python 3, while the
        # merger is handed a list on Python 2.
        merger_items = [self._makeMergerItem(i) for i in all_items]
        self.sched.merger.mergeChanges(merger_items,
                                       item.current_build_set,
                                       self.pipeline.precedence)
    else:
        self.log.debug("Preparing update repo for: %s" % item.change)
        url = self.pipeline.source.getGitUrl(item.change.project)
        self.sched.merger.updateRepo(item.change.project.name,
                                     url, build_set,
                                     self.pipeline.precedence)
    return False
1330
1331 def _launchJobs(self, item, jobs):
1332 self.log.debug("Launching jobs for change %s" % item.change)
1333 dependent_items = self.getDependentItems(item)
1334 for job in jobs:
1335 self.log.debug("Found job %s for change %s" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001336 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001337 build = self.sched.launcher.launch(job, item,
1338 self.pipeline,
1339 dependent_items)
James E. Blairfee8d652013-06-07 08:57:52 -07001340 self.log.debug("Adding build %s of job %s to item %s" %
1341 (build, job, item))
1342 item.addBuild(build)
James E. Blairee743612012-05-29 14:49:32 -07001343 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001344 self.log.exception("Exception while launching job %s "
James E. Blairfee8d652013-06-07 08:57:52 -07001345 "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001346
James E. Blairfee8d652013-06-07 08:57:52 -07001347 def launchJobs(self, item):
1348 jobs = self.pipeline.findJobsToRun(item)
James E. Blairdaabed22012-08-15 15:38:57 -07001349 if jobs:
James E. Blairfee8d652013-06-07 08:57:52 -07001350 self._launchJobs(item, jobs)
1351
def cancelJobs(self, item, prime=True):
    """Cancel the builds of ``item`` and of every item behind it.

    :param item: the queue item whose builds are canceled.
    :param prime: when true and the current build set has a ref,
        ``item.resetAllBuilds()`` is called before canceling.  The
        same value is passed on for the items behind.
    :returns: True if any build was marked canceled, here or in an
        item behind; False otherwise.
    """
    self.log.debug("Cancel jobs for change %s" % item.change)
    # Grab the build set before resetAllBuilds() is (possibly) called;
    # the builds canceled below are the ones from this build set.
    previous_build_set = item.current_build_set
    if prime and item.current_build_set.ref:
        item.resetAllBuilds()
    any_canceled = False
    for running in previous_build_set.getBuilds():
        try:
            self.sched.launcher.cancel(running)
        except:
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (running, item.change))
        # The build is marked canceled even if the launcher call failed.
        running.result = 'CANCELED'
        any_canceled = True
    for follower in item.items_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s" %
                       (follower.change, item.change))
        if self.cancelJobs(follower, prime=prime):
            any_canceled = True
    return any_canceled
1372
def _processOneItem(self, item, nnfi, ready_ahead):
    """Advance a single queue item by one processing step.

    :param item: the queue item to process.
    :param nnfi: the nearest non-failing item seen so far in this
        queue (None if there is none yet).
    :param ready_ahead: flag handed in by the caller; it is cleared
        here when this item is not ready and is required to be true
        before jobs are launched.
    :returns: tuple ``(changed, nnfi, ready_ahead)`` where ``changed``
        says whether this call altered the queue, and the other two
        are the possibly-updated values to use for the next item.
    """
    changed = False
    item_ahead = item.item_ahead
    # A non-live item ahead is treated as if nothing were ahead.
    if item_ahead and (not item_ahead.live):
        item_ahead = None
    change_queue = item.queue
    failing_reasons = []  # Reasons this item is failing

    if self.checkForChangesNeededBy(item.change, change_queue) is not True:
        # It's not okay to enqueue this change, we should remove it.
        self.log.info("Dequeuing change %s because "
                      "it can no longer merge" % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.pipeline.setDequeuedNeedingChange(item)
        if item.live:
            try:
                self.reportItem(item)
            except MergeFailure:
                # The item is already dequeued; a merge failure raised
                # while reporting is deliberately ignored here.
                pass
        return (True, nnfi, ready_ahead)
    dep_items = self.getFailingDependentItems(item)
    actionable = change_queue.isActionable(item)
    item.active = actionable
    ready = False
    if dep_items:
        failing_reasons.append('a needed change is failing')
        # prime=False: cancel builds without resetting the build set.
        self.cancelJobs(item, prime=False)
    else:
        item_ahead_merged = False
        if (item_ahead and item_ahead.change.is_merged):
            item_ahead_merged = True
        if (item_ahead != nnfi and not item_ahead_merged):
            # Our current base is different than what we expected,
            # and it's not because our current base merged. Something
            # ahead must have failed.
            self.log.info("Resetting builds for change %s because the "
                          "item ahead, %s, is not the nearest non-failing "
                          "item, %s" % (item.change, item_ahead, nnfi))
            change_queue.moveItem(item, nnfi)
            changed = True
            self.cancelJobs(item)
        if actionable:
            ready = self.prepareRef(item)
            if item.current_build_set.unable_to_merge:
                failing_reasons.append("it has a merge conflict")
                ready = False
    # NOTE(review): the nesting of this check is reconstructed from a
    # flattened listing; confirm it belongs at function level.
    if not ready:
        ready_ahead = False
    if actionable and ready_ahead and self.launchJobs(item):
        changed = True
    if self.pipeline.didAnyJobFail(item):
        failing_reasons.append("at least one job failed")
    if (not item.live) and (not item.items_behind):
        # A non-live item with nothing behind it is dequeued.
        failing_reasons.append("is a non-live item with no items behind")
        self.dequeueItem(item)
        changed = True
    if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
            and item.live):
        # Live item at the head with all jobs complete: report it and
        # remove it from the queue, whether or not reporting succeeds.
        try:
            self.reportItem(item)
        except MergeFailure:
            failing_reasons.append("it did not merge")
            for item_behind in item.items_behind:
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, failed to merge" %
                              (item_behind.change, item))
                self.cancelJobs(item_behind)
        self.dequeueItem(item)
        changed = True
    elif not failing_reasons and item.live:
        # This live item is not failing: it becomes the nearest
        # non-failing item for whatever is processed next.
        nnfi = item
    item.current_build_set.failing_reasons = failing_reasons
    if failing_reasons:
        self.log.debug("%s is a failing item because %s" %
                       (item, failing_reasons))
    return (changed, nnfi, ready_ahead)
James E. Blairfee8d652013-06-07 08:57:52 -07001450
def processQueue(self):
    """Process every item of every queue in this pipeline once.

    For each queue the nearest non-failing item and the "everything
    ahead is ready" flag are threaded from one item to the next via
    ``_processOneItem``.  Statistics are reported for every item, and
    the queue status is logged when something in the queue changed.

    :returns: True if any item in any queue changed, else False.
    """
    self.log.debug("Starting queue processor: %s" % self.pipeline.name)
    changed = False
    for queue in self.pipeline.queues:
        queue_changed = False
        nnfi = None  # Nearest non-failing item
        ready_ahead = True  # All build sets ahead are ready
        # Work on a copy of the item list so that changes made to the
        # queue while processing do not disturb the iteration.
        for item in queue.queue[:]:
            # The returned flag must be carried over to the next item;
            # binding it to a misspelled name would silently leave
            # ready_ahead True for the whole queue.
            item_changed, nnfi, ready_ahead = self._processOneItem(
                item, nnfi, ready_ahead)
            if item_changed:
                queue_changed = True
            self.reportStats(item)
        if queue_changed:
            changed = True
            status = ''.join(item.formatStatus() for item in queue.queue)
            if status:
                self.log.debug("Queue %s status is now:\n %s" %
                               (queue.name, status))
    self.log.debug("Finished queue processor: %s (changed: %s)" %
                   (self.pipeline.name, changed))
    return changed
James E. Blairdaabed22012-08-15 15:38:57 -07001476
def updateBuildDescriptions(self, build_set):
    """Push fresh descriptions for the builds of ``build_set``.

    Every build in ``build_set`` gets its description regenerated via
    ``formatDescription`` and sent to the launcher; the builds of the
    previous build set, if there is one, are then refreshed the same
    way.
    """
    def refresh(builds):
        for build in builds:
            description = self.formatDescription(build)
            self.sched.launcher.setBuildDescription(build, description)

    refresh(build_set.getBuilds())
    if build_set.previous_build_set:
        refresh(build_set.previous_build_set.getBuilds())
1486
def onBuildStarted(self, build):
    """Log that ``build`` started and refresh its build descriptions.

    :returns: True, always.
    """
    self.log.debug("Build %s started" % build)
    started_set = build.build_set
    self.updateBuildDescriptions(started_set)
    return True
1491
def onBuildCompleted(self, build):
    """Record the result of a finished build and refresh descriptions.

    The build's result is handed to the pipeline for the owning item,
    the item's status is logged, and the build descriptions of the
    build's build set are updated.

    :returns: True, always.
    """
    self.log.debug("Build %s completed" % build)
    owner = build.build_set.item

    self.pipeline.setResult(owner, build)
    status = owner.formatStatus()
    self.log.debug("Item %s status is now:\n %s" % (owner, status))
    self.updateBuildDescriptions(build.build_set)
    return True
1501
def onMergeCompleted(self, event):
    """Apply the outcome of a merger request to its build set.

    The build set's merge state becomes COMPLETE and its ``zuul_url``
    is taken from the event.  The commit is set from the event when it
    reports a merge, or from the change's ``newrev`` when it reports a
    repo update.  If the build set ends up without a commit, the item
    is flagged on the pipeline as unable to merge.
    """
    build_set = event.build_set
    item = build_set.item
    build_set.merge_state = build_set.COMPLETE
    build_set.zuul_url = event.zuul_url
    if event.merged:
        build_set.commit = event.commit
    elif event.updated:
        build_set.commit = item.change.newrev
    if build_set.commit:
        return
    self.log.info("Unable to merge change %s" % item.change)
    self.pipeline.setUnableToMerge(item)
James E. Blair4076e2b2014-01-28 12:42:20 -08001514
def reportItem(self, item):
    """Report ``item`` once and, for merging pipelines, check the merge.

    ``_reportItem`` is only invoked while ``item.reported`` is false.
    When ``changes_merge`` is set on the manager, the queue window is
    shrunk and ``MergeFailure`` raised unless all jobs succeeded and
    the source confirms the change merged; otherwise the window is
    grown and the 'zuul' trigger is told the change merged.

    :raises MergeFailure: if the pipeline merges changes and this one
        failed its jobs or did not merge.
    """
    if not item.reported:
        # _reportItem() returns True if it failed to report.
        report_failed = self._reportItem(item)
        item.reported = not report_failed
    if not self.changes_merge:
        return
    succeeded = self.pipeline.didAllJobsSucceed(item)
    merged = item.reported
    if merged:
        # Reporting went through; ask the source whether it merged.
        merged = self.pipeline.source.isMerged(item.change,
                                               item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))
    change_queue = item.queue
    if succeeded and merged:
        change_queue.increaseWindowSize()
        self.log.debug("%s window size increased to %s" %
                       (change_queue, change_queue.window))
        self.sched.triggers['zuul'].onChangeMerged(item.change)
        return
    self.log.debug("Reported change %s failed tests or failed "
                   "to merge" % (item.change))
    change_queue.decreaseWindowSize()
    self.log.debug("%s window size decreased to %s" %
                   (change_queue, change_queue.window))
    raise MergeFailure("Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001540
James E. Blairfee8d652013-06-07 08:57:52 -07001541 def _reportItem(self, item):
James E. Blairfee8d652013-06-07 08:57:52 -07001542 self.log.debug("Reporting change %s" % item.change)
James E. Blairb98fcdb2013-08-26 18:23:09 -07001543 ret = True # Means error as returned by trigger.report
Evgeny Antyshev88db9cb2015-06-04 12:51:40 +00001544 if not self.pipeline.getJobs(item):
1545 # We don't send empty reports with +1,
1546 # and the same for -1's (merge failures or transient errors)
1547 # as they cannot be followed by +1's
1548 self.log.debug("No jobs for change %s" % item.change)
1549 actions = []
1550 elif self.pipeline.didAllJobsSucceed(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001551 self.log.debug("success %s" % (self.pipeline.success_actions))
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001552 actions = self.pipeline.success_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001553 item.setReportedResult('SUCCESS')
Joshua Heskethb7179772014-01-30 23:30:46 +11001554 elif not self.pipeline.didMergerSucceed(item):
1555 actions = self.pipeline.merge_failure_actions
1556 item.setReportedResult('MERGER_FAILURE')
James E. Blairee743612012-05-29 14:49:32 -07001557 else:
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001558 actions = self.pipeline.failure_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001559 item.setReportedResult('FAILURE')
James E. Blaire5910202013-12-27 09:50:31 -08001560 if actions:
1561 report = self.formatReport(item)
1562 try:
1563 self.log.info("Reporting change %s, actions: %s" %
1564 (item.change, actions))
1565 ret = self.sendReport(actions, item.change, report)
1566 if ret:
1567 self.log.error("Reporting change %s received: %s" %
1568 (item.change, ret))
1569 except:
1570 self.log.exception("Exception while reporting:")
1571 item.setReportedResult('ERROR')
James E. Blairfee8d652013-06-07 08:57:52 -07001572 self.updateBuildDescriptions(item.current_build_set)
James E. Blairee743612012-05-29 14:49:32 -07001573 return ret
1574
def formatReport(self, item):
    """Return the text of the report for ``item``.

    The body is one of: a fixed note that a needed change failed to
    merge, the pipeline's merge-failure message, or the pipeline's
    success/failure message followed by the per-job lines.  The
    pipeline's footer message, if set, is appended on a new line.
    """
    parts = []

    if item.dequeued_needing_change:
        parts.append(
            'This change depends on a change that failed to merge.\n')
    elif not self.pipeline.didMergerSucceed(item):
        parts.append(self.pipeline.merge_failure_message)
    else:
        if self.pipeline.didAllJobsSucceed(item):
            headline = self.pipeline.success_message
        else:
            headline = self.pipeline.failure_message
        parts.append(headline + '\n\n')
        parts.append(self._formatReportJobs(item))

    if self.pipeline.footer_message:
        parts.append('\n' + self.pipeline.footer_message)

    return ''.join(parts)
1593
1594 def _formatReportJobs(self, item):
1595 # Return the list of jobs portion of the report
1596 ret = ''
James E. Blair8b0d4c42012-08-23 16:03:05 -07001597
Joshua Heskethb7179772014-01-30 23:30:46 +11001598 if self.sched.config.has_option('zuul', 'url_pattern'):
1599 url_pattern = self.sched.config.get('zuul', 'url_pattern')
James E. Blair8b0d4c42012-08-23 16:03:05 -07001600 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001601 url_pattern = None
1602
James E. Blair107c3852015-02-07 08:23:10 -08001603 for job in self.pipeline.getJobs(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001604 build = item.current_build_set.getBuild(job.name)
1605 result = build.result
1606 pattern = url_pattern
1607 if result == 'SUCCESS':
1608 if job.success_message:
1609 result = job.success_message
1610 if job.success_pattern:
1611 pattern = job.success_pattern
1612 elif result == 'FAILURE':
1613 if job.failure_message:
1614 result = job.failure_message
1615 if job.failure_pattern:
1616 pattern = job.failure_pattern
1617 if pattern:
Jeremy Stanleyc6d4bc82014-08-01 19:11:29 +00001618 url = pattern.format(change=item.change,
1619 pipeline=self.pipeline,
1620 job=job,
1621 build=build)
James E. Blaira35fcce2012-08-24 10:46:01 -07001622 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001623 url = build.url or job.name
1624 if not job.voting:
1625 voting = ' (non-voting)'
1626 else:
1627 voting = ''
1628 if self.report_times and build.end_time and build.start_time:
1629 dt = int(build.end_time - build.start_time)
1630 m, s = divmod(dt, 60)
1631 h, m = divmod(m, 60)
1632 if h:
1633 elapsed = ' in %dh %02dm %02ds' % (h, m, s)
1634 elif m:
1635 elapsed = ' in %dm %02ds' % (m, s)
Ori Livneh7191ee82013-05-02 19:13:53 -07001636 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001637 elapsed = ' in %ds' % (s)
1638 else:
1639 elapsed = ''
1640 name = ''
1641 if self.sched.config.has_option('zuul', 'job_name_in_report'):
1642 if self.sched.config.getboolean('zuul',
1643 'job_name_in_report'):
1644 name = job.name + ' '
1645 ret += '- %s%s : %s%s%s\n' % (name, url, result, elapsed,
1646 voting)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001647 return ret
1648
def formatDescription(self, build):
    """Return the HTML description text for ``build``.

    The description names what triggered the build (a numbered change
    or a ref), and lists the other changes tested concurrently, all
    builds of the same build set, the builds of the same job in the
    previous/next build set, and the reported result when there is
    one.

    The loops below use their own variable names on purpose: reusing
    ``build`` as a loop variable would make the previous/next lookups
    use the last build of the set instead of the requested one.
    """
    build_set = build.build_set

    concurrent_changes = ''
    for other_change in build_set.other_changes:
        concurrent_changes += (
            '<li><a href="{change.url}">'
            ' {change.number},{change.patchset}</a></li>'
        ).format(change=other_change)

    change = build_set.item.change

    concurrent_builds = ''
    for sibling in build_set.getBuilds():
        if sibling.url:
            concurrent_builds += """\
<li>
 <a href="{build.url}">
 {build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=sibling)
        else:
            concurrent_builds += """\
<li>
 {build.job.name}: {build.result}
</li>""".format(build=sibling)

    other_builds = ''
    if build_set.previous_build_set:
        other_build = build_set.previous_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
 Preceded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    if build_set.next_build_set:
        other_build = build_set.next_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
 Succeeded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    result = build_set.result

    if hasattr(change, 'number'):
        ret = """\
<p>
 Triggered by change:
 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
 Branch: <b>{change.branch}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    elif hasattr(change, 'ref'):
        ret = """\
<p>
 Triggered by reference:
 {change.ref}</a><br/>
 Old revision: <b>{change.oldrev}</b><br/>
 New revision: <b>{change.newrev}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    else:
        ret = ""

    if concurrent_changes:
        ret += """\
<p>
 Other changes tested concurrently with this change:
 <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
 All builds for this change set:
 <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
 Other build sets for this change:
 <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
 Reported result: <b>{result}</b>
</p>
"""

    # Only the names the templates above refer to are passed in.
    ret = ret.format(self=self,
                     change=change,
                     concurrent_changes=concurrent_changes,
                     concurrent_builds=concurrent_builds,
                     other_builds=other_builds,
                     result=result)
    return ret
1750
def reportStats(self, item):
    """Send pipeline and per-project statistics for ``item`` to statsd.

    Does nothing when ``statsd`` is not available.  The gauge of
    current changes is always updated; the resident-time timer and
    the total-changes counter are only sent when the item has a
    dequeue time that yields a non-zero duration.  Any error is logged
    and swallowed.
    """
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeuing.
        resident_ms = None
        if item.dequeue_time:
            resident_ms = int(
                (item.dequeue_time - item.enqueue_time) * 1000)
        current_count = len(self.pipeline.getAllItems())

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        pipeline_key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(pipeline_key + '.current_changes', current_count)
        if resident_ms:
            statsd.timing(pipeline_key + '.resident_time', resident_ms)
            statsd.incr(pipeline_key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        dotted_project = item.change.project.name.replace('/', '.')
        project_key = pipeline_key + '.%s' % dotted_project
        if resident_ms:
            statsd.timing(project_key + '.resident_time', resident_ms)
            statsd.incr(project_key + '.total_changes')
    except:
        self.log.exception("Exception reporting pipeline stats")
1781
James E. Blair1e8dd892012-05-30 09:15:05 -07001782
class DynamicChangeQueueContextManager(object):
    """Context manager around a dynamically created change queue.

    Entering yields the wrapped change queue (which may be None).  On
    exit, a wrapped queue that holds no items is removed from its
    pipeline.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        if self.change_queue and not self.change_queue.queue:
            # removeQueue() is given the change queue object itself,
            # not its (empty) list of items.
            self.change_queue.pipeline.removeQueue(self.change_queue)
1793
1794
class IndependentPipelineManager(BasePipelineManager):
    """Pipeline manager that creates a separate change queue per change.

    Queues are created on demand in ``getChangeQueue`` and removed
    again once they are empty.
    """

    log = logging.getLogger("zuul.IndependentPipelineManager")
    changes_merge = False

    def _postConfig(self, layout):
        """Run the base-class post-configuration; nothing is added."""
        super(IndependentPipelineManager, self)._postConfig(layout)

    def getChangeQueue(self, change, existing=None):
        """Return a context manager yielding the queue for ``change``.

        An ``existing`` queue is reused as is.  Otherwise a new queue
        is created and added to the pipeline, unless the change's
        project is not part of the pipeline, in which case the
        context manager yields None.
        """
        if existing:
            queue = existing
        elif change.project not in self.pipeline.getProjects():
            queue = None
        else:
            # creates a new change queue for every change
            queue = ChangeQueue(self.pipeline)
            queue.addProject(change.project)
            self.pipeline.addQueue(queue)
        return DynamicChangeQueueContextManager(queue)

    def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                            change_queue):
        """Enqueue the changes ``change`` needs, as non-live items.

        :returns: the boolean from ``checkForChangesNeededBy`` when it
            returns one; otherwise True if every needed change could
            be added and False as soon as one could not.
        """
        needed = self.checkForChangesNeededBy(change, change_queue)
        if needed in (True, False):
            return needed
        self.log.debug(" Changes %s must be merged ahead of %s" %
                       (needed, change))
        for needed_change in needed:
            # This differs from the dependent pipeline by enqueuing
            # changes ahead as "not live", that is, not intended to
            # have jobs run. Also, pipeline requirements are always
            # ignored (which is safe because the changes are not
            # live).
            added = self.addChange(needed_change, quiet=True,
                                   ignore_requirements=True,
                                   live=False, change_queue=change_queue)
            if not added:
                return False
        return True

    def checkForChangesNeededBy(self, change, change_queue):
        """Work out which changes must be enqueued ahead of ``change``.

        :returns: True if it is okay to proceed enqueuing this change
            (dependencies are ignored, unsupported, absent, merged or
            already in the queue); otherwise the list of changes that
            are still needed.
        """
        if self.pipeline.ignore_dependencies:
            return True
        self.log.debug("Checking for changes needed by %s:" % change)
        if not hasattr(change, 'needs_changes'):
            self.log.debug(" Changeish does not support dependencies")
            return True
        if not change.needs_changes:
            self.log.debug(" No changes needed")
            return True
        still_needed = []
        for needed_change in change.needs_changes:
            self.log.debug(" Change %s needs change %s:" % (
                change, needed_change))
            if needed_change.is_merged:
                self.log.debug(" Needed change is merged")
            elif self.isChangeAlreadyInQueue(needed_change, change_queue):
                self.log.debug(" Needed change is already ahead in the queue")
            else:
                self.log.debug(" Change %s is needed" % needed_change)
                if needed_change not in still_needed:
                    still_needed.append(needed_change)
            # This differs from the dependent pipeline check in not
            # verifying that the dependent change is mergable.
        return still_needed or True

    def dequeueItem(self, item):
        """Dequeue ``item`` and drop its queue if that left it empty."""
        super(IndependentPipelineManager, self).dequeueItem(item)
        # An independent pipeline manager dynamically removes empty
        # queues
        emptied_queue = item.queue
        if not emptied_queue.queue:
            self.pipeline.removeQueue(emptied_queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001871
James E. Blair1e8dd892012-05-30 09:15:05 -07001872
class StaticChangeQueueContextManager(object):
    """Context manager that yields a change queue and cleans up nothing."""

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        # The wrapped queue is left untouched on exit.
        return None
1882
1883
James E. Blair4aea70c2012-07-26 14:23:24 -07001884class DependentPipelineManager(BasePipelineManager):
1885 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001886 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001887
def __init__(self, *args, **kwargs):
    """Pass all arguments through to the base-class constructor."""
    super(DependentPipelineManager, self).__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001890
def _postConfig(self, layout):
    """Run base-class post-configuration, then build the change queues."""
    super(DependentPipelineManager, self)._postConfig(layout)
    self.buildChangeQueues()
1894
def buildChangeQueues(self):
    """Create the shared change queues and add them to the pipeline.

    One queue is created per pipeline project, configured with the
    pipeline's window settings.  The queues are then combined
    repeatedly via ``combineChangeQueues`` until a pass no longer
    reduces their number, and the result is added to the pipeline.
    """
    self.log.debug("Building shared change queues")
    per_project_queues = []

    for project in self.pipeline.getProjects():
        pipeline = self.pipeline
        project_queue = ChangeQueue(
            pipeline,
            window=pipeline.window,
            window_floor=pipeline.window_floor,
            window_increase_type=pipeline.window_increase_type,
            window_increase_factor=pipeline.window_increase_factor,
            window_decrease_type=pipeline.window_decrease_type,
            window_decrease_factor=pipeline.window_decrease_factor)
        project_queue.addProject(project)
        per_project_queues.append(project_queue)
        self.log.debug("Created queue: %s" % project_queue)

    # Iterate over all queues trying to combine them, and keep doing
    # so until they can not be combined further.
    current = per_project_queues
    combined = self.combineChangeQueues(current)
    while len(combined) != len(current):
        current = combined
        combined = self.combineChangeQueues(current)

    self.log.info(" Shared change queues:")
    for shared_queue in combined:
        self.pipeline.addQueue(shared_queue)
        self.log.info(" %s containing %s" % (
            shared_queue, shared_queue.generated_name))
James E. Blairc3d428e2013-12-03 15:06:48 -08001926
def combineChangeQueues(self, change_queues):
    """Do one pass of merging queues that have a job in common.

    Each queue is compared with the queues kept so far; it is merged
    into the first kept queue whose jobs are not disjoint from its
    own, or kept as a new entry if there is none.

    :param change_queues: the queues to combine.
    :returns: the list of queues that were kept.
    """
    self.log.debug("Combining shared queues")
    kept_queues = []
    for candidate in change_queues:
        for kept in kept_queues:
            if not candidate.getJobs().isdisjoint(kept.getJobs()):
                self.log.debug("Merging queue %s into %s" %
                               (candidate, kept))
                kept.mergeChangeQueue(candidate)
                break  # merged; move on to the next candidate
        else:
            # No kept queue has a job in common with this one.
            self.log.debug("Keeping queue %s" % (candidate))
            kept_queues.append(candidate)
    return kept_queues
James E. Blairee743612012-05-29 14:49:32 -07001942
def getChangeQueue(self, change, existing=None):
    """Return a context manager yielding the queue for ``change``.

    An ``existing`` queue is used when given; otherwise the pipeline
    is asked for the queue of the change's project.
    """
    if existing:
        queue = existing
    else:
        queue = self.pipeline.getQueue(change.project)
    return StaticChangeQueueContextManager(queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001948
def isChangeReadyToBeEnqueued(self, change):
    """Tell whether the pipeline source says ``change`` can merge.

    :returns: True if ``canMerge`` accepts the change, else False
        (after logging that the change is ignored).
    """
    allow_needs = self.getSubmitAllowNeeds()
    if self.pipeline.source.canMerge(change, allow_needs):
        return True
    self.log.debug("Change %s can not merge, ignoring" % change)
    return False
James E. Blair1e8dd892012-05-30 09:15:05 -07001955
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                         change_queue):
    """Enqueue the changes that need ``change`` and are able to merge.

    Only changes whose own queue is ``change_queue`` and which the
    pipeline source reports as mergeable are added, all into
    ``change_queue``.  Nothing is done for changes that do not
    support dependencies.
    """
    self.log.debug("Checking for changes needing %s:" % change)
    if not hasattr(change, 'needed_by_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return
    to_enqueue = []
    for other_change in change.needed_by_changes:
        with self.getChangeQueue(other_change) as other_change_queue:
            in_other_queue = other_change_queue != change_queue
            if in_other_queue:
                self.log.debug(" Change %s in project %s can not be "
                               "enqueued in the target queue %s" %
                               (other_change, other_change.project,
                                change_queue))
        if in_other_queue:
            continue
        if not self.pipeline.source.canMerge(other_change,
                                             self.getSubmitAllowNeeds()):
            continue
        self.log.debug(" Change %s needs %s and is ready to merge" %
                       (other_change, change))
        to_enqueue.append(other_change)

    if not to_enqueue:
        self.log.debug(" No changes need %s" % change)

    for other_change in to_enqueue:
        self.addChange(other_change, quiet=quiet,
                       ignore_requirements=ignore_requirements,
                       change_queue=change_queue)
James E. Blaire0487072012-08-29 17:38:31 -07001984
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                        change_queue):
    """Enqueue the changes that must merge ahead of ``change``.

    ``self.checkForChangesNeededBy`` either gives a final verdict (a
    boolean, returned unchanged) or a list of changes that still have
    to be enqueued first.  In the latter case each one is passed to
    ``self.addChange`` in order.

    :param change: the change about to be enqueued.
    :param quiet: forwarded to ``addChange``.
    :param ignore_requirements: forwarded to ``addChange``.
    :param change_queue: forwarded to ``checkForChangesNeededBy`` and
        ``addChange``.
    :returns: ``True`` when ``change`` may proceed, ``False`` when it
        may not (including when any needed change failed to enqueue).
    """
    ret = self.checkForChangesNeededBy(change, change_queue)
    # A boolean is a final answer; anything else is the list of
    # changes to enqueue first.  isinstance avoids the equality-based
    # "in [True, False]" test, which would also match 0 and 1.
    if isinstance(ret, bool):
        return ret
    self.log.debug(" Changes %s must be merged ahead of %s" %
                   (ret, change))
    for needed_change in ret:
        r = self.addChange(needed_change, quiet=quiet,
                           ignore_requirements=ignore_requirements,
                           change_queue=change_queue)
        if not r:
            # Stop at the first dependency that could not be enqueued.
            return False
    return True
James E. Blaire0487072012-08-29 17:38:31 -07001999
def checkForChangesNeededBy(self, change, change_queue):
    """Work out what must be enqueued ahead of ``change``.

    :param change: the change being considered for enqueuing.
    :param change_queue: accepted for interface compatibility but not
        used; the queue is looked up from ``change`` itself.
    :returns: ``True`` if it is okay to proceed enqueuing this change,
        ``False`` if the change should not be enqueued, or a non-empty
        list of needed changes that have to be enqueued first.
    """
    self.log.debug("Checking for changes needed by %s:" % change)
    if not hasattr(change, 'needs_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return True
    if not change.needs_changes:
        self.log.debug(" No changes needed")
        return True
    changes_needed = []
    # The supplied change_queue is ignored; use the change's own queue.
    with self.getChangeQueue(change) as own_queue:
        for needed_change in change.needs_changes:
            self.log.debug(" Change %s needs change %s:" % (
                change, needed_change))
            if needed_change.is_merged:
                self.log.debug(" Needed change is merged")
                continue
            with self.getChangeQueue(needed_change) as needed_change_queue:
                if needed_change_queue != own_queue:
                    self.log.debug(" Change %s in project %s does not "
                                   "share a change queue with %s "
                                   "in project %s" %
                                   (needed_change, needed_change.project,
                                    change, change.project))
                    return False
            if not needed_change.is_current_patchset:
                self.log.debug(" Needed change is not the "
                               "current patchset")
                return False
            if self.isChangeAlreadyInQueue(needed_change, own_queue):
                self.log.debug(" Needed change is already ahead "
                               "in the queue")
                continue
            if self.pipeline.source.canMerge(needed_change,
                                             self.getSubmitAllowNeeds()):
                self.log.debug(" Change %s is needed" % needed_change)
                # Report each needed change only once.
                if needed_change not in changes_needed:
                    changes_needed.append(needed_change)
                continue
            # The needed change can't be merged.
            self.log.debug(" Change %s is needed but can not be merged" %
                           needed_change)
            return False
    if changes_needed:
        return changes_needed
    return True
James E. Blair972e3c72013-08-29 12:04:55 -07002048
def getFailingDependentItems(self, item):
    """Collect the queue items that ``item`` depends on and are failing.

    Each entry of ``item.change.needs_changes`` is resolved with
    ``self.getItemForChange``; entries that resolve to nothing are
    skipped.  A resolved item counts as failing when its
    ``current_build_set.failing_reasons`` is non-empty.

    :returns: a non-empty set of failing items, or ``None`` when the
        change has no (or no support for) dependencies or none of
        them is failing.
    """
    needs_changes = getattr(item.change, 'needs_changes', None)
    if not needs_changes:
        return None
    failing_items = set()
    for needed_change in needs_changes:
        needed_item = self.getItemForChange(needed_change)
        if not needed_item:
            continue
        if needed_item.current_build_set.failing_reasons:
            failing_items.add(needed_item)
    # Callers get None rather than an empty set.
    return failing_items or None