Merge "Allow the pool_recycle to be configured" into feature/zuulv3
diff --git a/.zuul.yaml b/.zuul.yaml
index 2eadd4f..ede4391 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -45,8 +45,7 @@
name: openstack-infra/zuul
check:
jobs:
- - build-openstack-infra-sphinx-docs:
- success-url: 'html/feature/zuulv3/'
+ - build-openstack-sphinx-docs:
irrelevant-files:
- zuul/cmd/migrate.py
- playbooks/zuul-migrate/.*
@@ -67,8 +66,10 @@
- playbooks/zuul-migrate/.*
gate:
jobs:
- - build-openstack-infra-sphinx-docs:
- success-url: 'html/feature/zuulv3/'
+ - build-openstack-sphinx-docs:
+ irrelevant-files:
+ - zuul/cmd/migrate.py
+ - playbooks/zuul-migrate/.*
- tox-pep8
- tox-py35:
irrelevant-files:
@@ -77,5 +78,5 @@
- zuul-stream-functional
post:
jobs:
- - publish-openstack-python-docs-infra
+ - publish-openstack-sphinx-docs-infra
- publish-openstack-python-branch-tarball
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index fbb8cbc..b20aba7 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -101,6 +101,27 @@
An openssl file containing the client private key in PEM format.
+.. attr:: statsd
+
+ Information about the optional statsd server. If the ``statsd``
+ python module is installed and this section is configured,
+ statistics will be reported to statsd. See :ref:`statsd` for more
+ information.
+
+ .. attr:: server
+
+ Hostname or IP address of the statsd server.
+
+ .. attr:: port
+ :default: 8125
+
+ The UDP port on which the statsd server is listening.
+
+ .. attr:: prefix
+
+ If present, this will be prefixed to all of the keys before
+ transmitting to the statsd server.
+
.. NOTE: this is a white lie at this point, since only the scheduler
uses this, however, we expect other components to use it later, so
it's reasonable for admins to plan for this now.
@@ -115,6 +136,11 @@
A list of zookeeper hosts for Zuul to use when communicating
with Nodepool.
+ .. attr:: session_timeout
+ :default: 10.0
+
+ The ZooKeeper session timeout, in seconds.
+
.. _scheduler:
@@ -260,6 +286,22 @@
Directory in which Zuul should clone git repositories.
+ .. attr:: git_http_low_speed_limit
+ :default: 1000
+
+ If the HTTP transfer speed is less than git_http_low_speed_limit for
+ longer than git_http_low_speed_time, the transfer is aborted.
+
+ Value in bytes, setting to 0 will disable.
+
+ .. attr:: git_http_low_speed_time
+ :default: 30
+
+ If the HTTP transfer speed is less than git_http_low_speed_limit for
+ longer than git_http_low_speed_time, the transfer is aborted.
+
+ Value in seconds, setting to 0 will disable.
+
.. attr:: git_user_email
Value to pass to `git config user.email
@@ -463,6 +505,32 @@
`bubblewrap` has become integral to securely operating Zuul. If you
have a valid use case for it, we encourage you to let us know.
+ .. attr:: load_multiplier
+ :default: 2.5
+
+ When an executor host gets too busy, the system may suffer
+ timeouts and other ill effects. The executor will stop accepting
+ more than 1 job at a time until load has lowered below a safe
+ level. This level is determined by multiplying the number of
+ CPUs by `load_multiplier`.
+
+ So for example, if the system has 2 CPUs, and load_multiplier
+ is 2.5, the safe load for the system is 5.00. Any time the
+ system load average is over 5.00, the executor will quit
+ accepting multiple jobs at one time.
+
+ The executor will observe system load and determine whether
+ to accept more jobs every 30 seconds.
+
+ .. attr:: hostname
+ :default: hostname of the server
+
+ The hostname under which this executor is reachable by zuul-web.
+ Live console log streaming will not work unless this is correct.
+ In most cases it is detected automatically, but in environments
+ where the executor cannot determine its own hostname correctly,
+ it can be overridden here.
+
.. attr:: merger
.. attr:: git_user_email
diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 4fed1f9..55f1908 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -3,6 +3,8 @@
Monitoring
==========
+.. _statsd:
+
Statsd reporting
----------------
@@ -13,37 +15,24 @@
Configuration
~~~~~~~~~~~~~
-Statsd support uses the statsd python module. Note that Zuul will start without
-the statsd python module, so an existing Zuul installation may be missing it.
+Statsd support uses the ``statsd`` python module. Note that support
+is optional and Zuul will start without the statsd python module
+present.
-The configuration is done via environment variables STATSD_HOST and
-STATSD_PORT. They are interpreted by the statsd module directly and there is no
-such parameter in zuul.conf yet. Your init script will have to initialize both
-of them before executing Zuul.
-
-Your init script most probably loads a configuration file named
-``/etc/default/zuul`` which would contain the environment variables::
-
- $ cat /etc/default/zuul
- STATSD_HOST=10.0.0.1
- STATSD_PORT=8125
+Configuration is in the :attr:`statsd` section of ``zuul.conf``.
Metrics
~~~~~~~
These metrics are emitted by the Zuul :ref:`scheduler`:
-.. stat:: gerrit.event.<type>
+.. stat:: zuul.event.<driver>.event.<type>
:type: counter
- Gerrit emits different kinds of messages over its `stream-events`
- interface. Zuul will report counters for each type of event it
- receives from Gerrit.
+ Zuul will report counters for each type of event it receives from
+ each of its configured drivers.
- Refer to your Gerrit installation documentation for a complete
- list of Gerrit event types.
-
-.. stat:: zuul.pipeline
+.. stat:: zuul.tenant.<tenant>.pipeline
Holds metrics specific to jobs. This hierarchy includes:
@@ -63,22 +52,60 @@
The number of items currently being processed by this
pipeline.
- .. stat:: job
+ .. stat:: project
- Subtree detailing per jobs statistics:
+ This hierarchy holds more specific metrics for each project
+ participating in the pipeline.
- .. stat:: <jobname>
+ .. stat:: <canonical_hostname>
- The triggered job name.
+ The canonical hostname for the triggering project.
+ Embedded ``.`` characters will be translated to ``_``.
- .. stat:: <result>
- :type: counter, timer
+ .. stat:: <project>
- A counter for each type of result (e.g., ``SUCCESS`` or
- ``FAILURE``, ``ERROR``, etc.) for the job. If the
- result is ``SUCCESS`` or ``FAILURE``, Zuul will
- additionally report the duration of the build as a
- timer.
+ The name of the triggering project. Embedded ``/`` or
+ ``.`` characters will be translated to ``_``.
+
+ .. stat:: <branch>
+
+ The name of the triggering branch. Embedded ``/`` or
+ ``.`` characters will be translated to ``_``.
+
+ .. stat:: job
+
+ Subtree detailing per-project job statistics:
+
+ .. stat:: <jobname>
+
+ The triggered job name.
+
+ .. stat:: <result>
+ :type: counter, timer
+
+ A counter for each type of result (e.g., ``SUCCESS`` or
+ ``FAILURE``, ``ERROR``, etc.) for the job. If the
+ result is ``SUCCESS`` or ``FAILURE``, Zuul will
+ additionally report the duration of the build as a
+ timer.
+
+ .. stat:: current_changes
+ :type: gauge
+
+ The number of items of this project currently being
+ processed by this pipeline.
+
+ .. stat:: resident_time
+ :type: timer
+
+ A timer metric reporting how long each item for this
+ project has been in the pipeline.
+
+ .. stat:: total_changes
+ :type: counter
+
+ The number of changes for this project processed by the
+ pipeline since Zuul started.
.. stat:: resident_time
:type: timer
@@ -98,34 +125,111 @@
How long each item spent in the pipeline before its first job
started.
- .. stat:: <project>
+.. stat:: zuul.executor.<executor>
- This hierarchy holds more specific metrics for each project
- participating in the pipeline. If the project name contains
- a ``/`` character, it will be replaced with a ``.``.
+ Holds metrics emitted by individual executors. The ``<executor>``
+ component of the key will be replaced with the hostname of the
+ executor.
- .. stat:: current_changes
- :type: gauge
+ .. stat:: builds
+ :type: counter
- The number of items of this project currently being
- processed by this pipeline.
+ Incremented each time the executor starts a build.
- .. stat:: resident_time
- :type: timer
+ .. stat:: running_builds
+ :type: gauge
- A timer metric reporting how long each item for this
- project has been in the pipeline.
+ The number of builds currently running on this executor.
- .. stat:: total_changes
- :type: counter
+ .. stat:: load_average
+ :type: gauge
- The number of changes for this project processed by the
- pipeline since Zuul started.
+ The one-minute load average of this executor, multiplied by 100.
-As an example, given a job named `myjob` triggered by the `gate` pipeline
-which took 40 seconds to build, the Zuul scheduler will emit the following
-statsd events:
+.. stat:: zuul.nodepool
- * ``zuul.pipeline.gate.job.myjob.SUCCESS`` +1
- * ``zuul.pipeline.gate.job.myjob`` 40 seconds
- * ``zuul.pipeline.gate.all_jobs`` +1
+ Holds metrics related to Zuul requests from Nodepool.
+
+ .. stat:: requested
+ :type: counter
+
+ Incremented each time a node request is submitted to Nodepool.
+
+ .. stat:: label.<label>
+ :type: counter
+
+ Incremented each time a request for a specific label is
+ submitted to Nodepool.
+
+ .. stat:: size.<size>
+ :type: counter
+
+ Incremented each time a request of a specific size is submitted
+ to Nodepool. For example, a request for 3 nodes would use the
+ key ``zuul.nodepool.requested.size.3``.
+
+ .. stat:: canceled
+ :type: counter, timer
+
+ The counter is incremented each time a node request is canceled
+ by Zuul. The timer records the elapsed time from request to
+ cancelation.
+
+ .. stat:: label.<label>
+ :type: counter, timer
+
+ The same, for a specific label.
+
+ .. stat:: size.<size>
+ :type: counter, timer
+
+ The same, for a specific request size.
+
+ .. stat:: fulfilled
+ :type: counter, timer
+
+ The counter is incremented each time a node request is fulfilled
+ by Nodepool. The timer records the elapsed time from request to
+ fulfillment.
+
+ .. stat:: label.<label>
+ :type: counter, timer
+
+ The same, for a specific label.
+
+ .. stat:: size.<size>
+ :type: counter, timer
+
+ The same, for a specific request size.
+
+ .. stat:: failed
+ :type: counter, timer
+
+ The counter is incremented each time Nodepool fails to fulfill a
+ node request. The timer records the elapsed time from request
+ to failure.
+
+ .. stat:: label.<label>
+ :type: counter, timer
+
+ The same, for a specific label.
+
+ .. stat:: size.<size>
+ :type: counter, timer
+
+ The same, for a specific request size.
+
+ .. stat:: current_requests
+ :type: gauge
+
+ The number of outstanding nodepool requests from Zuul.
+
+
+As an example, given a job named `myjob` in `mytenant` triggered by a
+change to `myproject` on the `master` branch in the `gate` pipeline
+which took 40 seconds to build, the Zuul scheduler will emit the
+following statsd events:
+
+ * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS`` +1
+ * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS`` 40 seconds
+ * ``zuul.tenant.mytenant.pipeline.gate.all_jobs`` +1
diff --git a/doc/source/user/config.rst b/doc/source/user/config.rst
index 025ea71..c4014e2 100644
--- a/doc/source/user/config.rst
+++ b/doc/source/user/config.rst
@@ -653,10 +653,22 @@
configuration, Zuul reads the ``master`` branch of a given
project first, then other branches in alphabetical order.
+ * In the case of a job variant defined within a :ref:`project`,
+ if the project definition is in a :term:`config-project`, no
+ implied branch specifier is used. If it appears in an
+ :term:`untrusted-project`, with no branch specifier, the
+ branch containing the project definition is used as an implied
+ branch specifier.
+
+ * In the case of a job variant defined within a
+ :ref:`project-template`, if no branch specifier appears, the
+ implied branch specifier for the :ref:`project` definition which
+ uses the project-template will be used.
+
* Any further job variants other than the reference definition
in an untrusted-project will, if they do not have a branch
- specifier, will have an implied branch specifier for the
- current branch applied.
+ specifier, have an implied branch specifier for the current
+ branch applied.
This allows for the very simple and expected workflow where if a
project defines a job on the ``master`` branch with no branch
@@ -940,12 +952,12 @@
A boolean value which indicates whether this job may only be
used in pipelines where :attr:`pipeline.post-review` is
- ``true``. This is automatically set to ``true`` if this job is
- defined in a :term:`untrusted-project`. It may be explicitly
- set to obtain the same behavior for jobs defined in
- :term:`config projects <config-project>`. Once this is set to
- ``true`` anywhere in the inheritance hierarchy for a job, it
- will remain set for all child jobs and variants (it can not be
+ ``true``. This is automatically set to ``true`` if this job
+ uses a :ref:`secret` and is defined in an :term:`untrusted-project`.
+ It may be explicitly set to obtain the same behavior for jobs
+ defined in :term:`config projects <config-project>`. Once this
+ is set to ``true`` anywhere in the inheritance hierarchy for a job,
+ it will remain set for all child jobs and variants (it can not be
set to ``false``).
.. _project:
diff --git a/doc/source/user/jobs.rst b/doc/source/user/jobs.rst
index 3d24f5d..cf607b9 100644
--- a/doc/source/user/jobs.rst
+++ b/doc/source/user/jobs.rst
@@ -244,6 +244,18 @@
The path to the source code, relative to the work dir. E.g.,
`src/git.example.com/org/project`.
+ .. var:: required
+
+ A boolean indicating whether this project appears in the
+ :attr:`job.required-projects` list for this job.
+
+ .. var:: _projects
+ :type: dict
+
+ The same as ``projects`` but a dictionary indexed by the
+ ``name`` value of each entry. ``projects`` will be converted to
+ this.
+
.. var:: tenant
The name of the current Zuul tenant.
diff --git a/etc/status/public_html/jquery.zuul.js b/etc/status/public_html/jquery.zuul.js
index 70e999e..50dbed5 100644
--- a/etc/status/public_html/jquery.zuul.js
+++ b/etc/status/public_html/jquery.zuul.js
@@ -53,6 +53,7 @@
'msg_id': '#zuul_msg',
'pipelines_id': '#zuul_pipelines',
'queue_events_num': '#zuul_queue_events_num',
+ 'queue_management_events_num': '#zuul_queue_management_events_num',
'queue_results_num': '#zuul_queue_results_num',
}, options);
@@ -281,38 +282,38 @@
change_header: function(change) {
var change_id = change.id || 'NA';
- if (change_id.length === 40) {
- change_id = change_id.substr(0, 7);
- }
var $change_link = $('<small />');
if (change.url !== null) {
- var github_id = change.id.match(/^([0-9]+),([0-9a-f]{40})$/);
+ var github_id = change_id.match(/^([0-9]+),([0-9a-f]{40})$/);
if (github_id) {
$change_link.append(
$('<a />').attr('href', change.url).append(
$('<abbr />')
- .attr('title', change.id)
+ .attr('title', change_id)
.text('#' + github_id[1])
)
);
- } else if (/^[0-9a-f]{40}$/.test(change.id)) {
- var change_id_short = change.id.slice(0, 7);
+ } else if (/^[0-9a-f]{40}$/.test(change_id)) {
+ var change_id_short = change_id.slice(0, 7);
$change_link.append(
$('<a />').attr('href', change.url).append(
$('<abbr />')
- .attr('title', change.id)
+ .attr('title', change_id)
.text(change_id_short)
)
);
}
else {
$change_link.append(
- $('<a />').attr('href', change.url).text(change.id)
+ $('<a />').attr('href', change.url).text(change_id)
);
}
}
else {
+ if (change_id.length === 40) {
+ change_id = change_id.substr(0, 7);
+ }
$change_link.text(change_id);
}
@@ -713,6 +714,10 @@
data.trigger_event_queue ?
data.trigger_event_queue.length : '0'
);
+ $(options.queue_management_events_num).text(
+ data.management_event_queue ?
+ data.management_event_queue.length : '0'
+ );
$(options.queue_results_num).text(
data.result_event_queue ?
data.result_event_queue.length : '0'
diff --git a/etc/status/public_html/zuul.app.js b/etc/status/public_html/zuul.app.js
index ae950e8..7ceb2dd 100644
--- a/etc/status/public_html/zuul.app.js
+++ b/etc/status/public_html/zuul.app.js
@@ -33,7 +33,7 @@
+ '<div class="zuul-container" id="zuul-container">'
+ '<div style="display: none;" class="alert" id="zuul_msg"></div>'
+ '<button class="btn pull-right zuul-spinner">updating <span class="glyphicon glyphicon-refresh"></span></button>'
- + '<p>Queue lengths: <span id="zuul_queue_events_num">0</span> events, <span id="zuul_queue_results_num">0</span> results.</p>'
+ + '<p>Queue lengths: <span id="zuul_queue_events_num">0</span> events, <span id="zuul_queue_management_events_num">0</span> management events, <span id="zuul_queue_results_num">0</span> results.</p>'
+ '<div id="zuul_controls"></div>'
+ '<div id="zuul_pipelines" class="row"></div>'
+ '<p>Zuul version: <span id="zuul-version-span"></span></p>'
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index ba7aace..76494ad 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -5,6 +5,9 @@
;ssl_cert=/path/to/client.pem
;ssl_key=/path/to/client.key
+[statsd]
+server=127.0.0.1
+
[zookeeper]
hosts=127.0.0.1:2181
diff --git a/requirements.txt b/requirements.txt
index cdffda2..4b8be3c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,12 +2,16 @@
# pull from master until https://github.com/sigmavirus24/github3.py/pull/671
# is in a release
--e git+https://github.com/sigmavirus24/github3.py.git@develop#egg=Github3.py
+git+https://github.com/sigmavirus24/github3.py.git@develop#egg=Github3.py
PyYAML>=3.1.0
Paste
WebOb>=1.2.3
paramiko>=1.8.0,<2.0.0
-GitPython>=0.3.3,<2.1.2
+# Using a local fork of gitpython until at least these changes are in a
+# release.
+# https://github.com/gitpython-developers/GitPython/pull/682
+# https://github.com/gitpython-developers/GitPython/pull/686
+git+https://github.com/jeblair/GitPython.git@zuul#egg=GitPython
python-daemon>=2.0.4,<2.1.0
extras
statsd>=1.0.0,<3.0
diff --git a/tests/base.py b/tests/base.py
index 2e3d682..a02ee5a 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -20,7 +20,6 @@
import datetime
import gc
import hashlib
-import importlib
from io import StringIO
import json
import logging
@@ -48,7 +47,6 @@
import kazoo.client
import kazoo.exceptions
import pymysql
-import statsd
import testtools
import testtools.content
import testtools.content_type
@@ -1363,6 +1361,63 @@
return repos
+class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
+ def doMergeChanges(self, merger, items, repo_state):
+ # Get a merger in order to update the repos involved in this job.
+ commit = super(RecordingAnsibleJob, self).doMergeChanges(
+ merger, items, repo_state)
+ if not commit: # merge conflict
+ self.recordResult('MERGER_FAILURE')
+ return commit
+
+ def recordResult(self, result):
+ build = self.executor_server.job_builds[self.job.unique]
+ self.executor_server.lock.acquire()
+ self.executor_server.build_history.append(
+ BuildHistory(name=build.name, result=result, changes=build.changes,
+ node=build.node, uuid=build.unique,
+ ref=build.parameters['zuul']['ref'],
+ parameters=build.parameters, jobdir=build.jobdir,
+ pipeline=build.parameters['zuul']['pipeline'])
+ )
+ self.executor_server.running_builds.remove(build)
+ del self.executor_server.job_builds[self.job.unique]
+ self.executor_server.lock.release()
+
+ def runPlaybooks(self, args):
+ build = self.executor_server.job_builds[self.job.unique]
+ build.jobdir = self.jobdir
+
+ result = super(RecordingAnsibleJob, self).runPlaybooks(args)
+ self.recordResult(result)
+ return result
+
+ def runAnsible(self, cmd, timeout, playbook, wrapped=True):
+ build = self.executor_server.job_builds[self.job.unique]
+
+ if self.executor_server._run_ansible:
+ result = super(RecordingAnsibleJob, self).runAnsible(
+ cmd, timeout, playbook, wrapped)
+ else:
+ if playbook.path:
+ result = build.run()
+ else:
+ result = (self.RESULT_NORMAL, 0)
+ return result
+
+ def getHostList(self, args):
+ self.log.debug("hostlist")
+ hosts = super(RecordingAnsibleJob, self).getHostList(args)
+ for host in hosts:
+ host['host_vars']['ansible_connection'] = 'local'
+
+ hosts.append(dict(
+ name='localhost',
+ host_vars=dict(ansible_connection='local'),
+ host_keys=[]))
+ return hosts
+
+
class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
"""An Ansible executor to be used in tests.
@@ -1374,6 +1429,9 @@
be explicitly released.
"""
+
+ _job_class = RecordingAnsibleJob
+
def __init__(self, *args, **kw):
self._run_ansible = kw.pop('_run_ansible', False)
self._test_root = kw.pop('_test_root', False)
@@ -1428,8 +1486,7 @@
args = json.loads(job.arguments)
args['zuul']['_test'] = dict(test_root=self._test_root)
job.arguments = json.dumps(args)
- self.job_workers[job.unique] = RecordingAnsibleJob(self, job)
- self.job_workers[job.unique].run()
+ super(RecordingExecutorServer, self).executeJob(job)
def stopJob(self, job):
self.log.debug("handle stop")
@@ -1447,63 +1504,6 @@
super(RecordingExecutorServer, self).stop()
-class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
- def doMergeChanges(self, merger, items, repo_state):
- # Get a merger in order to update the repos involved in this job.
- commit = super(RecordingAnsibleJob, self).doMergeChanges(
- merger, items, repo_state)
- if not commit: # merge conflict
- self.recordResult('MERGER_FAILURE')
- return commit
-
- def recordResult(self, result):
- build = self.executor_server.job_builds[self.job.unique]
- self.executor_server.lock.acquire()
- self.executor_server.build_history.append(
- BuildHistory(name=build.name, result=result, changes=build.changes,
- node=build.node, uuid=build.unique,
- ref=build.parameters['zuul']['ref'],
- parameters=build.parameters, jobdir=build.jobdir,
- pipeline=build.parameters['zuul']['pipeline'])
- )
- self.executor_server.running_builds.remove(build)
- del self.executor_server.job_builds[self.job.unique]
- self.executor_server.lock.release()
-
- def runPlaybooks(self, args):
- build = self.executor_server.job_builds[self.job.unique]
- build.jobdir = self.jobdir
-
- result = super(RecordingAnsibleJob, self).runPlaybooks(args)
- self.recordResult(result)
- return result
-
- def runAnsible(self, cmd, timeout, playbook):
- build = self.executor_server.job_builds[self.job.unique]
-
- if self.executor_server._run_ansible:
- result = super(RecordingAnsibleJob, self).runAnsible(
- cmd, timeout, playbook)
- else:
- if playbook.path:
- result = build.run()
- else:
- result = (self.RESULT_NORMAL, 0)
- return result
-
- def getHostList(self, args):
- self.log.debug("hostlist")
- hosts = super(RecordingAnsibleJob, self).getHostList(args)
- for host in hosts:
- host['host_vars']['ansible_connection'] = 'local'
-
- hosts.append(dict(
- name='localhost',
- host_vars=dict(ansible_connection='local'),
- host_keys=[]))
- return hosts
-
-
class FakeGearmanServer(gear.Server):
"""A Gearman server for use in tests.
@@ -1518,6 +1518,7 @@
def __init__(self, use_ssl=False):
self.hold_jobs_in_queue = False
+ self.hold_merge_jobs_in_queue = False
if use_ssl:
ssl_ca = os.path.join(FIXTURE_DIR, 'gearman/root-ca.pem')
ssl_cert = os.path.join(FIXTURE_DIR, 'gearman/server.pem')
@@ -1537,6 +1538,8 @@
if not hasattr(job, 'waiting'):
if job.name.startswith(b'executor:execute'):
job.waiting = self.hold_jobs_in_queue
+ elif job.name.startswith(b'merger:'):
+ job.waiting = self.hold_merge_jobs_in_queue
else:
job.waiting = False
if job.waiting:
@@ -1562,10 +1565,15 @@
len(self.low_queue))
self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
for job in self.getQueue():
- if job.name != b'executor:execute':
- continue
- parameters = json.loads(job.arguments.decode('utf8'))
- if not regex or re.match(regex, parameters.get('job')):
+ match = False
+ if job.name == b'executor:execute':
+ parameters = json.loads(job.arguments.decode('utf8'))
+ if not regex or re.match(regex, parameters.get('job')):
+ match = True
+ if job.name.startswith(b'merger:'):
+ if not regex:
+ match = True
+ if match:
self.log.debug("releasing queued job %s" %
job.unique)
job.waiting = False
@@ -2039,14 +2047,9 @@
self.config.set('executor', 'state_dir', self.executor_state_root)
self.statsd = FakeStatsd()
- # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
- # see: https://github.com/jsocol/pystatsd/issues/61
- os.environ['STATSD_HOST'] = '127.0.0.1'
- os.environ['STATSD_PORT'] = str(self.statsd.port)
+ if self.config.has_section('statsd'):
+ self.config.set('statsd', 'port', str(self.statsd.port))
self.statsd.start()
- # the statsd client object is configured in the statsd module import
- importlib.reload(statsd)
- importlib.reload(zuul.scheduler)
self.gearman_server = FakeGearmanServer(self.use_ssl)
@@ -2381,7 +2384,7 @@
# before noticing they should exit, but they should exit on their own.
# Further the pydevd threads also need to be whitelisted so debugging
# e.g. in PyCharm is possible without breaking shutdown.
- whitelist = ['executor-watchdog',
+ whitelist = ['watchdog',
'pydevd.CommandThread',
'pydevd.Reader',
'pydevd.Writer',
@@ -2556,6 +2559,26 @@
return False
return True
+ def areAllMergeJobsWaiting(self):
+ for client_job in list(self.merge_client.jobs):
+ if not client_job.handle:
+ self.log.debug("%s has no handle" % client_job)
+ return False
+ server_job = self.gearman_server.jobs.get(client_job.handle)
+ if not server_job:
+ self.log.debug("%s is not known to the gearman server" %
+ client_job)
+ return False
+ if not hasattr(server_job, 'waiting'):
+ self.log.debug("%s is being enqueued" % server_job)
+ return False
+ if server_job.waiting:
+ self.log.debug("%s is waiting" % server_job)
+ continue
+ self.log.debug("%s is not waiting" % server_job)
+ return False
+ return True
+
def eventQueuesEmpty(self):
for event_queue in self.event_queues:
yield event_queue.empty()
@@ -2592,7 +2615,7 @@
# processed
self.eventQueuesJoin()
self.sched.run_handler_lock.acquire()
- if (not self.merge_client.jobs and
+ if (self.areAllMergeJobsWaiting() and
self.haveAllBuildsReported() and
self.areAllBuildsWaiting() and
self.areAllNodeRequestsComplete() and
@@ -2620,6 +2643,13 @@
return build
raise Exception("Unable to find build %s" % name)
+ def assertJobNotInHistory(self, name, project=None):
+ for job in self.history:
+ if (project is None or
+ job.parameters['zuul']['project']['name'] == project):
+ self.assertNotEqual(job.name, name,
+ 'Job %s found in history' % name)
+
def getJobFromHistory(self, name, project=None):
for job in self.history:
if (job.name == name and
diff --git a/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-logs.yaml b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-logs.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-logs.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-ssh.yaml b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-ssh.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-ssh.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/pre.yaml b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/pre.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/pre.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml b/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
new file mode 100644
index 0000000..89d98a9
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
@@ -0,0 +1,25 @@
+- pipeline:
+ name: check
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- job:
+ name: base
+ parent: null
+ pre-run: playbooks/base/pre
+ post-run:
+ - playbooks/base/post-ssh
+ - playbooks/base/post-logs
+
+- project:
+ name: project-config
+ check:
+ jobs: []
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
new file mode 100644
index 0000000..2545208
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
@@ -0,0 +1,24 @@
+- job:
+ name: puppet-base
+ pre-run: playbooks/prepare-node-common
+
+- job:
+ name: puppet-module-base
+ parent: puppet-base
+ pre-run: playbooks/prepare-node-unit
+
+- job:
+ name: puppet-lint
+ parent: puppet-module-base
+ run: playbooks/run-lint
+
+- project-template:
+ name: puppet-check-jobs
+ check:
+ jobs:
+ - puppet-lint
+
+- project:
+ name: puppet-integration
+ templates:
+ - puppet-check-jobs
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/README b/tests/fixtures/config/branch-variants/git/puppet-integration/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-common.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-common.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-common.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-unit.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-unit.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-unit.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/run-lint.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/run-lint.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/run-lint.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/branch-variants/main.yaml b/tests/fixtures/config/branch-variants/main.yaml
new file mode 100644
index 0000000..ad2b2f6
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/main.yaml
@@ -0,0 +1,8 @@
+- tenant:
+ name: tenant-one
+ source:
+ gerrit:
+ config-projects:
+ - project-config
+ untrusted-projects:
+ - puppet-integration
diff --git a/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
new file mode 100644
index 0000000..bd46718
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
@@ -0,0 +1,4 @@
+- hosts: all
+ tasks:
+ - shell: echo "Failure test {{ zuul.executor.src_root }}"
+ - shell: exit 1
diff --git a/tests/fixtures/config/job-output/git/common-config/zuul.yaml b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
index a83f0bc..f182d8d 100644
--- a/tests/fixtures/config/job-output/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
@@ -20,8 +20,19 @@
parent: base
name: job-output
+- job:
+ name: job-output-failure
+ run: playbooks/job-output
+ post-run: playbooks/job-output-failure-post
+
- project:
name: org/project
check:
jobs:
- job-output
+
+- project:
+ name: org/project2
+ check:
+ jobs:
+ - job-output-failure
diff --git a/tests/fixtures/config/job-output/git/org_project2/README b/tests/fixtures/config/job-output/git/org_project2/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/org_project2/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/job-output/main.yaml b/tests/fixtures/config/job-output/main.yaml
index 208e274..14b382f 100644
--- a/tests/fixtures/config/job-output/main.yaml
+++ b/tests/fixtures/config/job-output/main.yaml
@@ -6,3 +6,4 @@
- common-config
untrusted-projects:
- org/project
+ - org/project2
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml
new file mode 100644
index 0000000..40b3ca7
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml
@@ -0,0 +1,13 @@
+- hosts: all
+ tasks:
+ - debug: var=waitpath
+ - file:
+ path: "{{zuul._test.test_root}}/{{zuul.build}}.post_start.flag"
+ state: touch
+ # Do not finish until test creates the flag file
+ - wait_for:
+ state: present
+ path: "{{waitpath}}"
+ - file:
+ path: "{{zuul._test.test_root}}/{{zuul.build}}.post_end.flag"
+ state: touch
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
new file mode 100644
index 0000000..92a5515
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
@@ -0,0 +1,24 @@
+- pipeline:
+ name: check
+ manager: independent
+ post-review: true
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- job:
+ name: base
+ parent: null
+
+- job:
+ name: python27
+ pre-run: playbooks/pre
+ post-run: playbooks/post
+ vars:
+ waitpath: '{{zuul._test.test_root}}/{{zuul.build}}/test_wait'
diff --git a/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml b/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml
new file mode 100644
index 0000000..89a5674
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml
@@ -0,0 +1,5 @@
+- project:
+ name: org/project
+ check:
+ jobs:
+ - python27
diff --git a/tests/fixtures/config/post-playbook/git/org_project/README b/tests/fixtures/config/post-playbook/git/org_project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/post-playbook/main.yaml b/tests/fixtures/config/post-playbook/main.yaml
new file mode 100644
index 0000000..6033879
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/main.yaml
@@ -0,0 +1,9 @@
+- tenant:
+ name: tenant-one
+ source:
+ gerrit:
+ config-projects:
+ - common-config
+ untrusted-projects:
+ - org/project
+
diff --git a/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml b/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml
index 14f43f4..2160ef9 100644
--- a/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml
@@ -39,6 +39,7 @@
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
+ precedence: low
- job:
name: base
diff --git a/tests/fixtures/config/sql-driver/git/common-config/playbooks/project-test3.yaml b/tests/fixtures/config/sql-driver/git/common-config/playbooks/project-test3.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/sql-driver/git/common-config/playbooks/project-test3.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/sql-driver/git/common-config/zuul.yaml b/tests/fixtures/config/sql-driver/git/common-config/zuul.yaml
index b8f4d67..8fce9e7 100644
--- a/tests/fixtures/config/sql-driver/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/sql-driver/git/common-config/zuul.yaml
@@ -27,6 +27,9 @@
- job:
name: project-test2
+- job:
+ name: project-test3
+
- project:
name: org/project
check:
@@ -36,3 +39,9 @@
dependencies: project-merge
- project-test2:
dependencies: project-merge
+ # Make sure we have a "SKIPPED" result
+ - project-test3:
+ dependencies: project-test1
+ # The noop job can have timing quirks
+ - noop:
+ dependencies: project-test2
diff --git a/tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/project.yaml b/tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/project.yaml
new file mode 100644
index 0000000..6d1892c
--- /dev/null
+++ b/tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/project.yaml
@@ -0,0 +1,4 @@
+- project:
+ name: untrusted-config
+ templates:
+ - test-one-and-two
diff --git a/tests/fixtures/config/templated-project/git/common-config/zuul.d/templates.yaml b/tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/templates.yaml
similarity index 100%
rename from tests/fixtures/config/templated-project/git/common-config/zuul.d/templates.yaml
rename to tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/templates.yaml
diff --git a/tests/fixtures/config/templated-project/main.yaml b/tests/fixtures/config/templated-project/main.yaml
index e59b396..bb59838 100644
--- a/tests/fixtures/config/templated-project/main.yaml
+++ b/tests/fixtures/config/templated-project/main.yaml
@@ -5,5 +5,6 @@
config-projects:
- common-config
untrusted-projects:
+ - untrusted-config
- org/templated-project
- org/layered-project
diff --git a/tests/fixtures/fake_git.sh b/tests/fixtures/fake_git.sh
new file mode 100755
index 0000000..5b787b7
--- /dev/null
+++ b/tests/fixtures/fake_git.sh
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+echo $*
+case "$1" in
+ clone)
+ dest=$3
+ mkdir -p $dest/.git
+ ;;
+ version)
+ echo "git version 1.0.0"
+ exit 0
+ ;;
+esac
+sleep 30
diff --git a/tests/fixtures/layouts/delayed-repo-init.yaml b/tests/fixtures/layouts/delayed-repo-init.yaml
index e97d37a..c89e2fa 100644
--- a/tests/fixtures/layouts/delayed-repo-init.yaml
+++ b/tests/fixtures/layouts/delayed-repo-init.yaml
@@ -67,7 +67,7 @@
dependencies: project-merge
gate:
jobs:
- - project-merge:
+ - project-merge
- project-test1:
dependencies: project-merge
- project-test2:
diff --git a/tests/fixtures/layouts/job-vars.yaml b/tests/fixtures/layouts/job-vars.yaml
new file mode 100644
index 0000000..22fc5c2
--- /dev/null
+++ b/tests/fixtures/layouts/job-vars.yaml
@@ -0,0 +1,75 @@
+- pipeline:
+ name: check
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- job:
+ name: base
+ parent: null
+
+- job:
+ name: parentjob
+ parent: base
+ required-projects:
+ - org/project0
+ vars:
+ override: 0
+ child1override: 0
+ parent: 0
+
+- job:
+ name: child1
+ parent: parentjob
+ required-projects:
+ - org/project1
+ vars:
+ override: 1
+ child1override: 1
+ child1: 1
+
+- job:
+ name: child2
+ parent: parentjob
+ required-projects:
+ - org/project2
+ vars:
+ override: 2
+ child2: 2
+
+- job:
+ name: child3
+ parent: parentjob
+
+- project:
+ name: org/project
+ check:
+ jobs:
+ - parentjob
+ - child1
+ - child2
+ - child3:
+ required-projects:
+ - org/project3
+ vars:
+ override: 3
+ child3: 3
+
+- project:
+ name: org/project0
+
+- project:
+ name: org/project1
+
+- project:
+ name: org/project2
+
+- project:
+ name: org/project3
diff --git a/tests/fixtures/layouts/matcher-test.yaml b/tests/fixtures/layouts/matcher-test.yaml
new file mode 100644
index 0000000..b511a2f
--- /dev/null
+++ b/tests/fixtures/layouts/matcher-test.yaml
@@ -0,0 +1,63 @@
+- pipeline:
+ name: check
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- pipeline:
+ name: gate
+ manager: dependent
+ success-message: Build succeeded (gate).
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - Approved: 1
+ success:
+ gerrit:
+ Verified: 2
+ submit: true
+ failure:
+ gerrit:
+ Verified: -2
+ start:
+ gerrit:
+ Verified: 0
+ precedence: high
+
+- job:
+ name: base
+ parent: null
+
+- job:
+ name: project-test1
+ nodeset:
+ nodes:
+ - name: controller
+ label: label1
+
+- job:
+ name: ignore-branch
+ branches: ^(?!featureA).*$
+ nodeset:
+ nodes:
+ - name: controller
+ label: label2
+
+- project:
+ name: org/project
+ check:
+ jobs:
+ - project-test1
+ - ignore-branch
+ gate:
+ jobs:
+ - project-test1
+ - ignore-branch
diff --git a/tests/fixtures/layouts/multiple-templates.yaml b/tests/fixtures/layouts/multiple-templates.yaml
new file mode 100644
index 0000000..7272cad
--- /dev/null
+++ b/tests/fixtures/layouts/multiple-templates.yaml
@@ -0,0 +1,44 @@
+- pipeline:
+ name: check
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- job:
+ name: base
+ parent: null
+
+- job:
+ name: py27
+
+- project-template:
+ name: python-jobs
+ check:
+ jobs:
+ - py27
+
+- project-template:
+ name: python-trusty-jobs
+ check:
+ jobs:
+ - py27:
+ tags:
+ - trusty
+
+- project:
+ name: org/project1
+ templates:
+ - python-jobs
+ - python-trusty-jobs
+
+- project:
+ name: org/project2
+ templates:
+ - python-jobs
diff --git a/tests/fixtures/zuul-executor-hostname.conf b/tests/fixtures/zuul-executor-hostname.conf
new file mode 100644
index 0000000..7db144d
--- /dev/null
+++ b/tests/fixtures/zuul-executor-hostname.conf
@@ -0,0 +1,32 @@
+[gearman]
+server=127.0.0.1
+
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
+[scheduler]
+tenant_config=main.yaml
+
+[merger]
+git_dir=/tmp/zuul-test/merger-git
+git_user_email=zuul@example.com
+git_user_name=zuul
+
+[executor]
+git_dir=/tmp/zuul-test/executor-git
+hostname=test-executor-hostname.openstack.org
+
+[connection gerrit]
+driver=gerrit
+server=review.example.com
+user=jenkins
+sshkey=fake_id_rsa_path
+
+[connection smtp]
+driver=smtp
+server=localhost
+port=25
+default_from=zuul@example.com
+default_to=you@example.com
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index 7bc8c59..e6f997c 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -1,6 +1,11 @@
[gearman]
server=127.0.0.1
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
[scheduler]
tenant_config=main.yaml
diff --git a/tests/print_layout.py b/tests/print_layout.py
index a295886..055270f 100644
--- a/tests/print_layout.py
+++ b/tests/print_layout.py
@@ -59,6 +59,14 @@
if fn in ['zuul.yaml', '.zuul.yaml']:
print_file('File: ' + os.path.join(gitrepo, fn),
os.path.join(reporoot, fn))
+ for subdir in ['.zuul.d', 'zuul.d']:
+ zuuld = os.path.join(reporoot, subdir)
+ if not os.path.exists(zuuld):
+ continue
+ filenames = os.listdir(zuuld)
+ for fn in filenames:
+ print_file('File: ' + os.path.join(gitrepo, subdir, fn),
+ os.path.join(zuuld, fn))
if __name__ == '__main__':
diff --git a/tests/unit/test_change_matcher.py b/tests/unit/test_change_matcher.py
index 6b161a1..3d5345f 100644
--- a/tests/unit/test_change_matcher.py
+++ b/tests/unit/test_change_matcher.py
@@ -60,7 +60,7 @@
self.assertTrue(self.matcher.matches(self.change))
def test_matches_returns_true_on_matching_ref(self):
- self.change.branch = 'bar'
+ delattr(self.change, 'branch')
self.change.ref = 'foo'
self.assertTrue(self.matcher.matches(self.change))
@@ -69,11 +69,6 @@
self.change.ref = 'baz'
self.assertFalse(self.matcher.matches(self.change))
- def test_matches_returns_false_for_missing_attrs(self):
- delattr(self.change, 'branch')
- # ref is by default not an attribute
- self.assertFalse(self.matcher.matches(self.change))
-
class TestFileMatcher(BaseTestMatcher):
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index 6815f83..719f307 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -74,6 +74,7 @@
def test_sql_results(self):
"Test results are entered into an sql table"
+ self.executor_server.hold_jobs_in_build = True
# Grab the sa tables
tenant = self.sched.abide.tenants.get('tenant-one')
reporter = _get_reporter_from_connection_name(
@@ -85,6 +86,8 @@
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
+ self.orderedRelease()
+ self.waitUntilSettled()
# Add a failed result
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
@@ -92,6 +95,8 @@
self.executor_server.failJob('project-test1', B)
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
+ self.orderedRelease()
+ self.waitUntilSettled()
conn = self.connections.connections['resultsdb'].engine.connect()
result = conn.execute(
@@ -143,15 +148,15 @@
)
).fetchall()
- # Check the second last result, which should be the project-test1 job
+ # Check the second result, which should be the project-test1 job
# which failed
- self.assertEqual('project-test1', buildset1_builds[-2]['job_name'])
- self.assertEqual("FAILURE", buildset1_builds[-2]['result'])
+ self.assertEqual('project-test1', buildset1_builds[1]['job_name'])
+ self.assertEqual("FAILURE", buildset1_builds[1]['result'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
- uuid=buildset1_builds[-2]['uuid']),
- buildset1_builds[-2]['log_url'])
+ uuid=buildset1_builds[1]['uuid']),
+ buildset1_builds[1]['log_url'])
def test_multiple_sql_connections(self):
"Test putting results in different databases"
diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py
index 9c45645..ac7ae34 100755
--- a/tests/unit/test_executor.py
+++ b/tests/unit/test_executor.py
@@ -401,3 +401,12 @@
node['ssh_port'] = 22022
keys = self.test_job.getHostList({'nodes': [node]})[0]['host_keys']
self.assertEqual(keys[0], '[localhost]:22022 fake-host-key')
+
+
+class TestExecutorHostname(ZuulTestCase):
+ config_file = 'zuul-executor-hostname.conf'
+ tenant_config_file = 'config/single-tenant/main.yaml'
+
+ def test_executor_hostname(self):
+ self.assertEqual('test-executor-hostname.openstack.org',
+ self.executor_server.hostname)
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index 8aafabf..ec30a2b 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -19,9 +19,10 @@
import os
import git
+import testtools
from zuul.merger.merger import Repo
-from tests.base import ZuulTestCase
+from tests.base import ZuulTestCase, FIXTURE_DIR
class TestMergerRepo(ZuulTestCase):
@@ -49,7 +50,7 @@
msg='.git file in submodule should be a file')
work_repo = Repo(parent_path, self.workspace_root,
- 'none@example.org', 'User Name')
+ 'none@example.org', 'User Name', '0', '0')
self.assertTrue(
os.path.isdir(os.path.join(self.workspace_root, 'subdir')),
msg='Cloned repository has a submodule placeholder directory')
@@ -60,7 +61,7 @@
sub_repo = Repo(
os.path.join(self.upstream_root, 'org/project2'),
os.path.join(self.workspace_root, 'subdir'),
- 'none@example.org', 'User Name')
+ 'none@example.org', 'User Name', '0', '0')
self.assertTrue(os.path.exists(
os.path.join(self.workspace_root, 'subdir', '.git')),
msg='Cloned over the submodule placeholder')
@@ -74,3 +75,28 @@
os.path.join(self.upstream_root, 'org/project2'),
sub_repo.createRepoObject().remotes[0].url,
message="Sub repository points to upstream project2")
+
+ def test_clone_timeout(self):
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+ os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+ work_repo = Repo(parent_path, self.workspace_root,
+ 'none@example.org', 'User Name', '0', '0',
+ git_timeout=0.001)
+ # TODO: have the merger and repo classes catch fewer
+ # exceptions, including this one on initialization. For the
+ # test, we try cloning again.
+ with testtools.ExpectedException(git.exc.GitCommandError,
+ '.*exit code\(-9\)'):
+ work_repo._ensure_cloned()
+
+ def test_fetch_timeout(self):
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ work_repo = Repo(parent_path, self.workspace_root,
+ 'none@example.org', 'User Name', '0', '0')
+ work_repo.git_timeout = 0.001
+ self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+ os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+ with testtools.ExpectedException(git.exc.GitCommandError,
+ '.*exit code\(-9\)'):
+ work_repo.update()
diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py
index c457ff0..628a45c 100644
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -246,7 +246,11 @@
})
layout.addJob(python27essex)
- project_config = configloader.ProjectParser.fromYaml(tenant, layout, [{
+ project_template_parser = configloader.ProjectTemplateParser(
+ tenant, layout)
+ project_parser = configloader.ProjectParser(
+ tenant, layout, project_template_parser)
+ project_config = project_parser.fromYaml([{
'_source_context': self.context,
'_start_mark': self.start_mark,
'name': 'project',
@@ -262,7 +266,7 @@
# Test master
change.branch = 'master'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -291,7 +295,7 @@
# Test diablo
change.branch = 'stable/diablo'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -321,7 +325,7 @@
# Test essex
change.branch = 'stable/essex'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -505,6 +509,7 @@
def test_job_inheritance_job_tree(self):
tenant = model.Tenant('tenant')
layout = model.Layout(tenant)
+
tpc = model.TenantProjectConfig(self.project)
tenant.addUntrustedProject(tpc)
@@ -539,7 +544,11 @@
})
layout.addJob(python27diablo)
- project_config = configloader.ProjectParser.fromYaml(tenant, layout, [{
+ project_template_parser = configloader.ProjectTemplateParser(
+ tenant, layout)
+ project_parser = configloader.ProjectParser(
+ tenant, layout, project_template_parser)
+ project_config = project_parser.fromYaml([{
'_source_context': self.context,
'_start_mark': self.start_mark,
'name': 'project',
@@ -554,7 +563,7 @@
change = model.Change(self.project)
change.branch = 'master'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -568,7 +577,7 @@
change.branch = 'stable/diablo'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -609,7 +618,11 @@
})
layout.addJob(python27)
- project_config = configloader.ProjectParser.fromYaml(tenant, layout, [{
+ project_template_parser = configloader.ProjectTemplateParser(
+ tenant, layout)
+ project_parser = configloader.ProjectParser(
+ tenant, layout, project_template_parser)
+ project_config = project_parser.fromYaml([{
'_source_context': self.context,
'_start_mark': self.start_mark,
'name': 'project',
@@ -625,7 +638,7 @@
change.branch = 'master'
change.files = ['/COMMIT_MSG', 'ignored-file']
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertFalse(python27.changeMatches(change))
@@ -682,8 +695,12 @@
context2 = model.SourceContext(project2, 'master',
'test', True)
- project2_config = configloader.ProjectParser.fromYaml(
- self.tenant, self.layout, [{
+ project_template_parser = configloader.ProjectTemplateParser(
+ self.tenant, self.layout)
+ project_parser = configloader.ProjectParser(
+ self.tenant, self.layout, project_template_parser)
+ project2_config = project_parser.fromYaml(
+ [{
'_source_context': context2,
'_start_mark': self.start_mark,
'name': 'project2',
@@ -700,7 +717,7 @@
# Test master
change.branch = 'master'
item = self.queue.enqueueChange(change)
- item.current_build_set.layout = self.layout
+ item.layout = self.layout
with testtools.ExpectedException(
Exception,
"Project project2 is not allowed to run job job"):
@@ -718,8 +735,12 @@
self.layout.addJob(job)
- project_config = configloader.ProjectParser.fromYaml(
- self.tenant, self.layout, [{
+ project_template_parser = configloader.ProjectTemplateParser(
+ self.tenant, self.layout)
+ project_parser = configloader.ProjectParser(
+ self.tenant, self.layout, project_template_parser)
+ project_config = project_parser.fromYaml(
+ [{
'_source_context': self.context,
'_start_mark': self.start_mark,
'name': 'project',
@@ -736,7 +757,7 @@
# Test master
change.branch = 'master'
item = self.queue.enqueueChange(change)
- item.current_build_set.layout = self.layout
+ item.layout = self.layout
with testtools.ExpectedException(
Exception,
"Pre-review pipeline gate does not allow post-review job"):
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index ba7523c..d3f9ddb 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -29,6 +29,7 @@
def setUp(self):
super(TestNodepool, self).setUp()
+ self.statsd = None
self.zk_chroot_fixture = self.useFixture(
ChrootedKazooFixture(self.id()))
self.zk_config = '%s:%s%s' % (
@@ -76,7 +77,7 @@
self.assertEqual(request.state, 'fulfilled')
# Accept the nodes
- self.nodepool.acceptNodes(request)
+ self.nodepool.acceptNodes(request, request.id)
nodeset = request.nodeset
for node in nodeset.getNodes():
@@ -125,3 +126,47 @@
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 0)
+
+ def test_accept_nodes_resubmitted(self):
+ # Test that a resubmitted request would not lock nodes
+
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+ nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+ request = self.nodepool.requestNodes(None, job)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ # Accept the nodes, passing a different ID
+ self.nodepool.acceptNodes(request, "invalid")
+ nodeset = request.nodeset
+
+ for node in nodeset.getNodes():
+ self.assertIsNone(node.lock)
+ self.assertEqual(node.state, 'ready')
+
+ def test_accept_nodes_lost_request(self):
+ # Test that a lost request would not lock nodes
+
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+ nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+ request = self.nodepool.requestNodes(None, job)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ self.zk.deleteNodeRequest(request)
+
+ # Accept the nodes
+ self.nodepool.acceptNodes(request, request.id)
+ nodeset = request.nodeset
+
+ for node in nodeset.getNodes():
+ self.assertIsNone(node.lock)
+ self.assertEqual(node.state, 'ready')
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 2dcd9bf..3608ef0 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -89,25 +89,44 @@
self.assertEqual(self.getJobFromHistory('project-test2').node,
'label1')
+ for stat in self.statsd.stats:
+ k, v = stat.decode('utf-8').split(':')
+ self.log.debug('stat %s:%s', k, v)
# TODOv3(jeblair): we may want to report stats by tenant (also?).
# Per-driver
self.assertReportedStat('zuul.event.gerrit.comment-added', value='1|c')
# Per-driver per-connection
self.assertReportedStat('zuul.event.gerrit.gerrit.comment-added',
value='1|c')
- self.assertReportedStat('zuul.pipeline.gate.current_changes',
- value='1|g')
- self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
- kind='ms')
- self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
- value='1|c')
- self.assertReportedStat('zuul.pipeline.gate.resident_time', kind='ms')
- self.assertReportedStat('zuul.pipeline.gate.total_changes',
- value='1|c')
self.assertReportedStat(
- 'zuul.pipeline.gate.org.project.resident_time', kind='ms')
+ 'zuul.tenant.tenant-one.pipeline.gate.current_changes',
+ value='1|g')
self.assertReportedStat(
- 'zuul.pipeline.gate.org.project.total_changes', value='1|c')
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.job.project-merge.SUCCESS', kind='ms')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.job.project-merge.SUCCESS', value='1|c')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.resident_time', kind='ms')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.total_changes', value='1|c')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.resident_time', kind='ms')
+ self.assertReportedStat(
+ 'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+ 'org_project.master.total_changes', value='1|c')
+ exec_key = 'zuul.executor.%s' % self.executor_server.hostname
+ self.assertReportedStat(exec_key + '.builds', value='1|c')
+ self.assertReportedStat('zuul.nodepool.requested', value='1|c')
+ self.assertReportedStat('zuul.nodepool.requested.label.label1',
+ value='1|c')
+ self.assertReportedStat('zuul.nodepool.fulfilled.label.label1',
+ value='1|c')
+ self.assertReportedStat('zuul.nodepool.requested.size.1', value='1|c')
+ self.assertReportedStat('zuul.nodepool.fulfilled.size.1', value='1|c')
+ self.assertReportedStat('zuul.nodepool.current_requests', value='1|g')
for build in self.history:
self.assertTrue(build.parameters['zuul']['voting'])
@@ -1494,6 +1513,31 @@
held_nodes += 1
self.assertEqual(held_nodes, 1)
+ @simple_layout('layouts/autohold.yaml')
+ def test_autohold_list(self):
+ client = zuul.rpcclient.RPCClient('127.0.0.1',
+ self.gearman_server.port)
+ self.addCleanup(client.shutdown)
+
+ r = client.autohold('tenant-one', 'org/project', 'project-test2',
+ "reason text", 1)
+ self.assertTrue(r)
+
+ autohold_requests = client.autohold_list()
+ self.assertNotEqual({}, autohold_requests)
+ self.assertEqual(1, len(autohold_requests.keys()))
+
+ # The single dict key should be a CSV string value
+ key = list(autohold_requests.keys())[0]
+ tenant, project, job = key.split(',')
+
+ self.assertEqual('tenant-one', tenant)
+ self.assertIn('org/project', project)
+ self.assertEqual('project-test2', job)
+
+ # Note: the value is converted from set to list by json.
+ self.assertEqual([1, "reason text"], autohold_requests[key])
+
@simple_layout('layouts/three-projects.yaml')
def test_dependent_behind_dequeue(self):
# This particular test does a large amount of merges and needs a little
@@ -2143,8 +2187,7 @@
def test_statsd(self):
"Test each of the statsd methods used in the scheduler"
- import extras
- statsd = extras.try_import('statsd.statsd')
+ statsd = self.sched.statsd
statsd.incr('test-incr')
statsd.timing('test-timing', 3)
statsd.gauge('test-gauge', 12)
@@ -2269,6 +2312,58 @@
self.assertEqual(set(['project-test-nomatch-starts-empty',
'project-test-nomatch-starts-full']), run_jobs)
+ @simple_layout('layouts/job-vars.yaml')
+ def test_inherited_job_variables(self):
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='parentjob', result='SUCCESS'),
+ dict(name='child1', result='SUCCESS'),
+ dict(name='child2', result='SUCCESS'),
+ dict(name='child3', result='SUCCESS'),
+ ], ordered=False)
+ j = self.getJobFromHistory('parentjob')
+ rp = set([p['name'] for p in j.parameters['projects']])
+ self.assertEqual(j.parameters['vars']['override'], 0)
+ self.assertEqual(j.parameters['vars']['child1override'], 0)
+ self.assertEqual(j.parameters['vars']['parent'], 0)
+ self.assertFalse('child1' in j.parameters['vars'])
+ self.assertFalse('child2' in j.parameters['vars'])
+ self.assertFalse('child3' in j.parameters['vars'])
+ self.assertEqual(rp, set(['org/project', 'org/project0',
+ 'org/project0']))
+ j = self.getJobFromHistory('child1')
+ rp = set([p['name'] for p in j.parameters['projects']])
+ self.assertEqual(j.parameters['vars']['override'], 1)
+ self.assertEqual(j.parameters['vars']['child1override'], 1)
+ self.assertEqual(j.parameters['vars']['parent'], 0)
+ self.assertEqual(j.parameters['vars']['child1'], 1)
+ self.assertFalse('child2' in j.parameters['vars'])
+ self.assertFalse('child3' in j.parameters['vars'])
+ self.assertEqual(rp, set(['org/project', 'org/project0',
+ 'org/project1']))
+ j = self.getJobFromHistory('child2')
+ rp = set([p['name'] for p in j.parameters['projects']])
+ self.assertEqual(j.parameters['vars']['override'], 2)
+ self.assertEqual(j.parameters['vars']['child1override'], 0)
+ self.assertEqual(j.parameters['vars']['parent'], 0)
+ self.assertFalse('child1' in j.parameters['vars'])
+ self.assertEqual(j.parameters['vars']['child2'], 2)
+ self.assertFalse('child3' in j.parameters['vars'])
+ self.assertEqual(rp, set(['org/project', 'org/project0',
+ 'org/project2']))
+ j = self.getJobFromHistory('child3')
+ rp = set([p['name'] for p in j.parameters['projects']])
+ self.assertEqual(j.parameters['vars']['override'], 3)
+ self.assertEqual(j.parameters['vars']['child1override'], 0)
+ self.assertEqual(j.parameters['vars']['parent'], 0)
+ self.assertFalse('child1' in j.parameters['vars'])
+ self.assertFalse('child2' in j.parameters['vars'])
+ self.assertEqual(j.parameters['vars']['child3'], 3)
+ self.assertEqual(rp, set(['org/project', 'org/project0',
+ 'org/project3']))
+
def test_queue_names(self):
"Test shared change queue names"
tenant = self.sched.abide.tenants.get('tenant-one')
@@ -2411,6 +2506,28 @@
self.assertIn('project-merge', status_jobs[1]['dependencies'])
self.assertIn('project-merge', status_jobs[2]['dependencies'])
+ def test_reconfigure_merge(self):
+ """Test that two reconfigure events are merged"""
+
+ tenant = self.sched.abide.tenants['tenant-one']
+ (trusted, project) = tenant.getProject('org/project')
+
+ self.sched.run_handler_lock.acquire()
+ self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
+ self.sched.reconfigureTenant(tenant, project)
+ self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+ self.sched.reconfigureTenant(tenant, project)
+ # The second event should have been combined with the first
+ # so we should still only have one entry.
+ self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+ self.sched.run_handler_lock.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
def test_live_reconfiguration(self):
"Test that live reconfiguration works"
self.executor_server.hold_jobs_in_build = True
@@ -4544,6 +4661,50 @@
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
+ def test_job_aborted(self):
+ "Test that if an executor server aborts a job, it is run again"
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 2)
+
+ # Abort a held build four times in a row, releasing the held
+ # '-test' builds each round. Every round should leave exactly one
+ # held build: the re-run of the job that was just aborted.
+ for attempt in range(1, 5):
+ self.builds[0].aborted = True
+ self.executor_server.release('.*-test.*')
+ self.waitUntilSettled()
+ self.assertEqual(len(self.builds), 1,
+ "abort #%d should leave one held build" % attempt)
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.history), 7)
+ self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 4)
+ self.assertEqual(self.countJobResults(self.history, 'SUCCESS'), 3)
+
def test_rerun_on_abort(self):
"Test that if a execute server fails to run a job, it is run again"
@@ -4613,6 +4774,78 @@
self.assertIn('project-test1 : SKIPPED', A.messages[1])
self.assertIn('project-test2 : SKIPPED', A.messages[1])
+ def test_nodepool_priority(self):
+ "Test that nodes are requested at the correct priority"
+
+ self.fake_nodepool.paused = True
+ try:
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
+
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ C.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
+
+ self.waitUntilSettled()
+
+ reqs = self.fake_nodepool.getNodeRequests()
+
+ # The requests come back sorted by oid. There is one request per
+ # change, and each change is in a pipeline with a different
+ # priority. As the assertions below show, the oid begins with
+ # that priority (100 gate, 200 check, 300 post) and ends with a
+ # serial number assigned in the order the requests were
+ # received, so the number at the end of the oid maps to the
+ # order in which the requests were submitted.
+
+ # * gate first - high priority - change C
+ self.assertEqual(reqs[0]['_oid'], '100-0000000002')
+ self.assertEqual(reqs[0]['node_types'], ['label1'])
+ # * check second - normal priority - change B
+ self.assertEqual(reqs[1]['_oid'], '200-0000000001')
+ self.assertEqual(reqs[1]['node_types'], ['label1'])
+ # * post third - low priority - change A
+ # additionally, the post job defined uses an ubuntu-xenial node,
+ # so we include that check just as an extra verification
+ self.assertEqual(reqs[2]['_oid'], '300-0000000000')
+ self.assertEqual(reqs[2]['node_types'], ['ubuntu-xenial'])
+ finally:
+ # Unpause even if an assertion failed, so the fake nodepool is
+ # not left paused.
+ self.fake_nodepool.paused = False
+ self.waitUntilSettled()
+
+ @simple_layout('layouts/multiple-templates.yaml')
+ def test_multiple_project_templates(self):
+ """Applying several project templates to one project must not
+ alter those templates when a second project uses them."""
+ change = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
+ self.fake_gerrit.addEvent(change.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ py27 = self.getJobFromHistory('py27')
+ self.assertEqual(py27.parameters['zuul']['jobtags'], [])
+
+ def test_pending_merge_in_reconfig(self):
+ """An outstanding merge job survives a tenant reconfiguration.
+
+ If a merge is still pending when the tenant is reconfigured, the
+ scheduler should keep waiting for it instead of dropping the item.
+ """
+ self.gearman_server.hold_merge_jobs_in_queue = True
+ change = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ change.setMerged()
+ self.fake_gerrit.addEvent(change.getRefUpdatedEvent())
+ self.waitUntilSettled()
+
+ # Reconfigure while the merge job is still held in the queue.
+ self.sched.reconfigureTenant(self.sched.abide.tenants['tenant-one'],
+ None)
+ self.waitUntilSettled()
+
+ # The merge job must still be outstanding, and the item must still
+ # be in the post pipeline.
+ self.assertEqual(len(self.sched.merger.jobs), 1)
+ post = self.sched.abide.tenants.get('tenant-one').layout.pipelines['post']
+ self.assertEqual(len(post.getAllItems()), 1)
+
+ self.gearman_server.hold_merge_jobs_in_queue = False
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
class TestExecutor(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
@@ -4830,6 +5063,42 @@
self.assertEqual(self.getJobFromHistory('project-test6').result,
'SUCCESS')
+ def test_unimplied_branch_matchers(self):
+ # This tests that there are no implied branch matchers added
+ # by project templates: a change to the new 'stable' branch still
+ # runs the template's project-test1 job.
+ self.create_branch('org/layered-project', 'stable')
+
+ A = self.fake_gerrit.addFakeChange(
+ 'org/layered-project', 'stable', 'A')
+
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(self.getJobFromHistory('project-test1').result,
+ 'SUCCESS')
+
+ def test_implied_branch_matchers(self):
+ # This tests that there is an implied branch matcher when a
+ # template is used on an in-repo project pipeline definition.
+ self.create_branch('untrusted-config', 'stable')
+ self.fake_gerrit.addEvent(
+ self.fake_gerrit.getFakeBranchCreatedEvent(
+ 'untrusted-config', 'stable'))
+ self.waitUntilSettled()
+
+ A = self.fake_gerrit.addFakeChange(
+ 'untrusted-config', 'stable', 'A')
+
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(self.getJobFromHistory('project-test1').result,
+ 'SUCCESS')
+
class TestSchedulerSuccessURL(ZuulTestCase):
tenant_config_file = 'config/success-url/main.yaml'
@@ -5503,7 +5772,7 @@
queue = queue_candidate
break
queue_item = queue.queue[0]
- item_dynamic_layout = queue_item.current_build_set.layout
+ item_dynamic_layout = queue_item.layout
dynamic_test_semaphore = \
item_dynamic_layout.semaphores.get('test-semaphore')
self.assertEqual(dynamic_test_semaphore.max, 1)
@@ -5556,3 +5825,30 @@
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
+
+
+class TestSchedulerBranchMatcher(ZuulTestCase):
+
+ @simple_layout('layouts/matcher-test.yaml')
+ def test_job_branch_ignored(self):
+ """Verify that a job's branch matcher is honored.
+
+ The 'ignore-branch' job is meant to match every branch except
+ 'featureA', so a change proposed to 'featureA' must not run it.
+ """
+ self.create_branch('org/project', 'featureA')
+ change = self.fake_gerrit.addFakeChange('org/project', 'featureA', 'A')
+ change.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(change.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.printHistory()
+
+ self.assertEqual(self.getJobFromHistory('project-test1').result,
+ 'SUCCESS')
+ self.assertJobNotInHistory('ignore-branch')
+ self.assertEqual(change.data['status'], 'MERGED')
+ self.assertEqual(change.reported, 2,
+ "A should report start and success")
+ self.assertIn('gate', change.messages[1],
+ "A should transit gate")
diff --git a/tests/unit/test_stack_dump.py b/tests/unit/test_stack_dump.py
index 824e04c..13e83c6 100644
--- a/tests/unit/test_stack_dump.py
+++ b/tests/unit/test_stack_dump.py
@@ -31,4 +31,5 @@
zuul.cmd.stack_dump_handler(signal.SIGUSR2, None)
self.assertIn("Thread", self.log_fixture.output)
+ self.assertIn("MainThread", self.log_fixture.output)
self.assertIn("test_stack_dump_logs", self.log_fixture.output)
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 1c633ba..b30d710 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -14,9 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
+import io
import json
+import logging
import os
import textwrap
+import gc
+import time
+from unittest import skip
import testtools
@@ -151,6 +156,29 @@
self.assertIn('Unable to inherit from final job', A.messages[0])
+class TestBranchVariants(ZuulTestCase):
+ tenant_config_file = 'config/branch-variants/main.yaml'
+
+ def test_branch_variants(self):
+ """Branch variants of inherited jobs yield the expected
+ pre-playbooks (three) for a change on a new branch."""
+ executor = self.executor_server
+ executor.hold_jobs_in_build = True
+ # Branching from master gives 'stable' a copy of master's config.
+ self.create_branch('puppet-integration', 'stable')
+ self.fake_gerrit.addEvent(
+ self.fake_gerrit.getFakeBranchCreatedEvent(
+ 'puppet-integration', 'stable'))
+ self.waitUntilSettled()
+
+ change = self.fake_gerrit.addFakeChange('puppet-integration',
+ 'stable', 'A')
+ self.fake_gerrit.addEvent(change.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ pre_playbooks = self.builds[0].parameters['pre_playbooks']
+ self.assertEqual(len(pre_playbooks), 3)
+ executor.hold_jobs_in_build = False
+ executor.release()
+ self.waitUntilSettled()
+
+
class TestInRepoConfig(ZuulTestCase):
# A temporary class to hold new tests while others are disabled
@@ -170,6 +198,39 @@
self.assertIn('tenant-one-gate', A.messages[1],
"A should transit tenant-one gate")
+ @skip("This test is useful, but not reliable")
+ def test_full_and_dynamic_reconfig(self):
+ """Count live Pipeline objects after a dynamic config change is
+ enqueued and a full reconfiguration follows.
+
+ The expected count of 4 is taken from the assertion below; it is
+ presumably the number of configured pipelines (TODO confirm), so
+ stale layouts must not keep extra pipelines alive.
+ """
+ self.executor_server.hold_jobs_in_build = True
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+
+ - project:
+ name: org/project
+ tenant-one-gate:
+ jobs:
+ - project-test1
+ """)
+
+ change = self.fake_gerrit.addFakeChange(
+ 'org/project', 'master', 'A', files={'.zuul.yaml': in_repo_conf})
+ change.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(change.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.sched.reconfigure(self.config)
+ self.waitUntilSettled()
+
+ gc.collect()
+ live_pipelines = sum(1 for obj in gc.get_objects()
+ if isinstance(obj, zuul.model.Pipeline))
+ self.assertEqual(live_pipelines, 4)
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
def test_dynamic_config(self):
in_repo_conf = textwrap.dedent(
"""
@@ -696,6 +757,47 @@
self.assertIn('the only project definition permitted', A.messages[0],
"A should have a syntax error reported")
+ def test_untrusted_depends_on_trusted(self):
+ # A change to an untrusted project that Depends-On a change to a
+ # config project must be reported as a failure explaining why.
+ with open(os.path.join(FIXTURE_DIR,
+ 'config/in-repo/git/',
+ 'common-config/zuul.yaml')) as f:
+ common_config = f.read()
+
+ common_config += textwrap.dedent(
+ """
+ - job:
+ name: project-test9
+ """)
+
+ file_dict = {'zuul.yaml': common_config}
+ A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A',
+ files=file_dict)
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+ - project:
+ name: org/project
+ check:
+ jobs:
+ - project-test9
+ """)
+
+ file_dict = {'zuul.yaml': in_repo_conf}
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+ files=file_dict)
+ B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ B.subject, A.data['id'])
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(B.reported, 1,
+ "B should report failure")
+ self.assertIn('depends on a change to a config project',
+ B.messages[0],
+ "B should report that it depends on a config project change")
+
def test_duplicate_node_error(self):
in_repo_conf = textwrap.dedent(
"""
@@ -816,6 +918,150 @@
A.messages[0],
"A should have a syntax error reported")
+ def test_job_list_in_project_template_not_dict_error(self):
+ """A malformed job entry in a project-template's job list is
+ reported back on the change as a syntax error."""
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+ - project-template:
+ name: some-jobs
+ check:
+ jobs:
+ - project-test1:
+ - required-projects:
+ org/project2
+ """)
+
+ change = self.fake_gerrit.addFakeChange(
+ 'org/project', 'master', 'A', files={'.zuul.yaml': in_repo_conf})
+ change.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(change.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(change.data['status'], 'NEW')
+ self.assertEqual(change.reported, 1,
+ "A should report failure")
+ self.assertIn('expected str for dictionary value',
+ change.messages[0], "A should have a syntax error reported")
+
+ def test_job_list_in_project_not_dict_error(self):
+ """A malformed job entry in a project's job list is reported back
+ on the change as a syntax error."""
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+ - project:
+ name: org/project1
+ check:
+ jobs:
+ - project-test1:
+ - required-projects:
+ org/project2
+ """)
+
+ change = self.fake_gerrit.addFakeChange(
+ 'org/project', 'master', 'A', files={'.zuul.yaml': in_repo_conf})
+ change.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(change.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(change.data['status'], 'NEW')
+ self.assertEqual(change.reported, 1,
+ "A should report failure")
+ self.assertIn('expected str for dictionary value',
+ change.messages[0], "A should have a syntax error reported")
+
+ def test_project_template(self):
+ """A project template is left unmodified when applied, so it can
+ be reused in subsequent reconfigurations."""
+ template_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+ - project-template:
+ name: some-jobs
+ tenant-one-gate:
+ jobs:
+ - project-test1:
+ required-projects:
+ - org/project1
+ - project:
+ name: org/project
+ templates:
+ - some-jobs
+ """)
+ first = self.fake_gerrit.addFakeChange(
+ 'org/project', 'master', 'A', files={'.zuul.yaml': template_conf})
+ first.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(first.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.assertEqual(first.data['status'], 'MERGED')
+ self.fake_gerrit.addEvent(first.getChangeMergedEvent())
+ self.waitUntilSettled()
+
+ # Apply the same template from a second project.
+ reuse_conf = textwrap.dedent(
+ """
+ - project:
+ name: org/project1
+ templates:
+ - some-jobs
+ """)
+ second = self.fake_gerrit.addFakeChange(
+ 'org/project1', 'master', 'B', files={'.zuul.yaml': reuse_conf})
+ second.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(second.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.assertEqual(second.data['status'], 'MERGED')
+
+ def test_job_remove_add(self):
+ """A job can be removed from one repo and added to another by a
+ pair of dependent changes."""
+ # Drop project1's current config first, since it references the
+ # job that is about to be removed.
+ cleanup = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A',
+ files={'.zuul.yaml': None})
+ cleanup.setMerged()
+ self.fake_gerrit.addEvent(cleanup.getChangeMergedEvent())
+ self.waitUntilSettled()
+
+ # Propose deleting the job from org/project...
+ removal = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+ files={'.zuul.yaml': None})
+ self.fake_gerrit.addEvent(removal.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # ...and, depending on that, re-add it in org/project1.
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: project-test1
+
+ - project:
+ name: org/project1
+ check:
+ jobs:
+ - project-test1
+ """)
+ in_repo_playbook = textwrap.dedent(
+ """
+ - hosts: all
+ tasks: []
+ """)
+ addition = self.fake_gerrit.addFakeChange(
+ 'org/project1', 'master', 'C',
+ files={'.zuul.yaml': in_repo_conf,
+ 'playbooks/project-test1.yaml': in_repo_playbook},
+ parent='refs/changes/1/1/1')
+ addition.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ addition.subject, removal.data['id'])
+ self.fake_gerrit.addEvent(addition.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name='project-test1', result='SUCCESS', changes='2,1 3,1'),
+ ], ordered=False)
+
def test_multi_repo(self):
downstream_repo_conf = textwrap.dedent(
"""
@@ -1215,6 +1461,40 @@
"The file %s should exist" % post_flag_path)
+class TestPostPlaybooks(AnsibleZuulTestCase):
+ tenant_config_file = 'config/post-playbook/main.yaml'
+
+ def _wait_until(self, condition, description, timeout=90):
+ # Poll condition() every 0.1s; fail the test with a clear message
+ # instead of hanging forever if it is still false after timeout
+ # seconds.
+ deadline = time.time() + timeout
+ while not condition():
+ if time.time() > deadline:
+ self.fail("Timed out waiting for %s" % description)
+ time.sleep(0.1)
+
+ def test_post_playbook_abort(self):
+ # Test that when we abort a job in the post playbook, that we
+ # don't send back POST_FAILURE.
+ self.executor_server.verbose = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+ self._wait_until(lambda: self.builds, "the build to start")
+ build = self.builds[0]
+
+ post_start = os.path.join(self.test_root, build.uuid +
+ '.post_start.flag')
+ self._wait_until(lambda: os.path.exists(post_start),
+ "the post playbook to start")
+ # The post playbook has started, abort the job
+ self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
+ self.waitUntilSettled()
+
+ build = self.getJobFromHistory('python27')
+ self.assertEqual('ABORTED', build.result)
+
+ post_end = os.path.join(self.test_root, build.uuid +
+ '.post_end.flag')
+ self.assertTrue(os.path.exists(post_start))
+ self.assertFalse(os.path.exists(post_end))
+
class TestBrokenConfig(ZuulTestCase):
# Test that we get an appropriate syntax error if we start with a
# broken config.
@@ -1665,7 +1945,8 @@
return f.read()
def test_job_output(self):
- # Verify that command standard output appears in the job output
+ # Verify that command standard output appears in the job output,
+ # and that failures in the final playbook get logged.
# This currently only verifies we receive output from
# localhost. Notably, it does not verify we receive output
@@ -1690,3 +1971,36 @@
self.assertIn(token,
self._get_file(self.history[0],
'work/logs/job-output.txt'))
+
+ def test_job_output_failure_log(self):
+ logger = logging.getLogger('zuul.AnsibleJob')
+ output = io.StringIO()
+ handler = logging.StreamHandler(output)
+ logger.addHandler(handler)
+ # logging.getLogger() returns a process-wide logger; detach the
+ # capture handler when this test finishes so later tests do not keep
+ # writing into this StringIO.
+ self.addCleanup(logger.removeHandler, handler)
+
+ # Verify that a failure in the last post playbook emits the contents
+ # of the json output to the log
+ self.executor_server.keep_jobdir = True
+ A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='job-output-failure',
+ result='POST_FAILURE', changes='1,1'),
+ ], ordered=False)
+
+ token = 'Standard output test %s' % (self.history[0].jobdir.src_root)
+ j = json.loads(self._get_file(self.history[0],
+ 'work/logs/job-output.json'))
+ self.assertEqual(token,
+ j[0]['plays'][0]['tasks'][0]
+ ['hosts']['localhost']['stdout'])
+
+ self.assertIn(token,
+ self._get_file(self.history[0],
+ 'work/logs/job-output.txt'))
+
+ log_output = output.getvalue()
+ self.assertIn('Final playbook failed', log_output)
+ self.assertIn('Failure test', log_output)
diff --git a/tools/run-migration.sh b/tools/run-migration.sh
index be297f4..618fc56 100755
--- a/tools/run-migration.sh
+++ b/tools/run-migration.sh
@@ -47,12 +47,12 @@
BASE_DIR=$(cd $(dirname $0)/../..; pwd)
cd $BASE_DIR/project-config
-if [[ $FINAL ]] ; then
+if [[ $FINAL = 1 ]] ; then
git reset --hard
fi
python3 $BASE_DIR/zuul/zuul/cmd/migrate.py --mapping=zuul/mapping.yaml \
zuul/layout.yaml jenkins/jobs nodepool/nodepool.yaml . $VERBOSE
-if [[ $FINAL ]] ; then
+if [[ $FINAL = 1 ]] ; then
find ../openstack-zuul-jobs/playbooks/legacy -maxdepth 1 -mindepth 1 \
-type d | xargs rm -rf
mv zuul.d/zuul-legacy-* ../openstack-zuul-jobs/zuul.d/
diff --git a/tools/zuul-changes.py b/tools/zuul-changes.py
index 8b854c7..d258354 100755
--- a/tools/zuul-changes.py
+++ b/tools/zuul-changes.py
@@ -20,14 +20,15 @@
parser = argparse.ArgumentParser()
parser.add_argument('url', help='The URL of the running Zuul instance')
-parser.add_argument('pipeline_name', help='The name of the Zuul pipeline')
+parser.add_argument('tenant', help='The Zuul tenant')
+parser.add_argument('pipeline', help='The name of the Zuul pipeline')
options = parser.parse_args()
data = urllib2.urlopen('%s/status.json' % options.url).read()
data = json.loads(data)
for pipeline in data['pipelines']:
- if pipeline['name'] != options.pipeline_name:
+ if pipeline['name'] != options.pipeline:
continue
for queue in pipeline['change_queues']:
for head in queue['heads']:
@@ -36,9 +37,10 @@
continue
cid, cps = change['id'].split(',')
print(
- "zuul enqueue --trigger gerrit --pipeline %s "
- "--project %s --change %s,%s" % (
- options.pipeline_name,
+ "zuul enqueue --tenant %s --trigger gerrit "
+ "--pipeline %s --project %s --change %s,%s" % (
+ options.tenant,
+ options.pipeline,
change['project'],
cid, cps)
)
diff --git a/tox.ini b/tox.ini
index cc5ea58..7e84677 100644
--- a/tox.ini
+++ b/tox.ini
@@ -5,10 +5,7 @@
[testenv]
basepython = python3
-# Set STATSD env variables so that statsd code paths are tested.
-setenv = STATSD_HOST=127.0.0.1
- STATSD_PORT=8125
- VIRTUAL_ENV={envdir}
+setenv = VIRTUAL_ENV={envdir}
OS_TEST_TIMEOUT=120
passenv = ZUUL_TEST_ROOT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE OS_LOG_DEFAULTS
usedevelop = True
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 0a266df..8ba3b86 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -168,32 +168,8 @@
host=host.name,
filename=included_file._filename))
- def _emit_playbook_banner(self):
- # Get the hostvars from just one host - the vars we're looking for will
- # be identical on all of them
- hostvars = next(iter(self._play._variable_manager._hostvars.values()))
- self._playbook_name = None
-
- phase = hostvars.get('zuul_execution_phase', '')
- playbook = hostvars.get('zuul_execution_canonical_name_and_path')
- trusted = hostvars.get('zuul_execution_trusted')
- trusted = 'trusted' if trusted == "True" else 'untrusted'
- branch = hostvars.get('zuul_execution_branch')
-
- if phase and phase != 'run':
- phase = '{phase}-run'.format(phase=phase)
- phase = phase.upper()
-
- self._log("{phase} [{trusted} : {playbook}@{branch}]".format(
- trusted=trusted, phase=phase, playbook=playbook, branch=branch))
-
def v2_playbook_on_play_start(self, play):
self._play = play
-
- # We can't fill in this information until the first play
- if self._playbook_name:
- self._emit_playbook_banner()
-
# Log an extra blank line to get space before each play
self._log("")
diff --git a/zuul/ansible/filter/zuul_filters.py b/zuul/ansible/filter/zuul_filters.py
index 17ef2bb..fa21f6b 100644
--- a/zuul/ansible/filter/zuul_filters.py
+++ b/zuul/ansible/filter/zuul_filters.py
@@ -14,10 +14,19 @@
def zuul_legacy_vars(zuul):
- # omitted:
- # ZUUL_URL
- # ZUUL_REF
+ # intentionally omitted:
+ # BASE_LOG_PATH
+ # JOB_TAGS
+ # LOG_PATH
# ZUUL_COMMIT
+ # ZUUL_REF
+ # ZUUL_URL
+ #
+ # newly added to all builds:
+ # ZUUL_SHORT_PROJECT_NAME
+ #
+ # existing in most builds but newly added for periodic:
+ # ZUUL_BRANCH
short_name = zuul['project']['name'].split('/')[-1]
params = dict(ZUUL_UUID=zuul['build'],
@@ -26,6 +35,8 @@
ZUUL_PIPELINE=zuul['pipeline'],
ZUUL_VOTING=zuul['voting'],
WORKSPACE='/home/zuul/workspace')
+ if 'timeout' in zuul and zuul['timeout'] is not None:
+ params['BUILD_TIMEOUT'] = str(int(zuul['timeout']) * 1000)
if 'branch' in zuul:
params['ZUUL_BRANCH'] = zuul['branch']
@@ -34,7 +45,7 @@
['%s:%s:refs/changes/%s/%s/%s' % (
i['project']['name'],
i['branch'],
- str(i['change'])[:-2:],
+ # Gerrit shards change refs by the last two digits of the change
+ # number, zero-padded: change 5 lives under refs/changes/05/5/<ps>.
+ str(i['change'])[-2:].zfill(2),
i['change'],
i['patchset'])
for i in zuul['items']])
diff --git a/zuul/ansible/paths.py b/zuul/ansible/paths.py
index 04daef4..05f9fdc 100644
--- a/zuul/ansible/paths.py
+++ b/zuul/ansible/paths.py
@@ -24,7 +24,7 @@
def _is_safe_path(path):
full_path = os.path.realpath(os.path.abspath(os.path.expanduser(path)))
- if not full_path.startswith(os.path.abspath(os.path.curdir)):
+ # Resolve the home directory the same way as the candidate path, and
+ # compare whole path components: a bare string-prefix test would also
+ # accept siblings such as "/home/zuul-other" when home is "/home/zuul".
+ home = os.path.realpath(os.path.abspath(os.path.expanduser('~')))
+ if full_path != home and not full_path.startswith(
+ os.path.join(home, '')):
return False
return True
diff --git a/zuul/change_matcher.py b/zuul/change_matcher.py
index baea217..7f6673d 100644
--- a/zuul/change_matcher.py
+++ b/zuul/change_matcher.py
@@ -60,11 +60,13 @@
class BranchMatcher(AbstractChangeMatcher):
def matches(self, change):
- return (
- (hasattr(change, 'branch') and self.regex.match(change.branch)) or
- (hasattr(change, 'ref') and
- change.ref is not None and self.regex.match(change.ref))
- )
+ # Objects with a 'branch' attribute are matched on the branch alone.
+ # Anything else falls back to its 'ref' attribute, which may be
+ # missing or None; in that case there is nothing to match.
+ if hasattr(change, 'branch'):
+ if self.regex.match(change.branch):
+ return True
+ return False
+ ref = getattr(change, 'ref', None)
+ if ref is not None and self.regex.match(ref):
+ return True
+ return False
class FileMatcher(AbstractChangeMatcher):
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index a5db9a6..86f7f12 100755
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -23,8 +23,10 @@
import signal
import sys
import traceback
+import threading
yappi = extras.try_import('yappi')
+objgraph = extras.try_import('objgraph')
from zuul.ansible import logconfig
import zuul.lib.connections
@@ -37,23 +39,50 @@
def stack_dump_handler(signum, frame):
signal.signal(signal.SIGUSR2, signal.SIG_IGN)
- log_str = ""
- for thread_id, stack_frame in sys._current_frames().items():
- log_str += "Thread: %s\n" % thread_id
- log_str += "".join(traceback.format_stack(stack_frame))
log = logging.getLogger("zuul.stack_dump")
- log.debug(log_str)
- if yappi:
- if not yappi.is_running():
- yappi.start()
- else:
- yappi.stop()
- yappi_out = io.BytesIO()
- yappi.get_func_stats().print_all(out=yappi_out)
- yappi.get_thread_stats().print_all(out=yappi_out)
- log.debug(yappi_out.getvalue())
- yappi_out.close()
- yappi.clear_stats()
+ # Each section (thread dump, yappi, objgraph) is wrapped separately so
+ # that an error in one is logged without suppressing the others, and
+ # execution reaches the handler re-registration at the end.
+ log.debug("Beginning debug handler")
+ try:
+ threads = {}
+ for t in threading.enumerate():
+ threads[t.ident] = t
+ log_str = ""
+ for thread_id, stack_frame in sys._current_frames().items():
+ # sys._current_frames() can report threads that are absent from
+ # threading.enumerate() (e.g. started via the low-level _thread
+ # module, or started between the two calls). There is no Thread
+ # object to take a name from in that case.
+ thread = threads.get(thread_id)
+ if thread:
+ thread_name = thread.name
+ else:
+ thread_name = '(unknown)'
+ log_str += "Thread: %s %s\n" % (thread_id, thread_name)
+ log_str += "".join(traceback.format_stack(stack_frame))
+ log.debug(log_str)
+ except Exception:
+ log.exception("Thread dump error:")
+ try:
+ if yappi:
+ if not yappi.is_running():
+ log.debug("Starting Yappi")
+ yappi.start()
+ else:
+ log.debug("Stopping Yappi")
+ yappi.stop()
+ yappi_out = io.StringIO()
+ yappi.get_func_stats().print_all(out=yappi_out)
+ yappi.get_thread_stats().print_all(out=yappi_out)
+ log.debug(yappi_out.getvalue())
+ yappi_out.close()
+ yappi.clear_stats()
+ except Exception:
+ log.exception("Yappi error:")
+ try:
+ if objgraph:
+ log.debug("Most common types:")
+ objgraph_out = io.StringIO()
+ objgraph.show_growth(limit=100, file=objgraph_out)
+ log.debug(objgraph_out.getvalue())
+ objgraph_out.close()
+ except Exception:
+ log.exception("Objgraph error:")
+ log.debug("End debug handler")
signal.signal(signal.SIGUSR2, stack_dump_handler)
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 177283e..c9e399a 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -61,6 +61,10 @@
required=False, type=int, default=1)
cmd_autohold.set_defaults(func=self.autohold)
+ cmd_autohold_list = subparsers.add_parser(
+ 'autohold-list', help='list autohold requests')
+ cmd_autohold_list.set_defaults(func=self.autohold_list)
+
cmd_enqueue = subparsers.add_parser('enqueue', help='enqueue a change')
cmd_enqueue.add_argument('--tenant', help='tenant name',
required=True)
@@ -162,6 +166,27 @@
count=self.args.count)
return r
+ def autohold_list(self):
+ # Print all pending autohold requests as a table; returns True.
+ client = zuul.rpcclient.RPCClient(
+ self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
+ requests = client.autohold_list()
+
+ if not requests:
+ print("No autohold requests found")
+ return True
+
+ table = prettytable.PrettyTable(
+ field_names=['Tenant', 'Project', 'Job', 'Count', 'Reason'])
+
+ for csv_key, request in requests.items():
+ # Keys arrive as "tenant,project,job" strings because JSON object
+ # keys must be strings; each value is a (count, reason) pair.
+ tenant, project, job = csv_key.split(',')
+ count, reason = request
+ table.add_row([tenant, project, job, count, reason])
+ print(table)
+ return True
+
def enqueue(self):
client = zuul.rpcclient.RPCClient(
self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
diff --git a/zuul/cmd/executor.py b/zuul/cmd/executor.py
index 63c621d..70c80c5 100755
--- a/zuul/cmd/executor.py
+++ b/zuul/cmd/executor.py
@@ -22,6 +22,7 @@
# instead it depends on lockfile-0.9.1 which uses pidfile.
pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
+import grp
import logging
import os
import pwd
@@ -101,7 +102,10 @@
if os.getuid() != 0:
return
pw = pwd.getpwnam(self.user)
- os.setgroups([])
+ # get a list of supplementary groups for the target user, and make sure
+ # we set them when dropping privileges.
+ groups = [g.gr_gid for g in grp.getgrall() if self.user in g.gr_mem]
+ os.setgroups(groups)
os.setgid(pw.pw_gid)
os.setuid(pw.pw_uid)
os.umask(0o022)
diff --git a/zuul/cmd/migrate.py b/zuul/cmd/migrate.py
index 1f0a602..efb2796 100644
--- a/zuul/cmd/migrate.py
+++ b/zuul/cmd/migrate.py
@@ -42,6 +42,9 @@
import jenkins_jobs.parser
import yaml
+JOB_MATCHERS = {} # type: Dict[str, Dict[str, Dict]]
+TEMPLATES_TO_EXPAND = {} # type: Dict[str, List]
+JOBS_FOR_EXPAND = collections.defaultdict(dict) # type: ignore
JOBS_BY_ORIG_TEMPLATE = {} # type: ignore
SUFFIXES = [] # type: ignore
ENVIRONMENT = '{{ zuul | zuul_legacy_vars }}'
@@ -186,6 +189,37 @@
return
+def normalize_project_expansions():
+ '''
+ Split the per-project matcher expansions collected in JOBS_FOR_EXPAND.
+
+ A job whose matcher info is identical for every project that uses it
+ has that info recorded once in JOB_MATCHERS (applied at job level).
+ For every other job, each project with non-empty matcher info gets
+ the originating template added to TEMPLATES_TO_EXPAND[project].
+ '''
+ remove_from_job_matchers = set()
+ # First find the matchers that are the same for all projects of a job.
+ # The first project's info is always taken as the reference, even if
+ # it is empty, so an empty info never masks a differing one.
+ for job_name, project in JOBS_FOR_EXPAND.items():
+ first = True
+ for project_name, expansion in project.items():
+ if first:
+ JOB_MATCHERS[job_name] = copy.deepcopy(expansion['info'])
+ first = False
+ elif JOB_MATCHERS[job_name] != expansion['info']:
+ # We have different expansions for this job, it can't be
+ # done at the job level
+ remove_from_job_matchers.add(job_name)
+
+ for job_name in remove_from_job_matchers:
+ JOB_MATCHERS.pop(job_name, None)
+
+ # Second, find out which projects need to expand a given template
+ for job_name, project in JOBS_FOR_EXPAND.items():
+ # There is a job-level expansion for this one
+ if job_name in JOB_MATCHERS:
+ continue
+ for project_name, expansion in project.items():
+ # Accumulate across jobs: a project may need several templates
+ # expanded, so never reset a list built while handling an
+ # earlier job.
+ templates = TEMPLATES_TO_EXPAND.setdefault(project_name, [])
+ if expansion['info'] and expansion['template'] not in templates:
+ # There is an expansion for this project
+ templates.append(expansion['template'])
+
+
# from :
# http://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data flake8: noqa
def should_use_block(value):
@@ -885,7 +919,7 @@
elif 'openstack/puppet-openstack-integration' in expanded_projects:
output['parent'] = 'legacy-puppet-openstack-integration'
elif has_artifacts:
- output['parent'] = 'publish-openstack-artifacts'
+ output['parent'] = 'legacy-publish-openstack-artifacts'
elif has_draft:
output['success-url'] = 'html/'
output['run'] = os.path.join(self.job_path, 'run')
@@ -898,8 +932,6 @@
timeout = self.getTimeout()
if timeout:
output['timeout'] = timeout
- output.setdefault('vars', {})
- output['vars']['BUILD_TIMEOUT'] = str(timeout * 1000)
if self.nodes:
if len(self.nodes) == 1:
@@ -910,6 +942,14 @@
if expanded_projects:
output['required-projects'] = sorted(list(set(expanded_projects)))
+ if self.name in JOB_MATCHERS:
+ for k, v in JOB_MATCHERS[self.name].items():
+ if k in output:
+ self.log.error(
+ 'Job %s has attributes directly and from matchers',
+ self.name)
+ output[k] = v
+
return output
def toPipelineDict(self):
@@ -1345,7 +1385,7 @@
for pipeline, value in template.items():
if pipeline == 'name':
continue
- if pipeline not in project:
+ if pipeline not in project or 'jobs' not in project[pipeline]:
project[pipeline] = dict(jobs=[])
project[pipeline]['jobs'].extend(value['jobs'])
@@ -1355,7 +1395,7 @@
return job.orig
return None
- def applyProjectMatchers(self, matchers, project):
+ def applyProjectMatchers(self, matchers, project, final=False):
'''
Apply per-project job matchers to the given project.
@@ -1373,7 +1413,8 @@
self.log.debug(
"Applied irrelevant-files to job %s in project %s",
job, project['name'])
- job = {job: {'irrelevant-files': list(set(files))}}
+ job = {job: {'irrelevant-files':
+ sorted(list(set(files)))}}
elif isinstance(job, dict):
job = job.copy()
job_name = get_single_key(job)
@@ -1387,8 +1428,8 @@
if 'irrelevant-files' not in extras:
extras['irrelevant-files'] = []
extras['irrelevant-files'].extend(files)
- extras['irrelevant-files'] = list(
- set(extras['irrelevant-files']))
+ extras['irrelevant-files'] = sorted(list(
+ set(extras['irrelevant-files'])))
job[job_name] = extras
new_jobs.append(job)
return new_jobs
@@ -1398,17 +1439,61 @@
if k in ('templates', 'name'):
continue
project[k]['jobs'] = processPipeline(
- project[k]['jobs'], job_name_regex, files)
+ project[k].get('jobs', []), job_name_regex, files)
- for matcher in matchers:
- # find the project-specific section
- for skipper in matcher.get('skip-if', []):
- if skipper.get('project'):
- if re.search(skipper['project'], project['name']):
- if 'all-files-match-any' in skipper:
- applyIrrelevantFiles(
- matcher['name'],
- skipper['all-files-match-any'])
+ if matchers:
+ for matcher in matchers:
+ # find the project-specific section
+ for skipper in matcher.get('skip-if', []):
+ if skipper.get('project'):
+ if re.search(skipper['project'], project['name']):
+ if 'all-files-match-any' in skipper:
+ applyIrrelevantFiles(
+ matcher['name'],
+ skipper['all-files-match-any'])
+
+ if not final:
+ return
+
+ for k, v in project.items():
+ if k in ('templates', 'name'):
+ continue
+ jobs = []
+ for job in project[k].get('jobs', []):
+ if isinstance(job, dict):
+ job_name = get_single_key(job)
+ else:
+ job_name = job
+ if job_name in JOB_MATCHERS:
+ jobs.append(job)
+ continue
+ orig_name = self.getOldJobName(job_name)
+ if not orig_name:
+ jobs.append(job)
+ continue
+ orig_name = orig_name.format(
+ name=project['name'].split('/')[1])
+ info = {}
+ for layout_job in self.mapping.layout.get('jobs', []):
+ if 'parameter-function' in layout_job:
+ continue
+ if 'skip-if' in layout_job:
+ continue
+ if re.search(layout_job['name'], orig_name):
+ if not layout_job.get('voting', True):
+ info['voting'] = False
+ if layout_job.get('branch'):
+ info['branches'] = layout_job['branch']
+ if layout_job.get('files'):
+ info['files'] = layout_job['files']
+ if not isinstance(job, dict):
+ job = {job: info}
+ else:
+ job[job_name].update(info)
+
+ jobs.append(job)
+ if jobs:
+ project[k]['jobs'] = jobs
def writeProject(self, project):
'''
@@ -1423,12 +1508,7 @@
if 'name' in project:
new_project['name'] = project['name']
- job_matchers = self.scanForProjectMatchers(project['name'])
- if job_matchers:
- exp_template_names = self.findReferencedTemplateNames(
- job_matchers, project['name'])
- else:
- exp_template_names = []
+ exp_template_names = TEMPLATES_TO_EXPAND.get(project['name'], [])
templates_to_expand = []
if 'template' in project:
@@ -1447,6 +1527,51 @@
new_project[key] = collections.OrderedDict()
if key == 'gate':
for queue in self.change_queues:
+ if (project['name'] not in queue.getProjects() or
+ len(queue.getProjects()) == 1):
+ continue
+ new_project[key]['queue'] = queue.name
+ tmp = [job for job in self.makeNewJobs(value)]
+ # Don't insert into self.job_objects - that was done
+ # in the speculative pass
+ jobs = [job.toPipelineDict() for job in tmp]
+ if jobs:
+ new_project[key]['jobs'] = jobs
+ if not new_project[key]:
+ del new_project[key]
+
+ for name in templates_to_expand:
+ self.expandTemplateIntoProject(name, new_project)
+
+ job_matchers = self.scanForProjectMatchers(project['name'])
+
+ # Need a deep copy after expansion, else our templates end up
+ # also getting this change.
+ new_project = copy.deepcopy(new_project)
+ self.applyProjectMatchers(job_matchers, new_project, final=True)
+
+ return new_project
+
+ def checkSpeculativeProject(self, project):
+ '''
+ Create a new v3 project definition expanding all templates.
+ '''
+ new_project = collections.OrderedDict()
+ if 'name' in project:
+ new_project['name'] = project['name']
+
+ templates_to_expand = []
+ for template in project.get('template', []):
+ templates_to_expand.append(template['name'])
+
+ # We have to do this section to expand self.job_objects
+ for key, value in project.items():
+ if key in ('name', 'template'):
+ continue
+ else:
+ new_project[key] = collections.OrderedDict()
+ if key == 'gate':
+ for queue in self.change_queues:
if project['name'] not in queue.getProjects():
continue
if len(queue.getProjects()) == 1:
@@ -1454,18 +1579,60 @@
new_project[key]['queue'] = queue.name
tmp = [job for job in self.makeNewJobs(value)]
self.job_objects.extend(tmp)
- jobs = [job.toPipelineDict() for job in tmp]
- new_project[key]['jobs'] = jobs
for name in templates_to_expand:
- self.expandTemplateIntoProject(name, new_project)
- # Need a deep copy after expansion, else our templates end up
- # also getting this change.
- new_project = copy.deepcopy(new_project)
- self.applyProjectMatchers(job_matchers, new_project)
+ expand_project = copy.deepcopy(new_project)
+ self.expandTemplateIntoProject(name, expand_project)
- return new_project
+ # Need a deep copy after expansion, else our templates end up
+ # also getting this change.
+ expand_project = copy.deepcopy(expand_project)
+ job_matchers = self.scanForProjectMatchers(project['name'])
+ self.applyProjectMatchers(job_matchers, expand_project)
+
+ # We should now have a project-pipeline with only the
+ # jobs expanded from this one template
+ for project_part in expand_project.values():
+ # The pipelines are dicts - we only want pipelines
+ if isinstance(project_part, dict):
+ if 'jobs' not in project_part:
+ continue
+ self.processProjectTemplateExpansion(
+ project_part, project, name)
+
+ def processProjectTemplateExpansion(self, project_part, project, template):
+ # project_part should be {'jobs': []}
+ job_list = project_part['jobs']
+ for new_job in job_list:
+ if isinstance(new_job, dict):
+ new_job_name = get_single_key(new_job)
+ info = new_job[new_job_name]
+ else:
+ new_job_name = new_job
+ info = None
+ orig_name = self.getOldJobName(new_job_name)
+ if not orig_name:
+ self.log.error("Job without old name: %s", new_job_name)
+ continue
+ orig_name = orig_name.format(name=project['name'].split('/')[1])
+
+ for layout_job in self.mapping.layout.get('jobs', []):
+ if 'parameter-function' in layout_job:
+ continue
+ if re.search(layout_job['name'], orig_name):
+ if not info:
+ info = {}
+ if not layout_job.get('voting', True):
+ info['voting'] = False
+ if layout_job.get('branch'):
+ info['branches'] = layout_job['branch']
+ if layout_job.get('files'):
+ info['files'] = layout_job['files']
+
+ if info:
+ expansion = dict(info=info, template=template)
+ JOBS_FOR_EXPAND[new_job_name][project['name']] = expansion
def writeJobs(self):
output_dir = self.setupDir()
@@ -1487,6 +1654,10 @@
template_config,
key=lambda template: template['project-template']['name'])
+ for project in self.layout.get('projects', []):
+ self.checkSpeculativeProject(project)
+ normalize_project_expansions()
+
project_names = []
for project in self.layout.get('projects', []):
project_names.append(project['name'])
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index a9923c6..d920d8e 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -29,6 +29,7 @@
import zuul.cmd
from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd_config
# No zuul imports here because they pull in paramiko which must not be
# imported until after the daemonization.
@@ -97,8 +98,14 @@
os.close(pipe_write)
self.setup_logging('gearman_server', 'log_config')
import zuul.lib.gearserver
- statsd_host = os.environ.get('STATSD_HOST')
- statsd_port = int(os.environ.get('STATSD_PORT', 8125))
+
+ (statsd_host, statsd_port, statsd_prefix) = get_statsd_config(
+ self.config)
+ if statsd_prefix:
+ statsd_prefix += '.zuul.geard'
+ else:
+ statsd_prefix = 'zuul.geard'
+
host = get_default(self.config, 'gearman_server', 'listen_address')
port = int(get_default(self.config, 'gearman_server', 'port',
4730))
@@ -112,7 +119,7 @@
host=host,
statsd_host=statsd_host,
statsd_port=statsd_port,
- statsd_prefix='zuul.geard')
+ statsd_prefix=statsd_prefix)
# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
@@ -154,8 +161,10 @@
zookeeper = zuul.zk.ZooKeeper()
zookeeper_hosts = get_default(self.config, 'zookeeper',
'hosts', '127.0.0.1:2181')
+ zookeeper_timeout = float(get_default(self.config, 'zookeeper',
+ 'session_timeout', 10.0))
- zookeeper.connect(zookeeper_hosts)
+ zookeeper.connect(zookeeper_hosts, timeout=zookeeper_timeout)
cache_expiry = get_default(self.config, 'webapp', 'status_expiry', 1)
listen_address = get_default(self.config, 'webapp', 'listen_address',
diff --git a/zuul/configloader.py b/zuul/configloader.py
index afdf329..426842b 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -11,6 +11,7 @@
# under the License.
import base64
+import collections
from contextlib import contextmanager
import copy
import os
@@ -383,57 +384,58 @@
class JobParser(object):
ANSIBLE_ROLE_RE = re.compile(r'^(ansible[-_.+]*)*(role[-_.+]*)*')
- @staticmethod
- def getSchema():
- zuul_role = {vs.Required('zuul'): str,
- 'name': str}
+ zuul_role = {vs.Required('zuul'): str,
+ 'name': str}
- galaxy_role = {vs.Required('galaxy'): str,
- 'name': str}
+ galaxy_role = {vs.Required('galaxy'): str,
+ 'name': str}
- role = vs.Any(zuul_role, galaxy_role)
+ role = vs.Any(zuul_role, galaxy_role)
- job_project = {vs.Required('name'): str,
- 'override-branch': str}
+ job_project = {vs.Required('name'): str,
+ 'override-branch': str}
- secret = {vs.Required('name'): str,
- vs.Required('secret'): str}
+ secret = {vs.Required('name'): str,
+ vs.Required('secret'): str}
- job = {vs.Required('name'): str,
- 'parent': vs.Any(str, None),
- 'final': bool,
- 'failure-message': str,
- 'success-message': str,
- 'failure-url': str,
- 'success-url': str,
- 'hold-following-changes': bool,
- 'voting': bool,
- 'semaphore': str,
- 'tags': to_list(str),
- 'branches': to_list(str),
- 'files': to_list(str),
- 'secrets': to_list(vs.Any(secret, str)),
- 'irrelevant-files': to_list(str),
- # validation happens in NodeSetParser
- 'nodeset': vs.Any(dict, str),
- 'timeout': int,
- 'attempts': int,
- 'pre-run': to_list(str),
- 'post-run': to_list(str),
- 'run': str,
- '_source_context': model.SourceContext,
- '_start_mark': ZuulMark,
- 'roles': to_list(role),
- 'required-projects': to_list(vs.Any(job_project, str)),
- 'vars': dict,
- 'dependencies': to_list(str),
- 'allowed-projects': to_list(str),
- 'override-branch': str,
- 'description': str,
- 'post-review': bool
- }
+ # Attributes of a job that can also be used in Project and ProjectTemplate
+ job_attributes = {'parent': vs.Any(str, None),
+ 'final': bool,
+ 'failure-message': str,
+ 'success-message': str,
+ 'failure-url': str,
+ 'success-url': str,
+ 'hold-following-changes': bool,
+ 'voting': bool,
+ 'semaphore': str,
+ 'tags': to_list(str),
+ 'branches': to_list(str),
+ 'files': to_list(str),
+ 'secrets': to_list(vs.Any(secret, str)),
+ 'irrelevant-files': to_list(str),
+ # validation happens in NodeSetParser
+ 'nodeset': vs.Any(dict, str),
+ 'timeout': int,
+ 'attempts': int,
+ 'pre-run': to_list(str),
+ 'post-run': to_list(str),
+ 'run': str,
+ '_source_context': model.SourceContext,
+ '_start_mark': ZuulMark,
+ 'roles': to_list(role),
+ 'required-projects': to_list(vs.Any(job_project, str)),
+ 'vars': dict,
+ 'dependencies': to_list(str),
+ 'allowed-projects': to_list(str),
+ 'override-branch': str,
+ 'description': str,
+ 'post-review': bool}
- return vs.Schema(job)
+ job_name = {vs.Required('name'): str}
+
+ job = dict(collections.ChainMap(job_name, job_attributes))
+
+ schema = vs.Schema(job)
simple_attributes = [
'final',
@@ -456,58 +458,69 @@
# the reference definition of this job, and this is a project
# repo, add an implicit branch matcher for this branch
# (assuming there are no explicit branch matchers). But only
- # for top-level job definitions and variants.
- # Project-pipeline job variants should more closely attach to
- # their branch if they appear in a project-repo.
+ # for top-level job definitions and variants. Never for
+ # project-templates. They, and in-project project-pipeline
+ # job variants, should more closely attach to their branch if
+ # they appear in a project-repo. That's handled in the
+ # ProjectParser.
if (reference and
reference.source_context and
reference.source_context.branch != job.source_context.branch):
- same_context = False
+ same_branch = False
else:
- same_context = True
+ same_branch = True
if (job.source_context and
(not job.source_context.trusted) and
- ((not same_context) or project_pipeline)):
+ (not project_pipeline) and
+ (not same_branch)):
return [job.source_context.branch]
return None
@staticmethod
- def fromYaml(tenant, layout, conf, project_pipeline=False):
- with configuration_exceptions('job', conf):
- JobParser.getSchema()(conf)
+ def fromYaml(tenant, layout, conf, project_pipeline=False,
+ name=None, validate=True):
+ if validate:
+ with configuration_exceptions('job', conf):
+ JobParser.schema(conf)
+
+ if name is None:
+ name = conf['name']
# NB: The default detection system in the Job class requires
# that we always assign values directly rather than modifying
# them (e.g., "job.run = ..." rather than
# "job.run.append(...)").
- reference = layout.jobs.get(conf['name'], [None])[0]
+ reference = layout.jobs.get(name, [None])[0]
- job = model.Job(conf['name'])
+ job = model.Job(name)
job.source_context = conf.get('_source_context')
+ job.source_line = conf.get('_start_mark').line + 1
- is_variant = layout.hasJob(conf['name'])
- if 'parent' in conf:
- if conf['parent'] is not None:
- # Parent job is explicitly specified, so inherit from it.
- parent = layout.getJob(conf['parent'])
- job.inheritFrom(parent)
+ is_variant = layout.hasJob(name)
+ if not is_variant:
+ if 'parent' in conf:
+ if conf['parent'] is not None:
+ # Parent job is explicitly specified, so inherit from it.
+ parent = layout.getJob(conf['parent'])
+ job.inheritFrom(parent)
+ else:
+ # Parent is explicitly set as None, so user intends
+ # this to be a base job. That's only okay if we're in
+ # a config project.
+ if not conf['_source_context'].trusted:
+ raise Exception(
+ "Base jobs must be defined in config projects")
else:
- # Parent is explicitly set as None, so user intends
- # this to be a base job. That's only okay if we're in
- # a config project.
- if not conf['_source_context'].trusted:
- raise Exception(
- "Base jobs must be defined in config projects")
- else:
- # Parent is not explicitly set, so inherit from the
- # default -- but only if this is the primary definition
- # for the job (ie, not a variant -- variants don't need to
- # have a parent as long as the main job does).
- if not is_variant:
parent = layout.getJob(tenant.default_base_job)
job.inheritFrom(parent)
+ else:
+ if 'parent' in conf:
+ # TODO(jeblair): warn the user that we're ignoring the
+ # parent setting on this variant job definition.
+ pass
+
# Secrets are part of the playbook context so we must establish
# them earlier than playbooks.
secrets = []
@@ -668,10 +681,7 @@
if (not branches) and ('branches' in conf):
branches = as_list(conf['branches'])
if branches:
- matchers = []
- for branch in branches:
- matchers.append(change_matcher.BranchMatcher(branch))
- job.branch_matcher = change_matcher.MatchAny(matchers)
+ job.setBranchMatcher(branches)
if 'files' in conf:
matchers = []
for fn in as_list(conf['files']):
@@ -709,10 +719,13 @@
class ProjectTemplateParser(object):
- log = logging.getLogger("zuul.ProjectTemplateParser")
+ def __init__(self, tenant, layout):
+ self.log = logging.getLogger("zuul.ProjectTemplateParser")
+ self.tenant = tenant
+ self.layout = layout
+ self.schema = self.getSchema()
- @staticmethod
- def getSchema(layout):
+ def getSchema(self):
project_template = {
vs.Required('name'): str,
'description': str,
@@ -723,47 +736,41 @@
'_start_mark': ZuulMark,
}
- for p in layout.pipelines.values():
- project_template[p.name] = {'queue': str,
- 'jobs': [vs.Any(str, dict)]}
+ job = {str: vs.Any(str, JobParser.job_attributes)}
+ job_list = [vs.Any(str, job)]
+ pipeline_contents = {'queue': str, 'jobs': job_list}
+
+ for p in self.layout.pipelines.values():
+ project_template[p.name] = pipeline_contents
return vs.Schema(project_template)
- @staticmethod
- def fromYaml(tenant, layout, conf):
- with configuration_exceptions('project or project-template', conf):
- ProjectTemplateParser.getSchema(layout)(conf)
- # Make a copy since we modify this later via pop
- conf = copy.deepcopy(conf)
+ def fromYaml(self, conf, validate=True):
+ if validate:
+ with configuration_exceptions('project-template', conf):
+ self.schema(conf)
project_template = model.ProjectConfig(conf['name'])
source_context = conf['_source_context']
start_mark = conf['_start_mark']
- for pipeline in layout.pipelines.values():
+ for pipeline in self.layout.pipelines.values():
conf_pipeline = conf.get(pipeline.name)
if not conf_pipeline:
continue
project_pipeline = model.ProjectPipelineConfig()
project_template.pipelines[pipeline.name] = project_pipeline
project_pipeline.queue_name = conf_pipeline.get('queue')
- ProjectTemplateParser._parseJobList(
- tenant, layout, conf_pipeline.get('jobs', []),
+ self.parseJobList(
+ conf_pipeline.get('jobs', []),
source_context, start_mark, project_pipeline.job_list)
return project_template
- @staticmethod
- def _parseJobList(tenant, layout, conf, source_context,
- start_mark, job_list):
+ def parseJobList(self, conf, source_context, start_mark, job_list):
for conf_job in conf:
if isinstance(conf_job, str):
- attrs = dict(name=conf_job)
+ jobname = conf_job
+ attrs = {}
elif isinstance(conf_job, dict):
# A dictionary in a job tree may override params
jobname, attrs = list(conf_job.items())[0]
- if attrs:
- # We are overriding params, so make a new job def
- attrs['name'] = jobname
- else:
- # Not overriding, so add a blank job
- attrs = dict(name=jobname)
else:
raise Exception("Job must be a string or dictionary")
attrs['_source_context'] = source_context
@@ -772,17 +779,22 @@
# validate that the job is existing
with configuration_exceptions('project or project-template',
attrs):
- layout.getJob(attrs['name'])
+ self.layout.getJob(jobname)
- job_list.addJob(JobParser.fromYaml(tenant, layout, attrs,
- project_pipeline=True))
+ job_list.addJob(JobParser.fromYaml(self.tenant, self.layout,
+ attrs, project_pipeline=True,
+ name=jobname, validate=False))
class ProjectParser(object):
- log = logging.getLogger("zuul.ProjectParser")
+ def __init__(self, tenant, layout, project_template_parser):
+ self.log = logging.getLogger("zuul.ProjectParser")
+ self.tenant = tenant
+ self.layout = layout
+ self.project_template_parser = project_template_parser
+ self.schema = self.getSchema()
- @staticmethod
- def getSchema(layout):
+ def getSchema(self):
project = {
vs.Required('name'): str,
'description': str,
@@ -794,46 +806,52 @@
'_start_mark': ZuulMark,
}
- for p in layout.pipelines.values():
- project[p.name] = {'queue': str,
- 'jobs': [vs.Any(str, dict)]}
+ job = {str: vs.Any(str, JobParser.job_attributes)}
+ job_list = [vs.Any(str, job)]
+ pipeline_contents = {'queue': str, 'jobs': job_list}
+
+ for p in self.layout.pipelines.values():
+ project[p.name] = pipeline_contents
return vs.Schema(project)
- @staticmethod
- def fromYaml(tenant, layout, conf_list):
+ def fromYaml(self, conf_list):
for conf in conf_list:
with configuration_exceptions('project', conf):
- ProjectParser.getSchema(layout)(conf)
+ self.schema(conf)
with configuration_exceptions('project', conf_list[0]):
project_name = conf_list[0]['name']
- (trusted, project) = tenant.getProject(project_name)
+ (trusted, project) = self.tenant.getProject(project_name)
if project is None:
raise ProjectNotFoundError(project_name)
project_config = model.ProjectConfig(project.canonical_name)
configs = []
for conf in conf_list:
+ implied_branch = None
with configuration_exceptions('project', conf):
if not conf['_source_context'].trusted:
if project != conf['_source_context'].project:
raise ProjectNotPermittedError()
- # Make a copy since we modify this later via pop
- conf = copy.deepcopy(conf)
- conf_templates = conf.pop('templates', [])
+ conf_templates = conf.get('templates', [])
# The way we construct a project definition is by
# parsing the definition as a template, then applying
# all of the templates, including the newly parsed
# one, in order.
- project_template = ProjectTemplateParser.fromYaml(
- tenant, layout, conf)
+ project_template = self.project_template_parser.fromYaml(
+ conf, validate=False)
+ # If this project definition is in a place where it
+ # should get implied branch matchers, set it.
+ if (not conf['_source_context'].trusted):
+ implied_branch = conf['_source_context'].branch
for name in conf_templates:
- if name not in layout.project_templates:
+ if name not in self.layout.project_templates:
raise TemplateNotFoundError(name)
- configs.extend([layout.project_templates[name]
+ configs.extend([(self.layout.project_templates[name],
+ implied_branch)
for name in conf_templates])
- configs.append(project_template)
+ configs.append((project_template, implied_branch))
# Set the following values to the first one that we
# find and ignore subsequent settings.
mode = conf.get('merge-mode')
@@ -848,21 +866,19 @@
project_config.merge_mode = model.MERGER_MAP['merge-resolve']
if project_config.default_branch is None:
project_config.default_branch = 'master'
- for pipeline in layout.pipelines.values():
+ for pipeline in self.layout.pipelines.values():
project_pipeline = model.ProjectPipelineConfig()
queue_name = None
# For every template, iterate over the job tree and replace or
# create the jobs in the final definition as needed.
pipeline_defined = False
- for template in configs:
+ for (template, implied_branch) in configs:
if pipeline.name in template.pipelines:
- ProjectParser.log.debug(
- "Applying template %s to pipeline %s" %
- (template.name, pipeline.name))
pipeline_defined = True
template_pipeline = template.pipelines[pipeline.name]
project_pipeline.job_list.inheritFrom(
- template_pipeline.job_list)
+ template_pipeline.job_list,
+ implied_branch)
if template_pipeline.queue_name:
queue_name = template_pipeline.queue_name
if queue_name:
@@ -1358,7 +1374,12 @@
# branch. Remember the branch and then implicitly add a
# branch selector to each job there. This makes the
# in-repo configuration apply only to that branch.
- for branch in project.source.getProjectBranches(project, tenant):
+ branches = sorted(project.source.getProjectBranches(
+ project, tenant))
+ if 'master' in branches:
+ branches.remove('master')
+ branches = ['master'] + branches
+ for branch in branches:
new_project_unparsed_branch_config[project][branch] = \
model.UnparsedTenantConfig()
job = merger.getFiles(
@@ -1406,19 +1427,20 @@
(job.source_context,))
continue
loaded = conf_root
- job.source_context.path = fn
+ source_context = job.source_context.copy()
+ source_context.path = fn
TenantParser.log.info(
"Loading configuration from %s" %
- (job.source_context,))
- project = job.source_context.project
- branch = job.source_context.branch
- if job.source_context.trusted:
+ (source_context,))
+ project = source_context.project
+ branch = source_context.branch
+ if source_context.trusted:
incdata = TenantParser._parseConfigProjectLayout(
- job.files[fn], job.source_context)
+ job.files[fn], source_context)
config_projects_config.extend(incdata)
else:
incdata = TenantParser._parseUntrustedProjectLayout(
- job.files[fn], job.source_context)
+ job.files[fn], source_context)
untrusted_projects_config.extend(incdata)
new_project_unparsed_config[project].extend(incdata)
if branch in new_project_unparsed_branch_config.get(
@@ -1516,13 +1538,15 @@
continue
layout.addSemaphore(semaphore)
+ project_template_parser = ProjectTemplateParser(tenant, layout)
for config_template in data.project_templates:
classes = TenantParser._getLoadClasses(tenant, config_template)
if 'project-template' not in classes:
continue
- layout.addProjectTemplate(ProjectTemplateParser.fromYaml(
- tenant, layout, config_template))
+ layout.addProjectTemplate(project_template_parser.fromYaml(
+ config_template))
+ project_parser = ProjectParser(tenant, layout, project_template_parser)
for config_projects in data.projects.values():
# Unlike other config classes, we expect multiple project
# stanzas with the same name, so that a config repo can
@@ -1540,14 +1564,15 @@
if not filtered_projects:
continue
- layout.addProjectConfig(ProjectParser.fromYaml(
- tenant, layout, filtered_projects))
+ layout.addProjectConfig(project_parser.fromYaml(
+ filtered_projects))
@staticmethod
def _parseLayout(base, tenant, data, scheduler, connections):
# Don't call this method from dynamic reconfiguration because
# it interacts with drivers and connections.
layout = model.Layout(tenant)
+ TenantParser.log.debug("Created layout id %s", layout.uuid)
TenantParser._parseLayoutItems(layout, tenant, data,
scheduler, connections)
@@ -1615,9 +1640,22 @@
for branch in branches:
fns1 = []
fns2 = []
- files_list = files.connections.get(
+ files_entry = files.connections.get(
project.source.connection.connection_name, {}).get(
- project.name, {}).get(branch, {}).keys()
+ project.name, {}).get(branch)
+ # If there is no files entry at all for this
+ # project-branch, then use the cached config.
+ if files_entry is None:
+ if trusted:
+ incdata = project.unparsed_config
+ else:
+ incdata = project.unparsed_branch_config.get(branch)
+ if incdata:
+ config.extend(incdata)
+ continue
+ # Otherwise, do not use the cached config (even if the
+ # files are empty as that likely means they were deleted).
+ files_list = files_entry.keys()
for fn in files_list:
if fn.startswith("zuul.d/"):
fns1.append(fn)
@@ -1649,14 +1687,6 @@
config.extend(incdata)
- if not loaded:
- if trusted:
- incdata = project.unparsed_config
- else:
- incdata = project.unparsed_branch_config.get(branch)
- if incdata:
- config.extend(incdata)
-
def createDynamicLayout(self, tenant, files,
include_config_projects=False,
scheduler=None, connections=None):
@@ -1671,6 +1701,7 @@
self._loadDynamicProjectData(config, project, files, False, tenant)
layout = model.Layout(tenant)
+ self.log.debug("Created layout id %s", layout.uuid)
if not include_config_projects:
# NOTE: the actual pipeline objects (complete with queues
# and enqueued items) are copied by reference here. This
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index 3655115..b44fa46 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -14,8 +14,6 @@
import abc
-import extras
-
class BaseConnection(object, metaclass=abc.ABCMeta):
"""Base class for connections.
@@ -42,20 +40,18 @@
self.driver = driver
self.connection_name = connection_name
self.connection_config = connection_config
- self.statsd = extras.try_import('statsd.statsd')
def logEvent(self, event):
self.log.debug(
- 'Scheduling {driver} event from {connection}: {event}'.format(
- driver=self.driver.name,
+ 'Scheduling event from {connection}: {event}'.format(
connection=self.connection_name,
- event=event.type))
+ event=event))
try:
- if self.statsd:
- self.statsd.incr(
+ if self.sched.statsd:
+ self.sched.statsd.incr(
'zuul.event.{driver}.{event}'.format(
driver=self.driver.name, event=event.type))
- self.statsd.incr(
+ self.sched.statsd.incr(
'zuul.event.{driver}.{connection}.{event}'.format(
driver=self.driver.name,
connection=self.connection_name,
diff --git a/zuul/driver/sql/sqlreporter.py b/zuul/driver/sql/sqlreporter.py
index 46bc1e6..7785c48 100644
--- a/zuul/driver/sql/sqlreporter.py
+++ b/zuul/driver/sql/sqlreporter.py
@@ -66,15 +66,19 @@
(result, url) = item.formatJobResult(job)
+ start = end = None
+ if build.start_time:
+ start = datetime.datetime.fromtimestamp(build.start_time)
+ if build.end_time:
+ end = datetime.datetime.fromtimestamp(build.end_time)
+
build_inserts.append({
'buildset_id': buildset_ins_result.inserted_primary_key,
'uuid': build.uuid,
'job_name': build.job.name,
'result': result,
- 'start_time': datetime.datetime.fromtimestamp(
- build.start_time),
- 'end_time': datetime.datetime.fromtimestamp(
- build.end_time),
+ 'start_time': start,
+ 'end_time': end,
'voting': build.job.voting,
'log_url': url,
'node_name': build.node_name,
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index f97d286..ae22c8e 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -101,7 +101,7 @@
job = super(ZuulGearmanClient, self).handleStatusRes(packet)
except gear.UnknownJobError:
handle = packet.getArgument(0)
- for build in self.__zuul_gearman.builds:
+ for build in self.__zuul_gearman.builds.values():
if build.__gearman_job.handle == handle:
self.__zuul_gearman.onUnknownJob(job)
@@ -168,7 +168,8 @@
project=project,
tenant=tenant.name,
timeout=job.timeout,
- jobtags=sorted(job.tags))
+ jobtags=sorted(job.tags),
+ _inheritance_path=list(job.inheritance_path))
if hasattr(item.change, 'branch'):
zuul_params['branch'] = item.change.branch
if hasattr(item.change, 'tag'):
@@ -186,6 +187,7 @@
and item.change.newrev != '0' * 40):
zuul_params['newrev'] = item.change.newrev
zuul_params['projects'] = [] # Set below
+ zuul_params['_projects'] = {} # transitional to convert to dict
zuul_params['items'] = []
for i in all_items:
d = dict()
@@ -234,9 +236,10 @@
params['vars'] = copy.deepcopy(job.variables)
params['zuul'] = zuul_params
projects = set()
+ required_projects = set()
def make_project_dict(project, override_branch=None):
- project_config = item.current_build_set.layout.project_configs.get(
+ project_config = item.layout.project_configs.get(
project.canonical_name, None)
if project_config:
project_default_branch = project_config.default_branch
@@ -260,6 +263,7 @@
make_project_dict(project,
job_project.override_branch))
projects.add(project)
+ required_projects.add(project)
for i in all_items:
if i.change.project not in projects:
project = i.change.project
@@ -267,18 +271,30 @@
projects.add(project)
for p in projects:
- zuul_params['projects'].append(dict(
+ zuul_params['_projects'][p.canonical_name] = (dict(
name=p.name,
short_name=p.name.split('/')[-1],
- canonical_hostname=p.canonical_hostname,
+ # Duplicate this into the dict too, so that iterating
+ # project.values() is easier for callers
canonical_name=p.canonical_name,
+ canonical_hostname=p.canonical_hostname,
src_dir=os.path.join('src', p.canonical_name),
+ required=(p in required_projects),
))
+ # We are transitioning "projects" from a list to a dict
+ # indexed by canonical name, as it is much easier to access
+ # values in ansible. Existing callers are converted to
+ # "_projects", then once "projects" is unused we switch it,
+ # then convert callers back. Finally when "_projects" is
+ # unused it will be removed.
+ for cn, p in zuul_params['_projects'].items():
+ zuul_params['projects'].append(p)
build = Build(job, uuid)
build.parameters = params
if job.name == 'noop':
+ self.sched.onBuildStarted(build)
self.sched.onBuildCompleted(build, 'SUCCESS', {})
return build
@@ -368,9 +384,17 @@
result = 'RETRY_LIMIT'
else:
build.retry = True
+ if result in ('DISCONNECT', 'ABORTED'):
+ # Always retry if the executor just went away
+ build.retry = True
result_data = data.get('data', {})
self.log.info("Build %s complete, result %s" %
(job, result))
+ # If the build should be retried, don't supply the result
+ # so that elsewhere we don't have to deal with keeping
+ # track of which results are non-final.
+ if build.retry:
+ result = None
self.sched.onBuildCompleted(build, result, result_data)
# The test suite expects the build to be removed from the
# internal dict after it's added to the report queue.
@@ -399,7 +423,7 @@
def onDisconnect(self, job):
self.log.info("Gearman job %s lost due to disconnect" % job)
- self.onBuildCompleted(job)
+ self.onBuildCompleted(job, 'DISCONNECT')
def onUnknownJob(self, job):
self.log.info("Gearman job %s lost due to unknown handle" % job)
@@ -440,7 +464,9 @@
def lookForLostBuilds(self):
self.log.debug("Looking for lost builds")
- for build in self.builds.values():
+ # Construct a list from the values iterator to protect from it changing
+ # out from underneath us.
+ for build in list(self.builds.values()):
if build.result:
# The build has finished, it will be removed
continue
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index af2e7e9..e7a6dbc 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -16,6 +16,7 @@
import datetime
import json
import logging
+import multiprocessing
import os
import shutil
import signal
@@ -28,6 +29,7 @@
import traceback
from zuul.lib.yamlutil import yaml
from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd
try:
import ara.plugins.callbacks as ara_callbacks
@@ -45,6 +47,11 @@
DEFAULT_FINGER_PORT = 79
+class StopException(Exception):
+ """An exception raised when an inner loop is asked to stop."""
+ pass
+
+
class ExecutorError(Exception):
"""A non-transient run-time executor error
@@ -88,7 +95,7 @@
if cache_dir == jobs_base:
raise Exception("Cache dir and jobs dir cannot be the same")
self.thread = threading.Thread(target=self._run,
- name='executor-diskaccountant')
+ name='diskaccountant')
self.thread.daemon = True
self._running = False
self.jobs_base = jobs_base
@@ -147,7 +154,7 @@
self.function = function
self.args = args
self.thread = threading.Thread(target=self._run,
- name='executor-watchdog')
+ name='watchdog')
self.thread.daemon = True
self.timed_out = None
@@ -284,6 +291,7 @@
# inventory.yaml
# .ansible (mounted in bwrap read-write)
# fact-cache/localhost
+ # cp
# playbook_0 (mounted in bwrap for each playbook read-only)
# secrets.yaml
# project -> ../trusted/project_0/...
@@ -324,6 +332,8 @@
self.ansible_cache_root = os.path.join(self.root, '.ansible')
self.fact_cache = os.path.join(self.ansible_cache_root, 'fact-cache')
os.makedirs(self.fact_cache)
+ self.control_path = os.path.join(self.ansible_cache_root, 'cp')
+ os.makedirs(self.control_path)
localhost_facts = os.path.join(self.fact_cache, 'localhost')
# NOTE(pabelanger): We do not want to leak zuul-executor facts to other
# playbooks now that smart fact gathering is enabled by default. We
@@ -499,353 +509,6 @@
return inventory
-class ExecutorMergeWorker(gear.TextWorker):
- def __init__(self, executor_server, *args, **kw):
- self.zuul_executor_server = executor_server
- super(ExecutorMergeWorker, self).__init__(*args, **kw)
-
- def handleNoop(self, packet):
- # Wait until the update queue is empty before responding
- while self.zuul_executor_server.update_queue.qsize():
- time.sleep(1)
-
- with self.zuul_executor_server.merger_lock:
- super(ExecutorMergeWorker, self).handleNoop(packet)
-
-
-class ExecutorServer(object):
- log = logging.getLogger("zuul.ExecutorServer")
-
- def __init__(self, config, connections={}, jobdir_root=None,
- keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
- self.config = config
- self.keep_jobdir = keep_jobdir
- self.jobdir_root = jobdir_root
- # TODOv3(mordred): make the executor name more unique --
- # perhaps hostname+pid.
- self.hostname = socket.gethostname()
- self.log_streaming_port = log_streaming_port
- self.merger_lock = threading.Lock()
- self.verbose = False
- self.command_map = dict(
- stop=self.stop,
- pause=self.pause,
- unpause=self.unpause,
- graceful=self.graceful,
- verbose=self.verboseOn,
- unverbose=self.verboseOff,
- keep=self.keep,
- nokeep=self.nokeep,
- )
-
- self.merge_root = get_default(self.config, 'executor', 'git_dir',
- '/var/lib/zuul/executor-git')
- self.default_username = get_default(self.config, 'executor',
- 'default_username', 'zuul')
- self.disk_limit_per_job = int(get_default(self.config, 'executor',
- 'disk_limit_per_job', 250))
- self.merge_email = get_default(self.config, 'merger', 'git_user_email')
- self.merge_name = get_default(self.config, 'merger', 'git_user_name')
- execution_wrapper_name = get_default(self.config, 'executor',
- 'execution_wrapper', 'bubblewrap')
- self.execution_wrapper = connections.drivers[execution_wrapper_name]
-
- self.connections = connections
- # This merger and its git repos are used to maintain
- # up-to-date copies of all the repos that are used by jobs, as
- # well as to support the merger:cat functon to supply
- # configuration information to Zuul when it starts.
- self.merger = self._getMerger(self.merge_root)
- self.update_queue = DeduplicateQueue()
-
- state_dir = get_default(self.config, 'executor', 'state_dir',
- '/var/lib/zuul', expand_user=True)
- path = os.path.join(state_dir, 'executor.socket')
- self.command_socket = commandsocket.CommandSocket(path)
- ansible_dir = os.path.join(state_dir, 'ansible')
- self.ansible_dir = ansible_dir
- if os.path.exists(ansible_dir):
- shutil.rmtree(ansible_dir)
-
- zuul_dir = os.path.join(ansible_dir, 'zuul')
- plugin_dir = os.path.join(zuul_dir, 'ansible')
-
- os.makedirs(plugin_dir, mode=0o0755)
-
- self.library_dir = os.path.join(plugin_dir, 'library')
- self.action_dir = os.path.join(plugin_dir, 'action')
- self.callback_dir = os.path.join(plugin_dir, 'callback')
- self.lookup_dir = os.path.join(plugin_dir, 'lookup')
- self.filter_dir = os.path.join(plugin_dir, 'filter')
-
- _copy_ansible_files(zuul.ansible, plugin_dir)
-
- # We're copying zuul.ansible.* into a directory we are going
- # to add to pythonpath, so our plugins can "import
- # zuul.ansible". But we're not installing all of zuul, so
- # create a __init__.py file for the stub "zuul" module.
- with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
- pass
-
- self.job_workers = {}
- self.disk_accountant = DiskAccountant(self.jobdir_root,
- self.disk_limit_per_job,
- self.stopJobByJobdir,
- self.merge_root)
-
- def _getMerger(self, root, logger=None):
- if root != self.merge_root:
- cache_root = self.merge_root
- else:
- cache_root = None
- return zuul.merger.merger.Merger(root, self.connections,
- self.merge_email, self.merge_name,
- cache_root, logger)
-
- def start(self):
- self._running = True
- self._command_running = True
- server = self.config.get('gearman', 'server')
- port = get_default(self.config, 'gearman', 'port', 4730)
- ssl_key = get_default(self.config, 'gearman', 'ssl_key')
- ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
- ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
- self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
- self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
- self.executor_worker = gear.TextWorker('Zuul Executor Server')
- self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
- self.log.debug("Waiting for server")
- self.merger_worker.waitForServer()
- self.executor_worker.waitForServer()
- self.log.debug("Registering")
- self.register()
-
- self.log.debug("Starting command processor")
- self.command_socket.start()
- self.command_thread = threading.Thread(target=self.runCommand)
- self.command_thread.daemon = True
- self.command_thread.start()
-
- self.log.debug("Starting worker")
- self.update_thread = threading.Thread(target=self._updateLoop)
- self.update_thread.daemon = True
- self.update_thread.start()
- self.merger_thread = threading.Thread(target=self.run_merger)
- self.merger_thread.daemon = True
- self.merger_thread.start()
- self.executor_thread = threading.Thread(target=self.run_executor)
- self.executor_thread.daemon = True
- self.executor_thread.start()
- self.disk_accountant.start()
-
- def register(self):
- self.executor_worker.registerFunction("executor:execute")
- self.executor_worker.registerFunction("executor:stop:%s" %
- self.hostname)
- self.merger_worker.registerFunction("merger:merge")
- self.merger_worker.registerFunction("merger:cat")
- self.merger_worker.registerFunction("merger:refstate")
-
- def stop(self):
- self.log.debug("Stopping")
- self.disk_accountant.stop()
- self._running = False
- self._command_running = False
- self.command_socket.stop()
- self.update_queue.put(None)
-
- for job_worker in list(self.job_workers.values()):
- try:
- job_worker.stop()
- except Exception:
- self.log.exception("Exception sending stop command "
- "to worker:")
- self.merger_worker.shutdown()
- self.executor_worker.shutdown()
- self.log.debug("Stopped")
-
- def pause(self):
- # TODOv3: implement
- pass
-
- def unpause(self):
- # TODOv3: implement
- pass
-
- def graceful(self):
- # TODOv3: implement
- pass
-
- def verboseOn(self):
- self.verbose = True
-
- def verboseOff(self):
- self.verbose = False
-
- def keep(self):
- self.keep_jobdir = True
-
- def nokeep(self):
- self.keep_jobdir = False
-
- def join(self):
- self.update_thread.join()
- self.merger_thread.join()
- self.executor_thread.join()
-
- def runCommand(self):
- while self._command_running:
- try:
- command = self.command_socket.get().decode('utf8')
- if command != '_stop':
- self.command_map[command]()
- except Exception:
- self.log.exception("Exception while processing command")
-
- def _updateLoop(self):
- while self._running:
- try:
- self._innerUpdateLoop()
- except:
- self.log.exception("Exception in update thread:")
-
- def _innerUpdateLoop(self):
- # Inside of a loop that keeps the main repositories up to date
- task = self.update_queue.get()
- if task is None:
- # We are asked to stop
- return
- with self.merger_lock:
- self.log.info("Updating repo %s/%s" % (
- task.connection_name, task.project_name))
- self.merger.updateRepo(task.connection_name, task.project_name)
- self.log.debug("Finished updating repo %s/%s" %
- (task.connection_name, task.project_name))
- task.setComplete()
-
- def update(self, connection_name, project_name):
- # Update a repository in the main merger
- task = UpdateTask(connection_name, project_name)
- task = self.update_queue.put(task)
- return task
-
- def run_merger(self):
- self.log.debug("Starting merger listener")
- while self._running:
- try:
- job = self.merger_worker.getJob()
- try:
- if job.name == 'merger:cat':
- self.log.debug("Got cat job: %s" % job.unique)
- self.cat(job)
- elif job.name == 'merger:merge':
- self.log.debug("Got merge job: %s" % job.unique)
- self.merge(job)
- elif job.name == 'merger:refstate':
- self.log.debug("Got refstate job: %s" % job.unique)
- self.refstate(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(
- traceback.format_exc().encode('utf8'))
- except gear.InterruptedError:
- pass
- except Exception:
- self.log.exception("Exception while getting job")
-
- def run_executor(self):
- self.log.debug("Starting executor listener")
- while self._running:
- try:
- job = self.executor_worker.getJob()
- try:
- if job.name == 'executor:execute':
- self.log.debug("Got execute job: %s" % job.unique)
- self.executeJob(job)
- elif job.name.startswith('executor:stop'):
- self.log.debug("Got stop job: %s" % job.unique)
- self.stopJob(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
- except Exception:
- self.log.exception("Exception while running job")
- job.sendWorkException(
- traceback.format_exc().encode('utf8'))
- except gear.InterruptedError:
- pass
- except Exception:
- self.log.exception("Exception while getting job")
-
- def executeJob(self, job):
- self.job_workers[job.unique] = AnsibleJob(self, job)
- self.job_workers[job.unique].run()
-
- def finishJob(self, unique):
- del(self.job_workers[unique])
-
- def stopJobByJobdir(self, jobdir):
- unique = os.path.basename(jobdir)
- self.stopJobByUnique(unique)
-
- def stopJob(self, job):
- try:
- args = json.loads(job.arguments)
- self.log.debug("Stop job with arguments: %s" % (args,))
- unique = args['uuid']
- self.stopJobByUnique(unique)
- finally:
- job.sendWorkComplete()
-
- def stopJobByUnique(self, unique):
- job_worker = self.job_workers.get(unique)
- if not job_worker:
- self.log.debug("Unable to find worker for job %s" % (unique,))
- return
- try:
- job_worker.stop()
- except Exception:
- self.log.exception("Exception sending stop command "
- "to worker:")
-
- def cat(self, job):
- args = json.loads(job.arguments)
- task = self.update(args['connection'], args['project'])
- task.wait()
- with self.merger_lock:
- files = self.merger.getFiles(args['connection'], args['project'],
- args['branch'], args['files'],
- args.get('dirs', []))
- result = dict(updated=True,
- files=files)
- job.sendWorkComplete(json.dumps(result))
-
- def refstate(self, job):
- args = json.loads(job.arguments)
- with self.merger_lock:
- success, repo_state = self.merger.getRepoState(args['items'])
- result = dict(updated=success,
- repo_state=repo_state)
- job.sendWorkComplete(json.dumps(result))
-
- def merge(self, job):
- args = json.loads(job.arguments)
- with self.merger_lock:
- ret = self.merger.mergeChanges(args['items'], args.get('files'),
- args.get('dirs', []),
- args.get('repo_state'))
- result = dict(merged=(ret is not None))
- if ret is None:
- result['commit'] = result['files'] = result['repo_state'] = None
- else:
- (result['commit'], result['files'], result['repo_state'],
- recent) = ret
- job.sendWorkComplete(json.dumps(result))
-
-
class AnsibleJobLogAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
msg, kwargs = super(AnsibleJobLogAdapter, self).process(msg, kwargs)
@@ -858,12 +521,14 @@
RESULT_TIMED_OUT = 2
RESULT_UNREACHABLE = 3
RESULT_ABORTED = 4
+ RESULT_DISK_FULL = 5
RESULT_MAP = {
RESULT_NORMAL: 'RESULT_NORMAL',
RESULT_TIMED_OUT: 'RESULT_TIMED_OUT',
RESULT_UNREACHABLE: 'RESULT_UNREACHABLE',
RESULT_ABORTED: 'RESULT_ABORTED',
+ RESULT_DISK_FULL: 'RESULT_DISK_FULL',
}
def __init__(self, executor_server, job):
@@ -876,6 +541,7 @@
self.proc_lock = threading.Lock()
self.running = False
self.aborted = False
+ self.aborted_reason = None
self.thread = None
self.private_key_file = get_default(self.executor_server.config,
'executor', 'private_key_file',
@@ -890,12 +556,16 @@
def run(self):
self.running = True
- self.thread = threading.Thread(target=self.execute)
+ self.thread = threading.Thread(target=self.execute,
+ name='build-%s' % self.job.unique)
self.thread.start()
- def stop(self):
+ def stop(self, reason=None):
self.aborted = True
+ self.aborted_reason = reason
self.abortRunningProc()
+
+ def wait(self):
if self.thread:
self.thread.join()
@@ -1042,6 +712,13 @@
self.job.sendWorkStatus(0, 100)
result = self.runPlaybooks(args)
+
+ # Stop the persistent SSH connections.
+ setup_status, setup_code = self.runAnsibleCleanup(
+ self.jobdir.setup_playbook)
+
+ if self.aborted_reason == self.RESULT_DISK_FULL:
+ result = 'DISK_FULL'
data = self.getResultData()
result_data = json.dumps(dict(result=result,
data=data))
@@ -1158,13 +835,44 @@
# TODOv3(pabelanger): Implement post-run timeout setting.
post_status, post_code = self.runAnsiblePlaybook(
playbook, args['timeout'], success, phase='post', index=index)
+ if post_status == self.RESULT_ABORTED:
+ return 'ABORTED'
if post_status != self.RESULT_NORMAL or post_code != 0:
+ success = False
# If we encountered a pre-failure, that takes
# precedence over the post result.
if not pre_failed:
result = 'POST_FAILURE'
+ if (index + 1) == len(self.jobdir.post_playbooks):
+ self._logFinalPlaybookError()
+
return result
+ def _logFinalPlaybookError(self):
+ # Failures in the final post playbook can include failures
+ # uploading logs, which makes diagnosing issues difficult.
+ # Grab the output from the last playbook from the json
+ # file and log it.
+ json_output = self.jobdir.job_output_file.replace('txt', 'json')
+ self.log.debug("Final playbook failed")
+ if not os.path.exists(json_output):
+ self.log.debug("JSON logfile {logfile} is missing".format(
+ logfile=json_output))
+ return
+ try:
+ output = json.load(open(json_output, 'r'))
+ last_playbook = output[-1]
+ # Transform json to yaml - because it's easier to read and given
+ # the size of the data it'll be extra-hard to read this as an
+ # all on one line stringified nested dict.
+ yaml_out = yaml.safe_dump(last_playbook, default_flow_style=False)
+ for line in yaml_out.split('\n'):
+ self.log.debug(line)
+ except Exception:
+ self.log.exception(
+ "Could not decode json from {logfile}".format(
+ logfile=json_output))
+
def getHostList(self, args):
hosts = []
for node in args['nodes']:
@@ -1491,6 +1199,13 @@
config.write('display_args_to_stdout = %s\n' %
str(not jobdir_playbook.secrets_content))
+ # Increase the internal poll interval of ansible.
+ # The default interval of 0.001s is optimized for interactive
+ # ui at the expense of CPU load. As we have a non-interactive
+ # automation use case a longer poll interval is more suitable
+ # and reduces CPU load of the ansible process.
+ config.write('internal_poll_interval = 0.01\n')
+
config.write('[ssh_connection]\n')
# NB: when setting pipelining = True, keep_remote_files
# must be False (the default). Otherwise it apparently
@@ -1501,6 +1216,7 @@
# command which expects interactive input on a tty (such
# as sudo) it does not hang.
config.write('pipelining = True\n')
+ config.write('control_path_dir = %s\n' % self.jobdir.control_path)
ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
"-o UserKnownHostsFile=%s" % self.jobdir.known_hosts
config.write('ssh_args = %s\n' % ssh_args)
@@ -1522,7 +1238,7 @@
except Exception:
self.log.exception("Exception while killing ansible process:")
- def runAnsible(self, cmd, timeout, playbook):
+ def runAnsible(self, cmd, timeout, playbook, wrapped=True):
config_file = playbook.ansible_config
env_copy = os.environ.copy()
env_copy.update(self.ssh_agent.env)
@@ -1563,8 +1279,12 @@
if playbook.secrets_content:
secrets[playbook.secrets] = playbook.secrets_content
- context = self.executor_server.execution_wrapper.getExecutionContext(
- ro_paths, rw_paths, secrets)
+ if wrapped:
+ wrapper = self.executor_server.execution_wrapper
+ else:
+ wrapper = self.executor_server.connections.drivers['nullwrap']
+
+ context = wrapper.getExecutionContext(ro_paths, rw_paths, secrets)
popen = context.getPopen(
work_dir=self.jobdir.work_root,
@@ -1583,6 +1303,7 @@
self.proc = popen(
cmd,
cwd=self.jobdir.work_root,
+ stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
@@ -1630,6 +1351,13 @@
elif ret == -9:
# Received abort request.
return (self.RESULT_ABORTED, None)
+ elif ret == 1:
+ if syntax_buffer[0].startswith(b'ERROR!'):
+ with open(self.jobdir.job_output_file, 'a') as job_output:
+ for line in syntax_buffer:
+ job_output.write("{now} | {line}\n".format(
+ now=datetime.datetime.now(),
+ line=line.decode('utf-8').rstrip()))
elif ret == 4:
# Ansible could not parse the yaml.
self.log.debug("Ansible parse error")
@@ -1652,7 +1380,7 @@
now=datetime.datetime.now()))
found_marker = False
for line in syntax_buffer:
- if line.startswith('ERROR! Unexpected Exception'):
+ if line.startswith(b'ERROR! Unexpected Exception'):
found_marker = True
if not found_marker:
continue
@@ -1672,11 +1400,63 @@
'-a', 'gather_subset=!all']
result, code = self.runAnsible(
- cmd=cmd, timeout=60, playbook=playbook)
+ cmd=cmd, timeout=60, playbook=playbook,
+ wrapped=False)
self.log.debug("Ansible complete, result %s code %s" % (
self.RESULT_MAP[result], code))
return result, code
+ def runAnsibleCleanup(self, playbook):
+ # TODO(jeblair): This requires a bugfix in Ansible 2.4
+ # Once this is used, increase the controlpersist timeout.
+ return (self.RESULT_NORMAL, 0)
+
+ if self.executor_server.verbose:
+ verbose = '-vvv'
+ else:
+ verbose = '-v'
+
+ cmd = ['ansible', '*', verbose, '-m', 'meta',
+ '-a', 'reset_connection']
+
+ result, code = self.runAnsible(
+ cmd=cmd, timeout=60, playbook=playbook,
+ wrapped=False)
+ self.log.debug("Ansible complete, result %s code %s" % (
+ self.RESULT_MAP[result], code))
+ return result, code
+
+ def emitPlaybookBanner(self, playbook, step, phase, result=None):
+ # This is used to print a header and a footer, respectively at the
+ # beginning and the end of each playbook execution.
+ # We are doing it from the executor rather than from a callback because
+ # the parameters are not made available to the callback until it's too
+ # late.
+ phase = phase or ''
+ trusted = playbook.trusted
+ trusted = 'trusted' if trusted else 'untrusted'
+ branch = playbook.branch
+ playbook = playbook.canonical_name_and_path
+
+ if phase and phase != 'run':
+ phase = '{phase}-run'.format(phase=phase)
+ phase = phase.upper()
+
+ if result is not None:
+ result = self.RESULT_MAP[result]
+ msg = "{phase} {step} {result}: [{trusted} : {playbook}@{branch}]"
+ msg = msg.format(phase=phase, step=step, result=result,
+ trusted=trusted, playbook=playbook, branch=branch)
+ else:
+ msg = "{phase} {step}: [{trusted} : {playbook}@{branch}]"
+ msg = msg.format(phase=phase, step=step, trusted=trusted,
+ playbook=playbook, branch=branch)
+
+ with open(self.jobdir.job_output_file, 'a') as job_output:
+ job_output.write("{now} | {msg}\n".format(
+ now=datetime.datetime.now(),
+ msg=msg))
+
def runAnsiblePlaybook(self, playbook, timeout, success=None,
phase=None, index=None):
if self.executor_server.verbose:
@@ -1707,8 +1487,482 @@
if self.executor_variables_file is not None:
cmd.extend(['-e@%s' % self.executor_variables_file])
+ self.emitPlaybookBanner(playbook, 'START', phase)
+
result, code = self.runAnsible(
cmd=cmd, timeout=timeout, playbook=playbook)
self.log.debug("Ansible complete, result %s code %s" % (
self.RESULT_MAP[result], code))
+
+ self.emitPlaybookBanner(playbook, 'END', phase, result=result)
return result, code
+
+
+class ExecutorMergeWorker(gear.TextWorker):
+ def __init__(self, executor_server, *args, **kw):
+ self.zuul_executor_server = executor_server
+ super(ExecutorMergeWorker, self).__init__(*args, **kw)
+
+ def handleNoop(self, packet):
+ # Wait until the update queue is empty before responding
+ while self.zuul_executor_server.update_queue.qsize():
+ time.sleep(1)
+
+ with self.zuul_executor_server.merger_lock:
+ super(ExecutorMergeWorker, self).handleNoop(packet)
+
+
+class ExecutorExecuteWorker(gear.TextWorker):
+ def __init__(self, executor_server, *args, **kw):
+ self.zuul_executor_server = executor_server
+ super(ExecutorExecuteWorker, self).__init__(*args, **kw)
+
+ def handleNoop(self, packet):
+ # Delay our response to running a new job based on the number
+ # of jobs we're currently running, in an attempt to spread
+ # load evenly among executors.
+ workers = len(self.zuul_executor_server.job_workers)
+ delay = (workers ** 2) / 1000.0
+ time.sleep(delay)
+ return super(ExecutorExecuteWorker, self).handleNoop(packet)
+
+
+class ExecutorServer(object):
+ log = logging.getLogger("zuul.ExecutorServer")
+ _job_class = AnsibleJob
+
+ def __init__(self, config, connections={}, jobdir_root=None,
+ keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
+ self.config = config
+ self.keep_jobdir = keep_jobdir
+ self.jobdir_root = jobdir_root
+ # TODOv3(mordred): make the executor name more unique --
+ # perhaps hostname+pid.
+ self.hostname = get_default(self.config, 'executor', 'hostname',
+ socket.gethostname())
+ self.log_streaming_port = log_streaming_port
+ self.merger_lock = threading.Lock()
+ self.run_lock = threading.Lock()
+ self.verbose = False
+ self.command_map = dict(
+ stop=self.stop,
+ pause=self.pause,
+ unpause=self.unpause,
+ graceful=self.graceful,
+ verbose=self.verboseOn,
+ unverbose=self.verboseOff,
+ keep=self.keep,
+ nokeep=self.nokeep,
+ )
+
+ self.statsd = get_statsd(config)
+ self.merge_root = get_default(self.config, 'executor', 'git_dir',
+ '/var/lib/zuul/executor-git')
+ self.default_username = get_default(self.config, 'executor',
+ 'default_username', 'zuul')
+ self.disk_limit_per_job = int(get_default(self.config, 'executor',
+ 'disk_limit_per_job', 250))
+ self.merge_email = get_default(self.config, 'merger', 'git_user_email')
+ self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+ self.merge_speed_limit = get_default(
+ config, 'merger', 'git_http_low_speed_limit', '1000')
+ self.merge_speed_time = get_default(
+ config, 'merger', 'git_http_low_speed_time', '30')
+ execution_wrapper_name = get_default(self.config, 'executor',
+ 'execution_wrapper', 'bubblewrap')
+ load_multiplier = float(get_default(self.config, 'executor',
+ 'load_multiplier', '2.5'))
+ self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+ self.accepting_work = False
+ self.execution_wrapper = connections.drivers[execution_wrapper_name]
+
+ self.connections = connections
+ # This merger and its git repos are used to maintain
+ # up-to-date copies of all the repos that are used by jobs, as
+ # well as to support the merger:cat functon to supply
+ # configuration information to Zuul when it starts.
+ self.merger = self._getMerger(self.merge_root)
+ self.update_queue = DeduplicateQueue()
+
+ state_dir = get_default(self.config, 'executor', 'state_dir',
+ '/var/lib/zuul', expand_user=True)
+ path = os.path.join(state_dir, 'executor.socket')
+ self.command_socket = commandsocket.CommandSocket(path)
+ ansible_dir = os.path.join(state_dir, 'ansible')
+ self.ansible_dir = ansible_dir
+ if os.path.exists(ansible_dir):
+ shutil.rmtree(ansible_dir)
+
+ zuul_dir = os.path.join(ansible_dir, 'zuul')
+ plugin_dir = os.path.join(zuul_dir, 'ansible')
+
+ os.makedirs(plugin_dir, mode=0o0755)
+
+ self.library_dir = os.path.join(plugin_dir, 'library')
+ self.action_dir = os.path.join(plugin_dir, 'action')
+ self.callback_dir = os.path.join(plugin_dir, 'callback')
+ self.lookup_dir = os.path.join(plugin_dir, 'lookup')
+ self.filter_dir = os.path.join(plugin_dir, 'filter')
+
+ _copy_ansible_files(zuul.ansible, plugin_dir)
+
+ # We're copying zuul.ansible.* into a directory we are going
+ # to add to pythonpath, so our plugins can "import
+ # zuul.ansible". But we're not installing all of zuul, so
+ # create a __init__.py file for the stub "zuul" module.
+ with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
+ pass
+
+ self.job_workers = {}
+ self.disk_accountant = DiskAccountant(self.jobdir_root,
+ self.disk_limit_per_job,
+ self.stopJobDiskFull,
+ self.merge_root)
+
+ def _getMerger(self, root, logger=None):
+ if root != self.merge_root:
+ cache_root = self.merge_root
+ else:
+ cache_root = None
+ return zuul.merger.merger.Merger(
+ root, self.connections, self.merge_email, self.merge_name,
+ self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
+
+ def start(self):
+ self._running = True
+ self._command_running = True
+ server = self.config.get('gearman', 'server')
+ port = get_default(self.config, 'gearman', 'port', 4730)
+ ssl_key = get_default(self.config, 'gearman', 'ssl_key')
+ ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
+ ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
+ self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
+ self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+ self.executor_worker = ExecutorExecuteWorker(
+ self, 'Zuul Executor Server')
+ self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+ self.log.debug("Waiting for server")
+ self.merger_worker.waitForServer()
+ self.executor_worker.waitForServer()
+ self.log.debug("Registering")
+ self.register()
+
+ self.log.debug("Starting command processor")
+ self.command_socket.start()
+ self.command_thread = threading.Thread(target=self.runCommand,
+ name='command')
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
+ self.log.debug("Starting worker")
+ self.update_thread = threading.Thread(target=self._updateLoop,
+ name='update')
+ self.update_thread.daemon = True
+ self.update_thread.start()
+ self.merger_thread = threading.Thread(target=self.run_merger,
+ name='merger')
+ self.merger_thread.daemon = True
+ self.merger_thread.start()
+ self.executor_thread = threading.Thread(target=self.run_executor,
+ name='executor')
+ self.executor_thread.daemon = True
+ self.executor_thread.start()
+ self.governor_stop_event = threading.Event()
+ self.governor_thread = threading.Thread(target=self.run_governor,
+ name='governor')
+ self.governor_thread.daemon = True
+ self.governor_thread.start()
+ self.disk_accountant.start()
+
+ def register(self):
+ self.register_work()
+ self.executor_worker.registerFunction("executor:stop:%s" %
+ self.hostname)
+ self.merger_worker.registerFunction("merger:merge")
+ self.merger_worker.registerFunction("merger:cat")
+ self.merger_worker.registerFunction("merger:refstate")
+
+ def register_work(self):
+ if self._running:
+ self.accepting_work = True
+ self.executor_worker.registerFunction("executor:execute")
+
+ def unregister_work(self):
+ self.accepting_work = False
+ self.executor_worker.unregisterFunction("executor:execute")
+
+ def stop(self):
+ self.log.debug("Stopping")
+ self.disk_accountant.stop()
+ # The governor can change function registration, so make sure
+ # it has stopped.
+ self.governor_stop_event.set()
+ self.governor_thread.join()
+ # Stop accepting new jobs
+ self.merger_worker.setFunctions([])
+ self.executor_worker.setFunctions([])
+ # Tell the executor worker to abort any jobs it just accepted,
+ # and grab the list of currently running job workers.
+ with self.run_lock:
+ self._running = False
+ self._command_running = False
+ workers = list(self.job_workers.values())
+ self.command_socket.stop()
+
+ for job_worker in workers:
+ try:
+ job_worker.stop()
+ except Exception:
+ self.log.exception("Exception sending stop command "
+ "to worker:")
+ for job_worker in workers:
+ try:
+ job_worker.wait()
+ except Exception:
+ self.log.exception("Exception waiting for worker "
+ "to stop:")
+
+ # Now that we aren't accepting any new jobs, and all of the
+ # running jobs have stopped, tell the update processor to
+ # stop.
+ self.update_queue.put(None)
+
+ # All job results should have been sent by now, shutdown the
+ # gearman workers.
+ self.merger_worker.shutdown()
+ self.executor_worker.shutdown()
+
+ if self.statsd:
+ base_key = 'zuul.executor.%s' % self.hostname
+ self.statsd.gauge(base_key + '.load_average', 0)
+ self.statsd.gauge(base_key + '.running_builds', 0)
+
+ self.log.debug("Stopped")
+
+ def join(self):
+ self.governor_thread.join()
+ self.update_thread.join()
+ self.merger_thread.join()
+ self.executor_thread.join()
+
+ def pause(self):
+ # TODOv3: implement
+ pass
+
+ def unpause(self):
+ # TODOv3: implement
+ pass
+
+ def graceful(self):
+ # TODOv3: implement
+ pass
+
+ def verboseOn(self):
+ self.verbose = True
+
+ def verboseOff(self):
+ self.verbose = False
+
+ def keep(self):
+ self.keep_jobdir = True
+
+ def nokeep(self):
+ self.keep_jobdir = False
+
+ def runCommand(self):
+ while self._command_running:
+ try:
+ command = self.command_socket.get().decode('utf8')
+ if command != '_stop':
+ self.command_map[command]()
+ except Exception:
+ self.log.exception("Exception while processing command")
+
+ def _updateLoop(self):
+ while True:
+ try:
+ self._innerUpdateLoop()
+ except StopException:
+ return
+ except Exception:
+ self.log.exception("Exception in update thread:")
+
+ def _innerUpdateLoop(self):
+ # Inside of a loop that keeps the main repositories up to date
+ task = self.update_queue.get()
+ if task is None:
+ # We are asked to stop
+ raise StopException()
+ with self.merger_lock:
+ self.log.info("Updating repo %s/%s" % (
+ task.connection_name, task.project_name))
+ self.merger.updateRepo(task.connection_name, task.project_name)
+ self.log.debug("Finished updating repo %s/%s" %
+ (task.connection_name, task.project_name))
+ task.setComplete()
+
+ def update(self, connection_name, project_name):
+ # Update a repository in the main merger
+ task = UpdateTask(connection_name, project_name)
+ task = self.update_queue.put(task)
+ return task
+
+ def run_merger(self):
+ self.log.debug("Starting merger listener")
+ while self._running:
+ try:
+ job = self.merger_worker.getJob()
+ try:
+ self.mergerJobDispatch(job)
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ except gear.InterruptedError:
+ pass
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def mergerJobDispatch(self, job):
+ with self.run_lock:
+ if job.name == 'merger:cat':
+ self.log.debug("Got cat job: %s" % job.unique)
+ self.cat(job)
+ elif job.name == 'merger:merge':
+ self.log.debug("Got merge job: %s" % job.unique)
+ self.merge(job)
+ elif job.name == 'merger:refstate':
+ self.log.debug("Got refstate job: %s" % job.unique)
+ self.refstate(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+
+ def run_executor(self):
+ self.log.debug("Starting executor listener")
+ while self._running:
+ try:
+ job = self.executor_worker.getJob()
+ try:
+ self.executorJobDispatch(job)
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ except gear.InterruptedError:
+ pass
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def executorJobDispatch(self, job):
+ with self.run_lock:
+ if not self._running:
+ job.sendWorkFail()
+ return
+ if job.name == 'executor:execute':
+ self.log.debug("Got execute job: %s" % job.unique)
+ self.executeJob(job)
+ elif job.name.startswith('executor:stop'):
+ self.log.debug("Got stop job: %s" % job.unique)
+ self.stopJob(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+
+ def executeJob(self, job):
+ if self.statsd:
+ base_key = 'zuul.executor.%s' % self.hostname
+ self.statsd.incr(base_key + '.builds')
+ self.job_workers[job.unique] = self._job_class(self, job)
+ self.job_workers[job.unique].run()
+
+ def run_governor(self):
+ while not self.governor_stop_event.wait(30):
+ try:
+ self.manageLoad()
+ except Exception:
+ self.log.exception("Exception in governor thread:")
+
+ def manageLoad(self):
+ ''' Apply some heuristics to decide whether or not we should
+ be askign for more jobs '''
+ load_avg = os.getloadavg()[0]
+ if self.accepting_work:
+ # Don't unregister if we don't have any active jobs.
+ if load_avg > self.max_load_avg and self.job_workers:
+ self.log.info(
+ "Unregistering due to high system load {} > {}".format(
+ load_avg, self.max_load_avg))
+ self.unregister_work()
+ elif load_avg <= self.max_load_avg:
+ self.log.info(
+ "Re-registering as load is within limits {} <= {}".format(
+ load_avg, self.max_load_avg))
+ self.register_work()
+ if self.statsd:
+ base_key = 'zuul.executor.%s' % self.hostname
+ self.statsd.gauge(base_key + '.load_average',
+ int(load_avg * 100))
+ self.statsd.gauge(base_key + '.running_builds',
+ len(self.job_workers))
+
+ def finishJob(self, unique):
+ del(self.job_workers[unique])
+
+ def stopJobDiskFull(self, jobdir):
+ unique = os.path.basename(jobdir)
+ self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
+
+ def stopJob(self, job):
+ try:
+ args = json.loads(job.arguments)
+ self.log.debug("Stop job with arguments: %s" % (args,))
+ unique = args['uuid']
+ self.stopJobByUnique(unique)
+ finally:
+ job.sendWorkComplete()
+
+ def stopJobByUnique(self, unique, reason=None):
+ job_worker = self.job_workers.get(unique)
+ if not job_worker:
+ self.log.debug("Unable to find worker for job %s" % (unique,))
+ return
+ try:
+ job_worker.stop(reason)
+ except Exception:
+ self.log.exception("Exception sending stop command "
+ "to worker:")
+
+ def cat(self, job):
+ args = json.loads(job.arguments)
+ task = self.update(args['connection'], args['project'])
+ task.wait()
+ with self.merger_lock:
+ files = self.merger.getFiles(args['connection'], args['project'],
+ args['branch'], args['files'],
+ args.get('dirs', []))
+ result = dict(updated=True,
+ files=files)
+ job.sendWorkComplete(json.dumps(result))
+
+ def refstate(self, job):
+ args = json.loads(job.arguments)
+ with self.merger_lock:
+ success, repo_state = self.merger.getRepoState(args['items'])
+ result = dict(updated=success,
+ repo_state=repo_state)
+ job.sendWorkComplete(json.dumps(result))
+
+ def merge(self, job):
+ args = json.loads(job.arguments)
+ with self.merger_lock:
+ ret = self.merger.mergeChanges(args['items'], args.get('files'),
+ args.get('dirs', []),
+ args.get('repo_state'))
+ result = dict(merged=(ret is not None))
+ if ret is None:
+ result['commit'] = result['files'] = result['repo_state'] = None
+ else:
+ (result['commit'], result['files'], result['repo_state'],
+ recent) = ret
+ job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/lib/queue.py b/zuul/lib/queue.py
new file mode 100644
index 0000000..db8af47
--- /dev/null
+++ b/zuul/lib/queue.py
@@ -0,0 +1,78 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import threading
+
+
+class MergedQueue(object):
+ def __init__(self):
+ self.queue = collections.deque()
+ self.lock = threading.RLock()
+ self.condition = threading.Condition(self.lock)
+ self.join_condition = threading.Condition(self.lock)
+ self.tasks = 0
+
+ def qsize(self):
+ return len(self.queue)
+
+ def empty(self):
+ return self.qsize() == 0
+
+ def put(self, item):
+ # Returns the original item if added, or an updated equivalent
+ # item if already enqueued.
+ self.condition.acquire()
+ ret = None
+ try:
+ for x in self.queue:
+ if item == x:
+ ret = x
+ if hasattr(ret, 'merge'):
+ ret.merge(item)
+ if ret is None:
+ ret = item
+ self.queue.append(item)
+ self.condition.notify()
+ finally:
+ self.condition.release()
+ return ret
+
+ def get(self):
+ self.condition.acquire()
+ try:
+ while True:
+ try:
+ ret = self.queue.popleft()
+ self.join_condition.acquire()
+ self.tasks += 1
+ self.join_condition.release()
+ return ret
+ except IndexError:
+ self.condition.wait()
+ finally:
+ self.condition.release()
+
+ def task_done(self):
+ self.join_condition.acquire()
+ self.tasks -= 1
+ self.join_condition.notify()
+ self.join_condition.release()
+
+ def join(self):
+ self.join_condition.acquire()
+ while self.tasks:
+ self.join_condition.wait()
+ self.join_condition.release()
diff --git a/zuul/lib/statsd.py b/zuul/lib/statsd.py
new file mode 100644
index 0000000..0ccacf9
--- /dev/null
+++ b/zuul/lib/statsd.py
@@ -0,0 +1,30 @@
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from zuul.lib.config import get_default
+
+
+def get_statsd_config(config):
+ statsd_host = get_default(config, 'statsd', 'server')
+ statsd_port = int(get_default(config, 'statsd', 'port', 8125))
+ statsd_prefix = get_default(config, 'statsd', 'prefix')
+ return (statsd_host, statsd_port, statsd_prefix)
+
+
+def get_statsd(config):
+ (statsd_host, statsd_port, statsd_prefix) = get_statsd_config(config)
+ if statsd_host is None:
+ return None
+ import statsd
+ return statsd.StatsClient(statsd_host, statsd_port, statsd_prefix)
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index b94b8a5..edea69c 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -11,6 +11,7 @@
# under the License.
import logging
+import textwrap
from zuul import exceptions
from zuul import model
@@ -42,9 +43,10 @@
class PipelineManager(object):
"""Abstract Base Class for enqueing and processing Changes in a Pipeline"""
- log = logging.getLogger("zuul.PipelineManager")
-
def __init__(self, sched, pipeline):
+ self.log = logging.getLogger("zuul.Pipeline.%s.%s" %
+ (pipeline.layout.tenant.name,
+ pipeline.name,))
self.sched = sched
self.pipeline = pipeline
self.event_filters = []
@@ -228,20 +230,17 @@
(item.change, change_queue))
change_queue.enqueueItem(item)
- # Get an updated copy of the layout if necessary.
- # This will return one of the following:
- # 1) An existing layout from the item ahead or pipeline.
- # 2) A newly created layout from the cached pipeline
- # layout config plus the previously returned
- # in-repo files stored in the buildset.
- # 3) None in the case that a fetch of the files from
- # the merger is still pending.
- item.current_build_set.layout = self.getLayout(item)
-
- # Rebuild the frozen job tree from the new layout, if
- # we have one. If not, it will be built later.
- if item.current_build_set.layout:
- item.freezeJobGraph()
+ # Get an updated copy of the layout and update the job
+ # graph if necessary. This resumes the buildset merge
+ # state machine. If we have an up-to-date layout, it
+ # will go ahead and refresh the job graph if needed;
+ # or it will send a new merge job if necessary, or it
+ # will do nothing if we're waiting on a merge job.
+ item.job_graph = None
+ item.layout = None
+ if item.active:
+ if self.prepareItem(item):
+ self.prepareJobs(item)
# Re-set build results in case any new jobs have been
# added to the tree.
@@ -373,7 +372,7 @@
def executeJobs(self, item):
# TODO(jeblair): This should return a value indicating a job
# was executed. Appears to be a longstanding bug.
- if not item.current_build_set.layout:
+ if not item.layout:
return False
jobs = item.findJobsToRun(
@@ -407,13 +406,8 @@
old_build_set.item, build.job)
if not was_running:
- try:
- nodeset = build.build_set.getJobNodeSet(build.job.name)
- self.sched.nodepool.returnNodeSet(nodeset)
- except Exception:
- self.log.exception("Unable to return nodeset %s for "
- "canceled build request %s" %
- (nodeset, build))
+ nodeset = build.build_set.getJobNodeSet(build.job.name)
+ self.sched.nodepool.returnNodeSet(nodeset)
build.result = 'CANCELED'
canceled = True
canceled_jobs.add(build.job.name)
@@ -434,30 +428,60 @@
import zuul.configloader
loader = zuul.configloader.ConfigLoader()
- build_set = item.current_build_set
self.log.debug("Loading dynamic layout")
+ (trusted_updates, untrusted_updates) = item.includesConfigUpdates()
+ build_set = item.current_build_set
+ trusted_layout_verified = False
try:
# First parse the config as it will land with the
# full set of config and project repos. This lets us
# catch syntax errors in config repos even though we won't
# actually run with that config.
- loader.createDynamicLayout(
- item.pipeline.layout.tenant,
- build_set.files,
- include_config_projects=True,
- scheduler=self.sched,
- connections=self.sched.connections)
+ if trusted_updates:
+ self.log.debug("Loading dynamic layout (phase 1)")
+ loader.createDynamicLayout(
+ item.pipeline.layout.tenant,
+ build_set.files,
+ include_config_projects=True,
+ scheduler=self.sched,
+ connections=self.sched.connections)
+ trusted_layout_verified = True
# Then create the config a second time but without changes
# to config repos so that we actually use this config.
- layout = loader.createDynamicLayout(
- item.pipeline.layout.tenant,
- build_set.files,
- include_config_projects=False)
+ if untrusted_updates:
+ self.log.debug("Loading dynamic layout (phase 2)")
+ layout = loader.createDynamicLayout(
+ item.pipeline.layout.tenant,
+ build_set.files,
+ include_config_projects=False)
+ else:
+ # We're a change to a config repo (with no untrusted
+ # items ahead), so just use the most recently
+ # generated layout.
+ if item.item_ahead:
+ return item.item_ahead.layout
+ else:
+ return item.queue.pipeline.layout
+ self.log.debug("Loading dynamic layout complete")
except zuul.configloader.ConfigurationSyntaxError as e:
- self.log.info("Configuration syntax error "
- "in dynamic layout")
- item.setConfigError(str(e))
+ self.log.info("Configuration syntax error in dynamic layout")
+ if trusted_layout_verified:
+ # The config is good if we include config-projects,
+ # but is currently invalid if we omit them. Instead
+ # of returning the whole error message, just leave a
+ # note that the config will work once the dependent
+ # changes land.
+ msg = "This change depends on a change "\
+ "to a config project.\n\n"
+ msg += textwrap.fill(textwrap.dedent("""\
+ The syntax of the configuration in this change has
+ been verified to be correct once the config project
+ change upon which it depends is merged, but it can not
+ be used until that occurs."""))
+ item.setConfigError(msg)
+ else:
+ item.setConfigError(str(e))
return None
except Exception:
self.log.exception("Error in dynamic layout")
@@ -468,7 +492,7 @@
def getLayout(self, item):
if not item.change.updatesConfig():
if item.item_ahead:
- return item.item_ahead.current_build_set.layout
+ return item.item_ahead.layout
else:
return item.queue.pipeline.layout
# This item updates the config, ask the merger for the result.
@@ -519,10 +543,9 @@
def prepareJobs(self, item):
# This only runs once the item is in the pipeline's action window
# Returns True if the item is ready, false otherwise
- build_set = item.current_build_set
- if not build_set.layout:
- build_set.layout = self.getLayout(item)
- if not build_set.layout:
+ if not item.layout:
+ item.layout = self.getLayout(item)
+ if not item.layout:
return False
if not item.job_graph:
@@ -748,8 +771,7 @@
# pipeline, use the dynamic layout if available, otherwise,
# fall back to the current static layout as a best
# approximation.
- layout = (item.current_build_set.layout or
- self.pipeline.layout)
+ layout = (item.layout or self.pipeline.layout)
project_in_pipeline = True
if not layout.getProjectPipelineConfig(item.change.project,
@@ -815,19 +837,28 @@
dt = None
items = len(self.pipeline.getAllItems())
- # stats.timers.zuul.pipeline.NAME.resident_time
- # stats_counts.zuul.pipeline.NAME.total_changes
- # stats.gauges.zuul.pipeline.NAME.current_changes
- key = 'zuul.pipeline.%s' % self.pipeline.name
+ tenant = self.pipeline.layout.tenant
+ basekey = 'zuul.tenant.%s' % tenant.name
+ key = '%s.pipeline.%s' % (basekey, self.pipeline.name)
+ # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.resident_time
+ # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.total_changes
+ # stats.gauges.zuul.tenant.<tenant>.pipeline.<pipeline>.current_changes
self.sched.statsd.gauge(key + '.current_changes', items)
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
- # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
- project_name = item.change.project.name.replace('/', '.')
- key += '.%s' % project_name
+ hostname = (item.change.project.canonical_hostname.
+ replace('.', '_'))
+ projectname = (item.change.project.name.
+ replace('.', '_').replace('/', '.'))
+ projectname = projectname.replace('.', '_').replace('/', '.')
+ branchname = item.change.branch.replace('.', '_').replace('/', '.')
+ # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.
+ # project.<host>.<project>.<branch>.resident_time
+ # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.
+ # project.<host>.<project>.<branch>.total_changes
+ key += '.project.%s.%s.%s' % (hostname, projectname, branchname)
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py
index 411894e..5aef453 100644
--- a/zuul/manager/dependent.py
+++ b/zuul/manager/dependent.py
@@ -10,8 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import logging
-
from zuul import model
from zuul.manager import PipelineManager, StaticChangeQueueContextManager
from zuul.manager import DynamicChangeQueueContextManager
@@ -25,7 +23,6 @@
using the Optmistic Branch Prediction logic with Nearest Non-Failing Item
reparenting algorithm for handling errors.
"""
- log = logging.getLogger("zuul.DependentPipelineManager")
changes_merge = True
def __init__(self, *args, **kwargs):
diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py
index 7b0a9f5..65f5ca0 100644
--- a/zuul/manager/independent.py
+++ b/zuul/manager/independent.py
@@ -10,8 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import logging
-
from zuul import model
from zuul.manager import PipelineManager, DynamicChangeQueueContextManager
@@ -19,7 +17,6 @@
class IndependentPipelineManager(PipelineManager):
"""PipelineManager that puts every Change into its own ChangeQueue."""
- log = logging.getLogger("zuul.IndependentPipelineManager")
changes_merge = False
def _postConfig(self, layout):
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 8b98bfb..035d1d0 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -13,10 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
+from contextlib import contextmanager
+import logging
+import os
+import shutil
+
import git
import gitdb
-import os
-import logging
import zuul.model
@@ -38,22 +41,37 @@
raise
+@contextmanager
+def timeout_handler(path):
+ try:
+ yield
+ except git.exc.GitCommandError as e:
+ if e.status == -9:
+ # Timeout. The repo could be in a bad state, so delete it.
+ shutil.rmtree(path)
+ raise
+
+
class ZuulReference(git.Reference):
_common_path_default = "refs/zuul"
_points_to_commits_only = True
class Repo(object):
- def __init__(self, remote, local, email, username, sshkey=None,
- cache_path=None, logger=None):
+ def __init__(self, remote, local, email, username, speed_limit, speed_time,
+ sshkey=None, cache_path=None, logger=None, git_timeout=300):
if logger is None:
self.log = logging.getLogger("zuul.Repo")
else:
self.log = logger
+ self.env = {
+ 'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
+ 'GIT_HTTP_LOW_SPEED_TIME': speed_time,
+ }
+ self.git_timeout = git_timeout
if sshkey:
- self.env = {'GIT_SSH_COMMAND': 'ssh -i %s' % (sshkey,)}
- else:
- self.env = {}
+ self.env['GIT_SSH_COMMAND'] = 'ssh -i %s' % (sshkey,)
+
self.remote_url = remote
self.local_path = local
self.email = email
@@ -62,7 +80,7 @@
self._initialized = False
try:
self._ensure_cloned()
- except:
+ except Exception:
self.log.exception("Unable to initialize repo for %s" % remote)
def _ensure_cloned(self):
@@ -75,12 +93,10 @@
self.log.debug("Cloning from %s to %s" % (self.remote_url,
self.local_path))
if self.cache_path:
- git.Repo.clone_from(self.cache_path, self.local_path,
- env=self.env)
+ self._git_clone(self.cache_path)
rewrite_url = True
else:
- git.Repo.clone_from(self.remote_url, self.local_path,
- env=self.env)
+ self._git_clone(self.remote_url)
repo = git.Repo(self.local_path)
repo.git.update_environment(**self.env)
# Create local branches corresponding to all the remote branches
@@ -104,6 +120,18 @@
def isInitialized(self):
return self._initialized
+ def _git_clone(self, url):
+ mygit = git.cmd.Git(os.getcwd())
+ mygit.update_environment(**self.env)
+ with timeout_handler(self.local_path):
+ mygit.clone(git.cmd.Git.polish_url(url), self.local_path,
+ kill_after_timeout=self.git_timeout)
+
+ def _git_fetch(self, repo, remote, ref=None, **kwargs):
+ with timeout_handler(self.local_path):
+ repo.git.fetch(remote, ref, kill_after_timeout=self.git_timeout,
+ **kwargs)
+
def createRepoObject(self):
self._ensure_cloned()
repo = git.Repo(self.local_path)
@@ -225,19 +253,18 @@
def fetch(self, ref):
repo = self.createRepoObject()
- # The git.remote.fetch method may read in git progress info and
- # interpret it improperly causing an AssertionError. Because the
- # data was fetched properly subsequent fetches don't seem to fail.
- # So try again if an AssertionError is caught.
- origin = repo.remotes.origin
- try:
- origin.fetch(ref)
- except AssertionError:
- origin.fetch(ref)
+ # NOTE: The following is currently not applicable, but if we
+ # switch back to fetch methods from GitPython, we need to
+ # consider it:
+ # The git.remote.fetch method may read in git progress info and
+ # interpret it improperly causing an AssertionError. Because the
+ # data was fetched properly subsequent fetches don't seem to fail.
+ # So try again if an AssertionError is caught.
+ self._git_fetch(repo, 'origin', ref)
def fetchFrom(self, repository, ref):
repo = self.createRepoObject()
- repo.git.fetch(repository, ref)
+ self._git_fetch(repo, repository, ref)
def createZuulRef(self, ref, commit='HEAD'):
repo = self.createRepoObject()
@@ -254,15 +281,14 @@
def update(self):
repo = self.createRepoObject()
self.log.debug("Updating repository %s" % self.local_path)
- origin = repo.remotes.origin
if repo.git.version_info[:2] < (1, 9):
# Before 1.9, 'git fetch --tags' did not include the
# behavior covered by 'git --fetch', so we run both
# commands in that case. Starting with 1.9, 'git fetch
# --tags' is all that is necessary. See
# https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20
- origin.fetch()
- origin.fetch(tags=True)
+ self._git_fetch(repo, 'origin')
+ self._git_fetch(repo, 'origin', tags=True)
def getFiles(self, files, dirs=[], branch=None, commit=None):
ret = {}
@@ -293,7 +319,7 @@
class Merger(object):
def __init__(self, working_root, connections, email, username,
- cache_root=None, logger=None):
+ speed_limit, speed_time, cache_root=None, logger=None):
self.logger = logger
if logger is None:
self.log = logging.getLogger("zuul.Merger")
@@ -306,6 +332,8 @@
self.connections = connections
self.email = email
self.username = username
+ self.speed_limit = speed_limit
+ self.speed_time = speed_time
self.cache_root = cache_root
def _addProject(self, hostname, project_name, url, sshkey):
@@ -318,8 +346,9 @@
project_name)
else:
cache_path = None
- repo = Repo(url, path, self.email, self.username,
- sshkey, cache_path, self.logger)
+ repo = Repo(
+ url, path, self.email, self.username, self.speed_limit,
+ self.speed_time, sshkey, cache_path, self.logger)
self.repos[key] = repo
except Exception:
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 881209d..765d9e0 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -33,9 +33,13 @@
'/var/lib/zuul/merger-git')
merge_email = get_default(self.config, 'merger', 'git_user_email')
merge_name = get_default(self.config, 'merger', 'git_user_name')
-
- self.merger = merger.Merger(merge_root, connections, merge_email,
- merge_name)
+ speed_limit = get_default(
+ config, 'merger', 'git_http_low_speed_limit', '1000')
+ speed_time = get_default(
+ config, 'merger', 'git_http_low_speed_time', '30')
+ self.merger = merger.Merger(
+ merge_root, connections, merge_email, merge_name, speed_limit,
+ speed_time)
def start(self):
self._running = True
diff --git a/zuul/model.py b/zuul/model.py
index 4c5a51f..7d6e80c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -23,6 +23,8 @@
import urllib.parse
import textwrap
+from zuul import change_matcher
+
MERGER_MERGE = 1 # "git merge"
MERGER_MERGE_RESOLVE = 2 # "git merge -s resolve"
MERGER_CHERRY_PICK = 3 # "git cherry-pick"
@@ -44,6 +46,12 @@
'high': PRECEDENCE_HIGH,
}
+PRIORITY_MAP = {
+ PRECEDENCE_NORMAL: 200,
+ PRECEDENCE_LOW: 300,
+ PRECEDENCE_HIGH: 100,
+}
+
# Request states
STATE_REQUESTED = 'requested'
STATE_PENDING = 'pending'
@@ -515,6 +523,7 @@
self.job = job
self.nodeset = nodeset
self._state = STATE_REQUESTED
+ self.requested_time = time.time()
self.state_time = time.time()
self.stat = None
self.uid = uuid4().hex
@@ -525,6 +534,14 @@
self.canceled = False
@property
+ def priority(self):
+ if self.build_set:
+ precedence = self.build_set.item.pipeline.precedence
+ else:
+ precedence = PRECEDENCE_NORMAL
+ return PRIORITY_MAP[precedence]
+
+ @property
def fulfilled(self):
return (self._state == STATE_FULFILLED) and not self.failed
@@ -817,6 +834,7 @@
self.other_attributes = dict(
name=None,
source_context=None,
+ source_line=None,
inheritance_path=(),
)
@@ -851,14 +869,16 @@
return self.name
def __repr__(self):
- return '<Job %s branches: %s source: %s>' % (self.name,
- self.branch_matcher,
- self.source_context)
+ return '<Job %s branches: %s source: %s#%s>' % (
+ self.name,
+ self.branch_matcher,
+ self.source_context,
+ self.source_line)
def __getattr__(self, name):
v = self.__dict__.get(name)
if v is None:
- return copy.deepcopy(self.attributes[name])
+ return self.attributes[name]
return v
def _get(self, name):
@@ -868,6 +888,8 @@
return Attributes(name=self.name)
def setRun(self):
+ msg = 'self %s' % (repr(self),)
+ self.inheritance_path = self.inheritance_path + (msg,)
if not self.run:
self.run = self.implied_run
@@ -898,14 +920,21 @@
if changed:
self.roles = tuple(newroles)
+ def setBranchMatcher(self, branches):
+ # Set the branch matcher to match any of the supplied branches
+ matchers = []
+ for branch in branches:
+ matchers.append(change_matcher.BranchMatcher(branch))
+ self.branch_matcher = change_matcher.MatchAny(matchers)
+
def updateVariables(self, other_vars):
- v = self.variables
+ v = copy.deepcopy(self.variables)
Job._deepUpdate(v, other_vars)
self.variables = v
def updateProjects(self, other_projects):
- required_projects = self.required_projects
- Job._deepUpdate(required_projects, other_projects)
+ required_projects = self.required_projects.copy()
+ required_projects.update(other_projects)
self.required_projects = required_projects
@staticmethod
@@ -932,7 +961,7 @@
# copy all attributes
for k in self.inheritable_attributes:
if (other._get(k) is not None):
- setattr(self, k, copy.deepcopy(getattr(other, k)))
+ setattr(self, k, getattr(other, k))
msg = 'inherit from %s' % (repr(other),)
self.inheritance_path = other.inheritance_path + (msg,)
@@ -1025,12 +1054,14 @@
else:
self.jobs[job.name] = [job]
- def inheritFrom(self, other):
+ def inheritFrom(self, other, implied_branch):
for jobname, jobs in other.jobs.items():
- if jobname in self.jobs:
- self.jobs[jobname].extend(jobs)
- else:
- self.jobs[jobname] = jobs
+ joblist = self.jobs.setdefault(jobname, [])
+ for job in jobs:
+ if not job.branch_matcher and implied_branch:
+ job = job.copy()
+ job.setBranchMatcher([implied_branch])
+ joblist.append(job)
class JobGraph(object):
@@ -1129,7 +1160,6 @@
self.start_time = None
self.end_time = None
self.estimated_time = None
- self.pipeline = None
self.canceled = False
self.retry = False
self.parameters = {}
@@ -1141,6 +1171,10 @@
return ('<Build %s of %s on %s>' %
(self.uuid, self.job.name, self.worker))
+ @property
+ def pipeline(self):
+ return self.build_set.item.pipeline
+
def getSafeAttributes(self):
return Attributes(uuid=self.uuid,
result=self.result,
@@ -1229,8 +1263,6 @@
self.item = item
self.builds = {}
self.result = None
- self.next_build_set = None
- self.previous_build_set = None
self.uuid = None
self.commit = None
self.dependent_items = None
@@ -1243,7 +1275,6 @@
self.node_requests = {} # job -> reqs
self.files = RepoFiles()
self.repo_state = {}
- self.layout = None
self.tries = {}
@property
@@ -1338,7 +1369,7 @@
item = self.item
layout = None
while item:
- layout = item.current_build_set.layout
+ layout = item.layout
if layout:
break
item = item.item_ahead
@@ -1369,10 +1400,8 @@
self.pipeline = queue.pipeline
self.queue = queue
self.change = change # a ref
- self.build_sets = []
self.dequeued_needing_change = False
self.current_build_set = BuildSet(self)
- self.build_sets.append(self.current_build_set)
self.item_ahead = None
self.items_behind = []
self.enqueue_time = None
@@ -1382,7 +1411,7 @@
self.quiet = False
self.active = False # Whether an item is within an active window
self.live = True # Whether an item is intended to be processed at all
- # TODO(jeblair): move job_graph to buildset
+ self.layout = None
self.job_graph = None
def __repr__(self):
@@ -1394,17 +1423,12 @@
id(self), self.change, pipeline)
def resetAllBuilds(self):
- old = self.current_build_set
- self.current_build_set.result = 'CANCELED'
self.current_build_set = BuildSet(self)
- old.next_build_set = self.current_build_set
- self.current_build_set.previous_build_set = old
- self.build_sets.append(self.current_build_set)
+ self.layout = None
self.job_graph = None
def addBuild(self, build):
self.current_build_set.addBuild(build)
- build.pipeline = self.pipeline
def removeBuild(self, build):
self.current_build_set.removeBuild(build)
@@ -1415,8 +1439,7 @@
def freezeJobGraph(self):
"""Find or create actual matching jobs for this item's change and
store the resulting job tree."""
- layout = self.current_build_set.layout
- job_graph = layout.createJobGraph(self)
+ job_graph = self.layout.createJobGraph(self)
for job in job_graph.getJobs():
# Ensure that each jobs's dependencies are fully
# accessible. This will raise an exception if not.
@@ -1491,6 +1514,25 @@
def wasDequeuedNeedingChange(self):
return self.dequeued_needing_change
+ def includesConfigUpdates(self):
+ includes_trusted = False
+ includes_untrusted = False
+ tenant = self.pipeline.layout.tenant
+ item = self
+ while item:
+ if item.change.updatesConfig():
+ (trusted, project) = tenant.getProject(
+ item.change.project.canonical_name)
+ if trusted:
+ includes_trusted = True
+ else:
+ includes_untrusted = True
+ if includes_trusted and includes_untrusted:
+ # We're done early
+ return (includes_trusted, includes_untrusted)
+ item = item.item_ahead
+ return (includes_trusted, includes_untrusted)
+
def isHoldingFollowingChanges(self):
if not self.live:
return False
@@ -2098,6 +2140,7 @@
def __init__(self, name):
self.name = name
self.merge_mode = None
+ # The default branch for the project (usually master).
self.default_branch = None
self.pipelines = {}
self.private_key_file = None
@@ -2277,6 +2320,7 @@
"""Holds all of the Pipelines."""
def __init__(self, tenant):
+ self.uuid = uuid4().hex
self.tenant = tenant
self.project_configs = {}
self.project_templates = {}
@@ -2498,14 +2542,14 @@
@staticmethod
def _max_count(item, semaphore_name):
- if not item.current_build_set.layout:
+ if not item.layout:
# This should not occur as the layout of the item must already be
# built when acquiring or releasing a semaphore for a job.
raise Exception("Item {} has no layout".format(item))
# find the right semaphore
default_semaphore = Semaphore(semaphore_name, 1)
- semaphores = item.current_build_set.layout.semaphores
+ semaphores = item.layout.semaphores
return semaphores.get(semaphore_name, default_semaphore).max
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index dc855cd..7dafca0 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -22,6 +22,35 @@
self.requests = {}
self.sched = scheduler
+ def emitStats(self, request):
+ if not self.sched.statsd:
+ return
+ statsd = self.sched.statsd
+ # counter zuul.nodepool.requested
+ # counter zuul.nodepool.requested.label.<label>
+ # counter zuul.nodepool.requested.size.<size>
+ # gauge zuul.nodepool.current_requests
+ state = request.state
+ if request.canceled:
+ state = 'canceled'
+ dt = None
+ elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+ dt = int((request.state_time - request.requested_time) * 1000)
+ else:
+ dt = None
+ key = 'zuul.nodepool.%s' % state
+ statsd.incr(key)
+ if dt:
+ statsd.timing(key, dt)
+ for node in request.nodeset.getNodes():
+ statsd.incr(key + '.label.%s' % node.label)
+ if dt:
+ statsd.timing(key + '.label.%s' % node.label, dt)
+ statsd.incr(key + '.size.%s' % len(request.nodeset.nodes))
+ if dt:
+ statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
+ statsd.gauge('zuul.nodepool.current_requests', len(self.requests))
+
def requestNodes(self, build_set, job):
# Create a copy of the nodeset to represent the actual nodes
# returned by nodepool.
@@ -33,6 +62,7 @@
self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
# Logged after submission so that we have the request id
self.log.info("Submited node request %s" % (req,))
+ self.emitStats(req)
else:
self.log.info("Fulfilling empty node request %s" % (req,))
req.state = model.STATE_FULFILLED
@@ -90,10 +120,15 @@
self.log.info("Returning nodeset %s" % (nodeset,))
for node in nodeset.getNodes():
if node.lock is None:
- raise Exception("Node %s is not locked" % (node,))
- if node.state == model.STATE_IN_USE:
- node.state = model.STATE_USED
- self.sched.zk.storeNode(node)
+ self.log.error("Node %s is not locked" % (node,))
+ else:
+ try:
+ if node.state == model.STATE_IN_USE:
+ node.state = model.STATE_USED
+ self.sched.zk.storeNode(node)
+ except Exception:
+ self.log.exception("Exception storing node %s "
+ "while unlocking:" % (node,))
self._unlockNodes(nodeset.getNodes())
def unlockNodeSet(self, nodeset):
@@ -106,18 +141,21 @@
except Exception:
self.log.exception("Error unlocking node:")
- def lockNodeSet(self, nodeset):
- self._lockNodes(nodeset.getNodes())
+ def lockNodeSet(self, nodeset, request_id):
+ self._lockNodes(nodeset.getNodes(), request_id)
- def _lockNodes(self, nodes):
+ def _lockNodes(self, nodes, request_id):
# Try to lock all of the supplied nodes. If any lock fails,
# try to unlock any which have already been locked before
# re-raising the error.
locked_nodes = []
try:
for node in nodes:
+ if node.allocated_to != request_id:
+ raise Exception("Node %s allocated to %s, not %s" %
+ (node.id, node.allocated_to, request_id))
self.log.debug("Locking node %s" % (node,))
- self.sched.zk.lockNode(node)
+ self.sched.zk.lockNode(node, timeout=30)
locked_nodes.append(node)
except Exception:
self.log.exception("Error locking nodes:")
@@ -134,29 +172,48 @@
if request.canceled:
del self.requests[request.uid]
+ self.emitStats(request)
return False
- if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+ # TODOv3(jeblair): handle allocation failure
+ if deleted:
+ self.log.debug("Resubmitting lost node request %s" % (request,))
+ request.id = None
+ self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+ elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
self.log.info("Node request %s %s" % (request, request.state))
# Give our results to the scheduler.
self.sched.onNodesProvisioned(request)
del self.requests[request.uid]
+ self.emitStats(request)
+
# Stop watching this request node.
return False
- # TODOv3(jeblair): handle allocation failure
- elif deleted:
- self.log.debug("Resubmitting lost node request %s" % (request,))
- self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+
return True
- def acceptNodes(self, request):
+ def acceptNodes(self, request, request_id):
# Called by the scheduler when it wants to accept and lock
# nodes for (potential) use.
self.log.info("Accepting node request %s" % (request,))
+ if request_id != request.id:
+ self.log.info("Skipping node accept for %s (resubmitted as %s)",
+ request_id, request.id)
+ return
+
+ # Make sure the request still exists. It's possible it could have
+ # disappeared if we lost the ZK session between when the fulfillment
+ # response was added to our queue, and when we actually get around to
+ # processing it. Nodepool will automatically reallocate the assigned
+ # nodes in that situation.
+ if not self.sched.zk.nodeRequestExists(request):
+ self.log.info("Request %s no longer exists", request.id)
+ return
+
if request.canceled:
self.log.info("Ignoring canceled node request %s" % (request,))
# The request was already deleted when it was canceled
@@ -166,7 +223,7 @@
if request.fulfilled:
# If the request suceeded, try to lock the nodes.
try:
- self.lockNodeSet(request.nodeset)
+ self.lockNodeSet(request.nodeset, request.id)
locked = True
except Exception:
self.log.exception("Error locking nodes:")
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 1a0a084..8f2e5dc 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -56,6 +56,14 @@
'count': count}
return not self.submitJob('zuul:autohold', data).failure
+ def autohold_list(self):
+ data = {}
+ job = self.submitJob('zuul:autohold_list', data)
+ if job.failure:
+ return False
+ else:
+ return json.loads(job.data[0])
+
def enqueue(self, tenant, pipeline, project, trigger, change):
data = {'tenant': tenant,
'pipeline': pipeline,
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 52a7e51..11d6684 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -50,6 +50,7 @@
def register(self):
self.worker.registerFunction("zuul:autohold")
+ self.worker.registerFunction("zuul:autohold_list")
self.worker.registerFunction("zuul:enqueue")
self.worker.registerFunction("zuul:enqueue_ref")
self.worker.registerFunction("zuul:promote")
@@ -90,6 +91,17 @@
except Exception:
self.log.exception("Exception while getting job")
+ def handle_autohold_list(self, job):
+ req = {}
+
+ # The json.dumps() call cannot handle dict keys that are not strings
+ # so we convert our key to a CSV string that the caller can parse.
+ for key, value in self.sched.autohold_requests.items():
+ new_key = ','.join(key)
+ req[new_key] = value
+
+ job.sendWorkComplete(json.dumps(req))
+
def handle_autohold(self, job):
args = json.loads(job.arguments)
params = {}
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 5432661..33b6723 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -15,7 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import extras
import json
import logging
import os
@@ -31,6 +30,8 @@
from zuul import exceptions
from zuul import version as zuul_version
from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd
+import zuul.lib.queue
class ManagementEvent(object):
@@ -71,10 +72,28 @@
the path specified in the configuration.
:arg Tenant tenant: the tenant to reconfigure
+ :arg Project project: if supplied, clear the cached configuration
+ from this project first
"""
- def __init__(self, tenant):
+ def __init__(self, tenant, project):
super(TenantReconfigureEvent, self).__init__()
- self.tenant = tenant
+ self.tenant_name = tenant.name
+ self.projects = set([project])
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __eq__(self, other):
+ if not isinstance(other, TenantReconfigureEvent):
+ return False
+ # We don't check projects because they will get combined when
+ # merged.
+ return (self.tenant_name == other.tenant_name)
+
+ def merge(self, other):
+ if self.tenant_name != other.tenant_name:
+ raise Exception("Can not merge events from different tenants")
+ self.projects |= other.projects
class PromoteEvent(ManagementEvent):
@@ -161,6 +180,7 @@
def __init__(self, request):
self.request = request
+ self.request_id = request.id
def toList(item):
@@ -207,7 +227,7 @@
self.executor = None
self.merger = None
self.connections = None
- self.statsd = extras.try_import('statsd.statsd')
+ self.statsd = get_statsd(config)
# TODO(jeblair): fix this
# Despite triggers being part of the pipeline, there is one trigger set
# per scheduler. The pipeline handles the trigger filters but since
@@ -219,7 +239,7 @@
self.trigger_event_queue = queue.Queue()
self.result_event_queue = queue.Queue()
- self.management_event_queue = queue.Queue()
+ self.management_event_queue = zuul.lib.queue.MergedQueue()
self.abide = model.Abide()
if not testonly:
@@ -259,22 +279,16 @@
self.zk = zk
def addEvent(self, event):
- self.log.debug("Adding trigger event: %s" % event)
self.trigger_event_queue.put(event)
self.wake_event.set()
- self.log.debug("Done adding trigger event: %s" % event)
def onBuildStarted(self, build):
- self.log.debug("Adding start event for build: %s" % build)
build.start_time = time.time()
event = BuildStartedEvent(build)
self.result_event_queue.put(event)
self.wake_event.set()
- self.log.debug("Done adding start event for build: %s" % build)
def onBuildCompleted(self, build, result, result_data):
- self.log.debug("Adding complete event for build: %s result: %s" % (
- build, result))
build.end_time = time.time()
build.result_data = result_data
# Note, as soon as the result is set, other threads may act
@@ -284,61 +298,60 @@
build.result = result
try:
if self.statsd and build.pipeline:
- jobname = build.job.name.replace('.', '_')
- key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
+ tenant = build.pipeline.layout.tenant
+ jobname = build.job.name.replace('.', '_').replace('/', '_')
+ hostname = (build.build_set.item.change.project.
+ canonical_hostname.replace('.', '_'))
+ projectname = (build.build_set.item.change.project.name.
+ replace('.', '_').replace('/', '_'))
+ branchname = (build.build_set.item.change.branch.
+ replace('.', '_').replace('/', '_'))
+ basekey = 'zuul.tenant.%s' % tenant.name
+ pipekey = '%s.pipeline.%s' % (basekey, build.pipeline.name)
+ # zuul.tenant.<tenant>.pipeline.<pipeline>.all_jobs
+ key = '%s.all_jobs' % pipekey
self.statsd.incr(key)
- for label in build.node_labels:
- # Jenkins includes the node name in its list of labels, so
- # we filter it out here, since that is not statistically
- # interesting.
- if label == build.node_name:
- continue
- dt = int((build.start_time - build.execute_time) * 1000)
- key = 'zuul.pipeline.%s.label.%s.wait_time' % (
- build.pipeline.name, label)
- self.statsd.timing(key, dt)
- key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
- jobname, build.result)
+ jobkey = '%s.project.%s.%s.%s.job.%s' % (
+ pipekey, hostname, projectname, branchname, jobname)
+ # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+ # <host>.<project>.<branch>.job.<job>.<result>
+ key = '%s.%s' % (jobkey, build.result)
if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
dt = int((build.end_time - build.start_time) * 1000)
self.statsd.timing(key, dt)
self.statsd.incr(key)
-
- key = 'zuul.pipeline.%s.job.%s.wait_time' % (
- build.pipeline.name, jobname)
+ # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+ # <host>.<project>.<branch>.job.<job>.wait_time
+ key = '%s.wait_time' % jobkey
dt = int((build.start_time - build.execute_time) * 1000)
self.statsd.timing(key, dt)
- except:
+ except Exception:
self.log.exception("Exception reporting runtime stats")
event = BuildCompletedEvent(build)
self.result_event_queue.put(event)
self.wake_event.set()
- self.log.debug("Done adding complete event for build: %s" % build)
def onMergeCompleted(self, build_set, merged, updated,
commit, files, repo_state):
- self.log.debug("Adding merge complete event for build set: %s" %
- build_set)
event = MergeCompletedEvent(build_set, merged,
updated, commit, files, repo_state)
self.result_event_queue.put(event)
self.wake_event.set()
def onNodesProvisioned(self, req):
- self.log.debug("Adding nodes provisioned event for build set: %s" %
- req.build_set)
event = NodesProvisionedEvent(req)
self.result_event_queue.put(event)
self.wake_event.set()
- def reconfigureTenant(self, tenant):
- self.log.debug("Prepare to reconfigure")
- event = TenantReconfigureEvent(tenant)
+ def reconfigureTenant(self, tenant, project):
+ self.log.debug("Submitting tenant reconfiguration event for "
+ "%s due to project %s", tenant.name, project)
+ event = TenantReconfigureEvent(tenant, project)
self.management_event_queue.put(event)
self.wake_event.set()
def reconfigure(self, config):
- self.log.debug("Prepare to reconfigure")
+ self.log.debug("Submitting reconfiguration event")
event = ReconfigureEvent(config)
self.management_event_queue.put(event)
self.wake_event.set()
@@ -457,7 +470,7 @@
self.layout_lock.acquire()
self.config = event.config
try:
- self.log.debug("Performing reconfiguration")
+ self.log.debug("Full reconfiguration beginning")
loader = configloader.ConfigLoader()
abide = loader.loadConfig(
self.config.get('scheduler', 'tenant_config'),
@@ -468,24 +481,31 @@
self.abide = abide
finally:
self.layout_lock.release()
+ self.log.debug("Full reconfiguration complete")
def _doTenantReconfigureEvent(self, event):
# This is called in the scheduler loop after another thread submits
# a request
self.layout_lock.acquire()
try:
- self.log.debug("Performing tenant reconfiguration")
+ self.log.debug("Tenant reconfiguration beginning")
+ # If a change landed to a project, clear out the cached
+ # config before reconfiguring.
+ for project in event.projects:
+ project.unparsed_config = None
+ old_tenant = self.abide.tenants[event.tenant_name]
loader = configloader.ConfigLoader()
abide = loader.reloadTenant(
self.config.get('scheduler', 'tenant_config'),
self._get_project_key_dir(),
self, self.merger, self.connections,
- self.abide, event.tenant)
- tenant = abide.tenants[event.tenant.name]
+ self.abide, old_tenant)
+ tenant = abide.tenants[event.tenant_name]
self._reconfigureTenant(tenant)
self.abide = abide
finally:
self.layout_lock.release()
+ self.log.debug("Tenant reconfiguration complete")
def _reenqueueGetProject(self, tenant, item):
project = item.change.project
@@ -769,8 +789,7 @@
# or a branch was just created or deleted. Clear
# out cached data for this project and perform a
# reconfiguration.
- change.project.unparsed_config = None
- self.reconfigureTenant(tenant)
+ self.reconfigureTenant(tenant, change.project)
for pipeline in tenant.layout.pipelines.values():
if event.isPatchsetCreated():
pipeline.manager.removeOldVersionsOfChange(change)
@@ -853,8 +872,12 @@
try:
self.nodepool.holdNodeSet(nodeset, autohold_key)
except Exception:
- self.log.exception("Unable to process autohold for %s",
+ self.log.exception("Unable to process autohold for %s:",
autohold_key)
+ if autohold_key in self.autohold_requests:
+ self.log.debug("Removing autohold %s due to exception",
+ autohold_key)
+ del self.autohold_requests[autohold_key]
self.nodepool.returnNodeSet(nodeset)
except Exception:
@@ -891,9 +914,10 @@
def _doNodesProvisionedEvent(self, event):
request = event.request
+ request_id = event.request_id
build_set = request.build_set
- self.nodepool.acceptNodes(request)
+ self.nodepool.acceptNodes(request, request_id)
if request.canceled:
return
@@ -932,6 +956,9 @@
data['result_event_queue'] = {}
data['result_event_queue']['length'] = \
self.result_event_queue.qsize()
+ data['management_event_queue'] = {}
+ data['management_event_queue']['length'] = \
+ self.management_event_queue.qsize()
if self.last_reconfigured:
data['last_reconfigured'] = self.last_reconfigured * 1000
diff --git a/zuul/zk.py b/zuul/zk.py
index 5ea4e56..ede78be 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -90,7 +90,7 @@
def resetLostFlag(self):
self._became_lost = False
- def connect(self, hosts, read_only=False):
+ def connect(self, hosts, read_only=False, timeout=10.0):
'''
Establish a connection with ZooKeeper cluster.
@@ -100,10 +100,12 @@
:param str hosts: Comma-separated list of hosts to connect to (e.g.
127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
:param bool read_only: If True, establishes a read-only connection.
-
+ :param float timeout: The ZooKeeper session timeout, in
+ seconds (default: 10.0).
'''
if self.client is None:
- self.client = KazooClient(hosts=hosts, read_only=read_only)
+ self.client = KazooClient(hosts=hosts, read_only=read_only,
+ timeout=timeout)
self.client.add_listener(self._connection_listener)
self.client.start()
@@ -145,12 +147,10 @@
from ZooKeeper). The watcher should return False when
further updates are no longer necessary.
'''
- priority = 100 # TODO(jeblair): integrate into nodereq
-
data = node_request.toDict()
data['created_time'] = time.time()
- path = '%s/%s-' % (self.REQUEST_ROOT, priority)
+ path = '%s/%s-' % (self.REQUEST_ROOT, node_request.priority)
path = self.client.create(path, self._dictToStr(data),
makepath=True,
sequence=True, ephemeral=True)
@@ -160,7 +160,6 @@
def callback(data, stat):
if data:
data = self._strToDict(data)
- node_request.updateFromDict(data)
request_nodes = list(node_request.nodeset.getNodes())
for i, nodeid in enumerate(data.get('nodes', [])):
node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
@@ -168,6 +167,7 @@
node_data = self._strToDict(node_data)
request_nodes[i].id = nodeid
request_nodes[i].updateFromDict(node_data)
+ node_request.updateFromDict(data)
deleted = (data is None) # data *are* none
return watcher(node_request, deleted)
@@ -187,6 +187,19 @@
except kze.NoNodeError:
pass
+ def nodeRequestExists(self, node_request):
+ '''
+ See if a NodeRequest exists in ZooKeeper.
+
+ :param NodeRequest node_request: A NodeRequest to verify.
+
+ :returns: True if the request exists, False otherwise.
+ '''
+ path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+ if self.client.exists(path):
+ return True
+ return False
+
def storeNode(self, node):
'''Store the node.
@@ -256,6 +269,9 @@
for nodeid in nodes:
node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
node_data, node_stat = self.client.get(node_path)
+ if not node_data:
+ self.log.warning("Node ID %s has no data", nodeid)
+ continue
node_data = self._strToDict(node_data)
if (node_data['state'] == zuul.model.STATE_HOLD and
node_data.get('hold_job') == identifier):