Merge "Merge master into feature/zuulv3" into feature/zuulv3
diff --git a/.gitignore b/.gitignore
index 21a0a9f..f516785 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
+*.sw?
 *.egg
 *.egg-info
 *.pyc
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index 0a1e0e7..c61cea8 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -239,7 +239,7 @@
 instead.  As an example, the OpenStack project uses the following
 script to prepare the workspace for its integration testing:
 
-  https://github.com/openstack-infra/devstack-gate/blob/master/devstack-vm-gate-wrap.sh
+  https://git.openstack.org/cgit/openstack-infra/devstack-gate/tree/devstack-vm-gate-wrap.sh
 
 Turbo Hipster Worker
 ~~~~~~~~~~~~~~~~~~~~
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 74ce360..b5b8d7b 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -568,8 +568,8 @@
       my_gerrit:
         verified: 1
     failure:
-      gerrit:
-        my_gerrit: -1
+      my_gerrit:
+        verified: -1
 
 This will trigger jobs each time a new patchset (or change) is
 uploaded to Gerrit, and report +/-1 values to Gerrit in the
@@ -704,6 +704,11 @@
   would largely defeat the parallelization of dependent change testing
   that is the main feature of Zuul.  Default: ``false``.
 
+**mutex (optional)**
+  This is a string that names a mutex that should be observed by this
+  job.  Only one build of any job that references the same named mutex
+  will be enqueued at a time.  This applies across all pipelines.
+
 **branch (optional)**
   This job should only be run on matching branches.  This field is
   treated as a regular expression and multiple branches may be
diff --git a/requirements.txt b/requirements.txt
index f626f4c..5818c5f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,9 +1,9 @@
-pbr>=0.5.21,<1.0
+pbr>=1.1.0
 
 argparse
 PyYAML>=3.1.0
 Paste
-WebOb>=1.2.3,<1.3
+WebOb>=1.2.3
 paramiko>=1.8.0
 GitPython>=0.3.3
 ordereddict
@@ -12,7 +12,7 @@
 statsd>=1.0.0,<3.0
 voluptuous>=0.7
 gear>=0.5.7,<1.0.0
-apscheduler>=2.1.1,<3.0
+apscheduler>=3.0
 PrettyTable>=0.6,<0.8
 babel>=1.0
 six>=1.6.0
diff --git a/tests/fixtures/layout-mutex.yaml b/tests/fixtures/layout-mutex.yaml
new file mode 100644
index 0000000..fcd0529
--- /dev/null
+++ b/tests/fixtures/layout-mutex.yaml
@@ -0,0 +1,25 @@
+pipelines:
+  - name: check
+    manager: IndependentPipelineManager
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+
+jobs:
+  - name: mutex-one
+    mutex: test-mutex
+  - name: mutex-two
+    mutex: test-mutex
+
+projects:
+  - name: org/project
+    check:
+      - project-test1
+      - mutex-one
+      - mutex-two
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index e30147f..757fffe 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -132,6 +132,10 @@
     parameter-function: select_debian_node
   - name: project1-project2-integration
     queue-name: integration
+  - name: mutex-one
+    mutex: test-mutex
+  - name: mutex-two
+    mutex: test-mutex
 
 project-templates:
   - name: test-one-and-two
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 85ac600..b56e227 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -2286,6 +2286,70 @@
         self.sched.reconfigure(self.config)
         self.assertEqual(len(self.sched.layout.pipelines['gate'].queues), 1)
 
+    def test_mutex(self):
+        "Test that builds sharing a mutex run strictly one at a time"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-mutex.yaml')
+        self.sched.reconfigure(self.config)
+
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
+        # Each change wants mutex-one and mutex-two, which share test-mutex.
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'mutex-one')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        # Completing the running mutex build lets exactly one more start.
+        self.worker.release('mutex-one')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test1')
+        self.assertEqual(self.builds[2].name, 'mutex-two')
+        self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
+
+        self.worker.release('mutex-two')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test1')
+        self.assertEqual(self.builds[2].name, 'mutex-one')
+        self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
+
+        self.worker.release('mutex-one')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test1')
+        self.assertEqual(self.builds[2].name, 'mutex-two')
+        self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
+
+        self.worker.release('mutex-two')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test1')
+        self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
+        # No mutex builds remain; let the two held builds finish.
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 0)
+
+        self.assertEqual(A.reported, 1)
+        self.assertEqual(B.reported, 1)
+        self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
+
     def test_node_label(self):
         "Test that a job runs on a specific node label"
         self.worker.registerFunction('build:node-project-test1:debian')
@@ -2742,11 +2806,11 @@
                 'tests/fixtures/layout-idle.yaml')
             self.sched.reconfigure(self.config)
             self.registerJobs()
+            self.waitUntilSettled()
 
             # The pipeline triggers every second, so we should have seen
             # several by now.
             time.sleep(5)
-            self.waitUntilSettled()
 
             # Stop queuing timer triggered jobs so that the assertions
             # below don't race against more jobs being queued.
@@ -2754,6 +2818,7 @@
                 'tests/fixtures/layout-no-timer.yaml')
             self.sched.reconfigure(self.config)
             self.registerJobs()
+            self.waitUntilSettled()
 
             self.assertEqual(len(self.builds), 2)
             self.worker.release('.*')
@@ -3412,6 +3477,31 @@
         self.assertEqual('The merge failed! For more information...',
                          self.smtp_messages[0]['body'])
 
+    def test_default_merge_failure_reports(self):
+        """Check that the default merge failure reports are correct."""
+
+        # A should merge and report success; B should report a merge failure.
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addPatchset(['conflict'])
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        B.addPatchset(['conflict'])
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        self.assertEqual(3, len(self.history))  # A's jobs only
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(B.reported, 2)
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(B.data['status'], 'NEW')
+        self.assertIn('Build succeeded', A.messages[1])
+        self.assertIn('Merge Failed', B.messages[1])
+        self.assertIn('automatically merged', B.messages[1])
+        self.assertNotIn('logs.example.com', B.messages[1])
+        self.assertNotIn('SKIPPED', B.messages[1])
+
     def test_swift_instructions(self):
         "Test that the correct swift instructions are sent to the workers"
         self.updateConfigLayout(
diff --git a/tox.ini b/tox.ini
index f9b0df1..79ea939 100644
--- a/tox.ini
+++ b/tox.ini
@@ -17,9 +17,6 @@
 commands =
   python setup.py testr --slowest --testr-args='{posargs}'
 
-[tox:jenkins]
-downloadcache = ~/cache/pip
-
 [testenv:pep8]
 commands = flake8 {posargs}
 
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 396a015..a9dbbc2 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -60,6 +60,7 @@
                'failure-url': str,
                'success-url': str,
                'voting': bool,
+               'mutex': str,
                'branches': to_list(str),
                'files': to_list(str),
                'swift': to_list(swift),
@@ -81,6 +82,7 @@
         job.pre_run = as_list(conf.get('pre-run', job.pre_run))
         job.post_run = as_list(conf.get('post-run', job.post_run))
         job.voting = conf.get('voting', True)
+        job.mutex = conf.get('mutex', None)
 
         job.failure_message = conf.get('failure-message', job.failure_message)
         job.success_message = conf.get('success-message', job.success_message)
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index f2ea47a..402528f 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -43,6 +43,14 @@
         self.connection_name = connection_name
         self.connection_config = connection_config
 
+        # Keep track of the sources, triggers and reporters using this
+        # connection
+        self.attached_to = {
+            'source': [],
+            'trigger': [],
+            'reporter': [],
+        }
+
     def onLoad(self):
         pass
 
@@ -51,3 +59,6 @@
 
     def registerScheduler(self, sched):
         self.sched = sched
+
+    def registerUse(self, what, instance):
+        self.attached_to[what].append(instance)
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index a203c24..08a6569 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -47,7 +47,6 @@
     def _handleEvent(self):
         ts, data = self.connection.getEvent()
         if self._stopped:
-            self.connection.eventDone()
             return
         # Gerrit can produce inconsistent data immediately after an
         # event, So ensure that we do not deliver the event to Zuul
@@ -99,16 +98,27 @@
                     Can not get account information." % event.type)
             event.account = None
 
-        # TODOv3(jeblair,jhesketh): this is broken in the main branch and
-        # the fix needs to be merged here
-        # if (event.change_number and
-        # self.connection.sched.getProject(event.project_name)):
         if event.change_number:
-            # Mark the change as needing a refresh in the cache
-            event._needs_refresh = True
+            # TODO(jhesketh): Check if the project exists?
+            # and self.connection.sched.getProject(event.project_name):
 
+            # Call _getChange for the side effect of updating the
+            # cache.  Note that this modifies Change objects outside
+            # the main thread.
+            # NOTE(jhesketh): Ideally we'd just remove the change from the
+            # cache to denote that it needs updating. However the change
+            # object is already used by Items and hence BuildSets etc. and
+            # we need to update those objects by reference so that they have
+            # the correct/new information and also avoid hitting gerrit
+            # multiple times.
+            if self.connection.attached_to['source']:
+                self.connection.attached_to['source'][0]._getChange(
+                    event.change_number, event.patch_number, refresh=True)
+                # We only need to do this once since the connection maintains
+                # the cache (which is shared between all the sources)
+                # NOTE(jhesketh): We may couple sources and connections again
+                # at which point this becomes more sensible.
         self.connection.sched.addEvent(event)
-        self.connection.eventDone()
 
     def run(self):
         while True:
@@ -118,6 +128,8 @@
                 self._handleEvent()
             except:
                 self.log.exception("Exception moving Gerrit event:")
+            finally:
+                self.connection.eventDone()
 
 
 class GerritWatcher(threading.Thread):
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 0adb78c..6c3b079 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -127,6 +127,7 @@
            'success-pattern': str,
            'hold-following-changes': bool,
            'voting': bool,
+           'mutex': str,
            'parameter-function': str,
            'branch': toList(str),
            'files': toList(str),
diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py
index 5f42c3a..64cc3a7 100644
--- a/zuul/lib/connections.py
+++ b/zuul/lib/connections.py
@@ -71,12 +71,12 @@
         if 'gerrit' in config.sections():
             connections['gerrit'] = \
                 zuul.connection.gerrit.GerritConnection(
-                    '_legacy_gerrit', dict(config.items('gerrit')))
+                    'gerrit', dict(config.items('gerrit')))
 
         if 'smtp' in config.sections():
             connections['smtp'] = \
                 zuul.connection.smtp.SMTPConnection(
-                    '_legacy_smtp', dict(config.items('smtp')))
+                    'smtp', dict(config.items('smtp')))
 
         self.connections = connections
 
@@ -118,6 +118,9 @@
                 driver_config, self.sched, connection
         )
 
+        if connection:
+            connection.registerUse(dtype, driver_instance)
+
         return driver_instance
 
     def getSource(self, connection_name):
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index e3e0997..ce369f1 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -77,14 +77,16 @@
                     efilters += str(tree.job.skip_if_matcher)
                 if efilters:
                     efilters = ' ' + efilters
-                hold = ''
+                tags = []
                 if tree.job.hold_following_changes:
-                    hold = ' [hold]'
-                voting = ''
+                    tags.append('[hold]')
                 if not tree.job.voting:
-                    voting = ' [nonvoting]'
-                self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
-                                              efilters, hold, voting))
+                    tags.append('[nonvoting]')
+                if tree.job.mutex:
+                    tags.append('[mutex: %s]' % tree.job.mutex)
+                tags = ' '.join(tags)
+                self.log.info("%s%s%s %s" % (istr, repr(tree.job),
+                                             efilters, tags))
             for x in tree.job_trees:
                 log_jobs(x, indent + 2)
 
@@ -348,7 +350,7 @@
     def launchJobs(self, item):
         # TODO(jeblair): This should return a value indicating a job
         # was launched.  Appears to be a longstanding bug.
-        jobs = self.pipeline.findJobsToRun(item)
+        jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
         if jobs:
             self._launchJobs(item, jobs)
 
@@ -474,13 +476,23 @@
 
     def updateBuildDescriptions(self, build_set):
         for build in build_set.getBuilds():
-            desc = self.formatDescription(build)
-            self.sched.launcher.setBuildDescription(build, desc)
+            try:
+                desc = self.formatDescription(build)
+                self.sched.launcher.setBuildDescription(build, desc)
+            except Exception:
+                # Log the failure with its traceback and let loop continue
+                self.log.exception("Failed to update description for "
+                                   "build %s" % (build))
 
         if build_set.previous_build_set:
             for build in build_set.previous_build_set.getBuilds():
-                desc = self.formatDescription(build)
-                self.sched.launcher.setBuildDescription(build, desc)
+                try:
+                    desc = self.formatDescription(build)
+                    self.sched.launcher.setBuildDescription(build, desc)
+                except Exception:  # log traceback, let the loop continue
+                    self.log.exception(
+                        "Failed to update description for build %s "
+                        "in previous build set" % (build))
 
     def onBuildStarted(self, build):
         self.log.debug("Build %s started" % build)
@@ -491,6 +503,7 @@
         item = build.build_set.item
 
         self.pipeline.setResult(item, build)
+        self.sched.mutex.release(item, build.job)
         self.log.debug("Item %s status is now:\n %s" %
                        (item, item.formatStatus()))
         return True
@@ -503,9 +516,9 @@
         if event.merged:
             build_set.commit = event.commit
         elif event.updated:
-            if not isinstance(item, NullChange):
+            if not isinstance(item.change, NullChange):
                 build_set.commit = item.change.newrev
-        if not build_set.commit:
+        if not build_set.commit and not isinstance(item.change, NullChange):
             self.log.info("Unable to merge change %s" % item.change)
             self.pipeline.setUnableToMerge(item)
 
diff --git a/zuul/model.py b/zuul/model.py
index 22d19b4..2571dc0 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -146,7 +146,7 @@
             return []
         return item.change.filterJobs(tree.getJobs())
 
-    def _findJobsToRun(self, job_trees, item):
+    def _findJobsToRun(self, job_trees, item, mutex):
         torun = []
         for tree in job_trees:
             job = tree.job
@@ -160,20 +160,23 @@
                 else:
                     # There is no build for the root of this job tree,
                     # so we should run it.
-                    torun.append(job)
+                    if mutex.acquire(item, job):
+                        # If this job needs a mutex, either acquire it or make
+                        # sure that we have it before running the job.
+                        torun.append(job)
             # If there is no job, this is a null job tree, and we should
             # run all of its jobs.
             if result == 'SUCCESS' or not job:
-                torun.extend(self._findJobsToRun(tree.job_trees, item))
+                torun.extend(self._findJobsToRun(tree.job_trees, item, mutex))
         return torun
 
-    def findJobsToRun(self, item):
+    def findJobsToRun(self, item, mutex):
         if not item.live:
             return []
         tree = item.job_tree
         if not tree:
             return []
-        return self._findJobsToRun(tree.job_trees, item)
+        return self._findJobsToRun(tree.job_trees, item, mutex)
 
     def haveAllJobsStarted(self, item):
         for job in self.getJobs(item):
@@ -464,6 +467,7 @@
         swift=None,  # TODOv3(jeblair): move to auth
         parameter_function=None,  # TODOv3(jeblair): remove
         success_pattern=None,  # TODOv3(jeblair): remove
+        mutex=None,
     )
 
     def __init__(self, name):
@@ -1051,9 +1055,6 @@
         # an admin command, etc):
         self.forced_pipeline = None
 
-        # Internal mechanism to track if the change needs a refresh from cache
-        self._needs_refresh = False
-
     def __repr__(self):
         ret = '<TriggerEvent %s %s' % (self.type, self.project_name)
 
diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py
index 0f967c7..777b058 100644
--- a/zuul/reporter/__init__.py
+++ b/zuul/reporter/__init__.py
@@ -81,6 +81,8 @@
     def _formatItemReportFailure(self, pipeline, item):
         if item.dequeued_needing_change:
             msg = 'This change depends on a change that failed to merge.\n'
+        elif not pipeline.didMergerSucceed(item):
+            msg = pipeline.merge_failure_message
         else:
             msg = (pipeline.failure_message + '\n\n' +
                    self._formatItemReportJobs(pipeline, item))
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 48107e4..0266d68 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -34,6 +34,68 @@
 statsd = extras.try_import('statsd.statsd')
 
 
+class MutexHandler(object):
+    """Track which (item, job name) pair holds each named job mutex."""
+    log = logging.getLogger("zuul.MutexHandler")
+
+    def __init__(self):
+        self.mutexes = {}  # mutex name -> (holding item, job name)
+
+    def acquire(self, item, job):
+        """Return True if job needs no mutex or item's job now holds it."""
+        if not job.mutex:
+            return True
+        mutex_name = job.mutex
+        m = self.mutexes.get(mutex_name)
+        if not m:
+            # The mutex is not held, acquire it
+            self._acquire(mutex_name, item, job.name)
+            return True
+        held_item, held_job_name = m
+        if held_item is item and held_job_name == job.name:
+            # This item already holds the mutex
+            return True
+        held_build = held_item.current_build_set.getBuild(held_job_name)
+        if held_build and held_build.result:
+            # The build that held the mutex is complete, release it
+            # on behalf of the stale holder and let the new item have it.
+            self.log.error("Held mutex %s being released because the build "
+                           "that holds it is complete" % (mutex_name,))
+            self._release(mutex_name, held_item, held_job_name)
+            self._acquire(mutex_name, item, job.name)
+            return True
+        return False
+
+    def release(self, item, job):
+        """Release job's mutex if this item's job holds it, else log."""
+        if not job.mutex:
+            return
+        mutex_name = job.mutex
+        m = self.mutexes.get(mutex_name)
+        if not m:
+            # The mutex is not held, nothing to do
+            self.log.error("Mutex can not be released for %s "
+                           "because the mutex is not held" % (item,))
+            return
+        held_item, held_job_name = m
+        if held_item is item and held_job_name == job.name:
+            # This item holds the mutex
+            self._release(mutex_name, item, job.name)
+            return
+        self.log.error("Mutex can not be released for %s "
+                       "which does not hold it" % (item,))
+
+    def _acquire(self, mutex_name, item, job_name):
+        self.log.debug("Job %s of item %s acquiring mutex %s" %
+                       (job_name, item, mutex_name))
+        self.mutexes[mutex_name] = (item, job_name)
+
+    def _release(self, mutex_name, item, job_name):
+        self.log.debug("Job %s of item %s releasing mutex %s" %
+                       (job_name, item, mutex_name))
+        del self.mutexes[mutex_name]
+
+
 class ManagementEvent(object):
     """An event that should be processed within the main queue run loop"""
     def __init__(self):
@@ -162,6 +224,7 @@
         self.merger = None
         self.connections = None
         # TODO(jeblair): fix this
+        self.mutex = MutexHandler()
         # Despite triggers being part of the pipeline, there is one trigger set
         # per scheduler. The pipeline handles the trigger filters but since
         # the events are handled by the scheduler itself it needs to handle
diff --git a/zuul/source/gerrit.py b/zuul/source/gerrit.py
index e1a137a..dffc643 100644
--- a/zuul/source/gerrit.py
+++ b/zuul/source/gerrit.py
@@ -132,9 +132,6 @@
     def getChange(self, event):
         if event.change_number:
             refresh = False
-            if event._needs_refresh:
-                refresh = True
-                event._needs_refresh = False
             change = self._getChange(event.change_number, event.patch_number,
                                      refresh=refresh)
         elif event.ref:
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index c93a638..d42e3db 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -13,7 +13,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import apscheduler.scheduler
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.cron import CronTrigger
 import logging
 import voluptuous as v
 from zuul.model import EventFilter, TriggerEvent
@@ -26,7 +27,7 @@
 
     def __init__(self, trigger_config={}, sched=None, connection=None):
         super(TimerTrigger, self).__init__(trigger_config, sched, connection)
-        self.apsched = apscheduler.scheduler.Scheduler()
+        self.apsched = BackgroundScheduler()
         self.apsched.start()
 
     def _onTrigger(self, pipeline_name, timespec):
@@ -62,7 +63,7 @@
 
     def postConfig(self):
         for job in self.apsched.get_jobs():
-            self.apsched.unschedule_job(job)
+            job.remove()
         for pipeline in self.sched.layout.pipelines.values():
             for ef in pipeline.manager.event_filters:
                 if ef.trigger != self:
@@ -81,14 +82,11 @@
                         second = parts[5]
                     else:
                         second = None
-                    self.apsched.add_cron_job(self._onTrigger,
-                                              day=dom,
-                                              day_of_week=dow,
-                                              hour=hour,
-                                              minute=minute,
-                                              second=second,
-                                              args=(pipeline.name,
-                                                    timespec,))
+                    trigger = CronTrigger(day=dom, day_of_week=dow, hour=hour,
+                                          minute=minute, second=second)
+
+                    self.apsched.add_job(self._onTrigger, trigger=trigger,
+                                         args=(pipeline.name, timespec,))
 
 
 def getSchema():