session CHANGE poll sessions now have state
Mainly so that one invalid session is not
returned several times, but it may also
increase concurrency.
Fixes cesnet/netopeer2#146
diff --git a/src/session_server.c b/src/session_server.c
index ba708c3..6d77a08 100644
--- a/src/session_server.c
+++ b/src/session_server.c
@@ -826,7 +826,8 @@
nc_ps_unlock(ps, q_id, __func__);
return -1;
}
- ps->sessions[ps->session_count - 1] = session;
+ ps->sessions[ps->session_count - 1].session = session;
+ ps->sessions[ps->session_count - 1].state = NC_PS_STATE_NONE;
/* UNLOCK */
return nc_ps_unlock(ps, q_id, __func__);
@@ -842,7 +843,7 @@
goto remove;
}
for (i = 0; i < ps->session_count; ++i) {
- if (ps->sessions[i] == session) {
+ if (ps->sessions[i].session == session) {
remove:
--ps->session_count;
if (i < ps->session_count) {
@@ -904,8 +905,8 @@
}
for (i = 0; i < ps->session_count; ++i) {
- if (ps->sessions[i]->id == sid) {
- ret = ps->sessions[i];
+ if (ps->sessions[i].session->id == sid) {
+ ret = ps->sessions[i].session;
break;
}
}
@@ -1150,30 +1151,31 @@
/* check timeout of all the sessions */
nc_gettimespec(&ts_cur);
for (i = 0; i < ps->session_count; ++i) {
- if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
+ cur_session = ps->sessions[i].session;
+ if ((cur_session->status != NC_STATUS_RUNNING) && (ps->sessions[i].state != NC_PS_STATE_INVALID)) {
/* when the status change occurred an error was printed, no need to print another */
- if (!(ps->sessions[i]->flags & NC_SESSION_CALLHOME)) {
- /* ... so the application should have handled it and removed this session before unless
- * this is a Call Home session, when it could not */
- WRN("Session %u: polling an invalid session.", ps->sessions[i]->id);
- }
ret = NC_PSPOLL_SESSION_TERM;
- if (ps->sessions[i]->term_reason != NC_SESSION_TERM_CLOSED) {
+ if (cur_session->term_reason != NC_SESSION_TERM_CLOSED) {
ret |= NC_PSPOLL_SESSION_ERROR;
}
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+
if (session) {
- *session = ps->sessions[i];
+ *session = cur_session;
}
goto ps_unlock_finish;
- } else if (!(ps->sessions[i]->flags & NC_SESSION_CALLHOME) && !ps->sessions[i]->opts.server.ntf_status
+ } else if (!(cur_session->flags & NC_SESSION_CALLHOME) && !cur_session->opts.server.ntf_status
&& server_opts.idle_timeout
- && (ts_cur.tv_sec >= ps->sessions[i]->opts.server.last_rpc + server_opts.idle_timeout)) {
- ERR("Session %u: session idle timeout elapsed.", ps->sessions[i]->id);
- ps->sessions[i]->status = NC_STATUS_INVALID;
- ps->sessions[i]->term_reason = NC_SESSION_TERM_TIMEOUT;
+ && (ts_cur.tv_sec >= cur_session->opts.server.last_rpc + server_opts.idle_timeout)) {
+ ERR("Session %u: session idle timeout elapsed.", cur_session->id);
+ cur_session->status = NC_STATUS_INVALID;
+ cur_session->term_reason = NC_SESSION_TERM_TIMEOUT;
+
ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+
if (session) {
- *session = ps->sessions[i];
+ *session = cur_session;
}
goto ps_unlock_finish;
}
@@ -1193,10 +1195,10 @@
i = j = ps->last_event_session + 1;
}
do {
- cur_session = ps->sessions[i];
+ cur_session = ps->sessions[i].session;
/* SESSION LOCK */
- if ((cur_session->status == NC_STATUS_RUNNING)
+ if ((cur_session->status == NC_STATUS_RUNNING) && (ps->sessions[i].state == NC_PS_STATE_NONE)
&& !*cur_session->ti_inuse && ((r = nc_session_lock(cur_session, 0, __func__)))) {
/* we go here if we successfully lock the session or there was an error, on timeout we simply skip it */
if (r == -1) {
@@ -1204,12 +1206,15 @@
goto ps_unlock_finish;
}
/* damn race condition */
- if (cur_session->status != NC_STATUS_RUNNING) {
+ if ((cur_session->status != NC_STATUS_RUNNING) || (ps->sessions[i].state != NC_PS_STATE_NONE)) {
/* SESSION UNLOCK */
nc_session_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
goto next_iteration;
}
+ /* it is being polled */
+ ps->sessions[i].state = NC_PS_STATE_BUSY;
+
switch (cur_session->ti_type) {
#ifdef NC_ENABLED_SSH
case NC_TI_LIBSSH:
@@ -1243,20 +1248,24 @@
if ((new->status == NC_STATUS_STARTING) && new->ti.libssh.channel
&& (new->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
/* new NETCONF SSH channel */
+ ret = NC_PSPOLL_SSH_CHANNEL;
+ ps->sessions[i].state = NC_PS_STATE_NONE;
+
if (session) {
*session = cur_session;
}
- ret = NC_PSPOLL_SSH_CHANNEL;
goto session_ps_unlock_finish;
}
}
}
/* just some SSH message */
+ ret = NC_PSPOLL_SSH_MSG;
+ ps->sessions[i].state = NC_PS_STATE_NONE;
+
if (session) {
*session = cur_session;
}
- ret = NC_PSPOLL_SSH_MSG;
goto session_ps_unlock_finish;
} else {
/* we have some application data */
@@ -1273,6 +1282,11 @@
if (pfd.fd < 0) {
ERRINT;
ret = NC_PSPOLL_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_NONE;
+
+ if (session) {
+ *session = cur_session;
+ }
goto session_ps_unlock_finish;
}
pfd.events = POLLIN;
@@ -1330,6 +1344,11 @@
case NC_TI_NONE:
ERRINT;
ret = NC_PSPOLL_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_NONE;
+
+ if (session) {
+ *session = cur_session;
+ }
goto session_ps_unlock_finish;
}
/* we have some data, but it may be just an SSH message */
@@ -1342,14 +1361,23 @@
ERR("Session %u: %s.", cur_session->id, msg);
cur_session->status = NC_STATUS_INVALID;
cur_session->term_reason = term_reason;
+
+ ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+
if (session) {
*session = cur_session;
}
- ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
goto session_ps_unlock_finish;
} else if (poll_ret == -1) {
ERR("Session %u: %s.", cur_session->id, msg);
+
ret = NC_PSPOLL_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+
+ if (session) {
+ *session = cur_session;
+ }
goto session_ps_unlock_finish;
} else if (poll_ret > 0) {
break;
@@ -1363,6 +1391,7 @@
}
next_iteration:
+ ps->sessions[i].state = NC_PS_STATE_NONE;
if (i == ps->session_count - 1) {
i = 0;
} else {
@@ -1396,6 +1425,7 @@
if (ret & (NC_PSPOLL_ERROR | NC_PSPOLL_BAD_RPC)) {
if (cur_session->status != NC_STATUS_RUNNING) {
ret |= NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
}
goto session_unlock_finish;
}
@@ -1411,6 +1441,9 @@
if (!(cur_session->term_reason & (NC_SESSION_TERM_CLOSED | NC_SESSION_TERM_KILLED))) {
ret |= NC_PSPOLL_SESSION_ERROR;
}
+ ps->sessions[i].state = NC_PS_STATE_INVALID;
+ } else {
+ ps->sessions[i].state = NC_PS_STATE_NONE;
}
session_unlock_finish:
@@ -1447,7 +1480,7 @@
if (all) {
for (i = 0; i < ps->session_count; i++) {
- nc_session_free(ps->sessions[i], data_free);
+ nc_session_free(ps->sessions[i].session, data_free);
}
free(ps->sessions);
ps->sessions = NULL;
@@ -1455,8 +1488,8 @@
ps->last_event_session = 0;
} else {
for (i = 0; i < ps->session_count; ) {
- if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
- session = ps->sessions[i];
+ if (ps->sessions[i].session->status != NC_STATUS_RUNNING) {
+ session = ps->sessions[i].session;
_nc_ps_del_session(ps, NULL, i);
nc_session_free(session, data_free);
continue;