Change mutex to counting semaphore
The mutex in zuul is great but is limited to running one job at a
time. Some use cases, like using a limited number of floating licenses
in jobs, cannot be handled with this. Thus this change turns the mutex
functionality into a counting semaphore (whose maximum count defaults
to 1).
This is a port of Ida589e49bc6694f4ccc4c586e0d43b391b8c3ae4 to the
zuulv3 branch.
Change-Id: Icf4013a6215e2b10ca8e6309928b9e5881dda02c
diff --git a/zuul/model.py b/zuul/model.py
index d347071..0ce332f 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -14,6 +14,8 @@
import abc
import copy
+
+import logging
import os
import re
import struct
@@ -760,7 +762,7 @@
post_run=(),
run=(),
implied_run=(),
- mutex=None,
+ semaphore=None,
attempts=3,
final=False,
roles=frozenset(),
@@ -1369,7 +1371,7 @@
return False
return self.item_ahead.isHoldingFollowingChanges()
- def findJobsToRun(self, mutex):
+ def findJobsToRun(self, semaphore_handler):
torun = []
if not self.live:
return []
@@ -1408,9 +1410,9 @@
# The nodes for this job are not ready, skip
# it for now.
continue
- if mutex.acquire(self, job):
- # If this job needs a mutex, either acquire it or make
- # sure that we have it before running the job.
+ if semaphore_handler.acquire(self, job):
+ # If this job needs a semaphore, either acquire it or
+ # make sure that we have it before running the job.
torun.append(job)
return torun
@@ -2174,6 +2176,7 @@
self.projects = {}
self.nodesets = []
self.secrets = []
+ self.semaphores = []
def copy(self):
r = UnparsedTenantConfig()
@@ -2183,6 +2186,7 @@
r.projects = copy.deepcopy(self.projects)
r.nodesets = copy.deepcopy(self.nodesets)
r.secrets = copy.deepcopy(self.secrets)
+ r.semaphores = copy.deepcopy(self.semaphores)
return r
def extend(self, conf):
@@ -2194,6 +2198,7 @@
self.projects.setdefault(k, []).extend(v)
self.nodesets.extend(conf.nodesets)
self.secrets.extend(conf.secrets)
+ self.semaphores.extend(conf.semaphores)
return
if not isinstance(conf, list):
@@ -2224,6 +2229,8 @@
self.nodesets.append(value)
elif key == 'secret':
self.secrets.append(value)
+ elif key == 'semaphore':
+ self.semaphores.append(value)
else:
raise Exception("Configuration item `%s` not recognized "
"(when parsing %s)" %
@@ -2247,6 +2254,7 @@
self.jobs = {'noop': [Job('noop')]}
self.nodesets = {}
self.secrets = {}
+ self.semaphores = {}
def getJob(self, name):
if name in self.jobs:
@@ -2285,6 +2293,11 @@
raise Exception("Secret %s already defined" % (secret.name,))
self.secrets[secret.name] = secret
+ def addSemaphore(self, semaphore):
+ if semaphore.name in self.semaphores:
+ raise Exception("Semaphore %s already defined" % (semaphore.name,))
+ self.semaphores[semaphore.name] = semaphore
+
def addPipeline(self, pipeline):
self.pipelines[pipeline.name] = pipeline
@@ -2355,6 +2368,95 @@
return ret
+class Semaphore(object):
+ def __init__(self, name, max=1):
+ self.name = name
+ self.max = int(max)
+
+
+class SemaphoreHandler(object):
+ log = logging.getLogger("zuul.SemaphoreHandler")
+
+ def __init__(self):
+ self.semaphores = {}
+
+ def acquire(self, item, job):
+ if not job.semaphore:
+ return True
+
+ semaphore_key = job.semaphore
+
+ m = self.semaphores.get(semaphore_key)
+ if not m:
+ # The semaphore is not held, acquire it
+ self._acquire(semaphore_key, item, job.name)
+ return True
+ if (item, job.name) in m:
+ # This item already holds the semaphore
+ return True
+
+ # semaphore is there, check max
+ if len(m) < self._max_count(item, job.semaphore):
+ self._acquire(semaphore_key, item, job.name)
+ return True
+
+ return False
+
+ def release(self, item, job):
+ if not job.semaphore:
+ return
+
+ semaphore_key = job.semaphore
+
+ m = self.semaphores.get(semaphore_key)
+ if not m:
+ # The semaphore is not held, nothing to do
+ self.log.error("Semaphore can not be released for %s "
+ "because the semaphore is not held" %
+ item)
+ return
+ if (item, job.name) in m:
+ # This item is a holder of the semaphore
+ self._release(semaphore_key, item, job.name)
+ return
+ self.log.error("Semaphore can not be released for %s "
+ "which does not hold it" % item)
+
+ def _acquire(self, semaphore_key, item, job_name):
+ self.log.debug("Semaphore acquire {semaphore}: job {job}, item {item}"
+ .format(semaphore=semaphore_key,
+ job=job_name,
+ item=item))
+ if semaphore_key not in self.semaphores:
+ self.semaphores[semaphore_key] = []
+ self.semaphores[semaphore_key].append((item, job_name))
+
+ def _release(self, semaphore_key, item, job_name):
+ self.log.debug("Semaphore release {semaphore}: job {job}, item {item}"
+ .format(semaphore=semaphore_key,
+ job=job_name,
+ item=item))
+ sem_item = (item, job_name)
+ if sem_item in self.semaphores[semaphore_key]:
+ self.semaphores[semaphore_key].remove(sem_item)
+
+ # cleanup if there is no user of the semaphore anymore
+ if len(self.semaphores[semaphore_key]) == 0:
+ del self.semaphores[semaphore_key]
+
+ @staticmethod
+ def _max_count(item, semaphore_name):
+ if not item.current_build_set.layout:
+ # This should not occur as the layout of the item must already be
+ # built when acquiring or releasing a semaphore for a job.
+ raise Exception("Item {} has no layout".format(item))
+
+ # find the right semaphore
+ default_semaphore = Semaphore(semaphore_name, 1)
+ semaphores = item.current_build_set.layout.semaphores
+ return semaphores.get(semaphore_name, default_semaphore).max
+
+
class Tenant(object):
def __init__(self, name):
self.name = name
@@ -2375,6 +2477,8 @@
# A mapping of source -> {config_repos: {}, project_repos: {}}
self.sources = {}
+ self.semaphore_handler = SemaphoreHandler()
+
def addConfigRepo(self, source, project):
sd = self.sources.setdefault(source.name,
{'config_repos': {},