Refactor sources out of triggers
This is to further differentiate between sources and triggers.
Eventually allowing for multiple triggers per pipeline.
Still to come is separating connections from everything.
Change-Id: I1d680dbed5f650165643842af450f16b32ec5ed9
diff --git a/tests/base.py b/tests/base.py
index 57b30b5..9316b43 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -52,6 +52,7 @@
import zuul.merger.server
import zuul.reporter.gerrit
import zuul.reporter.smtp
+import zuul.source.gerrit
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger
@@ -382,12 +383,12 @@
log = logging.getLogger("zuul.test.FakeGerrit")
def __init__(self, hostname, username, port=29418, keyfile=None,
- changes_dbs={}):
+ changes_dbs={}, queues_dbs={}):
self.hostname = hostname
self.username = username
self.port = port
self.keyfile = keyfile
- self.event_queue = Queue.Queue()
+ self.event_queue = queues_dbs.get(hostname, {})
self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
self.change_number = 0
self.changes = changes_dbs.get(hostname, {})
@@ -499,13 +500,14 @@
return ret
-class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
+class FakeGerritSource(zuul.source.gerrit.Gerrit):
name = 'gerrit'
def __init__(self, upstream_root, *args):
- super(FakeGerritTrigger, self).__init__(*args)
+ super(FakeGerritSource, self).__init__(*args)
self.upstream_root = upstream_root
- self.gerrit_connector.delay = 0.0
+ self.replication_timeout = 1.5
+ self.replication_retry_interval = 0.5
def getGitUrl(self, project):
return os.path.join(self.upstream_root, project.name)
@@ -958,11 +960,6 @@
old_urlopen = urllib2.urlopen
urllib2.urlopen = URLOpenerFactory
- self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
- self.swift)
- self.merge_client = zuul.merger.client.MergeClient(
- self.config, self.sched)
-
self.smtp_messages = []
def FakeSMTPFactory(*args, **kw):
@@ -971,12 +968,16 @@
# Set a changes database so multiple FakeGerrit's can report back to
# a virtual canonical database given by the configured hostname
+ self.gerrit_queues_dbs = {
+ self.config.get('gerrit', 'server'): Queue.Queue()
+ }
self.gerrit_changes_dbs = {
self.config.get('gerrit', 'server'): {}
}
def FakeGerritFactory(*args, **kw):
kw['changes_dbs'] = self.gerrit_changes_dbs
+ kw['queues_dbs'] = self.gerrit_queues_dbs
return FakeGerrit(*args, **kw)
self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
@@ -984,32 +985,23 @@
self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
- self.gerrit = FakeGerritTrigger(
- self.upstream_root, self.config, self.sched)
- self.gerrit.replication_timeout = 1.5
- self.gerrit.replication_retry_interval = 0.5
- self.fake_gerrit = self.gerrit.gerrit
- self.fake_gerrit.upstream_root = self.upstream_root
-
- self.webapp = zuul.webapp.WebApp(self.sched, port=0)
- self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
+ self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
+ self.swift)
+ self.merge_client = zuul.merger.client.MergeClient(
+ self.config, self.sched)
self.sched.setLauncher(self.launcher)
self.sched.setMerger(self.merge_client)
- self.sched.registerTrigger(self.gerrit)
- self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
- self.sched.registerTrigger(self.timer)
- self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
- self.sched)
- self.sched.registerTrigger(self.zuultrigger)
- self.sched.registerReporter(
- zuul.reporter.gerrit.Reporter(self.gerrit))
- self.smtp_reporter = zuul.reporter.smtp.Reporter(
- self.config.get('smtp', 'default_from'),
- self.config.get('smtp', 'default_to'),
- self.config.get('smtp', 'server'))
- self.sched.registerReporter(self.smtp_reporter)
+ self.register_sources()
+ self.fake_gerrit = self.gerrit_source.gerrit
+ self.fake_gerrit.upstream_root = self.upstream_root
+
+ self.register_triggers()
+ self.register_reporters()
+
+ self.webapp = zuul.webapp.WebApp(self.sched, port=0)
+ self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.start()
self.sched.reconfigure(self.config)
@@ -1024,6 +1016,38 @@
self.addCleanup(self.assertFinalState)
self.addCleanup(self.shutdown)
+ def register_sources(self):
+ # Register the available sources
+ self.gerrit_source = FakeGerritSource(
+ self.upstream_root, self.config, self.sched)
+ self.gerrit_source.replication_timeout = 1.5
+ self.gerrit_source.replication_retry_interval = 0.5
+
+ self.sched.registerSource(self.gerrit_source)
+
+ def register_triggers(self):
+ # Register the available triggers
+ self.gerrit_trigger = zuul.trigger.gerrit.Gerrit(
+ self.fake_gerrit, self.config, self.sched, self.gerrit_source)
+ self.gerrit_trigger.gerrit_connector.delay = 0.0
+
+ self.sched.registerTrigger(self.gerrit_trigger)
+ self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
+ self.sched.registerTrigger(self.timer)
+ self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
+ self.sched)
+ self.sched.registerTrigger(self.zuultrigger)
+
+ def register_reporters(self):
+ # Register the available reporters
+ self.sched.registerReporter(
+ zuul.reporter.gerrit.Reporter(self.fake_gerrit))
+ self.smtp_reporter = zuul.reporter.smtp.Reporter(
+ self.config.get('smtp', 'default_from'),
+ self.config.get('smtp', 'default_to'),
+ self.config.get('smtp', 'server'))
+ self.sched.registerReporter(self.smtp_reporter)
+
def setup_config(self):
"""Per test config object. Override to set different config."""
self.config = ConfigParser.ConfigParser()
@@ -1050,7 +1074,7 @@
self.merge_server.join()
self.merge_client.stop()
self.worker.shutdown()
- self.gerrit.stop()
+ self.gerrit_trigger.stop()
self.timer.stop()
self.sched.stop()
self.sched.join()
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index a257440..d99a65c 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -729,8 +729,8 @@
self.assertEqual(self.history[6].changes,
'1,1 2,1 3,1 4,1 5,1 6,1 7,1')
- def test_trigger_cache(self):
- "Test that the trigger cache operates correctly"
+ def test_source_cache(self):
+ "Test that the source cache operates correctly"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -762,9 +762,9 @@
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.waitUntilSettled()
- self.log.debug("len %s" % self.gerrit._change_cache.keys())
+ self.log.debug("len %s" % self.gerrit_source._change_cache.keys())
# there should still be changes in the cache
- self.assertNotEqual(len(self.gerrit._change_cache.keys()), 0)
+ self.assertNotEqual(len(self.gerrit_source._change_cache.keys()), 0)
self.worker.hold_jobs_in_build = False
self.worker.release()
@@ -1469,7 +1469,7 @@
"Test that the merger works with large changes after a repack"
# https://bugs.launchpad.net/zuul/+bug/1078946
# This test assumes the repo is already cloned; make sure it is
- url = self.sched.triggers['gerrit'].getGitUrl(
+ url = self.sched.sources['gerrit'].getGitUrl(
self.sched.layout.projects['org/project1'])
self.merge_server.merger.addProject('org/project1', url)
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
@@ -2165,6 +2165,7 @@
def test_test_config(self):
"Test that we can test the config"
sched = zuul.scheduler.Scheduler()
+ sched.registerSource(None, 'gerrit')
sched.registerTrigger(None, 'gerrit')
sched.registerTrigger(None, 'timer')
sched.registerTrigger(None, 'zuul')
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 2d99a1f..75226dc 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -144,17 +144,58 @@
if self.gear_server_pid:
os.kill(self.gear_server_pid, signal.SIGKILL)
+ def register_sources(self):
+ # Register the available sources
+ # See comment at top of file about zuul imports
+ import zuul.source.gerrit
+ self.gerrit_source = zuul.source.gerrit.Gerrit(self.config, self.sched)
+
+ self.sched.registerSource(self.gerrit_source)
+
+ def register_triggers(self):
+ # Register the available triggers
+ # See comment at top of file about zuul imports
+ import zuul.trigger.gerrit
+ import zuul.trigger.timer
+ import zuul.trigger.zuultrigger
+ self.gerrit_trigger = zuul.trigger.gerrit.Gerrit(self.gerrit,
+ self.config,
+ self.sched,
+ self.gerrit_source)
+ timer = zuul.trigger.timer.Timer(self.config, self.sched)
+ zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
+ self.sched)
+
+ self.sched.registerTrigger(self.gerrit_trigger)
+ self.sched.registerTrigger(timer)
+ self.sched.registerTrigger(zuultrigger)
+
+ def register_reporters(self):
+ # Register the available reporters
+ # See comment at top of file about zuul imports
+ import zuul.reporter.gerrit
+ import zuul.reporter.smtp
+ gerrit_reporter = zuul.reporter.gerrit.Reporter(self.gerrit)
+ smtp_reporter = zuul.reporter.smtp.Reporter(
+ self.config.get('smtp', 'default_from')
+ if self.config.has_option('smtp', 'default_from') else 'zuul',
+ self.config.get('smtp', 'default_to')
+ if self.config.has_option('smtp', 'default_to') else 'zuul',
+ self.config.get('smtp', 'server')
+ if self.config.has_option('smtp', 'server') else 'localhost',
+ self.config.get('smtp', 'port')
+ if self.config.has_option('smtp', 'port') else 25
+ )
+
+ self.sched.registerReporter(gerrit_reporter)
+ self.sched.registerReporter(smtp_reporter)
+
def main(self):
# See comment at top of file about zuul imports
import zuul.scheduler
import zuul.launcher.gearman
import zuul.merger.client
import zuul.lib.swift
- import zuul.reporter.gerrit
- import zuul.reporter.smtp
- import zuul.trigger.gerrit
- import zuul.trigger.timer
- import zuul.trigger.zuultrigger
import zuul.webapp
import zuul.rpclistener
@@ -172,35 +213,22 @@
gearman = zuul.launcher.gearman.Gearman(self.config, self.sched,
self.swift)
merger = zuul.merger.client.MergeClient(self.config, self.sched)
- gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
- timer = zuul.trigger.timer.Timer(self.config, self.sched)
- zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
- self.sched)
+
if self.config.has_option('zuul', 'status_expiry'):
cache_expiry = self.config.getint('zuul', 'status_expiry')
else:
cache_expiry = 1
webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry)
rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
- gerrit_reporter = zuul.reporter.gerrit.Reporter(gerrit)
- smtp_reporter = zuul.reporter.smtp.Reporter(
- self.config.get('smtp', 'default_from')
- if self.config.has_option('smtp', 'default_from') else 'zuul',
- self.config.get('smtp', 'default_to')
- if self.config.has_option('smtp', 'default_to') else 'zuul',
- self.config.get('smtp', 'server')
- if self.config.has_option('smtp', 'server') else 'localhost',
- self.config.get('smtp', 'port')
- if self.config.has_option('smtp', 'port') else 25
- )
self.sched.setLauncher(gearman)
self.sched.setMerger(merger)
- self.sched.registerTrigger(gerrit)
- self.sched.registerTrigger(timer)
- self.sched.registerTrigger(zuultrigger)
- self.sched.registerReporter(gerrit_reporter)
- self.sched.registerReporter(smtp_reporter)
+ self.register_sources()
+ # TODO(jhesketh): Use connections instead of grabbing the gerrit lib
+ # from the source
+ self.gerrit = self.gerrit_source.gerrit
+ self.register_triggers()
+ self.register_reporters()
self.log.info('Starting scheduler')
self.sched.start()
diff --git a/zuul/model.py b/zuul/model.py
index 9106887..a646413 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -301,11 +301,11 @@
self.reporter = reporter
self.params = params
- def report(self, change, message):
+ def report(self, source, change, message):
"""Sends the built message off to the configured reporter.
Takes the change and message and adds the configured parameters.
"""
- return self.reporter.report(change, message, self.params)
+ return self.reporter.report(source, change, message, self.params)
def getSubmitAllowNeeds(self):
"""Gets the submit allow needs from the reporter based off the
diff --git a/zuul/reporter/gerrit.py b/zuul/reporter/gerrit.py
index 7c4774b..a2debfd 100644
--- a/zuul/reporter/gerrit.py
+++ b/zuul/reporter/gerrit.py
@@ -21,20 +21,20 @@
name = 'gerrit'
log = logging.getLogger("zuul.reporter.gerrit.Reporter")
- def __init__(self, trigger):
+ def __init__(self, gerrit):
"""Set up the reporter."""
- self.gerrit = trigger.gerrit
- self.trigger = trigger
+ # TODO: make default_gerrit come from a connection
+ self.default_gerrit = gerrit
- def report(self, change, message, params):
+ def report(self, source, change, message, params):
"""Send a message to gerrit."""
self.log.debug("Report change %s, params %s, message: %s" %
(change, params, message))
changeid = '%s,%s' % (change.number, change.patchset)
- change._ref_sha = self.trigger.getRefSha(change.project.name,
- 'refs/heads/' + change.branch)
- return self.gerrit.review(change.project.name, changeid, message,
- params)
+ change._ref_sha = source.getRefSha(change.project.name,
+ 'refs/heads/' + change.branch)
+ return self.default_gerrit.review(
+ change.project.name, changeid, message, params)
def getSubmitAllowNeeds(self, params):
"""Get a list of code review labels that are allowed to be
diff --git a/zuul/reporter/smtp.py b/zuul/reporter/smtp.py
index b214019..0992d66 100644
--- a/zuul/reporter/smtp.py
+++ b/zuul/reporter/smtp.py
@@ -35,7 +35,7 @@
self.smtp_default_from = smtp_default_from
self.smtp_default_to = smtp_default_to
- def report(self, change, message, params):
+ def report(self, source, change, message, params):
"""Send the compiled report message via smtp."""
self.log.debug("Report change %s, params %s, message: %s" %
(change, params, message))
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2355801..5567f40 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -189,6 +189,7 @@
self._stopped = False
self.launcher = None
self.merger = None
+ self.sources = dict()
self.triggers = dict()
self.reporters = dict()
self.config = None
@@ -265,8 +266,8 @@
pipeline = Pipeline(conf_pipeline['name'])
pipeline.description = conf_pipeline.get('description')
# TODO(jeblair): remove backwards compatibility:
- pipeline.source = self.triggers[conf_pipeline.get('source',
- 'gerrit')]
+ pipeline.source = self.sources[conf_pipeline.get('source',
+ 'gerrit')]
precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
pipeline.precedence = precedence
pipeline.failure_message = conf_pipeline.get('failure-message',
@@ -513,6 +514,11 @@
def setMerger(self, merger):
self.merger = merger
+ def registerSource(self, source, name=None):
+ if name is None:
+ name = source.name
+ self.sources[name] = source
+
def registerTrigger(self, trigger, name=None):
if name is None:
name = trigger.name
@@ -756,6 +762,8 @@
self.maintainTriggerCache()
for trigger in self.triggers.values():
trigger.postConfig()
+ for source in self.sources.values():
+ source.postConfig()
if statsd:
try:
for pipeline in self.layout.pipelines.values():
@@ -887,8 +895,8 @@
relevant.update(item.change.getRelatedChanges())
self.log.debug("End maintain trigger cache for: %s" % pipeline)
self.log.debug("Trigger cache size: %s" % len(relevant))
- for trigger in self.triggers.values():
- trigger.maintainCache(relevant)
+ for source in self.sources.values():
+ source.maintainCache(relevant)
def process_event_queue(self):
self.log.debug("Fetching trigger event")
@@ -1131,14 +1139,14 @@
if self.sched.config.has_option('zuul', 'status_url'):
msg += "\n" + self.sched.config.get('zuul', 'status_url')
ret = self.sendReport(self.pipeline.start_actions,
- change, msg)
+ self.pipeline.source, change, msg)
if ret:
self.log.error("Reporting change start %s received: %s" %
(change, ret))
except:
self.log.exception("Exception while reporting start:")
- def sendReport(self, action_reporters, change, message):
+ def sendReport(self, action_reporters, source, change, message):
"""Sends the built message off to configured reporters.
Takes the action_reporters, change, message and extra options and
@@ -1147,7 +1155,7 @@
report_errors = []
if len(action_reporters) > 0:
for action_reporter in action_reporters:
- ret = action_reporter.report(change, message)
+ ret = action_reporter.report(source, change, message)
if ret:
report_errors.append(ret)
if len(report_errors) == 0:
@@ -1604,7 +1612,8 @@
try:
self.log.info("Reporting change %s, actions: %s" %
(item.change, actions))
- ret = self.sendReport(actions, item.change, report)
+ ret = self.sendReport(actions, self.pipeline.source,
+ item.change, report)
if ret:
self.log.error("Reporting change %s received: %s" %
(item.change, ret))
diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/source/__init__.py
diff --git a/zuul/source/gerrit.py b/zuul/source/gerrit.py
new file mode 100644
index 0000000..44b8609
--- /dev/null
+++ b/zuul/source/gerrit.py
@@ -0,0 +1,408 @@
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import re
+import time
+import urllib2
+from zuul.lib import gerrit
+from zuul.model import Change, Ref, NullChange
+
+
+class Gerrit(object):
+ name = 'gerrit'
+ log = logging.getLogger("zuul.source.Gerrit")
+ replication_timeout = 300
+ replication_retry_interval = 5
+
+ depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
+ re.MULTILINE | re.IGNORECASE)
+
+ def __init__(self, config, sched):
+ self._change_cache = {}
+ self.sched = sched
+ self.config = config
+ self.server = config.get('gerrit', 'server')
+ if config.has_option('gerrit', 'baseurl'):
+ self.baseurl = config.get('gerrit', 'baseurl')
+ else:
+ self.baseurl = 'https://%s' % self.server
+ user = config.get('gerrit', 'user')
+ if config.has_option('gerrit', 'sshkey'):
+ sshkey = config.get('gerrit', 'sshkey')
+ else:
+ sshkey = None
+ if config.has_option('gerrit', 'port'):
+ port = int(config.get('gerrit', 'port'))
+ else:
+ port = 29418
+ self.gerrit = gerrit.Gerrit(self.server, user, port, sshkey)
+ self.gerrit.startWatching()
+
+ def _getInfoRefs(self, project):
+ url = "%s/p/%s/info/refs?service=git-upload-pack" % (
+ self.baseurl, project)
+ try:
+ data = urllib2.urlopen(url).read()
+ except:
+ self.log.error("Cannot get references from %s" % url)
+ raise # keeps urllib2 error informations
+ ret = {}
+ read_headers = False
+ read_advertisement = False
+ if data[4] != '#':
+ raise Exception("Gerrit repository does not support "
+ "git-upload-pack")
+ i = 0
+ while i < len(data):
+ if len(data) - i < 4:
+ raise Exception("Invalid length in info/refs")
+ plen = int(data[i:i + 4], 16)
+ i += 4
+ # It's the length of the packet, including the 4 bytes of the
+ # length itself, unless it's null, in which case the length is
+ # not included.
+ if plen > 0:
+ plen -= 4
+ if len(data) - i < plen:
+ raise Exception("Invalid data in info/refs")
+ line = data[i:i + plen]
+ i += plen
+ if not read_headers:
+ if plen == 0:
+ read_headers = True
+ continue
+ if not read_advertisement:
+ read_advertisement = True
+ continue
+ if plen == 0:
+ # The terminating null
+ continue
+ line = line.strip()
+ revision, ref = line.split()
+ ret[ref] = revision
+ return ret
+
+ def getRefSha(self, project, ref):
+ refs = {}
+ try:
+ refs = self._getInfoRefs(project)
+ except:
+ self.log.exception("Exception looking for ref %s" %
+ ref)
+ sha = refs.get(ref, '')
+ return sha
+
+ def waitForRefSha(self, project, ref, old_sha=''):
+ # Wait for the ref to show up in the repo
+ start = time.time()
+ while time.time() - start < self.replication_timeout:
+ sha = self.getRefSha(project.name, ref)
+ if old_sha != sha:
+ return True
+ time.sleep(self.replication_retry_interval)
+ return False
+
+ def isMerged(self, change, head=None):
+ self.log.debug("Checking if change %s is merged" % change)
+ if not change.number:
+ self.log.debug("Change has no number; considering it merged")
+ # Good question. It's probably ref-updated, which, ah,
+ # means it's merged.
+ return True
+
+ data = self.gerrit.query(change.number)
+ change._data = data
+ change.is_merged = self._isMerged(change)
+ if not head:
+ return change.is_merged
+ if not change.is_merged:
+ return False
+
+ ref = 'refs/heads/' + change.branch
+ self.log.debug("Waiting for %s to appear in git repo" % (change))
+ if self.waitForRefSha(change.project, ref, change._ref_sha):
+ self.log.debug("Change %s is in the git repo" %
+ (change))
+ return True
+ self.log.debug("Change %s did not appear in the git repo" %
+ (change))
+ return False
+
+ def _isMerged(self, change):
+ data = change._data
+ if not data:
+ return False
+ status = data.get('status')
+ if not status:
+ return False
+ self.log.debug("Change %s status: %s" % (change, status))
+ if status == 'MERGED':
+ return True
+ return False
+
+ def canMerge(self, change, allow_needs):
+ if not change.number:
+ self.log.debug("Change has no number; considering it merged")
+ # Good question. It's probably ref-updated, which, ah,
+ # means it's merged.
+ return True
+ data = change._data
+ if not data:
+ return False
+ if 'submitRecords' not in data:
+ return False
+ try:
+ for sr in data['submitRecords']:
+ if sr['status'] == 'OK':
+ return True
+ elif sr['status'] == 'NOT_READY':
+ for label in sr['labels']:
+ if label['status'] in ['OK', 'MAY']:
+ continue
+ elif label['status'] in ['NEED', 'REJECT']:
+ # It may be our own rejection, so we ignore
+ if label['label'].lower() not in allow_needs:
+ return False
+ continue
+ else:
+ # IMPOSSIBLE
+ return False
+ else:
+ # CLOSED, RULE_ERROR
+ return False
+ except:
+ self.log.exception("Exception determining whether change"
+ "%s can merge:" % change)
+ return False
+ return True
+
+ def maintainCache(self, relevant):
+ # This lets the user supply a list of change objects that are
+ # still in use. Anything in our cache that isn't in the supplied
+ # list should be safe to remove from the cache.
+ remove = []
+ for key, change in self._change_cache.items():
+ if change not in relevant:
+ remove.append(key)
+ for key in remove:
+ del self._change_cache[key]
+
+ def postConfig(self):
+ pass
+
+ def getChange(self, event, project):
+ if event.change_number:
+ change = self._getChange(event.change_number, event.patch_number)
+ elif event.ref:
+ change = Ref(project)
+ change.ref = event.ref
+ change.oldrev = event.oldrev
+ change.newrev = event.newrev
+ change.url = self.getGitwebUrl(project, sha=event.newrev)
+ else:
+ change = NullChange(project)
+ return change
+
+ def _getChange(self, number, patchset, refresh=False, history=None):
+ key = '%s,%s' % (number, patchset)
+ change = None
+ if key in self._change_cache:
+ change = self._change_cache.get(key)
+ if not refresh:
+ return change
+ if not change:
+ change = Change(None)
+ change.number = number
+ change.patchset = patchset
+ key = '%s,%s' % (change.number, change.patchset)
+ self._change_cache[key] = change
+ try:
+ self.updateChange(change, history)
+ except Exception:
+ del self._change_cache[key]
+ raise
+ return change
+
+ def getProjectOpenChanges(self, project):
+ # This is a best-effort function in case Gerrit is unable to return
+ # a particular change. It happens.
+ query = "project:%s status:open" % (project.name,)
+ self.log.debug("Running query %s to get project open changes" %
+ (query,))
+ data = self.gerrit.simpleQuery(query)
+ changes = []
+ for record in data:
+ try:
+ changes.append(
+ self._getChange(record['number'],
+ record['currentPatchSet']['number']))
+ except Exception:
+ self.log.exception("Unable to query change %s" %
+ (record.get('number'),))
+ return changes
+
+ def _getDependsOnFromCommit(self, message):
+ records = []
+ seen = set()
+ for match in self.depends_on_re.findall(message):
+ if match in seen:
+ self.log.debug("Ignoring duplicate Depends-On: %s" %
+ (match,))
+ continue
+ seen.add(match)
+ query = "change:%s" % (match,)
+ self.log.debug("Running query %s to find needed changes" %
+ (query,))
+ records.extend(self.gerrit.simpleQuery(query))
+ return records
+
+ def _getNeededByFromCommit(self, change_id):
+ records = []
+ seen = set()
+ query = 'message:%s' % change_id
+ self.log.debug("Running query %s to find changes needed-by" %
+ (query,))
+ results = self.gerrit.simpleQuery(query)
+ for result in results:
+ for match in self.depends_on_re.findall(
+ result['commitMessage']):
+ if match != change_id:
+ continue
+ key = (result['number'], result['currentPatchSet']['number'])
+ if key in seen:
+ continue
+ self.log.debug("Found change %s,%s needs %s from commit" %
+ (key[0], key[1], change_id))
+ seen.add(key)
+ records.append(result)
+ return records
+
+ def updateChange(self, change, history=None):
+ self.log.info("Updating information for %s,%s" %
+ (change.number, change.patchset))
+ data = self.gerrit.query(change.number)
+ change._data = data
+
+ if change.patchset is None:
+ change.patchset = data['currentPatchSet']['number']
+
+ if 'project' not in data:
+ raise Exception("Change %s,%s not found" % (change.number,
+ change.patchset))
+ # If updated changed came as a dependent on
+ # and its project is not defined,
+ # then create a 'foreign' project for it in layout
+ change.project = self.sched.getProject(data['project'],
+ create_foreign=bool(history))
+ change.branch = data['branch']
+ change.url = data['url']
+ max_ps = 0
+ files = []
+ for ps in data['patchSets']:
+ if ps['number'] == change.patchset:
+ change.refspec = ps['ref']
+ for f in ps.get('files', []):
+ files.append(f['file'])
+ if int(ps['number']) > int(max_ps):
+ max_ps = ps['number']
+ if max_ps == change.patchset:
+ change.is_current_patchset = True
+ else:
+ change.is_current_patchset = False
+ change.files = files
+
+ change.is_merged = self._isMerged(change)
+ change.approvals = data['currentPatchSet'].get('approvals', [])
+ change.open = data['open']
+ change.status = data['status']
+ change.owner = data['owner']
+
+ if change.is_merged:
+ # This change is merged, so we don't need to look any further
+ # for dependencies.
+ return change
+
+ if history is None:
+ history = []
+ else:
+ history = history[:]
+ history.append(change.number)
+
+ needs_changes = []
+ if 'dependsOn' in data:
+ parts = data['dependsOn'][0]['ref'].split('/')
+ dep_num, dep_ps = parts[3], parts[4]
+ if dep_num in history:
+ raise Exception("Dependency cycle detected: %s in %s" % (
+ dep_num, history))
+ self.log.debug("Getting git-dependent change %s,%s" %
+ (dep_num, dep_ps))
+ dep = self._getChange(dep_num, dep_ps, history=history)
+ if (not dep.is_merged) and dep not in needs_changes:
+ needs_changes.append(dep)
+
+ for record in self._getDependsOnFromCommit(data['commitMessage']):
+ dep_num = record['number']
+ dep_ps = record['currentPatchSet']['number']
+ if dep_num in history:
+ raise Exception("Dependency cycle detected: %s in %s" % (
+ dep_num, history))
+ self.log.debug("Getting commit-dependent change %s,%s" %
+ (dep_num, dep_ps))
+ dep = self._getChange(dep_num, dep_ps, history=history)
+ if (not dep.is_merged) and dep not in needs_changes:
+ needs_changes.append(dep)
+ change.needs_changes = needs_changes
+
+ needed_by_changes = []
+ if 'neededBy' in data:
+ for needed in data['neededBy']:
+ parts = needed['ref'].split('/')
+ dep_num, dep_ps = parts[3], parts[4]
+ dep = self._getChange(dep_num, dep_ps)
+ if (not dep.is_merged) and dep.is_current_patchset:
+ needed_by_changes.append(dep)
+
+ for record in self._getNeededByFromCommit(data['id']):
+ dep_num = record['number']
+ dep_ps = record['currentPatchSet']['number']
+ self.log.debug("Getting commit-needed change %s,%s" %
+ (dep_num, dep_ps))
+ # Because a commit needed-by may be a cross-repo
+ # dependency, cause that change to refresh so that it will
+ # reference the latest patchset of its Depends-On (this
+ # change).
+ dep = self._getChange(dep_num, dep_ps, refresh=True)
+ if (not dep.is_merged) and dep.is_current_patchset:
+ needed_by_changes.append(dep)
+ change.needed_by_changes = needed_by_changes
+
+ return change
+
+ def getGitUrl(self, project):
+ server = self.config.get('gerrit', 'server')
+ user = self.config.get('gerrit', 'user')
+ if self.config.has_option('gerrit', 'port'):
+ port = int(self.config.get('gerrit', 'port'))
+ else:
+ port = 29418
+ url = 'ssh://%s@%s:%s/%s' % (user, server, port, project.name)
+ return url
+
+ def getGitwebUrl(self, project, sha=None):
+ url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
+ if sha:
+ url += ';a=commitdiff;h=' + sha
+ return url
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 05d7581..428f67a 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -13,13 +13,10 @@
# under the License.
import logging
-import re
import threading
import time
-import urllib2
import voluptuous
-from zuul.lib import gerrit
-from zuul.model import TriggerEvent, Change, Ref, NullChange
+from zuul.model import TriggerEvent
class GerritEventConnector(threading.Thread):
@@ -28,12 +25,13 @@
log = logging.getLogger("zuul.GerritEventConnector")
delay = 5.0
- def __init__(self, gerrit, sched, trigger):
+ def __init__(self, gerrit, sched, trigger, source):
super(GerritEventConnector, self).__init__()
self.daemon = True
self.gerrit = gerrit
self.sched = sched
self.trigger = trigger
+ self.source = source
self._stopped = False
def stop(self):
@@ -98,9 +96,9 @@
# Call _getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
- self.trigger._getChange(event.change_number,
- event.patch_number,
- refresh=True)
+ self.source._getChange(event.change_number,
+ event.patch_number,
+ refresh=True)
self.sched.addEvent(event)
@@ -118,398 +116,28 @@
class Gerrit(object):
name = 'gerrit'
- log = logging.getLogger("zuul.Gerrit")
- replication_timeout = 300
- replication_retry_interval = 5
+ log = logging.getLogger("zuul.trigger.Gerrit")
- depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
- re.MULTILINE | re.IGNORECASE)
-
- def __init__(self, config, sched):
- self._change_cache = {}
+ def __init__(self, gerrit, config, sched, source):
self.sched = sched
+ # TODO(jhesketh): Make 'gerrit' come from a connection (rather than the
+ # source)
+ # TODO(jhesketh): Remove the requirement for a gerrit source (currently
+ # it is needed so on a trigger event the cache is
+ # updated. However if we share a connection object the
+ # cache could be stored there)
self.config = config
- self.server = config.get('gerrit', 'server')
- if config.has_option('gerrit', 'baseurl'):
- self.baseurl = config.get('gerrit', 'baseurl')
- else:
- self.baseurl = 'https://%s' % self.server
- user = config.get('gerrit', 'user')
- if config.has_option('gerrit', 'sshkey'):
- sshkey = config.get('gerrit', 'sshkey')
- else:
- sshkey = None
- if config.has_option('gerrit', 'port'):
- port = int(config.get('gerrit', 'port'))
- else:
- port = 29418
- self.gerrit = gerrit.Gerrit(self.server, user, port, sshkey)
- self.gerrit.startWatching()
- self.gerrit_connector = GerritEventConnector(
- self.gerrit, sched, self)
+ self.gerrit_connector = GerritEventConnector(gerrit, sched, self,
+ source)
self.gerrit_connector.start()
def stop(self):
self.gerrit_connector.stop()
self.gerrit_connector.join()
- def _getInfoRefs(self, project):
- url = "%s/p/%s/info/refs?service=git-upload-pack" % (
- self.baseurl, project)
- try:
- data = urllib2.urlopen(url).read()
- except:
- self.log.error("Cannot get references from %s" % url)
- raise # keeps urllib2 error informations
- ret = {}
- read_headers = False
- read_advertisement = False
- if data[4] != '#':
- raise Exception("Gerrit repository does not support "
- "git-upload-pack")
- i = 0
- while i < len(data):
- if len(data) - i < 4:
- raise Exception("Invalid length in info/refs")
- plen = int(data[i:i + 4], 16)
- i += 4
- # It's the length of the packet, including the 4 bytes of the
- # length itself, unless it's null, in which case the length is
- # not included.
- if plen > 0:
- plen -= 4
- if len(data) - i < plen:
- raise Exception("Invalid data in info/refs")
- line = data[i:i + plen]
- i += plen
- if not read_headers:
- if plen == 0:
- read_headers = True
- continue
- if not read_advertisement:
- read_advertisement = True
- continue
- if plen == 0:
- # The terminating null
- continue
- line = line.strip()
- revision, ref = line.split()
- ret[ref] = revision
- return ret
-
- def getRefSha(self, project, ref):
- refs = {}
- try:
- refs = self._getInfoRefs(project)
- except:
- self.log.exception("Exception looking for ref %s" %
- ref)
- sha = refs.get(ref, '')
- return sha
-
- def waitForRefSha(self, project, ref, old_sha=''):
- # Wait for the ref to show up in the repo
- start = time.time()
- while time.time() - start < self.replication_timeout:
- sha = self.getRefSha(project.name, ref)
- if old_sha != sha:
- return True
- time.sleep(self.replication_retry_interval)
- return False
-
- def isMerged(self, change, head=None):
- self.log.debug("Checking if change %s is merged" % change)
- if not change.number:
- self.log.debug("Change has no number; considering it merged")
- # Good question. It's probably ref-updated, which, ah,
- # means it's merged.
- return True
-
- data = self.gerrit.query(change.number)
- change._data = data
- change.is_merged = self._isMerged(change)
- if not head:
- return change.is_merged
- if not change.is_merged:
- return False
-
- ref = 'refs/heads/' + change.branch
- self.log.debug("Waiting for %s to appear in git repo" % (change))
- if self.waitForRefSha(change.project, ref, change._ref_sha):
- self.log.debug("Change %s is in the git repo" %
- (change))
- return True
- self.log.debug("Change %s did not appear in the git repo" %
- (change))
- return False
-
- def _isMerged(self, change):
- data = change._data
- if not data:
- return False
- status = data.get('status')
- if not status:
- return False
- self.log.debug("Change %s status: %s" % (change, status))
- if status == 'MERGED':
- return True
- return False
-
- def canMerge(self, change, allow_needs):
- if not change.number:
- self.log.debug("Change has no number; considering it merged")
- # Good question. It's probably ref-updated, which, ah,
- # means it's merged.
- return True
- data = change._data
- if not data:
- return False
- if 'submitRecords' not in data:
- return False
- try:
- for sr in data['submitRecords']:
- if sr['status'] == 'OK':
- return True
- elif sr['status'] == 'NOT_READY':
- for label in sr['labels']:
- if label['status'] in ['OK', 'MAY']:
- continue
- elif label['status'] in ['NEED', 'REJECT']:
- # It may be our own rejection, so we ignore
- if label['label'].lower() not in allow_needs:
- return False
- continue
- else:
- # IMPOSSIBLE
- return False
- else:
- # CLOSED, RULE_ERROR
- return False
- except:
- self.log.exception("Exception determining whether change"
- "%s can merge:" % change)
- return False
- return True
-
- def maintainCache(self, relevant):
- # This lets the user supply a list of change objects that are
- # still in use. Anything in our cache that isn't in the supplied
- # list should be safe to remove from the cache.
- remove = []
- for key, change in self._change_cache.items():
- if change not in relevant:
- remove.append(key)
- for key in remove:
- del self._change_cache[key]
-
def postConfig(self):
pass
- def getChange(self, event, project):
- if event.change_number:
- change = self._getChange(event.change_number, event.patch_number)
- elif event.ref:
- change = Ref(project)
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
- change.url = self.getGitwebUrl(project, sha=event.newrev)
- else:
- change = NullChange(project)
- return change
-
- def _getChange(self, number, patchset, refresh=False, history=None):
- key = '%s,%s' % (number, patchset)
- change = None
- if key in self._change_cache:
- change = self._change_cache.get(key)
- if not refresh:
- return change
- if not change:
- change = Change(None)
- change.number = number
- change.patchset = patchset
- key = '%s,%s' % (change.number, change.patchset)
- self._change_cache[key] = change
- try:
- self.updateChange(change, history)
- except Exception:
- del self._change_cache[key]
- raise
- return change
-
- def getProjectOpenChanges(self, project):
- # This is a best-effort function in case Gerrit is unable to return
- # a particular change. It happens.
- query = "project:%s status:open" % (project.name,)
- self.log.debug("Running query %s to get project open changes" %
- (query,))
- data = self.gerrit.simpleQuery(query)
- changes = []
- for record in data:
- try:
- changes.append(
- self._getChange(record['number'],
- record['currentPatchSet']['number']))
- except Exception:
- self.log.exception("Unable to query change %s" %
- (record.get('number'),))
- return changes
-
- def _getDependsOnFromCommit(self, message):
- records = []
- seen = set()
- for match in self.depends_on_re.findall(message):
- if match in seen:
- self.log.debug("Ignoring duplicate Depends-On: %s" %
- (match,))
- continue
- seen.add(match)
- query = "change:%s" % (match,)
- self.log.debug("Running query %s to find needed changes" %
- (query,))
- records.extend(self.gerrit.simpleQuery(query))
- return records
-
- def _getNeededByFromCommit(self, change_id):
- records = []
- seen = set()
- query = 'message:%s' % change_id
- self.log.debug("Running query %s to find changes needed-by" %
- (query,))
- results = self.gerrit.simpleQuery(query)
- for result in results:
- for match in self.depends_on_re.findall(
- result['commitMessage']):
- if match != change_id:
- continue
- key = (result['number'], result['currentPatchSet']['number'])
- if key in seen:
- continue
- self.log.debug("Found change %s,%s needs %s from commit" %
- (key[0], key[1], change_id))
- seen.add(key)
- records.append(result)
- return records
-
- def updateChange(self, change, history=None):
- self.log.info("Updating information for %s,%s" %
- (change.number, change.patchset))
- data = self.gerrit.query(change.number)
- change._data = data
-
- if change.patchset is None:
- change.patchset = data['currentPatchSet']['number']
-
- if 'project' not in data:
- raise Exception("Change %s,%s not found" % (change.number,
- change.patchset))
- # If updated changed came as a dependent on
- # and its project is not defined,
- # then create a 'foreign' project for it in layout
- change.project = self.sched.getProject(data['project'],
- create_foreign=bool(history))
- change.branch = data['branch']
- change.url = data['url']
- max_ps = 0
- files = []
- for ps in data['patchSets']:
- if ps['number'] == change.patchset:
- change.refspec = ps['ref']
- for f in ps.get('files', []):
- files.append(f['file'])
- if int(ps['number']) > int(max_ps):
- max_ps = ps['number']
- if max_ps == change.patchset:
- change.is_current_patchset = True
- else:
- change.is_current_patchset = False
- change.files = files
-
- change.is_merged = self._isMerged(change)
- change.approvals = data['currentPatchSet'].get('approvals', [])
- change.open = data['open']
- change.status = data['status']
- change.owner = data['owner']
-
- if change.is_merged:
- # This change is merged, so we don't need to look any further
- # for dependencies.
- return change
-
- if history is None:
- history = []
- else:
- history = history[:]
- history.append(change.number)
-
- needs_changes = []
- if 'dependsOn' in data:
- parts = data['dependsOn'][0]['ref'].split('/')
- dep_num, dep_ps = parts[3], parts[4]
- if dep_num in history:
- raise Exception("Dependency cycle detected: %s in %s" % (
- dep_num, history))
- self.log.debug("Getting git-dependent change %s,%s" %
- (dep_num, dep_ps))
- dep = self._getChange(dep_num, dep_ps, history=history)
- if (not dep.is_merged) and dep not in needs_changes:
- needs_changes.append(dep)
-
- for record in self._getDependsOnFromCommit(data['commitMessage']):
- dep_num = record['number']
- dep_ps = record['currentPatchSet']['number']
- if dep_num in history:
- raise Exception("Dependency cycle detected: %s in %s" % (
- dep_num, history))
- self.log.debug("Getting commit-dependent change %s,%s" %
- (dep_num, dep_ps))
- dep = self._getChange(dep_num, dep_ps, history=history)
- if (not dep.is_merged) and dep not in needs_changes:
- needs_changes.append(dep)
- change.needs_changes = needs_changes
-
- needed_by_changes = []
- if 'neededBy' in data:
- for needed in data['neededBy']:
- parts = needed['ref'].split('/')
- dep_num, dep_ps = parts[3], parts[4]
- dep = self._getChange(dep_num, dep_ps)
- if (not dep.is_merged) and dep.is_current_patchset:
- needed_by_changes.append(dep)
-
- for record in self._getNeededByFromCommit(data['id']):
- dep_num = record['number']
- dep_ps = record['currentPatchSet']['number']
- self.log.debug("Getting commit-needed change %s,%s" %
- (dep_num, dep_ps))
- # Because a commit needed-by may be a cross-repo
- # dependency, cause that change to refresh so that it will
- # reference the latest patchset of its Depends-On (this
- # change).
- dep = self._getChange(dep_num, dep_ps, refresh=True)
- if (not dep.is_merged) and dep.is_current_patchset:
- needed_by_changes.append(dep)
- change.needed_by_changes = needed_by_changes
-
- return change
-
- def getGitUrl(self, project):
- server = self.config.get('gerrit', 'server')
- user = self.config.get('gerrit', 'user')
- if self.config.has_option('gerrit', 'port'):
- port = int(self.config.get('gerrit', 'port'))
- else:
- port = 29418
- url = 'ssh://%s@%s:%s/%s' % (user, server, port, project.name)
- return url
-
- def getGitwebUrl(self, project, sha=None):
- url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
- if sha:
- url += ';a=commitdiff;h=' + sha
- return url
-
def validate_trigger(trigger_data):
"""Validates the layout's trigger data."""
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index 3d5cd9b..be22a70 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -41,17 +41,6 @@
def stop(self):
self.apsched.shutdown()
- def isMerged(self, change, head=None):
- raise Exception("Timer trigger does not support checking if "
- "a change is merged.")
-
- def canMerge(self, change, allow_needs):
- raise Exception("Timer trigger does not support checking if "
- "a change can merge.")
-
- def maintainCache(self, relevant):
- return
-
def postConfig(self):
for job in self.apsched.get_jobs():
self.apsched.unschedule_job(job)
@@ -81,12 +70,3 @@
second=second,
args=(pipeline.name,
timespec,))
-
- def getChange(self, event, project):
- raise Exception("Timer trigger does not support changes.")
-
- def getGitUrl(self, project):
- raise Exception("Timer trigger does not support changes.")
-
- def getGitwebUrl(self, project, sha=None):
- raise Exception("Timer trigger does not support changes.")
diff --git a/zuul/trigger/zuultrigger.py b/zuul/trigger/zuultrigger.py
index 4418d6f..3cc4ff9 100644
--- a/zuul/trigger/zuultrigger.py
+++ b/zuul/trigger/zuultrigger.py
@@ -30,17 +30,6 @@
def stop(self):
pass
- def isMerged(self, change, head=None):
- raise Exception("Zuul trigger does not support checking if "
- "a change is merged.")
-
- def canMerge(self, change, allow_needs):
- raise Exception("Zuul trigger does not support checking if "
- "a change can merge.")
-
- def maintainCache(self, relevant):
- return
-
def onChangeMerged(self, change):
# Called each time zuul merges a change
if self._handle_project_change_merged_events:
@@ -62,7 +51,7 @@
"%s in %s" % (change, pipeline))
def _createProjectChangeMergedEvents(self, change):
- changes = self.sched.triggers['gerrit'].getProjectOpenChanges(
+ changes = self.sched.sources['gerrit'].getProjectOpenChanges(
change.project)
for open_change in changes:
self._createProjectChangeMergedEvent(open_change)
@@ -111,12 +100,3 @@
self._handle_parent_change_enqueued_events = True
elif 'project-change-merged' in ef._types:
self._handle_project_change_merged_events = True
-
- def getChange(self, number, patchset, refresh=False):
- raise Exception("Zuul trigger does not support changes.")
-
- def getGitUrl(self, project):
- raise Exception("Zuul trigger does not support changes.")
-
- def getGitwebUrl(self, project, sha=None):
- raise Exception("Zuul trigger does not support changes.")