| # Copyright 2011 OpenStack, LLC. |
| # Copyright 2012 Hewlett-Packard Development Company, L.P. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| import threading |
| import select |
| import json |
| import time |
| from six.moves import queue as Queue |
| import paramiko |
| import logging |
| import pprint |
| |
| |
| class GerritWatcher(threading.Thread): |
| log = logging.getLogger("gerrit.GerritWatcher") |
| |
| def __init__(self, gerrit, username, hostname, port=29418, keyfile=None): |
| threading.Thread.__init__(self) |
| self.username = username |
| self.keyfile = keyfile |
| self.hostname = hostname |
| self.port = port |
| self.gerrit = gerrit |
| |
| def _read(self, fd): |
| l = fd.readline() |
| data = json.loads(l) |
| self.log.debug("Received data from Gerrit event stream: \n%s" % |
| pprint.pformat(data)) |
| self.gerrit.addEvent((time.time(), data)) |
| |
| def _listen(self, stdout, stderr): |
| poll = select.poll() |
| poll.register(stdout.channel) |
| while True: |
| ret = poll.poll() |
| for (fd, event) in ret: |
| if fd == stdout.channel.fileno(): |
| if event == select.POLLIN: |
| self._read(stdout) |
| else: |
| raise Exception("event on ssh connection") |
| |
| def _run(self): |
| try: |
| client = paramiko.SSHClient() |
| client.load_system_host_keys() |
| client.set_missing_host_key_policy(paramiko.WarningPolicy()) |
| client.connect(self.hostname, |
| username=self.username, |
| port=self.port, |
| key_filename=self.keyfile) |
| |
| stdin, stdout, stderr = client.exec_command("gerrit stream-events") |
| |
| self._listen(stdout, stderr) |
| |
| ret = stdout.channel.recv_exit_status() |
| self.log.debug("SSH exit status: %s" % ret) |
| |
| if ret: |
| raise Exception("Gerrit error executing stream-events") |
| except: |
| self.log.exception("Exception on ssh event stream:") |
| time.sleep(5) |
| |
| def run(self): |
| while True: |
| self._run() |
| |
| |
| class Gerrit(object): |
| log = logging.getLogger("gerrit.Gerrit") |
| |
| def __init__(self, hostname, username, port=29418, keyfile=None): |
| self.username = username |
| self.hostname = hostname |
| self.port = port |
| self.keyfile = keyfile |
| self.watcher_thread = None |
| self.event_queue = None |
| self.client = None |
| |
| def startWatching(self): |
| self.event_queue = Queue.Queue() |
| self.watcher_thread = GerritWatcher( |
| self, |
| self.username, |
| self.hostname, |
| self.port, |
| keyfile=self.keyfile) |
| self.watcher_thread.start() |
| |
| def addEvent(self, data): |
| return self.event_queue.put(data) |
| |
| def getEvent(self): |
| return self.event_queue.get() |
| |
| def eventDone(self): |
| self.event_queue.task_done() |
| |
| def review(self, project, change, message, action={}): |
| cmd = 'gerrit review --project %s' % project |
| if message: |
| cmd += ' --message "%s"' % message |
| for k, v in action.items(): |
| if v is True: |
| cmd += ' --%s' % k |
| else: |
| cmd += ' --label %s=%s' % (k, v) |
| cmd += ' %s' % change |
| out, err = self._ssh(cmd) |
| return err |
| |
| def query(self, query): |
| args = '--all-approvals --comments --commit-message' |
| args += ' --current-patch-set --dependencies --files' |
| args += ' --patch-sets --submit-records' |
| cmd = 'gerrit query --format json %s %s' % ( |
| args, query) |
| out, err = self._ssh(cmd) |
| if not out: |
| return False |
| lines = out.split('\n') |
| if not lines: |
| return False |
| data = json.loads(lines[0]) |
| if not data: |
| return False |
| self.log.debug("Received data from Gerrit query: \n%s" % |
| (pprint.pformat(data))) |
| return data |
| |
| def simpleQuery(self, query): |
| def _query_chunk(query): |
| args = '--commit-message --current-patch-set' |
| |
| cmd = 'gerrit query --format json %s %s' % ( |
| args, query) |
| out, err = self._ssh(cmd) |
| if not out: |
| return False |
| lines = out.split('\n') |
| if not lines: |
| return False |
| |
| # filter out blank lines |
| data = [json.loads(line) for line in lines |
| if line.startswith('{')] |
| |
| # check last entry for more changes |
| more_changes = None |
| if 'moreChanges' in data[-1]: |
| more_changes = data[-1]['moreChanges'] |
| |
| # we have to remove the statistics line |
| del data[-1] |
| |
| if not data: |
| return False, more_changes |
| self.log.debug("Received data from Gerrit query: \n%s" % |
| (pprint.pformat(data))) |
| return data, more_changes |
| |
| # gerrit returns 500 results by default, so implement paging |
| # for large projects like nova |
| alldata = [] |
| chunk, more_changes = _query_chunk(query) |
| while(chunk): |
| alldata.extend(chunk) |
| if more_changes is None: |
| # continue sortKey based (before Gerrit 2.9) |
| resume = "resume_sortkey:'%s'" % chunk[-1]["sortKey"] |
| elif more_changes: |
| # continue moreChanges based (since Gerrit 2.9) |
| resume = "-S %d" % len(alldata) |
| else: |
| # no more changes |
| break |
| |
| chunk, more_changes = _query_chunk("%s %s" % (query, resume)) |
| return alldata |
| |
| def _open(self): |
| client = paramiko.SSHClient() |
| client.load_system_host_keys() |
| client.set_missing_host_key_policy(paramiko.WarningPolicy()) |
| client.connect(self.hostname, |
| username=self.username, |
| port=self.port, |
| key_filename=self.keyfile) |
| self.client = client |
| |
| def _ssh(self, command, stdin_data=None): |
| if not self.client: |
| self._open() |
| |
| try: |
| self.log.debug("SSH command:\n%s" % command) |
| stdin, stdout, stderr = self.client.exec_command(command) |
| except: |
| self._open() |
| stdin, stdout, stderr = self.client.exec_command(command) |
| |
| if stdin_data: |
| stdin.write(stdin_data) |
| |
| out = stdout.read() |
| self.log.debug("SSH received stdout:\n%s" % out) |
| |
| ret = stdout.channel.recv_exit_status() |
| self.log.debug("SSH exit status: %s" % ret) |
| |
| err = stderr.read() |
| self.log.debug("SSH received stderr:\n%s" % err) |
| if ret: |
| raise Exception("Gerrit error executing %s" % command) |
| return (out, err) |