session CHANGE sessions in a pollsession now have a state
Mainly so that an invalid session is not
returned several times by repeated polls,
but it may also increase concurrency.
Fixes cesnet/netopeer2#146
diff --git a/src/session_p.h b/src/session_p.h
index 8837e34..8e43c53 100644
--- a/src/session_p.h
+++ b/src/session_p.h
@@ -395,9 +395,18 @@
} opts;
};
+enum nc_ps_session_state {
+ NC_PS_STATE_NONE = 0, /**< session is idle, it is not being polled or communicated on */
+ NC_PS_STATE_BUSY, /**< session is being polled or communicated on (and its session lock is held) */
+ NC_PS_STATE_INVALID /**< session is invalid and was already returned by a previous poll (not to be returned again) */
+};
+
/* ACCESS locked */
struct nc_pollsession {
- struct nc_session **sessions;
+ struct {
+ struct nc_session *session;
+ enum nc_ps_session_state state;
+ } *sessions;
uint16_t session_count;
uint16_t last_event_session;
diff --git a/src/session_server.c b/src/session_server.c
index ba708c3..6d77a08 100644
--- a/src/session_server.c
+++ b/src/session_server.c
@@ -826,7 +826,8 @@
nc_ps_unlock(ps, q_id, __func__);
return -1;
}
- ps->sessions[ps->session_count - 1] = session;
+ ps->sessions[ps->session_count - 1].session = session;
+ ps->sessions[ps->session_count - 1].state = NC_PS_STATE_NONE;
/* UNLOCK */
return nc_ps_unlock(ps, q_id, __func__);
@@ -842,7 +843,7 @@
goto remove;
}
for (i = 0; i < ps->session_count; ++i) {
- if (ps->sessions[i] == session) {
+ if (ps->sessions[i].session == session) {
remove:
--ps->session_count;
if (i < ps->session_count) {
@@ -904,8 +905,8 @@
}
for (i = 0; i < ps->session_count; ++i) {
- if (ps->sessions[i]->id == sid) {
- ret = ps->sessions[i];
+ if (ps->sessions[i].session->id == sid) {
+ ret = ps->sessions[i].session;
break;
}
}
@@ -1150,30 +1151,31 @@
/* check timeout of all the sessions */
nc_gettimespec(&ts_cur);
for (i = 0; i < ps->session_count; ++i) {
- if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
+ cur_session = ps->sessions[i].session;
+ if ((cur_session->status != NC_STATUS_RUNNING) && (ps->sessions[i].state != NC_PS_STATE_INVALID)) {
/* when the status change occurred an error was printed, no need to print another */
- if (!(ps->sessions[i]->flags & NC_SESSION_CALLHOME)) {
- /* ... so the application should have handled it and removed this session before unless
- * this is a Call Home session, when it could not */
- WRN("Session %u: polling an invalid session.", ps->sessions[i]->id);
- }
ret = NC_PSPOLL_SESSION_TERM;
- if (ps->sessions[i]->term_reason != NC_SESSION_TERM_CLOSED) {
+ if (cur_session->term_reason != NC_SESSION_TERM_CLOSED) {
ret |= NC_PSPOLL_SESSION_ERROR;
}
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+
if (session) {
- *session = ps->sessions[i];
+ *session = cur_session;
}
goto ps_unlock_finish;
- } else if (!(ps->sessions[i]->flags & NC_SESSION_CALLHOME) && !ps->sessions[i]->opts.server.ntf_status
+ } else if (!(cur_session->flags & NC_SESSION_CALLHOME) && !cur_session->opts.server.ntf_status
&& server_opts.idle_timeout
- && (ts_cur.tv_sec >= ps->sessions[i]->opts.server.last_rpc + server_opts.idle_timeout)) {
- ERR("Session %u: session idle timeout elapsed.", ps->sessions[i]->id);
- ps->sessions[i]->status = NC_STATUS_INVALID;
- ps->sessions[i]->term_reason = NC_SESSION_TERM_TIMEOUT;
+ && (ts_cur.tv_sec >= cur_session->opts.server.last_rpc + server_opts.idle_timeout)) {
+ ERR("Session %u: session idle timeout elapsed.", cur_session->id);
+ cur_session->status = NC_STATUS_INVALID;
+ cur_session->term_reason = NC_SESSION_TERM_TIMEOUT;
+
ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+
if (session) {
- *session = ps->sessions[i];
+ *session = cur_session;
}
goto ps_unlock_finish;
}
@@ -1193,10 +1195,10 @@
i = j = ps->last_event_session + 1;
}
do {
- cur_session = ps->sessions[i];
+ cur_session = ps->sessions[i].session;
/* SESSION LOCK */
- if ((cur_session->status == NC_STATUS_RUNNING)
+ if ((cur_session->status == NC_STATUS_RUNNING) && (ps->sessions[i].state == NC_PS_STATE_NONE)
&& !*cur_session->ti_inuse && ((r = nc_session_lock(cur_session, 0, __func__)))) {
/* we go here if we successfully lock the session or there was an error, on timeout we simply skip it */
if (r == -1) {
@@ -1204,12 +1206,15 @@
goto ps_unlock_finish;
}
/* damn race condition */
- if (cur_session->status != NC_STATUS_RUNNING) {
+ if ((cur_session->status != NC_STATUS_RUNNING) || (ps->sessions[i].state != NC_PS_STATE_NONE)) {
/* SESSION UNLOCK */
nc_session_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
goto next_iteration;
}
+ /* it is being polled */
+ ps->sessions[i].state = NC_PS_STATE_BUSY;
+
switch (cur_session->ti_type) {
#ifdef NC_ENABLED_SSH
case NC_TI_LIBSSH:
@@ -1243,20 +1248,24 @@
if ((new->status == NC_STATUS_STARTING) && new->ti.libssh.channel
&& (new->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
/* new NETCONF SSH channel */
+ ret = NC_PSPOLL_SSH_CHANNEL;
+ ps->sessions[i].state = NC_PS_STATE_NONE;
+
if (session) {
*session = cur_session;
}
- ret = NC_PSPOLL_SSH_CHANNEL;
goto session_ps_unlock_finish;
}
}
}
/* just some SSH message */
+ ret = NC_PSPOLL_SSH_MSG;
+ ps->sessions[i].state = NC_PS_STATE_NONE;
+
if (session) {
*session = cur_session;
}
- ret = NC_PSPOLL_SSH_MSG;
goto session_ps_unlock_finish;
} else {
/* we have some application data */
@@ -1273,6 +1282,11 @@
if (pfd.fd < 0) {
ERRINT;
ret = NC_PSPOLL_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_NONE;
+
+ if (session) {
+ *session = cur_session;
+ }
goto session_ps_unlock_finish;
}
pfd.events = POLLIN;
@@ -1330,6 +1344,11 @@
case NC_TI_NONE:
ERRINT;
ret = NC_PSPOLL_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_NONE;
+
+ if (session) {
+ *session = cur_session;
+ }
goto session_ps_unlock_finish;
}
/* we have some data, but it may be just an SSH message */
@@ -1342,14 +1361,23 @@
ERR("Session %u: %s.", cur_session->id, msg);
cur_session->status = NC_STATUS_INVALID;
cur_session->term_reason = term_reason;
+
+ ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+
if (session) {
*session = cur_session;
}
- ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
goto session_ps_unlock_finish;
} else if (poll_ret == -1) {
ERR("Session %u: %s.", cur_session->id, msg);
+
ret = NC_PSPOLL_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+
+ if (session) {
+ *session = cur_session;
+ }
goto session_ps_unlock_finish;
} else if (poll_ret > 0) {
break;
@@ -1363,6 +1391,7 @@
}
next_iteration:
+ ps->sessions[i].state = NC_PS_STATE_NONE;
if (i == ps->session_count - 1) {
i = 0;
} else {
@@ -1396,6 +1425,7 @@
if (ret & (NC_PSPOLL_ERROR | NC_PSPOLL_BAD_RPC)) {
if (cur_session->status != NC_STATUS_RUNNING) {
ret |= NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
}
goto session_unlock_finish;
}
@@ -1411,6 +1441,9 @@
if (!(cur_session->term_reason & (NC_SESSION_TERM_CLOSED | NC_SESSION_TERM_KILLED))) {
ret |= NC_PSPOLL_SESSION_ERROR;
}
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+ } else {
+ ps->sessions[i].state = NC_PS_STATE_NONE;
}
session_unlock_finish:
@@ -1447,7 +1480,7 @@
if (all) {
for (i = 0; i < ps->session_count; i++) {
- nc_session_free(ps->sessions[i], data_free);
+ nc_session_free(ps->sessions[i].session, data_free);
}
free(ps->sessions);
ps->sessions = NULL;
@@ -1455,8 +1488,8 @@
ps->last_event_session = 0;
} else {
for (i = 0; i < ps->session_count; ) {
- if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
- session = ps->sessions[i];
+ if (ps->sessions[i].session->status != NC_STATUS_RUNNING) {
+ session = ps->sessions[i].session;
_nc_ps_del_session(ps, NULL, i);
nc_session_free(session, data_free);
continue;
diff --git a/src/session_server.h b/src/session_server.h
index 99c9f8a..c5f488b 100644
--- a/src/session_server.h
+++ b/src/session_server.h
@@ -252,11 +252,9 @@
/**
* @brief Poll sessions and process any received RPCs.
*
- * All the sessions must be running. Only one event on one session
- * is handled in one function call. If this event is a session termination
- * (#NC_PSPOLL_SESSION_TERM returned), the session must be removed from \p ps
- * before another call. Otherwise no session will be polled and the same
- * return value and \p session will be returned.
+ * Only one event on one session is handled in one function call. If this event
+ * is a session termination (#NC_PSPOLL_SESSION_TERM returned), the session
+ * should be removed from \p ps.
*
* @param[in] ps Pollsession structure to use.
* @param[in] timeout Poll timeout in milliseconds. 0 for non-blocking call, -1 for
diff --git a/src/session_server_ssh.c b/src/session_server_ssh.c
index 86637e6..47a9c85 100644
--- a/src/session_server_ssh.c
+++ b/src/session_server_ssh.c
@@ -1520,7 +1520,7 @@
{
uint8_t q_id;
NC_MSG_TYPE msgtype;
- struct nc_session *new_session = NULL;
+ struct nc_session *new_session = NULL, *cur_session;
uint16_t i;
if (!ps) {
@@ -1537,11 +1537,12 @@
}
for (i = 0; i < ps->session_count; ++i) {
- if ((ps->sessions[i]->status == NC_STATUS_RUNNING) && (ps->sessions[i]->ti_type == NC_TI_LIBSSH)
- && ps->sessions[i]->ti.libssh.next) {
+ cur_session = ps->sessions[i].session;
+ if ((cur_session->status == NC_STATUS_RUNNING) && (cur_session->ti_type == NC_TI_LIBSSH)
+ && cur_session->ti.libssh.next) {
/* an SSH session with more channels */
- for (new_session = ps->sessions[i]->ti.libssh.next;
- new_session != ps->sessions[i];
+ for (new_session = cur_session->ti.libssh.next;
+ new_session != cur_session;
new_session = new_session->ti.libssh.next) {
if ((new_session->status == NC_STATUS_STARTING) && new_session->ti.libssh.channel
&& (new_session->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
@@ -1549,7 +1550,7 @@
break;
}
}
- if (new_session != ps->sessions[i]) {
+ if (new_session != cur_session) {
break;
}