Estimate job runtimes internally

Rather than relying on the workers to supply estimated job runtimes,
record the last 10 successful run times and use those to estimate
the run time of each job.  This means that workers (which may be
highly distributed and lack access to a substantial job history)
no longer need to provide these values, and the central scheduler,
which is better placed to do so since italready sees all job run
times, will.

Failure times and a scoreboard of results are kept for each job
as well for potential future use in evaluating likelihood of
job success.

Change-Id: If0955e15a3da9eb842dbee02a4750a177a092d3e
diff --git a/tests/base.py b/tests/base.py
index 405caa0..585f2d2 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -876,11 +876,13 @@
         self.test_root = os.path.join(tmp_root, "zuul-test")
         self.upstream_root = os.path.join(self.test_root, "upstream")
         self.git_root = os.path.join(self.test_root, "git")
+        self.state_root = os.path.join(self.test_root, "lib")
 
         if os.path.exists(self.test_root):
             shutil.rmtree(self.test_root)
         os.makedirs(self.test_root)
         os.makedirs(self.upstream_root)
+        os.makedirs(self.state_root)
 
         # Make per test copy of Configuration.
         self.setup_config()
@@ -888,6 +890,7 @@
                         os.path.join(FIXTURE_DIR,
                                      self.config.get('zuul', 'layout_config')))
         self.config.set('merger', 'git_dir', self.git_root)
+        self.config.set('zuul', 'state_dir', self.state_root)
 
         # For each project in config:
         self.init_repo("org/project")
diff --git a/tests/test_model.py b/tests/test_model.py
index 2711618..ac19383 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -12,6 +12,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import os
+import random
+
+import fixtures
+
 from zuul import change_matcher as cm
 from zuul import model
 
@@ -62,3 +67,76 @@
         metajob = model.Job('^job')
         job.copy(metajob)
         self._assert_job_booleans_are_not_none(job)
+
+
+class TestJobTimeData(BaseTestCase):
+    def setUp(self):
+        super(TestJobTimeData, self).setUp()
+        self.tmp_root = self.useFixture(fixtures.TempDir(
+            rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+        ).path
+
+    def test_empty_timedata(self):
+        path = os.path.join(self.tmp_root, 'job-name')
+        self.assertFalse(os.path.exists(path))
+        self.assertFalse(os.path.exists(path + '.tmp'))
+        td = model.JobTimeData(path)
+        self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+
+    def test_save_reload(self):
+        path = os.path.join(self.tmp_root, 'job-name')
+        self.assertFalse(os.path.exists(path))
+        self.assertFalse(os.path.exists(path + '.tmp'))
+        td = model.JobTimeData(path)
+        self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        success_times = []
+        failure_times = []
+        results = []
+        for x in range(10):
+            success_times.append(int(random.random() * 1000))
+            failure_times.append(int(random.random() * 1000))
+            results.append(0)
+            results.append(1)
+        random.shuffle(results)
+        s = f = 0
+        for result in results:
+            if result:
+                td.add(failure_times[f], 'FAILURE')
+                f += 1
+            else:
+                td.add(success_times[s], 'SUCCESS')
+                s += 1
+        self.assertEqual(td.success_times, success_times)
+        self.assertEqual(td.failure_times, failure_times)
+        self.assertEqual(td.results, results[10:])
+        td.save()
+        self.assertTrue(os.path.exists(path))
+        self.assertFalse(os.path.exists(path + '.tmp'))
+        td = model.JobTimeData(path)
+        td.load()
+        self.assertEqual(td.success_times, success_times)
+        self.assertEqual(td.failure_times, failure_times)
+        self.assertEqual(td.results, results[10:])
+
+
+class TestTimeDataBase(BaseTestCase):
+    def setUp(self):
+        super(TestTimeDataBase, self).setUp()
+        self.tmp_root = self.useFixture(fixtures.TempDir(
+            rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+        ).path
+        self.db = model.TimeDataBase(self.tmp_root)
+
+    def test_timedatabase(self):
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 0)
+        self.db.update('job-name', 50, 'SUCCESS')
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 50)
+        self.db.update('job-name', 100, 'SUCCESS')
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 75)
+        for x in range(10):
+            self.db.update('job-name', 100, 'SUCCESS')
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 100)
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index fe7c7cc..15d33c8 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -34,7 +34,6 @@
 import zuul.reporter.smtp
 
 from tests.base import (
-    BaseTestCase,
     ZuulTestCase,
     repack_repo,
 )
@@ -44,40 +43,6 @@
                     '%(levelname)-8s %(message)s')
 
 
-class TestSchedulerConfigParsing(BaseTestCase):
-
-    def test_parse_skip_if(self):
-        job_yaml = """
-jobs:
-  - name: job_name
-    skip-if:
-      - project: ^project_name$
-        branch: ^stable/icehouse$
-        all-files-match-any:
-          - ^filename$
-      - project: ^project2_name$
-        all-files-match-any:
-          - ^filename2$
-    """.strip()
-        data = yaml.load(job_yaml)
-        config_job = data.get('jobs')[0]
-        sched = zuul.scheduler.Scheduler({})
-        cm = zuul.change_matcher
-        expected = cm.MatchAny([
-            cm.MatchAll([
-                cm.ProjectMatcher('^project_name$'),
-                cm.BranchMatcher('^stable/icehouse$'),
-                cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
-            ]),
-            cm.MatchAll([
-                cm.ProjectMatcher('^project2_name$'),
-                cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
-            ]),
-        ])
-        matcher = sched._parseSkipIf(config_job)
-        self.assertEqual(expected, matcher)
-
-
 class TestScheduler(ZuulTestCase):
 
     def test_jobs_launched(self):
@@ -495,6 +460,46 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
 
+    def _test_time_database(self, iteration):
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        time.sleep(2)
+
+        data = json.loads(self.sched.formatStatusJSON())
+        found_job = None
+        for pipeline in data['pipelines']:
+            if pipeline['name'] != 'gate':
+                continue
+            for queue in pipeline['change_queues']:
+                for head in queue['heads']:
+                    for item in head:
+                        for job in item['jobs']:
+                            if job['name'] == 'project-merge':
+                                found_job = job
+                                break
+
+        self.assertIsNotNone(found_job)
+        if iteration == 1:
+            self.assertIsNotNone(found_job['estimated_time'])
+            self.assertIsNone(found_job['remaining_time'])
+        else:
+            self.assertIsNotNone(found_job['estimated_time'])
+            self.assertTrue(found_job['estimated_time'] >= 2)
+            self.assertIsNotNone(found_job['remaining_time'])
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+
+    def test_time_database(self):
+        "Test the time database"
+
+        self._test_time_database(1)
+        self._test_time_database(2)
+
     def test_two_failed_changes_at_head(self):
         "Test that changes are reparented correctly if 2 fail at head"
 
@@ -600,6 +605,36 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
 
+    def test_parse_skip_if(self):
+        job_yaml = """
+jobs:
+  - name: job_name
+    skip-if:
+      - project: ^project_name$
+        branch: ^stable/icehouse$
+        all-files-match-any:
+          - ^filename$
+      - project: ^project2_name$
+        all-files-match-any:
+          - ^filename2$
+    """.strip()
+        data = yaml.load(job_yaml)
+        config_job = data.get('jobs')[0]
+        cm = zuul.change_matcher
+        expected = cm.MatchAny([
+            cm.MatchAll([
+                cm.ProjectMatcher('^project_name$'),
+                cm.BranchMatcher('^stable/icehouse$'),
+                cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
+            ]),
+            cm.MatchAll([
+                cm.ProjectMatcher('^project2_name$'),
+                cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
+            ]),
+        ])
+        matcher = self.sched._parseSkipIf(config_job)
+        self.assertEqual(expected, matcher)
+
     def test_patch_order(self):
         "Test that dependent patches are tested in the right order"
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 69fb71b..f3b867c 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -456,9 +456,6 @@
                 build.number = data.get('number')
                 build.__gearman_manager = data.get('manager')
                 self.sched.onBuildStarted(build)
-
-            if job.denominator:
-                build.estimated_time = float(job.denominator) / 1000
         else:
             self.log.error("Unable to find build %s" % job.unique)
 
diff --git a/zuul/model.py b/zuul/model.py
index 5bea5d0..3fb0577 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -13,7 +13,9 @@
 # under the License.
 
 import copy
+import os
 import re
+import struct
 import time
 from uuid import uuid4
 import extras
@@ -1380,3 +1382,78 @@
                     job.copy(metajob)
             self.jobs[name] = job
         return job
+
+
+class JobTimeData(object):
+    format = 'B10H10H10B'
+    version = 0
+
+    def __init__(self, path):
+        self.path = path
+        self.success_times = [0 for x in range(10)]
+        self.failure_times = [0 for x in range(10)]
+        self.results = [0 for x in range(10)]
+
+    def load(self):
+        if not os.path.exists(self.path):
+            return
+        with open(self.path) as f:
+            data = struct.unpack(self.format, f.read())
+        version = data[0]
+        if version != self.version:
+            raise Exception("Unkown data version")
+        self.success_times = list(data[1:11])
+        self.failure_times = list(data[11:21])
+        self.results = list(data[21:32])
+
+    def save(self):
+        tmpfile = self.path + '.tmp'
+        data = [self.version]
+        data.extend(self.success_times)
+        data.extend(self.failure_times)
+        data.extend(self.results)
+        data = struct.pack(self.format, *data)
+        with open(tmpfile, 'w') as f:
+            f.write(data)
+        os.rename(tmpfile, self.path)
+
+    def add(self, elapsed, result):
+        elapsed = int(elapsed)
+        if result == 'SUCCESS':
+            self.success_times.append(elapsed)
+            self.success_times.pop(0)
+            result = 0
+        else:
+            self.failure_times.append(elapsed)
+            self.failure_times.pop(0)
+            result = 1
+        self.results.append(result)
+        self.results.pop(0)
+
+    def getEstimatedTime(self):
+        times = [x for x in self.success_times if x]
+        if times:
+            return float(sum(times)) / len(times)
+        return 0.0
+
+
+class TimeDataBase(object):
+    def __init__(self, root):
+        self.root = root
+        self.jobs = {}
+
+    def _getTD(self, name):
+        td = self.jobs.get(name)
+        if not td:
+            td = JobTimeData(os.path.join(self.root, name))
+            self.jobs[name] = td
+            td.load()
+        return td
+
+    def getEstimatedTime(self, name):
+        return self._getTD(name).getEstimatedTime()
+
+    def update(self, name, elapsed, result):
+        td = self._getTD(name)
+        td.add(elapsed, result)
+        td.save()
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index aea9a67..ee5cd2b 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -262,6 +262,9 @@
         self.management_event_queue = Queue.Queue()
         self.layout = model.Layout()
 
+        time_dir = self._get_time_database_dir()
+        self.time_database = model.TimeDataBase(time_dir)
+
         self.zuul_version = zuul_version.version_info.release_string()
         self.last_reconfigured = None
 
@@ -740,6 +743,17 @@
             state_dir = '/var/lib/zuul'
         return os.path.join(state_dir, 'queue.pickle')
 
+    def _get_time_database_dir(self):
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(self.config.get('zuul',
+                                                           'state_dir'))
+        else:
+            state_dir = '/var/lib/zuul'
+        d = os.path.join(state_dir, 'times')
+        if not os.path.exists(d):
+            os.mkdir(d)
+        return d
+
     def _save_queue(self):
         pickle_file = self._get_queue_pickle_file()
         events = []
@@ -1069,6 +1083,11 @@
             self.log.warning("Build %s is not associated with a pipeline" %
                              (build,))
             return
+        try:
+            build.estimated_time = float(self.time_database.getEstimatedTime(
+                build.job.name))
+        except Exception:
+            self.log.exception("Exception estimating build time:")
         pipeline.manager.onBuildStarted(event.build)
 
     def _doBuildCompletedEvent(self, event):
@@ -1082,6 +1101,12 @@
             self.log.warning("Build %s is not associated with a pipeline" %
                              (build,))
             return
+        if build.end_time and build.start_time and build.result:
+            duration = build.end_time - build.start_time
+        try:
+            self.time_database.update(build.job.name, duration, build.result)
+        except Exception:
+            self.log.exception("Exception recording build time:")
         pipeline.manager.onBuildCompleted(event.build)
 
     def _doMergeCompletedEvent(self, event):