blob: 40b3b7212e7cb6f5f21b9af25aa4dcb6a40296a6 [file] [log] [blame]
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import tempfile
import time
from tests.base import BaseTestCase
from zuul.executor.server import DiskAccountant
class FakeExecutor(object):
    """Minimal executor stand-in that just records the callbacks it gets.

    ``stopped_jobs`` collects every job dir passed to ``stopJobByJobDir``
    and ``used`` maps a directory name to the last value passed to
    ``usage`` for it.
    """

    def __init__(self):
        # Start with nothing recorded.
        self.used = dict()
        self.stopped_jobs = set()

    def stopJobByJobDir(self, jobdir):
        """Record ``jobdir`` in ``stopped_jobs`` (duplicates collapse)."""
        self.stopped_jobs.update((jobdir,))

    def usage(self, dirname, used):
        """Store ``used`` under ``dirname``, replacing any earlier value."""
        self.used.update({dirname: used})
class TestDiskAccountant(BaseTestCase):
    """Run a DiskAccountant against real temporary directories.

    Each test starts the accountant, creates files under the jobs
    directory, and polls the FakeExecutor callbacks for what it reports.
    """

    def _make_temp_dir(self):
        """Create a temporary directory that is removed after the test.

        tempfile.mkdtemp() never deletes what it creates, so removal is
        registered as a cleanup. Cleanups run once the test method has
        returned, i.e. after the accountant was stopped in ``finally``.
        """
        # Function-scope import: shutil is only needed for this cleanup.
        import shutil

        path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, path, ignore_errors=True)
        return path

    @staticmethod
    def _wait_for(condition, attempts=50, interval=0.1):
        """Poll ``condition`` until it is true or ``attempts`` run out.

        With the defaults this waits roughly 5 seconds at most. It returns
        silently either way; the caller asserts on the final state.
        """
        for _ in range(attempts):
            if condition():
                return
            time.sleep(interval)

    def test_disk_accountant(self):
        """A job dir holding a 2 MiB file is reported via the stop callback."""
        jobs_dir = self._make_temp_dir()
        cache_dir = self._make_temp_dir()
        executor_server = FakeExecutor()
        # The limit argument is 1; its unit is presumably MiB -- confirm
        # against DiskAccountant.
        da = DiskAccountant(jobs_dir, 1, executor_server.stopJobByJobDir,
                            cache_dir)
        da.start()
        # Everything after start() sits inside the try so the accountant
        # thread is stopped even when the filesystem setup itself fails.
        try:
            jobdir = os.path.join(jobs_dir, '012345')
            os.mkdir(jobdir)
            testfile = os.path.join(jobdir, 'tfile')
            with open(testfile, 'w') as tf:
                tf.write(2 * 1024 * 1024 * '.')
            # da should catch over-limit dir within 5 seconds
            self._wait_for(lambda: jobdir in executor_server.stopped_jobs)
            self.assertEqual({jobdir}, executor_server.stopped_jobs)
        finally:
            da.stop()
        self.assertFalse(da.thread.is_alive())

    def test_cache_hard_links(self):
        """A file hard-linked from the cache dir does not trigger a stop."""
        # jobs and cache share one root so os.link() stays on a single
        # filesystem.
        root_dir = self._make_temp_dir()
        jobs_dir = os.path.join(root_dir, 'jobs')
        os.mkdir(jobs_dir)
        cache_dir = os.path.join(root_dir, 'cache')
        os.mkdir(cache_dir)
        executor_server = FakeExecutor()
        da = DiskAccountant(jobs_dir, 1, executor_server.stopJobByJobDir,
                            cache_dir, executor_server.usage)
        da.start()
        # As above: stop the accountant even if the setup below raises.
        try:
            jobdir = os.path.join(jobs_dir, '012345')
            os.mkdir(jobdir)
            repo_dir = os.path.join(cache_dir, 'a.repo')
            os.mkdir(repo_dir)
            source_file = os.path.join(repo_dir, 'big_file')
            with open(source_file, 'w') as tf:
                tf.write(2 * 1024 * 1024 * '.')
            dest_link = os.path.join(jobdir, 'big_file')
            os.link(source_file, dest_link)
            # da should _not_ count this file. Wait for 5s to get noticed
            self._wait_for(lambda: jobdir in executor_server.used)
            self.assertEqual(set(), executor_server.stopped_jobs)
            self.assertIn(jobdir, executor_server.used)
            self.assertEqual(1, executor_server.used[jobdir])
        finally:
            da.stop()