blob: 56bebd553513ce73a8514aa44ef31b7d93ea243c [file] [log] [blame]
Joshua Heskethe76a0dd2014-01-16 17:57:45 +11001# Copyright 2013 Rackspace Australia
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15
16import copy
17import json
18import logging
19import os
Jan Kundrát834733b2015-06-12 01:50:40 +020020import tempfile
Jan Kundrát2221d482014-11-28 01:54:34 +010021import pkg_resources
22import socket
Joshua Hesketh62721fb2014-12-02 13:22:10 +110023import uuid
Joshua Heskethe76a0dd2014-01-16 17:57:45 +110024
Joshua Hesketh62245542014-01-16 18:00:56 +110025from turbo_hipster.lib import common
Joshua Heskethe76a0dd2014-01-16 17:57:45 +110026from turbo_hipster.lib import utils
27
28
class Task(object):
    """Base object for running a job (aka Task) handed to us by gearman.

    The life cycle of one job is driven by :meth:`start_job`: decode the job
    arguments, prepare a working directory, attach a per-job log file, run
    ``do_job_steps()``, then clean up, upload the results and report the
    outcome on the gearman job object.

    ``do_job_steps()`` is not defined here; subclasses must provide it.
    """
    log = logging.getLogger("task")

    def __init__(self, worker_server, job_name, job_config):
        """Remember the worker, job name and job config; reset job state.

        :param worker_server: object exposing a ``config`` mapping and
            ``shutdown_gracefully()``.
        :param job_name: the name this job is registered under.
        :param job_config: the configuration mapping for this job.
        """
        # TODO(jhesketh): remove the need for worker_server here
        self.worker_server = worker_server
        # NOTE(jhesketh): job_config may be in the old format where name
        # refers to the plugin and function is the job name. Thus these should
        # never be used in a job, instead use the provided job_name.
        self.job_config = job_config
        self.job_name = job_name
        self._reset()

        # Define the number of steps we will do to determine our progress.
        self.total_steps = 0

    def _cleanup(self):
        """Detach the per-job log file and honour a 'shutdown-th' request."""
        if self.log_handler:
            self.log.removeHandler(self.log_handler)
            self.log_handler.flush()
            self.log_handler.close()
        if ('shutdown-th' in self.job_config and
                self.job_config['shutdown-th']):
            self.worker_server.shutdown_gracefully()

    def _reset(self):
        """Return all per-job state to its initial values."""
        self.job = None
        self.job_arguments = None
        self.work_data = None
        self.cancelled = False
        self.success = True
        self.messages = []
        self.current_step = 0
        self.log_handler = None
        # Last 12 characters of a fresh uuid4; unique per run of this task.
        self.th_uuid = str(uuid.uuid4())[-12:]

    def _prep_working_dir(self):
        """Compute the working/results paths and create the results dir."""
        # Use the th_uuid so that if the same job is somehow taken twice from
        # zuul we won't re-use zuul's uuid. This shouldn't happen but if it
        # does it prevents overwriting previous results
        self.job_working_dir = os.path.join(
            self.worker_server.config['jobs_working_dir'],
            self.th_uuid,
            self.job_arguments['LOG_PATH']
        )
        self.job_results_dir = os.path.join(
            self.job_working_dir,
            'results'
        )
        self.task_output_log = os.path.join(
            self.job_results_dir,
            'task_output.log'
        )

        output_dir = os.path.dirname(self.task_output_log)
        if not os.path.isdir(output_dir):
            os.makedirs(output_dir)

    def _setup_task_logging(self):
        """Attach a DEBUG-level file handler writing to task_output.log."""
        self.log_handler = logging.FileHandler(self.task_output_log)
        log_formatter = logging.Formatter('%(asctime)s %(message)s')
        self.log_handler.setFormatter(log_formatter)
        self.log.addHandler(self.log_handler)
        self.log.setLevel(logging.DEBUG)

    def start_job(self, job):
        """Run ``job`` from setup through to the final gearman packets.

        :param job: the gearman job object, or None (in which case the task
            state is reset and nothing else happens).

        Exceptions raised by the individual phases are caught, recorded in
        ``self.messages`` and reported to gearman rather than propagated.
        """
        self._reset()
        self.job = job

        if self.job is None:
            return

        try:
            self.job_arguments = json.loads(
                self.job.arguments.decode('utf-8'))
            self.log.debug("Got job from ZUUL %s", self.job_arguments)

            # Send an initial WORK_DATA packet
            self._send_work_data()

            # Prep working dirs
            self._prep_working_dir()

            # Now we have working dirs we can log the job details to a file
            self._setup_task_logging()
        except Exception as e:
            # If something failed during this section we have been unable
            # to log to file. As such raise an exception to gearman
            self.log.exception("Failure during setup")
            if not self.cancelled:
                self.success = False
                self.messages.append('FAILURE during the job setup')
                self.messages.append('Exception: %s' % e)
                self._send_work_data()
                self.job.sendWorkException(str(e).encode('utf-8'))
            # No point trying the job without a working dir: report the
            # outcome and stop here.
            self._send_final_results()
            return

        # From here we can log exceptions
        try:
            # Execute the job_steps
            self.do_job_steps()
        except Exception as e:
            # Log the problem
            if not self.cancelled:
                self.success = False
                self.log.exception('Something failed running the job!')
                self.messages.append('FAILURE running the job')
                self.messages.append('Exception: %s' % e)
            # Don't return from here as we can continue uploading the logs

        try:
            self._cleanup()
            self._upload_results()
        except Exception as e:
            # If something failed during this section we have been unable
            # to upload the log. As such raise an exception to gearman
            self.log.exception("Failure during cleanup and upload")
            if not self.cancelled:
                self.success = False
                self.messages.append('FAILURE during cleanup and log '
                                     'upload')
                self.messages.append('Exception: %s' % e)
                self._send_work_data()
                self.job.sendWorkException(str(e).encode('utf-8'))
        finally:
            # Finally, send updated work data and completed packets
            self._send_final_results()

    def stop_working(self, number=None):
        """Flag the current job as cancelled.

        :param number: unique id of the job to cancel; None cancels whatever
            this task is doing. A non-None number that does not match the
            current job (or arrives while no job is loaded) is ignored.
        """
        # Check the number is for this job instance (None will cancel all)
        # (makes it possible to run multiple workers with this task
        # on this server)
        matches_current_job = (self.job is not None and
                               number == self.job.unique)
        if number is None or matches_current_job:
            self.log.debug("We've been asked to stop by our gearman manager")
            self.cancelled = True
            # TODO: Work out how to kill current step

    def _get_work_data(self):
        """Return the work data dict, building and caching it on first use."""
        if self.work_data is None:
            hostname = os.uname()[1]
            fqdn = socket.getfqdn()
            self.work_data = dict(
                name=self.job_name,
                number=self.job.unique,
                manager='turbo-hipster-manager-%s' % hostname,
                url='http://localhost',
                worker_hostname=hostname,
                worker_fqdn=fqdn,
                worker_program='turbo-hipster',
            )
            try:
                self.work_data['worker_version'] = (
                    pkg_resources.get_distribution('turbo_hipster').version
                )
            except pkg_resources.DistributionNotFound:
                # Package isn't installed; I do not think that manually
                # attempting to extract version in some ad-hoc manner would be
                # worth it -> just ignore this.
                pass
        return self.work_data

    def _send_work_data(self):
        """ Send the WORK DATA in json format for job """
        work_data = self._get_work_data()
        work_data['result'] = 'SUCCESS' if self.success else 'FAILURE'
        # Serialise once so the log line shows exactly what is sent.
        payload = json.dumps(work_data)
        self.log.debug("Send the work data response: %s", payload)
        self.job.sendWorkData(payload)

    def _send_final_results(self):
        """Send the last work data, then WORK_COMPLETE or WORK_FAIL."""
        self._send_work_data()

        if self.success:
            self.job.sendWorkComplete(
                json.dumps(self._get_work_data()))
        else:
            self.job.sendWorkFail()

    def _do_next_step(self):
        """ Send a WORK_STATUS command to the gearman server.
        This can provide a progress bar.

        :raises Exception: if the job has been cancelled.
        """

        # Each opportunity we should check if we need to stop
        if self.cancelled:
            # Go through the accessor: work_data is None until first built.
            self._get_work_data()['result'] = "Failed: Job cancelled"
            self.job.sendWorkStatus(self.current_step, self.total_steps)
            self.job.sendWorkFail()
            raise Exception('Job cancelled')

        self.current_step += 1
        self.job.sendWorkStatus(self.current_step, self.total_steps)

    def _upload_results(self):
        """Upload the contents of the working dir either using the instructions
        provided by zuul and/or our configuration"""

        self.log.debug("Process the resulting files (upload/push)")

        path_list = [os.path.join(self.job_results_dir, entry)
                     for entry in os.listdir(self.job_results_dir)]

        if 'publish_logs' in self.worker_server.config:
            index_url = utils.push_files(
                self.job_arguments['LOG_PATH'], path_list,
                self.worker_server.config['publish_logs'])
            self.log.debug("Index URL found at %s", index_url)
            self.work_data['url'] = index_url

        if 'ZUUL_EXTRA_SWIFT_URL' in self.job_arguments:
            # Upload to zuul's url as instructed; this takes precedence over
            # any url set by publish_logs above.
            utils.zuul_swift_upload(self.job_working_dir, self.job_arguments)
            self.work_data['url'] = self.job_arguments['LOG_PATH']
Joshua Hesketh91778762014-01-16 18:24:46 +1100248
class ShellTask(Task):
    """A Task that checks out a patchset from git and runs a shell script.

    The script is taken from ``job_config['shell_script']`` and is given the
    git checkout path, the job working dir and the job's unique id as
    arguments. A non-zero return code marks the job as failed.
    """
    log = logging.getLogger("task.shell_task")

    def __init__(self, worker_server, job_name, job_config):
        super(ShellTask, self).__init__(worker_server, job_name, job_config)
        # Define the number of steps we will do to determine our progress.
        # do_job_steps() runs six @common.task_step methods.
        # NOTE(review): assumes task_step advances current_step once per
        # decorated call - confirm in turbo_hipster.lib.common.
        self.total_steps = 6

    def _reset(self):
        """Reset the base state plus the shell-task specific paths."""
        super(ShellTask, self)._reset()
        self.git_path = None
        self.job_working_dir = None
        self.shell_output_log = None
        self.git_prep_log = None
        # (None, path) once _setup_environment has run; see there.
        self.output_summary = None

    def do_job_steps(self):
        """Run the six steps of a shell job in order.

        The temporary result file is removed even when a step raises.
        """
        try:
            self.log.info('Step 1: Setup environment')
            self._setup_environment()

            self.log.info('Step 2: Checkout updates from git')
            self._grab_patchset(self.job_arguments)

            self.log.info('Step 3: Run shell script')
            self._execute_script()

            self.log.info('Step 4: Analyse logs for errors')
            self._parse_and_check_results()

            self.log.info('Step 5: Handle the results')
            self._handle_results()

            self.log.info('Step 6: Handle extra actions such as shutting down')
            self._handle_cleanup()
        finally:
            # A failed or cancelled step must not leave the temp file behind.
            self._remove_output_summary()

    def _remove_output_summary(self):
        """Delete the temporary result file; a missing file is not an error."""
        if self.output_summary is None:
            return
        try:
            os.remove(self.output_summary[1])
        except OSError:
            pass

    @common.task_step
    def _setup_environment(self):
        """Work out the log file paths and create the script's result file."""
        self.git_prep_log = os.path.join(
            self.job_results_dir,
            'git_prep.log'
        )
        self.shell_output_log = os.path.join(
            self.job_results_dir,
            'shell_output.log'
        )
        # Only the path is needed (it is handed to the script through
        # TH_RESULT_FILE), so close the descriptor straight away rather than
        # leaking it when the script succeeds. The 2-tuple shape is kept so
        # that output_summary[1] stays the path.
        summary_fd, summary_path = tempfile.mkstemp()
        os.close(summary_fd)
        self.output_summary = (None, summary_path)
        self.log.info('Working on node %s' % (os.uname()[1]))

    @common.task_step
    def _grab_patchset(self, job_args):
        """ Checkout the reference into config['git_working_dir']

        :param job_args: mapping of job arguments; must contain
            'ZUUL_PROJECT'. All entries are exported to the git prep script.
        :returns: the local checkout path (also stored in self.git_path).
        :raises Exception: if the patchset cannot be fetched after 4 tries.
        """

        # TODO(jhesketh): Use the zuul cloner stuff instead :-)

        self.log.debug("Grab the patchset we want to test against")
        local_path = os.path.join(self.worker_server.config['git_working_dir'],
                                  self.th_uuid, job_args['ZUUL_PROJECT'])
        if not os.path.exists(local_path):
            os.makedirs(local_path)

        # Work on a copy: updating os.environ itself would leak this job's
        # arguments and the git trace flags into the worker process and
        # therefore into every later job.
        env = os.environ.copy()
        env.update(job_args)
        env.update({'GIT_CURL_VERBOSE': '1', 'GIT_TRACE': '1'})

        zuul_config = self.worker_server.config['zuul_server']
        cmd = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                           'gerrit-git-prep.sh')
        cmd += ' ' + zuul_config['gerrit_site']
        cmd += ' ' + zuul_config['git_origin']

        # NOTE(jhesketh): The most common problem is the git remote timing out
        # Retry cloning multiple times before raising a failure.
        return_code = 1
        for attempt in range(1, 5):
            return_code = utils.execute_to_log(cmd, self.git_prep_log,
                                               env=env, cwd=local_path)
            if return_code == 0:
                break
            if attempt == 2:
                # Try upping the post buffer. See:
                # http://stackoverflow.com/questions/6842687/
                # the-remote-end-hung-up-unexpectedly-while-git-cloning
                utils.execute_to_log(
                    "git config --global http.postBuffer 1048576000",
                    self.git_prep_log, env=env, cwd=local_path)

        if return_code != 0:
            # Record the network configuration next to the git output.
            utils.execute_to_log('ifconfig', self.git_prep_log)
            raise Exception("Failed to fetch patchset")

        self.git_path = local_path
        return local_path

    @common.task_step
    def _execute_script(self):
        """Run the configured shell script and store its return code."""
        cmd = self.job_config['shell_script']
        cmd += (
            (' %(git_path)s %(job_working_dir)s %(unique_id)s')
            % {
                'git_path': self.git_path,
                'job_working_dir': self.job_working_dir,
                'unique_id': self.job.unique
            }
        )

        # os.environ.copy() gives a plain dict, so nothing set here touches
        # the worker's own environment.
        env_args = os.environ.copy()
        env_args.update(self.job_arguments)
        job_name = self.job.name
        if job_name.startswith('build:'):
            job_name = job_name[len('build:'):]
        env_args['TH_JOB_NAME'] = job_name
        env_args['TH_RESULT_FILE'] = self.output_summary[1]

        self.script_return_code = utils.execute_to_log(
            cmd,
            self.shell_output_log,
            env=env_args
        )

    @common.task_step
    def _parse_and_check_results(self):
        """Mark the job failed when the script returned non-zero.

        The first line of the result file is added to the messages unless it
        is empty or starts with 'SUCCESS'.
        """
        if self.script_return_code > 0:
            self.success = False
            try:
                with open(self.output_summary[1]) as fp:
                    line = fp.readline().strip()
            except (IOError, OSError):
                # The script may have removed the file; treat as no summary.
                line = ''
            if line and not line.startswith('SUCCESS'):
                self.messages.append(line)
            self.messages.append('Return code from test script was non-zero '
                                 '(%d)' % self.script_return_code)

    @common.task_step
    def _handle_cleanup(self):
        """Handle and cleanup functions. Shutdown if requested to so that no
        further jobs are ran if the environment is dirty."""

        self._remove_output_summary()

    @common.task_step
    def _handle_results(self):
        """Hook for subclasses; the base shell task has nothing to do."""
        pass