blob: fc21bf10d2d2a8dc1f9af012bf983fb98d00b361 [file] [log] [blame]
James E. Blairc494d542014-08-06 09:23:52 -07001# Copyright 2012-2014 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Christian Berendt12d4d722014-06-07 21:03:45 +020023from six.moves import queue as Queue
Zhongyue Luo1c860d72012-07-19 11:03:56 +080024import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
James E. Blair11041d22014-05-02 14:49:53 -070032from model import ActionReporter, Pipeline, Project, ChangeQueue
33from model import EventFilter, ChangeishFilter
Maru Newby3fe5f852015-01-13 04:22:14 +000034from zuul import change_matcher
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040035from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070036
James E. Blair71e94122012-12-24 17:53:08 -080037statsd = extras.try_import('statsd.statsd')
38
James E. Blair1e8dd892012-05-30 09:15:05 -070039
def deep_format(obj, paramdict):
    """Recursively apply ``str.format(**paramdict)`` to the strings in obj.

    Lists and dicts are traversed recursively; for dicts both the keys and
    the values are expanded.  Any object that is not a str, list or dict is
    returned unchanged.

    Borrowed from Jenkins Job Builder project.

    :arg obj: the string, list, dict or other object to expand
    :arg dict paramdict: the substitutions handed to ``str.format()``
    :returns: a new structure of the same shape with its strings expanded
    :raises KeyError: if a string references a name missing from paramdict
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        # Keys go through the same dispatch as values so that a non-string
        # key (for example an integer parsed from YAML) is passed through
        # untouched instead of failing on a missing .format() attribute.
        return dict((deep_format(key, paramdict),
                     deep_format(value, paramdict))
                    for key, value in obj.items())
    return obj
60
61
class MergeFailure(Exception):
    """Exception subclass named for merge failures.

    It adds no behaviour of its own on top of Exception.
    """
64
65
class ManagementEvent(object):
    """An event that should be processed within the main queue run loop"""

    def __init__(self):
        # Set once the event has been handled, successfully or not.
        self._wait_event = threading.Event()
        # Exception (and its traceback) recorded by the handler, if any.
        self._exception = None
        self._traceback = None

    def exception(self, e, tb):
        """Record a failure and release anyone blocked in wait().

        :arg Exception e: the exception raised while handling the event
        :arg tb: the traceback object that belongs to e
        """
        self._exception = e
        self._traceback = tb
        self._wait_event.set()

    def done(self):
        """Mark the event as handled and release anyone blocked in wait()."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event has been handled or the timeout expires.

        :arg float timeout: maximum number of seconds to wait, or None to
            wait without limit
        :returns: True if the event was handled, False if the wait timed out
        :raises: the exception recorded through exception(), with its
            original traceback
        """
        self._wait_event.wait(timeout)
        if self._exception:
            # The three-expression ``raise e, None, tb`` statement is a
            # syntax error on Python 3; six.reraise provides the same
            # behaviour on both major versions.
            import six
            six.reraise(type(self._exception), self._exception,
                        self._traceback)
        return self._wait_event.is_set()
86
87
class ReconfigureEvent(ManagementEvent):
    """Request that the scheduler reconfigure itself.

    The layout will be (re-)loaded from the path specified in the
    configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        ManagementEvent.__init__(self)
        self.config = config
97
98
class PromoteEvent(ManagementEvent):
    """Request that one or more changes move to the head of the queue.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, pipeline_name, change_ids):
        ManagementEvent.__init__(self)
        self.change_ids = change_ids
        self.pipeline_name = pipeline_name
111
112
class EnqueueEvent(ManagementEvent):
    """Request that a change be enqueued into a pipeline.

    :arg TriggerEvent trigger_event: a TriggerEvent describing the
        trigger, pipeline, and change to enqueue
    """

    def __init__(self, trigger_event):
        ManagementEvent.__init__(self)
        self.trigger_event = trigger_event
123
124
class ResultEvent(object):
    """Base class for events that need to modify the pipeline state due
    to a result from an external system.

    It carries no state or behaviour of its own.
    """
130
131
class BuildStartedEvent(ResultEvent):
    """Result event signalling that a build has started.

    :arg Build build: The build which has started.
    """

    def __init__(self, build):
        # Keep a reference to the build this event is about.
        self.build = build
140
141
class BuildCompletedEvent(ResultEvent):
    """Result event signalling that a build has completed.

    :arg Build build: The build which has completed.
    """

    def __init__(self, build):
        # Keep a reference to the build this event is about.
        self.build = build
150
151
class MergeCompletedEvent(ResultEvent):
    """Result event signalling that a remote merge operation has completed.

    :arg BuildSet build_set: The build_set which is ready.
    :arg str zuul_url: The URL of the Zuul Merger.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    """

    def __init__(self, build_set, zuul_url, merged, updated, commit):
        # What was merged and where.
        self.build_set = build_set
        self.zuul_url = zuul_url
        # Outcome of the operation.
        self.merged, self.updated = merged, updated
        self.commit = commit
168
169
def toList(item):
    """Normalise a configuration value to a list.

    :arg item: None, a single value, or a list of values
    :returns: a new empty list for any falsy item, the item itself when
        it is a non-empty list, otherwise a one-element list wrapping it
    """
    if item and isinstance(item, list):
        return item
    return [item] if item else []
176
177
James E. Blaire9d45c32012-05-31 09:56:45 -0700178class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700179 log = logging.getLogger("zuul.Scheduler")
180
James E. Blaire9d45c32012-05-31 09:56:45 -0700181 def __init__(self):
182 threading.Thread.__init__(self)
James E. Blair8a6f0c22013-07-01 12:31:34 -0400183 self.daemon = True
James E. Blairee743612012-05-29 14:49:32 -0700184 self.wake_event = threading.Event()
James E. Blaircdccd972013-07-01 12:10:22 -0700185 self.layout_lock = threading.Lock()
James E. Blaira84f0e42014-02-06 07:09:22 -0800186 self.run_handler_lock = threading.Lock()
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700187 self._pause = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700188 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -0700189 self._stopped = False
James E. Blairee743612012-05-29 14:49:32 -0700190 self.launcher = None
James E. Blair4076e2b2014-01-28 12:42:20 -0800191 self.merger = None
James E. Blair6c358e72013-07-29 17:06:47 -0700192 self.triggers = dict()
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000193 self.reporters = dict()
James E. Blair3c5e5b52013-04-26 11:17:03 -0700194 self.config = None
James E. Blairee743612012-05-29 14:49:32 -0700195
196 self.trigger_event_queue = Queue.Queue()
197 self.result_event_queue = Queue.Queue()
James E. Blair468c8512013-12-06 13:27:19 -0800198 self.management_event_queue = Queue.Queue()
James E. Blaireff88162013-07-01 12:44:14 -0400199 self.layout = model.Layout()
James E. Blairee743612012-05-29 14:49:32 -0700200
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400201 self.zuul_version = zuul_version.version_info.version_string()
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400202 self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400203
James E. Blairb0fcae42012-07-17 11:12:10 -0700204 def stop(self):
205 self._stopped = True
206 self.wake_event.set()
207
James E. Blair47958382013-01-10 17:26:02 -0800208 def testConfig(self, config_path):
James E. Blair04948c72013-07-25 23:03:17 -0700209 return self._parseConfig(config_path)
James E. Blair47958382013-01-10 17:26:02 -0800210
Maru Newby3fe5f852015-01-13 04:22:14 +0000211 def _parseSkipIf(self, config_job):
212 cm = change_matcher
213 skip_matchers = []
214
215 for config_skip in config_job.get('skip-if', []):
216 nested_matchers = []
217
218 project_regex = config_skip.get('project')
219 if project_regex:
220 nested_matchers.append(cm.ProjectMatcher(project_regex))
221
222 branch_regex = config_skip.get('branch')
223 if branch_regex:
224 nested_matchers.append(cm.BranchMatcher(branch_regex))
225
226 file_regexes = toList(config_skip.get('all-files-match-any'))
227 if file_regexes:
228 file_matchers = [cm.FileMatcher(x) for x in file_regexes]
229 all_files_matcher = cm.MatchAllFiles(file_matchers)
230 nested_matchers.append(all_files_matcher)
231
232 # All patterns need to match a given skip-if predicate
233 skip_matchers.append(cm.MatchAll(nested_matchers))
234
235 if skip_matchers:
236 # Any skip-if predicate can be matched to trigger a skip
237 return cm.MatchAny(skip_matchers)
238
James E. Blaire5a847f2012-07-10 15:29:14 -0700239 def _parseConfig(self, config_path):
James E. Blaireff88162013-07-01 12:44:14 -0400240 layout = model.Layout()
241 project_templates = {}
242
James E. Blaire5a847f2012-07-10 15:29:14 -0700243 if config_path:
244 config_path = os.path.expanduser(config_path)
245 if not os.path.exists(config_path):
246 raise Exception("Unable to read layout config file at %s" %
247 config_path)
248 config_file = open(config_path)
249 data = yaml.load(config_file)
250
James E. Blair47958382013-01-10 17:26:02 -0800251 validator = layoutvalidator.LayoutValidator()
252 validator.validate(data)
253
James E. Blaireff88162013-07-01 12:44:14 -0400254 config_env = {}
James E. Blaire5a847f2012-07-10 15:29:14 -0700255 for include in data.get('includes', []):
256 if 'python-file' in include:
257 fn = include['python-file']
258 if not os.path.isabs(fn):
Antoine Musso9adc6d42014-11-14 15:37:48 +0100259 base = os.path.dirname(os.path.realpath(config_path))
James E. Blaire5a847f2012-07-10 15:29:14 -0700260 fn = os.path.join(base, fn)
261 fn = os.path.expanduser(fn)
James E. Blaireff88162013-07-01 12:44:14 -0400262 execfile(fn, config_env)
James E. Blair1e8dd892012-05-30 09:15:05 -0700263
James E. Blair4aea70c2012-07-26 14:23:24 -0700264 for conf_pipeline in data.get('pipelines', []):
265 pipeline = Pipeline(conf_pipeline['name'])
James E. Blair8dbd56a2012-12-22 10:55:10 -0800266 pipeline.description = conf_pipeline.get('description')
James E. Blairc0dedf82014-08-06 09:37:52 -0700267 # TODO(jeblair): remove backwards compatibility:
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000268 pipeline.source = self.triggers[conf_pipeline.get('source',
269 'gerrit')]
James E. Blair64ed6f22013-07-10 14:07:23 -0700270 precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
271 pipeline.precedence = precedence
James E. Blair56370192013-01-14 15:47:28 -0800272 pipeline.failure_message = conf_pipeline.get('failure-message',
273 "Build failed.")
Joshua Heskethb7179772014-01-30 23:30:46 +1100274 pipeline.merge_failure_message = conf_pipeline.get(
275 'merge-failure-message', "Merge Failed.\n\nThis change was "
276 "unable to be automatically merged with the current state of "
277 "the repository. Please rebase your change and upload a new "
278 "patchset.")
James E. Blair56370192013-01-14 15:47:28 -0800279 pipeline.success_message = conf_pipeline.get('success-message',
280 "Build succeeded.")
Joshua Hesketh3979e3e2014-03-04 11:21:10 +1100281 pipeline.footer_message = conf_pipeline.get('footer-message', "")
James E. Blair2fa50962013-01-30 21:50:41 -0800282 pipeline.dequeue_on_new_patchset = conf_pipeline.get(
James E. Blair6736beb2013-07-11 15:18:15 -0700283 'dequeue-on-new-patchset', True)
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000284
285 action_reporters = {}
Joshua Heskethb7179772014-01-30 23:30:46 +1100286 for action in ['start', 'success', 'failure', 'merge-failure']:
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000287 action_reporters[action] = []
288 if conf_pipeline.get(action):
289 for reporter_name, params \
290 in conf_pipeline.get(action).items():
291 if reporter_name in self.reporters.keys():
292 action_reporters[action].append(ActionReporter(
293 self.reporters[reporter_name], params))
294 else:
295 self.log.error('Invalid reporter name %s' %
296 reporter_name)
297 pipeline.start_actions = action_reporters['start']
298 pipeline.success_actions = action_reporters['success']
299 pipeline.failure_actions = action_reporters['failure']
Joshua Heskethb7179772014-01-30 23:30:46 +1100300 if len(action_reporters['merge-failure']) > 0:
301 pipeline.merge_failure_actions = \
302 action_reporters['merge-failure']
303 else:
304 pipeline.merge_failure_actions = action_reporters['failure']
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000305
Clark Boylan7603a372014-01-21 11:43:20 -0800306 pipeline.window = conf_pipeline.get('window', 20)
307 pipeline.window_floor = conf_pipeline.get('window-floor', 3)
308 pipeline.window_increase_type = conf_pipeline.get(
309 'window-increase-type', 'linear')
310 pipeline.window_increase_factor = conf_pipeline.get(
311 'window-increase-factor', 1)
312 pipeline.window_decrease_type = conf_pipeline.get(
313 'window-decrease-type', 'exponential')
314 pipeline.window_decrease_factor = conf_pipeline.get(
315 'window-decrease-factor', 2)
316
James E. Blair4aea70c2012-07-26 14:23:24 -0700317 manager = globals()[conf_pipeline['manager']](self, pipeline)
318 pipeline.setManager(manager)
James E. Blaireff88162013-07-01 12:44:14 -0400319 layout.pipelines[conf_pipeline['name']] = pipeline
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000320
James E. Blair11041d22014-05-02 14:49:53 -0700321 if 'require' in conf_pipeline:
322 require = conf_pipeline['require']
Clark Boylana9702ad2014-05-08 17:17:24 -0700323 f = ChangeishFilter(
324 open=require.get('open'),
325 current_patchset=require.get('current-patchset'),
326 statuses=toList(require.get('status')),
James E. Blair9c17dbf2014-06-23 14:21:58 -0700327 required_approvals=toList(require.get('approval')))
James E. Blair11041d22014-05-02 14:49:53 -0700328 manager.changeish_filters.append(f)
329
James E. Blair6c358e72013-07-29 17:06:47 -0700330 # TODO: move this into triggers (may require pluggable
331 # configuration)
332 if 'gerrit' in conf_pipeline['trigger']:
James E. Blair6c358e72013-07-29 17:06:47 -0700333 for trigger in toList(conf_pipeline['trigger']['gerrit']):
334 approvals = {}
335 for approval_dict in toList(trigger.get('approval')):
336 for k, v in approval_dict.items():
337 approvals[k] = v
James E. Blair1fbfceb2014-06-23 14:42:53 -0700338 # Backwards compat for *_filter versions of these args
339 comments = toList(trigger.get('comment'))
340 if not comments:
341 comments = toList(trigger.get('comment_filter'))
342 emails = toList(trigger.get('email'))
343 if not emails:
344 emails = toList(trigger.get('email_filter'))
345 usernames = toList(trigger.get('username'))
346 if not usernames:
347 usernames = toList(trigger.get('username_filter'))
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000348 f = EventFilter(
349 trigger=self.triggers['gerrit'],
350 types=toList(trigger['event']),
351 branches=toList(trigger.get('branch')),
352 refs=toList(trigger.get('ref')),
353 event_approvals=approvals,
354 comments=comments,
355 emails=emails,
356 usernames=usernames,
357 required_approvals=toList(
358 trigger.get('require-approval')
359 )
360 )
James E. Blair6c358e72013-07-29 17:06:47 -0700361 manager.event_filters.append(f)
James E. Blairc494d542014-08-06 09:23:52 -0700362 if 'timer' in conf_pipeline['trigger']:
James E. Blair63bb0ef2013-07-29 17:14:51 -0700363 for trigger in toList(conf_pipeline['trigger']['timer']):
James E. Blairc0dedf82014-08-06 09:37:52 -0700364 f = EventFilter(trigger=self.triggers['timer'],
365 types=['timer'],
James E. Blair63bb0ef2013-07-29 17:14:51 -0700366 timespecs=toList(trigger['time']))
367 manager.event_filters.append(f)
James E. Blairc494d542014-08-06 09:23:52 -0700368 if 'zuul' in conf_pipeline['trigger']:
369 for trigger in toList(conf_pipeline['trigger']['zuul']):
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000370 f = EventFilter(
371 trigger=self.triggers['zuul'],
372 types=toList(trigger['event']),
373 pipelines=toList(trigger.get('pipeline')),
374 required_approvals=toList(
375 trigger.get('require-approval')
376 )
377 )
James E. Blairc494d542014-08-06 09:23:52 -0700378 manager.event_filters.append(f)
James E. Blairee743612012-05-29 14:49:32 -0700379
Antoine Musso80edd5a2013-02-13 15:37:53 +0100380 for project_template in data.get('project-templates', []):
381 # Make sure the template only contains valid pipelines
382 tpl = dict(
383 (pipe_name, project_template.get(pipe_name))
James E. Blaireff88162013-07-01 12:44:14 -0400384 for pipe_name in layout.pipelines.keys()
Antoine Musso80edd5a2013-02-13 15:37:53 +0100385 if pipe_name in project_template
386 )
James E. Blaireff88162013-07-01 12:44:14 -0400387 project_templates[project_template.get('name')] = tpl
Antoine Musso80edd5a2013-02-13 15:37:53 +0100388
James E. Blair47958382013-01-10 17:26:02 -0800389 for config_job in data.get('jobs', []):
James E. Blaireff88162013-07-01 12:44:14 -0400390 job = layout.getJob(config_job['name'])
James E. Blairb0954652012-06-01 11:32:01 -0700391 # Be careful to only set attributes explicitly present on
392 # this job, to avoid squashing attributes set by a meta-job.
James E. Blairc8a1e052014-02-25 09:29:26 -0800393 m = config_job.get('queue-name', None)
394 if m:
395 job.queue_name = m
James E. Blairb0954652012-06-01 11:32:01 -0700396 m = config_job.get('failure-message', None)
397 if m:
398 job.failure_message = m
399 m = config_job.get('success-message', None)
400 if m:
401 job.success_message = m
James E. Blair6aea36d2012-12-17 13:03:24 -0800402 m = config_job.get('failure-pattern', None)
403 if m:
404 job.failure_pattern = m
405 m = config_job.get('success-pattern', None)
406 if m:
407 job.success_pattern = m
James E. Blair222d4982012-07-16 09:31:19 -0700408 m = config_job.get('hold-following-changes', False)
409 if m:
410 job.hold_following_changes = True
James E. Blair4ec821f2012-08-23 15:28:28 -0700411 m = config_job.get('voting', None)
412 if m is not None:
413 job.voting = m
James E. Blaire5a847f2012-07-10 15:29:14 -0700414 fname = config_job.get('parameter-function', None)
415 if fname:
James E. Blaireff88162013-07-01 12:44:14 -0400416 func = config_env.get(fname, None)
James E. Blaire5a847f2012-07-10 15:29:14 -0700417 if not func:
418 raise Exception("Unable to find function %s" % fname)
419 job.parameter_function = func
James E. Blairee743612012-05-29 14:49:32 -0700420 branches = toList(config_job.get('branch'))
421 if branches:
James E. Blaire421a232012-07-25 16:59:21 -0700422 job._branches = branches
423 job.branches = [re.compile(x) for x in branches]
James E. Blair70c71582013-03-06 08:50:50 -0800424 files = toList(config_job.get('files'))
425 if files:
426 job._files = files
427 job.files = [re.compile(x) for x in files]
Maru Newby3fe5f852015-01-13 04:22:14 +0000428 skip_if_matcher = self._parseSkipIf(config_job)
429 if skip_if_matcher:
430 job.skip_if_matcher = skip_if_matcher
Joshua Hesketh36c3fa52014-01-22 11:40:52 +1100431 swift = toList(config_job.get('swift'))
432 if swift:
433 for s in swift:
434 job.swift[s['name']] = s
James E. Blairee743612012-05-29 14:49:32 -0700435
436 def add_jobs(job_tree, config_jobs):
437 for job in config_jobs:
438 if isinstance(job, list):
439 for x in job:
440 add_jobs(job_tree, x)
441 if isinstance(job, dict):
442 for parent, children in job.items():
James E. Blaireff88162013-07-01 12:44:14 -0400443 parent_tree = job_tree.addJob(layout.getJob(parent))
James E. Blairee743612012-05-29 14:49:32 -0700444 add_jobs(parent_tree, children)
445 if isinstance(job, str):
James E. Blaireff88162013-07-01 12:44:14 -0400446 job_tree.addJob(layout.getJob(job))
James E. Blairee743612012-05-29 14:49:32 -0700447
James E. Blair47958382013-01-10 17:26:02 -0800448 for config_project in data.get('projects', []):
James E. Blairee743612012-05-29 14:49:32 -0700449 project = Project(config_project['name'])
James E. Blairaea6cf62013-12-16 15:38:12 -0800450 shortname = config_project['name'].split('/')[-1]
Antoine Musso80edd5a2013-02-13 15:37:53 +0100451
James E. Blair3e98c022013-12-16 15:25:38 -0800452 # This is reversed due to the prepend operation below, so
453 # the ultimate order is templates (in order) followed by
454 # statically defined jobs.
455 for requested_template in reversed(
456 config_project.get('template', [])):
Antoine Musso80edd5a2013-02-13 15:37:53 +0100457 # Fetch the template from 'project-templates'
James E. Blaireff88162013-07-01 12:44:14 -0400458 tpl = project_templates.get(
Antoine Musso80edd5a2013-02-13 15:37:53 +0100459 requested_template.get('name'))
460 # Expand it with the project context
James E. Blairaea6cf62013-12-16 15:38:12 -0800461 requested_template['name'] = shortname
Antoine Musso80edd5a2013-02-13 15:37:53 +0100462 expanded = deep_format(tpl, requested_template)
James E. Blair3e98c022013-12-16 15:25:38 -0800463 # Finally merge the expansion with whatever has been
464 # already defined for this project. Prepend our new
465 # jobs to existing ones (which may have been
466 # statically defined or defined by other templates).
467 for pipeline in layout.pipelines.values():
468 if pipeline.name in expanded:
469 config_project.update(
470 {pipeline.name: expanded[pipeline.name] +
471 config_project.get(pipeline.name, [])})
James E. Blair12a92b12014-03-26 11:54:53 -0700472 # TODO: future enhancement -- handle the case where
473 # duplicate jobs have different children and you want all
474 # of the children to run after a single run of the
475 # parent).
Antoine Musso80edd5a2013-02-13 15:37:53 +0100476
James E. Blaireff88162013-07-01 12:44:14 -0400477 layout.projects[config_project['name']] = project
James E. Blair19deff22013-08-25 13:17:35 -0700478 mode = config_project.get('merge-mode', 'merge-resolve')
479 project.merge_mode = model.MERGER_MAP[mode]
James E. Blaireff88162013-07-01 12:44:14 -0400480 for pipeline in layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700481 if pipeline.name in config_project:
482 job_tree = pipeline.addProject(project)
483 config_jobs = config_project[pipeline.name]
James E. Blairee743612012-05-29 14:49:32 -0700484 add_jobs(job_tree, config_jobs)
James E. Blairee743612012-05-29 14:49:32 -0700485
James E. Blairb0954652012-06-01 11:32:01 -0700486 # All jobs should be defined at this point, get rid of
487 # metajobs so that getJob isn't doing anything weird.
James E. Blairc28d1b02013-07-19 11:37:06 -0700488 layout.metajobs = []
James E. Blairb0954652012-06-01 11:32:01 -0700489
James E. Blaireff88162013-07-01 12:44:14 -0400490 for pipeline in layout.pipelines.values():
491 pipeline.manager._postConfig(layout)
492
493 return layout
James E. Blairee743612012-05-29 14:49:32 -0700494
James E. Blairee743612012-05-29 14:49:32 -0700495 def setLauncher(self, launcher):
496 self.launcher = launcher
497
James E. Blair4076e2b2014-01-28 12:42:20 -0800498 def setMerger(self, merger):
499 self.merger = merger
500
James E. Blair6c358e72013-07-29 17:06:47 -0700501 def registerTrigger(self, trigger, name=None):
502 if name is None:
503 name = trigger.name
504 self.triggers[name] = trigger
James E. Blairee743612012-05-29 14:49:32 -0700505
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000506 def registerReporter(self, reporter, name=None):
507 if name is None:
508 name = reporter.name
509 self.reporters[name] = reporter
510
James E. Blaircdccd972013-07-01 12:10:22 -0700511 def getProject(self, name):
512 self.layout_lock.acquire()
513 p = None
514 try:
515 p = self.layout.projects.get(name)
516 finally:
517 self.layout_lock.release()
518 return p
519
James E. Blairee743612012-05-29 14:49:32 -0700520 def addEvent(self, event):
521 self.log.debug("Adding trigger event: %s" % event)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800522 try:
523 if statsd:
524 statsd.incr('gerrit.event.%s' % event.type)
525 except:
526 self.log.exception("Exception reporting event stats")
James E. Blairee743612012-05-29 14:49:32 -0700527 self.trigger_event_queue.put(event)
528 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800529 self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700530
James E. Blair11700c32012-07-05 17:50:05 -0700531 def onBuildStarted(self, build):
532 self.log.debug("Adding start event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800533 build.start_time = time.time()
James E. Blaira84f0e42014-02-06 07:09:22 -0800534 event = BuildStartedEvent(build)
535 self.result_event_queue.put(event)
James E. Blair11700c32012-07-05 17:50:05 -0700536 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800537 self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700538
James E. Blairee743612012-05-29 14:49:32 -0700539 def onBuildCompleted(self, build):
James E. Blair11700c32012-07-05 17:50:05 -0700540 self.log.debug("Adding complete event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800541 build.end_time = time.time()
James E. Blair23ec1ba2013-01-04 18:06:10 -0800542 try:
James E. Blair66eeebf2013-07-27 17:44:32 -0700543 if statsd and build.pipeline:
544 jobname = build.job.name.replace('.', '_')
545 key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
546 jobname, build.result)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800547 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
548 dt = int((build.end_time - build.start_time) * 1000)
549 statsd.timing(key, dt)
550 statsd.incr(key)
James E. Blair7f4a1902013-08-24 08:20:02 -0700551 key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
552 statsd.incr(key)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800553 except:
554 self.log.exception("Exception reporting runtime stats")
James E. Blaira84f0e42014-02-06 07:09:22 -0800555 event = BuildCompletedEvent(build)
556 self.result_event_queue.put(event)
James E. Blairee743612012-05-29 14:49:32 -0700557 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800558 self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700559
James E. Blair4076e2b2014-01-28 12:42:20 -0800560 def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
561 self.log.debug("Adding merge complete event for build set: %s" %
562 build_set)
563 event = MergeCompletedEvent(build_set, zuul_url,
564 merged, updated, commit)
565 self.result_event_queue.put(event)
566 self.wake_event.set()
567
James E. Blaire9d45c32012-05-31 09:56:45 -0700568 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700569 self.log.debug("Prepare to reconfigure")
James E. Blair468c8512013-12-06 13:27:19 -0800570 event = ReconfigureEvent(config)
571 self.management_event_queue.put(event)
James E. Blaire9d45c32012-05-31 09:56:45 -0700572 self.wake_event.set()
573 self.log.debug("Waiting for reconfiguration")
James E. Blair468c8512013-12-06 13:27:19 -0800574 event.wait()
James E. Blaire9d45c32012-05-31 09:56:45 -0700575 self.log.debug("Reconfiguration complete")
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400576 self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700577
James E. Blair36658cf2013-12-06 17:53:48 -0800578 def promote(self, pipeline_name, change_ids):
579 event = PromoteEvent(pipeline_name, change_ids)
580 self.management_event_queue.put(event)
581 self.wake_event.set()
582 self.log.debug("Waiting for promotion")
583 event.wait()
584 self.log.debug("Promotion complete")
585
James E. Blaird27a96d2014-07-10 13:25:13 -0700586 def enqueue(self, trigger_event):
587 event = EnqueueEvent(trigger_event)
588 self.management_event_queue.put(event)
589 self.wake_event.set()
590 self.log.debug("Waiting for enqueue")
591 event.wait()
592 self.log.debug("Enqueue complete")
593
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700594 def exit(self):
595 self.log.debug("Prepare to exit")
596 self._pause = True
597 self._exit = True
598 self.wake_event.set()
599 self.log.debug("Waiting for exit")
600
601 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700602 if self.config.has_option('zuul', 'state_dir'):
603 state_dir = os.path.expanduser(self.config.get('zuul',
604 'state_dir'))
605 else:
606 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700607 return os.path.join(state_dir, 'queue.pickle')
608
609 def _save_queue(self):
610 pickle_file = self._get_queue_pickle_file()
611 events = []
612 while not self.trigger_event_queue.empty():
613 events.append(self.trigger_event_queue.get())
614 self.log.debug("Queue length is %s" % len(events))
615 if events:
616 self.log.debug("Saving queue")
617 pickle.dump(events, open(pickle_file, 'wb'))
618
619 def _load_queue(self):
620 pickle_file = self._get_queue_pickle_file()
621 if os.path.exists(pickle_file):
622 self.log.debug("Loading queue")
623 events = pickle.load(open(pickle_file, 'rb'))
624 self.log.debug("Queue length is %s" % len(events))
625 for event in events:
626 self.trigger_event_queue.put(event)
627 else:
628 self.log.debug("No queue file found")
629
630 def _delete_queue(self):
631 pickle_file = self._get_queue_pickle_file()
632 if os.path.exists(pickle_file):
633 self.log.debug("Deleting saved queue")
634 os.unlink(pickle_file)
635
636 def resume(self):
637 try:
638 self._load_queue()
639 except:
640 self.log.exception("Unable to load queue")
641 try:
642 self._delete_queue()
643 except:
644 self.log.exception("Unable to delete saved queue")
645 self.log.debug("Resuming queue processing")
646 self.wake_event.set()
647
648 def _doPauseEvent(self):
649 if self._exit:
650 self.log.debug("Exiting")
651 self._save_queue()
652 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700653
def _doReconfigureEvent(self, event):
    """Parse a new layout and move enqueued items onto its pipelines.

    This is called in the scheduler loop after another thread submits
    a request.  The whole operation runs while holding layout_lock.

    For every pipeline that exists in both the old and the new layout,
    each queued item is detached from its old neighbours and pipeline,
    pointed at the new project and job objects, and re-enqueued through
    the new pipeline's manager.  Builds whose job no longer exists, and
    all builds of items that could not be re-enqueued, are canceled.
    Afterwards the new layout is installed, the trigger cache is
    maintained, triggers get postConfig(), and (when statsd is
    available) the per-pipeline item counts are reported.

    :param event: the reconfigure event; its config attribute becomes
        the scheduler's config.
    """
    self.layout_lock.acquire()
    try:
        # Assigned inside the try so the lock is released even if
        # reading the event fails.
        self.config = event.config
        self.log.debug("Performing reconfiguration")
        layout = self._parseConfig(
            self.config.get('zuul', 'layout_config'))
        for name, new_pipeline in layout.pipelines.items():
            old_pipeline = self.layout.pipelines.get(name)
            if not old_pipeline:
                if self.layout.pipelines:
                    # Don't emit this warning on startup
                    self.log.warning("No old pipeline matching %s found "
                                     "when reconfiguring" % name)
                continue
            self.log.debug("Re-enqueueing changes for pipeline %s" % name)
            items_to_remove = []
            # (build, owning item) pairs: keeping the item next to the
            # build lets a failed cancel be logged against the change
            # the build actually belongs to.
            builds_to_remove = []
            for shared_queue in old_pipeline.queues:
                for item in shared_queue.queue:
                    item.item_ahead = None
                    item.items_behind = []
                    item.pipeline = None
                    project = layout.projects.get(item.change.project.name)
                    if not project:
                        self.log.warning("Unable to find project for "
                                         "change %s while reenqueueing" %
                                         item.change)
                        item.change.project = None
                        items_to_remove.append(item)
                        continue
                    item.change.project = project
                    for build in item.current_build_set.getBuilds():
                        job = layout.jobs.get(build.job.name)
                        if job:
                            build.job = job
                        else:
                            builds_to_remove.append((build, item))
                    if not new_pipeline.manager.reEnqueueItem(item):
                        items_to_remove.append(item)
            for item in items_to_remove:
                for build in item.current_build_set.getBuilds():
                    builds_to_remove.append((build, item))
            for build, owner in builds_to_remove:
                self.log.warning(
                    "Canceling build %s during reconfiguration" % (build,))
                try:
                    self.launcher.cancel(build)
                except Exception:
                    self.log.exception(
                        "Exception while canceling build %s "
                        "for change %s" % (build, owner.change))
        self.layout = layout
        self.maintainTriggerCache()
        for trigger in self.triggers.values():
            trigger.postConfig()
        if statsd:
            try:
                for pipeline in self.layout.pipelines.values():
                    items = len(pipeline.getAllItems())
                    # stats.gauges.zuul.pipeline.NAME.current_changes
                    key = 'zuul.pipeline.%s' % pipeline.name
                    statsd.gauge(key + '.current_changes', items)
            except Exception:
                self.log.exception("Exception reporting initial "
                                   "pipeline stats:")
    finally:
        self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700724
James E. Blair36658cf2013-12-06 17:53:48 -0800725 def _doPromoteEvent(self, event):
726 pipeline = self.layout.pipelines[event.pipeline_name]
727 change_ids = [c.split(',') for c in event.change_ids]
728 items_to_enqueue = []
729 change_queue = None
730 for shared_queue in pipeline.queues:
731 if change_queue:
732 break
733 for item in shared_queue.queue:
734 if (item.change.number == change_ids[0][0] and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000735 item.change.patchset == change_ids[0][1]):
James E. Blair36658cf2013-12-06 17:53:48 -0800736 change_queue = shared_queue
737 break
738 if not change_queue:
739 raise Exception("Unable to find shared change queue for %s" %
740 event.change_ids[0])
741 for number, patchset in change_ids:
742 found = False
743 for item in change_queue.queue:
744 if (item.change.number == number and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000745 item.change.patchset == patchset):
James E. Blair36658cf2013-12-06 17:53:48 -0800746 found = True
747 items_to_enqueue.append(item)
748 break
749 if not found:
750 raise Exception("Unable to find %s,%s in queue %s" %
751 (number, patchset, change_queue))
752 for item in change_queue.queue[:]:
753 if item not in items_to_enqueue:
754 items_to_enqueue.append(item)
755 pipeline.manager.cancelJobs(item)
756 pipeline.manager.dequeueItem(item)
757 for item in items_to_enqueue:
Sean Daguef39b9ca2014-01-10 21:34:35 -0500758 pipeline.manager.addChange(
759 item.change,
760 enqueue_time=item.enqueue_time,
James E. Blairf9ab8842014-07-10 13:12:07 -0700761 quiet=True,
762 ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800763
James E. Blaird27a96d2014-07-10 13:25:13 -0700764 def _doEnqueueEvent(self, event):
765 project = self.layout.projects.get(event.project_name)
766 pipeline = self.layout.pipelines[event.forced_pipeline]
James E. Blairc0dedf82014-08-06 09:37:52 -0700767 change = pipeline.source.getChange(event, project)
James E. Blaird27a96d2014-07-10 13:25:13 -0700768 self.log.debug("Event %s for change %s was directly assigned "
769 "to pipeline %s" % (event, change, self))
770 self.log.info("Adding %s, %s to %s" %
771 (project, change, pipeline))
772 pipeline.manager.addChange(change, ignore_requirements=True)
773
James E. Blaire9d45c32012-05-31 09:56:45 -0700774 def _areAllBuildsComplete(self):
775 self.log.debug("Checking if all builds are complete")
776 waiting = False
James E. Blair4076e2b2014-01-28 12:42:20 -0800777 if self.merger.areMergesOutstanding():
778 waiting = True
James E. Blaireff88162013-07-01 12:44:14 -0400779 for pipeline in self.layout.pipelines.values():
James E. Blair6b077942014-02-07 17:45:55 -0800780 for item in pipeline.getAllItems():
781 for build in item.current_build_set.getBuilds():
782 if build.result is None:
783 self.log.debug("%s waiting on %s" %
784 (pipeline.manager, build))
785 waiting = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700786 if not waiting:
787 self.log.debug("All builds are complete")
788 return True
789 self.log.debug("All builds are not complete")
790 return False
791
def run(self):
    """Scheduler main loop: sleep until woken, then process all queues.

    Each wake-up, while holding run_handler_lock, drains the management
    queue, then the result queue, then (unless paused) the trigger
    queue; runs the pause handler once paused and all builds are
    complete; and finally lets every pipeline manager process its queue
    until it reports no further progress.  Returns once _stopped is set.
    """
    if statsd:
        self.log.debug("Statsd enabled")
    else:
        self.log.debug("Statsd disabled because python statsd "
                       "package not found")

    def drain(event_queue, handler):
        # Call handler repeatedly until the queue reports empty.
        while not event_queue.empty():
            handler()

    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            self.log.debug("Run handler stopping")
            return
        self.log.debug("Run handler awake")
        self.run_handler_lock.acquire()
        try:
            drain(self.management_event_queue,
                  self.process_management_queue)

            # Give result events priority -- they let us stop builds,
            # whereas trigger events cause us to launch builds.
            drain(self.result_event_queue, self.process_result_queue)

            if not self._pause:
                drain(self.trigger_event_queue, self.process_event_queue)

            if self._pause and self._areAllBuildsComplete():
                self._doPauseEvent()

            for pipeline in self.layout.pipelines.values():
                while pipeline.manager.processQueue():
                    pass

        except Exception:
            self.log.exception("Exception in run handler:")
            # There may still be more events to process
            self.wake_event.set()
        finally:
            self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700833
def maintainTriggerCache(self):
    """Pass the set of still-relevant changes to every trigger.

    The set holds the change of each item in every pipeline of the
    current layout plus whatever that change's getRelatedChanges()
    returns.
    """
    keep = set()
    for pipeline in self.layout.pipelines.values():
        self.log.debug("Start maintain trigger cache for: %s" % pipeline)
        for item in pipeline.getAllItems():
            change = item.change
            keep.add(change)
            keep.update(change.getRelatedChanges())
        self.log.debug("End maintain trigger cache for: %s" % pipeline)
    self.log.debug("Trigger cache size: %s" % len(keep))
    for trigger in self.triggers.values():
        trigger.maintainCache(keep)
James E. Blair0e933c52013-07-11 10:18:52 -0700845
def process_event_queue(self):
    """Take one trigger event off the queue and offer it to each pipeline.

    Events for projects missing from the layout are dropped.  For every
    pipeline the change is resolved through the pipeline's source;
    'patchset-created' events first remove old versions of the change,
    'change-abandoned' events remove the change, and the change is then
    added when the pipeline manager says the event matches.  The queue
    entry is marked done in all cases.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    self.log.debug("Processing trigger event %s" % event)
    try:
        project = self.layout.projects.get(event.project_name)
        if not project:
            self.log.debug("Project %s not found" % event.project_name)
            return

        for pipeline in self.layout.pipelines.values():
            manager = pipeline.manager
            change = pipeline.source.getChange(event, project)
            if event.type == 'patchset-created':
                manager.removeOldVersionsOfChange(change)
            elif event.type == 'change-abandoned':
                manager.removeAbandonedChange(change)
            if not manager.eventMatches(event, change):
                continue
            self.log.info("Adding %s, %s to %s" %
                          (project, change, pipeline))
            manager.addChange(change)
    finally:
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700868
def process_management_queue(self):
    """Take one management event off the queue and dispatch it.

    Reconfigure, promote and enqueue events go to their handlers; any
    other event is logged as an error.  event.done() is called when the
    dispatch finishes without raising; otherwise the exception and its
    traceback are handed to event.exception().
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        try:
            if isinstance(event, ReconfigureEvent):
                self._doReconfigureEvent(event)
            elif isinstance(event, PromoteEvent):
                self._doPromoteEvent(event)
            elif isinstance(event, EnqueueEvent):
                self._doEnqueueEvent(event.trigger_event)
            else:
                self.log.error("Unable to handle event %s" % event)
            event.done()
        except Exception as e:
            event.exception(e, sys.exc_info()[2])
    finally:
        # Always balance the get() above, even if reporting the failure
        # to the event itself raises; the other queue processors use the
        # same pattern.
        self.management_event_queue.task_done()
886
def process_result_queue(self):
    """Take one result event off the queue and dispatch it by type.

    Build-started, build-completed and merge-completed events go to
    their handlers; anything else is logged as an error.  The queue
    entry is marked done in all cases.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    self.log.debug("Processing result event %s" % event)
    try:
        dispatch = (
            (BuildStartedEvent, self._doBuildStartedEvent),
            (BuildCompletedEvent, self._doBuildCompletedEvent),
            (MergeCompletedEvent, self._doMergeCompletedEvent),
        )
        for event_class, handler in dispatch:
            if isinstance(event, event_class):
                handler(event)
                break
        else:
            self.log.error("Unable to handle event %s" % event)
    finally:
        self.result_event_queue.task_done()
902
903 def _doBuildStartedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800904 build = event.build
905 if build.build_set is not build.build_set.item.current_build_set:
906 self.log.warning("Build %s is not in the current build set" %
907 (build,))
908 return
909 pipeline = build.build_set.item.pipeline
910 if not pipeline:
911 self.log.warning("Build %s is not associated with a pipeline" %
912 (build,))
913 return
914 pipeline.manager.onBuildStarted(event.build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800915
916 def _doBuildCompletedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800917 build = event.build
918 if build.build_set is not build.build_set.item.current_build_set:
919 self.log.warning("Build %s is not in the current build set" %
920 (build,))
921 return
922 pipeline = build.build_set.item.pipeline
923 if not pipeline:
924 self.log.warning("Build %s is not associated with a pipeline" %
925 (build,))
926 return
927 pipeline.manager.onBuildCompleted(event.build)
928
929 def _doMergeCompletedEvent(self, event):
930 build_set = event.build_set
931 if build_set is not build_set.item.current_build_set:
932 self.log.warning("Build set %s is not current" % (build_set,))
933 return
934 pipeline = build_set.item.pipeline
935 if not pipeline:
936 self.log.warning("Build set %s is not associated with a pipeline" %
937 (build_set,))
938 return
939 pipeline.manager.onMergeCompleted(event)
James E. Blairee743612012-05-29 14:49:32 -0700940
def formatStatusJSON(self):
    """Return the scheduler status as a JSON string.

    The document holds the zuul version, an HTML 'message' while the
    scheduler is paused, the lengths of the trigger and result event
    queues, 'last_reconfigured' (the stored value multiplied by 1000)
    when set, and one entry per pipeline as produced by the pipeline's
    own formatStatusJSON().
    """
    data = {'zuul_version': self.zuul_version}

    if self._pause:
        pieces = ['<p><b>Queue only mode:</b> preparing to ']
        if self._exit:
            pieces.append('exit')
        pieces.append(', queue length: %s' %
                      self.trigger_event_queue.qsize())
        pieces.append('</p>')
        data['message'] = ''.join(pieces)

    data['trigger_event_queue'] = {
        'length': self.trigger_event_queue.qsize()}
    data['result_event_queue'] = {
        'length': self.result_event_queue.qsize()}

    if self.last_reconfigured:
        data['last_reconfigured'] = self.last_reconfigured * 1000

    data['pipelines'] = [pipeline.formatStatusJSON()
                         for pipeline in self.layout.pipelines.values()]
    return json.dumps(data)
969
James E. Blair1e8dd892012-05-30 09:15:05 -0700970
James E. Blair4aea70c2012-07-26 14:23:24 -0700971class BasePipelineManager(object):
972 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700973
James E. Blair4aea70c2012-07-26 14:23:24 -0700974 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -0700975 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -0700976 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -0700977 self.event_filters = []
James E. Blair11041d22014-05-02 14:49:53 -0700978 self.changeish_filters = []
James E. Blair3c5e5b52013-04-26 11:17:03 -0700979 if self.sched.config and self.sched.config.has_option(
980 'zuul', 'report_times'):
James E. Blair0ac6c012013-04-26 09:04:23 -0700981 self.report_times = self.sched.config.getboolean(
982 'zuul', 'report_times')
983 else:
984 self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -0700985
986 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -0700987 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700988
James E. Blaireff88162013-07-01 12:44:14 -0400989 def _postConfig(self, layout):
James E. Blair4aea70c2012-07-26 14:23:24 -0700990 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairc0dedf82014-08-06 09:37:52 -0700991 self.log.info(" Source: %s" % self.pipeline.source)
James E. Blair11041d22014-05-02 14:49:53 -0700992 self.log.info(" Requirements:")
993 for f in self.changeish_filters:
994 self.log.info(" %s" % f)
James E. Blairee743612012-05-29 14:49:32 -0700995 self.log.info(" Events:")
996 for e in self.event_filters:
997 self.log.info(" %s" % e)
998 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -0700999
James E. Blairee743612012-05-29 14:49:32 -07001000 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -07001001 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -07001002 if tree.job:
1003 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -07001004 for b in tree.job._branches:
1005 efilters += str(b)
James E. Blair70c71582013-03-06 08:50:50 -08001006 for f in tree.job._files:
1007 efilters += str(f)
Maru Newby3fe5f852015-01-13 04:22:14 +00001008 if tree.job.skip_if_matcher:
1009 efilters += str(tree.job.skip_if_matcher)
James E. Blairee743612012-05-29 14:49:32 -07001010 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -07001011 efilters = ' ' + efilters
James E. Blair222d4982012-07-16 09:31:19 -07001012 hold = ''
1013 if tree.job.hold_following_changes:
1014 hold = ' [hold]'
James E. Blair4ec821f2012-08-23 15:28:28 -07001015 voting = ''
1016 if not tree.job.voting:
1017 voting = ' [nonvoting]'
1018 self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
1019 efilters, hold, voting))
James E. Blairee743612012-05-29 14:49:32 -07001020 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -07001021 log_jobs(x, indent + 2)
1022
James E. Blaireff88162013-07-01 12:44:14 -04001023 for p in layout.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -07001024 tree = self.pipeline.getJobTree(p)
1025 if tree:
James E. Blairee743612012-05-29 14:49:32 -07001026 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -07001027 log_jobs(tree)
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001028 self.log.info(" On start:")
1029 self.log.info(" %s" % self.pipeline.start_actions)
1030 self.log.info(" On success:")
1031 self.log.info(" %s" % self.pipeline.success_actions)
1032 self.log.info(" On failure:")
1033 self.log.info(" %s" % self.pipeline.failure_actions)
Joshua Heskethb7179772014-01-30 23:30:46 +11001034 self.log.info(" On merge-failure:")
1035 self.log.info(" %s" % self.pipeline.merge_failure_actions)
James E. Blairee743612012-05-29 14:49:32 -07001036
def getSubmitAllowNeeds(self):
    """Return the set of review labels this queue may leave "needed".

    Get a list of code review labels that are allowed to be "needed"
    in the submit records for a change, with respect to this queue.
    In other words, the list of review labels this queue itself is
    likely to set before submitting.  The result is the union of what
    each success action reports.
    """
    labels = set()
    for reporter in self.pipeline.success_actions:
        labels |= set(reporter.getSubmitAllowNeeds())
    return labels
James E. Blaire421a232012-07-25 16:59:21 -07001046
def eventMatches(self, event, change):
    """Return True when the event should enqueue change in this pipeline.

    An event with a forced pipeline matches only the pipeline of that
    name; otherwise the event matches as soon as one of the manager's
    event filters accepts it.
    """
    forced = event.forced_pipeline
    if forced:
        if forced != self.pipeline.name:
            return False
        self.log.debug("Event %s for change %s was directly assigned "
                       "to pipeline %s" % (event, change, self))
        return True
    for event_filter in self.event_filters:
        if not event_filter.matches(event, change):
            continue
        self.log.debug("Event %s for change %s matched %s "
                       "in pipeline %s" % (event, change, event_filter, self))
        return True
    return False
1061
def isChangeAlreadyInQueue(self, change):
    """Return True when change equals() a change already in the pipeline."""
    return any(change.equals(queued)
               for queued in self.pipeline.getChangesInQueue())
1067
def reportStart(self, change):
    """Send the "Starting <pipeline> jobs." report through the start actions.

    The configured 'status_url' (section 'zuul') is appended on its own
    line when the scheduler has a config that defines it.  Reporter
    errors are logged; exceptions are logged and not propagated.
    """
    try:
        self.log.info("Reporting start, action %s change %s" %
                      (self.pipeline.start_actions, change))
        msg = "Starting %s jobs." % self.pipeline.name
        # The scheduler may have no config; guard as __init__ does so
        # the report is still sent without the status URL.
        config = self.sched.config
        if config and config.has_option('zuul', 'status_url'):
            msg += "\n" + config.get('zuul', 'status_url')
        ret = self.sendReport(self.pipeline.start_actions,
                              change, msg)
        if ret:
            self.log.error("Reporting change start %s received: %s" %
                           (change, ret))
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit
        # are not swallowed.
        self.log.exception("Exception while reporting start:")
1082
def sendReport(self, action_reporters, change, message):
    """Sends the built message off to configured reporters.

    Calls report(change, message) on every reporter and collects the
    truthy results.  Returns that list when any reporter returned
    something truthy, None when reporters ran without such a result,
    and an empty list when there were no reporters at all.
    """
    if len(action_reporters) == 0:
        return []
    outcomes = [reporter.report(change, message)
                for reporter in action_reporters]
    report_errors = [outcome for outcome in outcomes if outcome]
    if not report_errors:
        return None
    return report_errors
1098
def isChangeReadyToBeEnqueued(self, change):
    """Base implementation: every change is ready; always returns True.

    NOTE(review): presumably a hook for subclasses to override -- confirm.
    """
    return True
1101
def enqueueChangesAhead(self, change, quiet, ignore_requirements):
    """Base implementation: nothing to enqueue ahead; always returns True.

    NOTE(review): presumably a hook for subclasses to override -- confirm.
    """
    return True
1104
def enqueueChangesBehind(self, change, quiet, ignore_requirements):
    """Base implementation: nothing to enqueue behind; always returns True.

    NOTE(review): presumably a hook for subclasses to override -- confirm.
    """
    return True
1107
def checkForChangesNeededBy(self, change):
    """Base implementation: no needed changes to check; always returns True.

    NOTE(review): presumably a hook for subclasses to override -- confirm.
    """
    return True
1110
def getFailingDependentItem(self, item):
    """Base implementation: no failing dependent item; always returns None.

    NOTE(review): presumably a hook for subclasses to override -- confirm.
    """
    return None
1113
def getDependentItems(self, item):
    """Return the items ahead of item, nearest first.

    Follows the item_ahead links until one is falsy and logs the
    resulting list of changes at info level.
    """
    ahead = []
    cursor = item.item_ahead
    while cursor:
        ahead.append(cursor)
        cursor = cursor.item_ahead
    self.log.info("Change %s depends on changes %s" %
                  (item.change,
                   [entry.change for entry in ahead]))
    return ahead
1124
def getItemForChange(self, change):
    """Return the first pipeline item whose change equals() change, or None."""
    matches = (candidate for candidate in self.pipeline.getAllItems()
               if candidate.change.equals(change))
    return next(matches, None)
1130
def findOldVersionOfChangeAlreadyInQueue(self, change):
    """Return the first queued change that change is an update of, or None."""
    superseded = (queued for queued in self.pipeline.getChangesInQueue()
                  if change.isUpdateOf(queued))
    return next(superseded, None)
1136
def removeOldVersionsOfChange(self, change):
    """Remove a queued older version of change, if the pipeline wants that.

    Does nothing unless the pipeline's dequeue_on_new_patchset is set.
    Only the first older version found is removed.
    """
    if not self.pipeline.dequeue_on_new_patchset:
        return
    stale = self.findOldVersionOfChangeAlreadyInQueue(change)
    if not stale:
        return
    self.log.debug("Change %s is a new version of %s, removing %s" %
                   (change, stale, stale))
    self.removeChange(stale)
James E. Blair2fa50962013-01-30 21:50:41 -08001145
def removeAbandonedChange(self, change):
    """Log that change was abandoned and remove it from the pipeline."""
    notice = "Change %s abandoned, removing." % change
    self.log.debug(notice)
    self.removeChange(change)
1149
def reEnqueueItem(self, item):
    """Put an existing item into this pipeline's queue for its project.

    Returns True after enqueuing the item and reporting stats, or False
    (with an error logged) when the pipeline has no queue for the
    item's project.
    """
    target_queue = self.pipeline.getQueue(item.change.project)
    if not target_queue:
        self.log.error("Unable to find change queue for project %s" %
                       item.change.project)
        return False
    self.log.debug("Re-enqueing change %s in queue %s" %
                   (item.change, target_queue))
    target_queue.enqueueItem(item)
    self.reportStats(item)
    return True
1162
def addChange(self, change, quiet=False, enqueue_time=None,
              ignore_requirements=False):
    """Try to enqueue change in this pipeline.

    :param change: the change to enqueue.
    :param quiet: when true, no start report is sent.
    :param enqueue_time: when truthy, overrides the new item's
        enqueue_time.
    :param ignore_requirements: when true, the pipeline requirement
        filters are not consulted.
    :returns: True when the change is in the queue afterwards (either
        it already was, or it has just been enqueued); False when it
        is not ready, fails a requirement, its changes ahead could not
        be enqueued, or no queue exists for its project.
    """
    self.log.debug("Considering adding change %s" % change)
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring" % change)
        return True

    if not self.isChangeReadyToBeEnqueued(change):
        self.log.debug("Change %s is not ready to be enqueued, ignoring" %
                       change)
        return False

    if not ignore_requirements:
        for f in self.changeish_filters:
            if not f.matches(change):
                self.log.debug("Change %s does not match pipeline "
                               "requirement %s" % (change, f))
                return False

    if not self.enqueueChangesAhead(change, quiet, ignore_requirements):
        self.log.debug("Failed to enqueue changes ahead of %s" % change)
        return False

    # Enqueuing the changes ahead may have enqueued this one as well.
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring" % change)
        return True

    change_queue = self.pipeline.getQueue(change.project)
    if not change_queue:
        self.log.error("Unable to find change queue for project %s" %
                       change.project)
        return False

    self.log.debug("Adding change %s to queue %s" %
                   (change, change_queue))
    if not quiet:
        if len(self.pipeline.start_actions) > 0:
            self.reportStart(change)
    item = change_queue.enqueueChange(change)
    if enqueue_time:
        item.enqueue_time = enqueue_time
    self.reportStats(item)
    self.enqueueChangesBehind(change, quiet, ignore_requirements)
    self.sched.triggers['zuul'].onChangeEnqueued(item.change,
                                                 self.pipeline)
    # Report success explicitly; this path used to fall off the end
    # and return None, which callers testing the result read as failure.
    return True
James E. Blairee743612012-05-29 14:49:32 -07001208
def dequeueItem(self, item):
    """Remove item from the pipeline queue of its change's project."""
    self.log.debug("Removing change %s from queue" % item.change)
    owning_queue = self.pipeline.getQueue(item.change.project)
    owning_queue.dequeueItem(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001213
def removeChange(self, change):
    """Cancel, dequeue and report stats for every item holding change.

    Remove a change from the queue, probably because it has been
    superseded by another change.  Items are matched with ==.
    """
    for queued in self.pipeline.getAllItems():
        if not queued.change == change:
            continue
        self.log.debug("Canceling builds behind change: %s "
                       "because it is being removed." % queued.change)
        self.cancelJobs(queued)
        self.dequeueItem(queued)
        self.reportStats(queued)
James E. Blair2fa50962013-01-30 21:50:41 -08001224
James E. Blairac2c3242014-01-24 13:38:51 -08001225 def _makeMergerItem(self, item):
1226 # Create a dictionary with all info about the item needed by
1227 # the merger.
Clark Boylan4c6566b2014-03-10 11:02:01 -07001228 number = None
1229 patchset = None
1230 oldrev = None
1231 newrev = None
1232 if hasattr(item.change, 'number'):
1233 number = item.change.number
1234 patchset = item.change.patchset
1235 elif hasattr(item.change, 'newrev'):
1236 oldrev = item.change.oldrev
1237 newrev = item.change.newrev
James E. Blairac2c3242014-01-24 13:38:51 -08001238 return dict(project=item.change.project.name,
James E. Blairc0dedf82014-08-06 09:37:52 -07001239 url=self.pipeline.source.getGitUrl(
James E. Blairac2c3242014-01-24 13:38:51 -08001240 item.change.project),
1241 merge_mode=item.change.project.merge_mode,
1242 refspec=item.change.refspec,
1243 branch=item.change.branch,
1244 ref=item.current_build_set.ref,
Clark Boylan4c6566b2014-03-10 11:02:01 -07001245 number=number,
1246 patchset=patchset,
1247 oldrev=oldrev,
1248 newrev=newrev,
James E. Blairac2c3242014-01-24 13:38:51 -08001249 )
1250
def prepareRef(self, item):
    """Start preparing the git ref for item's current build set.

    Returns True if the ref is ready, False otherwise.  When the merge
    state is neither COMPLETE nor PENDING it is set to PENDING and a
    request is handed to the scheduler's merger: a merge of the items
    ahead (furthest first) followed by this item when the change has a
    refspec and the build set has no ref yet, otherwise a repository
    update.  In that case False is returned as well.
    """
    build_set = item.current_build_set
    if build_set.merge_state == build_set.COMPLETE:
        return True
    if build_set.merge_state == build_set.PENDING:
        return False
    build_set.merge_state = build_set.PENDING
    ref = build_set.ref
    if hasattr(item.change, 'refspec') and not ref:
        self.log.debug("Preparing ref for: %s" % item.change)
        item.current_build_set.setConfiguration()
        dependent_items = self.getDependentItems(item)
        all_items = list(reversed(dependent_items)) + [item]
        # Build a real list: map() is a one-shot lazy iterator on
        # Python 3, which the merger could neither measure nor re-read.
        merger_items = [self._makeMergerItem(entry) for entry in all_items]
        self.sched.merger.mergeChanges(merger_items,
                                       item.current_build_set,
                                       self.pipeline.precedence)
    else:
        self.log.debug("Preparing update repo for: %s" % item.change)
        url = self.pipeline.source.getGitUrl(item.change.project)
        self.sched.merger.updateRepo(item.change.project.name,
                                     url, build_set,
                                     self.pipeline.precedence)
    return False
1277
1278 def _launchJobs(self, item, jobs):
1279 self.log.debug("Launching jobs for change %s" % item.change)
1280 dependent_items = self.getDependentItems(item)
1281 for job in jobs:
1282 self.log.debug("Found job %s for change %s" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001283 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001284 build = self.sched.launcher.launch(job, item,
1285 self.pipeline,
1286 dependent_items)
James E. Blairfee8d652013-06-07 08:57:52 -07001287 self.log.debug("Adding build %s of job %s to item %s" %
1288 (build, job, item))
1289 item.addBuild(build)
James E. Blairee743612012-05-29 14:49:32 -07001290 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001291 self.log.exception("Exception while launching job %s "
James E. Blairfee8d652013-06-07 08:57:52 -07001292 "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001293
def launchJobs(self, item):
    """Launch whatever jobs the pipeline says should run for item now."""
    runnable = self.pipeline.findJobsToRun(item)
    if not runnable:
        return
    self._launchJobs(item, runnable)
1298
def cancelJobs(self, item, prime=True):
    """Cancel the builds of item and, recursively, of the items behind it.

    :param item: the queue item whose builds are canceled.
    :param prime: when true and the item's current build set has a
        ref, the item's builds are reset first; the builds of the
        build set that was current on entry are the ones canceled.
    :returns: True when at least one build was canceled, for this item
        or for any item behind it; False otherwise.
    """
    self.log.debug("Cancel jobs for change %s" % item.change)
    canceled = False
    # Keep a handle on the build set before a possible reset.
    old_build_set = item.current_build_set
    if prime and item.current_build_set.ref:
        item.resetAllBuilds()
    for build in old_build_set.getBuilds():
        try:
            self.sched.launcher.cancel(build)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt and
            # SystemExit are not swallowed.
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, item.change))
        # The build is marked canceled even when the launcher failed.
        build.result = 'CANCELED'
        canceled = True
    for item_behind in item.items_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s" %
                       (item_behind.change, item.change))
        if self.cancelJobs(item_behind, prime=prime):
            canceled = True
    return canceled
1319
def _processOneItem(self, item, nnfi, ready_ahead):
    """Advance one queue item and report what happened.

    :param item: the queue item to examine.
    :param nnfi: the nearest non-failing item found so far by the
        caller (None if there is none yet).
    :param ready_ahead: flag passed in by the caller; it is cleared here
        when this item is not ready and gates launching jobs.

    Returns a tuple ``(changed, nnfi, ready_ahead)`` where ``changed``
    tells whether the item was moved, dequeued or (see note below)
    had jobs launched, ``nnfi`` is updated to ``item`` when it is not
    failing and stays in the queue, and ``ready_ahead`` is the possibly
    cleared flag.
    """
    changed = False
    item_ahead = item.item_ahead
    change_queue = self.pipeline.getQueue(item.change.project)
    failing_reasons = []  # Reasons this item is failing

    if self.checkForChangesNeededBy(item.change) is not True:
        # It's not okay to enqueue this change, we should remove it.
        self.log.info("Dequeuing change %s because "
                      "it can no longer merge" % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.pipeline.setDequeuedNeedingChange(item)
        try:
            self.reportItem(item)
        except MergeFailure:
            # The item is being dropped anyway; a failed merge report
            # needs no further handling here.
            pass
        return (True, nnfi, ready_ahead)
    dep_item = self.getFailingDependentItem(item)
    actionable = change_queue.isActionable(item)
    item.active = actionable
    ready = False
    if dep_item:
        # A change this one needs is failing: stop its jobs but do not
        # reset its builds (prime=False).
        failing_reasons.append('a needed change is failing')
        self.cancelJobs(item, prime=False)
    else:
        item_ahead_merged = False
        # In a non-dependent queue the item ahead is always treated as
        # merged, so the re-parenting below never triggers.
        if ((item_ahead and item_ahead.change.is_merged) or
            not change_queue.dependent):
            item_ahead_merged = True
        if (item_ahead != nnfi and not item_ahead_merged):
            # Our current base is different than what we expected,
            # and it's not because our current base merged. Something
            # ahead must have failed.
            self.log.info("Resetting builds for change %s because the "
                          "item ahead, %s, is not the nearest non-failing "
                          "item, %s" % (item.change, item_ahead, nnfi))
            change_queue.moveItem(item, nnfi)
            changed = True
            self.cancelJobs(item)
        if actionable:
            ready = self.prepareRef(item)
            if item.current_build_set.unable_to_merge:
                failing_reasons.append("it has a merge conflict")
                ready = False
    if not ready:
        # This item is not ready, so the flag handed back to the caller
        # is cleared.
        ready_ahead = False
    # NOTE(review): launchJobs() as defined in this file has no return
    # statement, so unless it is overridden this condition is never
    # true -- confirm whether that is intended.
    if actionable and ready_ahead and self.launchJobs(item):
        changed = True
    if self.pipeline.didAnyJobFail(item):
        failing_reasons.append("at least one job failed")
    # Only the item at the head of the queue (nothing ahead of it) is
    # reported and dequeued once all of its jobs are complete.
    if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
        try:
            self.reportItem(item)
        except MergeFailure:
            failing_reasons.append("it did not merge")
            for item_behind in item.items_behind:
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, failed to merge" %
                              (item_behind.change, item))
                self.cancelJobs(item_behind)
        self.dequeueItem(item)
        changed = True
    elif not failing_reasons:
        # Still queued and not failing: this becomes the nearest
        # non-failing item for whatever is processed next.
        nnfi = item
    item.current_build_set.failing_reasons = failing_reasons
    if failing_reasons:
        self.log.debug("%s is a failing item because %s" %
                       (item, failing_reasons))
    return (changed, nnfi, ready_ahead)
James E. Blairfee8d652013-06-07 08:57:52 -07001390
def processQueue(self):
    """Run one processing pass over every queue of the pipeline.

    Each item is handed to ``_processOneItem`` together with the
    nearest non-failing item and the "everything ahead is ready" flag
    carried over from the items in front of it; both are reset at the
    start of each queue.  Stats are reported for every item.

    Returns True if any item in any queue changed, False otherwise.
    """
    # Do whatever needs to be done for each change in the queue
    self.log.debug("Starting queue processor: %s" % self.pipeline.name)
    changed = False
    for queue in self.pipeline.queues:
        queue_changed = False
        nnfi = None  # Nearest non-failing item
        ready_ahead = True  # All build sets ahead are ready
        # Iterate over a copy of the list, as in the original code.
        for item in queue.queue[:]:
            # The returned flag must be rebound to ``ready_ahead`` so
            # that it is actually passed on to the following items
            # (it was previously assigned to a misspelled name and
            # silently dropped).
            item_changed, nnfi, ready_ahead = self._processOneItem(
                item, nnfi, ready_ahead)
            if item_changed:
                queue_changed = True
            self.reportStats(item)
        if queue_changed:
            changed = True
            status = ''.join(item.formatStatus() for item in queue.queue)
            if status:
                self.log.debug("Queue %s status is now:\n %s" %
                               (queue.name, status))
    self.log.debug("Finished queue processor: %s (changed: %s)" %
                   (self.pipeline.name, changed))
    return changed
James E. Blairdaabed22012-08-15 15:38:57 -07001416
def updateBuildDescriptions(self, build_set):
    """Push fresh descriptions for the builds of ``build_set``.

    The builds of the previous build set, when there is one, are
    refreshed as well, after those of ``build_set`` itself.
    """
    def refresh(builds):
        # Regenerate and send the description of each build in turn.
        for build in builds:
            text = self.formatDescription(build)
            self.sched.launcher.setBuildDescription(build, text)

    refresh(build_set.getBuilds())
    if build_set.previous_build_set:
        refresh(build_set.previous_build_set.getBuilds())
1426
def onBuildStarted(self, build):
    """Handle a build start: log it and refresh build descriptions.

    Always returns True.
    """
    message = "Build %s started" % build
    self.log.debug(message)
    owning_set = build.build_set
    self.updateBuildDescriptions(owning_set)
    return True
1431
def onBuildCompleted(self, build):
    """Handle a build completion.

    Records the build's result on the pipeline for the owning item,
    logs the item's status and refreshes the build descriptions.
    Always returns True.
    """
    self.log.debug("Build %s completed" % build)
    owner = build.build_set.item
    self.pipeline.setResult(owner, build)

    status = owner.formatStatus()
    self.log.debug("Item %s status is now:\n %s" % (owner, status))
    self.updateBuildDescriptions(build.build_set)
    return True
1441
def onMergeCompleted(self, event):
    """Record the outcome of a merger operation on its build set.

    The build set is marked complete and given the event's zuul url.
    Its commit is taken from the event when the merge happened, or
    from the change's new revision when the repo was only updated.
    If the build set ends up without a commit, the item is flagged on
    the pipeline as unable to merge.
    """
    target = event.build_set
    item = target.item
    target.merge_state = target.COMPLETE
    target.zuul_url = event.zuul_url

    if event.merged:
        target.commit = event.commit
    elif event.updated:
        target.commit = item.change.newrev

    if target.commit:
        return
    self.log.info("Unable to merge change %s" % item.change)
    self.pipeline.setUnableToMerge(item)
James E. Blair4076e2b2014-01-28 12:42:20 -08001454
def reportItem(self, item):
    """Report ``item`` (once) and, for merging pipelines, verify the merge.

    When ``changes_merge`` is set, the change queue window is enlarged
    and the 'zuul' trigger is notified if all jobs succeeded and the
    source confirms the change merged; otherwise the window is shrunk
    and MergeFailure is raised.
    """
    if not item.reported:
        # _reportItem() returns True if it failed to report.
        item.reported = not self._reportItem(item)
    if not self.changes_merge:
        return

    succeeded = self.pipeline.didAllJobsSucceed(item)
    # Only ask the source about the merge if the report went through.
    merged = item.reported
    if merged:
        merged = self.pipeline.source.isMerged(item.change,
                                               item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))
    change_queue = self.pipeline.getQueue(item.change.project)

    if succeeded and merged:
        change_queue.increaseWindowSize()
        self.log.debug("%s window size increased to %s" %
                       (change_queue, change_queue.window))
        self.sched.triggers['zuul'].onChangeMerged(item.change)
        return

    self.log.debug("Reported change %s failed tests or failed "
                   "to merge" % (item.change))
    change_queue.decreaseWindowSize()
    self.log.debug("%s window size decreased to %s" %
                   (change_queue, change_queue.window))
    raise MergeFailure("Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001480
def _reportItem(self, item):
    """Send the pipeline's report for ``item``.

    The reported result and the set of actions are chosen from the
    job and merger outcome.  Returns whatever ``sendReport`` returned
    (falsy on success); returns True -- meaning error -- when there
    were no actions to run or reporting raised.
    """
    self.log.debug("Reporting change %s" % item.change)
    ret = True  # Means error as returned by trigger.report
    pipeline = self.pipeline
    if pipeline.didAllJobsSucceed(item):
        self.log.debug("success %s" % (pipeline.success_actions))
        actions, outcome = pipeline.success_actions, 'SUCCESS'
    elif pipeline.didMergerSucceed(item):
        actions, outcome = pipeline.failure_actions, 'FAILURE'
    else:
        actions, outcome = (pipeline.merge_failure_actions,
                            'MERGER_FAILURE')
    item.setReportedResult(outcome)

    if actions:
        report = self.formatReport(item)
        try:
            self.log.info("Reporting change %s, actions: %s" %
                          (item.change, actions))
            ret = self.sendReport(actions, item.change, report)
            if ret:
                self.log.error("Reporting change %s received: %s" %
                               (item.change, ret))
        except:
            self.log.exception("Exception while reporting:")
            item.setReportedResult('ERROR')
    self.updateBuildDescriptions(item.current_build_set)
    return ret
1508
def formatReport(self, item):
    """Compose the text of the report for ``item``.

    The body is one of: a fixed notice when the item was dequeued
    because a needed change failed to merge, the pipeline's merge
    failure message, or the success/failure message followed by the
    per-job lines.  The pipeline footer, if any, is appended on its
    own line.
    """
    pieces = []
    if item.dequeued_needing_change:
        pieces.append(
            'This change depends on a change that failed to merge.\n')
    elif not self.pipeline.didMergerSucceed(item):
        pieces.append(self.pipeline.merge_failure_message)
    else:
        if self.pipeline.didAllJobsSucceed(item):
            headline = self.pipeline.success_message
        else:
            headline = self.pipeline.failure_message
        pieces.append(headline + '\n\n')
        pieces.append(self._formatReportJobs(item))

    footer = self.pipeline.footer_message
    if footer:
        pieces.append('\n' + footer)

    return ''.join(pieces)
1527
def _formatReportJobs(self, item):
    """Return the list-of-jobs portion of the report for ``item``.

    One line is produced per job of the pipeline for the item's
    change, in the form ``- [name ]url : result[ in elapsed][ (non-voting)]``.
    The url comes from a job-specific success/failure pattern, else
    from the ``[zuul] url_pattern`` option, else from the build url or
    the job name.  The job name prefix is only added when the
    ``[zuul] job_name_in_report`` option is present and true.
    """
    config = self.sched.config
    # These settings are the same for every job, so read them once
    # instead of on each loop iteration.
    url_pattern = None
    if config.has_option('zuul', 'url_pattern'):
        url_pattern = config.get('zuul', 'url_pattern')
    show_job_name = (config.has_option('zuul', 'job_name_in_report') and
                     config.getboolean('zuul', 'job_name_in_report'))

    lines = []
    for job in self.pipeline.getJobs(item.change):
        build = item.current_build_set.getBuild(job.name)
        result = build.result
        pattern = url_pattern
        # Jobs may override the displayed result text and url pattern
        # per outcome.
        if result == 'SUCCESS':
            if job.success_message:
                result = job.success_message
            if job.success_pattern:
                pattern = job.success_pattern
        elif result == 'FAILURE':
            if job.failure_message:
                result = job.failure_message
            if job.failure_pattern:
                pattern = job.failure_pattern
        if pattern:
            url = pattern.format(change=item.change,
                                 pipeline=self.pipeline,
                                 job=job,
                                 build=build)
        else:
            url = build.url or job.name
        voting = '' if job.voting else ' (non-voting)'

        elapsed = ''
        if self.report_times and build.end_time and build.start_time:
            total = int(build.end_time - build.start_time)
            minutes, seconds = divmod(total, 60)
            hours, minutes = divmod(minutes, 60)
            if hours:
                elapsed = ' in %dh %02dm %02ds' % (hours, minutes, seconds)
            elif minutes:
                elapsed = ' in %dm %02ds' % (minutes, seconds)
            else:
                elapsed = ' in %ds' % (seconds)

        name = job.name + ' ' if show_job_name else ''
        lines.append('- %s%s : %s%s%s\n' % (name, url, result, elapsed,
                                            voting))
    return ''.join(lines)
1582
def formatDescription(self, build):
    """Return the HTML description to attach to ``build``.

    The description names the triggering change (or ref), then lists,
    when present: other changes tested concurrently, all builds of the
    same build set, the builds of the same job in the previous and
    next build sets, and the build set's reported result.
    """
    build_set = build.build_set

    # Keep the link template on one logical line so that no source
    # indentation ends up inside the link text.
    change_link = ('<li><a href="{change.url}">'
                   '{change.number},{change.patchset}</a></li>')
    concurrent_changes = ''.join(
        change_link.format(change=other)
        for other in build_set.other_changes)

    change = build_set.item.change

    # Use a distinct loop variable: rebinding ``build`` here would make
    # the previous/next lookups below use the job of the last build in
    # the set instead of the job of the build being described.
    concurrent_builds = ''
    for sibling in build_set.getBuilds():
        if sibling.url:
            concurrent_builds += """\
<li>
 <a href="{build.url}">
 {build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=sibling)
        else:
            concurrent_builds += """\
<li>
 {build.job.name}: {build.result}
</li>""".format(build=sibling)

    other_builds = ''
    if build_set.previous_build_set:
        other_build = build_set.previous_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
 Preceded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    if build_set.next_build_set:
        other_build = build_set.next_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
 Succeeded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    result = build_set.result

    if hasattr(change, 'number'):
        ret = """\
<p>
 Triggered by change:
 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
 Branch: <b>{change.branch}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    elif hasattr(change, 'ref'):
        # The ref is plain text: there is no opening <a>, so no closing
        # tag is emitted either.
        ret = """\
<p>
 Triggered by reference:
 {change.ref}<br/>
 Old revision: <b>{change.oldrev}</b><br/>
 New revision: <b>{change.newrev}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    else:
        ret = ""

    if concurrent_changes:
        ret += """\
<p>
 Other changes tested concurrently with this change:
 <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
 All builds for this change set:
 <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
 Other build sets for this change:
 <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
 Reported result: <b>{result}</b>
</p>
"""

    # Pass exactly the names the templates reference rather than every
    # local variable.
    return ret.format(self=self,
                      change=change,
                      concurrent_changes=concurrent_changes,
                      concurrent_builds=concurrent_builds,
                      other_builds=other_builds,
                      result=result)
1684
def reportStats(self, item):
    """Emit statsd metrics for ``item``; a no-op when statsd is unset.

    The pipeline's current-changes gauge is always updated.  Resident
    time and the total-changes counter are only emitted, for both the
    pipeline and the project, when the item has a dequeue time.  Any
    exception is logged and swallowed.
    """
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeuing.
        dt = None
        if item.dequeue_time:
            dt = int((item.dequeue_time - item.enqueue_time) * 1000)
        items = len(self.pipeline.getAllItems())

        def record_dequeue(prefix):
            # Timer and counter are skipped unless a resident time
            # was computed.
            if dt:
                statsd.timing(prefix + '.resident_time', dt)
                statsd.incr(prefix + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(key + '.current_changes', items)
        record_dequeue(key)

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        project_name = item.change.project.name.replace('/', '.')
        record_dequeue(key + '.%s' % project_name)
    except:
        self.log.exception("Exception reporting pipeline stats")
1715
James E. Blair1e8dd892012-05-30 09:15:05 -07001716
class IndependentPipelineManager(BasePipelineManager):
    """Pipeline manager that keeps all projects in one non-dependent queue."""

    log = logging.getLogger("zuul.IndependentPipelineManager")
    changes_merge = False

    def _postConfig(self, layout):
        """Create the single change queue once the layout is configured."""
        super(IndependentPipelineManager, self)._postConfig(layout)

        # Every project of the pipeline goes into the same queue, which
        # is created with dependent=False.
        shared_queue = ChangeQueue(self.pipeline, dependent=False)
        for member in self.pipeline.getProjects():
            shared_queue.addProject(member)
        self.pipeline.addQueue(shared_queue)
James E. Blairee743612012-05-29 14:49:32 -07001729
James E. Blair1e8dd892012-05-30 09:15:05 -07001730
class DependentPipelineManager(BasePipelineManager):
    """Pipeline manager whose projects share queues when they share jobs."""

    log = logging.getLogger("zuul.DependentPipelineManager")
    changes_merge = True

    def __init__(self, *args, **kwargs):
        super(DependentPipelineManager, self).__init__(*args, **kwargs)

    def _postConfig(self, layout):
        """Build the shared change queues once the layout is configured."""
        super(DependentPipelineManager, self)._postConfig(layout)
        self.buildChangeQueues()

    def buildChangeQueues(self):
        """Create one queue per project, then merge queues sharing jobs."""
        self.log.debug("Building shared change queues")
        pipeline = self.pipeline
        per_project = []
        for project in pipeline.getProjects():
            fresh = ChangeQueue(
                pipeline,
                window=pipeline.window,
                window_floor=pipeline.window_floor,
                window_increase_type=pipeline.window_increase_type,
                window_increase_factor=pipeline.window_increase_factor,
                window_decrease_type=pipeline.window_decrease_type,
                window_decrease_factor=pipeline.window_decrease_factor)
            fresh.addProject(project)
            per_project.append(fresh)
            self.log.debug("Created queue: %s" % fresh)

        # Iterate over all queues trying to combine them, and keep doing
        # so until they can not be combined further.
        previous = per_project
        while True:
            combined = self.combineChangeQueues(previous)
            if len(combined) == len(previous):
                break
            previous = combined

        self.log.info(" Shared change queues:")
        for shared in combined:
            self.pipeline.addQueue(shared)
            self.log.info(" %s containing %s" % (
                shared, shared.generated_name))

    def combineChangeQueues(self, change_queues):
        """Do one merge pass; return the resulting list of queues.

        Each queue is merged into the first already-kept queue it
        shares a job with, or kept as-is if there is none.
        """
        self.log.debug("Combining shared queues")
        kept = []
        for candidate in change_queues:
            for existing in kept:
                if not candidate.getJobs().isdisjoint(existing.getJobs()):
                    self.log.debug("Merging queue %s into %s" %
                                   (candidate, existing))
                    existing.mergeChangeQueue(candidate)
                    break
            else:
                # No kept queue shares a job with this one.
                self.log.debug("Keeping queue %s" % (candidate))
                kept.append(candidate)
        return kept

    def isChangeReadyToBeEnqueued(self, change):
        """Return True if the source says ``change`` can merge."""
        if self.pipeline.source.canMerge(change,
                                         self.getSubmitAllowNeeds()):
            return True
        self.log.debug("Change %s can not merge, ignoring" % change)
        return False

    def enqueueChangesBehind(self, change, quiet, ignore_requirements):
        """Enqueue the mergeable changes that need ``change``."""
        self.log.debug("Checking for changes needing %s:" % change)
        if not hasattr(change, 'needed_by_changes'):
            self.log.debug(" Changeish does not support dependencies")
            return

        # Collect first, enqueue afterwards.
        ready = []
        for dependent in change.needed_by_changes:
            if self.pipeline.source.canMerge(dependent,
                                             self.getSubmitAllowNeeds()):
                self.log.debug(" Change %s needs %s and is ready to merge" %
                               (dependent, change))
                ready.append(dependent)
        if not ready:
            self.log.debug(" No changes need %s" % change)

        for dependent in ready:
            self.addChange(dependent, quiet=quiet,
                           ignore_requirements=ignore_requirements)

    def enqueueChangesAhead(self, change, quiet, ignore_requirements):
        """Enqueue the change needed by ``change``, if one must go first.

        Returns the boolean verdict of ``checkForChangesNeededBy`` when
        it gives one, otherwise the result of adding the needed change.
        """
        verdict = self.checkForChangesNeededBy(change)
        if verdict in [True, False]:
            return verdict
        self.log.debug(" Change %s must be merged ahead of %s" %
                       (verdict, change))
        return self.addChange(verdict, quiet=quiet,
                              ignore_requirements=ignore_requirements)

    def checkForChangesNeededBy(self, change):
        """Decide whether ``change`` may proceed given its needed change.

        Returns True if it is okay to proceed, False if the change
        should not be enqueued, or the needed change itself when that
        change is mergeable and has to be enqueued ahead.
        """
        self.log.debug("Checking for changes needed by %s:" % change)
        if not hasattr(change, 'needs_change'):
            self.log.debug(" Changeish does not support dependencies")
            return True
        needed = change.needs_change
        if not needed:
            self.log.debug(" No changes needed")
            return True
        if needed.is_merged:
            self.log.debug(" Needed change is merged")
            return True
        if not needed.is_current_patchset:
            self.log.debug(" Needed change is not the current patchset")
            return False
        if self.isChangeAlreadyInQueue(needed):
            self.log.debug(" Needed change is already ahead in the queue")
            return True
        if self.pipeline.source.canMerge(needed,
                                         self.getSubmitAllowNeeds()):
            self.log.debug(" Change %s is needed" %
                           needed)
            return needed
        # The needed change can't be merged.
        self.log.debug(" Change %s is needed but can not be merged" %
                       needed)
        return False

    def getFailingDependentItem(self, item):
        """Return the queue item ``item`` needs if it is failing, else None."""
        if not hasattr(item.change, 'needs_change'):
            return None
        needed = item.change.needs_change
        if not needed:
            return None
        needs_item = self.getItemForChange(needed)
        if needs_item and needs_item.current_build_set.failing_reasons:
            return needs_item
        return None