blob: 73034c7bda217437f75af98870efa022d54c9e2b [file] [log] [blame]
James E. Blairee743612012-05-29 14:49:32 -07001# Copyright 2012 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Zhongyue Luo1c860d72012-07-19 11:03:56 +080023import Queue
24import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
Joshua Hesketh1879cf72013-08-19 14:13:15 +100032from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
James E. Blair4886cc12012-07-18 15:39:41 -070033import merger
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040034from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070035
James E. Blair71e94122012-12-24 17:53:08 -080036statsd = extras.try_import('statsd.statsd')
37
James E. Blair1e8dd892012-05-30 09:15:05 -070038
def deep_format(obj, paramdict):
    """Apply paramdict via str.format() to all strings found within obj.

    Lists and dicts are traversed recursively and rebuilt, so the
    supplied object is never modified in place.  Dict keys are expanded
    as well as dict values.  Any object that is not a str, list or dict
    is returned unchanged.

    Borrowed from Jenkins Job Builder project.

    :arg obj: the (possibly nested) structure to expand
    :arg dict paramdict: the substitutions handed to str.format()
    :returns: a new structure with every string formatted
    :raises KeyError: if a string references a name that is missing
        from paramdict
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        # Keys are routed through deep_format() too: string keys get
        # expanded, while non-string keys (for instance an integer or
        # None) are kept as they are instead of failing on .format().
        return dict(
            (deep_format(key, paramdict), deep_format(value, paramdict))
            for key, value in obj.items())
    return obj
59
60
class MergeFailure(Exception):
    """Plain Exception subclass; it adds no behaviour of its own.

    Judging by the name it signals a failed merge -- the code raising
    it is not part of this section of the file.
    """
63
64
James E. Blair468c8512013-12-06 13:27:19 -080065class ManagementEvent(object):
66 """An event that should be processed within the main queue run loop"""
67 def __init__(self):
68 self._wait_event = threading.Event()
James E. Blair36658cf2013-12-06 17:53:48 -080069 self._exception = None
70 self._traceback = None
James E. Blair468c8512013-12-06 13:27:19 -080071
James E. Blair36658cf2013-12-06 17:53:48 -080072 def exception(self, e, tb):
73 self._exception = e
74 self._traceback = tb
75 self._wait_event.set()
76
77 def done(self):
James E. Blair468c8512013-12-06 13:27:19 -080078 self._wait_event.set()
79
80 def wait(self, timeout=None):
81 self._wait_event.wait(timeout)
James E. Blair36658cf2013-12-06 17:53:48 -080082 if self._exception:
83 raise self._exception, None, self._traceback
James E. Blair468c8512013-12-06 13:27:19 -080084 return self._wait_event.is_set()
85
86
class ReconfigureEvent(ManagementEvent):
    """Management event asking the scheduler to reconfigure itself.

    The layout will be (re-)loaded from the path specified in the
    configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        ManagementEvent.__init__(self)
        self.config = config
96
97
class PromoteEvent(ManagementEvent):
    """Management event that promotes changes to the head of the queue.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, pipeline_name, change_ids):
        ManagementEvent.__init__(self)
        self.change_ids = change_ids
        self.pipeline_name = pipeline_name
110
111
James E. Blaire9d45c32012-05-31 09:56:45 -0700112class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700113 log = logging.getLogger("zuul.Scheduler")
114
def __init__(self):
    """Set up an idle scheduler thread with empty queues and layout.

    Nothing is read from disk here: the launcher, triggers, reporters
    and configuration all start out empty and are supplied later.
    """
    threading.Thread.__init__(self)
    self.daemon = True

    # Run-loop control flags and the event used to wake the loop.
    self._pause = False
    self._exit = False
    self._stopped = False
    self._maintain_trigger_cache = False
    self.wake_event = threading.Event()
    # Guards self.layout against concurrent access (see getProject and
    # _doReconfigureEvent).
    self.layout_lock = threading.Lock()

    # Collaborators that are registered after construction.
    self.launcher = None
    self.triggers = {}
    self.reporters = {}
    self.config = None

    # Work queues drained by the run loop.
    self.trigger_event_queue = Queue.Queue()
    self.result_event_queue = Queue.Queue()
    self.management_event_queue = Queue.Queue()
    self.layout = model.Layout()

    self.zuul_version = zuul_version.version_info.version_string()
    self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400136
def stop(self):
    """Ask the run loop to return.

    The flag is set before the wake-up so that run() sees it as soon as
    it leaves wake_event.wait().
    """
    self._stopped = True
    self.wake_event.set()
140
James E. Blair47958382013-01-10 17:26:02 -0800141 def testConfig(self, config_path):
James E. Blair04948c72013-07-25 23:03:17 -0700142 return self._parseConfig(config_path)
James E. Blair47958382013-01-10 17:26:02 -0800143
def _parseConfig(self, config_path):
    """Read the YAML layout file and build a new model.Layout from it.

    The file is validated with layoutvalidator, then its 'includes',
    'pipelines', 'project-templates', 'jobs' and 'projects' sections are
    processed in that order.  The scheduler's current layout is not
    touched; the freshly built layout is returned to the caller.

    :arg str config_path: path of the layout file ('~' is expanded)
    :returns: the populated model.Layout
    :raises Exception: if the file does not exist, or if a job names a
        'parameter-function' that no included python file defines
    """
    layout = model.Layout()
    project_templates = {}

    def toList(item):
        # Normalise an optional "scalar or list" config value to a list.
        if not item:
            return []
        if isinstance(item, list):
            return item
        return [item]

    if config_path:
        config_path = os.path.expanduser(config_path)
        if not os.path.exists(config_path):
            raise Exception("Unable to read layout config file at %s" %
                            config_path)
    # A falsy config_path reaches open() unchanged and fails there.
    # The with-block closes the file as soon as it has been parsed.
    with open(config_path) as config_file:
        # NOTE(review): yaml.load() can construct arbitrary Python
        # objects; if the layout file can ever come from an untrusted
        # source this should become yaml.safe_load().
        data = yaml.load(config_file)

    validator = layoutvalidator.LayoutValidator()
    validator.validate(data)

    # Namespace shared by all included python files; jobs look their
    # 'parameter-function' up in it further down.
    config_env = {}
    for include in data.get('includes', []):
        if 'python-file' in include:
            fn = include['python-file']
            if not os.path.isabs(fn):
                # Relative includes are resolved against the directory
                # holding the layout file.
                base = os.path.dirname(config_path)
                fn = os.path.join(base, fn)
            fn = os.path.expanduser(fn)
            execfile(fn, config_env)

    for conf_pipeline in data.get('pipelines', []):
        pipeline = Pipeline(conf_pipeline['name'])
        pipeline.description = conf_pipeline.get('description')
        precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
        pipeline.precedence = precedence
        pipeline.failure_message = conf_pipeline.get('failure-message',
                                                     "Build failed.")
        pipeline.success_message = conf_pipeline.get('success-message',
                                                     "Build succeeded.")
        pipeline.dequeue_on_new_patchset = conf_pipeline.get(
            'dequeue-on-new-patchset', True)

        action_reporters = {}
        for action in ['start', 'success', 'failure']:
            action_reporters[action] = []
            if conf_pipeline.get(action):
                for reporter_name, params \
                        in conf_pipeline.get(action).items():
                    if reporter_name in self.reporters:
                        action_reporters[action].append(ActionReporter(
                            self.reporters[reporter_name], params))
                    else:
                        # Unknown reporters are logged and skipped; the
                        # rest of the layout is still loaded.
                        self.log.error('Invalid reporter name %s' %
                                       reporter_name)
        pipeline.start_actions = action_reporters['start']
        pipeline.success_actions = action_reporters['success']
        pipeline.failure_actions = action_reporters['failure']

        pipeline.window = conf_pipeline.get('window', 20)
        pipeline.window_floor = conf_pipeline.get('window-floor', 3)
        pipeline.window_increase_type = conf_pipeline.get(
            'window-increase-type', 'linear')
        pipeline.window_increase_factor = conf_pipeline.get(
            'window-increase-factor', 1)
        pipeline.window_decrease_type = conf_pipeline.get(
            'window-decrease-type', 'exponential')
        pipeline.window_decrease_factor = conf_pipeline.get(
            'window-decrease-factor', 2)

        # The 'manager' value is the name of a class that must be
        # defined at module level in this file.
        manager = globals()[conf_pipeline['manager']](self, pipeline)
        pipeline.setManager(manager)
        layout.pipelines[conf_pipeline['name']] = pipeline

        # TODO: move this into triggers (may require pluggable
        # configuration)
        if 'gerrit' in conf_pipeline['trigger']:
            pipeline.trigger = self.triggers['gerrit']
            for trigger in toList(conf_pipeline['trigger']['gerrit']):
                # Fold the list of approval dicts into a single dict.
                approvals = {}
                for approval_dict in toList(trigger.get('approval')):
                    for k, v in approval_dict.items():
                        approvals[k] = v
                f = EventFilter(
                    types=toList(trigger['event']),
                    branches=toList(trigger.get('branch')),
                    refs=toList(trigger.get('ref')),
                    event_approvals=approvals,
                    comment_filters=toList(trigger.get('comment_filter')),
                    email_filters=toList(trigger.get('email_filter')),
                    username_filters=toList(
                        trigger.get('username_filter')),
                    require_approvals=toList(
                        trigger.get('require-approval')))
                manager.event_filters.append(f)
        elif 'timer' in conf_pipeline['trigger']:
            pipeline.trigger = self.triggers['timer']
            for trigger in toList(conf_pipeline['trigger']['timer']):
                f = EventFilter(types=['timer'],
                                timespecs=toList(trigger['time']))
                manager.event_filters.append(f)

    for project_template in data.get('project-templates', []):
        # Make sure the template only contains valid pipelines
        tpl = dict(
            (pipe_name, project_template.get(pipe_name))
            for pipe_name in layout.pipelines.keys()
            if pipe_name in project_template
        )
        project_templates[project_template.get('name')] = tpl

    for config_job in data.get('jobs', []):
        job = layout.getJob(config_job['name'])
        # Be careful to only set attributes explicitly present on
        # this job, to avoid squashing attributes set by a meta-job.
        for option, attribute in (('failure-message', 'failure_message'),
                                  ('success-message', 'success_message'),
                                  ('failure-pattern', 'failure_pattern'),
                                  ('success-pattern', 'success_pattern')):
            m = config_job.get(option, None)
            if m:
                setattr(job, attribute, m)
        m = config_job.get('hold-following-changes', False)
        if m:
            job.hold_following_changes = True
        # 'voting' may legitimately be False, hence the None check.
        m = config_job.get('voting', None)
        if m is not None:
            job.voting = m
        fname = config_job.get('parameter-function', None)
        if fname:
            func = config_env.get(fname, None)
            if not func:
                raise Exception("Unable to find function %s" % fname)
            job.parameter_function = func
        # Keep the raw patterns next to their compiled form.
        branches = toList(config_job.get('branch'))
        if branches:
            job._branches = branches
            job.branches = [re.compile(x) for x in branches]
        files = toList(config_job.get('files'))
        if files:
            job._files = files
            job.files = [re.compile(x) for x in files]

    def add_jobs(job_tree, config_jobs):
        # Recursively attach the configured jobs to job_tree: a string
        # names a job, a dict maps a parent job to its children.
        for job in config_jobs:
            if isinstance(job, list):
                for x in job:
                    add_jobs(job_tree, x)
            if isinstance(job, dict):
                for parent, children in job.items():
                    parent_tree = job_tree.addJob(layout.getJob(parent))
                    add_jobs(parent_tree, children)
            if isinstance(job, str):
                job_tree.addJob(layout.getJob(job))

    for config_project in data.get('projects', []):
        project = Project(config_project['name'])
        shortname = config_project['name'].split('/')[-1]

        # This is reversed due to the prepend operation below, so
        # the ultimate order is templates (in order) followed by
        # statically defined jobs.
        for requested_template in reversed(
                config_project.get('template', [])):
            # Fetch the template from 'project-templates'
            tpl = project_templates.get(
                requested_template.get('name'))
            # Expand it with the project context
            requested_template['name'] = shortname
            expanded = deep_format(tpl, requested_template)
            # Finally merge the expansion with whatever has been
            # already defined for this project.  Prepend our new
            # jobs to existing ones (which may have been
            # statically defined or defined by other templates).
            for pipeline in layout.pipelines.values():
                if pipeline.name in expanded:
                    config_project.update(
                        {pipeline.name: expanded[pipeline.name] +
                         config_project.get(pipeline.name, [])})
        # TODO: future enhancement -- add an option to the
        # template block to indicate that duplicate jobs should be
        # merged (especially to handle the case where they have
        # children and you want all of the children to run after a
        # single run of the parent).

        layout.projects[config_project['name']] = project
        mode = config_project.get('merge-mode', 'merge-resolve')
        project.merge_mode = model.MERGER_MAP[mode]
        for pipeline in layout.pipelines.values():
            if pipeline.name in config_project:
                job_tree = pipeline.addProject(project)
                config_jobs = config_project[pipeline.name]
                add_jobs(job_tree, config_jobs)

    # All jobs should be defined at this point, get rid of
    # metajobs so that getJob isn't doing anything weird.
    layout.metajobs = []

    for pipeline in layout.pipelines.values():
        pipeline.manager._postConfig(layout)

    return layout
James E. Blairee743612012-05-29 14:49:32 -0700353
def _setupMerger(self):
    """Create self.merger from the configuration and register projects.

    Options that are absent fall back to a default: '/var/lib/zuul/git'
    for the git directory and None for the user e-mail, user name and
    ssh key.  Every value of the optional 'replication' section is
    passed along as a replication URL.
    """
    config = self.config

    def option(section, name, default):
        # Return the configured value, or default when it is not set.
        if config.has_option(section, name):
            return config.get(section, name)
        return default

    merge_root = option('zuul', 'git_dir', '/var/lib/zuul/git')
    merge_email = option('zuul', 'git_user_email', None)
    merge_name = option('zuul', 'git_user_name', None)

    replicate_urls = []
    if config.has_section('replication'):
        replicate_urls = [url for _key, url in config.items('replication')]

    sshkey = option('gerrit', 'sshkey', None)

    # TODO: The merger should have an upstream repo independent of
    # triggers, and then each trigger should provide a fetch
    # location.
    self.merger = merger.Merger(merge_root, sshkey, merge_email,
                                merge_name, replicate_urls)
    for project in self.layout.projects.values():
        git_url = self.triggers['gerrit'].getGitUrl(project)
        self.merger.addProject(project, git_url)
388
def setLauncher(self, launcher):
    """Store launcher on the scheduler as self.launcher."""
    self.launcher = launcher
391
def registerTrigger(self, trigger, name=None):
    """Add trigger to self.triggers.

    :arg trigger: the trigger object to register
    :arg str name: key to register it under; when None the trigger's
        own ``name`` attribute is used
    """
    key = trigger.name if name is None else name
    self.triggers[key] = trigger
James E. Blairee743612012-05-29 14:49:32 -0700396
def registerReporter(self, reporter, name=None):
    """Add reporter to self.reporters.

    :arg reporter: the reporter object to register
    :arg str name: key to register it under; when None the reporter's
        own ``name`` attribute is used
    """
    key = reporter.name if name is None else name
    self.reporters[key] = reporter
401
def getProject(self, name):
    """Look up a project in the current layout while holding layout_lock.

    :arg str name: the project name
    :returns: the matching entry of self.layout.projects, or None
    """
    with self.layout_lock:
        return self.layout.projects.get(name)
410
def addEvent(self, event):
    """Queue a trigger event and wake the run loop.

    When statsd is available a 'gerrit.event.<type>' counter is
    incremented first.  Stats reporting is best-effort: an error there
    is logged and the event is queued regardless.

    :arg event: the trigger event; its ``type`` attribute is used for
        the stats key
    """
    self.log.debug("Adding trigger event: %s" % event)
    try:
        if statsd:
            statsd.incr('gerrit.event.%s' % event.type)
    except Exception:
        # Only ordinary errors are swallowed; KeyboardInterrupt and
        # SystemExit are allowed to propagate.
        self.log.exception("Exception reporting event stats")
    self.trigger_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700421
def onBuildStarted(self, build):
    """Stamp the build's start time and queue a 'started' result event.

    :arg build: the build that started; its ``start_time`` attribute is
        set to the current time
    """
    self.log.debug("Adding start event for build: %s" % build)
    build.start_time = time.time()
    started = ('started', build)
    self.result_event_queue.put(started)
    self.wake_event.set()
    self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700428
def onBuildCompleted(self, build):
    """Stamp the build's end time, report stats and queue the result.

    When statsd is available and the build belongs to a pipeline, a
    per-job counter 'zuul.pipeline.<pipeline>.job.<job>.<result>' and
    the pipeline-wide 'zuul.pipeline.<pipeline>.all_jobs' counter are
    incremented; SUCCESS/FAILURE builds with a start time also report
    their duration in milliseconds.  Stats reporting is best-effort: an
    error there is logged and the result is queued regardless.

    :arg build: the finished build; its ``end_time`` attribute is set
        to the current time
    """
    self.log.debug("Adding complete event for build: %s" % build)
    build.end_time = time.time()
    try:
        if statsd and build.pipeline:
            # Dots separate components of a statsd key, so they are
            # replaced in the job name.
            jobname = build.job.name.replace('.', '_')
            key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
                                                  jobname, build.result)
            if build.result in ('SUCCESS', 'FAILURE') and build.start_time:
                dt = int((build.end_time - build.start_time) * 1000)
                statsd.timing(key, dt)
            statsd.incr(key)
            key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
            statsd.incr(key)
    except Exception:
        # Only ordinary errors are swallowed; KeyboardInterrupt and
        # SystemExit are allowed to propagate.
        self.log.exception("Exception reporting runtime stats")
    self.result_event_queue.put(('completed', build))
    self.wake_event.set()
    self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700448
def reconfigure(self, config):
    """Submit a reconfiguration request and block until it is handled.

    A ReconfigureEvent is placed on the management queue and the run
    loop is woken; the caller then waits on the event.  Once the wait
    returns, last_reconfigured is set to the current time in whole
    seconds.

    :arg config: the new configuration carried by the event
    """
    self.log.debug("Prepare to reconfigure")
    request = ReconfigureEvent(config)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for reconfiguration")
    request.wait()
    self.log.debug("Reconfiguration complete")
    self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700458
def promote(self, pipeline_name, change_ids):
    """Submit a promotion request and block until it is handled.

    A PromoteEvent is placed on the management queue and the run loop
    is woken; the caller then waits on the event.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: change ids as strings, passed to PromoteEvent
    """
    request = PromoteEvent(pipeline_name, change_ids)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for promotion")
    request.wait()
    self.log.debug("Promotion complete")
466
def exit(self):
    """Request an exit: set the pause and exit flags, wake the run loop.

    This method only sets the flags and returns; it does not wait for
    the run loop to act on them.
    """
    self.log.debug("Prepare to exit")
    self._pause = self._exit = True
    self.wake_event.set()
    self.log.debug("Waiting for exit")
473
def _get_queue_pickle_file(self):
    """Return the path of the file used to persist the trigger queue.

    The file is 'queue.pickle' inside the configured [zuul] state_dir
    ('~' expanded), or inside '/var/lib/zuul' when that option is not
    set.
    """
    state_dir = '/var/lib/zuul'
    if self.config.has_option('zuul', 'state_dir'):
        configured = self.config.get('zuul', 'state_dir')
        state_dir = os.path.expanduser(configured)
    return os.path.join(state_dir, 'queue.pickle')
481
def _save_queue(self):
    """Drain the trigger event queue and pickle its contents to disk.

    All pending trigger events are removed from the queue.  If there
    were any, they are written as a list to the file named by
    _get_queue_pickle_file(); with an empty queue no file is written.
    """
    pickle_file = self._get_queue_pickle_file()
    events = []
    while not self.trigger_event_queue.empty():
        events.append(self.trigger_event_queue.get())
    self.log.debug("Queue length is %s" % len(events))
    if events:
        self.log.debug("Saving queue")
        # The with-block flushes and closes the file before returning,
        # which matters because the caller may terminate the process
        # right afterwards.
        with open(pickle_file, 'wb') as queue_file:
            pickle.dump(events, queue_file)
491
def _load_queue(self):
    """Re-queue the trigger events saved by _save_queue(), if any.

    If the file named by _get_queue_pickle_file() exists, the pickled
    list of events is read and each event is put back on the trigger
    event queue in order.  The file itself is left in place.
    """
    pickle_file = self._get_queue_pickle_file()
    if os.path.exists(pickle_file):
        self.log.debug("Loading queue")
        # NOTE(review): unpickling can execute arbitrary code, so the
        # state directory must only be writable by trusted users.
        with open(pickle_file, 'rb') as queue_file:
            events = pickle.load(queue_file)
        self.log.debug("Queue length is %s" % len(events))
        for event in events:
            self.trigger_event_queue.put(event)
    else:
        self.log.debug("No queue file found")
502
def _delete_queue(self):
    """Remove the saved queue file if it exists; otherwise do nothing."""
    pickle_file = self._get_queue_pickle_file()
    if not os.path.exists(pickle_file):
        return
    self.log.debug("Deleting saved queue")
    os.unlink(pickle_file)
508
def resume(self):
    """Restore any saved trigger queue and wake the run loop.

    Loading and deleting the saved queue are attempted independently:
    an error in either step is logged and does not prevent the next
    step or the wake-up.
    """
    try:
        self._load_queue()
    except Exception:
        # Only ordinary errors are swallowed; KeyboardInterrupt and
        # SystemExit are allowed to propagate.
        self.log.exception("Unable to load queue")
    try:
        self._delete_queue()
    except Exception:
        self.log.exception("Unable to delete saved queue")
    self.log.debug("Resuming queue processing")
    self.wake_event.set()
520
def _doPauseEvent(self):
    """If an exit was requested, save the queue and end the process.

    Without the exit flag this is a no-op.  With it, the trigger queue
    is saved and the process is terminated via os._exit(0), so the
    call does not return.
    """
    if not self._exit:
        return
    self.log.debug("Exiting")
    self._save_queue()
    os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700526
def _doReconfigureEvent(self, event):
    """Load a new layout and carry the queued items over to it.

    Runs with layout_lock held.  The event's configuration becomes
    self.config, the layout it points at is parsed, and every item in
    each old pipeline is re-enqueued in the new pipeline of the same
    name.  Items whose project is missing from the new layout, or that
    the new pipeline manager declines, are dropped together with their
    running-build bookkeeping.  The new layout is then installed, the
    merger rebuilt and every trigger's postConfig() called.

    :arg event: the reconfigure request; ``event.config`` is the new
        configuration
    """
    # This is called in the scheduler loop after another thread submits
    # a request
    with self.layout_lock:
        self.config = event.config
        self.log.debug("Performing reconfiguration")
        layout_path = self.config.get('zuul', 'layout_config')
        new_layout = self._parseConfig(layout_path)
        for name, new_pipeline in new_layout.pipelines.items():
            old_pipeline = self.layout.pipelines.get(name)
            if not old_pipeline:
                if self.layout.pipelines:
                    # Don't emit this warning on startup
                    self.log.warning("No old pipeline matching %s found "
                                     "when reconfiguring" % name)
                continue
            self.log.debug("Re-enqueueing changes for pipeline %s" %
                           name)
            dropped_items = []
            for shared_queue in old_pipeline.queues:
                for item in shared_queue.queue:
                    # Detach the item from its old position before it
                    # is offered to the new pipeline.
                    item.item_ahead = None
                    item.items_behind = []
                    item.pipeline = None
                    project = new_layout.projects.get(
                        item.change.project.name)
                    if project:
                        item.change.project = project
                        if not new_pipeline.manager.reEnqueueItem(item):
                            dropped_items.append(item)
                    else:
                        self.log.warning("Unable to find project for "
                                         "change %s while reenqueueing" %
                                         item.change)
                        item.change.project = None
                        dropped_items.append(item)
            running = old_pipeline.manager.building_jobs
            # Collect first, delete afterwards, so the dict is not
            # modified while it is being iterated.
            stale_builds = []
            for build, item in running.items():
                if item in dropped_items:
                    stale_builds.append(build)
                    self.log.warning("Deleting running build %s for "
                                     "change %s while reenqueueing" % (
                                         build, item.change))
            for build in stale_builds:
                del running[build]
            new_pipeline.manager.building_jobs = running
        self.layout = new_layout
        self._setupMerger()
        for trigger in self.triggers.values():
            trigger.postConfig()
        if statsd:
            try:
                for pipeline in self.layout.pipelines.values():
                    count = len(pipeline.getAllItems())
                    # stats.gauges.zuul.pipeline.NAME.current_changes
                    key = 'zuul.pipeline.%s' % pipeline.name
                    statsd.gauge(key + '.current_changes', count)
            except Exception:
                self.log.exception("Exception reporting initial "
                                   "pipeline stats:")
James E. Blaire9d45c32012-05-31 09:56:45 -0700590
def _doPromoteEvent(self, event):
    """Move the requested changes to the head of their shared queue.

    The shared queue is the first one in the pipeline that holds the
    first requested change.  Every item of that queue has its jobs
    cancelled and is dequeued; the items are then added back with the
    promoted changes first (in the requested order) followed by the
    rest in their previous order, and the queue is processed until the
    manager reports nothing more to do.

    :arg event: the promote request (``pipeline_name``, ``change_ids``
        as 'number,patchset' strings)
    :raises Exception: if no shared queue holds the first change, or if
        a requested change is not in that queue
    """
    pipeline = self.layout.pipelines[event.pipeline_name]
    wanted = [change_id.split(',') for change_id in event.change_ids]

    # Locate the shared queue containing the first requested change.
    target_queue = None
    for shared_queue in pipeline.queues:
        if target_queue:
            break
        for item in shared_queue.queue:
            if (item.change.number == wanted[0][0] and
                    item.change.patchset == wanted[0][1]):
                target_queue = shared_queue
                break
    if not target_queue:
        raise Exception("Unable to find shared change queue for %s" %
                        event.change_ids[0])

    # Promoted items first, in the order they were requested.
    new_order = []
    for number, patchset in wanted:
        for item in target_queue.queue:
            if (item.change.number == number and
                    item.change.patchset == patchset):
                new_order.append(item)
                break
        else:
            raise Exception("Unable to find %s,%s in queue %s" %
                            (number, patchset, target_queue))

    manager = pipeline.manager
    # Iterate over a copy: dequeueing may alter the queue underneath us.
    for item in list(target_queue.queue):
        if item not in new_order:
            new_order.append(item)
        manager.cancelJobs(item)
        manager.dequeueItem(item)
    for item in new_order:
        manager.addChange(
            item.change,
            enqueue_time=item.enqueue_time,
            quiet=True)
    while manager.processQueue():
        pass
630
def _areAllBuildsComplete(self):
    """Report whether any pipeline manager still has building jobs.

    Every outstanding build is logged at debug level.

    :returns: True when no pipeline manager has an entry in its
        building_jobs, False otherwise
    """
    self.log.debug("Checking if all builds are complete")
    outstanding = False
    for pipeline in self.layout.pipelines.values():
        for build in pipeline.manager.building_jobs.keys():
            self.log.debug("%s waiting on %s" % (pipeline.manager, build))
            outstanding = True
    if outstanding:
        self.log.debug("All builds are not complete")
        return False
    self.log.debug("All builds are complete")
    return True
643
def run(self):
    """Thread body: process queued events each time the loop is woken.

    Each pass handles the management queue if it is non-empty, then
    either the result queue or -- unless paused -- the trigger queue.
    The loop re-arms its own wake event while work remains and returns
    once stop() has been called.
    """
    if statsd:
        self.log.debug("Statsd enabled")
    else:
        self.log.debug("Statsd disabled because python statsd "
                       "package not found")
    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            return
        self.log.debug("Run handler awake")
        try:
            if not self.management_event_queue.empty():
                self.process_management_queue()

            # Give result events priority -- they let us stop builds,
            # whereas trigger events cause us to launch builds.
            if not self.result_event_queue.empty():
                self.process_result_queue()
            elif not self._pause and not self.trigger_event_queue.empty():
                self.process_event_queue()

            if self._pause and self._areAllBuildsComplete():
                self._doPauseEvent()

            # Wake ourselves again while there is work left.  When
            # paused, only pending results count; trigger events stay
            # queued.
            if not self._pause:
                if (not self.trigger_event_queue.empty() or
                        not self.result_event_queue.empty()):
                    self.wake_event.set()
            elif not self.result_event_queue.empty():
                self.wake_event.set()

            if self._maintain_trigger_cache:
                self.maintainTriggerCache()
                self._maintain_trigger_cache = False

        except:
            # Deliberately catches everything so that one failing pass
            # is logged and the loop keeps going.
            self.log.exception("Exception in run handler:")
686
def maintainTriggerCache(self):
    """Tell every trigger which changes are still worth caching.

    The relevant set is each enqueued item's change plus whatever that
    change's ``getRelatedChanges()`` returns, across all pipelines.
    """
    keep = set()
    for pipeline in self.layout.pipelines.values():
        self.log.debug("Start maintain trigger cache for: %s" % pipeline)
        for item in pipeline.getAllItems():
            change = item.change
            keep.add(change)
            for related in change.getRelatedChanges():
                keep.add(related)
        self.log.debug("End maintain trigger cache for: %s" % pipeline)
    self.log.debug("Trigger cache size: %s" % len(keep))
    for trigger in self.triggers.values():
        trigger.maintainCache(keep)
James E. Blair0e933c52013-07-11 10:18:52 -0700698
def process_event_queue(self):
    """Take one trigger event off the queue and offer it to every pipeline.

    The event's project is looked up in the layout; unknown projects
    are logged and dropped.  For each pipeline the event's change is
    fetched, older versions are removed on 'patchset-created', the
    change is added if the pipeline's manager says the event matches,
    and the pipeline queue is processed until it stops changing.

    ``task_done()`` is always called for the fetched event, even when
    processing raises, so the queue's unfinished-task count stays
    balanced.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    try:
        self.log.debug("Processing trigger event %s" % event)
        project = self.layout.projects.get(event.project_name)
        if not project:
            self.log.warning("Project %s not found" % event.project_name)
            return

        # Preprocessing for ref-update events
        if event.ref:
            # Make sure the local git repo is up-to-date with the remote
            # one.  We better have the new ref before enqueuing the
            # changes.  This is done before enqueuing the changes to
            # avoid calling an update per pipeline accepting the change.
            self.log.info("Fetching references for %s" % project)
            self.merger.updateRepo(project)

        for pipeline in self.layout.pipelines.values():
            change = event.getChange(project,
                                     self.triggers.get(event.trigger_name))
            if event.type == 'patchset-created':
                pipeline.manager.removeOldVersionsOfChange(change)
            if pipeline.manager.eventMatches(event, change):
                self.log.info("Adding %s, %s to %s" %
                              (project, change, pipeline))
                pipeline.manager.addChange(change)
            while pipeline.manager.processQueue():
                pass
    finally:
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700731
def process_management_queue(self):
    """Take one management event off the queue and run its handler.

    Reconfigure and promote events go to the matching ``_do...Event``
    method; any other event is logged as unhandled.  ``event.done()``
    is called when nothing raises; otherwise the exception and its
    traceback are handed to ``event.exception()``.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        if isinstance(event, ReconfigureEvent):
            handler = self._doReconfigureEvent
        elif isinstance(event, PromoteEvent):
            handler = self._doPromoteEvent
        else:
            handler = None
        if handler is None:
            self.log.error("Unable to handle event %s" % event)
        else:
            handler(event)
        event.done()
    except Exception as err:
        event.exception(err, sys.exc_info()[2])
    self.management_event_queue.task_done()
747
def process_result_queue(self):
    """Take one build result event off the queue and route it.

    The event is an ``(event_type, build)`` pair.  Each pipeline's
    manager is asked in turn to handle a 'started' or 'completed'
    build; the first manager that returns a true value ends the
    search.  If none claims the build a warning is logged.

    ``task_done()`` is always called for the fetched event, even when
    a manager raises, so the queue's unfinished-task count stays
    balanced.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    try:
        event_type, build = event
        self.log.debug("Processing result event %s" % build)
        for pipeline in self.layout.pipelines.values():
            if event_type == 'started':
                if pipeline.manager.onBuildStarted(build):
                    return
            elif event_type == 'completed':
                if pipeline.manager.onBuildCompleted(build):
                    return
        self.log.warning("Build %s not found by any queue manager" % (build))
    finally:
        self.result_event_queue.task_done()
James E. Blairee743612012-05-29 14:49:32 -0700763
def formatStatusHTML(self):
    """Render scheduler and pipeline status as a small HTML document.

    Includes a queue-only-mode banner while paused, the last
    reconfiguration value when set, and one underlined section per
    pipeline containing that pipeline's own ``formatStatusHTML()``.
    """
    parts = ['<html><pre>']
    if self._pause:
        parts.append('<p><b>Queue only mode:</b> preparing to ')
        if self._exit:
            parts.append('exit')
        parts.append(', queue length: %s' %
                     self.trigger_event_queue.qsize())
        parts.append('</p>')

    if self.last_reconfigured:
        parts.append('<p>Last reconfigured: %s</p>' %
                     self.last_reconfigured)

    for pipeline in self.layout.pipelines.values():
        heading = 'Pipeline: %s' % pipeline.name
        parts.extend([heading, '\n',
                      '-' * len(heading), '\n',
                      pipeline.formatStatusHTML(), '\n'])
    parts.append('</pre></html>')
    return ''.join(parts)
786
def formatStatusJSON(self):
    """Return scheduler and pipeline status serialized as a JSON string.

    Keys: ``zuul_version``, ``message`` (only while paused),
    ``trigger_event_queue`` / ``result_event_queue`` lengths,
    ``last_reconfigured`` (the stored value times 1000, only when set)
    and ``pipelines`` (each pipeline's ``formatStatusJSON()`` result).
    """
    status = {'zuul_version': self.zuul_version}

    if self._pause:
        pieces = ['<p><b>Queue only mode:</b> preparing to ']
        if self._exit:
            pieces.append('exit')
        pieces.append(', queue length: %s' %
                      self.trigger_event_queue.qsize())
        pieces.append('</p>')
        status['message'] = ''.join(pieces)

    status['trigger_event_queue'] = {
        'length': self.trigger_event_queue.qsize()}
    status['result_event_queue'] = {
        'length': self.result_event_queue.qsize()}

    if self.last_reconfigured:
        status['last_reconfigured'] = self.last_reconfigured * 1000

    status['pipelines'] = [pipeline.formatStatusJSON()
                           for pipeline in self.layout.pipelines.values()]
    return json.dumps(status)
817
James E. Blair1e8dd892012-05-30 09:15:05 -0700818
James E. Blair4aea70c2012-07-26 14:23:24 -0700819class BasePipelineManager(object):
820 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700821
def __init__(self, sched, pipeline):
    """Bind the manager to its scheduler and pipeline.

    ``report_times`` is read from the scheduler config option
    ``[zuul] report_times`` when a config with that option exists,
    and defaults to True otherwise.
    """
    self.sched = sched
    self.pipeline = pipeline
    self.building_jobs = {}
    self.event_filters = []
    report_times = True
    config = self.sched.config
    if config and config.has_option('zuul', 'report_times'):
        report_times = config.getboolean('zuul', 'report_times')
    self.report_times = report_times
James E. Blairee743612012-05-29 14:49:32 -0700833
def __str__(self):
    """Return ``<ClassName pipeline-name>`` for log messages."""
    kind = self.__class__.__name__
    return "<%s %s>" % (kind, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700836
def _postConfig(self, layout):
    """Log a summary of this pipeline's configuration.

    Writes the event filters, the job tree of every project in
    ``layout`` that has one in this pipeline, and the start, success
    and failure actions to the manager's logger.
    """
    self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
    self.log.info(" Events:")
    for event_filter in self.event_filters:
        self.log.info(" %s" % event_filter)
    self.log.info(" Projects:")

    def log_jobs(tree, indent=0):
        # One line per job, indented by depth, followed by its subtrees.
        prefix = ' ' + ' ' * indent
        job = tree.job
        if job:
            matchers = ''.join([str(b) for b in job._branches] +
                               [str(f) for f in job._files])
            if matchers:
                matchers = ' ' + matchers
            hold = ' [hold]' if job.hold_following_changes else ''
            voting = '' if job.voting else ' [nonvoting]'
            self.log.info("%s%s%s%s%s" % (prefix, repr(job),
                                          matchers, hold, voting))
        for subtree in tree.job_trees:
            log_jobs(subtree, indent + 2)

    for project in layout.projects.values():
        tree = self.pipeline.getJobTree(project)
        if tree:
            self.log.info(" %s" % project)
            log_jobs(tree)

    for heading, attr in ((" On start:", 'start_actions'),
                          (" On success:", 'success_actions'),
                          (" On failure:", 'failure_actions')):
        self.log.info(heading)
        self.log.info(" %s" % getattr(self.pipeline, attr))
James E. Blairee743612012-05-29 14:49:32 -0700876
def getSubmitAllowNeeds(self):
    """Return the set of review labels this queue may still need.

    These are the code review labels that are allowed to be "needed"
    in the submit records for a change, with respect to this queue --
    in other words, the labels this queue itself is likely to set
    before submitting.  Collected from every success action reporter.
    """
    return {need
            for reporter in self.pipeline.success_actions
            for need in reporter.getSubmitAllowNeeds()}
James E. Blaire421a232012-07-25 16:59:21 -0700886
def eventMatches(self, event, change):
    """Decide whether ``event`` should enqueue ``change`` in this pipeline.

    An event with ``forced_pipeline`` set matches only the pipeline of
    that name; otherwise it matches when any event filter accepts it.
    """
    forced = event.forced_pipeline
    if forced:
        return forced == self.pipeline.name
    return any(ef.matches(event, change) for ef in self.event_filters)
897
def isChangeAlreadyInQueue(self, change):
    """Return True if a change equal to ``change`` is already enqueued."""
    queued = self.pipeline.getChangesInQueue()
    return any(change.equals(other) for other in queued)
903
def reportStart(self, change):
    """Send the "jobs are starting" report for ``change``.

    The message names the pipeline and, when the ``[zuul] status_url``
    option is configured, appends that URL on its own line.  Any
    error is logged rather than raised.
    """
    try:
        actions = self.pipeline.start_actions
        self.log.info("Reporting start, action %s change %s" %
                      (actions, change))
        lines = ["Starting %s jobs." % self.pipeline.name]
        if self.sched.config.has_option('zuul', 'status_url'):
            lines.append(self.sched.config.get('zuul', 'status_url'))
        errors = self.sendReport(self.pipeline.start_actions,
                                 change, "\n".join(lines))
        if errors:
            self.log.error("Reporting change start %s received: %s" %
                           (change, errors))
    except:
        self.log.exception("Exception while reporting start:")
918
def sendReport(self, action_reporters, change, message):
    """Pass ``change`` and ``message`` to each configured reporter.

    Returns the list of truthy values returned by the reporters'
    ``report()`` calls (treated as errors), or None when there were
    none.
    """
    failures = []
    if len(action_reporters) > 0:
        outcomes = [reporter.report(change, message)
                    for reporter in action_reporters]
        failures = [outcome for outcome in outcomes if outcome]
    if failures:
        return failures
    return None
934
def isChangeReadyToBeEnqueued(self, change):
    """Base implementation: every change is considered ready."""
    return True
937
def enqueueChangesAhead(self, change, quiet):
    """Base implementation: nothing to enqueue ahead; reports success."""
    return True
940
def enqueueChangesBehind(self, change, quiet):
    """Base implementation: nothing to enqueue behind; reports success."""
    return True
943
def checkForChangesNeededBy(self, change):
    """Base implementation: the change never needs other changes."""
    return True
946
def getFailingDependentItem(self, item):
    """Base implementation: no item is blocked by a failing dependency."""
    return None
949
def getDependentItems(self, item):
    """Return the items ahead of ``item``, nearest first.

    Follows the ``item_ahead`` links until one is falsy and logs the
    resulting chain of changes.
    """
    ahead = []
    cursor = item.item_ahead
    while cursor:
        ahead.append(cursor)
        cursor = cursor.item_ahead
    self.log.info("Change %s depends on changes %s" %
                  (item.change,
                   [entry.change for entry in ahead]))
    return ahead
960
def getItemForChange(self, change):
    """Return the first pipeline item whose change equals ``change``.

    Returns None when no item matches.
    """
    candidates = (entry for entry in self.pipeline.getAllItems()
                  if entry.change.equals(change))
    return next(candidates, None)
966
def findOldVersionOfChangeAlreadyInQueue(self, change):
    """Return the first queued change that ``change`` is an update of.

    Returns None when no queued change qualifies.
    """
    older = (queued for queued in self.pipeline.getChangesInQueue()
             if change.isUpdateOf(queued))
    return next(older, None)
972
def removeOldVersionsOfChange(self, change):
    """Drop a queued older version of ``change``, if the pipeline asks.

    Does nothing unless the pipeline has ``dequeue_on_new_patchset``
    set and an older version is found in the queue.
    """
    if self.pipeline.dequeue_on_new_patchset:
        stale = self.findOldVersionOfChangeAlreadyInQueue(change)
        if stale:
            self.log.debug("Change %s is a new version of %s, removing %s" %
                           (change, stale, stale))
            self.removeChange(stale)
James E. Blair2fa50962013-01-30 21:50:41 -0800981
def reEnqueueItem(self, item):
    """Put an existing item back into the queue for its project.

    Returns True when a change queue was found and the item enqueued,
    False (after logging an error) when the project has no queue.
    """
    change_queue = self.pipeline.getQueue(item.change.project)
    if not change_queue:
        self.log.error("Unable to find change queue for project %s" %
                       item.change.project)
        return False
    self.log.debug("Re-enqueing change %s in queue %s" %
                   (item.change, change_queue))
    change_queue.enqueueItem(item)
    self.reportStats(item)
    return True
994
def addChange(self, change, quiet=False, enqueue_time=None):
    """Try to enqueue ``change`` in this pipeline.

    :param change: the change to enqueue.
    :param quiet: when true, the start report is not sent.
    :param enqueue_time: when truthy, overrides the new item's
        ``enqueue_time``.
    :returns: True if the change is in the queue when this returns
        (newly added or already present); False if it was not ready,
        changes ahead of it could not be enqueued, or its project has
        no change queue.
    """
    self.log.debug("Considering adding change %s" % change)
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring" % change)
        return True

    if not self.isChangeReadyToBeEnqueued(change):
        self.log.debug("Change %s is not ready to be enqueued, ignoring" %
                       change)
        return False

    if not self.enqueueChangesAhead(change, quiet):
        self.log.debug("Failed to enqueue changes ahead of %s" % change)
        return False

    # Checked a second time, after enqueueChangesAhead() has run.
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring" % change)
        return True

    change_queue = self.pipeline.getQueue(change.project)
    if not change_queue:
        self.log.error("Unable to find change queue for project %s" %
                       change.project)
        return False

    self.log.debug("Adding change %s to queue %s" %
                   (change, change_queue))
    if not quiet:
        if len(self.pipeline.start_actions) > 0:
            self.reportStart(change)
    item = change_queue.enqueueChange(change)
    if enqueue_time:
        item.enqueue_time = enqueue_time
    self.reportStats(item)
    self.enqueueChangesBehind(change, quiet)
    # Report success explicitly; previously this path fell off the end
    # and returned None, which callers testing the result would read
    # as a failure.
    return True
James E. Blairee743612012-05-29 14:49:32 -07001030
def dequeueItem(self, item):
    """Remove ``item`` from its project's change queue.

    Also flags the scheduler to maintain its trigger cache.
    """
    change = item.change
    self.log.debug("Removing change %s from queue" % change)
    self.pipeline.getQueue(item.change.project).dequeueItem(item)
    self.sched._maintain_trigger_cache = True
James E. Blair2fa50962013-01-30 21:50:41 -08001036
def removeChange(self, change):
    """Remove every item holding ``change`` from the pipeline.

    Used when a change leaves the queue, probably because it has been
    superseded by another change.  The item's jobs are canceled, the
    item is dequeued and stats are reported for it.
    """
    for item in self.pipeline.getAllItems():
        if not (item.change == change):
            continue
        self.log.debug("Canceling builds behind change: %s "
                       "because it is being removed." % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.reportStats(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001047
def prepareRef(self, item):
    """Merge ``item`` with the items ahead of it, if not already done.

    Returns False on success (or when there is nothing to prepare).
    Returns True if we were unable to prepare the ref, in which case
    the pipeline is told the item is unable to merge.
    """
    build_set = item.current_build_set
    if not hasattr(item.change, 'refspec') or build_set.ref:
        return False

    self.log.debug("Preparing ref for: %s" % item.change)
    item.current_build_set.setConfiguration()
    ref = item.current_build_set.ref
    # Furthest-ahead item first, this item last.
    ahead = self.getDependentItems(item)
    all_items = list(reversed(ahead)) + [item]
    commit = self.sched.merger.mergeChanges(all_items, ref)
    item.current_build_set.commit = commit
    if commit:
        return False

    self.log.info("Unable to merge change %s" % item.change)
    msg = ("This change was unable to be automatically merged "
           "with the current state of the repository. Please "
           "rebase your change and upload a new patchset.")
    self.pipeline.setUnableToMerge(item, msg)
    return True
1069
def _launchJobs(self, item, jobs):
    """Launch each of ``jobs`` for ``item`` and track the builds.

    A launched build is recorded in ``building_jobs`` and added to the
    item.  A failure for one job is logged and does not stop the
    remaining jobs from being launched.
    """
    self.log.debug("Launching jobs for change %s" % item.change)
    ahead = self.getDependentItems(item)
    launcher_for = self.sched
    for job in jobs:
        self.log.debug("Found job %s for change %s" % (job, item.change))
        try:
            build = launcher_for.launcher.launch(job, item,
                                                 self.pipeline, ahead)
            self.building_jobs[build] = item
            self.log.debug("Adding build %s of job %s to item %s" %
                           (build, job, item))
            item.addBuild(build)
        except:
            self.log.exception("Exception while launching job %s "
                               "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001086
def launchJobs(self, item):
    """Launch whatever jobs the pipeline says ``item`` should run now.

    Always returns None, whether or not anything was launched.
    """
    runnable = self.pipeline.findJobsToRun(item)
    if not runnable:
        return
    self._launchJobs(item, runnable)
1091
def cancelJobs(self, item, prime=True):
    """Cancel running builds for ``item`` and every item behind it.

    :param item: the queue item whose builds should be canceled.
    :param prime: when true and the item's current build set has a
        ref, the item's builds are reset first.  The flag is passed
        through unchanged to the items behind.
    :returns: True if any build was canceled for this item or for an
        item behind it, else False.
    """
    self.log.debug("Cancel jobs for change %s" % item.change)
    if prime and item.current_build_set.ref:
        item.resetAllBuilds()

    doomed = [build for build, owner in self.building_jobs.items()
              if owner == item]
    for build in doomed:
        self.log.debug("Found build %s for change %s to cancel" %
                       (build, item.change))
        try:
            self.sched.launcher.cancel(build)
        except:
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, item.change))
    canceled = len(doomed) > 0

    # Only forget the builds once all cancel requests have been sent.
    for build in doomed:
        self.log.debug("Removing build %s from running builds" % build)
        build.result = 'CANCELED'
        del self.building_jobs[build]

    for item_behind in item.items_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s" %
                       (item_behind.change, item.change))
        if self.cancelJobs(item_behind, prime=prime):
            canceled = True
    return canceled
1119
def _processOneItem(self, item, nnfi):
    """Advance a single queue item by one step.

    :param item: the queue item to process.
    :param nnfi: the nearest non-failing item seen so far in this
        queue (None if there is none yet).
    :returns: ``(changed, nnfi)`` -- whether the queue was modified,
        and the nearest non-failing item to use for the next item.
    """
    failing_reasons = []  # Reasons this item is failing
    changed = False
    ahead = item.item_ahead
    change_queue = self.pipeline.getQueue(item.change.project)

    if self.checkForChangesNeededBy(item.change) is not True:
        # It's not okay to enqueue this change, we should remove it.
        self.log.info("Dequeuing change %s because "
                      "it can no longer merge" % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.pipeline.setDequeuedNeedingChange(item)
        try:
            self.reportItem(item)
        except MergeFailure:
            pass
        return (True, nnfi)

    failing_dep = self.getFailingDependentItem(item)
    actionable = change_queue.isActionable(item)
    item.active = actionable
    if failing_dep:
        failing_reasons.append('a needed change is failing')
        self.cancelJobs(item, prime=False)
    else:
        ahead_merged = bool((ahead and ahead.change.is_merged) or
                            not change_queue.dependent)
        if ahead != nnfi and not ahead_merged:
            # Our current base is different than what we expected,
            # and it's not because our current base merged. Something
            # ahead must have failed.
            self.log.info("Resetting builds for change %s because the "
                          "item ahead, %s, is not the nearest non-failing "
                          "item, %s" % (item.change, ahead, nnfi))
            change_queue.moveItem(item, nnfi)
            changed = True
            self.cancelJobs(item)
        if actionable:
            self.prepareRef(item)
            if item.current_build_set.unable_to_merge:
                failing_reasons.append("it has a merge conflict")

    if actionable and self.launchJobs(item):
        changed = True
    if self.pipeline.didAnyJobFail(item):
        failing_reasons.append("at least one job failed")

    if (not ahead) and self.pipeline.areAllJobsComplete(item):
        # Head of the queue and finished: report it, then remove it
        # whether or not the report succeeded.
        try:
            self.reportItem(item)
        except MergeFailure:
            failing_reasons.append("it did not merge")
            for item_behind in item.items_behind:
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, failed to merge" %
                              (item_behind.change, item))
                self.cancelJobs(item_behind)
        self.dequeueItem(item)
        changed = True
    elif not failing_reasons:
        nnfi = item

    item.current_build_set.failing_reasons = failing_reasons
    if failing_reasons:
        self.log.debug("%s is a failing item because %s" %
                       (item, failing_reasons))
    return (changed, nnfi)
James E. Blairfee8d652013-06-07 08:57:52 -07001186
def processQueue(self):
    """Run one processing pass over every item in every change queue.

    Returns True if any item reported a change, so callers can keep
    calling until the queues settle.
    """
    self.log.debug("Starting queue processor: %s" % self.pipeline.name)
    changed = False
    for queue in self.pipeline.queues:
        queue_changed = False
        nnfi = None  # Nearest non-failing item
        # Walk a snapshot of the queue's item list.
        for item in list(queue.queue[:]):
            item_changed, nnfi = self._processOneItem(item, nnfi)
            if item_changed:
                queue_changed = True
            self.reportStats(item)
        if not queue_changed:
            continue
        changed = True
        status = ''.join([self.pipeline.formatStatus(entry)
                          for entry in queue.queue])
        if status:
            self.log.debug("Queue %s status is now:\n %s" %
                           (queue.name, status))
    self.log.debug("Finished queue processor: %s (changed: %s)" %
                   (self.pipeline.name, changed))
    return changed
James E. Blairdaabed22012-08-15 15:38:57 -07001210
def updateBuildDescriptions(self, build_set):
    """Refresh the launcher description of every build in ``build_set``.

    Builds of the previous build set, when there is one, are refreshed
    as well, after the current ones.
    """
    def refresh(builds):
        for build in builds:
            self.sched.launcher.setBuildDescription(
                build, self.formatDescription(build))

    refresh(build_set.getBuilds())
    if build_set.previous_build_set:
        refresh(build_set.previous_build_set.getBuilds())
1220
def onBuildStarted(self, build):
    """Handle a 'build started' result for a build this manager tracks.

    Returns False when the build is not in ``building_jobs``, else
    refreshes build descriptions, processes the queue until it
    settles, and returns True.
    """
    if build not in self.building_jobs:
        # Or triggered externally, or triggered before zuul started,
        # or restarted
        return False

    self.log.debug("Build %s started" % build)
    self.updateBuildDescriptions(build.build_set)
    while True:
        if not self.processQueue():
            break
    return True
1232
def onBuildCompleted(self, build):
    """Handle a 'build completed' result for a build this manager tracks.

    Returns False when the build is not in ``building_jobs``.
    Otherwise the build is forgotten, its result recorded on the
    pipeline, descriptions refreshed, the queue processed until it
    settles, and True returned.
    """
    if build not in self.building_jobs:
        # Or triggered externally, or triggered before zuul started,
        # or restarted
        return False

    self.log.debug("Build %s completed" % build)
    owner = self.building_jobs[build]
    self.log.debug("Found change %s which triggered completed build %s" %
                   (owner, build))

    del self.building_jobs[build]

    self.pipeline.setResult(owner, build)
    self.log.debug("Change %s status is now:\n %s" %
                   (owner, self.pipeline.formatStatus(owner)))
    self.updateBuildDescriptions(build.build_set)
    while True:
        if not self.processQueue():
            break
    return True
1253
def reportItem(self, item):
    """Report ``item`` and, for merging pipelines, verify the outcome.

    When ``changes_merge`` is set, the change queue's window is grown
    after a change that both succeeded and merged, and shrunk
    otherwise.

    :raises Exception: if the item has already been reported.
    :raises MergeFailure: if ``changes_merge`` is set and the item
        did not both succeed and merge.
    """
    if item.reported:
        raise Exception("Already reported change %s" % item.change)
    ret = self._reportItem(item)
    if not self.changes_merge:
        return

    succeeded = self.pipeline.didAllJobsSucceed(item)
    # Only ask the trigger when the report itself returned no error.
    merged = (not ret) and self.pipeline.trigger.isMerged(
        item.change, item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))
    change_queue = self.pipeline.getQueue(item.change.project)
    if succeeded and merged:
        change_queue.increaseWindowSize()
        self.log.debug("%s window size increased to %s" %
                       (change_queue, change_queue.window))
        return

    self.log.debug("Reported change %s failed tests or failed "
                   "to merge" % (item.change))
    change_queue.decreaseWindowSize()
    self.log.debug("%s window size decreased to %s" %
                   (change_queue, change_queue.window))
    raise MergeFailure("Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001278
def _reportItem(self, item):
    """Send the pipeline's configured report actions for an item.

    Picks the pipeline's success or failure actions depending on whether
    all jobs succeeded, records the matching reported result on the item,
    marks the item as reported and, if any actions are configured, sends
    the formatted report. Build descriptions for the item's current build
    set are refreshed afterwards.

    Returns:
        ``0`` if the item was already reported. Otherwise the value
        returned by ``sendReport`` (a truthy value means an error), or
        ``True`` when no actions are configured or sending raised.
    """
    if item.reported:
        return 0
    self.log.debug("Reporting change %s" % item.change)
    ret = True  # Means error as returned by trigger.report
    if self.pipeline.didAllJobsSucceed(item):
        self.log.debug("success %s %s" % (self.pipeline.success_actions,
                                          self.pipeline.failure_actions))
        actions = self.pipeline.success_actions
        item.setReportedResult('SUCCESS')
    else:
        actions = self.pipeline.failure_actions
        item.setReportedResult('FAILURE')
    # Mark the item before sending so it is never reported twice, even if
    # sending fails below.
    item.reported = True
    if actions:
        report = self.formatReport(item)
        try:
            self.log.info("Reporting change %s, actions: %s" %
                          (item.change, actions))
            ret = self.sendReport(actions, item.change, report)
            if ret:
                self.log.error("Reporting change %s received: %s" %
                               (item.change, ret))
        except Exception:
            # Only ordinary errors are absorbed here; KeyboardInterrupt
            # and SystemExit must still be able to stop the process.
            self.log.exception("Exception while reporting:")
            item.setReportedResult('ERROR')
    self.updateBuildDescriptions(item.current_build_set)
    return ret
1307
def formatReport(self, item):
    """Build the text report for a queue item.

    The report starts with the pipeline's success or failure message. It
    is followed by one of: a note that a needed change failed to merge,
    the build set's unable-to-merge message, or one line per job of the
    form ``- [name ]url : result[ in elapsed][ (non-voting)]``.

    The ``url_pattern`` and ``job_name_in_report`` options of the ``zuul``
    config section are read once per report rather than once per job.

    Returns:
        The report as a single string.
    """
    pipeline = self.pipeline
    if pipeline.didAllJobsSucceed(item):
        ret = pipeline.success_message + '\n\n'
    else:
        ret = pipeline.failure_message + '\n\n'

    if item.dequeued_needing_change:
        return ret + "This change depends on a change that failed to merge."
    if item.current_build_set.unable_to_merge_message:
        return ret + item.current_build_set.unable_to_merge_message

    config = self.sched.config
    url_pattern = None
    if config.has_option('zuul', 'url_pattern'):
        url_pattern = config.get('zuul', 'url_pattern')
    # Loop-invariant: decide once whether job names prefix each line.
    show_job_name = (config.has_option('zuul', 'job_name_in_report') and
                     config.getboolean('zuul', 'job_name_in_report'))

    lines = []
    for job in pipeline.getJobs(item.change):
        build = item.current_build_set.getBuild(job.name)
        result = build.result
        pattern = url_pattern
        # A job may override both the displayed result text and the URL
        # pattern, separately for success and failure.
        if result == 'SUCCESS':
            if job.success_message:
                result = job.success_message
            if job.success_pattern:
                pattern = job.success_pattern
        elif result == 'FAILURE':
            if job.failure_message:
                result = job.failure_message
            if job.failure_pattern:
                pattern = job.failure_pattern

        if pattern:
            url = pattern.format(change=item.change,
                                 pipeline=pipeline,
                                 job=job,
                                 build=build)
        else:
            url = build.url or job.name

        voting = '' if job.voting else ' (non-voting)'

        elapsed = ''
        if self.report_times and build.end_time and build.start_time:
            dt = int(build.end_time - build.start_time)
            m, s = divmod(dt, 60)
            h, m = divmod(m, 60)
            if h:
                elapsed = ' in %dh %02dm %02ds' % (h, m, s)
            elif m:
                elapsed = ' in %dm %02ds' % (m, s)
            else:
                elapsed = ' in %ds' % (s)

        name = job.name + ' ' if show_job_name else ''
        lines.append('- %s%s : %s%s%s\n' % (name, url, result, elapsed,
                                            voting))
    return ret + ''.join(lines)
1369
def formatDescription(self, build):
    """Render the HTML description for a build.

    The description names the change or ref that triggered the build and
    the pipeline, then lists, when present: other changes tested
    concurrently, every build in the same build set, the same job's build
    in the previous and next build sets, and the build set's result.

    Returns:
        The description as an HTML string (empty sections are omitted).
    """
    build_set = build.build_set
    concurrent_changes = ''
    concurrent_builds = ''
    other_builds = ''

    for other_change in build_set.other_changes:
        concurrent_changes += (
            '<li><a href="{change.url}"> '
            '{change.number},{change.patchset}</a></li>'
        ).format(change=other_change)

    change = build_set.item.change

    # The loop variable must not reuse the name ``build``: the lookups
    # below need the job name of the build being described, not of
    # whichever build happened to be listed last.
    for sibling in build_set.getBuilds():
        if sibling.url:
            concurrent_builds += """\
<li>
 <a href="{build.url}">
 {build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=sibling)
        else:
            concurrent_builds += """\
<li>
 {build.job.name}: {build.result}
</li>""".format(build=sibling)

    if build_set.previous_build_set:
        other_build = build_set.previous_build_set.getBuild(build.job.name)
        if other_build:
            other_builds += """\
<li>
 Preceded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    if build_set.next_build_set:
        other_build = build_set.next_build_set.getBuild(build.job.name)
        if other_build:
            other_builds += """\
<li>
 Succeeded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    result = build_set.result

    # Changes carry a number; ref updates carry a ref and revisions.
    if hasattr(change, 'number'):
        ret = """\
<p>
 Triggered by change:
 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
 Branch: <b>{change.branch}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    elif hasattr(change, 'ref'):
        # The ref is plain text here, so there is no anchor tag to close.
        ret = """\
<p>
 Triggered by reference:
 {change.ref}<br/>
 Old revision: <b>{change.oldrev}</b><br/>
 New revision: <b>{change.newrev}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    else:
        ret = ""

    if concurrent_changes:
        ret += """\
<p>
 Other changes tested concurrently with this change:
 <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
 All builds for this change set:
 <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
 Other build sets for this change:
 <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
 Reported result: <b>{result}</b>
</p>
"""

    # Name every template field explicitly instead of passing locals(),
    # so renaming a local cannot silently break a placeholder.
    return ret.format(self=self,
                      change=change,
                      concurrent_changes=concurrent_changes,
                      concurrent_builds=concurrent_builds,
                      other_builds=other_builds,
                      result=result)
1471
def reportStats(self, item):
    """Send pipeline statistics for an item to statsd, if available.

    Does nothing when the module-level ``statsd`` object is falsy. The
    gauge of current items is always updated; the resident-time timer and
    the total-changes counter are only sent when the item has a dequeue
    time, both for the pipeline as a whole and for the item's project.

    Reporting is best effort: ordinary exceptions are logged and
    suppressed so that a statistics failure never affects the caller.
    """
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeuing.
        if item.dequeue_time:
            # Resident time in milliseconds.
            dt = int((item.dequeue_time - item.enqueue_time) * 1000)
        else:
            dt = None
        items = len(self.pipeline.getAllItems())

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(key + '.current_changes', items)
        if dt:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        project_name = item.change.project.name.replace('/', '.')
        key += '.%s' % project_name
        if dt:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')
    except Exception:
        # Deliberately not a bare except: interpreter-exit signals such as
        # KeyboardInterrupt and SystemExit must not be swallowed here.
        self.log.exception("Exception reporting pipeline stats")
1502
James E. Blair1e8dd892012-05-30 09:15:05 -07001503
class IndependentPipelineManager(BasePipelineManager):
    """Pipeline manager that keeps all projects in one non-dependent queue.

    ``changes_merge`` is False for this manager.
    """

    log = logging.getLogger("zuul.IndependentPipelineManager")
    changes_merge = False

    def _postConfig(self, layout):
        """Run the base post-configuration, then register the single queue.

        One ``ChangeQueue`` created with ``dependent=False`` receives every
        project of the pipeline and is added to the pipeline.
        """
        super(IndependentPipelineManager, self)._postConfig(layout)

        pipeline = self.pipeline
        shared_queue = ChangeQueue(pipeline, dependent=False)
        for member in pipeline.getProjects():
            shared_queue.addProject(member)

        pipeline.addQueue(shared_queue)
James E. Blairee743612012-05-29 14:49:32 -07001516
James E. Blair1e8dd892012-05-30 09:15:05 -07001517
class DependentPipelineManager(BasePipelineManager):
    """Pipeline manager with shared, windowed change queues.

    ``changes_merge`` is True for this manager. Projects whose queues have
    at least one job in common are combined into a shared change queue.
    """

    log = logging.getLogger("zuul.DependentPipelineManager")
    changes_merge = True

    def __init__(self, *args, **kwargs):
        super(DependentPipelineManager, self).__init__(*args, **kwargs)

    def _postConfig(self, layout):
        """Run the base post-configuration, then build the shared queues."""
        super(DependentPipelineManager, self)._postConfig(layout)
        self.buildChangeQueues()

    def buildChangeQueues(self):
        """Create one queue per project and combine those sharing jobs.

        Every queue is created with the pipeline's window settings. The
        queues are then combined repeatedly until a pass no longer reduces
        their number, and the resulting queues are added to the pipeline.
        """
        self.log.debug("Building shared change queues")
        pipeline = self.pipeline
        queues = []

        for project in pipeline.getProjects():
            queue = ChangeQueue(
                pipeline,
                window=pipeline.window,
                window_floor=pipeline.window_floor,
                window_increase_type=pipeline.window_increase_type,
                window_increase_factor=pipeline.window_increase_factor,
                window_decrease_type=pipeline.window_decrease_type,
                window_decrease_factor=pipeline.window_decrease_factor)
            queue.addProject(project)
            queues.append(queue)
            self.log.debug("Created queue: %s" % queue)

        # Keep combining until a pass leaves the number of queues unchanged.
        combined = self.combineChangeQueues(queues)
        while len(combined) != len(queues):
            queues = combined
            combined = self.combineChangeQueues(queues)

        self.log.info(" Shared change queues:")
        for queue in combined:
            self.pipeline.addQueue(queue)
            self.log.info(" %s" % queue)

    def combineChangeQueues(self, change_queues):
        """Do one pass of merging queues that have jobs in common.

        Each queue is merged into the first already-kept queue whose job
        set is not disjoint from its own; otherwise it is kept as is.

        Returns:
            The list of kept queues.
        """
        self.log.debug("Combining shared queues")
        kept = []
        for candidate in change_queues:
            for target in kept:
                if not candidate.getJobs().isdisjoint(target.getJobs()):
                    self.log.debug("Merging queue %s into %s" %
                                   (candidate, target))
                    target.mergeChangeQueue(candidate)
                    break
            else:
                # No kept queue shares a job with this one.
                self.log.debug("Keeping queue %s" % (candidate))
                kept.append(candidate)
        return kept

    def isChangeReadyToBeEnqueued(self, change):
        """Return True if the trigger says the change can merge."""
        allow_needs = self.getSubmitAllowNeeds()
        if self.pipeline.trigger.canMerge(change, allow_needs):
            return True
        self.log.debug("Change %s can not merge, ignoring" % change)
        return False

    def enqueueChangesBehind(self, change, quiet):
        """Enqueue the mergeable changes that depend on ``change``.

        Changes without a ``needed_by_changes`` attribute are ignored.
        """
        self.log.debug("Checking for changes needing %s:" % change)
        if not hasattr(change, 'needed_by_changes'):
            self.log.debug(" Changeish does not support dependencies")
            return

        ready = []
        for dependent in change.needed_by_changes:
            if self.pipeline.trigger.canMerge(dependent,
                                              self.getSubmitAllowNeeds()):
                self.log.debug(" Change %s needs %s and is ready to merge" %
                               (dependent, change))
                ready.append(dependent)
        if not ready:
            self.log.debug(" No changes need %s" % change)

        for dependent in ready:
            self.addChange(dependent, quiet)

    def enqueueChangesAhead(self, change, quiet):
        """Enqueue the change that ``change`` needs, if there is one.

        Returns:
            The boolean from ``checkForChangesNeededBy`` when it returns
            one, otherwise the result of adding the needed change.
        """
        needed = self.checkForChangesNeededBy(change)
        if needed in [True, False]:
            return needed
        self.log.debug(" Change %s must be merged ahead of %s" %
                       (needed, change))
        return self.addChange(needed, quiet)

    def checkForChangesNeededBy(self, change):
        """Decide whether ``change`` may be enqueued given its dependency.

        Returns:
            True if it is okay to proceed enqueuing the change, False if
            the change should not be enqueued, or the needed change itself
            when that change can merge but is not yet in the queue.
        """
        self.log.debug("Checking for changes needed by %s:" % change)
        if not hasattr(change, 'needs_change'):
            self.log.debug(" Changeish does not support dependencies")
            return True

        needed = change.needs_change
        if not needed:
            self.log.debug(" No changes needed")
            return True
        if needed.is_merged:
            self.log.debug(" Needed change is merged")
            return True
        if not needed.is_current_patchset:
            self.log.debug(" Needed change is not the current patchset")
            return False
        if self.isChangeAlreadyInQueue(needed):
            self.log.debug(" Needed change is already ahead in the queue")
            return True
        if not self.pipeline.trigger.canMerge(needed,
                                              self.getSubmitAllowNeeds()):
            # The needed change can't be merged.
            self.log.debug(" Change %s is needed but can not be merged" %
                           needed)
            return False
        self.log.debug(" Change %s is needed" %
                       needed)
        return needed

    def getFailingDependentItem(self, item):
        """Return the queue item this item's change needs, if it is failing.

        Returns:
            The needed item when it exists and its current build set has
            failing reasons, otherwise None.
        """
        change = item.change
        if not hasattr(change, 'needs_change'):
            return None
        if not change.needs_change:
            return None
        needs_item = self.getItemForChange(change.needs_change)
        if needs_item and needs_item.current_build_set.failing_reasons:
            return needs_item
        return None