CHANGE allow concurrent notifications and RPCs
Fixes cesnet/netopeer2#256
diff --git a/src/session_client.c b/src/session_client.c
index 79046aa..0ae3b07 100644
--- a/src/session_client.c
+++ b/src/session_client.c
@@ -856,20 +856,15 @@
}
/* prepare session structure */
- session = nc_new_session(0);
+ session = nc_new_session(NC_CLIENT, 0);
if (!session) {
ERRMEM;
return NULL;
}
session->status = NC_STATUS_STARTING;
- session->side = NC_CLIENT;
/* transport specific data */
session->ti_type = NC_TI_FD;
- pthread_mutex_init(session->ti_lock, NULL);
- pthread_cond_init(session->ti_cond, NULL);
- *session->ti_inuse = 0;
-
session->ti.fd.in = fdin;
session->ti.fd.out = fdout;
@@ -887,7 +882,7 @@
session->ctx = ctx;
/* NETCONF handshake */
- if (nc_handshake(session) != NC_MSG_HELLO) {
+ if (nc_handshake_io(session) != NC_MSG_HELLO) {
goto fail;
}
session->status = NC_STATUS_RUNNING;
@@ -969,7 +964,6 @@
static NC_MSG_TYPE
get_msg(struct nc_session *session, int timeout, uint64_t msgid, struct lyxml_elem **msg)
{
- int r;
char *ptr;
const char *str_msgid;
uint64_t cur_msgid;
@@ -977,15 +971,6 @@
struct nc_msg_cont *cont, **cont_ptr;
NC_MSG_TYPE msgtype = 0; /* NC_MSG_ERROR */
- r = nc_session_lock(session, timeout, __func__);
- if (r == -1) {
- /* error */
- return NC_MSG_ERROR;
- } else if (!r) {
- /* timeout */
- return NC_MSG_WOULDBLOCK;
- }
-
/* try to get notification from the session's queue */
if (!msgid && session->opts.client.notifs) {
cont = session->opts.client.notifs;
@@ -1010,7 +995,7 @@
if (!msgtype) {
/* read message from wire */
- msgtype = nc_read_msg_poll(session, timeout, &xml);
+ msgtype = nc_read_msg_poll_io(session, timeout, &xml);
}
/* we read rpc-reply, want a notif */
@@ -1022,7 +1007,6 @@
*cont_ptr = malloc(sizeof **cont_ptr);
if (!*cont_ptr) {
ERRMEM;
- nc_session_unlock(session, timeout, __func__);
lyxml_free(session->ctx, xml);
return NC_MSG_ERROR;
}
@@ -1033,7 +1017,6 @@
/* we read notif, want a rpc-reply */
if (msgid && (msgtype == NC_MSG_NOTIF)) {
if (!session->opts.client.ntf_tid) {
- pthread_mutex_unlock(session->ti_lock);
ERR("Session %u: received a <notification> but session is not subscribed.", session->id);
lyxml_free(session->ctx, xml);
return NC_MSG_ERROR;
@@ -1046,7 +1029,6 @@
*cont_ptr = malloc(sizeof **cont_ptr);
if (!cont_ptr) {
ERRMEM;
- nc_session_unlock(session, timeout, __func__);
lyxml_free(session->ctx, xml);
return NC_MSG_ERROR;
}
@@ -1054,8 +1036,6 @@
(*cont_ptr)->next = NULL;
}
- nc_session_unlock(session, timeout, __func__);
-
switch (msgtype) {
case NC_MSG_NOTIF:
if (!msgid) {
@@ -1729,12 +1709,18 @@
void (*notif_clb)(struct nc_session *session, const struct nc_notif *notif);
struct nc_notif *notif;
NC_MSG_TYPE msgtype;
+ pthread_t *ntf_tid;
+
+ pthread_detach(pthread_self());
ntarg = (struct nc_ntf_thread_arg *)arg;
session = ntarg->session;
notif_clb = ntarg->notif_clb;
free(ntarg);
+ /* remember our allocated tid, we will be freeing it */
+ ntf_tid = (pthread_t *)session->opts.client.ntf_tid;
+
while (session->opts.client.ntf_tid) {
msgtype = nc_recv_notif(session, NC_CLIENT_NOTIF_THREAD_SLEEP / 1000, ¬if);
if (msgtype == NC_MSG_NOTIF) {
@@ -1755,6 +1741,7 @@
VRB("Session %u: notification thread exit.", session->id);
session->opts.client.ntf_tid = NULL;
+ free(ntf_tid);
return NULL;
}
@@ -1810,7 +1797,7 @@
nc_send_rpc(struct nc_session *session, struct nc_rpc *rpc, int timeout, uint64_t *msgid)
{
NC_MSG_TYPE r;
- int ret, dofree = 1;
+ int dofree = 1;
struct nc_rpc_act_generic *rpc_gen;
struct nc_rpc_getconfig *rpc_gc;
struct nc_rpc_edit *rpc_e;
@@ -2284,6 +2271,11 @@
return NC_MSG_ERROR;
}
+ if (!data) {
+ /* error was already printed */
+ return NC_MSG_ERROR;
+ }
+
if (lyd_validate(&data, LYD_OPT_RPC | LYD_OPT_NOEXTDEPS
| (session->flags & NC_SESSION_CLIENT_NOT_STRICT ? 0 : LYD_OPT_STRICT), NULL)) {
if (dofree) {
@@ -2292,30 +2284,18 @@
return NC_MSG_ERROR;
}
- ret = nc_session_lock(session, timeout, __func__);
- if (ret == -1) {
- /* error */
- r = NC_MSG_ERROR;
- } else if (!ret) {
- /* blocking */
- r = NC_MSG_WOULDBLOCK;
- } else {
- /* send RPC, store its message ID */
- r = nc_send_msg(session, data);
- cur_msgid = session->opts.client.msgid;
- }
- nc_session_unlock(session, timeout, __func__);
+ /* send RPC, store its message ID */
+ r = nc_send_msg_io(session, timeout, data);
+ cur_msgid = session->opts.client.msgid;
if (dofree) {
lyd_free(data);
}
- if (r != NC_MSG_RPC) {
- return r;
+ if (r == NC_MSG_RPC) {
+ *msgid = cur_msgid;
}
-
- *msgid = cur_msgid;
- return NC_MSG_RPC;
+ return r;
}
API void