server session CHANGE make ps poll transport-specific

This means it must be active polling, but should
be always reliable.

Fixes cesnet/netopeer2#14
diff --git a/src/session.c b/src/session.c
index 1d90e91..2ac3a46 100644
--- a/src/session.c
+++ b/src/session.c
@@ -64,6 +64,30 @@
 #endif
 }
 
+/* ts1 < ts2, returns milliseconds */
+uint32_t
+nc_difftimespec(struct timespec *ts1, struct timespec *ts2)
+{
+    uint64_t nsec_diff = 0;
+
+    if (ts1->tv_nsec > ts2->tv_nsec) {
+        ts2->tv_nsec += 1000000000L;
+        --ts2->tv_sec;
+    }
+
+    if (ts1->tv_sec <= ts2->tv_sec) {
+        nsec_diff += (ts2->tv_sec - ts1->tv_sec) * 1000000000L;
+    } else {
+        ERRINT;
+    }
+
+    if (ts1->tv_nsec < ts2->tv_nsec) {
+        nsec_diff += ts2->tv_nsec - ts1->tv_nsec;
+    }
+
+    return (nsec_diff ? nsec_diff / 1000000L : 0);
+}
+
 #ifndef HAVE_PTHREAD_MUTEX_TIMEDLOCK
 int
 pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *abstime)
diff --git a/src/session_p.h b/src/session_p.h
index 5358bb8..4f3e790 100644
--- a/src/session_p.h
+++ b/src/session_p.h
@@ -295,9 +295,9 @@
 
 /* ACCESS locked */
 struct nc_pollsession {
-    struct pollfd *pfds;
     struct nc_session **sessions;
     uint16_t session_count;
+    uint16_t last_event_session;
 
     pthread_cond_t cond;
     pthread_mutex_t lock;
@@ -321,6 +321,8 @@
 
 int nc_gettimespec(struct timespec *ts);
 
+uint32_t nc_difftimespec(struct timespec *ts1, struct timespec *ts2);
+
 int nc_timedlock(pthread_mutex_t *lock, int timeout, const char *func);
 
 int nc_ps_lock(struct nc_pollsession *ps, uint8_t *id, const char *func);
@@ -487,24 +489,6 @@
  */
 int nc_sshcb_msg(ssh_session sshsession, ssh_message msg, void *data);
 
-/**
- * @brief Inspect what exactly happened if a SSH session socket poll
- * returned POLLIN.
- *
- * @param[in] session NETCONF session communicating on the socket.
- * @param[in] timeout Timeout for locking ti_lock.
- * @param[in] ssh_message Whether to also check for standard SSH messages or just do
- *            application data poll.
- * @return NC_PSPOLL_TIMEOUT,
- *         NC_PSPOLL_RPC (has new data),
- *         NC_PSPOLL_PENDING (other channel has data),
- *         NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR,
- *         NC_PSPOLL_SSH_MSG,
- *         NC_PSPOLL_SSH_CHANNEL,
- *         NC_PSPOLL_ERROR.
- */
-int nc_ssh_pollin(struct nc_session *session, int timeout, int ssh_message);
-
 void nc_server_ssh_clear_opts(struct nc_server_ssh_opts *opts);
 
 void nc_client_ssh_destroy_opts(void);
diff --git a/src/session_server.c b/src/session_server.c
index a3977ff..61f954d 100644
--- a/src/session_server.c
+++ b/src/session_server.c
@@ -703,7 +703,6 @@
         ERR("FATAL: Freeing a pollsession structure that is currently being worked with!");
     }
 
-    free(ps->pfds);
     free(ps->sessions);
     pthread_mutex_destroy(&ps->lock);
     pthread_cond_destroy(&ps->cond);
@@ -730,40 +729,13 @@
     }
 
     ++ps->session_count;
-    ps->pfds = nc_realloc(ps->pfds, ps->session_count * sizeof *ps->pfds);
     ps->sessions = nc_realloc(ps->sessions, ps->session_count * sizeof *ps->sessions);
-    if (!ps->pfds || !ps->sessions) {
+    if (!ps->sessions) {
         ERRMEM;
         /* UNLOCK */
         nc_ps_unlock(ps, q_id, __func__);
         return -1;
     }
-
-    switch (session->ti_type) {
-    case NC_TI_FD:
-        ps->pfds[ps->session_count - 1].fd = session->ti.fd.in;
-        break;
-
-#ifdef NC_ENABLED_SSH
-    case NC_TI_LIBSSH:
-        ps->pfds[ps->session_count - 1].fd = ssh_get_fd(session->ti.libssh.session);
-        break;
-#endif
-
-#ifdef NC_ENABLED_TLS
-    case NC_TI_OPENSSL:
-        ps->pfds[ps->session_count - 1].fd = SSL_get_rfd(session->ti.tls);
-        break;
-#endif
-
-    default:
-        ERRINT;
-        /* UNLOCK */
-        nc_ps_unlock(ps, q_id, __func__);
-        return -1;
-    }
-    ps->pfds[ps->session_count - 1].events = POLLIN;
-    ps->pfds[ps->session_count - 1].revents = 0;
     ps->sessions[ps->session_count - 1] = session;
 
     /* UNLOCK */
@@ -785,12 +757,12 @@
             --ps->session_count;
             if (i < ps->session_count) {
                 ps->sessions[i] = ps->sessions[ps->session_count];
-                memcpy(&ps->pfds[i], &ps->pfds[ps->session_count], sizeof *ps->pfds);
+                if (ps->last_event_session == i) {
+                    ps->last_event_session = 0;
+                }
             } else if (!ps->session_count) {
                 free(ps->sessions);
                 ps->sessions = NULL;
-                free(ps->pfds);
-                ps->pfds = NULL;
             }
             return 0;
         }
@@ -1007,25 +979,32 @@
 API int
 nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
 {
-    int ret;
+    int ret, r, poll_ret;
     uint8_t q_id;
-    uint16_t i;
-    time_t cur_time;
+    uint16_t i, j;
+    char msg[256];
+    NC_SESSION_TERM_REASON term_reason;
+    struct pollfd pfd;
+    struct timespec begin_ts, cur_ts;
     struct nc_session *cur_session;
     struct nc_server_rpc *rpc = NULL;
+#ifdef NC_ENABLED_SSH
+    struct nc_session *new;
+#endif
 
     if (!ps || !ps->session_count) {
         ERRARG("ps");
         return NC_PSPOLL_ERROR;
     }
 
-    cur_time = time(NULL);
+    nc_gettimespec(&begin_ts);
 
     /* LOCK */
     if (nc_ps_lock(ps, &q_id, __func__)) {
         return NC_PSPOLL_ERROR;
     }
 
+    /* check that all session are fine */
     for (i = 0; i < ps->session_count; ++i) {
         if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
             ERR("Session %u: session not running.", ps->sessions[i]->id);
@@ -1037,7 +1016,7 @@
         }
 
         /* TODO invalidate only sessions without subscription */
-        if (server_opts.idle_timeout && (cur_time >= ps->sessions[i]->last_rpc + server_opts.idle_timeout)) {
+        if (server_opts.idle_timeout && (begin_ts.tv_sec >= ps->sessions[i]->last_rpc + server_opts.idle_timeout)) {
             ERR("Session %u: session idle timeout elapsed.", ps->sessions[i]->id);
             ps->sessions[i]->status = NC_STATUS_INVALID;
             ps->sessions[i]->term_reason = NC_SESSION_TERM_TIMEOUT;
@@ -1047,122 +1026,211 @@
             }
             goto finish;
         }
-
-        if (ps->pfds[i].revents) {
-            break;
-        }
     }
 
-    if (i == ps->session_count) {
-#ifdef NC_ENABLED_SSH
-retry_poll:
-#endif
-        /* no leftover event */
-        i = 0;
-        ret = poll(ps->pfds, ps->session_count, timeout);
-        if (ret < 0) {
-            ERR("Poll failed (%s).", strerror(errno));
-            ret = NC_PSPOLL_ERROR;
-            goto finish;
-        } else if (!ret) {
-            ret = NC_PSPOLL_TIMEOUT;
-            goto finish;
+    /* poll on all the sessions one-by-one */
+    do {
+        /* loop from i to j */
+        if (ps->last_event_session == ps->session_count - 1) {
+            i = j = 0;
+        } else {
+            i = j = ps->last_event_session + 1;
         }
-    }
+        do {
+            cur_session = ps->sessions[i];
 
-    /* find the first fd with POLLIN, we don't care if there are more now */
-    for (; i < ps->session_count; ++i) {
-        if (ps->pfds[i].revents & (POLLHUP | POLLNVAL)) {
-            ERR("Session %u: communication socket unexpectedly closed.", ps->sessions[i]->id);
-            ps->sessions[i]->status = NC_STATUS_INVALID;
-            ps->sessions[i]->term_reason = NC_SESSION_TERM_DROPPED;
-            ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
-            if (session) {
-                *session = ps->sessions[i];
-            }
-            goto finish;
-        } else if (ps->pfds[i].revents & POLLERR) {
-            ERR("Session %u: communication socket error.", ps->sessions[i]->id);
-            ps->sessions[i]->status = NC_STATUS_INVALID;
-            ps->sessions[i]->term_reason = NC_SESSION_TERM_OTHER;
-            ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
-            if (session) {
-                *session = ps->sessions[i];
-            }
-            goto finish;
-        } else if (ps->pfds[i].revents & POLLIN) {
+            switch (cur_session->ti_type) {
 #ifdef NC_ENABLED_SSH
-            if (ps->sessions[i]->ti_type == NC_TI_LIBSSH) {
-                uint16_t j;
-
-                /* things are not that simple with SSH... */
-                ret = nc_ssh_pollin(ps->sessions[i], timeout, 1);
-
-                /* clear POLLIN on sessions sharing this session's SSH session */
-                if (ret & (NC_PSPOLL_RPC | NC_PSPOLL_SSH_MSG | NC_PSPOLL_SSH_CHANNEL)) {
-                    for (j = i + 1; j < ps->session_count; ++j) {
-                        if (ps->pfds[j].fd == ps->pfds[i].fd) {
-                            ps->pfds[j].revents = 0;
-                        }
+            case NC_TI_LIBSSH:
+                r = ssh_channel_poll_timeout(cur_session->ti.libssh.channel, 0, 0);
+                if (r < 1) {
+                    if (r == SSH_EOF) {
+                        sprintf(msg, "SSH channel unexpectedly closed");
+                        term_reason = NC_SESSION_TERM_DROPPED;
+                        poll_ret = -2;
+                    } else if (r == SSH_ERROR) {
+                        sprintf(msg, "SSH channel poll error (%s)", ssh_get_error(cur_session->ti.libssh.session));
+                        poll_ret = -1;
+                    } else {
+                        poll_ret = 0;
                     }
+                    break;
                 }
+                /* we have some data, but it may be just an SSH message */
 
-                /* SSH message only */
-                if (!(ret & (NC_PSPOLL_RPC | NC_PSPOLL_PENDING))) {
-                    ps->pfds[i].revents = 0;
+                r = nc_timedlock(cur_session->ti_lock, timeout, __func__);
+                if (r < 0) {
                     if (session) {
-                        *session = ps->sessions[i];
+                        *session = cur_session;
                     }
+                    ret = NC_PSPOLL_ERROR;
                     goto finish;
-
-                /* event occurred on some other channel */
-                } else if (ret & NC_PSPOLL_PENDING) {
-                    ps->pfds[i].revents = 0;
-                    if (i == ps->session_count - 1) {
-                        /* last session and it is not the right channel, ... */
-                        if (!timeout) {
-                            /* ... timeout is 0, so that is it */
-                            ret = NC_PSPOLL_TIMEOUT;
-                            goto finish;
-                        }
-                        /* ... retry polling reasonable time apart ... */
-                        usleep(NC_TIMEOUT_STEP);
-                        if (timeout > 0) {
-                            /* ... and decrease timeout, if not -1 */
-                            timeout -= NC_TIMEOUT_STEP * 1000;
-                        }
-                        goto retry_poll;
+                } else if (!r) {
+                    if (session) {
+                        *session = cur_session;
                     }
-                    /* check other sessions */
-                    continue;
+                    ret = NC_PSPOLL_TIMEOUT;
+                    goto finish;
                 }
+                r = ssh_execute_message_callbacks(cur_session->ti.libssh.session);
+                pthread_mutex_unlock(cur_session->ti_lock);
+
+                if (r != SSH_OK) {
+                    sprintf(msg, "failed to receive SSH messages (%s)", ssh_get_error(cur_session->ti.libssh.session));
+                    term_reason = NC_SESSION_TERM_OTHER;
+                    poll_ret = -1;
+                } else if (cur_session->flags & NC_SESSION_SSH_NEW_MSG) {
+                    /* new SSH message */
+                    cur_session->flags &= ~NC_SESSION_SSH_NEW_MSG;
+                    if (cur_session->ti.libssh.next) {
+                        for (new = cur_session->ti.libssh.next; new != cur_session; new = new->ti.libssh.next) {
+                            if ((new->status == NC_STATUS_STARTING) && new->ti.libssh.channel
+                                    && (new->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
+                                /* new NETCONF SSH channel */
+                                if (session) {
+                                    *session = cur_session;
+                                }
+                                ret = NC_PSPOLL_SSH_CHANNEL;
+                                goto finish;
+                            }
+                        }
+                    }
+
+                    /* just some SSH message */
+                    if (session) {
+                        *session = cur_session;
+                    }
+                    ret = NC_PSPOLL_SSH_MSG;
+                    goto finish;
+                } else {
+                    /* we have some application data */
+                    poll_ret = 1;
+                }
+                break;
+#endif
+#ifdef NC_ENABLED_TLS
+            case NC_TI_OPENSSL:
+                r = SSL_pending(cur_session->ti.tls);
+                if (!r) {
+                    /* no data pending in the SSL buffer, poll fd */
+                    pfd.fd = SSL_get_rfd(cur_session->ti.tls);
+                    if (pfd.fd < 0) {
+                        ERRINT;
+                        ret = NC_PSPOLL_ERROR;
+                        goto finish;
+                    }
+                    pfd.events = POLLIN;
+                    pfd.revents = 0;
+                    r = poll(&pfd, 1, 0);
+
+                    if (r < 0) {
+                        sprintf(msg, "poll failed (%s)", strerror(errno));
+                        poll_ret = -1;
+                    } else if (r > 0) {
+                        if (pfd.revents & (POLLHUP | POLLNVAL)) {
+                            sprintf(msg, "communication socket unexpectedly closed");
+                            term_reason = NC_SESSION_TERM_DROPPED;
+                            poll_ret = -2;
+                        } else if (pfd.revents & POLLERR) {
+                            sprintf(msg, "communication socket error");
+                            term_reason = NC_SESSION_TERM_OTHER;
+                            poll_ret = -2;
+                        } else {
+                            poll_ret = 1;
+                        }
+                    } else {
+                        poll_ret = 0;
+                    }
+                } else {
+                    poll_ret = 1;
+                }
+                break;
+#endif
+            case NC_TI_FD:
+                pfd.fd = cur_session->ti.fd.in;
+                pfd.events = POLLIN;
+                pfd.revents = 0;
+                r = poll(&pfd, 1, 0);
+
+                if (r < 0) {
+                    sprintf(msg, "poll failed (%s)", strerror(errno));
+                    poll_ret = 0;
+                } else if (r > 0) {
+                    if (pfd.revents & (POLLHUP | POLLNVAL)) {
+                        sprintf(msg, "communication socket unexpectedly closed");
+                        term_reason = NC_SESSION_TERM_DROPPED;
+                        poll_ret = -2;
+                    } else if (pfd.revents & POLLERR) {
+                        sprintf(msg, "communication socket error");
+                        term_reason = NC_SESSION_TERM_OTHER;
+                        poll_ret = -2;
+                    } else {
+                        poll_ret = 1;
+                    }
+                } else {
+                    poll_ret = 0;
+                }
+                break;
+            case NC_TI_NONE:
+                ERRINT;
+                ret = NC_PSPOLL_ERROR;
+                goto finish;
             }
-#endif /* NC_ENABLED_SSH */
 
-            /* we are going to process it now */
-            ps->pfds[i].revents = 0;
-            break;
+             /* here: poll_ret == -2 - session error, session terminated,
+            *       poll_ret == -1 - generic error,
+             *       poll_ret == 0 - nothing to read,
+             *       poll_ret > 0 - data available */
+            if (poll_ret == -2) {
+                ERR("Session %u: %s.", cur_session->id, msg);
+                cur_session->status = NC_STATUS_INVALID;
+                cur_session->term_reason = term_reason;
+                if (session) {
+                    *session = cur_session;
+                }
+                ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+                goto finish;
+            } else if (poll_ret == -1) {
+                ERR("Session %u: %s.", cur_session->id, msg);
+                ret = NC_PSPOLL_ERROR;
+                goto finish;
+            } else if (poll_ret > 0) {
+                break;
+            }
+
+            /* next iteration */
+            if (i == ps->session_count - 1) {
+                i = 0;
+            } else {
+                ++i;
+            }
+        } while (i != j);
+
+        /* no event */
+        if (!poll_ret && (timeout > -1)) {
+            usleep(NC_TIMEOUT_STEP);
+
+            nc_gettimespec(&cur_ts);
+            /* final timeout */
+            if (nc_difftimespec(&begin_ts, &cur_ts) >= (unsigned)timeout) {
+                ret = NC_PSPOLL_TIMEOUT;
+                goto finish;
+            }
         }
-    }
-
-    if (i == ps->session_count) {
-        ERRINT;
-        ret = NC_PSPOLL_ERROR;
-        goto finish;
-    }
+    } while (!poll_ret);
 
     /* this is the session with some data available for reading */
-    cur_session = ps->sessions[i];
     if (session) {
         *session = cur_session;
     }
+    ps->last_event_session = i;
 
     /* reading an RPC and sending a reply must be atomic (no other RPC should be read) */
-    ret = nc_timedlock(cur_session->ti_lock, timeout, __func__);
-    if (ret < 0) {
+    r = nc_timedlock(cur_session->ti_lock, timeout, __func__);
+    if (r < 0) {
         ret = NC_PSPOLL_ERROR;
         goto finish;
-    } else if (!ret) {
+    } else if (!r) {
         ret = NC_PSPOLL_TIMEOUT;
         goto finish;
     }
@@ -1191,25 +1259,6 @@
 
     nc_server_rpc_free(rpc, server_opts.ctx);
 
-#ifdef NC_ENABLED_SSH
-    /* is there any data received but not processed? */
-    if (cur_session->ti_type == NC_TI_LIBSSH) {
-        /* ignore errors */
-        if (nc_ssh_pollin(cur_session, 0, 0) == NC_PSPOLL_RPC) {
-            ps->pfds[i].revents = POLLIN;
-            ret |= NC_PSPOLL_PENDING;
-        }
-    }
-#endif /* NC_ENABLED_SSH */
-
-    /* is there some other socket waiting? */
-    for (++i; i < ps->session_count; ++i) {
-        if (ps->pfds[i].revents) {
-            ret |= NC_PSPOLL_PENDING;
-            break;
-        }
-    }
-
 finish:
     /* UNLOCK */
     nc_ps_unlock(ps, q_id, __func__);
@@ -1239,9 +1288,8 @@
         }
         free(ps->sessions);
         ps->sessions = NULL;
-        free(ps->pfds);
-        ps->pfds = NULL;
         ps->session_count = 0;
+        ps->last_event_session = 0;
     } else {
         for (i = 0; i < ps->session_count; ) {
             if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
diff --git a/src/session_server.h b/src/session_server.h
index eff619a..dc0e556 100644
--- a/src/session_server.h
+++ b/src/session_server.h
@@ -234,12 +234,11 @@
 #define NC_PSPOLL_REPLY_ERROR 0x0008   /**< Response to the RPC was a \<rpc-reply\> of type error. */
 #define NC_PSPOLL_SESSION_TERM 0x0010  /**< Some session was terminated. */
 #define NC_PSPOLL_SESSION_ERROR 0x0020 /**< Some session was terminated incorrectly (not by a \<close-session\> or \<kill-session\> RPC). */
-#define NC_PSPOLL_PENDING 0x0040       /**< Unhandled pending events on other session. */
-#define NC_PSPOLL_ERROR 0x0080         /**< Other fatal errors (they are printed). */
+#define NC_PSPOLL_ERROR 0x0040         /**< Other fatal errors (they are printed). */
 
 #ifdef NC_ENABLED_SSH
-#   define NC_PSPOLL_SSH_MSG 0x0100       /**< SSH message received (and processed, if relevant, only with SSH support). */
-#   define NC_PSPOLL_SSH_CHANNEL 0x0200   /**< New SSH channel opened on an existing session (only with SSH support). */
+#   define NC_PSPOLL_SSH_MSG 0x0080       /**< SSH message received (and processed, if relevant, only with SSH support). */
+#   define NC_PSPOLL_SSH_CHANNEL 0x0100   /**< New SSH channel opened on an existing session (only with SSH support). */
 #endif
 
 /**
diff --git a/src/session_server_ssh.c b/src/session_server_ssh.c
index 0fe95d9..56da365 100644
--- a/src/session_server_ssh.c
+++ b/src/session_server_ssh.c
@@ -1047,71 +1047,6 @@
     return 0;
 }
 
-int
-nc_ssh_pollin(struct nc_session *session, int timeout, int ssh_message)
-{
-    int ret;
-    struct nc_session *new;
-
-    if (ssh_message) {
-        ret = nc_timedlock(session->ti_lock, timeout, __func__);
-        if (ret < 0) {
-            return NC_PSPOLL_ERROR;
-        } else if (!ret) {
-            return NC_PSPOLL_TIMEOUT;
-        }
-
-        ret = ssh_execute_message_callbacks(session->ti.libssh.session);
-        pthread_mutex_unlock(session->ti_lock);
-
-        if (ret != SSH_OK) {
-            ERR("Session %u: failed to receive SSH messages (%s).", session->id,
-                ssh_get_error(session->ti.libssh.session));
-            session->status = NC_STATUS_INVALID;
-            session->term_reason = NC_SESSION_TERM_OTHER;
-            return NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
-        }
-
-        /* new SSH message */
-        if (session->flags & NC_SESSION_SSH_NEW_MSG) {
-            session->flags &= ~NC_SESSION_SSH_NEW_MSG;
-            if (session->ti.libssh.next) {
-                for (new = session->ti.libssh.next; new != session; new = new->ti.libssh.next) {
-                    if ((new->status == NC_STATUS_STARTING) && new->ti.libssh.channel
-                            && (new->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
-                        /* new NETCONF SSH channel */
-                        return NC_PSPOLL_SSH_CHANNEL;
-                    }
-                }
-            }
-
-            /* just some SSH message */
-            return NC_PSPOLL_SSH_MSG;
-        }
-    }
-
-    /* no new SSH message, maybe NETCONF data? */
-    ret = ssh_channel_poll_timeout(session->ti.libssh.channel, 0, 0);
-    /* not this one */
-    if (!ret) {
-        return NC_PSPOLL_PENDING;
-    } else if (ret == SSH_ERROR) {
-        ERR("Session %u: SSH channel error (%s).", session->id,
-            ssh_get_error(session->ti.libssh.session));
-        session->status = NC_STATUS_INVALID;
-        session->term_reason = NC_SESSION_TERM_OTHER;
-        return NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
-    } else if (ret == SSH_EOF) {
-        ERR("Session %u: communication channel unexpectedly closed (libssh).",
-            session->id);
-        session->status = NC_STATUS_INVALID;
-        session->term_reason = NC_SESSION_TERM_DROPPED;
-        return NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
-    }
-
-    return NC_PSPOLL_RPC;
-}
-
 API NC_MSG_TYPE
 nc_connect_callhome_ssh(const char *host, uint16_t port, struct nc_session **session)
 {