Re-order executor/job classes
There should be no code changes in this commit, only a re-ordering
of the classes involved to reduce the diff in a subsequent change.
Change-Id: I0fc4287b6fadfaea02250bc5c1f57eba7e65f450
diff --git a/tests/base.py b/tests/base.py
index b029f0b..028a194 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1361,6 +1361,63 @@
return repos
+class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
+ def doMergeChanges(self, merger, items, repo_state):
+ # Get a merger in order to update the repos involved in this job.
+ commit = super(RecordingAnsibleJob, self).doMergeChanges(
+ merger, items, repo_state)
+ if not commit: # merge conflict
+ self.recordResult('MERGER_FAILURE')
+ return commit
+
+ def recordResult(self, result):
+ build = self.executor_server.job_builds[self.job.unique]
+ self.executor_server.lock.acquire()
+ self.executor_server.build_history.append(
+ BuildHistory(name=build.name, result=result, changes=build.changes,
+ node=build.node, uuid=build.unique,
+ ref=build.parameters['zuul']['ref'],
+ parameters=build.parameters, jobdir=build.jobdir,
+ pipeline=build.parameters['zuul']['pipeline'])
+ )
+ self.executor_server.running_builds.remove(build)
+ del self.executor_server.job_builds[self.job.unique]
+ self.executor_server.lock.release()
+
+ def runPlaybooks(self, args):
+ build = self.executor_server.job_builds[self.job.unique]
+ build.jobdir = self.jobdir
+
+ result = super(RecordingAnsibleJob, self).runPlaybooks(args)
+ self.recordResult(result)
+ return result
+
+ def runAnsible(self, cmd, timeout, playbook):
+ build = self.executor_server.job_builds[self.job.unique]
+
+ if self.executor_server._run_ansible:
+ result = super(RecordingAnsibleJob, self).runAnsible(
+ cmd, timeout, playbook)
+ else:
+ if playbook.path:
+ result = build.run()
+ else:
+ result = (self.RESULT_NORMAL, 0)
+ return result
+
+ def getHostList(self, args):
+ self.log.debug("hostlist")
+ hosts = super(RecordingAnsibleJob, self).getHostList(args)
+ for host in hosts:
+ host['host_vars']['ansible_connection'] = 'local'
+
+ hosts.append(dict(
+ name='localhost',
+ host_vars=dict(ansible_connection='local'),
+ host_keys=[]))
+ return hosts
+
+
class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
"""An Ansible executor to be used in tests.
@@ -1445,63 +1502,6 @@
super(RecordingExecutorServer, self).stop()
-class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
- def doMergeChanges(self, merger, items, repo_state):
- # Get a merger in order to update the repos involved in this job.
- commit = super(RecordingAnsibleJob, self).doMergeChanges(
- merger, items, repo_state)
- if not commit: # merge conflict
- self.recordResult('MERGER_FAILURE')
- return commit
-
- def recordResult(self, result):
- build = self.executor_server.job_builds[self.job.unique]
- self.executor_server.lock.acquire()
- self.executor_server.build_history.append(
- BuildHistory(name=build.name, result=result, changes=build.changes,
- node=build.node, uuid=build.unique,
- ref=build.parameters['zuul']['ref'],
- parameters=build.parameters, jobdir=build.jobdir,
- pipeline=build.parameters['zuul']['pipeline'])
- )
- self.executor_server.running_builds.remove(build)
- del self.executor_server.job_builds[self.job.unique]
- self.executor_server.lock.release()
-
- def runPlaybooks(self, args):
- build = self.executor_server.job_builds[self.job.unique]
- build.jobdir = self.jobdir
-
- result = super(RecordingAnsibleJob, self).runPlaybooks(args)
- self.recordResult(result)
- return result
-
- def runAnsible(self, cmd, timeout, playbook):
- build = self.executor_server.job_builds[self.job.unique]
-
- if self.executor_server._run_ansible:
- result = super(RecordingAnsibleJob, self).runAnsible(
- cmd, timeout, playbook)
- else:
- if playbook.path:
- result = build.run()
- else:
- result = (self.RESULT_NORMAL, 0)
- return result
-
- def getHostList(self, args):
- self.log.debug("hostlist")
- hosts = super(RecordingAnsibleJob, self).getHostList(args)
- for host in hosts:
- host['host_vars']['ansible_connection'] = 'local'
-
- hosts.append(dict(
- name='localhost',
- host_vars=dict(ansible_connection='local'),
- host_keys=[]))
- return hosts
-
-
class FakeGearmanServer(gear.Server):
"""A Gearman server for use in tests.
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index d739c18..670a420 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -500,412 +500,6 @@
return inventory
-class ExecutorMergeWorker(gear.TextWorker):
- def __init__(self, executor_server, *args, **kw):
- self.zuul_executor_server = executor_server
- super(ExecutorMergeWorker, self).__init__(*args, **kw)
-
- def handleNoop(self, packet):
- # Wait until the update queue is empty before responding
- while self.zuul_executor_server.update_queue.qsize():
- time.sleep(1)
-
- with self.zuul_executor_server.merger_lock:
- super(ExecutorMergeWorker, self).handleNoop(packet)
-
-
-class ExecutorExecuteWorker(gear.TextWorker):
- def __init__(self, executor_server, *args, **kw):
- self.zuul_executor_server = executor_server
- super(ExecutorExecuteWorker, self).__init__(*args, **kw)
-
- def handleNoop(self, packet):
- # Delay our response to running a new job based on the number
- # of jobs we're currently running, in an attempt to spread
- # load evenly among executors.
- workers = len(self.zuul_executor_server.job_workers)
- delay = (workers ** 2) / 1000.0
- time.sleep(delay)
- return super(ExecutorExecuteWorker, self).handleNoop(packet)
-
-
-class ExecutorServer(object):
- log = logging.getLogger("zuul.ExecutorServer")
-
- def __init__(self, config, connections={}, jobdir_root=None,
- keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
- self.config = config
- self.keep_jobdir = keep_jobdir
- self.jobdir_root = jobdir_root
- # TODOv3(mordred): make the executor name more unique --
- # perhaps hostname+pid.
- self.hostname = socket.gethostname()
- self.log_streaming_port = log_streaming_port
- self.merger_lock = threading.Lock()
- self.verbose = False
- self.command_map = dict(
- stop=self.stop,
- pause=self.pause,
- unpause=self.unpause,
- graceful=self.graceful,
- verbose=self.verboseOn,
- unverbose=self.verboseOff,
- keep=self.keep,
- nokeep=self.nokeep,
- )
-
- self.merge_root = get_default(self.config, 'executor', 'git_dir',
- '/var/lib/zuul/executor-git')
- self.default_username = get_default(self.config, 'executor',
- 'default_username', 'zuul')
- self.disk_limit_per_job = int(get_default(self.config, 'executor',
- 'disk_limit_per_job', 250))
- self.merge_email = get_default(self.config, 'merger', 'git_user_email')
- self.merge_name = get_default(self.config, 'merger', 'git_user_name')
- self.merge_speed_limit = get_default(
- config, 'merger', 'git_http_low_speed_limit', '1000')
- self.merge_speed_time = get_default(
- config, 'merger', 'git_http_low_speed_time', '30')
- execution_wrapper_name = get_default(self.config, 'executor',
- 'execution_wrapper', 'bubblewrap')
- load_multiplier = float(get_default(self.config, 'executor',
- 'load_multiplier', '2.5'))
- self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
- self.accepting_work = False
- self.execution_wrapper = connections.drivers[execution_wrapper_name]
-
- self.connections = connections
- # This merger and its git repos are used to maintain
- # up-to-date copies of all the repos that are used by jobs, as
- # well as to support the merger:cat functon to supply
- # configuration information to Zuul when it starts.
- self.merger = self._getMerger(self.merge_root)
- self.update_queue = DeduplicateQueue()
-
- state_dir = get_default(self.config, 'executor', 'state_dir',
- '/var/lib/zuul', expand_user=True)
- path = os.path.join(state_dir, 'executor.socket')
- self.command_socket = commandsocket.CommandSocket(path)
- ansible_dir = os.path.join(state_dir, 'ansible')
- self.ansible_dir = ansible_dir
- if os.path.exists(ansible_dir):
- shutil.rmtree(ansible_dir)
-
- zuul_dir = os.path.join(ansible_dir, 'zuul')
- plugin_dir = os.path.join(zuul_dir, 'ansible')
-
- os.makedirs(plugin_dir, mode=0o0755)
-
- self.library_dir = os.path.join(plugin_dir, 'library')
- self.action_dir = os.path.join(plugin_dir, 'action')
- self.callback_dir = os.path.join(plugin_dir, 'callback')
- self.lookup_dir = os.path.join(plugin_dir, 'lookup')
- self.filter_dir = os.path.join(plugin_dir, 'filter')
-
- _copy_ansible_files(zuul.ansible, plugin_dir)
-
- # We're copying zuul.ansible.* into a directory we are going
- # to add to pythonpath, so our plugins can "import
- # zuul.ansible". But we're not installing all of zuul, so
- # create a __init__.py file for the stub "zuul" module.
- with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
- pass
-
- self.job_workers = {}
- self.disk_accountant = DiskAccountant(self.jobdir_root,
- self.disk_limit_per_job,
- self.stopJobDiskFull,
- self.merge_root)
-
- def _getMerger(self, root, logger=None):
- if root != self.merge_root:
- cache_root = self.merge_root
- else:
- cache_root = None
- return zuul.merger.merger.Merger(
- root, self.connections, self.merge_email, self.merge_name,
- self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
-
- def start(self):
- self._running = True
- self._command_running = True
- server = self.config.get('gearman', 'server')
- port = get_default(self.config, 'gearman', 'port', 4730)
- ssl_key = get_default(self.config, 'gearman', 'ssl_key')
- ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
- ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
- self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
- self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
- self.executor_worker = ExecutorExecuteWorker(
- self, 'Zuul Executor Server')
- self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
- self.log.debug("Waiting for server")
- self.merger_worker.waitForServer()
- self.executor_worker.waitForServer()
- self.log.debug("Registering")
- self.register()
-
- self.log.debug("Starting command processor")
- self.command_socket.start()
- self.command_thread = threading.Thread(target=self.runCommand)
- self.command_thread.daemon = True
- self.command_thread.start()
-
- self.log.debug("Starting worker")
- self.update_thread = threading.Thread(target=self._updateLoop)
- self.update_thread.daemon = True
- self.update_thread.start()
- self.merger_thread = threading.Thread(target=self.run_merger)
- self.merger_thread.daemon = True
- self.merger_thread.start()
- self.executor_thread = threading.Thread(target=self.run_executor)
- self.executor_thread.daemon = True
- self.executor_thread.start()
- self.governor_stop_event = threading.Event()
- self.governor_thread = threading.Thread(target=self.run_governor)
- self.governor_thread.daemon = True
- self.governor_thread.start()
- self.disk_accountant.start()
-
- def register(self):
- self.register_work()
- self.executor_worker.registerFunction("executor:stop:%s" %
- self.hostname)
- self.merger_worker.registerFunction("merger:merge")
- self.merger_worker.registerFunction("merger:cat")
- self.merger_worker.registerFunction("merger:refstate")
-
- def register_work(self):
- self.accepting_work = True
- self.executor_worker.registerFunction("executor:execute")
-
- def unregister_work(self):
- self.accepting_work = False
- self.executor_worker.unregisterFunction("executor:execute")
-
- def stop(self):
- self.log.debug("Stopping")
- self.disk_accountant.stop()
- self.governor_stop_event.set()
- self._running = False
- self._command_running = False
- self.command_socket.stop()
- self.update_queue.put(None)
-
- for job_worker in list(self.job_workers.values()):
- try:
- job_worker.stop()
- except Exception:
- self.log.exception("Exception sending stop command "
- "to worker:")
- self.merger_worker.shutdown()
- self.executor_worker.shutdown()
- self.log.debug("Stopped")
-
- def pause(self):
- # TODOv3: implement
- pass
-
- def unpause(self):
- # TODOv3: implement
- pass
-
- def graceful(self):
- # TODOv3: implement
- pass
-
- def verboseOn(self):
- self.verbose = True
-
- def verboseOff(self):
- self.verbose = False
-
- def keep(self):
- self.keep_jobdir = True
-
- def nokeep(self):
- self.keep_jobdir = False
-
- def join(self):
- self.update_thread.join()
- self.merger_thread.join()
- self.executor_thread.join()
- self.governor_thread.join()
-
- def runCommand(self):
- while self._command_running:
- try:
- command = self.command_socket.get().decode('utf8')
- if command != '_stop':
- self.command_map[command]()
- except Exception:
- self.log.exception("Exception while processing command")
-
- def _updateLoop(self):
- while self._running:
- try:
- self._innerUpdateLoop()
- except:
- self.log.exception("Exception in update thread:")
-
- def _innerUpdateLoop(self):
- # Inside of a loop that keeps the main repositories up to date
- task = self.update_queue.get()
- if task is None:
- # We are asked to stop
- return
- with self.merger_lock:
- self.log.info("Updating repo %s/%s" % (
- task.connection_name, task.project_name))
- self.merger.updateRepo(task.connection_name, task.project_name)
- self.log.debug("Finished updating repo %s/%s" %
- (task.connection_name, task.project_name))
- task.setComplete()
-
- def update(self, connection_name, project_name):
- # Update a repository in the main merger
- task = UpdateTask(connection_name, project_name)
- task = self.update_queue.put(task)
- return task
-
- def run_merger(self):
- self.log.debug("Starting merger listener")
- while self._running:
- try:
- job = self.merger_worker.getJob()
- try:
- if job.name == 'merger:cat':
- self.log.debug("Got cat job: %s" % job.unique)
- self.cat(job)
- elif job.name == 'merger:merge':
- self.log.debug("Got merge job: %s" % job.unique)
- self.merge(job)
- elif job.name == 'merger:refstate':
- self.log.debug("Got refstate job: %s" % job.unique)
- self.refstate(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(
- traceback.format_exc().encode('utf8'))
- except gear.InterruptedError:
- pass
- except Exception:
- self.log.exception("Exception while getting job")
-
- def run_executor(self):
- self.log.debug("Starting executor listener")
- while self._running:
- try:
- job = self.executor_worker.getJob()
- try:
- if job.name == 'executor:execute':
- self.log.debug("Got execute job: %s" % job.unique)
- self.executeJob(job)
- elif job.name.startswith('executor:stop'):
- self.log.debug("Got stop job: %s" % job.unique)
- self.stopJob(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(
- traceback.format_exc().encode('utf8'))
- except gear.InterruptedError:
- pass
- except Exception:
- self.log.exception("Exception while getting job")
-
- def run_governor(self):
- while not self.governor_stop_event.wait(30):
- self.manageLoad()
-
- def executeJob(self, job):
- self.job_workers[job.unique] = AnsibleJob(self, job)
- self.job_workers[job.unique].run()
-
- def manageLoad(self):
- ''' Apply some heuristics to decide whether or not we should
- be askign for more jobs '''
- load_avg = os.getloadavg()[0]
- if self.accepting_work:
- # Don't unregister if we don't have any active jobs.
- if load_avg > self.max_load_avg and self.job_workers:
- self.log.info(
- "Unregistering due to high system load {} > {}".format(
- load_avg, self.max_load_avg))
- self.unregister_work()
- elif load_avg <= self.max_load_avg:
- self.log.info(
- "Re-registering as load is within limits {} <= {}".format(
- load_avg, self.max_load_avg))
- self.register_work()
-
- def finishJob(self, unique):
- del(self.job_workers[unique])
-
- def stopJobDiskFull(self, jobdir):
- unique = os.path.basename(jobdir)
- self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
-
- def stopJob(self, job):
- try:
- args = json.loads(job.arguments)
- self.log.debug("Stop job with arguments: %s" % (args,))
- unique = args['uuid']
- self.stopJobByUnique(unique)
- finally:
- job.sendWorkComplete()
-
- def stopJobByUnique(self, unique, reason=None):
- job_worker = self.job_workers.get(unique)
- if not job_worker:
- self.log.debug("Unable to find worker for job %s" % (unique,))
- return
- try:
- job_worker.stop(reason)
- except Exception:
- self.log.exception("Exception sending stop command "
- "to worker:")
-
- def cat(self, job):
- args = json.loads(job.arguments)
- task = self.update(args['connection'], args['project'])
- task.wait()
- with self.merger_lock:
- files = self.merger.getFiles(args['connection'], args['project'],
- args['branch'], args['files'],
- args.get('dirs', []))
- result = dict(updated=True,
- files=files)
- job.sendWorkComplete(json.dumps(result))
-
- def refstate(self, job):
- args = json.loads(job.arguments)
- with self.merger_lock:
- success, repo_state = self.merger.getRepoState(args['items'])
- result = dict(updated=success,
- repo_state=repo_state)
- job.sendWorkComplete(json.dumps(result))
-
- def merge(self, job):
- args = json.loads(job.arguments)
- with self.merger_lock:
- ret = self.merger.mergeChanges(args['items'], args.get('files'),
- args.get('dirs', []),
- args.get('repo_state'))
- result = dict(merged=(ret is not None))
- if ret is None:
- result['commit'] = result['files'] = result['repo_state'] = None
- else:
- (result['commit'], result['files'], result['repo_state'],
- recent) = ret
- job.sendWorkComplete(json.dumps(result))
-
-
class AnsibleJobLogAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
msg, kwargs = super(AnsibleJobLogAdapter, self).process(msg, kwargs)
@@ -1856,3 +1450,409 @@
self.emitPlaybookBanner(playbook, 'END', phase, result=result)
return result, code
+
+
+class ExecutorMergeWorker(gear.TextWorker):
+ def __init__(self, executor_server, *args, **kw):
+ self.zuul_executor_server = executor_server
+ super(ExecutorMergeWorker, self).__init__(*args, **kw)
+
+ def handleNoop(self, packet):
+ # Wait until the update queue is empty before responding
+ while self.zuul_executor_server.update_queue.qsize():
+ time.sleep(1)
+
+ with self.zuul_executor_server.merger_lock:
+ super(ExecutorMergeWorker, self).handleNoop(packet)
+
+
+class ExecutorExecuteWorker(gear.TextWorker):
+ def __init__(self, executor_server, *args, **kw):
+ self.zuul_executor_server = executor_server
+ super(ExecutorExecuteWorker, self).__init__(*args, **kw)
+
+ def handleNoop(self, packet):
+ # Delay our response to running a new job based on the number
+ # of jobs we're currently running, in an attempt to spread
+ # load evenly among executors.
+ workers = len(self.zuul_executor_server.job_workers)
+ delay = (workers ** 2) / 1000.0
+ time.sleep(delay)
+ return super(ExecutorExecuteWorker, self).handleNoop(packet)
+
+
+class ExecutorServer(object):
+ log = logging.getLogger("zuul.ExecutorServer")
+
+ def __init__(self, config, connections={}, jobdir_root=None,
+ keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
+ self.config = config
+ self.keep_jobdir = keep_jobdir
+ self.jobdir_root = jobdir_root
+ # TODOv3(mordred): make the executor name more unique --
+ # perhaps hostname+pid.
+ self.hostname = socket.gethostname()
+ self.log_streaming_port = log_streaming_port
+ self.merger_lock = threading.Lock()
+ self.verbose = False
+ self.command_map = dict(
+ stop=self.stop,
+ pause=self.pause,
+ unpause=self.unpause,
+ graceful=self.graceful,
+ verbose=self.verboseOn,
+ unverbose=self.verboseOff,
+ keep=self.keep,
+ nokeep=self.nokeep,
+ )
+
+ self.merge_root = get_default(self.config, 'executor', 'git_dir',
+ '/var/lib/zuul/executor-git')
+ self.default_username = get_default(self.config, 'executor',
+ 'default_username', 'zuul')
+ self.disk_limit_per_job = int(get_default(self.config, 'executor',
+ 'disk_limit_per_job', 250))
+ self.merge_email = get_default(self.config, 'merger', 'git_user_email')
+ self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+ self.merge_speed_limit = get_default(
+ config, 'merger', 'git_http_low_speed_limit', '1000')
+ self.merge_speed_time = get_default(
+ config, 'merger', 'git_http_low_speed_time', '30')
+ execution_wrapper_name = get_default(self.config, 'executor',
+ 'execution_wrapper', 'bubblewrap')
+ load_multiplier = float(get_default(self.config, 'executor',
+ 'load_multiplier', '2.5'))
+ self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+ self.accepting_work = False
+ self.execution_wrapper = connections.drivers[execution_wrapper_name]
+
+ self.connections = connections
+ # This merger and its git repos are used to maintain
+ # up-to-date copies of all the repos that are used by jobs, as
+ # well as to support the merger:cat functon to supply
+ # configuration information to Zuul when it starts.
+ self.merger = self._getMerger(self.merge_root)
+ self.update_queue = DeduplicateQueue()
+
+ state_dir = get_default(self.config, 'executor', 'state_dir',
+ '/var/lib/zuul', expand_user=True)
+ path = os.path.join(state_dir, 'executor.socket')
+ self.command_socket = commandsocket.CommandSocket(path)
+ ansible_dir = os.path.join(state_dir, 'ansible')
+ self.ansible_dir = ansible_dir
+ if os.path.exists(ansible_dir):
+ shutil.rmtree(ansible_dir)
+
+ zuul_dir = os.path.join(ansible_dir, 'zuul')
+ plugin_dir = os.path.join(zuul_dir, 'ansible')
+
+ os.makedirs(plugin_dir, mode=0o0755)
+
+ self.library_dir = os.path.join(plugin_dir, 'library')
+ self.action_dir = os.path.join(plugin_dir, 'action')
+ self.callback_dir = os.path.join(plugin_dir, 'callback')
+ self.lookup_dir = os.path.join(plugin_dir, 'lookup')
+ self.filter_dir = os.path.join(plugin_dir, 'filter')
+
+ _copy_ansible_files(zuul.ansible, plugin_dir)
+
+ # We're copying zuul.ansible.* into a directory we are going
+ # to add to pythonpath, so our plugins can "import
+ # zuul.ansible". But we're not installing all of zuul, so
+ # create a __init__.py file for the stub "zuul" module.
+ with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
+ pass
+
+ self.job_workers = {}
+ self.disk_accountant = DiskAccountant(self.jobdir_root,
+ self.disk_limit_per_job,
+ self.stopJobDiskFull,
+ self.merge_root)
+
+ def _getMerger(self, root, logger=None):
+ if root != self.merge_root:
+ cache_root = self.merge_root
+ else:
+ cache_root = None
+ return zuul.merger.merger.Merger(
+ root, self.connections, self.merge_email, self.merge_name,
+ self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
+
+ def start(self):
+ self._running = True
+ self._command_running = True
+ server = self.config.get('gearman', 'server')
+ port = get_default(self.config, 'gearman', 'port', 4730)
+ ssl_key = get_default(self.config, 'gearman', 'ssl_key')
+ ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
+ ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
+ self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
+ self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+ self.executor_worker = ExecutorExecuteWorker(
+ self, 'Zuul Executor Server')
+ self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+ self.log.debug("Waiting for server")
+ self.merger_worker.waitForServer()
+ self.executor_worker.waitForServer()
+ self.log.debug("Registering")
+ self.register()
+
+ self.log.debug("Starting command processor")
+ self.command_socket.start()
+ self.command_thread = threading.Thread(target=self.runCommand)
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
+ self.log.debug("Starting worker")
+ self.update_thread = threading.Thread(target=self._updateLoop)
+ self.update_thread.daemon = True
+ self.update_thread.start()
+ self.merger_thread = threading.Thread(target=self.run_merger)
+ self.merger_thread.daemon = True
+ self.merger_thread.start()
+ self.executor_thread = threading.Thread(target=self.run_executor)
+ self.executor_thread.daemon = True
+ self.executor_thread.start()
+ self.governor_stop_event = threading.Event()
+ self.governor_thread = threading.Thread(target=self.run_governor)
+ self.governor_thread.daemon = True
+ self.governor_thread.start()
+ self.disk_accountant.start()
+
+ def register(self):
+ self.register_work()
+ self.executor_worker.registerFunction("executor:stop:%s" %
+ self.hostname)
+ self.merger_worker.registerFunction("merger:merge")
+ self.merger_worker.registerFunction("merger:cat")
+ self.merger_worker.registerFunction("merger:refstate")
+
+ def register_work(self):
+ self.accepting_work = True
+ self.executor_worker.registerFunction("executor:execute")
+
+ def unregister_work(self):
+ self.accepting_work = False
+ self.executor_worker.unregisterFunction("executor:execute")
+
+ def stop(self):
+ self.log.debug("Stopping")
+ self.disk_accountant.stop()
+ self.governor_stop_event.set()
+ self._running = False
+ self._command_running = False
+ self.command_socket.stop()
+ self.update_queue.put(None)
+
+ for job_worker in list(self.job_workers.values()):
+ try:
+ job_worker.stop()
+ except Exception:
+ self.log.exception("Exception sending stop command "
+ "to worker:")
+ self.merger_worker.shutdown()
+ self.executor_worker.shutdown()
+ self.log.debug("Stopped")
+
+ def pause(self):
+ # TODOv3: implement
+ pass
+
+ def unpause(self):
+ # TODOv3: implement
+ pass
+
+ def graceful(self):
+ # TODOv3: implement
+ pass
+
+ def verboseOn(self):
+ self.verbose = True
+
+ def verboseOff(self):
+ self.verbose = False
+
+ def keep(self):
+ self.keep_jobdir = True
+
+ def nokeep(self):
+ self.keep_jobdir = False
+
+ def join(self):
+ self.update_thread.join()
+ self.merger_thread.join()
+ self.executor_thread.join()
+ self.governor_thread.join()
+
+ def runCommand(self):
+ while self._command_running:
+ try:
+ command = self.command_socket.get().decode('utf8')
+ if command != '_stop':
+ self.command_map[command]()
+ except Exception:
+ self.log.exception("Exception while processing command")
+
+ def _updateLoop(self):
+ while self._running:
+ try:
+ self._innerUpdateLoop()
+ except:
+ self.log.exception("Exception in update thread:")
+
+ def _innerUpdateLoop(self):
+ # Inside of a loop that keeps the main repositories up to date
+ task = self.update_queue.get()
+ if task is None:
+ # We are asked to stop
+ return
+ with self.merger_lock:
+ self.log.info("Updating repo %s/%s" % (
+ task.connection_name, task.project_name))
+ self.merger.updateRepo(task.connection_name, task.project_name)
+ self.log.debug("Finished updating repo %s/%s" %
+ (task.connection_name, task.project_name))
+ task.setComplete()
+
+ def update(self, connection_name, project_name):
+ # Update a repository in the main merger
+ task = UpdateTask(connection_name, project_name)
+ task = self.update_queue.put(task)
+ return task
+
+ def run_merger(self):
+ self.log.debug("Starting merger listener")
+ while self._running:
+ try:
+ job = self.merger_worker.getJob()
+ try:
+ if job.name == 'merger:cat':
+ self.log.debug("Got cat job: %s" % job.unique)
+ self.cat(job)
+ elif job.name == 'merger:merge':
+ self.log.debug("Got merge job: %s" % job.unique)
+ self.merge(job)
+ elif job.name == 'merger:refstate':
+ self.log.debug("Got refstate job: %s" % job.unique)
+ self.refstate(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ except gear.InterruptedError:
+ pass
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def run_executor(self):
+ self.log.debug("Starting executor listener")
+ while self._running:
+ try:
+ job = self.executor_worker.getJob()
+ try:
+ if job.name == 'executor:execute':
+ self.log.debug("Got execute job: %s" % job.unique)
+ self.executeJob(job)
+ elif job.name.startswith('executor:stop'):
+ self.log.debug("Got stop job: %s" % job.unique)
+ self.stopJob(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ except gear.InterruptedError:
+ pass
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def run_governor(self):
+ while not self.governor_stop_event.wait(30):
+ self.manageLoad()
+
+ def executeJob(self, job):
+ self.job_workers[job.unique] = AnsibleJob(self, job)
+ self.job_workers[job.unique].run()
+
+ def manageLoad(self):
+ ''' Apply some heuristics to decide whether or not we should
+ be askign for more jobs '''
+ load_avg = os.getloadavg()[0]
+ if self.accepting_work:
+ # Don't unregister if we don't have any active jobs.
+ if load_avg > self.max_load_avg and self.job_workers:
+ self.log.info(
+ "Unregistering due to high system load {} > {}".format(
+ load_avg, self.max_load_avg))
+ self.unregister_work()
+ elif load_avg <= self.max_load_avg:
+ self.log.info(
+ "Re-registering as load is within limits {} <= {}".format(
+ load_avg, self.max_load_avg))
+ self.register_work()
+
+ def finishJob(self, unique):
+ del(self.job_workers[unique])
+
+ def stopJobDiskFull(self, jobdir):
+ unique = os.path.basename(jobdir)
+ self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
+
+ def stopJob(self, job):
+ try:
+ args = json.loads(job.arguments)
+ self.log.debug("Stop job with arguments: %s" % (args,))
+ unique = args['uuid']
+ self.stopJobByUnique(unique)
+ finally:
+ job.sendWorkComplete()
+
+ def stopJobByUnique(self, unique, reason=None):
+ job_worker = self.job_workers.get(unique)
+ if not job_worker:
+ self.log.debug("Unable to find worker for job %s" % (unique,))
+ return
+ try:
+ job_worker.stop(reason)
+ except Exception:
+ self.log.exception("Exception sending stop command "
+ "to worker:")
+
+ def cat(self, job):
+ args = json.loads(job.arguments)
+ task = self.update(args['connection'], args['project'])
+ task.wait()
+ with self.merger_lock:
+ files = self.merger.getFiles(args['connection'], args['project'],
+ args['branch'], args['files'],
+ args.get('dirs', []))
+ result = dict(updated=True,
+ files=files)
+ job.sendWorkComplete(json.dumps(result))
+
+ def refstate(self, job):
+ args = json.loads(job.arguments)
+ with self.merger_lock:
+ success, repo_state = self.merger.getRepoState(args['items'])
+ result = dict(updated=success,
+ repo_state=repo_state)
+ job.sendWorkComplete(json.dumps(result))
+
+ def merge(self, job):
+ args = json.loads(job.arguments)
+ with self.merger_lock:
+ ret = self.merger.mergeChanges(args['items'], args.get('files'),
+ args.get('dirs', []),
+ args.get('repo_state'))
+ result = dict(merged=(ret is not None))
+ if ret is None:
+ result['commit'] = result['files'] = result['repo_state'] = None
+ else:
+ (result['commit'], result['files'], result['repo_state'],
+ recent) = ret
+ job.sendWorkComplete(json.dumps(result))