blob: c19db203f3c8178ebd53d3015b03425708ce27d2 [file] [log] [blame]
James E. Blairee743612012-05-29 14:49:32 -07001# Copyright 2012 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Zhongyue Luo1c860d72012-07-19 11:03:56 +080023import Queue
24import re
25import threading
James E. Blair71e94122012-12-24 17:53:08 -080026import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080027import yaml
James E. Blairee743612012-05-29 14:49:32 -070028
James E. Blair47958382013-01-10 17:26:02 -080029import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070030import model
James E. Blaireff88162013-07-01 12:44:14 -040031from model import Pipeline, Project, ChangeQueue, EventFilter
James E. Blair4886cc12012-07-18 15:39:41 -070032import merger
James E. Blairee743612012-05-29 14:49:32 -070033
James E. Blair71e94122012-12-24 17:53:08 -080034statsd = extras.try_import('statsd.statsd')
35
James E. Blair1e8dd892012-05-30 09:15:05 -070036
Antoine Musso80edd5a2013-02-13 15:37:53 +010037def deep_format(obj, paramdict):
38 """Apply the paramdict via str.format() to all string objects found within
39 the supplied obj. Lists and dicts are traversed recursively.
40
41 Borrowed from Jenkins Job Builder project"""
42 if isinstance(obj, str):
43 ret = obj.format(**paramdict)
44 elif isinstance(obj, list):
45 ret = []
46 for item in obj:
47 ret.append(deep_format(item, paramdict))
48 elif isinstance(obj, dict):
49 ret = {}
50 for item in obj:
51 exp_item = item.format(**paramdict)
52
53 ret[exp_item] = deep_format(obj[item], paramdict)
54 else:
55 ret = obj
56 return ret
57
58
James E. Blairfee8d652013-06-07 08:57:52 -070059class MergeFailure(Exception):
60 pass
61
62
James E. Blaire9d45c32012-05-31 09:56:45 -070063class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -070064 log = logging.getLogger("zuul.Scheduler")
65
James E. Blaire9d45c32012-05-31 09:56:45 -070066 def __init__(self):
67 threading.Thread.__init__(self)
James E. Blair8a6f0c22013-07-01 12:31:34 -040068 self.daemon = True
James E. Blairee743612012-05-29 14:49:32 -070069 self.wake_event = threading.Event()
James E. Blaircdccd972013-07-01 12:10:22 -070070 self.layout_lock = threading.Lock()
James E. Blaire9d45c32012-05-31 09:56:45 -070071 self.reconfigure_complete_event = threading.Event()
James E. Blair5d5bc2b2012-07-06 10:24:01 -070072 self._pause = False
73 self._reconfigure = False
74 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -070075 self._stopped = False
James E. Blairee743612012-05-29 14:49:32 -070076 self.launcher = None
77 self.trigger = None
James E. Blair3c5e5b52013-04-26 11:17:03 -070078 self.config = None
James E. Blair0e933c52013-07-11 10:18:52 -070079 self._maintain_trigger_cache = False
James E. Blairee743612012-05-29 14:49:32 -070080
81 self.trigger_event_queue = Queue.Queue()
82 self.result_event_queue = Queue.Queue()
James E. Blaireff88162013-07-01 12:44:14 -040083 self.layout = model.Layout()
James E. Blairee743612012-05-29 14:49:32 -070084
James E. Blairb0fcae42012-07-17 11:12:10 -070085 def stop(self):
86 self._stopped = True
87 self.wake_event.set()
88
James E. Blair47958382013-01-10 17:26:02 -080089 def testConfig(self, config_path):
James E. Blair47958382013-01-10 17:26:02 -080090 self._parseConfig(config_path)
91
James E. Blaire5a847f2012-07-10 15:29:14 -070092 def _parseConfig(self, config_path):
James E. Blaireff88162013-07-01 12:44:14 -040093 layout = model.Layout()
94 project_templates = {}
95
James E. Blairee743612012-05-29 14:49:32 -070096 def toList(item):
James E. Blair1e8dd892012-05-30 09:15:05 -070097 if not item:
98 return []
James E. Blair32663402012-06-01 10:04:18 -070099 if isinstance(item, list):
James E. Blairee743612012-05-29 14:49:32 -0700100 return item
101 return [item]
102
James E. Blaire5a847f2012-07-10 15:29:14 -0700103 if config_path:
104 config_path = os.path.expanduser(config_path)
105 if not os.path.exists(config_path):
106 raise Exception("Unable to read layout config file at %s" %
107 config_path)
108 config_file = open(config_path)
109 data = yaml.load(config_file)
110
James E. Blair47958382013-01-10 17:26:02 -0800111 validator = layoutvalidator.LayoutValidator()
112 validator.validate(data)
113
James E. Blaireff88162013-07-01 12:44:14 -0400114 config_env = {}
James E. Blaire5a847f2012-07-10 15:29:14 -0700115 for include in data.get('includes', []):
116 if 'python-file' in include:
117 fn = include['python-file']
118 if not os.path.isabs(fn):
119 base = os.path.dirname(config_path)
120 fn = os.path.join(base, fn)
121 fn = os.path.expanduser(fn)
James E. Blaireff88162013-07-01 12:44:14 -0400122 execfile(fn, config_env)
James E. Blair1e8dd892012-05-30 09:15:05 -0700123
James E. Blair4aea70c2012-07-26 14:23:24 -0700124 for conf_pipeline in data.get('pipelines', []):
125 pipeline = Pipeline(conf_pipeline['name'])
James E. Blair8dbd56a2012-12-22 10:55:10 -0800126 pipeline.description = conf_pipeline.get('description')
James E. Blair64ed6f22013-07-10 14:07:23 -0700127 precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
128 pipeline.precedence = precedence
James E. Blair56370192013-01-14 15:47:28 -0800129 pipeline.failure_message = conf_pipeline.get('failure-message',
130 "Build failed.")
131 pipeline.success_message = conf_pipeline.get('success-message',
132 "Build succeeded.")
James E. Blair2fa50962013-01-30 21:50:41 -0800133 pipeline.dequeue_on_new_patchset = conf_pipeline.get(
James E. Blair6736beb2013-07-11 15:18:15 -0700134 'dequeue-on-new-patchset', True)
135 pipeline.dequeue_on_conflict = conf_pipeline.get(
136 'dequeue-on-conflict', True)
James E. Blair4aea70c2012-07-26 14:23:24 -0700137 manager = globals()[conf_pipeline['manager']](self, pipeline)
138 pipeline.setManager(manager)
139
James E. Blaireff88162013-07-01 12:44:14 -0400140 layout.pipelines[conf_pipeline['name']] = pipeline
James E. Blair4aea70c2012-07-26 14:23:24 -0700141 manager.success_action = conf_pipeline.get('success')
142 manager.failure_action = conf_pipeline.get('failure')
143 manager.start_action = conf_pipeline.get('start')
144 for trigger in toList(conf_pipeline['trigger']):
James E. Blairee743612012-05-29 14:49:32 -0700145 approvals = {}
146 for approval_dict in toList(trigger.get('approval')):
147 for k, v in approval_dict.items():
James E. Blair1e8dd892012-05-30 09:15:05 -0700148 approvals[k] = v
James E. Blairee743612012-05-29 14:49:32 -0700149 f = EventFilter(types=toList(trigger['event']),
150 branches=toList(trigger.get('branch')),
151 refs=toList(trigger.get('ref')),
Clark Boylanb9bcb402012-06-29 17:44:05 -0700152 approvals=approvals,
Zhongyue Luoaa85ebf2012-09-21 16:38:33 +0800153 comment_filters=
Antoine Mussob4e809e2012-12-06 16:58:06 +0100154 toList(trigger.get('comment_filter')),
155 email_filters=
156 toList(trigger.get('email_filter')))
James E. Blairee743612012-05-29 14:49:32 -0700157 manager.event_filters.append(f)
158
Antoine Musso80edd5a2013-02-13 15:37:53 +0100159 for project_template in data.get('project-templates', []):
160 # Make sure the template only contains valid pipelines
161 tpl = dict(
162 (pipe_name, project_template.get(pipe_name))
James E. Blaireff88162013-07-01 12:44:14 -0400163 for pipe_name in layout.pipelines.keys()
Antoine Musso80edd5a2013-02-13 15:37:53 +0100164 if pipe_name in project_template
165 )
James E. Blaireff88162013-07-01 12:44:14 -0400166 project_templates[project_template.get('name')] = tpl
Antoine Musso80edd5a2013-02-13 15:37:53 +0100167
James E. Blair47958382013-01-10 17:26:02 -0800168 for config_job in data.get('jobs', []):
James E. Blaireff88162013-07-01 12:44:14 -0400169 job = layout.getJob(config_job['name'])
James E. Blairb0954652012-06-01 11:32:01 -0700170 # Be careful to only set attributes explicitly present on
171 # this job, to avoid squashing attributes set by a meta-job.
172 m = config_job.get('failure-message', None)
173 if m:
174 job.failure_message = m
175 m = config_job.get('success-message', None)
176 if m:
177 job.success_message = m
James E. Blair6aea36d2012-12-17 13:03:24 -0800178 m = config_job.get('failure-pattern', None)
179 if m:
180 job.failure_pattern = m
181 m = config_job.get('success-pattern', None)
182 if m:
183 job.success_pattern = m
James E. Blair222d4982012-07-16 09:31:19 -0700184 m = config_job.get('hold-following-changes', False)
185 if m:
186 job.hold_following_changes = True
James E. Blair4ec821f2012-08-23 15:28:28 -0700187 m = config_job.get('voting', None)
188 if m is not None:
189 job.voting = m
James E. Blaire5a847f2012-07-10 15:29:14 -0700190 fname = config_job.get('parameter-function', None)
191 if fname:
James E. Blaireff88162013-07-01 12:44:14 -0400192 func = config_env.get(fname, None)
James E. Blaire5a847f2012-07-10 15:29:14 -0700193 if not func:
194 raise Exception("Unable to find function %s" % fname)
195 job.parameter_function = func
James E. Blairee743612012-05-29 14:49:32 -0700196 branches = toList(config_job.get('branch'))
197 if branches:
James E. Blaire421a232012-07-25 16:59:21 -0700198 job._branches = branches
199 job.branches = [re.compile(x) for x in branches]
James E. Blair70c71582013-03-06 08:50:50 -0800200 files = toList(config_job.get('files'))
201 if files:
202 job._files = files
203 job.files = [re.compile(x) for x in files]
James E. Blairee743612012-05-29 14:49:32 -0700204
205 def add_jobs(job_tree, config_jobs):
206 for job in config_jobs:
207 if isinstance(job, list):
208 for x in job:
209 add_jobs(job_tree, x)
210 if isinstance(job, dict):
211 for parent, children in job.items():
James E. Blaireff88162013-07-01 12:44:14 -0400212 parent_tree = job_tree.addJob(layout.getJob(parent))
James E. Blairee743612012-05-29 14:49:32 -0700213 add_jobs(parent_tree, children)
214 if isinstance(job, str):
James E. Blaireff88162013-07-01 12:44:14 -0400215 job_tree.addJob(layout.getJob(job))
James E. Blairee743612012-05-29 14:49:32 -0700216
James E. Blair47958382013-01-10 17:26:02 -0800217 for config_project in data.get('projects', []):
James E. Blairee743612012-05-29 14:49:32 -0700218 project = Project(config_project['name'])
Antoine Musso80edd5a2013-02-13 15:37:53 +0100219
220 for requested_template in config_project.get('template', []):
221 # Fetch the template from 'project-templates'
James E. Blaireff88162013-07-01 12:44:14 -0400222 tpl = project_templates.get(
Antoine Musso80edd5a2013-02-13 15:37:53 +0100223 requested_template.get('name'))
224 # Expand it with the project context
225 expanded = deep_format(tpl, requested_template)
226 # Finally merge the expansion with whatever has been already
227 # defined for this project
228 config_project.update(expanded)
229
James E. Blaireff88162013-07-01 12:44:14 -0400230 layout.projects[config_project['name']] = project
James E. Blair4886cc12012-07-18 15:39:41 -0700231 mode = config_project.get('merge-mode')
232 if mode and mode == 'cherry-pick':
233 project.merge_mode = model.CHERRY_PICK
James E. Blaireff88162013-07-01 12:44:14 -0400234 for pipeline in layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700235 if pipeline.name in config_project:
236 job_tree = pipeline.addProject(project)
237 config_jobs = config_project[pipeline.name]
James E. Blairee743612012-05-29 14:49:32 -0700238 add_jobs(job_tree, config_jobs)
James E. Blairee743612012-05-29 14:49:32 -0700239
James E. Blairb0954652012-06-01 11:32:01 -0700240 # All jobs should be defined at this point, get rid of
241 # metajobs so that getJob isn't doing anything weird.
James E. Blairc28d1b02013-07-19 11:37:06 -0700242 layout.metajobs = []
James E. Blairb0954652012-06-01 11:32:01 -0700243
James E. Blaireff88162013-07-01 12:44:14 -0400244 for pipeline in layout.pipelines.values():
245 pipeline.manager._postConfig(layout)
246
247 return layout
James E. Blairee743612012-05-29 14:49:32 -0700248
James E. Blair47958382013-01-10 17:26:02 -0800249 def _setupMerger(self):
James E. Blair4886cc12012-07-18 15:39:41 -0700250 if self.config.has_option('zuul', 'git_dir'):
251 merge_root = self.config.get('zuul', 'git_dir')
252 else:
253 merge_root = '/var/lib/zuul/git'
James E. Blair47958382013-01-10 17:26:02 -0800254
Paul Belangerb67aba12013-05-13 19:22:14 -0400255 if self.config.has_option('zuul', 'git_user_email'):
256 merge_email = self.config.get('zuul', 'git_user_email')
257 else:
258 merge_email = None
259
260 if self.config.has_option('zuul', 'git_user_name'):
261 merge_name = self.config.get('zuul', 'git_user_name')
262 else:
263 merge_name = None
264
James E. Blairceabcbc2012-08-17 13:48:46 -0700265 if self.config.has_option('zuul', 'push_change_refs'):
266 push_refs = self.config.getboolean('zuul', 'push_change_refs')
267 else:
268 push_refs = False
James E. Blair47958382013-01-10 17:26:02 -0800269
James E. Blairad615012012-11-30 16:14:21 -0800270 if self.config.has_option('gerrit', 'sshkey'):
271 sshkey = self.config.get('gerrit', 'sshkey')
272 else:
273 sshkey = None
James E. Blair47958382013-01-10 17:26:02 -0800274
James E. Blairad615012012-11-30 16:14:21 -0800275 self.merger = merger.Merger(self.trigger, merge_root, push_refs,
Paul Belangerb67aba12013-05-13 19:22:14 -0400276 sshkey, merge_email, merge_name)
James E. Blaireff88162013-07-01 12:44:14 -0400277 for project in self.layout.projects.values():
James E. Blair4886cc12012-07-18 15:39:41 -0700278 url = self.trigger.getGitUrl(project)
279 self.merger.addProject(project, url)
280
James E. Blairee743612012-05-29 14:49:32 -0700281 def setLauncher(self, launcher):
282 self.launcher = launcher
283
284 def setTrigger(self, trigger):
285 self.trigger = trigger
286
James E. Blaircdccd972013-07-01 12:10:22 -0700287 def getProject(self, name):
288 self.layout_lock.acquire()
289 p = None
290 try:
291 p = self.layout.projects.get(name)
292 finally:
293 self.layout_lock.release()
294 return p
295
James E. Blairee743612012-05-29 14:49:32 -0700296 def addEvent(self, event):
297 self.log.debug("Adding trigger event: %s" % event)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800298 try:
299 if statsd:
300 statsd.incr('gerrit.event.%s' % event.type)
301 except:
302 self.log.exception("Exception reporting event stats")
James E. Blairee743612012-05-29 14:49:32 -0700303 self.trigger_event_queue.put(event)
304 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800305 self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700306
James E. Blair11700c32012-07-05 17:50:05 -0700307 def onBuildStarted(self, build):
308 self.log.debug("Adding start event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800309 build.start_time = time.time()
James E. Blair11700c32012-07-05 17:50:05 -0700310 self.result_event_queue.put(('started', build))
311 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800312 self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700313
James E. Blairee743612012-05-29 14:49:32 -0700314 def onBuildCompleted(self, build):
James E. Blair11700c32012-07-05 17:50:05 -0700315 self.log.debug("Adding complete event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800316 build.end_time = time.time()
James E. Blair23ec1ba2013-01-04 18:06:10 -0800317 try:
James E. Blair66eeebf2013-07-27 17:44:32 -0700318 if statsd and build.pipeline:
319 jobname = build.job.name.replace('.', '_')
320 key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
321 jobname, build.result)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800322 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
323 dt = int((build.end_time - build.start_time) * 1000)
324 statsd.timing(key, dt)
325 statsd.incr(key)
326 except:
327 self.log.exception("Exception reporting runtime stats")
James E. Blair11700c32012-07-05 17:50:05 -0700328 self.result_event_queue.put(('completed', build))
James E. Blairee743612012-05-29 14:49:32 -0700329 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800330 self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700331
James E. Blaire9d45c32012-05-31 09:56:45 -0700332 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700333 self.log.debug("Prepare to reconfigure")
James E. Blaire9d45c32012-05-31 09:56:45 -0700334 self.config = config
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700335 self._reconfigure = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700336 self.wake_event.set()
337 self.log.debug("Waiting for reconfiguration")
338 self.reconfigure_complete_event.wait()
339 self.reconfigure_complete_event.clear()
340 self.log.debug("Reconfiguration complete")
341
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700342 def exit(self):
343 self.log.debug("Prepare to exit")
344 self._pause = True
345 self._exit = True
346 self.wake_event.set()
347 self.log.debug("Waiting for exit")
348
349 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700350 if self.config.has_option('zuul', 'state_dir'):
351 state_dir = os.path.expanduser(self.config.get('zuul',
352 'state_dir'))
353 else:
354 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700355 return os.path.join(state_dir, 'queue.pickle')
356
357 def _save_queue(self):
358 pickle_file = self._get_queue_pickle_file()
359 events = []
360 while not self.trigger_event_queue.empty():
361 events.append(self.trigger_event_queue.get())
362 self.log.debug("Queue length is %s" % len(events))
363 if events:
364 self.log.debug("Saving queue")
365 pickle.dump(events, open(pickle_file, 'wb'))
366
367 def _load_queue(self):
368 pickle_file = self._get_queue_pickle_file()
369 if os.path.exists(pickle_file):
370 self.log.debug("Loading queue")
371 events = pickle.load(open(pickle_file, 'rb'))
372 self.log.debug("Queue length is %s" % len(events))
373 for event in events:
374 self.trigger_event_queue.put(event)
375 else:
376 self.log.debug("No queue file found")
377
378 def _delete_queue(self):
379 pickle_file = self._get_queue_pickle_file()
380 if os.path.exists(pickle_file):
381 self.log.debug("Deleting saved queue")
382 os.unlink(pickle_file)
383
384 def resume(self):
385 try:
386 self._load_queue()
387 except:
388 self.log.exception("Unable to load queue")
389 try:
390 self._delete_queue()
391 except:
392 self.log.exception("Unable to delete saved queue")
393 self.log.debug("Resuming queue processing")
394 self.wake_event.set()
395
396 def _doPauseEvent(self):
397 if self._exit:
398 self.log.debug("Exiting")
399 self._save_queue()
400 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700401
402 def _doReconfigureEvent(self):
403 # This is called in the scheduler loop after another thread sets
404 # the reconfigure flag
405 self.layout_lock.acquire()
406 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700407 self.log.debug("Performing reconfiguration")
James E. Blaircdccd972013-07-01 12:10:22 -0700408 layout = self._parseConfig(
James E. Blaireff88162013-07-01 12:44:14 -0400409 self.config.get('zuul', 'layout_config'))
James E. Blaircdccd972013-07-01 12:10:22 -0700410 for name, new_pipeline in layout.pipelines.items():
411 old_pipeline = self.layout.pipelines.get(name)
412 if not old_pipeline:
413 if self.layout.pipelines:
414 # Don't emit this warning on startup
415 self.log.warning("No old pipeline matching %s found "
416 "when reconfiguring" % name)
417 continue
418 self.log.debug("Re-enqueueing changes for pipeline %s" %
419 name)
420 items_to_remove = []
421 for shared_queue in old_pipeline.queues:
422 for item in (shared_queue.queue +
423 shared_queue.severed_heads):
424 item.item_ahead = None
425 item.item_behind = None
426 item.pipeline = None
427 project = layout.projects.get(item.change.project.name)
428 if not project:
429 self.log.warning("Unable to find project for "
430 "change %s while reenqueueing" %
431 item.change)
432 item.change.project = None
433 items_to_remove.append(item)
434 continue
435 item.change.project = project
436 severed = item in shared_queue.severed_heads
James E. Blair78e31b32013-07-09 09:11:34 -0700437 if not new_pipeline.manager.reEnqueueItem(
438 item, severed=severed):
James E. Blaircdccd972013-07-01 12:10:22 -0700439 items_to_remove.append(item)
440 builds_to_remove = []
441 for build, item in old_pipeline.manager.building_jobs.items():
442 if item in items_to_remove:
443 builds_to_remove.append(build)
444 self.log.warning("Deleting running build %s for "
445 "change %s while reenqueueing" % (
446 build, item.change))
447 for build in builds_to_remove:
448 del old_pipeline.manager.building_jobs[build]
449 new_pipeline.manager.building_jobs = \
450 old_pipeline.manager.building_jobs
451 self.layout = layout
James E. Blair47958382013-01-10 17:26:02 -0800452 self._setupMerger()
James E. Blaire0487072012-08-29 17:38:31 -0700453 self._reconfigure = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700454 self.reconfigure_complete_event.set()
James E. Blaircdccd972013-07-01 12:10:22 -0700455 finally:
456 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700457
458 def _areAllBuildsComplete(self):
459 self.log.debug("Checking if all builds are complete")
460 waiting = False
James E. Blaireff88162013-07-01 12:44:14 -0400461 for pipeline in self.layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700462 for build in pipeline.manager.building_jobs.keys():
463 self.log.debug("%s waiting on %s" % (pipeline.manager, build))
James E. Blaire9d45c32012-05-31 09:56:45 -0700464 waiting = True
465 if not waiting:
466 self.log.debug("All builds are complete")
467 return True
468 self.log.debug("All builds are not complete")
469 return False
470
James E. Blairee743612012-05-29 14:49:32 -0700471 def run(self):
James E. Blair71e94122012-12-24 17:53:08 -0800472 if statsd:
473 self.log.debug("Statsd enabled")
474 else:
475 self.log.debug("Statsd disabled because python statsd "
476 "package not found")
James E. Blairee743612012-05-29 14:49:32 -0700477 while True:
478 self.log.debug("Run handler sleeping")
479 self.wake_event.wait()
480 self.wake_event.clear()
James E. Blairb0fcae42012-07-17 11:12:10 -0700481 if self._stopped:
482 return
James E. Blairee743612012-05-29 14:49:32 -0700483 self.log.debug("Run handler awake")
484 try:
James E. Blaircdccd972013-07-01 12:10:22 -0700485 if self._reconfigure:
486 self._doReconfigureEvent()
487
James E. Blair263fba92013-02-27 13:07:19 -0800488 # Give result events priority -- they let us stop builds,
489 # whereas trigger evensts cause us to launch builds.
James E. Blairee743612012-05-29 14:49:32 -0700490 if not self.result_event_queue.empty():
491 self.process_result_queue()
James E. Blair263fba92013-02-27 13:07:19 -0800492 elif not self._pause:
493 if not self.trigger_event_queue.empty():
494 self.process_event_queue()
James E. Blaire9d45c32012-05-31 09:56:45 -0700495
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700496 if self._pause and self._areAllBuildsComplete():
497 self._doPauseEvent()
James E. Blaire9d45c32012-05-31 09:56:45 -0700498
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700499 if not self._pause:
James E. Blair4baa94c2012-06-07 17:04:21 -0700500 if not (self.trigger_event_queue.empty() and
501 self.result_event_queue.empty()):
502 self.wake_event.set()
503 else:
504 if not self.result_event_queue.empty():
505 self.wake_event.set()
James E. Blair0e933c52013-07-11 10:18:52 -0700506
507 if self._maintain_trigger_cache:
508 self.maintainTriggerCache()
509 self._maintain_trigger_cache = False
510
James E. Blairee743612012-05-29 14:49:32 -0700511 except:
512 self.log.exception("Exception in run handler:")
513
James E. Blair0e933c52013-07-11 10:18:52 -0700514 def maintainTriggerCache(self):
515 relevant = set()
516 for pipeline in self.layout.pipelines.values():
517 for item in pipeline.getAllItems():
518 relevant.add(item.change)
519 relevant.update(item.change.getRelatedChanges())
520 self.log.debug("Trigger cache size: %s" % len(relevant))
521 self.trigger.maintainCache(relevant)
522
James E. Blairee743612012-05-29 14:49:32 -0700523 def process_event_queue(self):
524 self.log.debug("Fetching trigger event")
525 event = self.trigger_event_queue.get()
526 self.log.debug("Processing trigger event %s" % event)
James E. Blaireff88162013-07-01 12:44:14 -0400527 project = self.layout.projects.get(event.project_name)
James E. Blairee743612012-05-29 14:49:32 -0700528 if not project:
529 self.log.warning("Project %s not found" % event.project_name)
James E. Blairff791972013-01-09 11:45:43 -0800530 self.trigger_event_queue.task_done()
James E. Blairee743612012-05-29 14:49:32 -0700531 return
532
Antoine Mussofeba9672013-01-17 13:44:59 +0100533 # Preprocessing for ref-update events
534 if hasattr(event, 'refspec'):
535 # Make sure the local git repo is up-to-date with the remote one.
536 # We better have the new ref before enqueuing the changes.
537 # This is done before enqueuing the changes to avoid calling an
538 # update per pipeline accepting the change.
539 self.log.info("Fetching references for %s" % project)
540 self.merger.updateRepo(project)
541
James E. Blaireff88162013-07-01 12:44:14 -0400542 for pipeline in self.layout.pipelines.values():
James E. Blair2fa50962013-01-30 21:50:41 -0800543 change = event.getChange(project, self.trigger)
544 if event.type == 'patchset-created':
545 pipeline.manager.removeOldVersionsOfChange(change)
James E. Blairfee8d652013-06-07 08:57:52 -0700546 if pipeline.manager.eventMatches(event):
547 self.log.info("Adding %s, %s to %s" %
548 (project, change, pipeline))
549 pipeline.manager.addChange(change)
550 while pipeline.manager.processQueue():
551 pass
552
James E. Blairff791972013-01-09 11:45:43 -0800553 self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700554
James E. Blairee743612012-05-29 14:49:32 -0700555 def process_result_queue(self):
556 self.log.debug("Fetching result event")
James E. Blair11700c32012-07-05 17:50:05 -0700557 event_type, build = self.result_event_queue.get()
James E. Blairee743612012-05-29 14:49:32 -0700558 self.log.debug("Processing result event %s" % build)
James E. Blaireff88162013-07-01 12:44:14 -0400559 for pipeline in self.layout.pipelines.values():
James E. Blair11700c32012-07-05 17:50:05 -0700560 if event_type == 'started':
James E. Blair4aea70c2012-07-26 14:23:24 -0700561 if pipeline.manager.onBuildStarted(build):
James E. Blairff791972013-01-09 11:45:43 -0800562 self.result_event_queue.task_done()
James E. Blair11700c32012-07-05 17:50:05 -0700563 return
564 elif event_type == 'completed':
James E. Blair4aea70c2012-07-26 14:23:24 -0700565 if pipeline.manager.onBuildCompleted(build):
James E. Blairff791972013-01-09 11:45:43 -0800566 self.result_event_queue.task_done()
James E. Blair11700c32012-07-05 17:50:05 -0700567 return
James E. Blairc84dd262012-05-31 10:03:13 -0700568 self.log.warning("Build %s not found by any queue manager" % (build))
James E. Blairff791972013-01-09 11:45:43 -0800569 self.result_event_queue.task_done()
James E. Blairee743612012-05-29 14:49:32 -0700570
James E. Blair268d9342012-06-13 18:24:29 -0700571 def formatStatusHTML(self):
572 ret = '<html><pre>'
James E. Blaire0487072012-08-29 17:38:31 -0700573 if self._pause:
574 ret += '<p><b>Queue only mode:</b> preparing to '
James E. Blaire0487072012-08-29 17:38:31 -0700575 if self._exit:
576 ret += 'exit'
577 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
578 ret += '</p>'
579
James E. Blaireff88162013-07-01 12:44:14 -0400580 keys = self.layout.pipelines.keys()
James E. Blair268d9342012-06-13 18:24:29 -0700581 keys.sort()
582 for key in keys:
James E. Blaireff88162013-07-01 12:44:14 -0400583 pipeline = self.layout.pipelines[key]
James E. Blair4aea70c2012-07-26 14:23:24 -0700584 s = 'Pipeline: %s' % pipeline.name
James E. Blair268d9342012-06-13 18:24:29 -0700585 ret += s + '\n'
586 ret += '-' * len(s) + '\n'
James E. Blaire0487072012-08-29 17:38:31 -0700587 ret += pipeline.formatStatusHTML()
James E. Blair268d9342012-06-13 18:24:29 -0700588 ret += '\n'
589 ret += '</pre></html>'
590 return ret
591
James E. Blair8dbd56a2012-12-22 10:55:10 -0800592 def formatStatusJSON(self):
593 data = {}
594 if self._pause:
595 ret = '<p><b>Queue only mode:</b> preparing to '
James E. Blair8dbd56a2012-12-22 10:55:10 -0800596 if self._exit:
597 ret += 'exit'
598 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
599 ret += '</p>'
600 data['message'] = ret
601
James E. Blairfb682cc2013-02-26 15:23:27 -0800602 data['trigger_event_queue'] = {}
603 data['trigger_event_queue']['length'] = \
604 self.trigger_event_queue.qsize()
605 data['result_event_queue'] = {}
606 data['result_event_queue']['length'] = \
607 self.result_event_queue.qsize()
608
James E. Blair8dbd56a2012-12-22 10:55:10 -0800609 pipelines = []
610 data['pipelines'] = pipelines
James E. Blaireff88162013-07-01 12:44:14 -0400611 keys = self.layout.pipelines.keys()
James E. Blair8dbd56a2012-12-22 10:55:10 -0800612 keys.sort()
613 for key in keys:
James E. Blaireff88162013-07-01 12:44:14 -0400614 pipeline = self.layout.pipelines[key]
James E. Blair8dbd56a2012-12-22 10:55:10 -0800615 pipelines.append(pipeline.formatStatusJSON())
616 return json.dumps(data)
617
James E. Blair1e8dd892012-05-30 09:15:05 -0700618
James E. Blair4aea70c2012-07-26 14:23:24 -0700619class BasePipelineManager(object):
620 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700621
James E. Blair4aea70c2012-07-26 14:23:24 -0700622 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -0700623 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -0700624 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -0700625 self.building_jobs = {}
626 self.event_filters = []
627 self.success_action = {}
628 self.failure_action = {}
James E. Blairdc253862012-06-13 17:12:42 -0700629 self.start_action = {}
James E. Blair3c5e5b52013-04-26 11:17:03 -0700630 if self.sched.config and self.sched.config.has_option(
631 'zuul', 'report_times'):
James E. Blair0ac6c012013-04-26 09:04:23 -0700632 self.report_times = self.sched.config.getboolean(
633 'zuul', 'report_times')
634 else:
635 self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -0700636
637 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -0700638 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700639
James E. Blaireff88162013-07-01 12:44:14 -0400640 def _postConfig(self, layout):
James E. Blair4aea70c2012-07-26 14:23:24 -0700641 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700642 self.log.info(" Events:")
643 for e in self.event_filters:
644 self.log.info(" %s" % e)
645 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -0700646
James E. Blairee743612012-05-29 14:49:32 -0700647 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -0700648 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -0700649 if tree.job:
650 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -0700651 for b in tree.job._branches:
652 efilters += str(b)
James E. Blair70c71582013-03-06 08:50:50 -0800653 for f in tree.job._files:
654 efilters += str(f)
James E. Blairee743612012-05-29 14:49:32 -0700655 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -0700656 efilters = ' ' + efilters
James E. Blair222d4982012-07-16 09:31:19 -0700657 hold = ''
658 if tree.job.hold_following_changes:
659 hold = ' [hold]'
James E. Blair4ec821f2012-08-23 15:28:28 -0700660 voting = ''
661 if not tree.job.voting:
662 voting = ' [nonvoting]'
663 self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
664 efilters, hold, voting))
James E. Blairee743612012-05-29 14:49:32 -0700665 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -0700666 log_jobs(x, indent + 2)
667
James E. Blaireff88162013-07-01 12:44:14 -0400668 for p in layout.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700669 tree = self.pipeline.getJobTree(p)
670 if tree:
James E. Blairee743612012-05-29 14:49:32 -0700671 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -0700672 log_jobs(tree)
James E. Blairdc253862012-06-13 17:12:42 -0700673 if self.start_action:
674 self.log.info(" On start:")
675 self.log.info(" %s" % self.start_action)
James E. Blairee743612012-05-29 14:49:32 -0700676 if self.success_action:
677 self.log.info(" On success:")
678 self.log.info(" %s" % self.success_action)
679 if self.failure_action:
680 self.log.info(" On failure:")
681 self.log.info(" %s" % self.failure_action)
682
James E. Blaire421a232012-07-25 16:59:21 -0700683 def getSubmitAllowNeeds(self):
684 # Get a list of code review labels that are allowed to be
685 # "needed" in the submit records for a change, with respect
686 # to this queue. In other words, the list of review labels
687 # this queue itself is likely to set before submitting.
James E. Blair4aea70c2012-07-26 14:23:24 -0700688 if self.success_action:
689 return self.success_action.keys()
690 else:
691 return {}
James E. Blaire421a232012-07-25 16:59:21 -0700692
James E. Blairee743612012-05-29 14:49:32 -0700693 def eventMatches(self, event):
694 for ef in self.event_filters:
James E. Blairee743612012-05-29 14:49:32 -0700695 if ef.matches(event):
696 return True
697 return False
698
James E. Blair0dc8ba92012-07-16 14:23:52 -0700699 def isChangeAlreadyInQueue(self, change):
James E. Blaire0487072012-08-29 17:38:31 -0700700 for c in self.pipeline.getChangesInQueue():
James E. Blair0dc8ba92012-07-16 14:23:52 -0700701 if change.equals(c):
702 return True
703 return False
704
James E. Blaire0487072012-08-29 17:38:31 -0700705 def reportStart(self, change):
706 try:
707 self.log.info("Reporting start, action %s change %s" %
708 (self.start_action, change))
709 msg = "Starting %s jobs." % self.pipeline.name
Clark Boylan9b670902012-09-28 13:47:56 -0700710 if self.sched.config.has_option('zuul', 'status_url'):
711 msg += "\n" + self.sched.config.get('zuul', 'status_url')
James E. Blaire0487072012-08-29 17:38:31 -0700712 ret = self.sched.trigger.report(change, msg, self.start_action)
713 if ret:
714 self.log.error("Reporting change start %s received: %s" %
715 (change, ret))
716 except:
717 self.log.exception("Exception while reporting start:")
718
719 def isChangeReadyToBeEnqueued(self, change):
720 return True
721
722 def enqueueChangesAhead(self, change):
723 return True
724
725 def enqueueChangesBehind(self, change):
726 return True
727
James E. Blairfee8d652013-06-07 08:57:52 -0700728 def checkForChangesNeededBy(self, change):
729 return True
730
731 def getDependentItems(self, item):
732 orig_item = item
733 items = []
734 while item.item_ahead:
735 items.append(item.item_ahead)
736 item = item.item_ahead
737 self.log.info("Change %s depends on changes %s" %
738 (orig_item.change,
739 [x.change for x in items]))
740 return items
741
James E. Blair2fa50962013-01-30 21:50:41 -0800742 def findOldVersionOfChangeAlreadyInQueue(self, change):
743 for c in self.pipeline.getChangesInQueue():
744 if change.isUpdateOf(c):
745 return c
746 return None
747
748 def removeOldVersionsOfChange(self, change):
749 if not self.pipeline.dequeue_on_new_patchset:
750 return
751 old_change = self.findOldVersionOfChangeAlreadyInQueue(change)
752 if old_change:
753 self.log.debug("Change %s is a new version of %s, removing %s" %
754 (change, old_change, old_change))
755 self.removeChange(old_change)
James E. Blair2fa50962013-01-30 21:50:41 -0800756
James E. Blaircdccd972013-07-01 12:10:22 -0700757 def reEnqueueItem(self, item, severed=False):
758 change_queue = self.pipeline.getQueue(item.change.project)
759 if change_queue:
760 self.log.debug("Re-enqueing change %s in queue %s" %
761 (item.change, change_queue))
762 if severed:
763 change_queue.addSeveredHead(item)
764 else:
765 change_queue.enqueueItem(item)
766 self.reportStats(item)
767 return True
768 else:
769 self.log.error("Unable to find change queue for project %s" %
770 item.change.project)
771 return False
772
James E. Blairee743612012-05-29 14:49:32 -0700773 def addChange(self, change):
James E. Blaire0487072012-08-29 17:38:31 -0700774 self.log.debug("Considering adding change %s" % change)
James E. Blair0dc8ba92012-07-16 14:23:52 -0700775 if self.isChangeAlreadyInQueue(change):
776 self.log.debug("Change %s is already in queue, ignoring" % change)
James E. Blaire0487072012-08-29 17:38:31 -0700777 return True
James E. Blair692c6b32012-07-17 11:16:35 -0700778
James E. Blaire0487072012-08-29 17:38:31 -0700779 if not self.isChangeReadyToBeEnqueued(change):
780 self.log.debug("Change %s is not ready to be enqueued, ignoring" %
781 change)
782 return False
783
784 if not self.enqueueChangesAhead(change):
James E. Blair1490eba2013-03-06 19:14:00 -0800785 self.log.debug("Failed to enqueue changes ahead of %s" % change)
James E. Blaire0487072012-08-29 17:38:31 -0700786 return False
787
788 if self.isChangeAlreadyInQueue(change):
789 self.log.debug("Change %s is already in queue, ignoring" % change)
790 return True
791
792 change_queue = self.pipeline.getQueue(change.project)
793 if change_queue:
794 self.log.debug("Adding change %s to queue %s" %
795 (change, change_queue))
796 if self.start_action:
797 self.reportStart(change)
James E. Blairfee8d652013-06-07 08:57:52 -0700798 item = change_queue.enqueueChange(change)
799 self.reportStats(item)
James E. Blaire0487072012-08-29 17:38:31 -0700800 self.enqueueChangesBehind(change)
801 else:
802 self.log.error("Unable to find change queue for project %s" %
803 change.project)
804 return False
James E. Blairee743612012-05-29 14:49:32 -0700805
James E. Blairfee8d652013-06-07 08:57:52 -0700806 def dequeueItem(self, item, keep_severed_heads=True):
807 self.log.debug("Removing change %s from queue" % item.change)
808 item_ahead = item.item_ahead
809 change_queue = self.pipeline.getQueue(item.change.project)
810 change_queue.dequeueItem(item)
811 if (keep_severed_heads and not item_ahead and
812 (item.change.is_reportable and not item.reported)):
813 self.log.debug("Adding %s as a severed head" % item.change)
814 change_queue.addSeveredHead(item)
James E. Blair0e933c52013-07-11 10:18:52 -0700815 self.sched._maintain_trigger_cache = True
James E. Blair2fa50962013-01-30 21:50:41 -0800816
817 def removeChange(self, change):
818 # Remove a change from the queue, probably because it has been
819 # superceded by another change.
James E. Blairfee8d652013-06-07 08:57:52 -0700820 for item in self.pipeline.getAllItems():
821 if item.change == change:
822 self.log.debug("Canceling builds behind change: %s "
823 "because it is being removed." % item.change)
824 self.cancelJobs(item)
825 self.dequeueItem(item, keep_severed_heads=False)
James E. Blair2fa50962013-01-30 21:50:41 -0800826
James E. Blairfee8d652013-06-07 08:57:52 -0700827 def prepareRef(self, item):
828 # Returns False on success.
829 # Returns True if we were unable to prepare the ref.
830 ref = item.current_build_set.ref
831 if hasattr(item.change, 'refspec') and not ref:
832 self.log.debug("Preparing ref for: %s" % item.change)
833 item.current_build_set.setConfiguration()
834 ref = item.current_build_set.ref
835 dependent_items = self.getDependentItems(item)
836 dependent_items.reverse()
James E. Blair6736beb2013-07-11 15:18:15 -0700837 dependent_str = ', '.join(
838 ['%s' % i.change.number for i in dependent_items
839 if i.change.project == item.change.project])
840 if dependent_str:
841 msg = \
842 "This change was unable to be automatically merged "\
843 "with the current state of the repository and the "\
844 "following changes which were enqueued ahead of it: "\
845 "%s. Please rebase your change and upload a new "\
846 "patchset." % dependent_str
847 else:
848 msg = "This change was unable to be automatically merged "\
849 "with the current state of the repository. Please "\
850 "rebase your change and upload a new patchset."
James E. Blairfee8d652013-06-07 08:57:52 -0700851 all_items = dependent_items + [item]
852 if (dependent_items and
853 not dependent_items[-1].current_build_set.commit):
James E. Blair6736beb2013-07-11 15:18:15 -0700854 self.pipeline.setUnableToMerge(item, msg)
James E. Blairfee8d652013-06-07 08:57:52 -0700855 return True
856 commit = self.sched.merger.mergeChanges(all_items, ref)
857 item.current_build_set.commit = commit
James E. Blair81515ad2012-10-01 18:29:08 -0700858 if not commit:
James E. Blairfee8d652013-06-07 08:57:52 -0700859 self.log.info("Unable to merge change %s" % item.change)
James E. Blair6736beb2013-07-11 15:18:15 -0700860 self.pipeline.setUnableToMerge(item, msg)
James E. Blairfee8d652013-06-07 08:57:52 -0700861 return True
862 return False
863
864 def _launchJobs(self, item, jobs):
865 self.log.debug("Launching jobs for change %s" % item.change)
866 dependent_items = self.getDependentItems(item)
867 for job in jobs:
868 self.log.debug("Found job %s for change %s" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -0700869 try:
James E. Blairfee8d652013-06-07 08:57:52 -0700870 build = self.sched.launcher.launch(job, item,
871 self.pipeline,
872 dependent_items)
873 self.building_jobs[build] = item
874 self.log.debug("Adding build %s of job %s to item %s" %
875 (build, job, item))
876 item.addBuild(build)
James E. Blairee743612012-05-29 14:49:32 -0700877 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800878 self.log.exception("Exception while launching job %s "
James E. Blairfee8d652013-06-07 08:57:52 -0700879 "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -0700880
James E. Blairfee8d652013-06-07 08:57:52 -0700881 def launchJobs(self, item):
882 jobs = self.pipeline.findJobsToRun(item)
James E. Blairdaabed22012-08-15 15:38:57 -0700883 if jobs:
James E. Blairfee8d652013-06-07 08:57:52 -0700884 self._launchJobs(item, jobs)
885
886 def cancelJobs(self, item, prime=True):
887 self.log.debug("Cancel jobs for change %s" % item.change)
888 canceled = False
889 to_remove = []
890 if prime and item.current_build_set.builds:
891 item.resetAllBuilds()
892 for build, build_item in self.building_jobs.items():
893 if build_item == item:
894 self.log.debug("Found build %s for change %s to cancel" %
895 (build, item.change))
896 try:
897 self.sched.launcher.cancel(build)
898 except:
899 self.log.exception("Exception while canceling build %s "
900 "for change %s" % (build, item.change))
901 to_remove.append(build)
902 canceled = True
903 for build in to_remove:
904 self.log.debug("Removing build %s from running builds" % build)
905 build.result = 'CANCELED'
906 del self.building_jobs[build]
907 if item.item_behind:
908 self.log.debug("Canceling jobs for change %s, behind change %s" %
909 (item.item_behind.change, item.change))
910 if self.cancelJobs(item.item_behind, prime=prime):
911 canceled = True
912 return canceled
913
914 def _processOneItem(self, item):
915 changed = False
916 item_ahead = item.item_ahead
917 item_behind = item.item_behind
918 if self.prepareRef(item):
919 changed = True
James E. Blair6736beb2013-07-11 15:18:15 -0700920 if self.pipeline.dequeue_on_conflict:
921 self.log.info("Dequeuing change %s because "
922 "of a git merge error" % item.change)
923 self.dequeueItem(item, keep_severed_heads=False)
924 try:
925 self.reportItem(item)
926 except MergeFailure:
927 pass
928 return changed
James E. Blairfee8d652013-06-07 08:57:52 -0700929 if self.checkForChangesNeededBy(item.change) is not True:
930 # It's not okay to enqueue this change, we should remove it.
931 self.log.info("Dequeuing change %s because "
932 "it can no longer merge" % item.change)
933 self.cancelJobs(item)
934 self.dequeueItem(item, keep_severed_heads=False)
935 self.pipeline.setDequeuedNeedingChange(item)
936 try:
937 self.reportItem(item)
938 except MergeFailure:
939 pass
940 changed = True
941 return changed
942 if not item_ahead:
943 merge_failed = False
944 if self.pipeline.areAllJobsComplete(item):
945 try:
946 self.reportItem(item)
947 except MergeFailure:
948 merge_failed = True
949 self.dequeueItem(item)
950 changed = True
951 if merge_failed or self.pipeline.didAnyJobFail(item):
952 if item_behind:
953 self.cancelJobs(item_behind)
954 changed = True
955 self.dequeueItem(item)
956 else:
957 if self.pipeline.didAnyJobFail(item):
958 if item_behind:
959 if self.cancelJobs(item_behind, prime=False):
960 changed = True
961 # don't restart yet; this change will eventually become
962 # the head
963 if self.launchJobs(item):
964 changed = True
965 return changed
966
967 def processQueue(self):
968 # Do whatever needs to be done for each change in the queue
969 self.log.debug("Starting queue processor: %s" % self.pipeline.name)
970 changed = False
971 for item in self.pipeline.getAllItems():
972 if self._processOneItem(item):
973 changed = True
974 self.reportStats(item)
975 return changed
James E. Blairdaabed22012-08-15 15:38:57 -0700976
James E. Blair11700c32012-07-05 17:50:05 -0700977 def updateBuildDescriptions(self, build_set):
978 for build in build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -0700979 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -0700980 self.sched.launcher.setBuildDescription(build, desc)
981
982 if build_set.previous_build_set:
983 for build in build_set.previous_build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -0700984 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -0700985 self.sched.launcher.setBuildDescription(build, desc)
986
987 def onBuildStarted(self, build):
James E. Blair11700c32012-07-05 17:50:05 -0700988 if build not in self.building_jobs:
James E. Blair11700c32012-07-05 17:50:05 -0700989 # Or triggered externally, or triggered before zuul started,
990 # or restarted
991 return False
992
James E. Blairfee8d652013-06-07 08:57:52 -0700993 self.log.debug("Build %s started" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700994 self.updateBuildDescriptions(build.build_set)
James E. Blairfee8d652013-06-07 08:57:52 -0700995 while self.processQueue():
996 pass
James E. Blair11700c32012-07-05 17:50:05 -0700997 return True
998
James E. Blairee743612012-05-29 14:49:32 -0700999 def onBuildCompleted(self, build):
James E. Blair1e8dd892012-05-30 09:15:05 -07001000 if build not in self.building_jobs:
James E. Blairee743612012-05-29 14:49:32 -07001001 # Or triggered externally, or triggered before zuul started,
1002 # or restarted
1003 return False
James E. Blairfee8d652013-06-07 08:57:52 -07001004
1005 self.log.debug("Build %s completed" % build)
James E. Blairee743612012-05-29 14:49:32 -07001006 change = self.building_jobs[build]
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001007 self.log.debug("Found change %s which triggered completed build %s" %
1008 (change, build))
James E. Blairee743612012-05-29 14:49:32 -07001009
1010 del self.building_jobs[build]
1011
James E. Blair4aea70c2012-07-26 14:23:24 -07001012 self.pipeline.setResult(change, build)
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001013 self.log.info("Change %s status is now:\n %s" %
James E. Blaire0487072012-08-29 17:38:31 -07001014 (change, self.pipeline.formatStatus(change)))
James E. Blair11700c32012-07-05 17:50:05 -07001015 self.updateBuildDescriptions(build.build_set)
James E. Blairfee8d652013-06-07 08:57:52 -07001016 while self.processQueue():
1017 pass
James E. Blairee743612012-05-29 14:49:32 -07001018 return True
1019
James E. Blairfee8d652013-06-07 08:57:52 -07001020 def reportItem(self, item):
1021 if item.change.is_reportable and item.reported:
1022 raise Exception("Already reported change %s" % item.change)
1023 ret = self._reportItem(item)
1024 if self.changes_merge:
1025 succeeded = self.pipeline.didAllJobsSucceed(item)
1026 merged = (not ret)
1027 if merged:
1028 merged = self.sched.trigger.isMerged(item.change,
1029 item.change.branch)
1030 self.log.info("Reported change %s status: all-succeeded: %s, "
1031 "merged: %s" % (item.change, succeeded, merged))
1032 if not (succeeded and merged):
1033 self.log.debug("Reported change %s failed tests or failed "
1034 "to merge" % (item.change))
1035 raise MergeFailure("Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001036
James E. Blairfee8d652013-06-07 08:57:52 -07001037 def _reportItem(self, item):
1038 if not item.change.is_reportable:
James E. Blaire0487072012-08-29 17:38:31 -07001039 return False
James E. Blairfee8d652013-06-07 08:57:52 -07001040 if item.change.is_reportable and item.reported:
James E. Blairb0fcae42012-07-17 11:12:10 -07001041 return 0
James E. Blairfee8d652013-06-07 08:57:52 -07001042 self.log.debug("Reporting change %s" % item.change)
James E. Blairee743612012-05-29 14:49:32 -07001043 ret = None
James E. Blairfee8d652013-06-07 08:57:52 -07001044 if self.pipeline.didAllJobsSucceed(item):
1045 self.log.debug("success %s %s" % (self.success_action,
1046 self.failure_action))
James E. Blairee743612012-05-29 14:49:32 -07001047 action = self.success_action
James E. Blairfee8d652013-06-07 08:57:52 -07001048 item.setReportedResult('SUCCESS')
James E. Blairee743612012-05-29 14:49:32 -07001049 else:
1050 action = self.failure_action
James E. Blairfee8d652013-06-07 08:57:52 -07001051 item.setReportedResult('FAILURE')
1052 report = self.formatReport(item)
1053 item.reported = True
James E. Blairee743612012-05-29 14:49:32 -07001054 try:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001055 self.log.info("Reporting change %s, action: %s" %
James E. Blairfee8d652013-06-07 08:57:52 -07001056 (item.change, action))
1057 ret = self.sched.trigger.report(item.change, report, action)
James E. Blairee743612012-05-29 14:49:32 -07001058 if ret:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001059 self.log.error("Reporting change %s received: %s" %
James E. Blairfee8d652013-06-07 08:57:52 -07001060 (item.change, ret))
James E. Blairee743612012-05-29 14:49:32 -07001061 except:
1062 self.log.exception("Exception while reporting:")
James E. Blairfee8d652013-06-07 08:57:52 -07001063 item.setReportedResult('ERROR')
1064 self.updateBuildDescriptions(item.current_build_set)
James E. Blairee743612012-05-29 14:49:32 -07001065 return ret
1066
James E. Blairfee8d652013-06-07 08:57:52 -07001067 def formatReport(self, item):
James E. Blair8b0d4c42012-08-23 16:03:05 -07001068 ret = ''
James E. Blairfee8d652013-06-07 08:57:52 -07001069 if self.pipeline.didAllJobsSucceed(item):
James E. Blair56370192013-01-14 15:47:28 -08001070 ret += self.pipeline.success_message + '\n\n'
James E. Blair8b0d4c42012-08-23 16:03:05 -07001071 else:
James E. Blair56370192013-01-14 15:47:28 -08001072 ret += self.pipeline.failure_message + '\n\n'
James E. Blair8b0d4c42012-08-23 16:03:05 -07001073
James E. Blairfee8d652013-06-07 08:57:52 -07001074 if item.dequeued_needing_change:
James E. Blair8b0d4c42012-08-23 16:03:05 -07001075 ret += "This change depends on a change that failed to merge."
James E. Blair6736beb2013-07-11 15:18:15 -07001076 elif item.current_build_set.unable_to_merge_message:
1077 ret += item.current_build_set.unable_to_merge_message
James E. Blair8b0d4c42012-08-23 16:03:05 -07001078 else:
James E. Blaira35fcce2012-08-24 10:46:01 -07001079 if self.sched.config.has_option('zuul', 'url_pattern'):
James E. Blair6aea36d2012-12-17 13:03:24 -08001080 url_pattern = self.sched.config.get('zuul', 'url_pattern')
James E. Blaira35fcce2012-08-24 10:46:01 -07001081 else:
James E. Blair6aea36d2012-12-17 13:03:24 -08001082 url_pattern = None
James E. Blairfee8d652013-06-07 08:57:52 -07001083 for job in self.pipeline.getJobs(item.change):
1084 build = item.current_build_set.getBuild(job.name)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001085 result = build.result
James E. Blair6aea36d2012-12-17 13:03:24 -08001086 pattern = url_pattern
1087 if result == 'SUCCESS':
1088 if job.success_message:
1089 result = job.success_message
1090 if job.success_pattern:
1091 pattern = job.success_pattern
1092 elif result == 'FAILURE':
1093 if job.failure_message:
1094 result = job.failure_message
1095 if job.failure_pattern:
1096 pattern = job.failure_pattern
Ori Livneh7191ee82013-05-02 19:13:53 -07001097 if pattern:
James E. Blairfee8d652013-06-07 08:57:52 -07001098 url = pattern.format(change=item.change,
Ori Livneh7191ee82013-05-02 19:13:53 -07001099 pipeline=self.pipeline,
1100 job=job,
1101 build=build)
1102 else:
1103 url = build.url or job.name
James E. Blair8b0d4c42012-08-23 16:03:05 -07001104 if not job.voting:
1105 voting = ' (non-voting)'
1106 else:
1107 voting = ''
James E. Blair0ac6c012013-04-26 09:04:23 -07001108 if self.report_times and build.end_time and build.start_time:
1109 dt = int(build.end_time - build.start_time)
1110 m, s = divmod(dt, 60)
1111 h, m = divmod(m, 60)
Sean Dague51fd1192013-05-03 07:09:53 -04001112 if h:
1113 elapsed = ' in %dh %02dm %02ds' % (h, m, s)
1114 elif m:
1115 elapsed = ' in %dm %02ds' % (m, s)
1116 else:
1117 elapsed = ' in %ds' % (s)
James E. Blair0ac6c012013-04-26 09:04:23 -07001118 else:
1119 elapsed = ''
1120 ret += '- %s : %s%s%s\n' % (url, result, elapsed, voting)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001121 return ret
1122
1123 def formatDescription(self, build):
1124 concurrent_changes = ''
1125 concurrent_builds = ''
1126 other_builds = ''
1127
1128 for change in build.build_set.other_changes:
1129 concurrent_changes += '<li><a href="{change.url}">\
1130 {change.number},{change.patchset}</a></li>'.format(
1131 change=change)
1132
James E. Blairfee8d652013-06-07 08:57:52 -07001133 change = build.build_set.item.change
James E. Blair8b0d4c42012-08-23 16:03:05 -07001134
1135 for build in build.build_set.getBuilds():
Ori Livneh7191ee82013-05-02 19:13:53 -07001136 if build.url:
James E. Blair8b0d4c42012-08-23 16:03:05 -07001137 concurrent_builds += """\
1138<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001139 <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001140 {build.job.name} #{build.number}</a>: {build.result}
1141</li>
1142""".format(build=build)
1143 else:
1144 concurrent_builds += """\
1145<li>
1146 {build.job.name}: {build.result}
1147</li>""".format(build=build)
1148
1149 if build.build_set.previous_build_set:
1150 other_build = build.build_set.previous_build_set.getBuild(
1151 build.job.name)
1152 if other_build:
1153 other_builds += """\
1154<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001155 Preceded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001156 {build.job.name} #{build.number}</a>
1157</li>
1158""".format(build=other_build)
1159
1160 if build.build_set.next_build_set:
1161 other_build = build.build_set.next_build_set.getBuild(
1162 build.job.name)
1163 if other_build:
1164 other_builds += """\
1165<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001166 Succeeded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001167 {build.job.name} #{build.number}</a>
1168</li>
1169""".format(build=other_build)
1170
1171 result = build.build_set.result
1172
1173 if hasattr(change, 'number'):
1174 ret = """\
1175<p>
1176 Triggered by change:
1177 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
1178 Branch: <b>{change.branch}</b><br/>
1179 Pipeline: <b>{self.pipeline.name}</b>
1180</p>"""
1181 else:
1182 ret = """\
1183<p>
1184 Triggered by reference:
1185 {change.ref}</a><br/>
1186 Old revision: <b>{change.oldrev}</b><br/>
1187 New revision: <b>{change.newrev}</b><br/>
1188 Pipeline: <b>{self.pipeline.name}</b>
1189</p>"""
1190
1191 if concurrent_changes:
1192 ret += """\
1193<p>
1194 Other changes tested concurrently with this change:
1195 <ul>{concurrent_changes}</ul>
1196</p>
1197"""
1198 if concurrent_builds:
1199 ret += """\
1200<p>
1201 All builds for this change set:
1202 <ul>{concurrent_builds}</ul>
1203</p>
1204"""
1205
1206 if other_builds:
1207 ret += """\
1208<p>
1209 Other build sets for this change:
1210 <ul>{other_builds}</ul>
1211</p>
1212"""
1213 if result:
1214 ret += """\
1215<p>
1216 Reported result: <b>{result}</b>
1217</p>
1218"""
1219
1220 ret = ret.format(**locals())
James E. Blair268d9342012-06-13 18:24:29 -07001221 return ret
1222
James E. Blairfee8d652013-06-07 08:57:52 -07001223 def reportStats(self, item):
James E. Blair8fa16972013-01-15 16:57:20 -08001224 if not statsd:
1225 return
1226 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001227 # Update the gauge on enqueue and dequeue, but timers only
James E. Blair8fa16972013-01-15 16:57:20 -08001228 # when dequeing.
James E. Blairfee8d652013-06-07 08:57:52 -07001229 if item.dequeue_time:
1230 dt = int((item.dequeue_time - item.enqueue_time) * 1000)
James E. Blair8fa16972013-01-15 16:57:20 -08001231 else:
1232 dt = None
James E. Blairfee8d652013-06-07 08:57:52 -07001233 items = len(self.pipeline.getAllItems())
James E. Blair8fa16972013-01-15 16:57:20 -08001234
1235 # stats.timers.zuul.pipeline.NAME.resident_time
1236 # stats_counts.zuul.pipeline.NAME.total_changes
1237 # stats.gauges.zuul.pipeline.NAME.current_changes
1238 key = 'zuul.pipeline.%s' % self.pipeline.name
James E. Blairfee8d652013-06-07 08:57:52 -07001239 statsd.gauge(key + '.current_changes', items)
James E. Blair8fa16972013-01-15 16:57:20 -08001240 if dt:
1241 statsd.timing(key + '.resident_time', dt)
1242 statsd.incr(key + '.total_changes')
1243
1244 # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
1245 # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
James E. Blairfee8d652013-06-07 08:57:52 -07001246 project_name = item.change.project.name.replace('/', '.')
James E. Blair8fa16972013-01-15 16:57:20 -08001247 key += '.%s' % project_name
1248 if dt:
1249 statsd.timing(key + '.resident_time', dt)
1250 statsd.incr(key + '.total_changes')
1251 except:
1252 self.log.exception("Exception reporting pipeline stats")
1253
James E. Blair1e8dd892012-05-30 09:15:05 -07001254
James E. Blair4aea70c2012-07-26 14:23:24 -07001255class IndependentPipelineManager(BasePipelineManager):
1256 log = logging.getLogger("zuul.IndependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001257 changes_merge = False
1258
James E. Blaireff88162013-07-01 12:44:14 -04001259 def _postConfig(self, layout):
1260 super(IndependentPipelineManager, self)._postConfig(layout)
James E. Blaire0487072012-08-29 17:38:31 -07001261
1262 change_queue = ChangeQueue(self.pipeline, dependent=False)
1263 for project in self.pipeline.getProjects():
1264 change_queue.addProject(project)
1265
1266 self.pipeline.addQueue(change_queue)
James E. Blairee743612012-05-29 14:49:32 -07001267
James E. Blair1e8dd892012-05-30 09:15:05 -07001268
James E. Blair4aea70c2012-07-26 14:23:24 -07001269class DependentPipelineManager(BasePipelineManager):
1270 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001271 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001272
1273 def __init__(self, *args, **kwargs):
James E. Blair4aea70c2012-07-26 14:23:24 -07001274 super(DependentPipelineManager, self).__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001275
James E. Blaireff88162013-07-01 12:44:14 -04001276 def _postConfig(self, layout):
1277 super(DependentPipelineManager, self)._postConfig(layout)
James E. Blairee743612012-05-29 14:49:32 -07001278 self.buildChangeQueues()
1279
1280 def buildChangeQueues(self):
1281 self.log.debug("Building shared change queues")
1282 change_queues = []
1283
James E. Blair4aea70c2012-07-26 14:23:24 -07001284 for project in self.pipeline.getProjects():
1285 change_queue = ChangeQueue(self.pipeline)
1286 change_queue.addProject(project)
1287 change_queues.append(change_queue)
1288 self.log.debug("Created queue: %s" % change_queue)
James E. Blairee743612012-05-29 14:49:32 -07001289
1290 self.log.debug("Combining shared queues")
1291 new_change_queues = []
1292 for a in change_queues:
1293 merged_a = False
1294 for b in new_change_queues:
1295 if not a.getJobs().isdisjoint(b.getJobs()):
1296 self.log.debug("Merging queue %s into %s" % (a, b))
1297 b.mergeChangeQueue(a)
1298 merged_a = True
1299 break # this breaks out of 'for b' and continues 'for a'
1300 if not merged_a:
1301 self.log.debug("Keeping queue %s" % (a))
1302 new_change_queues.append(a)
James E. Blair1e8dd892012-05-30 09:15:05 -07001303
James E. Blairee743612012-05-29 14:49:32 -07001304 self.log.info(" Shared change queues:")
James E. Blaire0487072012-08-29 17:38:31 -07001305 for queue in new_change_queues:
1306 self.pipeline.addQueue(queue)
1307 self.log.info(" %s" % queue)
James E. Blairee743612012-05-29 14:49:32 -07001308
James E. Blaire0487072012-08-29 17:38:31 -07001309 def isChangeReadyToBeEnqueued(self, change):
1310 if not self.sched.trigger.canMerge(change,
1311 self.getSubmitAllowNeeds()):
1312 self.log.debug("Change %s can not merge, ignoring" % change)
1313 return False
1314 return True
James E. Blair1e8dd892012-05-30 09:15:05 -07001315
James E. Blaire0487072012-08-29 17:38:31 -07001316 def enqueueChangesBehind(self, change):
1317 to_enqueue = []
1318 self.log.debug("Checking for changes needing %s:" % change)
1319 if not hasattr(change, 'needed_by_changes'):
1320 self.log.debug(" Changeish does not support dependencies")
1321 return
1322 for needs in change.needed_by_changes:
1323 if self.sched.trigger.canMerge(needs,
1324 self.getSubmitAllowNeeds()):
1325 self.log.debug(" Change %s needs %s and is ready to merge" %
1326 (needs, change))
1327 to_enqueue.append(needs)
1328 if not to_enqueue:
1329 self.log.debug(" No changes need %s" % change)
1330
1331 for other_change in to_enqueue:
1332 self.addChange(other_change)
1333
1334 def enqueueChangesAhead(self, change):
1335 ret = self.checkForChangesNeededBy(change)
1336 if ret in [True, False]:
1337 return ret
1338 self.log.debug(" Change %s must be merged ahead of %s" %
1339 (ret, change))
1340 return self.addChange(ret)
1341
1342 def checkForChangesNeededBy(self, change):
James E. Blaire421a232012-07-25 16:59:21 -07001343 self.log.debug("Checking for changes needed by %s:" % change)
1344 # Return true if okay to proceed enqueing this change,
1345 # false if the change should not be enqueued.
James E. Blair4aea70c2012-07-26 14:23:24 -07001346 if not hasattr(change, 'needs_change'):
1347 self.log.debug(" Changeish does not support dependencies")
1348 return True
James E. Blaire421a232012-07-25 16:59:21 -07001349 if not change.needs_change:
1350 self.log.debug(" No changes needed")
1351 return True
1352 if change.needs_change.is_merged:
1353 self.log.debug(" Needed change is merged")
1354 return True
1355 if not change.needs_change.is_current_patchset:
1356 self.log.debug(" Needed change is not the current patchset")
1357 return False
James E. Blair127bc182012-08-28 15:55:15 -07001358 if self.isChangeAlreadyInQueue(change.needs_change):
James E. Blaire421a232012-07-25 16:59:21 -07001359 self.log.debug(" Needed change is already ahead in the queue")
1360 return True
James E. Blaire0487072012-08-29 17:38:31 -07001361 if self.sched.trigger.canMerge(change.needs_change,
1362 self.getSubmitAllowNeeds()):
1363 self.log.debug(" Change %s is needed" %
1364 change.needs_change)
1365 return change.needs_change
James E. Blaire421a232012-07-25 16:59:21 -07001366 # The needed change can't be merged.
1367 self.log.debug(" Change %s is needed but can not be merged" %
1368 change.needs_change)
1369 return False