Use previously stored repo state on executor
When the initial speculative merge for a change is performed at
the request of the pipeline manager, the repo state used to
construct that merge is saved in a data structure. Pass that
structure to the executor when running jobs so that, after cloning
each repo into the jobdir, the repos are made to appear the same
as those an the merger before it started its merge. The subsequent
merge operatons on the executor will repeat the same operations
producing the same content (though the actual commits will be
different due to timestamps).
It would be more efficient to have the executors pull changes from
the mergers, however, that would require the mergers to run an
accessible git service, which is one of the things that adds
significant complexity to a zuul deployment. This method only
requires that the mergers be able to initiate outgoing connections
to gearman and sources.
Because the initial merge may happen well before jobs are executed,
save the dependency chain for a given BuildSet when it's configuration
is being finalized. This will cause us to save not only the repository
configuration that the merger uses, but also the exact sequence of
changes applied on top of that state. (Currently, we build the series
of changes we apply before running each job, however, the queue state
can change (especially if items are merged) in the period between the
inital merge and job launch).
The initial merge is performed before we have a shadow layout for the
item, yet, we must specify a merge mode for each project for which we
merge a change. Currently, we are defaulting to the 'merge-resolve'
merge mode for every project during the initial speculative merge, but
then the secondary merge on the executor will use the correct merge
mode since we have a layout at that point. With this change, where
we are trying to replicate the initial merge exactly, we can't rely
on that behavior any more. Instead, when attempting to find the merge
mode to use for a project, we use the shadow layout of the nearest
item ahead, or else the current live layout, to find the merge mode,
and only if those fail, do we use the default. This means that a change
to a project's merge-mode will not use that merge mode. However,
subsequent changes will. This seems to be the best we can do, short
of detecting this case and merging such changes twice. This seems
rare enough that we don't need to do that.
The test_delayed_merge_conflict method is updated to essentially invert
the meaning of the test. Since the old behavior was for the initial
merge check to be completely independent of the executor merge, this
test examined the case where the initial merge worked but between that
time and when the executor performed its merge, a conflicting change
landed. That should no longer be possible since the executor merge
now uses the results of the initial merge. We keep the test, but invert
its final assertion -- instead of checking for a merge conflict being
reported, we check that no merge conflict is reported.
Change-Id: I34cd58ec9775c1d151db02034c342bd971af036f
diff --git a/tests/base.py b/tests/base.py
index d62d9ca..a9bcee1 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1189,9 +1189,10 @@
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
- def doMergeChanges(self, items):
+ def doMergeChanges(self, items, repo_state):
# Get a merger in order to update the repos involved in this job.
- commit = super(RecordingAnsibleJob, self).doMergeChanges(items)
+ commit = super(RecordingAnsibleJob, self).doMergeChanges(
+ items, repo_state)
if not commit: # merge conflict
self.recordResult('MERGER_FAILURE')
return commit
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index d416369..2624944 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -994,8 +994,9 @@
"Test that delayed check merge conflicts are handled properly"
# Hold jobs in the gearman queue so that we can test whether
- # the executor returns a merge failure after the scheduler has
- # successfully merged.
+ # the executor sucesfully merges a change based on an old
+ # repo state (frozen by the scheduler) which would otherwise
+ # conflict.
self.gearman_server.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project',
'master', 'A',
@@ -1068,9 +1069,12 @@
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
- dict(name='project-merge', result='MERGER_FAILURE', changes='2,1'),
- dict(name='project-merge', result='MERGER_FAILURE',
- changes='2,1 3,1'),
+ dict(name='project-merge', result='SUCCESS', changes='2,1'),
+ dict(name='project-test1', result='SUCCESS', changes='2,1'),
+ dict(name='project-test2', result='SUCCESS', changes='2,1'),
+ dict(name='project-merge', result='SUCCESS', changes='2,1 3,1'),
+ dict(name='project-test1', result='SUCCESS', changes='2,1 3,1'),
+ dict(name='project-test2', result='SUCCESS', changes='2,1 3,1'),
], ordered=False)
def test_post(self):
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index e1eed2d..cb30a82 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -169,7 +169,8 @@
self.log.debug("Function %s is not registered" % name)
return False
- def execute(self, job, item, pipeline, dependent_items=[]):
+ def execute(self, job, item, pipeline, dependent_items=[],
+ merger_items=[]):
tenant = pipeline.layout.tenant
uuid = str(uuid4().hex)
self.log.info(
@@ -179,8 +180,11 @@
item.current_build_set.getJobNodeSet(job.name),
item.change,
[x.change for x in dependent_items]))
+
dependent_items = dependent_items[:]
dependent_items.reverse()
+ all_items = dependent_items + [item]
+
# TODOv3(jeblair): This ansible vars data structure will
# replace the environment variables below.
project = dict(
@@ -210,7 +214,7 @@
changes_str = '^'.join(
['%s:%s:%s' % (i.change.project.name, i.change.branch,
i.change.refspec)
- for i in dependent_items + [item]])
+ for i in all_items])
params['ZUUL_BRANCH'] = item.change.branch
params['ZUUL_CHANGES'] = changes_str
params['ZUUL_REF'] = ('refs/zuul/%s/%s' %
@@ -220,7 +224,7 @@
zuul_changes = ' '.join(['%s,%s' % (i.change.number,
i.change.patchset)
- for i in dependent_items + [item]])
+ for i in all_items])
params['ZUUL_CHANGE_IDS'] = zuul_changes
params['ZUUL_CHANGE'] = str(item.change.number)
params['ZUUL_PATCHSET'] = str(item.change.patchset)
@@ -253,13 +257,11 @@
# ZUUL_OLDREV
# ZUUL_NEWREV
- all_items = dependent_items + [item]
- merger_items = [i.makeMergerItem() for i in all_items]
-
params['job'] = job.name
params['timeout'] = job.timeout
params['items'] = merger_items
params['projects'] = []
+ params['repo_state'] = item.current_build_set.repo_state
if job.name != 'noop':
params['playbooks'] = [x.toDict() for x in job.run]
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index c26dc2d..eb608c9 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -586,7 +586,7 @@
merge_items = [i for i in args['items'] if i.get('refspec')]
if merge_items:
- if not self.doMergeChanges(merge_items):
+ if not self.doMergeChanges(merge_items, args['repo_state']):
# There was a merge conflict and we have already sent
# a work complete result, don't run any jobs
return
@@ -632,10 +632,10 @@
result = dict(result=result)
self.job.sendWorkComplete(json.dumps(result))
- def doMergeChanges(self, items):
+ def doMergeChanges(self, items, repo_state):
# Get a merger in order to update the repos involved in this job.
merger = self.executor_server._getMerger(self.jobdir.src_root)
- ret = merger.mergeChanges(items) # noqa
+ ret = merger.mergeChanges(items, repo_state=repo_state)
if not ret: # merge conflict
result = dict(result='MERGER_FAILURE')
self.job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 3545e1e..db9dc4c 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -192,17 +192,6 @@
def getFailingDependentItems(self, item):
return None
- def getDependentItems(self, item):
- orig_item = item
- items = []
- while item.item_ahead:
- items.append(item.item_ahead)
- item = item.item_ahead
- self.log.info("Change %s depends on changes %s" %
- (orig_item.change,
- [x.change for x in items]))
- return items
-
def getItemForChange(self, change):
for item in self.pipeline.getAllItems():
if item.change.equals(change):
@@ -364,7 +353,7 @@
def _executeJobs(self, item, jobs):
self.log.debug("Executing jobs for change %s" % item.change)
- dependent_items = self.getDependentItems(item)
+ build_set = item.current_build_set
for job in jobs:
self.log.debug("Found job %s for change %s" % (job, item.change))
try:
@@ -372,7 +361,8 @@
self.sched.nodepool.useNodeSet(nodeset)
build = self.sched.executor.execute(job, item,
self.pipeline,
- dependent_items)
+ build_set.dependent_items,
+ build_set.merger_items)
self.log.debug("Adding build %s of job %s to item %s" %
(build, job, item))
item.addBuild(build)
@@ -502,13 +492,9 @@
self.log.debug("Scheduling merge for item %s (files: %s)" %
(item, files))
- dependent_items = self.getDependentItems(item)
- dependent_items.reverse()
- all_items = dependent_items + [item]
- merger_items = [i.makeMergerItem() for i in all_items]
build_set = item.current_build_set
build_set.merge_state = build_set.PENDING
- self.sched.merger.mergeChanges(merger_items,
+ self.sched.merger.mergeChanges(build_set.merger_items,
item.current_build_set, files,
precedence=self.pipeline.precedence)
return False
@@ -683,6 +669,7 @@
if event.merged:
build_set.commit = event.commit
build_set.files.setFiles(event.files)
+ build_set.repo_state = event.repo_state
elif event.updated:
build_set.commit = item.change.newrev
if not build_set.commit:
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index e2f2d43..c98f20e 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -130,6 +130,7 @@
updated = data.get('updated', False)
commit = data.get('commit')
files = data.get('files', {})
+ repo_state = data.get('repo_state', {})
job.files = files
self.log.info("Merge %s complete, merged: %s, updated: %s, "
"commit: %s" %
@@ -137,7 +138,8 @@
job.setComplete()
if job.build_set:
self.sched.onMergeCompleted(job.build_set, zuul_url,
- merged, updated, commit, files)
+ merged, updated, commit, files,
+ repo_state)
# The test suite expects the job to be removed from the
# internal account after the wake flag is set.
self.jobs.remove(job)
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 40c5fd4..ee83fa0 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -14,6 +14,7 @@
# under the License.
import git
+import gitdb
import os
import logging
@@ -128,6 +129,22 @@
repo = self.createRepoObject()
return repo.refs
+ def setRefs(self, refs):
+ repo = self.createRepoObject()
+ current_refs = {}
+ for ref in repo.refs:
+ current_refs[ref.path] = ref
+ unseen = set(current_refs.keys())
+ for path, hexsha in refs.items():
+ binsha = gitdb.util.to_bin_sha(hexsha)
+ obj = git.objects.Object.new_from_sha(repo, binsha)
+ self.log.debug("Create reference %s", path)
+ git.refs.Reference.create(repo, path, obj, force=True)
+ unseen.discard(path)
+ for path in unseen:
+ self.log.debug("Delete reference %s", path)
+ git.refs.SymbolicReference.delete(repo, ref.path)
+
def checkout(self, ref):
repo = self.createRepoObject()
self.log.debug("Checking out %s" % ref)
@@ -299,8 +316,21 @@
for ref in repo.getRefs():
if ref.path.startswith('refs/zuul'):
continue
+ if ref.path.startswith('refs/remotes'):
+ continue
project[ref.path] = ref.object.hexsha
+ def _restoreRepoState(self, connection_name, project_name, repo,
+ repo_state):
+ projects = repo_state.get(connection_name, {})
+ project = projects.get(project_name, {})
+ if not project:
+ # We don't have a state for this project.
+ return
+ self.log.debug("Restore repo state for project %s/%s",
+ connection_name, project_name)
+ repo.setRefs(project)
+
def _mergeChange(self, item, ref):
repo = self.getRepo(item['connection'], item['project'])
try:
@@ -349,6 +379,9 @@
except Exception:
self.log.exception("Unable to reset repo %s" % repo)
return None
+ self._restoreRepoState(item['connection'], item['project'], repo,
+ repo_state)
+
base = repo.getBranchHead(item['branch'])
# Save the repo state so that later mergers can repeat
# this process.
@@ -368,6 +401,7 @@
# commits of each project-branch
for key, mrc in recent.items():
connection, project, branch = key
+ zuul_ref = None
try:
repo = self.getRepo(connection, project)
zuul_ref = branch + '/' + item['ref']
diff --git a/zuul/model.py b/zuul/model.py
index 4ae6f9a..15ba45e 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1131,7 +1131,6 @@
def __init__(self, item):
self.item = item
- self.other_changes = []
self.builds = {}
self.result = None
self.next_build_set = None
@@ -1139,6 +1138,8 @@
self.ref = None
self.commit = None
self.zuul_url = None
+ self.dependent_items = None
+ self.merger_items = None
self.unable_to_merge = False
self.config_error = None # None or an error message string.
self.failing_reasons = []
@@ -1146,6 +1147,7 @@
self.nodesets = {} # job -> nodeset
self.node_requests = {} # job -> reqs
self.files = RepoFiles()
+ self.repo_state = {}
self.layout = None
self.tries = {}
@@ -1159,13 +1161,19 @@
# The change isn't enqueued until after it's created
# so we don't know what the other changes ahead will be
# until jobs start.
- if not self.other_changes:
+ if self.dependent_items is None:
+ items = []
next_item = self.item.item_ahead
while next_item:
- self.other_changes.append(next_item.change)
+ items.append(next_item)
next_item = next_item.item_ahead
+ self.dependent_items = items
if not self.ref:
self.ref = 'Z' + uuid4().hex
+ if self.merger_items is None:
+ items = [self.item] + self.dependent_items
+ items.reverse()
+ self.merger_items = [i.makeMergerItem() for i in items]
def getStateName(self, state_num):
return self.states_map.get(
@@ -1217,9 +1225,26 @@
return self.tries.get(job_name, 0)
def getMergeMode(self):
- if self.layout:
+ # We may be called before this build set has a shadow layout
+ # (ie, we are called to perform the merge to create that
+ # layout). It's possible that the change we are merging will
+ # update the merge-mode for the project, but there's not much
+ # we can do about that here. Instead, do the best we can by
+ # using the nearest shadow layout to determine the merge mode,
+ # or if that fails, the current live layout, or if that fails,
+ # use the default: merge-resolve.
+ item = self.item
+ layout = None
+ while item:
+ layout = item.current_build_set.layout
+ if layout:
+ break
+ item = item.item_ahead
+ if not layout:
+ layout = self.item.pipeline.layout
+ if layout:
project = self.item.change.project
- project_config = self.layout.project_configs.get(
+ project_config = layout.project_configs.get(
project.canonical_name)
if project_config:
return project_config.merge_mode
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a67973e..40d5eb7 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -138,16 +138,18 @@
:arg bool merged: Whether the merge succeeded (changes with refs).
:arg bool updated: Whether the repo was updated (changes without refs).
:arg str commit: The SHA of the merged commit (changes with refs).
+ :arg dict repo_state: The starting repo state before the merge.
"""
def __init__(self, build_set, zuul_url, merged, updated, commit,
- files):
+ files, repo_state):
self.build_set = build_set
self.zuul_url = zuul_url
self.merged = merged
self.updated = updated
self.commit = commit
self.files = files
+ self.repo_state = repo_state
class NodesProvisionedEvent(ResultEvent):
@@ -316,11 +318,11 @@
self.log.debug("Done adding complete event for build: %s" % build)
def onMergeCompleted(self, build_set, zuul_url, merged, updated,
- commit, files):
+ commit, files, repo_state):
self.log.debug("Adding merge complete event for build set: %s" %
build_set)
event = MergeCompletedEvent(build_set, zuul_url, merged,
- updated, commit, files)
+ updated, commit, files, repo_state)
self.result_event_queue.put(event)
self.wake_event.set()