blob: 938bb501256de71371e43020de06c8389c9c602b [file] [log] [blame]
James E. Blairee743612012-05-29 14:49:32 -07001# Copyright 2012 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Zhongyue Luo1c860d72012-07-19 11:03:56 +080023import Queue
24import re
25import threading
James E. Blair71e94122012-12-24 17:53:08 -080026import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080027import yaml
James E. Blairee743612012-05-29 14:49:32 -070028
James E. Blair47958382013-01-10 17:26:02 -080029import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070030import model
James E. Blair4aea70c2012-07-26 14:23:24 -070031from model import Pipeline, Job, Project, ChangeQueue, EventFilter
James E. Blair4886cc12012-07-18 15:39:41 -070032import merger
James E. Blairee743612012-05-29 14:49:32 -070033
James E. Blair71e94122012-12-24 17:53:08 -080034statsd = extras.try_import('statsd.statsd')
35
James E. Blair1e8dd892012-05-30 09:15:05 -070036
def deep_format(obj, paramdict):
    """Apply ``str.format(**paramdict)`` to every string found within obj.

    Lists and dicts are traversed recursively; for dicts both the keys and
    the values are expanded.  Anything that is not a ``str``, ``list`` or
    ``dict`` (numbers, ``None``, booleans, non-string dict keys, ...) is
    returned unchanged.

    Borrowed from the Jenkins Job Builder project.

    :param obj: the object (typically parsed YAML) to expand.
    :param paramdict: mapping of placeholder names to replacement values.
    :returns: a new structure of the same shape with strings expanded.
    :raises KeyError: if a string references a placeholder that is not
        present in paramdict.
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        # Keys go through deep_format as well so that string keys are
        # expanded while non-string keys (e.g. integers) are kept as-is
        # instead of failing on a missing .format() method.
        return dict(
            (deep_format(key, paramdict), deep_format(value, paramdict))
            for key, value in obj.items()
        )
    return obj
57
58
class Scheduler(threading.Thread):
    """Thread that dispatches trigger events and build results to pipelines.

    Other threads hand work to the scheduler through ``addEvent``,
    ``onBuildStarted`` and ``onBuildCompleted``; each of those puts an item
    on a queue and sets ``wake_event``.  ``run`` consumes one item per
    iteration, giving build results priority over trigger events.

    The layout (pipelines, jobs, projects, project templates) is built by
    ``_parseConfig`` from a YAML file.  ``reconfigure`` and ``exit`` put the
    scheduler in a paused state in which no new trigger events are
    processed; the requested action is performed once no builds remain.
    """

    log = logging.getLogger("zuul.Scheduler")

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.wake_event = threading.Event()
        self.reconfigure_complete_event = threading.Event()
        self._pause = False
        self._reconfigure = False
        self._exit = False
        self._stopped = False
        self.launcher = None
        self.trigger = None
        self.config = None

        self.trigger_event_queue = Queue.Queue()
        self.result_event_queue = Queue.Queue()
        self._init()

    def _init(self):
        """Reset all layout state; called before (re)parsing the layout."""
        self.pipelines = {}
        self.jobs = {}
        self.projects = {}
        self.project_templates = {}
        self.metajobs = {}

    def stop(self):
        """Ask ``run`` to return the next time it wakes up."""
        self._stopped = True
        self.wake_event.set()

    def testConfig(self, config_path):
        """Parse the layout at config_path into a freshly reset state."""
        self._init()
        self._parseConfig(config_path)

    def _parseConfig(self, config_path):
        """Load the YAML layout file and build pipelines, jobs and projects.

        :param config_path: path to the layout file; ``~`` is expanded.
        :raises Exception: if the file does not exist, or if a job names a
            ``parameter-function`` that none of the included python files
            defines.
        """
        def toList(item):
            # Normalize "missing", "single value" and "list" to a list.
            if not item:
                return []
            if isinstance(item, list):
                return item
            return [item]

        if config_path:
            config_path = os.path.expanduser(config_path)
            if not os.path.exists(config_path):
                raise Exception("Unable to read layout config file at %s" %
                                config_path)
        # NOTE(review): yaml.load can construct arbitrary Python objects;
        # the layout file must come from a trusted source.  Consider
        # yaml.safe_load if the layout never relies on Python-specific tags.
        with open(config_path) as config_file:
            data = yaml.load(config_file)

        validator = layoutvalidator.LayoutValidator()
        validator.validate(data)

        # Namespace populated by executing the included python files; it
        # is where 'parameter-function' names are looked up below.
        self._config_env = {}
        for include in data.get('includes', []):
            if 'python-file' in include:
                fn = include['python-file']
                if not os.path.isabs(fn):
                    # Relative includes are resolved against the layout
                    # file's directory.
                    base = os.path.dirname(config_path)
                    fn = os.path.join(base, fn)
                fn = os.path.expanduser(fn)
                execfile(fn, self._config_env)

        for conf_pipeline in data.get('pipelines', []):
            pipeline = Pipeline(conf_pipeline['name'])
            pipeline.description = conf_pipeline.get('description')
            pipeline.failure_message = conf_pipeline.get('failure-message',
                                                         "Build failed.")
            pipeline.success_message = conf_pipeline.get('success-message',
                                                         "Build succeeded.")
            pipeline.dequeue_on_new_patchset = conf_pipeline.get(
                'dequeue-on-new-patchset',
                True)
            # The manager class is looked up by name among this module's
            # globals.
            manager = globals()[conf_pipeline['manager']](self, pipeline)
            pipeline.setManager(manager)

            self.pipelines[conf_pipeline['name']] = pipeline
            manager.success_action = conf_pipeline.get('success')
            manager.failure_action = conf_pipeline.get('failure')
            manager.start_action = conf_pipeline.get('start')
            for trigger in toList(conf_pipeline['trigger']):
                approvals = {}
                for approval_dict in toList(trigger.get('approval')):
                    approvals.update(approval_dict)
                f = EventFilter(
                    types=toList(trigger['event']),
                    branches=toList(trigger.get('branch')),
                    refs=toList(trigger.get('ref')),
                    approvals=approvals,
                    comment_filters=toList(trigger.get('comment_filter')),
                    email_filters=toList(trigger.get('email_filter')))
                manager.event_filters.append(f)

        for project_template in data.get('project-templates', []):
            # Make sure the template only contains valid pipelines
            tpl = dict(
                (pipe_name, project_template.get(pipe_name))
                for pipe_name in self.pipelines.keys()
                if pipe_name in project_template
            )
            self.project_templates[project_template.get('name')] = tpl

        # (layout key, Job attribute) pairs copied only when the layout
        # gives a true value for the key.
        optional_job_attrs = (
            ('failure-message', 'failure_message'),
            ('success-message', 'success_message'),
            ('failure-pattern', 'failure_pattern'),
            ('success-pattern', 'success_pattern'),
        )
        for config_job in data.get('jobs', []):
            job = self.getJob(config_job['name'])
            # Be careful to only set attributes explicitly present on
            # this job, to avoid squashing attributes set by a meta-job.
            for key, attr in optional_job_attrs:
                value = config_job.get(key, None)
                if value:
                    setattr(job, attr, value)
            if config_job.get('hold-following-changes', False):
                job.hold_following_changes = True
            voting = config_job.get('voting', None)
            if voting is not None:
                job.voting = voting
            fname = config_job.get('parameter-function', None)
            if fname:
                func = self._config_env.get(fname, None)
                if not func:
                    raise Exception("Unable to find function %s" % fname)
                job.parameter_function = func
            branches = toList(config_job.get('branch'))
            if branches:
                # Keep the raw patterns next to the compiled ones.
                job._branches = branches
                job.branches = [re.compile(x) for x in branches]
            files = toList(config_job.get('files'))
            if files:
                job._files = files
                job.files = [re.compile(x) for x in files]

        def add_jobs(job_tree, config_jobs):
            # Entries may be job names, nested lists, or
            # {parent: children} dicts describing dependent jobs.
            for job in config_jobs:
                if isinstance(job, list):
                    for x in job:
                        add_jobs(job_tree, x)
                if isinstance(job, dict):
                    for parent, children in job.items():
                        parent_tree = job_tree.addJob(self.getJob(parent))
                        add_jobs(parent_tree, children)
                if isinstance(job, str):
                    job_tree.addJob(self.getJob(job))

        for config_project in data.get('projects', []):
            project = Project(config_project['name'])

            for requested_template in config_project.get('template', []):
                # Fetch the template from 'project-templates'
                tpl = self.project_templates.get(
                    requested_template.get('name'))
                # Expand it with the project context
                expanded = deep_format(tpl, requested_template)
                # Finally merge the expansion with whatever has been already
                # defined for this project
                config_project.update(expanded)

            self.projects[config_project['name']] = project
            mode = config_project.get('merge-mode')
            if mode and mode == 'cherry-pick':
                project.merge_mode = model.CHERRY_PICK
            for pipeline in self.pipelines.values():
                if pipeline.name in config_project:
                    job_tree = pipeline.addProject(project)
                    config_jobs = config_project[pipeline.name]
                    add_jobs(job_tree, config_jobs)

        # All jobs should be defined at this point, get rid of
        # metajobs so that getJob isn't doing anything weird.
        self.metajobs = {}

        for pipeline in self.pipelines.values():
            pipeline.manager._postConfig()

    def _setupMerger(self):
        """Create ``self.merger`` from the config and register all projects."""
        config = self.config

        def option(section, name, default, getter=None):
            # Return the configured value, or default when the option is
            # absent from the config.
            if config.has_option(section, name):
                return (getter or config.get)(section, name)
            return default

        merge_root = option('zuul', 'git_dir', '/var/lib/zuul/git')
        merge_email = option('zuul', 'git_user_email', None)
        merge_name = option('zuul', 'git_user_name', None)
        push_refs = option('zuul', 'push_change_refs', False,
                           config.getboolean)
        sshkey = option('gerrit', 'sshkey', None)

        self.merger = merger.Merger(self.trigger, merge_root, push_refs,
                                    sshkey, merge_email, merge_name)
        for project in self.projects.values():
            url = self.trigger.getGitUrl(project)
            self.merger.addProject(project, url)

    def getJob(self, name):
        """Return the Job called name, creating it on first use.

        A name starting with ``^`` is a meta-job: it is compiled as a
        regular expression and recorded in ``self.metajobs``.  Any other
        new job first copies the attributes of every meta-job whose
        pattern matches its name and is then cached in ``self.jobs``.
        """
        if name in self.jobs:
            return self.jobs[name]
        job = Job(name)
        if name.startswith('^'):
            # This is a meta-job
            regex = re.compile(name)
            self.metajobs[regex] = job
        else:
            # Apply attributes from matching meta-jobs
            for regex, metajob in self.metajobs.items():
                if regex.match(name):
                    job.copy(metajob)
            self.jobs[name] = job
        return job

    def setLauncher(self, launcher):
        self.launcher = launcher

    def setTrigger(self, trigger):
        self.trigger = trigger

    def addEvent(self, event):
        """Queue a trigger event and wake the run loop."""
        self.log.debug("Adding trigger event: %s" % event)
        try:
            if statsd:
                statsd.incr('gerrit.event.%s' % event.type)
        except Exception:
            # Stats are best effort; never lose an event because of them.
            self.log.exception("Exception reporting event stats")
        self.trigger_event_queue.put(event)
        self.wake_event.set()
        self.log.debug("Done adding trigger event: %s" % event)

    def onBuildStarted(self, build):
        """Stamp the build's start time and queue a 'started' result."""
        self.log.debug("Adding start event for build: %s" % build)
        build.start_time = time.time()
        self.result_event_queue.put(('started', build))
        self.wake_event.set()
        self.log.debug("Done adding start event for build: %s" % build)

    def onBuildCompleted(self, build):
        """Stamp the build's end time, report stats, queue a 'completed'."""
        self.log.debug("Adding complete event for build: %s" % build)
        build.end_time = time.time()
        try:
            if statsd:
                key = 'zuul.job.%s' % build.job.name
                if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                    # Duration is reported in milliseconds.
                    dt = int((build.end_time - build.start_time) * 1000)
                    statsd.timing(key, dt)
                statsd.incr(key)
        except Exception:
            # Stats are best effort; the result must still be queued.
            self.log.exception("Exception reporting runtime stats")
        self.result_event_queue.put(('completed', build))
        self.wake_event.set()
        self.log.debug("Done adding complete event for build: %s" % build)

    def reconfigure(self, config):
        """Request a reconfiguration and block until the run loop did it."""
        self.log.debug("Prepare to reconfigure")
        self.config = config
        self._pause = True
        self._reconfigure = True
        self.wake_event.set()
        self.log.debug("Waiting for reconfiguration")
        self.reconfigure_complete_event.wait()
        self.reconfigure_complete_event.clear()
        self.log.debug("Reconfiguration complete")

    def exit(self):
        """Request an exit; it happens once all builds have completed."""
        self.log.debug("Prepare to exit")
        self._pause = True
        self._exit = True
        self.wake_event.set()
        self.log.debug("Waiting for exit")

    def _get_queue_pickle_file(self):
        """Return the path of the file used to persist queued events."""
        if self.config.has_option('zuul', 'state_dir'):
            state_dir = os.path.expanduser(self.config.get('zuul',
                                                           'state_dir'))
        else:
            state_dir = '/var/lib/zuul'
        return os.path.join(state_dir, 'queue.pickle')

    def _save_queue(self):
        """Drain the trigger event queue and pickle its events to disk."""
        pickle_file = self._get_queue_pickle_file()
        events = []
        while not self.trigger_event_queue.empty():
            events.append(self.trigger_event_queue.get())
        self.log.debug("Queue length is %s" % len(events))
        if events:
            self.log.debug("Saving queue")
            # Close the file deterministically so the data is flushed
            # before the caller terminates the process.
            with open(pickle_file, 'wb') as f:
                pickle.dump(events, f)

    def _load_queue(self):
        """Re-queue the events saved by ``_save_queue``, if any."""
        pickle_file = self._get_queue_pickle_file()
        if os.path.exists(pickle_file):
            self.log.debug("Loading queue")
            # NOTE(review): pickle.load executes whatever the file
            # describes; the state directory must only be writable by
            # trusted users.
            with open(pickle_file, 'rb') as f:
                events = pickle.load(f)
            self.log.debug("Queue length is %s" % len(events))
            for event in events:
                self.trigger_event_queue.put(event)
        else:
            self.log.debug("No queue file found")

    def _delete_queue(self):
        """Remove the saved queue file if it exists."""
        pickle_file = self._get_queue_pickle_file()
        if os.path.exists(pickle_file):
            self.log.debug("Deleting saved queue")
            os.unlink(pickle_file)

    def resume(self):
        """Restore any saved queue, delete the saved file, wake the loop.

        Failures to load or delete the saved queue are logged and
        otherwise ignored.
        """
        try:
            self._load_queue()
        except Exception:
            self.log.exception("Unable to load queue")
        try:
            self._delete_queue()
        except Exception:
            self.log.exception("Unable to delete saved queue")
        self.log.debug("Resuming queue processing")
        self.wake_event.set()

    def _doPauseEvent(self):
        """Perform the pending exit or reconfiguration."""
        if self._exit:
            self.log.debug("Exiting")
            self._save_queue()
            os._exit(0)
        if self._reconfigure:
            self.log.debug("Performing reconfiguration")
            self._init()
            self._parseConfig(self.config.get('zuul', 'layout_config'))
            self._setupMerger()
            self._pause = False
            self._reconfigure = False
            self.reconfigure_complete_event.set()

    def _areAllBuildsComplete(self):
        """Return True when no pipeline manager has a build in progress."""
        self.log.debug("Checking if all builds are complete")
        waiting = False
        for pipeline in self.pipelines.values():
            for build in pipeline.manager.building_jobs.keys():
                self.log.debug("%s waiting on %s" % (pipeline.manager, build))
                waiting = True
        if not waiting:
            self.log.debug("All builds are complete")
            return True
        self.log.debug("All builds are not complete")
        return False

    def run(self):
        """Main loop: sleep until woken, then process one queued item."""
        if statsd:
            self.log.debug("Statsd enabled")
        else:
            self.log.debug("Statsd disabled because python statsd "
                           "package not found")
        while True:
            self.log.debug("Run handler sleeping")
            self.wake_event.wait()
            self.wake_event.clear()
            if self._stopped:
                return
            self.log.debug("Run handler awake")
            try:
                # Give result events priority -- they let us stop builds,
                # whereas trigger events cause us to launch builds.
                if not self.result_event_queue.empty():
                    self.process_result_queue()
                elif not self._pause:
                    if not self.trigger_event_queue.empty():
                        self.process_event_queue()

                if self._pause and self._areAllBuildsComplete():
                    self._doPauseEvent()

                # Re-arm the wake event if there is more work we are
                # allowed to do; while paused only results are processed.
                if not self._pause:
                    if not (self.trigger_event_queue.empty() and
                            self.result_event_queue.empty()):
                        self.wake_event.set()
                else:
                    if not self.result_event_queue.empty():
                        self.wake_event.set()
            except:  # deliberate catch-all: the loop must keep running
                self.log.exception("Exception in run handler:")

    def process_event_queue(self):
        """Take one trigger event and offer it to every pipeline."""
        self.log.debug("Fetching trigger event")
        event = self.trigger_event_queue.get()
        try:
            self.log.debug("Processing trigger event %s" % event)
            project = self.projects.get(event.project_name)
            if not project:
                self.log.warning("Project %s not found" % event.project_name)
                return

            # Preprocessing for ref-update events
            if hasattr(event, 'refspec'):
                # Make sure the local git repo is up-to-date with the
                # remote one.  We better have the new ref before enqueuing
                # the changes.  This is done before enqueuing the changes
                # to avoid calling an update per pipeline accepting the
                # change.
                self.log.info("Fetching references for %s" % project)
                self.merger.updateRepo(project)

            for pipeline in self.pipelines.values():
                change = event.getChange(project, self.trigger)
                if event.type == 'patchset-created':
                    pipeline.manager.removeOldVersionsOfChange(change)
                if not pipeline.manager.eventMatches(event):
                    self.log.debug("Event %s ignored by %s" %
                                   (event, pipeline))
                    continue
                self.log.info("Adding %s, %s to %s" %
                              (project, change, pipeline))
                pipeline.manager.addChange(change)
        finally:
            # The event was already removed with get(); always balance it
            # so Queue.join() cannot hang after a processing error.
            self.trigger_event_queue.task_done()

    def process_result_queue(self):
        """Take one build result and hand it to the first manager that
        reports handling it (a true return value)."""
        self.log.debug("Fetching result event")
        event_type, build = self.result_event_queue.get()
        try:
            self.log.debug("Processing result event %s" % build)
            for pipeline in self.pipelines.values():
                if event_type == 'started':
                    if pipeline.manager.onBuildStarted(build):
                        return
                elif event_type == 'completed':
                    if pipeline.manager.onBuildCompleted(build):
                        return
            self.log.warning("Build %s not found by any queue manager" %
                             (build))
        finally:
            # Always balance the get() above, even if a manager raised.
            self.result_event_queue.task_done()

    def _formatPauseMessage(self):
        """Return the HTML snippet describing the paused state."""
        ret = '<p><b>Queue only mode:</b> preparing to '
        if self._reconfigure:
            ret += 'reconfigure'
        if self._exit:
            ret += 'exit'
        ret += ', queue length: %s' % self.trigger_event_queue.qsize()
        ret += '</p>'
        return ret

    def formatStatusHTML(self):
        """Return an HTML page with the status of every pipeline, sorted
        by pipeline name."""
        ret = '<html><pre>'
        if self._pause:
            ret += self._formatPauseMessage()

        for key in sorted(self.pipelines.keys()):
            pipeline = self.pipelines[key]
            s = 'Pipeline: %s' % pipeline.name
            ret += s + '\n'
            ret += '-' * len(s) + '\n'
            ret += pipeline.formatStatusHTML()
            ret += '\n'
        ret += '</pre></html>'
        return ret

    def formatStatusJSON(self):
        """Return a JSON string with queue lengths and the status of every
        pipeline, sorted by pipeline name."""
        data = {}
        if self._pause:
            data['message'] = self._formatPauseMessage()

        data['trigger_event_queue'] = {}
        data['trigger_event_queue']['length'] = \
            self.trigger_event_queue.qsize()
        data['result_event_queue'] = {}
        data['result_event_queue']['length'] = \
            self.result_event_queue.qsize()

        pipelines = []
        data['pipelines'] = pipelines
        for key in sorted(self.pipelines.keys()):
            pipeline = self.pipelines[key]
            pipelines.append(pipeline.formatStatusJSON())
        return json.dumps(data)
556
James E. Blair1e8dd892012-05-30 09:15:05 -0700557
James E. Blair4aea70c2012-07-26 14:23:24 -0700558class BasePipelineManager(object):
559 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700560
James E. Blair4aea70c2012-07-26 14:23:24 -0700561 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -0700562 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -0700563 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -0700564 self.building_jobs = {}
565 self.event_filters = []
566 self.success_action = {}
567 self.failure_action = {}
James E. Blairdc253862012-06-13 17:12:42 -0700568 self.start_action = {}
James E. Blair3c5e5b52013-04-26 11:17:03 -0700569 if self.sched.config and self.sched.config.has_option(
570 'zuul', 'report_times'):
James E. Blair0ac6c012013-04-26 09:04:23 -0700571 self.report_times = self.sched.config.getboolean(
572 'zuul', 'report_times')
573 else:
574 self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -0700575
576 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -0700577 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700578
579 def _postConfig(self):
James E. Blair4aea70c2012-07-26 14:23:24 -0700580 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700581 self.log.info(" Events:")
582 for e in self.event_filters:
583 self.log.info(" %s" % e)
584 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -0700585
James E. Blairee743612012-05-29 14:49:32 -0700586 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -0700587 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -0700588 if tree.job:
589 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -0700590 for b in tree.job._branches:
591 efilters += str(b)
James E. Blair70c71582013-03-06 08:50:50 -0800592 for f in tree.job._files:
593 efilters += str(f)
James E. Blairee743612012-05-29 14:49:32 -0700594 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -0700595 efilters = ' ' + efilters
James E. Blair222d4982012-07-16 09:31:19 -0700596 hold = ''
597 if tree.job.hold_following_changes:
598 hold = ' [hold]'
James E. Blair4ec821f2012-08-23 15:28:28 -0700599 voting = ''
600 if not tree.job.voting:
601 voting = ' [nonvoting]'
602 self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
603 efilters, hold, voting))
James E. Blairee743612012-05-29 14:49:32 -0700604 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -0700605 log_jobs(x, indent + 2)
606
James E. Blairee743612012-05-29 14:49:32 -0700607 for p in self.sched.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700608 tree = self.pipeline.getJobTree(p)
609 if tree:
James E. Blairee743612012-05-29 14:49:32 -0700610 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -0700611 log_jobs(tree)
James E. Blairdc253862012-06-13 17:12:42 -0700612 if self.start_action:
613 self.log.info(" On start:")
614 self.log.info(" %s" % self.start_action)
James E. Blairee743612012-05-29 14:49:32 -0700615 if self.success_action:
616 self.log.info(" On success:")
617 self.log.info(" %s" % self.success_action)
618 if self.failure_action:
619 self.log.info(" On failure:")
620 self.log.info(" %s" % self.failure_action)
621
def getSubmitAllowNeeds(self):
    """Return the review labels this queue may leave as "needed".

    Get a list of code review labels that are allowed to be "needed" in
    the submit records for a change, with respect to this queue.  In
    other words, the list of review labels this queue itself is likely
    to set before submitting.

    :returns: a list with the keys of ``success_action``; an empty list
        when no success action is configured.
    """
    if not self.success_action:
        # Always hand back a list so callers get a single return type.
        return []
    return list(self.success_action.keys())
James E. Blaire421a232012-07-25 16:59:21 -0700631
def eventMatches(self, event):
    """Return True if any of this manager's event filters matches event."""
    return any(ef.matches(event) for ef in self.event_filters)
637
def isChangeAlreadyInQueue(self, change):
    """Return True if change equals a change already in the pipeline."""
    queued = self.pipeline.getChangesInQueue()
    return any(change.equals(other) for other in queued)
643
def reportStart(self, change):
    """Report through the trigger that jobs are starting for change.

    The message names the pipeline and, when the scheduler's config
    defines ``zuul``/``status_url``, has that URL appended on a new line.
    Reporting is best effort: a true value returned by the trigger is
    logged as an error and any exception is logged, never propagated.
    """
    try:
        self.log.info("Reporting start, action %s change %s" %
                      (self.start_action, change))
        msg = "Starting %s jobs." % self.pipeline.name
        # The scheduler may have no config at all; the report is still
        # sent in that case, just without a status URL.
        config = self.sched.config
        if config and config.has_option('zuul', 'status_url'):
            msg += "\n" + config.get('zuul', 'status_url')
        ret = self.sched.trigger.report(change, msg, self.start_action)
        if ret:
            self.log.error("Reporting change start %s received: %s" %
                           (change, ret))
    except Exception:
        self.log.exception("Exception while reporting start:")
657
def isChangeReadyToBeEnqueued(self, change):
    """Return True: this implementation accepts every change.

    ``addChange`` refuses a change when this returns a false value.
    """
    return True
660
def enqueueChangesAhead(self, change):
    """Return True: this implementation enqueues nothing ahead of change.

    ``addChange`` gives up on a change when this returns a false value.
    """
    return True
663
def enqueueChangesBehind(self, change):
    """Return True: this implementation enqueues nothing behind change.

    ``addChange`` calls this after the change itself has been enqueued.
    """
    return True
666
def findOldVersionOfChangeAlreadyInQueue(self, change):
    """Return the first queued change that change is an update of.

    Returns None when the pipeline holds no such change.
    """
    candidates = (queued for queued in self.pipeline.getChangesInQueue()
                  if change.isUpdateOf(queued))
    return next(candidates, None)
672
def removeOldVersionsOfChange(self, change):
    """Dequeue the queued change that change supersedes, if any.

    Does nothing when the pipeline's ``dequeue_on_new_patchset`` is
    false.  After removing the older version, jobs are launched again.
    """
    if self.pipeline.dequeue_on_new_patchset:
        superseded = self.findOldVersionOfChangeAlreadyInQueue(change)
        if superseded:
            self.log.debug("Change %s is a new version of %s, removing %s" %
                           (change, superseded, superseded))
            self.removeChange(superseded)
            self.launchJobs()
682
def addChange(self, change):
    """Try to enqueue ``change`` in this pipeline.

    Returns True when the change is in the queue on return (it was
    already there or has just been added) and False when it could not be
    enqueued.  Returning True on the success path matters because the
    result of this method is used as a truth value when a needed change
    is enqueued ahead of another one.
    """
    self.log.debug("Considering adding change %s", change)
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring", change)
        return True

    if not self.isChangeReadyToBeEnqueued(change):
        self.log.debug("Change %s is not ready to be enqueued, ignoring",
                       change)
        return False

    if not self.enqueueChangesAhead(change):
        self.log.debug("Failed to enqueue changes ahead of %s", change)
        return False

    # Checked again because enqueueChangesAhead may itself add changes.
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring", change)
        return True

    change_queue = self.pipeline.getQueue(change.project)
    if not change_queue:
        self.log.error("Unable to find change queue for project %s",
                       change.project)
        return False

    self.log.debug("Adding change %s to queue %s", change, change_queue)
    if self.start_action:
        self.reportStart(change)
    change_queue.enqueueChange(change)
    self.reportStats(change)
    self.enqueueChangesBehind(change)
    self.launchJobs()
    return True
James E. Blairee743612012-05-29 14:49:32 -0700716
def cancelJobs(self, change, prime=True):
    """Cancel every running build that belongs to ``change``.

    Each matching build in ``self.building_jobs`` is cancelled through
    the launcher, marked with the result ``'CANCELED'`` and removed from
    ``self.building_jobs``.  A failure to cancel one build is logged and
    does not stop the remaining builds from being processed.

    ``prime`` is accepted for signature compatibility; this
    implementation does not use it.
    """
    self.log.debug("Cancel jobs for change %s", change)
    to_remove = [build
                 for build, build_change in self.building_jobs.items()
                 if build_change == change]
    for build in to_remove:
        self.log.debug("Found build %s for change %s to cancel",
                       build, change)
        try:
            self.sched.launcher.cancel(build)
        except Exception:
            # Best effort, but do not swallow KeyboardInterrupt/SystemExit.
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, change))
    for build in to_remove:
        self.log.debug("Removing build %s from running builds", build)
        build.result = 'CANCELED'
        del self.building_jobs[build]
734
def dequeueChange(self, change):
    """Remove ``change`` from the queue that serves its project."""
    self.log.debug("Removing change %s from queue", change)
    self.pipeline.getQueue(change.project).dequeueChange(change)
739
def removeChange(self, change):
    """Cancel the builds of ``change`` and take it out of the queue.

    Used when a change has to leave the queue, for instance because it
    has been superseded by another change.
    """
    self.log.debug("Canceling builds behind change: %s because it is "
                   "being removed.", change)
    self.cancelJobs(change)
    self.dequeueChange(change)
747
James E. Blairdaabed22012-08-15 15:38:57 -0700748 def _launchJobs(self, change, jobs):
James E. Blairee743612012-05-29 14:49:32 -0700749 self.log.debug("Launching jobs for change %s" % change)
James E. Blair81515ad2012-10-01 18:29:08 -0700750 ref = change.current_build_set.ref
James E. Blairdaabed22012-08-15 15:38:57 -0700751 if hasattr(change, 'refspec') and not ref:
James E. Blair4886cc12012-07-18 15:39:41 -0700752 change.current_build_set.setConfiguration()
James E. Blair81515ad2012-10-01 18:29:08 -0700753 ref = change.current_build_set.ref
Zhongyue Luoaa85ebf2012-09-21 16:38:33 +0800754 mode = model.MERGE_IF_NECESSARY
James E. Blair81515ad2012-10-01 18:29:08 -0700755 commit = self.sched.merger.mergeChanges([change], ref, mode=mode)
756 if not commit:
James E. Blair973721f2012-08-15 10:19:43 -0700757 self.log.info("Unable to merge change %s" % change)
758 self.pipeline.setUnableToMerge(change)
759 self.possiblyReportChange(change)
760 return
James E. Blair81515ad2012-10-01 18:29:08 -0700761 change.current_build_set.commit = commit
James E. Blair4aea70c2012-07-26 14:23:24 -0700762 for job in self.pipeline.findJobsToRun(change):
James E. Blairee743612012-05-29 14:49:32 -0700763 self.log.debug("Found job %s for change %s" % (job, change))
764 try:
James E. Blair03b94ef2012-08-20 10:54:29 -0700765 build = self.sched.launcher.launch(job, change, self.pipeline)
James E. Blairee743612012-05-29 14:49:32 -0700766 self.building_jobs[build] = change
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800767 self.log.debug("Adding build %s of job %s to change %s" %
768 (build, job, change))
James E. Blairee743612012-05-29 14:49:32 -0700769 change.addBuild(build)
770 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800771 self.log.exception("Exception while launching job %s "
772 "for change %s:" % (job, change))
James E. Blairee743612012-05-29 14:49:32 -0700773
def launchJobs(self, change=None):
    """Launch the runnable jobs of ``change``, or of every change.

    With a change given, the pipeline is asked which jobs should run for
    it and, if there are any, they are passed to ``_launchJobs``.
    Without one, the method is applied to each change returned by
    ``self.pipeline.getAllChanges()``.
    """
    if change:
        runnable = self.pipeline.findJobsToRun(change)
        if runnable:
            self._launchJobs(change, runnable)
        return
    for queued_change in self.pipeline.getAllChanges():
        self.launchJobs(queued_change)
782
def updateBuildDescriptions(self, build_set):
    """Refresh the launcher-side description of the builds in a build set.

    The builds of ``build_set`` are handled first; if the build set has a
    previous build set, that one's builds are refreshed as well.
    """
    def refresh(builds):
        # Push a freshly formatted description for each build.
        for build in builds:
            description = self.formatDescription(build)
            self.sched.launcher.setBuildDescription(build, description)

    refresh(build_set.getBuilds())
    if build_set.previous_build_set:
        refresh(build_set.previous_build_set.getBuilds())
792
def onBuildStarted(self, build):
    """Handle the notification that ``build`` has started.

    Returns True after refreshing the build descriptions when the build
    is tracked in ``self.building_jobs``, and False when it is not.
    """
    self.log.debug("Build %s started", build)
    if build in self.building_jobs:
        self.updateBuildDescriptions(build.build_set)
        return True
    # Or triggered externally, or triggered before zuul started,
    # or restarted
    self.log.debug("Build %s not found", build)
    return False
803
def handleFailedChange(self, change):
    """Hook invoked for a change with a failed job; does nothing here."""
    return None
806
def onBuildCompleted(self, build):
    """Handle the notification that ``build`` has completed.

    Returns False when the build is not tracked in ``self.building_jobs``.
    Otherwise the build is untracked, its result is recorded on the
    pipeline, the failure hook runs if any job of the change failed,
    changes are reported, jobs are launched, the build descriptions are
    refreshed, and True is returned.
    """
    self.log.debug("Build %s completed", build)
    if build not in self.building_jobs:
        # Or triggered externally, or triggered before zuul started,
        # or restarted
        self.log.debug("Build %s not found", build)
        return False

    change = self.building_jobs.pop(build)
    self.log.debug("Found change %s which triggered completed build %s",
                   change, build)

    pipeline = self.pipeline
    pipeline.setResult(change, build)
    self.log.info("Change %s status is now:\n %s",
                  change, pipeline.formatStatus(change))

    if pipeline.didAnyJobFail(change):
        self.handleFailedChange(change)

    self.reportChanges()
    self.launchJobs()
    self.updateBuildDescriptions(build.build_set)
    return True
831
def reportChanges(self):
    """Report every change whose jobs have all completed.

    Each change of the pipeline with all jobs complete is passed to
    ``possiblyReportChange``.  When at least one of those calls returns a
    true value the whole scan is repeated.
    """
    self.log.debug("Searching for changes to report")
    reported_any = False
    for change in self.pipeline.getAllChanges():
        self.log.debug(" checking %s", change)
        if not self.pipeline.areAllJobsComplete(change):
            continue
        self.log.debug(" possibly reporting %s", change)
        if self.possiblyReportChange(change):
            reported_any = True
    if reported_any:
        self.reportChanges()
    self.log.debug("Done searching for changes to report")
844
def possiblyReportChange(self, change):
    """Report ``change`` if it is at the front of its queue.

    Returns False when the change is reportable and already reported,
    None when another change is still ahead of it, and True once the
    change has been handled.  For managers with ``changes_merge`` set, a
    change that did not succeed or was not merged is passed to
    ``handleFailedChange`` instead of being dequeued here.
    """
    self.log.debug("Possibly reporting change %s", change)
    # Even if a change isn't reportable, keep going so that it
    # gets dequeued in the normal manner.
    if change.is_reportable and change.reported:
        self.log.debug("Change %s already reported", change)
        return False
    if change.change_ahead:
        # Only the change at the front of the queue is handled.
        return None

    self.log.debug("Change %s is at the front of the queue, "
                   "reporting", change)
    ret = self.reportChange(change)
    if self.changes_merge:
        succeeded = self.pipeline.didAllJobsSucceed(change)
        # A falsy report result is followed by a merge check.
        merged = not ret
        if merged:
            merged = self.sched.trigger.isMerged(change, change.branch)
        self.log.info("Reported change %s status: all-succeeded: %s, "
                      "merged: %s", change, succeeded, merged)
        if not (succeeded and merged):
            self.log.debug("Reported change %s failed tests or failed "
                           "to merge", change)
            self.handleFailedChange(change)
            return True

    self.log.debug("Removing reported change %s from queue", change)
    self.pipeline.getQueue(change.project).dequeueChange(change)
    self.reportStats(change)
    return True
James E. Blairee743612012-05-29 14:49:32 -0700875
def reportChange(self, change):
    """Send the final report for ``change`` to the trigger.

    Returns False when the change is not reportable and 0 when it has
    already been reported.  Otherwise the change is marked as reported,
    its reported result is set to ``'SUCCESS'`` or ``'FAILURE'``
    depending on whether all of its jobs succeeded, and the value
    returned by the trigger is returned.  If reporting raises, the
    exception is logged, the reported result becomes ``'ERROR'`` and None
    is returned.  Build descriptions are refreshed in every reported case.
    """
    if not change.is_reportable:
        return False
    if change.reported:
        return 0
    self.log.debug("Reporting change %s", change)
    ret = None
    if self.pipeline.didAllJobsSucceed(change):
        action = self.success_action
        change.setReportedResult('SUCCESS')
    else:
        action = self.failure_action
        change.setReportedResult('FAILURE')
    report = self.formatReport(change)
    # Flagged before the attempt, so a failed report is not retried here.
    change.reported = True
    try:
        self.log.info("Reporting change %s, action: %s", change, action)
        ret = self.sched.trigger.report(change, report, action)
        if ret:
            self.log.error("Reporting change %s received: %s", change, ret)
    except Exception:
        # Best effort, but do not swallow KeyboardInterrupt/SystemExit.
        self.log.exception("Exception while reporting:")
        change.setReportedResult('ERROR')
    self.updateBuildDescriptions(change.current_build_set)
    return ret
903
def formatReport(self, changeish):
    """Build the textual report for ``changeish``.

    The report opens with the pipeline's success or failure message.  It
    then carries either a note that a needed change failed to merge, a
    note that the change itself could not be merged, or one line per job
    giving its URL (or name), result, optional elapsed time and a
    non-voting marker.
    """
    if self.pipeline.didAllJobsSucceed(changeish):
        header = self.pipeline.success_message
    else:
        header = self.pipeline.failure_message
    pieces = [header + '\n\n']

    if changeish.dequeued_needing_change:
        pieces.append("This change depends on a change that failed to merge.")
        return ''.join(pieces)
    if changeish.current_build_set.unable_to_merge:
        pieces.append("This change was unable to be automatically merged "
                      "with the current state of the repository. Please "
                      "rebase your change and upload a new patchset.")
        return ''.join(pieces)

    config = self.sched.config
    if config.has_option('zuul', 'url_pattern'):
        url_pattern = config.get('zuul', 'url_pattern')
    else:
        url_pattern = None

    for job in self.pipeline.getJobs(changeish):
        build = changeish.current_build_set.getBuild(job.name)
        result = build.result
        pattern = url_pattern
        # Job-specific messages and URL patterns take precedence.
        if result == 'SUCCESS':
            result = job.success_message or result
            pattern = job.success_pattern or pattern
        elif result == 'FAILURE':
            result = job.failure_message or result
            pattern = job.failure_pattern or pattern

        if pattern:
            url = pattern.format(change=changeish, pipeline=self.pipeline,
                                 job=job, build=build)
        else:
            url = build.url or job.name

        voting = '' if job.voting else ' (non-voting)'

        elapsed = ''
        if self.report_times and build.end_time and build.start_time:
            minutes, seconds = divmod(
                int(build.end_time - build.start_time), 60)
            hours, minutes = divmod(minutes, 60)
            if hours:
                elapsed = ' in %dh %02dm %02ds' % (hours, minutes, seconds)
            elif minutes:
                elapsed = ' in %dm %02ds' % (minutes, seconds)
            else:
                elapsed = ' in %ds' % (seconds)

        pieces.append('- %s : %s%s%s\n' % (url, result, elapsed, voting))
    return ''.join(pieces)
961
def formatDescription(self, build):
    """Return the HTML description shown for ``build``.

    The description names what triggered the build (a numbered change or
    a reference update) and the pipeline.  Where available it also lists
    the other changes tested concurrently, all builds of the same build
    set, the build of the same job in the previous and next build sets,
    and the reported result of the build set.
    """
    build_set = build.build_set
    change = build_set.change

    concurrent_changes = ''.join(
        '<li><a href="{change.url}"> '
        '{change.number},{change.patchset}</a></li>'.format(change=other)
        for other in build_set.other_changes)

    # A dedicated loop variable keeps ``build`` bound to the build being
    # described, so the previous/next lookups below use its own job name.
    concurrent_builds = ''
    for sibling in build_set.getBuilds():
        if sibling.url:
            concurrent_builds += """\
<li>
  <a href="{build.url}">
  {build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=sibling)
        else:
            concurrent_builds += """\
<li>
  {build.job.name}: {build.result}
</li>""".format(build=sibling)

    other_builds = ''
    if build_set.previous_build_set:
        other_build = build_set.previous_build_set.getBuild(build.job.name)
        if other_build:
            other_builds += """\
<li>
  Preceded by: <a href="{build.url}">
  {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    if build_set.next_build_set:
        other_build = build_set.next_build_set.getBuild(build.job.name)
        if other_build:
            other_builds += """\
<li>
  Succeeded by: <a href="{build.url}">
  {build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)

    result = build_set.result

    if hasattr(change, 'number'):
        ret = """\
<p>
  Triggered by change:
    <a href="{change.url}">{change.number},{change.patchset}</a><br/>
  Branch: <b>{change.branch}</b><br/>
  Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    else:
        # The reference is plain text here, so no anchor tag is closed.
        ret = """\
<p>
  Triggered by reference:
    {change.ref}<br/>
  Old revision: <b>{change.oldrev}</b><br/>
  New revision: <b>{change.newrev}</b><br/>
  Pipeline: <b>{self.pipeline.name}</b>
</p>"""

    if concurrent_changes:
        ret += """\
<p>
  Other changes tested concurrently with this change:
  <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
  All builds for this change set:
  <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
  Other build sets for this change:
  <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
  Reported result: <b>{result}</b>
</p>
"""

    # Pass the template fields explicitly rather than through locals().
    return ret.format(self=self, change=change,
                      concurrent_changes=concurrent_changes,
                      concurrent_builds=concurrent_builds,
                      other_builds=other_builds, result=result)
1061
def reportStats(self, change):
    """Send pipeline statistics for ``change`` to statsd, if available.

    Does nothing when the module-level ``statsd`` object is not set.  The
    gauge of current changes is always updated.  The resident-time timer
    and the total-changes counter are only updated when the change has a
    dequeue time, both for the pipeline as a whole and for the change's
    project.  Any error while reporting is logged and ignored.
    """
    if not statsd:
        return
    try:
        # Update the gauge on enqueue and dequeue, but timers only
        # when dequeuing.
        if change.dequeue_time:
            dt = int((change.dequeue_time - change.enqueue_time) * 1000)
        else:
            dt = None
        changes = len(self.pipeline.getAllChanges())

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(key + '.current_changes', changes)
        # Compare with None: a resident time of 0 ms is still a valid
        # measurement and the dequeued change must still be counted.
        if dt is not None:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        project_name = change.project.name.replace('/', '.')
        key += '.%s' % project_name
        if dt is not None:
            statsd.timing(key + '.resident_time', dt)
            statsd.incr(key + '.total_changes')
    except Exception:
        # Best effort, but do not swallow KeyboardInterrupt/SystemExit.
        self.log.exception("Exception reporting pipeline stats")
1092
James E. Blair1e8dd892012-05-30 09:15:05 -07001093
class IndependentPipelineManager(BasePipelineManager):
    """Pipeline manager that keeps all projects in one independent queue."""

    log = logging.getLogger("zuul.IndependentPipelineManager")
    changes_merge = False

    def _postConfig(self):
        """Create the pipeline's single non-dependent change queue.

        After the base class post-configuration, every project of the
        pipeline is added to one ``ChangeQueue`` created with
        ``dependent=False``, and that queue is registered on the pipeline.
        """
        super(IndependentPipelineManager, self)._postConfig()

        shared_queue = ChangeQueue(self.pipeline, dependent=False)
        for project in self.pipeline.getProjects():
            shared_queue.addProject(project)
        self.pipeline.addQueue(shared_queue)
James E. Blairee743612012-05-29 14:49:32 -07001106
James E. Blair1e8dd892012-05-30 09:15:05 -07001107
James E. Blair4aea70c2012-07-26 14:23:24 -07001108class DependentPipelineManager(BasePipelineManager):
1109 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001110 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001111
def __init__(self, *args, **kwargs):
    """Initialise the manager by delegating entirely to the base class."""
    base = super(DependentPipelineManager, self)
    base.__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001114
def _postConfig(self):
    """Run the base post-configuration, then build the change queues."""
    base = super(DependentPipelineManager, self)
    base._postConfig()
    self.buildChangeQueues()
1118
def buildChangeQueues(self):
    """Create the pipeline's change queues, sharing them between projects.

    One queue is created per project.  A queue whose jobs overlap with
    those of an already kept queue is merged into that queue; otherwise
    it is kept as its own queue.  The kept queues are then registered on
    the pipeline.
    """
    self.log.debug("Building shared change queues")
    per_project_queues = []
    for project in self.pipeline.getProjects():
        queue = ChangeQueue(self.pipeline)
        queue.addProject(project)
        per_project_queues.append(queue)
        self.log.debug("Created queue: %s", queue)

    self.log.debug("Combining shared queues")
    shared_queues = []
    for candidate in per_project_queues:
        for kept in shared_queues:
            if not candidate.getJobs().isdisjoint(kept.getJobs()):
                self.log.debug("Merging queue %s into %s", candidate, kept)
                kept.mergeChangeQueue(candidate)
                break
        else:
            # No kept queue shares a job with this one.
            self.log.debug("Keeping queue %s", candidate)
            shared_queues.append(candidate)

    self.log.info(" Shared change queues:")
    for queue in shared_queues:
        self.pipeline.addQueue(queue)
        self.log.info(" %s", queue)
James E. Blairee743612012-05-29 14:49:32 -07001147
def isChangeReadyToBeEnqueued(self, change):
    """Return True if the trigger says ``change`` can merge.

    The trigger is asked through ``canMerge`` with the value of
    ``self.getSubmitAllowNeeds()``.
    """
    allow_needs = self.getSubmitAllowNeeds()
    if self.sched.trigger.canMerge(change, allow_needs):
        return True
    self.log.debug("Change %s can not merge, ignoring", change)
    return False
James E. Blair1e8dd892012-05-30 09:15:05 -07001154
def enqueueChangesBehind(self, change):
    """Enqueue the mergeable changes that need ``change``.

    Returns without doing anything when ``change`` has no
    ``needed_by_changes`` attribute.  Otherwise each change in that list
    which the trigger says can merge is passed to ``addChange``.
    """
    self.log.debug("Checking for changes needing %s:", change)
    if not hasattr(change, 'needed_by_changes'):
        self.log.debug(" Changeish does not support dependencies")
        return

    ready = []
    for dependent in change.needed_by_changes:
        if not self.sched.trigger.canMerge(dependent,
                                           self.getSubmitAllowNeeds()):
            continue
        self.log.debug(" Change %s needs %s and is ready to merge",
                       dependent, change)
        ready.append(dependent)

    if not ready:
        self.log.debug(" No changes need %s", change)
    for dependent in ready:
        self.addChange(dependent)
1172
def enqueueChangesAhead(self, change):
    """Enqueue the change that ``change`` needs, if there is one.

    A True or False answer from ``checkForChangesNeededBy`` is returned
    unchanged.  Any other answer is treated as the needed change, and the
    result of passing it to ``addChange`` is returned.
    """
    needed = self.checkForChangesNeededBy(change)
    if needed in (True, False):
        return needed
    self.log.debug(" Change %s must be merged ahead of %s", needed, change)
    return self.addChange(needed)
1180
def checkForChangesNeededBy(self, change):
    """Decide whether ``change`` may be enqueued given what it needs.

    Returns True when it is okay to proceed enqueueing the change, False
    when the change should not be enqueued, or the needed change itself
    when that change is mergeable but not yet in the queue.
    """
    log = self.log
    log.debug("Checking for changes needed by %s:", change)
    if not hasattr(change, 'needs_change'):
        log.debug(" Changeish does not support dependencies")
        return True

    needed = change.needs_change
    if not needed:
        log.debug(" No changes needed")
        return True
    if needed.is_merged:
        log.debug(" Needed change is merged")
        return True
    if not needed.is_current_patchset:
        log.debug(" Needed change is not the current patchset")
        return False
    if self.isChangeAlreadyInQueue(needed):
        log.debug(" Needed change is already ahead in the queue")
        return True
    if self.sched.trigger.canMerge(needed, self.getSubmitAllowNeeds()):
        log.debug(" Change %s is needed", needed)
        return needed
    # The needed change can't be merged.
    log.debug(" Change %s is needed but can not be merged", needed)
    return False
1209
James E. Blairee743612012-05-29 14:49:32 -07001210 def _getDependentChanges(self, change):
James E. Blair9f9667e2012-06-12 17:51:08 -07001211 orig_change = change
James E. Blairee743612012-05-29 14:49:32 -07001212 changes = []
1213 while change.change_ahead:
1214 changes.append(change.change_ahead)
1215 change = change.change_ahead
James E. Blair9f9667e2012-06-12 17:51:08 -07001216 self.log.info("Change %s depends on changes %s" % (orig_change,
1217 changes))
James E. Blairee743612012-05-29 14:49:32 -07001218 return changes
1219
Clark Boylanc2592322013-02-20 17:12:28 -08001220 def _unableToMerge(self, change, all_changes):
1221 self.log.info("Unable to merge changes %s" % all_changes)
1222 self.pipeline.setUnableToMerge(change)
1223 self.possiblyReportChange(change)
1224
James E. Blairdaabed22012-08-15 15:38:57 -07001225 def _launchJobs(self, change, jobs):
James E. Blairee743612012-05-29 14:49:32 -07001226 self.log.debug("Launching jobs for change %s" % change)
James E. Blair81515ad2012-10-01 18:29:08 -07001227 ref = change.current_build_set.ref
James E. Blairdaabed22012-08-15 15:38:57 -07001228 if hasattr(change, 'refspec') and not ref:
James E. Blair4886cc12012-07-18 15:39:41 -07001229 change.current_build_set.setConfiguration()
James E. Blair81515ad2012-10-01 18:29:08 -07001230 ref = change.current_build_set.ref
James E. Blair4886cc12012-07-18 15:39:41 -07001231 dependent_changes = self._getDependentChanges(change)
1232 dependent_changes.reverse()
James E. Blair973721f2012-08-15 10:19:43 -07001233 all_changes = dependent_changes + [change]
Clark Boylanc2592322013-02-20 17:12:28 -08001234 if (dependent_changes and
1235 not dependent_changes[-1].current_build_set.commit):
1236 self._unableToMerge(change, all_changes)
James E. Blair973721f2012-08-15 10:19:43 -07001237 return
Clark Boylanc2592322013-02-20 17:12:28 -08001238 commit = self.sched.merger.mergeChanges(all_changes, ref)
James E. Blair81515ad2012-10-01 18:29:08 -07001239 change.current_build_set.commit = commit
Clark Boylanc2592322013-02-20 17:12:28 -08001240 if not commit:
1241 self._unableToMerge(change, all_changes)
1242 return
James E. Blair4886cc12012-07-18 15:39:41 -07001243 #TODO: remove this line after GERRIT_CHANGES is gone
James E. Blairee743612012-05-29 14:49:32 -07001244 dependent_changes = self._getDependentChanges(change)
James E. Blairdaabed22012-08-15 15:38:57 -07001245 for job in jobs:
James E. Blairee743612012-05-29 14:49:32 -07001246 self.log.debug("Found job %s for change %s" % (job, change))
1247 try:
James E. Blair4886cc12012-07-18 15:39:41 -07001248 #TODO: remove dependent_changes after GERRIT_CHANGES is gone
James E. Blair03b94ef2012-08-20 10:54:29 -07001249 build = self.sched.launcher.launch(job, change, self.pipeline,
James E. Blairee743612012-05-29 14:49:32 -07001250 dependent_changes)
1251 self.building_jobs[build] = change
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001252 self.log.debug("Adding build %s of job %s to change %s" %
1253 (build, job, change))
James E. Blairee743612012-05-29 14:49:32 -07001254 change.addBuild(build)
1255 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001256 self.log.exception("Exception while launching job %s "
1257 "for change %s:" % (job, change))
James E. Blairdaabed22012-08-15 15:38:57 -07001258
def cancelJobs(self, change, prime=True):
    """Cancel the builds of ``change`` and of every change behind it.

    When ``prime`` is true the builds of each affected change are reset
    first.  Each running build of the change is cancelled through the
    launcher, marked ``'CANCELED'`` and removed from
    ``self.building_jobs``; a failure to cancel one build is logged and
    does not stop the others.  The same is then done, with the same
    ``prime`` value, for the change behind this one.
    """
    self.log.debug("Cancel jobs for change %s", change)
    if prime:
        change.resetAllBuilds()
    to_remove = [build
                 for build, build_change in self.building_jobs.items()
                 if build_change == change]
    for build in to_remove:
        self.log.debug("Found build %s for change %s to cancel",
                       build, change)
        try:
            self.sched.launcher.cancel(build)
        except Exception:
            # Best effort, but do not swallow KeyboardInterrupt/SystemExit.
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, change))
    for build in to_remove:
        self.log.debug("Removing build %s from running builds", build)
        build.result = 'CANCELED'
        del self.building_jobs[build]
    if change.change_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s",
                       change.change_behind, change)
        self.cancelJobs(change.change_behind, prime=prime)
1282
def removeChange(self, change):
    """Cancel the builds of ``change`` and take it out of the queue.

    The change may be anywhere in the queue, for instance because it has
    been superseded by another change or otherwise will not merge.  It is
    dequeued with ``keep_severed_heads=False``.
    """
    self.log.debug("Canceling builds behind change: %s because it is "
                   "being removed.", change)
    self.cancelJobs(change)
    self.dequeueChange(change, keep_severed_heads=False)
1291
def handleFailedChange(self, change):
    """React to a failed build of ``change``.

    All changes behind the failed one will need to be retested, so their
    builds are cancelled to free up resources.  A failed change at the
    head of the queue is also dequeued, and the builds behind it are
    cancelled with the default ``prime`` so they can relaunch; elsewhere
    in the queue the builds behind are cancelled with ``prime=False``.
    """
    behind = change.change_behind
    if change.change_ahead:
        if behind:
            self.log.debug("Canceling builds behind change: %s due to "
                           "failure.", change)
            self.cancelJobs(behind, prime=False)
        return

    # If we're at the head of the queue, allow changes to relaunch
    if behind:
        self.log.info("Canceling/relaunching jobs for change %s "
                      "behind failed change %s", behind, change)
        self.cancelJobs(behind)
    self.dequeueChange(change)
James E. Blair268d9342012-06-13 18:24:29 -07001310
def dequeueChange(self, change, keep_severed_heads=True):
    """Remove ``change`` from its queue and re-check the changes behind it.

    When ``keep_severed_heads`` is true and the change had nothing ahead
    of it and has not been reported, it is kept on the queue as a severed
    head.  The changes that were behind it are then passed to
    ``dequeueDependentChanges``.
    """
    self.log.debug("Removing change %s from queue", change)
    # Read both neighbours before the change leaves the queue.
    was_head = not change.change_ahead
    following = change.change_behind
    queue = self.pipeline.getQueue(change.project)
    queue.dequeueChange(change)
    if keep_severed_heads and was_head and not change.reported:
        self.log.debug("Adding %s as a severed head", change)
        queue.addSeveredHead(change)
    self.dequeueDependentChanges(following)
1321
def dequeueDependentChanges(self, change):
    """Dequeue ``change`` and the changes behind it that can no longer merge.

    Starting at ``change`` and walking backwards through the queue, every
    change for which ``checkForChangesNeededBy`` does not return True is
    dequeued, flagged as dequeued needing a change, and reported.
    """
    current = change
    while current:
        # Remember the next change before this one is possibly dequeued.
        following = current.change_behind
        if self.checkForChangesNeededBy(current) is not True:
            # It's not okay to enqueue this change, we should remove it.
            self.log.info("Dequeuing change %s because "
                          "it can no longer merge", current)
            self.pipeline.getQueue(current.project).dequeueChange(current)
            self.pipeline.setDequeuedNeedingChange(current)
            self.reportChange(current)
        # We don't need to recurse, because any changes that might
        # be affected by the removal of this change are behind us
        # in the queue, so we can continue walking backwards.
        current = following