blob: 551cbe057b17d327667fd276f952cc7f640ec107 [file] [log] [blame]
# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Antoine "hashar" Musso
# Copyright 2013 Wikimedia Foundation Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Christian Berendt12d4d722014-06-07 21:03:45 +020023from six.moves import queue as Queue
Zhongyue Luo1c860d72012-07-19 11:03:56 +080024import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
James E. Blair11041d22014-05-02 14:49:53 -070032from model import ActionReporter, Pipeline, Project, ChangeQueue
Joshua Hesketh70b13492015-08-11 23:40:42 +100033from model import ChangeishFilter, NullChange
Maru Newby3fe5f852015-01-13 04:22:14 +000034from zuul import change_matcher
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040035from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070036
# Resolved through extras.try_import; the code in this module guards every
# use with ``if statsd:``, so a falsy value simply disables stats reporting.
statsd = extras.try_import('statsd.statsd')
38
James E. Blair1e8dd892012-05-30 09:15:05 -070039
def deep_format(obj, paramdict):
    """Apply the paramdict via str.format() to all string objects found within
    the supplied obj. Lists and dicts are traversed recursively.

    Dict keys are expanded as well as values.  Keys that are not strings
    (for example integers) are kept unchanged instead of raising
    AttributeError.  Objects of any other type are returned as-is.

    Borrowed from Jenkins Job Builder project

    :arg obj: the structure to expand (str, list, dict or anything else)
    :arg dict paramdict: the substitution values passed to str.format()
    :returns: a new structure of the same shape with strings expanded
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        ret = {}
        for key, value in obj.items():
            # Routing the key through deep_format formats string keys and
            # passes any other hashable key through untouched.
            ret[deep_format(key, paramdict)] = deep_format(value, paramdict)
        return ret
    return obj
60
61
class MergeFailure(Exception):
    """Exception type declared by this module for merge failures."""
    pass
64
65
class ManagementEvent(object):
    """An event that should be processed within the main queue run loop"""

    def __init__(self):
        self._wait_event = threading.Event()
        self._exception = None
        self._traceback = None

    def exception(self, e, tb):
        """Record a failure and release any thread blocked in wait().

        :arg Exception e: the exception raised while handling the event
        :arg tb: the traceback to attach when the exception is re-raised
        """
        self._exception = e
        self._traceback = tb
        self._wait_event.set()

    def done(self):
        """Mark the event as handled and release any waiter."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event is handled or the timeout expires.

        :arg float timeout: seconds to wait, or None to wait indefinitely
        :returns: True if the event was handled, False on timeout
        :raises: the exception recorded through exception(), carrying the
            recorded traceback
        """
        self._wait_event.wait(timeout)
        if self._exception is not None:
            # The three-argument raise statement is Python 2-only syntax;
            # six.reraise does the same thing on both major versions.
            import six
            six.reraise(type(self._exception), self._exception,
                        self._traceback)
        return self._wait_event.is_set()
86
87
class ReconfigureEvent(ManagementEvent):
    """Reconfigure the scheduler.  The layout will be (re-)loaded from
    the path specified in the configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        super(ReconfigureEvent, self).__init__()
        self.config = config
97
98
class PromoteEvent(ManagementEvent):
    """Promote one or more changes to the head of the queue.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, pipeline_name, change_ids):
        super(PromoteEvent, self).__init__()
        self.pipeline_name = pipeline_name
        self.change_ids = change_ids
111
112
class EnqueueEvent(ManagementEvent):
    """Enqueue a change into a pipeline

    :arg TriggerEvent trigger_event: a TriggerEvent describing the
        trigger, pipeline, and change to enqueue
    """

    def __init__(self, trigger_event):
        super(EnqueueEvent, self).__init__()
        self.trigger_event = trigger_event
123
124
class ResultEvent(object):
    """An event that needs to modify the pipeline state due to a
    result from an external system."""

    pass
130
131
class BuildStartedEvent(ResultEvent):
    """A build has started.

    :arg Build build: The build which has started.
    """

    def __init__(self, build):
        self.build = build
140
141
class BuildCompletedEvent(ResultEvent):
    """A build has completed

    :arg Build build: The build which has completed.
    """

    def __init__(self, build):
        self.build = build
150
151
class MergeCompletedEvent(ResultEvent):
    """A remote merge operation has completed

    :arg BuildSet build_set: The build_set which is ready.
    :arg str zuul_url: The URL of the Zuul Merger.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    """

    def __init__(self, build_set, zuul_url, merged, updated, commit):
        self.build_set = build_set
        self.zuul_url = zuul_url
        self.merged = merged
        self.updated = updated
        self.commit = commit
168
169
def toList(item):
    """Normalise a "scalar or list" configuration value to a list.

    :arg item: a list, a single value, or a falsy value
    :returns: [] for any falsy item, the item itself (not a copy) when it
        is already a list, otherwise a one-element list holding the item
    """
    if not item:
        return []
    if isinstance(item, list):
        return item
    return [item]
176
177
James E. Blaire9d45c32012-05-31 09:56:45 -0700178class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700179 log = logging.getLogger("zuul.Scheduler")
180
def __init__(self):
    """Set up the scheduler thread state: locks, flags, registries of
    sources/triggers/reporters, the three event queues and an empty
    layout."""
    threading.Thread.__init__(self)
    self.daemon = True
    # Set whenever new work is queued or a state flag changes.
    self.wake_event = threading.Event()
    self.layout_lock = threading.Lock()
    self.run_handler_lock = threading.Lock()
    self._pause = False
    self._exit = False
    self._stopped = False
    self.launcher = None
    self.merger = None
    # Registries filled in by registerSource/Trigger/Reporter, keyed by
    # name.
    self.sources = dict()
    self.triggers = dict()
    self.reporters = dict()
    self.config = None

    self.trigger_event_queue = Queue.Queue()
    self.result_event_queue = Queue.Queue()
    self.management_event_queue = Queue.Queue()
    self.layout = model.Layout()

    self.zuul_version = zuul_version.version_info.release_string()
    # Epoch seconds of the last completed reconfiguration, if any.
    self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400204
def stop(self):
    """Set the stopped flag and wake the scheduler so it notices."""
    self._stopped = True
    self.wake_event.set()
208
James E. Blair47958382013-01-10 17:26:02 -0800209 def testConfig(self, config_path):
James E. Blair04948c72013-07-25 23:03:17 -0700210 return self._parseConfig(config_path)
James E. Blair47958382013-01-10 17:26:02 -0800211
def _parseSkipIf(self, config_job):
    """Build the matcher for a job's 'skip-if' configuration.

    :arg dict config_job: one entry of the layout's 'jobs' list
    :returns: a change_matcher.MatchAny over one MatchAll per 'skip-if'
        entry, or None when the job has no 'skip-if' entries
    """
    cm = change_matcher
    skip_matchers = []

    for config_skip in config_job.get('skip-if', []):
        nested_matchers = []

        project_regex = config_skip.get('project')
        if project_regex:
            nested_matchers.append(cm.ProjectMatcher(project_regex))

        branch_regex = config_skip.get('branch')
        if branch_regex:
            nested_matchers.append(cm.BranchMatcher(branch_regex))

        file_regexes = toList(config_skip.get('all-files-match-any'))
        if file_regexes:
            file_matchers = [cm.FileMatcher(x) for x in file_regexes]
            nested_matchers.append(cm.MatchAllFiles(file_matchers))

        # All patterns need to match a given skip-if predicate
        skip_matchers.append(cm.MatchAll(nested_matchers))

    if not skip_matchers:
        return None
    # Any skip-if predicate can be matched to trigger a skip
    return cm.MatchAny(skip_matchers)
239
def _parseConfig(self, config_path):
    """Load the layout file and build a new model.Layout from it.

    The file is validated with layoutvalidator.LayoutValidator, then its
    'includes', 'pipelines', 'project-templates', 'jobs' and 'projects'
    sections are processed in that order.

    :arg str config_path: path of the YAML layout file ('~' is expanded)
    :returns: the populated model.Layout
    :raises Exception: if the layout file does not exist, or if a job
        names a 'parameter-function' that no included file defines
    """
    layout = model.Layout()
    project_templates = {}

    if config_path:
        config_path = os.path.expanduser(config_path)
        if not os.path.exists(config_path):
            raise Exception("Unable to read layout config file at %s" %
                            config_path)
    with open(config_path) as config_file:
        # NOTE(review): yaml.load can construct arbitrary Python objects.
        # Consider yaml.safe_load if layouts never rely on custom tags.
        data = yaml.load(config_file)

    validator = layoutvalidator.LayoutValidator()
    validator.validate(data)

    # Namespace populated by the included python files; jobs look up
    # their 'parameter-function' in it by name.
    config_env = {}
    for include in data.get('includes', []):
        if 'python-file' in include:
            fn = include['python-file']
            if not os.path.isabs(fn):
                # Relative includes are resolved against the directory of
                # the (symlink-resolved) layout file.
                base = os.path.dirname(os.path.realpath(config_path))
                fn = os.path.join(base, fn)
            fn = os.path.expanduser(fn)
            # NOTE(review): this runs the included file as Python code.
            execfile(fn, config_env)

    for conf_pipeline in data.get('pipelines', []):
        pipeline = Pipeline(conf_pipeline['name'])
        pipeline.description = conf_pipeline.get('description')
        # TODO(jeblair): remove backwards compatibility:
        pipeline.source = self.sources[conf_pipeline.get('source',
                                                         'gerrit')]
        precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
        pipeline.precedence = precedence
        pipeline.failure_message = conf_pipeline.get('failure-message',
                                                     "Build failed.")
        pipeline.merge_failure_message = conf_pipeline.get(
            'merge-failure-message', "Merge Failed.\n\nThis change or one "
            "of its cross-repo dependencies was unable to be "
            "automatically merged with the current state of its "
            "repository. Please rebase the change and upload a new "
            "patchset.")
        pipeline.success_message = conf_pipeline.get('success-message',
                                                     "Build succeeded.")
        pipeline.footer_message = conf_pipeline.get('footer-message', "")
        pipeline.dequeue_on_new_patchset = conf_pipeline.get(
            'dequeue-on-new-patchset', True)
        pipeline.ignore_dependencies = conf_pipeline.get(
            'ignore-dependencies', False)

        action_reporters = {}
        for action in ['start', 'success', 'failure', 'merge-failure',
                       'disabled']:
            action_reporters[action] = []
            if conf_pipeline.get(action):
                for reporter_name, params \
                        in conf_pipeline.get(action).items():
                    if reporter_name in self.reporters:
                        action_reporters[action].append(ActionReporter(
                            self.reporters[reporter_name], params))
                    else:
                        self.log.error('Invalid reporter name %s' %
                                       reporter_name)
        pipeline.start_actions = action_reporters['start']
        pipeline.success_actions = action_reporters['success']
        pipeline.failure_actions = action_reporters['failure']
        pipeline.disabled_actions = action_reporters['disabled']
        # Merge failures fall back to the plain failure reporters when no
        # dedicated 'merge-failure' reporters are configured.
        if action_reporters['merge-failure']:
            pipeline.merge_failure_actions = \
                action_reporters['merge-failure']
        else:
            pipeline.merge_failure_actions = action_reporters['failure']

        pipeline.disable_at = conf_pipeline.get(
            'disable-after-consecutive-failures', None)

        pipeline.window = conf_pipeline.get('window', 20)
        pipeline.window_floor = conf_pipeline.get('window-floor', 3)
        pipeline.window_increase_type = conf_pipeline.get(
            'window-increase-type', 'linear')
        pipeline.window_increase_factor = conf_pipeline.get(
            'window-increase-factor', 1)
        pipeline.window_decrease_type = conf_pipeline.get(
            'window-decrease-type', 'exponential')
        pipeline.window_decrease_factor = conf_pipeline.get(
            'window-decrease-factor', 2)

        # The 'manager' value names a class that is looked up in this
        # module's globals and instantiated with (scheduler, pipeline).
        manager = globals()[conf_pipeline['manager']](self, pipeline)
        pipeline.setManager(manager)
        layout.pipelines[conf_pipeline['name']] = pipeline

        if 'require' in conf_pipeline or 'reject' in conf_pipeline:
            require = conf_pipeline.get('require', {})
            reject = conf_pipeline.get('reject', {})
            f = ChangeishFilter(
                open=require.get('open'),
                current_patchset=require.get('current-patchset'),
                statuses=toList(require.get('status')),
                required_approvals=toList(require.get('approval')),
                reject_approvals=toList(reject.get('approval'))
            )
            manager.changeish_filters.append(f)

        # TODO(jhesketh): Allow multiple triggers per pipeline
        for trigger in self.triggers.values():
            manager.event_filters += \
                trigger.getEventFilters(conf_pipeline['trigger'])

    for project_template in data.get('project-templates', []):
        # Make sure the template only contains valid pipelines
        tpl = dict(
            (pipe_name, project_template.get(pipe_name))
            for pipe_name in layout.pipelines.keys()
            if pipe_name in project_template
        )
        project_templates[project_template.get('name')] = tpl

    for config_job in data.get('jobs', []):
        job = layout.getJob(config_job['name'])
        # Be careful to only set attributes explicitly present on
        # this job, to avoid squashing attributes set by a meta-job.
        for key, attr in (('queue-name', 'queue_name'),
                          ('failure-message', 'failure_message'),
                          ('success-message', 'success_message'),
                          ('failure-pattern', 'failure_pattern'),
                          ('success-pattern', 'success_pattern')):
            m = config_job.get(key, None)
            if m:
                setattr(job, attr, m)
        m = config_job.get('hold-following-changes', False)
        if m:
            job.hold_following_changes = True
        # 'voting' may legitimately be False, so test against None.
        m = config_job.get('voting', None)
        if m is not None:
            job.voting = m
        fname = config_job.get('parameter-function', None)
        if fname:
            func = config_env.get(fname, None)
            if not func:
                raise Exception("Unable to find function %s" % fname)
            job.parameter_function = func
        branches = toList(config_job.get('branch'))
        if branches:
            job._branches = branches
            job.branches = [re.compile(x) for x in branches]
        files = toList(config_job.get('files'))
        if files:
            job._files = files
            job.files = [re.compile(x) for x in files]
        skip_if_matcher = self._parseSkipIf(config_job)
        if skip_if_matcher:
            job.skip_if_matcher = skip_if_matcher
        swift = toList(config_job.get('swift'))
        if swift:
            for s in swift:
                job.swift[s['name']] = s

    def add_jobs(job_tree, config_jobs):
        # Entries are job names (str), {parent: children} dicts, or
        # nested lists of the same.
        for job in config_jobs:
            if isinstance(job, list):
                # Recurse on the nested list itself.  Recursing on each
                # of its elements would iterate over the characters of
                # any job name found directly inside it.
                add_jobs(job_tree, job)
            if isinstance(job, dict):
                for parent, children in job.items():
                    parent_tree = job_tree.addJob(layout.getJob(parent))
                    add_jobs(parent_tree, children)
            if isinstance(job, str):
                job_tree.addJob(layout.getJob(job))

    for config_project in data.get('projects', []):
        project = Project(config_project['name'])
        shortname = config_project['name'].split('/')[-1]

        # This is reversed due to the prepend operation below, so
        # the ultimate order is templates (in order) followed by
        # statically defined jobs.
        for requested_template in reversed(
                config_project.get('template', [])):
            # Fetch the template from 'project-templates'
            tpl = project_templates.get(
                requested_template.get('name'))
            # Expand it with the project context
            requested_template['name'] = shortname
            expanded = deep_format(tpl, requested_template)
            # Finally merge the expansion with whatever has been
            # already defined for this project.  Prepend our new
            # jobs to existing ones (which may have been
            # statically defined or defined by other templates).
            for pipeline in layout.pipelines.values():
                if pipeline.name in expanded:
                    config_project.update(
                        {pipeline.name: expanded[pipeline.name] +
                         config_project.get(pipeline.name, [])})

        layout.projects[config_project['name']] = project
        mode = config_project.get('merge-mode', 'merge-resolve')
        project.merge_mode = model.MERGER_MAP[mode]
        for pipeline in layout.pipelines.values():
            if pipeline.name in config_project:
                job_tree = pipeline.addProject(project)
                config_jobs = config_project[pipeline.name]
                add_jobs(job_tree, config_jobs)

    # All jobs should be defined at this point, get rid of
    # metajobs so that getJob isn't doing anything weird.
    layout.metajobs = []

    for pipeline in layout.pipelines.values():
        pipeline.manager._postConfig(layout)

    return layout
James E. Blairee743612012-05-29 14:49:32 -0700457
def setLauncher(self, launcher):
    """Store the launcher object on the scheduler."""
    self.launcher = launcher
460
def setMerger(self, merger):
    """Store the merger object on the scheduler."""
    self.merger = merger
463
def registerSource(self, source, name=None):
    """Register a source under name, replacing any previous entry.

    :arg source: the source object to register
    :arg str name: registry key; defaults to source.name
    """
    if name is None:
        name = source.name
    self.sources[name] = source
468
def registerTrigger(self, trigger, name=None):
    """Register a trigger under name, replacing any previous entry.

    :arg trigger: the trigger object to register
    :arg str name: registry key; defaults to trigger.name
    """
    if name is None:
        name = trigger.name
    self.triggers[name] = trigger
James E. Blairee743612012-05-29 14:49:32 -0700473
def registerReporter(self, reporter, name=None):
    """Register a reporter under name, replacing any previous entry.

    :arg reporter: the reporter object to register
    :arg str name: registry key; defaults to reporter.name
    """
    if name is None:
        name = reporter.name
    self.reporters[name] = reporter
478
def getProject(self, name, create_foreign=False):
    """Look up a project in the current layout while holding layout_lock.

    :arg str name: the project name
    :arg bool create_foreign: when True and the project is unknown,
        create a Project(name, foreign=True) and add it to the layout
    :returns: the project, or None if it is unknown and create_foreign
        is False
    """
    with self.layout_lock:
        p = self.layout.projects.get(name)
        if p is None and create_foreign:
            self.log.info("Registering foreign project: %s" % name)
            p = Project(name, foreign=True)
            self.layout.projects[name] = p
    return p
491
def addEvent(self, event):
    """Queue a trigger event and wake the scheduler.

    A stats counter is incremented first when statsd is available; a
    failure there is logged and does not prevent the event being queued.

    :arg event: the trigger event; its ``type`` attribute is used in the
        stats key
    """
    self.log.debug("Adding trigger event: %s" % event)
    try:
        if statsd:
            statsd.incr('gerrit.event.%s' % event.type)
    except Exception:
        # Stats are best-effort, but KeyboardInterrupt/SystemExit must
        # still propagate, hence no bare except.
        self.log.exception("Exception reporting event stats")
    self.trigger_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700502
def onBuildStarted(self, build):
    """Stamp the build's start time, queue a BuildStartedEvent for it
    and wake the scheduler.

    :arg build: the build that has started
    """
    self.log.debug("Adding start event for build: %s" % build)
    build.start_time = time.time()
    event = BuildStartedEvent(build)
    self.result_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700510
def onBuildCompleted(self, build, result):
    """Record a build's end time and result, report stats when statsd is
    available, then queue a BuildCompletedEvent and wake the scheduler.

    Stats failures are logged and never prevent the event being queued.

    :arg build: the build that has completed
    :arg result: the result to store on the build
    """
    self.log.debug("Adding complete event for build: %s result: %s" % (
        build, result))
    build.end_time = time.time()
    # Note, as soon as the result is set, other threads may act
    # upon this, even though the event hasn't been fully
    # processed.  Ensure that any other data from the event (eg,
    # timing) is recorded before setting the result.
    build.result = result
    try:
        if statsd and build.pipeline:
            pipeline_name = build.pipeline.name
            jobname = build.job.name.replace('.', '_')
            statsd.incr('zuul.pipeline.%s.all_jobs' % pipeline_name)

            # The wait time is the same for every key below, so compute
            # it once.  Skip it when a timestamp is missing so that the
            # result counter is still reported.
            wait_ms = None
            if build.start_time and build.launch_time:
                wait_ms = int(
                    (build.start_time - build.launch_time) * 1000)

            if wait_ms is not None:
                for label in build.node_labels:
                    # Jenkins includes the node name in its list of
                    # labels, so we filter it out here, since that is
                    # not statistically interesting.
                    if label == build.node_name:
                        continue
                    key = 'zuul.pipeline.%s.label.%s.wait_time' % (
                        pipeline_name, label)
                    statsd.timing(key, wait_ms)

            key = 'zuul.pipeline.%s.job.%s.%s' % (pipeline_name,
                                                  jobname, build.result)
            if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                dt = int((build.end_time - build.start_time) * 1000)
                statsd.timing(key, dt)
            statsd.incr(key)

            if wait_ms is not None:
                key = 'zuul.pipeline.%s.job.%s.wait_time' % (
                    pipeline_name, jobname)
                statsd.timing(key, wait_ms)
    except Exception:
        # Stats are best-effort, but KeyboardInterrupt/SystemExit must
        # still propagate, hence no bare except.
        self.log.exception("Exception reporting runtime stats")
    event = BuildCompletedEvent(build)
    self.result_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700552
def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
    """Queue a MergeCompletedEvent built from the arguments and wake the
    scheduler.

    :arg build_set: the build set the merge was performed for
    :arg zuul_url: passed through to the event
    :arg merged: passed through to the event
    :arg updated: passed through to the event
    :arg commit: passed through to the event
    """
    self.log.debug("Adding merge complete event for build set: %s" %
                   build_set)
    event = MergeCompletedEvent(build_set, zuul_url,
                                merged, updated, commit)
    self.result_event_queue.put(event)
    self.wake_event.set()
560
def reconfigure(self, config):
    """Queue a ReconfigureEvent and block until it has been handled.

    event.wait() re-raises any exception recorded on the event; in that
    case last_reconfigured is left unchanged.

    :arg config: the new configuration to hand to the event
    """
    self.log.debug("Prepare to reconfigure")
    event = ReconfigureEvent(config)
    self.management_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Waiting for reconfiguration")
    event.wait()
    self.log.debug("Reconfiguration complete")
    self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700570
def promote(self, pipeline_name, change_ids):
    """Queue a PromoteEvent and block until it has been handled.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: the change ids to hand to the event
    """
    event = PromoteEvent(pipeline_name, change_ids)
    self.management_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Waiting for promotion")
    event.wait()
    self.log.debug("Promotion complete")
578
def enqueue(self, trigger_event):
    """Queue an EnqueueEvent and block until it has been handled.

    :arg trigger_event: the trigger event to wrap in the EnqueueEvent
    """
    event = EnqueueEvent(trigger_event)
    self.management_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Waiting for enqueue")
    event.wait()
    self.log.debug("Enqueue complete")
586
def exit(self):
    """Set the pause and exit flags and wake the scheduler.

    This method only sets the flags; it returns immediately.
    """
    self.log.debug("Prepare to exit")
    self._pause = True
    self._exit = True
    self.wake_event.set()
    self.log.debug("Waiting for exit")
593
def _get_queue_pickle_file(self):
    """Return the path of the saved-queue pickle file.

    The directory is the [zuul] state_dir option ('~' expanded) when it
    is set, otherwise /var/lib/zuul.
    """
    if self.config.has_option('zuul', 'state_dir'):
        state_dir = os.path.expanduser(self.config.get('zuul',
                                                       'state_dir'))
    else:
        state_dir = '/var/lib/zuul'
    return os.path.join(state_dir, 'queue.pickle')
601
def _save_queue(self):
    """Drain the trigger event queue and pickle its contents to the
    saved-queue file.  Nothing is written when the queue is empty."""
    pickle_file = self._get_queue_pickle_file()
    events = []
    while not self.trigger_event_queue.empty():
        events.append(self.trigger_event_queue.get())
    self.log.debug("Queue length is %s" % len(events))
    if events:
        self.log.debug("Saving queue")
        # Close the file deterministically so the data is flushed even
        # if the process terminates right after this call.
        with open(pickle_file, 'wb') as f:
            pickle.dump(events, f)
611
def _load_queue(self):
    """Re-queue the trigger events stored in the saved-queue file, if
    that file exists."""
    pickle_file = self._get_queue_pickle_file()
    if os.path.exists(pickle_file):
        self.log.debug("Loading queue")
        # NOTE(review): unpickling executes code embedded in the file, so
        # the state directory must only be writable by trusted users.
        with open(pickle_file, 'rb') as f:
            events = pickle.load(f)
        self.log.debug("Queue length is %s" % len(events))
        for event in events:
            self.trigger_event_queue.put(event)
    else:
        self.log.debug("No queue file found")
622
def _delete_queue(self):
    """Remove the saved-queue file if it exists."""
    pickle_file = self._get_queue_pickle_file()
    if os.path.exists(pickle_file):
        self.log.debug("Deleting saved queue")
        os.unlink(pickle_file)
628
def resume(self):
    """Reload and then delete any saved queue, and wake the scheduler.

    Failures while loading or deleting are logged; the scheduler is
    woken regardless.
    """
    try:
        self._load_queue()
    except Exception:
        # Best-effort, but KeyboardInterrupt/SystemExit must still
        # propagate, hence no bare except.
        self.log.exception("Unable to load queue")
    try:
        self._delete_queue()
    except Exception:
        self.log.exception("Unable to delete saved queue")
    self.log.debug("Resuming queue processing")
    self.wake_event.set()
640
def _doPauseEvent(self):
    """If an exit has been requested, save the trigger queue and
    terminate the process; otherwise do nothing."""
    if self._exit:
        self.log.debug("Exiting")
        self._save_queue()
        # os._exit ends the process immediately with status 0.
        os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700646
James E. Blair468c8512013-12-06 13:27:19 -0800647 def _doReconfigureEvent(self, event):
648 # This is called in the scheduler loop after another thread submits
649 # a request
James E. Blaircdccd972013-07-01 12:10:22 -0700650 self.layout_lock.acquire()
James E. Blair468c8512013-12-06 13:27:19 -0800651 self.config = event.config
James E. Blaircdccd972013-07-01 12:10:22 -0700652 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700653 self.log.debug("Performing reconfiguration")
James E. Blaircdccd972013-07-01 12:10:22 -0700654 layout = self._parseConfig(
James E. Blaireff88162013-07-01 12:44:14 -0400655 self.config.get('zuul', 'layout_config'))
James E. Blaircdccd972013-07-01 12:10:22 -0700656 for name, new_pipeline in layout.pipelines.items():
657 old_pipeline = self.layout.pipelines.get(name)
658 if not old_pipeline:
659 if self.layout.pipelines:
660 # Don't emit this warning on startup
661 self.log.warning("No old pipeline matching %s found "
662 "when reconfiguring" % name)
663 continue
James E. Blairdad52252014-02-07 16:59:17 -0800664 self.log.debug("Re-enqueueing changes for pipeline %s" % name)
James E. Blaircdccd972013-07-01 12:10:22 -0700665 items_to_remove = []
James E. Blair400e8fd2015-07-30 17:44:45 -0700666 builds_to_cancel = []
James E. Blairbfb8e042014-12-30 17:01:44 -0800667 last_head = None
James E. Blaircdccd972013-07-01 12:10:22 -0700668 for shared_queue in old_pipeline.queues:
James E. Blair972e3c72013-08-29 12:04:55 -0700669 for item in shared_queue.queue:
James E. Blairbfb8e042014-12-30 17:01:44 -0800670 if not item.item_ahead:
671 last_head = item
James E. Blaircdccd972013-07-01 12:10:22 -0700672 item.item_ahead = None
James E. Blair972e3c72013-08-29 12:04:55 -0700673 item.items_behind = []
James E. Blaircdccd972013-07-01 12:10:22 -0700674 item.pipeline = None
James E. Blairbfb8e042014-12-30 17:01:44 -0800675 item.queue = None
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +0000676 project_name = item.change.project.name
677 item.change.project = layout.projects.get(project_name)
678 if not item.change.project:
679 self.log.debug("Project %s not defined, "
680 "re-instantiating as foreign" %
681 project_name)
682 project = Project(project_name, foreign=True)
683 layout.projects[project_name] = project
684 item.change.project = project
James E. Blairfe707d12015-08-05 15:18:15 -0700685 item_jobs = new_pipeline.getJobs(item)
James E. Blairdad52252014-02-07 16:59:17 -0800686 for build in item.current_build_set.getBuilds():
James E. Blair6b077942014-02-07 17:45:55 -0800687 job = layout.jobs.get(build.job.name)
James E. Blairfe707d12015-08-05 15:18:15 -0700688 if job and job in item_jobs:
James E. Blair6b077942014-02-07 17:45:55 -0800689 build.job = job
690 else:
James E. Blair400e8fd2015-07-30 17:44:45 -0700691 item.removeBuild(build)
692 builds_to_cancel.append(build)
James E. Blairbfb8e042014-12-30 17:01:44 -0800693 if not new_pipeline.manager.reEnqueueItem(item,
694 last_head):
James E. Blaircdccd972013-07-01 12:10:22 -0700695 items_to_remove.append(item)
James E. Blair6b077942014-02-07 17:45:55 -0800696 for item in items_to_remove:
697 for build in item.current_build_set.getBuilds():
James E. Blair400e8fd2015-07-30 17:44:45 -0700698 builds_to_cancel.append(build)
699 for build in builds_to_cancel:
James E. Blair6b077942014-02-07 17:45:55 -0800700 self.log.warning(
701 "Canceling build %s during reconfiguration" % (build,))
James E. Blairdad52252014-02-07 16:59:17 -0800702 try:
703 self.launcher.cancel(build)
704 except Exception:
705 self.log.exception(
706 "Exception while canceling build %s "
707 "for change %s" % (build, item.change))
James E. Blaircdccd972013-07-01 12:10:22 -0700708 self.layout = layout
James E. Blairc0acb552014-08-16 08:17:02 -0700709 self.maintainTriggerCache()
James E. Blair63bb0ef2013-07-29 17:14:51 -0700710 for trigger in self.triggers.values():
711 trigger.postConfig()
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100712 for source in self.sources.values():
713 source.postConfig()
James E. Blair3cb10702013-08-24 08:56:03 -0700714 if statsd:
715 try:
716 for pipeline in self.layout.pipelines.values():
717 items = len(pipeline.getAllItems())
718 # stats.gauges.zuul.pipeline.NAME.current_changes
719 key = 'zuul.pipeline.%s' % pipeline.name
720 statsd.gauge(key + '.current_changes', items)
721 except Exception:
722 self.log.exception("Exception reporting initial "
723 "pipeline stats:")
James E. Blaircdccd972013-07-01 12:10:22 -0700724 finally:
725 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700726
James E. Blair36658cf2013-12-06 17:53:48 -0800727 def _doPromoteEvent(self, event):
728 pipeline = self.layout.pipelines[event.pipeline_name]
729 change_ids = [c.split(',') for c in event.change_ids]
730 items_to_enqueue = []
731 change_queue = None
732 for shared_queue in pipeline.queues:
733 if change_queue:
734 break
735 for item in shared_queue.queue:
736 if (item.change.number == change_ids[0][0] and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000737 item.change.patchset == change_ids[0][1]):
James E. Blair36658cf2013-12-06 17:53:48 -0800738 change_queue = shared_queue
739 break
740 if not change_queue:
741 raise Exception("Unable to find shared change queue for %s" %
742 event.change_ids[0])
743 for number, patchset in change_ids:
744 found = False
745 for item in change_queue.queue:
746 if (item.change.number == number and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000747 item.change.patchset == patchset):
James E. Blair36658cf2013-12-06 17:53:48 -0800748 found = True
749 items_to_enqueue.append(item)
750 break
751 if not found:
752 raise Exception("Unable to find %s,%s in queue %s" %
753 (number, patchset, change_queue))
754 for item in change_queue.queue[:]:
755 if item not in items_to_enqueue:
756 items_to_enqueue.append(item)
757 pipeline.manager.cancelJobs(item)
758 pipeline.manager.dequeueItem(item)
759 for item in items_to_enqueue:
Sean Daguef39b9ca2014-01-10 21:34:35 -0500760 pipeline.manager.addChange(
761 item.change,
762 enqueue_time=item.enqueue_time,
James E. Blairf9ab8842014-07-10 13:12:07 -0700763 quiet=True,
764 ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800765
James E. Blaird27a96d2014-07-10 13:25:13 -0700766 def _doEnqueueEvent(self, event):
767 project = self.layout.projects.get(event.project_name)
768 pipeline = self.layout.pipelines[event.forced_pipeline]
James E. Blairc0dedf82014-08-06 09:37:52 -0700769 change = pipeline.source.getChange(event, project)
James E. Blaird27a96d2014-07-10 13:25:13 -0700770 self.log.debug("Event %s for change %s was directly assigned "
771 "to pipeline %s" % (event, change, self))
772 self.log.info("Adding %s, %s to %s" %
773 (project, change, pipeline))
774 pipeline.manager.addChange(change, ignore_requirements=True)
775
James E. Blaire9d45c32012-05-31 09:56:45 -0700776 def _areAllBuildsComplete(self):
777 self.log.debug("Checking if all builds are complete")
778 waiting = False
James E. Blair4076e2b2014-01-28 12:42:20 -0800779 if self.merger.areMergesOutstanding():
780 waiting = True
James E. Blaireff88162013-07-01 12:44:14 -0400781 for pipeline in self.layout.pipelines.values():
James E. Blair6b077942014-02-07 17:45:55 -0800782 for item in pipeline.getAllItems():
783 for build in item.current_build_set.getBuilds():
784 if build.result is None:
785 self.log.debug("%s waiting on %s" %
786 (pipeline.manager, build))
787 waiting = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700788 if not waiting:
789 self.log.debug("All builds are complete")
790 return True
791 self.log.debug("All builds are not complete")
792 return False
793
    def run(self):
        """Main scheduler loop: sleep until woken, then drain the queues.

        Each wake-up processes, in order and while holding
        run_handler_lock: management events, result events, trigger
        events (unless paused), the pause/exit check, and finally every
        pipeline manager's queue.  Returns once _stopped is set.
        """
        if statsd:
            self.log.debug("Statsd enabled")
        else:
            self.log.debug("Statsd disabled because python statsd "
                           "package not found")
        while True:
            self.log.debug("Run handler sleeping")
            self.wake_event.wait()
            self.wake_event.clear()
            if self._stopped:
                self.log.debug("Run handler stopping")
                return
            self.log.debug("Run handler awake")
            self.run_handler_lock.acquire()
            try:
                while not self.management_event_queue.empty():
                    self.process_management_queue()

                # Give result events priority -- they let us stop builds,
                # whereas trigger events cause us to launch builds.
                while not self.result_event_queue.empty():
                    self.process_result_queue()

                if not self._pause:
                    while not self.trigger_event_queue.empty():
                        self.process_event_queue()

                if self._pause and self._areAllBuildsComplete():
                    self._doPauseEvent()

                # Keep calling each manager until it returns a false value.
                for pipeline in self.layout.pipelines.values():
                    while pipeline.manager.processQueue():
                        pass

            except Exception:
                self.log.exception("Exception in run handler:")
                # There may still be more events to process
                self.wake_event.set()
            finally:
                self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700835
James E. Blair0e933c52013-07-11 10:18:52 -0700836 def maintainTriggerCache(self):
837 relevant = set()
838 for pipeline in self.layout.pipelines.values():
James E. Blairfadc6e12013-08-21 18:23:15 -0700839 self.log.debug("Start maintain trigger cache for: %s" % pipeline)
James E. Blair0e933c52013-07-11 10:18:52 -0700840 for item in pipeline.getAllItems():
841 relevant.add(item.change)
842 relevant.update(item.change.getRelatedChanges())
James E. Blairfadc6e12013-08-21 18:23:15 -0700843 self.log.debug("End maintain trigger cache for: %s" % pipeline)
James E. Blair0e933c52013-07-11 10:18:52 -0700844 self.log.debug("Trigger cache size: %s" % len(relevant))
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100845 for source in self.sources.values():
846 source.maintainCache(relevant)
James E. Blair0e933c52013-07-11 10:18:52 -0700847
James E. Blairee743612012-05-29 14:49:32 -0700848 def process_event_queue(self):
849 self.log.debug("Fetching trigger event")
850 event = self.trigger_event_queue.get()
851 self.log.debug("Processing trigger event %s" % event)
James E. Blaira84f0e42014-02-06 07:09:22 -0800852 try:
853 project = self.layout.projects.get(event.project_name)
Evgeny Antyshev0deaaad2015-08-03 20:22:56 +0000854 if not project or project.foreign:
Jukka Lehtniemibec3ed02014-10-31 13:45:17 +0100855 self.log.debug("Project %s not found" % event.project_name)
James E. Blaira84f0e42014-02-06 07:09:22 -0800856 return
857
James E. Blaira84f0e42014-02-06 07:09:22 -0800858 for pipeline in self.layout.pipelines.values():
James E. Blairc0dedf82014-08-06 09:37:52 -0700859 change = pipeline.source.getChange(event, project)
James E. Blaira84f0e42014-02-06 07:09:22 -0800860 if event.type == 'patchset-created':
861 pipeline.manager.removeOldVersionsOfChange(change)
Antoine Mussobd86a312014-01-08 14:51:33 +0100862 elif event.type == 'change-abandoned':
863 pipeline.manager.removeAbandonedChange(change)
James E. Blaira84f0e42014-02-06 07:09:22 -0800864 if pipeline.manager.eventMatches(event, change):
865 self.log.info("Adding %s, %s to %s" %
866 (project, change, pipeline))
867 pipeline.manager.addChange(change)
868 finally:
James E. Blairff791972013-01-09 11:45:43 -0800869 self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700870
James E. Blair468c8512013-12-06 13:27:19 -0800871 def process_management_queue(self):
872 self.log.debug("Fetching management event")
873 event = self.management_event_queue.get()
874 self.log.debug("Processing management event %s" % event)
James E. Blair36658cf2013-12-06 17:53:48 -0800875 try:
876 if isinstance(event, ReconfigureEvent):
877 self._doReconfigureEvent(event)
878 elif isinstance(event, PromoteEvent):
879 self._doPromoteEvent(event)
James E. Blaird27a96d2014-07-10 13:25:13 -0700880 elif isinstance(event, EnqueueEvent):
881 self._doEnqueueEvent(event.trigger_event)
James E. Blair36658cf2013-12-06 17:53:48 -0800882 else:
883 self.log.error("Unable to handle event %s" % event)
884 event.done()
885 except Exception as e:
886 event.exception(e, sys.exc_info()[2])
James E. Blair468c8512013-12-06 13:27:19 -0800887 self.management_event_queue.task_done()
888
James E. Blairee743612012-05-29 14:49:32 -0700889 def process_result_queue(self):
890 self.log.debug("Fetching result event")
James E. Blaira84f0e42014-02-06 07:09:22 -0800891 event = self.result_event_queue.get()
892 self.log.debug("Processing result event %s" % event)
893 try:
894 if isinstance(event, BuildStartedEvent):
895 self._doBuildStartedEvent(event)
896 elif isinstance(event, BuildCompletedEvent):
897 self._doBuildCompletedEvent(event)
James E. Blair4076e2b2014-01-28 12:42:20 -0800898 elif isinstance(event, MergeCompletedEvent):
899 self._doMergeCompletedEvent(event)
James E. Blaira84f0e42014-02-06 07:09:22 -0800900 else:
901 self.log.error("Unable to handle event %s" % event)
902 finally:
903 self.result_event_queue.task_done()
904
905 def _doBuildStartedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800906 build = event.build
907 if build.build_set is not build.build_set.item.current_build_set:
908 self.log.warning("Build %s is not in the current build set" %
909 (build,))
910 return
911 pipeline = build.build_set.item.pipeline
912 if not pipeline:
913 self.log.warning("Build %s is not associated with a pipeline" %
914 (build,))
915 return
916 pipeline.manager.onBuildStarted(event.build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800917
918 def _doBuildCompletedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800919 build = event.build
920 if build.build_set is not build.build_set.item.current_build_set:
921 self.log.warning("Build %s is not in the current build set" %
922 (build,))
923 return
924 pipeline = build.build_set.item.pipeline
925 if not pipeline:
926 self.log.warning("Build %s is not associated with a pipeline" %
927 (build,))
928 return
929 pipeline.manager.onBuildCompleted(event.build)
930
931 def _doMergeCompletedEvent(self, event):
932 build_set = event.build_set
933 if build_set is not build_set.item.current_build_set:
934 self.log.warning("Build set %s is not current" % (build_set,))
935 return
936 pipeline = build_set.item.pipeline
937 if not pipeline:
938 self.log.warning("Build set %s is not associated with a pipeline" %
939 (build_set,))
940 return
941 pipeline.manager.onMergeCompleted(event)
James E. Blairee743612012-05-29 14:49:32 -0700942
James E. Blair8dbd56a2012-12-22 10:55:10 -0800943 def formatStatusJSON(self):
944 data = {}
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400945
946 data['zuul_version'] = self.zuul_version
947
James E. Blair8dbd56a2012-12-22 10:55:10 -0800948 if self._pause:
949 ret = '<p><b>Queue only mode:</b> preparing to '
James E. Blair8dbd56a2012-12-22 10:55:10 -0800950 if self._exit:
951 ret += 'exit'
952 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
953 ret += '</p>'
954 data['message'] = ret
955
James E. Blairfb682cc2013-02-26 15:23:27 -0800956 data['trigger_event_queue'] = {}
957 data['trigger_event_queue']['length'] = \
958 self.trigger_event_queue.qsize()
959 data['result_event_queue'] = {}
960 data['result_event_queue']['length'] = \
961 self.result_event_queue.qsize()
962
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400963 if self.last_reconfigured:
964 data['last_reconfigured'] = self.last_reconfigured * 1000
965
James E. Blair8dbd56a2012-12-22 10:55:10 -0800966 pipelines = []
967 data['pipelines'] = pipelines
Alex Gaynorfda4c352014-06-04 11:15:26 -0700968 for pipeline in self.layout.pipelines.values():
James E. Blair8dbd56a2012-12-22 10:55:10 -0800969 pipelines.append(pipeline.formatStatusJSON())
970 return json.dumps(data)
971
James E. Blair1e8dd892012-05-30 09:15:05 -0700972
James E. Blair4aea70c2012-07-26 14:23:24 -0700973class BasePipelineManager(object):
974 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700975
James E. Blair4aea70c2012-07-26 14:23:24 -0700976 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -0700977 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -0700978 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -0700979 self.event_filters = []
James E. Blair11041d22014-05-02 14:49:53 -0700980 self.changeish_filters = []
James E. Blair3c5e5b52013-04-26 11:17:03 -0700981 if self.sched.config and self.sched.config.has_option(
982 'zuul', 'report_times'):
James E. Blair0ac6c012013-04-26 09:04:23 -0700983 self.report_times = self.sched.config.getboolean(
984 'zuul', 'report_times')
985 else:
986 self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -0700987
988 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -0700989 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700990
    def _postConfig(self, layout):
        """Log a summary of this pipeline manager's configuration.

        Logs the source, requirement filters, event filters, the job tree
        of each project in *layout* that has one for this pipeline, and
        the pipeline's reporter actions.
        """
        # NOTE(review): the leading whitespace inside these log strings
        # may have been collapsed in the copy this was taken from --
        # confirm against the repository before relying on it.
        self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
        self.log.info(" Source: %s" % self.pipeline.source)
        self.log.info(" Requirements:")
        for f in self.changeish_filters:
            self.log.info(" %s" % f)
        self.log.info(" Events:")
        for e in self.event_filters:
            self.log.info(" %s" % e)
        self.log.info(" Projects:")

        def log_jobs(tree, indent=0):
            # Log the job at this tree node (if any) with its filters and
            # flags, then recurse into child trees with a deeper indent.
            istr = ' ' + ' ' * indent
            if tree.job:
                efilters = ''
                for b in tree.job._branches:
                    efilters += str(b)
                for f in tree.job._files:
                    efilters += str(f)
                if tree.job.skip_if_matcher:
                    efilters += str(tree.job.skip_if_matcher)
                if efilters:
                    efilters = ' ' + efilters
                hold = ''
                if tree.job.hold_following_changes:
                    hold = ' [hold]'
                voting = ''
                if not tree.job.voting:
                    voting = ' [nonvoting]'
                self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
                                              efilters, hold, voting))
            for x in tree.job_trees:
                log_jobs(x, indent + 2)

        for p in layout.projects.values():
            tree = self.pipeline.getJobTree(p)
            # Projects without a job tree in this pipeline are not listed.
            if tree:
                self.log.info(" %s" % p)
                log_jobs(tree)
        self.log.info(" On start:")
        self.log.info(" %s" % self.pipeline.start_actions)
        self.log.info(" On success:")
        self.log.info(" %s" % self.pipeline.success_actions)
        self.log.info(" On failure:")
        self.log.info(" %s" % self.pipeline.failure_actions)
        self.log.info(" On merge-failure:")
        self.log.info(" %s" % self.pipeline.merge_failure_actions)
        self.log.info(" When disabled:")
        self.log.info(" %s" % self.pipeline.disabled_actions)
James E. Blairee743612012-05-29 14:49:32 -07001040
James E. Blaire421a232012-07-25 16:59:21 -07001041 def getSubmitAllowNeeds(self):
1042 # Get a list of code review labels that are allowed to be
1043 # "needed" in the submit records for a change, with respect
1044 # to this queue. In other words, the list of review labels
1045 # this queue itself is likely to set before submitting.
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001046 allow_needs = set()
1047 for action_reporter in self.pipeline.success_actions:
1048 allow_needs.update(action_reporter.getSubmitAllowNeeds())
1049 return allow_needs
James E. Blaire421a232012-07-25 16:59:21 -07001050
James E. Blairc053d022014-01-22 14:57:33 -08001051 def eventMatches(self, event, change):
James E. Blairad28e912013-11-27 10:43:22 -08001052 if event.forced_pipeline:
1053 if event.forced_pipeline == self.pipeline.name:
James E. Blair1b265312014-06-24 09:35:21 -07001054 self.log.debug("Event %s for change %s was directly assigned "
1055 "to pipeline %s" % (event, change, self))
James E. Blairad28e912013-11-27 10:43:22 -08001056 return True
1057 else:
1058 return False
James E. Blairee743612012-05-29 14:49:32 -07001059 for ef in self.event_filters:
James E. Blairc053d022014-01-22 14:57:33 -08001060 if ef.matches(event, change):
James E. Blair1b265312014-06-24 09:35:21 -07001061 self.log.debug("Event %s for change %s matched %s "
1062 "in pipeline %s" % (event, change, ef, self))
James E. Blairee743612012-05-29 14:49:32 -07001063 return True
1064 return False
1065
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001066 def isChangeAlreadyInPipeline(self, change):
1067 # Checks live items in the pipeline
1068 for item in self.pipeline.getAllItems():
1069 if item.live and change.equals(item.change):
1070 return True
1071 return False
1072
1073 def isChangeAlreadyInQueue(self, change, change_queue):
1074 # Checks any item in the specified change queue
1075 for item in change_queue.queue:
1076 if change.equals(item.change):
James E. Blair0dc8ba92012-07-16 14:23:52 -07001077 return True
1078 return False
1079
James E. Blaire0487072012-08-29 17:38:31 -07001080 def reportStart(self, change):
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001081 if not self.pipeline._disabled:
1082 try:
1083 self.log.info("Reporting start, action %s change %s" %
1084 (self.pipeline.start_actions, change))
1085 msg = "Starting %s jobs." % self.pipeline.name
1086 if self.sched.config.has_option('zuul', 'status_url'):
1087 msg += "\n" + self.sched.config.get('zuul', 'status_url')
1088 ret = self.sendReport(self.pipeline.start_actions,
Joshua Hesketh850ccb62014-11-27 11:31:02 +11001089 self.pipeline.source, change, msg)
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001090 if ret:
1091 self.log.error("Reporting change start %s received: %s" %
1092 (change, ret))
1093 except:
1094 self.log.exception("Exception while reporting start:")
James E. Blaire0487072012-08-29 17:38:31 -07001095
Joshua Hesketh850ccb62014-11-27 11:31:02 +11001096 def sendReport(self, action_reporters, source, change, message):
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001097 """Sends the built message off to configured reporters.
1098
1099 Takes the action_reporters, change, message and extra options and
1100 sends them to the pluggable reporters.
1101 """
1102 report_errors = []
1103 if len(action_reporters) > 0:
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001104 for action_reporter in action_reporters:
Joshua Hesketh850ccb62014-11-27 11:31:02 +11001105 ret = action_reporter.report(source, change, message)
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001106 if ret:
1107 report_errors.append(ret)
1108 if len(report_errors) == 0:
1109 return
1110 return report_errors
1111
James E. Blaire0487072012-08-29 17:38:31 -07001112 def isChangeReadyToBeEnqueued(self, change):
1113 return True
1114
James E. Blair5ee24252014-12-30 10:12:29 -08001115 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
1116 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001117 return True
1118
James E. Blair5ee24252014-12-30 10:12:29 -08001119 def enqueueChangesBehind(self, change, quiet, ignore_requirements,
1120 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001121 return True
1122
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001123 def checkForChangesNeededBy(self, change, change_queue):
James E. Blairfee8d652013-06-07 08:57:52 -07001124 return True
1125
James E. Blair6965a4b2014-12-16 17:19:04 -08001126 def getFailingDependentItems(self, item):
James E. Blair972e3c72013-08-29 12:04:55 -07001127 return None
1128
James E. Blairfee8d652013-06-07 08:57:52 -07001129 def getDependentItems(self, item):
1130 orig_item = item
1131 items = []
1132 while item.item_ahead:
1133 items.append(item.item_ahead)
1134 item = item.item_ahead
1135 self.log.info("Change %s depends on changes %s" %
1136 (orig_item.change,
1137 [x.change for x in items]))
1138 return items
1139
James E. Blair972e3c72013-08-29 12:04:55 -07001140 def getItemForChange(self, change):
1141 for item in self.pipeline.getAllItems():
1142 if item.change.equals(change):
1143 return item
1144 return None
1145
James E. Blair2fa50962013-01-30 21:50:41 -08001146 def findOldVersionOfChangeAlreadyInQueue(self, change):
James E. Blairba437362015-02-07 11:41:52 -08001147 for item in self.pipeline.getAllItems():
1148 if not item.live:
1149 continue
1150 if change.isUpdateOf(item.change):
1151 return item
James E. Blair2fa50962013-01-30 21:50:41 -08001152 return None
1153
1154 def removeOldVersionsOfChange(self, change):
1155 if not self.pipeline.dequeue_on_new_patchset:
1156 return
James E. Blairba437362015-02-07 11:41:52 -08001157 old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
1158 if old_item:
James E. Blair2fa50962013-01-30 21:50:41 -08001159 self.log.debug("Change %s is a new version of %s, removing %s" %
James E. Blairba437362015-02-07 11:41:52 -08001160 (change, old_item.change, old_item))
1161 self.removeItem(old_item)
James E. Blair2fa50962013-01-30 21:50:41 -08001162
Antoine Mussobd86a312014-01-08 14:51:33 +01001163 def removeAbandonedChange(self, change):
1164 self.log.debug("Change %s abandoned, removing." % change)
James E. Blairba437362015-02-07 11:41:52 -08001165 for item in self.pipeline.getAllItems():
1166 if not item.live:
1167 continue
1168 if item.change.equals(change):
1169 self.removeItem(item)
Antoine Mussobd86a312014-01-08 14:51:33 +01001170
James E. Blairbfb8e042014-12-30 17:01:44 -08001171 def reEnqueueItem(self, item, last_head):
James E. Blair0577cd62015-02-07 11:42:12 -08001172 with self.getChangeQueue(item.change, last_head.queue) as change_queue:
1173 if change_queue:
1174 self.log.debug("Re-enqueing change %s in queue %s" %
1175 (item.change, change_queue))
1176 change_queue.enqueueItem(item)
James E. Blair6bc782d2015-07-17 16:20:21 -07001177
1178 # Re-set build results in case any new jobs have been
1179 # added to the tree.
1180 for build in item.current_build_set.getBuilds():
1181 if build.result:
1182 self.pipeline.setResult(item, build)
1183 # Similarly, reset the item state.
1184 if item.current_build_set.unable_to_merge:
1185 self.pipeline.setUnableToMerge(item)
1186 if item.dequeued_needing_change:
1187 self.pipeline.setDequeuedNeedingChange(item)
1188
James E. Blair0577cd62015-02-07 11:42:12 -08001189 self.reportStats(item)
1190 return True
1191 else:
1192 self.log.error("Unable to find change queue for project %s" %
1193 item.change.project)
1194 return False
James E. Blaircdccd972013-07-01 12:10:22 -07001195
James E. Blairf9ab8842014-07-10 13:12:07 -07001196 def addChange(self, change, quiet=False, enqueue_time=None,
James E. Blairbfb8e042014-12-30 17:01:44 -08001197 ignore_requirements=False, live=True,
1198 change_queue=None):
James E. Blaire0487072012-08-29 17:38:31 -07001199 self.log.debug("Considering adding change %s" % change)
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001200
1201 # If we are adding a live change, check if it's a live item
1202 # anywhere in the pipeline. Otherwise, we will perform the
1203 # duplicate check below on the specific change_queue.
1204 if live and self.isChangeAlreadyInPipeline(change):
1205 self.log.debug("Change %s is already in pipeline, "
1206 "ignoring" % change)
James E. Blaire0487072012-08-29 17:38:31 -07001207 return True
James E. Blair692c6b32012-07-17 11:16:35 -07001208
James E. Blaire0487072012-08-29 17:38:31 -07001209 if not self.isChangeReadyToBeEnqueued(change):
1210 self.log.debug("Change %s is not ready to be enqueued, ignoring" %
1211 change)
1212 return False
1213
James E. Blairf9ab8842014-07-10 13:12:07 -07001214 if not ignore_requirements:
1215 for f in self.changeish_filters:
1216 if not f.matches(change):
1217 self.log.debug("Change %s does not match pipeline "
1218 "requirement %s" % (change, f))
1219 return False
James E. Blair11041d22014-05-02 14:49:53 -07001220
James E. Blair0577cd62015-02-07 11:42:12 -08001221 with self.getChangeQueue(change, change_queue) as change_queue:
James E. Blair5ee24252014-12-30 10:12:29 -08001222 if not change_queue:
1223 self.log.debug("Unable to find change queue for "
1224 "change %s in project %s" %
1225 (change, change.project))
1226 return False
1227
James E. Blair0577cd62015-02-07 11:42:12 -08001228 if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
1229 change_queue):
1230 self.log.debug("Failed to enqueue changes "
1231 "ahead of %s" % change)
1232 return False
James E. Blaire0487072012-08-29 17:38:31 -07001233
James E. Blair0577cd62015-02-07 11:42:12 -08001234 if self.isChangeAlreadyInQueue(change, change_queue):
1235 self.log.debug("Change %s is already in queue, "
1236 "ignoring" % change)
1237 return True
1238
1239 self.log.debug("Adding change %s to queue %s" %
1240 (change, change_queue))
1241 if not quiet:
1242 if len(self.pipeline.start_actions) > 0:
1243 self.reportStart(change)
1244 item = change_queue.enqueueChange(change)
1245 if enqueue_time:
1246 item.enqueue_time = enqueue_time
1247 item.live = live
1248 self.reportStats(item)
1249 self.enqueueChangesBehind(change, quiet, ignore_requirements,
1250 change_queue)
1251 self.sched.triggers['zuul'].onChangeEnqueued(item.change,
1252 self.pipeline)
James E. Blaire0487072012-08-29 17:38:31 -07001253 return True
1254
James E. Blair972e3c72013-08-29 12:04:55 -07001255 def dequeueItem(self, item):
James E. Blairfee8d652013-06-07 08:57:52 -07001256 self.log.debug("Removing change %s from queue" % item.change)
James E. Blairbfb8e042014-12-30 17:01:44 -08001257 item.queue.dequeueItem(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001258
James E. Blairba437362015-02-07 11:41:52 -08001259 def removeItem(self, item):
1260 # Remove an item from the queue, probably because it has been
Alex Gaynor813d39b2014-05-17 16:17:16 -07001261 # superseded by another change.
James E. Blairba437362015-02-07 11:41:52 -08001262 self.log.debug("Canceling builds behind change: %s "
1263 "because it is being removed." % item.change)
1264 self.cancelJobs(item)
1265 self.dequeueItem(item)
1266 self.reportStats(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001267
James E. Blairac2c3242014-01-24 13:38:51 -08001268 def _makeMergerItem(self, item):
1269 # Create a dictionary with all info about the item needed by
1270 # the merger.
Clark Boylan4c6566b2014-03-10 11:02:01 -07001271 number = None
1272 patchset = None
1273 oldrev = None
1274 newrev = None
1275 if hasattr(item.change, 'number'):
1276 number = item.change.number
1277 patchset = item.change.patchset
1278 elif hasattr(item.change, 'newrev'):
1279 oldrev = item.change.oldrev
1280 newrev = item.change.newrev
James E. Blairac2c3242014-01-24 13:38:51 -08001281 return dict(project=item.change.project.name,
James E. Blairc0dedf82014-08-06 09:37:52 -07001282 url=self.pipeline.source.getGitUrl(
James E. Blairac2c3242014-01-24 13:38:51 -08001283 item.change.project),
1284 merge_mode=item.change.project.merge_mode,
1285 refspec=item.change.refspec,
1286 branch=item.change.branch,
1287 ref=item.current_build_set.ref,
Clark Boylan4c6566b2014-03-10 11:02:01 -07001288 number=number,
1289 patchset=patchset,
1290 oldrev=oldrev,
1291 newrev=newrev,
James E. Blairac2c3242014-01-24 13:38:51 -08001292 )
1293
def prepareRef(self, item):
    """Make sure the merger has been asked to prepare ``item``.

    Returns True only when the current build set's merge has already
    completed.  If a merge is pending, or one is requested by this call,
    False is returned.

    When the change has a ``refspec`` and the build set has no ref yet,
    the item and the items it depends on are sent to the merger to be
    merged; otherwise the merger is only asked to update the repository.
    """
    build_set = item.current_build_set
    if build_set.merge_state == build_set.COMPLETE:
        return True
    if build_set.merge_state == build_set.PENDING:
        return False
    build_set.merge_state = build_set.PENDING
    ref = build_set.ref
    if hasattr(item.change, 'refspec') and not ref:
        self.log.debug("Preparing ref for: %s" % item.change)
        item.current_build_set.setConfiguration()
        dependent_items = self.getDependentItems(item)
        dependent_items.reverse()
        all_items = dependent_items + [item]
        # Build a concrete list: map() is a lazy iterator on Python 3,
        # and the merger needs a real sequence of item dictionaries.
        merger_items = [self._makeMergerItem(i) for i in all_items]
        self.sched.merger.mergeChanges(merger_items,
                                       item.current_build_set,
                                       self.pipeline.precedence)
    else:
        self.log.debug("Preparing update repo for: %s" % item.change)
        url = self.pipeline.source.getGitUrl(item.change.project)
        self.sched.merger.updateRepo(item.change.project.name,
                                     url, build_set,
                                     self.pipeline.precedence)
    return False
1320
1321 def _launchJobs(self, item, jobs):
1322 self.log.debug("Launching jobs for change %s" % item.change)
1323 dependent_items = self.getDependentItems(item)
1324 for job in jobs:
1325 self.log.debug("Found job %s for change %s" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001326 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001327 build = self.sched.launcher.launch(job, item,
1328 self.pipeline,
1329 dependent_items)
James E. Blairfee8d652013-06-07 08:57:52 -07001330 self.log.debug("Adding build %s of job %s to item %s" %
1331 (build, job, item))
1332 item.addBuild(build)
James E. Blairee743612012-05-29 14:49:32 -07001333 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001334 self.log.exception("Exception while launching job %s "
James E. Blairfee8d652013-06-07 08:57:52 -07001335 "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001336
def launchJobs(self, item):
    """Launch whatever jobs the pipeline reports as runnable for ``item``.

    Nothing is returned (implicitly ``None``).
    """
    # NOTE(review): _processOneItem tests this method's return value in
    # a boolean context; since nothing is returned here that test is
    # always false -- confirm whether that is intended.
    runnable = self.pipeline.findJobsToRun(item)
    if not runnable:
        return
    self._launchJobs(item, runnable)
1341
def cancelJobs(self, item, prime=True):
    """Cancel the builds of ``item`` and of every item behind it.

    When ``prime`` is true and the current build set already has a ref,
    the item's builds are reset first.  Every build of the build set that
    was current on entry is asked to cancel and marked ``'CANCELED'``,
    even if the launcher raised while canceling it.

    Returns True if at least one build (of this item or of an item behind
    it) was marked canceled, False otherwise.
    """
    self.log.debug("Cancel jobs for change %s" % item.change)
    canceled = False
    # Keep a handle on the build set as it was on entry; resetAllBuilds()
    # is called before its builds are walked.
    old_build_set = item.current_build_set
    if prime and item.current_build_set.ref:
        item.resetAllBuilds()
    for build in old_build_set.getBuilds():
        try:
            self.sched.launcher.cancel(build)
        except Exception:
            # Best effort per build, but do not swallow
            # KeyboardInterrupt/SystemExit.
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, item.change))
        build.result = 'CANCELED'
        canceled = True
    self.updateBuildDescriptions(old_build_set)
    for item_behind in item.items_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s" %
                       (item_behind.change, item.change))
        if self.cancelJobs(item_behind, prime=prime):
            canceled = True
    return canceled
1363
Dmitry Teselkin116fef32015-04-20 14:32:14 +03001364 def _processOneItem(self, item, nnfi):
James E. Blairfee8d652013-06-07 08:57:52 -07001365 changed = False
1366 item_ahead = item.item_ahead
James E. Blairbfb8e042014-12-30 17:01:44 -08001367 if item_ahead and (not item_ahead.live):
1368 item_ahead = None
1369 change_queue = item.queue
James E. Blair972e3c72013-08-29 12:04:55 -07001370 failing_reasons = [] # Reasons this item is failing
1371
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001372 if self.checkForChangesNeededBy(item.change, change_queue) is not True:
James E. Blairfee8d652013-06-07 08:57:52 -07001373 # It's not okay to enqueue this change, we should remove it.
1374 self.log.info("Dequeuing change %s because "
1375 "it can no longer merge" % item.change)
1376 self.cancelJobs(item)
James E. Blair972e3c72013-08-29 12:04:55 -07001377 self.dequeueItem(item)
James E. Blairfee8d652013-06-07 08:57:52 -07001378 self.pipeline.setDequeuedNeedingChange(item)
James E. Blairf8b42fb2015-02-18 09:23:36 -08001379 if item.live:
1380 try:
1381 self.reportItem(item)
1382 except MergeFailure:
1383 pass
Dmitry Teselkin116fef32015-04-20 14:32:14 +03001384 return (True, nnfi)
James E. Blair6965a4b2014-12-16 17:19:04 -08001385 dep_items = self.getFailingDependentItems(item)
Clark Boylanaf2476f2014-01-23 14:47:36 -08001386 actionable = change_queue.isActionable(item)
1387 item.active = actionable
James E. Blair4076e2b2014-01-28 12:42:20 -08001388 ready = False
James E. Blair6965a4b2014-12-16 17:19:04 -08001389 if dep_items:
James E. Blair972e3c72013-08-29 12:04:55 -07001390 failing_reasons.append('a needed change is failing')
1391 self.cancelJobs(item, prime=False)
James E. Blairfee8d652013-06-07 08:57:52 -07001392 else:
James E. Blairfef71632013-09-23 11:15:47 -07001393 item_ahead_merged = False
James E. Blairbfb8e042014-12-30 17:01:44 -08001394 if (item_ahead and item_ahead.change.is_merged):
James E. Blairfef71632013-09-23 11:15:47 -07001395 item_ahead_merged = True
1396 if (item_ahead != nnfi and not item_ahead_merged):
James E. Blair972e3c72013-08-29 12:04:55 -07001397 # Our current base is different than what we expected,
1398 # and it's not because our current base merged. Something
1399 # ahead must have failed.
1400 self.log.info("Resetting builds for change %s because the "
1401 "item ahead, %s, is not the nearest non-failing "
1402 "item, %s" % (item.change, item_ahead, nnfi))
1403 change_queue.moveItem(item, nnfi)
1404 changed = True
1405 self.cancelJobs(item)
Clark Boylanaf2476f2014-01-23 14:47:36 -08001406 if actionable:
James E. Blair4076e2b2014-01-28 12:42:20 -08001407 ready = self.prepareRef(item)
Clark Boylan3d2f7a72014-01-23 11:07:42 -08001408 if item.current_build_set.unable_to_merge:
1409 failing_reasons.append("it has a merge conflict")
James E. Blair4076e2b2014-01-28 12:42:20 -08001410 ready = False
Dmitry Teselkin116fef32015-04-20 14:32:14 +03001411 if actionable and ready and self.launchJobs(item):
James E. Blairfee8d652013-06-07 08:57:52 -07001412 changed = True
James E. Blair972e3c72013-08-29 12:04:55 -07001413 if self.pipeline.didAnyJobFail(item):
1414 failing_reasons.append("at least one job failed")
James E. Blairbfb8e042014-12-30 17:01:44 -08001415 if (not item.live) and (not item.items_behind):
1416 failing_reasons.append("is a non-live item with no items behind")
1417 self.dequeueItem(item)
1418 changed = True
James E. Blairec2e1562015-02-05 10:45:54 -08001419 if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
1420 and item.live):
James E. Blair972e3c72013-08-29 12:04:55 -07001421 try:
1422 self.reportItem(item)
1423 except MergeFailure:
James E. Blair062c4fb2013-09-26 07:46:00 -07001424 failing_reasons.append("it did not merge")
James E. Blair972e3c72013-08-29 12:04:55 -07001425 for item_behind in item.items_behind:
1426 self.log.info("Resetting builds for change %s because the "
1427 "item ahead, %s, failed to merge" %
1428 (item_behind.change, item))
1429 self.cancelJobs(item_behind)
1430 self.dequeueItem(item)
1431 changed = True
James E. Blairbfb8e042014-12-30 17:01:44 -08001432 elif not failing_reasons and item.live:
James E. Blair972e3c72013-08-29 12:04:55 -07001433 nnfi = item
1434 item.current_build_set.failing_reasons = failing_reasons
1435 if failing_reasons:
1436 self.log.debug("%s is a failing item because %s" %
1437 (item, failing_reasons))
Dmitry Teselkin116fef32015-04-20 14:32:14 +03001438 return (changed, nnfi)
James E. Blairfee8d652013-06-07 08:57:52 -07001439
def processQueue(self):
    """Process every item of every queue in the pipeline once.

    Each item is handed to ``_processOneItem`` and then to
    ``reportStats``.  Queues that changed have their status logged.
    Returns True if any queue changed, False otherwise.
    """
    self.log.debug("Starting queue processor: %s" % self.pipeline.name)
    any_changed = False
    for queue in self.pipeline.queues:
        queue_changed = False
        nnfi = None  # Nearest non-failing item
        # Walk a snapshot: processing may dequeue items from the queue.
        for item in list(queue.queue):
            item_changed, nnfi = self._processOneItem(item, nnfi)
            if item_changed:
                queue_changed = True
            self.reportStats(item)
        if not queue_changed:
            continue
        any_changed = True
        status = ''.join([entry.formatStatus() for entry in queue.queue])
        if status:
            self.log.debug("Queue %s status is now:\n %s" %
                           (queue.name, status))
    self.log.debug("Finished queue processor: %s (changed: %s)" %
                   (self.pipeline.name, any_changed))
    return any_changed
James E. Blairdaabed22012-08-15 15:38:57 -07001464
def updateBuildDescriptions(self, build_set):
    """Push fresh descriptions to the launcher for a build set.

    The builds of ``build_set`` are updated first, followed by the builds
    of its previous build set when there is one.
    """
    def refresh(target_set):
        for build in target_set.getBuilds():
            desc = self.formatDescription(build)
            self.sched.launcher.setBuildDescription(build, desc)

    refresh(build_set)
    if build_set.previous_build_set:
        refresh(build_set.previous_build_set)
1474
def onBuildStarted(self, build):
    """Log that ``build`` has started; always returns True."""
    message = "Build %s started" % build
    self.log.debug(message)
    return True
1478
def onBuildCompleted(self, build):
    """Record the result of a finished build on its item.

    The result is stored through ``pipeline.setResult`` and the item's
    updated status is logged.  Always returns True.
    """
    self.log.debug("Build %s completed" % build)
    owner = build.build_set.item

    self.pipeline.setResult(owner, build)
    status = owner.formatStatus()
    self.log.debug("Item %s status is now:\n %s" % (owner, status))
    return True
1487
def onMergeCompleted(self, event):
    """Record the outcome of a merger operation on its build set.

    The build set is marked COMPLETE and receives the event's
    ``zuul_url``.  For a merge, the resulting commit is stored; for a
    repository update, the change's ``newrev`` is stored unless the change
    is a NullChange.  A build set left without a commit is flagged as
    unable to merge, except for NullChange items.
    """
    build_set = event.build_set
    item = build_set.item
    # The NullChange check belongs on the change: ``item`` is the queue
    # item wrapping it, so testing ``item`` itself never matched.
    is_null_change = isinstance(item.change, NullChange)
    build_set.merge_state = build_set.COMPLETE
    build_set.zuul_url = event.zuul_url
    if event.merged:
        build_set.commit = event.commit
    elif event.updated:
        if not is_null_change:
            build_set.commit = item.change.newrev
    if not build_set.commit and not is_null_change:
        self.log.info("Unable to merge change %s" % item.change)
        self.pipeline.setUnableToMerge(item)
James E. Blair4076e2b2014-01-28 12:42:20 -08001501
def reportItem(self, item):
    """Report ``item`` (once) and, for merging managers, verify the merge.

    When ``self.changes_merge`` is true, the item must both have had all
    jobs succeed and be reported as merged by the source.  On success the
    queue window is increased and the 'zuul' trigger is notified; on
    failure the window is decreased and MergeFailure is raised.
    """
    if not item.reported:
        # _reportItem() returns True if it failed to report.
        item.reported = not self._reportItem(item)
    if not self.changes_merge:
        return

    succeeded = self.pipeline.didAllJobsSucceed(item)
    merged = item.reported
    if merged:
        merged = self.pipeline.source.isMerged(item.change,
                                               item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))
    change_queue = item.queue
    if succeeded and merged:
        change_queue.increaseWindowSize()
        self.log.debug("%s window size increased to %s" %
                       (change_queue, change_queue.window))
        self.sched.triggers['zuul'].onChangeMerged(item.change)
        return

    self.log.debug("Reported change %s failed tests or failed "
                   "to merge" % (item.change))
    change_queue.decreaseWindowSize()
    self.log.debug("%s window size decreased to %s" %
                   (change_queue, change_queue.window))
    raise MergeFailure("Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001527
James E. Blairfee8d652013-06-07 08:57:52 -07001528 def _reportItem(self, item):
James E. Blairfee8d652013-06-07 08:57:52 -07001529 self.log.debug("Reporting change %s" % item.change)
James E. Blairb98fcdb2013-08-26 18:23:09 -07001530 ret = True # Means error as returned by trigger.report
Evgeny Antyshev88db9cb2015-06-04 12:51:40 +00001531 if not self.pipeline.getJobs(item):
1532 # We don't send empty reports with +1,
1533 # and the same for -1's (merge failures or transient errors)
1534 # as they cannot be followed by +1's
1535 self.log.debug("No jobs for change %s" % item.change)
1536 actions = []
1537 elif self.pipeline.didAllJobsSucceed(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001538 self.log.debug("success %s" % (self.pipeline.success_actions))
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001539 actions = self.pipeline.success_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001540 item.setReportedResult('SUCCESS')
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001541 self.pipeline._consecutive_failures = 0
Joshua Heskethb7179772014-01-30 23:30:46 +11001542 elif not self.pipeline.didMergerSucceed(item):
1543 actions = self.pipeline.merge_failure_actions
1544 item.setReportedResult('MERGER_FAILURE')
James E. Blairee743612012-05-29 14:49:32 -07001545 else:
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001546 actions = self.pipeline.failure_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001547 item.setReportedResult('FAILURE')
Joshua Hesketh89e829d2015-02-10 16:29:45 +11001548 self.pipeline._consecutive_failures += 1
1549 if self.pipeline._disabled:
1550 actions = self.pipeline.disabled_actions
1551 # Check here if we should disable so that we only use the disabled
1552 # reporters /after/ the last disable_at failure is still reported as
1553 # normal.
1554 if (self.pipeline.disable_at and not self.pipeline._disabled and
1555 self.pipeline._consecutive_failures >= self.pipeline.disable_at):
1556 self.pipeline._disabled = True
James E. Blaire5910202013-12-27 09:50:31 -08001557 if actions:
1558 report = self.formatReport(item)
1559 try:
1560 self.log.info("Reporting change %s, actions: %s" %
1561 (item.change, actions))
Joshua Hesketh850ccb62014-11-27 11:31:02 +11001562 ret = self.sendReport(actions, self.pipeline.source,
1563 item.change, report)
James E. Blaire5910202013-12-27 09:50:31 -08001564 if ret:
1565 self.log.error("Reporting change %s received: %s" %
1566 (item.change, ret))
1567 except:
1568 self.log.exception("Exception while reporting:")
1569 item.setReportedResult('ERROR')
James E. Blairfee8d652013-06-07 08:57:52 -07001570 self.updateBuildDescriptions(item.current_build_set)
James E. Blairee743612012-05-29 14:49:32 -07001571 return ret
1572
def formatReport(self, item):
    """Compose the textual report for ``item``.

    The body is a fixed notice for items dequeued because of a needed
    change, the pipeline's merge failure message when the merger failed,
    or otherwise the success/failure message followed by the job list.
    The pipeline footer, if any, is appended after a newline.
    """
    pipeline = self.pipeline
    parts = []

    if item.dequeued_needing_change:
        parts.append(
            'This change depends on a change that failed to merge.\n')
    elif not pipeline.didMergerSucceed(item):
        parts.append(pipeline.merge_failure_message)
    else:
        if pipeline.didAllJobsSucceed(item):
            headline = pipeline.success_message
        else:
            headline = pipeline.failure_message
        parts.append(headline + '\n\n')
        parts.append(self._formatReportJobs(item))

    if pipeline.footer_message:
        parts.append('\n' + pipeline.footer_message)

    return ''.join(parts)
1591
1592 def _formatReportJobs(self, item):
1593 # Return the list of jobs portion of the report
1594 ret = ''
James E. Blair8b0d4c42012-08-23 16:03:05 -07001595
Joshua Heskethb7179772014-01-30 23:30:46 +11001596 if self.sched.config.has_option('zuul', 'url_pattern'):
1597 url_pattern = self.sched.config.get('zuul', 'url_pattern')
James E. Blair8b0d4c42012-08-23 16:03:05 -07001598 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001599 url_pattern = None
1600
James E. Blair107c3852015-02-07 08:23:10 -08001601 for job in self.pipeline.getJobs(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001602 build = item.current_build_set.getBuild(job.name)
1603 result = build.result
1604 pattern = url_pattern
1605 if result == 'SUCCESS':
1606 if job.success_message:
1607 result = job.success_message
1608 if job.success_pattern:
1609 pattern = job.success_pattern
1610 elif result == 'FAILURE':
1611 if job.failure_message:
1612 result = job.failure_message
1613 if job.failure_pattern:
1614 pattern = job.failure_pattern
1615 if pattern:
Jeremy Stanleyc6d4bc82014-08-01 19:11:29 +00001616 url = pattern.format(change=item.change,
1617 pipeline=self.pipeline,
1618 job=job,
1619 build=build)
James E. Blaira35fcce2012-08-24 10:46:01 -07001620 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001621 url = build.url or job.name
1622 if not job.voting:
1623 voting = ' (non-voting)'
1624 else:
1625 voting = ''
1626 if self.report_times and build.end_time and build.start_time:
1627 dt = int(build.end_time - build.start_time)
1628 m, s = divmod(dt, 60)
1629 h, m = divmod(m, 60)
1630 if h:
1631 elapsed = ' in %dh %02dm %02ds' % (h, m, s)
1632 elif m:
1633 elapsed = ' in %dm %02ds' % (m, s)
Ori Livneh7191ee82013-05-02 19:13:53 -07001634 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001635 elapsed = ' in %ds' % (s)
1636 else:
1637 elapsed = ''
1638 name = ''
1639 if self.sched.config.has_option('zuul', 'job_name_in_report'):
1640 if self.sched.config.getboolean('zuul',
1641 'job_name_in_report'):
1642 name = job.name + ' '
1643 ret += '- %s%s : %s%s%s\n' % (name, url, result, elapsed,
1644 voting)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001645 return ret
1646
def formatDescription(self, build):
    """Render the HTML description for ``build``.

    The description names what triggered the build (a numbered change or
    a ref update), then lists the other changes tested concurrently, all
    builds of the build set, the matching builds of the previous and next
    build sets, and the reported result -- each section only when it has
    content.
    """
    build_set = build.build_set
    change = build_set.item.change

    concurrent_changes = ''
    for other_change in build_set.other_changes:
        concurrent_changes += (
            '<li><a href="{change.url}"> '
            '{change.number},{change.patchset}</a></li>'
        ).format(change=other_change)

    concurrent_builds = ''
    other_builds = ''
    # Use a distinct loop variable so the ``build`` argument is not
    # rebound to whichever build happens to come last in the set.
    for sibling in build_set.getBuilds():
        if sibling.url:
            concurrent_builds += """\
<li>
 <a href="{build.url}">
 {build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=sibling)
        else:
            concurrent_builds += """\
<li>
 {build.job.name}: {build.result}
</li>""".format(build=sibling)

        if build_set.previous_build_set:
            other_build = build_set.previous_build_set.getBuild(
                sibling.job.name)
            if other_build:
                other_builds += """\
<li>
 Preceded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

        if build_set.next_build_set:
            other_build = build_set.next_build_set.getBuild(
                sibling.job.name)
            if other_build:
                other_builds += """\
<li>
 Succeeded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    result = build_set.result

    if hasattr(change, 'number'):
        ret = """\
<p>
 Triggered by change:
 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
 Branch: <b>{change.branch}</b><br/>
 Pipeline: <b>{pipeline.name}</b>
</p>"""
    elif hasattr(change, 'ref'):
        # The ref is plain text here; there is no opening <a> tag, so no
        # closing one is emitted either.
        ret = """\
<p>
 Triggered by reference:
 {change.ref}<br/>
 Old revision: <b>{change.oldrev}</b><br/>
 New revision: <b>{change.newrev}</b><br/>
 Pipeline: <b>{pipeline.name}</b>
</p>"""
    else:
        ret = ""

    if concurrent_changes:
        ret += """\
<p>
 Other changes tested concurrently with this change:
 <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
 All builds for this change set:
 <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
 Other build sets for this change:
 <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
 Reported result: <b>{result}</b>
</p>
"""

    # Name the template fields explicitly rather than exposing every
    # local variable through locals().
    return ret.format(change=change,
                      pipeline=self.pipeline,
                      concurrent_changes=concurrent_changes,
                      concurrent_builds=concurrent_builds,
                      other_builds=other_builds,
                      result=result)
1748
def reportStats(self, item):
    """Send pipeline statistics for ``item`` to statsd, if configured.

    The ``current_changes`` gauge is always updated.  Once the item has a
    dequeue time, its resident time (in milliseconds) and a change counter
    are also reported, both for the pipeline and for the item's project.
    Failures while reporting are logged and otherwise ignored.
    """
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeing.
        if item.dequeue_time:
            dt = int((item.dequeue_time - item.enqueue_time) * 1000)
        else:
            dt = None
        # Compare against None: a residency that rounds down to 0 ms is
        # still a dequeue and must be counted.
        dequeued = dt is not None
        items = len(self.pipeline.getAllItems())

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(key + '.current_changes', items)
        if dequeued:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        project_name = item.change.project.name.replace('/', '.')
        key += '.%s' % project_name
        if dequeued:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')
    except Exception:
        # Statistics are best effort, but KeyboardInterrupt/SystemExit
        # must still propagate.
        self.log.exception("Exception reporting pipeline stats")
1779
James E. Blair1e8dd892012-05-30 09:15:05 -07001780
class DynamicChangeQueueContextManager(object):
    """Context manager around a dynamically created change queue.

    Entering yields the change queue.  On exit, a change queue that ended
    up with no items is removed from its pipeline.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        if self.change_queue and not self.change_queue.queue:
            # Remove the change queue itself, not its (empty) item list.
            self.change_queue.pipeline.removeQueue(self.change_queue)
1791
1792
class IndependentPipelineManager(BasePipelineManager):
    """Pipeline manager that gives every change its own change queue.

    Queues are created on demand in ``getChangeQueue`` and removed again
    once they become empty.  ``changes_merge`` is False for this manager.
    """

    log = logging.getLogger("zuul.IndependentPipelineManager")
    changes_merge = False

    def _postConfig(self, layout):
        """Defer entirely to the base class configuration hook."""
        super(IndependentPipelineManager, self)._postConfig(layout)

    def getChangeQueue(self, change, existing=None):
        """Return a context manager for the queue ``change`` belongs in.

        ``existing`` is reused when given; otherwise a new change queue is
        created for the change's project (which is added to the pipeline
        if it is not known yet) and registered with the pipeline.
        """
        if existing:
            return DynamicChangeQueueContextManager(existing)
        project = change.project
        if project not in self.pipeline.getProjects():
            self.pipeline.addProject(project)
        new_queue = ChangeQueue(self.pipeline)
        new_queue.addProject(project)
        self.pipeline.addQueue(new_queue)
        self.log.debug("Dynamically created queue %s", new_queue)
        return DynamicChangeQueueContextManager(new_queue)

    def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                            change_queue):
        """Enqueue the changes ``change`` needs ahead of it.

        Returns the boolean from ``checkForChangesNeededBy`` when it gives
        one; otherwise adds every needed change and returns False as soon
        as one cannot be added, True when all were added.
        """
        needed = self.checkForChangesNeededBy(change, change_queue)
        if needed in [True, False]:
            return needed
        self.log.debug(" Changes %s must be merged ahead of %s" %
                       (needed, change))
        for needed_change in needed:
            # This differs from the dependent pipeline by enqueuing
            # changes ahead as "not live", that is, not intended to
            # have jobs run. Also, pipeline requirements are always
            # ignored (which is safe because the changes are not
            # live).
            added = self.addChange(needed_change, quiet=True,
                                   ignore_requirements=True,
                                   live=False, change_queue=change_queue)
            if not added:
                return False
        return True

    def checkForChangesNeededBy(self, change, change_queue):
        """Work out which changes must be enqueued before ``change``.

        Returns True when it is okay to proceed enqueuing the change, or
        the list of needed changes that are neither merged nor already in
        ``change_queue``.
        """
        if self.pipeline.ignore_dependencies:
            return True
        self.log.debug("Checking for changes needed by %s:" % change)
        if not hasattr(change, 'needs_changes'):
            self.log.debug(" Changeish does not support dependencies")
            return True
        if not change.needs_changes:
            self.log.debug(" No changes needed")
            return True

        outstanding = []
        for needed_change in change.needs_changes:
            self.log.debug(" Change %s needs change %s:" % (
                change, needed_change))
            if needed_change.is_merged:
                self.log.debug(" Needed change is merged")
                continue
            if self.isChangeAlreadyInQueue(needed_change, change_queue):
                self.log.debug(" Needed change is already ahead in the queue")
                continue
            self.log.debug(" Change %s is needed" % needed_change)
            if needed_change not in outstanding:
                outstanding.append(needed_change)
            # This differs from the dependent pipeline check in not
            # verifying that the dependent change is mergable.
        return outstanding if outstanding else True

    def dequeueItem(self, item):
        """Dequeue ``item`` and drop its queue if that left it empty."""
        super(IndependentPipelineManager, self).dequeueItem(item)
        # An independent pipeline manager dynamically removes empty
        # queues
        if item.queue.queue:
            return
        self.pipeline.removeQueue(item.queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001870
James E. Blair1e8dd892012-05-30 09:15:05 -07001871
class StaticChangeQueueContextManager(object):
    """Context manager that hands out a change queue and nothing more.

    Entering yields the wrapped change queue; leaving the block performs
    no cleanup.
    """

    def __init__(self, change_queue):
        self.change_queue = change_queue

    def __enter__(self):
        return self.change_queue

    def __exit__(self, etype, value, tb):
        # Nothing to clean up on exit.
        return None
1881
1882
James E. Blair4aea70c2012-07-26 14:23:24 -07001883class DependentPipelineManager(BasePipelineManager):
1884 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001885 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001886
def __init__(self, *args, **kwargs):
    # Forward all positional and keyword arguments unchanged to the base
    # class initializer; nothing else is set up here.
    super(DependentPipelineManager, self).__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001889
def _postConfig(self, layout):
    # Run the base class hook first, then build this manager's shared
    # change queues.
    super(DependentPipelineManager, self)._postConfig(layout)
    self.buildChangeQueues()
1893
def buildChangeQueues(self):
    """Create the shared change queues and register them on the pipeline.

    Starts from one queue per pipeline project, configured with the
    pipeline's window settings, then combines queues repeatedly until a
    pass no longer reduces their number.
    """
    self.log.debug("Building shared change queues")
    per_project_queues = []

    for project in self.pipeline.getProjects():
        window_settings = dict(
            window=self.pipeline.window,
            window_floor=self.pipeline.window_floor,
            window_increase_type=self.pipeline.window_increase_type,
            window_increase_factor=self.pipeline.window_increase_factor,
            window_decrease_type=self.pipeline.window_decrease_type,
            window_decrease_factor=self.pipeline.window_decrease_factor)
        project_queue = ChangeQueue(self.pipeline, **window_settings)
        project_queue.addProject(project)
        per_project_queues.append(project_queue)
        self.log.debug("Created queue: %s" % project_queue)

    # Iterate over all queues trying to combine them, and keep doing
    # so until they can not be combined further.
    previous = per_project_queues
    combined = self.combineChangeQueues(previous)
    while len(previous) != len(combined):
        previous = combined
        combined = self.combineChangeQueues(previous)

    self.log.info(" Shared change queues:")
    for queue in combined:
        self.pipeline.addQueue(queue)
        self.log.info(" %s containing %s" % (
            queue, queue.generated_name))
James E. Blairc3d428e2013-12-03 15:06:48 -08001925
def combineChangeQueues(self, change_queues):
    """Run one pass of merging queues that share at least one job.

    Each queue is merged into the first already-kept queue whose job set
    overlaps its own; queues with no overlap are kept as they are.
    Returns the list of kept queues.
    """
    self.log.debug("Combining shared queues")
    kept_queues = []
    for candidate in change_queues:
        for kept in kept_queues:
            if candidate.getJobs().isdisjoint(kept.getJobs()):
                continue
            self.log.debug("Merging queue %s into %s" % (candidate, kept))
            kept.mergeChangeQueue(candidate)
            break
        else:
            # No kept queue shares a job with this one.
            self.log.debug("Keeping queue %s" % (candidate))
            kept_queues.append(candidate)
    return kept_queues
James E. Blairee743612012-05-29 14:49:32 -07001941
def getChangeQueue(self, change, existing=None):
    """Return a context manager wrapping the queue for ``change``.

    When a truthy ``existing`` queue is supplied it is used directly;
    otherwise the queue is looked up from the pipeline by the change's
    project.
    """
    target_queue = existing
    if not target_queue:
        target_queue = self.pipeline.getQueue(change.project)
    return StaticChangeQueueContextManager(target_queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001947
def isChangeReadyToBeEnqueued(self, change):
    """Return True if the pipeline source reports ``change`` can merge.

    The check is delegated to ``self.pipeline.source.canMerge`` with
    the value of ``self.getSubmitAllowNeeds()``; a debug message is
    logged when the change is rejected.
    """
    source = self.pipeline.source
    if source.canMerge(change, self.getSubmitAllowNeeds()):
        return True
    self.log.debug("Change %s can not merge, ignoring" % change)
    return False
James E. Blair1e8dd892012-05-30 09:15:05 -07001954
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                         change_queue):
    """Enqueue the changes that depend on ``change``.

    Every entry of ``change.needed_by_changes`` whose change queue is
    ``change_queue`` and which the pipeline source reports as mergeable
    is passed to ``self.addChange`` with the supplied ``quiet``,
    ``ignore_requirements`` and ``change_queue``.  Changes without a
    ``needed_by_changes`` attribute are skipped entirely.

    Returns None.
    """
    self.log.debug("Checking for changes needing %s:" % change)
    if not hasattr(change, 'needed_by_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return

    dependents = []
    for dependent in change.needed_by_changes:
        with self.getChangeQueue(dependent) as dependent_queue:
            if dependent_queue != change_queue:
                # Dependents living in another queue cannot follow us.
                self.log.debug(" Change %s in project %s can not be "
                               "enqueued in the target queue %s" %
                               (dependent, dependent.project,
                                change_queue))
                continue
        mergeable = self.pipeline.source.canMerge(
            dependent, self.getSubmitAllowNeeds())
        if mergeable:
            self.log.debug(" Change %s needs %s and is ready to merge" %
                           (dependent, change))
            dependents.append(dependent)

    if not dependents:
        self.log.debug(" No changes need %s" % change)

    for dependent in dependents:
        self.addChange(dependent, quiet=quiet,
                       ignore_requirements=ignore_requirements,
                       change_queue=change_queue)
James E. Blaire0487072012-08-29 17:38:31 -07001983
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                        change_queue):
    """Enqueue the changes that ``change`` needs ahead of it.

    ``checkForChangesNeededBy`` is consulted first.  If it answers
    True or False that answer is returned unchanged.  Otherwise its
    result is iterated and each needed change is passed to
    ``self.addChange``; processing stops at the first falsy result.

    Returns True if every needed change was added, otherwise False.
    """
    needed = self.checkForChangesNeededBy(change, change_queue)
    if needed in (True, False):
        return needed
    self.log.debug(" Changes %s must be merged ahead of %s" %
                   (needed, change))
    # all() short-circuits, so no further changes are added after the
    # first failure.
    return all(
        self.addChange(needed_change, quiet=quiet,
                       ignore_requirements=ignore_requirements,
                       change_queue=change_queue)
        for needed_change in needed)
James E. Blaire0487072012-08-29 17:38:31 -07001998
def checkForChangesNeededBy(self, change, change_queue):
    """Work out which changes must be enqueued ahead of ``change``.

    Returns:
        True  -- it is okay to proceed enqueuing this change (it has no
                 ``needs_changes`` attribute, needs nothing, or all of
                 its needed changes are merged or already in the
                 queue).
        False -- the change should not be enqueued: a needed change is
                 in a different change queue, is not the current
                 patchset, or can not be merged.
        list  -- the needed changes (without duplicates) that have to
                 be enqueued first.

    The ``change_queue`` argument is ignored; the queue is looked up
    again from ``change`` itself.
    """
    self.log.debug("Checking for changes needed by %s:" % change)
    if not hasattr(change, 'needs_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return True
    if not change.needs_changes:
        self.log.debug(" No changes needed")
        return True

    required = []
    # Ignore supplied change_queue
    with self.getChangeQueue(change) as own_queue:
        for dependency in change.needs_changes:
            self.log.debug(" Change %s needs change %s:" % (
                change, dependency))
            if dependency.is_merged:
                self.log.debug(" Needed change is merged")
                continue
            with self.getChangeQueue(dependency) as dependency_queue:
                if dependency_queue != own_queue:
                    self.log.debug(" Change %s in project %s does not "
                                   "share a change queue with %s "
                                   "in project %s" %
                                   (dependency, dependency.project,
                                    change, change.project))
                    return False
            if not dependency.is_current_patchset:
                self.log.debug(" Needed change is not the "
                               "current patchset")
                return False
            if self.isChangeAlreadyInQueue(dependency, own_queue):
                self.log.debug(" Needed change is already ahead "
                               "in the queue")
                continue
            if not self.pipeline.source.canMerge(
                    dependency, self.getSubmitAllowNeeds()):
                # The needed change can't be merged.
                self.log.debug(" Change %s is needed but can not be "
                               "merged" % dependency)
                return False
            self.log.debug(" Change %s is needed" % dependency)
            if dependency not in required:
                required.append(dependency)
    # An empty list means nothing has to go ahead: report plain success.
    return required or True
James E. Blair972e3c72013-08-29 12:04:55 -07002047
def getFailingDependentItems(self, item):
    """Return the failing queue items that ``item`` depends on.

    Each entry of ``item.change.needs_changes`` is mapped to a queue
    item through ``self.getItemForChange``; entries with no item are
    skipped.  An item counts as failing when its
    ``current_build_set.failing_reasons`` is truthy.

    Returns a set of failing items, or None when the change has no
    ``needs_changes`` attribute, needs nothing, or no needed item is
    failing.
    """
    change = item.change
    if not hasattr(change, 'needs_changes') or not change.needs_changes:
        return None
    needed_items = (self.getItemForChange(needed_change)
                    for needed_change in change.needs_changes)
    failing_items = {
        needed_item for needed_item in needed_items
        if needed_item and needed_item.current_build_set.failing_reasons}
    # Callers get None rather than an empty set.
    return failing_items or None