James E. Blair | 8300578 | 2015-12-11 14:46:03 -0800 | [diff] [blame] | 1 | # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 2 | # not use this file except in compliance with the License. You may obtain |
| 3 | # a copy of the License at |
| 4 | # |
| 5 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 6 | # |
| 7 | # Unless required by applicable law or agreed to in writing, software |
| 8 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 9 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 10 | # License for the specific language governing permissions and limitations |
| 11 | # under the License. |
| 12 | |
James E. Blair | 8300578 | 2015-12-11 14:46:03 -0800 | [diff] [blame] | 13 | from zuul import model |
Monty Taylor | c75478c | 2016-07-29 12:04:21 -0700 | [diff] [blame] | 14 | from zuul.manager import PipelineManager, StaticChangeQueueContextManager |
Jesse Keating | 78f544a | 2017-07-13 14:27:40 -0700 | [diff] [blame] | 15 | from zuul.manager import DynamicChangeQueueContextManager |
James E. Blair | 8300578 | 2015-12-11 14:46:03 -0800 | [diff] [blame] | 16 | |
| 17 | |
class DependentPipelineManager(PipelineManager):
    """PipelineManager for handling interrelated Changes.

    The DependentPipelineManager puts Changes that share a Pipeline
    into a shared :py:class:`~zuul.model.ChangeQueue`. It then processes them
    using the Optimistic Branch Prediction logic with Nearest Non-Failing Item
    reparenting algorithm for handling errors.
    """
    changes_merge = True

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to the base PipelineManager."""
        super(DependentPipelineManager, self).__init__(*args, **kwargs)

    def _postConfig(self, layout):
        """Run the base post-configuration, then build the shared queues."""
        super(DependentPipelineManager, self)._postConfig(layout)
        self.buildChangeQueues()

    def buildChangeQueues(self):
        """Create the pipeline's change queues from the project configs.

        Every project configured for this pipeline is added to a
        :py:class:`~zuul.model.ChangeQueue`.  Projects whose pipeline
        config names the same queue share one queue; a project without
        a queue name gets a queue of its own.
        """
        self.log.debug("Building shared change queues")
        change_queues = {}
        project_configs = self.pipeline.layout.project_configs
        tenant = self.pipeline.layout.tenant

        for project_config in project_configs.values():
            project_pipeline_config = project_config.pipelines.get(
                self.pipeline.name)
            if project_pipeline_config is None:
                # This project does not participate in this pipeline.
                continue
            (_trusted, project) = tenant.getProject(project_config.name)
            queue_name = project_pipeline_config.queue_name
            if queue_name and queue_name in change_queues:
                change_queue = change_queues[queue_name]
            else:
                p = self.pipeline
                change_queue = model.ChangeQueue(
                    p,
                    window=p.window,
                    window_floor=p.window_floor,
                    window_increase_type=p.window_increase_type,
                    window_increase_factor=p.window_increase_factor,
                    window_decrease_type=p.window_decrease_type,
                    window_decrease_factor=p.window_decrease_factor,
                    name=queue_name)
                if queue_name:
                    # If this is a named queue, keep track of it in
                    # case it is referenced again.  Otherwise, it will
                    # have a name automatically generated from its
                    # constituent projects.
                    change_queues[queue_name] = change_queue
                self.pipeline.addQueue(change_queue)
                self.log.debug("Created queue: %s", change_queue)
            change_queue.addProject(project)
            self.log.debug("Added project %s to queue: %s",
                           project, change_queue)

    def getChangeQueue(self, change, existing=None):
        """Return a context manager yielding the queue for ``change``.

        :arg change: the change whose project selects the queue.
        :arg existing: if truthy, this queue is used as-is.
        :returns: a StaticChangeQueueContextManager wrapping ``existing``
            or the pipeline's queue for the change's project.  When the
            pipeline has no queue for the project, a new dynamic queue
            is created, registered with the pipeline and returned in a
            DynamicChangeQueueContextManager.
        """
        if existing:
            return StaticChangeQueueContextManager(existing)
        queue = self.pipeline.getQueue(change.project)
        if queue:
            return StaticChangeQueueContextManager(queue)

        # There is no existing queue for this change.  Create a
        # dynamic one for this one change's use.
        change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
        change_queue.addProject(change.project)
        self.pipeline.addQueue(change_queue)
        self.log.debug("Dynamically created queue %s", change_queue)
        return DynamicChangeQueueContextManager(change_queue)

    def isChangeReadyToBeEnqueued(self, change):
        """Return True if the change's source reports that it can merge."""
        source = change.project.source
        if not source.canMerge(change, self.getSubmitAllowNeeds()):
            self.log.debug("Change %s can not merge, ignoring", change)
            return False
        return True

    def enqueueChangesBehind(self, change, quiet, ignore_requirements,
                             change_queue):
        """Enqueue the mergeable changes that depend on ``change``.

        Only changes whose own queue is ``change_queue`` and which the
        source of ``change`` reports as mergeable are added (via
        ``addChange``).  Changes without a ``needed_by_changes``
        attribute are skipped entirely.

        :arg change: the change that was just enqueued.
        :arg quiet: passed through to ``addChange``.
        :arg ignore_requirements: passed through to ``addChange``.
        :arg change_queue: the queue ``change`` was enqueued in.
        """
        self.log.debug("Checking for changes needing %s:", change)
        if not hasattr(change, 'needed_by_changes'):
            self.log.debug(" %s does not support dependencies", type(change))
            return

        to_enqueue = []
        source = change.project.source
        for other_change in change.needed_by_changes:
            with self.getChangeQueue(other_change) as other_change_queue:
                if other_change_queue != change_queue:
                    self.log.debug(" Change %s in project %s can not be "
                                   "enqueued in the target queue %s",
                                   other_change, other_change.project,
                                   change_queue)
                    continue
            if source.canMerge(other_change, self.getSubmitAllowNeeds()):
                self.log.debug(" Change %s needs %s and is ready to merge",
                               other_change, change)
                to_enqueue.append(other_change)

        if not to_enqueue:
            self.log.debug(" No changes need %s", change)

        for other_change in to_enqueue:
            self.addChange(other_change, quiet=quiet,
                           ignore_requirements=ignore_requirements,
                           change_queue=change_queue)

    def enqueueChangesAhead(self, change, quiet, ignore_requirements,
                            change_queue, history=None):
        """Enqueue the changes that must merge ahead of ``change``.

        :arg change: the change about to be enqueued.
        :arg quiet: passed through to ``addChange``.
        :arg ignore_requirements: passed through to ``addChange``.
        :arg change_queue: the queue the dependencies are added to.
        :arg history: change numbers already visited on this dependency
            walk, used to detect cycles.  A non-empty list passed in is
            appended to in place.
        :returns: True if ``change`` may be enqueued, False if a
            dependency cycle was detected or a needed change could not
            be enqueued.
        """
        # Changes without a number cannot take part in cycle detection;
        # guard both the lookup and the bookkeeping so that such a change
        # never raises AttributeError when a history is supplied.
        has_number = hasattr(change, 'number')
        if has_number and history and change.number in history:
            # detected dependency cycle
            self.log.warning("Dependency cycle detected")
            return False
        if has_number:
            history = history or []
            history.append(change.number)

        ret = self.checkForChangesNeededBy(change, change_queue)
        if isinstance(ret, bool):
            # A plain verdict: nothing needs to be enqueued ahead.
            return ret
        self.log.debug(" Changes %s must be merged ahead of %s",
                       ret, change)
        for needed_change in ret:
            r = self.addChange(needed_change, quiet=quiet,
                               ignore_requirements=ignore_requirements,
                               change_queue=change_queue, history=history)
            if not r:
                return False
        return True

    def checkForChangesNeededBy(self, change, change_queue):
        """Determine which changes must be enqueued ahead of ``change``.

        :arg change: the change being considered for enqueueing.
        :arg change_queue: ignored; the queue is looked up again from
            ``change`` itself.
        :returns: True if it is okay to proceed enqueueing this change,
            False if the change should not be enqueued, or a non-empty
            list of needed changes that must be enqueued first.
        """
        self.log.debug("Checking for changes needed by %s:", change)
        if not hasattr(change, 'needs_changes'):
            self.log.debug(" %s does not support dependencies", type(change))
            return True
        if not change.needs_changes:
            self.log.debug(" No changes needed")
            return True

        source = change.project.source
        changes_needed = []
        # Ignore supplied change_queue
        with self.getChangeQueue(change) as own_queue:
            for needed_change in change.needs_changes:
                self.log.debug(" Change %s needs change %s:",
                               change, needed_change)
                if needed_change.is_merged:
                    self.log.debug(" Needed change is merged")
                    continue
                with self.getChangeQueue(needed_change) as needed_change_queue:
                    if needed_change_queue != own_queue:
                        self.log.debug(" Change %s in project %s does not "
                                       "share a change queue with %s "
                                       "in project %s",
                                       needed_change, needed_change.project,
                                       change, change.project)
                        return False
                if not needed_change.is_current_patchset:
                    self.log.debug(" Needed change is not the "
                                   "current patchset")
                    return False
                if self.isChangeAlreadyInQueue(needed_change, own_queue):
                    self.log.debug(" Needed change is already ahead "
                                   "in the queue")
                    continue
                if source.canMerge(needed_change, self.getSubmitAllowNeeds()):
                    self.log.debug(" Change %s is needed", needed_change)
                    if needed_change not in changes_needed:
                        changes_needed.append(needed_change)
                    # A mergeable dependency never reaches the failure
                    # path below, even if it was already recorded.
                    continue
                # The needed change can't be merged.
                self.log.debug(" Change %s is needed but can not be merged",
                               needed_change)
                return False
        if changes_needed:
            return changes_needed
        return True

    def getFailingDependentItems(self, item):
        """Return the failing queue items that ``item`` depends on.

        :arg item: the queue item whose change's dependencies are checked.
        :returns: a set of the items (as found by ``getItemForChange``)
            for needed changes whose current build set has failing
            reasons, or None if there are none or the change has no
            dependencies.
        """
        needs_changes = getattr(item.change, 'needs_changes', None)
        if not needs_changes:
            return None
        failing_items = set()
        for needed_change in needs_changes:
            needed_item = self.getItemForChange(needed_change)
            if not needed_item:
                continue
            if needed_item.current_build_set.failing_reasons:
                failing_items.add(needed_item)
        return failing_items or None

    def dequeueItem(self, item):
        """Dequeue ``item`` and drop its queue if dynamic and now empty."""
        super(DependentPipelineManager, self).dequeueItem(item)
        # If this was a dynamic queue from a speculative change,
        # remove the queue (if empty)
        if item.queue.dynamic and not item.queue.queue:
            self.pipeline.removeQueue(item.queue)