Merge "Support autoholding nodes for specific changes/refs"
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 2e18b51..88b898f 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -71,7 +71,7 @@
[zookeeper]
hosts=zk1.example.com,zk2.example.com,zk3.example.com
- [webapp]
+ [web]
status_url=https://zuul.example.com/status
[scheduler]
@@ -234,22 +234,7 @@
An openssl file containing the server private key in PEM format.
-.. attr:: webapp
-
- .. attr:: listen_address
- :default: all addresses
-
- IP address or domain name on which to listen.
-
- .. attr:: port
- :default: 8001
-
- Port on which the webapp is listening.
-
- .. attr:: status_expiry
- :default: 1
-
- Zuul will cache the status.json file for this many seconds.
+.. attr:: web
.. attr:: status_url
diff --git a/doc/source/admin/drivers/github.rst b/doc/source/admin/drivers/github.rst
index 4f46af6..83ac77f 100644
--- a/doc/source/admin/drivers/github.rst
+++ b/doc/source/admin/drivers/github.rst
@@ -317,10 +317,10 @@
reporter should set as the commit status on github.
.. TODO support role markup in :default: so we can xref
- :attr:`webapp.status_url` below
+ :attr:`web.status_url` below
.. attr:: status-url
- :default: webapp.status_url or the empty string
+ :default: web.status_url or the empty string
String value for a link url to set in the github
status. Defaults to the zuul server status_url, or the empty
diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 0fdb3b2..1c17c28 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -131,15 +131,51 @@
component of the key will be replaced with the hostname of the
executor.
+ .. stat:: merger.<result>
+ :type: counter
+
+ Incremented to represent the status of a Zuul executor's merger
+ operations. ``<result>`` can be either ``SUCCESS`` or ``FAILURE``.
+ A failed merge operation, which is counted as a ``FAILURE``, is what
+ Zuul ultimately reports as a ``MERGER_FAILURE``.
+
.. stat:: builds
:type: counter
Incremented each time the executor starts a build.
+ .. stat:: starting_builds
+ :type: gauge
+
+ The number of builds starting on this executor. These are
+ builds which have not yet begun their first pre-playbook.
+
.. stat:: running_builds
:type: gauge
- The number of builds currently running on this executor.
+ The number of builds currently running on this executor. This
+ includes starting builds.
+
+ .. stat:: phase
+
+ Subtree detailing per-phase execution statistics:
+
+ .. stat:: <phase>
+
+ ``<phase>`` represents a phase in the execution of a job.
+ This can be an *internal* phase (such as ``setup`` or ``cleanup``) as
+ well as *job* phases such as ``pre``, ``run`` or ``post``.
+
+ .. stat:: <result>
+ :type: counter
+
+ A counter for each type of result.
+ These results do not, by themselves, determine the status of a build
+ but are indicators of the exit status provided by Ansible for the
+ execution of a particular phase.
+
+ Examples of possible counters for each phase are: ``RESULT_NORMAL``,
+ ``RESULT_TIMED_OUT``, ``RESULT_UNREACHABLE``, ``RESULT_ABORTED``.
.. stat:: load_average
:type: gauge
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index 17092af..62b5086 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -39,10 +39,6 @@
port=9000
static_cache_expiry=0
;sql_connection_name=mydatabase
-
-[webapp]
-listen_address=0.0.0.0
-port=8001
status_url=https://zuul.example.com/status
[connection gerrit]
diff --git a/requirements.txt b/requirements.txt
index 3ab5850..f24f195 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,7 +6,7 @@
PyYAML>=3.1.0
Paste
WebOb>=1.2.3
-paramiko>=1.8.0,<2.0.0
+paramiko>=2.0.1
GitPython>=2.1.8
python-daemon>=2.0.4,<2.1.0
extras
diff --git a/tests/base.py b/tests/base.py
index 20e664e..70889bb 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -57,7 +57,6 @@
import zuul.driver.gerrit.gerritconnection as gerritconnection
import zuul.driver.github.githubconnection as githubconnection
import zuul.scheduler
-import zuul.webapp
import zuul.executor.server
import zuul.executor.client
import zuul.lib.connections
@@ -66,9 +65,11 @@
import zuul.merger.server
import zuul.model
import zuul.nodepool
+import zuul.rpcclient
import zuul.zk
import zuul.configloader
from zuul.exceptions import MergeFailure
+from zuul.lib.config import get_default
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
@@ -939,7 +940,7 @@
class FakeGithubConnection(githubconnection.GithubConnection):
log = logging.getLogger("zuul.test.FakeGithubConnection")
- def __init__(self, driver, connection_name, connection_config,
+ def __init__(self, driver, connection_name, connection_config, rpcclient,
changes_db=None, upstream_root=None):
super(FakeGithubConnection, self).__init__(driver, connection_name,
connection_config)
@@ -952,12 +953,16 @@
self.merge_not_allowed_count = 0
self.reports = []
self.github_client = tests.fakegithub.FakeGithub(changes_db)
+ self.rpcclient = rpcclient
def getGithubClient(self,
project=None,
user_id=None):
return self.github_client
+ def setZuulWebPort(self, port):
+ self.zuul_web_port = port
+
def openFakePullRequest(self, project, branch, subject, files=[],
body=None):
self.pr_number += 1
@@ -991,19 +996,25 @@
}
return (name, data)
- def emitEvent(self, event):
+ def emitEvent(self, event, use_zuulweb=False):
"""Emulates sending the GitHub webhook event to the connection."""
- port = self.webapp.server.socket.getsockname()[1]
name, data = event
payload = json.dumps(data).encode('utf8')
secret = self.connection_config['webhook_token']
signature = githubconnection._sign_request(payload, secret)
- headers = {'X-Github-Event': name, 'X-Hub-Signature': signature}
- req = urllib.request.Request(
- 'http://localhost:%s/connection/%s/payload'
- % (port, self.connection_name),
- data=payload, headers=headers)
- return urllib.request.urlopen(req)
+ headers = {'x-github-event': name, 'x-hub-signature': signature}
+
+ if use_zuulweb:
+ req = urllib.request.Request(
+ 'http://127.0.0.1:%s/connection/%s/payload'
+ % (self.zuul_web_port, self.connection_name),
+ data=payload, headers=headers)
+ return urllib.request.urlopen(req)
+ else:
+ job = self.rpcclient.submitJob(
+ 'github:%s:payload' % self.connection_name,
+ {'headers': headers, 'body': data})
+ return json.loads(job.data[0])
def addProject(self, project):
# use the original method here and additionally register it in the
@@ -1978,6 +1989,13 @@
'gearman', 'ssl_key',
os.path.join(FIXTURE_DIR, 'gearman/client.key'))
+ self.rpcclient = zuul.rpcclient.RPCClient(
+ self.config.get('gearman', 'server'),
+ self.gearman_server.port,
+ get_default(self.config, 'gearman', 'ssl_key'),
+ get_default(self.config, 'gearman', 'ssl_cert'),
+ get_default(self.config, 'gearman', 'ssl_ca'))
+
gerritsource.GerritSource.replication_timeout = 1.5
gerritsource.GerritSource.replication_retry_interval = 0.5
gerritconnection.GerritEventConnector.delay = 0.0
@@ -1985,9 +2003,6 @@
self.sched = zuul.scheduler.Scheduler(self.config)
self.sched._stats_interval = 1
- self.webapp = zuul.webapp.WebApp(
- self.sched, port=0, listen_address='127.0.0.1')
-
self.event_queues = [
self.sched.result_event_queue,
self.sched.trigger_event_queue,
@@ -1995,7 +2010,7 @@
]
self.configure_connections()
- self.sched.registerConnections(self.connections, self.webapp)
+ self.sched.registerConnections(self.connections)
self.executor_server = RecordingExecutorServer(
self.config, self.connections,
@@ -2027,7 +2042,6 @@
self.sched.setZooKeeper(self.zk)
self.sched.start()
- self.webapp.start()
self.executor_client.gearman.waitForServer()
# Cleanups are run in reverse order
self.addCleanup(self.assertCleanShutdown)
@@ -2061,6 +2075,7 @@
server = config.get('server', 'github.com')
db = self.github_changes_dbs.setdefault(server, {})
con = FakeGithubConnection(driver, name, config,
+ self.rpcclient,
changes_db=db,
upstream_root=self.upstream_root)
self.event_queues.append(con.event_queue)
@@ -2293,8 +2308,7 @@
self.sched.join()
self.statsd.stop()
self.statsd.join()
- self.webapp.stop()
- self.webapp.join()
+ self.rpcclient.shutdown()
self.gearman_server.shutdown()
self.fake_nodepool.stop()
self.zk.disconnect()
diff --git a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
index 95ab870..ba35eb0 100644
--- a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
+++ b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
@@ -1,6 +1,7 @@
- hosts: localhost
tasks:
- command: dd if=/dev/zero of=toobig bs=1M count=2
+ - command: sync
- wait_for:
delay: 10
path: /
diff --git a/tests/fixtures/config/governor/git/common-config/playbooks/base.yaml b/tests/fixtures/config/governor/git/common-config/playbooks/base.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/governor/git/common-config/playbooks/base.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/governor/git/common-config/zuul.yaml b/tests/fixtures/config/governor/git/common-config/zuul.yaml
new file mode 100644
index 0000000..093da16
--- /dev/null
+++ b/tests/fixtures/config/governor/git/common-config/zuul.yaml
@@ -0,0 +1,34 @@
+- pipeline:
+ name: check
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- job:
+ name: base
+ parent: null
+ run: playbooks/base.yaml
+
+- job:
+ name: test1
+
+- job:
+ name: test2
+
+- job:
+ name: test3
+
+- project:
+ name: common-config
+ check:
+ jobs:
+ - test1
+ - test2
+ - test3
diff --git a/tests/fixtures/config/governor/main.yaml b/tests/fixtures/config/governor/main.yaml
new file mode 100644
index 0000000..9d01f54
--- /dev/null
+++ b/tests/fixtures/config/governor/main.yaml
@@ -0,0 +1,6 @@
+- tenant:
+ name: tenant-one
+ source:
+ gerrit:
+ config-projects:
+ - common-config
diff --git a/tests/fixtures/git_fetch_error.sh b/tests/fixtures/git_fetch_error.sh
new file mode 100755
index 0000000..49c568c
--- /dev/null
+++ b/tests/fixtures/git_fetch_error.sh
@@ -0,0 +1,17 @@
+#!/bin/sh
+
+echo $*
+case "$1" in
+ fetch)
+ if [ -f ./stamp1 ]; then
+ touch ./stamp2
+ exit 0
+ fi
+ touch ./stamp1
+ exit 1
+ ;;
+ version)
+ echo "git version 1.0.0"
+ exit 0
+ ;;
+esac
diff --git a/tests/fixtures/zuul-connections-merger.conf b/tests/fixtures/zuul-connections-merger.conf
index 771fc50..15769ef 100644
--- a/tests/fixtures/zuul-connections-merger.conf
+++ b/tests/fixtures/zuul-connections-merger.conf
@@ -1,7 +1,7 @@
[gearman]
server=127.0.0.1
-[webapp]
+[web]
status_url=http://zuul.example.com/status
[merger]
diff --git a/tests/fixtures/zuul-github-driver.conf b/tests/fixtures/zuul-github-driver.conf
index a96bde2..b6a7753 100644
--- a/tests/fixtures/zuul-github-driver.conf
+++ b/tests/fixtures/zuul-github-driver.conf
@@ -1,7 +1,7 @@
[gearman]
server=127.0.0.1
-[webapp]
+[web]
status_url=http://zuul.example.com/status/#{change.number},{change.patchset}
[merger]
diff --git a/tests/fixtures/zuul-push-reqs.conf b/tests/fixtures/zuul-push-reqs.conf
index 2217f94..b902d3f 100644
--- a/tests/fixtures/zuul-push-reqs.conf
+++ b/tests/fixtures/zuul-push-reqs.conf
@@ -1,7 +1,7 @@
[gearman]
server=127.0.0.1
-[webapp]
+[web]
status_url=http://zuul.example.com/status
[merger]
diff --git a/tests/unit/test_disk_accountant.py b/tests/unit/test_disk_accountant.py
index 7081b53..e12846d 100644
--- a/tests/unit/test_disk_accountant.py
+++ b/tests/unit/test_disk_accountant.py
@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import fixtures
import os
import tempfile
import time
@@ -32,6 +33,10 @@
class TestDiskAccountant(BaseTestCase):
+ def setUp(self):
+ super(TestDiskAccountant, self).setUp()
+ self.useFixture(fixtures.NestedTempfile())
+
def test_disk_accountant(self):
jobs_dir = tempfile.mkdtemp(
dir=os.environ.get("ZUUL_TEST_ROOT", None))
@@ -47,6 +52,8 @@
testfile = os.path.join(jobdir, 'tfile')
with open(testfile, 'w') as tf:
tf.write(2 * 1024 * 1024 * '.')
+ tf.flush()
+ os.fsync(tf.fileno())
# da should catch over-limit dir within 5 seconds
for i in range(0, 50):
diff --git a/tests/unit/test_encryption.py b/tests/unit/test_encryption.py
index b424769..0a5c0a4 100644
--- a/tests/unit/test_encryption.py
+++ b/tests/unit/test_encryption.py
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import fixtures
import os
import subprocess
import tempfile
@@ -26,6 +27,10 @@
def setUp(self):
super(TestEncryption, self).setUp()
self.private, self.public = encryption.generate_rsa_keypair()
+ # Because we set delete to False when using NamedTemporaryFile below,
+ # we need to stick our usage of temporary files in the NestedTempfile,
+ # fixture ensuring everything gets cleaned up when it is done.
+ self.useFixture(fixtures.NestedTempfile())
def test_serialization(self):
"Verify key serialization"
diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py
index 474859d..46e1d99 100755
--- a/tests/unit/test_executor.py
+++ b/tests/unit/test_executor.py
@@ -15,6 +15,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
import logging
import time
@@ -436,3 +441,78 @@
def test_executor_hostname(self):
self.assertEqual('test-executor-hostname.example.com',
self.executor_server.hostname)
+
+
+class TestGovernor(ZuulTestCase):
+ tenant_config_file = 'config/governor/main.yaml'
+
+ @mock.patch('os.getloadavg')
+ @mock.patch('psutil.virtual_memory')
+ def test_load_governor(self, vm_mock, loadavg_mock):
+ class Dummy(object):
+ pass
+ ram = Dummy()
+ ram.percent = 20.0 # 20% used
+ vm_mock.return_value = ram
+ loadavg_mock.return_value = (0.0, 0.0, 0.0)
+ self.executor_server.manageLoad()
+ self.assertTrue(self.executor_server.accepting_work)
+ ram.percent = 99.0 # 99% used
+ loadavg_mock.return_value = (100.0, 100.0, 100.0)
+ self.executor_server.manageLoad()
+ self.assertFalse(self.executor_server.accepting_work)
+
+ def waitForExecutorBuild(self, jobname):
+ timeout = time.time() + 30
+ build = None
+ while (time.time() < timeout and not build):
+ for b in self.builds:
+ if b.name == jobname:
+ build = b
+ break
+ time.sleep(0.1)
+ build_id = build.uuid
+ while (time.time() < timeout and
+ build_id not in self.executor_server.job_workers):
+ time.sleep(0.1)
+ worker = self.executor_server.job_workers[build_id]
+ while (time.time() < timeout and
+ not worker.started):
+ time.sleep(0.1)
+ return build
+
+ def waitForWorkerCompletion(self, build):
+ timeout = time.time() + 30
+ while (time.time() < timeout and
+ build.uuid in self.executor_server.job_workers):
+ time.sleep(0.1)
+
+ def test_slow_start(self):
+ self.executor_server.hold_jobs_in_build = True
+ self.executor_server.max_starting_builds = 1
+ self.executor_server.min_starting_builds = 1
+ self.executor_server.manageLoad()
+ self.assertTrue(self.executor_server.accepting_work)
+ A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+ build1 = self.waitForExecutorBuild('test1')
+ # With one job (test1) being started, we should no longer
+ # be accepting new work
+ self.assertFalse(self.executor_server.accepting_work)
+ self.assertEqual(len(self.executor_server.job_workers), 1)
+ # Allow enough starting builds for the test to complete.
+ self.executor_server.max_starting_builds = 3
+ build1.release()
+ self.waitForWorkerCompletion(build1)
+ self.executor_server.manageLoad()
+
+ self.waitForExecutorBuild('test2')
+ self.waitForExecutorBuild('test3')
+ self.assertFalse(self.executor_server.accepting_work)
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+ self.executor_server.manageLoad()
+ self.assertTrue(self.executor_server.accepting_work)
diff --git a/tests/unit/test_gerrit_crd.py b/tests/unit/test_gerrit_crd.py
index a8924b9..ad25c47 100644
--- a/tests/unit/test_gerrit_crd.py
+++ b/tests/unit/test_gerrit_crd.py
@@ -54,8 +54,11 @@
A.setDependsOn(AM1, 1)
AM1.setDependsOn(AM2, 1)
+ # So that at least one test uses the /#/c/ form of the url,
+ # use it here.
+ url = 'https://%s/#/c/%s' % (B.gerrit.server, B.number)
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
- A.subject, B.data['url'])
+ A.subject, url)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py
index 3942b0b..cd36ba3 100644
--- a/tests/unit/test_github_driver.py
+++ b/tests/unit/test_github_driver.py
@@ -12,15 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
+import asyncio
+import threading
import os
import re
from testtools.matchers import MatchesRegex, StartsWith
import urllib
+import socket
import time
from unittest import skip
import git
+import zuul.web
+
from tests.base import ZuulTestCase, simple_layout, random_sha1
@@ -734,3 +739,85 @@
# project2 should have no parsed branch
self.assertEqual(0, len(project2.unparsed_branch_config.keys()))
+
+
+class TestGithubWebhook(ZuulTestCase):
+ config_file = 'zuul-github-driver.conf'
+
+ def setUp(self):
+ super(TestGithubWebhook, self).setUp()
+
+ # Start the web server
+ self.web = zuul.web.ZuulWeb(
+ listen_address='127.0.0.1', listen_port=0,
+ gear_server='127.0.0.1', gear_port=self.gearman_server.port,
+ connections=[self.fake_github])
+ loop = asyncio.new_event_loop()
+ loop.set_debug(True)
+ ws_thread = threading.Thread(target=self.web.run, args=(loop,))
+ ws_thread.start()
+ self.addCleanup(loop.close)
+ self.addCleanup(ws_thread.join)
+ self.addCleanup(self.web.stop)
+
+ host = '127.0.0.1'
+ # Wait until web server is started
+ while True:
+ time.sleep(0.1)
+ if self.web.server is None:
+ continue
+ port = self.web.server.sockets[0].getsockname()[1]
+ try:
+ with socket.create_connection((host, port)):
+ break
+ except ConnectionRefusedError:
+ pass
+
+ self.fake_github.setZuulWebPort(port)
+
+ def tearDown(self):
+ super(TestGithubWebhook, self).tearDown()
+
+ @simple_layout('layouts/basic-github.yaml', driver='github')
+ def test_webhook(self):
+ """Test that we can get github events via zuul-web."""
+
+ self.executor_server.hold_jobs_in_build = True
+
+ A = self.fake_github.openFakePullRequest('org/project', 'master', 'A')
+ self.fake_github.emitEvent(A.getPullRequestOpenedEvent(),
+ use_zuulweb=True)
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual('SUCCESS',
+ self.getJobFromHistory('project-test1').result)
+ self.assertEqual('SUCCESS',
+ self.getJobFromHistory('project-test2').result)
+
+ job = self.getJobFromHistory('project-test2')
+ zuulvars = job.parameters['zuul']
+ self.assertEqual(str(A.number), zuulvars['change'])
+ self.assertEqual(str(A.head_sha), zuulvars['patchset'])
+ self.assertEqual('master', zuulvars['branch'])
+ self.assertEqual(1, len(A.comments))
+ self.assertThat(
+ A.comments[0],
+ MatchesRegex('.*\[project-test1 \]\(.*\).*', re.DOTALL))
+ self.assertThat(
+ A.comments[0],
+ MatchesRegex('.*\[project-test2 \]\(.*\).*', re.DOTALL))
+ self.assertEqual(2, len(self.history))
+
+ # test_pull_unmatched_branch_event(self):
+ self.create_branch('org/project', 'unmatched_branch')
+ B = self.fake_github.openFakePullRequest(
+ 'org/project', 'unmatched_branch', 'B')
+ self.fake_github.emitEvent(B.getPullRequestOpenedEvent(),
+ use_zuulweb=True)
+ self.waitUntilSettled()
+
+ self.assertEqual(2, len(self.history))
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index ec30a2b..fb2f199 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -82,7 +82,7 @@
os.path.join(FIXTURE_DIR, 'fake_git.sh'))
work_repo = Repo(parent_path, self.workspace_root,
'none@example.org', 'User Name', '0', '0',
- git_timeout=0.001)
+ git_timeout=0.001, retry_attempts=1)
# TODO: have the merger and repo classes catch fewer
# exceptions, including this one on initialization. For the
# test, we try cloning again.
@@ -93,10 +93,26 @@
def test_fetch_timeout(self):
parent_path = os.path.join(self.upstream_root, 'org/project1')
work_repo = Repo(parent_path, self.workspace_root,
- 'none@example.org', 'User Name', '0', '0')
+ 'none@example.org', 'User Name', '0', '0',
+ retry_attempts=1)
work_repo.git_timeout = 0.001
self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
os.path.join(FIXTURE_DIR, 'fake_git.sh'))
with testtools.ExpectedException(git.exc.GitCommandError,
'.*exit code\(-9\)'):
work_repo.update()
+
+ def test_fetch_retry(self):
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ work_repo = Repo(parent_path, self.workspace_root,
+ 'none@example.org', 'User Name', '0', '0',
+ retry_interval=1)
+ self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+ os.path.join(FIXTURE_DIR, 'git_fetch_error.sh'))
+ work_repo.update()
+ # This is created on the first fetch
+ self.assertTrue(os.path.exists(os.path.join(
+ self.workspace_root, 'stamp1')))
+ # This is created on the second fetch
+ self.assertTrue(os.path.exists(os.path.join(
+ self.workspace_root, 'stamp2')))
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 5ae8607..c833fa2 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -19,14 +19,12 @@
import textwrap
import os
-import re
import shutil
import time
from unittest import skip
import git
import testtools
-import urllib
import zuul.change_matcher
from zuul.driver.gerrit import gerritreporter
@@ -39,6 +37,7 @@
ZuulTestCase,
repack_repo,
simple_layout,
+ iterate_timeout,
)
@@ -2710,110 +2709,6 @@
self.assertEqual(self.history[4].pipeline, 'check')
self.assertEqual(self.history[5].pipeline, 'check')
- def test_json_status(self):
- "Test that we can retrieve JSON status info"
- self.executor_server.hold_jobs_in_build = True
- A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
- A.addApproval('Code-Review', 2)
- self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
- self.waitUntilSettled()
-
- self.executor_server.release('project-merge')
- self.waitUntilSettled()
-
- port = self.webapp.server.socket.getsockname()[1]
-
- req = urllib.request.Request(
- "http://localhost:%s/tenant-one/status" % port)
- f = urllib.request.urlopen(req)
- headers = f.info()
- self.assertIn('Content-Length', headers)
- self.assertIn('Content-Type', headers)
- self.assertIsNotNone(re.match('^application/json(; charset=UTF-8)?$',
- headers['Content-Type']))
- self.assertIn('Access-Control-Allow-Origin', headers)
- self.assertIn('Cache-Control', headers)
- self.assertIn('Last-Modified', headers)
- self.assertIn('Expires', headers)
- data = f.read().decode('utf8')
-
- self.executor_server.hold_jobs_in_build = False
- self.executor_server.release()
- self.waitUntilSettled()
-
- data = json.loads(data)
- status_jobs = []
- for p in data['pipelines']:
- for q in p['change_queues']:
- if p['name'] in ['gate', 'conflict']:
- self.assertEqual(q['window'], 20)
- else:
- self.assertEqual(q['window'], 0)
- for head in q['heads']:
- for change in head:
- self.assertTrue(change['active'])
- self.assertEqual(change['id'], '1,1')
- for job in change['jobs']:
- status_jobs.append(job)
- self.assertEqual('project-merge', status_jobs[0]['name'])
- # TODO(mordred) pull uuids from self.builds
- self.assertEqual(
- 'stream.html?uuid={uuid}&logfile=console.log'.format(
- uuid=status_jobs[0]['uuid']),
- status_jobs[0]['url'])
- self.assertEqual(
- 'finger://{hostname}/{uuid}'.format(
- hostname=self.executor_server.hostname,
- uuid=status_jobs[0]['uuid']),
- status_jobs[0]['finger_url'])
- # TOOD(mordred) configure a success-url on the base job
- self.assertEqual(
- 'finger://{hostname}/{uuid}'.format(
- hostname=self.executor_server.hostname,
- uuid=status_jobs[0]['uuid']),
- status_jobs[0]['report_url'])
- self.assertEqual('project-test1', status_jobs[1]['name'])
- self.assertEqual(
- 'stream.html?uuid={uuid}&logfile=console.log'.format(
- uuid=status_jobs[1]['uuid']),
- status_jobs[1]['url'])
- self.assertEqual(
- 'finger://{hostname}/{uuid}'.format(
- hostname=self.executor_server.hostname,
- uuid=status_jobs[1]['uuid']),
- status_jobs[1]['finger_url'])
- self.assertEqual(
- 'finger://{hostname}/{uuid}'.format(
- hostname=self.executor_server.hostname,
- uuid=status_jobs[1]['uuid']),
- status_jobs[1]['report_url'])
-
- self.assertEqual('project-test2', status_jobs[2]['name'])
- self.assertEqual(
- 'stream.html?uuid={uuid}&logfile=console.log'.format(
- uuid=status_jobs[2]['uuid']),
- status_jobs[2]['url'])
- self.assertEqual(
- 'finger://{hostname}/{uuid}'.format(
- hostname=self.executor_server.hostname,
- uuid=status_jobs[2]['uuid']),
- status_jobs[2]['finger_url'])
- self.assertEqual(
- 'finger://{hostname}/{uuid}'.format(
- hostname=self.executor_server.hostname,
- uuid=status_jobs[2]['uuid']),
- status_jobs[2]['report_url'])
-
- # check job dependencies
- self.assertIsNotNone(status_jobs[0]['dependencies'])
- self.assertIsNotNone(status_jobs[1]['dependencies'])
- self.assertIsNotNone(status_jobs[2]['dependencies'])
- self.assertEqual(len(status_jobs[0]['dependencies']), 0)
- self.assertEqual(len(status_jobs[1]['dependencies']), 1)
- self.assertEqual(len(status_jobs[2]['dependencies']), 1)
- self.assertIn('project-merge', status_jobs[1]['dependencies'])
- self.assertIn('project-merge', status_jobs[2]['dependencies'])
-
def test_reconfigure_merge(self):
"""Test that two reconfigure events are merged"""
@@ -3389,13 +3284,6 @@
self.assertEqual(len(self.builds), 2)
- port = self.webapp.server.socket.getsockname()[1]
-
- req = urllib.request.Request(
- "http://localhost:%s/tenant-one/status" % port)
- f = urllib.request.urlopen(req)
- data = f.read().decode('utf8')
-
self.executor_server.hold_jobs_in_build = False
# Stop queuing timer triggered jobs so that the assertions
# below don't race against more jobs being queued.
@@ -3417,16 +3305,6 @@
ref='refs/heads/stable'),
], ordered=False)
- data = json.loads(data)
- status_jobs = set()
- for p in data['pipelines']:
- for q in p['change_queues']:
- for head in q['heads']:
- for change in head:
- for job in change['jobs']:
- status_jobs.add(job['name'])
- self.assertIn('project-bitrot', status_jobs)
-
def test_idle(self):
"Test that frequent periodic jobs work"
# This test can not use simple_layout because it must start
@@ -4659,6 +4537,54 @@
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
+ def test_zookeeper_disconnect2(self):
+ "Test that jobs are executed after a zookeeper disconnect"
+
+ # This tests receiving a ZK disconnect between the arrival of
+ # a fulfilled request and when we accept its nodes.
+ self.fake_nodepool.paused = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ # We're waiting on the nodepool request to complete. Stop the
+ # scheduler from processing further events, then fulfill the
+ # nodepool request.
+ self.sched.run_handler_lock.acquire()
+
+ # Fulfill the nodepool request.
+ self.fake_nodepool.paused = False
+ requests = list(self.sched.nodepool.requests.values())
+ self.assertEqual(1, len(requests))
+ request = requests[0]
+ for x in iterate_timeout(30, 'fulfill request'):
+ if request.fulfilled:
+ break
+ id1 = request.id
+
+ # The request is fulfilled, but the scheduler hasn't processed
+ # it yet. Reconnect ZK.
+ self.zk.client.stop()
+ self.zk.client.start()
+
+ # Allow the scheduler to continue and process the (now
+ # out-of-date) notification that nodes are ready.
+ self.sched.run_handler_lock.release()
+
+ # It should resubmit the request; once it's fulfilled, we can
+ # wait for it to run jobs and settle.
+ for x in iterate_timeout(30, 'fulfill request'):
+ if request.fulfilled:
+ break
+ self.waitUntilSettled()
+
+ id2 = request.id
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ # Make sure it was resubmitted (the ids should be different).
+ self.assertNotEqual(id1, id2)
+
def test_nodepool_failure(self):
"Test that jobs are reported after a nodepool failure"
diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py
index 6881a83..b5ebe9f 100644
--- a/tests/unit/test_web.py
+++ b/tests/unit/test_web.py
@@ -78,14 +78,105 @@
super(TestWeb, self).tearDown()
def test_web_status(self):
- "Test that we can filter to only certain changes in the webapp."
+ "Test that we can retrieve JSON status info"
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ self.executor_server.release('project-merge')
+ self.waitUntilSettled()
req = urllib.request.Request(
"http://localhost:%s/tenant-one/status.json" % self.port)
f = urllib.request.urlopen(req)
- data = json.loads(f.read().decode('utf8'))
+ headers = f.info()
+ self.assertIn('Content-Length', headers)
+ self.assertIn('Content-Type', headers)
+ self.assertEqual(
+ 'application/json; charset=utf-8', headers['Content-Type'])
+ self.assertIn('Access-Control-Allow-Origin', headers)
+ self.assertIn('Cache-Control', headers)
+ self.assertIn('Last-Modified', headers)
+ data = f.read().decode('utf8')
- self.assertIn('pipelines', data)
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ data = json.loads(data)
+ status_jobs = []
+ for p in data['pipelines']:
+ for q in p['change_queues']:
+ if p['name'] in ['gate', 'conflict']:
+ self.assertEqual(q['window'], 20)
+ else:
+ self.assertEqual(q['window'], 0)
+ for head in q['heads']:
+ for change in head:
+ self.assertTrue(change['active'])
+ self.assertIn(change['id'], ('1,1', '2,1', '3,1'))
+ for job in change['jobs']:
+ status_jobs.append(job)
+ self.assertEqual('project-merge', status_jobs[0]['name'])
+ # TODO(mordred) pull uuids from self.builds
+ self.assertEqual(
+ 'stream.html?uuid={uuid}&logfile=console.log'.format(
+ uuid=status_jobs[0]['uuid']),
+ status_jobs[0]['url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[0]['uuid']),
+ status_jobs[0]['finger_url'])
+ # TODO(mordred) configure a success-url on the base job
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[0]['uuid']),
+ status_jobs[0]['report_url'])
+ self.assertEqual('project-test1', status_jobs[1]['name'])
+ self.assertEqual(
+ 'stream.html?uuid={uuid}&logfile=console.log'.format(
+ uuid=status_jobs[1]['uuid']),
+ status_jobs[1]['url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[1]['uuid']),
+ status_jobs[1]['finger_url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[1]['uuid']),
+ status_jobs[1]['report_url'])
+
+ self.assertEqual('project-test2', status_jobs[2]['name'])
+ self.assertEqual(
+ 'stream.html?uuid={uuid}&logfile=console.log'.format(
+ uuid=status_jobs[2]['uuid']),
+ status_jobs[2]['url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[2]['uuid']),
+ status_jobs[2]['finger_url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[2]['uuid']),
+ status_jobs[2]['report_url'])
+
+ # check job dependencies
+ self.assertIsNotNone(status_jobs[0]['dependencies'])
+ self.assertIsNotNone(status_jobs[1]['dependencies'])
+ self.assertIsNotNone(status_jobs[2]['dependencies'])
+ self.assertEqual(len(status_jobs[0]['dependencies']), 0)
+ self.assertEqual(len(status_jobs[1]['dependencies']), 1)
+ self.assertEqual(len(status_jobs[2]['dependencies']), 1)
+ self.assertIn('project-merge', status_jobs[1]['dependencies'])
+ self.assertIn('project-merge', status_jobs[2]['dependencies'])
def test_web_bad_url(self):
# do we 404 correctly
diff --git a/tests/unit/test_webapp.py b/tests/unit/test_webapp.py
deleted file mode 100644
index c06fc93..0000000
--- a/tests/unit/test_webapp.py
+++ /dev/null
@@ -1,119 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2014 Hewlett-Packard Development Company, L.P.
-# Copyright 2014 Rackspace Australia
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-import json
-import urllib
-
-import webob
-
-from tests.base import ZuulTestCase, FIXTURE_DIR
-
-
-class TestWebapp(ZuulTestCase):
- tenant_config_file = 'config/single-tenant/main.yaml'
-
- def setUp(self):
- super(TestWebapp, self).setUp()
- self.executor_server.hold_jobs_in_build = True
- A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
- A.addApproval('Code-Review', 2)
- self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
- B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
- B.addApproval('Code-Review', 2)
- self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
- self.waitUntilSettled()
- self.port = self.webapp.server.socket.getsockname()[1]
-
- def tearDown(self):
- self.executor_server.hold_jobs_in_build = False
- self.executor_server.release()
- self.waitUntilSettled()
- super(TestWebapp, self).tearDown()
-
- def test_webapp_status(self):
- "Test that we can filter to only certain changes in the webapp."
-
- req = urllib.request.Request(
- "http://localhost:%s/tenant-one/status" % self.port)
- f = urllib.request.urlopen(req)
- data = json.loads(f.read().decode('utf8'))
-
- self.assertIn('pipelines', data)
-
- def test_webapp_status_compat(self):
- # testing compat with status.json
- req = urllib.request.Request(
- "http://localhost:%s/tenant-one/status.json" % self.port)
- f = urllib.request.urlopen(req)
- data = json.loads(f.read().decode('utf8'))
-
- self.assertIn('pipelines', data)
-
- def test_webapp_bad_url(self):
- # do we 404 correctly
- req = urllib.request.Request(
- "http://localhost:%s/status/foo" % self.port)
- self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)
-
- def test_webapp_find_change(self):
- # can we filter by change id
- req = urllib.request.Request(
- "http://localhost:%s/tenant-one/status/change/1,1" % self.port)
- f = urllib.request.urlopen(req)
- data = json.loads(f.read().decode('utf8'))
-
- self.assertEqual(1, len(data), data)
- self.assertEqual("org/project", data[0]['project'])
-
- req = urllib.request.Request(
- "http://localhost:%s/tenant-one/status/change/2,1" % self.port)
- f = urllib.request.urlopen(req)
- data = json.loads(f.read().decode('utf8'))
-
- self.assertEqual(1, len(data), data)
- self.assertEqual("org/project1", data[0]['project'], data)
-
- def test_webapp_keys(self):
- with open(os.path.join(FIXTURE_DIR, 'public.pem'), 'rb') as f:
- public_pem = f.read()
-
- req = urllib.request.Request(
- "http://localhost:%s/tenant-one/keys/gerrit/org/project.pub" %
- self.port)
- f = urllib.request.urlopen(req)
- self.assertEqual(f.read(), public_pem)
-
- def test_webapp_custom_handler(self):
- def custom_handler(path, tenant_name, request):
- return webob.Response(body='ok')
-
- self.webapp.register_path('/custom', custom_handler)
- req = urllib.request.Request(
- "http://localhost:%s/custom" % self.port)
- f = urllib.request.urlopen(req)
- self.assertEqual(b'ok', f.read())
-
- self.webapp.unregister_path('/custom')
- self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)
-
- def test_webapp_404_on_unknown_tenant(self):
- req = urllib.request.Request(
- "http://localhost:{}/non-tenant/status.json".format(self.port))
- e = self.assertRaises(
- urllib.error.HTTPError, urllib.request.urlopen, req)
- self.assertEqual(404, e.code)
diff --git a/tools/github-debugging.py b/tools/github-debugging.py
index 101fd11..da6fd0c 100755
--- a/tools/github-debugging.py
+++ b/tools/github-debugging.py
@@ -11,6 +11,8 @@
# TODO: for real use override the following variables
server = 'github.com'
api_token = 'xxxx'
+appid = 2
+appkey = '/opt/project/appkey'
org = 'example'
repo = 'sandbox'
@@ -42,20 +44,36 @@
return conn
+def create_connection_app(server, appid, appkey):
+ driver = GithubDriver()
+ connection_config = {
+ 'server': server,
+ 'app_id': appid,
+ 'app_key': appkey,
+ }
+ conn = GithubConnection(driver, 'github', connection_config)
+ conn._authenticateGithubAPI()
+ conn._prime_installation_map()
+ return conn
+
+
def get_change(connection: GithubConnection,
org: str,
repo: str,
pull: int) -> Change:
p = Project("%s/%s" % (org, repo), connection.source)
- github = connection.getGithubClient(p)
+ github = connection.getGithubClient(p.name)
pr = github.pull_request(org, repo, pull)
sha = pr.head.sha
return conn._getChange(p, pull, sha, True)
-# create github connection
+# create github connection with api token
conn = create_connection(server, api_token)
+# create github connection with app key
+# conn = create_connection_app(server, appid, appkey)
+
# Now we can do anything we want with the connection, e.g. check canMerge for
# a pull request.
diff --git a/tools/nodepool-integration-setup.sh b/tools/nodepool-integration-setup.sh
index c02a016..58c39cf 100755
--- a/tools/nodepool-integration-setup.sh
+++ b/tools/nodepool-integration-setup.sh
@@ -3,7 +3,7 @@
/usr/zuul-env/bin/zuul-cloner --workspace /tmp --cache-dir /opt/git \
git://git.openstack.org openstack-infra/nodepool
-ln -s /tmp/nodepool/log $WORKSPACE/logs
+ln -s /tmp/nodepool/log $HOME/logs
cd /tmp/openstack-infra/nodepool
/usr/local/jenkins/slave_scripts/install-distro-packages.sh
diff --git a/tox.ini b/tox.ini
index 5326ba4..e5035bd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -6,7 +6,7 @@
[testenv]
basepython = python3
setenv = VIRTUAL_ENV={envdir}
- OS_TEST_TIMEOUT=120
+ OS_TEST_TIMEOUT=150
passenv = ZUUL_TEST_ROOT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE OS_LOG_DEFAULTS
usedevelop = True
install_command = pip install {opts} {packages}
@@ -50,6 +50,6 @@
[flake8]
# These are ignored intentionally in openstack-infra projects;
# please don't submit patches that solely correct them or enable them.
-ignore = E125,E129,E402,E741,H,W503
+ignore = E124,E125,E129,E402,E741,H,W503
show-source = True
exclude = .venv,.tox,dist,doc,build,*.egg
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 7748a80..68c9000 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -120,7 +120,6 @@
import zuul.executor.client
import zuul.merger.client
import zuul.nodepool
- import zuul.webapp
import zuul.zk
if (self.config.has_option('gearman_server', 'start') and
@@ -144,15 +143,6 @@
zookeeper.connect(zookeeper_hosts, timeout=zookeeper_timeout)
- cache_expiry = get_default(self.config, 'webapp', 'status_expiry', 1)
- listen_address = get_default(self.config, 'webapp', 'listen_address',
- '0.0.0.0')
- port = get_default(self.config, 'webapp', 'port', 8001)
-
- webapp = zuul.webapp.WebApp(
- self.sched, port=port, cache_expiry=cache_expiry,
- listen_address=listen_address)
-
self.configure_connections()
self.sched.setExecutor(gearman)
self.sched.setMerger(merger)
@@ -162,7 +152,7 @@
self.log.info('Starting scheduler')
try:
self.sched.start()
- self.sched.registerConnections(self.connections, webapp)
+ self.sched.registerConnections(self.connections)
self.sched.reconfigure(self.config)
self.sched.resume()
except Exception:
@@ -170,8 +160,6 @@
# TODO(jeblair): If we had all threads marked as daemon,
# we might be able to have a nicer way of exiting here.
sys.exit(1)
- self.log.info('Starting Webapp')
- webapp.start()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
diff --git a/zuul/cmd/web.py b/zuul/cmd/web.py
index ad3062f..abdb1cb 100755
--- a/zuul/cmd/web.py
+++ b/zuul/cmd/web.py
@@ -22,7 +22,6 @@
import zuul.cmd
import zuul.web
-from zuul.driver.sql import sqlconnection
from zuul.lib.config import get_default
@@ -49,29 +48,15 @@
params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')
- sql_conn_name = get_default(self.config, 'web',
- 'sql_connection_name')
- sql_conn = None
- if sql_conn_name:
- # we want a specific sql connection
- sql_conn = self.connections.connections.get(sql_conn_name)
- if not sql_conn:
- self.log.error("Couldn't find sql connection '%s'" %
- sql_conn_name)
+ params['connections'] = []
+ # Validate config here before we spin up the ZuulWeb object
+ for conn_name, connection in self.connections.connections.items():
+ try:
+ if connection.validateWebConfig(self.config, self.connections):
+ params['connections'].append(connection)
+ except Exception:
+ self.log.exception("Error validating config")
sys.exit(1)
- else:
- # look for any sql connection
- connections = [c for c in self.connections.connections.values()
- if isinstance(c, sqlconnection.SQLConnection)]
- if len(connections) > 1:
- self.log.error("Multiple sql connection found, "
- "set the sql_connection_name option "
- "in zuul.conf [web] section")
- sys.exit(1)
- if connections:
- # use this sql connection by default
- sql_conn = connections[0]
- params['sql_connection'] = sql_conn
try:
self.web = zuul.web.ZuulWeb(**params)
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index 483495d..86f14d6 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -75,20 +75,29 @@
still in use. Anything in our cache that isn't in the supplied
list should be safe to remove from the cache."""
- def registerWebapp(self, webapp):
- self.webapp = webapp
+ def getWebHandlers(self, zuul_web):
+ """Return a list of web handlers to register with zuul-web.
- def registerHttpHandler(self, path, handler):
- """Add connection handler for HTTP URI.
-
- Connection can use builtin HTTP server for listening on incoming event
- requests. The resulting path will be /connection/connection_name/path.
+ :param zuul.web.ZuulWeb zuul_web:
+ Zuul Web instance.
+ :returns: List of `zuul.web.handler.BaseWebHandler` instances.
"""
- self.webapp.register_path(self._connectionPath(path), handler)
+ return []
- def unregisterHttpHandler(self, path):
- """Remove the connection handler for HTTP URI."""
- self.webapp.unregister_path(self._connectionPath(path))
+ def validateWebConfig(self, config, connections):
+ """Validate config and determine whether to register web handlers.
- def _connectionPath(self, path):
- return '/connection/%s/%s' % (self.connection_name, path)
+ By default this method returns False, which means this connection
+ has no web handlers to register.
+
+ If the method returns True, then its `getWebHandlers` method
+ should be called during route registration.
+
+ If there is a fatal error, the method should raise an exception.
+
+ :param config:
+ The parsed config object.
+ :param zuul.lib.connections.ConnectionRegistry connections:
+ Registry of all configured connections.
+ """
+ return False
diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py
index ed8e7ad..8f3408e 100644
--- a/zuul/driver/gerrit/gerritsource.py
+++ b/zuul/driver/gerrit/gerritsource.py
@@ -54,7 +54,10 @@
parsed = urllib.parse.urlparse(url)
except ValueError:
return None
- m = self.change_re.match(parsed.path)
+ path = parsed.path
+ if parsed.fragment:
+ path += '#' + parsed.fragment
+ m = self.change_re.match(path)
if not m:
return None
try:
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 02cbfdb..27d31b4 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -21,20 +21,25 @@
import threading
import time
import re
+import json
+import traceback
+from aiohttp import web
import cachecontrol
from cachecontrol.cache import DictCache
from cachecontrol.heuristics import BaseHeuristic
import iso8601
import jwt
import requests
-import webob
-import webob.dec
import voluptuous as v
import github3
import github3.exceptions
+import gear
+
from zuul.connection import BaseConnection
+from zuul.web.handler import BaseDriverWebHandler
+from zuul.lib.config import get_default
from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
@@ -65,71 +70,101 @@
utc = UTC()
-class GithubWebhookListener():
-
- log = logging.getLogger("zuul.GithubWebhookListener")
+class GithubGearmanWorker(object):
+ """A worker that answers gearman requests on its own thread"""
+ log = logging.getLogger("zuul.GithubGearmanWorker")
def __init__(self, connection):
+ self.config = connection.sched.config
self.connection = connection
+ self.thread = threading.Thread(target=self._run,
+ name='github-gearman-worker')
+ self._running = False
+ handler = "github:%s:payload" % self.connection.connection_name
+ self.jobs = {
+ handler: self.handle_payload,
+ }
- def handle_request(self, path, tenant_name, request):
- if request.method != 'POST':
- self.log.debug("Only POST method is allowed.")
- raise webob.exc.HTTPMethodNotAllowed(
- 'Only POST method is allowed.')
+ def _run(self):
+ while self._running:
+ try:
+ job = self.gearman.getJob()
+ try:
+ if job.name not in self.jobs:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ continue
+ output = self.jobs[job.name](json.loads(job.arguments))
+ job.sendWorkComplete(json.dumps(output))
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(
+ traceback.format_exc().encode('utf8'))
+ except gear.InterruptedError:
+ pass
+ except Exception:
+ self.log.exception("Exception while getting job")
- delivery = request.headers.get('X-GitHub-Delivery')
+ def handle_payload(self, args):
+ headers = args.get("headers")
+ body = args.get("body")
+
+ delivery = headers.get('X-GitHub-Delivery')
self.log.debug("Github Webhook Received: {delivery}".format(
delivery=delivery))
- self._validate_signature(request)
# TODO(jlk): Validate project in the request is a project we know
try:
- self.__dispatch_event(request)
+ self.__dispatch_event(body, headers)
+ output = {'return_code': 200}
except Exception:
+ output = {'return_code': 503}
self.log.exception("Exception handling Github event:")
- def __dispatch_event(self, request):
+ return output
+
+ def __dispatch_event(self, body, headers):
try:
- event = request.headers['X-Github-Event']
+ event = headers['x-github-event']
self.log.debug("X-Github-Event: " + event)
except KeyError:
self.log.debug("Request headers missing the X-Github-Event.")
- raise webob.exc.HTTPBadRequest('Please specify a X-Github-Event '
- 'header.')
+ raise Exception('Please specify a X-Github-Event header.')
try:
- json_body = request.json_body
- self.connection.addEvent(json_body, event)
+ self.connection.addEvent(body, event)
except Exception:
message = 'Exception deserializing JSON body'
self.log.exception(message)
- raise webob.exc.HTTPBadRequest(message)
+ # TODO(jlk): Raise this as something different?
+ raise Exception(message)
- def _validate_signature(self, request):
- secret = self.connection.connection_config.get('webhook_token', None)
- if secret is None:
- raise RuntimeError("webhook_token is required")
+ def start(self):
+ self._running = True
+ server = self.config.get('gearman', 'server')
+ port = get_default(self.config, 'gearman', 'port', 4730)
+ ssl_key = get_default(self.config, 'gearman', 'ssl_key')
+ ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
+ ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
+ self.gearman = gear.TextWorker('Zuul Github Connector')
+ self.log.debug("Connect to gearman")
+ self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+ self.log.debug("Waiting for server")
+ self.gearman.waitForServer()
+ self.log.debug("Registering")
+ for job in self.jobs:
+ self.gearman.registerFunction(job)
+ self.thread.start()
- body = request.body
- try:
- request_signature = request.headers['X-Hub-Signature']
- except KeyError:
- raise webob.exc.HTTPUnauthorized(
- 'Please specify a X-Hub-Signature header with secret.')
-
- payload_signature = _sign_request(body, secret)
-
- self.log.debug("Payload Signature: {0}".format(str(payload_signature)))
- self.log.debug("Request Signature: {0}".format(str(request_signature)))
- if not hmac.compare_digest(
- str(payload_signature), str(request_signature)):
- raise webob.exc.HTTPUnauthorized(
- 'Request signature does not match calculated payload '
- 'signature. Check that secret is correct.')
-
- return True
+ def stop(self):
+ self._running = False
+ self.gearman.stopWaitingForJobs()
+ # We join here to avoid whitelisting the thread -- if it takes more
+ # than 5s to stop in tests, there's a problem.
+ self.thread.join(timeout=5)
+ self.gearman.shutdown()
class GithubEventConnector(threading.Thread):
@@ -423,6 +458,7 @@
self._github = None
self.app_id = None
self.app_key = None
+ self.sched = None
self.installation_map = {}
self.installation_token_cache = {}
@@ -458,15 +494,18 @@
re.MULTILINE | re.IGNORECASE)
def onLoad(self):
- webhook_listener = GithubWebhookListener(self)
- self.registerHttpHandler(self.payload_path,
- webhook_listener.handle_request)
+ self.log.info('Starting GitHub connection: %s' % self.connection_name)
+ self.gearman_worker = GithubGearmanWorker(self)
+ self.log.info('Authing to GitHub')
self._authenticateGithubAPI()
self._prime_installation_map()
+ self.log.info('Starting event connector')
self._start_event_connector()
+ self.log.info('Starting GearmanWorker')
+ self.gearman_worker.start()
def onStop(self):
- self.unregisterHttpHandler(self.payload_path)
+ self.gearman_worker.stop()
self._stop_event_connector()
def _start_event_connector(self):
@@ -789,7 +828,8 @@
change.updated_at = self._ghTimestampToDate(
change.pr.get('updated_at'))
- self.sched.onChangeUpdated(change)
+ if self.sched:
+ self.sched.onChangeUpdated(change)
return change
@@ -1100,6 +1140,69 @@
return statuses
+ def getWebHandlers(self, zuul_web):
+ return [GithubWebhookHandler(self, zuul_web, 'POST', 'payload')]
+
+ def validateWebConfig(self, config, connections):
+ if 'webhook_token' not in self.connection_config:
+ raise Exception(
+ "webhook_token not found in config for connection %s" %
+ self.connection_name)
+ return True
+
+
+class GithubWebhookHandler(BaseDriverWebHandler):
+
+ log = logging.getLogger("zuul.GithubWebhookHandler")
+
+ def __init__(self, connection, zuul_web, method, path):
+ super(GithubWebhookHandler, self).__init__(
+ connection=connection, zuul_web=zuul_web, method=method, path=path)
+ self.token = self.connection.connection_config.get('webhook_token')
+
+ def _validate_signature(self, body, headers):
+ try:
+ request_signature = headers['x-hub-signature']
+ except KeyError:
+ raise web.HTTPUnauthorized(
+ reason='X-Hub-Signature header missing.')
+
+ payload_signature = _sign_request(body, self.token)
+
+ self.log.debug("Payload Signature: {0}".format(str(payload_signature)))
+ self.log.debug("Request Signature: {0}".format(str(request_signature)))
+ if not hmac.compare_digest(
+ str(payload_signature), str(request_signature)):
+ raise web.HTTPUnauthorized(
+ reason=('Request signature does not match calculated payload '
+ 'signature. Check that secret is correct.'))
+
+ return True
+
+ async def handleRequest(self, request):
+ # Note(tobiash): We need to normalize the headers. Otherwise we will
+ # have trouble getting them from the dict afterwards.
+ # e.g.
+ # GitHub: sent: X-GitHub-Event received: X-GitHub-Event
+ # urllib: sent: X-GitHub-Event received: X-Github-Event
+ #
+ # We cannot easily solve this mismatch as every http processing lib
+ # modifies the header casing in its own way. By specification, http
+ # headers are case insensitive, so just lowercase them all so we
+ # don't have to care about casing later.
+ headers = dict()
+ for key, value in request.headers.items():
+ headers[key.lower()] = value
+ body = await request.read()
+ self._validate_signature(body, headers)
+ # We cannot send the raw body through gearman, so it's easy to just
+ # encode it as json, after decoding it as utf-8
+ json_body = json.loads(body.decode('utf-8'))
+ job = self.zuul_web.rpc.submitJob(
+ 'github:%s:payload' % self.connection.connection_name,
+ {'headers': headers, 'body': json_body})
+ return web.json_response(json.loads(job.data[0]))
+
def _status_as_tuple(status):
"""Translate a status into a tuple of user, context, state"""
diff --git a/zuul/driver/github/githubreporter.py b/zuul/driver/github/githubreporter.py
index 848ae1b..57b594b 100644
--- a/zuul/driver/github/githubreporter.py
+++ b/zuul/driver/github/githubreporter.py
@@ -105,8 +105,8 @@
url_pattern = self.config.get('status-url')
if not url_pattern:
sched_config = self.connection.sched.config
- if sched_config.has_option('webapp', 'status_url'):
- url_pattern = sched_config.get('webapp', 'status_url')
+ if sched_config.has_option('web', 'status_url'):
+ url_pattern = sched_config.get('web', 'status_url')
url = item.formatUrlPattern(url_pattern) if url_pattern else ''
description = '%s status: %s' % (item.pipeline.name,
diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py
index 715d72b..501a2c5 100644
--- a/zuul/driver/sql/sqlconnection.py
+++ b/zuul/driver/sql/sqlconnection.py
@@ -14,14 +14,19 @@
import logging
+from aiohttp import web
import alembic
import alembic.command
import alembic.config
import sqlalchemy as sa
import sqlalchemy.pool
-import voluptuous as v
+from sqlalchemy.sql import select
+import urllib.parse
+import voluptuous
from zuul.connection import BaseConnection
+from zuul.lib.config import get_default
+from zuul.web.handler import BaseWebHandler, StaticHandler
BUILDSET_TABLE = 'zuul_buildset'
BUILD_TABLE = 'zuul_build'
@@ -120,7 +125,122 @@
return zuul_buildset_table, zuul_build_table
+ def getWebHandlers(self, zuul_web):
+ return [
+ SqlWebHandler(self, zuul_web, 'GET', '/{tenant}/builds.json'),
+ StaticHandler(zuul_web, '/{tenant}/builds.html'),
+ ]
+
+ def validateWebConfig(self, config, connections):
+ sql_conn_name = get_default(config, 'web', 'sql_connection_name')
+ if sql_conn_name:
+ # The config wants a specific sql connection. Check the whole
+ # list of connections to make sure it can be satisfied.
+ sql_conn = connections.connections.get(sql_conn_name)
+ if not sql_conn:
+ raise Exception(
+ "Couldn't find sql connection '%s'" % sql_conn_name)
+ if self.connection_name == sql_conn.connection_name:
+ return True
+ else:
+ # Check to see if there is more than one connection
+ conn_objects = [c for c in connections.connections.values()
+ if isinstance(c, SQLConnection)]
+ if len(conn_objects) > 1:
+ raise Exception("Multiple sql connection found, "
+ "set the sql_connection_name option "
+ "in zuul.conf [web] section")
+ return True
+
+
+class SqlWebHandler(BaseWebHandler):
+ log = logging.getLogger("zuul.web.SqlHandler")
+ filters = ("project", "pipeline", "change", "patchset", "ref",
+ "result", "uuid", "job_name", "voting", "node_name", "newrev")
+
+ def __init__(self, connection, zuul_web, method, path):
+ super(SqlWebHandler, self).__init__(
+ connection=connection, zuul_web=zuul_web, method=method, path=path)
+
+ def query(self, args):
+ build = self.connection.zuul_build_table
+ buildset = self.connection.zuul_buildset_table
+ query = select([
+ buildset.c.project,
+ buildset.c.pipeline,
+ buildset.c.change,
+ buildset.c.patchset,
+ buildset.c.ref,
+ buildset.c.newrev,
+ buildset.c.ref_url,
+ build.c.result,
+ build.c.uuid,
+ build.c.job_name,
+ build.c.voting,
+ build.c.node_name,
+ build.c.start_time,
+ build.c.end_time,
+ build.c.log_url]).select_from(build.join(buildset))
+ for table in ('build', 'buildset'):
+ for key, val in args['%s_filters' % table].items():
+ if table == 'build':
+ column = build.c
+ else:
+ column = buildset.c
+ query = query.where(getattr(column, key).in_(val))
+ return query.limit(args['limit']).offset(args['skip']).order_by(
+ build.c.id.desc())
+
+ async def get_builds(self, args):
+ """Return a list of builds"""
+ builds = []
+ with self.connection.engine.begin() as conn:
+ query = self.query(args)
+ for row in conn.execute(query):
+ build = dict(row)
+ # Convert date to iso format
+ if row.start_time:
+ build['start_time'] = row.start_time.strftime(
+ '%Y-%m-%dT%H:%M:%S')
+ if row.end_time:
+ build['end_time'] = row.end_time.strftime(
+ '%Y-%m-%dT%H:%M:%S')
+ # Compute run duration
+ if row.start_time and row.end_time:
+ build['duration'] = (row.end_time -
+ row.start_time).total_seconds()
+ builds.append(build)
+ return builds
+
+ async def handleRequest(self, request):
+ try:
+ args = {
+ 'buildset_filters': {},
+ 'build_filters': {},
+ 'limit': 50,
+ 'skip': 0,
+ }
+ for k, v in urllib.parse.parse_qsl(request.rel_url.query_string):
+ if k in ("tenant", "project", "pipeline", "change",
+ "patchset", "ref", "newrev"):
+ args['buildset_filters'].setdefault(k, []).append(v)
+ elif k in ("uuid", "job_name", "voting", "node_name",
+ "result"):
+ args['build_filters'].setdefault(k, []).append(v)
+ elif k in ("limit", "skip"):
+ args[k] = int(v)
+ else:
+ raise ValueError("Unknown parameter %s" % k)
+ data = await self.get_builds(args)
+ resp = web.json_response(data)
+ resp.headers['Access-Control-Allow-Origin'] = '*'
+ except Exception as e:
+ self.log.exception("Jobs exception:")
+ resp = web.json_response({'error_description': 'Internal error'},
+ status=500)
+ return resp
+
def getSchema():
- sql_connection = v.Any(str, v.Schema(dict))
+ sql_connection = voluptuous.Any(str, voluptuous.Schema(dict))
return sql_connection
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 52e54bb..a831a53 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -574,6 +574,7 @@
self.proc = None
self.proc_lock = threading.Lock()
self.running = False
+ self.started = False # Whether playbooks have started running
self.aborted = False
self.aborted_reason = None
self.thread = None
@@ -779,8 +780,17 @@
ret = merger.mergeChanges(items, repo_state=repo_state)
if not ret: # merge conflict
result = dict(result='MERGER_FAILURE')
+ if self.executor_server.statsd:
+ base_key = ("zuul.executor.%s.merger" %
+ self.executor_server.hostname)
+ self.executor_server.statsd.incr(base_key + ".FAILURE")
self.job.sendWorkComplete(json.dumps(result))
return False
+
+ if self.executor_server.statsd:
+ base_key = ("zuul.executor.%s.merger" %
+ self.executor_server.hostname)
+ self.executor_server.statsd.incr(base_key + ".SUCCESS")
recent = ret[3]
for key, commit in recent.items():
(connection, project, branch) = key
@@ -850,6 +860,7 @@
pre_failed = False
success = False
+ self.started = True
for index, playbook in enumerate(self.jobdir.pre_playbooks):
# TODOv3(pabelanger): Implement pre-run timeout setting.
pre_status, pre_code = self.runAnsiblePlaybook(
@@ -1463,6 +1474,11 @@
wrapped=False)
self.log.debug("Ansible complete, result %s code %s" % (
self.RESULT_MAP[result], code))
+ if self.executor_server.statsd:
+ base_key = ("zuul.executor.%s.phase.setup" %
+ self.executor_server.hostname)
+ self.executor_server.statsd.incr(base_key + ".%s" %
+ self.RESULT_MAP[result])
return result, code
def runAnsibleCleanup(self, playbook):
@@ -1483,6 +1499,11 @@
wrapped=False)
self.log.debug("Ansible complete, result %s code %s" % (
self.RESULT_MAP[result], code))
+ if self.executor_server.statsd:
+ base_key = ("zuul.executor.%s.phase.cleanup" %
+ self.executor_server.hostname)
+ self.executor_server.statsd.incr(base_key + ".%s" %
+ self.RESULT_MAP[result])
return result, code
def emitPlaybookBanner(self, playbook, step, phase, result=None):
@@ -1552,6 +1573,11 @@
cmd=cmd, timeout=timeout, playbook=playbook)
self.log.debug("Ansible complete, result %s code %s" % (
self.RESULT_MAP[result], code))
+ if self.executor_server.statsd:
+ base_key = ("zuul.executor.%s.phase.%s" %
+ (self.executor_server.hostname, phase or 'unknown'))
+ self.executor_server.statsd.incr(base_key + ".%s" %
+ self.RESULT_MAP[result])
self.emitPlaybookBanner(playbook, 'END', phase, result=result)
return result, code
@@ -1598,9 +1624,10 @@
# TODOv3(mordred): make the executor name more unique --
# perhaps hostname+pid.
self.hostname = get_default(self.config, 'executor', 'hostname',
- socket.gethostname())
+ socket.getfqdn())
self.log_streaming_port = log_streaming_port
self.merger_lock = threading.Lock()
+ self.governor_lock = threading.Lock()
self.run_lock = threading.Lock()
self.verbose = False
self.command_map = dict(
@@ -1632,6 +1659,8 @@
load_multiplier = float(get_default(self.config, 'executor',
'load_multiplier', '2.5'))
self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+ self.max_starting_builds = self.max_load_avg * 2
+ self.min_starting_builds = 4
self.min_avail_mem = float(get_default(self.config, 'executor',
'min_avail_mem', '5.0'))
self.accepting_work = False
@@ -1751,6 +1780,10 @@
if self._running:
self.accepting_work = True
self.executor_worker.registerFunction("executor:execute")
+ # TODO(jeblair): Update geard to send a noop after
+ # registering for a job which is in the queue, then remove
+ # this API violation.
+ self.executor_worker._sendGrabJobUniq()
def unregister_work(self):
self.accepting_work = False
@@ -1890,22 +1923,21 @@
self.log.exception("Exception while getting job")
def mergerJobDispatch(self, job):
- with self.run_lock:
- if job.name == 'merger:cat':
- self.log.debug("Got cat job: %s" % job.unique)
- self.cat(job)
- elif job.name == 'merger:merge':
- self.log.debug("Got merge job: %s" % job.unique)
- self.merge(job)
- elif job.name == 'merger:refstate':
- self.log.debug("Got refstate job: %s" % job.unique)
- self.refstate(job)
- elif job.name == 'merger:fileschanges':
- self.log.debug("Got fileschanges job: %s" % job.unique)
- self.fileschanges(job)
- else:
- self.log.error("Unable to handle job %s" % job.name)
- job.sendWorkFail()
+ if job.name == 'merger:cat':
+ self.log.debug("Got cat job: %s" % job.unique)
+ self.cat(job)
+ elif job.name == 'merger:merge':
+ self.log.debug("Got merge job: %s" % job.unique)
+ self.merge(job)
+ elif job.name == 'merger:refstate':
+ self.log.debug("Got refstate job: %s" % job.unique)
+ self.refstate(job)
+ elif job.name == 'merger:fileschanges':
+ self.log.debug("Got fileschanges job: %s" % job.unique)
+ self.fileschanges(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
def run_executor(self):
self.log.debug("Starting executor listener")
@@ -1944,9 +1976,10 @@
self.statsd.incr(base_key + '.builds')
self.job_workers[job.unique] = self._job_class(self, job)
self.job_workers[job.unique].run()
+ self.manageLoad()
def run_governor(self):
- while not self.governor_stop_event.wait(30):
+ while not self.governor_stop_event.wait(10):
try:
self.manageLoad()
except Exception:
@@ -1954,12 +1987,23 @@
def manageLoad(self):
''' Apply some heuristics to decide whether or not we should
- be askign for more jobs '''
+ be asking for more jobs '''
+ with self.governor_lock:
+ return self._manageLoad()
+
+ def _manageLoad(self):
load_avg = os.getloadavg()[0]
avail_mem_pct = 100.0 - psutil.virtual_memory().percent
+ starting_builds = 0
+ for worker in self.job_workers.values():
+ if not worker.started:
+ starting_builds += 1
+ max_starting_builds = max(
+ self.max_starting_builds - len(self.job_workers),
+ self.min_starting_builds)
if self.accepting_work:
# Don't unregister if we don't have any active jobs.
- if load_avg > self.max_load_avg and self.job_workers:
+ if load_avg > self.max_load_avg:
self.log.info(
"Unregistering due to high system load {} > {}".format(
load_avg, self.max_load_avg))
@@ -1969,14 +2013,20 @@
"Unregistering due to low memory {:3.1f}% < {}".format(
avail_mem_pct, self.min_avail_mem))
self.unregister_work()
+ elif starting_builds >= max_starting_builds:
+ self.log.info(
+ "Unregistering due to too many starting builds {} >= {}"
+ .format(starting_builds, max_starting_builds))
+ self.unregister_work()
elif (load_avg <= self.max_load_avg and
- avail_mem_pct >= self.min_avail_mem):
+ avail_mem_pct >= self.min_avail_mem and
+ starting_builds < max_starting_builds):
self.log.info(
"Re-registering as job is within limits "
- "{} <= {} {:3.1f}% <= {}".format(load_avg,
- self.max_load_avg,
- avail_mem_pct,
- self.min_avail_mem))
+ "{} <= {} {:3.1f}% <= {} {} < {}".format(
+ load_avg, self.max_load_avg,
+ avail_mem_pct, self.min_avail_mem,
+ starting_builds, max_starting_builds))
self.register_work()
if self.statsd:
base_key = 'zuul.executor.%s' % self.hostname
@@ -1986,6 +2036,8 @@
int(avail_mem_pct * 100))
self.statsd.gauge(base_key + '.running_builds',
len(self.job_workers))
+ self.statsd.gauge(base_key + '.starting_builds',
+ starting_builds)
def finishJob(self, unique):
del(self.job_workers[unique])
diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py
index 3b3f1ae..995eeb7 100644
--- a/zuul/lib/connections.py
+++ b/zuul/lib/connections.py
@@ -66,13 +66,6 @@
if load:
connection.onLoad()
- def registerWebapp(self, webapp):
- for driver_name, driver in self.drivers.items():
- if hasattr(driver, 'registerWebapp'):
- driver.registerWebapp(webapp)
- for connection_name, connection in self.connections.items():
- connection.registerWebapp(webapp)
-
def reconfigureDrivers(self, tenant):
for driver in self.drivers.values():
if hasattr(driver, 'reconfigure'):
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 035dbf5..07f3e69 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -17,6 +17,7 @@
import logging
import os
import shutil
+import time
import git
import gitdb
@@ -52,14 +53,10 @@
raise
-class ZuulReference(git.Reference):
- _common_path_default = "refs/zuul"
- _points_to_commits_only = True
-
-
class Repo(object):
def __init__(self, remote, local, email, username, speed_limit, speed_time,
- sshkey=None, cache_path=None, logger=None, git_timeout=300):
+ sshkey=None, cache_path=None, logger=None, git_timeout=300,
+ retry_attempts=3, retry_interval=30):
if logger is None:
self.log = logging.getLogger("zuul.Repo")
else:
@@ -78,6 +75,8 @@
self.username = username
self.cache_path = cache_path
self._initialized = False
+ self.retry_attempts = retry_attempts
+ self.retry_interval = retry_interval
try:
self._ensure_cloned()
except Exception:
@@ -123,14 +122,37 @@
def _git_clone(self, url):
mygit = git.cmd.Git(os.getcwd())
mygit.update_environment(**self.env)
- with timeout_handler(self.local_path):
- mygit.clone(git.cmd.Git.polish_url(url), self.local_path,
- kill_after_timeout=self.git_timeout)
+
+ for attempt in range(1, self.retry_attempts + 1):
+ try:
+ with timeout_handler(self.local_path):
+ mygit.clone(git.cmd.Git.polish_url(url), self.local_path,
+ kill_after_timeout=self.git_timeout)
+ break
+ except Exception as e:
+ if attempt < self.retry_attempts:
+ time.sleep(self.retry_interval)
+ self.log.warning("Retry %s: Clone %s" % (
+ attempt, self.local_path))
+ else:
+ raise
def _git_fetch(self, repo, remote, ref=None, **kwargs):
- with timeout_handler(self.local_path):
- repo.git.fetch(remote, ref, kill_after_timeout=self.git_timeout,
- **kwargs)
+ for attempt in range(1, self.retry_attempts + 1):
+ try:
+ with timeout_handler(self.local_path):
+ repo.git.fetch(remote, ref,
+ kill_after_timeout=self.git_timeout,
+ **kwargs)
+ break
+ except Exception as e:
+ if attempt < self.retry_attempts:
+ time.sleep(self.retry_interval)
+ self.log.exception("Retry %s: Fetch %s %s %s" % (
+ attempt, self.local_path, remote, ref))
+ self._ensure_cloned()
+ else:
+ raise
def createRepoObject(self):
self._ensure_cloned()
@@ -280,12 +302,6 @@
repo = self.createRepoObject()
self._git_fetch(repo, repository, ref)
- def createZuulRef(self, ref, commit='HEAD'):
- repo = self.createRepoObject()
- self.log.debug("CreateZuulRef %s at %s on %s" % (ref, commit, repo))
- ref = ZuulReference.create(repo, ref, commit)
- return ref.commit
-
def push(self, local, remote):
repo = self.createRepoObject()
self.log.debug("Pushing %s:%s to %s" % (local, remote,
@@ -516,20 +532,6 @@
return None
# Store this commit as the most recent for this project-branch
recent[key] = commit
- # Set the Zuul ref for this item to point to the most recent
- # commits of each project-branch
- for key, mrc in recent.items():
- connection, project, branch = key
- zuul_ref = None
- try:
- repo = self.getRepo(connection, project)
- zuul_ref = branch + '/' + item['buildset_uuid']
- if not repo.getCommitFromRef(zuul_ref):
- repo.createZuulRef(zuul_ref, mrc)
- except Exception:
- self.log.exception("Unable to set zuul ref %s for "
- "item %s" % (zuul_ref, item))
- return None
return commit
def mergeChanges(self, items, files=None, dirs=None, repo_state=None):
diff --git a/zuul/model.py b/zuul/model.py
index 9cfbd0a..38f2d6b 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -2549,6 +2549,7 @@
# that override some attribute of the job. These aspects all
# inherit from the reference definition.
noop = Job('noop')
+ noop.description = 'A job that will always succeed, no operation.'
noop.parent = noop.BASE_JOB_MARKER
noop.run = 'noop.yaml'
self.jobs = {'noop': [noop]}
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index b96d1ca..6e7064c 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -165,6 +165,7 @@
self.log.debug("Updating node request %s" % (request,))
if request.uid not in self.requests:
+ self.log.debug("Request %s is unknown" % (request.uid,))
return False
if request.canceled:
@@ -193,14 +194,21 @@
def acceptNodes(self, request, request_id):
# Called by the scheduler when it wants to accept and lock
- # nodes for (potential) use.
+ # nodes for (potential) use. Return False if there is a
+ # problem with the request (canceled or retrying), True if it
+ # is ready to be acted upon (success or failure).
self.log.info("Accepting node request %s" % (request,))
if request_id != request.id:
self.log.info("Skipping node accept for %s (resubmitted as %s)",
request_id, request.id)
- return
+ return False
+
+ if request.canceled:
+ self.log.info("Ignoring canceled node request %s" % (request,))
+ # The request was already deleted when it was canceled
+ return False
# Make sure the request still exists. It's possible it could have
# disappeared if we lost the ZK session between when the fulfillment
@@ -208,13 +216,13 @@
# processing it. Nodepool will automatically reallocate the assigned
# nodes in that situation.
if not self.sched.zk.nodeRequestExists(request):
- self.log.info("Request %s no longer exists", request.id)
- return
-
- if request.canceled:
- self.log.info("Ignoring canceled node request %s" % (request,))
- # The request was already deleted when it was canceled
- return
+ self.log.info("Request %s no longer exists, resubmitting",
+ request.id)
+ request.id = None
+ request.state = model.STATE_REQUESTED
+ self.requests[request.uid] = request
+ self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+ return False
locked = False
if request.fulfilled:
@@ -239,3 +247,4 @@
# them.
if locked:
self.unlockNodeSet(request.nodeset)
+ return True
diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py
index 1bff5cb..e05ee06 100644
--- a/zuul/reporter/__init__.py
+++ b/zuul/reporter/__init__.py
@@ -75,7 +75,7 @@
def _formatItemReportStart(self, item, with_jobs=True):
status_url = get_default(self.connection.sched.config,
- 'webapp', 'status_url', '')
+ 'web', 'status_url', '')
return item.pipeline.start_message.format(pipeline=item.pipeline,
status_url=status_url)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index f86bc89..1b65871 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -215,7 +215,7 @@
def __init__(self, config, testonly=False):
threading.Thread.__init__(self)
self.daemon = True
- self.hostname = socket.gethostname()
+ self.hostname = socket.getfqdn()
self.wake_event = threading.Event()
self.layout_lock = threading.Lock()
self.run_handler_lock = threading.Lock()
@@ -293,11 +293,10 @@
except Exception:
self.log.exception("Exception while processing command")
- def registerConnections(self, connections, webapp, load=True):
+ def registerConnections(self, connections, load=True):
# load: whether or not to trigger the onLoad for the connection. This
# is useful for not doing a full load during layout validation.
self.connections = connections
- self.connections.registerWebapp(webapp)
self.connections.registerScheduler(self, load)
def stopConnections(self):
@@ -1096,8 +1095,8 @@
request_id = event.request_id
build_set = request.build_set
- self.nodepool.acceptNodes(request, request_id)
- if request.canceled:
+ ready = self.nodepool.acceptNodes(request, request_id)
+ if not ready:
return
if build_set is not build_set.item.current_build_set:
@@ -1145,6 +1144,10 @@
pipelines = []
data['pipelines'] = pipelines
tenant = self.abide.tenants.get(tenant_name)
+ if not tenant:
+ self.log.warning("Tenant %s isn't loaded" % tenant_name)
+ return json.dumps(
+ {"message": "Tenant %s isn't ready" % tenant_name})
for pipeline in tenant.layout.pipelines.values():
pipelines.append(pipeline.formatStatusJSON(websocket_url))
return json.dumps(data)
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index a98a6c8..adbafb5 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -20,15 +20,13 @@
import logging
import os
import time
-import urllib.parse
import uvloop
import aiohttp
from aiohttp import web
-from sqlalchemy.sql import select
-
import zuul.rpcclient
+from zuul.web.handler import StaticHandler
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
@@ -161,11 +159,11 @@
'key_get': self.key_get,
}
- def tenant_list(self, request):
+ async def tenant_list(self, request):
job = self.rpc.submitJob('zuul:tenant_list', {})
return web.json_response(json.loads(job.data[0]))
- def status_get(self, request):
+ async def status_get(self, request):
tenant = request.match_info["tenant"]
if tenant not in self.cache or \
(time.time() - self.cache_time[tenant]) > self.cache_expiry:
@@ -179,14 +177,14 @@
resp.last_modified = self.cache_time[tenant]
return resp
- def job_list(self, request):
+ async def job_list(self, request):
tenant = request.match_info["tenant"]
job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant})
resp = web.json_response(json.loads(job.data[0]))
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
- def key_get(self, request):
+ async def key_get(self, request):
tenant = request.match_info["tenant"]
project = request.match_info["project"]
job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
@@ -195,7 +193,7 @@
async def processRequest(self, request, action):
try:
- resp = self.controllers[action](request)
+ resp = await self.controllers[action](request)
except asyncio.CancelledError:
self.log.debug("request handling cancelled")
except Exception as e:
@@ -205,93 +203,6 @@
return resp
-class SqlHandler(object):
- log = logging.getLogger("zuul.web.SqlHandler")
- filters = ("project", "pipeline", "change", "patchset", "ref",
- "result", "uuid", "job_name", "voting", "node_name", "newrev")
-
- def __init__(self, connection):
- self.connection = connection
-
- def query(self, args):
- build = self.connection.zuul_build_table
- buildset = self.connection.zuul_buildset_table
- query = select([
- buildset.c.project,
- buildset.c.pipeline,
- buildset.c.change,
- buildset.c.patchset,
- buildset.c.ref,
- buildset.c.newrev,
- buildset.c.ref_url,
- build.c.result,
- build.c.uuid,
- build.c.job_name,
- build.c.voting,
- build.c.node_name,
- build.c.start_time,
- build.c.end_time,
- build.c.log_url]).select_from(build.join(buildset))
- for table in ('build', 'buildset'):
- for k, v in args['%s_filters' % table].items():
- if table == 'build':
- column = build.c
- else:
- column = buildset.c
- query = query.where(getattr(column, k).in_(v))
- return query.limit(args['limit']).offset(args['skip']).order_by(
- build.c.id.desc())
-
- def get_builds(self, args):
- """Return a list of build"""
- builds = []
- with self.connection.engine.begin() as conn:
- query = self.query(args)
- for row in conn.execute(query):
- build = dict(row)
- # Convert date to iso format
- if row.start_time:
- build['start_time'] = row.start_time.strftime(
- '%Y-%m-%dT%H:%M:%S')
- if row.end_time:
- build['end_time'] = row.end_time.strftime(
- '%Y-%m-%dT%H:%M:%S')
- # Compute run duration
- if row.start_time and row.end_time:
- build['duration'] = (row.end_time -
- row.start_time).total_seconds()
- builds.append(build)
- return builds
-
- async def processRequest(self, request):
- try:
- args = {
- 'buildset_filters': {},
- 'build_filters': {},
- 'limit': 50,
- 'skip': 0,
- }
- for k, v in urllib.parse.parse_qsl(request.rel_url.query_string):
- if k in ("tenant", "project", "pipeline", "change",
- "patchset", "ref", "newrev"):
- args['buildset_filters'].setdefault(k, []).append(v)
- elif k in ("uuid", "job_name", "voting", "node_name",
- "result"):
- args['build_filters'].setdefault(k, []).append(v)
- elif k in ("limit", "skip"):
- args[k] = int(v)
- else:
- raise ValueError("Unknown parameter %s" % k)
- data = self.get_builds(args)
- resp = web.json_response(data)
- resp.headers['Access-Control-Allow-Origin'] = '*'
- except Exception as e:
- self.log.exception("Jobs exception:")
- resp = web.json_response({'error_description': 'Internal error'},
- status=500)
- return resp
-
-
class ZuulWeb(object):
log = logging.getLogger("zuul.web.ZuulWeb")
@@ -300,7 +211,7 @@
gear_server, gear_port,
ssl_key=None, ssl_cert=None, ssl_ca=None,
static_cache_expiry=3600,
- sql_connection=None):
+ connections=None):
self.listen_address = listen_address
self.listen_port = listen_port
self.event_loop = None
@@ -312,10 +223,10 @@
ssl_key, ssl_cert, ssl_ca)
self.log_streaming_handler = LogStreamingHandler(self.rpc)
self.gearman_handler = GearmanHandler(self.rpc)
- if sql_connection:
- self.sql_handler = SqlHandler(sql_connection)
- else:
- self.sql_handler = None
+ self._plugin_routes = [] # type: List[zuul.web.handler.BaseWebHandler]
+ connections = connections or []
+ for connection in connections:
+ self._plugin_routes.extend(connection.getWebHandlers(self))
async def _handleWebsocket(self, request):
return await self.log_streaming_handler.processRequest(
@@ -331,30 +242,9 @@
async def _handleJobsRequest(self, request):
return await self.gearman_handler.processRequest(request, 'job_list')
- async def _handleSqlRequest(self, request):
- return await self.sql_handler.processRequest(request)
-
async def _handleKeyRequest(self, request):
return await self.gearman_handler.processRequest(request, 'key_get')
- async def _handleStaticRequest(self, request):
- fp = None
- if request.path.endswith("tenants.html") or request.path.endswith("/"):
- fp = os.path.join(STATIC_DIR, "index.html")
- elif request.path.endswith("status.html"):
- fp = os.path.join(STATIC_DIR, "status.html")
- elif request.path.endswith("jobs.html"):
- fp = os.path.join(STATIC_DIR, "jobs.html")
- elif request.path.endswith("builds.html"):
- fp = os.path.join(STATIC_DIR, "builds.html")
- elif request.path.endswith("stream.html"):
- fp = os.path.join(STATIC_DIR, "stream.html")
- headers = {}
- if self.static_cache_expiry:
- headers['Cache-Control'] = "public, max-age=%d" % \
- self.static_cache_expiry
- return web.FileResponse(fp, headers=headers)
-
def run(self, loop=None):
"""
Run the websocket daemon.
@@ -372,18 +262,18 @@
('GET', '/{tenant}/jobs.json', self._handleJobsRequest),
('GET', '/{tenant}/console-stream', self._handleWebsocket),
('GET', '/{tenant}/{project:.*}.pub', self._handleKeyRequest),
- ('GET', '/{tenant}/status.html', self._handleStaticRequest),
- ('GET', '/{tenant}/jobs.html', self._handleStaticRequest),
- ('GET', '/{tenant}/stream.html', self._handleStaticRequest),
- ('GET', '/tenants.html', self._handleStaticRequest),
- ('GET', '/', self._handleStaticRequest),
]
- if self.sql_handler:
- routes.append(('GET', '/{tenant}/builds.json',
- self._handleSqlRequest))
- routes.append(('GET', '/{tenant}/builds.html',
- self._handleStaticRequest))
+ static_routes = [
+ StaticHandler(self, '/{tenant}/status.html'),
+ StaticHandler(self, '/{tenant}/jobs.html'),
+ StaticHandler(self, '/{tenant}/stream.html'),
+ StaticHandler(self, '/tenants.html', 'index.html'),
+ StaticHandler(self, '/', 'index.html'),
+ ]
+
+ for route in static_routes + self._plugin_routes:
+ routes.append((route.method, route.path, route.handleRequest))
self.log.debug("ZuulWeb starting")
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
diff --git a/zuul/web/handler.py b/zuul/web/handler.py
new file mode 100644
index 0000000..43a4695
--- /dev/null
+++ b/zuul/web/handler.py
@@ -0,0 +1,61 @@
+# Copyright 2018 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import os
+
+from aiohttp import web
+
+STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
+
+
+class BaseWebHandler(object, metaclass=abc.ABCMeta):
+
+ def __init__(self, connection, zuul_web, method, path):
+ self.connection = connection
+ self.zuul_web = zuul_web
+ self.method = method
+ self.path = path
+
+ @abc.abstractmethod
+ async def handleRequest(self, request):
+ """Process a web request."""
+
+
+class BaseDriverWebHandler(BaseWebHandler):
+
+ def __init__(self, connection, zuul_web, method, path):
+ super(BaseDriverWebHandler, self).__init__(
+ connection=connection, zuul_web=zuul_web, method=method, path=path)
+ if path.startswith('/'):
+ path = path[1:]
+ self.path = '/connection/{connection}/{path}'.format(
+ connection=self.connection.connection_name,
+ path=path)
+
+
+class StaticHandler(BaseWebHandler):
+
+ def __init__(self, zuul_web, path, file_path=None):
+ super(StaticHandler, self).__init__(None, zuul_web, 'GET', path)
+ self.file_path = file_path or path.split('/')[-1]
+
+ async def handleRequest(self, request):
+ """Process a web request."""
+ headers = {}
+ fp = os.path.join(STATIC_DIR, self.file_path)
+ if self.zuul_web.static_cache_expiry:
+ headers['Cache-Control'] = "public, max-age=%d" % \
+ self.zuul_web.static_cache_expiry
+ return web.FileResponse(fp, headers=headers)
diff --git a/zuul/webapp.py b/zuul/webapp.py
deleted file mode 100644
index b5fdc0e..0000000
--- a/zuul/webapp.py
+++ /dev/null
@@ -1,200 +0,0 @@
-# Copyright 2012 Hewlett-Packard Development Company, L.P.
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-import json
-import logging
-import re
-import threading
-import time
-from paste import httpserver
-import webob
-from webob import dec
-
-from zuul.lib import encryption
-
-"""Zuul main web app.
-
-Zuul supports HTTP requests directly against it for determining the
-change status. These responses are provided as json data structures.
-
-The supported urls are:
-
- - /status: return a complex data structure that represents the entire
- queue / pipeline structure of the system
- - /status.json (backwards compatibility): same as /status
- - /status/change/X,Y: return status just for gerrit change X,Y
- - /keys/SOURCE/PROJECT.pub: return the public key for PROJECT
-
-When returning status for a single gerrit change you will get an
-array of changes, they will not include the queue structure.
-"""
-
-
-class WebApp(threading.Thread):
- log = logging.getLogger("zuul.WebApp")
- change_path_regexp = '/status/change/(.*)$'
-
- def __init__(self, scheduler, port=8001, cache_expiry=1,
- listen_address='0.0.0.0'):
- threading.Thread.__init__(self)
- self.scheduler = scheduler
- self.listen_address = listen_address
- self.port = port
- self.cache_expiry = cache_expiry
- self.cache_time = 0
- self.cache = {}
- self.daemon = True
- self.routes = {}
- self._init_default_routes()
- self.server = httpserver.serve(
- dec.wsgify(self.app), host=self.listen_address, port=self.port,
- start_loop=False)
-
- def _init_default_routes(self):
- self.register_path('/(status\.json|status)$', self.status)
- self.register_path(self.change_path_regexp, self.change)
-
- def run(self):
- self.server.serve_forever()
-
- def stop(self):
- self.server.server_close()
-
- def _changes_by_func(self, func, tenant_name):
- """Filter changes by a user provided function.
-
- In order to support arbitrary collection of subsets of changes
- we provide a low level filtering mechanism that takes a
- function which applies to changes. The output of this function
- is a flattened list of those collected changes.
- """
- status = []
- jsonstruct = json.loads(self.cache[tenant_name])
- for pipeline in jsonstruct['pipelines']:
- for change_queue in pipeline['change_queues']:
- for head in change_queue['heads']:
- for change in head:
- if func(change):
- status.append(copy.deepcopy(change))
- return json.dumps(status)
-
- def _status_for_change(self, rev, tenant_name):
- """Return the statuses for a particular change id X,Y."""
- def func(change):
- return change['id'] == rev
- return self._changes_by_func(func, tenant_name)
-
- def register_path(self, path, handler):
- path_re = re.compile(path)
- self.routes[path] = (path_re, handler)
-
- def unregister_path(self, path):
- if self.routes.get(path):
- del self.routes[path]
-
- def _handle_keys(self, request, path):
- m = re.match('/keys/(.*?)/(.*?).pub', path)
- if not m:
- raise webob.exc.HTTPBadRequest()
- source_name = m.group(1)
- project_name = m.group(2)
- source = self.scheduler.connections.getSource(source_name)
- if not source:
- raise webob.exc.HTTPNotFound(
- detail="Cannot locate a source named %s" % source_name)
- project = source.getProject(project_name)
- if not project or not hasattr(project, 'public_key'):
- raise webob.exc.HTTPNotFound(
- detail="Cannot locate a project named %s" % project_name)
-
- pem_public_key = encryption.serialize_rsa_public_key(
- project.public_key)
-
- response = webob.Response(body=pem_public_key,
- content_type='text/plain')
- return response.conditional_response_app
-
- def app(self, request):
- # Try registered paths without a tenant_name first
- path = request.path
- for path_re, handler in self.routes.values():
- if path_re.match(path):
- return handler(path, '', request)
-
- # Now try with a tenant_name stripped
- x, tenant_name, path = request.path.split('/', 2)
- path = '/' + path
- # Handle keys
- if path.startswith('/keys'):
- try:
- return self._handle_keys(request, path)
- except Exception as e:
- self.log.exception("Issue with _handle_keys")
- raise
- for path_re, handler in self.routes.values():
- if path_re.match(path):
- return handler(path, tenant_name, request)
- else:
- raise webob.exc.HTTPNotFound()
-
- def status(self, path, tenant_name, request):
- def func():
- return webob.Response(body=self.cache[tenant_name],
- content_type='application/json',
- charset='utf8')
- if tenant_name not in self.scheduler.abide.tenants:
- raise webob.exc.HTTPNotFound()
- return self._response_with_status_cache(func, tenant_name)
-
- def change(self, path, tenant_name, request):
- def func():
- m = re.match(self.change_path_regexp, path)
- change_id = m.group(1)
- status = self._status_for_change(change_id, tenant_name)
- if status:
- return webob.Response(body=status,
- content_type='application/json',
- charset='utf8')
- else:
- raise webob.exc.HTTPNotFound()
- return self._response_with_status_cache(func, tenant_name)
-
- def _refresh_status_cache(self, tenant_name):
- if (tenant_name not in self.cache or
- (time.time() - self.cache_time) > self.cache_expiry):
- try:
- self.cache[tenant_name] = self.scheduler.formatStatusJSON(
- tenant_name)
- # Call time.time() again because formatting above may take
- # longer than the cache timeout.
- self.cache_time = time.time()
- except Exception:
- self.log.exception("Exception formatting status:")
- raise
-
- def _response_with_status_cache(self, func, tenant_name):
- self._refresh_status_cache(tenant_name)
-
- response = func()
-
- response.headers['Access-Control-Allow-Origin'] = '*'
-
- response.cache_control.public = True
- response.cache_control.max_age = self.cache_expiry
- response.last_modified = self.cache_time
- response.expires = self.cache_time + self.cache_expiry
-
- return response.conditional_response_app