blob: 5a2f972e702df6b3ceb9c7a1c6efa357a1d63668 [file] [log] [blame]
Joshua Hesketh66f9f602013-08-14 11:28:10 +10001#!/usr/bin/python2
2#
3# Copyright 2013 Rackspace Australia
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
17import gear
18import threading
19import os
20
21from turbo_hipster.worker_manager import GearmanManager
22
23
24class FakeGearmanManager(GearmanManager):
25 def __init__(self, config, tasks, test):
26 self.test = test
27 super(FakeGearmanManager, self).__init__(config, tasks)
28
29 def setup_gearman(self):
30 hostname = os.uname()[1]
31 self.gearman_worker = FakeWorker('turbo-hipster-manager-%s'
32 % hostname, self.test)
33 self.gearman_worker.addServer(
34 self.config['zuul_server']['gearman_host'],
35 self.config['zuul_server']['gearman_port']
36 )
37 self.gearman_worker.registerFunction(
38 'stop:turbo-hipster-manager-%s' % hostname)
39
40
41class FakeWorker(gear.Worker):
42 def __init__(self, worker_id, test):
43 super(FakeWorker, self).__init__(worker_id)
44 self.gearman_jobs = {}
45 self.build_history = []
46 self.running_builds = []
47 self.build_counter = 0
48 self.fail_tests = {}
49 self.test = test
50
51 self.hold_jobs_in_build = False
52 self.lock = threading.Lock()
53 self.__work_thread = threading.Thread(target=self.work)
54 self.__work_thread.daemon = True
55 self.__work_thread.start()
56
57 def handleJob(self, job):
58 parts = job.name.split(":")
59 cmd = parts[0]
60 name = parts[1]
61 if len(parts) > 2:
62 node = parts[2]
63 else:
64 node = None
65 if cmd == 'build':
66 self.handleBuild(job, name, node)
67 elif cmd == 'stop':
68 self.handleStop(job, name)
69 elif cmd == 'set_description':
70 self.handleSetDescription(job, name)
71
72 def handleBuild(self, job, name, node):
73 build = FakeBuild(self, job, self.build_counter, node)
74 job.build = build
75 self.gearman_jobs[job.unique] = job
76 self.build_counter += 1
77
78 self.running_builds.append(build)
79 build.start()
80
81 def handleStop(self, job, name):
82 self.log.debug("handle stop")
83 parameters = json.loads(job.arguments)
84 name = parameters['name']
85 number = parameters['number']
86 for build in self.running_builds:
87 if build.name == name and build.number == number:
88 build.aborted = True
89 build.release()
90 job.sendWorkComplete()
91 return
92 job.sendWorkFail()
93
94 def handleSetDescription(self, job, name):
95 self.log.debug("handle set description")
96 parameters = json.loads(job.arguments)
97 name = parameters['name']
98 number = parameters['number']
99 descr = parameters['html_description']
100 for build in self.running_builds:
101 if build.name == name and build.number == number:
102 build.description = descr
103 job.sendWorkComplete()
104 return
105 for build in self.build_history:
106 if build.name == name and build.number == number:
107 build.description = descr
108 job.sendWorkComplete()
109 return
110 job.sendWorkFail()
111
112 def work(self):
113 while self.running:
114 try:
115 job = self.getJob()
116 except gear.InterruptedError:
117 continue
118 try:
119 self.handleJob(job)
120 except:
121 self.log.exception("Worker exception:")
122
123 def addFailTest(self, name, change):
124 l = self.fail_tests.get(name, [])
125 l.append(change)
126 self.fail_tests[name] = l
127
128 def shouldFailTest(self, name, ref):
129 l = self.fail_tests.get(name, [])
130 for change in l:
131 if self.test.ref_has_change(ref, change):
132 return True
133 return False
134
135 def release(self, regex=None):
136 builds = self.running_builds[:]
137 self.log.debug("releasing build %s (%s)" % (regex,
138 len(self.running_builds)))
139 for build in builds:
140 if not regex or re.match(regex, build.name):
141 self.log.debug("releasing build %s" %
142 (build.parameters['ZUUL_UUID']))
143 build.release()
144 else:
145 self.log.debug("not releasing build %s" %
146 (build.parameters['ZUUL_UUID']))
147 self.log.debug("done releasing builds %s (%s)" %
148 (regex, len(self.running_builds)))
149
150
151class FakeBuild(threading.Thread):
152 def __init__(self, worker, job, number, node):
153 threading.Thread.__init__(self)
154 self.daemon = True
155 self.worker = worker
156 self.job = job
157 self.name = job.name.split(':')[1]
158 self.number = number
159 self.node = node
160 self.parameters = json.loads(job.arguments)
161 self.unique = self.parameters['ZUUL_UUID']
162 self.wait_condition = threading.Condition()
163 self.waiting = False
164 self.aborted = False
165 self.created = time.time()
166 self.description = ''
167
168 def release(self):
169 self.wait_condition.acquire()
170 self.wait_condition.notify()
171 self.waiting = False
172 self.log.debug("Build %s released" % self.unique)
173 self.wait_condition.release()
174
175 def isWaiting(self):
176 self.wait_condition.acquire()
177 if self.waiting:
178 ret = True
179 else:
180 ret = False
181 self.wait_condition.release()
182 return ret
183
184 def _wait(self):
185 self.wait_condition.acquire()
186 self.waiting = True
187 self.log.debug("Build %s waiting" % self.unique)
188 self.wait_condition.wait()
189 self.wait_condition.release()
190
191 def run(self):
192 data = {
193 'url': 'https://server/job/%s/%s/' % (self.name, self.number),
194 'name': self.name,
195 'number': self.number,
196 'manager': self.worker.worker_id,
197 }
198
199 self.job.sendWorkData(json.dumps(data))
200 self.job.sendWorkStatus(0, 100)
201
202 if self.worker.hold_jobs_in_build:
203 self._wait()
204 self.log.debug("Build %s continuing" % self.unique)
205
206 self.worker.lock.acquire()
207
208 result = 'SUCCESS'
209 if (('ZUUL_REF' in self.parameters) and
210 self.worker.shouldFailTest(self.name,
211 self.parameters['ZUUL_REF'])):
212 result = 'FAILURE'
213 if self.aborted:
214 result = 'ABORTED'
215
216 data = {'result': result}
217 changes = None
218 if 'ZUUL_CHANGE_IDS' in self.parameters:
219 changes = self.parameters['ZUUL_CHANGE_IDS']
220
221 self.worker.build_history.append(
222 BuildHistory(name=self.name, number=self.number,
223 result=result, changes=changes, node=self.node,
224 uuid=self.unique, description=self.description,
225 pipeline=self.parameters['ZUUL_PIPELINE'])
226 )
227
228 self.job.sendWorkComplete(json.dumps(data))
229 del self.worker.gearman_jobs[self.job.unique]
230 self.worker.running_builds.remove(self)
231 self.worker.lock.release()
232
233
234class FakeGearmanServer(gear.Server):
235 def __init__(self, port=4730):
236 self.hold_jobs_in_queue = False
237 super(FakeGearmanServer, self).__init__(port)
238
239 def getJobForConnection(self, connection, peek=False):
240 for queue in [self.high_queue, self.normal_queue, self.low_queue]:
241 for job in queue:
242 if not hasattr(job, 'waiting'):
243 if job.name.startswith('build:'):
244 job.waiting = self.hold_jobs_in_queue
245 else:
246 job.waiting = False
247 if job.waiting:
248 continue
249 if job.name in connection.functions:
250 if not peek:
251 queue.remove(job)
252 connection.related_jobs[job.handle] = job
253 job.worker_connection = connection
254 job.running = True
255 return job
256 return None
257
258 def release(self, regex=None):
259 released = False
260 qlen = (len(self.high_queue) + len(self.normal_queue) +
261 len(self.low_queue))
262 self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
263 for job in self.getQueue():
264 cmd, name = job.name.split(':')
265 if cmd != 'build':
266 continue
267 if not regex or re.match(regex, name):
268 self.log.debug("releasing queued job %s" %
269 job.unique)
270 job.waiting = False
271 released = True
272 else:
273 self.log.debug("not releasing queued job %s" %
274 job.unique)
275 if released:
276 self.wakeConnections()
277 qlen = (len(self.high_queue) + len(self.normal_queue) +
278 len(self.low_queue))
279 self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
280