Merge "Merge project pipeline definitions" into feature/zuulv3
diff --git a/.zuul.yaml b/.zuul.yaml
index 27f2ca1..b6612ae 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -16,7 +16,7 @@
 - job:
     name: zuul-stream-functional
     parent: multinode
-    nodes: zuul-functional
+    nodeset: zuul-functional
     pre-run: playbooks/zuul-stream/pre
     run: playbooks/zuul-stream/functional
     post-run:
@@ -43,7 +43,8 @@
     name: openstack-infra/zuul
     check:
       jobs:
-        - tox-docs
+        - build-openstack-infra-sphinx-docs:
+            success-url: 'html/feature/zuulv3/'
         - tox-cover:
             voting: false
         - tox-pep8
@@ -55,7 +56,8 @@
               - playbooks/zuul-migrate.yaml
     gate:
       jobs:
-        - tox-docs
+        - build-openstack-infra-sphinx-docs:
+            success-url: 'html/feature/zuulv3/'
         - tox-pep8
         - tox-py35
         - zuul-stream-functional
diff --git a/tests/base.py b/tests/base.py
index c159865..df9fbc1 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -2419,7 +2419,7 @@
 
     def create_branch(self, project, branch):
         path = os.path.join(self.upstream_root, project)
-        repo = git.Repo.init(path)
+        repo = git.Repo(path)
         fn = os.path.join(path, 'README')
 
         branch_head = repo.create_head(branch)
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 9a10e9d..94f169a 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -748,6 +748,48 @@
         self.assertIn('appears multiple times', A.messages[0],
                       "A should have a syntax error reported")
 
+    def test_secret_not_found_error(self):
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: test
+                secrets: does-not-exist
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(A.reported, 1,
+                         "A should report failure")
+        self.assertIn('secret "does-not-exist" was not found', A.messages[0],
+                      "A should have a syntax error reported")
+
+    def test_nodeset_not_found_error(self):
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: test
+                nodeset: does-not-exist
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(A.reported, 1,
+                         "A should report failure")
+        self.assertIn('nodeset "does-not-exist" was not found', A.messages[0],
+                      "A should have a syntax error reported")
+
     def test_multi_repo(self):
         downstream_repo_conf = textwrap.dedent(
             """
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 770a719..0a266df 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -294,11 +294,19 @@
         if result._task.loop and 'results' in result_dict:
             # items have their own events
             pass
-        elif (result_dict.get('msg') == 'MODULE FAILURE' and
-              'module_stdout' in result_dict):
-            self._log_message(
-                result, status='MODULE FAILURE',
-                msg=result_dict['module_stdout'])
+        elif (result_dict.get('msg') == 'MODULE FAILURE'):
+            if 'module_stdout' in result_dict:
+                self._log_message(
+                    result, status='MODULE FAILURE',
+                    msg=result_dict['module_stdout'])
+            elif 'exception' in result_dict:
+                self._log_message(
+                    result, status='MODULE FAILURE',
+                    msg=result_dict['exception'])
+            elif 'module_stderr' in result_dict:
+                self._log_message(
+                    result, status='MODULE FAILURE',
+                    msg=result_dict['module_stderr'])
         else:
             self._log_message(
                 result=result, status='ERROR', result_dict=result_dict)
@@ -363,11 +371,19 @@
             # items have their own events
             pass
 
-        elif (result_dict.get('msg') == 'MODULE FAILURE' and
-              'module_stdout' in result_dict):
-            self._log_message(
-                result, status='MODULE FAILURE',
-                msg=result_dict['module_stdout'])
+        elif (result_dict.get('msg') == 'MODULE FAILURE'):
+            if 'module_stdout' in result_dict:
+                self._log_message(
+                    result, status='MODULE FAILURE',
+                    msg=result_dict['module_stdout'])
+            elif 'exception' in result_dict:
+                self._log_message(
+                    result, status='MODULE FAILURE',
+                    msg=result_dict['exception'])
+            elif 'module_stderr' in result_dict:
+                self._log_message(
+                    result, status='MODULE FAILURE',
+                    msg=result_dict['module_stderr'])
         elif (len([key for key in result_dict.keys()
                    if not key.startswith('_ansible')]) == 1):
             # this is a debug statement, handle it special
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 62439c4..7722a7e 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -106,6 +106,24 @@
         super(ProjectNotFoundError, self).__init__(message)
 
 
+class SecretNotFoundError(Exception):
+    def __init__(self, secret):
+        message = textwrap.dedent("""\
+        The secret "{secret}" was not found.
+        """)
+        message = textwrap.fill(message.format(secret=secret))
+        super(SecretNotFoundError, self).__init__(message)
+
+
+class NodesetNotFoundError(Exception):
+    def __init__(self, nodeset):
+        message = textwrap.dedent("""\
+        The nodeset "{nodeset}" was not found.
+        """)
+        message = textwrap.fill(message.format(nodeset=nodeset))
+        super(NodesetNotFoundError, self).__init__(message)
+
+
 class PipelineNotPermittedError(Exception):
     def __init__(self):
         message = textwrap.dedent("""\
@@ -358,10 +376,6 @@
 
     @staticmethod
     def getSchema():
-        node = {vs.Required('name'): str,
-                vs.Required('label'): str,
-                }
-
         zuul_role = {vs.Required('zuul'): str,
                      'name': str}
 
@@ -391,7 +405,6 @@
                'files': to_list(str),
                'secrets': to_list(vs.Any(secret, str)),
                'irrelevant-files': to_list(str),
-               'nodes': vs.Any([node], str),
                # validation happens in NodeSetParser
                'nodeset': vs.Any(dict, str),
                'timeout': int,
@@ -489,16 +502,18 @@
         # Secrets are part of the playbook context so we must establish
         # them earlier than playbooks.
         secrets = []
-        for secret_config in conf.get('secrets', []):
+        for secret_config in as_list(conf.get('secrets', [])):
             if isinstance(secret_config, str):
                 secret_name = secret_config
-                secret = layout.secrets[secret_name]
+                secret = layout.secrets.get(secret_name)
             else:
                 secret_name = secret_config['name']
-                secret = layout.secrets[secret_config['secret']]
+                secret = layout.secrets.get(secret_config['secret'])
+            if secret is None:
+                raise SecretNotFoundError(secret_name)
             if secret_name == 'zuul':
                 raise Exception("Secrets named 'zuul' are not allowed.")
-            if secret.source_context != job.source_context:
+            if not secret.source_context.isSameProject(job.source_context):
                 raise Exception(
                     "Unable to use secret %s.  Secrets must be "
                     "defined in the same project in which they "
@@ -576,27 +591,15 @@
             conf_nodeset = conf['nodeset']
             if isinstance(conf_nodeset, str):
                 # This references an existing named nodeset in the layout.
-                ns = layout.nodesets[conf_nodeset]
+                ns = layout.nodesets.get(conf_nodeset)
+                if ns is None:
+                    raise NodesetNotFoundError(conf_nodeset)
             else:
                 ns = NodeSetParser.fromYaml(conf_nodeset, anonymous=True)
             if tenant.max_nodes_per_job != -1 and \
                len(ns) > tenant.max_nodes_per_job:
                 raise MaxNodeError(job, tenant)
             job.nodeset = ns
-        elif 'nodes' in conf:
-            conf_nodes = conf['nodes']
-            if isinstance(conf_nodes, str):
-                # This references an existing named nodeset in the layout.
-                ns = layout.nodesets[conf_nodes]
-            else:
-                ns = model.NodeSet()
-                for conf_node in conf_nodes:
-                    node = model.Node(conf_node['name'], conf_node['label'])
-                    ns.addNode(node)
-            if tenant.max_nodes_per_job != -1 and \
-               len(ns) > tenant.max_nodes_per_job:
-                raise MaxNodeError(job, tenant)
-            job.nodeset = ns
 
         if 'required-projects' in conf:
             new_projects = {}
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 98c7350..b94b8a5 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -435,7 +435,7 @@
         loader = zuul.configloader.ConfigLoader()
 
         build_set = item.current_build_set
-        self.log.debug("Load dynamic layout with %s" % build_set.files)
+        self.log.debug("Loading dynamic layout")
         try:
             # First parse the config as it will land with the
             # full set of config and project repos.  This lets us
@@ -456,13 +456,11 @@
                 include_config_projects=False)
         except zuul.configloader.ConfigurationSyntaxError as e:
             self.log.info("Configuration syntax error "
-                          "in dynamic layout %s" %
-                          build_set.files)
+                          "in dynamic layout")
             item.setConfigError(str(e))
             return None
         except Exception:
-            self.log.exception("Error in dynamic layout %s" %
-                               build_set.files)
+            self.log.exception("Error in dynamic layout")
             item.setConfigError("Unknown configuration error")
             return None
         return layout
diff --git a/zuul/model.py b/zuul/model.py
index 0e42368..4c5a51f 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -627,6 +627,13 @@
         return self.__class__(self.project, self.branch, self.path,
                               self.trusted)
 
+    def isSameProject(self, other):
+        if not isinstance(other, SourceContext):
+            return False
+        return (self.project == other.project and
+                self.branch == other.branch and
+                self.trusted == other.trusted)
+
     def __ne__(self, other):
         return not self.__eq__(other)
 
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
old mode 100644
new mode 100755
index faf22b5..308c0c9
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -32,14 +32,11 @@
 class LogStreamingHandler(object):
     log = logging.getLogger("zuul.web.LogStreamingHandler")
 
-    def __init__(self, loop, gear_server, gear_port,
-                 ssl_key=None, ssl_cert=None, ssl_ca=None):
-        self.event_loop = loop
-        self.gear_server = gear_server
-        self.gear_port = gear_port
-        self.ssl_key = ssl_key
-        self.ssl_cert = ssl_cert
-        self.ssl_ca = ssl_ca
+    def __init__(self, rpc):
+        self.rpc = rpc
+
+    def setEventLoop(self, event_loop):
+        self.event_loop = event_loop
 
     def _getPortLocation(self, job_uuid):
         """
@@ -49,12 +46,7 @@
         """
         # TODO: Fetch the entire list of uuid/file/server/ports once and
         #       share that, and fetch a new list on cache misses perhaps?
-        # TODO: Avoid recreating a client for each request.
-        rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port,
-                                       self.ssl_key, self.ssl_cert,
-                                       self.ssl_ca)
-        ret = rpc.get_job_log_stream_address(job_uuid)
-        rpc.shutdown()
+        ret = self.rpc.get_job_log_stream_address(job_uuid)
         return ret
 
     async def _fingerClient(self, ws, server, port, job_uuid):
@@ -159,19 +151,16 @@
                  ssl_key=None, ssl_cert=None, ssl_ca=None):
         self.listen_address = listen_address
         self.listen_port = listen_port
-        self.gear_server = gear_server
-        self.gear_port = gear_port
-        self.ssl_key = ssl_key
-        self.ssl_cert = ssl_cert
-        self.ssl_ca = ssl_ca
         self.event_loop = None
         self.term = None
+        # instanciate handlers
+        self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
+                                            ssl_key, ssl_cert, ssl_ca)
+        self.log_streaming_handler = LogStreamingHandler(self.rpc)
 
     async def _handleWebsocket(self, request):
-        handler = LogStreamingHandler(self.event_loop,
-                                      self.gear_server, self.gear_port,
-                                      self.ssl_key, self.ssl_cert, self.ssl_ca)
-        return await handler.processRequest(request)
+        return await self.log_streaming_handler.processRequest(
+            request)
 
     def run(self, loop=None):
         """
@@ -196,6 +185,7 @@
         asyncio.set_event_loop(loop)
 
         self.event_loop = loop
+        self.log_streaming_handler.setEventLoop(loop)
 
         app = web.Application()
         for method, path, handler in routes:
@@ -229,6 +219,8 @@
             loop.stop()
             loop.close()
 
+        self.rpc.shutdown()
+
     def stop(self):
         if self.event_loop and self.term:
             self.event_loop.call_soon_threadsafe(self.term.set_result, True)