Merge "Fix doc typo that missed important words" into feature/zuulv3
diff --git a/.zuul.yaml b/.zuul.yaml
index ff1a523..c081235 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -45,8 +45,7 @@
name: openstack-infra/zuul
infra-check:
jobs:
- - build-openstack-infra-sphinx-docs:
- success-url: 'html/feature/zuulv3/'
+ - build-openstack-sphinx-docs:
irrelevant-files:
- zuul/cmd/migrate.py
- playbooks/zuul-migrate/.*
@@ -67,8 +66,10 @@
- playbooks/zuul-migrate/.*
infra-gate:
jobs:
- - build-openstack-infra-sphinx-docs:
- success-url: 'html/feature/zuulv3/'
+ - build-openstack-sphinx-docs:
+ irrelevant-files:
+ - zuul/cmd/migrate.py
+ - playbooks/zuul-migrate/.*
- tox-pep8
- tox-py35:
irrelevant-files:
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 5e7e0e1..464cb60 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -265,6 +265,22 @@
Directory in which Zuul should clone git repositories.
+ .. attr:: git_http_low_speed_limit
+ :default: 1000
+
+ If the HTTP transfer speed is less than git_http_low_speed_limit for
+ longer than git_http_low_speed_time, the transfer is aborted.
+
+ Value in bytes per second, setting to 0 will disable.
+
+ .. attr:: git_http_low_speed_time
+ :default: 30
+
+ If the HTTP transfer speed is less than git_http_low_speed_limit for
+ longer than git_http_low_speed_time, the transfer is aborted.
+
+ Value in seconds, setting to 0 will disable.
+
.. attr:: git_user_email
Value to pass to `git config user.email
diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 4fed1f9..a8b2324 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -33,17 +33,13 @@
These metrics are emitted by the Zuul :ref:`scheduler`:
-.. stat:: gerrit.event.<type>
+.. stat:: zuul.event.<driver>.<type>
:type: counter
- Gerrit emits different kinds of messages over its `stream-events`
- interface. Zuul will report counters for each type of event it
- receives from Gerrit.
+ Zuul will report counters for each type of event it receives from
+ each of its configured drivers.
- Refer to your Gerrit installation documentation for a complete
- list of Gerrit event types.
-
-.. stat:: zuul.pipeline
+.. stat:: zuul.tenant.<tenant>.pipeline
Holds metrics specific to jobs. This hierarchy includes:
@@ -63,22 +59,60 @@
The number of items currently being processed by this
pipeline.
- .. stat:: job
+ .. stat:: project
- Subtree detailing per jobs statistics:
+ This hierarchy holds more specific metrics for each project
+ participating in the pipeline.
- .. stat:: <jobname>
+ .. stat:: <canonical_hostname>
- The triggered job name.
+ The canonical hostname for the triggering project.
+ Embedded ``.`` characters will be translated to ``_``.
- .. stat:: <result>
- :type: counter, timer
+ .. stat:: <project>
- A counter for each type of result (e.g., ``SUCCESS`` or
- ``FAILURE``, ``ERROR``, etc.) for the job. If the
- result is ``SUCCESS`` or ``FAILURE``, Zuul will
- additionally report the duration of the build as a
- timer.
+ The name of the triggering project. Embedded ``/`` or
+ ``.`` characters will be translated to ``_``.
+
+ .. stat:: <branch>
+
+ The name of the triggering branch. Embedded ``/`` or
+ ``.`` characters will be translated to ``_``.
+
+ .. stat:: job
+
+ Subtree detailing per-project job statistics:
+
+ .. stat:: <jobname>
+
+ The triggered job name.
+
+ .. stat:: <result>
+ :type: counter, timer
+
+ A counter for each type of result (e.g., ``SUCCESS`` or
+ ``FAILURE``, ``ERROR``, etc.) for the job. If the
+ result is ``SUCCESS`` or ``FAILURE``, Zuul will
+ additionally report the duration of the build as a
+ timer.
+
+ .. stat:: current_changes
+ :type: gauge
+
+ The number of items of this project currently being
+ processed by this pipeline.
+
+ .. stat:: resident_time
+ :type: timer
+
+ A timer metric reporting how long each item for this
+ project has been in the pipeline.
+
+ .. stat:: total_changes
+ :type: counter
+
+ The number of changes for this project processed by the
+ pipeline since Zuul started.
.. stat:: resident_time
:type: timer
@@ -98,34 +132,12 @@
How long each item spent in the pipeline before its first job
started.
- .. stat:: <project>
- This hierarchy holds more specific metrics for each project
- participating in the pipeline. If the project name contains
- a ``/`` character, it will be replaced with a ``.``.
+As an example, given a job named `myjob` in `mytenant` triggered by a
+change to `myproject` on the `master` branch in the `gate` pipeline
+which took 40 seconds to build, the Zuul scheduler will emit the
+following statsd events:
- .. stat:: current_changes
- :type: gauge
-
- The number of items of this project currently being
- processed by this pipeline.
-
- .. stat:: resident_time
- :type: timer
-
- A timer metric reporting how long each item for this
- project has been in the pipeline.
-
- .. stat:: total_changes
- :type: counter
-
- The number of changes for this project processed by the
- pipeline since Zuul started.
-
-As an example, given a job named `myjob` triggered by the `gate` pipeline
-which took 40 seconds to build, the Zuul scheduler will emit the following
-statsd events:
-
- * ``zuul.pipeline.gate.job.myjob.SUCCESS`` +1
- * ``zuul.pipeline.gate.job.myjob`` 40 seconds
- * ``zuul.pipeline.gate.all_jobs`` +1
+ * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS`` +1
+ * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS`` 40 seconds
+ * ``zuul.tenant.mytenant.pipeline.gate.all_jobs`` +1
diff --git a/requirements.txt b/requirements.txt
index cdffda2..b718827 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,7 +7,11 @@
Paste
WebOb>=1.2.3
paramiko>=1.8.0,<2.0.0
-GitPython>=0.3.3,<2.1.2
+# Using a local fork of gitpython until at least these changes are in a
+# release.
+# https://github.com/gitpython-developers/GitPython/pull/682
+# https://github.com/gitpython-developers/GitPython/pull/686
+-e git+https://github.com/jeblair/GitPython.git@zuul#egg=GitPython
python-daemon>=2.0.4,<2.1.0
extras
statsd>=1.0.0,<3.0
diff --git a/tests/base.py b/tests/base.py
index 5841c08..dacb1ef 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1570,7 +1570,7 @@
parameters = json.loads(job.arguments.decode('utf8'))
if not regex or re.match(regex, parameters.get('job')):
match = True
- if job.name == b'merger:merge':
+ if job.name.startswith(b'merger:'):
if not regex:
match = True
if match:
diff --git a/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
new file mode 100644
index 0000000..bd46718
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
@@ -0,0 +1,4 @@
+- hosts: all
+ tasks:
+ - shell: echo "Failure test {{ zuul.executor.src_root }}"
+ - shell: exit 1
diff --git a/tests/fixtures/config/job-output/git/common-config/zuul.yaml b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
index a83f0bc..f182d8d 100644
--- a/tests/fixtures/config/job-output/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
@@ -20,8 +20,19 @@
parent: base
name: job-output
+- job:
+ name: job-output-failure
+ run: playbooks/job-output
+ post-run: playbooks/job-output-failure-post
+
- project:
name: org/project
check:
jobs:
- job-output
+
+- project:
+ name: org/project2
+ check:
+ jobs:
+ - job-output-failure
diff --git a/tests/fixtures/config/job-output/git/org_project2/README b/tests/fixtures/config/job-output/git/org_project2/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/org_project2/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/job-output/main.yaml b/tests/fixtures/config/job-output/main.yaml
index 208e274..14b382f 100644
--- a/tests/fixtures/config/job-output/main.yaml
+++ b/tests/fixtures/config/job-output/main.yaml
@@ -6,3 +6,4 @@
- common-config
untrusted-projects:
- org/project
+ - org/project2
diff --git a/tests/fixtures/fake_git.sh b/tests/fixtures/fake_git.sh
new file mode 100755
index 0000000..5b787b7
--- /dev/null
+++ b/tests/fixtures/fake_git.sh
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+echo $*
+case "$1" in
+ clone)
+ dest=$3
+ mkdir -p $dest/.git
+ ;;
+ version)
+ echo "git version 1.0.0"
+ exit 0
+ ;;
+esac
+sleep 30
diff --git a/tests/fixtures/layouts/delayed-repo-init.yaml b/tests/fixtures/layouts/delayed-repo-init.yaml
index e97d37a..c89e2fa 100644
--- a/tests/fixtures/layouts/delayed-repo-init.yaml
+++ b/tests/fixtures/layouts/delayed-repo-init.yaml
@@ -67,7 +67,7 @@
dependencies: project-merge
gate:
jobs:
- - project-merge:
+ - project-merge
- project-test1:
dependencies: project-merge
- project-test2:
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index 8aafabf..ec30a2b 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -19,9 +19,10 @@
import os
import git
+import testtools
from zuul.merger.merger import Repo
-from tests.base import ZuulTestCase
+from tests.base import ZuulTestCase, FIXTURE_DIR
class TestMergerRepo(ZuulTestCase):
@@ -49,7 +50,7 @@
msg='.git file in submodule should be a file')
work_repo = Repo(parent_path, self.workspace_root,
- 'none@example.org', 'User Name')
+ 'none@example.org', 'User Name', '0', '0')
self.assertTrue(
os.path.isdir(os.path.join(self.workspace_root, 'subdir')),
msg='Cloned repository has a submodule placeholder directory')
@@ -60,7 +61,7 @@
sub_repo = Repo(
os.path.join(self.upstream_root, 'org/project2'),
os.path.join(self.workspace_root, 'subdir'),
- 'none@example.org', 'User Name')
+ 'none@example.org', 'User Name', '0', '0')
self.assertTrue(os.path.exists(
os.path.join(self.workspace_root, 'subdir', '.git')),
msg='Cloned over the submodule placeholder')
@@ -74,3 +75,28 @@
os.path.join(self.upstream_root, 'org/project2'),
sub_repo.createRepoObject().remotes[0].url,
message="Sub repository points to upstream project2")
+
+ def test_clone_timeout(self):
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+ os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+ work_repo = Repo(parent_path, self.workspace_root,
+ 'none@example.org', 'User Name', '0', '0',
+ git_timeout=0.001)
+ # TODO: have the merger and repo classes catch fewer
+ # exceptions, including this one on initialization. For the
+ # test, we try cloning again.
+ with testtools.ExpectedException(git.exc.GitCommandError,
+ '.*exit code\(-9\)'):
+ work_repo._ensure_cloned()
+
+ def test_fetch_timeout(self):
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ work_repo = Repo(parent_path, self.workspace_root,
+ 'none@example.org', 'User Name', '0', '0')
+ work_repo.git_timeout = 0.001
+ self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+ os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+ with testtools.ExpectedException(git.exc.GitCommandError,
+ '.*exit code\(-9\)'):
+ work_repo.update()
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index ba7523c..d51898b 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -76,7 +76,7 @@
self.assertEqual(request.state, 'fulfilled')
# Accept the nodes
- self.nodepool.acceptNodes(request)
+ self.nodepool.acceptNodes(request, request.id)
nodeset = request.nodeset
for node in nodeset.getNodes():
@@ -125,3 +125,47 @@
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 0)
+
+ def test_accept_nodes_resubmitted(self):
+ # Test that a resubmitted request would not lock nodes
+
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+ nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+ request = self.nodepool.requestNodes(None, job)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ # Accept the nodes, passing a different ID
+ self.nodepool.acceptNodes(request, "invalid")
+ nodeset = request.nodeset
+
+ for node in nodeset.getNodes():
+ self.assertIsNone(node.lock)
+ self.assertEqual(node.state, 'ready')
+
+ def test_accept_nodes_lost_request(self):
+ # Test that a lost request would not lock nodes
+
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+ nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+ request = self.nodepool.requestNodes(None, job)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ self.zk.deleteNodeRequest(request)
+
+ # Accept the nodes
+ self.nodepool.acceptNodes(request, request.id)
+ nodeset = request.nodeset
+
+ for node in nodeset.getNodes():
+ self.assertIsNone(node.lock)
+ self.assertEqual(node.state, 'ready')
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 65a37ff..3203960 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -89,25 +89,34 @@
self.assertEqual(self.getJobFromHistory('project-test2').node,
'label1')
+ for stat in self.statsd.stats:
+ k, v = stat.decode('utf-8').split(':')
+ self.log.debug('stat %s:%s', k, v)
# TODOv3(jeblair): we may want to report stats by tenant (also?).
# Per-driver
self.assertReportedStat('zuul.event.gerrit.comment-added', value='1|c')
# Per-driver per-connection
self.assertReportedStat('zuul.event.gerrit.gerrit.comment-added',
value='1|c')
- self.assertReportedStat('zuul.pipeline.gate.current_changes',
- value='1|g')
- self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
- kind='ms')
- self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
- value='1|c')
- self.assertReportedStat('zuul.pipeline.gate.resident_time', kind='ms')
- self.assertReportedStat('zuul.pipeline.gate.total_changes',
- value='1|c')
self.assertReportedStat(
- 'zuul.pipeline.gate.org.project.resident_time', kind='ms')
+ 'zuul.tenant.tenant-one.pipeline.gate.current_changes',
+ value='1|g')
self.assertReportedStat(
- 'zuul.pipeline.gate.org.project.total_changes', value='1|c')
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.job.project-merge.SUCCESS', kind='ms')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.job.project-merge.SUCCESS', value='1|c')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.resident_time', kind='ms')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.total_changes', value='1|c')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.resident_time', kind='ms')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.total_changes', value='1|c')
for build in self.history:
self.assertTrue(build.parameters['zuul']['voting'])
@@ -4596,6 +4605,50 @@
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
+ def test_job_aborted(self):
+ "Test that if an execute server aborts a job, it is run again"
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 2)
+
+ # first abort
+ self.builds[0].aborted = True
+ self.executor_server.release('.*-test*')
+ self.waitUntilSettled()
+ self.assertEqual(len(self.builds), 1)
+
+ # second abort
+ self.builds[0].aborted = True
+ self.executor_server.release('.*-test*')
+ self.waitUntilSettled()
+ self.assertEqual(len(self.builds), 1)
+
+ # third abort
+ self.builds[0].aborted = True
+ self.executor_server.release('.*-test*')
+ self.waitUntilSettled()
+ self.assertEqual(len(self.builds), 1)
+
+ # fourth abort
+ self.builds[0].aborted = True
+ self.executor_server.release('.*-test*')
+ self.waitUntilSettled()
+ self.assertEqual(len(self.builds), 1)
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.history), 7)
+ self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 4)
+ self.assertEqual(self.countJobResults(self.history, 'SUCCESS'), 3)
+
def test_rerun_on_abort(self):
"Test that if a execute server fails to run a job, it is run again"
@@ -4715,6 +4768,28 @@
build = self.getJobFromHistory('py27')
self.assertEqual(build.parameters['zuul']['jobtags'], [])
+ def test_pending_merge_in_reconfig(self):
+ # Test that if we are waiting for an outstanding merge on
+ # reconfiguration that we continue to do so.
+ self.gearman_server.hold_merge_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ A.setMerged()
+ self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
+ self.waitUntilSettled()
+ # Reconfigure while we still have an outstanding merge job
+ self.sched.reconfigureTenant(self.sched.abide.tenants['tenant-one'],
+ None)
+ self.waitUntilSettled()
+ # Verify the merge job is still running and that the item is
+ # in the pipeline
+ self.assertEqual(len(self.sched.merger.jobs), 1)
+ tenant = self.sched.abide.tenants.get('tenant-one')
+ pipeline = tenant.layout.pipelines['post']
+ self.assertEqual(len(pipeline.getAllItems()), 1)
+ self.gearman_server.hold_merge_jobs_in_queue = False
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
class TestExecutor(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 1c633ba..0d081d5 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -14,9 +14,13 @@
# License for the specific language governing permissions and limitations
# under the License.
+import io
import json
+import logging
import os
import textwrap
+import gc
+from unittest import skip
import testtools
@@ -170,6 +174,39 @@
self.assertIn('tenant-one-gate', A.messages[1],
"A should transit tenant-one gate")
+ @skip("This test is useful, but not reliable")
+ def test_full_and_dynamic_reconfig(self):
+ self.executor_server.hold_jobs_in_build = True
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+
+ - project:
+ name: org/project
+ tenant-one-gate:
+ jobs:
+ - project-test1
+ """)
+
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.sched.reconfigure(self.config)
+ self.waitUntilSettled()
+
+ gc.collect()
+ pipelines = [obj for obj in gc.get_objects()
+ if isinstance(obj, zuul.model.Pipeline)]
+ self.assertEqual(len(pipelines), 4)
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
def test_dynamic_config(self):
in_repo_conf = textwrap.dedent(
"""
@@ -696,6 +733,47 @@
self.assertIn('the only project definition permitted', A.messages[0],
"A should have a syntax error reported")
+ def test_untrusted_depends_on_trusted(self):
+ with open(os.path.join(FIXTURE_DIR,
+ 'config/in-repo/git/',
+ 'common-config/zuul.yaml')) as f:
+ common_config = f.read()
+
+ common_config += textwrap.dedent(
+ """
+ - job:
+ name: project-test9
+ """)
+
+ file_dict = {'zuul.yaml': common_config}
+ A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A',
+ files=file_dict)
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+ - project:
+ name: org/project
+ check:
+ jobs:
+ - project-test9
+ """)
+
+ file_dict = {'zuul.yaml': in_repo_conf}
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+ files=file_dict)
+ B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ B.subject, A.data['id'])
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(B.reported, 1,
+ "B should report failure")
+ self.assertIn('depends on a change to a config project',
+ B.messages[0],
+ "A should have a syntax error reported")
+
def test_duplicate_node_error(self):
in_repo_conf = textwrap.dedent(
"""
@@ -816,6 +894,150 @@
A.messages[0],
"A should have a syntax error reported")
+ def test_job_list_in_project_template_not_dict_error(self):
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+ - project-template:
+ name: some-jobs
+ check:
+ jobs:
+ - project-test1:
+ - required-projects:
+ org/project2
+ """)
+
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1,
+ "A should report failure")
+ self.assertIn('expected str for dictionary value',
+ A.messages[0], "A should have a syntax error reported")
+
+ def test_job_list_in_project_not_dict_error(self):
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+ - project:
+ name: org/project1
+ check:
+ jobs:
+ - project-test1:
+ - required-projects:
+ org/project2
+ """)
+
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1,
+ "A should report failure")
+ self.assertIn('expected str for dictionary value',
+ A.messages[0], "A should have a syntax error reported")
+
+ def test_project_template(self):
+ # Tests that a project template is not modified when used, and
+ # can therefore be used in subsequent reconfigurations.
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+ - project-template:
+ name: some-jobs
+ tenant-one-gate:
+ jobs:
+ - project-test1:
+ required-projects:
+ - org/project1
+ - project:
+ name: org/project
+ templates:
+ - some-jobs
+ """)
+
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+ self.waitUntilSettled()
+ in_repo_conf = textwrap.dedent(
+ """
+ - project:
+ name: org/project1
+ templates:
+ - some-jobs
+ """)
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B',
+ files=file_dict)
+ B.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.assertEqual(B.data['status'], 'MERGED')
+
+ def test_job_remove_add(self):
+ # Tests that a job can be removed from one repo and added in another.
+ # First, remove the current config for project1 since it
+ # references the job we want to remove.
+ file_dict = {'.zuul.yaml': None}
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A',
+ files=file_dict)
+ A.setMerged()
+ self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+ self.waitUntilSettled()
+ # Then propose a change to delete the job from one repo...
+ file_dict = {'.zuul.yaml': None}
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+ files=file_dict)
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ # ...and a second that depends on it that adds it to another repo.
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+
+ - project:
+ name: org/project1
+ check:
+ jobs:
+ - project-test1
+ """)
+ in_repo_playbook = textwrap.dedent(
+ """
+ - hosts: all
+ tasks: []
+ """)
+ file_dict = {'.zuul.yaml': in_repo_conf,
+ 'playbooks/project-test1.yaml': in_repo_playbook}
+ C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C',
+ files=file_dict,
+ parent='refs/changes/1/1/1')
+ C.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ C.subject, B.data['id'])
+ self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='project-test1', result='SUCCESS', changes='2,1 3,1'),
+ ], ordered=False)
+
def test_multi_repo(self):
downstream_repo_conf = textwrap.dedent(
"""
@@ -1665,7 +1887,8 @@
return f.read()
def test_job_output(self):
- # Verify that command standard output appears in the job output
+ # Verify that command standard output appears in the job output,
+ # and that failures in the final playbook get logged.
# This currently only verifies we receive output from
# localhost. Notably, it does not verify we receive output
@@ -1690,3 +1913,36 @@
self.assertIn(token,
self._get_file(self.history[0],
'work/logs/job-output.txt'))
+
+ def test_job_output_failure_log(self):
+ logger = logging.getLogger('zuul.AnsibleJob')
+ output = io.StringIO()
+ logger.addHandler(logging.StreamHandler(output))
+
+ # Verify that a failure in the last post playbook emits the contents
+ # of the json output to the log
+ self.executor_server.keep_jobdir = True
+ A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='job-output-failure',
+ result='POST_FAILURE', changes='1,1'),
+ ], ordered=False)
+
+ token = 'Standard output test %s' % (self.history[0].jobdir.src_root)
+ j = json.loads(self._get_file(self.history[0],
+ 'work/logs/job-output.json'))
+ self.assertEqual(token,
+ j[0]['plays'][0]['tasks'][0]
+ ['hosts']['localhost']['stdout'])
+
+ print(self._get_file(self.history[0],
+ 'work/logs/job-output.json'))
+ self.assertIn(token,
+ self._get_file(self.history[0],
+ 'work/logs/job-output.txt'))
+
+ log_output = output.getvalue()
+ self.assertIn('Final playbook failed', log_output)
+ self.assertIn('Failure test', log_output)
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 6a9ba01..79c8b39 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -11,6 +11,7 @@
# under the License.
import base64
+import collections
from contextlib import contextmanager
import copy
import os
@@ -397,39 +398,42 @@
secret = {vs.Required('name'): str,
vs.Required('secret'): str}
- job = {vs.Required('name'): str,
- 'parent': vs.Any(str, None),
- 'final': bool,
- 'failure-message': str,
- 'success-message': str,
- 'failure-url': str,
- 'success-url': str,
- 'hold-following-changes': bool,
- 'voting': bool,
- 'semaphore': str,
- 'tags': to_list(str),
- 'branches': to_list(str),
- 'files': to_list(str),
- 'secrets': to_list(vs.Any(secret, str)),
- 'irrelevant-files': to_list(str),
- # validation happens in NodeSetParser
- 'nodeset': vs.Any(dict, str),
- 'timeout': int,
- 'attempts': int,
- 'pre-run': to_list(str),
- 'post-run': to_list(str),
- 'run': str,
- '_source_context': model.SourceContext,
- '_start_mark': ZuulMark,
- 'roles': to_list(role),
- 'required-projects': to_list(vs.Any(job_project, str)),
- 'vars': dict,
- 'dependencies': to_list(str),
- 'allowed-projects': to_list(str),
- 'override-branch': str,
- 'description': str,
- 'post-review': bool
- }
+ # Attributes of a job that can also be used in Project and ProjectTemplate
+ job_attributes = {'parent': vs.Any(str, None),
+ 'final': bool,
+ 'failure-message': str,
+ 'success-message': str,
+ 'failure-url': str,
+ 'success-url': str,
+ 'hold-following-changes': bool,
+ 'voting': bool,
+ 'semaphore': str,
+ 'tags': to_list(str),
+ 'branches': to_list(str),
+ 'files': to_list(str),
+ 'secrets': to_list(vs.Any(secret, str)),
+ 'irrelevant-files': to_list(str),
+ # validation happens in NodeSetParser
+ 'nodeset': vs.Any(dict, str),
+ 'timeout': int,
+ 'attempts': int,
+ 'pre-run': to_list(str),
+ 'post-run': to_list(str),
+ 'run': str,
+ '_source_context': model.SourceContext,
+ '_start_mark': ZuulMark,
+ 'roles': to_list(role),
+ 'required-projects': to_list(vs.Any(job_project, str)),
+ 'vars': dict,
+ 'dependencies': to_list(str),
+ 'allowed-projects': to_list(str),
+ 'override-branch': str,
+ 'description': str,
+ 'post-review': bool}
+
+ job_name = {vs.Required('name'): str}
+
+ job = dict(collections.ChainMap(job_name, job_attributes))
schema = vs.Schema(job)
@@ -474,22 +478,27 @@
return None
@staticmethod
- def fromYaml(tenant, layout, conf, project_pipeline=False):
- with configuration_exceptions('job', conf):
- JobParser.schema(conf)
+ def fromYaml(tenant, layout, conf, project_pipeline=False,
+ name=None, validate=True):
+ if validate:
+ with configuration_exceptions('job', conf):
+ JobParser.schema(conf)
+
+ if name is None:
+ name = conf['name']
# NB: The default detection system in the Job class requires
# that we always assign values directly rather than modifying
# them (e.g., "job.run = ..." rather than
# "job.run.append(...)").
- reference = layout.jobs.get(conf['name'], [None])[0]
+ reference = layout.jobs.get(name, [None])[0]
- job = model.Job(conf['name'])
+ job = model.Job(name)
job.source_context = conf.get('_source_context')
job.source_line = conf.get('_start_mark').line + 1
- is_variant = layout.hasJob(conf['name'])
+ is_variant = layout.hasJob(name)
if 'parent' in conf:
if conf['parent'] is not None:
# Parent job is explicitly specified, so inherit from it.
@@ -725,9 +734,12 @@
'_start_mark': ZuulMark,
}
+ job = {str: vs.Any(str, JobParser.job_attributes)}
+ job_list = [vs.Any(str, job)]
+ pipeline_contents = {'queue': str, 'jobs': job_list}
+
for p in self.layout.pipelines.values():
- project_template[p.name] = {'queue': str,
- 'jobs': [vs.Any(str, dict)]}
+ project_template[p.name] = pipeline_contents
return vs.Schema(project_template)
def fromYaml(self, conf, validate=True):
@@ -752,16 +764,11 @@
def parseJobList(self, conf, source_context, start_mark, job_list):
for conf_job in conf:
if isinstance(conf_job, str):
- attrs = dict(name=conf_job)
+ jobname = conf_job
+ attrs = {}
elif isinstance(conf_job, dict):
# A dictionary in a job tree may override params
jobname, attrs = list(conf_job.items())[0]
- if attrs:
- # We are overriding params, so make a new job def
- attrs['name'] = jobname
- else:
- # Not overriding, so add a blank job
- attrs = dict(name=jobname)
else:
raise Exception("Job must be a string or dictionary")
attrs['_source_context'] = source_context
@@ -770,10 +777,11 @@
# validate that the job is existing
with configuration_exceptions('project or project-template',
attrs):
- self.layout.getJob(attrs['name'])
+ self.layout.getJob(jobname)
job_list.addJob(JobParser.fromYaml(self.tenant, self.layout,
- attrs, project_pipeline=True))
+ attrs, project_pipeline=True,
+ name=jobname, validate=False))
class ProjectParser(object):
@@ -796,9 +804,12 @@
'_start_mark': ZuulMark,
}
+ job = {str: vs.Any(str, JobParser.job_attributes)}
+ job_list = [vs.Any(str, job)]
+ pipeline_contents = {'queue': str, 'jobs': job_list}
+
for p in self.layout.pipelines.values():
- project[p.name] = {'queue': str,
- 'jobs': [vs.Any(str, dict)]}
+ project[p.name] = pipeline_contents
return vs.Schema(project)
def fromYaml(self, conf_list):
@@ -1621,9 +1632,22 @@
for branch in branches:
fns1 = []
fns2 = []
- files_list = files.connections.get(
+ files_entry = files.connections.get(
project.source.connection.connection_name, {}).get(
- project.name, {}).get(branch, {}).keys()
+ project.name, {}).get(branch)
+ # If there is no files entry at all for this
+ # project-branch, then use the cached config.
+ if files_entry is None:
+ if trusted:
+ incdata = project.unparsed_config
+ else:
+ incdata = project.unparsed_branch_config.get(branch)
+ if incdata:
+ config.extend(incdata)
+ continue
+ # Otherwise, do not use the cached config (even if the
+ # files are empty as that likely means they were deleted).
+ files_list = files_entry.keys()
for fn in files_list:
if fn.startswith("zuul.d/"):
fns1.append(fn)
@@ -1655,14 +1679,6 @@
config.extend(incdata)
- if not loaded:
- if trusted:
- incdata = project.unparsed_config
- else:
- incdata = project.unparsed_branch_config.get(branch)
- if incdata:
- config.extend(incdata)
-
def createDynamicLayout(self, tenant, files,
include_config_projects=False,
scheduler=None, connections=None):
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index cde773d..0f8d7d7 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -373,6 +373,9 @@
result = 'RETRY_LIMIT'
else:
build.retry = True
+ if result in ('DISCONNECT', 'ABORTED'):
+ # Always retry if the executor just went away
+ build.retry = True
result_data = data.get('data', {})
self.log.info("Build %s complete, result %s" %
(job, result))
@@ -404,7 +407,7 @@
def onDisconnect(self, job):
self.log.info("Gearman job %s lost due to disconnect" % job)
- self.onBuildCompleted(job)
+ self.onBuildCompleted(job, 'DISCONNECT')
def onUnknownJob(self, job):
self.log.info("Gearman job %s lost due to unknown handle" % job)
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 82921fb..d739c18 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -562,6 +562,10 @@
'disk_limit_per_job', 250))
self.merge_email = get_default(self.config, 'merger', 'git_user_email')
self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+ self.merge_speed_limit = get_default(
+ config, 'merger', 'git_http_low_speed_limit', '1000')
+ self.merge_speed_time = get_default(
+ config, 'merger', 'git_http_low_speed_time', '30')
execution_wrapper_name = get_default(self.config, 'executor',
'execution_wrapper', 'bubblewrap')
load_multiplier = float(get_default(self.config, 'executor',
@@ -610,7 +614,7 @@
self.job_workers = {}
self.disk_accountant = DiskAccountant(self.jobdir_root,
self.disk_limit_per_job,
- self.stopJobByJobdir,
+ self.stopJobDiskFull,
self.merge_root)
def _getMerger(self, root, logger=None):
@@ -618,9 +622,9 @@
cache_root = self.merge_root
else:
cache_root = None
- return zuul.merger.merger.Merger(root, self.connections,
- self.merge_email, self.merge_name,
- cache_root, logger)
+ return zuul.merger.merger.Merger(
+ root, self.connections, self.merge_email, self.merge_name,
+ self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
def start(self):
self._running = True
@@ -843,9 +847,9 @@
def finishJob(self, unique):
del(self.job_workers[unique])
- def stopJobByJobdir(self, jobdir):
+ def stopJobDiskFull(self, jobdir):
unique = os.path.basename(jobdir)
- self.stopJobByUnique(unique)
+ self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
def stopJob(self, job):
try:
@@ -856,13 +860,13 @@
finally:
job.sendWorkComplete()
- def stopJobByUnique(self, unique):
+ def stopJobByUnique(self, unique, reason=None):
job_worker = self.job_workers.get(unique)
if not job_worker:
self.log.debug("Unable to find worker for job %s" % (unique,))
return
try:
- job_worker.stop()
+ job_worker.stop(reason)
except Exception:
self.log.exception("Exception sending stop command "
"to worker:")
@@ -914,12 +918,14 @@
RESULT_TIMED_OUT = 2
RESULT_UNREACHABLE = 3
RESULT_ABORTED = 4
+ RESULT_DISK_FULL = 5
RESULT_MAP = {
RESULT_NORMAL: 'RESULT_NORMAL',
RESULT_TIMED_OUT: 'RESULT_TIMED_OUT',
RESULT_UNREACHABLE: 'RESULT_UNREACHABLE',
RESULT_ABORTED: 'RESULT_ABORTED',
+ RESULT_DISK_FULL: 'RESULT_DISK_FULL',
}
def __init__(self, executor_server, job):
@@ -932,6 +938,7 @@
self.proc_lock = threading.Lock()
self.running = False
self.aborted = False
+ self.aborted_reason = None
self.thread = None
self.private_key_file = get_default(self.executor_server.config,
'executor', 'private_key_file',
@@ -949,8 +956,9 @@
self.thread = threading.Thread(target=self.execute)
self.thread.start()
- def stop(self):
+ def stop(self, reason=None):
self.aborted = True
+ self.aborted_reason = reason
self.abortRunningProc()
if self.thread:
self.thread.join()
@@ -1098,6 +1106,8 @@
self.job.sendWorkStatus(0, 100)
result = self.runPlaybooks(args)
+ if self.aborted_reason == self.RESULT_DISK_FULL:
+ result = 'DISK_FULL'
data = self.getResultData()
result_data = json.dumps(dict(result=result,
data=data))
@@ -1215,12 +1225,41 @@
post_status, post_code = self.runAnsiblePlaybook(
playbook, args['timeout'], success, phase='post', index=index)
if post_status != self.RESULT_NORMAL or post_code != 0:
+ success = False
# If we encountered a pre-failure, that takes
# precedence over the post result.
if not pre_failed:
result = 'POST_FAILURE'
+ if (index + 1) == len(self.jobdir.post_playbooks):
+ self._logFinalPlaybookError()
+
return result
+ def _logFinalPlaybookError(self):
+ # Failures in the final post playbook can include failures
+ # uploading logs, which makes diagnosing issues difficult.
+ # Grab the output from the last playbook from the json
+ # file and log it.
+ json_output = self.jobdir.job_output_file.replace('txt', 'json')
+ self.log.debug("Final playbook failed")
+ if not os.path.exists(json_output):
+ self.log.debug("JSON logfile {logfile} is missing".format(
+ logfile=json_output))
+ return
+ try:
+ output = json.load(open(json_output, 'r'))
+ last_playbook = output[-1]
+ # Transform json to yaml - because it's easier to read and given
+ # the size of the data it'll be extra-hard to read this as an
+ # all on one line stringified nested dict.
+ yaml_out = yaml.safe_dump(last_playbook, default_flow_style=False)
+ for line in yaml_out.split('\n'):
+ self.log.debug(line)
+ except Exception:
+ self.log.exception(
+ "Could not decode json from {logfile}".format(
+ logfile=json_output))
+
def getHostList(self, args):
hosts = []
for node in args['nodes']:
@@ -1693,6 +1732,13 @@
elif ret == -9:
# Received abort request.
return (self.RESULT_ABORTED, None)
+ elif ret == 1:
+ if syntax_buffer[0].startswith('ERROR!'):
+ with open(self.jobdir.job_output_file, 'a') as job_output:
+ for line in syntax_buffer:
+ job_output.write("{now} | {line}\n".format(
+ now=datetime.datetime.now(),
+ line=line.decode('utf-8').rstrip()))
elif ret == 4:
# Ansible could not parse the yaml.
self.log.debug("Ansible parse error")
@@ -1759,15 +1805,15 @@
if result is not None:
result = self.RESULT_MAP[result]
msg = "{phase} {step} {result}: [{trusted} : {playbook}@{branch}]"
- msg.format(phase=phase, step=step, result=result,
- trusted=trusted, playbook=playbook, branch=branch)
+ msg = msg.format(phase=phase, step=step, result=result,
+ trusted=trusted, playbook=playbook, branch=branch)
else:
msg = "{phase} {step}: [{trusted} : {playbook}@{branch}]"
- msg.format(phase=phase, step=step, trusted=trusted,
- playbook=playbook, branch=branch)
+ msg = msg.format(phase=phase, step=step, trusted=trusted,
+ playbook=playbook, branch=branch)
with open(self.jobdir.job_output_file, 'a') as job_output:
- job_output.write("{now} | {msg}".format(
+ job_output.write("{now} | {msg}\n".format(
now=datetime.datetime.now(),
msg=msg))
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 0d53fc8..edea69c 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -11,6 +11,7 @@
# under the License.
import logging
+import textwrap
from zuul import exceptions
from zuul import model
@@ -229,20 +230,17 @@
(item.change, change_queue))
change_queue.enqueueItem(item)
- # Get an updated copy of the layout if necessary.
- # This will return one of the following:
- # 1) An existing layout from the item ahead or pipeline.
- # 2) A newly created layout from the cached pipeline
- # layout config plus the previously returned
- # in-repo files stored in the buildset.
- # 3) None in the case that a fetch of the files from
- # the merger is still pending.
- item.layout = self.getLayout(item)
-
- # Rebuild the frozen job tree from the new layout, if
- # we have one. If not, it will be built later.
- if item.layout:
- item.freezeJobGraph()
+ # Get an updated copy of the layout and update the job
+ # graph if necessary. This resumes the buildset merge
+ # state machine. If we have an up-to-date layout, it
+ # will go ahead and refresh the job graph if needed;
+ # or it will send a new merge job if necessary, or it
+ # will do nothing if we're waiting on a merge job.
+ item.job_graph = None
+ item.layout = None
+ if item.active:
+ if self.prepareItem(item):
+ self.prepareJobs(item)
# Re-set build results in case any new jobs have been
# added to the tree.
@@ -430,32 +428,60 @@
import zuul.configloader
loader = zuul.configloader.ConfigLoader()
+ self.log.debug("Loading dynamic layout")
+ (trusted_updates, untrusted_updates) = item.includesConfigUpdates()
build_set = item.current_build_set
+ trusted_layout_verified = False
try:
# First parse the config as it will land with the
# full set of config and project repos. This lets us
# catch syntax errors in config repos even though we won't
# actually run with that config.
- self.log.debug("Loading dynamic layout (phase 1)")
- loader.createDynamicLayout(
- item.pipeline.layout.tenant,
- build_set.files,
- include_config_projects=True,
- scheduler=self.sched,
- connections=self.sched.connections)
+ if trusted_updates:
+ self.log.debug("Loading dynamic layout (phase 1)")
+ loader.createDynamicLayout(
+ item.pipeline.layout.tenant,
+ build_set.files,
+ include_config_projects=True,
+ scheduler=self.sched,
+ connections=self.sched.connections)
+ trusted_layout_verified = True
# Then create the config a second time but without changes
# to config repos so that we actually use this config.
- self.log.debug("Loading dynamic layout (phase 2)")
- layout = loader.createDynamicLayout(
- item.pipeline.layout.tenant,
- build_set.files,
- include_config_projects=False)
+ if untrusted_updates:
+ self.log.debug("Loading dynamic layout (phase 2)")
+ layout = loader.createDynamicLayout(
+ item.pipeline.layout.tenant,
+ build_set.files,
+ include_config_projects=False)
+ else:
+ # We're a change to a config repo (with no untrusted
+ # items ahead), so just use the most recently
+ # generated layout.
+ if item.item_ahead:
+ return item.item_ahead.layout
+ else:
+ return item.queue.pipeline.layout
self.log.debug("Loading dynamic layout complete")
except zuul.configloader.ConfigurationSyntaxError as e:
- self.log.info("Configuration syntax error "
- "in dynamic layout")
- item.setConfigError(str(e))
+ self.log.info("Configuration syntax error in dynamic layout")
+ if trusted_layout_verified:
+ # The config is good if we include config-projects,
+ # but is currently invalid if we omit them. Instead
+ # of returning the whole error message, just leave a
+ # note that the config will work once the dependent
+ # changes land.
+ msg = "This change depends on a change "\
+ "to a config project.\n\n"
+ msg += textwrap.fill(textwrap.dedent("""\
+ The syntax of the configuration in this change has
+ been verified to be correct once the config project
+ change upon which it depends is merged, but it can not
+ be used until that occurs."""))
+ item.setConfigError(msg)
+ else:
+ item.setConfigError(str(e))
return None
except Exception:
self.log.exception("Error in dynamic layout")
@@ -811,19 +837,28 @@
dt = None
items = len(self.pipeline.getAllItems())
- # stats.timers.zuul.pipeline.NAME.resident_time
- # stats_counts.zuul.pipeline.NAME.total_changes
- # stats.gauges.zuul.pipeline.NAME.current_changes
- key = 'zuul.pipeline.%s' % self.pipeline.name
+ tenant = self.pipeline.layout.tenant
+ basekey = 'zuul.tenant.%s' % tenant.name
+ key = '%s.pipeline.%s' % (basekey, self.pipeline.name)
+ # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.resident_time
+ # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.total_changes
+ # stats.gauges.zuul.tenant.<tenant>.pipeline.<pipeline>.current_changes
self.sched.statsd.gauge(key + '.current_changes', items)
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
- # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
- project_name = item.change.project.name.replace('/', '.')
- key += '.%s' % project_name
+ hostname = (item.change.project.canonical_hostname.
+ replace('.', '_'))
+ projectname = (item.change.project.name.
+ replace('.', '_').replace('/', '.'))
+ projectname = projectname.replace('.', '_').replace('/', '.')
+ branchname = item.change.branch.replace('.', '_').replace('/', '.')
+ # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.
+ # project.<host>.<project>.<branch>.resident_time
+ # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.
+ # project.<host>.<project>.<branch>.total_changes
+ key += '.project.%s.%s.%s' % (hostname, projectname, branchname)
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 7b732c7..035d1d0 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -13,10 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
+from contextlib import contextmanager
+import logging
+import os
+import shutil
+
import git
import gitdb
-import os
-import logging
import zuul.model
@@ -38,23 +41,34 @@
raise
+@contextmanager
+def timeout_handler(path):
+ try:
+ yield
+ except git.exc.GitCommandError as e:
+ if e.status == -9:
+ # Timeout. The repo could be in a bad state, so delete it.
+ shutil.rmtree(path)
+ raise
+
+
class ZuulReference(git.Reference):
_common_path_default = "refs/zuul"
_points_to_commits_only = True
class Repo(object):
- def __init__(self, remote, local, email, username, sshkey=None,
- cache_path=None, logger=None):
+ def __init__(self, remote, local, email, username, speed_limit, speed_time,
+ sshkey=None, cache_path=None, logger=None, git_timeout=300):
if logger is None:
self.log = logging.getLogger("zuul.Repo")
else:
self.log = logger
- # TODO(pabelanger): Expose to user via zuul.conf.
self.env = {
- 'GIT_HTTP_LOW_SPEED_LIMIT': '1000',
- 'GIT_HTTP_LOW_SPEED_TIME': '30',
+ 'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
+ 'GIT_HTTP_LOW_SPEED_TIME': speed_time,
}
+ self.git_timeout = git_timeout
if sshkey:
self.env['GIT_SSH_COMMAND'] = 'ssh -i %s' % (sshkey,)
@@ -66,7 +80,7 @@
self._initialized = False
try:
self._ensure_cloned()
- except:
+ except Exception:
self.log.exception("Unable to initialize repo for %s" % remote)
def _ensure_cloned(self):
@@ -79,12 +93,10 @@
self.log.debug("Cloning from %s to %s" % (self.remote_url,
self.local_path))
if self.cache_path:
- git.Repo.clone_from(self.cache_path, self.local_path,
- env=self.env)
+ self._git_clone(self.cache_path)
rewrite_url = True
else:
- git.Repo.clone_from(self.remote_url, self.local_path,
- env=self.env)
+ self._git_clone(self.remote_url)
repo = git.Repo(self.local_path)
repo.git.update_environment(**self.env)
# Create local branches corresponding to all the remote branches
@@ -108,6 +120,18 @@
def isInitialized(self):
return self._initialized
+ def _git_clone(self, url):
+ mygit = git.cmd.Git(os.getcwd())
+ mygit.update_environment(**self.env)
+ with timeout_handler(self.local_path):
+ mygit.clone(git.cmd.Git.polish_url(url), self.local_path,
+ kill_after_timeout=self.git_timeout)
+
+ def _git_fetch(self, repo, remote, ref=None, **kwargs):
+ with timeout_handler(self.local_path):
+ repo.git.fetch(remote, ref, kill_after_timeout=self.git_timeout,
+ **kwargs)
+
def createRepoObject(self):
self._ensure_cloned()
repo = git.Repo(self.local_path)
@@ -229,19 +253,18 @@
def fetch(self, ref):
repo = self.createRepoObject()
- # The git.remote.fetch method may read in git progress info and
- # interpret it improperly causing an AssertionError. Because the
- # data was fetched properly subsequent fetches don't seem to fail.
- # So try again if an AssertionError is caught.
- origin = repo.remotes.origin
- try:
- origin.fetch(ref)
- except AssertionError:
- origin.fetch(ref)
+ # NOTE: The following is currently not applicable, but if we
+ # switch back to fetch methods from GitPython, we need to
+ # consider it:
+ # The git.remote.fetch method may read in git progress info and
+ # interpret it improperly causing an AssertionError. Because the
+ # data was fetched properly subsequent fetches don't seem to fail.
+ # So try again if an AssertionError is caught.
+ self._git_fetch(repo, 'origin', ref)
def fetchFrom(self, repository, ref):
repo = self.createRepoObject()
- repo.git.fetch(repository, ref)
+ self._git_fetch(repo, repository, ref)
def createZuulRef(self, ref, commit='HEAD'):
repo = self.createRepoObject()
@@ -258,15 +281,14 @@
def update(self):
repo = self.createRepoObject()
self.log.debug("Updating repository %s" % self.local_path)
- origin = repo.remotes.origin
if repo.git.version_info[:2] < (1, 9):
# Before 1.9, 'git fetch --tags' did not include the
# behavior covered by 'git --fetch', so we run both
# commands in that case. Starting with 1.9, 'git fetch
# --tags' is all that is necessary. See
# https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20
- origin.fetch()
- origin.fetch(tags=True)
+ self._git_fetch(repo, 'origin')
+ self._git_fetch(repo, 'origin', tags=True)
def getFiles(self, files, dirs=[], branch=None, commit=None):
ret = {}
@@ -297,7 +319,7 @@
class Merger(object):
def __init__(self, working_root, connections, email, username,
- cache_root=None, logger=None):
+ speed_limit, speed_time, cache_root=None, logger=None):
self.logger = logger
if logger is None:
self.log = logging.getLogger("zuul.Merger")
@@ -310,6 +332,8 @@
self.connections = connections
self.email = email
self.username = username
+ self.speed_limit = speed_limit
+ self.speed_time = speed_time
self.cache_root = cache_root
def _addProject(self, hostname, project_name, url, sshkey):
@@ -322,8 +346,9 @@
project_name)
else:
cache_path = None
- repo = Repo(url, path, self.email, self.username,
- sshkey, cache_path, self.logger)
+ repo = Repo(
+ url, path, self.email, self.username, self.speed_limit,
+ self.speed_time, sshkey, cache_path, self.logger)
self.repos[key] = repo
except Exception:
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 881209d..765d9e0 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -33,9 +33,13 @@
'/var/lib/zuul/merger-git')
merge_email = get_default(self.config, 'merger', 'git_user_email')
merge_name = get_default(self.config, 'merger', 'git_user_name')
-
- self.merger = merger.Merger(merge_root, connections, merge_email,
- merge_name)
+ speed_limit = get_default(
+ config, 'merger', 'git_http_low_speed_limit', '1000')
+ speed_time = get_default(
+ config, 'merger', 'git_http_low_speed_time', '30')
+ self.merger = merger.Merger(
+ merge_root, connections, merge_email, merge_name, speed_limit,
+ speed_time)
def start(self):
self._running = True
diff --git a/zuul/model.py b/zuul/model.py
index c95a169..6eebbfb 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1157,7 +1157,6 @@
self.start_time = None
self.end_time = None
self.estimated_time = None
- self.pipeline = None
self.canceled = False
self.retry = False
self.parameters = {}
@@ -1169,6 +1168,10 @@
return ('<Build %s of %s on %s>' %
(self.uuid, self.job.name, self.worker))
+ @property
+ def pipeline(self):
+ return self.build_set.item.pipeline
+
def getSafeAttributes(self):
return Attributes(uuid=self.uuid,
result=self.result,
@@ -1432,7 +1435,6 @@
def addBuild(self, build):
self.current_build_set.addBuild(build)
- build.pipeline = self.pipeline
def removeBuild(self, build):
self.current_build_set.removeBuild(build)
@@ -1518,6 +1520,25 @@
def wasDequeuedNeedingChange(self):
return self.dequeued_needing_change
+ def includesConfigUpdates(self):
+ includes_trusted = False
+ includes_untrusted = False
+ tenant = self.pipeline.layout.tenant
+ item = self
+ while item:
+ if item.change.updatesConfig():
+ (trusted, project) = tenant.getProject(
+ item.change.project.canonical_name)
+ if trusted:
+ includes_trusted = True
+ else:
+ includes_untrusted = True
+ if includes_trusted and includes_untrusted:
+ # We're done early
+ return (includes_trusted, includes_untrusted)
+ item = item.item_ahead
+ return (includes_trusted, includes_untrusted)
+
def isHoldingFollowingChanges(self):
if not self.live:
return False
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9a125ce..f4c850d 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -111,16 +111,19 @@
except Exception:
self.log.exception("Error unlocking node:")
- def lockNodeSet(self, nodeset):
- self._lockNodes(nodeset.getNodes())
+ def lockNodeSet(self, nodeset, request_id):
+ self._lockNodes(nodeset.getNodes(), request_id)
- def _lockNodes(self, nodes):
+ def _lockNodes(self, nodes, request_id):
# Try to lock all of the supplied nodes. If any lock fails,
# try to unlock any which have already been locked before
# re-raising the error.
locked_nodes = []
try:
for node in nodes:
+ if node.allocated_to != request_id:
+ raise Exception("Node %s allocated to %s, not %s" %
+ (node.id, node.allocated_to, request_id))
self.log.debug("Locking node %s" % (node,))
self.sched.zk.lockNode(node, timeout=30)
locked_nodes.append(node)
@@ -141,7 +144,12 @@
del self.requests[request.uid]
return False
- if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+ # TODOv3(jeblair): handle allocation failure
+ if deleted:
+ self.log.debug("Resubmitting lost node request %s" % (request,))
+ request.id = None
+ self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+ elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
self.log.info("Node request %s %s" % (request, request.state))
# Give our results to the scheduler.
@@ -150,18 +158,29 @@
# Stop watching this request node.
return False
- # TODOv3(jeblair): handle allocation failure
- elif deleted:
- self.log.debug("Resubmitting lost node request %s" % (request,))
- self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+
return True
- def acceptNodes(self, request):
+ def acceptNodes(self, request, request_id):
# Called by the scheduler when it wants to accept and lock
# nodes for (potential) use.
self.log.info("Accepting node request %s" % (request,))
+ if request_id != request.id:
+ self.log.info("Skipping node accept for %s (resubmitted as %s)",
+ request_id, request.id)
+ return
+
+ # Make sure the request still exists. It's possible it could have
+ # disappeared if we lost the ZK session between when the fulfillment
+ # response was added to our queue, and when we actually get around to
+ # processing it. Nodepool will automatically reallocate the assigned
+ # nodes in that situation.
+ if not self.sched.zk.nodeRequestExists(request):
+ self.log.info("Request %s no longer exists", request.id)
+ return
+
if request.canceled:
self.log.info("Ignoring canceled node request %s" % (request,))
# The request was already deleted when it was canceled
@@ -171,7 +190,7 @@
if request.fulfilled:
# If the request suceeded, try to lock the nodes.
try:
- self.lockNodeSet(request.nodeset)
+ self.lockNodeSet(request.nodeset, request.id)
locked = True
except Exception:
self.log.exception("Error locking nodes:")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a926f6e..cfcd865 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -164,6 +164,7 @@
def __init__(self, request):
self.request = request
+ self.request_id = request.id
def toList(item):
@@ -281,31 +282,34 @@
build.result = result
try:
if self.statsd and build.pipeline:
- jobname = build.job.name.replace('.', '_')
- key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
+ tenant = build.pipeline.layout.tenant
+ jobname = build.job.name.replace('.', '_').replace('/', '_')
+ hostname = (build.build_set.item.change.project.
+ canonical_hostname.replace('.', '_'))
+ projectname = (build.build_set.item.change.project.name.
+ replace('.', '_').replace('/', '_'))
+ branchname = (build.build_set.item.change.branch.
+ replace('.', '_').replace('/', '_'))
+ basekey = 'zuul.tenant.%s' % tenant.name
+ pipekey = '%s.pipeline.%s' % (basekey, build.pipeline.name)
+ # zuul.tenant.<tenant>.pipeline.<pipeline>.all_jobs
+ key = '%s.all_jobs' % pipekey
self.statsd.incr(key)
- for label in build.node_labels:
- # Jenkins includes the node name in its list of labels, so
- # we filter it out here, since that is not statistically
- # interesting.
- if label == build.node_name:
- continue
- dt = int((build.start_time - build.execute_time) * 1000)
- key = 'zuul.pipeline.%s.label.%s.wait_time' % (
- build.pipeline.name, label)
- self.statsd.timing(key, dt)
- key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
- jobname, build.result)
+ jobkey = '%s.project.%s.%s.%s.job.%s' % (
+ pipekey, hostname, projectname, branchname, jobname)
+ # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+ # <host>.<project>.<branch>.job.<job>.<result>
+ key = '%s.%s' % (jobkey, build.result)
if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
dt = int((build.end_time - build.start_time) * 1000)
self.statsd.timing(key, dt)
self.statsd.incr(key)
-
- key = 'zuul.pipeline.%s.job.%s.wait_time' % (
- build.pipeline.name, jobname)
+ # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+ # <host>.<project>.<branch>.job.<job>.wait_time
+ key = '%s.wait_time' % jobkey
dt = int((build.start_time - build.execute_time) * 1000)
self.statsd.timing(key, dt)
- except:
+ except Exception:
self.log.exception("Exception reporting runtime stats")
event = BuildCompletedEvent(build)
self.result_event_queue.put(event)
@@ -889,9 +893,10 @@
def _doNodesProvisionedEvent(self, event):
request = event.request
+ request_id = event.request_id
build_set = request.build_set
- self.nodepool.acceptNodes(request)
+ self.nodepool.acceptNodes(request, request_id)
if request.canceled:
return
diff --git a/zuul/zk.py b/zuul/zk.py
index a3efef2..2fca749 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -160,7 +160,6 @@
def callback(data, stat):
if data:
data = self._strToDict(data)
- node_request.updateFromDict(data)
request_nodes = list(node_request.nodeset.getNodes())
for i, nodeid in enumerate(data.get('nodes', [])):
node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
@@ -168,6 +167,7 @@
node_data = self._strToDict(node_data)
request_nodes[i].id = nodeid
request_nodes[i].updateFromDict(node_data)
+ node_request.updateFromDict(data)
deleted = (data is None) # data *are* none
return watcher(node_request, deleted)
@@ -187,6 +187,19 @@
except kze.NoNodeError:
pass
+ def nodeRequestExists(self, node_request):
+ '''
+ See if a NodeRequest exists in ZooKeeper.
+
+ :param NodeRequest node_request: A NodeRequest to verify.
+
+ :returns: True if the request exists, False otherwise.
+ '''
+ path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+ if self.client.exists(path):
+ return True
+ return False
+
def storeNode(self, node):
'''Store the node.