Merge "Support post jobs by supporting rev checkout"
diff --git a/.testr.conf b/.testr.conf
index 5433c07..222ce97 100644
--- a/.testr.conf
+++ b/.testr.conf
@@ -1,4 +1,4 @@
[DEFAULT]
-test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
+test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index c61cea8..f368cb9 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -6,7 +6,7 @@
https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin
.. _`Turbo-Hipster`:
- http://git.openstack.org/cgit/stackforge/turbo-hipster/
+ https://git.openstack.org/cgit/openstack/turbo-hipster/
.. _`Turbo-Hipster Documentation`:
http://turbo-hipster.rtfd.org/
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 98e4bb8..be9570c 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -49,6 +49,11 @@
Port on which the Gearman server is listening.
``port=4730``
+**check_job_registration**
+  Check whether the job is registered with Gearman. When True, a build
+  result of NOT_REGISTERED will be returned if the job is not found.
+ ``check_job_registration=True``
+
gearman_server
""""""""""""""
@@ -394,11 +399,12 @@
approval matching all specified requirements.
*username*
- If present, an approval from this username is required.
+ If present, an approval from this username is required. It is
+ treated as a regular expression.
*email*
If present, an approval with this email address is required. It
- is treated as a regular expression as above.
+ is treated as a regular expression.
*email-filter* (deprecated)
A deprecated alternate spelling of *email*. Only one of *email* or
diff --git a/other-requirements.txt b/other-requirements.txt
new file mode 100644
index 0000000..1ade655
--- /dev/null
+++ b/other-requirements.txt
@@ -0,0 +1,4 @@
+mysql-client [test]
+mysql-server [test]
+postgresql [test]
+postgresql-client [test]
diff --git a/tests/base.py b/tests/base.py
index 7945a0b..2d7f918 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -22,22 +22,22 @@
import os
import pprint
from six.moves import queue as Queue
+from six.moves import urllib
import random
import re
import select
import shutil
+from six.moves import reload_module
import socket
import string
import subprocess
import swiftclient
import threading
import time
-import urllib2
import git
import gear
import fixtures
-import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError
@@ -479,7 +479,7 @@
self.url = url
def read(self):
- res = urlparse.urlparse(self.url)
+ res = urllib.parse.urlparse(self.url)
path = res.path
project = '/'.join(path.split('/')[2:-2])
ret = '001e# service=git-upload-pack\n'
@@ -862,6 +862,28 @@
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
+ # NOTE(notmorgan): Extract logging overrides for specific libraries
+ # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+ # each. This is used to limit the output during test runs from
+ # libraries that zuul depends on such as gear.
+ log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+
+ if log_defaults_from_env:
+ for default in log_defaults_from_env.split(','):
+ try:
+ name, level_str = default.split('=', 1)
+ level = getattr(logging, level_str, logging.DEBUG)
+ self.useFixture(fixtures.FakeLogger(
+ name=name,
+ level=level,
+ format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s'))
+ except ValueError:
+ # NOTE(notmorgan): Invalid format of the log default,
+                    # skip and don't try to apply a logger for the
+ # specified module
+ pass
+
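For reference, a minimal illustration of the OS_LOG_DEFAULTS format this block parses: comma-separated logger=LEVEL pairs, mirroring the tox.ini default added later in this change.

    # Illustration only: equivalent to exporting the variable before a test
    # run.  Unknown level names fall back to DEBUG, and entries that are not
    # in name=LEVEL form are skipped by the ValueError handler above.
    import os
    os.environ['OS_LOG_DEFAULTS'] = 'gear.Server=INFO,gear.Client=INFO'
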
class ZuulTestCase(BaseTestCase):
@@ -876,11 +898,13 @@
self.test_root = os.path.join(tmp_root, "zuul-test")
self.upstream_root = os.path.join(self.test_root, "upstream")
self.git_root = os.path.join(self.test_root, "git")
+ self.state_root = os.path.join(self.test_root, "lib")
if os.path.exists(self.test_root):
shutil.rmtree(self.test_root)
os.makedirs(self.test_root)
os.makedirs(self.upstream_root)
+ os.makedirs(self.state_root)
# Make per test copy of Configuration.
self.setup_config()
@@ -888,6 +912,7 @@
os.path.join(FIXTURE_DIR,
self.config.get('zuul', 'layout_config')))
self.config.set('merger', 'git_dir', self.git_root)
+ self.config.set('zuul', 'state_dir', self.state_root)
# For each project in config:
self.init_repo("org/project")
@@ -914,8 +939,8 @@
os.environ['STATSD_PORT'] = str(self.statsd.port)
self.statsd.start()
# the statsd client object is configured in the statsd module import
- reload(statsd)
- reload(zuul.scheduler)
+ reload_module(statsd)
+ reload_module(zuul.scheduler)
self.gearman_server = FakeGearmanServer()
@@ -944,12 +969,12 @@
self.sched.registerConnections(self.connections)
def URLOpenerFactory(*args, **kw):
- if isinstance(args[0], urllib2.Request):
+ if isinstance(args[0], urllib.request.Request):
return old_urlopen(*args, **kw)
return FakeURLOpener(self.upstream_root, *args, **kw)
- old_urlopen = urllib2.urlopen
- urllib2.urlopen = URLOpenerFactory
+ old_urlopen = urllib.request.urlopen
+ urllib.request.urlopen = URLOpenerFactory
self.merge_server = zuul.merger.server.MergeServer(self.config,
self.connections)
@@ -1304,9 +1329,11 @@
start = time.time()
while True:
if time.time() - start > 10:
- print 'queue status:',
- print ' '.join(self.eventQueuesEmpty())
- print self.areAllBuildsWaiting()
+ self.log.debug("Queue status:")
+ for queue in self.event_queues:
+ self.log.debug(" %s: %s" % (queue, queue.empty()))
+ self.log.debug("All builds waiting: %s" %
+ (self.areAllBuildsWaiting(),))
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
self.worker.lock.acquire()
@@ -1344,8 +1371,8 @@
for pipeline in self.sched.layout.pipelines.values():
for queue in pipeline.queues:
if len(queue.queue) != 0:
- print 'pipeline %s queue %s contents %s' % (
- pipeline.name, queue.name, queue.queue)
+ print('pipeline %s queue %s contents %s' % (
+ pipeline.name, queue.name, queue.queue))
self.assertEqual(len(queue.queue), 0,
"Pipelines queues should be empty")
diff --git a/tests/fixtures/layout-requirement-username.yaml b/tests/fixtures/layout-requirement-username.yaml
index 7a549f0..f9e6477 100644
--- a/tests/fixtures/layout-requirement-username.yaml
+++ b/tests/fixtures/layout-requirement-username.yaml
@@ -3,7 +3,7 @@
manager: IndependentPipelineManager
require:
approval:
- - username: jenkins
+ - username: ^(jenkins|zuul)$
trigger:
gerrit:
- event: comment-added
diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py
index 3dc3234..46a8c7c 100644
--- a/tests/test_layoutvalidator.py
+++ b/tests/test_layoutvalidator.py
@@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-import ConfigParser
+from six.moves import configparser as ConfigParser
import os
import re
@@ -33,13 +33,13 @@
class TestLayoutValidator(testtools.TestCase):
def test_layouts(self):
"""Test layout file validation"""
- print
+ print()
errors = []
for fn in os.listdir(os.path.join(FIXTURE_DIR, 'layouts')):
m = LAYOUT_RE.match(fn)
if not m:
continue
- print fn
+ print(fn)
# Load any .conf file by the same name but .conf extension.
config_file = ("%s.conf" %
@@ -69,7 +69,7 @@
fn)
except voluptuous.Invalid as e:
error = str(e)
- print ' ', error
+ print(' ', error)
if error in errors:
raise Exception("Error has already been tested: %s" %
error)
diff --git a/tests/test_model.py b/tests/test_model.py
index 2711618..ac19383 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -12,6 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+import os
+import random
+
+import fixtures
+
from zuul import change_matcher as cm
from zuul import model
@@ -62,3 +67,76 @@
metajob = model.Job('^job')
job.copy(metajob)
self._assert_job_booleans_are_not_none(job)
+
+
+class TestJobTimeData(BaseTestCase):
+ def setUp(self):
+ super(TestJobTimeData, self).setUp()
+ self.tmp_root = self.useFixture(fixtures.TempDir(
+ rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+ ).path
+
+ def test_empty_timedata(self):
+ path = os.path.join(self.tmp_root, 'job-name')
+ self.assertFalse(os.path.exists(path))
+ self.assertFalse(os.path.exists(path + '.tmp'))
+ td = model.JobTimeData(path)
+ self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+
+ def test_save_reload(self):
+ path = os.path.join(self.tmp_root, 'job-name')
+ self.assertFalse(os.path.exists(path))
+ self.assertFalse(os.path.exists(path + '.tmp'))
+ td = model.JobTimeData(path)
+ self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+ success_times = []
+ failure_times = []
+ results = []
+ for x in range(10):
+ success_times.append(int(random.random() * 1000))
+ failure_times.append(int(random.random() * 1000))
+ results.append(0)
+ results.append(1)
+ random.shuffle(results)
+ s = f = 0
+ for result in results:
+ if result:
+ td.add(failure_times[f], 'FAILURE')
+ f += 1
+ else:
+ td.add(success_times[s], 'SUCCESS')
+ s += 1
+ self.assertEqual(td.success_times, success_times)
+ self.assertEqual(td.failure_times, failure_times)
+ self.assertEqual(td.results, results[10:])
+ td.save()
+ self.assertTrue(os.path.exists(path))
+ self.assertFalse(os.path.exists(path + '.tmp'))
+ td = model.JobTimeData(path)
+ td.load()
+ self.assertEqual(td.success_times, success_times)
+ self.assertEqual(td.failure_times, failure_times)
+ self.assertEqual(td.results, results[10:])
+
+
+class TestTimeDataBase(BaseTestCase):
+ def setUp(self):
+ super(TestTimeDataBase, self).setUp()
+ self.tmp_root = self.useFixture(fixtures.TempDir(
+ rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+ ).path
+ self.db = model.TimeDataBase(self.tmp_root)
+
+ def test_timedatabase(self):
+ self.assertEqual(self.db.getEstimatedTime('job-name'), 0)
+ self.db.update('job-name', 50, 'SUCCESS')
+ self.assertEqual(self.db.getEstimatedTime('job-name'), 50)
+ self.db.update('job-name', 100, 'SUCCESS')
+ self.assertEqual(self.db.getEstimatedTime('job-name'), 75)
+ for x in range(10):
+ self.db.update('job-name', 100, 'SUCCESS')
+ self.assertEqual(self.db.getEstimatedTime('job-name'), 100)
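For context, a minimal sketch (not zuul's actual model code) of the estimation behaviour the TestTimeDataBase assertions imply: the estimate is the mean of a rolling window of the last ten successful run times, or 0 when nothing has been recorded.

    class TimeDataBaseSketch(object):
        def __init__(self):
            self.success_times = {}      # job name -> recent SUCCESS durations

        def update(self, name, elapsed, result):
            if result != 'SUCCESS':
                return
            times = self.success_times.setdefault(name, [])
            times.append(elapsed)
            del times[:-10]              # keep only the last ten samples

        def getEstimatedTime(self, name):
            times = self.success_times.get(name)
            if not times:
                return 0
            return int(sum(times) / len(times))
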
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index fe7c7cc..ea512a2 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -20,11 +20,10 @@
import re
import shutil
import time
-import urllib
-import urllib2
import yaml
import git
+from six.moves import urllib
import testtools
import zuul.change_matcher
@@ -34,7 +33,6 @@
import zuul.reporter.smtp
from tests.base import (
- BaseTestCase,
ZuulTestCase,
repack_repo,
)
@@ -44,40 +42,6 @@
'%(levelname)-8s %(message)s')
-class TestSchedulerConfigParsing(BaseTestCase):
-
- def test_parse_skip_if(self):
- job_yaml = """
-jobs:
- - name: job_name
- skip-if:
- - project: ^project_name$
- branch: ^stable/icehouse$
- all-files-match-any:
- - ^filename$
- - project: ^project2_name$
- all-files-match-any:
- - ^filename2$
- """.strip()
- data = yaml.load(job_yaml)
- config_job = data.get('jobs')[0]
- sched = zuul.scheduler.Scheduler({})
- cm = zuul.change_matcher
- expected = cm.MatchAny([
- cm.MatchAll([
- cm.ProjectMatcher('^project_name$'),
- cm.BranchMatcher('^stable/icehouse$'),
- cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
- ]),
- cm.MatchAll([
- cm.ProjectMatcher('^project2_name$'),
- cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
- ]),
- ])
- matcher = sched._parseSkipIf(config_job)
- self.assertEqual(expected, matcher)
-
-
class TestScheduler(ZuulTestCase):
def test_jobs_launched(self):
@@ -495,6 +459,46 @@
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
+ def _test_time_database(self, iteration):
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ time.sleep(2)
+
+ data = json.loads(self.sched.formatStatusJSON())
+ found_job = None
+ for pipeline in data['pipelines']:
+ if pipeline['name'] != 'gate':
+ continue
+ for queue in pipeline['change_queues']:
+ for head in queue['heads']:
+ for item in head:
+ for job in item['jobs']:
+ if job['name'] == 'project-merge':
+ found_job = job
+ break
+
+ self.assertIsNotNone(found_job)
+ if iteration == 1:
+ self.assertIsNotNone(found_job['estimated_time'])
+ self.assertIsNone(found_job['remaining_time'])
+ else:
+ self.assertIsNotNone(found_job['estimated_time'])
+ self.assertTrue(found_job['estimated_time'] >= 2)
+ self.assertIsNotNone(found_job['remaining_time'])
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ def test_time_database(self):
+ "Test the time database"
+
+ self._test_time_database(1)
+ self._test_time_database(2)
+
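For orientation, the nesting of formatStatusJSON() that the loops above walk; the field values here are invented.

    status = {
        'pipelines': [{
            'name': 'gate',
            'change_queues': [{
                'heads': [[          # each head is a list of queue items
                    {'jobs': [{
                        'name': 'project-merge',
                        'estimated_time': 2,
                        'remaining_time': 1,
                    }]},
                ]],
            }],
        }],
    }
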
def test_two_failed_changes_at_head(self):
"Test that changes are reparented correctly if 2 fail at head"
@@ -600,6 +604,36 @@
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
+ def test_parse_skip_if(self):
+ job_yaml = """
+jobs:
+ - name: job_name
+ skip-if:
+ - project: ^project_name$
+ branch: ^stable/icehouse$
+ all-files-match-any:
+ - ^filename$
+ - project: ^project2_name$
+ all-files-match-any:
+ - ^filename2$
+ """.strip()
+ data = yaml.load(job_yaml)
+ config_job = data.get('jobs')[0]
+ cm = zuul.change_matcher
+ expected = cm.MatchAny([
+ cm.MatchAll([
+ cm.ProjectMatcher('^project_name$'),
+ cm.BranchMatcher('^stable/icehouse$'),
+ cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
+ ]),
+ cm.MatchAll([
+ cm.ProjectMatcher('^project2_name$'),
+ cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
+ ]),
+ ])
+ matcher = self.sched._parseSkipIf(config_job)
+ self.assertEqual(expected, matcher)
+
def test_patch_order(self):
"Test that dependent patches are tested in the right order"
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -1449,7 +1483,7 @@
self.worker.build_history = []
path = os.path.join(self.git_root, "org/project")
- print repack_repo(path)
+ print(repack_repo(path))
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
@@ -1474,9 +1508,9 @@
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.addPatchset(large=True)
path = os.path.join(self.upstream_root, "org/project1")
- print repack_repo(path)
+ print(repack_repo(path))
path = os.path.join(self.git_root, "org/project1")
- print repack_repo(path)
+ print(repack_repo(path))
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
@@ -2240,8 +2274,8 @@
port = self.webapp.server.socket.getsockname()[1]
- req = urllib2.Request("http://localhost:%s/status.json" % port)
- f = urllib2.urlopen(req)
+ req = urllib.request.Request("http://localhost:%s/status.json" % port)
+ f = urllib.request.urlopen(req)
headers = f.info()
self.assertIn('Content-Length', headers)
self.assertIn('Content-Type', headers)
@@ -2846,7 +2880,8 @@
port = self.webapp.server.socket.getsockname()[1]
- f = urllib.urlopen("http://localhost:%s/status.json" % port)
+ req = urllib.request.Request("http://localhost:%s/status.json" % port)
+ f = urllib.request.urlopen(req)
data = f.read()
self.worker.hold_jobs_in_build = False
diff --git a/tests/test_webapp.py b/tests/test_webapp.py
index b127c51..94f097a 100644
--- a/tests/test_webapp.py
+++ b/tests/test_webapp.py
@@ -16,7 +16,8 @@
# under the License.
import json
-import urllib2
+
+from six.moves import urllib
from tests.base import ZuulTestCase
@@ -44,41 +45,41 @@
def test_webapp_status(self):
"Test that we can filter to only certain changes in the webapp."
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status" % self.port)
- f = urllib2.urlopen(req)
+ f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertIn('pipelines', data)
def test_webapp_status_compat(self):
# testing compat with status.json
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status.json" % self.port)
- f = urllib2.urlopen(req)
+ f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertIn('pipelines', data)
def test_webapp_bad_url(self):
# do we 404 correctly
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status/foo" % self.port)
- self.assertRaises(urllib2.HTTPError, urllib2.urlopen, req)
+ self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)
def test_webapp_find_change(self):
# can we filter by change id
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status/change/1,1" % self.port)
- f = urllib2.urlopen(req)
+ f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertEqual(1, len(data), data)
self.assertEqual("org/project", data[0]['project'])
- req = urllib2.Request(
+ req = urllib.request.Request(
"http://localhost:%s/status/change/2,1" % self.port)
- f = urllib2.urlopen(req)
+ f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertEqual(1, len(data), data)
diff --git a/tools/zuul-changes.py b/tools/zuul-changes.py
index 9dbf504..8b854c7 100755
--- a/tools/zuul-changes.py
+++ b/tools/zuul-changes.py
@@ -35,7 +35,7 @@
if not change['live']:
continue
cid, cps = change['id'].split(',')
- print (
+ print(
"zuul enqueue --trigger gerrit --pipeline %s "
"--project %s --change %s,%s" % (
options.pipeline_name,
diff --git a/tox.ini b/tox.ini
index 79ea939..06ccbcd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,6 +9,7 @@
STATSD_PORT=8125
VIRTUAL_ENV={envdir}
OS_TEST_TIMEOUT=30
+ OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO}
passenv = ZUUL_TEST_ROOT
usedevelop = True
install_command = pip install {opts} {packages}
@@ -17,7 +18,17 @@
commands =
python setup.py testr --slowest --testr-args='{posargs}'
+[testenv:bindep]
+# Do not install any requirements. We want this to be fast and work even if
+# system dependencies are missing, since it's used to tell you what system
+# dependencies are missing! This also means that bindep must be installed
+# separately, outside of the requirements files.
+deps = bindep
+commands = bindep test
+
[testenv:pep8]
+# streamer is python3 only, so we need to run flake8 in python3
+basepython = python3
commands = flake8 {posargs}
[testenv:cover]
diff --git a/zuul/ansible/library/zuul_console.py b/zuul/ansible/library/zuul_console.py
index bb6ec7b..78f3249 100644
--- a/zuul/ansible/library/zuul_console.py
+++ b/zuul/ansible/library/zuul_console.py
@@ -109,12 +109,20 @@
def followConsole(self, console, conn):
while True:
- r = [console.file, conn]
- e = [console.file, conn]
- r, w, e = select.select(r, [], e)
+ # As long as we have unread data, keep reading/sending
+ while True:
+ chunk = console.file.read(4096)
+ if chunk:
+ conn.send(chunk)
+ else:
+ break
- if console.file in e:
- return True
+ # At this point, we are waiting for more data to be written
+ time.sleep(0.5)
+
+            # Check to see if the remote end has sent any data; if so,
+            # discard it
+ r, w, e = select.select([conn], [], [conn], 0)
if conn in e:
return False
if conn in r:
@@ -124,19 +132,15 @@
if not ret:
return False
- if console.file in r:
- line = console.file.readline()
- if line:
- conn.send(line)
- time.sleep(0.5)
- try:
- st = os.stat(console.path)
- if (st.st_ino != console.stat.st_ino or
- st.st_size < console.size):
- return True
- except Exception:
+ # See if the file has been truncated
+ try:
+ st = os.stat(console.path)
+ if (st.st_ino != console.stat.st_ino or
+ st.st_size < console.size):
return True
- console.size = st.st_size
+ except Exception:
+ return True
+ console.size = st.st_size
def handleOneConnection(self, conn):
# FIXME: this won't notice disconnects until it tries to send
@@ -166,14 +170,14 @@
def test():
- s = Server('/tmp/console.log', 8088)
+ s = Server('/tmp/console.html', 8088)
s.run()
def main():
module = AnsibleModule(
argument_spec=dict(
- path=dict(default='/tmp/console.log'),
+ path=dict(default='/tmp/console.html'),
port=dict(default=8088, type='int'),
)
)
diff --git a/zuul/ansible/library/zuul_log.py b/zuul/ansible/library/zuul_log.py
new file mode 100644
index 0000000..4b377d9
--- /dev/null
+++ b/zuul/ansible/library/zuul_log.py
@@ -0,0 +1,58 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 IBM Corp.
+# Copyright (c) 2016 Red Hat
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software. If not, see <http://www.gnu.org/licenses/>.
+
+import datetime
+
+
+class Console(object):
+ def __enter__(self):
+ self.logfile = open('/tmp/console.html', 'a', 0)
+ return self
+
+ def __exit__(self, etype, value, tb):
+ self.logfile.close()
+
+ def addLine(self, ln):
+ ts = datetime.datetime.now()
+ outln = '%s | %s' % (str(ts), ln)
+ self.logfile.write(outln)
+
+
+def log(msg):
+ if not isinstance(msg, list):
+ msg = [msg]
+ with Console() as console:
+ for line in msg:
+ console.addLine("[Zuul] %s\n" % line)
+
+
+def main():
+ module = AnsibleModule(
+ argument_spec=dict(
+ msg=dict(required=True, type='raw'),
+ )
+ )
+
+ p = module.params
+ log(p['msg'])
+ module.exit_json(changed=True)
+
+from ansible.module_utils.basic import * # noqa
+
+if __name__ == '__main__':
+ main()
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index 6fc8f2d..7689fb3 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -16,40 +16,95 @@
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import datetime
+import getpass
+import os
import subprocess
+import threading
class Console(object):
def __enter__(self):
- self.logfile = open('/tmp/console.log', 'w+')
+ self.logfile = open('/tmp/console.html', 'a', 0)
return self
def __exit__(self, etype, value, tb):
self.logfile.close()
def addLine(self, ln):
+        # Note this format, with its delimiter, is "inspired" by the old
+ # Jenkins format but with microsecond resolution instead of
+ # millisecond. It is kept so log parsing/formatting remains
+ # consistent.
ts = datetime.datetime.now()
- outln = '%s %s' % (str(ts), ln)
+ outln = '%s | %s' % (ts, ln)
self.logfile.write(outln)
+def get_env():
+ env = {}
+ env['HOME'] = os.path.expanduser('~')
+ env['USER'] = getpass.getuser()
+
+ # Known locations for PAM mod_env sources
+ for fn in ['/etc/environment', '/etc/default/locale']:
+ if os.path.exists(fn):
+ with open(fn) as f:
+ for line in f:
+ if not line:
+ continue
+ if line[0] == '#':
+ continue
+ if '=' not in line:
+ continue
+ k, v = line.strip().split('=')
+ for q in ["'", '"']:
+ if v[0] == q:
+ v = v.strip(q)
+ env[k] = v
+ return env
+
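A worked example of the quote stripping above; the line and value are illustrative.

    line = 'LANG="en_US.UTF-8"\n'
    k, v = line.strip().split('=')
    for q in ["'", '"']:
        if v[0] == q:
            v = v.strip(q)
    assert (k, v) == ('LANG', 'en_US.UTF-8')
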
+
+def follow(fd):
+ newline_warning = False
+ with Console() as console:
+ while True:
+ line = fd.readline()
+ if not line:
+ break
+ if not line.endswith('\n'):
+ line += '\n'
+ newline_warning = True
+ console.addLine(line)
+ if newline_warning:
+ console.addLine('[Zuul] No trailing newline\n')
+
+
def run(cwd, cmd, args):
+ env = get_env()
+ env.update(args)
proc = subprocess.Popen(
- [cmd],
+ ['/bin/bash', '-l', '-c', cmd],
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
- env=args,
+ env=env,
)
- with Console() as console:
- while True:
- line = proc.stdout.readline()
- if not line:
- break
- console.addLine(line)
+ t = threading.Thread(target=follow, args=(proc.stdout,))
+ t.daemon = True
+ t.start()
ret = proc.wait()
+ # Give the thread that is writing the console log up to 10 seconds
+ # to catch up and exit. If it hasn't done so by then, it is very
+    # likely stuck in readline() because it spawned a child that is
+ # holding stdout or stderr open.
+ t.join(10)
+ with Console() as console:
+ if t.isAlive():
+ console.addLine("[Zuul] standard output/error still open "
+ "after child exited")
+ console.addLine("[Zuul] Task exit code: %s\n" % ret)
return ret
@@ -63,7 +118,8 @@
)
p = module.params
- ret = run(p['cwd'], p['command'], p['parameters'])
+ env = p['parameters'].copy()
+ ret = run(p['cwd'], p['command'], env)
if ret == 0:
module.exit_json(changed=True, rc=ret)
else:
diff --git a/zuul/ansible/plugins/callback_plugins/timeout.py b/zuul/ansible/plugins/callback_plugins/timeout.py
index 245e988..1cfd10d 100644
--- a/zuul/ansible/plugins/callback_plugins/timeout.py
+++ b/zuul/ansible/plugins/callback_plugins/timeout.py
@@ -46,12 +46,7 @@
self._elapsed_time += task_time
if self._play and result._host:
manager = self._play.get_variable_manager()
- facts = dict(elapsed_time=self._elapsed_time)
-
- overall_timeout = manager.extra_vars.get('timeout')
- if overall_timeout is not None:
- timeout = int(overall_timeout) - int(self._elapsed_time)
- facts['timeout'] = timeout
+ facts = dict(elapsed_time=int(self._elapsed_time))
manager.set_nonpersistent_facts(result._host, facts)
self._task_start_time = None
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 2902c50..5ffd431 100644
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -14,8 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import six
from six.moves import configparser as ConfigParser
-import cStringIO
import extras
import logging
import logging.config
@@ -47,7 +47,7 @@
yappi.start()
else:
yappi.stop()
- yappi_out = cStringIO.StringIO()
+ yappi_out = six.BytesIO()
yappi.get_func_stats().print_all(out=yappi_out)
yappi.get_thread_stats().print_all(out=yappi_out)
log.debug(yappi_out.getvalue())
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 59ac419..1ce2828 100644
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -154,7 +154,7 @@
running_items = client.get_running_jobs()
if len(running_items) == 0:
- print "No jobs currently running"
+ print("No jobs currently running")
return True
all_fields = self._show_running_jobs_columns()
@@ -181,7 +181,7 @@
v += all_fields[f]['append']
values.append(v)
table.add_row(values)
- print table
+ print(table)
return True
def _epoch_to_relative_time(self, epoch):
diff --git a/zuul/cmd/launcher.py b/zuul/cmd/launcher.py
index 86266b3..49643ae 100644
--- a/zuul/cmd/launcher.py
+++ b/zuul/cmd/launcher.py
@@ -24,12 +24,14 @@
import logging
import os
+import socket
import sys
import signal
import zuul.cmd
+import zuul.launcher.ansiblelaunchserver
-# No zuul imports here because they pull in paramiko which must not be
+# No zuul imports that pull in paramiko here; it must not be
# imported until after the daemonization.
# https://github.com/paramiko/paramiko/issues/59
# Similar situation with gear and statsd.
@@ -46,53 +48,64 @@
parser.add_argument('--version', dest='version', action='version',
version=self._get_version(),
help='show zuul version')
+ parser.add_argument('--keep-jobdir', dest='keep_jobdir',
+ action='store_true',
+ help='keep local jobdirs after run completes')
+ parser.add_argument('command',
+ choices=zuul.launcher.ansiblelaunchserver.COMMANDS,
+ nargs='?')
+
self.args = parser.parse_args()
- def reconfigure_handler(self, signum, frame):
- signal.signal(signal.SIGHUP, signal.SIG_IGN)
- self.log.debug("Reconfiguration triggered")
- self.read_config()
- self.setup_logging('launcher', 'log_config')
- try:
- self.launcher.reconfigure(self.config)
- except Exception:
- self.log.exception("Reconfiguration failed:")
- signal.signal(signal.SIGHUP, self.reconfigure_handler)
+ def send_command(self, cmd):
+ if self.config.has_option('zuul', 'state_dir'):
+ state_dir = os.path.expanduser(
+ self.config.get('zuul', 'state_dir'))
+ else:
+ state_dir = '/var/lib/zuul'
+ path = os.path.join(state_dir, 'launcher.socket')
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(path)
+ s.sendall('%s\n' % cmd)
- def exit_handler(self, signum, frame):
- signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+ def exit_handler(self):
self.launcher.stop()
self.launcher.join()
- def main(self):
+ def main(self, daemon=True):
# See comment at top of file about zuul imports
- import zuul.launcher.ansiblelaunchserver
self.setup_logging('launcher', 'log_config')
self.log = logging.getLogger("zuul.Launcher")
LaunchServer = zuul.launcher.ansiblelaunchserver.LaunchServer
- self.launcher = LaunchServer(self.config)
+ self.launcher = LaunchServer(self.config,
+ keep_jobdir=self.args.keep_jobdir)
self.launcher.start()
- signal.signal(signal.SIGHUP, self.reconfigure_handler)
- signal.signal(signal.SIGUSR1, self.exit_handler)
signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
- while True:
- try:
- signal.pause()
- except KeyboardInterrupt:
- print "Ctrl + C: asking launcher to exit nicely...\n"
- self.exit_handler(signal.SIGINT, None)
- sys.exit(0)
+ if daemon:
+ self.launcher.join()
+ else:
+ while True:
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ print("Ctrl + C: asking launcher to exit nicely...\n")
+ self.exit_handler()
+ sys.exit(0)
def main():
server = Launcher()
server.parse_arguments()
-
server.read_config()
+
+ if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS:
+ server.send_command(server.args.command)
+ sys.exit(0)
+
server.configure_connections()
if server.config.has_option('launcher', 'pidfile'):
@@ -102,10 +115,10 @@
pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
if server.args.nodaemon:
- server.main()
+ server.main(False)
else:
with daemon.DaemonContext(pidfile=pid):
- server.main()
+ server.main(True)
if __name__ == "__main__":
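For illustration, a minimal client for the launcher command socket that send_command() above implements; the /var/lib/zuul fallback is the same default used in that function, and the helper name is made up.

    import os
    import socket

    def send_launcher_command(cmd, state_dir='/var/lib/zuul'):
        # The server reads newline-terminated command names (see COMMANDS in
        # zuul.launcher.ansiblelaunchserver).
        path = os.path.join(state_dir, 'launcher.socket')
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        s.connect(path)
        s.sendall('%s\n' % cmd)
        s.close()

    # e.g. send_launcher_command('graceful')
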
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
index df215fd..797a990 100644
--- a/zuul/cmd/merger.py
+++ b/zuul/cmd/merger.py
@@ -68,7 +68,7 @@
try:
signal.pause()
except KeyboardInterrupt:
- print "Ctrl + C: asking merger to exit nicely...\n"
+ print("Ctrl + C: asking merger to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)
@@ -89,9 +89,7 @@
f.close()
os.unlink(test_fn)
except Exception:
- print
- print "Unable to write to state directory: %s" % state_dir
- print
+ print("\nUnable to write to state directory: %s\n" % state_dir)
raise
if server.config.has_option('merger', 'pidfile'):
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index b1cd050..1fb4a32 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -86,7 +86,8 @@
import zuul.trigger.gerrit
logging.basicConfig(level=logging.DEBUG)
- self.sched = zuul.scheduler.Scheduler(self.config)
+ self.sched = zuul.scheduler.Scheduler(self.config,
+ testonly=True)
self.configure_connections()
layout = self.sched.testConfig(self.config.get('zuul',
'layout_config'),
@@ -106,7 +107,7 @@
jobs.add(v)
for job in sorted(layout.jobs):
if job not in jobs:
- print "Job %s not defined" % job
+ print("Job %s not defined" % job)
failure = True
return failure
@@ -116,18 +117,18 @@
if child_pid == 0:
os.close(pipe_write)
self.setup_logging('gearman_server', 'log_config')
- import gear
+ import zuul.lib.gearserver
statsd_host = os.environ.get('STATSD_HOST')
statsd_port = int(os.environ.get('STATSD_PORT', 8125))
if self.config.has_option('gearman_server', 'listen_address'):
host = self.config.get('gearman_server', 'listen_address')
else:
host = None
- gear.Server(4730,
- host=host,
- statsd_host=statsd_host,
- statsd_port=statsd_port,
- statsd_prefix='zuul.geard')
+ zuul.lib.gearserver.GearServer(4730,
+ host=host,
+ statsd_host=statsd_host,
+ statsd_port=statsd_port,
+ statsd_prefix='zuul.geard')
# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
@@ -195,7 +196,7 @@
try:
signal.pause()
except KeyboardInterrupt:
- print "Ctrl + C: asking scheduler to exit nicely...\n"
+ print("Ctrl + C: asking scheduler to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index a1854f4..62891cd 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -18,11 +18,11 @@
import json
import time
from six.moves import queue as Queue
+from six.moves import urllib
import paramiko
import logging
import pprint
import voluptuous as v
-import urllib2
from zuul.connection import BaseConnection
from zuul.model import TriggerEvent
@@ -32,7 +32,7 @@
"""Move events from Gerrit to the scheduler."""
log = logging.getLogger("zuul.GerritEventConnector")
- delay = 5.0
+ delay = 10.0
def __init__(self, connection):
super(GerritEventConnector, self).__init__()
@@ -388,10 +388,10 @@
url = "%s/p/%s/info/refs?service=git-upload-pack" % (
self.baseurl, project)
try:
- data = urllib2.urlopen(url).read()
+ data = urllib.request.urlopen(url).read()
except:
self.log.error("Cannot get references from %s" % url)
- raise # keeps urllib2 error informations
+            raise  # keeps urllib error information
ret = {}
read_headers = False
read_advertisement = False
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 844b390..95fc2fa 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -14,7 +14,6 @@
import json
import logging
-import multiprocessing
import os
import re
import shutil
@@ -23,76 +22,217 @@
import subprocess
import tempfile
import threading
+import time
import traceback
+import Queue
import uuid
import gear
import yaml
import jenkins_jobs.builder
+import jenkins_jobs.formatter
import zmq
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
+from zuul.lib import commandsocket
+
+ANSIBLE_WATCHDOG_GRACE = 5 * 60
+ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
+ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
+
+
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful',
+ 'verbose', 'unverbose']
+
+
+def boolify(x):
+ if isinstance(x, str):
+ return bool(int(x))
+ return bool(x)
+
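A quick illustration of why boolify exists: job parameters arrive over Gearman as strings, so plain bool() would treat '0' as true.

    assert boolify('0') is False     # bool('0') alone would be True
    assert boolify('1') is True
    assert boolify(False) is False
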
+
+class LaunchGearWorker(gear.Worker):
+ def __init__(self, *args, **kw):
+ self.__launch_server = kw.pop('launch_server')
+ super(LaunchGearWorker, self).__init__(*args, **kw)
+
+ def handleNoop(self, packet):
+ workers = len(self.__launch_server.node_workers)
+ delay = (workers ** 2) / 1000.0
+ time.sleep(delay)
+ return super(LaunchGearWorker, self).handleNoop(packet)
+
+
+class NodeGearWorker(gear.Worker):
+ MASS_DO = 101
+
+ def sendMassDo(self, functions):
+ data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions])
+ self.broadcast_lock.acquire()
+ try:
+ p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
+ self.broadcast(p)
+ finally:
+ self.broadcast_lock.release()
+
+
+class Watchdog(object):
+ def __init__(self, timeout, function, args):
+ self.timeout = timeout
+ self.function = function
+ self.args = args
+ self.thread = threading.Thread(target=self._run)
+ self.thread.daemon = True
+
+ def _run(self):
+ while self._running and time.time() < self.end:
+ time.sleep(10)
+ if self._running:
+ self.function(*self.args)
+
+ def start(self):
+ self._running = True
+ self.end = time.time() + self.timeout
+ self.thread.start()
+
+ def stop(self):
+ self._running = False
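For illustration, a usage sketch of the Watchdog above. The assumption that the grace period is added on top of a job timeout, and the callback and child process here, are examples rather than the launcher's actual wiring.

    import subprocess

    def _on_timeout(proc):
        proc.kill()  # example action; a real handler would abort the job

    child = subprocess.Popen(['sleep', '5'])
    watchdog = Watchdog(ANSIBLE_DEFAULT_TIMEOUT + ANSIBLE_WATCHDOG_GRACE,
                        _on_timeout, (child,))
    watchdog.start()
    try:
        child.wait()
    finally:
        watchdog.stop()  # cancel once the watched process has finished
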
class JobDir(object):
- def __init__(self):
+ def __init__(self, keep=False):
+ self.keep = keep
self.root = tempfile.mkdtemp()
- self.git_root = os.path.join(self.root, 'git')
- os.makedirs(self.git_root)
self.ansible_root = os.path.join(self.root, 'ansible')
os.makedirs(self.ansible_root)
- self.plugins_root = os.path.join(self.ansible_root, 'plugins')
- os.makedirs(self.plugins_root)
+ self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
self.inventory = os.path.join(self.ansible_root, 'inventory')
self.playbook = os.path.join(self.ansible_root, 'playbook')
self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
self.config = os.path.join(self.ansible_root, 'ansible.cfg')
self.script_root = os.path.join(self.ansible_root, 'scripts')
+ self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
os.makedirs(self.script_root)
+ self.staging_root = os.path.join(self.root, 'staging')
+ os.makedirs(self.staging_root)
def __enter__(self):
return self
def __exit__(self, etype, value, tb):
- shutil.rmtree(self.root)
+ if not self.keep:
+ shutil.rmtree(self.root)
class LaunchServer(object):
log = logging.getLogger("zuul.LaunchServer")
- section_re = re.compile('site "(.*?)"')
+ site_section_re = re.compile('site "(.*?)"')
+ node_section_re = re.compile('node "(.*?)"')
- def __init__(self, config):
+ def __init__(self, config, keep_jobdir=False):
self.config = config
+ self.options = dict(
+ verbose=False
+ )
+ self.keep_jobdir = keep_jobdir
self.hostname = socket.gethostname()
+ self.registered_functions = set()
self.node_workers = {}
- self.mpmanager = multiprocessing.Manager()
- self.jobs = self.mpmanager.dict()
- self.builds = self.mpmanager.dict()
- self.zmq_send_queue = multiprocessing.JoinableQueue()
- self.termination_queue = multiprocessing.JoinableQueue()
+ self.jobs = {}
+ self.builds = {}
+ self.zmq_send_queue = Queue.Queue()
+ self.termination_queue = Queue.Queue()
self.sites = {}
+ self.static_nodes = {}
+ self.command_map = dict(
+ reconfigure=self.reconfigure,
+ stop=self.stop,
+ pause=self.pause,
+ unpause=self.unpause,
+ release=self.release,
+ graceful=self.graceful,
+ verbose=self.verboseOn,
+ unverbose=self.verboseOff,
+ )
+
+ if config.has_option('launcher', 'accept_nodes'):
+ self.accept_nodes = config.getboolean('launcher',
+ 'accept_nodes')
+ else:
+ self.accept_nodes = True
+ self.config_accept_nodes = self.accept_nodes
+
+ if self.config.has_option('zuul', 'state_dir'):
+ state_dir = os.path.expanduser(
+ self.config.get('zuul', 'state_dir'))
+ else:
+ state_dir = '/var/lib/zuul'
+ path = os.path.join(state_dir, 'launcher.socket')
+ self.command_socket = commandsocket.CommandSocket(path)
+ ansible_dir = os.path.join(state_dir, 'ansible')
+ plugins_dir = os.path.join(ansible_dir, 'plugins')
+ self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
+ if not os.path.exists(self.callback_dir):
+ os.makedirs(self.callback_dir)
+ self.library_dir = os.path.join(ansible_dir, 'library')
+ if not os.path.exists(self.library_dir):
+ os.makedirs(self.library_dir)
+
+ callback_path = os.path.dirname(os.path.abspath(
+ zuul.ansible.plugins.callback_plugins.__file__))
+ for fn in os.listdir(callback_path):
+ shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
+
+ library_path = os.path.dirname(os.path.abspath(
+ zuul.ansible.library.__file__))
+ for fn in os.listdir(library_path):
+ shutil.copy(os.path.join(library_path, fn), self.library_dir)
for section in config.sections():
- m = self.section_re.match(section)
+ m = self.site_section_re.match(section)
if m:
sitename = m.group(1)
d = {}
d['host'] = config.get(section, 'host')
d['user'] = config.get(section, 'user')
- d['pass'] = config.get(section, 'pass', '')
- d['root'] = config.get(section, 'root', '/')
+ if config.has_option(section, 'pass'):
+ d['pass'] = config.get(section, 'pass')
+ else:
+ d['pass'] = ''
+ if config.has_option(section, 'root'):
+ d['root'] = config.get(section, 'root')
+ else:
+ d['root'] = '/'
self.sites[sitename] = d
+ continue
+ m = self.node_section_re.match(section)
+ if m:
+ nodename = m.group(1)
+ d = {}
+ d['name'] = nodename
+ d['host'] = config.get(section, 'host')
+ if config.has_option(section, 'description'):
+ d['description'] = config.get(section, 'description')
+ else:
+ d['description'] = ''
+ if config.has_option(section, 'labels'):
+ d['labels'] = config.get(section, 'labels').split(',')
+ else:
+ d['labels'] = []
+ self.static_nodes[nodename] = d
+ continue
def start(self):
self._gearman_running = True
self._zmq_running = True
self._reaper_running = True
+ self._command_running = True
# Setup ZMQ
self.zcontext = zmq.Context()
self.zsocket = self.zcontext.socket(zmq.PUB)
- self.zsocket.bind("tcp://*:8881")
+ self.zsocket.bind("tcp://*:8888")
# Setup Gearman
server = self.config.get('gearman', 'server')
@@ -100,13 +240,21 @@
port = self.config.get('gearman', 'port')
else:
port = 4730
- self.worker = gear.Worker('Zuul Launch Server')
+ self.worker = LaunchGearWorker('Zuul Launch Server',
+ launch_server=self)
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()
self.log.debug("Registering")
self.register()
+ # Start command socket
+ self.log.debug("Starting command processor")
+ self.command_socket.start()
+ self.command_thread = threading.Thread(target=self.runCommand)
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
# Load JJB config
self.loadJobs()
@@ -128,6 +276,11 @@
self.gearman_thread.daemon = True
self.gearman_thread.start()
+ # Start static workers
+ for node in self.static_nodes.values():
+ self.log.debug("Creating static node with arguments: %s" % (node,))
+ self._launchWorker(node)
+
def loadJobs(self):
self.log.debug("Loading jobs")
builder = JJB()
@@ -136,18 +289,28 @@
builder.parser.expandYaml()
unseen = set(self.jobs.keys())
for job in builder.parser.jobs:
+ builder.expandMacros(job)
self.jobs[job['name']] = job
unseen.discard(job['name'])
for name in unseen:
del self.jobs[name]
def register(self):
- self.worker.registerFunction("node-assign:zuul")
- self.worker.registerFunction("stop:%s" % self.hostname)
+ new_functions = set()
+ if self.accept_nodes:
+ new_functions.add("node_assign:zuul")
+ new_functions.add("stop:%s" % self.hostname)
+ new_functions.add("set_description:%s" % self.hostname)
+ new_functions.add("node_revoke:%s" % self.hostname)
- def reconfigure(self, config):
+ for function in new_functions - self.registered_functions:
+ self.worker.registerFunction(function)
+ for function in self.registered_functions - new_functions:
+ self.worker.unRegisterFunction(function)
+ self.registered_functions = new_functions
+
+ def reconfigure(self):
self.log.debug("Reconfiguring")
- self.config = config
self.loadJobs()
for node in self.node_workers.values():
try:
@@ -156,25 +319,103 @@
except Exception:
self.log.exception("Exception sending reconfigure command "
"to worker:")
+ self.log.debug("Reconfiguration complete")
+
+ def pause(self):
+ self.log.debug("Pausing")
+ self.accept_nodes = False
+ self.register()
+ for node in self.node_workers.values():
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='pause'))
+ except Exception:
+ self.log.exception("Exception sending pause command "
+ "to worker:")
+ self.log.debug("Paused")
+
+ def unpause(self):
+ self.log.debug("Unpausing")
+ self.accept_nodes = self.config_accept_nodes
+ self.register()
+ for node in self.node_workers.values():
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='unpause'))
+ except Exception:
+ self.log.exception("Exception sending unpause command "
+ "to worker:")
+ self.log.debug("Unpaused")
+
+ def release(self):
+ self.log.debug("Releasing idle nodes")
+ for node in self.node_workers.values():
+ if node.name in self.static_nodes:
+ continue
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='release'))
+ except Exception:
+ self.log.exception("Exception sending release command "
+ "to worker:")
+ self.log.debug("Finished releasing idle nodes")
+
+ def graceful(self):
+ # Note: this is run in the command processing thread; no more
+ # external commands will be processed after this.
+ self.log.debug("Gracefully stopping")
+ self.pause()
+ self.release()
+ self.log.debug("Waiting for all builds to finish")
+ while self.builds:
+ time.sleep(5)
+ self.log.debug("All builds are finished")
+ self.stop()
def stop(self):
self.log.debug("Stopping")
+ # First, stop accepting new jobs
self._gearman_running = False
self._reaper_running = False
self.worker.shutdown()
+ # Then stop all of the workers
for node in self.node_workers.values():
try:
if node.isAlive():
node.stop()
except Exception:
self.log.exception("Exception sending stop command to worker:")
+        # Stop ZMQ afterwards so that the send queue is flushed
self._zmq_running = False
self.zmq_send_queue.put(None)
self.zmq_send_queue.join()
+ # Stop command processing
+ self._command_running = False
+ self.command_socket.stop()
+ # Join the gearman thread which was stopped earlier.
+ self.gearman_thread.join()
+ # The command thread is joined in the join() method of this
+ # class, which is called by the command shell.
self.log.debug("Stopped")
+ def verboseOn(self):
+ self.log.debug("Enabling verbose mode")
+ self.options['verbose'] = True
+
+ def verboseOff(self):
+ self.log.debug("Disabling verbose mode")
+ self.options['verbose'] = False
+
def join(self):
- self.gearman_thread.join()
+ self.command_thread.join()
+
+ def runCommand(self):
+ while self._command_running:
+ try:
+ command = self.command_socket.get()
+ self.command_map[command]()
+ except Exception:
+ self.log.exception("Exception while processing command")
def runZMQ(self):
while self._zmq_running or not self.zmq_send_queue.empty():
@@ -194,12 +435,19 @@
try:
job = self.worker.getJob()
try:
- if job.name.startswith('node-assign:'):
- self.log.debug("Got node-assign job: %s" % job.unique)
+ if job.name.startswith('node_assign:'):
+ self.log.debug("Got node_assign job: %s" % job.unique)
self.assignNode(job)
elif job.name.startswith('stop:'):
self.log.debug("Got stop job: %s" % job.unique)
self.stopJob(job)
+ elif job.name.startswith('set_description:'):
+ self.log.debug("Got set_description job: %s" %
+ job.unique)
+ job.sendWorkComplete()
+ elif job.name.startswith('node_revoke:'):
+ self.log.debug("Got node_revoke job: %s" % job.unique)
+ self.revokeNode(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
@@ -214,19 +462,44 @@
def assignNode(self, job):
args = json.loads(job.arguments)
self.log.debug("Assigned node with arguments: %s" % (args,))
+ self._launchWorker(args)
+ data = dict(manager=self.hostname)
+ job.sendWorkData(json.dumps(data))
+ job.sendWorkComplete()
+
+ def _launchWorker(self, args):
worker = NodeWorker(self.config, self.jobs, self.builds,
self.sites, args['name'], args['host'],
args['description'], args['labels'],
self.hostname, self.zmq_send_queue,
- self.termination_queue)
+ self.termination_queue, self.keep_jobdir,
+ self.callback_dir, self.library_dir,
+ self.options)
self.node_workers[worker.name] = worker
- worker.process = multiprocessing.Process(target=worker.run)
- worker.process.start()
+ worker.thread = threading.Thread(target=worker.run)
+ worker.thread.start()
- data = dict(manager=self.hostname)
- job.sendWorkData(json.dumps(data))
- job.sendWorkComplete()
+ def revokeNode(self, job):
+ try:
+ args = json.loads(job.arguments)
+ self.log.debug("Revoke job with arguments: %s" % (args,))
+ name = args['name']
+ node = self.node_workers.get(name)
+ if not node:
+ self.log.debug("Unable to find worker %s" % (name,))
+ return
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='stop'))
+ else:
+ self.log.debug("Node %s is not alive while revoking node" %
+ (node.name,))
+ except Exception:
+ self.log.exception("Exception sending stop command "
+ "to worker:")
+ finally:
+ job.sendWorkComplete()
def stopJob(self, job):
try:
@@ -261,6 +534,10 @@
self.log.debug("Got termination event %s" % (item,))
if item is None:
continue
+ worker = self.node_workers[item]
+ self.log.debug("Joining %s" % (item,))
+ worker.thread.join()
+ self.log.debug("Joined %s" % (item,))
del self.node_workers[item]
except Exception:
self.log.exception("Exception while processing "
@@ -270,11 +547,11 @@
class NodeWorker(object):
- log = logging.getLogger("zuul.NodeWorker")
-
def __init__(self, config, jobs, builds, sites, name, host,
description, labels, manager_name, zmq_send_queue,
- termination_queue):
+ termination_queue, keep_jobdir, callback_dir,
+ library_dir, options):
+ self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
self.log.debug("Creating node worker %s" % (name,))
self.config = config
self.jobs = jobs
@@ -286,17 +563,29 @@
if not isinstance(labels, list):
labels = [labels]
self.labels = labels
- self.process = None
+ self.thread = None
self.registered_functions = set()
+ # If the unpaused Event is set, that means we should run jobs.
+ # If it is clear, then we are paused and should not run jobs.
+ self.unpaused = threading.Event()
+ self.unpaused.set()
self._running = True
- self.queue = multiprocessing.JoinableQueue()
+ self.queue = Queue.Queue()
self.manager_name = manager_name
self.zmq_send_queue = zmq_send_queue
self.termination_queue = termination_queue
+ self.keep_jobdir = keep_jobdir
self.running_job_lock = threading.Lock()
+ self.pending_registration = False
+ self.registration_lock = threading.Lock()
+ self._get_job_lock = threading.Lock()
+ self._got_job = False
self._job_complete_event = threading.Event()
self._running_job = False
+ self._aborted_job = False
self._sent_complete_event = False
+ self.ansible_job_proc = None
+ self.ansible_post_proc = None
self.workspace_root = config.get('launcher', 'workspace_root')
if self.config.has_option('launcher', 'private_key_file'):
self.private_key_file = config.get('launcher', 'private_key_file')
@@ -306,31 +595,36 @@
self.username = config.get('launcher', 'username')
else:
self.username = 'zuul'
+ self.callback_dir = callback_dir
+ self.library_dir = library_dir
+ self.options = options
def isAlive(self):
# Meant to be called from the manager
- if self.process and self.process.is_alive():
+ if self.thread and self.thread.is_alive():
return True
return False
def run(self):
- signal.signal(signal.SIGINT, signal.SIG_IGN)
self.log.debug("Node worker %s starting" % (self.name,))
server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
port = self.config.get('gearman', 'port')
else:
port = 4730
- self.worker = gear.Worker(self.name)
+ self.worker = NodeGearWorker(self.name)
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()
+ self.log.debug("Registering")
self.register()
self.gearman_thread = threading.Thread(target=self.runGearman)
self.gearman_thread.daemon = True
self.gearman_thread.start()
+ self.log.debug("Started")
+
while self._running or not self.queue.empty():
try:
self._runQueue()
@@ -343,9 +637,31 @@
# will be set by the queue thread.
self.log.debug("Submitting stop request")
self._running = False
+ self.unpaused.set()
self.queue.put(dict(action='stop'))
self.queue.join()
+ def pause(self):
+ self.unpaused.clear()
+ self.worker.stopWaitingForJobs()
+
+ def unpause(self):
+ self.unpaused.set()
+
+ def release(self):
+ # If this node is idle, stop it.
+ old_unpaused = self.unpaused.is_set()
+ if old_unpaused:
+ self.pause()
+ with self._get_job_lock:
+ if self._got_job:
+ self.log.debug("This worker is not idle")
+ if old_unpaused:
+ self.unpause()
+ return
+ self.log.debug("Stopping due to release command")
+ self.queue.put(dict(action='stop'))
+
def _runQueue(self):
item = self.queue.get()
try:
@@ -358,6 +674,15 @@
else:
self._job_complete_event.wait()
self.worker.shutdown()
+ if item['action'] == 'pause':
+ self.log.debug("Received pause request")
+ self.pause()
+ if item['action'] == 'unpause':
+ self.log.debug("Received unpause request")
+ self.unpause()
+ if item['action'] == 'release':
+ self.log.debug("Received release request")
+ self.release()
elif item['action'] == 'reconfigure':
self.log.debug("Received reconfigure request")
self.register()
@@ -370,15 +695,23 @@
def runGearman(self):
while self._running:
try:
- self._runGearman()
+ self.unpaused.wait()
+ if self._running:
+ self._runGearman()
except Exception:
self.log.exception("Exception in gearman manager:")
+ with self._get_job_lock:
+ self._got_job = False
def _runGearman(self):
- try:
- job = self.worker.getJob()
- except gear.InterruptedError:
- return
+ if self.pending_registration:
+ self.register()
+ with self._get_job_lock:
+ try:
+ job = self.worker.getJob()
+ self._got_job = True
+ except gear.InterruptedError:
+ return
self.log.debug("Node worker %s got job %s" % (self.name, job.name))
try:
if job.name not in self.registered_functions:
@@ -406,24 +739,34 @@
return ret
def register(self):
- if self._running_job:
+ if not self.registration_lock.acquire(False):
+ self.log.debug("Registration already in progress")
return
- new_functions = set()
- for job in self.jobs.values():
- new_functions |= self.generateFunctionNames(job)
- for function in new_functions - self.registered_functions:
- self.worker.registerFunction(function)
- for function in self.registered_functions - new_functions:
- self.worker.unRegisterFunction(function)
- self.registered_functions = new_functions
+ try:
+ if self._running_job:
+ self.pending_registration = True
+ self.log.debug("Ignoring registration due to running job")
+ return
+ self.log.debug("Updating registration")
+ self.pending_registration = False
+ new_functions = set()
+ for job in self.jobs.values():
+ new_functions |= self.generateFunctionNames(job)
+ self.worker.sendMassDo(new_functions)
+ self.registered_functions = new_functions
+ finally:
+ self.registration_lock.release()
def abortRunningJob(self):
+ self._aborted_job = True
+ return self.abortRunningProc(self.ansible_job_proc)
+
+ def abortRunningProc(self, proc):
aborted = False
self.log.debug("Abort: acquiring job lock")
with self.running_job_lock:
if self._running_job:
self.log.debug("Abort: a job is running")
- proc = self.ansible_proc
if proc:
self.log.debug("Abort: sending kill signal to job "
"process group")
@@ -445,15 +788,14 @@
# Make sure we can parse what we need from the job first
args = json.loads(job.arguments)
- # This may be configurable later, or we may choose to honor
- # OFFLINE_NODE_WHEN_COMPLETE
- offline = True
+ offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
job_name = job.name.split(':')[1]
# Initialize the result so we have something regardless of
# whether the job actually runs
result = None
self._sent_complete_event = False
+ self._aborted_job = False
try:
self.sendStartEvent(job_name, args)
@@ -466,11 +808,10 @@
self.log.exception("Exception while launching job thread")
self._running_job = False
- if not result:
- result = b''
try:
- job.sendWorkComplete(result)
+ data = json.dumps(dict(result=result))
+ job.sendWorkComplete(data)
except Exception:
self.log.exception("Exception while sending job completion packet")
@@ -521,7 +862,8 @@
'SUCCESS', {})
def runJob(self, job, args):
- self.ansible_proc = None
+ self.ansible_job_proc = None
+ self.ansible_post_proc = None
result = None
with self.running_job_lock:
if not self._running:
@@ -531,7 +873,7 @@
self.log.debug("Job %s: beginning" % (job.unique,))
self.builds[job.unique] = self.name
- with JobDir() as jobdir:
+ with JobDir(self.keep_jobdir) as jobdir:
self.log.debug("Job %s: job root at %s" %
(job.unique, jobdir.root))
timeout = self.prepareAnsibleFiles(jobdir, job, args)
@@ -545,13 +887,23 @@
job.sendWorkStatus(0, 100)
job_status = self.runAnsiblePlaybook(jobdir, timeout)
- post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
- if job_status and post_status:
- status = 'SUCCESS'
- else:
- status = 'FAILURE'
+ if job_status is None:
+ # The result of the job is indeterminate. Zuul will
+ # run it again.
+ return result
- result = json.dumps(dict(result=status))
+ post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
+ if not post_status:
+ result = 'POST_FAILURE'
+ elif job_status:
+ result = 'SUCCESS'
+ else:
+ result = 'FAILURE'
+
+ if self._aborted_job:
+                # A None result will cause zuul to relaunch the job if
+ # it needs to.
+ result = None
return result
@@ -559,56 +911,152 @@
return [('node', dict(
ansible_host=self.host, ansible_user=self.username))]
- def _makeSCPTask(self, publisher):
+ def _substituteVariables(self, text, variables):
+ def lookup(match):
+ return variables.get(match.group(1), '')
+ return re.sub('\$([A-Za-z0-9_]+)', lookup, text)
+
+ def _getRsyncOptions(self, source, parameters):
+ # Treat the publisher source as a filter; ant and rsync behave
+ # fairly close in this manner, except for leading directories.
+ source = self._substituteVariables(source, parameters)
+ # If the source starts with ** then we want to match any
+ # number of directories, so don't anchor the include filter.
+ # If it does not start with **, then the intent is likely to
+ # at least start by matching an immediate file or subdirectory
+ # (even if later we have a ** in the middle), so in this case,
+ # anchor it to the root of the transfer (the workspace).
+ if not source.startswith('**'):
+ source = os.path.join('/', source)
+ # These options mean: include the thing we want, include any
+ # directories (so that we continue to search for the thing we
+ # want no matter how deep it is), exclude anything that
+ # doesn't match the thing we want or is a directory, then get
+ # rid of empty directories left over at the end.
+ rsync_opts = ['--include="%s"' % source,
+ '--include="*/"',
+ '--exclude="*"',
+ '--prune-empty-dirs']
+ return rsync_opts
+
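A stand-alone sketch of the filter construction above, with hypothetical sources and substitution variables, showing the options that end up in rsync_opts:

import os
import re

def rsync_filter_options(source, parameters):
    # Stand-alone rendition of _getRsyncOptions, for illustration only.
    source = re.sub(r'\$([A-Za-z0-9_]+)',
                    lambda m: parameters.get(m.group(1), ''), source)
    if not source.startswith('**'):
        # Anchor concrete paths at the root of the transfer (the workspace).
        source = os.path.join('/', source)
    return ['--include="%s"' % source,
            '--include="*/"',
            '--exclude="*"',
            '--prune-empty-dirs']

# '**/*.log' stays unanchored; '$LOG_PATH/*.txt' becomes '/logs/*.txt'.
print(rsync_filter_options('**/*.log', {}))
print(rsync_filter_options('$LOG_PATH/*.txt', {'LOG_PATH': 'logs'}))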
+ def _makeSCPTask(self, jobdir, publisher, parameters):
tasks = []
for scpfile in publisher['scp']['files']:
+ scproot = tempfile.mkdtemp(dir=jobdir.staging_root)
+ os.chmod(scproot, 0o755)
+
site = publisher['scp']['site']
- if site not in self.sites:
- raise Exception("Undefined SCP site: %s" % (site,))
- site = self.sites[site]
if scpfile.get('copy-console'):
- src = '/tmp/console.log'
+ # Include the local ansible directory in the console
+ # upload. This uploads the playbook and ansible logs.
+ copyargs = dict(src=jobdir.ansible_root + '/',
+ dest=os.path.join(scproot, '_zuul_ansible'))
+ task = dict(copy=copyargs,
+ delegate_to='127.0.0.1')
+ tasks.append(task)
+
+ # Fetch the console log from the remote host.
+ src = '/tmp/console.html'
+ rsync_opts = []
else:
- src = scpfile['source']
- dest = os.path.join(site['root'], scpfile['target'])
- dest = os.path.normpath(dest)
- if not dest.startswith(site['root']):
- raise Exception("Target path %s is not below site root" %
- (dest,))
+ src = parameters['WORKSPACE']
+ if not src.endswith('/'):
+ src = src + '/'
+ rsync_opts = self._getRsyncOptions(scpfile['source'],
+ parameters)
+
syncargs = dict(src=src,
- dest=dest)
- task = dict(synchronize=syncargs,
- delegate_to=site['host'])
+ dest=scproot,
+ copy_links='yes',
+ mode='pull')
+ if rsync_opts:
+ syncargs['rsync_opts'] = rsync_opts
+ task = dict(synchronize=syncargs)
if not scpfile.get('copy-after-failure'):
task['when'] = 'success'
tasks.append(task)
+
+ task = self._makeSCPTaskLocalAction(
+ site, scpfile, scproot, parameters)
+ tasks.append(task)
return tasks
- def _makeFTPTask(self, jobdir, publisher):
+ def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
+ if site not in self.sites:
+ raise Exception("Undefined SCP site: %s" % (site,))
+ site = self.sites[site]
+ dest = scpfile['target'].lstrip('/')
+ dest = self._substituteVariables(dest, parameters)
+ dest = os.path.join(site['root'], dest)
+ dest = os.path.normpath(dest)
+ if not dest.startswith(site['root']):
+ raise Exception("Target path %s is not below site root" %
+ (dest,))
+
+ rsync_cmd = [
+ '/usr/bin/rsync', '--delay-updates', '-F',
+ '--compress', '-rt', '--safe-links',
+ '--rsync-path="mkdir -p {dest} && rsync"',
+ '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
+ '-o StrictHostKeyChecking=no -q"',
+ '--out-format="<<CHANGED>>%i %n%L"',
+ '{source}', '"{user}@{host}:{dest}"'
+ ]
+ if scpfile.get('keep-hierarchy'):
+ source = '"%s/"' % scproot
+ else:
+ source = '`/usr/bin/find "%s" -type f`' % scproot
+ shellargs = ' '.join(rsync_cmd).format(
+ source=source,
+ dest=dest,
+ private_key_file=self.private_key_file,
+ host=site['host'],
+ user=site['user'])
+ task = dict(shell=shellargs,
+ delegate_to='127.0.0.1')
+ if not scpfile.get('copy-after-failure'):
+ task['when'] = 'success'
+
+ return task
+
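As an illustration of the shell string the .format() call above produces, here is the same template filled in with hypothetical site and staging values (none of these paths or hosts come from the change itself):

rsync_cmd = [
    '/usr/bin/rsync', '--delay-updates', '-F',
    '--compress', '-rt', '--safe-links',
    '--rsync-path="mkdir -p {dest} && rsync"',
    '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
    '-o StrictHostKeyChecking=no -q"',
    '--out-format="<<CHANGED>>%i %n%L"',
    '{source}', '"{user}@{host}:{dest}"',
]
# Hypothetical values only; real ones come from self.sites, the scproot
# staging directory, and the launcher's private key configuration.
print(' '.join(rsync_cmd).format(
    source='"/tmp/tmpStAg1n/"',                  # keep-hierarchy case
    dest='/srv/static/logs/gate-noop/1',
    private_key_file='/var/lib/zuul/ssh/id_rsa',
    host='static.example.org',
    user='jenkins'))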
+ def _makeFTPTask(self, jobdir, publisher, parameters):
tasks = []
ftp = publisher['ftp']
site = ftp['site']
if site not in self.sites:
raise Exception("Undefined FTP site: %s" % site)
site = self.sites[site]
- ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
+
+ ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
ftpcontent = os.path.join(ftproot, 'content')
os.makedirs(ftpcontent)
ftpscript = os.path.join(ftproot, 'script')
- syncargs = dict(src=ftp['source'],
- dest=ftpcontent)
+
+ src = parameters['WORKSPACE']
+ if not src.endswith('/'):
+ src = src + '/'
+ rsync_opts = self._getRsyncOptions(ftp['source'],
+ parameters)
+ syncargs = dict(src=src,
+ dest=ftpcontent,
+ copy_links='yes',
+ mode='pull')
+ if rsync_opts:
+ syncargs['rsync_opts'] = rsync_opts
task = dict(synchronize=syncargs,
when='success')
tasks.append(task)
task = dict(shell='lftp -f %s' % ftpscript,
- when='success')
+ when='success',
+ delegate_to='127.0.0.1')
ftpsource = ftpcontent
if ftp.get('remove-prefix'):
ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
while ftpsource[-1] == '/':
ftpsource = ftpsource[:-1]
- ftptarget = ftp['target']
- ftptarget = os.path.join(site['root'], ftp['target'])
+ ftptarget = ftp['target'].lstrip('/')
+ ftptarget = self._substituteVariables(ftptarget, parameters)
+ ftptarget = os.path.join(site['root'], ftptarget)
ftptarget = os.path.normpath(ftptarget)
if not ftptarget.startswith(site['root']):
raise Exception("Target path %s is not below site root" %
@@ -622,17 +1070,20 @@
tasks.append(task)
return tasks
- def _makeBuilderTask(self, jobdir, builder, parameters, timeout):
+ def _makeBuilderTask(self, jobdir, builder, parameters):
tasks = []
script_fn = '%s.sh' % str(uuid.uuid4().hex)
script_path = os.path.join(jobdir.script_root, script_fn)
with open(script_path, 'w') as script:
- script.write(builder['shell'])
+ data = builder['shell']
+ if not data.startswith('#!'):
+ data = '#!/bin/bash -x\n %s' % (data,)
+ script.write(data)
remote_path = os.path.join('/tmp', script_fn)
copy = dict(src=script_path,
dest=remote_path,
- mode=0555)
+ mode=0o555)
task = dict(copy=copy)
tasks.append(task)
@@ -640,11 +1091,10 @@
cwd=parameters['WORKSPACE'],
parameters=parameters)
task = dict(zuul_runner=runner)
- if timeout:
- task['when'] = '{{ timeout | int > 0 }}'
- task['async'] = '{{ timeout }}'
- else:
- task['async'] = 2 * 60 * 60 # 2 hour default timeout
+ task['name'] = ('zuul_runner with {{ timeout | int - elapsed_time }} '
+ 'second timeout')
+ task['when'] = '{{ elapsed_time < timeout | int }}'
+ task['async'] = '{{ timeout | int - elapsed_time }}'
task['poll'] = 5
tasks.append(task)
@@ -655,6 +1105,34 @@
return tasks
+ def _transformPublishers(self, jjb_job):
+ early_publishers = []
+ late_publishers = []
+ old_publishers = jjb_job.get('publishers', [])
+ for publisher in old_publishers:
+ early_scpfiles = []
+ late_scpfiles = []
+ if 'scp' not in publisher:
+ early_publishers.append(publisher)
+ continue
+ copy_console = False
+ for scpfile in publisher['scp']['files']:
+ if scpfile.get('copy-console'):
+ scpfile['keep-hierarchy'] = True
+ late_scpfiles.append(scpfile)
+ copy_console = True
+ else:
+ early_scpfiles.append(scpfile)
+ publisher['scp']['files'] = early_scpfiles + late_scpfiles
+ if copy_console:
+ late_publishers.append(publisher)
+ else:
+ early_publishers.append(publisher)
+ publishers = early_publishers + late_publishers
+ if old_publishers != publishers:
+ self.log.debug("Transformed job publishers")
+ return early_publishers, late_publishers
+
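A small worked example of the split performed above, using a hypothetical publisher list (site names and patterns are illustrative only):

# One scp publisher that also copies the console, plus an ftp publisher.
publishers = [
    {'scp': {'site': 'logs', 'files': [
        {'source': 'artifacts/**', 'target': 'builds'},
        {'copy-console': True, 'target': 'builds'}]}},
    {'ftp': {'site': 'tarballs', 'source': 'dist/**',
             'target': 'releases'}},
]
# _transformPublishers would return:
#   early_publishers == [the ftp publisher]
#   late_publishers  == [the scp publisher], with its 'files' reordered so
#   the plain artifact file comes first and the console file, now marked
#   'keep-hierarchy': True, comes last.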
def prepareAnsibleFiles(self, jobdir, gearman_job, args):
job_name = gearman_job.name.split(':')[1]
jjb_job = self.jobs[job_name]
@@ -670,94 +1148,237 @@
inventory.write('\n')
timeout = None
+ timeout_var = None
for wrapper in jjb_job.get('wrappers', []):
if isinstance(wrapper, dict):
- timeout = wrapper.get('build-timeout', {})
- if isinstance(timeout, dict):
- timeout = timeout.get('timeout')
- if timeout:
- timeout = timeout * 60
+ build_timeout = wrapper.get('timeout')
+ if isinstance(build_timeout, dict):
+ timeout_var = build_timeout.get('timeout-var')
+ timeout = build_timeout.get('timeout')
+ if timeout is not None:
+ timeout = int(timeout) * 60
+ if not timeout:
+ timeout = ANSIBLE_DEFAULT_TIMEOUT
+ if timeout_var:
+ parameters[timeout_var] = str(timeout * 1000)
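For illustration, the wrapper handling above expects JJB data shaped roughly like this (the minutes and variable name are hypothetical):

# A 90 minute timeout wrapper that also exports the value to the job.
jjb_job = {'wrappers': [{'timeout': {'timeout': 90,
                                     'timeout-var': 'BUILD_TIMEOUT'}}]}
# The loop above would yield timeout == 90 * 60 seconds, and
# parameters['BUILD_TIMEOUT'] == str(90 * 60 * 1000), i.e. milliseconds.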
with open(jobdir.playbook, 'w') as playbook:
+ pre_tasks = []
tasks = []
+ main_block = []
+ error_block = []
+ variables = []
- task = dict(file=dict(path='/tmp/console.log', state='absent'))
- tasks.append(task)
+ shellargs = "ssh-keyscan %s > %s" % (
+ self.host, jobdir.known_hosts)
+ pre_tasks.append(dict(shell=shellargs,
+ delegate_to='127.0.0.1'))
- task = dict(zuul_console=dict(path='/tmp/console.log', port=8088))
- tasks.append(task)
+ tasks.append(dict(block=main_block,
+ rescue=error_block))
+
+ task = dict(file=dict(path='/tmp/console.html', state='absent'))
+ main_block.append(task)
+
+ task = dict(zuul_console=dict(path='/tmp/console.html', port=8088))
+ main_block.append(task)
task = dict(file=dict(path=parameters['WORKSPACE'],
state='directory'))
- tasks.append(task)
+ main_block.append(task)
+
+ msg = [
+ "Launched by %s" % self.manager_name,
+ "Building remotely on %s in workspace %s" % (
+ self.name, parameters['WORKSPACE'])]
+ task = dict(zuul_log=dict(msg=msg))
+ main_block.append(task)
for builder in jjb_job.get('builders', []):
if 'shell' in builder:
- tasks.extend(self._makeBuilderTask(jobdir, builder,
- parameters, timeout))
- play = dict(hosts='node', name='Job body',
- tasks=tasks)
- playbook.write(yaml.dump([play]))
+ main_block.extend(
+ self._makeBuilderTask(jobdir, builder, parameters))
+ task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"))
+ main_block.append(task)
+
+ task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
+ error_block.append(task)
+ error_block.append(dict(fail=dict(msg='FAILURE')))
+
+ variables.append(dict(timeout=timeout))
+ play = dict(hosts='node', name='Job body', vars=variables,
+ pre_tasks=pre_tasks, tasks=tasks)
+ playbook.write(yaml.safe_dump([play], default_flow_style=False))
+
+ early_publishers, late_publishers = self._transformPublishers(jjb_job)
with open(jobdir.post_playbook, 'w') as playbook:
+ blocks = []
+ for publishers in [early_publishers, late_publishers]:
+ block = []
+ for publisher in publishers:
+ if 'scp' in publisher:
+ block.extend(self._makeSCPTask(jobdir, publisher,
+ parameters))
+ if 'ftp' in publisher:
+ block.extend(self._makeFTPTask(jobdir, publisher,
+ parameters))
+ blocks.append(block)
+
+ # The 'always' section contains the log publishing tasks, and
+ # the 'block' contains all the other publishers. This way we
+ # run the log publishers regardless of whether the rest of the
+ # publishers succeed.
tasks = []
- for publisher in jjb_job.get('publishers', []):
- if 'scp' in publisher:
- tasks.extend(self._makeSCPTask(publisher))
- if 'ftp' in publisher:
- tasks.extend(self._makeFTPTask(jobdir, publisher))
+ tasks.append(dict(block=blocks[0],
+ always=blocks[1]))
+
play = dict(hosts='node', name='Publishers',
tasks=tasks)
- playbook.write(yaml.dump([play]))
+ playbook.write(yaml.safe_dump([play], default_flow_style=False))
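The post playbook written above is a single play whose one task pairs a 'block' of ordinary publishers with an 'always' section of console-log publishers; roughly, the Python structure handed to yaml.safe_dump looks like this (paths are hypothetical):

play = {
    'hosts': 'node',
    'name': 'Publishers',
    'tasks': [{
        # Ordinary publishers first; if any of these fail...
        'block': [{'synchronize': {'src': '/home/jenkins/workspace/',
                                   'dest': '/tmp/tmpStAg1n/content',
                                   'mode': 'pull'}}],
        # ...the console-log publishers in 'always' still run.
        'always': [{'synchronize': {'src': '/tmp/console.html',
                                    'dest': '/tmp/tmpStAg1n/console',
                                    'mode': 'pull'}}],
    }],
}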
with open(jobdir.config, 'w') as config:
config.write('[defaults]\n')
config.write('hostfile = %s\n' % jobdir.inventory)
- config.write('host_key_checking = False\n')
+ config.write('keep_remote_files = True\n')
+ config.write('local_tmp = %s/.ansible/tmp\n' % jobdir.root)
config.write('private_key_file = %s\n' % self.private_key_file)
+ config.write('retry_files_enabled = False\n')
+ config.write('log_path = %s\n' % jobdir.ansible_log)
+ config.write('gathering = explicit\n')
+ config.write('callback_plugins = %s\n' % self.callback_dir)
+ config.write('library = %s\n' % self.library_dir)
- callback_path = zuul.ansible.plugins.callback_plugins.__file__
- callback_path = os.path.abspath(callback_path)
- callback_path = os.path.dirname(callback_path)
- config.write('callback_plugins = %s\n' % callback_path)
-
- library_path = zuul.ansible.library.__file__
- library_path = os.path.abspath(library_path)
- library_path = os.path.dirname(library_path)
- config.write('library = %s\n' % library_path)
+ config.write('[ssh_connection]\n')
+ ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
+ "-o UserKnownHostsFile=%s" % jobdir.known_hosts
+ config.write('ssh_args = %s\n' % ssh_args)
return timeout
+ def _ansibleTimeout(self, proc, msg):
+ self.log.warning(msg)
+ self.abortRunningProc(proc)
+
def runAnsiblePlaybook(self, jobdir, timeout):
- self.ansible_proc = subprocess.Popen(
- ['ansible-playbook', jobdir.playbook,
- '-e', 'timeout=%s' % timeout, '-v'],
+ # Set the LOGNAME env variable so that the Ansible log_path
+ # log reports the correct user.
+ env_copy = os.environ.copy()
+ env_copy['LOGNAME'] = 'zuul'
+
+ if self.options['verbose']:
+ verbose = '-vvv'
+ else:
+ verbose = '-v'
+
+ cmd = ['ansible-playbook', jobdir.playbook, verbose]
+ self.log.debug("Ansible command: %s" % (cmd,))
+
+ self.ansible_job_proc = subprocess.Popen(
+ cmd,
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
+ env=env_copy,
)
- (out, err) = self.ansible_proc.communicate()
- self.log.debug("Ansible stdout:\n%s" % out)
- self.log.debug("Ansible stderr:\n%s" % err)
- ret = self.ansible_proc.wait()
- self.ansible_proc = None
+ ret = None
+ watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+ self._ansibleTimeout,
+ (self.ansible_job_proc,
+ "Ansible timeout exceeded"))
+ watchdog.start()
+ try:
+ for line in iter(self.ansible_job_proc.stdout.readline, b''):
+ line = line[:1024].rstrip()
+ self.log.debug("Ansible output: %s" % (line,))
+ ret = self.ansible_job_proc.wait()
+ finally:
+ watchdog.stop()
+ self.log.debug("Ansible exit code: %s" % (ret,))
+ self.ansible_job_proc = None
+ if ret == 3:
+ # AnsibleHostUnreachable: We had a network issue connecting to
+ # our zuul-worker.
+ return None
+ elif ret == -9:
+ # Received abort request.
+ return None
return ret == 0
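The Watchdog used above is defined elsewhere in this change; based only on how it is called here (fire the timeout callback unless stop() arrives first), a plausible minimal sketch is:

import threading
import time

class Watchdog(object):
    def __init__(self, timeout, function, args):
        self.timeout = timeout
        self.function = function
        self.args = args
        self._running = False
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True

    def _run(self):
        # Poll until either stop() is called or the deadline passes.
        while self._running and time.time() < self.end:
            time.sleep(1)
        if self._running:
            self.function(*self.args)

    def start(self):
        self._running = True
        self.end = time.time() + self.timeout
        self.thread.start()

    def stop(self):
        self._running = False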
def runAnsiblePostPlaybook(self, jobdir, success):
- proc = subprocess.Popen(
- ['ansible-playbook', jobdir.post_playbook,
- '-e', 'success=%s' % success],
+ # Set the LOGNAME env variable so that the Ansible log_path
+ # log reports the correct user.
+ env_copy = os.environ.copy()
+ env_copy['LOGNAME'] = 'zuul'
+
+ if self.options['verbose']:
+ verbose = '-vvv'
+ else:
+ verbose = '-v'
+
+ cmd = ['ansible-playbook', jobdir.post_playbook,
+ '-e', 'success=%s' % success, verbose]
+ self.log.debug("Ansible post command: %s" % (cmd,))
+
+ self.ansible_post_proc = subprocess.Popen(
+ cmd,
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
+ env=env_copy,
)
- (out, err) = proc.communicate()
- return proc.wait() == 0
+ ret = None
+ watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
+ self._ansibleTimeout,
+ (self.ansible_post_proc,
+ "Ansible post timeout exceeded"))
+ watchdog.start()
+ try:
+ for line in iter(self.ansible_post_proc.stdout.readline, b''):
+ line = line[:1024].rstrip()
+ self.log.debug("Ansible post output: %s" % (line,))
+ ret = self.ansible_post_proc.wait()
+ finally:
+ watchdog.stop()
+ self.log.debug("Ansible post exit code: %s" % (ret,))
+ self.ansible_post_proc = None
+ return ret == 0
class JJB(jenkins_jobs.builder.Builder):
def __init__(self):
self.global_config = None
self._plugins_list = []
+
+ def expandComponent(self, component_type, component, template_data):
+ component_list_type = component_type + 's'
+ new_components = []
+ if isinstance(component, dict):
+ name, component_data = next(iter(component.items()))
+ if template_data:
+ component_data = jenkins_jobs.formatter.deep_format(
+ component_data, template_data, True)
+ else:
+ name = component
+ component_data = {}
+
+ new_component = self.parser.data.get(component_type, {}).get(name)
+ if new_component:
+ for new_sub_component in new_component[component_list_type]:
+ new_components.extend(
+ self.expandComponent(component_type,
+ new_sub_component, component_data))
+ else:
+ new_components.append({name: component_data})
+ return new_components
+
+ def expandMacros(self, job):
+ for component_type in ['builder', 'publisher', 'wrapper']:
+ component_list_type = component_type + 's'
+ new_components = []
+ for new_component in job.get(component_list_type, []):
+ new_components.extend(self.expandComponent(component_type,
+ new_component, {}))
+ job[component_list_type] = new_components
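A small, hypothetical illustration of the macro expansion these two methods perform (macro name, commands, and parameters are illustrative only):

# Macro definition as it would appear in self.parser.data, and a job
# that invokes it with a parameter.
parser_data = {'builder': {'run-tox': {'builders': [
    {'shell': 'tox -e {envlist}'},
    {'shell': 'tox -e {envlist} -- --list-tests'}]}}}
job_before = {'builders': [{'run-tox': {'envlist': 'py27'}}]}
# After expandMacros(job_before) against that parser data, the macro is
# replaced by its expanded shell builders:
job_after = {'builders': [{'shell': 'tox -e py27'},
                          {'shell': 'tox -e py27 -- --list-tests'}]}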
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 69fb71b..98307ee 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -17,6 +17,7 @@
import json
import logging
import os
+import six
import time
import threading
from uuid import uuid4
@@ -164,6 +165,11 @@
port = config.get('gearman', 'port')
else:
port = 4730
+ if config.has_option('gearman', 'check_job_registration'):
+ self.job_registration = config.getboolean(
+ 'gearman', 'check_job_registration')
+ else:
+ self.job_registration = True
self.gearman = ZuulGearmanClient(self)
self.gearman.addServer(server, port)
@@ -231,7 +237,7 @@
s_config = {}
s_config.update((k, v.format(item=item, job=job,
change=item.change))
- if isinstance(v, basestring)
+ if isinstance(v, six.string_types)
else (k, v)
for k, v in s.items())
@@ -351,7 +357,8 @@
build.__gearman_job = gearman_job
self.builds[uuid] = build
- if not self.isJobRegistered(gearman_job.name):
+ if self.job_registration and not self.isJobRegistered(
+ gearman_job.name):
self.log.error("Job %s is not registered with Gearman" %
gearman_job)
self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
@@ -456,9 +463,6 @@
build.number = data.get('number')
build.__gearman_manager = data.get('manager')
self.sched.onBuildStarted(build)
-
- if job.denominator:
- build.estimated_time = float(job.denominator) / 1000
else:
self.log.error("Unable to find build %s" % job.unique)
@@ -505,7 +509,7 @@
# us where the job is running.
return False
- if not self.isJobRegistered(name):
+ if self.job_registration and not self.isJobRegistered(name):
return False
desc_uuid = str(uuid4().hex)
diff --git a/zuul/lib/clonemapper.py b/zuul/lib/clonemapper.py
index ae558cd..57ac177 100644
--- a/zuul/lib/clonemapper.py
+++ b/zuul/lib/clonemapper.py
@@ -19,6 +19,9 @@
import os
import re
+import six
+
+
OrderedDict = extras.try_imports(['collections.OrderedDict',
'ordereddict.OrderedDict'])
@@ -59,17 +62,17 @@
raise Exception("Expansion error. Check error messages above")
self.log.info("Mapping projects to workspace...")
- for project, dest in ret.iteritems():
+ for project, dest in six.iteritems(ret):
dest = os.path.normpath(os.path.join(workspace, dest[0]))
ret[project] = dest
self.log.info(" %s -> %s", project, dest)
self.log.debug("Checking overlap in destination directories...")
check = defaultdict(list)
- for project, dest in ret.iteritems():
+ for project, dest in six.iteritems(ret):
check[dest].append(project)
- dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1)
+ dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1)
if dupes:
raise Exception("Some projects share the same destination: %s",
dupes)
diff --git a/zuul/lib/cloner.py b/zuul/lib/cloner.py
index 62ab938..3c7f04d 100644
--- a/zuul/lib/cloner.py
+++ b/zuul/lib/cloner.py
@@ -19,6 +19,8 @@
import re
import yaml
+import six
+
from git import GitCommandError
from zuul import exceptions
from zuul.lib.clonemapper import CloneMapper
@@ -68,7 +70,7 @@
dests = mapper.expand(workspace=self.workspace)
self.log.info("Preparing %s repositories", len(dests))
- for project, dest in dests.iteritems():
+ for project, dest in six.iteritems(dests):
self.prepareRepo(project, dest)
self.log.info("Prepared all repositories")
diff --git a/zuul/lib/commandsocket.py b/zuul/lib/commandsocket.py
new file mode 100644
index 0000000..1b7fed9
--- /dev/null
+++ b/zuul/lib/commandsocket.py
@@ -0,0 +1,83 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+# Copyright 2016 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import socket
+import threading
+from six.moves import queue as Queue
+
+
+class CommandSocket(object):
+ log = logging.getLogger("zuul.CommandSocket")
+
+ def __init__(self, path):
+ self.running = False
+ self.path = path
+ self.queue = Queue.Queue()
+
+ def start(self):
+ self.running = True
+ if os.path.exists(self.path):
+ os.unlink(self.path)
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.socket.bind(self.path)
+ self.socket.listen(1)
+ self.socket_thread = threading.Thread(target=self._socketListener)
+ self.socket_thread.daemon = True
+ self.socket_thread.start()
+
+ def stop(self):
+ # First, wake up our listener thread with a connection and
+ # tell it to stop running.
+ self.running = False
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(self.path)
+ s.sendall('_stop\n')
+ # The listener thread ignores the '_stop' command, so inject
+ # it directly into the queue to wake up any consumers of this
+ # class that are blocked in .get(). They can either handle
+ # '_stop' or ignore the unknown command, and then check
+ # whether they should continue to run before re-entering
+ # their loop.
+ self.queue.put('_stop')
+ self.socket_thread.join()
+
+ def _socketListener(self):
+ while self.running:
+ try:
+ s, addr = self.socket.accept()
+ self.log.debug("Accepted socket connection %s" % (s,))
+ buf = ''
+ while True:
+ buf += s.recv(1)
+ if buf[-1] == '\n':
+ break
+ buf = buf.strip()
+ self.log.debug("Received %s from socket" % (buf,))
+ s.close()
+ # Because we use '_stop' internally to wake up a
+ # waiting thread, don't allow it to actually be
+ # injected externally.
+ if buf != '_stop':
+ self.queue.put(buf)
+ except Exception:
+ self.log.exception("Exception in socket handler")
+
+ def get(self):
+ if not self.running:
+ raise Exception("CommandSocket.get called while stopped")
+ return self.queue.get()
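A short usage sketch (the socket path and command names are hypothetical): a daemon consumes commands from the queue, while an external process connects and writes one newline-terminated command:

import socket

# Consumer side (inside a daemon):
#   cmd_socket = CommandSocket('/var/lib/zuul/launcher.socket')
#   cmd_socket.start()
#   while running:
#       command = cmd_socket.get()   # e.g. 'pause', 'unpause', 'stop'

# Sender side (e.g. a small CLI helper):
def send_command(path, cmd):
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.connect(path)
    s.sendall(cmd + '\n')
    s.close()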
diff --git a/zuul/lib/gearserver.py b/zuul/lib/gearserver.py
new file mode 100644
index 0000000..9cddca3
--- /dev/null
+++ b/zuul/lib/gearserver.py
@@ -0,0 +1,35 @@
+# Copyright 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gear
+
+MASS_DO = 101
+
+
+class GearServer(gear.Server):
+ def handlePacket(self, packet):
+ if packet.ptype == MASS_DO:
+ self.log.info("Received packet from %s: %s" % (packet.connection,
+ packet))
+ self.handleMassDo(packet)
+ else:
+ return super(GearServer, self).handlePacket(packet)
+
+ def handleMassDo(self, packet):
+ packet.connection.functions = set()
+ for name in packet.data.split(b'\x00'):
+ self.log.debug("Adding function %s to %s" % (
+ name, packet.connection))
+ packet.connection.functions.add(name)
+ self.functions.add(name)
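The MASS_DO payload this handler parses is simply the worker's function names joined with NUL bytes; a minimal illustration with hypothetical function names:

functions = set([b'build:gate-noop', b'build:gate-noop:trusty',
                 b'stop:zuul-launcher-01'])
payload = b'\x00'.join(sorted(functions))
# handleMassDo() recovers the same set on the server side:
assert set(payload.split(b'\x00')) == functions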
diff --git a/zuul/lib/swift.py b/zuul/lib/swift.py
index 3c411d3..b5d3bc7 100644
--- a/zuul/lib/swift.py
+++ b/zuul/lib/swift.py
@@ -19,8 +19,8 @@
import os
import random
import six
+from six.moves import urllib
import string
-import urlparse
class Swift(object):
@@ -156,7 +156,7 @@
url = os.path.join(self.storage_url, settings['container'],
settings['file_path_prefix'],
destination_prefix)
- u = urlparse.urlparse(url)
+ u = urllib.parse.urlparse(url)
hmac_body = '%s\n%s\n%s\n%s\n%s' % (u.path, redirect,
settings['max_file_size'],
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index c6ae35d..3bc29e6 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -210,7 +210,7 @@
fd.write('#!/bin/bash\n')
fd.write('ssh -i %s $@\n' % key)
fd.close()
- os.chmod(name, 0755)
+ os.chmod(name, 0o755)
def addProject(self, project, url):
repo = None
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 30cd732..d56993c 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -19,7 +19,7 @@
import gear
-import merger
+from zuul.merger import merger
class MergeServer(object):
diff --git a/zuul/model.py b/zuul/model.py
index 5bea5d0..46b0b98 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -13,7 +13,9 @@
# under the License.
import copy
+import os
import re
+import struct
import time
from uuid import uuid4
import extras
@@ -108,7 +110,11 @@
return job_tree
def getProjects(self):
- return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
+ # cmp is not in python3, applied idiom from
+ # http://python-future.org/compatible_idioms.html#cmp
+ return sorted(
+ self.job_trees.keys(),
+ key=lambda p: p.name)
def addQueue(self, queue):
self.queues.append(queue)
@@ -415,7 +421,7 @@
elif self.window_decrease_type == 'exponential':
self.window = max(
self.window_floor,
- self.window / self.window_decrease_factor)
+ int(self.window / self.window_decrease_factor))
class Project(object):
@@ -1073,7 +1079,7 @@
for a in approvals:
for k, v in a.items():
if k == 'username':
- pass
+ a['username'] = re.compile(v)
elif k in ['email', 'email-filter']:
a['email'] = re.compile(v)
elif k == 'newer-than':
@@ -1092,7 +1098,7 @@
by = approval.get('by', {})
for k, v in rapproval.items():
if k == 'username':
- if (by.get('username', '') != v):
+ if (not v.search(by.get('username', ''))):
return False
elif k == 'email':
if (not v.search(by.get('email', ''))):
@@ -1380,3 +1386,78 @@
job.copy(metajob)
self.jobs[name] = job
return job
+
+
+class JobTimeData(object):
+ format = 'B10H10H10B'
+ version = 0
+
+ def __init__(self, path):
+ self.path = path
+ self.success_times = [0 for x in range(10)]
+ self.failure_times = [0 for x in range(10)]
+ self.results = [0 for x in range(10)]
+
+ def load(self):
+ if not os.path.exists(self.path):
+ return
+ with open(self.path) as f:
+ data = struct.unpack(self.format, f.read())
+ version = data[0]
+ if version != self.version:
+ raise Exception("Unknown data version")
+ self.success_times = list(data[1:11])
+ self.failure_times = list(data[11:21])
+ self.results = list(data[21:31])
+
+ def save(self):
+ tmpfile = self.path + '.tmp'
+ data = [self.version]
+ data.extend(self.success_times)
+ data.extend(self.failure_times)
+ data.extend(self.results)
+ data = struct.pack(self.format, *data)
+ with open(tmpfile, 'w') as f:
+ f.write(data)
+ os.rename(tmpfile, self.path)
+
+ def add(self, elapsed, result):
+ elapsed = int(elapsed)
+ if result == 'SUCCESS':
+ self.success_times.append(elapsed)
+ self.success_times.pop(0)
+ result = 0
+ else:
+ self.failure_times.append(elapsed)
+ self.failure_times.pop(0)
+ result = 1
+ self.results.append(result)
+ self.results.pop(0)
+
+ def getEstimatedTime(self):
+ times = [x for x in self.success_times if x]
+ if times:
+ return float(sum(times)) / len(times)
+ return 0.0
+
+
+class TimeDataBase(object):
+ def __init__(self, root):
+ self.root = root
+ self.jobs = {}
+
+ def _getTD(self, name):
+ td = self.jobs.get(name)
+ if not td:
+ td = JobTimeData(os.path.join(self.root, name))
+ self.jobs[name] = td
+ td.load()
+ return td
+
+ def getEstimatedTime(self, name):
+ return self._getTD(name).getEstimatedTime()
+
+ def update(self, name, elapsed, result):
+ td = self._getTD(name)
+ td.add(elapsed, result)
+ td.save()
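For reference, a small round trip of the on-disk record format used above (the timings are hypothetical):

import struct

# One version byte, ten success durations and ten failure durations
# (unsigned shorts, in seconds), then ten one-byte result codes.
fmt = 'B10H10H10B'
values = [0] + [120] * 10 + [0] * 10 + [0] * 10
record = struct.pack(fmt, *values)
fields = struct.unpack(fmt, record)
success_times = [x for x in fields[1:11] if x]
# getEstimatedTime() averages the non-zero success samples: 120.0 here.
estimated = float(sum(success_times)) / len(success_times)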
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index d54da9f..716dcfb 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -21,7 +21,7 @@
import gear
import six
-import model
+from zuul import model
class RPCListener(object):
@@ -40,11 +40,11 @@
port = 4730
self.worker = gear.Worker('Zuul RPC Listener')
self.worker.addServer(server, port)
+ self.worker.waitForServer()
+ self.register()
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
- self.worker.waitForServer()
- self.register()
def register(self):
self.worker.registerFunction("zuul:enqueue")
@@ -66,8 +66,8 @@
while self._running:
try:
job = self.worker.getJob()
- z, jobname = job.name.split(':')
self.log.debug("Received job %s" % job.name)
+ z, jobname = job.name.split(':')
attrname = 'handle_' + jobname
if hasattr(self, attrname):
f = getattr(self, attrname)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index aea9a67..a30b735 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -20,6 +20,7 @@
import logging
import os
import pickle
+import six
from six.moves import queue as Queue
import re
import sys
@@ -27,10 +28,10 @@
import time
import yaml
-import layoutvalidator
-import model
-from model import Pipeline, Project, ChangeQueue
-from model import ChangeishFilter, NullChange
+from zuul import layoutvalidator
+from zuul import model
+from zuul.model import Pipeline, Project, ChangeQueue
+from zuul.model import ChangeishFilter, NullChange
from zuul import change_matcher, exceptions
from zuul import version as zuul_version
@@ -125,12 +126,10 @@
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
- self._exception = None
- self._traceback = None
+ self._exc_info = None
- def exception(self, e, tb):
- self._exception = e
- self._traceback = tb
+ def exception(self, exc_info):
+ self._exc_info = exc_info
self._wait_event.set()
def done(self):
@@ -138,8 +137,8 @@
def wait(self, timeout=None):
self._wait_event.wait(timeout)
- if self._exception:
- raise self._exception, None, self._traceback
+ if self._exc_info:
+ six.reraise(*self._exc_info)
return self._wait_event.is_set()
@@ -236,7 +235,7 @@
class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler")
- def __init__(self, config):
+ def __init__(self, config, testonly=False):
threading.Thread.__init__(self)
self.daemon = True
self.wake_event = threading.Event()
@@ -262,6 +261,10 @@
self.management_event_queue = Queue.Queue()
self.layout = model.Layout()
+ if not testonly:
+ time_dir = self._get_time_database_dir()
+ self.time_database = model.TimeDataBase(time_dir)
+
self.zuul_version = zuul_version.version_info.release_string()
self.last_reconfigured = None
@@ -408,7 +411,9 @@
base = os.path.dirname(os.path.realpath(config_path))
fn = os.path.join(base, fn)
fn = os.path.expanduser(fn)
- execfile(fn, config_env)
+ with open(fn) as _f:
+ code = compile(_f.read(), fn, 'exec')
+ six.exec_(code, config_env)
for conf_pipeline in data.get('pipelines', []):
pipeline = Pipeline(conf_pipeline['name'])
@@ -740,6 +745,17 @@
state_dir = '/var/lib/zuul'
return os.path.join(state_dir, 'queue.pickle')
+ def _get_time_database_dir(self):
+ if self.config.has_option('zuul', 'state_dir'):
+ state_dir = os.path.expanduser(self.config.get('zuul',
+ 'state_dir'))
+ else:
+ state_dir = '/var/lib/zuul'
+ d = os.path.join(state_dir, 'times')
+ if not os.path.exists(d):
+ os.mkdir(d)
+ return d
+
def _save_queue(self):
pickle_file = self._get_queue_pickle_file()
events = []
@@ -1038,8 +1054,8 @@
else:
self.log.error("Unable to handle event %s" % event)
event.done()
- except Exception as e:
- event.exception(e, sys.exc_info()[2])
+ except Exception:
+ event.exception(sys.exc_info())
self.management_event_queue.task_done()
def process_result_queue(self):
@@ -1069,6 +1085,11 @@
self.log.warning("Build %s is not associated with a pipeline" %
(build,))
return
+ try:
+ build.estimated_time = float(self.time_database.getEstimatedTime(
+ build.job.name))
+ except Exception:
+ self.log.exception("Exception estimating build time:")
pipeline.manager.onBuildStarted(event.build)
def _doBuildCompletedEvent(self, event):
@@ -1082,6 +1103,13 @@
self.log.warning("Build %s is not associated with a pipeline" %
(build,))
return
+ if build.end_time and build.start_time and build.result:
+ duration = build.end_time - build.start_time
+ try:
+ self.time_database.update(
+ build.job.name, duration, build.result)
+ except Exception:
+ self.log.exception("Exception recording build time:")
pipeline.manager.onBuildCompleted(event.build)
def _doMergeCompletedEvent(self, event):