Merge branch 'master' into feature/zuulv3
Conflicts:
zuul/model.py
zuul/scheduler.py
Change-Id: I2973bfae65b3658549dc13aa3ea0efe60669ba8e
diff --git a/.gitignore b/.gitignore
index b59cb77..f516785 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,7 +9,6 @@
AUTHORS
build/*
ChangeLog
-config
doc/build/*
zuul/versioninfo
dist/
diff --git a/.gitreview b/.gitreview
index 665adb6..9ba1bdc 100644
--- a/.gitreview
+++ b/.gitreview
@@ -2,3 +2,4 @@
host=review.openstack.org
port=29418
project=openstack-infra/zuul.git
+defaultbranch=feature/zuulv3
diff --git a/requirements.txt b/requirements.txt
index 8388f0b..84b9008 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,3 +15,4 @@
PrettyTable>=0.6,<0.8
babel>=1.0
six>=1.6.0
+ansible>=2.0.0.1
diff --git a/tests/base.py b/tests/base.py
index 405caa0..f8eed53 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -30,6 +30,7 @@
import string
import subprocess
import swiftclient
+import tempfile
import threading
import time
import urllib2
@@ -47,8 +48,10 @@
import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
-import zuul.launcher.gearman
+import zuul.launcher.ansiblelaunchserver
+import zuul.launcher.launchclient
import zuul.lib.swift
+import zuul.lib.connections
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
@@ -634,6 +637,23 @@
self.worker.lock.release()
+class RecordingLaunchServer(zuul.launcher.ansiblelaunchserver.LaunchServer):
+ def __init__(self, *args, **kw):
+ super(RecordingLaunchServer, self).__init__(*args, **kw)
+ self.job_history = []
+
+ def launch(self, job):
+ self.job_history.append(job)
+ job.data = []
+
+ def sendWorkComplete(data=b''):
+ job.data.append(data)
+ gear.WorkerJob.sendWorkComplete(job, data)
+
+ job.sendWorkComplete = sendWorkComplete
+ super(RecordingLaunchServer, self).launch(job)
+
+
class FakeWorker(gear.Worker):
def __init__(self, worker_id, test):
super(FakeWorker, self).__init__(worker_id)
@@ -864,6 +884,7 @@
class ZuulTestCase(BaseTestCase):
+ config_file = 'zuul.conf'
def setUp(self):
super(ZuulTestCase, self).setUp()
@@ -884,9 +905,9 @@
# Make per test copy of Configuration.
self.setup_config()
- self.config.set('zuul', 'layout_config',
+ self.config.set('zuul', 'tenant_config',
os.path.join(FIXTURE_DIR,
- self.config.get('zuul', 'layout_config')))
+ self.config.get('zuul', 'tenant_config')))
self.config.set('merger', 'git_dir', self.git_root)
# For each project in config:
@@ -907,6 +928,8 @@
self.init_repo("org/experimental-project")
self.init_repo("org/no-jobs-project")
+ self.setup_repos()
+
self.statsd = FakeStatsd()
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
@@ -921,10 +944,6 @@
self.config.set('gearman', 'port', str(self.gearman_server.port))
- self.worker = FakeWorker('fake_worker', self)
- self.worker.addServer('127.0.0.1', self.gearman_server.port)
- self.gearman_server.worker = self.worker
-
zuul.source.gerrit.GerritSource.replication_timeout = 1.5
zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
zuul.connection.gerrit.GerritEventConnector.delay = 0.0
@@ -940,9 +959,13 @@
self.sched.trigger_event_queue
]
- self.configure_connections()
+ self.configure_connections(self.sched)
self.sched.registerConnections(self.connections)
+ self.ansible_server = RecordingLaunchServer(
+ self.config, self.connections)
+ self.ansible_server.start()
+
def URLOpenerFactory(*args, **kw):
if isinstance(args[0], urllib2.Request):
return old_urlopen(*args, **kw)
@@ -955,8 +978,8 @@
self.connections)
self.merge_server.start()
- self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
- self.swift)
+ self.launcher = zuul.launcher.launchclient.LaunchClient(
+ self.config, self.sched, self.swift)
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
@@ -972,14 +995,11 @@
self.webapp.start()
self.rpc.start()
self.launcher.gearman.waitForServer()
- self.registerJobs()
- self.builds = self.worker.running_builds
- self.history = self.worker.build_history
self.addCleanup(self.assertFinalState)
self.addCleanup(self.shutdown)
- def configure_connections(self):
+ def configure_connections(self, sched):
# Register connections from the config
self.smtp_messages = []
@@ -993,7 +1013,7 @@
# a virtual canonical database given by the configured hostname
self.gerrit_changes_dbs = {}
self.gerrit_queues_dbs = {}
- self.connections = {}
+ self.connections = zuul.lib.connections.ConnectionRegistry(sched)
for section_name in self.config.sections():
con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
@@ -1018,15 +1038,16 @@
Queue.Queue()
self.event_queues.append(
self.gerrit_queues_dbs[con_config['server']])
- self.connections[con_name] = FakeGerritConnection(
+ self.connections.connections[con_name] = FakeGerritConnection(
con_name, con_config,
changes_db=self.gerrit_changes_dbs[con_config['server']],
queues_db=self.gerrit_queues_dbs[con_config['server']],
upstream_root=self.upstream_root
)
- setattr(self, 'fake_' + con_name, self.connections[con_name])
+ setattr(self, 'fake_' + con_name,
+ self.connections.connections[con_name])
elif con_driver == 'smtp':
- self.connections[con_name] = \
+ self.connections.connections[con_name] = \
zuul.connection.smtp.SMTPConnection(con_name, con_config)
else:
raise Exception("Unknown driver, %s, for connection %s"
@@ -1039,20 +1060,26 @@
self.gerrit_changes_dbs['gerrit'] = {}
self.gerrit_queues_dbs['gerrit'] = Queue.Queue()
self.event_queues.append(self.gerrit_queues_dbs['gerrit'])
- self.connections['gerrit'] = FakeGerritConnection(
+ self.connections.connections['gerrit'] = FakeGerritConnection(
'_legacy_gerrit', dict(self.config.items('gerrit')),
changes_db=self.gerrit_changes_dbs['gerrit'],
queues_db=self.gerrit_queues_dbs['gerrit'])
if 'smtp' in self.config.sections():
- self.connections['smtp'] = \
+ self.connections.connections['smtp'] = \
zuul.connection.smtp.SMTPConnection(
'_legacy_smtp', dict(self.config.items('smtp')))
- def setup_config(self, config_file='zuul.conf'):
+ def setup_config(self):
"""Per test config object. Override to set different config."""
self.config = ConfigParser.ConfigParser()
- self.config.read(os.path.join(FIXTURE_DIR, config_file))
+ self.config.read(os.path.join(FIXTURE_DIR, self.config_file))
+ if hasattr(self, 'tenant_config_file'):
+ self.config.set('zuul', 'tenant_config', self.tenant_config_file)
+
+ def setup_repos(self):
+ """Subclasses can override to manipulate repos before tests"""
+ pass
def assertFinalState(self):
# Make sure that git.Repo objects have been garbage collected.
@@ -1063,10 +1090,11 @@
repos.append(obj)
self.assertEqual(len(repos), 0)
self.assertEmptyQueues()
- for pipeline in self.sched.layout.pipelines.values():
- if isinstance(pipeline.manager,
- zuul.scheduler.IndependentPipelineManager):
- self.assertEqual(len(pipeline.queues), 0)
+ ipm = zuul.manager.independent.IndependentPipelineManager
+ for tenant in self.sched.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ if isinstance(pipeline.manager, ipm):
+ self.assertEqual(len(pipeline.queues), 0)
def shutdown(self):
self.log.debug("Shutting down after tests")
@@ -1074,7 +1102,6 @@
self.merge_server.stop()
self.merge_server.join()
self.merge_client.stop()
- self.worker.shutdown()
self.sched.stop()
self.sched.join()
self.statsd.stop()
@@ -1171,17 +1198,6 @@
self.log.debug(" OK")
return True
- def registerJobs(self):
- count = 0
- for job in self.sched.layout.jobs.keys():
- self.worker.registerFunction('build:' + job)
- count += 1
- self.worker.registerFunction('stop:' + self.worker.worker_id)
- count += 1
-
- while len(self.gearman_server.functions) < count:
- time.sleep(0)
-
def orderedRelease(self):
# Run one build at a time to ensure non-race order:
while len(self.builds):
@@ -1226,15 +1242,15 @@
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
- for build in self.worker.build_history:
- zbuild = self.launcher.builds.get(build.uuid)
+ for job in self.ansible_server.job_history:
+ zbuild = self.launcher.builds.get(job.unique)
if not zbuild:
# It has already been reported
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
- for connection in self.worker.active_connections:
+ for connection in self.ansible_server.worker.active_connections:
if connection.state == 'GRAB_WAIT':
return False
return True
@@ -1265,7 +1281,8 @@
return False
if server_job.waiting:
continue
- worker_job = self.worker.gearman_jobs.get(server_job.unique)
+ worker_job = self.ansible_server.worker.gearman_jobs.get(
+ server_job.unique)
if worker_job:
if build.number is None:
self.log.debug("%s has not reported start" % worker_job)
@@ -1298,23 +1315,20 @@
print self.areAllBuildsWaiting()
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
- self.worker.lock.acquire()
# have all build states propogated to zuul?
if self.haveAllBuildsReported():
# Join ensures that the queue is empty _and_ events have been
# processed
self.eventQueuesJoin()
self.sched.run_handler_lock.acquire()
- if (not self.merge_client.build_sets and
+ if (not self.merge_client.jobs and
all(self.eventQueuesEmpty()) and
self.haveAllBuildsReported() and
self.areAllBuildsWaiting()):
self.sched.run_handler_lock.release()
- self.worker.lock.release()
self.log.debug("...settled.")
return
self.sched.run_handler_lock.release()
- self.worker.lock.release()
self.sched.wake_event.wait(0.1)
def countJobResults(self, jobs, result):
@@ -1322,21 +1336,28 @@
return len(jobs)
def getJobFromHistory(self, name):
- history = self.worker.build_history
+ history = self.ansible_server.job_history
for job in history:
- if job.name == name:
- return job
+ params = json.loads(job.arguments)
+ if params['job'] == name:
+ result = json.loads(job.data[-1])
+ print result
+ ret = BuildHistory(job=job,
+ name=params['job'],
+ result=result['result'])
+ return ret
raise Exception("Unable to find job %s in history" % name)
def assertEmptyQueues(self):
# Make sure there are no orphaned jobs
- for pipeline in self.sched.layout.pipelines.values():
- for queue in pipeline.queues:
- if len(queue.queue) != 0:
- print 'pipeline %s queue %s contents %s' % (
- pipeline.name, queue.name, queue.queue)
- self.assertEqual(len(queue.queue), 0,
- "Pipelines queues should be empty")
+ for tenant in self.sched.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ for queue in pipeline.queues:
+ if len(queue.queue) != 0:
+ print 'pipeline %s queue %s contents %s' % (
+ pipeline.name, queue.name, queue.queue)
+ self.assertEqual(len(queue.queue), 0,
+ "Pipelines queues should be empty")
def assertReportedStat(self, key, value=None, kind=None):
start = time.time()
@@ -1357,3 +1378,35 @@
pprint.pprint(self.statsd.stats)
raise Exception("Key %s not found in reported stats" % key)
+
+ def getPipeline(self, name):
+ return self.sched.abide.tenants.values()[0].layout.pipelines.get(name)
+
+ def updateConfigLayout(self, path):
+ root = os.path.join(self.test_root, "config")
+ os.makedirs(root)
+ f = tempfile.NamedTemporaryFile(dir=root, delete=False)
+ f.write("""
+tenants:
+ - name: openstack
+ include:
+ - %s
+ """ % os.path.abspath(path))
+ f.close()
+ self.config.set('zuul', 'tenant_config', f.name)
+
+ def addCommitToRepo(self, project, message, files, branch='master'):
+ path = os.path.join(self.upstream_root, project)
+ repo = git.Repo(path)
+ repo.head.reference = branch
+ zuul.merger.merger.reset_repo_to_head(repo)
+ for fn, content in files.items():
+ fn = os.path.join(path, fn)
+ with open(fn, 'w') as f:
+ f.write(content)
+ repo.index.add([fn])
+ commit = repo.index.commit(message)
+ repo.heads[branch].commit = commit
+ repo.head.reference = branch
+ repo.git.clean('-x', '-f', '-d')
+ repo.heads[branch].checkout()
diff --git a/tests/fixtures/config/in-repo/common.yaml b/tests/fixtures/config/in-repo/common.yaml
new file mode 100644
index 0000000..58b2051
--- /dev/null
+++ b/tests/fixtures/config/in-repo/common.yaml
@@ -0,0 +1,37 @@
+- pipeline:
+ name: check
+ manager: independent
+ source:
+ gerrit
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ verified: 1
+ failure:
+ gerrit:
+ verified: -1
+
+- pipeline:
+ name: tenant-one-gate
+ manager: dependent
+ success-message: Build succeeded (tenant-one-gate).
+ source:
+ gerrit
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - approved: 1
+ success:
+ gerrit:
+ verified: 2
+ submit: true
+ failure:
+ gerrit:
+ verified: -2
+ start:
+ gerrit:
+ verified: 0
+ precedence: high
diff --git a/tests/fixtures/config/in-repo/main.yaml b/tests/fixtures/config/in-repo/main.yaml
new file mode 100644
index 0000000..e8b7665
--- /dev/null
+++ b/tests/fixtures/config/in-repo/main.yaml
@@ -0,0 +1,8 @@
+- tenant:
+ name: tenant-one
+ include:
+ - common.yaml
+ source:
+ gerrit:
+ repos:
+ - org/project
diff --git a/tests/fixtures/config/multi-tenant/common.yaml b/tests/fixtures/config/multi-tenant/common.yaml
new file mode 100644
index 0000000..6014227
--- /dev/null
+++ b/tests/fixtures/config/multi-tenant/common.yaml
@@ -0,0 +1,14 @@
+- pipeline:
+ name: check
+ manager: independent
+ source:
+ gerrit
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ verified: 1
+ failure:
+ gerrit:
+ verified: -1
diff --git a/tests/fixtures/config/multi-tenant/main.yaml b/tests/fixtures/config/multi-tenant/main.yaml
new file mode 100644
index 0000000..b9d780c
--- /dev/null
+++ b/tests/fixtures/config/multi-tenant/main.yaml
@@ -0,0 +1,11 @@
+- tenant:
+ name: tenant-one
+ include:
+ - common.yaml
+ - tenant-one.yaml
+
+- tenant:
+ name: tenant-two
+ include:
+ - common.yaml
+ - tenant-two.yaml
diff --git a/tests/fixtures/config/multi-tenant/tenant-one.yaml b/tests/fixtures/config/multi-tenant/tenant-one.yaml
new file mode 100644
index 0000000..86a98da
--- /dev/null
+++ b/tests/fixtures/config/multi-tenant/tenant-one.yaml
@@ -0,0 +1,35 @@
+- pipeline:
+ name: tenant-one-gate
+ manager: dependent
+ success-message: Build succeeded (tenant-one-gate).
+ source:
+ gerrit
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - approved: 1
+ success:
+ gerrit:
+ verified: 2
+ submit: true
+ failure:
+ gerrit:
+ verified: -2
+ start:
+ gerrit:
+ verified: 0
+ precedence: high
+
+- job:
+ name:
+ project1-test1
+
+- project:
+ name: org/project1
+ check:
+ jobs:
+ - project1-test1
+ tenant-one-gate:
+ jobs:
+ - project1-test1
diff --git a/tests/fixtures/config/multi-tenant/tenant-two.yaml b/tests/fixtures/config/multi-tenant/tenant-two.yaml
new file mode 100644
index 0000000..3f80a95
--- /dev/null
+++ b/tests/fixtures/config/multi-tenant/tenant-two.yaml
@@ -0,0 +1,35 @@
+- pipeline:
+ name: tenant-two-gate
+ manager: dependent
+ success-message: Build succeeded (tenant-two-gate).
+ source:
+ gerrit
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - approved: 1
+ success:
+ gerrit:
+ verified: 2
+ submit: true
+ failure:
+ gerrit:
+ verified: -2
+ start:
+ gerrit:
+ verified: 0
+ precedence: high
+
+- job:
+ name:
+ project2-test1
+
+- project:
+ name: org/project2
+ check:
+ jobs:
+ - project2-test1
+ tenant-two-gate:
+ jobs:
+ - project2-test1
diff --git a/tests/fixtures/config/project-template/common.yaml b/tests/fixtures/config/project-template/common.yaml
new file mode 100644
index 0000000..c6b237f
--- /dev/null
+++ b/tests/fixtures/config/project-template/common.yaml
@@ -0,0 +1,59 @@
+- pipeline:
+ name: check
+ manager: independent
+ source:
+ gerrit
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ verified: 1
+ failure:
+ gerrit:
+ verified: -1
+
+- pipeline:
+ name: gate
+ manager: dependent
+ success-message: Build succeeded (gate).
+ source:
+ gerrit
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - approved: 1
+ success:
+ gerrit:
+ verified: 2
+ submit: true
+ failure:
+ gerrit:
+ verified: -2
+ start:
+ gerrit:
+ verified: 0
+ precedence: high
+
+- job:
+ name:
+ project-test1
+
+- job:
+ name:
+ project-test2
+
+- project-template:
+ name: test-template
+ gate:
+ jobs:
+ - project-test2
+
+- project:
+ name: org/project
+ templates:
+ - test-template
+ gate:
+ jobs:
+ - project-test1
diff --git a/tests/fixtures/config/project-template/main.yaml b/tests/fixtures/config/project-template/main.yaml
new file mode 100644
index 0000000..25dea57
--- /dev/null
+++ b/tests/fixtures/config/project-template/main.yaml
@@ -0,0 +1,4 @@
+- tenant:
+ name: tenant-one
+ include:
+ - common.yaml
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 2e48ff1..7d52c17 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -3,7 +3,9 @@
pipelines:
- name: check
- manager: IndependentPipelineManager
+ manager: independent
+ source:
+ gerrit
trigger:
gerrit:
- event: patchset-created
@@ -15,15 +17,19 @@
verified: -1
- name: post
- manager: IndependentPipelineManager
+ manager: independent
+ source:
+ gerrit
trigger:
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
- name: gate
- manager: DependentPipelineManager
+ manager: dependent
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
+ source:
+ gerrit
trigger:
gerrit:
- event: comment-added
@@ -42,8 +48,10 @@
precedence: high
- name: unused
- manager: IndependentPipelineManager
+ manager: independent
dequeue-on-new-patchset: false
+ source:
+ gerrit
trigger:
gerrit:
- event: comment-added
@@ -51,7 +59,9 @@
- approved: 1
- name: dup1
- manager: IndependentPipelineManager
+ manager: independent
+ source:
+ gerrit
trigger:
gerrit:
- event: change-restored
@@ -63,7 +73,9 @@
verified: -1
- name: dup2
- manager: IndependentPipelineManager
+ manager: independent
+ source:
+ gerrit
trigger:
gerrit:
- event: change-restored
@@ -75,8 +87,10 @@
verified: -1
- name: conflict
- manager: DependentPipelineManager
+ manager: dependent
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
+ source:
+ gerrit
trigger:
gerrit:
- event: comment-added
@@ -94,7 +108,9 @@
verified: 0
- name: experimental
- manager: IndependentPipelineManager
+ manager: independent
+ source:
+ gerrit
trigger:
gerrit:
- event: patchset-created
diff --git a/tests/fixtures/main.yaml b/tests/fixtures/main.yaml
new file mode 100644
index 0000000..f9ec378
--- /dev/null
+++ b/tests/fixtures/main.yaml
@@ -0,0 +1,4 @@
+tenants:
+ - name: openstack
+ include:
+ - layout.yaml
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index b250c6d..c08b5ad 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -2,7 +2,7 @@
server=127.0.0.1
[zuul]
-layout_config=layout.yaml
+tenant_config=main.yaml
url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
job_name_in_report=true
diff --git a/tests/test_cloner.py b/tests/test_cloner.py
index 137c157..82d1812 100644
--- a/tests/test_cloner.py
+++ b/tests/test_cloner.py
@@ -37,11 +37,13 @@
workspace_root = None
def setUp(self):
+ self.skip("Disabled for early v3 development")
+
super(TestCloner, self).setUp()
self.workspace_root = os.path.join(self.test_root, 'workspace')
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-cloner.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-cloner.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -504,8 +506,8 @@
def test_periodic(self):
self.worker.hold_jobs_in_build = True
self.create_branch('org/project', 'stable/havana')
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-timer.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -519,8 +521,8 @@
self.worker.hold_jobs_in_build = False
# Stop queuing timer triggered jobs so that the assertions
# below don't race against more jobs being queued.
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-no-timer.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-no-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
self.worker.release()
diff --git a/tests/test_connection.py b/tests/test_connection.py
index c3458ac..cb3abd8 100644
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -21,6 +21,9 @@
class TestGerritConnection(testtools.TestCase):
+ def setUp(self):
+ self.skip("Disabled for early v3 development")
+
log = logging.getLogger("zuul.test_connection")
def test_driver_name(self):
@@ -29,6 +32,9 @@
class TestConnections(ZuulTestCase):
+ def setUp(self):
+ self.skip("Disabled for early v3 development")
+
def setup_config(self, config_file='zuul-connections-same-gerrit.conf'):
super(TestConnections, self).setup_config(config_file)
@@ -60,11 +66,13 @@
class TestMultipleGerrits(ZuulTestCase):
+ def setUp(self):
+ self.skip("Disabled for early v3 development")
+
def setup_config(self,
config_file='zuul-connections-multiple-gerrits.conf'):
super(TestMultipleGerrits, self).setup_config(config_file)
- self.config.set(
- 'zuul', 'layout_config',
+ self.self.updateConfigLayout(
'layout-connections-multiple-gerrits.yaml')
def test_multiple_project_separate_gerrits(self):
diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py
index 3dc3234..bd507d1 100644
--- a/tests/test_layoutvalidator.py
+++ b/tests/test_layoutvalidator.py
@@ -31,6 +31,9 @@
class TestLayoutValidator(testtools.TestCase):
+ def setUp(self):
+ self.skip("Disabled for early v3 development")
+
def test_layouts(self):
"""Test layout file validation"""
print
diff --git a/tests/test_merger_repo.py b/tests/test_merger_repo.py
index 454f3cc..7bf08ee 100644
--- a/tests/test_merger_repo.py
+++ b/tests/test_merger_repo.py
@@ -34,8 +34,11 @@
workspace_root = None
def setUp(self):
- super(TestMergerRepo, self).setUp()
- self.workspace_root = os.path.join(self.test_root, 'workspace')
+ self.skip("Disabled for early v3 development")
+
+ # def setUp(self):
+ # super(TestMergerRepo, self).setUp()
+ # self.workspace_root = os.path.join(self.test_root, 'workspace')
def test_ensure_cloned(self):
parent_path = os.path.join(self.upstream_root, 'org/project1')
diff --git a/tests/test_model.py b/tests/test_model.py
index 2711618..f8f74dc 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -12,8 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
-from zuul import change_matcher as cm
from zuul import model
+from zuul import configloader
from tests.base import BaseTestCase
@@ -22,11 +22,12 @@
@property
def job(self):
- job = model.Job('job')
- job.skip_if_matcher = cm.MatchAll([
- cm.ProjectMatcher('^project$'),
- cm.MatchAllFiles([cm.FileMatcher('^docs/.*$')]),
- ])
+ layout = model.Layout()
+ job = configloader.JobParser.fromYaml(layout, {
+ 'name': 'job',
+ 'irrelevant-files': [
+ '^docs/.*$'
+ ]})
return job
def test_change_matches_returns_false_for_matched_skip_if(self):
@@ -39,26 +40,61 @@
change.files = ['foo']
self.assertTrue(self.job.changeMatches(change))
- def test_copy_retains_skip_if(self):
- job = model.Job('job')
- job.copy(self.job)
- self.assertTrue(job.skip_if_matcher)
-
- def _assert_job_booleans_are_not_none(self, job):
- self.assertIsNotNone(job.voting)
- self.assertIsNotNone(job.hold_following_changes)
-
def test_job_sets_defaults_for_boolean_attributes(self):
- job = model.Job('job')
- self._assert_job_booleans_are_not_none(job)
+ self.assertIsNotNone(self.job.voting)
- def test_metajob_does_not_set_defaults_for_boolean_attributes(self):
- job = model.Job('^job')
- self.assertIsNone(job.voting)
- self.assertIsNone(job.hold_following_changes)
+ def test_job_inheritance(self):
+ layout = model.Layout()
+ base = configloader.JobParser.fromYaml(layout, {
+ 'name': 'base',
+ 'timeout': 30,
+ })
+ layout.addJob(base)
+ python27 = configloader.JobParser.fromYaml(layout, {
+ 'name': 'python27',
+ 'parent': 'base',
+ 'timeout': 40,
+ })
+ layout.addJob(python27)
+ python27diablo = configloader.JobParser.fromYaml(layout, {
+ 'name': 'python27',
+ 'branches': [
+ 'stable/diablo'
+ ],
+ 'timeout': 50,
+ })
+ layout.addJob(python27diablo)
- def test_metajob_copy_does_not_set_undefined_boolean_attributes(self):
- job = model.Job('job')
- metajob = model.Job('^job')
- job.copy(metajob)
- self._assert_job_booleans_are_not_none(job)
+ pipeline = model.Pipeline('gate', layout)
+ layout.addPipeline(pipeline)
+ queue = model.ChangeQueue(pipeline)
+
+ project = model.Project('project')
+ tree = pipeline.addProject(project)
+ tree.addJob(layout.getJob('python27'))
+
+ change = model.Change(project)
+ change.branch = 'master'
+ item = queue.enqueueChange(change)
+
+ self.assertTrue(base.changeMatches(change))
+ self.assertTrue(python27.changeMatches(change))
+ self.assertFalse(python27diablo.changeMatches(change))
+
+ item.freezeJobTree()
+ self.assertEqual(len(item.getJobs()), 1)
+ job = item.getJobs()[0]
+ self.assertEqual(job.name, 'python27')
+ self.assertEqual(job.timeout, 40)
+
+ change.branch = 'stable/diablo'
+
+ self.assertTrue(base.changeMatches(change))
+ self.assertTrue(python27.changeMatches(change))
+ self.assertTrue(python27diablo.changeMatches(change))
+
+ item.freezeJobTree()
+ self.assertEqual(len(item.getJobs()), 1)
+ job = item.getJobs()[0]
+ self.assertEqual(job.name, 'python27')
+ self.assertEqual(job.timeout, 50)
diff --git a/tests/test_requirements.py b/tests/test_requirements.py
index 3ae56ad..1cad659 100644
--- a/tests/test_requirements.py
+++ b/tests/test_requirements.py
@@ -27,6 +27,9 @@
class TestRequirements(ZuulTestCase):
"""Test pipeline and trigger requirements"""
+ def setUp(self):
+ self.skip("Disabled for early v3 development")
+
def test_pipeline_require_approval_newer_than(self):
"Test pipeline requirement: approval newer than"
return self._test_require_approval_newer_than('org/project1',
@@ -38,8 +41,8 @@
'project2-trigger')
def _test_require_approval_newer_than(self, project, job):
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-requirement-newer-than.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-requirement-newer-than.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -76,8 +79,8 @@
'project2-trigger')
def _test_require_approval_older_than(self, project, job):
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-requirement-older-than.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-requirement-older-than.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -114,8 +117,8 @@
'project2-trigger')
def _test_require_approval_username(self, project, job):
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-requirement-username.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-requirement-username.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -145,8 +148,8 @@
'project2-trigger')
def _test_require_approval_email(self, project, job):
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-requirement-email.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-requirement-email.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -176,8 +179,8 @@
'project2-trigger')
def _test_require_approval_vote1(self, project, job):
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-requirement-vote1.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-requirement-vote1.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -213,8 +216,8 @@
'project2-trigger')
def _test_require_approval_vote2(self, project, job):
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-requirement-vote2.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-requirement-vote2.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -261,9 +264,8 @@
def test_pipeline_require_current_patchset(self):
"Test pipeline requirement: current-patchset"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-requirement-'
- 'current-patchset.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-requirement-current-patchset.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
# Create two patchsets and let their tests settle out. Then
@@ -290,8 +292,8 @@
def test_pipeline_require_open(self):
"Test pipeline requirement: open"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-requirement-open.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-requirement-open.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -308,8 +310,8 @@
def test_pipeline_require_status(self):
"Test pipeline requirement: status"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-requirement-status.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-requirement-status.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -327,8 +329,7 @@
def _test_require_reject_username(self, project, job):
"Test negative username's match"
# Should only trigger if Jenkins hasn't voted.
- self.config.set(
- 'zuul', 'layout_config',
+ self.updateConfigLayout(
'tests/fixtures/layout-requirement-reject-username.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -369,8 +370,7 @@
def _test_require_reject(self, project, job):
"Test no approval matches a reject param"
- self.config.set(
- 'zuul', 'layout_config',
+ self.updateConfigLayout(
'tests/fixtures/layout-requirement-reject.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 499786c..7ae7de5 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -46,6 +46,9 @@
class TestSchedulerConfigParsing(BaseTestCase):
+ def setUp(self):
+ self.skip("Disabled for early v3 development")
+
def test_parse_skip_if(self):
job_yaml = """
jobs:
@@ -80,6 +83,9 @@
class TestScheduler(ZuulTestCase):
+ def setUp(self):
+ self.skip("Disabled for early v3 development")
+
def test_jobs_launched(self):
"Test that jobs are launched and a change is merged"
@@ -918,8 +924,8 @@
def test_post_ignore_deletes_negative(self):
"Test that deleting refs does trigger post jobs"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-dont-ignore-deletes.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-dont-ignore-deletes.yaml')
self.sched.reconfigure(self.config)
e = {
@@ -1769,8 +1775,8 @@
self.worker.hold_jobs_in_build = True
# Start timer trigger - also org/project
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-idle.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-idle.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
# The pipeline triggers every second, so we should have seen
@@ -1779,8 +1785,8 @@
self.waitUntilSettled()
# Stop queuing timer triggered jobs so that the assertions
# below don't race against more jobs being queued.
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-no-timer.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-no-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
self.assertEqual(len(self.builds), 2, "Two timer jobs")
@@ -2075,8 +2081,8 @@
self.waitUntilSettled()
self.assertEqual(len(self.gearman_server.getQueue()), 1)
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-no-jobs.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-no-jobs.yaml')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
@@ -2136,8 +2142,8 @@
def _test_skip_if_jobs(self, branch, should_skip):
"Test that jobs with a skip-if filter run only when appropriate"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-skip-if.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-skip-if.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -2163,7 +2169,7 @@
def test_test_config(self):
"Test that we can test the config"
- self.sched.testConfig(self.config.get('zuul', 'layout_config'),
+ self.sched.testConfig(self.config.get('zuul', 'tenant_config'),
self.connections)
def test_build_description(self):
@@ -2193,8 +2199,8 @@
self.assertEqual(q1.name, 'integration')
self.assertEqual(q2.name, 'integration')
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-bad-queue.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-bad-queue.yaml')
with testtools.ExpectedException(
Exception, "More than one name assigned to change queue"):
self.sched.reconfigure(self.config)
@@ -2274,8 +2280,8 @@
def test_merging_queues(self):
"Test that transitively-connected change queues are merged"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-merge-queues.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-merge-queues.yaml')
self.sched.reconfigure(self.config)
self.assertEqual(len(self.sched.layout.pipelines['gate'].queues), 1)
@@ -2415,9 +2421,8 @@
self.assertEqual(len(self.history), 0)
# Add the "project-test3" job.
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-live-'
- 'reconfiguration-add-job.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-live-reconfiguration-add-job.yaml')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
@@ -2478,9 +2483,8 @@
self.assertEqual(len(self.history), 2)
# Add the "project-test3" job.
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-live-'
- 'reconfiguration-add-job.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-live-reconfiguration-add-job.yaml')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
@@ -2532,9 +2536,8 @@
self.assertEqual(len(self.history), 2)
# Remove the test1 job.
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-live-'
- 'reconfiguration-failed-job.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-live-reconfiguration-failed-job.yaml')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
@@ -2582,9 +2585,8 @@
self.assertEqual(len(self.history), 2)
# Remove the integration job.
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-live-'
- 'reconfiguration-shared-queue.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-live-reconfiguration-shared-queue.yaml')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
@@ -2677,9 +2679,8 @@
self.assertEqual(len(self.builds), 5)
# This layout defines only org/project, not org/project1
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-live-'
- 'reconfiguration-del-project.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-live-reconfiguration-del-project.yaml')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
@@ -2720,9 +2721,8 @@
'debian')
self.assertIsNone(self.getJobFromHistory('node-project-test2').node)
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-live-'
- 'reconfiguration-functions.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-live-reconfiguration-functions.yaml')
self.sched.reconfigure(self.config)
self.worker.build_history = []
@@ -2737,8 +2737,8 @@
self.assertIsNone(self.getJobFromHistory('node-project-test2').node)
def test_delayed_repo_init(self):
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-delayed-repo-init.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-delayed-repo-init.yaml')
self.sched.reconfigure(self.config)
self.init_repo("org/new-project")
@@ -2757,8 +2757,8 @@
self.assertEqual(A.reported, 2)
def test_repo_deleted(self):
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-repo-deleted.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-repo-deleted.yaml')
self.sched.reconfigure(self.config)
self.init_repo("org/delete-project")
@@ -2815,8 +2815,8 @@
def test_timer(self):
"Test that a periodic job is triggered"
self.worker.hold_jobs_in_build = True
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-timer.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -2835,8 +2835,8 @@
self.worker.hold_jobs_in_build = False
# Stop queuing timer triggered jobs so that the assertions
# below don't race against more jobs being queued.
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-no-timer.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-no-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
self.worker.release()
@@ -2867,8 +2867,8 @@
# Test that timer triggers periodic jobs even across
# layout config reloads.
# Start timer trigger
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-idle.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-idle.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
self.waitUntilSettled()
@@ -2879,8 +2879,8 @@
# Stop queuing timer triggered jobs so that the assertions
# below don't race against more jobs being queued.
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-no-timer.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-no-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
self.waitUntilSettled()
@@ -2892,8 +2892,8 @@
self.assertEqual(len(self.history), x * 2)
def test_check_smtp_pool(self):
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-smtp.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-smtp.yaml')
self.sched.reconfigure(self.config)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -2925,8 +2925,8 @@
def test_timer_smtp(self):
"Test that a periodic job is triggered"
self.worker.hold_jobs_in_build = True
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-timer-smtp.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-timer-smtp.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -2960,8 +2960,8 @@
# Stop queuing timer triggered jobs and let any that may have
# queued through so that end of test assertions pass.
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-no-timer.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-no-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
self.waitUntilSettled()
@@ -3222,8 +3222,8 @@
def test_queue_rate_limiting(self):
"Test that DependentPipelines are rate limited with dep across window"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-rate-limit.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-rate-limit.yaml')
self.sched.reconfigure(self.config)
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -3313,8 +3313,8 @@
def test_queue_rate_limiting_dependent(self):
"Test that DependentPipelines are rate limited with dep in window"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-rate-limit.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-rate-limit.yaml')
self.sched.reconfigure(self.config)
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -3424,8 +3424,8 @@
def test_footer_message(self):
"Test a pipeline's footer message is correctly added to the report."
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-footer-message.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-footer-message.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -3465,8 +3465,8 @@
def test_merge_failure_reporters(self):
"""Check that the config is set up correctly"""
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-merge-failure.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-merge-failure.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -3510,8 +3510,8 @@
def test_merge_failure_reports(self):
"""Check that when a change fails to merge the correct message is sent
to the correct reporter"""
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-merge-failure.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-merge-failure.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -3569,8 +3569,8 @@
def test_swift_instructions(self):
"Test that the correct swift instructions are sent to the workers"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-swift.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-swift.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -4132,8 +4132,8 @@
def test_crd_check_ignore_dependencies(self):
"Test cross-repo dependencies can be ignored"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-ignore-dependencies.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-ignore-dependencies.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -4218,8 +4218,8 @@
def test_disable_at(self):
"Test a pipeline will only report to the disabled trigger when failing"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-disable-at.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-disable-at.yaml')
self.sched.reconfigure(self.config)
self.assertEqual(3, self.sched.layout.pipelines['check'].disable_at)
diff --git a/tests/test_v3.py b/tests/test_v3.py
new file mode 100644
index 0000000..8425383
--- /dev/null
+++ b/tests/test_v3.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python
+
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import textwrap
+
+from tests.base import (
+ ZuulTestCase,
+)
+
+logging.basicConfig(level=logging.DEBUG,
+ format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s')
+
+
+class TestMultipleTenants(ZuulTestCase):
+ # A temporary class to hold new tests while others are disabled
+
+ tenant_config_file = 'config/multi-tenant/main.yaml'
+
+ def test_multiple_tenants(self):
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ self.assertEqual(self.getJobFromHistory('project1-test1').result,
+ 'SUCCESS')
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2,
+ "A should report start and success")
+ self.assertIn('tenant-one-gate', A.messages[1],
+ "A should transit tenant-one gate")
+ self.assertNotIn('tenant-two-gate', A.messages[1],
+ "A should *not* transit tenant-two gate")
+
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ B.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ self.assertEqual(self.getJobFromHistory('project2-test1').result,
+ 'SUCCESS')
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(B.reported, 2,
+ "B should report start and success")
+ self.assertIn('tenant-two-gate', B.messages[1],
+ "B should transit tenant-two gate")
+ self.assertNotIn('tenant-one-gate', B.messages[1],
+ "B should *not* transit tenant-one gate")
+
+ self.assertEqual(A.reported, 2, "Activity in tenant two should"
+ "not affect tenant one")
+
+
+class TestInRepoConfig(ZuulTestCase):
+ # A temporary class to hold new tests while others are disabled
+
+ tenant_config_file = 'config/in-repo/main.yaml'
+
+ def setup_repos(self):
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+
+ - project:
+ name: org/project
+ tenant-one-gate:
+ jobs:
+ - project-test1
+ """)
+
+ self.addCommitToRepo('org/project', 'add zuul conf',
+ {'.zuul.yaml': in_repo_conf})
+
+ def test_in_repo_config(self):
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ self.assertEqual(self.getJobFromHistory('project-test1').result,
+ 'SUCCESS')
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2,
+ "A should report start and success")
+ self.assertIn('tenant-one-gate', A.messages[1],
+ "A should transit tenant-one gate")
+
+
+class TestProjectTemplate(ZuulTestCase):
+ tenant_config_file = 'config/project-template/main.yaml'
+
+ def test(self):
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ self.assertEqual(self.getJobFromHistory('project-test1').result,
+ 'SUCCESS')
+ self.assertEqual(self.getJobFromHistory('project-test2').result,
+ 'SUCCESS')
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2,
+ "A should report start and success")
+ self.assertIn('gate', A.messages[1],
+ "A should transit gate")
diff --git a/tests/test_webapp.py b/tests/test_webapp.py
index b127c51..bc5961f 100644
--- a/tests/test_webapp.py
+++ b/tests/test_webapp.py
@@ -29,6 +29,8 @@
self.waitUntilSettled()
def setUp(self):
+ self.skip("Disabled for early v3 development")
+
super(TestWebapp, self).setUp()
self.addCleanup(self._cleanup)
self.worker.hold_jobs_in_build = True
diff --git a/tests/test_zuultrigger.py b/tests/test_zuultrigger.py
index 0d52fc9..0442c2f 100644
--- a/tests/test_zuultrigger.py
+++ b/tests/test_zuultrigger.py
@@ -26,10 +26,13 @@
class TestZuulTrigger(ZuulTestCase):
"""Test Zuul Trigger"""
+ def setUp(self):
+ self.skip("Disabled for early v3 development")
+
def test_zuul_trigger_parent_change_enqueued(self):
"Test Zuul trigger event: parent-change-enqueued"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-zuultrigger-enqueued.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-zuultrigger-enqueued.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
@@ -74,8 +77,8 @@
def test_zuul_trigger_project_change_merged(self):
"Test Zuul trigger event: project-change-merged"
- self.config.set('zuul', 'layout_config',
- 'tests/fixtures/layout-zuultrigger-merged.yaml')
+ self.updateConfigLayout(
+ 'tests/fixtures/layout-zuultrigger-merged.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 2902c50..f6b9fe1 100644
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -90,6 +90,6 @@
else:
logging.basicConfig(level=logging.DEBUG)
- def configure_connections(self):
- self.connections = zuul.lib.connections.configure_connections(
- self.config)
+ def configure_connections(self, sched):
+ self.connections = zuul.lib.connections.ConnectionRegistry()
+ self.connections.configure(self.config, sched)
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 2aca4f2..2c891b5 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -177,7 +177,7 @@
webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry)
rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
- self.configure_connections()
+ self.configure_connections(self.sched)
self.sched.setLauncher(gearman)
self.sched.setMerger(merger)
diff --git a/zuul/configloader.py b/zuul/configloader.py
new file mode 100644
index 0000000..26db6ed
--- /dev/null
+++ b/zuul/configloader.py
@@ -0,0 +1,540 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import logging
+import yaml
+
+import voluptuous as vs
+
+import model
+import zuul.manager
+import zuul.manager.dependent
+import zuul.manager.independent
+from zuul import change_matcher
+
+
+# Several forms accept either a single item or a list, this makes
+# specifying that in the schema easy (and explicit).
+def to_list(x):
+ return vs.Any([x], x)
+
+
+def as_list(item):
+ if not item:
+ return []
+ if isinstance(item, list):
+ return item
+ return [item]
+
+
+class JobParser(object):
+ @staticmethod
+ def getSchema():
+ # TODOv3(jeblair, jhesketh): move to auth
+ swift = {vs.Required('name'): str,
+ 'container': str,
+ 'expiry': int,
+ 'max_file_size': int,
+ 'max-file-size': int,
+ 'max_file_count': int,
+ 'max-file-count': int,
+ 'logserver_prefix': str,
+ 'logserver-prefix': str,
+ }
+
+ job = {vs.Required('name'): str,
+ 'parent': str,
+ 'queue-name': str,
+ 'failure-message': str,
+ 'success-message': str,
+ 'failure-url': str,
+ 'success-url': str,
+ 'voting': bool,
+ 'mutex': str,
+ 'tags': to_list(str),
+ 'branches': to_list(str),
+ 'files': to_list(str),
+ 'swift': to_list(swift),
+ 'irrelevant-files': to_list(str),
+ 'timeout': int,
+ }
+
+ return vs.Schema(job)
+
+ @staticmethod
+ def fromYaml(layout, conf):
+ JobParser.getSchema()(conf)
+ job = model.Job(conf['name'])
+ if 'parent' in conf:
+ parent = layout.getJob(conf['parent'])
+ job.inheritFrom(parent)
+ job.timeout = conf.get('timeout', job.timeout)
+ job.workspace = conf.get('workspace', job.workspace)
+ job.pre_run = as_list(conf.get('pre-run', job.pre_run))
+ job.post_run = as_list(conf.get('post-run', job.post_run))
+ job.voting = conf.get('voting', True)
+ job.mutex = conf.get('mutex', None)
+ tags = conf.get('tags')
+ if tags:
+ # Tags are merged via a union rather than a
+ # destructive copy because they are intended to
+ # accumulate onto any previously applied tags from
+ # metajobs.
+ job.tags = job.tags.union(set(tags))
+
+ job.failure_message = conf.get('failure-message', job.failure_message)
+ job.success_message = conf.get('success-message', job.success_message)
+ job.failure_url = conf.get('failure-url', job.failure_url)
+ job.success_url = conf.get('success-url', job.success_url)
+ if 'branches' in conf:
+ matchers = []
+ for branch in as_list(conf['branches']):
+ matchers.append(change_matcher.BranchMatcher(branch))
+ job.branch_matcher = change_matcher.MatchAny(matchers)
+ if 'files' in conf:
+ matchers = []
+ for fn in as_list(conf['files']):
+ matchers.append(change_matcher.FileMatcher(fn))
+ job.file_matcher = change_matcher.MatchAny(matchers)
+ if 'irrelevant-files' in conf:
+ matchers = []
+ for fn in as_list(conf['irrelevant-files']):
+ matchers.append(change_matcher.FileMatcher(fn))
+ job.irrelevant_file_matcher = change_matcher.MatchAllFiles(
+ matchers)
+ return job
+
+
+class ProjectTemplateParser(object):
+ log = logging.getLogger("zuul.ProjectTemplateParser")
+
+ @staticmethod
+ def getSchema(layout):
+ project_template = {vs.Required('name'): str}
+ for p in layout.pipelines.values():
+ project_template[p.name] = {'queue': str,
+ 'jobs': [vs.Any(str, dict)]}
+ return vs.Schema(project_template)
+
+ @staticmethod
+ def fromYaml(layout, conf):
+ ProjectTemplateParser.getSchema(layout)(conf)
+ project_template = model.ProjectConfig(conf['name'])
+ for pipeline in layout.pipelines.values():
+ conf_pipeline = conf.get(pipeline.name)
+ if not conf_pipeline:
+ continue
+ project_pipeline = model.ProjectPipelineConfig()
+ project_template.pipelines[pipeline.name] = project_pipeline
+ project_pipeline.queue_name = conf.get('queue')
+ project_pipeline.job_tree = ProjectTemplateParser._parseJobTree(
+ layout, conf_pipeline.get('jobs'))
+ return project_template
+
+ @staticmethod
+ def _parseJobTree(layout, conf, tree=None):
+ if not tree:
+ tree = model.JobTree(None)
+ for conf_job in conf:
+ if isinstance(conf_job, basestring):
+ tree.addJob(layout.getJob(conf_job))
+ elif isinstance(conf_job, dict):
+ # A dictionary in a job tree may override params, or
+ # be the root of a sub job tree, or both.
+ jobname, attrs = dict.items()[0]
+ jobs = attrs.pop('jobs')
+ if attrs:
+ # We are overriding params, so make a new job def
+ attrs['name'] = jobname
+ subtree = tree.addJob(JobParser.fromYaml(layout, attrs))
+ else:
+ # Not overriding, so get existing job
+ subtree = tree.addJob(layout.getJob(jobname))
+
+ if jobs:
+ # This is the root of a sub tree
+ ProjectTemplateParser._parseJobTree(layout, jobs, subtree)
+ else:
+ raise Exception("Job must be a string or dictionary")
+ return tree
+
+
+class ProjectParser(object):
+ log = logging.getLogger("zuul.ProjectParser")
+
+ @staticmethod
+ def getSchema(layout):
+ project = {vs.Required('name'): str,
+ 'templates': [str]}
+ for p in layout.pipelines.values():
+ project[p.name] = {'queue': str,
+ 'jobs': [vs.Any(str, dict)]}
+ return vs.Schema(project)
+
+ @staticmethod
+ def fromYaml(layout, conf):
+ ProjectParser.getSchema(layout)(conf)
+ conf_templates = conf.pop('templates', [])
+ # The way we construct a project definition is by parsing the
+ # definition as a template, then applying all of the
+ # templates, including the newly parsed one, in order.
+ project_template = ProjectTemplateParser.fromYaml(layout, conf)
+ configs = [layout.project_templates[name] for name in conf_templates]
+ configs.append(project_template)
+ project = model.ProjectConfig(conf['name'])
+ for pipeline in layout.pipelines.values():
+ project_pipeline = model.ProjectPipelineConfig()
+ project_pipeline.job_tree = model.JobTree(None)
+ queue_name = None
+ # For every template, iterate over the job tree and replace or
+ # create the jobs in the final definition as needed.
+ pipeline_defined = False
+ for template in configs:
+ ProjectParser.log.debug("Applying template %s to pipeline %s" %
+ (template.name, pipeline.name))
+ if pipeline.name in template.pipelines:
+ pipeline_defined = True
+ template_pipeline = template.pipelines[pipeline.name]
+ project_pipeline.job_tree.inheritFrom(
+ template_pipeline.job_tree)
+ if template_pipeline.queue_name:
+ queue_name = template_pipeline.queue_name
+ if queue_name:
+ project_pipeline.queue_name = queue_name
+ if pipeline_defined:
+ project.pipelines[pipeline.name] = project_pipeline
+ return project
+
+
+class PipelineParser(object):
+ log = logging.getLogger("zuul.PipelineParser")
+
+ # A set of reporter configuration keys to action mapping
+ reporter_actions = {
+ 'start': 'start_actions',
+ 'success': 'success_actions',
+ 'failure': 'failure_actions',
+ 'merge-failure': 'merge_failure_actions',
+ 'disabled': 'disabled_actions',
+ }
+
+ @staticmethod
+ def getDriverSchema(dtype, connections):
+ # TODO(jhesketh): Make the driver discovery dynamic
+ connection_drivers = {
+ 'trigger': {
+ 'gerrit': 'zuul.trigger.gerrit',
+ },
+ 'reporter': {
+ 'gerrit': 'zuul.reporter.gerrit',
+ 'smtp': 'zuul.reporter.smtp',
+ },
+ }
+ standard_drivers = {
+ 'trigger': {
+ 'timer': 'zuul.trigger.timer',
+ 'zuul': 'zuul.trigger.zuultrigger',
+ }
+ }
+
+ schema = {}
+ # Add the configured connections as available layout options
+ for connection_name, connection in connections.connections.items():
+ for dname, dmod in connection_drivers.get(dtype, {}).items():
+ if connection.driver_name == dname:
+ schema[connection_name] = to_list(__import__(
+ connection_drivers[dtype][dname],
+ fromlist=['']).getSchema())
+
+ # Standard drivers are always available and don't require a unique
+ # (connection) name
+ for dname, dmod in standard_drivers.get(dtype, {}).items():
+ schema[dname] = to_list(__import__(
+ standard_drivers[dtype][dname], fromlist=['']).getSchema())
+
+ return schema
+
+ @staticmethod
+ def getSchema(layout, connections):
+ manager = vs.Any('independent',
+ 'dependent')
+
+ precedence = vs.Any('normal', 'low', 'high')
+
+ approval = vs.Schema({'username': str,
+ 'email-filter': str,
+ 'email': str,
+ 'older-than': str,
+ 'newer-than': str,
+ }, extra=True)
+
+ require = {'approval': to_list(approval),
+ 'open': bool,
+ 'current-patchset': bool,
+ 'status': to_list(str)}
+
+ reject = {'approval': to_list(approval)}
+
+ window = vs.All(int, vs.Range(min=0))
+ window_floor = vs.All(int, vs.Range(min=1))
+ window_type = vs.Any('linear', 'exponential')
+ window_factor = vs.All(int, vs.Range(min=1))
+
+ pipeline = {vs.Required('name'): str,
+ vs.Required('manager'): manager,
+ 'source': str,
+ 'precedence': precedence,
+ 'description': str,
+ 'require': require,
+ 'reject': reject,
+ 'success-message': str,
+ 'failure-message': str,
+ 'merge-failure-message': str,
+ 'footer-message': str,
+ 'dequeue-on-new-patchset': bool,
+ 'ignore-dependencies': bool,
+ 'disable-after-consecutive-failures':
+ vs.All(int, vs.Range(min=1)),
+ 'window': window,
+ 'window-floor': window_floor,
+ 'window-increase-type': window_type,
+ 'window-increase-factor': window_factor,
+ 'window-decrease-type': window_type,
+ 'window-decrease-factor': window_factor,
+ }
+ pipeline['trigger'] = vs.Required(
+ PipelineParser.getDriverSchema('trigger', connections))
+ for action in ['start', 'success', 'failure', 'merge-failure',
+ 'disabled']:
+ pipeline[action] = PipelineParser.getDriverSchema('reporter',
+ connections)
+ return vs.Schema(pipeline)
+
+ @staticmethod
+ def fromYaml(layout, connections, scheduler, conf):
+ PipelineParser.getSchema(layout, connections)(conf)
+ pipeline = model.Pipeline(conf['name'], layout)
+ pipeline.description = conf.get('description')
+
+ pipeline.source = connections.getSource(conf['source'])
+
+ precedence = model.PRECEDENCE_MAP[conf.get('precedence')]
+ pipeline.precedence = precedence
+ pipeline.failure_message = conf.get('failure-message',
+ "Build failed.")
+ pipeline.merge_failure_message = conf.get(
+ 'merge-failure-message', "Merge Failed.\n\nThis change or one "
+ "of its cross-repo dependencies was unable to be "
+ "automatically merged with the current state of its "
+ "repository. Please rebase the change and upload a new "
+ "patchset.")
+ pipeline.success_message = conf.get('success-message',
+ "Build succeeded.")
+ pipeline.footer_message = conf.get('footer-message', "")
+ pipeline.dequeue_on_new_patchset = conf.get(
+ 'dequeue-on-new-patchset', True)
+ pipeline.ignore_dependencies = conf.get(
+ 'ignore-dependencies', False)
+
+ for conf_key, action in PipelineParser.reporter_actions.items():
+ reporter_set = []
+ if conf.get(conf_key):
+ for reporter_name, params \
+ in conf.get(conf_key).items():
+ reporter = connections.getReporter(reporter_name,
+ params)
+ reporter.setAction(conf_key)
+ reporter_set.append(reporter)
+ setattr(pipeline, action, reporter_set)
+
+ # If merge-failure actions aren't explicit, use the failure actions
+ if not pipeline.merge_failure_actions:
+ pipeline.merge_failure_actions = pipeline.failure_actions
+
+ pipeline.disable_at = conf.get(
+ 'disable-after-consecutive-failures', None)
+
+ pipeline.window = conf.get('window', 20)
+ pipeline.window_floor = conf.get('window-floor', 3)
+ pipeline.window_increase_type = conf.get(
+ 'window-increase-type', 'linear')
+ pipeline.window_increase_factor = conf.get(
+ 'window-increase-factor', 1)
+ pipeline.window_decrease_type = conf.get(
+ 'window-decrease-type', 'exponential')
+ pipeline.window_decrease_factor = conf.get(
+ 'window-decrease-factor', 2)
+
+ manager_name = conf['manager']
+ if manager_name == 'dependent':
+ manager = zuul.manager.dependent.DependentPipelineManager(
+ scheduler, pipeline)
+ elif manager_name == 'independent':
+ manager = zuul.manager.independent.IndependentPipelineManager(
+ scheduler, pipeline)
+
+ pipeline.setManager(manager)
+ layout.pipelines[conf['name']] = pipeline
+
+ if 'require' in conf or 'reject' in conf:
+ require = conf.get('require', {})
+ reject = conf.get('reject', {})
+ f = model.ChangeishFilter(
+ open=require.get('open'),
+ current_patchset=require.get('current-patchset'),
+ statuses=to_list(require.get('status')),
+ required_approvals=to_list(require.get('approval')),
+ reject_approvals=to_list(reject.get('approval'))
+ )
+ manager.changeish_filters.append(f)
+
+ for trigger_name, trigger_config\
+ in conf.get('trigger').items():
+ trigger = connections.getTrigger(trigger_name, trigger_config)
+ pipeline.triggers.append(trigger)
+
+ # TODO: move
+ manager.event_filters += trigger.getEventFilters(
+ conf['trigger'][trigger_name])
+
+ return pipeline
+
+
+class TenantParser(object):
+ log = logging.getLogger("zuul.TenantParser")
+
+ tenant_source = vs.Schema({'repos': [str]})
+
+ @staticmethod
+ def validateTenantSources(connections):
+ def v(value, path=[]):
+ if isinstance(value, dict):
+ for k, val in value.items():
+ connections.getSource(k)
+ TenantParser.validateTenantSource(val, path + [k])
+ else:
+ raise vs.Invalid("Invalid tenant source", path)
+ return v
+
+ @staticmethod
+ def validateTenantSource(value, path=[]):
+ TenantParser.tenant_source(value)
+
+ @staticmethod
+ def getSchema(connections=None):
+ tenant = {vs.Required('name'): str,
+ 'include': to_list(str),
+ 'source': TenantParser.validateTenantSources(connections)}
+ return vs.Schema(tenant)
+
+ @staticmethod
+ def fromYaml(base, connections, scheduler, merger, conf):
+ TenantParser.getSchema(connections)(conf)
+ tenant = model.Tenant(conf['name'])
+ tenant_config = model.UnparsedTenantConfig()
+ for fn in conf.get('include', []):
+ if not os.path.isabs(fn):
+ fn = os.path.join(base, fn)
+ fn = os.path.expanduser(fn)
+ with open(fn) as config_file:
+ TenantParser.log.info("Loading configuration from %s" % (fn,))
+ incdata = yaml.load(config_file)
+ tenant_config.extend(incdata)
+ incdata = TenantParser._loadTenantInRepoLayouts(merger, connections,
+ conf)
+ tenant_config.extend(incdata)
+ tenant.layout = TenantParser._parseLayout(base, tenant_config,
+ scheduler, connections)
+ return tenant
+
+ @staticmethod
+ def _loadTenantInRepoLayouts(merger, connections, conf_tenant):
+ config = model.UnparsedTenantConfig()
+ jobs = []
+ for source_name, conf_source in conf_tenant.get('source', {}).items():
+ source = connections.getSource(source_name)
+ for conf_repo in conf_source.get('repos'):
+ project = source.getProject(conf_repo)
+ url = source.getGitUrl(project)
+ # TODOv3(jeblair): config should be branch specific
+ job = merger.getFiles(project.name, url, 'master',
+ files=['.zuul.yaml'])
+ job.project = project
+ jobs.append(job)
+ for job in jobs:
+ TenantParser.log.debug("Waiting for cat job %s" % (job,))
+ job.wait()
+ if job.files.get('.zuul.yaml'):
+ TenantParser.log.info(
+ "Loading configuration from %s/.zuul.yaml" %
+ (job.project,))
+ incdata = TenantParser._parseInRepoLayout(
+ job.files['.zuul.yaml'])
+ config.extend(incdata)
+ return config
+
+ @staticmethod
+ def _parseInRepoLayout(data):
+ # TODOv3(jeblair): this should implement some rules to protect
+ # aspects of the config that should not be changed in-repo
+ return yaml.load(data)
+
+ @staticmethod
+ def _parseLayout(base, data, scheduler, connections):
+ layout = model.Layout()
+
+ for config_pipeline in data.pipelines:
+ layout.addPipeline(PipelineParser.fromYaml(layout, connections,
+ scheduler,
+ config_pipeline))
+
+ for config_job in data.jobs:
+ layout.addJob(JobParser.fromYaml(layout, config_job))
+
+ for config_template in data.project_templates:
+ layout.addProjectTemplate(ProjectTemplateParser.fromYaml(
+ layout, config_template))
+
+ for config_project in data.projects:
+ layout.addProjectConfig(ProjectParser.fromYaml(
+ layout, config_project))
+
+ for pipeline in layout.pipelines.values():
+ pipeline.manager._postConfig(layout)
+
+ return layout
+
+
+class ConfigLoader(object):
+ log = logging.getLogger("zuul.ConfigLoader")
+
+ def loadConfig(self, config_path, scheduler, merger, connections):
+ abide = model.Abide()
+
+ if config_path:
+ config_path = os.path.expanduser(config_path)
+ if not os.path.exists(config_path):
+ raise Exception("Unable to read tenant config file at %s" %
+ config_path)
+ with open(config_path) as config_file:
+ self.log.info("Loading configuration from %s" % (config_path,))
+ data = yaml.load(config_file)
+ config = model.UnparsedAbideConfig()
+ config.extend(data)
+ base = os.path.dirname(os.path.realpath(config_path))
+
+ for conf_tenant in config.tenants:
+ tenant = TenantParser.fromYaml(base, connections, scheduler,
+ merger, conf_tenant)
+ abide.tenants[tenant.name] = tenant
+ return abide
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index f8e5add..08a6569 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -25,7 +25,7 @@
import urllib2
from zuul.connection import BaseConnection
-from zuul.model import TriggerEvent
+from zuul.model import TriggerEvent, Project
class GerritEventConnector(threading.Thread):
@@ -98,8 +98,10 @@
Can not get account information." % event.type)
event.account = None
- if (event.change_number and
- self.connection.sched.getProject(event.project_name)):
+ if event.change_number:
+ # TODO(jhesketh): Check if the project exists?
+ # and self.connection.sched.getProject(event.project_name):
+
# Call _getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
@@ -231,8 +233,14 @@
'https://%s' % self.server)
self._change_cache = {}
+ self.projects = {}
self.gerrit_event_connector = None
+ def getProject(self, name):
+ if name not in self.projects:
+ self.projects[name] = Project(name)
+ return self.projects[name]
+
def getCachedChange(self, key):
if key in self._change_cache:
return self._change_cache.get(key)
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
new file mode 100644
index 0000000..704d620
--- /dev/null
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -0,0 +1,296 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import json
+import logging
+import os
+import shutil
+import subprocess
+import tempfile
+import threading
+import traceback
+
+import gear
+import yaml
+
+import zuul.merger
+
+
+class JobDir(object):
+ def __init__(self):
+ self.root = tempfile.mkdtemp()
+ self.git_root = os.path.join(self.root, 'git')
+ os.makedirs(self.git_root)
+ self.ansible_root = os.path.join(self.root, 'ansible')
+ os.makedirs(self.ansible_root)
+ self.inventory = os.path.join(self.ansible_root, 'inventory')
+ self.playbook = os.path.join(self.ansible_root, 'playbook')
+ self.config = os.path.join(self.ansible_root, 'ansible.cfg')
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, etype, value, tb):
+ shutil.rmtree(self.root)
+
+
+class UpdateTask(object):
+ def __init__(self, project, url):
+ self.project = project
+ self.url = url
+ self.event = threading.Event()
+
+ def __eq__(self, other):
+ if other.project == self.project:
+ return True
+ return False
+
+ def wait(self):
+ self.event.wait()
+
+ def setComplete(self):
+ self.event.set()
+
+
+class DeduplicateQueue(object):
+ def __init__(self):
+ self.queue = collections.deque()
+ self.condition = threading.Condition()
+
+ def qsize(self):
+ return len(self.queue)
+
+ def put(self, item):
+ # Returns the original item if added, or an equivalent item if
+ # already enqueued.
+ self.condition.acquire()
+ ret = None
+ try:
+ for x in self.queue:
+ if item == x:
+ ret = x
+ if ret is None:
+ ret = item
+ self.queue.append(item)
+ self.condition.notify()
+ finally:
+ self.condition.release()
+ return ret
+
+ def get(self):
+ self.condition.acquire()
+ try:
+ while True:
+ try:
+ ret = self.queue.popleft()
+ return ret
+ except IndexError:
+ pass
+ self.condition.wait()
+ finally:
+ self.condition.release()
+
+
+class LaunchServer(object):
+ log = logging.getLogger("zuul.LaunchServer")
+
+ def __init__(self, config, connections={}):
+ self.config = config
+ self.zuul_url = config.get('merger', 'zuul_url')
+
+ if self.config.has_option('merger', 'git_dir'):
+ self.merge_root = self.config.get('merger', 'git_dir')
+ else:
+ self.merge_root = '/var/lib/zuul/git'
+
+ if self.config.has_option('merger', 'git_user_email'):
+ self.merge_email = self.config.get('merger', 'git_user_email')
+ else:
+ self.merge_email = None
+
+ if self.config.has_option('merger', 'git_user_name'):
+ self.merge_name = self.config.get('merger', 'git_user_name')
+ else:
+ self.merge_name = None
+
+ self.connections = connections
+ self.merger = self._getMerger(self.merge_root)
+ self.update_queue = DeduplicateQueue()
+
+ def _getMerger(self, root):
+ return zuul.merger.merger.Merger(root, self.connections,
+ self.merge_email, self.merge_name)
+
+ def start(self):
+ self._running = True
+ server = self.config.get('gearman', 'server')
+ if self.config.has_option('gearman', 'port'):
+ port = self.config.get('gearman', 'port')
+ else:
+ port = 4730
+ self.worker = gear.Worker('Zuul Launch Server')
+ self.worker.addServer(server, port)
+ self.log.debug("Waiting for server")
+ self.worker.waitForServer()
+ self.log.debug("Registering")
+ self.register()
+ self.log.debug("Starting worker")
+ self.update_thread = threading.Thread(target=self._updateLoop)
+ self.update_thread.daemon = True
+ self.update_thread.start()
+ self.thread = threading.Thread(target=self.run)
+ self.thread.daemon = True
+ self.thread.start()
+
+ def register(self):
+ self.worker.registerFunction("launcher:launch")
+ # TODOv3: abort
+ self.worker.registerFunction("merger:cat")
+
+ def stop(self):
+ self.log.debug("Stopping")
+ self._running = False
+ self.worker.shutdown()
+ self.log.debug("Stopped")
+
+ def join(self):
+ self.update_thread.join()
+ self.thread.join()
+
+ def _updateLoop(self):
+ while self._running:
+ try:
+ self._innerUpdateLoop()
+ except:
+ self.log.exception("Exception in update thread:")
+
+ def _innerUpdateLoop(self):
+ # Inside of a loop that keeps the main repository up to date
+ task = self.update_queue.get()
+ self.log.info("Updating repo %s from %s" % (task.project, task.url))
+ self.merger.updateRepo(task.project, task.url)
+ self.log.debug("Finished updating repo %s from %s" %
+ (task.project, task.url))
+ task.setComplete()
+
+ def update(self, project, url):
+ task = UpdateTask(project, url)
+ task = self.update_queue.put(task)
+ return task
+
+ def run(self):
+ self.log.debug("Starting launch listener")
+ while self._running:
+ try:
+ job = self.worker.getJob()
+ try:
+ if job.name == 'launcher:launch':
+ self.log.debug("Got launch job: %s" % job.unique)
+ self.launch(job)
+ elif job.name == 'merger:cat':
+ self.log.debug("Got cat job: %s" % job.unique)
+ self.cat(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(traceback.format_exc())
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def launch(self, job):
+ thread = threading.Thread(target=self._launch, args=(job,))
+ thread.start()
+
+ def _launch(self, job):
+ self.log.debug("Job %s: beginning" % (job.unique,))
+ with JobDir() as jobdir:
+ self.log.debug("Job %s: job root at %s" %
+ (job.unique, jobdir.root))
+ args = json.loads(job.arguments)
+ tasks = []
+ for project in args['projects']:
+ self.log.debug("Job %s: updating project %s" %
+ (job.unique, project['name']))
+ tasks.append(self.update(project['name'], project['url']))
+ for task in tasks:
+ task.wait()
+ self.log.debug("Job %s: git updates complete" % (job.unique,))
+ merger = self._getMerger(jobdir.git_root)
+ commit = merger.mergeChanges(args['items']) # noqa
+
+ # TODOv3: Ansible the ansible thing here.
+ self.prepareAnsibleFiles(jobdir, args)
+ result = self.runAnsible(jobdir)
+
+ data = {
+ 'url': 'https://server/job',
+ 'number': 1
+ }
+ job.sendWorkData(json.dumps(data))
+ job.sendWorkStatus(0, 100)
+
+ result = dict(result=result)
+ job.sendWorkComplete(json.dumps(result))
+
+ def getHostList(self, args):
+ # TODOv3: This should get the appropriate nodes from nodepool,
+ # or in the unit tests, be overriden to return localhost.
+ return [('localhost', dict(ansible_connection='local'))]
+
+ def prepareAnsibleFiles(self, jobdir, args):
+ with open(jobdir.inventory, 'w') as inventory:
+ for host_name, host_vars in self.getHostList(args):
+ inventory.write(host_name)
+ inventory.write(' ')
+ for k, v in host_vars.items():
+ inventory.write('%s=%s' % (k, v))
+ inventory.write('\n')
+ with open(jobdir.playbook, 'w') as playbook:
+ play = dict(hosts='localhost',
+ tasks=[dict(name='test',
+ shell='echo Hello world')])
+ playbook.write(yaml.dump([play]))
+ with open(jobdir.config, 'w') as config:
+ config.write('[defaults]\n')
+ config.write('hostfile = %s\n' % jobdir.inventory)
+
+ def runAnsible(self, jobdir):
+ proc = subprocess.Popen(
+ ['ansible-playbook', jobdir.playbook],
+ cwd=jobdir.ansible_root,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ (out, err) = proc.communicate()
+ ret = proc.wait()
+ print out
+ print err
+ if ret == 0:
+ return 'SUCCESS'
+ else:
+ return 'FAILURE'
+
+ def cat(self, job):
+ args = json.loads(job.arguments)
+ task = self.update(args['project'], args['url'])
+ task.wait()
+ files = self.merger.getFiles(args['project'], args['url'],
+ args['branch'], args['files'])
+ result = dict(updated=True,
+ files=files,
+ zuul_url=self.zuul_url)
+ job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/launchclient.py
similarity index 91%
rename from zuul/launcher/gearman.py
rename to zuul/launcher/launchclient.py
index 69fb71b..cc39797 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/launchclient.py
@@ -25,6 +25,35 @@
from zuul.model import Build
+def make_merger_item(item):
+ # Create a dictionary with all info about the item needed by
+ # the merger.
+ number = None
+ patchset = None
+ oldrev = None
+ newrev = None
+ if hasattr(item.change, 'number'):
+ number = item.change.number
+ patchset = item.change.patchset
+ elif hasattr(item.change, 'newrev'):
+ oldrev = item.change.oldrev
+ newrev = item.change.newrev
+ connection_name = item.pipeline.source.connection.connection_name
+ return dict(project=item.change.project.name,
+ url=item.pipeline.source.getGitUrl(
+ item.change.project),
+ connection_name=connection_name,
+ merge_mode=item.change.project.merge_mode,
+ refspec=item.change.refspec,
+ branch=item.change.branch,
+ ref=item.current_build_set.ref,
+ number=number,
+ patchset=patchset,
+ oldrev=oldrev,
+ newrev=newrev,
+ )
+
+
class GearmanCleanup(threading.Thread):
""" A thread that checks to see if outstanding builds have
completed without reporting back. """
@@ -148,8 +177,8 @@
self.log.info("Done waiting for Gearman server")
-class Gearman(object):
- log = logging.getLogger("zuul.Gearman")
+class LaunchClient(object):
+ log = logging.getLogger("zuul.LaunchClient")
negative_function_cache_ttl = 5
def __init__(self, config, sched, swift):
@@ -304,7 +333,7 @@
params['ZUUL_REF'] = item.change.ref
params['ZUUL_COMMIT'] = item.change.newrev
- # The destination_path is a unqiue path for this build request
+ # The destination_path is a unique path for this build request
# and generally where the logs are expected to be placed
destination_path = os.path.join(item.change.getBasePath(),
pipeline.name, job.name, uuid[:7])
@@ -335,10 +364,21 @@
# ZUUL_OLDREV
# ZUUL_NEWREV
- if 'ZUUL_NODE' in params:
- name = "build:%s:%s" % (job.name, params['ZUUL_NODE'])
- else:
- name = "build:%s" % job.name
+ all_items = dependent_items + [item]
+ merger_items = map(make_merger_item, all_items)
+
+ params['job'] = job.name
+ params['items'] = merger_items
+ params['projects'] = []
+ projects = set()
+ for item in all_items:
+ if item.change.project not in projects:
+ params['projects'].append(
+ dict(name=item.change.project.name,
+ url=item.pipeline.source.getGitUrl(
+ item.change.project)))
+ projects.add(item.change.project)
+
build = Build(job, uuid)
build.parameters = params
@@ -346,7 +386,7 @@
self.sched.onBuildCompleted(build, 'SUCCESS')
return build
- gearman_job = gear.Job(name, json.dumps(params),
+ gearman_job = gear.Job('launcher:launch', json.dumps(params),
unique=uuid)
build.__gearman_job = gearman_job
self.builds[uuid] = build
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index e1e8ac6..1f008c3 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -25,6 +25,30 @@
return v.Any([x], x)
+class ConfigSchema(object):
+ tenant_source = v.Schema({'repos': [str]})
+
+ def validateTenantSources(self, value, path=[]):
+ if isinstance(value, dict):
+ for k, val in value.items():
+ self.validateTenantSource(val, path + [k])
+ else:
+ raise v.Invalid("Invalid tenant source", path)
+
+ def validateTenantSource(self, value, path=[]):
+ # TODOv3(jeblair): validate against connections
+ self.tenant_source(value)
+
+ def getSchema(self, data, connections=None):
+ tenant = {v.Required('name'): str,
+ 'include': toList(str),
+ 'source': self.validateTenantSources}
+
+ schema = v.Schema({'tenants': [tenant]})
+
+ return schema
+
+
class LayoutSchema(object):
include = {'python-file': str}
includes = [include]
@@ -342,3 +366,9 @@
if action in pipeline:
self.extraDriverValidation('reporter', pipeline[action],
connections)
+
+
+class ConfigValidator(object):
+ def validate(self, data, connections=None):
+ schema = ConfigSchema().getSchema(data, connections)
+ schema(data)
diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py
index 92ddb0f..64cc3a7 100644
--- a/zuul/lib/connections.py
+++ b/zuul/lib/connections.py
@@ -18,49 +18,116 @@
import zuul.connection.smtp
-def configure_connections(config):
- # Register connections from the config
+class ConnectionRegistry(object):
+ """A registry of connections"""
- # TODO(jhesketh): import connection modules dynamically
- connections = {}
+ def __init__(self, sched):
+ self.connections = {}
+ self.sched = sched # TODOv3(jeblair): remove (abstraction violation)
- for section_name in config.sections():
- con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
- section_name, re.I)
- if not con_match:
- continue
- con_name = con_match.group(2)
- con_config = dict(config.items(section_name))
+ def registerScheduler(self, sched):
+ for connection_name, connection in self.connections.items():
+ connection.registerScheduler(sched)
+ connection.onLoad()
- if 'driver' not in con_config:
- raise Exception("No driver specified for connection %s."
- % con_name)
+ def stop(self):
+ for connection_name, connection in self.connections.items():
+ connection.onStop()
- con_driver = con_config['driver']
+ def configure(self, config):
+ # Register connections from the config
+ # TODO(jhesketh): import connection modules dynamically
+ connections = {}
- # TODO(jhesketh): load the required class automatically
- if con_driver == 'gerrit':
- connections[con_name] = \
- zuul.connection.gerrit.GerritConnection(con_name,
- con_config)
- elif con_driver == 'smtp':
- connections[con_name] = \
- zuul.connection.smtp.SMTPConnection(con_name, con_config)
+ for section_name in config.sections():
+ con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
+ section_name, re.I)
+ if not con_match:
+ continue
+ con_name = con_match.group(2)
+ con_config = dict(config.items(section_name))
+
+ if 'driver' not in con_config:
+ raise Exception("No driver specified for connection %s."
+ % con_name)
+
+ con_driver = con_config['driver']
+
+ # TODO(jhesketh): load the required class automatically
+ if con_driver == 'gerrit':
+ connections[con_name] = \
+ zuul.connection.gerrit.GerritConnection(con_name,
+ con_config)
+ elif con_driver == 'smtp':
+ connections[con_name] = \
+ zuul.connection.smtp.SMTPConnection(con_name, con_config)
+ else:
+ raise Exception("Unknown driver, %s, for connection %s"
+ % (con_config['driver'], con_name))
+
+ # If the [gerrit] or [smtp] sections still exist, load them in as a
+ # connection named 'gerrit' or 'smtp' respectfully
+
+ if 'gerrit' in config.sections():
+ connections['gerrit'] = \
+ zuul.connection.gerrit.GerritConnection(
+ 'gerrit', dict(config.items('gerrit')))
+
+ if 'smtp' in config.sections():
+ connections['smtp'] = \
+ zuul.connection.smtp.SMTPConnection(
+ 'smtp', dict(config.items('smtp')))
+
+ self.connections = connections
+
+ def _getDriver(self, dtype, connection_name, driver_config={}):
+ # Instantiate a driver such as a trigger, source or reporter
+ # TODO(jhesketh): Make this list dynamic or use entrypoints etc.
+ # Stevedore was not a good fit here due to the nature of triggers.
+ # Specifically we don't want to load a trigger per a pipeline as one
+ # trigger can listen to a stream (from gerrit, for example) and the
+ # scheduler decides which eventfilter to use. As such we want to load
+ # trigger+connection pairs uniquely.
+ drivers = {
+ 'source': {
+ 'gerrit': 'zuul.source.gerrit:GerritSource',
+ },
+ 'trigger': {
+ 'gerrit': 'zuul.trigger.gerrit:GerritTrigger',
+ 'timer': 'zuul.trigger.timer:TimerTrigger',
+ 'zuul': 'zuul.trigger.zuultrigger:ZuulTrigger',
+ },
+ 'reporter': {
+ 'gerrit': 'zuul.reporter.gerrit:GerritReporter',
+ 'smtp': 'zuul.reporter.smtp:SMTPReporter',
+ },
+ }
+
+ # TODO(jhesketh): Check the connection_name exists
+ if connection_name in self.connections.keys():
+ driver_name = self.connections[connection_name].driver_name
+ connection = self.connections[connection_name]
else:
- raise Exception("Unknown driver, %s, for connection %s"
- % (con_config['driver'], con_name))
+ # In some cases a driver may not be related to a connection. For
+ # example, the 'timer' or 'zuul' triggers.
+ driver_name = connection_name
+ connection = None
+ driver = drivers[dtype][driver_name].split(':')
+ driver_instance = getattr(
+ __import__(driver[0], fromlist=['']), driver[1])(
+ driver_config, self.sched, connection
+ )
- # If the [gerrit] or [smtp] sections still exist, load them in as a
- # connection named 'gerrit' or 'smtp' respectfully
+ if connection:
+ connection.registerUse(dtype, driver_instance)
- if 'gerrit' in config.sections():
- connections['gerrit'] = \
- zuul.connection.gerrit.GerritConnection(
- 'gerrit', dict(config.items('gerrit')))
+ return driver_instance
- if 'smtp' in config.sections():
- connections['smtp'] = \
- zuul.connection.smtp.SMTPConnection(
- 'smtp', dict(config.items('smtp')))
+ def getSource(self, connection_name):
+ return self._getDriver('source', connection_name)
- return connections
+ def getReporter(self, connection_name, driver_config={}):
+ return self._getDriver('reporter', connection_name, driver_config)
+
+ def getTrigger(self, connection_name, driver_config={}):
+ return self._getDriver('trigger', connection_name, driver_config)
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
new file mode 100644
index 0000000..ce369f1
--- /dev/null
+++ b/zuul/manager/__init__.py
@@ -0,0 +1,725 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import extras
+import logging
+
+from zuul import exceptions
+from zuul.model import NullChange
+
+statsd = extras.try_import('statsd.statsd')
+
+
+class DynamicChangeQueueContextManager(object):
+ def __init__(self, change_queue):
+ self.change_queue = change_queue
+
+ def __enter__(self):
+ return self.change_queue
+
+ def __exit__(self, etype, value, tb):
+ if self.change_queue and not self.change_queue.queue:
+ self.change_queue.pipeline.removeQueue(self.change_queue)
+
+
+class StaticChangeQueueContextManager(object):
+ def __init__(self, change_queue):
+ self.change_queue = change_queue
+
+ def __enter__(self):
+ return self.change_queue
+
+ def __exit__(self, etype, value, tb):
+ pass
+
+
+class BasePipelineManager(object):
+ log = logging.getLogger("zuul.BasePipelineManager")
+
+ def __init__(self, sched, pipeline):
+ self.sched = sched
+ self.pipeline = pipeline
+ self.event_filters = []
+ self.changeish_filters = []
+
+ def __str__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
+
+ def _postConfig(self, layout):
+ self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
+ self.log.info(" Source: %s" % self.pipeline.source)
+ self.log.info(" Requirements:")
+ for f in self.changeish_filters:
+ self.log.info(" %s" % f)
+ self.log.info(" Events:")
+ for e in self.event_filters:
+ self.log.info(" %s" % e)
+ self.log.info(" Projects:")
+
+ def log_jobs(tree, indent=0):
+ istr = ' ' + ' ' * indent
+ if tree.job:
+ efilters = ''
+ for b in tree.job._branches:
+ efilters += str(b)
+ for f in tree.job._files:
+ efilters += str(f)
+ if tree.job.skip_if_matcher:
+ efilters += str(tree.job.skip_if_matcher)
+ if efilters:
+ efilters = ' ' + efilters
+ tags = []
+ if tree.job.hold_following_changes:
+ tags.append('[hold]')
+ if not tree.job.voting:
+ tags.append('[nonvoting]')
+ if tree.job.mutex:
+ tags.append('[mutex: %s]' % tree.job.mutex)
+ tags = ' '.join(tags)
+ self.log.info("%s%s%s %s" % (istr, repr(tree.job),
+ efilters, tags))
+ for x in tree.job_trees:
+ log_jobs(x, indent + 2)
+
+ for p in layout.projects.values():
+ tree = self.pipeline.getJobTree(p)
+ if tree:
+ self.log.info(" %s" % p)
+ log_jobs(tree)
+ self.log.info(" On start:")
+ self.log.info(" %s" % self.pipeline.start_actions)
+ self.log.info(" On success:")
+ self.log.info(" %s" % self.pipeline.success_actions)
+ self.log.info(" On failure:")
+ self.log.info(" %s" % self.pipeline.failure_actions)
+ self.log.info(" On merge-failure:")
+ self.log.info(" %s" % self.pipeline.merge_failure_actions)
+ self.log.info(" When disabled:")
+ self.log.info(" %s" % self.pipeline.disabled_actions)
+
+ def getSubmitAllowNeeds(self):
+ # Get a list of code review labels that are allowed to be
+ # "needed" in the submit records for a change, with respect
+ # to this queue. In other words, the list of review labels
+ # this queue itself is likely to set before submitting.
+ allow_needs = set()
+ for action_reporter in self.pipeline.success_actions:
+ allow_needs.update(action_reporter.getSubmitAllowNeeds())
+ return allow_needs
+
+ def eventMatches(self, event, change):
+ if event.forced_pipeline:
+ if event.forced_pipeline == self.pipeline.name:
+ self.log.debug("Event %s for change %s was directly assigned "
+ "to pipeline %s" % (event, change, self))
+ return True
+ else:
+ return False
+ for ef in self.event_filters:
+ if ef.matches(event, change):
+ self.log.debug("Event %s for change %s matched %s "
+ "in pipeline %s" % (event, change, ef, self))
+ return True
+ return False
+
+ def isChangeAlreadyInPipeline(self, change):
+ # Checks live items in the pipeline
+ for item in self.pipeline.getAllItems():
+ if item.live and change.equals(item.change):
+ return True
+ return False
+
+ def isChangeAlreadyInQueue(self, change, change_queue):
+ # Checks any item in the specified change queue
+ for item in change_queue.queue:
+ if change.equals(item.change):
+ return True
+ return False
+
+ def reportStart(self, item):
+ if not self.pipeline._disabled:
+ try:
+ self.log.info("Reporting start, action %s item %s" %
+ (self.pipeline.start_actions, item))
+ ret = self.sendReport(self.pipeline.start_actions,
+ self.pipeline.source, item)
+ if ret:
+ self.log.error("Reporting item start %s received: %s" %
+ (item, ret))
+ except:
+ self.log.exception("Exception while reporting start:")
+
+ def sendReport(self, action_reporters, source, item,
+ message=None):
+ """Sends the built message off to configured reporters.
+
+ Takes the action_reporters, item, message and extra options and
+ sends them to the pluggable reporters.
+ """
+ report_errors = []
+ if len(action_reporters) > 0:
+ for reporter in action_reporters:
+ ret = reporter.report(source, self.pipeline, item)
+ if ret:
+ report_errors.append(ret)
+ if len(report_errors) == 0:
+ return
+ return report_errors
+
+ def isChangeReadyToBeEnqueued(self, change):
+ return True
+
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements,
+ change_queue):
+ return True
+
+ def enqueueChangesBehind(self, change, quiet, ignore_requirements,
+ change_queue):
+ return True
+
+ def checkForChangesNeededBy(self, change, change_queue):
+ return True
+
+ def getFailingDependentItems(self, item):
+ return None
+
+ def getDependentItems(self, item):
+ orig_item = item
+ items = []
+ while item.item_ahead:
+ items.append(item.item_ahead)
+ item = item.item_ahead
+ self.log.info("Change %s depends on changes %s" %
+ (orig_item.change,
+ [x.change for x in items]))
+ return items
+
+ def getItemForChange(self, change):
+ for item in self.pipeline.getAllItems():
+ if item.change.equals(change):
+ return item
+ return None
+
+ def findOldVersionOfChangeAlreadyInQueue(self, change):
+ for item in self.pipeline.getAllItems():
+ if not item.live:
+ continue
+ if change.isUpdateOf(item.change):
+ return item
+ return None
+
+ def removeOldVersionsOfChange(self, change):
+ if not self.pipeline.dequeue_on_new_patchset:
+ return
+ old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
+ if old_item:
+ self.log.debug("Change %s is a new version of %s, removing %s" %
+ (change, old_item.change, old_item))
+ self.removeItem(old_item)
+
+ def removeAbandonedChange(self, change):
+ self.log.debug("Change %s abandoned, removing." % change)
+ for item in self.pipeline.getAllItems():
+ if not item.live:
+ continue
+ if item.change.equals(change):
+ self.removeItem(item)
+
+ def reEnqueueItem(self, item, last_head):
+ with self.getChangeQueue(item.change, last_head.queue) as change_queue:
+ if change_queue:
+ self.log.debug("Re-enqueing change %s in queue %s" %
+ (item.change, change_queue))
+ change_queue.enqueueItem(item)
+
+ # Re-set build results in case any new jobs have been
+ # added to the tree.
+ for build in item.current_build_set.getBuilds():
+ if build.result:
+ self.pipeline.setResult(item, build)
+ # Similarly, reset the item state.
+ if item.current_build_set.unable_to_merge:
+ self.pipeline.setUnableToMerge(item)
+ if item.dequeued_needing_change:
+ self.pipeline.setDequeuedNeedingChange(item)
+
+ self.reportStats(item)
+ return True
+ else:
+ self.log.error("Unable to find change queue for project %s" %
+ item.change.project)
+ return False
+
+ def addChange(self, change, quiet=False, enqueue_time=None,
+ ignore_requirements=False, live=True,
+ change_queue=None):
+ self.log.debug("Considering adding change %s" % change)
+
+ # If we are adding a live change, check if it's a live item
+ # anywhere in the pipeline. Otherwise, we will perform the
+ # duplicate check below on the specific change_queue.
+ if live and self.isChangeAlreadyInPipeline(change):
+ self.log.debug("Change %s is already in pipeline, "
+ "ignoring" % change)
+ return True
+
+ if not self.isChangeReadyToBeEnqueued(change):
+ self.log.debug("Change %s is not ready to be enqueued, ignoring" %
+ change)
+ return False
+
+ if not ignore_requirements:
+ for f in self.changeish_filters:
+ if not f.matches(change):
+ self.log.debug("Change %s does not match pipeline "
+ "requirement %s" % (change, f))
+ return False
+
+ with self.getChangeQueue(change, change_queue) as change_queue:
+ if not change_queue:
+ self.log.debug("Unable to find change queue for "
+ "change %s in project %s" %
+ (change, change.project))
+ return False
+
+ if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
+ change_queue):
+ self.log.debug("Failed to enqueue changes "
+ "ahead of %s" % change)
+ return False
+
+ if self.isChangeAlreadyInQueue(change, change_queue):
+ self.log.debug("Change %s is already in queue, "
+ "ignoring" % change)
+ return True
+
+ self.log.debug("Adding change %s to queue %s" %
+ (change, change_queue))
+ item = change_queue.enqueueChange(change)
+ if enqueue_time:
+ item.enqueue_time = enqueue_time
+ item.live = live
+ self.reportStats(item)
+ if not quiet:
+ if len(self.pipeline.start_actions) > 0:
+ self.reportStart(item)
+ self.enqueueChangesBehind(change, quiet, ignore_requirements,
+ change_queue)
+ for trigger in self.sched.triggers.values():
+ trigger.onChangeEnqueued(item.change, self.pipeline)
+ return True
+
+ def dequeueItem(self, item):
+ self.log.debug("Removing change %s from queue" % item.change)
+ item.queue.dequeueItem(item)
+
+ def removeItem(self, item):
+ # Remove an item from the queue, probably because it has been
+ # superseded by another change.
+ self.log.debug("Canceling builds behind change: %s "
+ "because it is being removed." % item.change)
+ self.cancelJobs(item)
+ self.dequeueItem(item)
+ self.reportStats(item)
+
+ def _launchJobs(self, item, jobs):
+ self.log.debug("Launching jobs for change %s" % item.change)
+ dependent_items = self.getDependentItems(item)
+ for job in jobs:
+ self.log.debug("Found job %s for change %s" % (job, item.change))
+ try:
+ build = self.sched.launcher.launch(job, item,
+ self.pipeline,
+ dependent_items)
+ self.log.debug("Adding build %s of job %s to item %s" %
+ (build, job, item))
+ item.addBuild(build)
+ except:
+ self.log.exception("Exception while launching job %s "
+ "for change %s:" % (job, item.change))
+
+ def launchJobs(self, item):
+ # TODO(jeblair): This should return a value indicating a job
+ # was launched. Appears to be a longstanding bug.
+ jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
+ if jobs:
+ self._launchJobs(item, jobs)
+
+ def cancelJobs(self, item, prime=True):
+ self.log.debug("Cancel jobs for change %s" % item.change)
+ canceled = False
+ old_build_set = item.current_build_set
+ if prime and item.current_build_set.ref:
+ item.resetAllBuilds()
+ for build in old_build_set.getBuilds():
+ try:
+ self.sched.launcher.cancel(build)
+ except:
+ self.log.exception("Exception while canceling build %s "
+ "for change %s" % (build, item.change))
+ build.result = 'CANCELED'
+ canceled = True
+ self.updateBuildDescriptions(old_build_set)
+ for item_behind in item.items_behind:
+ self.log.debug("Canceling jobs for change %s, behind change %s" %
+ (item_behind.change, item.change))
+ if self.cancelJobs(item_behind, prime=prime):
+ canceled = True
+ return canceled
+
+ def _processOneItem(self, item, nnfi):
+ changed = False
+ item_ahead = item.item_ahead
+ if item_ahead and (not item_ahead.live):
+ item_ahead = None
+ change_queue = item.queue
+ failing_reasons = [] # Reasons this item is failing
+
+ if self.checkForChangesNeededBy(item.change, change_queue) is not True:
+ # It's not okay to enqueue this change, we should remove it.
+ self.log.info("Dequeuing change %s because "
+ "it can no longer merge" % item.change)
+ self.cancelJobs(item)
+ self.dequeueItem(item)
+ self.pipeline.setDequeuedNeedingChange(item)
+ if item.live:
+ try:
+ self.reportItem(item)
+ except exceptions.MergeFailure:
+ pass
+ return (True, nnfi)
+ dep_items = self.getFailingDependentItems(item)
+ actionable = change_queue.isActionable(item)
+ item.active = actionable
+ if dep_items:
+ failing_reasons.append('a needed change is failing')
+ self.cancelJobs(item, prime=False)
+ else:
+ item_ahead_merged = False
+ if (item_ahead and item_ahead.change.is_merged):
+ item_ahead_merged = True
+ if (item_ahead != nnfi and not item_ahead_merged):
+ # Our current base is different than what we expected,
+ # and it's not because our current base merged. Something
+ # ahead must have failed.
+ self.log.info("Resetting builds for change %s because the "
+ "item ahead, %s, is not the nearest non-failing "
+ "item, %s" % (item.change, item_ahead, nnfi))
+ change_queue.moveItem(item, nnfi)
+ changed = True
+ self.cancelJobs(item)
+ if actionable:
+ if not item.current_build_set.ref:
+ item.current_build_set.setConfiguration()
+ if actionable and self.launchJobs(item):
+ changed = True
+ if self.pipeline.didAnyJobFail(item):
+ failing_reasons.append("at least one job failed")
+ if (not item.live) and (not item.items_behind):
+ failing_reasons.append("is a non-live item with no items behind")
+ self.dequeueItem(item)
+ changed = True
+ if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
+ and item.live):
+ try:
+ self.reportItem(item)
+ except exceptions.MergeFailure:
+ failing_reasons.append("it did not merge")
+ for item_behind in item.items_behind:
+ self.log.info("Resetting builds for change %s because the "
+ "item ahead, %s, failed to merge" %
+ (item_behind.change, item))
+ self.cancelJobs(item_behind)
+ self.dequeueItem(item)
+ changed = True
+ elif not failing_reasons and item.live:
+ nnfi = item
+ item.current_build_set.failing_reasons = failing_reasons
+ if failing_reasons:
+ self.log.debug("%s is a failing item because %s" %
+ (item, failing_reasons))
+ return (changed, nnfi)
+
+ def processQueue(self):
+ # Do whatever needs to be done for each change in the queue
+ self.log.debug("Starting queue processor: %s" % self.pipeline.name)
+ changed = False
+ for queue in self.pipeline.queues:
+ queue_changed = False
+ nnfi = None # Nearest non-failing item
+ for item in queue.queue[:]:
+ item_changed, nnfi = self._processOneItem(
+ item, nnfi)
+ if item_changed:
+ queue_changed = True
+ self.reportStats(item)
+ if queue_changed:
+ changed = True
+ status = ''
+ for item in queue.queue:
+ status += item.formatStatus()
+ if status:
+ self.log.debug("Queue %s status is now:\n %s" %
+ (queue.name, status))
+ self.log.debug("Finished queue processor: %s (changed: %s)" %
+ (self.pipeline.name, changed))
+ return changed
+
+ def updateBuildDescriptions(self, build_set):
+ for build in build_set.getBuilds():
+ try:
+ desc = self.formatDescription(build)
+ self.sched.launcher.setBuildDescription(build, desc)
+ except:
+ # Log the failure and let loop continue
+ self.log.error("Failed to update description for build %s" %
+ (build))
+
+ if build_set.previous_build_set:
+ for build in build_set.previous_build_set.getBuilds():
+ try:
+ desc = self.formatDescription(build)
+ self.sched.launcher.setBuildDescription(build, desc)
+ except:
+ # Log the failure and let loop continue
+ self.log.error("Failed to update description for "
+ "build %s in previous build set" % (build))
+
+ def onBuildStarted(self, build):
+ self.log.debug("Build %s started" % build)
+ return True
+
+ def onBuildCompleted(self, build):
+ self.log.debug("Build %s completed" % build)
+ item = build.build_set.item
+
+ self.pipeline.setResult(item, build)
+ self.sched.mutex.release(item, build.job)
+ self.log.debug("Item %s status is now:\n %s" %
+ (item, item.formatStatus()))
+ return True
+
+ def onMergeCompleted(self, event):
+ build_set = event.build_set
+ item = build_set.item
+ build_set.merge_state = build_set.COMPLETE
+ build_set.zuul_url = event.zuul_url
+ if event.merged:
+ build_set.commit = event.commit
+ elif event.updated:
+ if not isinstance(item.change, NullChange):
+ build_set.commit = item.change.newrev
+ if not build_set.commit and not isinstance(item.change, NullChange):
+ self.log.info("Unable to merge change %s" % item.change)
+ self.pipeline.setUnableToMerge(item)
+
+ def reportItem(self, item):
+ if not item.reported:
+ # _reportItem() returns True if it failed to report.
+ item.reported = not self._reportItem(item)
+ if self.changes_merge:
+ succeeded = self.pipeline.didAllJobsSucceed(item)
+ merged = item.reported
+ if merged:
+ merged = self.pipeline.source.isMerged(item.change,
+ item.change.branch)
+ self.log.info("Reported change %s status: all-succeeded: %s, "
+ "merged: %s" % (item.change, succeeded, merged))
+ change_queue = item.queue
+ if not (succeeded and merged):
+ self.log.debug("Reported change %s failed tests or failed "
+ "to merge" % (item.change))
+ change_queue.decreaseWindowSize()
+ self.log.debug("%s window size decreased to %s" %
+ (change_queue, change_queue.window))
+ raise exceptions.MergeFailure(
+ "Change %s failed to merge" % item.change)
+ else:
+ change_queue.increaseWindowSize()
+ self.log.debug("%s window size increased to %s" %
+ (change_queue, change_queue.window))
+
+ for trigger in self.sched.triggers.values():
+ trigger.onChangeMerged(item.change, self.pipeline.source)
+
+ def _reportItem(self, item):
+ self.log.debug("Reporting change %s" % item.change)
+ ret = True # Means error as returned by trigger.report
+ if not self.pipeline.getJobs(item):
+ # We don't send empty reports with +1,
+ # and the same for -1's (merge failures or transient errors)
+ # as they cannot be followed by +1's
+ self.log.debug("No jobs for change %s" % item.change)
+ actions = []
+ elif self.pipeline.didAllJobsSucceed(item):
+ self.log.debug("success %s" % (self.pipeline.success_actions))
+ actions = self.pipeline.success_actions
+ item.setReportedResult('SUCCESS')
+ self.pipeline._consecutive_failures = 0
+ else:
+ actions = self.pipeline.failure_actions
+ item.setReportedResult('FAILURE')
+ self.pipeline._consecutive_failures += 1
+ if self.pipeline._disabled:
+ actions = self.pipeline.disabled_actions
+ # Check here if we should disable so that we only use the disabled
+ # reporters /after/ the last disable_at failure is still reported as
+ # normal.
+ if (self.pipeline.disable_at and not self.pipeline._disabled and
+ self.pipeline._consecutive_failures >= self.pipeline.disable_at):
+ self.pipeline._disabled = True
+ if actions:
+ try:
+ self.log.info("Reporting item %s, actions: %s" %
+ (item, actions))
+ ret = self.sendReport(actions, self.pipeline.source, item)
+ if ret:
+ self.log.error("Reporting item %s received: %s" %
+ (item, ret))
+ except:
+ self.log.exception("Exception while reporting:")
+ item.setReportedResult('ERROR')
+ self.updateBuildDescriptions(item.current_build_set)
+ return ret
+
+ def formatDescription(self, build):
+ concurrent_changes = ''
+ concurrent_builds = ''
+ other_builds = ''
+
+ for change in build.build_set.other_changes:
+ concurrent_changes += '<li><a href="{change.url}">\
+ {change.number},{change.patchset}</a></li>'.format(
+ change=change)
+
+ change = build.build_set.item.change
+
+ for build in build.build_set.getBuilds():
+ if build.url:
+ concurrent_builds += """\
+<li>
+ <a href="{build.url}">
+ {build.job.name} #{build.number}</a>: {build.result}
+</li>
+""".format(build=build)
+ else:
+ concurrent_builds += """\
+<li>
+ {build.job.name}: {build.result}
+</li>""".format(build=build)
+
+ if build.build_set.previous_build_set:
+ other_build = build.build_set.previous_build_set.getBuild(
+ build.job.name)
+ if other_build:
+ other_builds += """\
+<li>
+ Preceded by: <a href="{build.url}">
+ {build.job.name} #{build.number}</a>
+</li>
+""".format(build=other_build)
+
+ if build.build_set.next_build_set:
+ other_build = build.build_set.next_build_set.getBuild(
+ build.job.name)
+ if other_build:
+ other_builds += """\
+<li>
+ Succeeded by: <a href="{build.url}">
+ {build.job.name} #{build.number}</a>
+</li>
+""".format(build=other_build)
+
+ result = build.build_set.result
+
+ if hasattr(change, 'number'):
+ ret = """\
+<p>
+ Triggered by change:
+ <a href="{change.url}">{change.number},{change.patchset}</a><br/>
+ Branch: <b>{change.branch}</b><br/>
+ Pipeline: <b>{self.pipeline.name}</b>
+</p>"""
+ elif hasattr(change, 'ref'):
+ ret = """\
+<p>
+ Triggered by reference:
+ {change.ref}</a><br/>
+ Old revision: <b>{change.oldrev}</b><br/>
+ New revision: <b>{change.newrev}</b><br/>
+ Pipeline: <b>{self.pipeline.name}</b>
+</p>"""
+ else:
+ ret = ""
+
+ if concurrent_changes:
+ ret += """\
+<p>
+ Other changes tested concurrently with this change:
+ <ul>{concurrent_changes}</ul>
+</p>
+"""
+ if concurrent_builds:
+ ret += """\
+<p>
+ All builds for this change set:
+ <ul>{concurrent_builds}</ul>
+</p>
+"""
+
+ if other_builds:
+ ret += """\
+<p>
+ Other build sets for this change:
+ <ul>{other_builds}</ul>
+</p>
+"""
+ if result:
+ ret += """\
+<p>
+ Reported result: <b>{result}</b>
+</p>
+"""
+
+ ret = ret.format(**locals())
+ return ret
+
+ def reportStats(self, item):
+ if not statsd:
+ return
+ try:
+ # Update the gauge on enqueue and dequeue, but timers only
+ # when dequeing.
+ if item.dequeue_time:
+ dt = int((item.dequeue_time - item.enqueue_time) * 1000)
+ else:
+ dt = None
+ items = len(self.pipeline.getAllItems())
+
+ # stats.timers.zuul.pipeline.NAME.resident_time
+ # stats_counts.zuul.pipeline.NAME.total_changes
+ # stats.gauges.zuul.pipeline.NAME.current_changes
+ key = 'zuul.pipeline.%s' % self.pipeline.name
+ statsd.gauge(key + '.current_changes', items)
+ if dt:
+ statsd.timing(key + '.resident_time', dt)
+ statsd.incr(key + '.total_changes')
+
+ # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
+ # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
+ project_name = item.change.project.name.replace('/', '.')
+ key += '.%s' % project_name
+ if dt:
+ statsd.timing(key + '.resident_time', dt)
+ statsd.incr(key + '.total_changes')
+ except:
+ self.log.exception("Exception reporting pipeline stats")
diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py
new file mode 100644
index 0000000..02dc9f6
--- /dev/null
+++ b/zuul/manager/dependent.py
@@ -0,0 +1,198 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from zuul import model
+from zuul.manager import BasePipelineManager, StaticChangeQueueContextManager
+
+
+class DependentPipelineManager(BasePipelineManager):
+ log = logging.getLogger("zuul.DependentPipelineManager")
+ changes_merge = True
+
+ def __init__(self, *args, **kwargs):
+ super(DependentPipelineManager, self).__init__(*args, **kwargs)
+
+ def _postConfig(self, layout):
+ super(DependentPipelineManager, self)._postConfig(layout)
+ self.buildChangeQueues()
+
+ def buildChangeQueues(self):
+ self.log.debug("Building shared change queues")
+ change_queues = []
+
+ for project in self.pipeline.getProjects():
+ change_queue = model.ChangeQueue(
+ self.pipeline,
+ window=self.pipeline.window,
+ window_floor=self.pipeline.window_floor,
+ window_increase_type=self.pipeline.window_increase_type,
+ window_increase_factor=self.pipeline.window_increase_factor,
+ window_decrease_type=self.pipeline.window_decrease_type,
+ window_decrease_factor=self.pipeline.window_decrease_factor)
+ change_queue.addProject(project)
+ change_queues.append(change_queue)
+ self.log.debug("Created queue: %s" % change_queue)
+
+ # Iterate over all queues trying to combine them, and keep doing
+ # so until they can not be combined further.
+ last_change_queues = change_queues
+ while True:
+ new_change_queues = self.combineChangeQueues(last_change_queues)
+ if len(last_change_queues) == len(new_change_queues):
+ break
+ last_change_queues = new_change_queues
+
+ self.log.info(" Shared change queues:")
+ for queue in new_change_queues:
+ self.pipeline.addQueue(queue)
+ self.log.info(" %s containing %s" % (
+ queue, queue.generated_name))
+
+ def combineChangeQueues(self, change_queues):
+ self.log.debug("Combining shared queues")
+ new_change_queues = []
+ for a in change_queues:
+ merged_a = False
+ for b in new_change_queues:
+ if not a.getJobs().isdisjoint(b.getJobs()):
+ self.log.debug("Merging queue %s into %s" % (a, b))
+ b.mergeChangeQueue(a)
+ merged_a = True
+ break # this breaks out of 'for b' and continues 'for a'
+ if not merged_a:
+ self.log.debug("Keeping queue %s" % (a))
+ new_change_queues.append(a)
+ return new_change_queues
+
+ def getChangeQueue(self, change, existing=None):
+ if existing:
+ return StaticChangeQueueContextManager(existing)
+ return StaticChangeQueueContextManager(
+ self.pipeline.getQueue(change.project))
+
+ def isChangeReadyToBeEnqueued(self, change):
+ if not self.pipeline.source.canMerge(change,
+ self.getSubmitAllowNeeds()):
+ self.log.debug("Change %s can not merge, ignoring" % change)
+ return False
+ return True
+
+ def enqueueChangesBehind(self, change, quiet, ignore_requirements,
+ change_queue):
+ to_enqueue = []
+ self.log.debug("Checking for changes needing %s:" % change)
+ if not hasattr(change, 'needed_by_changes'):
+ self.log.debug(" Changeish does not support dependencies")
+ return
+ for other_change in change.needed_by_changes:
+ with self.getChangeQueue(other_change) as other_change_queue:
+ if other_change_queue != change_queue:
+ self.log.debug(" Change %s in project %s can not be "
+ "enqueued in the target queue %s" %
+ (other_change, other_change.project,
+ change_queue))
+ continue
+ if self.pipeline.source.canMerge(other_change,
+ self.getSubmitAllowNeeds()):
+ self.log.debug(" Change %s needs %s and is ready to merge" %
+ (other_change, change))
+ to_enqueue.append(other_change)
+
+ if not to_enqueue:
+ self.log.debug(" No changes need %s" % change)
+
+ for other_change in to_enqueue:
+ self.addChange(other_change, quiet=quiet,
+ ignore_requirements=ignore_requirements,
+ change_queue=change_queue)
+
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements,
+ change_queue):
+ ret = self.checkForChangesNeededBy(change, change_queue)
+ if ret in [True, False]:
+ return ret
+ self.log.debug(" Changes %s must be merged ahead of %s" %
+ (ret, change))
+ for needed_change in ret:
+ r = self.addChange(needed_change, quiet=quiet,
+ ignore_requirements=ignore_requirements,
+ change_queue=change_queue)
+ if not r:
+ return False
+ return True
+
+ def checkForChangesNeededBy(self, change, change_queue):
+ self.log.debug("Checking for changes needed by %s:" % change)
+ # Return true if okay to proceed enqueing this change,
+ # false if the change should not be enqueued.
+ if not hasattr(change, 'needs_changes'):
+ self.log.debug(" Changeish does not support dependencies")
+ return True
+ if not change.needs_changes:
+ self.log.debug(" No changes needed")
+ return True
+ changes_needed = []
+ # Ignore supplied change_queue
+ with self.getChangeQueue(change) as change_queue:
+ for needed_change in change.needs_changes:
+ self.log.debug(" Change %s needs change %s:" % (
+ change, needed_change))
+ if needed_change.is_merged:
+ self.log.debug(" Needed change is merged")
+ continue
+ with self.getChangeQueue(needed_change) as needed_change_queue:
+ if needed_change_queue != change_queue:
+ self.log.debug(" Change %s in project %s does not "
+ "share a change queue with %s "
+ "in project %s" %
+ (needed_change, needed_change.project,
+ change, change.project))
+ return False
+ if not needed_change.is_current_patchset:
+ self.log.debug(" Needed change is not the "
+ "current patchset")
+ return False
+ if self.isChangeAlreadyInQueue(needed_change, change_queue):
+ self.log.debug(" Needed change is already ahead "
+ "in the queue")
+ continue
+ if self.pipeline.source.canMerge(needed_change,
+ self.getSubmitAllowNeeds()):
+ self.log.debug(" Change %s is needed" % needed_change)
+ if needed_change not in changes_needed:
+ changes_needed.append(needed_change)
+ continue
+ # The needed change can't be merged.
+ self.log.debug(" Change %s is needed but can not be merged" %
+ needed_change)
+ return False
+ if changes_needed:
+ return changes_needed
+ return True
+
+ def getFailingDependentItems(self, item):
+ if not hasattr(item.change, 'needs_changes'):
+ return None
+ if not item.change.needs_changes:
+ return None
+ failing_items = set()
+ for needed_change in item.change.needs_changes:
+ needed_item = self.getItemForChange(needed_change)
+ if not needed_item:
+ continue
+ if needed_item.current_build_set.failing_reasons:
+ failing_items.add(needed_item)
+ if failing_items:
+ return failing_items
+ return None
diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py
new file mode 100644
index 0000000..5690189
--- /dev/null
+++ b/zuul/manager/independent.py
@@ -0,0 +1,95 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from zuul import model
+from zuul.manager import BasePipelineManager, DynamicChangeQueueContextManager
+
+
+class IndependentPipelineManager(BasePipelineManager):
+ log = logging.getLogger("zuul.IndependentPipelineManager")
+ changes_merge = False
+
+ def _postConfig(self, layout):
+ super(IndependentPipelineManager, self)._postConfig(layout)
+
+ def getChangeQueue(self, change, existing=None):
+ # creates a new change queue for every change
+ if existing:
+ return DynamicChangeQueueContextManager(existing)
+ if change.project not in self.pipeline.getProjects():
+ self.pipeline.addProject(change.project)
+ change_queue = model.ChangeQueue(self.pipeline)
+ change_queue.addProject(change.project)
+ self.pipeline.addQueue(change_queue)
+ self.log.debug("Dynamically created queue %s", change_queue)
+ return DynamicChangeQueueContextManager(change_queue)
+
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements,
+ change_queue):
+ ret = self.checkForChangesNeededBy(change, change_queue)
+ if ret in [True, False]:
+ return ret
+ self.log.debug(" Changes %s must be merged ahead of %s" %
+ (ret, change))
+ for needed_change in ret:
+ # This differs from the dependent pipeline by enqueuing
+ # changes ahead as "not live", that is, not intended to
+ # have jobs run. Also, pipeline requirements are always
+ # ignored (which is safe because the changes are not
+ # live).
+ r = self.addChange(needed_change, quiet=True,
+ ignore_requirements=True,
+ live=False, change_queue=change_queue)
+ if not r:
+ return False
+ return True
+
+ def checkForChangesNeededBy(self, change, change_queue):
+ if self.pipeline.ignore_dependencies:
+ return True
+ self.log.debug("Checking for changes needed by %s:" % change)
+ # Return true if okay to proceed enqueing this change,
+ # false if the change should not be enqueued.
+ if not hasattr(change, 'needs_changes'):
+ self.log.debug(" Changeish does not support dependencies")
+ return True
+ if not change.needs_changes:
+ self.log.debug(" No changes needed")
+ return True
+ changes_needed = []
+ for needed_change in change.needs_changes:
+ self.log.debug(" Change %s needs change %s:" % (
+ change, needed_change))
+ if needed_change.is_merged:
+ self.log.debug(" Needed change is merged")
+ continue
+ if self.isChangeAlreadyInQueue(needed_change, change_queue):
+ self.log.debug(" Needed change is already ahead in the queue")
+ continue
+ self.log.debug(" Change %s is needed" % needed_change)
+ if needed_change not in changes_needed:
+ changes_needed.append(needed_change)
+ continue
+ # This differs from the dependent pipeline check in not
+ # verifying that the dependent change is mergable.
+ if changes_needed:
+ return changes_needed
+ return True
+
+ def dequeueItem(self, item):
+ super(IndependentPipelineManager, self).dequeueItem(item)
+ # An independent pipeline manager dynamically removes empty
+ # queues
+ if not item.queue.queue:
+ self.pipeline.removeQueue(item.queue)
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index 950c385..ce04795 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -14,6 +14,7 @@
import json
import logging
+import threading
from uuid import uuid4
import gear
@@ -55,6 +56,18 @@
self.__merge_client.onBuildCompleted(job)
+class MergeJob(gear.Job):
+ def __init__(self, *args, **kw):
+ super(MergeJob, self).__init__(*args, **kw)
+ self.__event = threading.Event()
+
+ def setComplete(self):
+ self.__event.set()
+
+ def wait(self, timeout=300):
+ return self.__event.wait(timeout)
+
+
class MergeClient(object):
log = logging.getLogger("zuul.MergeClient")
@@ -71,26 +84,28 @@
self.gearman.addServer(server, port)
self.log.debug("Waiting for gearman")
self.gearman.waitForServer()
- self.build_sets = {}
+ self.jobs = set()
def stop(self):
self.gearman.shutdown()
def areMergesOutstanding(self):
- if self.build_sets:
+ if self.jobs:
return True
return False
def submitJob(self, name, data, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL):
uuid = str(uuid4().hex)
- job = gear.Job(name,
+ job = MergeJob(name,
json.dumps(data),
unique=uuid)
+ job.build_set = build_set
self.log.debug("Submitting job %s with data %s" % (job, data))
- self.build_sets[uuid] = build_set
+ self.jobs.add(job)
self.gearman.submitJob(job, precedence=precedence,
timeout=300)
+ return job
def mergeChanges(self, items, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL):
@@ -103,21 +118,29 @@
url=url)
self.submitJob('merger:update', data, build_set, precedence)
+ def getFiles(self, project, url, branch, files,
+ precedence=zuul.model.PRECEDENCE_HIGH):
+ data = dict(project=project,
+ url=url,
+ branch=branch,
+ files=files)
+ job = self.submitJob('merger:cat', data, None, precedence)
+ return job
+
def onBuildCompleted(self, job):
- build_set = self.build_sets.get(job.unique)
- if build_set:
- data = getJobData(job)
- zuul_url = data.get('zuul_url')
- merged = data.get('merged', False)
- updated = data.get('updated', False)
- commit = data.get('commit')
- self.log.info("Merge %s complete, merged: %s, updated: %s, "
- "commit: %s" %
- (job, merged, updated, build_set.commit))
- self.sched.onMergeCompleted(build_set, zuul_url,
+ data = getJobData(job)
+ zuul_url = data.get('zuul_url')
+ merged = data.get('merged', False)
+ updated = data.get('updated', False)
+ commit = data.get('commit')
+ job.files = data.get('files', {})
+ self.log.info("Merge %s complete, merged: %s, updated: %s, "
+ "commit: %s" %
+ (job, merged, updated, commit))
+ job.setComplete()
+ if job.build_set:
+ self.sched.onMergeCompleted(job.build_set, zuul_url,
merged, updated, commit)
- # The test suite expects the build_set to be removed from
- # the internal dict after the wake flag is set.
- del self.build_sets[job.unique]
- else:
- self.log.error("Unable to find build set for uuid %s" % job.unique)
+ # The test suite expects the job to be removed from the
+ # internal account after the wake flag is set.
+ self.jobs.remove(job)
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index c6ae35d..eaa5721 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -184,6 +184,17 @@
origin = repo.remotes.origin
origin.update()
+ def getFiles(self, branch, files):
+ ret = {}
+ repo = self.createRepoObject()
+ for fn in files:
+ tree = repo.heads[branch].commit.tree
+ if fn in tree:
+ ret[fn] = tree[fn].data_stream.read()
+ else:
+ ret[fn] = None
+ return ret
+
class Merger(object):
log = logging.getLogger("zuul.Merger")
@@ -198,7 +209,7 @@
self.username = username
def _makeSSHWrappers(self, working_root, connections):
- for connection_name, connection in connections.items():
+ for connection_name, connection in connections.connections.items():
sshkey = connection.connection_config.get('sshkey')
if sshkey:
self._makeSSHWrapper(sshkey, working_root, connection_name)
@@ -342,3 +353,7 @@
if not commit:
return None
return commit.hexsha
+
+ def getFiles(self, project, url, branch, files):
+ repo = self.getRepo(project, url)
+ return repo.getFiles(branch, files)
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 30cd732..813c602 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -68,6 +68,7 @@
def register(self):
self.worker.registerFunction("merger:merge")
self.worker.registerFunction("merger:update")
+ self.worker.registerFunction("merger:cat")
def stop(self):
self.log.debug("Stopping")
@@ -90,6 +91,9 @@
elif job.name == 'merger:update':
self.log.debug("Got update job: %s" % job.unique)
self.update(job)
+ elif job.name == 'merger:cat':
+ self.log.debug("Got cat job: %s" % job.unique)
+ self.cat(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
@@ -113,3 +117,13 @@
result = dict(updated=True,
zuul_url=self.zuul_url)
job.sendWorkComplete(json.dumps(result))
+
+ def cat(self, job):
+ args = json.loads(job.arguments)
+ self.merger.updateRepo(args['project'], args['url'])
+ files = self.merger.getFiles(args['project'], args['url'],
+ args['branch'], args['files'])
+ result = dict(updated=True,
+ files=files,
+ zuul_url=self.zuul_url)
+ job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/model.py b/zuul/model.py
index d2cf13b..a88c365 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -67,8 +67,9 @@
class Pipeline(object):
"""A top-level pipeline such as check, gate, post, etc."""
- def __init__(self, name):
+ def __init__(self, name, layout):
self.name = name
+ self.layout = layout
self.description = None
self.failure_message = None
self.merge_failure_message = None
@@ -81,6 +82,7 @@
self.queues = []
self.precedence = PRECEDENCE_NORMAL
self.source = None
+ self.triggers = []
self.start_actions = []
self.success_actions = []
self.failure_actions = []
@@ -96,6 +98,16 @@
self.window_decrease_type = None
self.window_decrease_factor = None
+ @property
+ def actions(self):
+ return (
+ self.start_actions +
+ self.success_actions +
+ self.failure_actions +
+ self.merge_failure_actions +
+ self.disabled_actions
+ )
+
def __repr__(self):
return '<Pipeline %s>' % self.name
@@ -136,11 +148,6 @@
def _findJobsToRun(self, job_trees, item, mutex):
torun = []
- if item.item_ahead:
- # Only run jobs if any 'hold' jobs on the change ahead
- # have completed successfully.
- if self.isHoldingFollowingChanges(item.item_ahead):
- return []
for tree in job_trees:
job = tree.job
result = None
@@ -166,7 +173,7 @@
def findJobsToRun(self, item, mutex):
if not item.live:
return []
- tree = self.getJobTree(item.change.project)
+ tree = item.job_tree
if not tree:
return []
return self._findJobsToRun(tree.job_trees, item, mutex)
@@ -231,8 +238,7 @@
item.removeBuild(build)
elif build.result != 'SUCCESS':
# Get a JobTree from a Job so we can find only its dependent jobs
- root = self.getJobTree(item.change.project)
- tree = root.getJobTreeForJob(build.job)
+ tree = item.job_tree.getJobTreeForJob(build.job)
for job in tree.getJobs():
fakebuild = Build(job, None)
fakebuild.result = 'SKIPPED'
@@ -327,24 +333,15 @@
def addProject(self, project):
if project not in self.projects:
self.projects.append(project)
- self._jobs |= set(self.pipeline.getJobTree(project).getJobs())
names = [x.name for x in self.projects]
names.sort()
self.generated_name = ', '.join(names)
-
- for job in self._jobs:
- if job.queue_name:
- if (self.assigned_name and
- job.queue_name != self.assigned_name):
- raise Exception("More than one name assigned to "
- "change queue: %s != %s" %
- (self.assigned_name, job.queue_name))
- self.assigned_name = job.queue_name
self.name = self.assigned_name or self.generated_name
def enqueueChange(self, change):
item = QueueItem(self, change)
+ item.freezeJobTree()
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
@@ -434,97 +431,90 @@
return '<Project %s>' % (self.name)
+class Inheritable(object):
+ def __init__(self, parent=None):
+ self.parent = parent
+
+ def __getattribute__(self, name):
+ parent = object.__getattribute__(self, 'parent')
+ try:
+ return object.__getattribute__(self, name)
+ except AttributeError:
+ if parent:
+ return getattr(parent, name)
+ raise
+
+
class Job(object):
+ attributes = dict(
+ timeout=None,
+ # variables={},
+ # nodes=[],
+ # auth={},
+ workspace=None,
+ pre_run=None,
+ post_run=None,
+ voting=None,
+ failure_message=None,
+ success_message=None,
+ failure_url=None,
+ success_url=None,
+ # Matchers. These are separate so they can be individually
+ # overidden.
+ branch_matcher=None,
+ file_matcher=None,
+ irrelevant_file_matcher=None, # skip-if
+ swift=None, # TODOv3(jeblair): move to auth
+ parameter_function=None, # TODOv3(jeblair): remove
+ success_pattern=None, # TODOv3(jeblair): remove
+ tags=set(),
+ mutex=None,
+ )
+
def __init__(self, name):
- # If you add attributes here, be sure to add them to the copy method.
self.name = name
- self.queue_name = None
- self.failure_message = None
- self.success_message = None
- self.failure_pattern = None
- self.success_pattern = None
- self.parameter_function = None
- self.tags = set()
- self.mutex = None
- # A metajob should only supply values for attributes that have
- # been explicitly provided, so avoid setting boolean defaults.
- if self.is_metajob:
- self.hold_following_changes = None
- self.voting = None
- else:
- self.hold_following_changes = False
- self.voting = True
- self.branches = []
- self._branches = []
- self.files = []
- self._files = []
- self.skip_if_matcher = None
- self.swift = {}
+ for k, v in self.attributes.items():
+ setattr(self, k, v)
+
+ def __equals__(self, other):
+ # Compare the name and all inheritable attributes to determine
+ # whether two jobs with the same name are identically
+ # configured. Useful upon reconfiguration.
+ if not isinstance(other, Job):
+ return False
+ if self.name != other.name:
+ return False
+ for k, v in self.attributes.items():
+ if getattr(self, k) != getattr(other, k):
+ return False
+ return True
def __str__(self):
return self.name
def __repr__(self):
- return '<Job %s>' % (self.name)
+ return '<Job %s>' % (self.name,)
- @property
- def is_metajob(self):
- return self.name.startswith('^')
+ def inheritFrom(self, other):
+ """Copy the inheritable attributes which have been set on the other
+ job to this job."""
- def copy(self, other):
- if other.failure_message:
- self.failure_message = other.failure_message
- if other.success_message:
- self.success_message = other.success_message
- if other.failure_pattern:
- self.failure_pattern = other.failure_pattern
- if other.success_pattern:
- self.success_pattern = other.success_pattern
- if other.parameter_function:
- self.parameter_function = other.parameter_function
- if other.branches:
- self.branches = other.branches[:]
- self._branches = other._branches[:]
- if other.files:
- self.files = other.files[:]
- self._files = other._files[:]
- if other.skip_if_matcher:
- self.skip_if_matcher = other.skip_if_matcher.copy()
- if other.swift:
- self.swift.update(other.swift)
- if other.mutex:
- self.mutex = other.mutex
- # Tags are merged via a union rather than a destructive copy
- # because they are intended to accumulate as metajobs are
- # applied.
- if other.tags:
- self.tags = self.tags.union(other.tags)
- # Only non-None values should be copied for boolean attributes.
- if other.hold_following_changes is not None:
- self.hold_following_changes = other.hold_following_changes
- if other.voting is not None:
- self.voting = other.voting
+ if not isinstance(other, Job):
+ raise Exception("Job unable to inherit from %s" % (other,))
+ for k, v in self.attributes.items():
+ if getattr(other, k) != v:
+ setattr(self, k, getattr(other, k))
def changeMatches(self, change):
- matches_branch = False
- for branch in self.branches:
- if hasattr(change, 'branch') and branch.match(change.branch):
- matches_branch = True
- if hasattr(change, 'ref') and branch.match(change.ref):
- matches_branch = True
- if self.branches and not matches_branch:
+ if self.branch_matcher and not self.branch_matcher.matches(change):
return False
- matches_file = False
- for f in self.files:
- if hasattr(change, 'files'):
- for cf in change.files:
- if f.match(cf):
- matches_file = True
- if self.files and not matches_file:
+ if self.file_matcher and not self.file_matcher.matches(change):
return False
- if self.skip_if_matcher and self.skip_if_matcher.matches(change):
+ # NB: This is a negative match.
+ if (self.irrelevant_file_matcher and
+ self.irrelevant_file_matcher.matches(change)):
return False
return True
@@ -564,6 +554,17 @@
return ret
return None
+ def inheritFrom(self, other):
+ if other.job:
+ self.job = Job(other.job.name)
+ self.job.inheritFrom(other.job)
+ for other_tree in other.job_trees:
+ this_tree = self.getJobTreeForJob(other_tree.job)
+ if not this_tree:
+ this_tree = JobTree(None)
+ self.job_trees.append(this_tree)
+ this_tree.inheritFrom(other_tree)
+
class Build(object):
def __init__(self, job, uuid):
@@ -697,6 +698,7 @@
self.reported = False
self.active = False # Whether an item is within an active window
self.live = True # Whether an item is intended to be processed at all
+ self.job_tree = None
def __repr__(self):
if self.pipeline:
@@ -724,6 +726,43 @@
def setReportedResult(self, result):
self.current_build_set.result = result
+ def _createJobTree(self, job_trees, parent):
+ for tree in job_trees:
+ job = tree.job
+ if not job.changeMatches(self.change):
+ continue
+ frozen_job = Job(job.name)
+ frozen_tree = JobTree(frozen_job)
+ inherited = set()
+ for variant in self.pipeline.layout.getJobs(job.name):
+ if variant.changeMatches(self.change):
+ if variant not in inherited:
+ frozen_job.inheritFrom(variant)
+ inherited.add(variant)
+ if job not in inherited:
+ # Only update from the job in the tree if it is
+ # unique, otherwise we might unset an attribute we
+ # have overloaded.
+ frozen_job.inheritFrom(job)
+ parent.job_trees.append(frozen_tree)
+ self._createJobTree(tree.job_trees, frozen_tree)
+
+ def createJobTree(self):
+ project_tree = self.pipeline.getJobTree(self.change.project)
+ ret = JobTree(None)
+ self._createJobTree(project_tree.job_trees, ret)
+ return ret
+
+ def freezeJobTree(self):
+ """Find or create actual matching jobs for this item's change and
+ store the resulting job tree."""
+ self.job_tree = self.createJobTree()
+
+ def getJobs(self):
+ if not self.live or not self.job_tree:
+ return []
+ return self.job_tree.getJobs()
+
def formatJSON(self):
changeish = self.change
ret = {}
@@ -1329,24 +1368,149 @@
return True
+class ProjectPipelineConfig(object):
+ # Represents a project cofiguration in the context of a pipeline
+ def __init__(self):
+ self.job_tree = None
+ self.queue_name = None
+ # TODOv3(jeblair): add merge mode
+
+
+class ProjectConfig(object):
+ # Represents a project cofiguration
+ def __init__(self, name):
+ self.name = name
+ self.pipelines = {}
+
+
+class UnparsedAbideConfig(object):
+ # A collection of yaml lists that has not yet been parsed into
+ # objects.
+ def __init__(self):
+ self.tenants = []
+
+ def extend(self, conf):
+ if isinstance(conf, UnparsedAbideConfig):
+ self.tenants.extend(conf.tenants)
+ return
+
+ if not isinstance(conf, list):
+ raise Exception("Configuration items must be in the form of "
+ "a list of dictionaries (when parsing %s)" %
+ (conf,))
+ for item in conf:
+ if not isinstance(item, dict):
+ raise Exception("Configuration items must be in the form of "
+ "a list of dictionaries (when parsing %s)" %
+ (conf,))
+ if len(item.keys()) > 1:
+ raise Exception("Configuration item dictionaries must have "
+ "a single key (when parsing %s)" %
+ (conf,))
+ key, value = item.items()[0]
+ if key == 'tenant':
+ self.tenants.append(value)
+ else:
+ raise Exception("Configuration item not recognized "
+ "(when parsing %s)" %
+ (conf,))
+
+
+class UnparsedTenantConfig(object):
+ # A collection of yaml lists that has not yet been parsed into
+ # objects.
+ def __init__(self):
+ self.pipelines = []
+ self.jobs = []
+ self.project_templates = []
+ self.projects = []
+
+ def extend(self, conf):
+ if isinstance(conf, UnparsedTenantConfig):
+ self.pipelines.extend(conf.pipelines)
+ self.jobs.extend(conf.jobs)
+ self.project_templates.extend(conf.project_templates)
+ self.projects.extend(conf.projects)
+ return
+
+ if not isinstance(conf, list):
+ raise Exception("Configuration items must be in the form of "
+ "a list of dictionaries (when parsing %s)" %
+ (conf,))
+ for item in conf:
+ if not isinstance(item, dict):
+ raise Exception("Configuration items must be in the form of "
+ "a list of dictionaries (when parsing %s)" %
+ (conf,))
+ if len(item.keys()) > 1:
+ raise Exception("Configuration item dictionaries must have "
+ "a single key (when parsing %s)" %
+ (conf,))
+ key, value = item.items()[0]
+ if key == 'project':
+ self.projects.append(value)
+ elif key == 'job':
+ self.jobs.append(value)
+ elif key == 'project-template':
+ self.project_templates.append(value)
+ elif key == 'pipeline':
+ self.pipelines.append(value)
+ else:
+ raise Exception("Configuration item not recognized "
+ "(when parsing %s)" %
+ (conf,))
+
+
class Layout(object):
def __init__(self):
self.projects = {}
+ self.project_configs = {}
+ self.project_templates = {}
self.pipelines = OrderedDict()
+ # This is a dictionary of name -> [jobs]. The first element
+ # of the list is the first job added with that name. It is
+ # the reference definition for a given job. Subsequent
+ # elements are aspects of that job with different matchers
+ # that override some attribute of the job. These aspects all
+ # inherit from the reference definition.
self.jobs = {}
- self.metajobs = []
def getJob(self, name):
if name in self.jobs:
- return self.jobs[name]
- job = Job(name)
- if job.is_metajob:
- regex = re.compile(name)
- self.metajobs.append((regex, job))
+ return self.jobs[name][0]
+ raise Exception("Job %s not defined" % (name,))
+
+ def getJobs(self, name):
+ return self.jobs.get(name, [])
+
+ def addJob(self, job):
+ if job.name in self.jobs:
+ self.jobs[job.name].append(job)
else:
- # Apply attributes from matching meta-jobs
- for regex, metajob in self.metajobs:
- if regex.match(name):
- job.copy(metajob)
- self.jobs[name] = job
- return job
+ self.jobs[job.name] = [job]
+
+ def addPipeline(self, pipeline):
+ self.pipelines[pipeline.name] = pipeline
+
+ def addProjectTemplate(self, project_template):
+ self.project_templates[project_template.name] = project_template
+
+ def addProjectConfig(self, project_config):
+ self.project_configs[project_config.name] = project_config
+ # TODOv3(jeblair): tidy up the relationship between pipelines
+ # and projects and projectconfigs
+ for pipeline_name, pipeline_config in project_config.pipelines.items():
+ pipeline = self.pipelines[pipeline_name]
+ project = pipeline.source.getProject(project_config.name)
+ pipeline.job_trees[project] = pipeline_config.job_tree
+
+
+class Tenant(object):
+ def __init__(self, name):
+ self.name = name
+ self.layout = None
+
+
+class Abide(object):
+ def __init__(self):
+ self.tenants = OrderedDict()
diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py
index fd79174..777b058 100644
--- a/zuul/reporter/__init__.py
+++ b/zuul/reporter/__init__.py
@@ -33,9 +33,6 @@
def setAction(self, action):
self._action = action
- def stop(self):
- """Stop the reporter."""
-
@abc.abstractmethod
def report(self, source, pipeline, item):
"""Send the compiled report message."""
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 118cbfc..b1d4181 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -21,44 +21,19 @@
import os
import pickle
from six.moves import queue as Queue
-import re
import sys
import threading
import time
-import yaml
-import layoutvalidator
+import configloader
import model
-from model import Pipeline, Project, ChangeQueue
-from model import ChangeishFilter, NullChange
-from zuul import change_matcher, exceptions
+from model import Project
+from zuul import exceptions
from zuul import version as zuul_version
statsd = extras.try_import('statsd.statsd')
-def deep_format(obj, paramdict):
- """Apply the paramdict via str.format() to all string objects found within
- the supplied obj. Lists and dicts are traversed recursively.
-
- Borrowed from Jenkins Job Builder project"""
- if isinstance(obj, str):
- ret = obj.format(**paramdict)
- elif isinstance(obj, list):
- ret = []
- for item in obj:
- ret.append(deep_format(item, paramdict))
- elif isinstance(obj, dict):
- ret = {}
- for item in obj:
- exp_item = item.format(**paramdict)
-
- ret[exp_item] = deep_format(obj[item], paramdict)
- else:
- ret = obj
- return ret
-
-
class MutexHandler(object):
log = logging.getLogger("zuul.MutexHandler")
@@ -247,8 +222,9 @@
self._stopped = False
self.launcher = None
self.merger = None
+ self.connections = None
+ # TODO(jeblair): fix this
self.mutex = MutexHandler()
- self.connections = dict()
# Despite triggers being part of the pipeline, there is one trigger set
# per scheduler. The pipeline handles the trigger filters but since
# the events are handled by the scheduler itself it needs to handle
@@ -260,23 +236,13 @@
self.trigger_event_queue = Queue.Queue()
self.result_event_queue = Queue.Queue()
self.management_event_queue = Queue.Queue()
- self.layout = model.Layout()
+ self.abide = model.Abide()
self.zuul_version = zuul_version.version_info.release_string()
self.last_reconfigured = None
- # A set of reporter configuration keys to action mapping
- self._reporter_actions = {
- 'start': 'start_actions',
- 'success': 'success_actions',
- 'failure': 'failure_actions',
- 'merge-failure': 'merge_failure_actions',
- 'disabled': 'disabled_actions',
- }
-
def stop(self):
self._stopped = True
- self._unloadDrivers()
self.stopConnections()
self.wake_event.set()
@@ -285,331 +251,12 @@
# registerConnections as we don't want to do the onLoad event yet.
return self._parseConfig(config_path, connections)
- def _parseSkipIf(self, config_job):
- cm = change_matcher
- skip_matchers = []
-
- for config_skip in config_job.get('skip-if', []):
- nested_matchers = []
-
- project_regex = config_skip.get('project')
- if project_regex:
- nested_matchers.append(cm.ProjectMatcher(project_regex))
-
- branch_regex = config_skip.get('branch')
- if branch_regex:
- nested_matchers.append(cm.BranchMatcher(branch_regex))
-
- file_regexes = toList(config_skip.get('all-files-match-any'))
- if file_regexes:
- file_matchers = [cm.FileMatcher(x) for x in file_regexes]
- all_files_matcher = cm.MatchAllFiles(file_matchers)
- nested_matchers.append(all_files_matcher)
-
- # All patterns need to match a given skip-if predicate
- skip_matchers.append(cm.MatchAll(nested_matchers))
-
- if skip_matchers:
- # Any skip-if predicate can be matched to trigger a skip
- return cm.MatchAny(skip_matchers)
-
def registerConnections(self, connections):
self.connections = connections
- for connection_name, connection in self.connections.items():
- connection.registerScheduler(self)
- connection.onLoad()
+ self.connections.registerScheduler(self)
def stopConnections(self):
- for connection_name, connection in self.connections.items():
- connection.onStop()
-
- def _unloadDrivers(self):
- for trigger in self.triggers.values():
- trigger.stop()
- self.triggers = {}
- for pipeline in self.layout.pipelines.values():
- pipeline.source.stop()
- for action in self._reporter_actions.values():
- for reporter in pipeline.__getattribute__(action):
- reporter.stop()
-
- def _getDriver(self, dtype, connection_name, driver_config={}):
- # Instantiate a driver such as a trigger, source or reporter
- # TODO(jhesketh): Make this list dynamic or use entrypoints etc.
- # Stevedore was not a good fit here due to the nature of triggers.
- # Specifically we don't want to load a trigger per a pipeline as one
- # trigger can listen to a stream (from gerrit, for example) and the
- # scheduler decides which eventfilter to use. As such we want to load
- # trigger+connection pairs uniquely.
- drivers = {
- 'source': {
- 'gerrit': 'zuul.source.gerrit:GerritSource',
- },
- 'trigger': {
- 'gerrit': 'zuul.trigger.gerrit:GerritTrigger',
- 'timer': 'zuul.trigger.timer:TimerTrigger',
- 'zuul': 'zuul.trigger.zuultrigger:ZuulTrigger',
- },
- 'reporter': {
- 'gerrit': 'zuul.reporter.gerrit:GerritReporter',
- 'smtp': 'zuul.reporter.smtp:SMTPReporter',
- },
- }
-
- # TODO(jhesketh): Check the connection_name exists
- if connection_name in self.connections.keys():
- driver_name = self.connections[connection_name].driver_name
- connection = self.connections[connection_name]
- else:
- # In some cases a driver may not be related to a connection. For
- # example, the 'timer' or 'zuul' triggers.
- driver_name = connection_name
- connection = None
- driver = drivers[dtype][driver_name].split(':')
- driver_instance = getattr(
- __import__(driver[0], fromlist=['']), driver[1])(
- driver_config, self, connection
- )
-
- if connection:
- connection.registerUse(dtype, driver_instance)
-
- return driver_instance
-
- def _getSourceDriver(self, connection_name):
- return self._getDriver('source', connection_name)
-
- def _getReporterDriver(self, connection_name, driver_config={}):
- return self._getDriver('reporter', connection_name, driver_config)
-
- def _getTriggerDriver(self, connection_name, driver_config={}):
- return self._getDriver('trigger', connection_name, driver_config)
-
- def _parseConfig(self, config_path, connections):
- layout = model.Layout()
- project_templates = {}
-
- if config_path:
- config_path = os.path.expanduser(config_path)
- if not os.path.exists(config_path):
- raise Exception("Unable to read layout config file at %s" %
- config_path)
- with open(config_path) as config_file:
- data = yaml.load(config_file)
-
- validator = layoutvalidator.LayoutValidator()
- validator.validate(data, connections)
-
- config_env = {}
- for include in data.get('includes', []):
- if 'python-file' in include:
- fn = include['python-file']
- if not os.path.isabs(fn):
- base = os.path.dirname(os.path.realpath(config_path))
- fn = os.path.join(base, fn)
- fn = os.path.expanduser(fn)
- execfile(fn, config_env)
-
- for conf_pipeline in data.get('pipelines', []):
- pipeline = Pipeline(conf_pipeline['name'])
- pipeline.description = conf_pipeline.get('description')
- # TODO(jeblair): remove backwards compatibility:
- pipeline.source = self._getSourceDriver(
- conf_pipeline.get('source', 'gerrit'))
- precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
- pipeline.precedence = precedence
- pipeline.failure_message = conf_pipeline.get('failure-message',
- "Build failed.")
- pipeline.merge_failure_message = conf_pipeline.get(
- 'merge-failure-message', "Merge Failed.\n\nThis change or one "
- "of its cross-repo dependencies was unable to be "
- "automatically merged with the current state of its "
- "repository. Please rebase the change and upload a new "
- "patchset.")
- pipeline.success_message = conf_pipeline.get('success-message',
- "Build succeeded.")
- pipeline.footer_message = conf_pipeline.get('footer-message', "")
- pipeline.dequeue_on_new_patchset = conf_pipeline.get(
- 'dequeue-on-new-patchset', True)
- pipeline.ignore_dependencies = conf_pipeline.get(
- 'ignore-dependencies', False)
-
- for conf_key, action in self._reporter_actions.items():
- reporter_set = []
- if conf_pipeline.get(conf_key):
- for reporter_name, params \
- in conf_pipeline.get(conf_key).items():
- reporter = self._getReporterDriver(reporter_name,
- params)
- reporter.setAction(conf_key)
- reporter_set.append(reporter)
- setattr(pipeline, action, reporter_set)
-
- # If merge-failure actions aren't explicit, use the failure actions
- if not pipeline.merge_failure_actions:
- pipeline.merge_failure_actions = pipeline.failure_actions
-
- pipeline.disable_at = conf_pipeline.get(
- 'disable-after-consecutive-failures', None)
-
- pipeline.window = conf_pipeline.get('window', 20)
- pipeline.window_floor = conf_pipeline.get('window-floor', 3)
- pipeline.window_increase_type = conf_pipeline.get(
- 'window-increase-type', 'linear')
- pipeline.window_increase_factor = conf_pipeline.get(
- 'window-increase-factor', 1)
- pipeline.window_decrease_type = conf_pipeline.get(
- 'window-decrease-type', 'exponential')
- pipeline.window_decrease_factor = conf_pipeline.get(
- 'window-decrease-factor', 2)
-
- manager = globals()[conf_pipeline['manager']](self, pipeline)
- pipeline.setManager(manager)
- layout.pipelines[conf_pipeline['name']] = pipeline
-
- if 'require' in conf_pipeline or 'reject' in conf_pipeline:
- require = conf_pipeline.get('require', {})
- reject = conf_pipeline.get('reject', {})
- f = ChangeishFilter(
- open=require.get('open'),
- current_patchset=require.get('current-patchset'),
- statuses=toList(require.get('status')),
- required_approvals=toList(require.get('approval')),
- reject_approvals=toList(reject.get('approval'))
- )
- manager.changeish_filters.append(f)
-
- for trigger_name, trigger_config\
- in conf_pipeline.get('trigger').items():
- if trigger_name not in self.triggers.keys():
- self.triggers[trigger_name] = \
- self._getTriggerDriver(trigger_name, trigger_config)
-
- for trigger_name, trigger in self.triggers.items():
- if trigger_name in conf_pipeline['trigger']:
- manager.event_filters += trigger.getEventFilters(
- conf_pipeline['trigger'][trigger_name])
-
- for project_template in data.get('project-templates', []):
- # Make sure the template only contains valid pipelines
- tpl = dict(
- (pipe_name, project_template.get(pipe_name))
- for pipe_name in layout.pipelines.keys()
- if pipe_name in project_template
- )
- project_templates[project_template.get('name')] = tpl
-
- for config_job in data.get('jobs', []):
- job = layout.getJob(config_job['name'])
- # Be careful to only set attributes explicitly present on
- # this job, to avoid squashing attributes set by a meta-job.
- m = config_job.get('queue-name', None)
- if m:
- job.queue_name = m
- m = config_job.get('failure-message', None)
- if m:
- job.failure_message = m
- m = config_job.get('success-message', None)
- if m:
- job.success_message = m
- m = config_job.get('failure-pattern', None)
- if m:
- job.failure_pattern = m
- m = config_job.get('success-pattern', None)
- if m:
- job.success_pattern = m
- m = config_job.get('hold-following-changes', False)
- if m:
- job.hold_following_changes = True
- m = config_job.get('voting', None)
- if m is not None:
- job.voting = m
- m = config_job.get('mutex', None)
- if m is not None:
- job.mutex = m
- tags = toList(config_job.get('tags'))
- if tags:
- # Tags are merged via a union rather than a
- # destructive copy because they are intended to
- # accumulate onto any previously applied tags from
- # metajobs.
- job.tags = job.tags.union(set(tags))
- fname = config_job.get('parameter-function', None)
- if fname:
- func = config_env.get(fname, None)
- if not func:
- raise Exception("Unable to find function %s" % fname)
- job.parameter_function = func
- branches = toList(config_job.get('branch'))
- if branches:
- job._branches = branches
- job.branches = [re.compile(x) for x in branches]
- files = toList(config_job.get('files'))
- if files:
- job._files = files
- job.files = [re.compile(x) for x in files]
- skip_if_matcher = self._parseSkipIf(config_job)
- if skip_if_matcher:
- job.skip_if_matcher = skip_if_matcher
- swift = toList(config_job.get('swift'))
- if swift:
- for s in swift:
- job.swift[s['name']] = s
-
- def add_jobs(job_tree, config_jobs):
- for job in config_jobs:
- if isinstance(job, list):
- for x in job:
- add_jobs(job_tree, x)
- if isinstance(job, dict):
- for parent, children in job.items():
- parent_tree = job_tree.addJob(layout.getJob(parent))
- add_jobs(parent_tree, children)
- if isinstance(job, str):
- job_tree.addJob(layout.getJob(job))
-
- for config_project in data.get('projects', []):
- project = Project(config_project['name'])
- shortname = config_project['name'].split('/')[-1]
-
- # This is reversed due to the prepend operation below, so
- # the ultimate order is templates (in order) followed by
- # statically defined jobs.
- for requested_template in reversed(
- config_project.get('template', [])):
- # Fetch the template from 'project-templates'
- tpl = project_templates.get(
- requested_template.get('name'))
- # Expand it with the project context
- requested_template['name'] = shortname
- expanded = deep_format(tpl, requested_template)
- # Finally merge the expansion with whatever has been
- # already defined for this project. Prepend our new
- # jobs to existing ones (which may have been
- # statically defined or defined by other templates).
- for pipeline in layout.pipelines.values():
- if pipeline.name in expanded:
- config_project.update(
- {pipeline.name: expanded[pipeline.name] +
- config_project.get(pipeline.name, [])})
-
- layout.projects[config_project['name']] = project
- mode = config_project.get('merge-mode', 'merge-resolve')
- project.merge_mode = model.MERGER_MAP[mode]
- for pipeline in layout.pipelines.values():
- if pipeline.name in config_project:
- job_tree = pipeline.addProject(project)
- config_jobs = config_project[pipeline.name]
- add_jobs(job_tree, config_jobs)
-
- # All jobs should be defined at this point, get rid of
- # metajobs so that getJob isn't doing anything weird.
- layout.metajobs = []
-
- for pipeline in layout.pipelines.values():
- pipeline.manager._postConfig(layout)
-
- return layout
+ self.connections.stop()
def setLauncher(self, launcher):
self.launcher = launcher
@@ -623,6 +270,7 @@
try:
p = self.layout.projects.get(name)
if p is None and create_foreign:
+ # TODOv3(jeblair): fix
self.log.info("Registering foreign project: %s" % name)
p = Project(name, foreign=True)
self.layout.projects[name] = p
@@ -792,83 +440,84 @@
self.config = event.config
try:
self.log.debug("Performing reconfiguration")
- self._unloadDrivers()
- layout = self._parseConfig(
- self.config.get('zuul', 'layout_config'), self.connections)
- for name, new_pipeline in layout.pipelines.items():
- old_pipeline = self.layout.pipelines.get(name)
- if not old_pipeline:
- if self.layout.pipelines:
- # Don't emit this warning on startup
- self.log.warning("No old pipeline matching %s found "
- "when reconfiguring" % name)
- continue
- self.log.debug("Re-enqueueing changes for pipeline %s" % name)
- items_to_remove = []
- builds_to_cancel = []
- last_head = None
- for shared_queue in old_pipeline.queues:
- for item in shared_queue.queue:
- if not item.item_ahead:
- last_head = item
- item.item_ahead = None
- item.items_behind = []
- item.pipeline = None
- item.queue = None
- project_name = item.change.project.name
- item.change.project = layout.projects.get(project_name)
- if not item.change.project:
- self.log.debug("Project %s not defined, "
- "re-instantiating as foreign" %
- project_name)
- project = Project(project_name, foreign=True)
- layout.projects[project_name] = project
- item.change.project = project
- item_jobs = new_pipeline.getJobs(item)
- for build in item.current_build_set.getBuilds():
- job = layout.jobs.get(build.job.name)
- if job and job in item_jobs:
- build.job = job
- else:
- item.removeBuild(build)
- builds_to_cancel.append(build)
- if not new_pipeline.manager.reEnqueueItem(item,
- last_head):
- items_to_remove.append(item)
- for item in items_to_remove:
- for build in item.current_build_set.getBuilds():
- builds_to_cancel.append(build)
- for build in builds_to_cancel:
- self.log.warning(
- "Canceling build %s during reconfiguration" % (build,))
- try:
- self.launcher.cancel(build)
- except Exception:
- self.log.exception(
- "Exception while canceling build %s "
- "for change %s" % (build, item.change))
- self.layout = layout
- self.maintainConnectionCache()
- for trigger in self.triggers.values():
- trigger.postConfig()
- for pipeline in self.layout.pipelines.values():
- pipeline.source.postConfig()
- for action in self._reporter_actions.values():
- for reporter in pipeline.__getattribute__(action):
- reporter.postConfig()
- if statsd:
- try:
- for pipeline in self.layout.pipelines.values():
- items = len(pipeline.getAllItems())
- # stats.gauges.zuul.pipeline.NAME.current_changes
- key = 'zuul.pipeline.%s' % pipeline.name
- statsd.gauge(key + '.current_changes', items)
- except Exception:
- self.log.exception("Exception reporting initial "
- "pipeline stats:")
+ loader = configloader.ConfigLoader()
+ abide = loader.loadConfig(
+ self.config.get('zuul', 'tenant_config'),
+ self, self.merger, self.connections)
+ for tenant in abide.tenants.values():
+ self._reconfigureTenant(tenant)
+ self.abide = abide
finally:
self.layout_lock.release()
+ def _reconfigureTenant(self, tenant):
+ # This is called from _doReconfigureEvent while holding the
+ # layout lock
+ old_tenant = self.abide.tenants.get(tenant.name)
+ if not old_tenant:
+ return
+ for name, new_pipeline in tenant.layout.pipelines.items():
+ old_pipeline = old_tenant.layout.pipelines.get(name)
+ if not old_pipeline:
+ self.log.warning("No old pipeline matching %s found "
+ "when reconfiguring" % name)
+ continue
+ self.log.debug("Re-enqueueing changes for pipeline %s" % name)
+ items_to_remove = []
+ builds_to_cancel = []
+ last_head = None
+ for shared_queue in old_pipeline.queues:
+ for item in shared_queue.queue:
+ if not item.item_ahead:
+ last_head = item
+ item.item_ahead = None
+ item.items_behind = []
+ item.pipeline = None
+ item.queue = None
+ project_name = item.change.project.name
+ item.change.project = new_pipeline.source.getProject(
+ project_name)
+ item_jobs = new_pipeline.getJobs(item)
+ for build in item.current_build_set.getBuilds():
+ job = tenant.layout.jobs.get(build.job.name)
+ if job and job in item_jobs:
+ build.job = job
+ else:
+ item.removeBuild(build)
+ builds_to_cancel.append(build)
+ if not new_pipeline.manager.reEnqueueItem(item,
+ last_head):
+ items_to_remove.append(item)
+ for item in items_to_remove:
+ for build in item.current_build_set.getBuilds():
+ builds_to_cancel.append(build)
+ for build in builds_to_cancel:
+ self.log.warning(
+ "Canceling build %s during reconfiguration" % (build,))
+ try:
+ self.launcher.cancel(build)
+ except Exception:
+ self.log.exception(
+ "Exception while canceling build %s "
+ "for change %s" % (build, item.change))
+ # TODOv3(jeblair): update for tenants
+ self.maintainConnectionCache()
+ for pipeline in tenant.layout.pipelines.values():
+ pipeline.source.postConfig()
+ pipeline.trigger.postConfig()
+ for reporter in pipeline.actions:
+ reporter.postConfig()
+ if statsd:
+ try:
+ for pipeline in self.layout.pipelines.values():
+ items = len(pipeline.getAllItems())
+ # stats.gauges.zuul.pipeline.NAME.current_changes
+ key = 'zuul.pipeline.%s' % pipeline.name
+ statsd.gauge(key + '.current_changes', items)
+ except Exception:
+ self.log.exception("Exception reporting initial "
+ "pipeline stats:")
+
def _doPromoteEvent(self, event):
pipeline = self.layout.pipelines[event.pipeline_name]
change_ids = [c.split(',') for c in event.change_ids]
@@ -914,15 +563,13 @@
change = pipeline.source.getChange(event, project)
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))
- self.log.info("Adding %s, %s to %s" %
+ self.log.info("Adding %s %s to %s" %
(project, change, pipeline))
pipeline.manager.addChange(change, ignore_requirements=True)
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
- if self.merger.areMergesOutstanding():
- waiting = True
for pipeline in self.layout.pipelines.values():
for item in pipeline.getAllItems():
for build in item.current_build_set.getBuilds():
@@ -956,7 +603,7 @@
self.process_management_queue()
# Give result events priority -- they let us stop builds,
- # whereas trigger evensts cause us to launch builds.
+ # whereas trigger events cause us to launch builds.
while not self.result_event_queue.empty():
self.process_result_queue()
@@ -967,9 +614,10 @@
if self._pause and self._areAllBuildsComplete():
self._doPauseEvent()
- for pipeline in self.layout.pipelines.values():
- while pipeline.manager.processQueue():
- pass
+ for tenant in self.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ while pipeline.manager.processQueue():
+ pass
except Exception:
self.log.exception("Exception in run handler:")
@@ -979,12 +627,16 @@
self.run_handler_lock.release()
def maintainConnectionCache(self):
+ # TODOv3(jeblair): update for tenants
relevant = set()
- for pipeline in self.layout.pipelines.values():
- self.log.debug("Gather relevant cache items for: %s" % pipeline)
- for item in pipeline.getAllItems():
- relevant.add(item.change)
- relevant.update(item.change.getRelatedChanges())
+ for tenant in self.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ self.log.debug("Gather relevant cache items for: %s" %
+ pipeline)
+
+ for item in pipeline.getAllItems():
+ relevant.add(item.change)
+ relevant.update(item.change.getRelatedChanges())
for connection in self.connections.values():
connection.maintainCache(relevant)
self.log.debug(
@@ -996,31 +648,28 @@
event = self.trigger_event_queue.get()
self.log.debug("Processing trigger event %s" % event)
try:
- project = self.layout.projects.get(event.project_name)
-
- for pipeline in self.layout.pipelines.values():
- # Get the change even if the project is unknown to us for the
- # use of updating the cache if there is another change
- # depending on this foreign one.
- try:
- change = pipeline.source.getChange(event, project)
- except exceptions.ChangeNotFound as e:
- self.log.debug("Unable to get change %s from source %s. "
- "(most likely looking for a change from "
- "another connection trigger)",
- e.change, pipeline.source)
- continue
- if not project or project.foreign:
- self.log.debug("Project %s not found" % event.project_name)
- continue
- if event.type == 'patchset-created':
- pipeline.manager.removeOldVersionsOfChange(change)
- elif event.type == 'change-abandoned':
- pipeline.manager.removeAbandonedChange(change)
- if pipeline.manager.eventMatches(event, change):
- self.log.info("Adding %s, %s to %s" %
- (project, change, pipeline))
- pipeline.manager.addChange(change)
+ for tenant in self.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ # Get the change even if the project is unknown to
+ # us for the use of updating the cache if there is
+ # another change depending on this foreign one.
+ try:
+ change = pipeline.source.getChange(event)
+ except exceptions.ChangeNotFound as e:
+ self.log.debug("Unable to get change %s from "
+ "source %s (most likely looking "
+ "for a change from another "
+ "connection trigger)",
+ e.change, pipeline.source)
+ continue
+ if event.type == 'patchset-created':
+ pipeline.manager.removeOldVersionsOfChange(change)
+ elif event.type == 'change-abandoned':
+ pipeline.manager.removeAbandonedChange(change)
+ if pipeline.manager.eventMatches(event, change):
+ self.log.info("Adding %s %s to %s" %
+ (change.project, change, pipeline))
+ pipeline.manager.addChange(change)
finally:
self.trigger_event_queue.task_done()
@@ -1097,6 +746,7 @@
pipeline.manager.onMergeCompleted(event)
def formatStatusJSON(self):
+ # TODOv3(jeblair): use tenants
data = {}
data['zuul_version'] = self.zuul_version
@@ -1124,1029 +774,3 @@
for pipeline in self.layout.pipelines.values():
pipelines.append(pipeline.formatStatusJSON())
return json.dumps(data)
-
-
-class BasePipelineManager(object):
- log = logging.getLogger("zuul.BasePipelineManager")
-
- def __init__(self, sched, pipeline):
- self.sched = sched
- self.pipeline = pipeline
- self.event_filters = []
- self.changeish_filters = []
-
- def __str__(self):
- return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
-
- def _postConfig(self, layout):
- self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
- self.log.info(" Source: %s" % self.pipeline.source)
- self.log.info(" Requirements:")
- for f in self.changeish_filters:
- self.log.info(" %s" % f)
- self.log.info(" Events:")
- for e in self.event_filters:
- self.log.info(" %s" % e)
- self.log.info(" Projects:")
-
- def log_jobs(tree, indent=0):
- istr = ' ' + ' ' * indent
- if tree.job:
- efilters = ''
- for b in tree.job._branches:
- efilters += str(b)
- for f in tree.job._files:
- efilters += str(f)
- if tree.job.skip_if_matcher:
- efilters += str(tree.job.skip_if_matcher)
- if efilters:
- efilters = ' ' + efilters
- tags = []
- if tree.job.hold_following_changes:
- tags.append('[hold]')
- if not tree.job.voting:
- tags.append('[nonvoting]')
- if tree.job.mutex:
- tags.append('[mutex: %s]' % tree.job.mutex)
- tags = ' '.join(tags)
- self.log.info("%s%s%s %s" % (istr, repr(tree.job),
- efilters, tags))
- for x in tree.job_trees:
- log_jobs(x, indent + 2)
-
- for p in layout.projects.values():
- tree = self.pipeline.getJobTree(p)
- if tree:
- self.log.info(" %s" % p)
- log_jobs(tree)
- self.log.info(" On start:")
- self.log.info(" %s" % self.pipeline.start_actions)
- self.log.info(" On success:")
- self.log.info(" %s" % self.pipeline.success_actions)
- self.log.info(" On failure:")
- self.log.info(" %s" % self.pipeline.failure_actions)
- self.log.info(" On merge-failure:")
- self.log.info(" %s" % self.pipeline.merge_failure_actions)
- self.log.info(" When disabled:")
- self.log.info(" %s" % self.pipeline.disabled_actions)
-
- def getSubmitAllowNeeds(self):
- # Get a list of code review labels that are allowed to be
- # "needed" in the submit records for a change, with respect
- # to this queue. In other words, the list of review labels
- # this queue itself is likely to set before submitting.
- allow_needs = set()
- for action_reporter in self.pipeline.success_actions:
- allow_needs.update(action_reporter.getSubmitAllowNeeds())
- return allow_needs
-
- def eventMatches(self, event, change):
- if event.forced_pipeline:
- if event.forced_pipeline == self.pipeline.name:
- self.log.debug("Event %s for change %s was directly assigned "
- "to pipeline %s" % (event, change, self))
- return True
- else:
- return False
- for ef in self.event_filters:
- if ef.matches(event, change):
- self.log.debug("Event %s for change %s matched %s "
- "in pipeline %s" % (event, change, ef, self))
- return True
- return False
-
- def isChangeAlreadyInPipeline(self, change):
- # Checks live items in the pipeline
- for item in self.pipeline.getAllItems():
- if item.live and change.equals(item.change):
- return True
- return False
-
- def isChangeAlreadyInQueue(self, change, change_queue):
- # Checks any item in the specified change queue
- for item in change_queue.queue:
- if change.equals(item.change):
- return True
- return False
-
- def reportStart(self, item):
- if not self.pipeline._disabled:
- try:
- self.log.info("Reporting start, action %s item %s" %
- (self.pipeline.start_actions, item))
- ret = self.sendReport(self.pipeline.start_actions,
- self.pipeline.source, item)
- if ret:
- self.log.error("Reporting item start %s received: %s" %
- (item, ret))
- except:
- self.log.exception("Exception while reporting start:")
-
- def sendReport(self, action_reporters, source, item,
- message=None):
- """Sends the built message off to configured reporters.
-
- Takes the action_reporters, item, message and extra options and
- sends them to the pluggable reporters.
- """
- report_errors = []
- if len(action_reporters) > 0:
- for reporter in action_reporters:
- ret = reporter.report(source, self.pipeline, item)
- if ret:
- report_errors.append(ret)
- if len(report_errors) == 0:
- return
- return report_errors
-
- def isChangeReadyToBeEnqueued(self, change):
- return True
-
- def enqueueChangesAhead(self, change, quiet, ignore_requirements,
- change_queue):
- return True
-
- def enqueueChangesBehind(self, change, quiet, ignore_requirements,
- change_queue):
- return True
-
- def checkForChangesNeededBy(self, change, change_queue):
- return True
-
- def getFailingDependentItems(self, item):
- return None
-
- def getDependentItems(self, item):
- orig_item = item
- items = []
- while item.item_ahead:
- items.append(item.item_ahead)
- item = item.item_ahead
- self.log.info("Change %s depends on changes %s" %
- (orig_item.change,
- [x.change for x in items]))
- return items
-
- def getItemForChange(self, change):
- for item in self.pipeline.getAllItems():
- if item.change.equals(change):
- return item
- return None
-
- def findOldVersionOfChangeAlreadyInQueue(self, change):
- for item in self.pipeline.getAllItems():
- if not item.live:
- continue
- if change.isUpdateOf(item.change):
- return item
- return None
-
- def removeOldVersionsOfChange(self, change):
- if not self.pipeline.dequeue_on_new_patchset:
- return
- old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
- if old_item:
- self.log.debug("Change %s is a new version of %s, removing %s" %
- (change, old_item.change, old_item))
- self.removeItem(old_item)
-
- def removeAbandonedChange(self, change):
- self.log.debug("Change %s abandoned, removing." % change)
- for item in self.pipeline.getAllItems():
- if not item.live:
- continue
- if item.change.equals(change):
- self.removeItem(item)
-
- def reEnqueueItem(self, item, last_head):
- with self.getChangeQueue(item.change, last_head.queue) as change_queue:
- if change_queue:
- self.log.debug("Re-enqueing change %s in queue %s" %
- (item.change, change_queue))
- change_queue.enqueueItem(item)
-
- # Re-set build results in case any new jobs have been
- # added to the tree.
- for build in item.current_build_set.getBuilds():
- if build.result:
- self.pipeline.setResult(item, build)
- # Similarly, reset the item state.
- if item.current_build_set.unable_to_merge:
- self.pipeline.setUnableToMerge(item)
- if item.dequeued_needing_change:
- self.pipeline.setDequeuedNeedingChange(item)
-
- self.reportStats(item)
- return True
- else:
- self.log.error("Unable to find change queue for project %s" %
- item.change.project)
- return False
-
- def addChange(self, change, quiet=False, enqueue_time=None,
- ignore_requirements=False, live=True,
- change_queue=None):
- self.log.debug("Considering adding change %s" % change)
-
- # If we are adding a live change, check if it's a live item
- # anywhere in the pipeline. Otherwise, we will perform the
- # duplicate check below on the specific change_queue.
- if live and self.isChangeAlreadyInPipeline(change):
- self.log.debug("Change %s is already in pipeline, "
- "ignoring" % change)
- return True
-
- if not self.isChangeReadyToBeEnqueued(change):
- self.log.debug("Change %s is not ready to be enqueued, ignoring" %
- change)
- return False
-
- if not ignore_requirements:
- for f in self.changeish_filters:
- if not f.matches(change):
- self.log.debug("Change %s does not match pipeline "
- "requirement %s" % (change, f))
- return False
-
- with self.getChangeQueue(change, change_queue) as change_queue:
- if not change_queue:
- self.log.debug("Unable to find change queue for "
- "change %s in project %s" %
- (change, change.project))
- return False
-
- if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
- change_queue):
- self.log.debug("Failed to enqueue changes "
- "ahead of %s" % change)
- return False
-
- if self.isChangeAlreadyInQueue(change, change_queue):
- self.log.debug("Change %s is already in queue, "
- "ignoring" % change)
- return True
-
- self.log.debug("Adding change %s to queue %s" %
- (change, change_queue))
- item = change_queue.enqueueChange(change)
- if enqueue_time:
- item.enqueue_time = enqueue_time
- item.live = live
- self.reportStats(item)
- if not quiet:
- if len(self.pipeline.start_actions) > 0:
- self.reportStart(item)
- self.enqueueChangesBehind(change, quiet, ignore_requirements,
- change_queue)
- for trigger in self.sched.triggers.values():
- trigger.onChangeEnqueued(item.change, self.pipeline)
- return True
-
- def dequeueItem(self, item):
- self.log.debug("Removing change %s from queue" % item.change)
- item.queue.dequeueItem(item)
-
- def removeItem(self, item):
- # Remove an item from the queue, probably because it has been
- # superseded by another change.
- self.log.debug("Canceling builds behind change: %s "
- "because it is being removed." % item.change)
- self.cancelJobs(item)
- self.dequeueItem(item)
- self.reportStats(item)
-
- def _makeMergerItem(self, item):
- # Create a dictionary with all info about the item needed by
- # the merger.
- number = None
- patchset = None
- oldrev = None
- newrev = None
- if hasattr(item.change, 'number'):
- number = item.change.number
- patchset = item.change.patchset
- elif hasattr(item.change, 'newrev'):
- oldrev = item.change.oldrev
- newrev = item.change.newrev
- connection_name = self.pipeline.source.connection.connection_name
- return dict(project=item.change.project.name,
- url=self.pipeline.source.getGitUrl(
- item.change.project),
- connection_name=connection_name,
- merge_mode=item.change.project.merge_mode,
- refspec=item.change.refspec,
- branch=item.change.branch,
- ref=item.current_build_set.ref,
- number=number,
- patchset=patchset,
- oldrev=oldrev,
- newrev=newrev,
- )
-
- def prepareRef(self, item):
- # Returns True if the ref is ready, false otherwise
- build_set = item.current_build_set
- if build_set.merge_state == build_set.COMPLETE:
- return True
- if build_set.merge_state == build_set.PENDING:
- return False
- build_set.merge_state = build_set.PENDING
- ref = build_set.ref
- if hasattr(item.change, 'refspec') and not ref:
- self.log.debug("Preparing ref for: %s" % item.change)
- item.current_build_set.setConfiguration()
- dependent_items = self.getDependentItems(item)
- dependent_items.reverse()
- all_items = dependent_items + [item]
- merger_items = map(self._makeMergerItem, all_items)
- self.sched.merger.mergeChanges(merger_items,
- item.current_build_set,
- self.pipeline.precedence)
- else:
- self.log.debug("Preparing update repo for: %s" % item.change)
- url = self.pipeline.source.getGitUrl(item.change.project)
- self.sched.merger.updateRepo(item.change.project.name,
- url, build_set,
- self.pipeline.precedence)
- return False
-
- def _launchJobs(self, item, jobs):
- self.log.debug("Launching jobs for change %s" % item.change)
- dependent_items = self.getDependentItems(item)
- for job in jobs:
- self.log.debug("Found job %s for change %s" % (job, item.change))
- try:
- build = self.sched.launcher.launch(job, item,
- self.pipeline,
- dependent_items)
- self.log.debug("Adding build %s of job %s to item %s" %
- (build, job, item))
- item.addBuild(build)
- except:
- self.log.exception("Exception while launching job %s "
- "for change %s:" % (job, item.change))
-
- def launchJobs(self, item):
- jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
- if jobs:
- self._launchJobs(item, jobs)
-
- def cancelJobs(self, item, prime=True):
- self.log.debug("Cancel jobs for change %s" % item.change)
- canceled = False
- old_build_set = item.current_build_set
- if prime and item.current_build_set.ref:
- item.resetAllBuilds()
- for build in old_build_set.getBuilds():
- try:
- self.sched.launcher.cancel(build)
- except:
- self.log.exception("Exception while canceling build %s "
- "for change %s" % (build, item.change))
- build.result = 'CANCELED'
- canceled = True
- self.updateBuildDescriptions(old_build_set)
- for item_behind in item.items_behind:
- self.log.debug("Canceling jobs for change %s, behind change %s" %
- (item_behind.change, item.change))
- if self.cancelJobs(item_behind, prime=prime):
- canceled = True
- return canceled
-
- def _processOneItem(self, item, nnfi):
- changed = False
- item_ahead = item.item_ahead
- if item_ahead and (not item_ahead.live):
- item_ahead = None
- change_queue = item.queue
- failing_reasons = [] # Reasons this item is failing
-
- if self.checkForChangesNeededBy(item.change, change_queue) is not True:
- # It's not okay to enqueue this change, we should remove it.
- self.log.info("Dequeuing change %s because "
- "it can no longer merge" % item.change)
- self.cancelJobs(item)
- self.dequeueItem(item)
- self.pipeline.setDequeuedNeedingChange(item)
- if item.live:
- try:
- self.reportItem(item)
- except exceptions.MergeFailure:
- pass
- return (True, nnfi)
- dep_items = self.getFailingDependentItems(item)
- actionable = change_queue.isActionable(item)
- item.active = actionable
- ready = False
- if dep_items:
- failing_reasons.append('a needed change is failing')
- self.cancelJobs(item, prime=False)
- else:
- item_ahead_merged = False
- if (item_ahead and item_ahead.change.is_merged):
- item_ahead_merged = True
- if (item_ahead != nnfi and not item_ahead_merged):
- # Our current base is different than what we expected,
- # and it's not because our current base merged. Something
- # ahead must have failed.
- self.log.info("Resetting builds for change %s because the "
- "item ahead, %s, is not the nearest non-failing "
- "item, %s" % (item.change, item_ahead, nnfi))
- change_queue.moveItem(item, nnfi)
- changed = True
- self.cancelJobs(item)
- if actionable:
- ready = self.prepareRef(item)
- if item.current_build_set.unable_to_merge:
- failing_reasons.append("it has a merge conflict")
- ready = False
- if actionable and ready and self.launchJobs(item):
- changed = True
- if self.pipeline.didAnyJobFail(item):
- failing_reasons.append("at least one job failed")
- if (not item.live) and (not item.items_behind):
- failing_reasons.append("is a non-live item with no items behind")
- self.dequeueItem(item)
- changed = True
- if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
- and item.live):
- try:
- self.reportItem(item)
- except exceptions.MergeFailure:
- failing_reasons.append("it did not merge")
- for item_behind in item.items_behind:
- self.log.info("Resetting builds for change %s because the "
- "item ahead, %s, failed to merge" %
- (item_behind.change, item))
- self.cancelJobs(item_behind)
- self.dequeueItem(item)
- changed = True
- elif not failing_reasons and item.live:
- nnfi = item
- item.current_build_set.failing_reasons = failing_reasons
- if failing_reasons:
- self.log.debug("%s is a failing item because %s" %
- (item, failing_reasons))
- return (changed, nnfi)
-
- def processQueue(self):
- # Do whatever needs to be done for each change in the queue
- self.log.debug("Starting queue processor: %s" % self.pipeline.name)
- changed = False
- for queue in self.pipeline.queues:
- queue_changed = False
- nnfi = None # Nearest non-failing item
- for item in queue.queue[:]:
- item_changed, nnfi = self._processOneItem(
- item, nnfi)
- if item_changed:
- queue_changed = True
- self.reportStats(item)
- if queue_changed:
- changed = True
- status = ''
- for item in queue.queue:
- status += item.formatStatus()
- if status:
- self.log.debug("Queue %s status is now:\n %s" %
- (queue.name, status))
- self.log.debug("Finished queue processor: %s (changed: %s)" %
- (self.pipeline.name, changed))
- return changed
-
- def updateBuildDescriptions(self, build_set):
- for build in build_set.getBuilds():
- try:
- desc = self.formatDescription(build)
- self.sched.launcher.setBuildDescription(build, desc)
- except:
- # Log the failure and let loop continue
- self.log.error("Failed to update description for build %s" %
- (build))
-
- if build_set.previous_build_set:
- for build in build_set.previous_build_set.getBuilds():
- try:
- desc = self.formatDescription(build)
- self.sched.launcher.setBuildDescription(build, desc)
- except:
- # Log the failure and let loop continue
- self.log.error("Failed to update description for "
- "build %s in previous build set" % (build))
-
- def onBuildStarted(self, build):
- self.log.debug("Build %s started" % build)
- return True
-
- def onBuildCompleted(self, build):
- self.log.debug("Build %s completed" % build)
- item = build.build_set.item
-
- self.pipeline.setResult(item, build)
- self.sched.mutex.release(item, build.job)
- self.log.debug("Item %s status is now:\n %s" %
- (item, item.formatStatus()))
- return True
-
- def onMergeCompleted(self, event):
- build_set = event.build_set
- item = build_set.item
- build_set.merge_state = build_set.COMPLETE
- build_set.zuul_url = event.zuul_url
- if event.merged:
- build_set.commit = event.commit
- elif event.updated:
- if not isinstance(item.change, NullChange):
- build_set.commit = item.change.newrev
- if not build_set.commit and not isinstance(item.change, NullChange):
- self.log.info("Unable to merge change %s" % item.change)
- self.pipeline.setUnableToMerge(item)
-
- def reportItem(self, item):
- if not item.reported:
- # _reportItem() returns True if it failed to report.
- item.reported = not self._reportItem(item)
- if self.changes_merge:
- succeeded = self.pipeline.didAllJobsSucceed(item)
- merged = item.reported
- if merged:
- merged = self.pipeline.source.isMerged(item.change,
- item.change.branch)
- self.log.info("Reported change %s status: all-succeeded: %s, "
- "merged: %s" % (item.change, succeeded, merged))
- change_queue = item.queue
- if not (succeeded and merged):
- self.log.debug("Reported change %s failed tests or failed "
- "to merge" % (item.change))
- change_queue.decreaseWindowSize()
- self.log.debug("%s window size decreased to %s" %
- (change_queue, change_queue.window))
- raise exceptions.MergeFailure(
- "Change %s failed to merge" % item.change)
- else:
- change_queue.increaseWindowSize()
- self.log.debug("%s window size increased to %s" %
- (change_queue, change_queue.window))
-
- for trigger in self.sched.triggers.values():
- trigger.onChangeMerged(item.change, self.pipeline.source)
-
- def _reportItem(self, item):
- self.log.debug("Reporting change %s" % item.change)
- ret = True # Means error as returned by trigger.report
- if not self.pipeline.getJobs(item):
- # We don't send empty reports with +1,
- # and the same for -1's (merge failures or transient errors)
- # as they cannot be followed by +1's
- self.log.debug("No jobs for change %s" % item.change)
- actions = []
- elif self.pipeline.didAllJobsSucceed(item):
- self.log.debug("success %s" % (self.pipeline.success_actions))
- actions = self.pipeline.success_actions
- item.setReportedResult('SUCCESS')
- self.pipeline._consecutive_failures = 0
- elif not self.pipeline.didMergerSucceed(item):
- actions = self.pipeline.merge_failure_actions
- item.setReportedResult('MERGER_FAILURE')
- else:
- actions = self.pipeline.failure_actions
- item.setReportedResult('FAILURE')
- self.pipeline._consecutive_failures += 1
- if self.pipeline._disabled:
- actions = self.pipeline.disabled_actions
- # Check here if we should disable so that we only use the disabled
- # reporters /after/ the last disable_at failure is still reported as
- # normal.
- if (self.pipeline.disable_at and not self.pipeline._disabled and
- self.pipeline._consecutive_failures >= self.pipeline.disable_at):
- self.pipeline._disabled = True
- if actions:
- try:
- self.log.info("Reporting item %s, actions: %s" %
- (item, actions))
- ret = self.sendReport(actions, self.pipeline.source, item)
- if ret:
- self.log.error("Reporting item %s received: %s" %
- (item, ret))
- except:
- self.log.exception("Exception while reporting:")
- item.setReportedResult('ERROR')
- self.updateBuildDescriptions(item.current_build_set)
- return ret
-
- def formatDescription(self, build):
- concurrent_changes = ''
- concurrent_builds = ''
- other_builds = ''
-
- for change in build.build_set.other_changes:
- concurrent_changes += '<li><a href="{change.url}">\
- {change.number},{change.patchset}</a></li>'.format(
- change=change)
-
- change = build.build_set.item.change
-
- for build in build.build_set.getBuilds():
- if build.url:
- concurrent_builds += """\
-<li>
- <a href="{build.url}">
- {build.job.name} #{build.number}</a>: {build.result}
-</li>
-""".format(build=build)
- else:
- concurrent_builds += """\
-<li>
- {build.job.name}: {build.result}
-</li>""".format(build=build)
-
- if build.build_set.previous_build_set:
- other_build = build.build_set.previous_build_set.getBuild(
- build.job.name)
- if other_build:
- other_builds += """\
-<li>
- Preceded by: <a href="{build.url}">
- {build.job.name} #{build.number}</a>
-</li>
-""".format(build=other_build)
-
- if build.build_set.next_build_set:
- other_build = build.build_set.next_build_set.getBuild(
- build.job.name)
- if other_build:
- other_builds += """\
-<li>
- Succeeded by: <a href="{build.url}">
- {build.job.name} #{build.number}</a>
-</li>
-""".format(build=other_build)
-
- result = build.build_set.result
-
- if hasattr(change, 'number'):
- ret = """\
-<p>
- Triggered by change:
- <a href="{change.url}">{change.number},{change.patchset}</a><br/>
- Branch: <b>{change.branch}</b><br/>
- Pipeline: <b>{self.pipeline.name}</b>
-</p>"""
- elif hasattr(change, 'ref'):
- ret = """\
-<p>
- Triggered by reference:
- {change.ref}</a><br/>
- Old revision: <b>{change.oldrev}</b><br/>
- New revision: <b>{change.newrev}</b><br/>
- Pipeline: <b>{self.pipeline.name}</b>
-</p>"""
- else:
- ret = ""
-
- if concurrent_changes:
- ret += """\
-<p>
- Other changes tested concurrently with this change:
- <ul>{concurrent_changes}</ul>
-</p>
-"""
- if concurrent_builds:
- ret += """\
-<p>
- All builds for this change set:
- <ul>{concurrent_builds}</ul>
-</p>
-"""
-
- if other_builds:
- ret += """\
-<p>
- Other build sets for this change:
- <ul>{other_builds}</ul>
-</p>
-"""
- if result:
- ret += """\
-<p>
- Reported result: <b>{result}</b>
-</p>
-"""
-
- ret = ret.format(**locals())
- return ret
-
- def reportStats(self, item):
- if not statsd:
- return
- try:
- # Update the gauge on enqueue and dequeue, but timers only
- # when dequeing.
- if item.dequeue_time:
- dt = int((item.dequeue_time - item.enqueue_time) * 1000)
- else:
- dt = None
- items = len(self.pipeline.getAllItems())
-
- # stats.timers.zuul.pipeline.NAME.resident_time
- # stats_counts.zuul.pipeline.NAME.total_changes
- # stats.gauges.zuul.pipeline.NAME.current_changes
- key = 'zuul.pipeline.%s' % self.pipeline.name
- statsd.gauge(key + '.current_changes', items)
- if dt:
- statsd.timing(key + '.resident_time', dt)
- statsd.incr(key + '.total_changes')
-
- # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
- # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
- project_name = item.change.project.name.replace('/', '.')
- key += '.%s' % project_name
- if dt:
- statsd.timing(key + '.resident_time', dt)
- statsd.incr(key + '.total_changes')
- except:
- self.log.exception("Exception reporting pipeline stats")
-
-
-class DynamicChangeQueueContextManager(object):
- def __init__(self, change_queue):
- self.change_queue = change_queue
-
- def __enter__(self):
- return self.change_queue
-
- def __exit__(self, etype, value, tb):
- if self.change_queue and not self.change_queue.queue:
- self.change_queue.pipeline.removeQueue(self.change_queue.queue)
-
-
-class IndependentPipelineManager(BasePipelineManager):
- log = logging.getLogger("zuul.IndependentPipelineManager")
- changes_merge = False
-
- def _postConfig(self, layout):
- super(IndependentPipelineManager, self)._postConfig(layout)
-
- def getChangeQueue(self, change, existing=None):
- # creates a new change queue for every change
- if existing:
- return DynamicChangeQueueContextManager(existing)
- if change.project not in self.pipeline.getProjects():
- self.pipeline.addProject(change.project)
- change_queue = ChangeQueue(self.pipeline)
- change_queue.addProject(change.project)
- self.pipeline.addQueue(change_queue)
- self.log.debug("Dynamically created queue %s", change_queue)
- return DynamicChangeQueueContextManager(change_queue)
-
- def enqueueChangesAhead(self, change, quiet, ignore_requirements,
- change_queue):
- ret = self.checkForChangesNeededBy(change, change_queue)
- if ret in [True, False]:
- return ret
- self.log.debug(" Changes %s must be merged ahead of %s" %
- (ret, change))
- for needed_change in ret:
- # This differs from the dependent pipeline by enqueuing
- # changes ahead as "not live", that is, not intended to
- # have jobs run. Also, pipeline requirements are always
- # ignored (which is safe because the changes are not
- # live).
- r = self.addChange(needed_change, quiet=True,
- ignore_requirements=True,
- live=False, change_queue=change_queue)
- if not r:
- return False
- return True
-
- def checkForChangesNeededBy(self, change, change_queue):
- if self.pipeline.ignore_dependencies:
- return True
- self.log.debug("Checking for changes needed by %s:" % change)
- # Return true if okay to proceed enqueing this change,
- # false if the change should not be enqueued.
- if not hasattr(change, 'needs_changes'):
- self.log.debug(" Changeish does not support dependencies")
- return True
- if not change.needs_changes:
- self.log.debug(" No changes needed")
- return True
- changes_needed = []
- for needed_change in change.needs_changes:
- self.log.debug(" Change %s needs change %s:" % (
- change, needed_change))
- if needed_change.is_merged:
- self.log.debug(" Needed change is merged")
- continue
- if self.isChangeAlreadyInQueue(needed_change, change_queue):
- self.log.debug(" Needed change is already ahead in the queue")
- continue
- self.log.debug(" Change %s is needed" % needed_change)
- if needed_change not in changes_needed:
- changes_needed.append(needed_change)
- continue
- # This differs from the dependent pipeline check in not
- # verifying that the dependent change is mergable.
- if changes_needed:
- return changes_needed
- return True
-
- def dequeueItem(self, item):
- super(IndependentPipelineManager, self).dequeueItem(item)
- # An independent pipeline manager dynamically removes empty
- # queues
- if not item.queue.queue:
- self.pipeline.removeQueue(item.queue)
-
-
-class StaticChangeQueueContextManager(object):
- def __init__(self, change_queue):
- self.change_queue = change_queue
-
- def __enter__(self):
- return self.change_queue
-
- def __exit__(self, etype, value, tb):
- pass
-
-
-class DependentPipelineManager(BasePipelineManager):
- log = logging.getLogger("zuul.DependentPipelineManager")
- changes_merge = True
-
- def __init__(self, *args, **kwargs):
- super(DependentPipelineManager, self).__init__(*args, **kwargs)
-
- def _postConfig(self, layout):
- super(DependentPipelineManager, self)._postConfig(layout)
- self.buildChangeQueues()
-
- def buildChangeQueues(self):
- self.log.debug("Building shared change queues")
- change_queues = []
-
- for project in self.pipeline.getProjects():
- change_queue = ChangeQueue(
- self.pipeline,
- window=self.pipeline.window,
- window_floor=self.pipeline.window_floor,
- window_increase_type=self.pipeline.window_increase_type,
- window_increase_factor=self.pipeline.window_increase_factor,
- window_decrease_type=self.pipeline.window_decrease_type,
- window_decrease_factor=self.pipeline.window_decrease_factor)
- change_queue.addProject(project)
- change_queues.append(change_queue)
- self.log.debug("Created queue: %s" % change_queue)
-
- # Iterate over all queues trying to combine them, and keep doing
- # so until they can not be combined further.
- last_change_queues = change_queues
- while True:
- new_change_queues = self.combineChangeQueues(last_change_queues)
- if len(last_change_queues) == len(new_change_queues):
- break
- last_change_queues = new_change_queues
-
- self.log.info(" Shared change queues:")
- for queue in new_change_queues:
- self.pipeline.addQueue(queue)
- self.log.info(" %s containing %s" % (
- queue, queue.generated_name))
-
- def combineChangeQueues(self, change_queues):
- self.log.debug("Combining shared queues")
- new_change_queues = []
- for a in change_queues:
- merged_a = False
- for b in new_change_queues:
- if not a.getJobs().isdisjoint(b.getJobs()):
- self.log.debug("Merging queue %s into %s" % (a, b))
- b.mergeChangeQueue(a)
- merged_a = True
- break # this breaks out of 'for b' and continues 'for a'
- if not merged_a:
- self.log.debug("Keeping queue %s" % (a))
- new_change_queues.append(a)
- return new_change_queues
-
- def getChangeQueue(self, change, existing=None):
- if existing:
- return StaticChangeQueueContextManager(existing)
- return StaticChangeQueueContextManager(
- self.pipeline.getQueue(change.project))
-
- def isChangeReadyToBeEnqueued(self, change):
- if not self.pipeline.source.canMerge(change,
- self.getSubmitAllowNeeds()):
- self.log.debug("Change %s can not merge, ignoring" % change)
- return False
- return True
-
- def enqueueChangesBehind(self, change, quiet, ignore_requirements,
- change_queue):
- to_enqueue = []
- self.log.debug("Checking for changes needing %s:" % change)
- if not hasattr(change, 'needed_by_changes'):
- self.log.debug(" Changeish does not support dependencies")
- return
- for other_change in change.needed_by_changes:
- with self.getChangeQueue(other_change) as other_change_queue:
- if other_change_queue != change_queue:
- self.log.debug(" Change %s in project %s can not be "
- "enqueued in the target queue %s" %
- (other_change, other_change.project,
- change_queue))
- continue
- if self.pipeline.source.canMerge(other_change,
- self.getSubmitAllowNeeds()):
- self.log.debug(" Change %s needs %s and is ready to merge" %
- (other_change, change))
- to_enqueue.append(other_change)
-
- if not to_enqueue:
- self.log.debug(" No changes need %s" % change)
-
- for other_change in to_enqueue:
- self.addChange(other_change, quiet=quiet,
- ignore_requirements=ignore_requirements,
- change_queue=change_queue)
-
- def enqueueChangesAhead(self, change, quiet, ignore_requirements,
- change_queue):
- ret = self.checkForChangesNeededBy(change, change_queue)
- if ret in [True, False]:
- return ret
- self.log.debug(" Changes %s must be merged ahead of %s" %
- (ret, change))
- for needed_change in ret:
- r = self.addChange(needed_change, quiet=quiet,
- ignore_requirements=ignore_requirements,
- change_queue=change_queue)
- if not r:
- return False
- return True
-
- def checkForChangesNeededBy(self, change, change_queue):
- self.log.debug("Checking for changes needed by %s:" % change)
- # Return true if okay to proceed enqueing this change,
- # false if the change should not be enqueued.
- if not hasattr(change, 'needs_changes'):
- self.log.debug(" Changeish does not support dependencies")
- return True
- if not change.needs_changes:
- self.log.debug(" No changes needed")
- return True
- changes_needed = []
- # Ignore supplied change_queue
- with self.getChangeQueue(change) as change_queue:
- for needed_change in change.needs_changes:
- self.log.debug(" Change %s needs change %s:" % (
- change, needed_change))
- if needed_change.is_merged:
- self.log.debug(" Needed change is merged")
- continue
- with self.getChangeQueue(needed_change) as needed_change_queue:
- if needed_change_queue != change_queue:
- self.log.debug(" Change %s in project %s does not "
- "share a change queue with %s "
- "in project %s" %
- (needed_change, needed_change.project,
- change, change.project))
- return False
- if not needed_change.is_current_patchset:
- self.log.debug(" Needed change is not the "
- "current patchset")
- return False
- if self.isChangeAlreadyInQueue(needed_change, change_queue):
- self.log.debug(" Needed change is already ahead "
- "in the queue")
- continue
- if self.pipeline.source.canMerge(needed_change,
- self.getSubmitAllowNeeds()):
- self.log.debug(" Change %s is needed" % needed_change)
- if needed_change not in changes_needed:
- changes_needed.append(needed_change)
- continue
- # The needed change can't be merged.
- self.log.debug(" Change %s is needed but can not be merged" %
- needed_change)
- return False
- if changes_needed:
- return changes_needed
- return True
-
- def getFailingDependentItems(self, item):
- if not hasattr(item.change, 'needs_changes'):
- return None
- if not item.change.needs_changes:
- return None
- failing_items = set()
- for needed_change in item.change.needs_changes:
- needed_item = self.getItemForChange(needed_change)
- if not needed_item:
- continue
- if needed_item.current_build_set.failing_reasons:
- failing_items.add(needed_item)
- if failing_items:
- return failing_items
- return None
diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py
index cb4501a..35524d0 100644
--- a/zuul/source/__init__.py
+++ b/zuul/source/__init__.py
@@ -32,9 +32,6 @@
self.sched = sched
self.connection = connection
- def stop(self):
- """Stop the source."""
-
@abc.abstractmethod
def getRefSha(self, project, ref):
"""Return a sha for a given project ref."""
@@ -53,7 +50,7 @@
"""Called after configuration has been processed."""
@abc.abstractmethod
- def getChange(self, event, project):
+ def getChange(self, event):
"""Get the change representing an event."""
@abc.abstractmethod
@@ -63,3 +60,7 @@
@abc.abstractmethod
def getGitUrl(self, project):
"""Get the git url for a project."""
+
+ @abc.abstractmethod
+ def getProject(self, name):
+ """Get a project."""
diff --git a/zuul/source/gerrit.py b/zuul/source/gerrit.py
index eb8705d..9c52229 100644
--- a/zuul/source/gerrit.py
+++ b/zuul/source/gerrit.py
@@ -126,18 +126,23 @@
def postConfig(self):
pass
- def getChange(self, event, project):
+ def getProject(self, name):
+ return self.connection.getProject(name)
+
+ def getChange(self, event):
if event.change_number:
refresh = False
change = self._getChange(event.change_number, event.patch_number,
refresh=refresh)
elif event.ref:
+ project = self.getProject(event.project_name)
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self._getGitwebUrl(project, sha=event.newrev)
else:
+ # TODOv3(jeblair): we need to get the project from the event
change = NullChange(project)
return change
@@ -224,11 +229,7 @@
if 'project' not in data:
raise exceptions.ChangeNotFound(change.number, change.patchset)
- # If updated changed came as a dependent on
- # and its project is not defined,
- # then create a 'foreign' project for it in layout
- change.project = self.sched.getProject(data['project'],
- create_foreign=bool(history))
+ change.project = self.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']
max_ps = 0
diff --git a/zuul/trigger/__init__.py b/zuul/trigger/__init__.py
index 16fb0b1..b200a50 100644
--- a/zuul/trigger/__init__.py
+++ b/zuul/trigger/__init__.py
@@ -28,9 +28,6 @@
self.sched = sched
self.connection = connection
- def stop(self):
- """Stop the trigger."""
-
@abc.abstractmethod
def getEventFilters(self, trigger_conf):
"""Return a list of EventFilter's for the scheduler to match against.