blob: 5aef45357136df52303a907e9fb748d6e59435f9 [file] [log] [blame]
James E. Blair83005782015-12-11 14:46:03 -08001# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5# http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
12
James E. Blair83005782015-12-11 14:46:03 -080013from zuul import model
Monty Taylorc75478c2016-07-29 12:04:21 -070014from zuul.manager import PipelineManager, StaticChangeQueueContextManager
Jesse Keating78f544a2017-07-13 14:27:40 -070015from zuul.manager import DynamicChangeQueueContextManager
James E. Blair83005782015-12-11 14:46:03 -080016
17
Monty Taylorc75478c2016-07-29 12:04:21 -070018class DependentPipelineManager(PipelineManager):
Monty Taylor82dfd412016-07-29 12:01:28 -070019 """PipelineManager for handling interrelated Changes.
20
21 The DependentPipelineManager puts Changes that share a Pipeline
22 into a shared :py:class:`~zuul.model.ChangeQueue`. It them processes them
23 using the Optmistic Branch Prediction logic with Nearest Non-Failing Item
24 reparenting algorithm for handling errors.
25 """
James E. Blair83005782015-12-11 14:46:03 -080026 changes_merge = True
27
28 def __init__(self, *args, **kwargs):
29 super(DependentPipelineManager, self).__init__(*args, **kwargs)
30
31 def _postConfig(self, layout):
32 super(DependentPipelineManager, self)._postConfig(layout)
33 self.buildChangeQueues()
34
35 def buildChangeQueues(self):
36 self.log.debug("Building shared change queues")
James E. Blair0dcef7a2016-08-19 09:35:17 -070037 change_queues = {}
38 project_configs = self.pipeline.layout.project_configs
James E. Blair0ffa0102017-03-30 13:11:33 -070039 tenant = self.pipeline.layout.tenant
James E. Blair83005782015-12-11 14:46:03 -080040
James E. Blairf59f3cf2017-02-19 14:50:26 -080041 for project_config in project_configs.values():
42 project_pipeline_config = project_config.pipelines.get(
43 self.pipeline.name)
44 if project_pipeline_config is None:
45 continue
James E. Blair0ffa0102017-03-30 13:11:33 -070046 (trusted, project) = tenant.getProject(project_config.name)
James E. Blair0dcef7a2016-08-19 09:35:17 -070047 queue_name = project_pipeline_config.queue_name
48 if queue_name and queue_name in change_queues:
49 change_queue = change_queues[queue_name]
50 else:
51 p = self.pipeline
52 change_queue = model.ChangeQueue(
53 p,
54 window=p.window,
55 window_floor=p.window_floor,
56 window_increase_type=p.window_increase_type,
57 window_increase_factor=p.window_increase_factor,
58 window_decrease_type=p.window_decrease_type,
59 window_decrease_factor=p.window_decrease_factor,
60 name=queue_name)
61 if queue_name:
62 # If this is a named queue, keep track of it in
63 # case it is referenced again. Otherwise, it will
64 # have a name automatically generated from its
65 # constituent projects.
66 change_queues[queue_name] = change_queue
67 self.pipeline.addQueue(change_queue)
68 self.log.debug("Created queue: %s" % change_queue)
James E. Blair83005782015-12-11 14:46:03 -080069 change_queue.addProject(project)
James E. Blair0dcef7a2016-08-19 09:35:17 -070070 self.log.debug("Added project %s to queue: %s" %
71 (project, change_queue))
James E. Blair83005782015-12-11 14:46:03 -080072
73 def getChangeQueue(self, change, existing=None):
74 if existing:
75 return StaticChangeQueueContextManager(existing)
Jesse Keating78f544a2017-07-13 14:27:40 -070076 queue = self.pipeline.getQueue(change.project)
77 if queue:
78 return StaticChangeQueueContextManager(queue)
79 else:
80 # There is no existing queue for this change. Create a
81 # dynamic one for this one change's use
82 change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
83 change_queue.addProject(change.project)
84 self.pipeline.addQueue(change_queue)
85 self.log.debug("Dynamically created queue %s", change_queue)
86 return DynamicChangeQueueContextManager(change_queue)
James E. Blair83005782015-12-11 14:46:03 -080087
88 def isChangeReadyToBeEnqueued(self, change):
James E. Blair6053de42017-04-05 11:27:11 -070089 source = change.project.source
90 if not source.canMerge(change, self.getSubmitAllowNeeds()):
James E. Blair83005782015-12-11 14:46:03 -080091 self.log.debug("Change %s can not merge, ignoring" % change)
92 return False
93 return True
94
95 def enqueueChangesBehind(self, change, quiet, ignore_requirements,
96 change_queue):
James E. Blair83005782015-12-11 14:46:03 -080097 self.log.debug("Checking for changes needing %s:" % change)
James E. Blair6053de42017-04-05 11:27:11 -070098 to_enqueue = []
99 source = change.project.source
James E. Blair83005782015-12-11 14:46:03 -0800100 if not hasattr(change, 'needed_by_changes'):
Clint Byrumf8cc9902017-03-22 22:38:25 -0700101 self.log.debug(" %s does not support dependencies" % type(change))
James E. Blair83005782015-12-11 14:46:03 -0800102 return
103 for other_change in change.needed_by_changes:
104 with self.getChangeQueue(other_change) as other_change_queue:
105 if other_change_queue != change_queue:
106 self.log.debug(" Change %s in project %s can not be "
107 "enqueued in the target queue %s" %
108 (other_change, other_change.project,
109 change_queue))
110 continue
James E. Blair6053de42017-04-05 11:27:11 -0700111 if source.canMerge(other_change, self.getSubmitAllowNeeds()):
James E. Blair83005782015-12-11 14:46:03 -0800112 self.log.debug(" Change %s needs %s and is ready to merge" %
113 (other_change, change))
114 to_enqueue.append(other_change)
115
116 if not to_enqueue:
117 self.log.debug(" No changes need %s" % change)
118
119 for other_change in to_enqueue:
120 self.addChange(other_change, quiet=quiet,
121 ignore_requirements=ignore_requirements,
122 change_queue=change_queue)
123
124 def enqueueChangesAhead(self, change, quiet, ignore_requirements,
Tobias Henkel6b9390f2017-03-28 11:23:21 +0200125 change_queue, history=None):
126 if history and change.number in history:
127 # detected dependency cycle
128 self.log.warn("Dependency cycle detected")
129 return False
130 if hasattr(change, 'number'):
131 history = history or []
132 history.append(change.number)
133
James E. Blair83005782015-12-11 14:46:03 -0800134 ret = self.checkForChangesNeededBy(change, change_queue)
135 if ret in [True, False]:
136 return ret
137 self.log.debug(" Changes %s must be merged ahead of %s" %
138 (ret, change))
139 for needed_change in ret:
140 r = self.addChange(needed_change, quiet=quiet,
141 ignore_requirements=ignore_requirements,
Tobias Henkel6b9390f2017-03-28 11:23:21 +0200142 change_queue=change_queue, history=history)
James E. Blair83005782015-12-11 14:46:03 -0800143 if not r:
144 return False
145 return True
146
147 def checkForChangesNeededBy(self, change, change_queue):
148 self.log.debug("Checking for changes needed by %s:" % change)
James E. Blair6053de42017-04-05 11:27:11 -0700149 source = change.project.source
James E. Blair83005782015-12-11 14:46:03 -0800150 # Return true if okay to proceed enqueing this change,
151 # false if the change should not be enqueued.
152 if not hasattr(change, 'needs_changes'):
Clint Byrumf8cc9902017-03-22 22:38:25 -0700153 self.log.debug(" %s does not support dependencies" % type(change))
James E. Blair83005782015-12-11 14:46:03 -0800154 return True
155 if not change.needs_changes:
156 self.log.debug(" No changes needed")
157 return True
158 changes_needed = []
159 # Ignore supplied change_queue
160 with self.getChangeQueue(change) as change_queue:
161 for needed_change in change.needs_changes:
162 self.log.debug(" Change %s needs change %s:" % (
163 change, needed_change))
164 if needed_change.is_merged:
165 self.log.debug(" Needed change is merged")
166 continue
167 with self.getChangeQueue(needed_change) as needed_change_queue:
168 if needed_change_queue != change_queue:
169 self.log.debug(" Change %s in project %s does not "
170 "share a change queue with %s "
171 "in project %s" %
172 (needed_change, needed_change.project,
173 change, change.project))
174 return False
175 if not needed_change.is_current_patchset:
176 self.log.debug(" Needed change is not the "
177 "current patchset")
178 return False
179 if self.isChangeAlreadyInQueue(needed_change, change_queue):
180 self.log.debug(" Needed change is already ahead "
181 "in the queue")
182 continue
James E. Blair6053de42017-04-05 11:27:11 -0700183 if source.canMerge(needed_change, self.getSubmitAllowNeeds()):
James E. Blair83005782015-12-11 14:46:03 -0800184 self.log.debug(" Change %s is needed" % needed_change)
185 if needed_change not in changes_needed:
186 changes_needed.append(needed_change)
187 continue
188 # The needed change can't be merged.
189 self.log.debug(" Change %s is needed but can not be merged" %
190 needed_change)
191 return False
192 if changes_needed:
193 return changes_needed
194 return True
195
196 def getFailingDependentItems(self, item):
197 if not hasattr(item.change, 'needs_changes'):
198 return None
199 if not item.change.needs_changes:
200 return None
201 failing_items = set()
202 for needed_change in item.change.needs_changes:
203 needed_item = self.getItemForChange(needed_change)
204 if not needed_item:
205 continue
206 if needed_item.current_build_set.failing_reasons:
207 failing_items.add(needed_item)
208 if failing_items:
209 return failing_items
210 return None
Jesse Keating78f544a2017-07-13 14:27:40 -0700211
212 def dequeueItem(self, item):
213 super(DependentPipelineManager, self).dequeueItem(item)
214 # If this was a dynamic queue from a speculative change,
215 # remove the queue (if empty)
216 if item.queue.dynamic:
217 if not item.queue.queue:
218 self.pipeline.removeQueue(item.queue)