blob: 805d334afd3454b7cdd5eb6d22c90d822f5bfb41 [file] [log] [blame]
James E. Blairee743612012-05-29 14:49:32 -07001# Copyright 2012 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Zhongyue Luo1c860d72012-07-19 11:03:56 +080023import Queue
24import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
Joshua Hesketh1879cf72013-08-19 14:13:15 +100032from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
James E. Blair4886cc12012-07-18 15:39:41 -070033import merger
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040034from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070035
James E. Blair71e94122012-12-24 17:53:08 -080036statsd = extras.try_import('statsd.statsd')
37
James E. Blair1e8dd892012-05-30 09:15:05 -070038
Antoine Musso80edd5a2013-02-13 15:37:53 +010039def deep_format(obj, paramdict):
40 """Apply the paramdict via str.format() to all string objects found within
41 the supplied obj. Lists and dicts are traversed recursively.
42
43 Borrowed from Jenkins Job Builder project"""
44 if isinstance(obj, str):
45 ret = obj.format(**paramdict)
46 elif isinstance(obj, list):
47 ret = []
48 for item in obj:
49 ret.append(deep_format(item, paramdict))
50 elif isinstance(obj, dict):
51 ret = {}
52 for item in obj:
53 exp_item = item.format(**paramdict)
54
55 ret[exp_item] = deep_format(obj[item], paramdict)
56 else:
57 ret = obj
58 return ret
59
60
James E. Blairfee8d652013-06-07 08:57:52 -070061class MergeFailure(Exception):
62 pass
63
64
James E. Blair468c8512013-12-06 13:27:19 -080065class ManagementEvent(object):
66 """An event that should be processed within the main queue run loop"""
67 def __init__(self):
68 self._wait_event = threading.Event()
James E. Blair36658cf2013-12-06 17:53:48 -080069 self._exception = None
70 self._traceback = None
James E. Blair468c8512013-12-06 13:27:19 -080071
James E. Blair36658cf2013-12-06 17:53:48 -080072 def exception(self, e, tb):
73 self._exception = e
74 self._traceback = tb
75 self._wait_event.set()
76
77 def done(self):
James E. Blair468c8512013-12-06 13:27:19 -080078 self._wait_event.set()
79
80 def wait(self, timeout=None):
81 self._wait_event.wait(timeout)
James E. Blair36658cf2013-12-06 17:53:48 -080082 if self._exception:
83 raise self._exception, None, self._traceback
James E. Blair468c8512013-12-06 13:27:19 -080084 return self._wait_event.is_set()
85
86
87class ReconfigureEvent(ManagementEvent):
88 """Reconfigure the scheduler. The layout will be (re-)loaded from
89 the path specified in the configuration.
90
91 :arg ConfigParser config: the new configuration
92 """
93 def __init__(self, config):
94 super(ReconfigureEvent, self).__init__()
95 self.config = config
96
97
James E. Blair36658cf2013-12-06 17:53:48 -080098class PromoteEvent(ManagementEvent):
99 """Promote one or more changes to the head of the queue.
100
101 :arg str pipeline_name: the name of the pipeline
102 :arg list change_ids: a list of strings of change ids in the form
103 1234,1
104 """
105
106 def __init__(self, pipeline_name, change_ids):
107 super(PromoteEvent, self).__init__()
108 self.pipeline_name = pipeline_name
109 self.change_ids = change_ids
110
111
James E. Blaira84f0e42014-02-06 07:09:22 -0800112class ResultEvent(object):
113 """An event that needs to modify the pipeline state due to a
114 result from an external system."""
115
116 pass
117
118
119class BuildStartedEvent(ResultEvent):
120 """A build has started.
121
122 :arg Build build: The build which has started.
123 """
124
125 def __init__(self, build):
126 self.build = build
127
128
129class BuildCompletedEvent(ResultEvent):
130 """A build has completed
131
132 :arg Build build: The build which has completed.
133 """
134
135 def __init__(self, build):
136 self.build = build
137
138
James E. Blaire9d45c32012-05-31 09:56:45 -0700139class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700140 log = logging.getLogger("zuul.Scheduler")
141
James E. Blaire9d45c32012-05-31 09:56:45 -0700142 def __init__(self):
143 threading.Thread.__init__(self)
James E. Blair8a6f0c22013-07-01 12:31:34 -0400144 self.daemon = True
James E. Blairee743612012-05-29 14:49:32 -0700145 self.wake_event = threading.Event()
James E. Blaircdccd972013-07-01 12:10:22 -0700146 self.layout_lock = threading.Lock()
James E. Blaira84f0e42014-02-06 07:09:22 -0800147 self.run_handler_lock = threading.Lock()
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700148 self._pause = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700149 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -0700150 self._stopped = False
James E. Blairee743612012-05-29 14:49:32 -0700151 self.launcher = None
James E. Blair6c358e72013-07-29 17:06:47 -0700152 self.triggers = dict()
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000153 self.reporters = dict()
James E. Blair3c5e5b52013-04-26 11:17:03 -0700154 self.config = None
James E. Blair0e933c52013-07-11 10:18:52 -0700155 self._maintain_trigger_cache = False
James E. Blairee743612012-05-29 14:49:32 -0700156
157 self.trigger_event_queue = Queue.Queue()
158 self.result_event_queue = Queue.Queue()
James E. Blair468c8512013-12-06 13:27:19 -0800159 self.management_event_queue = Queue.Queue()
James E. Blaireff88162013-07-01 12:44:14 -0400160 self.layout = model.Layout()
James E. Blairee743612012-05-29 14:49:32 -0700161
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400162 self.zuul_version = zuul_version.version_info.version_string()
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400163 self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400164
James E. Blairb0fcae42012-07-17 11:12:10 -0700165 def stop(self):
166 self._stopped = True
167 self.wake_event.set()
168
James E. Blair47958382013-01-10 17:26:02 -0800169 def testConfig(self, config_path):
James E. Blair04948c72013-07-25 23:03:17 -0700170 return self._parseConfig(config_path)
James E. Blair47958382013-01-10 17:26:02 -0800171
James E. Blaire5a847f2012-07-10 15:29:14 -0700172 def _parseConfig(self, config_path):
James E. Blaireff88162013-07-01 12:44:14 -0400173 layout = model.Layout()
174 project_templates = {}
175
James E. Blairee743612012-05-29 14:49:32 -0700176 def toList(item):
James E. Blair1e8dd892012-05-30 09:15:05 -0700177 if not item:
178 return []
James E. Blair32663402012-06-01 10:04:18 -0700179 if isinstance(item, list):
James E. Blairee743612012-05-29 14:49:32 -0700180 return item
181 return [item]
182
James E. Blaire5a847f2012-07-10 15:29:14 -0700183 if config_path:
184 config_path = os.path.expanduser(config_path)
185 if not os.path.exists(config_path):
186 raise Exception("Unable to read layout config file at %s" %
187 config_path)
188 config_file = open(config_path)
189 data = yaml.load(config_file)
190
James E. Blair47958382013-01-10 17:26:02 -0800191 validator = layoutvalidator.LayoutValidator()
192 validator.validate(data)
193
James E. Blaireff88162013-07-01 12:44:14 -0400194 config_env = {}
James E. Blaire5a847f2012-07-10 15:29:14 -0700195 for include in data.get('includes', []):
196 if 'python-file' in include:
197 fn = include['python-file']
198 if not os.path.isabs(fn):
199 base = os.path.dirname(config_path)
200 fn = os.path.join(base, fn)
201 fn = os.path.expanduser(fn)
James E. Blaireff88162013-07-01 12:44:14 -0400202 execfile(fn, config_env)
James E. Blair1e8dd892012-05-30 09:15:05 -0700203
James E. Blair4aea70c2012-07-26 14:23:24 -0700204 for conf_pipeline in data.get('pipelines', []):
205 pipeline = Pipeline(conf_pipeline['name'])
James E. Blair8dbd56a2012-12-22 10:55:10 -0800206 pipeline.description = conf_pipeline.get('description')
James E. Blair64ed6f22013-07-10 14:07:23 -0700207 precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
208 pipeline.precedence = precedence
James E. Blair56370192013-01-14 15:47:28 -0800209 pipeline.failure_message = conf_pipeline.get('failure-message',
210 "Build failed.")
211 pipeline.success_message = conf_pipeline.get('success-message',
212 "Build succeeded.")
James E. Blair2fa50962013-01-30 21:50:41 -0800213 pipeline.dequeue_on_new_patchset = conf_pipeline.get(
James E. Blair6736beb2013-07-11 15:18:15 -0700214 'dequeue-on-new-patchset', True)
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000215
216 action_reporters = {}
217 for action in ['start', 'success', 'failure']:
218 action_reporters[action] = []
219 if conf_pipeline.get(action):
220 for reporter_name, params \
221 in conf_pipeline.get(action).items():
222 if reporter_name in self.reporters.keys():
223 action_reporters[action].append(ActionReporter(
224 self.reporters[reporter_name], params))
225 else:
226 self.log.error('Invalid reporter name %s' %
227 reporter_name)
228 pipeline.start_actions = action_reporters['start']
229 pipeline.success_actions = action_reporters['success']
230 pipeline.failure_actions = action_reporters['failure']
231
Clark Boylan7603a372014-01-21 11:43:20 -0800232 pipeline.window = conf_pipeline.get('window', 20)
233 pipeline.window_floor = conf_pipeline.get('window-floor', 3)
234 pipeline.window_increase_type = conf_pipeline.get(
235 'window-increase-type', 'linear')
236 pipeline.window_increase_factor = conf_pipeline.get(
237 'window-increase-factor', 1)
238 pipeline.window_decrease_type = conf_pipeline.get(
239 'window-decrease-type', 'exponential')
240 pipeline.window_decrease_factor = conf_pipeline.get(
241 'window-decrease-factor', 2)
242
James E. Blair4aea70c2012-07-26 14:23:24 -0700243 manager = globals()[conf_pipeline['manager']](self, pipeline)
244 pipeline.setManager(manager)
James E. Blaireff88162013-07-01 12:44:14 -0400245 layout.pipelines[conf_pipeline['name']] = pipeline
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000246
James E. Blair6c358e72013-07-29 17:06:47 -0700247 # TODO: move this into triggers (may require pluggable
248 # configuration)
249 if 'gerrit' in conf_pipeline['trigger']:
250 pipeline.trigger = self.triggers['gerrit']
251 for trigger in toList(conf_pipeline['trigger']['gerrit']):
252 approvals = {}
253 for approval_dict in toList(trigger.get('approval')):
254 for k, v in approval_dict.items():
255 approvals[k] = v
256 f = EventFilter(types=toList(trigger['event']),
257 branches=toList(trigger.get('branch')),
258 refs=toList(trigger.get('ref')),
James E. Blairc053d022014-01-22 14:57:33 -0800259 event_approvals=approvals,
James E. Blair6c358e72013-07-29 17:06:47 -0700260 comment_filters=
261 toList(trigger.get('comment_filter')),
262 email_filters=
Joshua Heskethb8a817e2013-12-27 11:21:38 +1100263 toList(trigger.get('email_filter')),
264 username_filters=
James E. Blairc053d022014-01-22 14:57:33 -0800265 toList(trigger.get('username_filter')),
266 require_approvals=
267 toList(trigger.get('require-approval')))
James E. Blair6c358e72013-07-29 17:06:47 -0700268 manager.event_filters.append(f)
James E. Blair63bb0ef2013-07-29 17:14:51 -0700269 elif 'timer' in conf_pipeline['trigger']:
270 pipeline.trigger = self.triggers['timer']
271 for trigger in toList(conf_pipeline['trigger']['timer']):
272 f = EventFilter(types=['timer'],
273 timespecs=toList(trigger['time']))
274 manager.event_filters.append(f)
James E. Blairee743612012-05-29 14:49:32 -0700275
Antoine Musso80edd5a2013-02-13 15:37:53 +0100276 for project_template in data.get('project-templates', []):
277 # Make sure the template only contains valid pipelines
278 tpl = dict(
279 (pipe_name, project_template.get(pipe_name))
James E. Blaireff88162013-07-01 12:44:14 -0400280 for pipe_name in layout.pipelines.keys()
Antoine Musso80edd5a2013-02-13 15:37:53 +0100281 if pipe_name in project_template
282 )
James E. Blaireff88162013-07-01 12:44:14 -0400283 project_templates[project_template.get('name')] = tpl
Antoine Musso80edd5a2013-02-13 15:37:53 +0100284
James E. Blair47958382013-01-10 17:26:02 -0800285 for config_job in data.get('jobs', []):
James E. Blaireff88162013-07-01 12:44:14 -0400286 job = layout.getJob(config_job['name'])
James E. Blairb0954652012-06-01 11:32:01 -0700287 # Be careful to only set attributes explicitly present on
288 # this job, to avoid squashing attributes set by a meta-job.
289 m = config_job.get('failure-message', None)
290 if m:
291 job.failure_message = m
292 m = config_job.get('success-message', None)
293 if m:
294 job.success_message = m
James E. Blair6aea36d2012-12-17 13:03:24 -0800295 m = config_job.get('failure-pattern', None)
296 if m:
297 job.failure_pattern = m
298 m = config_job.get('success-pattern', None)
299 if m:
300 job.success_pattern = m
James E. Blair222d4982012-07-16 09:31:19 -0700301 m = config_job.get('hold-following-changes', False)
302 if m:
303 job.hold_following_changes = True
James E. Blair4ec821f2012-08-23 15:28:28 -0700304 m = config_job.get('voting', None)
305 if m is not None:
306 job.voting = m
James E. Blaire5a847f2012-07-10 15:29:14 -0700307 fname = config_job.get('parameter-function', None)
308 if fname:
James E. Blaireff88162013-07-01 12:44:14 -0400309 func = config_env.get(fname, None)
James E. Blaire5a847f2012-07-10 15:29:14 -0700310 if not func:
311 raise Exception("Unable to find function %s" % fname)
312 job.parameter_function = func
James E. Blairee743612012-05-29 14:49:32 -0700313 branches = toList(config_job.get('branch'))
314 if branches:
James E. Blaire421a232012-07-25 16:59:21 -0700315 job._branches = branches
316 job.branches = [re.compile(x) for x in branches]
James E. Blair70c71582013-03-06 08:50:50 -0800317 files = toList(config_job.get('files'))
318 if files:
319 job._files = files
320 job.files = [re.compile(x) for x in files]
James E. Blairee743612012-05-29 14:49:32 -0700321
322 def add_jobs(job_tree, config_jobs):
323 for job in config_jobs:
324 if isinstance(job, list):
325 for x in job:
326 add_jobs(job_tree, x)
327 if isinstance(job, dict):
328 for parent, children in job.items():
James E. Blaireff88162013-07-01 12:44:14 -0400329 parent_tree = job_tree.addJob(layout.getJob(parent))
James E. Blairee743612012-05-29 14:49:32 -0700330 add_jobs(parent_tree, children)
331 if isinstance(job, str):
James E. Blaireff88162013-07-01 12:44:14 -0400332 job_tree.addJob(layout.getJob(job))
James E. Blairee743612012-05-29 14:49:32 -0700333
James E. Blair47958382013-01-10 17:26:02 -0800334 for config_project in data.get('projects', []):
James E. Blairee743612012-05-29 14:49:32 -0700335 project = Project(config_project['name'])
James E. Blairaea6cf62013-12-16 15:38:12 -0800336 shortname = config_project['name'].split('/')[-1]
Antoine Musso80edd5a2013-02-13 15:37:53 +0100337
James E. Blair3e98c022013-12-16 15:25:38 -0800338 # This is reversed due to the prepend operation below, so
339 # the ultimate order is templates (in order) followed by
340 # statically defined jobs.
341 for requested_template in reversed(
342 config_project.get('template', [])):
Antoine Musso80edd5a2013-02-13 15:37:53 +0100343 # Fetch the template from 'project-templates'
James E. Blaireff88162013-07-01 12:44:14 -0400344 tpl = project_templates.get(
Antoine Musso80edd5a2013-02-13 15:37:53 +0100345 requested_template.get('name'))
346 # Expand it with the project context
James E. Blairaea6cf62013-12-16 15:38:12 -0800347 requested_template['name'] = shortname
Antoine Musso80edd5a2013-02-13 15:37:53 +0100348 expanded = deep_format(tpl, requested_template)
James E. Blair3e98c022013-12-16 15:25:38 -0800349 # Finally merge the expansion with whatever has been
350 # already defined for this project. Prepend our new
351 # jobs to existing ones (which may have been
352 # statically defined or defined by other templates).
353 for pipeline in layout.pipelines.values():
354 if pipeline.name in expanded:
355 config_project.update(
356 {pipeline.name: expanded[pipeline.name] +
357 config_project.get(pipeline.name, [])})
358 # TODO: future enhancement -- add an option to the
359 # template block to indicate that duplicate jobs should be
360 # merged (especially to handle the case where they have
361 # children and you want all of the children to run after a
362 # single run of the parent).
Antoine Musso80edd5a2013-02-13 15:37:53 +0100363
James E. Blaireff88162013-07-01 12:44:14 -0400364 layout.projects[config_project['name']] = project
James E. Blair19deff22013-08-25 13:17:35 -0700365 mode = config_project.get('merge-mode', 'merge-resolve')
366 project.merge_mode = model.MERGER_MAP[mode]
James E. Blaireff88162013-07-01 12:44:14 -0400367 for pipeline in layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700368 if pipeline.name in config_project:
369 job_tree = pipeline.addProject(project)
370 config_jobs = config_project[pipeline.name]
James E. Blairee743612012-05-29 14:49:32 -0700371 add_jobs(job_tree, config_jobs)
James E. Blairee743612012-05-29 14:49:32 -0700372
James E. Blairb0954652012-06-01 11:32:01 -0700373 # All jobs should be defined at this point, get rid of
374 # metajobs so that getJob isn't doing anything weird.
James E. Blairc28d1b02013-07-19 11:37:06 -0700375 layout.metajobs = []
James E. Blairb0954652012-06-01 11:32:01 -0700376
James E. Blaireff88162013-07-01 12:44:14 -0400377 for pipeline in layout.pipelines.values():
378 pipeline.manager._postConfig(layout)
379
380 return layout
James E. Blairee743612012-05-29 14:49:32 -0700381
James E. Blair47958382013-01-10 17:26:02 -0800382 def _setupMerger(self):
James E. Blair4886cc12012-07-18 15:39:41 -0700383 if self.config.has_option('zuul', 'git_dir'):
384 merge_root = self.config.get('zuul', 'git_dir')
385 else:
386 merge_root = '/var/lib/zuul/git'
James E. Blair47958382013-01-10 17:26:02 -0800387
Paul Belangerb67aba12013-05-13 19:22:14 -0400388 if self.config.has_option('zuul', 'git_user_email'):
389 merge_email = self.config.get('zuul', 'git_user_email')
390 else:
391 merge_email = None
392
393 if self.config.has_option('zuul', 'git_user_name'):
394 merge_name = self.config.get('zuul', 'git_user_name')
395 else:
396 merge_name = None
397
James E. Blairad615012012-11-30 16:14:21 -0800398 if self.config.has_option('gerrit', 'sshkey'):
399 sshkey = self.config.get('gerrit', 'sshkey')
400 else:
401 sshkey = None
James E. Blair47958382013-01-10 17:26:02 -0800402
James E. Blair6c358e72013-07-29 17:06:47 -0700403 # TODO: The merger should have an upstream repo independent of
404 # triggers, and then each trigger should provide a fetch
405 # location.
James E. Blair9ca39832014-01-28 12:27:13 -0800406 self.merger = merger.Merger(merge_root, sshkey,
407 merge_email, merge_name)
James E. Blair4886cc12012-07-18 15:39:41 -0700408
James E. Blairee743612012-05-29 14:49:32 -0700409 def setLauncher(self, launcher):
410 self.launcher = launcher
411
James E. Blair6c358e72013-07-29 17:06:47 -0700412 def registerTrigger(self, trigger, name=None):
413 if name is None:
414 name = trigger.name
415 self.triggers[name] = trigger
James E. Blairee743612012-05-29 14:49:32 -0700416
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000417 def registerReporter(self, reporter, name=None):
418 if name is None:
419 name = reporter.name
420 self.reporters[name] = reporter
421
James E. Blaircdccd972013-07-01 12:10:22 -0700422 def getProject(self, name):
423 self.layout_lock.acquire()
424 p = None
425 try:
426 p = self.layout.projects.get(name)
427 finally:
428 self.layout_lock.release()
429 return p
430
James E. Blairee743612012-05-29 14:49:32 -0700431 def addEvent(self, event):
432 self.log.debug("Adding trigger event: %s" % event)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800433 try:
434 if statsd:
435 statsd.incr('gerrit.event.%s' % event.type)
436 except:
437 self.log.exception("Exception reporting event stats")
James E. Blairee743612012-05-29 14:49:32 -0700438 self.trigger_event_queue.put(event)
439 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800440 self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700441
James E. Blair11700c32012-07-05 17:50:05 -0700442 def onBuildStarted(self, build):
443 self.log.debug("Adding start event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800444 build.start_time = time.time()
James E. Blaira84f0e42014-02-06 07:09:22 -0800445 event = BuildStartedEvent(build)
446 self.result_event_queue.put(event)
James E. Blair11700c32012-07-05 17:50:05 -0700447 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800448 self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700449
James E. Blairee743612012-05-29 14:49:32 -0700450 def onBuildCompleted(self, build):
James E. Blair11700c32012-07-05 17:50:05 -0700451 self.log.debug("Adding complete event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800452 build.end_time = time.time()
James E. Blair23ec1ba2013-01-04 18:06:10 -0800453 try:
James E. Blair66eeebf2013-07-27 17:44:32 -0700454 if statsd and build.pipeline:
455 jobname = build.job.name.replace('.', '_')
456 key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
457 jobname, build.result)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800458 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
459 dt = int((build.end_time - build.start_time) * 1000)
460 statsd.timing(key, dt)
461 statsd.incr(key)
James E. Blair7f4a1902013-08-24 08:20:02 -0700462 key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
463 statsd.incr(key)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800464 except:
465 self.log.exception("Exception reporting runtime stats")
James E. Blaira84f0e42014-02-06 07:09:22 -0800466 event = BuildCompletedEvent(build)
467 self.result_event_queue.put(event)
James E. Blairee743612012-05-29 14:49:32 -0700468 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800469 self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700470
James E. Blaire9d45c32012-05-31 09:56:45 -0700471 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700472 self.log.debug("Prepare to reconfigure")
James E. Blair468c8512013-12-06 13:27:19 -0800473 event = ReconfigureEvent(config)
474 self.management_event_queue.put(event)
James E. Blaire9d45c32012-05-31 09:56:45 -0700475 self.wake_event.set()
476 self.log.debug("Waiting for reconfiguration")
James E. Blair468c8512013-12-06 13:27:19 -0800477 event.wait()
James E. Blaire9d45c32012-05-31 09:56:45 -0700478 self.log.debug("Reconfiguration complete")
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400479 self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700480
James E. Blair36658cf2013-12-06 17:53:48 -0800481 def promote(self, pipeline_name, change_ids):
482 event = PromoteEvent(pipeline_name, change_ids)
483 self.management_event_queue.put(event)
484 self.wake_event.set()
485 self.log.debug("Waiting for promotion")
486 event.wait()
487 self.log.debug("Promotion complete")
488
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700489 def exit(self):
490 self.log.debug("Prepare to exit")
491 self._pause = True
492 self._exit = True
493 self.wake_event.set()
494 self.log.debug("Waiting for exit")
495
496 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700497 if self.config.has_option('zuul', 'state_dir'):
498 state_dir = os.path.expanduser(self.config.get('zuul',
499 'state_dir'))
500 else:
501 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700502 return os.path.join(state_dir, 'queue.pickle')
503
504 def _save_queue(self):
505 pickle_file = self._get_queue_pickle_file()
506 events = []
507 while not self.trigger_event_queue.empty():
508 events.append(self.trigger_event_queue.get())
509 self.log.debug("Queue length is %s" % len(events))
510 if events:
511 self.log.debug("Saving queue")
512 pickle.dump(events, open(pickle_file, 'wb'))
513
514 def _load_queue(self):
515 pickle_file = self._get_queue_pickle_file()
516 if os.path.exists(pickle_file):
517 self.log.debug("Loading queue")
518 events = pickle.load(open(pickle_file, 'rb'))
519 self.log.debug("Queue length is %s" % len(events))
520 for event in events:
521 self.trigger_event_queue.put(event)
522 else:
523 self.log.debug("No queue file found")
524
525 def _delete_queue(self):
526 pickle_file = self._get_queue_pickle_file()
527 if os.path.exists(pickle_file):
528 self.log.debug("Deleting saved queue")
529 os.unlink(pickle_file)
530
531 def resume(self):
532 try:
533 self._load_queue()
534 except:
535 self.log.exception("Unable to load queue")
536 try:
537 self._delete_queue()
538 except:
539 self.log.exception("Unable to delete saved queue")
540 self.log.debug("Resuming queue processing")
541 self.wake_event.set()
542
543 def _doPauseEvent(self):
544 if self._exit:
545 self.log.debug("Exiting")
546 self._save_queue()
547 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700548
James E. Blair468c8512013-12-06 13:27:19 -0800549 def _doReconfigureEvent(self, event):
550 # This is called in the scheduler loop after another thread submits
551 # a request
James E. Blaircdccd972013-07-01 12:10:22 -0700552 self.layout_lock.acquire()
James E. Blair468c8512013-12-06 13:27:19 -0800553 self.config = event.config
James E. Blaircdccd972013-07-01 12:10:22 -0700554 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700555 self.log.debug("Performing reconfiguration")
James E. Blaircdccd972013-07-01 12:10:22 -0700556 layout = self._parseConfig(
James E. Blaireff88162013-07-01 12:44:14 -0400557 self.config.get('zuul', 'layout_config'))
James E. Blaircdccd972013-07-01 12:10:22 -0700558 for name, new_pipeline in layout.pipelines.items():
559 old_pipeline = self.layout.pipelines.get(name)
560 if not old_pipeline:
561 if self.layout.pipelines:
562 # Don't emit this warning on startup
563 self.log.warning("No old pipeline matching %s found "
564 "when reconfiguring" % name)
565 continue
566 self.log.debug("Re-enqueueing changes for pipeline %s" %
567 name)
568 items_to_remove = []
569 for shared_queue in old_pipeline.queues:
James E. Blair972e3c72013-08-29 12:04:55 -0700570 for item in shared_queue.queue:
James E. Blaircdccd972013-07-01 12:10:22 -0700571 item.item_ahead = None
James E. Blair972e3c72013-08-29 12:04:55 -0700572 item.items_behind = []
James E. Blaircdccd972013-07-01 12:10:22 -0700573 item.pipeline = None
574 project = layout.projects.get(item.change.project.name)
575 if not project:
576 self.log.warning("Unable to find project for "
577 "change %s while reenqueueing" %
578 item.change)
579 item.change.project = None
580 items_to_remove.append(item)
581 continue
582 item.change.project = project
James E. Blair972e3c72013-08-29 12:04:55 -0700583 if not new_pipeline.manager.reEnqueueItem(item):
James E. Blaircdccd972013-07-01 12:10:22 -0700584 items_to_remove.append(item)
585 builds_to_remove = []
586 for build, item in old_pipeline.manager.building_jobs.items():
587 if item in items_to_remove:
588 builds_to_remove.append(build)
589 self.log.warning("Deleting running build %s for "
590 "change %s while reenqueueing" % (
591 build, item.change))
592 for build in builds_to_remove:
593 del old_pipeline.manager.building_jobs[build]
594 new_pipeline.manager.building_jobs = \
595 old_pipeline.manager.building_jobs
596 self.layout = layout
James E. Blair47958382013-01-10 17:26:02 -0800597 self._setupMerger()
James E. Blair63bb0ef2013-07-29 17:14:51 -0700598 for trigger in self.triggers.values():
599 trigger.postConfig()
James E. Blair3cb10702013-08-24 08:56:03 -0700600 if statsd:
601 try:
602 for pipeline in self.layout.pipelines.values():
603 items = len(pipeline.getAllItems())
604 # stats.gauges.zuul.pipeline.NAME.current_changes
605 key = 'zuul.pipeline.%s' % pipeline.name
606 statsd.gauge(key + '.current_changes', items)
607 except Exception:
608 self.log.exception("Exception reporting initial "
609 "pipeline stats:")
James E. Blaircdccd972013-07-01 12:10:22 -0700610 finally:
611 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700612
James E. Blair36658cf2013-12-06 17:53:48 -0800613 def _doPromoteEvent(self, event):
614 pipeline = self.layout.pipelines[event.pipeline_name]
615 change_ids = [c.split(',') for c in event.change_ids]
616 items_to_enqueue = []
617 change_queue = None
618 for shared_queue in pipeline.queues:
619 if change_queue:
620 break
621 for item in shared_queue.queue:
622 if (item.change.number == change_ids[0][0] and
623 item.change.patchset == change_ids[0][1]):
624 change_queue = shared_queue
625 break
626 if not change_queue:
627 raise Exception("Unable to find shared change queue for %s" %
628 event.change_ids[0])
629 for number, patchset in change_ids:
630 found = False
631 for item in change_queue.queue:
632 if (item.change.number == number and
633 item.change.patchset == patchset):
634 found = True
635 items_to_enqueue.append(item)
636 break
637 if not found:
638 raise Exception("Unable to find %s,%s in queue %s" %
639 (number, patchset, change_queue))
640 for item in change_queue.queue[:]:
641 if item not in items_to_enqueue:
642 items_to_enqueue.append(item)
643 pipeline.manager.cancelJobs(item)
644 pipeline.manager.dequeueItem(item)
645 for item in items_to_enqueue:
Sean Daguef39b9ca2014-01-10 21:34:35 -0500646 pipeline.manager.addChange(
647 item.change,
648 enqueue_time=item.enqueue_time,
649 quiet=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800650
James E. Blaire9d45c32012-05-31 09:56:45 -0700651 def _areAllBuildsComplete(self):
652 self.log.debug("Checking if all builds are complete")
653 waiting = False
James E. Blaireff88162013-07-01 12:44:14 -0400654 for pipeline in self.layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700655 for build in pipeline.manager.building_jobs.keys():
656 self.log.debug("%s waiting on %s" % (pipeline.manager, build))
James E. Blaire9d45c32012-05-31 09:56:45 -0700657 waiting = True
658 if not waiting:
659 self.log.debug("All builds are complete")
660 return True
661 self.log.debug("All builds are not complete")
662 return False
663
James E. Blairee743612012-05-29 14:49:32 -0700664 def run(self):
James E. Blair71e94122012-12-24 17:53:08 -0800665 if statsd:
666 self.log.debug("Statsd enabled")
667 else:
668 self.log.debug("Statsd disabled because python statsd "
669 "package not found")
James E. Blairee743612012-05-29 14:49:32 -0700670 while True:
671 self.log.debug("Run handler sleeping")
672 self.wake_event.wait()
673 self.wake_event.clear()
James E. Blairb0fcae42012-07-17 11:12:10 -0700674 if self._stopped:
675 return
James E. Blairee743612012-05-29 14:49:32 -0700676 self.log.debug("Run handler awake")
James E. Blaira84f0e42014-02-06 07:09:22 -0800677 self.run_handler_lock.acquire()
James E. Blairee743612012-05-29 14:49:32 -0700678 try:
James E. Blaira84f0e42014-02-06 07:09:22 -0800679 while not self.management_event_queue.empty():
James E. Blair468c8512013-12-06 13:27:19 -0800680 self.process_management_queue()
James E. Blaircdccd972013-07-01 12:10:22 -0700681
James E. Blair263fba92013-02-27 13:07:19 -0800682 # Give result events priority -- they let us stop builds,
683 # whereas trigger evensts cause us to launch builds.
James E. Blaira84f0e42014-02-06 07:09:22 -0800684 while not self.result_event_queue.empty():
James E. Blairee743612012-05-29 14:49:32 -0700685 self.process_result_queue()
James E. Blaira84f0e42014-02-06 07:09:22 -0800686
687 if not self._pause:
688 while not self.trigger_event_queue.empty():
James E. Blair263fba92013-02-27 13:07:19 -0800689 self.process_event_queue()
James E. Blaire9d45c32012-05-31 09:56:45 -0700690
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700691 if self._pause and self._areAllBuildsComplete():
692 self._doPauseEvent()
James E. Blaire9d45c32012-05-31 09:56:45 -0700693
James E. Blaira84f0e42014-02-06 07:09:22 -0800694 for pipeline in self.layout.pipelines.values():
695 while pipeline.manager.processQueue():
696 pass
James E. Blair0e933c52013-07-11 10:18:52 -0700697
698 if self._maintain_trigger_cache:
699 self.maintainTriggerCache()
700 self._maintain_trigger_cache = False
701
James E. Blaira84f0e42014-02-06 07:09:22 -0800702 except Exception:
James E. Blairee743612012-05-29 14:49:32 -0700703 self.log.exception("Exception in run handler:")
James E. Blaira84f0e42014-02-06 07:09:22 -0800704 # There may still be more events to process
705 self.wake_event.set()
706 finally:
707 self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700708
James E. Blair0e933c52013-07-11 10:18:52 -0700709 def maintainTriggerCache(self):
710 relevant = set()
711 for pipeline in self.layout.pipelines.values():
James E. Blairfadc6e12013-08-21 18:23:15 -0700712 self.log.debug("Start maintain trigger cache for: %s" % pipeline)
James E. Blair0e933c52013-07-11 10:18:52 -0700713 for item in pipeline.getAllItems():
714 relevant.add(item.change)
715 relevant.update(item.change.getRelatedChanges())
James E. Blairfadc6e12013-08-21 18:23:15 -0700716 self.log.debug("End maintain trigger cache for: %s" % pipeline)
James E. Blair0e933c52013-07-11 10:18:52 -0700717 self.log.debug("Trigger cache size: %s" % len(relevant))
James E. Blair6c358e72013-07-29 17:06:47 -0700718 for trigger in self.triggers.values():
719 trigger.maintainCache(relevant)
James E. Blair0e933c52013-07-11 10:18:52 -0700720
James E. Blairee743612012-05-29 14:49:32 -0700721 def process_event_queue(self):
722 self.log.debug("Fetching trigger event")
723 event = self.trigger_event_queue.get()
724 self.log.debug("Processing trigger event %s" % event)
James E. Blaira84f0e42014-02-06 07:09:22 -0800725 try:
726 project = self.layout.projects.get(event.project_name)
727 if not project:
728 self.log.warning("Project %s not found" % event.project_name)
729 return
730
731 # Preprocessing for ref-update events
732 if event.ref:
733 # Make sure the local git repo is up-to-date with the
734 # remote one. We better have the new ref before
735 # enqueuing the changes. This is done before
736 # enqueuing the changes to avoid calling an update per
737 # pipeline accepting the change.
738 self.log.info("Fetching references for %s" % project)
739 url = self.triggers['gerrit'].getGitUrl(project)
740 self.merger.updateRepo(project.name, url)
741
742 for pipeline in self.layout.pipelines.values():
743 change = event.getChange(project,
744 self.triggers.get(event.trigger_name))
745 if event.type == 'patchset-created':
746 pipeline.manager.removeOldVersionsOfChange(change)
747 if pipeline.manager.eventMatches(event, change):
748 self.log.info("Adding %s, %s to %s" %
749 (project, change, pipeline))
750 pipeline.manager.addChange(change)
751 finally:
James E. Blairff791972013-01-09 11:45:43 -0800752 self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700753
James E. Blair468c8512013-12-06 13:27:19 -0800754 def process_management_queue(self):
755 self.log.debug("Fetching management event")
756 event = self.management_event_queue.get()
757 self.log.debug("Processing management event %s" % event)
James E. Blair36658cf2013-12-06 17:53:48 -0800758 try:
759 if isinstance(event, ReconfigureEvent):
760 self._doReconfigureEvent(event)
761 elif isinstance(event, PromoteEvent):
762 self._doPromoteEvent(event)
763 else:
764 self.log.error("Unable to handle event %s" % event)
765 event.done()
766 except Exception as e:
767 event.exception(e, sys.exc_info()[2])
James E. Blair468c8512013-12-06 13:27:19 -0800768 self.management_event_queue.task_done()
769
James E. Blairee743612012-05-29 14:49:32 -0700770 def process_result_queue(self):
771 self.log.debug("Fetching result event")
James E. Blaira84f0e42014-02-06 07:09:22 -0800772 event = self.result_event_queue.get()
773 self.log.debug("Processing result event %s" % event)
774 try:
775 if isinstance(event, BuildStartedEvent):
776 self._doBuildStartedEvent(event)
777 elif isinstance(event, BuildCompletedEvent):
778 self._doBuildCompletedEvent(event)
779 else:
780 self.log.error("Unable to handle event %s" % event)
781 finally:
782 self.result_event_queue.task_done()
783
784 def _doBuildStartedEvent(self, event):
James E. Blaireff88162013-07-01 12:44:14 -0400785 for pipeline in self.layout.pipelines.values():
James E. Blaira84f0e42014-02-06 07:09:22 -0800786 if pipeline.manager.onBuildStarted(event.build):
787 return
788 self.log.warning("Build %s not found by any queue manager" %
789 (event.build))
790
791 def _doBuildCompletedEvent(self, event):
792 for pipeline in self.layout.pipelines.values():
793 if pipeline.manager.onBuildCompleted(event.build):
794 return
795 self.log.warning("Build %s not found by any queue manager" %
796 (event.build))
James E. Blairee743612012-05-29 14:49:32 -0700797
James E. Blair268d9342012-06-13 18:24:29 -0700798 def formatStatusHTML(self):
799 ret = '<html><pre>'
James E. Blaire0487072012-08-29 17:38:31 -0700800 if self._pause:
801 ret += '<p><b>Queue only mode:</b> preparing to '
James E. Blaire0487072012-08-29 17:38:31 -0700802 if self._exit:
803 ret += 'exit'
804 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
805 ret += '</p>'
806
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400807 if self.last_reconfigured:
808 ret += '<p>Last reconfigured: %s</p>' % self.last_reconfigured
809
James E. Blaireff88162013-07-01 12:44:14 -0400810 keys = self.layout.pipelines.keys()
James E. Blair268d9342012-06-13 18:24:29 -0700811 for key in keys:
James E. Blaireff88162013-07-01 12:44:14 -0400812 pipeline = self.layout.pipelines[key]
James E. Blair4aea70c2012-07-26 14:23:24 -0700813 s = 'Pipeline: %s' % pipeline.name
James E. Blair268d9342012-06-13 18:24:29 -0700814 ret += s + '\n'
815 ret += '-' * len(s) + '\n'
James E. Blaire0487072012-08-29 17:38:31 -0700816 ret += pipeline.formatStatusHTML()
James E. Blair268d9342012-06-13 18:24:29 -0700817 ret += '\n'
818 ret += '</pre></html>'
819 return ret
820
James E. Blair8dbd56a2012-12-22 10:55:10 -0800821 def formatStatusJSON(self):
822 data = {}
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400823
824 data['zuul_version'] = self.zuul_version
825
James E. Blair8dbd56a2012-12-22 10:55:10 -0800826 if self._pause:
827 ret = '<p><b>Queue only mode:</b> preparing to '
James E. Blair8dbd56a2012-12-22 10:55:10 -0800828 if self._exit:
829 ret += 'exit'
830 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
831 ret += '</p>'
832 data['message'] = ret
833
James E. Blairfb682cc2013-02-26 15:23:27 -0800834 data['trigger_event_queue'] = {}
835 data['trigger_event_queue']['length'] = \
836 self.trigger_event_queue.qsize()
837 data['result_event_queue'] = {}
838 data['result_event_queue']['length'] = \
839 self.result_event_queue.qsize()
840
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400841 if self.last_reconfigured:
842 data['last_reconfigured'] = self.last_reconfigured * 1000
843
James E. Blair8dbd56a2012-12-22 10:55:10 -0800844 pipelines = []
845 data['pipelines'] = pipelines
James E. Blaireff88162013-07-01 12:44:14 -0400846 keys = self.layout.pipelines.keys()
James E. Blair8dbd56a2012-12-22 10:55:10 -0800847 for key in keys:
James E. Blaireff88162013-07-01 12:44:14 -0400848 pipeline = self.layout.pipelines[key]
James E. Blair8dbd56a2012-12-22 10:55:10 -0800849 pipelines.append(pipeline.formatStatusJSON())
850 return json.dumps(data)
851
James E. Blair1e8dd892012-05-30 09:15:05 -0700852
James E. Blair4aea70c2012-07-26 14:23:24 -0700853class BasePipelineManager(object):
854 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700855
James E. Blair4aea70c2012-07-26 14:23:24 -0700856 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -0700857 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -0700858 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -0700859 self.building_jobs = {}
860 self.event_filters = []
James E. Blair3c5e5b52013-04-26 11:17:03 -0700861 if self.sched.config and self.sched.config.has_option(
862 'zuul', 'report_times'):
James E. Blair0ac6c012013-04-26 09:04:23 -0700863 self.report_times = self.sched.config.getboolean(
864 'zuul', 'report_times')
865 else:
866 self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -0700867
868 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -0700869 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700870
James E. Blaireff88162013-07-01 12:44:14 -0400871 def _postConfig(self, layout):
James E. Blair4aea70c2012-07-26 14:23:24 -0700872 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700873 self.log.info(" Events:")
874 for e in self.event_filters:
875 self.log.info(" %s" % e)
876 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -0700877
James E. Blairee743612012-05-29 14:49:32 -0700878 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -0700879 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -0700880 if tree.job:
881 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -0700882 for b in tree.job._branches:
883 efilters += str(b)
James E. Blair70c71582013-03-06 08:50:50 -0800884 for f in tree.job._files:
885 efilters += str(f)
James E. Blairee743612012-05-29 14:49:32 -0700886 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -0700887 efilters = ' ' + efilters
James E. Blair222d4982012-07-16 09:31:19 -0700888 hold = ''
889 if tree.job.hold_following_changes:
890 hold = ' [hold]'
James E. Blair4ec821f2012-08-23 15:28:28 -0700891 voting = ''
892 if not tree.job.voting:
893 voting = ' [nonvoting]'
894 self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
895 efilters, hold, voting))
James E. Blairee743612012-05-29 14:49:32 -0700896 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -0700897 log_jobs(x, indent + 2)
898
James E. Blaireff88162013-07-01 12:44:14 -0400899 for p in layout.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700900 tree = self.pipeline.getJobTree(p)
901 if tree:
James E. Blairee743612012-05-29 14:49:32 -0700902 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -0700903 log_jobs(tree)
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000904 self.log.info(" On start:")
905 self.log.info(" %s" % self.pipeline.start_actions)
906 self.log.info(" On success:")
907 self.log.info(" %s" % self.pipeline.success_actions)
908 self.log.info(" On failure:")
909 self.log.info(" %s" % self.pipeline.failure_actions)
James E. Blairee743612012-05-29 14:49:32 -0700910
James E. Blaire421a232012-07-25 16:59:21 -0700911 def getSubmitAllowNeeds(self):
912 # Get a list of code review labels that are allowed to be
913 # "needed" in the submit records for a change, with respect
914 # to this queue. In other words, the list of review labels
915 # this queue itself is likely to set before submitting.
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000916 allow_needs = set()
917 for action_reporter in self.pipeline.success_actions:
918 allow_needs.update(action_reporter.getSubmitAllowNeeds())
919 return allow_needs
James E. Blaire421a232012-07-25 16:59:21 -0700920
James E. Blairc053d022014-01-22 14:57:33 -0800921 def eventMatches(self, event, change):
James E. Blairad28e912013-11-27 10:43:22 -0800922 if event.forced_pipeline:
923 if event.forced_pipeline == self.pipeline.name:
924 return True
925 else:
926 return False
James E. Blairee743612012-05-29 14:49:32 -0700927 for ef in self.event_filters:
James E. Blairc053d022014-01-22 14:57:33 -0800928 if ef.matches(event, change):
James E. Blairee743612012-05-29 14:49:32 -0700929 return True
930 return False
931
James E. Blair0dc8ba92012-07-16 14:23:52 -0700932 def isChangeAlreadyInQueue(self, change):
James E. Blaire0487072012-08-29 17:38:31 -0700933 for c in self.pipeline.getChangesInQueue():
James E. Blair0dc8ba92012-07-16 14:23:52 -0700934 if change.equals(c):
935 return True
936 return False
937
James E. Blaire0487072012-08-29 17:38:31 -0700938 def reportStart(self, change):
939 try:
940 self.log.info("Reporting start, action %s change %s" %
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000941 (self.pipeline.start_actions, change))
James E. Blaire0487072012-08-29 17:38:31 -0700942 msg = "Starting %s jobs." % self.pipeline.name
Clark Boylan9b670902012-09-28 13:47:56 -0700943 if self.sched.config.has_option('zuul', 'status_url'):
944 msg += "\n" + self.sched.config.get('zuul', 'status_url')
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000945 ret = self.sendReport(self.pipeline.start_actions,
946 change, msg)
James E. Blaire0487072012-08-29 17:38:31 -0700947 if ret:
948 self.log.error("Reporting change start %s received: %s" %
949 (change, ret))
950 except:
951 self.log.exception("Exception while reporting start:")
952
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000953 def sendReport(self, action_reporters, change, message):
954 """Sends the built message off to configured reporters.
955
956 Takes the action_reporters, change, message and extra options and
957 sends them to the pluggable reporters.
958 """
959 report_errors = []
960 if len(action_reporters) > 0:
Joshua Hesketh1879cf72013-08-19 14:13:15 +1000961 for action_reporter in action_reporters:
962 ret = action_reporter.report(change, message)
963 if ret:
964 report_errors.append(ret)
965 if len(report_errors) == 0:
966 return
967 return report_errors
968
James E. Blaire0487072012-08-29 17:38:31 -0700969 def isChangeReadyToBeEnqueued(self, change):
970 return True
971
James E. Blair36658cf2013-12-06 17:53:48 -0800972 def enqueueChangesAhead(self, change, quiet):
James E. Blaire0487072012-08-29 17:38:31 -0700973 return True
974
James E. Blair36658cf2013-12-06 17:53:48 -0800975 def enqueueChangesBehind(self, change, quiet):
James E. Blaire0487072012-08-29 17:38:31 -0700976 return True
977
James E. Blairfee8d652013-06-07 08:57:52 -0700978 def checkForChangesNeededBy(self, change):
979 return True
980
James E. Blair972e3c72013-08-29 12:04:55 -0700981 def getFailingDependentItem(self, item):
982 return None
983
James E. Blairfee8d652013-06-07 08:57:52 -0700984 def getDependentItems(self, item):
985 orig_item = item
986 items = []
987 while item.item_ahead:
988 items.append(item.item_ahead)
989 item = item.item_ahead
990 self.log.info("Change %s depends on changes %s" %
991 (orig_item.change,
992 [x.change for x in items]))
993 return items
994
James E. Blair972e3c72013-08-29 12:04:55 -0700995 def getItemForChange(self, change):
996 for item in self.pipeline.getAllItems():
997 if item.change.equals(change):
998 return item
999 return None
1000
James E. Blair2fa50962013-01-30 21:50:41 -08001001 def findOldVersionOfChangeAlreadyInQueue(self, change):
1002 for c in self.pipeline.getChangesInQueue():
1003 if change.isUpdateOf(c):
1004 return c
1005 return None
1006
1007 def removeOldVersionsOfChange(self, change):
1008 if not self.pipeline.dequeue_on_new_patchset:
1009 return
1010 old_change = self.findOldVersionOfChangeAlreadyInQueue(change)
1011 if old_change:
1012 self.log.debug("Change %s is a new version of %s, removing %s" %
1013 (change, old_change, old_change))
1014 self.removeChange(old_change)
James E. Blair2fa50962013-01-30 21:50:41 -08001015
James E. Blair972e3c72013-08-29 12:04:55 -07001016 def reEnqueueItem(self, item):
James E. Blaircdccd972013-07-01 12:10:22 -07001017 change_queue = self.pipeline.getQueue(item.change.project)
1018 if change_queue:
1019 self.log.debug("Re-enqueing change %s in queue %s" %
1020 (item.change, change_queue))
James E. Blair972e3c72013-08-29 12:04:55 -07001021 change_queue.enqueueItem(item)
James E. Blaircdccd972013-07-01 12:10:22 -07001022 self.reportStats(item)
1023 return True
1024 else:
1025 self.log.error("Unable to find change queue for project %s" %
1026 item.change.project)
1027 return False
1028
Sean Daguef39b9ca2014-01-10 21:34:35 -05001029 def addChange(self, change, quiet=False, enqueue_time=None):
James E. Blaire0487072012-08-29 17:38:31 -07001030 self.log.debug("Considering adding change %s" % change)
James E. Blair0dc8ba92012-07-16 14:23:52 -07001031 if self.isChangeAlreadyInQueue(change):
1032 self.log.debug("Change %s is already in queue, ignoring" % change)
James E. Blaire0487072012-08-29 17:38:31 -07001033 return True
James E. Blair692c6b32012-07-17 11:16:35 -07001034
James E. Blaire0487072012-08-29 17:38:31 -07001035 if not self.isChangeReadyToBeEnqueued(change):
1036 self.log.debug("Change %s is not ready to be enqueued, ignoring" %
1037 change)
1038 return False
1039
James E. Blair36658cf2013-12-06 17:53:48 -08001040 if not self.enqueueChangesAhead(change, quiet):
James E. Blair1490eba2013-03-06 19:14:00 -08001041 self.log.debug("Failed to enqueue changes ahead of %s" % change)
James E. Blaire0487072012-08-29 17:38:31 -07001042 return False
1043
1044 if self.isChangeAlreadyInQueue(change):
1045 self.log.debug("Change %s is already in queue, ignoring" % change)
1046 return True
1047
1048 change_queue = self.pipeline.getQueue(change.project)
1049 if change_queue:
1050 self.log.debug("Adding change %s to queue %s" %
1051 (change, change_queue))
James E. Blair36658cf2013-12-06 17:53:48 -08001052 if not quiet:
1053 if len(self.pipeline.start_actions) > 0:
1054 self.reportStart(change)
James E. Blairfee8d652013-06-07 08:57:52 -07001055 item = change_queue.enqueueChange(change)
Sean Daguef39b9ca2014-01-10 21:34:35 -05001056 if enqueue_time:
1057 item.enqueue_time = enqueue_time
James E. Blairfee8d652013-06-07 08:57:52 -07001058 self.reportStats(item)
James E. Blair36658cf2013-12-06 17:53:48 -08001059 self.enqueueChangesBehind(change, quiet)
James E. Blaire0487072012-08-29 17:38:31 -07001060 else:
1061 self.log.error("Unable to find change queue for project %s" %
1062 change.project)
1063 return False
James E. Blairee743612012-05-29 14:49:32 -07001064
James E. Blair972e3c72013-08-29 12:04:55 -07001065 def dequeueItem(self, item):
James E. Blairfee8d652013-06-07 08:57:52 -07001066 self.log.debug("Removing change %s from queue" % item.change)
James E. Blairfee8d652013-06-07 08:57:52 -07001067 change_queue = self.pipeline.getQueue(item.change.project)
1068 change_queue.dequeueItem(item)
James E. Blair0e933c52013-07-11 10:18:52 -07001069 self.sched._maintain_trigger_cache = True
James E. Blair2fa50962013-01-30 21:50:41 -08001070
1071 def removeChange(self, change):
1072 # Remove a change from the queue, probably because it has been
1073 # superceded by another change.
James E. Blairfee8d652013-06-07 08:57:52 -07001074 for item in self.pipeline.getAllItems():
1075 if item.change == change:
1076 self.log.debug("Canceling builds behind change: %s "
1077 "because it is being removed." % item.change)
1078 self.cancelJobs(item)
James E. Blair972e3c72013-08-29 12:04:55 -07001079 self.dequeueItem(item)
James E. Blair94235562013-08-26 18:12:31 -07001080 self.reportStats(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001081
James E. Blairac2c3242014-01-24 13:38:51 -08001082 def _makeMergerItem(self, item):
1083 # Create a dictionary with all info about the item needed by
1084 # the merger.
1085 return dict(project=item.change.project.name,
1086 url=self.sched.triggers['gerrit'].getGitUrl(
1087 item.change.project),
1088 merge_mode=item.change.project.merge_mode,
1089 refspec=item.change.refspec,
1090 branch=item.change.branch,
1091 ref=item.current_build_set.ref,
1092 )
1093
James E. Blairfee8d652013-06-07 08:57:52 -07001094 def prepareRef(self, item):
1095 # Returns False on success.
1096 # Returns True if we were unable to prepare the ref.
1097 ref = item.current_build_set.ref
1098 if hasattr(item.change, 'refspec') and not ref:
1099 self.log.debug("Preparing ref for: %s" % item.change)
1100 item.current_build_set.setConfiguration()
1101 ref = item.current_build_set.ref
1102 dependent_items = self.getDependentItems(item)
1103 dependent_items.reverse()
1104 all_items = dependent_items + [item]
James E. Blairac2c3242014-01-24 13:38:51 -08001105 merger_items = map(self._makeMergerItem, all_items)
1106 commit = self.sched.merger.mergeChanges(merger_items)
James E. Blairfee8d652013-06-07 08:57:52 -07001107 item.current_build_set.commit = commit
James E. Blair81515ad2012-10-01 18:29:08 -07001108 if not commit:
James E. Blairfee8d652013-06-07 08:57:52 -07001109 self.log.info("Unable to merge change %s" % item.change)
James E. Blair972e3c72013-08-29 12:04:55 -07001110 msg = ("This change was unable to be automatically merged "
1111 "with the current state of the repository. Please "
1112 "rebase your change and upload a new patchset.")
James E. Blair6736beb2013-07-11 15:18:15 -07001113 self.pipeline.setUnableToMerge(item, msg)
James E. Blairfee8d652013-06-07 08:57:52 -07001114 return True
1115 return False
1116
1117 def _launchJobs(self, item, jobs):
1118 self.log.debug("Launching jobs for change %s" % item.change)
1119 dependent_items = self.getDependentItems(item)
1120 for job in jobs:
1121 self.log.debug("Found job %s for change %s" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001122 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001123 build = self.sched.launcher.launch(job, item,
1124 self.pipeline,
1125 dependent_items)
1126 self.building_jobs[build] = item
1127 self.log.debug("Adding build %s of job %s to item %s" %
1128 (build, job, item))
1129 item.addBuild(build)
James E. Blairee743612012-05-29 14:49:32 -07001130 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001131 self.log.exception("Exception while launching job %s "
James E. Blairfee8d652013-06-07 08:57:52 -07001132 "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001133
James E. Blairfee8d652013-06-07 08:57:52 -07001134 def launchJobs(self, item):
1135 jobs = self.pipeline.findJobsToRun(item)
James E. Blairdaabed22012-08-15 15:38:57 -07001136 if jobs:
James E. Blairfee8d652013-06-07 08:57:52 -07001137 self._launchJobs(item, jobs)
1138
1139 def cancelJobs(self, item, prime=True):
1140 self.log.debug("Cancel jobs for change %s" % item.change)
1141 canceled = False
1142 to_remove = []
James E. Blair36658cf2013-12-06 17:53:48 -08001143 if prime and item.current_build_set.ref:
James E. Blairfee8d652013-06-07 08:57:52 -07001144 item.resetAllBuilds()
1145 for build, build_item in self.building_jobs.items():
1146 if build_item == item:
1147 self.log.debug("Found build %s for change %s to cancel" %
1148 (build, item.change))
1149 try:
1150 self.sched.launcher.cancel(build)
1151 except:
1152 self.log.exception("Exception while canceling build %s "
1153 "for change %s" % (build, item.change))
1154 to_remove.append(build)
1155 canceled = True
1156 for build in to_remove:
1157 self.log.debug("Removing build %s from running builds" % build)
1158 build.result = 'CANCELED'
1159 del self.building_jobs[build]
James E. Blair972e3c72013-08-29 12:04:55 -07001160 for item_behind in item.items_behind:
James E. Blairfee8d652013-06-07 08:57:52 -07001161 self.log.debug("Canceling jobs for change %s, behind change %s" %
James E. Blair972e3c72013-08-29 12:04:55 -07001162 (item_behind.change, item.change))
1163 if self.cancelJobs(item_behind, prime=prime):
James E. Blairfee8d652013-06-07 08:57:52 -07001164 canceled = True
1165 return canceled
1166
James E. Blair972e3c72013-08-29 12:04:55 -07001167 def _processOneItem(self, item, nnfi):
James E. Blairfee8d652013-06-07 08:57:52 -07001168 changed = False
1169 item_ahead = item.item_ahead
James E. Blair972e3c72013-08-29 12:04:55 -07001170 change_queue = self.pipeline.getQueue(item.change.project)
1171 failing_reasons = [] # Reasons this item is failing
1172
James E. Blairfee8d652013-06-07 08:57:52 -07001173 if self.checkForChangesNeededBy(item.change) is not True:
1174 # It's not okay to enqueue this change, we should remove it.
1175 self.log.info("Dequeuing change %s because "
1176 "it can no longer merge" % item.change)
1177 self.cancelJobs(item)
James E. Blair972e3c72013-08-29 12:04:55 -07001178 self.dequeueItem(item)
James E. Blairfee8d652013-06-07 08:57:52 -07001179 self.pipeline.setDequeuedNeedingChange(item)
1180 try:
1181 self.reportItem(item)
1182 except MergeFailure:
1183 pass
James E. Blair972e3c72013-08-29 12:04:55 -07001184 return (True, nnfi)
1185 dep_item = self.getFailingDependentItem(item)
Clark Boylanaf2476f2014-01-23 14:47:36 -08001186 actionable = change_queue.isActionable(item)
1187 item.active = actionable
James E. Blair972e3c72013-08-29 12:04:55 -07001188 if dep_item:
1189 failing_reasons.append('a needed change is failing')
1190 self.cancelJobs(item, prime=False)
James E. Blairfee8d652013-06-07 08:57:52 -07001191 else:
James E. Blairfef71632013-09-23 11:15:47 -07001192 item_ahead_merged = False
1193 if ((item_ahead and item_ahead.change.is_merged) or
1194 not change_queue.dependent):
1195 item_ahead_merged = True
1196 if (item_ahead != nnfi and not item_ahead_merged):
James E. Blair972e3c72013-08-29 12:04:55 -07001197 # Our current base is different than what we expected,
1198 # and it's not because our current base merged. Something
1199 # ahead must have failed.
1200 self.log.info("Resetting builds for change %s because the "
1201 "item ahead, %s, is not the nearest non-failing "
1202 "item, %s" % (item.change, item_ahead, nnfi))
1203 change_queue.moveItem(item, nnfi)
1204 changed = True
1205 self.cancelJobs(item)
Clark Boylanaf2476f2014-01-23 14:47:36 -08001206 if actionable:
Clark Boylan3d2f7a72014-01-23 11:07:42 -08001207 self.prepareRef(item)
1208 if item.current_build_set.unable_to_merge:
1209 failing_reasons.append("it has a merge conflict")
Clark Boylanaf2476f2014-01-23 14:47:36 -08001210 if actionable and self.launchJobs(item):
James E. Blairfee8d652013-06-07 08:57:52 -07001211 changed = True
James E. Blair972e3c72013-08-29 12:04:55 -07001212 if self.pipeline.didAnyJobFail(item):
1213 failing_reasons.append("at least one job failed")
1214 if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
1215 try:
1216 self.reportItem(item)
1217 except MergeFailure:
James E. Blair062c4fb2013-09-26 07:46:00 -07001218 failing_reasons.append("it did not merge")
James E. Blair972e3c72013-08-29 12:04:55 -07001219 for item_behind in item.items_behind:
1220 self.log.info("Resetting builds for change %s because the "
1221 "item ahead, %s, failed to merge" %
1222 (item_behind.change, item))
1223 self.cancelJobs(item_behind)
1224 self.dequeueItem(item)
1225 changed = True
1226 elif not failing_reasons:
1227 nnfi = item
1228 item.current_build_set.failing_reasons = failing_reasons
1229 if failing_reasons:
1230 self.log.debug("%s is a failing item because %s" %
1231 (item, failing_reasons))
1232 return (changed, nnfi)
James E. Blairfee8d652013-06-07 08:57:52 -07001233
1234 def processQueue(self):
1235 # Do whatever needs to be done for each change in the queue
1236 self.log.debug("Starting queue processor: %s" % self.pipeline.name)
1237 changed = False
James E. Blair972e3c72013-08-29 12:04:55 -07001238 for queue in self.pipeline.queues:
1239 queue_changed = False
1240 nnfi = None # Nearest non-failing item
Clark Boylan3d2f7a72014-01-23 11:07:42 -08001241 for item in queue.queue[:]:
James E. Blair972e3c72013-08-29 12:04:55 -07001242 item_changed, nnfi = self._processOneItem(item, nnfi)
1243 if item_changed:
1244 queue_changed = True
1245 self.reportStats(item)
1246 if queue_changed:
James E. Blairfee8d652013-06-07 08:57:52 -07001247 changed = True
James E. Blair972e3c72013-08-29 12:04:55 -07001248 status = ''
1249 for item in queue.queue:
1250 status += self.pipeline.formatStatus(item)
1251 if status:
1252 self.log.debug("Queue %s status is now:\n %s" %
1253 (queue.name, status))
James E. Blairfadc6e12013-08-21 18:23:15 -07001254 self.log.debug("Finished queue processor: %s (changed: %s)" %
1255 (self.pipeline.name, changed))
James E. Blairfee8d652013-06-07 08:57:52 -07001256 return changed
James E. Blairdaabed22012-08-15 15:38:57 -07001257
James E. Blair11700c32012-07-05 17:50:05 -07001258 def updateBuildDescriptions(self, build_set):
1259 for build in build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -07001260 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -07001261 self.sched.launcher.setBuildDescription(build, desc)
1262
1263 if build_set.previous_build_set:
1264 for build in build_set.previous_build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -07001265 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -07001266 self.sched.launcher.setBuildDescription(build, desc)
1267
1268 def onBuildStarted(self, build):
James E. Blair11700c32012-07-05 17:50:05 -07001269 if build not in self.building_jobs:
James E. Blair11700c32012-07-05 17:50:05 -07001270 # Or triggered externally, or triggered before zuul started,
1271 # or restarted
1272 return False
1273
James E. Blairfee8d652013-06-07 08:57:52 -07001274 self.log.debug("Build %s started" % build)
James E. Blair11700c32012-07-05 17:50:05 -07001275 self.updateBuildDescriptions(build.build_set)
1276 return True
1277
James E. Blairee743612012-05-29 14:49:32 -07001278 def onBuildCompleted(self, build):
James E. Blair1e8dd892012-05-30 09:15:05 -07001279 if build not in self.building_jobs:
James E. Blairee743612012-05-29 14:49:32 -07001280 # Or triggered externally, or triggered before zuul started,
1281 # or restarted
1282 return False
James E. Blairfee8d652013-06-07 08:57:52 -07001283
1284 self.log.debug("Build %s completed" % build)
James E. Blairee743612012-05-29 14:49:32 -07001285 change = self.building_jobs[build]
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001286 self.log.debug("Found change %s which triggered completed build %s" %
1287 (change, build))
James E. Blairee743612012-05-29 14:49:32 -07001288
1289 del self.building_jobs[build]
1290
James E. Blair4aea70c2012-07-26 14:23:24 -07001291 self.pipeline.setResult(change, build)
James E. Blair972e3c72013-08-29 12:04:55 -07001292 self.log.debug("Change %s status is now:\n %s" %
1293 (change, self.pipeline.formatStatus(change)))
James E. Blair11700c32012-07-05 17:50:05 -07001294 self.updateBuildDescriptions(build.build_set)
James E. Blairee743612012-05-29 14:49:32 -07001295 return True
1296
James E. Blairfee8d652013-06-07 08:57:52 -07001297 def reportItem(self, item):
James E. Blaire5910202013-12-27 09:50:31 -08001298 if item.reported:
James E. Blairfee8d652013-06-07 08:57:52 -07001299 raise Exception("Already reported change %s" % item.change)
1300 ret = self._reportItem(item)
1301 if self.changes_merge:
1302 succeeded = self.pipeline.didAllJobsSucceed(item)
1303 merged = (not ret)
1304 if merged:
James E. Blair6c358e72013-07-29 17:06:47 -07001305 merged = self.pipeline.trigger.isMerged(item.change,
1306 item.change.branch)
James E. Blairfee8d652013-06-07 08:57:52 -07001307 self.log.info("Reported change %s status: all-succeeded: %s, "
1308 "merged: %s" % (item.change, succeeded, merged))
James E. Blair4a035d92014-01-23 13:10:48 -08001309 change_queue = self.pipeline.getQueue(item.change.project)
James E. Blairfee8d652013-06-07 08:57:52 -07001310 if not (succeeded and merged):
1311 self.log.debug("Reported change %s failed tests or failed "
1312 "to merge" % (item.change))
James E. Blair4a035d92014-01-23 13:10:48 -08001313 change_queue.decreaseWindowSize()
Clark Boylan7603a372014-01-21 11:43:20 -08001314 self.log.debug("%s window size decreased to %s" %
James E. Blair4a035d92014-01-23 13:10:48 -08001315 (change_queue, change_queue.window))
James E. Blairfee8d652013-06-07 08:57:52 -07001316 raise MergeFailure("Change %s failed to merge" % item.change)
Clark Boylan7603a372014-01-21 11:43:20 -08001317 else:
James E. Blair4a035d92014-01-23 13:10:48 -08001318 change_queue.increaseWindowSize()
Clark Boylan7603a372014-01-21 11:43:20 -08001319 self.log.debug("%s window size increased to %s" %
James E. Blair4a035d92014-01-23 13:10:48 -08001320 (change_queue, change_queue.window))
James E. Blaire0487072012-08-29 17:38:31 -07001321
James E. Blairfee8d652013-06-07 08:57:52 -07001322 def _reportItem(self, item):
James E. Blaire5910202013-12-27 09:50:31 -08001323 if item.reported:
James E. Blairb0fcae42012-07-17 11:12:10 -07001324 return 0
James E. Blairfee8d652013-06-07 08:57:52 -07001325 self.log.debug("Reporting change %s" % item.change)
James E. Blairb98fcdb2013-08-26 18:23:09 -07001326 ret = True # Means error as returned by trigger.report
James E. Blairfee8d652013-06-07 08:57:52 -07001327 if self.pipeline.didAllJobsSucceed(item):
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001328 self.log.debug("success %s %s" % (self.pipeline.success_actions,
1329 self.pipeline.failure_actions))
1330 actions = self.pipeline.success_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001331 item.setReportedResult('SUCCESS')
James E. Blairee743612012-05-29 14:49:32 -07001332 else:
Joshua Hesketh1879cf72013-08-19 14:13:15 +10001333 actions = self.pipeline.failure_actions
James E. Blairfee8d652013-06-07 08:57:52 -07001334 item.setReportedResult('FAILURE')
James E. Blairfee8d652013-06-07 08:57:52 -07001335 item.reported = True
James E. Blaire5910202013-12-27 09:50:31 -08001336 if actions:
1337 report = self.formatReport(item)
1338 try:
1339 self.log.info("Reporting change %s, actions: %s" %
1340 (item.change, actions))
1341 ret = self.sendReport(actions, item.change, report)
1342 if ret:
1343 self.log.error("Reporting change %s received: %s" %
1344 (item.change, ret))
1345 except:
1346 self.log.exception("Exception while reporting:")
1347 item.setReportedResult('ERROR')
James E. Blairfee8d652013-06-07 08:57:52 -07001348 self.updateBuildDescriptions(item.current_build_set)
James E. Blairee743612012-05-29 14:49:32 -07001349 return ret
1350
James E. Blairfee8d652013-06-07 08:57:52 -07001351 def formatReport(self, item):
James E. Blair8b0d4c42012-08-23 16:03:05 -07001352 ret = ''
James E. Blairfee8d652013-06-07 08:57:52 -07001353 if self.pipeline.didAllJobsSucceed(item):
James E. Blair56370192013-01-14 15:47:28 -08001354 ret += self.pipeline.success_message + '\n\n'
James E. Blair8b0d4c42012-08-23 16:03:05 -07001355 else:
James E. Blair56370192013-01-14 15:47:28 -08001356 ret += self.pipeline.failure_message + '\n\n'
James E. Blair8b0d4c42012-08-23 16:03:05 -07001357
James E. Blairfee8d652013-06-07 08:57:52 -07001358 if item.dequeued_needing_change:
James E. Blair8b0d4c42012-08-23 16:03:05 -07001359 ret += "This change depends on a change that failed to merge."
James E. Blair6736beb2013-07-11 15:18:15 -07001360 elif item.current_build_set.unable_to_merge_message:
1361 ret += item.current_build_set.unable_to_merge_message
James E. Blair8b0d4c42012-08-23 16:03:05 -07001362 else:
James E. Blaira35fcce2012-08-24 10:46:01 -07001363 if self.sched.config.has_option('zuul', 'url_pattern'):
James E. Blair6aea36d2012-12-17 13:03:24 -08001364 url_pattern = self.sched.config.get('zuul', 'url_pattern')
James E. Blaira35fcce2012-08-24 10:46:01 -07001365 else:
James E. Blair6aea36d2012-12-17 13:03:24 -08001366 url_pattern = None
James E. Blairfee8d652013-06-07 08:57:52 -07001367 for job in self.pipeline.getJobs(item.change):
1368 build = item.current_build_set.getBuild(job.name)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001369 result = build.result
James E. Blair6aea36d2012-12-17 13:03:24 -08001370 pattern = url_pattern
1371 if result == 'SUCCESS':
1372 if job.success_message:
1373 result = job.success_message
1374 if job.success_pattern:
1375 pattern = job.success_pattern
1376 elif result == 'FAILURE':
1377 if job.failure_message:
1378 result = job.failure_message
1379 if job.failure_pattern:
1380 pattern = job.failure_pattern
Ori Livneh7191ee82013-05-02 19:13:53 -07001381 if pattern:
James E. Blairfee8d652013-06-07 08:57:52 -07001382 url = pattern.format(change=item.change,
Ori Livneh7191ee82013-05-02 19:13:53 -07001383 pipeline=self.pipeline,
1384 job=job,
1385 build=build)
1386 else:
1387 url = build.url or job.name
James E. Blair8b0d4c42012-08-23 16:03:05 -07001388 if not job.voting:
1389 voting = ' (non-voting)'
1390 else:
1391 voting = ''
James E. Blair0ac6c012013-04-26 09:04:23 -07001392 if self.report_times and build.end_time and build.start_time:
1393 dt = int(build.end_time - build.start_time)
1394 m, s = divmod(dt, 60)
1395 h, m = divmod(m, 60)
Sean Dague51fd1192013-05-03 07:09:53 -04001396 if h:
1397 elapsed = ' in %dh %02dm %02ds' % (h, m, s)
1398 elif m:
1399 elapsed = ' in %dm %02ds' % (m, s)
1400 else:
1401 elapsed = ' in %ds' % (s)
James E. Blair0ac6c012013-04-26 09:04:23 -07001402 else:
1403 elapsed = ''
James E. Blair754e31e2013-08-18 13:15:15 -07001404 name = ''
James E. Blair8fef52b2013-08-17 17:32:50 -07001405 if self.sched.config.has_option('zuul', 'job_name_in_report'):
1406 if self.sched.config.getboolean('zuul',
1407 'job_name_in_report'):
1408 name = job.name + ' '
James E. Blair8fef52b2013-08-17 17:32:50 -07001409 ret += '- %s%s : %s%s%s\n' % (name, url, result, elapsed,
1410 voting)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001411 return ret
1412
1413 def formatDescription(self, build):
1414 concurrent_changes = ''
1415 concurrent_builds = ''
1416 other_builds = ''
1417
1418 for change in build.build_set.other_changes:
1419 concurrent_changes += '<li><a href="{change.url}">\
1420 {change.number},{change.patchset}</a></li>'.format(
1421 change=change)
1422
James E. Blairfee8d652013-06-07 08:57:52 -07001423 change = build.build_set.item.change
James E. Blair8b0d4c42012-08-23 16:03:05 -07001424
1425 for build in build.build_set.getBuilds():
Ori Livneh7191ee82013-05-02 19:13:53 -07001426 if build.url:
James E. Blair8b0d4c42012-08-23 16:03:05 -07001427 concurrent_builds += """\
1428<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001429 <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001430 {build.job.name} #{build.number}</a>: {build.result}
1431</li>
1432""".format(build=build)
1433 else:
1434 concurrent_builds += """\
1435<li>
1436 {build.job.name}: {build.result}
1437</li>""".format(build=build)
1438
1439 if build.build_set.previous_build_set:
1440 other_build = build.build_set.previous_build_set.getBuild(
1441 build.job.name)
1442 if other_build:
1443 other_builds += """\
1444<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001445 Preceded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001446 {build.job.name} #{build.number}</a>
1447</li>
1448""".format(build=other_build)
1449
1450 if build.build_set.next_build_set:
1451 other_build = build.build_set.next_build_set.getBuild(
1452 build.job.name)
1453 if other_build:
1454 other_builds += """\
1455<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001456 Succeeded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001457 {build.job.name} #{build.number}</a>
1458</li>
1459""".format(build=other_build)
1460
1461 result = build.build_set.result
1462
1463 if hasattr(change, 'number'):
1464 ret = """\
1465<p>
1466 Triggered by change:
1467 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
1468 Branch: <b>{change.branch}</b><br/>
1469 Pipeline: <b>{self.pipeline.name}</b>
1470</p>"""
James E. Blair63bb0ef2013-07-29 17:14:51 -07001471 elif hasattr(change, 'ref'):
James E. Blair8b0d4c42012-08-23 16:03:05 -07001472 ret = """\
1473<p>
1474 Triggered by reference:
1475 {change.ref}</a><br/>
1476 Old revision: <b>{change.oldrev}</b><br/>
1477 New revision: <b>{change.newrev}</b><br/>
1478 Pipeline: <b>{self.pipeline.name}</b>
1479</p>"""
James E. Blair63bb0ef2013-07-29 17:14:51 -07001480 else:
1481 ret = ""
James E. Blair8b0d4c42012-08-23 16:03:05 -07001482
1483 if concurrent_changes:
1484 ret += """\
1485<p>
1486 Other changes tested concurrently with this change:
1487 <ul>{concurrent_changes}</ul>
1488</p>
1489"""
1490 if concurrent_builds:
1491 ret += """\
1492<p>
1493 All builds for this change set:
1494 <ul>{concurrent_builds}</ul>
1495</p>
1496"""
1497
1498 if other_builds:
1499 ret += """\
1500<p>
1501 Other build sets for this change:
1502 <ul>{other_builds}</ul>
1503</p>
1504"""
1505 if result:
1506 ret += """\
1507<p>
1508 Reported result: <b>{result}</b>
1509</p>
1510"""
1511
1512 ret = ret.format(**locals())
James E. Blair268d9342012-06-13 18:24:29 -07001513 return ret
1514
James E. Blairfee8d652013-06-07 08:57:52 -07001515 def reportStats(self, item):
James E. Blair8fa16972013-01-15 16:57:20 -08001516 if not statsd:
1517 return
1518 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001519 # Update the gauge on enqueue and dequeue, but timers only
James E. Blair8fa16972013-01-15 16:57:20 -08001520 # when dequeing.
James E. Blairfee8d652013-06-07 08:57:52 -07001521 if item.dequeue_time:
1522 dt = int((item.dequeue_time - item.enqueue_time) * 1000)
James E. Blair8fa16972013-01-15 16:57:20 -08001523 else:
1524 dt = None
James E. Blairfee8d652013-06-07 08:57:52 -07001525 items = len(self.pipeline.getAllItems())
James E. Blair8fa16972013-01-15 16:57:20 -08001526
1527 # stats.timers.zuul.pipeline.NAME.resident_time
1528 # stats_counts.zuul.pipeline.NAME.total_changes
1529 # stats.gauges.zuul.pipeline.NAME.current_changes
1530 key = 'zuul.pipeline.%s' % self.pipeline.name
James E. Blairfee8d652013-06-07 08:57:52 -07001531 statsd.gauge(key + '.current_changes', items)
James E. Blair8fa16972013-01-15 16:57:20 -08001532 if dt:
1533 statsd.timing(key + '.resident_time', dt)
1534 statsd.incr(key + '.total_changes')
1535
1536 # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
1537 # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
James E. Blairfee8d652013-06-07 08:57:52 -07001538 project_name = item.change.project.name.replace('/', '.')
James E. Blair8fa16972013-01-15 16:57:20 -08001539 key += '.%s' % project_name
1540 if dt:
1541 statsd.timing(key + '.resident_time', dt)
1542 statsd.incr(key + '.total_changes')
1543 except:
1544 self.log.exception("Exception reporting pipeline stats")
1545
James E. Blair1e8dd892012-05-30 09:15:05 -07001546
James E. Blair4aea70c2012-07-26 14:23:24 -07001547class IndependentPipelineManager(BasePipelineManager):
1548 log = logging.getLogger("zuul.IndependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001549 changes_merge = False
1550
James E. Blaireff88162013-07-01 12:44:14 -04001551 def _postConfig(self, layout):
1552 super(IndependentPipelineManager, self)._postConfig(layout)
James E. Blaire0487072012-08-29 17:38:31 -07001553
1554 change_queue = ChangeQueue(self.pipeline, dependent=False)
1555 for project in self.pipeline.getProjects():
1556 change_queue.addProject(project)
1557
1558 self.pipeline.addQueue(change_queue)
James E. Blairee743612012-05-29 14:49:32 -07001559
James E. Blair1e8dd892012-05-30 09:15:05 -07001560
James E. Blair4aea70c2012-07-26 14:23:24 -07001561class DependentPipelineManager(BasePipelineManager):
1562 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001563 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001564
1565 def __init__(self, *args, **kwargs):
James E. Blair4aea70c2012-07-26 14:23:24 -07001566 super(DependentPipelineManager, self).__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001567
James E. Blaireff88162013-07-01 12:44:14 -04001568 def _postConfig(self, layout):
1569 super(DependentPipelineManager, self)._postConfig(layout)
James E. Blairee743612012-05-29 14:49:32 -07001570 self.buildChangeQueues()
1571
1572 def buildChangeQueues(self):
1573 self.log.debug("Building shared change queues")
1574 change_queues = []
1575
James E. Blair4aea70c2012-07-26 14:23:24 -07001576 for project in self.pipeline.getProjects():
Clark Boylan7603a372014-01-21 11:43:20 -08001577 change_queue = ChangeQueue(
1578 self.pipeline,
1579 window=self.pipeline.window,
1580 window_floor=self.pipeline.window_floor,
1581 window_increase_type=self.pipeline.window_increase_type,
1582 window_increase_factor=self.pipeline.window_increase_factor,
1583 window_decrease_type=self.pipeline.window_decrease_type,
1584 window_decrease_factor=self.pipeline.window_decrease_factor)
James E. Blair4aea70c2012-07-26 14:23:24 -07001585 change_queue.addProject(project)
1586 change_queues.append(change_queue)
1587 self.log.debug("Created queue: %s" % change_queue)
James E. Blairee743612012-05-29 14:49:32 -07001588
James E. Blairc3d428e2013-12-03 15:06:48 -08001589 # Iterate over all queues trying to combine them, and keep doing
1590 # so until they can not be combined further.
1591 last_change_queues = change_queues
1592 while True:
1593 new_change_queues = self.combineChangeQueues(last_change_queues)
1594 if len(last_change_queues) == len(new_change_queues):
1595 break
1596 last_change_queues = new_change_queues
1597
1598 self.log.info(" Shared change queues:")
1599 for queue in new_change_queues:
1600 self.pipeline.addQueue(queue)
1601 self.log.info(" %s" % queue)
1602
1603 def combineChangeQueues(self, change_queues):
James E. Blairee743612012-05-29 14:49:32 -07001604 self.log.debug("Combining shared queues")
1605 new_change_queues = []
1606 for a in change_queues:
1607 merged_a = False
1608 for b in new_change_queues:
1609 if not a.getJobs().isdisjoint(b.getJobs()):
1610 self.log.debug("Merging queue %s into %s" % (a, b))
1611 b.mergeChangeQueue(a)
1612 merged_a = True
1613 break # this breaks out of 'for b' and continues 'for a'
1614 if not merged_a:
1615 self.log.debug("Keeping queue %s" % (a))
1616 new_change_queues.append(a)
James E. Blairc3d428e2013-12-03 15:06:48 -08001617 return new_change_queues
James E. Blairee743612012-05-29 14:49:32 -07001618
James E. Blaire0487072012-08-29 17:38:31 -07001619 def isChangeReadyToBeEnqueued(self, change):
James E. Blair6c358e72013-07-29 17:06:47 -07001620 if not self.pipeline.trigger.canMerge(change,
1621 self.getSubmitAllowNeeds()):
James E. Blaire0487072012-08-29 17:38:31 -07001622 self.log.debug("Change %s can not merge, ignoring" % change)
1623 return False
1624 return True
James E. Blair1e8dd892012-05-30 09:15:05 -07001625
James E. Blair36658cf2013-12-06 17:53:48 -08001626 def enqueueChangesBehind(self, change, quiet):
James E. Blaire0487072012-08-29 17:38:31 -07001627 to_enqueue = []
1628 self.log.debug("Checking for changes needing %s:" % change)
1629 if not hasattr(change, 'needed_by_changes'):
1630 self.log.debug(" Changeish does not support dependencies")
1631 return
1632 for needs in change.needed_by_changes:
James E. Blair6c358e72013-07-29 17:06:47 -07001633 if self.pipeline.trigger.canMerge(needs,
1634 self.getSubmitAllowNeeds()):
James E. Blaire0487072012-08-29 17:38:31 -07001635 self.log.debug(" Change %s needs %s and is ready to merge" %
1636 (needs, change))
1637 to_enqueue.append(needs)
1638 if not to_enqueue:
1639 self.log.debug(" No changes need %s" % change)
1640
1641 for other_change in to_enqueue:
James E. Blair36658cf2013-12-06 17:53:48 -08001642 self.addChange(other_change, quiet)
James E. Blaire0487072012-08-29 17:38:31 -07001643
James E. Blair36658cf2013-12-06 17:53:48 -08001644 def enqueueChangesAhead(self, change, quiet):
James E. Blaire0487072012-08-29 17:38:31 -07001645 ret = self.checkForChangesNeededBy(change)
1646 if ret in [True, False]:
1647 return ret
1648 self.log.debug(" Change %s must be merged ahead of %s" %
1649 (ret, change))
James E. Blair36658cf2013-12-06 17:53:48 -08001650 return self.addChange(ret, quiet)
James E. Blaire0487072012-08-29 17:38:31 -07001651
1652 def checkForChangesNeededBy(self, change):
James E. Blaire421a232012-07-25 16:59:21 -07001653 self.log.debug("Checking for changes needed by %s:" % change)
1654 # Return true if okay to proceed enqueing this change,
1655 # false if the change should not be enqueued.
James E. Blair4aea70c2012-07-26 14:23:24 -07001656 if not hasattr(change, 'needs_change'):
1657 self.log.debug(" Changeish does not support dependencies")
1658 return True
James E. Blaire421a232012-07-25 16:59:21 -07001659 if not change.needs_change:
1660 self.log.debug(" No changes needed")
1661 return True
1662 if change.needs_change.is_merged:
1663 self.log.debug(" Needed change is merged")
1664 return True
1665 if not change.needs_change.is_current_patchset:
1666 self.log.debug(" Needed change is not the current patchset")
1667 return False
James E. Blair127bc182012-08-28 15:55:15 -07001668 if self.isChangeAlreadyInQueue(change.needs_change):
James E. Blaire421a232012-07-25 16:59:21 -07001669 self.log.debug(" Needed change is already ahead in the queue")
1670 return True
James E. Blair6c358e72013-07-29 17:06:47 -07001671 if self.pipeline.trigger.canMerge(change.needs_change,
1672 self.getSubmitAllowNeeds()):
James E. Blaire0487072012-08-29 17:38:31 -07001673 self.log.debug(" Change %s is needed" %
1674 change.needs_change)
1675 return change.needs_change
James E. Blaire421a232012-07-25 16:59:21 -07001676 # The needed change can't be merged.
1677 self.log.debug(" Change %s is needed but can not be merged" %
1678 change.needs_change)
1679 return False
James E. Blair972e3c72013-08-29 12:04:55 -07001680
1681 def getFailingDependentItem(self, item):
1682 if not hasattr(item.change, 'needs_change'):
1683 return None
1684 if not item.change.needs_change:
1685 return None
1686 needs_item = self.getItemForChange(item.change.needs_change)
1687 if not needs_item:
1688 return None
1689 if needs_item.current_build_set.failing_reasons:
1690 return needs_item
1691 return None