Switch the launcher to Gearman.

Remove the Jenkins launcher and add a new Gearman launcher (designed
to be compatible with Jenkins) in its place.

See the documentation for how to set up the Gearman Plugin for
Jenkins.

Change-Id: Ie7224396271d7375f4ea42eebb57f883bc291738
diff --git a/tests/fixtures/custom_functions.py b/tests/fixtures/custom_functions.py
new file mode 100644
index 0000000..e796722
--- /dev/null
+++ b/tests/fixtures/custom_functions.py
@@ -0,0 +1,2 @@
+def select_debian_node(change, params):
+    params['ZUUL_NODE'] = 'debian'
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index ecdd2da..8d6c852 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -1,3 +1,6 @@
+includes:
+  - python-file: custom_functions.py
+
 pipelines:
   - name: check
     manager: IndependentPipelineManager
@@ -46,6 +49,8 @@
   - name: project-testfile
     files:
       - '.*-requires'
+  - name: node-project-test1
+    parameter-function: select_debian_node
 
 project-templates:
   - name: test-one-and-two
@@ -135,3 +140,9 @@
     template:
      - name: test-one-and-two
        projectname: project
+
+  - name: org/node-project
+    gate:
+      - node-project-merge:
+        - node-project-test1
+        - node-project-test2
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index c031ae3..8948ae8 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -1,7 +1,5 @@
-[jenkins]
-server=https://jenkins.example.com
-user=jenkins
-apikey=1234
+[gearman]
+server=127.0.0.1
 
 [gerrit]
 server=review.example.com
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 508af42..4d3351e 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -33,10 +33,12 @@
 import shutil
 import socket
 import string
+from cStringIO import StringIO
 import git
+import gear
 
 import zuul.scheduler
-import zuul.launcher.jenkins
+import zuul.launcher.gearman
 import zuul.trigger.gerrit
 
 FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
@@ -54,7 +56,9 @@
 
 CONFIG.set('zuul', 'git_dir', GIT_ROOT)
 
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.DEBUG,
+                    format='%(asctime)s %(name)-32s '
+                    '%(levelname)-8s %(message)s')
 
 
 def random_sha1():
@@ -138,11 +142,15 @@
 def job_has_changes(*args):
     job = args[0]
     commits = args[1:]
-    project = job.parameters['ZUUL_PROJECT']
+    if isinstance(job, FakeBuild):
+        parameters = job.parameters
+    else:
+        parameters = json.loads(job.arguments)
+    project = parameters['ZUUL_PROJECT']
     path = os.path.join(GIT_ROOT, project)
     repo = git.Repo(path)
-    ref = job.parameters['ZUUL_REF']
-    sha = job.parameters['ZUUL_COMMIT']
+    ref = parameters['ZUUL_REF']
+    sha = parameters['ZUUL_COMMIT']
     repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
     repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
     commit_messages = ['%s-1' % commit.subject for commit in commits]
@@ -381,240 +389,13 @@
         pass
 
 
-class FakeJenkinsEvent(object):
-    def __init__(self, name, number, parameters, phase, status=None):
-        data = {
-            'build': {
-                'full_url': 'https://server/job/%s/%s/' % (name, number),
-                'number': number,
-                'parameters': parameters,
-                'phase': phase,
-                'url': 'job/%s/%s/' % (name, number),
-            },
-            'name': name,
-            'url': 'job/%s/' % name,
-        }
-        if status:
-            data['build']['status'] = status
-        self.body = json.dumps(data)
+class BuildHistory(object):
+    def __init__(self, **kw):
+        self.__dict__.update(kw)
 
-
-class FakeJenkinsJob(threading.Thread):
-    log = logging.getLogger("zuul.test")
-
-    def __init__(self, jenkins, callback, name, number, parameters):
-        threading.Thread.__init__(self)
-        self.jenkins = jenkins
-        self.callback = callback
-        self.name = name
-        self.number = number
-        self.parameters = parameters
-        self.wait_condition = threading.Condition()
-        self.waiting = False
-        self.aborted = False
-        self.canceled = False
-        self.created = time.time()
-
-    def release(self):
-        self.wait_condition.acquire()
-        self.wait_condition.notify()
-        self.waiting = False
-        self.log.debug("Job %s released" % (self.parameters['UUID']))
-        self.wait_condition.release()
-
-    def isWaiting(self):
-        self.wait_condition.acquire()
-        if self.waiting:
-            ret = True
-        else:
-            ret = False
-        self.wait_condition.release()
-        return ret
-
-    def _wait(self):
-        self.wait_condition.acquire()
-        self.waiting = True
-        self.log.debug("Job %s waiting" % (self.parameters['UUID']))
-        self.wait_condition.wait()
-        self.wait_condition.release()
-
-    def run(self):
-        self.jenkins.fakeEnqueue(self)
-        if self.jenkins.hold_jobs_in_queue:
-            self._wait()
-        self.jenkins.fakeDequeue(self)
-        if self.canceled:
-            self.jenkins.all_jobs.remove(self)
-            return
-        self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name,
-                                                        self.number,
-                                                        self.parameters,
-                                                        'STARTED'))
-        if self.jenkins.hold_jobs_in_build:
-            self._wait()
-        self.log.debug("Job %s continuing" % (self.parameters['UUID']))
-
-        result = 'SUCCESS'
-        if (('ZUUL_REF' in self.parameters) and
-            self.jenkins.fakeShouldFailTest(self.name,
-                                            self.parameters['ZUUL_REF'])):
-            result = 'FAILURE'
-        if self.aborted:
-            result = 'ABORTED'
-
-        changes = None
-        if 'ZUUL_CHANGE_IDS' in self.parameters:
-            changes = self.parameters['ZUUL_CHANGE_IDS']
-
-        self.jenkins.fakeAddHistory(name=self.name, number=self.number,
-                                    result=result, changes=changes)
-        self.jenkins.lock.acquire()
-        self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name,
-                                                        self.number,
-                                                        self.parameters,
-                                                        'COMPLETED',
-                                                        result))
-        self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name,
-                                                        self.number,
-                                                        self.parameters,
-                                                        'FINISHED',
-                                                        result))
-        self.jenkins.all_jobs.remove(self)
-        self.jenkins.lock.release()
-
-
-class FakeJenkins(object):
-    log = logging.getLogger("zuul.test")
-
-    def __init__(self, *args, **kw):
-        self.queue = []
-        self.all_jobs = []
-        self.job_counter = {}
-        self.queue_counter = 0
-        self.job_history = []
-        self.hold_jobs_in_queue = False
-        self.hold_jobs_in_build = False
-        self.fail_tests = {}
-        self.nonexistent_jobs = []
-        self.lock = threading.Lock()
-
-    def fakeEnqueue(self, job):
-        self.queue.append(job)
-
-    def fakeDequeue(self, job):
-        self.queue.remove(job)
-
-    class FakeJobHistory(object):
-        def __init__(self, **kw):
-            self.__dict__.update(kw)
-
-        def __repr__(self):
-            return ("<Completed job, result: %s name: %s #%s changes: %s>" %
-                    (self.result, self.name, self.number, self.changes))
-
-    def fakeAddHistory(self, **kw):
-        self.job_history.append(self.FakeJobHistory(**kw))
-
-    def fakeRelease(self, regex=None):
-        all_jobs = self.all_jobs[:]
-        self.log.debug("releasing jobs %s (%s)" % (regex, len(self.all_jobs)))
-        for job in all_jobs:
-            if not regex or re.match(regex, job.name):
-                self.log.debug("releasing job %s" % (job.parameters['UUID']))
-                job.release()
-            else:
-                self.log.debug("not releasing job %s" %
-                               (job.parameters['UUID']))
-        self.log.debug("done releasing jobs %s (%s)" % (regex,
-                                                        len(self.all_jobs)))
-
-    def fakeAllWaiting(self, regex=None):
-        all_jobs = self.all_jobs[:] + self.queue[:]
-        for job in all_jobs:
-            self.log.debug("job %s %s" % (job.parameters['UUID'],
-                                          job.isWaiting()))
-            if not job.isWaiting():
-                return False
-        return True
-
-    def fakeAddFailTest(self, name, change):
-        l = self.fail_tests.get(name, [])
-        l.append(change)
-        self.fail_tests[name] = l
-
-    def fakeShouldFailTest(self, name, ref):
-        l = self.fail_tests.get(name, [])
-        for change in l:
-            if ref_has_change(ref, change):
-                return True
-        return False
-
-    def build_job(self, name, parameters):
-        if name in self.nonexistent_jobs:
-            raise Exception("Job does not exist")
-        count = self.job_counter.get(name, 0)
-        count += 1
-        self.job_counter[name] = count
-
-        queue_count = self.queue_counter
-        self.queue_counter += 1
-        job = FakeJenkinsJob(self, self.callback, name, count, parameters)
-        job.queue_id = queue_count
-
-        self.all_jobs.append(job)
-        job.start()
-
-    def stop_build(self, name, number):
-        for job in self.all_jobs:
-            if job.name == name and job.number == number:
-                job.aborted = True
-                job.release()
-                return
-
-    def cancel_queue(self, id):
-        for job in self.queue:
-            if job.queue_id == id:
-                job.canceled = True
-                job.release()
-                return
-
-    def get_queue_info(self):
-        items = []
-        for job in self.queue[:]:
-            self.log.debug("Queue info: %s %s" % (job.name,
-                                                  job.parameters['UUID']))
-            paramstr = ''
-            paramlst = []
-            d = {'actions': [{'parameters': paramlst},
-                             {'causes': [{'shortDescription':
-                                          'Started by user Jenkins',
-                                          'userId': 'jenkins',
-                                          'userName': 'Jenkins'}]}],
-                 'blocked': False,
-                 'buildable': True,
-                 'buildableStartMilliseconds': (job.created * 1000) + 5,
-                 'id': job.queue_id,
-                 'inQueueSince': (job.created * 1000),
-                 'params': paramstr,
-                 'stuck': False,
-                 'task': {'color': 'blue',
-                          'name': job.name,
-                          'url': 'https://server/job/%s/' % job.name},
-                 'why': 'Waiting for next available executor'}
-            for k, v in job.parameters.items():
-                paramstr += "\n(StringParameterValue) %s='%s'" % (k, v)
-                pd = {'name': k, 'value': v}
-                paramlst.append(pd)
-            items.append(d)
-        return items
-
-    def set_build_description(self, *args, **kw):
-        pass
-
-
-class FakeJenkinsCallback(zuul.launcher.jenkins.JenkinsCallback):
-    def start(self):
-        pass
+    def __repr__(self):
+        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
+                (self.result, self.name, self.number, self.changes))
 
 
 class FakeURLOpener(object):
@@ -655,21 +436,253 @@
 
     def run(self):
         while True:
-            read_ready = select.select([self.sock, self.wake_read], [], [])[0]
-            for sock in read_ready:
-                if sock is self.sock:
+            poll = select.poll()
+            poll.register(self.sock, select.POLLIN)
+            poll.register(self.wake_read, select.POLLIN)
+            ret = poll.poll()
+            for (fd, event) in ret:
+                if fd == self.sock.fileno():
                     data = self.sock.recvfrom(1024)
                     if not data:
                         return
                     self.stats.append(data[0])
-                else:
-                    # wake_read
+                if fd == self.wake_read:
                     return
 
     def stop(self):
         os.write(self.wake_write, '1\n')
 
 
+class FakeBuild(threading.Thread):
+    log = logging.getLogger("zuul.test")
+
+    def __init__(self, worker, job, number, node):
+        threading.Thread.__init__(self)
+        self.worker = worker
+        self.job = job
+        self.name = job.name.split(':')[1]
+        self.number = number
+        self.node = node
+        self.parameters = json.loads(job.arguments)
+        self.unique = self.parameters['ZUUL_UUID']
+        self.wait_condition = threading.Condition()
+        self.waiting = False
+        self.aborted = False
+        self.created = time.time()
+        self.description = ''
+
+    def release(self):
+        self.wait_condition.acquire()
+        self.wait_condition.notify()
+        self.waiting = False
+        self.log.debug("Build %s released" % self.unique)
+        self.wait_condition.release()
+
+    def isWaiting(self):
+        self.wait_condition.acquire()
+        if self.waiting:
+            ret = True
+        else:
+            ret = False
+        self.wait_condition.release()
+        return ret
+
+    def _wait(self):
+        self.wait_condition.acquire()
+        self.waiting = True
+        self.log.debug("Build %s waiting" % self.unique)
+        self.wait_condition.wait()
+        self.wait_condition.release()
+
+    def run(self):
+        data = {
+            'full_url': 'https://server/job/%s/%s/' % (self.name, self.number),
+            'number': self.number,
+            'master': self.worker.worker_id,
+            }
+
+        self.job.sendWorkData(json.dumps(data))
+        self.job.sendWorkStatus(0, 100)
+
+        if self.worker.hold_jobs_in_build:
+            self._wait()
+        self.log.debug("Build %s continuing" % self.unique)
+
+        self.worker.lock.acquire()
+
+        result = 'SUCCESS'
+        if (('ZUUL_REF' in self.parameters) and
+            self.worker.shouldFailTest(self.name,
+                                       self.parameters['ZUUL_REF'])):
+            result = 'FAILURE'
+        if self.aborted:
+            result = 'ABORTED'
+
+        data = {'result': result}
+        changes = None
+        if 'ZUUL_CHANGE_IDS' in self.parameters:
+            changes = self.parameters['ZUUL_CHANGE_IDS']
+
+        self.worker.build_history.append(
+            BuildHistory(name=self.name, number=self.number,
+                         result=result, changes=changes, node=self.node,
+                         uuid=self.unique, description=self.description)
+            )
+
+        self.job.sendWorkComplete(json.dumps(data))
+        del self.worker.gearman_jobs[self.job.unique]
+        self.worker.running_builds.remove(self)
+        self.worker.lock.release()
+
+
+class FakeWorker(gear.Worker):
+    def __init__(self, worker_id):
+        super(FakeWorker, self).__init__(worker_id)
+        self.gearman_jobs = {}
+        self.build_history = []
+        self.running_builds = []
+        self.build_counter = 0
+        self.fail_tests = {}
+
+        self.hold_jobs_in_build = False
+        self.lock = threading.Lock()
+        self.__work_thread = threading.Thread(target=self.work)
+        self.__work_thread.start()
+
+    def handleJob(self, job):
+        parts = job.name.split(":")
+        cmd = parts[0]
+        name = parts[1]
+        if len(parts) > 2:
+            node = parts[2]
+        else:
+            node = None
+        if cmd == 'build':
+            self.handleBuild(job, name, node)
+        elif cmd == 'stop':
+            self.handleStop(job, name)
+        elif cmd == 'set_description':
+            self.handleSetDescription(job, name)
+
+    def handleBuild(self, job, name, node):
+        build = FakeBuild(self, job, self.build_counter, node)
+        job.build = build
+        self.gearman_jobs[job.unique] = job
+        self.build_counter += 1
+
+        self.running_builds.append(build)
+        build.start()
+
+    def handleStop(self, job, name):
+        self.log.debug("handle stop")
+        unique = job.arguments
+        for build in self.running_builds:
+            if build.unique == unique:
+                build.aborted = True
+                build.release()
+                job.sendWorkComplete()
+                return
+        job.sendWorkFail()
+
+    def handleSetDescription(self, job, name):
+        self.log.debug("handle set description")
+        parameters = json.loads(job.arguments)
+        unique = parameters['unique_id']
+        descr = parameters['html_description']
+        for build in self.running_builds:
+            if build.unique == unique:
+                build.description = descr
+                job.sendWorkComplete()
+                return
+        for build in self.build_history:
+            if build.uuid == unique:
+                build.description = descr
+                job.sendWorkComplete()
+                return
+        job.sendWorkFail()
+
+    def work(self):
+        while self.running:
+            try:
+                job = self.getJob()
+            except gear.InterruptedError:
+                continue
+            try:
+                self.handleJob(job)
+            except:
+                self.log.exception("Worker exception:")
+
+    def addFailTest(self, name, change):
+        l = self.fail_tests.get(name, [])
+        l.append(change)
+        self.fail_tests[name] = l
+
+    def shouldFailTest(self, name, ref):
+        l = self.fail_tests.get(name, [])
+        for change in l:
+            if ref_has_change(ref, change):
+                return True
+        return False
+
+    def release(self, regex=None):
+        builds = self.running_builds[:]
+        self.log.debug("releasing build %s (%s)" % (regex,
+                                                   len(self.running_builds)))
+        for build in builds:
+            if not regex or re.match(regex, build.name):
+                self.log.debug("releasing build %s" %
+                               (build.parameters['ZUUL_UUID']))
+                build.release()
+            else:
+                self.log.debug("not releasing build %s" %
+                               (build.parameters['ZUUL_UUID']))
+        self.log.debug("done releasing builds %s (%s)" %
+                       (regex, len(self.running_builds)))
+
+
+class FakeGearmanServer(gear.Server):
+    def __init__(self):
+        self.hold_jobs_in_queue = False
+        super(FakeGearmanServer, self).__init__(0)
+
+    def getJobForConnection(self, connection, peek=False):
+        for job in self.queue:
+            if not hasattr(job, 'waiting'):
+                if job.name.startswith('build:'):
+                    job.waiting = self.hold_jobs_in_queue
+                else:
+                    job.waiting = False
+            if job.waiting:
+                continue
+            if job.name in connection.functions:
+                if not peek:
+                    self.queue.remove(job)
+                return job
+        return None
+
+    def release(self, regex=None):
+        """Un-hold queued build jobs matching regex (all when omitted)."""
+        released = False
+        queue = self.queue[:]
+        self.log.debug("releasing queued job %s (%s)" % (regex,
+                                                         len(self.queue)))
+        for job in queue:
+            # Names are 'cmd:name' or 'cmd:name:node'; ignore any node part.
+            cmd, name = job.name.split(':')[:2]
+            if cmd != 'build':
+                continue
+            if not regex or re.match(regex, name):
+                self.log.debug("releasing queued job %s" % job.unique)
+                job.waiting = False
+                released = True
+            else:
+                self.log.debug("not releasing queued job %s" % job.unique)
+        if released:
+            self.wakeConnections()
+        self.log.debug("done releasing queued jobs %s (%s)" %
+                       (regex, len(self.queue)))
+
+
 class testScheduler(unittest.TestCase):
     log = logging.getLogger("zuul.test")
 
@@ -688,7 +701,7 @@
         init_repo("org/one-job-project")
         init_repo("org/nonvoting-project")
         init_repo("org/templated-project")
-        self.config = CONFIG
+        init_repo("org/node-project")
 
         self.statsd = FakeStatsd()
         os.environ['STATSD_HOST'] = 'localhost'
@@ -698,25 +711,27 @@
         reload(statsd)
         reload(zuul.scheduler)
 
+        self.gearman_server = FakeGearmanServer()
+
+        self.config = ConfigParser.ConfigParser()
+        cfg = StringIO()
+        CONFIG.write(cfg)
+        cfg.seek(0)
+        self.config.readfp(cfg)
+        self.config.set('gearman', 'port', str(self.gearman_server.port))
+
+        self.worker = FakeWorker('fake_worker')
+        self.worker.addServer('127.0.0.1', self.gearman_server.port)
+        self.gearman_server.worker = self.worker
+
         self.sched = zuul.scheduler.Scheduler()
 
-        def jenkinsFactory(*args, **kw):
-            self.fake_jenkins = FakeJenkins()
-            return self.fake_jenkins
-
-        def jenkinsCallbackFactory(*args, **kw):
-            self.fake_jenkins_callback = FakeJenkinsCallback(*args, **kw)
-            return self.fake_jenkins_callback
-
         def URLOpenerFactory(*args, **kw):
             args = [self.fake_gerrit] + list(args)
             return FakeURLOpener(*args, **kw)
 
-        zuul.launcher.jenkins.ExtendedJenkins = jenkinsFactory
-        zuul.launcher.jenkins.JenkinsCallback = jenkinsCallbackFactory
         urllib2.urlopen = URLOpenerFactory
-        self.jenkins = zuul.launcher.jenkins.Jenkins(self.config, self.sched)
-        self.fake_jenkins.callback = self.fake_jenkins_callback
+        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
 
         zuul.lib.gerrit.Gerrit = FakeGerrit
 
@@ -725,22 +740,131 @@
         self.gerrit.replication_retry_interval = 0.5
         self.fake_gerrit = self.gerrit.gerrit
 
-        self.sched.setLauncher(self.jenkins)
+        self.sched.setLauncher(self.launcher)
         self.sched.setTrigger(self.gerrit)
 
         self.sched.start()
         self.sched.reconfigure(self.config)
         self.sched.resume()
+        self.launcher.gearman.waitForServer()
+        self.registerJobs()
 
     def tearDown(self):
-        self.jenkins.stop()
+        self.launcher.stop()
+        self.worker.shutdown()
+        self.gearman_server.shutdown()
         self.gerrit.stop()
         self.sched.stop()
         self.sched.join()
         self.statsd.stop()
         self.statsd.join()
+        threads = threading.enumerate()
+        if len(threads) > 1:
+            self.log.error("More than one thread is running: %s" % threads)
         #shutil.rmtree(TEST_ROOT)
 
+    def registerJobs(self):
+        count = 0
+        for job in self.sched.jobs.keys():
+            self.worker.registerFunction('build:' + job)
+            count += 1
+        self.worker.registerFunction('stop:' + self.worker.worker_id)
+        count += 1
+
+        while len(self.gearman_server.functions) < count:
+            time.sleep(0)
+
+    def release(self, job):
+        if isinstance(job, FakeBuild):
+            job.release()
+        else:
+            job.waiting = False
+            self.log.debug("Queued job %s released" % job.unique)
+            self.gearman_server.wakeConnections()
+
+    def getParameter(self, job, name):
+        if isinstance(job, FakeBuild):
+            return job.parameters[name]
+        else:
+            parameters = json.loads(job.arguments)
+            return parameters[name]
+
+    def resetGearmanServer(self):
+        self.worker.setFunctions([])
+        while True:
+            done = True
+            for connection in self.gearman_server.active_connections:
+                if connection.functions:
+                    done = False
+            if done:
+                break
+            time.sleep(0)
+        self.gearman_server.functions = set()
+
+    def haveAllBuildsReported(self):
+        # See if Zuul is waiting on a meta job to complete
+        if self.launcher.meta_jobs:
+            return False
+        # Find out if every build that the worker has completed has been
+        # reported back to Zuul.  If it hasn't then that means a Gearman
+        # event is still in transit and the system is not stable.
+        for build in self.worker.build_history:
+            zbuild = self.launcher.builds.get(build.uuid)
+            if not zbuild:
+                # It has already been reported
+                continue
+            # It hasn't been reported yet.
+            return False
+        # Make sure that none of the worker connections are in GRAB_WAIT
+        for connection in self.worker.active_connections:
+            if connection.state == 'GRAB_WAIT':
+                return False
+        return True
+
+    def areAllBuildsWaiting(self):
+        ret = True
+
+        builds = self.launcher.builds.values()
+        for build in builds:
+            client_job = None
+            for conn in self.launcher.gearman.active_connections:
+                for j in conn.related_jobs.values():
+                    if j.unique == build.uuid:
+                        client_job = j
+                        break
+            if not client_job:
+                self.log.debug("%s is not known to the gearman client" %
+                               build)
+                ret = False
+                continue
+            if not client_job.handle:
+                self.log.debug("%s has no handle" % client_job)
+                ret = False
+                continue
+            server_job = self.gearman_server.jobs.get(client_job.handle)
+            if not server_job:
+                self.log.debug("%s is not known to the gearman server" %
+                               client_job)
+                ret = False
+                continue
+            if not hasattr(server_job, 'waiting'):
+                self.log.debug("%s is being enqueued" % server_job)
+                ret = False
+                continue
+            if server_job.waiting:
+                continue
+            worker_job = self.worker.gearman_jobs.get(server_job.unique)
+            if worker_job:
+                if worker_job.build.isWaiting():
+                    continue
+                else:
+                    self.log.debug("%s is running" % worker_job)
+                    ret = False
+            else:
+                self.log.debug("%s is unassigned" % server_job)
+                ret = False
+        return ret
+
     def waitUntilSettled(self):
         self.log.debug("Waiting until settled...")
         start = time.time()
@@ -750,23 +874,25 @@
                 print self.sched.trigger_event_queue.empty(),
                 print self.sched.result_event_queue.empty(),
                 print self.fake_gerrit.event_queue.empty(),
+                print self.areAllBuildsWaiting()
                 raise Exception("Timeout waiting for Zuul to settle")
-            # Make sure our fake jenkins doesn't end any jobs
-            # (and therefore, emit events) while we're checking
-            self.fake_jenkins.lock.acquire()
-            # Join ensures that the queue is empty _and_ events have been
-            # processed
-            self.fake_gerrit.event_queue.join()
-            self.sched.trigger_event_queue.join()
-            self.sched.result_event_queue.join()
-            if (self.sched.trigger_event_queue.empty() and
-                self.sched.result_event_queue.empty() and
-                self.fake_gerrit.event_queue.empty() and
-                self.fake_jenkins.fakeAllWaiting()):
-                self.fake_jenkins.lock.release()
-                self.log.debug("...settled.")
-                return
-            self.fake_jenkins.lock.release()
+            # Make sure no new events show up while we're checking
+            self.worker.lock.acquire()
+            # Have all build states propagated to Zuul?
+            if self.haveAllBuildsReported():
+                # Join ensures that the queue is empty _and_ events have been
+                # processed
+                self.fake_gerrit.event_queue.join()
+                self.sched.trigger_event_queue.join()
+                self.sched.result_event_queue.join()
+                if (self.sched.trigger_event_queue.empty() and
+                    self.sched.result_event_queue.empty() and
+                    self.fake_gerrit.event_queue.empty() and
+                    self.areAllBuildsWaiting()):
+                    self.worker.lock.release()
+                    self.log.debug("...settled.")
+                    return
+            self.worker.lock.release()
             self.sched.wake_event.wait(0.1)
 
     def countJobResults(self, jobs, result):
@@ -802,18 +928,20 @@
 
     def test_jobs_launched(self):
         "Test that jobs are launched and a change is merged"
+        builds = self.worker.running_builds
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.job_history
-        job_names = [x.name for x in jobs]
+        job_names = [x.name for x in history]
         assert 'project-merge' in job_names
         assert 'project-test1' in job_names
         assert 'project-test2' in job_names
-        assert jobs[0].result == 'SUCCESS'
-        assert jobs[1].result == 'SUCCESS'
-        assert jobs[2].result == 'SUCCESS'
+        assert history[0].result == 'SUCCESS'
+        assert history[1].result == 'SUCCESS'
+        assert history[2].result == 'SUCCESS'
         assert A.data['status'] == 'MERGED'
         assert A.reported == 2
         self.assertEmptyQueues()
@@ -830,7 +958,10 @@
 
     def test_parallel_changes(self):
         "Test that changes are tested in parallel and merged in series"
-        self.fake_jenkins.hold_jobs_in_build = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
+
+        self.worker.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@@ -843,62 +974,60 @@
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
 
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.all_jobs
-        assert len(jobs) == 1
-        assert jobs[0].name == 'project-merge'
-        assert job_has_changes(jobs[0], A)
+        assert len(builds) == 1
+        assert builds[0].name == 'project-merge'
+        assert job_has_changes(builds[0], A)
 
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        assert len(jobs) == 3
-        assert jobs[0].name == 'project-test1'
-        assert job_has_changes(jobs[0], A)
-        assert jobs[1].name == 'project-test2'
-        assert job_has_changes(jobs[1], A)
-        assert jobs[2].name == 'project-merge'
-        assert job_has_changes(jobs[2], A, B)
+        assert len(builds) == 3
+        assert builds[0].name == 'project-test1'
+        assert job_has_changes(builds[0], A)
+        assert builds[1].name == 'project-test2'
+        assert job_has_changes(builds[1], A)
+        assert builds[2].name == 'project-merge'
+        assert job_has_changes(builds[2], A, B)
 
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        assert len(jobs) == 5
-        assert jobs[0].name == 'project-test1'
-        assert job_has_changes(jobs[0], A)
-        assert jobs[1].name == 'project-test2'
-        assert job_has_changes(jobs[1], A)
+        assert len(builds) == 5
+        assert builds[0].name == 'project-test1'
+        assert job_has_changes(builds[0], A)
+        assert builds[1].name == 'project-test2'
+        assert job_has_changes(builds[1], A)
 
-        assert jobs[2].name == 'project-test1'
-        assert job_has_changes(jobs[2], A, B)
-        assert jobs[3].name == 'project-test2'
-        assert job_has_changes(jobs[3], A, B)
+        assert builds[2].name == 'project-test1'
+        assert job_has_changes(builds[2], A, B)
+        assert builds[3].name == 'project-test2'
+        assert job_has_changes(builds[3], A, B)
 
-        assert jobs[4].name == 'project-merge'
-        assert job_has_changes(jobs[4], A, B, C)
+        assert builds[4].name == 'project-merge'
+        assert job_has_changes(builds[4], A, B, C)
 
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        assert len(jobs) == 6
-        assert jobs[0].name == 'project-test1'
-        assert job_has_changes(jobs[0], A)
-        assert jobs[1].name == 'project-test2'
-        assert job_has_changes(jobs[1], A)
+        assert len(builds) == 6
+        assert builds[0].name == 'project-test1'
+        assert job_has_changes(builds[0], A)
+        assert builds[1].name == 'project-test2'
+        assert job_has_changes(builds[1], A)
 
-        assert jobs[2].name == 'project-test1'
-        assert job_has_changes(jobs[2], A, B)
-        assert jobs[3].name == 'project-test2'
-        assert job_has_changes(jobs[3], A, B)
+        assert builds[2].name == 'project-test1'
+        assert job_has_changes(builds[2], A, B)
+        assert builds[3].name == 'project-test2'
+        assert job_has_changes(builds[3], A, B)
 
-        assert jobs[4].name == 'project-test1'
-        assert job_has_changes(jobs[4], A, B, C)
-        assert jobs[5].name == 'project-test2'
-        assert job_has_changes(jobs[5], A, B, C)
+        assert builds[4].name == 'project-test1'
+        assert job_has_changes(builds[4], A, B, C)
+        assert builds[5].name == 'project-test2'
+        assert job_has_changes(builds[5], A, B, C)
 
-        self.fake_jenkins.hold_jobs_in_build = False
-        self.fake_jenkins.fakeRelease()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
         self.waitUntilSettled()
-        assert len(jobs) == 0
+        assert len(builds) == 0
 
-        jobs = self.fake_jenkins.job_history
-        assert len(jobs) == 9
+        assert len(history) == 9
         assert A.data['status'] == 'MERGED'
         assert B.data['status'] == 'MERGED'
         assert C.data['status'] == 'MERGED'
@@ -909,6 +1038,8 @@
 
     def test_failed_changes(self):
         "Test that a change behind a failed change is retested"
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         A.addApproval('CRVW', 2)
@@ -917,11 +1049,10 @@
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
 
-        self.fake_jenkins.fakeAddFailTest('project-test1', A)
+        self.worker.addFailTest('project-test1', A)
 
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.job_history
-        assert len(jobs) > 6
+        assert len(history) > 6
         assert A.data['status'] == 'NEW'
         assert B.data['status'] == 'MERGED'
         assert A.reported == 2
@@ -930,7 +1061,10 @@
 
     def test_independent_queues(self):
         "Test that changes end up in the right queues"
-        self.fake_jenkins.hold_jobs_in_build = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
+
+        self.worker.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
@@ -942,33 +1076,31 @@
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
 
-        jobs = self.fake_jenkins.all_jobs
         self.waitUntilSettled()
 
         # There should be one merge job at the head of each queue running
-        assert len(jobs) == 2
-        assert jobs[0].name == 'project-merge'
-        assert job_has_changes(jobs[0], A)
-        assert jobs[1].name == 'project1-merge'
-        assert job_has_changes(jobs[1], B)
+        assert len(builds) == 2
+        assert builds[0].name == 'project-merge'
+        assert job_has_changes(builds[0], A)
+        assert builds[1].name == 'project1-merge'
+        assert job_has_changes(builds[1], B)
 
-        # Release the current merge jobs
-        self.fake_jenkins.fakeRelease('.*-merge')
+        # Release the current merge builds
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
         # Release the merge job for project2 which is behind project1
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
 
-        # All the test jobs should be running:
+        # All the test builds should be running:
         # project1 (3) + project2 (3) + project (2) = 8
-        assert len(jobs) == 8
+        assert len(builds) == 8
 
-        self.fake_jenkins.fakeRelease()
+        self.worker.release()
         self.waitUntilSettled()
-        assert len(jobs) == 0
+        assert len(builds) == 0
 
-        jobs = self.fake_jenkins.job_history
-        assert len(jobs) == 11
+        assert len(history) == 11
         assert A.data['status'] == 'MERGED'
         assert B.data['status'] == 'MERGED'
         assert C.data['status'] == 'MERGED'
@@ -979,8 +1111,10 @@
 
     def test_failed_change_at_head(self):
         "Test that if a change at the head fails, jobs behind it are canceled"
-        self.fake_jenkins.hold_jobs_in_build = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
 
+        self.worker.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@@ -988,52 +1122,45 @@
         B.addApproval('CRVW', 2)
         C.addApproval('CRVW', 2)
 
-        self.fake_jenkins.fakeAddFailTest('project-test1', A)
+        self.worker.addFailTest('project-test1', A)
 
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
 
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
 
-        assert len(jobs) == 1
-        assert jobs[0].name == 'project-merge'
-        assert job_has_changes(jobs[0], A)
+        assert len(builds) == 1
+        assert builds[0].name == 'project-merge'
+        assert job_has_changes(builds[0], A)
 
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
 
-        assert len(jobs) == 6
-        assert jobs[0].name == 'project-test1'
-        assert jobs[1].name == 'project-test2'
-        assert jobs[2].name == 'project-test1'
-        assert jobs[3].name == 'project-test2'
-        assert jobs[4].name == 'project-test1'
-        assert jobs[5].name == 'project-test2'
+        assert len(builds) == 6
+        assert builds[0].name == 'project-test1'
+        assert builds[1].name == 'project-test2'
+        assert builds[2].name == 'project-test1'
+        assert builds[3].name == 'project-test2'
+        assert builds[4].name == 'project-test1'
+        assert builds[5].name == 'project-test2'
 
-        jobs[0].release()
+        self.release(builds[0])
         self.waitUntilSettled()
 
-        assert len(jobs) == 2  # project-test2, project-merge for B
-        assert self.countJobResults(finished_jobs, 'ABORTED') == 4
+        assert len(builds) == 2  # project-test2, project-merge for B
+        assert self.countJobResults(history, 'ABORTED') == 4
 
-        self.fake_jenkins.hold_jobs_in_build = False
-        self.fake_jenkins.fakeRelease()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
         self.waitUntilSettled()
 
-        for x in jobs:
-            print x
-        for x in finished_jobs:
-            print x
-
-        assert len(jobs) == 0
-        assert len(finished_jobs) == 15
+        assert len(builds) == 0
+        assert len(history) == 15
         assert A.data['status'] == 'NEW'
         assert B.data['status'] == 'MERGED'
         assert C.data['status'] == 'MERGED'
@@ -1044,8 +1171,11 @@
 
     def test_failed_change_at_head_with_queue(self):
         "Test that if a change at the head fails, queued jobs are canceled"
-        self.fake_jenkins.hold_jobs_in_queue = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
+        queue = self.gearman_server.queue
 
+        self.gearman_server.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@@ -1053,51 +1183,47 @@
         B.addApproval('CRVW', 2)
         C.addApproval('CRVW', 2)
 
-        self.fake_jenkins.fakeAddFailTest('project-test1', A)
+        self.worker.addFailTest('project-test1', A)
 
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
 
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
-        queue = self.fake_jenkins.queue
-
-        assert len(jobs) == 1
+        assert len(builds) == 0
         assert len(queue) == 1
-        assert jobs[0].name == 'project-merge'
-        assert job_has_changes(jobs[0], A)
+        assert queue[0].name == 'build:project-merge'
+        assert job_has_changes(queue[0], A)
 
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
 
-        assert len(jobs) == 6
+        assert len(builds) == 0
         assert len(queue) == 6
-        assert jobs[0].name == 'project-test1'
-        assert jobs[1].name == 'project-test2'
-        assert jobs[2].name == 'project-test1'
-        assert jobs[3].name == 'project-test2'
-        assert jobs[4].name == 'project-test1'
-        assert jobs[5].name == 'project-test2'
+        assert queue[0].name == 'build:project-test1'
+        assert queue[1].name == 'build:project-test2'
+        assert queue[2].name == 'build:project-test1'
+        assert queue[3].name == 'build:project-test2'
+        assert queue[4].name == 'build:project-test1'
+        assert queue[5].name == 'build:project-test2'
 
-        jobs[0].release()
+        self.release(queue[0])
         self.waitUntilSettled()
 
-        assert len(jobs) == 2  # project-test2, project-merge for B
-        assert len(queue) == 2
-        assert self.countJobResults(finished_jobs, 'ABORTED') == 0
+        assert len(builds) == 0
+        assert len(queue) == 2  # project-test2, project-merge for B
+        assert self.countJobResults(history, 'ABORTED') == 0
 
-        self.fake_jenkins.hold_jobs_in_queue = False
-        self.fake_jenkins.fakeRelease()
+        self.gearman_server.hold_jobs_in_queue = False
+        self.gearman_server.release()
         self.waitUntilSettled()
 
-        assert len(jobs) == 0
-        assert len(finished_jobs) == 11
+        assert len(builds) == 0
+        assert len(history) == 11
         assert A.data['status'] == 'NEW'
         assert B.data['status'] == 'MERGED'
         assert C.data['status'] == 'MERGED'
@@ -1170,7 +1296,9 @@
 
     def test_build_configuration(self):
         "Test that zuul merges the right commits for testing"
-        self.fake_jenkins.hold_jobs_in_queue = True
+        queue = self.gearman_server.queue
+
+        self.gearman_server.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@@ -1182,18 +1312,16 @@
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
 
-        ref = jobs[-1].parameters['ZUUL_REF']
-        self.fake_jenkins.hold_jobs_in_queue = False
-        self.fake_jenkins.fakeRelease()
+        ref = self.getParameter(queue[-1], 'ZUUL_REF')
+        self.gearman_server.hold_jobs_in_queue = False
+        self.gearman_server.release()
         self.waitUntilSettled()
 
         path = os.path.join(GIT_ROOT, "org/project")
@@ -1206,7 +1334,9 @@
 
     def test_build_configuration_conflict(self):
         "Test that merge conflicts are handled"
-        self.fake_jenkins.hold_jobs_in_queue = True
+        queue = self.gearman_server.queue
+
+        self.gearman_server.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addPatchset(['conflict'])
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
@@ -1220,17 +1352,15 @@
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        ref = jobs[-1].parameters['ZUUL_REF']
-        self.fake_jenkins.hold_jobs_in_queue = False
-        self.fake_jenkins.fakeRelease()
+        ref = self.getParameter(queue[-1], 'ZUUL_REF')
+        self.gearman_server.hold_jobs_in_queue = False
+        self.gearman_server.release()
         self.waitUntilSettled()
 
         assert A.data['status'] == 'MERGED'
@@ -1243,6 +1373,8 @@
 
     def test_post(self):
         "Test that post jobs run"
+        history = self.worker.build_history
+
         e = {
             "type": "ref-updated",
             "submitter": {
@@ -1258,15 +1391,16 @@
         self.fake_gerrit.addEvent(e)
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.job_history
-        job_names = [x.name for x in jobs]
-        assert len(jobs) == 1
+        job_names = [x.name for x in history]
+        assert len(history) == 1
         assert 'project-post' in job_names
         self.assertEmptyQueues()
 
     def test_build_configuration_branch(self):
         "Test that the right commits are on alternate branches"
-        self.fake_jenkins.hold_jobs_in_queue = True
+        queue = self.gearman_server.queue
+
+        self.gearman_server.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project', 'mp', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'mp', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'mp', 'C')
@@ -1278,17 +1414,15 @@
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        ref = jobs[-1].parameters['ZUUL_REF']
-        self.fake_jenkins.hold_jobs_in_queue = False
-        self.fake_jenkins.fakeRelease()
+        ref = self.getParameter(queue[-1], 'ZUUL_REF')
+        self.gearman_server.hold_jobs_in_queue = False
+        self.gearman_server.release()
         self.waitUntilSettled()
 
         path = os.path.join(GIT_ROOT, "org/project")
@@ -1312,7 +1446,9 @@
 
     def test_build_configuration_multi_branch(self):
         "Test that dependent changes on multiple branches are merged"
-        self.fake_jenkins.hold_jobs_in_queue = True
+        queue = self.gearman_server.queue
+
+        self.gearman_server.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'mp', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@@ -1324,18 +1462,16 @@
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        ref_mp = jobs[-1].parameters['ZUUL_REF']
-        self.fake_jenkins.fakeRelease('.*-merge')
+        ref_mp = self.getParameter(queue[-1], 'ZUUL_REF')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        ref_master = jobs[-1].parameters['ZUUL_REF']
-        self.fake_jenkins.hold_jobs_in_queue = False
-        self.fake_jenkins.fakeRelease()
+        ref_master = self.getParameter(queue[-1], 'ZUUL_REF')
+        self.gearman_server.hold_jobs_in_queue = False
+        self.gearman_server.release()
         self.waitUntilSettled()
 
         path = os.path.join(GIT_ROOT, "org/project")
@@ -1366,9 +1502,6 @@
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
-
         assert A.data['status'] == 'MERGED'
         assert A.reported == 2
         assert B.data['status'] == 'MERGED'
@@ -1377,20 +1510,23 @@
 
     def test_job_from_templates_launched(self):
         "Test whether a job generated via a template can be launched"
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange(
             'org/templated-project', 'master', 'A')
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.job_history
-        job_names = [x.name for x in jobs]
+        job_names = [x.name for x in history]
 
         assert 'project-test1' in job_names
         assert 'project-test2' in job_names
-        assert jobs[0].result == 'SUCCESS'
-        assert jobs[1].result == 'SUCCESS'
+        assert history[0].result == 'SUCCESS'
+        assert history[1].result == 'SUCCESS'
 
     def test_dependent_changes_dequeue(self):
         "Test that dependent patches are not needlessly tested"
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@@ -1407,7 +1545,7 @@
         B.setDependsOn(A, 1)
         A.setDependsOn(M1, 1)
 
-        self.fake_jenkins.fakeAddFailTest('project-merge', A)
+        self.worker.addFailTest('project-merge', A)
 
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
@@ -1415,29 +1553,23 @@
 
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
-
-        for x in jobs:
-            print x
-        for x in finished_jobs:
-            print x
-
         assert A.data['status'] == 'NEW'
         assert A.reported == 2
         assert B.data['status'] == 'NEW'
         assert B.reported == 2
         assert C.data['status'] == 'NEW'
         assert C.reported == 2
-        assert len(finished_jobs) == 1
+        assert len(history) == 1
         self.assertEmptyQueues()
 
     def test_head_is_dequeued_once(self):
         "Test that if a change at the head fails it is dequeued only once"
         # If it's dequeued more than once, we should see extra
         # aborted jobs.
-        self.fake_jenkins.hold_jobs_in_build = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
 
+        self.worker.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C')
@@ -1445,52 +1577,50 @@
         B.addApproval('CRVW', 2)
         C.addApproval('CRVW', 2)
 
-        self.fake_jenkins.fakeAddFailTest('project1-test1', A)
-        self.fake_jenkins.fakeAddFailTest('project1-test2', A)
-        self.fake_jenkins.fakeAddFailTest('project1-project2-integration', A)
+        self.worker.addFailTest('project1-test1', A)
+        self.worker.addFailTest('project1-test2', A)
+        self.worker.addFailTest('project1-project2-integration', A)
 
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
 
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
 
-        assert len(jobs) == 1
-        assert jobs[0].name == 'project1-merge'
-        assert job_has_changes(jobs[0], A)
+        assert len(builds) == 1
+        assert builds[0].name == 'project1-merge'
+        assert job_has_changes(builds[0], A)
 
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
 
-        assert len(jobs) == 9
-        assert jobs[0].name == 'project1-test1'
-        assert jobs[1].name == 'project1-test2'
-        assert jobs[2].name == 'project1-project2-integration'
-        assert jobs[3].name == 'project1-test1'
-        assert jobs[4].name == 'project1-test2'
-        assert jobs[5].name == 'project1-project2-integration'
-        assert jobs[6].name == 'project1-test1'
-        assert jobs[7].name == 'project1-test2'
-        assert jobs[8].name == 'project1-project2-integration'
+        assert len(builds) == 9
+        assert builds[0].name == 'project1-test1'
+        assert builds[1].name == 'project1-test2'
+        assert builds[2].name == 'project1-project2-integration'
+        assert builds[3].name == 'project1-test1'
+        assert builds[4].name == 'project1-test2'
+        assert builds[5].name == 'project1-project2-integration'
+        assert builds[6].name == 'project1-test1'
+        assert builds[7].name == 'project1-test2'
+        assert builds[8].name == 'project1-project2-integration'
 
-        jobs[0].release()
+        self.release(builds[0])
         self.waitUntilSettled()
 
-        assert len(jobs) == 3  # test2, integration, merge for B
-        assert self.countJobResults(finished_jobs, 'ABORTED') == 6
+        assert len(builds) == 3  # test2, integration, merge for B
+        assert self.countJobResults(history, 'ABORTED') == 6
 
-        self.fake_jenkins.hold_jobs_in_build = False
-        self.fake_jenkins.fakeRelease()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
         self.waitUntilSettled()
 
-        assert len(jobs) == 0
-        assert len(finished_jobs) == 20
+        assert len(builds) == 0
+        assert len(history) == 20
 
         assert A.data['status'] == 'NEW'
         assert B.data['status'] == 'MERGED'
@@ -1502,62 +1632,65 @@
 
     def test_nonvoting_job(self):
         "Test that non-voting jobs don't vote."
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange('org/nonvoting-project',
                                            'master', 'A')
         A.addApproval('CRVW', 2)
-        self.fake_jenkins.fakeAddFailTest('nonvoting-project-test2', A)
+        self.worker.addFailTest('nonvoting-project-test2', A)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
 
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
 
         assert A.data['status'] == 'MERGED'
         assert A.reported == 2
-        assert finished_jobs[0].result == 'SUCCESS'
-        assert finished_jobs[1].result == 'SUCCESS'
-        assert finished_jobs[2].result == 'FAILURE'
+        assert history[0].result == 'SUCCESS'
+        assert history[1].result == 'SUCCESS'
+        assert history[2].result == 'FAILURE'
         self.assertEmptyQueues()
 
     def test_check_queue_success(self):
         "Test successful check queue jobs."
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
 
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
 
         assert A.data['status'] == 'NEW'
         assert A.reported == 1
-        assert finished_jobs[0].result == 'SUCCESS'
-        assert finished_jobs[1].result == 'SUCCESS'
-        assert finished_jobs[2].result == 'SUCCESS'
+        assert history[0].result == 'SUCCESS'
+        assert history[1].result == 'SUCCESS'
+        assert history[2].result == 'SUCCESS'
         self.assertEmptyQueues()
 
     def test_check_queue_failure(self):
         "Test failed check queue jobs."
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
-        self.fake_jenkins.fakeAddFailTest('project-test2', A)
+        self.worker.addFailTest('project-test2', A)
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
 
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
 
         assert A.data['status'] == 'NEW'
         assert A.reported == 1
-        assert finished_jobs[0].result == 'SUCCESS'
-        assert finished_jobs[1].result == 'SUCCESS'
-        assert finished_jobs[2].result == 'FAILURE'
+        assert history[0].result == 'SUCCESS'
+        assert history[1].result == 'SUCCESS'
+        assert history[2].result == 'FAILURE'
         self.assertEmptyQueues()
 
     def test_dependent_behind_dequeue(self):
         "test that dependent changes behind dequeued changes work"
         # This complicated test is a reproduction of a real life bug
         self.sched.reconfigure(self.config)
-        self.fake_jenkins.hold_jobs_in_build = True
 
+        builds = self.worker.running_builds
+        history = self.worker.build_history
+
+        self.worker.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
@@ -1574,8 +1710,6 @@
         F.addApproval('CRVW', 2)
 
         A.fail_merge = True
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
 
         # Change object re-use in the gerrit trigger is hidden if
         # changes are added in quick succession; waiting makes it more
@@ -1585,9 +1719,9 @@
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
 
         self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
@@ -1599,38 +1733,30 @@
         self.fake_gerrit.addEvent(F.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
 
-        for x in jobs:
-            print x
         # all jobs running
 
         # Grab pointers to the jobs we want to release before
         # releasing any, because list indexes may change as
         # the jobs complete.
-        a, b, c = jobs[:3]
+        a, b, c = builds[:3]
         a.release()
         b.release()
         c.release()
         self.waitUntilSettled()
 
-        self.fake_jenkins.hold_jobs_in_build = False
-        self.fake_jenkins.fakeRelease()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
         self.waitUntilSettled()
 
-        for x in jobs:
-            print x
-        for x in finished_jobs:
-            print x
-        print self.sched.formatStatusHTML()
-
         assert A.data['status'] == 'NEW'
         assert B.data['status'] == 'MERGED'
         assert C.data['status'] == 'MERGED'
@@ -1645,24 +1771,26 @@
         assert E.reported == 2
         assert F.reported == 2
 
-        assert self.countJobResults(finished_jobs, 'ABORTED') == 15
-        assert len(finished_jobs) == 44
+        assert self.countJobResults(history, 'ABORTED') == 15
+        assert len(history) == 44
         self.assertEmptyQueues()
 
     def test_merger_repack(self):
         "Test that the merger works after a repack"
+        builds = self.worker.running_builds
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.job_history
-        job_names = [x.name for x in jobs]
+        job_names = [x.name for x in history]
         assert 'project-merge' in job_names
         assert 'project-test1' in job_names
         assert 'project-test2' in job_names
-        assert jobs[0].result == 'SUCCESS'
-        assert jobs[1].result == 'SUCCESS'
-        assert jobs[2].result == 'SUCCESS'
+        assert history[0].result == 'SUCCESS'
+        assert history[1].result == 'SUCCESS'
+        assert history[2].result == 'SUCCESS'
         assert A.data['status'] == 'MERGED'
         assert A.reported == 2
         self.assertEmptyQueues()
@@ -1674,14 +1802,13 @@
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.job_history
-        job_names = [x.name for x in jobs]
+        job_names = [x.name for x in history]
         assert 'project-merge' in job_names
         assert 'project-test1' in job_names
         assert 'project-test2' in job_names
-        assert jobs[0].result == 'SUCCESS'
-        assert jobs[1].result == 'SUCCESS'
-        assert jobs[2].result == 'SUCCESS'
+        assert history[0].result == 'SUCCESS'
+        assert history[1].result == 'SUCCESS'
+        assert history[2].result == 'SUCCESS'
         assert A.data['status'] == 'MERGED'
         assert A.reported == 2
         self.assertEmptyQueues()
@@ -1689,6 +1816,9 @@
     def test_merger_repack_large_change(self):
         "Test that the merger works with large changes after a repack"
         # https://bugs.launchpad.net/zuul/+bug/1078946
+        builds = self.worker.running_builds
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         A.addPatchset(large=True)
         path = os.path.join(UPSTREAM_ROOT, "org/project1")
@@ -1699,22 +1829,25 @@
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.job_history
-        job_names = [x.name for x in jobs]
+        job_names = [x.name for x in history]
         assert 'project1-merge' in job_names
         assert 'project1-test1' in job_names
         assert 'project1-test2' in job_names
-        assert jobs[0].result == 'SUCCESS'
-        assert jobs[1].result == 'SUCCESS'
-        assert jobs[2].result == 'SUCCESS'
+        assert history[0].result == 'SUCCESS'
+        assert history[1].result == 'SUCCESS'
+        assert history[2].result == 'SUCCESS'
         assert A.data['status'] == 'MERGED'
         assert A.reported == 2
         self.assertEmptyQueues()
 
     def test_nonexistent_job(self):
         "Test launching a job that doesn't exist"
-        self.fake_jenkins.nonexistent_jobs.append('project-merge')
-        self.jenkins.launch_retry_timeout = 0.1
+        builds = self.worker.running_builds
+        history = self.worker.build_history
+
+        # Set to the state immediately after a restart
+        self.resetGearmanServer()
+        self.launcher.negative_function_cache_ttl = 0
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('CRVW', 2)
@@ -1722,35 +1855,33 @@
         # There may be a thread about to report a lost change
         while A.reported < 2:
             self.waitUntilSettled()
-        jobs = self.fake_jenkins.job_history
-        job_names = [x.name for x in jobs]
+        job_names = [x.name for x in history]
         assert not job_names
         assert A.data['status'] == 'NEW'
         assert A.reported == 2
         self.assertEmptyQueues()
 
         # Make sure things still work:
-        self.fake_jenkins.nonexistent_jobs = []
+        self.registerJobs()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.waitUntilSettled()
-        jobs = self.fake_jenkins.job_history
-        job_names = [x.name for x in jobs]
+        job_names = [x.name for x in history]
         assert 'project-merge' in job_names
         assert 'project-test1' in job_names
         assert 'project-test2' in job_names
-        assert jobs[0].result == 'SUCCESS'
-        assert jobs[1].result == 'SUCCESS'
-        assert jobs[2].result == 'SUCCESS'
+        assert history[0].result == 'SUCCESS'
+        assert history[1].result == 'SUCCESS'
+        assert history[2].result == 'SUCCESS'
         assert A.data['status'] == 'MERGED'
         assert A.reported == 2
         self.assertEmptyQueues()
 
     def test_single_nonexistent_post_job(self):
         "Test launching a single post job that doesn't exist"
-        self.fake_jenkins.nonexistent_jobs.append('project-post')
-        self.jenkins.launch_retry_timeout = 0.1
+        builds = self.worker.running_builds
+        history = self.worker.build_history
 
         e = {
             "type": "ref-updated",
@@ -1764,18 +1895,23 @@
                 "project": "org/project",
             }
         }
+        # Set to the state immediately after a restart
+        self.resetGearmanServer()
+        self.launcher.negative_function_cache_ttl = 0
+
         self.fake_gerrit.addEvent(e)
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.job_history
-        assert len(jobs) == 0
+        assert len(history) == 0
         self.assertEmptyQueues()
 
     def test_new_patchset_dequeues_old(self):
         "Test that a new patchset causes the old to be dequeued"
         # D -> C (depends on B) -> B (depends on A) -> A -> M
-        self.fake_jenkins.hold_jobs_in_build = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
 
+        self.worker.hold_jobs_in_build = True
         M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M')
         M.setMerged()
 
@@ -1802,18 +1938,10 @@
         self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
         self.waitUntilSettled()
 
-        self.fake_jenkins.hold_jobs_in_build = False
-        self.fake_jenkins.fakeRelease()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
-
-        for x in jobs:
-            print x
-        for x in finished_jobs:
-            print x
-
         assert A.data['status'] == 'MERGED'
         assert A.reported == 2
         assert B.data['status'] == 'NEW'
@@ -1822,14 +1950,16 @@
         assert C.reported == 2
         assert D.data['status'] == 'MERGED'
         assert D.reported == 2
-        assert len(finished_jobs) == 9  # 3 each for A, B, D.
+        assert len(history) == 9  # 3 each for A, B, D.
         self.assertEmptyQueues()
 
     def test_new_patchset_dequeues_old_on_head(self):
         "Test that a new patchset causes the old to be dequeued (at head)"
         # D -> C (depends on B) -> B (depends on A) -> A -> M
-        self.fake_jenkins.hold_jobs_in_build = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
 
+        self.worker.hold_jobs_in_build = True
         M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M')
         M.setMerged()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -1855,18 +1985,10 @@
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
         self.waitUntilSettled()
 
-        self.fake_jenkins.hold_jobs_in_build = False
-        self.fake_jenkins.fakeRelease()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
-
-        for x in jobs:
-            print x
-        for x in finished_jobs:
-            print x
-
         assert A.data['status'] == 'NEW'
         assert A.reported == 2
         assert B.data['status'] == 'NEW'
@@ -1875,13 +1997,15 @@
         assert C.reported == 2
         assert D.data['status'] == 'MERGED'
         assert D.reported == 2
-        assert len(finished_jobs) == 7
+        assert len(history) == 7
         self.assertEmptyQueues()
 
     def test_new_patchset_dequeues_old_without_dependents(self):
         "Test that a new patchset causes only the old to be dequeued"
-        self.fake_jenkins.hold_jobs_in_build = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
 
+        self.worker.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@@ -1898,31 +2022,25 @@
         self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
         self.waitUntilSettled()
 
-        self.fake_jenkins.hold_jobs_in_build = False
-        self.fake_jenkins.fakeRelease()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
-
-        for x in jobs:
-            print x
-        for x in finished_jobs:
-            print x
-
         assert A.data['status'] == 'MERGED'
         assert A.reported == 2
         assert B.data['status'] == 'NEW'
         assert B.reported == 2
         assert C.data['status'] == 'MERGED'
         assert C.reported == 2
-        assert len(finished_jobs) == 9
+        assert len(history) == 9
         self.assertEmptyQueues()
 
     def test_new_patchset_dequeues_old_independent_queue(self):
         "Test that a new patchset causes the old to be dequeued (independent)"
-        self.fake_jenkins.hold_jobs_in_build = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
 
+        self.worker.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@@ -1935,32 +2053,26 @@
         self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
         self.waitUntilSettled()
 
-        self.fake_jenkins.hold_jobs_in_build = False
-        self.fake_jenkins.fakeRelease()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
-
-        for x in jobs:
-            print x
-        for x in finished_jobs:
-            print x
-
         assert A.data['status'] == 'NEW'
         assert A.reported == 1
         assert B.data['status'] == 'NEW'
         assert B.reported == 1
         assert C.data['status'] == 'NEW'
         assert C.reported == 1
-        assert len(finished_jobs) == 10
-        assert self.countJobResults(finished_jobs, 'ABORTED') == 1
+        assert len(history) == 10
+        assert self.countJobResults(history, 'ABORTED') == 1
         self.assertEmptyQueues()
 
     def test_zuul_refs(self):
         "Test that zuul refs exist and have the right changes"
-        self.fake_jenkins.hold_jobs_in_build = True
+        builds = self.worker.running_builds
+        history = self.worker.build_history
 
+        self.worker.hold_jobs_in_build = True
         M1 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'M1')
         M1.setMerged()
         M2 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'M2')
@@ -1980,20 +2092,17 @@
         self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
 
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
-        self.fake_jenkins.fakeRelease('.*-merge')
+        self.worker.release('.*-merge')
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
-
         a_zref = b_zref = c_zref = d_zref = None
-        for x in jobs:
+        for x in builds:
             if x.parameters['ZUUL_CHANGE'] == '3':
                 a_zref = x.parameters['ZUUL_REF']
             if x.parameters['ZUUL_CHANGE'] == '4':
@@ -2035,8 +2144,8 @@
         assert ref_has_change(d_zref, C)
         assert ref_has_change(d_zref, D)
 
-        self.fake_jenkins.hold_jobs_in_build = False
-        self.fake_jenkins.fakeRelease()
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
         self.waitUntilSettled()
 
         assert A.data['status'] == 'MERGED'
@@ -2062,6 +2171,9 @@
 
     def test_file_jobs(self):
         "Test that file jobs run only when appropriate"
+        builds = self.worker.running_builds
+        history = self.worker.build_history
+
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addPatchset(['pip-requires'])
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
@@ -2071,10 +2183,7 @@
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        jobs = self.fake_jenkins.all_jobs
-        finished_jobs = self.fake_jenkins.job_history
-
-        testfile_jobs = [x for x in finished_jobs
+        testfile_jobs = [x for x in history
                          if x.name == 'project-testfile']
 
         assert len(testfile_jobs) == 1
@@ -2089,3 +2198,39 @@
         "Test that we can test the config"
         sched = zuul.scheduler.Scheduler()
         sched.testConfig(CONFIG.get('zuul', 'layout_config'))
+
+    def test_build_description(self):
+        "Test that build descriptions update"
+        builds = self.worker.running_builds  # unused in this test
+        history = self.worker.build_history  # indexed below, after settling
+
+        self.worker.registerFunction('set_description:' +
+                                     self.worker.worker_id)  # per-worker name
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        desc = history[0].description  # first entry in build_history
+        self.log.debug("Description: %s" % desc)
+        assert re.search("Branch.*master", desc)
+        assert re.search("Pipeline.*gate", desc)
+        assert re.search("project-merge.*SUCCESS", desc)
+        assert re.search("project-test1.*SUCCESS", desc)
+        assert re.search("project-test2.*SUCCESS", desc)
+        assert re.search("Reported result.*SUCCESS", desc)
+
+    def test_node_label(self):
+        "Test that a job runs on a specific node label"
+        builds = self.worker.running_builds  # unused in this test
+        history = self.worker.build_history  # indexed below, after settling
+
+        self.worker.registerFunction('build:node-project-test1:debian')
+
+        A = self.fake_gerrit.addFakeChange('org/node-project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        assert history[0].node is None  # presumably node-project-merge
+        assert history[1].node == 'debian'  # presumably node-project-test1
+        assert history[2].node is None  # presumably node-project-test2