blob: 38fd9fb49e1fa477926bb255854d19c89b160879 [file] [log] [blame]
James E. Blairee743612012-05-29 14:49:32 -07001# Copyright 2012 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
James E. Blairee743612012-05-29 14:49:32 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
James E. Blair71e94122012-12-24 17:53:08 -080016import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080017import json
James E. Blairee743612012-05-29 14:49:32 -070018import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080019import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070020import pickle
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import Queue
22import re
23import threading
James E. Blair71e94122012-12-24 17:53:08 -080024import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080025import yaml
James E. Blairee743612012-05-29 14:49:32 -070026
James E. Blair47958382013-01-10 17:26:02 -080027import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070028import model
James E. Blair4aea70c2012-07-26 14:23:24 -070029from model import Pipeline, Job, Project, ChangeQueue, EventFilter
James E. Blair4886cc12012-07-18 15:39:41 -070030import merger
James E. Blairee743612012-05-29 14:49:32 -070031
James E. Blair71e94122012-12-24 17:53:08 -080032statsd = extras.try_import('statsd.statsd')
33
James E. Blair1e8dd892012-05-30 09:15:05 -070034
James E. Blaire9d45c32012-05-31 09:56:45 -070035class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -070036 log = logging.getLogger("zuul.Scheduler")
37
James E. Blaire9d45c32012-05-31 09:56:45 -070038 def __init__(self):
39 threading.Thread.__init__(self)
James E. Blairee743612012-05-29 14:49:32 -070040 self.wake_event = threading.Event()
James E. Blaire9d45c32012-05-31 09:56:45 -070041 self.reconfigure_complete_event = threading.Event()
James E. Blair5d5bc2b2012-07-06 10:24:01 -070042 self._pause = False
43 self._reconfigure = False
44 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -070045 self._stopped = False
James E. Blairee743612012-05-29 14:49:32 -070046 self.launcher = None
47 self.trigger = None
48
49 self.trigger_event_queue = Queue.Queue()
50 self.result_event_queue = Queue.Queue()
James E. Blaire9d45c32012-05-31 09:56:45 -070051 self._init()
James E. Blairee743612012-05-29 14:49:32 -070052
James E. Blaire9d45c32012-05-31 09:56:45 -070053 def _init(self):
James E. Blair4aea70c2012-07-26 14:23:24 -070054 self.pipelines = {}
James E. Blaire9d45c32012-05-31 09:56:45 -070055 self.jobs = {}
56 self.projects = {}
James E. Blairb0954652012-06-01 11:32:01 -070057 self.metajobs = {}
James E. Blairee743612012-05-29 14:49:32 -070058
James E. Blairb0fcae42012-07-17 11:12:10 -070059 def stop(self):
60 self._stopped = True
61 self.wake_event.set()
62
James E. Blair47958382013-01-10 17:26:02 -080063 def testConfig(self, config_path):
64 self._init()
65 self._parseConfig(config_path)
66
James E. Blaire5a847f2012-07-10 15:29:14 -070067 def _parseConfig(self, config_path):
James E. Blairee743612012-05-29 14:49:32 -070068 def toList(item):
James E. Blair1e8dd892012-05-30 09:15:05 -070069 if not item:
70 return []
James E. Blair32663402012-06-01 10:04:18 -070071 if isinstance(item, list):
James E. Blairee743612012-05-29 14:49:32 -070072 return item
73 return [item]
74
James E. Blaire5a847f2012-07-10 15:29:14 -070075 if config_path:
76 config_path = os.path.expanduser(config_path)
77 if not os.path.exists(config_path):
78 raise Exception("Unable to read layout config file at %s" %
79 config_path)
80 config_file = open(config_path)
81 data = yaml.load(config_file)
82
James E. Blair47958382013-01-10 17:26:02 -080083 validator = layoutvalidator.LayoutValidator()
84 validator.validate(data)
85
James E. Blaire5a847f2012-07-10 15:29:14 -070086 self._config_env = {}
87 for include in data.get('includes', []):
88 if 'python-file' in include:
89 fn = include['python-file']
90 if not os.path.isabs(fn):
91 base = os.path.dirname(config_path)
92 fn = os.path.join(base, fn)
93 fn = os.path.expanduser(fn)
94 execfile(fn, self._config_env)
James E. Blair1e8dd892012-05-30 09:15:05 -070095
James E. Blair4aea70c2012-07-26 14:23:24 -070096 for conf_pipeline in data.get('pipelines', []):
97 pipeline = Pipeline(conf_pipeline['name'])
James E. Blair8dbd56a2012-12-22 10:55:10 -080098 pipeline.description = conf_pipeline.get('description')
James E. Blair56370192013-01-14 15:47:28 -080099 pipeline.failure_message = conf_pipeline.get('failure-message',
100 "Build failed.")
101 pipeline.success_message = conf_pipeline.get('success-message',
102 "Build succeeded.")
James E. Blair2fa50962013-01-30 21:50:41 -0800103 pipeline.dequeue_on_new_patchset = conf_pipeline.get(
104 'dequeue-on-new-patchset',
105 True)
James E. Blair4aea70c2012-07-26 14:23:24 -0700106 manager = globals()[conf_pipeline['manager']](self, pipeline)
107 pipeline.setManager(manager)
108
109 self.pipelines[conf_pipeline['name']] = pipeline
110 manager.success_action = conf_pipeline.get('success')
111 manager.failure_action = conf_pipeline.get('failure')
112 manager.start_action = conf_pipeline.get('start')
113 for trigger in toList(conf_pipeline['trigger']):
James E. Blairee743612012-05-29 14:49:32 -0700114 approvals = {}
115 for approval_dict in toList(trigger.get('approval')):
116 for k, v in approval_dict.items():
James E. Blair1e8dd892012-05-30 09:15:05 -0700117 approvals[k] = v
James E. Blairee743612012-05-29 14:49:32 -0700118 f = EventFilter(types=toList(trigger['event']),
119 branches=toList(trigger.get('branch')),
120 refs=toList(trigger.get('ref')),
Clark Boylanb9bcb402012-06-29 17:44:05 -0700121 approvals=approvals,
Zhongyue Luoaa85ebf2012-09-21 16:38:33 +0800122 comment_filters=
Antoine Mussob4e809e2012-12-06 16:58:06 +0100123 toList(trigger.get('comment_filter')),
124 email_filters=
125 toList(trigger.get('email_filter')))
James E. Blairee743612012-05-29 14:49:32 -0700126 manager.event_filters.append(f)
127
James E. Blair47958382013-01-10 17:26:02 -0800128 for config_job in data.get('jobs', []):
James E. Blairee743612012-05-29 14:49:32 -0700129 job = self.getJob(config_job['name'])
James E. Blairb0954652012-06-01 11:32:01 -0700130 # Be careful to only set attributes explicitly present on
131 # this job, to avoid squashing attributes set by a meta-job.
132 m = config_job.get('failure-message', None)
133 if m:
134 job.failure_message = m
135 m = config_job.get('success-message', None)
136 if m:
137 job.success_message = m
James E. Blair6aea36d2012-12-17 13:03:24 -0800138 m = config_job.get('failure-pattern', None)
139 if m:
140 job.failure_pattern = m
141 m = config_job.get('success-pattern', None)
142 if m:
143 job.success_pattern = m
James E. Blair222d4982012-07-16 09:31:19 -0700144 m = config_job.get('hold-following-changes', False)
145 if m:
146 job.hold_following_changes = True
James E. Blair4ec821f2012-08-23 15:28:28 -0700147 m = config_job.get('voting', None)
148 if m is not None:
149 job.voting = m
James E. Blaire5a847f2012-07-10 15:29:14 -0700150 fname = config_job.get('parameter-function', None)
151 if fname:
152 func = self._config_env.get(fname, None)
153 if not func:
154 raise Exception("Unable to find function %s" % fname)
155 job.parameter_function = func
James E. Blairee743612012-05-29 14:49:32 -0700156 branches = toList(config_job.get('branch'))
157 if branches:
James E. Blaire421a232012-07-25 16:59:21 -0700158 job._branches = branches
159 job.branches = [re.compile(x) for x in branches]
James E. Blairee743612012-05-29 14:49:32 -0700160
161 def add_jobs(job_tree, config_jobs):
162 for job in config_jobs:
163 if isinstance(job, list):
164 for x in job:
165 add_jobs(job_tree, x)
166 if isinstance(job, dict):
167 for parent, children in job.items():
168 parent_tree = job_tree.addJob(self.getJob(parent))
169 add_jobs(parent_tree, children)
170 if isinstance(job, str):
171 job_tree.addJob(self.getJob(job))
172
James E. Blair47958382013-01-10 17:26:02 -0800173 for config_project in data.get('projects', []):
James E. Blairee743612012-05-29 14:49:32 -0700174 project = Project(config_project['name'])
175 self.projects[config_project['name']] = project
James E. Blair4886cc12012-07-18 15:39:41 -0700176 mode = config_project.get('merge-mode')
177 if mode and mode == 'cherry-pick':
178 project.merge_mode = model.CHERRY_PICK
James E. Blair4aea70c2012-07-26 14:23:24 -0700179 for pipeline in self.pipelines.values():
180 if pipeline.name in config_project:
181 job_tree = pipeline.addProject(project)
182 config_jobs = config_project[pipeline.name]
James E. Blairee743612012-05-29 14:49:32 -0700183 add_jobs(job_tree, config_jobs)
James E. Blairee743612012-05-29 14:49:32 -0700184
James E. Blairb0954652012-06-01 11:32:01 -0700185 # All jobs should be defined at this point, get rid of
186 # metajobs so that getJob isn't doing anything weird.
187 self.metajobs = {}
188
James E. Blair4aea70c2012-07-26 14:23:24 -0700189 for pipeline in self.pipelines.values():
190 pipeline.manager._postConfig()
James E. Blairee743612012-05-29 14:49:32 -0700191
James E. Blair47958382013-01-10 17:26:02 -0800192 def _setupMerger(self):
James E. Blair4886cc12012-07-18 15:39:41 -0700193 if self.config.has_option('zuul', 'git_dir'):
194 merge_root = self.config.get('zuul', 'git_dir')
195 else:
196 merge_root = '/var/lib/zuul/git'
James E. Blair47958382013-01-10 17:26:02 -0800197
James E. Blairceabcbc2012-08-17 13:48:46 -0700198 if self.config.has_option('zuul', 'push_change_refs'):
199 push_refs = self.config.getboolean('zuul', 'push_change_refs')
200 else:
201 push_refs = False
James E. Blair47958382013-01-10 17:26:02 -0800202
James E. Blairad615012012-11-30 16:14:21 -0800203 if self.config.has_option('gerrit', 'sshkey'):
204 sshkey = self.config.get('gerrit', 'sshkey')
205 else:
206 sshkey = None
James E. Blair47958382013-01-10 17:26:02 -0800207
James E. Blairad615012012-11-30 16:14:21 -0800208 self.merger = merger.Merger(self.trigger, merge_root, push_refs,
209 sshkey)
James E. Blair4886cc12012-07-18 15:39:41 -0700210 for project in self.projects.values():
211 url = self.trigger.getGitUrl(project)
212 self.merger.addProject(project, url)
213
James E. Blairee743612012-05-29 14:49:32 -0700214 def getJob(self, name):
James E. Blair1e8dd892012-05-30 09:15:05 -0700215 if name in self.jobs:
James E. Blairee743612012-05-29 14:49:32 -0700216 return self.jobs[name]
217 job = Job(name)
James E. Blairb0954652012-06-01 11:32:01 -0700218 if name.startswith('^'):
219 # This is a meta-job
220 regex = re.compile(name)
221 self.metajobs[regex] = job
222 else:
223 # Apply attributes from matching meta-jobs
224 for regex, metajob in self.metajobs.items():
225 if regex.match(name):
226 job.copy(metajob)
227 self.jobs[name] = job
James E. Blairee743612012-05-29 14:49:32 -0700228 return job
229
230 def setLauncher(self, launcher):
231 self.launcher = launcher
232
233 def setTrigger(self, trigger):
234 self.trigger = trigger
235
236 def addEvent(self, event):
237 self.log.debug("Adding trigger event: %s" % event)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800238 try:
239 if statsd:
240 statsd.incr('gerrit.event.%s' % event.type)
241 except:
242 self.log.exception("Exception reporting event stats")
James E. Blairee743612012-05-29 14:49:32 -0700243 self.trigger_event_queue.put(event)
244 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800245 self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700246
James E. Blair11700c32012-07-05 17:50:05 -0700247 def onBuildStarted(self, build):
248 self.log.debug("Adding start event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800249 build.start_time = time.time()
James E. Blair11700c32012-07-05 17:50:05 -0700250 self.result_event_queue.put(('started', build))
251 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800252 self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700253
James E. Blairee743612012-05-29 14:49:32 -0700254 def onBuildCompleted(self, build):
James E. Blair11700c32012-07-05 17:50:05 -0700255 self.log.debug("Adding complete event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800256 build.end_time = time.time()
James E. Blair23ec1ba2013-01-04 18:06:10 -0800257 try:
258 if statsd:
259 key = 'zuul.job.%s' % build.job.name
260 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
261 dt = int((build.end_time - build.start_time) * 1000)
262 statsd.timing(key, dt)
263 statsd.incr(key)
264 except:
265 self.log.exception("Exception reporting runtime stats")
James E. Blair11700c32012-07-05 17:50:05 -0700266 self.result_event_queue.put(('completed', build))
James E. Blairee743612012-05-29 14:49:32 -0700267 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800268 self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700269
James E. Blaire9d45c32012-05-31 09:56:45 -0700270 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700271 self.log.debug("Prepare to reconfigure")
James E. Blaire9d45c32012-05-31 09:56:45 -0700272 self.config = config
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700273 self._pause = True
274 self._reconfigure = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700275 self.wake_event.set()
276 self.log.debug("Waiting for reconfiguration")
277 self.reconfigure_complete_event.wait()
278 self.reconfigure_complete_event.clear()
279 self.log.debug("Reconfiguration complete")
280
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700281 def exit(self):
282 self.log.debug("Prepare to exit")
283 self._pause = True
284 self._exit = True
285 self.wake_event.set()
286 self.log.debug("Waiting for exit")
287
288 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700289 if self.config.has_option('zuul', 'state_dir'):
290 state_dir = os.path.expanduser(self.config.get('zuul',
291 'state_dir'))
292 else:
293 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700294 return os.path.join(state_dir, 'queue.pickle')
295
296 def _save_queue(self):
297 pickle_file = self._get_queue_pickle_file()
298 events = []
299 while not self.trigger_event_queue.empty():
300 events.append(self.trigger_event_queue.get())
301 self.log.debug("Queue length is %s" % len(events))
302 if events:
303 self.log.debug("Saving queue")
304 pickle.dump(events, open(pickle_file, 'wb'))
305
306 def _load_queue(self):
307 pickle_file = self._get_queue_pickle_file()
308 if os.path.exists(pickle_file):
309 self.log.debug("Loading queue")
310 events = pickle.load(open(pickle_file, 'rb'))
311 self.log.debug("Queue length is %s" % len(events))
312 for event in events:
313 self.trigger_event_queue.put(event)
314 else:
315 self.log.debug("No queue file found")
316
317 def _delete_queue(self):
318 pickle_file = self._get_queue_pickle_file()
319 if os.path.exists(pickle_file):
320 self.log.debug("Deleting saved queue")
321 os.unlink(pickle_file)
322
323 def resume(self):
324 try:
325 self._load_queue()
326 except:
327 self.log.exception("Unable to load queue")
328 try:
329 self._delete_queue()
330 except:
331 self.log.exception("Unable to delete saved queue")
332 self.log.debug("Resuming queue processing")
333 self.wake_event.set()
334
335 def _doPauseEvent(self):
336 if self._exit:
337 self.log.debug("Exiting")
338 self._save_queue()
339 os._exit(0)
340 if self._reconfigure:
341 self.log.debug("Performing reconfiguration")
342 self._init()
343 self._parseConfig(self.config.get('zuul', 'layout_config'))
James E. Blair47958382013-01-10 17:26:02 -0800344 self._setupMerger()
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700345 self._pause = False
James E. Blaire0487072012-08-29 17:38:31 -0700346 self._reconfigure = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700347 self.reconfigure_complete_event.set()
James E. Blaire9d45c32012-05-31 09:56:45 -0700348
349 def _areAllBuildsComplete(self):
350 self.log.debug("Checking if all builds are complete")
351 waiting = False
James E. Blair4aea70c2012-07-26 14:23:24 -0700352 for pipeline in self.pipelines.values():
353 for build in pipeline.manager.building_jobs.keys():
354 self.log.debug("%s waiting on %s" % (pipeline.manager, build))
James E. Blaire9d45c32012-05-31 09:56:45 -0700355 waiting = True
356 if not waiting:
357 self.log.debug("All builds are complete")
358 return True
359 self.log.debug("All builds are not complete")
360 return False
361
James E. Blairee743612012-05-29 14:49:32 -0700362 def run(self):
James E. Blair71e94122012-12-24 17:53:08 -0800363 if statsd:
364 self.log.debug("Statsd enabled")
365 else:
366 self.log.debug("Statsd disabled because python statsd "
367 "package not found")
James E. Blairee743612012-05-29 14:49:32 -0700368 while True:
369 self.log.debug("Run handler sleeping")
370 self.wake_event.wait()
371 self.wake_event.clear()
James E. Blairb0fcae42012-07-17 11:12:10 -0700372 if self._stopped:
373 return
James E. Blairee743612012-05-29 14:49:32 -0700374 self.log.debug("Run handler awake")
375 try:
James E. Blair263fba92013-02-27 13:07:19 -0800376 # Give result events priority -- they let us stop builds,
377 # whereas trigger evensts cause us to launch builds.
James E. Blairee743612012-05-29 14:49:32 -0700378 if not self.result_event_queue.empty():
379 self.process_result_queue()
James E. Blair263fba92013-02-27 13:07:19 -0800380 elif not self._pause:
381 if not self.trigger_event_queue.empty():
382 self.process_event_queue()
James E. Blaire9d45c32012-05-31 09:56:45 -0700383
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700384 if self._pause and self._areAllBuildsComplete():
385 self._doPauseEvent()
James E. Blaire9d45c32012-05-31 09:56:45 -0700386
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700387 if not self._pause:
James E. Blair4baa94c2012-06-07 17:04:21 -0700388 if not (self.trigger_event_queue.empty() and
389 self.result_event_queue.empty()):
390 self.wake_event.set()
391 else:
392 if not self.result_event_queue.empty():
393 self.wake_event.set()
James E. Blairee743612012-05-29 14:49:32 -0700394 except:
395 self.log.exception("Exception in run handler:")
396
397 def process_event_queue(self):
398 self.log.debug("Fetching trigger event")
399 event = self.trigger_event_queue.get()
400 self.log.debug("Processing trigger event %s" % event)
401 project = self.projects.get(event.project_name)
402 if not project:
403 self.log.warning("Project %s not found" % event.project_name)
James E. Blairff791972013-01-09 11:45:43 -0800404 self.trigger_event_queue.task_done()
James E. Blairee743612012-05-29 14:49:32 -0700405 return
406
Antoine Mussofeba9672013-01-17 13:44:59 +0100407 # Preprocessing for ref-update events
408 if hasattr(event, 'refspec'):
409 # Make sure the local git repo is up-to-date with the remote one.
410 # We better have the new ref before enqueuing the changes.
411 # This is done before enqueuing the changes to avoid calling an
412 # update per pipeline accepting the change.
413 self.log.info("Fetching references for %s" % project)
414 self.merger.updateRepo(project)
415
James E. Blair4aea70c2012-07-26 14:23:24 -0700416 for pipeline in self.pipelines.values():
James E. Blair2fa50962013-01-30 21:50:41 -0800417 change = event.getChange(project, self.trigger)
418 if event.type == 'patchset-created':
419 pipeline.manager.removeOldVersionsOfChange(change)
James E. Blair4aea70c2012-07-26 14:23:24 -0700420 if not pipeline.manager.eventMatches(event):
421 self.log.debug("Event %s ignored by %s" % (event, pipeline))
James E. Blairee743612012-05-29 14:49:32 -0700422 continue
James E. Blaire421a232012-07-25 16:59:21 -0700423 self.log.info("Adding %s, %s to %s" %
James E. Blair4aea70c2012-07-26 14:23:24 -0700424 (project, change, pipeline))
425 pipeline.manager.addChange(change)
James E. Blairff791972013-01-09 11:45:43 -0800426 self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700427
James E. Blairee743612012-05-29 14:49:32 -0700428 def process_result_queue(self):
429 self.log.debug("Fetching result event")
James E. Blair11700c32012-07-05 17:50:05 -0700430 event_type, build = self.result_event_queue.get()
James E. Blairee743612012-05-29 14:49:32 -0700431 self.log.debug("Processing result event %s" % build)
James E. Blair4aea70c2012-07-26 14:23:24 -0700432 for pipeline in self.pipelines.values():
James E. Blair11700c32012-07-05 17:50:05 -0700433 if event_type == 'started':
James E. Blair4aea70c2012-07-26 14:23:24 -0700434 if pipeline.manager.onBuildStarted(build):
James E. Blairff791972013-01-09 11:45:43 -0800435 self.result_event_queue.task_done()
James E. Blair11700c32012-07-05 17:50:05 -0700436 return
437 elif event_type == 'completed':
James E. Blair4aea70c2012-07-26 14:23:24 -0700438 if pipeline.manager.onBuildCompleted(build):
James E. Blairff791972013-01-09 11:45:43 -0800439 self.result_event_queue.task_done()
James E. Blair11700c32012-07-05 17:50:05 -0700440 return
James E. Blairc84dd262012-05-31 10:03:13 -0700441 self.log.warning("Build %s not found by any queue manager" % (build))
James E. Blairff791972013-01-09 11:45:43 -0800442 self.result_event_queue.task_done()
James E. Blairee743612012-05-29 14:49:32 -0700443
James E. Blair268d9342012-06-13 18:24:29 -0700444 def formatStatusHTML(self):
445 ret = '<html><pre>'
James E. Blaire0487072012-08-29 17:38:31 -0700446 if self._pause:
447 ret += '<p><b>Queue only mode:</b> preparing to '
448 if self._reconfigure:
449 ret += 'reconfigure'
450 if self._exit:
451 ret += 'exit'
452 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
453 ret += '</p>'
454
James E. Blair4aea70c2012-07-26 14:23:24 -0700455 keys = self.pipelines.keys()
James E. Blair268d9342012-06-13 18:24:29 -0700456 keys.sort()
457 for key in keys:
James E. Blair4aea70c2012-07-26 14:23:24 -0700458 pipeline = self.pipelines[key]
459 s = 'Pipeline: %s' % pipeline.name
James E. Blair268d9342012-06-13 18:24:29 -0700460 ret += s + '\n'
461 ret += '-' * len(s) + '\n'
James E. Blaire0487072012-08-29 17:38:31 -0700462 ret += pipeline.formatStatusHTML()
James E. Blair268d9342012-06-13 18:24:29 -0700463 ret += '\n'
464 ret += '</pre></html>'
465 return ret
466
James E. Blair8dbd56a2012-12-22 10:55:10 -0800467 def formatStatusJSON(self):
468 data = {}
469 if self._pause:
470 ret = '<p><b>Queue only mode:</b> preparing to '
471 if self._reconfigure:
472 ret += 'reconfigure'
473 if self._exit:
474 ret += 'exit'
475 ret += ', queue length: %s' % self.trigger_event_queue.qsize()
476 ret += '</p>'
477 data['message'] = ret
478
James E. Blairfb682cc2013-02-26 15:23:27 -0800479 data['trigger_event_queue'] = {}
480 data['trigger_event_queue']['length'] = \
481 self.trigger_event_queue.qsize()
482 data['result_event_queue'] = {}
483 data['result_event_queue']['length'] = \
484 self.result_event_queue.qsize()
485
James E. Blair8dbd56a2012-12-22 10:55:10 -0800486 pipelines = []
487 data['pipelines'] = pipelines
488 keys = self.pipelines.keys()
489 keys.sort()
490 for key in keys:
491 pipeline = self.pipelines[key]
492 pipelines.append(pipeline.formatStatusJSON())
493 return json.dumps(data)
494
James E. Blair1e8dd892012-05-30 09:15:05 -0700495
James E. Blair4aea70c2012-07-26 14:23:24 -0700496class BasePipelineManager(object):
497 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700498
James E. Blair4aea70c2012-07-26 14:23:24 -0700499 def __init__(self, sched, pipeline):
James E. Blairee743612012-05-29 14:49:32 -0700500 self.sched = sched
James E. Blair4aea70c2012-07-26 14:23:24 -0700501 self.pipeline = pipeline
James E. Blairee743612012-05-29 14:49:32 -0700502 self.building_jobs = {}
503 self.event_filters = []
504 self.success_action = {}
505 self.failure_action = {}
James E. Blairdc253862012-06-13 17:12:42 -0700506 self.start_action = {}
James E. Blairee743612012-05-29 14:49:32 -0700507
508 def __str__(self):
James E. Blair93cc8d42012-08-07 10:46:51 -0700509 return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700510
511 def _postConfig(self):
James E. Blair4aea70c2012-07-26 14:23:24 -0700512 self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
James E. Blairee743612012-05-29 14:49:32 -0700513 self.log.info(" Events:")
514 for e in self.event_filters:
515 self.log.info(" %s" % e)
516 self.log.info(" Projects:")
James E. Blair1e8dd892012-05-30 09:15:05 -0700517
James E. Blairee743612012-05-29 14:49:32 -0700518 def log_jobs(tree, indent=0):
James E. Blair1e8dd892012-05-30 09:15:05 -0700519 istr = ' ' + ' ' * indent
James E. Blairee743612012-05-29 14:49:32 -0700520 if tree.job:
521 efilters = ''
James E. Blaire421a232012-07-25 16:59:21 -0700522 for b in tree.job._branches:
523 efilters += str(b)
James E. Blairee743612012-05-29 14:49:32 -0700524 if efilters:
James E. Blair1e8dd892012-05-30 09:15:05 -0700525 efilters = ' ' + efilters
James E. Blair222d4982012-07-16 09:31:19 -0700526 hold = ''
527 if tree.job.hold_following_changes:
528 hold = ' [hold]'
James E. Blair4ec821f2012-08-23 15:28:28 -0700529 voting = ''
530 if not tree.job.voting:
531 voting = ' [nonvoting]'
532 self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
533 efilters, hold, voting))
James E. Blairee743612012-05-29 14:49:32 -0700534 for x in tree.job_trees:
James E. Blair1e8dd892012-05-30 09:15:05 -0700535 log_jobs(x, indent + 2)
536
James E. Blairee743612012-05-29 14:49:32 -0700537 for p in self.sched.projects.values():
James E. Blair4aea70c2012-07-26 14:23:24 -0700538 tree = self.pipeline.getJobTree(p)
539 if tree:
James E. Blairee743612012-05-29 14:49:32 -0700540 self.log.info(" %s" % p)
James E. Blair4aea70c2012-07-26 14:23:24 -0700541 log_jobs(tree)
James E. Blairdc253862012-06-13 17:12:42 -0700542 if self.start_action:
543 self.log.info(" On start:")
544 self.log.info(" %s" % self.start_action)
James E. Blairee743612012-05-29 14:49:32 -0700545 if self.success_action:
546 self.log.info(" On success:")
547 self.log.info(" %s" % self.success_action)
548 if self.failure_action:
549 self.log.info(" On failure:")
550 self.log.info(" %s" % self.failure_action)
551
James E. Blaire421a232012-07-25 16:59:21 -0700552 def getSubmitAllowNeeds(self):
553 # Get a list of code review labels that are allowed to be
554 # "needed" in the submit records for a change, with respect
555 # to this queue. In other words, the list of review labels
556 # this queue itself is likely to set before submitting.
James E. Blair4aea70c2012-07-26 14:23:24 -0700557 if self.success_action:
558 return self.success_action.keys()
559 else:
560 return {}
James E. Blaire421a232012-07-25 16:59:21 -0700561
James E. Blairee743612012-05-29 14:49:32 -0700562 def eventMatches(self, event):
563 for ef in self.event_filters:
James E. Blairee743612012-05-29 14:49:32 -0700564 if ef.matches(event):
565 return True
566 return False
567
James E. Blair0dc8ba92012-07-16 14:23:52 -0700568 def isChangeAlreadyInQueue(self, change):
James E. Blaire0487072012-08-29 17:38:31 -0700569 for c in self.pipeline.getChangesInQueue():
James E. Blair0dc8ba92012-07-16 14:23:52 -0700570 if change.equals(c):
571 return True
572 return False
573
James E. Blaire0487072012-08-29 17:38:31 -0700574 def reportStart(self, change):
575 try:
576 self.log.info("Reporting start, action %s change %s" %
577 (self.start_action, change))
578 msg = "Starting %s jobs." % self.pipeline.name
Clark Boylan9b670902012-09-28 13:47:56 -0700579 if self.sched.config.has_option('zuul', 'status_url'):
580 msg += "\n" + self.sched.config.get('zuul', 'status_url')
James E. Blaire0487072012-08-29 17:38:31 -0700581 ret = self.sched.trigger.report(change, msg, self.start_action)
582 if ret:
583 self.log.error("Reporting change start %s received: %s" %
584 (change, ret))
585 except:
586 self.log.exception("Exception while reporting start:")
587
588 def isChangeReadyToBeEnqueued(self, change):
589 return True
590
591 def enqueueChangesAhead(self, change):
592 return True
593
594 def enqueueChangesBehind(self, change):
595 return True
596
James E. Blair2fa50962013-01-30 21:50:41 -0800597 def findOldVersionOfChangeAlreadyInQueue(self, change):
598 for c in self.pipeline.getChangesInQueue():
599 if change.isUpdateOf(c):
600 return c
601 return None
602
603 def removeOldVersionsOfChange(self, change):
604 if not self.pipeline.dequeue_on_new_patchset:
605 return
606 old_change = self.findOldVersionOfChangeAlreadyInQueue(change)
607 if old_change:
608 self.log.debug("Change %s is a new version of %s, removing %s" %
609 (change, old_change, old_change))
610 self.removeChange(old_change)
611 self.launchJobs()
612
James E. Blairee743612012-05-29 14:49:32 -0700613 def addChange(self, change):
James E. Blaire0487072012-08-29 17:38:31 -0700614 self.log.debug("Considering adding change %s" % change)
James E. Blair0dc8ba92012-07-16 14:23:52 -0700615 if self.isChangeAlreadyInQueue(change):
616 self.log.debug("Change %s is already in queue, ignoring" % change)
James E. Blaire0487072012-08-29 17:38:31 -0700617 return True
James E. Blair692c6b32012-07-17 11:16:35 -0700618
James E. Blaire0487072012-08-29 17:38:31 -0700619 if not self.isChangeReadyToBeEnqueued(change):
620 self.log.debug("Change %s is not ready to be enqueued, ignoring" %
621 change)
622 return False
623
624 if not self.enqueueChangesAhead(change):
625 self.log.debug("Failed to enqueue changes ahead of" % change)
626 return False
627
628 if self.isChangeAlreadyInQueue(change):
629 self.log.debug("Change %s is already in queue, ignoring" % change)
630 return True
631
632 change_queue = self.pipeline.getQueue(change.project)
633 if change_queue:
634 self.log.debug("Adding change %s to queue %s" %
635 (change, change_queue))
636 if self.start_action:
637 self.reportStart(change)
638 change_queue.enqueueChange(change)
James E. Blair8fa16972013-01-15 16:57:20 -0800639 self.reportStats(change)
James E. Blaire0487072012-08-29 17:38:31 -0700640 self.enqueueChangesBehind(change)
641 else:
642 self.log.error("Unable to find change queue for project %s" %
643 change.project)
644 return False
645 self.launchJobs()
James E. Blairee743612012-05-29 14:49:32 -0700646
James E. Blair2fa50962013-01-30 21:50:41 -0800647 def cancelJobs(self, change, prime=True):
648 self.log.debug("Cancel jobs for change %s" % change)
649 to_remove = []
650 for build, build_change in self.building_jobs.items():
651 if build_change == change:
652 self.log.debug("Found build %s for change %s to cancel" %
653 (build, change))
654 try:
655 self.sched.launcher.cancel(build)
656 except:
657 self.log.exception("Exception while canceling build %s "
658 "for change %s" % (build, change))
659 to_remove.append(build)
660 for build in to_remove:
661 self.log.debug("Removing build %s from running builds" % build)
662 build.result = 'CANCELED'
663 del self.building_jobs[build]
664
665 def dequeueChange(self, change):
666 self.log.debug("Removing change %s from queue" % change)
667 change_queue = self.pipeline.getQueue(change.project)
668 change_queue.dequeueChange(change)
669
670 def removeChange(self, change):
671 # Remove a change from the queue, probably because it has been
672 # superceded by another change.
673 self.log.debug("Canceling builds behind change: %s because it is "
674 "being removed." % change)
675 self.cancelJobs(change)
676 self.dequeueChange(change)
677
James E. Blairdaabed22012-08-15 15:38:57 -0700678 def _launchJobs(self, change, jobs):
James E. Blairee743612012-05-29 14:49:32 -0700679 self.log.debug("Launching jobs for change %s" % change)
James E. Blair81515ad2012-10-01 18:29:08 -0700680 ref = change.current_build_set.ref
James E. Blairdaabed22012-08-15 15:38:57 -0700681 if hasattr(change, 'refspec') and not ref:
James E. Blair4886cc12012-07-18 15:39:41 -0700682 change.current_build_set.setConfiguration()
James E. Blair81515ad2012-10-01 18:29:08 -0700683 ref = change.current_build_set.ref
Zhongyue Luoaa85ebf2012-09-21 16:38:33 +0800684 mode = model.MERGE_IF_NECESSARY
James E. Blair81515ad2012-10-01 18:29:08 -0700685 commit = self.sched.merger.mergeChanges([change], ref, mode=mode)
686 if not commit:
James E. Blair973721f2012-08-15 10:19:43 -0700687 self.log.info("Unable to merge change %s" % change)
688 self.pipeline.setUnableToMerge(change)
689 self.possiblyReportChange(change)
690 return
James E. Blair81515ad2012-10-01 18:29:08 -0700691 change.current_build_set.commit = commit
James E. Blair4aea70c2012-07-26 14:23:24 -0700692 for job in self.pipeline.findJobsToRun(change):
James E. Blairee743612012-05-29 14:49:32 -0700693 self.log.debug("Found job %s for change %s" % (job, change))
694 try:
James E. Blair03b94ef2012-08-20 10:54:29 -0700695 build = self.sched.launcher.launch(job, change, self.pipeline)
James E. Blairee743612012-05-29 14:49:32 -0700696 self.building_jobs[build] = change
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800697 self.log.debug("Adding build %s of job %s to change %s" %
698 (build, job, change))
James E. Blairee743612012-05-29 14:49:32 -0700699 change.addBuild(build)
700 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800701 self.log.exception("Exception while launching job %s "
702 "for change %s:" % (job, change))
James E. Blairee743612012-05-29 14:49:32 -0700703
James E. Blaire0487072012-08-29 17:38:31 -0700704 def launchJobs(self, change=None):
705 if not change:
706 for change in self.pipeline.getAllChanges():
707 self.launchJobs(change)
708 return
James E. Blairdaabed22012-08-15 15:38:57 -0700709 jobs = self.pipeline.findJobsToRun(change)
710 if jobs:
711 self._launchJobs(change, jobs)
712
James E. Blair11700c32012-07-05 17:50:05 -0700713 def updateBuildDescriptions(self, build_set):
714 for build in build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -0700715 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -0700716 self.sched.launcher.setBuildDescription(build, desc)
717
718 if build_set.previous_build_set:
719 for build in build_set.previous_build_set.getBuilds():
James E. Blair8b0d4c42012-08-23 16:03:05 -0700720 desc = self.formatDescription(build)
James E. Blair11700c32012-07-05 17:50:05 -0700721 self.sched.launcher.setBuildDescription(build, desc)
722
723 def onBuildStarted(self, build):
724 self.log.debug("Build %s started" % build)
725 if build not in self.building_jobs:
726 self.log.debug("Build %s not found" % (build))
727 # Or triggered externally, or triggered before zuul started,
728 # or restarted
729 return False
730
731 self.updateBuildDescriptions(build.build_set)
732 return True
733
James E. Blaire0487072012-08-29 17:38:31 -0700734 def handleFailedChange(self, change):
735 pass
736
James E. Blairee743612012-05-29 14:49:32 -0700737 def onBuildCompleted(self, build):
738 self.log.debug("Build %s completed" % build)
James E. Blair1e8dd892012-05-30 09:15:05 -0700739 if build not in self.building_jobs:
James E. Blairc84dd262012-05-31 10:03:13 -0700740 self.log.debug("Build %s not found" % (build))
James E. Blairee743612012-05-29 14:49:32 -0700741 # Or triggered externally, or triggered before zuul started,
742 # or restarted
743 return False
744 change = self.building_jobs[build]
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800745 self.log.debug("Found change %s which triggered completed build %s" %
746 (change, build))
James E. Blairee743612012-05-29 14:49:32 -0700747
748 del self.building_jobs[build]
749
James E. Blair4aea70c2012-07-26 14:23:24 -0700750 self.pipeline.setResult(change, build)
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800751 self.log.info("Change %s status is now:\n %s" %
James E. Blaire0487072012-08-29 17:38:31 -0700752 (change, self.pipeline.formatStatus(change)))
James E. Blairee743612012-05-29 14:49:32 -0700753
James E. Blaire0487072012-08-29 17:38:31 -0700754 if self.pipeline.didAnyJobFail(change):
755 self.handleFailedChange(change)
James E. Blair11700c32012-07-05 17:50:05 -0700756
James E. Blaire0487072012-08-29 17:38:31 -0700757 self.reportChanges()
758 self.launchJobs()
James E. Blair11700c32012-07-05 17:50:05 -0700759 self.updateBuildDescriptions(build.build_set)
James E. Blairee743612012-05-29 14:49:32 -0700760 return True
761
James E. Blaire0487072012-08-29 17:38:31 -0700762 def reportChanges(self):
763 self.log.debug("Searching for changes to report")
764 reported = False
765 for change in self.pipeline.getAllChanges():
766 self.log.debug(" checking %s" % change)
767 if self.pipeline.areAllJobsComplete(change):
768 self.log.debug(" possibly reporting %s" % change)
769 if self.possiblyReportChange(change):
770 reported = True
771 if reported:
772 self.reportChanges()
773 self.log.debug("Done searching for changes to report")
774
James E. Blairee743612012-05-29 14:49:32 -0700775 def possiblyReportChange(self, change):
776 self.log.debug("Possibly reporting change %s" % change)
James E. Blaire0487072012-08-29 17:38:31 -0700777 # Even if a change isn't reportable, keep going so that it
778 # gets dequeued in the normal manner.
779 if change.is_reportable and change.reported:
780 self.log.debug("Change %s already reported" % change)
781 return False
782 change_ahead = change.change_ahead
783 if not change_ahead:
784 self.log.debug("Change %s is at the front of the queue, "
785 "reporting" % (change))
786 ret = self.reportChange(change)
787 if self.changes_merge:
788 succeeded = self.pipeline.didAllJobsSucceed(change)
789 merged = (not ret)
790 if merged:
791 merged = self.sched.trigger.isMerged(change, change.branch)
792 self.log.info("Reported change %s status: all-succeeded: %s, "
793 "merged: %s" % (change, succeeded, merged))
794 if not (succeeded and merged):
795 self.log.debug("Reported change %s failed tests or failed "
796 "to merge" % (change))
797 self.handleFailedChange(change)
798 return True
799 self.log.debug("Removing reported change %s from queue" %
800 change)
801 change_queue = self.pipeline.getQueue(change.project)
802 change_queue.dequeueChange(change)
James E. Blair8fa16972013-01-15 16:57:20 -0800803 self.reportStats(change)
James E. Blaire0487072012-08-29 17:38:31 -0700804 return True
James E. Blairee743612012-05-29 14:49:32 -0700805
806 def reportChange(self, change):
James E. Blair4aea70c2012-07-26 14:23:24 -0700807 if not change.is_reportable:
808 return False
James E. Blair6173fb32012-07-11 17:23:33 -0700809 if change.reported:
James E. Blairb0fcae42012-07-17 11:12:10 -0700810 return 0
James E. Blair4aea70c2012-07-26 14:23:24 -0700811 self.log.debug("Reporting change %s" % change)
James E. Blairee743612012-05-29 14:49:32 -0700812 ret = None
James E. Blair4aea70c2012-07-26 14:23:24 -0700813 if self.pipeline.didAllJobsSucceed(change):
James E. Blairee743612012-05-29 14:49:32 -0700814 action = self.success_action
James E. Blair11700c32012-07-05 17:50:05 -0700815 change.setReportedResult('SUCCESS')
James E. Blairee743612012-05-29 14:49:32 -0700816 else:
817 action = self.failure_action
James E. Blair11700c32012-07-05 17:50:05 -0700818 change.setReportedResult('FAILURE')
James E. Blair8b0d4c42012-08-23 16:03:05 -0700819 report = self.formatReport(change)
James E. Blaire0487072012-08-29 17:38:31 -0700820 change.reported = True
James E. Blairee743612012-05-29 14:49:32 -0700821 try:
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800822 self.log.info("Reporting change %s, action: %s" %
823 (change, action))
James E. Blair8b0d4c42012-08-23 16:03:05 -0700824 ret = self.sched.trigger.report(change, report, action)
James E. Blairee743612012-05-29 14:49:32 -0700825 if ret:
Zhongyue Luo1c860d72012-07-19 11:03:56 +0800826 self.log.error("Reporting change %s received: %s" %
827 (change, ret))
James E. Blairee743612012-05-29 14:49:32 -0700828 except:
829 self.log.exception("Exception while reporting:")
James E. Blair11700c32012-07-05 17:50:05 -0700830 change.setReportedResult('ERROR')
831 self.updateBuildDescriptions(change.current_build_set)
James E. Blairee743612012-05-29 14:49:32 -0700832 return ret
833
James E. Blair8b0d4c42012-08-23 16:03:05 -0700834 def formatReport(self, changeish):
835 ret = ''
836 if self.pipeline.didAllJobsSucceed(changeish):
James E. Blair56370192013-01-14 15:47:28 -0800837 ret += self.pipeline.success_message + '\n\n'
James E. Blair8b0d4c42012-08-23 16:03:05 -0700838 else:
James E. Blair56370192013-01-14 15:47:28 -0800839 ret += self.pipeline.failure_message + '\n\n'
James E. Blair8b0d4c42012-08-23 16:03:05 -0700840
841 if changeish.dequeued_needing_change:
842 ret += "This change depends on a change that failed to merge."
843 elif changeish.current_build_set.unable_to_merge:
844 ret += "This change was unable to be automatically merged "\
845 "with the current state of the repository. Please "\
846 "rebase your change and upload a new patchset."
847 else:
James E. Blaira35fcce2012-08-24 10:46:01 -0700848 if self.sched.config.has_option('zuul', 'url_pattern'):
James E. Blair6aea36d2012-12-17 13:03:24 -0800849 url_pattern = self.sched.config.get('zuul', 'url_pattern')
James E. Blaira35fcce2012-08-24 10:46:01 -0700850 else:
James E. Blair6aea36d2012-12-17 13:03:24 -0800851 url_pattern = None
James E. Blair8b0d4c42012-08-23 16:03:05 -0700852 for job in self.pipeline.getJobs(changeish):
853 build = changeish.current_build_set.getBuild(job.name)
854 result = build.result
James E. Blair6aea36d2012-12-17 13:03:24 -0800855 pattern = url_pattern
856 if result == 'SUCCESS':
857 if job.success_message:
858 result = job.success_message
859 if job.success_pattern:
860 pattern = job.success_pattern
861 elif result == 'FAILURE':
862 if job.failure_message:
863 result = job.failure_message
864 if job.failure_pattern:
865 pattern = job.failure_pattern
James E. Blair7ee88a22012-09-12 18:59:31 +0200866 url = None
James E. Blair8b0d4c42012-08-23 16:03:05 -0700867 if build.url:
868 if pattern:
869 url = pattern.format(change=changeish,
870 pipeline=self.pipeline,
871 job=job,
872 build=build)
873 else:
874 url = build.url
875 if not url:
876 url = job.name
877 if not job.voting:
878 voting = ' (non-voting)'
879 else:
880 voting = ''
881 ret += '- %s : %s%s\n' % (url, result, voting)
882 return ret
883
884 def formatDescription(self, build):
885 concurrent_changes = ''
886 concurrent_builds = ''
887 other_builds = ''
888
889 for change in build.build_set.other_changes:
890 concurrent_changes += '<li><a href="{change.url}">\
891 {change.number},{change.patchset}</a></li>'.format(
892 change=change)
893
894 change = build.build_set.change
895
896 for build in build.build_set.getBuilds():
897 if build.base_url:
898 concurrent_builds += """\
899<li>
900 <a href="{build.base_url}">
901 {build.job.name} #{build.number}</a>: {build.result}
902</li>
903""".format(build=build)
904 else:
905 concurrent_builds += """\
906<li>
907 {build.job.name}: {build.result}
908</li>""".format(build=build)
909
910 if build.build_set.previous_build_set:
911 other_build = build.build_set.previous_build_set.getBuild(
912 build.job.name)
913 if other_build:
914 other_builds += """\
915<li>
916 Preceded by: <a href="{build.base_url}">
917 {build.job.name} #{build.number}</a>
918</li>
919""".format(build=other_build)
920
921 if build.build_set.next_build_set:
922 other_build = build.build_set.next_build_set.getBuild(
923 build.job.name)
924 if other_build:
925 other_builds += """\
926<li>
927 Succeeded by: <a href="{build.base_url}">
928 {build.job.name} #{build.number}</a>
929</li>
930""".format(build=other_build)
931
932 result = build.build_set.result
933
934 if hasattr(change, 'number'):
935 ret = """\
936<p>
937 Triggered by change:
938 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
939 Branch: <b>{change.branch}</b><br/>
940 Pipeline: <b>{self.pipeline.name}</b>
941</p>"""
942 else:
943 ret = """\
944<p>
945 Triggered by reference:
946 {change.ref}</a><br/>
947 Old revision: <b>{change.oldrev}</b><br/>
948 New revision: <b>{change.newrev}</b><br/>
949 Pipeline: <b>{self.pipeline.name}</b>
950</p>"""
951
952 if concurrent_changes:
953 ret += """\
954<p>
955 Other changes tested concurrently with this change:
956 <ul>{concurrent_changes}</ul>
957</p>
958"""
959 if concurrent_builds:
960 ret += """\
961<p>
962 All builds for this change set:
963 <ul>{concurrent_builds}</ul>
964</p>
965"""
966
967 if other_builds:
968 ret += """\
969<p>
970 Other build sets for this change:
971 <ul>{other_builds}</ul>
972</p>
973"""
974 if result:
975 ret += """\
976<p>
977 Reported result: <b>{result}</b>
978</p>
979"""
980
981 ret = ret.format(**locals())
James E. Blair268d9342012-06-13 18:24:29 -0700982 return ret
983
James E. Blair8fa16972013-01-15 16:57:20 -0800984 def reportStats(self, change):
985 if not statsd:
986 return
987 try:
988 # Update the guage on enqueue and dequeue, but timers only
989 # when dequeing.
990 if change.dequeue_time:
991 dt = int((change.dequeue_time - change.enqueue_time) * 1000)
992 else:
993 dt = None
994 changes = len(self.pipeline.getAllChanges())
995
996 # stats.timers.zuul.pipeline.NAME.resident_time
997 # stats_counts.zuul.pipeline.NAME.total_changes
998 # stats.gauges.zuul.pipeline.NAME.current_changes
999 key = 'zuul.pipeline.%s' % self.pipeline.name
1000 statsd.gauge(key + '.current_changes', changes)
1001 if dt:
1002 statsd.timing(key + '.resident_time', dt)
1003 statsd.incr(key + '.total_changes')
1004
1005 # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
1006 # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
1007 project_name = change.project.name.replace('/', '.')
1008 key += '.%s' % project_name
1009 if dt:
1010 statsd.timing(key + '.resident_time', dt)
1011 statsd.incr(key + '.total_changes')
1012 except:
1013 self.log.exception("Exception reporting pipeline stats")
1014
James E. Blair1e8dd892012-05-30 09:15:05 -07001015
James E. Blair4aea70c2012-07-26 14:23:24 -07001016class IndependentPipelineManager(BasePipelineManager):
1017 log = logging.getLogger("zuul.IndependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001018 changes_merge = False
1019
1020 def _postConfig(self):
1021 super(IndependentPipelineManager, self)._postConfig()
1022
1023 change_queue = ChangeQueue(self.pipeline, dependent=False)
1024 for project in self.pipeline.getProjects():
1025 change_queue.addProject(project)
1026
1027 self.pipeline.addQueue(change_queue)
James E. Blairee743612012-05-29 14:49:32 -07001028
James E. Blair1e8dd892012-05-30 09:15:05 -07001029
James E. Blair4aea70c2012-07-26 14:23:24 -07001030class DependentPipelineManager(BasePipelineManager):
1031 log = logging.getLogger("zuul.DependentPipelineManager")
James E. Blaire0487072012-08-29 17:38:31 -07001032 changes_merge = True
James E. Blairee743612012-05-29 14:49:32 -07001033
1034 def __init__(self, *args, **kwargs):
James E. Blair4aea70c2012-07-26 14:23:24 -07001035 super(DependentPipelineManager, self).__init__(*args, **kwargs)
James E. Blairee743612012-05-29 14:49:32 -07001036
1037 def _postConfig(self):
James E. Blair4aea70c2012-07-26 14:23:24 -07001038 super(DependentPipelineManager, self)._postConfig()
James E. Blairee743612012-05-29 14:49:32 -07001039 self.buildChangeQueues()
1040
1041 def buildChangeQueues(self):
1042 self.log.debug("Building shared change queues")
1043 change_queues = []
1044
James E. Blair4aea70c2012-07-26 14:23:24 -07001045 for project in self.pipeline.getProjects():
1046 change_queue = ChangeQueue(self.pipeline)
1047 change_queue.addProject(project)
1048 change_queues.append(change_queue)
1049 self.log.debug("Created queue: %s" % change_queue)
James E. Blairee743612012-05-29 14:49:32 -07001050
1051 self.log.debug("Combining shared queues")
1052 new_change_queues = []
1053 for a in change_queues:
1054 merged_a = False
1055 for b in new_change_queues:
1056 if not a.getJobs().isdisjoint(b.getJobs()):
1057 self.log.debug("Merging queue %s into %s" % (a, b))
1058 b.mergeChangeQueue(a)
1059 merged_a = True
1060 break # this breaks out of 'for b' and continues 'for a'
1061 if not merged_a:
1062 self.log.debug("Keeping queue %s" % (a))
1063 new_change_queues.append(a)
James E. Blair1e8dd892012-05-30 09:15:05 -07001064
James E. Blairee743612012-05-29 14:49:32 -07001065 self.log.info(" Shared change queues:")
James E. Blaire0487072012-08-29 17:38:31 -07001066 for queue in new_change_queues:
1067 self.pipeline.addQueue(queue)
1068 self.log.info(" %s" % queue)
James E. Blairee743612012-05-29 14:49:32 -07001069
James E. Blaire0487072012-08-29 17:38:31 -07001070 def isChangeReadyToBeEnqueued(self, change):
1071 if not self.sched.trigger.canMerge(change,
1072 self.getSubmitAllowNeeds()):
1073 self.log.debug("Change %s can not merge, ignoring" % change)
1074 return False
1075 return True
James E. Blair1e8dd892012-05-30 09:15:05 -07001076
James E. Blaire0487072012-08-29 17:38:31 -07001077 def enqueueChangesBehind(self, change):
1078 to_enqueue = []
1079 self.log.debug("Checking for changes needing %s:" % change)
1080 if not hasattr(change, 'needed_by_changes'):
1081 self.log.debug(" Changeish does not support dependencies")
1082 return
1083 for needs in change.needed_by_changes:
1084 if self.sched.trigger.canMerge(needs,
1085 self.getSubmitAllowNeeds()):
1086 self.log.debug(" Change %s needs %s and is ready to merge" %
1087 (needs, change))
1088 to_enqueue.append(needs)
1089 if not to_enqueue:
1090 self.log.debug(" No changes need %s" % change)
1091
1092 for other_change in to_enqueue:
1093 self.addChange(other_change)
1094
1095 def enqueueChangesAhead(self, change):
1096 ret = self.checkForChangesNeededBy(change)
1097 if ret in [True, False]:
1098 return ret
1099 self.log.debug(" Change %s must be merged ahead of %s" %
1100 (ret, change))
1101 return self.addChange(ret)
1102
1103 def checkForChangesNeededBy(self, change):
James E. Blaire421a232012-07-25 16:59:21 -07001104 self.log.debug("Checking for changes needed by %s:" % change)
1105 # Return true if okay to proceed enqueing this change,
1106 # false if the change should not be enqueued.
James E. Blair4aea70c2012-07-26 14:23:24 -07001107 if not hasattr(change, 'needs_change'):
1108 self.log.debug(" Changeish does not support dependencies")
1109 return True
James E. Blaire421a232012-07-25 16:59:21 -07001110 if not change.needs_change:
1111 self.log.debug(" No changes needed")
1112 return True
1113 if change.needs_change.is_merged:
1114 self.log.debug(" Needed change is merged")
1115 return True
1116 if not change.needs_change.is_current_patchset:
1117 self.log.debug(" Needed change is not the current patchset")
1118 return False
James E. Blair127bc182012-08-28 15:55:15 -07001119 if self.isChangeAlreadyInQueue(change.needs_change):
James E. Blaire421a232012-07-25 16:59:21 -07001120 self.log.debug(" Needed change is already ahead in the queue")
1121 return True
James E. Blaire0487072012-08-29 17:38:31 -07001122 if self.sched.trigger.canMerge(change.needs_change,
1123 self.getSubmitAllowNeeds()):
1124 self.log.debug(" Change %s is needed" %
1125 change.needs_change)
1126 return change.needs_change
James E. Blaire421a232012-07-25 16:59:21 -07001127 # The needed change can't be merged.
1128 self.log.debug(" Change %s is needed but can not be merged" %
1129 change.needs_change)
1130 return False
1131
James E. Blairee743612012-05-29 14:49:32 -07001132 def _getDependentChanges(self, change):
James E. Blair9f9667e2012-06-12 17:51:08 -07001133 orig_change = change
James E. Blairee743612012-05-29 14:49:32 -07001134 changes = []
1135 while change.change_ahead:
1136 changes.append(change.change_ahead)
1137 change = change.change_ahead
James E. Blair9f9667e2012-06-12 17:51:08 -07001138 self.log.info("Change %s depends on changes %s" % (orig_change,
1139 changes))
James E. Blairee743612012-05-29 14:49:32 -07001140 return changes
1141
Clark Boylanc2592322013-02-20 17:12:28 -08001142 def _unableToMerge(self, change, all_changes):
1143 self.log.info("Unable to merge changes %s" % all_changes)
1144 self.pipeline.setUnableToMerge(change)
1145 self.possiblyReportChange(change)
1146
James E. Blairdaabed22012-08-15 15:38:57 -07001147 def _launchJobs(self, change, jobs):
James E. Blairee743612012-05-29 14:49:32 -07001148 self.log.debug("Launching jobs for change %s" % change)
James E. Blair81515ad2012-10-01 18:29:08 -07001149 ref = change.current_build_set.ref
James E. Blairdaabed22012-08-15 15:38:57 -07001150 if hasattr(change, 'refspec') and not ref:
James E. Blair4886cc12012-07-18 15:39:41 -07001151 change.current_build_set.setConfiguration()
James E. Blair81515ad2012-10-01 18:29:08 -07001152 ref = change.current_build_set.ref
James E. Blair4886cc12012-07-18 15:39:41 -07001153 dependent_changes = self._getDependentChanges(change)
1154 dependent_changes.reverse()
James E. Blair973721f2012-08-15 10:19:43 -07001155 all_changes = dependent_changes + [change]
Clark Boylanc2592322013-02-20 17:12:28 -08001156 if (dependent_changes and
1157 not dependent_changes[-1].current_build_set.commit):
1158 self._unableToMerge(change, all_changes)
James E. Blair973721f2012-08-15 10:19:43 -07001159 return
Clark Boylanc2592322013-02-20 17:12:28 -08001160 commit = self.sched.merger.mergeChanges(all_changes, ref)
James E. Blair81515ad2012-10-01 18:29:08 -07001161 change.current_build_set.commit = commit
Clark Boylanc2592322013-02-20 17:12:28 -08001162 if not commit:
1163 self._unableToMerge(change, all_changes)
1164 return
James E. Blair4886cc12012-07-18 15:39:41 -07001165 #TODO: remove this line after GERRIT_CHANGES is gone
James E. Blairee743612012-05-29 14:49:32 -07001166 dependent_changes = self._getDependentChanges(change)
James E. Blairdaabed22012-08-15 15:38:57 -07001167 for job in jobs:
James E. Blairee743612012-05-29 14:49:32 -07001168 self.log.debug("Found job %s for change %s" % (job, change))
1169 try:
James E. Blair4886cc12012-07-18 15:39:41 -07001170 #TODO: remove dependent_changes after GERRIT_CHANGES is gone
James E. Blair03b94ef2012-08-20 10:54:29 -07001171 build = self.sched.launcher.launch(job, change, self.pipeline,
James E. Blairee743612012-05-29 14:49:32 -07001172 dependent_changes)
1173 self.building_jobs[build] = change
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001174 self.log.debug("Adding build %s of job %s to change %s" %
1175 (build, job, change))
James E. Blairee743612012-05-29 14:49:32 -07001176 change.addBuild(build)
1177 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001178 self.log.exception("Exception while launching job %s "
1179 "for change %s:" % (job, change))
James E. Blairdaabed22012-08-15 15:38:57 -07001180
Clark Boylan826ef9e2012-07-12 16:44:46 -07001181 def cancelJobs(self, change, prime=True):
James E. Blairee743612012-05-29 14:49:32 -07001182 self.log.debug("Cancel jobs for change %s" % change)
1183 to_remove = []
Clark Boylan826ef9e2012-07-12 16:44:46 -07001184 if prime:
1185 change.resetAllBuilds()
James E. Blairee743612012-05-29 14:49:32 -07001186 for build, build_change in self.building_jobs.items():
1187 if build_change == change:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001188 self.log.debug("Found build %s for change %s to cancel" %
1189 (build, change))
James E. Blairee743612012-05-29 14:49:32 -07001190 try:
1191 self.sched.launcher.cancel(build)
1192 except:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001193 self.log.exception("Exception while canceling build %s "
1194 "for change %s" % (build, change))
James E. Blairee743612012-05-29 14:49:32 -07001195 to_remove.append(build)
1196 for build in to_remove:
1197 self.log.debug("Removing build %s from running builds" % build)
James E. Blair11700c32012-07-05 17:50:05 -07001198 build.result = 'CANCELED'
James E. Blairee743612012-05-29 14:49:32 -07001199 del self.building_jobs[build]
1200 if change.change_behind:
Zhongyue Luo1c860d72012-07-19 11:03:56 +08001201 self.log.debug("Canceling jobs for change %s, behind change %s" %
1202 (change.change_behind, change))
Clark Boylan826ef9e2012-07-12 16:44:46 -07001203 self.cancelJobs(change.change_behind, prime=prime)
1204
James E. Blair2fa50962013-01-30 21:50:41 -08001205 def removeChange(self, change):
1206 # Remove a change from the queue (even the middle), probably
1207 # because it has been superceded by another change (or
1208 # otherwise will not merge).
1209 self.log.debug("Canceling builds behind change: %s because it is "
1210 "being removed." % change)
James E. Blair4b22c332013-02-26 14:36:58 -08001211 self.cancelJobs(change)
James E. Blair2fa50962013-01-30 21:50:41 -08001212 self.dequeueChange(change, keep_severed_heads=False)
1213
James E. Blaire0487072012-08-29 17:38:31 -07001214 def handleFailedChange(self, change):
1215 # A build failed. All changes behind this change will need to
1216 # be retested. To free up resources cancel the builds behind
1217 # this one as they will be rerun anyways.
1218 change_ahead = change.change_ahead
1219 change_behind = change.change_behind
1220 if not change_ahead:
1221 # If we're at the head of the queue, allow changes to relaunch
1222 if change_behind:
James E. Blairec590122012-08-22 15:19:31 -07001223 self.log.info("Canceling/relaunching jobs for change %s "
1224 "behind failed change %s" %
1225 (change_behind, change))
1226 self.cancelJobs(change_behind)
James E. Blaire0487072012-08-29 17:38:31 -07001227 self.dequeueChange(change)
Clark Boylanafd18ac2012-08-22 12:59:32 -07001228 elif change_behind:
James E. Blaire0487072012-08-29 17:38:31 -07001229 self.log.debug("Canceling builds behind change: %s due to "
1230 "failure." % change)
1231 self.cancelJobs(change_behind, prime=False)
James E. Blair268d9342012-06-13 18:24:29 -07001232
James E. Blair2fa50962013-01-30 21:50:41 -08001233 def dequeueChange(self, change, keep_severed_heads=True):
James E. Blaire0487072012-08-29 17:38:31 -07001234 self.log.debug("Removing change %s from queue" % change)
1235 change_ahead = change.change_ahead
1236 change_behind = change.change_behind
1237 change_queue = self.pipeline.getQueue(change.project)
1238 change_queue.dequeueChange(change)
James E. Blair2fa50962013-01-30 21:50:41 -08001239 if keep_severed_heads and not change_ahead and not change.reported:
James E. Blaire0487072012-08-29 17:38:31 -07001240 self.log.debug("Adding %s as a severed head" % change)
1241 change_queue.addSeveredHead(change)
1242 self.dequeueDependentChanges(change_behind)
1243
1244 def dequeueDependentChanges(self, change):
James E. Blaircaec0c52012-08-22 14:52:22 -07001245 # When a change is dequeued after failing, dequeue any changes that
1246 # depend on it.
James E. Blaircaec0c52012-08-22 14:52:22 -07001247 while change:
1248 change_behind = change.change_behind
James E. Blaire0487072012-08-29 17:38:31 -07001249 if self.checkForChangesNeededBy(change) is not True:
James E. Blaircaec0c52012-08-22 14:52:22 -07001250 # It's not okay to enqueue this change, we should remove it.
James E. Blaircaec0c52012-08-22 14:52:22 -07001251 self.log.info("Dequeuing change %s because "
1252 "it can no longer merge" % change)
James E. Blaire0487072012-08-29 17:38:31 -07001253 change_queue = self.pipeline.getQueue(change.project)
1254 change_queue.dequeueChange(change)
James E. Blaircaec0c52012-08-22 14:52:22 -07001255 self.pipeline.setDequeuedNeedingChange(change)
1256 self.reportChange(change)
1257 # We don't need to recurse, because any changes that might
1258 # be affected by the removal of this change are behind us
1259 # in the queue, so we can continue walking backwards.
1260 change = change_behind