Mutex repo updates and merge ops in executor

The executor maintains a set of all of the repos known to Zuul.

This is primarily for the purpose of having a local cache from which
to clone the repos which are used in jobs.  To that end, before each
job starts, it submits a request to a queue to update all the repos
in the job.  This makes sure they are up to date before being cloned
for the job.

Since we have that set of repos handy, we can also use them to perform
merge operations.  In other words, the executor con also act as a
merger.  This can be useful under heavy load, or in the case of a
very simple Zuul installation which has no other merger.

However, merge and update operations must not run at the same time, as
simultaneous access to the git repo may cause errors.  To that end,
set up a mutex around merge jobs and update tasks.

Since the primary purpose of the repos is to perform update tasks,
create a special gearman worker for the merge tasks so that we
can de-prioritize them in the executor.  In this, we delay response
to a NOOP packet until the update queue is empty.  That means that
if the gearman server notifies us that a merge job is ready, we
won't grab it unless our merger is otherwise idle (we can still
race here and get an update task between NOOP and GRAB_JOB, but
there's little we can do about that, and the worst case is that
we briefly delay a merge job).

Since the executor jobs are now in a different worker, their NOOP/
GRAB_JOB cycle is unimpeded, even if the merge worker is waiting.

Change-Id: Icf3663b1a2ce5309e496b1106d5adee6579e37c7
diff --git a/tests/base.py b/tests/base.py
index 0aac6bd..ea4539e 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -2301,7 +2301,8 @@
             # It hasn't been reported yet.
             return False
         # Make sure that none of the worker connections are in GRAB_WAIT
-        for connection in self.executor_server.worker.active_connections:
+        worker = self.executor_server.executor_worker
+        for connection in worker.active_connections:
             if connection.state == 'GRAB_WAIT':
                 return False
         return True
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 0f8cd0f..a291e52 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -311,6 +311,20 @@
                 shutil.copy(os.path.join(library_path, fn), target_dir)
 
 
+class ExecutorMergeWorker(gear.TextWorker):
+    def __init__(self, executor_server, *args, **kw):
+        self.zuul_executor_server = executor_server
+        super(ExecutorMergeWorker, self).__init__(*args, **kw)
+
+    def handleNoop(self, packet):
+        # Wait until the update queue is empty before responding
+        while self.zuul_executor_server.update_queue.qsize():
+            time.sleep(1)
+
+        with self.zuul_executor_server.merger_lock:
+            super(ExecutorMergeWorker, self).handleNoop(packet)
+
+
 class ExecutorServer(object):
     log = logging.getLogger("zuul.ExecutorServer")
 
@@ -323,6 +337,7 @@
         # perhaps hostname+pid.
         self.hostname = socket.gethostname()
         self.zuul_url = config.get('merger', 'zuul_url')
+        self.merger_lock = threading.Lock()
         self.command_map = dict(
             stop=self.stop,
             pause=self.pause,
@@ -406,10 +421,13 @@
             port = self.config.get('gearman', 'port')
         else:
             port = 4730
-        self.worker = gear.TextWorker('Zuul Executor Server')
-        self.worker.addServer(server, port)
+        self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
+        self.merger_worker.addServer(server, port)
+        self.executor_worker = gear.TextWorker('Zuul Executor Server')
+        self.executor_worker.addServer(server, port)
         self.log.debug("Waiting for server")
-        self.worker.waitForServer()
+        self.merger_worker.waitForServer()
+        self.executor_worker.waitForServer()
         self.log.debug("Registering")
         self.register()
 
@@ -423,15 +441,19 @@
         self.update_thread = threading.Thread(target=self._updateLoop)
         self.update_thread.daemon = True
         self.update_thread.start()
-        self.thread = threading.Thread(target=self.run)
-        self.thread.daemon = True
-        self.thread.start()
+        self.merger_thread = threading.Thread(target=self.run_merger)
+        self.merger_thread.daemon = True
+        self.merger_thread.start()
+        self.executor_thread = threading.Thread(target=self.run_executor)
+        self.executor_thread.daemon = True
+        self.executor_thread.start()
 
     def register(self):
-        self.worker.registerFunction("executor:execute")
-        self.worker.registerFunction("executor:stop:%s" % self.hostname)
-        self.worker.registerFunction("merger:merge")
-        self.worker.registerFunction("merger:cat")
+        self.executor_worker.registerFunction("executor:execute")
+        self.executor_worker.registerFunction("executor:stop:%s" %
+                                              self.hostname)
+        self.merger_worker.registerFunction("merger:merge")
+        self.merger_worker.registerFunction("merger:cat")
 
     def stop(self):
         self.log.debug("Stopping")
@@ -446,7 +468,8 @@
             except Exception:
                 self.log.exception("Exception sending stop command "
                                    "to worker:")
-        self.worker.shutdown()
+        self.merger_worker.shutdown()
+        self.executor_worker.shutdown()
         self.log.debug("Stopped")
 
     def pause(self):
@@ -471,7 +494,8 @@
 
     def join(self):
         self.update_thread.join()
-        self.thread.join()
+        self.merger_thread.join()
+        self.executor_thread.join()
 
     def runCommand(self):
         while self._command_running:
@@ -495,11 +519,12 @@
         if task is None:
             # We are asked to stop
             return
-        self.log.info("Updating repo %s/%s" % (
-            task.connection_name, task.project_name))
-        self.merger.updateRepo(task.connection_name, task.project_name)
-        self.log.debug("Finished updating repo %s/%s" %
-                       (task.connection_name, task.project_name))
+        with self.merger_lock:
+            self.log.info("Updating repo %s/%s" % (
+                task.connection_name, task.project_name))
+            self.merger.updateRepo(task.connection_name, task.project_name)
+            self.log.debug("Finished updating repo %s/%s" %
+                           (task.connection_name, task.project_name))
         task.setComplete()
 
     def update(self, connection_name, project_name):
@@ -508,11 +533,35 @@
         task = self.update_queue.put(task)
         return task
 
-    def run(self):
+    def run_merger(self):
+        self.log.debug("Starting merger listener")
+        while self._running:
+            try:
+                job = self.merger_worker.getJob()
+                try:
+                    if job.name == 'merger:cat':
+                        self.log.debug("Got cat job: %s" % job.unique)
+                        self.cat(job)
+                    elif job.name == 'merger:merge':
+                        self.log.debug("Got merge job: %s" % job.unique)
+                        self.merge(job)
+                    else:
+                        self.log.error("Unable to handle job %s" % job.name)
+                        job.sendWorkFail()
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(
+                        traceback.format_exc().encode('utf8'))
+            except gear.InterruptedError:
+                pass
+            except Exception:
+                self.log.exception("Exception while getting job")
+
+    def run_executor(self):
         self.log.debug("Starting executor listener")
         while self._running:
             try:
-                job = self.worker.getJob()
+                job = self.executor_worker.getJob()
                 try:
                     if job.name == 'executor:execute':
                         self.log.debug("Got execute job: %s" % job.unique)
@@ -520,12 +569,6 @@
                     elif job.name.startswith('executor:stop'):
                         self.log.debug("Got stop job: %s" % job.unique)
                         self.stopJob(job)
-                    elif job.name == 'merger:cat':
-                        self.log.debug("Got cat job: %s" % job.unique)
-                        self.cat(job)
-                    elif job.name == 'merger:merge':
-                        self.log.debug("Got merge job: %s" % job.unique)
-                        self.merge(job)
                     else:
                         self.log.error("Unable to handle job %s" % job.name)
                         job.sendWorkFail()
@@ -566,8 +609,9 @@
         args = json.loads(job.arguments)
         task = self.update(args['connection'], args['project'])
         task.wait()
-        files = self.merger.getFiles(args['connection'], args['project'],
-                                     args['branch'], args['files'])
+        with self.merger_lock:
+            files = self.merger.getFiles(args['connection'], args['project'],
+                                         args['branch'], args['files'])
         result = dict(updated=True,
                       files=files,
                       zuul_url=self.zuul_url)
@@ -575,8 +619,9 @@
 
     def merge(self, job):
         args = json.loads(job.arguments)
-        ret = self.merger.mergeChanges(args['items'], args.get('files'),
-                                       args.get('repo_state'))
+        with self.merger_lock:
+            ret = self.merger.mergeChanges(args['items'], args.get('files'),
+                                           args.get('repo_state'))
         result = dict(merged=(ret is not None),
                       zuul_url=self.zuul_url)
         if ret is None: