mod_netconf: handle empty notification queue
diff --git a/src/mod_netconf.c b/src/mod_netconf.c
index 932fb1f..34eb93e 100644
--- a/src/mod_netconf.c
+++ b/src/mod_netconf.c
@@ -253,6 +253,7 @@
return NULL;
}
locked_session->notifications = apr_array_make(pool, NOTIFICATION_QUEUE_SIZE, sizeof(notification_t));
+ locked_session->ntfc_subscribed = 0;
ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, server, "Add connection to the list");
apr_hash_set(conns, apr_pstrdup(pool, session_key), APR_HASH_KEY_STRING, (void *) locked_session);
ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, server, "Before session_unlock");
diff --git a/src/mod_netconf.h b/src/mod_netconf.h
index bc97002..6bc297e 100644
--- a/src/mod_netconf.h
+++ b/src/mod_netconf.h
@@ -67,6 +67,7 @@
struct session_with_mutex {
struct nc_session * session; /**< netconf session */
apr_array_header_t *notifications;
+ char ntfc_subscribed; /**< 0 when notifications are not subscribed */
apr_time_t last_activity;
pthread_mutex_t lock; /**< mutex protecting the session from multiple access */
};
diff --git a/src/notification-server.c b/src/notification-server.c
index 0fdbba4..2f02a81 100644
--- a/src/notification-server.c
+++ b/src/notification-server.c
@@ -54,8 +54,6 @@
static struct libwebsocket_context *context = NULL;
static server_rec *http_server = NULL;
-extern pthread_rwlock_t session_lock; /**< mutex protecting netconf_session_list from multiple access errors */
-
struct ntf_thread_config {
struct nc_session *session;
char *session_hash;
@@ -514,8 +512,6 @@
return;
}
ntf->eventtime = time(NULL);
- //ntf->content = strdup("notifikace");
- //ntf->eventtime = eventtime;
ntf->content = strdup(content);
if (http_server != NULL) {
@@ -634,6 +630,7 @@
return (EXIT_FAILURE);
}
rpc = NULL; /* just note that rpc is already freed by send_recv_process() */
+ locked_session->ntfc_subscribed = 1;
tconfig = malloc(sizeof(struct ntf_thread_config));
tconfig->session = session;
@@ -642,6 +639,7 @@
ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating libnetconf notification thread (%s).",
tconfig->session_hash);
}
+
if (pthread_create(&thread, NULL, notification_thread, tconfig) != 0) {
#ifndef TEST_NOTIFICATION_SERVER
if (http_server != NULL) {
@@ -665,12 +663,6 @@
unsigned char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
struct per_session_data__notif_client *pss = (struct per_session_data__notif_client *)user;
- //#ifndef TEST_NOTIFICATION_SERVER
- //if (http_server != NULL) {
- // ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "libwebsockets callback_notification");
- //}
- //#endif
-
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
@@ -679,15 +671,9 @@
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
-// get_client_notification();
if (pss->session_key == NULL) {
return 0;
}
- /*
- if (http_server != NULL) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: prepare to SENDING");
- }
- */
struct session_with_mutex *ls = get_ncsession_from_key(pss->session_key);
if (ls == NULL) {
@@ -713,23 +699,23 @@
pthread_mutex_unlock(&ls->lock);
return -1;
}
+ if (!apr_is_empty_array(ls->notifications)) {
- if (http_server != NULL) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications for session");
- }
+ if (http_server != NULL) {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications for session");
+ }
- while ((notif = (notification_t *) apr_array_pop(ls->notifications)) != NULL) {
- char t[128];
- t[0] = 0;
- strftime(t, sizeof(t), "%c", localtime(¬if->eventtime));
- n = 0;
- n = sprintf((char *)p, "%s\n", notif->content);
- m = libwebsocket_write(wsi, p, n, LWS_WRITE_TEXT);
- //free(notif->content);
- //free(notif);
- }
- if (http_server != NULL) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications done");
+ while ((notif = (notification_t *) apr_array_pop(ls->notifications)) != NULL) {
+ char t[128];
+ t[0] = 0;
+ strftime(t, sizeof(t), "%c", localtime(¬if->eventtime));
+ n = 0;
+ n = sprintf((char *)p, "%s\n", notif->content);
+ m = libwebsocket_write(wsi, p, n, LWS_WRITE_TEXT);
+ }
+ if (http_server != NULL) {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications done");
+ }
}
if (pthread_mutex_unlock(&ls->lock) != 0) {
@@ -739,9 +725,9 @@
}
#endif
}
- if (http_server != NULL) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: unlocked session lock");
- }
+ //if (http_server != NULL) {
+ // ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: unlocked session lock");
+ //}
if (m < n) {
@@ -755,7 +741,6 @@
break;
case LWS_CALLBACK_RECEIVE:
-// fprintf(stderr, "rx %d\n", (int)len);
#ifndef TEST_NOTIFICATION_SERVER
if (http_server != NULL) {
ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "received: (%s)", (char *)in);
@@ -774,7 +759,6 @@
ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: get key (%s) from (%s) (%i,%i)", pss->session_key, (char *) in, (int) start, (int) stop);
}
- /* TODO subscribe, map with ncnotif print callback */
struct session_with_mutex *ls = get_ncsession_from_key(pss->session_key);
if (ls == NULL) {
if (http_server != NULL) {
@@ -782,10 +766,23 @@
}
return 0;
}
+ if (ls->ntfc_subscribed != 0) {
+ ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: already subscribed");
+ return 0;
+ }
if (http_server != NULL) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: prepare to subscribe stream");
}
+
+ //if (pthread_rwlock_rdlock (&session_lock) != 0) {
+ // ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
+ // return EXIT_FAILURE;
+ //}
notif_subscribe(ls, pss->session_key, (time_t) start, (time_t) stop);
+ //if (pthread_rwlock_unlock (&session_lock) != 0) {
+ // ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
+ // return EXIT_FAILURE;
+ //}
}
if (len < 6)
break;