Joshua Hesketh | 66f9f60 | 2013-08-14 11:28:10 +1000 | [diff] [blame^] | 1 | #!/usr/bin/python2 |
| 2 | # |
| 3 | # Copyright 2013 Rackspace Australia |
| 4 | # |
| 5 | # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 6 | # not use this file except in compliance with the License. You may obtain |
| 7 | # a copy of the License at |
| 8 | # |
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | # |
| 11 | # Unless required by applicable law or agreed to in writing, software |
| 12 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 13 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 14 | # License for the specific language governing permissions and limitations |
| 15 | # under the License. |
| 16 | |
| 17 | import gear |
| 18 | import threading |
| 19 | import os |
| 20 | |
| 21 | from turbo_hipster.worker_manager import GearmanManager |
| 22 | |
| 23 | |
| 24 | class FakeGearmanManager(GearmanManager): |
| 25 | def __init__(self, config, tasks, test): |
| 26 | self.test = test |
| 27 | super(FakeGearmanManager, self).__init__(config, tasks) |
| 28 | |
| 29 | def setup_gearman(self): |
| 30 | hostname = os.uname()[1] |
| 31 | self.gearman_worker = FakeWorker('turbo-hipster-manager-%s' |
| 32 | % hostname, self.test) |
| 33 | self.gearman_worker.addServer( |
| 34 | self.config['zuul_server']['gearman_host'], |
| 35 | self.config['zuul_server']['gearman_port'] |
| 36 | ) |
| 37 | self.gearman_worker.registerFunction( |
| 38 | 'stop:turbo-hipster-manager-%s' % hostname) |
| 39 | |
| 40 | |
| 41 | class FakeWorker(gear.Worker): |
| 42 | def __init__(self, worker_id, test): |
| 43 | super(FakeWorker, self).__init__(worker_id) |
| 44 | self.gearman_jobs = {} |
| 45 | self.build_history = [] |
| 46 | self.running_builds = [] |
| 47 | self.build_counter = 0 |
| 48 | self.fail_tests = {} |
| 49 | self.test = test |
| 50 | |
| 51 | self.hold_jobs_in_build = False |
| 52 | self.lock = threading.Lock() |
| 53 | self.__work_thread = threading.Thread(target=self.work) |
| 54 | self.__work_thread.daemon = True |
| 55 | self.__work_thread.start() |
| 56 | |
| 57 | def handleJob(self, job): |
| 58 | parts = job.name.split(":") |
| 59 | cmd = parts[0] |
| 60 | name = parts[1] |
| 61 | if len(parts) > 2: |
| 62 | node = parts[2] |
| 63 | else: |
| 64 | node = None |
| 65 | if cmd == 'build': |
| 66 | self.handleBuild(job, name, node) |
| 67 | elif cmd == 'stop': |
| 68 | self.handleStop(job, name) |
| 69 | elif cmd == 'set_description': |
| 70 | self.handleSetDescription(job, name) |
| 71 | |
| 72 | def handleBuild(self, job, name, node): |
| 73 | build = FakeBuild(self, job, self.build_counter, node) |
| 74 | job.build = build |
| 75 | self.gearman_jobs[job.unique] = job |
| 76 | self.build_counter += 1 |
| 77 | |
| 78 | self.running_builds.append(build) |
| 79 | build.start() |
| 80 | |
| 81 | def handleStop(self, job, name): |
| 82 | self.log.debug("handle stop") |
| 83 | parameters = json.loads(job.arguments) |
| 84 | name = parameters['name'] |
| 85 | number = parameters['number'] |
| 86 | for build in self.running_builds: |
| 87 | if build.name == name and build.number == number: |
| 88 | build.aborted = True |
| 89 | build.release() |
| 90 | job.sendWorkComplete() |
| 91 | return |
| 92 | job.sendWorkFail() |
| 93 | |
| 94 | def handleSetDescription(self, job, name): |
| 95 | self.log.debug("handle set description") |
| 96 | parameters = json.loads(job.arguments) |
| 97 | name = parameters['name'] |
| 98 | number = parameters['number'] |
| 99 | descr = parameters['html_description'] |
| 100 | for build in self.running_builds: |
| 101 | if build.name == name and build.number == number: |
| 102 | build.description = descr |
| 103 | job.sendWorkComplete() |
| 104 | return |
| 105 | for build in self.build_history: |
| 106 | if build.name == name and build.number == number: |
| 107 | build.description = descr |
| 108 | job.sendWorkComplete() |
| 109 | return |
| 110 | job.sendWorkFail() |
| 111 | |
| 112 | def work(self): |
| 113 | while self.running: |
| 114 | try: |
| 115 | job = self.getJob() |
| 116 | except gear.InterruptedError: |
| 117 | continue |
| 118 | try: |
| 119 | self.handleJob(job) |
| 120 | except: |
| 121 | self.log.exception("Worker exception:") |
| 122 | |
| 123 | def addFailTest(self, name, change): |
| 124 | l = self.fail_tests.get(name, []) |
| 125 | l.append(change) |
| 126 | self.fail_tests[name] = l |
| 127 | |
| 128 | def shouldFailTest(self, name, ref): |
| 129 | l = self.fail_tests.get(name, []) |
| 130 | for change in l: |
| 131 | if self.test.ref_has_change(ref, change): |
| 132 | return True |
| 133 | return False |
| 134 | |
| 135 | def release(self, regex=None): |
| 136 | builds = self.running_builds[:] |
| 137 | self.log.debug("releasing build %s (%s)" % (regex, |
| 138 | len(self.running_builds))) |
| 139 | for build in builds: |
| 140 | if not regex or re.match(regex, build.name): |
| 141 | self.log.debug("releasing build %s" % |
| 142 | (build.parameters['ZUUL_UUID'])) |
| 143 | build.release() |
| 144 | else: |
| 145 | self.log.debug("not releasing build %s" % |
| 146 | (build.parameters['ZUUL_UUID'])) |
| 147 | self.log.debug("done releasing builds %s (%s)" % |
| 148 | (regex, len(self.running_builds))) |
| 149 | |
| 150 | |
| 151 | class FakeBuild(threading.Thread): |
| 152 | def __init__(self, worker, job, number, node): |
| 153 | threading.Thread.__init__(self) |
| 154 | self.daemon = True |
| 155 | self.worker = worker |
| 156 | self.job = job |
| 157 | self.name = job.name.split(':')[1] |
| 158 | self.number = number |
| 159 | self.node = node |
| 160 | self.parameters = json.loads(job.arguments) |
| 161 | self.unique = self.parameters['ZUUL_UUID'] |
| 162 | self.wait_condition = threading.Condition() |
| 163 | self.waiting = False |
| 164 | self.aborted = False |
| 165 | self.created = time.time() |
| 166 | self.description = '' |
| 167 | |
| 168 | def release(self): |
| 169 | self.wait_condition.acquire() |
| 170 | self.wait_condition.notify() |
| 171 | self.waiting = False |
| 172 | self.log.debug("Build %s released" % self.unique) |
| 173 | self.wait_condition.release() |
| 174 | |
| 175 | def isWaiting(self): |
| 176 | self.wait_condition.acquire() |
| 177 | if self.waiting: |
| 178 | ret = True |
| 179 | else: |
| 180 | ret = False |
| 181 | self.wait_condition.release() |
| 182 | return ret |
| 183 | |
| 184 | def _wait(self): |
| 185 | self.wait_condition.acquire() |
| 186 | self.waiting = True |
| 187 | self.log.debug("Build %s waiting" % self.unique) |
| 188 | self.wait_condition.wait() |
| 189 | self.wait_condition.release() |
| 190 | |
| 191 | def run(self): |
| 192 | data = { |
| 193 | 'url': 'https://server/job/%s/%s/' % (self.name, self.number), |
| 194 | 'name': self.name, |
| 195 | 'number': self.number, |
| 196 | 'manager': self.worker.worker_id, |
| 197 | } |
| 198 | |
| 199 | self.job.sendWorkData(json.dumps(data)) |
| 200 | self.job.sendWorkStatus(0, 100) |
| 201 | |
| 202 | if self.worker.hold_jobs_in_build: |
| 203 | self._wait() |
| 204 | self.log.debug("Build %s continuing" % self.unique) |
| 205 | |
| 206 | self.worker.lock.acquire() |
| 207 | |
| 208 | result = 'SUCCESS' |
| 209 | if (('ZUUL_REF' in self.parameters) and |
| 210 | self.worker.shouldFailTest(self.name, |
| 211 | self.parameters['ZUUL_REF'])): |
| 212 | result = 'FAILURE' |
| 213 | if self.aborted: |
| 214 | result = 'ABORTED' |
| 215 | |
| 216 | data = {'result': result} |
| 217 | changes = None |
| 218 | if 'ZUUL_CHANGE_IDS' in self.parameters: |
| 219 | changes = self.parameters['ZUUL_CHANGE_IDS'] |
| 220 | |
| 221 | self.worker.build_history.append( |
| 222 | BuildHistory(name=self.name, number=self.number, |
| 223 | result=result, changes=changes, node=self.node, |
| 224 | uuid=self.unique, description=self.description, |
| 225 | pipeline=self.parameters['ZUUL_PIPELINE']) |
| 226 | ) |
| 227 | |
| 228 | self.job.sendWorkComplete(json.dumps(data)) |
| 229 | del self.worker.gearman_jobs[self.job.unique] |
| 230 | self.worker.running_builds.remove(self) |
| 231 | self.worker.lock.release() |
| 232 | |
| 233 | |
| 234 | class FakeGearmanServer(gear.Server): |
| 235 | def __init__(self, port=4730): |
| 236 | self.hold_jobs_in_queue = False |
| 237 | super(FakeGearmanServer, self).__init__(port) |
| 238 | |
| 239 | def getJobForConnection(self, connection, peek=False): |
| 240 | for queue in [self.high_queue, self.normal_queue, self.low_queue]: |
| 241 | for job in queue: |
| 242 | if not hasattr(job, 'waiting'): |
| 243 | if job.name.startswith('build:'): |
| 244 | job.waiting = self.hold_jobs_in_queue |
| 245 | else: |
| 246 | job.waiting = False |
| 247 | if job.waiting: |
| 248 | continue |
| 249 | if job.name in connection.functions: |
| 250 | if not peek: |
| 251 | queue.remove(job) |
| 252 | connection.related_jobs[job.handle] = job |
| 253 | job.worker_connection = connection |
| 254 | job.running = True |
| 255 | return job |
| 256 | return None |
| 257 | |
| 258 | def release(self, regex=None): |
| 259 | released = False |
| 260 | qlen = (len(self.high_queue) + len(self.normal_queue) + |
| 261 | len(self.low_queue)) |
| 262 | self.log.debug("releasing queued job %s (%s)" % (regex, qlen)) |
| 263 | for job in self.getQueue(): |
| 264 | cmd, name = job.name.split(':') |
| 265 | if cmd != 'build': |
| 266 | continue |
| 267 | if not regex or re.match(regex, name): |
| 268 | self.log.debug("releasing queued job %s" % |
| 269 | job.unique) |
| 270 | job.waiting = False |
| 271 | released = True |
| 272 | else: |
| 273 | self.log.debug("not releasing queued job %s" % |
| 274 | job.unique) |
| 275 | if released: |
| 276 | self.wakeConnections() |
| 277 | qlen = (len(self.high_queue) + len(self.normal_queue) + |
| 278 | len(self.low_queue)) |
| 279 | self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen)) |
| 280 | |