| # Copyright 2011 OpenStack, LLC. |
| # Copyright 2012 Hewlett-Packard Development Company, L.P. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| import threading |
| import select |
| import json |
| import time |
| from six.moves import queue as Queue |
| import paramiko |
| import logging |
| import pprint |
| import voluptuous as v |
| import urllib2 |
| |
| from zuul.connection import BaseConnection |
| from zuul.model import TriggerEvent |
| |
| |
| class GerritEventConnector(threading.Thread): |
| """Move events from Gerrit to the scheduler.""" |
| |
| log = logging.getLogger("zuul.GerritEventConnector") |
| delay = 5.0 |
| |
| def __init__(self, connection): |
| super(GerritEventConnector, self).__init__() |
| self.daemon = True |
| self.connection = connection |
| self._stopped = False |
| |
| def stop(self): |
| self._stopped = True |
| self.connection.addEvent(None) |
| |
| def _handleEvent(self): |
| ts, data = self.connection.getEvent() |
| if self._stopped: |
| return |
| # Gerrit can produce inconsistent data immediately after an |
| # event, So ensure that we do not deliver the event to Zuul |
| # until at least a certain amount of time has passed. Note |
| # that if we receive several events in succession, we will |
| # only need to delay for the first event. In essence, Zuul |
| # should always be a constant number of seconds behind Gerrit. |
| now = time.time() |
| time.sleep(max((ts + self.delay) - now, 0.0)) |
| event = TriggerEvent() |
| event.type = data.get('type') |
| event.trigger_name = 'gerrit' |
| change = data.get('change') |
| if change: |
| event.project_name = change.get('project') |
| event.branch = change.get('branch') |
| event.change_number = change.get('number') |
| event.change_url = change.get('url') |
| patchset = data.get('patchSet') |
| if patchset: |
| event.patch_number = patchset.get('number') |
| event.refspec = patchset.get('ref') |
| event.approvals = data.get('approvals', []) |
| event.comment = data.get('comment') |
| refupdate = data.get('refUpdate') |
| if refupdate: |
| event.project_name = refupdate.get('project') |
| event.ref = refupdate.get('refName') |
| event.oldrev = refupdate.get('oldRev') |
| event.newrev = refupdate.get('newRev') |
| # Map the event types to a field name holding a Gerrit |
| # account attribute. See Gerrit stream-event documentation |
| # in cmd-stream-events.html |
| accountfield_from_type = { |
| 'patchset-created': 'uploader', |
| 'draft-published': 'uploader', # Gerrit 2.5/2.6 |
| 'change-abandoned': 'abandoner', |
| 'change-restored': 'restorer', |
| 'change-merged': 'submitter', |
| 'merge-failed': 'submitter', # Gerrit 2.5/2.6 |
| 'comment-added': 'author', |
| 'ref-updated': 'submitter', |
| 'reviewer-added': 'reviewer', # Gerrit 2.5/2.6 |
| } |
| try: |
| event.account = data.get(accountfield_from_type[event.type]) |
| except KeyError: |
| self.log.warning("Received unrecognized event type '%s' from Gerrit.\ |
| Can not get account information." % event.type) |
| event.account = None |
| |
| if (event.change_number and |
| self.connection.sched.getProject(event.project_name)): |
| # Call _getChange for the side effect of updating the |
| # cache. Note that this modifies Change objects outside |
| # the main thread. |
| # NOTE(jhesketh): Ideally we'd just remove the change from the |
| # cache to denote that it needs updating. However the change |
| # object is already used by Item's and hence BuildSet's etc. and |
| # we need to update those objects by reference so that they have |
| # the correct/new information and also avoid hitting gerrit |
| # multiple times. |
| if self.connection.attached_to['source']: |
| self.connection.attached_to['source'][0]._getChange( |
| event.change_number, event.patch_number, refresh=True) |
| # We only need to do this once since the connection maintains |
| # the cache (which is shared between all the sources) |
| # NOTE(jhesketh): We may couple sources and connections again |
| # at which point this becomes more sensible. |
| self.connection.sched.addEvent(event) |
| |
| def run(self): |
| while True: |
| if self._stopped: |
| return |
| try: |
| self._handleEvent() |
| except: |
| self.log.exception("Exception moving Gerrit event:") |
| finally: |
| self.connection.eventDone() |
| |
| |
| class GerritWatcher(threading.Thread): |
| log = logging.getLogger("gerrit.GerritWatcher") |
| poll_timeout = 500 |
| |
| def __init__(self, gerrit_connection, username, hostname, port=29418, |
| keyfile=None): |
| threading.Thread.__init__(self) |
| self.username = username |
| self.keyfile = keyfile |
| self.hostname = hostname |
| self.port = port |
| self.gerrit_connection = gerrit_connection |
| self._stopped = False |
| |
| def _read(self, fd): |
| l = fd.readline() |
| data = json.loads(l) |
| self.log.debug("Received data from Gerrit event stream: \n%s" % |
| pprint.pformat(data)) |
| self.gerrit_connection.addEvent(data) |
| |
| def _listen(self, stdout, stderr): |
| poll = select.poll() |
| poll.register(stdout.channel) |
| while not self._stopped: |
| ret = poll.poll(self.poll_timeout) |
| for (fd, event) in ret: |
| if fd == stdout.channel.fileno(): |
| if event == select.POLLIN: |
| self._read(stdout) |
| else: |
| raise Exception("event on ssh connection") |
| |
| def _run(self): |
| try: |
| client = paramiko.SSHClient() |
| client.load_system_host_keys() |
| client.set_missing_host_key_policy(paramiko.WarningPolicy()) |
| client.connect(self.hostname, |
| username=self.username, |
| port=self.port, |
| key_filename=self.keyfile) |
| |
| stdin, stdout, stderr = client.exec_command("gerrit stream-events") |
| |
| self._listen(stdout, stderr) |
| |
| if not stdout.channel.exit_status_ready(): |
| # The stream-event is still running but we are done polling |
| # on stdout most likely due to being asked to stop. |
| # Try to stop the stream-events command sending Ctrl-C |
| stdin.write("\x03") |
| time.sleep(.2) |
| if not stdout.channel.exit_status_ready(): |
| # we're still not ready to exit, lets force the channel |
| # closed now. |
| stdout.channel.close() |
| ret = stdout.channel.recv_exit_status() |
| self.log.debug("SSH exit status: %s" % ret) |
| client.close() |
| |
| if ret and ret not in [-1, 130]: |
| raise Exception("Gerrit error executing stream-events") |
| except: |
| self.log.exception("Exception on ssh event stream:") |
| time.sleep(5) |
| |
| def run(self): |
| while not self._stopped: |
| self._run() |
| |
| def stop(self): |
| self.log.debug("Stopping watcher") |
| self._stopped = True |
| |
| |
| class GerritConnection(BaseConnection): |
| driver_name = 'gerrit' |
| log = logging.getLogger("connection.gerrit") |
| |
| def __init__(self, connection_name, connection_config): |
| super(GerritConnection, self).__init__(connection_name, |
| connection_config) |
| if 'server' not in self.connection_config: |
| raise Exception('server is required for gerrit connections in ' |
| '%s' % self.connection_name) |
| if 'user' not in self.connection_config: |
| raise Exception('user is required for gerrit connections in ' |
| '%s' % self.connection_name) |
| |
| self.user = self.connection_config.get('user') |
| self.server = self.connection_config.get('server') |
| self.port = int(self.connection_config.get('port', 29418)) |
| self.keyfile = self.connection_config.get('sshkey', None) |
| self.watcher_thread = None |
| self.event_queue = None |
| self.client = None |
| |
| self.baseurl = self.connection_config.get('baseurl', |
| 'https://%s' % self.server) |
| |
| self._change_cache = {} |
| self.gerrit_event_connector = None |
| |
| def getCachedChange(self, key): |
| if key in self._change_cache: |
| return self._change_cache.get(key) |
| return None |
| |
| def updateChangeCache(self, key, value): |
| self._change_cache[key] = value |
| |
| def deleteCachedChange(self, key): |
| if key in self._change_cache: |
| del self._change_cache[key] |
| |
| def maintainCache(self, relevant): |
| # This lets the user supply a list of change objects that are |
| # still in use. Anything in our cache that isn't in the supplied |
| # list should be safe to remove from the cache. |
| remove = [] |
| for key, change in self._change_cache.items(): |
| if change not in relevant: |
| remove.append(key) |
| for key in remove: |
| del self._change_cache[key] |
| |
| def addEvent(self, data): |
| return self.event_queue.put((time.time(), data)) |
| |
| def getEvent(self): |
| return self.event_queue.get() |
| |
| def eventDone(self): |
| self.event_queue.task_done() |
| |
| def review(self, project, change, message, action={}): |
| cmd = 'gerrit review --project %s' % project |
| if message: |
| cmd += ' --message "%s"' % message |
| for key, val in action.items(): |
| if val is True: |
| cmd += ' --%s' % key |
| else: |
| cmd += ' --%s %s' % (key, val) |
| cmd += ' %s' % change |
| out, err = self._ssh(cmd) |
| return err |
| |
| def query(self, query): |
| args = '--all-approvals --comments --commit-message' |
| args += ' --current-patch-set --dependencies --files' |
| args += ' --patch-sets --submit-records' |
| cmd = 'gerrit query --format json %s %s' % ( |
| args, query) |
| out, err = self._ssh(cmd) |
| if not out: |
| return False |
| lines = out.split('\n') |
| if not lines: |
| return False |
| data = json.loads(lines[0]) |
| if not data: |
| return False |
| self.log.debug("Received data from Gerrit query: \n%s" % |
| (pprint.pformat(data))) |
| return data |
| |
| def simpleQuery(self, query): |
| def _query_chunk(query): |
| args = '--commit-message --current-patch-set' |
| |
| cmd = 'gerrit query --format json %s %s' % ( |
| args, query) |
| out, err = self._ssh(cmd) |
| if not out: |
| return False |
| lines = out.split('\n') |
| if not lines: |
| return False |
| |
| # filter out blank lines |
| data = [json.loads(line) for line in lines |
| if line.startswith('{')] |
| |
| # check last entry for more changes |
| more_changes = None |
| if 'moreChanges' in data[-1]: |
| more_changes = data[-1]['moreChanges'] |
| |
| # we have to remove the statistics line |
| del data[-1] |
| |
| if not data: |
| return False, more_changes |
| self.log.debug("Received data from Gerrit query: \n%s" % |
| (pprint.pformat(data))) |
| return data, more_changes |
| |
| # gerrit returns 500 results by default, so implement paging |
| # for large projects like nova |
| alldata = [] |
| chunk, more_changes = _query_chunk(query) |
| while(chunk): |
| alldata.extend(chunk) |
| if more_changes is None: |
| # continue sortKey based (before Gerrit 2.9) |
| resume = "resume_sortkey:'%s'" % chunk[-1]["sortKey"] |
| elif more_changes: |
| # continue moreChanges based (since Gerrit 2.9) |
| resume = "-S %d" % len(alldata) |
| else: |
| # no more changes |
| break |
| |
| chunk, more_changes = _query_chunk("%s %s" % (query, resume)) |
| return alldata |
| |
| def _open(self): |
| client = paramiko.SSHClient() |
| client.load_system_host_keys() |
| client.set_missing_host_key_policy(paramiko.WarningPolicy()) |
| client.connect(self.server, |
| username=self.user, |
| port=self.port, |
| key_filename=self.keyfile) |
| self.client = client |
| |
| def _ssh(self, command, stdin_data=None): |
| if not self.client: |
| self._open() |
| |
| try: |
| self.log.debug("SSH command:\n%s" % command) |
| stdin, stdout, stderr = self.client.exec_command(command) |
| except: |
| self._open() |
| stdin, stdout, stderr = self.client.exec_command(command) |
| |
| if stdin_data: |
| stdin.write(stdin_data) |
| |
| out = stdout.read() |
| self.log.debug("SSH received stdout:\n%s" % out) |
| |
| ret = stdout.channel.recv_exit_status() |
| self.log.debug("SSH exit status: %s" % ret) |
| |
| err = stderr.read() |
| self.log.debug("SSH received stderr:\n%s" % err) |
| if ret: |
| raise Exception("Gerrit error executing %s" % command) |
| return (out, err) |
| |
| def getInfoRefs(self, project): |
| url = "%s/p/%s/info/refs?service=git-upload-pack" % ( |
| self.baseurl, project) |
| try: |
| data = urllib2.urlopen(url).read() |
| except: |
| self.log.error("Cannot get references from %s" % url) |
| raise # keeps urllib2 error informations |
| ret = {} |
| read_headers = False |
| read_advertisement = False |
| if data[4] != '#': |
| raise Exception("Gerrit repository does not support " |
| "git-upload-pack") |
| i = 0 |
| while i < len(data): |
| if len(data) - i < 4: |
| raise Exception("Invalid length in info/refs") |
| plen = int(data[i:i + 4], 16) |
| i += 4 |
| # It's the length of the packet, including the 4 bytes of the |
| # length itself, unless it's null, in which case the length is |
| # not included. |
| if plen > 0: |
| plen -= 4 |
| if len(data) - i < plen: |
| raise Exception("Invalid data in info/refs") |
| line = data[i:i + plen] |
| i += plen |
| if not read_headers: |
| if plen == 0: |
| read_headers = True |
| continue |
| if not read_advertisement: |
| read_advertisement = True |
| continue |
| if plen == 0: |
| # The terminating null |
| continue |
| line = line.strip() |
| revision, ref = line.split() |
| ret[ref] = revision |
| return ret |
| |
| def getGitUrl(self, project): |
| url = 'ssh://%s@%s:%s/%s' % (self.user, self.server, self.port, |
| project.name) |
| return url |
| |
| def getGitwebUrl(self, project, sha=None): |
| url = '%s/gitweb?p=%s.git' % (self.baseurl, project) |
| if sha: |
| url += ';a=commitdiff;h=' + sha |
| return url |
| |
| def onLoad(self): |
| self.log.debug("Starting Gerrit Conncetion/Watchers") |
| self._start_watcher_thread() |
| self._start_event_connector() |
| |
| def onStop(self): |
| self.log.debug("Stopping Gerrit Conncetion/Watchers") |
| self._stop_watcher_thread() |
| self._stop_event_connector() |
| |
| def _stop_watcher_thread(self): |
| if self.watcher_thread: |
| self.watcher_thread.stop() |
| self.watcher_thread.join() |
| |
| def _start_watcher_thread(self): |
| self.event_queue = Queue.Queue() |
| self.watcher_thread = GerritWatcher( |
| self, |
| self.user, |
| self.server, |
| self.port, |
| keyfile=self.keyfile) |
| self.watcher_thread.start() |
| |
| def _stop_event_connector(self): |
| if self.gerrit_event_connector: |
| self.gerrit_event_connector.stop() |
| self.gerrit_event_connector.join() |
| |
| def _start_event_connector(self): |
| self.gerrit_event_connector = GerritEventConnector(self) |
| self.gerrit_event_connector.start() |
| |
| |
| def getSchema(): |
| gerrit_connection = v.Any(str, v.Schema({}, extra=True)) |
| return gerrit_connection |