blob: 2130ede8eb9e757d078ea823fb5e12f76beb4c97 [file] [log] [blame]
James E. Blairdbfe1cd2015-02-07 11:41:19 -08001# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
James E. Blair47958382013-01-10 17:26:02 -08002# Copyright 2013 OpenStack Foundation
Antoine Musso80edd5a2013-02-13 15:37:53 +01003# Copyright 2013 Antoine "hashar" Musso
4# Copyright 2013 Wikimedia Foundation Inc.
James E. Blairee743612012-05-29 14:49:32 -07005#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
James E. Blair8dbd56a2012-12-22 10:55:10 -080018import json
James E. Blairee743612012-05-29 14:49:32 -070019import logging
Zhongyue Luo1c860d72012-07-19 11:03:56 +080020import os
James E. Blair5d5bc2b2012-07-06 10:24:01 -070021import pickle
Monty Taylorb934c1a2017-06-16 19:31:47 -050022import queue
James E. Blair8b2a1472017-02-19 15:33:55 -080023import socket
James E. Blair36658cf2013-12-06 17:53:48 -080024import sys
Zhongyue Luo1c860d72012-07-19 11:03:56 +080025import threading
James E. Blair71e94122012-12-24 17:53:08 -080026import time
James E. Blairee743612012-05-29 14:49:32 -070027
Joshua Hesketh0aa7e8b2016-07-14 00:12:25 +100028from zuul import configloader
Morgan Fainberg9c4700a2016-05-30 14:25:19 -070029from zuul import model
James E. Blair83005782015-12-11 14:46:03 -080030from zuul import exceptions
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +040031from zuul import version as zuul_version
James E. Blairbdd50e62017-10-21 08:18:55 -070032from zuul import rpclistener
Paul Belanger40d3ce62017-11-28 11:49:55 -050033from zuul.lib import commandsocket
Tristan Cacqueray91601d72017-06-15 06:00:12 +000034from zuul.lib.config import get_default
James E. Blairded241e2017-10-10 13:22:40 -070035from zuul.lib.statsd import get_statsd
James E. Blair419a8672017-10-18 14:48:25 -070036import zuul.lib.queue
James E. Blairee743612012-05-29 14:49:32 -070037
# Names of the commands the scheduler accepts.
# NOTE(review): presumably these are the commands deliverable over the
# command socket and must match the keys of Scheduler.command_map
# (currently only 'stop'); confirm against callers.
COMMANDS = ['stop']
39
James E. Blair1e8dd892012-05-30 09:15:05 -070040
class ManagementEvent:
    """Base class for requests processed inside the main queue run loop.

    A submitting thread queues the event and blocks in :meth:`wait` until
    :meth:`done` or :meth:`exception` is called (presumably by the main
    loop).  A recorded exception is re-raised in the waiting thread.
    """

    def __init__(self):
        self._wait_event = threading.Event()
        # (type, value, traceback) triple recorded by exception(), or None.
        self._exc_info = None

    def exception(self, exc_info):
        """Record a failure and wake any waiter.

        :arg tuple exc_info: a ``(type, value, traceback)`` triple as
            returned by ``sys.exc_info()``.
        """
        self._exc_info = exc_info
        self._wait_event.set()

    def done(self):
        """Mark the event as processed and wake any waiter."""
        self._wait_event.set()

    def wait(self, timeout=None):
        """Block until the event is processed or *timeout* seconds pass.

        :returns: True if the event was processed, False on timeout.
        :raises: the exception recorded via :meth:`exception`, carrying
            its original traceback.
        """
        self._wait_event.wait(timeout)
        if self._exc_info:
            _, error, tb = self._exc_info
            raise error.with_traceback(tb)
        return self._wait_event.is_set()
61
62
class ReconfigureEvent(ManagementEvent):
    """Request a full reconfiguration of the scheduler.

    The layout is (re-)loaded from the path named in the supplied
    configuration.

    :arg ConfigParser config: the new configuration
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
72
73
class TenantReconfigureEvent(ManagementEvent):
    """Reconfigure the given tenant. The layout will be (re-)loaded from
    the path specified in the configuration.

    Events for the same tenant compare equal so that queued requests can
    be combined; :meth:`merge` unions their project sets.

    :arg Tenant tenant: the tenant to reconfigure
    :arg Project project: if supplied (not None), clear the cached
        configuration from this project first
    """

    # Defining __eq__ already makes instances unhashable in Python 3; state
    # it explicitly since merge() mutates ``projects`` in place.
    __hash__ = None

    def __init__(self, tenant, project):
        super(TenantReconfigureEvent, self).__init__()
        self.tenant_name = tenant.name
        # Only track a real project: ``project`` may be omitted (None), and
        # the scheduler clears ``unparsed_config`` on every member of this
        # set, which would fail on None.
        self.projects = set() if project is None else {project}

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, TenantReconfigureEvent):
            return False
        # We don't check projects because they will get combined when
        # merged.
        return (self.tenant_name == other.tenant_name)

    def merge(self, other):
        """Fold *other* into this event by unioning its projects.

        :raises Exception: if *other* is for a different tenant.
        """
        if self.tenant_name != other.tenant_name:
            raise Exception("Can not merge events from different tenants")
        self.projects |= other.projects
James E. Blair646322f2017-01-27 15:50:34 -0800101
102
class PromoteEvent(ManagementEvent):
    """Move one or more changes to the head of a pipeline's queue.

    :arg str tenant_name: name of the tenant that owns the pipeline
    :arg str pipeline_name: name of the pipeline
    :arg list change_ids: change ids as strings of the form ``1234,1``
    """

    def __init__(self, tenant_name, pipeline_name, change_ids):
        super().__init__()
        self.tenant_name = tenant_name
        self.pipeline_name = pipeline_name
        self.change_ids = change_ids
117
118
class EnqueueEvent(ManagementEvent):
    """Ask the scheduler to enqueue a change into a pipeline.

    :arg TriggerEvent trigger_event: describes the trigger, pipeline and
        change to enqueue
    """

    def __init__(self, trigger_event):
        super().__init__()
        self.trigger_event = trigger_event
129
130
class ResultEvent:
    """Base class for events that modify pipeline state because an
    external system reported a result."""
136
137
class BuildStartedEvent(ResultEvent):
    """Signals that a build has begun running.

    :arg Build build: the build that started
    """

    def __init__(self, build):
        self.build = build
146
147
class BuildCompletedEvent(ResultEvent):
    """Signals that a build has finished.

    :arg Build build: the build that finished
    """

    def __init__(self, build):
        self.build = build
156
157
class MergeCompletedEvent(ResultEvent):
    """A remote merge operation has completed

    :arg BuildSet build_set: The build_set which is ready.
    :arg bool merged: Whether the merge succeeded (changes with refs).
    :arg bool updated: Whether the repo was updated (changes without refs).
    :arg str commit: The SHA of the merged commit (changes with refs).
    :arg files: The files reported by the merge operation for this
        change (presumably the paths the change touches -- confirm
        against the merger).
    :arg dict repo_state: The starting repo state before the merge.
    """

    def __init__(self, build_set, merged, updated, commit,
                 files, repo_state):
        self.build_set = build_set
        self.merged = merged
        self.updated = updated
        self.commit = commit
        self.files = files
        self.repo_state = repo_state
James E. Blair4076e2b2014-01-28 12:42:20 -0800176
177
class NodesProvisionedEvent(ResultEvent):
    """Nodes have been provisioned for a node request.

    :arg request: The node request whose nodes were provisioned
        (presumably a model.NodeRequest; it must expose an ``id``).
    """

    def __init__(self, request):
        self.request = request
        # The request's id is also kept as its own attribute.
        self.request_id = request.id
James E. Blair8d692392016-04-08 17:47:58 -0700188
189
James E. Blaire9d45c32012-05-31 09:56:45 -0700190class Scheduler(threading.Thread):
James E. Blaire4de4f42017-01-19 10:35:24 -0800191 """The engine of Zuul.
192
193 The Scheduler is reponsible for recieving events and dispatching
194 them to appropriate components (including pipeline managers,
Paul Belanger174a8272017-03-14 13:20:10 -0400195 mergers and executors).
James E. Blaire4de4f42017-01-19 10:35:24 -0800196
197 It runs a single threaded main loop which processes events
198 received one at a time and takes action as appropriate. Other
199 parts of Zuul may run in their own thread, but synchronization is
200 performed within the scheduler to reduce or eliminate the need for
201 locking in most circumstances.
202
203 The main daemon will have one instance of the Scheduler class
204 running which will persist for the life of the process. The
205 Scheduler instance is supplied to other Zuul components so that
206 they can submit events or otherwise communicate with other
207 components.
208
209 """
210
James E. Blairee743612012-05-29 14:49:32 -0700211 log = logging.getLogger("zuul.Scheduler")
James E. Blair4dd5f4b2017-10-23 07:44:08 -0700212 _stats_interval = 30
James E. Blairee743612012-05-29 14:49:32 -0700213
James E. Blaire4d229c2016-05-25 15:25:41 -0700214 def __init__(self, config, testonly=False):
James E. Blaire9d45c32012-05-31 09:56:45 -0700215 threading.Thread.__init__(self)
James E. Blair8a6f0c22013-07-01 12:31:34 -0400216 self.daemon = True
James E. Blairce1bf1f2018-02-03 17:34:30 -0800217 self.hostname = socket.getfqdn()
James E. Blairee743612012-05-29 14:49:32 -0700218 self.wake_event = threading.Event()
James E. Blaircdccd972013-07-01 12:10:22 -0700219 self.layout_lock = threading.Lock()
James E. Blaira84f0e42014-02-06 07:09:22 -0800220 self.run_handler_lock = threading.Lock()
Paul Belanger40d3ce62017-11-28 11:49:55 -0500221 self.command_map = dict(
222 stop=self.stop,
223 )
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700224 self._pause = False
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700225 self._exit = False
James E. Blairb0fcae42012-07-17 11:12:10 -0700226 self._stopped = False
Paul Belanger174a8272017-03-14 13:20:10 -0400227 self.executor = None
James E. Blair4076e2b2014-01-28 12:42:20 -0800228 self.merger = None
James E. Blair83005782015-12-11 14:46:03 -0800229 self.connections = None
James E. Blairded241e2017-10-10 13:22:40 -0700230 self.statsd = get_statsd(config)
James E. Blairbdd50e62017-10-21 08:18:55 -0700231 self.rpc = rpclistener.RPCListener(config, self)
232 self.stats_thread = threading.Thread(target=self.runStats)
233 self.stats_stop = threading.Event()
James E. Blair83005782015-12-11 14:46:03 -0800234 # TODO(jeblair): fix this
Joshua Hesketh352264b2015-08-11 23:42:08 +1000235 # Despite triggers being part of the pipeline, there is one trigger set
236 # per scheduler. The pipeline handles the trigger filters but since
237 # the events are handled by the scheduler itself it needs to handle
238 # the loading of the triggers.
239 # self.triggers['connection_name'] = triggerObject
James E. Blair6c358e72013-07-29 17:06:47 -0700240 self.triggers = dict()
Joshua Hesketh352264b2015-08-11 23:42:08 +1000241 self.config = config
James E. Blairee743612012-05-29 14:49:32 -0700242
Monty Taylorb934c1a2017-06-16 19:31:47 -0500243 self.trigger_event_queue = queue.Queue()
244 self.result_event_queue = queue.Queue()
James E. Blair419a8672017-10-18 14:48:25 -0700245 self.management_event_queue = zuul.lib.queue.MergedQueue()
James E. Blair59fdbac2015-12-07 17:08:06 -0800246 self.abide = model.Abide()
James E. Blairee743612012-05-29 14:49:32 -0700247
James E. Blaire4d229c2016-05-25 15:25:41 -0700248 if not testonly:
249 time_dir = self._get_time_database_dir()
250 self.time_database = model.TimeDataBase(time_dir)
James E. Blairce8a2132016-05-19 15:21:52 -0700251
Paul Belanger40d3ce62017-11-28 11:49:55 -0500252 command_socket = get_default(
253 self.config, 'scheduler', 'command_socket',
254 '/var/lib/zuul/scheduler.socket')
255 self.command_socket = commandsocket.CommandSocket(command_socket)
256
Jeremy Stanley98b38de2015-06-04 21:20:43 +0000257 self.zuul_version = zuul_version.version_info.release_string()
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400258 self.last_reconfigured = None
Jesse Keating71a47ff2017-06-06 11:36:43 -0700259 self.tenant_last_reconfigured = {}
David Shrewsburyffab07a2017-07-24 12:45:07 -0400260 self.autohold_requests = {}
Sergey Lukjanov5ba961b2013-12-27 01:21:04 +0400261
James E. Blairbdd50e62017-10-21 08:18:55 -0700262 def start(self):
263 super(Scheduler, self).start()
Paul Belanger40d3ce62017-11-28 11:49:55 -0500264 self._command_running = True
265 self.log.debug("Starting command processor")
266 self.command_socket.start()
267 self.command_thread = threading.Thread(target=self.runCommand,
268 name='command')
269 self.command_thread.daemon = True
270 self.command_thread.start()
271
James E. Blairbdd50e62017-10-21 08:18:55 -0700272 self.rpc.start()
273 self.stats_thread.start()
274
    def stop(self):
        """Stop the scheduler and its helper threads.

        Marks the scheduler stopped, signals the periodic stats thread,
        stops all connections, wakes the main loop, waits for the stats
        and RPC listener threads, and finally shuts down the command
        socket processing.
        """
        self._stopped = True
        self.stats_stop.set()
        self.stopConnections()
        self.wake_event.set()
        self.stats_thread.join()
        self.rpc.stop()
        self.rpc.join()
        # Clear the flag before stopping the socket so runCommand's loop
        # ends (presumably command_socket.stop() unblocks its get()).
        self._command_running = False
        self.command_socket.stop()
285
286 def runCommand(self):
287 while self._command_running:
288 try:
289 command = self.command_socket.get().decode('utf8')
290 if command != '_stop':
291 self.command_map[command]()
292 except Exception:
293 self.log.exception("Exception while processing command")
James E. Blairb0fcae42012-07-17 11:12:10 -0700294
Jesse Keating80730e62017-09-14 15:35:11 -0600295 def registerConnections(self, connections, load=True):
Joshua Hesketh9a256752016-04-04 13:38:51 +1000296 # load: whether or not to trigger the onLoad for the connection. This
297 # is useful for not doing a full load during layout validation.
Joshua Hesketh352264b2015-08-11 23:42:08 +1000298 self.connections = connections
Joshua Hesketh0aa7e8b2016-07-14 00:12:25 +1000299 self.connections.registerScheduler(self, load)
Joshua Hesketh352264b2015-08-11 23:42:08 +1000300
301 def stopConnections(self):
James E. Blair83005782015-12-11 14:46:03 -0800302 self.connections.stop()
James E. Blair14abdf42015-12-09 16:11:53 -0800303
Paul Belanger174a8272017-03-14 13:20:10 -0400304 def setExecutor(self, executor):
305 self.executor = executor
James E. Blairee743612012-05-29 14:49:32 -0700306
James E. Blair4076e2b2014-01-28 12:42:20 -0800307 def setMerger(self, merger):
308 self.merger = merger
309
James E. Blair8d692392016-04-08 17:47:58 -0700310 def setNodepool(self, nodepool):
311 self.nodepool = nodepool
312
James E. Blairdce6cea2016-12-20 16:45:32 -0800313 def setZooKeeper(self, zk):
314 self.zk = zk
315
James E. Blairbdd50e62017-10-21 08:18:55 -0700316 def runStats(self):
317 while not self.stats_stop.wait(self._stats_interval):
318 try:
319 self._runStats()
320 except Exception:
321 self.log.exception("Error in periodic stats:")
322
323 def _runStats(self):
324 if not self.statsd:
325 return
326 functions = self.rpc.getFunctions()
327 executors_accepting = 0
328 executors_online = 0
329 execute_queue = 0
330 execute_running = 0
331 mergers_online = 0
332 merge_queue = 0
333 merge_running = 0
334 for (name, (queued, running, registered)) in functions.items():
335 if name == 'executor:execute':
336 executors_accepting = registered
337 execute_queue = queued - running
338 execute_running = running
339 if name.startswith('executor:stop'):
340 executors_online += registered
341 if name == 'merger:merge':
342 mergers_online = registered
343 if name.startswith('merger:'):
344 merge_queue += queued - running
345 merge_running += running
346 self.statsd.gauge('zuul.mergers.online', mergers_online)
347 self.statsd.gauge('zuul.mergers.jobs_running', merge_running)
348 self.statsd.gauge('zuul.mergers.jobs_queued', merge_queue)
349 self.statsd.gauge('zuul.executors.online', executors_online)
350 self.statsd.gauge('zuul.executors.accepting', executors_accepting)
351 self.statsd.gauge('zuul.executors.jobs_running', execute_running)
352 self.statsd.gauge('zuul.executors.jobs_queued', execute_queue)
353
James E. Blairee743612012-05-29 14:49:32 -0700354 def addEvent(self, event):
James E. Blairee743612012-05-29 14:49:32 -0700355 self.trigger_event_queue.put(event)
356 self.wake_event.set()
357
James E. Blair11700c32012-07-05 17:50:05 -0700358 def onBuildStarted(self, build):
James E. Blair71e94122012-12-24 17:53:08 -0800359 build.start_time = time.time()
James E. Blaira84f0e42014-02-06 07:09:22 -0800360 event = BuildStartedEvent(build)
361 self.result_event_queue.put(event)
James E. Blair11700c32012-07-05 17:50:05 -0700362 self.wake_event.set()
363
James E. Blair196f61a2017-06-30 15:42:29 -0700364 def onBuildCompleted(self, build, result, result_data):
James E. Blair71e94122012-12-24 17:53:08 -0800365 build.end_time = time.time()
James E. Blair196f61a2017-06-30 15:42:29 -0700366 build.result_data = result_data
James E. Blairf0358662015-07-20 15:19:12 -0700367 # Note, as soon as the result is set, other threads may act
368 # upon this, even though the event hasn't been fully
369 # processed. Ensure that any other data from the event (eg,
370 # timing) is recorded before setting the result.
371 build.result = result
James E. Blair23ec1ba2013-01-04 18:06:10 -0800372 try:
James E. Blair552b54f2016-07-22 13:55:32 -0700373 if self.statsd and build.pipeline:
James E. Blair80ac1582017-10-09 07:02:40 -0700374 tenant = build.pipeline.layout.tenant
375 jobname = build.job.name.replace('.', '_').replace('/', '_')
376 hostname = (build.build_set.item.change.project.
377 canonical_hostname.replace('.', '_'))
378 projectname = (build.build_set.item.change.project.name.
379 replace('.', '_').replace('/', '_'))
380 branchname = (build.build_set.item.change.branch.
381 replace('.', '_').replace('/', '_'))
382 basekey = 'zuul.tenant.%s' % tenant.name
383 pipekey = '%s.pipeline.%s' % (basekey, build.pipeline.name)
384 # zuul.tenant.<tenant>.pipeline.<pipeline>.all_jobs
385 key = '%s.all_jobs' % pipekey
James E. Blair552b54f2016-07-22 13:55:32 -0700386 self.statsd.incr(key)
James E. Blair80ac1582017-10-09 07:02:40 -0700387 jobkey = '%s.project.%s.%s.%s.job.%s' % (
388 pipekey, hostname, projectname, branchname, jobname)
389 # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
390 # <host>.<project>.<branch>.job.<job>.<result>
391 key = '%s.%s' % (jobkey, build.result)
James E. Blair23ec1ba2013-01-04 18:06:10 -0800392 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
393 dt = int((build.end_time - build.start_time) * 1000)
James E. Blair552b54f2016-07-22 13:55:32 -0700394 self.statsd.timing(key, dt)
395 self.statsd.incr(key)
James E. Blair80ac1582017-10-09 07:02:40 -0700396 # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
397 # <host>.<project>.<branch>.job.<job>.wait_time
Ian Wienand591afbe2017-10-31 09:27:36 +1100398 if build.start_time:
399 key = '%s.wait_time' % jobkey
400 dt = int((build.start_time - build.execute_time) * 1000)
401 self.statsd.timing(key, dt)
James E. Blair80ac1582017-10-09 07:02:40 -0700402 except Exception:
James E. Blair23ec1ba2013-01-04 18:06:10 -0800403 self.log.exception("Exception reporting runtime stats")
James E. Blaira84f0e42014-02-06 07:09:22 -0800404 event = BuildCompletedEvent(build)
405 self.result_event_queue.put(event)
James E. Blairee743612012-05-29 14:49:32 -0700406 self.wake_event.set()
407
Tobias Henkel34ee0882017-07-31 22:26:12 +0200408 def onMergeCompleted(self, build_set, merged, updated,
James E. Blair1960d682017-04-28 15:44:14 -0700409 commit, files, repo_state):
Tobias Henkel34ee0882017-07-31 22:26:12 +0200410 event = MergeCompletedEvent(build_set, merged,
James E. Blair1960d682017-04-28 15:44:14 -0700411 updated, commit, files, repo_state)
James E. Blair4076e2b2014-01-28 12:42:20 -0800412 self.result_event_queue.put(event)
413 self.wake_event.set()
414
James E. Blair8d692392016-04-08 17:47:58 -0700415 def onNodesProvisioned(self, req):
James E. Blair8d692392016-04-08 17:47:58 -0700416 event = NodesProvisionedEvent(req)
417 self.result_event_queue.put(event)
418 self.wake_event.set()
419
James E. Blaira615c362017-10-02 17:34:42 -0700420 def reconfigureTenant(self, tenant, project):
421 self.log.debug("Submitting tenant reconfiguration event for "
422 "%s due to project %s", tenant.name, project)
423 event = TenantReconfigureEvent(tenant, project)
James E. Blair646322f2017-01-27 15:50:34 -0800424 self.management_event_queue.put(event)
425 self.wake_event.set()
426
James E. Blaire9d45c32012-05-31 09:56:45 -0700427 def reconfigure(self, config):
James E. Blaira615c362017-10-02 17:34:42 -0700428 self.log.debug("Submitting reconfiguration event")
James E. Blair468c8512013-12-06 13:27:19 -0800429 event = ReconfigureEvent(config)
430 self.management_event_queue.put(event)
James E. Blaire9d45c32012-05-31 09:56:45 -0700431 self.wake_event.set()
432 self.log.debug("Waiting for reconfiguration")
James E. Blair468c8512013-12-06 13:27:19 -0800433 event.wait()
James E. Blaire9d45c32012-05-31 09:56:45 -0700434 self.log.debug("Reconfiguration complete")
Sergey Lukjanov5d0438d2013-12-24 03:36:39 +0400435 self.last_reconfigured = int(time.time())
James E. Blair646322f2017-01-27 15:50:34 -0800436 # TODOv3(jeblair): reconfigure time should be per-tenant
James E. Blaire9d45c32012-05-31 09:56:45 -0700437
David Shrewsbury36b2adf2017-07-31 15:40:13 -0400438 def autohold(self, tenant_name, project_name, job_name, reason, count):
David Shrewsburyffab07a2017-07-24 12:45:07 -0400439 key = (tenant_name, project_name, job_name)
440 if count == 0 and key in self.autohold_requests:
441 self.log.debug("Removing autohold for %s", key)
442 del self.autohold_requests[key]
443 else:
444 self.log.debug("Autohold requested for %s", key)
David Shrewsbury36b2adf2017-07-31 15:40:13 -0400445 self.autohold_requests[key] = (count, reason)
David Shrewsburyffab07a2017-07-24 12:45:07 -0400446
Paul Belangerbaca3132016-11-04 12:49:54 -0400447 def promote(self, tenant_name, pipeline_name, change_ids):
448 event = PromoteEvent(tenant_name, pipeline_name, change_ids)
James E. Blair36658cf2013-12-06 17:53:48 -0800449 self.management_event_queue.put(event)
450 self.wake_event.set()
451 self.log.debug("Waiting for promotion")
452 event.wait()
453 self.log.debug("Promotion complete")
454
James E. Blaird27a96d2014-07-10 13:25:13 -0700455 def enqueue(self, trigger_event):
456 event = EnqueueEvent(trigger_event)
457 self.management_event_queue.put(event)
458 self.wake_event.set()
459 self.log.debug("Waiting for enqueue")
460 event.wait()
461 self.log.debug("Enqueue complete")
462
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700463 def exit(self):
464 self.log.debug("Prepare to exit")
465 self._pause = True
466 self._exit = True
467 self.wake_event.set()
468 self.log.debug("Waiting for exit")
469
470 def _get_queue_pickle_file(self):
James E. Blaird1de9462017-06-23 20:53:09 +0100471 state_dir = get_default(self.config, 'scheduler', 'state_dir',
Tristan Cacqueray91601d72017-06-15 06:00:12 +0000472 '/var/lib/zuul', expand_user=True)
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700473 return os.path.join(state_dir, 'queue.pickle')
474
James E. Blairce8a2132016-05-19 15:21:52 -0700475 def _get_time_database_dir(self):
James E. Blaird1de9462017-06-23 20:53:09 +0100476 state_dir = get_default(self.config, 'scheduler', 'state_dir',
Tristan Cacqueray91601d72017-06-15 06:00:12 +0000477 '/var/lib/zuul', expand_user=True)
James E. Blairce8a2132016-05-19 15:21:52 -0700478 d = os.path.join(state_dir, 'times')
479 if not os.path.exists(d):
480 os.mkdir(d)
481 return d
482
Ricardo Carrillo Cruz22994f92016-12-02 11:41:58 +0000483 def _get_project_key_dir(self):
James E. Blaird1de9462017-06-23 20:53:09 +0100484 state_dir = get_default(self.config, 'scheduler', 'state_dir',
Tristan Cacqueray91601d72017-06-15 06:00:12 +0000485 '/var/lib/zuul', expand_user=True)
Ricardo Carrillo Cruz22994f92016-12-02 11:41:58 +0000486 key_dir = os.path.join(state_dir, 'keys')
487 if not os.path.exists(key_dir):
488 os.mkdir(key_dir, 0o700)
489 st = os.stat(key_dir)
490 mode = st.st_mode & 0o777
491 if mode != 0o700:
492 raise Exception("Project key directory %s must be mode 0700; "
493 "current mode is %o" % (key_dir, mode))
494 return key_dir
495
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700496 def _save_queue(self):
497 pickle_file = self._get_queue_pickle_file()
498 events = []
499 while not self.trigger_event_queue.empty():
500 events.append(self.trigger_event_queue.get())
501 self.log.debug("Queue length is %s" % len(events))
502 if events:
503 self.log.debug("Saving queue")
504 pickle.dump(events, open(pickle_file, 'wb'))
505
506 def _load_queue(self):
507 pickle_file = self._get_queue_pickle_file()
508 if os.path.exists(pickle_file):
509 self.log.debug("Loading queue")
510 events = pickle.load(open(pickle_file, 'rb'))
511 self.log.debug("Queue length is %s" % len(events))
512 for event in events:
513 self.trigger_event_queue.put(event)
514 else:
515 self.log.debug("No queue file found")
516
517 def _delete_queue(self):
518 pickle_file = self._get_queue_pickle_file()
519 if os.path.exists(pickle_file):
520 self.log.debug("Deleting saved queue")
521 os.unlink(pickle_file)
522
523 def resume(self):
524 try:
525 self._load_queue()
David Shrewsbury70798ea2017-10-23 12:19:13 -0400526 except Exception:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700527 self.log.exception("Unable to load queue")
528 try:
529 self._delete_queue()
David Shrewsbury70798ea2017-10-23 12:19:13 -0400530 except Exception:
James E. Blair5d5bc2b2012-07-06 10:24:01 -0700531 self.log.exception("Unable to delete saved queue")
532 self.log.debug("Resuming queue processing")
533 self.wake_event.set()
534
535 def _doPauseEvent(self):
536 if self._exit:
537 self.log.debug("Exiting")
538 self._save_queue()
539 os._exit(0)
James E. Blaircdccd972013-07-01 12:10:22 -0700540
James E. Blair468c8512013-12-06 13:27:19 -0800541 def _doReconfigureEvent(self, event):
542 # This is called in the scheduler loop after another thread submits
543 # a request
James E. Blaircdccd972013-07-01 12:10:22 -0700544 self.layout_lock.acquire()
James E. Blair468c8512013-12-06 13:27:19 -0800545 self.config = event.config
James E. Blaircdccd972013-07-01 12:10:22 -0700546 try:
James E. Blaira615c362017-10-02 17:34:42 -0700547 self.log.debug("Full reconfiguration beginning")
James E. Blair83005782015-12-11 14:46:03 -0800548 loader = configloader.ConfigLoader()
549 abide = loader.loadConfig(
James E. Blair39840362017-06-23 20:34:02 +0100550 self.config.get('scheduler', 'tenant_config'),
Ricardo Carrillo Cruz22994f92016-12-02 11:41:58 +0000551 self._get_project_key_dir(),
James E. Blair83005782015-12-11 14:46:03 -0800552 self, self.merger, self.connections)
James E. Blair59fdbac2015-12-07 17:08:06 -0800553 for tenant in abide.tenants.values():
554 self._reconfigureTenant(tenant)
555 self.abide = abide
James E. Blaircdccd972013-07-01 12:10:22 -0700556 finally:
557 self.layout_lock.release()
James E. Blaira615c362017-10-02 17:34:42 -0700558 self.log.debug("Full reconfiguration complete")
James E. Blaire9d45c32012-05-31 09:56:45 -0700559
James E. Blair646322f2017-01-27 15:50:34 -0800560 def _doTenantReconfigureEvent(self, event):
561 # This is called in the scheduler loop after another thread submits
562 # a request
563 self.layout_lock.acquire()
564 try:
James E. Blaira615c362017-10-02 17:34:42 -0700565 self.log.debug("Tenant reconfiguration beginning")
566 # If a change landed to a project, clear out the cached
567 # config before reconfiguring.
James E. Blair419a8672017-10-18 14:48:25 -0700568 for project in event.projects:
569 project.unparsed_config = None
570 old_tenant = self.abide.tenants[event.tenant_name]
James E. Blair646322f2017-01-27 15:50:34 -0800571 loader = configloader.ConfigLoader()
572 abide = loader.reloadTenant(
James E. Blair39840362017-06-23 20:34:02 +0100573 self.config.get('scheduler', 'tenant_config'),
Ricardo Carrillo Cruz22994f92016-12-02 11:41:58 +0000574 self._get_project_key_dir(),
James E. Blair646322f2017-01-27 15:50:34 -0800575 self, self.merger, self.connections,
James E. Blair419a8672017-10-18 14:48:25 -0700576 self.abide, old_tenant)
577 tenant = abide.tenants[event.tenant_name]
James E. Blair646322f2017-01-27 15:50:34 -0800578 self._reconfigureTenant(tenant)
579 self.abide = abide
580 finally:
581 self.layout_lock.release()
James E. Blaira615c362017-10-02 17:34:42 -0700582 self.log.debug("Tenant reconfiguration complete")
James E. Blair646322f2017-01-27 15:50:34 -0800583
James E. Blairaa30de42017-04-25 10:56:59 -0700584 def _reenqueueGetProject(self, tenant, item):
585 project = item.change.project
James E. Blair6053de42017-04-05 11:27:11 -0700586 # Attempt to get the same project as the one passed in. If
587 # the project is now found on a different connection, return
588 # the new version of the project. If it is no longer
589 # available (due to a connection being removed), return None.
James E. Blairaa30de42017-04-25 10:56:59 -0700590 (trusted, new_project) = tenant.getProject(project.canonical_name)
James E. Blair6053de42017-04-05 11:27:11 -0700591 if new_project:
592 return new_project
James E. Blairaa30de42017-04-25 10:56:59 -0700593 # If this is a non-live item we may be looking at a
594 # "foreign" project, ie, one which is not defined in the
595 # config but is constructed ad-hoc to satisfy a
596 # cross-repo-dependency. Find the corresponding live item
597 # and use its source.
598 child = item
599 while child and not child.live:
600 # This assumes that the queue does not branch behind this
601 # item, which is currently true for non-live items; if
602 # that changes, this traversal will need to be more
603 # complex.
604 if child.items_behind:
605 child = child.items_behind[0]
606 else:
607 child = None
608 if child is item:
609 return None
610 if child and child.live:
611 (child_trusted, child_project) = tenant.getProject(
612 child.change.project.canonical_name)
613 if child_project:
614 source = child_project.source
615 new_project = source.getProject(project.name)
616 return new_project
617 return None
James E. Blair6053de42017-04-05 11:27:11 -0700618
def _reenqueueTenant(self, old_tenant, tenant):
    """Carry queue items from ``old_tenant``'s pipelines into ``tenant``.

    For every pipeline of the new tenant that also existed in the old
    tenant, the old pipeline's window sizes are carried over (never
    below the new floor) unless the new pipeline is configured as a
    "static" window.  Each queued item's project is re-resolved against
    the new tenant and the item is re-enqueued into the new pipeline.
    Items that cannot be re-enqueued are removed and all of their builds
    canceled; for re-enqueued items, builds whose job no longer exists
    are removed and canceled.  Canceling a build also clears its
    nodeset and releases its semaphore.
    """
    old_pipelines = old_tenant.layout.pipelines
    for pipeline_name, new_pipeline in tenant.layout.pipelines.items():
        old_pipeline = old_pipelines.get(pipeline_name)
        if not old_pipeline:
            self.log.warning("No old pipeline matching %s found "
                             "when reconfiguring" % pipeline_name)
            continue
        self.log.debug("Re-enqueueing changes for pipeline %s" %
                       pipeline_name)

        # TODO(jeblair): This supports an undocumented and unanticipated
        # hack: this exact combination of window settings is treated as
        # a static window.  If we really want to support this, maybe we
        # should have a 'static' type?  It may be in use in the wild, so
        # allow it at least until there's an upgrade path.
        static_window = bool(
            new_pipeline.window and
            new_pipeline.window_increase_type == 'exponential' and
            new_pipeline.window_decrease_type == 'exponential' and
            new_pipeline.window_increase_factor == 1 and
            new_pipeline.window_decrease_factor == 1)
        carry_windows = not static_window

        if old_pipeline.window and carry_windows:
            new_pipeline.window = max(old_pipeline.window,
                                      new_pipeline.window_floor)

        items_to_remove = []
        builds_to_cancel = []
        last_head = None
        for shared_queue in old_pipeline.queues:
            # Attempt to keep window sizes from shrinking where possible
            new_queue = new_pipeline.getQueue(shared_queue.projects[0])
            if new_queue and shared_queue.window and carry_windows:
                new_queue.window = max(shared_queue.window,
                                       new_queue.window_floor)

            for item in shared_queue.queue:
                if not item.item_ahead:
                    last_head = item
                item.pipeline = None
                item.queue = None
                item.change.project = self._reenqueueGetProject(
                    tenant, item)

                # If the old item ahead made it in, re-enqueue this one
                # behind it; otherwise it has no valid item ahead.
                ahead_was_removed = item.item_ahead in items_to_remove
                old_item_ahead = (None if ahead_was_removed
                                  else item.item_ahead)
                item.item_ahead = None
                item.items_behind = []

                reenqueued = False
                if item.change.project:
                    try:
                        reenqueued = new_pipeline.manager.reEnqueueItem(
                            item, last_head, old_item_ahead,
                            item_ahead_valid=not ahead_was_removed)
                    except Exception:
                        self.log.exception(
                            "Exception while re-enqueing item %s",
                            item)

                if not reenqueued:
                    items_to_remove.append(item)
                    continue

                # Point surviving builds at the new job definitions;
                # builds whose job vanished get canceled.
                for build in item.current_build_set.getBuilds():
                    new_job = item.getJob(build.job.name)
                    if new_job:
                        build.job = new_job
                    else:
                        item.removeBuild(build)
                        builds_to_cancel.append(build)

        for item in items_to_remove:
            self.log.warning(
                "Removing item %s during reconfiguration" % (item,))
            builds_to_cancel.extend(item.current_build_set.getBuilds())

        for build in builds_to_cancel:
            self.log.warning(
                "Canceling build %s during reconfiguration" % (build,))
            try:
                self.executor.cancel(build)
            except Exception:
                self.log.exception(
                    "Exception while canceling build %s "
                    "for change %s" % (build, build.build_set.item.change))
            # In the unlikely case that a build is removed and later
            # added back, clear out the nodeset so it gets requested
            # again.
            try:
                build.build_set.removeJobNodeSet(build.job.name)
            except Exception:
                self.log.exception(
                    "Exception while removing nodeset from build %s "
                    "for change %s" % (build, build.build_set.item.change))
            finally:
                tenant.semaphore_handler.release(
                    build.build_set.item, build.job)
James E. Blair552b54f2016-07-22 13:55:32 -0700716
def _reconfigureTenant(self, tenant):
    """Make ``tenant`` the active configuration for its tenant name.

    This is called from _doReconfigureEvent while holding the layout
    lock.  When a tenant with the same name was already loaded, its
    semaphore handler is reused (so currently held semaphores are not
    lost) and its queued items are re-enqueued into the new tenant.
    Afterwards drivers, triggers and reporters are post-configured, the
    reconfiguration time is recorded and, if statsd is configured, a
    current_changes gauge is emitted for every pipeline.
    """
    previous = self.abide.tenants.get(tenant.name)
    if previous:
        # Reuse the existing semaphore handler so we don't lose the
        # currently held semaphores.
        tenant.semaphore_handler = previous.semaphore_handler
        self._reenqueueTenant(previous, tenant)

    # TODOv3(jeblair): update for tenants
    # self.maintainConnectionCache()
    self.connections.reconfigureDrivers(tenant)

    # TODOv3(jeblair): remove postconfig calls?
    for pipeline in tenant.layout.pipelines.values():
        for trigger in pipeline.triggers:
            trigger.postConfig(pipeline)
        for reporter in pipeline.actions:
            reporter.postConfig()
    self.tenant_last_reconfigured[tenant.name] = int(time.time())

    if not self.statsd:
        return
    try:
        for pipeline in tenant.layout.pipelines.values():
            change_count = len(pipeline.getAllItems())
            # stats.gauges.zuul.tenant.<tenant>.pipeline.
            #   <pipeline>.current_changes
            gauge_name = 'zuul.tenant.%s.pipeline.%s.current_changes' % (
                tenant.name, pipeline.name)
            self.statsd.gauge(gauge_name, change_count)
    except Exception:
        self.log.exception("Exception reporting initial "
                           "pipeline stats:")
752
def _doPromoteEvent(self, event):
    """Move the requested changes to the front of their shared queue.

    ``event.change_ids`` is a list of "number,patchset" strings.  The
    shared change queue is the one holding the first requested change.
    Every item in that queue has its jobs canceled and is dequeued; the
    requested changes are then re-added in the given order, followed by
    the remaining items in their previous order.

    :raises Exception: if the tenant or pipeline is unknown, no changes
        were given, or a requested change is not in the shared queue.
    """
    tenant = self.abide.tenants.get(event.tenant_name)
    if tenant is None:
        raise Exception("Unable to find tenant %s" % event.tenant_name)
    pipeline = tenant.layout.pipelines.get(event.pipeline_name)
    if pipeline is None:
        raise Exception("Unable to find pipeline %s in tenant %s" %
                        (event.pipeline_name, event.tenant_name))
    if not event.change_ids:
        raise Exception("No changes given to promote")
    change_ids = [c.split(',') for c in event.change_ids]

    def _isChange(item, number, patchset):
        # Match a queue item against a (number, patchset) pair.
        return (item.change.number == number and
                item.change.patchset == patchset)

    # The shared queue to reorder is the one holding the first change.
    change_queue = None
    for shared_queue in pipeline.queues:
        if any(_isChange(item, change_ids[0][0], change_ids[0][1])
               for item in shared_queue.queue):
            change_queue = shared_queue
            break
    if change_queue is None:
        raise Exception("Unable to find shared change queue for %s" %
                        event.change_ids[0])

    items_to_enqueue = []
    for number, patchset in change_ids:
        for item in change_queue.queue:
            if _isChange(item, number, patchset):
                items_to_enqueue.append(item)
                break
        else:
            raise Exception("Unable to find %s,%s in queue %s" %
                            (number, patchset, change_queue))

    # Iterate over a copy: dequeueItem presumably removes the item from
    # the queue.  Items not explicitly promoted are re-added after the
    # promoted ones, in their existing order.
    for item in change_queue.queue[:]:
        if item not in items_to_enqueue:
            items_to_enqueue.append(item)
        pipeline.manager.cancelJobs(item)
        pipeline.manager.dequeueItem(item)
    for item in items_to_enqueue:
        pipeline.manager.addChange(
            item.change,
            enqueue_time=item.enqueue_time,
            quiet=True,
            ignore_requirements=True)
James E. Blair36658cf2013-12-06 17:53:48 -0800792
def _doEnqueueEvent(self, event):
    """Add the change described by ``event`` directly to a pipeline.

    The target pipeline is ``event.forced_pipeline`` in the tenant
    ``event.tenant_name``; pipeline requirements are bypassed
    (``ignore_requirements=True``).
    """
    tenant = self.abide.tenants.get(event.tenant_name)
    (trusted, project) = tenant.getProject(event.project_name)
    pipeline = tenant.layout.pipelines[event.forced_pipeline]
    change = project.source.getChange(event, project)
    # Report the target pipeline (not the scheduler object) in the log.
    self.log.debug("Event %s for change %s was directly assigned "
                   "to pipeline %s", event, change, pipeline)
    pipeline.manager.addChange(change, ignore_requirements=True)
801
def _areAllBuildsComplete(self):
    """Report whether the scheduler has no outstanding work.

    Returns False while the merger still has merges outstanding, or
    while any build in any pipeline of any tenant has no result yet
    (each such build is logged).  Otherwise returns True.
    """
    self.log.debug("Checking if all builds are complete")
    if self.merger.areMergesOutstanding():
        self.log.debug("Waiting on merger")
        return False

    everything_done = True
    all_pipelines = (pipeline
                     for tenant in self.abide.tenants.values()
                     for pipeline in tenant.layout.pipelines.values())
    for pipeline in all_pipelines:
        for item in pipeline.getAllItems():
            for build in item.current_build_set.getBuilds():
                if build.result is not None:
                    continue
                self.log.debug("%s waiting on %s" %
                               (pipeline.manager, build))
                everything_done = False

    if everything_done:
        self.log.debug("All builds are complete")
    return everything_done
820
def run(self):
    """Main scheduler loop; returns once the scheduler is stopped.

    Each time wake_event fires, the loop does the following while
    holding run_handler_lock:
    - drains the management queue, then the result queue, then (unless
      paused) the trigger queue;
    - runs the pause action once all builds are complete;
    - lets every pipeline manager process its queue until it reports no
      further progress.
    An exception is logged and re-arms wake_event so the remaining
    events get another pass.
    """
    self.log.debug("Statsd enabled" if self.statsd
                   else "Statsd not configured")
    while True:
        self.log.debug("Run handler sleeping")
        self.wake_event.wait()
        self.wake_event.clear()
        if self._stopped:
            self.log.debug("Run handler stopping")
            return
        self.log.debug("Run handler awake")
        with self.run_handler_lock:
            try:
                while not self.management_event_queue.empty():
                    self.process_management_queue()

                # Give result events priority -- they let us stop
                # builds, whereas trigger events cause us to execute
                # builds.
                while not self.result_event_queue.empty():
                    self.process_result_queue()

                if not self._pause:
                    while not self.trigger_event_queue.empty():
                        self.process_event_queue()

                if self._pause and self._areAllBuildsComplete():
                    self._doPauseEvent()

                every_pipeline = (
                    pipeline
                    for tenant in self.abide.tenants.values()
                    for pipeline in tenant.layout.pipelines.values())
                for pipeline in every_pipeline:
                    while pipeline.manager.processQueue():
                        pass
            except Exception:
                self.log.exception("Exception in run handler:")
                # There may still be more events to process
                self.wake_event.set()
James E. Blairee743612012-05-29 14:49:32 -0700862
def maintainConnectionCache(self):
    """Let every connection prune its change cache.

    Collects the changes of all queued items (plus their related
    changes) across all tenants and pipelines, and passes that set to
    each connection's maintainCache().
    """
    # TODOv3(jeblair): update for tenants
    still_needed = set()
    for tenant in self.abide.tenants.values():
        for pipeline in tenant.layout.pipelines.values():
            self.log.debug("Gather relevant cache items for: %s" %
                           pipeline)

            for item in pipeline.getAllItems():
                change = item.change
                still_needed.add(change)
                still_needed.update(change.getRelatedChanges())

    for connection in self.connections.values():
        connection.maintainCache(still_needed)
        self.log.debug(
            "End maintain connection cache for: %s" % connection)
    self.log.debug("Connection cache size: %s" % len(still_needed))
879
def process_event_queue(self):
    """Take one trigger event off the queue and feed it to every tenant.

    For each tenant that knows the event's project, the change is looked
    up (skipping the tenant if the source can't find it).
    - If the event lands a config-updating change, or creates or deletes
      a branch, a tenant reconfiguration is requested.
    - Every pipeline then drops superseded or abandoned versions of the
      change as appropriate, and enqueues the change when the event
      matches.
    task_done() is always called on the trigger queue.
    """
    self.log.debug("Fetching trigger event")
    event = self.trigger_event_queue.get()
    self.log.debug("Processing trigger event %s" % event)
    try:
        project_key = '/'.join([event.project_hostname,
                                event.project_name])
        for tenant in self.abide.tenants.values():
            _, project = tenant.getProject(project_key)
            if project is None:
                continue
            try:
                change = project.source.getChange(event)
            except exceptions.ChangeNotFound as e:
                self.log.debug("Unable to get change %s from "
                               "source %s",
                               e.change, project.source)
                continue

            landed_config_change = (event.branch_updated and
                                    hasattr(change, 'files') and
                                    change.updatesConfig())
            if (landed_config_change or
                    event.branch_created or
                    event.branch_deleted):
                # The change that just landed updates the config or a
                # branch was just created or deleted.  Clear out cached
                # data for this project and perform a reconfiguration.
                self.reconfigureTenant(tenant, change.project)

            for pipeline in tenant.layout.pipelines.values():
                manager = pipeline.manager
                if event.isPatchsetCreated():
                    manager.removeOldVersionsOfChange(change)
                elif event.isChangeAbandoned():
                    manager.removeAbandonedChange(change)
                if manager.eventMatches(event, change):
                    manager.addChange(change)
    finally:
        self.trigger_event_queue.task_done()
James E. Blair1e8dd892012-05-30 09:15:05 -0700917
def process_management_queue(self):
    """Take one event off the management queue and handle it.

    The event is dispatched by type.
    - On success, ``event.done()`` is called.
    - On failure, the exception is logged and handed to
      ``event.exception()`` so the waiter sees it.
    ``task_done()`` is always called on the queue (as in
    process_event_queue and process_result_queue), even if reporting the
    outcome itself fails, so the queue's unfinished-task count stays
    accurate and a ``join()`` cannot hang.
    """
    self.log.debug("Fetching management event")
    event = self.management_event_queue.get()
    self.log.debug("Processing management event %s" % event)
    try:
        try:
            if isinstance(event, ReconfigureEvent):
                self._doReconfigureEvent(event)
            elif isinstance(event, TenantReconfigureEvent):
                self._doTenantReconfigureEvent(event)
            elif isinstance(event, PromoteEvent):
                self._doPromoteEvent(event)
            elif isinstance(event, EnqueueEvent):
                self._doEnqueueEvent(event.trigger_event)
            else:
                self.log.error("Unable to handle event %s" % event)
            event.done()
        except Exception:
            self.log.exception("Exception in management event:")
            event.exception(sys.exc_info())
    finally:
        self.management_event_queue.task_done()
938
def process_result_queue(self):
    """Take one event off the result queue and route it by type.

    Build-started, build-completed, merge-completed and
    nodes-provisioned events go to their matching _do*Event handler;
    any other event is logged as an error.  task_done() is always called
    on the result queue.
    """
    self.log.debug("Fetching result event")
    event = self.result_event_queue.get()
    self.log.debug("Processing result event %s" % event)
    try:
        handlers = (
            (BuildStartedEvent, self._doBuildStartedEvent),
            (BuildCompletedEvent, self._doBuildCompletedEvent),
            (MergeCompletedEvent, self._doMergeCompletedEvent),
            (NodesProvisionedEvent, self._doNodesProvisionedEvent),
        )
        for event_type, handler in handlers:
            if isinstance(event, event_type):
                handler(event)
                break
        else:
            self.log.error("Unable to handle event %s" % event)
    finally:
        self.result_event_queue.task_done()
956
def _doBuildStartedEvent(self, event):
    """Record a build start and notify the owning pipeline manager.

    Builds from a superseded build set, or whose item has no pipeline,
    are logged and ignored.  A failure to estimate the build's duration
    is logged but does not stop the notification.
    """
    build = event.build
    item = build.build_set.item
    if build.build_set is not item.current_build_set:
        self.log.warning("Build %s is not in the current build set" %
                         (build,))
        return
    pipeline = item.pipeline
    if not pipeline:
        self.log.warning("Build %s is not associated with a pipeline" %
                         (build,))
        return
    try:
        estimate = self.time_database.getEstimatedTime(build)
        build.estimated_time = float(estimate)
    except Exception:
        self.log.exception("Exception estimating build time:")
    pipeline.manager.onBuildStarted(build)
James E. Blaira84f0e42014-02-06 07:09:22 -0800974
def _doBuildCompletedEvent(self, event):
    """Handle a completed build.

    Steps, in order:
    1. The build's nodeset is always handed back to nodepool. If the
       build failed and an autohold request matches, it is first held.
    2. If the build is still current and attached to a pipeline, its
       duration is recorded.
    3. The pipeline manager is then notified.
    """
    build = event.build

    # Regardless of any other conditions which might cause us not to
    # pass this on to the pipeline manager, make sure we return the
    # nodes to nodepool.  The autohold handling is kept in its own try
    # block so that a failure computing the autohold key (e.g. a build
    # without a pipeline) can no longer skip returnNodeSet().
    nodeset = build.nodeset
    try:
        autohold_key = (build.pipeline.layout.tenant.name,
                        build.build_set.item.change.project.canonical_name,
                        build.job.name)
    except Exception:
        self.log.exception("Unable to determine autohold key for build %s",
                           build)
        autohold_key = None

    if (autohold_key is not None and
            build.result == "FAILURE" and
            autohold_key in self.autohold_requests):
        # We explicitly only want to hold nodes for jobs if they have
        # failed and have an autohold request.
        try:
            self.nodepool.holdNodeSet(nodeset, autohold_key)
        except Exception:
            self.log.exception("Unable to process autohold for %s:",
                               autohold_key)
            if autohold_key in self.autohold_requests:
                self.log.debug("Removing autohold %s due to exception",
                               autohold_key)
                del self.autohold_requests[autohold_key]

    try:
        self.nodepool.returnNodeSet(nodeset)
    except Exception:
        self.log.exception("Unable to return nodeset %s" % (nodeset,))

    if build.build_set is not build.build_set.item.current_build_set:
        self.log.debug("Build %s is not in the current build set" %
                       (build,))
        return
    pipeline = build.build_set.item.pipeline
    if not pipeline:
        self.log.warning("Build %s is not associated with a pipeline" %
                         (build,))
        return
    if build.end_time and build.start_time and build.result:
        duration = build.end_time - build.start_time
        try:
            self.time_database.update(build, duration, build.result)
        except Exception:
            self.log.exception("Exception recording build time:")
    pipeline.manager.onBuildCompleted(event.build)
1020
def _doMergeCompletedEvent(self, event):
    """Pass a merge result to the pipeline manager owning its build set.

    Results for a superseded build set, or one whose item has no
    pipeline, are logged and dropped.
    """
    build_set = event.build_set
    item = build_set.item
    if build_set is not item.current_build_set:
        self.log.warning("Build set %s is not current" % (build_set,))
        return
    pipeline = item.pipeline
    if pipeline:
        pipeline.manager.onMergeCompleted(event)
    else:
        self.log.warning("Build set %s is not associated with a pipeline" %
                         (build_set,))
James E. Blairee743612012-05-29 14:49:32 -07001032
def _doNodesProvisionedEvent(self, event):
    """Accept provisioned nodes and hand them to the pipeline manager.

    The nodes are always accepted first.
    - If the request was canceled, nothing more happens.
    - If the request's build set is superseded, or its item has no
      pipeline, the situation is logged and a fulfilled nodeset is
      returned to nodepool.
    - Otherwise the pipeline manager is notified.
    """
    request = event.request
    build_set = request.build_set

    self.nodepool.acceptNodes(request, event.request_id)
    if request.canceled:
        return

    item = build_set.item
    if build_set is not item.current_build_set:
        self.log.warning("Build set %s is not current" % (build_set,))
    else:
        pipeline = item.pipeline
        if pipeline:
            pipeline.manager.onNodesProvisioned(event)
            return
        self.log.warning("Build set %s is not associated with a pipeline" %
                         (build_set,))

    # Nobody will use these nodes; give them back if we received any.
    if request.fulfilled:
        self.nodepool.returnNodeSet(request.nodeset)
1055
def formatStatusJSON(self, tenant_name):
    """Return a JSON string describing scheduler state for a tenant.

    The result holds:
    - the Zuul version;
    - a queue-only-mode message while paused;
    - the lengths of the trigger, result and management event queues;
    - the last reconfiguration time in milliseconds, if known;
    - the status of every pipeline of ``tenant_name``.
    If that tenant isn't loaded, a JSON object containing only a
    ``message`` key is returned instead.
    """
    # TODOv3(jeblair): use tenants
    data = {'zuul_version': self.zuul_version}
    websocket_url = get_default(self.config, 'web', 'websocket_url', None)

    if self._pause:
        message_parts = ['<p><b>Queue only mode:</b> preparing to ']
        if self._exit:
            message_parts.append('exit')
        message_parts.append(
            ', queue length: %s' % self.trigger_event_queue.qsize())
        message_parts.append('</p>')
        data['message'] = ''.join(message_parts)

    # Each entry's key matches the scheduler attribute holding the queue.
    for queue_attr in ('trigger_event_queue', 'result_event_queue',
                       'management_event_queue'):
        data[queue_attr] = {'length': getattr(self, queue_attr).qsize()}

    if self.last_reconfigured:
        data['last_reconfigured'] = self.last_reconfigured * 1000

    pipelines = []
    data['pipelines'] = pipelines
    tenant = self.abide.tenants.get(tenant_name)
    if not tenant:
        self.log.warning("Tenant %s isn't loaded" % tenant_name)
        return json.dumps(
            {"message": "Tenant %s isn't ready" % tenant_name})
    pipelines.extend(pipeline.formatStatusJSON(websocket_url)
                     for pipeline in tenant.layout.pipelines.values())
    return json.dumps(data)
James E. Blair0e4c7912018-01-02 14:20:17 -08001094
1095 def onChangeUpdated(self, change):
1096 """Remove stale dependency references on change update.
1097
1098 When a change is updated with a new patchset, other changes in
1099 the system may still have a reference to the old patchset in
1100 their dependencies. Search for those (across all sources) and
1101 mark that their dependencies are out of date. This will cause
1102 them to be refreshed the next time the queue processor
1103 examines them.
1104 """
1105
1106 self.log.debug("Change %s has been updated, clearing dependent "
1107 "change caches", change)
1108 for source in self.connections.getSources():
1109 for other_change in source.getCachedChanges():
1110 if other_change.commit_needs_changes is None:
1111 continue
1112 for dep in other_change.commit_needs_changes:
1113 if change.isUpdateOf(dep):
1114 other_change.refresh_deps = True
1115 change.refresh_deps = True