| # Copyright 2013 Rackspace Australia |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| |
| import git |
| import logging |
| import magic |
| import os |
| import requests |
| import select |
| import shutil |
| import subprocess |
| import swiftclient |
| import sys |
| import tempfile |
| import time |
| |
| |
| log = logging.getLogger('lib.utils') |
| |
| |
| class GitRepository(object): |
| |
| """ Manage a git repository for our uses """ |
| log = logging.getLogger("lib.utils.GitRepository") |
| |
| def __init__(self, remote_url, local_path): |
| self.remote_url = remote_url |
| self.local_path = local_path |
| self._ensure_cloned() |
| |
| self.repo = git.Repo(self.local_path) |
| |
| def _ensure_cloned(self): |
| if not os.path.exists(self.local_path): |
| self.log.debug("Cloning from %s to %s" % (self.remote_url, |
| self.local_path)) |
| git.Repo.clone_from(self.remote_url, self.local_path) |
| |
| def fetch(self, ref): |
        # The git.remote.fetch method may read in git progress info and
        # interpret it improperly, causing an AssertionError. Because the
        # data was fetched properly, subsequent fetches don't seem to fail,
        # so retry if an AssertionError is caught.
| origin = self.repo.remotes.origin |
| self.log.debug("Fetching %s from %s" % (ref, origin)) |
| |
| try: |
| origin.fetch(ref) |
| except AssertionError: |
| origin.fetch(ref) |
| |
| def checkout(self, ref): |
| self.log.debug("Checking out %s" % ref) |
| return self.repo.git.checkout(ref) |
| |
| def reset(self): |
| self._ensure_cloned() |
| self.log.debug("Resetting repository %s" % self.local_path) |
| self.update() |
| origin = self.repo.remotes.origin |
| for ref in origin.refs: |
| if ref.remote_head == 'HEAD': |
| continue |
| self.repo.create_head(ref.remote_head, ref, force=True) |
| |
| # Reset to remote HEAD (usually origin/master) |
| self.repo.head.reference = origin.refs['HEAD'] |
| self.repo.head.reset(index=True, working_tree=True) |
| self.repo.git.clean('-x', '-f', '-d') |
| |
| def update(self): |
| self._ensure_cloned() |
| self.log.debug("Updating repository %s" % self.local_path) |
| origin = self.repo.remotes.origin |
| origin.update() |
| # If the remote repository is repacked, the repo object's |
| # cache may be out of date. Specifically, it caches whether |
| # to check the loose or packed DB for a given SHA. Further, |
        # if there was no pack or loose directory to start with, the
| # repo object may not even have a database for it. Avoid |
| # these problems by recreating the repo object. |
| self.repo = git.Repo(self.local_path) |
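
# Example usage of GitRepository (a minimal sketch; the remote URL, local
# path and ref below are hypothetical):
#
#   repo = GitRepository('https://git.example.org/project.git',
#                        '/var/lib/worker/project')
#   repo.fetch('refs/changes/55/12345/2')  # e.g. a Gerrit change ref
#   repo.checkout('FETCH_HEAD')
#   repo.reset()  # later, return the checkout to a clean remote HEAD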
| |
| |
def execute_to_log(cmd, logfile, timeout=-1, watch_logs=None, heartbeat=30,
                   env=None, cwd=None):
    """ Executes a command, logging its stdout/stderr and the output of any
    supplied watch_logs into a new logfile.

    watch_logs is a list of (name, path) tuples """

    # Use None as the default to avoid sharing one mutable list between calls
    if watch_logs is None:
        watch_logs = []

    log_dir = os.path.dirname(logfile)
    if log_dir and not os.path.isdir(log_dir):
        os.makedirs(log_dir)
| |
| logger = logging.getLogger(logfile) |
| log_handler = logging.FileHandler(logfile) |
| log_formatter = logging.Formatter('%(asctime)s %(message)s') |
| log_handler.setFormatter(log_formatter) |
| logger.addHandler(log_handler) |
| |
| descriptors = {} |
| |
| for watch_file in watch_logs: |
| if not os.path.exists(watch_file[1]): |
| logger.warning('Failed to monitor log file %s: file not found' |
| % watch_file[1]) |
| continue |
| |
| try: |
| fd = os.open(watch_file[1], os.O_RDONLY) |
| os.lseek(fd, 0, os.SEEK_END) |
| descriptors[fd] = {'name': watch_file[0], |
| 'poll': select.POLLIN, |
| 'lines': ''} |
| except Exception as e: |
| logger.warning('Failed to monitor log file %s: %s' |
| % (watch_file[1], e)) |
| |
    # Merge stderr into stdout in the shell so a single pipe carries all
    # output
    cmd += ' 2>&1'
    logger.info("[running %s]" % cmd)
    start_time = time.time()
    p = subprocess.Popen(
        cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        env=env, cwd=cwd)
| |
| descriptors[p.stdout.fileno()] = dict( |
| name='[output]', |
| poll=(select.POLLIN | select.POLLHUP), |
| lines='' |
| ) |
| |
| poll_obj = select.poll() |
| for fd, descriptor in descriptors.items(): |
| poll_obj.register(fd, descriptor['poll']) |
| |
    # Wrap the timestamp in a list so the nested function can update it;
    # 'global' would touch a module-level name rather than this local, and
    # Python 2 has no nonlocal.
    last_heartbeat = [time.time()]

    def process(fd):
        """ Write the fd to log """
        descriptors[fd]['lines'] += os.read(fd, 1024 * 1024)
        # Avoid partial lines by only processing input with breaks
        if descriptors[fd]['lines'].find('\n') != -1:
            elems = descriptors[fd]['lines'].split('\n')
            # Take all but the partial line
            for l in elems[:-1]:
                if len(l) > 0:
                    l = '%s %s' % (descriptors[fd]['name'], l)
                    logger.info(l)
                    last_heartbeat[0] = time.time()
            # Place the partial line back into lines to be processed
            descriptors[fd]['lines'] = elems[-1]
| |
    while p.poll() is None:
        if timeout > 0 and time.time() - start_time > timeout:
            # Append to logfile
            logger.info("[timeout]")
            p.kill()

        # Wait up to a second for output rather than busy-looping on poll(0)
        for fd, flag in poll_obj.poll(1000):
            process(fd)

        if heartbeat and (time.time() - last_heartbeat[0] > heartbeat):
            # Append to logfile
            logger.info("[heartbeat]")
            last_heartbeat[0] = time.time()
| |
| # Do one last write to get the remaining lines |
| for fd, flag in poll_obj.poll(0): |
| process(fd) |
| |
| # Clean up |
| for fd, descriptor in descriptors.items(): |
| poll_obj.unregister(fd) |
| if fd == p.stdout.fileno(): |
            # Don't close the process's stdout pipe here; Popen cleans it up
| continue |
| os.close(fd) |
| try: |
| p.kill() |
| except OSError: |
| pass |
| |
| logger.info('[script exit code = %d]' % p.returncode) |
| logger.removeHandler(log_handler) |
| log_handler.flush() |
| log_handler.close() |
| return p.returncode |
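
# Example invocation of execute_to_log (a sketch; the command, logfile and
# watched log path are hypothetical):
#
#   rc = execute_to_log(
#       'mysql_upgrade --verbose',
#       '/var/log/worker/job_1234.log',
#       timeout=3600,
#       watch_logs=[('[mysql]', '/var/log/mysql/error.log')],
#   )
#
# Each line written to the logfile is prefixed with its descriptor name,
# e.g. '[output]' for the command's own stdout/stderr.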
| |
| |
| def zuul_swift_upload(file_path, job_arguments): |
| """Upload working_dir to swift as per zuul's instructions""" |
| # TODO(jhesketh): replace with swift_form_post_submit from below |
| |
| # NOTE(jhesketh): Zuul specifies an object prefix in the destination so |
| # we don't need to be concerned with results_set_name |
| |
| file_list = [] |
| if os.path.isfile(file_path): |
| file_list.append(file_path) |
| elif os.path.isdir(file_path): |
| for path, folders, files in os.walk(file_path): |
| for f in files: |
| f_path = os.path.join(path, f) |
| file_list.append(f_path) |
| |
| # We are uploading the file_list as an HTTP POST multipart encoded. |
| # First grab out the information we need to send back from the hmac_body |
| payload = {} |
| (object_prefix, |
| payload['redirect'], |
| payload['max_file_size'], |
| payload['max_file_count'], |
| payload['expires']) = \ |
| job_arguments['ZUUL_EXTRA_SWIFT_HMAC_BODY'].split('\n') |
| |
| url = job_arguments['ZUUL_EXTRA_SWIFT_URL'] |
| payload['signature'] = job_arguments['ZUUL_EXTRA_SWIFT_SIGNATURE'] |
| logserver_prefix = job_arguments['ZUUL_EXTRA_SWIFT_LOGSERVER_PREFIX'] |
| |
    files = {}
    for i, f in enumerate(file_list):
        files['file%d' % (i + 1)] = open(f, 'rb')

    requests.post(url, data=payload, files=files)

    # Close the handles once the POST has completed
    for fd in files.values():
        fd.close()
| |
| return (logserver_prefix + |
| job_arguments['ZUUL_EXTRA_SWIFT_DESTINATION_PREFIX']) |
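
# zuul_swift_upload expects job_arguments as provided by Zuul; a sketch of
# the shape (all values hypothetical):
#
#   job_arguments = {
#       'ZUUL_EXTRA_SWIFT_URL': 'https://swift.example.org/v1/AUTH_abc/logs',
#       'ZUUL_EXTRA_SWIFT_HMAC_BODY': '\n'.join([
#           'prefix/', 'https://logs.example.org/', '104857600', '100',
#           '1400000000']),
#       'ZUUL_EXTRA_SWIFT_SIGNATURE': 'deadbeef',
#       'ZUUL_EXTRA_SWIFT_LOGSERVER_PREFIX': 'https://logs.example.org/',
#       'ZUUL_EXTRA_SWIFT_DESTINATION_PREFIX': 'prefix/',
#   }
#   public_url = zuul_swift_upload('/tmp/job/results', job_arguments)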
| |
| |
| def generate_log_index(file_list, logserver_prefix, results_set_name): |
| """Create an index of logfiles and links to them""" |
| |
| output = '<html><head><title>Index of results</title></head><body>' |
| output += '<ul>' |
| for f in file_list: |
| file_url = os.path.join(logserver_prefix, results_set_name, f) |
        # file_list is just a list of names to build an index for; the
        # entries aren't necessarily on disk, so we can't stat them to tell
        # folders from files. Instead, normalise the name to get the
        # basename, then re-append a trailing slash if the original had one
        # so the link obviously points to a folder.
        filename_postfix = '/' if f[-1] == '/' else ''
| filename = os.path.basename(os.path.normpath(f)) + filename_postfix |
| output += '<li>' |
| output += '<a href="%s">%s</a>' % (file_url, filename) |
| output += '</li>' |
| |
| output += '</ul>' |
| output += '</body></html>' |
| return output |
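
# For example (hypothetical inputs), generate_log_index(
#     ['job/output.log', 'job/results/'],
#     'https://logs.example.org', 'set_42')
# returns an HTML page linking to
# https://logs.example.org/set_42/job/output.log and
# https://logs.example.org/set_42/job/results/ (the trailing slash marks
# the folder).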
| |
| |
| def make_index_file(file_list, logserver_prefix, results_set_name, |
| index_filename='index.html'): |
| """Writes an index into a file for pushing""" |
| |
| index_content = generate_log_index(file_list, logserver_prefix, |
| results_set_name) |
    tempdir = tempfile.mkdtemp()
    index_file = os.path.join(tempdir, index_filename)
    # Close the file before handing the path back for pushing
    with open(index_file, 'w') as fd:
        fd.write(index_content)
    return index_file
| |
| |
| def get_file_mime(file_path): |
| """Get the file mime using libmagic""" |
| |
| if not os.path.isfile(file_path): |
| return None |
| |
| if hasattr(magic, 'from_file'): |
| return magic.from_file(file_path, mime=True) |
| else: |
| # no magic.from_file, we might be using the libmagic bindings |
| m = magic.open(magic.MAGIC_MIME) |
| m.load() |
| return m.file(file_path).split(';')[0] |
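
# For example, get_file_mime('/var/log/syslog') would typically return
# 'text/plain' under either binding: the python-magic package takes the
# magic.from_file(..., mime=True) path, while file's own libmagic bindings
# take the magic.open() path.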
| |
| |
| def swift_form_post_submit(file_list, url, hmac_body, signature): |
| """Send the files to swift via the FormPost middleware""" |
| |
| # We are uploading the file_list as an HTTP POST multipart encoded. |
| # First grab out the information we need to send back from the hmac_body |
| payload = {} |
| |
| (object_prefix, |
| payload['redirect'], |
| payload['max_file_size'], |
| payload['max_file_count'], |
| payload['expires']) = hmac_body.split('\n') |
| payload['signature'] = signature |
| |
    # Loop over the file list in chunks of max_file_count
    for sub_file_list in (file_list[pos:pos + int(payload['max_file_count'])]
                          for pos in xrange(0, len(file_list),
                                            int(payload['max_file_count']))):
        # expires arrives as a string from the hmac_body; compare numerically
        if float(payload['expires']) < time.time():
            raise Exception("Ran out of time uploading files!")
| files = {} |
        # Zuul's log path is generated without a trailing slash, so the
        # object prefix does not end in a slash and the files would be
        # uploaded as 'prefix' + 'filename'. Assume we want the destination
        # url to look like a folder and make sure there's a slash between.
        filename_prefix = '/' if url[-1] != '/' else ''
| for i, f in enumerate(sub_file_list): |
| if os.path.getsize(f['path']) > int(payload['max_file_size']): |
| sys.stderr.write('Warning: %s exceeds %d bytes. Skipping...\n' |
| % (f['path'], int(payload['max_file_size']))) |
| continue |
| files['file%d' % (i + 1)] = (filename_prefix + f['filename'], |
| open(f['path'], 'rb'), |
| get_file_mime(f['path'])) |
| requests.post(url, data=payload, files=files) |
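
# swift_form_post_submit expects file_list entries shaped like those
# produced by build_file_list below; a hypothetical call, where hmac_body
# and signature come from Zuul for the FormPost middleware:
#
#   swift_form_post_submit(
#       [{'filename': 'results/output.log',
#         'path': '/tmp/job/results/output.log'}],
#       'https://swift.example.org/v1/AUTH_abc/logs/prefix',
#       hmac_body, signature)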
| |
| |
| def build_file_list(file_path, logserver_prefix, results_set_name, |
| create_dir_indexes=True): |
| """Generate a list of files to upload to zuul. Recurses through directories |
| and generates index.html files if requested.""" |
| |
    # file_list: a list of dicts of the form {'path': ..., 'filename': ...}
    # where filename is appended to the object prefix and may itself contain
    # path separators
| file_list = [] |
| if os.path.isfile(file_path): |
| file_list.append({'filename': os.path.basename(file_path), |
| 'path': file_path}) |
| elif os.path.isdir(file_path): |
| if file_path[-1] == os.sep: |
| file_path = file_path[:-1] |
| parent_dir = os.path.dirname(file_path) |
| for path, folders, files in os.walk(file_path): |
| folder_contents = [] |
| for f in files: |
| full_path = os.path.join(path, f) |
| relative_name = os.path.relpath(full_path, parent_dir) |
| push_file = {'filename': relative_name, |
| 'path': full_path} |
| file_list.append(push_file) |
| folder_contents.append(relative_name) |
| |
| for f in folders: |
| full_path = os.path.join(path, f) |
| relative_name = os.path.relpath(full_path, parent_dir) |
| folder_contents.append(relative_name + '/') |
| |
| if create_dir_indexes: |
| index_file = make_index_file(folder_contents, logserver_prefix, |
| results_set_name) |
| relative_name = os.path.relpath(path, parent_dir) |
| file_list.append({ |
| 'filename': os.path.join(relative_name, |
| os.path.basename(index_file)), |
| 'path': index_file}) |
| |
| return file_list |
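
# For a hypothetical tree containing /tmp/job/results/output.log,
# build_file_list('/tmp/job/results', 'https://logs.example.org', 'set_42')
# returns dicts along the lines of:
#
#   [{'filename': 'results/output.log',
#     'path': '/tmp/job/results/output.log'},
#    {'filename': 'results/index.html',
#     'path': '/tmp/tmpXXXXXX/index.html'}]
#
# where the index entry only appears with create_dir_indexes=True.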
| |
| |
| def push_files(results_set_name, path_list, publish_config, |
| generate_indexes=True): |
| """ Push a log file/foler to a server. Returns the public URL """ |
| |
| file_list = [] |
| root_list = [] |
| |
| for file_path in path_list: |
| file_path = os.path.normpath(file_path) |
| if os.path.isfile(file_path): |
| root_list.append(os.path.basename(file_path)) |
| else: |
| root_list.append(os.path.basename(file_path) + '/') |
| |
| file_list += build_file_list( |
| file_path, publish_config['prepend_url'], results_set_name, |
| generate_indexes |
| ) |
| |
| index_file = '' |
| if generate_indexes: |
| index_file = make_index_file(root_list, publish_config['prepend_url'], |
| results_set_name) |
| file_list.append({ |
| 'filename': os.path.basename(index_file), |
| 'path': index_file}) |
| |
    method = publish_config['type'] + '_push_files'
    if method in globals() and callable(globals()[method]):
        globals()[method](results_set_name, file_list, publish_config)
| |
| return os.path.join(publish_config['prepend_url'], results_set_name, |
| os.path.basename(index_file)) |
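
# push_files dispatches on publish_config['type']: 'swift' resolves to
# swift_push_files and 'local' to local_push_files below. A hypothetical
# call:
#
#   url = push_files('set_42', ['/tmp/job/results'],
#                    {'type': 'local',
#                     'path': '/srv/static/logs',
#                     'prepend_url': 'https://logs.example.org/'})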
| |
| |
def swift_push_files(results_set_name, file_list, swift_config):
    """ Push a log file to a swift server. """
    # Authenticate once and reuse the connection for every file
    con = swiftclient.client.Connection(
        authurl=swift_config['authurl'],
        user=swift_config['user'],
        key=swift_config['password'],
        os_options={'region_name': swift_config['region']},
        tenant_name=swift_config['tenant'],
        auth_version=2.0)
    for file_item in file_list:
        filename = os.path.join(results_set_name, file_item['filename'])
        with open(file_item['path'], 'r') as fd:
            con.put_object(swift_config['container'], filename, fd)
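
# swift_push_files expects swift_config with keys along these lines (all
# values hypothetical):
#
#   {'authurl': 'https://keystone.example.org/v2.0/',
#    'user': 'uploader', 'password': 'secret', 'tenant': 'logs',
#    'region': 'RegionOne', 'container': 'job-logs'}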
| |
| |
| def local_push_files(results_set_name, file_list, local_config): |
| """ Copy the file locally somewhere sensible """ |
| for file_item in file_list: |
| dest_dir = os.path.join(local_config['path'], results_set_name, |
| os.path.dirname(file_item['filename'])) |
| dest_filename = os.path.basename(file_item['filename']) |
| if not os.path.isdir(dest_dir): |
| os.makedirs(dest_dir) |
| |
| dest_file = os.path.join(dest_dir, dest_filename) |
| shutil.copyfile(file_item['path'], dest_file) |
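
# local_push_files only needs local_config['path'], the directory the
# results set is copied under; a hypothetical call:
#
#   local_push_files('set_42',
#                    [{'filename': 'results/output.log',
#                      'path': '/tmp/job/results/output.log'}],
#                    {'path': '/srv/static/logs'})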
| |
| |
| def scp_push_files(results_set_name, file_path, local_config): |
| """ Copy the file remotely over ssh """ |
| # TODO! |
| pass |