mod_netconf: notification history
Send the notification history for a given time interval.
Known issue: this still causes occasional segfaults.
- refs #993
diff --git a/src/mod_netconf.c b/src/mod_netconf.c
index b1c8555..4389125 100644
--- a/src/mod_netconf.c
+++ b/src/mod_netconf.c
@@ -107,8 +107,12 @@
module AP_MODULE_DECLARE_DATA netconf_module;
pthread_rwlock_t session_lock; /**< mutex protecting netconf_sessions_list from multiple access errors */
+pthread_mutex_t ntf_history_lock; /**< mutex protecting notification history list */
apr_hash_t *netconf_sessions_list = NULL;
+json_object *notif_history_array = NULL; /**< JSON array that notification_history() appends to; created by handle_op_ntfgethistory() while it holds ntf_history_lock */
+server_rec *http_server = NULL; /**< server record used for logging by notification_history(), which receives no server argument of its own */
+
volatile int isterminated = 0;
static char* password;
@@ -408,6 +412,58 @@
}
}
+/**
+ * Send an RPC through the given NETCONF session and evaluate the reply.
+ *
+ * \param server  server record, used only for logging
+ * \param session session the request is sent through
+ * \param rpc     request to send; this function does not free it
+ * \return EXIT_SUCCESS when the reply is of type NC_REPLY_OK,
+ *         EXIT_FAILURE in every other case (missing rpc or session,
+ *         no reply received, or a reply of any other type)
+ */
+int netconf_process_op(server_rec* server, struct nc_session *session, nc_rpc* rpc)
+{
+	nc_reply* answer = NULL;
+	NC_MSG_TYPE msg_type;
+	NC_REPLY_TYPE reply_type;
+	int status = EXIT_SUCCESS;
+
+	/* both the request and the session are mandatory */
+	if (rpc == NULL) {
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "mod_netconf: rpc is not created");
+		return (EXIT_FAILURE);
+	}
+	if (session == NULL) {
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "Unknown session to process.");
+		return (EXIT_FAILURE);
+	}
+
+	/* send the request and wait for the answer */
+	msg_type = nc_session_send_recv(session, rpc, &answer);
+
+	if (msg_type == NC_MSG_UNKNOWN) {
+		/* log only when the session itself is broken, fail either way */
+		if (nc_session_get_status(session) != NC_SESSION_STATUS_WORKING) {
+			ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "mod_netconf: receiving rpc-reply failed");
+		}
+		return (EXIT_FAILURE);
+	}
+	if (msg_type == NC_MSG_NONE) {
+		/* the error was already handled by a callback */
+		return (EXIT_FAILURE);
+	}
+
+	if (msg_type != NC_MSG_REPLY) {
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "mod_netconf: unexpected reply message received (%d)", msg_type);
+		status = EXIT_FAILURE;
+	} else {
+		reply_type = nc_reply_get_type(answer);
+		if (reply_type != NC_REPLY_OK) {
+			ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "mod_netconf: unexpected rpc-reply (%d)", reply_type);
+			status = EXIT_FAILURE;
+		}
+	}
+
+	nc_reply_free(answer);
+	return (status);
+}
+
int netconf_op(server_rec* server, const char* session_key, nc_rpc* rpc)
{
struct nc_session *session = NULL;
@@ -1413,6 +1469,104 @@
return reply;
}
+/**
+ * Notification callback: store one notification in notif_history_array.
+ *
+ * It is passed to ncntf_dispatch_receive() by handle_op_ntfgethistory(),
+ * which creates notif_history_array beforehand while holding
+ * ntf_history_lock. Each notification becomes a JSON object with the
+ * members "eventtime" and "content".
+ *
+ * \param eventtime time of the notification, stored as a 64-bit integer
+ * \param content   notification body; NULL is stored as an empty string
+ */
+void notification_history(time_t eventtime, const char *content)
+{
+	json_object *notif = NULL;
+
+	ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "Got notification from history.");
+
+	/* without a target array there is nowhere to store the notification */
+	if (notif_history_array == NULL) {
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "No notification history array to store the notification into.");
+		return;
+	}
+
+	notif = json_object_new_object();
+	if (notif == NULL) {
+		ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "Could not allocate memory for notification (json).");
+		return;
+	}
+	json_object_object_add(notif, "eventtime", json_object_new_int64(eventtime));
+	/* never hand a NULL string to json-c */
+	json_object_object_add(notif, "content", json_object_new_string((content != NULL) ? content : ""));
+
+	if (json_object_array_add(notif_history_array, notif) != 0) {
+		/* the array did not take ownership, so release the object here */
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Could not add notification to the history array.");
+		json_object_put(notif);
+	}
+}
+
+/**
+ * Handle the "get notification history" request.
+ *
+ * Opens a temporary NETCONF session on a new channel of the session found
+ * under session_key, subscribes to notifications for the requested
+ * interval and collects them via notification_history().
+ *
+ * The request members "from" and "to" are added to the current time to
+ * get the start and stop time of the subscription. The member "session"
+ * is only checked for presence; the lookup itself uses session_key.
+ *
+ * Locking: the session mutex is held only while the channel is created
+ * and the subscription is sent, and it is released exactly once on every
+ * path. ntf_history_lock is held while notif_history_array is in use.
+ *
+ * \param server      server record used for logging
+ * \param pool        unused here
+ * \param request     JSON request with "session", "from" and "to"
+ * \param session_key key into netconf_sessions_list
+ * \return reply object with the array "notifications", an error object
+ *         built by create_error(), or NULL when session_lock could not
+ *         be acquired
+ */
+json_object *handle_op_ntfgethistory(server_rec *server, apr_pool_t *pool, json_object *request, const char *session_key)
+{
+	json_object *reply = NULL;
+	const char *sid = NULL;
+	struct session_with_mutex *locked_session = NULL;
+	struct nc_session *temp_session = NULL;
+	nc_rpc *rpc = NULL;
+	time_t now = 0;
+	time_t start = 0;
+	time_t stop = 0;
+	int64_t from, to;
+	int rc;
+
+	ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "Request: get notification history, session %s", session_key);
+
+	sid = json_object_get_string(json_object_object_get(request, "session"));
+	from = json_object_get_int64(json_object_object_get(request, "from"));
+	to = json_object_get_int64(json_object_object_get(request, "to"));
+
+	/* sample the clock once so both bounds are relative to the same instant */
+	now = time(NULL);
+	start = now + from;
+	stop = now + to;
+
+	ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "notification history interval %li %li", (long int) from, (long int) to);
+
+	if (sid == NULL) {
+		return create_error("Missing session parameter.");
+	}
+
+	/* pthread functions return the error number, they do not set errno */
+	rc = pthread_rwlock_rdlock(&session_lock);
+	if (rc != 0) {
+		ap_log_error (APLOG_MARK, APLOG_DEBUG, 0, server, "Error while locking rwlock: %d (%s)", rc, strerror(rc));
+		return NULL;
+	}
+
+	locked_session = (struct session_with_mutex *)apr_hash_get(netconf_sessions_list, session_key, APR_HASH_KEY_STRING);
+	if (locked_session == NULL) {
+		rc = pthread_rwlock_unlock(&session_lock);
+		if (rc != 0) {
+			ap_log_error (APLOG_MARK, APLOG_DEBUG, 0, server, "Error while unlocking rwlock: %d (%s)", rc, strerror(rc));
+		}
+		return create_error("Invalid session identifier.");
+	}
+
+	/* take the session mutex before the list lock is dropped */
+	pthread_mutex_lock(&locked_session->lock);
+	rc = pthread_rwlock_unlock(&session_lock);
+	if (rc != 0) {
+		ap_log_error (APLOG_MARK, APLOG_DEBUG, 0, server, "Error while unlocking rwlock: %d (%s)", rc, strerror(rc));
+	}
+
+	ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "creating temporal NC session.");
+	temp_session = nc_session_connect_channel(locked_session->session, NULL);
+	if (temp_session == NULL) {
+		pthread_mutex_unlock(&locked_session->lock);
+		ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "Get history of notification failed due to channel establishment");
+		return create_error("Get history of notification was unsuccessful, connection failed.");
+	}
+
+	rpc = nc_rpc_subscribe(NULL /* stream */, NULL /* filter */, &start, &stop);
+	if (rpc == NULL) {
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "notifications: creating an rpc request failed.");
+		reply = create_error("notifications: creating an rpc request failed.");
+	} else {
+		ap_log_error (APLOG_MARK, APLOG_DEBUG, 0, server, "Send NC subscribe.");
+		/** \todo replace with sth like netconf_op(http_server, session_hash, rpc) */
+		rc = netconf_process_op(server, temp_session, rpc);
+		/* netconf_process_op() does not free the request, so do it here */
+		nc_rpc_free(rpc);
+		rpc = NULL;
+		if (rc != EXIT_SUCCESS) {
+			ap_log_error (APLOG_MARK, APLOG_DEBUG, 0, server, "Subscription RPC failed.");
+			reply = create_error("Subscription RPC failed.");
+		}
+	}
+
+	/* single unlock for all paths; locked_session is not touched after this */
+	pthread_mutex_unlock(&locked_session->lock);
+
+	if (reply == NULL) {
+		/* subscription succeeded, collect the notifications */
+		pthread_mutex_lock(&ntf_history_lock);
+		notif_history_array = json_object_new_array();
+		if (notif_history_array != NULL) {
+			ncntf_dispatch_receive(temp_session, notification_history);
+
+			reply = json_object_new_object();
+			if (reply != NULL) {
+				/* the reply takes over the reference to the array */
+				json_object_object_add(reply, "notifications", notif_history_array);
+			} else {
+				json_object_put(notif_history_array);
+			}
+			/* do not leave a pointer to an object this module no longer owns */
+			notif_history_array = NULL;
+		}
+		pthread_mutex_unlock(&ntf_history_lock);
+	}
+
+	ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "closing temporal NC session.");
+	nc_session_close(temp_session, NC_SESSION_TERM_CLOSED);
+	/* closing only terminates the connection, the structure must be freed too */
+	nc_session_free(temp_session);
+
+	if (reply == NULL) {
+		/* only reachable when a JSON allocation failed above */
+		reply = create_error("Get history of notification was unsuccessful, out of memory.");
+	}
+	return reply;
+}
+
void * thread_routine (void * arg)
{
void * retval = NULL;
@@ -1581,6 +1735,9 @@
case MSG_GENERIC:
reply = handle_op_generic(server, pool, request, session_key);
break;
+ case MSG_NTF_GETHISTORY:
+ reply = handle_op_ntfgethistory(server, pool, request, session_key);
+ break;
default:
ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "Unknown mod_netconf operation requested (%d)", operation);
reply = create_error("Operation not supported.");
@@ -1750,6 +1907,8 @@
char use_notifications = 0;
#endif
+ http_server = server;
+
/* wait at most 5 seconds for every thread to terminate */
maxtime.tv_sec = 5;
maxtime.tv_nsec = 0;