Merge "Allow pipelines triggers to filter by username"
diff --git a/README.rst b/README.rst
index b66305e..1b227e7 100644
--- a/README.rst
+++ b/README.rst
@@ -6,7 +6,8 @@
Contributing
------------
-To get the latest code, see: https://github.com/openstack-infra/zuul
+To browse the latest code, see: https://git.openstack.org/cgit/openstack-infra/zuul/tree/
+To clone the latest code, use `git clone git://git.openstack.org/openstack-infra/zuul`
Bugs are handled at: https://launchpad.net/zuul
diff --git a/doc/source/reporters.rst b/doc/source/reporters.rst
index 63a86c4..7c0214d 100644
--- a/doc/source/reporters.rst
+++ b/doc/source/reporters.rst
@@ -42,8 +42,8 @@
zuul.conf contains the SMTP server and default to/from as describe
in :ref:`zuulconf`.
-Each pipeline can overwrite the to or from address by providing
-alternatives as arguments to the reporter. For example, ::
+Each pipeline can override the subject, to, or from address by
+providing alternatives as arguments to the reporter. For example, ::
pipelines:
- name: post-merge
@@ -57,3 +57,4 @@
smtp:
to: you@example.com
from: alternative@example.com
+ subject: Change {change} failed
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index a03cc1c..f71df22 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -160,6 +160,16 @@
This can be overridden by individual pipelines.
``default_to=you@example.com``
+replication
+"""""""""""
+
+Zuul can push the refs it creates to any number of servers. To do so,
+list the git push URLs in this section, one per line as follows::
+
+ [replication]
+ url1=ssh://user@host1.example.com:port/path/to/repo
+ url2=ssh://user@host2.example.com:port/path/to/repo
+
layout.yaml
~~~~~~~~~~~
diff --git a/tests/fixtures/layout-rate-limit.yaml b/tests/fixtures/layout-rate-limit.yaml
new file mode 100644
index 0000000..9f6748c
--- /dev/null
+++ b/tests/fixtures/layout-rate-limit.yaml
@@ -0,0 +1,32 @@
+pipelines:
+ - name: gate
+ manager: DependentPipelineManager
+ failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - approved: 1
+ start:
+ gerrit:
+ verified: 0
+ success:
+ gerrit:
+ verified: 2
+ submit: true
+ failure:
+ gerrit:
+ verified: -2
+ window: 2
+ window-floor: 1
+ window-increase-type: linear
+ window-increase-factor: 1
+ window-decrease-type: exponential
+ window-decrease-factor: 2
+
+projects:
+ - name: org/project
+ gate:
+ - project-merge:
+ - project-test1
+ - project-test2
diff --git a/tests/fixtures/layout-timer-smtp.yaml b/tests/fixtures/layout-timer-smtp.yaml
new file mode 100644
index 0000000..ac59df4
--- /dev/null
+++ b/tests/fixtures/layout-timer-smtp.yaml
@@ -0,0 +1,23 @@
+pipelines:
+ - name: periodic
+ manager: IndependentPipelineManager
+ trigger:
+ timer:
+ - time: '* * * * * */10'
+ success:
+ smtp:
+ to: alternative_me@example.com
+ from: zuul_from@example.com
+ subject: 'Periodic check for {change.project} succeeded'
+
+jobs:
+ - name: project-bitrot-stable-old
+ success-pattern: http://logs.example.com/{job.name}/{build.number}
+ - name: project-bitrot-stable-older
+ success-pattern: http://logs.example.com/{job.name}/{build.number}
+
+projects:
+ - name: org/project
+ periodic:
+ - project-bitrot-stable-old
+ - project-bitrot-stable-older
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 48f2281..0ce0f88 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -690,11 +690,11 @@
class FakeSMTP(object):
log = logging.getLogger('zuul.FakeSMTP')
- messages = []
- def __init__(self, server, port):
+ def __init__(self, messages, server, port):
self.server = server
self.port = port
+ self.messages = messages
def sendmail(self, from_email, to_email, msg):
self.log.info("Sending email from %s, to %s, with msg %s" % (
@@ -703,7 +703,7 @@
headers = msg.split('\n\n', 1)[0]
body = msg.split('\n\n', 1)[1]
- FakeSMTP.messages.append(dict(
+ self.messages.append(dict(
from_email=from_email,
to_email=to_email,
msg=msg,
@@ -729,7 +729,7 @@
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
- self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+ self.useFixture(fixtures.Timeout(test_timeout, gentle=False))
if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
os.environ.get('OS_STDOUT_CAPTURE') == '1'):
@@ -800,8 +800,14 @@
urllib2.urlopen = URLOpenerFactory
self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
+ self.smtp_messages = []
+
+ def FakeSMTPFactory(*args, **kw):
+ args = [self.smtp_messages] + list(args)
+ return FakeSMTP(*args, **kw)
+
zuul.lib.gerrit.Gerrit = FakeGerrit
- self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTP))
+ self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
self.gerrit = FakeGerritTrigger(
self.upstream_root, self.config, self.sched)
@@ -2914,6 +2920,39 @@
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
+ def test_push_urls(self):
+ "Test that Zuul can push refs to multiple URLs"
+ upstream_path = os.path.join(self.upstream_root, 'org/project')
+ replica1 = os.path.join(self.upstream_root, 'replica1')
+ replica2 = os.path.join(self.upstream_root, 'replica2')
+
+ self.config.add_section('replication')
+ self.config.set('replication', 'url1', 'file://%s' % replica1)
+ self.config.set('replication', 'url2', 'file://%s' % replica2)
+ self.sched.reconfigure(self.config)
+
+ r1 = git.Repo.clone_from(upstream_path, replica1 + '/org/project.git')
+ r2 = git.Repo.clone_from(upstream_path, replica2 + '/org/project.git')
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ B = self.fake_gerrit.addFakeChange('org/project', 'mp', 'B')
+ B.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ count = 0
+ for ref in r1.refs:
+ if ref.path.startswith('refs/zuul'):
+ count += 1
+ self.assertEqual(count, 3)
+
+ count = 0
+ for ref in r2.refs:
+ if ref.path.startswith('refs/zuul'):
+ count += 1
+ self.assertEqual(count, 3)
+
def test_timer(self):
"Test that a periodic job is triggered"
self.worker.hold_jobs_in_build = True
@@ -2972,25 +3011,64 @@
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
- self.assertEqual(len(FakeSMTP.messages), 2)
+ self.assertEqual(len(self.smtp_messages), 2)
# A.messages only holds what FakeGerrit places in it. Thus we
# work on the knowledge of what the first message should be as
# it is only configured to go to SMTP.
self.assertEqual('zuul@example.com',
- FakeSMTP.messages[0]['from_email'])
+ self.smtp_messages[0]['from_email'])
self.assertEqual(['you@example.com'],
- FakeSMTP.messages[0]['to_email'])
+ self.smtp_messages[0]['to_email'])
self.assertEqual('Starting check jobs.',
- FakeSMTP.messages[0]['body'])
+ self.smtp_messages[0]['body'])
self.assertEqual('zuul_from@example.com',
- FakeSMTP.messages[1]['from_email'])
+ self.smtp_messages[1]['from_email'])
self.assertEqual(['alternative_me@example.com'],
- FakeSMTP.messages[1]['to_email'])
+ self.smtp_messages[1]['to_email'])
self.assertEqual(A.messages[0],
- FakeSMTP.messages[1]['body'])
+ self.smtp_messages[1]['body'])
+
+ def test_timer_smtp(self):
+ "Test that a periodic job reports success via SMTP"
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-timer-smtp.yaml')
+ self.sched.reconfigure(self.config)
+ self.registerJobs()
+
+ start = time.time()
+ failed = True
+ while ((time.time() - start) < 30):
+ if len(self.history) == 2:
+ failed = False
+ break
+ else:
+ time.sleep(1)
+
+ if failed:
+ raise Exception("Expected jobs never ran")
+
+ self.waitUntilSettled()
+
+ self.assertEqual(self.getJobFromHistory(
+ 'project-bitrot-stable-old').result, 'SUCCESS')
+ self.assertEqual(self.getJobFromHistory(
+ 'project-bitrot-stable-older').result, 'SUCCESS')
+
+ self.assertEqual(len(self.smtp_messages), 1)
+
+ # The periodic pipeline is only configured to report success via
+ # SMTP, so the single message captured by the fake SMTP server is
+ # the success report for the timer-triggered jobs.
+
+ self.assertEqual('zuul_from@example.com',
+ self.smtp_messages[0]['from_email'])
+ self.assertEqual(['alternative_me@example.com'],
+ self.smtp_messages[0]['to_email'])
+ self.assertIn('Subject: Periodic check for org/project succeeded',
+ self.smtp_messages[0]['headers'])
def test_client_enqueue(self):
"Test that the RPC client can enqueue a change"
@@ -3075,11 +3153,22 @@
self.waitUntilSettled()
+ items = self.sched.layout.pipelines['gate'].getAllItems()
+ enqueue_times = {}
+ for item in items:
+ enqueue_times[str(item.change)] = item.enqueue_time
+
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.promote(pipeline='gate',
change_ids=['2,1', '3,1'])
+ # ensure that enqueue times are durable
+ items = self.sched.layout.pipelines['gate'].getAllItems()
+ for item in items:
+ self.assertEqual(
+ enqueue_times[str(item.change)], item.enqueue_time)
+
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
@@ -3212,3 +3301,167 @@
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
+
+ def test_queue_rate_limiting(self):
+ "Test that DependentPipelines are rate limited with dep across window"
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-rate-limit.yaml')
+ self.sched.reconfigure(self.config)
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+ C.setDependsOn(B, 1)
+ self.worker.addFailTest('project-test1', A)
+
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ # Only A and B will have their merge jobs queued because
+ # window is 2.
+ self.assertEqual(len(self.builds), 2)
+ self.assertEqual(self.builds[0].name, 'project-merge')
+ self.assertEqual(self.builds[1].name, 'project-merge')
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # Only A and B will have their test jobs queued because
+ # window is 2.
+ self.assertEqual(len(self.builds), 4)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+ self.assertEqual(self.builds[2].name, 'project-test1')
+ self.assertEqual(self.builds[3].name, 'project-test2')
+
+ self.worker.release('project-.*')
+ self.waitUntilSettled()
+
+ queue = self.sched.layout.pipelines['gate'].queues[0]
+ # A failed so the window is halved (exponential decrease) from 2 to 1.
+ self.assertEqual(queue.window, 1)
+ self.assertEqual(queue.window_floor, 1)
+ self.assertEqual(A.data['status'], 'NEW')
+
+ # Gate is reset and only B's merge job is queued because
+ # window shrunk to 1.
+ self.assertEqual(len(self.builds), 1)
+ self.assertEqual(self.builds[0].name, 'project-merge')
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # Only B's test jobs are queued because window is still 1.
+ self.assertEqual(len(self.builds), 2)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+
+ self.worker.release('project-.*')
+ self.waitUntilSettled()
+
+ # B was successfully merged so window is increased to 2.
+ self.assertEqual(queue.window, 2)
+ self.assertEqual(queue.window_floor, 1)
+ self.assertEqual(B.data['status'], 'MERGED')
+
+ # Only C is left and its merge job is queued.
+ self.assertEqual(len(self.builds), 1)
+ self.assertEqual(self.builds[0].name, 'project-merge')
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # After successful merge job the test jobs for C are queued.
+ self.assertEqual(len(self.builds), 2)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+
+ self.worker.release('project-.*')
+ self.waitUntilSettled()
+
+ # C successfully merged so window is bumped to 3.
+ self.assertEqual(queue.window, 3)
+ self.assertEqual(queue.window_floor, 1)
+ self.assertEqual(C.data['status'], 'MERGED')
+
+ def test_queue_rate_limiting_dependent(self):
+ "Test that DependentPipelines are rate limited with dep in window"
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-rate-limit.yaml')
+ self.sched.reconfigure(self.config)
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+ B.setDependsOn(A, 1)
+
+ self.worker.addFailTest('project-test1', A)
+
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ # Only A and B will have their merge jobs queued because
+ # window is 2.
+ self.assertEqual(len(self.builds), 2)
+ self.assertEqual(self.builds[0].name, 'project-merge')
+ self.assertEqual(self.builds[1].name, 'project-merge')
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # Only A and B will have their test jobs queued because
+ # window is 2.
+ self.assertEqual(len(self.builds), 4)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+ self.assertEqual(self.builds[2].name, 'project-test1')
+ self.assertEqual(self.builds[3].name, 'project-test2')
+
+ self.worker.release('project-.*')
+ self.waitUntilSettled()
+
+ queue = self.sched.layout.pipelines['gate'].queues[0]
+ # A failed so the window is halved (exponential decrease) from 2 to 1.
+ self.assertEqual(queue.window, 1)
+ self.assertEqual(queue.window_floor, 1)
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+
+ # Gate is reset and only C's merge job is queued because
+ # window shrunk to 1 and A and B were dequeued.
+ self.assertEqual(len(self.builds), 1)
+ self.assertEqual(self.builds[0].name, 'project-merge')
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # Only C's test jobs are queued because window is still 1.
+ self.assertEqual(len(self.builds), 2)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test2')
+
+ self.worker.release('project-.*')
+ self.waitUntilSettled()
+
+ # C was successfully merged so window is increased to 2.
+ self.assertEqual(queue.window, 2)
+ self.assertEqual(queue.window_floor, 1)
+ self.assertEqual(C.data['status'], 'MERGED')
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 3a51b1c..c2a9dab 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -30,11 +30,10 @@
import signal
import traceback
-import gear
-
# No zuul imports here because they pull in paramiko which must not be
# imported until after the daemonization.
# https://github.com/paramiko/paramiko/issues/59
+# Similar situation with gear and statsd.
def stack_dump_handler(signum, frame):
@@ -149,6 +148,7 @@
if child_pid == 0:
os.close(pipe_write)
self.setup_logging('gearman_server', 'log_config')
+ import gear
gear.Server(4730)
# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index d4ff143..d464998 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -56,7 +56,15 @@
{'timer': toList(timer_trigger)}))
report_actions = {'gerrit': variable_dict,
- 'smtp': variable_dict}
+ 'smtp': {'to': str,
+ 'from': str,
+ 'subject': str,
+ },
+ }
+ window = v.All(int, v.Range(min=1))
+ window_floor = v.All(int, v.Range(min=1))
+ window_type = v.Any('linear', 'exponential')
+ window_factor = v.All(int, v.Range(min=1))
pipeline = {v.Required('name'): str,
v.Required('manager'): manager,
@@ -69,6 +77,12 @@
'success': report_actions,
'failure': report_actions,
'start': report_actions,
+ 'window': window,
+ 'window-floor': window_floor,
+ 'window-increase-type': window_type,
+ 'window-increase-factor': window_factor,
+ 'window-decrease-type': window_type,
+ 'window-decrease-factor': window_factor,
}
pipelines = [pipeline]
diff --git a/zuul/merger.py b/zuul/merger.py
index 1f3c547..09011ae 100644
--- a/zuul/merger.py
+++ b/zuul/merger.py
@@ -16,6 +16,7 @@
import os
import logging
import model
+import threading
class ZuulReference(git.Reference):
@@ -131,6 +132,11 @@
self.remote_url))
repo.remotes.origin.push('%s:%s' % (local, remote))
+ def push_url(self, url, refspecs):
+ repo = self.createRepoObject()
+ self.log.debug("Pushing %s to %s" % (refspecs, url))
+ repo.git.push([url] + refspecs)
+
def update(self):
repo = self.createRepoObject()
self.log.debug("Updating repository %s" % self.local_path)
@@ -142,7 +148,7 @@
log = logging.getLogger("zuul.Merger")
def __init__(self, trigger, working_root, push_refs, sshkey, email,
- username):
+ username, replicate_urls):
self.trigger = trigger
self.repos = {}
self.working_root = working_root
@@ -153,6 +159,7 @@
self._makeSSHWrapper(sshkey)
self.email = email
self.username = username
+ self.replicate_urls = replicate_urls
def _makeSSHWrapper(self, key):
name = os.path.join(self.working_root, '.ssh_wrapper')
@@ -219,6 +226,25 @@
return False
return commit
+ def replicateRefspecs(self, refspecs):
+ threads = []
+ for url in self.replicate_urls:
+ t = threading.Thread(target=self._replicate,
+ args=(url, refspecs))
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+
+ def _replicate(self, url, project_refspecs):
+ try:
+ for project, refspecs in project_refspecs.items():
+ repo = self.getRepo(project)
+ repo.push_url(os.path.join(url, project.name + '.git'),
+ refspecs)
+ except Exception:
+ self.log.exception("Exception pushing to %s" % url)
+
def mergeChanges(self, items, target_ref=None):
# Merge shortcuts:
# if this is the only change just merge it against its branch.
@@ -257,6 +283,7 @@
return commit
project_branches = []
+ replicate_refspecs = {}
for i in reversed(items):
# Here we create all of the necessary zuul refs and potentially
# push them back to Gerrit.
@@ -276,10 +303,13 @@
self.log.exception("Unable to set zuul ref %s for "
"change %s" % (zuul_ref, i.change))
return False
+ ref = 'refs/zuul/' + i.change.branch + '/' + target_ref
+ refspecs = replicate_refspecs.get(i.change.project, [])
+ refspecs.append('%s:%s' % (ref, ref))
+ replicate_refspecs[i.change.project] = refspecs
if self.push_refs:
# Push the results upstream to the zuul ref after
# they are created.
- ref = 'refs/zuul/' + i.change.branch + '/' + target_ref
try:
repo.push(ref, ref)
complete = self.trigger.waitForRefSha(i.change.project,
@@ -291,5 +321,5 @@
self.log.error("Ref %s did not show up in repo" % ref)
return False
project_branches.append((i.change.project, i.change.branch))
-
+ self.replicateRefspecs(replicate_refspecs)
return commit
diff --git a/zuul/model.py b/zuul/model.py
index e62feed..a2573f7 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -59,6 +59,12 @@
self.start_actions = None
self.success_actions = None
self.failure_actions = None
+ self.window = None
+ self.window_floor = None
+ self.window_increase_type = None
+ self.window_increase_factor = None
+ self.window_decrease_type = None
+ self.window_decrease_factor = None
def __repr__(self):
return '<Pipeline %s>' % self.name
@@ -375,13 +381,21 @@
a queue shared by interrelated projects foo and bar, and a second
queue for independent project baz. Pipelines have one or more
PipelineQueues."""
- def __init__(self, pipeline, dependent=True):
+ def __init__(self, pipeline, dependent=True, window=0, window_floor=1,
+ window_increase_type='linear', window_increase_factor=1,
+ window_decrease_type='exponential', window_decrease_factor=2):
self.pipeline = pipeline
self.name = ''
self.projects = []
self._jobs = set()
self.queue = []
self.dependent = dependent
+ self.window = window
+ self.window_floor = window_floor
+ self.window_increase_type = window_increase_type
+ self.window_increase_factor = window_increase_factor
+ self.window_decrease_type = window_decrease_type
+ self.window_decrease_factor = window_decrease_factor
def __repr__(self):
return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
@@ -398,7 +412,7 @@
self._jobs |= set(self.pipeline.getJobTree(project).getJobs())
def enqueueChange(self, change):
- item = QueueItem(self.pipeline, change)
+ item = QueueItem(self, self.pipeline, change)
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
@@ -444,6 +458,32 @@
def mergeChangeQueue(self, other):
for project in other.projects:
self.addProject(project)
+ self.window = min(self.window, other.window)
+ # TODO: define merge semantics for the other window settings
+
+ def getActionableItems(self):
+ if self.dependent and self.window:
+ return self.queue[:self.window]
+ else:
+ return self.queue[:]
+
+ def increaseWindowSize(self):
+ if self.dependent:
+ if self.window_increase_type == 'linear':
+ self.window += self.window_increase_factor
+ elif self.window_increase_type == 'exponential':
+ self.window *= self.window_increase_factor
+
+ def decreaseWindowSize(self):
+ if self.dependent:
+ if self.window_decrease_type == 'linear':
+ self.window = max(
+ self.window_floor,
+ self.window - self.window_decrease_factor)
+ elif self.window_decrease_type == 'exponential':
+ self.window = max(
+ self.window_floor,
+ self.window / self.window_decrease_factor)
class Project(object):
@@ -619,7 +659,8 @@
class QueueItem(object):
"""A changish inside of a Pipeline queue"""
- def __init__(self, pipeline, change):
+ def __init__(self, change_queue, pipeline, change):
+ self.change_queue = change_queue
self.pipeline = pipeline
self.change = change # a changeish
self.build_sets = []
@@ -661,7 +702,6 @@
class Changeish(object):
"""Something like a change; either a change or a ref"""
- is_reportable = False
def __init__(self, project):
self.project = project
@@ -680,8 +720,6 @@
class Change(Changeish):
- is_reportable = True
-
def __init__(self, project):
super(Change, self).__init__(project)
self.branch = None
@@ -729,8 +767,6 @@
class Ref(Changeish):
- is_reportable = False
-
def __init__(self, project):
super(Ref, self).__init__(project)
self.ref = None
@@ -767,7 +803,8 @@
class NullChange(Changeish):
- is_reportable = False
+ def __repr__(self):
+ return '<NullChange for %s>' % (self.project)
def _id(self):
return None
diff --git a/zuul/reporter/smtp.py b/zuul/reporter/smtp.py
index 66dcd45..b214019 100644
--- a/zuul/reporter/smtp.py
+++ b/zuul/reporter/smtp.py
@@ -46,7 +46,11 @@
to_email = params['to']\
if 'to' in params else self.smtp_default_to
msg = MIMEText(message)
- msg['Subject'] = "Report change %s" % change
+ if 'subject' in params:
+ subject = params['subject'].format(change=change)
+ else:
+ subject = "Report for change %s" % change
+ msg['Subject'] = subject
msg['From'] = from_email
msg['To'] = to_email
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index ea25948..a2d1d34 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -31,6 +31,7 @@
import model
from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
import merger
+from zuul import version as zuul_version
statsd = extras.try_import('statsd.statsd')
@@ -130,6 +131,8 @@
self.management_event_queue = Queue.Queue()
self.layout = model.Layout()
+ self.zuul_version = zuul_version.version_info.version_string()
+
def stop(self):
self._stopped = True
self.wake_event.set()
@@ -197,6 +200,17 @@
pipeline.success_actions = action_reporters['success']
pipeline.failure_actions = action_reporters['failure']
+ pipeline.window = conf_pipeline.get('window', 20)
+ pipeline.window_floor = conf_pipeline.get('window-floor', 3)
+ pipeline.window_increase_type = conf_pipeline.get(
+ 'window-increase-type', 'linear')
+ pipeline.window_increase_factor = conf_pipeline.get(
+ 'window-increase-factor', 1)
+ pipeline.window_decrease_type = conf_pipeline.get(
+ 'window-decrease-type', 'exponential')
+ pipeline.window_decrease_factor = conf_pipeline.get(
+ 'window-decrease-factor', 2)
+
manager = globals()[conf_pipeline['manager']](self, pipeline)
pipeline.setManager(manager)
layout.pipelines[conf_pipeline['name']] = pipeline
@@ -355,6 +369,11 @@
else:
push_refs = False
+ replicate_urls = []
+ if self.config.has_section('replication'):
+ for k, v in self.config.items('replication'):
+ replicate_urls.append(v)
+
if self.config.has_option('gerrit', 'sshkey'):
sshkey = self.config.get('gerrit', 'sshkey')
else:
@@ -365,7 +384,8 @@
# location.
self.merger = merger.Merger(self.triggers['gerrit'],
merge_root, push_refs,
- sshkey, merge_email, merge_name)
+ sshkey, merge_email, merge_name,
+ replicate_urls)
for project in self.layout.projects.values():
url = self.triggers['gerrit'].getGitUrl(project)
self.merger.addProject(project, url)
@@ -604,7 +624,10 @@
pipeline.manager.cancelJobs(item)
pipeline.manager.dequeueItem(item)
for item in items_to_enqueue:
- pipeline.manager.addChange(item.change, quiet=True)
+ pipeline.manager.addChange(
+ item.change,
+ enqueue_time=item.enqueue_time,
+ quiet=True)
while pipeline.manager.processQueue():
pass
@@ -763,6 +786,9 @@
def formatStatusJSON(self):
data = {}
+
+ data['zuul_version'] = self.zuul_version
+
if self._pause:
ret = '<p><b>Queue only mode:</b> preparing to '
if self._exit:
@@ -895,10 +921,6 @@
"""
report_errors = []
if len(action_reporters) > 0:
- if not change.number:
- self.log.debug("Not reporting change %s: No number present."
- % change)
- return
for action_reporter in action_reporters:
ret = action_reporter.report(change, message)
if ret:
@@ -967,7 +989,7 @@
item.change.project)
return False
- def addChange(self, change, quiet=False):
+ def addChange(self, change, quiet=False, enqueue_time=None):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@@ -994,6 +1016,8 @@
if len(self.pipeline.start_actions) > 0:
self.reportStart(change)
item = change_queue.enqueueChange(change)
+ if enqueue_time:
+ item.enqueue_time = enqueue_time
self.reportStats(item)
self.enqueueChangesBehind(change, quiet)
else:
@@ -1161,7 +1185,7 @@
for queue in self.pipeline.queues:
queue_changed = False
nnfi = None # Nearest non-failing item
- for item in queue.queue[:]:
+ for item in queue.getActionableItems():
item_changed, nnfi = self._processOneItem(item, nnfi)
if item_changed:
queue_changed = True
@@ -1222,7 +1246,7 @@
return True
def reportItem(self, item):
- if item.change.is_reportable and item.reported:
+ if item.reported:
raise Exception("Already reported change %s" % item.change)
ret = self._reportItem(item)
if self.changes_merge:
@@ -1236,12 +1260,19 @@
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (item.change))
+ item.change_queue.decreaseWindowSize()
+ self.log.debug("%s window size decreased to %s" %
+ (item.change_queue,
+ item.change_queue.window))
raise MergeFailure("Change %s failed to merge" % item.change)
+ else:
+ item.change_queue.increaseWindowSize()
+ self.log.debug("%s window size increased to %s" %
+ (item.change_queue,
+ item.change_queue.window))
def _reportItem(self, item):
- if not item.change.is_reportable:
- return False
- if item.change.is_reportable and item.reported:
+ if item.reported:
return 0
self.log.debug("Reporting change %s" % item.change)
ret = True # Means error as returned by trigger.report
@@ -1253,18 +1284,19 @@
else:
actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE')
- report = self.formatReport(item)
item.reported = True
- try:
- self.log.info("Reporting change %s, actions: %s" %
- (item.change, actions))
- ret = self.sendReport(actions, item.change, report)
- if ret:
- self.log.error("Reporting change %s received: %s" %
- (item.change, ret))
- except:
- self.log.exception("Exception while reporting:")
- item.setReportedResult('ERROR')
+ if actions:
+ report = self.formatReport(item)
+ try:
+ self.log.info("Reporting change %s, actions: %s" %
+ (item.change, actions))
+ ret = self.sendReport(actions, item.change, report)
+ if ret:
+ self.log.error("Reporting change %s received: %s" %
+ (item.change, ret))
+ except:
+ self.log.exception("Exception while reporting:")
+ item.setReportedResult('ERROR')
self.updateBuildDescriptions(item.current_build_set)
return ret
@@ -1494,7 +1526,14 @@
change_queues = []
for project in self.pipeline.getProjects():
- change_queue = ChangeQueue(self.pipeline)
+ change_queue = ChangeQueue(
+ self.pipeline,
+ window=self.pipeline.window,
+ window_floor=self.pipeline.window_floor,
+ window_increase_type=self.pipeline.window_increase_type,
+ window_increase_factor=self.pipeline.window_increase_factor,
+ window_decrease_type=self.pipeline.window_decrease_type,
+ window_decrease_factor=self.pipeline.window_decrease_factor)
change_queue.addProject(project)
change_queues.append(change_queue)
self.log.debug("Created queue: %s" % change_queue)
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index b75a165..f055a50 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -28,11 +28,12 @@
self.apsched = apscheduler.scheduler.Scheduler()
self.apsched.start()
- def _onTrigger(self, timespec):
+ def _onTrigger(self, pipeline_name, timespec):
for project in self.sched.layout.projects.values():
event = TriggerEvent()
event.type = 'timer'
event.timespec = timespec
+ event.forced_pipeline = pipeline_name
event.project_name = project.name
self.log.debug("Adding event %s" % event)
self.sched.addEvent(event)
@@ -78,7 +79,8 @@
hour=hour,
minute=minute,
second=second,
- args=(timespec,))
+ args=(pipeline.name,
+ timespec,))
def getChange(self, number, patchset, refresh=False):
raise Exception("Timer trigger does not support changes.")