blob: 8b6c20c8744f2fafbddf50167bc0d74ac5876437 [file] [log] [blame]
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Antoine "hashar" Musso
# Copyright 2013 Wikimedia Foundation Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Zhongyue Luo1c860d72012-07-19 11:03:56 +080023import Queue
24import re
25import threading
James E. Blair71e94122012-12-24 17:53:08 -080026import time
Zhongyue Luo1c860d72012-07-19 11:03:56 +080027import yaml
James E. Blairee743612012-05-29 14:49:32 -070028
James E. Blair47958382013-01-10 17:26:02 -080029import layoutvalidator
James E. Blair4886cc12012-07-18 15:39:41 -070030import model
Joshua Hesketh1879cf72013-08-19 14:13:15 +100031from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
James E. Blair4886cc12012-07-18 15:39:41 -070032import merger
James E. Blairee743612012-05-29 14:49:32 -070033
# Optional dependency: every use of this name in this module is guarded by
# a truthiness check ("if statsd:"), and the scheduler logs "Statsd disabled
# because python statsd package not found" when it is falsy.
statsd = extras.try_import('statsd.statsd')
35
James E. Blair1e8dd892012-05-30 09:15:05 -070036
def deep_format(obj, paramdict):
    """Apply the paramdict via str.format() to all string objects found within
       the supplied obj. Lists and dicts are traversed recursively.

       Strings are expanded with ``obj.format(**paramdict)``.  For lists a
       new list is returned with every element expanded.  For dicts a new
       dict is returned with both keys and values expanded; keys that do not
       provide a ``format`` method (ints, None, ...) are kept unchanged
       instead of raising AttributeError.  Any other object is returned
       as-is.

       Borrowed from Jenkins Job Builder project"""
    if isinstance(obj, str):
        return obj.format(**paramdict)
    if isinstance(obj, list):
        return [deep_format(item, paramdict) for item in obj]
    if isinstance(obj, dict):
        ret = {}
        for key, value in obj.items():
            # Anything exposing .format() is expanded, exactly as before;
            # non-string keys are passed through untouched.
            formatter = getattr(key, 'format', None)
            if formatter is not None:
                exp_key = formatter(**paramdict)
            else:
                exp_key = key
            ret[exp_key] = deep_format(value, paramdict)
        return ret
    return obj
57
58
class MergeFailure(Exception):
    """Exception subclass that adds no state or behaviour of its own.

    NOTE(review): presumably raised when a change cannot be merged; the
    raise sites are not in this part of the file -- confirm.
    """
61
62
class Scheduler(threading.Thread):
    """Daemon thread that drains trigger and build-result event queues.

    Other threads enqueue work through addEvent(), onBuildStarted() and
    onBuildCompleted(); each of them sets ``wake_event`` so that run()
    wakes up and hands the events to the pipeline managers of the current
    layout.
    """
    log = logging.getLogger("zuul.Scheduler")

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.wake_event = threading.Event()
        # Guards self.layout while it is being read (getProject) or
        # replaced (_doReconfigureEvent).
        self.layout_lock = threading.Lock()
        self.reconfigure_complete_event = threading.Event()
        self._pause = False
        self._reconfigure = False
        self._exit = False
        self._stopped = False
        self.launcher = None
        self.triggers = dict()
        self.reporters = dict()
        self.config = None
        self._maintain_trigger_cache = False

        self.trigger_event_queue = Queue.Queue()
        self.result_event_queue = Queue.Queue()
        self.layout = model.Layout()

    def stop(self):
        """Ask run() to return the next time it wakes up."""
        self._stopped = True
        self.wake_event.set()

    def testConfig(self, config_path):
        """Parse the layout at config_path and return the resulting layout."""
        return self._parseConfig(config_path)

    def _parseConfig(self, config_path):
        """Build and return a new model.Layout from a YAML layout file.

        :param config_path: path of the layout file ('~' is expanded).
        :returns: the populated layout (pipelines, jobs, projects).
        :raises Exception: if the file does not exist or if a job names a
            parameter-function that none of the included python files
            defines.  Whatever the layout validator raises propagates too.
        """
        layout = model.Layout()
        project_templates = {}

        def toList(item):
            # Normalise "missing", "scalar" and "list" config values to a
            # list.
            if not item:
                return []
            if isinstance(item, list):
                return item
            return [item]

        if config_path:
            config_path = os.path.expanduser(config_path)
            if not os.path.exists(config_path):
                raise Exception("Unable to read layout config file at %s" %
                                config_path)
        # NOTE(review): yaml.load() can construct arbitrary objects.  The
        # layout is assumed to be operator-controlled; consider
        # yaml.safe_load() if that is ever not the case.
        with open(config_path) as config_file:
            data = yaml.load(config_file)

        validator = layoutvalidator.LayoutValidator()
        validator.validate(data)

        # Namespace populated by the included python files; jobs look up
        # their parameter-function in it by name.
        config_env = {}
        for include in data.get('includes', []):
            if 'python-file' in include:
                fn = include['python-file']
                if not os.path.isabs(fn):
                    # Relative includes are resolved against the directory
                    # of the layout file.
                    base = os.path.dirname(config_path)
                    fn = os.path.join(base, fn)
                fn = os.path.expanduser(fn)
                execfile(fn, config_env)

        for conf_pipeline in data.get('pipelines', []):
            pipeline = Pipeline(conf_pipeline['name'])
            pipeline.description = conf_pipeline.get('description')
            precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
            pipeline.precedence = precedence
            pipeline.failure_message = conf_pipeline.get('failure-message',
                                                         "Build failed.")
            pipeline.success_message = conf_pipeline.get('success-message',
                                                         "Build succeeded.")
            pipeline.dequeue_on_new_patchset = conf_pipeline.get(
                'dequeue-on-new-patchset', True)

            action_reporters = {}
            for action in ['start', 'success', 'failure']:
                action_reporters[action] = []
                if conf_pipeline.get(action):
                    for reporter_name, params \
                            in conf_pipeline.get(action).items():
                        if reporter_name in self.reporters:
                            action_reporters[action].append(ActionReporter(
                                self.reporters[reporter_name], params))
                        else:
                            # Unknown reporters are logged and skipped.
                            self.log.error('Invalid reporter name %s' %
                                           reporter_name)
            pipeline.start_actions = action_reporters['start']
            pipeline.success_actions = action_reporters['success']
            pipeline.failure_actions = action_reporters['failure']

            # The manager class is looked up by name among this module's
            # globals.
            manager = globals()[conf_pipeline['manager']](self, pipeline)
            pipeline.setManager(manager)
            layout.pipelines[conf_pipeline['name']] = pipeline

            # TODO: move this into triggers (may require pluggable
            # configuration)
            if 'gerrit' in conf_pipeline['trigger']:
                pipeline.trigger = self.triggers['gerrit']
                for trigger in toList(conf_pipeline['trigger']['gerrit']):
                    approvals = {}
                    for approval_dict in toList(trigger.get('approval')):
                        approvals.update(approval_dict)
                    f = EventFilter(
                        types=toList(trigger['event']),
                        branches=toList(trigger.get('branch')),
                        refs=toList(trigger.get('ref')),
                        approvals=approvals,
                        comment_filters=toList(trigger.get('comment_filter')),
                        email_filters=toList(trigger.get('email_filter')))
                    manager.event_filters.append(f)
            elif 'timer' in conf_pipeline['trigger']:
                pipeline.trigger = self.triggers['timer']
                for trigger in toList(conf_pipeline['trigger']['timer']):
                    f = EventFilter(types=['timer'],
                                    timespecs=toList(trigger['time']))
                    manager.event_filters.append(f)

        for project_template in data.get('project-templates', []):
            # Make sure the template only contains valid pipelines
            tpl = dict(
                (pipe_name, project_template.get(pipe_name))
                for pipe_name in layout.pipelines.keys()
                if pipe_name in project_template
            )
            project_templates[project_template.get('name')] = tpl

        for config_job in data.get('jobs', []):
            job = layout.getJob(config_job['name'])
            # Be careful to only set attributes explicitly present on
            # this job, to avoid squashing attributes set by a meta-job.
            m = config_job.get('failure-message', None)
            if m:
                job.failure_message = m
            m = config_job.get('success-message', None)
            if m:
                job.success_message = m
            m = config_job.get('failure-pattern', None)
            if m:
                job.failure_pattern = m
            m = config_job.get('success-pattern', None)
            if m:
                job.success_pattern = m
            m = config_job.get('hold-following-changes', False)
            if m:
                job.hold_following_changes = True
            # "voting: false" is meaningful, so test against None here.
            m = config_job.get('voting', None)
            if m is not None:
                job.voting = m
            fname = config_job.get('parameter-function', None)
            if fname:
                func = config_env.get(fname, None)
                if not func:
                    raise Exception("Unable to find function %s" % fname)
                job.parameter_function = func
            branches = toList(config_job.get('branch'))
            if branches:
                job._branches = branches
                job.branches = [re.compile(x) for x in branches]
            files = toList(config_job.get('files'))
            if files:
                job._files = files
                job.files = [re.compile(x) for x in files]

        def add_jobs(job_tree, config_jobs):
            # Recursively mirror the nested list/dict/str job description
            # into job_tree.
            for job in config_jobs:
                if isinstance(job, list):
                    for x in job:
                        add_jobs(job_tree, x)
                if isinstance(job, dict):
                    for parent, children in job.items():
                        parent_tree = job_tree.addJob(layout.getJob(parent))
                        add_jobs(parent_tree, children)
                if isinstance(job, str):
                    job_tree.addJob(layout.getJob(job))

        for config_project in data.get('projects', []):
            project = Project(config_project['name'])

            for requested_template in config_project.get('template', []):
                # Fetch the template from 'project-templates'
                tpl = project_templates.get(
                    requested_template.get('name'))
                # Expand it with the project context
                expanded = deep_format(tpl, requested_template)
                # Finally merge the expansion with whatever has been already
                # defined for this project
                config_project.update(expanded)

            layout.projects[config_project['name']] = project
            mode = config_project.get('merge-mode', 'merge-resolve')
            project.merge_mode = model.MERGER_MAP[mode]
            for pipeline in layout.pipelines.values():
                if pipeline.name in config_project:
                    job_tree = pipeline.addProject(project)
                    config_jobs = config_project[pipeline.name]
                    add_jobs(job_tree, config_jobs)

        # All jobs should be defined at this point, get rid of
        # metajobs so that getJob isn't doing anything weird.
        layout.metajobs = []

        for pipeline in layout.pipelines.values():
            pipeline.manager._postConfig(layout)

        return layout

    def _setupMerger(self):
        """Create self.merger from the config and add the layout's projects.

        Options read: zuul/git_dir (default '/var/lib/zuul/git'),
        zuul/git_user_email, zuul/git_user_name (default None),
        zuul/push_change_refs (default False) and gerrit/sshkey
        (default None).
        """
        if self.config.has_option('zuul', 'git_dir'):
            merge_root = self.config.get('zuul', 'git_dir')
        else:
            merge_root = '/var/lib/zuul/git'

        if self.config.has_option('zuul', 'git_user_email'):
            merge_email = self.config.get('zuul', 'git_user_email')
        else:
            merge_email = None

        if self.config.has_option('zuul', 'git_user_name'):
            merge_name = self.config.get('zuul', 'git_user_name')
        else:
            merge_name = None

        if self.config.has_option('zuul', 'push_change_refs'):
            push_refs = self.config.getboolean('zuul', 'push_change_refs')
        else:
            push_refs = False

        if self.config.has_option('gerrit', 'sshkey'):
            sshkey = self.config.get('gerrit', 'sshkey')
        else:
            sshkey = None

        # TODO: The merger should have an upstream repo independent of
        # triggers, and then each trigger should provide a fetch
        # location.
        self.merger = merger.Merger(self.triggers['gerrit'],
                                    merge_root, push_refs,
                                    sshkey, merge_email, merge_name)
        for project in self.layout.projects.values():
            url = self.triggers['gerrit'].getGitUrl(project)
            self.merger.addProject(project, url)

    def setLauncher(self, launcher):
        """Remember the launcher object."""
        self.launcher = launcher

    def registerTrigger(self, trigger, name=None):
        """Register trigger under name (defaults to trigger.name)."""
        if name is None:
            name = trigger.name
        self.triggers[name] = trigger

    def registerReporter(self, reporter, name=None):
        """Register reporter under name (defaults to reporter.name)."""
        if name is None:
            name = reporter.name
        self.reporters[name] = reporter

    def getProject(self, name):
        """Return the current layout's project called name, or None.

        The lookup holds layout_lock so that it does not interleave with
        a reconfiguration.
        """
        with self.layout_lock:
            return self.layout.projects.get(name)

    def addEvent(self, event):
        """Queue a trigger event and wake the run loop."""
        self.log.debug("Adding trigger event: %s" % event)
        try:
            if statsd:
                statsd.incr('gerrit.event.%s' % event.type)
        except Exception:
            # Stats are best-effort; never lose the event because of them.
            self.log.exception("Exception reporting event stats")
        self.trigger_event_queue.put(event)
        self.wake_event.set()
        self.log.debug("Done adding trigger event: %s" % event)

    def onBuildStarted(self, build):
        """Stamp build.start_time and queue a 'started' result event."""
        self.log.debug("Adding start event for build: %s" % build)
        build.start_time = time.time()
        self.result_event_queue.put(('started', build))
        self.wake_event.set()
        self.log.debug("Done adding start event for build: %s" % build)

    def onBuildCompleted(self, build):
        """Stamp build.end_time, report stats, queue a 'completed' event."""
        self.log.debug("Adding complete event for build: %s" % build)
        build.end_time = time.time()
        try:
            if statsd and build.pipeline:
                # Dots in the job name are replaced so that they do not
                # add extra components to the dotted stats key.
                jobname = build.job.name.replace('.', '_')
                key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
                                                      jobname, build.result)
                if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                    # Duration in milliseconds.
                    dt = int((build.end_time - build.start_time) * 1000)
                    statsd.timing(key, dt)
                statsd.incr(key)
                key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
                statsd.incr(key)
        except Exception:
            # Stats are best-effort; the result event must still be queued.
            self.log.exception("Exception reporting runtime stats")
        self.result_event_queue.put(('completed', build))
        self.wake_event.set()
        self.log.debug("Done adding complete event for build: %s" % build)

    def reconfigure(self, config):
        """Store config, request a reconfiguration and block until done.

        The reconfiguration itself happens in the run loop
        (_doReconfigureEvent); this method waits on
        reconfigure_complete_event.
        NOTE(review): that event is only set when the reconfiguration
        succeeds, so a failing layout appears to leave the caller
        blocked -- confirm this is intended.
        """
        self.log.debug("Prepare to reconfigure")
        self.config = config
        self._reconfigure = True
        self.wake_event.set()
        self.log.debug("Waiting for reconfiguration")
        self.reconfigure_complete_event.wait()
        self.reconfigure_complete_event.clear()
        self.log.debug("Reconfiguration complete")

    def exit(self):
        """Set the pause and exit flags and wake the run loop.

        The process actually exits from _doPauseEvent once the run loop
        sees that all builds are complete.
        """
        self.log.debug("Prepare to exit")
        self._pause = True
        self._exit = True
        self.wake_event.set()
        self.log.debug("Waiting for exit")

    def _get_queue_pickle_file(self):
        """Return the path of queue.pickle inside the state directory.

        The directory is zuul/state_dir from the config ('~' expanded),
        or '/var/lib/zuul' when the option is absent.
        """
        if self.config.has_option('zuul', 'state_dir'):
            state_dir = os.path.expanduser(self.config.get('zuul',
                                                           'state_dir'))
        else:
            state_dir = '/var/lib/zuul'
        return os.path.join(state_dir, 'queue.pickle')

    def _save_queue(self):
        """Drain the trigger event queue and pickle it to the state file.

        Nothing is written when the queue is empty.
        """
        pickle_file = self._get_queue_pickle_file()
        events = []
        while not self.trigger_event_queue.empty():
            events.append(self.trigger_event_queue.get())
        self.log.debug("Queue length is %s" % len(events))
        if events:
            self.log.debug("Saving queue")
            # Close the file deterministically so the data is flushed
            # before the caller goes on (e.g. to os._exit()).
            with open(pickle_file, 'wb') as f:
                pickle.dump(events, f)

    def _load_queue(self):
        """Re-queue the trigger events stored in the state file, if any."""
        pickle_file = self._get_queue_pickle_file()
        if os.path.exists(pickle_file):
            self.log.debug("Loading queue")
            # NOTE(review): pickle.load() executes whatever the file
            # describes; the state directory is assumed to be trusted.
            with open(pickle_file, 'rb') as f:
                events = pickle.load(f)
            self.log.debug("Queue length is %s" % len(events))
            for event in events:
                self.trigger_event_queue.put(event)
        else:
            self.log.debug("No queue file found")

    def _delete_queue(self):
        """Remove the saved queue file if it exists."""
        pickle_file = self._get_queue_pickle_file()
        if os.path.exists(pickle_file):
            self.log.debug("Deleting saved queue")
            os.unlink(pickle_file)

    def resume(self):
        """Reload and then delete any saved queue, and wake the run loop.

        Failures to load or delete the saved queue are logged, not raised.
        """
        try:
            self._load_queue()
        except Exception:
            self.log.exception("Unable to load queue")
        try:
            self._delete_queue()
        except Exception:
            self.log.exception("Unable to delete saved queue")
        self.log.debug("Resuming queue processing")
        self.wake_event.set()

    def _doPauseEvent(self):
        """When an exit was requested, save the queue and end the process."""
        if self._exit:
            self.log.debug("Exiting")
            self._save_queue()
            os._exit(0)

    def _doReconfigureEvent(self):
        """Parse the new layout and move queued items over to it.

        This is called in the scheduler loop after another thread sets
        the reconfigure flag.
        """
        with self.layout_lock:
            self.log.debug("Performing reconfiguration")
            layout = self._parseConfig(
                self.config.get('zuul', 'layout_config'))
            for name, new_pipeline in layout.pipelines.items():
                old_pipeline = self.layout.pipelines.get(name)
                if not old_pipeline:
                    if self.layout.pipelines:
                        # Don't emit this warning on startup
                        self.log.warning("No old pipeline matching %s found "
                                         "when reconfiguring" % name)
                    continue
                self.log.debug("Re-enqueueing changes for pipeline %s" %
                               name)
                items_to_remove = []
                for shared_queue in old_pipeline.queues:
                    for item in shared_queue.queue:
                        # Detach the item from the old pipeline before it
                        # is offered to the new one.
                        item.item_ahead = None
                        item.items_behind = []
                        item.pipeline = None
                        project = layout.projects.get(item.change.project.name)
                        if not project:
                            self.log.warning("Unable to find project for "
                                             "change %s while reenqueueing" %
                                             item.change)
                            item.change.project = None
                            items_to_remove.append(item)
                            continue
                        item.change.project = project
                        if not new_pipeline.manager.reEnqueueItem(item):
                            items_to_remove.append(item)
                # Collect first, delete afterwards: the dict must not be
                # modified while it is being iterated.
                builds_to_remove = []
                for build, item in old_pipeline.manager.building_jobs.items():
                    if item in items_to_remove:
                        builds_to_remove.append(build)
                        self.log.warning("Deleting running build %s for "
                                         "change %s while reenqueueing" % (
                                             build, item.change))
                for build in builds_to_remove:
                    del old_pipeline.manager.building_jobs[build]
                new_pipeline.manager.building_jobs = \
                    old_pipeline.manager.building_jobs
            self.layout = layout
            self._setupMerger()
            for trigger in self.triggers.values():
                trigger.postConfig()
            if statsd:
                try:
                    for pipeline in self.layout.pipelines.values():
                        items = len(pipeline.getAllItems())
                        # stats.gauges.zuul.pipeline.NAME.current_changes
                        key = 'zuul.pipeline.%s' % pipeline.name
                        statsd.gauge(key + '.current_changes', items)
                except Exception:
                    self.log.exception("Exception reporting initial "
                                       "pipeline stats:")
            self._reconfigure = False
            self.reconfigure_complete_event.set()

    def _areAllBuildsComplete(self):
        """Return True when no pipeline manager has a build in progress.

        Every outstanding build is logged at debug level.
        """
        self.log.debug("Checking if all builds are complete")
        waiting = False
        for pipeline in self.layout.pipelines.values():
            for build in pipeline.manager.building_jobs.keys():
                self.log.debug("%s waiting on %s" % (pipeline.manager, build))
                waiting = True
        if not waiting:
            self.log.debug("All builds are complete")
            return True
        self.log.debug("All builds are not complete")
        return False

    def run(self):
        """Main loop: sleep on wake_event, then process one event per pass.

        Returns once stop() has been called.  Exceptions raised while
        handling a pass are logged and the loop carries on.
        """
        if statsd:
            self.log.debug("Statsd enabled")
        else:
            self.log.debug("Statsd disabled because python statsd "
                           "package not found")
        while True:
            self.log.debug("Run handler sleeping")
            self.wake_event.wait()
            self.wake_event.clear()
            if self._stopped:
                return
            self.log.debug("Run handler awake")
            try:
                if self._reconfigure:
                    self._doReconfigureEvent()

                # Give result events priority -- they let us stop builds,
                # whereas trigger events cause us to launch builds.
                if not self.result_event_queue.empty():
                    self.process_result_queue()
                elif not self._pause:
                    if not self.trigger_event_queue.empty():
                        self.process_event_queue()

                if self._pause and self._areAllBuildsComplete():
                    self._doPauseEvent()

                # Only one event is handled per pass, so re-arm the wake
                # event while there is still work that may be processed.
                if not self._pause:
                    if not (self.trigger_event_queue.empty() and
                            self.result_event_queue.empty()):
                        self.wake_event.set()
                else:
                    if not self.result_event_queue.empty():
                        self.wake_event.set()

                if self._maintain_trigger_cache:
                    self.maintainTriggerCache()
                    self._maintain_trigger_cache = False

            except Exception:
                self.log.exception("Exception in run handler:")

    def maintainTriggerCache(self):
        """Pass the set of still-relevant changes to every trigger.

        The set holds the change of every item in every pipeline plus
        the changes returned by its getRelatedChanges().
        """
        relevant = set()
        for pipeline in self.layout.pipelines.values():
            self.log.debug("Start maintain trigger cache for: %s" % pipeline)
            for item in pipeline.getAllItems():
                relevant.add(item.change)
                relevant.update(item.change.getRelatedChanges())
            self.log.debug("End maintain trigger cache for: %s" % pipeline)
        self.log.debug("Trigger cache size: %s" % len(relevant))
        for trigger in self.triggers.values():
            trigger.maintainCache(relevant)

    def process_event_queue(self):
        """Take one trigger event and offer it to every pipeline.

        The queue's task_done() is called even when processing raises.
        """
        self.log.debug("Fetching trigger event")
        event = self.trigger_event_queue.get()
        try:
            self.log.debug("Processing trigger event %s" % event)
            project = self.layout.projects.get(event.project_name)
            if not project:
                self.log.warning("Project %s not found" % event.project_name)
                return

            # Preprocessing for ref-update events
            if event.ref:
                # Make sure the local git repo is up-to-date with the remote
                # one.  We better have the new ref before enqueuing the
                # changes.  This is done before enqueuing the changes to
                # avoid calling an update per pipeline accepting the change.
                self.log.info("Fetching references for %s" % project)
                self.merger.updateRepo(project)

            for pipeline in self.layout.pipelines.values():
                change = event.getChange(project,
                                         self.triggers.get(event.trigger_name))
                if event.type == 'patchset-created':
                    pipeline.manager.removeOldVersionsOfChange(change)
                if pipeline.manager.eventMatches(event):
                    self.log.info("Adding %s, %s to %s" %
                                  (project, change, pipeline))
                    pipeline.manager.addChange(change)
                while pipeline.manager.processQueue():
                    pass
        finally:
            self.trigger_event_queue.task_done()

    def process_result_queue(self):
        """Take one build result event and hand it to the pipelines.

        Pipelines are tried in turn until one manager reports (by a true
        return value) that it handled the build; a warning is logged if
        none does.  The queue's task_done() is called even when a
        manager raises.
        """
        self.log.debug("Fetching result event")
        event_type, build = self.result_event_queue.get()
        try:
            self.log.debug("Processing result event %s" % build)
            for pipeline in self.layout.pipelines.values():
                if event_type == 'started':
                    if pipeline.manager.onBuildStarted(build):
                        return
                elif event_type == 'completed':
                    if pipeline.manager.onBuildCompleted(build):
                        return
            self.log.warning("Build %s not found by any queue manager" %
                             (build))
        finally:
            self.result_event_queue.task_done()

    def formatStatusHTML(self):
        """Return an HTML page with the status of every pipeline."""
        ret = '<html><pre>'
        if self._pause:
            ret += '<p><b>Queue only mode:</b> preparing to '
            if self._exit:
                ret += 'exit'
            ret += ', queue length: %s' % self.trigger_event_queue.qsize()
            ret += '</p>'

        for pipeline in self.layout.pipelines.values():
            s = 'Pipeline: %s' % pipeline.name
            ret += s + '\n'
            ret += '-' * len(s) + '\n'
            ret += pipeline.formatStatusHTML()
            ret += '\n'
        ret += '</pre></html>'
        return ret

    def formatStatusJSON(self):
        """Return a JSON string with queue lengths and pipeline status.

        Keys: 'trigger_event_queue' and 'result_event_queue' (each with
        a 'length'), 'pipelines' (one entry per pipeline) and, while
        paused, 'message'.
        """
        data = {}
        if self._pause:
            ret = '<p><b>Queue only mode:</b> preparing to '
            if self._exit:
                ret += 'exit'
            ret += ', queue length: %s' % self.trigger_event_queue.qsize()
            ret += '</p>'
            data['message'] = ret

        data['trigger_event_queue'] = {
            'length': self.trigger_event_queue.qsize(),
        }
        data['result_event_queue'] = {
            'length': self.result_event_queue.qsize(),
        }

        data['pipelines'] = [pipeline.formatStatusJSON()
                             for pipeline in self.layout.pipelines.values()]
        return json.dumps(data)
663
James E. Blair1e8dd892012-05-30 09:15:05 -0700664
James E. Blair4aea70c2012-07-26 14:23:24 -0700665class BasePipelineManager(object):
666 log = logging.getLogger("zuul.BasePipelineManager")
James E. Blairee743612012-05-29 14:49:32 -0700667
def __init__(self, sched, pipeline):
    """Bind the manager to its scheduler and pipeline.

    ``report_times`` is read from the ``[zuul] report_times`` option
    when the scheduler has a config that defines it; otherwise it
    defaults to True.
    """
    self.sched = sched
    self.pipeline = pipeline
    self.building_jobs = {}   # build -> object it was launched for
    self.event_filters = []

    config = self.sched.config
    configured = config and config.has_option('zuul', 'report_times')
    if configured:
        self.report_times = config.getboolean('zuul', 'report_times')
    else:
        self.report_times = True
James E. Blairee743612012-05-29 14:49:32 -0700679
def __str__(self):
    """Return ``<ClassName pipeline-name>`` for log messages."""
    class_name = self.__class__.__name__
    pipeline_name = self.pipeline.name
    return "<%s %s>" % (class_name, pipeline_name)
James E. Blairee743612012-05-29 14:49:32 -0700682
def _postConfig(self, layout):
    """Log a summary of this manager's configuration.

    Writes the event filters, the job tree of every project in
    ``layout`` that has one in this pipeline, and the configured
    start/success/failure actions to the manager's logger.
    """
    log = self.log
    log.info("Configured Pipeline Manager %s" % self.pipeline.name)
    log.info(" Events:")
    for event_filter in self.event_filters:
        log.info(" %s" % event_filter)
    log.info(" Projects:")

    def log_jobs(tree, indent=0):
        # One line per job, then recurse into the child trees with a
        # deeper indent.
        prefix = ' ' + ' ' * indent
        if tree.job:
            job = tree.job
            matchers = ''.join(str(branch) for branch in job._branches)
            matchers += ''.join(str(path) for path in job._files)
            if matchers:
                matchers = ' ' + matchers
            hold = ' [hold]' if job.hold_following_changes else ''
            voting = '' if job.voting else ' [nonvoting]'
            log.info("%s%s%s%s%s" % (prefix, repr(job),
                                     matchers, hold, voting))
        for subtree in tree.job_trees:
            log_jobs(subtree, indent + 2)

    for project in layout.projects.values():
        job_tree = self.pipeline.getJobTree(project)
        if not job_tree:
            continue
        log.info(" %s" % project)
        log_jobs(job_tree)

    log.info(" On start:")
    log.info(" %s" % self.pipeline.start_actions)
    log.info(" On success:")
    log.info(" %s" % self.pipeline.success_actions)
    log.info(" On failure:")
    log.info(" %s" % self.pipeline.failure_actions)
James E. Blairee743612012-05-29 14:49:32 -0700722
def getSubmitAllowNeeds(self):
    """Return the set of review labels this pipeline may still "need".

    These are the code review labels that are allowed to be "needed"
    in a change's submit records with respect to this queue, i.e. the
    labels the queue itself is likely to set before submitting.  The
    set is the union of what each success reporter declares.
    """
    labels = set()
    for reporter in self.pipeline.success_actions:
        labels |= set(reporter.getSubmitAllowNeeds())
    return labels
James E. Blaire421a232012-07-25 16:59:21 -0700732
def eventMatches(self, event):
    """Return True if any configured event filter matches ``event``."""
    return any(event_filter.matches(event)
               for event_filter in self.event_filters)
738
def isChangeAlreadyInQueue(self, change):
    """Return True if the pipeline already holds an equal change.

    Equality is decided by ``change.equals()`` against every change
    returned by the pipeline's ``getChangesInQueue()``.
    """
    return any(change.equals(queued)
               for queued in self.pipeline.getChangesInQueue())
744
def reportStart(self, change):
    """Send the pipeline's "start" report for ``change``.

    The message names the pipeline and, when the scheduler config
    defines ``[zuul] status_url``, appends that URL on its own line.
    Reporting is best effort: any error is logged and never propagates
    to the caller.
    """
    try:
        self.log.info("Reporting start, action %s change %s" %
                      (self.pipeline.start_actions, change))
        msg = "Starting %s jobs." % self.pipeline.name
        # The scheduler may run without a config (see __init__); in
        # that case just omit the status URL instead of failing.
        config = self.sched.config
        if config and config.has_option('zuul', 'status_url'):
            msg += "\n" + config.get('zuul', 'status_url')
        ret = self.sendReport(self.pipeline.start_actions,
                              change, msg)
        if ret:
            self.log.error("Reporting change start %s received: %s" %
                           (change, ret))
    except Exception:
        # Catch Exception rather than everything so SystemExit and
        # KeyboardInterrupt still reach the caller.
        self.log.exception("Exception while reporting start:")
759
def sendReport(self, action_reporters, change, message):
    """Hand ``message`` about ``change`` to every given reporter.

    Returns an empty list when there are no reporters, None when the
    change has no number or every reporter succeeded, and otherwise
    the list of truthy values the reporters returned (their errors).
    """
    if len(action_reporters) == 0:
        return []
    if not change.number:
        self.log.debug("Not reporting change %s: No number present."
                       % change)
        return None

    failures = []
    for reporter in action_reporters:
        outcome = reporter.report(change, message)
        if outcome:
            failures.append(outcome)
    return failures if failures else None
779
def isChangeReadyToBeEnqueued(self, change):
    """Hook consulted by addChange(); this implementation says yes."""
    return True
782
def enqueueChangesAhead(self, change):
    """Hook run by addChange() before enqueuing ``change``.

    A falsy result makes addChange() give up; this implementation has
    nothing to enqueue and always returns True.
    """
    return True
785
def enqueueChangesBehind(self, change):
    """Hook run by addChange() after enqueuing ``change``.

    This implementation does nothing and returns True.
    """
    return True
788
def checkForChangesNeededBy(self, change):
    """Hook used by _processOneItem(); anything but True dequeues.

    This implementation always returns True.
    """
    return True
791
def getFailingDependentItem(self, item):
    """Hook used by _processOneItem(); a truthy result marks failure.

    This implementation reports no failing dependency (None).
    """
    return None
794
def getDependentItems(self, item):
    """Return the items ahead of ``item``, nearest first.

    Follows the ``item_ahead`` links until one is falsy and logs the
    changes found along the way.
    """
    ahead = []
    cursor = item.item_ahead
    while cursor:
        ahead.append(cursor)
        cursor = cursor.item_ahead
    self.log.info("Change %s depends on changes %s" %
                  (item.change,
                   [entry.change for entry in ahead]))
    return ahead
805
def getItemForChange(self, change):
    """Return the first pipeline item whose change equals ``change``.

    Returns None when no item matches.
    """
    candidates = (entry for entry in self.pipeline.getAllItems()
                  if entry.change.equals(change))
    return next(candidates, None)
811
def findOldVersionOfChangeAlreadyInQueue(self, change):
    """Return the first queued change that ``change`` is an update of.

    Returns None when ``change.isUpdateOf()`` is false for every
    queued change.
    """
    superseded = (queued for queued in self.pipeline.getChangesInQueue()
                  if change.isUpdateOf(queued))
    return next(superseded, None)
817
def removeOldVersionsOfChange(self, change):
    """Drop a queued older version of ``change``, if so configured.

    Does nothing unless the pipeline sets ``dequeue_on_new_patchset``.
    """
    if self.pipeline.dequeue_on_new_patchset:
        stale = self.findOldVersionOfChangeAlreadyInQueue(change)
        if stale:
            self.log.debug(
                "Change %s is a new version of %s, removing %s" %
                (change, stale, stale))
            self.removeChange(stale)
James E. Blair2fa50962013-01-30 21:50:41 -0800826
def reEnqueueItem(self, item):
    """Put an existing ``item`` back into its project's change queue.

    Returns True when a queue was found for the item's project and
    False (after logging an error) when not.
    """
    change_queue = self.pipeline.getQueue(item.change.project)
    if not change_queue:
        self.log.error("Unable to find change queue for project %s" %
                       item.change.project)
        return False

    self.log.debug("Re-enqueing change %s in queue %s" %
                   (item.change, change_queue))
    change_queue.enqueueItem(item)
    self.reportStats(item)
    return True
839
def addChange(self, change):
    """Try to enqueue ``change`` in this pipeline.

    Returns True when the change is in the queue afterwards (it was
    already there, or it has just been enqueued) and False when it is
    not ready, the changes ahead of it could not be enqueued, or no
    change queue exists for its project.

    Previously a fresh enqueue fell off the end and returned None,
    which a caller testing the result could not tell from a failure.
    """
    self.log.debug("Considering adding change %s" % change)
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring" % change)
        return True

    if not self.isChangeReadyToBeEnqueued(change):
        self.log.debug("Change %s is not ready to be enqueued, ignoring" %
                       change)
        return False

    if not self.enqueueChangesAhead(change):
        self.log.debug("Failed to enqueue changes ahead of %s" % change)
        return False

    # Re-check: presumably enqueuing the changes ahead can pull this
    # change into the queue as a side effect.
    if self.isChangeAlreadyInQueue(change):
        self.log.debug("Change %s is already in queue, ignoring" % change)
        return True

    change_queue = self.pipeline.getQueue(change.project)
    if not change_queue:
        self.log.error("Unable to find change queue for project %s" %
                       change.project)
        return False

    self.log.debug("Adding change %s to queue %s" %
                   (change, change_queue))
    if len(self.pipeline.start_actions) > 0:
        self.reportStart(change)
    item = change_queue.enqueueChange(change)
    self.reportStats(item)
    self.enqueueChangesBehind(change)
    return True
James E. Blairee743612012-05-29 14:49:32 -0700872
def dequeueItem(self, item):
    """Remove ``item`` from its project's change queue.

    Also sets the scheduler's ``_maintain_trigger_cache`` flag.
    """
    change = item.change
    self.log.debug("Removing change %s from queue" % change)
    self.pipeline.getQueue(change.project).dequeueItem(item)
    self.sched._maintain_trigger_cache = True
James E. Blair2fa50962013-01-30 21:50:41 -0800878
def removeChange(self, change):
    """Remove ``change`` from the pipeline.

    Typically used when the change has been superseded by another one.
    For every item holding the change, its jobs are canceled, the item
    is dequeued and the pipeline statistics are refreshed.
    """
    for entry in self.pipeline.getAllItems():
        if not (entry.change == change):
            continue
        self.log.debug("Canceling builds behind change: %s "
                       "because it is being removed." % entry.change)
        self.cancelJobs(entry)
        self.dequeueItem(entry)
        self.reportStats(entry)
James E. Blair2fa50962013-01-30 21:50:41 -0800889
def prepareRef(self, item):
    """Merge ``item`` (and the items ahead of it) into a fresh ref.

    Nothing is done when the change has no ``refspec`` attribute or
    the current build set already has a ref.

    Returns False on success (or when there was nothing to do) and
    True if the ref could not be prepared.
    """
    build_set = item.current_build_set
    ref = build_set.ref
    if not hasattr(item.change, 'refspec') or ref:
        return False

    self.log.debug("Preparing ref for: %s" % item.change)
    build_set.setConfiguration()
    ref = build_set.ref
    # Merge in queue order: furthest-ahead item first, this item last.
    merge_order = self.getDependentItems(item)
    merge_order.reverse()
    merge_order = merge_order + [item]
    commit = self.sched.merger.mergeChanges(merge_order, ref)
    build_set.commit = commit
    if commit:
        return False

    self.log.info("Unable to merge change %s" % item.change)
    msg = ("This change was unable to be automatically merged "
           "with the current state of the repository. Please "
           "rebase your change and upload a new patchset.")
    self.pipeline.setUnableToMerge(item, msg)
    return True
911
def _launchJobs(self, item, jobs):
    """Launch every job in ``jobs`` for ``item`` and track the builds.

    Each returned build is recorded in ``building_jobs`` and added to
    the item.  A failure for one job is logged and does not stop the
    remaining jobs from launching.
    """
    change = item.change
    self.log.debug("Launching jobs for change %s" % change)
    ahead = self.getDependentItems(item)
    launcher = self.sched.launcher
    for job in jobs:
        self.log.debug("Found job %s for change %s" % (job, change))
        try:
            started = launcher.launch(job, item, self.pipeline, ahead)
            self.building_jobs[started] = item
            self.log.debug("Adding build %s of job %s to item %s" %
                           (started, job, item))
            item.addBuild(started)
        except:
            self.log.exception("Exception while launching job %s "
                               "for change %s:" % (job, change))
James E. Blairee743612012-05-29 14:49:32 -0700928
def launchJobs(self, item):
    """Launch whatever jobs the pipeline says ``item`` should run now.

    Always returns None.
    """
    runnable = self.pipeline.findJobsToRun(item)
    if not runnable:
        return
    self._launchJobs(item, runnable)
933
def cancelJobs(self, item, prime=True):
    """Cancel the running builds of ``item`` and of everything behind.

    When ``prime`` is true and the current build set has builds, the
    item's builds are reset first.  Every tracked build belonging to
    the item is canceled through the launcher (errors are logged),
    marked ``CANCELED`` and forgotten; the same is then done
    recursively for each item behind.

    Returns True if at least one build was found to cancel.
    """
    self.log.debug("Cancel jobs for change %s" % item.change)
    if prime and item.current_build_set.builds:
        item.resetAllBuilds()

    doomed = []
    for build, owner in self.building_jobs.items():
        if not (owner == item):
            continue
        self.log.debug("Found build %s for change %s to cancel" %
                       (build, item.change))
        try:
            self.sched.launcher.cancel(build)
        except:
            self.log.exception("Exception while canceling build %s "
                               "for change %s" % (build, item.change))
        doomed.append(build)
    canceled = len(doomed) > 0

    # Second pass so the tracking dict is not mutated while iterating.
    for build in doomed:
        self.log.debug("Removing build %s from running builds" % build)
        build.result = 'CANCELED'
        del self.building_jobs[build]

    for follower in item.items_behind:
        self.log.debug("Canceling jobs for change %s, behind change %s" %
                       (follower.change, item.change))
        if self.cancelJobs(follower, prime=prime):
            canceled = True
    return canceled
961
def _processOneItem(self, item, nnfi):
    """Advance a single queue item by one processing step.

    ``nnfi`` is the nearest non-failing item seen so far in the queue
    (None when there is none yet).

    Returns a ``(changed, nnfi)`` tuple: ``changed`` tells the caller
    whether queue state was modified, and ``nnfi`` is the possibly
    updated nearest non-failing item to pass to the next call.
    """
    changed = False
    item_ahead = item.item_ahead
    change_queue = self.pipeline.getQueue(item.change.project)
    failing_reasons = []  # Reasons this item is failing

    if self.checkForChangesNeededBy(item.change) is not True:
        # It's not okay to enqueue this change, we should remove it.
        self.log.info("Dequeuing change %s because "
                      "it can no longer merge" % item.change)
        self.cancelJobs(item)
        self.dequeueItem(item)
        self.pipeline.setDequeuedNeedingChange(item)
        try:
            self.reportItem(item)
        except MergeFailure:
            # The item is being dropped anyway; nothing more to do.
            pass
        return (True, nnfi)
    dep_item = self.getFailingDependentItem(item)
    if dep_item:
        failing_reasons.append('a needed change is failing')
        # prime=False: cancel running builds without resetting them.
        self.cancelJobs(item, prime=False)
    else:
        # Items in a non-dependent queue are treated as if whatever
        # is ahead of them had already merged.
        item_ahead_merged = False
        if ((item_ahead and item_ahead.change.is_merged) or
                not change_queue.dependent):
            item_ahead_merged = True
        if (item_ahead != nnfi and not item_ahead_merged):
            # Our current base is different than what we expected,
            # and it's not because our current base merged. Something
            # ahead must have failed.
            self.log.info("Resetting builds for change %s because the "
                          "item ahead, %s, is not the nearest non-failing "
                          "item, %s" % (item.change, item_ahead, nnfi))
            change_queue.moveItem(item, nnfi)
            changed = True
            self.cancelJobs(item)
        self.prepareRef(item)
        if item.current_build_set.unable_to_merge:
            failing_reasons.append("it has a merge conflict")
        # NOTE(review): launchJobs() as defined in this class always
        # returns None, so this branch looks unreachable -- confirm.
        if self.launchJobs(item):
            changed = True
    if self.pipeline.didAnyJobFail(item):
        failing_reasons.append("at least one job failed")
    # Only the item at the head of its queue (nothing ahead) is
    # reported and dequeued once all of its jobs have completed.
    if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
        try:
            self.reportItem(item)
        except MergeFailure:
            failing_reasons.append("it did not merge")
            for item_behind in item.items_behind:
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, failed to merge" %
                              (item_behind.change, item))
                self.cancelJobs(item_behind)
        self.dequeueItem(item)
        changed = True
    elif not failing_reasons:
        # This item is healthy, so it becomes the base for the items
        # processed after it.
        nnfi = item
    item.current_build_set.failing_reasons = failing_reasons
    if failing_reasons:
        self.log.debug("%s is a failing item because %s" %
                       (item, failing_reasons))
    return (changed, nnfi)
James E. Blairfee8d652013-06-07 08:57:52 -07001025
def processQueue(self):
    """Run one processing pass over every item of every queue.

    Each item goes through _processOneItem() and then has its
    statistics reported.  When a queue changed, its new status is
    logged.  Returns True if any queue changed during the pass.
    """
    self.log.debug("Starting queue processor: %s" % self.pipeline.name)
    any_changed = False
    for queue in self.pipeline.queues:
        touched = False
        nearest_ok = None  # Nearest non-failing item
        # Iterate over a snapshot: processing may dequeue items.
        for entry in list(queue.queue):
            entry_changed, nearest_ok = self._processOneItem(
                entry, nearest_ok)
            if entry_changed:
                touched = True
            self.reportStats(entry)
        if not touched:
            continue
        any_changed = True
        status = ''.join(self.pipeline.formatStatus(entry)
                         for entry in queue.queue)
        if status:
            self.log.debug("Queue %s status is now:\n %s" %
                           (queue.name, status))
    self.log.debug("Finished queue processor: %s (changed: %s)" %
                   (self.pipeline.name, any_changed))
    return any_changed
James E. Blairdaabed22012-08-15 15:38:57 -07001049
def updateBuildDescriptions(self, build_set):
    """Refresh the launcher description of every build in a build set.

    The builds of ``build_set`` are updated first, then those of its
    previous build set when there is one.
    """
    launcher = self.sched.launcher

    def refresh(builds):
        for build in builds:
            launcher.setBuildDescription(
                build, self.formatDescription(build))

    refresh(build_set.getBuilds())
    if build_set.previous_build_set:
        refresh(build_set.previous_build_set.getBuilds())
1059
def onBuildStarted(self, build):
    """Handle a "build started" event.

    Returns False when the build is not tracked by this manager (for
    example it was triggered externally, before zuul started, or was
    restarted).  Otherwise refreshes the build descriptions, processes
    the queue until it stops changing, and returns True.
    """
    tracked = build in self.building_jobs
    if not tracked:
        return False

    self.log.debug("Build %s started" % build)
    self.updateBuildDescriptions(build.build_set)
    while True:
        if not self.processQueue():
            break
    return True
1071
def onBuildCompleted(self, build):
    """Handle a "build completed" event.

    Returns False when the build is not tracked by this manager (for
    example it was triggered externally, before zuul started, or was
    restarted).  Otherwise the build is forgotten, its result recorded
    on the pipeline, the descriptions refreshed, and the queue
    processed until it stops changing; then True is returned.
    """
    running = self.building_jobs
    if build not in running:
        return False

    self.log.debug("Build %s completed" % build)
    owner = running[build]
    self.log.debug("Found change %s which triggered completed build %s" %
                   (owner, build))

    del running[build]

    self.pipeline.setResult(owner, build)
    self.log.debug("Change %s status is now:\n %s" %
                   (owner, self.pipeline.formatStatus(owner)))
    self.updateBuildDescriptions(build.build_set)
    while True:
        if not self.processQueue():
            break
    return True
1092
def reportItem(self, item):
    """Report ``item`` and, for merging pipelines, verify the merge.

    Raises Exception if a reportable change was already reported.
    When ``changes_merge`` is set, raises MergeFailure unless every
    job succeeded, the report returned no error, and the trigger
    confirms the change is merged on its branch.
    """
    change = item.change
    if change.is_reportable and item.reported:
        raise Exception("Already reported change %s" % change)
    ret = self._reportItem(item)
    if not self.changes_merge:
        return

    succeeded = self.pipeline.didAllJobsSucceed(item)
    # A truthy report result means an error, so the merge cannot have
    # happened; only ask the trigger when reporting went through.
    merged = (not ret) and self.pipeline.trigger.isMerged(
        item.change, item.change.branch)
    self.log.info("Reported change %s status: all-succeeded: %s, "
                  "merged: %s" % (item.change, succeeded, merged))
    if not succeeded or not merged:
        self.log.debug("Reported change %s failed tests or failed "
                       "to merge" % (item.change))
        raise MergeFailure("Change %s failed to merge" % item.change)
James E. Blaire0487072012-08-29 17:38:31 -07001109
def _reportItem(self, item):
    """Send the success or failure report for ``item``.

    Returns False when the change is not reportable and 0 when it was
    already reported.  Otherwise returns whatever sendReport() gave
    back, or True if reporting raised; a truthy value means error.
    """
    change = item.change
    if not change.is_reportable:
        return False
    if item.reported:
        return 0

    self.log.debug("Reporting change %s" % change)
    outcome = True  # Means error as returned by trigger.report
    pipeline = self.pipeline
    if pipeline.didAllJobsSucceed(item):
        self.log.debug("success %s %s" % (pipeline.success_actions,
                                          pipeline.failure_actions))
        actions = pipeline.success_actions
        verdict = 'SUCCESS'
    else:
        actions = pipeline.failure_actions
        verdict = 'FAILURE'
    item.setReportedResult(verdict)

    report = self.formatReport(item)
    # Mark as reported before sending so a failure is not retried.
    item.reported = True
    try:
        self.log.info("Reporting change %s, actions: %s" %
                      (item.change, actions))
        outcome = self.sendReport(actions, item.change, report)
        if outcome:
            self.log.error("Reporting change %s received: %s" %
                           (item.change, outcome))
    except:
        self.log.exception("Exception while reporting:")
        item.setReportedResult('ERROR')
    self.updateBuildDescriptions(item.current_build_set)
    return outcome
1139
def formatReport(self, item):
    """Build the text report for ``item``.

    The report starts with the pipeline's success or failure message.
    It is followed by a fixed notice when the item was dequeued
    because of a needed change, by the build set's unable-to-merge
    message when there is one, or otherwise by one line per job with
    its URL, result, elapsed time and voting status.

    Works without a scheduler config (``self.sched.config`` may be
    None, as ``__init__`` allows); the ``url_pattern`` and
    ``job_name_in_report`` options are then treated as unset.
    """
    if self.pipeline.didAllJobsSucceed(item):
        ret = self.pipeline.success_message + '\n\n'
    else:
        ret = self.pipeline.failure_message + '\n\n'

    if item.dequeued_needing_change:
        return ret + "This change depends on a change that failed to merge."
    if item.current_build_set.unable_to_merge_message:
        return ret + item.current_build_set.unable_to_merge_message

    # Read the config once, not once per job.
    config = self.sched.config
    url_pattern = None
    show_job_name = False
    if config:
        if config.has_option('zuul', 'url_pattern'):
            url_pattern = config.get('zuul', 'url_pattern')
        if config.has_option('zuul', 'job_name_in_report'):
            show_job_name = config.getboolean('zuul',
                                              'job_name_in_report')

    for job in self.pipeline.getJobs(item.change):
        build = item.current_build_set.getBuild(job.name)
        result = build.result
        pattern = url_pattern
        # A job may override both the result text and the URL pattern.
        if result == 'SUCCESS':
            if job.success_message:
                result = job.success_message
            if job.success_pattern:
                pattern = job.success_pattern
        elif result == 'FAILURE':
            if job.failure_message:
                result = job.failure_message
            if job.failure_pattern:
                pattern = job.failure_pattern

        if pattern:
            url = pattern.format(change=item.change,
                                 pipeline=self.pipeline,
                                 job=job,
                                 build=build)
        else:
            url = build.url or job.name

        voting = '' if job.voting else ' (non-voting)'

        if self.report_times and build.end_time and build.start_time:
            dt = int(build.end_time - build.start_time)
            m, s = divmod(dt, 60)
            h, m = divmod(m, 60)
            if h:
                elapsed = ' in %dh %02dm %02ds' % (h, m, s)
            elif m:
                elapsed = ' in %dm %02ds' % (m, s)
            else:
                elapsed = ' in %ds' % (s)
        else:
            elapsed = ''

        name = job.name + ' ' if show_job_name else ''
        ret += '- %s%s : %s%s%s\n' % (name, url, result, elapsed,
                                      voting)
    return ret
1201
def formatDescription(self, build):
    """Return the HTML description attached to ``build``.

    The description names the change (or ref) that triggered the
    build and the pipeline, then lists the other changes tested
    concurrently, every build of the same build set, the builds of
    the same job in the previous/next build sets, and the reported
    result, each section only when it has content.

    The loop over the build set's builds uses its own variable.  It
    used to rebind ``build``, so the "Preceded by"/"Succeeded by"
    links were looked up by the job name of the last build in the set
    instead of the build being described.
    """
    build_set = build.build_set

    concurrent_changes = ''.join(
        '<li><a href="{change.url}">'
        ' {change.number},{change.patchset}</a></li>'.format(change=other)
        for other in build_set.other_changes)

    change = build_set.item.change

    concurrent_builds = ''
    for sibling in build_set.getBuilds():
        if sibling.url:
            concurrent_builds += """\
<li>
 <a href="{build.url}">
 {build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=sibling)
        else:
            concurrent_builds += """\
<li>
 {build.job.name}: {build.result}
</li>""".format(build=sibling)

    # Builds of this same job in the neighbouring build sets.
    other_builds = ''
    neighbours = (('Preceded by', build_set.previous_build_set),
                  ('Succeeded by', build_set.next_build_set))
    for label, neighbour in neighbours:
        if not neighbour:
            continue
        other_build = neighbour.getBuild(build.job.name)
        if other_build:
            other_builds += """\
<li>
 {label}: <a href="{build.url}">
 {build.job.name} #{build.number}</a>
</li>
""".format(label=label, build=other_build)

    result = build_set.result

    if hasattr(change, 'number'):
        ret = """\
<p>
 Triggered by change:
 <a href="{change.url}">{change.number},{change.patchset}</a><br/>
 Branch: <b>{change.branch}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    elif hasattr(change, 'ref'):
        # The original template closed an <a> tag it never opened.
        ret = """\
<p>
 Triggered by reference:
 {change.ref}<br/>
 Old revision: <b>{change.oldrev}</b><br/>
 New revision: <b>{change.newrev}</b><br/>
 Pipeline: <b>{self.pipeline.name}</b>
</p>"""
    else:
        ret = ""

    if concurrent_changes:
        ret += """\
<p>
 Other changes tested concurrently with this change:
 <ul>{concurrent_changes}</ul>
</p>
"""
    if concurrent_builds:
        ret += """\
<p>
 All builds for this change set:
 <ul>{concurrent_builds}</ul>
</p>
"""

    if other_builds:
        ret += """\
<p>
 Other build sets for this change:
 <ul>{other_builds}</ul>
</p>
"""
    if result:
        ret += """\
<p>
 Reported result: <b>{result}</b>
</p>
"""

    # Name the template fields explicitly rather than via locals().
    return ret.format(self=self,
                      change=change,
                      concurrent_changes=concurrent_changes,
                      concurrent_builds=concurrent_builds,
                      other_builds=other_builds,
                      result=result)
1303
def reportStats(self, item):
    """Send pipeline statistics for ``item`` to statsd, if available.

    The gauge of current changes is updated on both enqueue and
    dequeue; timers and counters are only emitted when the item has a
    dequeue time.  Errors are logged and never propagate.
    """
    if not statsd:
        return
    try:
        resident_ms = None
        if item.dequeue_time:
            resident_ms = int(
                (item.dequeue_time - item.enqueue_time) * 1000)
        queued = len(self.pipeline.getAllItems())

        # stats.timers.zuul.pipeline.NAME.resident_time
        # stats_counts.zuul.pipeline.NAME.total_changes
        # stats.gauges.zuul.pipeline.NAME.current_changes
        pipeline_key = 'zuul.pipeline.%s' % self.pipeline.name
        statsd.gauge(pipeline_key + '.current_changes', queued)
        if resident_ms:
            statsd.timing(pipeline_key + '.resident_time', resident_ms)
            statsd.incr(pipeline_key + '.total_changes')

        # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
        # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
        dotted_project = item.change.project.name.replace('/', '.')
        project_key = pipeline_key + '.%s' % dotted_project
        if resident_ms:
            statsd.timing(project_key + '.resident_time', resident_ms)
            statsd.incr(project_key + '.total_changes')
    except:
        self.log.exception("Exception reporting pipeline stats")
1334
James E. Blair1e8dd892012-05-30 09:15:05 -07001335
class IndependentPipelineManager(BasePipelineManager):
    """Pipeline manager that keeps every project in one shared,
    non-dependent change queue."""

    log = logging.getLogger("zuul.IndependentPipelineManager")
    changes_merge = False

    def _postConfig(self, layout):
        """Run the base post-configuration, then build the queue.

        A single ``ChangeQueue`` created with ``dependent=False`` is
        populated with all of the pipeline's projects and registered
        with the pipeline.

        :param layout: passed through unchanged to the base class.
        """
        super(IndependentPipelineManager, self)._postConfig(layout)

        queue = ChangeQueue(self.pipeline, dependent=False)
        for member in self.pipeline.getProjects():
            queue.addProject(member)

        self.pipeline.addQueue(queue)
James E. Blairee743612012-05-29 14:49:32 -07001348
James E. Blair1e8dd892012-05-30 09:15:05 -07001349
class DependentPipelineManager(BasePipelineManager):
    """Pipeline manager whose projects are grouped into shared queues.

    Projects whose change queues have at least one job in common are
    combined into a single shared change queue.  The manager also
    follows ``needs_change`` / ``needed_by_changes`` links between
    changes when enqueuing.
    """

    log = logging.getLogger("zuul.DependentPipelineManager")
    changes_merge = True

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to the base class."""
        super(DependentPipelineManager, self).__init__(*args, **kwargs)

    def _postConfig(self, layout):
        """Run the base post-configuration, then build shared queues.

        :param layout: passed through unchanged to the base class.
        """
        super(DependentPipelineManager, self)._postConfig(layout)
        self.buildChangeQueues()

    def buildChangeQueues(self):
        """Create one queue per project and combine those sharing jobs.

        The resulting queues are added to the pipeline and logged.
        """
        self.log.debug("Building shared change queues")
        change_queues = []

        for project in self.pipeline.getProjects():
            change_queue = ChangeQueue(self.pipeline)
            change_queue.addProject(project)
            change_queues.append(change_queue)
            self.log.debug("Created queue: %s" % change_queue)

        self.log.debug("Combining shared queues")
        # One combining pass is not always sufficient: a queue is
        # merged into only the first kept queue it overlaps, so a queue
        # that overlaps two kept queues leaves those two separate even
        # though (assuming mergeChangeQueue makes the target's jobs
        # include the merged queue's jobs) they now share jobs.  Repeat
        # until a pass no longer reduces the number of queues.
        while True:
            new_change_queues = self._combineChangeQueues(change_queues)
            if len(new_change_queues) == len(change_queues):
                break
            change_queues = new_change_queues

        self.log.info(" Shared change queues:")
        for queue in new_change_queues:
            self.pipeline.addQueue(queue)
            self.log.info(" %s" % queue)

    def _combineChangeQueues(self, change_queues):
        """Run one pass merging queues whose job sets overlap.

        :param change_queues: list of queues to combine; queues kept in
            the result may be mutated by having others merged into them.
        :returns: a new list holding the queues that were kept.
        """
        new_change_queues = []
        for a in change_queues:
            merged_a = False
            for b in new_change_queues:
                if not a.getJobs().isdisjoint(b.getJobs()):
                    self.log.debug("Merging queue %s into %s" % (a, b))
                    b.mergeChangeQueue(a)
                    merged_a = True
                    break  # this breaks out of 'for b' and continues 'for a'
            if not merged_a:
                self.log.debug("Keeping queue %s" % (a))
                new_change_queues.append(a)
        return new_change_queues

    def isChangeReadyToBeEnqueued(self, change):
        """Return whether the pipeline trigger says *change* can merge.

        :param change: the change to check.
        :returns: False (after a debug log) when the trigger's
            ``canMerge`` check fails, True otherwise.
        """
        if not self.pipeline.trigger.canMerge(change,
                                              self.getSubmitAllowNeeds()):
            self.log.debug("Change %s can not merge, ignoring" % change)
            return False
        return True

    def enqueueChangesBehind(self, change):
        """Add every mergeable change that needs *change*.

        Changes without a ``needed_by_changes`` attribute are ignored.

        :param change: the change whose dependents are examined.
        :returns: None.
        """
        to_enqueue = []
        self.log.debug("Checking for changes needing %s:" % change)
        if not hasattr(change, 'needed_by_changes'):
            self.log.debug(" Changeish does not support dependencies")
            return
        for needs in change.needed_by_changes:
            if self.pipeline.trigger.canMerge(needs,
                                              self.getSubmitAllowNeeds()):
                self.log.debug(" Change %s needs %s and is ready to merge" %
                               (needs, change))
                to_enqueue.append(needs)
        if not to_enqueue:
            self.log.debug(" No changes need %s" % change)

        # Enqueue only after the scan has finished.
        for other_change in to_enqueue:
            self.addChange(other_change)

    def enqueueChangesAhead(self, change):
        """Add the change that *change* needs, if one must go first.

        :param change: the change about to be enqueued.
        :returns: the boolean from ``checkForChangesNeededBy`` when no
            change has to be added, otherwise the result of adding the
            needed change.
        """
        ret = self.checkForChangesNeededBy(change)
        if ret in [True, False]:
            return ret
        # Anything other than a boolean is the needed change itself.
        self.log.debug(" Change %s must be merged ahead of %s" %
                       (ret, change))
        return self.addChange(ret)

    def checkForChangesNeededBy(self, change):
        """Decide whether *change* may be enqueued given its dependency.

        :param change: the change being considered.
        :returns: True if it is okay to proceed, False if the change
            should not be enqueued, or the needed change object when
            that change can merge and is not already in the queue.
        """
        self.log.debug("Checking for changes needed by %s:" % change)
        # Return true if okay to proceed enqueuing this change,
        # false if the change should not be enqueued.
        if not hasattr(change, 'needs_change'):
            self.log.debug(" Changeish does not support dependencies")
            return True
        if not change.needs_change:
            self.log.debug(" No changes needed")
            return True
        if change.needs_change.is_merged:
            self.log.debug(" Needed change is merged")
            return True
        if not change.needs_change.is_current_patchset:
            self.log.debug(" Needed change is not the current patchset")
            return False
        if self.isChangeAlreadyInQueue(change.needs_change):
            self.log.debug(" Needed change is already ahead in the queue")
            return True
        if self.pipeline.trigger.canMerge(change.needs_change,
                                          self.getSubmitAllowNeeds()):
            self.log.debug(" Change %s is needed" %
                           change.needs_change)
            return change.needs_change
        # The needed change can't be merged.
        self.log.debug(" Change %s is needed but can not be merged" %
                       change.needs_change)
        return False

    def getFailingDependentItem(self, item):
        """Return the failing queue item that *item* depends on, if any.

        :param item: the queue item whose ``change.needs_change`` is
            looked up.
        :returns: the queue item for the needed change when its current
            build set has failing reasons; None when there is no needed
            change, no item is found for it, or it is not failing.
        """
        if not hasattr(item.change, 'needs_change'):
            return None
        if not item.change.needs_change:
            return None
        needs_item = self.getItemForChange(item.change.needs_change)
        if not needs_item:
            return None
        if needs_item.current_build_set.failing_reasons:
            return needs_item
        return None