blob: a973877c53244aa7576c792ab798c747fde57c81 [file] [log] [blame]
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Antoine "hashar" Musso
# Copyright 2013 Wikimedia Foundation Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Zhongyue Luo1c860d72012-07-19 11:03:56 +080023import Queue
24import re
25import threading
James E. Blair71e94122012-12-24 17:53:08 -080026import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080027import yaml
James E. Blairee743612012-05-29 14:49:32 -070028
James E. Blair47958382013-01-10 17:26:02 -080029import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070030import model
James E. Blaireff88162013-07-01 12:44:14 -040031from model import Pipeline, Project, ChangeQueue, EventFilter
James E. Blair4886cc12012-07-18 15:39:41 -070032import merger
James E. Blairee743612012-05-29 14:49:32 -070033
James E. Blair71e94122012-12-24 17:53:08 -080034statsd = extras.try_import('statsd.statsd')
35
James E. Blair1e8dd892012-05-30 09:15:05 -070036
def deep_format(obj, paramdict):
    """Recursively apply ``str.format(**paramdict)`` to strings within obj.

    Lists and dicts are traversed recursively; for dicts both the keys and
    the values are expanded.  Objects of any other type are returned
    unchanged.

    Borrowed from Jenkins Job Builder project.

    :param obj: a string, list, dict or any other object.
    :param paramdict: mapping of replacement-field names to their values.
    :returns: a new structure of the same shape with every string
        formatted; the lists and dicts passed in are not modified.
    :raises KeyError: if a string references a field that is missing from
        paramdict.
    """
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        # Keys are expanded through deep_format() as well, so that
        # non-string keys (e.g. integers parsed from YAML) are kept as-is
        # instead of failing on a missing .format() method.
        return dict((deep_format(key, paramdict),
                     deep_format(value, paramdict))
                    for key, value in obj.items())
    return obj
57
58
class MergeFailure(Exception):
    """Exception subclass that adds no behaviour of its own.

    NOTE(review): presumably raised when a change cannot be merged; the
    raise sites are outside this part of the file -- confirm.
    """
61
62
James E. Blaire9d45c32012-05-31 09:56:45 -070063class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -070064 log = logging.getLogger("zuul.Scheduler")
65
James E. Blaire9d45c32012-05-31 09:56:45 -070066 def __init__(self):
67 threading.Thread.__init__(self)
James E. Blair8a6f0c22013-07-01 12:31:34 -040068 self.daemon = True
James E. Blairee743612012-05-29 14:49:32 -070069 self.wake_event = threading.Event()
James E. Blaircdccd972013-07-01 12:10:22 -070070 self.layout_lock = threading.Lock()
James E. Blaire9d45c32012-05-31 09:56:45 -070071 self.reconfigure_complete_event = threading.Event()
James E. Blair5d5bc2b2012-07-06 10:24:01 -070072 self._pause = False
73 self._reconfigure = False
74 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -070075 self._stopped = False
James E. Blairee743612012-05-29 14:49:32 -070076 self.launcher = None
77 self.trigger = None
James E. Blair3c5e5b52013-04-26 11:17:03 -070078 self.config = None
James E. Blair0e933c52013-07-11 10:18:52 -070079 self._maintain_trigger_cache = False
James E. Blairee743612012-05-29 14:49:32 -070080
81 self.trigger_event_queue = Queue.Queue()
82 self.result_event_queue = Queue.Queue()
James E. Blaireff88162013-07-01 12:44:14 -040083 self.layout = model.Layout()
James E. Blairee743612012-05-29 14:49:32 -070084
James E. Blairb0fcae42012-07-17 11:12:10 -070085 def stop(self):
86 self._stopped = True
87 self.wake_event.set()
88
James E. Blair47958382013-01-10 17:26:02 -080089 def testConfig(self, config_path):
James E. Blair47958382013-01-10 17:26:02 -080090 self._parseConfig(config_path)
91
James E. Blaire5a847f2012-07-10 15:29:14 -070092 def _parseConfig(self, config_path):
James E. Blaireff88162013-07-01 12:44:14 -040093 layout = model.Layout()
94 project_templates = {}
95
James E. Blairee743612012-05-29 14:49:32 -070096 def toList(item):
James E. Blair1e8dd892012-05-30 09:15:05 -070097 if not item:
98 return []
James E. Blair32663402012-06-01 10:04:18 -070099 if isinstance(item, list):
James E. Blairee743612012-05-29 14:49:32 -0700100 return item
101 return [item]
102
James E. Blaire5a847f2012-07-10 15:29:14 -0700103 if config_path:
104 config_path = os.path.expanduser(config_path)
105 if not os.path.exists(config_path):
106 raise Exception("Unable to read layout config file at %s" %
107 config_path)
108 config_file = open(config_path)
109 data = yaml.load(config_file)
110
James E. Blair47958382013-01-10 17:26:02 -0800111 validator = layoutvalidator.LayoutValidator()
112 validator.validate(data)
113
James E. Blaireff88162013-07-01 12:44:14 -0400114 config_env = {}
James E. Blaire5a847f2012-07-10 15:29:14 -0700115 for include in data.get('includes', []):
116 if 'python-file' in include:
117 fn = include['python-file']
118 if not os.path.isabs(fn):
119 base = os.path.dirname(config_path)
120 fn = os.path.join(base, fn)
121 fn = os.path.expanduser(fn)
James E. Blaireff88162013-07-01 12:44:14 -0400122 execfile(fn, config_env)
James E. Blair1e8dd892012-05-30 09:15:05 -0700123
James E. Blair4aea70c2012-07-26 14:23:24 -0700124 for conf_pipeline in data.get('pipelines', []):
125 pipeline = Pipeline(conf_pipeline['name'])
James E. Blair8dbd56a2012-12-22 10:55:10 -0800126 pipeline.description = conf_pipeline.get('description')
James E. Blair64ed6f22013-07-10 14:07:23 -0700127 precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
128 pipeline.precedence = precedence
James E. Blair56370192013-01-14 15:47:28 -0800129 pipeline.failure_message = conf_pipeline.get('failure-message',
130 "Build failed.")
131 pipeline.success_message = conf_pipeline.get('success-message',
132 "Build succeeded.")
James E. Blair2fa50962013-01-30 21:50:41 -0800133 pipeline.dequeue_on_new_patchset = conf_pipeline.get(
134 'dequeue-on-new-patchset',
135 True)
James E. Blair4aea70c2012-07-26 14:23:24 -0700136 manager = globals()[conf_pipeline['manager']](self, pipeline)
137 pipeline.setManager(manager)
138
James E. Blaireff88162013-07-01 12:44:14 -0400139 layout.pipelines[conf_pipeline['name']] = pipeline
James E. Blair4aea70c2012-07-26 14:23:24 -0700140 manager.success_action = conf_pipeline.get('success')
141 manager.failure_action = conf_pipeline.get('failure')
142 manager.start_action = conf_pipeline.get('start')
143 for trigger in toList(conf_pipeline['trigger']):
James E. Blairee743612012-05-29 14:49:32 -0700144 approvals = {}
145 for approval_dict in toList(trigger.get('approval')):
146 for k, v in approval_dict.items():
James E. Blair1e8dd892012-05-30 09:15:05 -0700147 approvals[k] = v
James E. Blairee743612012-05-29 14:49:32 -0700148 f = EventFilter(types=toList(trigger['event']),
149 branches=toList(trigger.get('branch')),
150 refs=toList(trigger.get('ref')),
Clark Boylanb9bcb402012-06-29 17:44:05 -0700151 approvals=approvals,
Zhongyue Luoaa85ebf2012-09-21 16:38:33 +0800152 comment_filters=
Antoine Mussob4e809e2012-12-06 16:58:06 +0100153 toList(trigger.get('comment_filter')),
154 email_filters=
155 toList(trigger.get('email_filter')))
James E. Blairee743612012-05-29 14:49:32 -0700156 manager.event_filters.append(f)
157
Antoine Musso80edd5a2013-02-13 15:37:53 +0100158 for project_template in data.get('project-templates', []):
159 # Make sure the template only contains valid pipelines
160 tpl = dict(
161 (pipe_name, project_template.get(pipe_name))
James E. Blaireff88162013-07-01 12:44:14 -0400162 for pipe_name in layout.pipelines.keys()
Antoine Musso80edd5a2013-02-13 15:37:53 +0100163 if pipe_name in project_template
164 )
James E. Blaireff88162013-07-01 12:44:14 -0400165 project_templates[project_template.get('name')] = tpl
Antoine Musso80edd5a2013-02-13 15:37:53 +0100166
James E. Blair47958382013-01-10 17:26:02 -0800167 for config_job in data.get('jobs', []):
James E. Blaireff88162013-07-01 12:44:14 -0400168 job = layout.getJob(config_job['name'])
James E. Blairb0954652012-06-01 11:32:01 -0700169 # Be careful to only set attributes explicitly present on
170 # this job, to avoid squashing attributes set by a meta-job.
171 m = config_job.get('failure-message', None)
172 if m:
173 job.failure_message = m
174 m = config_job.get('success-message', None)
175 if m:
176 job.success_message = m
James E. Blair6aea36d2012-12-17 13:03:24 -0800177 m = config_job.get('failure-pattern', None)
178 if m:
179 job.failure_pattern = m
180 m = config_job.get('success-pattern', None)
181 if m:
182 job.success_pattern = m
James E. Blair222d4982012-07-16 09:31:19 -0700183 m = config_job.get('hold-following-changes', False)
184 if m:
185 job.hold_following_changes = True
James E. Blair4ec821f2012-08-23 15:28:28 -0700186 m = config_job.get('voting', None)
187 if m is not None:
188 job.voting = m
James E. Blaire5a847f2012-07-10 15:29:14 -0700189 fname = config_job.get('parameter-function', None)
190 if fname:
James E. Blaireff88162013-07-01 12:44:14 -0400191 func = config_env.get(fname, None)
James E. Blaire5a847f2012-07-10 15:29:14 -0700192 if not func:
193 raise Exception("Unable to find function %s" % fname)
194 job.parameter_function = func
James E. Blairee743612012-05-29 14:49:32 -0700195 branches = toList(config_job.get('branch'))
196 if branches:
James E. Blaire421a232012-07-25 16:59:21 -0700197 job._branches = branches
198 job.branches = [re.compile(x) for x in branches]
James E. Blair70c71582013-03-06 08:50:50 -0800199 files = toList(config_job.get('files'))
200 if files:
201 job._files = files
202 job.files = [re.compile(x) for x in files]
James E. Blairee743612012-05-29 14:49:32 -0700203
204 def add_jobs(job_tree, config_jobs):
205 for job in config_jobs:
206 if isinstance(job, list):
207 for x in job:
208 add_jobs(job_tree, x)
209 if isinstance(job, dict):
210 for parent, children in job.items():
James E. Blaireff88162013-07-01 12:44:14 -0400211 parent_tree = job_tree.addJob(layout.getJob(parent))
James E. Blairee743612012-05-29 14:49:32 -0700212 add_jobs(parent_tree, children)
213 if isinstance(job, str):
James E. Blaireff88162013-07-01 12:44:14 -0400214 job_tree.addJob(layout.getJob(job))
James E. Blairee743612012-05-29 14:49:32 -0700215
James E. Blair47958382013-01-10 17:26:02 -0800216 for config_project in data.get('projects', []):
James E. Blairee743612012-05-29 14:49:32 -0700217 project = Project(config_project['name'])
Antoine Musso80edd5a2013-02-13 15:37:53 +0100218
219 for requested_template in config_project.get('template', []):
220 # Fetch the template from 'project-templates'
James E. Blaireff88162013-07-01 12:44:14 -0400221 tpl = project_templates.get(
Antoine Musso80edd5a2013-02-13 15:37:53 +0100222 requested_template.get('name'))
223 # Expand it with the project context
224 expanded = deep_format(tpl, requested_template)
225 # Finally merge the expansion with whatever has been already
226 # defined for this project
227 config_project.update(expanded)
228
James E. Blaireff88162013-07-01 12:44:14 -0400229 layout.projects[config_project['name']] = project
James E. Blair4886cc12012-07-18 15:39:41 -0700230 mode = config_project.get('merge-mode')
231 if mode and mode == 'cherry-pick':
232 project.merge_mode = model.CHERRY_PICK
James E. Blaireff88162013-07-01 12:44:14 -0400233 for pipeline in layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700234 if pipeline.name in config_project:
235 job_tree = pipeline.addProject(project)
236 config_jobs = config_project[pipeline.name]
James E. Blairee743612012-05-29 14:49:32 -0700237 add_jobs(job_tree, config_jobs)
James E. Blairee743612012-05-29 14:49:32 -0700238
James E. Blairb0954652012-06-01 11:32:01 -0700239 # All jobs should be defined at this point, get rid of
240 # metajobs so that getJob isn't doing anything weird.
James E. Blaireff88162013-07-01 12:44:14 -0400241 layout.metajobs = {}
James E. Blairb0954652012-06-01 11:32:01 -0700242
James E. Blaireff88162013-07-01 12:44:14 -0400243 for pipeline in layout.pipelines.values():
244 pipeline.manager._postConfig(layout)
245
246 return layout
James E. Blairee743612012-05-29 14:49:32 -0700247
James E. Blair47958382013-01-10 17:26:02 -0800248 def _setupMerger(self):
James E. Blair4886cc12012-07-18 15:39:41 -0700249 if self.config.has_option('zuul', 'git_dir'):
250 merge_root = self.config.get('zuul', 'git_dir')
251 else:
252 merge_root = '/var/lib/zuul/git'
James E. Blair47958382013-01-10 17:26:02 -0800253
Paul Belangerb67aba12013-05-13 19:22:14 -0400254 if self.config.has_option('zuul', 'git_user_email'):
255 merge_email = self.config.get('zuul', 'git_user_email')
256 else:
257 merge_email = None
258
259 if self.config.has_option('zuul', 'git_user_name'):
260 merge_name = self.config.get('zuul', 'git_user_name')
261 else:
262 merge_name = None
263
James E. Blairceabcbc2012-08-17 13:48:46 -0700264 if self.config.has_option('zuul', 'push_change_refs'):
265 push_refs = self.config.getboolean('zuul', 'push_change_refs')
266 else:
267 push_refs = False
James E. Blair47958382013-01-10 17:26:02 -0800268
James E. Blairad615012012-11-30 16:14:21 -0800269 if self.config.has_option('gerrit', 'sshkey'):
270 sshkey = self.config.get('gerrit', 'sshkey')
271 else:
272 sshkey = None
James E. Blair47958382013-01-10 17:26:02 -0800273
James E. Blairad615012012-11-30 16:14:21 -0800274 self.merger = merger.Merger(self.trigger, merge_root, push_refs,
Paul Belangerb67aba12013-05-13 19:22:14 -0400275 sshkey, merge_email, merge_name)
James E. Blaireff88162013-07-01 12:44:14 -0400276 for project in self.layout.projects.values():
James E. Blair4886cc12012-07-18 15:39:41 -0700277 url = self.trigger.getGitUrl(project)
278 self.merger.addProject(project, url)
279
James E. Blairee743612012-05-29 14:49:32 -0700280 def setLauncher(self, launcher):
281 self.launcher = launcher
282
283 def setTrigger(self, trigger):
284 self.trigger = trigger
285
James E. Blaircdccd972013-07-01 12:10:22 -0700286 def getProject(self, name):
287 self.layout_lock.acquire()
288 p = None
289 try:
290 p = self.layout.projects.get(name)
291 finally:
292 self.layout_lock.release()
293 return p
294
James E. Blairee743612012-05-29 14:49:32 -0700295 def addEvent(self, event):
296 self.log.debug("Adding trigger event: %s" % event)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800297 try:
298 if statsd:
299 statsd.incr('gerrit.event.%s' % event.type)
300 except:
301 self.log.exception("Exception reporting event stats")
James E. Blairee743612012-05-29 14:49:32 -0700302 self.trigger_event_queue.put(event)
303 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800304 self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700305
James E. Blair11700c32012-07-05 17:50:05 -0700306 def onBuildStarted(self, build):
307 self.log.debug("Adding start event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800308 build.start_time = time.time()
James E. Blair11700c32012-07-05 17:50:05 -0700309 self.result_event_queue.put(('started', build))
310 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800311 self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700312
James E. Blairee743612012-05-29 14:49:32 -0700313 def onBuildCompleted(self, build):
James E. Blair11700c32012-07-05 17:50:05 -0700314 self.log.debug("Adding complete event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800315 build.end_time = time.time()
James E. Blair23ec1ba2013-01-04 18:06:10 -0800316 try:
317 if statsd:
318 key = 'zuul.job.%s' % build.job.name
319 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
320 dt = int((build.end_time - build.start_time) * 1000)
321 statsd.timing(key, dt)
322 statsd.incr(key)
323 except:
324 self.log.exception("Exception reporting runtime stats")
James E. Blair11700c32012-07-05 17:50:05 -0700325 self.result_event_queue.put(('completed', build))
James E. Blairee743612012-05-29 14:49:32 -0700326 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800327 self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700328
James E. Blaire9d45c32012-05-31 09:56:45 -0700329 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700330 self.log.debug("Prepare to reconfigure")
James E. Blaire9d45c32012-05-31 09:56:45 -0700331 self.config = config
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700332 self._reconfigure = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700333 self.wake_event.set()
334 self.log.debug("Waiting for reconfiguration")
335 self.reconfigure_complete_event.wait()
336 self.reconfigure_complete_event.clear()
337 self.log.debug("Reconfiguration complete")
338
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700339 def exit(self):
340 self.log.debug("Prepare to exit")
341 self._pause = True
342 self._exit = True
343 self.wake_event.set()
344 self.log.debug("Waiting for exit")
345
346 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700347 if self.config.has_option('zuul', 'state_dir'):
348 state_dir = os.path.expanduser(self.config.get('zuul',
349 'state_dir'))
350 else:
351 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700352 return os.path.join(state_dir, 'queue.pickle')
353
354 def _save_queue(self):
355 pickle_file = self._get_queue_pickle_file()
356 events = []
357 while not self.trigger_event_queue.empty():
358 events.append(self.trigger_event_queue.get())
359 self.log.debug("Queue length is %s" % len(events))
360 if events:
361 self.log.debug("Saving queue")
362 pickle.dump(events, open(pickle_file, 'wb'))
363
364 def _load_queue(self):
365 pickle_file = self._get_queue_pickle_file()
366 if os.path.exists(pickle_file):
367 self.log.debug("Loading queue")
368 events = pickle.load(open(pickle_file, 'rb'))
369 self.log.debug("Queue length is %s" % len(events))
370 for event in events:
371 self.trigger_event_queue.put(event)
372 else:
373 self.log.debug("No queue file found")
374
375 def _delete_queue(self):
376 pickle_file = self._get_queue_pickle_file()
377 if os.path.exists(pickle_file):
378 self.log.debug("Deleting saved queue")
379 os.unlink(pickle_file)
380
381 def resume(self):
382 try:
383 self._load_queue()
384 except:
385 self.log.exception("Unable to load queue")
386 try:
387 self._delete_queue()
388 except:
389 self.log.exception("Unable to delete saved queue")
390 self.log.debug("Resuming queue processing")
391 self.wake_event.set()
392
393 def _doPauseEvent(self):
394 if self._exit:
395 self.log.debug("Exiting")
396 self._save_queue()
397 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700398
399 def _doReconfigureEvent(self):
400 # This is called in the scheduler loop after another thread sets
401 # the reconfigure flag
402 self.layout_lock.acquire()
403 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700404 self.log.debug("Performing reconfiguration")
James E. Blaircdccd972013-07-01 12:10:22 -0700405 layout = self._parseConfig(
James E. Blaireff88162013-07-01 12:44:14 -0400406 self.config.get('zuul', 'layout_config'))
James E. Blaircdccd972013-07-01 12:10:22 -0700407 for name, new_pipeline in layout.pipelines.items():
408 old_pipeline = self.layout.pipelines.get(name)
409 if not old_pipeline:
410 if self.layout.pipelines:
411 # Don't emit this warning on startup
412 self.log.warning("No old pipeline matching %s found "
413 "when reconfiguring" % name)
414 continue
415 self.log.debug("Re-enqueueing changes for pipeline %s" %
416 name)
417 items_to_remove = []
418 for shared_queue in old_pipeline.queues:
419 for item in (shared_queue.queue +
420 shared_queue.severed_heads):
421 item.item_ahead = None
422 item.item_behind = None
423 item.pipeline = None
424 project = layout.projects.get(item.change.project.name)
425 if not project:
426 self.log.warning("Unable to find project for "
427 "change %s while reenqueueing" %
428 item.change)
429 item.change.project = None
430 items_to_remove.append(item)
431 continue
432 item.change.project = project
433 severed = item in shared_queue.severed_heads
James E. Blair78e31b32013-07-09 09:11:34 -0700434 if not new_pipeline.manager.reEnqueueItem(
435 item, severed=severed):
James E. Blaircdccd972013-07-01 12:10:22 -0700436 items_to_remove.append(item)
437 builds_to_remove = []
438 for build, item in old_pipeline.manager.building_jobs.items():
439 if item in items_to_remove:
440 builds_to_remove.append(build)
441 self.log.warning("Deleting running build %s for "
442 "change %s while reenqueueing" % (
443 build, item.change))
444 for build in builds_to_remove:
445 del old_pipeline.manager.building_jobs[build]
446 new_pipeline.manager.building_jobs = \
447 old_pipeline.manager.building_jobs
448 self.layout = layout
James E. Blair47958382013-01-10 17:26:02 -0800449 self._setupMerger()
James E. Blaire0487072012-08-29 17:38:31 -0700450 self._reconfigure = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700451 self.reconfigure_complete_event.set()
James E. Blaircdccd972013-07-01 12:10:22 -0700452 finally:
453 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700454
455 def _areAllBuildsComplete(self):
456 self.log.debug("Checking if all builds are complete")
457 waiting = False
James E. Blaireff88162013-07-01 12:44:14 -0400458 for pipeline in self.layout.pipelines.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700459 for build in pipeline.manager.building_jobs.keys():
460 self.log.debug("%s waiting on %s" % (pipeline.manager, build))
James E. Blaire9d45c32012-05-31 09:56:45 -0700461 waiting = True
462 if not waiting:
463 self.log.debug("All builds are complete")
464 return True
465 self.log.debug("All builds are not complete")
466 return False
467
James E. Blairee743612012-05-29 14:49:32 -0700468 def run(self):
James E. Blair71e94122012-12-24 17:53:08 -0800469 if statsd:
470 self.log.debug("Statsd enabled")
471 else:
472 self.log.debug("Statsd disabled because python statsd "
473 "package not found")
James E. Blairee743612012-05-29 14:49:32 -0700474 while True:
475 self.log.debug("Run handler sleeping")
476 self.wake_event.wait()
477 self.wake_event.clear()
James E. Blairb0fcae42012-07-17 11:12:10 -0700478 if self._stopped:
479 return
James E. Blairee743612012-05-29 14:49:32 -0700480 self.log.debug("Run handler awake")
481 try:
James E. Blaircdccd972013-07-01 12:10:22 -0700482 if self._reconfigure:
483 self._doReconfigureEvent()
484
James E. Blair263fba92013-02-27 13:07:19 -0800485 # Give result events priority -- they let us stop builds,
486 # whereas trigger evensts cause us to launch builds.
James E. Blairee743612012-05-29 14:49:32 -0700487 if not self.result_event_queue.empty():
488 self.process_result_queue()
James E. Blair263fba92013-02-27 13:07:19 -0800489 elif not self._pause:
490 if not self.trigger_event_queue.empty():
491 self.process_event_queue()
James E. Blaire9d45c32012-05-31 09:56:45 -0700492
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700493 if self._pause and self._areAllBuildsComplete():
494 self._doPauseEvent()
James E. Blaire9d45c32012-05-31 09:56:45 -0700495
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700496 if not self._pause:
James E. Blair4baa94c2012-06-07 17:04:21 -0700497 if not (self.trigger_event_queue.empty() and
498 self.result_event_queue.empty()):
499 self.wake_event.set()
500 else:
501 if not self.result_event_queue.empty():
502 self.wake_event.set()
James E. Blair0e933c52013-07-11 10:18:52 -0700503
504 if self._maintain_trigger_cache:
505 self.maintainTriggerCache()
506 self._maintain_trigger_cache = False
507
James E. Blairee743612012-05-29 14:49:32 -0700508 except:
509 self.log.exception("Exception in run handler:")
510
James E. Blair0e933c52013-07-11 10:18:52 -0700511 def maintainTriggerCache(self):
512 relevant = set()
513 for pipeline in self.layout.pipelines.values():
514 for item in pipeline.getAllItems():
515 relevant.add(item.change)
516 relevant.update(item.change.getRelatedChanges())
517 self.log.debug("Trigger cache size: %s" % len(relevant))
518 self.trigger.maintainCache(relevant)
519
James E. Blairee743612012-05-29 14:49:32 -0700520 def process_event_queue(self):
521 self.log.debug("Fetching trigger event")
522 event = self.trigger_event_queue.get()
523 self.log.debug("Processing trigger event %s" % event)
James E. Blaireff88162013-07-01 12:44:14 -0400524 project = self.layout.projects.get(event.project_name)
James E. Blairee743612012-05-29 14:49:32 -0700525 if not project:
526 self.log.warning("Project %s not found" % event.project_name)
James E. Blairff791972013-01-09 11:45:43 -0800527 self.trigger_event_queue.task_done()
James E. Blairee743612012-05-29 14:49:32 -0700528 return
529
Antoine Mussofeba9672013-01-17 13:44:59 +0100530 # Preprocessing for ref-update events
531 if hasattr(event, 'refspec'):
532 # Make sure the local git repo is up-to-date with the remote one.
533 # We better have the new ref before enqueuing the changes.
534 # This is done before enqueuing the changes to avoid calling an
535 # update per pipeline accepting the change.
536 self.log.info("Fetching references for %s" % project)
537 self.merger.updateRepo(project)
538
James E. Blaireff88162013-07-01 12:44:14 -0400539 for pipeline in self.layout.pipelines.values():
James E. Blair2fa50962013-01-30 21:50:41 -0800540 change = event.getChange(project, self.trigger)
541 if event.type == 'patchset-created':
542 pipeline.manager.removeOldVersionsOfChange(change)
James E. Blairfee8d652013-06-07 08:57:52 -0700543 if pipeline.manager.eventMatches(event):
544 self.log.info("Adding %s, %s to %s" %
545 (project, change, pipeline))
546 pipeline.manager.addChange(change)
547 while pipeline.manager.processQueue():
548 pass
549
James E. Blairff791972013-01-09 11:45:43 -0800550 self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700551
James E. Blairee743612012-05-29 14:49:32 -0700552 def process_result_queue(self):
553 self.log.debug("Fetching result event")
James E. Blair11700c32012-07-05 17:50:05 -0700554 event_type, build = self.result_event_queue.get()
James E. Blairee743612012-05-29 14:49:32 -0700555 self.log.debug("Processing result event %s" % build)
James E. Blaireff88162013-07-01 12:44:14 -0400556 for pipeline in self.layout.pipelines.values():
James E. Blair11700c32012-07-05 17:50:05 -0700557 if event_type == 'started':
James E. Blair4aea70c2012-07-26 14:23:24 -0700558 if pipeline.manager.onBuildStarted(build):
James E. Blairff791972013-01-09 11:45:43 -0800559 self.result_event_queue.task_done()
James E. Blair11700c32012-07-05 17:50:05 -0700560 return
561 elif event_type == 'completed':
James E. Blair4aea70c2012-07-26 14:23:24 -0700562 if pipeline.manager.onBuildCompleted(build):
James E. Blairff791972013-01-09 11:45:43 -0800563 self.result_event_queue.task_done()
James E. Blair11700c32012-07-05 17:50:05 -0700564 return
James E. Blairc84dd262012-05-31 10:03:13 -0700565 self.log.warning("Build %s not found by any queue manager" % (build))
James E. Blairff791972013-01-09 11:45:43 -0800566 self.result_event_queue.task_done()
James E. Blairee743612012-05-29 14:49:32 -0700567
James E. Blair268d9342012-06-13 18:24:29 -0700568 def formatStatusHTML(self):
569 ret = '<html><pre>'
James E. Blaire0487072012-08-29 17:38:31 -0700570 if self._pause:
571 ret += '<p><b>Queue only mode:</b> preparing to '
James E. Blaire0487072012-08-29 17:38:31 -0700572 if self._exit:
573 ret += 'exit'
574 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
575 ret += '</p>'
576
James E. Blaireff88162013-07-01 12:44:14 -0400577 keys = self.layout.pipelines.keys()
James E. Blair268d9342012-06-13 18:24:29 -0700578 keys.sort()
579 for key in keys:
James E. Blaireff88162013-07-01 12:44:14 -0400580 pipeline = self.layout.pipelines[key]
James E. Blair4aea70c2012-07-26 14:23:24 -0700581 s = 'Pipeline: %s' % pipeline.name
James E. Blair268d9342012-06-13 18:24:29 -0700582 ret += s + '\n'
583 ret += '-' * len(s) + '\n'
James E. Blaire0487072012-08-29 17:38:31 -0700584 ret += pipeline.formatStatusHTML()
James E. Blair268d9342012-06-13 18:24:29 -0700585 ret += '\n'
586 ret += '</pre></html>'
587 return ret
588
James E. Blair8dbd56a2012-12-22 10:55:10 -0800589 def formatStatusJSON(self):
590 data = {}
591 if self._pause:
592 ret = '<p><b>Queue only mode:</b> preparing to '
James E. Blair8dbd56a2012-12-22 10:55:10 -0800593 if self._exit:
594 ret += 'exit'
595 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
596 ret += '</p>'
597 data['message'] = ret
598
James E. Blairfb682cc2013-02-26 15:23:27 -0800599 data['trigger_event_queue'] = {}
600 data['trigger_event_queue']['length'] = \
601 self.trigger_event_queue.qsize()
602 data['result_event_queue'] = {}
603 data['result_event_queue']['length'] = \
604 self.result_event_queue.qsize()
605
James E. Blair8dbd56a2012-12-22 10:55:10 -0800606 pipelines = []
607 data['pipelines'] = pipelines
James E. Blaireff88162013-07-01 12:44:14 -0400608 keys = self.layout.pipelines.keys()
James E. Blair8dbd56a2012-12-22 10:55:10 -0800609 keys.sort()
610 for key in keys:
James E. Blaireff88162013-07-01 12:44:14 -0400611 pipeline = self.layout.pipelines[key]
James E. Blair8dbd56a2012-12-22 10:55:10 -0800612 pipelines.append(pipeline.formatStatusJSON())
613 return json.dumps(data)
614
James E. Blair1e8dd892012-05-30 09:15:05 -0700615
James E. Blair4aea70c2012-07-26 14:23:24 -0700616class BasePipelineManager(object):
617 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700618
James E. Blair4aea70c2012-07-26 14:23:24 -0700619 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -0700620 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -0700621 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -0700622 self.building_jobs = {}
623 self.event_filters = []
624 self.success_action = {}
625 self.failure_action = {}
James E. Blairdc253862012-06-13 17:12:42 -0700626 self.start_action = {}
James E. Blair3c5e5b52013-04-26 11:17:03 -0700627 if self.sched.config and self.sched.config.has_option(
628 'zuul', 'report_times'):
James E. Blair0ac6c012013-04-26 09:04:23 -0700629 self.report_times = self.sched.config.getboolean(
630 'zuul', 'report_times')
631 else:
632 self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -0700633
634 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -0700635 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700636
James E. Blaireff88162013-07-01 12:44:14 -0400637 def _postConfig(self, layout):
James E. Blair4aea70c2012-07-26 14:23:24 -0700638 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700639 self.log.info(" Events:")
640 for e in self.event_filters:
641 self.log.info(" %s" % e)
642 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -0700643
James E. Blairee743612012-05-29 14:49:32 -0700644 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -0700645 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -0700646 if tree.job:
647 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -0700648 for b in tree.job._branches:
649 efilters += str(b)
James E. Blair70c71582013-03-06 08:50:50 -0800650 for f in tree.job._files:
651 efilters += str(f)
James E. Blairee743612012-05-29 14:49:32 -0700652 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -0700653 efilters = ' ' + efilters
James E. Blair222d4982012-07-16 09:31:19 -0700654 hold = ''
655 if tree.job.hold_following_changes:
656 hold = ' [hold]'
James E. Blair4ec821f2012-08-23 15:28:28 -0700657 voting = ''
658 if not tree.job.voting:
659 voting = ' [nonvoting]'
660 self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
661 efilters, hold, voting))
James E. Blairee743612012-05-29 14:49:32 -0700662 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -0700663 log_jobs(x, indent + 2)
664
James E. Blaireff88162013-07-01 12:44:14 -0400665 for p in layout.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700666 tree = self.pipeline.getJobTree(p)
667 if tree:
James E. Blairee743612012-05-29 14:49:32 -0700668 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -0700669 log_jobs(tree)
James E. Blairdc253862012-06-13 17:12:42 -0700670 if self.start_action:
671 self.log.info(" On start:")
672 self.log.info(" %s" % self.start_action)
James E. Blairee743612012-05-29 14:49:32 -0700673 if self.success_action:
674 self.log.info(" On success:")
675 self.log.info(" %s" % self.success_action)
676 if self.failure_action:
677 self.log.info(" On failure:")
678 self.log.info(" %s" % self.failure_action)
679
James E. Blaire421a232012-07-25 16:59:21 -0700680 def getSubmitAllowNeeds(self):
681 # Get a list of code review labels that are allowed to be
682 # "needed" in the submit records for a change, with respect
683 # to this queue. In other words, the list of review labels
684 # this queue itself is likely to set before submitting.
James E. Blair4aea70c2012-07-26 14:23:24 -0700685 if self.success_action:
686 return self.success_action.keys()
687 else:
688 return {}
James E. Blaire421a232012-07-25 16:59:21 -0700689
James E. Blairee743612012-05-29 14:49:32 -0700690 def eventMatches(self, event):
691 for ef in self.event_filters:
James E. Blairee743612012-05-29 14:49:32 -0700692 if ef.matches(event):
693 return True
694 return False
695
James E. Blair0dc8ba92012-07-16 14:23:52 -0700696 def isChangeAlreadyInQueue(self, change):
James E. Blaire0487072012-08-29 17:38:31 -0700697 for c in self.pipeline.getChangesInQueue():
James E. Blair0dc8ba92012-07-16 14:23:52 -0700698 if change.equals(c):
699 return True
700 return False
701
James E. Blaire0487072012-08-29 17:38:31 -0700702 def reportStart(self, change):
703 try:
704 self.log.info("Reporting start, action %s change %s" %
705 (self.start_action, change))
706 msg = "Starting %s jobs." % self.pipeline.name
Clark Boylan9b670902012-09-28 13:47:56 -0700707 if self.sched.config.has_option('zuul', 'status_url'):
708 msg += "\n" + self.sched.config.get('zuul', 'status_url')
James E. Blaire0487072012-08-29 17:38:31 -0700709 ret = self.sched.trigger.report(change, msg, self.start_action)
710 if ret:
711 self.log.error("Reporting change start %s received: %s" %
712 (change, ret))
713 except:
714 self.log.exception("Exception while reporting start:")
715
716 def isChangeReadyToBeEnqueued(self, change):
717 return True
718
719 def enqueueChangesAhead(self, change):
720 return True
721
722 def enqueueChangesBehind(self, change):
723 return True
724
James E. Blairfee8d652013-06-07 08:57:52 -0700725 def checkForChangesNeededBy(self, change):
726 return True
727
728 def getDependentItems(self, item):
729 orig_item = item
730 items = []
731 while item.item_ahead:
732 items.append(item.item_ahead)
733 item = item.item_ahead
734 self.log.info("Change %s depends on changes %s" %
735 (orig_item.change,
736 [x.change for x in items]))
737 return items
738
James E. Blair2fa50962013-01-30 21:50:41 -0800739 def findOldVersionOfChangeAlreadyInQueue(self, change):
740 for c in self.pipeline.getChangesInQueue():
741 if change.isUpdateOf(c):
742 return c
743 return None
744
745 def removeOldVersionsOfChange(self, change):
746 if not self.pipeline.dequeue_on_new_patchset:
747 return
748 old_change = self.findOldVersionOfChangeAlreadyInQueue(change)
749 if old_change:
750 self.log.debug("Change %s is a new version of %s, removing %s" %
751 (change, old_change, old_change))
752 self.removeChange(old_change)
James E. Blair2fa50962013-01-30 21:50:41 -0800753
James E. Blaircdccd972013-07-01 12:10:22 -0700754 def reEnqueueItem(self, item, severed=False):
755 change_queue = self.pipeline.getQueue(item.change.project)
756 if change_queue:
757 self.log.debug("Re-enqueing change %s in queue %s" %
758 (item.change, change_queue))
759 if severed:
760 change_queue.addSeveredHead(item)
761 else:
762 change_queue.enqueueItem(item)
763 self.reportStats(item)
764 return True
765 else:
766 self.log.error("Unable to find change queue for project %s" %
767 item.change.project)
768 return False
769
James E. Blairee743612012-05-29 14:49:32 -0700770 def addChange(self, change):
James E. Blaire0487072012-08-29 17:38:31 -0700771 self.log.debug("Considering adding change %s" % change)
James E. Blair0dc8ba92012-07-16 14:23:52 -0700772 if self.isChangeAlreadyInQueue(change):
773 self.log.debug("Change %s is already in queue, ignoring" % change)
James E. Blaire0487072012-08-29 17:38:31 -0700774 return True
James E. Blair692c6b32012-07-17 11:16:35 -0700775
James E. Blaire0487072012-08-29 17:38:31 -0700776 if not self.isChangeReadyToBeEnqueued(change):
777 self.log.debug("Change %s is not ready to be enqueued, ignoring" %
778 change)
779 return False
780
781 if not self.enqueueChangesAhead(change):
James E. Blair1490eba2013-03-06 19:14:00 -0800782 self.log.debug("Failed to enqueue changes ahead of %s" % change)
James E. Blaire0487072012-08-29 17:38:31 -0700783 return False
784
785 if self.isChangeAlreadyInQueue(change):
786 self.log.debug("Change %s is already in queue, ignoring" % change)
787 return True
788
789 change_queue = self.pipeline.getQueue(change.project)
790 if change_queue:
791 self.log.debug("Adding change %s to queue %s" %
792 (change, change_queue))
793 if self.start_action:
794 self.reportStart(change)
James E. Blairfee8d652013-06-07 08:57:52 -0700795 item = change_queue.enqueueChange(change)
796 self.reportStats(item)
James E. Blaire0487072012-08-29 17:38:31 -0700797 self.enqueueChangesBehind(change)
798 else:
799 self.log.error("Unable to find change queue for project %s" %
800 change.project)
801 return False
James E. Blairee743612012-05-29 14:49:32 -0700802
James E. Blairfee8d652013-06-07 08:57:52 -0700803 def dequeueItem(self, item, keep_severed_heads=True):
804 self.log.debug("Removing change %s from queue" % item.change)
805 item_ahead = item.item_ahead
806 change_queue = self.pipeline.getQueue(item.change.project)
807 change_queue.dequeueItem(item)
808 if (keep_severed_heads and not item_ahead and
809 (item.change.is_reportable and not item.reported)):
810 self.log.debug("Adding %s as a severed head" % item.change)
811 change_queue.addSeveredHead(item)
James E. Blair0e933c52013-07-11 10:18:52 -0700812 self.sched._maintain_trigger_cache = True
James E. Blair2fa50962013-01-30 21:50:41 -0800813
814 def removeChange(self, change):
815 # Remove a change from the queue, probably because it has been
816 # superceded by another change.
James E. Blairfee8d652013-06-07 08:57:52 -0700817 for item in self.pipeline.getAllItems():
818 if item.change == change:
819 self.log.debug("Canceling builds behind change: %s "
820 "because it is being removed." % item.change)
821 self.cancelJobs(item)
822 self.dequeueItem(item, keep_severed_heads=False)
James E. Blair2fa50962013-01-30 21:50:41 -0800823
James E. Blairfee8d652013-06-07 08:57:52 -0700824 def prepareRef(self, item):
825 # Returns False on success.
826 # Returns True if we were unable to prepare the ref.
827 ref = item.current_build_set.ref
828 if hasattr(item.change, 'refspec') and not ref:
829 self.log.debug("Preparing ref for: %s" % item.change)
830 item.current_build_set.setConfiguration()
831 ref = item.current_build_set.ref
832 dependent_items = self.getDependentItems(item)
833 dependent_items.reverse()
834 all_items = dependent_items + [item]
835 if (dependent_items and
836 not dependent_items[-1].current_build_set.commit):
837 self.pipeline.setUnableToMerge(item)
838 return True
839 commit = self.sched.merger.mergeChanges(all_items, ref)
840 item.current_build_set.commit = commit
James E. Blair81515ad2012-10-01 18:29:08 -0700841 if not commit:
James E. Blairfee8d652013-06-07 08:57:52 -0700842 self.log.info("Unable to merge change %s" % item.change)
843 self.pipeline.setUnableToMerge(item)
844 return True
845 return False
846
847 def _launchJobs(self, item, jobs):
848 self.log.debug("Launching jobs for change %s" % item.change)
849 dependent_items = self.getDependentItems(item)
850 for job in jobs:
851 self.log.debug("Found job %s for change %s" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -0700852 try:
James E. Blairfee8d652013-06-07 08:57:52 -0700853 build = self.sched.launcher.launch(job, item,
854 self.pipeline,
855 dependent_items)
856 self.building_jobs[build] = item
857 self.log.debug("Adding build %s of job %s to item %s" %
858 (build, job, item))
859 item.addBuild(build)
James E. Blairee743612012-05-29 14:49:32 -0700860 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800861 self.log.exception("Exception while launching job %s "
James E. Blairfee8d652013-06-07 08:57:52 -0700862 "for change %s:" % (job, item.change))
James E. Blairee743612012-05-29 14:49:32 -0700863
James E. Blairfee8d652013-06-07 08:57:52 -0700864 def launchJobs(self, item):
865 jobs = self.pipeline.findJobsToRun(item)
James E. Blairdaabed22012-08-15 15:38:57 -0700866 if jobs:
James E. Blairfee8d652013-06-07 08:57:52 -0700867 self._launchJobs(item, jobs)
868
869 def cancelJobs(self, item, prime=True):
870 self.log.debug("Cancel jobs for change %s" % item.change)
871 canceled = False
872 to_remove = []
873 if prime and item.current_build_set.builds:
874 item.resetAllBuilds()
875 for build, build_item in self.building_jobs.items():
876 if build_item == item:
877 self.log.debug("Found build %s for change %s to cancel" %
878 (build, item.change))
879 try:
880 self.sched.launcher.cancel(build)
881 except:
882 self.log.exception("Exception while canceling build %s "
883 "for change %s" % (build, item.change))
884 to_remove.append(build)
885 canceled = True
886 for build in to_remove:
887 self.log.debug("Removing build %s from running builds" % build)
888 build.result = 'CANCELED'
889 del self.building_jobs[build]
890 if item.item_behind:
891 self.log.debug("Canceling jobs for change %s, behind change %s" %
892 (item.item_behind.change, item.change))
893 if self.cancelJobs(item.item_behind, prime=prime):
894 canceled = True
895 return canceled
896
    def _processOneItem(self, item):
        """Advance one queue item as far as its current state allows.

        In order: prepare the item's ref; dequeue and report the item if
        checkForChangesNeededBy no longer returns True for its change;
        for an item with nothing ahead of it, report and dequeue it once
        all its jobs are complete and cancel the jobs behind it on
        failure; for an item with something ahead of it, cancel the jobs
        behind it if any of its jobs failed; finally launch any jobs that
        are ready to run.

        :param item: the queue item to process.
        :returns: True if this call changed the state of the queue,
            False otherwise.
        """
        changed = False
        # Read the neighbours first; the dequeue calls below come later.
        item_ahead = item.item_ahead
        item_behind = item.item_behind
        if self.prepareRef(item):
            # prepareRef returns True when the ref could NOT be prepared.
            changed = True
        if self.checkForChangesNeededBy(item.change) is not True:
            # It's not okay to enqueue this change, we should remove it.
            self.log.info("Dequeuing change %s because "
                          "it can no longer merge" % item.change)
            self.cancelJobs(item)
            self.dequeueItem(item, keep_severed_heads=False)
            self.pipeline.setDequeuedNeedingChange(item)
            try:
                self.reportItem(item)
            except MergeFailure:
                # A merge failure is expected here; the item is already
                # out of the queue.
                pass
            changed = True
            return changed
        if not item_ahead:
            merge_failed = False
            if self.pipeline.areAllJobsComplete(item):
                try:
                    self.reportItem(item)
                except MergeFailure:
                    merge_failed = True
                self.dequeueItem(item)
                changed = True
            if merge_failed or self.pipeline.didAnyJobFail(item):
                if item_behind:
                    self.cancelJobs(item_behind)
                    changed = True
                    self.dequeueItem(item)
        else:
            if self.pipeline.didAnyJobFail(item):
                if item_behind:
                    if self.cancelJobs(item_behind, prime=False):
                        changed = True
                    # don't restart yet; this change will eventually become
                    # the head
        # NOTE(review): launchJobs has no return statement, so this
        # condition is never true and launching jobs never marks the
        # queue as changed -- confirm that is intended.
        if self.launchJobs(item):
            changed = True
        return changed
940
941 def processQueue(self):
942 # Do whatever needs to be done for each change in the queue
943 self.log.debug("Starting queue processor: %s" % self.pipeline.name)
944 changed = False
945 for item in self.pipeline.getAllItems():
946 if self._processOneItem(item):
947 changed = True
948 self.reportStats(item)
949 return changed
James E. Blairdaabed22012-08-15 15:38:57 -0700950
James E. Blair11700c32012-07-05 17:50:05 -0700951 def updateBuildDescriptions(self, build_set):
952 for build in build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -0700953 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -0700954 self.sched.launcher.setBuildDescription(build, desc)
955
956 if build_set.previous_build_set:
957 for build in build_set.previous_build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -0700958 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -0700959 self.sched.launcher.setBuildDescription(build, desc)
960
961 def onBuildStarted(self, build):
James E. Blair11700c32012-07-05 17:50:05 -0700962 if build not in self.building_jobs:
James E. Blair11700c32012-07-05 17:50:05 -0700963 # Or triggered externally, or triggered before zuul started,
964 # or restarted
965 return False
966
James E. Blairfee8d652013-06-07 08:57:52 -0700967 self.log.debug("Build %s started" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700968 self.updateBuildDescriptions(build.build_set)
James E. Blairfee8d652013-06-07 08:57:52 -0700969 while self.processQueue():
970 pass
James E. Blair11700c32012-07-05 17:50:05 -0700971 return True
972
James E. Blairee743612012-05-29 14:49:32 -0700973 def onBuildCompleted(self, build):
James E. Blair1e8dd892012-05-30 09:15:05 -0700974 if build not in self.building_jobs:
James E. Blairee743612012-05-29 14:49:32 -0700975 # Or triggered externally, or triggered before zuul started,
976 # or restarted
977 return False
James E. Blairfee8d652013-06-07 08:57:52 -0700978
979 self.log.debug("Build %s completed" % build)
James E. Blairee743612012-05-29 14:49:32 -0700980 change = self.building_jobs[build]
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800981 self.log.debug("Found change %s which triggered completed build %s" %
982 (change, build))
James E. Blairee743612012-05-29 14:49:32 -0700983
984 del self.building_jobs[build]
985
James E. Blair4aea70c2012-07-26 14:23:24 -0700986 self.pipeline.setResult(change, build)
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800987 self.log.info("Change %s status is now:\n %s" %
James E. Blaire0487072012-08-29 17:38:31 -0700988 (change, self.pipeline.formatStatus(change)))
James E. Blair11700c32012-07-05 17:50:05 -0700989 self.updateBuildDescriptions(build.build_set)
James E. Blairfee8d652013-06-07 08:57:52 -0700990 while self.processQueue():
991 pass
James E. Blairee743612012-05-29 14:49:32 -0700992 return True
993
James E. Blairfee8d652013-06-07 08:57:52 -0700994 def reportItem(self, item):
995 if item.change.is_reportable and item.reported:
996 raise Exception("Already reported change %s" % item.change)
997 ret = self._reportItem(item)
998 if self.changes_merge:
999 succeeded = self.pipeline.didAllJobsSucceed(item)
1000 merged = (not ret)
1001 if merged:
1002 merged = self.sched.trigger.isMerged(item.change,
1003 item.change.branch)
1004 self.log.info("Reported change %s status: all-succeeded: %s, "
1005 "merged: %s" % (item.change, succeeded, merged))
1006 if not (succeeded and merged):
1007 self.log.debug("Reported change %s failed tests or failed "
1008 "to merge" % (item.change))
1009 raise MergeFailure("Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001010
James E. Blairfee8d652013-06-07 08:57:52 -07001011 def _reportItem(self, item):
1012 if not item.change.is_reportable:
James E. Blaire0487072012-08-29 17:38:31 -07001013 return False
James E. Blairfee8d652013-06-07 08:57:52 -07001014 if item.change.is_reportable and item.reported:
James E. Blairb0fcae42012-07-17 11:12:10 -07001015 return 0
James E. Blairfee8d652013-06-07 08:57:52 -07001016 self.log.debug("Reporting change %s" % item.change)
James E. Blairee743612012-05-29 14:49:32 -07001017 ret = None
James E. Blairfee8d652013-06-07 08:57:52 -07001018 if self.pipeline.didAllJobsSucceed(item):
1019 self.log.debug("success %s %s" % (self.success_action,
1020 self.failure_action))
James E. Blairee743612012-05-29 14:49:32 -07001021 action = self.success_action
James E. Blairfee8d652013-06-07 08:57:52 -07001022 item.setReportedResult('SUCCESS')
James E. Blairee743612012-05-29 14:49:32 -07001023 else:
1024 action = self.failure_action
James E. Blairfee8d652013-06-07 08:57:52 -07001025 item.setReportedResult('FAILURE')
1026 report = self.formatReport(item)
1027 item.reported = True
James E. Blairee743612012-05-29 14:49:32 -07001028 try:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001029 self.log.info("Reporting change %s, action: %s" %
James E. Blairfee8d652013-06-07 08:57:52 -07001030 (item.change, action))
1031 ret = self.sched.trigger.report(item.change, report, action)
James E. Blairee743612012-05-29 14:49:32 -07001032 if ret:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001033 self.log.error("Reporting change %s received: %s" %
James E. Blairfee8d652013-06-07 08:57:52 -07001034 (item.change, ret))
James E. Blairee743612012-05-29 14:49:32 -07001035 except:
1036 self.log.exception("Exception while reporting:")
James E. Blairfee8d652013-06-07 08:57:52 -07001037 item.setReportedResult('ERROR')
1038 self.updateBuildDescriptions(item.current_build_set)
James E. Blairee743612012-05-29 14:49:32 -07001039 return ret
1040
James E. Blairfee8d652013-06-07 08:57:52 -07001041 def formatReport(self, item):
James E. Blair8b0d4c42012-08-23 16:03:05 -07001042 ret = ''
James E. Blairfee8d652013-06-07 08:57:52 -07001043 if self.pipeline.didAllJobsSucceed(item):
James E. Blair56370192013-01-14 15:47:28 -08001044 ret += self.pipeline.success_message + '\n\n'
James E. Blair8b0d4c42012-08-23 16:03:05 -07001045 else:
James E. Blair56370192013-01-14 15:47:28 -08001046 ret += self.pipeline.failure_message + '\n\n'
James E. Blair8b0d4c42012-08-23 16:03:05 -07001047
James E. Blairfee8d652013-06-07 08:57:52 -07001048 if item.dequeued_needing_change:
James E. Blair8b0d4c42012-08-23 16:03:05 -07001049 ret += "This change depends on a change that failed to merge."
James E. Blairfee8d652013-06-07 08:57:52 -07001050 elif item.current_build_set.unable_to_merge:
James E. Blair8b0d4c42012-08-23 16:03:05 -07001051 ret += "This change was unable to be automatically merged "\
1052 "with the current state of the repository. Please "\
1053 "rebase your change and upload a new patchset."
1054 else:
James E. Blaira35fcce2012-08-24 10:46:01 -07001055 if self.sched.config.has_option('zuul', 'url_pattern'):
James E. Blair6aea36d2012-12-17 13:03:24 -08001056 url_pattern = self.sched.config.get('zuul', 'url_pattern')
James E. Blaira35fcce2012-08-24 10:46:01 -07001057 else:
James E. Blair6aea36d2012-12-17 13:03:24 -08001058 url_pattern = None
James E. Blairfee8d652013-06-07 08:57:52 -07001059 for job in self.pipeline.getJobs(item.change):
1060 build = item.current_build_set.getBuild(job.name)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001061 result = build.result
James E. Blair6aea36d2012-12-17 13:03:24 -08001062 pattern = url_pattern
1063 if result == 'SUCCESS':
1064 if job.success_message:
1065 result = job.success_message
1066 if job.success_pattern:
1067 pattern = job.success_pattern
1068 elif result == 'FAILURE':
1069 if job.failure_message:
1070 result = job.failure_message
1071 if job.failure_pattern:
1072 pattern = job.failure_pattern
Ori Livneh7191ee82013-05-02 19:13:53 -07001073 if pattern:
James E. Blairfee8d652013-06-07 08:57:52 -07001074 url = pattern.format(change=item.change,
Ori Livneh7191ee82013-05-02 19:13:53 -07001075 pipeline=self.pipeline,
1076 job=job,
1077 build=build)
1078 else:
1079 url = build.url or job.name
James E. Blair8b0d4c42012-08-23 16:03:05 -07001080 if not job.voting:
1081 voting = ' (non-voting)'
1082 else:
1083 voting = ''
James E. Blair0ac6c012013-04-26 09:04:23 -07001084 if self.report_times and build.end_time and build.start_time:
1085 dt = int(build.end_time - build.start_time)
1086 m, s = divmod(dt, 60)
1087 h, m = divmod(m, 60)
Sean Dague51fd1192013-05-03 07:09:53 -04001088 if h:
1089 elapsed = ' in %dh %02dm %02ds' % (h, m, s)
1090 elif m:
1091 elapsed = ' in %dm %02ds' % (m, s)
1092 else:
1093 elapsed = ' in %ds' % (s)
James E. Blair0ac6c012013-04-26 09:04:23 -07001094 else:
1095 elapsed = ''
1096 ret += '- %s : %s%s%s\n' % (url, result, elapsed, voting)
James E. Blair8b0d4c42012-08-23 16:03:05 -07001097 return ret
1098
1099 def formatDescription(self, build):
1100 concurrent_changes = ''
1101 concurrent_builds = ''
1102 other_builds = ''
1103
1104 for change in build.build_set.other_changes:
1105 concurrent_changes += '<li><a href="{change.url}">\
1106 {change.number},{change.patchset}</a></li>'.format(
1107 change=change)
1108
James E. Blairfee8d652013-06-07 08:57:52 -07001109 change = build.build_set.item.change
James E. Blair8b0d4c42012-08-23 16:03:05 -07001110
1111 for build in build.build_set.getBuilds():
Ori Livneh7191ee82013-05-02 19:13:53 -07001112 if build.url:
James E. Blair8b0d4c42012-08-23 16:03:05 -07001113 concurrent_builds += """\
1114<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001115 <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001116 {build.job.name} #{build.number}</a>: {build.result}
1117</li>
1118""".format(build=build)
1119 else:
1120 concurrent_builds += """\
1121<li>
1122 {build.job.name}: {build.result}
1123</li>""".format(build=build)
1124
1125 if build.build_set.previous_build_set:
1126 other_build = build.build_set.previous_build_set.getBuild(
1127 build.job.name)
1128 if other_build:
1129 other_builds += """\
1130<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001131 Preceded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001132 {build.job.name} #{build.number}</a>
1133</li>
1134""".format(build=other_build)
1135
1136 if build.build_set.next_build_set:
1137 other_build = build.build_set.next_build_set.getBuild(
1138 build.job.name)
1139 if other_build:
1140 other_builds += """\
1141<li>
Ori Livneh7191ee82013-05-02 19:13:53 -07001142 Succeeded by: <a href="{build.url}">
James E. Blair8b0d4c42012-08-23 16:03:05 -07001143 {build.job.name} #{build.number}</a>
1144</li>
1145""".format(build=other_build)
1146
1147 result = build.build_set.result
1148
1149 if hasattr(change, 'number'):
1150 ret = """\
1151<p>
1152 Triggered by change:
1153 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
1154 Branch: <b>{change.branch}</b><br/>
1155 Pipeline: <b>{self.pipeline.name}</b>
1156</p>"""
1157 else:
1158 ret = """\
1159<p>
1160 Triggered by reference:
1161 {change.ref}</a><br/>
1162 Old revision: <b>{change.oldrev}</b><br/>
1163 New revision: <b>{change.newrev}</b><br/>
1164 Pipeline: <b>{self.pipeline.name}</b>
1165</p>"""
1166
1167 if concurrent_changes:
1168 ret += """\
1169<p>
1170 Other changes tested concurrently with this change:
1171 <ul>{concurrent_changes}</ul>
1172</p>
1173"""
1174 if concurrent_builds:
1175 ret += """\
1176<p>
1177 All builds for this change set:
1178 <ul>{concurrent_builds}</ul>
1179</p>
1180"""
1181
1182 if other_builds:
1183 ret += """\
1184<p>
1185 Other build sets for this change:
1186 <ul>{other_builds}</ul>
1187</p>
1188"""
1189 if result:
1190 ret += """\
1191<p>
1192 Reported result: <b>{result}</b>
1193</p>
1194"""
1195
1196 ret = ret.format(**locals())
James E. Blair268d9342012-06-13 18:24:29 -07001197 return ret
1198
James E. Blairfee8d652013-06-07 08:57:52 -07001199 def reportStats(self, item):
James E. Blair8fa16972013-01-15 16:57:20 -08001200 if not statsd:
1201 return
1202 try:
James E. Blairfee8d652013-06-07 08:57:52 -07001203 # Update the gauge on enqueue and dequeue, but timers only
James E. Blair8fa16972013-01-15 16:57:20 -08001204 # when dequeing.
James E. Blairfee8d652013-06-07 08:57:52 -07001205 if item.dequeue_time:
1206 dt = int((item.dequeue_time - item.enqueue_time) * 1000)
James E. Blair8fa16972013-01-15 16:57:20 -08001207 else:
1208 dt = None
James E. Blairfee8d652013-06-07 08:57:52 -07001209 items = len(self.pipeline.getAllItems())
James E. Blair8fa16972013-01-15 16:57:20 -08001210
1211 # stats.timers.zuul.pipeline.NAME.resident_time
1212 # stats_counts.zuul.pipeline.NAME.total_changes
1213 # stats.gauges.zuul.pipeline.NAME.current_changes
1214 key = 'zuul.pipeline.%s' % self.pipeline.name
James E. Blairfee8d652013-06-07 08:57:52 -07001215 statsd.gauge(key + '.current_changes', items)
James E. Blair8fa16972013-01-15 16:57:20 -08001216 if dt:
1217 statsd.timing(key + '.resident_time', dt)
1218 statsd.incr(key + '.total_changes')
1219
1220 # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
1221 # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
James E. Blairfee8d652013-06-07 08:57:52 -07001222 project_name = item.change.project.name.replace('/', '.')
James E. Blair8fa16972013-01-15 16:57:20 -08001223 key += '.%s' % project_name
1224 if dt:
1225 statsd.timing(key + '.resident_time', dt)
1226 statsd.incr(key + '.total_changes')
1227 except:
1228 self.log.exception("Exception reporting pipeline stats")
1229
James E. Blair1e8dd892012-05-30 09:15:05 -07001230
class IndependentPipelineManager(BasePipelineManager):
    """Pipeline manager that keeps all projects in one non-dependent queue."""

    log = logging.getLogger("zuul.IndependentPipelineManager")
    # reportItem skips its merge verification when this is false.
    changes_merge = False

    def _postConfig(self, layout):
        """Log the configuration, then create the single change queue.

        Every project of the pipeline is added to one ChangeQueue that
        is created with ``dependent=False``.
        """
        super(IndependentPipelineManager, self)._postConfig(layout)

        shared_queue = ChangeQueue(self.pipeline, dependent=False)
        for project in self.pipeline.getProjects():
            shared_queue.addProject(project)

        self.pipeline.addQueue(shared_queue)
James E. Blairee743612012-05-29 14:49:32 -07001243
James E. Blair1e8dd892012-05-30 09:15:05 -07001244
James E. Blair4aea70c2012-07-26 14:23:24 -07001245class DependentPipelineManager(BasePipelineManager):
1246 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001247 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001248
1249 def __init__(self, *args, **kwargs):
James E. Blair4aea70c2012-07-26 14:23:24 -07001250 super(DependentPipelineManager, self).__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001251
James E. Blaireff88162013-07-01 12:44:14 -04001252 def _postConfig(self, layout):
1253 super(DependentPipelineManager, self)._postConfig(layout)
James E. Blairee743612012-05-29 14:49:32 -07001254 self.buildChangeQueues()
1255
1256 def buildChangeQueues(self):
1257 self.log.debug("Building shared change queues")
1258 change_queues = []
1259
James E. Blair4aea70c2012-07-26 14:23:24 -07001260 for project in self.pipeline.getProjects():
1261 change_queue = ChangeQueue(self.pipeline)
1262 change_queue.addProject(project)
1263 change_queues.append(change_queue)
1264 self.log.debug("Created queue: %s" % change_queue)
James E. Blairee743612012-05-29 14:49:32 -07001265
1266 self.log.debug("Combining shared queues")
1267 new_change_queues = []
1268 for a in change_queues:
1269 merged_a = False
1270 for b in new_change_queues:
1271 if not a.getJobs().isdisjoint(b.getJobs()):
1272 self.log.debug("Merging queue %s into %s" % (a, b))
1273 b.mergeChangeQueue(a)
1274 merged_a = True
1275 break # this breaks out of 'for b' and continues 'for a'
1276 if not merged_a:
1277 self.log.debug("Keeping queue %s" % (a))
1278 new_change_queues.append(a)
James E. Blair1e8dd892012-05-30 09:15:05 -07001279
James E. Blairee743612012-05-29 14:49:32 -07001280 self.log.info(" Shared change queues:")
James E. Blaire0487072012-08-29 17:38:31 -07001281 for queue in new_change_queues:
1282 self.pipeline.addQueue(queue)
1283 self.log.info(" %s" % queue)
James E. Blairee743612012-05-29 14:49:32 -07001284
James E. Blaire0487072012-08-29 17:38:31 -07001285 def isChangeReadyToBeEnqueued(self, change):
1286 if not self.sched.trigger.canMerge(change,
1287 self.getSubmitAllowNeeds()):
1288 self.log.debug("Change %s can not merge, ignoring" % change)
1289 return False
1290 return True
James E. Blair1e8dd892012-05-30 09:15:05 -07001291
James E. Blaire0487072012-08-29 17:38:31 -07001292 def enqueueChangesBehind(self, change):
1293 to_enqueue = []
1294 self.log.debug("Checking for changes needing %s:" % change)
1295 if not hasattr(change, 'needed_by_changes'):
1296 self.log.debug(" Changeish does not support dependencies")
1297 return
1298 for needs in change.needed_by_changes:
1299 if self.sched.trigger.canMerge(needs,
1300 self.getSubmitAllowNeeds()):
1301 self.log.debug(" Change %s needs %s and is ready to merge" %
1302 (needs, change))
1303 to_enqueue.append(needs)
1304 if not to_enqueue:
1305 self.log.debug(" No changes need %s" % change)
1306
1307 for other_change in to_enqueue:
1308 self.addChange(other_change)
1309
1310 def enqueueChangesAhead(self, change):
1311 ret = self.checkForChangesNeededBy(change)
1312 if ret in [True, False]:
1313 return ret
1314 self.log.debug(" Change %s must be merged ahead of %s" %
1315 (ret, change))
1316 return self.addChange(ret)
1317
1318 def checkForChangesNeededBy(self, change):
James E. Blaire421a232012-07-25 16:59:21 -07001319 self.log.debug("Checking for changes needed by %s:" % change)
1320 # Return true if okay to proceed enqueing this change,
1321 # false if the change should not be enqueued.
James E. Blair4aea70c2012-07-26 14:23:24 -07001322 if not hasattr(change, 'needs_change'):
1323 self.log.debug(" Changeish does not support dependencies")
1324 return True
James E. Blaire421a232012-07-25 16:59:21 -07001325 if not change.needs_change:
1326 self.log.debug(" No changes needed")
1327 return True
1328 if change.needs_change.is_merged:
1329 self.log.debug(" Needed change is merged")
1330 return True
1331 if not change.needs_change.is_current_patchset:
1332 self.log.debug(" Needed change is not the current patchset")
1333 return False
James E. Blair127bc182012-08-28 15:55:15 -07001334 if self.isChangeAlreadyInQueue(change.needs_change):
James E. Blaire421a232012-07-25 16:59:21 -07001335 self.log.debug(" Needed change is already ahead in the queue")
1336 return True
James E. Blaire0487072012-08-29 17:38:31 -07001337 if self.sched.trigger.canMerge(change.needs_change,
1338 self.getSubmitAllowNeeds()):
1339 self.log.debug(" Change %s is needed" %
1340 change.needs_change)
1341 return change.needs_change
James E. Blaire421a232012-07-25 16:59:21 -07001342 # The needed change can't be merged.
1343 self.log.debug(" Change %s is needed but can not be merged" %
1344 change.needs_change)
1345 return False