Merge "Only report to gerrit if the action is from gerrit" into feature/zuulv3
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 56cc6a8..a78a0fa 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -108,6 +108,10 @@
commands.
``state_dir=/var/lib/zuul``
+**jobroot_dir**
+ Path to the directory in which Zuul should store temporary job files.
+ ``jobroot_dir=/tmp``
+
**report_times**
Boolean value (``true`` or ``false``) that determines if Zuul should
include elapsed times for each job in the textual report. Used by
@@ -165,6 +169,29 @@
Path to PID lock file for the merger process.
``pidfile=/var/run/zuul-merger/merger.pid``
+executor
+""""""""
+
+The zuul-executor process configuration.
+
+**git_dir**
+ Directory that Zuul should clone local git repositories to.
+ ``git_dir=/var/lib/zuul/git``
+
+**log_config**
+ Path to log config file for the executor process.
+ ``log_config=/etc/zuul/logging.yaml``
+
+**private_key_file**
+ SSH private key file to be used when logging into worker nodes.
+ ``private_key_file=~/.ssh/id_rsa``
+
+**user**
+ User name for the zuul-executor process. In normal operation as a daemon,
+ the executor should be started as the ``root`` user, but it will drop
+ privileges to this user during startup.
+ ``user=zuul``
+
.. _connection:
connection ArbitraryName
diff --git a/requirements.txt b/requirements.txt
index e4f0212..746bbcb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,3 +23,6 @@
sqlalchemy
alembic
cryptography>=1.6
+cachecontrol
+pyjwt
+iso8601
diff --git a/tests/base.py b/tests/base.py
index 2bcd1ca..5bbf065 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -565,6 +565,7 @@
self.head_sha = None
self.is_merged = False
self.merge_message = None
+ self.state = 'open'
self._createPRRef()
self._addCommitToRepo(files=files)
self._updateTimeStamp()
@@ -921,6 +922,7 @@
'ref': pr.branch,
},
'mergeable': True,
+ 'state': pr.state,
'head': {
'sha': pr.head_sha,
'repo': {
@@ -994,8 +996,12 @@
owner, proj = project.split('/')
for pr in self.pull_requests:
pr_owner, pr_project = pr.project.split('/')
+ # This is somewhat risky: if the same commit exists in multiple
+ # PRs, we might grab the wrong one, which doesn't have the status
+ # that is expected to be there. Maybe re-work this so that there
+ # is a global registry of commit statuses like with github.
if (pr_owner == owner and pr_project == proj and
- pr.head_sha == sha):
+ sha in pr.statuses):
return pr.statuses[sha]
def setCommitStatus(self, project, sha, state,
@@ -1286,9 +1292,10 @@
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
- def doMergeChanges(self, items):
+ def doMergeChanges(self, items, repo_state):
# Get a merger in order to update the repos involved in this job.
- commit = super(RecordingAnsibleJob, self).doMergeChanges(items)
+ commit = super(RecordingAnsibleJob, self).doMergeChanges(
+ items, repo_state)
if not commit: # merge conflict
self.recordResult('MERGER_FAILURE')
return commit
diff --git a/tests/fixtures/layouts/requirements-github.yaml b/tests/fixtures/layouts/requirements-github.yaml
index 5b92b58..9933f27 100644
--- a/tests/fixtures/layouts/requirements-github.yaml
+++ b/tests/fixtures/layouts/requirements-github.yaml
@@ -14,6 +14,19 @@
comment: true
- pipeline:
+ name: trigger_status
+ manager: independent
+ trigger:
+ github:
+ - event: pull_request
+ action: comment
+ comment: 'trigger me'
+ require-status: "zuul:check:success"
+ success:
+ github:
+ comment: true
+
+- pipeline:
name: trigger
manager: independent
trigger:
@@ -126,6 +139,35 @@
github:
comment: true
+- pipeline:
+ name: require_open
+ manager: independent
+ require:
+ github:
+ open: true
+ trigger:
+ github:
+ - event: pull_request
+ action: comment
+ comment: 'test me'
+ success:
+ github:
+ comment: true
+
+- pipeline:
+ name: require_current
+ manager: independent
+ require:
+ github:
+ current-patchset: true
+ trigger:
+ github:
+ - event: pull_request
+ action: changed
+ success:
+ github:
+ comment: true
+
- job:
name: project1-pipeline
- job:
@@ -140,12 +182,19 @@
name: project6-newerthan
- job:
name: project7-olderthan
+- job:
+ name: project8-requireopen
+- job:
+ name: project9-requirecurrent
- project:
name: org/project1
pipeline:
jobs:
- project1-pipeline
+ trigger_status:
+ jobs:
+ - project1-pipeline
- project:
name: org/project2
@@ -182,3 +231,15 @@
older_than:
jobs:
- project7-olderthan
+
+- project:
+ name: org/project8
+ require_open:
+ jobs:
+ - project8-requireopen
+
+- project:
+ name: org/project9
+ require_current:
+ jobs:
+ - project9-requirecurrent
diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py
index c8e7341..227d659 100644
--- a/tests/unit/test_github_driver.py
+++ b/tests/unit/test_github_driver.py
@@ -260,7 +260,7 @@
check_status = A.statuses[A.head_sha][0]
check_url = ('http://zuul.example.com/status/#%s,%s' %
(A.number, A.head_sha))
- self.assertEqual('check', check_status['context'])
+ self.assertEqual('tenant-one/check', check_status['context'])
self.assertEqual('Standard check', check_status['description'])
self.assertEqual('pending', check_status['state'])
self.assertEqual(check_url, check_status['url'])
@@ -274,7 +274,7 @@
check_status = A.statuses[A.head_sha][0]
check_url = ('http://zuul.example.com/status/#%s,%s' %
(A.number, A.head_sha))
- self.assertEqual('check', check_status['context'])
+ self.assertEqual('tenant-one/check', check_status['context'])
self.assertEqual('success', check_status['state'])
self.assertEqual(check_url, check_status['url'])
self.assertEqual(1, len(A.comments))
@@ -297,7 +297,7 @@
# pipeline reports success status
self.assertEqual(3, len(A.statuses[A.head_sha]))
report_status = A.statuses[A.head_sha][0]
- self.assertEqual('reporting', report_status['context'])
+ self.assertEqual('tenant-one/reporting', report_status['context'])
self.assertEqual('success', report_status['state'])
self.assertEqual(2, len(A.comments))
diff --git a/tests/unit/test_github_requirements.py b/tests/unit/test_github_requirements.py
index 60bcf74..5dd6e80 100644
--- a/tests/unit/test_github_requirements.py
+++ b/tests/unit/test_github_requirements.py
@@ -49,6 +49,30 @@
@simple_layout('layouts/requirements-github.yaml', driver='github')
def test_trigger_require_status(self):
"Test trigger requirement: status"
+ A = self.fake_github.openFakePullRequest('org/project1', 'master', 'A')
+ # A comment event that we will keep submitting to trigger
+ comment = A.getCommentAddedEvent('trigger me')
+ self.fake_github.emitEvent(comment)
+ self.waitUntilSettled()
+ # No status from zuul so should not be enqueued
+ self.assertEqual(len(self.history), 0)
+
+ # An error status should not cause it to be enqueued
+ A.setStatus(A.head_sha, 'error', 'null', 'null', 'check')
+ self.fake_github.emitEvent(comment)
+ self.waitUntilSettled()
+ self.assertEqual(len(self.history), 0)
+
+ # A success status goes in
+ A.setStatus(A.head_sha, 'success', 'null', 'null', 'check')
+ self.fake_github.emitEvent(comment)
+ self.waitUntilSettled()
+ self.assertEqual(len(self.history), 1)
+ self.assertEqual(self.history[0].name, 'project1-pipeline')
+
+ @simple_layout('layouts/requirements-github.yaml', driver='github')
+ def test_trigger_on_status(self):
+ "Test trigger on: status"
A = self.fake_github.openFakePullRequest('org/project2', 'master', 'A')
# An error status should not cause it to be enqueued
@@ -264,3 +288,41 @@
self.waitUntilSettled()
self.assertEqual(len(self.history), 1)
self.assertEqual(self.history[0].name, 'project7-olderthan')
+
+ @simple_layout('layouts/requirements-github.yaml', driver='github')
+ def test_require_open(self):
+
+ A = self.fake_github.openFakePullRequest('org/project8', 'master', 'A')
+ # A comment event that we will keep submitting to trigger
+ comment = A.getCommentAddedEvent('test me')
+ self.fake_github.emitEvent(comment)
+ self.waitUntilSettled()
+
+ # PR is open, we should have enqueued
+ self.assertEqual(len(self.history), 1)
+
+ # close the PR and try again
+ A.state = 'closed'
+ self.fake_github.emitEvent(comment)
+ self.waitUntilSettled()
+ # PR is closed, should not trigger
+ self.assertEqual(len(self.history), 1)
+
+ @simple_layout('layouts/requirements-github.yaml', driver='github')
+ def test_require_current(self):
+
+ A = self.fake_github.openFakePullRequest('org/project9', 'master', 'A')
+ # A sync event that we will keep submitting to trigger
+ sync = A.getPullRequestSynchronizeEvent()
+ self.fake_github.emitEvent(sync)
+ self.waitUntilSettled()
+
+ # PR head is current, so it should enqueue
+ self.assertEqual(len(self.history), 1)
+
+ # Add a commit to the PR, then re-issue the original sync event
+ A.addCommit()
+ self.fake_github.emitEvent(sync)
+ self.waitUntilSettled()
+ # Event hash is not current, should not trigger
+ self.assertEqual(len(self.history), 1)
diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py
new file mode 100644
index 0000000..3ea5a8e
--- /dev/null
+++ b/tests/unit/test_log_streamer.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import socket
+import tempfile
+
+import zuul.lib.log_streamer
+import tests.base
+
+
+class TestLogStreamer(tests.base.BaseTestCase):
+
+ log = logging.getLogger("zuul.test.cloner")
+
+ def setUp(self):
+ super(TestLogStreamer, self).setUp()
+ self.host = '0.0.0.0'
+
+ def startStreamer(self, port, root=None):
+ if not root:
+ root = tempfile.gettempdir()
+ return zuul.lib.log_streamer.LogStreamer(None, self.host, port, root)
+
+ def test_start_stop(self):
+ port = 7900
+ streamer = self.startStreamer(port)
+ self.addCleanup(streamer.stop)
+
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.addCleanup(s.close)
+ self.assertEqual(0, s.connect_ex((self.host, port)))
+ s.close()
+
+ streamer.stop()
+
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.addCleanup(s.close)
+ self.assertNotEqual(0, s.connect_ex((self.host, port)))
+ s.close()
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index d416369..2624944 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -994,8 +994,9 @@
"Test that delayed check merge conflicts are handled properly"
# Hold jobs in the gearman queue so that we can test whether
- # the executor returns a merge failure after the scheduler has
- # successfully merged.
+ # the executor successfully merges a change based on an old
+ # repo state (frozen by the scheduler) which would otherwise
+ # conflict.
self.gearman_server.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project',
'master', 'A',
@@ -1068,9 +1069,12 @@
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
- dict(name='project-merge', result='MERGER_FAILURE', changes='2,1'),
- dict(name='project-merge', result='MERGER_FAILURE',
- changes='2,1 3,1'),
+ dict(name='project-merge', result='SUCCESS', changes='2,1'),
+ dict(name='project-test1', result='SUCCESS', changes='2,1'),
+ dict(name='project-test2', result='SUCCESS', changes='2,1'),
+ dict(name='project-merge', result='SUCCESS', changes='2,1 3,1'),
+ dict(name='project-test1', result='SUCCESS', changes='2,1 3,1'),
+ dict(name='project-test2', result='SUCCESS', changes='2,1 3,1'),
], ordered=False)
def test_post(self):
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index fd95e92..904316c 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -24,14 +24,14 @@
def linesplit(socket):
- buff = socket.recv(4096)
+ buff = socket.recv(4096).decode("utf-8")
buffering = True
while buffering:
if "\n" in buff:
(line, buff) = buff.split("\n", 1)
yield line + "\n"
else:
- more = socket.recv(4096)
+ more = socket.recv(4096).decode("utf-8")
if not more:
buffering = False
else:
diff --git a/zuul/cmd/executor.py b/zuul/cmd/executor.py
index 1893f5a..ea12b0b 100755
--- a/zuul/cmd/executor.py
+++ b/zuul/cmd/executor.py
@@ -24,9 +24,11 @@
import logging
import os
+import pwd
import socket
import sys
import signal
+import tempfile
import zuul.cmd
import zuul.executor.server
@@ -37,6 +39,9 @@
# Similar situation with gear and statsd.
+FINGER_PORT = 79
+
+
class Executor(zuul.cmd.ZuulApp):
def parse_arguments(self):
@@ -72,15 +77,62 @@
self.executor.stop()
self.executor.join()
+ def start_log_streamer(self):
+ pipe_read, pipe_write = os.pipe()
+ child_pid = os.fork()
+ if child_pid == 0:
+ os.close(pipe_write)
+ import zuul.lib.log_streamer
+
+ self.log.info("Starting log streamer")
+ streamer = zuul.lib.log_streamer.LogStreamer(
+ self.user, '0.0.0.0', FINGER_PORT, self.jobroot_dir)
+
+ # Keep running until the parent dies:
+ pipe_read = os.fdopen(pipe_read)
+ pipe_read.read()
+ self.log.info("Stopping log streamer")
+ streamer.stop()
+ os._exit(0)
+ else:
+ os.close(pipe_read)
+ self.log_streamer_pid = child_pid
+
+ def change_privs(self):
+ '''
+ Drop our privileges to the zuul user.
+ '''
+ if os.getuid() != 0:
+ return
+ pw = pwd.getpwnam(self.user)
+ os.setgroups([])
+ os.setgid(pw.pw_gid)
+ os.setuid(pw.pw_uid)
+ os.umask(0o022)
+
def main(self, daemon=True):
# See comment at top of file about zuul imports
- self.setup_logging('executor', 'log_config')
+ if self.config.has_option('executor', 'user'):
+ self.user = self.config.get('executor', 'user')
+ else:
+ self.user = 'zuul'
+ if self.config.has_option('zuul', 'jobroot_dir'):
+ self.jobroot_dir = os.path.expanduser(
+ self.config.get('zuul', 'jobroot_dir'))
+ else:
+ self.jobroot_dir = tempfile.gettempdir()
+
+ self.setup_logging('executor', 'log_config')
self.log = logging.getLogger("zuul.Executor")
+ self.start_log_streamer()
+ self.change_privs()
+
ExecutorServer = zuul.executor.server.ExecutorServer
self.executor = ExecutorServer(self.config, self.connections,
+ jobdir_root=self.jobroot_dir,
keep_jobdir=self.args.keep_jobdir)
self.executor.start()
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 1374e9b..7d03eef 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -519,6 +519,7 @@
'templates': [str],
'merge-mode': vs.Any('merge', 'merge-resolve',
'cherry-pick'),
+ 'default-branch': str,
'_source_context': model.SourceContext,
'_start_mark': yaml.Mark,
}
@@ -554,15 +555,20 @@
configs.extend([layout.project_templates[name]
for name in conf_templates])
configs.append(project_template)
+ # Set the following values to the first one that we find and
+ # ignore subsequent settings.
mode = conf.get('merge-mode')
if mode and project_config.merge_mode is None:
- # Set the merge mode to the first one that we find and
- # ignore subsequent settings.
project_config.merge_mode = model.MERGER_MAP[mode]
+ default_branch = conf.get('default-branch')
+ if default_branch and project_config.default_branch is None:
+ project_config.default_branch = default_branch
if project_config.merge_mode is None:
# If merge mode was not specified in any project stanza,
# set it to the default.
project_config.merge_mode = model.MERGER_MAP['merge-resolve']
+ if project_config.default_branch is None:
+ project_config.default_branch = 'master'
for pipeline in layout.pipelines.values():
project_pipeline = model.ProjectPipelineConfig()
queue_name = None
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index dcbc172..06962e5 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -725,13 +725,13 @@
if stdin_data:
stdin.write(stdin_data)
- out = stdout.read()
+ out = stdout.read().decode('utf-8')
self.log.debug("SSH received stdout:\n%s" % out)
ret = stdout.channel.recv_exit_status()
self.log.debug("SSH exit status: %s" % ret)
- err = stderr.read()
+ err = stderr.read().decode('utf-8')
self.log.debug("SSH received stderr:\n%s" % err)
if ret:
raise Exception("Gerrit error executing %s" % command)
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 66ffb3f..27ece54 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -13,11 +13,17 @@
# under the License.
import collections
+import datetime
import logging
import hmac
import hashlib
import time
+import cachecontrol
+from cachecontrol.cache import DictCache
+import iso8601
+import jwt
+import requests
import webob
import webob.dec
import voluptuous as v
@@ -29,6 +35,25 @@
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
+ACCESS_TOKEN_URL = 'https://api.github.com/installations/%s/access_tokens'
+PREVIEW_JSON_ACCEPT = 'application/vnd.github.machine-man-preview+json'
+
+
+class UTC(datetime.tzinfo):
+ """UTC"""
+
+ def utcoffset(self, dt):
+ return datetime.timedelta(0)
+
+ def tzname(self, dt):
+ return "UTC"
+
+ def dst(self, dt):
+ return datetime.timedelta(0)
+
+
+utc = UTC()
+
class GithubWebhookListener():
@@ -66,17 +91,38 @@
raise webob.exc.HTTPBadRequest(message)
try:
- event = method(request)
+ json_body = request.json_body
+ except:
+ message = 'Exception deserializing JSON body'
+ self.log.exception(message)
+ raise webob.exc.HTTPBadRequest(message)
+
+ # If there's any installation mapping information in the body then
+ # update the project mapping before any requests are made.
+ installation_id = json_body.get('installation', {}).get('id')
+ project_name = json_body.get('repository', {}).get('full_name')
+
+ if installation_id and project_name:
+ old_id = self.connection.installation_map.get(project_name)
+
+ if old_id and old_id != installation_id:
+ msg = "Unexpected installation_id change for %s. %d -> %d."
+ self.log.warning(msg, project_name, old_id, installation_id)
+
+ self.connection.installation_map[project_name] = installation_id
+
+ try:
+ event = method(json_body)
except:
self.log.exception('Exception when handling event:')
+ event = None
if event:
event.project_hostname = self.connection.canonical_hostname
self.log.debug('Scheduling github event: {0}'.format(event.type))
self.connection.sched.addEvent(event)
- def _event_push(self, request):
- body = request.json_body
+ def _event_push(self, body):
base_repo = body.get('repository')
event = GithubTriggerEvent()
@@ -96,8 +142,7 @@
return event
- def _event_pull_request(self, request):
- body = request.json_body
+ def _event_pull_request(self, body):
action = body.get('action')
pr_body = body.get('pull_request')
@@ -124,9 +169,8 @@
return event
- def _event_issue_comment(self, request):
+ def _event_issue_comment(self, body):
"""Handles pull request comments"""
- body = request.json_body
action = body.get('action')
if action != 'created':
return
@@ -144,9 +188,8 @@
event.action = 'comment'
return event
- def _event_pull_request_review(self, request):
+ def _event_pull_request_review(self, body):
"""Handles pull request reviews"""
- body = request.json_body
pr_body = body.get('pull_request')
if pr_body is None:
return
@@ -162,8 +205,7 @@
event.action = body.get('action')
return event
- def _event_status(self, request):
- body = request.json_body
+ def _event_status(self, body):
action = body.get('action')
if action == 'pending':
return
@@ -277,20 +319,32 @@
driver_name = 'github'
log = logging.getLogger("connection.github")
payload_path = 'payload'
- git_user = 'git'
def __init__(self, driver, connection_name, connection_config):
super(GithubConnection, self).__init__(
driver, connection_name, connection_config)
- self.github = None
self._change_cache = {}
self.projects = {}
- self._git_ssh = bool(self.connection_config.get('sshkey', None))
+ self.git_ssh_key = self.connection_config.get('sshkey')
self.git_host = self.connection_config.get('git_host', 'github.com')
self.canonical_hostname = self.connection_config.get(
'canonical_hostname', self.git_host)
self.source = driver.getSource(self)
+ self._github = None
+ self.app_id = None
+ self.app_key = None
+
+ self.installation_map = {}
+ self.installation_token_cache = {}
+
+ # NOTE(jamielennox): Better here would be to cache to memcache or file
+ # or something external - but zuul already sucks at restarting so in
+ # memory probably doesn't make this much worse.
+ self.cache_adapter = cachecontrol.CacheControlAdapter(
+ DictCache(),
+ cache_etags=True)
+
def onLoad(self):
webhook_listener = GithubWebhookListener(self)
self.registerHttpHandler(self.payload_path,
@@ -300,20 +354,107 @@
def onStop(self):
self.unregisterHttpHandler(self.payload_path)
- def _authenticateGithubAPI(self):
- token = self.connection_config.get('api_token', None)
- if token is not None:
- if self.git_host != 'github.com':
- url = 'https://%s/' % self.git_host
- self.github = github3.enterprise_login(token=token, url=url)
- else:
- self.github = github3.login(token=token)
- self.log.info("Github API Authentication successful.")
+ def _createGithubClient(self):
+ if self.git_host != 'github.com':
+ url = 'https://%s/' % self.git_host
+ github = github3.GitHubEnterprise(url)
else:
- self.github = None
- self.log.info(
- "No Github credentials found in zuul configuration, cannot "
- "authenticate.")
+ github = github3.GitHub()
+
+ # anything going through requests to http/s goes through cache
+ github.session.mount('http://', self.cache_adapter)
+ github.session.mount('https://', self.cache_adapter)
+ return github
+
+ def _authenticateGithubAPI(self):
+ config = self.connection_config
+
+ api_token = config.get('api_token')
+
+ app_id = config.get('app_id')
+ app_key = None
+ app_key_file = config.get('app_key')
+
+ self._github = self._createGithubClient()
+
+ if api_token:
+ self._github.login(token=api_token)
+
+ if app_key_file:
+ try:
+ with open(app_key_file, 'r') as f:
+ app_key = f.read()
+ except IOError:
+ m = "Failed to open app key file for reading: %s"
+ self.log.error(m, app_key_file)
+
+ if (app_id or app_key) and \
+ not (app_id and app_key):
+ self.log.warning("You must provide an app_id and "
+ "app_key to use installation based "
+ "authentication")
+
+ return
+
+ if app_id:
+ self.app_id = int(app_id)
+ if app_key:
+ self.app_key = app_key
+
+ def _get_installation_key(self, project, user_id=None):
+ installation_id = self.installation_map.get(project)
+
+ if not installation_id:
+ self.log.error("No installation ID available for project %s",
+ project)
+ return ''
+
+ now = datetime.datetime.now(utc)
+ token, expiry = self.installation_token_cache.get(installation_id,
+ (None, None))
+
+ if ((not expiry) or (not token) or (now >= expiry)):
+ expiry = now + datetime.timedelta(minutes=5)
+
+ data = {'iat': now, 'exp': expiry, 'iss': self.app_id}
+ app_token = jwt.encode(data,
+ self.app_key,
+ algorithm='RS256')
+
+ url = ACCESS_TOKEN_URL % installation_id
+ headers = {'Accept': PREVIEW_JSON_ACCEPT,
+ 'Authorization': 'Bearer %s' % app_token}
+ json_data = {'user_id': user_id} if user_id else None
+
+ response = requests.post(url, headers=headers, json=json_data)
+ response.raise_for_status()
+
+ data = response.json()
+
+ expiry = iso8601.parse_date(data['expires_at'])
+ expiry -= datetime.timedelta(minutes=2)
+ token = data['token']
+
+ self.installation_token_cache[installation_id] = (token, expiry)
+
+ return token
+
+ def getGithubClient(self,
+ project=None,
+ user_id=None,
+ use_app=True):
+ # If you're authenticating for a project and you're an integration then
+ # you need to use the installation-specific token. Some operations are
+ # not yet supported by integrations, so passing use_app=False falls
+ # back to api_token auth.
+ if use_app and project and self.app_id:
+ github = self._createGithubClient()
+ github.login(token=self._get_installation_key(project, user_id))
+ return github
+
+ # If we're using api_token authentication then this client is already
+ # token authenticated; if not, anonymous is the best we have.
+ return self._github
def maintainCache(self, relevant):
for key, change in self._change_cache.items():
@@ -338,6 +479,10 @@
change.status = self._get_statuses(project, event.patch_number)
change.reviews = self.getPullReviews(project, change.number)
change.source_event = event
+ change.open = self.getPullOpen(project, change.number)
+ change.is_current_patchset = self.getIsCurrent(project,
+ change.number,
+ event.patch_number)
elif event.ref:
change = Ref(project)
change.ref = event.ref
@@ -350,12 +495,16 @@
return change
def getGitUrl(self, project):
- if self._git_ssh:
- url = 'ssh://%s@%s/%s.git' % \
- (self.git_user, self.git_host, project)
- else:
- url = 'https://%s/%s' % (self.git_host, project)
- return url
+ if self.git_ssh_key:
+ return 'ssh://git@%s/%s.git' % (self.git_host, project)
+
+ if self.app_id:
+ installation_key = self._get_installation_key(project)
+ return 'https://x-access-token:%s@%s/%s' % (installation_key,
+ self.git_host,
+ project)
+
+ return 'https://%s/%s' % (self.git_host, project)
def getGitwebUrl(self, project, sha=None):
url = 'https://%s/%s' % (self.git_host, project)
@@ -370,19 +519,21 @@
self.projects[project.name] = project
def getProjectBranches(self, project):
+ github = self.getGithubClient()
owner, proj = project.name.split('/')
- repository = self.github.repository(owner, proj)
+ repository = github.repository(owner, proj)
branches = [branch.name for branch in repository.branches()]
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
return branches
def getPullUrl(self, project, number):
return '%s/pull/%s' % (self.getGitwebUrl(project), number)
def getPull(self, project_name, number):
+ github = self.getGithubClient(project_name)
owner, proj = project_name.split('/')
- pr = self.github.pull_request(owner, proj, number).as_dict()
- log_rate_limit(self.log, self.github)
+ pr = github.pull_request(owner, proj, number).as_dict()
+ log_rate_limit(self.log, github)
return pr
def canMerge(self, change, allow_needs):
@@ -401,20 +552,22 @@
def getPullBySha(self, sha):
query = '%s type:pr is:open' % sha
pulls = []
- for issue in self.github.search_issues(query=query):
- pr_url = issue.pull_request.get('url')
+ github = self.getGithubClient()
+ for issue in github.search_issues(query=query):
+ pr_url = issue.issue.pull_request().as_dict().get('url')
if not pr_url:
continue
# the issue provides no good description of the project :\
owner, project, _, number = pr_url.split('/')[4:]
- pr = self.github.pull_request(owner, project, number)
+ github = self.getGithubClient("%s/%s" % (owner, project))
+ pr = github.pull_request(owner, project, number)
if pr.head.sha != sha:
continue
if pr.as_dict() in pulls:
continue
pulls.append(pr.as_dict())
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
if len(pulls) > 1:
raise Exception('Multiple pulls found with head sha %s' % sha)
@@ -423,10 +576,11 @@
return pulls.pop()
def getPullFileNames(self, project, number):
+ github = self.getGithubClient(project)
owner, proj = project.name.split('/')
filenames = [f.filename for f in
- self.github.pull_request(owner, proj, number).files()]
- log_rate_limit(self.log, self.github)
+ github.pull_request(owner, proj, number).files()]
+ log_rate_limit(self.log, github)
return filenames
def getPullReviews(self, project, number):
@@ -471,33 +625,38 @@
def _getPullReviews(self, owner, project, number):
# make a list out of the reviews so that we complete our
# API transaction
+ # reviews are not yet supported by integrations, use api_token:
+ # https://platform.github.community/t/api-endpoint-for-pr-reviews/409
+ github = self.getGithubClient("%s/%s" % (owner, project),
+ use_app=False)
reviews = [review.as_dict() for review in
- self.github.pull_request(owner, project, number).reviews()]
+ github.pull_request(owner, project, number).reviews()]
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
return reviews
def getUser(self, login):
- return GithubUser(self.github, login)
+ return GithubUser(self.getGithubClient(), login)
def getUserUri(self, login):
return 'https://%s/%s' % (self.git_host, login)
def getRepoPermission(self, project, login):
+ github = self.getGithubClient(project)
owner, proj = project.split('/')
# This gets around a missing API call
# need preview header
headers = {'Accept': 'application/vnd.github.korra-preview'}
# Create a repo object
- repository = self.github.repository(owner, proj)
+ repository = github.repository(owner, project)
# Build up a URL
url = repository._build_url('collaborators', login, 'permission',
base_url=repository._api)
# Get the data
perms = repository._get(url, headers=headers)
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
# no known user, maybe deleted since review?
if perms.status_code == 404:
@@ -507,53 +666,67 @@
return perms.json()['permission']
def commentPull(self, project, pr_number, message):
+ github = self.getGithubClient(project)
owner, proj = project.split('/')
- repository = self.github.repository(owner, proj)
+ repository = github.repository(owner, proj)
pull_request = repository.issue(pr_number)
pull_request.create_comment(message)
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
def mergePull(self, project, pr_number, commit_message='', sha=None):
+ github = self.getGithubClient(project)
owner, proj = project.split('/')
- pull_request = self.github.pull_request(owner, proj, pr_number)
+ pull_request = github.pull_request(owner, proj, pr_number)
try:
result = pull_request.merge(commit_message=commit_message, sha=sha)
except MethodNotAllowed as e:
raise MergeFailure('Merge was not successful due to mergeability'
' conflict, original error is %s' % e)
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
if not result:
raise Exception('Pull request was not merged')
def getCommitStatuses(self, project, sha):
+ github = self.getGithubClient(project)
owner, proj = project.split('/')
- repository = self.github.repository(owner, proj)
+ repository = github.repository(owner, proj)
commit = repository.commit(sha)
# make a list out of the statuses so that we complete our
# API transaction
statuses = [status.as_dict() for status in commit.statuses()]
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
return statuses
def setCommitStatus(self, project, sha, state, url='', description='',
context=''):
+ github = self.getGithubClient(project)
owner, proj = project.split('/')
- repository = self.github.repository(owner, proj)
+ repository = github.repository(owner, proj)
repository.create_status(sha, state, url, description, context)
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
def labelPull(self, project, pr_number, label):
+ github = self.getGithubClient(project)
owner, proj = project.split('/')
- pull_request = self.github.issue(owner, proj, pr_number)
+ pull_request = github.issue(owner, proj, pr_number)
pull_request.add_labels(label)
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
def unlabelPull(self, project, pr_number, label):
+ github = self.getGithubClient(project)
owner, proj = project.split('/')
- pull_request = self.github.issue(owner, proj, pr_number)
+ pull_request = github.issue(owner, proj, pr_number)
pull_request.remove_label(label)
- log_rate_limit(self.log, self.github)
+ log_rate_limit(self.log, github)
+
+ def getPullOpen(self, project, number):
+ pr = self.getPull(project, number)
+ return pr.get('state') == 'open'
+
+ def getIsCurrent(self, project, number, sha):
+ pr = self.getPull(project, number)
+ return pr.get('head').get('sha') == sha
def _ghTimestampToDate(self, timestamp):
return time.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
diff --git a/zuul/driver/github/githubmodel.py b/zuul/driver/github/githubmodel.py
index bbacc9b..3e25115 100644
--- a/zuul/driver/github/githubmodel.py
+++ b/zuul/driver/github/githubmodel.py
@@ -59,10 +59,11 @@
return False
-class GithubReviewFilter(object):
- def __init__(self, required_reviews=[]):
+class GithubCommonFilter(object):
+ def __init__(self, required_reviews=[], required_statuses=[]):
self._required_reviews = copy.deepcopy(required_reviews)
self.required_reviews = self._tidy_reviews(required_reviews)
+ self.required_statuses = required_statuses
def _tidy_reviews(self, reviews):
for r in reviews:
@@ -126,14 +127,27 @@
return False
return True
+    def matchesRequiredStatuses(self, change):
+        """Check the change's statuses against the required statuses.
+
+        Returns True when no statuses are required, or when the change
+        carries at least one of the required statuses.
+        """
+        wanted = self.required_statuses
+        if not wanted:
+            # Nothing is required, so every change matches.
+            return True
+        # statuses are ORed
+        # A PR head can have multiple statuses on it. If the change
+        # statuses and the filter statuses are a null intersection, there
+        # are no matches and we return false
+        return not set(change.status).isdisjoint(set(wanted))
-class GithubEventFilter(EventFilter):
+
+class GithubEventFilter(EventFilter, GithubCommonFilter):
def __init__(self, trigger, types=[], branches=[], refs=[],
comments=[], actions=[], labels=[], unlabels=[],
- states=[], statuses=[], ignore_deletes=True):
+ states=[], statuses=[], required_statuses=[],
+ ignore_deletes=True):
EventFilter.__init__(self, trigger)
+ GithubCommonFilter.__init__(self, required_statuses=required_statuses)
+
self._types = types
self._branches = branches
self._refs = refs
@@ -147,6 +161,7 @@
self.unlabels = unlabels
self.states = states
self.statuses = statuses
+ self.required_statuses = required_statuses
self.ignore_deletes = ignore_deletes
def __repr__(self):
@@ -172,6 +187,8 @@
ret += ' states: %s' % ', '.join(self.states)
if self.statuses:
ret += ' statuses: %s' % ', '.join(self.statuses)
+ if self.required_statuses:
+ ret += ' required_statuses: %s' % ', '.join(self.required_statuses)
ret += '>'
return ret
@@ -239,15 +256,22 @@
if self.statuses and event.status not in self.statuses:
return False
+ if not self.matchesRequiredStatuses(change):
+ return False
+
return True
-class GithubRefFilter(RefFilter, GithubReviewFilter):
- def __init__(self, statuses=[], required_reviews=[]):
+class GithubRefFilter(RefFilter, GithubCommonFilter):
+ def __init__(self, statuses=[], required_reviews=[], open=None,
+ current_patchset=None):
RefFilter.__init__(self)
- GithubReviewFilter.__init__(self, required_reviews=required_reviews)
+ GithubCommonFilter.__init__(self, required_reviews=required_reviews,
+ required_statuses=statuses)
self.statuses = statuses
+ self.open = open
+ self.current_patchset = current_patchset
def __repr__(self):
ret = '<GithubRefFilter'
@@ -257,18 +281,25 @@
if self.required_reviews:
ret += (' required-reviews: %s' %
str(self.required_reviews))
+ if self.open:
+ ret += ' open: %s' % self.open
+ if self.current_patchset:
+ ret += ' current-patchset: %s' % self.current_patchset
ret += '>'
return ret
def matches(self, change):
- # statuses are ORed
- # A PR head can have multiple statuses on it. If the change
- # statuses and the filter statuses are a null intersection, there
- # are no matches and we return false
- if self.statuses:
- if set(change.status).isdisjoint(set(self.statuses)):
+ if not self.matchesRequiredStatuses(change):
+ return False
+
+ if self.open is not None:
+ if self.open != change.open:
+ return False
+
+ if self.current_patchset is not None:
+ if self.current_patchset != change.is_current_patchset:
return False
# required reviews are ANDed
diff --git a/zuul/driver/github/githubreporter.py b/zuul/driver/github/githubreporter.py
index dba5157..68c6af0 100644
--- a/zuul/driver/github/githubreporter.py
+++ b/zuul/driver/github/githubreporter.py
@@ -18,6 +18,7 @@
from zuul.reporter import BaseReporter
from zuul.exceptions import MergeFailure
+from zuul.driver.util import scalar_or_list
class GithubReporter(BaseReporter):
@@ -67,7 +68,7 @@
def setPullStatus(self, pipeline, item):
project = item.change.project.name
sha = item.change.patchset
- context = pipeline.name
+ context = '%s/%s' % (pipeline.layout.tenant.name, pipeline.name)
state = self._commit_status
url = ''
if self.connection.sched.config.has_option('zuul', 'status_url'):
@@ -102,12 +103,12 @@
item.change.is_merged = True
return
except MergeFailure:
- self.log.debug(
+ self.log.exception(
'Merge attempt of change %s %s/2 failed.' %
- (i, item.change))
+ (item.change, i), exc_info=True)
if i == 1:
time.sleep(2)
- self.log.debug(
+ self.log.warning(
'Merge of change %s failed after 2 attempts, giving up' %
item.change)
@@ -154,14 +155,11 @@
def getSchema():
- def toList(x):
- return v.Any([x], x)
-
github_reporter = v.Schema({
'status': v.Any('pending', 'success', 'failure'),
'comment': bool,
'merge': bool,
- 'label': toList(str),
- 'unlabel': toList(str)
+ 'label': scalar_or_list(str),
+ 'unlabel': scalar_or_list(str)
})
return github_reporter
diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py
index 0aada4d..58ca2b9 100644
--- a/zuul/driver/github/githubsource.py
+++ b/zuul/driver/github/githubsource.py
@@ -98,6 +98,8 @@
f = GithubRefFilter(
statuses=to_list(config.get('status')),
required_reviews=to_list(config.get('review')),
+ open=config.get('open'),
+ current_patchset=config.get('current-patchset'),
)
return [f]
@@ -116,7 +118,9 @@
def getRequireSchema():
require = {'status': scalar_or_list(str),
- 'review': scalar_or_list(review)}
+ 'review': scalar_or_list(review),
+ 'open': bool,
+ 'current-patchset': bool}
return require
diff --git a/zuul/driver/github/githubtrigger.py b/zuul/driver/github/githubtrigger.py
index 3269c36..328879d 100644
--- a/zuul/driver/github/githubtrigger.py
+++ b/zuul/driver/github/githubtrigger.py
@@ -16,6 +16,7 @@
import voluptuous as v
from zuul.trigger import BaseTrigger
from zuul.driver.github.githubmodel import GithubEventFilter
+from zuul.driver.util import scalar_or_list, to_list
class GithubTrigger(BaseTrigger):
@@ -23,26 +24,20 @@
log = logging.getLogger("zuul.trigger.GithubTrigger")
def getEventFilters(self, trigger_config):
- def toList(item):
- if not item:
- return []
- if isinstance(item, list):
- return item
- return [item]
-
efilters = []
- for trigger in toList(trigger_config):
+ for trigger in to_list(trigger_config):
f = GithubEventFilter(
trigger=self,
- types=toList(trigger['event']),
- actions=toList(trigger.get('action')),
- branches=toList(trigger.get('branch')),
- refs=toList(trigger.get('ref')),
- comments=toList(trigger.get('comment')),
- labels=toList(trigger.get('label')),
- unlabels=toList(trigger.get('unlabel')),
- states=toList(trigger.get('state')),
- statuses=toList(trigger.get('status'))
+ types=to_list(trigger['event']),
+ actions=to_list(trigger.get('action')),
+ branches=to_list(trigger.get('branch')),
+ refs=to_list(trigger.get('ref')),
+ comments=to_list(trigger.get('comment')),
+ labels=to_list(trigger.get('label')),
+ unlabels=to_list(trigger.get('unlabel')),
+ states=to_list(trigger.get('state')),
+ statuses=to_list(trigger.get('status')),
+ required_statuses=to_list(trigger.get('require-status'))
)
efilters.append(f)
@@ -53,22 +48,20 @@
def getSchema():
- def toList(x):
- return v.Any([x], x)
-
github_trigger = {
v.Required('event'):
- toList(v.Any('pull_request',
- 'pull_request_review',
- 'push')),
- 'action': toList(str),
- 'branch': toList(str),
- 'ref': toList(str),
- 'comment': toList(str),
- 'label': toList(str),
- 'unlabel': toList(str),
- 'state': toList(str),
- 'status': toList(str)
+ scalar_or_list(v.Any('pull_request',
+ 'pull_request_review',
+ 'push')),
+ 'action': scalar_or_list(str),
+ 'branch': scalar_or_list(str),
+ 'ref': scalar_or_list(str),
+ 'comment': scalar_or_list(str),
+ 'label': scalar_or_list(str),
+ 'unlabel': scalar_or_list(str),
+ 'state': scalar_or_list(str),
+ 'require-status': scalar_or_list(str),
+ 'status': scalar_or_list(str)
}
return github_trigger
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index e1eed2d..0d40716 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -169,7 +169,8 @@
self.log.debug("Function %s is not registered" % name)
return False
- def execute(self, job, item, pipeline, dependent_items=[]):
+ def execute(self, job, item, pipeline, dependent_items=[],
+ merger_items=[]):
tenant = pipeline.layout.tenant
uuid = str(uuid4().hex)
self.log.info(
@@ -179,8 +180,11 @@
item.current_build_set.getJobNodeSet(job.name),
item.change,
[x.change for x in dependent_items]))
+
dependent_items = dependent_items[:]
dependent_items.reverse()
+ all_items = dependent_items + [item]
+
# TODOv3(jeblair): This ansible vars data structure will
# replace the environment variables below.
project = dict(
@@ -210,7 +214,7 @@
changes_str = '^'.join(
['%s:%s:%s' % (i.change.project.name, i.change.branch,
i.change.refspec)
- for i in dependent_items + [item]])
+ for i in all_items])
params['ZUUL_BRANCH'] = item.change.branch
params['ZUUL_CHANGES'] = changes_str
params['ZUUL_REF'] = ('refs/zuul/%s/%s' %
@@ -220,7 +224,7 @@
zuul_changes = ' '.join(['%s,%s' % (i.change.number,
i.change.patchset)
- for i in dependent_items + [item]])
+ for i in all_items])
params['ZUUL_CHANGE_IDS'] = zuul_changes
params['ZUUL_CHANGE'] = str(item.change.number)
params['ZUUL_PATCHSET'] = str(item.change.patchset)
@@ -253,13 +257,11 @@
# ZUUL_OLDREV
# ZUUL_NEWREV
- all_items = dependent_items + [item]
- merger_items = [i.makeMergerItem() for i in all_items]
-
params['job'] = job.name
params['timeout'] = job.timeout
params['items'] = merger_items
params['projects'] = []
+ params['repo_state'] = item.current_build_set.repo_state
if job.name != 'noop':
params['playbooks'] = [x.toDict() for x in job.run]
@@ -284,21 +286,28 @@
params['vars'][secret.name] = copy.deepcopy(secret.secret_data)
params['vars']['zuul'] = zuul_params
projects = set()
+
+        def make_project_dict(project, layout=item.current_build_set.layout):
+            # Build the per-project entry placed in params['projects'].
+            #
+            # ``layout`` is bound at definition time on purpose: the
+            # ``for item in all_items`` loop further down rebinds ``item``,
+            # and a plain closure would then read the layout of whichever
+            # queue item the loop is visiting instead of the item whose
+            # job is being executed.
+            project_config = layout.project_configs.get(
+                project.canonical_name, None)
+            # Fall back to 'master' when the layout has no configuration
+            # for this project.
+            default_branch = 'master'
+            if project_config:
+                default_branch = project_config.default_branch
+            connection = project.source.connection
+            return dict(connection=connection.connection_name,
+                        name=project.name,
+                        default_branch=default_branch)
+
if job.repos:
for repo in job.repos:
(trusted, project) = tenant.getProject(repo)
- connection = project.source.connection
- params['projects'].append(
- dict(connection=connection.connection_name,
- name=project.name))
+ params['projects'].append(make_project_dict(project))
projects.add(project)
for item in all_items:
if item.change.project not in projects:
project = item.change.project
- connection = item.change.project.source.connection
- params['projects'].append(
- dict(connection=connection.connection_name,
- name=project.name))
+ params['projects'].append(make_project_dict(project))
projects.add(project)
build = Build(job, uuid)
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 99d2a9c..bb3ea9e 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -80,7 +80,15 @@
class JobDir(object):
- def __init__(self, root=None, keep=False):
+ def __init__(self, root, keep, build_uuid):
+ '''
+ :param str root: Root directory for the individual job directories.
+ Can be None to use the default system temp root directory.
+ :param bool keep: If True, do not delete the job directory.
+ :param str build_uuid: The unique build UUID. If supplied, this will
+ be used as the temp job directory name. Using this will help the
+ log streaming daemon find job logs.
+ '''
# root
# ansible
# trusted.cfg
@@ -89,7 +97,12 @@
# src
# logs
self.keep = keep
- self.root = tempfile.mkdtemp(dir=root)
+ if root:
+ tmpdir = root
+ else:
+ tmpdir = tempfile.gettempdir()
+ self.root = os.path.join(tmpdir, build_uuid)
+ os.mkdir(self.root, 0o700)
# Work
self.work_root = os.path.join(self.root, 'work')
os.makedirs(self.work_root)
@@ -484,16 +497,14 @@
def merge(self, job):
args = json.loads(job.arguments)
- ret = self.merger.mergeChanges(args['items'], args.get('files'))
+ ret = self.merger.mergeChanges(args['items'], args.get('files'),
+ args.get('repo_state'))
result = dict(merged=(ret is not None),
zuul_url=self.zuul_url)
- if args.get('files'):
- if ret:
- result['commit'], result['files'] = ret
- else:
- result['commit'], result['files'] = (None, None)
+ if ret is None:
+ result['commit'] = result['files'] = result['repo_state'] = None
else:
- result['commit'] = ret
+ result['commit'], result['files'], result['repo_state'] = ret
job.sendWorkComplete(json.dumps(result))
@@ -533,8 +544,9 @@
def execute(self):
try:
- self.jobdir = JobDir(root=self.executor_server.jobdir_root,
- keep=self.executor_server.keep_jobdir)
+ self.jobdir = JobDir(self.executor_server.jobdir_root,
+ self.executor_server.keep_jobdir,
+ str(self.job.unique))
self._execute()
except Exception:
self.log.exception("Exception while executing job")
@@ -588,13 +600,10 @@
merge_items = [i for i in args['items'] if i.get('refspec')]
if merge_items:
- commit = self.doMergeChanges(merge_items)
- if not commit:
+ if not self.doMergeChanges(merge_items, args['repo_state']):
# There was a merge conflict and we have already sent
# a work complete result, don't run any jobs
return
- else:
- commit = args['items'][-1]['newrev'] # noqa
# Delete the origin remote from each repo we set up since
# it will not be valid within the jobs.
@@ -637,14 +646,15 @@
result = dict(result=result)
self.job.sendWorkComplete(json.dumps(result))
- def doMergeChanges(self, items):
+ def doMergeChanges(self, items, repo_state):
# Get a merger in order to update the repos involved in this job.
merger = self.executor_server._getMerger(self.jobdir.src_root)
- commit = merger.mergeChanges(items) # noqa
- if not commit: # merge conflict
+ ret = merger.mergeChanges(items, repo_state=repo_state)
+ if not ret: # merge conflict
result = dict(result='MERGER_FAILURE')
self.job.sendWorkComplete(json.dumps(result))
- return commit
+ return False
+ return True
def runPlaybooks(self, args):
result = None
diff --git a/zuul/lib/log_streamer.py b/zuul/lib/log_streamer.py
new file mode 100644
index 0000000..8bb586f
--- /dev/null
+++ b/zuul/lib/log_streamer.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 IBM Corp.
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import os.path
+import pwd
+import re
+import select
+import socket
+import threading
+import time
+
+try:
+ import SocketServer as ss # python 2.x
+except ImportError:
+ import socketserver as ss # python 3
+
+
+class Log(object):
+    '''
+    An opened log file together with the stat data taken when it was opened.
+    '''
+
+    def __init__(self, path):
+        self.path = path
+        self.file = open(path)
+        # Keep the stat snapshot; ``size`` starts as the size it recorded.
+        info = os.stat(path)
+        self.stat = info
+        self.size = info.st_size
+
+
+class RequestHandler(ss.BaseRequestHandler):
+    '''
+    Class to handle a single log streaming request.
+
+    The log streaming code was blatantly stolen from zuul_console.py. Only
+    the (class/method/attribute) names were changed to protect the innocent.
+    '''
+
+    def handle(self):
+        '''
+        Read a build ID from the client and stream that build's log.
+
+        A short error message is sent back instead when the ID is not a
+        hexadecimal string, or when the job directory or its log file
+        does not exist.
+        '''
+        build_uuid = self.request.recv(1024).decode("utf-8")
+        build_uuid = build_uuid.rstrip()
+
+        # validate build ID; only hex digits are accepted, so the value
+        # cannot contain path separators when joined to jobdir_root below
+        if not re.match("[0-9A-Fa-f]+$", build_uuid):
+            msg = 'Build ID %s is not valid' % build_uuid
+            self.request.sendall(msg.encode("utf-8"))
+            return
+
+        job_dir = os.path.join(self.server.jobdir_root, build_uuid)
+        if not os.path.exists(job_dir):
+            msg = 'Build ID %s not found' % build_uuid
+            self.request.sendall(msg.encode("utf-8"))
+            return
+
+        # check if log file exists
+        log_file = os.path.join(job_dir, 'ansible', 'ansible_log.txt')
+        if not os.path.exists(log_file):
+            msg = 'Log not found for build ID %s' % build_uuid
+            self.request.sendall(msg.encode("utf-8"))
+            return
+
+        self.stream_log(log_file)
+
+    def _close_log(self, log):
+        '''
+        Close the file behind ``log``; failures are deliberately ignored.
+        '''
+        try:
+            log.file.close()
+        except Exception:
+            # Best effort: a failed close must not abort the stream.
+            pass
+
+    def stream_log(self, log_file):
+        '''
+        Send the log to the client and keep following it.
+
+        The file is (re)opened whenever follow_log() reports that it was
+        replaced or truncated, and streaming ends once follow_log()
+        reports that the client went away.  The open file is closed on
+        every way out of this method.
+        '''
+        log = None
+        try:
+            while True:
+                if log is not None:
+                    self._close_log(log)
+                # Retry until the log can be opened and its current
+                # content has been sent.
+                while True:
+                    log = self.chunk_log(log_file)
+                    if log:
+                        break
+                    time.sleep(0.5)
+                if not self.follow_log(log):
+                    return
+        finally:
+            if log is not None:
+                self._close_log(log)
+
+    def chunk_log(self, log_file):
+        '''
+        Open ``log_file`` and send everything currently in it.
+
+        :returns: The opened Log, or None when it could not be opened.
+        '''
+        try:
+            log = Log(log_file)
+        except Exception:
+            return
+        try:
+            while True:
+                chunk = log.file.read(4096)
+                if not chunk:
+                    break
+                # sendall(): plain send() may transmit only part of the
+                # buffer, which would silently drop log data.
+                self.request.sendall(chunk.encode('utf-8'))
+        except Exception:
+            # The Log never reaches the caller on failure, so release
+            # its file handle here before propagating the error.
+            self._close_log(log)
+            raise
+        return log
+
+    def follow_log(self, log):
+        '''
+        Forward data appended to ``log`` until something changes.
+
+        :returns: True when the file was replaced, truncated or can no
+            longer be stat()ed (the caller should reopen it); False when
+            the client socket reported an error or end-of-file.
+        '''
+        while True:
+            # As long as we have unread data, keep reading/sending
+            while True:
+                chunk = log.file.read(4096)
+                if chunk:
+                    self.request.sendall(chunk.encode('utf-8'))
+                else:
+                    break
+
+            # At this point, we are waiting for more data to be written
+            time.sleep(0.5)
+
+            # Check to see if the remote end has sent any data, if so,
+            # discard
+            r, w, e = select.select([self.request], [], [self.request], 0)
+            if self.request in e:
+                return False
+            if self.request in r:
+                ret = self.request.recv(1024)
+                # Discard anything read, if input is eof, it has
+                # disconnected.
+                if not ret:
+                    return False
+
+            # See if the file has been truncated
+            try:
+                st = os.stat(log.path)
+                if (st.st_ino != log.stat.st_ino or
+                        st.st_size < log.size):
+                    return True
+            except Exception:
+                return True
+            log.size = st.st_size
+
+
+class CustomForkingTCPServer(ss.ForkingTCPServer):
+    '''
+    Forking TCP server that can drop root privileges once its port is bound.
+    '''
+    def __init__(self, *args, **kwargs):
+        # Strip our own keyword arguments and store them before handing
+        # the remainder to the base class.
+        # For some reason, setting custom attributes does not work if we
+        # call the base class __init__ first. Wha??
+        # NOTE(review): presumably because the base __init__ binds the
+        # socket by default, and server_bind() below reads self.user.
+        self.user = kwargs.pop('user')
+        self.jobdir_root = kwargs.pop('jobdir_root')
+        ss.ForkingTCPServer.__init__(self, *args, **kwargs)
+
+    def change_privs(self):
+        '''
+        Switch from root to the configured user; no-op when not root.
+        '''
+        if os.getuid() == 0:
+            account = pwd.getpwnam(self.user)
+            os.setgroups([])
+            os.setgid(account.pw_gid)
+            os.setuid(account.pw_uid)
+            os.umask(0o022)
+
+    def server_bind(self):
+        '''
+        Bind with address reuse enabled, then drop privileges if a user
+        was configured.
+        '''
+        self.allow_reuse_address = True
+        ss.ForkingTCPServer.server_bind(self)
+        if not self.user:
+            return
+        self.change_privs()
+
+    def server_close(self):
+        '''
+        Overridden from base class to shutdown the socket immediately.
+        '''
+        listener = self.socket
+        listener.shutdown(socket.SHUT_RD)
+        listener.close()
+
+
+class LogStreamer(object):
+    '''
+    Class implementing log streaming over the finger daemon port.
+    '''
+
+    def __init__(self, user, host, port, jobdir_root):
+        '''
+        :param str user: User the server drops privileges to after binding
+            (a false value disables the privilege drop).
+        :param str host: Address to bind to.
+        :param int port: Port to bind to.
+        :param str jobdir_root: Directory containing the per-build job
+            directories that requests are resolved against.
+        '''
+        self.server = CustomForkingTCPServer((host, port),
+                                             RequestHandler,
+                                             user=user,
+                                             jobdir_root=jobdir_root)
+
+        # We start the actual serving within a thread so we can return to
+        # the owner.
+        self.thd = threading.Thread(target=self.server.serve_forever)
+        self.thd.daemon = True
+        self.thd.start()
+
+    def stop(self):
+        '''
+        Shut the server down if its serving thread is still running.
+        '''
+        # Thread.isAlive() was removed in Python 3.9; is_alive() is
+        # available on both Python 2 and Python 3.
+        if self.thd.is_alive():
+            self.server.shutdown()
+            self.server.server_close()
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 1f2062f..7649944 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -189,17 +189,6 @@
def getFailingDependentItems(self, item):
return None
- def getDependentItems(self, item):
- orig_item = item
- items = []
- while item.item_ahead:
- items.append(item.item_ahead)
- item = item.item_ahead
- self.log.info("Change %s depends on changes %s" %
- (orig_item.change,
- [x.change for x in items]))
- return items
-
def getItemForChange(self, change):
for item in self.pipeline.getAllItems():
if item.change.equals(change):
@@ -361,7 +350,7 @@
def _executeJobs(self, item, jobs):
self.log.debug("Executing jobs for change %s" % item.change)
- dependent_items = self.getDependentItems(item)
+ build_set = item.current_build_set
for job in jobs:
self.log.debug("Found job %s for change %s" % (job, item.change))
try:
@@ -369,7 +358,8 @@
self.sched.nodepool.useNodeSet(nodeset)
build = self.sched.executor.execute(job, item,
self.pipeline,
- dependent_items)
+ build_set.dependent_items,
+ build_set.merger_items)
self.log.debug("Adding build %s of job %s to item %s" %
(build, job, item))
item.addBuild(build)
@@ -499,16 +489,11 @@
self.log.debug("Scheduling merge for item %s (files: %s)" %
(item, files))
- dependent_items = self.getDependentItems(item)
- dependent_items.reverse()
- all_items = dependent_items + [item]
- merger_items = [i.makeMergerItem() for i in all_items]
build_set = item.current_build_set
build_set.merge_state = build_set.PENDING
- self.sched.merger.mergeChanges(merger_items,
- item.current_build_set,
- files,
- self.pipeline.precedence)
+ self.sched.merger.mergeChanges(build_set.merger_items,
+ item.current_build_set, files,
+ precedence=self.pipeline.precedence)
return False
def prepareItem(self, item):
@@ -681,6 +666,7 @@
if event.merged:
build_set.commit = event.commit
build_set.files.setFiles(event.files)
+ build_set.repo_state = event.repo_state
elif event.updated:
build_set.commit = item.change.newrev
if not build_set.commit:
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index e164195..c98f20e 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -107,10 +107,11 @@
timeout=300)
return job
- def mergeChanges(self, items, build_set, files=None,
+ def mergeChanges(self, items, build_set, files=None, repo_state=None,
precedence=zuul.model.PRECEDENCE_NORMAL):
data = dict(items=items,
- files=files)
+ files=files,
+ repo_state=repo_state)
self.submitJob('merger:merge', data, build_set, precedence)
def getFiles(self, connection_name, project_name, branch, files,
@@ -129,6 +130,7 @@
updated = data.get('updated', False)
commit = data.get('commit')
files = data.get('files', {})
+ repo_state = data.get('repo_state', {})
job.files = files
self.log.info("Merge %s complete, merged: %s, updated: %s, "
"commit: %s" %
@@ -136,7 +138,8 @@
job.setComplete()
if job.build_set:
self.sched.onMergeCompleted(job.build_set, zuul_url,
- merged, updated, commit, files)
+ merged, updated, commit, files,
+ repo_state)
# The test suite expects the job to be removed from the
# internal account after the wake flag is set.
self.jobs.remove(job)
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 714d643..ee83fa0 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -14,6 +14,7 @@
# under the License.
import git
+import gitdb
import os
import logging
@@ -124,6 +125,26 @@
ref = repo.refs[refname]
return ref.commit
+    def getRefs(self):
+        """Return the ``refs`` of a freshly created repo object."""
+        return self.createRepoObject().refs
+
+    def setRefs(self, refs):
+        """Make the repository's references match ``refs``.
+
+        :param dict refs: Mapping of reference path to hex sha.  Every
+            entry is created, or forcibly moved if it already exists;
+            references currently in the repo whose path is not a key of
+            ``refs`` are deleted.
+        """
+        repo = self.createRepoObject()
+        # Paths that exist before we start; whatever is still in this set
+        # after applying ``refs`` is stale and gets removed.
+        unseen = set(ref.path for ref in repo.refs)
+        for path, hexsha in refs.items():
+            binsha = gitdb.util.to_bin_sha(hexsha)
+            obj = git.objects.Object.new_from_sha(repo, binsha)
+            self.log.debug("Create reference %s", path)
+            git.refs.Reference.create(repo, path, obj, force=True)
+            unseen.discard(path)
+        for path in unseen:
+            self.log.debug("Delete reference %s", path)
+            git.refs.SymbolicReference.delete(repo, path)
+
def checkout(self, ref):
repo = self.createRepoObject()
self.log.debug("Checking out %s" % ref)
@@ -285,6 +306,31 @@
raise Exception("Project %s/%s does not have branch %s" %
(connection_name, project_name, branch))
+    def _saveRepoState(self, connection_name, project_name, repo,
+                       repo_state):
+        """Record the repo's refs in ``repo_state`` unless already saved.
+
+        ``repo_state`` maps connection -> project -> ref path -> hex sha
+        and is updated in place.  Refs under refs/zuul and refs/remotes
+        are left out.
+        """
+        project = repo_state.setdefault(
+            connection_name, {}).setdefault(project_name, {})
+        if project:
+            # We already have a state for this project.
+            return
+        skipped_prefixes = ('refs/zuul', 'refs/remotes')
+        for ref in repo.getRefs():
+            if not ref.path.startswith(skipped_prefixes):
+                project[ref.path] = ref.object.hexsha
+
+    def _restoreRepoState(self, connection_name, project_name, repo,
+                          repo_state):
+        """Apply the saved refs for this project to ``repo``, if any.
+
+        Nothing happens when ``repo_state`` holds no (or an empty) entry
+        for the given connection and project.
+        """
+        saved = repo_state.get(connection_name, {}).get(project_name, {})
+        if saved:
+            self.log.debug("Restore repo state for project %s/%s",
+                           connection_name, project_name)
+            repo.setRefs(saved)
+
def _mergeChange(self, item, ref):
repo = self.getRepo(item['connection'], item['project'])
try:
@@ -314,27 +360,13 @@
return commit
- def _mergeItem(self, item, recent):
+ def _mergeItem(self, item, recent, repo_state):
self.log.debug("Processing refspec %s for project %s/%s / %s ref %s" %
(item['refspec'], item['connection'],
item['project'], item['branch'], item['ref']))
repo = self.getRepo(item['connection'], item['project'])
key = (item['connection'], item['project'], item['branch'])
- # See if we have a commit for this change already in this repo
- zuul_ref = item['branch'] + '/' + item['ref']
- with repo.createRepoObject().git.custom_environment(
- GIT_SSH_COMMAND=self._get_ssh_cmd(item['connection'])):
- commit = repo.getCommitFromRef(zuul_ref)
- if commit:
- self.log.debug(
- "Found commit %s for ref %s" % (commit, zuul_ref))
- # Store this as the most recent commit for this
- # project-branch
- recent[key] = commit
- return commit
-
- self.log.debug("Unable to find commit for ref %s" % (zuul_ref,))
# We need to merge the change
# Get the most recent commit for this project-branch
base = recent.get(key)
@@ -347,7 +379,14 @@
except Exception:
self.log.exception("Unable to reset repo %s" % repo)
return None
+ self._restoreRepoState(item['connection'], item['project'], repo,
+ repo_state)
+
base = repo.getBranchHead(item['branch'])
+ # Save the repo state so that later mergers can repeat
+ # this process.
+ self._saveRepoState(item['connection'], item['project'], repo,
+ repo_state)
else:
self.log.debug("Found base commit %s for %s" % (base, key,))
# Merge the change
@@ -362,20 +401,26 @@
# commits of each project-branch
for key, mrc in recent.items():
connection, project, branch = key
+ zuul_ref = None
try:
repo = self.getRepo(connection, project)
zuul_ref = branch + '/' + item['ref']
- repo.createZuulRef(zuul_ref, mrc)
+ if not repo.getCommitFromRef(zuul_ref):
+ repo.createZuulRef(zuul_ref, mrc)
except Exception:
self.log.exception("Unable to set zuul ref %s for "
"item %s" % (zuul_ref, item))
return None
return commit
- def mergeChanges(self, items, files=None):
+ def mergeChanges(self, items, files=None, repo_state=None):
+ # connection+project+branch -> commit
recent = {}
commit = None
read_files = []
+ # connection -> project -> ref -> commit
+ if repo_state is None:
+ repo_state = {}
for item in items:
if item.get("number") and item.get("patchset"):
self.log.debug("Merging for change %s,%s." %
@@ -383,7 +428,7 @@
elif item.get("newrev") and item.get("oldrev"):
self.log.debug("Merging for rev %s with oldrev %s." %
(item["newrev"], item["oldrev"]))
- commit = self._mergeItem(item, recent)
+ commit = self._mergeItem(item, recent, repo_state)
if not commit:
return None
if files:
@@ -394,9 +439,7 @@
project=item['project'],
branch=item['branch'],
files=repo_files))
- if files:
- return commit.hexsha, read_files
- return commit.hexsha
+ return commit.hexsha, read_files, repo_state
def getFiles(self, connection_name, project_name, branch, files):
repo = self.getRepo(connection_name, project_name)
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index c09d7ba..15f1a41 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -103,16 +103,14 @@
def merge(self, job):
args = json.loads(job.arguments)
- ret = self.merger.mergeChanges(args['items'], args.get('files'))
+ ret = self.merger.mergeChanges(args['items'], args.get('files'),
+ args.get('repo_state'))
result = dict(merged=(ret is not None),
zuul_url=self.zuul_url)
- if args.get('files'):
- if ret:
- result['commit'], result['files'] = ret
- else:
- result['commit'], result['files'] = (None, None)
+ if ret is None:
+ result['commit'] = result['files'] = result['repo_state'] = None
else:
- result['commit'] = ret
+ result['commit'], result['files'], result['repo_state'] = ret
job.sendWorkComplete(json.dumps(result))
def cat(self, job):
diff --git a/zuul/model.py b/zuul/model.py
index 4ae6f9a..bfd4d76 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1131,7 +1131,6 @@
def __init__(self, item):
self.item = item
- self.other_changes = []
self.builds = {}
self.result = None
self.next_build_set = None
@@ -1139,6 +1138,8 @@
self.ref = None
self.commit = None
self.zuul_url = None
+ self.dependent_items = None
+ self.merger_items = None
self.unable_to_merge = False
self.config_error = None # None or an error message string.
self.failing_reasons = []
@@ -1146,6 +1147,7 @@
self.nodesets = {} # job -> nodeset
self.node_requests = {} # job -> reqs
self.files = RepoFiles()
+ self.repo_state = {}
self.layout = None
self.tries = {}
@@ -1159,13 +1161,19 @@
# The change isn't enqueued until after it's created
# so we don't know what the other changes ahead will be
# until jobs start.
- if not self.other_changes:
+ if self.dependent_items is None:
+ items = []
next_item = self.item.item_ahead
while next_item:
- self.other_changes.append(next_item.change)
+ items.append(next_item)
next_item = next_item.item_ahead
+ self.dependent_items = items
if not self.ref:
self.ref = 'Z' + uuid4().hex
+ if self.merger_items is None:
+ items = [self.item] + self.dependent_items
+ items.reverse()
+ self.merger_items = [i.makeMergerItem() for i in items]
def getStateName(self, state_num):
return self.states_map.get(
@@ -1217,9 +1225,26 @@
return self.tries.get(job_name, 0)
def getMergeMode(self):
- if self.layout:
+ # We may be called before this build set has a shadow layout
+ # (ie, we are called to perform the merge to create that
+ # layout). It's possible that the change we are merging will
+ # update the merge-mode for the project, but there's not much
+ # we can do about that here. Instead, do the best we can by
+ # using the nearest shadow layout to determine the merge mode,
+ # or if that fails, the current live layout, or if that fails,
+ # use the default: merge-resolve.
+ item = self.item
+ layout = None
+ while item:
+ layout = item.current_build_set.layout
+ if layout:
+ break
+ item = item.item_ahead
+ if not layout:
+ layout = self.item.pipeline.layout
+ if layout:
project = self.item.change.project
- project_config = self.layout.project_configs.get(
+ project_config = layout.project_configs.get(
project.canonical_name)
if project_config:
return project_config.merge_mode
@@ -1919,6 +1944,7 @@
def __init__(self, name):
self.name = name
self.merge_mode = None
+ self.default_branch = None
self.pipelines = {}
self.private_key_file = None
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a67973e..40d5eb7 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -138,16 +138,18 @@
:arg bool merged: Whether the merge succeeded (changes with refs).
:arg bool updated: Whether the repo was updated (changes without refs).
:arg str commit: The SHA of the merged commit (changes with refs).
+ :arg dict repo_state: The starting repo state before the merge.
"""
def __init__(self, build_set, zuul_url, merged, updated, commit,
- files):
+ files, repo_state):
self.build_set = build_set
self.zuul_url = zuul_url
self.merged = merged
self.updated = updated
self.commit = commit
self.files = files
+ self.repo_state = repo_state
class NodesProvisionedEvent(ResultEvent):
@@ -316,11 +318,11 @@
self.log.debug("Done adding complete event for build: %s" % build)
def onMergeCompleted(self, build_set, zuul_url, merged, updated,
- commit, files):
+ commit, files, repo_state):
self.log.debug("Adding merge complete event for build set: %s" %
build_set)
event = MergeCompletedEvent(build_set, zuul_url, merged,
- updated, commit, files)
+ updated, commit, files, repo_state)
self.result_event_queue.put(event)
self.wake_event.set()