mod_netconf: notification history

Send the notification history for a given time interval.
Known issue: this still causes occasional segfaults.
- refs #993
diff --git a/src/mod_netconf.c b/src/mod_netconf.c
index b1c8555..4389125 100644
--- a/src/mod_netconf.c
+++ b/src/mod_netconf.c
@@ -107,8 +107,12 @@
 module AP_MODULE_DECLARE_DATA netconf_module;
 
 pthread_rwlock_t session_lock; /**< mutex protecting netconf_sessions_list from multiple access errors */
+pthread_mutex_t ntf_history_lock; /**< mutex protecting notification history list */
 apr_hash_t *netconf_sessions_list = NULL;
 
+json_object *notif_history_array = NULL; /**< array of replayed notifications; created in handle_op_ntfgethistory(), appended to by notification_history() */
+server_rec *http_server = NULL; /**< server record used for logging in notification_history(), which receives no server argument */
+
 volatile int isterminated = 0;
 
 static char* password;
@@ -408,6 +412,58 @@
 	}
 }
 
+/* Send rpc over session, wait for the reply and translate the outcome
+ * into EXIT_SUCCESS (reply type NC_REPLY_OK) or EXIT_FAILURE (anything else). */
+int netconf_process_op(server_rec* server, struct nc_session *session, nc_rpc* rpc)
+{
+	nc_reply* answer = NULL;
+	NC_MSG_TYPE msg_kind;
+	NC_REPLY_TYPE reply_kind;
+	int status = EXIT_FAILURE;
+
+	/* guard: the caller must hand over a ready-made request */
+	if (rpc == NULL) {
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "mod_netconf: rpc is not created");
+		return (EXIT_FAILURE);
+	}
+
+	/* guard: without a session there is nobody to send the request to */
+	if (session == NULL) {
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "Unknown session to process.");
+		return (EXIT_FAILURE);
+	}
+
+	/* send the request and get the reply */
+	msg_kind = nc_session_send_recv(session, rpc, &answer);
+
+	if ((msg_kind == NC_MSG_UNKNOWN) || (msg_kind == NC_MSG_NONE)) {
+		/*
+		 * NC_MSG_NONE: the error was already handled by a callback.
+		 * NC_MSG_UNKNOWN: logged only when the session is no longer working.
+		 * Both cases fail immediately, before the reply is released.
+		 */
+		if ((msg_kind == NC_MSG_UNKNOWN) && (nc_session_get_status(session) != NC_SESSION_STATUS_WORKING)) {
+			ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "mod_netconf: receiving rpc-reply failed");
+		}
+		return (EXIT_FAILURE);
+	}
+
+	if (msg_kind != NC_MSG_REPLY) {
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "mod_netconf: unexpected reply message received (%d)", msg_kind);
+	} else {
+		reply_kind = nc_reply_get_type(answer);
+		if (reply_kind == NC_REPLY_OK) {
+			status = EXIT_SUCCESS;
+		} else {
+			ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "mod_netconf: unexpected rpc-reply (%d)", reply_kind);
+		}
+	}
+
+	/* every path that reaches this point releases the reply */
+	nc_reply_free(answer);
+	return (status);
+}
+
 int netconf_op(server_rec* server, const char* session_key, nc_rpc* rpc)
 {
 	struct nc_session *session = NULL;
@@ -1413,6 +1469,104 @@
 	return reply;
 }
 
+void notification_history(time_t eventtime, const char *content)
+{ /* Callback handed to ncntf_dispatch_receive(): appends one event to notif_history_array. */
+	ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "Got notification from history.");
+	json_object *notif = (notif_history_array != NULL) ? json_object_new_object() : NULL;
+	if (notif == NULL) { /* no target array or allocation failed: drop the event instead of crashing */
+		ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "Could not allocate memory for notification (json).");
+		return;
+	}
+	json_object_object_add(notif, "eventtime", json_object_new_int64(eventtime));
+	json_object_object_add(notif, "content", json_object_new_string((content != NULL) ? content : "")); /* never pass NULL */
+	json_object_array_add(notif_history_array, notif);
+}
+
+/**
+ * Handle MSG_NTF_GETHISTORY: replay notifications over a temporary channel for the
+ * interval [now + "from", now + "to"] taken from the request. Returns a JSON object
+ * with a "notifications" array, a create_error() object, or NULL if session_lock fails.
+ */
+json_object *handle_op_ntfgethistory(server_rec *server, apr_pool_t *pool, json_object *request, const char *session_key)
+{
+	json_object *reply = NULL;
+	struct session_with_mutex *locked_session = NULL;
+	struct nc_session *temp_session = NULL;
+	nc_rpc *rpc = NULL;
+	time_t now, start, stop;
+	int64_t from, to;
+	int rc = 0, subscribed = 0;
+
+	ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "Request: get notification history, session %s", session_key);
+	from = json_object_get_int64(json_object_object_get(request, "from"));
+	to = json_object_get_int64(json_object_object_get(request, "to"));
+	now = time(NULL); /* sample the clock once so both bounds share one base */
+	start = now + from;
+	stop = now + to;
+	ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "notification history interval %li %li", (long int) from, (long int) to);
+
+	if (json_object_get_string(json_object_object_get(request, "session")) == NULL) {
+		return create_error("Missing session parameter.");
+	}
+	/* pthread calls report failure through their return value, not through errno */
+	if ((rc = pthread_rwlock_rdlock(&session_lock)) != 0) {
+		ap_log_error (APLOG_MARK, APLOG_DEBUG, 0, server, "Error while locking rwlock: %d (%s)", rc, strerror(rc));
+		return NULL;
+	}
+	locked_session = (struct session_with_mutex *)apr_hash_get(netconf_sessions_list, session_key, APR_HASH_KEY_STRING);
+	if (locked_session != NULL) {
+		pthread_mutex_lock(&locked_session->lock);
+	}
+	if ((rc = pthread_rwlock_unlock(&session_lock)) != 0) {
+		ap_log_error (APLOG_MARK, APLOG_DEBUG, 0, server, "Error while unlocking rwlock: %d (%s)", rc, strerror(rc));
+	}
+	if (locked_session == NULL) {
+		return create_error("Invalid session identifier.");
+	}
+
+	/* the session mutex is held from here and released exactly once, on every path */
+	ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "creating temporal NC session.");
+	temp_session = nc_session_connect_channel(locked_session->session, NULL);
+	if (temp_session == NULL) {
+		ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "Get history of notification failed due to channel establishment");
+		reply = create_error("Get history of notification was unsuccessful, connection failed.");
+	} else if ((rpc = nc_rpc_subscribe(NULL /* stream */, NULL /* filter */, &start, &stop)) == NULL) {
+		ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "notifications: creating an rpc request failed.");
+		reply = create_error("notifications: creating an rpc request failed.");
+	} else {
+		ap_log_error (APLOG_MARK, APLOG_DEBUG, 0, server, "Send NC subscribe.");
+		/** \todo replace with sth like netconf_op(http_server, session_hash, rpc) */
+		if (netconf_process_op(server, temp_session, rpc) == EXIT_SUCCESS) {
+			subscribed = 1;
+		} else {
+			ap_log_error (APLOG_MARK, APLOG_DEBUG, 0, server, "Subscription RPC failed.");
+			reply = create_error("Subscription RPC failed.");
+		}
+		nc_rpc_free(rpc); /* netconf_process_op() leaves ownership of the rpc with the caller */
+	}
+	pthread_mutex_unlock(&locked_session->lock);
+	if (subscribed) {
+		pthread_mutex_lock(&ntf_history_lock);
+		notif_history_array = json_object_new_array();
+		reply = json_object_new_object();
+		if ((notif_history_array != NULL) && (reply != NULL)) {
+			ncntf_dispatch_receive(temp_session, notification_history);
+			json_object_object_add(reply, "notifications", notif_history_array);
+		} else {
+			json_object_put(notif_history_array);
+			json_object_put(reply);
+			reply = NULL;
+		}
+		notif_history_array = NULL; /* the array now belongs to reply (or was released) */
+		pthread_mutex_unlock(&ntf_history_lock);
+	}
+	if (temp_session != NULL) {
+		ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "closing temporal NC session.");
+		nc_session_close(temp_session, NC_SESSION_TERM_CLOSED);
+	}
+	return (reply != NULL) ? reply : create_error("Get history of notification was unsuccessful.");
+}
+
 void * thread_routine (void * arg)
 {
 	void * retval = NULL;
@@ -1581,6 +1735,9 @@
 			case MSG_GENERIC:
 				reply = handle_op_generic(server, pool, request, session_key);
 				break;
+			case MSG_NTF_GETHISTORY:
+				reply = handle_op_ntfgethistory(server, pool, request, session_key);
+				break;
 			default:
 				ap_log_error(APLOG_MARK, APLOG_ERR, 0, server, "Unknown mod_netconf operation requested (%d)", operation);
 				reply = create_error("Operation not supported.");
@@ -1750,6 +1907,8 @@
 	char use_notifications = 0;
 	#endif
 
+	http_server = server;
+
 	/* wait at most 5 seconds for every thread to terminate */
 	maxtime.tv_sec = 5;
 	maxtime.tv_nsec = 0;