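CHANGE allow concurrent notifications and RPCs

The client no longer takes the session lock itself in get_msg() and
nc_send_rpc(): the explicit nc_session_lock()/nc_session_unlock() calls
are removed, and the handshake, read and send paths now call
nc_handshake_io(), nc_read_msg_poll_io() and nc_send_msg_io() instead.

nc_new_session() now receives the session side (NC_CLIENT), so the
NC_TI_FD session setup stops assigning the side and initializing the
ti_lock/ti_cond/ti_inuse fields by hand.

The notification thread detaches itself and frees its own allocated
thread ID on exit. nc_send_rpc() also returns NC_MSG_ERROR early when no
RPC data was created.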

Fixes cesnet/netopeer2#256
diff --git a/src/session_client.c b/src/session_client.c
index 79046aa..0ae3b07 100644
--- a/src/session_client.c
+++ b/src/session_client.c
@@ -856,20 +856,15 @@
     }
 
     /* prepare session structure */
-    session = nc_new_session(0);
+    session = nc_new_session(NC_CLIENT, 0);
     if (!session) {
         ERRMEM;
         return NULL;
     }
     session->status = NC_STATUS_STARTING;
-    session->side = NC_CLIENT;
 
     /* transport specific data */
     session->ti_type = NC_TI_FD;
-    pthread_mutex_init(session->ti_lock, NULL);
-    pthread_cond_init(session->ti_cond, NULL);
-    *session->ti_inuse = 0;
-
     session->ti.fd.in = fdin;
     session->ti.fd.out = fdout;
 
@@ -887,7 +882,7 @@
     session->ctx = ctx;
 
     /* NETCONF handshake */
-    if (nc_handshake(session) != NC_MSG_HELLO) {
+    if (nc_handshake_io(session) != NC_MSG_HELLO) {
         goto fail;
     }
     session->status = NC_STATUS_RUNNING;
@@ -969,7 +964,6 @@
 static NC_MSG_TYPE
 get_msg(struct nc_session *session, int timeout, uint64_t msgid, struct lyxml_elem **msg)
 {
-    int r;
     char *ptr;
     const char *str_msgid;
     uint64_t cur_msgid;
@@ -977,15 +971,6 @@
     struct nc_msg_cont *cont, **cont_ptr;
     NC_MSG_TYPE msgtype = 0; /* NC_MSG_ERROR */
 
-    r = nc_session_lock(session, timeout, __func__);
-    if (r == -1) {
-        /* error */
-        return NC_MSG_ERROR;
-    } else if (!r) {
-        /* timeout */
-        return NC_MSG_WOULDBLOCK;
-    }
-
     /* try to get notification from the session's queue */
     if (!msgid && session->opts.client.notifs) {
         cont = session->opts.client.notifs;
@@ -1010,7 +995,7 @@
 
     if (!msgtype) {
         /* read message from wire */
-        msgtype = nc_read_msg_poll(session, timeout, &xml);
+        msgtype = nc_read_msg_poll_io(session, timeout, &xml);
     }
 
     /* we read rpc-reply, want a notif */
@@ -1022,7 +1007,6 @@
         *cont_ptr = malloc(sizeof **cont_ptr);
         if (!*cont_ptr) {
             ERRMEM;
-            nc_session_unlock(session, timeout, __func__);
             lyxml_free(session->ctx, xml);
             return NC_MSG_ERROR;
         }
@@ -1033,7 +1017,6 @@
     /* we read notif, want a rpc-reply */
     if (msgid && (msgtype == NC_MSG_NOTIF)) {
         if (!session->opts.client.ntf_tid) {
-            pthread_mutex_unlock(session->ti_lock);
             ERR("Session %u: received a <notification> but session is not subscribed.", session->id);
             lyxml_free(session->ctx, xml);
             return NC_MSG_ERROR;
@@ -1046,7 +1029,6 @@
         *cont_ptr = malloc(sizeof **cont_ptr);
         if (!cont_ptr) {
             ERRMEM;
-            nc_session_unlock(session, timeout, __func__);
             lyxml_free(session->ctx, xml);
             return NC_MSG_ERROR;
         }
@@ -1054,8 +1036,6 @@
         (*cont_ptr)->next = NULL;
     }
 
-    nc_session_unlock(session, timeout, __func__);
-
     switch (msgtype) {
     case NC_MSG_NOTIF:
         if (!msgid) {
@@ -1729,12 +1709,18 @@
     void (*notif_clb)(struct nc_session *session, const struct nc_notif *notif);
     struct nc_notif *notif;
     NC_MSG_TYPE msgtype;
+    pthread_t *ntf_tid;
+
+    pthread_detach(pthread_self());
 
     ntarg = (struct nc_ntf_thread_arg *)arg;
     session = ntarg->session;
     notif_clb = ntarg->notif_clb;
     free(ntarg);
 
+    /* remember our allocated tid, we will be freeing it */
+    ntf_tid = (pthread_t *)session->opts.client.ntf_tid;
+
     while (session->opts.client.ntf_tid) {
         msgtype = nc_recv_notif(session, NC_CLIENT_NOTIF_THREAD_SLEEP / 1000, &notif);
         if (msgtype == NC_MSG_NOTIF) {
@@ -1755,6 +1741,7 @@
 
     VRB("Session %u: notification thread exit.", session->id);
     session->opts.client.ntf_tid = NULL;
+    free(ntf_tid);
     return NULL;
 }
 
@@ -1810,7 +1797,7 @@
 nc_send_rpc(struct nc_session *session, struct nc_rpc *rpc, int timeout, uint64_t *msgid)
 {
     NC_MSG_TYPE r;
-    int ret, dofree = 1;
+    int dofree = 1;
     struct nc_rpc_act_generic *rpc_gen;
     struct nc_rpc_getconfig *rpc_gc;
     struct nc_rpc_edit *rpc_e;
@@ -2284,6 +2271,11 @@
         return NC_MSG_ERROR;
     }
 
+    if (!data) {
+        /* error was already printed */
+        return NC_MSG_ERROR;
+    }
+
     if (lyd_validate(&data, LYD_OPT_RPC | LYD_OPT_NOEXTDEPS
                      | (session->flags & NC_SESSION_CLIENT_NOT_STRICT ? 0 : LYD_OPT_STRICT), NULL)) {
         if (dofree) {
@@ -2292,30 +2284,18 @@
         return NC_MSG_ERROR;
     }
 
-    ret = nc_session_lock(session, timeout, __func__);
-    if (ret == -1) {
-        /* error */
-        r = NC_MSG_ERROR;
-    } else if (!ret) {
-        /* blocking */
-        r = NC_MSG_WOULDBLOCK;
-    } else {
-        /* send RPC, store its message ID */
-        r = nc_send_msg(session, data);
-        cur_msgid = session->opts.client.msgid;
-    }
-    nc_session_unlock(session, timeout, __func__);
+    /* send RPC, store its message ID */
+    r = nc_send_msg_io(session, timeout, data);
+    cur_msgid = session->opts.client.msgid;
 
     if (dofree) {
         lyd_free(data);
     }
 
-    if (r != NC_MSG_RPC) {
-        return r;
+    if (r == NC_MSG_RPC) {
+        *msgid = cur_msgid;
     }
-
-    *msgid = cur_msgid;
-    return NC_MSG_RPC;
+    return r;
 }
 
 API void