blob: 4f3efdee3b35ca6275bef3dd4d0fadd1c82a6661 [file] [log] [blame]
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Christian Berendt12d4d722014-06-07 21:03:45 +020023from six.moves import queue as Queue
Zhongyue Luo1c860d72012-07-19 11:03:56 +080024import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
James E. Blair11041d22014-05-02 14:49:53 -070032from model import ActionReporter, Pipeline, Project, ChangeQueue
33from model import EventFilter, ChangeishFilter
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040034from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070035
James E. Blair71e94122012-12-24 17:53:08 -080036statsd = extras.try_import('statsd.statsd')
37
James E. Blair1e8dd892012-05-30 09:15:05 -070038
Antoine Musso80edd5a2013-02-13 15:37:53 +010039def deep_format(obj, paramdict):
40 """Apply the paramdict via str.format() to all string objects found within
41 the supplied obj. Lists and dicts are traversed recursively.
42
43 Borrowed from Jenkins Job Builder project"""
44 if isinstance(obj, str):
45 ret = obj.format(**paramdict)
46 elif isinstance(obj, list):
47 ret = []
48 for item in obj:
49 ret.append(deep_format(item, paramdict))
50 elif isinstance(obj, dict):
51 ret = {}
52 for item in obj:
53 exp_item = item.format(**paramdict)
54
55 ret[exp_item] = deep_format(obj[item], paramdict)
56 else:
57 ret = obj
58 return ret
59
60
James E. Blairfee8d652013-06-07 08:57:52 -070061class MergeFailure(Exception):
62 pass
63
64
James E. Blair468c8512013-12-06 13:27:19 -080065class ManagementEvent(object):
66 """An event that should be processed within the main queue run loop"""
67 def __init__(self):
68 self._wait_event = threading.Event()
James E. Blair36658cf2013-12-06 17:53:48 -080069 self._exception = None
70 self._traceback = None
James E. Blair468c8512013-12-06 13:27:19 -080071
James E. Blair36658cf2013-12-06 17:53:48 -080072 def exception(self, e, tb):
73 self._exception = e
74 self._traceback = tb
75 self._wait_event.set()
76
77 def done(self):
James E. Blair468c8512013-12-06 13:27:19 -080078 self._wait_event.set()
79
80 def wait(self, timeout=None):
81 self._wait_event.wait(timeout)
James E. Blair36658cf2013-12-06 17:53:48 -080082 if self._exception:
83 raise self._exception, None, self._traceback
James E. Blair468c8512013-12-06 13:27:19 -080084 return self._wait_event.is_set()
85
86
87class ReconfigureEvent(ManagementEvent):
88 """Reconfigure the scheduler. The layout will be (re-)loaded from
89 the path specified in the configuration.
90
91 :arg ConfigParser config: the new configuration
92 """
93 def __init__(self, config):
94 super(ReconfigureEvent, self).__init__()
95 self.config = config
96
97
James E. Blair36658cf2013-12-06 17:53:48 -080098class PromoteEvent(ManagementEvent):
99 """Promote one or more changes to the head of the queue.
100
101 :arg str pipeline_name: the name of the pipeline
102 :arg list change_ids: a list of strings of change ids in the form
103 1234,1
104 """
105
106 def __init__(self, pipeline_name, change_ids):
107 super(PromoteEvent, self).__init__()
108 self.pipeline_name = pipeline_name
109 self.change_ids = change_ids
110
111
James E. Blaird27a96d2014-07-10 13:25:13 -0700112class EnqueueEvent(ManagementEvent):
113 """Enqueue a change into a pipeline
114
115 :arg TriggerEvent trigger_event: a TriggerEvent describing the
116 trigger, pipeline, and change to enqueue
117 """
118
119 def __init__(self, trigger_event):
120 super(EnqueueEvent, self).__init__()
121 self.trigger_event = trigger_event
122
123
James E. Blaira84f0e42014-02-06 07:09:22 -0800124class ResultEvent(object):
125 """An event that needs to modify the pipeline state due to a
126 result from an external system."""
127
128 pass
129
130
131class BuildStartedEvent(ResultEvent):
132 """A build has started.
133
134 :arg Build build: The build which has started.
135 """
136
137 def __init__(self, build):
138 self.build = build
139
140
141class BuildCompletedEvent(ResultEvent):
142 """A build has completed
143
144 :arg Build build: The build which has completed.
145 """
146
147 def __init__(self, build):
148 self.build = build
149
150
James E. Blair4076e2b2014-01-28 12:42:20 -0800151class MergeCompletedEvent(ResultEvent):
152 """A remote merge operation has completed
153
154 :arg BuildSet build_set: The build_set which is ready.
155 :arg str zuul_url: The URL of the Zuul Merger.
156 :arg bool merged: Whether the merge succeeded (changes with refs).
157 :arg bool updated: Whether the repo was updated (changes without refs).
158 :arg str commit: The SHA of the merged commit (changes with refs).
159 """
160
161 def __init__(self, build_set, zuul_url, merged, updated, commit):
162 self.build_set = build_set
163 self.zuul_url = zuul_url
164 self.merged = merged
165 self.updated = updated
166 self.commit = commit
167
168
James E. Blaire9d45c32012-05-31 09:56:45 -0700169class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700170 log = logging.getLogger("zuul.Scheduler")
171
James E. Blaire9d45c32012-05-31 09:56:45 -0700172 def __init__(self):
173 threading.Thread.__init__(self)
James E. Blair8a6f0c22013-07-01 12:31:34 -0400174 self.daemon = True
James E. Blairee743612012-05-29 14:49:32 -0700175 self.wake_event = threading.Event()
James E. Blaircdccd972013-07-01 12:10:22 -0700176 self.layout_lock = threading.Lock()
James E. Blaira84f0e42014-02-06 07:09:22 -0800177 self.run_handler_lock = threading.Lock()
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700178 self._pause = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700179 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -0700180 self._stopped = False
James E. Blairee743612012-05-29 14:49:32 -0700181 self.launcher = None
James E. Blair4076e2b2014-01-28 12:42:20 -0800182 self.merger = None
James E. Blair6c358e72013-07-29 17:06:47 -0700183 self.triggers = dict()
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000184 self.reporters = dict()
James E. Blair3c5e5b52013-04-26 11:17:03 -0700185 self.config = None
James E. Blairee743612012-05-29 14:49:32 -0700186
187 self.trigger_event_queue = Queue.Queue()
188 self.result_event_queue = Queue.Queue()
James E. Blair468c8512013-12-06 13:27:19 -0800189 self.management_event_queue = Queue.Queue()
James E. Blaireff88162013-07-01 12:44:14 -0400190 self.layout = model.Layout()
James E. Blairee743612012-05-29 14:49:32 -0700191
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400192 self.zuul_version = zuul_version.version_info.version_string()
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400193 self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400194
James E. Blairb0fcae42012-07-17 11:12:10 -0700195 def stop(self):
196 self._stopped = True
197 self.wake_event.set()
198
James E. Blair47958382013-01-10 17:26:02 -0800199 def testConfig(self, config_path):
James E. Blair04948c72013-07-25 23:03:17 -0700200 return self._parseConfig(config_path)
James E. Blair47958382013-01-10 17:26:02 -0800201
James E. Blaire5a847f2012-07-10 15:29:14 -0700202 def _parseConfig(self, config_path):
James E. Blaireff88162013-07-01 12:44:14 -0400203 layout = model.Layout()
204 project_templates = {}
205
James E. Blairee743612012-05-29 14:49:32 -0700206 def toList(item):
James E. Blair1e8dd892012-05-30 09:15:05 -0700207 if not item:
208 return []
James E. Blair32663402012-06-01 10:04:18 -0700209 if isinstance(item, list):
James E. Blairee743612012-05-29 14:49:32 -0700210 return item
211 return [item]
212
James E. Blaire5a847f2012-07-10 15:29:14 -0700213 if config_path:
214 config_path = os.path.expanduser(config_path)
215 if not os.path.exists(config_path):
216 raise Exception("Unable to read layout config file at %s" %
217 config_path)
218 config_file = open(config_path)
219 data = yaml.load(config_file)
220
James E. Blair47958382013-01-10 17:26:02 -0800221 validator = layoutvalidator.LayoutValidator()
222 validator.validate(data)
223
James E. Blaireff88162013-07-01 12:44:14 -0400224 config_env = {}
James E. Blaire5a847f2012-07-10 15:29:14 -0700225 for include in data.get('includes', []):
226 if 'python-file' in include:
227 fn = include['python-file']
228 if not os.path.isabs(fn):
Antoine Musso9adc6d42014-11-14 15:37:48 +0100229 base = os.path.dirname(os.path.realpath(config_path))
James E. Blaire5a847f2012-07-10 15:29:14 -0700230 fn = os.path.join(base, fn)
231 fn = os.path.expanduser(fn)
James E. Blaireff88162013-07-01 12:44:14 -0400232 execfile(fn, config_env)
James E. Blair1e8dd892012-05-30 09:15:05 -0700233
James E. Blair4aea70c2012-07-26 14:23:24 -0700234 for conf_pipeline in data.get('pipelines', []):
235 pipeline = Pipeline(conf_pipeline['name'])
James E. Blair8dbd56a2012-12-22 10:55:10 -0800236 pipeline.description = conf_pipeline.get('description')
James E. Blairc0dedf82014-08-06 09:37:52 -0700237 # TODO(jeblair): remove backwards compatibility:
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000238 pipeline.source = self.triggers[conf_pipeline.get('source',
239 'gerrit')]
James E. Blair64ed6f22013-07-10 14:07:23 -0700240 precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
241 pipeline.precedence = precedence
James E. Blair56370192013-01-14 15:47:28 -0800242 pipeline.failure_message = conf_pipeline.get('failure-message',
243 "Build failed.")
Joshua Heskethb7179772014-01-30 23:30:46 +1100244 pipeline.merge_failure_message = conf_pipeline.get(
245 'merge-failure-message', "Merge Failed.\n\nThis change was "
246 "unable to be automatically merged with the current state of "
247 "the repository. Please rebase your change and upload a new "
248 "patchset.")
James E. Blair56370192013-01-14 15:47:28 -0800249 pipeline.success_message = conf_pipeline.get('success-message',
250 "Build succeeded.")
Joshua Hesketh3979e3e2014-03-04 11:21:10 +1100251 pipeline.footer_message = conf_pipeline.get('footer-message', "")
James E. Blair2fa50962013-01-30 21:50:41 -0800252 pipeline.dequeue_on_new_patchset = conf_pipeline.get(
James E. Blair6736beb2013-07-11 15:18:15 -0700253 'dequeue-on-new-patchset', True)
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000254
255 action_reporters = {}
Joshua Heskethb7179772014-01-30 23:30:46 +1100256 for action in ['start', 'success', 'failure', 'merge-failure']:
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000257 action_reporters[action] = []
258 if conf_pipeline.get(action):
259 for reporter_name, params \
260 in conf_pipeline.get(action).items():
261 if reporter_name in self.reporters.keys():
262 action_reporters[action].append(ActionReporter(
263 self.reporters[reporter_name], params))
264 else:
265 self.log.error('Invalid reporter name %s' %
266 reporter_name)
267 pipeline.start_actions = action_reporters['start']
268 pipeline.success_actions = action_reporters['success']
269 pipeline.failure_actions = action_reporters['failure']
Joshua Heskethb7179772014-01-30 23:30:46 +1100270 if len(action_reporters['merge-failure']) > 0:
271 pipeline.merge_failure_actions = \
272 action_reporters['merge-failure']
273 else:
274 pipeline.merge_failure_actions = action_reporters['failure']
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000275
Clark Boylan7603a372014-01-21 11:43:20 -0800276 pipeline.window = conf_pipeline.get('window', 20)
277 pipeline.window_floor = conf_pipeline.get('window-floor', 3)
278 pipeline.window_increase_type = conf_pipeline.get(
279 'window-increase-type', 'linear')
280 pipeline.window_increase_factor = conf_pipeline.get(
281 'window-increase-factor', 1)
282 pipeline.window_decrease_type = conf_pipeline.get(
283 'window-decrease-type', 'exponential')
284 pipeline.window_decrease_factor = conf_pipeline.get(
285 'window-decrease-factor', 2)
286
James E. Blair4aea70c2012-07-26 14:23:24 -0700287 manager = globals()[conf_pipeline['manager']](self, pipeline)
288 pipeline.setManager(manager)
James E. Blaireff88162013-07-01 12:44:14 -0400289 layout.pipelines[conf_pipeline['name']] = pipeline
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000290
James E. Blair11041d22014-05-02 14:49:53 -0700291 if 'require' in conf_pipeline:
292 require = conf_pipeline['require']
Clark Boylana9702ad2014-05-08 17:17:24 -0700293 f = ChangeishFilter(
294 open=require.get('open'),
295 current_patchset=require.get('current-patchset'),
296 statuses=toList(require.get('status')),
James E. Blair9c17dbf2014-06-23 14:21:58 -0700297 required_approvals=toList(require.get('approval')))
James E. Blair11041d22014-05-02 14:49:53 -0700298 manager.changeish_filters.append(f)
299
James E. Blair6c358e72013-07-29 17:06:47 -0700300 # TODO: move this into triggers (may require pluggable
301 # configuration)
302 if 'gerrit' in conf_pipeline['trigger']:
James E. Blair6c358e72013-07-29 17:06:47 -0700303 for trigger in toList(conf_pipeline['trigger']['gerrit']):
304 approvals = {}
305 for approval_dict in toList(trigger.get('approval')):
306 for k, v in approval_dict.items():
307 approvals[k] = v
James E. Blair1fbfceb2014-06-23 14:42:53 -0700308 # Backwards compat for *_filter versions of these args
309 comments = toList(trigger.get('comment'))
310 if not comments:
311 comments = toList(trigger.get('comment_filter'))
312 emails = toList(trigger.get('email'))
313 if not emails:
314 emails = toList(trigger.get('email_filter'))
315 usernames = toList(trigger.get('username'))
316 if not usernames:
317 usernames = toList(trigger.get('username_filter'))
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000318 f = EventFilter(
319 trigger=self.triggers['gerrit'],
320 types=toList(trigger['event']),
321 branches=toList(trigger.get('branch')),
322 refs=toList(trigger.get('ref')),
323 event_approvals=approvals,
324 comments=comments,
325 emails=emails,
326 usernames=usernames,
327 required_approvals=toList(
328 trigger.get('require-approval')
329 )
330 )
James E. Blair6c358e72013-07-29 17:06:47 -0700331 manager.event_filters.append(f)
James E. Blairc494d542014-08-06 09:23:52 -0700332 if 'timer' in conf_pipeline['trigger']:
James E. Blair63bb0ef2013-07-29 17:14:51 -0700333 for trigger in toList(conf_pipeline['trigger']['timer']):
James E. Blairc0dedf82014-08-06 09:37:52 -0700334 f = EventFilter(trigger=self.triggers['timer'],
335 types=['timer'],
James E. Blair63bb0ef2013-07-29 17:14:51 -0700336 timespecs=toList(trigger['time']))
337 manager.event_filters.append(f)
James E. Blairc494d542014-08-06 09:23:52 -0700338 if 'zuul' in conf_pipeline['trigger']:
339 for trigger in toList(conf_pipeline['trigger']['zuul']):
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000340 f = EventFilter(
341 trigger=self.triggers['zuul'],
342 types=toList(trigger['event']),
343 pipelines=toList(trigger.get('pipeline')),
344 required_approvals=toList(
345 trigger.get('require-approval')
346 )
347 )
James E. Blairc494d542014-08-06 09:23:52 -0700348 manager.event_filters.append(f)
James E. Blairee743612012-05-29 14:49:32 -0700349
Antoine Musso80edd5a2013-02-13 15:37:53 +0100350 for project_template in data.get('project-templates', []):
351 # Make sure the template only contains valid pipelines
352 tpl = dict(
353 (pipe_name, project_template.get(pipe_name))
James E. Blaireff88162013-07-01 12:44:14 -0400354 for pipe_name in layout.pipelines.keys()
Antoine Musso80edd5a2013-02-13 15:37:53 +0100355 if pipe_name in project_template
356 )
James E. Blaireff88162013-07-01 12:44:14 -0400357 project_templates[project_template.get('name')] = tpl
Antoine Musso80edd5a2013-02-13 15:37:53 +0100358
James E. Blair47958382013-01-10 17:26:02 -0800359 for config_job in data.get('jobs', []):
James E. Blaireff88162013-07-01 12:44:14 -0400360 job = layout.getJob(config_job['name'])
James E. Blairb0954652012-06-01 11:32:01 -0700361 # Be careful to only set attributes explicitly present on
362 # this job, to avoid squashing attributes set by a meta-job.
James E. Blairc8a1e052014-02-25 09:29:26 -0800363 m = config_job.get('queue-name', None)
364 if m:
365 job.queue_name = m
James E. Blairb0954652012-06-01 11:32:01 -0700366 m = config_job.get('failure-message', None)
367 if m:
368 job.failure_message = m
369 m = config_job.get('success-message', None)
370 if m:
371 job.success_message = m
James E. Blair6aea36d2012-12-17 13:03:24 -0800372 m = config_job.get('failure-pattern', None)
373 if m:
374 job.failure_pattern = m
375 m = config_job.get('success-pattern', None)
376 if m:
377 job.success_pattern = m
James E. Blair222d4982012-07-16 09:31:19 -0700378 m = config_job.get('hold-following-changes', False)
379 if m:
380 job.hold_following_changes = True
James E. Blair4ec821f2012-08-23 15:28:28 -0700381 m = config_job.get('voting', None)
382 if m is not None:
383 job.voting = m
James E. Blaire5a847f2012-07-10 15:29:14 -0700384 fname = config_job.get('parameter-function', None)
385 if fname:
James E. Blaireff88162013-07-01 12:44:14 -0400386 func = config_env.get(fname, None)
James E. Blaire5a847f2012-07-10 15:29:14 -0700387 if not func:
388 raise Exception("Unable to find function %s" % fname)
389 job.parameter_function = func
James E. Blairee743612012-05-29 14:49:32 -0700390 branches = toList(config_job.get('branch'))
391 if branches:
James E. Blaire421a232012-07-25 16:59:21 -0700392 job._branches = branches
393 job.branches = [re.compile(x) for x in branches]
James E. Blair70c71582013-03-06 08:50:50 -0800394 files = toList(config_job.get('files'))
395 if files:
396 job._files = files
397 job.files = [re.compile(x) for x in files]
Joshua Hesketh36c3fa52014-01-22 11:40:52 +1100398 swift = toList(config_job.get('swift'))
399 if swift:
400 for s in swift:
401 job.swift[s['name']] = s
James E. Blairee743612012-05-29 14:49:32 -0700402
403 def add_jobs(job_tree, config_jobs):
404 for job in config_jobs:
405 if isinstance(job, list):
406 for x in job:
407 add_jobs(job_tree, x)
408 if isinstance(job, dict):
409 for parent, children in job.items():
James E. Blaireff88162013-07-01 12:44:14 -0400410 parent_tree = job_tree.addJob(layout.getJob(parent))
James E. Blairee743612012-05-29 14:49:32 -0700411 add_jobs(parent_tree, children)
412 if isinstance(job, str):
James E. Blaireff88162013-07-01 12:44:14 -0400413 job_tree.addJob(layout.getJob(job))
James E. Blairee743612012-05-29 14:49:32 -0700414
James E. Blair47958382013-01-10 17:26:02 -0800415 for config_project in data.get('projects', []):
James E. Blairee743612012-05-29 14:49:32 -0700416 project = Project(config_project['name'])
James E. Blairaea6cf62013-12-16 15:38:12 -0800417 shortname = config_project['name'].split('/')[-1]
Antoine Musso80edd5a2013-02-13 15:37:53 +0100418
James E. Blair3e98c022013-12-16 15:25:38 -0800419 # This is reversed due to the prepend operation below, so
420 # the ultimate order is templates (in order) followed by
421 # statically defined jobs.
422 for requested_template in reversed(
423 config_project.get('template', [])):
Antoine Musso80edd5a2013-02-13 15:37:53 +0100424 # Fetch the template from 'project-templates'
James E. Blaireff88162013-07-01 12:44:14 -0400425 tpl = project_templates.get(
Antoine Musso80edd5a2013-02-13 15:37:53 +0100426 requested_template.get('name'))
427 # Expand it with the project context
James E. Blairaea6cf62013-12-16 15:38:12 -0800428 requested_template['name'] = shortname
Antoine Musso80edd5a2013-02-13 15:37:53 +0100429 expanded = deep_format(tpl, requested_template)
James E. Blair3e98c022013-12-16 15:25:38 -0800430 # Finally merge the expansion with whatever has been
431 # already defined for this project. Prepend our new
432 # jobs to existing ones (which may have been
433 # statically defined or defined by other templates).
434 for pipeline in layout.pipelines.values():
435 if pipeline.name in expanded:
436 config_project.update(
437 {pipeline.name: expanded[pipeline.name] +
438 config_project.get(pipeline.name, [])})
James E. Blair12a92b12014-03-26 11:54:53 -0700439 # TODO: future enhancement -- handle the case where
440 # duplicate jobs have different children and you want all
441 # of the children to run after a single run of the
442 # parent).
Antoine Musso80edd5a2013-02-13 15:37:53 +0100443
James E. Blaireff88162013-07-01 12:44:14 -0400444 layout.projects[config_project['name']] = project
James E. Blair19deff22013-08-25 13:17:35 -0700445 mode = config_project.get('merge-mode', 'merge-resolve')
446 project.merge_mode = model.MERGER_MAP[mode]
James E. Blaireff88162013-07-01 12:44:14 -0400447 for pipeline in layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700448 if pipeline.name in config_project:
449 job_tree = pipeline.addProject(project)
450 config_jobs = config_project[pipeline.name]
James E. Blairee743612012-05-29 14:49:32 -0700451 add_jobs(job_tree, config_jobs)
James E. Blairee743612012-05-29 14:49:32 -0700452
James E. Blairb0954652012-06-01 11:32:01 -0700453 # All jobs should be defined at this point, get rid of
454 # metajobs so that getJob isn't doing anything weird.
James E. Blairc28d1b02013-07-19 11:37:06 -0700455 layout.metajobs = []
James E. Blairb0954652012-06-01 11:32:01 -0700456
James E. Blaireff88162013-07-01 12:44:14 -0400457 for pipeline in layout.pipelines.values():
458 pipeline.manager._postConfig(layout)
459
460 return layout
James E. Blairee743612012-05-29 14:49:32 -0700461
James E. Blairee743612012-05-29 14:49:32 -0700462 def setLauncher(self, launcher):
463 self.launcher = launcher
464
James E. Blair4076e2b2014-01-28 12:42:20 -0800465 def setMerger(self, merger):
466 self.merger = merger
467
James E. Blair6c358e72013-07-29 17:06:47 -0700468 def registerTrigger(self, trigger, name=None):
469 if name is None:
470 name = trigger.name
471 self.triggers[name] = trigger
James E. Blairee743612012-05-29 14:49:32 -0700472
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000473 def registerReporter(self, reporter, name=None):
474 if name is None:
475 name = reporter.name
476 self.reporters[name] = reporter
477
James E. Blaircdccd972013-07-01 12:10:22 -0700478 def getProject(self, name):
479 self.layout_lock.acquire()
480 p = None
481 try:
482 p = self.layout.projects.get(name)
483 finally:
484 self.layout_lock.release()
485 return p
486
James E. Blairee743612012-05-29 14:49:32 -0700487 def addEvent(self, event):
488 self.log.debug("Adding trigger event: %s" % event)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800489 try:
490 if statsd:
491 statsd.incr('gerrit.event.%s' % event.type)
492 except:
493 self.log.exception("Exception reporting event stats")
James E. Blairee743612012-05-29 14:49:32 -0700494 self.trigger_event_queue.put(event)
495 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800496 self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700497
James E. Blair11700c32012-07-05 17:50:05 -0700498 def onBuildStarted(self, build):
499 self.log.debug("Adding start event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800500 build.start_time = time.time()
James E. Blaira84f0e42014-02-06 07:09:22 -0800501 event = BuildStartedEvent(build)
502 self.result_event_queue.put(event)
James E. Blair11700c32012-07-05 17:50:05 -0700503 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800504 self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700505
James E. Blairee743612012-05-29 14:49:32 -0700506 def onBuildCompleted(self, build):
James E. Blair11700c32012-07-05 17:50:05 -0700507 self.log.debug("Adding complete event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800508 build.end_time = time.time()
James E. Blair23ec1ba2013-01-04 18:06:10 -0800509 try:
James E. Blair66eeebf2013-07-27 17:44:32 -0700510 if statsd and build.pipeline:
511 jobname = build.job.name.replace('.', '_')
512 key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
513 jobname, build.result)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800514 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
515 dt = int((build.end_time - build.start_time) * 1000)
516 statsd.timing(key, dt)
517 statsd.incr(key)
James E. Blair7f4a1902013-08-24 08:20:02 -0700518 key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
519 statsd.incr(key)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800520 except:
521 self.log.exception("Exception reporting runtime stats")
James E. Blaira84f0e42014-02-06 07:09:22 -0800522 event = BuildCompletedEvent(build)
523 self.result_event_queue.put(event)
James E. Blairee743612012-05-29 14:49:32 -0700524 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800525 self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700526
James E. Blair4076e2b2014-01-28 12:42:20 -0800527 def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
528 self.log.debug("Adding merge complete event for build set: %s" %
529 build_set)
530 event = MergeCompletedEvent(build_set, zuul_url,
531 merged, updated, commit)
532 self.result_event_queue.put(event)
533 self.wake_event.set()
534
James E. Blaire9d45c32012-05-31 09:56:45 -0700535 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700536 self.log.debug("Prepare to reconfigure")
James E. Blair468c8512013-12-06 13:27:19 -0800537 event = ReconfigureEvent(config)
538 self.management_event_queue.put(event)
James E. Blaire9d45c32012-05-31 09:56:45 -0700539 self.wake_event.set()
540 self.log.debug("Waiting for reconfiguration")
James E. Blair468c8512013-12-06 13:27:19 -0800541 event.wait()
James E. Blaire9d45c32012-05-31 09:56:45 -0700542 self.log.debug("Reconfiguration complete")
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400543 self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700544
James E. Blair36658cf2013-12-06 17:53:48 -0800545 def promote(self, pipeline_name, change_ids):
546 event = PromoteEvent(pipeline_name, change_ids)
547 self.management_event_queue.put(event)
548 self.wake_event.set()
549 self.log.debug("Waiting for promotion")
550 event.wait()
551 self.log.debug("Promotion complete")
552
James E. Blaird27a96d2014-07-10 13:25:13 -0700553 def enqueue(self, trigger_event):
554 event = EnqueueEvent(trigger_event)
555 self.management_event_queue.put(event)
556 self.wake_event.set()
557 self.log.debug("Waiting for enqueue")
558 event.wait()
559 self.log.debug("Enqueue complete")
560
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700561 def exit(self):
562 self.log.debug("Prepare to exit")
563 self._pause = True
564 self._exit = True
565 self.wake_event.set()
566 self.log.debug("Waiting for exit")
567
568 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700569 if self.config.has_option('zuul', 'state_dir'):
570 state_dir = os.path.expanduser(self.config.get('zuul',
571 'state_dir'))
572 else:
573 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700574 return os.path.join(state_dir, 'queue.pickle')
575
576 def _save_queue(self):
577 pickle_file = self._get_queue_pickle_file()
578 events = []
579 while not self.trigger_event_queue.empty():
580 events.append(self.trigger_event_queue.get())
581 self.log.debug("Queue length is %s" % len(events))
582 if events:
583 self.log.debug("Saving queue")
584 pickle.dump(events, open(pickle_file, 'wb'))
585
586 def _load_queue(self):
587 pickle_file = self._get_queue_pickle_file()
588 if os.path.exists(pickle_file):
589 self.log.debug("Loading queue")
590 events = pickle.load(open(pickle_file, 'rb'))
591 self.log.debug("Queue length is %s" % len(events))
592 for event in events:
593 self.trigger_event_queue.put(event)
594 else:
595 self.log.debug("No queue file found")
596
597 def _delete_queue(self):
598 pickle_file = self._get_queue_pickle_file()
599 if os.path.exists(pickle_file):
600 self.log.debug("Deleting saved queue")
601 os.unlink(pickle_file)
602
603 def resume(self):
604 try:
605 self._load_queue()
606 except:
607 self.log.exception("Unable to load queue")
608 try:
609 self._delete_queue()
610 except:
611 self.log.exception("Unable to delete saved queue")
612 self.log.debug("Resuming queue processing")
613 self.wake_event.set()
614
615 def _doPauseEvent(self):
616 if self._exit:
617 self.log.debug("Exiting")
618 self._save_queue()
619 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700620
James E. Blair468c8512013-12-06 13:27:19 -0800621 def _doReconfigureEvent(self, event):
622 # This is called in the scheduler loop after another thread submits
623 # a request
James E. Blaircdccd972013-07-01 12:10:22 -0700624 self.layout_lock.acquire()
James E. Blair468c8512013-12-06 13:27:19 -0800625 self.config = event.config
James E. Blaircdccd972013-07-01 12:10:22 -0700626 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700627 self.log.debug("Performing reconfiguration")
James E. Blaircdccd972013-07-01 12:10:22 -0700628 layout = self._parseConfig(
James E. Blaireff88162013-07-01 12:44:14 -0400629 self.config.get('zuul', 'layout_config'))
James E. Blaircdccd972013-07-01 12:10:22 -0700630 for name, new_pipeline in layout.pipelines.items():
631 old_pipeline = self.layout.pipelines.get(name)
632 if not old_pipeline:
633 if self.layout.pipelines:
634 # Don't emit this warning on startup
635 self.log.warning("No old pipeline matching %s found "
636 "when reconfiguring" % name)
637 continue
James E. Blairdad52252014-02-07 16:59:17 -0800638 self.log.debug("Re-enqueueing changes for pipeline %s" % name)
James E. Blaircdccd972013-07-01 12:10:22 -0700639 items_to_remove = []
James E. Blair6b077942014-02-07 17:45:55 -0800640 builds_to_remove = []
James E. Blairbfb8e042014-12-30 17:01:44 -0800641 last_head = None
James E. Blaircdccd972013-07-01 12:10:22 -0700642 for shared_queue in old_pipeline.queues:
James E. Blair972e3c72013-08-29 12:04:55 -0700643 for item in shared_queue.queue:
James E. Blairbfb8e042014-12-30 17:01:44 -0800644 if not item.item_ahead:
645 last_head = item
James E. Blaircdccd972013-07-01 12:10:22 -0700646 item.item_ahead = None
James E. Blair972e3c72013-08-29 12:04:55 -0700647 item.items_behind = []
James E. Blaircdccd972013-07-01 12:10:22 -0700648 item.pipeline = None
James E. Blairbfb8e042014-12-30 17:01:44 -0800649 item.queue = None
James E. Blaircdccd972013-07-01 12:10:22 -0700650 project = layout.projects.get(item.change.project.name)
651 if not project:
652 self.log.warning("Unable to find project for "
653 "change %s while reenqueueing" %
654 item.change)
655 item.change.project = None
656 items_to_remove.append(item)
657 continue
658 item.change.project = project
James E. Blairdad52252014-02-07 16:59:17 -0800659 for build in item.current_build_set.getBuilds():
James E. Blair6b077942014-02-07 17:45:55 -0800660 job = layout.jobs.get(build.job.name)
661 if job:
662 build.job = job
663 else:
664 builds_to_remove.append(build)
James E. Blairbfb8e042014-12-30 17:01:44 -0800665 if not new_pipeline.manager.reEnqueueItem(item,
666 last_head):
James E. Blaircdccd972013-07-01 12:10:22 -0700667 items_to_remove.append(item)
James E. Blair6b077942014-02-07 17:45:55 -0800668 for item in items_to_remove:
669 for build in item.current_build_set.getBuilds():
James E. Blaircdccd972013-07-01 12:10:22 -0700670 builds_to_remove.append(build)
James E. Blaircdccd972013-07-01 12:10:22 -0700671 for build in builds_to_remove:
James E. Blair6b077942014-02-07 17:45:55 -0800672 self.log.warning(
673 "Canceling build %s during reconfiguration" % (build,))
James E. Blairdad52252014-02-07 16:59:17 -0800674 try:
675 self.launcher.cancel(build)
676 except Exception:
677 self.log.exception(
678 "Exception while canceling build %s "
679 "for change %s" % (build, item.change))
James E. Blaircdccd972013-07-01 12:10:22 -0700680 self.layout = layout
James E. Blairc0acb552014-08-16 08:17:02 -0700681 self.maintainTriggerCache()
James E. Blair63bb0ef2013-07-29 17:14:51 -0700682 for trigger in self.triggers.values():
683 trigger.postConfig()
James E. Blair3cb10702013-08-24 08:56:03 -0700684 if statsd:
685 try:
686 for pipeline in self.layout.pipelines.values():
687 items = len(pipeline.getAllItems())
688 # stats.gauges.zuul.pipeline.NAME.current_changes
689 key = 'zuul.pipeline.%s' % pipeline.name
690 statsd.gauge(key + '.current_changes', items)
691 except Exception:
692 self.log.exception("Exception reporting initial "
693 "pipeline stats:")
James E. Blaircdccd972013-07-01 12:10:22 -0700694 finally:
695 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700696
James E. Blair36658cf2013-12-06 17:53:48 -0800697 def _doPromoteEvent(self, event):
698 pipeline = self.layout.pipelines[event.pipeline_name]
699 change_ids = [c.split(',') for c in event.change_ids]
700 items_to_enqueue = []
701 change_queue = None
702 for shared_queue in pipeline.queues:
703 if change_queue:
704 break
705 for item in shared_queue.queue:
706 if (item.change.number == change_ids[0][0] and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000707 item.change.patchset == change_ids[0][1]):
James E. Blair36658cf2013-12-06 17:53:48 -0800708 change_queue = shared_queue
709 break
710 if not change_queue:
711 raise Exception("Unable to find shared change queue for %s" %
712 event.change_ids[0])
713 for number, patchset in change_ids:
714 found = False
715 for item in change_queue.queue:
716 if (item.change.number == number and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000717 item.change.patchset == patchset):
James E. Blair36658cf2013-12-06 17:53:48 -0800718 found = True
719 items_to_enqueue.append(item)
720 break
721 if not found:
722 raise Exception("Unable to find %s,%s in queue %s" %
723 (number, patchset, change_queue))
724 for item in change_queue.queue[:]:
725 if item not in items_to_enqueue:
726 items_to_enqueue.append(item)
727 pipeline.manager.cancelJobs(item)
728 pipeline.manager.dequeueItem(item)
729 for item in items_to_enqueue:
Sean Daguef39b9ca2014-01-10 21:34:35 -0500730 pipeline.manager.addChange(
731 item.change,
732 enqueue_time=item.enqueue_time,
James E. Blairf9ab8842014-07-10 13:12:07 -0700733 quiet=True,
734 ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800735
James E. Blaird27a96d2014-07-10 13:25:13 -0700736 def _doEnqueueEvent(self, event):
737 project = self.layout.projects.get(event.project_name)
738 pipeline = self.layout.pipelines[event.forced_pipeline]
James E. Blairc0dedf82014-08-06 09:37:52 -0700739 change = pipeline.source.getChange(event, project)
James E. Blaird27a96d2014-07-10 13:25:13 -0700740 self.log.debug("Event %s for change %s was directly assigned "
741 "to pipeline %s" % (event, change, self))
742 self.log.info("Adding %s, %s to %s" %
743 (project, change, pipeline))
744 pipeline.manager.addChange(change, ignore_requirements=True)
745
James E. Blaire9d45c32012-05-31 09:56:45 -0700746 def _areAllBuildsComplete(self):
747 self.log.debug("Checking if all builds are complete")
748 waiting = False
James E. Blair4076e2b2014-01-28 12:42:20 -0800749 if self.merger.areMergesOutstanding():
750 waiting = True
James E. Blaireff88162013-07-01 12:44:14 -0400751 for pipeline in self.layout.pipelines.values():
James E. Blair6b077942014-02-07 17:45:55 -0800752 for item in pipeline.getAllItems():
753 for build in item.current_build_set.getBuilds():
754 if build.result is None:
755 self.log.debug("%s waiting on %s" %
756 (pipeline.manager, build))
757 waiting = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700758 if not waiting:
759 self.log.debug("All builds are complete")
760 return True
761 self.log.debug("All builds are not complete")
762 return False
763
James E. Blairee743612012-05-29 14:49:32 -0700764 def run(self):
James E. Blair71e94122012-12-24 17:53:08 -0800765 if statsd:
766 self.log.debug("Statsd enabled")
767 else:
768 self.log.debug("Statsd disabled because python statsd "
769 "package not found")
James E. Blairee743612012-05-29 14:49:32 -0700770 while True:
771 self.log.debug("Run handler sleeping")
772 self.wake_event.wait()
773 self.wake_event.clear()
James E. Blairb0fcae42012-07-17 11:12:10 -0700774 if self._stopped:
James E. Blair4076e2b2014-01-28 12:42:20 -0800775 self.log.debug("Run handler stopping")
James E. Blairb0fcae42012-07-17 11:12:10 -0700776 return
James E. Blairee743612012-05-29 14:49:32 -0700777 self.log.debug("Run handler awake")
James E. Blaira84f0e42014-02-06 07:09:22 -0800778 self.run_handler_lock.acquire()
James E. Blairee743612012-05-29 14:49:32 -0700779 try:
James E. Blaira84f0e42014-02-06 07:09:22 -0800780 while not self.management_event_queue.empty():
James E. Blair468c8512013-12-06 13:27:19 -0800781 self.process_management_queue()
James E. Blaircdccd972013-07-01 12:10:22 -0700782
James E. Blair263fba92013-02-27 13:07:19 -0800783 # Give result events priority -- they let us stop builds,
784 # whereas trigger evensts cause us to launch builds.
James E. Blaira84f0e42014-02-06 07:09:22 -0800785 while not self.result_event_queue.empty():
James E. Blairee743612012-05-29 14:49:32 -0700786 self.process_result_queue()
James E. Blaira84f0e42014-02-06 07:09:22 -0800787
788 if not self._pause:
789 while not self.trigger_event_queue.empty():
James E. Blair263fba92013-02-27 13:07:19 -0800790 self.process_event_queue()
James E. Blaire9d45c32012-05-31 09:56:45 -0700791
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700792 if self._pause and self._areAllBuildsComplete():
793 self._doPauseEvent()
James E. Blaire9d45c32012-05-31 09:56:45 -0700794
James E. Blaira84f0e42014-02-06 07:09:22 -0800795 for pipeline in self.layout.pipelines.values():
796 while pipeline.manager.processQueue():
797 pass
James E. Blair0e933c52013-07-11 10:18:52 -0700798
James E. Blaira84f0e42014-02-06 07:09:22 -0800799 except Exception:
James E. Blairee743612012-05-29 14:49:32 -0700800 self.log.exception("Exception in run handler:")
James E. Blaira84f0e42014-02-06 07:09:22 -0800801 # There may still be more events to process
802 self.wake_event.set()
803 finally:
804 self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700805
James E. Blair0e933c52013-07-11 10:18:52 -0700806 def maintainTriggerCache(self):
807 relevant = set()
808 for pipeline in self.layout.pipelines.values():
James E. Blairfadc6e12013-08-21 18:23:15 -0700809 self.log.debug("Start maintain trigger cache for: %s" % pipeline)
James E. Blair0e933c52013-07-11 10:18:52 -0700810 for item in pipeline.getAllItems():
811 relevant.add(item.change)
812 relevant.update(item.change.getRelatedChanges())
James E. Blairfadc6e12013-08-21 18:23:15 -0700813 self.log.debug("End maintain trigger cache for: %s" % pipeline)
James E. Blair0e933c52013-07-11 10:18:52 -0700814 self.log.debug("Trigger cache size: %s" % len(relevant))
James E. Blair6c358e72013-07-29 17:06:47 -0700815 for trigger in self.triggers.values():
816 trigger.maintainCache(relevant)
James E. Blair0e933c52013-07-11 10:18:52 -0700817
James E. Blairee743612012-05-29 14:49:32 -0700818 def process_event_queue(self):
819 self.log.debug("Fetching trigger event")
820 event = self.trigger_event_queue.get()
821 self.log.debug("Processing trigger event %s" % event)
James E. Blaira84f0e42014-02-06 07:09:22 -0800822 try:
823 project = self.layout.projects.get(event.project_name)
824 if not project:
Jukka Lehtniemibec3ed02014-10-31 13:45:17 +0100825 self.log.debug("Project %s not found" % event.project_name)
James E. Blaira84f0e42014-02-06 07:09:22 -0800826 return
827
James E. Blaira84f0e42014-02-06 07:09:22 -0800828 for pipeline in self.layout.pipelines.values():
James E. Blairc0dedf82014-08-06 09:37:52 -0700829 change = pipeline.source.getChange(event, project)
James E. Blaira84f0e42014-02-06 07:09:22 -0800830 if event.type == 'patchset-created':
831 pipeline.manager.removeOldVersionsOfChange(change)
Antoine Mussobd86a312014-01-08 14:51:33 +0100832 elif event.type == 'change-abandoned':
833 pipeline.manager.removeAbandonedChange(change)
James E. Blaira84f0e42014-02-06 07:09:22 -0800834 if pipeline.manager.eventMatches(event, change):
835 self.log.info("Adding %s, %s to %s" %
836 (project, change, pipeline))
837 pipeline.manager.addChange(change)
838 finally:
James E. Blairff791972013-01-09 11:45:43 -0800839 self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700840
James E. Blair468c8512013-12-06 13:27:19 -0800841 def process_management_queue(self):
842 self.log.debug("Fetching management event")
843 event = self.management_event_queue.get()
844 self.log.debug("Processing management event %s" % event)
James E. Blair36658cf2013-12-06 17:53:48 -0800845 try:
846 if isinstance(event, ReconfigureEvent):
847 self._doReconfigureEvent(event)
848 elif isinstance(event, PromoteEvent):
849 self._doPromoteEvent(event)
James E. Blaird27a96d2014-07-10 13:25:13 -0700850 elif isinstance(event, EnqueueEvent):
851 self._doEnqueueEvent(event.trigger_event)
James E. Blair36658cf2013-12-06 17:53:48 -0800852 else:
853 self.log.error("Unable to handle event %s" % event)
854 event.done()
855 except Exception as e:
856 event.exception(e, sys.exc_info()[2])
James E. Blair468c8512013-12-06 13:27:19 -0800857 self.management_event_queue.task_done()
858
James E. Blairee743612012-05-29 14:49:32 -0700859 def process_result_queue(self):
860 self.log.debug("Fetching result event")
James E. Blaira84f0e42014-02-06 07:09:22 -0800861 event = self.result_event_queue.get()
862 self.log.debug("Processing result event %s" % event)
863 try:
864 if isinstance(event, BuildStartedEvent):
865 self._doBuildStartedEvent(event)
866 elif isinstance(event, BuildCompletedEvent):
867 self._doBuildCompletedEvent(event)
James E. Blair4076e2b2014-01-28 12:42:20 -0800868 elif isinstance(event, MergeCompletedEvent):
869 self._doMergeCompletedEvent(event)
James E. Blaira84f0e42014-02-06 07:09:22 -0800870 else:
871 self.log.error("Unable to handle event %s" % event)
872 finally:
873 self.result_event_queue.task_done()
874
875 def _doBuildStartedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800876 build = event.build
877 if build.build_set is not build.build_set.item.current_build_set:
878 self.log.warning("Build %s is not in the current build set" %
879 (build,))
880 return
881 pipeline = build.build_set.item.pipeline
882 if not pipeline:
883 self.log.warning("Build %s is not associated with a pipeline" %
884 (build,))
885 return
886 pipeline.manager.onBuildStarted(event.build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800887
888 def _doBuildCompletedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800889 build = event.build
890 if build.build_set is not build.build_set.item.current_build_set:
891 self.log.warning("Build %s is not in the current build set" %
892 (build,))
893 return
894 pipeline = build.build_set.item.pipeline
895 if not pipeline:
896 self.log.warning("Build %s is not associated with a pipeline" %
897 (build,))
898 return
899 pipeline.manager.onBuildCompleted(event.build)
900
901 def _doMergeCompletedEvent(self, event):
902 build_set = event.build_set
903 if build_set is not build_set.item.current_build_set:
904 self.log.warning("Build set %s is not current" % (build_set,))
905 return
906 pipeline = build_set.item.pipeline
907 if not pipeline:
908 self.log.warning("Build set %s is not associated with a pipeline" %
909 (build_set,))
910 return
911 pipeline.manager.onMergeCompleted(event)
James E. Blairee743612012-05-29 14:49:32 -0700912
James E. Blair8dbd56a2012-12-22 10:55:10 -0800913 def formatStatusJSON(self):
914 data = {}
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400915
916 data['zuul_version'] = self.zuul_version
917
James E. Blair8dbd56a2012-12-22 10:55:10 -0800918 if self._pause:
919 ret = '<p><b>Queue only mode:</b> preparing to '
James E. Blair8dbd56a2012-12-22 10:55:10 -0800920 if self._exit:
921 ret += 'exit'
922 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
923 ret += '</p>'
924 data['message'] = ret
925
James E. Blairfb682cc2013-02-26 15:23:27 -0800926 data['trigger_event_queue'] = {}
927 data['trigger_event_queue']['length'] = \
928 self.trigger_event_queue.qsize()
929 data['result_event_queue'] = {}
930 data['result_event_queue']['length'] = \
931 self.result_event_queue.qsize()
932
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400933 if self.last_reconfigured:
934 data['last_reconfigured'] = self.last_reconfigured * 1000
935
James E. Blair8dbd56a2012-12-22 10:55:10 -0800936 pipelines = []
937 data['pipelines'] = pipelines
Alex Gaynorfda4c352014-06-04 11:15:26 -0700938 for pipeline in self.layout.pipelines.values():
James E. Blair8dbd56a2012-12-22 10:55:10 -0800939 pipelines.append(pipeline.formatStatusJSON())
940 return json.dumps(data)
941
James E. Blair1e8dd892012-05-30 09:15:05 -0700942
James E. Blair4aea70c2012-07-26 14:23:24 -0700943class BasePipelineManager(object):
944 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700945
James E. Blair4aea70c2012-07-26 14:23:24 -0700946 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -0700947 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -0700948 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -0700949 self.event_filters = []
James E. Blair11041d22014-05-02 14:49:53 -0700950 self.changeish_filters = []
James E. Blair3c5e5b52013-04-26 11:17:03 -0700951 if self.sched.config and self.sched.config.has_option(
952 'zuul', 'report_times'):
James E. Blair0ac6c012013-04-26 09:04:23 -0700953 self.report_times = self.sched.config.getboolean(
954 'zuul', 'report_times')
955 else:
956 self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -0700957
958 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -0700959 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700960
James E. Blaireff88162013-07-01 12:44:14 -0400961 def _postConfig(self, layout):
James E. Blair4aea70c2012-07-26 14:23:24 -0700962 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairc0dedf82014-08-06 09:37:52 -0700963 self.log.info(" Source: %s" % self.pipeline.source)
James E. Blair11041d22014-05-02 14:49:53 -0700964 self.log.info(" Requirements:")
965 for f in self.changeish_filters:
966 self.log.info(" %s" % f)
James E. Blairee743612012-05-29 14:49:32 -0700967 self.log.info(" Events:")
968 for e in self.event_filters:
969 self.log.info(" %s" % e)
970 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -0700971
James E. Blairee743612012-05-29 14:49:32 -0700972 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -0700973 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -0700974 if tree.job:
975 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -0700976 for b in tree.job._branches:
977 efilters += str(b)
James E. Blair70c71582013-03-06 08:50:50 -0800978 for f in tree.job._files:
979 efilters += str(f)
James E. Blairee743612012-05-29 14:49:32 -0700980 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -0700981 efilters = ' ' + efilters
James E. Blair222d4982012-07-16 09:31:19 -0700982 hold = ''
983 if tree.job.hold_following_changes:
984 hold = ' [hold]'
James E. Blair4ec821f2012-08-23 15:28:28 -0700985 voting = ''
986 if not tree.job.voting:
987 voting = ' [nonvoting]'
988 self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
989 efilters, hold, voting))
James E. Blairee743612012-05-29 14:49:32 -0700990 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -0700991 log_jobs(x, indent + 2)
992
James E. Blaireff88162013-07-01 12:44:14 -0400993 for p in layout.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700994 tree = self.pipeline.getJobTree(p)
995 if tree:
James E. Blairee743612012-05-29 14:49:32 -0700996 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -0700997 log_jobs(tree)
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000998 self.log.info(" On start:")
999 self.log.info(" %s" % self.pipeline.start_actions)
1000 self.log.info(" On success:")
1001 self.log.info(" %s" % self.pipeline.success_actions)
1002 self.log.info(" On failure:")
1003 self.log.info(" %s" % self.pipeline.failure_actions)
Joshua Heskethb7179772014-01-30 23:30:46 +11001004 self.log.info(" On merge-failure:")
1005 self.log.info(" %s" % self.pipeline.merge_failure_actions)
James E. Blairee743612012-05-29 14:49:32 -07001006
James E. Blaire421a232012-07-25 16:59:21 -07001007 def getSubmitAllowNeeds(self):
1008 # Get a list of code review labels that are allowed to be
1009 # "needed" in the submit records for a change, with respect
1010 # to this queue. In other words, the list of review labels
1011 # this queue itself is likely to set before submitting.
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001012 allow_needs = set()
1013 for action_reporter in self.pipeline.success_actions:
1014 allow_needs.update(action_reporter.getSubmitAllowNeeds())
1015 return allow_needs
James E. Blaire421a232012-07-25 16:59:21 -07001016
James E. Blairc053d022014-01-22 14:57:33 -08001017 def eventMatches(self, event, change):
James E. Blairad28e912013-11-27 10:43:22 -08001018 if event.forced_pipeline:
1019 if event.forced_pipeline == self.pipeline.name:
James E. Blair1b265312014-06-24 09:35:21 -07001020 self.log.debug("Event %s for change %s was directly assigned "
1021 "to pipeline %s" % (event, change, self))
James E. Blairad28e912013-11-27 10:43:22 -08001022 return True
1023 else:
1024 return False
James E. Blairee743612012-05-29 14:49:32 -07001025 for ef in self.event_filters:
James E. Blairc053d022014-01-22 14:57:33 -08001026 if ef.matches(event, change):
James E. Blair1b265312014-06-24 09:35:21 -07001027 self.log.debug("Event %s for change %s matched %s "
1028 "in pipeline %s" % (event, change, ef, self))
James E. Blairee743612012-05-29 14:49:32 -07001029 return True
1030 return False
1031
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001032 def isChangeAlreadyInPipeline(self, change):
1033 # Checks live items in the pipeline
1034 for item in self.pipeline.getAllItems():
1035 if item.live and change.equals(item.change):
1036 return True
1037 return False
1038
1039 def isChangeAlreadyInQueue(self, change, change_queue):
1040 # Checks any item in the specified change queue
1041 for item in change_queue.queue:
1042 if change.equals(item.change):
James E. Blair0dc8ba92012-07-16 14:23:52 -07001043 return True
1044 return False
1045
James E. Blaire0487072012-08-29 17:38:31 -07001046 def reportStart(self, change):
1047 try:
1048 self.log.info("Reporting start, action %s change %s" %
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001049 (self.pipeline.start_actions, change))
James E. Blaire0487072012-08-29 17:38:31 -07001050 msg = "Starting %s jobs." % self.pipeline.name
Clark Boylan9b670902012-09-28 13:47:56 -07001051 if self.sched.config.has_option('zuul', 'status_url'):
1052 msg += "\n" + self.sched.config.get('zuul', 'status_url')
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001053 ret = self.sendReport(self.pipeline.start_actions,
1054 change, msg)
James E. Blaire0487072012-08-29 17:38:31 -07001055 if ret:
1056 self.log.error("Reporting change start %s received: %s" %
1057 (change, ret))
1058 except:
1059 self.log.exception("Exception while reporting start:")
1060
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001061 def sendReport(self, action_reporters, change, message):
1062 """Sends the built message off to configured reporters.
1063
1064 Takes the action_reporters, change, message and extra options and
1065 sends them to the pluggable reporters.
1066 """
1067 report_errors = []
1068 if len(action_reporters) > 0:
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001069 for action_reporter in action_reporters:
1070 ret = action_reporter.report(change, message)
1071 if ret:
1072 report_errors.append(ret)
1073 if len(report_errors) == 0:
1074 return
1075 return report_errors
1076
James E. Blaire0487072012-08-29 17:38:31 -07001077 def isChangeReadyToBeEnqueued(self, change):
1078 return True
1079
James E. Blair5ee24252014-12-30 10:12:29 -08001080 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
1081 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001082 return True
1083
James E. Blair5ee24252014-12-30 10:12:29 -08001084 def enqueueChangesBehind(self, change, quiet, ignore_requirements,
1085 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001086 return True
1087
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001088 def checkForChangesNeededBy(self, change, change_queue):
James E. Blairfee8d652013-06-07 08:57:52 -07001089 return True
1090
James E. Blair6965a4b2014-12-16 17:19:04 -08001091 def getFailingDependentItems(self, item):
James E. Blair972e3c72013-08-29 12:04:55 -07001092 return None
1093
James E. Blairfee8d652013-06-07 08:57:52 -07001094 def getDependentItems(self, item):
1095 orig_item = item
1096 items = []
1097 while item.item_ahead:
1098 items.append(item.item_ahead)
1099 item = item.item_ahead
1100 self.log.info("Change %s depends on changes %s" %
1101 (orig_item.change,
1102 [x.change for x in items]))
1103 return items
1104
James E. Blair972e3c72013-08-29 12:04:55 -07001105 def getItemForChange(self, change):
1106 for item in self.pipeline.getAllItems():
1107 if item.change.equals(change):
1108 return item
1109 return None
1110
James E. Blair2fa50962013-01-30 21:50:41 -08001111 def findOldVersionOfChangeAlreadyInQueue(self, change):
James E. Blairba437362015-02-07 11:41:52 -08001112 for item in self.pipeline.getAllItems():
1113 if not item.live:
1114 continue
1115 if change.isUpdateOf(item.change):
1116 return item
James E. Blair2fa50962013-01-30 21:50:41 -08001117 return None
1118
1119 def removeOldVersionsOfChange(self, change):
1120 if not self.pipeline.dequeue_on_new_patchset:
1121 return
James E. Blairba437362015-02-07 11:41:52 -08001122 old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
1123 if old_item:
James E. Blair2fa50962013-01-30 21:50:41 -08001124 self.log.debug("Change %s is a new version of %s, removing %s" %
James E. Blairba437362015-02-07 11:41:52 -08001125 (change, old_item.change, old_item))
1126 self.removeItem(old_item)
James E. Blair2fa50962013-01-30 21:50:41 -08001127
Antoine Mussobd86a312014-01-08 14:51:33 +01001128 def removeAbandonedChange(self, change):
1129 self.log.debug("Change %s abandoned, removing." % change)
James E. Blairba437362015-02-07 11:41:52 -08001130 for item in self.pipeline.getAllItems():
1131 if not item.live:
1132 continue
1133 if item.change.equals(change):
1134 self.removeItem(item)
Antoine Mussobd86a312014-01-08 14:51:33 +01001135
James E. Blairbfb8e042014-12-30 17:01:44 -08001136 def reEnqueueItem(self, item, last_head):
1137 if last_head.queue:
1138 change_queue = last_head.queue
1139 else:
1140 change_queue = self.getChangeQueue(item.change)
James E. Blaircdccd972013-07-01 12:10:22 -07001141 if change_queue:
1142 self.log.debug("Re-enqueing change %s in queue %s" %
1143 (item.change, change_queue))
James E. Blair972e3c72013-08-29 12:04:55 -07001144 change_queue.enqueueItem(item)
James E. Blaircdccd972013-07-01 12:10:22 -07001145 self.reportStats(item)
1146 return True
1147 else:
1148 self.log.error("Unable to find change queue for project %s" %
1149 item.change.project)
1150 return False
1151
James E. Blairf9ab8842014-07-10 13:12:07 -07001152 def addChange(self, change, quiet=False, enqueue_time=None,
James E. Blairbfb8e042014-12-30 17:01:44 -08001153 ignore_requirements=False, live=True,
1154 change_queue=None):
James E. Blaire0487072012-08-29 17:38:31 -07001155 self.log.debug("Considering adding change %s" % change)
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001156
1157 # If we are adding a live change, check if it's a live item
1158 # anywhere in the pipeline. Otherwise, we will perform the
1159 # duplicate check below on the specific change_queue.
1160 if live and self.isChangeAlreadyInPipeline(change):
1161 self.log.debug("Change %s is already in pipeline, "
1162 "ignoring" % change)
James E. Blaire0487072012-08-29 17:38:31 -07001163 return True
James E. Blair692c6b32012-07-17 11:16:35 -07001164
James E. Blaire0487072012-08-29 17:38:31 -07001165 if not self.isChangeReadyToBeEnqueued(change):
1166 self.log.debug("Change %s is not ready to be enqueued, ignoring" %
1167 change)
1168 return False
1169
James E. Blairf9ab8842014-07-10 13:12:07 -07001170 if not ignore_requirements:
1171 for f in self.changeish_filters:
1172 if not f.matches(change):
1173 self.log.debug("Change %s does not match pipeline "
1174 "requirement %s" % (change, f))
1175 return False
James E. Blair11041d22014-05-02 14:49:53 -07001176
James E. Blair5ee24252014-12-30 10:12:29 -08001177 if not change_queue:
1178 change_queue = self.getChangeQueue(change)
1179 if not change_queue:
1180 self.log.debug("Unable to find change queue for "
1181 "change %s in project %s" %
1182 (change, change.project))
1183 return False
1184
1185 if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
1186 change_queue):
James E. Blair1490eba2013-03-06 19:14:00 -08001187 self.log.debug("Failed to enqueue changes ahead of %s" % change)
James E. Blaire0487072012-08-29 17:38:31 -07001188 return False
1189
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001190 if self.isChangeAlreadyInQueue(change, change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001191 self.log.debug("Change %s is already in queue, ignoring" % change)
1192 return True
1193
James E. Blair5ee24252014-12-30 10:12:29 -08001194 self.log.debug("Adding change %s to queue %s" %
1195 (change, change_queue))
1196 if not quiet:
1197 if len(self.pipeline.start_actions) > 0:
1198 self.reportStart(change)
1199 item = change_queue.enqueueChange(change)
1200 if enqueue_time:
1201 item.enqueue_time = enqueue_time
James E. Blairbfb8e042014-12-30 17:01:44 -08001202 item.live = live
James E. Blair5ee24252014-12-30 10:12:29 -08001203 self.reportStats(item)
1204 self.enqueueChangesBehind(change, quiet, ignore_requirements,
1205 change_queue)
1206 self.sched.triggers['zuul'].onChangeEnqueued(item.change,
1207 self.pipeline)
1208 return True
James E. Blairee743612012-05-29 14:49:32 -07001209
James E. Blair972e3c72013-08-29 12:04:55 -07001210 def dequeueItem(self, item):
James E. Blairfee8d652013-06-07 08:57:52 -07001211 self.log.debug("Removing change %s from queue" % item.change)
James E. Blairbfb8e042014-12-30 17:01:44 -08001212 item.queue.dequeueItem(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001213
James E. Blairba437362015-02-07 11:41:52 -08001214 def removeItem(self, item):
1215 # Remove an item from the queue, probably because it has been
Alex Gaynor813d39b2014-05-17 16:17:16 -07001216 # superseded by another change.
James E. Blairba437362015-02-07 11:41:52 -08001217 self.log.debug("Canceling builds behind change: %s "
1218 "because it is being removed." % item.change)
1219 self.cancelJobs(item)
1220 self.dequeueItem(item)
1221 self.reportStats(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001222
James E. Blairac2c3242014-01-24 13:38:51 -08001223 def _makeMergerItem(self, item):
1224 # Create a dictionary with all info about the item needed by
1225 # the merger.
Clark Boylan4c6566b2014-03-10 11:02:01 -07001226 number = None
1227 patchset = None
1228 oldrev = None
1229 newrev = None
1230 if hasattr(item.change, 'number'):
1231 number = item.change.number
1232 patchset = item.change.patchset
1233 elif hasattr(item.change, 'newrev'):
1234 oldrev = item.change.oldrev
1235 newrev = item.change.newrev
James E. Blairac2c3242014-01-24 13:38:51 -08001236 return dict(project=item.change.project.name,
James E. Blairc0dedf82014-08-06 09:37:52 -07001237 url=self.pipeline.source.getGitUrl(
James E. Blairac2c3242014-01-24 13:38:51 -08001238 item.change.project),
1239 merge_mode=item.change.project.merge_mode,
1240 refspec=item.change.refspec,
1241 branch=item.change.branch,
1242 ref=item.current_build_set.ref,
Clark Boylan4c6566b2014-03-10 11:02:01 -07001243 number=number,
1244 patchset=patchset,
1245 oldrev=oldrev,
1246 newrev=newrev,
James E. Blairac2c3242014-01-24 13:38:51 -08001247 )
1248
James E. Blairfee8d652013-06-07 08:57:52 -07001249 def prepareRef(self, item):
James E. Blair4076e2b2014-01-28 12:42:20 -08001250 # Returns True if the ref is ready, false otherwise
1251 build_set = item.current_build_set
1252 if build_set.merge_state == build_set.COMPLETE:
1253 return True
1254 if build_set.merge_state == build_set.PENDING:
1255 return False
1256 build_set.merge_state = build_set.PENDING
1257 ref = build_set.ref
James E. Blairfee8d652013-06-07 08:57:52 -07001258 if hasattr(item.change, 'refspec') and not ref:
1259 self.log.debug("Preparing ref for: %s" % item.change)
1260 item.current_build_set.setConfiguration()
James E. Blairfee8d652013-06-07 08:57:52 -07001261 dependent_items = self.getDependentItems(item)
1262 dependent_items.reverse()
1263 all_items = dependent_items + [item]
James E. Blairac2c3242014-01-24 13:38:51 -08001264 merger_items = map(self._makeMergerItem, all_items)
James E. Blair4076e2b2014-01-28 12:42:20 -08001265 self.sched.merger.mergeChanges(merger_items,
James E. Blaire9a81842014-09-24 13:37:45 -07001266 item.current_build_set,
1267 self.pipeline.precedence)
James E. Blair4076e2b2014-01-28 12:42:20 -08001268 else:
1269 self.log.debug("Preparing update repo for: %s" % item.change)
James E. Blairc0dedf82014-08-06 09:37:52 -07001270 url = self.pipeline.source.getGitUrl(item.change.project)
James E. Blair4076e2b2014-01-28 12:42:20 -08001271 self.sched.merger.updateRepo(item.change.project.name,
James E. Blaire9a81842014-09-24 13:37:45 -07001272 url, build_set,
1273 self.pipeline.precedence)
James E. Blairfee8d652013-06-07 08:57:52 -07001274 return False
1275
1276 def _launchJobs(self, item, jobs):
1277 self.log.debug("Launching jobs for change %s" % item.change)
1278 dependent_items = self.getDependentItems(item)
1279 for job in jobs:
1280 self.log.debug("Found job %s for change %s" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001281 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001282 build = self.sched.launcher.launch(job, item,
1283 self.pipeline,
1284 dependent_items)
James E. Blairfee8d652013-06-07 08:57:52 -07001285 self.log.debug("Adding build %s of job %s to item %s" %
1286 (build, job, item))
1287 item.addBuild(build)
James E. Blairee743612012-05-29 14:49:32 -07001288 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001289 self.log.exception("Exception while launching job %s "
James E. Blairfee8d652013-06-07 08:57:52 -07001290 "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001291
James E. Blairfee8d652013-06-07 08:57:52 -07001292 def launchJobs(self, item):
1293 jobs = self.pipeline.findJobsToRun(item)
James E. Blairdaabed22012-08-15 15:38:57 -07001294 if jobs:
James E. Blairfee8d652013-06-07 08:57:52 -07001295 self._launchJobs(item, jobs)
1296
1297 def cancelJobs(self, item, prime=True):
1298 self.log.debug("Cancel jobs for change %s" % item.change)
1299 canceled = False
James E. Blair6b077942014-02-07 17:45:55 -08001300 old_build_set = item.current_build_set
James E. Blair36658cf2013-12-06 17:53:48 -08001301 if prime and item.current_build_set.ref:
James E. Blairfee8d652013-06-07 08:57:52 -07001302 item.resetAllBuilds()
James E. Blair6b077942014-02-07 17:45:55 -08001303 for build in old_build_set.getBuilds():
1304 try:
1305 self.sched.launcher.cancel(build)
1306 except:
1307 self.log.exception("Exception while canceling build %s "
1308 "for change %s" % (build, item.change))
James E. Blairfee8d652013-06-07 08:57:52 -07001309 build.result = 'CANCELED'
James E. Blair6b077942014-02-07 17:45:55 -08001310 canceled = True
James E. Blair972e3c72013-08-29 12:04:55 -07001311 for item_behind in item.items_behind:
James E. Blairfee8d652013-06-07 08:57:52 -07001312 self.log.debug("Canceling jobs for change %s, behind change %s" %
James E. Blair972e3c72013-08-29 12:04:55 -07001313 (item_behind.change, item.change))
1314 if self.cancelJobs(item_behind, prime=prime):
James E. Blairfee8d652013-06-07 08:57:52 -07001315 canceled = True
1316 return canceled
1317
James E. Blair4076e2b2014-01-28 12:42:20 -08001318 def _processOneItem(self, item, nnfi, ready_ahead):
James E. Blairfee8d652013-06-07 08:57:52 -07001319 changed = False
1320 item_ahead = item.item_ahead
James E. Blairbfb8e042014-12-30 17:01:44 -08001321 if item_ahead and (not item_ahead.live):
1322 item_ahead = None
1323 change_queue = item.queue
James E. Blair972e3c72013-08-29 12:04:55 -07001324 failing_reasons = [] # Reasons this item is failing
1325
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001326 if self.checkForChangesNeededBy(item.change, change_queue) is not True:
James E. Blairfee8d652013-06-07 08:57:52 -07001327 # It's not okay to enqueue this change, we should remove it.
1328 self.log.info("Dequeuing change %s because "
1329 "it can no longer merge" % item.change)
1330 self.cancelJobs(item)
James E. Blair972e3c72013-08-29 12:04:55 -07001331 self.dequeueItem(item)
James E. Blairfee8d652013-06-07 08:57:52 -07001332 self.pipeline.setDequeuedNeedingChange(item)
1333 try:
1334 self.reportItem(item)
1335 except MergeFailure:
1336 pass
James E. Blair4076e2b2014-01-28 12:42:20 -08001337 return (True, nnfi, ready_ahead)
James E. Blair6965a4b2014-12-16 17:19:04 -08001338 dep_items = self.getFailingDependentItems(item)
Clark Boylanaf2476f2014-01-23 14:47:36 -08001339 actionable = change_queue.isActionable(item)
1340 item.active = actionable
James E. Blair4076e2b2014-01-28 12:42:20 -08001341 ready = False
James E. Blair6965a4b2014-12-16 17:19:04 -08001342 if dep_items:
James E. Blair972e3c72013-08-29 12:04:55 -07001343 failing_reasons.append('a needed change is failing')
1344 self.cancelJobs(item, prime=False)
James E. Blairfee8d652013-06-07 08:57:52 -07001345 else:
James E. Blairfef71632013-09-23 11:15:47 -07001346 item_ahead_merged = False
James E. Blairbfb8e042014-12-30 17:01:44 -08001347 if (item_ahead and item_ahead.change.is_merged):
James E. Blairfef71632013-09-23 11:15:47 -07001348 item_ahead_merged = True
1349 if (item_ahead != nnfi and not item_ahead_merged):
James E. Blair972e3c72013-08-29 12:04:55 -07001350 # Our current base is different than what we expected,
1351 # and it's not because our current base merged. Something
1352 # ahead must have failed.
1353 self.log.info("Resetting builds for change %s because the "
1354 "item ahead, %s, is not the nearest non-failing "
1355 "item, %s" % (item.change, item_ahead, nnfi))
1356 change_queue.moveItem(item, nnfi)
1357 changed = True
1358 self.cancelJobs(item)
Clark Boylanaf2476f2014-01-23 14:47:36 -08001359 if actionable:
James E. Blair4076e2b2014-01-28 12:42:20 -08001360 ready = self.prepareRef(item)
Clark Boylan3d2f7a72014-01-23 11:07:42 -08001361 if item.current_build_set.unable_to_merge:
1362 failing_reasons.append("it has a merge conflict")
James E. Blair4076e2b2014-01-28 12:42:20 -08001363 ready = False
1364 if not ready:
1365 ready_ahead = False
1366 if actionable and ready_ahead and self.launchJobs(item):
James E. Blairfee8d652013-06-07 08:57:52 -07001367 changed = True
James E. Blair972e3c72013-08-29 12:04:55 -07001368 if self.pipeline.didAnyJobFail(item):
1369 failing_reasons.append("at least one job failed")
James E. Blairbfb8e042014-12-30 17:01:44 -08001370 if (not item.live) and (not item.items_behind):
1371 failing_reasons.append("is a non-live item with no items behind")
1372 self.dequeueItem(item)
1373 changed = True
James E. Blairec2e1562015-02-05 10:45:54 -08001374 if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
1375 and item.live):
James E. Blair972e3c72013-08-29 12:04:55 -07001376 try:
1377 self.reportItem(item)
1378 except MergeFailure:
James E. Blair062c4fb2013-09-26 07:46:00 -07001379 failing_reasons.append("it did not merge")
James E. Blair972e3c72013-08-29 12:04:55 -07001380 for item_behind in item.items_behind:
1381 self.log.info("Resetting builds for change %s because the "
1382 "item ahead, %s, failed to merge" %
1383 (item_behind.change, item))
1384 self.cancelJobs(item_behind)
1385 self.dequeueItem(item)
1386 changed = True
James E. Blairbfb8e042014-12-30 17:01:44 -08001387 elif not failing_reasons and item.live:
James E. Blair972e3c72013-08-29 12:04:55 -07001388 nnfi = item
1389 item.current_build_set.failing_reasons = failing_reasons
1390 if failing_reasons:
1391 self.log.debug("%s is a failing item because %s" %
1392 (item, failing_reasons))
James E. Blair4076e2b2014-01-28 12:42:20 -08001393 return (changed, nnfi, ready_ahead)
James E. Blairfee8d652013-06-07 08:57:52 -07001394
1395 def processQueue(self):
1396 # Do whatever needs to be done for each change in the queue
1397 self.log.debug("Starting queue processor: %s" % self.pipeline.name)
1398 changed = False
James E. Blair972e3c72013-08-29 12:04:55 -07001399 for queue in self.pipeline.queues:
1400 queue_changed = False
1401 nnfi = None # Nearest non-failing item
James E. Blair4076e2b2014-01-28 12:42:20 -08001402 ready_ahead = True # All build sets ahead are ready
Clark Boylan3d2f7a72014-01-23 11:07:42 -08001403 for item in queue.queue[:]:
James E. Blair4076e2b2014-01-28 12:42:20 -08001404 item_changed, nnfi, ready_ahhead = self._processOneItem(
1405 item, nnfi, ready_ahead)
James E. Blair972e3c72013-08-29 12:04:55 -07001406 if item_changed:
1407 queue_changed = True
1408 self.reportStats(item)
1409 if queue_changed:
James E. Blairfee8d652013-06-07 08:57:52 -07001410 changed = True
James E. Blair972e3c72013-08-29 12:04:55 -07001411 status = ''
1412 for item in queue.queue:
Joshua Hesketh85af4e92014-02-21 08:28:58 -08001413 status += item.formatStatus()
James E. Blair972e3c72013-08-29 12:04:55 -07001414 if status:
1415 self.log.debug("Queue %s status is now:\n %s" %
1416 (queue.name, status))
James E. Blairfadc6e12013-08-21 18:23:15 -07001417 self.log.debug("Finished queue processor: %s (changed: %s)" %
1418 (self.pipeline.name, changed))
James E. Blairfee8d652013-06-07 08:57:52 -07001419 return changed
James E. Blairdaabed22012-08-15 15:38:57 -07001420
James E. Blair11700c32012-07-05 17:50:05 -07001421 def updateBuildDescriptions(self, build_set):
1422 for build in build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -07001423 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -07001424 self.sched.launcher.setBuildDescription(build, desc)
1425
1426 if build_set.previous_build_set:
1427 for build in build_set.previous_build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -07001428 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -07001429 self.sched.launcher.setBuildDescription(build, desc)
1430
1431 def onBuildStarted(self, build):
James E. Blairfee8d652013-06-07 08:57:52 -07001432 self.log.debug("Build %s started" % build)
James E. Blair11700c32012-07-05 17:50:05 -07001433 self.updateBuildDescriptions(build.build_set)
1434 return True
1435
James E. Blairee743612012-05-29 14:49:32 -07001436 def onBuildCompleted(self, build):
James E. Blairfee8d652013-06-07 08:57:52 -07001437 self.log.debug("Build %s completed" % build)
James E. Blair6b077942014-02-07 17:45:55 -08001438 item = build.build_set.item
James E. Blairee743612012-05-29 14:49:32 -07001439
James E. Blair6b077942014-02-07 17:45:55 -08001440 self.pipeline.setResult(item, build)
1441 self.log.debug("Item %s status is now:\n %s" %
Joshua Hesketh85af4e92014-02-21 08:28:58 -08001442 (item, item.formatStatus()))
James E. Blair11700c32012-07-05 17:50:05 -07001443 self.updateBuildDescriptions(build.build_set)
James E. Blairee743612012-05-29 14:49:32 -07001444 return True
1445
James E. Blair4076e2b2014-01-28 12:42:20 -08001446 def onMergeCompleted(self, event):
1447 build_set = event.build_set
1448 item = build_set.item
1449 build_set.merge_state = build_set.COMPLETE
1450 build_set.zuul_url = event.zuul_url
1451 if event.merged:
1452 build_set.commit = event.commit
1453 elif event.updated:
1454 build_set.commit = item.change.newrev
1455 if not build_set.commit:
1456 self.log.info("Unable to merge change %s" % item.change)
Joshua Heskethb7179772014-01-30 23:30:46 +11001457 self.pipeline.setUnableToMerge(item)
James E. Blair4076e2b2014-01-28 12:42:20 -08001458
James E. Blairfee8d652013-06-07 08:57:52 -07001459 def reportItem(self, item):
Clark Boylanf7dc4da2014-07-28 10:12:25 -07001460 if not item.reported:
1461 # _reportItem() returns True if it failed to report.
1462 item.reported = not self._reportItem(item)
James E. Blairfee8d652013-06-07 08:57:52 -07001463 if self.changes_merge:
1464 succeeded = self.pipeline.didAllJobsSucceed(item)
Clark Boylanf7dc4da2014-07-28 10:12:25 -07001465 merged = item.reported
James E. Blairfee8d652013-06-07 08:57:52 -07001466 if merged:
James E. Blairc0dedf82014-08-06 09:37:52 -07001467 merged = self.pipeline.source.isMerged(item.change,
1468 item.change.branch)
James E. Blairfee8d652013-06-07 08:57:52 -07001469 self.log.info("Reported change %s status: all-succeeded: %s, "
1470 "merged: %s" % (item.change, succeeded, merged))
James E. Blairbfb8e042014-12-30 17:01:44 -08001471 change_queue = item.queue
James E. Blairfee8d652013-06-07 08:57:52 -07001472 if not (succeeded and merged):
1473 self.log.debug("Reported change %s failed tests or failed "
1474 "to merge" % (item.change))
James E. Blair4a035d92014-01-23 13:10:48 -08001475 change_queue.decreaseWindowSize()
Clark Boylan7603a372014-01-21 11:43:20 -08001476 self.log.debug("%s window size decreased to %s" %
James E. Blair4a035d92014-01-23 13:10:48 -08001477 (change_queue, change_queue.window))
James E. Blairfee8d652013-06-07 08:57:52 -07001478 raise MergeFailure("Change %s failed to merge" % item.change)
Clark Boylan7603a372014-01-21 11:43:20 -08001479 else:
James E. Blair4a035d92014-01-23 13:10:48 -08001480 change_queue.increaseWindowSize()
Clark Boylan7603a372014-01-21 11:43:20 -08001481 self.log.debug("%s window size increased to %s" %
James E. Blair4a035d92014-01-23 13:10:48 -08001482 (change_queue, change_queue.window))
James E. Blairc494d542014-08-06 09:23:52 -07001483 self.sched.triggers['zuul'].onChangeMerged(item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001484
James E. Blairfee8d652013-06-07 08:57:52 -07001485 def _reportItem(self, item):
James E. Blairfee8d652013-06-07 08:57:52 -07001486 self.log.debug("Reporting change %s" % item.change)
James E. Blairb98fcdb2013-08-26 18:23:09 -07001487 ret = True # Means error as returned by trigger.report
James E. Blairfee8d652013-06-07 08:57:52 -07001488 if self.pipeline.didAllJobsSucceed(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001489 self.log.debug("success %s" % (self.pipeline.success_actions))
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001490 actions = self.pipeline.success_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001491 item.setReportedResult('SUCCESS')
Joshua Heskethb7179772014-01-30 23:30:46 +11001492 elif not self.pipeline.didMergerSucceed(item):
1493 actions = self.pipeline.merge_failure_actions
1494 item.setReportedResult('MERGER_FAILURE')
James E. Blairee743612012-05-29 14:49:32 -07001495 else:
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001496 actions = self.pipeline.failure_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001497 item.setReportedResult('FAILURE')
James E. Blaire5910202013-12-27 09:50:31 -08001498 if actions:
1499 report = self.formatReport(item)
1500 try:
1501 self.log.info("Reporting change %s, actions: %s" %
1502 (item.change, actions))
1503 ret = self.sendReport(actions, item.change, report)
1504 if ret:
1505 self.log.error("Reporting change %s received: %s" %
1506 (item.change, ret))
1507 except:
1508 self.log.exception("Exception while reporting:")
1509 item.setReportedResult('ERROR')
James E. Blairfee8d652013-06-07 08:57:52 -07001510 self.updateBuildDescriptions(item.current_build_set)
James E. Blairee743612012-05-29 14:49:32 -07001511 return ret
1512
James E. Blairfee8d652013-06-07 08:57:52 -07001513 def formatReport(self, item):
James E. Blair8b0d4c42012-08-23 16:03:05 -07001514 ret = ''
Joshua Heskethb7179772014-01-30 23:30:46 +11001515
Jeremy Stanley10837132014-08-02 16:10:56 +00001516 if item.dequeued_needing_change:
1517 ret += 'This change depends on a change that failed to merge.\n'
1518 elif not self.pipeline.didMergerSucceed(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001519 ret += self.pipeline.merge_failure_message
James E. Blair8b0d4c42012-08-23 16:03:05 -07001520 else:
Jeremy Stanley10837132014-08-02 16:10:56 +00001521 if self.pipeline.didAllJobsSucceed(item):
1522 ret += self.pipeline.success_message + '\n\n'
1523 else:
1524 ret += self.pipeline.failure_message + '\n\n'
1525 ret += self._formatReportJobs(item)
1526
1527 if self.pipeline.footer_message:
1528 ret += '\n' + self.pipeline.footer_message
1529
1530 return ret
1531
1532 def _formatReportJobs(self, item):
1533 # Return the list of jobs portion of the report
1534 ret = ''
James E. Blair8b0d4c42012-08-23 16:03:05 -07001535
Joshua Heskethb7179772014-01-30 23:30:46 +11001536 if self.sched.config.has_option('zuul', 'url_pattern'):
1537 url_pattern = self.sched.config.get('zuul', 'url_pattern')
James E. Blair8b0d4c42012-08-23 16:03:05 -07001538 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001539 url_pattern = None
1540
James E. Blair107c3852015-02-07 08:23:10 -08001541 for job in self.pipeline.getJobs(item):
Joshua Heskethb7179772014-01-30 23:30:46 +11001542 build = item.current_build_set.getBuild(job.name)
1543 result = build.result
1544 pattern = url_pattern
1545 if result == 'SUCCESS':
1546 if job.success_message:
1547 result = job.success_message
1548 if job.success_pattern:
1549 pattern = job.success_pattern
1550 elif result == 'FAILURE':
1551 if job.failure_message:
1552 result = job.failure_message
1553 if job.failure_pattern:
1554 pattern = job.failure_pattern
1555 if pattern:
Jeremy Stanleyc6d4bc82014-08-01 19:11:29 +00001556 url = pattern.format(change=item.change,
1557 pipeline=self.pipeline,
1558 job=job,
1559 build=build)
James E. Blaira35fcce2012-08-24 10:46:01 -07001560 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001561 url = build.url or job.name
1562 if not job.voting:
1563 voting = ' (non-voting)'
1564 else:
1565 voting = ''
1566 if self.report_times and build.end_time and build.start_time:
1567 dt = int(build.end_time - build.start_time)
1568 m, s = divmod(dt, 60)
1569 h, m = divmod(m, 60)
1570 if h:
1571 elapsed = ' in %dh %02dm %02ds' % (h, m, s)
1572 elif m:
1573 elapsed = ' in %dm %02ds' % (m, s)
Ori Livneh7191ee82013-05-02 19:13:53 -07001574 else:
Joshua Heskethb7179772014-01-30 23:30:46 +11001575 elapsed = ' in %ds' % (s)
1576 else:
1577 elapsed = ''
1578 name = ''
1579 if self.sched.config.has_option('zuul', 'job_name_in_report'):
1580 if self.sched.config.getboolean('zuul',
1581 'job_name_in_report'):
1582 name = job.name + ' '
1583 ret += '- %s%s : %s%s%s\n' % (name, url, result, elapsed,
1584 voting)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001585 return ret
1586
1587 def formatDescription(self, build):
1588 concurrent_changes = ''
1589 concurrent_builds = ''
1590 other_builds = ''
1591
1592 for change in build.build_set.other_changes:
1593 concurrent_changes += '<li><a href="{change.url}">\
1594 {change.number},{change.patchset}</a></li>'.format(
1595 change=change)
1596
James E. Blairfee8d652013-06-07 08:57:52 -07001597 change = build.build_set.item.change
James E. Blair8b0d4c42012-08-23 16:03:05 -07001598
1599 for build in build.build_set.getBuilds():
Ori Livneh7191ee82013-05-02 19:13:53 -07001600 if build.url:
James E. Blair8b0d4c42012-08-23 16:03:05 -07001601 concurrent_builds += """\
1602<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001603 <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001604 {build.job.name} #{build.number}</a>: {build.result}
1605</li>
1606""".format(build=build)
1607 else:
1608 concurrent_builds += """\
1609<li>
1610 {build.job.name}: {build.result}
1611</li>""".format(build=build)
1612
1613 if build.build_set.previous_build_set:
1614 other_build = build.build_set.previous_build_set.getBuild(
1615 build.job.name)
1616 if other_build:
1617 other_builds += """\
1618<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001619 Preceded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001620 {build.job.name} #{build.number}</a>
1621</li>
1622""".format(build=other_build)
1623
1624 if build.build_set.next_build_set:
1625 other_build = build.build_set.next_build_set.getBuild(
1626 build.job.name)
1627 if other_build:
1628 other_builds += """\
1629<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001630 Succeeded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001631 {build.job.name} #{build.number}</a>
1632</li>
1633""".format(build=other_build)
1634
1635 result = build.build_set.result
1636
1637 if hasattr(change, 'number'):
1638 ret = """\
1639<p>
1640 Triggered by change:
1641 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
1642 Branch: <b>{change.branch}</b><br/>
1643 Pipeline: <b>{self.pipeline.name}</b>
1644</p>"""
James E. Blair63bb0ef2013-07-29 17:14:51 -07001645 elif hasattr(change, 'ref'):
James E. Blair8b0d4c42012-08-23 16:03:05 -07001646 ret = """\
1647<p>
1648 Triggered by reference:
1649 {change.ref}</a><br/>
1650 Old revision: <b>{change.oldrev}</b><br/>
1651 New revision: <b>{change.newrev}</b><br/>
1652 Pipeline: <b>{self.pipeline.name}</b>
1653</p>"""
James E. Blair63bb0ef2013-07-29 17:14:51 -07001654 else:
1655 ret = ""
James E. Blair8b0d4c42012-08-23 16:03:05 -07001656
1657 if concurrent_changes:
1658 ret += """\
1659<p>
1660 Other changes tested concurrently with this change:
1661 <ul>{concurrent_changes}</ul>
1662</p>
1663"""
1664 if concurrent_builds:
1665 ret += """\
1666<p>
1667 All builds for this change set:
1668 <ul>{concurrent_builds}</ul>
1669</p>
1670"""
1671
1672 if other_builds:
1673 ret += """\
1674<p>
1675 Other build sets for this change:
1676 <ul>{other_builds}</ul>
1677</p>
1678"""
1679 if result:
1680 ret += """\
1681<p>
1682 Reported result: <b>{result}</b>
1683</p>
1684"""
1685
1686 ret = ret.format(**locals())
James E. Blair268d9342012-06-13 18:24:29 -07001687 return ret
1688
James E. Blairfee8d652013-06-07 08:57:52 -07001689 def reportStats(self, item):
James E. Blair8fa16972013-01-15 16:57:20 -08001690 if not statsd:
1691 return
1692 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001693 # Update the gauge on enqueue and dequeue, but timers only
James E. Blair8fa16972013-01-15 16:57:20 -08001694 # when dequeing.
James E. Blairfee8d652013-06-07 08:57:52 -07001695 if item.dequeue_time:
1696 dt = int((item.dequeue_time - item.enqueue_time) * 1000)
James E. Blair8fa16972013-01-15 16:57:20 -08001697 else:
1698 dt = None
James E. Blairfee8d652013-06-07 08:57:52 -07001699 items = len(self.pipeline.getAllItems())
James E. Blair8fa16972013-01-15 16:57:20 -08001700
1701 # stats.timers.zuul.pipeline.NAME.resident_time
1702 # stats_counts.zuul.pipeline.NAME.total_changes
1703 # stats.gauges.zuul.pipeline.NAME.current_changes
1704 key = 'zuul.pipeline.%s' % self.pipeline.name
James E. Blairfee8d652013-06-07 08:57:52 -07001705 statsd.gauge(key + '.current_changes', items)
James E. Blair8fa16972013-01-15 16:57:20 -08001706 if dt:
1707 statsd.timing(key + '.resident_time', dt)
1708 statsd.incr(key + '.total_changes')
1709
1710 # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
1711 # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
James E. Blairfee8d652013-06-07 08:57:52 -07001712 project_name = item.change.project.name.replace('/', '.')
James E. Blair8fa16972013-01-15 16:57:20 -08001713 key += '.%s' % project_name
1714 if dt:
1715 statsd.timing(key + '.resident_time', dt)
1716 statsd.incr(key + '.total_changes')
1717 except:
1718 self.log.exception("Exception reporting pipeline stats")
1719
James E. Blair1e8dd892012-05-30 09:15:05 -07001720
James E. Blair4aea70c2012-07-26 14:23:24 -07001721class IndependentPipelineManager(BasePipelineManager):
1722 log = logging.getLogger("zuul.IndependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001723 changes_merge = False
1724
James E. Blaireff88162013-07-01 12:44:14 -04001725 def _postConfig(self, layout):
1726 super(IndependentPipelineManager, self)._postConfig(layout)
James E. Blaire0487072012-08-29 17:38:31 -07001727
James E. Blair5ee24252014-12-30 10:12:29 -08001728 def getChangeQueue(self, change):
James E. Blairbfb8e042014-12-30 17:01:44 -08001729 # creates a new change queue for every change
1730 if change.project not in self.pipeline.getProjects():
1731 return None
1732 change_queue = ChangeQueue(self.pipeline)
1733 change_queue.addProject(change.project)
1734 self.pipeline.addQueue(change_queue)
1735 return change_queue
1736
1737 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
1738 change_queue):
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001739 ret = self.checkForChangesNeededBy(change, change_queue)
James E. Blairbfb8e042014-12-30 17:01:44 -08001740 if ret in [True, False]:
1741 return ret
1742 self.log.debug(" Changes %s must be merged ahead of %s" %
1743 (ret, change))
1744 for needed_change in ret:
1745 # This differs from the dependent pipeline by enqueuing
1746 # changes ahead as "not live", that is, not intended to
1747 # have jobs run. Also, pipeline requirements are always
1748 # ignored (which is safe because the changes are not
1749 # live).
1750 r = self.addChange(needed_change, quiet=True,
1751 ignore_requirements=True,
1752 live=False, change_queue=change_queue)
1753 if not r:
1754 return False
1755 return True
1756
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001757 def checkForChangesNeededBy(self, change, change_queue):
James E. Blairbfb8e042014-12-30 17:01:44 -08001758 self.log.debug("Checking for changes needed by %s:" % change)
1759 # Return true if okay to proceed enqueing this change,
1760 # false if the change should not be enqueued.
1761 if not hasattr(change, 'needs_changes'):
1762 self.log.debug(" Changeish does not support dependencies")
1763 return True
1764 if not change.needs_changes:
1765 self.log.debug(" No changes needed")
1766 return True
1767 changes_needed = []
1768 for needed_change in change.needs_changes:
1769 self.log.debug(" Change %s needs change %s:" % (
1770 change, needed_change))
1771 if needed_change.is_merged:
1772 self.log.debug(" Needed change is merged")
1773 continue
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001774 if self.isChangeAlreadyInQueue(needed_change, change_queue):
James E. Blairbfb8e042014-12-30 17:01:44 -08001775 self.log.debug(" Needed change is already ahead in the queue")
1776 continue
1777 self.log.debug(" Change %s is needed" % needed_change)
1778 if needed_change not in changes_needed:
1779 changes_needed.append(needed_change)
1780 continue
1781 # This differs from the dependent pipeline check in not
1782 # verifying that the dependent change is mergable.
1783 if changes_needed:
1784 return changes_needed
1785 return True
1786
1787 def dequeueItem(self, item):
1788 super(IndependentPipelineManager, self).dequeueItem(item)
1789 # An independent pipeline manager dynamically removes empty
1790 # queues
1791 if not item.queue.queue:
1792 self.pipeline.removeQueue(item.queue)
James E. Blair5ee24252014-12-30 10:12:29 -08001793
James E. Blair1e8dd892012-05-30 09:15:05 -07001794
James E. Blair4aea70c2012-07-26 14:23:24 -07001795class DependentPipelineManager(BasePipelineManager):
1796 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001797 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001798
1799 def __init__(self, *args, **kwargs):
James E. Blair4aea70c2012-07-26 14:23:24 -07001800 super(DependentPipelineManager, self).__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001801
James E. Blaireff88162013-07-01 12:44:14 -04001802 def _postConfig(self, layout):
1803 super(DependentPipelineManager, self)._postConfig(layout)
James E. Blairee743612012-05-29 14:49:32 -07001804 self.buildChangeQueues()
1805
1806 def buildChangeQueues(self):
1807 self.log.debug("Building shared change queues")
1808 change_queues = []
1809
James E. Blair4aea70c2012-07-26 14:23:24 -07001810 for project in self.pipeline.getProjects():
Clark Boylan7603a372014-01-21 11:43:20 -08001811 change_queue = ChangeQueue(
1812 self.pipeline,
1813 window=self.pipeline.window,
1814 window_floor=self.pipeline.window_floor,
1815 window_increase_type=self.pipeline.window_increase_type,
1816 window_increase_factor=self.pipeline.window_increase_factor,
1817 window_decrease_type=self.pipeline.window_decrease_type,
1818 window_decrease_factor=self.pipeline.window_decrease_factor)
James E. Blair4aea70c2012-07-26 14:23:24 -07001819 change_queue.addProject(project)
1820 change_queues.append(change_queue)
1821 self.log.debug("Created queue: %s" % change_queue)
James E. Blairee743612012-05-29 14:49:32 -07001822
James E. Blairc3d428e2013-12-03 15:06:48 -08001823 # Iterate over all queues trying to combine them, and keep doing
1824 # so until they can not be combined further.
1825 last_change_queues = change_queues
1826 while True:
1827 new_change_queues = self.combineChangeQueues(last_change_queues)
1828 if len(last_change_queues) == len(new_change_queues):
1829 break
1830 last_change_queues = new_change_queues
1831
1832 self.log.info(" Shared change queues:")
1833 for queue in new_change_queues:
1834 self.pipeline.addQueue(queue)
James E. Blairc8a1e052014-02-25 09:29:26 -08001835 self.log.info(" %s containing %s" % (
1836 queue, queue.generated_name))
James E. Blairc3d428e2013-12-03 15:06:48 -08001837
1838 def combineChangeQueues(self, change_queues):
James E. Blairee743612012-05-29 14:49:32 -07001839 self.log.debug("Combining shared queues")
1840 new_change_queues = []
1841 for a in change_queues:
1842 merged_a = False
1843 for b in new_change_queues:
1844 if not a.getJobs().isdisjoint(b.getJobs()):
1845 self.log.debug("Merging queue %s into %s" % (a, b))
1846 b.mergeChangeQueue(a)
1847 merged_a = True
1848 break # this breaks out of 'for b' and continues 'for a'
1849 if not merged_a:
1850 self.log.debug("Keeping queue %s" % (a))
1851 new_change_queues.append(a)
James E. Blairc3d428e2013-12-03 15:06:48 -08001852 return new_change_queues
James E. Blairee743612012-05-29 14:49:32 -07001853
James E. Blair5ee24252014-12-30 10:12:29 -08001854 def getChangeQueue(self, change):
1855 return self.pipeline.getQueue(change.project)
1856
James E. Blaire0487072012-08-29 17:38:31 -07001857 def isChangeReadyToBeEnqueued(self, change):
James E. Blairc0dedf82014-08-06 09:37:52 -07001858 if not self.pipeline.source.canMerge(change,
1859 self.getSubmitAllowNeeds()):
James E. Blaire0487072012-08-29 17:38:31 -07001860 self.log.debug("Change %s can not merge, ignoring" % change)
1861 return False
1862 return True
James E. Blair1e8dd892012-05-30 09:15:05 -07001863
James E. Blair5ee24252014-12-30 10:12:29 -08001864 def enqueueChangesBehind(self, change, quiet, ignore_requirements,
1865 change_queue):
James E. Blaire0487072012-08-29 17:38:31 -07001866 to_enqueue = []
1867 self.log.debug("Checking for changes needing %s:" % change)
1868 if not hasattr(change, 'needed_by_changes'):
1869 self.log.debug(" Changeish does not support dependencies")
1870 return
James E. Blair5ee24252014-12-30 10:12:29 -08001871 for other_change in change.needed_by_changes:
1872 other_change_queue = self.getChangeQueue(other_change)
1873 if other_change_queue != change_queue:
1874 self.log.debug(" Change %s in project %s can not be enqueued "
1875 "in the target queue %s" %
1876 (other_change, other_change.project,
1877 change_queue))
1878 continue
1879 if self.pipeline.source.canMerge(other_change,
James E. Blairc0dedf82014-08-06 09:37:52 -07001880 self.getSubmitAllowNeeds()):
James E. Blaire0487072012-08-29 17:38:31 -07001881 self.log.debug(" Change %s needs %s and is ready to merge" %
James E. Blair5ee24252014-12-30 10:12:29 -08001882 (other_change, change))
1883 to_enqueue.append(other_change)
1884
James E. Blaire0487072012-08-29 17:38:31 -07001885 if not to_enqueue:
1886 self.log.debug(" No changes need %s" % change)
1887
1888 for other_change in to_enqueue:
James E. Blairf9ab8842014-07-10 13:12:07 -07001889 self.addChange(other_change, quiet=quiet,
James E. Blair5ee24252014-12-30 10:12:29 -08001890 ignore_requirements=ignore_requirements,
1891 change_queue=change_queue)
James E. Blaire0487072012-08-29 17:38:31 -07001892
James E. Blair5ee24252014-12-30 10:12:29 -08001893 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
1894 change_queue):
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001895 ret = self.checkForChangesNeededBy(change, change_queue)
James E. Blaire0487072012-08-29 17:38:31 -07001896 if ret in [True, False]:
1897 return ret
James E. Blair5ee24252014-12-30 10:12:29 -08001898 self.log.debug(" Changes %s must be merged ahead of %s" %
James E. Blaire0487072012-08-29 17:38:31 -07001899 (ret, change))
James E. Blair6965a4b2014-12-16 17:19:04 -08001900 for needed_change in ret:
1901 r = self.addChange(needed_change, quiet=quiet,
James E. Blair5ee24252014-12-30 10:12:29 -08001902 ignore_requirements=ignore_requirements,
1903 change_queue=change_queue)
James E. Blair6965a4b2014-12-16 17:19:04 -08001904 if not r:
1905 return False
1906 return True
James E. Blaire0487072012-08-29 17:38:31 -07001907
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001908 def checkForChangesNeededBy(self, change, change_queue):
James E. Blaire421a232012-07-25 16:59:21 -07001909 self.log.debug("Checking for changes needed by %s:" % change)
1910 # Return true if okay to proceed enqueing this change,
1911 # false if the change should not be enqueued.
James E. Blair6965a4b2014-12-16 17:19:04 -08001912 if not hasattr(change, 'needs_changes'):
James E. Blair4aea70c2012-07-26 14:23:24 -07001913 self.log.debug(" Changeish does not support dependencies")
1914 return True
James E. Blair6965a4b2014-12-16 17:19:04 -08001915 if not change.needs_changes:
James E. Blaire421a232012-07-25 16:59:21 -07001916 self.log.debug(" No changes needed")
1917 return True
James E. Blair6965a4b2014-12-16 17:19:04 -08001918 changes_needed = []
James E. Blair5ee24252014-12-30 10:12:29 -08001919 change_queue = self.getChangeQueue(change)
James E. Blair6965a4b2014-12-16 17:19:04 -08001920 for needed_change in change.needs_changes:
1921 self.log.debug(" Change %s needs change %s:" % (
1922 change, needed_change))
1923 if needed_change.is_merged:
1924 self.log.debug(" Needed change is merged")
1925 continue
James E. Blair5ee24252014-12-30 10:12:29 -08001926 needed_change_queue = self.getChangeQueue(needed_change)
1927 if needed_change_queue != change_queue:
1928 self.log.debug(" Change %s in project %s does not share a "
1929 "change queue with %s in project %s" %
1930 (needed_change, needed_change.project,
1931 change, change.project))
1932 return False
James E. Blair6965a4b2014-12-16 17:19:04 -08001933 if not needed_change.is_current_patchset:
1934 self.log.debug(" Needed change is not the current patchset")
1935 return False
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001936 if self.isChangeAlreadyInQueue(needed_change, change_queue):
James E. Blair6965a4b2014-12-16 17:19:04 -08001937 self.log.debug(" Needed change is already ahead in the queue")
1938 continue
1939 if self.pipeline.source.canMerge(needed_change,
1940 self.getSubmitAllowNeeds()):
1941 self.log.debug(" Change %s is needed" % needed_change)
1942 if needed_change not in changes_needed:
1943 changes_needed.append(needed_change)
1944 continue
1945 # The needed change can't be merged.
1946 self.log.debug(" Change %s is needed but can not be merged" %
1947 needed_change)
James E. Blaire421a232012-07-25 16:59:21 -07001948 return False
James E. Blair6965a4b2014-12-16 17:19:04 -08001949 if changes_needed:
1950 return changes_needed
1951 return True
James E. Blair972e3c72013-08-29 12:04:55 -07001952
James E. Blair6965a4b2014-12-16 17:19:04 -08001953 def getFailingDependentItems(self, item):
1954 if not hasattr(item.change, 'needs_changes'):
James E. Blair972e3c72013-08-29 12:04:55 -07001955 return None
James E. Blair6965a4b2014-12-16 17:19:04 -08001956 if not item.change.needs_changes:
James E. Blair972e3c72013-08-29 12:04:55 -07001957 return None
James E. Blair6965a4b2014-12-16 17:19:04 -08001958 failing_items = set()
1959 for needed_change in item.change.needs_changes:
1960 needed_item = self.getItemForChange(needed_change)
1961 if not needed_item:
1962 continue
1963 if needed_item.current_build_set.failing_reasons:
1964 failing_items.add(needed_item)
1965 if failing_items:
1966 return failing_items
James E. Blair972e3c72013-08-29 12:04:55 -07001967 return None