blob: 6eca846c54462687854818d732a4e8297065beb8 [file] [log] [blame]
James E. Blairee743612012-05-29 14:49:32 -07001# Copyright 2012 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Zhongyue Luo1c860d72012-07-19 11:03:56 +080023import Queue
24import re
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080028import yaml
James E. Blairee743612012-05-29 14:49:32 -070029
James E. Blair47958382013-01-10 17:26:02 -080030import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070031import model
Joshua Hesketh1879cf72013-08-19 14:13:15 +100032from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
James E. Blair4886cc12012-07-18 15:39:41 -070033import merger
James E. Blairee743612012-05-29 14:49:32 -070034
James E. Blair71e94122012-12-24 17:53:08 -080035statsd = extras.try_import('statsd.statsd')
36
James E. Blair1e8dd892012-05-30 09:15:05 -070037
def deep_format(obj, paramdict):
    """Apply the paramdict via str.format() to all string objects found within
    the supplied obj. Lists and dicts are traversed recursively.

    Dict keys are formatted as well as dict values. Keys that have no
    ``format`` method (ints, None, ...) are kept unchanged instead of
    raising AttributeError. Any object that is not a str, list or dict is
    returned as-is. The input is never modified; new lists/dicts are built.

    Borrowed from Jenkins Job Builder project

    :arg obj: the structure to expand
    :arg dict paramdict: keyword values handed to str.format()
    :returns: the expanded copy of obj
    :raises KeyError: if a string references a name missing from paramdict
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        ret = {}
        for key, value in obj.items():
            # Keys are normally strings; anything that cannot be formatted
            # is used unchanged rather than crashing the whole expansion.
            if hasattr(key, 'format'):
                key = key.format(**paramdict)
            ret[key] = deep_format(value, paramdict)
        return ret
    return obj
58
59
class MergeFailure(Exception):
    """Dedicated exception type for merge failures.

    Adds nothing to Exception; judging by its name it lets merge problems
    be caught separately (no raise site is visible in this part of the
    file).
    """
62
63
James E. Blair468c8512013-12-06 13:27:19 -080064class ManagementEvent(object):
65 """An event that should be processed within the main queue run loop"""
66 def __init__(self):
67 self._wait_event = threading.Event()
James E. Blair36658cf2013-12-06 17:53:48 -080068 self._exception = None
69 self._traceback = None
James E. Blair468c8512013-12-06 13:27:19 -080070
James E. Blair36658cf2013-12-06 17:53:48 -080071 def exception(self, e, tb):
72 self._exception = e
73 self._traceback = tb
74 self._wait_event.set()
75
76 def done(self):
James E. Blair468c8512013-12-06 13:27:19 -080077 self._wait_event.set()
78
79 def wait(self, timeout=None):
80 self._wait_event.wait(timeout)
James E. Blair36658cf2013-12-06 17:53:48 -080081 if self._exception:
82 raise self._exception, None, self._traceback
James E. Blair468c8512013-12-06 13:27:19 -080083 return self._wait_event.is_set()
84
85
class ReconfigureEvent(ManagementEvent):
    """Request that the scheduler reconfigure itself.

    When handled, the layout is (re-)loaded from the path given in the
    configuration carried by this event.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        super(ReconfigureEvent, self).__init__()
        # Kept on the event so the handler can read the new configuration.
        self.config = config
95
96
class PromoteEvent(ManagementEvent):
    """Request that one or more changes move to the head of the queue.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, pipeline_name, change_ids):
        super(PromoteEvent, self).__init__()
        # Stored verbatim; the handler parses the "number,patchset" strings.
        self.change_ids = change_ids
        self.pipeline_name = pipeline_name
109
110
James E. Blaire9d45c32012-05-31 09:56:45 -0700111class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700112 log = logging.getLogger("zuul.Scheduler")
113
def __init__(self):
    """Create an idle scheduler thread with empty queues and registries."""
    threading.Thread.__init__(self)
    self.daemon = True

    # Synchronisation primitives: wake_event wakes the run loop and
    # layout_lock guards access to self.layout.
    self.layout_lock = threading.Lock()
    self.wake_event = threading.Event()

    # Run-loop control flags.
    self._stopped = False
    self._exit = False
    self._pause = False
    self._maintain_trigger_cache = False

    # Collaborators, filled in later by setLauncher(), registerTrigger()
    # and registerReporter(); config is assigned during reconfiguration.
    self.config = None
    self.launcher = None
    self.reporters = {}
    self.triggers = {}

    # Work queues drained by the run loop.
    self.management_event_queue = Queue.Queue()
    self.result_event_queue = Queue.Queue()
    self.trigger_event_queue = Queue.Queue()

    # Start with an empty layout until the first reconfiguration.
    self.layout = model.Layout()
James E. Blairee743612012-05-29 14:49:32 -0700132
def stop(self):
    """Ask the run loop to terminate.

    Sets the stop flag and wakes the loop; run() returns as soon as it
    wakes up and sees the flag.
    """
    self._stopped = True
    self.wake_event.set()
136
def testConfig(self, config_path):
    """Parse the layout file at config_path and return the result.

    Thin wrapper around _parseConfig(); this method does not assign the
    parsed layout to self.layout.

    :arg str config_path: path of the layout file to parse
    :returns: whatever _parseConfig() returns for that path
    """
    parsed_layout = self._parseConfig(config_path)
    return parsed_layout
James E. Blair47958382013-01-10 17:26:02 -0800139
def _parseConfig(self, config_path):
    """Read the layout file and build a new model.Layout from it.

    The file is parsed as YAML, validated with LayoutValidator, and then
    processed section by section: 'includes' (python files providing
    parameter functions), 'pipelines', 'project-templates', 'jobs' and
    'projects'. self.layout is not touched; the new layout is returned.

    :arg str config_path: path of the layout file ('~' is expanded)
    :returns: the populated model.Layout
    :raises Exception: if config_path does not exist, or if a job names a
        'parameter-function' that no included python file defines
    """
    layout = model.Layout()
    project_templates = {}

    def toList(item):
        # Normalise "missing", "single value" and "list" to a list.
        if not item:
            return []
        if isinstance(item, list):
            return item
        return [item]

    if config_path:
        config_path = os.path.expanduser(config_path)
        if not os.path.exists(config_path):
            raise Exception("Unable to read layout config file at %s" %
                            config_path)
    # Close the file as soon as it is parsed instead of leaking the
    # handle on every (re)configuration.
    # NOTE(review): yaml.load can construct arbitrary Python objects; the
    # layout is expected to be operator-controlled. Consider
    # yaml.safe_load if that assumption ever changes.
    with open(config_path) as config_file:
        data = yaml.load(config_file)

    validator = layoutvalidator.LayoutValidator()
    validator.validate(data)

    # Namespace populated by the included python files; jobs look their
    # 'parameter-function' up in here.
    config_env = {}
    for include in data.get('includes', []):
        if 'python-file' in include:
            fn = include['python-file']
            if not os.path.isabs(fn):
                # Relative includes are resolved against the layout file.
                base = os.path.dirname(config_path)
                fn = os.path.join(base, fn)
            fn = os.path.expanduser(fn)
            # NOTE(review): executes arbitrary code named by the layout.
            execfile(fn, config_env)

    for conf_pipeline in data.get('pipelines', []):
        pipeline = Pipeline(conf_pipeline['name'])
        pipeline.description = conf_pipeline.get('description')
        precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
        pipeline.precedence = precedence
        pipeline.failure_message = conf_pipeline.get('failure-message',
                                                     "Build failed.")
        pipeline.success_message = conf_pipeline.get('success-message',
                                                     "Build succeeded.")
        pipeline.dequeue_on_new_patchset = conf_pipeline.get(
            'dequeue-on-new-patchset', True)

        # Resolve the reporters configured for each pipeline action.
        action_reporters = {}
        for action in ['start', 'success', 'failure']:
            action_reporters[action] = []
            action_conf = conf_pipeline.get(action)
            if action_conf:
                for reporter_name, params in action_conf.items():
                    if reporter_name in self.reporters:
                        action_reporters[action].append(ActionReporter(
                            self.reporters[reporter_name], params))
                    else:
                        # Unknown reporters are logged and skipped.
                        self.log.error('Invalid reporter name %s' %
                                       reporter_name)
        pipeline.start_actions = action_reporters['start']
        pipeline.success_actions = action_reporters['success']
        pipeline.failure_actions = action_reporters['failure']

        # The manager class is looked up by name in this module's globals.
        manager = globals()[conf_pipeline['manager']](self, pipeline)
        pipeline.setManager(manager)
        layout.pipelines[conf_pipeline['name']] = pipeline

        # TODO: move this into triggers (may require pluggable
        # configuration)
        if 'gerrit' in conf_pipeline['trigger']:
            pipeline.trigger = self.triggers['gerrit']
            for trigger in toList(conf_pipeline['trigger']['gerrit']):
                # Merge the list of approval dicts into a single dict.
                approvals = {}
                for approval_dict in toList(trigger.get('approval')):
                    for k, v in approval_dict.items():
                        approvals[k] = v
                f = EventFilter(types=toList(trigger['event']),
                                branches=toList(trigger.get('branch')),
                                refs=toList(trigger.get('ref')),
                                approvals=approvals,
                                comment_filters=
                                toList(trigger.get('comment_filter')),
                                email_filters=
                                toList(trigger.get('email_filter')))
                manager.event_filters.append(f)
        elif 'timer' in conf_pipeline['trigger']:
            pipeline.trigger = self.triggers['timer']
            for trigger in toList(conf_pipeline['trigger']['timer']):
                f = EventFilter(types=['timer'],
                                timespecs=toList(trigger['time']))
                manager.event_filters.append(f)

    for project_template in data.get('project-templates', []):
        # Make sure the template only contains valid pipelines
        tpl = dict(
            (pipe_name, project_template.get(pipe_name))
            for pipe_name in layout.pipelines.keys()
            if pipe_name in project_template
        )
        project_templates[project_template.get('name')] = tpl

    for config_job in data.get('jobs', []):
        job = layout.getJob(config_job['name'])
        # Be careful to only set attributes explicitly present on
        # this job, to avoid squashing attributes set by a meta-job.
        m = config_job.get('failure-message', None)
        if m:
            job.failure_message = m
        m = config_job.get('success-message', None)
        if m:
            job.success_message = m
        m = config_job.get('failure-pattern', None)
        if m:
            job.failure_pattern = m
        m = config_job.get('success-pattern', None)
        if m:
            job.success_pattern = m
        m = config_job.get('hold-following-changes', False)
        if m:
            job.hold_following_changes = True
        m = config_job.get('voting', None)
        # 'voting: false' is meaningful, so only skip when it is absent.
        if m is not None:
            job.voting = m
        fname = config_job.get('parameter-function', None)
        if fname:
            func = config_env.get(fname, None)
            if not func:
                raise Exception("Unable to find function %s" % fname)
            job.parameter_function = func
        branches = toList(config_job.get('branch'))
        if branches:
            # Keep the raw patterns alongside the compiled ones.
            job._branches = branches
            job.branches = [re.compile(x) for x in branches]
        files = toList(config_job.get('files'))
        if files:
            job._files = files
            job.files = [re.compile(x) for x in files]

    def add_jobs(job_tree, config_jobs):
        # Recursively attach the configured jobs to job_tree: a dict maps
        # a parent job to its children, a str names a single job.
        for job in config_jobs:
            if isinstance(job, list):
                for x in job:
                    add_jobs(job_tree, x)
            if isinstance(job, dict):
                for parent, children in job.items():
                    parent_tree = job_tree.addJob(layout.getJob(parent))
                    add_jobs(parent_tree, children)
            if isinstance(job, str):
                job_tree.addJob(layout.getJob(job))

    for config_project in data.get('projects', []):
        project = Project(config_project['name'])

        # This is reversed due to the prepend operation below, so
        # the ultimate order is templates (in order) followed by
        # statically defined jobs.
        for requested_template in reversed(
                config_project.get('template', [])):
            # Fetch the template from 'project-templates'
            tpl = project_templates.get(
                requested_template.get('name'))
            # Expand it with the project context
            expanded = deep_format(tpl, requested_template)
            # Finally merge the expansion with whatever has been
            # already defined for this project.  Prepend our new
            # jobs to existing ones (which may have been
            # statically defined or defined by other templates).
            for pipeline in layout.pipelines.values():
                if pipeline.name in expanded:
                    config_project.update(
                        {pipeline.name: expanded[pipeline.name] +
                         config_project.get(pipeline.name, [])})
        # TODO: future enhancement -- add an option to the
        # template block to indicate that duplicate jobs should be
        # merged (especially to handle the case where they have
        # children and you want all of the children to run after a
        # single run of the parent).

        layout.projects[config_project['name']] = project
        mode = config_project.get('merge-mode', 'merge-resolve')
        project.merge_mode = model.MERGER_MAP[mode]
        for pipeline in layout.pipelines.values():
            if pipeline.name in config_project:
                job_tree = pipeline.addProject(project)
                config_jobs = config_project[pipeline.name]
                add_jobs(job_tree, config_jobs)

    # All jobs should be defined at this point, get rid of
    # metajobs so that getJob isn't doing anything weird.
    layout.metajobs = []

    for pipeline in layout.pipelines.values():
        pipeline.manager._postConfig(layout)

    return layout
James E. Blairee743612012-05-29 14:49:32 -0700332
def _setupMerger(self):
    """Create self.merger from the configuration and register projects.

    Reads the optional settings 'git_dir', 'git_user_email',
    'git_user_name' and 'push_change_refs' from the 'zuul' section and
    'sshkey' from the 'gerrit' section, builds a merger.Merger bound to
    the 'gerrit' trigger, then adds every project of the current layout
    with the git URL reported by that trigger.
    """
    config = self.config

    def option(section, name, default, boolean=False):
        # Return the configured value, or default when the option is unset.
        if not config.has_option(section, name):
            return default
        if boolean:
            return config.getboolean(section, name)
        return config.get(section, name)

    merge_root = option('zuul', 'git_dir', '/var/lib/zuul/git')
    merge_email = option('zuul', 'git_user_email', None)
    merge_name = option('zuul', 'git_user_name', None)
    push_refs = option('zuul', 'push_change_refs', False, boolean=True)
    sshkey = option('gerrit', 'sshkey', None)

    # TODO: The merger should have an upstream repo independent of
    # triggers, and then each trigger should provide a fetch
    # location.
    self.merger = merger.Merger(self.triggers['gerrit'],
                                merge_root, push_refs,
                                sshkey, merge_email, merge_name)
    for project in self.layout.projects.values():
        git_url = self.triggers['gerrit'].getGitUrl(project)
        self.merger.addProject(project, git_url)
368
def setLauncher(self, launcher):
    """Remember launcher as this scheduler's launcher.

    :arg launcher: the object to store in self.launcher
    """
    self.launcher = launcher
371
def registerTrigger(self, trigger, name=None):
    """Register trigger in self.triggers.

    :arg trigger: the trigger object to register
    :arg name: the key to register under; defaults to trigger.name when
        None (an existing entry with the same key is replaced)
    """
    key = trigger.name if name is None else name
    self.triggers[key] = trigger
James E. Blairee743612012-05-29 14:49:32 -0700376
def registerReporter(self, reporter, name=None):
    """Register reporter in self.reporters.

    :arg reporter: the reporter object to register
    :arg name: the key to register under; defaults to reporter.name when
        None (an existing entry with the same key is replaced)
    """
    key = reporter.name if name is None else name
    self.reporters[key] = reporter
381
def getProject(self, name):
    """Look up a project of the current layout by name.

    The lookup is done while holding layout_lock.

    :arg str name: the project name
    :returns: the project, or None if the layout has no such project
    """
    with self.layout_lock:
        return self.layout.projects.get(name)
390
def addEvent(self, event):
    """Queue a trigger event and wake the run loop.

    If statsd is available a 'gerrit.event.<type>' counter is bumped
    first. Stats reporting is best-effort: failures are logged and the
    event is queued regardless.

    :arg event: the trigger event; its 'type' attribute names the counter
    """
    self.log.debug("Adding trigger event: %s" % event)
    try:
        if statsd:
            statsd.incr('gerrit.event.%s' % event.type)
    except Exception:
        # Only swallow ordinary errors; KeyboardInterrupt and SystemExit
        # must not be hidden by a stats hiccup.
        self.log.exception("Exception reporting event stats")
    self.trigger_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700401
def onBuildStarted(self, build):
    """Record that build has started and notify the run loop.

    Stamps build.start_time with the current time, puts a
    ('started', build) tuple on the result queue and wakes the loop.

    :arg build: the build that started
    """
    self.log.debug("Adding start event for build: %s" % build)
    build.start_time = time.time()
    notification = ('started', build)
    self.result_event_queue.put(notification)
    self.wake_event.set()
    self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700408
def onBuildCompleted(self, build):
    """Record that build has finished and notify the run loop.

    Stamps build.end_time, reports per-job and per-pipeline statsd
    counters (plus a timing for SUCCESS/FAILURE results that have a start
    time), then puts a ('completed', build) tuple on the result queue and
    wakes the loop. Stats reporting is best-effort: failures are logged
    and the result is queued regardless.

    :arg build: the build that completed
    """
    self.log.debug("Adding complete event for build: %s" % build)
    build.end_time = time.time()
    try:
        if statsd and build.pipeline:
            # Dots would add levels to the statsd key hierarchy.
            jobname = build.job.name.replace('.', '_')
            key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
                                                  jobname, build.result)
            if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                # Duration in milliseconds.
                dt = int((build.end_time - build.start_time) * 1000)
                statsd.timing(key, dt)
            statsd.incr(key)
            key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
            statsd.incr(key)
    except Exception:
        # Only swallow ordinary errors; KeyboardInterrupt and SystemExit
        # must not be hidden by a stats hiccup.
        self.log.exception("Exception reporting runtime stats")
    self.result_event_queue.put(('completed', build))
    self.wake_event.set()
    self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700428
def reconfigure(self, config):
    """Submit a reconfiguration request and block until it is handled.

    A ReconfigureEvent is put on the management queue and the run loop is
    woken; the call returns when the event's wait() returns, and raises
    whatever exception the handler recorded on the event.

    :arg config: the new configuration to apply
    """
    self.log.debug("Prepare to reconfigure")
    request = ReconfigureEvent(config)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for reconfiguration")
    request.wait()
    self.log.debug("Reconfiguration complete")
437
def promote(self, pipeline_name, change_ids):
    """Submit a promotion request and block until it is handled.

    A PromoteEvent is put on the management queue and the run loop is
    woken; the call returns when the event's wait() returns, and raises
    whatever exception the handler recorded on the event.

    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: change ids in the form 1234,1
    """
    request = PromoteEvent(pipeline_name, change_ids)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for promotion")
    request.wait()
    self.log.debug("Promotion complete")
445
def exit(self):
    """Request a graceful exit.

    Raises the pause and exit flags and wakes the run loop. The call
    itself does not block; the loop acts on the flags once all builds
    are complete.
    """
    self.log.debug("Prepare to exit")
    self._exit = True
    self._pause = True
    self.wake_event.set()
    self.log.debug("Waiting for exit")
452
453 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700454 if self.config.has_option('zuul', 'state_dir'):
455 state_dir = os.path.expanduser(self.config.get('zuul',
456 'state_dir'))
457 else:
458 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700459 return os.path.join(state_dir, 'queue.pickle')
460
461 def _save_queue(self):
462 pickle_file = self._get_queue_pickle_file()
463 events = []
464 while not self.trigger_event_queue.empty():
465 events.append(self.trigger_event_queue.get())
466 self.log.debug("Queue length is %s" % len(events))
467 if events:
468 self.log.debug("Saving queue")
469 pickle.dump(events, open(pickle_file, 'wb'))
470
471 def _load_queue(self):
472 pickle_file = self._get_queue_pickle_file()
473 if os.path.exists(pickle_file):
474 self.log.debug("Loading queue")
475 events = pickle.load(open(pickle_file, 'rb'))
476 self.log.debug("Queue length is %s" % len(events))
477 for event in events:
478 self.trigger_event_queue.put(event)
479 else:
480 self.log.debug("No queue file found")
481
482 def _delete_queue(self):
483 pickle_file = self._get_queue_pickle_file()
484 if os.path.exists(pickle_file):
485 self.log.debug("Deleting saved queue")
486 os.unlink(pickle_file)
487
def resume(self):
    """Restore any saved trigger queue and wake the run loop.

    Loading the saved queue and deleting the saved file are each
    best-effort: an error in either step is logged and the remaining
    steps still run.
    """
    try:
        self._load_queue()
    except Exception:
        # A corrupt or unreadable queue file must not prevent startup,
        # but KeyboardInterrupt/SystemExit still propagate.
        self.log.exception("Unable to load queue")
    try:
        self._delete_queue()
    except Exception:
        self.log.exception("Unable to delete saved queue")
    self.log.debug("Resuming queue processing")
    self.wake_event.set()
498 self.wake_event.set()
499
500 def _doPauseEvent(self):
501 if self._exit:
502 self.log.debug("Exiting")
503 self._save_queue()
504 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700505
def _doReconfigureEvent(self, event):
    """Apply a ReconfigureEvent: load the new layout and carry state over.

    While holding layout_lock, the configuration carried by the event
    becomes self.config, the layout named by its 'layout_config' option
    is parsed, and for every pipeline that also existed in the old layout
    the queued items are re-enqueued into the new pipeline. Items whose
    project vanished, or that the new pipeline refuses, are dropped along
    with their running builds. Finally the new layout is installed, the
    merger is rebuilt, triggers are notified and initial pipeline gauges
    are reported to statsd.

    :arg event: the event whose 'config' attribute holds the new config
    """
    # This is called in the scheduler loop after another thread submits
    # a request
    self.layout_lock.acquire()
    self.config = event.config
    try:
        self.log.debug("Performing reconfiguration")
        layout = self._parseConfig(
            self.config.get('zuul', 'layout_config'))
        for name, new_pipeline in layout.pipelines.items():
            old_pipeline = self.layout.pipelines.get(name)
            if not old_pipeline:
                if self.layout.pipelines:
                    # Don't emit this warning on startup
                    self.log.warning("No old pipeline matching %s found "
                                     "when reconfiguring" % name)
                continue
            self.log.debug("Re-enqueueing changes for pipeline %s" %
                           name)
            items_to_remove = []
            for shared_queue in old_pipeline.queues:
                for item in shared_queue.queue:
                    # Detach the item from the old pipeline's structures
                    # before handing it to the new pipeline.
                    item.item_ahead = None
                    item.items_behind = []
                    item.pipeline = None
                    # Re-bind the change to the project object of the
                    # new layout (looked up by name).
                    project = layout.projects.get(item.change.project.name)
                    if not project:
                        self.log.warning("Unable to find project for "
                                         "change %s while reenqueueing" %
                                         item.change)
                        item.change.project = None
                        items_to_remove.append(item)
                        continue
                    item.change.project = project
                    if not new_pipeline.manager.reEnqueueItem(item):
                        items_to_remove.append(item)
            # Collect first, delete afterwards: building_jobs must not be
            # modified while it is being iterated.
            builds_to_remove = []
            for build, item in old_pipeline.manager.building_jobs.items():
                if item in items_to_remove:
                    builds_to_remove.append(build)
                    self.log.warning("Deleting running build %s for "
                                     "change %s while reenqueueing" % (
                                         build, item.change))
            for build in builds_to_remove:
                del old_pipeline.manager.building_jobs[build]
            # The new manager takes over the surviving running builds.
            new_pipeline.manager.building_jobs = \
                old_pipeline.manager.building_jobs
        self.layout = layout
        self._setupMerger()
        for trigger in self.triggers.values():
            trigger.postConfig()
        if statsd:
            try:
                for pipeline in self.layout.pipelines.values():
                    items = len(pipeline.getAllItems())
                    # stats.gauges.zuul.pipeline.NAME.current_changes
                    key = 'zuul.pipeline.%s' % pipeline.name
                    statsd.gauge(key + '.current_changes', items)
            except Exception:
                # Stats are best-effort; never fail a reconfiguration
                # because of them.
                self.log.exception("Exception reporting initial "
                                   "pipeline stats:")
    finally:
        self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700569
James E. Blair36658cf2013-12-06 17:53:48 -0800570 def _doPromoteEvent(self, event):
571 pipeline = self.layout.pipelines[event.pipeline_name]
572 change_ids = [c.split(',') for c in event.change_ids]
573 items_to_enqueue = []
574 change_queue = None
575 for shared_queue in pipeline.queues:
576 if change_queue:
577 break
578 for item in shared_queue.queue:
579 if (item.change.number == change_ids[0][0] and
580 item.change.patchset == change_ids[0][1]):
581 change_queue = shared_queue
582 break
583 if not change_queue:
584 raise Exception("Unable to find shared change queue for %s" %
585 event.change_ids[0])
586 for number, patchset in change_ids:
587 found = False
588 for item in change_queue.queue:
589 if (item.change.number == number and
590 item.change.patchset == patchset):
591 found = True
592 items_to_enqueue.append(item)
593 break
594 if not found:
595 raise Exception("Unable to find %s,%s in queue %s" %
596 (number, patchset, change_queue))
597 for item in change_queue.queue[:]:
598 if item not in items_to_enqueue:
599 items_to_enqueue.append(item)
600 pipeline.manager.cancelJobs(item)
601 pipeline.manager.dequeueItem(item)
602 for item in items_to_enqueue:
603 pipeline.manager.addChange(item.change, quiet=True)
604 while pipeline.manager.processQueue():
605 pass
606
James E. Blaire9d45c32012-05-31 09:56:45 -0700607 def _areAllBuildsComplete(self):
608 self.log.debug("Checking if all builds are complete")
609 waiting = False
James E. Blaireff88162013-07-01 12:44:14 -0400610 for pipeline in self.layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700611 for build in pipeline.manager.building_jobs.keys():
612 self.log.debug("%s waiting on %s" % (pipeline.manager, build))
James E. Blaire9d45c32012-05-31 09:56:45 -0700613 waiting = True
614 if not waiting:
615 self.log.debug("All builds are complete")
616 return True
617 self.log.debug("All builds are not complete")
618 return False
619
def run(self):
    """Main scheduler loop; returns only after stop() has been called.

    Each iteration sleeps until wake_event is set, then handles pending
    management events, then one kind of work: result events take
    priority, and trigger events are only processed while not paused.
    Any exception raised while handling an iteration is logged and the
    loop continues.
    """
    if statsd:
        self.log.debug("Statsd enabled")
    else:
        self.log.debug("Statsd disabled because python statsd "
                       "package not found")
    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            return
        self.log.debug("Run handler awake")
        try:
            if not self.management_event_queue.empty():
                self.process_management_queue()

            # Give result events priority -- they let us stop builds,
            # whereas trigger events cause us to launch builds.
            if not self.result_event_queue.empty():
                self.process_result_queue()
            elif not self._pause:
                if not self.trigger_event_queue.empty():
                    self.process_event_queue()

            # While paused, act once every running build has finished.
            if self._pause and self._areAllBuildsComplete():
                self._doPauseEvent()

            # Re-arm the wake event if work remains so the next
            # iteration does not sleep; while paused only result events
            # count as pending work.
            if not self._pause:
                if not (self.trigger_event_queue.empty() and
                        self.result_event_queue.empty()):
                    self.wake_event.set()
            else:
                if not self.result_event_queue.empty():
                    self.wake_event.set()

            if self._maintain_trigger_cache:
                self.maintainTriggerCache()
                self._maintain_trigger_cache = False

        except:
            # Keep the scheduler thread alive whatever went wrong.
            self.log.exception("Exception in run handler:")
662
def maintainTriggerCache(self):
    """Tell every trigger which changes are still relevant.

    The relevant set is the change of every item in every pipeline plus
    whatever each change's getRelatedChanges() returns; it is passed to
    maintainCache() of each registered trigger.
    """
    in_use = set()
    for pipeline in self.layout.pipelines.values():
        self.log.debug("Start maintain trigger cache for: %s" % pipeline)
        for item in pipeline.getAllItems():
            change = item.change
            in_use.add(change)
            in_use.update(change.getRelatedChanges())
        self.log.debug("End maintain trigger cache for: %s" % pipeline)
    self.log.debug("Trigger cache size: %s" % len(in_use))
    for trigger in self.triggers.values():
        trigger.maintainCache(in_use)
James E. Blair0e933c52013-07-11 10:18:52 -0700674
def process_event_queue(self):
    """Take one trigger event off the queue and offer it to every pipeline.

    The event's project is looked up in the current layout; an unknown
    project is logged and the event is dropped.  Otherwise each pipeline
    gets a chance to remove superseded versions of the change, enqueue
    the change if the event matches, and then process its queue until
    nothing changes any more.

    task_done() is called exactly once for the fetched event, even when
    processing raises, so the queue's unfinished-task accounting (and any
    join() on it) cannot be left hanging by a failing event.  The
    exception itself still propagates to the caller.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    try:
        self.log.debug("Processing trigger event %s" % event)
        project = self.layout.projects.get(event.project_name)
        if not project:
            self.log.warning("Project %s not found" % event.project_name)
            return

        # Preprocessing for ref-update events
        if event.ref:
            # Make sure the local git repo is up-to-date with the remote
            # one.  We better have the new ref before enqueuing the
            # changes.  This is done before enqueuing the changes to
            # avoid calling an update per pipeline accepting the change.
            self.log.info("Fetching references for %s" % project)
            self.merger.updateRepo(project)

        for pipeline in self.layout.pipelines.values():
            change = event.getChange(project,
                                     self.triggers.get(event.trigger_name))
            if event.type == 'patchset-created':
                pipeline.manager.removeOldVersionsOfChange(change)
            if pipeline.manager.eventMatches(event):
                self.log.info("Adding %s, %s to %s" %
                              (project, change, pipeline))
                pipeline.manager.addChange(change)
            while pipeline.manager.processQueue():
                pass
    finally:
        # Always balance the get() above.
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700707
def process_management_queue(self):
    """Take one management event off the queue and dispatch it.

    ReconfigureEvent and PromoteEvent instances go to their handlers;
    anything else is logged as an error.  event.done() is called when no
    exception was raised; otherwise the exception and its traceback are
    passed to event.exception().  task_done() is called afterwards.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        if isinstance(event, ReconfigureEvent):
            handler = self._doReconfigureEvent
        elif isinstance(event, PromoteEvent):
            handler = self._doPromoteEvent
        else:
            handler = None
        if handler is None:
            self.log.error("Unable to handle event %s" % event)
        else:
            handler(event)
        event.done()
    except Exception as err:
        traceback = sys.exc_info()[2]
        event.exception(err, traceback)
    self.management_event_queue.task_done()
723
def process_result_queue(self):
    """Take one (event_type, build) result off the queue and route it.

    Each pipeline manager of the current layout is asked in turn, via
    onBuildStarted() for 'started' events and onBuildCompleted() for
    'completed' events; the first manager returning a true value ends
    the search.  If no manager claims the build a warning is logged.

    task_done() is called exactly once for the fetched event, even when
    a manager raises, so the queue's unfinished-task accounting stays
    balanced.  The exception itself still propagates to the caller.
    """
    self.log.debug("Fetching result event")
    event_type, build = self.result_event_queue.get()
    try:
        self.log.debug("Processing result event %s" % build)
        for pipeline in self.layout.pipelines.values():
            if event_type == 'started':
                handled = pipeline.manager.onBuildStarted(build)
            elif event_type == 'completed':
                handled = pipeline.manager.onBuildCompleted(build)
            else:
                # Unknown event types are not offered to any manager.
                handled = False
            if handled:
                return
        self.log.warning("Build %s not found by any queue manager" % (build))
    finally:
        # Always balance the get() above.
        self.result_event_queue.task_done()
James E. Blairee743612012-05-29 14:49:32 -0700739
def formatStatusHTML(self):
    """Return an HTML page (one <pre> block) with every pipeline's status.

    When the scheduler is paused a "Queue only mode" paragraph with the
    trigger queue length comes first; the word "exit" is only inserted
    when an exit is pending.  Each pipeline then contributes an
    underlined heading followed by its own formatStatusHTML() output.
    """
    pieces = ['<html><pre>']
    if self._pause:
        pieces.append('<p><b>Queue only mode:</b> preparing to ')
        if self._exit:
            pieces.append('exit')
        pieces.append(', queue length: %s' % self.trigger_event_queue.qsize())
        pieces.append('</p>')

    for key in self.layout.pipelines.keys():
        pipeline = self.layout.pipelines[key]
        heading = 'Pipeline: %s' % pipeline.name
        pieces.append(heading + '\n')
        pieces.append('-' * len(heading) + '\n')
        pieces.append(pipeline.formatStatusHTML())
        pieces.append('\n')
    pieces.append('</pre></html>')
    return ''.join(pieces)
759
def formatStatusJSON(self):
    """Return the scheduler status serialized as a JSON string.

    The document holds the lengths of the trigger and result event
    queues, a 'pipelines' list with each pipeline's formatStatusJSON()
    value and, only while paused, a 'message' with the queue-only-mode
    notice.
    """
    data = {}
    if self._pause:
        message = '<p><b>Queue only mode:</b> preparing to '
        if self._exit:
            message += 'exit'
        message += ', queue length: %s' % self.trigger_event_queue.qsize()
        message += '</p>'
        data['message'] = message

    data['trigger_event_queue'] = {
        'length': self.trigger_event_queue.qsize(),
    }
    data['result_event_queue'] = {
        'length': self.result_event_queue.qsize(),
    }

    data['pipelines'] = [self.layout.pipelines[key].formatStatusJSON()
                         for key in self.layout.pipelines.keys()]
    return json.dumps(data)
784
James E. Blair1e8dd892012-05-30 09:15:05 -0700785
James E. Blair4aea70c2012-07-26 14:23:24 -0700786class BasePipelineManager(object):
787 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700788
def __init__(self, sched, pipeline):
    """Bind the manager to its scheduler and pipeline.

    report_times is taken from the [zuul] report_times option when the
    scheduler has a config that defines it, and defaults to True
    otherwise.
    """
    self.sched = sched
    self.pipeline = pipeline
    self.building_jobs = {}
    self.event_filters = []
    config = self.sched.config
    if config and config.has_option('zuul', 'report_times'):
        self.report_times = config.getboolean('zuul', 'report_times')
    else:
        self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -0700800
def __str__(self):
    """Return "<ClassName pipeline-name>"."""
    class_name = self.__class__.__name__
    pipeline_name = self.pipeline.name
    return "<%s %s>" % (class_name, pipeline_name)
James E. Blairee743612012-05-29 14:49:32 -0700803
def _postConfig(self, layout):
    """Log a summary of this pipeline manager's configuration.

    Writes, at info level, the event filters, the job tree of every
    project in *layout* that has one in this pipeline, and the start,
    success and failure actions.
    """
    self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
    self.log.info(" Events:")
    for event_filter in self.event_filters:
        self.log.info(" %s" % event_filter)
    self.log.info(" Projects:")

    def log_jobs(tree, indent=0):
        # One line per job, then recurse into the child trees with a
        # deeper indent.
        job = tree.job
        if job:
            filters = ''.join([str(b) for b in job._branches])
            filters += ''.join([str(f) for f in job._files])
            if filters:
                filters = ' ' + filters
            hold = ' [hold]' if job.hold_following_changes else ''
            voting = '' if job.voting else ' [nonvoting]'
            prefix = ' ' + ' ' * indent
            self.log.info("%s%s%s%s%s" % (prefix, repr(job),
                                          filters, hold, voting))
        for subtree in tree.job_trees:
            log_jobs(subtree, indent + 2)

    for project in layout.projects.values():
        tree = self.pipeline.getJobTree(project)
        if tree:
            self.log.info(" %s" % project)
            log_jobs(tree)
    self.log.info(" On start:")
    self.log.info(" %s" % self.pipeline.start_actions)
    self.log.info(" On success:")
    self.log.info(" %s" % self.pipeline.success_actions)
    self.log.info(" On failure:")
    self.log.info(" %s" % self.pipeline.failure_actions)
James E. Blairee743612012-05-29 14:49:32 -0700843
def getSubmitAllowNeeds(self):
    """Return the set of review labels this queue may itself supply.

    These are the code review labels that are allowed to be "needed" in
    the submit records for a change, with respect to this queue; in
    other words, the labels this queue itself is likely to set before
    submitting.  The set is the union of getSubmitAllowNeeds() over the
    pipeline's success actions.
    """
    per_reporter = [reporter.getSubmitAllowNeeds()
                    for reporter in self.pipeline.success_actions]
    return set().union(*per_reporter)
James E. Blaire421a232012-07-25 16:59:21 -0700853
def eventMatches(self, event):
    """Return True if *event* should be handled by this pipeline.

    An event carrying a forced_pipeline matches only the pipeline of
    that name; otherwise it matches when any event filter accepts it.
    """
    forced = event.forced_pipeline
    if forced:
        return forced == self.pipeline.name
    return any(ef.matches(event) for ef in self.event_filters)
864
def isChangeAlreadyInQueue(self, change):
    """Return True if a change equal to *change* is in this pipeline."""
    return any(change.equals(queued)
               for queued in self.pipeline.getChangesInQueue())
870
def reportStart(self, change):
    """Send the "Starting <pipeline> jobs." report for *change*.

    The [zuul] status_url option, when set, is appended on its own line.
    Errors returned by sendReport() are logged; any exception raised
    along the way is logged and swallowed.
    """
    try:
        actions = self.pipeline.start_actions
        self.log.info("Reporting start, action %s change %s" %
                      (actions, change))
        lines = ["Starting %s jobs." % self.pipeline.name]
        config = self.sched.config
        if config.has_option('zuul', 'status_url'):
            lines.append(config.get('zuul', 'status_url'))
        errors = self.sendReport(actions, change, "\n".join(lines))
        if errors:
            self.log.error("Reporting change start %s received: %s" %
                           (change, errors))
    except:
        self.log.exception("Exception while reporting start:")
885
def sendReport(self, action_reporters, change, message):
    """Sends the built message off to configured reporters.

    Calls report(change, message) on each reporter and collects every
    true return value as an error.  Returns the list of errors, or None
    when every reporter succeeded or the change has no number (in which
    case nothing is sent).  With no reporters an empty list is returned.
    """
    if not len(action_reporters) > 0:
        return []
    if not change.number:
        self.log.debug("Not reporting change %s: No number present."
                       % change)
        return None
    outcomes = [reporter.report(change, message)
                for reporter in action_reporters]
    report_errors = [outcome for outcome in outcomes if outcome]
    if not report_errors:
        return None
    return report_errors
905
def isChangeReadyToBeEnqueued(self, change):
    """Base implementation: every change is considered ready."""
    return True
908
def enqueueChangesAhead(self, change, quiet):
    """Base implementation: nothing to enqueue ahead; reports success."""
    return True
911
def enqueueChangesBehind(self, change, quiet):
    """Base implementation: nothing to enqueue behind; reports success."""
    return True
914
def checkForChangesNeededBy(self, change):
    """Base implementation: always returns True for *change*."""
    return True
917
def getFailingDependentItem(self, item):
    """Base implementation: no failing dependent item; returns None."""
    return None
920
def getDependentItems(self, item):
    """Return the items ahead of *item*, nearest first.

    Follows the item_ahead links until one is false, and logs the
    changes of the collected items at info level.
    """
    ahead = []
    cursor = item.item_ahead
    while cursor:
        ahead.append(cursor)
        cursor = cursor.item_ahead
    self.log.info("Change %s depends on changes %s" %
                  (item.change,
                   [x.change for x in ahead]))
    return ahead
931
def getItemForChange(self, change):
    """Return the first pipeline item whose change equals *change*.

    Returns None when no item matches.
    """
    matches = (candidate for candidate in self.pipeline.getAllItems()
               if candidate.change.equals(change))
    return next(matches, None)
937
def findOldVersionOfChangeAlreadyInQueue(self, change):
    """Return the first queued change that *change* is an update of.

    Returns None when *change* updates nothing in the queue.
    """
    older = (queued for queued in self.pipeline.getChangesInQueue()
             if change.isUpdateOf(queued))
    return next(older, None)
943
def removeOldVersionsOfChange(self, change):
    """Remove a queued older version of *change*, if the pipeline wants it.

    Does nothing unless the pipeline has dequeue_on_new_patchset set.
    """
    if self.pipeline.dequeue_on_new_patchset:
        superseded = self.findOldVersionOfChangeAlreadyInQueue(change)
        if superseded:
            self.log.debug("Change %s is a new version of %s, removing %s" %
                           (change, superseded, superseded))
            self.removeChange(superseded)
James E. Blair2fa50962013-01-30 21:50:41 -0800952
def reEnqueueItem(self, item):
    """Put an existing *item* back into the queue for its project.

    Returns True on success, False (after logging an error) when the
    pipeline has no queue for the item's project.
    """
    change_queue = self.pipeline.getQueue(item.change.project)
    if not change_queue:
        self.log.error("Unable to find change queue for project %s" %
                       item.change.project)
        return False
    self.log.debug("Re-enqueing change %s in queue %s" %
                   (item.change, change_queue))
    change_queue.enqueueItem(item)
    self.reportStats(item)
    return True
965
def addChange(self, change, quiet=False):
    """Enqueue *change* in this pipeline.

    The change is skipped if it is already queued, refused if it is not
    ready or if the changes ahead of it cannot be enqueued, and
    otherwise added to the queue for its project.  Unless *quiet* is
    set, a start report is sent first when the pipeline has start
    actions.

    Returns True when the change is in the queue afterwards (newly
    added or already present) and False when it could not be enqueued.
    """
    self.log.debug("Considering adding change %s" % change)
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring" % change)
        return True

    if not self.isChangeReadyToBeEnqueued(change):
        self.log.debug("Change %s is not ready to be enqueued, ignoring" %
                       change)
        return False

    if not self.enqueueChangesAhead(change, quiet):
        self.log.debug("Failed to enqueue changes ahead of %s" % change)
        return False

    # Enqueuing the changes ahead may have pulled this change in too.
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring" % change)
        return True

    change_queue = self.pipeline.getQueue(change.project)
    if not change_queue:
        self.log.error("Unable to find change queue for project %s" %
                       change.project)
        return False

    self.log.debug("Adding change %s to queue %s" %
                   (change, change_queue))
    if not quiet:
        if len(self.pipeline.start_actions) > 0:
            self.reportStart(change)
    item = change_queue.enqueueChange(change)
    self.reportStats(item)
    self.enqueueChangesBehind(change, quiet)
    # Report success explicitly; falling off the end returned None,
    # which callers testing the result would read as a failure.
    return True
James E. Blairee743612012-05-29 14:49:32 -0700999
def dequeueItem(self, item):
    """Remove *item* from its project's queue.

    Also flags the scheduler so that it maintains the trigger cache.
    """
    change = item.change
    self.log.debug("Removing change %s from queue" % change)
    self.pipeline.getQueue(change.project).dequeueItem(item)
    self.sched._maintain_trigger_cache = True
James E. Blair2fa50962013-01-30 21:50:41 -08001005
def removeChange(self, change):
    """Remove every pipeline item whose change == *change*.

    Used to remove a change from the queue, probably because it has
    been superseded by another change.  For each matching item its jobs
    are canceled, the item is dequeued and stats are reported.
    """
    for item in self.pipeline.getAllItems():
        if not (item.change == change):
            continue
        self.log.debug("Canceling builds behind change: %s "
                       "because it is being removed." % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.reportStats(item)
James E. Blair2fa50962013-01-30 21:50:41 -08001016
def prepareRef(self, item):
    """Merge *item* and the items ahead of it into the build set's ref.

    Nothing is done when the build set already has a ref or the change
    has no refspec attribute.

    Returns False on success.
    Returns True if we were unable to prepare the ref.
    """
    build_set = item.current_build_set
    if not hasattr(item.change, 'refspec') or build_set.ref:
        return False

    self.log.debug("Preparing ref for: %s" % item.change)
    build_set.setConfiguration()
    ref = build_set.ref
    # The items ahead are returned nearest first; merge them in the
    # reverse of that order, with this item last.
    merge_order = list(reversed(self.getDependentItems(item))) + [item]
    commit = self.sched.merger.mergeChanges(merge_order, ref)
    build_set.commit = commit
    if commit:
        return False

    self.log.info("Unable to merge change %s" % item.change)
    msg = ("This change was unable to be automatically merged "
           "with the current state of the repository. Please "
           "rebase your change and upload a new patchset.")
    self.pipeline.setUnableToMerge(item, msg)
    return True
1038
def _launchJobs(self, item, jobs):
    """Launch each of *jobs* for *item* and record the resulting builds.

    Every build is registered in building_jobs and added to the item.
    A job whose launch raises is logged and skipped; the remaining jobs
    are still launched.
    """
    self.log.debug("Launching jobs for change %s" % item.change)
    items_ahead = self.getDependentItems(item)
    for job in jobs:
        self.log.debug("Found job %s for change %s" % (job, item.change))
        try:
            new_build = self.sched.launcher.launch(job, item,
                                                   self.pipeline,
                                                   items_ahead)
            self.building_jobs[new_build] = item
            self.log.debug("Adding build %s of job %s to item %s" %
                           (new_build, job, item))
            item.addBuild(new_build)
        except:
            self.log.exception("Exception while launching job %s "
                               "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -07001055
def launchJobs(self, item):
    """Launch whatever jobs the pipeline says are ready to run for *item*.

    Always returns None, whether or not any job was launched.
    """
    runnable = self.pipeline.findJobsToRun(item)
    if not runnable:
        return
    self._launchJobs(item, runnable)
1060
def cancelJobs(self, item, prime=True):
    """Cancel the running builds of *item* and of every item behind it.

    When *prime* is true and the item's build set already has a ref,
    the item's builds are reset first.  Each running build of the item
    is canceled through the launcher (failures are logged and ignored),
    marked 'CANCELED' and dropped from building_jobs.  The same is then
    done recursively for the items behind.

    Returns True if at least one build was canceled.
    """
    self.log.debug("Cancel jobs for change %s" % item.change)
    if prime and item.current_build_set.ref:
        item.resetAllBuilds()

    running = [build for build, owner in self.building_jobs.items()
               if owner == item]
    for build in running:
        self.log.debug("Found build %s for change %s to cancel" %
                       (build, item.change))
        try:
            self.sched.launcher.cancel(build)
        except:
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, item.change))
    canceled = len(running) > 0

    for build in running:
        self.log.debug("Removing build %s from running builds" % build)
        build.result = 'CANCELED'
        del self.building_jobs[build]

    for item_behind in item.items_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s" %
                       (item_behind.change, item.change))
        if self.cancelJobs(item_behind, prime=prime):
            canceled = True
    return canceled
1088
def _processOneItem(self, item, nnfi):
    """Advance a single queue item by one step.

    *nnfi* is the nearest non-failing item seen so far in this queue.
    Returns a (changed, nnfi) tuple: whether anything was changed and
    the possibly updated nearest non-failing item.
    """
    changed = False
    item_ahead = item.item_ahead
    change_queue = self.pipeline.getQueue(item.change.project)
    reasons = []  # Reasons this item is failing

    if self.checkForChangesNeededBy(item.change) is not True:
        # It's not okay to enqueue this change, we should remove it.
        self.log.info("Dequeuing change %s because "
                      "it can no longer merge" % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.pipeline.setDequeuedNeedingChange(item)
        try:
            self.reportItem(item)
        except MergeFailure:
            pass
        return (True, nnfi)

    if self.getFailingDependentItem(item):
        reasons.append('a needed change is failing')
        self.cancelJobs(item, prime=False)
    else:
        item_ahead_merged = bool(
            (item_ahead and item_ahead.change.is_merged) or
            not change_queue.dependent)
        if item_ahead != nnfi and not item_ahead_merged:
            # Our current base is different than what we expected,
            # and it's not because our current base merged. Something
            # ahead must have failed.
            self.log.info("Resetting builds for change %s because the "
                          "item ahead, %s, is not the nearest non-failing "
                          "item, %s" % (item.change, item_ahead, nnfi))
            change_queue.moveItem(item, nnfi)
            changed = True
            self.cancelJobs(item)
        self.prepareRef(item)
        if item.current_build_set.unable_to_merge:
            reasons.append("it has a merge conflict")

    if self.launchJobs(item):
        changed = True
    if self.pipeline.didAnyJobFail(item):
        reasons.append("at least one job failed")

    if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
        # Head of the queue with nothing left to run: report it and
        # take it out of the queue.
        try:
            self.reportItem(item)
        except MergeFailure:
            reasons.append("it did not merge")
            for item_behind in item.items_behind:
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, failed to merge" %
                              (item_behind.change, item))
                self.cancelJobs(item_behind)
        self.dequeueItem(item)
        changed = True
    elif not reasons:
        nnfi = item

    item.current_build_set.failing_reasons = reasons
    if reasons:
        self.log.debug("%s is a failing item because %s" %
                       (item, reasons))
    return (changed, nnfi)
James E. Blairfee8d652013-06-07 08:57:52 -07001152
def processQueue(self):
    """Process every item of every queue in the pipeline once.

    Do whatever needs to be done for each change in the queue.  Stats
    are reported for each item, and the status of a queue is logged
    when one of its items changed.

    Returns True if anything changed.
    """
    self.log.debug("Starting queue processor: %s" % self.pipeline.name)
    changed = False
    for queue in self.pipeline.queues:
        queue_changed = False
        nearest_ok = None  # Nearest non-failing item
        # Iterate over a copy of the queue's item list.
        for item in list(queue.queue):
            item_changed, nearest_ok = self._processOneItem(item,
                                                            nearest_ok)
            queue_changed = queue_changed or bool(item_changed)
            self.reportStats(item)
        if not queue_changed:
            continue
        changed = True
        status = ''.join([self.pipeline.formatStatus(entry)
                          for entry in queue.queue])
        if status:
            self.log.debug("Queue %s status is now:\n %s" %
                           (queue.name, status))
    self.log.debug("Finished queue processor: %s (changed: %s)" %
                   (self.pipeline.name, changed))
    return changed
James E. Blairdaabed22012-08-15 15:38:57 -07001176
def updateBuildDescriptions(self, build_set):
    """Refresh the launcher-side description of every build in *build_set*.

    The builds of the previous build set, when there is one, are
    refreshed as well.
    """
    build_sets = [build_set]
    if build_set.previous_build_set:
        build_sets.append(build_set.previous_build_set)
    for current in build_sets:
        for build in current.getBuilds():
            description = self.formatDescription(build)
            self.sched.launcher.setBuildDescription(build, description)
1186
def onBuildStarted(self, build):
    """Handle a 'started' result for *build*.

    Returns False when this manager is not tracking the build (triggered
    externally, triggered before zuul started, or restarted).  Otherwise
    updates the build descriptions, processes the queue until it is
    stable and returns True.
    """
    tracked = build in self.building_jobs
    if not tracked:
        return False

    self.log.debug("Build %s started" % build)
    self.updateBuildDescriptions(build.build_set)
    while True:
        if not self.processQueue():
            break
    return True
1198
def onBuildCompleted(self, build):
    """Handle a 'completed' result for *build*.

    Returns False when this manager is not tracking the build (triggered
    externally, triggered before zuul started, or restarted).  Otherwise
    the build is dropped from building_jobs, its result is recorded on
    the pipeline, descriptions are updated, the queue is processed until
    it is stable, and True is returned.
    """
    tracked = build in self.building_jobs
    if not tracked:
        return False

    self.log.debug("Build %s completed" % build)
    owner = self.building_jobs[build]
    self.log.debug("Found change %s which triggered completed build %s" %
                   (owner, build))

    del self.building_jobs[build]

    self.pipeline.setResult(owner, build)
    status = self.pipeline.formatStatus(owner)
    self.log.debug("Change %s status is now:\n %s" %
                   (owner, status))
    self.updateBuildDescriptions(build.build_set)
    while True:
        if not self.processQueue():
            break
    return True
1219
def reportItem(self, item):
    """Report *item* and, for managers whose changes merge, verify it.

    Raises Exception if a reportable item was already reported.  When
    changes_merge is set, raises MergeFailure unless all jobs succeeded,
    the report returned no error and the trigger confirms the change is
    merged on its branch.
    """
    change = item.change
    if change.is_reportable and item.reported:
        raise Exception("Already reported change %s" % item.change)
    ret = self._reportItem(item)
    if not self.changes_merge:
        return
    succeeded = self.pipeline.didAllJobsSucceed(item)
    # Only ask the trigger when the report itself came back clean.
    merged = (not ret) and self.pipeline.trigger.isMerged(item.change,
                                                          item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))
    if succeeded and merged:
        return
    self.log.debug("Reported change %s failed tests or failed "
                   "to merge" % (item.change))
    raise MergeFailure("Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001236
def _reportItem(self, item):
    """Send the success or failure report for *item*.

    Returns False for a change that is not reportable and 0 for an item
    that was already reported.  Otherwise returns what sendReport()
    returned (a true value means errors), or True when reporting raised
    an exception, in which case the reported result is set to 'ERROR'.
    """
    if not item.change.is_reportable:
        return False
    if item.reported:
        return 0
    self.log.debug("Reporting change %s" % item.change)
    ret = True  # Means error as returned by trigger.report
    if self.pipeline.didAllJobsSucceed(item):
        self.log.debug("success %s %s" % (self.pipeline.success_actions,
                                          self.pipeline.failure_actions))
        actions = self.pipeline.success_actions
        outcome = 'SUCCESS'
    else:
        actions = self.pipeline.failure_actions
        outcome = 'FAILURE'
    item.setReportedResult(outcome)
    report = self.formatReport(item)
    # Marked as reported before sending, so a failed send is not retried.
    item.reported = True
    try:
        self.log.info("Reporting change %s, actions: %s" %
                      (item.change, actions))
        ret = self.sendReport(actions, item.change, report)
        if ret:
            self.log.error("Reporting change %s received: %s" %
                           (item.change, ret))
    except:
        self.log.exception("Exception while reporting:")
        item.setReportedResult('ERROR')
    self.updateBuildDescriptions(item.current_build_set)
    return ret
1266
def formatReport(self, item):
    """Build the plain-text report for a queue item.

    The report starts with the pipeline's success or failure message
    (chosen via ``self.pipeline.didAllJobsSucceed(item)``) followed by a
    blank line.  It then contains exactly one of:

    * a fixed sentence when ``item.dequeued_needing_change`` is set;
    * the build set's ``unable_to_merge_message`` when that is set;
    * one line per job of the pipeline, formatted as
      ``- [name ]url : result[ in elapsed][ (non-voting)]``.

    :param item: the queue item to report on.
    :returns: the report text.
    """
    if self.pipeline.didAllJobsSucceed(item):
        parts = [self.pipeline.success_message + '\n\n']
    else:
        parts = [self.pipeline.failure_message + '\n\n']

    if item.dequeued_needing_change:
        parts.append("This change depends on a change that failed to merge.")
        return ''.join(parts)

    if item.current_build_set.unable_to_merge_message:
        parts.append(item.current_build_set.unable_to_merge_message)
        return ''.join(parts)

    # Both options are the same for every job, so read the configuration
    # once instead of once per job.
    config = self.sched.config
    url_pattern = None
    if config.has_option('zuul', 'url_pattern'):
        url_pattern = config.get('zuul', 'url_pattern')
    show_job_name = (config.has_option('zuul', 'job_name_in_report') and
                     config.getboolean('zuul', 'job_name_in_report'))

    for job in self.pipeline.getJobs(item.change):
        build = item.current_build_set.getBuild(job.name)
        result = build.result
        pattern = url_pattern

        # A job may override both the displayed result text and the URL
        # pattern, separately for success and for failure.
        if result == 'SUCCESS':
            if job.success_message:
                result = job.success_message
            if job.success_pattern:
                pattern = job.success_pattern
        elif result == 'FAILURE':
            if job.failure_message:
                result = job.failure_message
            if job.failure_pattern:
                pattern = job.failure_pattern

        if pattern:
            url = pattern.format(change=item.change,
                                 pipeline=self.pipeline,
                                 job=job,
                                 build=build)
        else:
            # Fall back to the job name when the build has no URL.
            url = build.url or job.name

        voting = '' if job.voting else ' (non-voting)'

        elapsed = ''
        if self.report_times and build.end_time and build.start_time:
            dt = int(build.end_time - build.start_time)
            m, s = divmod(dt, 60)
            h, m = divmod(m, 60)
            # Only show the units that are actually needed.
            if h:
                elapsed = ' in %dh %02dm %02ds' % (h, m, s)
            elif m:
                elapsed = ' in %dm %02ds' % (m, s)
            else:
                elapsed = ' in %ds' % (s)

        name = job.name + ' ' if show_job_name else ''
        parts.append('- %s%s : %s%s%s\n' % (name, url, result, elapsed,
                                            voting))
    return ''.join(parts)
1328
def formatDescription(self, build):
    """Render an HTML description for ``build``.

    The description is assembled from up to five fragments, each only
    emitted when it has content:

    * what triggered the build (a change with a ``number`` or a ``ref``);
    * the other changes tested concurrently (``build_set.other_changes``);
    * every build in the same build set;
    * the build of the same job in the previous / next build set;
    * the build set's reported result.

    :param build: the build to describe; its ``build_set`` and ``job``
        are read.
    :returns: the HTML text.
    """
    build_set = build.build_set

    concurrent_changes = ''
    for other_change in build_set.other_changes:
        concurrent_changes += (
            '<li><a href="{change.url}"> '
            '{change.number},{change.patchset}</a></li>'.format(
                change=other_change))

    change = build_set.item.change

    # Use a distinct loop variable: reusing ``build`` here would make the
    # previous/next lookups below refer to the last build of the set
    # rather than to the build this description is for.
    concurrent_builds = ''
    for set_build in build_set.getBuilds():
        if set_build.url:
            concurrent_builds += """\
<li>
 <a href="{build.url}">
 {build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=set_build)
        else:
            concurrent_builds += """\
<li>
 {build.job.name}: {build.result}
</li>""".format(build=set_build)

    other_builds = ''
    if build_set.previous_build_set:
        other_build = build_set.previous_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
 Preceded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    if build_set.next_build_set:
        other_build = build_set.next_build_set.getBuild(
            build.job.name)
        if other_build:
            other_builds += """\
<li>
 Succeeded by: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    result = build_set.result

    # The fragments below are templates; they are all expanded by the
    # single format() call at the end.
    if hasattr(change, 'number'):
        ret = """\
<p>
 Triggered by change:
 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
 Branch: <b>{change.branch}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    elif hasattr(change, 'ref'):
        # No link is opened for a ref, so no closing </a> is emitted.
        ret = """\
<p>
 Triggered by reference:
 {change.ref}<br/>
 Old revision: <b>{change.oldrev}</b><br/>
 New revision: <b>{change.newrev}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    else:
        ret = ""

    if concurrent_changes:
        ret += """\
<p>
 Other changes tested concurrently with this change:
 <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
 All builds for this change set:
 <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
 Other build sets for this change:
 <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
 Reported result: <b>{result}</b>
</p>
"""

    # Pass exactly the names the templates use instead of every local.
    return ret.format(self=self,
                      change=change,
                      concurrent_changes=concurrent_changes,
                      concurrent_builds=concurrent_builds,
                      other_builds=other_builds,
                      result=result)
1430
def reportStats(self, item):
    """Send pipeline metrics for ``item`` to statsd, if it is available.

    The ``current_changes`` gauge is always updated.  The
    ``resident_time`` timer and ``total_changes`` counter are emitted,
    for the pipeline and for the item's project, only when the item has
    a dequeue time.  Any failure is logged and otherwise ignored.

    :param item: the queue item; its ``enqueue_time``, ``dequeue_time``
        and ``change.project.name`` are read.
    """
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeuing.
        if item.dequeue_time:
            dt = int((item.dequeue_time - item.enqueue_time) * 1000)
        else:
            dt = None
        # Compare against None: a residency that rounds down to 0 ms is
        # still a dequeue and must be counted.
        dequeued = dt is not None
        items = len(self.pipeline.getAllItems())

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(key + '.current_changes', items)
        if dequeued:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        project_name = item.change.project.name.replace('/', '.')
        key += '.%s' % project_name
        if dequeued:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')
    except Exception:
        # Stats are best effort; never let them break the pipeline, but
        # do not swallow SystemExit / KeyboardInterrupt either.
        self.log.exception("Exception reporting pipeline stats")
1461
James E. Blair1e8dd892012-05-30 09:15:05 -07001462
class IndependentPipelineManager(BasePipelineManager):
    """Pipeline manager that keeps all projects in one non-dependent queue."""

    log = logging.getLogger("zuul.IndependentPipelineManager")
    changes_merge = False

    def _postConfig(self, layout):
        """Run the base post-configuration, then register the queue.

        A single ``ChangeQueue`` created with ``dependent=False`` receives
        every project of the pipeline and is added to the pipeline.
        """
        super(IndependentPipelineManager, self)._postConfig(layout)

        pipeline = self.pipeline
        shared_queue = ChangeQueue(pipeline, dependent=False)
        for member in pipeline.getProjects():
            shared_queue.addProject(member)

        pipeline.addQueue(shared_queue)
James E. Blairee743612012-05-29 14:49:32 -07001475
James E. Blair1e8dd892012-05-30 09:15:05 -07001476
James E. Blair4aea70c2012-07-26 14:23:24 -07001477class DependentPipelineManager(BasePipelineManager):
1478 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001479 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001480
def __init__(self, *args, **kwargs):
    """Forward all arguments unchanged to the base class initializer."""
    base = super(DependentPipelineManager, self)
    base.__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001483
def _postConfig(self, layout):
    """Run the base post-configuration, then build the shared queues."""
    base = super(DependentPipelineManager, self)
    base._postConfig(layout)
    self.buildChangeQueues()
1487
def buildChangeQueues(self):
    """Create the pipeline's shared change queues.

    Starts with one ``ChangeQueue`` per project, then repeatedly calls
    ``combineChangeQueues`` until a pass no longer reduces the number of
    queues.  The resulting queues are added to the pipeline.
    """
    self.log.debug("Building shared change queues")

    queues = []
    for project in self.pipeline.getProjects():
        queue = ChangeQueue(self.pipeline)
        queue.addProject(project)
        queues.append(queue)
        self.log.debug("Created queue: %s" % queue)

    # Iterate over all queues trying to combine them, and keep doing
    # so until they can not be combined further.
    combined = self.combineChangeQueues(queues)
    while len(combined) != len(queues):
        queues = combined
        combined = self.combineChangeQueues(queues)

    self.log.info(" Shared change queues:")
    for queue in combined:
        self.pipeline.addQueue(queue)
        self.log.info(" %s" % queue)
1511
def combineChangeQueues(self, change_queues):
    """Make one pass merging queues that have at least one job in common.

    Each queue is compared, in order, against the queues already kept.
    It is merged into the first kept queue whose job set is not disjoint
    from its own; otherwise it is kept as a new entry.

    :param change_queues: the queues to combine.
    :returns: a new list holding the kept queues.
    """
    self.log.debug("Combining shared queues")
    kept = []
    for candidate in change_queues:
        for target in kept:
            if candidate.getJobs().isdisjoint(target.getJobs()):
                continue
            self.log.debug("Merging queue %s into %s" % (candidate, target))
            target.mergeChangeQueue(candidate)
            break  # merged; move on to the next candidate
        else:
            # No kept queue shares a job with this one.
            self.log.debug("Keeping queue %s" % (candidate))
            kept.append(candidate)
    return kept
James E. Blairee743612012-05-29 14:49:32 -07001527
def isChangeReadyToBeEnqueued(self, change):
    """Return True when the pipeline's trigger reports ``change`` can merge.

    A debug message is logged and False returned otherwise.
    """
    allow_needs = self.getSubmitAllowNeeds()
    if self.pipeline.trigger.canMerge(change, allow_needs):
        return True
    self.log.debug("Change %s can not merge, ignoring" % change)
    return False
James E. Blair1e8dd892012-05-30 09:15:05 -07001534
def enqueueChangesBehind(self, change, quiet):
    """Enqueue the mergeable changes that depend on ``change``.

    Every entry of ``change.needed_by_changes`` that the pipeline's
    trigger reports as mergeable is passed to ``self.addChange``, after
    all candidates have been checked.  Nothing happens for a change that
    has no ``needed_by_changes`` attribute.

    :param change: the change whose dependents are examined.
    :param quiet: forwarded unchanged to ``self.addChange``.
    """
    ready = []
    self.log.debug("Checking for changes needing %s:" % change)
    if not hasattr(change, 'needed_by_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return

    for dependent in change.needed_by_changes:
        mergeable = self.pipeline.trigger.canMerge(
            dependent, self.getSubmitAllowNeeds())
        if not mergeable:
            continue
        self.log.debug(" Change %s needs %s and is ready to merge" %
                       (dependent, change))
        ready.append(dependent)

    if not ready:
        self.log.debug(" No changes need %s" % change)

    for dependent in ready:
        self.addChange(dependent, quiet)
James E. Blaire0487072012-08-29 17:38:31 -07001552
def enqueueChangesAhead(self, change, quiet):
    """Enqueue the change that ``change`` needs, if there is one.

    ``checkForChangesNeededBy`` yields either a boolean verdict, which is
    returned as is, or the needed change, which is handed to
    ``self.addChange`` and whose result is returned.

    :param change: the change about to be enqueued.
    :param quiet: forwarded unchanged to ``self.addChange``.
    """
    needed = self.checkForChangesNeededBy(change)
    if needed in (True, False):
        return needed
    self.log.debug(" Change %s must be merged ahead of %s" %
                   (needed, change))
    return self.addChange(needed, quiet)
James E. Blaire0487072012-08-29 17:38:31 -07001560
def checkForChangesNeededBy(self, change):
    """Decide whether ``change`` may be enqueued given what it needs.

    :param change: the change being considered.
    :returns: True if it is okay to proceed enqueuing the change, False
        if the change should not be enqueued, or the needed change
        itself when that change is mergeable and must be enqueued first.
    """
    self.log.debug("Checking for changes needed by %s:" % change)
    if not hasattr(change, 'needs_change'):
        self.log.debug(" Changeish does not support dependencies")
        return True

    needed = change.needs_change
    if not needed:
        self.log.debug(" No changes needed")
        return True
    if needed.is_merged:
        self.log.debug(" Needed change is merged")
        return True
    if not needed.is_current_patchset:
        self.log.debug(" Needed change is not the current patchset")
        return False
    if self.isChangeAlreadyInQueue(needed):
        self.log.debug(" Needed change is already ahead in the queue")
        return True

    mergeable = self.pipeline.trigger.canMerge(
        needed, self.getSubmitAllowNeeds())
    if not mergeable:
        # The needed change can't be merged.
        self.log.debug(" Change %s is needed but can not be merged" %
                       needed)
        return False

    self.log.debug(" Change %s is needed" %
                   needed)
    return needed
James E. Blair972e3c72013-08-29 12:04:55 -07001589
def getFailingDependentItem(self, item):
    """Return the queue item ``item`` depends on if that item is failing.

    :param item: the queue item whose ``change.needs_change`` is checked.
    :returns: the item found by ``self.getItemForChange`` for the needed
        change when its current build set has failing reasons, else None.
    """
    change = item.change
    if not hasattr(change, 'needs_change') or not change.needs_change:
        return None
    needs_item = self.getItemForChange(change.needs_change)
    if needs_item and needs_item.current_build_set.failing_reasons:
        return needs_item
    return None