client session FEATURE new function nc_recv_notif_dispatch
diff --git a/src/session.c b/src/session.c
index 23cf3cf..387cf5c 100644
--- a/src/session.c
+++ b/src/session.c
@@ -233,6 +233,7 @@
int r, i;
int connected; /* flag to indicate whether the transport socket is still connected */
int multisession = 0; /* flag for more NETCONF sessions on a single SSH session */
+ pthread_t tid;
struct nc_session *siter;
struct nc_msg_cont *contiter;
struct lyxml_elem *rpl, *child;
@@ -255,9 +256,13 @@
}
/* stop notifications loop if any */
- if (session->notif) {
- pthread_cancel(*session->notif);
- pthread_join(*session->notif, NULL);
+ if (session->ntf_tid) {
+ tid = *session->ntf_tid;
+ free((pthread_t *)session->ntf_tid);
+ session->ntf_tid = NULL;
+ /* the thread now knows it should quit */
+
+ pthread_join(tid, NULL);
}
if ((session->side == NC_CLIENT) && (session->status == NC_STATUS_RUNNING)) {
diff --git a/src/session_client.c b/src/session_client.c
index 2d8f772..e8ccd4d 100644
--- a/src/session_client.c
+++ b/src/session_client.c
@@ -38,6 +38,7 @@
#include "libnetconf.h"
#include "session_client.h"
+#include "messages_client.h"
static const char *ncds2str[] = {NULL, "config", "url", "running", "startup", "candidate"};
@@ -520,13 +521,13 @@
/* we read notif, want a rpc-reply */
if (msgid && (msgtype == NC_MSG_NOTIF)) {
- /* TODO invalid check for a subscription */
- if (!session->notif) {
+ /* TODO check whether the session is even subscribed */
+ /*if (!session->notif) {
pthread_mutex_unlock(session->ti_lock);
ERR("Session %u: received a <notification> but session is not subscribed.", session->id);
lyxml_free(session->ctx, xml);
return NC_MSG_ERROR;
- }
+ }*/
cont_ptr = &session->notifs;
while (*cont_ptr) {
@@ -1019,6 +1020,69 @@
return NC_MSG_ERROR;
}
+static void *
+nc_recv_notif_thread(void *arg)
+{
+ struct nc_ntf_thread_arg *ntarg;
+ struct nc_session *session;
+ void (*notif_clb)(struct nc_session *session, const struct nc_notif *notif);
+ struct nc_notif *notif;
+ NC_MSG_TYPE msgtype;
+
+ ntarg = (struct nc_ntf_thread_arg *)arg;
+ session = ntarg->session;
+ notif_clb = ntarg->notif_clb;
+ free(ntarg);
+
+ while (session->ntf_tid) {
+ msgtype = nc_recv_notif(session, 0, ¬if);
+ if (msgtype == NC_MSG_NOTIF) {
+ notif_clb(session, notif);
+ nc_notif_free(notif);
+ }
+
+ usleep(NC_CLIENT_NOTIF_THREAD_SLEEP);
+ }
+
+ return NULL;
+}
+
+API int
+nc_recv_notif_dispatch(struct nc_session *session, void (*notif_clb)(struct nc_session *session, const struct nc_notif *notif))
+{
+ struct nc_ntf_thread_arg *ntarg;
+ int ret;
+
+ if (!session || !notif_clb) {
+ ERRARG;
+ return -1;
+ } else if ((session->status != NC_STATUS_RUNNING) || (session->side != NC_CLIENT)) {
+ ERR("Session %u: invalid session to receive Notifications.", session->id);
+ return -1;
+ } else if (session->ntf_tid) {
+ ERR("Session %u: separate notification thread is already running.", session->id);
+ return -1;
+ }
+
+ ntarg = malloc(sizeof *ntarg);
+ ntarg->session = session;
+ ntarg->notif_clb = notif_clb;
+
+ /* just so that nc_recv_notif_thread() does not immediately exit, the value does not matter */
+ session->ntf_tid = malloc(sizeof *session->ntf_tid);
+
+ ret = pthread_create((pthread_t *)session->ntf_tid, NULL, nc_recv_notif_thread, ntarg);
+ if (ret) {
+ ERR("Session %u: failed to create a new thread (%s).", strerror(errno));
+ free(ntarg);
+ free((pthread_t *)session->ntf_tid);
+ session->ntf_tid = NULL;
+ return -1;
+ }
+
+ return 0;
+}
+
API NC_MSG_TYPE
nc_send_rpc(struct nc_session *session, struct nc_rpc *rpc, int timeout, uint64_t *msgid)
{
diff --git a/src/session_client.h b/src/session_client.h
index 609a0ad..3b1d9fb 100644
--- a/src/session_client.h
+++ b/src/session_client.h
@@ -495,6 +495,19 @@
NC_MSG_TYPE nc_recv_notif(struct nc_session* session, int timeout, struct nc_notif **notif);
/**
+ * @brief Receive NETCONF Notifications in a separate thread until the session is terminated
+ * or \<notificationComplete\> is received.
+ *
+ * @param[in] session Netconf session to read notifications from.
+ * @param[in] notif_clb Function that is called for every received notification (including
+ * \<notificationComplete\>). Parameters are the session the notification was received on
+ * and the notification itself.
+ * @return 0 if the thread was successfully created, -1 on error.
+ */
+int nc_recv_notif_dispatch(struct nc_session *session,
+ void (*notif_clb)(struct nc_session *session, const struct nc_notif *notif));
+
+/**
* @brief Send NETCONF RPC message via the session.
*
* @param[in] session NETCONF session where the RPC will be written.
diff --git a/src/session_p.h b/src/session_p.h
index 09a038d..c523177 100644
--- a/src/session_p.h
+++ b/src/session_p.h
@@ -32,6 +32,7 @@
#include "libnetconf.h"
#include "netconf.h"
#include "session.h"
+#include "messages_client.h"
#ifdef ENABLE_SSH
@@ -155,6 +156,11 @@
#define NC_READ_SLEEP 100
/**
+ * Sleep time in microseconds to wait between nc_recv_notif() calls.
+ */
+#define NC_CLIENT_NOTIF_THREAD_SLEEP 10000
+
+/**
* Number of sockets kept waiting to be accepted.
*/
#define NC_REVERSE_QUEUE 1
@@ -197,7 +203,7 @@
/* NETCONF data */
uint32_t id; /**< NETCONF session ID (session-id-type) */
NC_VERSION version; /**< NETCONF protocol version */
- pthread_t *notif; /**< running notifications thread - TODO server-side only? */
+ volatile pthread_t *ntf_tid; /**< running notifications thread - TODO client-side only for now */
/* Transport implementation */
NC_TRANSPORT_IMPL ti_type; /**< transport implementation type to select items from ti union */
@@ -263,6 +269,11 @@
uint16_t session_count;
};
+struct nc_ntf_thread_arg {
+ struct nc_session *session;
+ void (*notif_clb)(struct nc_session *session, const struct nc_notif *notif);
+};
+
NC_MSG_TYPE nc_send_msg(struct nc_session *session, struct lyd_node *op);
int nc_timedlock(pthread_mutex_t *lock, int timeout, int *elapsed);