blob: 90c362ca586bd08f2bac05c20270d0b2f663dab5 [file] [log] [blame]
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import json
import logging
import os
import shutil
import signal
import socket
import subprocess
import tempfile
import threading
import time
import traceback
import gear
import yaml
import zuul.merger
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
ANSIBLE_WATCHDOG_GRACE = 5 * 60
class Watchdog(object):
    """Invoke a callback from a daemon thread if a deadline passes.

    After stop() the caller inspects ``timed_out``:  ``None`` means the
    watchdog never started, ``True`` means the deadline expired and the
    callback fired, ``False`` means stop() arrived in time.
    """

    def __init__(self, timeout, function, args):
        """
        :param timeout: seconds before the watchdog fires.
        :param function: callable to invoke on timeout.
        :param args: positional argument tuple for *function*.
        """
        self.timeout = timeout
        self.function = function
        self.args = args
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True
        self.timed_out = None

    def _run(self):
        # Poll in 10s steps so a stop() takes effect reasonably quickly
        # without busy-waiting.
        while self._running and time.time() < self.end:
            time.sleep(10)
        if self._running:
            # Deadline expired before stop(): record the timeout and fire.
            self.timed_out = True
            self.function(*self.args)
        else:
            # Bug fix: timed_out was previously reset to False
            # unconditionally after the callback ran, racing with the
            # caller's post-stop() check and potentially masking a
            # timeout.  Only record False when stop() won the race.
            self.timed_out = False

    def start(self):
        """Arm the watchdog; the deadline is ``now + timeout``."""
        self._running = True
        self.end = time.time() + self.timeout
        self.thread.start()

    def stop(self):
        """Disarm the watchdog (the poll loop exits within 10s)."""
        self._running = False
# TODOv3(mordred): put git repos in a hierarchy that includes source
# hostname, eg: git.openstack.org/openstack/nova. Also, configure
# sources to have an alias, so that the review.openstack.org source
# repos end up in git.openstack.org.
class JobDir(object):
    """Throwaway on-disk workspace for a single job.

    Lays out a temporary root containing ``git/`` for repository
    checkouts and ``ansible/`` for the generated ansible files.  Usable
    as a context manager: the whole tree is removed on exit unless
    *keep* is true (useful for debugging).
    """

    def __init__(self, keep=False):
        self.keep = keep
        self.root = tempfile.mkdtemp()
        self.git_root = os.path.join(self.root, 'git')
        os.makedirs(self.git_root)
        self.ansible_root = os.path.join(self.root, 'ansible')
        os.makedirs(self.ansible_root)
        # Pre-computed paths for every file the launcher will write
        # under the ansible directory.
        for attr, filename in (('known_hosts', 'known_hosts'),
                               ('inventory', 'inventory'),
                               ('playbook', 'playbook'),
                               ('post_playbook', 'post_playbook'),
                               ('config', 'ansible.cfg'),
                               ('ansible_log', 'ansible_log.txt')):
            setattr(self, attr, os.path.join(self.ansible_root, filename))

    def __enter__(self):
        return self

    def __exit__(self, etype, value, tb):
        if self.keep:
            return
        shutil.rmtree(self.root)
class UpdateTask(object):
    """A request to update one git repository.

    Tasks compare equal by project name only, so the deduplicating queue
    can collapse concurrent update requests for the same project.
    """

    def __init__(self, project, url):
        self.project = project
        self.url = url
        self.event = threading.Event()

    def __eq__(self, other):
        # Deliberately ignores the URL: two updates for the same project
        # are considered the same piece of work.
        return other.project == self.project

    def wait(self):
        """Block the caller until the update has completed."""
        self.event.wait()

    def setComplete(self):
        """Mark the update as done, releasing any waiters."""
        self.event.set()
class DeduplicateQueue(object):
    """A blocking FIFO queue that collapses equal pending items.

    put() hands back the already-enqueued equivalent item when one
    exists, so callers can wait on the in-flight task instead of
    queueing duplicate work.
    """

    def __init__(self):
        self.queue = collections.deque()
        self.condition = threading.Condition()

    def qsize(self):
        return len(self.queue)

    def put(self, item):
        # Returns the original item if added, or an equivalent item if
        # already enqueued.
        with self.condition:
            for pending in self.queue:
                if item == pending:
                    return pending
            self.queue.append(item)
            self.condition.notify()
            return item

    def get(self):
        """Pop the oldest item, blocking until one is available."""
        with self.condition:
            while not self.queue:
                self.condition.wait()
            return self.queue.popleft()
class LaunchServer(object):
    """Gearman worker that launches jobs via ansible and performs merges.

    Registers ``launcher:*`` and ``merger:*`` functions with the gearman
    server and dispatches incoming jobs from a run loop.  Git repository
    updates are funneled through a deduplicating queue serviced by a
    dedicated thread so each project is only updated once at a time.
    """

    log = logging.getLogger("zuul.LaunchServer")

    def __init__(self, config, connections=None, keep_jobdir=False):
        """Initialize the launch server.

        :param config: parsed zuul configuration (ConfigParser-like).
        :param connections: mapping of connection name to connection
            object, passed through to the merger.
        :param keep_jobdir: if True, per-job working directories are
            left on disk for debugging instead of being removed.
        """
        # Bug fix: the default was a shared mutable dict ({}), which is
        # a classic Python pitfall; use a None sentinel instead.
        if connections is None:
            connections = {}
        self.config = config
        self.keep_jobdir = keep_jobdir
        # TODOv3(mordred): make the launcher name more unique --
        # perhaps hostname+pid.
        self.hostname = socket.gethostname()
        self.zuul_url = config.get('merger', 'zuul_url')

        # Commands accepted over the local command socket.
        self.command_map = dict(
            stop=self.stop,
            pause=self.pause,
            unpause=self.unpause,
            graceful=self.graceful,
            verbose=self.verboseOn,
            unverbose=self.verboseOff,
        )

        if self.config.has_option('merger', 'git_dir'):
            self.merge_root = self.config.get('merger', 'git_dir')
        else:
            self.merge_root = '/var/lib/zuul/git'

        if self.config.has_option('merger', 'git_user_email'):
            self.merge_email = self.config.get('merger', 'git_user_email')
        else:
            self.merge_email = None

        if self.config.has_option('merger', 'git_user_name'):
            self.merge_name = self.config.get('merger', 'git_user_name')
        else:
            self.merge_name = None

        if self.config.has_option('launcher', 'private_key_file'):
            self.private_key_file = config.get('launcher', 'private_key_file')
        else:
            self.private_key_file = '~/.ssh/id_rsa'

        self.connections = connections
        # Long-lived merger rooted at the shared merge root; per-job
        # mergers are created in _launch rooted at the jobdir.
        self.merger = self._getMerger(self.merge_root)
        self.update_queue = DeduplicateQueue()

        if self.config.has_option('zuul', 'state_dir'):
            state_dir = os.path.expanduser(
                self.config.get('zuul', 'state_dir'))
        else:
            state_dir = '/var/lib/zuul'
        path = os.path.join(state_dir, 'launcher.socket')
        self.command_socket = commandsocket.CommandSocket(path)

        # Copy the bundled ansible callback plugins and modules into the
        # state dir so ansible-playbook can be pointed at stable paths.
        ansible_dir = os.path.join(state_dir, 'ansible')
        plugins_dir = os.path.join(ansible_dir, 'plugins')
        self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
        if not os.path.exists(self.callback_dir):
            os.makedirs(self.callback_dir)
        self.library_dir = os.path.join(ansible_dir, 'library')
        if not os.path.exists(self.library_dir):
            os.makedirs(self.library_dir)

        callback_path = os.path.dirname(os.path.abspath(
            zuul.ansible.plugins.callback_plugins.__file__))
        for fn in os.listdir(callback_path):
            shutil.copy(os.path.join(callback_path, fn), self.callback_dir)

        library_path = os.path.dirname(os.path.abspath(
            zuul.ansible.library.__file__))
        for fn in os.listdir(library_path):
            shutil.copy(os.path.join(library_path, fn), self.library_dir)

    def _getMerger(self, root):
        """Return a new Merger rooted at *root*."""
        return zuul.merger.merger.Merger(root, self.connections,
                                         self.merge_email, self.merge_name)

    def start(self):
        """Connect to gearman and start the worker threads."""
        self._running = True
        self._command_running = True
        server = self.config.get('gearman', 'server')
        if self.config.has_option('gearman', 'port'):
            port = self.config.get('gearman', 'port')
        else:
            port = 4730
        self.worker = gear.Worker('Zuul Launch Server')
        self.worker.addServer(server, port)
        self.log.debug("Waiting for server")
        self.worker.waitForServer()
        self.log.debug("Registering")
        self.register()

        self.log.debug("Starting command processor")
        self.command_socket.start()
        self.command_thread = threading.Thread(target=self.runCommand)
        self.command_thread.daemon = True
        self.command_thread.start()

        self.log.debug("Starting worker")
        self.update_thread = threading.Thread(target=self._updateLoop)
        self.update_thread.daemon = True
        self.update_thread.start()
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        self.thread.start()

    def register(self):
        # The stop function is addressed to this specific launcher by
        # hostname so the scheduler can target it directly.
        self.worker.registerFunction("launcher:launch")
        self.worker.registerFunction("launcher:stop:%s" % self.hostname)
        self.worker.registerFunction("merger:merge")
        self.worker.registerFunction("merger:cat")

    def stop(self):
        """Shut down the gearman worker and the command socket."""
        self.log.debug("Stopping")
        self._running = False
        self.worker.shutdown()
        self._command_running = False
        self.command_socket.stop()
        self.log.debug("Stopped")

    def pause(self):
        # TODOv3: implement
        pass

    def unpause(self):
        # TODOv3: implement
        pass

    def graceful(self):
        # TODOv3: implement
        pass

    def verboseOn(self):
        # TODOv3: implement
        pass

    def verboseOff(self):
        # TODOv3: implement
        pass

    def join(self):
        """Wait for the update and main worker threads to exit."""
        self.update_thread.join()
        self.thread.join()

    def runCommand(self):
        """Dispatch commands arriving on the command socket."""
        while self._command_running:
            try:
                command = self.command_socket.get()
                self.command_map[command]()
            except Exception:
                self.log.exception("Exception while processing command")

    def _updateLoop(self):
        """Keep servicing the repo update queue until stopped."""
        while self._running:
            try:
                self._innerUpdateLoop()
            # Bug fix: this was a bare "except:", which would also
            # swallow SystemExit/KeyboardInterrupt raised in the thread.
            except Exception:
                self.log.exception("Exception in update thread:")

    def _innerUpdateLoop(self):
        # Inside of a loop that keeps the main repository up to date
        task = self.update_queue.get()
        self.log.info("Updating repo %s from %s" % (task.project, task.url))
        self.merger.updateRepo(task.project, task.url)
        self.log.debug("Finished updating repo %s from %s" %
                       (task.project, task.url))
        task.setComplete()

    def update(self, project, url):
        """Enqueue a repo update; returns the (possibly shared) task.

        Callers should wait() on the returned task; if an equivalent
        update was already pending they share the in-flight one.
        """
        task = UpdateTask(project, url)
        task = self.update_queue.put(task)
        return task

    def run(self):
        """Main gearman dispatch loop."""
        self.log.debug("Starting launch listener")
        while self._running:
            try:
                job = self.worker.getJob()
                try:
                    if job.name == 'launcher:launch':
                        self.log.debug("Got launch job: %s" % job.unique)
                        self.launchJob(job)
                    elif job.name.startswith('launcher:stop'):
                        self.log.debug("Got stop job: %s" % job.unique)
                        self.stopJob(job)
                    elif job.name == 'merger:cat':
                        self.log.debug("Got cat job: %s" % job.unique)
                        self.cat(job)
                    elif job.name == 'merger:merge':
                        self.log.debug("Got merge job: %s" % job.unique)
                        self.merge(job)
                    else:
                        self.log.error("Unable to handle job %s" % job.name)
                        job.sendWorkFail()
                except Exception:
                    self.log.exception("Exception while running job")
                    job.sendWorkException(traceback.format_exc())
            except Exception:
                self.log.exception("Exception while getting job")

    def launchJob(self, job):
        """Run a launch job in its own thread so the loop keeps going."""
        thread = threading.Thread(target=self._launch, args=(job,))
        thread.start()

    def _launch(self, job):
        """Prepare a jobdir, run ansible, and report the result."""
        self.log.debug("Job %s: beginning" % (job.unique,))
        # Bug fix: JobDir() was created with defaults, so the
        # keep_jobdir option stored in __init__ never took effect.
        with JobDir(keep=self.keep_jobdir) as jobdir:
            self.log.debug("Job %s: job root at %s" %
                           (job.unique, jobdir.root))
            args = json.loads(job.arguments)

            # Update every involved project, then wait for all updates
            # (deduplicated across concurrent jobs) to finish.
            tasks = []
            for project in args['projects']:
                self.log.debug("Job %s: updating project %s" %
                               (job.unique, project['name']))
                tasks.append(self.update(project['name'], project['url']))
            for task in tasks:
                task.wait()
            self.log.debug("Job %s: git updates complete" % (job.unique,))
            merger = self._getMerger(jobdir.git_root)
            commit = merger.mergeChanges(args['items'])  # noqa

            # TODOv3: Ansible the ansible thing here.
            self.prepareAnsibleFiles(jobdir, args)

            data = {
                'manager': self.hostname,
                'url': 'https://server/job',
            }

            # TODOv3:
            # 'name': self.name,
            # 'manager': self.launch_server.hostname,
            # 'worker_name': 'My Worker',
            # 'worker_hostname': 'localhost',
            # 'worker_ips': ['127.0.0.1', '192.168.1.1'],
            # 'worker_fqdn': 'zuul.example.org',
            # 'worker_program': 'FakeBuilder',
            # 'worker_version': 'v1.1',
            # 'worker_extra': {'something': 'else'}

            job.sendWorkData(json.dumps(data))
            job.sendWorkStatus(0, 100)

            result = self.runAnsible(jobdir, job)
            result = dict(result=result)
            job.sendWorkComplete(json.dumps(result))

    def stopJob(self, job):
        # TODOv3: implement.
        job.sendWorkComplete()

    def getHostList(self, args):
        # TODOv3: This should get the appropriate nodes from nodepool,
        # or in the unit tests, be overriden to return localhost.
        return [('localhost', dict(ansible_connection='local'))]

    def prepareAnsibleFiles(self, jobdir, args):
        """Write the inventory, playbook and ansible.cfg into *jobdir*."""
        with open(jobdir.inventory, 'w') as inventory:
            for host_name, host_vars in self.getHostList(args):
                inventory.write(host_name)
                inventory.write(' ')
                for k, v in host_vars.items():
                    inventory.write('%s=%s' % (k, v))
                inventory.write('\n')
        with open(jobdir.playbook, 'w') as playbook:
            # Placeholder play until real job content is wired up.
            play = dict(hosts='localhost',
                        tasks=[dict(name='test',
                                    shell='echo Hello world')])
            playbook.write(yaml.dump([play]))
        with open(jobdir.config, 'w') as config:
            config.write('[defaults]\n')
            config.write('hostfile = %s\n' % jobdir.inventory)
            config.write('keep_remote_files = True\n')
            config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
            config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
            config.write('private_key_file = %s\n' % self.private_key_file)
            config.write('retry_files_enabled = False\n')
            config.write('log_path = %s\n' % jobdir.ansible_log)
            config.write('gathering = explicit\n')
            config.write('callback_plugins = %s\n' % self.callback_dir)
            config.write('library = %s\n' % self.library_dir)
            # bump the timeout because busy nodes may take more than
            # 10s to respond
            config.write('timeout = 30\n')

            config.write('[ssh_connection]\n')
            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
                "-o UserKnownHostsFile=%s" % jobdir.known_hosts
            config.write('ssh_args = %s\n' % ssh_args)

    def _ansibleTimeout(self, proc, msg):
        """Watchdog callback: log and kill the ansible process group."""
        self.log.warning(msg)
        self.abortRunningProc(proc)

    def abortRunningProc(self, proc):
        """SIGKILL *proc*'s whole process group; returns True on success."""
        aborted = False
        self.log.debug("Abort: sending kill signal to job "
                       "process group")
        try:
            # The process was started with setsid, so killing its group
            # takes down any children ansible spawned as well.
            pgid = os.getpgid(proc.pid)
            os.killpg(pgid, signal.SIGKILL)
            aborted = True
        except Exception:
            self.log.exception("Exception while killing "
                               "ansible process:")
        return aborted

    def runAnsible(self, jobdir, job):
        """Run ansible-playbook under a watchdog and map its exit code.

        :returns: 'SUCCESS', 'FAILURE', 'TIMED_OUT', or None when the
            run was aborted or the node was unreachable (retryable).
        """
        # Job is included here for the benefit of the test framework.
        env_copy = os.environ.copy()
        env_copy['LOGNAME'] = 'zuul'

        if False:  # TODOv3: self.options['verbose']:
            verbose = '-vvv'
        else:
            verbose = '-v'

        cmd = ['ansible-playbook', jobdir.playbook, verbose]
        self.log.debug("Ansible command: %s" % (cmd,))
        # TODOv3: verbose
        proc = subprocess.Popen(
            cmd,
            cwd=jobdir.ansible_root,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            # New session so the whole process group can be killed on
            # timeout or abort.
            preexec_fn=os.setsid,
            env=env_copy,
        )
        ret = None
        # TODOv3: get this from the job
        timeout = 60
        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
                            self._ansibleTimeout,
                            (proc,
                             "Ansible timeout exceeded"))
        watchdog.start()
        try:
            for line in iter(proc.stdout.readline, b''):
                # Truncate very long lines to keep the log manageable.
                line = line[:1024].rstrip()
                self.log.debug("Ansible output: %s" % (line,))
            ret = proc.wait()
        finally:
            watchdog.stop()
        self.log.debug("Ansible exit code: %s" % (ret,))

        if watchdog.timed_out:
            return 'TIMED_OUT'
        if ret == 3:
            # AnsibleHostUnreachable: We had a network issue connecting to
            # our zuul-worker.
            return None
        elif ret == -9:
            # Received abort request.
            return None

        if ret == 0:
            return 'SUCCESS'
        return 'FAILURE'

    def cat(self, job):
        """Handle merger:cat — return file contents from a repo branch."""
        args = json.loads(job.arguments)
        task = self.update(args['project'], args['url'])
        task.wait()
        files = self.merger.getFiles(args['project'], args['url'],
                                     args['branch'], args['files'])
        result = dict(updated=True,
                      files=files,
                      zuul_url=self.zuul_url)
        job.sendWorkComplete(json.dumps(result))

    def merge(self, job):
        """Handle merger:merge — merge a series of items and report."""
        args = json.loads(job.arguments)
        ret = self.merger.mergeChanges(args['items'], args.get('files'))
        result = dict(merged=(ret is not None),
                      zuul_url=self.zuul_url)
        if args.get('files'):
            # When file contents were requested the merger returns a
            # (commit, files) tuple.
            result['commit'], result['files'] = ret
        else:
            result['commit'] = ret
        job.sendWorkComplete(json.dumps(result))