input: DBus semaphore listener
Adds an input source responsible for listening to changes that happens
on specified dbus object which provides a semaphore-status health.
We will use this for monitoring cla-sysrepod health status.
Change-Id: Ib1b82efa181d445ad779f2b181146c9d5ab7722a
Depends-on: https://cesnet-gerrit-public/c/CzechLight/dependencies/+/2778
Depends-on: https://gerrit.cesnet.cz/c/CzechLight/dependencies/+/2778
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 74f2ca2..3a3a812 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -23,6 +23,7 @@
find_package(Boost REQUIRED)
find_package(spdlog REQUIRED)
+find_package(sdbus-c++ REQUIRED)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/src/)
@@ -35,11 +36,12 @@
add_library(velia-state-manager STATIC
src/inputs/AbstractInput.cpp
+ src/inputs/DbusSemaphoreInput.cpp
src/manager/AbstractManager.cpp
src/manager/StateManager.cpp
src/State.cpp
)
-target_link_libraries(velia-state-manager PUBLIC velia-utils Boost::boost)
+target_link_libraries(velia-state-manager PUBLIC velia-utils Boost::boost SDBusCpp::sdbus-c++)
add_executable(veliad
src/main.cpp
@@ -57,11 +59,17 @@
add_library(DoctestIntegration STATIC
tests/doctest_integration.cpp
tests/trompeloeil_doctest.h
+ tests/wait-a-bit-longer.cpp
)
target_include_directories(DoctestIntegration PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tests/ ${CMAKE_CURRENT_SOURCE_DIR}/src/)
target_link_libraries(DoctestIntegration doctest::doctest trompeloeil spdlog::spdlog)
target_compile_definitions(DoctestIntegration PUBLIC DOCTEST_CONFIG_SUPER_FAST_ASSERTS)
+ add_library(DbusTesting STATIC
+ tests/dbus-helpers/dbus_semaphore_server.cpp
+ )
+ target_link_libraries(DbusTesting PUBLIC SDBusCpp::sdbus-c++)
+
function(velia_test name)
add_executable(test_${name} tests/${name}.cpp)
target_link_libraries(test_${name} DoctestIntegration ${ARGN})
@@ -73,6 +81,7 @@
endfunction()
velia_test(state-manager velia-state-manager)
+ velia_test(input-semaphore velia-state-manager DbusTesting)
endif()
if(WITH_DOCS)
diff --git a/src/inputs/DbusSemaphoreInput.cpp b/src/inputs/DbusSemaphoreInput.cpp
new file mode 100644
index 0000000..efceb74
--- /dev/null
+++ b/src/inputs/DbusSemaphoreInput.cpp
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2020 CESNET, https://photonics.cesnet.cz/
+ *
+ * Written by Tomáš Pecka <tomas.pecka@fit.cvut.cz>
+ *
+*/
+#include "DbusSemaphoreInput.h"
+#include "utils/log.h"
+
+namespace {
+
+velia::State stateFromString(const std::string& str)
+{
+ if (str == "WARNING")
+ return velia::State::WARNING;
+ if (str == "ERROR")
+ return velia::State::ERROR;
+ if (str == "OK")
+ return velia::State::OK;
+
+ throw std::invalid_argument("DbusSemaphoreInput received invalid state");
+}
+}
+
+namespace velia {
+
+DbusSemaphoreInput::DbusSemaphoreInput(std::shared_ptr<AbstractManager> manager, sdbus::IConnection& connection, const std::string& bus, const std::string& objectPath, const std::string& propertyName, const std::string& propertyInterface)
+ : AbstractInput(std::move(manager))
+ , m_dbusObjectProxy(sdbus::createProxy(connection, bus, objectPath))
+ , m_propertyName(propertyName)
+ , m_propertyInterface(propertyInterface)
+ , m_log(spdlog::get("input"))
+{
+ m_dbusObjectProxy->uponSignal("PropertiesChanged").onInterface("org.freedesktop.DBus.Properties").call([&](const std::string& iface, const std::map<std::string, sdbus::Variant>& changed, [[maybe_unused]] const std::vector<std::string>& invalidated) {
+ if (iface != m_propertyInterface) {
+ return;
+ }
+
+ if (auto it = changed.find(m_propertyName); it != changed.end()) {
+ std::string newState = it->second.get<std::string>();
+ m_log->trace("Property changed to {}", newState);
+ updateState(stateFromString(newState));
+ }
+ });
+ m_dbusObjectProxy->finishRegistration();
+ m_log->trace("Watching for property changes of {}, object {}, property {}.{}", bus, objectPath, propertyInterface, propertyName);
+
+ // we might update the state twice here (once from the callback, once from here).
+ // But better than querying the current state before the registration; we might miss a state change that could happen between querying and callback registration
+ std::string currentState = m_dbusObjectProxy->getProperty(m_propertyName).onInterface(propertyInterface).get<std::string>();
+ m_log->trace("Property initialized to {}", currentState);
+ updateState(stateFromString(currentState));
+}
+
+DbusSemaphoreInput::~DbusSemaphoreInput() = default;
+
+}
\ No newline at end of file
diff --git a/src/inputs/DbusSemaphoreInput.h b/src/inputs/DbusSemaphoreInput.h
new file mode 100644
index 0000000..97f1de8
--- /dev/null
+++ b/src/inputs/DbusSemaphoreInput.h
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2020 CESNET, https://photonics.cesnet.cz/
+ *
+ * Written by Tomáš Pecka <tomas.pecka@fit.cvut.cz>
+ *
+*/
+#pragma once
+
+#include <memory>
+#include <sdbus-c++/sdbus-c++.h>
+#include "inputs/AbstractInput.h"
+#include "manager/StateManager.h"
+
+namespace velia {
+
+class DbusSemaphoreInput : public AbstractInput {
+public:
+ DbusSemaphoreInput(std::shared_ptr<AbstractManager> mx, sdbus::IConnection& connection, const std::string& bus, const std::string& objectPath, const std::string& propertyName, const std::string& propertyInterface);
+ ~DbusSemaphoreInput() override;
+
+private:
+ std::shared_ptr<sdbus::IProxy> m_dbusObjectProxy;
+ std::string m_propertyName;
+ std::string m_propertyInterface;
+ velia::Log m_log;
+};
+}
\ No newline at end of file
diff --git a/tests/dbus-helpers/dbus_semaphore_server.cpp b/tests/dbus-helpers/dbus_semaphore_server.cpp
new file mode 100644
index 0000000..35eb45f
--- /dev/null
+++ b/tests/dbus-helpers/dbus_semaphore_server.cpp
@@ -0,0 +1,38 @@
+#include <sdbus-c++/sdbus-c++.h>
+#include <thread>
+#include "dbus_semaphore_server.h"
+#include "utils/log-init.h"
+
+/* Ask for current value:
+ * dbus-send --print-reply --system --dest=<bus> /cz/cesnet/led org.freedesktop.DBus.Properties.Get string:cz.cesnet.Led string:semaphore
+ */
+
+DbusSemaphoreServer::DbusSemaphoreServer(sdbus::IConnection& connection, const std::string& objectPath, const std::string& propertyName, const std::string& propertyInterface, const std::string& state)
+ : m_object(sdbus::createObject(connection, objectPath))
+ , m_propertyName(propertyName)
+ , m_propertyInterface(propertyInterface)
+ , m_semaphoreState(state)
+{
+ // Register D-Bus methods and signals on the object, and exports the object.
+ m_object->registerProperty(m_propertyName).onInterface(m_propertyInterface).withGetter([&]() {
+ std::lock_guard<std::mutex> lock(m_semaphoreStateMtx);
+ return m_semaphoreState;
+ });
+ m_object->finishRegistration();
+}
+
+void DbusSemaphoreServer::runStateChanges(const std::vector<std::pair<std::string, std::chrono::milliseconds>>& sequence)
+{
+ std::thread serverThr([&]() {
+ for (const auto& [state, sleepTime] : sequence) {
+ {
+ std::lock_guard<std::mutex> lock(m_semaphoreStateMtx);
+ m_semaphoreState = state;
+ }
+ m_object->emitPropertiesChangedSignal(m_propertyInterface, {m_propertyName});
+ std::this_thread::sleep_for(sleepTime);
+ }
+ });
+
+ serverThr.join();
+}
\ No newline at end of file
diff --git a/tests/dbus-helpers/dbus_semaphore_server.h b/tests/dbus-helpers/dbus_semaphore_server.h
new file mode 100644
index 0000000..1a57c1b
--- /dev/null
+++ b/tests/dbus-helpers/dbus_semaphore_server.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <sdbus-c++/sdbus-c++.h>
+#include <string>
+
+class DbusSemaphoreServer {
+public:
+ DbusSemaphoreServer(sdbus::IConnection& connection, const std::string& objectPath, const std::string& propertyName, const std::string& propertyInterface, const std::string& state);
+ void runStateChanges(const std::vector<std::pair<std::string, std::chrono::milliseconds>>& sequence);
+
+private:
+ std::unique_ptr<sdbus::IObject> m_object;
+ std::string m_propertyName;
+ std::string m_propertyInterface;
+ std::string m_semaphoreState;
+ std::mutex m_semaphoreStateMtx;
+};
\ No newline at end of file
diff --git a/tests/fake.h b/tests/fake.h
index 42bbb41..1e347c2 100644
--- a/tests/fake.h
+++ b/tests/fake.h
@@ -7,6 +7,7 @@
#include "trompeloeil_doctest.h"
#include "inputs/AbstractInput.h"
+#include "manager/AbstractManager.h"
class ManuallyInvokableInput : public velia::AbstractInput {
public:
@@ -23,5 +24,11 @@
MAKE_MOCK1(update, void(velia::State));
};
-
#define REQUIRE_STATE_OUTPUT(STATE) REQUIRE_CALL(*o1, update(velia::State::STATE)).IN_SEQUENCE(seq1)
+
+class FakeManager : public velia::AbstractManager {
+public:
+ MAKE_MOCK2(updateState, void(void*, velia::State), override);
+ MAKE_MOCK2(registerInput, void(void*, velia::State), override);
+ MAKE_MOCK1(unregisterInput, void(void*), override);
+};
diff --git a/tests/input-semaphore.cpp b/tests/input-semaphore.cpp
new file mode 100644
index 0000000..1735320
--- /dev/null
+++ b/tests/input-semaphore.cpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2020 CESNET, https://photonics.cesnet.cz/
+ *
+ * Written by Tomáš Pecka <tomas.pecka@fit.cvut.cz>
+ *
+*/
+
+#include "trompeloeil_doctest.h"
+#include <chrono>
+#include <functional>
+#include <future>
+#include "dbus-helpers/dbus_semaphore_server.h"
+#include "fake.h"
+#include "inputs/DbusSemaphoreInput.h"
+#include "test_log_setup.h"
+
+TEST_CASE("Test semaphore input")
+{
+ using namespace std::literals::chrono_literals;
+
+ TEST_INIT_LOGS;
+ trompeloeil::sequence seq1;
+
+ const std::string dbusObj = "/cz/cesnet/led";
+ const std::string dbusProp = "Semaphore";
+ const std::string dbusPropIface = "cz.cesnet.Led";
+
+ std::vector<std::pair<std::string, std::chrono::milliseconds>> stateSequence;
+ SECTION("Sequence with pauses")
+ {
+ stateSequence = {
+ {"OK", 505ms},
+ {"OK", 311ms},
+ {"WARNING", 143ms},
+ {"ERROR", 87ms},
+ {"WARNING", 333ms},
+ {"OK", 1ms},
+ };
+ }
+ SECTION("Sequence without pauses")
+ {
+ stateSequence = {
+ {"OK", 0ms},
+ {"OK", 0ms},
+ {"WARNING", 0ms},
+ {"ERROR", 0ms},
+ {"WARNING", 0ms},
+ {"OK", 0ms},
+ };
+ }
+
+ // setup separate connections for both client and server. Can be done using one connection only but this way it is more generic
+ auto clientConnection = sdbus::createSessionBusConnection();
+ auto serverConnection = sdbus::createSessionBusConnection();
+
+ // enter client and servers event loops
+ clientConnection->enterEventLoopAsync();
+ serverConnection->enterEventLoopAsync();
+
+ auto mx = std::make_shared<FakeManager>();
+ auto server = DbusSemaphoreServer(*serverConnection, dbusObj, dbusProp, dbusPropIface, "ERROR"); // let the first state be ERROR, because why not
+
+ // i1 gets constructed which means:
+ // - a registration is performed, along with an updateState call (State::OK)
+ // - i1's constructor queries the current state and performs updateState
+ REQUIRE_CALL(*mx, registerInput(ANY(void*), velia::State::OK)).LR_SIDE_EFFECT(mx->updateState(_1, _2)).IN_SEQUENCE(seq1);
+ REQUIRE_CALL(*mx, updateState(ANY(void*), velia::State::OK)).IN_SEQUENCE(seq1);
+ REQUIRE_CALL(*mx, updateState(ANY(void*), velia::State::ERROR)).IN_SEQUENCE(seq1);
+ auto i1 = std::make_shared<velia::DbusSemaphoreInput>(mx, *clientConnection, serverConnection->getUniqueName(), dbusObj, dbusProp, dbusPropIface);
+ // i1 now listens for dbus events, we can start the semaphore server
+
+ // mux should get notified for every semaphore state change
+ REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::OK)).IN_SEQUENCE(seq1);
+ REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::OK)).IN_SEQUENCE(seq1);
+ REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::WARNING)).IN_SEQUENCE(seq1);
+ REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::ERROR)).IN_SEQUENCE(seq1);
+ REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::WARNING)).IN_SEQUENCE(seq1);
+ REQUIRE_CALL(*mx, updateState(i1.get(), velia::State::OK)).IN_SEQUENCE(seq1);
+
+
+ auto a1 = std::async(std::launch::async, [&]() { server.runStateChanges(stateSequence); });
+
+ waitForCompletionAndBitMore(seq1); // do not leave event loops until all dbus messages are received
+ serverConnection->leaveEventLoop();
+ clientConnection->leaveEventLoop();
+
+ REQUIRE_CALL(*mx, unregisterInput(i1.get())).IN_SEQUENCE(seq1);
+ i1.reset();
+}
diff --git a/tests/trompeloeil_doctest.h b/tests/trompeloeil_doctest.h
index c9c8887..8fcfb5a 100644
--- a/tests/trompeloeil_doctest.h
+++ b/tests/trompeloeil_doctest.h
@@ -17,4 +17,6 @@
#define REQUIRE_THROWS_WITH(expr, e) DOCTEST_REQUIRE_THROWS_WITH(static_cast<void>(expr), e)
#define REQUIRE_NOTHROW(expr) DOCTEST_REQUIRE_NOTHROW(static_cast<void>(expr))
-extern template struct trompeloeil::reporter<trompeloeil::specialized>;
\ No newline at end of file
+extern template struct trompeloeil::reporter<trompeloeil::specialized>;
+
+void waitForCompletionAndBitMore(const trompeloeil::sequence& seq);
diff --git a/tests/wait-a-bit-longer.cpp b/tests/wait-a-bit-longer.cpp
new file mode 100644
index 0000000..fce354d
--- /dev/null
+++ b/tests/wait-a-bit-longer.cpp
@@ -0,0 +1,39 @@
+#include <chrono>
+#include <doctest/doctest.h>
+#include <thread>
+#include <trompeloeil.hpp>
+
+/** @short Wait until a given sequence of expectation is matched, and then a bit more to ensure that there's silence afterwards */
+void waitForCompletionAndBitMore(const trompeloeil::sequence& seq)
+{
+ using namespace std::literals;
+ using clock = std::chrono::steady_clock;
+
+ // We're busy-waiting a bit
+ const auto waitingStep = 30ms;
+ // Timeout after this much
+ const auto completionTimeout = 5000ms;
+ // When checking for silence afterwards, wait at least this long.
+ // We'll also wait as long as it originally took to process everything.
+ const auto minExtraWait = 100ms;
+
+ auto start = clock::now();
+ while (true) {
+ {
+ auto lock = trompeloeil::get_lock();
+ if (seq.is_completed()) {
+ break;
+ }
+ }
+ std::this_thread::sleep_for(waitingStep);
+ if (clock::now() - start > completionTimeout) {
+ break;
+ }
+ }
+ {
+ auto lock = trompeloeil::get_lock();
+ REQUIRE(seq.is_completed());
+ }
+ auto duration = std::chrono::duration<double>(clock::now() - start);
+ std::this_thread::sleep_for(std::max(duration, decltype(duration)(minExtraWait)));
+}