Merge "Update trigger script for new zuul url parameter"
diff --git a/NEWS.rst b/NEWS.rst
index 52f5c4f..bd09bfe 100644
--- a/NEWS.rst
+++ b/NEWS.rst
@@ -1,9 +1,16 @@
Since 2.0.0:
* The push_change_refs option which specified that Zuul refs should be
- pushed to Gerrit has been removed. Similar functionality may be
- obtained using the replication feature. See the Triggers
- documentation for details.
+ pushed to Gerrit has been removed.
+
+* Git merge operations are now performed in a separate process. Run
+ at least one instance of the ``zuul-merger`` program which is now
+ included. Any number of Zuul-Mergers may be run in order to
+ distribute the work of speculatively merging changes into git and
+ serving the results to test workers. This system is designed to
+ scale out to many servers, but one instance may be co-located with
+ the Zuul server in smaller deployments. Several configuration
+ options have moved from the ``zuul`` section to ``merger``.
Since 1.3.0:
diff --git a/doc/source/gating.rst b/doc/source/gating.rst
index f3f2d3c..43a5928 100644
--- a/doc/source/gating.rst
+++ b/doc/source/gating.rst
@@ -28,6 +28,9 @@
Zuul was designed to handle the workflow of the OpenStack project, but
can be used with any project.
+Testing in parallel
+-------------------
+
A particular focus of Zuul is ensuring correctly ordered testing of
changes in parallel. A gating system should always test each change
applied to the tip of the branch exactly as it is going to be merged.
@@ -208,3 +211,72 @@
}
}
+
+Cross projects dependencies
+---------------------------
+
+When your projects are closely coupled together, you want to make sure
+changes entering the gate are going to be tested with the version of
+other projects currently enqueued in the gate (since they will
+eventually be merged and might introduce breaking features).
+
+Such dependencies can be defined in Zuul configuration by registering a job
+in a DependentPipeline of several projects. Whenever a change enters such a
+pipeline, it will create references for the other projects as well. As an
+example, given a main project ``acme`` and a plugin ``plugin`` you can
+define a job ``acme-tests`` which should be run for both projects:
+
+.. code-block:: yaml
+
+ pipelines:
+ - name: gate
+ manager: DependentPipelineManager
+
+ projects::
+ - name: acme
+ gate:
+ - acme-tests
+ - name: plugin
+ gate:
+ - acme-tests # Register job again
+
+Whenever a change enters the ``gate`` pipeline queue, Zuul creates a reference
+for it. For each subsequent change, an additional reference is created for the
+changes ahead in the queue. As a result, you will always be able to fetch the
+future state of your project dependencies for each change in the queue.
+
+Based on the pipeline and project definitions above, three changes are
+inserted in the ``gate`` pipeline with the associated references:
+
+ ======== ======= ====== =========
+ Change Project Branch Zuul Ref.
+ ======== ======= ====== =========
+ Change 1 acme master master/Z1
+ Change 2 plugin stable stable/Z2
+ Change 3 plugin master master/Z3
+ ======== ======= ====== =========
+
+Since the changes enter a DependentPipelineManager pipeline, Zuul creates
+additional references:
+
+ ====== ======= ========= =============================
+ Change Project Zuul Ref. Description
+ ====== ======= ========= =============================
+ 1 acme master/Z1 acme master + change 1
+ ------ ------- --------- -----------------------------
+ 2 acme master/Z2 acme master + change 1
+ 2 plugin stable/Z2 plugin stable + change 2
+ ------ ------- --------- -----------------------------
+ 3 acme master/Z3 acme master + change 1
+ 3 plugin stable/Z3 plugin stable + change 2
+ 3 plugin master/Z3 plugin master + change 3
+ ====== ======= ========= =============================
+
+In order to test change 3, you would clone both repositories and simply
+fetch the Z3 reference for each combination of project/branch you are
+interested in testing. For example, you could fetch ``acme`` with
+master/Z3 and ``plugin`` with master/Z3 and thus have ``acme`` with
+change 1 applied as the expected state for when Change 3 would merge.
+When your job fetches several repositories without changes ahead in the
+queue, they may not have a Z reference in which case you can just check
+out the branch.
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 4b7b4b0..c5beda0 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -20,6 +20,7 @@
gating
triggers
+ merger
launchers
reporters
zuul
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index c56d6e9..db49933 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -87,8 +87,8 @@
**ZUUL_PIPELINE**
The Zuul pipeline that is building this job
**ZUUL_URL**
- The url for the zuul server as configured in zuul.conf.
- A test runner may use this URL as the basis for fetching
+ The url for the zuul server as configured in zuul.conf.
+ A test runner may use this URL as the basis for fetching
git commits.
The following additional parameters will only be provided for builds
@@ -195,6 +195,30 @@
The URL with the status or results of the build. Will be used in
the status page and the final report.
+To help with debugging builds a worker may send back some optional
+metadata:
+
+**worker_name** (optional)
+ The name of the worker.
+
+**worker_hostname** (optional)
+ The hostname of the worker.
+
+**worker_ips** (optional)
+ A list of IPs for the worker.
+
+**worker_fqdn** (optional)
+ The FQDN of the worker.
+
+**worker_program** (optional)
+ The program name of the worker. For example Jenkins or turbo-hipster.
+
+**worker_version** (optional)
+ The version of the software running the job.
+
+**worker_extra** (optional)
+ A dictionary of any extra metadata you may want to pass along.
+
It should then immediately send a WORK_STATUS packet with a value of 0
percent complete. It may then optionally send subsequent WORK_STATUS
packets with updated completion values.
diff --git a/doc/source/merger.rst b/doc/source/merger.rst
new file mode 100644
index 0000000..4c445c6
--- /dev/null
+++ b/doc/source/merger.rst
@@ -0,0 +1,63 @@
+:title: Merger
+
+Merger
+======
+
+The Zuul Merger is a separate component which communicates with the
+main Zuul server via Gearman. Its purpose is to speculatively merge
+the changes for Zuul in preparation for testing. The resulting git
+commits also must be served to the test workers, and the server(s)
+running the Zuul Merger are expected to do this as well. Because both
+of these tasks are resource intensive, any number of Zuul Mergers can
+be run in parallel on distinct hosts.
+
+Configuration
+~~~~~~~~~~~~~
+
+The Zuul Merger can read the same zuul.conf file as the main Zuul
+server and requires the ``gearman``, ``gerrit``, ``merger``, and
+``zuul`` sections (indicated fields only). Be sure the zuul_url is
+set appropriately on each host that runs a zuul-merger.
+
+Zuul References
+~~~~~~~~~~~~~~~
+
+As the DependentPipelineManager may combine several changes together
+for testing when performing speculative execution, determining exactly
+how the workspace should be set up when running a Job can be complex.
+To alleviate this problem, Zuul performs merges itself, merging or
+cherry-picking changes as required and identifies the result with a
+Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
+Preparing the workspace is then a simple matter of fetching that ref
+and checking it out. The parameters that provide this information are
+described in :ref:`launchers`.
+
+These references need to be made available via a Git repository that
+is available to Jenkins. This is accomplished by serving Zuul's Git
+repositories directly.
+
+Serving Zuul Git Repos
+~~~~~~~~~~~~~~~~~~~~~~
+
+Zuul maintains its own copies of any needed Git repositories in the
+directory specified by ``git_dir`` in the ``merger`` section of
+zuul.conf (by default, /var/lib/zuul/git). To directly serve Zuul's
+Git repositories in order to provide Zuul refs for Jenkins, you can
+configure Apache to do so using the following directives::
+
+ SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git
+ SetEnv GIT_HTTP_EXPORT_ALL
+
+ AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1
+ AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1
+ ScriptAlias /p/ /usr/lib/git-core/git-http-backend/
+
+And set ``push_change_refs`` to ``false`` (the default) in the
+``zuul`` section of zuul.conf.
+
+Note that Zuul's Git repositories are not bare, which means they have
+a working tree, and are not suitable for public consumption (for
+instance, a clone will produce a repository in an unpredictable state
+depending on what the state of Zuul's repository is when the clone
+happens). They are, however, suitable for automated systems that
+respond to Zuul triggers.
diff --git a/doc/source/triggers.rst b/doc/source/triggers.rst
index 21aab13..c4485bf 100644
--- a/doc/source/triggers.rst
+++ b/doc/source/triggers.rst
@@ -35,79 +35,6 @@
be added to Gerrit. Zuul is very flexible and can take advantage of
those.
-Zuul References
-~~~~~~~~~~~~~~~
-
-As the DependentPipelineManager may combine several changes together
-for testing when performing speculative execution, determining exactly
-how the workspace should be set up when running a Job can be complex.
-To alleviate this problem, Zuul performs merges itself, merging or
-cherry-picking changes as required and identifies the result with a
-Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
-Preparing the workspace is then a simple matter of fetching that ref
-and checking it out. The parameters that provide this information are
-described in :ref:`launchers`.
-
-These references need to be made available via a Git repository that
-is available to Jenkins. You may accomplish this by either serving
-Zuul's git repositories directly, allowing Zuul to push the references
-back to Gerrit, or pushing the references to a third location.
-Instructions for each of these alternatives are in the following
-sections.
-
-Serving Zuul Git Repos
-""""""""""""""""""""""
-
-Zuul maintains its own copies of any needed Git repositories in the
-directory specified by ``git_dir`` in the ``zuul`` section of
-zuul.conf (by default, /var/lib/zuul/git). If you want to serve
-Zuul's Git repositories in order to provide Zuul refs for Jenkins, you
-can configure Apache to do so using the following directives::
-
- SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git
- SetEnv GIT_HTTP_EXPORT_ALL
-
- AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1
- AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1
- ScriptAlias /p/ /usr/lib/git-core/git-http-backend/
-
-And set ``push_change_refs`` to ``false`` (the default) in the
-``zuul`` section of zuul.conf.
-
-Note that Zuul's Git repositories are not bare, which means they have
-a working tree, and are not suitable for public consumption (for
-instance, a clone will produce a repository in an unpredictable state
-depending on what the state of Zuul's repository is when the clone
-happens). They are, however, suitable for automated systems that
-respond to Zuul triggers.
-
-Pushing to Gerrit
-"""""""""""""""""
-
-If you want to push Zuul refs back to Gerrit, set the following
-permissions for your project (or ``All-Projects``) in Gerrit (where
-``CI Tools`` is a group of which the user you created above is a
-member)::
-
- [access "refs/zuul/*"]
- create = group CI Tools
- push = +force CI Tools
- pushMerge = group CI Tools
- forgeAuthor = group CI Tools
- [access "refs/for/refs/zuul/*"]
- pushMerge = group CI Tools
-
-And set the following in ``zuul.conf``:
-
- [replication]
- url1=ssh://user@review.example.com:29418/
-
-Pushing to Another Location
-"""""""""""""""""""""""""""
-
-Simply set one or more destination URLs in the ``replication`` section
-of zuul.conf as above.
-
Timer
-----
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index e560a45..1a6a23d 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -83,21 +83,49 @@
""""
**layout_config**
- Path to layout config file.
+ Path to layout config file. Used by zuul-server only.
``layout_config=/etc/zuul/layout.yaml``
**log_config**
- Path to log config file.
+ Path to log config file. Used by zuul-server only.
``log_config=/etc/zuul/logging.yaml``
**pidfile**
- Path to PID lock file.
+ Path to PID lock file. Used by zuul-server only.
``pidfile=/var/run/zuul/zuul.pid``
**state_dir**
- Path to directory that Zuul should save state to.
+ Path to directory that Zuul should save state to. Used by all Zuul
+ commands.
``state_dir=/var/lib/zuul``
+**report_times**
+ Boolean value (``true`` or ``false``) that determines if Zuul should
+ include elapsed times for each job in the textual report. Used by
+ zuul-server only.
+ ``report_times=true``
+
+**status_url**
+ URL that will be posted in Zuul comments made to Gerrit changes when
+ starting jobs for a change. Used by zuul-server only.
+ ``status_url=https://zuul.example.com/status``
+
+**url_pattern**
+ If you are storing build logs external to the system that originally
+ ran jobs and wish to link to those logs when Zuul makes comments on
+ Gerrit changes for completed jobs this setting configures what the
+ URLs for those links should be. Used by zuul-server only.
+ ``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}``
+
+**job_name_in_report**
+ Boolean value (``true`` or ``false``) that indicates whether the
+ job name should be included in the report (normally only the URL
+ is included). Defaults to ``false``. Used by zuul-server only.
+ ``job_name_in_report=true``
+
+merger
+""""""
+
**git_dir**
Directory that Zuul should clone local git repositories to.
``git_dir=/var/lib/zuul/git``
@@ -110,32 +138,18 @@
Optional: Value to pass to `git config user.name`.
``git_user_name=zuul``
-**report_times**
- Boolean value (``true`` or ``false``) that determines if Zuul should
- include elapsed times for each job in the textual report.
- ``report_times=true``
-
-**status_url**
- URL that will be posted in Zuul comments made to Gerrit changes when
- starting jobs for a change.
- ``status_url=https://zuul.example.com/status``
-
-**url_pattern**
- If you are storing build logs external to the system that originally
- ran jobs and wish to link to those logs when Zuul makes comments on
- Gerrit changes for completed jobs this setting configures what the
- URLs for those links should be.
- ``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}``
-
-**job_name_in_report**
- Boolean value (``true`` or ``false``) that indicates whether the
- job name should be included in the report (normally only the URL
- is included). Defaults to ``false``.
- ``job_name_in_report=true``
-
**zuul_url**
- URL of Zuul's git repos, accessible to test workers.
- Usually "http://zuul.example.com/p".
+ URL of this merger's git repos, accessible to test workers. Usually
+ "http://zuul.example.com/p" or "http://zuul-merger01.example.com/p"
+ depending on whether the merger is co-located with the Zuul server.
+
+**log_config**
+ Path to log config file for the merger process.
+ ``log_config=/etc/zuul/logging.yaml``
+
+**pidfile**
+ Path to PID lock file for the merger process.
+ ``pidfile=/var/run/zuul-merger/merger.pid``
smtp
""""
@@ -154,16 +168,6 @@
This can be overridden by individual pipelines.
``default_to=you@example.com``
-replication
-"""""""""""
-
-Zuul can push the refs it creates to any number of servers. To do so,
-list the git push URLs in this section, one per line as follows::
-
- [replication]
- url1=ssh://user@host1.example.com:port/path/to/repo
- url2=ssh://user@host2.example.com:port/path/to/repo
-
layout.yaml
~~~~~~~~~~~
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index a4d1390..75c84e4 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -15,10 +15,12 @@
log_config=/etc/zuul/logging.conf
pidfile=/var/run/zuul/zuul.pid
state_dir=/var/lib/zuul
+status_url=https://jenkins.example.com/zuul/status
+
+[merger]
git_dir=/var/lib/zuul/git
;git_user_email=zuul@example.com
;git_user_name=zuul
-status_url=https://jenkins.example.com/zuul/status
zuul_url=http://zuul.example.com/p
[smtp]
diff --git a/requirements.txt b/requirements.txt
index 170b5152..92bb296 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,5 +13,5 @@
extras
statsd>=1.0.0,<3.0
voluptuous>=0.7
-gear>=0.4.0,<1.0.0
+gear>=0.5.1,<1.0.0
apscheduler>=2.1.1,<3.0
diff --git a/setup.cfg b/setup.cfg
index 9ff62d6..21b1199 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -22,6 +22,7 @@
[entry_points]
console_scripts =
zuul-server = zuul.cmd.server:main
+ zuul-merger = zuul.cmd.merger:main
zuul = zuul.cmd.client:main
[build_sphinx]
diff --git a/tests/fixtures/layout-no-jobs.yaml b/tests/fixtures/layout-no-jobs.yaml
new file mode 100644
index 0000000..ee8dc62
--- /dev/null
+++ b/tests/fixtures/layout-no-jobs.yaml
@@ -0,0 +1,43 @@
+includes:
+ - python-file: custom_functions.py
+
+pipelines:
+ - name: check
+ manager: IndependentPipelineManager
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ verified: 1
+ failure:
+ gerrit:
+ verified: -1
+
+ - name: gate
+ manager: DependentPipelineManager
+ failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - approved: 1
+ success:
+ gerrit:
+ verified: 2
+ submit: true
+ failure:
+ gerrit:
+ verified: -2
+ start:
+ gerrit:
+ verified: 0
+ precedence: high
+
+projects:
+ - name: org/project
+ merge-mode: cherry-pick
+ check:
+ - noop
+ gate:
+ - noop
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 98dfe86..b1c94de 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -231,3 +231,7 @@
- conflict-project-merge:
- conflict-project-test1
- conflict-project-test2
+
+ - name: org/noop-project
+ gate:
+ - noop
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index f77e07e..bee06e4 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -8,11 +8,13 @@
[zuul]
layout_config=layout.yaml
+url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
+job_name_in_report=true
+
+[merger]
git_dir=/tmp/zuul-test/git
git_user_email=zuul@example.com
git_user_name=zuul
-url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
-job_name_in_report=true
zuul_url=http://zuul.example.com/p
[smtp]
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 9787ae1..9576440 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -47,6 +47,8 @@
import zuul.rpclistener
import zuul.rpcclient
import zuul.launcher.gearman
+import zuul.merger.server
+import zuul.merger.client
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
@@ -66,9 +68,10 @@
def repack_repo(path):
- output = subprocess.Popen(
- ['git', '--git-dir=%s/.git' % path, 'repack', '-afd'],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
+ output = subprocess.Popen(cmd, close_fds=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
out = output.communicate()
if output.returncode:
raise Exception("git repack returned %d" % output.returncode)
@@ -158,6 +161,7 @@
repo.head.reference = 'master'
repo.head.reset(index=True, working_tree=True)
repo.git.clean('-x', '-f', '-d')
+ repo.heads['master'].checkout()
return r
def addPatchset(self, files=[], large=False):
@@ -494,12 +498,23 @@
'name': self.name,
'number': self.number,
'manager': self.worker.worker_id,
+ 'worker_name': 'My Worker',
+ 'worker_hostname': 'localhost',
+ 'worker_ips': ['127.0.0.1', '192.168.1.1'],
+ 'worker_fqdn': 'zuul.example.org',
+ 'worker_program': 'FakeBuilder',
+ 'worker_version': 'v1.1',
+ 'worker_extra': {'something': 'else'}
}
+ self.log.debug('Running build %s' % self.unique)
+
self.job.sendWorkData(json.dumps(data))
+ self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
self.job.sendWorkStatus(0, 100)
if self.worker.hold_jobs_in_build:
+ self.log.debug('Holding build %s' % self.unique)
self._wait()
self.log.debug("Build %s continuing" % self.unique)
@@ -762,7 +777,7 @@
self.upstream_root = os.path.join(self.test_root, "upstream")
self.git_root = os.path.join(self.test_root, "git")
- CONFIG.set('zuul', 'git_dir', self.git_root)
+ CONFIG.set('merger', 'git_dir', self.git_root)
if os.path.exists(self.test_root):
shutil.rmtree(self.test_root)
os.makedirs(self.test_root)
@@ -802,6 +817,9 @@
self.worker.addServer('127.0.0.1', self.gearman_server.port)
self.gearman_server.worker = self.worker
+ self.merge_server = zuul.merger.server.MergeServer(self.config)
+ self.merge_server.start()
+
self.sched = zuul.scheduler.Scheduler()
def URLOpenerFactory(*args, **kw):
@@ -810,6 +828,8 @@
urllib2.urlopen = URLOpenerFactory
self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
+ self.merge_client = zuul.merger.client.MergeClient(
+ self.config, self.sched)
self.smtp_messages = []
@@ -831,6 +851,7 @@
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.setLauncher(self.launcher)
+ self.sched.setMerger(self.merge_client)
self.sched.registerTrigger(self.gerrit)
self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
self.sched.registerTrigger(self.timer)
@@ -871,6 +892,9 @@
def shutdown(self):
self.log.debug("Shutting down after tests")
self.launcher.stop()
+ self.merge_server.stop()
+ self.merge_server.join()
+ self.merge_client.stop()
self.worker.shutdown()
self.gearman_server.shutdown()
self.gerrit.stop()
@@ -944,11 +968,17 @@
repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
commit_messages = ['%s-1' % commit.subject for commit in commits]
+ self.log.debug("Checking if job %s has changes; commit_messages %s;"
+ " repo_messages %s; sha %s" % (job, commit_messages,
+ repo_messages, sha))
for msg in commit_messages:
if msg not in repo_messages:
+ self.log.debug(" messages do not match")
return False
if repo_shas[0] != sha:
+ self.log.debug(" sha does not match")
return False
+ self.log.debug(" OK")
return True
def registerJobs(self):
@@ -983,13 +1013,15 @@
done = True
for connection in self.gearman_server.active_connections:
if (connection.functions and
- connection.client_id != 'Zuul RPC Listener'):
+ connection.client_id not in ['Zuul RPC Listener',
+ 'Zuul Merger']):
done = False
if done:
break
time.sleep(0)
self.gearman_server.functions = set()
self.rpc.register()
+ self.merge_server.register()
def haveAllBuildsReported(self):
# See if Zuul is waiting on a meta job to complete
@@ -1075,13 +1107,17 @@
self.fake_gerrit.event_queue.join()
self.sched.trigger_event_queue.join()
self.sched.result_event_queue.join()
+ self.sched.run_handler_lock.acquire()
if (self.sched.trigger_event_queue.empty() and
self.sched.result_event_queue.empty() and
self.fake_gerrit.event_queue.empty() and
+ not self.merge_client.build_sets and
self.areAllBuildsWaiting()):
+ self.sched.run_handler_lock.release()
self.worker.lock.release()
self.log.debug("...settled.")
return
+ self.sched.run_handler_lock.release()
self.worker.lock.release()
self.sched.wake_event.wait(0.1)
@@ -2343,6 +2379,10 @@
def test_merger_repack_large_change(self):
"Test that the merger works with large changes after a repack"
# https://bugs.launchpad.net/zuul/+bug/1078946
+ # This test assumes the repo is already cloned; make sure it is
+ url = self.sched.triggers['gerrit'].getGitUrl(
+ self.sched.layout.projects['org/project1'])
+ self.merge_server.merger.addProject('org/project1', url)
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.addPatchset(large=True)
path = os.path.join(self.upstream_root, "org/project1")
@@ -2464,7 +2504,7 @@
def test_zuul_url_return(self):
"Test if ZUUL_URL is returning when zuul_url is set in zuul.conf"
- self.assertTrue(self.sched.config.has_option('zuul', 'zuul_url'))
+ self.assertTrue(self.sched.config.has_option('merger', 'zuul_url'))
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -2784,6 +2824,29 @@
self.assertReportedStat('test-timing', '3|ms')
self.assertReportedStat('test-guage', '12|g')
+ def test_stuck_job_cleanup(self):
+ "Test that pending jobs are cleaned up if removed from layout"
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+ self.assertEqual(len(self.gearman_server.getQueue()), 1)
+
+ self.config.set('zuul', 'layout_config',
+ 'tests/fixtures/layout-no-jobs.yaml')
+ self.sched.reconfigure(self.config)
+ self.waitUntilSettled()
+
+ self.gearman_server.release('noop')
+ self.waitUntilSettled()
+ self.assertEqual(len(self.gearman_server.getQueue()), 0)
+ self.assertTrue(self.sched._areAllBuildsComplete())
+
+ self.assertEqual(len(self.history), 1)
+ self.assertEqual(self.history[0].name, 'noop')
+ self.assertEqual(self.history[0].result, 'SUCCESS')
+
def test_file_jobs(self):
"Test that file jobs run only when appropriate"
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -3025,39 +3088,6 @@
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
- def test_push_urls(self):
- "Test that Zuul can push refs to multiple URLs"
- upstream_path = os.path.join(self.upstream_root, 'org/project')
- replica1 = os.path.join(self.upstream_root, 'replica1')
- replica2 = os.path.join(self.upstream_root, 'replica2')
-
- self.config.add_section('replication')
- self.config.set('replication', 'url1', 'file://%s' % replica1)
- self.config.set('replication', 'url2', 'file://%s' % replica2)
- self.sched.reconfigure(self.config)
-
- r1 = git.Repo.clone_from(upstream_path, replica1 + '/org/project.git')
- r2 = git.Repo.clone_from(upstream_path, replica2 + '/org/project.git')
-
- A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
- A.addApproval('CRVW', 2)
- self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
- B = self.fake_gerrit.addFakeChange('org/project', 'mp', 'B')
- B.addApproval('CRVW', 2)
- self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
- self.waitUntilSettled()
- count = 0
- for ref in r1.refs:
- if ref.path.startswith('refs/zuul'):
- count += 1
- self.assertEqual(count, 3)
-
- count = 0
- for ref in r2.refs:
- if ref.path.startswith('refs/zuul'):
- count += 1
- self.assertEqual(count, 3)
-
def test_timer(self):
"Test that a periodic job is triggered"
self.worker.hold_jobs_in_build = True
@@ -3274,6 +3304,7 @@
self.assertEqual(
enqueue_times[str(item.change)], item.enqueue_time)
+ self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
@@ -3340,6 +3371,7 @@
r = client.promote(pipeline='gate',
change_ids=['3,1'])
+ self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
@@ -3570,3 +3602,41 @@
self.assertEqual(queue.window, 2)
self.assertEqual(queue.window_floor, 1)
self.assertEqual(C.data['status'], 'MERGED')
+
+ def test_worker_update_metadata(self):
+ "Test if a worker can send back metadata about itself"
+ self.worker.hold_jobs_in_build = True
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.launcher.builds), 1)
+
+ self.log.debug('Current builds:')
+ self.log.debug(self.launcher.builds)
+
+ start = time.time()
+ while True:
+ if time.time() - start > 10:
+ raise Exception("Timeout waiting for gearman server to report "
+ + "back to the client")
+ build = self.launcher.builds.values()[0]
+ if build.worker.name == "My Worker":
+ break
+ else:
+ time.sleep(0)
+
+ self.log.debug(build)
+ self.assertEqual("My Worker", build.worker.name)
+ self.assertEqual("localhost", build.worker.hostname)
+ self.assertEqual(['127.0.0.1', '192.168.1.1'], build.worker.ips)
+ self.assertEqual("zuul.example.org", build.worker.fqdn)
+ self.assertEqual("FakeBuilder", build.worker.program)
+ self.assertEqual("v1.1", build.worker.version)
+ self.assertEqual({'something': 'else'}, build.worker.extra)
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
new file mode 100644
index 0000000..f046235
--- /dev/null
+++ b/zuul/cmd/merger.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013-2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import ConfigParser
+import daemon
+import extras
+
+# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
+# instead it depends on lockfile-0.9.1 which uses pidfile.
+pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
+
+import logging
+import logging.config
+import os
+import sys
+import signal
+import traceback
+
+# No zuul imports here because they pull in paramiko which must not be
+# imported until after the daemonization.
+# https://github.com/paramiko/paramiko/issues/59
+# Similar situation with gear and statsd.
+
+
+def stack_dump_handler(signum, frame):
+ signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+ log_str = ""
+ for thread_id, stack_frame in sys._current_frames().items():
+ log_str += "Thread: %s\n" % thread_id
+ log_str += "".join(traceback.format_stack(stack_frame))
+ log = logging.getLogger("zuul.stack_dump")
+ log.debug(log_str)
+ signal.signal(signal.SIGUSR2, stack_dump_handler)
+
+
+class Merger(object):
+ def __init__(self):
+ self.args = None
+ self.config = None
+
+ def parse_arguments(self):
+ parser = argparse.ArgumentParser(description='Zuul merge worker.')
+ parser.add_argument('-c', dest='config',
+ help='specify the config file')
+ parser.add_argument('-d', dest='nodaemon', action='store_true',
+ help='do not run as a daemon')
+ parser.add_argument('--version', dest='version', action='store_true',
+ help='show zuul version')
+ self.args = parser.parse_args()
+
+ def read_config(self):
+ self.config = ConfigParser.ConfigParser()
+ if self.args.config:
+ locations = [self.args.config]
+ else:
+ locations = ['/etc/zuul/zuul.conf',
+ '~/zuul.conf']
+ for fp in locations:
+ if os.path.exists(os.path.expanduser(fp)):
+ self.config.read(os.path.expanduser(fp))
+ return
+ raise Exception("Unable to locate config file in %s" % locations)
+
+ def setup_logging(self, section, parameter):
+ if self.config.has_option(section, parameter):
+ fp = os.path.expanduser(self.config.get(section, parameter))
+ if not os.path.exists(fp):
+ raise Exception("Unable to read logging config file at %s" %
+ fp)
+ logging.config.fileConfig(fp)
+ else:
+ logging.basicConfig(level=logging.DEBUG)
+
+ def exit_handler(self, signum, frame):
+ signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+ self.merger.stop()
+ self.merger.join()
+
+ def main(self):
+ # See comment at top of file about zuul imports
+ import zuul.merger.server
+
+ self.setup_logging('merger', 'log_config')
+
+ self.merger = zuul.merger.server.MergeServer(self.config)
+ self.merger.start()
+
+ signal.signal(signal.SIGUSR1, self.exit_handler)
+ signal.signal(signal.SIGUSR2, stack_dump_handler)
+ while True:
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ print "Ctrl + C: asking merger to exit nicely...\n"
+ self.exit_handler(signal.SIGINT, None)
+
+
+def main():
+ server = Merger()
+ server.parse_arguments()
+
+ if server.args.version:
+ from zuul.version import version_info as zuul_version_info
+ print "Zuul version: %s" % zuul_version_info.version_string()
+ sys.exit(0)
+
+ server.read_config()
+
+ if server.config.has_option('zuul', 'state_dir'):
+ state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
+ else:
+ state_dir = '/var/lib/zuul'
+ test_fn = os.path.join(state_dir, 'test')
+ try:
+ f = open(test_fn, 'w')
+ f.close()
+ os.unlink(test_fn)
+ except Exception:
+ print
+ print "Unable to write to state directory: %s" % state_dir
+ print
+ raise
+
+ if server.config.has_option('merger', 'pidfile'):
+ pid_fn = os.path.expanduser(server.config.get('merger', 'pidfile'))
+ else:
+ pid_fn = '/var/run/zuul-merger/merger.pid'
+ pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
+
+ if server.args.nodaemon:
+ server.main()
+ else:
+ with daemon.DaemonContext(pidfile=pid):
+ server.main()
+
+
+if __name__ == "__main__":
+ sys.path.insert(0, '.')
+ main()
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 7901535..79a2538 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -154,7 +154,13 @@
os.close(pipe_write)
self.setup_logging('gearman_server', 'log_config')
import gear
- gear.Server(4730)
+ statsd_host = os.environ.get('STATSD_HOST')
+ statsd_port = int(os.environ.get('STATSD_PORT', 8125))
+ gear.Server(4730,
+ statsd_host=statsd_host,
+ statsd_port=statsd_port,
+ statsd_prefix='zuul.geard')
+
# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
pipe_read.read()
@@ -172,6 +178,7 @@
# See comment at top of file about zuul imports
import zuul.scheduler
import zuul.launcher.gearman
+ import zuul.merger.client
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
@@ -184,10 +191,12 @@
self.start_gear_server()
self.setup_logging('zuul', 'log_config')
+ self.log = logging.getLogger("zuul.Server")
self.sched = zuul.scheduler.Scheduler()
gearman = zuul.launcher.gearman.Gearman(self.config, self.sched)
+ merger = zuul.merger.client.MergeClient(self.config, self.sched)
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
timer = zuul.trigger.timer.Timer(self.config, self.sched)
webapp = zuul.webapp.WebApp(self.sched)
@@ -205,15 +214,19 @@
)
self.sched.setLauncher(gearman)
+ self.sched.setMerger(merger)
self.sched.registerTrigger(gerrit)
self.sched.registerTrigger(timer)
self.sched.registerReporter(gerrit_reporter)
self.sched.registerReporter(smtp_reporter)
+ self.log.info('Starting scheduler')
self.sched.start()
self.sched.reconfigure(self.config)
self.sched.resume()
+ self.log.info('Starting Webapp')
webapp.start()
+ self.log.info('Starting RPC')
rpc.start()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
@@ -243,21 +256,6 @@
path = None
sys.exit(server.test_config(path))
- if server.config.has_option('zuul', 'state_dir'):
- state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
- else:
- state_dir = '/var/lib/zuul'
- test_fn = os.path.join(state_dir, 'test')
- try:
- f = open(test_fn, 'w')
- f.close()
- os.unlink(test_fn)
- except:
- print
- print "Unable to write to state directory: %s" % state_dir
- print
- raise
-
if server.config.has_option('zuul', 'pidfile'):
pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile'))
else:
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 3500445..3a690dc 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -155,7 +155,6 @@
self.sched = sched
self.builds = {}
self.meta_jobs = {} # A list of meta-jobs like stop or describe
- self.zuul_server = config.get('zuul', 'zuul_url')
server = config.get('gearman', 'server')
if config.has_option('gearman', 'port'):
@@ -226,7 +225,7 @@
params = dict(ZUUL_UUID=uuid,
ZUUL_PROJECT=item.change.project.name)
params['ZUUL_PIPELINE'] = pipeline.name
- params['ZUUL_URL'] = self.zuul_server
+ params['ZUUL_URL'] = item.current_build_set.zuul_url
if hasattr(item.change, 'refspec'):
changes_str = '^'.join(
['%s:%s:%s' % (i.change.project.name, i.change.branch,
@@ -299,7 +298,7 @@
if not self.isJobRegistered(gearman_job.name):
self.log.error("Job %s is not registered with Gearman" %
gearman_job)
- self.onBuildCompleted(gearman_job, 'LOST')
+ self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
return build
if pipeline.precedence == zuul.model.PRECEDENCE_NORMAL:
@@ -313,14 +312,14 @@
self.gearman.submitJob(gearman_job, precedence=precedence)
except Exception:
self.log.exception("Unable to submit job to Gearman")
- self.onBuildCompleted(gearman_job, 'LOST')
+ self.onBuildCompleted(gearman_job, 'EXCEPTION')
return build
if not gearman_job.handle:
self.log.error("No job handle was received for %s after 30 seconds"
" marking as lost." %
gearman_job)
- self.onBuildCompleted(gearman_job, 'LOST')
+ self.onBuildCompleted(gearman_job, 'NO_HANDLE')
return build
@@ -381,6 +380,8 @@
if build:
# Allow URL to be updated
build.url = data.get('url') or build.url
+ # Update information about worker
+ build.worker.updateFromData(data)
if build.number is None:
self.log.info("Build %s started" % job)
@@ -395,7 +396,7 @@
def onDisconnect(self, job):
self.log.info("Gearman job %s lost due to disconnect" % job)
- self.onBuildCompleted(job, 'LOST')
+ self.onBuildCompleted(job)
def onUnknownJob(self, job):
self.log.info("Gearman job %s lost due to unknown handle" % job)
diff --git a/zuul/merger.py b/zuul/merger.py
deleted file mode 100644
index a580a21..0000000
--- a/zuul/merger.py
+++ /dev/null
@@ -1,309 +0,0 @@
-# Copyright 2012 Hewlett-Packard Development Company, L.P.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import git
-import os
-import logging
-import model
-import threading
-
-
-class ZuulReference(git.Reference):
- _common_path_default = "refs/zuul"
- _points_to_commits_only = True
-
-
-class Repo(object):
- log = logging.getLogger("zuul.Repo")
-
- def __init__(self, remote, local, email, username):
- self.remote_url = remote
- self.local_path = local
- self.email = email
- self.username = username
- self._initialized = False
- try:
- self._ensure_cloned()
- except:
- self.log.exception("Unable to initialize repo for %s" % remote)
-
- def _ensure_cloned(self):
- repo_is_cloned = os.path.exists(self.local_path)
- if self._initialized and repo_is_cloned:
- return
- # If the repo does not exist, clone the repo.
- if not repo_is_cloned:
- self.log.debug("Cloning from %s to %s" % (self.remote_url,
- self.local_path))
- git.Repo.clone_from(self.remote_url, self.local_path)
- repo = git.Repo(self.local_path)
- if self.email:
- repo.config_writer().set_value('user', 'email',
- self.email)
- if self.username:
- repo.config_writer().set_value('user', 'name',
- self.username)
- repo.config_writer().write()
- self._initialized = True
-
- def createRepoObject(self):
- try:
- self._ensure_cloned()
- repo = git.Repo(self.local_path)
- except:
- self.log.exception("Unable to initialize repo for %s" %
- self.local_path)
- return repo
-
- def reset(self):
- repo = self.createRepoObject()
- self.log.debug("Resetting repository %s" % self.local_path)
- self.update()
- origin = repo.remotes.origin
- for ref in origin.refs:
- if ref.remote_head == 'HEAD':
- continue
- repo.create_head(ref.remote_head, ref, force=True)
-
- # Reset to remote HEAD (usually origin/master)
- repo.head.reference = origin.refs['HEAD']
- repo.head.reset(index=True, working_tree=True)
- repo.git.clean('-x', '-f', '-d')
-
- def getBranchHead(self, branch):
- repo = self.createRepoObject()
- branch_head = repo.heads[branch]
- return branch_head
-
- def checkout(self, ref):
- repo = self.createRepoObject()
- self.log.debug("Checking out %s" % ref)
- repo.head.reference = ref
- repo.head.reset(index=True, working_tree=True)
-
- def cherryPick(self, ref):
- repo = self.createRepoObject()
- self.log.debug("Cherry-picking %s" % ref)
- self.fetch(ref)
- repo.git.cherry_pick("FETCH_HEAD")
-
- def merge(self, ref, strategy=None):
- repo = self.createRepoObject()
- args = []
- if strategy:
- args += ['-s', strategy]
- args.append('FETCH_HEAD')
- self.fetch(ref)
- self.log.debug("Merging %s with args %s" % (ref, args))
- repo.git.merge(*args)
-
- def fetch(self, ref):
- repo = self.createRepoObject()
- # The git.remote.fetch method may read in git progress info and
- # interpret it improperly causing an AssertionError. Because the
- # data was fetched properly subsequent fetches don't seem to fail.
- # So try again if an AssertionError is caught.
- origin = repo.remotes.origin
- try:
- origin.fetch(ref)
- except AssertionError:
- origin.fetch(ref)
-
- def createZuulRef(self, ref, commit='HEAD'):
- repo = self.createRepoObject()
- self.log.debug("CreateZuulRef %s at %s " % (ref, commit))
- ref = ZuulReference.create(repo, ref, commit)
- return ref.commit
-
- def push(self, local, remote):
- repo = self.createRepoObject()
- self.log.debug("Pushing %s:%s to %s " % (local, remote,
- self.remote_url))
- repo.remotes.origin.push('%s:%s' % (local, remote))
-
- def push_url(self, url, refspecs):
- repo = self.createRepoObject()
- self.log.debug("Pushing %s to %s" % (refspecs, url))
- repo.git.push([url] + refspecs)
-
- def update(self):
- repo = self.createRepoObject()
- self.log.debug("Updating repository %s" % self.local_path)
- origin = repo.remotes.origin
- origin.update()
-
-
-class Merger(object):
- log = logging.getLogger("zuul.Merger")
-
- def __init__(self, working_root, sshkey, email, username, replicate_urls):
- self.repos = {}
- self.working_root = working_root
- if not os.path.exists(working_root):
- os.makedirs(working_root)
- if sshkey:
- self._makeSSHWrapper(sshkey)
- self.email = email
- self.username = username
- self.replicate_urls = replicate_urls
-
- def _makeSSHWrapper(self, key):
- name = os.path.join(self.working_root, '.ssh_wrapper')
- fd = open(name, 'w')
- fd.write('#!/bin/bash\n')
- fd.write('ssh -i %s $@\n' % key)
- fd.close()
- os.chmod(name, 0755)
- os.environ['GIT_SSH'] = name
-
- def addProject(self, project, url):
- try:
- path = os.path.join(self.working_root, project.name)
- repo = Repo(url, path, self.email, self.username)
-
- self.repos[project] = repo
- except:
- self.log.exception("Unable to add project %s" % project)
-
- def getRepo(self, project):
- return self.repos.get(project, None)
-
- def updateRepo(self, project):
- repo = self.getRepo(project)
- try:
- self.log.info("Updating local repository %s", project)
- repo.update()
- except:
- self.log.exception("Unable to update %s", project)
-
- def _mergeChange(self, change, ref, target_ref):
- repo = self.getRepo(change.project)
- try:
- repo.checkout(ref)
- except:
- self.log.exception("Unable to checkout %s" % ref)
- return False
-
- try:
- mode = change.project.merge_mode
- if mode == model.MERGER_MERGE:
- repo.merge(change.refspec)
- elif mode == model.MERGER_MERGE_RESOLVE:
- repo.merge(change.refspec, 'resolve')
- elif mode == model.MERGER_CHERRY_PICK:
- repo.cherryPick(change.refspec)
- else:
- raise Exception("Unsupported merge mode: %s" % mode)
- except Exception:
- # Log exceptions at debug level because they are
- # usually benign merge conflicts
- self.log.debug("Unable to merge %s" % change, exc_info=True)
- return False
-
- try:
- # Keep track of the last commit, it's the commit that
- # will be passed to jenkins because it's the commit
- # for the triggering change
- zuul_ref = change.branch + '/' + target_ref
- commit = repo.createZuulRef(zuul_ref, 'HEAD').hexsha
- except:
- self.log.exception("Unable to set zuul ref %s for change %s" %
- (zuul_ref, change))
- return False
- return commit
-
- def replicateRefspecs(self, refspecs):
- threads = []
- for url in self.replicate_urls:
- t = threading.Thread(target=self._replicate,
- args=(url, refspecs))
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
-
- def _replicate(self, url, project_refspecs):
- try:
- for project, refspecs in project_refspecs.items():
- repo = self.getRepo(project)
- repo.push_url(os.path.join(url, project.name + '.git'),
- refspecs)
- except Exception:
- self.log.exception("Exception pushing to %s" % url)
-
- def mergeChanges(self, items, target_ref=None):
- # Merge shortcuts:
- # if this is the only change just merge it against its branch.
- # elif there are changes ahead of us that are from the same project and
- # branch we can merge against the commit associated with that change
- # instead of going back up the tree.
- #
- # Shortcuts assume some external entity is checking whether or not
- # changes from other projects can merge.
- commit = False
- item = items[-1]
- sibling_filter = lambda i: (i.change.project == item.change.project and
- i.change.branch == item.change.branch)
- sibling_items = filter(sibling_filter, items)
- # Only current change to merge against tip of change.branch
- if len(sibling_items) == 1:
- repo = self.getRepo(item.change.project)
- # we need to reset here in order to call getBranchHead
- try:
- repo.reset()
- except:
- self.log.exception("Unable to reset repo %s" % repo)
- return False
- commit = self._mergeChange(item.change,
- repo.getBranchHead(item.change.branch),
- target_ref=target_ref)
- # Sibling changes exist. Merge current change against newest sibling.
- elif (len(sibling_items) >= 2 and
- sibling_items[-2].current_build_set.commit):
- last_commit = sibling_items[-2].current_build_set.commit
- commit = self._mergeChange(item.change, last_commit,
- target_ref=target_ref)
- # Either change did not merge or we did not need to merge as there were
- # previous merge conflicts.
- if not commit:
- return commit
-
- project_branches = []
- replicate_refspecs = {}
- for i in reversed(items):
- # Here we create all of the necessary zuul refs and potentially
- # push them back to Gerrit.
- if (i.change.project, i.change.branch) in project_branches:
- continue
- repo = self.getRepo(i.change.project)
- if (i.change.project != item.change.project or
- i.change.branch != item.change.branch):
- # Create a zuul ref for all dependent changes project
- # branch combinations as this is the ref that jenkins will
- # use to test. The ref for change has already been set so
- # we skip it here.
- try:
- zuul_ref = i.change.branch + '/' + target_ref
- repo.createZuulRef(zuul_ref, i.current_build_set.commit)
- except:
- self.log.exception("Unable to set zuul ref %s for "
- "change %s" % (zuul_ref, i.change))
- return False
- ref = 'refs/zuul/' + i.change.branch + '/' + target_ref
- refspecs = replicate_refspecs.get(i.change.project, [])
- refspecs.append('%s:%s' % (ref, ref))
- replicate_refspecs[i.change.project] = refspecs
- project_branches.append((i.change.project, i.change.branch))
- self.replicateRefspecs(replicate_refspecs)
- return commit
diff --git a/zuul/merger/__init__.py b/zuul/merger/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/merger/__init__.py
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
new file mode 100644
index 0000000..72fd4c5
--- /dev/null
+++ b/zuul/merger/client.py
@@ -0,0 +1,117 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+from uuid import uuid4
+
+import gear
+
+
+def getJobData(job):
+ if not len(job.data):
+ return {}
+ d = job.data[-1]
+ if not d:
+ return {}
+ return json.loads(d)
+
+
+class MergeGearmanClient(gear.Client):
+ def __init__(self, merge_client):
+ super(MergeGearmanClient, self).__init__()
+ self.__merge_client = merge_client
+
+ def handleWorkComplete(self, packet):
+ job = super(MergeGearmanClient, self).handleWorkComplete(packet)
+ self.__merge_client.onBuildCompleted(job)
+ return job
+
+ def handleWorkFail(self, packet):
+ job = super(MergeGearmanClient, self).handleWorkFail(packet)
+ self.__merge_client.onBuildCompleted(job)
+ return job
+
+ def handleWorkException(self, packet):
+ job = super(MergeGearmanClient, self).handleWorkException(packet)
+ self.__merge_client.onBuildCompleted(job)
+ return job
+
+ def handleDisconnect(self, job):
+ job = super(MergeGearmanClient, self).handleDisconnect(job)
+ self.__merge_client.onBuildCompleted(job)
+
+
+class MergeClient(object):
+ log = logging.getLogger("zuul.MergeClient")
+
+ def __init__(self, config, sched):
+ self.config = config
+ self.sched = sched
+ server = self.config.get('gearman', 'server')
+ if self.config.has_option('gearman', 'port'):
+ port = self.config.get('gearman', 'port')
+ else:
+ port = 4730
+ self.log.debug("Connecting to gearman at %s:%s" % (server, port))
+ self.gearman = MergeGearmanClient(self)
+ self.gearman.addServer(server, port)
+ self.log.debug("Waiting for gearman")
+ self.gearman.waitForServer()
+ self.build_sets = {}
+
+ def stop(self):
+ self.gearman.shutdown()
+
+ def areMergesOutstanding(self):
+ if self.build_sets:
+ return True
+ return False
+
+ def submitJob(self, name, data, build_set):
+ uuid = str(uuid4().hex)
+ self.log.debug("Submitting job %s with data %s" % (name, data))
+ job = gear.Job(name,
+ json.dumps(data),
+ unique=uuid)
+ self.build_sets[uuid] = build_set
+ self.gearman.submitJob(job)
+
+ def mergeChanges(self, items, build_set):
+ data = dict(items=items)
+ self.submitJob('merger:merge', data, build_set)
+
+ def updateRepo(self, project, url, build_set):
+ data = dict(project=project,
+ url=url)
+ self.submitJob('merger:update', data, build_set)
+
+ def onBuildCompleted(self, job):
+ build_set = self.build_sets.get(job.unique)
+ if build_set:
+ data = getJobData(job)
+ zuul_url = data.get('zuul_url')
+ merged = data.get('merged', False)
+ updated = data.get('updated', False)
+ commit = data.get('commit')
+ self.log.info("Merge %s complete, merged: %s, updated: %s, "
+ "commit: %s" %
+ (job, merged, updated, build_set.commit))
+ self.sched.onMergeCompleted(build_set, zuul_url,
+ merged, updated, commit)
+ # The test suite expects the build_set to be removed from
+ # the internal dict after the wake flag is set.
+ del self.build_sets[job.unique]
+ else:
+ self.log.error("Unable to find build set for uuid %s" % job.unique)
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
new file mode 100644
index 0000000..10ce82c
--- /dev/null
+++ b/zuul/merger/merger.py
@@ -0,0 +1,288 @@
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013-2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import git
+import os
+import logging
+
+import zuul.model
+
+
+class ZuulReference(git.Reference):
+ _common_path_default = "refs/zuul"
+ _points_to_commits_only = True
+
+
+class Repo(object):
+ log = logging.getLogger("zuul.Repo")
+
+ def __init__(self, remote, local, email, username):
+ self.remote_url = remote
+ self.local_path = local
+ self.email = email
+ self.username = username
+ self._initialized = False
+ try:
+ self._ensure_cloned()
+ except:
+ self.log.exception("Unable to initialize repo for %s" % remote)
+
+ def _ensure_cloned(self):
+ repo_is_cloned = os.path.exists(self.local_path)
+ if self._initialized and repo_is_cloned:
+ return
+ # If the repo does not exist, clone the repo.
+ if not repo_is_cloned:
+ self.log.debug("Cloning from %s to %s" % (self.remote_url,
+ self.local_path))
+ git.Repo.clone_from(self.remote_url, self.local_path)
+ repo = git.Repo(self.local_path)
+ if self.email:
+ repo.config_writer().set_value('user', 'email',
+ self.email)
+ if self.username:
+ repo.config_writer().set_value('user', 'name',
+ self.username)
+ repo.config_writer().write()
+ self._initialized = True
+
+ def createRepoObject(self):
+ try:
+ self._ensure_cloned()
+ repo = git.Repo(self.local_path)
+ except:
+ self.log.exception("Unable to initialize repo for %s" %
+ self.local_path)
+ return repo
+
+ def reset(self):
+ repo = self.createRepoObject()
+ self.log.debug("Resetting repository %s" % self.local_path)
+ self.update()
+ origin = repo.remotes.origin
+ for ref in origin.refs:
+ if ref.remote_head == 'HEAD':
+ continue
+ repo.create_head(ref.remote_head, ref, force=True)
+
+ # Reset to remote HEAD (usually origin/master)
+ repo.head.reference = origin.refs['HEAD']
+ repo.head.reset(index=True, working_tree=True)
+ repo.git.clean('-x', '-f', '-d')
+
+ def getBranchHead(self, branch):
+ repo = self.createRepoObject()
+ branch_head = repo.heads[branch]
+ return branch_head.commit
+
+ def getCommitFromRef(self, refname):
+ repo = self.createRepoObject()
+ if not refname in repo.refs:
+ return None
+ ref = repo.refs[refname]
+ return ref.commit
+
+ def checkout(self, ref):
+ repo = self.createRepoObject()
+ self.log.debug("Checking out %s" % ref)
+ repo.head.reference = ref
+ repo.head.reset(index=True, working_tree=True)
+
+ def cherryPick(self, ref):
+ repo = self.createRepoObject()
+ self.log.debug("Cherry-picking %s" % ref)
+ self.fetch(ref)
+ repo.git.cherry_pick("FETCH_HEAD")
+ return repo.head.commit
+
+ def merge(self, ref, strategy=None):
+ repo = self.createRepoObject()
+ args = []
+ if strategy:
+ args += ['-s', strategy]
+ args.append('FETCH_HEAD')
+ self.fetch(ref)
+ self.log.debug("Merging %s with args %s" % (ref, args))
+ repo.git.merge(*args)
+ return repo.head.commit
+
+ def fetch(self, ref):
+ repo = self.createRepoObject()
+ # The git.remote.fetch method may read in git progress info and
+ # interpret it improperly causing an AssertionError. Because the
+ # data was fetched properly subsequent fetches don't seem to fail.
+ # So try again if an AssertionError is caught.
+ origin = repo.remotes.origin
+ try:
+ origin.fetch(ref)
+ except AssertionError:
+ origin.fetch(ref)
+
+ def createZuulRef(self, ref, commit='HEAD'):
+ repo = self.createRepoObject()
+ self.log.debug("CreateZuulRef %s at %s " % (ref, commit))
+ ref = ZuulReference.create(repo, ref, commit)
+ return ref.commit
+
+ def push(self, local, remote):
+ repo = self.createRepoObject()
+ self.log.debug("Pushing %s:%s to %s " % (local, remote,
+ self.remote_url))
+ repo.remotes.origin.push('%s:%s' % (local, remote))
+
+ def update(self):
+ repo = self.createRepoObject()
+ self.log.debug("Updating repository %s" % self.local_path)
+ origin = repo.remotes.origin
+ origin.update()
+
+
+class Merger(object):
+ log = logging.getLogger("zuul.Merger")
+
+ def __init__(self, working_root, sshkey, email, username):
+ self.repos = {}
+ self.working_root = working_root
+ if not os.path.exists(working_root):
+ os.makedirs(working_root)
+ if sshkey:
+ self._makeSSHWrapper(sshkey)
+ self.email = email
+ self.username = username
+
+ def _makeSSHWrapper(self, key):
+ name = os.path.join(self.working_root, '.ssh_wrapper')
+ fd = open(name, 'w')
+ fd.write('#!/bin/bash\n')
+ fd.write('ssh -i %s $@\n' % key)
+ fd.close()
+ os.chmod(name, 0755)
+ os.environ['GIT_SSH'] = name
+
+ def addProject(self, project, url):
+ repo = None
+ try:
+ path = os.path.join(self.working_root, project)
+ repo = Repo(url, path, self.email, self.username)
+
+ self.repos[project] = repo
+ except Exception:
+ self.log.exception("Unable to add project %s" % project)
+ return repo
+
+ def getRepo(self, project, url):
+ if project in self.repos:
+ return self.repos[project]
+ if not url:
+ raise Exception("Unable to set up repo for project %s"
+ " without a url" % (project,))
+ return self.addProject(project, url)
+
+ def updateRepo(self, project, url):
+ repo = self.getRepo(project, url)
+ try:
+ self.log.info("Updating local repository %s", project)
+ repo.update()
+ except:
+ self.log.exception("Unable to update %s", project)
+
+ def _mergeChange(self, item, ref):
+ repo = self.getRepo(item['project'], item['url'])
+ try:
+ repo.checkout(ref)
+ except Exception:
+ self.log.exception("Unable to checkout %s" % ref)
+ return None
+
+ try:
+ mode = item['merge_mode']
+ if mode == zuul.model.MERGER_MERGE:
+ commit = repo.merge(item['refspec'])
+ elif mode == zuul.model.MERGER_MERGE_RESOLVE:
+ commit = repo.merge(item['refspec'], 'resolve')
+ elif mode == zuul.model.MERGER_CHERRY_PICK:
+ commit = repo.cherryPick(item['refspec'])
+ else:
+ raise Exception("Unsupported merge mode: %s" % mode)
+ except git.GitCommandError:
+ # Log git exceptions at debug level because they are
+ # usually benign merge conflicts
+ self.log.debug("Unable to merge %s" % item, exc_info=True)
+ return None
+ except Exception:
+ self.log.exception("Exception while merging a change:")
+ return None
+
+ return commit
+
+ def _mergeItem(self, item, recent):
+ self.log.debug("Processing refspec %s for project %s / %s ref %s" %
+ (item['refspec'], item['project'], item['branch'],
+ item['ref']))
+ repo = self.getRepo(item['project'], item['url'])
+ key = (item['project'], item['branch'])
+ # See if we have a commit for this change already in this repo
+ zuul_ref = item['branch'] + '/' + item['ref']
+ commit = repo.getCommitFromRef(zuul_ref)
+ if commit:
+ self.log.debug("Found commit %s for ref %s" % (commit, zuul_ref))
+ # Store this as the most recent commit for this
+ # project-branch
+ recent[key] = commit
+ return commit
+ self.log.debug("Unable to find commit for ref %s" % (zuul_ref,))
+ # We need to merge the change
+ # Get the most recent commit for this project-branch
+ base = recent.get(key)
+ if not base:
+ # There is none, so use the branch tip
+ # we need to reset here in order to call getBranchHead
+ self.log.debug("No base commit found for %s" % (key,))
+ try:
+ repo.reset()
+ except Exception:
+ self.log.exception("Unable to reset repo %s" % repo)
+ return None
+ base = repo.getBranchHead(item['branch'])
+ else:
+ self.log.debug("Found base commit %s for %s" % (base, key,))
+ # Merge the change
+ commit = self._mergeChange(item, base)
+ if not commit:
+ return None
+ # Store this commit as the most recent for this project-branch
+ recent[key] = commit
+ # Set the Zuul ref for this item to point to the most recent
+ # commits of each project-branch
+ for key, commit in recent.items():
+ project, branch = key
+ try:
+ repo = self.getRepo(project, None)
+ zuul_ref = branch + '/' + item['ref']
+ repo.createZuulRef(zuul_ref, commit)
+ except Exception:
+ self.log.exception("Unable to set zuul ref %s for "
+ "item %s" % (zuul_ref, item))
+ return None
+ return commit
+
+ def mergeChanges(self, items):
+ recent = {}
+ commit = None
+ for item in items:
+ commit = self._mergeItem(item, recent)
+ if not commit:
+ return None
+ return commit.hexsha
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
new file mode 100644
index 0000000..d8bc1b8
--- /dev/null
+++ b/zuul/merger/server.py
@@ -0,0 +1,118 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import threading
+import traceback
+
+import gear
+
+import merger
+
+
+class MergeServer(object):
+ log = logging.getLogger("zuul.MergeServer")
+
+ def __init__(self, config):
+ self.config = config
+ self.zuul_url = config.get('merger', 'zuul_url')
+
+ if self.config.has_option('merger', 'git_dir'):
+ merge_root = self.config.get('merger', 'git_dir')
+ else:
+ merge_root = '/var/lib/zuul/git'
+
+ if self.config.has_option('merger', 'git_user_email'):
+ merge_email = self.config.get('merger', 'git_user_email')
+ else:
+ merge_email = None
+
+ if self.config.has_option('merger', 'git_user_name'):
+ merge_name = self.config.get('merger', 'git_user_name')
+ else:
+ merge_name = None
+
+ if self.config.has_option('gerrit', 'sshkey'):
+ sshkey = self.config.get('gerrit', 'sshkey')
+ else:
+ sshkey = None
+
+ self.merger = merger.Merger(merge_root, sshkey,
+ merge_email, merge_name)
+
+ def start(self):
+ self._running = True
+ server = self.config.get('gearman', 'server')
+ if self.config.has_option('gearman', 'port'):
+ port = self.config.get('gearman', 'port')
+ else:
+ port = 4730
+ self.worker = gear.Worker('Zuul Merger')
+ self.worker.addServer(server, port)
+ self.log.debug("Waiting for server")
+ self.worker.waitForServer()
+ self.log.debug("Registering")
+ self.register()
+ self.log.debug("Starting worker")
+ self.thread = threading.Thread(target=self.run)
+ self.thread.daemon = True
+ self.thread.start()
+
+ def register(self):
+ self.worker.registerFunction("merger:merge")
+ self.worker.registerFunction("merger:update")
+
+ def stop(self):
+ self.log.debug("Stopping")
+ self._running = False
+ self.worker.shutdown()
+ self.log.debug("Stopped")
+
+ def join(self):
+ self.thread.join()
+
+ def run(self):
+ self.log.debug("Starting merge listener")
+ while self._running:
+ try:
+ job = self.worker.getJob()
+ try:
+ if job.name == 'merger:merge':
+ self.merge(job)
+ elif job.name == 'merger:update':
+ self.update(job)
+ else:
+ self.log.error("Unable to handle job %s" % job.name)
+ job.sendWorkFail()
+ except Exception:
+ self.log.exception("Exception while running job")
+ job.sendWorkException(traceback.format_exc())
+ except Exception:
+ self.log.exception("Exception while getting job")
+
+ def merge(self, job):
+ args = json.loads(job.arguments)
+ commit = self.merger.mergeChanges(args['items'])
+ result = dict(merged=(commit is not None),
+ commit=commit,
+ zuul_url=self.zuul_url)
+ job.sendWorkComplete(json.dumps(result))
+
+ def update(self, job):
+ args = json.loads(job.arguments)
+ self.merger.updateRepo(args['project'], args['url'])
+ result = dict(updated=True,
+ zuul_url=self.zuul_url)
+ job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/model.py b/zuul/model.py
index 5da9cef..22475e6 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -627,12 +627,44 @@
self.canceled = False
self.retry = False
self.parameters = {}
+ self.worker = Worker()
def __repr__(self):
- return '<Build %s of %s>' % (self.uuid, self.job.name)
+ return ('<Build %s of %s on %s>' %
+ (self.uuid, self.job.name, self.worker))
+
+
+class Worker(object):
+ """A model of the worker running a job"""
+ def __init__(self):
+ self.name = "Unknown"
+ self.hostname = None
+ self.ips = []
+ self.fqdn = None
+ self.program = None
+ self.version = None
+ self.extra = {}
+
+ def updateFromData(self, data):
+ """Update worker information if contained in the WORK_DATA response."""
+ self.name = data.get('worker_name', self.name)
+ self.hostname = data.get('worker_hostname', self.hostname)
+ self.ips = data.get('worker_ips', self.ips)
+ self.fqdn = data.get('worker_fqdn', self.fqdn)
+ self.program = data.get('worker_program', self.program)
+ self.version = data.get('worker_version', self.version)
+ self.extra = data.get('worker_extra', self.extra)
+
+ def __repr__(self):
+ return '<Worker %s>' % self.name
class BuildSet(object):
+ # Merge states:
+ NEW = 1
+ PENDING = 2
+ COMPLETE = 3
+
def __init__(self, item):
self.item = item
self.other_changes = []
@@ -642,9 +674,11 @@
self.previous_build_set = None
self.ref = None
self.commit = None
+ self.zuul_url = None
self.unable_to_merge = False
self.unable_to_merge_message = None
self.failing_reasons = []
+ self.merge_state = self.NEW
def setConfiguration(self):
# The change isn't enqueued until after it's created
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 73034c7..815da8c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,7 +30,6 @@
import layoutvalidator
import model
from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
-import merger
from zuul import version as zuul_version
statsd = extras.try_import('statsd.statsd')
@@ -109,6 +108,51 @@
self.change_ids = change_ids
+class ResultEvent(object):
+ """An event that needs to modify the pipeline state due to a
+ result from an external system."""
+
+ pass
+
+
+class BuildStartedEvent(ResultEvent):
+ """A build has started.
+
+ :arg Build build: The build which has started.
+ """
+
+ def __init__(self, build):
+ self.build = build
+
+
+class BuildCompletedEvent(ResultEvent):
+ """A build has completed
+
+ :arg Build build: The build which has completed.
+ """
+
+ def __init__(self, build):
+ self.build = build
+
+
+class MergeCompletedEvent(ResultEvent):
+ """A remote merge operation has completed
+
+ :arg BuildSet build_set: The build_set which is ready.
+ :arg str zuul_url: The URL of the Zuul Merger.
+ :arg bool merged: Whether the merge succeeded (changes with refs).
+ :arg bool updated: Whether the repo was updated (changes without refs).
+ :arg str commit: The SHA of the merged commit (changes with refs).
+ """
+
+ def __init__(self, build_set, zuul_url, merged, updated, commit):
+ self.build_set = build_set
+ self.zuul_url = zuul_url
+ self.merged = merged
+ self.updated = updated
+ self.commit = commit
+
+
class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler")
@@ -117,10 +161,12 @@
self.daemon = True
self.wake_event = threading.Event()
self.layout_lock = threading.Lock()
+ self.run_handler_lock = threading.Lock()
self._pause = False
self._exit = False
self._stopped = False
self.launcher = None
+ self.merger = None
self.triggers = dict()
self.reporters = dict()
self.config = None
@@ -351,44 +397,12 @@
return layout
- def _setupMerger(self):
- if self.config.has_option('zuul', 'git_dir'):
- merge_root = self.config.get('zuul', 'git_dir')
- else:
- merge_root = '/var/lib/zuul/git'
-
- if self.config.has_option('zuul', 'git_user_email'):
- merge_email = self.config.get('zuul', 'git_user_email')
- else:
- merge_email = None
-
- if self.config.has_option('zuul', 'git_user_name'):
- merge_name = self.config.get('zuul', 'git_user_name')
- else:
- merge_name = None
-
- replicate_urls = []
- if self.config.has_section('replication'):
- for k, v in self.config.items('replication'):
- replicate_urls.append(v)
-
- if self.config.has_option('gerrit', 'sshkey'):
- sshkey = self.config.get('gerrit', 'sshkey')
- else:
- sshkey = None
-
- # TODO: The merger should have an upstream repo independent of
- # triggers, and then each trigger should provide a fetch
- # location.
- self.merger = merger.Merger(merge_root, sshkey, merge_email,
- merge_name, replicate_urls)
- for project in self.layout.projects.values():
- url = self.triggers['gerrit'].getGitUrl(project)
- self.merger.addProject(project, url)
-
def setLauncher(self, launcher):
self.launcher = launcher
+ def setMerger(self, merger):
+ self.merger = merger
+
def registerTrigger(self, trigger, name=None):
if name is None:
name = trigger.name
@@ -422,7 +436,8 @@
def onBuildStarted(self, build):
self.log.debug("Adding start event for build: %s" % build)
build.start_time = time.time()
- self.result_event_queue.put(('started', build))
+ event = BuildStartedEvent(build)
+ self.result_event_queue.put(event)
self.wake_event.set()
self.log.debug("Done adding start event for build: %s" % build)
@@ -442,10 +457,19 @@
statsd.incr(key)
except:
self.log.exception("Exception reporting runtime stats")
- self.result_event_queue.put(('completed', build))
+ event = BuildCompletedEvent(build)
+ self.result_event_queue.put(event)
self.wake_event.set()
self.log.debug("Done adding complete event for build: %s" % build)
+ def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
+ self.log.debug("Adding merge complete event for build set: %s" %
+ build_set)
+ event = MergeCompletedEvent(build_set, zuul_url,
+ merged, updated, commit)
+ self.result_event_queue.put(event)
+ self.wake_event.set()
+
def reconfigure(self, config):
self.log.debug("Prepare to reconfigure")
event = ReconfigureEvent(config)
@@ -541,9 +565,9 @@
self.log.warning("No old pipeline matching %s found "
"when reconfiguring" % name)
continue
- self.log.debug("Re-enqueueing changes for pipeline %s" %
- name)
+ self.log.debug("Re-enqueueing changes for pipeline %s" % name)
items_to_remove = []
+ builds_to_remove = []
for shared_queue in old_pipeline.queues:
for item in shared_queue.queue:
item.item_ahead = None
@@ -558,21 +582,27 @@
items_to_remove.append(item)
continue
item.change.project = project
+ for build in item.current_build_set.getBuilds():
+ job = layout.jobs.get(build.job.name)
+ if job:
+ build.job = job
+ else:
+ builds_to_remove.append(build)
if not new_pipeline.manager.reEnqueueItem(item):
items_to_remove.append(item)
- builds_to_remove = []
- for build, item in old_pipeline.manager.building_jobs.items():
- if item in items_to_remove:
+ for item in items_to_remove:
+ for build in item.current_build_set.getBuilds():
builds_to_remove.append(build)
- self.log.warning("Deleting running build %s for "
- "change %s while reenqueueing" % (
- build, item.change))
for build in builds_to_remove:
- del old_pipeline.manager.building_jobs[build]
- new_pipeline.manager.building_jobs = \
- old_pipeline.manager.building_jobs
+ self.log.warning(
+ "Canceling build %s during reconfiguration" % (build,))
+ try:
+ self.launcher.cancel(build)
+ except Exception:
+ self.log.exception(
+ "Exception while canceling build %s "
+ "for change %s" % (build, item.change))
self.layout = layout
- self._setupMerger()
for trigger in self.triggers.values():
trigger.postConfig()
if statsd:
@@ -625,16 +655,19 @@
item.change,
enqueue_time=item.enqueue_time,
quiet=True)
- while pipeline.manager.processQueue():
- pass
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
+ if self.merger.areMergesOutstanding():
+ waiting = True
for pipeline in self.layout.pipelines.values():
- for build in pipeline.manager.building_jobs.keys():
- self.log.debug("%s waiting on %s" % (pipeline.manager, build))
- waiting = True
+ for item in pipeline.getAllItems():
+ for build in item.current_build_set.getBuilds():
+ if build.result is None:
+ self.log.debug("%s waiting on %s" %
+ (pipeline.manager, build))
+ waiting = True
if not waiting:
self.log.debug("All builds are complete")
return True
@@ -652,37 +685,40 @@
self.wake_event.wait()
self.wake_event.clear()
if self._stopped:
+ self.log.debug("Run handler stopping")
return
self.log.debug("Run handler awake")
+ self.run_handler_lock.acquire()
try:
- if not self.management_event_queue.empty():
+ while not self.management_event_queue.empty():
self.process_management_queue()
# Give result events priority -- they let us stop builds,
# whereas trigger evensts cause us to launch builds.
- if not self.result_event_queue.empty():
+ while not self.result_event_queue.empty():
self.process_result_queue()
- elif not self._pause:
- if not self.trigger_event_queue.empty():
+
+ if not self._pause:
+ while not self.trigger_event_queue.empty():
self.process_event_queue()
if self._pause and self._areAllBuildsComplete():
self._doPauseEvent()
- if not self._pause:
- if not (self.trigger_event_queue.empty() and
- self.result_event_queue.empty()):
- self.wake_event.set()
- else:
- if not self.result_event_queue.empty():
- self.wake_event.set()
+ for pipeline in self.layout.pipelines.values():
+ while pipeline.manager.processQueue():
+ pass
if self._maintain_trigger_cache:
self.maintainTriggerCache()
self._maintain_trigger_cache = False
- except:
+ except Exception:
self.log.exception("Exception in run handler:")
+ # There may still be more events to process
+ self.wake_event.set()
+ finally:
+ self.run_handler_lock.release()
def maintainTriggerCache(self):
relevant = set()
@@ -700,34 +736,23 @@
self.log.debug("Fetching trigger event")
event = self.trigger_event_queue.get()
self.log.debug("Processing trigger event %s" % event)
- project = self.layout.projects.get(event.project_name)
- if not project:
- self.log.warning("Project %s not found" % event.project_name)
+ try:
+ project = self.layout.projects.get(event.project_name)
+ if not project:
+ self.log.warning("Project %s not found" % event.project_name)
+ return
+
+ for pipeline in self.layout.pipelines.values():
+ change = event.getChange(project,
+ self.triggers.get(event.trigger_name))
+ if event.type == 'patchset-created':
+ pipeline.manager.removeOldVersionsOfChange(change)
+ if pipeline.manager.eventMatches(event, change):
+ self.log.info("Adding %s, %s to %s" %
+ (project, change, pipeline))
+ pipeline.manager.addChange(change)
+ finally:
self.trigger_event_queue.task_done()
- return
-
- # Preprocessing for ref-update events
- if event.ref:
- # Make sure the local git repo is up-to-date with the remote one.
- # We better have the new ref before enqueuing the changes.
- # This is done before enqueuing the changes to avoid calling an
- # update per pipeline accepting the change.
- self.log.info("Fetching references for %s" % project)
- self.merger.updateRepo(project)
-
- for pipeline in self.layout.pipelines.values():
- change = event.getChange(project,
- self.triggers.get(event.trigger_name))
- if event.type == 'patchset-created':
- pipeline.manager.removeOldVersionsOfChange(change)
- if pipeline.manager.eventMatches(event, change):
- self.log.info("Adding %s, %s to %s" %
- (project, change, pipeline))
- pipeline.manager.addChange(change)
- while pipeline.manager.processQueue():
- pass
-
- self.trigger_event_queue.task_done()
def process_management_queue(self):
self.log.debug("Fetching management event")
@@ -747,19 +772,57 @@
def process_result_queue(self):
self.log.debug("Fetching result event")
- event_type, build = self.result_event_queue.get()
- self.log.debug("Processing result event %s" % build)
- for pipeline in self.layout.pipelines.values():
- if event_type == 'started':
- if pipeline.manager.onBuildStarted(build):
- self.result_event_queue.task_done()
- return
- elif event_type == 'completed':
- if pipeline.manager.onBuildCompleted(build):
- self.result_event_queue.task_done()
- return
- self.log.warning("Build %s not found by any queue manager" % (build))
- self.result_event_queue.task_done()
+ event = self.result_event_queue.get()
+ self.log.debug("Processing result event %s" % event)
+ try:
+ if isinstance(event, BuildStartedEvent):
+ self._doBuildStartedEvent(event)
+ elif isinstance(event, BuildCompletedEvent):
+ self._doBuildCompletedEvent(event)
+ elif isinstance(event, MergeCompletedEvent):
+ self._doMergeCompletedEvent(event)
+ else:
+ self.log.error("Unable to handle event %s" % event)
+ finally:
+ self.result_event_queue.task_done()
+
+ def _doBuildStartedEvent(self, event):
+ build = event.build
+ if build.build_set is not build.build_set.item.current_build_set:
+ self.log.warning("Build %s is not in the current build set" %
+ (build,))
+ return
+ pipeline = build.build_set.item.pipeline
+ if not pipeline:
+ self.log.warning("Build %s is not associated with a pipeline" %
+ (build,))
+ return
+ pipeline.manager.onBuildStarted(event.build)
+
+ def _doBuildCompletedEvent(self, event):
+ build = event.build
+ if build.build_set is not build.build_set.item.current_build_set:
+ self.log.warning("Build %s is not in the current build set" %
+ (build,))
+ return
+ pipeline = build.build_set.item.pipeline
+ if not pipeline:
+ self.log.warning("Build %s is not associated with a pipeline" %
+ (build,))
+ return
+ pipeline.manager.onBuildCompleted(event.build)
+
+ def _doMergeCompletedEvent(self, event):
+ build_set = event.build_set
+ if build_set is not build_set.item.current_build_set:
+ self.log.warning("Build set %s is not current" % (build_set,))
+ return
+ pipeline = build_set.item.pipeline
+ if not pipeline:
+ self.log.warning("Build set %s is not associated with a pipeline" %
+ (build_set,))
+ return
+ pipeline.manager.onMergeCompleted(event)
def formatStatusHTML(self):
ret = '<html><pre>'
@@ -822,7 +885,6 @@
def __init__(self, sched, pipeline):
self.sched = sched
self.pipeline = pipeline
- self.building_jobs = {}
self.event_filters = []
if self.sched.config and self.sched.config.has_option(
'zuul', 'report_times'):
@@ -1045,26 +1107,41 @@
self.dequeueItem(item)
self.reportStats(item)
+ def _makeMergerItem(self, item):
+ # Create a dictionary with all info about the item needed by
+ # the merger.
+ return dict(project=item.change.project.name,
+ url=self.pipeline.trigger.getGitUrl(
+ item.change.project),
+ merge_mode=item.change.project.merge_mode,
+ refspec=item.change.refspec,
+ branch=item.change.branch,
+ ref=item.current_build_set.ref,
+ )
+
def prepareRef(self, item):
- # Returns False on success.
- # Returns True if we were unable to prepare the ref.
- ref = item.current_build_set.ref
+ # Returns True if the ref is ready, false otherwise
+ build_set = item.current_build_set
+ if build_set.merge_state == build_set.COMPLETE:
+ return True
+ if build_set.merge_state == build_set.PENDING:
+ return False
+ build_set.merge_state = build_set.PENDING
+ ref = build_set.ref
if hasattr(item.change, 'refspec') and not ref:
self.log.debug("Preparing ref for: %s" % item.change)
item.current_build_set.setConfiguration()
- ref = item.current_build_set.ref
dependent_items = self.getDependentItems(item)
dependent_items.reverse()
all_items = dependent_items + [item]
- commit = self.sched.merger.mergeChanges(all_items, ref)
- item.current_build_set.commit = commit
- if not commit:
- self.log.info("Unable to merge change %s" % item.change)
- msg = ("This change was unable to be automatically merged "
- "with the current state of the repository. Please "
- "rebase your change and upload a new patchset.")
- self.pipeline.setUnableToMerge(item, msg)
- return True
+ merger_items = map(self._makeMergerItem, all_items)
+ self.sched.merger.mergeChanges(merger_items,
+ item.current_build_set)
+ else:
+ self.log.debug("Preparing update repo for: %s" % item.change)
+ url = self.pipeline.trigger.getGitUrl(item.change.project)
+ self.sched.merger.updateRepo(item.change.project.name,
+ url, build_set)
return False
def _launchJobs(self, item, jobs):
@@ -1076,7 +1153,6 @@
build = self.sched.launcher.launch(job, item,
self.pipeline,
dependent_items)
- self.building_jobs[build] = item
self.log.debug("Adding build %s of job %s to item %s" %
(build, job, item))
item.addBuild(build)
@@ -1092,24 +1168,17 @@
def cancelJobs(self, item, prime=True):
self.log.debug("Cancel jobs for change %s" % item.change)
canceled = False
- to_remove = []
+ old_build_set = item.current_build_set
if prime and item.current_build_set.ref:
item.resetAllBuilds()
- for build, build_item in self.building_jobs.items():
- if build_item == item:
- self.log.debug("Found build %s for change %s to cancel" %
- (build, item.change))
- try:
- self.sched.launcher.cancel(build)
- except:
- self.log.exception("Exception while canceling build %s "
- "for change %s" % (build, item.change))
- to_remove.append(build)
- canceled = True
- for build in to_remove:
- self.log.debug("Removing build %s from running builds" % build)
+ for build in old_build_set.getBuilds():
+ try:
+ self.sched.launcher.cancel(build)
+ except:
+ self.log.exception("Exception while canceling build %s "
+ "for change %s" % (build, item.change))
build.result = 'CANCELED'
- del self.building_jobs[build]
+ canceled = True
for item_behind in item.items_behind:
self.log.debug("Canceling jobs for change %s, behind change %s" %
(item_behind.change, item.change))
@@ -1117,7 +1186,7 @@
canceled = True
return canceled
- def _processOneItem(self, item, nnfi):
+ def _processOneItem(self, item, nnfi, ready_ahead):
changed = False
item_ahead = item.item_ahead
change_queue = self.pipeline.getQueue(item.change.project)
@@ -1134,10 +1203,11 @@
self.reportItem(item)
except MergeFailure:
pass
- return (True, nnfi)
+ return (True, nnfi, ready_ahead)
dep_item = self.getFailingDependentItem(item)
actionable = change_queue.isActionable(item)
item.active = actionable
+ ready = False
if dep_item:
failing_reasons.append('a needed change is failing')
self.cancelJobs(item, prime=False)
@@ -1157,10 +1227,13 @@
changed = True
self.cancelJobs(item)
if actionable:
- self.prepareRef(item)
+ ready = self.prepareRef(item)
if item.current_build_set.unable_to_merge:
failing_reasons.append("it has a merge conflict")
- if actionable and self.launchJobs(item):
+ ready = False
+ if not ready:
+ ready_ahead = False
+ if actionable and ready_ahead and self.launchJobs(item):
changed = True
if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
@@ -1182,7 +1255,7 @@
if failing_reasons:
self.log.debug("%s is a failing item because %s" %
(item, failing_reasons))
- return (changed, nnfi)
+ return (changed, nnfi, ready_ahead)
def processQueue(self):
# Do whatever needs to be done for each change in the queue
@@ -1191,8 +1264,10 @@
for queue in self.pipeline.queues:
queue_changed = False
nnfi = None # Nearest non-failing item
+ ready_ahead = True # All build sets ahead are ready
for item in queue.queue[:]:
- item_changed, nnfi = self._processOneItem(item, nnfi)
+ item_changed, nnfi, ready_ahhead = self._processOneItem(
+ item, nnfi, ready_ahead)
if item_changed:
queue_changed = True
self.reportStats(item)
@@ -1219,38 +1294,36 @@
self.sched.launcher.setBuildDescription(build, desc)
def onBuildStarted(self, build):
- if build not in self.building_jobs:
- # Or triggered externally, or triggered before zuul started,
- # or restarted
- return False
-
self.log.debug("Build %s started" % build)
self.updateBuildDescriptions(build.build_set)
- while self.processQueue():
- pass
return True
def onBuildCompleted(self, build):
- if build not in self.building_jobs:
- # Or triggered externally, or triggered before zuul started,
- # or restarted
- return False
-
self.log.debug("Build %s completed" % build)
- change = self.building_jobs[build]
- self.log.debug("Found change %s which triggered completed build %s" %
- (change, build))
+ item = build.build_set.item
- del self.building_jobs[build]
-
- self.pipeline.setResult(change, build)
- self.log.debug("Change %s status is now:\n %s" %
- (change, self.pipeline.formatStatus(change)))
+ self.pipeline.setResult(item, build)
+ self.log.debug("Item %s status is now:\n %s" %
+ (item, self.pipeline.formatStatus(item)))
self.updateBuildDescriptions(build.build_set)
- while self.processQueue():
- pass
return True
+ def onMergeCompleted(self, event):
+ build_set = event.build_set
+ item = build_set.item
+ build_set.merge_state = build_set.COMPLETE
+ build_set.zuul_url = event.zuul_url
+ if event.merged:
+ build_set.commit = event.commit
+ elif event.updated:
+ build_set.commit = item.change.newrev
+ if not build_set.commit:
+ self.log.info("Unable to merge change %s" % item.change)
+ msg = ("This change was unable to be automatically merged "
+ "with the current state of the repository. Please "
+ "rebase your change and upload a new patchset.")
+ self.pipeline.setUnableToMerge(item, msg)
+
def reportItem(self, item):
if item.reported:
raise Exception("Already reported change %s" % item.change)
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index f055a50..904fa7a 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -86,7 +86,8 @@
raise Exception("Timer trigger does not support changes.")
def getGitUrl(self, project):
- pass
+ # For the moment, the timer trigger requires gerrit.
+ return self.sched.triggers['gerrit'].getGitUrl(project)
def getGitwebUrl(self, project, sha=None):
url = '%s/gitweb?p=%s.git' % (self.baseurl, project)