blob: e354d5dd1845737e5170e9a26333294b9fafddea [file] [log] [blame]
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import threading
from uuid import uuid4
import gear
import zuul.model
from zuul.lib.config import get_default
def getJobData(job):
if not len(job.data):
return {}
d = job.data[-1]
if not d:
return {}
return json.loads(d)
class MergeGearmanClient(gear.Client):
def __init__(self, merge_client):
super(MergeGearmanClient, self).__init__('Zuul Merge Client')
self.__merge_client = merge_client
def handleWorkComplete(self, packet):
job = super(MergeGearmanClient, self).handleWorkComplete(packet)
self.__merge_client.onBuildCompleted(job)
return job
def handleWorkFail(self, packet):
job = super(MergeGearmanClient, self).handleWorkFail(packet)
self.__merge_client.onBuildCompleted(job)
return job
def handleWorkException(self, packet):
job = super(MergeGearmanClient, self).handleWorkException(packet)
self.__merge_client.onBuildCompleted(job)
return job
def handleDisconnect(self, job):
job = super(MergeGearmanClient, self).handleDisconnect(job)
self.__merge_client.onBuildCompleted(job)
class MergeJob(gear.TextJob):
def __init__(self, *args, **kw):
super(MergeJob, self).__init__(*args, **kw)
self.__event = threading.Event()
def setComplete(self):
self.__event.set()
def wait(self, timeout=300):
return self.__event.wait(timeout)
class MergeClient(object):
log = logging.getLogger("zuul.MergeClient")
def __init__(self, config, sched):
self.config = config
self.sched = sched
server = self.config.get('gearman', 'server')
port = get_default(self.config, 'gearman', 'port', 4730)
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
self.gearman = MergeGearmanClient(self)
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.log.debug("Waiting for gearman")
self.gearman.waitForServer()
self.jobs = set()
def stop(self):
self.gearman.shutdown()
def areMergesOutstanding(self):
if self.jobs:
return True
return False
def submitJob(self, name, data, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL):
uuid = str(uuid4().hex)
job = MergeJob(name,
json.dumps(data),
unique=uuid)
job.build_set = build_set
self.log.debug("Submitting job %s with data %s" % (job, data))
self.jobs.add(job)
self.gearman.submitJob(job, precedence=precedence,
timeout=300)
return job
def mergeChanges(self, items, build_set, files=None, dirs=None,
repo_state=None, precedence=zuul.model.PRECEDENCE_NORMAL):
data = dict(items=items,
files=files,
dirs=dirs,
repo_state=repo_state)
self.submitJob('merger:merge', data, build_set, precedence)
def getFiles(self, connection_name, project_name, branch, files, dirs=[],
precedence=zuul.model.PRECEDENCE_HIGH):
data = dict(connection=connection_name,
project=project_name,
branch=branch,
files=files,
dirs=dirs)
job = self.submitJob('merger:cat', data, None, precedence)
return job
def onBuildCompleted(self, job):
data = getJobData(job)
zuul_url = data.get('zuul_url')
merged = data.get('merged', False)
updated = data.get('updated', False)
commit = data.get('commit')
files = data.get('files', {})
repo_state = data.get('repo_state', {})
job.files = files
self.log.info("Merge %s complete, merged: %s, updated: %s, "
"commit: %s" %
(job, merged, updated, commit))
job.setComplete()
if job.build_set:
self.sched.onMergeCompleted(job.build_set, zuul_url,
merged, updated, commit, files,
repo_state)
# The test suite expects the job to be removed from the
# internal account after the wake flag is set.
self.jobs.remove(job)