Merge "Revert "Use new infra pipelines"" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 464cb60..99817f7 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -101,6 +101,27 @@
An openssl file containing the client private key in PEM format.
+.. attr:: statsd
+
+ Information about the optional statsd server. If the ``statsd``
+ python module is installed and this section is configured,
+ statistics will be reported to statsd. See :ref:`statsd` for more
+ information.
+
+ .. attr:: server
+
+ Hostname or IP address of the statsd server.
+
+ .. attr:: port
+ :default: 8125
+
+ The UDP port on which the statsd server is listening.
+
+ .. attr:: prefix
+
+ If present, this will be prefixed to all of the keys before
+ they are transmitted to the statsd server.
+
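+ As an illustrative example (the server address and prefix here
+ are placeholders, not defaults), a deployment might configure:
+
+ .. code-block:: ini
+
+    [statsd]
+    server=statsd.example.com
+    port=8125
+    prefix=prod
+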
.. NOTE: this is a white lie at this point, since only the scheduler
uses this, however, we expect other components to use it later, so
it's reasonable for admins to plan for this now.
diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 4fed1f9..55f1908 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -3,6 +3,8 @@
Monitoring
==========
+.. _statsd:
+
Statsd reporting
----------------
@@ -13,37 +15,24 @@
Configuration
~~~~~~~~~~~~~
-Statsd support uses the statsd python module. Note that Zuul will start without
-the statsd python module, so an existing Zuul installation may be missing it.
+Statsd support uses the ``statsd`` python module. Note that support
+is optional: Zuul will start even if the statsd python module is not
+present.
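+
+If needed, the module can be installed with, for example::
+
+    pip install statsd
+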
-The configuration is done via environment variables STATSD_HOST and
-STATSD_PORT. They are interpreted by the statsd module directly and there is no
-such parameter in zuul.conf yet. Your init script will have to initialize both
-of them before executing Zuul.
-
-Your init script most probably loads a configuration file named
-``/etc/default/zuul`` which would contain the environment variables::
-
- $ cat /etc/default/zuul
- STATSD_HOST=10.0.0.1
- STATSD_PORT=8125
+Configuration is in the :attr:`statsd` section of ``zuul.conf``.
Metrics
~~~~~~~
These metrics are emitted by the Zuul :ref:`scheduler`:
-.. stat:: gerrit.event.<type>
+.. stat:: zuul.event.<driver>.event.<type>
:type: counter
- Gerrit emits different kinds of messages over its `stream-events`
- interface. Zuul will report counters for each type of event it
- receives from Gerrit.
+ Zuul will report counters for each type of event it receives from
+ each of its configured drivers.
- Refer to your Gerrit installation documentation for a complete
- list of Gerrit event types.
-
-.. stat:: zuul.pipeline
+.. stat:: zuul.tenant.<tenant>.pipeline
Holds metrics specific to jobs. This hierarchy includes:
@@ -63,22 +52,60 @@
The number of items currently being processed by this
pipeline.
- .. stat:: job
+ .. stat:: project
- Subtree detailing per jobs statistics:
+ This hierarchy holds more specific metrics for each project
+ participating in the pipeline.
- .. stat:: <jobname>
+ .. stat:: <canonical_hostname>
- The triggered job name.
+ The canonical hostname for the triggering project.
+ Embedded ``.`` characters will be translated to ``_``.
- .. stat:: <result>
- :type: counter, timer
+ .. stat:: <project>
- A counter for each type of result (e.g., ``SUCCESS`` or
- ``FAILURE``, ``ERROR``, etc.) for the job. If the
- result is ``SUCCESS`` or ``FAILURE``, Zuul will
- additionally report the duration of the build as a
- timer.
+ The name of the triggering project. Embedded ``/`` or
+ ``.`` characters will be translated to ``_``.
+
+ .. stat:: <branch>
+
+ The name of the triggering branch. Embedded ``/`` or
+ ``.`` characters will be translated to ``_``.
+
+ .. stat:: job
+
+ Subtree detailing per-project job statistics:
+
+ .. stat:: <jobname>
+
+ The triggered job name.
+
+ .. stat:: <result>
+ :type: counter, timer
+
+ A counter for each type of result (e.g., ``SUCCESS``,
+ ``FAILURE``, ``ERROR``, etc.) for the job. If the
+ result is ``SUCCESS`` or ``FAILURE``, Zuul will
+ additionally report the duration of the build as a
+ timer.
+
+ .. stat:: current_changes
+ :type: gauge
+
+ The number of items of this project currently being
+ processed by this pipeline.
+
+ .. stat:: resident_time
+ :type: timer
+
+ A timer metric reporting how long each item for this
+ project has been in the pipeline.
+
+ .. stat:: total_changes
+ :type: counter
+
+ The number of changes for this project processed by the
+ pipeline since Zuul started.
.. stat:: resident_time
:type: timer
@@ -98,34 +125,111 @@
How long each item spent in the pipeline before its first job
started.
- .. stat:: <project>
+.. stat:: zuul.executor.<executor>
- This hierarchy holds more specific metrics for each project
- participating in the pipeline. If the project name contains
- a ``/`` character, it will be replaced with a ``.``.
+ Holds metrics emitted by individual executors. The ``<executor>``
+ component of the key will be replaced with the hostname of the
+ executor.
- .. stat:: current_changes
- :type: gauge
+ .. stat:: builds
+ :type: counter
- The number of items of this project currently being
- processed by this pipeline.
+ Incremented each time the executor starts a build.
- .. stat:: resident_time
- :type: timer
+ .. stat:: running_builds
+ :type: gauge
- A timer metric reporting how long each item for this
- project has been in the pipeline.
+ The number of builds currently running on this executor.
- .. stat:: total_changes
- :type: counter
+ .. stat:: load_average
+ :type: gauge
- The number of changes for this project processed by the
- pipeline since Zuul started.
+ The one-minute load average of this executor, multiplied by 100.
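+
+ For example, an executor with the hostname ``ze01`` (an
+ illustrative value) reports these metrics under
+ ``zuul.executor.ze01``.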
-As an example, given a job named `myjob` triggered by the `gate` pipeline
-which took 40 seconds to build, the Zuul scheduler will emit the following
-statsd events:
+.. stat:: zuul.nodepool
- * ``zuul.pipeline.gate.job.myjob.SUCCESS`` +1
- * ``zuul.pipeline.gate.job.myjob`` 40 seconds
- * ``zuul.pipeline.gate.all_jobs`` +1
+ Holds metrics related to the node requests Zuul submits to
+ Nodepool.
+
+ .. stat:: requested
+ :type: counter
+
+ Incremented each time a node request is submitted to Nodepool.
+
+ .. stat:: label.<label>
+ :type: counter
+
+ Incremented each time a request for a specific label is
+ submitted to Nodepool.
+
+ .. stat:: size.<size>
+ :type: counter
+
+ Incremented each time a request of a specific size is submitted
+ to Nodepool. For example, a request for 3 nodes would use the
+ key ``zuul.nodepool.requested.size.3``.
+
+ .. stat:: canceled
+ :type: counter, timer
+
+ The counter is incremented each time a node request is canceled
+ by Zuul. The timer records the elapsed time from request to
+ cancellation.
+
+ .. stat:: label.<label>
+ :type: counter, timer
+
+ The same, for a specific label.
+
+ .. stat:: size.<size>
+ :type: counter, timer
+
+ The same, for a specific request size.
+
+ .. stat:: fulfilled
+ :type: counter, timer
+
+ The counter is incremented each time a node request is fulfilled
+ by Nodepool. The timer records the elapsed time from request to
+ fulfillment.
+
+ .. stat:: label.<label>
+ :type: counter, timer
+
+ The same, for a specific label.
+
+ .. stat:: size.<size>
+ :type: counter, timer
+
+ The same, for a specific request size.
+
+ .. stat:: failed
+ :type: counter, timer
+
+ The counter is incremented each time Nodepool fails to fulfill a
+ node request. The timer records the elapsed time from request
+ to failure.
+
+ .. stat:: label.<label>
+ :type: counter, timer
+
+ The same, for a specific label.
+
+ .. stat:: size.<size>
+ :type: counter, timer
+
+ The same, for a specific request size.
+
+ .. stat:: current_requests
+ :type: gauge
+
+ The number of outstanding nodepool requests from Zuul.
+
+
+As an example, given a job named `myjob` in `mytenant` triggered by a
+change to `myproject` on the `master` branch in the `gate` pipeline
+which took 40 seconds to build, the Zuul scheduler will emit the
+following statsd events:
+
+ * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS`` +1
+ * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS`` 40 seconds
+ * ``zuul.tenant.mytenant.pipeline.gate.all_jobs`` +1
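+
+Similarly, the node request for that build (assuming it requested a
+single node with label `label1`) would produce:
+
+ * ``zuul.nodepool.requested`` +1
+ * ``zuul.nodepool.requested.label.label1`` +1
+ * ``zuul.nodepool.requested.size.1`` +1
+ * ``zuul.nodepool.fulfilled.label.label1`` +1
+ * ``zuul.nodepool.fulfilled.size.1`` +1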
diff --git a/doc/source/user/config.rst b/doc/source/user/config.rst
index f55fb4f..c4014e2 100644
--- a/doc/source/user/config.rst
+++ b/doc/source/user/config.rst
@@ -952,12 +952,12 @@
A boolean value which indicates whether this job may only be
used in pipelines where :attr:`pipeline.post-review` is
- ``true``. This is automatically set to ``true`` if this job is
- defined in a :term:`untrusted-project`. It may be explicitly
- set to obtain the same behavior for jobs defined in
- :term:`config projects <config-project>`. Once this is set to
- ``true`` anywhere in the inheritance hierarchy for a job, it
- will remain set for all child jobs and variants (it can not be
+ ``true``. This is automatically set to ``true`` if this job
+ uses a :ref:`secret` and is defined in an :term:`untrusted-project`.
+ It may be explicitly set to obtain the same behavior for jobs
+ defined in :term:`config projects <config-project>`. Once this
+ is set to ``true`` anywhere in the inheritance hierarchy for a job,
+ it will remain set for all child jobs and variants (it cannot be
set to ``false``).
.. _project:
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index ba7aace..76494ad 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -5,6 +5,9 @@
;ssl_cert=/path/to/client.pem
;ssl_key=/path/to/client.key
+[statsd]
+server=127.0.0.1
+
[zookeeper]
hosts=127.0.0.1:2181
diff --git a/requirements.txt b/requirements.txt
index cdffda2..4b8be3c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,12 +2,16 @@
# pull from master until https://github.com/sigmavirus24/github3.py/pull/671
# is in a release
--e git+https://github.com/sigmavirus24/github3.py.git@develop#egg=Github3.py
+git+https://github.com/sigmavirus24/github3.py.git@develop#egg=Github3.py
PyYAML>=3.1.0
Paste
WebOb>=1.2.3
paramiko>=1.8.0,<2.0.0
-GitPython>=0.3.3,<2.1.2
+# Using a local fork of gitpython until at least these changes are in a
+# release.
+# https://github.com/gitpython-developers/GitPython/pull/682
+# https://github.com/gitpython-developers/GitPython/pull/686
+git+https://github.com/jeblair/GitPython.git@zuul#egg=GitPython
python-daemon>=2.0.4,<2.1.0
extras
statsd>=1.0.0,<3.0
diff --git a/tests/base.py b/tests/base.py
index dacb1ef..035ff0c 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -20,7 +20,6 @@
import datetime
import gc
import hashlib
-import importlib
from io import StringIO
import json
import logging
@@ -48,7 +47,6 @@
import kazoo.client
import kazoo.exceptions
import pymysql
-import statsd
import testtools
import testtools.content
import testtools.content_type
@@ -1363,90 +1361,6 @@
return repos
-class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
- """An Ansible executor to be used in tests.
-
- :ivar bool hold_jobs_in_build: If true, when jobs are executed
- they will report that they have started but then pause until
- released before reporting completion. This attribute may be
- changed at any time and will take effect for subsequently
- executed builds, but previously held builds will still need to
- be explicitly released.
-
- """
- def __init__(self, *args, **kw):
- self._run_ansible = kw.pop('_run_ansible', False)
- self._test_root = kw.pop('_test_root', False)
- super(RecordingExecutorServer, self).__init__(*args, **kw)
- self.hold_jobs_in_build = False
- self.lock = threading.Lock()
- self.running_builds = []
- self.build_history = []
- self.fail_tests = {}
- self.job_builds = {}
-
- def failJob(self, name, change):
- """Instruct the executor to report matching builds as failures.
-
- :arg str name: The name of the job to fail.
- :arg Change change: The :py:class:`~tests.base.FakeChange`
- instance which should cause the job to fail. This job
- will also fail for changes depending on this change.
-
- """
- l = self.fail_tests.get(name, [])
- l.append(change)
- self.fail_tests[name] = l
-
- def release(self, regex=None):
- """Release a held build.
-
- :arg str regex: A regular expression which, if supplied, will
- cause only builds with matching names to be released. If
- not supplied, all builds will be released.
-
- """
- builds = self.running_builds[:]
- self.log.debug("Releasing build %s (%s)" % (regex,
- len(self.running_builds)))
- for build in builds:
- if not regex or re.match(regex, build.name):
- self.log.debug("Releasing build %s" %
- (build.parameters['zuul']['build']))
- build.release()
- else:
- self.log.debug("Not releasing build %s" %
- (build.parameters['zuul']['build']))
- self.log.debug("Done releasing builds %s (%s)" %
- (regex, len(self.running_builds)))
-
- def executeJob(self, job):
- build = FakeBuild(self, job)
- job.build = build
- self.running_builds.append(build)
- self.job_builds[job.unique] = build
- args = json.loads(job.arguments)
- args['zuul']['_test'] = dict(test_root=self._test_root)
- job.arguments = json.dumps(args)
- self.job_workers[job.unique] = RecordingAnsibleJob(self, job)
- self.job_workers[job.unique].run()
-
- def stopJob(self, job):
- self.log.debug("handle stop")
- parameters = json.loads(job.arguments)
- uuid = parameters['uuid']
- for build in self.running_builds:
- if build.unique == uuid:
- build.aborted = True
- build.release()
- super(RecordingExecutorServer, self).stopJob(job)
-
- def stop(self):
- for build in self.running_builds:
- build.release()
- super(RecordingExecutorServer, self).stop()
-
-
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
def doMergeChanges(self, merger, items, repo_state):
# Get a merger in order to update the repos involved in this job.
@@ -1504,6 +1418,92 @@
return hosts
+class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
+ """An Ansible executor to be used in tests.
+
+ :ivar bool hold_jobs_in_build: If true, when jobs are executed
+ they will report that they have started but then pause until
+ released before reporting completion. This attribute may be
+ changed at any time and will take effect for subsequently
+ executed builds, but previously held builds will still need to
+ be explicitly released.
+
+ """
+
+ _job_class = RecordingAnsibleJob
+
+ def __init__(self, *args, **kw):
+ self._run_ansible = kw.pop('_run_ansible', False)
+ self._test_root = kw.pop('_test_root', False)
+ super(RecordingExecutorServer, self).__init__(*args, **kw)
+ self.hold_jobs_in_build = False
+ self.lock = threading.Lock()
+ self.running_builds = []
+ self.build_history = []
+ self.fail_tests = {}
+ self.job_builds = {}
+
+ def failJob(self, name, change):
+ """Instruct the executor to report matching builds as failures.
+
+ :arg str name: The name of the job to fail.
+ :arg Change change: The :py:class:`~tests.base.FakeChange`
+ instance which should cause the job to fail. This job
+ will also fail for changes depending on this change.
+
+ """
+ l = self.fail_tests.get(name, [])
+ l.append(change)
+ self.fail_tests[name] = l
+
+ def release(self, regex=None):
+ """Release a held build.
+
+ :arg str regex: A regular expression which, if supplied, will
+ cause only builds with matching names to be released. If
+ not supplied, all builds will be released.
+
+ """
+ builds = self.running_builds[:]
+ self.log.debug("Releasing build %s (%s)" % (regex,
+ len(self.running_builds)))
+ for build in builds:
+ if not regex or re.match(regex, build.name):
+ self.log.debug("Releasing build %s" %
+ (build.parameters['zuul']['build']))
+ build.release()
+ else:
+ self.log.debug("Not releasing build %s" %
+ (build.parameters['zuul']['build']))
+ self.log.debug("Done releasing builds %s (%s)" %
+ (regex, len(self.running_builds)))
+
+ def executeJob(self, job):
+ build = FakeBuild(self, job)
+ job.build = build
+ self.running_builds.append(build)
+ self.job_builds[job.unique] = build
+ args = json.loads(job.arguments)
+ args['zuul']['_test'] = dict(test_root=self._test_root)
+ job.arguments = json.dumps(args)
+ super(RecordingExecutorServer, self).executeJob(job)
+
+ def stopJob(self, job):
+ self.log.debug("handle stop")
+ parameters = json.loads(job.arguments)
+ uuid = parameters['uuid']
+ for build in self.running_builds:
+ if build.unique == uuid:
+ build.aborted = True
+ build.release()
+ super(RecordingExecutorServer, self).stopJob(job)
+
+ def stop(self):
+ for build in self.running_builds:
+ build.release()
+ super(RecordingExecutorServer, self).stop()
+
+
class FakeGearmanServer(gear.Server):
"""A Gearman server for use in tests.
@@ -2047,14 +2047,9 @@
self.config.set('executor', 'state_dir', self.executor_state_root)
self.statsd = FakeStatsd()
- # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
- # see: https://github.com/jsocol/pystatsd/issues/61
- os.environ['STATSD_HOST'] = '127.0.0.1'
- os.environ['STATSD_PORT'] = str(self.statsd.port)
+ if self.config.has_section('statsd'):
+ self.config.set('statsd', 'port', str(self.statsd.port))
self.statsd.start()
- # the statsd client object is configured in the statsd module import
- importlib.reload(statsd)
- importlib.reload(zuul.scheduler)
self.gearman_server = FakeGearmanServer(self.use_ssl)
diff --git a/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
new file mode 100644
index 0000000..bd46718
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
@@ -0,0 +1,4 @@
+- hosts: all
+ tasks:
+ - shell: echo "Failure test {{ zuul.executor.src_root }}"
+ - shell: exit 1
diff --git a/tests/fixtures/config/job-output/git/common-config/zuul.yaml b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
index a83f0bc..f182d8d 100644
--- a/tests/fixtures/config/job-output/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
@@ -20,8 +20,19 @@
parent: base
name: job-output
+- job:
+ name: job-output-failure
+ run: playbooks/job-output
+ post-run: playbooks/job-output-failure-post
+
- project:
name: org/project
check:
jobs:
- job-output
+
+- project:
+ name: org/project2
+ check:
+ jobs:
+ - job-output-failure
diff --git a/tests/fixtures/config/job-output/git/org_project2/README b/tests/fixtures/config/job-output/git/org_project2/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/org_project2/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/job-output/main.yaml b/tests/fixtures/config/job-output/main.yaml
index 208e274..14b382f 100644
--- a/tests/fixtures/config/job-output/main.yaml
+++ b/tests/fixtures/config/job-output/main.yaml
@@ -6,3 +6,4 @@
- common-config
untrusted-projects:
- org/project
+ - org/project2
diff --git a/tests/fixtures/fake_git.sh b/tests/fixtures/fake_git.sh
new file mode 100755
index 0000000..5b787b7
--- /dev/null
+++ b/tests/fixtures/fake_git.sh
@@ -0,0 +1,14 @@
+#!/bin/sh
+# Fake git executable used by the merger timeout tests: it answers
+# the version query, creates a fake clone directory, and then hangs
+# so that short git timeouts are exceeded.
+
+echo $*
+case "$1" in
+    clone)
+        dest=$3
+        mkdir -p $dest/.git
+        ;;
+    version)
+        echo "git version 1.0.0"
+        exit 0
+        ;;
+esac
+# Hang long enough to trigger the caller's timeout.
+sleep 30
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index 7bc8c59..e6f997c 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -1,6 +1,11 @@
[gearman]
server=127.0.0.1
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
[scheduler]
tenant_config=main.yaml
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index 67be273..ec30a2b 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -19,9 +19,10 @@
import os
import git
+import testtools
from zuul.merger.merger import Repo
-from tests.base import ZuulTestCase
+from tests.base import ZuulTestCase, FIXTURE_DIR
class TestMergerRepo(ZuulTestCase):
@@ -74,3 +75,28 @@
os.path.join(self.upstream_root, 'org/project2'),
sub_repo.createRepoObject().remotes[0].url,
message="Sub repository points to upstream project2")
+
+ def test_clone_timeout(self):
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+ os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+ work_repo = Repo(parent_path, self.workspace_root,
+ 'none@example.org', 'User Name', '0', '0',
+ git_timeout=0.001)
+ # TODO: have the merger and repo classes catch fewer
+ # exceptions, including this one on initialization. For the
+ # test, we try cloning again.
+ with testtools.ExpectedException(git.exc.GitCommandError,
+ '.*exit code\(-9\)'):
+ work_repo._ensure_cloned()
+
+ def test_fetch_timeout(self):
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ work_repo = Repo(parent_path, self.workspace_root,
+ 'none@example.org', 'User Name', '0', '0')
+ work_repo.git_timeout = 0.001
+ self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+ os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+ with testtools.ExpectedException(git.exc.GitCommandError,
+ '.*exit code\(-9\)'):
+ work_repo.update()
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index d51898b..d3f9ddb 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -29,6 +29,7 @@
def setUp(self):
super(TestNodepool, self).setUp()
+ self.statsd = None
self.zk_chroot_fixture = self.useFixture(
ChrootedKazooFixture(self.id()))
self.zk_config = '%s:%s%s' % (
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 6ab1bcc..bab5162 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -89,25 +89,44 @@
self.assertEqual(self.getJobFromHistory('project-test2').node,
'label1')
+ for stat in self.statsd.stats:
+ k, v = stat.decode('utf-8').split(':')
+ self.log.debug('stat %s:%s', k, v)
# TODOv3(jeblair): we may want to report stats by tenant (also?).
# Per-driver
self.assertReportedStat('zuul.event.gerrit.comment-added', value='1|c')
# Per-driver per-connection
self.assertReportedStat('zuul.event.gerrit.gerrit.comment-added',
value='1|c')
- self.assertReportedStat('zuul.pipeline.gate.current_changes',
- value='1|g')
- self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
- kind='ms')
- self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
- value='1|c')
- self.assertReportedStat('zuul.pipeline.gate.resident_time', kind='ms')
- self.assertReportedStat('zuul.pipeline.gate.total_changes',
- value='1|c')
self.assertReportedStat(
- 'zuul.pipeline.gate.org.project.resident_time', kind='ms')
+ 'zuul.tenant.tenant-one.pipeline.gate.current_changes',
+ value='1|g')
self.assertReportedStat(
- 'zuul.pipeline.gate.org.project.total_changes', value='1|c')
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.job.project-merge.SUCCESS', kind='ms')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.job.project-merge.SUCCESS', value='1|c')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.resident_time', kind='ms')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.total_changes', value='1|c')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.resident_time', kind='ms')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.total_changes', value='1|c')
+ exec_key = 'zuul.executor.%s' % self.executor_server.hostname
+ self.assertReportedStat(exec_key + '.builds', value='1|c')
+ self.assertReportedStat('zuul.nodepool.requested', value='1|c')
+ self.assertReportedStat('zuul.nodepool.requested.label.label1',
+ value='1|c')
+ self.assertReportedStat('zuul.nodepool.fulfilled.label.label1',
+ value='1|c')
+ self.assertReportedStat('zuul.nodepool.requested.size.1', value='1|c')
+ self.assertReportedStat('zuul.nodepool.fulfilled.size.1', value='1|c')
+ self.assertReportedStat('zuul.nodepool.current_requests', value='1|g')
for build in self.history:
self.assertTrue(build.parameters['zuul']['voting'])
@@ -2143,8 +2162,7 @@
def test_statsd(self):
"Test each of the statsd methods used in the scheduler"
- import extras
- statsd = extras.try_import('statsd.statsd')
+ statsd = self.sched.statsd
statsd.incr('test-incr')
statsd.timing('test-timing', 3)
statsd.gauge('test-gauge', 12)
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 327d457..0d081d5 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -14,7 +14,9 @@
# License for the specific language governing permissions and limitations
# under the License.
+import io
import json
+import logging
import os
import textwrap
import gc
@@ -731,6 +733,47 @@
self.assertIn('the only project definition permitted', A.messages[0],
"A should have a syntax error reported")
+ def test_untrusted_depends_on_trusted(self):
+ with open(os.path.join(FIXTURE_DIR,
+ 'config/in-repo/git/',
+ 'common-config/zuul.yaml')) as f:
+ common_config = f.read()
+
+ common_config += textwrap.dedent(
+ """
+ - job:
+ name: project-test9
+ """)
+
+ file_dict = {'zuul.yaml': common_config}
+ A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A',
+ files=file_dict)
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+ - project:
+ name: org/project
+ check:
+ jobs:
+ - project-test9
+ """)
+
+ file_dict = {'zuul.yaml': in_repo_conf}
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+ files=file_dict)
+ B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ B.subject, A.data['id'])
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(B.reported, 1,
+ "B should report failure")
+ self.assertIn('depends on a change to a config project',
+ B.messages[0],
+ "A should have a syntax error reported")
+
def test_duplicate_node_error(self):
in_repo_conf = textwrap.dedent(
"""
@@ -1844,7 +1887,8 @@
return f.read()
def test_job_output(self):
- # Verify that command standard output appears in the job output
+ # Verify that command standard output appears in the job output,
+ # and that failures in the final playbook get logged.
# This currently only verifies we receive output from
# localhost. Notably, it does not verify we receive output
@@ -1869,3 +1913,36 @@
self.assertIn(token,
self._get_file(self.history[0],
'work/logs/job-output.txt'))
+
+ def test_job_output_failure_log(self):
+ logger = logging.getLogger('zuul.AnsibleJob')
+ output = io.StringIO()
+ logger.addHandler(logging.StreamHandler(output))
+
+ # Verify that a failure in the last post playbook emits the contents
+ # of the json output to the log
+ self.executor_server.keep_jobdir = True
+ A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='job-output-failure',
+ result='POST_FAILURE', changes='1,1'),
+ ], ordered=False)
+
+ token = 'Standard output test %s' % (self.history[0].jobdir.src_root)
+ j = json.loads(self._get_file(self.history[0],
+ 'work/logs/job-output.json'))
+ self.assertEqual(token,
+ j[0]['plays'][0]['tasks'][0]
+ ['hosts']['localhost']['stdout'])
+
+ print(self._get_file(self.history[0],
+ 'work/logs/job-output.json'))
+ self.assertIn(token,
+ self._get_file(self.history[0],
+ 'work/logs/job-output.txt'))
+
+ log_output = output.getvalue()
+ self.assertIn('Final playbook failed', log_output)
+ self.assertIn('Failure test', log_output)
diff --git a/tox.ini b/tox.ini
index cc5ea58..7e84677 100644
--- a/tox.ini
+++ b/tox.ini
@@ -5,10 +5,7 @@
[testenv]
basepython = python3
-# Set STATSD env variables so that statsd code paths are tested.
-setenv = STATSD_HOST=127.0.0.1
- STATSD_PORT=8125
- VIRTUAL_ENV={envdir}
+setenv = VIRTUAL_ENV={envdir}
OS_TEST_TIMEOUT=120
passenv = ZUUL_TEST_ROOT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE OS_LOG_DEFAULTS
usedevelop = True
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index bba1922..d920d8e 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -29,6 +29,7 @@
import zuul.cmd
from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd_config
# No zuul imports here because they pull in paramiko which must not be
# imported until after the daemonization.
@@ -97,8 +98,14 @@
os.close(pipe_write)
self.setup_logging('gearman_server', 'log_config')
import zuul.lib.gearserver
- statsd_host = os.environ.get('STATSD_HOST')
- statsd_port = int(os.environ.get('STATSD_PORT', 8125))
+
+ (statsd_host, statsd_port, statsd_prefix) = get_statsd_config(
+ self.config)
+ if statsd_prefix:
+ statsd_prefix += '.zuul.geard'
+ else:
+ statsd_prefix = 'zuul.geard'
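+ # e.g. a configured prefix of 'prod' (illustrative) results in
+ # geard stats being reported under 'prod.zuul.geard'.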
+
host = get_default(self.config, 'gearman_server', 'listen_address')
port = int(get_default(self.config, 'gearman_server', 'port',
4730))
@@ -112,7 +119,7 @@
host=host,
statsd_host=statsd_host,
statsd_port=statsd_port,
- statsd_prefix='zuul.geard')
+ statsd_prefix=statsd_prefix)
# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index 4fb49e3..b44fa46 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -14,8 +14,6 @@
import abc
-import extras
-
class BaseConnection(object, metaclass=abc.ABCMeta):
"""Base class for connections.
@@ -42,7 +40,6 @@
self.driver = driver
self.connection_name = connection_name
self.connection_config = connection_config
- self.statsd = extras.try_import('statsd.statsd')
def logEvent(self, event):
self.log.debug(
@@ -50,11 +47,11 @@
connection=self.connection_name,
event=event))
try:
- if self.statsd:
- self.statsd.incr(
+ if self.sched.statsd:
+ self.sched.statsd.incr(
'zuul.event.{driver}.{event}'.format(
driver=self.driver.name, event=event.type))
- self.statsd.incr(
+ self.sched.statsd.incr(
'zuul.event.{driver}.{connection}.{event}'.format(
driver=self.driver.name,
connection=self.connection_name,
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 459a554..d4e6736 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -29,6 +29,7 @@
import traceback
from zuul.lib.yamlutil import yaml
from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd
try:
import ara.plugins.callbacks as ara_callbacks
@@ -500,412 +501,6 @@
return inventory
-class ExecutorMergeWorker(gear.TextWorker):
- def __init__(self, executor_server, *args, **kw):
- self.zuul_executor_server = executor_server
- super(ExecutorMergeWorker, self).__init__(*args, **kw)
-
- def handleNoop(self, packet):
- # Wait until the update queue is empty before responding
- while self.zuul_executor_server.update_queue.qsize():
- time.sleep(1)
-
- with self.zuul_executor_server.merger_lock:
- super(ExecutorMergeWorker, self).handleNoop(packet)
-
-
-class ExecutorExecuteWorker(gear.TextWorker):
- def __init__(self, executor_server, *args, **kw):
- self.zuul_executor_server = executor_server
- super(ExecutorExecuteWorker, self).__init__(*args, **kw)
-
- def handleNoop(self, packet):
- # Delay our response to running a new job based on the number
- # of jobs we're currently running, in an attempt to spread
- # load evenly among executors.
- workers = len(self.zuul_executor_server.job_workers)
- delay = (workers ** 2) / 1000.0
- time.sleep(delay)
- return super(ExecutorExecuteWorker, self).handleNoop(packet)
-
-
-class ExecutorServer(object):
- log = logging.getLogger("zuul.ExecutorServer")
-
- def __init__(self, config, connections={}, jobdir_root=None,
- keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
- self.config = config
- self.keep_jobdir = keep_jobdir
- self.jobdir_root = jobdir_root
- # TODOv3(mordred): make the executor name more unique --
- # perhaps hostname+pid.
- self.hostname = socket.gethostname()
- self.log_streaming_port = log_streaming_port
- self.merger_lock = threading.Lock()
- self.verbose = False
- self.command_map = dict(
- stop=self.stop,
- pause=self.pause,
- unpause=self.unpause,
- graceful=self.graceful,
- verbose=self.verboseOn,
- unverbose=self.verboseOff,
- keep=self.keep,
- nokeep=self.nokeep,
- )
-
- self.merge_root = get_default(self.config, 'executor', 'git_dir',
- '/var/lib/zuul/executor-git')
- self.default_username = get_default(self.config, 'executor',
- 'default_username', 'zuul')
- self.disk_limit_per_job = int(get_default(self.config, 'executor',
- 'disk_limit_per_job', 250))
- self.merge_email = get_default(self.config, 'merger', 'git_user_email')
- self.merge_name = get_default(self.config, 'merger', 'git_user_name')
- self.merge_speed_limit = get_default(
- config, 'merger', 'git_http_low_speed_limit', '1000')
- self.merge_speed_time = get_default(
- config, 'merger', 'git_http_low_speed_time', '30')
- execution_wrapper_name = get_default(self.config, 'executor',
- 'execution_wrapper', 'bubblewrap')
- load_multiplier = float(get_default(self.config, 'executor',
- 'load_multiplier', '2.5'))
- self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
- self.accepting_work = False
- self.execution_wrapper = connections.drivers[execution_wrapper_name]
-
- self.connections = connections
- # This merger and its git repos are used to maintain
- # up-to-date copies of all the repos that are used by jobs, as
- # well as to support the merger:cat functon to supply
- # configuration information to Zuul when it starts.
- self.merger = self._getMerger(self.merge_root)
- self.update_queue = DeduplicateQueue()
-
- state_dir = get_default(self.config, 'executor', 'state_dir',
- '/var/lib/zuul', expand_user=True)
- path = os.path.join(state_dir, 'executor.socket')
- self.command_socket = commandsocket.CommandSocket(path)
- ansible_dir = os.path.join(state_dir, 'ansible')
- self.ansible_dir = ansible_dir
- if os.path.exists(ansible_dir):
- shutil.rmtree(ansible_dir)
-
- zuul_dir = os.path.join(ansible_dir, 'zuul')
- plugin_dir = os.path.join(zuul_dir, 'ansible')
-
- os.makedirs(plugin_dir, mode=0o0755)
-
- self.library_dir = os.path.join(plugin_dir, 'library')
- self.action_dir = os.path.join(plugin_dir, 'action')
- self.callback_dir = os.path.join(plugin_dir, 'callback')
- self.lookup_dir = os.path.join(plugin_dir, 'lookup')
- self.filter_dir = os.path.join(plugin_dir, 'filter')
-
- _copy_ansible_files(zuul.ansible, plugin_dir)
-
- # We're copying zuul.ansible.* into a directory we are going
- # to add to pythonpath, so our plugins can "import
- # zuul.ansible". But we're not installing all of zuul, so
- # create a __init__.py file for the stub "zuul" module.
- with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
- pass
-
- self.job_workers = {}
- self.disk_accountant = DiskAccountant(self.jobdir_root,
- self.disk_limit_per_job,
- self.stopJobDiskFull,
- self.merge_root)
-
- def _getMerger(self, root, logger=None):
- if root != self.merge_root:
- cache_root = self.merge_root
- else:
- cache_root = None
- return zuul.merger.merger.Merger(
- root, self.connections, self.merge_email, self.merge_name,
- self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
-
- def start(self):
- self._running = True
- self._command_running = True
- server = self.config.get('gearman', 'server')
- port = get_default(self.config, 'gearman', 'port', 4730)
- ssl_key = get_default(self.config, 'gearman', 'ssl_key')
- ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
- ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
- self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
- self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
- self.executor_worker = ExecutorExecuteWorker(
- self, 'Zuul Executor Server')
- self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
- self.log.debug("Waiting for server")
- self.merger_worker.waitForServer()
- self.executor_worker.waitForServer()
- self.log.debug("Registering")
- self.register()
-
- self.log.debug("Starting command processor")
- self.command_socket.start()
- self.command_thread = threading.Thread(target=self.runCommand)
- self.command_thread.daemon = True
- self.command_thread.start()
-
- self.log.debug("Starting worker")
- self.update_thread = threading.Thread(target=self._updateLoop)
- self.update_thread.daemon = True
- self.update_thread.start()
- self.merger_thread = threading.Thread(target=self.run_merger)
- self.merger_thread.daemon = True
- self.merger_thread.start()
- self.executor_thread = threading.Thread(target=self.run_executor)
- self.executor_thread.daemon = True
- self.executor_thread.start()
- self.governor_stop_event = threading.Event()
- self.governor_thread = threading.Thread(target=self.run_governor)
- self.governor_thread.daemon = True
- self.governor_thread.start()
- self.disk_accountant.start()
-
- def register(self):
- self.register_work()
- self.executor_worker.registerFunction("executor:stop:%s" %
- self.hostname)
- self.merger_worker.registerFunction("merger:merge")
- self.merger_worker.registerFunction("merger:cat")
- self.merger_worker.registerFunction("merger:refstate")
-
- def register_work(self):
- self.accepting_work = True
- self.executor_worker.registerFunction("executor:execute")
-
- def unregister_work(self):
- self.accepting_work = False
- self.executor_worker.unregisterFunction("executor:execute")
-
- def stop(self):
- self.log.debug("Stopping")
- self.disk_accountant.stop()
- self.governor_stop_event.set()
- self._running = False
- self._command_running = False
- self.command_socket.stop()
- self.update_queue.put(None)
-
- for job_worker in list(self.job_workers.values()):
- try:
- job_worker.stop()
- except Exception:
- self.log.exception("Exception sending stop command "
- "to worker:")
- self.merger_worker.shutdown()
- self.executor_worker.shutdown()
- self.log.debug("Stopped")
-
- def pause(self):
- # TODOv3: implement
- pass
-
- def unpause(self):
- # TODOv3: implement
- pass
-
- def graceful(self):
- # TODOv3: implement
- pass
-
- def verboseOn(self):
- self.verbose = True
-
- def verboseOff(self):
- self.verbose = False
-
- def keep(self):
- self.keep_jobdir = True
-
- def nokeep(self):
- self.keep_jobdir = False
-
- def join(self):
- self.update_thread.join()
- self.merger_thread.join()
- self.executor_thread.join()
- self.governor_thread.join()
-
- def runCommand(self):
- while self._command_running:
- try:
- command = self.command_socket.get().decode('utf8')
- if command != '_stop':
- self.command_map[command]()
- except Exception:
- self.log.exception("Exception while processing command")
-
- def _updateLoop(self):
- while self._running:
- try:
- self._innerUpdateLoop()
- except:
- self.log.exception("Exception in update thread:")
-
- def _innerUpdateLoop(self):
- # Inside of a loop that keeps the main repositories up to date
- task = self.update_queue.get()
- if task is None:
- # We are asked to stop
- return
- with self.merger_lock:
- self.log.info("Updating repo %s/%s" % (
- task.connection_name, task.project_name))
- self.merger.updateRepo(task.connection_name, task.project_name)
- self.log.debug("Finished updating repo %s/%s" %
- (task.connection_name, task.project_name))
- task.setComplete()
-
- def update(self, connection_name, project_name):
- # Update a repository in the main merger
- task = UpdateTask(connection_name, project_name)
- task = self.update_queue.put(task)
- return task
-
- def run_merger(self):
- self.log.debug("Starting merger listener")
- while self._running:
- try:
- job = self.merger_worker.getJob()
- try:
- if job.name == 'merger:cat':
- self.log.debug("Got cat job: %s" % job.unique)
- self.cat(job)
- elif job.name == 'merger:merge':
- self.log.debug("Got merge job: %s" % job.unique)
- self.merge(job)
- elif job.name == 'merger:refstate':
- self.log.debug("Got refstate job: %s" % job.unique)
- self.refstate(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(
- traceback.format_exc().encode('utf8'))
- except gear.InterruptedError:
- pass
- except Exception:
- self.log.exception("Exception while getting job")
-
- def run_executor(self):
- self.log.debug("Starting executor listener")
- while self._running:
- try:
- job = self.executor_worker.getJob()
- try:
- if job.name == 'executor:execute':
- self.log.debug("Got execute job: %s" % job.unique)
- self.executeJob(job)
- elif job.name.startswith('executor:stop'):
- self.log.debug("Got stop job: %s" % job.unique)
- self.stopJob(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(
- traceback.format_exc().encode('utf8'))
- except gear.InterruptedError:
- pass
- except Exception:
- self.log.exception("Exception while getting job")
-
- def run_governor(self):
- while not self.governor_stop_event.wait(30):
- self.manageLoad()
-
- def executeJob(self, job):
- self.job_workers[job.unique] = AnsibleJob(self, job)
- self.job_workers[job.unique].run()
-
- def manageLoad(self):
- ''' Apply some heuristics to decide whether or not we should
- be askign for more jobs '''
- load_avg = os.getloadavg()[0]
- if self.accepting_work:
- # Don't unregister if we don't have any active jobs.
- if load_avg > self.max_load_avg and self.job_workers:
- self.log.info(
- "Unregistering due to high system load {} > {}".format(
- load_avg, self.max_load_avg))
- self.unregister_work()
- elif load_avg <= self.max_load_avg:
- self.log.info(
- "Re-registering as load is within limits {} <= {}".format(
- load_avg, self.max_load_avg))
- self.register_work()
-
- def finishJob(self, unique):
- del(self.job_workers[unique])
-
- def stopJobDiskFull(self, jobdir):
- unique = os.path.basename(jobdir)
- self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
-
- def stopJob(self, job):
- try:
- args = json.loads(job.arguments)
- self.log.debug("Stop job with arguments: %s" % (args,))
- unique = args['uuid']
- self.stopJobByUnique(unique)
- finally:
- job.sendWorkComplete()
-
- def stopJobByUnique(self, unique, reason=None):
- job_worker = self.job_workers.get(unique)
- if not job_worker:
- self.log.debug("Unable to find worker for job %s" % (unique,))
- return
- try:
- job_worker.stop(reason)
- except Exception:
- self.log.exception("Exception sending stop command "
- "to worker:")
-
- def cat(self, job):
- args = json.loads(job.arguments)
- task = self.update(args['connection'], args['project'])
- task.wait()
- with self.merger_lock:
- files = self.merger.getFiles(args['connection'], args['project'],
- args['branch'], args['files'],
- args.get('dirs', []))
- result = dict(updated=True,
- files=files)
- job.sendWorkComplete(json.dumps(result))
-
- def refstate(self, job):
- args = json.loads(job.arguments)
- with self.merger_lock:
- success, repo_state = self.merger.getRepoState(args['items'])
- result = dict(updated=success,
- repo_state=repo_state)
- job.sendWorkComplete(json.dumps(result))
-
- def merge(self, job):
- args = json.loads(job.arguments)
- with self.merger_lock:
- ret = self.merger.mergeChanges(args['items'], args.get('files'),
- args.get('dirs', []),
- args.get('repo_state'))
- result = dict(merged=(ret is not None))
- if ret is None:
- result['commit'] = result['files'] = result['repo_state'] = None
- else:
- (result['commit'], result['files'], result['repo_state'],
- recent) = ret
- job.sendWorkComplete(json.dumps(result))
-
-
class AnsibleJobLogAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
msg, kwargs = super(AnsibleJobLogAdapter, self).process(msg, kwargs)
@@ -1225,12 +820,41 @@
post_status, post_code = self.runAnsiblePlaybook(
playbook, args['timeout'], success, phase='post', index=index)
if post_status != self.RESULT_NORMAL or post_code != 0:
+ success = False
# If we encountered a pre-failure, that takes
# precedence over the post result.
if not pre_failed:
result = 'POST_FAILURE'
+ if (index + 1) == len(self.jobdir.post_playbooks):
+ self._logFinalPlaybookError()
+
return result
+ def _logFinalPlaybookError(self):
+ # Failures in the final post playbook can include failures
+ # uploading logs, which makes diagnosing issues difficult.
+ # Grab the output from the last playbook from the json
+ # file and log it.
+ json_output = self.jobdir.job_output_file.replace('txt', 'json')
+ self.log.debug("Final playbook failed")
+ if not os.path.exists(json_output):
+ self.log.debug("JSON logfile {logfile} is missing".format(
+ logfile=json_output))
+ return
+ try:
+ output = json.load(open(json_output, 'r'))
+ last_playbook = output[-1]
+ # Transform the json to yaml, because it is easier to read and,
+ # given the size of the data, it would be very hard to read as a
+ # stringified nested dict all on one line.
+ yaml_out = yaml.safe_dump(last_playbook, default_flow_style=False)
+ for line in yaml_out.split('\n'):
+ self.log.debug(line)
+ except Exception:
+ self.log.exception(
+ "Could not decode json from {logfile}".format(
+ logfile=json_output))
+
def getHostList(self, args):
hosts = []
for node in args['nodes']:
@@ -1704,7 +1328,7 @@
# Received abort request.
return (self.RESULT_ABORTED, None)
elif ret == 1:
- if syntax_buffer[0].startswith('ERROR!'):
+ if syntax_buffer[0].startswith(b'ERROR!'):
with open(self.jobdir.job_output_file, 'a') as job_output:
for line in syntax_buffer:
job_output.write("{now} | {line}\n".format(
@@ -1732,7 +1356,7 @@
now=datetime.datetime.now()))
found_marker = False
for line in syntax_buffer:
- if line.startswith('ERROR! Unexpected Exception'):
+ if line.startswith(b'ERROR! Unexpected Exception'):
found_marker = True
if not found_marker:
continue
@@ -1827,3 +1451,424 @@
self.emitPlaybookBanner(playbook, 'END', phase, result=result)
return result, code
+
+
+class ExecutorMergeWorker(gear.TextWorker):
+ def __init__(self, executor_server, *args, **kw):
+ self.zuul_executor_server = executor_server
+ super(ExecutorMergeWorker, self).__init__(*args, **kw)
+
+ def handleNoop(self, packet):
+ # Wait until the update queue is empty before responding
+ while self.zuul_executor_server.update_queue.qsize():
+ time.sleep(1)
+
+ with self.zuul_executor_server.merger_lock:
+ super(ExecutorMergeWorker, self).handleNoop(packet)
+
+
+class ExecutorExecuteWorker(gear.TextWorker):
+ def __init__(self, executor_server, *args, **kw):
+ self.zuul_executor_server = executor_server
+ super(ExecutorExecuteWorker, self).__init__(*args, **kw)
+
+ def handleNoop(self, packet):
+ # Delay our response to running a new job based on the number
+ # of jobs we're currently running, in an attempt to spread
+ # load evenly among executors.
+ workers = len(self.zuul_executor_server.job_workers)
+ delay = (workers ** 2) / 1000.0
+ time.sleep(delay)
+ return super(ExecutorExecuteWorker, self).handleNoop(packet)
+
+
+class ExecutorServer(object):
+ log = logging.getLogger("zuul.ExecutorServer")
+ _job_class = AnsibleJob
+
+ def __init__(self, config, connections={}, jobdir_root=None,
+ keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
+ self.config = config
+ self.keep_jobdir = keep_jobdir
+ self.jobdir_root = jobdir_root
+ # TODOv3(mordred): make the executor name more unique --
+ # perhaps hostname+pid.
+ self.hostname = socket.gethostname()
+ self.log_streaming_port = log_streaming_port
+ self.merger_lock = threading.Lock()
+ self.verbose = False
+ self.command_map = dict(
+ stop=self.stop,
+ pause=self.pause,
+ unpause=self.unpause,
+ graceful=self.graceful,
+ verbose=self.verboseOn,
+ unverbose=self.verboseOff,
+ keep=self.keep,
+ nokeep=self.nokeep,
+ )
+
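+ # get_statsd may return None (for example, when statsd
+ # reporting is not configured), so uses below are guarded.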
+ self.statsd = get_statsd(config)
+ self.merge_root = get_default(self.config, 'executor', 'git_dir',
+ '/var/lib/zuul/executor-git')
+ self.default_username = get_default(self.config, 'executor',
+ 'default_username', 'zuul')
+ self.disk_limit_per_job = int(get_default(self.config, 'executor',
+ 'disk_limit_per_job', 250))
+ self.merge_email = get_default(self.config, 'merger', 'git_user_email')
+ self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+ self.merge_speed_limit = get_default(
+ config, 'merger', 'git_http_low_speed_limit', '1000')
+ self.merge_speed_time = get_default(
+ config, 'merger', 'git_http_low_speed_time', '30')
+ execution_wrapper_name = get_default(self.config, 'executor',
+ 'execution_wrapper', 'bubblewrap')
+ load_multiplier = float(get_default(self.config, 'executor',
+ 'load_multiplier', '2.5'))
+ self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+ self.accepting_work = False
+ self.execution_wrapper = connections.drivers[execution_wrapper_name]
+
+ self.connections = connections
+ # This merger and its git repos are used to maintain
+ # up-to-date copies of all the repos that are used by jobs, as
+ # well as to support the merger:cat function to supply
+ # configuration information to Zuul when it starts.
+ self.merger = self._getMerger(self.merge_root)
+ self.update_queue = DeduplicateQueue()
+
+ state_dir = get_default(self.config, 'executor', 'state_dir',
+ '/var/lib/zuul', expand_user=True)
+ path = os.path.join(state_dir, 'executor.socket')
+ self.command_socket = commandsocket.CommandSocket(path)
+ ansible_dir = os.path.join(state_dir, 'ansible')
+ self.ansible_dir = ansible_dir
+ if os.path.exists(ansible_dir):
+ shutil.rmtree(ansible_dir)
+
+ zuul_dir = os.path.join(ansible_dir, 'zuul')
+ plugin_dir = os.path.join(zuul_dir, 'ansible')
+
+ os.makedirs(plugin_dir, mode=0o0755)
+
+ self.library_dir = os.path.join(plugin_dir, 'library')
+ self.action_dir = os.path.join(plugin_dir, 'action')
+ self.callback_dir = os.path.join(plugin_dir, 'callback')
+ self.lookup_dir = os.path.join(plugin_dir, 'lookup')
+ self.filter_dir = os.path.join(plugin_dir, 'filter')
+
+ _copy_ansible_files(zuul.ansible, plugin_dir)
+
+ # We're copying zuul.ansible.* into a directory we are going
+ # to add to pythonpath, so our plugins can "import
+ # zuul.ansible". But we're not installing all of zuul, so
+ # create a __init__.py file for the stub "zuul" module.
+ with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
+ pass
+
+ self.job_workers = {}
+ self.disk_accountant = DiskAccountant(self.jobdir_root,
+ self.disk_limit_per_job,
+ self.stopJobDiskFull,
+ self.merge_root)
+
+ def _getMerger(self, root, logger=None):
+ if root != self.merge_root:
+ cache_root = self.merge_root
+ else:
+ cache_root = None
+ return zuul.merger.merger.Merger(
+ root, self.connections, self.merge_email, self.merge_name,
+ self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
+
+ def start(self):
+ self._running = True
+ self._command_running = True
+ server = self.config.get('gearman', 'server')
+ port = get_default(self.config, 'gearman', 'port', 4730)
+ ssl_key = get_default(self.config, 'gearman', 'ssl_key')
+ ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
+ ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
+ self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
+ self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+ self.executor_worker = ExecutorExecuteWorker(
+ self, 'Zuul Executor Server')
+ self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+ self.log.debug("Waiting for server")
+ self.merger_worker.waitForServer()
+ self.executor_worker.waitForServer()
+ self.log.debug("Registering")
+ self.register()
+
+ self.log.debug("Starting command processor")
+ self.command_socket.start()
+ self.command_thread = threading.Thread(target=self.runCommand)
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
+ self.log.debug("Starting worker")
+ self.update_thread = threading.Thread(target=self._updateLoop)
+ self.update_thread.daemon = True
+ self.update_thread.start()
+ self.merger_thread = threading.Thread(target=self.run_merger)
+ self.merger_thread.daemon = True
+ self.merger_thread.start()
+ self.executor_thread = threading.Thread(target=self.run_executor)
+ self.executor_thread.daemon = True
+ self.executor_thread.start()
+ self.governor_stop_event = threading.Event()
+ self.governor_thread = threading.Thread(target=self.run_governor)
+ self.governor_thread.daemon = True
+ self.governor_thread.start()
+ self.disk_accountant.start()
+
+ def register(self):
+ self.register_work()
+ self.executor_worker.registerFunction("executor:stop:%s" %
+ self.hostname)
+ self.merger_worker.registerFunction("merger:merge")
+ self.merger_worker.registerFunction("merger:cat")
+ self.merger_worker.registerFunction("merger:refstate")
+
+ def register_work(self):
+ self.accepting_work = True
+ self.executor_worker.registerFunction("executor:execute")
+
+ def unregister_work(self):
+ self.accepting_work = False
+ self.executor_worker.unregisterFunction("executor:execute")
+
+ def stop(self):
+ self.log.debug("Stopping")
+ self.disk_accountant.stop()
+ self.governor_stop_event.set()
+ self._running = False
+ self._command_running = False
+ self.command_socket.stop()
+ self.update_queue.put(None)
+
+ for job_worker in list(self.job_workers.values()):
+ try:
+ job_worker.stop()
+ except Exception:
+ self.log.exception("Exception sending stop command "
+ "to worker:")
+ self.merger_worker.shutdown()
+ self.executor_worker.shutdown()
+ if self.statsd:
+ base_key = 'zuul.executor.%s' % self.hostname
+ self.statsd.gauge(base_key + '.load_average', 0)
+ self.statsd.gauge(base_key + '.running_builds', 0)
+ self.log.debug("Stopped")
+
+ def pause(self):
+ # TODOv3: implement
+ pass
+
+ def unpause(self):
+ # TODOv3: implement
+ pass
+
+ def graceful(self):
+ # TODOv3: implement
+ pass
+
+ def verboseOn(self):
+ self.verbose = True
+
+ def verboseOff(self):
+ self.verbose = False
+
+ def keep(self):
+ self.keep_jobdir = True
+
+ def nokeep(self):
+ self.keep_jobdir = False
+
+ def join(self):
+ self.update_thread.join()
+ self.merger_thread.join()
+ self.executor_thread.join()
+ self.governor_thread.join()
+
+ def runCommand(self):
+ while self._command_running:
+ try:
+ command = self.command_socket.get().decode('utf8')
+ if command != '_stop':
+ self.command_map[command]()
+ except Exception:
+ self.log.exception("Exception while processing command")
+
+ def _updateLoop(self):
+ while self._running:
+ try:
+ self._innerUpdateLoop()
+ except:
+ self.log.exception("Exception in update thread:")
+
+ def _innerUpdateLoop(self):
+ # Inside of a loop that keeps the main repositories up to date
+ task = self.update_queue.get()
+ if task is None:
+ # We are asked to stop
+ return
+ with self.merger_lock:
+ self.log.info("Updating repo %s/%s" % (
+ task.connection_name, task.project_name))
+ self.merger.updateRepo(task.connection_name, task.project_name)
+ self.log.debug("Finished updating repo %s/%s" %
+ (task.connection_name, task.project_name))
+ task.setComplete()
+
+ def update(self, connection_name, project_name):
+ # Update a repository in the main merger
+ task = UpdateTask(connection_name, project_name)
+ task = self.update_queue.put(task)
+ return task
+
+ def run_merger(self):
+ self.log.debug("Starting merger listener")
+ while self._running:
+ try:
+ job = self.merger_worker.getJob()
+ try:
+ if job.name == 'merger:cat':
+ self.log.debug("Got cat job: %s" % job.unique)
+ self.cat(job)
+ elif job.name == 'merger:merge':
+ self.log.debug("Got merge job: %s" % job.unique)
+ self.merge(job)
+ elif job.name == 'merger:refstate':
+ self.log.debug("Got refstate job: %s" % job.unique)
+ self.refstate(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ except gear.InterruptedError:
+ pass
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def run_executor(self):
+ self.log.debug("Starting executor listener")
+ while self._running:
+ try:
+ job = self.executor_worker.getJob()
+ try:
+ if job.name == 'executor:execute':
+ self.log.debug("Got execute job: %s" % job.unique)
+ self.executeJob(job)
+ elif job.name.startswith('executor:stop'):
+ self.log.debug("Got stop job: %s" % job.unique)
+ self.stopJob(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ except gear.InterruptedError:
+ pass
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def run_governor(self):
+ while not self.governor_stop_event.wait(30):
+ self.manageLoad()
+
+ def executeJob(self, job):
+ if self.statsd:
+ base_key = 'zuul.executor.%s' % self.hostname
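+            # counter zuul.executor.<hostname>.builds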
+ self.statsd.incr(base_key + '.builds')
+ self.job_workers[job.unique] = self._job_class(self, job)
+ self.job_workers[job.unique].run()
+
+ def manageLoad(self):
+        '''Apply some heuristics to decide whether or not we should
+        be asking for more jobs.'''
+ load_avg = os.getloadavg()[0]
+ if self.accepting_work:
+ # Don't unregister if we don't have any active jobs.
+ if load_avg > self.max_load_avg and self.job_workers:
+ self.log.info(
+ "Unregistering due to high system load {} > {}".format(
+ load_avg, self.max_load_avg))
+ self.unregister_work()
+ elif load_avg <= self.max_load_avg:
+ self.log.info(
+ "Re-registering as load is within limits {} <= {}".format(
+ load_avg, self.max_load_avg))
+ self.register_work()
+ if self.statsd:
+ base_key = 'zuul.executor.%s' % self.hostname
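+            # gauge zuul.executor.<hostname>.load_average
+            # gauge zuul.executor.<hostname>.running_builds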
+ self.statsd.gauge(base_key + '.load_average',
+ int(load_avg * 100))
+ self.statsd.gauge(base_key + '.running_builds',
+ len(self.job_workers))
+
+ def finishJob(self, unique):
+        del self.job_workers[unique]
+
+ def stopJobDiskFull(self, jobdir):
+ unique = os.path.basename(jobdir)
+ self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
+
+ def stopJob(self, job):
+ try:
+ args = json.loads(job.arguments)
+ self.log.debug("Stop job with arguments: %s" % (args,))
+ unique = args['uuid']
+ self.stopJobByUnique(unique)
+ finally:
+ job.sendWorkComplete()
+
+ def stopJobByUnique(self, unique, reason=None):
+ job_worker = self.job_workers.get(unique)
+ if not job_worker:
+ self.log.debug("Unable to find worker for job %s" % (unique,))
+ return
+ try:
+ job_worker.stop(reason)
+ except Exception:
+ self.log.exception("Exception sending stop command "
+ "to worker:")
+
+ def cat(self, job):
+ args = json.loads(job.arguments)
+ task = self.update(args['connection'], args['project'])
+ task.wait()
+ with self.merger_lock:
+ files = self.merger.getFiles(args['connection'], args['project'],
+ args['branch'], args['files'],
+ args.get('dirs', []))
+ result = dict(updated=True,
+ files=files)
+ job.sendWorkComplete(json.dumps(result))
+
+ def refstate(self, job):
+ args = json.loads(job.arguments)
+ with self.merger_lock:
+ success, repo_state = self.merger.getRepoState(args['items'])
+ result = dict(updated=success,
+ repo_state=repo_state)
+ job.sendWorkComplete(json.dumps(result))
+
+ def merge(self, job):
+ args = json.loads(job.arguments)
+ with self.merger_lock:
+ ret = self.merger.mergeChanges(args['items'], args.get('files'),
+ args.get('dirs', []),
+ args.get('repo_state'))
+ result = dict(merged=(ret is not None))
+ if ret is None:
+ result['commit'] = result['files'] = result['repo_state'] = None
+ else:
+ (result['commit'], result['files'], result['repo_state'],
+ recent) = ret
+ job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/lib/statsd.py b/zuul/lib/statsd.py
new file mode 100644
index 0000000..6c74f32
--- /dev/null
+++ b/zuul/lib/statsd.py
@@ -0,0 +1,33 @@
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import extras
+from zuul.lib.config import get_default
+
+
+def get_statsd_config(config):
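+    """Return a (host, port, prefix) tuple from the statsd section of
+    the supplied config, with the port defaulting to 8125."""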
+ statsd_host = get_default(config, 'statsd', 'server')
+ statsd_port = int(get_default(config, 'statsd', 'port', 8125))
+ statsd_prefix = get_default(config, 'statsd', 'prefix')
+ return (statsd_host, statsd_port, statsd_prefix)
+
+
+def get_statsd(config):
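+    """Return a statsd client, or None if the statsd module is not
+    installed or no server is configured."""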
+ statsd = extras.try_import('statsd')
+ if statsd is None:
+ return None
+ (statsd_host, statsd_port, statsd_prefix) = get_statsd_config(config)
+ if statsd_host is None:
+ return None
+ return statsd.StatsClient(statsd_host, statsd_port, statsd_prefix)
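+
+
+# Illustrative use (see zuul/scheduler.py in this change):
+#
+#     self.statsd = get_statsd(config)
+#     if self.statsd:
+#         self.statsd.incr(key)
+#
+# A missing statsd module or an unconfigured server disables
+# reporting rather than raising an error.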
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 0c3d123..edea69c 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -11,6 +11,7 @@
# under the License.
import logging
+import textwrap
from zuul import exceptions
from zuul import model
@@ -430,6 +431,7 @@
self.log.debug("Loading dynamic layout")
(trusted_updates, untrusted_updates) = item.includesConfigUpdates()
build_set = item.current_build_set
+ trusted_layout_verified = False
try:
# First parse the config as it will land with the
# full set of config and project repos. This lets us
@@ -443,6 +445,7 @@
include_config_projects=True,
scheduler=self.sched,
connections=self.sched.connections)
+ trusted_layout_verified = True
# Then create the config a second time but without changes
# to config repos so that we actually use this config.
@@ -462,9 +465,23 @@
return item.queue.pipeline.layout
self.log.debug("Loading dynamic layout complete")
except zuul.configloader.ConfigurationSyntaxError as e:
- self.log.info("Configuration syntax error "
- "in dynamic layout")
- item.setConfigError(str(e))
+ self.log.info("Configuration syntax error in dynamic layout")
+ if trusted_layout_verified:
+ # The config is good if we include config-projects,
+ # but is currently invalid if we omit them. Instead
+ # of returning the whole error message, just leave a
+ # note that the config will work once the dependent
+ # changes land.
+ msg = "This change depends on a change "\
+ "to a config project.\n\n"
+ msg += textwrap.fill(textwrap.dedent("""\
+ The syntax of the configuration in this change has
+ been verified to be correct once the config project
+                    change upon which it depends is merged, but it cannot
+ be used until that occurs."""))
+ item.setConfigError(msg)
+ else:
+ item.setConfigError(str(e))
return None
except Exception:
self.log.exception("Error in dynamic layout")
@@ -820,19 +837,28 @@
dt = None
items = len(self.pipeline.getAllItems())
- # stats.timers.zuul.pipeline.NAME.resident_time
- # stats_counts.zuul.pipeline.NAME.total_changes
- # stats.gauges.zuul.pipeline.NAME.current_changes
- key = 'zuul.pipeline.%s' % self.pipeline.name
+ tenant = self.pipeline.layout.tenant
+ basekey = 'zuul.tenant.%s' % tenant.name
+ key = '%s.pipeline.%s' % (basekey, self.pipeline.name)
+ # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.resident_time
+ # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.total_changes
+ # stats.gauges.zuul.tenant.<tenant>.pipeline.<pipeline>.current_changes
self.sched.statsd.gauge(key + '.current_changes', items)
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
- # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
- project_name = item.change.project.name.replace('/', '.')
- key += '.%s' % project_name
+ hostname = (item.change.project.canonical_hostname.
+ replace('.', '_'))
+            projectname = (item.change.project.name.
+                           replace('.', '_').replace('/', '_'))
+            branchname = (item.change.branch.
+                          replace('.', '_').replace('/', '_'))
+ # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.
+ # project.<host>.<project>.<branch>.resident_time
+ # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.
+ # project.<host>.<project>.<branch>.total_changes
+ key += '.project.%s.%s.%s' % (hostname, projectname, branchname)
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index fbacbee..035d1d0 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -13,10 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
+from contextlib import contextmanager
+import logging
+import os
+import shutil
+
import git
import gitdb
-import os
-import logging
import zuul.model
@@ -38,6 +41,17 @@
raise
+@contextmanager
+def timeout_handler(path):
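+    """Delete the repo at path and re-raise if a git command run
+    inside this context is killed for exceeding its timeout (exit
+    status -9)."""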
+ try:
+ yield
+ except git.exc.GitCommandError as e:
+ if e.status == -9:
+ # Timeout. The repo could be in a bad state, so delete it.
+ shutil.rmtree(path)
+ raise
+
+
class ZuulReference(git.Reference):
_common_path_default = "refs/zuul"
_points_to_commits_only = True
@@ -45,7 +59,7 @@
class Repo(object):
def __init__(self, remote, local, email, username, speed_limit, speed_time,
- sshkey=None, cache_path=None, logger=None):
+ sshkey=None, cache_path=None, logger=None, git_timeout=300):
if logger is None:
self.log = logging.getLogger("zuul.Repo")
else:
@@ -54,6 +68,7 @@
'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
'GIT_HTTP_LOW_SPEED_TIME': speed_time,
}
+ self.git_timeout = git_timeout
if sshkey:
self.env['GIT_SSH_COMMAND'] = 'ssh -i %s' % (sshkey,)
@@ -65,7 +80,7 @@
self._initialized = False
try:
self._ensure_cloned()
- except:
+ except Exception:
self.log.exception("Unable to initialize repo for %s" % remote)
def _ensure_cloned(self):
@@ -78,12 +93,10 @@
self.log.debug("Cloning from %s to %s" % (self.remote_url,
self.local_path))
if self.cache_path:
- git.Repo.clone_from(self.cache_path, self.local_path,
- env=self.env)
+ self._git_clone(self.cache_path)
rewrite_url = True
else:
- git.Repo.clone_from(self.remote_url, self.local_path,
- env=self.env)
+ self._git_clone(self.remote_url)
repo = git.Repo(self.local_path)
repo.git.update_environment(**self.env)
# Create local branches corresponding to all the remote branches
@@ -107,6 +120,18 @@
def isInitialized(self):
return self._initialized
+ def _git_clone(self, url):
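+        # Clone via a plain git command object so kill_after_timeout
+        # can abort a hung clone; the timeout handler then removes the
+        # possibly-corrupt partial clone.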
+ mygit = git.cmd.Git(os.getcwd())
+ mygit.update_environment(**self.env)
+ with timeout_handler(self.local_path):
+ mygit.clone(git.cmd.Git.polish_url(url), self.local_path,
+ kill_after_timeout=self.git_timeout)
+
+ def _git_fetch(self, repo, remote, ref=None, **kwargs):
+ with timeout_handler(self.local_path):
+ repo.git.fetch(remote, ref, kill_after_timeout=self.git_timeout,
+ **kwargs)
+
def createRepoObject(self):
self._ensure_cloned()
repo = git.Repo(self.local_path)
@@ -228,19 +253,18 @@
def fetch(self, ref):
repo = self.createRepoObject()
- # The git.remote.fetch method may read in git progress info and
- # interpret it improperly causing an AssertionError. Because the
- # data was fetched properly subsequent fetches don't seem to fail.
- # So try again if an AssertionError is caught.
- origin = repo.remotes.origin
- try:
- origin.fetch(ref)
- except AssertionError:
- origin.fetch(ref)
+ # NOTE: The following is currently not applicable, but if we
+ # switch back to fetch methods from GitPython, we need to
+ # consider it:
+ # The git.remote.fetch method may read in git progress info and
+ # interpret it improperly causing an AssertionError. Because the
+ # data was fetched properly subsequent fetches don't seem to fail.
+ # So try again if an AssertionError is caught.
+ self._git_fetch(repo, 'origin', ref)
def fetchFrom(self, repository, ref):
repo = self.createRepoObject()
- repo.git.fetch(repository, ref)
+ self._git_fetch(repo, repository, ref)
def createZuulRef(self, ref, commit='HEAD'):
repo = self.createRepoObject()
@@ -257,15 +281,14 @@
def update(self):
repo = self.createRepoObject()
self.log.debug("Updating repository %s" % self.local_path)
- origin = repo.remotes.origin
if repo.git.version_info[:2] < (1, 9):
# Before 1.9, 'git fetch --tags' did not include the
         # behavior covered by 'git fetch', so we run both
# commands in that case. Starting with 1.9, 'git fetch
# --tags' is all that is necessary. See
# https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20
- origin.fetch()
- origin.fetch(tags=True)
+ self._git_fetch(repo, 'origin')
+ self._git_fetch(repo, 'origin', tags=True)
def getFiles(self, files, dirs=[], branch=None, commit=None):
ret = {}
diff --git a/zuul/model.py b/zuul/model.py
index 6eebbfb..b7c3031 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -523,6 +523,7 @@
self.job = job
self.nodeset = nodeset
self._state = STATE_REQUESTED
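+        # Record the submission time so that the time taken to fulfill
+        # the request can be reported (see Nodepool.emitStats).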
+ self.requested_time = time.time()
self.state_time = time.time()
self.stat = None
self.uid = uuid4().hex
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index f4c850d..7dafca0 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -22,6 +22,35 @@
self.requests = {}
self.sched = scheduler
+ def emitStats(self, request):
+ if not self.sched.statsd:
+ return
+ statsd = self.sched.statsd
+        # counter zuul.nodepool.<state>
+        #   (<state> is requested, canceled, fulfilled, or failed)
+        # counter zuul.nodepool.<state>.label.<label>
+        # counter zuul.nodepool.<state>.size.<size>
+        # timer zuul.nodepool.<state> and its label/size subkeys,
+        #   emitted when a request is fulfilled or failed
+        # gauge zuul.nodepool.current_requests
+ state = request.state
+ if request.canceled:
+ state = 'canceled'
+ dt = None
+ elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+ dt = int((request.state_time - request.requested_time) * 1000)
+ else:
+ dt = None
+ key = 'zuul.nodepool.%s' % state
+ statsd.incr(key)
+ if dt:
+ statsd.timing(key, dt)
+ for node in request.nodeset.getNodes():
+ statsd.incr(key + '.label.%s' % node.label)
+ if dt:
+ statsd.timing(key + '.label.%s' % node.label, dt)
+ statsd.incr(key + '.size.%s' % len(request.nodeset.nodes))
+ if dt:
+ statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
+ statsd.gauge('zuul.nodepool.current_requests', len(self.requests))
+
def requestNodes(self, build_set, job):
# Create a copy of the nodeset to represent the actual nodes
# returned by nodepool.
@@ -33,6 +62,7 @@
self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
# Logged after submission so that we have the request id
self.log.info("Submited node request %s" % (req,))
+ self.emitStats(req)
else:
self.log.info("Fulfilling empty node request %s" % (req,))
req.state = model.STATE_FULFILLED
@@ -142,6 +172,7 @@
if request.canceled:
del self.requests[request.uid]
+ self.emitStats(request)
return False
# TODOv3(jeblair): handle allocation failure
@@ -156,6 +187,8 @@
self.sched.onNodesProvisioned(request)
del self.requests[request.uid]
+ self.emitStats(request)
+
# Stop watching this request node.
return False
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index ab147ba..e5924f8 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -15,7 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import extras
import json
import logging
import os
@@ -31,6 +30,7 @@
from zuul import exceptions
from zuul import version as zuul_version
from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd
class ManagementEvent(object):
@@ -211,7 +211,7 @@
self.executor = None
self.merger = None
self.connections = None
- self.statsd = extras.try_import('statsd.statsd')
+ self.statsd = get_statsd(config)
# TODO(jeblair): fix this
# Despite triggers being part of the pipeline, there is one trigger set
# per scheduler. The pipeline handles the trigger filters but since
@@ -282,31 +282,34 @@
build.result = result
try:
if self.statsd and build.pipeline:
- jobname = build.job.name.replace('.', '_')
- key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
+ tenant = build.pipeline.layout.tenant
+ jobname = build.job.name.replace('.', '_').replace('/', '_')
+ hostname = (build.build_set.item.change.project.
+ canonical_hostname.replace('.', '_'))
+ projectname = (build.build_set.item.change.project.name.
+ replace('.', '_').replace('/', '_'))
+ branchname = (build.build_set.item.change.branch.
+ replace('.', '_').replace('/', '_'))
+ basekey = 'zuul.tenant.%s' % tenant.name
+ pipekey = '%s.pipeline.%s' % (basekey, build.pipeline.name)
+ # zuul.tenant.<tenant>.pipeline.<pipeline>.all_jobs
+ key = '%s.all_jobs' % pipekey
self.statsd.incr(key)
- for label in build.node_labels:
- # Jenkins includes the node name in its list of labels, so
- # we filter it out here, since that is not statistically
- # interesting.
- if label == build.node_name:
- continue
- dt = int((build.start_time - build.execute_time) * 1000)
- key = 'zuul.pipeline.%s.label.%s.wait_time' % (
- build.pipeline.name, label)
- self.statsd.timing(key, dt)
- key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
- jobname, build.result)
+ jobkey = '%s.project.%s.%s.%s.job.%s' % (
+ pipekey, hostname, projectname, branchname, jobname)
+ # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+ # <host>.<project>.<branch>.job.<job>.<result>
+ key = '%s.%s' % (jobkey, build.result)
if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
dt = int((build.end_time - build.start_time) * 1000)
self.statsd.timing(key, dt)
self.statsd.incr(key)
-
- key = 'zuul.pipeline.%s.job.%s.wait_time' % (
- build.pipeline.name, jobname)
+ # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+ # <host>.<project>.<branch>.job.<job>.wait_time
+ key = '%s.wait_time' % jobkey
dt = int((build.start_time - build.execute_time) * 1000)
self.statsd.timing(key, dt)
- except:
+ except Exception:
self.log.exception("Exception reporting runtime stats")
event = BuildCompletedEvent(build)
self.result_event_queue.put(event)
@@ -852,8 +855,12 @@
try:
self.nodepool.holdNodeSet(nodeset, autohold_key)
except Exception:
- self.log.exception("Unable to process autohold for %s",
+ self.log.exception("Unable to process autohold for %s:",
autohold_key)
+ if autohold_key in self.autohold_requests:
+ self.log.debug("Removing autohold %s due to exception",
+ autohold_key)
+ del self.autohold_requests[autohold_key]
self.nodepool.returnNodeSet(nodeset)
except Exception:
diff --git a/zuul/zk.py b/zuul/zk.py
index 2fca749..ede78be 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -269,6 +269,9 @@
for nodeid in nodes:
node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
node_data, node_stat = self.client.get(node_path)
+ if not node_data:
+ self.log.warning("Node ID %s has no data", nodeid)
+ continue
node_data = self._strToDict(node_data)
if (node_data['state'] == zuul.model.STATE_HOLD and
node_data.get('hold_job') == identifier):