Merge "Document the existence of zuul_success job var" into feature/zuulv3
diff --git a/doc/source/admin/tenants.rst b/doc/source/admin/tenants.rst
index 54bc10a..4722750 100644
--- a/doc/source/admin/tenants.rst
+++ b/doc/source/admin/tenants.rst
@@ -163,6 +163,11 @@
       The maximum number of nodes a job can request.  A value of
       '-1' value removes the limit.
 
+   .. attr:: max-job-timeout
+      :default: 10800
+
+      The maximum timeout for jobs. A value of '-1' value removes the limit.
+
    .. attr:: exclude-unprotected-branches
       :default: false
 
diff --git a/tests/fixtures/config/multi-tenant/main.yaml b/tests/fixtures/config/multi-tenant/main.yaml
index 4916905..e667588 100644
--- a/tests/fixtures/config/multi-tenant/main.yaml
+++ b/tests/fixtures/config/multi-tenant/main.yaml
@@ -1,5 +1,6 @@
 - tenant:
     name: tenant-one
+    max-job-timeout: 1800
     source:
       gerrit:
         config-projects:
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 2b27b0e..d55ff92 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -1402,7 +1402,7 @@
 class TestMaxNodesPerJob(AnsibleZuulTestCase):
     tenant_config_file = 'config/multi-tenant/main.yaml'
 
-    def test_max_nodes_reached(self):
+    def test_max_timeout_exceeded(self):
         in_repo_conf = textwrap.dedent(
             """
             - job:
@@ -1437,6 +1437,32 @@
                          "B should not fail because of nodes limit")
 
 
+class TestMaxTimeout(AnsibleZuulTestCase):
+    tenant_config_file = 'config/multi-tenant/main.yaml'
+
+    def test_max_nodes_reached(self):
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: test-job
+                timeout: 3600
+            """)
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A',
+                                           files=file_dict)
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertIn('The job "test-job" exceeds tenant max-job-timeout',
+                      A.messages[0], "A should fail because of timeout limit")
+
+        B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A',
+                                           files=file_dict)
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertNotIn("exceeds tenant max-job-timeout", B.messages[0],
+                         "B should not fail because of timeout limit")
+
+
 class TestBaseJobs(ZuulTestCase):
     tenant_config_file = 'config/base-jobs/main.yaml'
 
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 347f080..770a719 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -36,23 +36,6 @@
 LOG_STREAM_PORT = 19885
 
 
-def linesplit(socket):
-    buff = socket.recv(4096).decode("utf-8")
-    buffering = True
-    while buffering:
-        if "\n" in buff:
-            (line, buff) = buff.split("\n", 1)
-            yield line + "\n"
-        else:
-            more = socket.recv(4096).decode("utf-8")
-            if not more:
-                buffering = False
-            else:
-                buff += more
-    if buff:
-        yield buff
-
-
 def zuul_filter_result(result):
     """Remove keys from shell/command output.
 
@@ -145,18 +128,36 @@
                 continue
             msg = "%s\n" % log_id
             s.send(msg.encode("utf-8"))
-            for line in linesplit(s):
-                if "[Zuul] Task exit code" in line:
-                    return
-                elif self._streamers_stop and "[Zuul] Log not found" in line:
-                    return
-                elif "[Zuul] Log not found" in line:
-                    # don't output this line
-                    pass
+            buff = s.recv(4096).decode("utf-8")
+            buffering = True
+            while buffering:
+                if "\n" in buff:
+                    (line, buff) = buff.split("\n", 1)
+                    done = self._log_streamline(host, line)
+                    if done:
+                        return
                 else:
-                    ts, ln = line.split(' | ', 1)
+                    more = s.recv(4096).decode("utf-8")
+                    if not more:
+                        buffering = False
+                    else:
+                        buff += more
+            if buff:
+                self._log_streamline(host, line)
 
-                    self._log("%s | %s " % (host, ln), ts=ts)
+    def _log_streamline(self, host, line):
+        if "[Zuul] Task exit code" in line:
+            return True
+        elif self._streamers_stop and "[Zuul] Log not found" in line:
+            return True
+        elif "[Zuul] Log not found" in line:
+            # don't output this line
+            return False
+        else:
+            ts, ln = line.split(' | ', 1)
+
+            self._log("%s | %s " % (host, ln), ts=ts)
+            return False
 
     def v2_playbook_on_start(self, playbook):
         self._playbook_name = os.path.splitext(playbook._file_name)[0]
diff --git a/zuul/cmd/executor.py b/zuul/cmd/executor.py
index 06ef0ba..63c621d 100755
--- a/zuul/cmd/executor.py
+++ b/zuul/cmd/executor.py
@@ -82,7 +82,7 @@
 
             self.log.info("Starting log streamer")
             streamer = zuul.lib.log_streamer.LogStreamer(
-                self.user, '0.0.0.0', self.finger_port, self.job_dir)
+                self.user, '::', self.finger_port, self.job_dir)
 
             # Keep running until the parent dies:
             pipe_read = os.fdopen(pipe_read)
diff --git a/zuul/configloader.py b/zuul/configloader.py
index a923fca..13fc310 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -76,6 +76,15 @@
         super(MaxNodeError, self).__init__(message)
 
 
+class MaxTimeoutError(Exception):
+    def __init__(self, job, tenant):
+        message = textwrap.dedent("""\
+        The job "{job}" exceeds tenant max-job-timeout {maxtimeout}.""")
+        message = textwrap.fill(message.format(
+            job=job.name, maxtimeout=tenant.max_job_timeout))
+        super(MaxTimeoutError, self).__init__(message)
+
+
 class DuplicateGroupError(Exception):
     def __init__(self, nodeset, group):
         message = textwrap.dedent("""\
@@ -505,6 +514,10 @@
         if secrets and not conf['_source_context'].trusted:
             job.post_review = True
 
+        if conf.get('timeout') and tenant.max_job_timeout != -1 and \
+           int(conf['timeout']) > tenant.max_job_timeout:
+            raise MaxTimeoutError(job, tenant)
+
         if 'post-review' in conf:
             if conf['post-review']:
                 job.post_review = True
@@ -1059,6 +1072,7 @@
     def getSchema(connections=None):
         tenant = {vs.Required('name'): str,
                   'max-nodes-per-job': int,
+                  'max-job-timeout': int,
                   'source': TenantParser.validateTenantSources(connections),
                   'exclude-unprotected-branches': bool,
                   'default-parent': str,
@@ -1072,6 +1086,8 @@
         tenant = model.Tenant(conf['name'])
         if conf.get('max-nodes-per-job') is not None:
             tenant.max_nodes_per_job = conf['max-nodes-per-job']
+        if conf.get('max-job-timeout') is not None:
+            tenant.max_job_timeout = int(conf['max-job-timeout'])
         if conf.get('exclude-unprotected-branches') is not None:
             tenant.exclude_unprotected_branches = \
                 conf['exclude-unprotected-branches']
@@ -1342,6 +1358,8 @@
                 continue
             TenantParser.log.debug("Waiting for cat job %s" % (job,))
             job.wait()
+            if not job.updated:
+                raise Exception("Cat job %s failed" % (job,))
             TenantParser.log.debug("Cat job %s got files %s" %
                                    (job, job.files))
             loaded = False
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 03fcb4a..92f3922 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -1618,7 +1618,8 @@
                     now=datetime.datetime.now()))
                 for line in syntax_buffer:
                     job_output.write("{now} | {line}\n".format(
-                        now=datetime.datetime.now(), line=line))
+                        now=datetime.datetime.now(),
+                        line=line.decode('utf-8').rstrip()))
 
         return (self.RESULT_NORMAL, ret)
 
@@ -1634,7 +1635,7 @@
             cmd.extend(['-e', '@' + playbook.secrets])
 
         if success is not None:
-            cmd.extend(['-e', 'success=%s' % str(bool(success))])
+            cmd.extend(['-e', 'zuul_success=%s' % str(bool(success))])
 
         if phase:
             cmd.extend(['-e', 'zuul_execution_phase=%s' % phase])
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index 5191a44..2614e58 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -134,18 +134,18 @@
     def onBuildCompleted(self, job):
         data = getJobData(job)
         merged = data.get('merged', False)
-        updated = data.get('updated', False)
+        job.updated = data.get('updated', False)
         commit = data.get('commit')
         files = data.get('files', {})
         repo_state = data.get('repo_state', {})
         job.files = files
         self.log.info("Merge %s complete, merged: %s, updated: %s, "
                       "commit: %s" %
-                      (job, merged, updated, commit))
+                      (job, merged, job.updated, commit))
         job.setComplete()
         if job.build_set:
             self.sched.onMergeCompleted(job.build_set,
-                                        merged, updated, commit, files,
+                                        merged, job.updated, commit, files,
                                         repo_state)
         # The test suite expects the job to be removed from the
         # internal account after the wake flag is set.
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index fc599c1..881209d 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -111,7 +111,7 @@
     def refstate(self, job):
         args = json.loads(job.arguments)
 
-        success, repo_state = self.merger.getItemRepoState(args['items'])
+        success, repo_state = self.merger.getRepoState(args['items'])
         result = dict(updated=success,
                       repo_state=repo_state)
         job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/model.py b/zuul/model.py
index 1ef8d3a..429a0c3 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -2506,6 +2506,7 @@
     def __init__(self, name):
         self.name = name
         self.max_nodes_per_job = 5
+        self.max_job_timeout = 10800
         self.exclude_unprotected_branches = False
         self.default_base_job = None
         self.layout = None