blob: 60738ec3e10f8be4364fe091cf3ad3f234021edf [file] [log] [blame]
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair71e94122012-12-24 17:53:08 -080018import extras
James E. Blair8dbd56a2012-12-22 10:55:10 -080019import json
James E. Blairee743612012-05-29 14:49:32 -070020import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080021import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070022import pickle
Morgan Fainberg1b9bd782016-05-30 14:03:30 -070023import six
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
James E. Blair36658cf2013-12-06 17:53:48 -080025import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080026import threading
James E. Blair71e94122012-12-24 17:53:08 -080027import time
James E. Blairee743612012-05-29 14:49:32 -070028
Joshua Hesketh0aa7e8b2016-07-14 00:12:25 +100029from zuul import configloader
Morgan Fainberg9c4700a2016-05-30 14:25:19 -070030from zuul import model
James E. Blair83005782015-12-11 14:46:03 -080031from zuul import exceptions
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040032from zuul import version as zuul_version
James E. Blairee743612012-05-29 14:49:32 -070033
James E. Blair1e8dd892012-05-30 09:15:05 -070034
class MutexHandler(object):
    """Track which (item, job) pair currently holds each named mutex.

    ``mutexes`` maps a mutex name to an ``(item, job_name)`` tuple
    describing the current holder.  Jobs whose ``mutex`` attribute is
    falsy never need a mutex and are always allowed to proceed.
    """
    log = logging.getLogger("zuul.MutexHandler")

    def __init__(self):
        self.mutexes = {}

    def acquire(self, item, job):
        """Try to take the mutex required by ``job`` on behalf of ``item``.

        :arg item: the queue item that wants to run ``job``
        :arg job: the job; ``job.mutex`` names the mutex (falsy for none)
        :returns: True if the job may run (no mutex is needed, it was
            free, it is already held by this item/job, or its previous
            holder's build has a result); False otherwise.
        """
        if not job.mutex:
            return True
        mutex_name = job.mutex
        m = self.mutexes.get(mutex_name)
        if not m:
            # The mutex is not held, acquire it
            self._acquire(mutex_name, item, job.name)
            return True
        held_item, held_job_name = m
        if held_item is item and held_job_name == job.name:
            # This item already holds the mutex
            return True
        held_build = held_item.current_build_set.getBuild(held_job_name)
        if held_build and held_build.result:
            # The build that held the mutex is complete, release it
            # and let the new item have it.
            self.log.error("Held mutex %s being released because "
                           "the build that holds it is complete" %
                           (mutex_name,))
            # Log the release against the actual holder, not the
            # item that is about to take the mutex over.
            self._release(mutex_name, held_item, held_job_name)
            self._acquire(mutex_name, item, job.name)
            return True
        return False

    def release(self, item, job):
        """Release the mutex of ``job`` if ``item`` is its holder.

        An error is logged (and nothing is changed) when the mutex is
        not held at all or is held by a different item/job.
        """
        if not job.mutex:
            return
        mutex_name = job.mutex
        m = self.mutexes.get(mutex_name)
        if not m:
            # The mutex is not held, nothing to do
            self.log.error("Mutex can not be released for %s "
                           "because the mutex is not held" %
                           (item,))
            return
        held_item, held_job_name = m
        if held_item is item and held_job_name == job.name:
            # This item holds the mutex
            self._release(mutex_name, item, job.name)
            return
        self.log.error("Mutex can not be released for %s "
                       "which does not hold it" %
                       (item,))

    def _acquire(self, mutex_name, item, job_name):
        """Record ``(item, job_name)`` as the holder of ``mutex_name``."""
        self.log.debug("Job %s of item %s acquiring mutex %s" %
                       (job_name, item, mutex_name))
        self.mutexes[mutex_name] = (item, job_name)

    def _release(self, mutex_name, item, job_name):
        """Forget the holder of ``mutex_name``; item/job are only logged."""
        self.log.debug("Job %s of item %s releasing mutex %s" %
                       (job_name, item, mutex_name))
        del self.mutexes[mutex_name]
95
96
class ManagementEvent(object):
    """An event that should be processed within the main queue run loop.

    The submitting thread blocks in :meth:`wait` until the processing
    side calls :meth:`done` or :meth:`exception`.
    """

    def __init__(self):
        self._exc_info = None
        self._wait_event = threading.Event()

    def exception(self, exc_info):
        """Store ``exc_info`` for the waiter and wake it up."""
        self._exc_info = exc_info
        self._wait_event.set()

    def done(self):
        """Signal that the event was processed."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event is handled or ``timeout`` elapses.

        Re-raises the stored exception, if any; otherwise returns
        whether the internal flag is set.
        """
        self._wait_event.wait(timeout)
        if not self._exc_info:
            return self._wait_event.is_set()
        six.reraise(*self._exc_info)
115
116
class ReconfigureEvent(ManagementEvent):
    """Ask the scheduler to reconfigure itself.

    The layout will be (re-)loaded from the path specified in the
    configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        super(ReconfigureEvent, self).__init__()
        self.config = config
126
127
class PromoteEvent(ManagementEvent):
    """Request that one or more changes move to the head of the queue.

    :arg str tenant_name: the name of the tenant
    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, tenant_name, pipeline_name, change_ids):
        super(PromoteEvent, self).__init__()
        self.change_ids = change_ids
        self.pipeline_name = pipeline_name
        self.tenant_name = tenant_name
142
143
class EnqueueEvent(ManagementEvent):
    """Request that a change be enqueued into a pipeline.

    :arg TriggerEvent trigger_event: a TriggerEvent describing the
        trigger, pipeline, and change to enqueue
    """

    def __init__(self, trigger_event):
        super(EnqueueEvent, self).__init__()
        self.trigger_event = trigger_event
154
155
class ResultEvent(object):
    """Base class for events that need to modify the pipeline state
    due to a result from an external system."""
161
162
class BuildStartedEvent(ResultEvent):
    """Result event reporting that a build has started.

    :arg Build build: The build which has started.
    """

    def __init__(self, build):
        self.build = build
171
172
class BuildCompletedEvent(ResultEvent):
    """Result event reporting that a build has completed.

    :arg Build build: The build which has completed.
    """

    def __init__(self, build):
        self.build = build
181
182
class MergeCompletedEvent(ResultEvent):
    """Result event reporting that a remote merge operation completed.

    :arg BuildSet build_set: The build_set which is ready.
    :arg str zuul_url: The URL of the Zuul Merger.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    :arg files: stored unchanged on the event as ``files``
        (NOTE(review): content is not visible here -- confirm with the
        merger).
    """

    def __init__(self, build_set, zuul_url, merged, updated, commit,
                 files):
        self.build_set = build_set
        self.zuul_url = zuul_url
        self.commit = commit
        self.files = files
        self.merged = merged
        self.updated = updated
James E. Blair4076e2b2014-01-28 12:42:20 -0800201
202
class NodesProvisionedEvent(ResultEvent):
    """Nodes have been provisioned for a node request.

    :arg request: The node request that was fulfilled.  The scheduler
        reads ``request.build_set`` when it queues this event.
    """

    def __init__(self, request):
        self.request = request
212
213
def toList(item):
    """Normalise ``item`` to a list.

    Falsy values become an empty list, lists are returned unchanged
    (same object), and anything else is wrapped in a one-element list.
    """
    if isinstance(item, list):
        # An empty list is falsy and is replaced by a fresh empty list.
        return item if item else []
    return [item] if item else []
220
221
James E. Blaire9d45c32012-05-31 09:56:45 -0700222class Scheduler(threading.Thread):
James E. Blaire4de4f42017-01-19 10:35:24 -0800223 """The engine of Zuul.
224
225 The Scheduler is responsible for receiving events and dispatching
226 them to appropriate components (including pipeline managers,
227 mergers and launchers).
228
229 It runs a single threaded main loop which processes events
230 received one at a time and takes action as appropriate. Other
231 parts of Zuul may run in their own thread, but synchronization is
232 performed within the scheduler to reduce or eliminate the need for
233 locking in most circumstances.
234
235 The main daemon will have one instance of the Scheduler class
236 running which will persist for the life of the process. The
237 Scheduler instance is supplied to other Zuul components so that
238 they can submit events or otherwise communicate with other
239 components.
240
241 """
242
James E. Blairee743612012-05-29 14:49:32 -0700243 log = logging.getLogger("zuul.Scheduler")
244
def __init__(self, config, testonly=False):
    """Create the scheduler thread (not started).

    :arg config: configuration object; ``has_option``/``get`` are used
        on it to locate the state directory.
    :arg bool testonly: when true the time database is not created
        (and ``time_database`` is left unset).
    """
    threading.Thread.__init__(self)
    self.daemon = True
    self.wake_event = threading.Event()
    self.layout_lock = threading.Lock()
    self.run_handler_lock = threading.Lock()
    self._pause = False
    self._exit = False
    self._stopped = False
    # Collaborators injected later through the set*/register* methods.
    self.launcher = None
    self.merger = None
    self.connections = None
    # Initialised here so that they always exist, like the
    # collaborators above, even before setNodepool/setZooKeeper run.
    self.nodepool = None
    self.zk = None
    self.statsd = extras.try_import('statsd.statsd')
    # TODO(jeblair): fix this
    self.mutex = MutexHandler()
    # Despite triggers being part of the pipeline, there is one trigger set
    # per scheduler. The pipeline handles the trigger filters but since
    # the events are handled by the scheduler itself it needs to handle
    # the loading of the triggers.
    # self.triggers['connection_name'] = triggerObject
    self.triggers = dict()
    self.config = config

    self.trigger_event_queue = Queue.Queue()
    self.result_event_queue = Queue.Queue()
    self.management_event_queue = Queue.Queue()
    self.abide = model.Abide()

    if not testonly:
        time_dir = self._get_time_database_dir()
        self.time_database = model.TimeDataBase(time_dir)

    self.zuul_version = zuul_version.version_info.release_string()
    self.last_reconfigured = None
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400279
def stop(self):
    """Stop the scheduler.

    Raises the stop flag, stops the connections, then wakes the run
    loop so that it notices the flag and returns.
    """
    self._stopped = True
    self.stopConnections()
    self.wake_event.set()
284
Joshua Hesketh352264b2015-08-11 23:42:08 +1000285 def testConfig(self, config_path, connections):
286 # Take the list of set up connections directly here rather than with
287 # registerConnections as we don't want to do the onLoad event yet.
288 return self._parseConfig(config_path, connections)
James E. Blair47958382013-01-10 17:26:02 -0800289
def registerConnections(self, connections, load=True):
    """Remember ``connections`` and register this scheduler with them.

    :arg connections: the connection registry; its ``registerScheduler``
        is called with this scheduler.
    :arg bool load: whether or not to trigger the onLoad for the
        connection. This is useful for not doing a full load during
        layout validation.
    """
    self.connections = connections
    connections.registerScheduler(self, load)
Joshua Hesketh352264b2015-08-11 23:42:08 +1000295
def stopConnections(self):
    """Stop the registered connections, if any.

    ``connections`` is still None until registerConnections has been
    called; in that case there is nothing to stop.  Without this guard
    stop() would raise before it could wake the run loop.
    """
    if self.connections is not None:
        self.connections.stop()
James E. Blair14abdf42015-12-09 16:11:53 -0800298
def setLauncher(self, launcher):
    """Store the launcher this scheduler uses (e.g. to cancel builds)."""
    self.launcher = launcher
301
def setMerger(self, merger):
    """Store the merger this scheduler uses."""
    self.merger = merger
304
def setNodepool(self, nodepool):
    """Store the nodepool object on the scheduler."""
    self.nodepool = nodepool
307
def setZooKeeper(self, zk):
    """Store the ZooKeeper object on the scheduler."""
    self.zk = zk
310
def addEvent(self, event):
    """Queue a trigger event and wake the run loop.

    Statistics reporting is best effort: ordinary errors raised while
    reporting are logged and the event is queued regardless.

    :arg event: the trigger event; ``event.type`` is used for the
        statsd counter name.
    """
    self.log.debug("Adding trigger event: %s" % event)
    try:
        if self.statsd:
            self.statsd.incr('gerrit.event.%s' % event.type)
    except Exception:
        # Only swallow real errors; KeyboardInterrupt/SystemExit must
        # not be absorbed by the stats reporting.
        self.log.exception("Exception reporting event stats")
    self.trigger_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding trigger event: %s" % event)
James E. Blairee743612012-05-29 14:49:32 -0700321
def onBuildStarted(self, build):
    """Stamp the build's start time and queue a BuildStartedEvent."""
    self.log.debug("Adding start event for build: %s" % build)
    build.start_time = time.time()
    self.result_event_queue.put(BuildStartedEvent(build))
    self.wake_event.set()
    self.log.debug("Done adding start event for build: %s" % build)
James E. Blair11700c32012-07-05 17:50:05 -0700329
def onBuildCompleted(self, build, result):
    """Record a build result, report stats and queue a completion event.

    The end time and result are stored on ``build`` first.  When statsd
    is available and the build has a pipeline, counters and timings are
    reported on a best-effort basis (errors are logged).  Finally a
    BuildCompletedEvent is queued and the run loop is woken.

    :arg build: the build that finished
    :arg result: the result to store in ``build.result``
    """
    self.log.debug("Adding complete event for build: %s result: %s" % (
        build, result))
    build.end_time = time.time()
    # Note, as soon as the result is set, other threads may act
    # upon this, even though the event hasn't been fully
    # processed.  Ensure that any other data from the event (eg,
    # timing) is recorded before setting the result.
    build.result = result
    try:
        if self.statsd and build.pipeline:
            jobname = build.job.name.replace('.', '_')
            key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
            self.statsd.incr(key)
            for label in build.node_labels:
                # Jenkins includes the node name in its list of labels, so
                # we filter it out here, since that is not statistically
                # interesting.
                if label == build.node_name:
                    continue
                dt = int((build.start_time - build.launch_time) * 1000)
                key = 'zuul.pipeline.%s.label.%s.wait_time' % (
                    build.pipeline.name, label)
                self.statsd.timing(key, dt)
            key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
                                                  jobname, build.result)
            if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                dt = int((build.end_time - build.start_time) * 1000)
                self.statsd.timing(key, dt)
            self.statsd.incr(key)

            key = 'zuul.pipeline.%s.job.%s.wait_time' % (
                build.pipeline.name, jobname)
            dt = int((build.start_time - build.launch_time) * 1000)
            self.statsd.timing(key, dt)
    except Exception:
        # Stats are best effort, but KeyboardInterrupt/SystemExit must
        # not be swallowed here.
        self.log.exception("Exception reporting runtime stats")
    event = BuildCompletedEvent(build)
    self.result_event_queue.put(event)
    self.wake_event.set()
    self.log.debug("Done adding complete event for build: %s" % build)
James E. Blairee743612012-05-29 14:49:32 -0700371
def onMergeCompleted(self, build_set, zuul_url, merged, updated,
                     commit, files):
    """Queue a MergeCompletedEvent built from the arguments and wake
    the run loop."""
    self.log.debug("Adding merge complete event for build set: %s" %
                   build_set)
    self.result_event_queue.put(
        MergeCompletedEvent(build_set, zuul_url, merged,
                            updated, commit, files))
    self.wake_event.set()
380
def onNodesProvisioned(self, req):
    """Queue a NodesProvisionedEvent for ``req`` and wake the run loop.

    :arg req: the node request; ``req.build_set`` is used for logging.
    """
    self.log.debug("Adding nodes provisioned event for build set: %s" %
                   req.build_set)
    self.result_event_queue.put(NodesProvisionedEvent(req))
    self.wake_event.set()
387
def reconfigure(self, config):
    """Submit a reconfiguration request and block until it is handled.

    A ReconfigureEvent is queued for the run loop; once waiting on it
    returns, ``last_reconfigured`` is set to the current time in whole
    seconds.
    """
    self.log.debug("Prepare to reconfigure")
    request = ReconfigureEvent(config)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for reconfiguration")
    request.wait()
    self.log.debug("Reconfiguration complete")
    self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700397
def promote(self, tenant_name, pipeline_name, change_ids):
    """Submit a PromoteEvent and block until the run loop handled it."""
    request = PromoteEvent(tenant_name, pipeline_name, change_ids)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for promotion")
    request.wait()
    self.log.debug("Promotion complete")
405
def enqueue(self, trigger_event):
    """Submit an EnqueueEvent wrapping ``trigger_event`` and block
    until the run loop handled it."""
    request = EnqueueEvent(trigger_event)
    self.management_event_queue.put(request)
    self.wake_event.set()
    self.log.debug("Waiting for enqueue")
    request.wait()
    self.log.debug("Enqueue complete")
413
def exit(self):
    """Request an exit.

    Sets both the pause and the exit flag and wakes the run loop; this
    method itself does not wait for anything.
    """
    self.log.debug("Prepare to exit")
    self._pause = self._exit = True
    self.wake_event.set()
    self.log.debug("Waiting for exit")
420
421 def _get_queue_pickle_file(self):
James E. Blair5a95c862012-07-09 15:11:17 -0700422 if self.config.has_option('zuul', 'state_dir'):
423 state_dir = os.path.expanduser(self.config.get('zuul',
424 'state_dir'))
425 else:
426 state_dir = '/var/lib/zuul'
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700427 return os.path.join(state_dir, 'queue.pickle')
428
James E. Blairce8a2132016-05-19 15:21:52 -0700429 def _get_time_database_dir(self):
430 if self.config.has_option('zuul', 'state_dir'):
431 state_dir = os.path.expanduser(self.config.get('zuul',
432 'state_dir'))
433 else:
434 state_dir = '/var/lib/zuul'
435 d = os.path.join(state_dir, 'times')
436 if not os.path.exists(d):
437 os.mkdir(d)
438 return d
439
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700440 def _save_queue(self):
441 pickle_file = self._get_queue_pickle_file()
442 events = []
443 while not self.trigger_event_queue.empty():
444 events.append(self.trigger_event_queue.get())
445 self.log.debug("Queue length is %s" % len(events))
446 if events:
447 self.log.debug("Saving queue")
448 pickle.dump(events, open(pickle_file, 'wb'))
449
450 def _load_queue(self):
451 pickle_file = self._get_queue_pickle_file()
452 if os.path.exists(pickle_file):
453 self.log.debug("Loading queue")
454 events = pickle.load(open(pickle_file, 'rb'))
455 self.log.debug("Queue length is %s" % len(events))
456 for event in events:
457 self.trigger_event_queue.put(event)
458 else:
459 self.log.debug("No queue file found")
460
461 def _delete_queue(self):
462 pickle_file = self._get_queue_pickle_file()
463 if os.path.exists(pickle_file):
464 self.log.debug("Deleting saved queue")
465 os.unlink(pickle_file)
466
def resume(self):
    """Reload a previously saved queue and restart event processing.

    Loading and deleting the saved queue are independent best-effort
    steps: an error in either one is logged and processing continues.
    Afterwards the run loop is woken.
    """
    try:
        self._load_queue()
    except Exception:
        self.log.exception("Unable to load queue")
    try:
        self._delete_queue()
    except Exception:
        self.log.exception("Unable to delete saved queue")
    self.log.debug("Resuming queue processing")
    self.wake_event.set()
478
479 def _doPauseEvent(self):
480 if self._exit:
481 self.log.debug("Exiting")
482 self._save_queue()
483 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700484
James E. Blair468c8512013-12-06 13:27:19 -0800485 def _doReconfigureEvent(self, event):
486 # This is called in the scheduler loop after another thread submits
487 # a request
James E. Blaircdccd972013-07-01 12:10:22 -0700488 self.layout_lock.acquire()
James E. Blair468c8512013-12-06 13:27:19 -0800489 self.config = event.config
James E. Blaircdccd972013-07-01 12:10:22 -0700490 try:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700491 self.log.debug("Performing reconfiguration")
James E. Blair83005782015-12-11 14:46:03 -0800492 loader = configloader.ConfigLoader()
493 abide = loader.loadConfig(
494 self.config.get('zuul', 'tenant_config'),
495 self, self.merger, self.connections)
James E. Blair59fdbac2015-12-07 17:08:06 -0800496 for tenant in abide.tenants.values():
497 self._reconfigureTenant(tenant)
498 self.abide = abide
James E. Blaircdccd972013-07-01 12:10:22 -0700499 finally:
500 self.layout_lock.release()
James E. Blaire9d45c32012-05-31 09:56:45 -0700501
James E. Blair552b54f2016-07-22 13:55:32 -0700502 def _reenqueueTenant(self, old_tenant, tenant):
James E. Blair59fdbac2015-12-07 17:08:06 -0800503 for name, new_pipeline in tenant.layout.pipelines.items():
504 old_pipeline = old_tenant.layout.pipelines.get(name)
505 if not old_pipeline:
506 self.log.warning("No old pipeline matching %s found "
507 "when reconfiguring" % name)
508 continue
509 self.log.debug("Re-enqueueing changes for pipeline %s" % name)
510 items_to_remove = []
511 builds_to_cancel = []
512 last_head = None
513 for shared_queue in old_pipeline.queues:
514 for item in shared_queue.queue:
515 if not item.item_ahead:
516 last_head = item
517 item.item_ahead = None
518 item.items_behind = []
519 item.pipeline = None
520 item.queue = None
521 project_name = item.change.project.name
522 item.change.project = new_pipeline.source.getProject(
523 project_name)
James E. Blair3b5ff3b2016-07-21 10:08:24 -0700524 if new_pipeline.manager.reEnqueueItem(item,
525 last_head):
526 new_jobs = item.getJobs()
527 for build in item.current_build_set.getBuilds():
Paul Belangere22baea2016-11-03 16:59:27 -0400528 jobtree = item.job_tree.getJobTreeForJob(build.job)
529 if jobtree and jobtree.job in new_jobs:
530 build.job = jobtree.job
James E. Blair3b5ff3b2016-07-21 10:08:24 -0700531 else:
532 item.removeBuild(build)
533 builds_to_cancel.append(build)
534 else:
James E. Blair59fdbac2015-12-07 17:08:06 -0800535 items_to_remove.append(item)
536 for item in items_to_remove:
537 for build in item.current_build_set.getBuilds():
538 builds_to_cancel.append(build)
539 for build in builds_to_cancel:
540 self.log.warning(
541 "Canceling build %s during reconfiguration" % (build,))
542 try:
543 self.launcher.cancel(build)
544 except Exception:
545 self.log.exception(
546 "Exception while canceling build %s "
547 "for change %s" % (build, item.change))
Tobias Henkelfb91a492017-02-15 07:29:43 +0100548 finally:
549 self.mutex.release(build.build_set.item, build.job)
James E. Blair552b54f2016-07-22 13:55:32 -0700550
551 def _reconfigureTenant(self, tenant):
552 # This is called from _doReconfigureEvent while holding the
553 # layout lock
554 old_tenant = self.abide.tenants.get(tenant.name)
555 if old_tenant:
556 self._reenqueueTenant(old_tenant, tenant)
James E. Blair59fdbac2015-12-07 17:08:06 -0800557 # TODOv3(jeblair): update for tenants
James E. Blair552b54f2016-07-22 13:55:32 -0700558 # self.maintainConnectionCache()
James E. Blaire511d2f2016-12-08 15:22:26 -0800559 self.connections.reconfigureDrivers(tenant)
560 # TODOv3(jeblair): remove postconfig calls?
James E. Blair59fdbac2015-12-07 17:08:06 -0800561 for pipeline in tenant.layout.pipelines.values():
562 pipeline.source.postConfig()
James E. Blair552b54f2016-07-22 13:55:32 -0700563 for trigger in pipeline.triggers:
564 trigger.postConfig(pipeline)
James E. Blair83005782015-12-11 14:46:03 -0800565 for reporter in pipeline.actions:
566 reporter.postConfig()
James E. Blair552b54f2016-07-22 13:55:32 -0700567 if self.statsd:
James E. Blair59fdbac2015-12-07 17:08:06 -0800568 try:
James E. Blair552b54f2016-07-22 13:55:32 -0700569 for pipeline in tenant.layout.pipelines.values():
James E. Blair59fdbac2015-12-07 17:08:06 -0800570 items = len(pipeline.getAllItems())
571 # stats.gauges.zuul.pipeline.NAME.current_changes
572 key = 'zuul.pipeline.%s' % pipeline.name
James E. Blair552b54f2016-07-22 13:55:32 -0700573 self.statsd.gauge(key + '.current_changes', items)
James E. Blair59fdbac2015-12-07 17:08:06 -0800574 except Exception:
575 self.log.exception("Exception reporting initial "
576 "pipeline stats:")
577
James E. Blair36658cf2013-12-06 17:53:48 -0800578 def _doPromoteEvent(self, event):
Paul Belangerbaca3132016-11-04 12:49:54 -0400579 tenant = self.abide.tenants.get(event.tenant_name)
580 pipeline = tenant.layout.pipelines[event.pipeline_name]
James E. Blair36658cf2013-12-06 17:53:48 -0800581 change_ids = [c.split(',') for c in event.change_ids]
582 items_to_enqueue = []
583 change_queue = None
584 for shared_queue in pipeline.queues:
585 if change_queue:
586 break
587 for item in shared_queue.queue:
588 if (item.change.number == change_ids[0][0] and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000589 item.change.patchset == change_ids[0][1]):
James E. Blair36658cf2013-12-06 17:53:48 -0800590 change_queue = shared_queue
591 break
592 if not change_queue:
593 raise Exception("Unable to find shared change queue for %s" %
594 event.change_ids[0])
595 for number, patchset in change_ids:
596 found = False
597 for item in change_queue.queue:
598 if (item.change.number == number and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000599 item.change.patchset == patchset):
James E. Blair36658cf2013-12-06 17:53:48 -0800600 found = True
601 items_to_enqueue.append(item)
602 break
603 if not found:
604 raise Exception("Unable to find %s,%s in queue %s" %
605 (number, patchset, change_queue))
606 for item in change_queue.queue[:]:
607 if item not in items_to_enqueue:
608 items_to_enqueue.append(item)
609 pipeline.manager.cancelJobs(item)
610 pipeline.manager.dequeueItem(item)
611 for item in items_to_enqueue:
Sean Daguef39b9ca2014-01-10 21:34:35 -0500612 pipeline.manager.addChange(
613 item.change,
614 enqueue_time=item.enqueue_time,
James E. Blairf9ab8842014-07-10 13:12:07 -0700615 quiet=True,
616 ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800617
James E. Blaird27a96d2014-07-10 13:25:13 -0700618 def _doEnqueueEvent(self, event):
Paul Belangerbaca3132016-11-04 12:49:54 -0400619 tenant = self.abide.tenants.get(event.tenant_name)
620 project = tenant.layout.project_configs.get(event.project_name)
621 pipeline = tenant.layout.pipelines[event.forced_pipeline]
James E. Blairc0dedf82014-08-06 09:37:52 -0700622 change = pipeline.source.getChange(event, project)
James E. Blaird27a96d2014-07-10 13:25:13 -0700623 self.log.debug("Event %s for change %s was directly assigned "
624 "to pipeline %s" % (event, change, self))
James E. Blair59fdbac2015-12-07 17:08:06 -0800625 self.log.info("Adding %s %s to %s" %
James E. Blaird27a96d2014-07-10 13:25:13 -0700626 (project, change, pipeline))
627 pipeline.manager.addChange(change, ignore_requirements=True)
628
James E. Blaire9d45c32012-05-31 09:56:45 -0700629 def _areAllBuildsComplete(self):
630 self.log.debug("Checking if all builds are complete")
James E. Blair8b1dc3f2016-07-05 16:49:00 -0700631 if self.merger.areMergesOutstanding():
632 self.log.debug("Waiting on merger")
633 return False
James E. Blaire9d45c32012-05-31 09:56:45 -0700634 waiting = False
Paul Belangerdebd7a72016-11-11 19:56:15 -0500635 for tenant in self.abide.tenants.values():
636 for pipeline in tenant.layout.pipelines.values():
637 for item in pipeline.getAllItems():
638 for build in item.current_build_set.getBuilds():
639 if build.result is None:
640 self.log.debug("%s waiting on %s" %
641 (pipeline.manager, build))
642 waiting = True
James E. Blaire9d45c32012-05-31 09:56:45 -0700643 if not waiting:
644 self.log.debug("All builds are complete")
645 return True
James E. Blaire9d45c32012-05-31 09:56:45 -0700646 return False
647
def run(self):
    """Main loop of the scheduler thread.

    Sleeps until woken, then -- holding the run handler lock --
    processes management events, result events and (unless paused)
    trigger events, handles a pending pause, and lets every pipeline
    manager process its queue until it reports nothing more to do.
    Returns once the stop flag is found set after a wake-up.
    """
    if self.statsd:
        self.log.debug("Statsd enabled")
    else:
        self.log.debug("Statsd disabled because python statsd "
                       "package not found")
    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            self.log.debug("Run handler stopping")
            return
        self.log.debug("Run handler awake")
        with self.run_handler_lock:
            try:
                while not self.management_event_queue.empty():
                    self.process_management_queue()

                # Give result events priority -- they let us stop builds,
                # whereas trigger events cause us to launch builds.
                while not self.result_event_queue.empty():
                    self.process_result_queue()

                if not self._pause:
                    while not self.trigger_event_queue.empty():
                        self.process_event_queue()

                if self._pause and self._areAllBuildsComplete():
                    self._doPauseEvent()

                for tenant in self.abide.tenants.values():
                    for pipeline in tenant.layout.pipelines.values():
                        while pipeline.manager.processQueue():
                            pass

            except Exception:
                self.log.exception("Exception in run handler:")
                # There may still be more events to process
                self.wake_event.set()
James E. Blairee743612012-05-29 14:49:32 -0700690
def maintainConnectionCache(self):
    """Hand every connection the set of changes that are still in use.

    Collects the change of every item in every pipeline of every tenant,
    together with whatever each change reports from
    ``getRelatedChanges()``, and passes that set to ``maintainCache()``
    on each configured connection.
    """
    # TODOv3(jeblair): update for tenants
    in_use = set()
    for tenant in self.abide.tenants.values():
        for pipeline in tenant.layout.pipelines.values():
            self.log.debug("Gather relevant cache items for: %s" %
                           pipeline)

            for queue_item in pipeline.getAllItems():
                change = queue_item.change
                in_use.add(change)
                in_use.update(change.getRelatedChanges())

    # Connections are notified only after the whole set has been built.
    for connection in self.connections.values():
        connection.maintainCache(in_use)
        self.log.debug(
            "End maintain connection cache for: %s" % connection)
    self.log.debug("Connection cache size: %s" % len(in_use))
James E. Blair0e933c52013-07-11 10:18:52 -0700707
James E. Blairee743612012-05-29 14:49:32 -0700708 def process_event_queue(self):
709 self.log.debug("Fetching trigger event")
710 event = self.trigger_event_queue.get()
711 self.log.debug("Processing trigger event %s" % event)
James E. Blaira84f0e42014-02-06 07:09:22 -0800712 try:
James E. Blair59fdbac2015-12-07 17:08:06 -0800713 for tenant in self.abide.tenants.values():
714 for pipeline in tenant.layout.pipelines.values():
715 # Get the change even if the project is unknown to
716 # us for the use of updating the cache if there is
717 # another change depending on this foreign one.
718 try:
719 change = pipeline.source.getChange(event)
720 except exceptions.ChangeNotFound as e:
721 self.log.debug("Unable to get change %s from "
722 "source %s (most likely looking "
723 "for a change from another "
724 "connection trigger)",
725 e.change, pipeline.source)
726 continue
727 if event.type == 'patchset-created':
728 pipeline.manager.removeOldVersionsOfChange(change)
729 elif event.type == 'change-abandoned':
730 pipeline.manager.removeAbandonedChange(change)
731 if pipeline.manager.eventMatches(event, change):
732 self.log.info("Adding %s %s to %s" %
733 (change.project, change, pipeline))
734 pipeline.manager.addChange(change)
James E. Blaira84f0e42014-02-06 07:09:22 -0800735 finally:
James E. Blairff791972013-01-09 11:45:43 -0800736 self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700737
def process_management_queue(self):
    """Take one management event off the queue and dispatch it.

    Reconfigure, promote and enqueue events go to their ``_do*Event``
    handler; anything else is logged as an error.  On success the event
    is marked ``done()``; if the handler raises, the exception info is
    handed to ``event.exception()`` instead of propagating.

    ``task_done()`` is called from a ``finally`` clause, matching the
    trigger and result queue processors, so the queue is always marked
    even if notifying the event itself fails.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        if isinstance(event, ReconfigureEvent):
            self._doReconfigureEvent(event)
        elif isinstance(event, PromoteEvent):
            self._doPromoteEvent(event)
        elif isinstance(event, EnqueueEvent):
            # The enqueue handler takes the wrapped trigger event, not
            # the management event itself.
            self._doEnqueueEvent(event.trigger_event)
        else:
            self.log.error("Unable to handle event %s" % event)
        event.done()
    except Exception:
        # Report the failure through the event rather than letting it
        # escape into the run loop.
        event.exception(sys.exc_info())
    finally:
        self.management_event_queue.task_done()
755
def process_result_queue(self):
    """Take one result event off the queue and dispatch it by type.

    Build-started, build-completed, merge-completed and
    nodes-provisioned events are each passed to their ``_do*Event``
    handler; any other event is logged as an error.  ``task_done()`` is
    always called on the result queue, even if the handler raises.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    self.log.debug("Processing result event %s" % event)
    try:
        # Checked in order; the first matching class wins.
        dispatch = (
            (BuildStartedEvent, self._doBuildStartedEvent),
            (BuildCompletedEvent, self._doBuildCompletedEvent),
            (MergeCompletedEvent, self._doMergeCompletedEvent),
            (NodesProvisionedEvent, self._doNodesProvisionedEvent),
        )
        for event_class, handler in dispatch:
            if isinstance(event, event_class):
                handler(event)
                break
        else:
            self.log.error("Unable to handle event %s" % event)
    finally:
        self.result_event_queue.task_done()
773
774 def _doBuildStartedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800775 build = event.build
776 if build.build_set is not build.build_set.item.current_build_set:
777 self.log.warning("Build %s is not in the current build set" %
778 (build,))
779 return
780 pipeline = build.build_set.item.pipeline
781 if not pipeline:
782 self.log.warning("Build %s is not associated with a pipeline" %
783 (build,))
784 return
James E. Blairce8a2132016-05-19 15:21:52 -0700785 try:
786 build.estimated_time = float(self.time_database.getEstimatedTime(
787 build.job.name))
788 except Exception:
789 self.log.exception("Exception estimating build time:")
James E. Blair4076e2b2014-01-28 12:42:20 -0800790 pipeline.manager.onBuildStarted(event.build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800791
792 def _doBuildCompletedEvent(self, event):
James E. Blair4076e2b2014-01-28 12:42:20 -0800793 build = event.build
James E. Blaire18d4602017-01-05 11:17:28 -0800794
795 # Regardless of any other conditions which might cause us not
796 # to pass this on to the pipeline manager, make sure we return
797 # the nodes to nodepool.
798 try:
799 nodeset = build.build_set.getJobNodeSet(build.job.name)
James E. Blair1511bc32017-01-18 09:25:31 -0800800 self.nodepool.returnNodeSet(nodeset)
James E. Blaire18d4602017-01-05 11:17:28 -0800801 except Exception:
802 self.log.exception("Unable to return nodeset %s" % (nodeset,))
803
James E. Blair4076e2b2014-01-28 12:42:20 -0800804 if build.build_set is not build.build_set.item.current_build_set:
James E. Blaire18d4602017-01-05 11:17:28 -0800805 self.log.debug("Build %s is not in the current build set" %
806 (build,))
James E. Blair4076e2b2014-01-28 12:42:20 -0800807 return
808 pipeline = build.build_set.item.pipeline
809 if not pipeline:
810 self.log.warning("Build %s is not associated with a pipeline" %
811 (build,))
812 return
James E. Blairce8a2132016-05-19 15:21:52 -0700813 if build.end_time and build.start_time and build.result:
814 duration = build.end_time - build.start_time
Paul Belanger87e4ab02016-06-08 14:17:20 -0400815 try:
816 self.time_database.update(
817 build.job.name, duration, build.result)
818 except Exception:
819 self.log.exception("Exception recording build time:")
James E. Blair4076e2b2014-01-28 12:42:20 -0800820 pipeline.manager.onBuildCompleted(event.build)
821
822 def _doMergeCompletedEvent(self, event):
823 build_set = event.build_set
824 if build_set is not build_set.item.current_build_set:
825 self.log.warning("Build set %s is not current" % (build_set,))
826 return
827 pipeline = build_set.item.pipeline
828 if not pipeline:
829 self.log.warning("Build set %s is not associated with a pipeline" %
830 (build_set,))
831 return
832 pipeline.manager.onMergeCompleted(event)
James E. Blairee743612012-05-29 14:49:32 -0700833
James E. Blair8d692392016-04-08 17:47:58 -0700834 def _doNodesProvisionedEvent(self, event):
835 request = event.request
836 build_set = request.build_set
James E. Blaira38c28e2017-01-04 10:33:20 -0800837
James E. Blair6ab79e02017-01-06 10:10:17 -0800838 self.nodepool.acceptNodes(request)
James E. Blaira38c28e2017-01-04 10:33:20 -0800839
James E. Blair8d692392016-04-08 17:47:58 -0700840 if build_set is not build_set.item.current_build_set:
841 self.log.warning("Build set %s is not current" % (build_set,))
James E. Blair6ab79e02017-01-06 10:10:17 -0800842 if request.fulfilled:
James E. Blair1511bc32017-01-18 09:25:31 -0800843 self.nodepool.returnNodeSet(request.nodeset)
James E. Blair8d692392016-04-08 17:47:58 -0700844 return
845 pipeline = build_set.item.pipeline
846 if not pipeline:
847 self.log.warning("Build set %s is not associated with a pipeline" %
848 (build_set,))
James E. Blair6ab79e02017-01-06 10:10:17 -0800849 if request.fulfilled:
James E. Blair1511bc32017-01-18 09:25:31 -0800850 self.nodepool.returnNodeSet(request.nodeset)
James E. Blair8d692392016-04-08 17:47:58 -0700851 return
852 pipeline.manager.onNodesProvisioned(event)
853
def formatStatusJSON(self, tenant_name):
    """Return the scheduler status for one tenant as a JSON string.

    The document carries the zuul version, an HTML notice while the
    scheduler is paused, the lengths of the trigger and result queues,
    the last reconfiguration time multiplied by 1000 (when set) and the
    status of every pipeline in the tenant named ``tenant_name``.
    """
    # TODOv3(jeblair): use tenants
    url_pattern = None
    if self.config.has_option('zuul', 'url_pattern'):
        url_pattern = self.config.get('zuul', 'url_pattern')

    data = {'zuul_version': self.zuul_version}

    if self._pause:
        pieces = ['<p><b>Queue only mode:</b> preparing to ']
        if self._exit:
            pieces.append('exit')
        pieces.append(
            ', queue length: %s' % self.trigger_event_queue.qsize())
        pieces.append('</p>')
        data['message'] = ''.join(pieces)

    data['trigger_event_queue'] = {
        'length': self.trigger_event_queue.qsize(),
    }
    data['result_event_queue'] = {
        'length': self.result_event_queue.qsize(),
    }

    if self.last_reconfigured:
        data['last_reconfigured'] = self.last_reconfigured * 1000

    # NOTE(review): there is no guard for an unknown tenant_name; if
    # get() yields None the attribute access below fails -- confirm
    # that callers validate the name first.
    tenant = self.abide.tenants.get(tenant_name)
    data['pipelines'] = [
        pipeline.formatStatusJSON(url_pattern)
        for pipeline in tenant.layout.pipelines.values()
    ]
    return json.dumps(data)