blob: a2e3b6eb1eab26ca0769e665a86fda09fe0f401f [file] [log] [blame]
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair8dbd56a2012-12-22 10:55:10 -080018import json
James E. Blairee743612012-05-29 14:49:32 -070019import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080020import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070021import pickle
Monty Taylorb934c1a2017-06-16 19:31:47 -050022import queue
James E. Blair8b2a1472017-02-19 15:33:55 -080023import socket
James E. Blair36658cf2013-12-06 17:53:48 -080024import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080025import threading
James E. Blair71e94122012-12-24 17:53:08 -080026import time
James E. Blairee743612012-05-29 14:49:32 -070027
Joshua Hesketh0aa7e8b2016-07-14 00:12:25 +100028from zuul import configloader
Morgan Fainberg9c4700a2016-05-30 14:25:19 -070029from zuul import model
James E. Blair83005782015-12-11 14:46:03 -080030from zuul import exceptions
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040031from zuul import version as zuul_version
James E. Blairbdd50e62017-10-21 08:18:55 -070032from zuul import rpclistener
Paul Belanger40d3ce62017-11-28 11:49:55 -050033from zuul.lib import commandsocket
Tristan Cacqueray91601d72017-06-15 06:00:12 +000034from zuul.lib.config import get_default
James E. Blairded241e2017-10-10 13:22:40 -070035from zuul.lib.statsd import get_statsd
James E. Blair419a8672017-10-18 14:48:25 -070036import zuul.lib.queue
James E. Blairee743612012-05-29 14:49:32 -070037
# Names of the commands accepted on the scheduler's command socket.
COMMANDS = [
    'stop',
]
39
James E. Blair1e8dd892012-05-30 09:15:05 -070040
class ManagementEvent(object):
    """An event that should be processed within the main queue run loop.

    The submitting thread can block on :meth:`wait` until the thread
    processing the event calls :meth:`done` or :meth:`exception`.
    """

    def __init__(self):
        self._exc_info = None
        self._wait_event = threading.Event()

    def exception(self, exc_info):
        """Record a failure and wake any waiter.

        :arg tuple exc_info: a ``(type, value, traceback)`` triple as
            returned by ``sys.exc_info()``
        """
        self._exc_info = exc_info
        self._wait_event.set()

    def done(self):
        """Mark the event as processed and wake any waiter."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event is processed or the timeout elapses.

        :arg float timeout: seconds to wait, or None to wait forever
        :returns: whether the event has been marked as processed
        :raises: the recorded exception, if :meth:`exception` was called
        """
        self._wait_event.wait(timeout)
        failure = self._exc_info
        if not failure:
            return self._wait_event.is_set()
        # sys.exc_info returns (type, value, traceback); re-raise the
        # stored exception with its original traceback attached.
        _, error, tb = failure
        raise error.with_traceback(tb)
61
62
class ReconfigureEvent(ManagementEvent):
    """Request a full reconfiguration of the scheduler.

    The layout will be (re-)loaded from the path specified in the
    configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
72
73
class TenantReconfigureEvent(ManagementEvent):
    """Reconfigure the given tenant.  The layout will be (re-)loaded from
    the path specified in the configuration.

    Two events compare equal when they target the same tenant, so that
    they can be combined with :meth:`merge`.

    :arg Tenant tenant: the tenant to reconfigure
    :arg Project project: if supplied, clear the cached configuration
         from this project first
    """

    def __init__(self, tenant, project):
        super(TenantReconfigureEvent, self).__init__()
        self.tenant_name = tenant.name
        # The project is documented as optional; never store None in the
        # set, since every member is later treated as a project object.
        self.projects = set() if project is None else {project}

    # Defining __eq__ without __hash__ already makes instances
    # unhashable; state it explicitly so the intent is visible.
    __hash__ = None

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, TenantReconfigureEvent):
            return False
        # We don't check projects because they will get combined when
        # merged.
        return (self.tenant_name == other.tenant_name)

    def merge(self, other):
        """Fold the projects of another event for the same tenant into
        this one.

        :arg TenantReconfigureEvent other: the event to absorb
        :raises Exception: if the events target different tenants
        """
        if self.tenant_name != other.tenant_name:
            raise Exception("Can not merge events from different tenants")
        self.projects |= other.projects
James E. Blair646322f2017-01-27 15:50:34 -0800101
102
class PromoteEvent(ManagementEvent):
    """Request that changes be promoted to the head of the queue.

    :arg str tenant_name: the name of the tenant
    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: a list of strings of change ids in the form
        1234,1
    """

    def __init__(self, tenant_name, pipeline_name, change_ids):
        super().__init__()
        self.change_ids = change_ids
        self.pipeline_name = pipeline_name
        self.tenant_name = tenant_name
117
118
class EnqueueEvent(ManagementEvent):
    """Request that a change be enqueued into a pipeline.

    :arg TriggerEvent trigger_event: a TriggerEvent describing the
        trigger, pipeline, and change to enqueue
    """

    def __init__(self, trigger_event):
        super().__init__()
        self.trigger_event = trigger_event
129
130
class ResultEvent(object):
    """Base class for events that need to modify the pipeline state
    due to a result from an external system."""
136
137
class BuildStartedEvent(ResultEvent):
    """Result event signalling that a build has started.

    :arg Build build: The build which has started.
    """

    def __init__(self, build):
        # Keep a reference to the build for the result handler.
        self.build = build
146
147
class BuildCompletedEvent(ResultEvent):
    """Result event signalling that a build has completed.

    :arg Build build: The build which has completed.
    """

    def __init__(self, build):
        # Keep a reference to the build for the result handler.
        self.build = build
156
157
class MergeCompletedEvent(ResultEvent):
    """Result event signalling that a remote merge operation has completed.

    :arg BuildSet build_set: The build_set which is ready.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    :arg files: File data reported with the merge result (stored as
        given; its structure is not visible in this module).
    :arg dict repo_state: The starting repo state before the merge.
    """

    def __init__(self, build_set, merged, updated, commit,
                 files, repo_state):
        self.build_set = build_set
        # Outcome of the merge operation.
        self.merged, self.updated = merged, updated
        self.commit = commit
        self.files = files
        self.repo_state = repo_state
James E. Blair4076e2b2014-01-28 12:42:20 -0800176
177
class NodesProvisionedEvent(ResultEvent):
    """Result event signalling that a node request has been handled.

    :arg request: The node request; its ``id`` attribute is also
        recorded separately as ``request_id``.
    """

    def __init__(self, request):
        self.request_id = request.id
        self.request = request
James E. Blair8d692392016-04-08 17:47:58 -0700188
189
James E. Blaire9d45c32012-05-31 09:56:45 -0700190class Scheduler(threading.Thread):
James E. Blaire4de4f42017-01-19 10:35:24 -0800191 """The engine of Zuul.
192
193 The Scheduler is responsible for receiving events and dispatching
194 them to appropriate components (including pipeline managers,
Paul Belanger174a8272017-03-14 13:20:10 -0400195 mergers and executors).
James E. Blaire4de4f42017-01-19 10:35:24 -0800196
197 It runs a single threaded main loop which processes events
198 received one at a time and takes action as appropriate. Other
199 parts of Zuul may run in their own thread, but synchronization is
200 performed within the scheduler to reduce or eliminate the need for
201 locking in most circumstances.
202
203 The main daemon will have one instance of the Scheduler class
204 running which will persist for the life of the process. The
205 Scheduler instance is supplied to other Zuul components so that
206 they can submit events or otherwise communicate with other
207 components.
208
209 """
210
James E. Blairee743612012-05-29 14:49:32 -0700211 log = logging.getLogger("zuul.Scheduler")
James E. Blair4dd5f4b2017-10-23 07:44:08 -0700212 _stats_interval = 30
James E. Blairee743612012-05-29 14:49:32 -0700213
def __init__(self, config, testonly=False):
    """Set up scheduler state; no threads are started here.

    :arg ConfigParser config: the scheduler configuration
    :arg bool testonly: when true, skip creating the time database
    """
    super(Scheduler, self).__init__()
    self.daemon = True
    self.hostname = socket.gethostname()

    # Synchronization primitives used by the run loop.
    self.wake_event = threading.Event()
    self.layout_lock = threading.Lock()
    self.run_handler_lock = threading.Lock()

    # Commands accepted on the command socket, keyed by name.
    self.command_map = {'stop': self.stop}

    self._pause = False
    self._exit = False
    self._stopped = False

    # Collaborators supplied later through the set*/register* methods.
    self.executor = None
    self.merger = None
    self.connections = None

    self.statsd = get_statsd(config)
    self.rpc = rpclistener.RPCListener(config, self)
    self.stats_thread = threading.Thread(target=self.runStats)
    self.stats_stop = threading.Event()
    # TODO(jeblair): fix this
    # Despite triggers being part of the pipeline, there is one trigger set
    # per scheduler. The pipeline handles the trigger filters but since
    # the events are handled by the scheduler itself it needs to handle
    # the loading of the triggers.
    # self.triggers['connection_name'] = triggerObject
    self.triggers = {}
    self.config = config

    # One queue per class of event handled by the run loop.
    self.trigger_event_queue = queue.Queue()
    self.result_event_queue = queue.Queue()
    self.management_event_queue = zuul.lib.queue.MergedQueue()
    self.abide = model.Abide()

    if not testonly:
        self.time_database = model.TimeDataBase(
            self._get_time_database_dir())

    socket_path = get_default(
        self.config, 'scheduler', 'command_socket',
        '/var/lib/zuul/scheduler.socket')
    self.command_socket = commandsocket.CommandSocket(socket_path)

    self.zuul_version = zuul_version.version_info.release_string()
    self.last_reconfigured = None
    self.tenant_last_reconfigured = {}
    self.autohold_requests = {}
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400261
def start(self):
    """Start the scheduler thread and its helper threads.

    The main thread is started first, followed by the command socket
    and its processing thread, the RPC listener and the stats thread.
    """
    super(Scheduler, self).start()

    self._command_running = True
    self.log.debug("Starting command processor")
    self.command_socket.start()
    command_thread = threading.Thread(target=self.runCommand,
                                      name='command')
    command_thread.daemon = True
    self.command_thread = command_thread
    command_thread.start()

    self.rpc.start()
    self.stats_thread.start()
274
def stop(self):
    """Shut the scheduler down.

    Flags the scheduler as stopped, stops the connections, wakes the
    run loop, then stops the stats thread, the RPC listener and the
    command socket, in that order.
    """
    self._stopped = True

    # Signal the stats thread first so the join below is prompt.
    self.stats_stop.set()
    self.stopConnections()
    self.wake_event.set()
    self.stats_thread.join()

    rpc = self.rpc
    rpc.stop()
    rpc.join()

    # Let the command loop fall out, then stop its socket.
    self._command_running = False
    self.command_socket.stop()
285
def runCommand(self):
    """Process commands from the command socket until told to stop.

    Each received command is decoded as UTF-8 and dispatched through
    ``self.command_map``.  The ``_stop`` command is ignored here.  Any
    exception (including an unknown command) is logged and the loop
    continues.
    """
    while self._command_running:
        try:
            raw = self.command_socket.get()
            command = raw.decode('utf8')
            if command == '_stop':
                continue
            handler = self.command_map[command]
            handler()
        except Exception:
            self.log.exception("Exception while processing command")
James E. Blairb0fcae42012-07-17 11:12:10 -0700294
def registerConnections(self, connections, webapp, load=True):
    """Attach the connection registry to this scheduler.

    :arg connections: the connection registry to use
    :arg webapp: the webapp to register with the connections
    :arg bool load: whether or not to trigger the onLoad for the
        connection.  This is useful for not doing a full load during
        layout validation.
    """
    self.connections = connections
    registry = self.connections
    registry.registerWebapp(webapp)
    registry.registerScheduler(self, load)
Joshua Hesketh352264b2015-08-11 23:42:08 +1000301
def stopConnections(self):
    """Stop the registered connections."""
    registry = self.connections
    registry.stop()
James E. Blair14abdf42015-12-09 16:11:53 -0800304
def setExecutor(self, executor):
    """Record the executor client this scheduler should use.

    :arg executor: the executor object to store on the scheduler
    """
    setattr(self, 'executor', executor)
James E. Blairee743612012-05-29 14:49:32 -0700307
def setMerger(self, merger):
    """Record the merger client this scheduler should use.

    :arg merger: the merger object to store on the scheduler
    """
    setattr(self, 'merger', merger)
310
def setNodepool(self, nodepool):
    """Record the nodepool interface this scheduler should use.

    :arg nodepool: the nodepool object to store on the scheduler
    """
    setattr(self, 'nodepool', nodepool)
313
def setZooKeeper(self, zk):
    """Record the ZooKeeper interface this scheduler should use.

    :arg zk: the ZooKeeper object to store on the scheduler
    """
    setattr(self, 'zk', zk)
316
def runStats(self):
    """Emit periodic stats until ``stats_stop`` is set.

    Waits ``_stats_interval`` seconds between runs; the wait doubles as
    the stop check.  Errors from a single run are logged and do not end
    the loop.
    """
    while True:
        if self.stats_stop.wait(self._stats_interval):
            break
        try:
            self._runStats()
        except Exception:
            self.log.exception("Error in periodic stats:")
323
def _runStats(self):
    """Report merger and executor gauges to statsd.

    Does nothing when statsd is not configured.  The figures are
    derived from the ``(queued, running, registered)`` triples returned
    by ``self.rpc.getFunctions()``.
    """
    if not self.statsd:
        return

    totals = dict.fromkeys(
        ('mergers.online', 'mergers.jobs_running', 'mergers.jobs_queued',
         'executors.online', 'executors.accepting',
         'executors.jobs_running', 'executors.jobs_queued'), 0)

    for name, counts in self.rpc.getFunctions().items():
        queued, running, registered = counts
        # The queued figure includes running jobs, so subtract them.
        waiting = queued - running
        if name == 'executor:execute':
            totals['executors.accepting'] = registered
            totals['executors.jobs_queued'] = waiting
            totals['executors.jobs_running'] = running
        if name.startswith('executor:stop'):
            totals['executors.online'] += registered
        if name == 'merger:merge':
            totals['mergers.online'] = registered
        if name.startswith('merger:'):
            totals['mergers.jobs_queued'] += waiting
            totals['mergers.jobs_running'] += running

    # dict.fromkeys preserves the order the gauges are emitted in.
    for suffix, value in totals.items():
        self.statsd.gauge('zuul.' + suffix, value)
354
def addEvent(self, event):
    """Queue a trigger event and wake the run loop.

    :arg event: the trigger event to enqueue
    """
    pending = self.trigger_event_queue
    pending.put(event)
    self.wake_event.set()
358
def onBuildStarted(self, build):
    """Record the build's start time and queue a BuildStartedEvent.

    :arg Build build: the build which has started
    """
    build.start_time = time.time()
    self.result_event_queue.put(BuildStartedEvent(build))
    self.wake_event.set()
364
def onBuildCompleted(self, build, result, result_data):
    """Record a build result, report stats and queue a completion event.

    :arg Build build: the build which has completed
    :arg result: the result to store on the build
    :arg result_data: the result data to store on the build

    Failures while reporting stats are logged and otherwise ignored.
    """
    build.end_time = time.time()
    build.result_data = result_data
    # Note, as soon as the result is set, other threads may act
    # upon this, even though the event hasn't been fully
    # processed.  Ensure that any other data from the event (eg,
    # timing) is recorded before setting the result.
    build.result = result
    try:
        if self.statsd and build.pipeline:
            tenant = build.pipeline.layout.tenant
            jobname = build.job.name.replace('.', '_').replace('/', '_')
            change = build.build_set.item.change
            project = change.project
            hostname = project.canonical_hostname.replace('.', '_')
            projectname = project.name.replace('.', '_').replace('/', '_')
            branchname = change.branch.replace('.', '_').replace('/', '_')

            basekey = 'zuul.tenant.%s' % tenant.name
            pipekey = '%s.pipeline.%s' % (basekey, build.pipeline.name)

            # zuul.tenant.<tenant>.pipeline.<pipeline>.all_jobs
            self.statsd.incr('%s.all_jobs' % pipekey)

            jobkey = '%s.project.%s.%s.%s.job.%s' % (
                pipekey, hostname, projectname, branchname, jobname)

            # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
            #   <host>.<project>.<branch>.job.<job>.<result>
            resultkey = '%s.%s' % (jobkey, build.result)
            timed = build.result in ['SUCCESS', 'FAILURE']
            if timed and build.start_time:
                elapsed = int((build.end_time - build.start_time) * 1000)
                self.statsd.timing(resultkey, elapsed)
            self.statsd.incr(resultkey)

            # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
            #   <host>.<project>.<branch>.job.<job>.wait_time
            if build.start_time:
                waited = int((build.start_time - build.execute_time) * 1000)
                self.statsd.timing('%s.wait_time' % jobkey, waited)
    except Exception:
        self.log.exception("Exception reporting runtime stats")
    self.result_event_queue.put(BuildCompletedEvent(build))
    self.wake_event.set()
408
def onMergeCompleted(self, build_set, merged, updated,
                     commit, files, repo_state):
    """Queue a MergeCompletedEvent and wake the run loop.

    All arguments are passed through unchanged to
    :class:`MergeCompletedEvent`.
    """
    self.result_event_queue.put(
        MergeCompletedEvent(build_set, merged, updated,
                            commit, files, repo_state))
    self.wake_event.set()
415
def onNodesProvisioned(self, req):
    """Queue a NodesProvisionedEvent for a node request and wake the
    run loop.

    :arg req: the node request to wrap in the event
    """
    self.result_event_queue.put(NodesProvisionedEvent(req))
    self.wake_event.set()
420
def reconfigureTenant(self, tenant, project):
    """Submit a tenant reconfiguration request without waiting for it.

    :arg Tenant tenant: the tenant to reconfigure
    :arg Project project: the project which prompted the
        reconfiguration
    """
    self.log.debug("Submitting tenant reconfiguration event for "
                   "%s due to project %s", tenant.name, project)
    self.management_event_queue.put(
        TenantReconfigureEvent(tenant, project))
    self.wake_event.set()
427
def reconfigure(self, config):
    """Submit a full reconfiguration and block until it is processed.

    :arg ConfigParser config: the new configuration
    :raises: any exception recorded on the event while it was
        processed
    """
    self.log.debug("Submitting reconfiguration event")
    request = ReconfigureEvent(config)
    self.management_event_queue.put(request)
    self.wake_event.set()

    self.log.debug("Waiting for reconfiguration")
    request.wait()
    self.log.debug("Reconfiguration complete")

    # TODOv3(jeblair): reconfigure time should be per-tenant
    self.last_reconfigured = int(time.time())
James E. Blaire9d45c32012-05-31 09:56:45 -0700438
def autohold(self, tenant_name, project_name, job_name, reason, count):
    """Register, replace or remove an autohold request.

    Requests are keyed by ``(tenant_name, project_name, job_name)``.

    :arg str tenant_name: the name of the tenant
    :arg str project_name: the name of the project
    :arg str job_name: the name of the job
    :arg str reason: the reason stored with the request
    :arg int count: the count stored with the request; 0 removes any
        existing request for the key
    """
    key = (tenant_name, project_name, job_name)
    if count == 0:
        # A zero count means "remove".  When there is nothing to
        # remove this is a no-op rather than registering a request
        # with a count of zero.
        if self.autohold_requests.pop(key, None) is not None:
            self.log.debug("Removing autohold for %s", key)
        return
    self.log.debug("Autohold requested for %s", key)
    self.autohold_requests[key] = (count, reason)
David Shrewsburyffab07a2017-07-24 12:45:07 -0400447
def promote(self, tenant_name, pipeline_name, change_ids):
    """Submit a promotion request and block until it is processed.

    :arg str tenant_name: the name of the tenant
    :arg str pipeline_name: the name of the pipeline
    :arg list change_ids: the change ids to promote
    :raises: any exception recorded on the event while it was
        processed
    """
    request = PromoteEvent(tenant_name, pipeline_name, change_ids)
    self.management_event_queue.put(request)
    self.wake_event.set()

    self.log.debug("Waiting for promotion")
    request.wait()
    self.log.debug("Promotion complete")
455
def enqueue(self, trigger_event):
    """Submit an enqueue request and block until it is processed.

    :arg TriggerEvent trigger_event: describes what to enqueue
    :raises: any exception recorded on the event while it was
        processed
    """
    request = EnqueueEvent(trigger_event)
    self.management_event_queue.put(request)
    self.wake_event.set()

    self.log.debug("Waiting for enqueue")
    request.wait()
    self.log.debug("Enqueue complete")
463
def exit(self):
    """Ask the run loop to pause and exit, then wake it.

    This only sets the pause and exit flags; it does not wait for the
    run loop to act on them.
    """
    log = self.log
    log.debug("Prepare to exit")
    self._pause = True
    self._exit = True
    self.wake_event.set()
    log.debug("Waiting for exit")
470
def _get_queue_pickle_file(self):
    """Return the path of the saved-queue pickle file.

    The file lives in the scheduler's configured ``state_dir``
    (default ``/var/lib/zuul``).
    """
    return os.path.join(
        get_default(self.config, 'scheduler', 'state_dir',
                    '/var/lib/zuul', expand_user=True),
        'queue.pickle')
475
def _get_time_database_dir(self):
    """Return the ``times`` directory under the scheduler's
    ``state_dir``, creating it if it does not exist."""
    times_dir = os.path.join(
        get_default(self.config, 'scheduler', 'state_dir',
                    '/var/lib/zuul', expand_user=True),
        'times')
    if os.path.exists(times_dir):
        return times_dir
    os.mkdir(times_dir)
    return times_dir
483
def _get_project_key_dir(self):
    """Return the ``keys`` directory under the scheduler's ``state_dir``.

    The directory is created with mode 0700 if it does not exist.

    :raises Exception: if the directory's permission bits are not 0700
    """
    root = get_default(self.config, 'scheduler', 'state_dir',
                       '/var/lib/zuul', expand_user=True)
    key_dir = os.path.join(root, 'keys')
    if not os.path.exists(key_dir):
        os.mkdir(key_dir, 0o700)
    # Only the permission bits matter for the check below.
    mode = os.stat(key_dir).st_mode & 0o777
    if mode == 0o700:
        return key_dir
    raise Exception("Project key directory %s must be mode 0700; "
                    "current mode is %o" % (key_dir, mode))
496
def _save_queue(self):
    """Drain the trigger event queue and pickle its contents to disk.

    Nothing is written when the queue is empty.  The file is written
    through a context manager so that it is flushed and closed before
    this method returns; the caller may terminate the process
    immediately afterwards.
    """
    pickle_file = self._get_queue_pickle_file()
    events = []
    while not self.trigger_event_queue.empty():
        events.append(self.trigger_event_queue.get())
    self.log.debug("Queue length is %s" % len(events))
    if events:
        self.log.debug("Saving queue")
        with open(pickle_file, 'wb') as f:
            pickle.dump(events, f)
506
def _load_queue(self):
    """Re-populate the trigger event queue from the saved pickle file.

    Does nothing (beyond logging) when no saved queue file exists.
    """
    pickle_file = self._get_queue_pickle_file()
    if not os.path.exists(pickle_file):
        self.log.debug("No queue file found")
        return
    self.log.debug("Loading queue")
    # NOTE(review): unpickling can execute arbitrary code; this relies
    # on the state directory being writable only by trusted users.
    with open(pickle_file, 'rb') as f:
        events = pickle.load(f)
    self.log.debug("Queue length is %s" % len(events))
    for event in events:
        self.trigger_event_queue.put(event)
517
def _delete_queue(self):
    """Remove the saved queue file, if there is one."""
    saved = self._get_queue_pickle_file()
    if not os.path.exists(saved):
        return
    self.log.debug("Deleting saved queue")
    os.unlink(saved)
523
def resume(self):
    """Restore any saved queue, delete the saved file and wake the
    run loop.

    Failures while loading or deleting the saved queue are logged and
    do not prevent the remaining steps from running.
    """
    steps = (
        (self._load_queue, "Unable to load queue"),
        (self._delete_queue, "Unable to delete saved queue"),
    )
    for step, failure_message in steps:
        try:
            step()
        except Exception:
            self.log.exception(failure_message)
    self.log.debug("Resuming queue processing")
    self.wake_event.set()
535
def _doPauseEvent(self):
    """If an exit was requested, save the trigger queue and terminate
    the process immediately via ``os._exit(0)``; otherwise do nothing."""
    if not self._exit:
        return
    self.log.debug("Exiting")
    self._save_queue()
    os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700541
def _doReconfigureEvent(self, event):
    """Perform a full reconfiguration using the event's configuration.

    This is called in the scheduler loop after another thread submits
    a request.  The whole operation runs under ``layout_lock``; using
    a context manager guarantees the lock is released even if reading
    the event's configuration fails.

    :arg ReconfigureEvent event: the event carrying the new
        configuration
    """
    with self.layout_lock:
        self.config = event.config
        self.log.debug("Full reconfiguration beginning")
        loader = configloader.ConfigLoader()
        abide = loader.loadConfig(
            self.config.get('scheduler', 'tenant_config'),
            self._get_project_key_dir(),
            self, self.merger, self.connections)
        for tenant in abide.tenants.values():
            self._reconfigureTenant(tenant)
        # Only publish the new abide once every tenant has been
        # reconfigured.
        self.abide = abide
    self.log.debug("Full reconfiguration complete")
James E. Blaire9d45c32012-05-31 09:56:45 -0700560
def _doTenantReconfigureEvent(self, event):
    """Reload and reconfigure a single tenant.

    This is called in the scheduler loop after another thread submits
    a request.  The whole operation runs under ``layout_lock``.

    :arg TenantReconfigureEvent event: names the tenant and the
        projects whose cached configuration must be dropped
    """
    with self.layout_lock:
        self.log.debug("Tenant reconfiguration beginning")
        # If a change landed to a project, clear out the cached
        # config before reconfiguring.
        for changed_project in event.projects:
            changed_project.unparsed_config = None
        previous = self.abide.tenants[event.tenant_name]
        new_abide = configloader.ConfigLoader().reloadTenant(
            self.config.get('scheduler', 'tenant_config'),
            self._get_project_key_dir(),
            self, self.merger, self.connections,
            self.abide, previous)
        self._reconfigureTenant(new_abide.tenants[event.tenant_name])
        self.abide = new_abide
    self.log.debug("Tenant reconfiguration complete")
James E. Blair646322f2017-01-27 15:50:34 -0800584
def _reenqueueGetProject(self, tenant, item):
    """Find the project to use when re-enqueueing an item.

    :arg tenant: the tenant whose projects are searched
    :arg item: the queue item being re-enqueued
    :returns: the matching project, or None if none is available
    """
    old_project = item.change.project
    # Attempt to get the same project as the one passed in.  If
    # the project is now found on a different connection, return
    # the new version of the project.  If it is no longer
    # available (due to a connection being removed), return None.
    _, found = tenant.getProject(old_project.canonical_name)
    if found:
        return found

    # If this is a non-live item we may be looking at a
    # "foreign" project, ie, one which is not defined in the
    # config but is constructed ad-hoc to satisfy a
    # cross-repo-dependency.  Find the corresponding live item
    # and use its source.
    cursor = item
    while cursor and not cursor.live:
        # This assumes that the queue does not branch behind this
        # item, which is currently true for non-live items; if
        # that changes, this traversal will need to be more
        # complex.
        cursor = cursor.items_behind[0] if cursor.items_behind else None

    # The item itself was live (nothing to follow), or no live item
    # was found behind it.
    if cursor is item or not (cursor and cursor.live):
        return None

    _, anchor = tenant.getProject(cursor.change.project.canonical_name)
    if not anchor:
        return None
    return anchor.source.getProject(old_project.name)
James E. Blair6053de42017-04-05 11:27:11 -0700619
def _reenqueueTenant(self, old_tenant, tenant):
    """Carry queue items from ``old_tenant``'s pipelines into ``tenant``.

    For each pipeline of the new layout that has a same-named pipeline
    in the old layout, every item of the old pipeline's queues is
    detached from its old pipeline/queue and handed to the new
    pipeline manager's ``reEnqueueItem``.  Items that are not
    re-enqueued are logged and their builds canceled; builds of
    re-enqueued items whose job can no longer be found on the item are
    removed from the item and canceled as well.

    :param old_tenant: tenant object holding the previous layout.
    :param tenant: tenant object holding the new layout.
    """
    for name, new_pipeline in tenant.layout.pipelines.items():
        old_pipeline = old_tenant.layout.pipelines.get(name)
        if not old_pipeline:
            self.log.warning("No old pipeline matching %s found "
                             "when reconfiguring" % name)
            continue
        self.log.debug("Re-enqueueing changes for pipeline %s" % name)

        # TODO(jeblair): This supports an undocumented and
        # unanticipated hack to create a static window (exponential
        # increase and decrease, both with a factor of 1).  If we
        # really want to support this, maybe we should have a
        # 'static' type?  It may be in use in the wild, so allow it
        # at least until there is an upgrade path.
        static_window = bool(
            new_pipeline.window and
            new_pipeline.window_increase_type == 'exponential' and
            new_pipeline.window_decrease_type == 'exponential' and
            new_pipeline.window_increase_factor == 1 and
            new_pipeline.window_decrease_factor == 1)
        carry_windows = not static_window
        if old_pipeline.window and carry_windows:
            new_pipeline.window = max(old_pipeline.window,
                                      new_pipeline.window_floor)

        dropped_items = []
        orphaned_builds = []
        last_head = None
        for old_queue in old_pipeline.queues:
            # Attempt to keep window sizes from shrinking where possible
            new_queue = new_pipeline.getQueue(old_queue.projects[0])
            if new_queue and old_queue.window and carry_windows:
                new_queue.window = max(old_queue.window,
                                       new_queue.window_floor)
            for item in old_queue.queue:
                if not item.item_ahead:
                    last_head = item
                item.pipeline = None
                item.queue = None
                item.change.project = self._reenqueueGetProject(
                    tenant, item)
                # Only pass the old item ahead along if it was not
                # dropped; otherwise tell the manager it is not valid.
                ahead_dropped = item.item_ahead in dropped_items
                old_item_ahead = None if ahead_dropped else item.item_ahead
                item.item_ahead = None
                item.items_behind = []

                reenqueued = False
                if item.change.project:
                    try:
                        reenqueued = new_pipeline.manager.reEnqueueItem(
                            item, last_head, old_item_ahead,
                            item_ahead_valid=not ahead_dropped)
                    except Exception:
                        self.log.exception(
                            "Exception while re-enqueing item %s",
                            item)
                if not reenqueued:
                    dropped_items.append(item)
                    continue
                # Re-point surviving builds at the job found on the
                # item now; builds without such a job are canceled
                # below.
                for build in item.current_build_set.getBuilds():
                    new_job = item.getJob(build.job.name)
                    if new_job:
                        build.job = new_job
                    else:
                        item.removeBuild(build)
                        orphaned_builds.append(build)

        for item in dropped_items:
            self.log.warning(
                "Removing item %s during reconfiguration" % (item,))
            orphaned_builds.extend(item.current_build_set.getBuilds())

        for build in orphaned_builds:
            self.log.warning(
                "Canceling build %s during reconfiguration" % (build,))
            try:
                self.executor.cancel(build)
            except Exception:
                self.log.exception(
                    "Exception while canceling build %s "
                    "for change %s" % (build, build.build_set.item.change))
            # In the unlikely case that a build is removed and later
            # added back, make sure we clear out the nodeset so it
            # gets requested again.
            try:
                build.build_set.removeJobNodeSet(build.job.name)
            except Exception:
                self.log.exception(
                    "Exception while removing nodeset from build %s "
                    "for change %s" % (build, build.build_set.item.change))
            finally:
                # Release the semaphore whether or not the nodeset
                # removal succeeded.
                tenant.semaphore_handler.release(
                    build.build_set.item, build.job)
James E. Blair552b54f2016-07-22 13:55:32 -0700717
def _reconfigureTenant(self, tenant):
    """Finish installing a newly loaded ``tenant``.

    If a tenant with the same name is already known, its semaphore
    handler is reused and its queue items are re-enqueued into the new
    tenant.  Afterwards the connection drivers are reconfigured, the
    ``postConfig`` hooks of every pipeline's triggers and reporters
    run, the reconfiguration time is recorded and, when statsd is
    configured, a per-pipeline ``current_changes`` gauge is reported.

    This is called from _doReconfigureEvent while holding the layout
    lock.
    """
    previous = self.abide.tenants.get(tenant.name)
    if previous:
        # Copy over the semaphore handler so we don't lose the
        # currently held semaphores.
        tenant.semaphore_handler = previous.semaphore_handler
        self._reenqueueTenant(previous, tenant)

    # TODOv3(jeblair): update for tenants
    # self.maintainConnectionCache()
    self.connections.reconfigureDrivers(tenant)

    # TODOv3(jeblair): remove postconfig calls?
    for pipeline in tenant.layout.pipelines.values():
        for trigger in pipeline.triggers:
            trigger.postConfig(pipeline)
        for reporter in pipeline.actions:
            reporter.postConfig()

    self.tenant_last_reconfigured[tenant.name] = int(time.time())

    if not self.statsd:
        return
    try:
        for pipeline in tenant.layout.pipelines.values():
            change_count = len(pipeline.getAllItems())
            # stats.gauges.zuul.tenant.<tenant>.pipeline.
            #    <pipeline>.current_changes
            key = 'zuul.tenant.%s.pipeline.%s' % (
                tenant.name, pipeline.name)
            self.statsd.gauge(key + '.current_changes', change_count)
    except Exception:
        self.log.exception("Exception reporting initial "
                           "pipeline stats:")
753
def _doPromoteEvent(self, event):
    """Move the changes named by ``event`` to the front of their queue.

    ``event.change_ids`` holds ``"number,patchset"`` strings.  The
    shared queue containing the first of them is located, every item
    of that queue has its jobs canceled and is dequeued, and then all
    items are added again: the promoted ones first, in the order
    given, followed by the remaining ones in their previous order.

    :raises Exception: if no queue holds the first change, or if one
        of the requested changes is not in that queue.
    """
    tenant = self.abide.tenants.get(event.tenant_name)
    pipeline = tenant.layout.pipelines[event.pipeline_name]
    change_ids = [c.split(',') for c in event.change_ids]

    # Locate the shared queue holding the first change to promote.
    change_queue = None
    for shared_queue in pipeline.queues:
        if change_queue:
            break
        if any(item.change.number == change_ids[0][0] and
               item.change.patchset == change_ids[0][1]
               for item in shared_queue.queue):
            change_queue = shared_queue
    if not change_queue:
        raise Exception("Unable to find shared change queue for %s" %
                        event.change_ids[0])

    # Promoted items go first, in the order requested.
    items_to_enqueue = []
    for number, patchset in change_ids:
        for item in change_queue.queue:
            if (item.change.number == number and
                    item.change.patchset == patchset):
                items_to_enqueue.append(item)
                break
        else:
            raise Exception("Unable to find %s,%s in queue %s" %
                            (number, patchset, change_queue))

    # Empty the queue (iterating over a copy), appending the items
    # which were not explicitly promoted after the promoted ones.
    for item in change_queue.queue[:]:
        if item not in items_to_enqueue:
            items_to_enqueue.append(item)
        pipeline.manager.cancelJobs(item)
        pipeline.manager.dequeueItem(item)

    for item in items_to_enqueue:
        pipeline.manager.addChange(
            item.change,
            enqueue_time=item.enqueue_time,
            quiet=True,
            ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800793
def _doEnqueueEvent(self, event):
    """Add the change described by ``event`` to its forced pipeline.

    The tenant, project and pipeline are looked up from
    ``event.tenant_name``, ``event.project_name`` and
    ``event.forced_pipeline``; the change is fetched from the
    project's source and added to the pipeline with
    ``ignore_requirements=True``.

    :param event: trigger event carrying the attributes named above.
    """
    tenant = self.abide.tenants.get(event.tenant_name)
    _, project = tenant.getProject(event.project_name)
    pipeline = tenant.layout.pipelines[event.forced_pipeline]
    change = project.source.getChange(event, project)
    # Report the pipeline the change was assigned to (this used to
    # format the scheduler object itself into the message).
    self.log.debug("Event %s for change %s was directly assigned "
                   "to pipeline %s" % (event, change, pipeline))
    pipeline.manager.addChange(change, ignore_requirements=True)
802
def _areAllBuildsComplete(self):
    """Return True when no merge or build is still outstanding.

    Returns False right away if the merger reports outstanding
    merges.  Otherwise every build of every item in every tenant's
    pipelines is inspected; each build whose ``result`` is still None
    is logged, and False is returned if there was at least one.
    """
    self.log.debug("Checking if all builds are complete")
    if self.merger.areMergesOutstanding():
        self.log.debug("Waiting on merger")
        return False

    # Lazily walk all builds so that logging stays interleaved with
    # the traversal.
    unfinished = (
        (pipeline.manager, build)
        for tenant in self.abide.tenants.values()
        for pipeline in tenant.layout.pipelines.values()
        for item in pipeline.getAllItems()
        for build in item.current_build_set.getBuilds()
        if build.result is None
    )
    waiting = False
    for manager, build in unfinished:
        self.log.debug("%s waiting on %s" % (manager, build))
        waiting = True

    if waiting:
        return False
    self.log.debug("All builds are complete")
    return True
821
def run(self):
    """Main loop: wait to be woken, then drain the event queues.

    On every wake-up (unless the scheduler has been stopped, in which
    case the method returns) the following happens while holding
    ``run_handler_lock``: all management events are processed, then
    all result events, then -- unless paused -- all trigger events;
    a pending pause is completed once all builds are done; finally
    every pipeline manager's queue is processed until it reports no
    further work.  Exceptions are logged and the wake event is set
    again so that remaining events get another pass.
    """
    self.log.debug(
        "Statsd enabled" if self.statsd else "Statsd not configured")
    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            self.log.debug("Run handler stopping")
            return
        self.log.debug("Run handler awake")
        self.run_handler_lock.acquire()
        try:
            while not self.management_event_queue.empty():
                self.process_management_queue()

            # Give result events priority -- they let us stop builds,
            # whereas trigger events cause us to execute builds.
            while not self.result_event_queue.empty():
                self.process_result_queue()

            if not self._pause:
                while not self.trigger_event_queue.empty():
                    self.process_event_queue()

            if self._pause and self._areAllBuildsComplete():
                self._doPauseEvent()

            for tenant in self.abide.tenants.values():
                for pipeline in tenant.layout.pipelines.values():
                    # Keep going for as long as the manager reports
                    # that there is more to do.
                    while pipeline.manager.processQueue():
                        pass
        except Exception:
            self.log.exception("Exception in run handler:")
            # There may still be more events to process
            self.wake_event.set()
        finally:
            self.run_handler_lock.release()
James E. Blairee743612012-05-29 14:49:32 -0700863
def maintainConnectionCache(self):
    """Pass the set of changes still in use to every connection.

    The relevant set consists of the change of every item in every
    tenant's pipelines together with whatever
    ``change.getRelatedChanges()`` returns for it.  Each connection's
    ``maintainCache`` is then called with that set.
    """
    # TODOv3(jeblair): update for tenants
    in_use = set()
    for tenant in self.abide.tenants.values():
        for pipeline in tenant.layout.pipelines.values():
            self.log.debug("Gather relevant cache items for: %s" %
                           pipeline)
            for item in pipeline.getAllItems():
                change = item.change
                in_use.add(change)
                in_use.update(change.getRelatedChanges())

    for connection in self.connections.values():
        connection.maintainCache(in_use)
        self.log.debug(
            "End maintain connection cache for: %s" % connection)
    self.log.debug("Connection cache size: %s" % len(in_use))
880
def process_event_queue(self):
    """Take one trigger event off the queue and act on it.

    For every tenant that knows the event's project (looked up as
    ``<project_hostname>/<project_name>``) the change is fetched from
    the project's source.  A tenant reconfiguration is requested when
    the event is a branch update whose change updates the
    configuration, or a branch creation or deletion.  Each pipeline
    manager of the tenant is then given the chance to drop old
    versions of the change or an abandoned change, and to enqueue the
    change if the event matches.  The queue's ``task_done`` is always
    called.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    self.log.debug("Processing trigger event %s" % event)
    try:
        full_project_name = '/'.join(
            (event.project_hostname, event.project_name))
        for tenant in self.abide.tenants.values():
            (trusted, project) = tenant.getProject(full_project_name)
            if project is None:
                continue
            try:
                change = project.source.getChange(event)
            except exceptions.ChangeNotFound as e:
                self.log.debug("Unable to get change %s from "
                               "source %s",
                               e.change, project.source)
                continue

            # The change that just landed updates the config, or a
            # branch was just created or deleted.
            needs_reconfig = (
                (event.branch_updated and
                 hasattr(change, 'files') and
                 change.updatesConfig()) or
                event.branch_created or
                event.branch_deleted)
            if needs_reconfig:
                # Clear out cached data for this project and perform
                # a reconfiguration.
                self.reconfigureTenant(tenant, change.project)

            for pipeline in tenant.layout.pipelines.values():
                manager = pipeline.manager
                if event.isPatchsetCreated():
                    manager.removeOldVersionsOfChange(change)
                elif event.isChangeAbandoned():
                    manager.removeAbandonedChange(change)
                if manager.eventMatches(event, change):
                    manager.addChange(change)
    finally:
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700918
def process_management_queue(self):
    """Take one management event off the queue and dispatch it.

    The event is handed to the handler for the first matching event
    class (reconfigure, tenant reconfigure, promote, enqueue); an
    event of any other class is logged as an error.  ``event.done()``
    is called afterwards; if anything raises, the exception is logged
    and passed to ``event.exception()`` instead.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        # Checked in order; the first matching class wins.
        dispatch = (
            (ReconfigureEvent, self._doReconfigureEvent),
            (TenantReconfigureEvent, self._doTenantReconfigureEvent),
            (PromoteEvent, self._doPromoteEvent),
            # Enqueue events wrap the trigger event to be enqueued.
            (EnqueueEvent,
             lambda ev: self._doEnqueueEvent(ev.trigger_event)),
        )
        for event_class, handler in dispatch:
            if isinstance(event, event_class):
                handler(event)
                break
        else:
            self.log.error("Unable to handle event %s" % event)
        event.done()
    except Exception:
        self.log.exception("Exception in management event:")
        event.exception(sys.exc_info())
    self.management_event_queue.task_done()
939
def process_result_queue(self):
    """Take one result event off the queue and dispatch it.

    The event is handed to the handler for the first matching event
    class (build started, build completed, merge completed, nodes
    provisioned); an event of any other class is logged as an error.
    The queue's ``task_done`` is always called.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    self.log.debug("Processing result event %s" % event)
    try:
        # Checked in order; the first matching class wins.
        dispatch = (
            (BuildStartedEvent, self._doBuildStartedEvent),
            (BuildCompletedEvent, self._doBuildCompletedEvent),
            (MergeCompletedEvent, self._doMergeCompletedEvent),
            (NodesProvisionedEvent, self._doNodesProvisionedEvent),
        )
        for event_class, handler in dispatch:
            if isinstance(event, event_class):
                handler(event)
                break
        else:
            self.log.error("Unable to handle event %s" % event)
    finally:
        self.result_event_queue.task_done()
957
def _doBuildStartedEvent(self, event):
    """Forward a build-started event to the owning pipeline manager.

    The event is dropped with a warning when the build's build set is
    no longer its item's current build set, or when the item has no
    pipeline.  Otherwise the build's ``estimated_time`` is filled in
    from the time database (failures to do so are only logged) and the
    pipeline manager's ``onBuildStarted`` is called.
    """
    build = event.build
    item = build.build_set.item
    if build.build_set is not item.current_build_set:
        self.log.warning("Build %s is not in the current build set" %
                         (build,))
        return
    pipeline = item.pipeline
    if not pipeline:
        self.log.warning("Build %s is not associated with a pipeline" %
                         (build,))
        return
    try:
        estimate = self.time_database.getEstimatedTime(build)
        build.estimated_time = float(estimate)
    except Exception:
        self.log.exception("Exception estimating build time:")
    pipeline.manager.onBuildStarted(event.build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800975
def _doBuildCompletedEvent(self, event):
    """Return the build's nodes and report completion to its pipeline.

    The nodeset is always handed back to nodepool first.  If the
    build's result is ``"FAILURE"`` and an autohold request exists for
    its (tenant, project, job) key, nodepool is asked to hold the
    nodes; when that fails the autohold request is removed.  Errors in
    this phase are logged and never prevent the rest of the method
    from running.

    After that the event is dropped if the build's build set is not
    its item's current build set or the item has no pipeline.
    Otherwise the build duration is recorded in the time database
    (when start time, end time and result are all set) and the
    pipeline manager's ``onBuildCompleted`` is called.
    """
    build = event.build

    # Regardless of any other conditions which might cause us not
    # to pass this on to the pipeline manager, make sure we return
    # the nodes to nodepool.
    #
    # Bind the name up front: it is used by the error handler below,
    # which must not fail itself if reading build.nodeset raises.
    nodeset = None
    try:
        nodeset = build.nodeset
        autohold_key = (build.pipeline.layout.tenant.name,
                        build.build_set.item.change.project.canonical_name,
                        build.job.name)
        if (build.result == "FAILURE" and
                autohold_key in self.autohold_requests):
            # We explicitly only want to hold nodes for jobs if they
            # have failed and have an autohold request.
            try:
                self.nodepool.holdNodeSet(nodeset, autohold_key)
            except Exception:
                self.log.exception("Unable to process autohold for %s:",
                                   autohold_key)
                if autohold_key in self.autohold_requests:
                    self.log.debug("Removing autohold %s due to exception",
                                   autohold_key)
                    del self.autohold_requests[autohold_key]

        self.nodepool.returnNodeSet(nodeset)
    except Exception:
        self.log.exception("Unable to return nodeset %s" % (nodeset,))

    if build.build_set is not build.build_set.item.current_build_set:
        self.log.debug("Build %s is not in the current build set" %
                       (build,))
        return
    pipeline = build.build_set.item.pipeline
    if not pipeline:
        self.log.warning("Build %s is not associated with a pipeline" %
                         (build,))
        return
    if build.end_time and build.start_time and build.result:
        duration = build.end_time - build.start_time
        try:
            self.time_database.update(build, duration, build.result)
        except Exception:
            self.log.exception("Exception recording build time:")
    pipeline.manager.onBuildCompleted(event.build)
1021
def _doMergeCompletedEvent(self, event):
    """Forward a merge-completed event to the owning pipeline manager.

    The event is dropped with a warning when its build set is not the
    item's current build set or the item has no pipeline.
    """
    build_set = event.build_set
    item = build_set.item
    if build_set is not item.current_build_set:
        self.log.warning("Build set %s is not current" % (build_set,))
        return
    pipeline = item.pipeline
    if pipeline:
        pipeline.manager.onMergeCompleted(event)
        return
    self.log.warning("Build set %s is not associated with a pipeline" %
                     (build_set,))
James E. Blairee743612012-05-29 14:49:32 -07001033
def _doNodesProvisionedEvent(self, event):
    """Accept provisioned nodes and notify the owning pipeline manager.

    The request is first accepted through nodepool; nothing further
    happens if it was canceled.  If the request's build set is not
    its item's current build set, or the item has no pipeline, a
    warning is logged and the nodeset of a fulfilled request is
    returned to nodepool.  Otherwise the pipeline manager's
    ``onNodesProvisioned`` is called.
    """
    request = event.request
    request_id = event.request_id
    build_set = request.build_set

    self.nodepool.acceptNodes(request, request_id)
    if request.canceled:
        return

    item = build_set.item
    if build_set is not item.current_build_set:
        reason = "Build set %s is not current" % (build_set,)
    elif not item.pipeline:
        reason = ("Build set %s is not associated with a pipeline" %
                  (build_set,))
    else:
        reason = None

    if reason is None:
        item.pipeline.manager.onNodesProvisioned(event)
        return

    # Nobody is going to use these nodes, so give them back.
    self.log.warning(reason)
    if request.fulfilled:
        self.nodepool.returnNodeSet(request.nodeset)
1056
def formatStatusJSON(self, tenant_name):
    """Return a JSON string describing the scheduler and one tenant.

    The document contains the zuul version, the lengths of the
    trigger, result and management event queues, an optional
    ``message`` while the scheduler is paused, the time of the last
    reconfiguration multiplied by 1000 (when set), and the status of
    each pipeline of the tenant named ``tenant_name``.
    """
    # TODOv3(jeblair): use tenants
    data = {'zuul_version': self.zuul_version}
    websocket_url = get_default(self.config, 'web', 'websocket_url', None)

    if self._pause:
        fragments = ['<p><b>Queue only mode:</b> preparing to ']
        if self._exit:
            fragments.append('exit')
        fragments.append(
            ', queue length: %s' % self.trigger_event_queue.qsize())
        fragments.append('</p>')
        data['message'] = ''.join(fragments)

    # Each queue attribute is reported under its own name.
    for queue_name in ('trigger_event_queue',
                       'result_event_queue',
                       'management_event_queue'):
        data[queue_name] = {'length': getattr(self, queue_name).qsize()}

    if self.last_reconfigured:
        data['last_reconfigured'] = self.last_reconfigured * 1000

    tenant = self.abide.tenants.get(tenant_name)
    data['pipelines'] = [
        pipeline.formatStatusJSON(websocket_url)
        for pipeline in tenant.layout.pipelines.values()
    ]
    return json.dumps(data)
James E. Blair0e4c7912018-01-02 14:20:17 -08001091
def onChangeUpdated(self, change):
    """Remove stale dependency references on change update.

    When a change is updated with a new patchset, other changes in
    the system may still have a reference to the old patchset in
    their dependencies.  Search for those (across all sources) and
    mark that their dependencies are out of date.  This will cause
    them to be refreshed the next time the queue processor
    examines them.
    """
    self.log.debug("Change %s has been updated, clearing dependent "
                   "change caches", change)
    for source in self.connections.getSources():
        for cached in source.getCachedChanges():
            needed = cached.commit_needs_changes
            if needed is not None:
                for dep in needed:
                    if change.isUpdateOf(dep):
                        cached.refresh_deps = True
    # The updated change itself is flagged as well.
    change.refresh_deps = True