server session CHANGE make ps poll transport-specific
This means it must use active polling, but it should
always be reliable.
Fixes cesnet/netopeer2#14
diff --git a/src/session.c b/src/session.c
index 1d90e91..2ac3a46 100644
--- a/src/session.c
+++ b/src/session.c
@@ -64,6 +64,30 @@
#endif
}
+/* Elapsed time from ts1 to ts2 (expects ts1 <= ts2) in whole milliseconds.
+ * Neither timestamp is modified; 0 is returned (after ERRINT) if ts1 > ts2. */
+uint32_t
+nc_difftimespec(struct timespec *ts1, struct timespec *ts2)
+{
+    /* 64-bit locals so that a 32-bit time_t/long cannot overflow */
+    int64_t sec_diff = (int64_t)ts2->tv_sec - (int64_t)ts1->tv_sec;
+    int64_t nsec_diff = (int64_t)ts2->tv_nsec - (int64_t)ts1->tv_nsec;
+
+    if (nsec_diff < 0) {
+        /* borrow one second locally instead of rewriting *ts2 */
+        nsec_diff += 1000000000L;
+        --sec_diff;
+    }
+
+    if (sec_diff < 0) {
+        /* ts1 is later than ts2, the caller broke the contract */
+        ERRINT;
+        return 0;
+    }
+    /* the result wraps if the difference exceeds UINT32_MAX milliseconds */
+    return (uint32_t)(sec_diff * 1000 + nsec_diff / 1000000L);
+}
+
#ifndef HAVE_PTHREAD_MUTEX_TIMEDLOCK
int
pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *abstime)
diff --git a/src/session_p.h b/src/session_p.h
index 5358bb8..4f3e790 100644
--- a/src/session_p.h
+++ b/src/session_p.h
@@ -295,9 +295,9 @@
/* ACCESS locked */
struct nc_pollsession {
- struct pollfd *pfds;
struct nc_session **sessions;
uint16_t session_count;
+ uint16_t last_event_session;
pthread_cond_t cond;
pthread_mutex_t lock;
@@ -321,6 +321,8 @@
int nc_gettimespec(struct timespec *ts);
+uint32_t nc_difftimespec(struct timespec *ts1, struct timespec *ts2);
+
int nc_timedlock(pthread_mutex_t *lock, int timeout, const char *func);
int nc_ps_lock(struct nc_pollsession *ps, uint8_t *id, const char *func);
@@ -487,24 +489,6 @@
*/
int nc_sshcb_msg(ssh_session sshsession, ssh_message msg, void *data);
-/**
- * @brief Inspect what exactly happened if a SSH session socket poll
- * returned POLLIN.
- *
- * @param[in] session NETCONF session communicating on the socket.
- * @param[in] timeout Timeout for locking ti_lock.
- * @param[in] ssh_message Whether to also check for standard SSH messages or just do
- * application data poll.
- * @return NC_PSPOLL_TIMEOUT,
- * NC_PSPOLL_RPC (has new data),
- * NC_PSPOLL_PENDING (other channel has data),
- * NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR,
- * NC_PSPOLL_SSH_MSG,
- * NC_PSPOLL_SSH_CHANNEL,
- * NC_PSPOLL_ERROR.
- */
-int nc_ssh_pollin(struct nc_session *session, int timeout, int ssh_message);
-
void nc_server_ssh_clear_opts(struct nc_server_ssh_opts *opts);
void nc_client_ssh_destroy_opts(void);
diff --git a/src/session_server.c b/src/session_server.c
index a3977ff..61f954d 100644
--- a/src/session_server.c
+++ b/src/session_server.c
@@ -703,7 +703,6 @@
ERR("FATAL: Freeing a pollsession structure that is currently being worked with!");
}
- free(ps->pfds);
free(ps->sessions);
pthread_mutex_destroy(&ps->lock);
pthread_cond_destroy(&ps->cond);
@@ -730,40 +729,13 @@
}
++ps->session_count;
- ps->pfds = nc_realloc(ps->pfds, ps->session_count * sizeof *ps->pfds);
ps->sessions = nc_realloc(ps->sessions, ps->session_count * sizeof *ps->sessions);
- if (!ps->pfds || !ps->sessions) {
+ if (!ps->sessions) {
ERRMEM;
/* UNLOCK */
nc_ps_unlock(ps, q_id, __func__);
return -1;
}
-
- switch (session->ti_type) {
- case NC_TI_FD:
- ps->pfds[ps->session_count - 1].fd = session->ti.fd.in;
- break;
-
-#ifdef NC_ENABLED_SSH
- case NC_TI_LIBSSH:
- ps->pfds[ps->session_count - 1].fd = ssh_get_fd(session->ti.libssh.session);
- break;
-#endif
-
-#ifdef NC_ENABLED_TLS
- case NC_TI_OPENSSL:
- ps->pfds[ps->session_count - 1].fd = SSL_get_rfd(session->ti.tls);
- break;
-#endif
-
- default:
- ERRINT;
- /* UNLOCK */
- nc_ps_unlock(ps, q_id, __func__);
- return -1;
- }
- ps->pfds[ps->session_count - 1].events = POLLIN;
- ps->pfds[ps->session_count - 1].revents = 0;
ps->sessions[ps->session_count - 1] = session;
/* UNLOCK */
@@ -785,12 +757,12 @@
--ps->session_count;
if (i < ps->session_count) {
ps->sessions[i] = ps->sessions[ps->session_count];
- memcpy(&ps->pfds[i], &ps->pfds[ps->session_count], sizeof *ps->pfds);
+ if (ps->last_event_session == i) {
+ ps->last_event_session = 0;
+ }
} else if (!ps->session_count) {
free(ps->sessions);
ps->sessions = NULL;
- free(ps->pfds);
- ps->pfds = NULL;
}
return 0;
}
@@ -1007,25 +979,32 @@
API int
nc_ps_poll(struct nc_pollsession *ps, int timeout, struct nc_session **session)
{
- int ret;
+ int ret, r, poll_ret;
uint8_t q_id;
- uint16_t i;
- time_t cur_time;
+ uint16_t i, j;
+ char msg[256];
+ NC_SESSION_TERM_REASON term_reason;
+ struct pollfd pfd;
+ struct timespec begin_ts, cur_ts;
struct nc_session *cur_session;
struct nc_server_rpc *rpc = NULL;
+#ifdef NC_ENABLED_SSH
+ struct nc_session *new;
+#endif
if (!ps || !ps->session_count) {
ERRARG("ps");
return NC_PSPOLL_ERROR;
}
- cur_time = time(NULL);
+ nc_gettimespec(&begin_ts);
/* LOCK */
if (nc_ps_lock(ps, &q_id, __func__)) {
return NC_PSPOLL_ERROR;
}
+ /* check that all sessions are fine */
for (i = 0; i < ps->session_count; ++i) {
if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
ERR("Session %u: session not running.", ps->sessions[i]->id);
@@ -1037,7 +1016,7 @@
}
/* TODO invalidate only sessions without subscription */
- if (server_opts.idle_timeout && (cur_time >= ps->sessions[i]->last_rpc + server_opts.idle_timeout)) {
+ if (server_opts.idle_timeout && (begin_ts.tv_sec >= ps->sessions[i]->last_rpc + server_opts.idle_timeout)) {
ERR("Session %u: session idle timeout elapsed.", ps->sessions[i]->id);
ps->sessions[i]->status = NC_STATUS_INVALID;
ps->sessions[i]->term_reason = NC_SESSION_TERM_TIMEOUT;
@@ -1047,122 +1026,211 @@
}
goto finish;
}
-
- if (ps->pfds[i].revents) {
- break;
- }
}
- if (i == ps->session_count) {
-#ifdef NC_ENABLED_SSH
-retry_poll:
-#endif
- /* no leftover event */
- i = 0;
- ret = poll(ps->pfds, ps->session_count, timeout);
- if (ret < 0) {
- ERR("Poll failed (%s).", strerror(errno));
- ret = NC_PSPOLL_ERROR;
- goto finish;
- } else if (!ret) {
- ret = NC_PSPOLL_TIMEOUT;
- goto finish;
+ /* poll on all the sessions one-by-one */
+ do {
+ /* loop from i to j */
+ if (ps->last_event_session == ps->session_count - 1) {
+ i = j = 0;
+ } else {
+ i = j = ps->last_event_session + 1;
}
- }
+ do {
+ cur_session = ps->sessions[i];
- /* find the first fd with POLLIN, we don't care if there are more now */
- for (; i < ps->session_count; ++i) {
- if (ps->pfds[i].revents & (POLLHUP | POLLNVAL)) {
- ERR("Session %u: communication socket unexpectedly closed.", ps->sessions[i]->id);
- ps->sessions[i]->status = NC_STATUS_INVALID;
- ps->sessions[i]->term_reason = NC_SESSION_TERM_DROPPED;
- ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
- if (session) {
- *session = ps->sessions[i];
- }
- goto finish;
- } else if (ps->pfds[i].revents & POLLERR) {
- ERR("Session %u: communication socket error.", ps->sessions[i]->id);
- ps->sessions[i]->status = NC_STATUS_INVALID;
- ps->sessions[i]->term_reason = NC_SESSION_TERM_OTHER;
- ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
- if (session) {
- *session = ps->sessions[i];
- }
- goto finish;
- } else if (ps->pfds[i].revents & POLLIN) {
+ switch (cur_session->ti_type) {
#ifdef NC_ENABLED_SSH
- if (ps->sessions[i]->ti_type == NC_TI_LIBSSH) {
- uint16_t j;
-
- /* things are not that simple with SSH... */
- ret = nc_ssh_pollin(ps->sessions[i], timeout, 1);
-
- /* clear POLLIN on sessions sharing this session's SSH session */
- if (ret & (NC_PSPOLL_RPC | NC_PSPOLL_SSH_MSG | NC_PSPOLL_SSH_CHANNEL)) {
- for (j = i + 1; j < ps->session_count; ++j) {
- if (ps->pfds[j].fd == ps->pfds[i].fd) {
- ps->pfds[j].revents = 0;
- }
+ case NC_TI_LIBSSH:
+ r = ssh_channel_poll_timeout(cur_session->ti.libssh.channel, 0, 0);
+ if (r < 1) {
+ if (r == SSH_EOF) {
+ sprintf(msg, "SSH channel unexpectedly closed");
+ term_reason = NC_SESSION_TERM_DROPPED;
+ poll_ret = -2;
+ } else if (r == SSH_ERROR) {
+ sprintf(msg, "SSH channel poll error (%s)", ssh_get_error(cur_session->ti.libssh.session));
+ poll_ret = -1;
+ } else {
+ poll_ret = 0;
}
+ break;
}
+ /* we have some data, but it may be just an SSH message */
- /* SSH message only */
- if (!(ret & (NC_PSPOLL_RPC | NC_PSPOLL_PENDING))) {
- ps->pfds[i].revents = 0;
+ r = nc_timedlock(cur_session->ti_lock, timeout, __func__);
+ if (r < 0) {
if (session) {
- *session = ps->sessions[i];
+ *session = cur_session;
}
+ ret = NC_PSPOLL_ERROR;
goto finish;
-
- /* event occurred on some other channel */
- } else if (ret & NC_PSPOLL_PENDING) {
- ps->pfds[i].revents = 0;
- if (i == ps->session_count - 1) {
- /* last session and it is not the right channel, ... */
- if (!timeout) {
- /* ... timeout is 0, so that is it */
- ret = NC_PSPOLL_TIMEOUT;
- goto finish;
- }
- /* ... retry polling reasonable time apart ... */
- usleep(NC_TIMEOUT_STEP);
- if (timeout > 0) {
- /* ... and decrease timeout, if not -1 */
- timeout -= NC_TIMEOUT_STEP * 1000;
- }
- goto retry_poll;
+ } else if (!r) {
+ if (session) {
+ *session = cur_session;
}
- /* check other sessions */
- continue;
+ ret = NC_PSPOLL_TIMEOUT;
+ goto finish;
}
+ r = ssh_execute_message_callbacks(cur_session->ti.libssh.session);
+ pthread_mutex_unlock(cur_session->ti_lock);
+
+ if (r != SSH_OK) {
+ sprintf(msg, "failed to receive SSH messages (%s)", ssh_get_error(cur_session->ti.libssh.session));
+ term_reason = NC_SESSION_TERM_OTHER;
+ poll_ret = -1;
+ } else if (cur_session->flags & NC_SESSION_SSH_NEW_MSG) {
+ /* new SSH message */
+ cur_session->flags &= ~NC_SESSION_SSH_NEW_MSG;
+ if (cur_session->ti.libssh.next) {
+ for (new = cur_session->ti.libssh.next; new != cur_session; new = new->ti.libssh.next) {
+ if ((new->status == NC_STATUS_STARTING) && new->ti.libssh.channel
+ && (new->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
+ /* new NETCONF SSH channel */
+ if (session) {
+ *session = cur_session;
+ }
+ ret = NC_PSPOLL_SSH_CHANNEL;
+ goto finish;
+ }
+ }
+ }
+
+ /* just some SSH message */
+ if (session) {
+ *session = cur_session;
+ }
+ ret = NC_PSPOLL_SSH_MSG;
+ goto finish;
+ } else {
+ /* we have some application data */
+ poll_ret = 1;
+ }
+ break;
+#endif
+#ifdef NC_ENABLED_TLS
+ case NC_TI_OPENSSL:
+ r = SSL_pending(cur_session->ti.tls);
+ if (!r) {
+ /* no data pending in the SSL buffer, poll fd */
+ pfd.fd = SSL_get_rfd(cur_session->ti.tls);
+ if (pfd.fd < 0) {
+ ERRINT;
+ ret = NC_PSPOLL_ERROR;
+ goto finish;
+ }
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+ r = poll(&pfd, 1, 0);
+
+ if (r < 0) {
+ sprintf(msg, "poll failed (%s)", strerror(errno));
+ poll_ret = -1;
+ } else if (r > 0) {
+ if (pfd.revents & (POLLHUP | POLLNVAL)) {
+ sprintf(msg, "communication socket unexpectedly closed");
+ term_reason = NC_SESSION_TERM_DROPPED;
+ poll_ret = -2;
+ } else if (pfd.revents & POLLERR) {
+ sprintf(msg, "communication socket error");
+ term_reason = NC_SESSION_TERM_OTHER;
+ poll_ret = -2;
+ } else {
+ poll_ret = 1;
+ }
+ } else {
+ poll_ret = 0;
+ }
+ } else {
+ poll_ret = 1;
+ }
+ break;
+#endif
+ case NC_TI_FD:
+ pfd.fd = cur_session->ti.fd.in;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+ r = poll(&pfd, 1, 0);
+
+ if (r < 0) {
+ sprintf(msg, "poll failed (%s)", strerror(errno));
+ poll_ret = 0;
+ } else if (r > 0) {
+ if (pfd.revents & (POLLHUP | POLLNVAL)) {
+ sprintf(msg, "communication socket unexpectedly closed");
+ term_reason = NC_SESSION_TERM_DROPPED;
+ poll_ret = -2;
+ } else if (pfd.revents & POLLERR) {
+ sprintf(msg, "communication socket error");
+ term_reason = NC_SESSION_TERM_OTHER;
+ poll_ret = -2;
+ } else {
+ poll_ret = 1;
+ }
+ } else {
+ poll_ret = 0;
+ }
+ break;
+ case NC_TI_NONE:
+ ERRINT;
+ ret = NC_PSPOLL_ERROR;
+ goto finish;
}
-#endif /* NC_ENABLED_SSH */
- /* we are going to process it now */
- ps->pfds[i].revents = 0;
- break;
+ /* here: poll_ret == -2 - session error, session terminated,
+ * poll_ret == -1 - generic error,
+ * poll_ret == 0 - nothing to read,
+ * poll_ret > 0 - data available */
+ if (poll_ret == -2) {
+ ERR("Session %u: %s.", cur_session->id, msg);
+ cur_session->status = NC_STATUS_INVALID;
+ cur_session->term_reason = term_reason;
+ if (session) {
+ *session = cur_session;
+ }
+ ret = NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
+ goto finish;
+ } else if (poll_ret == -1) {
+ ERR("Session %u: %s.", cur_session->id, msg);
+ ret = NC_PSPOLL_ERROR;
+ goto finish;
+ } else if (poll_ret > 0) {
+ break;
+ }
+
+ /* next iteration */
+ if (i == ps->session_count - 1) {
+ i = 0;
+ } else {
+ ++i;
+ }
+ } while (i != j);
+
+ /* no event */
+ if (!poll_ret && (timeout > -1)) {
+ usleep(NC_TIMEOUT_STEP);
+
+ nc_gettimespec(&cur_ts);
+ /* final timeout */
+ if (nc_difftimespec(&begin_ts, &cur_ts) >= (unsigned)timeout) {
+ ret = NC_PSPOLL_TIMEOUT;
+ goto finish;
+ }
}
- }
-
- if (i == ps->session_count) {
- ERRINT;
- ret = NC_PSPOLL_ERROR;
- goto finish;
- }
+ } while (!poll_ret);
/* this is the session with some data available for reading */
- cur_session = ps->sessions[i];
if (session) {
*session = cur_session;
}
+ ps->last_event_session = i;
/* reading an RPC and sending a reply must be atomic (no other RPC should be read) */
- ret = nc_timedlock(cur_session->ti_lock, timeout, __func__);
- if (ret < 0) {
+ r = nc_timedlock(cur_session->ti_lock, timeout, __func__);
+ if (r < 0) {
ret = NC_PSPOLL_ERROR;
goto finish;
- } else if (!ret) {
+ } else if (!r) {
ret = NC_PSPOLL_TIMEOUT;
goto finish;
}
@@ -1191,25 +1259,6 @@
nc_server_rpc_free(rpc, server_opts.ctx);
-#ifdef NC_ENABLED_SSH
- /* is there any data received but not processed? */
- if (cur_session->ti_type == NC_TI_LIBSSH) {
- /* ignore errors */
- if (nc_ssh_pollin(cur_session, 0, 0) == NC_PSPOLL_RPC) {
- ps->pfds[i].revents = POLLIN;
- ret |= NC_PSPOLL_PENDING;
- }
- }
-#endif /* NC_ENABLED_SSH */
-
- /* is there some other socket waiting? */
- for (++i; i < ps->session_count; ++i) {
- if (ps->pfds[i].revents) {
- ret |= NC_PSPOLL_PENDING;
- break;
- }
- }
-
finish:
/* UNLOCK */
nc_ps_unlock(ps, q_id, __func__);
@@ -1239,9 +1288,8 @@
}
free(ps->sessions);
ps->sessions = NULL;
- free(ps->pfds);
- ps->pfds = NULL;
ps->session_count = 0;
+ ps->last_event_session = 0;
} else {
for (i = 0; i < ps->session_count; ) {
if (ps->sessions[i]->status != NC_STATUS_RUNNING) {
diff --git a/src/session_server.h b/src/session_server.h
index eff619a..dc0e556 100644
--- a/src/session_server.h
+++ b/src/session_server.h
@@ -234,12 +234,11 @@
#define NC_PSPOLL_REPLY_ERROR 0x0008 /**< Response to the RPC was a \<rpc-reply\> of type error. */
#define NC_PSPOLL_SESSION_TERM 0x0010 /**< Some session was terminated. */
#define NC_PSPOLL_SESSION_ERROR 0x0020 /**< Some session was terminated incorrectly (not by a \<close-session\> or \<kill-session\> RPC). */
-#define NC_PSPOLL_PENDING 0x0040 /**< Unhandled pending events on other session. */
-#define NC_PSPOLL_ERROR 0x0080 /**< Other fatal errors (they are printed). */
+#define NC_PSPOLL_ERROR 0x0040 /**< Other fatal errors (they are printed). */
#ifdef NC_ENABLED_SSH
-# define NC_PSPOLL_SSH_MSG 0x0100 /**< SSH message received (and processed, if relevant, only with SSH support). */
-# define NC_PSPOLL_SSH_CHANNEL 0x0200 /**< New SSH channel opened on an existing session (only with SSH support). */
+# define NC_PSPOLL_SSH_MSG 0x0080 /**< SSH message received (and processed, if relevant, only with SSH support). */
+# define NC_PSPOLL_SSH_CHANNEL 0x0100 /**< New SSH channel opened on an existing session (only with SSH support). */
#endif
/**
diff --git a/src/session_server_ssh.c b/src/session_server_ssh.c
index 0fe95d9..56da365 100644
--- a/src/session_server_ssh.c
+++ b/src/session_server_ssh.c
@@ -1047,71 +1047,6 @@
return 0;
}
-int
-nc_ssh_pollin(struct nc_session *session, int timeout, int ssh_message)
-{
- int ret;
- struct nc_session *new;
-
- if (ssh_message) {
- ret = nc_timedlock(session->ti_lock, timeout, __func__);
- if (ret < 0) {
- return NC_PSPOLL_ERROR;
- } else if (!ret) {
- return NC_PSPOLL_TIMEOUT;
- }
-
- ret = ssh_execute_message_callbacks(session->ti.libssh.session);
- pthread_mutex_unlock(session->ti_lock);
-
- if (ret != SSH_OK) {
- ERR("Session %u: failed to receive SSH messages (%s).", session->id,
- ssh_get_error(session->ti.libssh.session));
- session->status = NC_STATUS_INVALID;
- session->term_reason = NC_SESSION_TERM_OTHER;
- return NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
- }
-
- /* new SSH message */
- if (session->flags & NC_SESSION_SSH_NEW_MSG) {
- session->flags &= ~NC_SESSION_SSH_NEW_MSG;
- if (session->ti.libssh.next) {
- for (new = session->ti.libssh.next; new != session; new = new->ti.libssh.next) {
- if ((new->status == NC_STATUS_STARTING) && new->ti.libssh.channel
- && (new->flags & NC_SESSION_SSH_SUBSYS_NETCONF)) {
- /* new NETCONF SSH channel */
- return NC_PSPOLL_SSH_CHANNEL;
- }
- }
- }
-
- /* just some SSH message */
- return NC_PSPOLL_SSH_MSG;
- }
- }
-
- /* no new SSH message, maybe NETCONF data? */
- ret = ssh_channel_poll_timeout(session->ti.libssh.channel, 0, 0);
- /* not this one */
- if (!ret) {
- return NC_PSPOLL_PENDING;
- } else if (ret == SSH_ERROR) {
- ERR("Session %u: SSH channel error (%s).", session->id,
- ssh_get_error(session->ti.libssh.session));
- session->status = NC_STATUS_INVALID;
- session->term_reason = NC_SESSION_TERM_OTHER;
- return NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
- } else if (ret == SSH_EOF) {
- ERR("Session %u: communication channel unexpectedly closed (libssh).",
- session->id);
- session->status = NC_STATUS_INVALID;
- session->term_reason = NC_SESSION_TERM_DROPPED;
- return NC_PSPOLL_SESSION_TERM | NC_PSPOLL_SESSION_ERROR;
- }
-
- return NC_PSPOLL_RPC;
-}
-
API NC_MSG_TYPE
nc_connect_callhome_ssh(const char *host, uint16_t port, struct nc_session **session)
{