CHANGE allow concurrent notifications and RPCs

Fixes cesnet/netopeer2#256
diff --git a/src/io.c b/src/io.c
index 5d2a5e4..5092fdf 100644
--- a/src/io.c
+++ b/src/io.c
@@ -262,11 +262,11 @@
     return count;
 }
 
-/* return NC_MSG_ERROR can change session status */
+/* returning NC_MSG_ERROR may change session status; acquires the IO lock unless the caller passes it (passing_io_lock) */
 NC_MSG_TYPE
-nc_read_msg(struct nc_session *session, struct lyxml_elem **data)
+nc_read_msg_io(struct nc_session *session, int io_timeout, struct lyxml_elem **data, int passing_io_lock)
 {
-    int ret;
+    int ret, io_locked = passing_io_lock;
     char *msg = NULL, *chunk;
     uint64_t chunk_len, len = 0;
     /* use timeout in milliseconds instead seconds */
@@ -279,18 +279,33 @@
 
     if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
         ERR("Session %u: invalid session to read from.", session->id);
-        return NC_MSG_ERROR;
+        ret = NC_MSG_ERROR;
+        goto cleanup;
     }
 
     nc_gettimespec_mono(&ts_act_timeout);
     nc_addtimespec(&ts_act_timeout, NC_READ_ACT_TIMEOUT * 1000);
 
+    if (!io_locked) {
+        /* SESSION IO LOCK */
+        ret = nc_session_io_lock(session, io_timeout, __func__);
+        if (ret < 0) {
+            ret = NC_MSG_ERROR;
+            goto cleanup;
+        } else if (!ret) {
+            ret = NC_MSG_WOULDBLOCK;
+            goto cleanup;
+        }
+        io_locked = 1;
+    }
+
     /* read the message */
     switch (session->version) {
     case NC_VERSION_10:
         ret = nc_read_until(session, NC_VERSION_10_ENDTAG, 0, inact_timeout, &ts_act_timeout, &msg);
         if (ret == -1) {
-            goto error;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
 
         /* cut off the end tag */
@@ -300,11 +315,13 @@
         while (1) {
             ret = nc_read_until(session, "\n#", 0, inact_timeout, &ts_act_timeout, NULL);
             if (ret == -1) {
-                goto error;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
             ret = nc_read_until(session, "\n", 0, inact_timeout, &ts_act_timeout, &chunk);
             if (ret == -1) {
-                goto error;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
 
             if (!strcmp(chunk, "#\n")) {
@@ -328,14 +345,16 @@
             /* now we have size of next chunk, so read the chunk */
             ret = nc_read_chunk(session, chunk_len, inact_timeout, &ts_act_timeout, &chunk);
             if (ret == -1) {
-                goto error;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
 
             /* realloc message buffer, remember to count terminating null byte */
             msg = realloc(msg, len + chunk_len + 1);
             if (!msg) {
                 ERRMEM;
-                goto error;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
             memcpy(msg + len, chunk, chunk_len);
             len += chunk_len;
@@ -345,6 +364,12 @@
 
         break;
     }
+
+    /* SESSION IO UNLOCK */
+    assert(io_locked);
+    nc_session_io_unlock(session, __func__);
+    io_locked = 0;
+
     DBG("Session %u: received message:\n%s\n", session->id, msg);
 
     /* build XML tree */
@@ -388,7 +413,7 @@
         /* NETCONF version 1.1 defines sending error reply from the server (RFC 6241 sec. 3) */
         reply = nc_server_reply_err(nc_err(NC_ERR_MALFORMED_MSG));
 
-        if (nc_write_msg(session, NC_MSG_REPLY, NULL, reply) == -1) {
+        if (nc_write_msg_io(session, io_timeout, NC_MSG_REPLY, NULL, reply) != NC_MSG_REPLY) {
             ERR("Session %u: unable to send a \"Malformed message\" error reply, terminating session.", session->id);
             if (session->status != NC_STATUS_INVALID) {
                 session->status = NC_STATUS_INVALID;
@@ -397,19 +422,22 @@
         }
         nc_server_reply_free(reply);
     }
+    ret = NC_MSG_ERROR;
 
-error:
-    /* cleanup */
+cleanup:
+    if (io_locked) {
+        nc_session_io_unlock(session, __func__);
+    }
     free(msg);
     free(*data);
     *data = NULL;
 
-    return NC_MSG_ERROR;
+    return ret;
 }
 
 /* return -1 means either poll error or that session was invalidated (socket error), EINTR is handled inside */
 static int
-nc_read_poll(struct nc_session *session, int timeout)
+nc_read_poll(struct nc_session *session, int io_timeout)
 {
     sigset_t sigmask, origmask;
     int ret = -2;
@@ -424,7 +452,7 @@
 #ifdef NC_ENABLED_SSH
     case NC_TI_LIBSSH:
         /* EINTR is handled, it resumes waiting */
-        ret = ssh_channel_poll_timeout(session->ti.libssh.channel, timeout, 0);
+        ret = ssh_channel_poll_timeout(session->ti.libssh.channel, io_timeout, 0);
         if (ret == SSH_ERROR) {
             ERR("Session %u: SSH channel poll error (%s).", session->id,
                 ssh_get_error(session->ti.libssh.session));
@@ -468,7 +496,7 @@
 
         sigfillset(&sigmask);
         pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
-        ret = poll(&fds, 1, timeout);
+        ret = poll(&fds, 1, io_timeout);
         pthread_sigmask(SIG_SETMASK, &origmask, NULL);
 
         break;
@@ -504,9 +532,9 @@
     return ret;
 }
 
-/* return NC_MSG_ERROR can change session status */
+/* returning NC_MSG_ERROR may change session status; acquires the IO lock and passes it down to nc_read_msg_io() */
 NC_MSG_TYPE
-nc_read_msg_poll(struct nc_session *session, int timeout, struct lyxml_elem **data)
+nc_read_msg_poll_io(struct nc_session *session, int io_timeout, struct lyxml_elem **data)
 {
     int ret;
 
@@ -518,16 +546,31 @@
         return NC_MSG_ERROR;
     }
 
-    ret = nc_read_poll(session, timeout);
+    /* SESSION IO LOCK */
+    ret = nc_session_io_lock(session, io_timeout, __func__);
+    if (ret < 0) {
+        return NC_MSG_ERROR;
+    } else if (!ret) {
+        return NC_MSG_WOULDBLOCK;
+    }
+
+    ret = nc_read_poll(session, io_timeout);
     if (ret == 0) {
         /* timed out */
+
+        /* SESSION IO UNLOCK */
+        nc_session_io_unlock(session, __func__);
         return NC_MSG_WOULDBLOCK;
     } else if (ret < 0) {
         /* poll error, error written */
+
+        /* SESSION IO UNLOCK */
+        nc_session_io_unlock(session, __func__);
         return NC_MSG_ERROR;
     }
 
-    return nc_read_msg(session, data);
+    /* SESSION IO LOCK passed down */
+    return nc_read_msg_io(session, io_timeout, data, 1);
 }
 
 /* does not really log, only fatal errors */
@@ -1001,12 +1044,12 @@
     nc_write_error_elem(arg, "rpc-error", 9, prefix, pref_len, 0, 0);
 }
 
-/* return -1 can change session status */
-int
-nc_write_msg(struct nc_session *session, int type, ...)
+/* returning NC_MSG_ERROR may change session status; holds the IO lock while writing, returns the message type on success */
+NC_MSG_TYPE
+nc_write_msg_io(struct nc_session *session, int io_timeout, int type, ...)
 {
     va_list ap;
-    int count;
+    int count, ret;
     const char *attrs, *base_prefix;
     struct lyd_node *content;
     struct lyxml_elem *rpc_elem;
@@ -1023,14 +1066,21 @@
 
     if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
         ERR("Session %u: invalid session to write to.", session->id);
-        return -1;
+        return NC_MSG_ERROR;
     }
 
-    va_start(ap, type);
-
     arg.session = session;
     arg.len = 0;
 
+    /* SESSION IO LOCK */
+    ret = nc_session_io_lock(session, io_timeout, __func__);
+    if (ret < 0) {
+        return NC_MSG_ERROR;
+    } else if (!ret) {
+        return NC_MSG_WOULDBLOCK;
+    }
+
+    va_start(ap, type);
 
     switch (type) {
     case NC_MSG_RPC:
@@ -1041,15 +1091,15 @@
                          NC_NS_BASE, session->opts.client.msgid + 1, attrs ? attrs : "");
         if (count == -1) {
             ERRMEM;
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         nc_write_clb((void *)&arg, buf, count, 0);
         free(buf);
 
         if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, content, LYD_XML, LYP_WITHSIBLINGS | LYP_NETCONF)) {
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         nc_write_clb((void *)&arg, "</rpc>", 6, 0);
 
@@ -1106,8 +1156,8 @@
             }
             if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, ((struct nc_reply_data *)reply)->data, LYD_XML,
                               LYP_WITHSIBLINGS | LYP_NETCONF | wd)) {
-                va_end(ap);
-                return -1;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
             break;
         case NC_RPL_ERROR:
@@ -1119,8 +1169,8 @@
         default:
             ERRINT;
             nc_write_clb((void *)&arg, NULL, 0, 0);
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         if (rpc_elem && rpc_elem->ns && rpc_elem->ns->prefix) {
             nc_write_clb((void *)&arg, "</", 2, 0);
@@ -1140,16 +1190,16 @@
         nc_write_clb((void *)&arg, notif->eventtime, strlen(notif->eventtime), 0);
         nc_write_clb((void *)&arg, "</eventTime>", 12, 0);
         if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, notif->tree, LYD_XML, 0)) {
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         nc_write_clb((void *)&arg, "</notification>", 15, 0);
         break;
 
     case NC_MSG_HELLO:
         if (session->version != NC_VERSION_10) {
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         capabilities = va_arg(ap, const char **);
         sid = va_arg(ap, uint32_t*);
@@ -1157,8 +1207,8 @@
         count = asprintf(&buf, "<hello xmlns=\"%s\"><capabilities>", NC_NS_BASE);
         if (count == -1) {
             ERRMEM;
-            va_end(ap);
-            return -1;
+            ret = NC_MSG_ERROR;
+            goto cleanup;
         }
         nc_write_clb((void *)&arg, buf, count, 0);
         free(buf);
@@ -1171,8 +1221,8 @@
             count = asprintf(&buf, "</capabilities><session-id>%u</session-id></hello>", *sid);
             if (count == -1) {
                 ERRMEM;
-                va_end(ap);
-                return -1;
+                ret = NC_MSG_ERROR;
+                goto cleanup;
             }
             nc_write_clb((void *)&arg, buf, count, 0);
             free(buf);
@@ -1182,20 +1232,25 @@
         break;
 
     default:
-        va_end(ap);
-        return -1;
+        ret = NC_MSG_ERROR;
+        goto cleanup;
     }
 
     /* flush message */
     nc_write_clb((void *)&arg, NULL, 0, 0);
 
-    va_end(ap);
     if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
         /* error was already written */
-        return -1;
+        ret = NC_MSG_ERROR;
+    } else {
+        /* specific message successfully sent */
+        ret = type;
     }
 
-    return 0;
+cleanup:
+    va_end(ap);
+    nc_session_io_unlock(session, __func__);
+    return ret;
 }
 
 void *