Merge "Be explicit about legacy var additions/omissions" into feature/zuulv3
diff --git a/doc/source/user/jobs.rst b/doc/source/user/jobs.rst
index 6962b8f..cf607b9 100644
--- a/doc/source/user/jobs.rst
+++ b/doc/source/user/jobs.rst
@@ -249,6 +249,13 @@
A boolean indicating whether this project appears in the
:attr:`job.required-projects` list for this job.
+ .. var:: _projects
+ :type: dict
+
+ The same as ``projects`` but a dictionary indexed by the
+ ``name`` value of each entry. ``projects`` will be converted to
+ this.
+
.. var:: tenant
The name of the current Zuul tenant.
diff --git a/etc/status/public_html/jquery.zuul.js b/etc/status/public_html/jquery.zuul.js
index 3735004..50dbed5 100644
--- a/etc/status/public_html/jquery.zuul.js
+++ b/etc/status/public_html/jquery.zuul.js
@@ -53,6 +53,7 @@
'msg_id': '#zuul_msg',
'pipelines_id': '#zuul_pipelines',
'queue_events_num': '#zuul_queue_events_num',
+ 'queue_management_events_num': '#zuul_queue_management_events_num',
'queue_results_num': '#zuul_queue_results_num',
}, options);
@@ -713,6 +714,10 @@
data.trigger_event_queue ?
data.trigger_event_queue.length : '0'
);
+ $(options.queue_management_events_num).text(
+ data.management_event_queue ?
+ data.management_event_queue.length : '0'
+ );
$(options.queue_results_num).text(
data.result_event_queue ?
data.result_event_queue.length : '0'
diff --git a/etc/status/public_html/zuul.app.js b/etc/status/public_html/zuul.app.js
index ae950e8..7ceb2dd 100644
--- a/etc/status/public_html/zuul.app.js
+++ b/etc/status/public_html/zuul.app.js
@@ -33,7 +33,7 @@
+ '<div class="zuul-container" id="zuul-container">'
+ '<div style="display: none;" class="alert" id="zuul_msg"></div>'
+ '<button class="btn pull-right zuul-spinner">updating <span class="glyphicon glyphicon-refresh"></span></button>'
- + '<p>Queue lengths: <span id="zuul_queue_events_num">0</span> events, <span id="zuul_queue_results_num">0</span> results.</p>'
+ + '<p>Queue lengths: <span id="zuul_queue_events_num">0</span> events, <span id="zuul_queue_management_events_num">0</span> management events, <span id="zuul_queue_results_num">0</span> results.</p>'
+ '<div id="zuul_controls"></div>'
+ '<div id="zuul_pipelines" class="row"></div>'
+ '<p>Zuul version: <span id="zuul-version-span"></span></p>'
diff --git a/tests/base.py b/tests/base.py
index 797bfe6..a02ee5a 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -2384,7 +2384,7 @@
# before noticing they should exit, but they should exit on their own.
# Further the pydevd threads also need to be whitelisted so debugging
# e.g. in PyCharm is possible without breaking shutdown.
- whitelist = ['executor-watchdog',
+ whitelist = ['watchdog',
'pydevd.CommandThread',
'pydevd.Reader',
'pydevd.Writer',
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml
new file mode 100644
index 0000000..40b3ca7
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml
@@ -0,0 +1,13 @@
+- hosts: all
+ tasks:
+ - debug: var=waitpath
+ - file:
+ path: "{{zuul._test.test_root}}/{{zuul.build}}.post_start.flag"
+ state: touch
+ # Do not finish until test creates the flag file
+ - wait_for:
+ state: present
+ path: "{{waitpath}}"
+ - file:
+ path: "{{zuul._test.test_root}}/{{zuul.build}}.post_end.flag"
+ state: touch
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
new file mode 100644
index 0000000..92a5515
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
@@ -0,0 +1,24 @@
+- pipeline:
+ name: check
+ manager: independent
+ post-review: true
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- job:
+ name: base
+ parent: null
+
+- job:
+ name: python27
+ pre-run: playbooks/pre
+ post-run: playbooks/post
+ vars:
+ waitpath: '{{zuul._test.test_root}}/{{zuul.build}}/test_wait'
diff --git a/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml b/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml
new file mode 100644
index 0000000..89a5674
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml
@@ -0,0 +1,5 @@
+- project:
+ name: org/project
+ check:
+ jobs:
+ - python27
diff --git a/tests/fixtures/config/post-playbook/git/org_project/README b/tests/fixtures/config/post-playbook/git/org_project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/post-playbook/main.yaml b/tests/fixtures/config/post-playbook/main.yaml
new file mode 100644
index 0000000..6033879
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/main.yaml
@@ -0,0 +1,9 @@
+- tenant:
+ name: tenant-one
+ source:
+ gerrit:
+ config-projects:
+ - common-config
+ untrusted-projects:
+ - org/project
+
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9090421..3608ef0 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -875,8 +875,7 @@
# already (without approvals), we need to clear the cache
# first.
for connection in self.connections.connections.values():
- if hasattr(connection, '_change_cache'):
- connection._change_cache.clear()
+ connection.maintainCache([])
self.executor_server.hold_jobs_in_build = True
A.addApproval('Approved', 1)
@@ -946,8 +945,7 @@
self.log.debug("len %s" % self.fake_gerrit._change_cache.keys())
# there should still be changes in the cache
- self.assertNotEqual(len(list(self.fake_gerrit._change_cache.keys())),
- 0)
+ self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@@ -2508,6 +2506,28 @@
self.assertIn('project-merge', status_jobs[1]['dependencies'])
self.assertIn('project-merge', status_jobs[2]['dependencies'])
+ def test_reconfigure_merge(self):
+ """Test that two reconfigure events are merged"""
+
+ tenant = self.sched.abide.tenants['tenant-one']
+ (trusted, project) = tenant.getProject('org/project')
+
+ self.sched.run_handler_lock.acquire()
+ self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
+ self.sched.reconfigureTenant(tenant, project)
+ self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+ self.sched.reconfigureTenant(tenant, project)
+ # The second event should have been combined with the first
+ # so we should still only have one entry.
+ self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+ self.sched.run_handler_lock.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
def test_live_reconfiguration(self):
"Test that live reconfiguration works"
self.executor_server.hold_jobs_in_build = True
@@ -3933,8 +3953,7 @@
self.assertEqual(B.data['status'], 'NEW')
for connection in self.connections.connections.values():
- if hasattr(connection, '_change_cache'):
- connection._change_cache.clear()
+ connection.maintainCache([])
self.executor_server.hold_jobs_in_build = True
B.addApproval('Approved', 1)
diff --git a/tests/unit/test_stack_dump.py b/tests/unit/test_stack_dump.py
index 824e04c..13e83c6 100644
--- a/tests/unit/test_stack_dump.py
+++ b/tests/unit/test_stack_dump.py
@@ -31,4 +31,5 @@
zuul.cmd.stack_dump_handler(signal.SIGUSR2, None)
self.assertIn("Thread", self.log_fixture.output)
+ self.assertIn("MainThread", self.log_fixture.output)
self.assertIn("test_stack_dump_logs", self.log_fixture.output)
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index bfb9088..b30d710 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -20,6 +20,7 @@
import os
import textwrap
import gc
+import time
from unittest import skip
import testtools
@@ -1460,6 +1461,40 @@
"The file %s should exist" % post_flag_path)
+class TestPostPlaybooks(AnsibleZuulTestCase):
+ tenant_config_file = 'config/post-playbook/main.yaml'
+
+ def test_post_playbook_abort(self):
+ # Test that when we abort a job in the post playbook, that we
+ # don't send back POST_FAILURE.
+ self.executor_server.verbose = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+ while not len(self.builds):
+ time.sleep(0.1)
+ build = self.builds[0]
+
+ post_start = os.path.join(self.test_root, build.uuid +
+ '.post_start.flag')
+ start = time.time()
+ while time.time() < start + 90:
+ if os.path.exists(post_start):
+ break
+ time.sleep(0.1)
+ # The post playbook has started, abort the job
+ self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
+ self.waitUntilSettled()
+
+ build = self.getJobFromHistory('python27')
+ self.assertEqual('ABORTED', build.result)
+
+ post_end = os.path.join(self.test_root, build.uuid +
+ '.post_end.flag')
+ self.assertTrue(os.path.exists(post_start))
+ self.assertFalse(os.path.exists(post_end))
+
+
class TestBrokenConfig(ZuulTestCase):
# Test that we get an appropriate syntax error if we start with a
# broken config.
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 1870890..86f7f12 100755
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -23,6 +23,7 @@
import signal
import sys
import traceback
+import threading
yappi = extras.try_import('yappi')
objgraph = extras.try_import('objgraph')
@@ -41,9 +42,17 @@
log = logging.getLogger("zuul.stack_dump")
log.debug("Beginning debug handler")
try:
+ threads = {}
+ for t in threading.enumerate():
+ threads[t.ident] = t
log_str = ""
for thread_id, stack_frame in sys._current_frames().items():
- log_str += "Thread: %s\n" % thread_id
+ thread = threads.get(thread_id)
+ if thread:
+ thread_name = thread.name
+ else:
+ thread_name = thread.ident
+ log_str += "Thread: %s %s\n" % (thread_id, thread_name)
log_str += "".join(traceback.format_stack(stack_frame))
log.debug(log_str)
except Exception:
diff --git a/zuul/configloader.py b/zuul/configloader.py
index cee744d..426842b 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -1374,7 +1374,12 @@
# branch. Remember the branch and then implicitly add a
# branch selector to each job there. This makes the
# in-repo configuration apply only to that branch.
- for branch in project.source.getProjectBranches(project, tenant):
+ branches = sorted(project.source.getProjectBranches(
+ project, tenant))
+ if 'master' in branches:
+ branches.remove('master')
+ branches = ['master'] + branches
+ for branch in branches:
new_project_unparsed_branch_config[project][branch] = \
model.UnparsedTenantConfig()
job = merger.getFiles(
@@ -1567,6 +1572,7 @@
# Don't call this method from dynamic reconfiguration because
# it interacts with drivers and connections.
layout = model.Layout(tenant)
+ TenantParser.log.debug("Created layout id %s", layout.uuid)
TenantParser._parseLayoutItems(layout, tenant, data,
scheduler, connections)
@@ -1695,6 +1701,7 @@
self._loadDynamicProjectData(config, project, files, False, tenant)
layout = model.Layout(tenant)
+ self.log.debug("Created layout id %s", layout.uuid)
if not include_config_projects:
# NOTE: the actual pipeline objects (complete with queues
# and enqueued items) are copied by reference here. This
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index ca10f21..b44fa46 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -68,6 +68,13 @@
def registerScheduler(self, sched):
self.sched = sched
+ def maintainCache(self, relevant):
+ """Make cache contain relevant changes.
+
+ This lets the user supply a list of change objects that are
+ still in use. Anything in our cache that isn't in the supplied
+ list should be safe to remove from the cache."""
+
def registerWebapp(self, webapp):
self.webapp = webapp
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 343c305..83871e3 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -23,10 +23,9 @@
import pprint
import shlex
import queue
-import weakref
+import voluptuous as v
from typing import Dict, List
-import voluptuous as v
from zuul.connection import BaseConnection
from zuul.model import Ref, Tag, Branch, Project
@@ -143,8 +142,7 @@
# cache as it may be a dependency
if event.change_number:
refresh = True
- if ((event.change_number, event.patch_number) not in
- self.connection._change_cache):
+ if event.change_number not in self.connection._change_cache:
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
# TODO(fungi): it would be better to have some simple means
@@ -302,7 +300,7 @@
self.baseurl = self.connection_config.get('baseurl',
'https://%s' % self.server)
- self._change_cache = weakref.WeakValueDictionary()
+ self._change_cache = {}
self.projects = {}
self.gerrit_event_connector = None
self.source = driver.getSource(self)
@@ -313,6 +311,22 @@
def addProject(self, project: Project) -> None:
self.projects[project.name] = project
+ def maintainCache(self, relevant):
+ # This lets the user supply a list of change objects that are
+ # still in use. Anything in our cache that isn't in the supplied
+ # list should be safe to remove from the cache.
+ remove = {}
+ for change_number, patchsets in self._change_cache.items():
+ for patchset, change in patchsets.items():
+ if change not in relevant:
+ remove.setdefault(change_number, [])
+ remove[change_number].append(patchset)
+ for change_number, patchsets in remove.items():
+ for patchset in patchsets:
+ del self._change_cache[change_number][patchset]
+ if not self._change_cache[change_number]:
+ del self._change_cache[change_number]
+
def getChange(self, event, refresh=False):
if event.change_number:
change = self._getChange(event.change_number, event.patch_number,
@@ -357,19 +371,22 @@
return change
def _getChange(self, number, patchset, refresh=False, history=None):
- change = self._change_cache.get((number, patchset))
+ change = self._change_cache.get(number, {}).get(patchset)
if change and not refresh:
return change
if not change:
change = GerritChange(None)
change.number = number
change.patchset = patchset
- self._change_cache[(change.number, change.patchset)] = change
+ self._change_cache.setdefault(change.number, {})
+ self._change_cache[change.number][change.patchset] = change
try:
self._updateChange(change, history)
except Exception:
- if self._change_cache.get((change.number, change.patchset)):
- del self._change_cache[(change.number, change.patchset)]
+ if self._change_cache.get(change.number, {}).get(change.patchset):
+ del self._change_cache[change.number][change.patchset]
+ if not self._change_cache[change.number]:
+ del self._change_cache[change.number]
raise
return change
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 46c8ee5..3d0eb37 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -21,7 +21,6 @@
import threading
import time
import re
-import weakref
import cachecontrol
from cachecontrol.cache import DictCache
@@ -395,7 +394,7 @@
def __init__(self, driver, connection_name, connection_config):
super(GithubConnection, self).__init__(
driver, connection_name, connection_config)
- self._change_cache = weakref.WeakValueDictionary()
+ self._change_cache = {}
self._project_branch_cache = {}
self.projects = {}
self.git_ssh_key = self.connection_config.get('sshkey')
@@ -568,6 +567,11 @@
# authenticated, if not then anonymous is the best we have.
return self._github
+ def maintainCache(self, relevant):
+ for key, change in self._change_cache.items():
+ if change not in relevant:
+ del self._change_cache[key]
+
def getChange(self, event, refresh=False):
"""Get the change representing an event."""
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 0f8d7d7..ae22c8e 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -187,6 +187,7 @@
and item.change.newrev != '0' * 40):
zuul_params['newrev'] = item.change.newrev
zuul_params['projects'] = [] # Set below
+ zuul_params['_projects'] = {} # transitional to convert to dict
zuul_params['items'] = []
for i in all_items:
d = dict()
@@ -270,14 +271,24 @@
projects.add(project)
for p in projects:
- zuul_params['projects'].append(dict(
+ zuul_params['_projects'][p.canonical_name] = (dict(
name=p.name,
short_name=p.name.split('/')[-1],
- canonical_hostname=p.canonical_hostname,
+ # Duplicate this into the dict too, so that iterating
+ # project.values() is easier for callers
canonical_name=p.canonical_name,
+ canonical_hostname=p.canonical_hostname,
src_dir=os.path.join('src', p.canonical_name),
required=(p in required_projects),
))
+ # We are transitioning "projects" from a list to a dict
+ # indexed by canonical name, as it is much easier to access
+ # values in ansible. Existing callers are converted to
+ # "_projects", then once "projects" is unused we switch it,
+ # then convert callers back. Finally when "_projects" is
+ # unused it will be removed.
+ for cn, p in zuul_params['_projects'].items():
+ zuul_params['projects'].append(p)
build = Build(job, uuid)
build.parameters = params
@@ -379,6 +390,11 @@
result_data = data.get('data', {})
self.log.info("Build %s complete, result %s" %
(job, result))
+ # If the build should be retried, don't supply the result
+ # so that elsewhere we don't have to deal with keeping
+ # track of which results are non-final.
+ if build.retry:
+ result = None
self.sched.onBuildCompleted(build, result, result_data)
# The test suite expects the build to be removed from the
# internal dict after it's added to the report queue.
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 5998922..e3f8a24 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -95,7 +95,7 @@
if cache_dir == jobs_base:
raise Exception("Cache dir and jobs dir cannot be the same")
self.thread = threading.Thread(target=self._run,
- name='executor-diskaccountant')
+ name='diskaccountant')
self.thread.daemon = True
self._running = False
self.jobs_base = jobs_base
@@ -154,7 +154,7 @@
self.function = function
self.args = args
self.thread = threading.Thread(target=self._run,
- name='executor-watchdog')
+ name='watchdog')
self.thread.daemon = True
self.timed_out = None
@@ -556,7 +556,8 @@
def run(self):
self.running = True
- self.thread = threading.Thread(target=self.execute)
+ self.thread = threading.Thread(target=self.execute,
+ name='build-%s' % self.job.unique)
self.thread.start()
def stop(self, reason=None):
@@ -834,6 +835,8 @@
# TODOv3(pabelanger): Implement post-run timeout setting.
post_status, post_code = self.runAnsiblePlaybook(
playbook, args['timeout'], success, phase='post', index=index)
+ if post_status == self.RESULT_ABORTED:
+ return 'ABORTED'
if post_status != self.RESULT_NORMAL or post_code != 0:
success = False
# If we encountered a pre-failure, that takes
@@ -1645,22 +1648,27 @@
self.log.debug("Starting command processor")
self.command_socket.start()
- self.command_thread = threading.Thread(target=self.runCommand)
+ self.command_thread = threading.Thread(target=self.runCommand,
+ name='command')
self.command_thread.daemon = True
self.command_thread.start()
self.log.debug("Starting worker")
- self.update_thread = threading.Thread(target=self._updateLoop)
+ self.update_thread = threading.Thread(target=self._updateLoop,
+ name='update')
self.update_thread.daemon = True
self.update_thread.start()
- self.merger_thread = threading.Thread(target=self.run_merger)
+ self.merger_thread = threading.Thread(target=self.run_merger,
+ name='merger')
self.merger_thread.daemon = True
self.merger_thread.start()
- self.executor_thread = threading.Thread(target=self.run_executor)
+ self.executor_thread = threading.Thread(target=self.run_executor,
+ name='executor')
self.executor_thread.daemon = True
self.executor_thread.start()
self.governor_stop_event = threading.Event()
- self.governor_thread = threading.Thread(target=self.run_governor)
+ self.governor_thread = threading.Thread(target=self.run_governor,
+ name='governor')
self.governor_thread.daemon = True
self.governor_thread.start()
self.disk_accountant.start()
@@ -1869,7 +1877,10 @@
def run_governor(self):
while not self.governor_stop_event.wait(30):
- self.manageLoad()
+ try:
+ self.manageLoad()
+ except Exception:
+ self.log.exception("Exception in governor thread:")
def manageLoad(self):
''' Apply some heuristics to decide whether or not we should
diff --git a/zuul/lib/queue.py b/zuul/lib/queue.py
new file mode 100644
index 0000000..db8af47
--- /dev/null
+++ b/zuul/lib/queue.py
@@ -0,0 +1,78 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import threading
+
+
+class MergedQueue(object):
+ def __init__(self):
+ self.queue = collections.deque()
+ self.lock = threading.RLock()
+ self.condition = threading.Condition(self.lock)
+ self.join_condition = threading.Condition(self.lock)
+ self.tasks = 0
+
+ def qsize(self):
+ return len(self.queue)
+
+ def empty(self):
+ return self.qsize() == 0
+
+ def put(self, item):
+ # Returns the original item if added, or an updated equivalent
+ # item if already enqueued.
+ self.condition.acquire()
+ ret = None
+ try:
+ for x in self.queue:
+ if item == x:
+ ret = x
+ if hasattr(ret, 'merge'):
+ ret.merge(item)
+ if ret is None:
+ ret = item
+ self.queue.append(item)
+ self.condition.notify()
+ finally:
+ self.condition.release()
+ return ret
+
+ def get(self):
+ self.condition.acquire()
+ try:
+ while True:
+ try:
+ ret = self.queue.popleft()
+ self.join_condition.acquire()
+ self.tasks += 1
+ self.join_condition.release()
+ return ret
+ except IndexError:
+ self.condition.wait()
+ finally:
+ self.condition.release()
+
+ def task_done(self):
+ self.join_condition.acquire()
+ self.tasks -= 1
+ self.join_condition.notify()
+ self.join_condition.release()
+
+ def join(self):
+ self.join_condition.acquire()
+ while self.tasks:
+ self.join_condition.wait()
+ self.join_condition.release()
diff --git a/zuul/model.py b/zuul/model.py
index b7c3031..7d6e80c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -888,6 +888,8 @@
return Attributes(name=self.name)
def setRun(self):
+ msg = 'self %s' % (repr(self),)
+ self.inheritance_path = self.inheritance_path + (msg,)
if not self.run:
self.run = self.implied_run
@@ -1261,8 +1263,6 @@
self.item = item
self.builds = {}
self.result = None
- self.next_build_set = None
- self.previous_build_set = None
self.uuid = None
self.commit = None
self.dependent_items = None
@@ -1400,10 +1400,8 @@
self.pipeline = queue.pipeline
self.queue = queue
self.change = change # a ref
- self.build_sets = []
self.dequeued_needing_change = False
self.current_build_set = BuildSet(self)
- self.build_sets.append(self.current_build_set)
self.item_ahead = None
self.items_behind = []
self.enqueue_time = None
@@ -1425,12 +1423,7 @@
id(self), self.change, pipeline)
def resetAllBuilds(self):
- old = self.current_build_set
- self.current_build_set.result = 'CANCELED'
self.current_build_set = BuildSet(self)
- old.next_build_set = self.current_build_set
- self.current_build_set.previous_build_set = old
- self.build_sets.append(self.current_build_set)
self.layout = None
self.job_graph = None
@@ -2327,6 +2320,7 @@
"""Holds all of the Pipelines."""
def __init__(self, tenant):
+ self.uuid = uuid4().hex
self.tenant = tenant
self.project_configs = {}
self.project_templates = {}
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 026763b..33b6723 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -31,6 +31,7 @@
from zuul import version as zuul_version
from zuul.lib.config import get_default
from zuul.lib.statsd import get_statsd
+import zuul.lib.queue
class ManagementEvent(object):
@@ -76,8 +77,23 @@
"""
def __init__(self, tenant, project):
super(TenantReconfigureEvent, self).__init__()
- self.tenant = tenant
- self.project = project
+ self.tenant_name = tenant.name
+ self.projects = set([project])
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __eq__(self, other):
+ if not isinstance(other, TenantReconfigureEvent):
+ return False
+ # We don't check projects because they will get combined when
+ # merged.
+ return (self.tenant_name == other.tenant_name)
+
+ def merge(self, other):
+ if self.tenant_name != other.tenant_name:
+ raise Exception("Can not merge events from different tenants")
+ self.projects |= other.projects
class PromoteEvent(ManagementEvent):
@@ -223,7 +239,7 @@
self.trigger_event_queue = queue.Queue()
self.result_event_queue = queue.Queue()
- self.management_event_queue = queue.Queue()
+ self.management_event_queue = zuul.lib.queue.MergedQueue()
self.abide = model.Abide()
if not testonly:
@@ -475,15 +491,16 @@
self.log.debug("Tenant reconfiguration beginning")
# If a change landed to a project, clear out the cached
# config before reconfiguring.
- if event.project:
- event.project.unparsed_config = None
+ for project in event.projects:
+ project.unparsed_config = None
+ old_tenant = self.abide.tenants[event.tenant_name]
loader = configloader.ConfigLoader()
abide = loader.reloadTenant(
self.config.get('scheduler', 'tenant_config'),
self._get_project_key_dir(),
self, self.merger, self.connections,
- self.abide, event.tenant)
- tenant = abide.tenants[event.tenant.name]
+ self.abide, old_tenant)
+ tenant = abide.tenants[event.tenant_name]
self._reconfigureTenant(tenant)
self.abide = abide
finally:
@@ -595,6 +612,8 @@
self._reenqueueTenant(old_tenant, tenant)
+ # TODOv3(jeblair): update for tenants
+ # self.maintainConnectionCache()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
@@ -726,6 +745,23 @@
finally:
self.run_handler_lock.release()
+ def maintainConnectionCache(self):
+ # TODOv3(jeblair): update for tenants
+ relevant = set()
+ for tenant in self.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ self.log.debug("Gather relevant cache items for: %s" %
+ pipeline)
+
+ for item in pipeline.getAllItems():
+ relevant.add(item.change)
+ relevant.update(item.change.getRelatedChanges())
+ for connection in self.connections.values():
+ connection.maintainCache(relevant)
+ self.log.debug(
+ "End maintain connection cache for: %s" % connection)
+ self.log.debug("Connection cache size: %s" % len(relevant))
+
def process_event_queue(self):
self.log.debug("Fetching trigger event")
event = self.trigger_event_queue.get()
@@ -920,6 +956,9 @@
data['result_event_queue'] = {}
data['result_event_queue']['length'] = \
self.result_event_queue.qsize()
+ data['management_event_queue'] = {}
+ data['management_event_queue']['length'] = \
+ self.management_event_queue.qsize()
if self.last_reconfigured:
data['last_reconfigured'] = self.last_reconfigured * 1000