input: DBus semaphore listener

Adds an input source responsible for listening to changes that happens
on specified dbus object which provides a semaphore-status health.
We will use this for monitoring cla-sysrepod health status.

Change-Id: Ib1b82efa181d445ad779f2b181146c9d5ab7722a
Depends-on: https://cesnet-gerrit-public/c/CzechLight/dependencies/+/2778
Depends-on: https://gerrit.cesnet.cz/c/CzechLight/dependencies/+/2778
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 74f2ca2..3a3a812 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -23,6 +23,7 @@
 
 find_package(Boost REQUIRED)
 find_package(spdlog REQUIRED)
+find_package(sdbus-c++ REQUIRED)
 
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/src/)
 
@@ -35,11 +36,12 @@
 
 add_library(velia-state-manager STATIC
     src/inputs/AbstractInput.cpp
+    src/inputs/DbusSemaphoreInput.cpp
     src/manager/AbstractManager.cpp
     src/manager/StateManager.cpp
     src/State.cpp
     )
-target_link_libraries(velia-state-manager PUBLIC velia-utils Boost::boost)
+target_link_libraries(velia-state-manager PUBLIC velia-utils Boost::boost SDBusCpp::sdbus-c++)
 
 add_executable(veliad
     src/main.cpp
@@ -57,11 +59,17 @@
     add_library(DoctestIntegration STATIC
             tests/doctest_integration.cpp
             tests/trompeloeil_doctest.h
+            tests/wait-a-bit-longer.cpp
             )
     target_include_directories(DoctestIntegration PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tests/ ${CMAKE_CURRENT_SOURCE_DIR}/src/)
     target_link_libraries(DoctestIntegration doctest::doctest trompeloeil spdlog::spdlog)
     target_compile_definitions(DoctestIntegration PUBLIC DOCTEST_CONFIG_SUPER_FAST_ASSERTS)
 
+    add_library(DbusTesting STATIC
+        tests/dbus-helpers/dbus_semaphore_server.cpp
+    )
+    target_link_libraries(DbusTesting PUBLIC SDBusCpp::sdbus-c++)
+
     function(velia_test name)
         add_executable(test_${name} tests/${name}.cpp)
         target_link_libraries(test_${name} DoctestIntegration ${ARGN})
@@ -73,6 +81,7 @@
 
     endfunction()
     velia_test(state-manager velia-state-manager)
+    velia_test(input-semaphore velia-state-manager DbusTesting)
 endif()
 
 if(WITH_DOCS)
diff --git a/src/inputs/DbusSemaphoreInput.cpp b/src/inputs/DbusSemaphoreInput.cpp
new file mode 100644
index 0000000..efceb74
--- /dev/null
+++ b/src/inputs/DbusSemaphoreInput.cpp
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2020 CESNET, https://photonics.cesnet.cz/
+ *
+ * Written by Tomáš Pecka <tomas.pecka@fit.cvut.cz>
+ *
+*/
+#include "DbusSemaphoreInput.h"
+#include "utils/log.h"
+
+namespace {
+
+velia::State stateFromString(const std::string& str)
+{
+    if (str == "WARNING")
+        return velia::State::WARNING;
+    if (str == "ERROR")
+        return velia::State::ERROR;
+    if (str == "OK")
+        return velia::State::OK;
+
+    throw std::invalid_argument("DbusSemaphoreInput received invalid state");
+}
+}
+
+namespace velia {
+
+DbusSemaphoreInput::DbusSemaphoreInput(std::shared_ptr<AbstractManager> manager, sdbus::IConnection& connection, const std::string& bus, const std::string& objectPath, const std::string& propertyName, const std::string& propertyInterface)
+    : AbstractInput(std::move(manager))
+    , m_dbusObjectProxy(sdbus::createProxy(connection, bus, objectPath))
+    , m_propertyName(propertyName)
+    , m_propertyInterface(propertyInterface)
+    , m_log(spdlog::get("input"))
+{
+    m_dbusObjectProxy->uponSignal("PropertiesChanged").onInterface("org.freedesktop.DBus.Properties").call([&](const std::string& iface, const std::map<std::string, sdbus::Variant>& changed, [[maybe_unused]] const std::vector<std::string>& invalidated) {
+        if (iface != m_propertyInterface) {
+            return;
+        }
+
+        if (auto it = changed.find(m_propertyName); it != changed.end()) {
+            std::string newState = it->second.get<std::string>();
+            m_log->trace("Property changed to {}", newState);
+            updateState(stateFromString(newState));
+        }
+    });
+    m_dbusObjectProxy->finishRegistration();
+    m_log->trace("Watching for property changes of {}, object {}, property {}.{}", bus, objectPath, propertyInterface, propertyName);
+
+    // we might update the state twice here (once from the callback, once from here).
+    // But better than querying the current state before the registration; we might miss a state change that could happen between querying and callback registration
+    std::string currentState = m_dbusObjectProxy->getProperty(m_propertyName).onInterface(propertyInterface).get<std::string>();
+    m_log->trace("Property initialized to {}", currentState);
+    updateState(stateFromString(currentState));
+}
+
+DbusSemaphoreInput::~DbusSemaphoreInput() = default;
+
+}
\ No newline at end of file
diff --git a/src/inputs/DbusSemaphoreInput.h b/src/inputs/DbusSemaphoreInput.h
new file mode 100644
index 0000000..97f1de8
--- /dev/null
+++ b/src/inputs/DbusSemaphoreInput.h
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2020 CESNET, https://photonics.cesnet.cz/
+ *
+ * Written by Tomáš Pecka <tomas.pecka@fit.cvut.cz>
+ *
+*/
+#pragma once
+
+#include <memory>
+#include <sdbus-c++/sdbus-c++.h>
+#include "inputs/AbstractInput.h"
+#include "manager/StateManager.h"
+
+namespace velia {
+
+class DbusSemaphoreInput : public AbstractInput {
+public:
+    DbusSemaphoreInput(std::shared_ptr<AbstractManager> mx, sdbus::IConnection& connection, const std::string& bus, const std::string& objectPath, const std::string& propertyName, const std::string& propertyInterface);
+    ~DbusSemaphoreInput() override;
+
+private:
+    std::shared_ptr<sdbus::IProxy> m_dbusObjectProxy;
+    std::string m_propertyName;
+    std::string m_propertyInterface;
+    velia::Log m_log;
+};
+}
\ No newline at end of file
diff --git a/tests/dbus-helpers/dbus_semaphore_server.cpp b/tests/dbus-helpers/dbus_semaphore_server.cpp
new file mode 100644
index 0000000..35eb45f
--- /dev/null
+++ b/tests/dbus-helpers/dbus_semaphore_server.cpp
@@ -0,0 +1,38 @@
+#include <sdbus-c++/sdbus-c++.h>
+#include <thread>
+#include "dbus_semaphore_server.h"
+#include "utils/log-init.h"
+
+/* Ask for current value:
+ * dbus-send --print-reply --system --dest=<bus> /cz/cesnet/led org.freedesktop.DBus.Properties.Get string:cz.cesnet.Led string:semaphore
+ */
+
+DbusSemaphoreServer::DbusSemaphoreServer(sdbus::IConnection& connection, const std::string& objectPath, const std::string& propertyName, const std::string& propertyInterface, const std::string& state)
+    : m_object(sdbus::createObject(connection, objectPath))
+    , m_propertyName(propertyName)
+    , m_propertyInterface(propertyInterface)
+    , m_semaphoreState(state)
+{
+    // Register D-Bus methods and signals on the object, and exports the object.
+    m_object->registerProperty(m_propertyName).onInterface(m_propertyInterface).withGetter([&]() {
+        std::lock_guard<std::mutex> lock(m_semaphoreStateMtx);
+        return m_semaphoreState;
+    });
+    m_object->finishRegistration();
+}
+
+void DbusSemaphoreServer::runStateChanges(const std::vector<std::pair<std::string, std::chrono::milliseconds>>& sequence)
+{
+    std::thread serverThr([&]() {
+        for (const auto& [state, sleepTime] : sequence) {
+            {
+                std::lock_guard<std::mutex> lock(m_semaphoreStateMtx);
+                m_semaphoreState = state;
+            }
+            m_object->emitPropertiesChangedSignal(m_propertyInterface, {m_propertyName});
+            std::this_thread::sleep_for(sleepTime);
+        }
+    });
+
+    serverThr.join();
+}
\ No newline at end of file
diff --git a/tests/dbus-helpers/dbus_semaphore_server.h b/tests/dbus-helpers/dbus_semaphore_server.h
new file mode 100644
index 0000000..1a57c1b
--- /dev/null
+++ b/tests/dbus-helpers/dbus_semaphore_server.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <sdbus-c++/sdbus-c++.h>
+#include <string>
+
+class DbusSemaphoreServer {
+public:
+    DbusSemaphoreServer(sdbus::IConnection& connection, const std::string& objectPath, const std::string& propertyName, const std::string& propertyInterface, const std::string& state);
+    void runStateChanges(const std::vector<std::pair<std::string, std::chrono::milliseconds>>& sequence);
+
+private:
+    std::unique_ptr<sdbus::IObject> m_object;
+    std::string m_propertyName;
+    std::string m_propertyInterface;
+    std::string m_semaphoreState;
+    std::mutex m_semaphoreStateMtx;
+};
\ No newline at end of file
diff --git a/tests/fake.h b/tests/fake.h
index 42bbb41..1e347c2 100644
--- a/tests/fake.h
+++ b/tests/fake.h
@@ -7,6 +7,7 @@
 
 #include "trompeloeil_doctest.h"
 #include "inputs/AbstractInput.h"
+#include "manager/AbstractManager.h"
 
 class ManuallyInvokableInput : public velia::AbstractInput {
 public:
@@ -23,5 +24,11 @@
     MAKE_MOCK1(update, void(velia::State));
 };
 
-
 #define REQUIRE_STATE_OUTPUT(STATE) REQUIRE_CALL(*o1, update(velia::State::STATE)).IN_SEQUENCE(seq1)
+
+class FakeManager : public velia::AbstractManager {
+public:
+    MAKE_MOCK2(updateState, void(void*, velia::State), override);
+    MAKE_MOCK2(registerInput, void(void*, velia::State), override);
+    MAKE_MOCK1(unregisterInput, void(void*), override);
+};
diff --git a/tests/input-semaphore.cpp b/tests/input-semaphore.cpp
new file mode 100644
index 0000000..1735320
--- /dev/null
+++ b/tests/input-semaphore.cpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2020 CESNET, https://photonics.cesnet.cz/
+ *
+ * Written by Tomáš Pecka <tomas.pecka@fit.cvut.cz>
+ *
+*/
+
+#include "trompeloeil_doctest.h"
+#include <chrono>
+#include <functional>
+#include <future>
+#include "dbus-helpers/dbus_semaphore_server.h"
+#include "fake.h"
+#include "inputs/DbusSemaphoreInput.h"
+#include "test_log_setup.h"
+
+TEST_CASE("Test semaphore input")
+{
+    using namespace std::literals::chrono_literals;
+
+    TEST_INIT_LOGS;
+    trompeloeil::sequence seq1;
+
+    const std::string dbusObj = "/cz/cesnet/led";
+    const std::string dbusProp = "Semaphore";
+    const std::string dbusPropIface = "cz.cesnet.Led";
+
+    std::vector<std::pair<std::string, std::chrono::milliseconds>> stateSequence;
+    SECTION("Sequence with pauses")
+    {
+        stateSequence = {
+            {"OK", 505ms},
+            {"OK", 311ms},
+            {"WARNING", 143ms},
+            {"ERROR", 87ms},
+            {"WARNING", 333ms},
+            {"OK", 1ms},
+        };
+    }
+    SECTION("Sequence without pauses")
+    {
+        stateSequence = {
+            {"OK", 0ms},
+            {"OK", 0ms},
+            {"WARNING", 0ms},
+            {"ERROR", 0ms},
+            {"WARNING", 0ms},
+            {"OK", 0ms},
+        };
+    }
+
+    // setup separate connections for both client and server. Can be done using one connection only but this way it is more generic
+    auto clientConnection = sdbus::createSessionBusConnection();
+    auto serverConnection = sdbus::createSessionBusConnection();
+
+    // enter client and servers event loops
+    clientConnection->enterEventLoopAsync();
+    serverConnection->enterEventLoopAsync();
+
+    auto mx = std::make_shared<FakeManager>();
+    auto server = DbusSemaphoreServer(*serverConnection, dbusObj, dbusProp, dbusPropIface, "ERROR"); // let the first state be ERROR, because why not
+
+    // i1 gets constructed which means:
+    //  - a registration is performed, along with an updateState call (State::OK)
+    //  - i1's constructor queries the current state and performs updateState
+    REQUIRE_CALL(*mx, registerInput(ANY(void*), velia::State::OK)).LR_SIDE_EFFECT(mx->updateState(_1, _2)).IN_SEQUENCE(seq1);
+    REQUIRE_CALL(*mx, updateState(ANY(void*), velia::State::OK)).IN_SEQUENCE(seq1);
+    REQUIRE_CALL(*mx, updateState(ANY(void*), velia::State::ERROR)).IN_SEQUENCE(seq1);
+    auto i1 = std::make_shared<velia::DbusSemaphoreInput>(mx, *clientConnection, serverConnection->getUniqueName(), dbusObj, dbusProp, dbusPropIface);
+    // i1 now listens for dbus events, we can start the semaphore server
+
+    // mux should get notified for every semaphore state change
+    REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::OK)).IN_SEQUENCE(seq1);
+    REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::OK)).IN_SEQUENCE(seq1);
+    REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::WARNING)).IN_SEQUENCE(seq1);
+    REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::ERROR)).IN_SEQUENCE(seq1);
+    REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::WARNING)).IN_SEQUENCE(seq1);
+    REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::OK)).IN_SEQUENCE(seq1);
+
+
+    auto a1 = std::async(std::launch::async, [&]() { server.runStateChanges(stateSequence); });
+
+    waitForCompletionAndBitMore(seq1); // do not leave event loops until all dbus messages are received
+    serverConnection->leaveEventLoop();
+    clientConnection->leaveEventLoop();
+
+    REQUIRE_CALL(*mx, unregisterInput(i1.get())).IN_SEQUENCE(seq1);
+    i1.reset();
+}
diff --git a/tests/trompeloeil_doctest.h b/tests/trompeloeil_doctest.h
index c9c8887..8fcfb5a 100644
--- a/tests/trompeloeil_doctest.h
+++ b/tests/trompeloeil_doctest.h
@@ -17,4 +17,6 @@
 #define REQUIRE_THROWS_WITH(expr, e) DOCTEST_REQUIRE_THROWS_WITH(static_cast<void>(expr), e)
 #define REQUIRE_NOTHROW(expr) DOCTEST_REQUIRE_NOTHROW(static_cast<void>(expr))
 
-extern template struct trompeloeil::reporter<trompeloeil::specialized>;
\ No newline at end of file
+extern template struct trompeloeil::reporter<trompeloeil::specialized>;
+
+void waitForCompletionAndBitMore(const trompeloeil::sequence& seq);
diff --git a/tests/wait-a-bit-longer.cpp b/tests/wait-a-bit-longer.cpp
new file mode 100644
index 0000000..fce354d
--- /dev/null
+++ b/tests/wait-a-bit-longer.cpp
@@ -0,0 +1,39 @@
+#include <chrono>
+#include <doctest/doctest.h>
+#include <thread>
+#include <trompeloeil.hpp>
+
+/** @short Wait until a given sequence of expectation is matched, and then a bit more to ensure that there's silence afterwards */
+void waitForCompletionAndBitMore(const trompeloeil::sequence& seq)
+{
+    using namespace std::literals;
+    using clock = std::chrono::steady_clock;
+
+    // We're busy-waiting a bit
+    const auto waitingStep = 30ms;
+    // Timeout after this much
+    const auto completionTimeout = 5000ms;
+    // When checking for silence afterwards, wait at least this long.
+    // We'll also wait as long as it originally took to process everything.
+    const auto minExtraWait = 100ms;
+
+    auto start = clock::now();
+    while (true) {
+        {
+            auto lock = trompeloeil::get_lock();
+            if (seq.is_completed()) {
+                break;
+            }
+        }
+        std::this_thread::sleep_for(waitingStep);
+        if (clock::now() - start > completionTimeout) {
+            break;
+        }
+    }
+    {
+        auto lock = trompeloeil::get_lock();
+        REQUIRE(seq.is_completed());
+    }
+    auto duration = std::chrono::duration<double>(clock::now() - start);
+    std::this_thread::sleep_for(std::max(duration, decltype(duration)(minExtraWait)));
+}