Merge "Provide error message on malformed job list" into feature/zuulv3
diff --git a/.zuul.yaml b/.zuul.yaml
index ff1a523..c081235 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -45,8 +45,7 @@
     name: openstack-infra/zuul
     infra-check:
       jobs:
-        - build-openstack-infra-sphinx-docs:
-            success-url: 'html/feature/zuulv3/'
+        - build-openstack-sphinx-docs:
             irrelevant-files:
               - zuul/cmd/migrate.py
               - playbooks/zuul-migrate/.*
@@ -67,8 +66,10 @@
               - playbooks/zuul-migrate/.*
     infra-gate:
       jobs:
-        - build-openstack-infra-sphinx-docs:
-            success-url: 'html/feature/zuulv3/'
+        - build-openstack-sphinx-docs:
+            irrelevant-files:
+              - zuul/cmd/migrate.py
+              - playbooks/zuul-migrate/.*
         - tox-pep8
         - tox-py35:
             irrelevant-files:
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 5e7e0e1..464cb60 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -265,6 +265,22 @@
 
       Directory in which Zuul should clone git repositories.
 
+   .. attr:: git_http_low_speed_limit
+      :default: 1000
+
+      If the HTTP transfer speed is less than git_http_low_speed_limit for
+      longer than git_http_low_speed_time, the transfer is aborted.
+
+      Value in bytes; setting it to 0 disables the check.
+
+   .. attr:: git_http_low_speed_time
+      :default: 30
+
+      If the HTTP transfer speed is less than git_http_low_speed_limit for
+      longer than git_http_low_speed_time, the transfer is aborted.
+
+      Value in seconds; setting it to 0 disables the check.
+
    .. attr:: git_user_email
 
       Value to pass to `git config user.email
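
The two new settings map directly onto git's own environment variables for HTTP(S) transfers, which is how the merger applies them (see zuul/merger/merger.py below). A minimal sketch of the effect, with an illustrative repository URL; the values are plain strings, exactly as read from zuul.conf:

    import os
    import subprocess

    # git reads these variables natively: the transfer is aborted if it
    # stays below GIT_HTTP_LOW_SPEED_LIMIT bytes/sec for
    # GIT_HTTP_LOW_SPEED_TIME seconds.
    env = dict(os.environ,
               GIT_HTTP_LOW_SPEED_LIMIT='1000',
               GIT_HTTP_LOW_SPEED_TIME='30')
    subprocess.run(['git', 'clone', 'https://example.org/repo.git'],
                   env=env, check=True)
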
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index 8aafabf..67be273 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -49,7 +49,7 @@
             msg='.git file in submodule should be a file')
 
         work_repo = Repo(parent_path, self.workspace_root,
-                         'none@example.org', 'User Name')
+                         'none@example.org', 'User Name', '0', '0')
         self.assertTrue(
             os.path.isdir(os.path.join(self.workspace_root, 'subdir')),
             msg='Cloned repository has a submodule placeholder directory')
@@ -60,7 +60,7 @@
         sub_repo = Repo(
             os.path.join(self.upstream_root, 'org/project2'),
             os.path.join(self.workspace_root, 'subdir'),
-            'none@example.org', 'User Name')
+            'none@example.org', 'User Name', '0', '0')
         self.assertTrue(os.path.exists(
             os.path.join(self.workspace_root, 'subdir', '.git')),
             msg='Cloned over the submodule placeholder')
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index ba7523c..d51898b 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -76,7 +76,7 @@
         self.assertEqual(request.state, 'fulfilled')
 
         # Accept the nodes
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request.id)
         nodeset = request.nodeset
 
         for node in nodeset.getNodes():
@@ -125,3 +125,47 @@
 
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 0)
+
+    def test_accept_nodes_resubmitted(self):
+        # Test that a resubmitted request does not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        # Accept the nodes, passing a different ID
+        self.nodepool.acceptNodes(request, "invalid")
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
+
+    def test_accept_nodes_lost_request(self):
+        # Test that a lost request does not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        self.zk.deleteNodeRequest(request)
+
+        # Accept the nodes
+        self.nodepool.acceptNodes(request, request.id)
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 68ea74f..4b0bfc5 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -17,6 +17,8 @@
 import json
 import os
 import textwrap
+import gc
+from unittest import skip
 
 import testtools
 
@@ -170,6 +172,39 @@
         self.assertIn('tenant-one-gate', A.messages[1],
                       "A should transit tenant-one gate")
 
+    @skip("This test is useful, but not reliable")
+    def test_full_and_dynamic_reconfig(self):
+        self.executor_server.hold_jobs_in_build = True
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+
+            - project:
+                name: org/project
+                tenant-one-gate:
+                  jobs:
+                    - project-test1
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+        self.sched.reconfigure(self.config)
+        self.waitUntilSettled()
+
+        gc.collect()
+        pipelines = [obj for obj in gc.get_objects()
+                     if isinstance(obj, zuul.model.Pipeline)]
+        self.assertEqual(len(pipelines), 4)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
     def test_dynamic_config(self):
         in_repo_conf = textwrap.dedent(
             """
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 82921fb..28ac5a5 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -562,6 +562,10 @@
                                                   'disk_limit_per_job', 250))
         self.merge_email = get_default(self.config, 'merger', 'git_user_email')
         self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+        self.merge_speed_limit = get_default(
+            config, 'merger', 'git_http_low_speed_limit', '1000')
+        self.merge_speed_time = get_default(
+            config, 'merger', 'git_http_low_speed_time', '30')
         execution_wrapper_name = get_default(self.config, 'executor',
                                              'execution_wrapper', 'bubblewrap')
         load_multiplier = float(get_default(self.config, 'executor',
@@ -618,9 +622,9 @@
             cache_root = self.merge_root
         else:
             cache_root = None
-        return zuul.merger.merger.Merger(root, self.connections,
-                                         self.merge_email, self.merge_name,
-                                         cache_root, logger)
+        return zuul.merger.merger.Merger(
+            root, self.connections, self.merge_email, self.merge_name,
+            self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
 
     def start(self):
         self._running = True
@@ -1693,6 +1697,13 @@
         elif ret == -9:
             # Received abort request.
             return (self.RESULT_ABORTED, None)
+        elif ret == 1:
+            if syntax_buffer[0].startswith(b'ERROR!'):
+                with open(self.jobdir.job_output_file, 'a') as job_output:
+                    for line in syntax_buffer:
+                        job_output.write("{now} | {line}\n".format(
+                            now=datetime.datetime.now(),
+                            line=line.decode('utf-8').rstrip()))
         elif ret == 4:
             # Ansible could not parse the yaml.
             self.log.debug("Ansible parse error")
@@ -1759,15 +1770,15 @@
         if result is not None:
             result = self.RESULT_MAP[result]
             msg = "{phase} {step} {result}: [{trusted} : {playbook}@{branch}]"
-            msg.format(phase=phase, step=step, result=result,
-                       trusted=trusted, playbook=playbook, branch=branch)
+            msg = msg.format(phase=phase, step=step, result=result,
+                             trusted=trusted, playbook=playbook, branch=branch)
         else:
             msg = "{phase} {step}: [{trusted} : {playbook}@{branch}]"
-            msg.format(phase=phase, step=step, trusted=trusted,
-                       playbook=playbook, branch=branch)
+            msg = msg.format(phase=phase, step=step, trusted=trusted,
+                             playbook=playbook, branch=branch)
 
         with open(self.jobdir.job_output_file, 'a') as job_output:
-            job_output.write("{now} | {msg}".format(
+            job_output.write("{now} | {msg}\n".format(
                 now=datetime.datetime.now(),
                 msg=msg))
 
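The ret == 1 branch above is the change named in the merge title: when Ansible exits with a generic error and its first buffered line starts with ERROR! (for example, a malformed job list), the raw output is copied into the job log so the user can see the cause. A hedged sketch of that branch in isolation, with an illustrative function name; syntax_buffer holds raw byte lines, as in the surrounding code:

    import datetime

    def write_ansible_error(syntax_buffer, job_output_path):
        # Only surface output that looks like an Ansible error report.
        if syntax_buffer and syntax_buffer[0].startswith(b'ERROR!'):
            with open(job_output_path, 'a') as job_output:
                for line in syntax_buffer:
                    job_output.write("{now} | {line}\n".format(
                        now=datetime.datetime.now(),
                        line=line.decode('utf-8').rstrip()))
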
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 0d53fc8..740e29f 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -430,27 +430,39 @@
         import zuul.configloader
         loader = zuul.configloader.ConfigLoader()
 
+        self.log.debug("Loading dynamic layout")
+        (trusted_updates, untrusted_updates) = item.includesConfigUpdates()
         build_set = item.current_build_set
         try:
             # First parse the config as it will land with the
             # full set of config and project repos.  This lets us
             # catch syntax errors in config repos even though we won't
             # actually run with that config.
-            self.log.debug("Loading dynamic layout (phase 1)")
-            loader.createDynamicLayout(
-                item.pipeline.layout.tenant,
-                build_set.files,
-                include_config_projects=True,
-                scheduler=self.sched,
-                connections=self.sched.connections)
+            if trusted_updates:
+                self.log.debug("Loading dynamic layout (phase 1)")
+                loader.createDynamicLayout(
+                    item.pipeline.layout.tenant,
+                    build_set.files,
+                    include_config_projects=True,
+                    scheduler=self.sched,
+                    connections=self.sched.connections)
 
             # Then create the config a second time but without changes
             # to config repos so that we actually use this config.
-            self.log.debug("Loading dynamic layout (phase 2)")
-            layout = loader.createDynamicLayout(
-                item.pipeline.layout.tenant,
-                build_set.files,
-                include_config_projects=False)
+            if untrusted_updates:
+                self.log.debug("Loading dynamic layout (phase 2)")
+                layout = loader.createDynamicLayout(
+                    item.pipeline.layout.tenant,
+                    build_set.files,
+                    include_config_projects=False)
+            else:
+                # We're a change to a config repo (with no untrusted
+                # items ahead), so just use the most recently
+                # generated layout.
+                if item.item_ahead:
+                    return item.item_ahead.layout
+                else:
+                    return item.queue.pipeline.layout
             self.log.debug("Loading dynamic layout complete")
         except zuul.configloader.ConfigurationSyntaxError as e:
             self.log.info("Configuration syntax error "
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 7b732c7..fbacbee 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -44,16 +44,15 @@
 
 
 class Repo(object):
-    def __init__(self, remote, local, email, username, sshkey=None,
-                 cache_path=None, logger=None):
+    def __init__(self, remote, local, email, username, speed_limit, speed_time,
+                 sshkey=None, cache_path=None, logger=None):
         if logger is None:
             self.log = logging.getLogger("zuul.Repo")
         else:
             self.log = logger
-        # TODO(pabelanger): Expose to user via zuul.conf.
         self.env = {
-            'GIT_HTTP_LOW_SPEED_LIMIT': '1000',
-            'GIT_HTTP_LOW_SPEED_TIME': '30',
+            'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
+            'GIT_HTTP_LOW_SPEED_TIME': speed_time,
         }
         if sshkey:
             self.env['GIT_SSH_COMMAND'] = 'ssh -i %s' % (sshkey,)
@@ -297,7 +296,7 @@
 
 class Merger(object):
     def __init__(self, working_root, connections, email, username,
-                 cache_root=None, logger=None):
+                 speed_limit, speed_time, cache_root=None, logger=None):
         self.logger = logger
         if logger is None:
             self.log = logging.getLogger("zuul.Merger")
@@ -310,6 +309,8 @@
         self.connections = connections
         self.email = email
         self.username = username
+        self.speed_limit = speed_limit
+        self.speed_time = speed_time
         self.cache_root = cache_root
 
     def _addProject(self, hostname, project_name, url, sshkey):
@@ -322,8 +323,9 @@
                                           project_name)
             else:
                 cache_path = None
-            repo = Repo(url, path, self.email, self.username,
-                        sshkey, cache_path, self.logger)
+            repo = Repo(
+                url, path, self.email, self.username, self.speed_limit,
+                self.speed_time, sshkey, cache_path, self.logger)
 
             self.repos[key] = repo
         except Exception:
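
Every caller must now thread the two settings through as positional arguments between username and the optional parameters. A construction sketch, assuming a zuul checkout is importable; a ConnectionRegistry would normally be passed where None appears here:

    import tempfile
    import zuul.merger.merger

    merger = zuul.merger.merger.Merger(
        tempfile.mkdtemp(), None, 'zuul@example.org', 'Zuul',
        '1000', '30')  # speed_limit, speed_time; '0' disables, as in the tests
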
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 881209d..765d9e0 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -33,9 +33,13 @@
                                  '/var/lib/zuul/merger-git')
         merge_email = get_default(self.config, 'merger', 'git_user_email')
         merge_name = get_default(self.config, 'merger', 'git_user_name')
-
-        self.merger = merger.Merger(merge_root, connections, merge_email,
-                                    merge_name)
+        speed_limit = get_default(
+            config, 'merger', 'git_http_low_speed_limit', '1000')
+        speed_time = get_default(
+            config, 'merger', 'git_http_low_speed_time', '30')
+        self.merger = merger.Merger(
+            merge_root, connections, merge_email, merge_name, speed_limit,
+            speed_time)
 
     def start(self):
         self._running = True
diff --git a/zuul/model.py b/zuul/model.py
index c95a169..6eebbfb 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1157,7 +1157,6 @@
         self.start_time = None
         self.end_time = None
         self.estimated_time = None
-        self.pipeline = None
         self.canceled = False
         self.retry = False
         self.parameters = {}
@@ -1169,6 +1168,10 @@
         return ('<Build %s of %s on %s>' %
                 (self.uuid, self.job.name, self.worker))
 
+    @property
+    def pipeline(self):
+        return self.build_set.item.pipeline
+
     def getSafeAttributes(self):
         return Attributes(uuid=self.uuid,
                           result=self.result,
@@ -1432,7 +1435,6 @@
 
     def addBuild(self, build):
         self.current_build_set.addBuild(build)
-        build.pipeline = self.pipeline
 
     def removeBuild(self, build):
         self.current_build_set.removeBuild(build)
@@ -1518,6 +1520,25 @@
     def wasDequeuedNeedingChange(self):
         return self.dequeued_needing_change
 
+    def includesConfigUpdates(self):
+        includes_trusted = False
+        includes_untrusted = False
+        tenant = self.pipeline.layout.tenant
+        item = self
+        while item:
+            if item.change.updatesConfig():
+                (trusted, project) = tenant.getProject(
+                    item.change.project.canonical_name)
+                if trusted:
+                    includes_trusted = True
+                else:
+                    includes_untrusted = True
+            if includes_trusted and includes_untrusted:
+                # We're done early
+                return (includes_trusted, includes_untrusted)
+            item = item.item_ahead
+        return (includes_trusted, includes_untrusted)
+
     def isHoldingFollowingChanges(self):
         if not self.live:
             return False
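
includesConfigUpdates() walks from the item back through everything ahead of it in the queue and reports which kinds of configuration the window touches, stopping early once both kinds have been seen. The same logic in miniature, with simplified (updates_config, trusted) pairs standing in for queue items:

    def includes_config_updates(items_behind_to_ahead):
        includes_trusted = includes_untrusted = False
        for updates_config, trusted in items_behind_to_ahead:
            if updates_config:
                if trusted:
                    includes_trusted = True
                else:
                    includes_untrusted = True
            if includes_trusted and includes_untrusted:
                break  # both kinds found; no need to walk further ahead
        return (includes_trusted, includes_untrusted)

    # An untrusted in-repo change with a config-project change ahead of it:
    assert includes_config_updates([(True, False), (True, True)]) == (True, True)
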
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9a125ce..f4c850d 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -111,16 +111,19 @@
             except Exception:
                 self.log.exception("Error unlocking node:")
 
-    def lockNodeSet(self, nodeset):
-        self._lockNodes(nodeset.getNodes())
+    def lockNodeSet(self, nodeset, request_id):
+        self._lockNodes(nodeset.getNodes(), request_id)
 
-    def _lockNodes(self, nodes):
+    def _lockNodes(self, nodes, request_id):
         # Try to lock all of the supplied nodes.  If any lock fails,
         # try to unlock any which have already been locked before
         # re-raising the error.
         locked_nodes = []
         try:
             for node in nodes:
+                if node.allocated_to != request_id:
+                    raise Exception("Node %s allocated to %s, not %s" %
+                                    (node.id, node.allocated_to, request_id))
                 self.log.debug("Locking node %s" % (node,))
                 self.sched.zk.lockNode(node, timeout=30)
                 locked_nodes.append(node)
@@ -141,7 +144,12 @@
             del self.requests[request.uid]
             return False
 
-        if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+        # TODOv3(jeblair): handle allocation failure
+        if deleted:
+            self.log.debug("Resubmitting lost node request %s" % (request,))
+            request.id = None
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+        elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
             self.log.info("Node request %s %s" % (request, request.state))
 
             # Give our results to the scheduler.
@@ -150,18 +158,29 @@
 
             # Stop watching this request node.
             return False
-        # TODOv3(jeblair): handle allocation failure
-        elif deleted:
-            self.log.debug("Resubmitting lost node request %s" % (request,))
-            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+
         return True
 
-    def acceptNodes(self, request):
+    def acceptNodes(self, request, request_id):
         # Called by the scheduler when it wants to accept and lock
         # nodes for (potential) use.
 
         self.log.info("Accepting node request %s" % (request,))
 
+        if request_id != request.id:
+            self.log.info("Skipping node accept for %s (resubmitted as %s)",
+                          request_id, request.id)
+            return
+
+        # Make sure the request still exists. It's possible it could have
+        # disappeared if we lost the ZK session between when the fulfillment
+        # response was added to our queue, and when we actually get around to
+        # processing it. Nodepool will automatically reallocate the assigned
+        # nodes in that situation.
+        if not self.sched.zk.nodeRequestExists(request):
+            self.log.info("Request %s no longer exists", request.id)
+            return
+
         if request.canceled:
             self.log.info("Ignoring canceled node request %s" % (request,))
             # The request was already deleted when it was canceled
@@ -171,7 +190,7 @@
         if request.fulfilled:
             # If the request succeeded, try to lock the nodes.
             try:
-                self.lockNodeSet(request.nodeset)
+                self.lockNodeSet(request.nodeset, request.id)
                 locked = True
             except Exception:
                 self.log.exception("Error locking nodes:")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a926f6e..ab147ba 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -164,6 +164,7 @@
 
     def __init__(self, request):
         self.request = request
+        self.request_id = request.id
 
 
 def toList(item):
@@ -889,9 +890,10 @@
 
     def _doNodesProvisionedEvent(self, event):
         request = event.request
+        request_id = event.request_id
         build_set = request.build_set
 
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request_id)
         if request.canceled:
             return
 
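Snapshotting request.id when the event is created is what makes the stale-accept detection in acceptNodes() possible: a lost ZooKeeper session causes _updateNodeRequest() to resubmit the request under a new id, so a provisioned event queued before the resubmission carries an id that no longer matches. A toy illustration of the mismatch:

    class FakeRequest:
        def __init__(self, request_id):
            self.id = request_id

    class NodesProvisionedEventSketch:
        def __init__(self, request):
            self.request = request
            self.request_id = request.id  # captured when the event is queued

    request = FakeRequest('100-0000000001')
    event = NodesProvisionedEventSketch(request)
    request.id = '100-0000000002'  # simulated resubmission after session loss
    # acceptNodes() compares the snapshot with the live id and skips:
    assert event.request_id != event.request.id
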
diff --git a/zuul/zk.py b/zuul/zk.py
index a3efef2..2fca749 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -160,7 +160,6 @@
         def callback(data, stat):
             if data:
                 data = self._strToDict(data)
-                node_request.updateFromDict(data)
                 request_nodes = list(node_request.nodeset.getNodes())
                 for i, nodeid in enumerate(data.get('nodes', [])):
                     node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
@@ -168,6 +167,7 @@
                     node_data = self._strToDict(node_data)
                     request_nodes[i].id = nodeid
                     request_nodes[i].updateFromDict(node_data)
+                node_request.updateFromDict(data)
             deleted = (data is None)  # data *are* none
             return watcher(node_request, deleted)
 
@@ -187,6 +187,19 @@
         except kze.NoNodeError:
             pass
 
+    def nodeRequestExists(self, node_request):
+        '''
+        See if a NodeRequest exists in ZooKeeper.
+
+        :param NodeRequest node_request: A NodeRequest to verify.
+
+        :returns: True if the request exists, False otherwise.
+        '''
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        if self.client.exists(path):
+            return True
+        return False
+
     def storeNode(self, node):
         '''Store the node.