blob: 38187cf76538c48e8a337a7895096f3785c55c60 [file] [log] [blame]
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Morgan Fainberg1b9bd782016-05-30 14:03:30 -070023import six
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
James E. Blairee743612012-05-29 14:49:32 -070028
Joshua Hesketh0aa7e8b2016-07-14 00:12:25 +100029from zuul import configloader
Morgan Fainberg9c4700a2016-05-30 14:25:19 -070030from zuul import model
James E. Blair83005782015-12-11 14:46:03 -080031from zuul import exceptions
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040032from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070033
James E. Blair1e8dd892012-05-30 09:15:05 -070034
class MutexHandler(object):
    """Track which (item, job) pair currently holds each named mutex.

    A job opts in by having a truthy ``mutex`` attribute naming the
    mutex.  At most one (item, job name) pair holds a given name at a
    time; the holder is recorded in ``self.mutexes``.
    """

    log = logging.getLogger("zuul.MutexHandler")

    def __init__(self):
        # Maps mutex name -> (item, job name) of the current holder.
        self.mutexes = {}

    def acquire(self, item, job):
        """Try to take the job's mutex on behalf of ``item``.

        :arg item: the queue item requesting the mutex.
        :arg job: object with ``mutex`` and ``name`` attributes.
        :returns: True if the job needs no mutex, already holds it, or
            now holds it; False if another holder still has it.
        """
        if not job.mutex:
            return True
        mutex_name = job.mutex
        holder = self.mutexes.get(mutex_name)
        if not holder:
            # The mutex is not held, acquire it
            self._acquire(mutex_name, item, job.name)
            return True
        held_item, held_job_name = holder
        if held_item is item and held_job_name == job.name:
            # This item already holds the mutex
            return True
        held_build = held_item.current_build_set.getBuild(held_job_name)
        if held_build and held_build.result:
            # The build that held the mutex is complete, release it
            # and let the new item have it.
            self.log.error("Held mutex %s being released because "
                           "the build that holds it is complete" %
                           (mutex_name,))
            # Report the release against the previous holder, not the
            # item that is about to take the mutex over.
            self._release(mutex_name, held_item, held_job_name)
            self._acquire(mutex_name, item, job.name)
            return True
        return False

    def release(self, item, job):
        """Release the job's mutex if ``item``/``job`` is the holder.

        Logs an error (and changes nothing) when the mutex is not held
        at all or is held by a different (item, job name) pair.
        """
        if not job.mutex:
            return
        mutex_name = job.mutex
        holder = self.mutexes.get(mutex_name)
        if not holder:
            # The mutex is not held, nothing to do
            self.log.error("Mutex can not be released for %s "
                           "because the mutex is not held" %
                           (item,))
            return
        held_item, held_job_name = holder
        if held_item is item and held_job_name == job.name:
            # This item holds the mutex
            self._release(mutex_name, item, job.name)
            return
        self.log.error("Mutex can not be released for %s "
                       "which does not hold it" %
                       (item,))

    def _acquire(self, mutex_name, item, job_name):
        """Record (item, job_name) as the holder of ``mutex_name``."""
        self.log.debug("Job %s of item %s acquiring mutex %s" %
                       (job_name, item, mutex_name))
        self.mutexes[mutex_name] = (item, job_name)

    def _release(self, mutex_name, item, job_name):
        """Drop the holder record for ``mutex_name``.

        ``item`` and ``job_name`` are used only for the log message.
        """
        self.log.debug("Job %s of item %s releasing mutex %s" %
                       (job_name, item, mutex_name))
        del self.mutexes[mutex_name]
95
96
class ManagementEvent(object):
    """An event that should be processed within the main queue run loop.

    The submitter blocks in :meth:`wait` until the processor calls
    :meth:`done` or :meth:`exception`.
    """

    def __init__(self):
        # exc_info triple recorded by exception(); None when no failure
        # has been reported.
        self._exc_info = None
        self._wait_event = threading.Event()

    def exception(self, exc_info):
        """Record a failure and wake any waiter.

        :arg exc_info: value later unpacked into ``six.reraise`` by
            :meth:`wait`.
        """
        self._exc_info = exc_info
        self._wait_event.set()

    def done(self):
        """Mark the event as processed and wake any waiter."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event is processed or ``timeout`` elapses.

        Re-raises the recorded exception, if any.

        :returns: whether the internal event is set.
        """
        self._wait_event.wait(timeout)
        failure = self._exc_info
        if failure:
            six.reraise(*failure)
        return self._wait_event.is_set()
115
116
class ReconfigureEvent(ManagementEvent):
    """Ask the scheduler to reconfigure itself.

    The scheduler's ``_doReconfigureEvent`` adopts ``config`` as its
    own and reloads the tenant configuration named by its
    ``[zuul] tenant_config`` option.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        super(ReconfigureEvent, self).__init__()
        self.config = config
126
127
class PromoteEvent(ManagementEvent):
    """Move one or more changes to the head of their queue.

    :arg str tenant_name: the name of the tenant
    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: change ids as strings of the form
        ``1234,1`` (number, then patchset)
    """

    def __init__(self, tenant_name, pipeline_name, change_ids):
        super(PromoteEvent, self).__init__()
        self.change_ids = change_ids
        self.pipeline_name = pipeline_name
        self.tenant_name = tenant_name
142
143
class EnqueueEvent(ManagementEvent):
    """Request that a change be enqueued into a pipeline.

    :arg TriggerEvent trigger_event: a TriggerEvent describing the
        trigger, pipeline, and change to enqueue
    """

    def __init__(self, trigger_event):
        super(EnqueueEvent, self).__init__()
        self.trigger_event = trigger_event
154
155
class ResultEvent(object):
    """Base class for events that need to modify the pipeline state
    because of a result reported by an external system."""
161
162
class BuildStartedEvent(ResultEvent):
    """Result event signalling that a build has started.

    :arg Build build: The build which has started.
    """

    def __init__(self, build):
        self.build = build
171
172
class BuildCompletedEvent(ResultEvent):
    """Result event signalling that a build has completed.

    :arg Build build: The build which has completed.
    """

    def __init__(self, build):
        self.build = build
181
182
class MergeCompletedEvent(ResultEvent):
    """Result event signalling that a remote merge operation finished.

    :arg BuildSet build_set: The build_set which is ready.
    :arg str zuul_url: The URL of the Zuul Merger.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    :arg files: Stored unchanged as ``self.files``; its structure is not
        visible here (presumably file data returned by the merger --
        confirm against the caller).
    """

    def __init__(self, build_set, zuul_url, merged, updated, commit,
                 files):
        self.build_set, self.zuul_url = build_set, zuul_url
        self.merged, self.updated = merged, updated
        self.commit, self.files = commit, files
James E. Blair4076e2b2014-01-28 12:42:20 -0800201
202
class NodesProvisionedEvent(ResultEvent):
    """Result event signalling that a node request has been fulfilled.

    :arg request: The node request; stored unchanged as
        ``self.request``.  The scheduler reads ``request.build_set``
        when it creates this event.
    """

    def __init__(self, request):
        self.request = request
212
213
def toList(item):
    """Coerce ``item`` to a list.

    Any falsy value yields a new empty list, a non-empty list is
    returned as-is (same object), and anything else is wrapped in a
    one-element list.
    """
    if isinstance(item, list) and item:
        return item
    return [item] if item else []
220
221
James E. Blaire9d45c32012-05-31 09:56:45 -0700222class Scheduler(threading.Thread):
James E. Blaire4de4f42017-01-19 10:35:24 -0800223 """The engine of Zuul.
224
225 The Scheduler is responsible for receiving events and dispatching
226 them to appropriate components (including pipeline managers,
227 mergers and launchers).
228
229 It runs a single threaded main loop which processes events
230 received one at a time and takes action as appropriate. Other
231 parts of Zuul may run in their own thread, but synchronization is
232 performed within the scheduler to reduce or eliminate the need for
233 locking in most circumstances.
234
235 The main daemon will have one instance of the Scheduler class
236 running which will persist for the life of the process. The
237 Scheduler instance is supplied to other Zuul components so that
238 they can submit events or otherwise communicate with other
239 components.
240
241 """
242
James E. Blairee743612012-05-29 14:49:32 -0700243 log = logging.getLogger("zuul.Scheduler")
244
def __init__(self, config, testonly=False):
    """Initialise scheduler state; the thread itself is not started.

    :arg config: configuration object, stored as ``self.config`` and
        queried with ``has_option``/``get`` by other methods.
    :arg bool testonly: when true, the time database (and its
        directory on disk) is not created, so ``self.time_database``
        is left unset.
    """
    threading.Thread.__init__(self)
    self.daemon = True

    # Run-loop control flags.
    self._pause = False
    self._exit = False
    self._stopped = False

    # Synchronisation primitives.
    self.wake_event = threading.Event()
    self.layout_lock = threading.Lock()
    self.run_handler_lock = threading.Lock()

    # Collaborators, supplied later through the set*/register* methods.
    self.launcher = None
    self.merger = None
    self.connections = None

    self.statsd = extras.try_import('statsd.statsd')
    # TODO(jeblair): fix this
    self.mutex = MutexHandler()
    # Despite triggers being part of the pipeline, there is one trigger set
    # per scheduler. The pipeline handles the trigger filters but since
    # the events are handled by the scheduler itself it needs to handle
    # the loading of the triggers.
    # self.triggers['connection_name'] = triggerObject
    self.triggers = {}
    self.config = config

    # One queue per event family; all are drained by run().
    self.trigger_event_queue = Queue.Queue()
    self.result_event_queue = Queue.Queue()
    self.management_event_queue = Queue.Queue()
    self.abide = model.Abide()

    if not testonly:
        # Needs self.config, which is assigned above.
        self.time_database = model.TimeDataBase(
            self._get_time_database_dir())

    self.zuul_version = zuul_version.version_info.release_string()
    self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400279
def stop(self):
    """Ask the run loop to terminate.

    Sets the stopped flag, stops the connections, then wakes the run
    loop so that it notices the flag and returns.
    """
    self._stopped = True
    self.stopConnections()
    # Wake the loop last so it observes _stopped.
    self.wake_event.set()
284
Joshua Hesketh352264b2015-08-11 23:42:08 +1000285 def testConfig(self, config_path, connections):
286 # Take the list of set up connections directly here rather than with
287 # registerConnections as we don't want to do the onLoad event yet.
288 return self._parseConfig(config_path, connections)
James E. Blair47958382013-01-10 17:26:02 -0800289
def registerConnections(self, connections, load=True):
    """Adopt ``connections`` and register this scheduler with them.

    :arg connections: object providing ``registerScheduler``.
    :arg bool load: whether or not to trigger the onLoad for the
        connection.  Passing False is useful for not doing a full
        load during layout validation.
    """
    self.connections = connections
    connections.registerScheduler(self, load)
Joshua Hesketh352264b2015-08-11 23:42:08 +1000295
def stopConnections(self):
    """Stop the registered connections, if any.

    ``self.connections`` stays None until registerConnections() has
    been called; in that case there is nothing to stop.  Without this
    guard, stop() would raise before it could wake the run loop.
    """
    if self.connections is not None:
        self.connections.stop()
James E. Blair14abdf42015-12-09 16:11:53 -0800298
def setLauncher(self, launcher):
    """Store the launcher; it is used to cancel builds on reconfigure."""
    self.launcher = launcher
301
def setMerger(self, merger):
    """Store the merger handed to the config loader and polled for
    outstanding merges."""
    self.merger = merger
304
def setNodepool(self, nodepool):
    """Store the nodepool object as ``self.nodepool``.

    Note: this attribute is not initialised in ``__init__``; it only
    exists after this method has been called.
    """
    self.nodepool = nodepool
307
def setZooKeeper(self, zk):
    """Store the ZooKeeper object as ``self.zk``.

    Note: this attribute is not initialised in ``__init__``; it only
    exists after this method has been called.
    """
    self.zk = zk
310
def addEvent(self, event):
    """Queue a trigger event and wake the run loop.

    A statsd counter keyed on ``event.type`` is incremented when
    statsd is available.  Stats reporting is best-effort: ordinary
    errors are logged and the event is still queued.

    :arg event: trigger event; must expose a ``type`` attribute when
        statsd is enabled.
    """
    self.log.debug("Adding trigger event: %s" % event)
    try:
        if self.statsd:
            self.statsd.incr('gerrit.event.%s' % event.type)
    except Exception:
        # Only swallow ordinary errors; KeyboardInterrupt and
        # SystemExit must keep propagating.
        self.log.exception("Exception reporting event stats")
    self.trigger_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700321
def onBuildStarted(self, build):
    """Record the build's start time and queue a BuildStartedEvent.

    :arg build: the build that started; its ``start_time`` attribute
        is set to the current time.
    """
    self.log.debug("Adding start event for build: %s" % build)
    build.start_time = time.time()
    self.result_event_queue.put(BuildStartedEvent(build))
    self.wake_event.set()
    self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700329
def onBuildCompleted(self, build, result):
    """Record a build's result, report stats and queue the event.

    :arg build: the completed build; ``end_time`` and ``result`` are
        set on it.
    :arg result: the result value to store on the build.

    Stats reporting is best-effort: ordinary errors are logged and the
    BuildCompletedEvent is queued regardless.
    """
    self.log.debug("Adding complete event for build: %s result: %s" % (
        build, result))
    build.end_time = time.time()
    # Note, as soon as the result is set, other threads may act
    # upon this, even though the event hasn't been fully
    # processed.  Ensure that any other data from the event (eg,
    # timing) is recorded before setting the result.
    build.result = result
    try:
        if self.statsd and build.pipeline:
            jobname = build.job.name.replace('.', '_')
            key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
            self.statsd.incr(key)
            # A wait time only exists for builds that actually
            # started; start_time is falsy otherwise and the
            # subtraction would raise, dropping the stats below.
            if build.start_time:
                for label in build.node_labels:
                    # Jenkins includes the node name in its list of
                    # labels, so we filter it out here, since that is
                    # not statistically interesting.
                    if label == build.node_name:
                        continue
                    dt = int((build.start_time - build.launch_time) * 1000)
                    key = 'zuul.pipeline.%s.label.%s.wait_time' % (
                        build.pipeline.name, label)
                    self.statsd.timing(key, dt)
            key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
                                                  jobname, build.result)
            if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                dt = int((build.end_time - build.start_time) * 1000)
                self.statsd.timing(key, dt)
            self.statsd.incr(key)

            if build.start_time:
                key = 'zuul.pipeline.%s.job.%s.wait_time' % (
                    build.pipeline.name, jobname)
                dt = int((build.start_time - build.launch_time) * 1000)
                self.statsd.timing(key, dt)
    except Exception:
        # Only swallow ordinary errors; KeyboardInterrupt and
        # SystemExit must keep propagating.
        self.log.exception("Exception reporting runtime stats")
    event = BuildCompletedEvent(build)
    self.result_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700371
def onMergeCompleted(self, build_set, zuul_url, merged, updated,
                     commit, files):
    """Queue a MergeCompletedEvent and wake the run loop.

    All arguments are forwarded unchanged to MergeCompletedEvent.
    """
    self.log.debug("Adding merge complete event for build set: %s" %
                   build_set)
    self.result_event_queue.put(
        MergeCompletedEvent(build_set, zuul_url, merged,
                            updated, commit, files))
    self.wake_event.set()
380
def onNodesProvisioned(self, req):
    """Queue a NodesProvisionedEvent for ``req`` and wake the run loop.

    :arg req: the node request; its ``build_set`` attribute is read
        for the log message.
    """
    self.log.debug("Adding nodes provisioned event for build set: %s" %
                   req.build_set)
    self.result_event_queue.put(NodesProvisionedEvent(req))
    self.wake_event.set()
387
def reconfigure(self, config):
    """Submit a reconfiguration request and block until it is handled.

    :arg config: the new configuration, carried by a ReconfigureEvent.

    ``event.wait()`` is called without a timeout and re-raises any
    exception recorded on the event.  On success the completion time
    is stored in ``self.last_reconfigured`` as integer seconds.
    """
    self.log.debug("Prepare to reconfigure")
    request = ReconfigureEvent(config)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for reconfiguration")
    request.wait()
    self.log.debug("Reconfiguration complete")
    self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700397
def promote(self, tenant_name, pipeline_name, change_ids):
    """Submit a promotion request and block until it is handled.

    The arguments are forwarded to PromoteEvent.  ``event.wait()`` is
    called without a timeout and re-raises any exception recorded on
    the event.
    """
    request = PromoteEvent(tenant_name, pipeline_name, change_ids)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for promotion")
    request.wait()
    self.log.debug("Promotion complete")
405
def enqueue(self, trigger_event):
    """Submit an enqueue request and block until it is handled.

    :arg trigger_event: wrapped in an EnqueueEvent.

    ``event.wait()`` is called without a timeout and re-raises any
    exception recorded on the event.
    """
    request = EnqueueEvent(trigger_event)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for enqueue")
    request.wait()
    self.log.debug("Enqueue complete")
413
def exit(self):
    """Request a graceful exit.

    Pauses trigger-event processing and flags the scheduler to exit;
    the run loop performs the actual exit once all builds are
    complete.  This method returns immediately.
    """
    self.log.debug("Prepare to exit")
    # _pause is set before _exit, in the same order as the run loop
    # reads them.
    self._pause = True
    self._exit = True
    self.wake_event.set()
    self.log.debug("Waiting for exit")
420
421 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700422 if self.config.has_option('zuul', 'state_dir'):
423 state_dir = os.path.expanduser(self.config.get('zuul',
424 'state_dir'))
425 else:
426 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700427 return os.path.join(state_dir, 'queue.pickle')
428
James E. Blairce8a2132016-05-19 15:21:52 -0700429 def _get_time_database_dir(self):
430 if self.config.has_option('zuul', 'state_dir'):
431 state_dir = os.path.expanduser(self.config.get('zuul',
432 'state_dir'))
433 else:
434 state_dir = '/var/lib/zuul'
435 d = os.path.join(state_dir, 'times')
436 if not os.path.exists(d):
437 os.mkdir(d)
438 return d
439
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700440 def _save_queue(self):
441 pickle_file = self._get_queue_pickle_file()
442 events = []
443 while not self.trigger_event_queue.empty():
444 events.append(self.trigger_event_queue.get())
445 self.log.debug("Queue length is %s" % len(events))
446 if events:
447 self.log.debug("Saving queue")
448 pickle.dump(events, open(pickle_file, 'wb'))
449
450 def _load_queue(self):
451 pickle_file = self._get_queue_pickle_file()
452 if os.path.exists(pickle_file):
453 self.log.debug("Loading queue")
454 events = pickle.load(open(pickle_file, 'rb'))
455 self.log.debug("Queue length is %s" % len(events))
456 for event in events:
457 self.trigger_event_queue.put(event)
458 else:
459 self.log.debug("No queue file found")
460
461 def _delete_queue(self):
462 pickle_file = self._get_queue_pickle_file()
463 if os.path.exists(pickle_file):
464 self.log.debug("Deleting saved queue")
465 os.unlink(pickle_file)
466
def resume(self):
    """Restore any saved trigger queue and wake the run loop.

    Loading and deleting the saved queue are each best-effort:
    ordinary errors are logged and processing resumes regardless.
    """
    try:
        self._load_queue()
    except Exception:
        # Only swallow ordinary errors; KeyboardInterrupt and
        # SystemExit must keep propagating.
        self.log.exception("Unable to load queue")
    try:
        self._delete_queue()
    except Exception:
        self.log.exception("Unable to delete saved queue")
    self.log.debug("Resuming queue processing")
    self.wake_event.set()
478
479 def _doPauseEvent(self):
480 if self._exit:
481 self.log.debug("Exiting")
482 self._save_queue()
483 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700484
James E. Blair468c8512013-12-06 13:27:19 -0800485 def _doReconfigureEvent(self, event):
486 # This is called in the scheduler loop after another thread submits
487 # a request
James E. Blaircdccd972013-07-01 12:10:22 -0700488 self.layout_lock.acquire()
James E. Blair468c8512013-12-06 13:27:19 -0800489 self.config = event.config
James E. Blaircdccd972013-07-01 12:10:22 -0700490 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700491 self.log.debug("Performing reconfiguration")
James E. Blair83005782015-12-11 14:46:03 -0800492 loader = configloader.ConfigLoader()
493 abide = loader.loadConfig(
494 self.config.get('zuul', 'tenant_config'),
495 self, self.merger, self.connections)
James E. Blair59fdbac2015-12-07 17:08:06 -0800496 for tenant in abide.tenants.values():
497 self._reconfigureTenant(tenant)
498 self.abide = abide
James E. Blaircdccd972013-07-01 12:10:22 -0700499 finally:
500 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700501
James E. Blair552b54f2016-07-22 13:55:32 -0700502 def _reenqueueTenant(self, old_tenant, tenant):
James E. Blair59fdbac2015-12-07 17:08:06 -0800503 for name, new_pipeline in tenant.layout.pipelines.items():
504 old_pipeline = old_tenant.layout.pipelines.get(name)
505 if not old_pipeline:
506 self.log.warning("No old pipeline matching %s found "
507 "when reconfiguring" % name)
508 continue
509 self.log.debug("Re-enqueueing changes for pipeline %s" % name)
510 items_to_remove = []
511 builds_to_cancel = []
512 last_head = None
513 for shared_queue in old_pipeline.queues:
514 for item in shared_queue.queue:
515 if not item.item_ahead:
516 last_head = item
517 item.item_ahead = None
518 item.items_behind = []
519 item.pipeline = None
520 item.queue = None
521 project_name = item.change.project.name
522 item.change.project = new_pipeline.source.getProject(
523 project_name)
James E. Blair3b5ff3b2016-07-21 10:08:24 -0700524 if new_pipeline.manager.reEnqueueItem(item,
525 last_head):
526 new_jobs = item.getJobs()
527 for build in item.current_build_set.getBuilds():
Paul Belangere22baea2016-11-03 16:59:27 -0400528 jobtree = item.job_tree.getJobTreeForJob(build.job)
529 if jobtree and jobtree.job in new_jobs:
530 build.job = jobtree.job
James E. Blair3b5ff3b2016-07-21 10:08:24 -0700531 else:
532 item.removeBuild(build)
533 builds_to_cancel.append(build)
534 else:
James E. Blair59fdbac2015-12-07 17:08:06 -0800535 items_to_remove.append(item)
536 for item in items_to_remove:
537 for build in item.current_build_set.getBuilds():
538 builds_to_cancel.append(build)
539 for build in builds_to_cancel:
540 self.log.warning(
541 "Canceling build %s during reconfiguration" % (build,))
542 try:
543 self.launcher.cancel(build)
544 except Exception:
545 self.log.exception(
546 "Exception while canceling build %s "
547 "for change %s" % (build, item.change))
James E. Blair552b54f2016-07-22 13:55:32 -0700548
549 def _reconfigureTenant(self, tenant):
550 # This is called from _doReconfigureEvent while holding the
551 # layout lock
552 old_tenant = self.abide.tenants.get(tenant.name)
553 if old_tenant:
554 self._reenqueueTenant(old_tenant, tenant)
James E. Blair59fdbac2015-12-07 17:08:06 -0800555 # TODOv3(jeblair): update for tenants
James E. Blair552b54f2016-07-22 13:55:32 -0700556 # self.maintainConnectionCache()
James E. Blaire511d2f2016-12-08 15:22:26 -0800557 self.connections.reconfigureDrivers(tenant)
558 # TODOv3(jeblair): remove postconfig calls?
James E. Blair59fdbac2015-12-07 17:08:06 -0800559 for pipeline in tenant.layout.pipelines.values():
560 pipeline.source.postConfig()
James E. Blair552b54f2016-07-22 13:55:32 -0700561 for trigger in pipeline.triggers:
562 trigger.postConfig(pipeline)
James E. Blair83005782015-12-11 14:46:03 -0800563 for reporter in pipeline.actions:
564 reporter.postConfig()
James E. Blair552b54f2016-07-22 13:55:32 -0700565 if self.statsd:
James E. Blair59fdbac2015-12-07 17:08:06 -0800566 try:
James E. Blair552b54f2016-07-22 13:55:32 -0700567 for pipeline in tenant.layout.pipelines.values():
James E. Blair59fdbac2015-12-07 17:08:06 -0800568 items = len(pipeline.getAllItems())
569 # stats.gauges.zuul.pipeline.NAME.current_changes
570 key = 'zuul.pipeline.%s' % pipeline.name
James E. Blair552b54f2016-07-22 13:55:32 -0700571 self.statsd.gauge(key + '.current_changes', items)
James E. Blair59fdbac2015-12-07 17:08:06 -0800572 except Exception:
573 self.log.exception("Exception reporting initial "
574 "pipeline stats:")
575
James E. Blair36658cf2013-12-06 17:53:48 -0800576 def _doPromoteEvent(self, event):
Paul Belangerbaca3132016-11-04 12:49:54 -0400577 tenant = self.abide.tenants.get(event.tenant_name)
578 pipeline = tenant.layout.pipelines[event.pipeline_name]
James E. Blair36658cf2013-12-06 17:53:48 -0800579 change_ids = [c.split(',') for c in event.change_ids]
580 items_to_enqueue = []
581 change_queue = None
582 for shared_queue in pipeline.queues:
583 if change_queue:
584 break
585 for item in shared_queue.queue:
586 if (item.change.number == change_ids[0][0] and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000587 item.change.patchset == change_ids[0][1]):
James E. Blair36658cf2013-12-06 17:53:48 -0800588 change_queue = shared_queue
589 break
590 if not change_queue:
591 raise Exception("Unable to find shared change queue for %s" %
592 event.change_ids[0])
593 for number, patchset in change_ids:
594 found = False
595 for item in change_queue.queue:
596 if (item.change.number == number and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000597 item.change.patchset == patchset):
James E. Blair36658cf2013-12-06 17:53:48 -0800598 found = True
599 items_to_enqueue.append(item)
600 break
601 if not found:
602 raise Exception("Unable to find %s,%s in queue %s" %
603 (number, patchset, change_queue))
604 for item in change_queue.queue[:]:
605 if item not in items_to_enqueue:
606 items_to_enqueue.append(item)
607 pipeline.manager.cancelJobs(item)
608 pipeline.manager.dequeueItem(item)
609 for item in items_to_enqueue:
Sean Daguef39b9ca2014-01-10 21:34:35 -0500610 pipeline.manager.addChange(
611 item.change,
612 enqueue_time=item.enqueue_time,
James E. Blairf9ab8842014-07-10 13:12:07 -0700613 quiet=True,
614 ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800615
James E. Blaird27a96d2014-07-10 13:25:13 -0700616 def _doEnqueueEvent(self, event):
Paul Belangerbaca3132016-11-04 12:49:54 -0400617 tenant = self.abide.tenants.get(event.tenant_name)
618 project = tenant.layout.project_configs.get(event.project_name)
619 pipeline = tenant.layout.pipelines[event.forced_pipeline]
James E. Blairc0dedf82014-08-06 09:37:52 -0700620 change = pipeline.source.getChange(event, project)
James E. Blaird27a96d2014-07-10 13:25:13 -0700621 self.log.debug("Event %s for change %s was directly assigned "
622 "to pipeline %s" % (event, change, self))
James E. Blair59fdbac2015-12-07 17:08:06 -0800623 self.log.info("Adding %s %s to %s" %
James E. Blaird27a96d2014-07-10 13:25:13 -0700624 (project, change, pipeline))
625 pipeline.manager.addChange(change, ignore_requirements=True)
626
James E. Blaire9d45c32012-05-31 09:56:45 -0700627 def _areAllBuildsComplete(self):
628 self.log.debug("Checking if all builds are complete")
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700629 if self.merger.areMergesOutstanding():
630 self.log.debug("Waiting on merger")
631 return False
James E. Blaire9d45c32012-05-31 09:56:45 -0700632 waiting = False
Paul Belangerdebd7a72016-11-11 19:56:15 -0500633 for tenant in self.abide.tenants.values():
634 for pipeline in tenant.layout.pipelines.values():
635 for item in pipeline.getAllItems():
636 for build in item.current_build_set.getBuilds():
637 if build.result is None:
638 self.log.debug("%s waiting on %s" %
639 (pipeline.manager, build))
640 waiting = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700641 if not waiting:
642 self.log.debug("All builds are complete")
643 return True
James E. Blaire9d45c32012-05-31 09:56:45 -0700644 return False
645
def run(self):
    """Main loop: sleep until woken, then drain the event queues.

    Each pass processes management events, then result events, then
    (unless paused) trigger events, and finally lets every pipeline
    manager process its queue until it reports no further work.  The
    loop returns once the stopped flag is set.
    """
    if self.statsd:
        self.log.debug("Statsd enabled")
    else:
        self.log.debug("Statsd disabled because python statsd "
                       "package not found")
    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            self.log.debug("Run handler stopping")
            return
        self.log.debug("Run handler awake")
        with self.run_handler_lock:
            try:
                while not self.management_event_queue.empty():
                    self.process_management_queue()

                # Give result events priority -- they let us stop builds,
                # whereas trigger events cause us to launch builds.
                while not self.result_event_queue.empty():
                    self.process_result_queue()

                if not self._pause:
                    while not self.trigger_event_queue.empty():
                        self.process_event_queue()

                if self._pause and self._areAllBuildsComplete():
                    self._doPauseEvent()

                for tenant in self.abide.tenants.values():
                    for pipeline in tenant.layout.pipelines.values():
                        # Keep going for as long as the manager
                        # reports it did something.
                        while pipeline.manager.processQueue():
                            pass

            except Exception:
                self.log.exception("Exception in run handler:")
                # There may still be more events to process
                self.wake_event.set()
James E. Blairee743612012-05-29 14:49:32 -0700688
def maintainConnectionCache(self):
    """Tell every connection which changes are still in use.

    Collects the change of every item in every pipeline of every
    tenant, together with the changes each of those reports as
    related, and hands the resulting set to ``maintainCache`` on each
    connection.
    """
    # TODOv3(jeblair): update for tenants
    in_use = set()
    for tenant in self.abide.tenants.values():
        for pipeline in tenant.layout.pipelines.values():
            self.log.debug("Gather relevant cache items for: %s" %
                           pipeline)

            for queue_item in pipeline.getAllItems():
                change = queue_item.change
                in_use.add(change)
                in_use.update(change.getRelatedChanges())

    # Every connection is given the same set of changes.
    for connection in self.connections.values():
        connection.maintainCache(in_use)
        self.log.debug(
            "End maintain connection cache for: %s" % connection)

    self.log.debug("Connection cache size: %s" % len(in_use))
James E. Blair0e933c52013-07-11 10:18:52 -0700705
def process_event_queue(self):
    """Take one trigger event off the queue and offer it to every pipeline.

    For each pipeline of each tenant the change for the event is
    fetched from the pipeline's source.  Pipelines whose source raises
    ``ChangeNotFound`` are skipped.  Otherwise older versions of the
    change (on ``patchset-created``) or the abandoned change (on
    ``change-abandoned``) are removed, and the change is added to the
    pipeline if the manager reports that the event matches.

    The queue's ``task_done`` is always called, even if processing
    raises.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    self.log.debug("Processing trigger event %s" % event)
    try:
        every_pipeline = (
            candidate
            for tenant in self.abide.tenants.values()
            for candidate in tenant.layout.pipelines.values())
        for pipeline in every_pipeline:
            # Get the change even if the project is unknown to
            # us for the use of updating the cache if there is
            # another change depending on this foreign one.
            try:
                change = pipeline.source.getChange(event)
            except exceptions.ChangeNotFound as e:
                self.log.debug("Unable to get change %s from "
                               "source %s (most likely looking "
                               "for a change from another "
                               "connection trigger)",
                               e.change, pipeline.source)
                continue

            manager = pipeline.manager
            kind = event.type
            if kind == 'patchset-created':
                manager.removeOldVersionsOfChange(change)
            elif kind == 'change-abandoned':
                manager.removeAbandonedChange(change)

            if not manager.eventMatches(event, change):
                continue
            self.log.info("Adding %s %s to %s" %
                          (change.project, change, pipeline))
            manager.addChange(change)
    finally:
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700735
def process_management_queue(self):
    """Take one management event off the queue and dispatch it.

    Reconfigure and promote events are passed to their handlers as-is;
    for an enqueue event the wrapped ``trigger_event`` is passed
    instead.  An event of any other type is logged as an error.  When
    no exception occurred the event's ``done()`` is called; otherwise
    the exception information is handed to the event through
    ``exception()``.

    The queue's ``task_done`` is called on every exit path, matching
    the other queue processors, so the queue's count of unfinished
    tasks is decremented even when reporting the failure to the event
    itself fails.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        if isinstance(event, ReconfigureEvent):
            self._doReconfigureEvent(event)
        elif isinstance(event, PromoteEvent):
            self._doPromoteEvent(event)
        elif isinstance(event, EnqueueEvent):
            self._doEnqueueEvent(event.trigger_event)
        else:
            self.log.error("Unable to handle event %s" % event)
        event.done()
    except Exception:
        # Hand the failure to the event rather than raising it here.
        event.exception(sys.exc_info())
    finally:
        self.management_event_queue.task_done()
753
def process_result_queue(self):
    """Take one result event off the queue and dispatch it by type.

    Build-started, build-completed, merge-completed and
    nodes-provisioned events are passed to their respective handlers;
    an event of any other type is logged as an error.  The queue's
    ``task_done`` is always called, even if the handler raises.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    self.log.debug("Processing result event %s" % event)
    try:
        # Select the handler first, then invoke it in one place.
        if isinstance(event, BuildStartedEvent):
            handler = self._doBuildStartedEvent
        elif isinstance(event, BuildCompletedEvent):
            handler = self._doBuildCompletedEvent
        elif isinstance(event, MergeCompletedEvent):
            handler = self._doMergeCompletedEvent
        elif isinstance(event, NodesProvisionedEvent):
            handler = self._doNodesProvisionedEvent
        else:
            handler = None

        if handler is None:
            self.log.error("Unable to handle event %s" % event)
        else:
            handler(event)
    finally:
        self.result_event_queue.task_done()
771
def _doBuildStartedEvent(self, event):
    """Record a time estimate on a started build and notify its pipeline.

    A warning is logged and nothing else happens when the build's
    build set is not the current build set of its item, or when the
    item has no pipeline.  A failure to obtain the estimate from the
    time database is logged and does not prevent the pipeline manager
    from being notified.
    """
    build = event.build
    build_set = build.build_set

    if build_set is not build_set.item.current_build_set:
        self.log.warning("Build %s is not in the current build set" %
                         (build,))
        return

    pipeline = build_set.item.pipeline
    if not pipeline:
        self.log.warning("Build %s is not associated with a pipeline" %
                         (build,))
        return

    try:
        estimate = self.time_database.getEstimatedTime(build.job.name)
        build.estimated_time = float(estimate)
    except Exception:
        self.log.exception("Exception estimating build time:")

    pipeline.manager.onBuildStarted(build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800789
def _doBuildCompletedEvent(self, event):
    """Return a finished build's nodes and notify its pipeline manager.

    The build's node set is looked up and returned to nodepool first,
    on a best-effort basis: any failure in doing so is logged and
    processing continues.  A message is then logged and nothing else
    happens when the build's build set is not the current build set of
    its item, or when the item has no pipeline.  Otherwise, if the
    build has an end time, a start time and a result, its duration is
    recorded in the time database (failures are logged), and the
    pipeline manager is told that the build completed.
    """
    build = event.build

    # Regardless of any other conditions which might cause us not
    # to pass this on to the pipeline manager, make sure we return
    # the nodes to nodepool.
    #
    # The name is bound before the try block so that the handler can
    # still format its message when the lookup itself is what failed;
    # otherwise the handler would raise UnboundLocalError.
    nodeset = None
    try:
        nodeset = build.build_set.getJobNodeSet(build.job.name)
        self.nodepool.returnNodeSet(nodeset)
    except Exception:
        self.log.exception("Unable to return nodeset %s" % (nodeset,))

    if build.build_set is not build.build_set.item.current_build_set:
        self.log.debug("Build %s is not in the current build set" %
                       (build,))
        return
    pipeline = build.build_set.item.pipeline
    if not pipeline:
        self.log.warning("Build %s is not associated with a pipeline" %
                         (build,))
        return
    if build.end_time and build.start_time and build.result:
        duration = build.end_time - build.start_time
        try:
            self.time_database.update(
                build.job.name, duration, build.result)
        except Exception:
            self.log.exception("Exception recording build time:")
    pipeline.manager.onBuildCompleted(event.build)
819
def _doMergeCompletedEvent(self, event):
    """Pass a merge-completed event on to the owning pipeline manager.

    A warning is logged and the event is dropped when its build set is
    not the current build set of its item, or when the item has no
    pipeline.
    """
    build_set = event.build_set

    if build_set.item.current_build_set is not build_set:
        self.log.warning("Build set %s is not current" % (build_set,))
        return

    pipeline = build_set.item.pipeline
    if pipeline:
        pipeline.manager.onMergeCompleted(event)
        return

    self.log.warning("Build set %s is not associated with a pipeline" %
                     (build_set,))
James E. Blairee743612012-05-29 14:49:32 -0700831
def _doNodesProvisionedEvent(self, event):
    """Accept the nodes of a node request and notify the pipeline manager.

    The request is always passed to nodepool's ``acceptNodes`` first.
    When the request's build set is not the current build set of its
    item, or the item has no pipeline, a warning is logged, the node
    set is returned to nodepool if the request was fulfilled, and the
    event is dropped.
    """
    request = event.request
    build_set = request.build_set

    def give_back_nodes():
        # Only a fulfilled request has its node set returned.
        if request.fulfilled:
            self.nodepool.returnNodeSet(request.nodeset)

    self.nodepool.acceptNodes(request)

    if build_set is not build_set.item.current_build_set:
        self.log.warning("Build set %s is not current" % (build_set,))
        give_back_nodes()
        return

    pipeline = build_set.item.pipeline
    if not pipeline:
        self.log.warning("Build set %s is not associated with a pipeline" %
                         (build_set,))
        give_back_nodes()
        return

    pipeline.manager.onNodesProvisioned(event)
851
def formatStatusJSON(self, tenant_name):
    """Return the scheduler status for one tenant as a JSON string.

    The document holds the Zuul version, the lengths of the trigger
    and result event queues, the last reconfiguration time multiplied
    by 1000 (only when it is set), a status message (only while
    paused), and one entry per pipeline of the tenant named
    ``tenant_name`` as produced by that pipeline's own
    ``formatStatusJSON``.  Each pipeline is given the ``url_pattern``
    option of the ``zuul`` configuration section, or ``None`` when the
    option is absent.
    """
    # TODOv3(jeblair): use tenants
    url_pattern = None
    if self.config.has_option('zuul', 'url_pattern'):
        url_pattern = self.config.get('zuul', 'url_pattern')

    status = {'zuul_version': self.zuul_version}

    if self._pause:
        fragments = ['<p><b>Queue only mode:</b> preparing to ']
        if self._exit:
            fragments.append('exit')
        fragments.append(
            ', queue length: %s' % self.trigger_event_queue.qsize())
        fragments.append('</p>')
        status['message'] = ''.join(fragments)

    status['trigger_event_queue'] = {
        'length': self.trigger_event_queue.qsize(),
    }
    status['result_event_queue'] = {
        'length': self.result_event_queue.qsize(),
    }

    if self.last_reconfigured:
        status['last_reconfigured'] = self.last_reconfigured * 1000

    tenant = self.abide.tenants.get(tenant_name)
    status['pipelines'] = [
        pipeline.formatStatusJSON(url_pattern)
        for pipeline in tenant.layout.pipelines.values()
    ]
    return json.dumps(status)