Merge "Squelch ara initializing message from log" into feature/zuulv3
diff --git a/.zuul.yaml b/.zuul.yaml
index e2628cb..271dd02 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -7,9 +7,11 @@
voting: false
- tox-pep8
- tox-py35
- - tox-tarball
gate:
jobs:
- tox-docs
- tox-pep8
- tox-py35
+ post:
+ jobs:
+ - publish-openstack-python-branch-tarball
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index aa6d8c8..2c70d47 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -311,10 +311,10 @@
*trusted* execution context, otherwise, it is run in the *untrusted*
execution context.
-Both execution contexts use `bubblewrap`_ to create a namespace to
-ensure that playbook executions are isolated and are unable to access
-files outside of a restricted environment. The administrator may
-configure additional local directories on the executor to be made
+Both execution contexts use `bubblewrap`_ [#nullwrap]_ to create a
+namespace to ensure that playbook executions are isolated and are unable
+to access files outside of a restricted environment. The administrator
+may configure additional local directories on the executor to be made
available to the restricted environment.
The trusted execution context has access to all Ansible features,
@@ -335,6 +335,8 @@
protections are made as part of a defense-in-depth strategy.
.. _bubblewrap: https://github.com/projectatomic/bubblewrap
+.. [#nullwrap] Unless one has set execution_wrapper to nullwrap in the
+ executor configuration.
Configuration
~~~~~~~~~~~~~
@@ -437,6 +439,25 @@
List of paths, separated by ``:`` to read-write bind mount into
untrusted bubblewrap contexts.
+ .. attr:: execution_wrapper
+ :default: bubblewrap
+
+ Name of the execution wrapper to use when executing
+ `ansible-playbook`. The default, `bubblewrap` is recommended for
+ all installations.
+
+ There is also a `nullwrap` driver for situations where one wants
+ to run Zuul without access to bubblewrap or in such a way that
+ bubblewrap may interfere with the jobs themselves. However,
+ `nullwrap` is considered unsafe, as `bubblewrap` provides
+ significant protections against malicious users and accidental
+ breakage in playbooks. As such, `nullwrap` is not recommended
+ for use in production.
+
+ This option, and thus, `nullwrap`, may be removed in the future.
+ `bubblewrap` has become integral to securely operating Zuul. If you
+ have a valid use case for it, we encourage you to let us know.
+
.. attr:: merger
.. attr:: git_user_email
diff --git a/doc/source/admin/installation.rst b/doc/source/admin/installation.rst
index bc61f7e..ae7d571 100644
--- a/doc/source/admin/installation.rst
+++ b/doc/source/admin/installation.rst
@@ -30,7 +30,7 @@
Gearman is a job distribution system that Zuul uses to communicate
with its distributed components. The Zuul scheduler distributes work
-to Zuul mergers and executors use Gearman. You may supply your own
+to Zuul mergers and executors using Gearman. You may supply your own
gearman server, but the Zuul scheduler includes a built-in server
which is recommended. Ensure that all Zuul hosts can communicate with
the gearman server.
@@ -56,7 +56,7 @@
Nodepool uses Zookeeper to communicate internally among its
components, and also to communicate with Zuul. You can run a simple
single-node Zookeeper instance, or a multi-node cluster. Ensure that
-The host running the Zuul scheduler has access to the cluster.
+the host running the Zuul scheduler has access to the cluster.
Ansible
~~~~~~~
diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 9c69960..4fed1f9 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -31,12 +31,12 @@
Metrics
~~~~~~~
-The metrics are emitted by the Zuul :ref:`scheduler`:
+These metrics are emitted by the Zuul :ref:`scheduler`:
.. stat:: gerrit.event.<type>
:type: counter
- Gerrit emits different kind of message over its `stream-events`
+ Gerrit emits different kinds of messages over its `stream-events`
interface. Zuul will report counters for each type of event it
receives from Gerrit.
@@ -89,7 +89,7 @@
.. stat:: total_changes
:type: counter
- The number of change processed by the pipeline since Zuul
+ The number of changes processed by the pipeline since Zuul
started.
.. stat:: wait_time
@@ -119,7 +119,7 @@
.. stat:: total_changes
:type: counter
- The number of change for this project processed by the
+ The number of changes for this project processed by the
pipeline since Zuul started.
As an example, given a job named `myjob` triggered by the `gate` pipeline
diff --git a/doc/source/developer/docs.rst b/doc/source/developer/docs.rst
index 6a7256e..0a8bfe5 100644
--- a/doc/source/developer/docs.rst
+++ b/doc/source/developer/docs.rst
@@ -65,7 +65,7 @@
entire hierarchy of the attribute, but emphasises the last portion
(i.e., the field being documented).
-To use the hierarchical features, simply nest with indendtation in the
+To use the hierarchical features, simply nest with indentation in the
normal RST manner.
It supports the ``required`` and ``default`` options and will annotate
diff --git a/tests/base.py b/tests/base.py
index 028a8b1..357dd7a 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -481,6 +481,25 @@
self.changes[self.change_number] = c
return c
+ def getFakeBranchCreatedEvent(self, project, branch):
+ path = os.path.join(self.upstream_root, project)
+ repo = git.Repo(path)
+ oldrev = 40 * '0'
+
+ event = {
+ "type": "ref-updated",
+ "submitter": {
+ "name": "User Name",
+ },
+ "refUpdate": {
+ "oldRev": oldrev,
+ "newRev": repo.heads[branch].commit.hexsha,
+ "refName": branch,
+ "project": project,
+ }
+ }
+ return event
+
def review(self, project, changeid, message, action):
number, ps = changeid.split(',')
change = self.changes[int(number)]
@@ -2792,6 +2811,41 @@
files)
return before
+ def newTenantConfig(self, source_name):
+ """ Use this to update the tenant config file in tests
+
+ This will update self.tenant_config_file to point to a temporary file
+ for the duration of this particular test. The content of that file will
+ be taken from FIXTURE_DIR/source_name
+
+ After the test the original value of self.tenant_config_file will be
+ restored.
+
+ :arg str source_name: The path of the file under
+ FIXTURE_DIR that will be used to populate the new tenant
+ config file.
+ """
+ source_path = os.path.join(FIXTURE_DIR, source_name)
+ orig_tenant_config_file = self.tenant_config_file
+ with tempfile.NamedTemporaryFile(
+ delete=False, mode='wb') as new_tenant_config:
+ self.tenant_config_file = new_tenant_config.name
+ with open(source_path, mode='rb') as source_tenant_config:
+ new_tenant_config.write(source_tenant_config.read())
+ self.config['scheduler']['tenant_config'] = self.tenant_config_file
+ self.setupAllProjectKeys()
+ self.log.debug(
+ 'tenant_config_file = {}'.format(self.tenant_config_file))
+
+ def _restoreTenantConfig():
+ self.log.debug(
+ 'restoring tenant_config_file = {}'.format(
+ orig_tenant_config_file))
+ os.unlink(self.tenant_config_file)
+ self.tenant_config_file = orig_tenant_config_file
+ self.config['scheduler']['tenant_config'] = orig_tenant_config_file
+ self.addCleanup(_restoreTenantConfig)
+
def addEvent(self, connection, event):
"""Inject a Fake (Gerrit) event.
diff --git a/tests/fixtures/config/job-output/git/common-config/playbooks/job-output.yaml b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output.yaml
new file mode 100644
index 0000000..332db87
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output.yaml
@@ -0,0 +1,3 @@
+- hosts: all
+ tasks:
+ - shell: echo "Standard output test {{ zuul.executor.src_root }}"
diff --git a/tests/fixtures/config/job-output/git/common-config/zuul.yaml b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
new file mode 100644
index 0000000..a83f0bc
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
@@ -0,0 +1,27 @@
+- pipeline:
+ name: check
+ manager: independent
+ post-review: true
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- job:
+ name: base
+ parent: null
+
+- job:
+ parent: base
+ name: job-output
+
+- project:
+ name: org/project
+ check:
+ jobs:
+ - job-output
diff --git a/tests/fixtures/config/job-output/git/org_project/README b/tests/fixtures/config/job-output/git/org_project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/job-output/main.yaml b/tests/fixtures/config/job-output/main.yaml
new file mode 100644
index 0000000..208e274
--- /dev/null
+++ b/tests/fixtures/config/job-output/main.yaml
@@ -0,0 +1,8 @@
+- tenant:
+ name: tenant-one
+ source:
+ gerrit:
+ config-projects:
+ - common-config
+ untrusted-projects:
+ - org/project
diff --git a/tests/fixtures/layout-delayed-repo-init.yaml b/tests/fixtures/layout-delayed-repo-init.yaml
deleted file mode 100644
index 04dc010..0000000
--- a/tests/fixtures/layout-delayed-repo-init.yaml
+++ /dev/null
@@ -1,52 +0,0 @@
-pipelines:
- - name: check
- manager: IndependentPipelineManager
- trigger:
- gerrit:
- - event: patchset-created
- success:
- gerrit:
- Verified: 1
- failure:
- gerrit:
- Verified: -1
-
- - name: post
- manager: IndependentPipelineManager
- trigger:
- gerrit:
- - event: ref-updated
- ref: ^(?!refs/).*$
-
- - name: gate
- manager: DependentPipelineManager
- failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
- trigger:
- gerrit:
- - event: comment-added
- approval:
- - Approved: 1
- success:
- gerrit:
- Verified: 2
- submit: true
- failure:
- gerrit:
- Verified: -2
- start:
- gerrit:
- Verified: 0
- precedence: high
-
-projects:
- - name: org/new-project
- check:
- - project-merge:
- - project-test1
- - project-test2
- gate:
- - project-merge:
- - project-test1
- - project-test2
- post:
- - project-post
diff --git a/tests/fixtures/layouts/delayed-repo-init.yaml b/tests/fixtures/layouts/delayed-repo-init.yaml
new file mode 100644
index 0000000..e97d37a
--- /dev/null
+++ b/tests/fixtures/layouts/delayed-repo-init.yaml
@@ -0,0 +1,77 @@
+- pipeline:
+ name: check
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- pipeline:
+ name: post
+ manager: independent
+ trigger:
+ gerrit:
+ - event: ref-updated
+ ref: ^(?!refs/).*$
+
+- pipeline:
+ name: gate
+ manager: dependent
+ failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - Approved: 1
+ success:
+ gerrit:
+ Verified: 2
+ submit: true
+ failure:
+ gerrit:
+ Verified: -2
+ start:
+ gerrit:
+ Verified: 0
+ precedence: high
+
+- job:
+ name: base
+ parent: null
+
+- job:
+ name: project-merge
+
+- job:
+ name: project-test1
+
+- job:
+ name: project-test2
+
+- job:
+ name: project-post
+
+- project:
+ name: org/new-project
+ check:
+ jobs:
+ - project-merge
+ - project-test1:
+ dependencies: project-merge
+ - project-test2:
+ dependencies: project-merge
+ gate:
+ jobs:
+ - project-merge:
+ - project-test1:
+ dependencies: project-merge
+ - project-test2:
+ dependencies: project-merge
+ post:
+ jobs:
+ - project-post
diff --git a/tests/fixtures/tenants/delayed-repo-init.yaml b/tests/fixtures/tenants/delayed-repo-init.yaml
new file mode 100644
index 0000000..433e6f7
--- /dev/null
+++ b/tests/fixtures/tenants/delayed-repo-init.yaml
@@ -0,0 +1,11 @@
+- tenant:
+ name: tenant-one
+ source:
+ gerrit:
+ config-projects:
+ - common-config
+ untrusted-projects:
+ - org/project
+ - org/project1
+ - org/project2
+ - org/new-project
diff --git a/tests/unit/test_bubblewrap.py b/tests/unit/test_bubblewrap.py
index d94b3f2..661d868 100644
--- a/tests/unit/test_bubblewrap.py
+++ b/tests/unit/test_bubblewrap.py
@@ -32,14 +32,15 @@
def test_bubblewrap_wraps(self):
bwrap = bubblewrap.BubblewrapDriver()
+ context = bwrap.getExecutionContext()
work_dir = tempfile.mkdtemp()
ssh_agent = SshAgent()
self.addCleanup(ssh_agent.stop)
ssh_agent.start()
- po = bwrap.getPopen(work_dir=work_dir,
- ssh_auth_sock=ssh_agent.env['SSH_AUTH_SOCK'])
- self.assertTrue(po.passwd_r > 2)
- self.assertTrue(po.group_r > 2)
+ po = context.getPopen(work_dir=work_dir,
+ ssh_auth_sock=ssh_agent.env['SSH_AUTH_SOCK'])
+ self.assertTrue(po.fds[0] > 2)
+ self.assertTrue(po.fds[1] > 2)
self.assertTrue(work_dir in po.command)
# Now run /usr/bin/id to verify passwd/group entries made it in
true_proc = po(['/usr/bin/id'], stdout=subprocess.PIPE,
@@ -50,19 +51,19 @@
# And that it did not print things on stderr
self.assertEqual(0, len(errs.strip()))
# Make sure the _r's are closed
- self.assertIsNone(po.passwd_r)
- self.assertIsNone(po.group_r)
+ self.assertEqual([], po.fds)
def test_bubblewrap_leak(self):
bwrap = bubblewrap.BubblewrapDriver()
+ context = bwrap.getExecutionContext()
work_dir = tempfile.mkdtemp()
ansible_dir = tempfile.mkdtemp()
ssh_agent = SshAgent()
self.addCleanup(ssh_agent.stop)
ssh_agent.start()
- po = bwrap.getPopen(work_dir=work_dir,
- ansible_dir=ansible_dir,
- ssh_auth_sock=ssh_agent.env['SSH_AUTH_SOCK'])
+ po = context.getPopen(work_dir=work_dir,
+ ansible_dir=ansible_dir,
+ ssh_auth_sock=ssh_agent.env['SSH_AUTH_SOCK'])
leak_time = 60
# Use hexadecimal notation to avoid false-positive
true_proc = po(['bash', '-c', 'sleep 0x%X & disown' % leak_time])
diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py
index ce30e7c..6dd8333 100644
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -202,7 +202,7 @@
'name': 'python27',
'parent': 'base',
'pre-run': 'py27-pre',
- 'post-run': 'py27-post',
+ 'post-run': ['py27-post-a', 'py27-post-b'],
'nodes': [{
'name': 'controller',
'label': 'new',
@@ -275,7 +275,8 @@
['base-pre',
'py27-pre'])
self.assertEqual([x.path for x in job.post_run],
- ['py27-post',
+ ['py27-post-a',
+ 'py27-post-b',
'base-post'])
self.assertEqual([x.path for x in job.run],
['playbooks/python27',
@@ -305,7 +306,8 @@
'py27-diablo-pre'])
self.assertEqual([x.path for x in job.post_run],
['py27-diablo-post',
- 'py27-post',
+ 'py27-post-a',
+ 'py27-post-b',
'base-post'])
self.assertEqual([x.path for x in job.run],
['py27-diablo']),
@@ -330,7 +332,8 @@
'py27-essex-pre'])
self.assertEqual([x.path for x in job.post_run],
['py27-essex-post',
- 'py27-post',
+ 'py27-post-a',
+ 'py27-post-b',
'base-post'])
self.assertEqual([x.path for x in job.run],
['playbooks/python27',
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 97d53e0..f33d964 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -2759,13 +2759,18 @@
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertIn('Build succeeded', A.messages[0])
- @skip("Disabled for early v3 development")
def test_delayed_repo_init(self):
- self.updateConfigLayout(
- 'tests/fixtures/layout-delayed-repo-init.yaml')
- self.sched.reconfigure(self.config)
-
self.init_repo("org/new-project")
+ files = {'README': ''}
+ self.addCommitToRepo("org/new-project", 'Initial commit',
+ files=files, tag='init')
+ self.newTenantConfig('tenants/delayed-repo-init.yaml')
+ self.commitConfigUpdate(
+ 'common-config',
+ 'layouts/delayed-repo-init.yaml')
+ self.sched.reconfigure(self.config)
+ self.waitUntilSettled()
+
A = self.fake_gerrit.addFakeChange('org/new-project', 'master', 'A')
A.addApproval('Code-Review', 2)
@@ -3836,19 +3841,23 @@
self.create_branch('org/project2', 'mp')
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
- C = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C')
- C.data['id'] = B.data['id']
+ C1 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C1')
+ C2 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C2',
+ status='ABANDONED')
+ C1.data['id'] = B.data['id']
+ C2.data['id'] = B.data['id']
+
A.addApproval('Code-Review', 2)
B.addApproval('Code-Review', 2)
- C.addApproval('Code-Review', 2)
+ C1.addApproval('Code-Review', 2)
- # A Depends-On: B+C
+ # A Depends-On: B+C1
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
A.subject, B.data['id'])
self.executor_server.hold_jobs_in_build = True
B.addApproval('Approved', 1)
- C.addApproval('Approved', 1)
+ C1.addApproval('Approved', 1)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
@@ -3864,10 +3873,10 @@
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
- self.assertEqual(C.data['status'], 'MERGED')
+ self.assertEqual(C1.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
- self.assertEqual(C.reported, 2)
+ self.assertEqual(C1.reported, 2)
changes = self.getJobFromHistory(
'project-merge', 'org/project1').changes
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index adb6bed..60a0986 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import os
import textwrap
@@ -444,6 +445,9 @@
file_dict = {'.zuul.yaml': in_repo_conf,
'playbooks/project-test2.yaml': in_repo_playbook}
self.create_branch('org/project', 'stable')
+ self.fake_gerrit.addEvent(
+ self.fake_gerrit.getFakeBranchCreatedEvent(
+ 'org/project', 'stable'))
A = self.fake_gerrit.addFakeChange('org/project', 'stable', 'A',
files=file_dict)
A.addApproval('Code-Review', 2)
@@ -484,6 +488,9 @@
# it from a different branch on a different repo.
self.create_branch('org/project1', 'stable')
+ self.fake_gerrit.addEvent(
+ self.fake_gerrit.getFakeBranchCreatedEvent(
+ 'org/project1', 'stable'))
in_repo_conf = textwrap.dedent(
"""
@@ -1286,8 +1293,7 @@
], ordered=False)
matches = self.searchForContent(self.history[0].jobdir.root,
b'test-password')
- self.assertEqual(set(['/ansible/playbook_0/secrets.yaml',
- '/work/secret-file.txt']),
+ self.assertEqual(set(['/work/secret-file.txt']),
set(matches))
def test_secret_file(self):
@@ -1322,8 +1328,7 @@
], ordered=False)
matches = self.searchForContent(self.history[0].jobdir.root,
b'test-password')
- self.assertEqual(set(['/ansible/playbook_0/secrets.yaml',
- '/work/failure-file.txt']),
+ self.assertEqual(set(['/work/failure-file.txt']),
set(matches))
def test_secret_file_fail(self):
@@ -1334,3 +1339,39 @@
# paths.
self.executor_server.verbose = True
self._test_secret_file_fail()
+
+
+class TestJobOutput(AnsibleZuulTestCase):
+ tenant_config_file = 'config/job-output/main.yaml'
+
+ def _get_file(self, build, path):
+ p = os.path.join(build.jobdir.root, path)
+ with open(p) as f:
+ return f.read()
+
+ def test_job_output(self):
+ # Verify that command standard output appears in the job output
+
+ # This currently only verifies we receive output from
+ # localhost. Notably, it does not verify we receive output
+ # via zuul_console streaming.
+ self.executor_server.keep_jobdir = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='job-output', result='SUCCESS', changes='1,1'),
+ ], ordered=False)
+
+ token = 'Standard output test %s' % (self.history[0].jobdir.src_root)
+ j = json.loads(self._get_file(self.history[0],
+ 'work/logs/job-output.json'))
+ self.assertEqual(token,
+ j[0]['plays'][0]['tasks'][0]
+ ['hosts']['localhost']['stdout'])
+
+ print(self._get_file(self.history[0],
+ 'work/logs/job-output.txt'))
+ self.assertIn(token,
+ self._get_file(self.history[0],
+ 'work/logs/job-output.txt'))
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 1a006bc..4cfd19e 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -259,8 +259,12 @@
is_localhost = True
else:
task_hostvars = result._task._variable_manager._hostvars[task_host]
+ # Normally hosts in the inventory will have ansible_host
+ # or ansible_inventory host defined. The implied
+ # inventory record for 'localhost' will have neither, so
+ # default to that if none are supplied.
if task_hostvars.get('ansible_host', task_hostvars.get(
- 'ansible_inventory_host')) in localhost_names:
+ 'ansible_inventory_host', 'localhost')) in localhost_names:
is_localhost = True
if not is_localhost and is_task:
@@ -487,18 +491,12 @@
def _get_task_hosts(self, task):
# If this task has as delegate to, we don't care about the play hosts,
# we care about the task's delegate target.
- delegate_to = task.delegate_to
- if delegate_to:
- return [delegate_to]
- hosts = self._play.hosts
- if 'all' in hosts:
- # NOTE(jamielennox): play.hosts is purely the list of hosts
- # that was provided not interpretted by inventory. We don't
- # have inventory access here but we can assume that 'all' is
- # everything in hostvars.
- play_vars = self._play._variable_manager._hostvars
- hosts = play_vars.keys()
- return hosts
+ if task.delegate_to:
+ return [task.delegate_to]
+
+ # _restriction returns the parsed/compiled list of hosts after
+ # applying subsets/limits
+ return self._play._variable_manager._inventory._restriction
def _dump_result_dict(self, result_dict):
result_dict = result_dict.copy()
diff --git a/zuul/ansible/library/zuul_afs.py b/zuul/ansible/library/zuul_afs.py
deleted file mode 100644
index 710c15d..0000000
--- a/zuul/ansible/library/zuul_afs.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2016 Red Hat
-#
-# This module is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This software is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this software. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import subprocess
-
-
-def afs_sync(afsuser, afskeytab, afsroot, afssource, afstarget):
- # Find the list of root markers in the just-completed build
- # (usually there will only be one, but some builds produce content
- # at the root *and* at a tag location, or possibly at multiple
- # translation roots).
- src_root_markers = []
- for root, dirnames, filenames in os.walk(afssource):
- if '.root-marker' in filenames:
- src_root_markers.append(root)
-
- output_blocks = []
- # Synchronize the content at each root marker.
- for root_count, src_root in enumerate(src_root_markers):
- # The component of the path between the source root and the
- # current source root marker. May be '.' if there is a marker
- # at the root.
- subpath = os.path.relpath(src_root, afssource)
-
- # Add to our debugging output
- output = dict(subpath=subpath)
- output_blocks.append(output)
-
- # The absolute path to the source (in staging) and destination
- # (in afs) of the build root for the current root marker.
- subsource = os.path.abspath(os.path.join(afssource, subpath))
- subtarget = os.path.abspath(os.path.join(afstarget, subpath))
-
- # Create a filter list for rsync so that we copy exactly the
- # directories we want to without deleting any existing
- # directories in the published site that were placed there by
- # previous builds.
-
- # Exclude any directories under this subpath which have root
- # markers.
- excludes = []
- for root, dirnames, filenames in os.walk(subtarget):
- if '.root-marker' in filenames:
- exclude_subpath = os.path.relpath(root, subtarget)
- if exclude_subpath == '.':
- continue
- excludes.append(os.path.join('/', exclude_subpath))
- output['excludes'] = excludes
-
- filter_file = os.path.join(afsroot, 'filter_%i' % root_count)
-
- with open(filter_file, 'w') as f:
- for exclude in excludes:
- f.write('- %s\n' % exclude)
-
- # Perform the rsync with the filter list.
- rsync_cmd = ' '.join([
- '/usr/bin/rsync', '-rtp', '--safe-links', '--delete-after',
- "--out-format='<<CHANGED>>%i %n%L'",
- "--filter='merge {filter}'", '{src}/', '{dst}/',
- ])
- mkdir_cmd = ' '.join(['mkdir', '-p', '{dst}/'])
- bash_cmd = ' '.join([
- '/bin/bash', '-c', '"{mkdir_cmd} && {rsync_cmd}"'
- ]).format(
- mkdir_cmd=mkdir_cmd,
- rsync_cmd=rsync_cmd)
-
- k5start_cmd = ' '.join([
- '/usr/bin/k5start', '-t', '-f', '{keytab}', '{user}', '--',
- bash_cmd,
- ])
-
- shell_cmd = k5start_cmd.format(
- src=subsource,
- dst=subtarget,
- filter=filter_file,
- user=afsuser,
- keytab=afskeytab),
- output['source'] = subsource
- output['destination'] = subtarget
- output['output'] = subprocess.check_output(shell_cmd, shell=True)
-
- return output_blocks
-
-
-def main():
- module = AnsibleModule(
- argument_spec=dict(
- user=dict(required=True, type='raw'),
- keytab=dict(required=True, type='raw'),
- root=dict(required=True, type='raw'),
- source=dict(required=True, type='raw'),
- target=dict(required=True, type='raw'),
- )
- )
-
- p = module.params
- output = afs_sync(p['user'], p['keytab'], p['root'],
- p['source'], p['target'])
- module.exit_json(changed=True, build_roots=output)
-
-from ansible.module_utils.basic import * # noqa
-from ansible.module_utils.basic import AnsibleModule
-
-if __name__ == '__main__':
- main()
diff --git a/zuul/configloader.py b/zuul/configloader.py
index aead0d8..c925024 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -507,7 +507,10 @@
pre_run_name, job.roles,
secrets)
job.pre_run = job.pre_run + (pre_run,)
- for post_run_name in as_list(conf.get('post-run')):
+ # NOTE(pabelanger): Reverse the order of our post-run list. We prepend
+ # post-runs for inherits however, we want to execute post-runs in the
+ # order they are listed within the job.
+ for post_run_name in reversed(as_list(conf.get('post-run'))):
post_run = model.PlaybookContext(job.source_context,
post_run_name, job.roles,
secrets)
@@ -1402,7 +1405,17 @@
tenant, config_semaphore)
if 'semaphore' not in classes:
continue
- layout.addSemaphore(SemaphoreParser.fromYaml(config_semaphore))
+ semaphore = SemaphoreParser.fromYaml(config_semaphore)
+ old_semaphore = layout.semaphores.get(semaphore.name)
+ if (old_semaphore and
+ (old_semaphore.source_context.project ==
+ semaphore.source_context.project)):
+ # If a semaphore shows up twice in the same
+ # project, it's probably due to showing up in
+ # two branches. Ignore subsequent
+ # definitions.
+ continue
+ layout.addSemaphore(semaphore)
for config_template in data.project_templates:
classes = TenantParser._getLoadClasses(tenant, config_template)
@@ -1494,7 +1507,9 @@
if trusted:
branches = ['master']
else:
- branches = project.source.getProjectBranches(project, tenant)
+ # Use the cached branch list; since this is a dynamic
+ # reconfiguration there should not be any branch changes.
+ branches = project.unparsed_branch_config.keys()
for branch in branches:
fns1 = []
diff --git a/zuul/driver/__init__.py b/zuul/driver/__init__.py
index 6ac9197..682b251 100644
--- a/zuul/driver/__init__.py
+++ b/zuul/driver/__init__.py
@@ -258,25 +258,22 @@
"""
@abc.abstractmethod
- def getPopen(self, **kwargs):
- """Create and return a subprocess.Popen factory wrapped however the
- driver sees fit.
+ def getExecutionContext(self, ro_paths=None, rw_paths=None, secrets=None):
+ """Create and return an execution context.
+
+ The execution context is meant to be used for a single
+ invocation of a command.
This method is required by the interface
- :arg dict kwargs: key/values for use by driver as needed
-
- :returns: a callable that takes the same args as subprocess.Popen
- :rtype: Callable
- """
- pass
-
- @abc.abstractmethod
- def setMountsMap(self, state_dir, ro_paths=None, rw_paths=None):
- """Add additional mount point to the execution environment.
-
- :arg str state_dir: the state directory to be read write
:arg list ro_paths: read only files or directories to bind mount
:arg list rw_paths: read write files or directories to bind mount
+ :arg dict secrets: a dictionary where the key is a file path,
+ and the value is the content which should be written to
+ that path in a secure manner.
+
+ :returns: a new ExecutionContext object.
+ :rtype: BaseExecutionContext
+
"""
pass
diff --git a/zuul/driver/bubblewrap/__init__.py b/zuul/driver/bubblewrap/__init__.py
index cbaa609..a8969cc 100644
--- a/zuul/driver/bubblewrap/__init__.py
+++ b/zuul/driver/bubblewrap/__init__.py
@@ -22,18 +22,19 @@
import shlex
import subprocess
import sys
+import threading
import re
from typing import Dict, List # flake8: noqa
from zuul.driver import (Driver, WrapperInterface)
+from zuul.execution_context import BaseExecutionContext
class WrappedPopen(object):
- def __init__(self, command, passwd_r, group_r):
+ def __init__(self, command, fds):
self.command = command
- self.passwd_r = passwd_r
- self.group_r = group_r
+ self.fds = fds
def __call__(self, args, *sub_args, **kwargs):
try:
@@ -45,7 +46,7 @@
# versions. So until we are py3 only we can only bwrap
# things that are close_fds=False
pass_fds = list(kwargs.get('pass_fds', []))
- for fd in (self.passwd_r, self.group_r):
+ for fd in self.fds:
if fd not in pass_fds:
pass_fds.append(fd)
kwargs['pass_fds'] = pass_fds
@@ -55,42 +56,32 @@
return proc
def __del__(self):
- if self.passwd_r:
+ for fd in self.fds:
try:
- os.close(self.passwd_r)
+ os.close(fd)
except OSError:
pass
- self.passwd_r = None
- if self.group_r:
- try:
- os.close(self.group_r)
- except OSError:
- pass
- self.group_r = None
+ self.fds = []
-class BubblewrapDriver(Driver, WrapperInterface):
- name = 'bubblewrap'
- log = logging.getLogger("zuul.BubblewrapDriver")
+class BubblewrapExecutionContext(BaseExecutionContext):
+ log = logging.getLogger("zuul.BubblewrapExecutionContext")
- mounts_map = {'rw': [], 'ro': []} # type: Dict[str, List]
- release_file_re = re.compile('^\W+-release$')
-
- def __init__(self):
- self.bwrap_command = self._bwrap_command()
-
- def reconfigure(self, tenant):
- pass
-
- def stop(self):
- pass
-
- def setMountsMap(self, ro_paths=None, rw_paths=None):
- if not ro_paths:
- ro_paths = []
- if not rw_paths:
- rw_paths = []
+ def __init__(self, bwrap_command, ro_paths, rw_paths, secrets):
+ self.bwrap_command = bwrap_command
self.mounts_map = {'ro': ro_paths, 'rw': rw_paths}
+ self.secrets = secrets
+
+ def startPipeWriter(self, pipe, data):
+ # In case we have a large amount of data to write through a
+ # pipe, spawn a thread to handle the writes.
+ t = threading.Thread(target=self._writer, args=(pipe, data))
+ t.daemon = True
+ t.start()
+
+ def _writer(self, pipe, data):
+ os.write(pipe, data)
+ os.close(pipe)
def getPopen(self, **kwargs):
# Set zuul_dir if it was not passed in
@@ -110,6 +101,9 @@
for bind in self.mounts_map[mount_type]:
bwrap_command.extend([bind_arg, bind, bind])
+ # A list of file descriptors which must be held open so that
+ # bwrap may read from them.
+ read_fds = []
# Need users and groups
uid = os.getuid()
passwd = list(pwd.getpwuid(uid))
@@ -121,6 +115,7 @@
os.write(passwd_w, passwd_bytes)
os.write(passwd_w, b'\n')
os.close(passwd_w)
+ read_fds.append(passwd_r)
gid = os.getgid()
group = grp.getgrgid(gid)
@@ -130,6 +125,20 @@
os.write(group_w, group_bytes)
os.write(group_w, b'\n')
os.close(group_w)
+ read_fds.append(group_r)
+
+ # Create a tmpfs for each directory which holds secrets, and
+ # tell bubblewrap to write the contents to a file therein.
+ secret_dirs = set()
+ for fn, content in self.secrets.items():
+ secret_dir = os.path.dirname(fn)
+ if secret_dir not in secret_dirs:
+ bwrap_command.extend(['--tmpfs', secret_dir])
+ secret_dirs.add(secret_dir)
+ secret_r, secret_w = os.pipe()
+ self.startPipeWriter(secret_w, content.encode('utf8'))
+ bwrap_command.extend(['--file', str(secret_r), fn])
+ read_fds.append(secret_r)
kwargs = dict(kwargs) # Don't update passed in dict
kwargs['uid'] = uid
@@ -141,10 +150,26 @@
self.log.debug("Bubblewrap command: %s",
" ".join(shlex.quote(c) for c in command))
- wrapped_popen = WrappedPopen(command, passwd_r, group_r)
+ wrapped_popen = WrappedPopen(command, read_fds)
return wrapped_popen
+
+class BubblewrapDriver(Driver, WrapperInterface):
+ log = logging.getLogger("zuul.BubblewrapDriver")
+ name = 'bubblewrap'
+
+    release_file_re = re.compile(r'^\W+-release$')
+
+ def __init__(self):
+ self.bwrap_command = self._bwrap_command()
+
+ def reconfigure(self, tenant):
+ pass
+
+ def stop(self):
+ pass
+
def _bwrap_command(self):
bwrap_command = [
'bwrap',
@@ -185,6 +210,18 @@
return bwrap_command
+ def getExecutionContext(self, ro_paths=None, rw_paths=None, secrets=None):
+ if not ro_paths:
+ ro_paths = []
+ if not rw_paths:
+ rw_paths = []
+ if not secrets:
+ secrets = {}
+ return BubblewrapExecutionContext(
+ self.bwrap_command,
+ ro_paths, rw_paths,
+ secrets)
+
def main(args=None):
logging.basicConfig(level=logging.DEBUG)
@@ -192,18 +229,27 @@
driver = BubblewrapDriver()
parser = argparse.ArgumentParser()
- parser.add_argument('--ro-bind', nargs='+')
- parser.add_argument('--rw-bind', nargs='+')
+ parser.add_argument('--ro-paths', nargs='+')
+ parser.add_argument('--rw-paths', nargs='+')
+ parser.add_argument('--secret', nargs='+')
parser.add_argument('work_dir')
parser.add_argument('run_args', nargs='+')
cli_args = parser.parse_args()
ssh_auth_sock = os.environ.get('SSH_AUTH_SOCK')
- driver.setMountsMap(cli_args.ro_bind, cli_args.rw_bind)
+ secrets = {}
+ if cli_args.secret:
+ for secret in cli_args.secret:
+ fn, content = secret.split('=', 1)
+            secrets[fn] = content
- popen = driver.getPopen(work_dir=cli_args.work_dir,
- ssh_auth_sock=ssh_auth_sock)
+ context = driver.getExecutionContext(
+ cli_args.ro_paths, cli_args.rw_paths,
+ secrets)
+
+ popen = context.getPopen(work_dir=cli_args.work_dir,
+ ssh_auth_sock=ssh_auth_sock)
x = popen(cli_args.run_args)
x.wait()
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index de72c69..35137c7 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -122,6 +122,17 @@
"from Gerrit. Can not get account information." %
(event.type,))
+ # This checks whether the event created or deleted a branch so
+ # that Zuul may know to perform a reconfiguration on the
+ # project.
+ if event.type == 'ref-updated' and not event.ref.startswith('refs/'):
+ if event.oldrev == '0' * 40:
+ event.branch_created = True
+ event.branch = event.ref
+ if event.newrev == '0' * 40:
+ event.branch_deleted = True
+ event.branch = event.ref
+
if event.change_number:
# TODO(jhesketh): Check if the project exists?
# and self.connection.sched.getProject(event.project_name):
@@ -219,13 +230,16 @@
stdout.channel.close()
ret = stdout.channel.recv_exit_status()
self.log.debug("SSH exit status: %s" % ret)
- client.close()
if ret and ret not in [-1, 130]:
raise Exception("Gerrit error executing stream-events")
except:
self.log.exception("Exception on ssh event stream:")
time.sleep(5)
+ finally:
+            # If we don't close the connection when connect raises an
+            # exception we can leak it and DoS Gerrit.
+ client.close()
def run(self):
while not self._stopped:
@@ -456,6 +470,9 @@
self.log.debug("Updating %s: Getting git-dependent change %s,%s" %
(change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
+ # This is a git commit dependency. So we only ignore it if it is
+ # already merged. So even if it is "ABANDONED", we should not
+ # ignore it.
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
@@ -467,7 +484,7 @@
"change %s,%s" %
(change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
- if (not dep.is_merged) and dep not in needs_changes:
+ if dep.open and dep not in needs_changes:
needs_changes.append(dep)
change.needs_changes = needs_changes
@@ -479,7 +496,7 @@
self.log.debug("Updating %s: Getting git-needed change %s,%s" %
(change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
- if (not dep.is_merged) and dep.is_current_patchset:
+ if dep.open and dep.is_current_patchset:
needed_by_changes.append(dep)
for record in self._getNeededByFromCommit(data['id'], change):
@@ -495,7 +512,7 @@
refresh = (dep_num, dep_ps) not in history
dep = self._getChange(
dep_num, dep_ps, refresh=refresh, history=history)
- if (not dep.is_merged) and dep.is_current_patchset:
+ if dep.open and dep.is_current_patchset:
needed_by_changes.append(dep)
change.needed_by_changes = needed_by_changes
@@ -720,16 +737,25 @@
return out
def _open(self):
- client = paramiko.SSHClient()
- client.load_system_host_keys()
- client.set_missing_host_key_policy(paramiko.WarningPolicy())
- client.connect(self.server,
- username=self.user,
- port=self.port,
- key_filename=self.keyfile)
- transport = client.get_transport()
- transport.set_keepalive(self.keepalive)
- self.client = client
+ if self.client:
+            # Paramiko needs explicit closes, it's possible we will open even
+ # with an unclosed client so explicitly close here.
+ self.client.close()
+ try:
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy())
+ client.connect(self.server,
+ username=self.user,
+ port=self.port,
+ key_filename=self.keyfile)
+ transport = client.get_transport()
+ transport.set_keepalive(self.keepalive)
+ self.client = client
+ except Exception:
+ client.close()
+ self.client = None
+ raise
def _ssh(self, command, stdin_data=None):
if not self.client:
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 616e774..f31df6a 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -163,6 +163,14 @@
# necessary for the scheduler to match against particular branches
event.branch = ref_parts[2]
+ # This checks whether the event created or deleted a branch so
+ # that Zuul may know to perform a reconfiguration on the
+ # project.
+ if event.oldrev == '0' * 40:
+ event.branch_created = True
+ if event.newrev == '0' * 40:
+ event.branch_deleted = True
+
return event
def _event_pull_request(self, body):
@@ -982,5 +990,5 @@
def getSchema():
- github_connection = v.Any(str, v.Schema({}, extra=True))
+ github_connection = v.Any(str, v.Schema(dict))
return github_connection
diff --git a/zuul/driver/nullwrap/__init__.py b/zuul/driver/nullwrap/__init__.py
index 50fea27..178e4c7 100644
--- a/zuul/driver/nullwrap/__init__.py
+++ b/zuul/driver/nullwrap/__init__.py
@@ -18,14 +18,30 @@
import subprocess
from zuul.driver import (Driver, WrapperInterface)
+from zuul.execution_context import BaseExecutionContext
+
+
+class NullExecutionContext(BaseExecutionContext):
+ log = logging.getLogger("zuul.NullExecutionContext")
+
+ def getPopen(self, **kwargs):
+ return subprocess.Popen
class NullwrapDriver(Driver, WrapperInterface):
name = 'nullwrap'
log = logging.getLogger("zuul.NullwrapDriver")
- def getPopen(self, **kwargs):
- return subprocess.Popen
-
- def setMountsMap(self, **kwargs):
- pass
+ def getExecutionContext(self, ro_paths=None, rw_paths=None, secrets=None):
+ # The bubblewrap driver writes secrets to a tmpfs so that they
+ # don't hit the disk (unless the kernel swaps the memory to
+ # disk, which can be mitigated with encrypted swap). We
+ # haven't implemented similar functionality in nullwrap, so
+ # for safety, raise an exception in that case. If you are
+ # interested in implementing this functionality, please
+ # contact us on the mailing list.
+ if secrets:
+ raise NotImplementedError(
+ "The nullwrap driver does not support the use of secrets. "
+ "Consider using the bubblewrap driver instead.")
+ return NullExecutionContext()
diff --git a/zuul/execution_context/__init__.py b/zuul/execution_context/__init__.py
new file mode 100644
index 0000000..29b2912
--- /dev/null
+++ b/zuul/execution_context/__init__.py
@@ -0,0 +1,40 @@
+# Copyright 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+
+class BaseExecutionContext(object, metaclass=abc.ABCMeta):
+ """The execution interface returned by a wrapper.
+
+ Wrapper drivers return instances which implement this interface.
+
+ It is used to hold information and aid in the execution of a
+ single command.
+
+ """
+
+ @abc.abstractmethod
+ def getPopen(self, **kwargs):
+ """Create and return a subprocess.Popen factory wrapped however the
+ driver sees fit.
+
+ This method is required by the interface
+
+ :arg dict kwargs: key/values for use by driver as needed
+
+ :returns: a callable that takes the same args as subprocess.Popen
+ :rtype: Callable
+ """
+ pass
diff --git a/zuul/executor/ansiblelaunchserver.py b/zuul/executor/ansiblelaunchserver.py
deleted file mode 100644
index 18762b2..0000000
--- a/zuul/executor/ansiblelaunchserver.py
+++ /dev/null
@@ -1,1580 +0,0 @@
-# Copyright 2014 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-############################################################################
-# NOTE(jhesketh): This file has been superceeded by zuul/launcher/server.py.
-# It is kept here to make merging master back into v3 easier. Once closer
-# to completion it can be removed.
-############################################################################
-
-
-import json
-import logging
-import os
-import re
-import shutil
-import signal
-import socket
-import subprocess
-import tempfile
-import threading
-import time
-import traceback
-import uuid
-import Queue
-
-import gear
-import jenkins_jobs.builder
-import jenkins_jobs.formatter
-import zmq
-
-import zuul.ansible.library
-from zuul.lib import commandsocket
-from zuul.lib import yamlutil as yaml
-
-ANSIBLE_WATCHDOG_GRACE = 5 * 60
-ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
-ANSIBLE_DEFAULT_PRE_TIMEOUT = 10 * 60
-ANSIBLE_DEFAULT_POST_TIMEOUT = 30 * 60
-
-
-COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful',
- 'verbose', 'unverbose']
-
-
-def boolify(x):
- if isinstance(x, str):
- return bool(int(x))
- return bool(x)
-
-
-class LaunchGearWorker(gear.TextWorker):
- def __init__(self, *args, **kw):
- self.__launch_server = kw.pop('launch_server')
- super(LaunchGearWorker, self).__init__(*args, **kw)
-
- def handleNoop(self, packet):
- workers = len(self.__launch_server.node_workers)
- delay = (workers ** 2) / 1000.0
- time.sleep(delay)
- return super(LaunchGearWorker, self).handleNoop(packet)
-
-
-class NodeGearWorker(gear.TextWorker):
- MASS_DO = 101
-
- def sendMassDo(self, functions):
- names = [gear.convert_to_bytes(x) for x in functions]
- data = b'\x00'.join(names)
- new_function_dict = {}
- for name in names:
- new_function_dict[name] = gear.FunctionRecord(name)
- self.broadcast_lock.acquire()
- try:
- p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
- self.broadcast(p)
- self.functions = new_function_dict
- finally:
- self.broadcast_lock.release()
-
-
-class Watchdog(object):
- def __init__(self, timeout, function, args):
- self.timeout = timeout
- self.function = function
- self.args = args
- self.thread = threading.Thread(target=self._run)
- self.thread.daemon = True
-
- def _run(self):
- while self._running and time.time() < self.end:
- time.sleep(10)
- if self._running:
- self.function(*self.args)
-
- def start(self):
- self._running = True
- self.end = time.time() + self.timeout
- self.thread.start()
-
- def stop(self):
- self._running = False
-
-
-class JobDir(object):
- def __init__(self, keep=False):
- self.keep = keep
- self.root = tempfile.mkdtemp()
- self.ansible_root = os.path.join(self.root, 'ansible')
- os.makedirs(self.ansible_root)
- self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
- self.inventory = os.path.join(self.ansible_root, 'inventory')
- self.vars = os.path.join(self.ansible_root, 'vars.yaml')
- self.pre_playbook = os.path.join(self.ansible_root, 'pre_playbook')
- self.playbook = os.path.join(self.ansible_root, 'playbook')
- self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
- self.config = os.path.join(self.ansible_root, 'ansible.cfg')
- self.pre_post_config = os.path.join(self.ansible_root,
- 'ansible_pre_post.cfg')
- self.script_root = os.path.join(self.ansible_root, 'scripts')
- self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
- os.makedirs(self.script_root)
- self.staging_root = os.path.join(self.root, 'staging')
- os.makedirs(self.staging_root)
-
- def __enter__(self):
- return self
-
- def __exit__(self, etype, value, tb):
- if not self.keep:
- shutil.rmtree(self.root)
-
-
-class LaunchServer(object):
- log = logging.getLogger("zuul.LaunchServer")
- site_section_re = re.compile('site "(.*?)"')
- node_section_re = re.compile('node "(.*?)"')
-
- def __init__(self, config, keep_jobdir=False):
- self.config = config
- self.options = dict(
- verbose=False
- )
- self.keep_jobdir = keep_jobdir
- self.hostname = socket.gethostname()
- self.registered_functions = set()
- self.node_workers = {}
- self.jobs = {}
- self.builds = {}
- self.zmq_send_queue = Queue.Queue()
- self.termination_queue = Queue.Queue()
- self.sites = {}
- self.static_nodes = {}
- self.command_map = dict(
- reconfigure=self.reconfigure,
- stop=self.stop,
- pause=self.pause,
- unpause=self.unpause,
- release=self.release,
- graceful=self.graceful,
- verbose=self.verboseOn,
- unverbose=self.verboseOff,
- )
-
- if config.has_option('launcher', 'accept_nodes'):
- self.accept_nodes = config.getboolean('launcher',
- 'accept_nodes')
- else:
- self.accept_nodes = True
- self.config_accept_nodes = self.accept_nodes
-
- if self.config.has_option('zuul', 'state_dir'):
- state_dir = os.path.expanduser(
- self.config.get('zuul', 'state_dir'))
- else:
- state_dir = '/var/lib/zuul'
- path = os.path.join(state_dir, 'launcher.socket')
- self.command_socket = commandsocket.CommandSocket(path)
- ansible_dir = os.path.join(state_dir, 'ansible')
- self.library_dir = os.path.join(ansible_dir, 'library')
- if not os.path.exists(self.library_dir):
- os.makedirs(self.library_dir)
- self.pre_post_library_dir = os.path.join(ansible_dir,
- 'pre_post_library')
- if not os.path.exists(self.pre_post_library_dir):
- os.makedirs(self.pre_post_library_dir)
-
- library_path = os.path.dirname(os.path.abspath(
- zuul.ansible.library.__file__))
- # Ansible library modules that should be available to all
- # playbooks:
- all_libs = ['zuul_log.py', 'zuul_console.py', 'zuul_afs.py']
- # Modules that should only be used by job playbooks:
- job_libs = ['command.py']
-
- for fn in all_libs:
- shutil.copy(os.path.join(library_path, fn), self.library_dir)
- shutil.copy(os.path.join(library_path, fn),
- self.pre_post_library_dir)
- for fn in job_libs:
- shutil.copy(os.path.join(library_path, fn), self.library_dir)
-
- def get_config_default(section, option, default):
- if config.has_option(section, option):
- return config.get(section, option)
- return default
-
- for section in config.sections():
- m = self.site_section_re.match(section)
- if m:
- sitename = m.group(1)
- d = {}
- d['host'] = get_config_default(section, 'host', None)
- d['user'] = get_config_default(section, 'user', '')
- d['pass'] = get_config_default(section, 'pass', '')
- d['root'] = get_config_default(section, 'root', '/')
- d['keytab'] = get_config_default(section, 'keytab', None)
- self.sites[sitename] = d
- continue
- m = self.node_section_re.match(section)
- if m:
- nodename = m.group(1)
- d = {}
- d['name'] = nodename
- d['host'] = config.get(section, 'host')
- d['description'] = get_config_default(section,
- 'description', '')
- if config.has_option(section, 'labels'):
- d['labels'] = config.get(section, 'labels').split(',')
- else:
- d['labels'] = []
- self.static_nodes[nodename] = d
- continue
-
- def start(self):
- self._gearman_running = True
- self._zmq_running = True
- self._reaper_running = True
- self._command_running = True
-
- # Setup ZMQ
- self.zcontext = zmq.Context()
- self.zsocket = self.zcontext.socket(zmq.PUB)
- self.zsocket.bind("tcp://*:8888")
-
- # Setup Gearman
- server = self.config.get('gearman', 'server')
- if self.config.has_option('gearman', 'port'):
- port = self.config.get('gearman', 'port')
- else:
- port = 4730
- self.worker = LaunchGearWorker('Zuul Launch Server',
- launch_server=self)
- self.worker.addServer(server, port)
- self.log.debug("Waiting for server")
- self.worker.waitForServer()
- self.log.debug("Registering")
- self.register()
-
- # Start command socket
- self.log.debug("Starting command processor")
- self.command_socket.start()
- self.command_thread = threading.Thread(target=self.runCommand)
- self.command_thread.daemon = True
- self.command_thread.start()
-
- # Load JJB config
- self.loadJobs()
-
- # Start ZMQ worker thread
- self.log.debug("Starting ZMQ processor")
- self.zmq_thread = threading.Thread(target=self.runZMQ)
- self.zmq_thread.daemon = True
- self.zmq_thread.start()
-
- # Start node worker reaper thread
- self.log.debug("Starting reaper")
- self.reaper_thread = threading.Thread(target=self.runReaper)
- self.reaper_thread.daemon = True
- self.reaper_thread.start()
-
- # Start Gearman worker thread
- self.log.debug("Starting worker")
- self.gearman_thread = threading.Thread(target=self.run)
- self.gearman_thread.daemon = True
- self.gearman_thread.start()
-
- # Start static workers
- for node in self.static_nodes.values():
- self.log.debug("Creating static node with arguments: %s" % (node,))
- self._launchWorker(node)
-
- def loadJobs(self):
- self.log.debug("Loading jobs")
- builder = JJB()
- path = self.config.get('launcher', 'jenkins_jobs')
- builder.load_files([path])
- builder.parser.expandYaml()
- unseen = set(self.jobs.keys())
- for job in builder.parser.jobs:
- builder.expandMacros(job)
- self.jobs[job['name']] = job
- unseen.discard(job['name'])
- for name in unseen:
- del self.jobs[name]
-
- def register(self):
- new_functions = set()
- if self.accept_nodes:
- new_functions.add("node_assign:zuul")
- new_functions.add("stop:%s" % self.hostname)
- new_functions.add("set_description:%s" % self.hostname)
- new_functions.add("node_revoke:%s" % self.hostname)
-
- for function in new_functions - self.registered_functions:
- self.worker.registerFunction(function)
- for function in self.registered_functions - new_functions:
- self.worker.unRegisterFunction(function)
- self.registered_functions = new_functions
-
- def reconfigure(self):
- self.log.debug("Reconfiguring")
- self.loadJobs()
- for node in self.node_workers.values():
- try:
- if node.isAlive():
- node.queue.put(dict(action='reconfigure'))
- except Exception:
- self.log.exception("Exception sending reconfigure command "
- "to worker:")
- self.log.debug("Reconfiguration complete")
-
- def pause(self):
- self.log.debug("Pausing")
- self.accept_nodes = False
- self.register()
- for node in self.node_workers.values():
- try:
- if node.isAlive():
- node.queue.put(dict(action='pause'))
- except Exception:
- self.log.exception("Exception sending pause command "
- "to worker:")
- self.log.debug("Paused")
-
- def unpause(self):
- self.log.debug("Unpausing")
- self.accept_nodes = self.config_accept_nodes
- self.register()
- for node in self.node_workers.values():
- try:
- if node.isAlive():
- node.queue.put(dict(action='unpause'))
- except Exception:
- self.log.exception("Exception sending unpause command "
- "to worker:")
- self.log.debug("Unpaused")
-
- def release(self):
- self.log.debug("Releasing idle nodes")
- for node in self.node_workers.values():
- if node.name in self.static_nodes:
- continue
- try:
- if node.isAlive():
- node.queue.put(dict(action='release'))
- except Exception:
- self.log.exception("Exception sending release command "
- "to worker:")
- self.log.debug("Finished releasing idle nodes")
-
- def graceful(self):
- # Note: this is run in the command processing thread; no more
- # external commands will be processed after this.
- self.log.debug("Gracefully stopping")
- self.pause()
- self.release()
- self.log.debug("Waiting for all builds to finish")
- while self.builds:
- time.sleep(5)
- self.log.debug("All builds are finished")
- self.stop()
-
- def stop(self):
- self.log.debug("Stopping")
- # First, stop accepting new jobs
- self._gearman_running = False
- self._reaper_running = False
- self.worker.shutdown()
- # Then stop all of the workers
- for node in self.node_workers.values():
- try:
- if node.isAlive():
- node.stop()
- except Exception:
- self.log.exception("Exception sending stop command to worker:")
- # Stop ZMQ afterwords so that the send queue is flushed
- self._zmq_running = False
- self.zmq_send_queue.put(None)
- self.zmq_send_queue.join()
- # Stop command processing
- self._command_running = False
- self.command_socket.stop()
- # Join the gearman thread which was stopped earlier.
- self.gearman_thread.join()
- # The command thread is joined in the join() method of this
- # class, which is called by the command shell.
- self.log.debug("Stopped")
-
- def verboseOn(self):
- self.log.debug("Enabling verbose mode")
- self.options['verbose'] = True
-
- def verboseOff(self):
- self.log.debug("Disabling verbose mode")
- self.options['verbose'] = False
-
- def join(self):
- self.command_thread.join()
-
- def runCommand(self):
- while self._command_running:
- try:
- command = self.command_socket.get()
- self.command_map[command]()
- except Exception:
- self.log.exception("Exception while processing command")
-
- def runZMQ(self):
- while self._zmq_running or not self.zmq_send_queue.empty():
- try:
- item = self.zmq_send_queue.get()
- self.log.debug("Got ZMQ event %s" % (item,))
- if item is None:
- continue
- self.zsocket.send(item)
- except Exception:
- self.log.exception("Exception while processing ZMQ events")
- finally:
- self.zmq_send_queue.task_done()
-
- def run(self):
- while self._gearman_running:
- try:
- job = self.worker.getJob()
- try:
- if job.name.startswith('node_assign:'):
- self.log.debug("Got node_assign job: %s" % job.unique)
- self.assignNode(job)
- elif job.name.startswith('stop:'):
- self.log.debug("Got stop job: %s" % job.unique)
- self.stopJob(job)
- elif job.name.startswith('set_description:'):
- self.log.debug("Got set_description job: %s" %
- job.unique)
- job.sendWorkComplete()
- elif job.name.startswith('node_revoke:'):
- self.log.debug("Got node_revoke job: %s" % job.unique)
- self.revokeNode(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(traceback.format_exc())
- except gear.InterruptedError:
- return
- except Exception:
- self.log.exception("Exception while getting job")
-
- def assignNode(self, job):
- args = json.loads(job.arguments)
- self.log.debug("Assigned node with arguments: %s" % (args,))
- self._launchWorker(args)
- data = dict(manager=self.hostname)
- job.sendWorkData(json.dumps(data))
- job.sendWorkComplete()
-
- def _launchWorker(self, args):
- worker = NodeWorker(self.config, self.jobs, self.builds,
- self.sites, args['name'], args['host'],
- args['description'], args['labels'],
- self.hostname, self.zmq_send_queue,
- self.termination_queue, self.keep_jobdir,
- self.library_dir, self.pre_post_library_dir,
- self.options)
- self.node_workers[worker.name] = worker
-
- worker.thread = threading.Thread(target=worker.run)
- worker.thread.start()
-
- def revokeNode(self, job):
- try:
- args = json.loads(job.arguments)
- self.log.debug("Revoke job with arguments: %s" % (args,))
- name = args['name']
- node = self.node_workers.get(name)
- if not node:
- self.log.debug("Unable to find worker %s" % (name,))
- return
- try:
- if node.isAlive():
- node.queue.put(dict(action='stop'))
- else:
- self.log.debug("Node %s is not alive while revoking node" %
- (node.name,))
- except Exception:
- self.log.exception("Exception sending stop command "
- "to worker:")
- finally:
- job.sendWorkComplete()
-
- def stopJob(self, job):
- try:
- args = json.loads(job.arguments)
- self.log.debug("Stop job with arguments: %s" % (args,))
- unique = args['number']
- build_worker_name = self.builds.get(unique)
- if not build_worker_name:
- self.log.debug("Unable to find build for job %s" % (unique,))
- return
- node = self.node_workers.get(build_worker_name)
- if not node:
- self.log.debug("Unable to find worker for job %s" % (unique,))
- return
- try:
- if node.isAlive():
- node.queue.put(dict(action='abort'))
- else:
- self.log.debug("Node %s is not alive while aborting job" %
- (node.name,))
- except Exception:
- self.log.exception("Exception sending abort command "
- "to worker:")
- finally:
- job.sendWorkComplete()
-
- def runReaper(self):
- # We don't actually care if all the events are processed
- while self._reaper_running:
- try:
- item = self.termination_queue.get()
- self.log.debug("Got termination event %s" % (item,))
- if item is None:
- continue
- worker = self.node_workers[item]
- self.log.debug("Joining %s" % (item,))
- worker.thread.join()
- self.log.debug("Joined %s" % (item,))
- del self.node_workers[item]
- except Exception:
- self.log.exception("Exception while processing "
- "termination events:")
- finally:
- self.termination_queue.task_done()
-
-
-class NodeWorker(object):
- retry_args = dict(register='task_result',
- until='task_result.rc == 0',
- retries=3,
- delay=30)
-
- def __init__(self, config, jobs, builds, sites, name, host,
- description, labels, manager_name, zmq_send_queue,
- termination_queue, keep_jobdir, library_dir,
- pre_post_library_dir, options):
- self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
- self.log.debug("Creating node worker %s" % (name,))
- self.config = config
- self.jobs = jobs
- self.builds = builds
- self.sites = sites
- self.name = name
- self.host = host
- self.description = description
- if not isinstance(labels, list):
- labels = [labels]
- self.labels = labels
- self.thread = None
- self.registered_functions = set()
- # If the unpaused Event is set, that means we should run jobs.
- # If it is clear, then we are paused and should not run jobs.
- self.unpaused = threading.Event()
- self.unpaused.set()
- self._running = True
- self.queue = Queue.Queue()
- self.manager_name = manager_name
- self.zmq_send_queue = zmq_send_queue
- self.termination_queue = termination_queue
- self.keep_jobdir = keep_jobdir
- self.running_job_lock = threading.Lock()
- self.pending_registration = False
- self.registration_lock = threading.Lock()
- self._get_job_lock = threading.Lock()
- self._got_job = False
- self._job_complete_event = threading.Event()
- self._running_job = False
- self._aborted_job = False
- self._watchdog_timeout = False
- self._sent_complete_event = False
- self.ansible_pre_proc = None
- self.ansible_job_proc = None
- self.ansible_post_proc = None
- self.workspace_root = config.get('launcher', 'workspace_root')
- if self.config.has_option('launcher', 'private_key_file'):
- self.private_key_file = config.get('launcher', 'private_key_file')
- else:
- self.private_key_file = '~/.ssh/id_rsa'
- if self.config.has_option('launcher', 'username'):
- self.username = config.get('launcher', 'username')
- else:
- self.username = 'zuul'
- self.library_dir = library_dir
- self.pre_post_library_dir = pre_post_library_dir
- self.options = options
-
- def isAlive(self):
- # Meant to be called from the manager
- if self.thread and self.thread.is_alive():
- return True
- return False
-
- def run(self):
- self.log.debug("Node worker %s starting" % (self.name,))
- server = self.config.get('gearman', 'server')
- if self.config.has_option('gearman', 'port'):
- port = self.config.get('gearman', 'port')
- else:
- port = 4730
- self.worker = NodeGearWorker(self.name)
- self.worker.addServer(server, port)
- self.log.debug("Waiting for server")
- self.worker.waitForServer()
- self.log.debug("Registering")
- self.register()
-
- self.gearman_thread = threading.Thread(target=self.runGearman)
- self.gearman_thread.daemon = True
- self.gearman_thread.start()
-
- self.log.debug("Started")
-
- while self._running or not self.queue.empty():
- try:
- self._runQueue()
- except Exception:
- self.log.exception("Exception in queue manager:")
-
- def stop(self):
- # If this is called locally, setting _running will be
- # effictive, if it's called remotely, it will not be, but it
- # will be set by the queue thread.
- self.log.debug("Submitting stop request")
- self._running = False
- self.unpaused.set()
- self.queue.put(dict(action='stop'))
- self.queue.join()
-
- def pause(self):
- self.unpaused.clear()
- self.worker.stopWaitingForJobs()
-
- def unpause(self):
- self.unpaused.set()
-
- def release(self):
- # If this node is idle, stop it.
- old_unpaused = self.unpaused.is_set()
- if old_unpaused:
- self.pause()
- with self._get_job_lock:
- if self._got_job:
- self.log.debug("This worker is not idle")
- if old_unpaused:
- self.unpause()
- return
- self.log.debug("Stopping due to release command")
- self.queue.put(dict(action='stop'))
-
- def _runQueue(self):
- item = self.queue.get()
- try:
- if item['action'] == 'stop':
- self.log.debug("Received stop request")
- self._running = False
- self.termination_queue.put(self.name)
- if not self.abortRunningJob():
- self.sendFakeCompleteEvent()
- else:
- self._job_complete_event.wait()
- self.worker.shutdown()
- if item['action'] == 'pause':
- self.log.debug("Received pause request")
- self.pause()
- if item['action'] == 'unpause':
- self.log.debug("Received unpause request")
- self.unpause()
- if item['action'] == 'release':
- self.log.debug("Received release request")
- self.release()
- elif item['action'] == 'reconfigure':
- self.log.debug("Received reconfigure request")
- self.register()
- elif item['action'] == 'abort':
- self.log.debug("Received abort request")
- self.abortRunningJob()
- finally:
- self.queue.task_done()
-
- def runGearman(self):
- while self._running:
- try:
- self.unpaused.wait()
- if self._running:
- self._runGearman()
- except Exception:
- self.log.exception("Exception in gearman manager:")
- with self._get_job_lock:
- self._got_job = False
-
- def _runGearman(self):
- if self.pending_registration:
- self.register()
- with self._get_job_lock:
- try:
- job = self.worker.getJob()
- self._got_job = True
- except gear.InterruptedError:
- return
- self.log.debug("Node worker %s got job %s" % (self.name, job.name))
- try:
- if job.name not in self.registered_functions:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- return
- self.launch(job)
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(traceback.format_exc())
-
- def generateFunctionNames(self, job):
- # This only supports "node: foo" and "node: foo || bar"
- ret = set()
- job_labels = job.get('node')
- matching_labels = set()
- if job_labels:
- job_labels = [x.strip() for x in job_labels.split('||')]
- matching_labels = set(self.labels) & set(job_labels)
- if not matching_labels:
- return ret
- ret.add('build:%s' % (job['name'],))
- for label in matching_labels:
- ret.add('build:%s:%s' % (job['name'], label))
- return ret
-
- def register(self):
- if not self.registration_lock.acquire(False):
- self.log.debug("Registration already in progress")
- return
- try:
- if self._running_job:
- self.pending_registration = True
- self.log.debug("Ignoring registration due to running job")
- return
- self.log.debug("Updating registration")
- self.pending_registration = False
- new_functions = set()
- for job in self.jobs.values():
- new_functions |= self.generateFunctionNames(job)
- self.worker.sendMassDo(new_functions)
- self.registered_functions = new_functions
- finally:
- self.registration_lock.release()
-
- def abortRunningJob(self):
- self._aborted_job = True
- return self.abortRunningProc(self.ansible_job_proc)
-
- def abortRunningProc(self, proc):
- aborted = False
- self.log.debug("Abort: acquiring job lock")
- with self.running_job_lock:
- if self._running_job:
- self.log.debug("Abort: a job is running")
- if proc:
- self.log.debug("Abort: sending kill signal to job "
- "process group")
- try:
- pgid = os.getpgid(proc.pid)
- os.killpg(pgid, signal.SIGKILL)
- aborted = True
- except Exception:
- self.log.exception("Exception while killing "
- "ansible process:")
- else:
- self.log.debug("Abort: no job is running")
-
- return aborted
-
- def launch(self, job):
- self.log.info("Node worker %s launching job %s" %
- (self.name, job.name))
-
- # Make sure we can parse what we need from the job first
- args = json.loads(job.arguments)
- offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
- job_name = job.name.split(':')[1]
-
- # Initialize the result so we have something regardless of
- # whether the job actually runs
- result = None
- self._sent_complete_event = False
- self._aborted_job = False
- self._watchdog_timeout = False
-
- try:
- self.sendStartEvent(job_name, args)
- except Exception:
- self.log.exception("Exception while sending job start event")
-
- try:
- result = self.runJob(job, args)
- except Exception:
- self.log.exception("Exception while launching job thread")
-
- self._running_job = False
-
- try:
- data = json.dumps(dict(result=result))
- job.sendWorkComplete(data)
- except Exception:
- self.log.exception("Exception while sending job completion packet")
-
- try:
- self.sendCompleteEvent(job_name, result, args)
- except Exception:
- self.log.exception("Exception while sending job completion event")
-
- try:
- del self.builds[job.unique]
- except Exception:
- self.log.exception("Exception while clearing build record")
-
- self._job_complete_event.set()
- if offline and self._running:
- self.stop()
-
- def sendStartEvent(self, name, parameters):
- build = dict(node_name=self.name,
- host_name=self.manager_name,
- parameters=parameters)
-
- event = dict(name=name,
- build=build)
-
- item = "onStarted %s" % json.dumps(event)
- self.log.debug("Sending over ZMQ: %s" % (item,))
- self.zmq_send_queue.put(item)
-
- def sendCompleteEvent(self, name, status, parameters):
- build = dict(status=status,
- node_name=self.name,
- host_name=self.manager_name,
- parameters=parameters)
-
- event = dict(name=name,
- build=build)
-
- item = "onFinalized %s" % json.dumps(event)
- self.log.debug("Sending over ZMQ: %s" % (item,))
- self.zmq_send_queue.put(item)
- self._sent_complete_event = True
-
- def sendFakeCompleteEvent(self):
- if self._sent_complete_event:
- return
- self.sendCompleteEvent('zuul:launcher-shutdown',
- 'SUCCESS', {})
-
- def runJob(self, job, args):
- self.ansible_pre_proc = None
- self.ansible_job_proc = None
- self.ansible_post_proc = None
- result = None
- with self.running_job_lock:
- if not self._running:
- return result
- self._running_job = True
- self._job_complete_event.clear()
-
- self.log.debug("Job %s: beginning" % (job.unique,))
- self.builds[job.unique] = self.name
- with JobDir(self.keep_jobdir) as jobdir:
- self.log.debug("Job %s: job root at %s" %
- (job.unique, jobdir.root))
- timeout = self.prepareAnsibleFiles(jobdir, job, args)
-
- data = {
- 'manager': self.manager_name,
- 'number': job.unique,
- }
- if ':' in self.host:
- data['url'] = 'telnet://[%s]:19885' % self.host
- else:
- data['url'] = 'telnet://%s:19885' % self.host
-
- job.sendWorkData(json.dumps(data))
- job.sendWorkStatus(0, 100)
-
- pre_status = self.runAnsiblePrePlaybook(jobdir)
- if pre_status is None:
- # These should really never fail, so return None and have
- # zuul try again
- return result
-
- job_status = self.runAnsiblePlaybook(jobdir, timeout)
- if job_status is None:
- # The result of the job is indeterminate. Zuul will
- # run it again.
- return result
-
- post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
- if not post_status:
- result = 'POST_FAILURE'
- elif job_status:
- result = 'SUCCESS'
- else:
- result = 'FAILURE'
-
- if self._aborted_job and not self._watchdog_timeout:
- # A Null result will cause zuul to relaunch the job if
- # it needs to.
- result = None
-
- return result
-
- def getHostList(self):
- return [('node', dict(
- ansible_host=self.host, ansible_user=self.username))]
-
- def _substituteVariables(self, text, variables):
- def lookup(match):
- return variables.get(match.group(1), '')
- return re.sub('\$([A-Za-z0-9_]+)', lookup, text)
-
- def _getRsyncOptions(self, source, parameters):
- # Treat the publisher source as a filter; ant and rsync behave
- # fairly close in this manner, except for leading directories.
- source = self._substituteVariables(source, parameters)
- # If the source starts with ** then we want to match any
- # number of directories, so don't anchor the include filter.
- # If it does not start with **, then the intent is likely to
- # at least start by matching an immediate file or subdirectory
- # (even if later we have a ** in the middle), so in this case,
- # anchor it to the root of the transfer (the workspace).
- if not source.startswith('**'):
- source = os.path.join('/', source)
- # These options mean: include the thing we want, include any
- # directories (so that we continue to search for the thing we
- # want no matter how deep it is), exclude anything that
- # doesn't match the thing we want or is a directory, then get
- # rid of empty directories left over at the end.
- rsync_opts = ['--include="%s"' % source,
- '--include="*/"',
- '--exclude="*"',
- '--prune-empty-dirs']
- return rsync_opts
-
- def _makeSCPTask(self, jobdir, publisher, parameters):
- tasks = []
- for scpfile in publisher['scp']['files']:
- scproot = tempfile.mkdtemp(dir=jobdir.staging_root)
- os.chmod(scproot, 0o755)
-
- site = publisher['scp']['site']
- if scpfile.get('copy-console'):
- # Include the local ansible directory in the console
- # upload. This uploads the playbook and ansible logs.
- copyargs = dict(src=jobdir.ansible_root + '/',
- dest=os.path.join(scproot, '_zuul_ansible'))
- task = dict(name='copy console log',
- copy=copyargs,
- delegate_to='127.0.0.1')
- # This is a local copy and should not fail, so does
- # not need a retry stanza.
- tasks.append(task)
-
- # Fetch the console log from the remote host.
- src = '/tmp/console.html'
- rsync_opts = []
- else:
- src = parameters['WORKSPACE']
- if not src.endswith('/'):
- src = src + '/'
- rsync_opts = self._getRsyncOptions(scpfile['source'],
- parameters)
-
- syncargs = dict(src=src,
- dest=scproot,
- copy_links='yes',
- mode='pull')
- if rsync_opts:
- syncargs['rsync_opts'] = rsync_opts
- task = dict(name='copy files from node',
- synchronize=syncargs)
- if not scpfile.get('copy-after-failure'):
- task['when'] = 'success|bool'
- # We don't use retry_args here because there is a bug in
- # the synchronize module that breaks subsequent attempts at
- # retrying. Better to try once and get an accurate error
- # message if it fails.
- # https://github.com/ansible/ansible/issues/18281
- tasks.append(task)
-
- task = self._makeSCPTaskLocalAction(
- site, scpfile, scproot, parameters)
- task.update(self.retry_args)
- tasks.append(task)
- return tasks
-
- def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
- if site not in self.sites:
- raise Exception("Undefined SCP site: %s" % (site,))
- site = self.sites[site]
- dest = scpfile['target'].lstrip('/')
- dest = self._substituteVariables(dest, parameters)
- dest = os.path.join(site['root'], dest)
- dest = os.path.normpath(dest)
- if not dest.startswith(site['root']):
- raise Exception("Target path %s is not below site root" %
- (dest,))
-
- rsync_cmd = [
- '/usr/bin/rsync', '--delay-updates', '-F',
- '--compress', '-rt', '--safe-links',
- '--rsync-path="mkdir -p {dest} && rsync"',
- '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
- '-o StrictHostKeyChecking=no -q"',
- '--out-format="<<CHANGED>>%i %n%L"',
- '{source}', '"{user}@{host}:{dest}"'
- ]
- if scpfile.get('keep-hierarchy'):
- source = '"%s/"' % scproot
- else:
- source = '`/usr/bin/find "%s" -type f`' % scproot
- shellargs = ' '.join(rsync_cmd).format(
- source=source,
- dest=dest,
- private_key_file=self.private_key_file,
- host=site['host'],
- user=site['user'])
- task = dict(name='rsync logs to server',
- shell=shellargs,
- delegate_to='127.0.0.1')
- if not scpfile.get('copy-after-failure'):
- task['when'] = 'success|bool'
-
- return task
-
- def _makeFTPTask(self, jobdir, publisher, parameters):
- tasks = []
- ftp = publisher['ftp']
- site = ftp['site']
- if site not in self.sites:
- raise Exception("Undefined FTP site: %s" % site)
- site = self.sites[site]
-
- ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
- ftpcontent = os.path.join(ftproot, 'content')
- os.makedirs(ftpcontent)
- ftpscript = os.path.join(ftproot, 'script')
-
- src = parameters['WORKSPACE']
- if not src.endswith('/'):
- src = src + '/'
- rsync_opts = self._getRsyncOptions(ftp['source'],
- parameters)
- syncargs = dict(src=src,
- dest=ftpcontent,
- copy_links='yes',
- mode='pull')
- if rsync_opts:
- syncargs['rsync_opts'] = rsync_opts
- task = dict(name='copy files from node',
- synchronize=syncargs,
- when='success|bool')
- # We don't use retry_args here because there is a bug in the
- # synchronize module that breaks subsequent attempts at retrying.
- # Better to try once and get an accurate error message if it fails.
- # https://github.com/ansible/ansible/issues/18281
- tasks.append(task)
- task = dict(name='FTP files to server',
- shell='lftp -f %s' % ftpscript,
- when='success|bool',
- delegate_to='127.0.0.1')
- ftpsource = ftpcontent
- if ftp.get('remove-prefix'):
- ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
- while ftpsource[-1] == '/':
- ftpsource = ftpsource[:-1]
- ftptarget = ftp['target'].lstrip('/')
- ftptarget = self._substituteVariables(ftptarget, parameters)
- ftptarget = os.path.join(site['root'], ftptarget)
- ftptarget = os.path.normpath(ftptarget)
- if not ftptarget.startswith(site['root']):
- raise Exception("Target path %s is not below site root" %
- (ftptarget,))
- while ftptarget[-1] == '/':
- ftptarget = ftptarget[:-1]
- with open(ftpscript, 'w') as script:
- script.write('open %s\n' % site['host'])
- script.write('user %s %s\n' % (site['user'], site['pass']))
- script.write('mirror -R %s %s\n' % (ftpsource, ftptarget))
- task.update(self.retry_args)
- tasks.append(task)
- return tasks
-
- def _makeAFSTask(self, jobdir, publisher, parameters):
- tasks = []
- afs = publisher['afs']
- site = afs['site']
- if site not in self.sites:
- raise Exception("Undefined AFS site: %s" % site)
- site = self.sites[site]
-
- afsroot = tempfile.mkdtemp(dir=jobdir.staging_root)
- afscontent = os.path.join(afsroot, 'content')
- afssource = afscontent
- if afs.get('remove-prefix'):
- afssource = os.path.join(afscontent, afs['remove-prefix'])
- while afssource[-1] == '/':
- afssource = afssource[:-1]
-
- src = parameters['WORKSPACE']
- if not src.endswith('/'):
- src = src + '/'
- rsync_opts = self._getRsyncOptions(afs['source'],
- parameters)
- syncargs = dict(src=src,
- dest=afscontent,
- copy_links='yes',
- mode='pull')
- if rsync_opts:
- syncargs['rsync_opts'] = rsync_opts
- task = dict(name='copy files from node',
- synchronize=syncargs,
- when='success|bool')
- # We don't use retry_args here because there is a bug in the
- # synchronize module that breaks subsequent attempts at retrying.
- # Better to try once and get an accurate error message if it fails.
- # https://github.com/ansible/ansible/issues/18281
- tasks.append(task)
-
- afstarget = afs['target'].lstrip('/')
- afstarget = self._substituteVariables(afstarget, parameters)
- afstarget = os.path.join(site['root'], afstarget)
- afstarget = os.path.normpath(afstarget)
- if not afstarget.startswith(site['root']):
- raise Exception("Target path %s is not below site root" %
- (afstarget,))
-
- afsargs = dict(user=site['user'],
- keytab=site['keytab'],
- root=afsroot,
- source=afssource,
- target=afstarget)
-
- task = dict(name='Synchronize files to AFS',
- zuul_afs=afsargs,
- when='success|bool',
- delegate_to='127.0.0.1')
- tasks.append(task)
-
- return tasks
-
- def _makeBuilderTask(self, jobdir, builder, parameters, sequence):
- tasks = []
- script_fn = '%02d-%s.sh' % (sequence, str(uuid.uuid4().hex))
- script_path = os.path.join(jobdir.script_root, script_fn)
- with open(script_path, 'w') as script:
- data = builder['shell']
- if not data.startswith('#!'):
- data = '#!/bin/bash -x\n %s' % (data,)
- script.write(data)
-
- remote_path = os.path.join('/tmp', script_fn)
- copy = dict(src=script_path,
- dest=remote_path,
- mode=0o555)
- task = dict(copy=copy)
- tasks.append(task)
-
- task = dict(command=remote_path)
- task['name'] = 'command generated from JJB'
- task['environment'] = "{{ zuul.environment }}"
- task['args'] = dict(chdir=parameters['WORKSPACE'])
- tasks.append(task)
-
- filetask = dict(path=remote_path,
- state='absent')
- task = dict(file=filetask)
- tasks.append(task)
-
- return tasks
-
- def _transformPublishers(self, jjb_job):
- early_publishers = []
- late_publishers = []
- old_publishers = jjb_job.get('publishers', [])
- for publisher in old_publishers:
- early_scpfiles = []
- late_scpfiles = []
- if 'scp' not in publisher:
- early_publishers.append(publisher)
- continue
- copy_console = False
- for scpfile in publisher['scp']['files']:
- if scpfile.get('copy-console'):
- scpfile['keep-hierarchy'] = True
- late_scpfiles.append(scpfile)
- copy_console = True
- else:
- early_scpfiles.append(scpfile)
- publisher['scp']['files'] = early_scpfiles + late_scpfiles
- if copy_console:
- late_publishers.append(publisher)
- else:
- early_publishers.append(publisher)
- publishers = early_publishers + late_publishers
- if old_publishers != publishers:
- self.log.debug("Transformed job publishers")
- return early_publishers, late_publishers
-
- def prepareAnsibleFiles(self, jobdir, gearman_job, args):
- job_name = gearman_job.name.split(':')[1]
- jjb_job = self.jobs[job_name]
-
- parameters = args.copy()
- parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
-
- with open(jobdir.inventory, 'w') as inventory:
- for host_name, host_vars in self.getHostList():
- inventory.write(host_name)
- for k, v in host_vars.items():
- inventory.write(' %s=%s' % (k, v))
- inventory.write('\n')
-
- timeout = None
- timeout_var = None
- for wrapper in jjb_job.get('wrappers', []):
- if isinstance(wrapper, dict):
- build_timeout = wrapper.get('timeout')
- if isinstance(build_timeout, dict):
- timeout_var = build_timeout.get('timeout-var')
- timeout = build_timeout.get('timeout')
- if timeout is not None:
- timeout = int(timeout) * 60
- if not timeout:
- timeout = ANSIBLE_DEFAULT_TIMEOUT
- if timeout_var:
- parameters[timeout_var] = str(timeout * 1000)
-
- with open(jobdir.vars, 'w') as vars_yaml:
- variables = dict(
- timeout=timeout,
- environment=parameters,
- )
- zuul_vars = dict(zuul=variables)
- vars_yaml.write(
- yaml.safe_dump(zuul_vars, default_flow_style=False))
-
- with open(jobdir.pre_playbook, 'w') as pre_playbook:
-
- shellargs = "ssh-keyscan {{ ansible_host }} > %s" % (
- jobdir.known_hosts)
- tasks = []
- tasks.append(dict(shell=shellargs, delegate_to='127.0.0.1'))
-
- task = dict(file=dict(path='/tmp/console.html', state='absent'))
- tasks.append(task)
-
- task = dict(zuul_console=dict(path='/tmp/console.html',
- port=19885))
- tasks.append(task)
-
- task = dict(file=dict(path=parameters['WORKSPACE'],
- state='directory'))
- tasks.append(task)
-
- msg = [
- "Launched by %s" % self.manager_name,
- "Building remotely on %s in workspace %s" % (
- self.name, parameters['WORKSPACE'])]
- task = dict(zuul_log=dict(msg=msg))
- tasks.append(task)
-
- play = dict(hosts='node', name='Job setup', tasks=tasks)
- pre_playbook.write(
- yaml.safe_dump([play], default_flow_style=False))
-
- with open(jobdir.playbook, 'w') as playbook:
- tasks = []
-
- sequence = 0
- for builder in jjb_job.get('builders', []):
- if 'shell' in builder:
- sequence += 1
- tasks.extend(
- self._makeBuilderTask(jobdir, builder, parameters,
- sequence))
-
- play = dict(hosts='node', name='Job body', tasks=tasks)
- playbook.write(yaml.safe_dump([play], default_flow_style=False))
-
- early_publishers, late_publishers = self._transformPublishers(jjb_job)
-
- with open(jobdir.post_playbook, 'w') as playbook:
- blocks = []
- for publishers in [early_publishers, late_publishers]:
- block = []
- for publisher in publishers:
- if 'scp' in publisher:
- block.extend(self._makeSCPTask(jobdir, publisher,
- parameters))
- if 'ftp' in publisher:
- block.extend(self._makeFTPTask(jobdir, publisher,
- parameters))
- if 'afs' in publisher:
- block.extend(self._makeAFSTask(jobdir, publisher,
- parameters))
- blocks.append(block)
-
- # The 'always' section contains the log publishing tasks,
- # the 'block' contains all the other publishers. This way
- # we run the log publisher regardless of whether the rest
- # of the publishers succeed.
- tasks = []
-
- task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"),
- when='success|bool')
- blocks[0].insert(0, task)
- task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"),
- when='not success|bool and not timedout|bool')
- blocks[0].insert(0, task)
- task = dict(zuul_log=dict(msg="Job timed out, result: FAILURE"),
- when='not success|bool and timedout|bool')
- blocks[0].insert(0, task)
-
- tasks.append(dict(block=blocks[0],
- always=blocks[1]))
-
- play = dict(hosts='node', name='Publishers',
- tasks=tasks)
- playbook.write(yaml.safe_dump([play], default_flow_style=False))
-
- self._writeAnsibleConfig(jobdir, jobdir.config,
- library=self.library_dir)
- self._writeAnsibleConfig(jobdir, jobdir.pre_post_config,
- library=self.pre_post_library_dir)
-
- return timeout
-
- def _writeAnsibleConfig(self, jobdir, fn, library):
- with open(fn, 'w') as config:
- config.write('[defaults]\n')
- config.write('hostfile = %s\n' % jobdir.inventory)
- config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
- config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
- config.write('private_key_file = %s\n' % self.private_key_file)
- config.write('retry_files_enabled = False\n')
- config.write('log_path = %s\n' % jobdir.ansible_log)
- config.write('gathering = explicit\n')
- config.write('library = %s\n' % library)
- # TODO(mordred) This can be removed once we're using ansible 2.2
- config.write('module_set_locale = False\n')
- # bump the timeout because busy nodes may take more than
- # 10s to respond
- config.write('timeout = 30\n')
-
- config.write('[ssh_connection]\n')
- # NB: when setting pipelining = True, keep_remote_files
- # must be False (the default). Otherwise it apparently
- # will override the pipelining option and effectively
- # disable it. Pipelining has a side effect of running the
- # command without a tty (ie, without the -tt argument to
- # ssh). We require this behavior so that if a job runs a
- # command which expects interactive input on a tty (such
- # as sudo) it does not hang.
- config.write('pipelining = True\n')
- ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
- "-o UserKnownHostsFile=%s" % jobdir.known_hosts
- config.write('ssh_args = %s\n' % ssh_args)
-
- def _ansibleTimeout(self, proc, msg):
- self._watchdog_timeout = True
- self.log.warning(msg)
- self.abortRunningProc(proc)
-
- def runAnsiblePrePlaybook(self, jobdir):
- # Set LOGNAME env variable so Ansible log_path log reports
- # the correct user.
- env_copy = os.environ.copy()
- env_copy['LOGNAME'] = 'zuul'
- env_copy['ANSIBLE_CONFIG'] = jobdir.pre_post_config
-
- if self.options['verbose']:
- verbose = '-vvv'
- else:
- verbose = '-v'
-
- cmd = ['ansible-playbook', jobdir.pre_playbook,
- '-e@%s' % jobdir.vars, verbose]
- self.log.debug("Ansible pre command: %s" % (cmd,))
-
- self.ansible_pre_proc = subprocess.Popen(
- cmd,
- cwd=jobdir.ansible_root,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- preexec_fn=os.setsid,
- env=env_copy,
- )
- ret = None
- watchdog = Watchdog(ANSIBLE_DEFAULT_PRE_TIMEOUT,
- self._ansibleTimeout,
- (self.ansible_pre_proc,
- "Ansible pre timeout exceeded"))
- watchdog.start()
- try:
- for line in iter(self.ansible_pre_proc.stdout.readline, b''):
- line = line[:1024].rstrip()
- self.log.debug("Ansible pre output: %s" % (line,))
- ret = self.ansible_pre_proc.wait()
- finally:
- watchdog.stop()
- self.log.debug("Ansible pre exit code: %s" % (ret,))
- self.ansible_pre_proc = None
- return ret == 0
-
- def runAnsiblePlaybook(self, jobdir, timeout):
- # Set LOGNAME env variable so Ansible log_path log reports
- # the correct user.
- env_copy = os.environ.copy()
- env_copy['LOGNAME'] = 'zuul'
- env_copy['ANSIBLE_CONFIG'] = jobdir.config
-
- if self.options['verbose']:
- verbose = '-vvv'
- else:
- verbose = '-v'
-
- cmd = ['ansible-playbook', jobdir.playbook, verbose,
- '-e@%s' % jobdir.vars]
- self.log.debug("Ansible command: %s" % (cmd,))
-
- self.ansible_job_proc = subprocess.Popen(
- cmd,
- cwd=jobdir.ansible_root,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- preexec_fn=os.setsid,
- env=env_copy,
- )
- ret = None
- watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
- self._ansibleTimeout,
- (self.ansible_job_proc,
- "Ansible timeout exceeded"))
- watchdog.start()
- try:
- for line in iter(self.ansible_job_proc.stdout.readline, b''):
- line = line[:1024].rstrip()
- self.log.debug("Ansible output: %s" % (line,))
- ret = self.ansible_job_proc.wait()
- finally:
- watchdog.stop()
- self.log.debug("Ansible exit code: %s" % (ret,))
- self.ansible_job_proc = None
- if self._watchdog_timeout:
- return False
- if ret == 3:
- # AnsibleHostUnreachable: We had a network issue connecting to
- # our zuul-worker.
- return None
- elif ret == -9:
- # Received abort request.
- return None
- return ret == 0
-
- def runAnsiblePostPlaybook(self, jobdir, success):
- # Set LOGNAME env variable so Ansible log_path log reports
- # the correct user.
- env_copy = os.environ.copy()
- env_copy['LOGNAME'] = 'zuul'
- env_copy['ANSIBLE_CONFIG'] = jobdir.pre_post_config
-
- if self.options['verbose']:
- verbose = '-vvv'
- else:
- verbose = '-v'
-
- cmd = ['ansible-playbook', jobdir.post_playbook,
- '-e', 'success=%s' % success,
- '-e', 'timedout=%s' % self._watchdog_timeout,
- '-e@%s' % jobdir.vars,
- verbose]
- self.log.debug("Ansible post command: %s" % (cmd,))
-
- self.ansible_post_proc = subprocess.Popen(
- cmd,
- cwd=jobdir.ansible_root,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- preexec_fn=os.setsid,
- env=env_copy,
- )
- ret = None
- watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
- self._ansibleTimeout,
- (self.ansible_post_proc,
- "Ansible post timeout exceeded"))
- watchdog.start()
- try:
- for line in iter(self.ansible_post_proc.stdout.readline, b''):
- line = line[:1024].rstrip()
- self.log.debug("Ansible post output: %s" % (line,))
- ret = self.ansible_post_proc.wait()
- finally:
- watchdog.stop()
- self.log.debug("Ansible post exit code: %s" % (ret,))
- self.ansible_post_proc = None
- return ret == 0
-
-
-class JJB(jenkins_jobs.builder.Builder):
- def __init__(self):
- self.global_config = None
- self._plugins_list = []
-
- def expandComponent(self, component_type, component, template_data):
- component_list_type = component_type + 's'
- new_components = []
- if isinstance(component, dict):
- name, component_data = next(iter(component.items()))
- if template_data:
- component_data = jenkins_jobs.formatter.deep_format(
- component_data, template_data, True)
- else:
- name = component
- component_data = {}
-
- new_component = self.parser.data.get(component_type, {}).get(name)
- if new_component:
- for new_sub_component in new_component[component_list_type]:
- new_components.extend(
- self.expandComponent(component_type,
- new_sub_component, component_data))
- else:
- new_components.append({name: component_data})
- return new_components
-
- def expandMacros(self, job):
- for component_type in ['builder', 'publisher', 'wrapper']:
- component_list_type = component_type + 's'
- new_components = []
- for new_component in job.get(component_list_type, []):
- new_components.extend(self.expandComponent(component_type,
- new_component, {}))
- job[component_list_type] = new_components
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 45937ef..6a39096 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -255,8 +255,10 @@
self.roles_path = []
self.ansible_config = os.path.join(self.root, 'ansible.cfg')
self.project_link = os.path.join(self.root, 'project')
- self.secrets = os.path.join(self.root, 'secrets.yaml')
- self.has_secrets = False
+ self.secrets_root = os.path.join(self.root, 'secrets')
+ os.makedirs(self.secrets_root)
+ self.secrets = os.path.join(self.secrets_root, 'secrets.yaml')
+ self.secrets_content = None
def addRole(self):
count = len(self.roles)
@@ -922,7 +924,6 @@
args = json.loads(self.job.arguments)
self.log.debug("Beginning job %s for ref %s" %
(self.job.name, args['zuul']['ref']))
- self.log.debug("Args: %s" % (self.job.arguments,))
self.log.debug("Job root: %s" % (self.jobdir.root,))
tasks = []
projects = set()
@@ -1266,6 +1267,13 @@
for role in playbook['roles']:
self.prepareRole(jobdir_playbook, role, args)
+ secrets = playbook['secrets']
+ if secrets:
+ if 'zuul' in secrets:
+ raise Exception("Defining secrets named 'zuul' is not allowed")
+ jobdir_playbook.secrets_content = yaml.safe_dump(
+ secrets, default_flow_style=False)
+
self.writeAnsibleConfig(jobdir_playbook, playbook)
def checkoutTrustedProject(self, project, branch):
@@ -1390,15 +1398,6 @@
def writeAnsibleConfig(self, jobdir_playbook, playbook):
trusted = jobdir_playbook.trusted
- secrets = playbook['secrets'].copy()
- if secrets:
- if 'zuul' in secrets:
- raise Exception("Defining secrets named 'zuul' is not allowed")
- with open(jobdir_playbook.secrets, 'w') as secrets_yaml:
- secrets_yaml.write(
- yaml.safe_dump(secrets, default_flow_style=False))
- jobdir_playbook.has_secrets = True
-
# TODO(mordred) This should likely be extracted into a more generalized
# mechanism for deployers being able to add callback
# plugins.
@@ -1406,17 +1405,13 @@
callback_path = '%s:%s' % (
self.executor_server.callback_dir,
os.path.dirname(ara_callbacks.__file__))
- callback_whitelist = 'zuul_json,ara'
else:
callback_path = self.executor_server.callback_dir
- callback_whitelist = 'zuul_json'
with open(jobdir_playbook.ansible_config, 'w') as config:
config.write('[defaults]\n')
config.write('hostfile = %s\n' % self.jobdir.inventory)
config.write('local_tmp = %s/local_tmp\n' %
self.jobdir.ansible_cache_root)
- config.write('remote_tmp = %s/remote_tmp\n' %
- self.jobdir.ansible_cache_root)
config.write('retry_files_enabled = False\n')
config.write('gathering = smart\n')
config.write('fact_caching = jsonfile\n')
@@ -1427,7 +1422,6 @@
config.write('command_warnings = False\n')
config.write('callback_plugins = %s\n' % callback_path)
config.write('stdout_callback = zuul_stream\n')
- config.write('callback_whitelist = %s\n' % callback_whitelist)
# bump the timeout because busy nodes may take more than
# 10s to respond
config.write('timeout = 30\n')
@@ -1446,7 +1440,7 @@
# role. Otherwise, printing the args could be useful for
# debugging.
config.write('display_args_to_stdout = %s\n' %
- str(not secrets))
+ str(not playbook['secrets']))
config.write('[ssh_connection]\n')
# NB: when setting pipelining = True, keep_remote_files
@@ -1514,10 +1508,14 @@
if self.executor_variables_file:
ro_paths.append(self.executor_variables_file)
- self.executor_server.execution_wrapper.setMountsMap(ro_paths,
- rw_paths)
+ secrets = {}
+ if playbook.secrets_content:
+ secrets[playbook.secrets] = playbook.secrets_content
- popen = self.executor_server.execution_wrapper.getPopen(
+ context = self.executor_server.execution_wrapper.getExecutionContext(
+ ro_paths, rw_paths, secrets)
+
+ popen = context.getPopen(
work_dir=self.jobdir.work_root,
ssh_auth_sock=env_copy.get('SSH_AUTH_SOCK'))
@@ -1598,7 +1596,7 @@
verbose = '-v'
cmd = ['ansible-playbook', verbose, playbook.path]
- if playbook.has_secrets:
+ if playbook.secrets_content:
cmd.extend(['-e', '@' + playbook.secrets])
if success is not None:
diff --git a/zuul/model.py b/zuul/model.py
index 5a157bc..850bbe2 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1996,6 +1996,8 @@
# common
self.type = None
self.branch_updated = False
+ self.branch_created = False
+ self.branch_deleted = False
self.ref = None
# For management events (eg: enqueue / promote)
self.tenant_name = None
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a64d9e0..52b34ec 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -753,12 +753,15 @@
"source %s",
e.change, project.source)
continue
- if (event.branch_updated and
- hasattr(change, 'files') and
- change.updatesConfig()):
- # The change that just landed updates the config.
- # Clear out cached data for this project and
- # perform a reconfiguration.
+ if ((event.branch_updated and
+ hasattr(change, 'files') and
+ change.updatesConfig()) or
+ event.branch_created or
+ event.branch_deleted):
+ # The change that just landed updates the config
+ # or a branch was just created or deleted. Clear
+ # out cached data for this project and perform a
+ # reconfiguration.
change.project.unparsed_config = None
self.reconfigureTenant(tenant)
for pipeline in tenant.layout.pipelines.values():