session CHANGE poll sessions now have state

Mainly so that one invalid session is not
returned several times, but it may also
increase concurrency.

Fixes cesnet/netopeer2#146
diff --git a/src/session_p.h b/src/session_p.h
index 8837e34..8e43c53 100644
--- a/src/session_p.h
+++ b/src/session_p.h
@@ -395,9 +395,18 @@
     } opts;
 };
 
+enum nc_ps_session_state {
+    NC_PS_STATE_NONE = 0,      /**< session is not being worked with */
+    NC_PS_STATE_BUSY,          /**< session is being polled or communicated on (and locked) */
+    NC_PS_STATE_INVALID        /**< session is invalid and was already returned by another poll */
+};
+
 /* ACCESS locked */
 struct nc_pollsession {
-    struct nc_session **sessions;
+    struct {
+        struct nc_session *session;
+        enum nc_ps_session_state state;
+    } *sessions;
     uint16_t session_count;
     uint16_t last_event_session;
 
diff --git a/src/session_server.c b/src/session_server.c
index ba708c3..6d77a08 100644
--- a/src/session_server.c
+++ b/src/session_server.c
@@ -826,7 +826,8 @@
         nc_ps_unlock(ps, q_id, __func__);
         return -1;
     }
-    ps->sessions[ps->session_count - 1] = session;
+    ps->sessions[ps->session_count - 1].session = session;
+    ps->sessions[ps->session_count - 1].state = NC_PS_STATE_NONE;
 
     /* UNLOCK */
     return nc_ps_unlock(ps, q_id, __func__);
@@ -842,7 +843,7 @@
         goto remove;
     }
     for (i = 0; i < ps->session_count; ++i) {
-        if (ps->sessions[i] == session) {
+        if (ps->sessions[i].session == session) {
 remove:
             --ps->session_count;
             if (i < ps->session_count) {
@@ -904,8 +905,8 @@
     }
 
     for (i = 0; i < ps->session_count; ++i) {
-        if (ps->sessions[i]->id == sid) {
-            ret = ps->sessions[i];
+        if (ps->sessions[i].session->id == sid) {
+            ret = ps->sessions[i].session;
             break;
         }
     }
@@ -1150,30 +1151,31 @@
     /* check timeout of all the sessions */
     nc_gettimespec(&ts_cur);
     for (i = 0; i < ps->session_count; ++i) {
-        if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
+        cur_session = ps->sessions[i].session;
+        if ((cur_session->status != NC_STATUS_RUNNING) && (ps->sessions[i].state != NC_PS_STATE_INVALID)) {
             /* when the status change occurred an error was printed, no need to print another */
-            if (!(ps->sessions[i]->flags & NC_SESSION_CALLHOME)) {
-                /* ... so the application should have handled it and removed this session before unless
-                 * this is a Call Home session, when it could not */
-                WRN("Session %u: polling an invalid session.", ps->sessions[i]->id);
-            }
             ret = NC_PSPOLL_SESSION_TERM;
-            if (ps->sessions[i]->term_reason != NC_SESSION_TERM_CLOSED) {
+            if (cur_session->term_reason != NC_SESSION_TERM_CLOSED) {
                 ret |= NC_PSPOLL_SESSION_ERROR;
             }
+            ps->sessions[i].state = NC_PS_STATE_INVALID;
+
             if (session) {
-                *session = ps->sessions[i];
+                *session = cur_session;
             }
             goto ps_unlock_finish;
-        } else if (!(ps->sessions[i]->flags & NC_SESSION_CALLHOME) && !ps->sessions[i]->opts.server.ntf_status
+        } else if (!(cur_session->flags & NC_SESSION_CALLHOME) && !cur_session->opts.server.ntf_status
                 && server_opts.idle_timeout
-                && (ts_cur.tv_sec >= ps->sessions[i]->opts.server.last_rpc + server_opts.idle_timeout)) {
-            ERR("Session %u: session idle timeout elapsed.", ps->sessions[i]->id);
-            ps->sessions[i]->status = NC_STATUS_INVALID;
-            ps->sessions[i]->term_reason = NC_SESSION_TERM_TIMEOUT;
+                && (ts_cur.tv_sec >= cur_session->opts.server.last_rpc + server_opts.idle_timeout)) {
+            ERR("Session %u: session idle timeout elapsed.", cur_session->id);
+            cur_session->status = NC_STATUS_INVALID;
+            cur_session->term_reason = NC_SESSION_TERM_TIMEOUT;
+
             ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+            ps->sessions[i].state = NC_PS_STATE_INVALID;
+
             if (session) {
-                *session = ps->sessions[i];
+                *session = cur_session;
             }
             goto ps_unlock_finish;
         }
@@ -1193,10 +1195,10 @@
             i = j = ps->last_event_session + 1;
         }
         do {
-            cur_session = ps->sessions[i];
+            cur_session = ps->sessions[i].session;
 
             /* SESSION LOCK */
-            if ((cur_session->status == NC_STATUS_RUNNING)
+            if ((cur_session->status == NC_STATUS_RUNNING) && (ps->sessions[i].state == NC_PS_STATE_NONE)
                     && !*cur_session->ti_inuse && ((r = nc_session_lock(cur_session, 0, __func__)))) {
                 /* we go here if we successfully lock the session or there was an error, on timeout we simply skip it */
                 if (r == -1) {
@@ -1204,12 +1206,15 @@
                     goto ps_unlock_finish;
                 }
                 /* damn race condition */
-                if (cur_session->status != NC_STATUS_RUNNING) {
+                if ((cur_session->status != NC_STATUS_RUNNING) || (ps->sessions[i].state != NC_PS_STATE_NONE)) {
                     /* SESSION UNLOCK */
                     nc_session_unlock(cur_session, NC_SESSION_LOCK_TIMEOUT, __func__);
                     goto next_iteration;
                 }
 
+                /* it is being polled */
+                ps->sessions[i].state = NC_PS_STATE_BUSY;
+
                 switch (cur_session->ti_type) {
 #ifdef NC_ENABLED_SSH
                 case NC_TI_LIBSSH:
@@ -1243,20 +1248,24 @@
                                 if ((new->status == NC_STATUS_STARTING) && new->ti.libssh.channel
                                         && (new->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
                                     /* new NETCONF SSH channel */
+                                    ret = NC_PSPOLL_SSH_CHANNEL;
+                                    ps->sessions[i].state = NC_PS_STATE_NONE;
+
                                     if (session) {
                                         *session = cur_session;
                                     }
-                                    ret = NC_PSPOLL_SSH_CHANNEL;
                                     goto session_ps_unlock_finish;
                                 }
                             }
                         }
 
                         /* just some SSH message */
+                        ret = NC_PSPOLL_SSH_MSG;
+                        ps->sessions[i].state = NC_PS_STATE_NONE;
+
                         if (session) {
                             *session = cur_session;
                         }
-                        ret = NC_PSPOLL_SSH_MSG;
                         goto session_ps_unlock_finish;
                     } else {
                         /* we have some application data */
@@ -1273,6 +1282,11 @@
                         if (pfd.fd < 0) {
                             ERRINT;
                             ret = NC_PSPOLL_ERROR;
+                            ps->sessions[i].state = NC_PS_STATE_NONE;
+
+                            if (session) {
+                                *session = cur_session;
+                            }
                             goto session_ps_unlock_finish;
                         }
                         pfd.events = POLLIN;
@@ -1330,6 +1344,11 @@
                 case NC_TI_NONE:
                     ERRINT;
                     ret = NC_PSPOLL_ERROR;
+                    ps->sessions[i].state = NC_PS_STATE_NONE;
+
+                    if (session) {
+                        *session = cur_session;
+                    }
                     goto session_ps_unlock_finish;
                 }
                 /* we have some data, but it may be just an SSH message */
@@ -1342,14 +1361,23 @@
                     ERR("Session %u: %s.", cur_session->id, msg);
                     cur_session->status = NC_STATUS_INVALID;
                     cur_session->term_reason = term_reason;
+
+                    ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+                    ps->sessions[i].state = NC_PS_STATE_INVALID;
+
                     if (session) {
                         *session = cur_session;
                     }
-                    ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
                     goto session_ps_unlock_finish;
                 } else if (poll_ret == -1) {
                     ERR("Session %u: %s.", cur_session->id, msg);
+
                     ret = NC_PSPOLL_ERROR;
+                    ps->sessions[i].state = NC_PS_STATE_INVALID;
+
+                    if (session) {
+                        *session = cur_session;
+                    }
                     goto session_ps_unlock_finish;
                 } else if (poll_ret > 0) {
                     break;
@@ -1363,6 +1391,7 @@
             }
 
 next_iteration:
+            ps->sessions[i].state = NC_PS_STATE_NONE;
             if (i == ps->session_count - 1) {
                 i = 0;
             } else {
@@ -1396,6 +1425,7 @@
     if (ret & (NC_PSPOLL_ERROR | NC_PSPOLL_BAD_RPC)) {
         if (cur_session->status != NC_STATUS_RUNNING) {
             ret |= NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+            ps->sessions[i].state = NC_PS_STATE_INVALID;
         }
         goto session_unlock_finish;
     }
@@ -1411,6 +1441,9 @@
         if (!(cur_session->term_reason & (NC_SESSION_TERM_CLOSED | NC_SESSION_TERM_KILLED))) {
             ret |= NC_PSPOLL_SESSION_ERROR;
         }
+        ps->sessions[i].state = NC_PS_STATE_INVALID;
+    } else {
+        ps->sessions[i].state = NC_PS_STATE_NONE;
     }
 
 session_unlock_finish:
@@ -1447,7 +1480,7 @@
 
     if (all) {
         for (i = 0; i < ps->session_count; i++) {
-            nc_session_free(ps->sessions[i], data_free);
+            nc_session_free(ps->sessions[i].session, data_free);
         }
         free(ps->sessions);
         ps->sessions = NULL;
@@ -1455,8 +1488,8 @@
         ps->last_event_session = 0;
     } else {
         for (i = 0; i < ps->session_count; ) {
-            if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
-                session = ps->sessions[i];
+            if (ps->sessions[i].session->status != NC_STATUS_RUNNING) {
+                session = ps->sessions[i].session;
                 _nc_ps_del_session(ps, NULL, i);
                 nc_session_free(session, data_free);
                 continue;
diff --git a/src/session_server.h b/src/session_server.h
index 99c9f8a..c5f488b 100644
--- a/src/session_server.h
+++ b/src/session_server.h
@@ -252,11 +252,9 @@
 /**
  * @brief Poll sessions and process any received RPCs.
  *
- * All the sessions must be running. Only one event on one session
- * is handled in one function call. If this event is a session termination
- * (#NC_PSPOLL_SESSION_TERM returned), the session must be removed from \p ps
- * before another call. Otherwise no session will be polled and the same
- * return value and \p session will be returned.
+ * Only one event on one session is handled in one function call. If this event
+ * is a session termination (#NC_PSPOLL_SESSION_TERM returned), the session
+ * should be removed from \p ps.
  *
  * @param[in] ps Pollsession structure to use.
  * @param[in] timeout Poll timeout in milliseconds. 0 for non-blocking call, -1 for
diff --git a/src/session_server_ssh.c b/src/session_server_ssh.c
index 86637e6..47a9c85 100644
--- a/src/session_server_ssh.c
+++ b/src/session_server_ssh.c
@@ -1520,7 +1520,7 @@
 {
     uint8_t q_id;
     NC_MSG_TYPE msgtype;
-    struct nc_session *new_session = NULL;
+    struct nc_session *new_session = NULL, *cur_session;
     uint16_t i;
 
     if (!ps) {
@@ -1537,11 +1537,12 @@
     }
 
     for (i = 0; i < ps->session_count; ++i) {
-        if ((ps->sessions[i]->status == NC_STATUS_RUNNING) && (ps->sessions[i]->ti_type == NC_TI_LIBSSH)
-                && ps->sessions[i]->ti.libssh.next) {
+        cur_session = ps->sessions[i].session;
+        if ((cur_session->status == NC_STATUS_RUNNING) && (cur_session->ti_type == NC_TI_LIBSSH)
+                && cur_session->ti.libssh.next) {
             /* an SSH session with more channels */
-            for (new_session = ps->sessions[i]->ti.libssh.next;
-                    new_session != ps->sessions[i];
+            for (new_session = cur_session->ti.libssh.next;
+                    new_session != cur_session;
                     new_session = new_session->ti.libssh.next) {
                 if ((new_session->status == NC_STATUS_STARTING) && new_session->ti.libssh.channel
                         && (new_session->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
@@ -1549,7 +1550,7 @@
                     break;
                 }
             }
-            if (new_session != ps->sessions[i]) {
+            if (new_session != cur_session) {
                 break;
             }