Add FakeNodepool test fixture

Add a fake nodepool that immediately successfully fulfills all
requests, but actually uses the Nodepool ZooKeeper API.

Update the Zuul Nodepool facade to use the Nodepool ZooKeeper API.

Change-Id: If7859f0c6531439c3be38cc6ca6b699b3b5eade2
diff --git a/tests/base.py b/tests/base.py
index 9845484..b84bcf9 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -41,6 +41,7 @@
 import gear
 import fixtures
 import kazoo.client
+import kazoo.exceptions
 import statsd
 import testtools
 from git.exc import NoSuchPathError
@@ -64,6 +65,7 @@
 import zuul.trigger.gerrit
 import zuul.trigger.timer
 import zuul.trigger.zuultrigger
+import zuul.zk
 
 FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                            'fixtures')
@@ -871,6 +873,63 @@
         return endpoint, ''
 
 
+class FakeNodepool(object):
+    REQUEST_ROOT = '/nodepool/requests'
+
+    log = logging.getLogger("zuul.test.FakeNodepool")
+
+    def __init__(self, host, port, chroot):
+        self.client = kazoo.client.KazooClient(
+            hosts='%s:%s%s' % (host, port, chroot))
+        self.client.start()
+        self._running = True
+        self.thread = threading.Thread(target=self.run)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def stop(self):
+        self._running = False
+        self.thread.join()
+        self.client.stop()
+        self.client.close()
+
+    def run(self):
+        while self._running:
+            self._run()
+            time.sleep(0.1)
+
+    def _run(self):
+        for req in self.getNodeRequests():
+            self.fulfillRequest(req)
+
+    def getNodeRequests(self):
+        try:
+            reqids = self.client.get_children(self.REQUEST_ROOT)
+        except kazoo.exceptions.NoNodeError:
+            return []
+        reqs = []
+        for oid in sorted(reqids):
+            path = self.REQUEST_ROOT + '/' + oid
+            data, stat = self.client.get(path)
+            data = json.loads(data)
+            data['_oid'] = oid
+            reqs.append(data)
+        return reqs
+
+    def fulfillRequest(self, request):
+        if request['state'] == 'fulfilled':
+            return
+        request = request.copy()
+        request['state'] = 'fulfilled'
+        request['state_time'] = time.time()
+        oid = request['_oid']
+        del request['_oid']
+        path = self.REQUEST_ROOT + '/' + oid
+        data = json.dumps(request)
+        self.log.debug("Fulfilling node request: %s %s" % (oid, data))
+        self.client.set(path, data)
+
+
 class ChrootedKazooFixture(fixtures.Fixture):
     def __init__(self):
         super(ChrootedKazooFixture, self).__init__()
@@ -962,27 +1021,29 @@
                 format='%(asctime)s %(name)-32s '
                 '%(levelname)-8s %(message)s'))
 
-            # NOTE(notmorgan): Extract logging overrides for specific libraries
-            # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
-            # each. This is used to limit the output during test runs from
-            # libraries that zuul depends on such as gear.
-            log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+        # NOTE(notmorgan): Extract logging overrides for specific libraries
+        # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+        # each. This is used to limit the output during test runs from
+        # libraries that zuul depends on such as gear.
+        log_defaults_from_env = os.environ.get(
+            'OS_LOG_DEFAULTS',
+            'git.cmd=INFO,kazoo.client=INFO')
 
-            if log_defaults_from_env:
-                for default in log_defaults_from_env.split(','):
-                    try:
-                        name, level_str = default.split('=', 1)
-                        level = getattr(logging, level_str, logging.DEBUG)
-                        self.useFixture(fixtures.FakeLogger(
-                            name=name,
-                            level=level,
-                            format='%(asctime)s %(name)-32s '
-                                   '%(levelname)-8s %(message)s'))
-                    except ValueError:
-                        # NOTE(notmorgan): Invalid format of the log default,
-                        # skip and don't try and apply a logger for the
-                        # specified module
-                        pass
+        if log_defaults_from_env:
+            for default in log_defaults_from_env.split(','):
+                try:
+                    name, level_str = default.split('=', 1)
+                    level = getattr(logging, level_str, logging.DEBUG)
+                    self.useFixture(fixtures.FakeLogger(
+                        name=name,
+                        level=level,
+                        format='%(asctime)s %(name)-32s '
+                               '%(levelname)-8s %(message)s'))
+                except ValueError:
+                    # NOTE(notmorgan): Invalid format of the log default,
+                    # skip and don't try and apply a logger for the
+                    # specified module
+                    pass
 
 
 class ZuulTestCase(BaseTestCase):
@@ -1140,10 +1201,17 @@
         self.merge_client = zuul.merger.client.MergeClient(
             self.config, self.sched)
         self.nodepool = zuul.nodepool.Nodepool(self.sched)
+        self.zk = zuul.zk.ZooKeeper()
+        self.zk.connect([self.zk_config])
+
+        self.fake_nodepool = FakeNodepool(self.zk_config.host,
+                                          self.zk_config.port,
+                                          self.zk_config.chroot)
 
         self.sched.setLauncher(self.launch_client)
         self.sched.setMerger(self.merge_client)
         self.sched.setNodepool(self.nodepool)
+        self.sched.setZooKeeper(self.zk)
 
         self.webapp = zuul.webapp.WebApp(
             self.sched, port=0, listen_address='127.0.0.1')
@@ -1244,9 +1312,10 @@
 
     def setupZK(self):
         self.zk_chroot_fixture = self.useFixture(ChrootedKazooFixture())
-        self.zookeeper_host = self.zk_chroot_fixture.zookeeper_host
-        self.zookeeper_port = self.zk_chroot_fixture.zookeeper_port
-        self.zookeeper_chroot = self.zk_chroot_fixture.zookeeper_chroot
+        self.zk_config = zuul.zk.ZooKeeperConnectionConfig(
+            self.zk_chroot_fixture.zookeeper_host,
+            self.zk_chroot_fixture.zookeeper_port,
+            self.zk_chroot_fixture.zookeeper_chroot)
 
     def copyDirToRepo(self, project, source_path):
         self.init_repo(project)
@@ -1295,6 +1364,8 @@
         self.rpc.stop()
         self.rpc.join()
         self.gearman_server.shutdown()
+        self.fake_nodepool.stop()
+        self.zk.disconnect()
         threads = threading.enumerate()
         if len(threads) > 1:
             self.log.error("More than one thread is running: %s" % threads)
@@ -1428,6 +1499,11 @@
                 return False
         return True
 
+    def areAllNodeRequestsComplete(self):
+        if self.sched.nodepool.requests:
+            return False
+        return True
+
     def eventQueuesEmpty(self):
         for queue in self.event_queues:
             yield queue.empty()
@@ -1461,7 +1537,8 @@
                 if (not self.merge_client.jobs and
                     all(self.eventQueuesEmpty()) and
                     self.haveAllBuildsReported() and
-                    self.areAllBuildsWaiting()):
+                    self.areAllBuildsWaiting() and
+                    self.areAllNodeRequestsComplete()):
                     self.sched.run_handler_lock.release()
                     self.launch_server.lock.release()
                     self.log.debug("...settled.")