Estimate job runtimes internally
Rather than relying on the workers to supply estimated job runtimes,
record the last 10 successful run times and use those to estimate
the run time of each job. This means that workers (which may be
highly distributed and lack access to a substantial job history)
no longer need to provide these values, and the central scheduler,
which is better placed to do so since it already sees all job run
times, will.
Failure times and a scoreboard of results are kept for each job
as well for potential future use in evaluating likelihood of
job success.
Change-Id: If0955e15a3da9eb842dbee02a4750a177a092d3e
diff --git a/zuul/model.py b/zuul/model.py
index 5bea5d0..3fb0577 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -13,7 +13,9 @@
# under the License.
import copy
+import os
import re
+import struct
import time
from uuid import uuid4
import extras
@@ -1380,3 +1382,78 @@
job.copy(metajob)
self.jobs[name] = job
return job
+
+
+class JobTimeData(object):
+    """Rolling record of recent run times and results for one job.
+
+    Keeps the last ten successful run times, the last ten failed run
+    times and the last ten results (0 for success, 1 for anything
+    else), and persists them to a small binary file at ``path``.
+
+    A stored time of 0 marks an unused slot; such slots are ignored
+    when estimating the run time.
+    """
+
+    # On-disk layout: one version byte, ten unsigned-short success
+    # times, ten unsigned-short failure times, ten result bytes.  The
+    # format has no byte-order prefix, so struct uses native byte order
+    # and alignment.
+    format = 'B10H10H10B'
+    version = 0
+    # Largest value an 'H' (unsigned short) slot can hold.
+    _max_time = 0xFFFF
+
+    def __init__(self, path):
+        """Create an empty record that will be stored at ``path``."""
+        self.path = path
+        self.success_times = [0 for x in range(10)]
+        self.failure_times = [0 for x in range(10)]
+        self.results = [0 for x in range(10)]
+
+    def load(self):
+        """Replace the in-memory data with the contents of ``path``.
+
+        Does nothing if the file does not exist.  Raises ``Exception``
+        if the stored version byte differs from ``version``;
+        ``struct.unpack`` raises ``struct.error`` if the file size does
+        not match ``format``.
+        """
+        if not os.path.exists(self.path):
+            return
+        # The file holds packed binary data, so it must be read in
+        # binary mode.
+        with open(self.path, 'rb') as f:
+            data = struct.unpack(self.format, f.read())
+        version = data[0]
+        if version != self.version:
+            raise Exception("Unkown data version")
+        self.success_times = list(data[1:11])
+        self.failure_times = list(data[11:21])
+        self.results = list(data[21:31])
+
+    def save(self):
+        """Write the current data to ``path``.
+
+        The data is written to ``path + '.tmp'`` first and then renamed
+        over ``path`` so readers never see a partially written file.
+        """
+        tmpfile = self.path + '.tmp'
+        data = [self.version]
+        data.extend(self.success_times)
+        data.extend(self.failure_times)
+        data.extend(self.results)
+        data = struct.pack(self.format, *data)
+        with open(tmpfile, 'wb') as f:
+            f.write(data)
+        os.rename(tmpfile, self.path)
+
+    def add(self, elapsed, result):
+        """Record one finished run.
+
+        :arg elapsed: run time; truncated to an integer and clamped to
+            the range the on-disk format can store (0 to 65535).
+        :arg result: the result string; ``'SUCCESS'`` is recorded as a
+            success, anything else as a failure.
+        """
+        # Clamp so that a very long (or negative) run time cannot make
+        # every subsequent save() fail when packing the 'H' slots.
+        elapsed = max(0, min(int(elapsed), self._max_time))
+        if result == 'SUCCESS':
+            self.success_times.append(elapsed)
+            self.success_times.pop(0)
+            result = 0
+        else:
+            self.failure_times.append(elapsed)
+            self.failure_times.pop(0)
+            result = 1
+        self.results.append(result)
+        self.results.pop(0)
+
+    def getEstimatedTime(self):
+        """Return the mean of the recorded non-zero success times.
+
+        Returns 0.0 when no non-zero success time has been recorded.
+        """
+        times = [x for x in self.success_times if x]
+        if times:
+            return float(sum(times)) / len(times)
+        return 0.0
+
+
+class TimeDataBase(object):
+    """Directory-backed collection of per-job JobTimeData records.
+
+    Each job's record lives in a file named after the job under
+    ``root`` and is cached in ``jobs`` after first use.
+    """
+
+    def __init__(self, root):
+        """Remember the storage directory and start with an empty cache."""
+        self.jobs = {}
+        self.root = root
+
+    def _getTD(self, name):
+        """Return the record for job ``name``, loading it on first use."""
+        cached = self.jobs.get(name)
+        if cached:
+            return cached
+        # The record is cached before load() is called, so it stays
+        # cached even if load() raises.
+        fresh = JobTimeData(os.path.join(self.root, name))
+        self.jobs[name] = fresh
+        fresh.load()
+        return fresh
+
+    def getEstimatedTime(self, name):
+        """Return the estimated run time for job ``name``."""
+        record = self._getTD(name)
+        return record.getEstimatedTime()
+
+    def update(self, name, elapsed, result):
+        """Add one finished run of job ``name`` and save its record."""
+        record = self._getTD(name)
+        record.add(elapsed, result)
+        record.save()