Merge "Tidy up loggers"
diff --git a/.gitignore b/.gitignore
index 9703f16..e76a1bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
+*.sw?
*.egg
*.egg-info
*.pyc
+.idea
.test
.testrepository
.tox
diff --git a/.testr.conf b/.testr.conf
index 5433c07..8ef6689 100644
--- a/.testr.conf
+++ b/.testr.conf
@@ -1,4 +1,4 @@
[DEFAULT]
-test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
+test_command=OS_LOG_LEVEL=${OS_LOG_LEVEL:-INFO} OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list
diff --git a/README.rst b/README.rst
index ff4d938..90e00a5 100644
--- a/README.rst
+++ b/README.rst
@@ -1,20 +1,26 @@
Zuul
====
-Zuul is a trunk gating system developed for the OpenStack Project.
+Zuul is a project gating system developed for the OpenStack Project.
Contributing
------------
+We are currently engaged in a significant development effort in
+preparation for the third major version of Zuul. We call this effort
+`Zuul v3`_ and it is described in this file in the `feature/zuulv3`
+branch of this repo.
+
To browse the latest code, see: https://git.openstack.org/cgit/openstack-infra/zuul/tree/
To clone the latest code, use `git clone git://git.openstack.org/openstack-infra/zuul`
Bugs are handled at: https://storyboard.openstack.org/#!/project/679
-Code reviews are, as you might expect, handled by gerrit. The gerrit they
-use is http://review.openstack.org
+Code reviews are, as you might expect, handled by gerrit at
+https://review.openstack.org
-Use `git review` to submit patches (after creating a gerrit account that links to your launchpad account). Example::
+Use `git review` to submit patches (after creating a Gerrit account
+that links to your launchpad account). Example::
# Do your commits
$ git review
diff --git a/bindep.txt b/bindep.txt
new file mode 100644
index 0000000..a2cc02e
--- /dev/null
+++ b/bindep.txt
@@ -0,0 +1 @@
+libjpeg-dev [test]
diff --git a/doc/source/connections.rst b/doc/source/connections.rst
index f0820a6..835247b 100644
--- a/doc/source/connections.rst
+++ b/doc/source/connections.rst
@@ -38,6 +38,9 @@
Path to SSH key to use when logging into above server.
``sshkey=/home/zuul/.ssh/id_rsa``
+**keepalive**
+ Optional: Keepalive timeout, 0 means no keepalive.
+ ``keepalive=60``
Gerrit Configuration
~~~~~~~~~~~~~~~~~~~~
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 61f9e4f..3c793da 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -13,6 +13,7 @@
.. toctree::
:maxdepth: 2
+ quick-start
gating
connections
triggers
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index 0a1e0e7..f368cb9 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -6,7 +6,7 @@
https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin
.. _`Turbo-Hipster`:
- http://git.openstack.org/cgit/stackforge/turbo-hipster/
+ https://git.openstack.org/cgit/openstack/turbo-hipster/
.. _`Turbo-Hipster Documentation`:
http://turbo-hipster.rtfd.org/
@@ -239,7 +239,7 @@
instead. As an example, the OpenStack project uses the following
script to prepare the workspace for its integration testing:
- https://github.com/openstack-infra/devstack-gate/blob/master/devstack-vm-gate-wrap.sh
+ https://git.openstack.org/cgit/openstack-infra/devstack-gate/tree/devstack-vm-gate-wrap.sh
Turbo Hipster Worker
~~~~~~~~~~~~~~~~~~~~
diff --git a/doc/source/merger.rst b/doc/source/merger.rst
index e01bc8c..82e204b 100644
--- a/doc/source/merger.rst
+++ b/doc/source/merger.rst
@@ -58,3 +58,17 @@
depending on what the state of Zuul's repository is when the clone
happens). They are, however, suitable for automated systems that
respond to Zuul triggers.
+
+Clearing old references
+~~~~~~~~~~~~~~~~~~~~~~~
+
+The references created under refs/zuul are not garbage collected. Since
+git fetch sends them all to Gerrit to sync the repositories, the time
+spent on merge will slightly grow over time and start being noticeable.
+
+To clean them you can use the ``tools/zuul-clear-refs.py`` script on
+each repository. It will delete Zuul references that point to commits
+for which the commit date is older than a given amount of days (default
+360)::
+
+ ./tools/zuul-clear-refs.py /path/to/zuul/git/repo
diff --git a/doc/source/quick-start.rst b/doc/source/quick-start.rst
new file mode 100644
index 0000000..82779c6
--- /dev/null
+++ b/doc/source/quick-start.rst
@@ -0,0 +1,162 @@
+Quick Start Guide
+=================
+
+System Requirements
+-------------------
+
+For most deployments zuul only needs 1-2GB. OpenStack uses a 30GB setup.
+
+Install Zuul
+------------
+
+You can get zuul from pypi via::
+
+ pip install zuul
+
+Zuul Components
+---------------
+
+Zuul provides the following components:
+
+ - **zuul-server**: scheduler daemon which communicates with Gerrit and
+ Gearman. Handles receiving events, launching jobs, collecting results
+    and posting reports.
+ - **zuul-merger**: speculative-merger which communicates with Gearman.
+ Prepares Git repositories for jobs to test against. This additionally
+ requires a web server hosting the Git repositories which can be cloned
+ by the jobs.
+ - **zuul-cloner**: client side script used to setup job workspace. It is
+ used to clone the repositories prepared by the zuul-merger described
+ previously.
+ - **gearmand**: optional builtin gearman daemon provided by zuul-server
+
+External components:
+
+ - Jenkins Gearman plugin: Used by Jenkins to connect to Gearman
+
+Zuul Communication
+------------------
+
+All the Zuul components communicate with each other using Gearman. As well as
+the following communication channels:
+
+zuul-server:
+
+ - Gerrit
+ - Gearman Daemon
+
+zuul-merger:
+
+ - Gerrit
+ - Gearman Daemon
+
+zuul-cloner:
+
+ - http hosted zuul-merger git repos
+
+Jenkins:
+
+ - Gearman Daemon via Jenkins Gearman Plugin
+
+Zuul Setup
+----------
+
+At minimum we need to provide **zuul.conf** and **layout.yaml**, placed
+in the /etc/zuul/ directory. You will also need a zuul user and ssh key for the
+zuul user in Gerrit. The following example uses the builtin gearmand service
+in zuul.
+
+**zuul.conf**::
+
+ [zuul]
+ layout_config=/etc/zuul/layout.yaml
+
+ [merger]
+ git_dir=/git
+ zuul_url=http://zuul.example.com/p
+
+ [gearman_server]
+ start=true
+
+ [gearman]
+ server=127.0.0.1
+
+ [connection gerrit]
+ driver=gerrit
+ server=git.example.com
+ port=29418
+ baseurl=https://git.example.com/gerrit/
+ user=zuul
+ sshkey=/home/zuul/.ssh/id_rsa
+
+See :doc:`zuul` for more details.
+
+The following sets up a basic timer triggered job using zuul.
+
+**layout.yaml**::
+
+ pipelines:
+ - name: periodic
+ source: gerrit
+ manager: IndependentPipelineManager
+ trigger:
+ timer:
+ - time: '0 * * * *'
+
+ projects:
+ - name: aproject
+ periodic:
+ - aproject-periodic-build
+
+Starting Zuul
+-------------
+
+You can run zuul-server with the **-d** option to make it not daemonize. It's
+a good idea at first to confirm there are no issues with your configuration.
+
+Simply run::
+
+ zuul-server
+
+Once run you should have 2 zuul-server processes::
+
+ zuul 12102 1 0 Jan21 ? 00:15:45 /home/zuul/zuulvenv/bin/python /home/zuul/zuulvenv/bin/zuul-server -d
+ zuul 12107 12102 0 Jan21 ? 00:00:01 /home/zuul/zuulvenv/bin/python /home/zuul/zuulvenv/bin/zuul-server -d
+
+Note: In this example zuul was installed in a virtualenv.
+
+The 2nd zuul-server process is gearmand, which runs if you are using the
+builtin gearmand server; otherwise there will only be 1 process.
+
+Zuul won't actually process your job queue, however, unless you also have a
+zuul-merger process running.
+
+Simply run::
+
+ zuul-merger
+
+Zuul should now be able to process your periodic job as configured above once
+the Jenkins side of things is configured.
+
+Jenkins Setup
+-------------
+
+Install the Jenkins Gearman Plugin via Jenkins Plugin management interface.
+Then navigate to **Manage > Configuration > Gearman** and set up the Jenkins
+server hostname/ip and port to connect to gearman.
+
+At this point gearman should be running your Jenkins jobs.
+
+Troubleshooting
+---------------
+
+Checking Gearman function registration (jobs). You can use telnet to connect
+to gearman to check that Jenkins is registering your configured jobs in
+gearman::
+
+ telnet <gearman_ip> 4730
+
+Useful commands are **workers** and **status** which you can run by just
+typing those commands once connected to gearman. Every job in your Jenkins
+master must appear when you run **workers** for Zuul to be able to run jobs
+against your Jenkins instance.
diff --git a/doc/source/statsd.rst b/doc/source/statsd.rst
index f789d61..b3bf99f 100644
--- a/doc/source/statsd.rst
+++ b/doc/source/statsd.rst
@@ -31,7 +31,7 @@
The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`):
-**gerrit.events.<type> (counters)**
+**gerrit.event.<type> (counters)**
Gerrit emits different kind of message over its `stream-events` interface. As
a convenience, Zuul emits metrics to statsd which save you from having to use
a different daemon to measure Gerrit events.
@@ -52,6 +52,18 @@
Refer to your Gerrit installation documentation for an exhaustive list of
Gerrit event types.
+**zuul.node_type.**
+  Holds metrics specific to build nodes per label. The hierarchy is:
+
+ #. **<build node label>** each of the labels associated to a build in
+ Jenkins. It contains:
+
+ #. **job.<jobname>** subtree detailing per job statistics:
+
+ #. **wait_time** counter and timer of the wait time, with the
+ difference of the job start time and the launch time, in
+ milliseconds.
+
**zuul.pipeline.**
Holds metrics specific to jobs. The hierarchy is:
@@ -75,10 +87,13 @@
known by Zuul (which includes build time and Zuul overhead).
#. **total_changes** counter of the number of change proceeding since
Zuul started.
+ #. **wait_time** counter and timer of the wait time, with the difference
+ of the job start time and the launch time, in milliseconds.
Additionally, the `zuul.pipeline.<pipeline name>` hierarchy contains
- `current_changes` and `resident_time` metrics for each projects. The slash
- separator used in Gerrit name being replaced by dots.
+ `current_changes` (gauge), `resident_time` (timing) and `total_changes`
+  (counter) metrics for each project. The slash separator used in Gerrit
+  names is replaced by dots.
As an example, given a job named `myjob` triggered by the `gate` pipeline
which took 40 seconds to build, the Zuul scheduler will emit the following
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 74ce360..e8279d9 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -10,11 +10,11 @@
**zuul.conf**
Connection information for Gerrit and Gearman, locations of the
- other config files.
+ other config files. (required)
**layout.yaml**
- Project and pipeline configuration -- what Zuul does.
+ Project and pipeline configuration -- what Zuul does. (required)
**logging.conf**
- Python logging config.
+ Python logging config. (optional)
Examples of each of the three files can be found in the etc/ directory
of the source distribution.
@@ -41,17 +41,28 @@
gearman
"""""""
+Client connection information for gearman. If using Zuul's builtin gearmand
+server just set **server** to 127.0.0.1.
+
**server**
Hostname or IP address of the Gearman server.
- ``server=gearman.example.com``
+ ``server=gearman.example.com`` (required)
**port**
Port on which the Gearman server is listening.
- ``port=4730``
+ ``port=4730`` (optional)
+
+**check_job_registration**
+ Check to see if job is registered with Gearman or not. When True
+  a build result of NOT_REGISTERED will be returned if the job is not found.
+ ``check_job_registration=True``
gearman_server
""""""""""""""
+The builtin gearman server. Zuul can fork a gearman process from itself rather
+than connecting to an external one.
+
**start**
Whether to start the internal Gearman server (default: False).
``start=true``
@@ -64,9 +75,25 @@
Path to log config file for internal Gearman server.
``log_config=/etc/zuul/gearman-logging.yaml``
+webapp
+""""""
+
+**listen_address**
+ IP address or domain name on which to listen (default: 0.0.0.0).
+ ``listen_address=127.0.0.1``
+
+**port**
+ Port on which the webapp is listening (default: 8001).
+ ``port=8008``
+
zuul
""""
+Zuul's main configuration section. At minimum zuul must be able to find
+layout.yaml to be useful.
+
+.. note:: Must be provided when running zuul-server
+
.. _layout_config:
**layout_config**
@@ -118,6 +145,13 @@
merger
""""""
+The zuul-merger process configuration. Detailed documentation on this process
+can be found on the :doc:`merger` page.
+
+.. note:: Must be provided when running zuul-merger. Both services may share the
+ same configuration (and even host) or otherwise have an individual
+ zuul.conf.
+
**git_dir**
Directory that Zuul should clone local git repositories to.
``git_dir=/var/lib/zuul/git``
@@ -394,11 +428,12 @@
approval matching all specified requirements.
*username*
- If present, an approval from this username is required.
+ If present, an approval from this username is required. It is
+ treated as a regular expression.
*email*
If present, an approval with this email address is required. It
- is treated as a regular expression as above.
+ is treated as a regular expression.
*email-filter* (deprecated)
A deprecated alternate spelling of *email*. Only one of *email* or
@@ -568,8 +603,8 @@
my_gerrit:
verified: 1
failure:
- gerrit:
- my_gerrit: -1
+ my_gerrit:
+ verified: -1
This will trigger jobs each time a new patchset (or change) is
uploaded to Gerrit, and report +/-1 values to Gerrit in the
@@ -704,6 +739,11 @@
would largely defeat the parallelization of dependent change testing
that is the main feature of Zuul. Default: ``false``.
+**mutex (optional)**
+ This is a string that names a mutex that should be observed by this
+ job. Only one build of any job that references the same named mutex
+ will be enqueued at a time. This applies across all pipelines.
+
**branch (optional)**
This job should only be run on matching branches. This field is
treated as a regular expression and multiple branches may be
@@ -754,12 +794,26 @@
expressions.
The pattern for '/COMMIT_MSG' is always matched on and does not
- have to be included.
+   have to be included. An exception is merge commits (without modified
+   files); in this case '/COMMIT_MSG' is not matched, and the job is not
+   skipped. For merge commits it is assumed that the list of modified
+   files is not predictable and CI should be run.
**voting (optional)**
Boolean value (``true`` or ``false``) that indicates whatever
a job is voting or not. Default: ``true``.
+**attempts (optional)**
+ Number of attempts zuul will launch a job. Once reached, zuul will report
+ RETRY_LIMIT as the job result.
+ Defaults to 3.
+
+**tags (optional)**
+ A list of arbitrary strings which will be associated with the job.
+ Can be used by the parameter-function to alter behavior based on
+ their presence on a job. If the job name is a regular expression,
+ tags will accumulate on jobs that match.
+
**parameter-function (optional)**
Specifies a function that should be applied to the parameters before
the job is launched. The function should be defined in a python file
@@ -986,9 +1040,8 @@
If you send signal 1 (SIGHUP) to the zuul-server process, Zuul will
stop executing new jobs, wait until all executing jobs are finished,
-reload its configuration, and resume. Any values in any of the
-configuration files may be changed, except the location of Zuul's PID
-file (a change to that will be ignored until Zuul is restarted).
+reload its layout.yaml, and resume. Changes to any connections or
+the PID file will be ignored until Zuul is restarted.
If you send a SIGUSR1 to the zuul-server process, Zuul will stop
executing new jobs, wait until all executing jobs are finished,
diff --git a/etc/status/public_html/jquery.zuul.js b/etc/status/public_html/jquery.zuul.js
index c63700a..9df44ce 100644
--- a/etc/status/public_html/jquery.zuul.js
+++ b/etc/status/public_html/jquery.zuul.js
@@ -490,10 +490,12 @@
$header_div.append($heading);
if (typeof pipeline.description === 'string') {
+ var descr = $('<small />')
+ $.each( pipeline.description.split(/\r?\n\r?\n/), function(index, descr_part){
+ descr.append($('<p />').text(descr_part));
+ });
$header_div.append(
- $('<p />').append(
- $('<small />').text(pipeline.description)
- )
+ $('<p />').append(descr)
);
}
return $header_div;
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index 21c1317..a8eab3e 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -26,12 +26,17 @@
region_name=EXP
logserver_prefix=http://logs.example.org/server.app/
+[webapp]
+listen_address=0.0.0.0
+port=8001
+
[connection gerrit]
driver=gerrit
server=review.example.com
;baseurl=https://review.example.com/r
user=jenkins
sshkey=/home/jenkins/.ssh/id_rsa
+;keepalive=60
[connection smtp]
driver=smtp
diff --git a/requirements.txt b/requirements.txt
index 6318a59..77ac0a5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,9 @@
-pbr>=0.5.21,<1.0
+pbr>=1.1.0
-argparse
PyYAML>=3.1.0
Paste
-WebOb>=1.2.3,<1.3
-paramiko>=1.8.0
+WebOb>=1.2.3
+paramiko>=1.8.0,<2.0.0
GitPython>=0.3.3
ordereddict
python-daemon>=2.0.4,<2.1.0
@@ -12,7 +11,7 @@
statsd>=1.0.0,<3.0
voluptuous>=0.7
gear>=0.5.7,<1.0.0
-apscheduler>=2.1.1,<3.0
+apscheduler>=3.0
PrettyTable>=0.6,<0.8
babel>=1.0
six>=1.6.0
diff --git a/setup.cfg b/setup.cfg
index 620e1ac..7ddeb84 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -25,6 +25,7 @@
zuul-merger = zuul.cmd.merger:main
zuul = zuul.cmd.client:main
zuul-cloner = zuul.cmd.cloner:main
+ zuul-launcher = zuul.cmd.launcher:main
[build_sphinx]
source-dir = doc/source
diff --git a/test-requirements.txt b/test-requirements.txt
index 88223b0..aed9998 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -3,7 +3,6 @@
coverage>=3.6
sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3
sphinxcontrib-blockdiag>=1.1.0
-discover
fixtures>=0.3.14
python-keystoneclient>=0.4.2
python-subunit
diff --git a/tests/base.py b/tests/base.py
index f3bfa4e..2559eb4 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -22,22 +22,22 @@
import os
import pprint
from six.moves import queue as Queue
+from six.moves import urllib
import random
import re
import select
import shutil
+from six.moves import reload_module
import socket
import string
import subprocess
import swiftclient
import threading
import time
-import urllib2
import git
import gear
import fixtures
-import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError
@@ -262,6 +262,25 @@
"comment": "This is a comment"}
return event
+ def getRefUpdatedEvent(self):
+ path = os.path.join(self.upstream_root, self.project)
+ repo = git.Repo(path)
+ oldrev = repo.heads[self.branch].commit.hexsha
+
+ event = {
+ "type": "ref-updated",
+ "submitter": {
+ "name": "User Name",
+ },
+ "refUpdate": {
+ "oldRev": oldrev,
+ "newRev": self.patchsets[-1]['revision'],
+ "refName": self.branch,
+ "project": self.project,
+ }
+ }
+ return event
+
def addApproval(self, category, value, username='reviewer_john',
granted_on=None, message=''):
if not granted_on:
@@ -479,7 +498,7 @@
self.url = url
def read(self):
- res = urlparse.urlparse(self.url)
+ res = urllib.parse.urlparse(self.url)
path = res.path
project = '/'.join(path.split('/')[2:-2])
ret = '001e# service=git-upload-pack\n'
@@ -540,6 +559,7 @@
self.wait_condition = threading.Condition()
self.waiting = False
self.aborted = False
+ self.requeue = False
self.created = time.time()
self.description = ''
self.run_error = False
@@ -602,6 +622,8 @@
result = 'FAILURE'
if self.aborted:
result = 'ABORTED'
+ if self.requeue:
+ result = None
if self.run_error:
work_fail = True
@@ -620,6 +642,7 @@
BuildHistory(name=self.name, number=self.number,
result=result, changes=changes, node=self.node,
uuid=self.unique, description=self.description,
+ parameters=self.parameters,
pipeline=self.parameters['ZUUL_PIPELINE'])
)
@@ -856,11 +879,44 @@
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
os.environ.get('OS_LOG_CAPTURE') == '1'):
+ log_level = logging.DEBUG
+ if os.environ.get('OS_LOG_LEVEL') == 'DEBUG':
+ log_level = logging.DEBUG
+ elif os.environ.get('OS_LOG_LEVEL') == 'INFO':
+ log_level = logging.INFO
+ elif os.environ.get('OS_LOG_LEVEL') == 'WARNING':
+ log_level = logging.WARNING
+ elif os.environ.get('OS_LOG_LEVEL') == 'ERROR':
+ log_level = logging.ERROR
+ elif os.environ.get('OS_LOG_LEVEL') == 'CRITICAL':
+ log_level = logging.CRITICAL
self.useFixture(fixtures.FakeLogger(
- level=logging.DEBUG,
+ level=log_level,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
+ # NOTE(notmorgan): Extract logging overrides for specific libraries
+ # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+ # each. This is used to limit the output during test runs from
+ # libraries that zuul depends on such as gear.
+ log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+
+ if log_defaults_from_env:
+ for default in log_defaults_from_env.split(','):
+ try:
+ name, level_str = default.split('=', 1)
+ level = getattr(logging, level_str, logging.DEBUG)
+ self.useFixture(fixtures.FakeLogger(
+ name=name,
+ level=level,
+ format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s'))
+ except ValueError:
+ # NOTE(notmorgan): Invalid format of the log default,
+ # skip and don't try and apply a logger for the
+ # specified module
+ pass
+
class ZuulTestCase(BaseTestCase):
@@ -875,11 +931,13 @@
self.test_root = os.path.join(tmp_root, "zuul-test")
self.upstream_root = os.path.join(self.test_root, "upstream")
self.git_root = os.path.join(self.test_root, "git")
+ self.state_root = os.path.join(self.test_root, "lib")
if os.path.exists(self.test_root):
shutil.rmtree(self.test_root)
os.makedirs(self.test_root)
os.makedirs(self.upstream_root)
+ os.makedirs(self.state_root)
# Make per test copy of Configuration.
self.setup_config()
@@ -887,6 +945,7 @@
os.path.join(FIXTURE_DIR,
self.config.get('zuul', 'layout_config')))
self.config.set('merger', 'git_dir', self.git_root)
+ self.config.set('zuul', 'state_dir', self.state_root)
# For each project in config:
self.init_repo("org/project")
@@ -913,8 +972,8 @@
os.environ['STATSD_PORT'] = str(self.statsd.port)
self.statsd.start()
# the statsd client object is configured in the statsd module import
- reload(statsd)
- reload(zuul.scheduler)
+ reload_module(statsd)
+ reload_module(zuul.scheduler)
self.gearman_server = FakeGearmanServer()
@@ -943,12 +1002,12 @@
self.sched.registerConnections(self.connections)
def URLOpenerFactory(*args, **kw):
- if isinstance(args[0], urllib2.Request):
+ if isinstance(args[0], urllib.request.Request):
return old_urlopen(*args, **kw)
return FakeURLOpener(self.upstream_root, *args, **kw)
- old_urlopen = urllib2.urlopen
- urllib2.urlopen = URLOpenerFactory
+ old_urlopen = urllib.request.urlopen
+ urllib.request.urlopen = URLOpenerFactory
self.merge_server = zuul.merger.server.MergeServer(self.config,
self.connections)
@@ -962,7 +1021,8 @@
self.sched.setLauncher(self.launcher)
self.sched.setMerger(self.merge_client)
- self.webapp = zuul.webapp.WebApp(self.sched, port=0)
+ self.webapp = zuul.webapp.WebApp(
+ self.sched, port=0, listen_address='127.0.0.1')
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.start()
@@ -1131,6 +1191,17 @@
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
+ def create_commit(self, project):
+ path = os.path.join(self.upstream_root, project)
+ repo = git.Repo(path)
+ repo.head.reference = repo.heads['master']
+ file_name = os.path.join(path, 'README')
+ with open(file_name, 'a') as f:
+ f.write('creating fake commit\n')
+ repo.index.add([file_name])
+ commit = repo.index.commit('Creating a fake commit')
+ return commit.hexsha
+
def ref_has_change(self, ref, change):
path = os.path.join(self.git_root, change.project)
repo = git.Repo(path)
@@ -1292,9 +1363,11 @@
start = time.time()
while True:
if time.time() - start > 10:
- print 'queue status:',
- print ' '.join(self.eventQueuesEmpty())
- print self.areAllBuildsWaiting()
+ self.log.debug("Queue status:")
+ for queue in self.event_queues:
+ self.log.debug(" %s: %s" % (queue, queue.empty()))
+ self.log.debug("All builds waiting: %s" %
+ (self.areAllBuildsWaiting(),))
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
self.worker.lock.acquire()
@@ -1332,8 +1405,8 @@
for pipeline in self.sched.layout.pipelines.values():
for queue in pipeline.queues:
if len(queue.queue) != 0:
- print 'pipeline %s queue %s contents %s' % (
- pipeline.name, queue.name, queue.queue)
+ print('pipeline %s queue %s contents %s' % (
+ pipeline.name, queue.name, queue.queue))
self.assertEqual(len(queue.queue), 0,
"Pipelines queues should be empty")
diff --git a/tests/fixtures/layout-abort-attempts.yaml b/tests/fixtures/layout-abort-attempts.yaml
new file mode 100644
index 0000000..86d9d78
--- /dev/null
+++ b/tests/fixtures/layout-abort-attempts.yaml
@@ -0,0 +1,30 @@
+pipelines:
+ - name: check
+ manager: IndependentPipelineManager
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ verified: 1
+ failure:
+ gerrit:
+ verified: -1
+
+ - name: post
+ manager: IndependentPipelineManager
+ trigger:
+ gerrit:
+ - event: ref-updated
+ ref: ^(?!refs/).*$
+
+jobs:
+ - name: project-test1
+ attempts: 4
+
+projects:
+ - name: org/project
+ check:
+ - project-merge:
+ - project-test1
+ - project-test2
diff --git a/tests/fixtures/layout-cloner.yaml b/tests/fixtures/layout-cloner.yaml
index e840ed9..e8b5dde 100644
--- a/tests/fixtures/layout-cloner.yaml
+++ b/tests/fixtures/layout-cloner.yaml
@@ -1,4 +1,16 @@
pipelines:
+ - name: check
+ manager: IndependentPipelineManager
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ verified: 1
+ failure:
+ gerrit:
+ verified: -1
+
- name: gate
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
@@ -18,28 +30,54 @@
gerrit:
verified: -2
+ - name: post
+ manager: IndependentPipelineManager
+ trigger:
+ gerrit:
+ - event: ref-updated
+ ref: ^(?!refs/).*$
+
projects:
+ - name: org/project
+ check:
+ - integration
+ gate:
+ - integration
- name: org/project1
+ check:
+ - integration
gate:
- - integration
+ - integration
+ post:
+ - postjob
- name: org/project2
+ check:
+ - integration
gate:
- - integration
+ - integration
- name: org/project3
+ check:
+ - integration
gate:
- - integration
+ - integration
- name: org/project4
+ check:
+ - integration
gate:
- - integration
+ - integration
- name: org/project5
+ check:
+ - integration
gate:
- - integration
+ - integration
- name: org/project6
+ check:
+ - integration
gate:
- - integration
+ - integration
diff --git a/tests/fixtures/layout-mutex.yaml b/tests/fixtures/layout-mutex.yaml
new file mode 100644
index 0000000..fcd0529
--- /dev/null
+++ b/tests/fixtures/layout-mutex.yaml
@@ -0,0 +1,25 @@
+pipelines:
+ - name: check
+ manager: IndependentPipelineManager
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ verified: 1
+ failure:
+ gerrit:
+ verified: -1
+
+jobs:
+ - name: mutex-one
+ mutex: test-mutex
+ - name: mutex-two
+ mutex: test-mutex
+
+projects:
+ - name: org/project
+ check:
+ - project-test1
+ - mutex-one
+ - mutex-two
diff --git a/tests/fixtures/layout-requirement-username.yaml b/tests/fixtures/layout-requirement-username.yaml
index 7a549f0..f9e6477 100644
--- a/tests/fixtures/layout-requirement-username.yaml
+++ b/tests/fixtures/layout-requirement-username.yaml
@@ -3,7 +3,7 @@
manager: IndependentPipelineManager
require:
approval:
- - username: jenkins
+ - username: ^(jenkins|zuul)$
trigger:
gerrit:
- event: comment-added
diff --git a/tests/fixtures/layout-success-pattern.yaml b/tests/fixtures/layout-success-pattern.yaml
new file mode 100644
index 0000000..cea15f1
--- /dev/null
+++ b/tests/fixtures/layout-success-pattern.yaml
@@ -0,0 +1,21 @@
+pipelines:
+ - name: check
+ manager: IndependentPipelineManager
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ smtp:
+ to: me@example.org
+
+jobs:
+ - name: docs-draft-test
+ success-pattern: http://docs-draft.example.org/{build.parameters[LOG_PATH]}/publish-docs/
+ - name: docs-draft-test2
+ success-pattern: http://docs-draft.example.org/{NOPE}/{build.parameters[BAD]}/publish-docs/
+
+projects:
+ - name: org/docs
+ check:
+ - docs-draft-test:
+ - docs-draft-test2
diff --git a/tests/fixtures/layout-tags.yaml b/tests/fixtures/layout-tags.yaml
new file mode 100644
index 0000000..d5b8bf9
--- /dev/null
+++ b/tests/fixtures/layout-tags.yaml
@@ -0,0 +1,42 @@
+includes:
+ - python-file: tags_custom_functions.py
+
+pipelines:
+ - name: check
+ manager: IndependentPipelineManager
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ verified: 1
+ failure:
+ gerrit:
+ verified: -1
+
+jobs:
+ - name: ^.*$
+ parameter-function: apply_tags
+ - name: ^.*-merge$
+ failure-message: Unable to merge change
+ hold-following-changes: true
+ tags: merge
+ - name: project1-merge
+ tags:
+ - project1
+ - extratag
+
+projects:
+ - name: org/project1
+ check:
+ - project1-merge:
+ - project1-test1
+ - project1-test2
+ - project1-project2-integration
+
+ - name: org/project2
+ check:
+ - project2-merge:
+ - project2-test1
+ - project2-test2
+ - project1-project2-integration
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 1d23443..2e48ff1 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -107,6 +107,7 @@
- name: ^.*-merge$
failure-message: Unable to merge change
hold-following-changes: true
+ tags: merge
- name: nonvoting-project-test2
voting: false
- name: project-testfile
@@ -116,6 +117,14 @@
parameter-function: select_debian_node
- name: project1-project2-integration
queue-name: integration
+ - name: mutex-one
+ mutex: test-mutex
+ - name: mutex-two
+ mutex: test-mutex
+ - name: project1-merge
+ tags:
+ - project1
+ - extratag
project-templates:
- name: test-one-and-two
diff --git a/tests/fixtures/tags_custom_functions.py b/tests/fixtures/tags_custom_functions.py
new file mode 100644
index 0000000..67e7ef1
--- /dev/null
+++ b/tests/fixtures/tags_custom_functions.py
@@ -0,0 +1,2 @@
+def apply_tags(item, job, params):
+ params['BUILD_TAGS'] = ' '.join(sorted(job.tags))
diff --git a/tests/fixtures/zuul-connections-same-gerrit.conf b/tests/fixtures/zuul-connections-same-gerrit.conf
index af31c8a..b3b0e3f 100644
--- a/tests/fixtures/zuul-connections-same-gerrit.conf
+++ b/tests/fixtures/zuul-connections-same-gerrit.conf
@@ -26,13 +26,13 @@
driver=gerrit
server=review.example.com
user=jenkins
-sshkey=none
+sshkey=fake_id_rsa1
[connection alt_voting_gerrit]
driver=gerrit
server=review.example.com
user=civoter
-sshkey=none
+sshkey=fake_id_rsa2
[connection outgoing_smtp]
driver=smtp
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index b250c6d..0956cc4 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -26,7 +26,7 @@
driver=gerrit
server=review.example.com
user=jenkins
-sshkey=none
+sshkey=fake_id_rsa_path
[connection smtp]
driver=smtp
diff --git a/tests/test_change_matcher.py b/tests/test_change_matcher.py
index 1f4ab93..0585322 100644
--- a/tests/test_change_matcher.py
+++ b/tests/test_change_matcher.py
@@ -123,13 +123,13 @@
self._test_matches(False)
def test_matches_returns_false_when_not_all_files_match(self):
- self._test_matches(False, files=['docs/foo', 'foo/bar'])
+ self._test_matches(False, files=['/COMMIT_MSG', 'docs/foo', 'foo/bar'])
- def test_matches_returns_true_when_commit_message_matches(self):
- self._test_matches(True, files=['/COMMIT_MSG'])
+ def test_matches_returns_false_when_commit_message_matches(self):
+ self._test_matches(False, files=['/COMMIT_MSG'])
def test_matches_returns_true_when_all_files_match(self):
- self._test_matches(True, files=['docs/foo'])
+ self._test_matches(True, files=['/COMMIT_MSG', 'docs/foo'])
class TestMatchAll(BaseTestMatcher):
diff --git a/tests/test_cloner.py b/tests/test_cloner.py
index 137c157..896fcba 100644
--- a/tests/test_cloner.py
+++ b/tests/test_cloner.py
@@ -91,6 +91,7 @@
git_base_url=self.upstream_root,
projects=projects,
workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
zuul_branch=build.parameters['ZUUL_BRANCH'],
zuul_ref=build.parameters['ZUUL_REF'],
zuul_url=self.git_root,
@@ -107,11 +108,34 @@
'be correct' % (project, number))
work = self.getWorkspaceRepos(projects)
- upstream_repo_path = os.path.join(self.upstream_root, 'org/project1')
- self.assertEquals(
+ # project1 is the zuul_project so the origin should be set to the
+ # zuul_url since that is the most up to date.
+ cache_repo_path = os.path.join(cache_root, 'org/project1')
+ self.assertNotEqual(
work['org/project1'].remotes.origin.url,
+ cache_repo_path,
+ 'workspace repo origin should not be the cache'
+ )
+ zuul_url_repo_path = os.path.join(self.git_root, 'org/project1')
+ self.assertEqual(
+ work['org/project1'].remotes.origin.url,
+ zuul_url_repo_path,
+ 'workspace repo origin should be the zuul url'
+ )
+
+ # project2 is not the zuul_project so the origin should be set
+ # to upstream since that is the best we can do
+ cache_repo_path = os.path.join(cache_root, 'org/project2')
+ self.assertNotEqual(
+ work['org/project2'].remotes.origin.url,
+ cache_repo_path,
+ 'workspace repo origin should not be the cache'
+ )
+ upstream_repo_path = os.path.join(self.upstream_root, 'org/project2')
+ self.assertEqual(
+ work['org/project2'].remotes.origin.url,
upstream_repo_path,
- 'workspace repo origin should be upstream, not cache'
+ 'workspace repo origin should be the upstream url'
)
self.worker.hold_jobs_in_build = False
@@ -149,6 +173,7 @@
git_base_url=self.upstream_root,
projects=projects,
workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
zuul_branch=build.parameters['ZUUL_BRANCH'],
zuul_ref=build.parameters['ZUUL_REF'],
zuul_url=self.git_root,
@@ -219,6 +244,7 @@
git_base_url=self.upstream_root,
projects=projects,
workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
zuul_branch=build.parameters['ZUUL_BRANCH'],
zuul_ref=build.parameters['ZUUL_REF'],
zuul_url=self.git_root,
@@ -333,6 +359,7 @@
git_base_url=self.upstream_root,
projects=projects,
workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
zuul_branch=build.parameters['ZUUL_BRANCH'],
zuul_ref=build.parameters['ZUUL_REF'],
zuul_url=self.git_root,
@@ -395,6 +422,7 @@
git_base_url=self.upstream_root,
projects=projects,
workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
zuul_branch=build.parameters['ZUUL_BRANCH'],
zuul_ref=build.parameters['ZUUL_REF'],
zuul_url=self.git_root,
@@ -481,6 +509,7 @@
git_base_url=self.upstream_root,
projects=projects,
workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
zuul_branch=build.parameters['ZUUL_BRANCH'],
zuul_ref=build.parameters['ZUUL_REF'],
zuul_url=self.git_root,
@@ -546,6 +575,7 @@
git_base_url=self.upstream_root,
projects=projects,
workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
zuul_branch=build.parameters.get('ZUUL_BRANCH', None),
zuul_ref=build.parameters.get('ZUUL_REF', None),
zuul_url=self.git_root,
@@ -566,3 +596,159 @@
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
+
+ def test_periodic_update(self):
+ # Test that the merger correctly updates its local repository
+ # before running a periodic job.
+
+ # Prime the merger with the current state
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Merge a different change
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.setMerged()
+
+ # Start a periodic job
+ self.worker.hold_jobs_in_build = True
+ self.launcher.negative_function_cache_ttl = 0
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-timer.yaml')
+ self.sched.reconfigure(self.config)
+ self.registerJobs()
+
+ # The pipeline triggers every second, so we should have seen
+ # several by now.
+ time.sleep(5)
+ self.waitUntilSettled()
+
+ builds = self.builds[:]
+
+ self.worker.hold_jobs_in_build = False
+ # Stop queuing timer triggered jobs so that the assertions
+ # below don't race against more jobs being queued.
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-no-timer.yaml')
+ self.sched.reconfigure(self.config)
+ self.registerJobs()
+ self.worker.release()
+ self.waitUntilSettled()
+
+ projects = ['org/project']
+
+ self.assertEquals(2, len(builds), "Two builds are running")
+
+ upstream = self.getUpstreamRepos(projects)
+ self.assertEqual(upstream['org/project'].commit('master').hexsha,
+ B.patchsets[0]['revision'])
+ states = [
+ {'org/project':
+ str(upstream['org/project'].commit('master')),
+ },
+ {'org/project':
+ str(upstream['org/project'].commit('master')),
+ },
+ ]
+
+ for number, build in enumerate(builds):
+ self.log.debug("Build parameters: %s", build.parameters)
+ cloner = zuul.lib.cloner.Cloner(
+ git_base_url=self.upstream_root,
+ projects=projects,
+ workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
+ zuul_branch=build.parameters.get('ZUUL_BRANCH', None),
+ zuul_ref=build.parameters.get('ZUUL_REF', None),
+ zuul_url=self.git_root,
+ )
+ cloner.execute()
+ work = self.getWorkspaceRepos(projects)
+ state = states[number]
+
+ for project in projects:
+ self.assertEquals(state[project],
+ str(work[project].commit('HEAD')),
+ 'Project %s commit for build %s should '
+ 'be correct' % (project, number))
+
+ shutil.rmtree(self.workspace_root)
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ def test_post_checkout(self):
+ self.worker.hold_jobs_in_build = True
+ project = "org/project1"
+
+ A = self.fake_gerrit.addFakeChange(project, 'master', 'A')
+ event = A.getRefUpdatedEvent()
+ A.setMerged()
+ self.fake_gerrit.addEvent(event)
+ self.waitUntilSettled()
+
+ build = self.builds[0]
+ state = {'org/project1': build.parameters['ZUUL_COMMIT']}
+
+ build.release()
+ self.waitUntilSettled()
+
+ cloner = zuul.lib.cloner.Cloner(
+ git_base_url=self.upstream_root,
+ projects=[project],
+ workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
+ zuul_branch=build.parameters.get('ZUUL_BRANCH', None),
+ zuul_ref=build.parameters.get('ZUUL_REF', None),
+ zuul_newrev=build.parameters.get('ZUUL_NEWREV', None),
+ zuul_url=self.git_root,
+ )
+ cloner.execute()
+ work = self.getWorkspaceRepos([project])
+ self.assertEquals(state[project],
+ str(work[project].commit('HEAD')),
+ 'Project %s commit for build %s should '
+ 'be correct' % (project, 0))
+ shutil.rmtree(self.workspace_root)
+
+ def test_post_and_master_checkout(self):
+ self.worker.hold_jobs_in_build = True
+ projects = ["org/project1", "org/project2"]
+
+ A = self.fake_gerrit.addFakeChange(projects[0], 'master', 'A')
+ event = A.getRefUpdatedEvent()
+ A.setMerged()
+ self.fake_gerrit.addEvent(event)
+ self.waitUntilSettled()
+
+ build = self.builds[0]
+ upstream = self.getUpstreamRepos(projects)
+ state = {'org/project1':
+ build.parameters['ZUUL_COMMIT'],
+ 'org/project2':
+ str(upstream['org/project2'].commit('master')),
+ }
+
+ build.release()
+ self.waitUntilSettled()
+
+ cloner = zuul.lib.cloner.Cloner(
+ git_base_url=self.upstream_root,
+ projects=projects,
+ workspace=self.workspace_root,
+ zuul_project=build.parameters.get('ZUUL_PROJECT', None),
+ zuul_branch=build.parameters.get('ZUUL_BRANCH', None),
+ zuul_ref=build.parameters.get('ZUUL_REF', None),
+ zuul_newrev=build.parameters.get('ZUUL_NEWREV', None),
+ zuul_url=self.git_root,
+ )
+ cloner.execute()
+ work = self.getWorkspaceRepos(projects)
+
+ for project in projects:
+ self.assertEquals(state[project],
+ str(work[project].commit('HEAD')),
+ 'Project %s commit for build %s should '
+ 'be correct' % (project, 0))
+ shutil.rmtree(self.workspace_root)
diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py
index 3dc3234..46a8c7c 100644
--- a/tests/test_layoutvalidator.py
+++ b/tests/test_layoutvalidator.py
@@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-import ConfigParser
+from six.moves import configparser as ConfigParser
import os
import re
@@ -33,13 +33,13 @@
class TestLayoutValidator(testtools.TestCase):
def test_layouts(self):
"""Test layout file validation"""
- print
+ print()
errors = []
for fn in os.listdir(os.path.join(FIXTURE_DIR, 'layouts')):
m = LAYOUT_RE.match(fn)
if not m:
continue
- print fn
+ print(fn)
# Load any .conf file by the same name but .conf extension.
config_file = ("%s.conf" %
@@ -69,7 +69,7 @@
fn)
except voluptuous.Invalid as e:
error = str(e)
- print ' ', error
+ print(' ', error)
if error in errors:
raise Exception("Error has already been tested: %s" %
error)
diff --git a/tests/test_model.py b/tests/test_model.py
index 2711618..6ad0750 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -12,6 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+import os
+import random
+
+import fixtures
+
from zuul import change_matcher as cm
from zuul import model
@@ -31,12 +36,12 @@
def test_change_matches_returns_false_for_matched_skip_if(self):
change = model.Change('project')
- change.files = ['docs/foo']
+ change.files = ['/COMMIT_MSG', 'docs/foo']
self.assertFalse(self.job.changeMatches(change))
def test_change_matches_returns_true_for_unmatched_skip_if(self):
change = model.Change('project')
- change.files = ['foo']
+ change.files = ['/COMMIT_MSG', 'foo']
self.assertTrue(self.job.changeMatches(change))
def test_copy_retains_skip_if(self):
@@ -62,3 +67,76 @@
metajob = model.Job('^job')
job.copy(metajob)
self._assert_job_booleans_are_not_none(job)
+
+
+class TestJobTimeData(BaseTestCase):
+ def setUp(self):
+ super(TestJobTimeData, self).setUp()
+ self.tmp_root = self.useFixture(fixtures.TempDir(
+ rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+ ).path
+
+ def test_empty_timedata(self):
+ path = os.path.join(self.tmp_root, 'job-name')
+ self.assertFalse(os.path.exists(path))
+ self.assertFalse(os.path.exists(path + '.tmp'))
+ td = model.JobTimeData(path)
+ self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+
+ def test_save_reload(self):
+ path = os.path.join(self.tmp_root, 'job-name')
+ self.assertFalse(os.path.exists(path))
+ self.assertFalse(os.path.exists(path + '.tmp'))
+ td = model.JobTimeData(path)
+ self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ success_times = []
+ failure_times = []
+ results = []
+ for x in range(10):
+ success_times.append(int(random.random() * 1000))
+ failure_times.append(int(random.random() * 1000))
+ results.append(0)
+ results.append(1)
+ random.shuffle(results)
+ s = f = 0
+ for result in results:
+ if result:
+ td.add(failure_times[f], 'FAILURE')
+ f += 1
+ else:
+ td.add(success_times[s], 'SUCCESS')
+ s += 1
+ self.assertEqual(td.success_times, success_times)
+ self.assertEqual(td.failure_times, failure_times)
+ self.assertEqual(td.results, results[10:])
+ td.save()
+ self.assertTrue(os.path.exists(path))
+ self.assertFalse(os.path.exists(path + '.tmp'))
+ td = model.JobTimeData(path)
+ td.load()
+ self.assertEqual(td.success_times, success_times)
+ self.assertEqual(td.failure_times, failure_times)
+ self.assertEqual(td.results, results[10:])
+
+
+class TestTimeDataBase(BaseTestCase):
+ def setUp(self):
+ super(TestTimeDataBase, self).setUp()
+ self.tmp_root = self.useFixture(fixtures.TempDir(
+ rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+ ).path
+ self.db = model.TimeDataBase(self.tmp_root)
+
+ def test_timedatabase(self):
+ self.assertEqual(self.db.getEstimatedTime('job-name'), 0)
+ self.db.update('job-name', 50, 'SUCCESS')
+ self.assertEqual(self.db.getEstimatedTime('job-name'), 50)
+ self.db.update('job-name', 100, 'SUCCESS')
+ self.assertEqual(self.db.getEstimatedTime('job-name'), 75)
+ for x in range(10):
+ self.db.update('job-name', 100, 'SUCCESS')
+ self.assertEqual(self.db.getEstimatedTime('job-name'), 100)
diff --git a/tests/test_requirements.py b/tests/test_requirements.py
index 3ae56ad..81814bf 100644
--- a/tests/test_requirements.py
+++ b/tests/test_requirements.py
@@ -245,7 +245,7 @@
self.assertEqual(len(self.history), 1)
self.assertEqual(self.history[0].name, job)
- # A +2 should allow it to be enqueued
+ # A +2 from nobody should not cause it to be enqueued
B = self.fake_gerrit.addFakeChange(project, 'master', 'B')
# A comment event that we will keep submitting to trigger
comment = B.addApproval('CRVW', 2, username='nobody')
@@ -253,6 +253,7 @@
self.waitUntilSettled()
self.assertEqual(len(self.history), 1)
+ # A +2 from jenkins should allow it to be enqueued
B.addApproval('VRFY', 2, username='jenkins')
self.fake_gerrit.addEvent(comment)
self.waitUntilSettled()
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 4f52911..80097b8 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -20,11 +20,10 @@
import re
import shutil
import time
-import urllib
-import urllib2
import yaml
import git
+from six.moves import urllib
import testtools
import zuul.change_matcher
@@ -34,7 +33,6 @@
import zuul.reporter.smtp
from tests.base import (
- BaseTestCase,
ZuulTestCase,
repack_repo,
)
@@ -44,40 +42,6 @@
'%(levelname)-8s %(message)s')
-class TestSchedulerConfigParsing(BaseTestCase):
-
- def test_parse_skip_if(self):
- job_yaml = """
-jobs:
- - name: job_name
- skip-if:
- - project: ^project_name$
- branch: ^stable/icehouse$
- all-files-match-any:
- - ^filename$
- - project: ^project2_name$
- all-files-match-any:
- - ^filename2$
- """.strip()
- data = yaml.load(job_yaml)
- config_job = data.get('jobs')[0]
- sched = zuul.scheduler.Scheduler({})
- cm = zuul.change_matcher
- expected = cm.MatchAny([
- cm.MatchAll([
- cm.ProjectMatcher('^project_name$'),
- cm.BranchMatcher('^stable/icehouse$'),
- cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
- ]),
- cm.MatchAll([
- cm.ProjectMatcher('^project2_name$'),
- cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
- ]),
- ])
- matcher = sched._parseSkipIf(config_job)
- self.assertEqual(expected, matcher)
-
-
class TestScheduler(ZuulTestCase):
def test_jobs_launched(self):
@@ -495,6 +459,46 @@
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
+ def _test_time_database(self, iteration):
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ time.sleep(2)
+
+ data = json.loads(self.sched.formatStatusJSON())
+ found_job = None
+ for pipeline in data['pipelines']:
+ if pipeline['name'] != 'gate':
+ continue
+ for queue in pipeline['change_queues']:
+ for head in queue['heads']:
+ for item in head:
+ for job in item['jobs']:
+ if job['name'] == 'project-merge':
+ found_job = job
+ break
+
+ self.assertIsNotNone(found_job)
+ if iteration == 1:
+ self.assertIsNotNone(found_job['estimated_time'])
+ self.assertIsNone(found_job['remaining_time'])
+ else:
+ self.assertIsNotNone(found_job['estimated_time'])
+ self.assertTrue(found_job['estimated_time'] >= 2)
+ self.assertIsNotNone(found_job['remaining_time'])
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ def test_time_database(self):
+ "Test the time database"
+
+ self._test_time_database(1)
+ self._test_time_database(2)
+
def test_two_failed_changes_at_head(self):
"Test that changes are reparented correctly if 2 fail at head"
@@ -600,6 +604,36 @@
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
+ def test_parse_skip_if(self):
+ job_yaml = """
+jobs:
+ - name: job_name
+ skip-if:
+ - project: ^project_name$
+ branch: ^stable/icehouse$
+ all-files-match-any:
+ - ^filename$
+ - project: ^project2_name$
+ all-files-match-any:
+ - ^filename2$
+ """.strip()
+ data = yaml.load(job_yaml)
+ config_job = data.get('jobs')[0]
+ cm = zuul.change_matcher
+ expected = cm.MatchAny([
+ cm.MatchAll([
+ cm.ProjectMatcher('^project_name$'),
+ cm.BranchMatcher('^stable/icehouse$'),
+ cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
+ ]),
+ cm.MatchAll([
+ cm.ProjectMatcher('^project2_name$'),
+ cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
+ ]),
+ ])
+ matcher = self.sched._parseSkipIf(config_job)
+ self.assertEqual(expected, matcher)
+
def test_patch_order(self):
"Test that dependent patches are tested in the right order"
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -693,8 +727,8 @@
# triggering events. Since it will have the changes cached
# already (without approvals), we need to clear the cache
# first.
- source = self.sched.layout.pipelines['gate'].source
- source.maintainCache([])
+ for connection in self.connections.values():
+ connection.maintainCache([])
self.worker.hold_jobs_in_build = True
A.addApproval('APRV', 1)
@@ -791,7 +825,6 @@
A.addApproval('APRV', 1)
a = source._getChange(1, 2, refresh=True)
self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
- source.maintainCache([])
def test_build_configuration(self):
"Test that zuul merges the right commits for testing"
@@ -1450,7 +1483,7 @@
self.worker.build_history = []
path = os.path.join(self.git_root, "org/project")
- print repack_repo(path)
+ print(repack_repo(path))
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
@@ -1475,9 +1508,9 @@
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.addPatchset(large=True)
path = os.path.join(self.upstream_root, "org/project1")
- print repack_repo(path)
+ print(repack_repo(path))
path = os.path.join(self.git_root, "org/project1")
- print repack_repo(path)
+ print(repack_repo(path))
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
@@ -2236,15 +2269,18 @@
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
+ self.worker.release('project-merge')
+ self.waitUntilSettled()
+
port = self.webapp.server.socket.getsockname()[1]
- req = urllib2.Request("http://localhost:%s/status.json" % port)
- f = urllib2.urlopen(req)
+ req = urllib.request.Request("http://localhost:%s/status.json" % port)
+ f = urllib.request.urlopen(req)
headers = f.info()
self.assertIn('Content-Length', headers)
self.assertIn('Content-Type', headers)
- self.assertEqual(headers['Content-Type'],
- 'application/json; charset=UTF-8')
+ self.assertIsNotNone(re.match('^application/json(; charset=UTF-8)?$',
+ headers['Content-Type']))
self.assertIn('Access-Control-Allow-Origin', headers)
self.assertIn('Cache-Control', headers)
self.assertIn('Last-Modified', headers)
@@ -2256,7 +2292,7 @@
self.waitUntilSettled()
data = json.loads(data)
- status_jobs = set()
+ status_jobs = []
for p in data['pipelines']:
for q in p['change_queues']:
if p['name'] in ['gate', 'conflict']:
@@ -2268,10 +2304,24 @@
self.assertTrue(change['active'])
self.assertEqual(change['id'], '1,1')
for job in change['jobs']:
- status_jobs.add(job['name'])
- self.assertIn('project-merge', status_jobs)
- self.assertIn('project-test1', status_jobs)
- self.assertIn('project-test2', status_jobs)
+ status_jobs.append(job)
+ self.assertEqual('project-merge', status_jobs[0]['name'])
+ self.assertEqual('https://server/job/project-merge/0/',
+ status_jobs[0]['url'])
+ self.assertEqual('http://logs.example.com/1/1/gate/project-merge/0',
+ status_jobs[0]['report_url'])
+
+ self.assertEqual('project-test1', status_jobs[1]['name'])
+ self.assertEqual('https://server/job/project-test1/1/',
+ status_jobs[1]['url'])
+ self.assertEqual('http://logs.example.com/1/1/gate/project-test1/1',
+ status_jobs[1]['report_url'])
+
+ self.assertEqual('project-test2', status_jobs[2]['name'])
+ self.assertEqual('https://server/job/project-test2/2/',
+ status_jobs[2]['url'])
+ self.assertEqual('http://logs.example.com/1/1/gate/project-test2/2',
+ status_jobs[2]['report_url'])
def test_merging_queues(self):
"Test that transitively-connected change queues are merged"
@@ -2280,6 +2330,70 @@
self.sched.reconfigure(self.config)
self.assertEqual(len(self.sched.layout.pipelines['gate'].queues), 1)
+ def test_mutex(self):
+ "Test job mutexes"
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-mutex.yaml')
+ self.sched.reconfigure(self.config)
+
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
+
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(self.builds), 3)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'mutex-one')
+ self.assertEqual(self.builds[2].name, 'project-test1')
+
+ self.worker.release('mutex-one')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 3)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test1')
+ self.assertEqual(self.builds[2].name, 'mutex-two')
+ self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
+
+ self.worker.release('mutex-two')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 3)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test1')
+ self.assertEqual(self.builds[2].name, 'mutex-one')
+ self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
+
+ self.worker.release('mutex-one')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 3)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test1')
+ self.assertEqual(self.builds[2].name, 'mutex-two')
+ self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
+
+ self.worker.release('mutex-two')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 2)
+ self.assertEqual(self.builds[0].name, 'project-test1')
+ self.assertEqual(self.builds[1].name, 'project-test1')
+ self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+
+ self.waitUntilSettled()
+ self.assertEqual(len(self.builds), 0)
+
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+ self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
+
def test_node_label(self):
"Test that a job runs on a specific node label"
self.worker.registerFunction('build:node-project-test1:debian')
@@ -2545,6 +2659,53 @@
# Ensure the removed job was not included in the report.
self.assertNotIn('project1-project2-integration', A.messages[0])
+ def test_double_live_reconfiguration_shared_queue(self):
+ # This was a real-world regression. A change is added to
+ # gate; a reconfigure happens, a second change which depends
+ # on the first is added, and a second reconfiguration happens.
+ # Ensure that both changes merge.
+
+ # A failure may indicate incorrect caching or cleaning up of
+ # references during a reconfiguration.
+ self.worker.hold_jobs_in_build = True
+
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+ B.setDependsOn(A, 1)
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+
+ # Add the parent change.
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # Reconfigure (with only one change in the pipeline).
+ self.sched.reconfigure(self.config)
+ self.waitUntilSettled()
+
+ # Add the child change.
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # Reconfigure (with both in the pipeline).
+ self.sched.reconfigure(self.config)
+ self.waitUntilSettled()
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.history), 8)
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(B.reported, 2)
+
def test_live_reconfiguration_del_project(self):
# Test project deletion from layout
# while changes are enqueued
@@ -2683,6 +2844,25 @@
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
+ def test_tags(self):
+ "Test job tags"
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-tags.yaml')
+ self.sched.reconfigure(self.config)
+
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ results = {'project1-merge': 'extratag merge project1',
+ 'project2-merge': 'merge'}
+
+ for build in self.history:
+ self.assertEqual(results.get(build.name, ''),
+ build.parameters.get('BUILD_TAGS'))
+
def test_timer(self):
"Test that a periodic job is triggered"
self.worker.hold_jobs_in_build = True
@@ -2700,7 +2880,8 @@
port = self.webapp.server.socket.getsockname()[1]
- f = urllib.urlopen("http://localhost:%s/status.json" % port)
+ req = urllib.request.Request("http://localhost:%s/status.json" % port)
+ f = urllib.request.urlopen(req)
data = f.read()
self.worker.hold_jobs_in_build = False
@@ -2742,11 +2923,11 @@
'tests/fixtures/layout-idle.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
+ self.waitUntilSettled()
# The pipeline triggers every second, so we should have seen
# several by now.
time.sleep(5)
- self.waitUntilSettled()
# Stop queuing timer triggered jobs so that the assertions
# below don't race against more jobs being queued.
@@ -2754,6 +2935,7 @@
'tests/fixtures/layout-no-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
+ self.waitUntilSettled()
self.assertEqual(len(self.builds), 2)
self.worker.release('.*')
@@ -2838,6 +3020,49 @@
self.worker.release('.*')
self.waitUntilSettled()
+ def test_timer_sshkey(self):
+ "Test that a periodic job can setup SSH key authentication"
+ self.worker.hold_jobs_in_build = True
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-timer.yaml')
+ self.sched.reconfigure(self.config)
+ self.registerJobs()
+
+ # The pipeline triggers every second, so we should have seen
+ # several by now.
+ time.sleep(5)
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 2)
+
+ ssh_wrapper = os.path.join(self.git_root, ".ssh_wrapper_gerrit")
+ self.assertTrue(os.path.isfile(ssh_wrapper))
+ with open(ssh_wrapper) as f:
+ ssh_wrapper_content = f.read()
+ self.assertIn("fake_id_rsa", ssh_wrapper_content)
+ # In the unit tests Merger runs in the same process,
+        # so we see its environment variables
+ self.assertEqual(os.environ['GIT_SSH'], ssh_wrapper)
+
+ self.worker.release('.*')
+ self.waitUntilSettled()
+ self.assertEqual(len(self.history), 2)
+
+ self.assertEqual(self.getJobFromHistory(
+ 'project-bitrot-stable-old').result, 'SUCCESS')
+ self.assertEqual(self.getJobFromHistory(
+ 'project-bitrot-stable-older').result, 'SUCCESS')
+
+ # Stop queuing timer triggered jobs and let any that may have
+        # queued through so that end-of-test assertions pass.
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-no-timer.yaml')
+ self.sched.reconfigure(self.config)
+ self.registerJobs()
+ self.waitUntilSettled()
+ self.worker.release('.*')
+ self.waitUntilSettled()
+
def test_client_enqueue_change(self):
"Test that the RPC client can enqueue a change"
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -3412,6 +3637,31 @@
self.assertEqual('The merge failed! For more information...',
self.smtp_messages[0]['body'])
+ def test_default_merge_failure_reports(self):
+ """Check that the default merge failure reports are correct."""
+
+ # A should report success, B should report merge failure.
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addPatchset(['conflict'])
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.addPatchset(['conflict'])
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(3, len(self.history)) # A jobs
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertIn('Build succeeded', A.messages[1])
+ self.assertIn('Merge Failed', B.messages[1])
+ self.assertIn('automatically merged', B.messages[1])
+ self.assertNotIn('logs.example.com', B.messages[1])
+ self.assertNotIn('SKIPPED', B.messages[1])
+
def test_swift_instructions(self):
"Test that the correct swift instructions are sent to the workers"
self.config.set('zuul', 'layout_config',
@@ -3566,8 +3816,8 @@
self.assertEqual(A.data['status'], 'NEW')
self.assertEqual(B.data['status'], 'NEW')
- source = self.sched.layout.pipelines['gate'].source
- source.maintainCache([])
+ for connection in self.connections.values():
+ connection.maintainCache([])
self.worker.hold_jobs_in_build = True
B.addApproval('APRV', 1)
@@ -4060,6 +4310,64 @@
self.waitUntilSettled()
self.assertEqual(self.history[-1].changes, '3,2 2,1 1,2')
+ def test_crd_check_unknown(self):
+ "Test unknown projects in independent pipeline"
+ self.init_repo("org/unknown")
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/unknown', 'master', 'D')
+ # A Depends-On: B
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+
+ # Make sure zuul has seen an event on B.
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(B.reported, 0)
+
+ def test_crd_cycle_join(self):
+ "Test an updated change creates a cycle"
+ A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
+
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Create B->A
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+ B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ B.subject, A.data['id'])
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Update A to add A->B (a cycle).
+ A.addPatchset()
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+ # Normally we would submit the patchset-created event for
+ # processing here, however, we have no way of noting whether
+ # the dependency cycle detection correctly raised an
+ # exception, so instead, we reach into the source driver and
+ # call the method that would ultimately be called by the event
+ # processing.
+
+ source = self.sched.layout.pipelines['gate'].source
+ with testtools.ExpectedException(
+ Exception, "Dependency cycle detected"):
+ source._getChange(u'1', u'2', True)
+ self.log.debug("Got expected dependency cycle exception")
+
+ # Now if we update B to remove the depends-on, everything
+ # should be okay. B; A->B
+
+ B.addPatchset()
+ B.data['commitMessage'] = '%s\n' % (B.subject,)
+ source._getChange(u'1', u'2', True)
+ source._getChange(u'2', u'2', True)
+
def test_disable_at(self):
"Test a pipeline will only report to the disabled trigger when failing"
@@ -4181,3 +4489,71 @@
self.assertIn('Build failed.', K.messages[0])
# No more messages reported via smtp
self.assertEqual(3, len(self.smtp_messages))
+
+ def test_success_pattern(self):
+ "Ensure bad build params are ignored"
+
+ # Use SMTP reporter to grab the result message easier
+ self.init_repo("org/docs")
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-success-pattern.yaml')
+ self.sched.reconfigure(self.config)
+ self.worker.hold_jobs_in_build = True
+ self.registerJobs()
+
+ A = self.fake_gerrit.addFakeChange('org/docs', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Grab build id
+ self.assertEqual(len(self.builds), 1)
+ uuid = self.builds[0].unique[:7]
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.smtp_messages), 1)
+ body = self.smtp_messages[0]['body'].splitlines()
+ self.assertEqual('Build succeeded.', body[0])
+
+ self.assertIn(
+ '- docs-draft-test http://docs-draft.example.org/1/1/1/check/'
+ 'docs-draft-test/%s/publish-docs/' % uuid,
+ body[2])
+ self.assertIn(
+ '- docs-draft-test2 https://server/job/docs-draft-test2/1/',
+ body[3])
+
+ def test_rerun_on_abort(self):
+ "Test that if a worker fails to run a job, it is run again"
+
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-abort-attempts.yaml')
+ self.sched.reconfigure(self.config)
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 2)
+ self.builds[0].requeue = True
+ self.worker.release('.*-test*')
+ self.waitUntilSettled()
+
+ for x in range(3):
+ self.assertEqual(len(self.builds), 1)
+ self.builds[0].requeue = True
+ self.worker.release('.*-test1')
+ self.waitUntilSettled()
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+ self.assertEqual(len(self.history), 6)
+ self.assertEqual(self.countJobResults(self.history, 'SUCCESS'), 2)
+ self.assertEqual(A.reported, 1)
+ self.assertIn('RETRY_LIMIT', A.messages[0])
diff --git a/tests/test_webapp.py b/tests/test_webapp.py
index b127c51..94f097a 100644
--- a/tests/test_webapp.py
+++ b/tests/test_webapp.py
@@ -16,7 +16,8 @@
# under the License.
import json
-import urllib2
+
+from six.moves import urllib
from tests.base import ZuulTestCase
@@ -44,41 +45,41 @@
def test_webapp_status(self):
"Test that we can filter to only certain changes in the webapp."
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status" % self.port)
- f = urllib2.urlopen(req)
+ f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertIn('pipelines', data)
def test_webapp_status_compat(self):
# testing compat with status.json
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status.json" % self.port)
- f = urllib2.urlopen(req)
+ f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertIn('pipelines', data)
def test_webapp_bad_url(self):
# do we 404 correctly
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status/foo" % self.port)
- self.assertRaises(urllib2.HTTPError, urllib2.urlopen, req)
+ self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)
def test_webapp_find_change(self):
# can we filter by change id
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status/change/1,1" % self.port)
- f = urllib2.urlopen(req)
+ f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertEqual(1, len(data), data)
self.assertEqual("org/project", data[0]['project'])
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status/change/2,1" % self.port)
- f = urllib2.urlopen(req)
+ f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertEqual(1, len(data), data)
diff --git a/tools/trigger-job.py b/tools/trigger-job.py
index dff4e3f..7123afc 100755
--- a/tools/trigger-job.py
+++ b/tools/trigger-job.py
@@ -68,7 +68,7 @@
job = gear.Job("build:%s" % args.job,
json.dumps(data),
unique=data['ZUUL_UUID'])
- c.submitJob(job)
+ c.submitJob(job, precedence=gear.PRECEDENCE_HIGH)
while not job.complete:
time.sleep(1)
diff --git a/tools/zuul-changes.py b/tools/zuul-changes.py
index 9dbf504..8b854c7 100755
--- a/tools/zuul-changes.py
+++ b/tools/zuul-changes.py
@@ -35,7 +35,7 @@
if not change['live']:
continue
cid, cps = change['id'].split(',')
- print (
+ print(
"zuul enqueue --trigger gerrit --pipeline %s "
"--project %s --change %s,%s" % (
options.pipeline_name,
diff --git a/tools/zuul-clear-refs.py b/tools/zuul-clear-refs.py
new file mode 100755
index 0000000..60ce744
--- /dev/null
+++ b/tools/zuul-clear-refs.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+# Copyright 2014-2015 Antoine "hashar" Musso
+# Copyright 2014-2015 Wikimedia Foundation Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=locally-disabled, invalid-name
+
+"""
+Zuul references cleaner.
+
+Clear up references under /refs/zuul/ by inspecting the age of the commit the
+reference points to. If the commit date is older than a number of days
+specified by --until, the reference is deleted from the git repository.
+
+Use --dry-run --verbose to finely inspect the script behavior.
+"""
+
+import argparse
+import git
+import logging
+import time
+import sys
+
+NOW = int(time.time())
+DEFAULT_DAYS = 360
+ZUUL_REF_PREFIX = 'refs/zuul/'
+
+parser = argparse.ArgumentParser(
+ description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+)
+parser.add_argument('--until', dest='days_ago', default=DEFAULT_DAYS, type=int,
+ help='references older than this number of day will '
+ 'be deleted. Default: %s' % DEFAULT_DAYS)
+parser.add_argument('-n', '--dry-run', dest='dryrun', action='store_true',
+ help='do not delete references')
+parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
+ help='set log level from info to debug')
+parser.add_argument('gitrepo', help='path to a Zuul git repository')
+args = parser.parse_args()
+
+logging.basicConfig()
+log = logging.getLogger('zuul-clear-refs')
+if args.verbose:
+ log.setLevel(logging.DEBUG)
+else:
+ log.setLevel(logging.INFO)
+
+try:
+ repo = git.Repo(args.gitrepo)
+except git.exc.InvalidGitRepositoryError:
+ log.error("Invalid git repo: %s" % args.gitrepo)
+ sys.exit(1)
+
+for ref in repo.references:
+
+ if not ref.path.startswith(ZUUL_REF_PREFIX):
+ continue
+ if type(ref) is not git.refs.reference.Reference:
+ # Paranoia: ignore heads/tags/remotes ..
+ continue
+
+ try:
+ commit_ts = ref.commit.committed_date
+ except LookupError:
+ # GitPython does not properly handle PGP signed tags
+ log.exception("Error in commit: %s, ref: %s. Type: %s",
+ ref.commit, ref.path, type(ref))
+ continue
+
+ commit_age = int((NOW - commit_ts) / 86400) # days
+ log.debug(
+ "%s at %s is %3s days old",
+ ref.commit,
+ ref.path,
+ commit_age,
+ )
+ if commit_age > args.days_ago:
+ if args.dryrun:
+ log.info("Would delete old ref: %s (%s)", ref.path, ref.commit)
+ else:
+ log.info("Deleting old ref: %s (%s)", ref.path, ref.commit)
+ ref.delete(repo, ref.path)
diff --git a/tox.ini b/tox.ini
index 0f8254a..06ccbcd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,6 +9,8 @@
STATSD_PORT=8125
VIRTUAL_ENV={envdir}
OS_TEST_TIMEOUT=30
+ OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO}
+passenv = ZUUL_TEST_ROOT
usedevelop = True
install_command = pip install {opts} {packages}
deps = -r{toxinidir}/requirements.txt
@@ -16,10 +18,17 @@
commands =
python setup.py testr --slowest --testr-args='{posargs}'
-[tox:jenkins]
-downloadcache = ~/cache/pip
+[testenv:bindep]
+# Do not install any requirements. We want this to be fast and work even if
+# system dependencies are missing, since it's used to tell you what system
+# dependencies are missing! This also means that bindep must be installed
+# separately, outside of the requirements files.
+deps = bindep
+commands = bindep test
[testenv:pep8]
+# streamer is python3 only, so we need to run flake8 in python3
+basepython = python3
commands = flake8 {posargs}
[testenv:cover]
diff --git a/zuul/ansible/__init__.py b/zuul/ansible/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/__init__.py
diff --git a/zuul/ansible/library/__init__.py b/zuul/ansible/library/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/library/__init__.py
diff --git a/zuul/ansible/library/command.py b/zuul/ansible/library/command.py
new file mode 100644
index 0000000..6390322
--- /dev/null
+++ b/zuul/ansible/library/command.py
@@ -0,0 +1,469 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2016 Red Hat, Inc.
+# Copyright (c) 2016 IBM Corp.
+# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>, and others
+# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software. If not, see <http://www.gnu.org/licenses/>.
+
+# flake8: noqa
+# This file shares a significant chunk of code with an upstream ansible
+# function, run_command. The goal is to not have to fork quite so much
+# of that function, and discussing that design with upstream means we
+# should keep the changes to substantive ones only. For that reason, this
+# file is purposely not enforcing pep8, as making the function pep8 clean
+# would remove our ability to easily have a discussion with our friends
+# upstream
+
+DOCUMENTATION = '''
+---
+module: command
+short_description: Executes a command on a remote node
+version_added: historical
+description:
+ - The M(command) module takes the command name followed by a list of space-delimited arguments.
+ - The given command will be executed on all selected nodes. It will not be
+ processed through the shell, so variables like C($HOME) and operations
+ like C("<"), C(">"), C("|"), C(";") and C("&") will not work (use the M(shell)
+ module if you need these features).
+options:
+ free_form:
+ description:
+ - the command module takes a free form command to run. There is no parameter actually named 'free form'.
+ See the examples!
+ required: true
+ default: null
+ creates:
+ description:
+ - a filename or (since 2.0) glob pattern, when it already exists, this step will B(not) be run.
+ required: no
+ default: null
+ removes:
+ description:
+ - a filename or (since 2.0) glob pattern, when it does not exist, this step will B(not) be run.
+ version_added: "0.8"
+ required: no
+ default: null
+ chdir:
+ description:
+ - cd into this directory before running the command
+ version_added: "0.6"
+ required: false
+ default: null
+ executable:
+ description:
+ - change the shell used to execute the command. Should be an absolute path to the executable.
+ required: false
+ default: null
+ version_added: "0.9"
+ warn:
+ version_added: "1.8"
+ default: yes
+ description:
+ - if command warnings are on in ansible.cfg, do not warn about this particular line if set to no/false.
+ required: false
+notes:
+ - If you want to run a command through the shell (say you are using C(<),
+ C(>), C(|), etc), you actually want the M(shell) module instead. The
+ M(command) module is much more secure as it's not affected by the user's
+ environment.
+ - " C(creates), C(removes), and C(chdir) can be specified after the command. For instance, if you only want to run a command if a certain file does not exist, use this."
+author:
+ - Ansible Core Team
+ - Michael DeHaan
+'''
+
+EXAMPLES = '''
+# Example from Ansible Playbooks.
+- command: /sbin/shutdown -t now
+
+# Run the command if the specified file does not exist.
+- command: /usr/bin/make_database.sh arg1 arg2 creates=/path/to/database
+
+# You can also use the 'args' form to provide the options. This command
+# will change the working directory to somedir/ and will only run when
+# /path/to/database doesn't exist.
+- command: /usr/bin/make_database.sh arg1 arg2
+ args:
+ chdir: somedir/
+ creates: /path/to/database
+'''
+
+import datetime
+import glob
+import pipes
+import re
+import shlex
+import os
+
+import getpass
+import select
+import subprocess
+import traceback
+import threading
+
+from ansible.module_utils.basic import AnsibleModule, heuristic_log_sanitize
+from ansible.module_utils.basic import get_exception
+# ZUUL: Hardcode python2 until we're on ansible 2.2
+from ast import literal_eval
+
+
+PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?')
+
+
+class Console(object):
+ def __enter__(self):
+ self.logfile = open('/tmp/console.html', 'a', 0)
+ return self
+
+ def __exit__(self, etype, value, tb):
+ self.logfile.close()
+
+ def addLine(self, ln):
+ # Note this format with delimiter is "inspired" by the old
+ # Jenkins format but with microsecond resolution instead of
+ # millisecond. It is kept so log parsing/formatting remains
+ # consistent.
+ ts = datetime.datetime.now()
+ outln = '%s | %s' % (ts, ln)
+ self.logfile.write(outln)
+
+
+def follow(fd):
+ newline_warning = False
+ with Console() as console:
+ while True:
+ line = fd.readline()
+ if not line:
+ break
+ if not line.endswith('\n'):
+ line += '\n'
+ newline_warning = True
+ console.addLine(line)
+ if newline_warning:
+ console.addLine('[Zuul] No trailing newline\n')
+
+
+# Taken from ansible/module_utils/basic.py ... forking the method for now
+# so that we can dive in and figure out how to make appropriate hook points
+def zuul_run_command(self, args, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None, use_unsafe_shell=False, prompt_regex=None, environ_update=None):
+ '''
+ Execute a command, returns rc, stdout, and stderr.
+
+ :arg args: is the command to run
+ * If args is a list, the command will be run with shell=False.
+ * If args is a string and use_unsafe_shell=False it will split args to a list and run with shell=False
+ * If args is a string and use_unsafe_shell=True it runs with shell=True.
+ :kw check_rc: Whether to call fail_json in case of non zero RC.
+ Default False
+ :kw close_fds: See documentation for subprocess.Popen(). Default True
+ :kw executable: See documentation for subprocess.Popen(). Default None
+ :kw data: If given, information to write to the stdin of the command
+ :kw binary_data: If False, append a newline to the data. Default False
+ :kw path_prefix: If given, additional path to find the command in.
+ This adds to the PATH environment variable so helper commands in
+ the same directory can also be found
+ :kw cwd: If given, working directory to run the command inside
+ :kw use_unsafe_shell: See `args` parameter. Default False
+ :kw prompt_regex: Regex string (not a compiled regex) which can be
+ used to detect prompts in the stdout which would otherwise cause
+ the execution to hang (especially if no input data is specified)
+ :kwarg environ_update: dictionary to *update* os.environ with
+ '''
+
+ shell = False
+ if isinstance(args, list):
+ if use_unsafe_shell:
+ args = " ".join([pipes.quote(x) for x in args])
+ shell = True
+ elif isinstance(args, (str, unicode)) and use_unsafe_shell:
+ shell = True
+ elif isinstance(args, (str, unicode)):
+ # On python2.6 and below, shlex has problems with text type
+ # ZUUL: Hardcode python2 until we're on ansible 2.2
+ if isinstance(args, unicode):
+ args = args.encode('utf-8')
+ args = shlex.split(args)
+ else:
+ msg = "Argument 'args' to run_command must be list or string"
+ self.fail_json(rc=257, cmd=args, msg=msg)
+
+ prompt_re = None
+ if prompt_regex:
+ try:
+ prompt_re = re.compile(prompt_regex, re.MULTILINE)
+ except re.error:
+ self.fail_json(msg="invalid prompt regular expression given to run_command")
+
+ # expand things like $HOME and ~
+ if not shell:
+ args = [ os.path.expanduser(os.path.expandvars(x)) for x in args if x is not None ]
+
+ rc = 0
+ msg = None
+ st_in = None
+
+ # Manipulate the environ we'll send to the new process
+ old_env_vals = {}
+ # We can set this from both an attribute and per call
+ for key, val in self.run_command_environ_update.items():
+ old_env_vals[key] = os.environ.get(key, None)
+ os.environ[key] = val
+ if environ_update:
+ for key, val in environ_update.items():
+ old_env_vals[key] = os.environ.get(key, None)
+ os.environ[key] = val
+ if path_prefix:
+ old_env_vals['PATH'] = os.environ['PATH']
+ os.environ['PATH'] = "%s:%s" % (path_prefix, os.environ['PATH'])
+
+ # If using test-module and explode, the remote lib path will resemble ...
+ # /tmp/test_module_scratch/debug_dir/ansible/module_utils/basic.py
+ # If using ansible or ansible-playbook with a remote system ...
+ # /tmp/ansible_vmweLQ/ansible_modlib.zip/ansible/module_utils/basic.py
+
+ # Clean out python paths set by ansiballz
+ if 'PYTHONPATH' in os.environ:
+ pypaths = os.environ['PYTHONPATH'].split(':')
+ pypaths = [x for x in pypaths \
+ if not x.endswith('/ansible_modlib.zip') \
+ and not x.endswith('/debug_dir')]
+ os.environ['PYTHONPATH'] = ':'.join(pypaths)
+ if not os.environ['PYTHONPATH']:
+ del os.environ['PYTHONPATH']
+
+ # create a printable version of the command for use
+ # in reporting later, which strips out things like
+ # passwords from the args list
+ to_clean_args = args
+ # ZUUL: Hardcode python2 until we're on ansible 2.2
+ if isinstance(args, (unicode, str)):
+ to_clean_args = shlex.split(to_clean_args)
+
+ clean_args = []
+ is_passwd = False
+ for arg in to_clean_args:
+ if is_passwd:
+ is_passwd = False
+ clean_args.append('********')
+ continue
+ if PASSWD_ARG_RE.match(arg):
+ sep_idx = arg.find('=')
+ if sep_idx > -1:
+ clean_args.append('%s=********' % arg[:sep_idx])
+ continue
+ else:
+ is_passwd = True
+ arg = heuristic_log_sanitize(arg, self.no_log_values)
+ clean_args.append(arg)
+ clean_args = ' '.join(pipes.quote(arg) for arg in clean_args)
+
+ if data:
+ st_in = subprocess.PIPE
+
+ # ZUUL: changed stderr to follow stdout
+ kwargs = dict(
+ executable=executable,
+ shell=shell,
+ close_fds=close_fds,
+ stdin=st_in,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+
+ if cwd and os.path.isdir(cwd):
+ kwargs['cwd'] = cwd
+
+ # store the pwd
+ prev_dir = os.getcwd()
+
+ # make sure we're in the right working directory
+ if cwd and os.path.isdir(cwd):
+ try:
+ os.chdir(cwd)
+ except (OSError, IOError):
+ e = get_exception()
+ self.fail_json(rc=e.errno, msg="Could not open %s, %s" % (cwd, str(e)))
+
+ try:
+
+ if self._debug:
+ if isinstance(args, list):
+ running = ' '.join(args)
+ else:
+ running = args
+ self.log('Executing: ' + running)
+ # ZUUL: Replaced the execution loop with the zuul_runner run function
+ cmd = subprocess.Popen(args, **kwargs)
+ t = threading.Thread(target=follow, args=(cmd.stdout,))
+ t.daemon = True
+ t.start()
+ ret = cmd.wait()
+ # Give the thread that is writing the console log up to 10 seconds
+ # to catch up and exit. If it hasn't done so by then, it is very
+ # likely stuck in readline() because it spawned a child that is
+ # holding stdout or stderr open.
+ t.join(10)
+ with Console() as console:
+ if t.isAlive():
+ console.addLine("[Zuul] standard output/error still open "
+ "after child exited")
+ console.addLine("[Zuul] Task exit code: %s\n" % ret)
+
+ # ZUUL: If the console log follow thread *is* stuck in readline,
+ # we can't close stdout (attempting to do so raises an
+ # exception), so this is disabled.
+ # cmd.stdout.close()
+
+ # ZUUL: stdout and stderr are in the console log file
+ stdout = ''
+ stderr = ''
+
+ rc = cmd.returncode
+ except (OSError, IOError):
+ e = get_exception()
+ self.fail_json(rc=e.errno, msg=str(e), cmd=clean_args)
+ except Exception:
+ e = get_exception()
+ self.fail_json(rc=257, msg=str(e), exception=traceback.format_exc(), cmd=clean_args)
+
+ # Restore env settings
+ for key, val in old_env_vals.items():
+ if val is None:
+ del os.environ[key]
+ else:
+ os.environ[key] = val
+
+ if rc != 0 and check_rc:
+ msg = heuristic_log_sanitize(stderr.rstrip(), self.no_log_values)
+ self.fail_json(cmd=clean_args, rc=rc, stdout=stdout, stderr=stderr, msg=msg)
+
+ # reset the pwd
+ os.chdir(prev_dir)
+
+ return (rc, stdout, stderr)
+
+
+def check_command(commandline):
+ arguments = { 'chown': 'owner', 'chmod': 'mode', 'chgrp': 'group',
+ 'ln': 'state=link', 'mkdir': 'state=directory',
+ 'rmdir': 'state=absent', 'rm': 'state=absent', 'touch': 'state=touch' }
+ commands = { 'hg': 'hg', 'curl': 'get_url or uri', 'wget': 'get_url or uri',
+ 'svn': 'subversion', 'service': 'service',
+ 'mount': 'mount', 'rpm': 'yum, dnf or zypper', 'yum': 'yum', 'apt-get': 'apt',
+ 'tar': 'unarchive', 'unzip': 'unarchive', 'sed': 'template or lineinfile',
+ 'dnf': 'dnf', 'zypper': 'zypper' }
+ become = [ 'sudo', 'su', 'pbrun', 'pfexec', 'runas' ]
+ warnings = list()
+ command = os.path.basename(commandline.split()[0])
+ if command in arguments:
+ warnings.append("Consider using file module with %s rather than running %s" % (arguments[command], command))
+ if command in commands:
+ warnings.append("Consider using %s module rather than running %s" % (commands[command], command))
+ if command in become:
+ warnings.append("Consider using 'become', 'become_method', and 'become_user' rather than running %s" % (command,))
+ return warnings
+
+
+def main():
+
+ # the command module is the one ansible module that does not take key=value args
+ # hence don't copy this one if you are looking to build others!
+ module = AnsibleModule(
+ argument_spec=dict(
+ _raw_params = dict(),
+ _uses_shell = dict(type='bool', default=False),
+ chdir = dict(type='path'),
+ executable = dict(),
+ creates = dict(type='path'),
+ removes = dict(type='path'),
+ warn = dict(type='bool', default=True),
+ environ = dict(type='dict', default=None),
+ )
+ )
+
+ shell = module.params['_uses_shell']
+ chdir = module.params['chdir']
+ executable = module.params['executable']
+ args = module.params['_raw_params']
+ creates = module.params['creates']
+ removes = module.params['removes']
+ warn = module.params['warn']
+ environ = module.params['environ']
+
+ if args.strip() == '':
+ module.fail_json(rc=256, msg="no command given")
+
+ if chdir:
+ chdir = os.path.abspath(chdir)
+ os.chdir(chdir)
+
+ if creates:
+ # do not run the command if the line contains creates=filename
+ # and the filename already exists. This allows idempotence
+ # of command executions.
+ if glob.glob(creates):
+ module.exit_json(
+ cmd=args,
+ stdout="skipped, since %s exists" % creates,
+ changed=False,
+ rc=0
+ )
+
+ if removes:
+ # do not run the command if the line contains removes=filename
+ # and the filename does not exist. This allows idempotence
+ # of command executions.
+ if not glob.glob(removes):
+ module.exit_json(
+ cmd=args,
+ stdout="skipped, since %s does not exist" % removes,
+ changed=False,
+ rc=0
+ )
+
+ warnings = list()
+ if warn:
+ warnings = check_command(args)
+
+ if not shell:
+ args = shlex.split(args)
+ startd = datetime.datetime.now()
+
+ rc, out, err = zuul_run_command(module, args, executable=executable, use_unsafe_shell=shell, environ_update=environ)
+
+ endd = datetime.datetime.now()
+ delta = endd - startd
+
+ if out is None:
+ out = ''
+ if err is None:
+ err = ''
+
+ module.exit_json(
+ cmd = args,
+ stdout = out.rstrip("\r\n"),
+ stderr = err.rstrip("\r\n"),
+ rc = rc,
+ start = str(startd),
+ end = str(endd),
+ delta = str(delta),
+ changed = True,
+ warnings = warnings
+ )
+
+if __name__ == '__main__':
+ main()
diff --git a/zuul/ansible/library/zuul_afs.py b/zuul/ansible/library/zuul_afs.py
new file mode 100644
index 0000000..3ba426b
--- /dev/null
+++ b/zuul/ansible/library/zuul_afs.py
@@ -0,0 +1,121 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 Red Hat
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import subprocess
+
+
+def afs_sync(afsuser, afskeytab, afsroot, afssource, afstarget):
+ # Find the list of root markers in the just-completed build
+ # (usually there will only be one, but some builds produce content
+ # at the root *and* at a tag location, or possibly at multiple
+ # translation roots).
+ src_root_markers = []
+ for root, dirnames, filenames in os.walk(afssource):
+ if '.root-marker' in filenames:
+ src_root_markers.append(root)
+
+ output_blocks = []
+ # Synchronize the content at each root marker.
+ for root_count, src_root in enumerate(src_root_markers):
+ # The component of the path between the source root and the
+ # current source root marker. May be '.' if there is a marker
+ # at the root.
+ subpath = os.path.relpath(src_root, afssource)
+
+ # Add to our debugging output
+ output = dict(subpath=subpath)
+ output_blocks.append(output)
+
+ # The absolute path to the source (in staging) and destination
+ # (in afs) of the build root for the current root marker.
+ subsource = os.path.abspath(os.path.join(afssource, subpath))
+ subtarget = os.path.abspath(os.path.join(afstarget, subpath))
+
+ # Create a filter list for rsync so that we copy exactly the
+ # directories we want to without deleting any existing
+ # directories in the published site that were placed there by
+ # previous builds.
+
+ # Exclude any directories under this subpath which have root
+ # markers.
+ excludes = []
+ for root, dirnames, filenames in os.walk(subtarget):
+ if '.root-marker' in filenames:
+ exclude_subpath = os.path.relpath(root, subtarget)
+ if exclude_subpath == '.':
+ continue
+ excludes.append(os.path.join('/', exclude_subpath))
+ output['excludes'] = excludes
+
+ filter_file = os.path.join(afsroot, 'filter_%i' % root_count)
+
+ with open(filter_file, 'w') as f:
+ for exclude in excludes:
+ f.write('- %s\n' % exclude)
+
+ # Perform the rsync with the filter list.
+ rsync_cmd = ' '.join([
+ '/usr/bin/rsync', '-rtp', '--safe-links', '--delete-after',
+ "--out-format='<<CHANGED>>%i %n%L'",
+ "--filter='merge {filter}'", '{src}/', '{dst}/',
+ ])
+ mkdir_cmd = ' '.join(['mkdir', '-p', '{dst}/'])
+ bash_cmd = ' '.join([
+ '/bin/bash', '-c', '"{mkdir_cmd} && {rsync_cmd}"'
+ ]).format(
+ mkdir_cmd=mkdir_cmd,
+ rsync_cmd=rsync_cmd)
+
+ k5start_cmd = ' '.join([
+ '/usr/bin/k5start', '-t', '-f', '{keytab}', '{user}', '--',
+ bash_cmd,
+ ])
+
+ shell_cmd = k5start_cmd.format(
+ src=subsource,
+ dst=subtarget,
+ filter=filter_file,
+ user=afsuser,
+ keytab=afskeytab),
+ output['source'] = subsource
+ output['destination'] = subtarget
+ output['output'] = subprocess.check_output(shell_cmd, shell=True)
+
+ return output_blocks
+
+
+def main():
+ module = AnsibleModule(
+ argument_spec=dict(
+ user=dict(required=True, type='raw'),
+ keytab=dict(required=True, type='raw'),
+ root=dict(required=True, type='raw'),
+ source=dict(required=True, type='raw'),
+ target=dict(required=True, type='raw'),
+ )
+ )
+
+ p = module.params
+ output = afs_sync(p['user'], p['keytab'], p['root'],
+ p['source'], p['target'])
+ module.exit_json(changed=True, build_roots=output)
+
+from ansible.module_utils.basic import * # noqa
+
+if __name__ == '__main__':
+ main()
diff --git a/zuul/ansible/library/zuul_console.py b/zuul/ansible/library/zuul_console.py
new file mode 100644
index 0000000..e70dac8
--- /dev/null
+++ b/zuul/ansible/library/zuul_console.py
@@ -0,0 +1,184 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 IBM Corp.
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import socket
+import threading
+
+
+def daemonize():
+ # A really basic daemonize method that should work well enough for
+ # now in this circumstance. Based on the public domain code at:
+ # http://web.archive.org/web/20131017130434/http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
+
+ pid = os.fork()
+ if pid > 0:
+ return True
+
+ os.chdir('/')
+ os.setsid()
+ os.umask(0)
+
+ pid = os.fork()
+ if pid > 0:
+ sys.exit(0)
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+ i = open('/dev/null', 'r')
+ o = open('/dev/null', 'a+')
+ e = open('/dev/null', 'a+', 0)
+ os.dup2(i.fileno(), sys.stdin.fileno())
+ os.dup2(o.fileno(), sys.stdout.fileno())
+ os.dup2(e.fileno(), sys.stderr.fileno())
+ return False
+
+
+class Console(object):
+ def __init__(self, path):
+ self.path = path
+ self.file = open(path)
+ self.stat = os.stat(path)
+ self.size = self.stat.st_size
+
+
+class Server(object):
+ def __init__(self, path, port):
+ self.path = path
+
+ s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ s.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
+ s.bind(('::', port))
+ s.listen(1)
+
+ self.socket = s
+
+ def accept(self):
+ conn, addr = self.socket.accept()
+ return conn
+
+ def run(self):
+ while True:
+ conn = self.accept()
+ t = threading.Thread(target=self.handleOneConnection, args=(conn,))
+ t.daemon = True
+ t.start()
+
+ def chunkConsole(self, conn):
+ try:
+ console = Console(self.path)
+ except Exception:
+ return
+ while True:
+ chunk = console.file.read(4096)
+ if not chunk:
+ break
+ conn.send(chunk)
+ return console
+
+ def followConsole(self, console, conn):
+ while True:
+ # As long as we have unread data, keep reading/sending
+ while True:
+ chunk = console.file.read(4096)
+ if chunk:
+ conn.send(chunk)
+ else:
+ break
+
+ # At this point, we are waiting for more data to be written
+ time.sleep(0.5)
+
+ # Check to see if the remote end has sent any data, if so,
+ # discard
+ r, w, e = select.select([conn], [], [conn], 0)
+ if conn in e:
+ return False
+ if conn in r:
+ ret = conn.recv(1024)
+ # Discard anything read, if input is eof, it has
+ # disconnected.
+ if not ret:
+ return False
+
+ # See if the file has been truncated
+ try:
+ st = os.stat(console.path)
+ if (st.st_ino != console.stat.st_ino or
+ st.st_size < console.size):
+ return True
+ except Exception:
+ return True
+ console.size = st.st_size
+
+ def handleOneConnection(self, conn):
+ # FIXME: this won't notice disconnects until it tries to send
+ console = None
+ try:
+ while True:
+ if console is not None:
+ try:
+ console.file.close()
+ except:
+ pass
+ while True:
+ console = self.chunkConsole(conn)
+ if console:
+ break
+ time.sleep(0.5)
+ while True:
+ if self.followConsole(console, conn):
+ break
+ else:
+ return
+ finally:
+ try:
+ conn.close()
+ except Exception:
+ pass
+
+
+def test():
+ s = Server('/tmp/console.html', 19885)
+ s.run()
+
+
+def main():
+ module = AnsibleModule(
+ argument_spec=dict(
+ path=dict(default='/tmp/console.html'),
+ port=dict(default=19885, type='int'),
+ )
+ )
+
+ p = module.params
+ path = p['path']
+ port = p['port']
+
+ if daemonize():
+ module.exit_json()
+
+ s = Server(path, port)
+ s.run()
+
+from ansible.module_utils.basic import * # noqa
+
+if __name__ == '__main__':
+ main()
+# test()
diff --git a/zuul/ansible/library/zuul_log.py b/zuul/ansible/library/zuul_log.py
new file mode 100644
index 0000000..4b377d9
--- /dev/null
+++ b/zuul/ansible/library/zuul_log.py
@@ -0,0 +1,58 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 IBM Corp.
+# Copyright (c) 2016 Red Hat
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software. If not, see <http://www.gnu.org/licenses/>.
+
+import datetime
+
+
+class Console(object):
+ def __enter__(self):
+ self.logfile = open('/tmp/console.html', 'a', 0)
+ return self
+
+ def __exit__(self, etype, value, tb):
+ self.logfile.close()
+
+ def addLine(self, ln):
+ ts = datetime.datetime.now()
+ outln = '%s | %s' % (str(ts), ln)
+ self.logfile.write(outln)
+
+
+def log(msg):
+ if not isinstance(msg, list):
+ msg = [msg]
+ with Console() as console:
+ for line in msg:
+ console.addLine("[Zuul] %s\n" % line)
+
+
+def main():
+ module = AnsibleModule(
+ argument_spec=dict(
+ msg=dict(required=True, type='raw'),
+ )
+ )
+
+ p = module.params
+ log(p['msg'])
+ module.exit_json(changed=True)
+
+from ansible.module_utils.basic import * # noqa
+
+if __name__ == '__main__':
+ main()
diff --git a/zuul/change_matcher.py b/zuul/change_matcher.py
index ed380f0..ca2d93f 100644
--- a/zuul/change_matcher.py
+++ b/zuul/change_matcher.py
@@ -101,7 +101,7 @@
yield self.commit_regex
def matches(self, change):
- if not (hasattr(change, 'files') and change.files):
+ if not (hasattr(change, 'files') and len(change.files) > 1):
return False
for file_ in change.files:
matched_file = False
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 2902c50..5ffd431 100644
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -14,8 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import six
from six.moves import configparser as ConfigParser
-import cStringIO
import extras
import logging
import logging.config
@@ -47,7 +47,7 @@
yappi.start()
else:
yappi.stop()
- yappi_out = cStringIO.StringIO()
+ yappi_out = six.BytesIO()
yappi.get_func_stats().print_all(out=yappi_out)
yappi.get_thread_stats().print_all(out=yappi_out)
log.debug(yappi_out.getvalue())
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 59ac419..1ce2828 100644
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -154,7 +154,7 @@
running_items = client.get_running_jobs()
if len(running_items) == 0:
- print "No jobs currently running"
+ print("No jobs currently running")
return True
all_fields = self._show_running_jobs_columns()
@@ -181,7 +181,7 @@
v += all_fields[f]['append']
values.append(v)
table.add_row(values)
- print table
+ print(table)
return True
def _epoch_to_relative_time(self, epoch):
diff --git a/zuul/cmd/cloner.py b/zuul/cmd/cloner.py
index c616aa1..4f8b9f4 100755
--- a/zuul/cmd/cloner.py
+++ b/zuul/cmd/cloner.py
@@ -27,6 +27,8 @@
'branch',
'ref',
'url',
+ 'project',
+ 'newrev',
)
@@ -98,6 +100,10 @@
parser.error("Specifying a Zuul ref requires a Zuul url. "
"Define Zuul arguments either via environment "
"variables or using options above.")
+ if 'zuul_newrev' in zuul_args and 'zuul_project' not in zuul_args:
+ parser.error("ZUUL_NEWREV has been specified without "
+ "ZUUL_PROJECT. Please define a ZUUL_PROJECT or do "
+ "not set ZUUL_NEWREV.")
self.args = args
@@ -145,6 +151,8 @@
clone_map_file=self.args.clone_map_file,
project_branches=project_branches,
cache_dir=self.args.cache_dir,
+ zuul_newrev=self.args.zuul_newrev,
+ zuul_project=self.args.zuul_project,
)
cloner.execute()
diff --git a/zuul/cmd/launcher.py b/zuul/cmd/launcher.py
new file mode 100644
index 0000000..49643ae
--- /dev/null
+++ b/zuul/cmd/launcher.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013-2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import daemon
+import extras
+
+# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
+# instead it depends on lockfile-0.9.1 which uses pidfile.
+pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
+
+import logging
+import os
+import socket
+import sys
+import signal
+
+import zuul.cmd
+import zuul.launcher.ansiblelaunchserver
+
+# No zuul imports that pull in paramiko here; it must not be
+# imported until after the daemonization.
+# https://github.com/paramiko/paramiko/issues/59
+# Similar situation with gear and statsd.
+
+
+class Launcher(zuul.cmd.ZuulApp):
+
+ def parse_arguments(self):
+ parser = argparse.ArgumentParser(description='Zuul launch worker.')
+ parser.add_argument('-c', dest='config',
+ help='specify the config file')
+ parser.add_argument('-d', dest='nodaemon', action='store_true',
+ help='do not run as a daemon')
+ parser.add_argument('--version', dest='version', action='version',
+ version=self._get_version(),
+ help='show zuul version')
+ parser.add_argument('--keep-jobdir', dest='keep_jobdir',
+ action='store_true',
+ help='keep local jobdirs after run completes')
+ parser.add_argument('command',
+ choices=zuul.launcher.ansiblelaunchserver.COMMANDS,
+ nargs='?')
+
+ self.args = parser.parse_args()
+
+ def send_command(self, cmd):
+ if self.config.has_option('zuul', 'state_dir'):
+ state_dir = os.path.expanduser(
+ self.config.get('zuul', 'state_dir'))
+ else:
+ state_dir = '/var/lib/zuul'
+ path = os.path.join(state_dir, 'launcher.socket')
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(path)
+ s.sendall('%s\n' % cmd)
+
+ def exit_handler(self):
+ self.launcher.stop()
+ self.launcher.join()
+
+ def main(self, daemon=True):
+ # See comment at top of file about zuul imports
+
+ self.setup_logging('launcher', 'log_config')
+
+ self.log = logging.getLogger("zuul.Launcher")
+
+ LaunchServer = zuul.launcher.ansiblelaunchserver.LaunchServer
+ self.launcher = LaunchServer(self.config,
+ keep_jobdir=self.args.keep_jobdir)
+ self.launcher.start()
+
+ signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
+ if daemon:
+ self.launcher.join()
+ else:
+ while True:
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ print("Ctrl + C: asking launcher to exit nicely...\n")
+ self.exit_handler()
+ sys.exit(0)
+
+
+def main():
+ server = Launcher()
+ server.parse_arguments()
+ server.read_config()
+
+ if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS:
+ server.send_command(server.args.command)
+ sys.exit(0)
+
+ server.configure_connections()
+
+ if server.config.has_option('launcher', 'pidfile'):
+ pid_fn = os.path.expanduser(server.config.get('launcher', 'pidfile'))
+ else:
+ pid_fn = '/var/run/zuul-launcher/zuul-launcher.pid'
+ pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
+
+ if server.args.nodaemon:
+ server.main(False)
+ else:
+ with daemon.DaemonContext(pidfile=pid):
+ server.main(True)
+
+
+if __name__ == "__main__":
+ sys.path.insert(0, '.')
+ main()
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
index df215fd..797a990 100644
--- a/zuul/cmd/merger.py
+++ b/zuul/cmd/merger.py
@@ -68,7 +68,7 @@
try:
signal.pause()
except KeyboardInterrupt:
- print "Ctrl + C: asking merger to exit nicely...\n"
+ print("Ctrl + C: asking merger to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)
@@ -89,9 +89,7 @@
f.close()
os.unlink(test_fn)
except Exception:
- print
- print "Unable to write to state directory: %s" % state_dir
- print
+ print("\nUnable to write to state directory: %s\n" % state_dir)
raise
if server.config.has_option('merger', 'pidfile'):
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 2aca4f2..0b7538d 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -61,12 +61,9 @@
def reconfigure_handler(self, signum, frame):
signal.signal(signal.SIGHUP, signal.SIG_IGN)
self.log.debug("Reconfiguration triggered")
- self.sched.stopConnections()
self.read_config()
self.setup_logging('zuul', 'log_config')
try:
- self.configure_connections()
- self.sched.registerConnections(self.connections)
self.sched.reconfigure(self.config)
except Exception:
self.log.exception("Reconfiguration failed:")
@@ -89,8 +86,10 @@
import zuul.trigger.gerrit
logging.basicConfig(level=logging.DEBUG)
- self.sched = zuul.scheduler.Scheduler(self.config)
+ self.sched = zuul.scheduler.Scheduler(self.config,
+ testonly=True)
self.configure_connections()
+ self.sched.registerConnections(self.connections, load=False)
layout = self.sched.testConfig(self.config.get('zuul',
'layout_config'),
self.connections)
@@ -109,7 +108,7 @@
jobs.add(v)
for job in sorted(layout.jobs):
if job not in jobs:
- print "Job %s not defined" % job
+ print("FAILURE: Job %s not defined" % job)
failure = True
return failure
@@ -119,18 +118,18 @@
if child_pid == 0:
os.close(pipe_write)
self.setup_logging('gearman_server', 'log_config')
- import gear
+ import zuul.lib.gearserver
statsd_host = os.environ.get('STATSD_HOST')
statsd_port = int(os.environ.get('STATSD_PORT', 8125))
if self.config.has_option('gearman_server', 'listen_address'):
host = self.config.get('gearman_server', 'listen_address')
else:
host = None
- gear.Server(4730,
- host=host,
- statsd_host=statsd_host,
- statsd_port=statsd_port,
- statsd_prefix='zuul.geard')
+ zuul.lib.gearserver.GearServer(4730,
+ host=host,
+ statsd_host=statsd_host,
+ statsd_port=statsd_port,
+ statsd_prefix='zuul.geard')
# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
@@ -174,7 +173,20 @@
cache_expiry = self.config.getint('zuul', 'status_expiry')
else:
cache_expiry = 1
- webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry)
+
+ if self.config.has_option('webapp', 'listen_address'):
+ listen_address = self.config.get('webapp', 'listen_address')
+ else:
+ listen_address = '0.0.0.0'
+
+ if self.config.has_option('webapp', 'port'):
+ port = self.config.getint('webapp', 'port')
+ else:
+ port = 8001
+
+ webapp = zuul.webapp.WebApp(
+ self.sched, port=port, cache_expiry=cache_expiry,
+ listen_address=listen_address)
rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.configure_connections()
@@ -198,7 +210,7 @@
try:
signal.pause()
except KeyboardInterrupt:
- print "Ctrl + C: asking scheduler to exit nicely...\n"
+ print("Ctrl + C: asking scheduler to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index f2ea47a..066b4db 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -43,6 +43,14 @@
self.connection_name = connection_name
self.connection_config = connection_config
+ # Keep track of the sources, triggers and reporters using this
+ # connection
+ self.attached_to = {
+ 'source': [],
+ 'trigger': [],
+ 'reporter': [],
+ }
+
def onLoad(self):
pass
@@ -51,3 +59,13 @@
def registerScheduler(self, sched):
self.sched = sched
+
+ def registerUse(self, what, instance):
+ self.attached_to[what].append(instance)
+
+ def maintainCache(self, relevant):
+ """Make cache contain relevant changes.
+
+ This lets the user supply a list of change objects that are
+ still in use. Anything in our cache that isn't in the supplied
+ list should be safe to remove from the cache."""
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index 5caa7ee..05d6534 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -18,11 +18,11 @@
import json
import time
from six.moves import queue as Queue
+from six.moves import urllib
import paramiko
import logging
import pprint
import voluptuous as v
-import urllib2
from zuul.connection import BaseConnection
from zuul.model import TriggerEvent
@@ -32,7 +32,7 @@
"""Move events from Gerrit to the scheduler."""
log = logging.getLogger("zuul.GerritEventConnector")
- delay = 5.0
+ delay = 10.0
def __init__(self, connection):
super(GerritEventConnector, self).__init__()
@@ -47,7 +47,6 @@
def _handleEvent(self):
ts, data = self.connection.getEvent()
if self._stopped:
- self.connection.eventDone()
return
# Gerrit can produce inconsistent data immediately after an
# event, So ensure that we do not deliver the event to Zuul
@@ -95,17 +94,29 @@
try:
event.account = data.get(accountfield_from_type[event.type])
except KeyError:
- self.log.error("Received unrecognized event type '%s' from Gerrit.\
+ self.log.warning("Received unrecognized event type '%s' from Gerrit.\
Can not get account information." % event.type)
event.account = None
if (event.change_number and
self.connection.sched.getProject(event.project_name)):
- # Mark the change as needing a refresh in the cache
- event._needs_refresh = True
-
+ # Call _getChange for the side effect of updating the
+ # cache. Note that this modifies Change objects outside
+ # the main thread.
+ # NOTE(jhesketh): Ideally we'd just remove the change from the
+ # cache to denote that it needs updating. However the change
+ # object is already used by Items and hence BuildSets etc. and
+ # we need to update those objects by reference so that they have
+ # the correct/new information and also avoid hitting gerrit
+ # multiple times.
+ if self.connection.attached_to['source']:
+ self.connection.attached_to['source'][0]._getChange(
+ event.change_number, event.patch_number, refresh=True)
+ # We only need to do this once since the connection maintains
+ # the cache (which is shared between all the sources)
+ # NOTE(jhesketh): We may couple sources and connections again
+ # at which point this becomes more sensible.
self.connection.sched.addEvent(event)
- self.connection.eventDone()
def run(self):
while True:
@@ -115,19 +126,23 @@
self._handleEvent()
except:
self.log.exception("Exception moving Gerrit event:")
+ finally:
+ self.connection.eventDone()
class GerritWatcher(threading.Thread):
log = logging.getLogger("gerrit.GerritWatcher")
+ poll_timeout = 500
def __init__(self, gerrit_connection, username, hostname, port=29418,
- keyfile=None):
+ keyfile=None, keepalive=60):
threading.Thread.__init__(self)
self.username = username
self.keyfile = keyfile
self.hostname = hostname
self.port = port
self.gerrit_connection = gerrit_connection
+ self.keepalive = keepalive
self._stopped = False
def _read(self, fd):
@@ -141,7 +156,7 @@
poll = select.poll()
poll.register(stdout.channel)
while not self._stopped:
- ret = poll.poll()
+ ret = poll.poll(self.poll_timeout)
for (fd, event) in ret:
if fd == stdout.channel.fileno():
if event == select.POLLIN:
@@ -158,6 +173,8 @@
username=self.username,
port=self.port,
key_filename=self.keyfile)
+ transport = client.get_transport()
+ transport.set_keepalive(self.keepalive)
stdin, stdout, stderr = client.exec_command("gerrit stream-events")
@@ -210,6 +227,7 @@
self.server = self.connection_config.get('server')
self.port = int(self.connection_config.get('port', 29418))
self.keyfile = self.connection_config.get('sshkey', None)
+ self.keepalive = int(self.connection_config.get('keepalive', 60))
self.watcher_thread = None
self.event_queue = None
self.client = None
@@ -342,6 +360,8 @@
username=self.user,
port=self.port,
key_filename=self.keyfile)
+ transport = client.get_transport()
+ transport.set_keepalive(self.keepalive)
self.client = client
def _ssh(self, command, stdin_data=None):
@@ -374,10 +394,10 @@
url = "%s/p/%s/info/refs?service=git-upload-pack" % (
self.baseurl, project)
try:
- data = urllib2.urlopen(url).read()
+ data = urllib.request.urlopen(url).read()
except:
self.log.error("Cannot get references from %s" % url)
- raise # keeps urllib2 error informations
+ raise # keeps urllib error information
ret = {}
read_headers = False
read_advertisement = False
@@ -447,7 +467,8 @@
self.user,
self.server,
self.port,
- keyfile=self.keyfile)
+ keyfile=self.keyfile,
+ keepalive=self.keepalive)
self.watcher_thread.start()
def _stop_event_connector(self):
diff --git a/zuul/exceptions.py b/zuul/exceptions.py
index 2bd2c6b..40a1e40 100644
--- a/zuul/exceptions.py
+++ b/zuul/exceptions.py
@@ -22,5 +22,14 @@
super(ChangeNotFound, self).__init__(message)
+class RevNotFound(Exception):
+ def __init__(self, project, rev):
+ self.project = project
+ self.revision = rev
+ message = ("Failed to checkout project '%s' at revision '%s'"
+ % (self.project, self.revision))
+ super(RevNotFound, self).__init__(message)
+
+
class MergeFailure(Exception):
pass
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
new file mode 100644
index 0000000..c70302b
--- /dev/null
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -0,0 +1,1573 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import os
+import re
+import shutil
+import signal
+import socket
+import subprocess
+import tempfile
+import threading
+import time
+import traceback
+import uuid
+import Queue
+
+import gear
+import yaml
+import jenkins_jobs.builder
+import jenkins_jobs.formatter
+import zmq
+
+import zuul.ansible.library
+from zuul.lib import commandsocket
+
+ANSIBLE_WATCHDOG_GRACE = 5 * 60
+ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
+ANSIBLE_DEFAULT_PRE_TIMEOUT = 10 * 60
+ANSIBLE_DEFAULT_POST_TIMEOUT = 30 * 60
+
+
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful',
+ 'verbose', 'unverbose']
+
+
+def boolify(x):
+ if isinstance(x, str):
+ return bool(int(x))
+ return bool(x)
+
+
+class LaunchGearWorker(gear.Worker):
+ def __init__(self, *args, **kw):
+ self.__launch_server = kw.pop('launch_server')
+ super(LaunchGearWorker, self).__init__(*args, **kw)
+
+ def handleNoop(self, packet):
+ workers = len(self.__launch_server.node_workers)
+ delay = (workers ** 2) / 1000.0
+ time.sleep(delay)
+ return super(LaunchGearWorker, self).handleNoop(packet)
+
+
+class NodeGearWorker(gear.Worker):
+ MASS_DO = 101
+
+ def sendMassDo(self, functions):
+ names = [gear.convert_to_bytes(x) for x in functions]
+ data = b'\x00'.join(names)
+ new_function_dict = {}
+ for name in names:
+ new_function_dict[name] = gear.FunctionRecord(name)
+ self.broadcast_lock.acquire()
+ try:
+ p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
+ self.broadcast(p)
+ self.functions = new_function_dict
+ finally:
+ self.broadcast_lock.release()
+
+
+class Watchdog(object):
+ def __init__(self, timeout, function, args):
+ self.timeout = timeout
+ self.function = function
+ self.args = args
+ self.thread = threading.Thread(target=self._run)
+ self.thread.daemon = True
+
+ def _run(self):
+ while self._running and time.time() < self.end:
+ time.sleep(10)
+ if self._running:
+ self.function(*self.args)
+
+ def start(self):
+ self._running = True
+ self.end = time.time() + self.timeout
+ self.thread.start()
+
+ def stop(self):
+ self._running = False
+
+
+class JobDir(object):
+ def __init__(self, keep=False):
+ self.keep = keep
+ self.root = tempfile.mkdtemp()
+ self.ansible_root = os.path.join(self.root, 'ansible')
+ os.makedirs(self.ansible_root)
+ self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
+ self.inventory = os.path.join(self.ansible_root, 'inventory')
+ self.vars = os.path.join(self.ansible_root, 'vars.yaml')
+ self.pre_playbook = os.path.join(self.ansible_root, 'pre_playbook')
+ self.playbook = os.path.join(self.ansible_root, 'playbook')
+ self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
+ self.config = os.path.join(self.ansible_root, 'ansible.cfg')
+ self.pre_post_config = os.path.join(self.ansible_root,
+ 'ansible_pre_post.cfg')
+ self.script_root = os.path.join(self.ansible_root, 'scripts')
+ self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
+ os.makedirs(self.script_root)
+ self.staging_root = os.path.join(self.root, 'staging')
+ os.makedirs(self.staging_root)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, etype, value, tb):
+ if not self.keep:
+ shutil.rmtree(self.root)
+
+
+class LaunchServer(object):
+ log = logging.getLogger("zuul.LaunchServer")
+ site_section_re = re.compile('site "(.*?)"')
+ node_section_re = re.compile('node "(.*?)"')
+
+ def __init__(self, config, keep_jobdir=False):
+ self.config = config
+ self.options = dict(
+ verbose=False
+ )
+ self.keep_jobdir = keep_jobdir
+ self.hostname = socket.gethostname()
+ self.registered_functions = set()
+ self.node_workers = {}
+ self.jobs = {}
+ self.builds = {}
+ self.zmq_send_queue = Queue.Queue()
+ self.termination_queue = Queue.Queue()
+ self.sites = {}
+ self.static_nodes = {}
+ self.command_map = dict(
+ reconfigure=self.reconfigure,
+ stop=self.stop,
+ pause=self.pause,
+ unpause=self.unpause,
+ release=self.release,
+ graceful=self.graceful,
+ verbose=self.verboseOn,
+ unverbose=self.verboseOff,
+ )
+
+ if config.has_option('launcher', 'accept_nodes'):
+ self.accept_nodes = config.getboolean('launcher',
+ 'accept_nodes')
+ else:
+ self.accept_nodes = True
+ self.config_accept_nodes = self.accept_nodes
+
+ if self.config.has_option('zuul', 'state_dir'):
+ state_dir = os.path.expanduser(
+ self.config.get('zuul', 'state_dir'))
+ else:
+ state_dir = '/var/lib/zuul'
+ path = os.path.join(state_dir, 'launcher.socket')
+ self.command_socket = commandsocket.CommandSocket(path)
+ ansible_dir = os.path.join(state_dir, 'ansible')
+ self.library_dir = os.path.join(ansible_dir, 'library')
+ if not os.path.exists(self.library_dir):
+ os.makedirs(self.library_dir)
+ self.pre_post_library_dir = os.path.join(ansible_dir,
+ 'pre_post_library')
+ if not os.path.exists(self.pre_post_library_dir):
+ os.makedirs(self.pre_post_library_dir)
+
+ library_path = os.path.dirname(os.path.abspath(
+ zuul.ansible.library.__file__))
+ # Ansible library modules that should be available to all
+ # playbooks:
+ all_libs = ['zuul_log.py', 'zuul_console.py', 'zuul_afs.py']
+ # Modules that should only be used by job playbooks:
+ job_libs = ['command.py']
+
+ for fn in all_libs:
+ shutil.copy(os.path.join(library_path, fn), self.library_dir)
+ shutil.copy(os.path.join(library_path, fn),
+ self.pre_post_library_dir)
+ for fn in job_libs:
+ shutil.copy(os.path.join(library_path, fn), self.library_dir)
+
+ def get_config_default(section, option, default):
+ if config.has_option(section, option):
+ return config.get(section, option)
+ return default
+
+ for section in config.sections():
+ m = self.site_section_re.match(section)
+ if m:
+ sitename = m.group(1)
+ d = {}
+ d['host'] = get_config_default(section, 'host', None)
+ d['user'] = get_config_default(section, 'user', '')
+ d['pass'] = get_config_default(section, 'pass', '')
+ d['root'] = get_config_default(section, 'root', '/')
+ d['keytab'] = get_config_default(section, 'keytab', None)
+ self.sites[sitename] = d
+ continue
+ m = self.node_section_re.match(section)
+ if m:
+ nodename = m.group(1)
+ d = {}
+ d['name'] = nodename
+ d['host'] = config.get(section, 'host')
+ d['description'] = get_config_default(section,
+ 'description', '')
+ if config.has_option(section, 'labels'):
+ d['labels'] = config.get(section, 'labels').split(',')
+ else:
+ d['labels'] = []
+ self.static_nodes[nodename] = d
+ continue
+
+ def start(self):
+ self._gearman_running = True
+ self._zmq_running = True
+ self._reaper_running = True
+ self._command_running = True
+
+ # Setup ZMQ
+ self.zcontext = zmq.Context()
+ self.zsocket = self.zcontext.socket(zmq.PUB)
+ self.zsocket.bind("tcp://*:8888")
+
+ # Setup Gearman
+ server = self.config.get('gearman', 'server')
+ if self.config.has_option('gearman', 'port'):
+ port = self.config.get('gearman', 'port')
+ else:
+ port = 4730
+ self.worker = LaunchGearWorker('Zuul Launch Server',
+ launch_server=self)
+ self.worker.addServer(server, port)
+ self.log.debug("Waiting for server")
+ self.worker.waitForServer()
+ self.log.debug("Registering")
+ self.register()
+
+ # Start command socket
+ self.log.debug("Starting command processor")
+ self.command_socket.start()
+ self.command_thread = threading.Thread(target=self.runCommand)
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
+ # Load JJB config
+ self.loadJobs()
+
+ # Start ZMQ worker thread
+ self.log.debug("Starting ZMQ processor")
+ self.zmq_thread = threading.Thread(target=self.runZMQ)
+ self.zmq_thread.daemon = True
+ self.zmq_thread.start()
+
+ # Start node worker reaper thread
+ self.log.debug("Starting reaper")
+ self.reaper_thread = threading.Thread(target=self.runReaper)
+ self.reaper_thread.daemon = True
+ self.reaper_thread.start()
+
+ # Start Gearman worker thread
+ self.log.debug("Starting worker")
+ self.gearman_thread = threading.Thread(target=self.run)
+ self.gearman_thread.daemon = True
+ self.gearman_thread.start()
+
+ # Start static workers
+ for node in self.static_nodes.values():
+ self.log.debug("Creating static node with arguments: %s" % (node,))
+ self._launchWorker(node)
+
+ def loadJobs(self):
+ self.log.debug("Loading jobs")
+ builder = JJB()
+ path = self.config.get('launcher', 'jenkins_jobs')
+ builder.load_files([path])
+ builder.parser.expandYaml()
+ unseen = set(self.jobs.keys())
+ for job in builder.parser.jobs:
+ builder.expandMacros(job)
+ self.jobs[job['name']] = job
+ unseen.discard(job['name'])
+ for name in unseen:
+ del self.jobs[name]
+
+ def register(self):
+ new_functions = set()
+ if self.accept_nodes:
+ new_functions.add("node_assign:zuul")
+ new_functions.add("stop:%s" % self.hostname)
+ new_functions.add("set_description:%s" % self.hostname)
+ new_functions.add("node_revoke:%s" % self.hostname)
+
+ for function in new_functions - self.registered_functions:
+ self.worker.registerFunction(function)
+ for function in self.registered_functions - new_functions:
+ self.worker.unRegisterFunction(function)
+ self.registered_functions = new_functions
+
+ def reconfigure(self):
+ self.log.debug("Reconfiguring")
+ self.loadJobs()
+ for node in self.node_workers.values():
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='reconfigure'))
+ except Exception:
+ self.log.exception("Exception sending reconfigure command "
+ "to worker:")
+ self.log.debug("Reconfiguration complete")
+
+ def pause(self):
+ self.log.debug("Pausing")
+ self.accept_nodes = False
+ self.register()
+ for node in self.node_workers.values():
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='pause'))
+ except Exception:
+ self.log.exception("Exception sending pause command "
+ "to worker:")
+ self.log.debug("Paused")
+
+ def unpause(self):
+ self.log.debug("Unpausing")
+ self.accept_nodes = self.config_accept_nodes
+ self.register()
+ for node in self.node_workers.values():
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='unpause'))
+ except Exception:
+ self.log.exception("Exception sending unpause command "
+ "to worker:")
+ self.log.debug("Unpaused")
+
+ def release(self):
+ self.log.debug("Releasing idle nodes")
+ for node in self.node_workers.values():
+ if node.name in self.static_nodes:
+ continue
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='release'))
+ except Exception:
+ self.log.exception("Exception sending release command "
+ "to worker:")
+ self.log.debug("Finished releasing idle nodes")
+
+ def graceful(self):
+ # Note: this is run in the command processing thread; no more
+ # external commands will be processed after this.
+ self.log.debug("Gracefully stopping")
+ self.pause()
+ self.release()
+ self.log.debug("Waiting for all builds to finish")
+ while self.builds:
+ time.sleep(5)
+ self.log.debug("All builds are finished")
+ self.stop()
+
+ def stop(self):
+ self.log.debug("Stopping")
+ # First, stop accepting new jobs
+ self._gearman_running = False
+ self._reaper_running = False
+ self.worker.shutdown()
+ # Then stop all of the workers
+ for node in self.node_workers.values():
+ try:
+ if node.isAlive():
+ node.stop()
+ except Exception:
+ self.log.exception("Exception sending stop command to worker:")
+ # Stop ZMQ afterwards so that the send queue is flushed
+ self._zmq_running = False
+ self.zmq_send_queue.put(None)
+ self.zmq_send_queue.join()
+ # Stop command processing
+ self._command_running = False
+ self.command_socket.stop()
+ # Join the gearman thread which was stopped earlier.
+ self.gearman_thread.join()
+ # The command thread is joined in the join() method of this
+ # class, which is called by the command shell.
+ self.log.debug("Stopped")
+
+ def verboseOn(self):
+ self.log.debug("Enabling verbose mode")
+ self.options['verbose'] = True
+
+ def verboseOff(self):
+ self.log.debug("Disabling verbose mode")
+ self.options['verbose'] = False
+
+ def join(self):
+ self.command_thread.join()
+
+ def runCommand(self):
+ while self._command_running:
+ try:
+ command = self.command_socket.get()
+ self.command_map[command]()
+ except Exception:
+ self.log.exception("Exception while processing command")
+
+ def runZMQ(self):
+ while self._zmq_running or not self.zmq_send_queue.empty():
+ try:
+ item = self.zmq_send_queue.get()
+ self.log.debug("Got ZMQ event %s" % (item,))
+ if item is None:
+ continue
+ self.zsocket.send(item)
+ except Exception:
+ self.log.exception("Exception while processing ZMQ events")
+ finally:
+ self.zmq_send_queue.task_done()
+
+ def run(self):
+ while self._gearman_running:
+ try:
+ job = self.worker.getJob()
+ try:
+ if job.name.startswith('node_assign:'):
+ self.log.debug("Got node_assign job: %s" % job.unique)
+ self.assignNode(job)
+ elif job.name.startswith('stop:'):
+ self.log.debug("Got stop job: %s" % job.unique)
+ self.stopJob(job)
+ elif job.name.startswith('set_description:'):
+ self.log.debug("Got set_description job: %s" %
+ job.unique)
+ job.sendWorkComplete()
+ elif job.name.startswith('node_revoke:'):
+ self.log.debug("Got node_revoke job: %s" % job.unique)
+ self.revokeNode(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(traceback.format_exc())
+ except gear.InterruptedError:
+ return
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def assignNode(self, job):
+ args = json.loads(job.arguments)
+ self.log.debug("Assigned node with arguments: %s" % (args,))
+ self._launchWorker(args)
+ data = dict(manager=self.hostname)
+ job.sendWorkData(json.dumps(data))
+ job.sendWorkComplete()
+
+ def _launchWorker(self, args):
+ worker = NodeWorker(self.config, self.jobs, self.builds,
+ self.sites, args['name'], args['host'],
+ args['description'], args['labels'],
+ self.hostname, self.zmq_send_queue,
+ self.termination_queue, self.keep_jobdir,
+ self.library_dir, self.pre_post_library_dir,
+ self.options)
+ self.node_workers[worker.name] = worker
+
+ worker.thread = threading.Thread(target=worker.run)
+ worker.thread.start()
+
+    def revokeNode(self, job):
+        """Handle a node_revoke job: ask the named worker to stop.
+
+        Always completes the gearman job (best-effort semantics) even if
+        the worker is missing or already dead.
+        """
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Revoke job with arguments: %s" % (args,))
+            name = args['name']
+            node = self.node_workers.get(name)
+            if not node:
+                self.log.debug("Unable to find worker %s" % (name,))
+                return
+            try:
+                if node.isAlive():
+                    # Deliver the stop request via the worker's own queue
+                    # so it is processed on the worker's thread.
+                    node.queue.put(dict(action='stop'))
+                else:
+                    self.log.debug("Node %s is not alive while revoking node" %
+                                   (node.name,))
+            except Exception:
+                self.log.exception("Exception sending stop command "
+                                   "to worker:")
+        finally:
+            job.sendWorkComplete()
+
+    def stopJob(self, job):
+        """Handle a stop job: abort the build identified by its number.
+
+        Looks up which worker owns the build via self.builds, then asks
+        that worker to abort.  Always completes the gearman job.
+        """
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Stop job with arguments: %s" % (args,))
+            unique = args['number']
+            build_worker_name = self.builds.get(unique)
+            if not build_worker_name:
+                self.log.debug("Unable to find build for job %s" % (unique,))
+                return
+            node = self.node_workers.get(build_worker_name)
+            if not node:
+                self.log.debug("Unable to find worker for job %s" % (unique,))
+                return
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='abort'))
+                else:
+                    self.log.debug("Node %s is not alive while aborting job" %
+                                   (node.name,))
+            except Exception:
+                self.log.exception("Exception sending abort command "
+                                   "to worker:")
+        finally:
+            job.sendWorkComplete()
+
+    def runReaper(self):
+        """Reap finished NodeWorker threads.
+
+        Consumes worker names from the termination queue, joins each
+        worker's thread and drops it from the registry.  A None item is a
+        wake-up sentinel and is skipped.
+        """
+        # We don't actually care if all the events are processed
+        while self._reaper_running:
+            try:
+                item = self.termination_queue.get()
+                self.log.debug("Got termination event %s" % (item,))
+                if item is None:
+                    continue
+                worker = self.node_workers[item]
+                self.log.debug("Joining %s" % (item,))
+                worker.thread.join()
+                self.log.debug("Joined %s" % (item,))
+                del self.node_workers[item]
+            except Exception:
+                self.log.exception("Exception while processing "
+                                   "termination events:")
+            finally:
+                self.termination_queue.task_done()
+
+
+class NodeWorker(object):
+    """Manages a single assigned node: registers job functions with
+    gearman, runs builds on the node via generated Ansible playbooks and
+    reports results back over ZMQ.
+    """
+    # Standard Ansible retry stanza merged into upload tasks: retry up
+    # to 3 times, 30s apart, until the task's return code is 0.
+    retry_args = dict(register='task_result',
+                      until='task_result.rc == 0',
+                      retries=3,
+                      delay=30)
+
+    def __init__(self, config, jobs, builds, sites, name, host,
+                 description, labels, manager_name, zmq_send_queue,
+                 termination_queue, keep_jobdir, library_dir,
+                 pre_post_library_dir, options):
+        """Initialize worker state; no I/O besides reading config.
+
+        jobs/builds/sites and the two queues are shared with the launch
+        server; this worker only reads jobs/sites and updates builds.
+        """
+        self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
+        self.log.debug("Creating node worker %s" % (name,))
+        self.config = config
+        self.jobs = jobs
+        self.builds = builds
+        self.sites = sites
+        self.name = name
+        self.host = host
+        self.description = description
+        # Normalize a single label string into a one-element list.
+        if not isinstance(labels, list):
+            labels = [labels]
+        self.labels = labels
+        self.thread = None
+        self.registered_functions = set()
+        # If the unpaused Event is set, that means we should run jobs.
+        # If it is clear, then we are paused and should not run jobs.
+        self.unpaused = threading.Event()
+        self.unpaused.set()
+        self._running = True
+        self.queue = Queue.Queue()
+        self.manager_name = manager_name
+        self.zmq_send_queue = zmq_send_queue
+        self.termination_queue = termination_queue
+        self.keep_jobdir = keep_jobdir
+        # Serializes job execution against abort requests.
+        self.running_job_lock = threading.Lock()
+        self.pending_registration = False
+        self.registration_lock = threading.Lock()
+        self._get_job_lock = threading.Lock()
+        self._got_job = False
+        self._job_complete_event = threading.Event()
+        self._running_job = False
+        self._aborted_job = False
+        self._watchdog_timeout = False
+        self._sent_complete_event = False
+        # Popen handles for the three ansible-playbook invocations; kept
+        # so the watchdog/abort path can kill the process group.
+        self.ansible_pre_proc = None
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
+        self.workspace_root = config.get('launcher', 'workspace_root')
+        if self.config.has_option('launcher', 'private_key_file'):
+            self.private_key_file = config.get('launcher', 'private_key_file')
+        else:
+            self.private_key_file = '~/.ssh/id_rsa'
+        if self.config.has_option('launcher', 'username'):
+            self.username = config.get('launcher', 'username')
+        else:
+            self.username = 'zuul'
+        self.library_dir = library_dir
+        self.pre_post_library_dir = pre_post_library_dir
+        self.options = options
+
+ def isAlive(self):
+ # Meant to be called from the manager
+ if self.thread and self.thread.is_alive():
+ return True
+ return False
+
+    def run(self):
+        """Worker main loop: connect to gearman, register, process queue.
+
+        Gearman job consumption runs on a separate daemon thread; this
+        thread then services the local command queue until stopped and
+        the queue is drained.
+        """
+        self.log.debug("Node worker %s starting" % (self.name,))
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.worker = NodeGearWorker(self.name)
+        self.worker.addServer(server, port)
+        self.log.debug("Waiting for server")
+        self.worker.waitForServer()
+        self.log.debug("Registering")
+        self.register()
+
+        self.gearman_thread = threading.Thread(target=self.runGearman)
+        self.gearman_thread.daemon = True
+        self.gearman_thread.start()
+
+        self.log.debug("Started")
+
+        # Keep draining the queue even after _running flips, so queued
+        # commands (including the stop itself) are fully processed.
+        while self._running or not self.queue.empty():
+            try:
+                self._runQueue()
+            except Exception:
+                self.log.exception("Exception in queue manager:")
+
+    def stop(self):
+        """Request the worker stop and block until the queue is drained."""
+        # If this is called locally, setting _running will be
+        # effictive, if it's called remotely, it will not be, but it
+        # will be set by the queue thread.
+        self.log.debug("Submitting stop request")
+        self._running = False
+        # Wake a paused worker so the stop can actually be processed.
+        self.unpaused.set()
+        self.queue.put(dict(action='stop'))
+        self.queue.join()
+
+    def pause(self):
+        """Stop accepting new jobs; interrupts a pending getJob() wait."""
+        self.unpaused.clear()
+        self.worker.stopWaitingForJobs()
+
+    def unpause(self):
+        """Resume accepting jobs (wakes the gearman loop)."""
+        self.unpaused.set()
+
+    def release(self):
+        """Stop this worker if (and only if) it is idle.
+
+        Pauses first so no new job can be grabbed while checking, then
+        restores the previous pause state if a job turns out to be held.
+        """
+        # If this node is idle, stop it.
+        old_unpaused = self.unpaused.is_set()
+        if old_unpaused:
+            self.pause()
+        with self._get_job_lock:
+            if self._got_job:
+                self.log.debug("This worker is not idle")
+                if old_unpaused:
+                    self.unpause()
+                return
+        self.log.debug("Stopping due to release command")
+        self.queue.put(dict(action='stop'))
+
+ def _runQueue(self):
+ item = self.queue.get()
+ try:
+ if item['action'] == 'stop':
+ self.log.debug("Received stop request")
+ self._running = False
+ self.termination_queue.put(self.name)
+ if not self.abortRunningJob():
+ self.sendFakeCompleteEvent()
+ else:
+ self._job_complete_event.wait()
+ self.worker.shutdown()
+ if item['action'] == 'pause':
+ self.log.debug("Received pause request")
+ self.pause()
+ if item['action'] == 'unpause':
+ self.log.debug("Received unpause request")
+ self.unpause()
+ if item['action'] == 'release':
+ self.log.debug("Received release request")
+ self.release()
+ elif item['action'] == 'reconfigure':
+ self.log.debug("Received reconfigure request")
+ self.register()
+ elif item['action'] == 'abort':
+ self.log.debug("Received abort request")
+ self.abortRunningJob()
+ finally:
+ self.queue.task_done()
+
+    def runGearman(self):
+        """Gearman consumption loop (runs on its own daemon thread).
+
+        Blocks while paused; after each job, clears the _got_job flag so
+        release() sees an accurate idle state.
+        """
+        while self._running:
+            try:
+                self.unpaused.wait()
+                if self._running:
+                    self._runGearman()
+            except Exception:
+                self.log.exception("Exception in gearman manager:")
+            with self._get_job_lock:
+                self._got_job = False
+
+    def _runGearman(self):
+        """Fetch and run a single gearman job.
+
+        Retries any registration deferred while a job was running, then
+        grabs a job under _get_job_lock so release() can't race it.
+        """
+        if self.pending_registration:
+            self.register()
+        with self._get_job_lock:
+            try:
+                job = self.worker.getJob()
+                self._got_job = True
+            except gear.InterruptedError:
+                # Paused/stopped while waiting for a job.
+                return
+        self.log.debug("Node worker %s got job %s" % (self.name, job.name))
+        try:
+            if job.name not in self.registered_functions:
+                # Server handed us a function we no longer advertise.
+                self.log.error("Unable to handle job %s" % job.name)
+                job.sendWorkFail()
+                return
+            self.launch(job)
+        except Exception:
+            self.log.exception("Exception while running job")
+            job.sendWorkException(traceback.format_exc())
+
+ def generateFunctionNames(self, job):
+ # This only supports "node: foo" and "node: foo || bar"
+ ret = set()
+ job_labels = job.get('node')
+ matching_labels = set()
+ if job_labels:
+ job_labels = [x.strip() for x in job_labels.split('||')]
+ matching_labels = set(self.labels) & set(job_labels)
+ if not matching_labels:
+ return ret
+ ret.add('build:%s' % (job['name'],))
+ for label in matching_labels:
+ ret.add('build:%s:%s' % (job['name'], label))
+ return ret
+
+    def register(self):
+        """(Re)compute and send this worker's gearman function set.
+
+        Non-blocking with respect to concurrent callers: if another
+        registration is in flight, or a job is running, this records a
+        pending registration and returns.
+        """
+        if not self.registration_lock.acquire(False):
+            self.log.debug("Registration already in progress")
+            return
+        try:
+            if self._running_job:
+                self.pending_registration = True
+                self.log.debug("Ignoring registration due to running job")
+                return
+            self.log.debug("Updating registration")
+            self.pending_registration = False
+            new_functions = set()
+            for job in self.jobs.values():
+                new_functions |= self.generateFunctionNames(job)
+            self.worker.sendMassDo(new_functions)
+            self.registered_functions = new_functions
+        finally:
+            self.registration_lock.release()
+
+    def abortRunningJob(self):
+        """Abort the main ansible job process; returns True if a kill
+        signal was actually delivered (i.e. a job was running)."""
+        self._aborted_job = True
+        return self.abortRunningProc(self.ansible_job_proc)
+
+    def abortRunningProc(self, proc):
+        """Kill the given ansible subprocess's whole process group.
+
+        Returns True only if a job was running and the kill was sent.
+        The process group (created via preexec_fn=os.setsid at launch)
+        is targeted so ssh children die too.
+        """
+        aborted = False
+        self.log.debug("Abort: acquiring job lock")
+        with self.running_job_lock:
+            if self._running_job:
+                self.log.debug("Abort: a job is running")
+                if proc:
+                    self.log.debug("Abort: sending kill signal to job "
+                                   "process group")
+                    try:
+                        pgid = os.getpgid(proc.pid)
+                        os.killpg(pgid, signal.SIGKILL)
+                        aborted = True
+                    except Exception:
+                        self.log.exception("Exception while killing "
+                                           "ansible process:")
+            else:
+                self.log.debug("Abort: no job is running")
+
+        return aborted
+
+    def launch(self, job):
+        """Run one build end to end and report its result.
+
+        Each phase (start event, job run, completion packet, completion
+        event, build-record cleanup) is individually guarded so a
+        failure in one still lets the later reporting steps run.
+        """
+        self.log.info("Node worker %s launching job %s" %
+                      (self.name, job.name))
+
+        # Make sure we can parse what we need from the job first
+        args = json.loads(job.arguments)
+        offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
+        job_name = job.name.split(':')[1]
+
+        # Initialize the result so we have something regardless of
+        # whether the job actually runs
+        result = None
+        self._sent_complete_event = False
+        self._aborted_job = False
+        self._watchdog_timeout = False
+
+        try:
+            self.sendStartEvent(job_name, args)
+        except Exception:
+            self.log.exception("Exception while sending job start event")
+
+        try:
+            result = self.runJob(job, args)
+        except Exception:
+            self.log.exception("Exception while launching job thread")
+
+        self._running_job = False
+
+        try:
+            data = json.dumps(dict(result=result))
+            job.sendWorkComplete(data)
+        except Exception:
+            self.log.exception("Exception while sending job completion packet")
+
+        try:
+            self.sendCompleteEvent(job_name, result, args)
+        except Exception:
+            self.log.exception("Exception while sending job completion event")
+
+        try:
+            del self.builds[job.unique]
+        except Exception:
+            self.log.exception("Exception while clearing build record")
+
+        self._job_complete_event.set()
+        # Single-use nodes are stopped once their job completes.
+        if offline and self._running:
+            self.stop()
+
+ def sendStartEvent(self, name, parameters):
+ build = dict(node_name=self.name,
+ host_name=self.manager_name,
+ parameters=parameters)
+
+ event = dict(name=name,
+ build=build)
+
+ item = "onStarted %s" % json.dumps(event)
+ self.log.debug("Sending over ZMQ: %s" % (item,))
+ self.zmq_send_queue.put(item)
+
+    def sendCompleteEvent(self, name, status, parameters):
+        """Queue an 'onFinalized' event with the build's final status.
+
+        Also records that a completion was sent, so the shutdown path
+        knows it does not need to fake one.
+        """
+        build = dict(status=status,
+                     node_name=self.name,
+                     host_name=self.manager_name,
+                     parameters=parameters)
+
+        event = dict(name=name,
+                     build=build)
+
+        item = "onFinalized %s" % json.dumps(event)
+        self.log.debug("Sending over ZMQ: %s" % (item,))
+        self.zmq_send_queue.put(item)
+        self._sent_complete_event = True
+
+    def sendFakeCompleteEvent(self):
+        """Emit a synthetic completion on shutdown if none was sent,
+        so listeners are not left waiting for this worker."""
+        if self._sent_complete_event:
+            return
+        self.sendCompleteEvent('zuul:launcher-shutdown',
+                               'SUCCESS', {})
+
+    def runJob(self, job, args):
+        """Execute the pre/job/post ansible playbooks for a build.
+
+        Returns 'SUCCESS', 'FAILURE', 'POST_FAILURE', or None.  None
+        means the result is indeterminate (pre-playbook failure, node
+        unreachable, or a non-timeout abort) and zuul may relaunch.
+        """
+        self.ansible_pre_proc = None
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
+        result = None
+        with self.running_job_lock:
+            if not self._running:
+                return result
+            self._running_job = True
+            self._job_complete_event.clear()
+
+        self.log.debug("Job %s: beginning" % (job.unique,))
+        # Record which worker owns this build so stopJob() can find us.
+        self.builds[job.unique] = self.name
+        with JobDir(self.keep_jobdir) as jobdir:
+            self.log.debug("Job %s: job root at %s" %
+                           (job.unique, jobdir.root))
+            timeout = self.prepareAnsibleFiles(jobdir, job, args)
+
+            data = {
+                'manager': self.manager_name,
+                'number': job.unique,
+            }
+            # Bracket IPv6 literals in the console-stream URL.
+            if ':' in self.host:
+                data['url'] = 'telnet://[%s]:19885' % self.host
+            else:
+                data['url'] = 'telnet://%s:19885' % self.host
+
+            job.sendWorkData(json.dumps(data))
+            job.sendWorkStatus(0, 100)
+
+            pre_status = self.runAnsiblePrePlaybook(jobdir)
+            if pre_status is None:
+                # These should really never fail, so return None and have
+                # zuul try again
+                return result
+
+            job_status = self.runAnsiblePlaybook(jobdir, timeout)
+            if job_status is None:
+                # The result of the job is indeterminate.  Zuul will
+                # run it again.
+                return result
+
+            # Post playbook (publishers) runs regardless of job status.
+            post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
+            if not post_status:
+                result = 'POST_FAILURE'
+            elif job_status:
+                result = 'SUCCESS'
+            else:
+                result = 'FAILURE'
+
+            if self._aborted_job and not self._watchdog_timeout:
+                # A Null result will cause zuul to relaunch the job if
+                # it needs to.
+                result = None
+
+        return result
+
+ def getHostList(self):
+ return [('node', dict(
+ ansible_host=self.host, ansible_user=self.username))]
+
+ def _substituteVariables(self, text, variables):
+ def lookup(match):
+ return variables.get(match.group(1), '')
+ return re.sub('\$([A-Za-z0-9_]+)', lookup, text)
+
+    def _getRsyncOptions(self, source, parameters):
+        """Translate a JJB publisher 'source' glob into rsync filters.
+
+        Returns a list of rsync option strings implementing ant-like
+        matching for the (parameter-expanded) source pattern.
+        """
+        # Treat the publisher source as a filter; ant and rsync behave
+        # fairly close in this manner, except for leading directories.
+        source = self._substituteVariables(source, parameters)
+        # If the source starts with ** then we want to match any
+        # number of directories, so don't anchor the include filter.
+        # If it does not start with **, then the intent is likely to
+        # at least start by matching an immediate file or subdirectory
+        # (even if later we have a ** in the middle), so in this case,
+        # anchor it to the root of the transfer (the workspace).
+        if not source.startswith('**'):
+            source = os.path.join('/', source)
+        # These options mean: include the thing we want, include any
+        # directories (so that we continue to search for the thing we
+        # want no matter how deep it is), exclude anything that
+        # doesn't match the thing we want or is a directory, then get
+        # rid of empty directories left over at the end.
+        rsync_opts = ['--include="%s"' % source,
+                      '--include="*/"',
+                      '--exclude="*"',
+                      '--prune-empty-dirs']
+        return rsync_opts
+
+    def _makeSCPTask(self, jobdir, publisher, parameters):
+        """Build ansible tasks for a JJB 'scp' publisher.
+
+        For each scpfile: stage files into a local temp dir (pulling
+        from the node, or copying console/ansible logs locally), then
+        rsync the staged tree to the configured site.
+        """
+        tasks = []
+        for scpfile in publisher['scp']['files']:
+            scproot = tempfile.mkdtemp(dir=jobdir.staging_root)
+            os.chmod(scproot, 0o755)
+
+            site = publisher['scp']['site']
+            if scpfile.get('copy-console'):
+                # Include the local ansible directory in the console
+                # upload.  This uploads the playbook and ansible logs.
+                copyargs = dict(src=jobdir.ansible_root + '/',
+                                dest=os.path.join(scproot, '_zuul_ansible'))
+                task = dict(name='copy console log',
+                            copy=copyargs,
+                            delegate_to='127.0.0.1')
+                # This is a local copy and should not fail, so does
+                # not need a retry stanza.
+                tasks.append(task)
+
+                # Fetch the console log from the remote host.
+                src = '/tmp/console.html'
+                rsync_opts = []
+            else:
+                src = parameters['WORKSPACE']
+                if not src.endswith('/'):
+                    src = src + '/'
+                rsync_opts = self._getRsyncOptions(scpfile['source'],
+                                                   parameters)
+
+            syncargs = dict(src=src,
+                            dest=scproot,
+                            copy_links='yes',
+                            mode='pull')
+            if rsync_opts:
+                syncargs['rsync_opts'] = rsync_opts
+            task = dict(name='copy files from node',
+                        synchronize=syncargs)
+            if not scpfile.get('copy-after-failure'):
+                task['when'] = 'success|bool'
+            # We don't use retry_args here because there is a bug in
+            # the synchronize module that breaks subsequent attempts at
+            # retrying. Better to try once and get an accurate error
+            # message if it fails.
+            # https://github.com/ansible/ansible/issues/18281
+            tasks.append(task)
+
+            task = self._makeSCPTaskLocalAction(
+                site, scpfile, scproot, parameters)
+            task.update(self.retry_args)
+            tasks.append(task)
+        return tasks
+
+    def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
+        """Build the local rsync-upload task for one scpfile.
+
+        Raises if the site is unknown or the expanded target escapes
+        the site root (path-traversal guard via normpath+prefix check).
+        """
+        if site not in self.sites:
+            raise Exception("Undefined SCP site: %s" % (site,))
+        site = self.sites[site]
+        dest = scpfile['target'].lstrip('/')
+        dest = self._substituteVariables(dest, parameters)
+        dest = os.path.join(site['root'], dest)
+        dest = os.path.normpath(dest)
+        if not dest.startswith(site['root']):
+            raise Exception("Target path %s is not below site root" %
+                            (dest,))
+
+        rsync_cmd = [
+            '/usr/bin/rsync', '--delay-updates', '-F',
+            '--compress', '-rt', '--safe-links',
+            '--rsync-path="mkdir -p {dest} && rsync"',
+            '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
+            '-o StrictHostKeyChecking=no -q"',
+            '--out-format="<<CHANGED>>%i %n%L"',
+            '{source}', '"{user}@{host}:{dest}"'
+        ]
+        if scpfile.get('keep-hierarchy'):
+            source = '"%s/"' % scproot
+        else:
+            # Flatten: upload only regular files, discarding directories.
+            source = '`/usr/bin/find "%s" -type f`' % scproot
+        shellargs = ' '.join(rsync_cmd).format(
+            source=source,
+            dest=dest,
+            private_key_file=self.private_key_file,
+            host=site['host'],
+            user=site['user'])
+        task = dict(name='rsync logs to server',
+                    shell=shellargs,
+                    delegate_to='127.0.0.1')
+        if not scpfile.get('copy-after-failure'):
+            task['when'] = 'success|bool'
+
+        return task
+
+    def _makeFTPTask(self, jobdir, publisher, parameters):
+        """Build ansible tasks for a JJB 'ftp' publisher.
+
+        Stages files from the node into a local temp dir, writes an
+        lftp mirror script, and adds a retried task to run it.
+        """
+        tasks = []
+        ftp = publisher['ftp']
+        site = ftp['site']
+        if site not in self.sites:
+            raise Exception("Undefined FTP site: %s" % site)
+        site = self.sites[site]
+
+        ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
+        ftpcontent = os.path.join(ftproot, 'content')
+        os.makedirs(ftpcontent)
+        ftpscript = os.path.join(ftproot, 'script')
+
+        src = parameters['WORKSPACE']
+        if not src.endswith('/'):
+            src = src + '/'
+        rsync_opts = self._getRsyncOptions(ftp['source'],
+                                           parameters)
+        syncargs = dict(src=src,
+                        dest=ftpcontent,
+                        copy_links='yes',
+                        mode='pull')
+        if rsync_opts:
+            syncargs['rsync_opts'] = rsync_opts
+        task = dict(name='copy files from node',
+                    synchronize=syncargs,
+                    when='success|bool')
+        # We don't use retry_args here because there is a bug in the
+        # synchronize module that breaks subsequent attempts at retrying.
+        # Better to try once and get an accurate error message if it fails.
+        # https://github.com/ansible/ansible/issues/18281
+        tasks.append(task)
+        task = dict(name='FTP files to server',
+                    shell='lftp -f %s' % ftpscript,
+                    when='success|bool',
+                    delegate_to='127.0.0.1')
+        ftpsource = ftpcontent
+        if ftp.get('remove-prefix'):
+            ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
+        # lftp mirror requires paths without trailing slashes.
+        while ftpsource[-1] == '/':
+            ftpsource = ftpsource[:-1]
+        ftptarget = ftp['target'].lstrip('/')
+        ftptarget = self._substituteVariables(ftptarget, parameters)
+        ftptarget = os.path.join(site['root'], ftptarget)
+        ftptarget = os.path.normpath(ftptarget)
+        # Path-traversal guard: expanded target must stay under the root.
+        if not ftptarget.startswith(site['root']):
+            raise Exception("Target path %s is not below site root" %
+                            (ftptarget,))
+        while ftptarget[-1] == '/':
+            ftptarget = ftptarget[:-1]
+        with open(ftpscript, 'w') as script:
+            script.write('open %s\n' % site['host'])
+            script.write('user %s %s\n' % (site['user'], site['pass']))
+            script.write('mirror -R %s %s\n' % (ftpsource, ftptarget))
+        task.update(self.retry_args)
+        tasks.append(task)
+        return tasks
+
+    def _makeAFSTask(self, jobdir, publisher, parameters):
+        """Build ansible tasks for a JJB 'afs' publisher.
+
+        Stages files locally, then hands off to the custom zuul_afs
+        module to synchronize them into the AFS target path.
+        """
+        tasks = []
+        afs = publisher['afs']
+        site = afs['site']
+        if site not in self.sites:
+            raise Exception("Undefined AFS site: %s" % site)
+        site = self.sites[site]
+
+        afsroot = tempfile.mkdtemp(dir=jobdir.staging_root)
+        afscontent = os.path.join(afsroot, 'content')
+        afssource = afscontent
+        if afs.get('remove-prefix'):
+            afssource = os.path.join(afscontent, afs['remove-prefix'])
+        while afssource[-1] == '/':
+            afssource = afssource[:-1]
+
+        src = parameters['WORKSPACE']
+        if not src.endswith('/'):
+            src = src + '/'
+        rsync_opts = self._getRsyncOptions(afs['source'],
+                                           parameters)
+        syncargs = dict(src=src,
+                        dest=afscontent,
+                        copy_links='yes',
+                        mode='pull')
+        if rsync_opts:
+            syncargs['rsync_opts'] = rsync_opts
+        task = dict(name='copy files from node',
+                    synchronize=syncargs,
+                    when='success|bool')
+        # We don't use retry_args here because there is a bug in the
+        # synchronize module that breaks subsequent attempts at retrying.
+        # Better to try once and get an accurate error message if it fails.
+        # https://github.com/ansible/ansible/issues/18281
+        tasks.append(task)
+
+        afstarget = afs['target'].lstrip('/')
+        afstarget = self._substituteVariables(afstarget, parameters)
+        afstarget = os.path.join(site['root'], afstarget)
+        afstarget = os.path.normpath(afstarget)
+        # Path-traversal guard: expanded target must stay under the root.
+        if not afstarget.startswith(site['root']):
+            raise Exception("Target path %s is not below site root" %
+                            (afstarget,))
+
+        afsargs = dict(user=site['user'],
+                       keytab=site['keytab'],
+                       root=afsroot,
+                       source=afssource,
+                       target=afstarget)
+
+        task = dict(name='Synchronize files to AFS',
+                    zuul_afs=afsargs,
+                    when='success|bool',
+                    delegate_to='127.0.0.1')
+        tasks.append(task)
+
+        return tasks
+
+    def _makeBuilderTask(self, jobdir, builder, parameters, sequence):
+        """Build ansible tasks for one JJB shell builder.
+
+        Writes the builder's shell snippet to a local script, then emits
+        tasks to copy it to the node, run it in the workspace, and
+        remove it afterwards.
+        """
+        tasks = []
+        script_fn = '%02d-%s.sh' % (sequence, str(uuid.uuid4().hex))
+        script_path = os.path.join(jobdir.script_root, script_fn)
+        with open(script_path, 'w') as script:
+            data = builder['shell']
+            # Snippets without a shebang get the JJB-conventional one.
+            if not data.startswith('#!'):
+                data = '#!/bin/bash -x\n %s' % (data,)
+            script.write(data)
+
+        remote_path = os.path.join('/tmp', script_fn)
+        copy = dict(src=script_path,
+                    dest=remote_path,
+                    mode=0o555)
+        task = dict(copy=copy)
+        tasks.append(task)
+
+        task = dict(command=remote_path)
+        task['name'] = 'command generated from JJB'
+        task['environment'] = "{{ zuul.environment }}"
+        task['args'] = dict(chdir=parameters['WORKSPACE'])
+        tasks.append(task)
+
+        # Clean the script off the node once it has run.
+        filetask = dict(path=remote_path,
+                        state='absent')
+        task = dict(file=filetask)
+        tasks.append(task)
+
+        return tasks
+
+    def _transformPublishers(self, jjb_job):
+        """Split a job's publishers into (early, late) groups.
+
+        SCP publishers that copy the console log are moved to the late
+        group (and within a publisher, console scpfiles are ordered
+        last) so the console capture includes the other publishers'
+        output.
+        """
+        early_publishers = []
+        late_publishers = []
+        old_publishers = jjb_job.get('publishers', [])
+        for publisher in old_publishers:
+            early_scpfiles = []
+            late_scpfiles = []
+            if 'scp' not in publisher:
+                early_publishers.append(publisher)
+                continue
+            copy_console = False
+            for scpfile in publisher['scp']['files']:
+                if scpfile.get('copy-console'):
+                    scpfile['keep-hierarchy'] = True
+                    late_scpfiles.append(scpfile)
+                    copy_console = True
+                else:
+                    early_scpfiles.append(scpfile)
+            publisher['scp']['files'] = early_scpfiles + late_scpfiles
+            if copy_console:
+                late_publishers.append(publisher)
+            else:
+                early_publishers.append(publisher)
+        publishers = early_publishers + late_publishers
+        if old_publishers != publishers:
+            self.log.debug("Transformed job publishers")
+        return early_publishers, late_publishers
+
+    def prepareAnsibleFiles(self, jobdir, gearman_job, args):
+        """Generate inventory, vars, playbooks and ansible.cfg files.
+
+        Translates the JJB job definition into pre/main/post playbooks
+        under jobdir and returns the job timeout in seconds.
+        """
+        job_name = gearman_job.name.split(':')[1]
+        jjb_job = self.jobs[job_name]
+
+        parameters = args.copy()
+        parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
+
+        # One-line-per-host inventory with inline host vars.
+        with open(jobdir.inventory, 'w') as inventory:
+            for host_name, host_vars in self.getHostList():
+                inventory.write(host_name)
+                for k, v in host_vars.items():
+                    inventory.write(' %s=%s' % (k, v))
+                inventory.write('\n')
+
+        # Derive the timeout from the JJB 'timeout' wrapper, if present
+        # (minutes -> seconds); optionally expose it (in ms) to the job.
+        timeout = None
+        timeout_var = None
+        for wrapper in jjb_job.get('wrappers', []):
+            if isinstance(wrapper, dict):
+                build_timeout = wrapper.get('timeout')
+                if isinstance(build_timeout, dict):
+                    timeout_var = build_timeout.get('timeout-var')
+                    timeout = build_timeout.get('timeout')
+                    if timeout is not None:
+                        timeout = int(timeout) * 60
+        if not timeout:
+            timeout = ANSIBLE_DEFAULT_TIMEOUT
+        if timeout_var:
+            parameters[timeout_var] = str(timeout * 1000)
+
+        with open(jobdir.vars, 'w') as vars_yaml:
+            variables = dict(
+                timeout=timeout,
+                environment=parameters,
+            )
+            zuul_vars = dict(zuul=variables)
+            vars_yaml.write(
+                yaml.safe_dump(zuul_vars, default_flow_style=False))
+
+        # Pre playbook: record the node's host key, start the console
+        # streamer, create the workspace and log a launch banner.
+        with open(jobdir.pre_playbook, 'w') as pre_playbook:
+
+            shellargs = "ssh-keyscan {{ ansible_host }} > %s" % (
+                jobdir.known_hosts)
+            tasks = []
+            tasks.append(dict(shell=shellargs, delegate_to='127.0.0.1'))
+
+            task = dict(file=dict(path='/tmp/console.html', state='absent'))
+            tasks.append(task)
+
+            task = dict(zuul_console=dict(path='/tmp/console.html',
+                                          port=19885))
+            tasks.append(task)
+
+            task = dict(file=dict(path=parameters['WORKSPACE'],
+                                  state='directory'))
+            tasks.append(task)
+
+            msg = [
+                "Launched by %s" % self.manager_name,
+                "Building remotely on %s in workspace %s" % (
+                    self.name, parameters['WORKSPACE'])]
+            task = dict(zuul_log=dict(msg=msg))
+            tasks.append(task)
+
+            play = dict(hosts='node', name='Job setup', tasks=tasks)
+            pre_playbook.write(
+                yaml.safe_dump([play], default_flow_style=False))
+
+        # Main playbook: one task group per JJB shell builder.
+        with open(jobdir.playbook, 'w') as playbook:
+            tasks = []
+
+            sequence = 0
+            for builder in jjb_job.get('builders', []):
+                if 'shell' in builder:
+                    sequence += 1
+                    tasks.extend(
+                        self._makeBuilderTask(jobdir, builder, parameters,
+                                              sequence))
+
+            play = dict(hosts='node', name='Job body', tasks=tasks)
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
+
+        early_publishers, late_publishers = self._transformPublishers(jjb_job)
+
+        with open(jobdir.post_playbook, 'w') as playbook:
+            blocks = []
+            for publishers in [early_publishers, late_publishers]:
+                block = []
+                for publisher in publishers:
+                    if 'scp' in publisher:
+                        block.extend(self._makeSCPTask(jobdir, publisher,
+                                                       parameters))
+                    if 'ftp' in publisher:
+                        block.extend(self._makeFTPTask(jobdir, publisher,
+                                                       parameters))
+                    if 'afs' in publisher:
+                        block.extend(self._makeAFSTask(jobdir, publisher,
+                                                       parameters))
+                blocks.append(block)
+
+            # The 'always' section contains the log publishing tasks,
+            # the 'block' contains all the other publishers.  This way
+            # we run the log publisher regardless of whether the rest
+            # of the publishers succeed.
+            tasks = []
+
+            task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"),
+                        when='success|bool')
+            blocks[0].insert(0, task)
+            task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"),
+                        when='not success|bool and not timedout|bool')
+            blocks[0].insert(0, task)
+            task = dict(zuul_log=dict(msg="Job timed out, result: FAILURE"),
+                        when='not success|bool and timedout|bool')
+            blocks[0].insert(0, task)
+
+            tasks.append(dict(block=blocks[0],
+                              always=blocks[1]))
+
+            play = dict(hosts='node', name='Publishers',
+                        tasks=tasks)
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
+
+        # Two configs: the job body uses the restricted library, the
+        # pre/post playbooks get the trusted pre/post library.
+        self._writeAnsibleConfig(jobdir, jobdir.config,
+                                 library=self.library_dir)
+        self._writeAnsibleConfig(jobdir, jobdir.pre_post_config,
+                                 library=self.pre_post_library_dir)
+
+        return timeout
+
+    def _writeAnsibleConfig(self, jobdir, fn, library):
+        """Write an ansible.cfg at fn pointing at the given module library.
+
+        All paths (inventory, tmp dirs, log, known_hosts) live inside
+        the per-job directory so runs are isolated.
+        """
+        with open(fn, 'w') as config:
+            config.write('[defaults]\n')
+            config.write('hostfile = %s\n' % jobdir.inventory)
+            config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
+            config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
+            config.write('private_key_file = %s\n' % self.private_key_file)
+            config.write('retry_files_enabled = False\n')
+            config.write('log_path = %s\n' % jobdir.ansible_log)
+            config.write('gathering = explicit\n')
+            config.write('library = %s\n' % library)
+            # TODO(mordred) This can be removed once we're using ansible 2.2
+            config.write('module_set_locale = False\n')
+            # bump the timeout because busy nodes may take more than
+            # 10s to respond
+            config.write('timeout = 30\n')
+
+            config.write('[ssh_connection]\n')
+            # NB: when setting pipelining = True, keep_remote_files
+            # must be False (the default).  Otherwise it apparently
+            # will override the pipelining option and effectively
+            # disable it.  Pipelining has a side effect of running the
+            # command without a tty (ie, without the -tt argument to
+            # ssh).  We require this behavior so that if a job runs a
+            # command which expects interactive input on a tty (such
+            # as sudo) it does not hang.
+            config.write('pipelining = True\n')
+            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
+                       "-o UserKnownHostsFile=%s" % jobdir.known_hosts
+            config.write('ssh_args = %s\n' % ssh_args)
+
+    def _ansibleTimeout(self, proc, msg):
+        """Watchdog callback: mark the timeout and kill the playbook."""
+        self._watchdog_timeout = True
+        self.log.warning(msg)
+        self.abortRunningProc(proc)
+
+    def runAnsiblePrePlaybook(self, jobdir):
+        """Run the pre playbook under a watchdog; return True on rc 0.
+
+        Output is streamed into the debug log, truncated per line to
+        keep a runaway command from flooding it.
+        """
+        # Set LOGNAME env variable so Ansible log_path log reports
+        # the correct user.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+        env_copy['ANSIBLE_CONFIG'] = jobdir.pre_post_config
+
+        if self.options['verbose']:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+
+        cmd = ['ansible-playbook', jobdir.pre_playbook,
+               '-e@%s' % jobdir.vars, verbose]
+        self.log.debug("Ansible pre command: %s" % (cmd,))
+
+        # New session (setsid) so the whole process group can be killed
+        # on abort/timeout.
+        self.ansible_pre_proc = subprocess.Popen(
+            cmd,
+            cwd=jobdir.ansible_root,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            preexec_fn=os.setsid,
+            env=env_copy,
+        )
+        ret = None
+        watchdog = Watchdog(ANSIBLE_DEFAULT_PRE_TIMEOUT,
+                            self._ansibleTimeout,
+                            (self.ansible_pre_proc,
+                             "Ansible pre timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_pre_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible pre output: %s" % (line,))
+            ret = self.ansible_pre_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible pre exit code: %s" % (ret,))
+        self.ansible_pre_proc = None
+        return ret == 0
+
+    def runAnsiblePlaybook(self, jobdir, timeout):
+        """Run the main job playbook under a watchdog.
+
+        Returns True on success, False on failure/timeout, and None
+        when the result is indeterminate (node unreachable or killed by
+        an abort) so zuul can relaunch the job.
+        """
+        # Set LOGNAME env variable so Ansible log_path log reports
+        # the correct user.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+        env_copy['ANSIBLE_CONFIG'] = jobdir.config
+
+        if self.options['verbose']:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+
+        cmd = ['ansible-playbook', jobdir.playbook, verbose,
+               '-e@%s' % jobdir.vars]
+        self.log.debug("Ansible command: %s" % (cmd,))
+
+        # New session (setsid) so the whole process group can be killed
+        # on abort/timeout.
+        self.ansible_job_proc = subprocess.Popen(
+            cmd,
+            cwd=jobdir.ansible_root,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            preexec_fn=os.setsid,
+            env=env_copy,
+        )
+        ret = None
+        # Give the playbook a grace period beyond the job timeout; the
+        # in-playbook timeout should normally fire first.
+        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+                            self._ansibleTimeout,
+                            (self.ansible_job_proc,
+                             "Ansible timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_job_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible output: %s" % (line,))
+            ret = self.ansible_job_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible exit code: %s" % (ret,))
+        self.ansible_job_proc = None
+        if self._watchdog_timeout:
+            return False
+        if ret == 3:
+            # AnsibleHostUnreachable: We had a network issue connecting to
+            # our zuul-worker.
+            return None
+        elif ret == -9:
+            # Received abort request.
+            return None
+        return ret == 0
+
+    def runAnsiblePostPlaybook(self, jobdir, success):
+        """Run the post (publishers) playbook; return True on rc 0.
+
+        The job's success and timeout state are passed as extra vars so
+        the playbook's 'when' conditions can select which publishers
+        and log messages apply.
+        """
+        # Set LOGNAME env variable so Ansible log_path log reports
+        # the correct user.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+        env_copy['ANSIBLE_CONFIG'] = jobdir.pre_post_config
+
+        if self.options['verbose']:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+
+        cmd = ['ansible-playbook', jobdir.post_playbook,
+               '-e', 'success=%s' % success,
+               '-e', 'timedout=%s' % self._watchdog_timeout,
+               '-e@%s' % jobdir.vars,
+               verbose]
+        self.log.debug("Ansible post command: %s" % (cmd,))
+
+        # New session (setsid) so the whole process group can be killed
+        # on abort/timeout.
+        self.ansible_post_proc = subprocess.Popen(
+            cmd,
+            cwd=jobdir.ansible_root,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            preexec_fn=os.setsid,
+            env=env_copy,
+        )
+        ret = None
+        watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
+                            self._ansibleTimeout,
+                            (self.ansible_post_proc,
+                             "Ansible post timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_post_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible post output: %s" % (line,))
+            ret = self.ansible_post_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible post exit code: %s" % (ret,))
+        self.ansible_post_proc = None
+        return ret == 0
+
+
+class JJB(jenkins_jobs.builder.Builder):
+    """Minimal Jenkins Job Builder subclass used only for macro
+    expansion of parsed job definitions (no Jenkins connection).
+    """
+    def __init__(self):
+        # NOTE(review): deliberately does NOT call the parent
+        # constructor — presumably to avoid Builder's Jenkins/cache
+        # setup; only the attributes used by expansion are set.
+        # TODO: confirm against jenkins_jobs.builder.Builder.
+        self.global_config = None
+        self._plugins_list = []
+
+    def expandComponent(self, component_type, component, template_data):
+        """Recursively expand one component (possibly a macro).
+
+        Returns a list of concrete {name: data} component dicts; macros
+        found in the parser data are expanded depth-first with their
+        parameters applied via deep_format.
+        """
+        component_list_type = component_type + 's'
+        new_components = []
+        if isinstance(component, dict):
+            name, component_data = next(iter(component.items()))
+            if template_data:
+                component_data = jenkins_jobs.formatter.deep_format(
+                    component_data, template_data, True)
+        else:
+            # Bare string form: a component referenced by name only.
+            name = component
+            component_data = {}
+
+        new_component = self.parser.data.get(component_type, {}).get(name)
+        if new_component:
+            for new_sub_component in new_component[component_list_type]:
+                new_components.extend(
+                    self.expandComponent(component_type,
+                                         new_sub_component, component_data))
+        else:
+            new_components.append({name: component_data})
+        return new_components
+
+    def expandMacros(self, job):
+        """Expand all builder/publisher/wrapper macros in a job in place."""
+        for component_type in ['builder', 'publisher', 'wrapper']:
+            component_list_type = component_type + 's'
+            new_components = []
+            for new_component in job.get(component_list_type, []):
+                new_components.extend(self.expandComponent(component_type,
+                                                           new_component, {}))
+            job[component_list_type] = new_components
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 69fb71b..2840ba6 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -17,6 +17,7 @@
import json
import logging
import os
+import six
import time
import threading
from uuid import uuid4
@@ -164,6 +165,11 @@
port = config.get('gearman', 'port')
else:
port = 4730
+ if config.has_option('gearman', 'check_job_registration'):
+ self.job_registration = config.getboolean(
+ 'gearman', 'check_job_registration')
+ else:
+ self.job_registration = True
self.gearman = ZuulGearmanClient(self)
self.gearman.addServer(server, port)
@@ -224,6 +230,19 @@
# NOTE(jhesketh): The params need to stay in a key=value data pair
# as workers cannot necessarily handle lists.
+ if callable(job.parameter_function):
+ pargs = inspect.getargspec(job.parameter_function)
+ if len(pargs.args) == 2:
+ job.parameter_function(item, params)
+ else:
+ job.parameter_function(item, job, params)
+ self.log.debug("Custom parameter function used for job %s, "
+ "change: %s, params: %s" % (job, item.change,
+ params))
+
+ # NOTE(mmedvede): Swift parameter creation should remain after the call
+ # to job.parameter_function to make it possible to update LOG_PATH for
+ # swift upload url using parameter_function mechanism.
if job.swift and self.swift.connection:
for name, s in job.swift.items():
@@ -231,7 +250,7 @@
s_config = {}
s_config.update((k, v.format(item=item, job=job,
change=item.change))
- if isinstance(v, basestring)
+ if isinstance(v, six.string_types)
else (k, v)
for k, v in s.items())
@@ -254,16 +273,6 @@
for key, value in swift_instructions.items():
params['_'.join(['SWIFT', name, key])] = value
- if callable(job.parameter_function):
- pargs = inspect.getargspec(job.parameter_function)
- if len(pargs.args) == 2:
- job.parameter_function(item, params)
- else:
- job.parameter_function(item, job, params)
- self.log.debug("Custom parameter function used for job %s, "
- "change: %s, params: %s" % (job, item.change,
- params))
-
def launch(self, job, item, pipeline, dependent_items=[]):
uuid = str(uuid4().hex)
self.log.info(
@@ -351,12 +360,19 @@
build.__gearman_job = gearman_job
self.builds[uuid] = build
- if not self.isJobRegistered(gearman_job.name):
+ if self.job_registration and not self.isJobRegistered(
+ gearman_job.name):
self.log.error("Job %s is not registered with Gearman" %
gearman_job)
self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
return build
+        # NOTE(pabelanger): Rather than looping forever, check to see if job
+ # has passed attempts limit.
+ if item.current_build_set.getTries(job.name) > job.attempts:
+ self.onBuildCompleted(gearman_job, 'RETRY_LIMIT')
+ return build
+
if pipeline.precedence == zuul.model.PRECEDENCE_NORMAL:
precedence = gear.PRECEDENCE_NORMAL
elif pipeline.precedence == zuul.model.PRECEDENCE_HIGH:
@@ -456,9 +472,6 @@
build.number = data.get('number')
build.__gearman_manager = data.get('manager')
self.sched.onBuildStarted(build)
-
- if job.denominator:
- build.estimated_time = float(job.denominator) / 1000
else:
self.log.error("Unable to find build %s" % job.unique)
@@ -505,7 +518,7 @@
# us where the job is running.
return False
- if not self.isJobRegistered(name):
+ if self.job_registration and not self.isJobRegistered(name):
return False
desc_uuid = str(uuid4().hex)
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index ba96ab7..91e15d1 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -103,6 +103,9 @@
'success-pattern': str,
'hold-following-changes': bool,
'voting': bool,
+ 'attempts': int,
+ 'mutex': str,
+ 'tags': toList(str),
'parameter-function': str,
'branch': toList(str),
'files': toList(str),
diff --git a/zuul/lib/clonemapper.py b/zuul/lib/clonemapper.py
index ae558cd..57ac177 100644
--- a/zuul/lib/clonemapper.py
+++ b/zuul/lib/clonemapper.py
@@ -19,6 +19,9 @@
import os
import re
+import six
+
+
OrderedDict = extras.try_imports(['collections.OrderedDict',
'ordereddict.OrderedDict'])
@@ -59,17 +62,17 @@
raise Exception("Expansion error. Check error messages above")
self.log.info("Mapping projects to workspace...")
- for project, dest in ret.iteritems():
+ for project, dest in six.iteritems(ret):
dest = os.path.normpath(os.path.join(workspace, dest[0]))
ret[project] = dest
self.log.info(" %s -> %s", project, dest)
self.log.debug("Checking overlap in destination directories...")
check = defaultdict(list)
- for project, dest in ret.iteritems():
+ for project, dest in six.iteritems(ret):
check[dest].append(project)
- dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1)
+ dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1)
if dupes:
raise Exception("Some projects share the same destination: %s",
dupes)
diff --git a/zuul/lib/cloner.py b/zuul/lib/cloner.py
index 0ac7f0f..6e50eda 100644
--- a/zuul/lib/cloner.py
+++ b/zuul/lib/cloner.py
@@ -19,7 +19,10 @@
import re
import yaml
+import six
+
from git import GitCommandError
+from zuul import exceptions
from zuul.lib.clonemapper import CloneMapper
from zuul.merger.merger import Repo
@@ -29,7 +32,8 @@
def __init__(self, git_base_url, projects, workspace, zuul_branch,
zuul_ref, zuul_url, branch=None, clone_map_file=None,
- project_branches=None, cache_dir=None):
+ project_branches=None, cache_dir=None, zuul_newrev=None,
+ zuul_project=None):
self.clone_map = []
self.dests = None
@@ -42,7 +46,13 @@
self.zuul_branch = zuul_branch or ''
self.zuul_ref = zuul_ref or ''
self.zuul_url = zuul_url
+ self.zuul_project = zuul_project
+
self.project_branches = project_branches or {}
+ self.project_revisions = {}
+
+ if zuul_newrev and zuul_project:
+ self.project_revisions[zuul_project] = zuul_newrev
if clone_map_file:
self.readCloneMap(clone_map_file)
@@ -62,17 +72,29 @@
dests = mapper.expand(workspace=self.workspace)
self.log.info("Preparing %s repositories", len(dests))
- for project, dest in dests.iteritems():
+ for project, dest in six.iteritems(dests):
self.prepareRepo(project, dest)
self.log.info("Prepared all repositories")
def cloneUpstream(self, project, dest):
# Check for a cached git repo first
git_cache = '%s/%s' % (self.cache_dir, project)
- git_upstream = '%s/%s' % (self.git_url, project)
+
+ # Then, if we are cloning the repo for the zuul_project, then
+ # set its origin to be the zuul merger, as it is guaranteed to
+ # be correct and up to date even if mirrors haven't updated
+ # yet. Otherwise, we can not be sure about the state of the
+ # project, so our best chance to get the most current state is
+ # by setting origin to the git_url.
+ if (self.zuul_url and project == self.zuul_project):
+ git_upstream = '%s/%s' % (self.zuul_url, project)
+ else:
+ git_upstream = '%s/%s' % (self.git_url, project)
+
+ repo_is_cloned = os.path.exists(os.path.join(dest, '.git'))
if (self.cache_dir and
os.path.exists(git_cache) and
- not os.path.exists(dest)):
+ not repo_is_cloned):
# file:// tells git not to hard-link across repos
git_cache = 'file://%s' % git_cache
self.log.info("Creating repo %s from cache %s",
@@ -95,26 +117,50 @@
return repo
- def fetchFromZuul(self, repo, project, ref):
- zuul_remote = '%s/%s' % (self.zuul_url, project)
+ def fetchRef(self, repo, project, ref):
+ # If we are fetching a zuul ref, the only place to get it is
+ # from the zuul merger (and it is guaranteed to be correct).
+ # Otherwise, the only way we can be certain that the ref
+ # (which, since it is not a zuul ref, is a branch or tag) is
+ # correct is in the case that it matches zuul_project. If
+ # neither of those two conditions are met, we are most likely
+ # to get the correct state from the git_url.
+ if (ref.startswith('refs/zuul') or
+ project == self.zuul_project):
+
+ remote = '%s/%s' % (self.zuul_url, project)
+ else:
+ remote = '%s/%s' % (self.git_url, project)
try:
- repo.fetchFrom(zuul_remote, ref)
- self.log.debug("Fetched ref %s from %s", ref, project)
+ repo.fetchFrom(remote, ref)
+ self.log.debug("Fetched ref %s from %s", ref, remote)
return True
- except (ValueError, GitCommandError):
- self.log.debug("Project %s in Zuul does not have ref %s",
- project, ref)
+ except ValueError:
+ self.log.debug("Repo %s does not have ref %s",
+ remote, ref)
+ return False
+ except GitCommandError as error:
+ # Bail out if fetch fails due to infrastructure reasons
+ if error.stderr.startswith('fatal: unable to access'):
+ raise
+ self.log.debug("Repo %s does not have ref %s",
+ remote, ref)
return False
def prepareRepo(self, project, dest):
"""Clone a repository for project at dest and apply a reference
suitable for testing. The reference lookup is attempted in this order:
- 1) Zuul reference for the indicated branch
- 2) Zuul reference for the master branch
- 3) The tip of the indicated branch
- 4) The tip of the master branch
+ 1) The indicated revision for specific project
+ 2) Zuul reference for the indicated branch
+ 3) Zuul reference for the master branch
+ 4) The tip of the indicated branch
+ 5) The tip of the master branch
+
+ If an "indicated revision" is specified for this project, and we are
+ unable to meet this requirement, we stop attempting to check this
+ repo out and raise a zuul.exceptions.RevNotFound exception.
The "indicated branch" is one of the following:
@@ -134,6 +180,10 @@
# `git branch` is happy with.
repo.reset()
+ indicated_revision = None
+ if project in self.project_revisions:
+ indicated_revision = self.project_revisions[project]
+
indicated_branch = self.branch or self.zuul_branch
if project in self.project_branches:
indicated_branch = self.project_branches[project]
@@ -148,8 +198,9 @@
self.log.info("upstream repo has branch %s", indicated_branch)
fallback_branch = indicated_branch
else:
- self.log.info("upstream repo is missing branch %s",
- self.branch)
+ if indicated_branch:
+ self.log.info("upstream repo is missing branch %s",
+ indicated_branch)
# FIXME should be origin HEAD branch which might not be 'master'
fallback_branch = 'master'
@@ -159,13 +210,26 @@
else:
fallback_zuul_ref = None
+ # If the user has requested an explicit revision to be checked out,
+ # we use it above all else, and if we cannot satisfy this requirement
+ # we raise an error and do not attempt to continue.
+ if indicated_revision:
+ self.log.info("Attempting to check out revision %s for "
+ "project %s", indicated_revision, project)
+ try:
+ self.fetchRef(repo, project, self.zuul_ref)
+ commit = repo.checkout(indicated_revision)
+ except (ValueError, GitCommandError):
+ raise exceptions.RevNotFound(project, indicated_revision)
+ self.log.info("Prepared '%s' repo at revision '%s'", project,
+ indicated_revision)
# If we have a non empty zuul_ref to use, use it. Otherwise we fall
# back to checking out the branch.
- if ((override_zuul_ref and
- self.fetchFromZuul(repo, project, override_zuul_ref)) or
- (fallback_zuul_ref and
- fallback_zuul_ref != override_zuul_ref and
- self.fetchFromZuul(repo, project, fallback_zuul_ref))):
+ elif ((override_zuul_ref and
+ self.fetchRef(repo, project, override_zuul_ref)) or
+ (fallback_zuul_ref and
+ fallback_zuul_ref != override_zuul_ref and
+ self.fetchRef(repo, project, fallback_zuul_ref))):
# Work around a bug in GitPython which can not parse FETCH_HEAD
gitcmd = git.Git(dest)
fetch_head = gitcmd.rev_parse('FETCH_HEAD')
diff --git a/zuul/lib/commandsocket.py b/zuul/lib/commandsocket.py
new file mode 100644
index 0000000..1b7fed9
--- /dev/null
+++ b/zuul/lib/commandsocket.py
@@ -0,0 +1,83 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+# Copyright 2016 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import socket
+import threading
+import Queue
+
+
+class CommandSocket(object):
+    """Simple UNIX-domain-socket command channel.
+
+    Listens on a stream socket at *path*, reads one newline-terminated
+    command per connection, and queues it for consumers to retrieve
+    via get().
+    """
+
+    log = logging.getLogger("zuul.CommandSocket")
+
+    def __init__(self, path):
+        # running guards both the listener loop and get().
+        self.running = False
+        self.path = path
+        self.queue = Queue.Queue()
+
+    def start(self):
+        """Bind the socket and start the daemon listener thread."""
+        self.running = True
+        # Remove a stale socket file left over from a previous run;
+        # bind() would otherwise fail with "Address already in use".
+        if os.path.exists(self.path):
+            os.unlink(self.path)
+        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.socket.bind(self.path)
+        self.socket.listen(1)
+        self.socket_thread = threading.Thread(target=self._socketListener)
+        self.socket_thread.daemon = True
+        self.socket_thread.start()
+
+    def stop(self):
+        """Stop the listener thread and wake any consumer blocked in get()."""
+        # First, wake up our listener thread with a connection and
+        # tell it to stop running.
+        self.running = False
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        s.connect(self.path)
+        s.sendall('_stop\n')
+        # The command '_stop' will be ignored by our listener, so
+        # directly inject it into the queue so that consumers of this
+        # class which are waiting in .get() are awakened. They can
+        # either handle '_stop' or just ignore the unknown command and
+        # then check to see if they should continue to run before
+        # re-entering their loop.
+        self.queue.put('_stop')
+        self.socket_thread.join()
+
+    def _socketListener(self):
+        # Accept connections and enqueue one command per connection.
+        while self.running:
+            try:
+                s, addr = self.socket.accept()
+                self.log.debug("Accepted socket connection %s" % (s,))
+                buf = ''
+                # Read byte-at-a-time until the newline terminator.
+                # NOTE(review): assumes the peer always sends a
+                # newline-terminated command; recv() returning '' on a
+                # prematurely closed connection would not terminate
+                # this inner loop cleanly -- confirm callers.
+                while True:
+                    buf += s.recv(1)
+                    if buf[-1] == '\n':
+                        break
+                buf = buf.strip()
+                self.log.debug("Received %s from socket" % (buf,))
+                s.close()
+                # Because we use '_stop' internally to wake up a
+                # waiting thread, don't allow it to actually be
+                # injected externally.
+                if buf != '_stop':
+                    self.queue.put(buf)
+            except Exception:
+                self.log.exception("Exception in socket handler")
+
+    def get(self):
+        """Return the next queued command, blocking until one arrives."""
+        if not self.running:
+            raise Exception("CommandSocket.get called while stopped")
+        return self.queue.get()
diff --git a/zuul/lib/gearserver.py b/zuul/lib/gearserver.py
new file mode 100644
index 0000000..9cddca3
--- /dev/null
+++ b/zuul/lib/gearserver.py
@@ -0,0 +1,35 @@
+# Copyright 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gear
+
+MASS_DO = 101
+
+
+class GearServer(gear.Server):
+    """gear.Server extension that understands the MASS_DO packet.
+
+    MASS_DO (ptype 101) registers a whole set of functions for a
+    connection in a single packet rather than one packet per function.
+    """
+
+    def handlePacket(self, packet):
+        # Intercept only MASS_DO; defer everything else to the base
+        # class implementation.
+        if packet.ptype == MASS_DO:
+            self.log.info("Received packet from %s: %s" % (packet.connection,
+                                                           packet))
+            self.handleMassDo(packet)
+        else:
+            return super(GearServer, self).handlePacket(packet)
+
+    def handleMassDo(self, packet):
+        """Replace the connection's function set with the packet's list."""
+        # The packet body is a NUL-separated list of function names.
+        packet.connection.functions = set()
+        for name in packet.data.split(b'\x00'):
+            self.log.debug("Adding function %s to %s" % (
+                name, packet.connection))
+            packet.connection.functions.add(name)
+            self.functions.add(name)
diff --git a/zuul/lib/swift.py b/zuul/lib/swift.py
index d9f4b09..5660819 100644
--- a/zuul/lib/swift.py
+++ b/zuul/lib/swift.py
@@ -19,8 +19,8 @@
import os
import random
import six
+from six.moves import urllib
import string
-import urlparse
class Swift(object):
@@ -156,7 +156,7 @@
url = os.path.join(self.storage_url, settings['container'],
settings['file_path_prefix'],
destination_prefix)
- u = urlparse.urlparse(url)
+ u = urllib.parse.urlparse(url)
hmac_body = '%s\n%s\n%s\n%s\n%s' % (u.path, redirect,
settings['max_file_size'],
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index 950c385..9e8c243 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -97,9 +97,10 @@
data = dict(items=items)
self.submitJob('merger:merge', data, build_set, precedence)
- def updateRepo(self, project, url, build_set,
+ def updateRepo(self, project, connection_name, url, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL):
data = dict(project=project,
+ connection_name=connection_name,
url=url)
self.submitJob('merger:update', data, build_set, precedence)
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index c6ae35d..a974e9c 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -95,8 +95,12 @@
continue
repo.create_head(ref.remote_head, ref, force=True)
- # Reset to remote HEAD (usually origin/master)
- repo.head.reference = origin.refs['HEAD']
+ # try reset to remote HEAD (usually origin/master)
+ # If it fails, pick the first reference
+ try:
+ repo.head.reference = origin.refs['HEAD']
+ except IndexError:
+ repo.head.reference = origin.refs[0]
reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
@@ -182,7 +186,14 @@
repo = self.createRepoObject()
self.log.debug("Updating repository %s" % self.local_path)
origin = repo.remotes.origin
- origin.update()
+ if repo.git.version_info[:2] < (1, 9):
+ # Before 1.9, 'git fetch --tags' did not include the
+            # behavior covered by 'git fetch', so we run both
+ # commands in that case. Starting with 1.9, 'git fetch
+ # --tags' is all that is necessary. See
+ # https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20
+ origin.fetch()
+ origin.fetch(tags=True)
class Merger(object):
@@ -210,7 +221,15 @@
fd.write('#!/bin/bash\n')
fd.write('ssh -i %s $@\n' % key)
fd.close()
- os.chmod(name, 0755)
+ os.chmod(name, 0o755)
+
+ def _setGitSsh(self, connection_name):
+ wrapper_name = '.ssh_wrapper_%s' % connection_name
+ name = os.path.join(self.working_root, wrapper_name)
+ if os.path.isfile(name):
+ os.environ['GIT_SSH'] = name
+ elif 'GIT_SSH' in os.environ:
+ del os.environ['GIT_SSH']
def addProject(self, project, url):
repo = None
@@ -231,11 +250,12 @@
" without a url" % (project,))
return self.addProject(project, url)
- def updateRepo(self, project, url):
+ def updateRepo(self, project, connection_name, url):
+ self._setGitSsh(connection_name)
repo = self.getRepo(project, url)
try:
self.log.info("Updating local repository %s", project)
- repo.update()
+ repo.reset()
except Exception:
self.log.exception("Unable to update %s", project)
@@ -268,14 +288,6 @@
return commit
- def _setGitSsh(self, connection_name):
- wrapper_name = '.ssh_wrapper_%s' % connection_name
- name = os.path.join(self.working_root, wrapper_name)
- if os.path.isfile(name):
- os.environ['GIT_SSH'] = name
- elif 'GIT_SSH' in os.environ:
- del os.environ['GIT_SSH']
-
def _mergeItem(self, item, recent):
self.log.debug("Processing refspec %s for project %s / %s ref %s" %
(item['refspec'], item['project'], item['branch'],
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 30cd732..b1921d9 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -19,7 +19,7 @@
import gear
-import merger
+from zuul.merger import merger
class MergeServer(object):
@@ -109,7 +109,9 @@
def update(self, job):
args = json.loads(job.arguments)
- self.merger.updateRepo(args['project'], args['url'])
+ self.merger.updateRepo(args['project'],
+ args['connection_name'],
+ args['url'])
result = dict(updated=True,
zuul_url=self.zuul_url)
job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/model.py b/zuul/model.py
index 54f776c..b24a06b 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -13,7 +13,9 @@
# under the License.
import copy
+import os
import re
+import struct
import time
from uuid import uuid4
import extras
@@ -108,7 +110,11 @@
return job_tree
def getProjects(self):
- return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
+ # cmp is not in python3, applied idiom from
+ # http://python-future.org/compatible_idioms.html#cmp
+ return sorted(
+ self.job_trees.keys(),
+ key=lambda p: p.name)
def addQueue(self, queue):
self.queues.append(queue)
@@ -134,7 +140,7 @@
return []
return item.change.filterJobs(tree.getJobs())
- def _findJobsToRun(self, job_trees, item):
+ def _findJobsToRun(self, job_trees, item, mutex):
torun = []
if item.item_ahead:
# Only run jobs if any 'hold' jobs on the change ahead
@@ -153,20 +159,23 @@
else:
# There is no build for the root of this job tree,
# so we should run it.
- torun.append(job)
+ if mutex.acquire(item, job):
+ # If this job needs a mutex, either acquire it or make
+ # sure that we have it before running the job.
+ torun.append(job)
# If there is no job, this is a null job tree, and we should
# run all of its jobs.
if result == 'SUCCESS' or not job:
- torun.extend(self._findJobsToRun(tree.job_trees, item))
+ torun.extend(self._findJobsToRun(tree.job_trees, item, mutex))
return torun
- def findJobsToRun(self, item):
+ def findJobsToRun(self, item, mutex):
if not item.live:
return []
tree = self.getJobTree(item.change.project)
if not tree:
return []
- return self._findJobsToRun(tree.job_trees, item)
+ return self._findJobsToRun(tree.job_trees, item, mutex)
def haveAllJobsStarted(self, item):
for job in self.getJobs(item):
@@ -263,7 +272,7 @@
items.extend(shared_queue.queue)
return items
- def formatStatusJSON(self):
+ def formatStatusJSON(self, url_pattern=None):
j_pipeline = dict(name=self.name,
description=self.description)
j_queues = []
@@ -280,7 +289,7 @@
if j_changes:
j_queue['heads'].append(j_changes)
j_changes = []
- j_changes.append(e.formatJSON())
+ j_changes.append(e.formatJSON(url_pattern))
if (len(j_changes) > 1 and
(j_changes[-2]['remaining_time'] is not None) and
(j_changes[-1]['remaining_time'] is not None)):
@@ -412,7 +421,7 @@
elif self.window_decrease_type == 'exponential':
self.window = max(
self.window_floor,
- self.window / self.window_decrease_factor)
+ int(self.window / self.window_decrease_factor))
class Project(object):
@@ -441,6 +450,8 @@
self.failure_pattern = None
self.success_pattern = None
self.parameter_function = None
+ self.tags = set()
+ self.mutex = None
# A metajob should only supply values for attributes that have
# been explicitly provided, so avoid setting boolean defaults.
if self.is_metajob:
@@ -455,6 +466,8 @@
self._files = []
self.skip_if_matcher = None
self.swift = {}
+ # Number of attempts to launch a job before giving up.
+ self.attempts = 3
def __str__(self):
return self.name
@@ -487,6 +500,13 @@
self.skip_if_matcher = other.skip_if_matcher.copy()
if other.swift:
self.swift.update(other.swift)
+ if other.mutex:
+ self.mutex = other.mutex
+ # Tags are merged via a union rather than a destructive copy
+ # because they are intended to accumulate as metajobs are
+ # applied.
+ if other.tags:
+ self.tags = self.tags.union(other.tags)
# Only non-None values should be copied for boolean attributes.
if other.hold_following_changes is not None:
self.hold_following_changes = other.hold_following_changes
@@ -628,6 +648,7 @@
self.unable_to_merge = False
self.failing_reasons = []
self.merge_state = self.NEW
+ self.tries = {}
def __repr__(self):
return '<BuildSet item: %s #builds: %s merge state: %s>' % (
@@ -653,9 +674,12 @@
def addBuild(self, build):
self.builds[build.job.name] = build
+ if build.job.name not in self.tries:
+ self.tries[build.job.name] = 1
build.build_set = self
def removeBuild(self, build):
+ self.tries[build.job.name] += 1
del self.builds[build.job.name]
def getBuild(self, job_name):
@@ -666,6 +690,9 @@
keys.sort()
return [self.builds.get(x) for x in keys]
+ def getTries(self, job_name):
+ return self.tries.get(job_name)
+
class QueueItem(object):
"""A changish inside of a Pipeline queue"""
@@ -712,7 +739,34 @@
def setReportedResult(self, result):
self.current_build_set.result = result
- def formatJSON(self):
+    def formatJobResult(self, job, url_pattern=None):
+        """Return a (result, url) tuple for *job*'s build, for reporting.
+
+        The result string and URL pattern may be overridden per job via
+        its success/failure message and pattern attributes.  If the
+        pattern cannot be formatted, fall back to the build URL or the
+        job name.
+        """
+        build = self.current_build_set.getBuild(job.name)
+        result = build.result
+        pattern = url_pattern
+        if result == 'SUCCESS':
+            if job.success_message:
+                result = job.success_message
+            if job.success_pattern:
+                pattern = job.success_pattern
+        elif result == 'FAILURE':
+            if job.failure_message:
+                result = job.failure_message
+            if job.failure_pattern:
+                pattern = job.failure_pattern
+        url = None
+        if pattern:
+            try:
+                url = pattern.format(change=self.change,
+                                     pipeline=self.pipeline,
+                                     job=job,
+                                     build=build)
+            except Exception:
+                # A malformed pattern must not break reporting.
+                pass  # FIXME: log this or something?
+        if not url:
+            url = build.url or job.name
+        return (result, url)
+
+ def formatJSON(self, url_pattern=None):
changeish = self.change
ret = {}
ret['active'] = self.active
@@ -749,11 +803,13 @@
elapsed = None
remaining = None
result = None
- url = None
+ build_url = None
+ report_url = None
worker = None
if build:
result = build.result
- url = build.url
+ build_url = build.url
+ (unused, report_url) = self.formatJobResult(job, url_pattern)
if build.start_time:
if build.end_time:
elapsed = int((build.end_time -
@@ -781,7 +837,8 @@
'name': job.name,
'elapsed_time': elapsed,
'remaining_time': remaining,
- 'url': url,
+ 'url': build_url,
+ 'report_url': report_url,
'result': result,
'voting': job.voting,
'uuid': build.uuid if build else None,
@@ -1005,9 +1062,6 @@
# an admin command, etc):
self.forced_pipeline = None
- # Internal mechanism to track if the change needs a refresh from cache
- self._needs_refresh = False
-
def __repr__(self):
ret = '<TriggerEvent %s %s' % (self.type, self.project_name)
@@ -1034,7 +1088,7 @@
for a in approvals:
for k, v in a.items():
if k == 'username':
- pass
+ a['username'] = re.compile(v)
elif k in ['email', 'email-filter']:
a['email'] = re.compile(v)
elif k == 'newer-than':
@@ -1053,7 +1107,7 @@
by = approval.get('by', {})
for k, v in rapproval.items():
if k == 'username':
- if (by.get('username', '') != v):
+ if (not v.search(by.get('username', ''))):
return False
elif k == 'email':
if (not v.search(by.get('email', ''))):
@@ -1341,3 +1395,78 @@
job.copy(metajob)
self.jobs[name] = job
return job
+
+
+class JobTimeData(object):
+    """Fixed-size on-disk record of a job's recent run times.
+
+    Keeps the last 10 successful elapsed times, the last 10 failed
+    elapsed times (both in whole seconds) and the last 10 results,
+    struct-packed into one small file.  Layout ('B10H10H10B'): one
+    version byte, 10 success times (unsigned shorts), 10 failure
+    times (unsigned shorts), 10 result bytes (0=success, 1=failure).
+    """
+    format = 'B10H10H10B'
+    version = 0
+
+    def __init__(self, path):
+        self.path = path
+        self.success_times = [0 for x in range(10)]
+        self.failure_times = [0 for x in range(10)]
+        self.results = [0 for x in range(10)]
+
+    def load(self):
+        """Load saved data from self.path, if the file exists.
+
+        Raises Exception on a version mismatch.
+        """
+        if not os.path.exists(self.path):
+            return
+        # Binary mode: the file holds struct-packed bytes, not text.
+        with open(self.path, 'rb') as f:
+            data = struct.unpack(self.format, f.read())
+        version = data[0]
+        if version != self.version:
+            raise Exception("Unknown data version")
+        self.success_times = list(data[1:11])
+        self.failure_times = list(data[11:21])
+        # The packed tuple has 31 elements; results occupy 21..30.
+        self.results = list(data[21:31])
+
+    def save(self):
+        """Atomically persist the current data to self.path."""
+        tmpfile = self.path + '.tmp'
+        data = [self.version]
+        data.extend(self.success_times)
+        data.extend(self.failure_times)
+        data.extend(self.results)
+        data = struct.pack(self.format, *data)
+        # Write to a temp file then rename, so readers never observe
+        # a partially written file.
+        with open(tmpfile, 'wb') as f:
+            f.write(data)
+        os.rename(tmpfile, self.path)
+
+    def add(self, elapsed, result):
+        """Record one run: push its time and result, drop the oldest.
+
+        NOTE(review): times are stored as unsigned shorts, so an
+        elapsed time over 65535 seconds would make save() raise
+        struct.error -- confirm jobs cannot run that long.
+        """
+        elapsed = int(elapsed)
+        if result == 'SUCCESS':
+            self.success_times.append(elapsed)
+            self.success_times.pop(0)
+            result = 0
+        else:
+            self.failure_times.append(elapsed)
+            self.failure_times.pop(0)
+            result = 1
+        self.results.append(result)
+        self.results.pop(0)
+
+    def getEstimatedTime(self):
+        """Return the mean of recorded non-zero success times, or 0.0."""
+        times = [x for x in self.success_times if x]
+        if times:
+            return float(sum(times)) / len(times)
+        return 0.0
+
+
+class TimeDataBase(object):
+    """Directory-backed collection of JobTimeData records, keyed by job name."""
+
+    def __init__(self, root):
+        # root: directory holding one JobTimeData file per job name.
+        self.root = root
+        self.jobs = {}
+
+    def _getTD(self, name):
+        # Return the cached JobTimeData for *name*, loading it from
+        # disk on first access only.
+        td = self.jobs.get(name)
+        if not td:
+            td = JobTimeData(os.path.join(self.root, name))
+            self.jobs[name] = td
+            td.load()
+        return td
+
+    def getEstimatedTime(self, name):
+        """Return the estimated run time (seconds) for job *name*."""
+        return self._getTD(name).getEstimatedTime()
+
+    def update(self, name, elapsed, result):
+        """Record a completed run for job *name* and persist it to disk."""
+        td = self._getTD(name)
+        td.add(elapsed, result)
+        td.save()
diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py
index e29f9a7..0c9a8d8 100644
--- a/zuul/reporter/__init__.py
+++ b/zuul/reporter/__init__.py
@@ -13,6 +13,7 @@
# under the License.
import abc
+import logging
import six
@@ -24,6 +25,8 @@
Defines the exact public methods that must be supplied.
"""
+ log = logging.getLogger("zuul.reporter.BaseReporter")
+
def __init__(self, reporter_config={}, sched=None, connection=None):
self.reporter_config = reporter_config
self.sched = sched
@@ -84,6 +87,8 @@
def _formatItemReportFailure(self, pipeline, item):
if item.dequeued_needing_change:
msg = 'This change depends on a change that failed to merge.\n'
+ elif not pipeline.didMergerSucceed(item):
+ msg = pipeline.merge_failure_message
else:
msg = (pipeline.failure_message + '\n\n' +
self._formatItemReportJobs(pipeline, item))
@@ -111,25 +116,7 @@
for job in pipeline.getJobs(item):
build = item.current_build_set.getBuild(job.name)
- result = build.result
- pattern = url_pattern
- if result == 'SUCCESS':
- if job.success_message:
- result = job.success_message
- if job.success_pattern:
- pattern = job.success_pattern
- elif result == 'FAILURE':
- if job.failure_message:
- result = job.failure_message
- if job.failure_pattern:
- pattern = job.failure_pattern
- if pattern:
- url = pattern.format(change=item.change,
- pipeline=pipeline,
- job=job,
- build=build)
- else:
- url = build.url or job.name
+ (result, url) = item.formatJobResult(job, url_pattern)
if not job.voting:
voting = ' (non-voting)'
else:
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index d54da9f..716dcfb 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -21,7 +21,7 @@
import gear
import six
-import model
+from zuul import model
class RPCListener(object):
@@ -40,11 +40,11 @@
port = 4730
self.worker = gear.Worker('Zuul RPC Listener')
self.worker.addServer(server, port)
+ self.worker.waitForServer()
+ self.register()
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
- self.worker.waitForServer()
- self.register()
def register(self):
self.worker.registerFunction("zuul:enqueue")
@@ -66,8 +66,8 @@
while self._running:
try:
job = self.worker.getJob()
- z, jobname = job.name.split(':')
self.log.debug("Received job %s" % job.name)
+ z, jobname = job.name.split(':')
attrname = 'handle_' + jobname
if hasattr(self, attrname):
f = getattr(self, attrname)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index f8321d1..161bfc3 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -20,6 +20,7 @@
import logging
import os
import pickle
+import six
from six.moves import queue as Queue
import re
import sys
@@ -27,10 +28,10 @@
import time
import yaml
-import layoutvalidator
-import model
-from model import Pipeline, Project, ChangeQueue
-from model import ChangeishFilter, NullChange
+from zuul import layoutvalidator
+from zuul import model
+from zuul.model import Pipeline, Project, ChangeQueue
+from zuul.model import ChangeishFilter, NullChange
from zuul import change_matcher, exceptions
from zuul import version as zuul_version
@@ -59,16 +60,76 @@
return ret
+class MutexHandler(object):
+    """Tracks named job mutexes held by (queue item, job name) pairs.
+
+    NOTE(review): contains no locking of its own; appears to rely on
+    being called only from the scheduler's single run loop -- confirm.
+    """
+    log = logging.getLogger("zuul.MutexHandler")
+
+    def __init__(self):
+        # Maps mutex name -> (item, job_name) currently holding it.
+        self.mutexes = {}
+
+    def acquire(self, item, job):
+        """Try to acquire *job*'s mutex on behalf of *item*.
+
+        Returns True if the job declares no mutex, if the mutex was
+        acquired, or if this item/job already holds it.  Returns False
+        when another live build holds the mutex.
+        """
+        if not job.mutex:
+            return True
+        mutex_name = job.mutex
+        m = self.mutexes.get(mutex_name)
+        if not m:
+            # The mutex is not held, acquire it
+            self._acquire(mutex_name, item, job.name)
+            return True
+        held_item, held_job_name = m
+        if held_item is item and held_job_name == job.name:
+            # This item already holds the mutex
+            return True
+        held_build = held_item.current_build_set.getBuild(held_job_name)
+        if held_build and held_build.result:
+            # The build that held the mutex is complete, release it
+            # and let the new item have it.  Logged at error level
+            # because a completed build should already have released
+            # its mutex.
+            self.log.error("Held mutex %s being released because "
+                           "the build that holds it is complete" %
+                           (mutex_name,))
+            self._release(mutex_name, item, job.name)
+            self._acquire(mutex_name, item, job.name)
+            return True
+        return False
+
+    def release(self, item, job):
+        """Release *job*'s mutex if *item* holds it; log an error otherwise."""
+        if not job.mutex:
+            return
+        mutex_name = job.mutex
+        m = self.mutexes.get(mutex_name)
+        if not m:
+            # The mutex is not held, nothing to do
+            self.log.error("Mutex can not be released for %s "
+                           "because the mutex is not held" %
+                           (item,))
+            return
+        held_item, held_job_name = m
+        if held_item is item and held_job_name == job.name:
+            # This item holds the mutex
+            self._release(mutex_name, item, job.name)
+            return
+        self.log.error("Mutex can not be released for %s "
+                       "which does not hold it" %
+                       (item,))
+
+    def _acquire(self, mutex_name, item, job_name):
+        # Record the new holder of the mutex.
+        self.log.debug("Job %s of item %s acquiring mutex %s" %
+                       (job_name, item, mutex_name))
+        self.mutexes[mutex_name] = (item, job_name)
+
+    def _release(self, mutex_name, item, job_name):
+        # Forget the mutex entirely; absence means "not held".
+        self.log.debug("Job %s of item %s releasing mutex %s" %
+                       (job_name, item, mutex_name))
+        del self.mutexes[mutex_name]
+
+
class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
- self._exception = None
- self._traceback = None
+ self._exc_info = None
- def exception(self, e, tb):
- self._exception = e
- self._traceback = tb
+ def exception(self, exc_info):
+ self._exc_info = exc_info
self._wait_event.set()
def done(self):
@@ -76,8 +137,8 @@
def wait(self, timeout=None):
self._wait_event.wait(timeout)
- if self._exception:
- raise self._exception, None, self._traceback
+ if self._exc_info:
+ six.reraise(*self._exc_info)
return self._wait_event.is_set()
@@ -174,7 +235,7 @@
class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler")
- def __init__(self, config):
+ def __init__(self, config, testonly=False):
threading.Thread.__init__(self)
self.daemon = True
self.wake_event = threading.Event()
@@ -185,6 +246,7 @@
self._stopped = False
self.launcher = None
self.merger = None
+ self.mutex = MutexHandler()
self.connections = dict()
# Despite triggers being part of the pipeline, there is one trigger set
# per scheduler. The pipeline handles the trigger filters but since
@@ -199,6 +261,10 @@
self.management_event_queue = Queue.Queue()
self.layout = model.Layout()
+ if not testonly:
+ time_dir = self._get_time_database_dir()
+ self.time_database = model.TimeDataBase(time_dir)
+
self.zuul_version = zuul_version.version_info.release_string()
self.last_reconfigured = None
@@ -250,11 +316,14 @@
# Any skip-if predicate can be matched to trigger a skip
return cm.MatchAny(skip_matchers)
- def registerConnections(self, connections):
+ def registerConnections(self, connections, load=True):
+ # load: whether or not to trigger the onLoad for the connection. This
+ # is useful for not doing a full load during layout validation.
self.connections = connections
for connection_name, connection in self.connections.items():
connection.registerScheduler(self)
- connection.onLoad()
+ if load:
+ connection.onLoad()
def stopConnections(self):
for connection_name, connection in self.connections.items():
@@ -263,6 +332,7 @@
def _unloadDrivers(self):
for trigger in self.triggers.values():
trigger.stop()
+ self.triggers = {}
for pipeline in self.layout.pipelines.values():
pipeline.source.stop()
for action in self._reporter_actions.values():
@@ -307,6 +377,9 @@
driver_config, self, connection
)
+ if connection:
+ connection.registerUse(dtype, driver_instance)
+
return driver_instance
def _getSourceDriver(self, connection_name):
@@ -341,7 +414,9 @@
base = os.path.dirname(os.path.realpath(config_path))
fn = os.path.join(base, fn)
fn = os.path.expanduser(fn)
- execfile(fn, config_env)
+ with open(fn) as _f:
+ code = compile(_f.read(), fn, 'exec')
+ six.exec_(code, config_env)
for conf_pipeline in data.get('pipelines', []):
pipeline = Pipeline(conf_pipeline['name'])
@@ -454,9 +529,20 @@
m = config_job.get('hold-following-changes', False)
if m:
job.hold_following_changes = True
+ job.attempts = config_job.get('attempts', 3)
m = config_job.get('voting', None)
if m is not None:
job.voting = m
+ m = config_job.get('mutex', None)
+ if m is not None:
+ job.mutex = m
+ tags = toList(config_job.get('tags'))
+ if tags:
+ # Tags are merged via a union rather than a
+ # destructive copy because they are intended to
+ # accumulate onto any previously applied tags from
+ # metajobs.
+ job.tags = job.tags.union(set(tags))
fname = config_job.get('parameter-function', None)
if fname:
func = config_env.get(fname, None)
@@ -540,12 +626,12 @@
def setMerger(self, merger):
self.merger = merger
- def getProject(self, name, create_foreign=False):
+ def getProject(self, name):
self.layout_lock.acquire()
p = None
try:
p = self.layout.projects.get(name)
- if p is None and create_foreign:
+ if p is None:
self.log.info("Registering foreign project: %s" % name)
p = Project(name, foreign=True)
self.layout.projects[name] = p
@@ -663,6 +749,17 @@
state_dir = '/var/lib/zuul'
return os.path.join(state_dir, 'queue.pickle')
+ def _get_time_database_dir(self):
+ if self.config.has_option('zuul', 'state_dir'):
+ state_dir = os.path.expanduser(self.config.get('zuul',
+ 'state_dir'))
+ else:
+ state_dir = '/var/lib/zuul'
+ d = os.path.join(state_dir, 'times')
+ if not os.path.exists(d):
+ os.mkdir(d)
+ return d
+
def _save_queue(self):
pickle_file = self._get_queue_pickle_file()
events = []
@@ -771,7 +868,7 @@
"Exception while canceling build %s "
"for change %s" % (build, item.change))
self.layout = layout
- self.maintainTriggerCache()
+ self.maintainConnectionCache()
for trigger in self.triggers.values():
trigger.postConfig()
for pipeline in self.layout.pipelines.values():
@@ -901,16 +998,18 @@
finally:
self.run_handler_lock.release()
- def maintainTriggerCache(self):
+ def maintainConnectionCache(self):
relevant = set()
for pipeline in self.layout.pipelines.values():
- self.log.debug("Start maintain trigger cache for: %s" % pipeline)
+ self.log.debug("Gather relevant cache items for: %s" % pipeline)
for item in pipeline.getAllItems():
relevant.add(item.change)
relevant.update(item.change.getRelatedChanges())
- pipeline.source.maintainCache(relevant)
- self.log.debug("End maintain trigger cache for: %s" % pipeline)
- self.log.debug("Trigger cache size: %s" % len(relevant))
+ for connection in self.connections.values():
+ connection.maintainCache(relevant)
+ self.log.debug(
+ "End maintain connection cache for: %s" % connection)
+ self.log.debug("Connection cache size: %s" % len(relevant))
def process_event_queue(self):
self.log.debug("Fetching trigger event")
@@ -959,8 +1058,8 @@
else:
self.log.error("Unable to handle event %s" % event)
event.done()
- except Exception as e:
- event.exception(e, sys.exc_info()[2])
+ except Exception:
+ event.exception(sys.exc_info())
self.management_event_queue.task_done()
def process_result_queue(self):
@@ -990,6 +1089,11 @@
self.log.warning("Build %s is not associated with a pipeline" %
(build,))
return
+ try:
+ build.estimated_time = float(self.time_database.getEstimatedTime(
+ build.job.name))
+ except Exception:
+ self.log.exception("Exception estimating build time:")
pipeline.manager.onBuildStarted(event.build)
def _doBuildCompletedEvent(self, event):
@@ -1003,6 +1107,13 @@
self.log.warning("Build %s is not associated with a pipeline" %
(build,))
return
+ if build.end_time and build.start_time and build.result:
+ duration = build.end_time - build.start_time
+ try:
+ self.time_database.update(
+ build.job.name, duration, build.result)
+ except Exception:
+ self.log.exception("Exception recording build time:")
pipeline.manager.onBuildCompleted(event.build)
def _doMergeCompletedEvent(self, event):
@@ -1018,6 +1129,11 @@
pipeline.manager.onMergeCompleted(event)
def formatStatusJSON(self):
+ if self.config.has_option('zuul', 'url_pattern'):
+ url_pattern = self.config.get('zuul', 'url_pattern')
+ else:
+ url_pattern = None
+
data = {}
data['zuul_version'] = self.zuul_version
@@ -1043,7 +1159,7 @@
pipelines = []
data['pipelines'] = pipelines
for pipeline in self.layout.pipelines.values():
- pipelines.append(pipeline.formatStatusJSON())
+ pipelines.append(pipeline.formatStatusJSON(url_pattern))
return json.dumps(data)
@@ -1082,14 +1198,16 @@
efilters += str(tree.job.skip_if_matcher)
if efilters:
efilters = ' ' + efilters
- hold = ''
+ tags = []
if tree.job.hold_following_changes:
- hold = ' [hold]'
- voting = ''
+ tags.append('[hold]')
if not tree.job.voting:
- voting = ' [nonvoting]'
- self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
- efilters, hold, voting))
+ tags.append('[nonvoting]')
+ if tree.job.mutex:
+ tags.append('[mutex: %s]' % tree.job.mutex)
+ tags = ' '.join(tags)
+ self.log.info("%s%s%s %s" % (istr, repr(tree.job),
+ efilters, tags))
for x in tree.job_trees:
log_jobs(x, indent + 2)
@@ -1369,7 +1487,6 @@
return True
if build_set.merge_state == build_set.PENDING:
return False
- build_set.merge_state = build_set.PENDING
ref = build_set.ref
if hasattr(item.change, 'refspec') and not ref:
self.log.debug("Preparing ref for: %s" % item.change)
@@ -1384,9 +1501,12 @@
else:
self.log.debug("Preparing update repo for: %s" % item.change)
url = self.pipeline.source.getGitUrl(item.change.project)
+ connection_name = self.pipeline.source.connection.connection_name
self.sched.merger.updateRepo(item.change.project.name,
- url, build_set,
+ connection_name, url, build_set,
self.pipeline.precedence)
+ # merge:merge has been emitted properly:
+ build_set.merge_state = build_set.PENDING
return False
def _launchJobs(self, item, jobs):
@@ -1406,7 +1526,7 @@
"for change %s:" % (job, item.change))
def launchJobs(self, item):
- jobs = self.pipeline.findJobsToRun(item)
+ jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
if jobs:
self._launchJobs(item, jobs)
@@ -1535,13 +1655,23 @@
def updateBuildDescriptions(self, build_set):
for build in build_set.getBuilds():
- desc = self.formatDescription(build)
- self.sched.launcher.setBuildDescription(build, desc)
+ try:
+ desc = self.formatDescription(build)
+ self.sched.launcher.setBuildDescription(build, desc)
+ except:
+ # Log the failure and let loop continue
+ self.log.error("Failed to update description for build %s" %
+ (build))
if build_set.previous_build_set:
for build in build_set.previous_build_set.getBuilds():
- desc = self.formatDescription(build)
- self.sched.launcher.setBuildDescription(build, desc)
+ try:
+ desc = self.formatDescription(build)
+ self.sched.launcher.setBuildDescription(build, desc)
+ except:
+ # Log the failure and let loop continue
+ self.log.error("Failed to update description for "
+ "build %s in previous build set" % (build))
def onBuildStarted(self, build):
self.log.debug("Build %s started" % build)
@@ -1552,6 +1682,7 @@
item = build.build_set.item
self.pipeline.setResult(item, build)
+ self.sched.mutex.release(item, build.job)
self.log.debug("Item %s status is now:\n %s" %
(item, item.formatStatus()))
return True
@@ -1564,9 +1695,9 @@
if event.merged:
build_set.commit = event.commit
elif event.updated:
- if not isinstance(item, NullChange):
+ if not isinstance(item.change, NullChange):
build_set.commit = item.change.newrev
- if not build_set.commit:
+ if not build_set.commit and not isinstance(item.change, NullChange):
self.log.info("Unable to merge change %s" % item.change)
self.pipeline.setUnableToMerge(item)
diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py
index 25fe974..cb4501a 100644
--- a/zuul/source/__init__.py
+++ b/zuul/source/__init__.py
@@ -49,13 +49,6 @@
def canMerge(self, change, allow_needs):
"""Determine if change can merge."""
- def maintainCache(self, relevant):
- """Make cache contain relevant changes.
-
- This lets the user supply a list of change objects that are
- still in use. Anything in our cache that isn't in the supplied
- list should be safe to remove from the cache."""
-
def postConfig(self):
"""Called after configuration has been processed."""
diff --git a/zuul/source/gerrit.py b/zuul/source/gerrit.py
index 27e9ae0..828e201 100644
--- a/zuul/source/gerrit.py
+++ b/zuul/source/gerrit.py
@@ -20,6 +20,20 @@
from zuul.source import BaseSource
+# Walk the change dependency tree to find a cycle
+def detect_cycle(change, history=None):
+ if history is None:
+ history = []
+ else:
+ history = history[:]
+ history.append(change.number)
+ for dep in change.needs_changes:
+ if dep.number in history:
+ raise Exception("Dependency cycle detected: %s in %s" % (
+ dep.number, history))
+ detect_cycle(dep, history)
+
+
class GerritSource(BaseSource):
name = 'gerrit'
log = logging.getLogger("zuul.GerritSource")
@@ -60,6 +74,10 @@
data = self.connection.query(change.number)
change._data = data
change.is_merged = self._isMerged(change)
+ if change.is_merged:
+ self.log.debug("Change %s is merged" % (change,))
+ else:
+ self.log.debug("Change %s is not merged" % (change,))
if not head:
return change.is_merged
if not change.is_merged:
@@ -82,7 +100,6 @@
status = data.get('status')
if not status:
return False
- self.log.debug("Change %s status: %s" % (change, status))
if status == 'MERGED':
return True
return False
@@ -129,9 +146,6 @@
def getChange(self, event, project):
if event.change_number:
refresh = False
- if event._needs_refresh:
- refresh = True
- event._needs_refresh = False
change = self._getChange(event.change_number, event.patch_number,
refresh=refresh)
elif event.ref:
@@ -180,7 +194,7 @@
(record.get('number'),))
return changes
- def _getDependsOnFromCommit(self, message):
+ def _getDependsOnFromCommit(self, message, change):
records = []
seen = set()
for match in self.depends_on_re.findall(message):
@@ -190,17 +204,19 @@
continue
seen.add(match)
query = "change:%s" % (match,)
- self.log.debug("Running query %s to find needed changes" %
- (query,))
+ self.log.debug("Updating %s: Running query %s "
+ "to find needed changes" %
+ (change, query,))
records.extend(self.connection.simpleQuery(query))
return records
- def _getNeededByFromCommit(self, change_id):
+ def _getNeededByFromCommit(self, change_id, change):
records = []
seen = set()
query = 'message:%s' % change_id
- self.log.debug("Running query %s to find changes needed-by" %
- (query,))
+ self.log.debug("Updating %s: Running query %s "
+ "to find changes needed-by" %
+ (change, query,))
results = self.connection.simpleQuery(query)
for result in results:
for match in self.depends_on_re.findall(
@@ -210,15 +226,15 @@
key = (result['number'], result['currentPatchSet']['number'])
if key in seen:
continue
- self.log.debug("Found change %s,%s needs %s from commit" %
- (key[0], key[1], change_id))
+ self.log.debug("Updating %s: Found change %s,%s "
+ "needs %s from commit" %
+ (change, key[0], key[1], change_id))
seen.add(key)
records.append(result)
return records
def _updateChange(self, change, history=None):
- self.log.info("Updating information for %s,%s" %
- (change.number, change.patchset))
+ self.log.info("Updating %s" % (change,))
data = self.connection.query(change.number)
change._data = data
@@ -227,11 +243,7 @@
if 'project' not in data:
raise exceptions.ChangeNotFound(change.number, change.patchset)
- # If updated changed came as a dependent on
- # and its project is not defined,
- # then create a 'foreign' project for it in layout
- change.project = self.sched.getProject(data['project'],
- create_foreign=bool(history))
+ change.project = self.sched.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']
max_ps = 0
@@ -258,6 +270,7 @@
if change.is_merged:
# This change is merged, so we don't need to look any further
# for dependencies.
+ self.log.debug("Updating %s: change is merged" % (change,))
return change
if history is None:
@@ -273,21 +286,35 @@
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
- self.log.debug("Getting git-dependent change %s,%s" %
- (dep_num, dep_ps))
+ self.log.debug("Updating %s: Getting git-dependent change %s,%s" %
+ (change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
+ # Because we are not forcing a refresh in _getChange, it
+ # may return without executing this code, so if we are
+ # updating our change to add ourselves to a dependency
+ # cycle, we won't detect it. By explicitly performing a
+ # walk of the dependency tree, we will.
+ detect_cycle(dep, history)
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
- for record in self._getDependsOnFromCommit(data['commitMessage']):
+ for record in self._getDependsOnFromCommit(data['commitMessage'],
+ change):
dep_num = record['number']
dep_ps = record['currentPatchSet']['number']
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
- self.log.debug("Getting commit-dependent change %s,%s" %
- (dep_num, dep_ps))
+ self.log.debug("Updating %s: Getting commit-dependent "
+ "change %s,%s" %
+ (change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
+ # Because we are not forcing a refresh in _getChange, it
+ # may return without executing this code, so if we are
+ # updating our change to add ourselves to a dependency
+ # cycle, we won't detect it. By explicitly performing a
+ # walk of the dependency tree, we will.
+ detect_cycle(dep, history)
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
change.needs_changes = needs_changes
@@ -297,15 +324,17 @@
for needed in data['neededBy']:
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
+ self.log.debug("Updating %s: Getting git-needed change %s,%s" %
+ (change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps)
if (not dep.is_merged) and dep.is_current_patchset:
needed_by_changes.append(dep)
- for record in self._getNeededByFromCommit(data['id']):
+ for record in self._getNeededByFromCommit(data['id'], change):
dep_num = record['number']
dep_ps = record['currentPatchSet']['number']
- self.log.debug("Getting commit-needed change %s,%s" %
- (dep_num, dep_ps))
+ self.log.debug("Updating %s: Getting commit-needed change %s,%s" %
+ (change, dep_num, dep_ps))
# Because a commit needed-by may be a cross-repo
# dependency, cause that change to refresh so that it will
# reference the latest patchset of its Depends-On (this
@@ -322,6 +351,3 @@
def _getGitwebUrl(self, project, sha=None):
return self.connection.getGitwebUrl(project, sha)
-
- def maintainCache(self, relevant):
- self.connection.maintainCache(relevant)
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index 8d9ac85..f982914 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -13,7 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
-import apscheduler.scheduler
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.cron import CronTrigger
import logging
import voluptuous as v
from zuul.model import EventFilter, TriggerEvent
@@ -26,7 +27,7 @@
def __init__(self, trigger_config={}, sched=None, connection=None):
super(TimerTrigger, self).__init__(trigger_config, sched, connection)
- self.apsched = apscheduler.scheduler.Scheduler()
+ self.apsched = BackgroundScheduler()
self.apsched.start()
def _onTrigger(self, pipeline_name, timespec):
@@ -39,8 +40,8 @@
self.log.debug("Adding event %s" % event)
self.sched.addEvent(event)
- def _shutdown(self):
- self.apsched.stop()
+ def stop(self):
+ self.apsched.shutdown()
def getEventFilters(self, trigger_conf):
def toList(item):
@@ -62,7 +63,7 @@
def postConfig(self):
for job in self.apsched.get_jobs():
- self.apsched.unschedule_job(job)
+ job.remove()
for pipeline in self.sched.layout.pipelines.values():
for ef in pipeline.manager.event_filters:
if ef.trigger != self:
@@ -81,14 +82,11 @@
second = parts[5]
else:
second = None
- self.apsched.add_cron_job(self._onTrigger,
- day=dom,
- day_of_week=dow,
- hour=hour,
- minute=minute,
- second=second,
- args=(pipeline.name,
- timespec,))
+ trigger = CronTrigger(day=dom, day_of_week=dow, hour=hour,
+ minute=minute, second=second)
+
+ self.apsched.add_job(self._onTrigger, trigger=trigger,
+ args=(pipeline.name, timespec,))
def getSchema():
diff --git a/zuul/webapp.py b/zuul/webapp.py
index 44c333b..c1c848b 100644
--- a/zuul/webapp.py
+++ b/zuul/webapp.py
@@ -43,16 +43,19 @@
class WebApp(threading.Thread):
log = logging.getLogger("zuul.WebApp")
- def __init__(self, scheduler, port=8001, cache_expiry=1):
+ def __init__(self, scheduler, port=8001, cache_expiry=1,
+ listen_address='0.0.0.0'):
threading.Thread.__init__(self)
self.scheduler = scheduler
+ self.listen_address = listen_address
self.port = port
self.cache_expiry = cache_expiry
self.cache_time = 0
self.cache = None
self.daemon = True
- self.server = httpserver.serve(dec.wsgify(self.app), host='0.0.0.0',
- port=self.port, start_loop=False)
+ self.server = httpserver.serve(
+ dec.wsgify(self.app), host=self.listen_address, port=self.port,
+ start_loop=False)
def run(self):
self.server.serve_forever()