Simon Glass | 11ae93e | 2018-10-01 21:12:47 -0600 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | # SPDX-License-Identifier: GPL-2.0+ |
| 3 | # |
| 4 | # Modified by: Corey Goldberg, 2013 |
| 5 | # |
| 6 | # Original code from: |
| 7 | # Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) |
| 8 | # Copyright (C) 2005-2011 Canonical Ltd |
| 9 | |
| 10 | """Python testtools extension for running unittest suites concurrently. |
| 11 | |
| 12 | The `testtools` project provides a ConcurrentTestSuite class, but does |
| 13 | not provide a `make_tests` implementation needed to use it. |
| 14 | |
| 15 | This allows you to parallelize a test run across a configurable number |
| 16 | of worker processes. While this can speed up CPU-bound test runs, it is |
| 17 | mainly useful for IO-bound tests that spend most of their time waiting for |
| 18 | data to arrive from someplace else and can benefit from concurrency. |
| 19 | |
| 20 | Unix only. |
| 21 | """ |
| 22 | |
| 23 | import os |
| 24 | import sys |
| 25 | import traceback |
| 26 | import unittest |
| 27 | from itertools import cycle |
| 28 | from multiprocessing import cpu_count |
| 29 | |
| 30 | from subunit import ProtocolTestCase, TestProtocolClient |
| 31 | from subunit.test_results import AutoTimingTestResultDecorator |
| 32 | |
| 33 | from testtools import ConcurrentTestSuite, iterate_tests |
Alper Nebi Yasak | ebcaafc | 2022-04-02 20:06:08 +0300 | [diff] [blame] | 34 | from testtools.content import TracebackContent, text_content |
Simon Glass | 11ae93e | 2018-10-01 21:12:47 -0600 | [diff] [blame] | 35 | |
| 36 | |
# Public API of this module. The name must be spelled `__all__` (two leading
# underscores) for Python to honour it in `from <module> import *`.
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]


# Number of CPUs reported by multiprocessing; used as the default number of
# worker processes.
CPU_COUNT = cpu_count()
| 45 | |
| 46 | |
class BufferingTestProtocolClient(TestProtocolClient):
    """A TestProtocolClient which can buffer the test outputs

    This class captures the stdout and stderr output streams of the
    tests as it runs them, and includes the output texts in the subunit
    stream as additional details.

    Args:
        stream: A file-like object to write a subunit stream to
        buffer (bool): True to capture test stdout/stderr outputs and
            include them in the test details
    """
    def __init__(self, stream, buffer=True):
        super().__init__(stream)
        self.buffer = buffer

    def _addOutcome(self, outcome, test, error=None, details=None,
                    error_permitted=True):
        """Report a test outcome to the subunit stream

        The parent class uses this function as a common implementation
        for various methods that report successes, errors, failures, etc.

        This version automatically upgrades the error tracebacks to the
        new 'details' format by wrapping them in a Content object, so
        that we can include the captured test output in the test result
        details.

        Args:
            outcome: A string describing the outcome - used as the
                event name in the subunit stream.
            test: The test case whose outcome is to be reported
            error: Standard unittest positional argument form - an
                exc_info tuple.
            details: New Testing-in-python drafted API; a dict from
                string to subunit.Content objects. The dict passed in
                is not modified.
            error_permitted: If True then one and only one of error or
                details must be supplied. If False then error must not
                be supplied and details is still optional.

        Returns:
            Whatever the parent class's _addOutcome() returns
        """
        # Work on a private copy so that the 'traceback', 'stdout' and
        # 'stderr' entries added below never leak into the caller's dict.
        details = dict(details) if details else {}

        # Parent will raise an exception if error_permitted is False but
        # error is not None. We want that exception in that case, so
        # don't touch error when error_permitted is explicitly False.
        if error_permitted and error is not None:
            # Parent class prefers error over details
            details['traceback'] = TracebackContent(error, test)
            error_permitted = False
            error = None

        if self.buffer:
            # The streams are expected to be capture buffers providing
            # getvalue() (e.g. StringIO). If one of them is not, there is
            # nothing captured to attach; skip it instead of failing to
            # report the outcome at all.
            for name, captured in (('stdout', sys.stdout),
                                   ('stderr', sys.stderr)):
                getvalue = getattr(captured, 'getvalue', None)
                text = getvalue() if getvalue is not None else ''
                if text:
                    details[name] = text_content(text)

        return super()._addOutcome(outcome, test, error=error,
                details=details, error_permitted=error_permitted)
| 110 | |
| 111 | |
def fork_for_tests(concurrency_num=CPU_COUNT, buffer=False):
    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.

    :param concurrency_num: number of processes to use.
    :param buffer: True to report results through
        BufferingTestProtocolClient, which attaches the captured
        stdout/stderr of each test to its details; False to use a plain
        TestProtocolClient.

    :return: A function which takes a TestSuite, distributes its tests
        over forked child processes and returns a list of TestCase-like
        objects that replay the children's results.
    """
    if buffer:
        test_protocol_client_class = BufferingTestProtocolClient
    else:
        test_protocol_client_class = TestProtocolClient

    def do_fork(suite):
        """Take suite and start up multiple runners by forking (Unix only).

        :param suite: TestSuite object.

        :return: An iterable of TestCase-like objects which can each have
                 run(result) called on them to feed tests to result.
        """
        result = []
        test_blocks = partition_tests(suite, concurrency_num)
        # Clear the tests from the original suite so it doesn't keep them alive
        suite._tests[:] = []
        for process_tests in test_blocks:
            if not process_tests:
                # Fewer tests than processes: don't fork a child (and
                # create a pipe) that would have nothing to run.
                continue
            process_suite = unittest.TestSuite(process_tests)
            # Also clear each split list so new suite has only reference
            process_tests[:] = []
            c2pread, c2pwrite = os.pipe()
            pid = os.fork()
            if pid == 0:
                # Child process. Bound up front so that the error handler
                # below can tell whether the stream was ever created.
                stream = None
                try:
                    stream = os.fdopen(c2pwrite, 'wb')
                    os.close(c2pread)
                    # Leave stderr and stdout open so we can see test noise
                    # Close stdin so that the child goes away if it decides to
                    # read from stdin (otherwise it's a roulette to see what
                    # child actually gets keystrokes for pdb etc).
                    sys.stdin.close()
                    subunit_result = AutoTimingTestResultDecorator(
                        test_protocol_client_class(stream)
                    )
                    process_suite.run(subunit_result)
                    # os._exit() does not flush buffered file objects, so
                    # push out anything still pending on the pipe first.
                    stream.flush()
                except BaseException:
                    # Catch absolutely everything: the child must never
                    # fall through into the parent's code path.
                    # Try and report traceback on stream, but exit with error
                    # even if stream couldn't be created or something else
                    # goes wrong. The traceback is formatted to a string and
                    # written in one go to avoid interleaving lines from
                    # multiple failing children.
                    try:
                        if stream is not None:
                            # The stream is binary, so the text has to be
                            # encoded before it can be written.
                            stream.write(traceback.format_exc().encode(
                                'utf-8', 'backslashreplace'))
                            stream.flush()
                    finally:
                        os._exit(1)
                os._exit(0)
            else:
                os.close(c2pwrite)
                stream = os.fdopen(c2pread, 'rb')
                # If we don't pass the second argument here, it defaults
                # to sys.stdout.buffer down the line. But if we don't
                # pass it *now*, it may be resolved after sys.stdout is
                # replaced with a StringIO (to capture tests' outputs)
                # which doesn't have a buffer attribute and can end up
                # occasionally causing a 'broken-runner' error.
                test = ProtocolTestCase(stream, sys.stdout.buffer)
                result.append(test)
        return result
    return do_fork
| 177 | |
| 178 | |
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: TestSuite (or other object accepted by `iterate_tests`)
        whose individual tests are to be distributed.
    :param count: number of partitions to create; must be at least 1.

    :return: A list of `count` lists of tests. Some of them are empty when
        there are fewer tests than partitions.

    :raises ValueError: if count is less than 1, since the tests would
        otherwise be silently dropped instead of being run.
    """
    if count < 1:
        raise ValueError('count must be at least 1, got %r' % (count,))

    # This just assigns tests in a round-robin fashion.  On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition.  So the slowest partition shouldn't be much slower
    # than the fastest.
    partitions = [[] for _ in range(count)]
    for partition, test in zip(cycle(partitions), iterate_tests(suite)):
        partition.append(test)
    return partitions
| 191 | |
| 192 | |
if __name__ == '__main__':
    import time

    class SampleTestCase(unittest.TestCase):
        """Four trivial tests that only sleep, used for the demo below."""

        def test_me_1(self):
            time.sleep(0.5)

        def test_me_2(self):
            time.sleep(0.5)

        def test_me_3(self):
            time.sleep(0.5)

        def test_me_4(self):
            time.sleep(0.5)

    loader = unittest.TestLoader()
    runner = unittest.TextTestRunner()

    # First pass: the sample tests run one after another
    runner.run(loader.loadTestsFromTestCase(SampleTestCase))

    # Second pass: a freshly loaded copy of the same tests, spread over
    # 4 forked processes
    sample_tests = loader.loadTestsFromTestCase(SampleTestCase)
    runner.run(ConcurrentTestSuite(sample_tests, fork_for_tests(4)))