| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| import logging |
| |
| from zuul import exceptions |
| |
| |
| class DynamicChangeQueueContextManager(object): |
| def __init__(self, change_queue): |
| self.change_queue = change_queue |
| |
| def __enter__(self): |
| return self.change_queue |
| |
| def __exit__(self, etype, value, tb): |
| if self.change_queue and not self.change_queue.queue: |
| self.change_queue.pipeline.removeQueue(self.change_queue) |
| |
| |
| class StaticChangeQueueContextManager(object): |
| def __init__(self, change_queue): |
| self.change_queue = change_queue |
| |
| def __enter__(self): |
| return self.change_queue |
| |
| def __exit__(self, etype, value, tb): |
| pass |
| |
| |
| class PipelineManager(object): |
| """Abstract Base Class for enqueing and processing Changes in a Pipeline""" |
| |
| log = logging.getLogger("zuul.PipelineManager") |
| |
| def __init__(self, sched, pipeline): |
| self.sched = sched |
| self.pipeline = pipeline |
| self.event_filters = [] |
| self.ref_filters = [] |
| |
| def __str__(self): |
| return "<%s %s>" % (self.__class__.__name__, self.pipeline.name) |
| |
| def _postConfig(self, layout): |
| self.log.info("Configured Pipeline Manager %s" % self.pipeline.name) |
| self.log.info(" Requirements:") |
| for f in self.ref_filters: |
| self.log.info(" %s" % f) |
| self.log.info(" Events:") |
| for e in self.event_filters: |
| self.log.info(" %s" % e) |
| self.log.info(" Projects:") |
| |
| def log_jobs(job_list): |
| for job_name, job_variants in job_list.jobs.items(): |
| for variant in job_variants: |
| # TODOv3(jeblair): represent matchers |
| efilters = '' |
| # for b in tree.job._branches: |
| # efilters += str(b) |
| # for f in tree.job._files: |
| # efilters += str(f) |
| # if tree.job.skip_if_matcher: |
| # efilters += str(tree.job.skip_if_matcher) |
| # if efilters: |
| # efilters = ' ' + efilters |
| tags = [] |
| if variant.hold_following_changes: |
| tags.append('[hold]') |
| if not variant.voting: |
| tags.append('[nonvoting]') |
| if variant.semaphore: |
| tags.append('[semaphore: %s]' % variant.semaphore) |
| tags = ' '.join(tags) |
| self.log.info(" %s%s %s" % (repr(variant), |
| efilters, tags)) |
| |
| for project_name in layout.project_configs.keys(): |
| project_config = layout.project_configs.get(project_name) |
| if project_config: |
| project_pipeline_config = project_config.pipelines.get( |
| self.pipeline.name) |
| if project_pipeline_config: |
| self.log.info(" %s" % project_name) |
| log_jobs(project_pipeline_config.job_list) |
| self.log.info(" On start:") |
| self.log.info(" %s" % self.pipeline.start_actions) |
| self.log.info(" On success:") |
| self.log.info(" %s" % self.pipeline.success_actions) |
| self.log.info(" On failure:") |
| self.log.info(" %s" % self.pipeline.failure_actions) |
| self.log.info(" On merge-failure:") |
| self.log.info(" %s" % self.pipeline.merge_failure_actions) |
| self.log.info(" When disabled:") |
| self.log.info(" %s" % self.pipeline.disabled_actions) |
| |
| def getSubmitAllowNeeds(self): |
| # Get a list of code review labels that are allowed to be |
| # "needed" in the submit records for a change, with respect |
| # to this queue. In other words, the list of review labels |
| # this queue itself is likely to set before submitting. |
| allow_needs = set() |
| for action_reporter in self.pipeline.success_actions: |
| allow_needs.update(action_reporter.getSubmitAllowNeeds()) |
| return allow_needs |
| |
| def eventMatches(self, event, change): |
| if event.forced_pipeline: |
| if event.forced_pipeline == self.pipeline.name: |
| self.log.debug("Event %s for change %s was directly assigned " |
| "to pipeline %s" % (event, change, self)) |
| return True |
| else: |
| return False |
| for ef in self.event_filters: |
| if ef.matches(event, change): |
| self.log.debug("Event %s for change %s matched %s " |
| "in pipeline %s" % (event, change, ef, self)) |
| return True |
| return False |
| |
| def isChangeAlreadyInPipeline(self, change): |
| # Checks live items in the pipeline |
| for item in self.pipeline.getAllItems(): |
| if item.live and change.equals(item.change): |
| return True |
| return False |
| |
| def isChangeAlreadyInQueue(self, change, change_queue): |
| # Checks any item in the specified change queue |
| for item in change_queue.queue: |
| if change.equals(item.change): |
| return True |
| return False |
| |
| def reportStart(self, item): |
| if not self.pipeline._disabled: |
| try: |
| self.log.info("Reporting start, action %s item %s" % |
| (self.pipeline.start_actions, item)) |
| ret = self.sendReport(self.pipeline.start_actions, item) |
| if ret: |
| self.log.error("Reporting item start %s received: %s" % |
| (item, ret)) |
| except: |
| self.log.exception("Exception while reporting start:") |
| |
| def sendReport(self, action_reporters, item, message=None): |
| """Sends the built message off to configured reporters. |
| |
| Takes the action_reporters, item, message and extra options and |
| sends them to the pluggable reporters. |
| """ |
| report_errors = [] |
| if len(action_reporters) > 0: |
| for reporter in action_reporters: |
| ret = reporter.report(item) |
| if ret: |
| report_errors.append(ret) |
| if len(report_errors) == 0: |
| return |
| return report_errors |
| |
| def isChangeReadyToBeEnqueued(self, change): |
| return True |
| |
| def enqueueChangesAhead(self, change, quiet, ignore_requirements, |
| change_queue, history=None): |
| return True |
| |
| def enqueueChangesBehind(self, change, quiet, ignore_requirements, |
| change_queue): |
| return True |
| |
| def checkForChangesNeededBy(self, change, change_queue): |
| return True |
| |
| def getFailingDependentItems(self, item): |
| return None |
| |
| def getItemForChange(self, change): |
| for item in self.pipeline.getAllItems(): |
| if item.change.equals(change): |
| return item |
| return None |
| |
| def findOldVersionOfChangeAlreadyInQueue(self, change): |
| for item in self.pipeline.getAllItems(): |
| if not item.live: |
| continue |
| if change.isUpdateOf(item.change): |
| return item |
| return None |
| |
| def removeOldVersionsOfChange(self, change): |
| if not self.pipeline.dequeue_on_new_patchset: |
| return |
| old_item = self.findOldVersionOfChangeAlreadyInQueue(change) |
| if old_item: |
| self.log.debug("Change %s is a new version of %s, removing %s" % |
| (change, old_item.change, old_item)) |
| self.removeItem(old_item) |
| |
| def removeAbandonedChange(self, change): |
| self.log.debug("Change %s abandoned, removing." % change) |
| for item in self.pipeline.getAllItems(): |
| if not item.live: |
| continue |
| if item.change.equals(change): |
| self.removeItem(item) |
| |
| def reEnqueueItem(self, item, last_head): |
| with self.getChangeQueue(item.change, last_head.queue) as change_queue: |
| if change_queue: |
| self.log.debug("Re-enqueing change %s in queue %s" % |
| (item.change, change_queue)) |
| change_queue.enqueueItem(item) |
| |
| # Get an updated copy of the layout if necessary. |
| # This will return one of the following: |
| # 1) An existing layout from the item ahead or pipeline. |
| # 2) A newly created layout from the cached pipeline |
| # layout config plus the previously returned |
| # in-repo files stored in the buildset. |
| # 3) None in the case that a fetch of the files from |
| # the merger is still pending. |
| item.current_build_set.layout = self.getLayout(item) |
| |
| # Rebuild the frozen job tree from the new layout, if |
| # we have one. If not, it will be built later. |
| if item.current_build_set.layout: |
| item.freezeJobGraph() |
| |
| # Re-set build results in case any new jobs have been |
| # added to the tree. |
| for build in item.current_build_set.getBuilds(): |
| if build.result: |
| item.setResult(build) |
| # Similarly, reset the item state. |
| if item.current_build_set.unable_to_merge: |
| item.setUnableToMerge() |
| if item.current_build_set.config_error: |
| item.setConfigError(item.current_build_set.config_error) |
| if item.dequeued_needing_change: |
| item.setDequeuedNeedingChange() |
| |
| self.reportStats(item) |
| return True |
| else: |
| self.log.error("Unable to find change queue for project %s" % |
| item.change.project) |
| return False |
| |
| def addChange(self, change, quiet=False, enqueue_time=None, |
| ignore_requirements=False, live=True, |
| change_queue=None, history=None): |
| self.log.debug("Considering adding change %s" % change) |
| |
| # If we are adding a live change, check if it's a live item |
| # anywhere in the pipeline. Otherwise, we will perform the |
| # duplicate check below on the specific change_queue. |
| if live and self.isChangeAlreadyInPipeline(change): |
| self.log.debug("Change %s is already in pipeline, " |
| "ignoring" % change) |
| return True |
| |
| if not self.isChangeReadyToBeEnqueued(change): |
| self.log.debug("Change %s is not ready to be enqueued, ignoring" % |
| change) |
| return False |
| |
| if not ignore_requirements: |
| for f in self.ref_filters: |
| if f.connection_name != change.project.connection_name: |
| self.log.debug("Filter %s skipped for change %s due " |
| "to mismatched connections" % (f, change)) |
| continue |
| if not f.matches(change): |
| self.log.debug("Change %s does not match pipeline " |
| "requirement %s" % (change, f)) |
| return False |
| |
| with self.getChangeQueue(change, change_queue) as change_queue: |
| if not change_queue: |
| self.log.debug("Unable to find change queue for " |
| "change %s in project %s" % |
| (change, change.project)) |
| return False |
| |
| if not self.enqueueChangesAhead(change, quiet, ignore_requirements, |
| change_queue, history=history): |
| self.log.debug("Failed to enqueue changes " |
| "ahead of %s" % change) |
| return False |
| |
| if self.isChangeAlreadyInQueue(change, change_queue): |
| self.log.debug("Change %s is already in queue, " |
| "ignoring" % change) |
| return True |
| |
| self.log.info("Adding change %s to queue %s in %s" % |
| (change, change_queue, self.pipeline)) |
| item = change_queue.enqueueChange(change) |
| if enqueue_time: |
| item.enqueue_time = enqueue_time |
| item.live = live |
| self.reportStats(item) |
| item.quiet = quiet |
| self.enqueueChangesBehind(change, quiet, ignore_requirements, |
| change_queue) |
| zuul_driver = self.sched.connections.drivers['zuul'] |
| tenant = self.pipeline.layout.tenant |
| zuul_driver.onChangeEnqueued(tenant, item.change, self.pipeline) |
| return True |
| |
| def dequeueItem(self, item): |
| self.log.debug("Removing change %s from queue" % item.change) |
| item.queue.dequeueItem(item) |
| |
| def removeItem(self, item): |
| # Remove an item from the queue, probably because it has been |
| # superseded by another change. |
| self.log.debug("Canceling builds behind change: %s " |
| "because it is being removed." % item.change) |
| self.cancelJobs(item) |
| self.dequeueItem(item) |
| self.reportStats(item) |
| |
| def provisionNodes(self, item): |
| jobs = item.findJobsToRequest() |
| if not jobs: |
| return False |
| build_set = item.current_build_set |
| self.log.debug("Requesting nodes for change %s" % item.change) |
| for job in jobs: |
| req = self.sched.nodepool.requestNodes(build_set, job) |
| self.log.debug("Adding node request %s for job %s to item %s" % |
| (req, job, item)) |
| build_set.setJobNodeRequest(job.name, req) |
| return True |
| |
| def _executeJobs(self, item, jobs): |
| self.log.debug("Executing jobs for change %s" % item.change) |
| build_set = item.current_build_set |
| for job in jobs: |
| self.log.debug("Found job %s for change %s" % (job, item.change)) |
| try: |
| nodeset = item.current_build_set.getJobNodeSet(job.name) |
| self.sched.nodepool.useNodeSet(nodeset) |
| build = self.sched.executor.execute(job, item, |
| self.pipeline, |
| build_set.dependent_items, |
| build_set.merger_items) |
| self.log.debug("Adding build %s of job %s to item %s" % |
| (build, job, item)) |
| item.addBuild(build) |
| except: |
| self.log.exception("Exception while executing job %s " |
| "for change %s:" % (job, item.change)) |
| |
| def executeJobs(self, item): |
| # TODO(jeblair): This should return a value indicating a job |
| # was executed. Appears to be a longstanding bug. |
| if not item.current_build_set.layout: |
| return False |
| |
| jobs = item.findJobsToRun( |
| item.pipeline.layout.tenant.semaphore_handler) |
| if jobs: |
| self._executeJobs(item, jobs) |
| |
| def cancelJobs(self, item, prime=True): |
| self.log.debug("Cancel jobs for change %s" % item.change) |
| canceled = False |
| old_build_set = item.current_build_set |
| if prime and item.current_build_set.ref: |
| item.resetAllBuilds() |
| for req in old_build_set.node_requests.values(): |
| self.sched.nodepool.cancelRequest(req) |
| old_build_set.node_requests = {} |
| canceled_jobs = set() |
| for build in old_build_set.getBuilds(): |
| if build.result: |
| canceled_jobs.add(build.job.name) |
| continue |
| was_running = False |
| try: |
| was_running = self.sched.executor.cancel(build) |
| except: |
| self.log.exception("Exception while canceling build %s " |
| "for change %s" % (build, item.change)) |
| finally: |
| tenant = old_build_set.item.pipeline.layout.tenant |
| tenant.semaphore_handler.release( |
| old_build_set.item, build.job) |
| |
| if not was_running: |
| try: |
| nodeset = build.build_set.getJobNodeSet(build.job.name) |
| self.sched.nodepool.returnNodeSet(nodeset) |
| except Exception: |
| self.log.exception("Unable to return nodeset %s for " |
| "canceled build request %s" % |
| (nodeset, build)) |
| build.result = 'CANCELED' |
| canceled = True |
| canceled_jobs.add(build.job.name) |
| for jobname, nodeset in list(old_build_set.nodesets.items()): |
| if jobname in canceled_jobs: |
| continue |
| self.sched.nodepool.returnNodeSet(nodeset) |
| for item_behind in item.items_behind: |
| self.log.debug("Canceling jobs for change %s, behind change %s" % |
| (item_behind.change, item.change)) |
| if self.cancelJobs(item_behind, prime=prime): |
| canceled = True |
| return canceled |
| |
| def _loadDynamicLayout(self, item): |
| # Load layout |
| # Late import to break an import loop |
| import zuul.configloader |
| loader = zuul.configloader.ConfigLoader() |
| |
| build_set = item.current_build_set |
| self.log.debug("Load dynamic layout with %s" % build_set.files) |
| try: |
| # First parse the config as it will land with the |
| # full set of config and project repos. This lets us |
| # catch syntax errors in config repos even though we won't |
| # actually run with that config. |
| loader.createDynamicLayout( |
| item.pipeline.layout.tenant, |
| build_set.files, |
| include_config_projects=True) |
| |
| # Then create the config a second time but without changes |
| # to config repos so that we actually use this config. |
| layout = loader.createDynamicLayout( |
| item.pipeline.layout.tenant, |
| build_set.files, |
| include_config_projects=False) |
| except zuul.configloader.ConfigurationSyntaxError as e: |
| self.log.info("Configuration syntax error " |
| "in dynamic layout %s" % |
| build_set.files) |
| item.setConfigError(str(e)) |
| return None |
| except Exception: |
| self.log.exception("Error in dynamic layout %s" % |
| build_set.files) |
| item.setConfigError("Unknown configuration error") |
| return None |
| return layout |
| |
| def getLayout(self, item): |
| if not item.change.updatesConfig(): |
| if item.item_ahead: |
| return item.item_ahead.current_build_set.layout |
| else: |
| return item.queue.pipeline.layout |
| # This item updates the config, ask the merger for the result. |
| build_set = item.current_build_set |
| if build_set.merge_state == build_set.PENDING: |
| return None |
| if build_set.merge_state == build_set.COMPLETE: |
| if build_set.unable_to_merge: |
| return None |
| self.log.debug("Preparing dynamic layout for: %s" % item.change) |
| return self._loadDynamicLayout(item) |
| |
| def scheduleMerge(self, item, files=None, dirs=None): |
| build_set = item.current_build_set |
| |
| if not hasattr(item.change, 'branch'): |
| self.log.debug("Change %s does not have an associated branch, " |
| "not scheduling a merge job for item %s" % |
| (item.change, item)) |
| build_set.merge_state = build_set.COMPLETE |
| return True |
| |
| self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" % |
| (item, files, dirs)) |
| build_set = item.current_build_set |
| build_set.merge_state = build_set.PENDING |
| self.sched.merger.mergeChanges(build_set.merger_items, |
| item.current_build_set, files, dirs, |
| precedence=self.pipeline.precedence) |
| return False |
| |
| def prepareItem(self, item): |
| # This runs on every iteration of _processOneItem |
| # Returns True if the item is ready, false otherwise |
| build_set = item.current_build_set |
| if not build_set.ref: |
| build_set.setConfiguration() |
| if build_set.merge_state == build_set.NEW: |
| return self.scheduleMerge(item, |
| files=['zuul.yaml', '.zuul.yaml'], |
| dirs=['zuul.d', '.zuul.d']) |
| if build_set.merge_state == build_set.PENDING: |
| return False |
| if build_set.unable_to_merge: |
| return False |
| if build_set.config_error: |
| return False |
| return True |
| |
| def prepareJobs(self, item): |
| # This only runs once the item is in the pipeline's action window |
| # Returns True if the item is ready, false otherwise |
| build_set = item.current_build_set |
| if not build_set.layout: |
| build_set.layout = self.getLayout(item) |
| if not build_set.layout: |
| return False |
| |
| if not item.job_graph: |
| try: |
| item.freezeJobGraph() |
| except Exception as e: |
| # TODOv3(jeblair): nicify this exception as it will be reported |
| self.log.exception("Error freezing job graph for %s" % |
| item) |
| item.setConfigError("Unable to freeze job graph: %s" % |
| (str(e))) |
| return False |
| return True |
| |
| def _processOneItem(self, item, nnfi): |
| changed = False |
| ready = False |
| failing_reasons = [] # Reasons this item is failing |
| |
| item_ahead = item.item_ahead |
| if item_ahead and (not item_ahead.live): |
| item_ahead = None |
| change_queue = item.queue |
| |
| if self.checkForChangesNeededBy(item.change, change_queue) is not True: |
| # It's not okay to enqueue this change, we should remove it. |
| self.log.info("Dequeuing change %s because " |
| "it can no longer merge" % item.change) |
| self.cancelJobs(item) |
| self.dequeueItem(item) |
| item.setDequeuedNeedingChange() |
| if item.live: |
| try: |
| self.reportItem(item) |
| except exceptions.MergeFailure: |
| pass |
| return (True, nnfi) |
| |
| actionable = change_queue.isActionable(item) |
| item.active = actionable |
| |
| dep_items = self.getFailingDependentItems(item) |
| if dep_items: |
| failing_reasons.append('a needed change is failing') |
| self.cancelJobs(item, prime=False) |
| else: |
| item_ahead_merged = False |
| if (item_ahead and item_ahead.change.is_merged): |
| item_ahead_merged = True |
| if (item_ahead != nnfi and not item_ahead_merged): |
| # Our current base is different than what we expected, |
| # and it's not because our current base merged. Something |
| # ahead must have failed. |
| self.log.info("Resetting builds for change %s because the " |
| "item ahead, %s, is not the nearest non-failing " |
| "item, %s" % (item.change, item_ahead, nnfi)) |
| change_queue.moveItem(item, nnfi) |
| changed = True |
| self.cancelJobs(item) |
| if actionable: |
| ready = self.prepareItem(item) and self.prepareJobs(item) |
| # Starting jobs reporting should only be done once if there are |
| # jobs to run for this item. |
| if ready and len(self.pipeline.start_actions) > 0 \ |
| and len(item.job_graph.jobs) > 0 \ |
| and not item.reported_start \ |
| and not item.quiet: |
| self.reportStart(item) |
| item.reported_start = True |
| if item.current_build_set.unable_to_merge: |
| failing_reasons.append("it has a merge conflict") |
| if item.current_build_set.config_error: |
| failing_reasons.append("it has an invalid configuration") |
| if ready and self.provisionNodes(item): |
| changed = True |
| if ready and self.executeJobs(item): |
| changed = True |
| |
| if item.didAnyJobFail(): |
| failing_reasons.append("at least one job failed") |
| if (not item.live) and (not item.items_behind): |
| failing_reasons.append("is a non-live item with no items behind") |
| self.dequeueItem(item) |
| changed = True |
| if ((not item_ahead) and item.areAllJobsComplete() and item.live): |
| try: |
| self.reportItem(item) |
| except exceptions.MergeFailure: |
| failing_reasons.append("it did not merge") |
| for item_behind in item.items_behind: |
| self.log.info("Resetting builds for change %s because the " |
| "item ahead, %s, failed to merge" % |
| (item_behind.change, item)) |
| self.cancelJobs(item_behind) |
| self.dequeueItem(item) |
| changed = True |
| elif not failing_reasons and item.live: |
| nnfi = item |
| item.current_build_set.failing_reasons = failing_reasons |
| if failing_reasons: |
| self.log.debug("%s is a failing item because %s" % |
| (item, failing_reasons)) |
| return (changed, nnfi) |
| |
| def processQueue(self): |
| # Do whatever needs to be done for each change in the queue |
| self.log.debug("Starting queue processor: %s" % self.pipeline.name) |
| changed = False |
| for queue in self.pipeline.queues: |
| queue_changed = False |
| nnfi = None # Nearest non-failing item |
| for item in queue.queue[:]: |
| item_changed, nnfi = self._processOneItem( |
| item, nnfi) |
| if item_changed: |
| queue_changed = True |
| self.reportStats(item) |
| if queue_changed: |
| changed = True |
| status = '' |
| for item in queue.queue: |
| status += item.formatStatus() |
| if status: |
| self.log.debug("Queue %s status is now:\n %s" % |
| (queue.name, status)) |
| self.log.debug("Finished queue processor: %s (changed: %s)" % |
| (self.pipeline.name, changed)) |
| return changed |
| |
| def onBuildStarted(self, build): |
| self.log.debug("Build %s started" % build) |
| return True |
| |
| def onBuildCompleted(self, build): |
| self.log.debug("Build %s completed" % build) |
| item = build.build_set.item |
| |
| item.setResult(build) |
| item.pipeline.layout.tenant.semaphore_handler.release(item, build.job) |
| self.log.debug("Item %s status is now:\n %s" % |
| (item, item.formatStatus())) |
| |
| if build.retry: |
| build.build_set.removeJobNodeSet(build.job.name) |
| |
| return True |
| |
| def onMergeCompleted(self, event): |
| build_set = event.build_set |
| item = build_set.item |
| build_set.merge_state = build_set.COMPLETE |
| build_set.zuul_url = event.zuul_url |
| if event.merged: |
| build_set.commit = event.commit |
| build_set.files.setFiles(event.files) |
| build_set.repo_state = event.repo_state |
| elif event.updated: |
| build_set.commit = item.change.newrev |
| if not build_set.commit: |
| self.log.info("Unable to merge change %s" % item.change) |
| item.setUnableToMerge() |
| |
| def onNodesProvisioned(self, event): |
| # TODOv3(jeblair): handle provisioning failure here |
| request = event.request |
| build_set = request.build_set |
| build_set.jobNodeRequestComplete(request.job.name, request, |
| request.nodeset) |
| if request.failed or not request.fulfilled: |
| self.log.info("Node request failure for %s" % |
| (request.job.name,)) |
| build_set.item.setNodeRequestFailure(request.job) |
| self.log.info("Completed node request %s for job %s of item %s " |
| "with nodes %s" % |
| (request, request.job, build_set.item, |
| request.nodeset)) |
| |
| def reportItem(self, item): |
| if not item.reported: |
| # _reportItem() returns True if it failed to report. |
| item.reported = not self._reportItem(item) |
| if self.changes_merge: |
| succeeded = item.didAllJobsSucceed() |
| merged = item.reported |
| source = item.change.project.source |
| if merged: |
| merged = source.isMerged(item.change, item.change.branch) |
| self.log.info("Reported change %s status: all-succeeded: %s, " |
| "merged: %s" % (item.change, succeeded, merged)) |
| change_queue = item.queue |
| if not (succeeded and merged): |
| self.log.debug("Reported change %s failed tests or failed " |
| "to merge" % (item.change)) |
| change_queue.decreaseWindowSize() |
| self.log.debug("%s window size decreased to %s" % |
| (change_queue, change_queue.window)) |
| raise exceptions.MergeFailure( |
| "Change %s failed to merge" % item.change) |
| else: |
| change_queue.increaseWindowSize() |
| self.log.debug("%s window size increased to %s" % |
| (change_queue, change_queue.window)) |
| |
| zuul_driver = self.sched.connections.drivers['zuul'] |
| tenant = self.pipeline.layout.tenant |
| zuul_driver.onChangeMerged(tenant, item.change, source) |
| |
| def _reportItem(self, item): |
| self.log.debug("Reporting change %s" % item.change) |
| ret = True # Means error as returned by trigger.report |
| |
| # In the case of failure, we may not hove completed an initial |
| # merge which would get the layout for this item, so in order |
| # to determine whether this item's project is in this |
| # pipeline, use the dynamic layout if available, otherwise, |
| # fall back to the current static layout as a best |
| # approximation. |
| layout = (item.current_build_set.layout or |
| self.pipeline.layout) |
| |
| if not layout.hasProject(item.change.project): |
| self.log.debug("Project %s not in pipeline %s for change %s" % ( |
| item.change.project, self.pipeline, item.change)) |
| actions = [] |
| elif item.getConfigError(): |
| self.log.debug("Invalid config for change %s" % item.change) |
| # TODOv3(jeblair): consider a new reporter action for this |
| actions = self.pipeline.merge_failure_actions |
| item.setReportedResult('CONFIG_ERROR') |
| elif item.didMergerFail(): |
| actions = self.pipeline.merge_failure_actions |
| item.setReportedResult('MERGER_FAILURE') |
| elif item.wasDequeuedNeedingChange(): |
| actions = self.pipeline.failure_actions |
| item.setReportedResult('FAILURE') |
| elif not item.getJobs(): |
| # We don't send empty reports with +1 |
| self.log.debug("No jobs for change %s" % (item.change,)) |
| actions = [] |
| elif item.didAllJobsSucceed(): |
| self.log.debug("success %s" % (self.pipeline.success_actions)) |
| actions = self.pipeline.success_actions |
| item.setReportedResult('SUCCESS') |
| self.pipeline._consecutive_failures = 0 |
| else: |
| actions = self.pipeline.failure_actions |
| item.setReportedResult('FAILURE') |
| self.pipeline._consecutive_failures += 1 |
| if layout.hasProject(item.change.project) and self.pipeline._disabled: |
| actions = self.pipeline.disabled_actions |
| # Check here if we should disable so that we only use the disabled |
| # reporters /after/ the last disable_at failure is still reported as |
| # normal. |
| if (self.pipeline.disable_at and not self.pipeline._disabled and |
| self.pipeline._consecutive_failures >= self.pipeline.disable_at): |
| self.pipeline._disabled = True |
| if actions: |
| try: |
| self.log.info("Reporting item %s, actions: %s" % |
| (item, actions)) |
| ret = self.sendReport(actions, item) |
| if ret: |
| self.log.error("Reporting item %s received: %s" % |
| (item, ret)) |
| except: |
| self.log.exception("Exception while reporting:") |
| item.setReportedResult('ERROR') |
| return ret |
| |
| def reportStats(self, item): |
| if not self.sched.statsd: |
| return |
| try: |
| # Update the gauge on enqueue and dequeue, but timers only |
| # when dequeing. |
| if item.dequeue_time: |
| dt = int((item.dequeue_time - item.enqueue_time) * 1000) |
| else: |
| dt = None |
| items = len(self.pipeline.getAllItems()) |
| |
| # stats.timers.zuul.pipeline.NAME.resident_time |
| # stats_counts.zuul.pipeline.NAME.total_changes |
| # stats.gauges.zuul.pipeline.NAME.current_changes |
| key = 'zuul.pipeline.%s' % self.pipeline.name |
| self.sched.statsd.gauge(key + '.current_changes', items) |
| if dt: |
| self.sched.statsd.timing(key + '.resident_time', dt) |
| self.sched.statsd.incr(key + '.total_changes') |
| |
| # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time |
| # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes |
| project_name = item.change.project.name.replace('/', '.') |
| key += '.%s' % project_name |
| if dt: |
| self.sched.statsd.timing(key + '.resident_time', dt) |
| self.sched.statsd.incr(key + '.total_changes') |
| except: |
| self.log.exception("Exception reporting pipeline stats") |