CHANGE allow concurrent notifications and RPCs
Fixes cesnet/netopeer2#256
diff --git a/src/io.c b/src/io.c
index 5d2a5e4..5092fdf 100644
--- a/src/io.c
+++ b/src/io.c
@@ -262,11 +262,11 @@
return count;
}
-/* return NC_MSG_ERROR can change session status */
+/* return NC_MSG_ERROR can change session status, acquires IO lock as needed */
NC_MSG_TYPE
-nc_read_msg(struct nc_session *session, struct lyxml_elem **data)
+nc_read_msg_io(struct nc_session *session, int io_timeout, struct lyxml_elem **data, int passing_io_lock)
{
- int ret;
+ int ret, io_locked = passing_io_lock;
char *msg = NULL, *chunk;
uint64_t chunk_len, len = 0;
/* use timeout in milliseconds instead seconds */
@@ -279,18 +279,33 @@
if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
ERR("Session %u: invalid session to read from.", session->id);
- return NC_MSG_ERROR;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_gettimespec_mono(&ts_act_timeout);
nc_addtimespec(&ts_act_timeout, NC_READ_ACT_TIMEOUT * 1000);
+ if (!io_locked) {
+ /* SESSION IO LOCK */
+ ret = nc_session_io_lock(session, io_timeout, __func__);
+ if (ret < 0) {
+ ret = NC_MSG_ERROR;
+ goto cleanup;
+ } else if (!ret) {
+ ret = NC_MSG_WOULDBLOCK;
+ goto cleanup;
+ }
+ io_locked = 1;
+ }
+
/* read the message */
switch (session->version) {
case NC_VERSION_10:
ret = nc_read_until(session, NC_VERSION_10_ENDTAG, 0, inact_timeout, &ts_act_timeout, &msg);
if (ret == -1) {
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
/* cut off the end tag */
@@ -300,11 +315,13 @@
while (1) {
ret = nc_read_until(session, "\n#", 0, inact_timeout, &ts_act_timeout, NULL);
if (ret == -1) {
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
ret = nc_read_until(session, "\n", 0, inact_timeout, &ts_act_timeout, &chunk);
if (ret == -1) {
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
if (!strcmp(chunk, "#\n")) {
@@ -328,14 +345,16 @@
/* now we have size of next chunk, so read the chunk */
ret = nc_read_chunk(session, chunk_len, inact_timeout, &ts_act_timeout, &chunk);
if (ret == -1) {
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
/* realloc message buffer, remember to count terminating null byte */
msg = realloc(msg, len + chunk_len + 1);
if (!msg) {
ERRMEM;
- goto error;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
memcpy(msg + len, chunk, chunk_len);
len += chunk_len;
@@ -345,6 +364,12 @@
break;
}
+
+ /* SESSION IO UNLOCK */
+ assert(io_locked);
+ nc_session_io_unlock(session, __func__);
+ io_locked = 0;
+
DBG("Session %u: received message:\n%s\n", session->id, msg);
/* build XML tree */
@@ -388,7 +413,7 @@
/* NETCONF version 1.1 defines sending error reply from the server (RFC 6241 sec. 3) */
reply = nc_server_reply_err(nc_err(NC_ERR_MALFORMED_MSG));
- if (nc_write_msg(session, NC_MSG_REPLY, NULL, reply) == -1) {
+ if (nc_write_msg_io(session, io_timeout, NC_MSG_REPLY, NULL, reply) != NC_MSG_REPLY) {
ERR("Session %u: unable to send a \"Malformed message\" error reply, terminating session.", session->id);
if (session->status != NC_STATUS_INVALID) {
session->status = NC_STATUS_INVALID;
@@ -397,19 +422,22 @@
}
nc_server_reply_free(reply);
}
+ ret = NC_MSG_ERROR;
-error:
- /* cleanup */
+cleanup:
+ if (io_locked) {
+ nc_session_io_unlock(session, __func__);
+ }
free(msg);
free(*data);
*data = NULL;
- return NC_MSG_ERROR;
+ return ret;
}
/* return -1 means either poll error or that session was invalidated (socket error), EINTR is handled inside */
static int
-nc_read_poll(struct nc_session *session, int timeout)
+nc_read_poll(struct nc_session *session, int io_timeout)
{
sigset_t sigmask, origmask;
int ret = -2;
@@ -424,7 +452,7 @@
#ifdef NC_ENABLED_SSH
case NC_TI_LIBSSH:
/* EINTR is handled, it resumes waiting */
- ret = ssh_channel_poll_timeout(session->ti.libssh.channel, timeout, 0);
+ ret = ssh_channel_poll_timeout(session->ti.libssh.channel, io_timeout, 0);
if (ret == SSH_ERROR) {
ERR("Session %u: SSH channel poll error (%s).", session->id,
ssh_get_error(session->ti.libssh.session));
@@ -468,7 +496,7 @@
sigfillset(&sigmask);
pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
- ret = poll(&fds, 1, timeout);
+ ret = poll(&fds, 1, io_timeout);
pthread_sigmask(SIG_SETMASK, &origmask, NULL);
break;
@@ -504,9 +532,9 @@
return ret;
}
-/* return NC_MSG_ERROR can change session status */
+/* return NC_MSG_ERROR can change session status, acquires IO lock as needed */
NC_MSG_TYPE
-nc_read_msg_poll(struct nc_session *session, int timeout, struct lyxml_elem **data)
+nc_read_msg_poll_io(struct nc_session *session, int io_timeout, struct lyxml_elem **data)
{
int ret;
@@ -518,16 +546,31 @@
return NC_MSG_ERROR;
}
- ret = nc_read_poll(session, timeout);
+ /* SESSION IO LOCK */
+ ret = nc_session_io_lock(session, io_timeout, __func__);
+ if (ret < 0) {
+ return NC_MSG_ERROR;
+ } else if (!ret) {
+ return NC_MSG_WOULDBLOCK;
+ }
+
+ ret = nc_read_poll(session, io_timeout);
if (ret == 0) {
/* timed out */
+
+ /* SESSION IO UNLOCK */
+ nc_session_io_unlock(session, __func__);
return NC_MSG_WOULDBLOCK;
} else if (ret < 0) {
/* poll error, error written */
+
+ /* SESSION IO UNLOCK */
+ nc_session_io_unlock(session, __func__);
return NC_MSG_ERROR;
}
- return nc_read_msg(session, data);
+ /* SESSION IO LOCK passed down */
+ return nc_read_msg_io(session, io_timeout, data, 1);
}
/* does not really log, only fatal errors */
@@ -1001,12 +1044,12 @@
nc_write_error_elem(arg, "rpc-error", 9, prefix, pref_len, 0, 0);
}
-/* return -1 can change session status */
-int
-nc_write_msg(struct nc_session *session, int type, ...)
+/* return NC_MSG_ERROR can change session status, acquires IO lock as needed */
+NC_MSG_TYPE
+nc_write_msg_io(struct nc_session *session, int io_timeout, int type, ...)
{
va_list ap;
- int count;
+ int count, ret;
const char *attrs, *base_prefix;
struct lyd_node *content;
struct lyxml_elem *rpc_elem;
@@ -1023,14 +1066,21 @@
if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
ERR("Session %u: invalid session to write to.", session->id);
- return -1;
+ return NC_MSG_ERROR;
}
- va_start(ap, type);
-
arg.session = session;
arg.len = 0;
+ /* SESSION IO LOCK */
+ ret = nc_session_io_lock(session, io_timeout, __func__);
+ if (ret < 0) {
+ return NC_MSG_ERROR;
+ } else if (!ret) {
+ return NC_MSG_WOULDBLOCK;
+ }
+
+ va_start(ap, type);
switch (type) {
case NC_MSG_RPC:
@@ -1041,15 +1091,15 @@
NC_NS_BASE, session->opts.client.msgid + 1, attrs ? attrs : "");
if (count == -1) {
ERRMEM;
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, buf, count, 0);
free(buf);
if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, content, LYD_XML, LYP_WITHSIBLINGS | LYP_NETCONF)) {
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, "</rpc>", 6, 0);
@@ -1106,8 +1156,8 @@
}
if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, ((struct nc_reply_data *)reply)->data, LYD_XML,
LYP_WITHSIBLINGS | LYP_NETCONF | wd)) {
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
break;
case NC_RPL_ERROR:
@@ -1119,8 +1169,8 @@
default:
ERRINT;
nc_write_clb((void *)&arg, NULL, 0, 0);
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
if (rpc_elem && rpc_elem->ns && rpc_elem->ns->prefix) {
nc_write_clb((void *)&arg, "</", 2, 0);
@@ -1140,16 +1190,16 @@
nc_write_clb((void *)&arg, notif->eventtime, strlen(notif->eventtime), 0);
nc_write_clb((void *)&arg, "</eventTime>", 12, 0);
if (lyd_print_clb(nc_write_xmlclb, (void *)&arg, notif->tree, LYD_XML, 0)) {
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, "</notification>", 15, 0);
break;
case NC_MSG_HELLO:
if (session->version != NC_VERSION_10) {
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
capabilities = va_arg(ap, const char **);
sid = va_arg(ap, uint32_t*);
@@ -1157,8 +1207,8 @@
count = asprintf(&buf, "<hello xmlns=\"%s\"><capabilities>", NC_NS_BASE);
if (count == -1) {
ERRMEM;
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, buf, count, 0);
free(buf);
@@ -1171,8 +1221,8 @@
count = asprintf(&buf, "</capabilities><session-id>%u</session-id></hello>", *sid);
if (count == -1) {
ERRMEM;
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
nc_write_clb((void *)&arg, buf, count, 0);
free(buf);
@@ -1182,20 +1232,25 @@
break;
default:
- va_end(ap);
- return -1;
+ ret = NC_MSG_ERROR;
+ goto cleanup;
}
/* flush message */
nc_write_clb((void *)&arg, NULL, 0, 0);
- va_end(ap);
if ((session->status != NC_STATUS_RUNNING) && (session->status != NC_STATUS_STARTING)) {
/* error was already written */
- return -1;
+ ret = NC_MSG_ERROR;
+ } else {
+ /* specific message successfully sent */
+ ret = type;
}
- return 0;
+cleanup:
+ va_end(ap);
+ nc_session_io_unlock(session, __func__);
+ return ret;
}
void *
diff --git a/src/session.c b/src/session.c
index fae2988..098ae2b 100644
--- a/src/session.c
+++ b/src/session.c
@@ -145,7 +145,7 @@
#endif
struct nc_session *
-nc_new_session(int not_allocate_ti)
+nc_new_session(NC_SIDE side, int shared_ti)
{
struct nc_session *sess;
@@ -154,20 +154,38 @@
return NULL;
}
- if (!not_allocate_ti) {
- sess->ti_lock = malloc(sizeof *sess->ti_lock);
- sess->ti_cond = malloc(sizeof *sess->ti_cond);
- sess->ti_inuse = malloc(sizeof *sess->ti_inuse);
- if (!sess->ti_lock || !sess->ti_cond || !sess->ti_inuse) {
- free(sess->ti_lock);
- free(sess->ti_cond);
- free((int *)sess->ti_inuse);
- free(sess);
- return NULL;
+ sess->side = side;
+
+ if (side == NC_SERVER) {
+ sess->opts.server.rpc_lock = malloc(sizeof *sess->opts.server.rpc_lock);
+ sess->opts.server.rpc_cond = malloc(sizeof *sess->opts.server.rpc_cond);
+ sess->opts.server.rpc_inuse = malloc(sizeof *sess->opts.server.rpc_inuse);
+ if (!sess->opts.server.rpc_lock || !sess->opts.server.rpc_cond || !sess->opts.server.rpc_inuse) {
+ goto error;
}
+ pthread_mutex_init(sess->opts.server.rpc_lock, NULL);
+ pthread_cond_init(sess->opts.server.rpc_cond, NULL);
+ *sess->opts.server.rpc_inuse = 0;
+ }
+
+ if (!shared_ti) {
+ sess->io_lock = malloc(sizeof *sess->io_lock);
+ if (!sess->io_lock) {
+ goto error;
+ }
+ pthread_mutex_init(sess->io_lock, NULL);
}
return sess;
+
+error:
+ if (side == NC_SERVER) {
+ free(sess->opts.server.rpc_lock);
+ free(sess->opts.server.rpc_cond);
+ free((int *)sess->opts.server.rpc_inuse);
+ }
+ free(sess);
+ return NULL;
}
/*
@@ -176,49 +194,54 @@
* -1 - error
*/
int
-nc_session_lock(struct nc_session *session, int timeout, const char *func)
+nc_session_rpc_lock(struct nc_session *session, int timeout, const char *func)
{
int ret;
struct timespec ts_timeout;
+ if (session->side != NC_SERVER) {
+ ERRINT;
+ return -1;
+ }
+
if (timeout > 0) {
nc_gettimespec_real(&ts_timeout);
nc_addtimespec(&ts_timeout, timeout);
/* LOCK */
- ret = pthread_mutex_timedlock(session->ti_lock, &ts_timeout);
+ ret = pthread_mutex_timedlock(session->opts.server.rpc_lock, &ts_timeout);
if (!ret) {
- while (*session->ti_inuse) {
- ret = pthread_cond_timedwait(session->ti_cond, session->ti_lock, &ts_timeout);
+ while (*session->opts.server.rpc_inuse) {
+ ret = pthread_cond_timedwait(session->opts.server.rpc_cond, session->opts.server.rpc_lock, &ts_timeout);
if (ret) {
- pthread_mutex_unlock(session->ti_lock);
+ pthread_mutex_unlock(session->opts.server.rpc_lock);
break;
}
}
}
} else if (!timeout) {
- if (*session->ti_inuse) {
+ if (*session->opts.server.rpc_inuse) {
/* immediate timeout */
return 0;
}
/* LOCK */
- ret = pthread_mutex_trylock(session->ti_lock);
+ ret = pthread_mutex_trylock(session->opts.server.rpc_lock);
if (!ret) {
/* be extra careful, someone could have been faster */
- if (*session->ti_inuse) {
- pthread_mutex_unlock(session->ti_lock);
+ if (*session->opts.server.rpc_inuse) {
+ pthread_mutex_unlock(session->opts.server.rpc_lock);
return 0;
}
}
} else { /* timeout == -1 */
/* LOCK */
- ret = pthread_mutex_lock(session->ti_lock);
+ ret = pthread_mutex_lock(session->opts.server.rpc_lock);
if (!ret) {
- while (*session->ti_inuse) {
- ret = pthread_cond_wait(session->ti_cond, session->ti_lock);
+ while (*session->opts.server.rpc_inuse) {
+ ret = pthread_cond_wait(session->opts.server.rpc_cond, session->opts.server.rpc_lock);
if (ret) {
- pthread_mutex_unlock(session->ti_lock);
+ pthread_mutex_unlock(session->opts.server.rpc_lock);
break;
}
}
@@ -232,19 +255,19 @@
}
/* error */
- ERR("%s: failed to lock a session (%s).", func, strerror(ret));
+ ERR("%s: failed to RPC lock a session (%s).", func, strerror(ret));
return -1;
}
/* ok */
- assert(*session->ti_inuse == 0);
- *session->ti_inuse = 1;
+ assert(*session->opts.server.rpc_inuse == 0);
+ *session->opts.server.rpc_inuse = 1;
/* UNLOCK */
- ret = pthread_mutex_unlock(session->ti_lock);
+ ret = pthread_mutex_unlock(session->opts.server.rpc_lock);
if (ret) {
/* error */
- ERR("%s: faile to unlock a session (%s).", func, strerror(ret));
+ ERR("%s: faile to RPC unlock a session (%s).", func, strerror(ret));
return -1;
}
@@ -252,44 +275,49 @@
}
int
-nc_session_unlock(struct nc_session *session, int timeout, const char *func)
+nc_session_rpc_unlock(struct nc_session *session, int timeout, const char *func)
{
int ret;
struct timespec ts_timeout;
- assert(*session->ti_inuse);
+ if (session->side != NC_SERVER) {
+ ERRINT;
+ return -1;
+ }
+
+ assert(*session->opts.server.rpc_inuse);
if (timeout > 0) {
nc_gettimespec_real(&ts_timeout);
nc_addtimespec(&ts_timeout, timeout);
/* LOCK */
- ret = pthread_mutex_timedlock(session->ti_lock, &ts_timeout);
+ ret = pthread_mutex_timedlock(session->opts.server.rpc_lock, &ts_timeout);
} else if (!timeout) {
/* LOCK */
- ret = pthread_mutex_trylock(session->ti_lock);
+ ret = pthread_mutex_trylock(session->opts.server.rpc_lock);
} else { /* timeout == -1 */
/* LOCK */
- ret = pthread_mutex_lock(session->ti_lock);
+ ret = pthread_mutex_lock(session->opts.server.rpc_lock);
}
if (ret && (ret != EBUSY) && (ret != ETIMEDOUT)) {
/* error */
- ERR("%s: failed to lock a session (%s).", func, strerror(ret));
+ ERR("%s: failed to RPC lock a session (%s).", func, strerror(ret));
return -1;
} else if (ret) {
- WRN("%s: session lock timeout, should not happen.");
+ WRN("%s: session RPC lock timeout, should not happen.");
}
- *session->ti_inuse = 0;
- pthread_cond_signal(session->ti_cond);
+ *session->opts.server.rpc_inuse = 0;
+ pthread_cond_signal(session->opts.server.rpc_cond);
if (!ret) {
/* UNLOCK */
- ret = pthread_mutex_unlock(session->ti_lock);
+ ret = pthread_mutex_unlock(session->opts.server.rpc_lock);
if (ret) {
/* error */
- ERR("%s: failed to unlock a session (%s).", func, strerror(ret));
+ ERR("%s: failed to RPC unlock a session (%s).", func, strerror(ret));
return -1;
}
}
@@ -297,6 +325,57 @@
return 1;
}
+/*
+ * @return 1 - success
+ * 0 - timeout
+ * -1 - error
+ */
+int
+nc_session_io_lock(struct nc_session *session, int timeout, const char *func)
+{
+ int ret;
+ struct timespec ts_timeout;
+
+ if (timeout > 0) {
+ nc_gettimespec_real(&ts_timeout);
+ nc_addtimespec(&ts_timeout, timeout);
+
+ ret = pthread_mutex_timedlock(session->io_lock, &ts_timeout);
+ } else if (!timeout) {
+ ret = pthread_mutex_trylock(session->io_lock);
+ } else { /* timeout == -1 */
+ ret = pthread_mutex_lock(session->opts.server.rpc_lock);
+ }
+
+ if (ret) {
+ if ((ret == EBUSY) || (ret == ETIMEDOUT)) {
+ /* timeout */
+ return 0;
+ }
+
+ /* error */
+ ERR("%s: failed to IO lock a session (%s).", func, strerror(ret));
+ return -1;
+ }
+
+ return 1;
+}
+
+int
+nc_session_io_unlock(struct nc_session *session, const char *func)
+{
+ int ret;
+
+ ret = pthread_mutex_unlock(session->io_lock);
+ if (ret) {
+ /* error */
+ ERR("%s: failed to IO unlock a session (%s).", func, strerror(ret));
+ return -1;
+ }
+
+ return 1;
+}
+
API NC_STATUS
nc_session_get_status(const struct nc_session *session)
{
@@ -430,32 +509,23 @@
}
NC_MSG_TYPE
-nc_send_msg(struct nc_session *session, struct lyd_node *op)
+nc_send_msg_io(struct nc_session *session, int io_timeout, struct lyd_node *op)
{
- int r;
-
if (session->ctx != op->schema->module->ctx) {
ERR("Session %u: RPC \"%s\" was created in different context than that of the session.",
session->id, op->schema->name);
return NC_MSG_ERROR;
}
- r = nc_write_msg(session, NC_MSG_RPC, op, NULL);
-
- if (r) {
- return NC_MSG_ERROR;
- }
-
- return NC_MSG_RPC;
+ return nc_write_msg_io(session, io_timeout, NC_MSG_RPC, op, NULL);
}
API void
nc_session_free(struct nc_session *session, void (*data_free)(void *))
{
- int r, i, locked, sock = -1;
+ int r, i, rpc_locked = 0, sock = -1;
int connected; /* flag to indicate whether the transport socket is still connected */
int multisession = 0; /* flag for more NETCONF sessions on a single SSH session */
- pthread_t tid;
struct nc_session *siter;
struct nc_msg_cont *contiter;
struct lyxml_elem *rpl, *child;
@@ -469,30 +539,20 @@
/* stop notifications loop if any */
if ((session->side == NC_CLIENT) && session->opts.client.ntf_tid) {
- tid = *session->opts.client.ntf_tid;
- free((pthread_t *)session->opts.client.ntf_tid);
session->opts.client.ntf_tid = NULL;
/* the thread now knows it should quit */
-
- pthread_join(tid, NULL);
}
- if (session->ti_lock) {
- r = nc_session_lock(session, NC_SESSION_FREE_LOCK_TIMEOUT, __func__);
+ if ((session->side == NC_SERVER) && session->opts.server.rpc_lock) {
+ r = nc_session_rpc_lock(session, NC_SESSION_FREE_LOCK_TIMEOUT, __func__);
if (r == -1) {
return;
- } else if (!r) {
- /* we failed to lock it, too bad */
- locked = 0;
- } else {
- locked = 1;
- }
- } else {
- ERRINT;
- return;
+ } else if (r) {
+ rpc_locked = 1;
+ } /* else failed to lock it, too bad */
}
- if ((session->side == NC_CLIENT) && (session->status == NC_STATUS_RUNNING) && locked) {
+ if ((session->side == NC_CLIENT) && (session->status == NC_STATUS_RUNNING)) {
/* cleanup message queues */
/* notifications */
for (contiter = session->opts.client.notifs; contiter; ) {
@@ -518,9 +578,9 @@
WRN("Session %u: missing ietf-netconf schema in context, unable to send <close-session>.", session->id);
} else {
close_rpc = lyd_new(NULL, ietfnc, "close-session");
- nc_send_msg(session, close_rpc);
+ nc_send_msg_io(session, NC_SESSION_FREE_LOCK_TIMEOUT, close_rpc);
lyd_free(close_rpc);
- switch (nc_read_msg_poll(session, NC_CLOSE_REPLY_TIMEOUT, &rpl)) {
+ switch (nc_read_msg_poll_io(session, NC_CLOSE_REPLY_TIMEOUT, &rpl)) {
case NC_MSG_REPLY:
LY_TREE_FOR(rpl->child, child) {
if (!strcmp(child->name, "ok") && child->ns && !strcmp(child->ns->value, NC_NS_BASE)) {
@@ -692,17 +752,20 @@
lydict_remove(session->ctx, session->host);
/* final cleanup */
- if (session->ti_lock) {
- if (locked) {
- nc_session_unlock(session, NC_SESSION_LOCK_TIMEOUT, __func__);
+ if ((session->side == NC_SERVER) && session->opts.server.rpc_lock) {
+ if (rpc_locked) {
+ nc_session_rpc_unlock(session, NC_SESSION_LOCK_TIMEOUT, __func__);
}
- if (!multisession) {
- pthread_mutex_destroy(session->ti_lock);
- pthread_cond_destroy(session->ti_cond);
- free(session->ti_lock);
- free(session->ti_cond);
- free((int *)session->ti_inuse);
- }
+ pthread_mutex_destroy(session->opts.server.rpc_lock);
+ pthread_cond_destroy(session->opts.server.rpc_cond);
+ free(session->opts.server.rpc_lock);
+ free(session->opts.server.rpc_cond);
+ free((int *)session->opts.server.rpc_inuse);
+ }
+
+ if (session->io_lock && !multisession) {
+ pthread_mutex_destroy(session->io_lock);
+ free(session->io_lock);
}
if (!(session->flags & NC_SESSION_SHAREDCTX)) {
@@ -1032,70 +1095,56 @@
}
static NC_MSG_TYPE
-nc_send_client_hello(struct nc_session *session)
+nc_send_hello_io(struct nc_session *session)
{
- int r, i;
+ NC_MSG_TYPE ret;
+ int i, io_timeout;
const char **cpblts;
+ uint32_t *sid;
- /* client side hello - send only NETCONF base capabilities */
- cpblts = malloc(3 * sizeof *cpblts);
- if (!cpblts) {
- ERRMEM;
- return NC_MSG_ERROR;
+ if (session->side == NC_CLIENT) {
+ /* client side hello - send only NETCONF base capabilities */
+ cpblts = malloc(3 * sizeof *cpblts);
+ if (!cpblts) {
+ ERRMEM;
+ return NC_MSG_ERROR;
+ }
+ cpblts[0] = lydict_insert(session->ctx, "urn:ietf:params:netconf:base:1.0", 0);
+ cpblts[1] = lydict_insert(session->ctx, "urn:ietf:params:netconf:base:1.1", 0);
+ cpblts[2] = NULL;
+
+ io_timeout = NC_CLIENT_HELLO_TIMEOUT * 1000;
+ sid = NULL;
+ } else {
+ cpblts = nc_server_get_cpblts_version(session->ctx, LYS_VERSION_1);
+
+ io_timeout = NC_SERVER_HELLO_TIMEOUT * 1000;
+ sid = &session->id;
}
- cpblts[0] = lydict_insert(session->ctx, "urn:ietf:params:netconf:base:1.0", 0);
- cpblts[1] = lydict_insert(session->ctx, "urn:ietf:params:netconf:base:1.1", 0);
- cpblts[2] = NULL;
- r = nc_write_msg(session, NC_MSG_HELLO, cpblts, NULL);
+ ret = nc_write_msg_io(session, io_timeout, NC_MSG_HELLO, cpblts, sid);
for (i = 0; cpblts[i]; ++i) {
lydict_remove(session->ctx, cpblts[i]);
}
free(cpblts);
- if (r) {
- return NC_MSG_ERROR;
- }
-
- return NC_MSG_HELLO;
+ return ret;
}
static NC_MSG_TYPE
-nc_send_server_hello(struct nc_session *session)
-{
- int r, i;
- const char **cpblts;
-
- cpblts = nc_server_get_cpblts_version(session->ctx, LYS_VERSION_1);
-
- r = nc_write_msg(session, NC_MSG_HELLO, cpblts, &session->id);
-
- for (i = 0; cpblts[i]; ++i) {
- lydict_remove(session->ctx, cpblts[i]);
- }
- free(cpblts);
-
- if (r) {
- return NC_MSG_ERROR;
- }
-
- return NC_MSG_HELLO;
-}
-
-static NC_MSG_TYPE
-nc_recv_client_hello(struct nc_session *session)
+nc_recv_client_hello_io(struct nc_session *session)
{
struct lyxml_elem *xml = NULL, *node;
- NC_MSG_TYPE msgtype = 0; /* NC_MSG_ERROR */
+ NC_MSG_TYPE msgtype;
int ver = -1;
char *str;
long long int id;
int flag = 0;
- msgtype = nc_read_msg_poll(session, NC_CLIENT_HELLO_TIMEOUT * 1000, &xml);
+ msgtype = nc_read_msg_poll_io(session, NC_CLIENT_HELLO_TIMEOUT * 1000, &xml);
- switch(msgtype) {
+ switch (msgtype) {
case NC_MSG_HELLO:
/* parse <hello> data */
LY_TREE_FOR(xml->child, node) {
@@ -1162,14 +1211,15 @@
}
static NC_MSG_TYPE
-nc_recv_server_hello(struct nc_session *session)
+nc_recv_server_hello_io(struct nc_session *session)
{
struct lyxml_elem *xml = NULL, *node;
NC_MSG_TYPE msgtype;
int ver = -1;
int flag = 0;
- msgtype = nc_read_msg_poll(session, (server_opts.hello_timeout ? server_opts.hello_timeout * 1000 : NC_SERVER_HELLO_TIMEOUT * 1000), &xml);
+ msgtype = nc_read_msg_poll_io(session, (server_opts.hello_timeout ?
+ server_opts.hello_timeout * 1000 : NC_SERVER_HELLO_TIMEOUT * 1000), &xml);
switch (msgtype) {
case NC_MSG_HELLO:
@@ -1212,29 +1262,23 @@
cleanup:
lyxml_free(session->ctx, xml);
-
return msgtype;
}
NC_MSG_TYPE
-nc_handshake(struct nc_session *session)
+nc_handshake_io(struct nc_session *session)
{
NC_MSG_TYPE type;
- if (session->side == NC_CLIENT) {
- type = nc_send_client_hello(session);
- } else {
- type = nc_send_server_hello(session);
- }
-
+ type = nc_send_hello_io(session);
if (type != NC_MSG_HELLO) {
return type;
}
if (session->side == NC_CLIENT) {
- type = nc_recv_client_hello(session);
+ type = nc_recv_client_hello_io(session);
} else {
- type = nc_recv_server_hello(session);
+ type = nc_recv_server_hello_io(session);
}
return type;
diff --git a/src/session_client.c b/src/session_client.c
index 79046aa..0ae3b07 100644
--- a/src/session_client.c
+++ b/src/session_client.c
@@ -856,20 +856,15 @@
}
/* prepare session structure */
- session = nc_new_session(0);
+ session = nc_new_session(NC_CLIENT, 0);
if (!session) {
ERRMEM;
return NULL;
}
session->status = NC_STATUS_STARTING;
- session->side = NC_CLIENT;
/* transport specific data */
session->ti_type = NC_TI_FD;
- pthread_mutex_init(session->ti_lock, NULL);
- pthread_cond_init(session->ti_cond, NULL);
- *session->ti_inuse = 0;
-
session->ti.fd.in = fdin;
session->ti.fd.out = fdout;
@@ -887,7 +882,7 @@
session->ctx = ctx;
/* NETCONF handshake */
- if (nc_handshake(session) != NC_MSG_HELLO) {
+ if (nc_handshake_io(session) != NC_MSG_HELLO) {
goto fail;
}
session->status = NC_STATUS_RUNNING;
@@ -969,7 +964,6 @@
static NC_MSG_TYPE
get_msg(struct nc_session *session, int timeout, uint64_t msgid, struct lyxml_elem **msg)
{
- int r;
char *ptr;
const char *str_msgid;
uint64_t cur_msgid;
@@ -977,15 +971,6 @@
struct nc_msg_cont *cont, **cont_ptr;
NC_MSG_TYPE msgtype = 0; /* NC_MSG_ERROR */
- r = nc_session_lock(session, timeout, __func__);
- if (r == -1) {
- /* error */
- return NC_MSG_ERROR;
- } else if (!r) {
- /* timeout */
- return NC_MSG_WOULDBLOCK;
- }
-
/* try to get notification from the session's queue */
if (!msgid && session->opts.client.notifs) {
cont = session->opts.client.notifs;
@@ -1010,7 +995,7 @@
if (!msgtype) {
/* read message from wire */
- msgtype = nc_read_msg_poll(session, timeout, &xml);
+ msgtype = nc_read_msg_poll_io(session, timeout, &xml);
}
/* we read rpc-reply, want a notif */
@@ -1022,7 +1007,6 @@
*cont_ptr = malloc(sizeof **cont_ptr);
if (!*cont_ptr) {
ERRMEM;
- nc_session_unlock(session, timeout, __func__);
lyxml_free(session->ctx, xml);
return NC_MSG_ERROR;
}
@@ -1033,7 +1017,6 @@
/* we read notif, want a rpc-reply */
if (msgid && (msgtype == NC_MSG_NOTIF)) {
if (!session->opts.client.ntf_tid) {
- pthread_mutex_unlock(session->ti_lock);
ERR("Session %u: received a <notification> but session is not subscribed.", session->id);
lyxml_free(session->ctx, xml);
return NC_MSG_ERROR;
@@ -1046,7 +1029,6 @@
*cont_ptr = malloc(sizeof **cont_ptr);
if (!cont_ptr) {
ERRMEM;
- nc_session_unlock(session, timeout, __func__);
lyxml_free(session->ctx, xml);
return NC_MSG_ERROR;
}
@@ -1054,8 +1036,6 @@
(*cont_ptr)->next = NULL;
}
- nc_session_unlock(session, timeout, __func__);
-
switch (msgtype) {
case NC_MSG_NOTIF:
if (!msgid) {
@@ -1729,12 +1709,18 @@
void (*notif_clb)(struct nc_session *session, const struct nc_notif *notif);
struct nc_notif *notif;
NC_MSG_TYPE msgtype;
+ pthread_t *ntf_tid;
+
+ pthread_detach(pthread_self());
ntarg = (struct nc_ntf_thread_arg *)arg;
session = ntarg->session;
notif_clb = ntarg->notif_clb;
free(ntarg);
+ /* remember our allocated tid, we will be freeing it */
+ ntf_tid = (pthread_t *)session->opts.client.ntf_tid;
+
while (session->opts.client.ntf_tid) {
msgtype = nc_recv_notif(session, NC_CLIENT_NOTIF_THREAD_SLEEP / 1000, ¬if);
if (msgtype == NC_MSG_NOTIF) {
@@ -1755,6 +1741,7 @@
VRB("Session %u: notification thread exit.", session->id);
session->opts.client.ntf_tid = NULL;
+ free(ntf_tid);
return NULL;
}
@@ -1810,7 +1797,7 @@
nc_send_rpc(struct nc_session *session, struct nc_rpc *rpc, int timeout, uint64_t *msgid)
{
NC_MSG_TYPE r;
- int ret, dofree = 1;
+ int dofree = 1;
struct nc_rpc_act_generic *rpc_gen;
struct nc_rpc_getconfig *rpc_gc;
struct nc_rpc_edit *rpc_e;
@@ -2284,6 +2271,11 @@
return NC_MSG_ERROR;
}
+ if (!data) {
+ /* error was already printed */
+ return NC_MSG_ERROR;
+ }
+
if (lyd_validate(&data, LYD_OPT_RPC | LYD_OPT_NOEXTDEPS
| (session->flags & NC_SESSION_CLIENT_NOT_STRICT ? 0 : LYD_OPT_STRICT), NULL)) {
if (dofree) {
@@ -2292,30 +2284,18 @@
return NC_MSG_ERROR;
}
- ret = nc_session_lock(session, timeout, __func__);
- if (ret == -1) {
- /* error */
- r = NC_MSG_ERROR;
- } else if (!ret) {
- /* blocking */
- r = NC_MSG_WOULDBLOCK;
- } else {
- /* send RPC, store its message ID */
- r = nc_send_msg(session, data);
- cur_msgid = session->opts.client.msgid;
- }
- nc_session_unlock(session, timeout, __func__);
+ /* send RPC, store its message ID */
+ r = nc_send_msg_io(session, timeout, data);
+ cur_msgid = session->opts.client.msgid;
if (dofree) {
lyd_free(data);
}
- if (r != NC_MSG_RPC) {
- return r;
+ if (r == NC_MSG_RPC) {
+ *msgid = cur_msgid;
}
-
- *msgid = cur_msgid;
- return NC_MSG_RPC;
+ return r;
}
API void
diff --git a/src/session_client_ssh.c b/src/session_client_ssh.c
index 332d5d1..83ab4d8 100644
--- a/src/session_client_ssh.c
+++ b/src/session_client_ssh.c
@@ -1487,19 +1487,12 @@
}
/* prepare session structure */
- session = nc_new_session(0);
+ session = nc_new_session(NC_CLIENT, 0);
if (!session) {
ERRMEM;
return NULL;
}
session->status = NC_STATUS_STARTING;
- session->side = NC_CLIENT;
-
- /* transport lock */
- pthread_mutex_init(session->ti_lock, NULL);
- pthread_cond_init(session->ti_cond, NULL);
- *session->ti_inuse = 0;
-
session->ti_type = NC_TI_LIBSSH;
session->ti.libssh.session = ssh_session;
@@ -1582,7 +1575,7 @@
ctx = session->ctx;
/* NETCONF handshake */
- if (nc_handshake(session) != NC_MSG_HELLO) {
+ if (nc_handshake_io(session) != NC_MSG_HELLO) {
goto fail;
}
session->status = NC_STATUS_RUNNING;
@@ -1642,20 +1635,14 @@
}
/* prepare session structure */
- session = nc_new_session(0);
+ session = nc_new_session(NC_CLIENT, 0);
if (!session) {
ERRMEM;
return NULL;
}
session->status = NC_STATUS_STARTING;
- session->side = NC_CLIENT;
- /* transport lock */
- pthread_mutex_init(session->ti_lock, NULL);
- pthread_cond_init(session->ti_cond, NULL);
- *session->ti_inuse = 0;
-
- /* other transport-specific data */
+ /* transport-specific data */
session->ti_type = NC_TI_LIBSSH;
session->ti.libssh.session = ssh_new();
if (!session->ti.libssh.session) {
@@ -1698,7 +1685,7 @@
ctx = session->ctx;
/* NETCONF handshake */
- if (nc_handshake(session) != NC_MSG_HELLO) {
+ if (nc_handshake_io(session) != NC_MSG_HELLO) {
goto fail;
}
session->status = NC_STATUS_RUNNING;
@@ -1736,30 +1723,26 @@
}
/* prepare session structure */
- new_session = nc_new_session(1);
+ new_session = nc_new_session(NC_CLIENT, 1);
if (!new_session) {
ERRMEM;
return NULL;
}
new_session->status = NC_STATUS_STARTING;
- new_session->side = NC_CLIENT;
- /* share some parameters including the session lock */
+ /* share some parameters including the IO lock (we are using one socket for both sessions) */
new_session->ti_type = NC_TI_LIBSSH;
- new_session->ti_lock = session->ti_lock;
- new_session->ti_cond = session->ti_cond;
- new_session->ti_inuse = session->ti_inuse;
new_session->ti.libssh.session = session->ti.libssh.session;
+ new_session->io_lock = session->io_lock;
/* create the channel safely */
- if (nc_session_lock(new_session, -1, __func__) != 1) {
+ if (nc_session_io_lock(new_session, -1, __func__) != 1) {
goto fail;
}
-
- /* open a channel */
if (open_netconf_channel(new_session, NC_TRANSPORT_TIMEOUT) != 1) {
goto fail;
}
+ nc_session_io_unlock(new_session, __func__);
if (nc_session_new_ctx(new_session, ctx) != EXIT_SUCCESS) {
goto fail;
@@ -1767,12 +1750,12 @@
ctx = session->ctx;
/* NETCONF handshake */
- if (nc_handshake(new_session) != NC_MSG_HELLO) {
+ if (nc_handshake_io(new_session) != NC_MSG_HELLO) {
goto fail;
}
new_session->status = NC_STATUS_RUNNING;
- nc_session_unlock(new_session, NC_SESSION_LOCK_TIMEOUT, __func__);
+ nc_session_io_unlock(new_session, __func__);
if (nc_ctx_check_and_fill(new_session) == -1) {
goto fail;
diff --git a/src/session_client_tls.c b/src/session_client_tls.c
index 2495506..fc780ad 100644
--- a/src/session_client_tls.c
+++ b/src/session_client_tls.c
@@ -602,18 +602,12 @@
}
/* prepare session structure */
- session = nc_new_session(0);
+ session = nc_new_session(NC_CLIENT, 0);
if (!session) {
ERRMEM;
return NULL;
}
session->status = NC_STATUS_STARTING;
- session->side = NC_CLIENT;
-
- /* transport lock */
- pthread_mutex_init(session->ti_lock, NULL);
- pthread_cond_init(session->ti_cond, NULL);
- *session->ti_inuse = 0;
/* fill the session */
session->ti_type = NC_TI_OPENSSL;
@@ -676,7 +670,7 @@
ctx = session->ctx;
/* NETCONF handshake */
- if (nc_handshake(session) != NC_MSG_HELLO) {
+ if (nc_handshake_io(session) != NC_MSG_HELLO) {
goto fail;
}
session->status = NC_STATUS_RUNNING;
@@ -711,19 +705,12 @@
}
/* prepare session structure */
- session = nc_new_session(0);
+ session = nc_new_session(NC_CLIENT, 0);
if (!session) {
ERRMEM;
return NULL;
}
session->status = NC_STATUS_STARTING;
- session->side = NC_CLIENT;
-
- /* transport lock */
- pthread_mutex_init(session->ti_lock, NULL);
- pthread_cond_init(session->ti_cond, NULL);
- *session->ti_inuse = 0;
-
session->ti_type = NC_TI_OPENSSL;
session->ti.tls = tls;
@@ -733,7 +720,7 @@
ctx = session->ctx;
/* NETCONF handshake */
- if (nc_handshake(session) != NC_MSG_HELLO) {
+ if (nc_handshake_io(session) != NC_MSG_HELLO) {
goto fail;
}
session->status = NC_STATUS_RUNNING;
diff --git a/src/session_p.h b/src/session_p.h
index 5a0c899..845fd3f 100644
--- a/src/session_p.h
+++ b/src/session_p.h
@@ -354,11 +354,9 @@
/* Transport implementation */
NC_TRANSPORT_IMPL ti_type; /**< transport implementation type to select items from ti union */
- pthread_mutex_t *ti_lock; /**< lock to access ti. Note that in case of libssh TI, it can be shared with other
- NETCONF sessions on the same SSH session (but different SSH channel) */
- pthread_cond_t *ti_cond; /**< ti_inuse condition */
- volatile int *ti_inuse; /**< variable indicating whether TI is being communicated on or not, protected by
- ti_cond and ti_lock */
+ pthread_mutex_t *io_lock; /**< input/output lock, note that in case of libssh TI, it will be shared
+ with other NETCONF sessions on the same SSH session (but different SSH channel) */
+
union {
struct {
int in; /**< input file descriptor */
@@ -406,6 +404,12 @@
time_t session_start; /**< real time the session was created */
time_t last_rpc; /**< monotonic time (seconds) the last RPC was received on this session */
int ntf_status; /**< flag whether the session is subscribed to any stream */
+
+ pthread_mutex_t *rpc_lock; /**< lock indicating RPC processing, this lock is always locked before io_lock!! */
+ pthread_cond_t *rpc_cond; /**< RPC condition (tied with rpc_lock and rpc_inuse) */
+ volatile int *rpc_inuse; /**< variable indicating whether there is RPC being processed or not (tied with
+ rpc_cond and rpc_lock) */
+
pthread_mutex_t *ch_lock; /**< Call Home thread lock */
pthread_cond_t *ch_cond; /**< Call Home thread condition */
@@ -460,7 +464,7 @@
void *nc_realloc(void *ptr, size_t size);
-NC_MSG_TYPE nc_send_msg(struct nc_session *session, struct lyd_node *op);
+NC_MSG_TYPE nc_send_msg_io(struct nc_session *session, int io_timeout, struct lyd_node *op);
#ifndef HAVE_PTHREAD_MUTEX_TIMEDLOCK
int pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *abstime);
@@ -474,11 +478,15 @@
void nc_addtimespec(struct timespec *ts, uint32_t msec);
-struct nc_session *nc_new_session(int not_allocate_ti);
+struct nc_session *nc_new_session(NC_SIDE side, int shared_ti);
-int nc_session_lock(struct nc_session *session, int timeout, const char *func);
+int nc_session_rpc_lock(struct nc_session *session, int timeout, const char *func);
-int nc_session_unlock(struct nc_session *session, int timeout, const char *func);
+int nc_session_rpc_unlock(struct nc_session *session, int timeout, const char *func);
+
+int nc_session_io_lock(struct nc_session *session, int timeout, const char *func);
+
+int nc_session_io_unlock(struct nc_session *session, const char *func);
int nc_ps_lock(struct nc_pollsession *ps, uint8_t *id, const char *func);
@@ -501,7 +509,7 @@
* @return NC_MSG_HELLO on success, NC_MSG_BAD_HELLO on client \<hello\> message parsing fail
* (server-side only), NC_MSG_WOULDBLOCK on timeout, NC_MSG_ERROR on other error.
*/
-NC_MSG_TYPE nc_handshake(struct nc_session *session);
+NC_MSG_TYPE nc_handshake_io(struct nc_session *session);
/**
* @brief Create a socket connection.
@@ -680,14 +688,14 @@
* libyang XML tree and the message type is detected from the top level element.
*
* @param[in] session NETCONF session from which the message is being read.
- * @param[in] timeout Timeout in milliseconds. Negative value means infinite timeout,
+ * @param[in] io_timeout Timeout in milliseconds. Negative value means infinite timeout,
* zero value causes to return immediately.
* @param[out] data XML tree built from the read data.
* @return Type of the read message. #NC_MSG_WOULDBLOCK is returned if timeout is positive
* (or zero) value and it passed out without any data on the wire. #NC_MSG_ERROR is
* returned on error and #NC_MSG_NONE is never returned by this function.
*/
-NC_MSG_TYPE nc_read_msg_poll(struct nc_session* session, int timeout, struct lyxml_elem **data);
+NC_MSG_TYPE nc_read_msg_poll_io(struct nc_session* session, int io_timeout, struct lyxml_elem **data);
/**
* @brief Read message from the wire.
@@ -696,17 +704,23 @@
* libyang XML tree and the message type is detected from the top level element.
*
* @param[in] session NETCONF session from which the message is being read.
+ * @param[in] io_timeout Timeout in milliseconds. Negative value means infinite timeout,
+ * zero value causes to return immediately.
* @param[out] data XML tree built from the read data.
+ * @param[in] passing_io_lock True if \p session IO lock is already held. This function always unlocks
+ * it before returning!
* @return Type of the read message. #NC_MSG_WOULDBLOCK is returned if timeout is positive
* (or zero) value and it passed out without any data on the wire. #NC_MSG_ERROR is
* returned on error and #NC_MSG_NONE is never returned by this function.
*/
-NC_MSG_TYPE nc_read_msg(struct nc_session* session, struct lyxml_elem **data);
+NC_MSG_TYPE nc_read_msg_io(struct nc_session* session, int io_timeout, struct lyxml_elem **data, int passing_io_lock);
/**
* @brief Write message into wire.
*
* @param[in] session NETCONF session to which the message will be written.
+ * @param[in] io_timeout Timeout in milliseconds. Negative value means infinite timeout,
+ * zero value causes to return immediately.
* @param[in] type The type of the message to write, specified as #NC_MSG_TYPE value. According to the type, the
* specific additional parameters are required or accepted:
* - #NC_MSG_RPC
@@ -719,10 +733,16 @@
* - `struct lyxml_node *rpc_elem;` - root of the RPC object to reply to. Required parameter.
* - `struct nc_server_reply *reply;` - RPC reply. Required parameter.
* - #NC_MSG_NOTIF
- * - TODO: content
- * @return 0 on success
+ * - `struct nc_server_notif *notif;` - notification object. Required parameter.
+ * - #NC_MSG_HELLO
+ * - `const char **capabs;` - capabilities array ended with NULL. Required parameter.
+ * - `uint32_t *sid;` - session ID to be included in the hello message. Optional parameter.
+ *
+ * @return Type of the written message. #NC_MSG_WOULDBLOCK is returned if timeout is positive
+ * (or zero) value and IO lock could not be acquired in that time. #NC_MSG_ERROR is
+ * returned on error and #NC_MSG_NONE is never returned by this function.
*/
-int nc_write_msg(struct nc_session *session, int type, ...);
+NC_MSG_TYPE nc_write_msg_io(struct nc_session *session, int io_timeout, int type, ...);
/**
* @brief Check whether a session is still connected (on transport layer).
diff --git a/src/session_server.c b/src/session_server.c
index f76887f..defa5f5 100644
--- a/src/session_server.c
+++ b/src/session_server.c
@@ -633,18 +633,12 @@
}
/* prepare session structure */
- *session = nc_new_session(0);
+ *session = nc_new_session(NC_SERVER, 0);
if (!(*session)) {
ERRMEM;
return NC_MSG_ERROR;
}
(*session)->status = NC_STATUS_STARTING;
- (*session)->side = NC_SERVER;
-
- /* transport lock */
- pthread_mutex_init((*session)->ti_lock, NULL);
- pthread_cond_init((*session)->ti_cond, NULL);
- *(*session)->ti_inuse = 0;
/* transport specific data */
(*session)->ti_type = NC_TI_FD;
@@ -661,7 +655,7 @@
pthread_spin_unlock(&server_opts.sid_lock);
/* NETCONF handshake */
- msgtype = nc_handshake(*session);
+ msgtype = nc_handshake_io(*session);
if (msgtype != NC_MSG_HELLO) {
nc_session_free(*session, NULL);
*session = NULL;
@@ -1003,14 +997,14 @@
return ps->session_count;
}
-/* must be called holding the session lock!
+/* should be called holding the session RPC lock! IO lock will be acquired as needed
* returns: NC_PSPOLL_ERROR,
* NC_PSPOLL_BAD_RPC,
* NC_PSPOLL_BAD_RPC | NC_PSPOLL_REPLY_ERROR,
* NC_PSPOLL_RPC
*/
static int
-nc_server_recv_rpc(struct nc_session *session, struct nc_server_rpc **rpc)
+nc_server_recv_rpc_io(struct nc_session *session, int io_timeout, struct nc_server_rpc **rpc)
{
struct lyxml_elem *xml = NULL;
NC_MSG_TYPE msgtype;
@@ -1028,7 +1022,7 @@
return NC_PSPOLL_ERROR;
}
- msgtype = nc_read_msg(session, &xml);
+ msgtype = nc_read_msg_io(session, io_timeout, &xml, 0);
switch (msgtype) {
case NC_MSG_RPC:
@@ -1044,7 +1038,7 @@
if (!(*rpc)->tree) {
/* parsing RPC failed */
reply = nc_server_reply_err(nc_err_libyang(server_opts.ctx));
- ret = nc_write_msg(session, NC_MSG_REPLY, xml, reply);
+ ret = nc_write_msg_io(session, io_timeout, NC_MSG_REPLY, xml, reply);
nc_server_reply_free(reply);
if (ret == -1) {
ERR("Session %u: failed to write reply.", session->id);
@@ -1069,7 +1063,7 @@
goto error;
default:
/* NC_MSG_ERROR,
- * NC_MSG_WOULDBLOCK and NC_MSG_NONE is not returned by nc_read_msg()
+ * NC_MSG_WOULDBLOCK and NC_MSG_NONE is not returned by nc_read_msg_io()
*/
ret = NC_PSPOLL_ERROR;
break;
@@ -1093,8 +1087,7 @@
API NC_MSG_TYPE
nc_server_notif_send(struct nc_session *session, struct nc_server_notif *notif, int timeout)
{
- NC_MSG_TYPE result = NC_MSG_NOTIF;
- int ret;
+ NC_MSG_TYPE ret;
/* check parameters */
if (!session || (session->side != NC_SERVER) || !session->opts.server.ntf_status) {
@@ -1105,39 +1098,30 @@
return NC_MSG_ERROR;
}
- /* reading an RPC and sending a reply must be atomic (no other RPC should be read) */
- ret = nc_session_lock(session, timeout, __func__);
- if (ret < 0) {
- return NC_MSG_ERROR;
- } else if (!ret) {
- return NC_MSG_WOULDBLOCK;
- }
-
- ret = nc_write_msg(session, NC_MSG_NOTIF, notif);
- if (ret == -1) {
+ /* we do not need RPC lock for this, IO lock will be acquired properly */
+ ret = nc_write_msg_io(session, timeout, NC_MSG_NOTIF, notif);
+ if (ret == NC_MSG_ERROR) {
ERR("Session %u: failed to write notification.", session->id);
- result = NC_MSG_ERROR;
}
- nc_session_unlock(session, timeout, __func__);
-
- return result;
+ return ret;
}
-/* must be called holding the session lock!
+/* must be called holding the session RPC lock! IO lock will be acquired as needed
* returns: NC_PSPOLL_ERROR,
* NC_PSPOLL_ERROR | NC_PSPOLL_REPLY_ERROR,
* NC_PSPOLL_REPLY_ERROR,
* 0
*/
static int
-nc_server_send_reply(struct nc_session *session, struct nc_server_rpc *rpc)
+nc_server_send_reply_io(struct nc_session *session, int io_timeout, struct nc_server_rpc *rpc)
{
nc_rpc_clb clb;
struct nc_server_reply *reply;
struct lys_node *rpc_act = NULL;
struct lyd_node *next, *elem;
- int ret = 0, r;
+ int ret = 0;
+ NC_MSG_TYPE r;
if (!rpc) {
ERRINT;
@@ -1173,13 +1157,13 @@
if (!reply) {
reply = nc_server_reply_err(nc_err(NC_ERR_OP_FAILED, NC_ERR_TYPE_APP));
}
- r = nc_write_msg(session, NC_MSG_REPLY, rpc->root, reply);
+ r = nc_write_msg_io(session, io_timeout, NC_MSG_REPLY, rpc->root, reply);
if (reply->type == NC_RPL_ERROR) {
ret |= NC_PSPOLL_REPLY_ERROR;
}
nc_server_reply_free(reply);
- if (r == -1) {
+ if (r != NC_MSG_REPLY) {
ERR("Session %u: failed to write reply.", session->id);
ret |= NC_PSPOLL_ERROR;
}
@@ -1192,7 +1176,7 @@
return ret;
}
-/* session must be running and session lock held!
+/* session must be running and session RPC lock held!
* returns: NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR, (msg filled)
* NC_PSPOLL_ERROR, (msg filled)
* NC_PSPOLL_TIMEOUT,
@@ -1201,7 +1185,7 @@
* NC_PSPOLL_SSH_MSG
*/
static int
-nc_ps_poll_session(struct nc_session *session, time_t now_mono, char *msg)
+nc_ps_poll_session_io(struct nc_session *session, int io_timeout, time_t now_mono, char *msg)
{
struct pollfd pfd;
int r, ret;
@@ -1218,6 +1202,14 @@
return NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
}
+ r = nc_session_io_lock(session, io_timeout, __func__);
+ if (r < 0) {
+ sprintf(msg, "session IO lock failed to be acquired");
+ return NC_PSPOLL_ERROR;
+ } else if (!r) {
+ return NC_PSPOLL_TIMEOUT;
+ }
+
switch (session->ti_type) {
#ifdef NC_ENABLED_SSH
case NC_TI_LIBSSH:
@@ -1336,6 +1328,7 @@
break;
}
+ nc_session_io_unlock(session, __func__);
return ret;
}
@@ -1385,8 +1378,8 @@
cur_ps_session = ps->sessions[i];
cur_session = cur_ps_session->session;
- /* SESSION LOCK */
- r = nc_session_lock(cur_session, 0, __func__);
+ /* SESSION RPC LOCK */
+ r = nc_session_rpc_lock(cur_session, 0, __func__);
if (r == -1) {
ret = NC_PSPOLL_ERROR;
} else if (r == 1) {
@@ -1397,7 +1390,7 @@
/* session is fine, work with it */
cur_ps_session->state = NC_PS_STATE_BUSY;
- ret = nc_ps_poll_session(cur_session, ts_cur.tv_sec, msg);
+ ret = nc_ps_poll_session_io(cur_session, NC_SESSION_LOCK_TIMEOUT, ts_cur.tv_sec, msg);
switch (ret) {
case NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR:
ERR("Session %u: %s.", cur_session->id, msg);
@@ -1438,10 +1431,10 @@
break;
}
- /* keep the session locked only in this one case */
+ /* keep RPC lock in this one case */
if (ret != NC_PSPOLL_RPC) {
- /* SESSION UNLOCK */
- nc_session_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
+ /* SESSION RPC UNLOCK */
+ nc_session_rpc_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
}
} else {
/* timeout */
@@ -1494,9 +1487,9 @@
/* PS UNLOCK */
nc_ps_unlock(ps, q_id, __func__);
- /* we have some data available and the session is locked */
+ /* we have some data available and the session is RPC locked (but not IO locked) */
if (ret == NC_PSPOLL_RPC) {
- ret = nc_server_recv_rpc(cur_session, &rpc);
+ ret = nc_server_recv_rpc_io(cur_session, timeout, &rpc);
if (ret & (NC_PSPOLL_ERROR | NC_PSPOLL_BAD_RPC)) {
if (cur_session->status != NC_STATUS_RUNNING) {
ret |= NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
@@ -1508,7 +1501,7 @@
cur_session->opts.server.last_rpc = ts_cur.tv_sec;
/* process RPC, not needed afterwards */
- ret |= nc_server_send_reply(cur_session, rpc);
+ ret |= nc_server_send_reply_io(cur_session, timeout, rpc);
nc_server_rpc_free(rpc, server_opts.ctx);
if (cur_session->status != NC_STATUS_RUNNING) {
@@ -1522,8 +1515,8 @@
}
}
- /* SESSION UNLOCK */
- nc_session_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
+ /* SESSION RPC UNLOCK */
+ nc_session_rpc_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
}
return ret;
@@ -1915,7 +1908,7 @@
sock = ret;
- *session = nc_new_session(0);
+ *session = nc_new_session(NC_SERVER, 0);
if (!(*session)) {
ERRMEM;
close(sock);
@@ -1924,17 +1917,11 @@
goto cleanup;
}
(*session)->status = NC_STATUS_STARTING;
- (*session)->side = NC_SERVER;
(*session)->ctx = server_opts.ctx;
(*session)->flags = NC_SESSION_SHAREDCTX;
(*session)->host = lydict_insert_zc(server_opts.ctx, host);
(*session)->port = port;
- /* transport lock */
- pthread_mutex_init((*session)->ti_lock, NULL);
- pthread_cond_init((*session)->ti_cond, NULL);
- *(*session)->ti_inuse = 0;
-
/* sock gets assigned to session or closed */
#ifdef NC_ENABLED_SSH
if (server_opts.endpts[bind_idx].ti == NC_TI_LIBSSH) {
@@ -1982,7 +1969,7 @@
pthread_spin_unlock(&server_opts.sid_lock);
/* NETCONF handshake */
- msgtype = nc_handshake(*session);
+ msgtype = nc_handshake_io(*session);
if (msgtype != NC_MSG_HELLO) {
nc_session_free(*session, NULL);
*session = NULL;
@@ -2661,24 +2648,18 @@
return NC_MSG_ERROR;
}
- *session = nc_new_session(0);
+ *session = nc_new_session(NC_SERVER, 0);
if (!(*session)) {
ERRMEM;
close(sock);
return NC_MSG_ERROR;
}
(*session)->status = NC_STATUS_STARTING;
- (*session)->side = NC_SERVER;
(*session)->ctx = server_opts.ctx;
(*session)->flags = NC_SESSION_SHAREDCTX;
(*session)->host = lydict_insert(server_opts.ctx, endpt->address, 0);
(*session)->port = endpt->port;
- /* transport lock */
- pthread_mutex_init((*session)->ti_lock, NULL);
- pthread_cond_init((*session)->ti_cond, NULL);
- *(*session)->ti_inuse = 0;
-
/* sock gets assigned to session or closed */
#ifdef NC_ENABLED_SSH
if (client->ti == NC_TI_LIBSSH) {
@@ -2725,7 +2706,7 @@
pthread_spin_unlock(&server_opts.sid_lock);
/* NETCONF handshake */
- msgtype = nc_handshake(*session);
+ msgtype = nc_handshake_io(*session);
if (msgtype != NC_MSG_HELLO) {
goto fail;
}
diff --git a/src/session_server_ssh.c b/src/session_server_ssh.c
index 639475f..88aa0a3 100644
--- a/src/session_server_ssh.c
+++ b/src/session_server_ssh.c
@@ -990,7 +990,7 @@
session->flags |= NC_SESSION_SSH_SUBSYS_NETCONF;
} else {
/* additional channel subsystem request, new session is ready as far as SSH is concerned */
- new_session = nc_new_session(1);
+ new_session = nc_new_session(NC_SERVER, 1);
if (!new_session) {
ERRMEM;
return -1;
@@ -1005,11 +1005,8 @@
session->ti.libssh.next = new_session;
new_session->status = NC_STATUS_STARTING;
- new_session->side = NC_SERVER;
new_session->ti_type = NC_TI_LIBSSH;
- new_session->ti_lock = session->ti_lock;
- new_session->ti_cond = session->ti_cond;
- new_session->ti_inuse = session->ti_inuse;
+ new_session->io_lock = session->io_lock;
new_session->ti.libssh.channel = channel;
new_session->ti.libssh.session = session->ti.libssh.session;
new_session->username = lydict_insert(server_opts.ctx, session->username, 0);
@@ -1230,27 +1227,18 @@
return -1;
}
- ret = nc_session_lock(session, timeout, __func__);
- if (ret != 1) {
- return ret;
- }
-
ret = ssh_execute_message_callbacks(session->ti.libssh.session);
if (ret != SSH_OK) {
ERR("Failed to receive SSH messages on a session (%s).",
ssh_get_error(session->ti.libssh.session));
- nc_session_unlock(session, timeout, __func__);
return -1;
}
if (!session->ti.libssh.channel) {
- /* we did not receive channel-open, timeout */
- nc_session_unlock(session, timeout, __func__);
return 0;
}
ret = ssh_execute_message_callbacks(session->ti.libssh.session);
- nc_session_unlock(session, timeout, __func__);
if (ret != SSH_OK) {
ERR("Failed to receive SSH messages on a session (%s).",
ssh_get_error(session->ti.libssh.session));
@@ -1275,13 +1263,7 @@
return -1;
}
- ret = nc_session_lock(session, timeout, __func__);
- if (ret != 1) {
- return ret;
- }
-
ret = ssh_execute_message_callbacks(session->ti.libssh.session);
- nc_session_unlock(session, timeout, __func__);
if (ret != SSH_OK) {
ERR("Failed to receive SSH messages on a session (%s).",
ssh_get_error(session->ti.libssh.session));
@@ -1535,7 +1517,7 @@
pthread_spin_unlock(&server_opts.sid_lock);
/* NETCONF handshake */
- msgtype = nc_handshake(new_session);
+ msgtype = nc_handshake_io(new_session);
if (msgtype != NC_MSG_HELLO) {
return msgtype;
}
@@ -1608,7 +1590,7 @@
pthread_spin_unlock(&server_opts.sid_lock);
/* NETCONF handshake */
- msgtype = nc_handshake(new_session);
+ msgtype = nc_handshake_io(new_session);
if (msgtype != NC_MSG_HELLO) {
return msgtype;
}
diff --git a/tests/test_fd_comm.c b/tests/test_fd_comm.c
index 6ee3a05..f42f1b3 100644
--- a/tests/test_fd_comm.c
+++ b/tests/test_fd_comm.c
@@ -38,6 +38,7 @@
struct nc_session *server_session;
struct nc_session *client_session;
struct ly_ctx *ctx;
+volatile int glob_state;
struct nc_server_reply *
my_get_rpc_clb(struct lyd_node *rpc, struct nc_session *session)
@@ -63,6 +64,65 @@
return nc_server_reply_data(data, NC_WD_EXPLICIT, NC_PARAMTYPE_FREE);
}
+struct nc_server_reply *
+my_commit_rpc_clb(struct lyd_node *rpc, struct nc_session *session)
+{
+ assert_string_equal(rpc->schema->name, "commit");
+ assert_ptr_equal(session, server_session);
+
+ /* update state */
+ glob_state = 1;
+
+ /* wait until the client receives the notification */
+ while (glob_state != 3) {
+ usleep(100000);
+ }
+
+ return nc_server_reply_ok();
+}
+
+static struct nc_session *
+test_new_session(NC_SIDE side)
+{
+ struct nc_session *sess;
+
+ sess = calloc(1, sizeof *sess);
+ if (!sess) {
+ return NULL;
+ }
+
+ sess->side = side;
+
+ if (side == NC_SERVER) {
+ sess->opts.server.rpc_lock = malloc(sizeof *sess->opts.server.rpc_lock);
+ sess->opts.server.rpc_cond = malloc(sizeof *sess->opts.server.rpc_cond);
+ sess->opts.server.rpc_inuse = malloc(sizeof *sess->opts.server.rpc_inuse);
+ if (!sess->opts.server.rpc_lock || !sess->opts.server.rpc_cond || !sess->opts.server.rpc_inuse) {
+ goto error;
+ }
+ pthread_mutex_init(sess->opts.server.rpc_lock, NULL);
+ pthread_cond_init(sess->opts.server.rpc_cond, NULL);
+ *sess->opts.server.rpc_inuse = 0;
+ }
+
+ sess->io_lock = malloc(sizeof *sess->io_lock);
+ if (!sess->io_lock) {
+ goto error;
+ }
+ pthread_mutex_init(sess->io_lock, NULL);
+
+ return sess;
+
+error:
+ if (side == NC_SERVER) {
+ free(sess->opts.server.rpc_lock);
+ free(sess->opts.server.rpc_cond);
+ free((int *)sess->opts.server.rpc_inuse);
+ }
+ free(sess);
+ return NULL;
+}
+
static int
setup_sessions(void **state)
{
@@ -73,34 +133,20 @@
socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
/* create server session */
- server_session = calloc(1, sizeof *server_session);
+ server_session = test_new_session(NC_SERVER);
server_session->status = NC_STATUS_RUNNING;
- server_session->side = NC_SERVER;
server_session->id = 1;
server_session->ti_type = NC_TI_FD;
- server_session->ti_lock = malloc(sizeof *server_session->ti_lock);
- pthread_mutex_init(server_session->ti_lock, NULL);
- server_session->ti_cond = malloc(sizeof *server_session->ti_cond);
- pthread_cond_init(server_session->ti_cond, NULL);
- server_session->ti_inuse = malloc(sizeof *server_session->ti_inuse);
- *server_session->ti_inuse = 0;
server_session->ti.fd.in = sock[0];
server_session->ti.fd.out = sock[0];
server_session->ctx = ctx;
server_session->flags = NC_SESSION_SHAREDCTX;
/* create client session */
- client_session = calloc(1, sizeof *server_session);
+ client_session = test_new_session(NC_CLIENT);
client_session->status = NC_STATUS_RUNNING;
- client_session->side = NC_CLIENT;
client_session->id = 1;
client_session->ti_type = NC_TI_FD;
- client_session->ti_lock = malloc(sizeof *client_session->ti_lock);
- pthread_mutex_init(client_session->ti_lock, NULL);
- client_session->ti_cond = malloc(sizeof *client_session->ti_cond);
- pthread_cond_init(client_session->ti_cond, NULL);
- client_session->ti_inuse = malloc(sizeof *client_session->ti_inuse);
- *client_session->ti_inuse = 0;
client_session->ti.fd.in = sock[1];
client_session->ti.fd.out = sock[1];
client_session->ctx = ctx;
@@ -116,20 +162,10 @@
(void)state;
close(server_session->ti.fd.in);
- pthread_mutex_destroy(server_session->ti_lock);
- pthread_cond_destroy(server_session->ti_cond);
- free(server_session->ti_lock);
- free(server_session->ti_cond);
- free((int *)server_session->ti_inuse);
- free(server_session);
+ nc_session_free(server_session, NULL);
close(client_session->ti.fd.in);
- pthread_mutex_destroy(client_session->ti_lock);
- pthread_cond_destroy(client_session->ti_cond);
- free(client_session->ti_lock);
- free(client_session->ti_cond);
- free((int *)client_session->ti_inuse);
- free(client_session);
+ nc_session_free(client_session, NULL);
return 0;
}
@@ -312,12 +348,127 @@
test_send_recv_data();
}
-/* TODO
+static void
+test_notif_clb(struct nc_session *session, const struct nc_notif *notif)
+{
+ assert_ptr_equal(session, client_session);
+ assert_string_equal(notif->tree->schema->name, "notificationComplete");
+
+ /* client notification received, update state */
+ while (glob_state != 2) {
+ usleep(1000);
+ }
+ glob_state = 3;
+}
+
+static void *
+server_send_notif_thread(void *arg)
+{
+ NC_MSG_TYPE msg_type;
+ struct lyd_node *notif_tree;
+ struct nc_server_notif *notif;
+ char *buf;
+ (void)arg;
+
+ /* wait for the RPC callback to be called */
+ while (glob_state != 1) {
+ usleep(1000);
+ }
+
+ /* create notif */
+ notif_tree = lyd_new_path(NULL, ctx, "/nc-notifications:notificationComplete", NULL, 0, 0);
+ assert_non_null(notif_tree);
+ buf = malloc(64);
+ assert_non_null(buf);
+ notif = nc_server_notif_new(notif_tree, nc_time2datetime(time(NULL), NULL, buf), NC_PARAMTYPE_FREE);
+ assert_non_null(notif);
+
+ /* send notif */
+ nc_session_set_notif_status(server_session, 1);
+ msg_type = nc_server_notif_send(server_session, notif, 100);
+ nc_server_notif_free(notif);
+ assert_int_equal(msg_type, NC_MSG_NOTIF);
+
+ /* update state */
+ glob_state = 2;
+
+ return NULL;
+}
+
static void
test_send_recv_notif(void)
{
+ int ret;
+ pthread_t tid;
+ uint64_t msgid;
+ NC_MSG_TYPE msgtype;
+ struct nc_rpc *rpc;
+ struct nc_reply *reply;
+ struct nc_pollsession *ps;
-}*/
+ /* client RPC */
+ rpc = nc_rpc_commit(0, 0, NULL, NULL, 0);
+ assert_non_null(rpc);
+
+ msgtype = nc_send_rpc(client_session, rpc, 0, &msgid);
+ assert_int_equal(msgtype, NC_MSG_RPC);
+
+ /* client subscription */
+ ret = nc_recv_notif_dispatch(client_session, test_notif_clb);
+ assert_int_equal(ret, 0);
+
+ /* create server */
+ ps = nc_ps_new();
+ assert_non_null(ps);
+ nc_ps_add_session(ps, server_session);
+
+ /* server will send a notification */
+ glob_state = 0;
+ ret = pthread_create(&tid, NULL, server_send_notif_thread, NULL);
+ assert_int_equal(ret, 0);
+
+ /* server blocked on RPC */
+ ret = nc_ps_poll(ps, 0, NULL);
+ assert_int_equal(ret, NC_PSPOLL_RPC);
+
+ /* RPC, notification finished fine */
+ assert_int_equal(glob_state, 3);
+
+ /* server finished */
+ ret = pthread_join(tid, NULL);
+ assert_int_equal(ret, 0);
+ nc_ps_free(ps);
+
+ /* client reply */
+ msgtype = nc_recv_reply(client_session, rpc, msgid, 0, 0, &reply);
+ assert_int_equal(msgtype, NC_MSG_REPLY);
+
+ nc_rpc_free(rpc);
+ assert_int_equal(reply->type, NC_RPL_OK);
+ nc_reply_free(reply);
+}
+
+static void
+test_send_recv_notif_10(void **state)
+{
+ (void)state;
+
+ server_session->version = NC_VERSION_10;
+ client_session->version = NC_VERSION_10;
+
+ test_send_recv_notif();
+}
+
+static void
+test_send_recv_notif_11(void **state)
+{
+ (void)state;
+
+ server_session->version = NC_VERSION_11;
+ client_session->version = NC_VERSION_11;
+
+ test_send_recv_notif();
+}
int
main(void)
@@ -336,6 +487,11 @@
module = ly_ctx_load_module(ctx, "ietf-netconf", NULL);
assert_non_null(module);
+ ret = lys_features_enable(module, "candidate");
+ assert_int_equal(ret, 0);
+
+ module = ly_ctx_load_module(ctx, "nc-notifications", NULL);
+ assert_non_null(module);
/* set RPC callbacks */
node = ly_ctx_get_node(module->ctx, NULL, "/ietf-netconf:get", 0);
@@ -346,15 +502,21 @@
assert_non_null(node);
lys_set_private(node, my_getconfig_rpc_clb);
+ node = ly_ctx_get_node(module->ctx, NULL, "/ietf-netconf:commit", 0);
+ assert_non_null(node);
+ lys_set_private(node, my_commit_rpc_clb);
+
nc_server_init(ctx);
const struct CMUnitTest comm[] = {
cmocka_unit_test_setup_teardown(test_send_recv_ok_10, setup_sessions, teardown_sessions),
cmocka_unit_test_setup_teardown(test_send_recv_error_10, setup_sessions, teardown_sessions),
cmocka_unit_test_setup_teardown(test_send_recv_data_10, setup_sessions, teardown_sessions),
+ cmocka_unit_test_setup_teardown(test_send_recv_notif_10, setup_sessions, teardown_sessions),
cmocka_unit_test_setup_teardown(test_send_recv_ok_11, setup_sessions, teardown_sessions),
cmocka_unit_test_setup_teardown(test_send_recv_error_11, setup_sessions, teardown_sessions),
- cmocka_unit_test_setup_teardown(test_send_recv_data_11, setup_sessions, teardown_sessions)
+ cmocka_unit_test_setup_teardown(test_send_recv_data_11, setup_sessions, teardown_sessions),
+ cmocka_unit_test_setup_teardown(test_send_recv_notif_11, setup_sessions, teardown_sessions),
};
ret = cmocka_run_group_tests(comm, NULL, NULL);
diff --git a/tests/test_io.c b/tests/test_io.c
index 28d7890..78a512f 100644
--- a/tests/test_io.c
+++ b/tests/test_io.c
@@ -62,12 +62,8 @@
w->session->version = NC_VERSION_10;
w->session->opts.client.msgid = 999;
w->session->ti_type = NC_TI_FD;
- w->session->ti_lock = malloc(sizeof *w->session->ti_lock);
- pthread_mutex_init(w->session->ti_lock, NULL);
- w->session->ti_cond = malloc(sizeof *w->session->ti_cond);
- pthread_cond_init(w->session->ti_cond, NULL);
- w->session->ti_inuse = malloc(sizeof *w->session->ti_inuse);
- *w->session->ti_inuse = 0;
+ w->session->io_lock = malloc(sizeof *w->session->io_lock);
+ pthread_mutex_init(w->session->io_lock, NULL);
w->session->ti.fd.in = STDIN_FILENO;
w->session->ti.fd.out = STDOUT_FILENO;
@@ -141,6 +137,12 @@
NC_MSG_TYPE type;
w->session->side = NC_SERVER;
+ w->session->opts.server.rpc_lock = malloc(sizeof *w->session->opts.server.rpc_lock);
+ pthread_mutex_init(w->session->opts.server.rpc_lock, NULL);
+ w->session->opts.server.rpc_cond = malloc(sizeof *w->session->opts.server.rpc_cond);
+ pthread_cond_init(w->session->opts.server.rpc_cond, NULL);
+ w->session->opts.server.rpc_inuse = malloc(sizeof *w->session->opts.server.rpc_inuse);
+ *w->session->opts.server.rpc_inuse = 0;
do {
type = nc_send_rpc(w->session, w->rpc, 1000, &msgid);