Change mutex to counting semaphore

The mutex in zuul is great but is limited to running one job at a
time. Some use cases, like using a limited number of floating licenses
in jobs, cannot be handled with this. Thus this changes the mutex
functionality to a counting semaphore (whose maximum defaults to 1).

This is a port of Ida589e49bc6694f4ccc4c586e0d43b391b8c3ae4 to the
zuulv3 branch.

Change-Id: Icf4013a6215e2b10ca8e6309928b9e5881dda02c
diff --git a/zuul/model.py b/zuul/model.py
index d347071..0ce332f 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -14,6 +14,8 @@
 
 import abc
 import copy
+
+import logging
 import os
 import re
 import struct
@@ -760,7 +762,7 @@
             post_run=(),
             run=(),
             implied_run=(),
-            mutex=None,
+            semaphore=None,
             attempts=3,
             final=False,
             roles=frozenset(),
@@ -1369,7 +1371,7 @@
             return False
         return self.item_ahead.isHoldingFollowingChanges()
 
-    def findJobsToRun(self, mutex):
+    def findJobsToRun(self, semaphore_handler):
         torun = []
         if not self.live:
             return []
@@ -1408,9 +1410,9 @@
                     # The nodes for this job are not ready, skip
                     # it for now.
                     continue
-                if mutex.acquire(self, job):
-                    # If this job needs a mutex, either acquire it or make
-                    # sure that we have it before running the job.
+                if semaphore_handler.acquire(self, job):
+                    # If this job needs a semaphore, either acquire it or
+                    # make sure that we have it before running the job.
                     torun.append(job)
         return torun
 
@@ -2174,6 +2176,7 @@
         self.projects = {}
         self.nodesets = []
         self.secrets = []
+        self.semaphores = []
 
     def copy(self):
         r = UnparsedTenantConfig()
@@ -2183,6 +2186,7 @@
         r.projects = copy.deepcopy(self.projects)
         r.nodesets = copy.deepcopy(self.nodesets)
         r.secrets = copy.deepcopy(self.secrets)
+        r.semaphores = copy.deepcopy(self.semaphores)
         return r
 
     def extend(self, conf):
@@ -2194,6 +2198,7 @@
                 self.projects.setdefault(k, []).extend(v)
             self.nodesets.extend(conf.nodesets)
             self.secrets.extend(conf.secrets)
+            self.semaphores.extend(conf.semaphores)
             return
 
         if not isinstance(conf, list):
@@ -2224,6 +2229,8 @@
                 self.nodesets.append(value)
             elif key == 'secret':
                 self.secrets.append(value)
+            elif key == 'semaphore':
+                self.semaphores.append(value)
             else:
                 raise Exception("Configuration item `%s` not recognized "
                                 "(when parsing %s)" %
@@ -2247,6 +2254,7 @@
         self.jobs = {'noop': [Job('noop')]}
         self.nodesets = {}
         self.secrets = {}
+        self.semaphores = {}
 
     def getJob(self, name):
         if name in self.jobs:
@@ -2285,6 +2293,11 @@
             raise Exception("Secret %s already defined" % (secret.name,))
         self.secrets[secret.name] = secret
 
+    def addSemaphore(self, semaphore):
+        if semaphore.name in self.semaphores:
+            raise Exception("Semaphore %s already defined" % (semaphore.name,))
+        self.semaphores[semaphore.name] = semaphore
+
     def addPipeline(self, pipeline):
         self.pipelines[pipeline.name] = pipeline
 
@@ -2355,6 +2368,95 @@
         return ret
 
 
+class Semaphore(object):
+    def __init__(self, name, max=1):
+        self.name = name
+        self.max = int(max)
+
+
+class SemaphoreHandler(object):
+    log = logging.getLogger("zuul.SemaphoreHandler")
+
+    def __init__(self):
+        self.semaphores = {}
+
+    def acquire(self, item, job):
+        if not job.semaphore:
+            return True
+
+        semaphore_key = job.semaphore
+
+        m = self.semaphores.get(semaphore_key)
+        if not m:
+            # The semaphore is not held, acquire it
+            self._acquire(semaphore_key, item, job.name)
+            return True
+        if (item, job.name) in m:
+            # This item already holds the semaphore
+            return True
+
+        # semaphore is there, check max
+        if len(m) < self._max_count(item, job.semaphore):
+            self._acquire(semaphore_key, item, job.name)
+            return True
+
+        return False
+
+    def release(self, item, job):
+        if not job.semaphore:
+            return
+
+        semaphore_key = job.semaphore
+
+        m = self.semaphores.get(semaphore_key)
+        if not m:
+            # The semaphore is not held, nothing to do
+            self.log.error("Semaphore can not be released for %s "
+                           "because the semaphore is not held" %
+                           item)
+            return
+        if (item, job.name) in m:
+            # This item is a holder of the semaphore
+            self._release(semaphore_key, item, job.name)
+            return
+        self.log.error("Semaphore can not be released for %s "
+                       "which does not hold it" % item)
+
+    def _acquire(self, semaphore_key, item, job_name):
+        self.log.debug("Semaphore acquire {semaphore}: job {job}, item {item}"
+                       .format(semaphore=semaphore_key,
+                               job=job_name,
+                               item=item))
+        if semaphore_key not in self.semaphores:
+            self.semaphores[semaphore_key] = []
+        self.semaphores[semaphore_key].append((item, job_name))
+
+    def _release(self, semaphore_key, item, job_name):
+        self.log.debug("Semaphore release {semaphore}: job {job}, item {item}"
+                       .format(semaphore=semaphore_key,
+                               job=job_name,
+                               item=item))
+        sem_item = (item, job_name)
+        if sem_item in self.semaphores[semaphore_key]:
+            self.semaphores[semaphore_key].remove(sem_item)
+
+        # cleanup if there is no user of the semaphore anymore
+        if len(self.semaphores[semaphore_key]) == 0:
+            del self.semaphores[semaphore_key]
+
+    @staticmethod
+    def _max_count(item, semaphore_name):
+        if not item.current_build_set.layout:
+            # This should not occur as the layout of the item must already be
+            # built when acquiring or releasing a semaphore for a job.
+            raise Exception("Item {} has no layout".format(item))
+
+        # find the right semaphore
+        default_semaphore = Semaphore(semaphore_name, 1)
+        semaphores = item.current_build_set.layout.semaphores
+        return semaphores.get(semaphore_name, default_semaphore).max
+
+
 class Tenant(object):
     def __init__(self, name):
         self.name = name
@@ -2375,6 +2477,8 @@
         # A mapping of source -> {config_repos: {}, project_repos: {}}
         self.sources = {}
 
+        self.semaphore_handler = SemaphoreHandler()
+
     def addConfigRepo(self, source, project):
         sd = self.sources.setdefault(source.name,
                                      {'config_repos': {},