Monitor job root and kill over-limit jobs

If a job were pointed at an abnormally large git repository (or a
maliciously large one), a clone could fill the executor's disk. The
same goes for anything else a job might do that writes data onto the
executor disk.

We run a single thread that periodically runs du on the root of all
jobs on this executor. This is called the DiskAccountant.
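
At its core, each scan is a single du invocation whose output is
parsed line by line. A condensed sketch of the loop, using the names
from the implementation below (the cache-dir exemption is omitted):

    du = subprocess.Popen(
        ['du', '-m', '--max-depth=1', cache_dir, jobs_base],
        stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    for line in du.stdout:
        size, dirname = line.rstrip().split()
        if int(size) > limit:
            func(dirname.decode('utf8'))  # e.g. stopJobByJobdir
    du.wait()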

A per-executor config item sets the disk limit per job. This won't
save a server from a full disk if many thousands of concurrent changes
are submitted, but it will prevent accidental filling of the disk and
make malicious disk filling much harder.
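
For example, the test fixture below caps each job at one megabyte:

    [executor]
    disk_limit_per_job=1

When the option is unset, the executor defaults to 250 MB.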

We also ignore hard links shared with the merge root, which exempts
bits cloned from the merge root from disk accounting.
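
This relies on du counting each hard-linked inode only once per
invocation, against the first path argument it is found under; the
accountant therefore passes the merge root first:

    du -m --max-depth=1 <merge_root> <jobs_root>

so repo data shared via hard links is charged to the merge root rather
than to any individual job directory.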

Change-Id: I415e5930cc3ebe2c7e1a84316e78578d6b9ecf30
Story: 2000879
Task: 3504
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index c11b5cb..08671c9 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -354,6 +354,13 @@
 
      variables=/etc/zuul/variables.yaml
 
+**disk_limit_per_job**
+  This integer is the maximum number of megabytes that any one job is
+  allowed to consume on disk while it is running. If a job's scratch
+  space consumes more than this amount, the job will be aborted::
+
+      disk_limit_per_job=100
+
 merger
 """"""
 
diff --git a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
new file mode 100644
index 0000000..95ab870
--- /dev/null
+++ b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
@@ -0,0 +1,6 @@
+- hosts: localhost
+  tasks:
+    - command: dd if=/dev/zero of=toobig bs=1M count=2
+    - wait_for:
+        delay: 10
+        path: /
diff --git a/tests/fixtures/config/disk-accountant/git/common-config/zuul.yaml b/tests/fixtures/config/disk-accountant/git/common-config/zuul.yaml
new file mode 100644
index 0000000..83a5158
--- /dev/null
+++ b/tests/fixtures/config/disk-accountant/git/common-config/zuul.yaml
@@ -0,0 +1,22 @@
+- pipeline:
+    name: check
+    manager: independent
+    allow-secrets: true
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+
+- job:
+    name: dd-big-empty-file
+
+- project:
+    name: org/project
+    check:
+      jobs:
+        - dd-big-empty-file
diff --git a/tests/fixtures/config/disk-accountant/git/org_project/README b/tests/fixtures/config/disk-accountant/git/org_project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/disk-accountant/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/disk-accountant/main.yaml b/tests/fixtures/config/disk-accountant/main.yaml
new file mode 100644
index 0000000..208e274
--- /dev/null
+++ b/tests/fixtures/config/disk-accountant/main.yaml
@@ -0,0 +1,8 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
+        untrusted-projects:
+          - org/project
diff --git a/tests/fixtures/zuul-disk-accounting.conf b/tests/fixtures/zuul-disk-accounting.conf
new file mode 100644
index 0000000..b0ae48e
--- /dev/null
+++ b/tests/fixtures/zuul-disk-accounting.conf
@@ -0,0 +1,28 @@
+[gearman]
+server=127.0.0.1
+
+[scheduler]
+tenant_config=main.yaml
+
+[merger]
+git_dir=/tmp/zuul-test/merger-git
+git_user_email=zuul@example.com
+git_user_name=zuul
+zuul_url=http://zuul.example.com/p
+
+[executor]
+git_dir=/tmp/zuul-test/executor-git
+disk_limit_per_job=1
+
+[connection gerrit]
+driver=gerrit
+server=review.example.com
+user=jenkins
+sshkey=fake_id_rsa_path
+
+[connection smtp]
+driver=smtp
+server=localhost
+port=25
+default_from=zuul@example.com
+default_to=you@example.com
diff --git a/tests/unit/test_disk_accountant.py b/tests/unit/test_disk_accountant.py
new file mode 100644
index 0000000..22c8f34
--- /dev/null
+++ b/tests/unit/test_disk_accountant.py
@@ -0,0 +1,89 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import tempfile
+import time
+
+from tests.base import BaseTestCase
+
+from zuul.executor.server import DiskAccountant
+
+
+class FakeExecutor(object):
+    def __init__(self):
+        self.stopped_jobs = set()
+        self.used = {}
+
+    def stopJobByJobDir(self, jobdir):
+        self.stopped_jobs.add(jobdir)
+
+    def usage(self, dirname, used):
+        self.used[dirname] = used
+
+
+class TestDiskAccountant(BaseTestCase):
+    def test_disk_accountant(self):
+        jobs_dir = tempfile.mkdtemp()
+        cache_dir = tempfile.mkdtemp()
+        executor_server = FakeExecutor()
+        da = DiskAccountant(jobs_dir, 1, executor_server.stopJobByJobDir,
+                            cache_dir)
+        da.start()
+
+        jobdir = os.path.join(jobs_dir, '012345')
+        os.mkdir(jobdir)
+        testfile = os.path.join(jobdir, 'tfile')
+        with open(testfile, 'w') as tf:
+            tf.write(2 * 1024 * 1024 * '.')
+
+        # da should catch over-limit dir within 5 seconds
+        for i in range(0, 50):
+            if jobdir in executor_server.stopped_jobs:
+                break
+            time.sleep(0.1)
+        self.assertEqual(set([jobdir]), executor_server.stopped_jobs)
+        da.stop()
+        self.assertFalse(da.thread.is_alive())
+
+    def test_cache_hard_links(self):
+        root_dir = tempfile.mkdtemp()
+        jobs_dir = os.path.join(root_dir, 'jobs')
+        os.mkdir(jobs_dir)
+        cache_dir = os.path.join(root_dir, 'cache')
+        os.mkdir(cache_dir)
+
+        executor_server = FakeExecutor()
+        da = DiskAccountant(jobs_dir, 1, executor_server.stopJobByJobDir,
+                            cache_dir, executor_server.usage)
+        da.start()
+
+        jobdir = os.path.join(jobs_dir, '012345')
+        os.mkdir(jobdir)
+
+        repo_dir = os.path.join(cache_dir, 'a.repo')
+        os.mkdir(repo_dir)
+        source_file = os.path.join(repo_dir, 'big_file')
+        with open(source_file, 'w') as tf:
+            tf.write(2 * 1024 * 1024 * '.')
+        dest_link = os.path.join(jobdir, 'big_file')
+        os.link(source_file, dest_link)
+
+        # da should _not_ count this file. Wait up to 5s to be noticed
+        for i in range(0, 50):
+            if jobdir in executor_server.used:
+                break
+            time.sleep(0.1)
+        self.assertEqual(set(), executor_server.stopped_jobs)
+        self.assertIn(jobdir, executor_server.used)
+        self.assertEqual(1, executor_server.used[jobdir])
+        da.stop()
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 7dcb4ae..3d8d37c 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -924,3 +924,15 @@
         self.assertIn('- data-return-relative '
                       'http://example.com/test/log/url/docs/index.html',
                       A.messages[-1])
+
+
+class TestDiskAccounting(AnsibleZuulTestCase):
+    config_file = 'zuul-disk-accounting.conf'
+    tenant_config_file = 'config/disk-accountant/main.yaml'
+
+    def test_disk_accountant_kills_job(self):
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='dd-big-empty-file', result='ABORTED', changes='1,1')])
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 84aab55..11b7c3a 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -55,6 +55,88 @@
     pass
 
 
+class DiskAccountant(object):
+    ''' A single thread to periodically run du and monitor a base directory
+
+    Whenever the accountant notices a dir over limit, it will call the
+    given func with an argument of the job directory. That function
+    should be used to remediate the problem, generally by killing the
+    job producing the disk bloat. The function will be called every
+    time the problem is noticed, so it should be handled synchronously
+    to avoid stacking up calls.
+    '''
+    log = logging.getLogger("zuul.ExecutorDiskAccountant")
+
+    def __init__(self, jobs_base, limit, func, cache_dir, usage_func=None):
+        '''
+        :param str jobs_base: absolute path name of dir to be monitored
+        :param int limit: maximum number of MB allowed to be in use in any one
+                          subdir
+        :param callable func: Function to call with overlimit dirs
+        :param str cache_dir: absolute path name of dir to be passed as the
+                              first argument to du. This will ensure du does
+                              not count any hardlinks to files in this
+                              directory against a single job.
+        :param callable usage_func: Optional function to call with usage
+                                    for every dir _NOT_ over limit
+        '''
+        # Don't cross the streams
+        if cache_dir == jobs_base:
+            raise Exception("Cache dir and jobs dir cannot be the same")
+        self.thread = threading.Thread(target=self._run,
+                                       name='executor-diskaccountant')
+        self.thread.daemon = True
+        self._running = False
+        self.jobs_base = jobs_base
+        self.limit = limit
+        self.func = func
+        self.cache_dir = cache_dir
+        self.usage_func = usage_func
+        self.stop_event = threading.Event()
+
+    def _run(self):
+        while self._running:
+            # Walk job base
+            before = time.time()
+            du = subprocess.Popen(
+                ['du', '-m', '--max-depth=1', self.cache_dir, self.jobs_base],
+                stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+            for line in du.stdout:
+                (size, dirname) = line.rstrip().split()
+                dirname = dirname.decode('utf8')
+                if dirname == self.jobs_base or dirname == self.cache_dir:
+                    continue
+                if os.path.dirname(dirname) == self.cache_dir:
+                    continue
+                size = int(size)
+                if size > self.limit:
+                    self.log.info(
+                        "{job} is using {size}MB (limit={limit})"
+                        .format(size=size, job=dirname, limit=self.limit))
+                    self.func(dirname)
+                elif self.usage_func:
+                    self.log.debug(
+                        "{job} is using {size}MB (limit={limit})"
+                        .format(size=size, job=dirname, limit=self.limit))
+                    self.usage_func(dirname, size)
+            du.wait()
+            after = time.time()
+            # Sleep half as long as that took, or 1s, whichever is longer
+            delay_time = max((after - before) / 2, 1.0)
+            self.stop_event.wait(delay_time)
+
+    def start(self):
+        self._running = True
+        self.thread.start()
+
+    def stop(self):
+        self._running = False
+        self.stop_event.set()
+        # We join here to avoid whitelisting the thread -- if it takes more
+        # than 5s to stop in tests, there's a problem.
+        self.thread.join(timeout=5)
+
+
 class Watchdog(object):
     def __init__(self, timeout, function, args):
         self.timeout = timeout
@@ -443,6 +525,8 @@
                                       '/var/lib/zuul/executor-git')
         self.default_username = get_default(self.config, 'executor',
                                             'default_username', 'zuul')
+        self.disk_limit_per_job = int(get_default(self.config, 'executor',
+                                                  'disk_limit_per_job', 250))
         self.merge_email = get_default(self.config, 'merger', 'git_user_email')
         self.merge_name = get_default(self.config, 'merger', 'git_user_name')
         execution_wrapper_name = get_default(self.config, 'executor',
@@ -486,6 +570,10 @@
             pass
 
         self.job_workers = {}
+        self.disk_accountant = DiskAccountant(self.jobdir_root,
+                                              self.disk_limit_per_job,
+                                              self.stopJobByJobdir,
+                                              self.merge_root)
 
     def _getMerger(self, root, logger=None):
         if root != self.merge_root:
@@ -530,6 +618,7 @@
         self.executor_thread = threading.Thread(target=self.run_executor)
         self.executor_thread.daemon = True
         self.executor_thread.start()
+        self.disk_accountant.start()
 
     def register(self):
         self.executor_worker.registerFunction("executor:execute")
@@ -540,6 +629,7 @@
 
     def stop(self):
         self.log.debug("Stopping")
+        self.disk_accountant.stop()
         self._running = False
         self._command_running = False
         self.command_socket.stop()
@@ -675,23 +765,30 @@
     def finishJob(self, unique):
         del(self.job_workers[unique])
 
+    def stopJobByJobdir(self, jobdir):
+        unique = os.path.basename(jobdir)
+        self.stopJobByUnique(unique)
+
     def stopJob(self, job):
         try:
             args = json.loads(job.arguments)
             self.log.debug("Stop job with arguments: %s" % (args,))
             unique = args['uuid']
-            job_worker = self.job_workers.get(unique)
-            if not job_worker:
-                self.log.debug("Unable to find worker for job %s" % (unique,))
-                return
-            try:
-                job_worker.stop()
-            except Exception:
-                self.log.exception("Exception sending stop command "
-                                   "to worker:")
+            self.stopJobByUnique(unique)
         finally:
             job.sendWorkComplete()
 
+    def stopJobByUnique(self, unique):
+        job_worker = self.job_workers.get(unique)
+        if not job_worker:
+            self.log.debug("Unable to find worker for job %s" % (unique,))
+            return
+        try:
+            job_worker.stop()
+        except Exception:
+            self.log.exception("Exception sending stop command "
+                               "to worker:")
+
     def cat(self, job):
         args = json.loads(job.arguments)
         task = self.update(args['connection'], args['project'])
@@ -1429,6 +1526,7 @@
             if timeout:
                 watchdog.stop()
                 self.log.debug("Stopped watchdog")
+            self.log.debug("Stopped disk job killer")
 
         with self.proc_lock:
             self.proc = None