Add graceful exit.
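Sending SIGUSR1 causes zuul to stop processing new events (they are
still queued), wait for running jobs to finish, then save the queued
events to the state directory and exit. The saved queue is loaded
again the next time zuul starts.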
It will likely take quite a while to complete (perhaps an hour),
so it's not implemented as a SIGTERM handler.
This can be used in an init script to implement a graceful restart.
Change-Id: I09fce571e971f16b5d20c5d69d595a05c7f6a4ba
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index c1b2a19..41f2386 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -12,3 +12,4 @@
layout_config=/etc/zuul/layout.yaml
log_config=/etc/zuul/logging.yaml
pidfile=/var/run/zuul/zuul.pid
+state_dir=/var/lib/zuul
diff --git a/zuul-server b/zuul-server
index c55c252..0f89cd2 100755
--- a/zuul-server
+++ b/zuul-server
@@ -69,6 +69,10 @@
self.sched.reconfigure(self.config)
signal.signal(signal.SIGHUP, self.reconfigure_handler)
+ def exit_handler(self, signum, frame):
+ signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+ self.sched.exit()
+
def main(self):
# See comment at top of file about zuul imports
import zuul.scheduler
@@ -85,7 +89,9 @@
self.sched.start()
self.sched.reconfigure(self.config)
+ self.sched.resume()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
+ signal.signal(signal.SIGUSR1, self.exit_handler)
while True:
signal.pause()
@@ -95,6 +101,21 @@
server.parse_arguments()
server.read_config()
+ if server.config.has_option('zuul', 'state_dir'):
+ state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
+ else:
+ state_dir = '/var/lib/zuul'
+ test_fn = os.path.join(state_dir, 'test')
+ try:
+ f = open(test_fn, 'w')
+ f.close()
+ os.unlink(test_fn)
+ except:
+ print
+ print "Unable to write to state directory: %s" % state_dir
+ print
+ raise
+
if server.config.has_option('zuul', 'pidfile'):
pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile'))
else:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 740c2bf..3e279d6 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -18,6 +18,7 @@
import logging
import re
import yaml
+import pickle
from model import Job, Change, Project, ChangeQueue, EventFilter
@@ -29,6 +30,9 @@
threading.Thread.__init__(self)
self.wake_event = threading.Event()
self.reconfigure_complete_event = threading.Event()
+ self._pause = False
+ self._reconfigure = False
+ self._exit = False
self.launcher = None
self.trigger = None
@@ -160,21 +164,77 @@
self.wake_event.set()
def reconfigure(self, config):
- self.log.debug("Reconfigure")
+ self.log.debug("Prepare to reconfigure")
self.config = config
- self._reconfigure_flag = True
+ self._pause = True
+ self._reconfigure = True
self.wake_event.set()
self.log.debug("Waiting for reconfiguration")
self.reconfigure_complete_event.wait()
self.reconfigure_complete_event.clear()
self.log.debug("Reconfiguration complete")
- def _doReconfigure(self):
- self.log.debug("Performing reconfiguration")
- self._init()
- self._parseConfig(self.config.get('zuul', 'layout_config'))
- self._reconfigure_flag = False
- self.reconfigure_complete_event.set()
+ def exit(self):
+ self.log.debug("Prepare to exit")
+ self._pause = True
+ self._exit = True
+ self.wake_event.set()
+ self.log.debug("Waiting for exit")
+
+ def _get_queue_pickle_file(self):
+ state_dir = os.path.expanduser(self.config.get('zuul', 'state_dir'))
+ return os.path.join(state_dir, 'queue.pickle')
+
+ def _save_queue(self):
+ pickle_file = self._get_queue_pickle_file()
+ events = []
+ while not self.trigger_event_queue.empty():
+ events.append(self.trigger_event_queue.get())
+ self.log.debug("Queue length is %s" % len(events))
+ if events:
+ self.log.debug("Saving queue")
+ pickle.dump(events, open(pickle_file, 'wb'))
+
+ def _load_queue(self):
+ pickle_file = self._get_queue_pickle_file()
+ if os.path.exists(pickle_file):
+ self.log.debug("Loading queue")
+ events = pickle.load(open(pickle_file, 'rb'))
+ self.log.debug("Queue length is %s" % len(events))
+ for event in events:
+ self.trigger_event_queue.put(event)
+ else:
+ self.log.debug("No queue file found")
+
+ def _delete_queue(self):
+ pickle_file = self._get_queue_pickle_file()
+ if os.path.exists(pickle_file):
+ self.log.debug("Deleting saved queue")
+ os.unlink(pickle_file)
+
+ def resume(self):
+ try:
+ self._load_queue()
+ except:
+ self.log.exception("Unable to load queue")
+ try:
+ self._delete_queue()
+ except:
+ self.log.exception("Unable to delete saved queue")
+ self.log.debug("Resuming queue processing")
+ self.wake_event.set()
+
+ def _doPauseEvent(self):
+ if self._exit:
+ self.log.debug("Exiting")
+ self._save_queue()
+ os._exit(0)
+ if self._reconfigure:
+ self.log.debug("Performing reconfiguration")
+ self._init()
+ self._parseConfig(self.config.get('zuul', 'layout_config'))
+ self._pause = False
+ self.reconfigure_complete_event.set()
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
@@ -196,17 +256,17 @@
self.wake_event.clear()
self.log.debug("Run handler awake")
try:
- if not self._reconfigure_flag:
+ if not self._pause:
if not self.trigger_event_queue.empty():
self.process_event_queue()
if not self.result_event_queue.empty():
self.process_result_queue()
- if self._reconfigure_flag and self._areAllBuildsComplete():
- self._doReconfigure()
+ if self._pause and self._areAllBuildsComplete():
+ self._doPauseEvent()
- if not self._reconfigure_flag:
+ if not self._pause:
if not (self.trigger_event_queue.empty() and
self.result_event_queue.empty()):
self.wake_event.set()