CHANGE allow concurrent notifications and RPCs

Fixes cesnet/netopeer2#256
diff --git a/src/io.c b/src/io.c
index 5d2a5e4..5092fdf 100644
--- a/src/io.c
+++ b/src/io.c
@@ -262,11 +262,11 @@
     return count;
 }
 
-/* return NC_MSG_ERROR can change session status */
+/* return NC_MSG_ERROR can change session status, acquires IO lock as needed */
 NC_MSG_TYPE
-nc_read_msg(struct nc_session *session, struct lyxml_elem **data)
+nc_read_msg_io(struct nc_session *session, int io_timeout, struct lyxml_elem **data, int passing_io_lock)
 {
-    int ret;
+    int ret, io_locked = passing_io_lock;
     char *msg = NULL, *chunk;
     uint64_t chunk_len, len = 0;
     /* use timeout in milliseconds instead seconds */
@@ -279,18 +279,33 @@
 
     if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
         ERR("Session %u: invalid session to read from.", session->id);
-        return NC_MSG_ERROR;
+        ret = NC_MSG_ERROR;
+        goto cleanup;
     }
 
     nc_gettimespec_mono(&ts_act_timeout);
     nc_addtimespec(&ts_act_timeout, NC_READ_ACT_TIMEOUT * 1000);
 
+    if (!io_locked) {
+        /* SESSION IO LOCK */
+        ret = nc_session_io_lock(session, io_timeout, __func__);
+        if (ret < 0) {
+            ret = NC_MSG_ERROR;
+            goto cleanup;
+        } else if (!ret) {
+            ret = NC_MSG_WOULDBLOCK;
+            goto cleanup;
+        }
+        io_locked = 1;
+    }
+
     /* read the message */
     switch (session->version) {
     case NC_VERSION_10:
         ret = nc_read_until(session, NC_VERSION_10_ENDTAG, 0, inact_timeout, &ts_act_timeout, &msg);
         if (ret == -1) {
-            goto error;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
 
         /* cut off the end tag */
@@ -300,11 +315,13 @@
         while (1) {
             ret = nc_read_until(session, "\n#", 0, inact_timeout, &ts_act_timeout, NULL);
             if (ret == -1) {
-                goto error;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
             ret = nc_read_until(session, "\n", 0, inact_timeout, &ts_act_timeout, &chunk);
             if (ret == -1) {
-                goto error;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
 
             if (!strcmp(chunk, "#\n")) {
@@ -328,14 +345,16 @@
             /* now we have size of next chunk, so read the chunk */
             ret = nc_read_chunk(session, chunk_len, inact_timeout, &ts_act_timeout, &chunk);
             if (ret == -1) {
-                goto error;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
 
             /* realloc message buffer, remember to count terminating null byte */
             msg = realloc(msg, len + chunk_len + 1);
             if (!msg) {
                 ERRMEM;
-                goto error;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
             memcpy(msg + len, chunk, chunk_len);
             len += chunk_len;
@@ -345,6 +364,12 @@
 
         break;
     }
+
+    /* SESSION IO UNLOCK */
+    assert(io_locked);
+    nc_session_io_unlock(session, __func__);
+    io_locked = 0;
+
     DBG("Session %u: received message:\n%s\n", session->id, msg);
 
     /* build XML tree */
@@ -388,7 +413,7 @@
         /* NETCONF version 1.1 defines sending error reply from the server (RFC 6241 sec. 3) */
         reply = nc_server_reply_err(nc_err(NC_ERR_MALFORMED_MSG));
 
-        if (nc_write_msg(session, NC_MSG_REPLY, NULL, reply) == -1) {
+        if (nc_write_msg_io(session, io_timeout, NC_MSG_REPLY, NULL, reply) != NC_MSG_REPLY) {
             ERR("Session %u: unable to send a \"Malformed message\" error reply, terminating session.", session->id);
             if (session->status != NC_STATUS_INVALID) {
                 session->status = NC_STATUS_INVALID;
@@ -397,19 +422,22 @@
         }
         nc_server_reply_free(reply);
     }
+    ret = NC_MSG_ERROR;
 
-error:
-    /* cleanup */
+cleanup:
+    if (io_locked) {
+        nc_session_io_unlock(session, __func__);
+    }
     free(msg);
     free(*data);
     *data = NULL;
 
-    return NC_MSG_ERROR;
+    return ret;
 }
 
 /* return -1 means either poll error or that session was invalidated (socket error), EINTR is handled inside */
 static int
-nc_read_poll(struct nc_session *session, int timeout)
+nc_read_poll(struct nc_session *session, int io_timeout)
 {
     sigset_t sigmask, origmask;
     int ret = -2;
@@ -424,7 +452,7 @@
 #ifdef NC_ENABLED_SSH
     case NC_TI_LIBSSH:
         /* EINTR is handled, it resumes waiting */
-        ret = ssh_channel_poll_timeout(session->ti.libssh.channel, timeout, 0);
+        ret = ssh_channel_poll_timeout(session->ti.libssh.channel, io_timeout, 0);
         if (ret == SSH_ERROR) {
             ERR("Session %u: SSH channel poll error (%s).", session->id,
                 ssh_get_error(session->ti.libssh.session));
@@ -468,7 +496,7 @@
 
         sigfillset(&sigmask);
         pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
-        ret = poll(&fds, 1, timeout);
+        ret = poll(&fds, 1, io_timeout);
         pthread_sigmask(SIG_SETMASK, &origmask, NULL);
 
         break;
@@ -504,9 +532,9 @@
     return ret;
 }
 
-/* return NC_MSG_ERROR can change session status */
+/* return NC_MSG_ERROR can change session status, acquires IO lock as needed */
 NC_MSG_TYPE
-nc_read_msg_poll(struct nc_session *session, int timeout, struct lyxml_elem **data)
+nc_read_msg_poll_io(struct nc_session *session, int io_timeout, struct lyxml_elem **data)
 {
     int ret;
 
@@ -518,16 +546,31 @@
         return NC_MSG_ERROR;
     }
 
-    ret = nc_read_poll(session, timeout);
+    /* SESSION IO LOCK */
+    ret = nc_session_io_lock(session, io_timeout, __func__);
+    if (ret < 0) {
+        return NC_MSG_ERROR;
+    } else if (!ret) {
+        return NC_MSG_WOULDBLOCK;
+    }
+
+    ret = nc_read_poll(session, io_timeout);
     if (ret == 0) {
         /* timed out */
+
+        /* SESSION IO UNLOCK */
+        nc_session_io_unlock(session, __func__);
         return NC_MSG_WOULDBLOCK;
     } else if (ret < 0) {
         /* poll error, error written */
+
+        /* SESSION IO UNLOCK */
+        nc_session_io_unlock(session, __func__);
         return NC_MSG_ERROR;
     }
 
-    return nc_read_msg(session, data);
+    /* SESSION IO LOCK passed down */
+    return nc_read_msg_io(session, io_timeout, data, 1);
 }
 
 /* does not really log, only fatal errors */
@@ -1001,12 +1044,12 @@
     nc_write_error_elem(arg, "rpc-error", 9, prefix, pref_len, 0, 0);
 }
 
-/* return -1 can change session status */
-int
-nc_write_msg(struct nc_session *session, int type, ...)
+/* return NC_MSG_ERROR can change session status, acquires IO lock as needed */
+NC_MSG_TYPE
+nc_write_msg_io(struct nc_session *session, int io_timeout, int type, ...)
 {
     va_list ap;
-    int count;
+    int count, ret;
     const char *attrs, *base_prefix;
     struct lyd_node *content;
     struct lyxml_elem *rpc_elem;
@@ -1023,14 +1066,21 @@
 
     if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
         ERR("Session %u: invalid session to write to.", session->id);
-        return -1;
+        return NC_MSG_ERROR;
     }
 
-    va_start(ap, type);
-
     arg.session = session;
     arg.len = 0;
 
+    /* SESSION IO LOCK */
+    ret = nc_session_io_lock(session, io_timeout, __func__);
+    if (ret < 0) {
+        return NC_MSG_ERROR;
+    } else if (!ret) {
+        return NC_MSG_WOULDBLOCK;
+    }
+
+    va_start(ap, type);
 
     switch (type) {
     case NC_MSG_RPC:
@@ -1041,15 +1091,15 @@
                          NC_NS_BASE, session->opts.client.msgid + 1, attrs ? attrs : "");
         if (count == -1) {
             ERRMEM;
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         nc_write_clb((void *)&arg, buf, count, 0);
         free(buf);
 
         if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, content, LYD_XML, LYP_WITHSIBLINGS | LYP_NETCONF)) {
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         nc_write_clb((void *)&arg, "</rpc>", 6, 0);
 
@@ -1106,8 +1156,8 @@
             }
             if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, ((struct nc_reply_data *)reply)->data, LYD_XML,
                               LYP_WITHSIBLINGS | LYP_NETCONF | wd)) {
-                va_end(ap);
-                return -1;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
             break;
         case NC_RPL_ERROR:
@@ -1119,8 +1169,8 @@
         default:
             ERRINT;
             nc_write_clb((void *)&arg, NULL, 0, 0);
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         if (rpc_elem && rpc_elem->ns && rpc_elem->ns->prefix) {
             nc_write_clb((void *)&arg, "</", 2, 0);
@@ -1140,16 +1190,16 @@
         nc_write_clb((void *)&arg, notif->eventtime, strlen(notif->eventtime), 0);
         nc_write_clb((void *)&arg, "</eventTime>", 12, 0);
         if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, notif->tree, LYD_XML, 0)) {
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         nc_write_clb((void *)&arg, "</notification>", 15, 0);
         break;
 
     case NC_MSG_HELLO:
         if (session->version != NC_VERSION_10) {
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         capabilities = va_arg(ap, const char **);
         sid = va_arg(ap, uint32_t*);
@@ -1157,8 +1207,8 @@
         count = asprintf(&buf, "<hello xmlns=\"%s\"><capabilities>", NC_NS_BASE);
         if (count == -1) {
             ERRMEM;
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         nc_write_clb((void *)&arg, buf, count, 0);
         free(buf);
@@ -1171,8 +1221,8 @@
             count = asprintf(&buf, "</capabilities><session-id>%u</session-id></hello>", *sid);
             if (count == -1) {
                 ERRMEM;
-                va_end(ap);
-                return -1;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
             nc_write_clb((void *)&arg, buf, count, 0);
             free(buf);
@@ -1182,20 +1232,25 @@
         break;
 
     default:
-        va_end(ap);
-        return -1;
+        ret = NC_MSG_ERROR;
+        goto cleanup;
     }
 
     /* flush message */
     nc_write_clb((void *)&arg, NULL, 0, 0);
 
-    va_end(ap);
     if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
         /* error was already written */
-        return -1;
+        ret = NC_MSG_ERROR;
+    } else {
+        /* specific message successfully sent */
+        ret = type;
     }
 
-    return 0;
+cleanup:
+    va_end(ap);
+    nc_session_io_unlock(session, __func__);
+    return ret;
 }
 
 void *
diff --git a/src/session.c b/src/session.c
index fae2988..098ae2b 100644
--- a/src/session.c
+++ b/src/session.c
@@ -145,7 +145,7 @@
 #endif
 
 struct nc_session *
-nc_new_session(int not_allocate_ti)
+nc_new_session(NC_SIDE side, int shared_ti)
 {
     struct nc_session *sess;
 
@@ -154,20 +154,38 @@
         return NULL;
     }
 
-    if (!not_allocate_ti) {
-        sess->ti_lock = malloc(sizeof *sess->ti_lock);
-        sess->ti_cond = malloc(sizeof *sess->ti_cond);
-        sess->ti_inuse = malloc(sizeof *sess->ti_inuse);
-        if (!sess->ti_lock || !sess->ti_cond || !sess->ti_inuse) {
-            free(sess->ti_lock);
-            free(sess->ti_cond);
-            free((int *)sess->ti_inuse);
-            free(sess);
-            return NULL;
+    sess->side = side;
+
+    if (side == NC_SERVER) {
+        sess->opts.server.rpc_lock = malloc(sizeof *sess->opts.server.rpc_lock);
+        sess->opts.server.rpc_cond = malloc(sizeof *sess->opts.server.rpc_cond);
+        sess->opts.server.rpc_inuse = malloc(sizeof *sess->opts.server.rpc_inuse);
+        if (!sess->opts.server.rpc_lock || !sess->opts.server.rpc_cond || !sess->opts.server.rpc_inuse) {
+            goto error;
         }
+        pthread_mutex_init(sess->opts.server.rpc_lock, NULL);
+        pthread_cond_init(sess->opts.server.rpc_cond, NULL);
+        *sess->opts.server.rpc_inuse = 0;
+    }
+
+    if (!shared_ti) {
+        sess->io_lock = malloc(sizeof *sess->io_lock);
+        if (!sess->io_lock) {
+            goto error;
+        }
+        pthread_mutex_init(sess->io_lock, NULL);
     }
 
     return sess;
+
+error:
+    if (side == NC_SERVER) {
+        free(sess->opts.server.rpc_lock);
+        free(sess->opts.server.rpc_cond);
+        free((int *)sess->opts.server.rpc_inuse);
+    }
+    free(sess);
+    return NULL;
 }
 
 /*
@@ -176,49 +194,54 @@
  *        -1 - error
  */
 int
-nc_session_lock(struct nc_session *session, int timeout, const char *func)
+nc_session_rpc_lock(struct nc_session *session, int timeout, const char *func)
 {
     int ret;
     struct timespec ts_timeout;
 
+    if (session->side != NC_SERVER) {
+        ERRINT;
+        return -1;
+    }
+
     if (timeout > 0) {
         nc_gettimespec_real(&ts_timeout);
         nc_addtimespec(&ts_timeout, timeout);
 
         /* LOCK */
-        ret = pthread_mutex_timedlock(session->ti_lock, &ts_timeout);
+        ret = pthread_mutex_timedlock(session->opts.server.rpc_lock, &ts_timeout);
         if (!ret) {
-            while (*session->ti_inuse) {
-                ret = pthread_cond_timedwait(session->ti_cond, session->ti_lock, &ts_timeout);
+            while (*session->opts.server.rpc_inuse) {
+                ret = pthread_cond_timedwait(session->opts.server.rpc_cond, session->opts.server.rpc_lock, &ts_timeout);
                 if (ret) {
-                    pthread_mutex_unlock(session->ti_lock);
+                    pthread_mutex_unlock(session->opts.server.rpc_lock);
                     break;
                 }
             }
         }
     } else if (!timeout) {
-        if (*session->ti_inuse) {
+        if (*session->opts.server.rpc_inuse) {
             /* immediate timeout */
             return 0;
         }
 
         /* LOCK */
-        ret = pthread_mutex_trylock(session->ti_lock);
+        ret = pthread_mutex_trylock(session->opts.server.rpc_lock);
         if (!ret) {
             /* be extra careful, someone could have been faster */
-            if (*session->ti_inuse) {
-                pthread_mutex_unlock(session->ti_lock);
+            if (*session->opts.server.rpc_inuse) {
+                pthread_mutex_unlock(session->opts.server.rpc_lock);
                 return 0;
             }
         }
     } else { /* timeout == -1 */
         /* LOCK */
-        ret = pthread_mutex_lock(session->ti_lock);
+        ret = pthread_mutex_lock(session->opts.server.rpc_lock);
         if (!ret) {
-            while (*session->ti_inuse) {
-                ret = pthread_cond_wait(session->ti_cond, session->ti_lock);
+            while (*session->opts.server.rpc_inuse) {
+                ret = pthread_cond_wait(session->opts.server.rpc_cond, session->opts.server.rpc_lock);
                 if (ret) {
-                    pthread_mutex_unlock(session->ti_lock);
+                    pthread_mutex_unlock(session->opts.server.rpc_lock);
                     break;
                 }
             }
@@ -232,19 +255,19 @@
         }
 
         /* error */
-        ERR("%s: failed to lock a session (%s).", func, strerror(ret));
+        ERR("%s: failed to RPC lock a session (%s).", func, strerror(ret));
         return -1;
     }
 
     /* ok */
-    assert(*session->ti_inuse == 0);
-    *session->ti_inuse = 1;
+    assert(*session->opts.server.rpc_inuse == 0);
+    *session->opts.server.rpc_inuse = 1;
 
     /* UNLOCK */
-    ret = pthread_mutex_unlock(session->ti_lock);
+    ret = pthread_mutex_unlock(session->opts.server.rpc_lock);
     if (ret) {
         /* error */
-        ERR("%s: faile to unlock a session (%s).", func, strerror(ret));
+        ERR("%s: faile to RPC unlock a session (%s).", func, strerror(ret));
         return -1;
     }
 
@@ -252,44 +275,49 @@
 }
 
 int
-nc_session_unlock(struct nc_session *session, int timeout, const char *func)
+nc_session_rpc_unlock(struct nc_session *session, int timeout, const char *func)
 {
     int ret;
     struct timespec ts_timeout;
 
-    assert(*session->ti_inuse);
+    if (session->side != NC_SERVER) {
+        ERRINT;
+        return -1;
+    }
+
+    assert(*session->opts.server.rpc_inuse);
 
     if (timeout > 0) {
         nc_gettimespec_real(&ts_timeout);
         nc_addtimespec(&ts_timeout, timeout);
 
         /* LOCK */
-        ret = pthread_mutex_timedlock(session->ti_lock, &ts_timeout);
+        ret = pthread_mutex_timedlock(session->opts.server.rpc_lock, &ts_timeout);
     } else if (!timeout) {
         /* LOCK */
-        ret = pthread_mutex_trylock(session->ti_lock);
+        ret = pthread_mutex_trylock(session->opts.server.rpc_lock);
     } else { /* timeout == -1 */
         /* LOCK */
-        ret = pthread_mutex_lock(session->ti_lock);
+        ret = pthread_mutex_lock(session->opts.server.rpc_lock);
     }
 
     if (ret && (ret != EBUSY) && (ret != ETIMEDOUT)) {
         /* error */
-        ERR("%s: failed to lock a session (%s).", func, strerror(ret));
+        ERR("%s: failed to RPC lock a session (%s).", func, strerror(ret));
         return -1;
     } else if (ret) {
-        WRN("%s: session lock timeout, should not happen.");
+        WRN("%s: session RPC lock timeout, should not happen.");
     }
 
-    *session->ti_inuse = 0;
-    pthread_cond_signal(session->ti_cond);
+    *session->opts.server.rpc_inuse = 0;
+    pthread_cond_signal(session->opts.server.rpc_cond);
 
     if (!ret) {
         /* UNLOCK */
-        ret = pthread_mutex_unlock(session->ti_lock);
+        ret = pthread_mutex_unlock(session->opts.server.rpc_lock);
         if (ret) {
             /* error */
-            ERR("%s: failed to unlock a session (%s).", func, strerror(ret));
+            ERR("%s: failed to RPC unlock a session (%s).", func, strerror(ret));
             return -1;
         }
     }
@@ -297,6 +325,57 @@
     return 1;
 }
 
+/*
+ * @return 1 - success
+ *         0 - timeout
+ *        -1 - error
+ */
+int
+nc_session_io_lock(struct nc_session *session, int timeout, const char *func)
+{
+    int ret;
+    struct timespec ts_timeout;
+
+    if (timeout > 0) {
+        nc_gettimespec_real(&ts_timeout);
+        nc_addtimespec(&ts_timeout, timeout);
+
+        ret = pthread_mutex_timedlock(session->io_lock, &ts_timeout);
+    } else if (!timeout) {
+        ret = pthread_mutex_trylock(session->io_lock);
+    } else { /* timeout == -1 */
+        ret = pthread_mutex_lock(session->opts.server.rpc_lock);
+    }
+
+    if (ret) {
+        if ((ret == EBUSY) || (ret == ETIMEDOUT)) {
+            /* timeout */
+            return 0;
+        }
+
+        /* error */
+        ERR("%s: failed to IO lock a session (%s).", func, strerror(ret));
+        return -1;
+    }
+
+    return 1;
+}
+
+int
+nc_session_io_unlock(struct nc_session *session, const char *func)
+{
+    int ret;
+
+    ret = pthread_mutex_unlock(session->io_lock);
+    if (ret) {
+        /* error */
+        ERR("%s: failed to IO unlock a session (%s).", func, strerror(ret));
+        return -1;
+    }
+
+    return 1;
+}
+
 API NC_STATUS
 nc_session_get_status(const struct nc_session *session)
 {
@@ -430,32 +509,23 @@
 }
 
 NC_MSG_TYPE
-nc_send_msg(struct nc_session *session, struct lyd_node *op)
+nc_send_msg_io(struct nc_session *session, int io_timeout, struct lyd_node *op)
 {
-    int r;
-
     if (session->ctx != op->schema->module->ctx) {
         ERR("Session %u: RPC \"%s\" was created in different context than that of the session.",
             session->id, op->schema->name);
         return NC_MSG_ERROR;
     }
 
-    r = nc_write_msg(session, NC_MSG_RPC, op, NULL);
-
-    if (r) {
-        return NC_MSG_ERROR;
-    }
-
-    return NC_MSG_RPC;
+    return nc_write_msg_io(session, io_timeout, NC_MSG_RPC, op, NULL);
 }
 
 API void
 nc_session_free(struct nc_session *session, void (*data_free)(void *))
 {
-    int r, i, locked, sock = -1;
+    int r, i, rpc_locked = 0, sock = -1;
     int connected; /* flag to indicate whether the transport socket is still connected */
     int multisession = 0; /* flag for more NETCONF sessions on a single SSH session */
-    pthread_t tid;
     struct nc_session *siter;
     struct nc_msg_cont *contiter;
     struct lyxml_elem *rpl, *child;
@@ -469,30 +539,20 @@
 
     /* stop notifications loop if any */
     if ((session->side == NC_CLIENT) && session->opts.client.ntf_tid) {
-        tid = *session->opts.client.ntf_tid;
-        free((pthread_t *)session->opts.client.ntf_tid);
         session->opts.client.ntf_tid = NULL;
         /* the thread now knows it should quit */
-
-        pthread_join(tid, NULL);
     }
 
-    if (session->ti_lock) {
-        r = nc_session_lock(session, NC_SESSION_FREE_LOCK_TIMEOUT, __func__);
+    if ((session->side == NC_SERVER) && session->opts.server.rpc_lock) {
+        r = nc_session_rpc_lock(session, NC_SESSION_FREE_LOCK_TIMEOUT, __func__);
         if (r == -1) {
             return;
-        } else if (!r) {
-            /* we failed to lock it, too bad */
-            locked = 0;
-        } else {
-            locked = 1;
-        }
-    } else {
-        ERRINT;
-        return;
+        } else if (r) {
+            rpc_locked = 1;
+        } /* else failed to lock it, too bad */
     }
 
-    if ((session->side == NC_CLIENT) && (session->status == NC_STATUS_RUNNING) && locked) {
+    if ((session->side == NC_CLIENT) && (session->status == NC_STATUS_RUNNING)) {
         /* cleanup message queues */
         /* notifications */
         for (contiter = session->opts.client.notifs; contiter; ) {
@@ -518,9 +578,9 @@
             WRN("Session %u: missing ietf-netconf schema in context, unable to send <close-session>.", session->id);
         } else {
             close_rpc = lyd_new(NULL, ietfnc, "close-session");
-            nc_send_msg(session, close_rpc);
+            nc_send_msg_io(session, NC_SESSION_FREE_LOCK_TIMEOUT, close_rpc);
             lyd_free(close_rpc);
-            switch (nc_read_msg_poll(session, NC_CLOSE_REPLY_TIMEOUT, &rpl)) {
+            switch (nc_read_msg_poll_io(session, NC_CLOSE_REPLY_TIMEOUT, &rpl)) {
             case NC_MSG_REPLY:
                 LY_TREE_FOR(rpl->child, child) {
                     if (!strcmp(child->name, "ok") && child->ns && !strcmp(child->ns->value, NC_NS_BASE)) {
@@ -692,17 +752,20 @@
     lydict_remove(session->ctx, session->host);
 
     /* final cleanup */
-    if (session->ti_lock) {
-        if (locked) {
-            nc_session_unlock(session, NC_SESSION_LOCK_TIMEOUT, __func__);
+    if ((session->side == NC_SERVER) && session->opts.server.rpc_lock) {
+        if (rpc_locked) {
+            nc_session_rpc_unlock(session, NC_SESSION_LOCK_TIMEOUT, __func__);
         }
-        if (!multisession) {
-            pthread_mutex_destroy(session->ti_lock);
-            pthread_cond_destroy(session->ti_cond);
-            free(session->ti_lock);
-            free(session->ti_cond);
-            free((int *)session->ti_inuse);
-        }
+        pthread_mutex_destroy(session->opts.server.rpc_lock);
+        pthread_cond_destroy(session->opts.server.rpc_cond);
+        free(session->opts.server.rpc_lock);
+        free(session->opts.server.rpc_cond);
+        free((int *)session->opts.server.rpc_inuse);
+    }
+
+    if (session->io_lock && !multisession) {
+        pthread_mutex_destroy(session->io_lock);
+        free(session->io_lock);
     }
 
     if (!(session->flags & NC_SESSION_SHAREDCTX)) {
@@ -1032,70 +1095,56 @@
 }
 
 static NC_MSG_TYPE
-nc_send_client_hello(struct nc_session *session)
+nc_send_hello_io(struct nc_session *session)
 {
-    int r, i;
+    NC_MSG_TYPE ret;
+    int i, io_timeout;
     const char **cpblts;
+    uint32_t *sid;
 
-    /* client side hello - send only NETCONF base capabilities */
-    cpblts = malloc(3 * sizeof *cpblts);
-    if (!cpblts) {
-        ERRMEM;
-        return NC_MSG_ERROR;
+    if (session->side == NC_CLIENT) {
+        /* client side hello - send only NETCONF base capabilities */
+        cpblts = malloc(3 * sizeof *cpblts);
+        if (!cpblts) {
+            ERRMEM;
+            return NC_MSG_ERROR;
+        }
+        cpblts[0] = lydict_insert(session->ctx, "urn:ietf:params:netconf:base:1.0", 0);
+        cpblts[1] = lydict_insert(session->ctx, "urn:ietf:params:netconf:base:1.1", 0);
+        cpblts[2] = NULL;
+
+        io_timeout = NC_CLIENT_HELLO_TIMEOUT * 1000;
+        sid = NULL;
+    } else {
+        cpblts = nc_server_get_cpblts_version(session->ctx, LYS_VERSION_1);
+
+        io_timeout = NC_SERVER_HELLO_TIMEOUT * 1000;
+        sid = &session->id;
     }
-    cpblts[0] = lydict_insert(session->ctx, "urn:ietf:params:netconf:base:1.0", 0);
-    cpblts[1] = lydict_insert(session->ctx, "urn:ietf:params:netconf:base:1.1", 0);
-    cpblts[2] = NULL;
 
-    r = nc_write_msg(session, NC_MSG_HELLO, cpblts, NULL);
+    ret = nc_write_msg_io(session, io_timeout, NC_MSG_HELLO, cpblts, sid);
 
     for (i = 0; cpblts[i]; ++i) {
         lydict_remove(session->ctx, cpblts[i]);
     }
     free(cpblts);
 
-    if (r) {
-        return NC_MSG_ERROR;
-    }
-
-    return NC_MSG_HELLO;
+    return ret;
 }
 
 static NC_MSG_TYPE
-nc_send_server_hello(struct nc_session *session)
-{
-    int r, i;
-    const char **cpblts;
-
-    cpblts = nc_server_get_cpblts_version(session->ctx, LYS_VERSION_1);
-
-    r = nc_write_msg(session, NC_MSG_HELLO, cpblts, &session->id);
-
-    for (i = 0; cpblts[i]; ++i) {
-        lydict_remove(session->ctx, cpblts[i]);
-    }
-    free(cpblts);
-
-    if (r) {
-        return NC_MSG_ERROR;
-    }
-
-    return NC_MSG_HELLO;
-}
-
-static NC_MSG_TYPE
-nc_recv_client_hello(struct nc_session *session)
+nc_recv_client_hello_io(struct nc_session *session)
 {
     struct lyxml_elem *xml = NULL, *node;
-    NC_MSG_TYPE msgtype = 0; /* NC_MSG_ERROR */
+    NC_MSG_TYPE msgtype;
     int ver = -1;
     char *str;
     long long int id;
     int flag = 0;
 
-    msgtype = nc_read_msg_poll(session, NC_CLIENT_HELLO_TIMEOUT * 1000, &xml);
+    msgtype = nc_read_msg_poll_io(session, NC_CLIENT_HELLO_TIMEOUT * 1000, &xml);
 
-    switch(msgtype) {
+    switch (msgtype) {
     case NC_MSG_HELLO:
         /* parse <hello> data */
         LY_TREE_FOR(xml->child, node) {
@@ -1162,14 +1211,15 @@
 }
 
 static NC_MSG_TYPE
-nc_recv_server_hello(struct nc_session *session)
+nc_recv_server_hello_io(struct nc_session *session)
 {
     struct lyxml_elem *xml = NULL, *node;
     NC_MSG_TYPE msgtype;
     int ver = -1;
     int flag = 0;
 
-    msgtype = nc_read_msg_poll(session, (server_opts.hello_timeout ? server_opts.hello_timeout * 1000 : NC_SERVER_HELLO_TIMEOUT * 1000), &xml);
+    msgtype = nc_read_msg_poll_io(session, (server_opts.hello_timeout ?
+                                            server_opts.hello_timeout * 1000 : NC_SERVER_HELLO_TIMEOUT * 1000), &xml);
 
     switch (msgtype) {
     case NC_MSG_HELLO:
@@ -1212,29 +1262,23 @@
 
 cleanup:
     lyxml_free(session->ctx, xml);
-
     return msgtype;
 }
 
 NC_MSG_TYPE
-nc_handshake(struct nc_session *session)
+nc_handshake_io(struct nc_session *session)
 {
     NC_MSG_TYPE type;
 
-    if (session->side == NC_CLIENT) {
-        type = nc_send_client_hello(session);
-    } else {
-        type = nc_send_server_hello(session);
-    }
-
+    type = nc_send_hello_io(session);
     if (type != NC_MSG_HELLO) {
         return type;
     }
 
     if (session->side == NC_CLIENT) {
-        type = nc_recv_client_hello(session);
+        type = nc_recv_client_hello_io(session);
     } else {
-        type = nc_recv_server_hello(session);
+        type = nc_recv_server_hello_io(session);
     }
 
     return type;
diff --git a/src/session_client.c b/src/session_client.c
index 79046aa..0ae3b07 100644
--- a/src/session_client.c
+++ b/src/session_client.c
@@ -856,20 +856,15 @@
     }
 
     /* prepare session structure */
-    session = nc_new_session(0);
+    session = nc_new_session(NC_CLIENT, 0);
     if (!session) {
         ERRMEM;
         return NULL;
     }
     session->status = NC_STATUS_STARTING;
-    session->side = NC_CLIENT;
 
     /* transport specific data */
     session->ti_type = NC_TI_FD;
-    pthread_mutex_init(session->ti_lock, NULL);
-    pthread_cond_init(session->ti_cond, NULL);
-    *session->ti_inuse = 0;
-
     session->ti.fd.in = fdin;
     session->ti.fd.out = fdout;
 
@@ -887,7 +882,7 @@
     session->ctx = ctx;
 
     /* NETCONF handshake */
-    if (nc_handshake(session) != NC_MSG_HELLO) {
+    if (nc_handshake_io(session) != NC_MSG_HELLO) {
         goto fail;
     }
     session->status = NC_STATUS_RUNNING;
@@ -969,7 +964,6 @@
 static NC_MSG_TYPE
 get_msg(struct nc_session *session, int timeout, uint64_t msgid, struct lyxml_elem **msg)
 {
-    int r;
     char *ptr;
     const char *str_msgid;
     uint64_t cur_msgid;
@@ -977,15 +971,6 @@
     struct nc_msg_cont *cont, **cont_ptr;
     NC_MSG_TYPE msgtype = 0; /* NC_MSG_ERROR */
 
-    r = nc_session_lock(session, timeout, __func__);
-    if (r == -1) {
-        /* error */
-        return NC_MSG_ERROR;
-    } else if (!r) {
-        /* timeout */
-        return NC_MSG_WOULDBLOCK;
-    }
-
     /* try to get notification from the session's queue */
     if (!msgid && session->opts.client.notifs) {
         cont = session->opts.client.notifs;
@@ -1010,7 +995,7 @@
 
     if (!msgtype) {
         /* read message from wire */
-        msgtype = nc_read_msg_poll(session, timeout, &xml);
+        msgtype = nc_read_msg_poll_io(session, timeout, &xml);
     }
 
     /* we read rpc-reply, want a notif */
@@ -1022,7 +1007,6 @@
         *cont_ptr = malloc(sizeof **cont_ptr);
         if (!*cont_ptr) {
             ERRMEM;
-            nc_session_unlock(session, timeout, __func__);
             lyxml_free(session->ctx, xml);
             return NC_MSG_ERROR;
         }
@@ -1033,7 +1017,6 @@
     /* we read notif, want a rpc-reply */
     if (msgid && (msgtype == NC_MSG_NOTIF)) {
         if (!session->opts.client.ntf_tid) {
-            pthread_mutex_unlock(session->ti_lock);
             ERR("Session %u: received a <notification> but session is not subscribed.", session->id);
             lyxml_free(session->ctx, xml);
             return NC_MSG_ERROR;
@@ -1046,7 +1029,6 @@
         *cont_ptr = malloc(sizeof **cont_ptr);
         if (!cont_ptr) {
             ERRMEM;
-            nc_session_unlock(session, timeout, __func__);
             lyxml_free(session->ctx, xml);
             return NC_MSG_ERROR;
         }
@@ -1054,8 +1036,6 @@
         (*cont_ptr)->next = NULL;
     }
 
-    nc_session_unlock(session, timeout, __func__);
-
     switch (msgtype) {
     case NC_MSG_NOTIF:
         if (!msgid) {
@@ -1729,12 +1709,18 @@
     void (*notif_clb)(struct nc_session *session, const struct nc_notif *notif);
     struct nc_notif *notif;
     NC_MSG_TYPE msgtype;
+    pthread_t *ntf_tid;
+
+    pthread_detach(pthread_self());
 
     ntarg = (struct nc_ntf_thread_arg *)arg;
     session = ntarg->session;
     notif_clb = ntarg->notif_clb;
     free(ntarg);
 
+    /* remember our allocated tid, we will be freeing it */
+    ntf_tid = (pthread_t *)session->opts.client.ntf_tid;
+
     while (session->opts.client.ntf_tid) {
         msgtype = nc_recv_notif(session, NC_CLIENT_NOTIF_THREAD_SLEEP / 1000, &notif);
         if (msgtype == NC_MSG_NOTIF) {
@@ -1755,6 +1741,7 @@
 
     VRB("Session %u: notification thread exit.", session->id);
     session->opts.client.ntf_tid = NULL;
+    free(ntf_tid);
     return NULL;
 }
 
@@ -1810,7 +1797,7 @@
 nc_send_rpc(struct nc_session *session, struct nc_rpc *rpc, int timeout, uint64_t *msgid)
 {
     NC_MSG_TYPE r;
-    int ret, dofree = 1;
+    int dofree = 1;
     struct nc_rpc_act_generic *rpc_gen;
     struct nc_rpc_getconfig *rpc_gc;
     struct nc_rpc_edit *rpc_e;
@@ -2284,6 +2271,11 @@
         return NC_MSG_ERROR;
     }
 
+    if (!data) {
+        /* error was already printed */
+        return NC_MSG_ERROR;
+    }
+
     if (lyd_validate(&data, LYD_OPT_RPC | LYD_OPT_NOEXTDEPS
                      | (session->flags & NC_SESSION_CLIENT_NOT_STRICT ? 0 : LYD_OPT_STRICT), NULL)) {
         if (dofree) {
@@ -2292,30 +2284,18 @@
         return NC_MSG_ERROR;
     }
 
-    ret = nc_session_lock(session, timeout, __func__);
-    if (ret == -1) {
-        /* error */
-        r = NC_MSG_ERROR;
-    } else if (!ret) {
-        /* blocking */
-        r = NC_MSG_WOULDBLOCK;
-    } else {
-        /* send RPC, store its message ID */
-        r = nc_send_msg(session, data);
-        cur_msgid = session->opts.client.msgid;
-    }
-    nc_session_unlock(session, timeout, __func__);
+    /* send RPC, store its message ID */
+    r = nc_send_msg_io(session, timeout, data);
+    cur_msgid = session->opts.client.msgid;
 
     if (dofree) {
         lyd_free(data);
     }
 
-    if (r != NC_MSG_RPC) {
-        return r;
+    if (r == NC_MSG_RPC) {
+        *msgid = cur_msgid;
     }
-
-    *msgid = cur_msgid;
-    return NC_MSG_RPC;
+    return r;
 }
 
 API void
diff --git a/src/session_client_ssh.c b/src/session_client_ssh.c
index 332d5d1..83ab4d8 100644
--- a/src/session_client_ssh.c
+++ b/src/session_client_ssh.c
@@ -1487,19 +1487,12 @@
     }
 
     /* prepare session structure */
-    session = nc_new_session(0);
+    session = nc_new_session(NC_CLIENT, 0);
     if (!session) {
         ERRMEM;
         return NULL;
     }
     session->status = NC_STATUS_STARTING;
-    session->side = NC_CLIENT;
-
-    /* transport lock */
-    pthread_mutex_init(session->ti_lock, NULL);
-    pthread_cond_init(session->ti_cond, NULL);
-    *session->ti_inuse = 0;
-
     session->ti_type = NC_TI_LIBSSH;
     session->ti.libssh.session = ssh_session;
 
@@ -1582,7 +1575,7 @@
     ctx = session->ctx;
 
     /* NETCONF handshake */
-    if (nc_handshake(session) != NC_MSG_HELLO) {
+    if (nc_handshake_io(session) != NC_MSG_HELLO) {
         goto fail;
     }
     session->status = NC_STATUS_RUNNING;
@@ -1642,20 +1635,14 @@
     }
 
     /* prepare session structure */
-    session = nc_new_session(0);
+    session = nc_new_session(NC_CLIENT, 0);
     if (!session) {
         ERRMEM;
         return NULL;
     }
     session->status = NC_STATUS_STARTING;
-    session->side = NC_CLIENT;
 
-    /* transport lock */
-    pthread_mutex_init(session->ti_lock, NULL);
-    pthread_cond_init(session->ti_cond, NULL);
-    *session->ti_inuse = 0;
-
-    /* other transport-specific data */
+    /* transport-specific data */
     session->ti_type = NC_TI_LIBSSH;
     session->ti.libssh.session = ssh_new();
     if (!session->ti.libssh.session) {
@@ -1698,7 +1685,7 @@
     ctx = session->ctx;
 
     /* NETCONF handshake */
-    if (nc_handshake(session) != NC_MSG_HELLO) {
+    if (nc_handshake_io(session) != NC_MSG_HELLO) {
         goto fail;
     }
     session->status = NC_STATUS_RUNNING;
@@ -1736,30 +1723,26 @@
     }
 
     /* prepare session structure */
-    new_session = nc_new_session(1);
+    new_session = nc_new_session(NC_CLIENT, 1);
     if (!new_session) {
         ERRMEM;
         return NULL;
     }
     new_session->status = NC_STATUS_STARTING;
-    new_session->side = NC_CLIENT;
 
-    /* share some parameters including the session lock */
+    /* share some parameters including the IO lock (we are using one socket for both sessions) */
     new_session->ti_type = NC_TI_LIBSSH;
-    new_session->ti_lock = session->ti_lock;
-    new_session->ti_cond = session->ti_cond;
-    new_session->ti_inuse = session->ti_inuse;
     new_session->ti.libssh.session = session->ti.libssh.session;
+    new_session->io_lock = session->io_lock;
 
     /* create the channel safely */
-    if (nc_session_lock(new_session, -1, __func__) != 1) {
+    if (nc_session_io_lock(new_session, -1, __func__) != 1) {
         goto fail;
     }
-
-    /* open a channel */
     if (open_netconf_channel(new_session, NC_TRANSPORT_TIMEOUT) != 1) {
         goto fail;
     }
+    nc_session_io_unlock(new_session, __func__);
 
     if (nc_session_new_ctx(new_session, ctx) != EXIT_SUCCESS) {
         goto fail;
@@ -1767,12 +1750,12 @@
     ctx = session->ctx;
 
     /* NETCONF handshake */
-    if (nc_handshake(new_session) != NC_MSG_HELLO) {
+    if (nc_handshake_io(new_session) != NC_MSG_HELLO) {
         goto fail;
     }
     new_session->status = NC_STATUS_RUNNING;
 
-    nc_session_unlock(new_session, NC_SESSION_LOCK_TIMEOUT, __func__);
+    nc_session_io_unlock(new_session, __func__);
 
     if (nc_ctx_check_and_fill(new_session) == -1) {
         goto fail;
diff --git a/src/session_client_tls.c b/src/session_client_tls.c
index 2495506..fc780ad 100644
--- a/src/session_client_tls.c
+++ b/src/session_client_tls.c
@@ -602,18 +602,12 @@
     }
 
     /* prepare session structure */
-    session = nc_new_session(0);
+    session = nc_new_session(NC_CLIENT, 0);
     if (!session) {
         ERRMEM;
         return NULL;
     }
     session->status = NC_STATUS_STARTING;
-    session->side = NC_CLIENT;
-
-    /* transport lock */
-    pthread_mutex_init(session->ti_lock, NULL);
-    pthread_cond_init(session->ti_cond, NULL);
-    *session->ti_inuse = 0;
 
     /* fill the session */
     session->ti_type = NC_TI_OPENSSL;
@@ -676,7 +670,7 @@
     ctx = session->ctx;
 
     /* NETCONF handshake */
-    if (nc_handshake(session) != NC_MSG_HELLO) {
+    if (nc_handshake_io(session) != NC_MSG_HELLO) {
         goto fail;
     }
     session->status = NC_STATUS_RUNNING;
@@ -711,19 +705,12 @@
     }
 
     /* prepare session structure */
-    session = nc_new_session(0);
+    session = nc_new_session(NC_CLIENT, 0);
     if (!session) {
         ERRMEM;
         return NULL;
     }
     session->status = NC_STATUS_STARTING;
-    session->side = NC_CLIENT;
-
-    /* transport lock */
-    pthread_mutex_init(session->ti_lock, NULL);
-    pthread_cond_init(session->ti_cond, NULL);
-    *session->ti_inuse = 0;
-
     session->ti_type = NC_TI_OPENSSL;
     session->ti.tls = tls;
 
@@ -733,7 +720,7 @@
     ctx = session->ctx;
 
     /* NETCONF handshake */
-    if (nc_handshake(session) != NC_MSG_HELLO) {
+    if (nc_handshake_io(session) != NC_MSG_HELLO) {
         goto fail;
     }
     session->status = NC_STATUS_RUNNING;
diff --git a/src/session_p.h b/src/session_p.h
index 5a0c899..845fd3f 100644
--- a/src/session_p.h
+++ b/src/session_p.h
@@ -354,11 +354,9 @@
 
     /* Transport implementation */
     NC_TRANSPORT_IMPL ti_type;   /**< transport implementation type to select items from ti union */
-    pthread_mutex_t *ti_lock;    /**< lock to access ti. Note that in case of libssh TI, it can be shared with other
-                                      NETCONF sessions on the same SSH session (but different SSH channel) */
-    pthread_cond_t *ti_cond;     /**< ti_inuse condition */
-    volatile int *ti_inuse;      /**< variable indicating whether TI is being communicated on or not, protected by
-                                      ti_cond and ti_lock */
+    pthread_mutex_t *io_lock;    /**< input/output lock, note that in case of libssh TI, it will be shared
+                                      with other NETCONF sessions on the same SSH session (but different SSH channel) */
+
     union {
         struct {
             int in;              /**< input file descriptor */
@@ -406,6 +404,12 @@
             time_t session_start;          /**< real time the session was created */
             time_t last_rpc;               /**< monotonic time (seconds) the last RPC was received on this session */
             int ntf_status;                /**< flag whether the session is subscribed to any stream */
+
+            pthread_mutex_t *rpc_lock;   /**< lock indicating RPC processing, this lock is always locked before io_lock!! */
+            pthread_cond_t *rpc_cond;    /**< RPC condition (tied with rpc_lock and rpc_inuse) */
+            volatile int *rpc_inuse;     /**< variable indicating whether there is RPC being processed or not (tied with
+                                              rpc_cond and rpc_lock) */
+
             pthread_mutex_t *ch_lock;      /**< Call Home thread lock */
             pthread_cond_t *ch_cond;       /**< Call Home thread condition */
 
@@ -460,7 +464,7 @@
 
 void *nc_realloc(void *ptr, size_t size);
 
-NC_MSG_TYPE nc_send_msg(struct nc_session *session, struct lyd_node *op);
+NC_MSG_TYPE nc_send_msg_io(struct nc_session *session, int io_timeout, struct lyd_node *op);
 
 #ifndef HAVE_PTHREAD_MUTEX_TIMEDLOCK
 int pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *abstime);
@@ -474,11 +478,15 @@
 
 void nc_addtimespec(struct timespec *ts, uint32_t msec);
 
-struct nc_session *nc_new_session(int not_allocate_ti);
+struct nc_session *nc_new_session(NC_SIDE side, int shared_ti);
 
-int nc_session_lock(struct nc_session *session, int timeout, const char *func);
+int nc_session_rpc_lock(struct nc_session *session, int timeout, const char *func);
 
-int nc_session_unlock(struct nc_session *session, int timeout, const char *func);
+int nc_session_rpc_unlock(struct nc_session *session, int timeout, const char *func);
+
+int nc_session_io_lock(struct nc_session *session, int timeout, const char *func);
+
+int nc_session_io_unlock(struct nc_session *session, const char *func);
 
 int nc_ps_lock(struct nc_pollsession *ps, uint8_t *id, const char *func);
 
@@ -501,7 +509,7 @@
  * @return NC_MSG_HELLO on success, NC_MSG_BAD_HELLO on client \<hello\> message parsing fail
  * (server-side only), NC_MSG_WOULDBLOCK on timeout, NC_MSG_ERROR on other error.
  */
-NC_MSG_TYPE nc_handshake(struct nc_session *session);
+NC_MSG_TYPE nc_handshake_io(struct nc_session *session);
 
 /**
  * @brief Create a socket connection.
@@ -680,14 +688,14 @@
  * libyang XML tree and the message type is detected from the top level element.
  *
  * @param[in] session NETCONF session from which the message is being read.
- * @param[in] timeout Timeout in milliseconds. Negative value means infinite timeout,
+ * @param[in] io_timeout Timeout in milliseconds. Negative value means infinite timeout,
  *            zero value causes to return immediately.
  * @param[out] data XML tree built from the read data.
  * @return Type of the read message. #NC_MSG_WOULDBLOCK is returned if timeout is positive
  * (or zero) value and it passed out without any data on the wire. #NC_MSG_ERROR is
  * returned on error and #NC_MSG_NONE is never returned by this function.
  */
-NC_MSG_TYPE nc_read_msg_poll(struct nc_session* session, int timeout, struct lyxml_elem **data);
+NC_MSG_TYPE nc_read_msg_poll_io(struct nc_session* session, int io_timeout, struct lyxml_elem **data);
 
 /**
  * @brief Read message from the wire.
@@ -696,17 +704,23 @@
  * libyang XML tree and the message type is detected from the top level element.
  *
  * @param[in] session NETCONF session from which the message is being read.
+ * @param[in] io_timeout Timeout in milliseconds. Negative value means infinite timeout,
+ *            zero value causes to return immediately.
  * @param[out] data XML tree built from the read data.
+ * @param[in] passing_io_lock True if \p session IO lock is already held. This function always unlocks
+ *            it before returning!
  * @return Type of the read message. #NC_MSG_WOULDBLOCK is returned if timeout is positive
  * (or zero) value and it passed out without any data on the wire. #NC_MSG_ERROR is
  * returned on error and #NC_MSG_NONE is never returned by this function.
  */
-NC_MSG_TYPE nc_read_msg(struct nc_session* session, struct lyxml_elem **data);
+NC_MSG_TYPE nc_read_msg_io(struct nc_session* session, int io_timeout, struct lyxml_elem **data, int passing_io_lock);
 
 /**
  * @brief Write message into wire.
  *
  * @param[in] session NETCONF session to which the message will be written.
+ * @param[in] io_timeout Timeout in milliseconds. Negative value means infinite timeout,
+ *            zero value causes to return immediately.
  * @param[in] type The type of the message to write, specified as #NC_MSG_TYPE value. According to the type, the
  * specific additional parameters are required or accepted:
  * - #NC_MSG_RPC
@@ -719,10 +733,16 @@
  *   - `struct lyxml_node *rpc_elem;` - root of the RPC object to reply to. Required parameter.
  *   - `struct nc_server_reply *reply;` - RPC reply. Required parameter.
  * - #NC_MSG_NOTIF
- *   - TODO: content
- * @return 0 on success
+ *   - `struct nc_server_notif *notif;` - notification object. Required parameter.
+ * - #NC_MSG_HELLO
+ *   - `const char **capabs;` - capabilities array ended with NULL. Required parameter.
+ *   - `uint32_t *sid;` - session ID to be included in the hello message. Optional parameter.
+ *
+ * @return Type of the written message. #NC_MSG_WOULDBLOCK is returned if timeout is positive
+ * (or zero) value and IO lock could not be acquired in that time. #NC_MSG_ERROR is
+ * returned on error and #NC_MSG_NONE is never returned by this function.
  */
-int nc_write_msg(struct nc_session *session, int type, ...);
+NC_MSG_TYPE nc_write_msg_io(struct nc_session *session, int io_timeout, int type, ...);
 
 /**
  * @brief Check whether a session is still connected (on transport layer).
diff --git a/src/session_server.c b/src/session_server.c
index f76887f..defa5f5 100644
--- a/src/session_server.c
+++ b/src/session_server.c
@@ -633,18 +633,12 @@
     }
 
     /* prepare session structure */
-    *session = nc_new_session(0);
+    *session = nc_new_session(NC_SERVER, 0);
     if (!(*session)) {
         ERRMEM;
         return NC_MSG_ERROR;
     }
     (*session)->status = NC_STATUS_STARTING;
-    (*session)->side = NC_SERVER;
-
-    /* transport lock */
-    pthread_mutex_init((*session)->ti_lock, NULL);
-    pthread_cond_init((*session)->ti_cond, NULL);
-    *(*session)->ti_inuse = 0;
 
     /* transport specific data */
     (*session)->ti_type = NC_TI_FD;
@@ -661,7 +655,7 @@
     pthread_spin_unlock(&server_opts.sid_lock);
 
     /* NETCONF handshake */
-    msgtype = nc_handshake(*session);
+    msgtype = nc_handshake_io(*session);
     if (msgtype != NC_MSG_HELLO) {
         nc_session_free(*session, NULL);
         *session = NULL;
@@ -1003,14 +997,14 @@
     return ps->session_count;
 }
 
-/* must be called holding the session lock!
+/* should be called holding the session RPC lock! IO lock will be acquired as needed
  * returns: NC_PSPOLL_ERROR,
  *          NC_PSPOLL_BAD_RPC,
  *          NC_PSPOLL_BAD_RPC | NC_PSPOLL_REPLY_ERROR,
  *          NC_PSPOLL_RPC
  */
 static int
-nc_server_recv_rpc(struct nc_session *session, struct nc_server_rpc **rpc)
+nc_server_recv_rpc_io(struct nc_session *session, int io_timeout, struct nc_server_rpc **rpc)
 {
     struct lyxml_elem *xml = NULL;
     NC_MSG_TYPE msgtype;
@@ -1028,7 +1022,7 @@
         return NC_PSPOLL_ERROR;
     }
 
-    msgtype = nc_read_msg(session, &xml);
+    msgtype = nc_read_msg_io(session, io_timeout, &xml, 0);
 
     switch (msgtype) {
     case NC_MSG_RPC:
@@ -1044,7 +1038,7 @@
         if (!(*rpc)->tree) {
             /* parsing RPC failed */
             reply = nc_server_reply_err(nc_err_libyang(server_opts.ctx));
-            ret = nc_write_msg(session, NC_MSG_REPLY, xml, reply);
+            ret = nc_write_msg_io(session, io_timeout, NC_MSG_REPLY, xml, reply);
             nc_server_reply_free(reply);
             if (ret == -1) {
                 ERR("Session %u: failed to write reply.", session->id);
@@ -1069,7 +1063,7 @@
         goto error;
     default:
         /* NC_MSG_ERROR,
-         * NC_MSG_WOULDBLOCK and NC_MSG_NONE is not returned by nc_read_msg()
+         * NC_MSG_WOULDBLOCK and NC_MSG_NONE is not returned by nc_read_msg_io()
          */
         ret = NC_PSPOLL_ERROR;
         break;
@@ -1093,8 +1087,7 @@
 API NC_MSG_TYPE
 nc_server_notif_send(struct nc_session *session, struct nc_server_notif *notif, int timeout)
 {
-    NC_MSG_TYPE result = NC_MSG_NOTIF;
-    int ret;
+    NC_MSG_TYPE ret;
 
     /* check parameters */
     if (!session || (session->side != NC_SERVER) || !session->opts.server.ntf_status) {
@@ -1105,39 +1098,30 @@
         return NC_MSG_ERROR;
     }
 
-    /* reading an RPC and sending a reply must be atomic (no other RPC should be read) */
-    ret = nc_session_lock(session, timeout, __func__);
-    if (ret < 0) {
-        return NC_MSG_ERROR;
-    } else if (!ret) {
-        return NC_MSG_WOULDBLOCK;
-    }
-
-    ret = nc_write_msg(session, NC_MSG_NOTIF, notif);
-    if (ret == -1) {
+    /* we do not need RPC lock for this, IO lock will be acquired properly */
+    ret = nc_write_msg_io(session, timeout, NC_MSG_NOTIF, notif);
+    if (ret == NC_MSG_ERROR) {
         ERR("Session %u: failed to write notification.", session->id);
-        result = NC_MSG_ERROR;
     }
 
-    nc_session_unlock(session, timeout, __func__);
-
-    return result;
+    return ret;
 }
 
-/* must be called holding the session lock!
+/* must be called holding the session RPC lock! IO lock will be acquired as needed
  * returns: NC_PSPOLL_ERROR,
  *          NC_PSPOLL_ERROR | NC_PSPOLL_REPLY_ERROR,
  *          NC_PSPOLL_REPLY_ERROR,
  *          0
  */
 static int
-nc_server_send_reply(struct nc_session *session, struct nc_server_rpc *rpc)
+nc_server_send_reply_io(struct nc_session *session, int io_timeout, struct nc_server_rpc *rpc)
 {
     nc_rpc_clb clb;
     struct nc_server_reply *reply;
     struct lys_node *rpc_act = NULL;
     struct lyd_node *next, *elem;
-    int ret = 0, r;
+    int ret = 0;
+    NC_MSG_TYPE r;
 
     if (!rpc) {
         ERRINT;
@@ -1173,13 +1157,13 @@
     if (!reply) {
         reply = nc_server_reply_err(nc_err(NC_ERR_OP_FAILED, NC_ERR_TYPE_APP));
     }
-    r = nc_write_msg(session, NC_MSG_REPLY, rpc->root, reply);
+    r = nc_write_msg_io(session, io_timeout, NC_MSG_REPLY, rpc->root, reply);
     if (reply->type == NC_RPL_ERROR) {
         ret |= NC_PSPOLL_REPLY_ERROR;
     }
     nc_server_reply_free(reply);
 
-    if (r == -1) {
+    if (r != NC_MSG_REPLY) {
         ERR("Session %u: failed to write reply.", session->id);
         ret |= NC_PSPOLL_ERROR;
     }
@@ -1192,7 +1176,7 @@
     return ret;
 }
 
-/* session must be running and session lock held!
+/* session must be running and session RPC lock held!
  * returns: NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR, (msg filled)
  *          NC_PSPOLL_ERROR, (msg filled)
  *          NC_PSPOLL_TIMEOUT,
@@ -1201,7 +1185,7 @@
  *          NC_PSPOLL_SSH_MSG
  */
 static int
-nc_ps_poll_session(struct nc_session *session, time_t now_mono, char *msg)
+nc_ps_poll_session_io(struct nc_session *session, int io_timeout, time_t now_mono, char *msg)
 {
     struct pollfd pfd;
     int r, ret;
@@ -1218,6 +1202,14 @@
         return NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
     }
 
+    r = nc_session_io_lock(session, io_timeout, __func__);
+    if (r < 0) {
+        sprintf(msg, "session IO lock failed to be acquired");
+        return NC_PSPOLL_ERROR;
+    } else if (!r) {
+        return NC_PSPOLL_TIMEOUT;
+    }
+
     switch (session->ti_type) {
 #ifdef NC_ENABLED_SSH
     case NC_TI_LIBSSH:
@@ -1336,6 +1328,7 @@
         break;
     }
 
+    nc_session_io_unlock(session, __func__);
     return ret;
 }
 
@@ -1385,8 +1378,8 @@
             cur_ps_session = ps->sessions[i];
             cur_session = cur_ps_session->session;
 
-            /* SESSION LOCK */
-            r = nc_session_lock(cur_session, 0, __func__);
+            /* SESSION RPC LOCK */
+            r = nc_session_rpc_lock(cur_session, 0, __func__);
             if (r == -1) {
                 ret = NC_PSPOLL_ERROR;
             } else if (r == 1) {
@@ -1397,7 +1390,7 @@
                         /* session is fine, work with it */
                         cur_ps_session->state = NC_PS_STATE_BUSY;
 
-                        ret = nc_ps_poll_session(cur_session, ts_cur.tv_sec, msg);
+                        ret = nc_ps_poll_session_io(cur_session, NC_SESSION_LOCK_TIMEOUT, ts_cur.tv_sec, msg);
                         switch (ret) {
                         case NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR:
                             ERR("Session %u: %s.", cur_session->id, msg);
@@ -1438,10 +1431,10 @@
                     break;
                 }
 
-                /* keep the session locked only in this one case */
+                /* keep RPC lock in this one case */
                 if (ret != NC_PSPOLL_RPC) {
-                    /* SESSION UNLOCK */
-                    nc_session_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
+                    /* SESSION RPC UNLOCK */
+                    nc_session_rpc_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
                 }
             } else {
                 /* timeout */
@@ -1494,9 +1487,9 @@
     /* PS UNLOCK */
     nc_ps_unlock(ps, q_id, __func__);
 
-    /* we have some data available and the session is locked */
+    /* we have some data available and the session is RPC locked (but not IO locked) */
     if (ret == NC_PSPOLL_RPC) {
-        ret = nc_server_recv_rpc(cur_session, &rpc);
+        ret = nc_server_recv_rpc_io(cur_session, timeout, &rpc);
         if (ret & (NC_PSPOLL_ERROR | NC_PSPOLL_BAD_RPC)) {
             if (cur_session->status != NC_STATUS_RUNNING) {
                 ret |= NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
@@ -1508,7 +1501,7 @@
             cur_session->opts.server.last_rpc = ts_cur.tv_sec;
 
             /* process RPC, not needed afterwards */
-            ret |= nc_server_send_reply(cur_session, rpc);
+            ret |= nc_server_send_reply_io(cur_session, timeout, rpc);
             nc_server_rpc_free(rpc, server_opts.ctx);
 
             if (cur_session->status != NC_STATUS_RUNNING) {
@@ -1522,8 +1515,8 @@
             }
         }
 
-        /* SESSION UNLOCK */
-        nc_session_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
+        /* SESSION RPC UNLOCK */
+        nc_session_rpc_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
     }
 
     return ret;
@@ -1915,7 +1908,7 @@
 
     sock = ret;
 
-    *session = nc_new_session(0);
+    *session = nc_new_session(NC_SERVER, 0);
     if (!(*session)) {
         ERRMEM;
         close(sock);
@@ -1924,17 +1917,11 @@
         goto cleanup;
     }
     (*session)->status = NC_STATUS_STARTING;
-    (*session)->side = NC_SERVER;
     (*session)->ctx = server_opts.ctx;
     (*session)->flags = NC_SESSION_SHAREDCTX;
     (*session)->host = lydict_insert_zc(server_opts.ctx, host);
     (*session)->port = port;
 
-    /* transport lock */
-    pthread_mutex_init((*session)->ti_lock, NULL);
-    pthread_cond_init((*session)->ti_cond, NULL);
-    *(*session)->ti_inuse = 0;
-
     /* sock gets assigned to session or closed */
 #ifdef NC_ENABLED_SSH
     if (server_opts.endpts[bind_idx].ti == NC_TI_LIBSSH) {
@@ -1982,7 +1969,7 @@
     pthread_spin_unlock(&server_opts.sid_lock);
 
     /* NETCONF handshake */
-    msgtype = nc_handshake(*session);
+    msgtype = nc_handshake_io(*session);
     if (msgtype != NC_MSG_HELLO) {
         nc_session_free(*session, NULL);
         *session = NULL;
@@ -2661,24 +2648,18 @@
         return NC_MSG_ERROR;
     }
 
-    *session = nc_new_session(0);
+    *session = nc_new_session(NC_SERVER, 0);
     if (!(*session)) {
         ERRMEM;
         close(sock);
         return NC_MSG_ERROR;
     }
     (*session)->status = NC_STATUS_STARTING;
-    (*session)->side = NC_SERVER;
     (*session)->ctx = server_opts.ctx;
     (*session)->flags = NC_SESSION_SHAREDCTX;
     (*session)->host = lydict_insert(server_opts.ctx, endpt->address, 0);
     (*session)->port = endpt->port;
 
-    /* transport lock */
-    pthread_mutex_init((*session)->ti_lock, NULL);
-    pthread_cond_init((*session)->ti_cond, NULL);
-    *(*session)->ti_inuse = 0;
-
     /* sock gets assigned to session or closed */
 #ifdef NC_ENABLED_SSH
     if (client->ti == NC_TI_LIBSSH) {
@@ -2725,7 +2706,7 @@
     pthread_spin_unlock(&server_opts.sid_lock);
 
     /* NETCONF handshake */
-    msgtype = nc_handshake(*session);
+    msgtype = nc_handshake_io(*session);
     if (msgtype != NC_MSG_HELLO) {
         goto fail;
     }
diff --git a/src/session_server_ssh.c b/src/session_server_ssh.c
index 639475f..88aa0a3 100644
--- a/src/session_server_ssh.c
+++ b/src/session_server_ssh.c
@@ -990,7 +990,7 @@
         session->flags |= NC_SESSION_SSH_SUBSYS_NETCONF;
     } else {
         /* additional channel subsystem request, new session is ready as far as SSH is concerned */
-        new_session = nc_new_session(1);
+        new_session = nc_new_session(NC_SERVER, 1);
         if (!new_session) {
             ERRMEM;
             return -1;
@@ -1005,11 +1005,8 @@
         session->ti.libssh.next = new_session;
 
         new_session->status = NC_STATUS_STARTING;
-        new_session->side = NC_SERVER;
         new_session->ti_type = NC_TI_LIBSSH;
-        new_session->ti_lock = session->ti_lock;
-        new_session->ti_cond = session->ti_cond;
-        new_session->ti_inuse = session->ti_inuse;
+        new_session->io_lock = session->io_lock;
         new_session->ti.libssh.channel = channel;
         new_session->ti.libssh.session = session->ti.libssh.session;
         new_session->username = lydict_insert(server_opts.ctx, session->username, 0);
@@ -1230,27 +1227,18 @@
             return -1;
         }
 
-        ret = nc_session_lock(session, timeout, __func__);
-        if (ret != 1) {
-            return ret;
-        }
-
         ret = ssh_execute_message_callbacks(session->ti.libssh.session);
         if (ret != SSH_OK) {
             ERR("Failed to receive SSH messages on a session (%s).",
                 ssh_get_error(session->ti.libssh.session));
-            nc_session_unlock(session, timeout, __func__);
             return -1;
         }
 
         if (!session->ti.libssh.channel) {
-            /* we did not receive channel-open, timeout */
-            nc_session_unlock(session, timeout, __func__);
             return 0;
         }
 
         ret = ssh_execute_message_callbacks(session->ti.libssh.session);
-        nc_session_unlock(session, timeout, __func__);
         if (ret != SSH_OK) {
             ERR("Failed to receive SSH messages on a session (%s).",
                 ssh_get_error(session->ti.libssh.session));
@@ -1275,13 +1263,7 @@
             return -1;
         }
 
-        ret = nc_session_lock(session, timeout, __func__);
-        if (ret != 1) {
-            return ret;
-        }
-
         ret = ssh_execute_message_callbacks(session->ti.libssh.session);
-        nc_session_unlock(session, timeout, __func__);
         if (ret != SSH_OK) {
             ERR("Failed to receive SSH messages on a session (%s).",
                 ssh_get_error(session->ti.libssh.session));
@@ -1535,7 +1517,7 @@
     pthread_spin_unlock(&server_opts.sid_lock);
 
     /* NETCONF handshake */
-    msgtype = nc_handshake(new_session);
+    msgtype = nc_handshake_io(new_session);
     if (msgtype != NC_MSG_HELLO) {
         return msgtype;
     }
@@ -1608,7 +1590,7 @@
     pthread_spin_unlock(&server_opts.sid_lock);
 
     /* NETCONF handshake */
-    msgtype = nc_handshake(new_session);
+    msgtype = nc_handshake_io(new_session);
     if (msgtype != NC_MSG_HELLO) {
         return msgtype;
     }
diff --git a/tests/test_fd_comm.c b/tests/test_fd_comm.c
index 6ee3a05..f42f1b3 100644
--- a/tests/test_fd_comm.c
+++ b/tests/test_fd_comm.c
@@ -38,6 +38,7 @@
 struct nc_session *server_session;
 struct nc_session *client_session;
 struct ly_ctx *ctx;
+volatile int glob_state;
 
 struct nc_server_reply *
 my_get_rpc_clb(struct lyd_node *rpc, struct nc_session *session)
@@ -63,6 +64,65 @@
     return nc_server_reply_data(data, NC_WD_EXPLICIT, NC_PARAMTYPE_FREE);
 }
 
+struct nc_server_reply *
+my_commit_rpc_clb(struct lyd_node *rpc, struct nc_session *session)
+{
+    assert_string_equal(rpc->schema->name, "commit");
+    assert_ptr_equal(session, server_session);
+
+    /* update state */
+    glob_state = 1;
+
+    /* wait until the client receives the notification */
+    while (glob_state != 3) {
+        usleep(100000);
+    }
+
+    return nc_server_reply_ok();
+}
+
+static struct nc_session *
+test_new_session(NC_SIDE side)
+{
+    struct nc_session *sess;
+
+    sess = calloc(1, sizeof *sess);
+    if (!sess) {
+        return NULL;
+    }
+
+    sess->side = side;
+
+    if (side == NC_SERVER) {
+        sess->opts.server.rpc_lock = malloc(sizeof *sess->opts.server.rpc_lock);
+        sess->opts.server.rpc_cond = malloc(sizeof *sess->opts.server.rpc_cond);
+        sess->opts.server.rpc_inuse = malloc(sizeof *sess->opts.server.rpc_inuse);
+        if (!sess->opts.server.rpc_lock || !sess->opts.server.rpc_cond || !sess->opts.server.rpc_inuse) {
+            goto error;
+        }
+        pthread_mutex_init(sess->opts.server.rpc_lock, NULL);
+        pthread_cond_init(sess->opts.server.rpc_cond, NULL);
+        *sess->opts.server.rpc_inuse = 0;
+    }
+
+    sess->io_lock = malloc(sizeof *sess->io_lock);
+    if (!sess->io_lock) {
+        goto error;
+    }
+    pthread_mutex_init(sess->io_lock, NULL);
+
+    return sess;
+
+error:
+    if (side == NC_SERVER) {
+        free(sess->opts.server.rpc_lock);
+        free(sess->opts.server.rpc_cond);
+        free((int *)sess->opts.server.rpc_inuse);
+    }
+    free(sess);
+    return NULL;
+}
+
 static int
 setup_sessions(void **state)
 {
@@ -73,34 +133,20 @@
     socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
 
     /* create server session */
-    server_session = calloc(1, sizeof *server_session);
+    server_session = test_new_session(NC_SERVER);
     server_session->status = NC_STATUS_RUNNING;
-    server_session->side = NC_SERVER;
     server_session->id = 1;
     server_session->ti_type = NC_TI_FD;
-    server_session->ti_lock = malloc(sizeof *server_session->ti_lock);
-    pthread_mutex_init(server_session->ti_lock, NULL);
-    server_session->ti_cond = malloc(sizeof *server_session->ti_cond);
-    pthread_cond_init(server_session->ti_cond, NULL);
-    server_session->ti_inuse = malloc(sizeof *server_session->ti_inuse);
-    *server_session->ti_inuse = 0;
     server_session->ti.fd.in = sock[0];
     server_session->ti.fd.out = sock[0];
     server_session->ctx = ctx;
     server_session->flags = NC_SESSION_SHAREDCTX;
 
     /* create client session */
-    client_session = calloc(1, sizeof *server_session);
+    client_session = test_new_session(NC_CLIENT);
     client_session->status = NC_STATUS_RUNNING;
-    client_session->side = NC_CLIENT;
     client_session->id = 1;
     client_session->ti_type = NC_TI_FD;
-    client_session->ti_lock = malloc(sizeof *client_session->ti_lock);
-    pthread_mutex_init(client_session->ti_lock, NULL);
-    client_session->ti_cond = malloc(sizeof *client_session->ti_cond);
-    pthread_cond_init(client_session->ti_cond, NULL);
-    client_session->ti_inuse = malloc(sizeof *client_session->ti_inuse);
-    *client_session->ti_inuse = 0;
     client_session->ti.fd.in = sock[1];
     client_session->ti.fd.out = sock[1];
     client_session->ctx = ctx;
@@ -116,20 +162,10 @@
     (void)state;
 
     close(server_session->ti.fd.in);
-    pthread_mutex_destroy(server_session->ti_lock);
-    pthread_cond_destroy(server_session->ti_cond);
-    free(server_session->ti_lock);
-    free(server_session->ti_cond);
-    free((int *)server_session->ti_inuse);
-    free(server_session);
+    nc_session_free(server_session, NULL);
 
     close(client_session->ti.fd.in);
-    pthread_mutex_destroy(client_session->ti_lock);
-    pthread_cond_destroy(client_session->ti_cond);
-    free(client_session->ti_lock);
-    free(client_session->ti_cond);
-    free((int *)client_session->ti_inuse);
-    free(client_session);
+    nc_session_free(client_session, NULL);
 
     return 0;
 }
@@ -312,12 +348,127 @@
     test_send_recv_data();
 }
 
-/* TODO
+static void
+test_notif_clb(struct nc_session *session, const struct nc_notif *notif)
+{
+    assert_ptr_equal(session, client_session);
+    assert_string_equal(notif->tree->schema->name, "notificationComplete");
+
+    /* client notification received, update state */
+    while (glob_state != 2) {
+        usleep(1000);
+    }
+    glob_state = 3;
+}
+
+static void *
+server_send_notif_thread(void *arg)
+{
+    NC_MSG_TYPE msg_type;
+    struct lyd_node *notif_tree;
+    struct nc_server_notif *notif;
+    char *buf;
+    (void)arg;
+
+    /* wait for the RPC callback to be called */
+    while (glob_state != 1) {
+        usleep(1000);
+    }
+
+    /* create notif */
+    notif_tree = lyd_new_path(NULL, ctx, "/nc-notifications:notificationComplete", NULL, 0, 0);
+    assert_non_null(notif_tree);
+    buf = malloc(64);
+    assert_non_null(buf);
+    notif = nc_server_notif_new(notif_tree, nc_time2datetime(time(NULL), NULL, buf), NC_PARAMTYPE_FREE);
+    assert_non_null(notif);
+
+    /* send notif */
+    nc_session_set_notif_status(server_session, 1);
+    msg_type = nc_server_notif_send(server_session, notif, 100);
+    nc_server_notif_free(notif);
+    assert_int_equal(msg_type, NC_MSG_NOTIF);
+
+    /* update state */
+    glob_state = 2;
+
+    return NULL;
+}
+
 static void
 test_send_recv_notif(void)
 {
+    int ret;
+    pthread_t tid;
+    uint64_t msgid;
+    NC_MSG_TYPE msgtype;
+    struct nc_rpc *rpc;
+    struct nc_reply *reply;
+    struct nc_pollsession *ps;
 
-}*/
+    /* client RPC */
+    rpc = nc_rpc_commit(0, 0, NULL, NULL, 0);
+    assert_non_null(rpc);
+
+    msgtype = nc_send_rpc(client_session, rpc, 0, &msgid);
+    assert_int_equal(msgtype, NC_MSG_RPC);
+
+    /* client subscription */
+    ret = nc_recv_notif_dispatch(client_session, test_notif_clb);
+    assert_int_equal(ret, 0);
+
+    /* create server */
+    ps = nc_ps_new();
+    assert_non_null(ps);
+    nc_ps_add_session(ps, server_session);
+
+    /* server will send a notification */
+    glob_state = 0;
+    ret = pthread_create(&tid, NULL, server_send_notif_thread, NULL);
+    assert_int_equal(ret, 0);
+
+    /* server blocked on RPC */
+    ret = nc_ps_poll(ps, 0, NULL);
+    assert_int_equal(ret, NC_PSPOLL_RPC);
+
+    /* RPC, notification finished fine */
+    assert_int_equal(glob_state, 3);
+
+    /* server finished */
+    ret = pthread_join(tid, NULL);
+    assert_int_equal(ret, 0);
+    nc_ps_free(ps);
+
+    /* client reply */
+    msgtype = nc_recv_reply(client_session, rpc, msgid, 0, 0, &reply);
+    assert_int_equal(msgtype, NC_MSG_REPLY);
+
+    nc_rpc_free(rpc);
+    assert_int_equal(reply->type, NC_RPL_OK);
+    nc_reply_free(reply);
+}
+
+static void
+test_send_recv_notif_10(void **state)
+{
+    (void)state;
+
+    server_session->version = NC_VERSION_10;
+    client_session->version = NC_VERSION_10;
+
+    test_send_recv_notif();
+}
+
+static void
+test_send_recv_notif_11(void **state)
+{
+    (void)state;
+
+    server_session->version = NC_VERSION_11;
+    client_session->version = NC_VERSION_11;
+
+    test_send_recv_notif();
+}
 
 int
 main(void)
@@ -336,6 +487,11 @@
 
     module = ly_ctx_load_module(ctx, "ietf-netconf", NULL);
     assert_non_null(module);
+    ret = lys_features_enable(module, "candidate");
+    assert_int_equal(ret, 0);
+
+    module = ly_ctx_load_module(ctx, "nc-notifications", NULL);
+    assert_non_null(module);
 
     /* set RPC callbacks */
     node = ly_ctx_get_node(module->ctx, NULL, "/ietf-netconf:get", 0);
@@ -346,15 +502,21 @@
     assert_non_null(node);
     lys_set_private(node, my_getconfig_rpc_clb);
 
+    node = ly_ctx_get_node(module->ctx, NULL, "/ietf-netconf:commit", 0);
+    assert_non_null(node);
+    lys_set_private(node, my_commit_rpc_clb);
+
     nc_server_init(ctx);
 
     const struct CMUnitTest comm[] = {
         cmocka_unit_test_setup_teardown(test_send_recv_ok_10, setup_sessions, teardown_sessions),
         cmocka_unit_test_setup_teardown(test_send_recv_error_10, setup_sessions, teardown_sessions),
         cmocka_unit_test_setup_teardown(test_send_recv_data_10, setup_sessions, teardown_sessions),
+        cmocka_unit_test_setup_teardown(test_send_recv_notif_10, setup_sessions, teardown_sessions),
         cmocka_unit_test_setup_teardown(test_send_recv_ok_11, setup_sessions, teardown_sessions),
         cmocka_unit_test_setup_teardown(test_send_recv_error_11, setup_sessions, teardown_sessions),
-        cmocka_unit_test_setup_teardown(test_send_recv_data_11, setup_sessions, teardown_sessions)
+        cmocka_unit_test_setup_teardown(test_send_recv_data_11, setup_sessions, teardown_sessions),
+        cmocka_unit_test_setup_teardown(test_send_recv_notif_11, setup_sessions, teardown_sessions),
     };
 
     ret = cmocka_run_group_tests(comm, NULL, NULL);
diff --git a/tests/test_io.c b/tests/test_io.c
index 28d7890..78a512f 100644
--- a/tests/test_io.c
+++ b/tests/test_io.c
@@ -62,12 +62,8 @@
     w->session->version = NC_VERSION_10;
     w->session->opts.client.msgid = 999;
     w->session->ti_type = NC_TI_FD;
-    w->session->ti_lock = malloc(sizeof *w->session->ti_lock);
-    pthread_mutex_init(w->session->ti_lock, NULL);
-    w->session->ti_cond = malloc(sizeof *w->session->ti_cond);
-    pthread_cond_init(w->session->ti_cond, NULL);
-    w->session->ti_inuse = malloc(sizeof *w->session->ti_inuse);
-    *w->session->ti_inuse = 0;
+    w->session->io_lock = malloc(sizeof *w->session->io_lock);
+    pthread_mutex_init(w->session->io_lock, NULL);
     w->session->ti.fd.in = STDIN_FILENO;
     w->session->ti.fd.out = STDOUT_FILENO;
 
@@ -141,6 +137,12 @@
     NC_MSG_TYPE type;
 
     w->session->side = NC_SERVER;
+    w->session->opts.server.rpc_lock = malloc(sizeof *w->session->opts.server.rpc_lock);
+    pthread_mutex_init(w->session->opts.server.rpc_lock, NULL);
+    w->session->opts.server.rpc_cond = malloc(sizeof *w->session->opts.server.rpc_cond);
+    pthread_cond_init(w->session->opts.server.rpc_cond, NULL);
+    w->session->opts.server.rpc_inuse = malloc(sizeof *w->session->opts.server.rpc_inuse);
+    *w->session->opts.server.rpc_inuse = 0;
 
     do {
         type = nc_send_rpc(w->session, w->rpc, 1000, &msgid);