blob: 1f2f7f2037a860020b662b0020096a5d086df607 [file] [log] [blame]
Tomas Cejkad340dbf2013-03-24 20:36:57 +01001/*
2 * libwebsockets-test-server - libwebsockets test implementation
3 *
4 * Copyright (C) 2010-2011 Andy Green <andy@warmcat.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation:
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301 USA
20 */
21#include <stdio.h>
22#include <stdlib.h>
23#include <unistd.h>
24#include <getopt.h>
25#include <string.h>
26#include <sys/time.h>
27#include <sys/stat.h>
Tomas Cejkaba21b382013-04-13 02:37:32 +020028#include <sys/queue.h>
Tomas Cejkad340dbf2013-03-24 20:36:57 +010029#include <fcntl.h>
30#include <assert.h>
Tomas Cejkaba21b382013-04-13 02:37:32 +020031#include <pthread.h>
32#include <libnetconf.h>
33#include <libwebsockets.h>
Tomas Cejkad340dbf2013-03-24 20:36:57 +010034#include "notification_module.h"
Tomas Cejkaba21b382013-04-13 02:37:32 +020035#include "mod_netconf.h"
Tomas Cejkad340dbf2013-03-24 20:36:57 +010036
37#ifndef TEST_NOTIFICATION_SERVER
38#include <httpd.h>
39#include <http_log.h>
Tomas Cejkaba21b382013-04-13 02:37:32 +020040#include <apr_hash.h>
41#include <apr_tables.h>
42
43#else
/* The standalone test main() keeps servicing while this is zero. */
static int force_exit = 0;
Tomas Cejkad340dbf2013-03-24 20:36:57 +010045#endif
46
47#if defined(TEST_NOTIFICATION_SERVER) || defined(WITH_NOTIFICATIONS)
/* When non-zero, callback_notification() closes a connection whose counter equals 50. */
static int close_testing;
/* Capacity of pollfds[] and fd_lookup[]; set from getdtablesize() in notification_init(). */
static int max_poll_elements;

/* Descriptor set passed to poll() by notification_handle(); maintained by callback_http(). */
static struct pollfd *pollfds;
/* Maps a file descriptor to its index in pollfds[]. */
static int *fd_lookup;
/* Number of pollfds[] entries currently in use. */
static int count_pollfds;
/* libwebsockets context created by notification_init(). */
static struct libwebsocket_context *context = NULL;
/* Server record handed to ap_log_error(); stays NULL until notification_init() stores one. */
static server_rec *http_server = NULL;
56
/* Argument handed to notification_thread() by notif_subscribe(). */
struct ntf_thread_config {
	struct nc_session *session; /* session passed to ncntf_dispatch_receive() */
	char *session_hash; /* strdup()ed session key, stored as the thread-specific value */
};
61
/* Session table received by notification_init(); get_ncsession_from_key() looks keys up in it. */
static apr_hash_t *netconf_locked_sessions = NULL;
/* Thread-specific slot in which notification_thread() stores its session key. */
static pthread_key_t thread_key;
64
/*
 * This server uses libwebsockets to offer the following protocols
 * (see protocols[] below):
 *
 *  http-only:             serves the whitelisted static files over plain HTTP.
 *
 *  notification-protocol: the first message received from a client carries a
 *                         40-character NETCONF session key followed by two
 *                         integers (start and stop time offsets); the server
 *                         then subscribes to notifications on that session
 *                         and sends every queued notification to the client
 *                         as a JSON text frame.
 */
79
/* Indexes into the protocols[] table. */
enum demo_protocols {
	/* always first */
	PROTOCOL_HTTP = 0,

	PROTOCOL_NOTIFICATION,

	/* always last */
	DEMO_PROTOCOL_COUNT
};
89
90
#define LOCAL_RESOURCE_PATH "."
/* Directory prefix that callback_http() prepends to every file path it serves. */
char *resource_path = LOCAL_RESOURCE_PATH;
93
94/*
95 * We take a strict whitelist approach to stop ../ attacks
96 */
97
/* One servable file: its request path and the MIME type it is sent with. */
struct serveable {
	const char *urlpath; /* request path; also appended to resource_path to locate the file */
	const char *mimetype; /* passed to libwebsockets_serve_http_file() */
};
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100102
/* Files callback_http() is willing to serve by name. */
static const struct serveable whitelist[] = {
	{ "/favicon.ico", "image/x-icon" },
	{ "/libwebsockets.org-logo.png", "image/png" },

	/* last one is the default served if no match */
	{ "/test.html", "text/html" },
};
110
/* Per-connection state of the HTTP protocol handler. */
struct per_session_data__http {
	int fd; /* descriptor of the file being streamed for a "/leaf.jpg" request */
};
114
115/* this protocol server (always the first one) just knows how to do HTTP */
116
117static int callback_http(struct libwebsocket_context *context,
118 struct libwebsocket *wsi,
119 enum libwebsocket_callback_reasons reason, void *user,
120 void *in, size_t len)
121{
122 char client_name[128];
123 char client_ip[128];
124 char buf[256];
125 int n, m;
126 unsigned char *p;
127 static unsigned char buffer[4096];
128 struct stat stat_buf;
129 struct per_session_data__http *pss = (struct per_session_data__http *)user;
130 int fd = (int)(long)in;
131
132 switch (reason) {
133 case LWS_CALLBACK_HTTP:
134
135 /* check for the "send a big file by hand" example case */
136
137 if (!strcmp((const char *)in, "/leaf.jpg")) {
138 char leaf_path[1024];
139 snprintf(leaf_path, sizeof(leaf_path), "%s/leaf.jpg", resource_path);
140
141 /* well, let's demonstrate how to send the hard way */
142
143 p = buffer;
144
145 pss->fd = open(leaf_path, O_RDONLY);
146
147 if (pss->fd < 0)
148 return -1;
149
150 fstat(pss->fd, &stat_buf);
151
152 /*
153 * we will send a big jpeg file, but it could be
154 * anything. Set the Content-Type: appropriately
155 * so the browser knows what to do with it.
156 */
157
158 p += sprintf((char *)p,
159 "HTTP/1.0 200 OK\x0d\x0a"
160 "Server: libwebsockets\x0d\x0a"
161 "Content-Type: image/jpeg\x0d\x0a"
162 "Content-Length: %u\x0d\x0a\x0d\x0a",
163 (unsigned int)stat_buf.st_size);
164
165 /*
166 * send the http headers...
167 * this won't block since it's the first payload sent
168 * on the connection since it was established
169 * (too small for partial)
170 */
171
172 n = libwebsocket_write(wsi, buffer,
173 p - buffer, LWS_WRITE_HTTP);
174
175 if (n < 0) {
176 close(pss->fd);
177 return -1;
178 }
179 /*
180 * book us a LWS_CALLBACK_HTTP_WRITEABLE callback
181 */
182 libwebsocket_callback_on_writable(context, wsi);
183 break;
184 }
185
186 /* if not, send a file the easy way */
187
188 for (n = 0; n < (sizeof(whitelist) / sizeof(whitelist[0]) - 1); n++)
189 if (in && strcmp((const char *)in, whitelist[n].urlpath) == 0)
190 break;
191
192 sprintf(buf, "%s%s", resource_path, whitelist[n].urlpath);
193
194 if (libwebsockets_serve_http_file(context, wsi, buf, whitelist[n].mimetype))
195 return -1; /* through completion or error, close the socket */
196
197 /*
198 * notice that the sending of the file completes asynchronously,
199 * we'll get a LWS_CALLBACK_HTTP_FILE_COMPLETION callback when
200 * it's done
201 */
202
203 break;
204
205 case LWS_CALLBACK_HTTP_FILE_COMPLETION:
206// lwsl_info("LWS_CALLBACK_HTTP_FILE_COMPLETION seen\n");
207 /* kill the connection after we sent one file */
208 return -1;
209
210 case LWS_CALLBACK_HTTP_WRITEABLE:
211 /*
212 * we can send more of whatever it is we were sending
213 */
214
215 do {
216 n = read(pss->fd, buffer, sizeof buffer);
217 /* problem reading, close conn */
218 if (n < 0)
219 goto bail;
220 /* sent it all, close conn */
221 if (n == 0)
222 goto bail;
223 /*
224 * because it's HTTP and not websocket, don't need to take
225 * care about pre and postamble
226 */
227 m = libwebsocket_write(wsi, buffer, n, LWS_WRITE_HTTP);
228 if (m < 0)
229 /* write failed, close conn */
230 goto bail;
231 if (m != n)
232 /* partial write, adjust */
233 lseek(pss->fd, m - n, SEEK_CUR);
234
235 } while (!lws_send_pipe_choked(wsi));
236 libwebsocket_callback_on_writable(context, wsi);
237 break;
238
239bail:
240 close(pss->fd);
241 return -1;
242
243 /*
244 * callback for confirming to continue with client IP appear in
245 * protocol 0 callback since no websocket protocol has been agreed
246 * yet. You can just ignore this if you won't filter on client IP
247 * since the default uhandled callback return is 0 meaning let the
248 * connection continue.
249 */
250
251 case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
252 libwebsockets_get_peer_addresses(context, wsi, (int)(long)in, client_name,
253 sizeof(client_name), client_ip, sizeof(client_ip));
254
Tomas Cejkaba21b382013-04-13 02:37:32 +0200255 //fprintf(stderr, "Received network connect from %s (%s)\n", client_name, client_ip);
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100256 /* if we returned non-zero from here, we kill the connection */
257 break;
258
259 /*
260 * callbacks for managing the external poll() array appear in
261 * protocol 0 callback
262 */
263
264 case LWS_CALLBACK_ADD_POLL_FD:
265
266 if (count_pollfds >= max_poll_elements) {
267 lwsl_err("LWS_CALLBACK_ADD_POLL_FD: too many sockets to track\n");
268 return 1;
269 }
270
271 fd_lookup[fd] = count_pollfds;
272 pollfds[count_pollfds].fd = fd;
273 pollfds[count_pollfds].events = (int)(long)len;
274 pollfds[count_pollfds++].revents = 0;
275 break;
276
277 case LWS_CALLBACK_DEL_POLL_FD:
278 if (!--count_pollfds)
279 break;
280 m = fd_lookup[fd];
281 /* have the last guy take up the vacant slot */
282 pollfds[m] = pollfds[count_pollfds];
283 fd_lookup[pollfds[count_pollfds].fd] = m;
284 break;
285
286 case LWS_CALLBACK_SET_MODE_POLL_FD:
287 pollfds[fd_lookup[fd]].events |= (int)(long)len;
288 break;
289
290 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
291 pollfds[fd_lookup[fd]].events &= ~(int)(long)len;
292 break;
293
294 default:
295 break;
296 }
297
298 return 0;
299}
300
Tomas Cejkaba21b382013-04-13 02:37:32 +0200301/**
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100302 * this is just an example of parsing handshake headers, you don't need this
303 * in your code unless you will filter allowing connections by the header
304 * content
305 */
Tomas Cejkaba21b382013-04-13 02:37:32 +0200306//static void dump_handshake_info(struct libwebsocket *wsi)
307//{
308// int n;
309// static const char *token_names[WSI_TOKEN_COUNT] = {
310// /*[WSI_TOKEN_GET_URI] =*/ "GET URI",
311// /*[WSI_TOKEN_HOST] =*/ "Host",
312// /*[WSI_TOKEN_CONNECTION] =*/ "Connection",
313// /*[WSI_TOKEN_KEY1] =*/ "key 1",
314// /*[WSI_TOKEN_KEY2] =*/ "key 2",
315// /*[WSI_TOKEN_PROTOCOL] =*/ "Protocol",
316// /*[WSI_TOKEN_UPGRADE] =*/ "Upgrade",
317// /*[WSI_TOKEN_ORIGIN] =*/ "Origin",
318// /*[WSI_TOKEN_DRAFT] =*/ "Draft",
319// /*[WSI_TOKEN_CHALLENGE] =*/ "Challenge",
320//
321// /* new for 04 */
322// /*[WSI_TOKEN_KEY] =*/ "Key",
323// /*[WSI_TOKEN_VERSION] =*/ "Version",
324// /*[WSI_TOKEN_SWORIGIN] =*/ "Sworigin",
325//
326// /* new for 05 */
327// /*[WSI_TOKEN_EXTENSIONS] =*/ "Extensions",
328//
329// /* client receives these */
330// /*[WSI_TOKEN_ACCEPT] =*/ "Accept",
331// /*[WSI_TOKEN_NONCE] =*/ "Nonce",
332// /*[WSI_TOKEN_HTTP] =*/ "Http",
333// /*[WSI_TOKEN_MUXURL] =*/ "MuxURL",
334// };
335// char buf[256];
336//
337// for (n = 0; n < WSI_TOKEN_COUNT; n++) {
338// if (!lws_hdr_total_length(wsi, n))
339// continue;
340//
341// //lws_hdr_copy(wsi, buf, sizeof buf, n);
342//
343// //fprintf(stderr, " %s = %s\n", token_names[n], buf);
344// }
345//}
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100346
347/* dumb_increment protocol */
348
349/*
350 * one of these is auto-created for each connection and a pointer to the
351 * appropriate instance is passed to the callback in the user parameter
352 *
353 * for this example protocol we use it to individualize the count for each
354 * connection.
355 */
356
/* Per-connection state of the notification protocol handler. */
struct per_session_data__notif_client {
	int number; /* reset to 0 on connect and on a "reset\n" message */
	char *session_key; /* strdup()ed key taken from the first received message; NULL until then */
	struct nc_session *session; /* not referenced by the code in this file */
};
362
Tomas Cejkaba21b382013-04-13 02:37:32 +0200363struct session_with_mutex *get_ncsession_from_key(const char *session_key)
364{
365 struct session_with_mutex *locked_session = NULL;
366 if (session_key == NULL) {
367 return (NULL);
368 }
369 if (pthread_rwlock_wrlock(&session_lock) != 0) {
370 #ifndef TEST_NOTIFICATION_SERVER
371 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
372 #endif
373 return (NULL);
374 }
375 locked_session = (struct session_with_mutex *)apr_hash_get(netconf_locked_sessions, session_key, APR_HASH_KEY_STRING);
376 if (pthread_rwlock_unlock (&session_lock) != 0) {
377 #ifndef TEST_NOTIFICATION_SERVER
378 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
379 #endif
380 return (NULL);
381 }
382 return locked_session;
383}
384
385/* rpc parameter is freed after the function call */
386static int send_recv_process(struct nc_session *session, const char* operation, nc_rpc* rpc)
387{
388 nc_reply *reply = NULL;
389 char *data = NULL;
390 int ret = EXIT_SUCCESS;
391
392 /* send the request and get the reply */
393 switch (nc_session_send_recv(session, rpc, &reply)) {
394 case NC_MSG_UNKNOWN:
395 if (nc_session_get_status(session) != NC_SESSION_STATUS_WORKING) {
396 #ifndef TEST_NOTIFICATION_SERVER
397 if (http_server != NULL) {
398 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: receiving rpc-reply failed.");
399 }
400 #endif
401 //cmd_disconnect(NULL);
402 ret = EXIT_FAILURE;
403 break;
404 }
405 #ifndef TEST_NOTIFICATION_SERVER
406 if (http_server != NULL) {
407 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Unknown error occurred.");
408 }
409 #endif
410 ret = EXIT_FAILURE;
411 break;
412 case NC_MSG_NONE:
413 /* error occurred, but processed by callback */
414 break;
415 case NC_MSG_REPLY:
416 switch (nc_reply_get_type(reply)) {
417 case NC_REPLY_OK:
418 break;
419 case NC_REPLY_DATA:
420 #ifndef TEST_NOTIFICATION_SERVER
421 if (http_server != NULL) {
422 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notifications: recv: %s.", data = nc_reply_get_data (reply));
423 free(data);
424 }
425 #endif
426 break;
427 case NC_REPLY_ERROR:
428 /* wtf, you shouldn't be here !?!? */
429 #ifndef TEST_NOTIFICATION_SERVER
430 if (http_server != NULL) {
431 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: operation failed, but rpc-error was not processed.");
432 }
433 #endif
434 ret = EXIT_FAILURE;
435 break;
436 default:
437 #ifndef TEST_NOTIFICATION_SERVER
438 if (http_server != NULL) {
439 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: unexpected operation result.");
440 }
441 #endif
442 ret = EXIT_FAILURE;
443 break;
444 }
445 break;
446 default:
447 #ifndef TEST_NOTIFICATION_SERVER
448 if (http_server != NULL) {
449 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Unknown error occurred.");
450 }
451 #endif
452 ret = EXIT_FAILURE;
453 break;
454 }
455 nc_rpc_free(rpc);
456 nc_reply_free(reply);
457
458 return (ret);
459}
460
461/**
462 * \brief Callback to store incoming notification
463 * \param [in] eventtime - time when notification occured
464 * \param [in] content - content of notification
465 */
466static void notification_fileprint (time_t eventtime, const char* content)
467{
468 char t[128];
469 struct session_with_mutex *target_session = NULL;
470 notification_t *ntf = NULL;
471 char *session_hash = NULL;
472
Tomas Cejkaba21b382013-04-13 02:37:32 +0200473 if (http_server != NULL) {
Tomas Cejka15c56302013-05-30 01:11:30 +0200474 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "Accepted notif: %lu %s\n", (unsigned long int) eventtime, content);
Tomas Cejkaba21b382013-04-13 02:37:32 +0200475 }
Tomas Cejka15c56302013-05-30 01:11:30 +0200476
Tomas Cejkaba21b382013-04-13 02:37:32 +0200477 session_hash = pthread_getspecific(thread_key);
478 if (http_server != NULL) {
479 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: fileprint getspecific (%s)", session_hash);
480 }
481 target_session = get_ncsession_from_key(session_hash);
482 if (target_session == NULL) {
483 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "no session found last_session_key (%s)", session_hash);
484 return;
485 }
Tomas Cejka15c56302013-05-30 01:11:30 +0200486
487 t[0] = 0;
488 strftime(t, sizeof(t), "%c", localtime(&eventtime));
489
Tomas Cejkaba21b382013-04-13 02:37:32 +0200490 if (pthread_mutex_lock(&target_session->lock) != 0) {
491 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
492 return;
493 }
494 if (target_session->notifications == NULL) {
495 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "target_session->notifications is NULL");
496 if (pthread_mutex_unlock(&target_session->lock) != 0) {
497 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
498 return;
499 }
500 return;
501 }
502 if (http_server != NULL) {
503 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: ready to push to notifications queue");
504 }
505 ntf = (notification_t *) apr_array_push(target_session->notifications);
506 if (ntf == NULL) {
507 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Failed to allocate element ");
508 if (pthread_mutex_unlock(&target_session->lock) != 0) {
509 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
510 return;
511 }
512 return;
513 }
Tomas Cejka73286932013-05-27 22:54:35 +0200514 ntf->eventtime = eventtime;
Tomas Cejkaba21b382013-04-13 02:37:32 +0200515 ntf->content = strdup(content);
516
517 if (http_server != NULL) {
518 ap_log_error (APLOG_MARK, APLOG_NOTICE, 0, http_server, "added notif to queue %u (%s)", (unsigned int) ntf->eventtime, "notifikace");
519 }
520
521 if (pthread_mutex_unlock(&target_session->lock) != 0) {
522 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
523 return;
524 }
525}
526
527/**
528 * \brief Thread for libnetconf notifications dispatch
529 * \param [in] arg - struct ntf_thread_config * with nc_session
530 */
531void* notification_thread(void* arg)
532{
533 struct ntf_thread_config *config = (struct ntf_thread_config*)arg;
534 #ifndef TEST_NOTIFICATION_SERVER
535 if (http_server != NULL) {
536 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notifications: in thread for libnetconf notifications");
537 }
538 #endif
Tomas Cejka15c56302013-05-30 01:11:30 +0200539
540 /* store hash identification of netconf session for notifications printing callback */
541 if (pthread_setspecific(thread_key, config->session_hash) != 0) {
542 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: cannot set thread-specific hash value.");
Tomas Cejkaba21b382013-04-13 02:37:32 +0200543 }
Tomas Cejka15c56302013-05-30 01:11:30 +0200544
Tomas Cejkaba21b382013-04-13 02:37:32 +0200545 if (http_server != NULL) {
546 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notifications: dispatching");
547 }
548 ncntf_dispatch_receive(config->session, notification_fileprint);
549 #ifndef TEST_NOTIFICATION_SERVER
550 if (http_server != NULL) {
551 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notifications: ended thread for libnetconf notifications");
552 }
553 #endif
554 free(config);
555 return (NULL);
556}
557
558
559int notif_subscribe(struct session_with_mutex *locked_session, const char *session_hash, time_t start_time, time_t stop_time)
560{
561 time_t start = -1;
562 time_t stop = -1;
563 struct nc_filter *filter = NULL;
564 char *stream = NULL;
565 nc_rpc *rpc = NULL;
566 pthread_t thread;
567 struct ntf_thread_config *tconfig;
568 struct nc_session *session;
569
570 if (locked_session == NULL) {
571 if (http_server != NULL) {
572 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: no locked_session was given.");
573 }
574 return (EXIT_FAILURE);
575 }
576
577 session = locked_session->session;
578
579 start = time(NULL) + start_time;
580 stop = time(NULL) + stop_time;
581 if (http_server != NULL) { ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: history: %u %u", (unsigned int) start, (unsigned int) stop);
582 }
583
584 if (session == NULL) {
585 #ifndef TEST_NOTIFICATION_SERVER
586 if (http_server != NULL) {
587 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: NETCONF session not established.");
588 }
589 #endif
590 return (EXIT_FAILURE);
591 }
592
593 /* check if notifications are allowed on this session */
594 if (nc_session_notif_allowed(session) == 0) {
595 #ifndef TEST_NOTIFICATION_SERVER
596 if (http_server != NULL) {
597 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Notification subscription is not allowed on this session.");
598 }
599 #endif
600 return (EXIT_FAILURE);
601 }
602 /* check times */
603 if (start != -1 && stop != -1 && start > stop) {
604 #ifndef TEST_NOTIFICATION_SERVER
605 if (http_server != NULL) {
606 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Subscription start time must be lower than the end time.");
607 }
608 #endif
609 return (EXIT_FAILURE);
610 }
611
612 /* create requests */
613 rpc = nc_rpc_subscribe(stream, filter, (start_time == 0)?NULL:&start, (stop_time == 0)?NULL:&stop);
614 nc_filter_free(filter);
615 if (rpc == NULL) {
616 #ifndef TEST_NOTIFICATION_SERVER
617 if (http_server != NULL) {
618 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating an rpc request failed.");
619 }
620 #endif
621 return (EXIT_FAILURE);
622 }
623
624 if (send_recv_process(session, "subscribe", rpc) != 0) {
625 return (EXIT_FAILURE);
626 }
627 rpc = NULL; /* just note that rpc is already freed by send_recv_process() */
Tomas Cejka654f84e2013-04-19 11:55:01 +0200628 locked_session->ntfc_subscribed = 1;
Tomas Cejkaba21b382013-04-13 02:37:32 +0200629
630 tconfig = malloc(sizeof(struct ntf_thread_config));
631 tconfig->session = session;
632 tconfig->session_hash = strdup(session_hash);
633 if (http_server != NULL) {
634 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating libnetconf notification thread (%s).",
635 tconfig->session_hash);
636 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200637
Tomas Cejkaba21b382013-04-13 02:37:32 +0200638 if (pthread_create(&thread, NULL, notification_thread, tconfig) != 0) {
639 #ifndef TEST_NOTIFICATION_SERVER
640 if (http_server != NULL) {
641 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating a thread for receiving notifications failed");
642 }
643 #endif
644 return (EXIT_FAILURE);
645 }
646 pthread_detach(thread);
647 return (EXIT_SUCCESS);
648}
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100649
650static int callback_notification(struct libwebsocket_context *context,
651 struct libwebsocket *wsi,
652 enum libwebsocket_callback_reasons reason,
Tomas Cejkaba21b382013-04-13 02:37:32 +0200653 void *user, void *in, size_t len)
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100654{
Tomas Cejkaba21b382013-04-13 02:37:32 +0200655 int n = 0;
656 int m = 0;
657 unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100658 unsigned char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
Tomas Cejkaba21b382013-04-13 02:37:32 +0200659 struct per_session_data__notif_client *pss = (struct per_session_data__notif_client *)user;
660
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100661 switch (reason) {
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100662 case LWS_CALLBACK_ESTABLISHED:
Tomas Cejka15c56302013-05-30 01:11:30 +0200663 if (http_server != NULL) {
664 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification client connected.");
665 }
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100666 pss->number = 0;
667 break;
668
669 case LWS_CALLBACK_SERVER_WRITEABLE:
Tomas Cejkaba21b382013-04-13 02:37:32 +0200670 if (pss->session_key == NULL) {
671 return 0;
672 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200673
674 struct session_with_mutex *ls = get_ncsession_from_key(pss->session_key);
675 if (ls == NULL) {
676 if (http_server != NULL) {
677 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: session not found");
678 }
679 return -1;
680 }
681 if (pthread_mutex_lock(&ls->lock) != 0) {
682 #ifndef TEST_NOTIFICATION_SERVER
683 if (http_server != NULL) {
684 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: cannot lock session");
685 }
686 #endif
687 }
688 notification_t *notif = NULL;
689 if (ls->notifications == NULL) {
690 #ifndef TEST_NOTIFICATION_SERVER
691 if (http_server != NULL) {
692 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: no notifications array");
693 }
694 #endif
695 pthread_mutex_unlock(&ls->lock);
696 return -1;
697 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200698 if (!apr_is_empty_array(ls->notifications)) {
Tomas Cejkaba21b382013-04-13 02:37:32 +0200699
Tomas Cejka654f84e2013-04-19 11:55:01 +0200700 if (http_server != NULL) {
701 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications for session");
702 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200703
Tomas Cejka654f84e2013-04-19 11:55:01 +0200704 while ((notif = (notification_t *) apr_array_pop(ls->notifications)) != NULL) {
705 char t[128];
Tomas Cejka73286932013-05-27 22:54:35 +0200706 memset(&t, 0, 128);
Tomas Cejka654f84e2013-04-19 11:55:01 +0200707 strftime(t, sizeof(t), "%c", localtime(&notif->eventtime));
708 n = 0;
Tomas Cejka73286932013-05-27 22:54:35 +0200709 json_object *notif_json = json_object_new_object();
710 json_object_object_add(notif_json, "eventtime", json_object_new_string(t));
711 json_object_object_add(notif_json, "content", json_object_new_string(notif->content));
712
Tomas Cejka00635972013-06-03 15:10:52 +0200713 const char *msgtext = json_object_to_json_string(notif_json);
Tomas Cejka15c56302013-05-30 01:11:30 +0200714
715 n = sprintf((char *)p, "%s", msgtext);
716 m = libwebsocket_write(wsi, p, n, LWS_WRITE_TEXT);
717
Tomas Cejka8a82dab2013-05-30 23:37:23 +0200718 json_object_put(notif_json);
Tomas Cejka654f84e2013-04-19 11:55:01 +0200719 }
720 if (http_server != NULL) {
721 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications done");
722 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200723 }
724
725 if (pthread_mutex_unlock(&ls->lock) != 0) {
726 #ifndef TEST_NOTIFICATION_SERVER
727 if (http_server != NULL) {
728 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: cannot unlock session");
729 }
730 #endif
731 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200732 //if (http_server != NULL) {
733 // ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: unlocked session lock");
734 //}
Tomas Cejkaba21b382013-04-13 02:37:32 +0200735
736
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100737 if (m < n) {
Tomas Cejka15c56302013-05-30 01:11:30 +0200738 if (http_server != NULL) {
739 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "ERROR %d writing to di socket\n", n);
740 }
741
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100742 return -1;
743 }
744 if (close_testing && pss->number == 50) {
Tomas Cejka15c56302013-05-30 01:11:30 +0200745 if (http_server != NULL) {
746 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "close tesing limit, closing\n");
747 }
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100748 return -1;
749 }
750 break;
751
752 case LWS_CALLBACK_RECEIVE:
Tomas Cejkaba21b382013-04-13 02:37:32 +0200753 #ifndef TEST_NOTIFICATION_SERVER
754 if (http_server != NULL) {
755 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "received: (%s)", (char *)in);
756 }
757 #endif
758 if (pss->session_key == NULL) {
759 char session_key_buf[41];
760 int start = -1;
761 time_t stop = time(NULL) + 30;
762
763 strncpy((char *) session_key_buf, (const char *) in, 40);
764 session_key_buf[40] = '\0';
765 pss->session_key = strdup(session_key_buf);
766 sscanf(in+40, "%d %d", (int *) &start, (int *) &stop);
767 if (http_server != NULL) {
768 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: get key (%s) from (%s) (%i,%i)", pss->session_key, (char *) in, (int) start, (int) stop);
769 }
Tomas Cejka15c56302013-05-30 01:11:30 +0200770
Tomas Cejkaba21b382013-04-13 02:37:32 +0200771 struct session_with_mutex *ls = get_ncsession_from_key(pss->session_key);
772 if (ls == NULL) {
773 if (http_server != NULL) {
774 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: session_key not found (%s)", pss->session_key);
775 }
776 return 0;
777 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200778 if (ls->ntfc_subscribed != 0) {
779 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: already subscribed");
780 return 0;
781 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200782 if (http_server != NULL) {
783 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: prepare to subscribe stream");
784 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200785
786 //if (pthread_rwlock_rdlock (&session_lock) != 0) {
787 // ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
788 // return EXIT_FAILURE;
789 //}
Tomas Cejkaba21b382013-04-13 02:37:32 +0200790 notif_subscribe(ls, pss->session_key, (time_t) start, (time_t) stop);
Tomas Cejka654f84e2013-04-19 11:55:01 +0200791 //if (pthread_rwlock_unlock (&session_lock) != 0) {
792 // ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
793 // return EXIT_FAILURE;
794 //}
Tomas Cejkaba21b382013-04-13 02:37:32 +0200795 }
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100796 if (len < 6)
797 break;
798 if (strcmp((const char *)in, "reset\n") == 0)
799 pss->number = 0;
800 break;
801 /*
802 * this just demonstrates how to use the protocol filter. If you won't
803 * study and reject connections based on header content, you don't need
804 * to handle this callback
805 */
806
807 case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
808 //dump_handshake_info(wsi);
809 /* you could return non-zero here and kill the connection */
810 break;
811
812 default:
813 break;
814 }
815
816 return 0;
817}
818
/*
 * list of supported protocols and callbacks
 *
 * Entries are addressed with the values of enum demo_protocols.
 */

static struct libwebsocket_protocols protocols[] = {
	/* first protocol must always be HTTP handler */

	{
		"http-only",		/* name */
		callback_http,		/* callback */
		sizeof (struct per_session_data__http),	/* per_session_data_size */
		0,			/* max frame size / rx buffer */
	},
	{
		"notification-protocol",	/* name */
		callback_notification,		/* callback */
		sizeof(struct per_session_data__notif_client),	/* per_session_data_size */
		80,				/* max frame size / rx buffer */
	},
	{ NULL, NULL, 0, 0 } /* terminator */
};
838
839
Tomas Cejkaba21b382013-04-13 02:37:32 +0200840/**
841 * initialization of notification module
842 */
843int notification_init(apr_pool_t * pool, server_rec * server, apr_hash_t *conns)
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100844{
Tomas Cejkaba21b382013-04-13 02:37:32 +0200845 //char cert_path[1024];
846 //char key_path[1024];
847 //int use_ssl = 0;
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100848 struct lws_context_creation_info info;
849 int opts = 0;
Tomas Cejkaba21b382013-04-13 02:37:32 +0200850 //char interface_name[128] = "";
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100851 const char *iface = NULL;
852 int debug_level = 7;
853
Tomas Cejkaba21b382013-04-13 02:37:32 +0200854 netconf_locked_sessions = conns;
855
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100856 memset(&info, 0, sizeof info);
857 info.port = NOTIFICATION_SERVER_PORT;
858
859 /* tell the library what debug level to emit and to send it to syslog */
860 lws_set_log_level(debug_level, lwsl_emit_syslog);
861
862 #ifndef TEST_NOTIFICATION_SERVER
863 if (server != NULL) {
864 http_server = server;
865 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "Initialization of libwebsocket");
866 }
867 #endif
Tomas Cejka15c56302013-05-30 01:11:30 +0200868 //lwsl_notice("libwebsockets test server - "
869 // "(C) Copyright 2010-2013 Andy Green <andy@warmcat.com> - "
870 // "licensed under LGPL2.1\n");
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100871 max_poll_elements = getdtablesize();
872 pollfds = malloc(max_poll_elements * sizeof (struct pollfd));
873 fd_lookup = malloc(max_poll_elements * sizeof (int));
874 if (pollfds == NULL || fd_lookup == NULL) {
Tomas Cejka15c56302013-05-30 01:11:30 +0200875 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Out of memory pollfds=%d\n", max_poll_elements);
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100876 return -1;
877 }
878
879 info.iface = iface;
880 info.protocols = protocols;
881
882 //snprintf(cert_path, sizeof(cert_path), "%s/libwebsockets-test-server.pem", resource_path);
883 //snprintf(key_path, sizeof(cert_path), "%s/libwebsockets-test-server.key.pem", resource_path);
884
885 //info.ssl_cert_filepath = cert_path;
886 //info.ssl_private_key_filepath = key_path;
887
888 info.gid = -1;
889 info.uid = -1;
890 info.options = opts;
891
892 /* create server */
893 context = libwebsocket_create_context(&info);
894 if (context == NULL) {
Tomas Cejka15c56302013-05-30 01:11:30 +0200895 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "libwebsocket init failed.");
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100896 return -1;
897 }
Tomas Cejka15c56302013-05-30 01:11:30 +0200898
899 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notifications: init of pthread_key_create.");
900 if (pthread_key_create(&thread_key, NULL) != 0) {
901 #ifndef TEST_NOTIFICATION_SERVER
902 if (http_server != NULL) {
903 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: pthread_key_create failed");
904 }
905 #endif
906 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200907 return 0;
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100908}
909
910void notification_close()
911{
912 libwebsocket_context_destroy(context);
913
Tomas Cejka15c56302013-05-30 01:11:30 +0200914 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "libwebsockets-test-server exited cleanly\n");
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100915}
916
Tomas Cejkaba21b382013-04-13 02:37:32 +0200917
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100918/**
919 * \brief send notification if any
920 * \return < 0 on error
921 */
922int notification_handle()
923{
924 static struct timeval tv;
925 static unsigned int olds = 0;
926 int n = 0;
927
928 gettimeofday(&tv, NULL);
929
930 /*
931 * This provokes the LWS_CALLBACK_SERVER_WRITEABLE for every
932 * live websocket connection using the DUMB_INCREMENT protocol,
933 * as soon as it can take more packets (usually immediately)
934 */
935
936 if (((unsigned int)tv.tv_sec - olds) > 0) {
937 libwebsocket_callback_on_writable_all_protocol(&protocols[PROTOCOL_NOTIFICATION]);
938 olds = tv.tv_sec;
939 }
940
941
942 /*
943 * this represents an existing server's single poll action
944 * which also includes libwebsocket sockets
945 */
946
947 n = poll(pollfds, count_pollfds, 50);
948 if (n < 0)
949 return n;
950
951
952 if (n) {
953 for (n = 0; n < count_pollfds; n++) {
954 if (pollfds[n].revents) {
955 /*
956 * returns immediately if the fd does not
957 * match anything under libwebsockets
958 * control
959 */
960 if (libwebsocket_service_fd(context, &pollfds[n]) < 0) {
961 return 1;
962 }
963 }
964 }
965 }
966 return 0;
967}
968
969#endif
970
971
972#ifndef WITH_NOTIFICATIONS
973#ifdef TEST_NOTIFICATION_SERVER
974int main(int argc, char **argv)
975{
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100976 if (notification_init(NULL, NULL) == -1) {
977 fprintf(stderr, "Error during initialization\n");
978 return 1;
979 }
980 while (!force_exit) {
981 notification_handle();
982 }
983 notification_close();
984}
985#endif
986#endif