blob: 2be98ea4c37fb0bcaa42f274a7122082d6558d34 [file] [log] [blame]
# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Antoine "hashar" Musso
# Copyright 2013 Wikimedia Foundation Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17
import extras
import json
import logging
import os
import pickle
import re
import six
from six.moves import queue as Queue
import sys
import threading
import time
import yaml

import layoutvalidator
import model
from model import ActionReporter, Pipeline, Project, ChangeQueue
from model import EventFilter, ChangeishFilter
from zuul import change_matcher
from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070036
# statsd is looked up through extras.try_import rather than a plain import;
# the code below guards every use with ``if statsd``, so a falsy result
# (presumably when the module is not installed -- confirm against the
# extras documentation) simply disables stats reporting.
statsd = extras.try_import('statsd.statsd')
38
James E. Blair1e8dd892012-05-30 09:15:05 -070039
def deep_format(obj, paramdict):
    """Apply the paramdict via str.format() to all string objects found within
    the supplied obj. Lists and dicts are traversed recursively.

    Dict keys are expanded as well as dict values.  Keys that cannot be
    templated (anything without a ``format`` method, such as integers)
    are kept unchanged instead of raising ``AttributeError``.

    Borrowed from Jenkins Job Builder project

    :arg obj: the (possibly nested) structure to expand; it is not modified
    :arg dict paramdict: the substitutions handed to ``format()``
    :returns: a new structure of the same shape with its strings expanded;
        objects that are not a str, list or dict are returned as-is
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        ret = {}
        for key, value in obj.items():
            # Only string-like keys can be templated; pass others through.
            if hasattr(key, 'format'):
                exp_key = key.format(**paramdict)
            else:
                exp_key = key
            ret[exp_key] = deep_format(value, paramdict)
        return ret
    return obj
60
61
class MergeFailure(Exception):
    """Exception subclass for merge failures; adds nothing to Exception."""
64
65
class ManagementEvent(object):
    """An event that should be processed within the main queue run loop

    The submitter blocks in :meth:`wait`; whoever handles the event
    releases it by calling either :meth:`done` or :meth:`exception`.
    """

    def __init__(self):
        self._wait_event = threading.Event()
        self._exception = None
        self._traceback = None

    def exception(self, e, tb):
        """Record a failure and release any waiter.

        :arg Exception e: the exception to re-raise in the waiter
        :arg traceback tb: the traceback to attach when re-raising
        """
        self._exception = e
        self._traceback = tb
        self._wait_event.set()

    def done(self):
        """Mark the event as handled and release any waiter."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event is handled or the timeout expires.

        :arg float timeout: seconds to wait, or None to wait indefinitely
        :returns: True if the event was handled, False on timeout
        :raises: the exception recorded via :meth:`exception`, carrying
            the traceback that was recorded with it
        """
        self._wait_event.wait(timeout)
        if self._exception is not None:
            # six.reraise is valid syntax on both Python 2 and 3, unlike
            # the three-argument raise statement.
            six.reraise(type(self._exception), self._exception,
                        self._traceback)
        return self._wait_event.is_set()
86
87
class ReconfigureEvent(ManagementEvent):
    """Request that the scheduler be reconfigured.

    The layout will be (re-)loaded from the path specified in the
    configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        ManagementEvent.__init__(self)
        self.config = config
97
98
class PromoteEvent(ManagementEvent):
    """Request that one or more changes be moved to the head of the queue.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, pipeline_name, change_ids):
        ManagementEvent.__init__(self)
        self.change_ids = change_ids
        self.pipeline_name = pipeline_name
111
112
class EnqueueEvent(ManagementEvent):
    """Request that a change be enqueued into a pipeline.

    :arg TriggerEvent trigger_event: a TriggerEvent describing the
        trigger, pipeline, and change to enqueue
    """

    def __init__(self, trigger_event):
        ManagementEvent.__init__(self)
        self.trigger_event = trigger_event
123
124
class ResultEvent(object):
    """Base class for events that need to modify the pipeline state
    because of a result from an external system."""
130
131
class BuildStartedEvent(ResultEvent):
    """Result event reporting that a build has started.

    :arg Build build: The build which has started.
    """

    def __init__(self, build):
        # The event is a plain carrier for the build it refers to.
        self.build = build
140
141
class BuildCompletedEvent(ResultEvent):
    """Result event reporting that a build has completed.

    :arg Build build: The build which has completed.
    """

    def __init__(self, build):
        # The event is a plain carrier for the build it refers to.
        self.build = build
150
151
class MergeCompletedEvent(ResultEvent):
    """Result event reporting that a remote merge operation has completed.

    :arg BuildSet build_set: The build_set which is ready.
    :arg str zuul_url: The URL of the Zuul Merger.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    """

    def __init__(self, build_set, zuul_url, merged, updated, commit):
        # What the merge was for and where it ran.
        self.build_set, self.zuul_url = build_set, zuul_url
        # Outcome of the merge operation.
        self.merged, self.updated, self.commit = merged, updated, commit
168
169
def toList(item):
    """Normalize a config value to a list.

    Falsy values become an empty list, a non-empty list is returned
    unchanged (same object), and anything else is wrapped in a
    single-element list.
    """
    if item and isinstance(item, list):
        return item
    return [item] if item else []
176
177
James E. Blaire9d45c32012-05-31 09:56:45 -0700178class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700179 log = logging.getLogger("zuul.Scheduler")
180
James E. Blaire9d45c32012-05-31 09:56:45 -0700181 def __init__(self):
182 threading.Thread.__init__(self)
James E. Blair8a6f0c22013-07-01 12:31:34 -0400183 self.daemon = True
James E. Blairee743612012-05-29 14:49:32 -0700184 self.wake_event = threading.Event()
James E. Blaircdccd972013-07-01 12:10:22 -0700185 self.layout_lock = threading.Lock()
James E. Blaira84f0e42014-02-06 07:09:22 -0800186 self.run_handler_lock = threading.Lock()
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700187 self._pause = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700188 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -0700189 self._stopped = False
James E. Blairee743612012-05-29 14:49:32 -0700190 self.launcher = None
James E. Blair4076e2b2014-01-28 12:42:20 -0800191 self.merger = None
James E. Blair6c358e72013-07-29 17:06:47 -0700192 self.triggers = dict()
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000193 self.reporters = dict()
James E. Blair3c5e5b52013-04-26 11:17:03 -0700194 self.config = None
James E. Blairee743612012-05-29 14:49:32 -0700195
196 self.trigger_event_queue = Queue.Queue()
197 self.result_event_queue = Queue.Queue()
James E. Blair468c8512013-12-06 13:27:19 -0800198 self.management_event_queue = Queue.Queue()
James E. Blaireff88162013-07-01 12:44:14 -0400199 self.layout = model.Layout()
James E. Blairee743612012-05-29 14:49:32 -0700200
Jeremy Stanley98b38de2015-06-04 21:20:43 +0000201 self.zuul_version = zuul_version.version_info.release_string()
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400202 self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400203
James E. Blairb0fcae42012-07-17 11:12:10 -0700204 def stop(self):
205 self._stopped = True
206 self.wake_event.set()
207
James E. Blair47958382013-01-10 17:26:02 -0800208 def testConfig(self, config_path):
James E. Blair04948c72013-07-25 23:03:17 -0700209 return self._parseConfig(config_path)
James E. Blair47958382013-01-10 17:26:02 -0800210
Maru Newby3fe5f852015-01-13 04:22:14 +0000211 def _parseSkipIf(self, config_job):
212 cm = change_matcher
213 skip_matchers = []
214
215 for config_skip in config_job.get('skip-if', []):
216 nested_matchers = []
217
218 project_regex = config_skip.get('project')
219 if project_regex:
220 nested_matchers.append(cm.ProjectMatcher(project_regex))
221
222 branch_regex = config_skip.get('branch')
223 if branch_regex:
224 nested_matchers.append(cm.BranchMatcher(branch_regex))
225
226 file_regexes = toList(config_skip.get('all-files-match-any'))
227 if file_regexes:
228 file_matchers = [cm.FileMatcher(x) for x in file_regexes]
229 all_files_matcher = cm.MatchAllFiles(file_matchers)
230 nested_matchers.append(all_files_matcher)
231
232 # All patterns need to match a given skip-if predicate
233 skip_matchers.append(cm.MatchAll(nested_matchers))
234
235 if skip_matchers:
236 # Any skip-if predicate can be matched to trigger a skip
237 return cm.MatchAny(skip_matchers)
238
James E. Blaire5a847f2012-07-10 15:29:14 -0700239 def _parseConfig(self, config_path):
James E. Blaireff88162013-07-01 12:44:14 -0400240 layout = model.Layout()
241 project_templates = {}
242
James E. Blaire5a847f2012-07-10 15:29:14 -0700243 if config_path:
244 config_path = os.path.expanduser(config_path)
245 if not os.path.exists(config_path):
246 raise Exception("Unable to read layout config file at %s" %
247 config_path)
248 config_file = open(config_path)
249 data = yaml.load(config_file)
250
James E. Blair47958382013-01-10 17:26:02 -0800251 validator = layoutvalidator.LayoutValidator()
252 validator.validate(data)
253
James E. Blaireff88162013-07-01 12:44:14 -0400254 config_env = {}
James E. Blaire5a847f2012-07-10 15:29:14 -0700255 for include in data.get('includes', []):
256 if 'python-file' in include:
257 fn = include['python-file']
258 if not os.path.isabs(fn):
Antoine Musso9adc6d42014-11-14 15:37:48 +0100259 base = os.path.dirname(os.path.realpath(config_path))
James E. Blaire5a847f2012-07-10 15:29:14 -0700260 fn = os.path.join(base, fn)
261 fn = os.path.expanduser(fn)
James E. Blaireff88162013-07-01 12:44:14 -0400262 execfile(fn, config_env)
James E. Blair1e8dd892012-05-30 09:15:05 -0700263
James E. Blair4aea70c2012-07-26 14:23:24 -0700264 for conf_pipeline in data.get('pipelines', []):
265 pipeline = Pipeline(conf_pipeline['name'])
James E. Blair8dbd56a2012-12-22 10:55:10 -0800266 pipeline.description = conf_pipeline.get('description')
James E. Blairc0dedf82014-08-06 09:37:52 -0700267 # TODO(jeblair): remove backwards compatibility:
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000268 pipeline.source = self.triggers[conf_pipeline.get('source',
269 'gerrit')]
James E. Blair64ed6f22013-07-10 14:07:23 -0700270 precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
271 pipeline.precedence = precedence
James E. Blair56370192013-01-14 15:47:28 -0800272 pipeline.failure_message = conf_pipeline.get('failure-message',
273 "Build failed.")
Joshua Heskethb7179772014-01-30 23:30:46 +1100274 pipeline.merge_failure_message = conf_pipeline.get(
Jeremy Stanley1c2c3c22015-06-15 21:23:19 +0000275 'merge-failure-message', "Merge Failed.\n\nThis change or one "
276 "of its cross-repo dependencies was unable to be "
277 "automatically merged with the current state of its "
278 "repository. Please rebase the change and upload a new "
Joshua Heskethb7179772014-01-30 23:30:46 +1100279 "patchset.")
James E. Blair56370192013-01-14 15:47:28 -0800280 pipeline.success_message = conf_pipeline.get('success-message',
281 "Build succeeded.")
Joshua Hesketh3979e3e2014-03-04 11:21:10 +1100282 pipeline.footer_message = conf_pipeline.get('footer-message', "")
James E. Blair2fa50962013-01-30 21:50:41 -0800283 pipeline.dequeue_on_new_patchset = conf_pipeline.get(
James E. Blair6736beb2013-07-11 15:18:15 -0700284 'dequeue-on-new-patchset', True)
James E. Blair17dd6772015-02-09 14:45:18 -0800285 pipeline.ignore_dependencies = conf_pipeline.get(
286 'ignore-dependencies', False)
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000287
288 action_reporters = {}
Joshua Heskethb7179772014-01-30 23:30:46 +1100289 for action in ['start', 'success', 'failure', 'merge-failure']:
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000290 action_reporters[action] = []
291 if conf_pipeline.get(action):
292 for reporter_name, params \
293 in conf_pipeline.get(action).items():
294 if reporter_name in self.reporters.keys():
295 action_reporters[action].append(ActionReporter(
296 self.reporters[reporter_name], params))
297 else:
298 self.log.error('Invalid reporter name %s' %
299 reporter_name)
300 pipeline.start_actions = action_reporters['start']
301 pipeline.success_actions = action_reporters['success']
302 pipeline.failure_actions = action_reporters['failure']
Joshua Heskethb7179772014-01-30 23:30:46 +1100303 if len(action_reporters['merge-failure']) > 0:
304 pipeline.merge_failure_actions = \
305 action_reporters['merge-failure']
306 else:
307 pipeline.merge_failure_actions = action_reporters['failure']
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000308
Clark Boylan7603a372014-01-21 11:43:20 -0800309 pipeline.window = conf_pipeline.get('window', 20)
310 pipeline.window_floor = conf_pipeline.get('window-floor', 3)
311 pipeline.window_increase_type = conf_pipeline.get(
312 'window-increase-type', 'linear')
313 pipeline.window_increase_factor = conf_pipeline.get(
314 'window-increase-factor', 1)
315 pipeline.window_decrease_type = conf_pipeline.get(
316 'window-decrease-type', 'exponential')
317 pipeline.window_decrease_factor = conf_pipeline.get(
318 'window-decrease-factor', 2)
319
James E. Blair4aea70c2012-07-26 14:23:24 -0700320 manager = globals()[conf_pipeline['manager']](self, pipeline)
321 pipeline.setManager(manager)
James E. Blaireff88162013-07-01 12:44:14 -0400322 layout.pipelines[conf_pipeline['name']] = pipeline
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000323
James E. Blair11041d22014-05-02 14:49:53 -0700324 if 'require' in conf_pipeline:
325 require = conf_pipeline['require']
Clark Boylana9702ad2014-05-08 17:17:24 -0700326 f = ChangeishFilter(
327 open=require.get('open'),
328 current_patchset=require.get('current-patchset'),
329 statuses=toList(require.get('status')),
James E. Blair5bf78a32015-07-30 18:08:24 +0000330 required_approvals=toList(require.get('approval')))
James E. Blair11041d22014-05-02 14:49:53 -0700331 manager.changeish_filters.append(f)
332
James E. Blair6c358e72013-07-29 17:06:47 -0700333 # TODO: move this into triggers (may require pluggable
334 # configuration)
335 if 'gerrit' in conf_pipeline['trigger']:
James E. Blair6c358e72013-07-29 17:06:47 -0700336 for trigger in toList(conf_pipeline['trigger']['gerrit']):
337 approvals = {}
338 for approval_dict in toList(trigger.get('approval')):
339 for k, v in approval_dict.items():
340 approvals[k] = v
James E. Blair1fbfceb2014-06-23 14:42:53 -0700341 # Backwards compat for *_filter versions of these args
342 comments = toList(trigger.get('comment'))
343 if not comments:
344 comments = toList(trigger.get('comment_filter'))
345 emails = toList(trigger.get('email'))
346 if not emails:
347 emails = toList(trigger.get('email_filter'))
348 usernames = toList(trigger.get('username'))
349 if not usernames:
350 usernames = toList(trigger.get('username_filter'))
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000351 f = EventFilter(
352 trigger=self.triggers['gerrit'],
353 types=toList(trigger['event']),
354 branches=toList(trigger.get('branch')),
355 refs=toList(trigger.get('ref')),
356 event_approvals=approvals,
357 comments=comments,
358 emails=emails,
359 usernames=usernames,
James E. Blair5bf78a32015-07-30 18:08:24 +0000360 required_approvals=toList(
361 trigger.get('require-approval')
362 )
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000363 )
James E. Blair6c358e72013-07-29 17:06:47 -0700364 manager.event_filters.append(f)
James E. Blairc494d542014-08-06 09:23:52 -0700365 if 'timer' in conf_pipeline['trigger']:
James E. Blair63bb0ef2013-07-29 17:14:51 -0700366 for trigger in toList(conf_pipeline['trigger']['timer']):
James E. Blairc0dedf82014-08-06 09:37:52 -0700367 f = EventFilter(trigger=self.triggers['timer'],
368 types=['timer'],
James E. Blair63bb0ef2013-07-29 17:14:51 -0700369 timespecs=toList(trigger['time']))
370 manager.event_filters.append(f)
James E. Blairc494d542014-08-06 09:23:52 -0700371 if 'zuul' in conf_pipeline['trigger']:
372 for trigger in toList(conf_pipeline['trigger']['zuul']):
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000373 f = EventFilter(
374 trigger=self.triggers['zuul'],
375 types=toList(trigger['event']),
376 pipelines=toList(trigger.get('pipeline')),
James E. Blair5bf78a32015-07-30 18:08:24 +0000377 required_approvals=toList(
378 trigger.get('require-approval')
379 )
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000380 )
James E. Blairc494d542014-08-06 09:23:52 -0700381 manager.event_filters.append(f)
James E. Blairee743612012-05-29 14:49:32 -0700382
Antoine Musso80edd5a2013-02-13 15:37:53 +0100383 for project_template in data.get('project-templates', []):
384 # Make sure the template only contains valid pipelines
385 tpl = dict(
386 (pipe_name, project_template.get(pipe_name))
James E. Blaireff88162013-07-01 12:44:14 -0400387 for pipe_name in layout.pipelines.keys()
Antoine Musso80edd5a2013-02-13 15:37:53 +0100388 if pipe_name in project_template
389 )
James E. Blaireff88162013-07-01 12:44:14 -0400390 project_templates[project_template.get('name')] = tpl
Antoine Musso80edd5a2013-02-13 15:37:53 +0100391
James E. Blair47958382013-01-10 17:26:02 -0800392 for config_job in data.get('jobs', []):
James E. Blaireff88162013-07-01 12:44:14 -0400393 job = layout.getJob(config_job['name'])
James E. Blairb0954652012-06-01 11:32:01 -0700394 # Be careful to only set attributes explicitly present on
395 # this job, to avoid squashing attributes set by a meta-job.
James E. Blairc8a1e052014-02-25 09:29:26 -0800396 m = config_job.get('queue-name', None)
397 if m:
398 job.queue_name = m
James E. Blairb0954652012-06-01 11:32:01 -0700399 m = config_job.get('failure-message', None)
400 if m:
401 job.failure_message = m
402 m = config_job.get('success-message', None)
403 if m:
404 job.success_message = m
James E. Blair6aea36d2012-12-17 13:03:24 -0800405 m = config_job.get('failure-pattern', None)
406 if m:
407 job.failure_pattern = m
408 m = config_job.get('success-pattern', None)
409 if m:
410 job.success_pattern = m
James E. Blair222d4982012-07-16 09:31:19 -0700411 m = config_job.get('hold-following-changes', False)
412 if m:
413 job.hold_following_changes = True
James E. Blair4ec821f2012-08-23 15:28:28 -0700414 m = config_job.get('voting', None)
415 if m is not None:
416 job.voting = m
James E. Blaire5a847f2012-07-10 15:29:14 -0700417 fname = config_job.get('parameter-function', None)
418 if fname:
James E. Blaireff88162013-07-01 12:44:14 -0400419 func = config_env.get(fname, None)
James E. Blaire5a847f2012-07-10 15:29:14 -0700420 if not func:
421 raise Exception("Unable to find function %s" % fname)
422 job.parameter_function = func
James E. Blairee743612012-05-29 14:49:32 -0700423 branches = toList(config_job.get('branch'))
424 if branches:
James E. Blaire421a232012-07-25 16:59:21 -0700425 job._branches = branches
426 job.branches = [re.compile(x) for x in branches]
James E. Blair70c71582013-03-06 08:50:50 -0800427 files = toList(config_job.get('files'))
428 if files:
429 job._files = files
430 job.files = [re.compile(x) for x in files]
Maru Newby3fe5f852015-01-13 04:22:14 +0000431 skip_if_matcher = self._parseSkipIf(config_job)
432 if skip_if_matcher:
433 job.skip_if_matcher = skip_if_matcher
Joshua Hesketh36c3fa52014-01-22 11:40:52 +1100434 swift = toList(config_job.get('swift'))
435 if swift:
436 for s in swift:
437 job.swift[s['name']] = s
James E. Blairee743612012-05-29 14:49:32 -0700438
439 def add_jobs(job_tree, config_jobs):
440 for job in config_jobs:
441 if isinstance(job, list):
442 for x in job:
443 add_jobs(job_tree, x)
444 if isinstance(job, dict):
445 for parent, children in job.items():
James E. Blaireff88162013-07-01 12:44:14 -0400446 parent_tree = job_tree.addJob(layout.getJob(parent))
James E. Blairee743612012-05-29 14:49:32 -0700447 add_jobs(parent_tree, children)
448 if isinstance(job, str):
James E. Blaireff88162013-07-01 12:44:14 -0400449 job_tree.addJob(layout.getJob(job))
James E. Blairee743612012-05-29 14:49:32 -0700450
James E. Blair47958382013-01-10 17:26:02 -0800451 for config_project in data.get('projects', []):
James E. Blairee743612012-05-29 14:49:32 -0700452 project = Project(config_project['name'])
James E. Blairaea6cf62013-12-16 15:38:12 -0800453 shortname = config_project['name'].split('/')[-1]
Antoine Musso80edd5a2013-02-13 15:37:53 +0100454
James E. Blair3e98c022013-12-16 15:25:38 -0800455 # This is reversed due to the prepend operation below, so
456 # the ultimate order is templates (in order) followed by
457 # statically defined jobs.
458 for requested_template in reversed(
459 config_project.get('template', [])):
Antoine Musso80edd5a2013-02-13 15:37:53 +0100460 # Fetch the template from 'project-templates'
James E. Blaireff88162013-07-01 12:44:14 -0400461 tpl = project_templates.get(
Antoine Musso80edd5a2013-02-13 15:37:53 +0100462 requested_template.get('name'))
463 # Expand it with the project context
James E. Blairaea6cf62013-12-16 15:38:12 -0800464 requested_template['name'] = shortname
Antoine Musso80edd5a2013-02-13 15:37:53 +0100465 expanded = deep_format(tpl, requested_template)
James E. Blair3e98c022013-12-16 15:25:38 -0800466 # Finally merge the expansion with whatever has been
467 # already defined for this project. Prepend our new
468 # jobs to existing ones (which may have been
469 # statically defined or defined by other templates).
470 for pipeline in layout.pipelines.values():
471 if pipeline.name in expanded:
472 config_project.update(
473 {pipeline.name: expanded[pipeline.name] +
474 config_project.get(pipeline.name, [])})
James E. Blair12a92b12014-03-26 11:54:53 -0700475 # TODO: future enhancement -- handle the case where
476 # duplicate jobs have different children and you want all
477 # of the children to run after a single run of the
478 # parent).
Antoine Musso80edd5a2013-02-13 15:37:53 +0100479
James E. Blaireff88162013-07-01 12:44:14 -0400480 layout.projects[config_project['name']] = project
James E. Blair19deff22013-08-25 13:17:35 -0700481 mode = config_project.get('merge-mode', 'merge-resolve')
482 project.merge_mode = model.MERGER_MAP[mode]
James E. Blaireff88162013-07-01 12:44:14 -0400483 for pipeline in layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700484 if pipeline.name in config_project:
485 job_tree = pipeline.addProject(project)
486 config_jobs = config_project[pipeline.name]
James E. Blairee743612012-05-29 14:49:32 -0700487 add_jobs(job_tree, config_jobs)
James E. Blairee743612012-05-29 14:49:32 -0700488
James E. Blairb0954652012-06-01 11:32:01 -0700489 # All jobs should be defined at this point, get rid of
490 # metajobs so that getJob isn't doing anything weird.
James E. Blairc28d1b02013-07-19 11:37:06 -0700491 layout.metajobs = []
James E. Blairb0954652012-06-01 11:32:01 -0700492
James E. Blaireff88162013-07-01 12:44:14 -0400493 for pipeline in layout.pipelines.values():
494 pipeline.manager._postConfig(layout)
495
496 return layout
James E. Blairee743612012-05-29 14:49:32 -0700497
James E. Blairee743612012-05-29 14:49:32 -0700498 def setLauncher(self, launcher):
499 self.launcher = launcher
500
James E. Blair4076e2b2014-01-28 12:42:20 -0800501 def setMerger(self, merger):
502 self.merger = merger
503
James E. Blair6c358e72013-07-29 17:06:47 -0700504 def registerTrigger(self, trigger, name=None):
505 if name is None:
506 name = trigger.name
507 self.triggers[name] = trigger
James E. Blairee743612012-05-29 14:49:32 -0700508
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000509 def registerReporter(self, reporter, name=None):
510 if name is None:
511 name = reporter.name
512 self.reporters[name] = reporter
513
James E. Blaircdccd972013-07-01 12:10:22 -0700514 def getProject(self, name):
515 self.layout_lock.acquire()
516 p = None
517 try:
518 p = self.layout.projects.get(name)
519 finally:
520 self.layout_lock.release()
521 return p
522
James E. Blairee743612012-05-29 14:49:32 -0700523 def addEvent(self, event):
524 self.log.debug("Adding trigger event: %s" % event)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800525 try:
526 if statsd:
527 statsd.incr('gerrit.event.%s' % event.type)
528 except:
529 self.log.exception("Exception reporting event stats")
James E. Blairee743612012-05-29 14:49:32 -0700530 self.trigger_event_queue.put(event)
531 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800532 self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700533
James E. Blair11700c32012-07-05 17:50:05 -0700534 def onBuildStarted(self, build):
535 self.log.debug("Adding start event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800536 build.start_time = time.time()
James E. Blaira84f0e42014-02-06 07:09:22 -0800537 event = BuildStartedEvent(build)
538 self.result_event_queue.put(event)
James E. Blair11700c32012-07-05 17:50:05 -0700539 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800540 self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700541
James E. Blairf0358662015-07-20 15:19:12 -0700542 def onBuildCompleted(self, build, result):
543 self.log.debug("Adding complete event for build: %s result: %s" % (
544 build, result))
James E. Blair71e94122012-12-24 17:53:08 -0800545 build.end_time = time.time()
James E. Blairf0358662015-07-20 15:19:12 -0700546 # Note, as soon as the result is set, other threads may act
547 # upon this, even though the event hasn't been fully
548 # processed. Ensure that any other data from the event (eg,
549 # timing) is recorded before setting the result.
550 build.result = result
James E. Blair23ec1ba2013-01-04 18:06:10 -0800551 try:
James E. Blair66eeebf2013-07-27 17:44:32 -0700552 if statsd and build.pipeline:
553 jobname = build.job.name.replace('.', '_')
554 key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
555 jobname, build.result)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800556 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
557 dt = int((build.end_time - build.start_time) * 1000)
558 statsd.timing(key, dt)
559 statsd.incr(key)
James E. Blair7f4a1902013-08-24 08:20:02 -0700560 key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
561 statsd.incr(key)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800562 except:
563 self.log.exception("Exception reporting runtime stats")
James E. Blaira84f0e42014-02-06 07:09:22 -0800564 event = BuildCompletedEvent(build)
565 self.result_event_queue.put(event)
James E. Blairee743612012-05-29 14:49:32 -0700566 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800567 self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700568
James E. Blair4076e2b2014-01-28 12:42:20 -0800569 def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
570 self.log.debug("Adding merge complete event for build set: %s" %
571 build_set)
572 event = MergeCompletedEvent(build_set, zuul_url,
573 merged, updated, commit)
574 self.result_event_queue.put(event)
575 self.wake_event.set()
576
James E. Blaire9d45c32012-05-31 09:56:45 -0700577 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700578 self.log.debug("Prepare to reconfigure")
James E. Blair468c8512013-12-06 13:27:19 -0800579 event = ReconfigureEvent(config)
580 self.management_event_queue.put(event)
James E. Blaire9d45c32012-05-31 09:56:45 -0700581 self.wake_event.set()
582 self.log.debug("Waiting for reconfiguration")
James E. Blair468c8512013-12-06 13:27:19 -0800583 event.wait()
James E. Blaire9d45c32012-05-31 09:56:45 -0700584 self.log.debug("Reconfiguration complete")
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400585 self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700586
James E. Blair36658cf2013-12-06 17:53:48 -0800587 def promote(self, pipeline_name, change_ids):
588 event = PromoteEvent(pipeline_name, change_ids)
589 self.management_event_queue.put(event)
590 self.wake_event.set()
591 self.log.debug("Waiting for promotion")
592 event.wait()
593 self.log.debug("Promotion complete")
594
James E. Blaird27a96d2014-07-10 13:25:13 -0700595 def enqueue(self, trigger_event):
596 event = EnqueueEvent(trigger_event)
597 self.management_event_queue.put(event)
598 self.wake_event.set()
599 self.log.debug("Waiting for enqueue")
600 event.wait()
601 self.log.debug("Enqueue complete")
602
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700603 def exit(self):
604 self.log.debug("Prepare to exit")
605 self._pause = True
606 self._exit = True
607 self.wake_event.set()
608 self.log.debug("Waiting for exit")
609
610 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700611 if self.config.has_option('zuul', 'state_dir'):
612 state_dir = os.path.expanduser(self.config.get('zuul',
613 'state_dir'))
614 else:
615 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700616 return os.path.join(state_dir, 'queue.pickle')
617
618 def _save_queue(self):
619 pickle_file = self._get_queue_pickle_file()
620 events = []
621 while not self.trigger_event_queue.empty():
622 events.append(self.trigger_event_queue.get())
623 self.log.debug("Queue length is %s" % len(events))
624 if events:
625 self.log.debug("Saving queue")
626 pickle.dump(events, open(pickle_file, 'wb'))
627
628 def _load_queue(self):
629 pickle_file = self._get_queue_pickle_file()
630 if os.path.exists(pickle_file):
631 self.log.debug("Loading queue")
632 events = pickle.load(open(pickle_file, 'rb'))
633 self.log.debug("Queue length is %s" % len(events))
634 for event in events:
635 self.trigger_event_queue.put(event)
636 else:
637 self.log.debug("No queue file found")
638
639 def _delete_queue(self):
640 pickle_file = self._get_queue_pickle_file()
641 if os.path.exists(pickle_file):
642 self.log.debug("Deleting saved queue")
643 os.unlink(pickle_file)
644
645 def resume(self):
646 try:
647 self._load_queue()
648 except:
649 self.log.exception("Unable to load queue")
650 try:
651 self._delete_queue()
652 except:
653 self.log.exception("Unable to delete saved queue")
654 self.log.debug("Resuming queue processing")
655 self.wake_event.set()
656
657 def _doPauseEvent(self):
658 if self._exit:
659 self.log.debug("Exiting")
660 self._save_queue()
661 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700662
def _doReconfigureEvent(self, event):
    """Load a new layout and move in-flight items onto it.

    This is called in the scheduler loop after another thread submits
    a request.  While holding ``self.layout_lock`` it parses the layout
    named by the ``layout_config`` option of ``event.config``,
    re-enqueues the items of every pipeline present in both the old
    and the new layout, cancels builds that can no longer be kept, and
    then installs the new layout.
    """
    self.layout_lock.acquire()
    self.config = event.config
    try:
        self.log.debug("Performing reconfiguration")
        layout = self._parseConfig(
            self.config.get('zuul', 'layout_config'))
        for name, new_pipeline in layout.pipelines.items():
            old_pipeline = self.layout.pipelines.get(name)
            if not old_pipeline:
                if self.layout.pipelines:
                    # Don't emit this warning on startup
                    self.log.warning("No old pipeline matching %s found "
                                     "when reconfiguring" % name)
                continue
            self.log.debug("Re-enqueueing changes for pipeline %s" % name)
            items_to_remove = []
            # (build, item) pairs, so that a failure to cancel can be
            # reported against the change the build belongs to.
            builds_to_cancel = []
            last_head = None
            for shared_queue in old_pipeline.queues:
                for item in shared_queue.queue:
                    # An item with nothing ahead of it is the head of
                    # its queue; remember it for reEnqueueItem.
                    if not item.item_ahead:
                        last_head = item
                    # Detach the item from the old pipeline structure.
                    item.item_ahead = None
                    item.items_behind = []
                    item.pipeline = None
                    item.queue = None
                    project = layout.projects.get(item.change.project.name)
                    if not project:
                        self.log.warning("Unable to find project for "
                                         "change %s while reenqueueing" %
                                         item.change)
                        item.change.project = None
                        items_to_remove.append(item)
                        continue
                    item.change.project = project
                    for build in item.current_build_set.getBuilds():
                        job = layout.jobs.get(build.job.name)
                        if job:
                            build.job = job
                        else:
                            # The job is gone from the new layout.
                            item.removeBuild(build)
                            builds_to_cancel.append((build, item))
                    if not new_pipeline.manager.reEnqueueItem(item,
                                                              last_head):
                        items_to_remove.append(item)
            for item in items_to_remove:
                for build in item.current_build_set.getBuilds():
                    builds_to_cancel.append((build, item))
            for build, build_item in builds_to_cancel:
                self.log.warning(
                    "Canceling build %s during reconfiguration" % (build,))
                try:
                    self.launcher.cancel(build)
                except Exception:
                    self.log.exception(
                        "Exception while canceling build %s "
                        "for change %s" % (build, build_item.change))
        self.layout = layout
        self.maintainTriggerCache()
        for trigger in self.triggers.values():
            trigger.postConfig()
        if statsd:
            try:
                for pipeline in self.layout.pipelines.values():
                    items = len(pipeline.getAllItems())
                    # stats.gauges.zuul.pipeline.NAME.current_changes
                    key = 'zuul.pipeline.%s' % pipeline.name
                    statsd.gauge(key + '.current_changes', items)
            except Exception:
                self.log.exception("Exception reporting initial "
                                   "pipeline stats:")
    finally:
        self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700739
def _doPromoteEvent(self, event):
    """Move the requested changes to the front of their shared queue.

    ``event.change_ids`` holds "number,patchset" strings.  The shared
    queue containing the first one is located, every item in it has
    its jobs canceled and is dequeued, and all items are then added
    again with the requested changes first, followed by the remaining
    items in their previous order.  Raises Exception if the queue or
    any requested change cannot be found.
    """
    pipeline = self.layout.pipelines[event.pipeline_name]
    wanted = [cid.split(',') for cid in event.change_ids]

    # Find the first shared queue holding the first requested change.
    target_queue = None
    for shared_queue in pipeline.queues:
        if any(entry.change.number == wanted[0][0] and
               entry.change.patchset == wanted[0][1]
               for entry in shared_queue.queue):
            target_queue = shared_queue
            break
    if not target_queue:
        raise Exception("Unable to find shared change queue for %s" %
                        event.change_ids[0])

    # Collect the requested items in the requested order.
    new_order = []
    for number, patchset in wanted:
        for entry in target_queue.queue:
            if (entry.change.number == number and
                    entry.change.patchset == patchset):
                new_order.append(entry)
                break
        else:
            raise Exception("Unable to find %s,%s in queue %s" %
                            (number, patchset, target_queue))

    # Empty the queue (iterating over a copy), appending the items
    # that were not explicitly requested after the requested ones.
    manager = pipeline.manager
    for entry in target_queue.queue[:]:
        if entry not in new_order:
            new_order.append(entry)
        manager.cancelJobs(entry)
        manager.dequeueItem(entry)

    for entry in new_order:
        pipeline.manager.addChange(
            entry.change,
            enqueue_time=entry.enqueue_time,
            quiet=True,
            ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800778
def _doEnqueueEvent(self, event):
    """Add the change described by ``event`` to its forced pipeline.

    The project is looked up by ``event.project_name`` and the pipeline
    by ``event.forced_pipeline`` (a KeyError is raised if the pipeline
    does not exist).  The change is added with pipeline requirements
    ignored.
    """
    project = self.layout.projects.get(event.project_name)
    pipeline = self.layout.pipelines[event.forced_pipeline]
    change = pipeline.source.getChange(event, project)
    # Report the pipeline here, not the scheduler (``self``).
    self.log.debug("Event %s for change %s was directly assigned "
                   "to pipeline %s" % (event, change, pipeline))
    self.log.info("Adding %s, %s to %s" %
                  (project, change, pipeline))
    pipeline.manager.addChange(change, ignore_requirements=True)
788
def _areAllBuildsComplete(self):
    """Return True when no merges are outstanding and no build is running.

    A build counts as unfinished while its ``result`` is None.  Every
    unfinished build is logged, so the scan never stops early.
    """
    self.log.debug("Checking if all builds are complete")
    merges_pending = self.merger.areMergesOutstanding()
    unfinished = 0
    for pipeline in self.layout.pipelines.values():
        for item in pipeline.getAllItems():
            for build in item.current_build_set.getBuilds():
                if build.result is not None:
                    continue
                self.log.debug("%s waiting on %s" %
                               (pipeline.manager, build))
                unfinished += 1
    if merges_pending or unfinished:
        self.log.debug("All builds are not complete")
        return False
    self.log.debug("All builds are complete")
    return True
806
def run(self):
    """Main scheduler loop.

    Sleeps on ``self.wake_event``; each time it is woken it drains the
    management, result and (unless paused) trigger queues while holding
    ``self.run_handler_lock`` and then lets every pipeline manager
    process its queue.  Returns once ``self._stopped`` is set.
    """
    if not statsd:
        self.log.debug("Statsd disabled because python statsd "
                       "package not found")
    else:
        self.log.debug("Statsd enabled")

    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            self.log.debug("Run handler stopping")
            return

        self.log.debug("Run handler awake")
        self.run_handler_lock.acquire()
        try:
            while not self.management_event_queue.empty():
                self.process_management_queue()

            # Give result events priority -- they let us stop builds,
            # whereas trigger events cause us to launch builds.
            while not self.result_event_queue.empty():
                self.process_result_queue()

            if not self._pause:
                while not self.trigger_event_queue.empty():
                    self.process_event_queue()

            if self._pause and self._areAllBuildsComplete():
                self._doPauseEvent()

            # Keep calling processQueue until it reports nothing
            # further to do for that pipeline.
            for pipeline in self.layout.pipelines.values():
                while pipeline.manager.processQueue():
                    pass
        except Exception:
            self.log.exception("Exception in run handler:")
            # There may still be more events to process
            self.wake_event.set()
        finally:
            self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700848
def maintainTriggerCache(self):
    """Tell every trigger which changes are still relevant.

    The relevant set is every change currently in any pipeline plus
    whatever each change's ``getRelatedChanges()`` returns; it is
    passed to ``maintainCache`` of each configured trigger.
    """
    in_use = set()
    for pipeline in self.layout.pipelines.values():
        self.log.debug("Start maintain trigger cache for: %s" % pipeline)
        for queued in pipeline.getAllItems():
            current = queued.change
            in_use.add(current)
            in_use.update(current.getRelatedChanges())
        self.log.debug("End maintain trigger cache for: %s" % pipeline)
    self.log.debug("Trigger cache size: %s" % len(in_use))
    for trigger in self.triggers.values():
        trigger.maintainCache(in_use)
James E. Blair0e933c52013-07-11 10:18:52 -0700860
def process_event_queue(self):
    """Take one trigger event off the queue and offer it to each pipeline.

    Events for unknown projects are dropped.  For known projects each
    pipeline first removes superseded or abandoned versions of the
    change (depending on the event type) and then enqueues the change
    if the event matches.  ``task_done`` is always called.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    self.log.debug("Processing trigger event %s" % event)
    try:
        project = self.layout.projects.get(event.project_name)
        if project:
            for pipeline in self.layout.pipelines.values():
                change = pipeline.source.getChange(event, project)
                manager = pipeline.manager
                if event.type == 'patchset-created':
                    manager.removeOldVersionsOfChange(change)
                elif event.type == 'change-abandoned':
                    manager.removeAbandonedChange(change)
                if not pipeline.manager.eventMatches(event, change):
                    continue
                self.log.info("Adding %s, %s to %s" %
                              (project, change, pipeline))
                pipeline.manager.addChange(change)
        else:
            self.log.debug("Project %s not found" % event.project_name)
    finally:
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700883
def process_management_queue(self):
    """Take one management event off the queue and dispatch it.

    Reconfigure, promote and enqueue events are passed to their
    handlers; any other event is logged as an error.  On success
    ``event.done()`` is called; if an Exception is raised it is handed
    to ``event.exception`` together with its traceback.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        if isinstance(event, ReconfigureEvent):
            self._doReconfigureEvent(event)
        elif isinstance(event, PromoteEvent):
            self._doPromoteEvent(event)
        elif isinstance(event, EnqueueEvent):
            self._doEnqueueEvent(event.trigger_event)
        else:
            self.log.error("Unable to handle event %s" % event)
        event.done()
    except Exception as e:
        event.exception(e, sys.exc_info()[2])
    finally:
        # Always mark the queue entry as done, as the other queue
        # processors do, even if reporting the failure itself raises.
        self.management_event_queue.task_done()
901
def process_result_queue(self):
    """Take one result event off the queue and dispatch it by type.

    Build-started, build-completed and merge-completed events go to
    their handlers; anything else is logged as an error.  ``task_done``
    is always called.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    self.log.debug("Processing result event %s" % event)
    try:
        dispatch = (
            (BuildStartedEvent, self._doBuildStartedEvent),
            (BuildCompletedEvent, self._doBuildCompletedEvent),
            (MergeCompletedEvent, self._doMergeCompletedEvent),
        )
        for event_class, handler in dispatch:
            if isinstance(event, event_class):
                handler(event)
                break
        else:
            self.log.error("Unable to handle event %s" % event)
    finally:
        self.result_event_queue.task_done()
917
def _doBuildStartedEvent(self, event):
    # Forward a build-started event to the manager of the pipeline
    # owning the build.  Builds that are not part of their item's
    # current build set, or whose item has no pipeline, are ignored
    # with a warning.
    started = event.build
    if started.build_set is not started.build_set.item.current_build_set:
        self.log.warning("Build %s is not in the current build set" %
                         (started,))
        return
    owner = started.build_set.item.pipeline
    if owner:
        owner.manager.onBuildStarted(event.build)
        return
    self.log.warning("Build %s is not associated with a pipeline" %
                     (started,))
James E. Blaira84f0e42014-02-06 07:09:22 -0800930
def _doBuildCompletedEvent(self, event):
    # Forward a build-completed event to the manager of the pipeline
    # owning the build.  Builds that are not part of their item's
    # current build set, or whose item has no pipeline, are ignored
    # with a warning.
    finished = event.build
    if finished.build_set is not finished.build_set.item.current_build_set:
        self.log.warning("Build %s is not in the current build set" %
                         (finished,))
        return
    owner = finished.build_set.item.pipeline
    if owner:
        owner.manager.onBuildCompleted(event.build)
        return
    self.log.warning("Build %s is not associated with a pipeline" %
                     (finished,))
943
def _doMergeCompletedEvent(self, event):
    # Forward a merge-completed event to the manager of the pipeline
    # owning the build set.  Build sets that are no longer current, or
    # whose item has no pipeline, are ignored with a warning.
    merged_set = event.build_set
    if merged_set is not merged_set.item.current_build_set:
        self.log.warning("Build set %s is not current" % (merged_set,))
        return
    owner = merged_set.item.pipeline
    if owner:
        owner.manager.onMergeCompleted(event)
        return
    self.log.warning("Build set %s is not associated with a pipeline" %
                     (merged_set,))
James E. Blairee743612012-05-29 14:49:32 -0700955
def formatStatusJSON(self):
    """Return the scheduler status as a JSON string.

    The document holds the zuul version, an HTML message while paused,
    the lengths of the trigger and result event queues, the time of
    the last reconfiguration multiplied by 1000 (when set) and the
    status of each pipeline.
    """
    status = {'zuul_version': self.zuul_version}

    if self._pause:
        action = 'exit' if self._exit else ''
        status['message'] = (
            '<p><b>Queue only mode:</b> preparing to ' + action +
            ', queue length: %s' % self.trigger_event_queue.qsize() +
            '</p>')

    status['trigger_event_queue'] = {
        'length': self.trigger_event_queue.qsize()}
    status['result_event_queue'] = {
        'length': self.result_event_queue.qsize()}

    if self.last_reconfigured:
        status['last_reconfigured'] = self.last_reconfigured * 1000

    status['pipelines'] = [pipeline.formatStatusJSON()
                           for pipeline in self.layout.pipelines.values()]
    return json.dumps(status)
984
James E. Blair1e8dd892012-05-30 09:15:05 -0700985
James E. Blair4aea70c2012-07-26 14:23:24 -0700986class BasePipelineManager(object):
987 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700988
def __init__(self, sched, pipeline):
    # Bind the manager to its scheduler and pipeline and start with
    # empty event and requirement filter lists.
    self.sched = sched
    self.pipeline = pipeline
    self.event_filters = []
    self.changeish_filters = []
    # report_times comes from the [zuul] section of the scheduler
    # config when the option is present; otherwise it is True.
    config = self.sched.config
    if config and config.has_option('zuul', 'report_times'):
        self.report_times = self.sched.config.getboolean(
            'zuul', 'report_times')
    else:
        self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -07001000
def __str__(self):
    # Rendered as "<ClassName pipeline-name>".
    kind = self.__class__.__name__
    return "<%s %s>" % (kind, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -07001003
def _postConfig(self, layout):
    """Log a summary of this pipeline manager's configuration.

    The summary lists the source, the requirement and event filters,
    the job tree of every project in ``layout`` that has one in this
    pipeline, and the start/success/failure/merge-failure actions.
    """
    info = self.log.info
    info("Configured Pipeline Manager %s" % self.pipeline.name)
    info(" Source: %s" % self.pipeline.source)
    info(" Requirements:")
    for requirement in self.changeish_filters:
        info(" %s" % requirement)
    info(" Events:")
    for event_filter in self.event_filters:
        info(" %s" % event_filter)
    info(" Projects:")

    def log_jobs(tree, indent=0):
        # Log the job at this node (if any) with its filters and
        # flags, then recurse into the child trees, indenting by two.
        prefix = ' ' + ' ' * indent
        job = tree.job
        if job:
            parts = [str(b) for b in job._branches]
            parts.extend(str(f) for f in job._files)
            if job.skip_if_matcher:
                parts.append(str(job.skip_if_matcher))
            efilters = ''.join(parts)
            if efilters:
                efilters = ' ' + efilters
            hold = ' [hold]' if job.hold_following_changes else ''
            voting = '' if job.voting else ' [nonvoting]'
            self.log.info("%s%s%s%s%s" % (prefix, repr(job),
                                          efilters, hold, voting))
        for subtree in tree.job_trees:
            log_jobs(subtree, indent + 2)

    for project in layout.projects.values():
        tree = self.pipeline.getJobTree(project)
        if not tree:
            continue
        info(" %s" % project)
        log_jobs(tree)

    for title, actions in (
            (" On start:", self.pipeline.start_actions),
            (" On success:", self.pipeline.success_actions),
            (" On failure:", self.pipeline.failure_actions),
            (" On merge-failure:", self.pipeline.merge_failure_actions)):
        info(title)
        info(" %s" % actions)
James E. Blairee743612012-05-29 14:49:32 -07001051
def getSubmitAllowNeeds(self):
    # Get the set of code review labels that are allowed to be
    # "needed" in the submit records for a change, with respect to
    # this queue.  In other words, the review labels this queue itself
    # is likely to set before submitting, gathered from every success
    # action reporter.
    needs = set()
    for reporter in self.pipeline.success_actions:
        needs |= set(reporter.getSubmitAllowNeeds())
    return needs
James E. Blaire421a232012-07-25 16:59:21 -07001061
def eventMatches(self, event, change):
    """Return True if ``event`` should enqueue ``change`` in this pipeline.

    An event forced to a pipeline matches only the pipeline with that
    name; any other event matches if one of the event filters does.
    """
    forced = event.forced_pipeline
    if forced:
        if forced != self.pipeline.name:
            return False
        self.log.debug("Event %s for change %s was directly assigned "
                       "to pipeline %s" % (event, change, self))
        return True
    for candidate in self.event_filters:
        if not candidate.matches(event, change):
            continue
        self.log.debug("Event %s for change %s matched %s "
                       "in pipeline %s" % (event, change, candidate, self))
        return True
    return False
1076
def isChangeAlreadyInPipeline(self, change):
    # Checks live items in the pipeline; non-live items are ignored.
    return any(entry.live and change.equals(entry.change)
               for entry in self.pipeline.getAllItems())
1083
def isChangeAlreadyInQueue(self, change, change_queue):
    # Checks any item, live or not, in the specified change queue.
    return any(change.equals(entry.change)
               for entry in change_queue.queue)
1090
def reportStart(self, change):
    """Send the pipeline's start report for ``change``.

    The message names the pipeline and, when the scheduler config has
    a ``status_url`` option in its ``zuul`` section, is followed by
    that URL.  Errors returned by the reporters are logged; exceptions
    are logged and never propagated to the caller.
    """
    try:
        self.log.info("Reporting start, action %s change %s" %
                      (self.pipeline.start_actions, change))
        msg = "Starting %s jobs." % self.pipeline.name
        # The scheduler config may be unset (__init__ guards against
        # that too); the report must still be sent in that case.
        config = self.sched.config
        if config and config.has_option('zuul', 'status_url'):
            msg += "\n" + config.get('zuul', 'status_url')
        ret = self.sendReport(self.pipeline.start_actions,
                              change, msg)
        if ret:
            self.log.error("Reporting change start %s received: %s" %
                           (change, ret))
    except Exception:
        # Not a bare ``except``: KeyboardInterrupt and SystemExit
        # must not be swallowed here.
        self.log.exception("Exception while reporting start:")
1105
def sendReport(self, action_reporters, change, message):
    """Hand the built message to every configured reporter.

    Each reporter's ``report(change, message)`` is called in order.
    Returns the list of truthy results the reporters gave back, or
    None when there were none.
    """
    if len(action_reporters) == 0:
        return None
    outcomes = [reporter.report(change, message)
                for reporter in action_reporters]
    failures = [outcome for outcome in outcomes if outcome]
    return failures or None
1121
def isChangeReadyToBeEnqueued(self, change):
    # Base implementation: every change is considered ready.
    ready = True
    return ready
1124
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                        change_queue):
    # Base implementation: nothing is enqueued ahead of the change;
    # success is always reported.
    succeeded = True
    return succeeded
1128
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                         change_queue):
    # Base implementation: nothing is enqueued behind the change;
    # success is always reported.
    succeeded = True
    return succeeded
1132
def checkForChangesNeededBy(self, change, change_queue):
    # Base implementation: always returns True.
    satisfied = True
    return satisfied
1135
def getFailingDependentItems(self, item):
    # Base implementation: always returns None.
    failing = None
    return failing
1138
def getDependentItems(self, item):
    """Return the items ahead of ``item``, nearest first.

    The ``item_ahead`` links are followed until an item with nothing
    ahead of it is reached.
    """
    ahead = []
    cursor = item.item_ahead
    while cursor:
        ahead.append(cursor)
        cursor = cursor.item_ahead
    self.log.info("Change %s depends on changes %s" %
                  (item.change,
                   [dep.change for dep in ahead]))
    return ahead
1149
def getItemForChange(self, change):
    # Return the first item in the pipeline whose change equals
    # ``change``, or None if there is none.
    matches = (entry for entry in self.pipeline.getAllItems()
               if entry.change.equals(change))
    return next(matches, None)
1155
def findOldVersionOfChangeAlreadyInQueue(self, change):
    # Return the first live item in the pipeline whose change is
    # superseded by ``change`` (per change.isUpdateOf), or None.
    superseded = (entry for entry in self.pipeline.getAllItems()
                  if entry.live and change.isUpdateOf(entry.change))
    return next(superseded, None)
1163
def removeOldVersionsOfChange(self, change):
    # When the pipeline dequeues on new patchsets, remove the live
    # item (if any) that ``change`` is a newer version of.
    if not self.pipeline.dequeue_on_new_patchset:
        return
    superseded = self.findOldVersionOfChangeAlreadyInQueue(change)
    if not superseded:
        return
    self.log.debug("Change %s is a new version of %s, removing %s" %
                   (change, superseded.change, superseded))
    self.removeItem(superseded)
James E. Blair2fa50962013-01-30 21:50:41 -08001172
def removeAbandonedChange(self, change):
    # Remove every live item in the pipeline whose change equals the
    # abandoned ``change``.
    self.log.debug("Change %s abandoned, removing." % change)
    for entry in self.pipeline.getAllItems():
        if entry.live and entry.change.equals(change):
            self.removeItem(entry)
Antoine Mussobd86a312014-01-08 14:51:33 +01001180
def reEnqueueItem(self, item, last_head):
    """Put an existing item back into a change queue of this pipeline.

    The queue is obtained from ``getChangeQueue`` using the item's
    change and the queue of ``last_head``.  Returns True when the item
    was enqueued and False when no change queue could be found.
    """
    with self.getChangeQueue(item.change, last_head.queue) as change_queue:
        if not change_queue:
            self.log.error("Unable to find change queue for project %s" %
                           item.change.project)
            return False

        self.log.debug("Re-enqueing change %s in queue %s" %
                       (item.change, change_queue))
        change_queue.enqueueItem(item)

        # Re-set build results in case any new jobs have been
        # added to the tree.
        finished = [build
                    for build in item.current_build_set.getBuilds()
                    if build.result]
        for build in finished:
            self.pipeline.setResult(item, build)
        # Similarly, reset the item state.
        if item.current_build_set.unable_to_merge:
            self.pipeline.setUnableToMerge(item)
        if item.dequeued_needing_change:
            self.pipeline.setDequeuedNeedingChange(item)

        self.reportStats(item)
        return True
James E. Blaircdccd972013-07-01 12:10:22 -07001205
def addChange(self, change, quiet=False, enqueue_time=None,
              ignore_requirements=False, live=True,
              change_queue=None):
    """Try to enqueue ``change`` in this pipeline.

    Returns True if the change was enqueued or was already present,
    and False if it is not ready, fails a pipeline requirement, has no
    change queue, or the changes ahead of it could not be enqueued.
    Unless ``quiet`` is set, a start report is sent when the pipeline
    has start actions.
    """
    self.log.debug("Considering adding change %s" % change)

    # If we are adding a live change, check if it's a live item
    # anywhere in the pipeline.  Otherwise, we will perform the
    # duplicate check below on the specific change_queue.
    if live and self.isChangeAlreadyInPipeline(change):
        self.log.debug("Change %s is already in pipeline, "
                       "ignoring" % change)
        return True

    if not self.isChangeReadyToBeEnqueued(change):
        self.log.debug("Change %s is not ready to be enqueued, ignoring" %
                       change)
        return False

    if not ignore_requirements:
        for requirement in self.changeish_filters:
            if requirement.matches(change):
                continue
            self.log.debug("Change %s does not match pipeline "
                           "requirement %s" % (change, requirement))
            return False

    with self.getChangeQueue(change, change_queue) as target_queue:
        if not target_queue:
            self.log.debug("Unable to find change queue for "
                           "change %s in project %s" %
                           (change, change.project))
            return False

        enqueued_ahead = self.enqueueChangesAhead(
            change, quiet, ignore_requirements, target_queue)
        if not enqueued_ahead:
            self.log.debug("Failed to enqueue changes "
                           "ahead of %s" % change)
            return False

        if self.isChangeAlreadyInQueue(change, target_queue):
            self.log.debug("Change %s is already in queue, "
                           "ignoring" % change)
            return True

        self.log.debug("Adding change %s to queue %s" %
                       (change, target_queue))
        if not quiet and len(self.pipeline.start_actions) > 0:
            self.reportStart(change)
        item = target_queue.enqueueChange(change)
        if enqueue_time:
            item.enqueue_time = enqueue_time
        item.live = live
        self.reportStats(item)
        self.enqueueChangesBehind(change, quiet, ignore_requirements,
                                  target_queue)
        self.sched.triggers['zuul'].onChangeEnqueued(item.change,
                                                     self.pipeline)
    return True
1264
def dequeueItem(self, item):
    # Take the item out of the change queue it currently belongs to.
    self.log.debug("Removing change %s from queue" % item.change)
    owning_queue = item.queue
    owning_queue.dequeueItem(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001268
def removeItem(self, item):
    # Remove an item from the queue, probably because it has been
    # superseded by another change: cancel its jobs, dequeue it and
    # report stats, in that order.
    self.log.debug("Canceling builds behind change: %s "
                   "because it is being removed." % item.change)
    for step in (self.cancelJobs, self.dequeueItem, self.reportStats):
        step(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001277
def _makeMergerItem(self, item):
    # Create a dictionary with all info about the item needed by
    # the merger.  Changes with a ``number`` attribute contribute
    # number/patchset; otherwise changes with ``newrev`` contribute
    # oldrev/newrev.  The fields that do not apply stay None.
    change = item.change
    number = patchset = oldrev = newrev = None
    if hasattr(change, 'number'):
        number, patchset = change.number, change.patchset
    elif hasattr(change, 'newrev'):
        oldrev, newrev = change.oldrev, change.newrev
    project = item.change.project
    return {
        'project': project.name,
        'url': self.pipeline.source.getGitUrl(item.change.project),
        'merge_mode': item.change.project.merge_mode,
        'refspec': item.change.refspec,
        'branch': item.change.branch,
        'ref': item.current_build_set.ref,
        'number': number,
        'patchset': patchset,
        'oldrev': oldrev,
        'newrev': newrev,
    }
1303
def prepareRef(self, item):
    """Ask the merger to prepare the git state for ``item``.

    Returns True if the ref is ready, False otherwise.  When the merge
    has not been requested yet, the build set is marked PENDING and
    either a merge of the item plus everything ahead of it or a plain
    repository update is requested from the scheduler's merger; the
    method then returns False.
    """
    build_set = item.current_build_set
    if build_set.merge_state == build_set.COMPLETE:
        return True
    if build_set.merge_state == build_set.PENDING:
        return False
    build_set.merge_state = build_set.PENDING
    ref = build_set.ref
    if hasattr(item.change, 'refspec') and not ref:
        self.log.debug("Preparing ref for: %s" % item.change)
        item.current_build_set.setConfiguration()
        dependent_items = self.getDependentItems(item)
        # getDependentItems lists the nearest item first; reverse so
        # that the item furthest ahead comes first and ``item`` last.
        dependent_items.reverse()
        all_items = dependent_items + [item]
        # Build a real list: map() is lazy on Python 3 and the merger
        # would then receive an iterator instead of the items.
        merger_items = [self._makeMergerItem(i) for i in all_items]
        self.sched.merger.mergeChanges(merger_items,
                                       item.current_build_set,
                                       self.pipeline.precedence)
    else:
        self.log.debug("Preparing update repo for: %s" % item.change)
        url = self.pipeline.source.getGitUrl(item.change.project)
        self.sched.merger.updateRepo(item.change.project.name,
                                     url, build_set,
                                     self.pipeline.precedence)
    return False
1330
def _launchJobs(self, item, jobs):
    # Ask the launcher to start every job in ``jobs`` for ``item`` and
    # attach each resulting build to the item.  A failure while
    # launching (or attaching) one job is logged and does not stop the
    # remaining jobs from being launched.
    self.log.debug("Launching jobs for change %s" % item.change)
    items_ahead = self.getDependentItems(item)
    for job in jobs:
        self.log.debug("Found job %s for change %s" % (job, item.change))
        try:
            new_build = self.sched.launcher.launch(
                job, item, self.pipeline, items_ahead)
            self.log.debug("Adding build %s of job %s to item %s" %
                           (new_build, job, item))
            item.addBuild(new_build)
        except:
            failure = ("Exception while launching job %s "
                       "for change %s:" % (job, item.change))
            self.log.exception(failure)
James E. Blairee743612012-05-29 14:49:32 -07001346
def launchJobs(self, item):
    # Launch whatever jobs the pipeline reports as runnable for the
    # item; does nothing when there are none.  No value is returned.
    runnable = self.pipeline.findJobsToRun(item)
    if not runnable:
        return
    self._launchJobs(item, runnable)
1351
def cancelJobs(self, item, prime=True):
    # Cancel every build of the item's current build set, then do the
    # same recursively for all items behind it.
    #
    # When ``prime`` is true and the build set has a ref, the item's
    # builds are reset via resetAllBuilds() before canceling.
    #
    # Returns True if at least one build (of this item or of an item
    # behind it) was marked CANCELED, False otherwise.
    self.log.debug("Cancel jobs for change %s" % item.change)
    # Grab the build set before any reset so that the builds canceled
    # below are the ones that belonged to it at this point.
    build_set = item.current_build_set
    if prime and build_set.ref:
        item.resetAllBuilds()

    any_canceled = False
    for build in build_set.getBuilds():
        try:
            self.sched.launcher.cancel(build)
        except:
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, item.change))
        # The build is marked canceled even if the launcher call failed.
        build.result = 'CANCELED'
        any_canceled = True

    for follower in item.items_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s" %
                       (follower.change, item.change))
        if self.cancelJobs(follower, prime=prime):
            any_canceled = True
    return any_canceled
1372
def _processOneItem(self, item, nnfi, ready_ahead):
    # Process a single queue item during a queue-processing pass.
    #
    # item: the queue item to process.
    # nnfi: the nearest non-failing item found so far in this queue
    #     (None if there is none yet).
    # ready_ahead: whether all build sets ahead of this item are ready.
    #
    # Returns a tuple (changed, nnfi, ready_ahead): whether this call
    # altered the queue, plus the updated values to pass along when the
    # next item in the queue is processed.
    changed = False
    item_ahead = item.item_ahead
    # A non-live item ahead is treated as if nothing were ahead.
    if item_ahead and (not item_ahead.live):
        item_ahead = None
    change_queue = item.queue
    failing_reasons = []  # Reasons this item is failing

    if self.checkForChangesNeededBy(item.change, change_queue) is not True:
        # It's not okay to enqueue this change, we should remove it.
        self.log.info("Dequeuing change %s because "
                      "it can no longer merge" % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.pipeline.setDequeuedNeedingChange(item)
        if item.live:
            try:
                self.reportItem(item)
            except MergeFailure:
                # The item is already dequeued; a merge failure raised
                # while reporting is deliberately ignored here.
                pass
        return (True, nnfi, ready_ahead)
    dep_items = self.getFailingDependentItems(item)
    actionable = change_queue.isActionable(item)
    item.active = actionable
    ready = False
    if dep_items:
        failing_reasons.append('a needed change is failing')
        # prime=False: cancel the builds without resetting them.
        self.cancelJobs(item, prime=False)
    else:
        item_ahead_merged = False
        if (item_ahead and item_ahead.change.is_merged):
            item_ahead_merged = True
        if (item_ahead != nnfi and not item_ahead_merged):
            # Our current base is different than what we expected,
            # and it's not because our current base merged. Something
            # ahead must have failed.
            self.log.info("Resetting builds for change %s because the "
                          "item ahead, %s, is not the nearest non-failing "
                          "item, %s" % (item.change, item_ahead, nnfi))
            change_queue.moveItem(item, nnfi)
            changed = True
            self.cancelJobs(item)
        if actionable:
            ready = self.prepareRef(item)
            if item.current_build_set.unable_to_merge:
                failing_reasons.append("it has a merge conflict")
                ready = False
    # An item that is not ready makes everything behind it not ready.
    if not ready:
        ready_ahead = False
    # NOTE(review): launchJobs() as defined in this file has no return
    # statement, so this condition looks like it can never be true and
    # ``changed`` is never set here -- confirm whether that is intended.
    if actionable and ready_ahead and self.launchJobs(item):
        changed = True
    if self.pipeline.didAnyJobFail(item):
        failing_reasons.append("at least one job failed")
    if (not item.live) and (not item.items_behind):
        failing_reasons.append("is a non-live item with no items behind")
        self.dequeueItem(item)
        changed = True
    # Only a live item with nothing (live) ahead of it and all jobs
    # complete is reported; it is dequeued whether or not the report
    # raised MergeFailure.
    if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
            and item.live):
        try:
            self.reportItem(item)
        except MergeFailure:
            failing_reasons.append("it did not merge")
            for item_behind in item.items_behind:
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, failed to merge" %
                              (item_behind.change, item))
                self.cancelJobs(item_behind)
        self.dequeueItem(item)
        changed = True
    elif not failing_reasons and item.live:
        # This item becomes the nearest non-failing item for the items
        # processed after it.
        nnfi = item
    item.current_build_set.failing_reasons = failing_reasons
    if failing_reasons:
        self.log.debug("%s is a failing item because %s" %
                       (item, failing_reasons))
    return (changed, nnfi, ready_ahead)
James E. Blairfee8d652013-06-07 08:57:52 -07001450
def processQueue(self):
    # Do whatever needs to be done for each change in the queue.
    #
    # Every item of every queue in the pipeline is handed to
    # _processOneItem(); the nearest non-failing item and the
    # "everything ahead is ready" flag are threaded from one item to the
    # next within a queue.  Statistics are reported for every item.
    #
    # Returns True if processing changed at least one queue, False
    # otherwise.
    self.log.debug("Starting queue processor: %s" % self.pipeline.name)
    changed = False
    for queue in self.pipeline.queues:
        queue_changed = False
        nnfi = None  # Nearest non-failing item
        ready_ahead = True  # All build sets ahead are ready
        # Iterate over a copy: processing an item may dequeue it.
        for item in queue.queue[:]:
            # The updated readiness flag must be bound to ``ready_ahead``
            # (it was previously assigned to a misspelled name and thus
            # silently discarded), otherwise later items never learn
            # that an item ahead of them is not ready.
            item_changed, nnfi, ready_ahead = self._processOneItem(
                item, nnfi, ready_ahead)
            if item_changed:
                queue_changed = True
            self.reportStats(item)
        if queue_changed:
            changed = True
            status = ''.join(queued.formatStatus()
                             for queued in queue.queue)
            if status:
                self.log.debug("Queue %s status is now:\n %s" %
                               (queue.name, status))
    self.log.debug("Finished queue processor: %s (changed: %s)" %
                   (self.pipeline.name, changed))
    return changed
James E. Blairdaabed22012-08-15 15:38:57 -07001476
def updateBuildDescriptions(self, build_set):
    # Regenerate the description of every build in ``build_set`` and
    # push it to the launcher; builds of the previous build set, if
    # there is one, are refreshed afterwards in the same way.
    def refresh(builds):
        for build in builds:
            self.sched.launcher.setBuildDescription(
                build, self.formatDescription(build))

    refresh(build_set.getBuilds())
    previous = build_set.previous_build_set
    if previous:
        refresh(previous.getBuilds())
1486
def onBuildStarted(self, build):
    # React to a build having started: refresh the descriptions of the
    # builds in its build set.  Always returns True.
    message = "Build %s started" % build
    self.log.debug(message)
    self.updateBuildDescriptions(build.build_set)
    return True
1491
def onBuildCompleted(self, build):
    # React to a build having completed: record its result on the
    # pipeline for the owning item, log the item's status and refresh
    # the build descriptions.  Always returns True.
    self.log.debug("Build %s completed" % build)
    build_set = build.build_set
    item = build_set.item

    self.pipeline.setResult(item, build)
    status = item.formatStatus()
    self.log.debug("Item %s status is now:\n %s" % (item, status))
    self.updateBuildDescriptions(build_set)
    return True
1501
def onMergeCompleted(self, event):
    # Record the outcome of a merger request on the event's build set.
    #
    # The build set is marked COMPLETE and given the event's zuul_url.
    # Its commit is taken from the event when the merge succeeded, or
    # from the change's new revision when the repo was merely updated.
    # If the build set ends up without a commit, the item is flagged on
    # the pipeline as unable to merge.
    build_set = event.build_set
    item = build_set.item
    build_set.merge_state = build_set.COMPLETE
    build_set.zuul_url = event.zuul_url
    if event.merged:
        build_set.commit = event.commit
    elif event.updated:
        build_set.commit = item.change.newrev
    if build_set.commit:
        return
    self.log.info("Unable to merge change %s" % item.change)
    self.pipeline.setUnableToMerge(item)
James E. Blair4076e2b2014-01-28 12:42:20 -08001514
def reportItem(self, item):
    # Report the item (at most once) and, for managers whose changes
    # merge, adjust the queue window according to the outcome.
    #
    # Raises MergeFailure when ``changes_merge`` is set and the item
    # either did not pass all jobs or is not merged after reporting.
    if not item.reported:
        # _reportItem() returns True if it failed to report.
        failed_to_report = self._reportItem(item)
        item.reported = not failed_to_report
    if not self.changes_merge:
        return

    succeeded = self.pipeline.didAllJobsSucceed(item)
    merged = item.reported
    if merged:
        # A successful report is not enough; ask the source whether the
        # change really is merged on its branch.
        merged = self.pipeline.source.isMerged(item.change,
                                               item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))
    change_queue = item.queue
    if succeeded and merged:
        change_queue.increaseWindowSize()
        self.log.debug("%s window size increased to %s" %
                       (change_queue, change_queue.window))
        self.sched.triggers['zuul'].onChangeMerged(item.change)
        return

    self.log.debug("Reported change %s failed tests or failed "
                   "to merge" % (item.change))
    change_queue.decreaseWindowSize()
    self.log.debug("%s window size decreased to %s" %
                   (change_queue, change_queue.window))
    raise MergeFailure("Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001540
def _reportItem(self, item):
    # Choose the pipeline actions matching the item's outcome, send the
    # report through them and refresh the build descriptions.
    #
    # Returns a true value if reporting failed (or nothing was sent),
    # otherwise whatever sendReport() returned.
    self.log.debug("Reporting change %s" % item.change)
    pipeline = self.pipeline
    ret = True  # Means error as returned by trigger.report
    reported_result = None
    if not pipeline.getJobs(item):
        # We don't send empty reports with +1,
        # and the same for -1's (merge failures or transient errors)
        # as they cannot be followed by +1's
        self.log.debug("No jobs for change %s" % item.change)
        actions = []
    elif pipeline.didAllJobsSucceed(item):
        self.log.debug("success %s" % (pipeline.success_actions))
        actions = pipeline.success_actions
        reported_result = 'SUCCESS'
    elif not pipeline.didMergerSucceed(item):
        actions = pipeline.merge_failure_actions
        reported_result = 'MERGER_FAILURE'
    else:
        actions = pipeline.failure_actions
        reported_result = 'FAILURE'
    if reported_result is not None:
        item.setReportedResult(reported_result)

    if actions:
        report = self.formatReport(item)
        try:
            self.log.info("Reporting change %s, actions: %s" %
                          (item.change, actions))
            ret = self.sendReport(actions, item.change, report)
            if ret:
                self.log.error("Reporting change %s received: %s" %
                               (item.change, ret))
        except:
            # ``ret`` keeps its initial true value, i.e. "failed".
            self.log.exception("Exception while reporting:")
            item.setReportedResult('ERROR')
    self.updateBuildDescriptions(item.current_build_set)
    return ret
1574
def formatReport(self, item):
    # Build the text of the report for ``item``.
    #
    # The body is, in order of precedence: a fixed notice when the item
    # was dequeued because a needed change failed to merge; the
    # pipeline's merge-failure message when the merger did not succeed;
    # otherwise the pipeline's success or failure message followed by
    # the per-job list.  The pipeline footer, if any, is appended.
    pipeline = self.pipeline
    pieces = []

    if item.dequeued_needing_change:
        pieces.append(
            'This change depends on a change that failed to merge.\n')
    elif not pipeline.didMergerSucceed(item):
        pieces.append(pipeline.merge_failure_message)
    else:
        if pipeline.didAllJobsSucceed(item):
            headline = pipeline.success_message
        else:
            headline = pipeline.failure_message
        pieces.append(headline + '\n\n')
        pieces.append(self._formatReportJobs(item))

    if pipeline.footer_message:
        pieces.append('\n' + pipeline.footer_message)

    return ''.join(pieces)
1593
def _formatReportJobs(self, item):
    # Return the list of jobs portion of the report: one
    # "- [name ]url : result[ in <elapsed>][ (non-voting)]" line per
    # job of the item.
    config = self.sched.config
    url_pattern = None
    if config.has_option('zuul', 'url_pattern'):
        url_pattern = config.get('zuul', 'url_pattern')

    lines = []
    for job in self.pipeline.getJobs(item):
        build = item.current_build_set.getBuild(job.name)
        result = build.result
        pattern = url_pattern
        # A job may override both the displayed result text and the URL
        # pattern, separately for success and for failure.
        if result == 'SUCCESS':
            if job.success_message:
                result = job.success_message
            if job.success_pattern:
                pattern = job.success_pattern
        elif result == 'FAILURE':
            if job.failure_message:
                result = job.failure_message
            if job.failure_pattern:
                pattern = job.failure_pattern

        if pattern:
            url = pattern.format(change=item.change,
                                 pipeline=self.pipeline,
                                 job=job,
                                 build=build)
        else:
            # No pattern: fall back to the build URL, then the job name.
            url = build.url or job.name

        voting = '' if job.voting else ' (non-voting)'

        elapsed = ''
        if self.report_times and build.end_time and build.start_time:
            seconds = int(build.end_time - build.start_time)
            minutes, seconds = divmod(seconds, 60)
            hours, minutes = divmod(minutes, 60)
            if hours:
                elapsed = ' in %dh %02dm %02ds' % (hours, minutes, seconds)
            elif minutes:
                elapsed = ' in %dm %02ds' % (minutes, seconds)
            else:
                elapsed = ' in %ds' % (seconds)

        name = ''
        if config.has_option('zuul', 'job_name_in_report'):
            if config.getboolean('zuul', 'job_name_in_report'):
                name = job.name + ' '

        lines.append('- %s%s : %s%s%s\n' % (name, url, result, elapsed,
                                            voting))
    return ''.join(lines)
1648
def formatDescription(self, build):
    # Render the HTML description for ``build``.
    #
    # The description contains, when available: what triggered the
    # build (a numbered change or a ref update) and the pipeline name,
    # the other changes tested concurrently, every build of the same
    # build set, links to the builds of the same job in the previous
    # and next build sets, and the build set's reported result.
    #
    # Returns the description as a string.
    build_set = build.build_set
    change = build_set.item.change

    concurrent_changes = ''
    for other_change in build_set.other_changes:
        concurrent_changes += (
            '<li><a href="{change.url}">'
            ' {change.number},{change.patchset}</a></li>'
        ).format(change=other_change)

    # The loop variable must not be called ``build``: the lookups in the
    # previous/next build sets below have to use the job of the build
    # being described, not of whichever build happens to be listed last.
    concurrent_builds = ''
    for sibling in build_set.getBuilds():
        if sibling.url:
            concurrent_builds += """\
<li>
 <a href="{build.url}">
 {build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=sibling)
        else:
            concurrent_builds += """\
<li>
 {build.job.name}: {build.result}
</li>""".format(build=sibling)

    other_builds = ''
    if build_set.previous_build_set:
        other_build = build_set.previous_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
 Preceded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    if build_set.next_build_set:
        other_build = build_set.next_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
 Succeeded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    result = build_set.result

    if hasattr(change, 'number'):
        ret = """\
<p>
 Triggered by change:
 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
 Branch: <b>{change.branch}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    elif hasattr(change, 'ref'):
        # NOTE(review): the closing </a> after {change.ref} has no
        # matching opening tag; left untouched to keep the emitted
        # markup unchanged.
        ret = """\
<p>
 Triggered by reference:
 {change.ref}</a><br/>
 Old revision: <b>{change.oldrev}</b><br/>
 New revision: <b>{change.newrev}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    else:
        ret = ""

    if concurrent_changes:
        ret += """\
<p>
 Other changes tested concurrently with this change:
 <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
 All builds for this change set:
 <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
 Other build sets for this change:
 <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
 Reported result: <b>{result}</b>
</p>
"""

    # Only these names are referenced by the templates above; passing
    # them explicitly avoids depending on the function's local namespace.
    return ret.format(self=self,
                      change=change,
                      concurrent_changes=concurrent_changes,
                      concurrent_builds=concurrent_builds,
                      other_builds=other_builds,
                      result=result)
1750
def reportStats(self, item):
    # Send pipeline statistics for ``item`` to statsd; does nothing
    # when statsd is not available.  Any failure is logged and
    # otherwise ignored.
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeing.
        resident_ms = None
        if item.dequeue_time:
            resident_ms = int(
                (item.dequeue_time - item.enqueue_time) * 1000)
        current = len(self.pipeline.getAllItems())

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        pipeline_key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(pipeline_key + '.current_changes', current)
        if resident_ms:
            statsd.timing(pipeline_key + '.resident_time', resident_ms)
            statsd.incr(pipeline_key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        dotted_project = item.change.project.name.replace('/', '.')
        project_key = pipeline_key + ('.%s' % dotted_project)
        if resident_ms:
            statsd.timing(project_key + '.resident_time', resident_ms)
            statsd.incr(project_key + '.total_changes')
    except:
        self.log.exception("Exception reporting pipeline stats")
1781
James E. Blair1e8dd892012-05-30 09:15:05 -07001782
class DynamicChangeQueueContextManager(object):
    # Context manager around a dynamically created change queue (which
    # may be None).  Entering yields the change queue; on exit, a change
    # queue that holds no items is removed from its pipeline again.

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        change_queue = self.change_queue
        if change_queue and not change_queue.queue:
            # Remove the change queue object itself, not its (empty)
            # list of items, which the pipeline never holds.
            pipeline = change_queue.pipeline
            # Skip queues that are no longer registered (e.g. already
            # removed when their last item was dequeued).
            if change_queue in pipeline.queues:
                pipeline.removeQueue(change_queue)
1793
1794
class IndependentPipelineManager(BasePipelineManager):
    # Pipeline manager that gives every change its own, dynamically
    # created change queue; changes handled here are not expected to
    # merge (``changes_merge`` is False).
    log = logging.getLogger("zuul.IndependentPipelineManager")
    changes_merge = False

    def _postConfig(self, layout):
        # Nothing manager-specific to do; defer to the base class.
        super(IndependentPipelineManager, self)._postConfig(layout)

    def getChangeQueue(self, change, existing=None):
        # Return a context manager yielding the queue for ``change``.
        # An ``existing`` queue is reused; otherwise a new change queue
        # is created for every change whose project belongs to the
        # pipeline, and None is yielded for any other project.
        if existing:
            return DynamicChangeQueueContextManager(existing)
        if change.project not in self.pipeline.getProjects():
            return DynamicChangeQueueContextManager(None)
        new_queue = ChangeQueue(self.pipeline)
        new_queue.addProject(change.project)
        self.pipeline.addQueue(new_queue)
        return DynamicChangeQueueContextManager(new_queue)

    def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                            change_queue):
        # Enqueue the changes that ``change`` needs ahead of it.
        # Returns the boolean verdict of checkForChangesNeededBy() when
        # it gives one; otherwise True if every needed change could be
        # added and False as soon as one could not.
        needed = self.checkForChangesNeededBy(change, change_queue)
        if needed in [True, False]:
            return needed
        self.log.debug(" Changes %s must be merged ahead of %s" %
                       (needed, change))
        for needed_change in needed:
            # This differs from the dependent pipeline by enqueuing
            # changes ahead as "not live", that is, not intended to
            # have jobs run. Also, pipeline requirements are always
            # ignored (which is safe because the changes are not
            # live).
            added = self.addChange(needed_change, quiet=True,
                                   ignore_requirements=True,
                                   live=False, change_queue=change_queue)
            if not added:
                return False
        return True

    def checkForChangesNeededBy(self, change, change_queue):
        # Return True if it is okay to proceed enqueuing this change,
        # or the list of changes that must be enqueued ahead of it
        # first.
        if self.pipeline.ignore_dependencies:
            return True
        self.log.debug("Checking for changes needed by %s:" % change)
        if not hasattr(change, 'needs_changes'):
            self.log.debug(" Changeish does not support dependencies")
            return True
        if not change.needs_changes:
            self.log.debug(" No changes needed")
            return True
        still_needed = []
        for needed_change in change.needs_changes:
            self.log.debug(" Change %s needs change %s:" % (
                change, needed_change))
            if needed_change.is_merged:
                self.log.debug(" Needed change is merged")
            elif self.isChangeAlreadyInQueue(needed_change, change_queue):
                self.log.debug(" Needed change is already ahead in the queue")
            else:
                self.log.debug(" Change %s is needed" % needed_change)
                if needed_change not in still_needed:
                    still_needed.append(needed_change)
        # This differs from the dependent pipeline check in not
        # verifying that the dependent change is mergable.
        return still_needed or True

    def dequeueItem(self, item):
        super(IndependentPipelineManager, self).dequeueItem(item)
        # An independent pipeline manager dynamically removes empty
        # queues
        emptied = not item.queue.queue
        if emptied:
            self.pipeline.removeQueue(item.queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001871
James E. Blair1e8dd892012-05-30 09:15:05 -07001872
class StaticChangeQueueContextManager(object):
    # Context manager around a pre-existing change queue: entering
    # yields the queue, leaving does nothing (the queue is kept and
    # exceptions are not suppressed).

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        return None
1882
1883
James E. Blair4aea70c2012-07-26 14:23:24 -07001884class DependentPipelineManager(BasePipelineManager):
1885 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001886 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001887
def __init__(self, *args, **kwargs):
    # No manager-specific state is set up here; all arguments are
    # passed through unchanged to the base class constructor.
    base = super(DependentPipelineManager, self)
    base.__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001890
def _postConfig(self, layout):
    # Run the base class post-configuration step first, then build the
    # shared change queues for the pipeline's projects.
    base = super(DependentPipelineManager, self)
    base._postConfig(layout)
    self.buildChangeQueues()
1894
def buildChangeQueues(self):
    # Create one change queue per project of the pipeline, repeatedly
    # combine them via combineChangeQueues(), and register the
    # resulting shared queues with the pipeline.
    self.log.debug("Building shared change queues")
    pipeline = self.pipeline
    per_project_queues = []

    for project in pipeline.getProjects():
        project_queue = ChangeQueue(
            pipeline,
            window=pipeline.window,
            window_floor=pipeline.window_floor,
            window_increase_type=pipeline.window_increase_type,
            window_increase_factor=pipeline.window_increase_factor,
            window_decrease_type=pipeline.window_decrease_type,
            window_decrease_factor=pipeline.window_decrease_factor)
        project_queue.addProject(project)
        per_project_queues.append(project_queue)
        self.log.debug("Created queue: %s" % project_queue)

    # Iterate over all queues trying to combine them, and keep doing
    # so until they can not be combined further.
    current = per_project_queues
    combined = self.combineChangeQueues(current)
    while len(combined) != len(current):
        current = combined
        combined = self.combineChangeQueues(current)

    self.log.info(" Shared change queues:")
    for shared_queue in combined:
        pipeline.addQueue(shared_queue)
        self.log.info(" %s containing %s" % (
            shared_queue, shared_queue.generated_name))
James E. Blairc3d428e2013-12-03 15:06:48 -08001926
def combineChangeQueues(self, change_queues):
    # Perform one combining pass: each queue is merged into the first
    # already-kept queue it shares at least one job with; a queue that
    # shares no job with any kept queue is kept itself.
    #
    # Returns the list of kept queues, in the order they were kept.
    self.log.debug("Combining shared queues")
    kept_queues = []
    for candidate in change_queues:
        for kept in kept_queues:
            if not candidate.getJobs().isdisjoint(kept.getJobs()):
                self.log.debug("Merging queue %s into %s" %
                               (candidate, kept))
                kept.mergeChangeQueue(candidate)
                break  # merged; move on to the next candidate
        else:
            # No kept queue shares a job with this one.
            self.log.debug("Keeping queue %s" % (candidate))
            kept_queues.append(candidate)
    return kept_queues
James E. Blairee743612012-05-29 14:49:32 -07001942
def getChangeQueue(self, change, existing=None):
    # Return a context manager yielding the queue for ``change``: the
    # ``existing`` queue when one is given, otherwise the pipeline's
    # queue for the change's project.
    queue = existing
    if not queue:
        queue = self.pipeline.getQueue(change.project)
    return StaticChangeQueueContextManager(queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001948
def isChangeReadyToBeEnqueued(self, change):
    # Return True if the pipeline source says the change can merge
    # (given the allowed submit needs), False otherwise.
    allow_needs = self.getSubmitAllowNeeds()
    if self.pipeline.source.canMerge(change, allow_needs):
        return True
    self.log.debug("Change %s can not merge, ignoring" % change)
    return False
James E. Blair1e8dd892012-05-30 09:15:05 -07001955
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                         change_queue):
    """Enqueue the changes that depend on ``change``.

    Every entry of ``change.needed_by_changes`` is considered.  An entry
    is skipped when the queue returned by ``getChangeQueue`` for it
    differs from ``change_queue``.  Remaining entries that the pipeline
    source reports as mergeable are collected and then passed to
    ``addChange`` with the same ``quiet``, ``ignore_requirements`` and
    ``change_queue`` values.

    Returns None.  Changeish objects without a ``needed_by_changes``
    attribute are ignored.
    """
    self.log.debug("Checking for changes needing %s:" % change)
    if not hasattr(change, 'needed_by_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return

    dependents = []
    for dependent in change.needed_by_changes:
        with self.getChangeQueue(dependent) as dependent_queue:
            if dependent_queue != change_queue:
                self.log.debug(" Change %s in project %s can not be "
                               "enqueued in the target queue %s" %
                               (dependent, dependent.project,
                                change_queue))
                continue
        if self.pipeline.source.canMerge(dependent,
                                         self.getSubmitAllowNeeds()):
            self.log.debug(" Change %s needs %s and is ready to merge" %
                           (dependent, change))
            dependents.append(dependent)

    if not dependents:
        self.log.debug(" No changes need %s" % change)

    # Enqueue only after the whole list has been inspected.
    for dependent in dependents:
        self.addChange(dependent, quiet=quiet,
                       ignore_requirements=ignore_requirements,
                       change_queue=change_queue)
James E. Blaire0487072012-08-29 17:38:31 -07001984
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                        change_queue):
    """Enqueue the changes that ``change`` needs ahead of it.

    ``checkForChangesNeededBy`` is consulted first.  If its result is
    True or False it is returned unchanged.  Otherwise the result is
    treated as the collection of needed changes: each is passed to
    ``addChange`` in order, stopping at the first one that returns a
    false value.

    Returns False if any ``addChange`` call failed, True otherwise.
    """
    needed = self.checkForChangesNeededBy(change, change_queue)
    if needed in (True, False):
        return needed
    self.log.debug(" Changes %s must be merged ahead of %s" %
                   (needed, change))
    # all() short-circuits, so no further changes are added after the
    # first failure.
    return all(
        self.addChange(needed_change, quiet=quiet,
                       ignore_requirements=ignore_requirements,
                       change_queue=change_queue)
        for needed_change in needed)
James E. Blaire0487072012-08-29 17:38:31 -07001999
def checkForChangesNeededBy(self, change, change_queue):
    """Work out which changes must be enqueued ahead of ``change``.

    The ``change_queue`` argument is accepted but deliberately ignored:
    the queue is looked up again from ``change`` via ``getChangeQueue``.

    Returns one of:
      * True  -- okay to proceed enqueuing this change (it has no
                 dependency support, needs nothing, or everything it
                 needs is merged or already in the queue);
      * False -- the change should not be enqueued (a needed change is
                 in a different queue, is not the current patchset, or
                 can not be merged);
      * a non-empty list of the needed changes, without duplicates and
        in the order encountered, that must be enqueued first.
    """
    self.log.debug("Checking for changes needed by %s:" % change)
    if not hasattr(change, 'needs_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return True
    if not change.needs_changes:
        self.log.debug(" No changes needed")
        return True
    changes_needed = []
    # Ignore supplied change_queue
    with self.getChangeQueue(change) as change_queue:
        for needed_change in change.needs_changes:
            self.log.debug(" Change %s needs change %s:" % (
                change, needed_change))
            if needed_change.is_merged:
                self.log.debug(" Needed change is merged")
                continue
            with self.getChangeQueue(needed_change) as needed_change_queue:
                if needed_change_queue != change_queue:
                    self.log.debug(" Change %s in project %s does not "
                                   "share a change queue with %s "
                                   "in project %s" %
                                   (needed_change, needed_change.project,
                                    change, change.project))
                    return False
            if not needed_change.is_current_patchset:
                self.log.debug(" Needed change is not the "
                               "current patchset")
                return False
            if self.isChangeAlreadyInQueue(needed_change, change_queue):
                self.log.debug(" Needed change is already ahead "
                               "in the queue")
                continue
            if self.pipeline.source.canMerge(needed_change,
                                             self.getSubmitAllowNeeds()):
                self.log.debug(" Change %s is needed" % needed_change)
                # The membership test only de-duplicates the result.  A
                # mergeable change that is listed twice must still move
                # on to the next entry rather than fall through to the
                # "can not be merged" failure below.
                if needed_change not in changes_needed:
                    changes_needed.append(needed_change)
                continue
            # The needed change can't be merged.
            self.log.debug(" Change %s is needed but can not be merged" %
                           needed_change)
            return False
    if changes_needed:
        return changes_needed
    return True
James E. Blair972e3c72013-08-29 12:04:55 -07002048
def getFailingDependentItems(self, item):
    """Return the failing queue items that ``item``'s change depends on.

    Each entry of ``item.change.needs_changes`` is resolved with
    ``getItemForChange``; entries that resolve to nothing are skipped.
    A resolved item counts as failing when its
    ``current_build_set.failing_reasons`` is truthy.

    Returns a set of failing items, or None when there are none (which
    includes the change having no ``needs_changes`` attribute or an
    empty one).
    """
    change = item.change
    if not hasattr(change, 'needs_changes') or not change.needs_changes:
        return None
    resolved = (self.getItemForChange(needed)
                for needed in change.needs_changes)
    failing = set(
        needed_item for needed_item in resolved
        if needed_item and needed_item.current_build_set.failing_reasons)
    return failing or None