hardware: push ietf-hardware data into sysrepo periodically

We want to raise alarms in future, so prepare for that by reading the
sensors every now and then. It also makes sense to  switch from
providing ops callbacks for ietf-hardware data to pushing ops data
periodically.

Change-Id: If602ab64be674e766ef5d2b88e0a8f68b9cdb6d5
diff --git a/src/ietf-hardware/sysrepo/Sysrepo.cpp b/src/ietf-hardware/sysrepo/Sysrepo.cpp
index aa434bf..444a714 100644
--- a/src/ietf-hardware/sysrepo/Sysrepo.cpp
+++ b/src/ietf-hardware/sysrepo/Sysrepo.cpp
@@ -1,40 +1,82 @@
 /*
- * Copyright (C) 2020 CESNET, https://photonics.cesnet.cz/
+ * Copyright (C) 2020-2023 CESNET, https://photonics.cesnet.cz/
  *
  * Written by Tomáš Pecka <tomas.pecka@fit.cvut.cz>
  *
  */
 
+#include <regex>
+#include <sysrepo-cpp/Connection.hpp>
 #include "Sysrepo.h"
 #include "utils/log.h"
 #include "utils/sysrepo.h"
 
-namespace velia::ietf_hardware::sysrepo {
-
 namespace {
 
-const auto IETF_HARDWARE_MODULE_NAME = "ietf-hardware"s;
-const auto IETF_HARDWARE_MODULE_PREFIX = "/"s + IETF_HARDWARE_MODULE_NAME + ":hardware/*"s;
+/** @brief Extracts component path prefix from an XPath under /ietf-hardware/component node
+ *
+ * Example input:  /ietf-hardware:hardware/component[name='ne:psu:child']/oper-state/disabled
+ * Example output: /ietf-hardware:hardware/component[name='ne:psu:child']
+ */
+std::string extractComponentPrefix(const std::string& componentXPath)
+{
+    static const std::regex regex(R"((/ietf-hardware:hardware/component\[name=('|").*?(\2)\]).*)");
+    std::smatch match;
 
+    if (std::regex_match(componentXPath, match, regex)) {
+        return match.str(1);
+    }
+
+    throw std::logic_error("Invalid xPath provided ('" + componentXPath + "')");
+}
 }
 
-/** @brief The constructor expects the HardwareState instance which will provide the actual hardware state data */
-Sysrepo::Sysrepo(::sysrepo::Session session, std::shared_ptr<IETFHardware> hwState)
-    : m_hwState(std::move(hwState))
-    , m_srSubscribe()
+namespace velia::ietf_hardware::sysrepo {
+
+/** @brief The constructor expects the HardwareState instance which will provide the actual hardware state data and the poll interval */
+Sysrepo::Sysrepo(::sysrepo::Session session, std::shared_ptr<IETFHardware> hwState, std::chrono::microseconds pollInterval)
+    : m_log(spdlog::get("hardware"))
+    , m_pollInterval(std::move(pollInterval))
+    , m_session(std::move(session))
+    , m_hwState(std::move(hwState))
+    , m_quit(false)
+    , m_pollThread([&]() {
+        auto conn = m_session.getConnection();
+
+        DataTree prevValues;
+
+        while (!m_quit) {
+            m_log->trace("IetfHardware poll");
+
+            auto hwStateValues = m_hwState->process();
+            std::set<std::string> deletedComponents;
+
+            /* Some data readers can stop returning data in some cases (e.g. ejected PSU).
+             * Prune tree components that were removed before updating to avoid having not current data from previous invocations.
+             */
+            for (const auto& [k, v] : prevValues) {
+                if (!hwStateValues.contains(k)) {
+                    deletedComponents.emplace(extractComponentPrefix(k));
+                }
+            }
+
+            for (const auto& component : deletedComponents) {
+                conn.discardOperationalChanges(component);
+            }
+
+            utils::valuesPush(hwStateValues, {}, m_session, ::sysrepo::Datastore::Operational);
+
+            prevValues = std::move(hwStateValues);
+            std::this_thread::sleep_for(m_pollInterval);
+        }
+    })
 {
-    ::sysrepo::OperGetCb cb = [this](::sysrepo::Session session, auto, auto, auto, auto, auto, auto& parent) {
-        auto hwStateValues = m_hwState->process();
-        utils::valuesToYang(hwStateValues, {}, session, parent);
+}
 
-        spdlog::get("hardware")->trace("Pushing to sysrepo (JSON): {}", *parent->printStr(::libyang::DataFormat::JSON, libyang::PrintFlags::WithSiblings));
-        return ::sysrepo::ErrorCode::Ok;
-    };
-
-    m_srSubscribe = session.onOperGet(
-        IETF_HARDWARE_MODULE_NAME,
-        cb,
-        IETF_HARDWARE_MODULE_PREFIX,
-        ::sysrepo::SubscribeOptions::Passive | ::sysrepo::SubscribeOptions::OperMerge);
+Sysrepo::~Sysrepo()
+{
+    m_log->trace("Requesting poll thread stop");
+    m_quit = true;
+    m_pollThread.join();
 }
 }