blob: b000a6f5eb73e4f7d499be4e2429e05372adea3e [file] [log] [blame]
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Morgan Fainberg1b9bd782016-05-30 14:03:30 -070023import six
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
James E. Blairee743612012-05-29 14:49:32 -070028
Joshua Hesketh0aa7e8b2016-07-14 00:12:25 +100029from zuul import configloader
Morgan Fainberg9c4700a2016-05-30 14:25:19 -070030from zuul import model
James E. Blair83005782015-12-11 14:46:03 -080031from zuul import exceptions
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040032from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070033
James E. Blair1e8dd892012-05-30 09:15:05 -070034
class MutexHandler(object):
    """Track which (item, job) pair currently holds each named job mutex.

    ``self.mutexes`` maps a mutex name to the ``(item, job_name)`` tuple
    holding it.  Jobs whose ``mutex`` attribute is empty never need a
    mutex.
    """
    log = logging.getLogger("zuul.MutexHandler")

    def __init__(self):
        self.mutexes = {}

    def acquire(self, item, job):
        """Try to take ``job.mutex`` for ``item``/``job``.

        :returns: True if the job may proceed (no mutex configured, the
            mutex was free, this item/job already holds it, or the
            holder's build has a result and the mutex was handed over);
            False if another item/job still holds it.
        """
        if not job.mutex:
            return True
        mutex_name = job.mutex
        m = self.mutexes.get(mutex_name)
        if not m:
            # The mutex is not held, acquire it
            self._acquire(mutex_name, item, job.name)
            return True
        held_item, held_job_name = m
        if held_item is item and held_job_name == job.name:
            # This item already holds the mutex
            return True
        held_build = held_item.current_build_set.getBuild(held_job_name)
        if held_build and held_build.result:
            # The build that held the mutex is complete, release it
            # and let the new item have it.  Release on behalf of the
            # current holder so the log names the item/job that really
            # held the mutex, not the one about to acquire it.
            self.log.error("Held mutex %s being released because "
                           "the build that holds it is complete" %
                           (mutex_name,))
            self._release(mutex_name, held_item, held_job_name)
            self._acquire(mutex_name, item, job.name)
            return True
        return False

    def release(self, item, job):
        """Release ``job.mutex`` if it is held by exactly this item/job.

        Mismatches (mutex not held, or held by someone else) are logged
        as errors and otherwise ignored.
        """
        if not job.mutex:
            return
        mutex_name = job.mutex
        m = self.mutexes.get(mutex_name)
        if not m:
            # The mutex is not held, nothing to do
            self.log.error("Mutex can not be released for %s "
                           "because the mutex is not held" %
                           (item,))
            return
        held_item, held_job_name = m
        if held_item is item and held_job_name == job.name:
            # This item holds the mutex
            self._release(mutex_name, item, job.name)
            return
        self.log.error("Mutex can not be released for %s "
                       "which does not hold it" %
                       (item,))

    def _acquire(self, mutex_name, item, job_name):
        """Record ``(item, job_name)`` as the holder of ``mutex_name``."""
        self.log.debug("Job %s of item %s acquiring mutex %s" %
                       (job_name, item, mutex_name))
        self.mutexes[mutex_name] = (item, job_name)

    def _release(self, mutex_name, item, job_name):
        """Forget the holder of ``mutex_name`` (logged as item/job_name)."""
        self.log.debug("Job %s of item %s releasing mutex %s" %
                       (job_name, item, mutex_name))
        del self.mutexes[mutex_name]
95
96
class ManagementEvent(object):
    """Base class for requests that must run inside the scheduler loop.

    A requester (e.g. ``Scheduler.reconfigure``) blocks in :meth:`wait`
    until :meth:`done` or :meth:`exception` is called on the event.
    """

    def __init__(self):
        self._exc_info = None
        self._wait_event = threading.Event()

    def exception(self, exc_info):
        """Record a failure (a ``sys.exc_info()`` triple) and wake waiters."""
        self._exc_info = exc_info
        self._wait_event.set()

    def done(self):
        """Mark the request as successfully handled and wake waiters."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the request is handled or ``timeout`` expires.

        :returns: True if the event was signalled, False on timeout.
        :raises: the exception recorded via :meth:`exception`, if any.
        """
        self._wait_event.wait(timeout)
        failure = self._exc_info
        if failure:
            six.reraise(*failure)
        return self._wait_event.is_set()
115
116
class ReconfigureEvent(ManagementEvent):
    """Request that the scheduler reload its layout.

    The layout is (re-)loaded from the paths named in the supplied
    configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        super(ReconfigureEvent, self).__init__()
        # Replaces the scheduler's config when the event is processed.
        self.config = config
126
127
class PromoteEvent(ManagementEvent):
    """Request moving one or more changes to the head of their queue.

    :arg str pipeline_name: name of the target pipeline
    :arg list change_ids: change id strings of the form ``1234,1``
    """

    def __init__(self, pipeline_name, change_ids):
        super(PromoteEvent, self).__init__()
        self.pipeline_name, self.change_ids = pipeline_name, change_ids
140
141
class EnqueueEvent(ManagementEvent):
    """Request enqueueing a change directly into a pipeline.

    :arg TriggerEvent trigger_event: describes the trigger, the target
        pipeline and the change to enqueue
    """

    def __init__(self, trigger_event):
        super(EnqueueEvent, self).__init__()
        self.trigger_event = trigger_event
152
153
class ResultEvent(object):
    """Base class for events that update pipeline state in response to a
    result reported by an external system (build, merge or node
    provisioning)."""
159
160
class BuildStartedEvent(ResultEvent):
    """Notification that a build has begun running.

    :arg Build build: the build that started.
    """

    def __init__(self, build):
        self.build = build
169
170
class BuildCompletedEvent(ResultEvent):
    """Notification that a build has finished.

    :arg Build build: the build that finished.
    """

    def __init__(self, build):
        self.build = build
179
180
class MergeCompletedEvent(ResultEvent):
    """A remote merge operation has completed

    :arg BuildSet build_set: The build_set which is ready.
    :arg str zuul_url: The URL of the Zuul Merger.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    :arg files: File information reported by the merger for this merge;
        stored unchanged on the event (its exact structure is defined by
        the merger, not here).
    """

    def __init__(self, build_set, zuul_url, merged, updated, commit,
                 files):
        self.build_set = build_set
        self.zuul_url = zuul_url
        self.merged = merged
        self.updated = updated
        self.commit = commit
        self.files = files
James E. Blair4076e2b2014-01-28 12:42:20 -0800199
200
class NodesProvisionedEvent(ResultEvent):
    """Nodes have been provisioned for a build_set.

    :arg request: the node request whose nodes have been provisioned.
        It carries the ``build_set`` it was made for
        (``Scheduler.onNodesProvisioned`` reads ``request.build_set``).
    """

    def __init__(self, request):
        self.request = request
210
211
def toList(item):
    """Normalize ``item`` to a list.

    Any falsy value (``None``, ``""``, ``0``, an empty container) yields a
    fresh empty list; a non-empty list is returned as-is (not copied);
    anything else is wrapped in a one-element list.
    """
    if not item:
        return []
    return item if isinstance(item, list) else [item]
218
219
James E. Blaire9d45c32012-05-31 09:56:45 -0700220class Scheduler(threading.Thread):
James E. Blairee743612012-05-29 14:49:32 -0700221 log = logging.getLogger("zuul.Scheduler")
222
James E. Blaire4d229c2016-05-25 15:25:41 -0700223 def __init__(self, config, testonly=False):
James E. Blaire9d45c32012-05-31 09:56:45 -0700224 threading.Thread.__init__(self)
James E. Blair8a6f0c22013-07-01 12:31:34 -0400225 self.daemon = True
James E. Blairee743612012-05-29 14:49:32 -0700226 self.wake_event = threading.Event()
James E. Blaircdccd972013-07-01 12:10:22 -0700227 self.layout_lock = threading.Lock()
James E. Blaira84f0e42014-02-06 07:09:22 -0800228 self.run_handler_lock = threading.Lock()
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700229 self._pause = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700230 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -0700231 self._stopped = False
James E. Blairee743612012-05-29 14:49:32 -0700232 self.launcher = None
James E. Blair4076e2b2014-01-28 12:42:20 -0800233 self.merger = None
James E. Blair83005782015-12-11 14:46:03 -0800234 self.connections = None
James E. Blair552b54f2016-07-22 13:55:32 -0700235 self.statsd = extras.try_import('statsd.statsd')
James E. Blair83005782015-12-11 14:46:03 -0800236 # TODO(jeblair): fix this
James E. Blairaf17a972016-02-03 15:07:18 -0800237 self.mutex = MutexHandler()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000238 # Despite triggers being part of the pipeline, there is one trigger set
239 # per scheduler. The pipeline handles the trigger filters but since
240 # the events are handled by the scheduler itself it needs to handle
241 # the loading of the triggers.
242 # self.triggers['connection_name'] = triggerObject
James E. Blair6c358e72013-07-29 17:06:47 -0700243 self.triggers = dict()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000244 self.config = config
James E. Blairee743612012-05-29 14:49:32 -0700245
246 self.trigger_event_queue = Queue.Queue()
247 self.result_event_queue = Queue.Queue()
James E. Blair468c8512013-12-06 13:27:19 -0800248 self.management_event_queue = Queue.Queue()
James E. Blair59fdbac2015-12-07 17:08:06 -0800249 self.abide = model.Abide()
James E. Blairee743612012-05-29 14:49:32 -0700250
James E. Blaire4d229c2016-05-25 15:25:41 -0700251 if not testonly:
252 time_dir = self._get_time_database_dir()
253 self.time_database = model.TimeDataBase(time_dir)
James E. Blairce8a2132016-05-19 15:21:52 -0700254
Jeremy Stanley98b38de2015-06-04 21:20:43 +0000255 self.zuul_version = zuul_version.version_info.release_string()
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400256 self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400257
James E. Blairb0fcae42012-07-17 11:12:10 -0700258 def stop(self):
259 self._stopped = True
Joshua Hesketh352264b2015-08-11 23:42:08 +1000260 self.stopConnections()
James E. Blairb0fcae42012-07-17 11:12:10 -0700261 self.wake_event.set()
262
Joshua Hesketh352264b2015-08-11 23:42:08 +1000263 def testConfig(self, config_path, connections):
264 # Take the list of set up connections directly here rather than with
265 # registerConnections as we don't want to do the onLoad event yet.
266 return self._parseConfig(config_path, connections)
James E. Blair47958382013-01-10 17:26:02 -0800267
Joshua Hesketh9a256752016-04-04 13:38:51 +1000268 def registerConnections(self, connections, load=True):
269 # load: whether or not to trigger the onLoad for the connection. This
270 # is useful for not doing a full load during layout validation.
Joshua Hesketh352264b2015-08-11 23:42:08 +1000271 self.connections = connections
Joshua Hesketh0aa7e8b2016-07-14 00:12:25 +1000272 self.connections.registerScheduler(self, load)
Joshua Hesketh352264b2015-08-11 23:42:08 +1000273
274 def stopConnections(self):
James E. Blair83005782015-12-11 14:46:03 -0800275 self.connections.stop()
James E. Blair14abdf42015-12-09 16:11:53 -0800276
James E. Blairee743612012-05-29 14:49:32 -0700277 def setLauncher(self, launcher):
278 self.launcher = launcher
279
James E. Blair4076e2b2014-01-28 12:42:20 -0800280 def setMerger(self, merger):
281 self.merger = merger
282
James E. Blair8d692392016-04-08 17:47:58 -0700283 def setNodepool(self, nodepool):
284 self.nodepool = nodepool
285
James E. Blairee743612012-05-29 14:49:32 -0700286 def addEvent(self, event):
287 self.log.debug("Adding trigger event: %s" % event)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800288 try:
James E. Blair552b54f2016-07-22 13:55:32 -0700289 if self.statsd:
290 self.statsd.incr('gerrit.event.%s' % event.type)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800291 except:
292 self.log.exception("Exception reporting event stats")
James E. Blairee743612012-05-29 14:49:32 -0700293 self.trigger_event_queue.put(event)
294 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800295 self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700296
James E. Blair11700c32012-07-05 17:50:05 -0700297 def onBuildStarted(self, build):
298 self.log.debug("Adding start event for build: %s" % build)
James E. Blair71e94122012-12-24 17:53:08 -0800299 build.start_time = time.time()
James E. Blaira84f0e42014-02-06 07:09:22 -0800300 event = BuildStartedEvent(build)
301 self.result_event_queue.put(event)
James E. Blair11700c32012-07-05 17:50:05 -0700302 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800303 self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700304
James E. Blairf0358662015-07-20 15:19:12 -0700305 def onBuildCompleted(self, build, result):
306 self.log.debug("Adding complete event for build: %s result: %s" % (
307 build, result))
James E. Blair71e94122012-12-24 17:53:08 -0800308 build.end_time = time.time()
James E. Blairf0358662015-07-20 15:19:12 -0700309 # Note, as soon as the result is set, other threads may act
310 # upon this, even though the event hasn't been fully
311 # processed. Ensure that any other data from the event (eg,
312 # timing) is recorded before setting the result.
313 build.result = result
James E. Blair23ec1ba2013-01-04 18:06:10 -0800314 try:
James E. Blair552b54f2016-07-22 13:55:32 -0700315 if self.statsd and build.pipeline:
James E. Blair66eeebf2013-07-27 17:44:32 -0700316 jobname = build.job.name.replace('.', '_')
Timothy Chavezb2332082015-08-07 20:08:04 -0500317 key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
James E. Blair552b54f2016-07-22 13:55:32 -0700318 self.statsd.incr(key)
Timothy Chavezb2332082015-08-07 20:08:04 -0500319 for label in build.node_labels:
320 # Jenkins includes the node name in its list of labels, so
321 # we filter it out here, since that is not statistically
322 # interesting.
323 if label == build.node_name:
324 continue
325 dt = int((build.start_time - build.launch_time) * 1000)
James E. Blair50aacbc2015-11-17 14:09:59 -0800326 key = 'zuul.pipeline.%s.label.%s.wait_time' % (
327 build.pipeline.name, label)
James E. Blair552b54f2016-07-22 13:55:32 -0700328 self.statsd.timing(key, dt)
James E. Blair66eeebf2013-07-27 17:44:32 -0700329 key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
330 jobname, build.result)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800331 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
332 dt = int((build.end_time - build.start_time) * 1000)
James E. Blair552b54f2016-07-22 13:55:32 -0700333 self.statsd.timing(key, dt)
334 self.statsd.incr(key)
James E. Blair50aacbc2015-11-17 14:09:59 -0800335
336 key = 'zuul.pipeline.%s.job.%s.wait_time' % (
337 build.pipeline.name, jobname)
338 dt = int((build.start_time - build.launch_time) * 1000)
James E. Blair552b54f2016-07-22 13:55:32 -0700339 self.statsd.timing(key, dt)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800340 except:
341 self.log.exception("Exception reporting runtime stats")
James E. Blaira84f0e42014-02-06 07:09:22 -0800342 event = BuildCompletedEvent(build)
343 self.result_event_queue.put(event)
James E. Blairee743612012-05-29 14:49:32 -0700344 self.wake_event.set()
James E. Blairf62d4282012-12-31 17:01:50 -0800345 self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700346
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700347 def onMergeCompleted(self, build_set, zuul_url, merged, updated,
348 commit, files):
James E. Blair4076e2b2014-01-28 12:42:20 -0800349 self.log.debug("Adding merge complete event for build set: %s" %
350 build_set)
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700351 event = MergeCompletedEvent(build_set, zuul_url, merged,
352 updated, commit, files)
James E. Blair4076e2b2014-01-28 12:42:20 -0800353 self.result_event_queue.put(event)
354 self.wake_event.set()
355
James E. Blair8d692392016-04-08 17:47:58 -0700356 def onNodesProvisioned(self, req):
357 self.log.debug("Adding nodes provisioned event for build set: %s" %
358 req.build_set)
359 event = NodesProvisionedEvent(req)
360 self.result_event_queue.put(event)
361 self.wake_event.set()
362
James E. Blaire9d45c32012-05-31 09:56:45 -0700363 def reconfigure(self, config):
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700364 self.log.debug("Prepare to reconfigure")
James E. Blair468c8512013-12-06 13:27:19 -0800365 event = ReconfigureEvent(config)
366 self.management_event_queue.put(event)
James E. Blaire9d45c32012-05-31 09:56:45 -0700367 self.wake_event.set()
368 self.log.debug("Waiting for reconfiguration")
James E. Blair468c8512013-12-06 13:27:19 -0800369 event.wait()
James E. Blaire9d45c32012-05-31 09:56:45 -0700370 self.log.debug("Reconfiguration complete")
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400371 self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700372
James E. Blair36658cf2013-12-06 17:53:48 -0800373 def promote(self, pipeline_name, change_ids):
374 event = PromoteEvent(pipeline_name, change_ids)
375 self.management_event_queue.put(event)
376 self.wake_event.set()
377 self.log.debug("Waiting for promotion")
378 event.wait()
379 self.log.debug("Promotion complete")
380
James E. Blaird27a96d2014-07-10 13:25:13 -0700381 def enqueue(self, trigger_event):
382 event = EnqueueEvent(trigger_event)
383 self.management_event_queue.put(event)
384 self.wake_event.set()
385 self.log.debug("Waiting for enqueue")
386 event.wait()
387 self.log.debug("Enqueue complete")
388
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700389 def exit(self):
390 self.log.debug("Prepare to exit")
391 self._pause = True
392 self._exit = True
393 self.wake_event.set()
394 self.log.debug("Waiting for exit")
395
396 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700397 if self.config.has_option('zuul', 'state_dir'):
398 state_dir = os.path.expanduser(self.config.get('zuul',
399 'state_dir'))
400 else:
401 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700402 return os.path.join(state_dir, 'queue.pickle')
403
James E. Blairce8a2132016-05-19 15:21:52 -0700404 def _get_time_database_dir(self):
405 if self.config.has_option('zuul', 'state_dir'):
406 state_dir = os.path.expanduser(self.config.get('zuul',
407 'state_dir'))
408 else:
409 state_dir = '/var/lib/zuul'
410 d = os.path.join(state_dir, 'times')
411 if not os.path.exists(d):
412 os.mkdir(d)
413 return d
414
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700415 def _save_queue(self):
416 pickle_file = self._get_queue_pickle_file()
417 events = []
418 while not self.trigger_event_queue.empty():
419 events.append(self.trigger_event_queue.get())
420 self.log.debug("Queue length is %s" % len(events))
421 if events:
422 self.log.debug("Saving queue")
423 pickle.dump(events, open(pickle_file, 'wb'))
424
425 def _load_queue(self):
426 pickle_file = self._get_queue_pickle_file()
427 if os.path.exists(pickle_file):
428 self.log.debug("Loading queue")
429 events = pickle.load(open(pickle_file, 'rb'))
430 self.log.debug("Queue length is %s" % len(events))
431 for event in events:
432 self.trigger_event_queue.put(event)
433 else:
434 self.log.debug("No queue file found")
435
436 def _delete_queue(self):
437 pickle_file = self._get_queue_pickle_file()
438 if os.path.exists(pickle_file):
439 self.log.debug("Deleting saved queue")
440 os.unlink(pickle_file)
441
442 def resume(self):
443 try:
444 self._load_queue()
445 except:
446 self.log.exception("Unable to load queue")
447 try:
448 self._delete_queue()
449 except:
450 self.log.exception("Unable to delete saved queue")
451 self.log.debug("Resuming queue processing")
452 self.wake_event.set()
453
454 def _doPauseEvent(self):
455 if self._exit:
456 self.log.debug("Exiting")
457 self._save_queue()
458 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700459
James E. Blair468c8512013-12-06 13:27:19 -0800460 def _doReconfigureEvent(self, event):
461 # This is called in the scheduler loop after another thread submits
462 # a request
James E. Blaircdccd972013-07-01 12:10:22 -0700463 self.layout_lock.acquire()
James E. Blair468c8512013-12-06 13:27:19 -0800464 self.config = event.config
James E. Blaircdccd972013-07-01 12:10:22 -0700465 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700466 self.log.debug("Performing reconfiguration")
James E. Blair83005782015-12-11 14:46:03 -0800467 loader = configloader.ConfigLoader()
468 abide = loader.loadConfig(
469 self.config.get('zuul', 'tenant_config'),
470 self, self.merger, self.connections)
James E. Blair59fdbac2015-12-07 17:08:06 -0800471 for tenant in abide.tenants.values():
472 self._reconfigureTenant(tenant)
473 self.abide = abide
James E. Blaircdccd972013-07-01 12:10:22 -0700474 finally:
475 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700476
James E. Blair552b54f2016-07-22 13:55:32 -0700477 def _reenqueueTenant(self, old_tenant, tenant):
James E. Blair59fdbac2015-12-07 17:08:06 -0800478 for name, new_pipeline in tenant.layout.pipelines.items():
479 old_pipeline = old_tenant.layout.pipelines.get(name)
480 if not old_pipeline:
481 self.log.warning("No old pipeline matching %s found "
482 "when reconfiguring" % name)
483 continue
484 self.log.debug("Re-enqueueing changes for pipeline %s" % name)
485 items_to_remove = []
486 builds_to_cancel = []
487 last_head = None
488 for shared_queue in old_pipeline.queues:
489 for item in shared_queue.queue:
490 if not item.item_ahead:
491 last_head = item
492 item.item_ahead = None
493 item.items_behind = []
494 item.pipeline = None
495 item.queue = None
496 project_name = item.change.project.name
497 item.change.project = new_pipeline.source.getProject(
498 project_name)
James E. Blair3b5ff3b2016-07-21 10:08:24 -0700499 if new_pipeline.manager.reEnqueueItem(item,
500 last_head):
501 new_jobs = item.getJobs()
502 for build in item.current_build_set.getBuilds():
503 job = item.layout.getJob(build.job.name)
504 if job and job in new_jobs:
505 build.job = job
506 else:
507 item.removeBuild(build)
508 builds_to_cancel.append(build)
509 else:
James E. Blair59fdbac2015-12-07 17:08:06 -0800510 items_to_remove.append(item)
511 for item in items_to_remove:
512 for build in item.current_build_set.getBuilds():
513 builds_to_cancel.append(build)
514 for build in builds_to_cancel:
515 self.log.warning(
516 "Canceling build %s during reconfiguration" % (build,))
517 try:
518 self.launcher.cancel(build)
519 except Exception:
520 self.log.exception(
521 "Exception while canceling build %s "
522 "for change %s" % (build, item.change))
James E. Blair552b54f2016-07-22 13:55:32 -0700523
524 def _reconfigureTenant(self, tenant):
525 # This is called from _doReconfigureEvent while holding the
526 # layout lock
527 old_tenant = self.abide.tenants.get(tenant.name)
528 if old_tenant:
529 self._reenqueueTenant(old_tenant, tenant)
James E. Blair59fdbac2015-12-07 17:08:06 -0800530 # TODOv3(jeblair): update for tenants
James E. Blair552b54f2016-07-22 13:55:32 -0700531 # self.maintainConnectionCache()
James E. Blair59fdbac2015-12-07 17:08:06 -0800532 for pipeline in tenant.layout.pipelines.values():
533 pipeline.source.postConfig()
James E. Blair552b54f2016-07-22 13:55:32 -0700534 for trigger in pipeline.triggers:
535 trigger.postConfig(pipeline)
James E. Blair83005782015-12-11 14:46:03 -0800536 for reporter in pipeline.actions:
537 reporter.postConfig()
James E. Blair552b54f2016-07-22 13:55:32 -0700538 if self.statsd:
James E. Blair59fdbac2015-12-07 17:08:06 -0800539 try:
James E. Blair552b54f2016-07-22 13:55:32 -0700540 for pipeline in tenant.layout.pipelines.values():
James E. Blair59fdbac2015-12-07 17:08:06 -0800541 items = len(pipeline.getAllItems())
542 # stats.gauges.zuul.pipeline.NAME.current_changes
543 key = 'zuul.pipeline.%s' % pipeline.name
James E. Blair552b54f2016-07-22 13:55:32 -0700544 self.statsd.gauge(key + '.current_changes', items)
James E. Blair59fdbac2015-12-07 17:08:06 -0800545 except Exception:
546 self.log.exception("Exception reporting initial "
547 "pipeline stats:")
548
    def _doPromoteEvent(self, event):
        """Move the requested changes to the head of their shared queue.

        Locates the shared change queue containing the first listed change.
        Every item in that queue has its jobs cancelled and is dequeued.
        The items are then re-added in this order: the promoted changes
        (in the order listed), then the remaining items in their previous
        order.  Original enqueue times are kept, reporting is quiet, and
        pipeline requirements are ignored.

        :raises Exception: if the first change is in no shared queue of
            the pipeline, or a listed change is not in that queue.
        """
        # NOTE(review): self.layout is not assigned in the visible __init__
        # (the rest of this class walks self.abide.tenants); confirm this
        # attribute still exists.
        pipeline = self.layout.pipelines[event.pipeline_name]
        # "1234,1" -> ['1234', '1']: (change number, patchset) as strings.
        change_ids = [c.split(',') for c in event.change_ids]
        items_to_enqueue = []
        change_queue = None
        # Find the shared queue that holds the first requested change.
        for shared_queue in pipeline.queues:
            if change_queue:
                break
            for item in shared_queue.queue:
                if (item.change.number == change_ids[0][0] and
                        item.change.patchset == change_ids[0][1]):
                    change_queue = shared_queue
                    break
        if not change_queue:
            raise Exception("Unable to find shared change queue for %s" %
                            event.change_ids[0])
        # Collect the promoted items first, in the requested order.
        for number, patchset in change_ids:
            found = False
            for item in change_queue.queue:
                if (item.change.number == number and
                        item.change.patchset == patchset):
                    found = True
                    items_to_enqueue.append(item)
                    break
            if not found:
                raise Exception("Unable to find %s,%s in queue %s" %
                                (number, patchset, change_queue))
        # Iterate over a copy, since dequeueItem() presumably removes the
        # item from change_queue.queue.  Non-promoted items are appended
        # after the promoted ones, keeping their relative order.
        for item in change_queue.queue[:]:
            if item not in items_to_enqueue:
                items_to_enqueue.append(item)
            pipeline.manager.cancelJobs(item)
            pipeline.manager.dequeueItem(item)
        # Re-add everything in the new order.
        for item in items_to_enqueue:
            pipeline.manager.addChange(
                item.change,
                enqueue_time=item.enqueue_time,
                quiet=True,
                ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800587
James E. Blaird27a96d2014-07-10 13:25:13 -0700588 def _doEnqueueEvent(self, event):
589 project = self.layout.projects.get(event.project_name)
590 pipeline = self.layout.pipelines[event.forced_pipeline]
James E. Blairc0dedf82014-08-06 09:37:52 -0700591 change = pipeline.source.getChange(event, project)
James E. Blaird27a96d2014-07-10 13:25:13 -0700592 self.log.debug("Event %s for change %s was directly assigned "
593 "to pipeline %s" % (event, change, self))
James E. Blair59fdbac2015-12-07 17:08:06 -0800594 self.log.info("Adding %s %s to %s" %
James E. Blaird27a96d2014-07-10 13:25:13 -0700595 (project, change, pipeline))
596 pipeline.manager.addChange(change, ignore_requirements=True)
597
James E. Blaire9d45c32012-05-31 09:56:45 -0700598 def _areAllBuildsComplete(self):
599 self.log.debug("Checking if all builds are complete")
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700600 if self.merger.areMergesOutstanding():
601 self.log.debug("Waiting on merger")
602 return False
James E. Blaire9d45c32012-05-31 09:56:45 -0700603 waiting = False
James E. Blaireff88162013-07-01 12:44:14 -0400604 for pipeline in self.layout.pipelines.values():
James E. Blair6b077942014-02-07 17:45:55 -0800605 for item in pipeline.getAllItems():
606 for build in item.current_build_set.getBuilds():
607 if build.result is None:
608 self.log.debug("%s waiting on %s" %
609 (pipeline.manager, build))
610 waiting = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700611 if not waiting:
612 self.log.debug("All builds are complete")
613 return True
James E. Blaire9d45c32012-05-31 09:56:45 -0700614 return False
615
    def run(self):
        """Main scheduler loop (the threading.Thread entry point).

        Sleeps until ``wake_event`` is set.  Each wake-up, while holding
        ``run_handler_lock``, it:

        - drains the management queue;
        - drains the result queue;
        - drains the trigger queue, unless paused;
        - completes a requested exit once all builds are done;
        - lets every tenant's pipelines process their queues.

        Returns when stop() has set ``_stopped``.
        """
        if self.statsd:
            self.log.debug("Statsd enabled")
        else:
            self.log.debug("Statsd disabled because python statsd "
                           "package not found")
        while True:
            self.log.debug("Run handler sleeping")
            self.wake_event.wait()
            self.wake_event.clear()
            if self._stopped:
                self.log.debug("Run handler stopping")
                return
            self.log.debug("Run handler awake")
            self.run_handler_lock.acquire()
            try:
                # Management requests (reconfigure/promote/enqueue) first.
                while not self.management_event_queue.empty():
                    self.process_management_queue()

                # Give result events priority -- they let us stop builds,
                # whereas trigger events cause us to launch builds.
                while not self.result_event_queue.empty():
                    self.process_result_queue()

                if not self._pause:
                    while not self.trigger_event_queue.empty():
                        self.process_event_queue()

                # After exit(), wait for in-flight builds to finish before
                # _doPauseEvent() saves the queue and exits the process.
                if self._pause and self._areAllBuildsComplete():
                    self._doPauseEvent()

                for tenant in self.abide.tenants.values():
                    for pipeline in tenant.layout.pipelines.values():
                        # Presumably processQueue() returns truthy while it
                        # still has work to do; loop until it settles.
                        while pipeline.manager.processQueue():
                            pass

            except Exception:
                self.log.exception("Exception in run handler:")
                # There may still be more events to process
                self.wake_event.set()
            finally:
                self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700658
Joshua Hesketh4bd7da32016-02-17 20:58:47 +1100659 def maintainConnectionCache(self):
James E. Blair59fdbac2015-12-07 17:08:06 -0800660 # TODOv3(jeblair): update for tenants
James E. Blair0e933c52013-07-11 10:18:52 -0700661 relevant = set()
James E. Blair59fdbac2015-12-07 17:08:06 -0800662 for tenant in self.abide.tenants.values():
663 for pipeline in tenant.layout.pipelines.values():
Joshua Heskethdc7820c2016-03-11 13:14:28 +1100664 self.log.debug("Gather relevant cache items for: %s" %
James E. Blair59fdbac2015-12-07 17:08:06 -0800665 pipeline)
Joshua Heskethdc7820c2016-03-11 13:14:28 +1100666
James E. Blair59fdbac2015-12-07 17:08:06 -0800667 for item in pipeline.getAllItems():
668 relevant.add(item.change)
669 relevant.update(item.change.getRelatedChanges())
Joshua Hesketh4bd7da32016-02-17 20:58:47 +1100670 for connection in self.connections.values():
671 connection.maintainCache(relevant)
672 self.log.debug(
673 "End maintain connection cache for: %s" % connection)
674 self.log.debug("Connection cache size: %s" % len(relevant))
James E. Blair0e933c52013-07-11 10:18:52 -0700675
def process_event_queue(self):
    """Take one trigger event off the queue and offer it to every pipeline.

    For each pipeline of each tenant, the event is resolved to a change
    through that pipeline's source.  Pipelines whose source raises
    ``ChangeNotFound`` are skipped.
    - ``patchset-created`` events first remove older versions of the
      change.
    - ``change-abandoned`` events first remove the abandoned change.
    The change is then added if the pipeline manager reports that the
    event matches.  ``task_done()`` is always called on the trigger
    queue, even if processing raises.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    self.log.debug("Processing trigger event %s" % event)
    try:
        for tenant in self.abide.tenants.values():
            for pipeline in tenant.layout.pipelines.values():
                # Resolve the change even if its project is unknown to
                # us, so the cache can be updated when another change
                # depends on this foreign one.
                try:
                    change = pipeline.source.getChange(event)
                except exceptions.ChangeNotFound as e:
                    self.log.debug(
                        "Unable to get change %s from source %s "
                        "(most likely looking for a change from "
                        "another connection trigger)",
                        e.change, pipeline.source)
                    continue

                manager = pipeline.manager
                if event.type == 'patchset-created':
                    manager.removeOldVersionsOfChange(change)
                elif event.type == 'change-abandoned':
                    manager.removeAbandonedChange(change)

                if not manager.eventMatches(event, change):
                    continue
                self.log.info("Adding %s %s to %s" %
                              (change.project, change, pipeline))
                manager.addChange(change)
    finally:
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700705
def process_management_queue(self):
    """Take one management event off the queue and dispatch it.

    Reconfigure, promote and enqueue events are routed to their
    handlers.  Any other event type is logged as an error.
    - On success, ``event.done()`` is called.
    - If handling raises an ``Exception``, ``event.exception()`` is
      called with the exception info instead of letting it propagate.

    ``task_done()`` is called on the management queue in a ``finally``
    clause, matching the other queue processors.  It therefore runs
    even if ``event.exception()`` itself raises, so the queue's
    unfinished-task count is always decremented.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        try:
            if isinstance(event, ReconfigureEvent):
                self._doReconfigureEvent(event)
            elif isinstance(event, PromoteEvent):
                self._doPromoteEvent(event)
            elif isinstance(event, EnqueueEvent):
                self._doEnqueueEvent(event.trigger_event)
            else:
                self.log.error("Unable to handle event %s" % event)
            event.done()
        except Exception:
            # Hand the failure back to the submitter of the event.
            event.exception(sys.exc_info())
    finally:
        self.management_event_queue.task_done()
723
def process_result_queue(self):
    """Take one result event off the queue and route it to its handler.

    Build-started, build-completed, merge-completed and
    nodes-provisioned events each have a dedicated ``_do*`` handler.
    Events of any other type are logged as errors.  ``task_done()`` is
    always called on the result queue, even if the handler raises.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    self.log.debug("Processing result event %s" % event)
    try:
        # Checked in order; the first isinstance match wins.
        dispatch = (
            (BuildStartedEvent, self._doBuildStartedEvent),
            (BuildCompletedEvent, self._doBuildCompletedEvent),
            (MergeCompletedEvent, self._doMergeCompletedEvent),
            (NodesProvisionedEvent, self._doNodesProvisionedEvent),
        )
        for event_class, handler in dispatch:
            if isinstance(event, event_class):
                handler(event)
                break
        else:
            self.log.error("Unable to handle event %s" % event)
    finally:
        self.result_event_queue.task_done()
741
742 def _doBuildStartedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800743 build = event.build
744 if build.build_set is not build.build_set.item.current_build_set:
745 self.log.warning("Build %s is not in the current build set" %
746 (build,))
747 return
748 pipeline = build.build_set.item.pipeline
749 if not pipeline:
750 self.log.warning("Build %s is not associated with a pipeline" %
751 (build,))
752 return
James E. Blairce8a2132016-05-19 15:21:52 -0700753 try:
754 build.estimated_time = float(self.time_database.getEstimatedTime(
755 build.job.name))
756 except Exception:
757 self.log.exception("Exception estimating build time:")
James E. Blair4076e2b2014-01-28 12:42:20 -0800758 pipeline.manager.onBuildStarted(event.build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800759
760 def _doBuildCompletedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800761 build = event.build
762 if build.build_set is not build.build_set.item.current_build_set:
763 self.log.warning("Build %s is not in the current build set" %
764 (build,))
765 return
766 pipeline = build.build_set.item.pipeline
767 if not pipeline:
768 self.log.warning("Build %s is not associated with a pipeline" %
769 (build,))
770 return
James E. Blairce8a2132016-05-19 15:21:52 -0700771 if build.end_time and build.start_time and build.result:
772 duration = build.end_time - build.start_time
Paul Belanger87e4ab02016-06-08 14:17:20 -0400773 try:
774 self.time_database.update(
775 build.job.name, duration, build.result)
776 except Exception:
777 self.log.exception("Exception recording build time:")
James E. Blair4076e2b2014-01-28 12:42:20 -0800778 pipeline.manager.onBuildCompleted(event.build)
779
780 def _doMergeCompletedEvent(self, event):
781 build_set = event.build_set
782 if build_set is not build_set.item.current_build_set:
783 self.log.warning("Build set %s is not current" % (build_set,))
784 return
785 pipeline = build_set.item.pipeline
786 if not pipeline:
787 self.log.warning("Build set %s is not associated with a pipeline" %
788 (build_set,))
789 return
790 pipeline.manager.onMergeCompleted(event)
James E. Blairee743612012-05-29 14:49:32 -0700791
James E. Blair8d692392016-04-08 17:47:58 -0700792 def _doNodesProvisionedEvent(self, event):
793 request = event.request
794 build_set = request.build_set
795 if build_set is not build_set.item.current_build_set:
796 self.log.warning("Build set %s is not current" % (build_set,))
797 self.nodepool.returnNodes(request.nodes, used=False)
798 return
799 pipeline = build_set.item.pipeline
800 if not pipeline:
801 self.log.warning("Build set %s is not associated with a pipeline" %
802 (build_set,))
803 return
804 pipeline.manager.onNodesProvisioned(event)
805
def formatStatusJSON(self, tenant_name):
    """Serialize scheduler status for one tenant as a JSON string.

    The document contains:
    - the Zuul version;
    - an HTML pause message, when the scheduler is paused;
    - the trigger and result event queue lengths;
    - ``last_reconfigured`` multiplied by 1000 (presumably seconds to
      milliseconds);
    - the status of every pipeline in the tenant.

    :arg str tenant_name: name of the tenant whose pipelines to report.
        If no tenant with that name exists, ``pipelines`` is an empty
        list; the call no longer fails with ``AttributeError``.
    :returns: the status document encoded with ``json.dumps``.
    """
    if self.config.has_option('zuul', 'url_pattern'):
        url_pattern = self.config.get('zuul', 'url_pattern')
    else:
        url_pattern = None

    data = {}

    data['zuul_version'] = self.zuul_version

    if self._pause:
        ret = '<p><b>Queue only mode:</b> preparing to '
        if self._exit:
            ret += 'exit'
        ret += ', queue length: %s' % self.trigger_event_queue.qsize()
        ret += '</p>'
        data['message'] = ret

    data['trigger_event_queue'] = {}
    data['trigger_event_queue']['length'] = \
        self.trigger_event_queue.qsize()
    data['result_event_queue'] = {}
    data['result_event_queue']['length'] = \
        self.result_event_queue.qsize()

    if self.last_reconfigured:
        data['last_reconfigured'] = self.last_reconfigured * 1000

    pipelines = []
    data['pipelines'] = pipelines
    tenant = self.abide.tenants.get(tenant_name)
    # An unknown tenant name yields None here; report no pipelines
    # rather than crashing on ``None.layout``.
    if tenant is not None:
        for pipeline in tenant.layout.pipelines.values():
            pipelines.append(pipeline.formatStatusJSON(url_pattern))
    return json.dumps(data)