Wait for internal gearman server to settle.
If we're using the internal gearman server, wait for workers
to register their functions before proceeding with startup.
This is mostly to handle restarts where we may be immediately
ready to run jobs, but workers may not have registered functions
yet. In that case, we would immediately start declaring jobs
LOST. So give them a chance to show up first.
The gearman-plugin for jenkins currently tries to reconnect
every 3 seconds. We could probably even lower that to 1 or 2.
Change-Id: I173b16ba78cecac91acec26b4f2d55f38610e5a2
Reviewed-on: https://review.openstack.org/31735
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Approved: Clark Boylan <clark.boylan@gmail.com>
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Tested-by: Jenkins
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 35dc081..5651462 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -95,6 +95,49 @@
if build.__gearman_job.handle == handle:
self.__zuul_gearman.onUnknownJob(job)
+ def waitForGearmanToSettle(self):
+ # If we're running the internal gearman server, it's possible
+ # that after a restart or reload, we may be immediately ready
+ # to run jobs but all the gearman workers may not have
+ # registered yet. Give them a sporting chance to show up
+ # before we start declaring jobs lost because we don't have
+ # gearman functions registered for them.
+
+ # Spend up to 30 seconds after we connect to the gearman
+ # server waiting for the set of defined jobs to become
+ # consistent over a sliding 5 second window.
+
+ self.log.info("Waiting for connection to internal Gearman server")
+ self.waitForServer()
+ self.log.info("Waiting for gearman function set to settle")
+ start = time.time()
+ last_change = start
+ all_functions = set()
+ while time.time() - start < 30:
+ now = time.time()
+ last_functions = set()
+ for connection in self.active_connections:
+ try:
+ req = gear.StatusAdminRequest()
+ connection.sendAdminRequest(req)
+ except Exception:
+ self.log.exception("Exception while checking functions")
+ continue
+ for line in req.response.split('\n'):
+ parts = [x.strip() for x in line.split()]
+ if not parts or parts[0] == '.':
+ continue
+ last_functions.add(parts[0])
+ if last_functions != all_functions:
+ last_change = now
+ all_functions.update(last_functions)
+ else:
+ if now - last_change > 5:
+ self.log.info("Gearman function set has settled")
+ break
+ time.sleep(1)
+ self.log.info("Done waiting for Gearman server")
+
class Gearman(object):
log = logging.getLogger("zuul.Gearman")
@@ -113,6 +156,10 @@
self.gearman = ZuulGearmanClient(self)
self.gearman.addServer(server, port)
+ if (config.has_option('gearman_server', 'start') and
+ config.getboolean('gearman_server', 'start')):
+ self.gearman.waitForGearmanToSettle()
+
self.cleanup_thread = GearmanCleanup(self)
self.cleanup_thread.start()
self.function_cache = set()