CHANGE allow concurrent notifications and RPCs
Fixes cesnet/netopeer2#256
diff --git a/src/io.c b/src/io.c
index 5d2a5e4..5092fdf 100644
--- a/src/io.c
+++ b/src/io.c
@@ -262,11 +262,11 @@
return count;
}
-/* return NC_MSG_ERROR can change session status */
+/* return NC_MSG_ERROR can change session status, acquires IO lock as needed */
NC_MSG_TYPE
-nc_read_msg(struct nc_session *session, struct lyxml_elem **data)
+nc_read_msg_io(struct nc_session *session, int io_timeout, struct lyxml_elem **data, int passing_io_lock)
{
- int ret;
+ int ret, io_locked = passing_io_lock;
char *msg = NULL, *chunk;
uint64_t chunk_len, len = 0;
/* use timeout in milliseconds instead seconds */
@@ -279,18 +279,33 @@
if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
ERR("Session %u: invalid session to read from.", session->id);
- return NC_MSG_ERROR;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_gettimespec_mono(&ts_act_timeout);
nc_addtimespec(&ts_act_timeout, NC_READ_ACT_TIMEOUT * 1000);
+ if (!io_locked) {
+ /* SESSION IO LOCK */
+ ret = nc_session_io_lock(session, io_timeout, __func__);
+ if (ret < 0) {
+ ret = NC_MSG_ERROR;
+ goto cleanup;
+ } else if (!ret) {
+ ret = NC_MSG_WOULDBLOCK;
+ goto cleanup;
+ }
+ io_locked = 1;
+ }
+
/* read the message */
switch (session->version) {
case NC_VERSION_10:
ret = nc_read_until(session, NC_VERSION_10_ENDTAG, 0, inact_timeout, &ts_act_timeout, &msg);
if (ret == -1) {
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
/* cut off the end tag */
@@ -300,11 +315,13 @@
while (1) {
ret = nc_read_until(session, "\n#", 0, inact_timeout, &ts_act_timeout, NULL);
if (ret == -1) {
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
ret = nc_read_until(session, "\n", 0, inact_timeout, &ts_act_timeout, &chunk);
if (ret == -1) {
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
if (!strcmp(chunk, "#\n")) {
@@ -328,14 +345,16 @@
/* now we have size of next chunk, so read the chunk */
ret = nc_read_chunk(session, chunk_len, inact_timeout, &ts_act_timeout, &chunk);
if (ret == -1) {
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
/* realloc message buffer, remember to count terminating null byte */
msg = realloc(msg, len + chunk_len + 1);
if (!msg) {
ERRMEM;
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
memcpy(msg + len, chunk, chunk_len);
len += chunk_len;
@@ -345,6 +364,12 @@
break;
}
+
+ /* SESSION IO UNLOCK */
+ assert(io_locked);
+ nc_session_io_unlock(session, __func__);
+ io_locked = 0;
+
DBG("Session %u: received message:\n%s\n", session->id, msg);
/* build XML tree */
@@ -388,7 +413,7 @@
/* NETCONF version 1.1 defines sending error reply from the server (RFC 6241 sec. 3) */
reply = nc_server_reply_err(nc_err(NC_ERR_MALFORMED_MSG));
- if (nc_write_msg(session, NC_MSG_REPLY, NULL, reply) == -1) {
+ if (nc_write_msg_io(session, io_timeout, NC_MSG_REPLY, NULL, reply) != NC_MSG_REPLY) {
ERR("Session %u: unable to send a \"Malformed message\" error reply, terminating session.", session->id);
if (session->status != NC_STATUS_INVALID) {
session->status = NC_STATUS_INVALID;
@@ -397,19 +422,22 @@
}
nc_server_reply_free(reply);
}
+ ret = NC_MSG_ERROR;
-error:
- /* cleanup */
+cleanup:
+ if (io_locked) {
+ nc_session_io_unlock(session, __func__);
+ }
free(msg);
free(*data);
*data = NULL;
- return NC_MSG_ERROR;
+ return ret;
}
/* return -1 means either poll error or that session was invalidated (socket error), EINTR is handled inside */
static int
-nc_read_poll(struct nc_session *session, int timeout)
+nc_read_poll(struct nc_session *session, int io_timeout)
{
sigset_t sigmask, origmask;
int ret = -2;
@@ -424,7 +452,7 @@
#ifdef NC_ENABLED_SSH
case NC_TI_LIBSSH:
/* EINTR is handled, it resumes waiting */
- ret = ssh_channel_poll_timeout(session->ti.libssh.channel, timeout, 0);
+ ret = ssh_channel_poll_timeout(session->ti.libssh.channel, io_timeout, 0);
if (ret == SSH_ERROR) {
ERR("Session %u: SSH channel poll error (%s).", session->id,
ssh_get_error(session->ti.libssh.session));
@@ -468,7 +496,7 @@
sigfillset(&sigmask);
pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
- ret = poll(&fds, 1, timeout);
+ ret = poll(&fds, 1, io_timeout);
pthread_sigmask(SIG_SETMASK, &origmask, NULL);
break;
@@ -504,9 +532,9 @@
return ret;
}
-/* return NC_MSG_ERROR can change session status */
+/* return NC_MSG_ERROR can change session status, acquires IO lock as needed */
NC_MSG_TYPE
-nc_read_msg_poll(struct nc_session *session, int timeout, struct lyxml_elem **data)
+nc_read_msg_poll_io(struct nc_session *session, int io_timeout, struct lyxml_elem **data)
{
int ret;
@@ -518,16 +546,31 @@
return NC_MSG_ERROR;
}
- ret = nc_read_poll(session, timeout);
+ /* SESSION IO LOCK */
+ ret = nc_session_io_lock(session, io_timeout, __func__);
+ if (ret < 0) {
+ return NC_MSG_ERROR;
+ } else if (!ret) {
+ return NC_MSG_WOULDBLOCK;
+ }
+
+ ret = nc_read_poll(session, io_timeout);
if (ret == 0) {
/* timed out */
+
+ /* SESSION IO UNLOCK */
+ nc_session_io_unlock(session, __func__);
return NC_MSG_WOULDBLOCK;
} else if (ret < 0) {
/* poll error, error written */
+
+ /* SESSION IO UNLOCK */
+ nc_session_io_unlock(session, __func__);
return NC_MSG_ERROR;
}
- return nc_read_msg(session, data);
+ /* SESSION IO LOCK passed down */
+ return nc_read_msg_io(session, io_timeout, data, 1);
}
/* does not really log, only fatal errors */
@@ -1001,12 +1044,12 @@
nc_write_error_elem(arg, "rpc-error", 9, prefix, pref_len, 0, 0);
}
-/* return -1 can change session status */
-int
-nc_write_msg(struct nc_session *session, int type, ...)
+/* return NC_MSG_ERROR can change session status, acquires IO lock as needed */
+NC_MSG_TYPE
+nc_write_msg_io(struct nc_session *session, int io_timeout, int type, ...)
{
va_list ap;
- int count;
+ int count, ret;
const char *attrs, *base_prefix;
struct lyd_node *content;
struct lyxml_elem *rpc_elem;
@@ -1023,14 +1066,21 @@
if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
ERR("Session %u: invalid session to write to.", session->id);
- return -1;
+ return NC_MSG_ERROR;
}
- va_start(ap, type);
-
arg.session = session;
arg.len = 0;
+ /* SESSION IO LOCK */
+ ret = nc_session_io_lock(session, io_timeout, __func__);
+ if (ret < 0) {
+ return NC_MSG_ERROR;
+ } else if (!ret) {
+ return NC_MSG_WOULDBLOCK;
+ }
+
+ va_start(ap, type);
switch (type) {
case NC_MSG_RPC:
@@ -1041,15 +1091,15 @@
NC_NS_BASE, session->opts.client.msgid + 1, attrs ? attrs : "");
if (count == -1) {
ERRMEM;
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, buf, count, 0);
free(buf);
if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, content, LYD_XML, LYP_WITHSIBLINGS | LYP_NETCONF)) {
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, "</rpc>", 6, 0);
@@ -1106,8 +1156,8 @@
}
if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, ((struct nc_reply_data *)reply)->data, LYD_XML,
LYP_WITHSIBLINGS | LYP_NETCONF | wd)) {
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
break;
case NC_RPL_ERROR:
@@ -1119,8 +1169,8 @@
default:
ERRINT;
nc_write_clb((void *)&arg, NULL, 0, 0);
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
if (rpc_elem && rpc_elem->ns && rpc_elem->ns->prefix) {
nc_write_clb((void *)&arg, "</", 2, 0);
@@ -1140,16 +1190,16 @@
nc_write_clb((void *)&arg, notif->eventtime, strlen(notif->eventtime), 0);
nc_write_clb((void *)&arg, "</eventTime>", 12, 0);
if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, notif->tree, LYD_XML, 0)) {
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, "</notification>", 15, 0);
break;
case NC_MSG_HELLO:
if (session->version != NC_VERSION_10) {
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
capabilities = va_arg(ap, const char **);
sid = va_arg(ap, uint32_t*);
@@ -1157,8 +1207,8 @@
count = asprintf(&buf, "<hello xmlns=\"%s\"><capabilities>", NC_NS_BASE);
if (count == -1) {
ERRMEM;
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, buf, count, 0);
free(buf);
@@ -1171,8 +1221,8 @@
count = asprintf(&buf, "</capabilities><session-id>%u</session-id></hello>", *sid);
if (count == -1) {
ERRMEM;
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, buf, count, 0);
free(buf);
@@ -1182,20 +1232,25 @@
break;
default:
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
/* flush message */
nc_write_clb((void *)&arg, NULL, 0, 0);
- va_end(ap);
if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
/* error was already written */
- return -1;
+ ret = NC_MSG_ERROR;
+ } else {
+ /* specific message successfully sent */
+ ret = type;
}
- return 0;
+cleanup:
+ va_end(ap);
+ nc_session_io_unlock(session, __func__);
+ return ret;
}
void *