blob: 3a8644addfaf8386866bf08058e3bf35e586eb20 [file] [log] [blame]
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import time
import urllib2
from zuul.lib import gerrit
from zuul.model import TriggerEvent, Change
class GerritEventConnector(threading.Thread):
"""Move events from Gerrit to the scheduler."""
log = logging.getLogger("zuul.GerritEventConnector")
def __init__(self, gerrit, sched, trigger):
super(GerritEventConnector, self).__init__()
self.daemon = True
self.gerrit = gerrit
self.sched = sched
self.trigger = trigger
self._stopped = False
def stop(self):
self._stopped = True
self.gerrit.addEvent(None)
def _handleEvent(self):
data = self.gerrit.getEvent()
if self._stopped:
return
event = TriggerEvent()
event.type = data.get('type')
event.trigger_name = self.trigger.name
change = data.get('change')
if change:
event.project_name = change.get('project')
event.branch = change.get('branch')
event.change_number = change.get('number')
event.change_url = change.get('url')
patchset = data.get('patchSet')
if patchset:
event.patch_number = patchset.get('number')
event.refspec = patchset.get('ref')
event.approvals = data.get('approvals', [])
event.comment = data.get('comment')
refupdate = data.get('refUpdate')
if refupdate:
event.project_name = refupdate.get('project')
event.ref = refupdate.get('refName')
event.oldrev = refupdate.get('oldRev')
event.newrev = refupdate.get('newRev')
# Map the event types to a field name holding a Gerrit
# account attribute. See Gerrit stream-event documentation
# in cmd-stream-events.html
accountfield_from_type = {
'patchset-created': 'uploader',
'draft-published': 'uploader', # Gerrit 2.5/2.6
'change-abandoned': 'abandoner',
'change-restored': 'restorer',
'change-merged': 'submitter',
'merge-failed': 'submitter', # Gerrit 2.5/2.6
'comment-added': 'author',
'ref-updated': 'submitter',
'reviewer-added': 'reviewer', # Gerrit 2.5/2.6
}
try:
event.account = data.get(accountfield_from_type[event.type])
except KeyError:
self.log.error("Received unrecognized event type '%s' from Gerrit.\
Can not get account information." % event.type)
event.account = None
if event.change_number:
# Call getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
self.trigger.getChange(event.change_number,
event.patch_number,
refresh=True)
self.sched.addEvent(event)
self.gerrit.eventDone()
def run(self):
while True:
if self._stopped:
return
try:
self._handleEvent()
except:
self.log.exception("Exception moving Gerrit event:")
class Gerrit(object):
name = 'gerrit'
log = logging.getLogger("zuul.Gerrit")
replication_timeout = 60
replication_retry_interval = 5
def __init__(self, config, sched):
self._change_cache = {}
self.sched = sched
self.config = config
self.server = config.get('gerrit', 'server')
if config.has_option('gerrit', 'baseurl'):
self.baseurl = config.get('gerrit', 'baseurl')
else:
self.baseurl = 'https://%s' % self.server
user = config.get('gerrit', 'user')
if config.has_option('gerrit', 'sshkey'):
sshkey = config.get('gerrit', 'sshkey')
else:
sshkey = None
if config.has_option('gerrit', 'port'):
port = int(config.get('gerrit', 'port'))
else:
port = 29418
self.gerrit = gerrit.Gerrit(self.server, user, port, sshkey)
self.gerrit.startWatching()
self.gerrit_connector = GerritEventConnector(
self.gerrit, sched, self)
self.gerrit_connector.start()
def stop(self):
self.gerrit_connector.stop()
self.gerrit_connector.join()
def _getInfoRefs(self, project):
url = "%s/p/%s/info/refs?service=git-upload-pack" % (
self.baseurl, project)
try:
data = urllib2.urlopen(url).read()
except:
self.log.error("Cannot get references from %s" % url)
raise # keeps urllib2 error informations
ret = {}
read_headers = False
read_advertisement = False
if data[4] != '#':
raise Exception("Gerrit repository does not support "
"git-upload-pack")
i = 0
while i < len(data):
if len(data) - i < 4:
raise Exception("Invalid length in info/refs")
plen = int(data[i:i + 4], 16)
i += 4
# It's the length of the packet, including the 4 bytes of the
# length itself, unless it's null, in which case the length is
# not included.
if plen > 0:
plen -= 4
if len(data) - i < plen:
raise Exception("Invalid data in info/refs")
line = data[i:i + plen]
i += plen
if not read_headers:
if plen == 0:
read_headers = True
continue
if not read_advertisement:
read_advertisement = True
continue
if plen == 0:
# The terminating null
continue
line = line.strip()
revision, ref = line.split()
ret[ref] = revision
return ret
def getRefSha(self, project, ref):
refs = {}
try:
refs = self._getInfoRefs(project)
except:
self.log.exception("Exception looking for ref %s" %
ref)
sha = refs.get(ref, '')
return sha
def waitForRefSha(self, project, ref, old_sha=''):
# Wait for the ref to show up in the repo
start = time.time()
while time.time() - start < self.replication_timeout:
sha = self.getRefSha(project.name, ref)
if old_sha != sha:
return True
time.sleep(self.replication_retry_interval)
return False
def isMerged(self, change, head=None):
self.log.debug("Checking if change %s is merged" % change)
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
data = self.gerrit.query(change.number)
change._data = data
change.is_merged = self._isMerged(change)
if not head:
return change.is_merged
if not change.is_merged:
return False
ref = 'refs/heads/' + change.branch
self.log.debug("Waiting for %s to appear in git repo" % (change))
if self.waitForRefSha(change.project, ref, change._ref_sha):
self.log.debug("Change %s is in the git repo" %
(change))
return True
self.log.debug("Change %s did not appear in the git repo" %
(change))
return False
def _isMerged(self, change):
data = change._data
if not data:
return False
status = data.get('status')
if not status:
return False
self.log.debug("Change %s status: %s" % (change, status))
if status == 'MERGED' or status == 'SUBMITTED':
return True
def canMerge(self, change, allow_needs):
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
data = change._data
if not data:
return False
if not 'submitRecords' in data:
return False
try:
for sr in data['submitRecords']:
if sr['status'] == 'OK':
return True
elif sr['status'] == 'NOT_READY':
for label in sr['labels']:
if label['status'] == 'OK':
continue
elif label['status'] in ['NEED', 'REJECT']:
# It may be our own rejection, so we ignore
if label['label'].lower() not in allow_needs:
return False
continue
else:
# IMPOSSIBLE
return False
else:
# CLOSED, RULE_ERROR
return False
except:
self.log.exception("Exception determining whether change"
"%s can merge:" % change)
return False
return True
def maintainCache(self, relevant):
# This lets the user supply a list of change objects that are
# still in use. Anything in our cache that isn't in the supplied
# list should be same to remove from the cache.
remove = []
for key, change in self._change_cache.items():
if change not in relevant:
remove.append(key)
for key in remove:
del self._change_cache[key]
def postConfig(self):
pass
def getChange(self, number, patchset, refresh=False):
key = '%s,%s' % (number, patchset)
change = None
if key in self._change_cache:
change = self._change_cache.get(key)
if not refresh:
return change
if not change:
change = Change(None)
change.number = number
change.patchset = patchset
key = '%s,%s' % (change.number, change.patchset)
self._change_cache[key] = change
try:
self.updateChange(change)
except Exception:
del self._change_cache[key]
raise
return change
def updateChange(self, change):
self.log.info("Updating information for %s,%s" %
(change.number, change.patchset))
data = self.gerrit.query(change.number)
change._data = data
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
if 'project' not in data:
raise Exception("Change %s,%s not found" % (change.number,
change.patchset))
change.project = self.sched.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']
max_ps = 0
change.files = []
for ps in data['patchSets']:
if ps['number'] == change.patchset:
change.refspec = ps['ref']
for f in ps.get('files', []):
change.files.append(f['file'])
if int(ps['number']) > int(max_ps):
max_ps = ps['number']
if max_ps == change.patchset:
change.is_current_patchset = True
else:
change.is_current_patchset = False
change.is_merged = self._isMerged(change)
if change.is_merged:
# This change is merged, so we don't need to look any further
# for dependencies.
return change
change.needs_change = None
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self.getChange(dep_num, dep_ps)
if not dep.is_merged:
change.needs_change = dep
change.needed_by_changes = []
if 'neededBy' in data:
for needed in data['neededBy']:
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self.getChange(dep_num, dep_ps)
if not dep.is_merged and dep.is_current_patchset:
change.needed_by_changes.append(dep)
return change
def getGitUrl(self, project):
server = self.config.get('gerrit', 'server')
user = self.config.get('gerrit', 'user')
if self.config.has_option('gerrit', 'port'):
port = int(self.config.get('gerrit', 'port'))
else:
port = 29418
url = 'ssh://%s@%s:%s/%s' % (user, server, port, project.name)
return url
def getGitwebUrl(self, project, sha=None):
url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
if sha:
url += ';a=commitdiff;h=' + sha
return url