blob: b7d334e59b4f72a352d8e15afca24276b322d17c [file] [log] [blame]
Tomas Cejkad340dbf2013-03-24 20:36:57 +01001/*
2 * libwebsockets-test-server - libwebsockets test implementation
3 *
4 * Copyright (C) 2010-2011 Andy Green <andy@warmcat.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation:
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301 USA
20 */
21#include <stdio.h>
22#include <stdlib.h>
23#include <unistd.h>
24#include <getopt.h>
25#include <string.h>
26#include <sys/time.h>
27#include <sys/stat.h>
Tomas Cejkaba21b382013-04-13 02:37:32 +020028#include <sys/queue.h>
Tomas Cejkad340dbf2013-03-24 20:36:57 +010029#include <fcntl.h>
30#include <assert.h>
Tomas Cejkaba21b382013-04-13 02:37:32 +020031#include <pthread.h>
32#include <libnetconf.h>
33#include <libwebsockets.h>
Tomas Cejkad340dbf2013-03-24 20:36:57 +010034#include "notification_module.h"
Tomas Cejkaba21b382013-04-13 02:37:32 +020035#include "mod_netconf.h"
Tomas Cejkad340dbf2013-03-24 20:36:57 +010036
37#ifndef TEST_NOTIFICATION_SERVER
38#include <httpd.h>
39#include <http_log.h>
Tomas Cejkaba21b382013-04-13 02:37:32 +020040#include <apr_hash.h>
41#include <apr_tables.h>
42
43#else
44static int force_exit = 0;
Tomas Cejkad340dbf2013-03-24 20:36:57 +010045#endif
46
47#if defined(TEST_NOTIFICATION_SERVER) || defined(WITH_NOTIFICATIONS)
48static int close_testing;
49static int max_poll_elements;
50
51static struct pollfd *pollfds;
52static int *fd_lookup;
53static int count_pollfds;
Tomas Cejkad340dbf2013-03-24 20:36:57 +010054static struct libwebsocket_context *context = NULL;
55static server_rec *http_server = NULL;
56
Tomas Cejkaba21b382013-04-13 02:37:32 +020057struct ntf_thread_config {
58 struct nc_session *session;
59 char *session_hash;
60};
61
62static apr_hash_t *netconf_locked_sessions = NULL;
63static pthread_key_t thread_key;
64
Tomas Cejkad340dbf2013-03-24 20:36:57 +010065/*
66 * This demo server shows how to use libwebsockets for one or more
67 * websocket protocols in the same server
68 *
69 * It defines the following websocket protocols:
70 *
71 * dumb-increment-protocol: once the socket is opened, an incrementing
72 * ascii string is sent down it every 50ms.
73 * If you send "reset\n" on the websocket, then
74 * the incrementing number is reset to 0.
75 *
76 * lws-mirror-protocol: copies any received packet to every connection also
77 * using this protocol, including the sender
78 */
79
80enum demo_protocols {
81 /* always first */
82 PROTOCOL_HTTP = 0,
83
84 PROTOCOL_NOTIFICATION,
85
86 /* always last */
87 DEMO_PROTOCOL_COUNT
88};
89
90
91#define LOCAL_RESOURCE_PATH "."
92char *resource_path = LOCAL_RESOURCE_PATH;
93
94/*
95 * We take a strict whitelist approach to stop ../ attacks
96 */
97
98struct serveable {
99 const char *urlpath;
100 const char *mimetype;
101};
102
103static const struct serveable whitelist[] = {
104 { "/favicon.ico", "image/x-icon" },
105 { "/libwebsockets.org-logo.png", "image/png" },
106
107 /* last one is the default served if no match */
108 { "/test.html", "text/html" },
109};
110
111struct per_session_data__http {
112 int fd;
113};
114
115/* this protocol server (always the first one) just knows how to do HTTP */
116
117static int callback_http(struct libwebsocket_context *context,
118 struct libwebsocket *wsi,
119 enum libwebsocket_callback_reasons reason, void *user,
120 void *in, size_t len)
121{
122 char client_name[128];
123 char client_ip[128];
124 char buf[256];
125 int n, m;
126 unsigned char *p;
127 static unsigned char buffer[4096];
128 struct stat stat_buf;
129 struct per_session_data__http *pss = (struct per_session_data__http *)user;
130 int fd = (int)(long)in;
131
132 switch (reason) {
133 case LWS_CALLBACK_HTTP:
134
135 /* check for the "send a big file by hand" example case */
136
137 if (!strcmp((const char *)in, "/leaf.jpg")) {
138 char leaf_path[1024];
139 snprintf(leaf_path, sizeof(leaf_path), "%s/leaf.jpg", resource_path);
140
141 /* well, let's demonstrate how to send the hard way */
142
143 p = buffer;
144
145 pss->fd = open(leaf_path, O_RDONLY);
146
147 if (pss->fd < 0)
148 return -1;
149
150 fstat(pss->fd, &stat_buf);
151
152 /*
153 * we will send a big jpeg file, but it could be
154 * anything. Set the Content-Type: appropriately
155 * so the browser knows what to do with it.
156 */
157
158 p += sprintf((char *)p,
159 "HTTP/1.0 200 OK\x0d\x0a"
160 "Server: libwebsockets\x0d\x0a"
161 "Content-Type: image/jpeg\x0d\x0a"
162 "Content-Length: %u\x0d\x0a\x0d\x0a",
163 (unsigned int)stat_buf.st_size);
164
165 /*
166 * send the http headers...
167 * this won't block since it's the first payload sent
168 * on the connection since it was established
169 * (too small for partial)
170 */
171
172 n = libwebsocket_write(wsi, buffer,
173 p - buffer, LWS_WRITE_HTTP);
174
175 if (n < 0) {
176 close(pss->fd);
177 return -1;
178 }
179 /*
180 * book us a LWS_CALLBACK_HTTP_WRITEABLE callback
181 */
182 libwebsocket_callback_on_writable(context, wsi);
183 break;
184 }
185
186 /* if not, send a file the easy way */
187
188 for (n = 0; n < (sizeof(whitelist) / sizeof(whitelist[0]) - 1); n++)
189 if (in && strcmp((const char *)in, whitelist[n].urlpath) == 0)
190 break;
191
192 sprintf(buf, "%s%s", resource_path, whitelist[n].urlpath);
193
194 if (libwebsockets_serve_http_file(context, wsi, buf, whitelist[n].mimetype))
195 return -1; /* through completion or error, close the socket */
196
197 /*
198 * notice that the sending of the file completes asynchronously,
199 * we'll get a LWS_CALLBACK_HTTP_FILE_COMPLETION callback when
200 * it's done
201 */
202
203 break;
204
205 case LWS_CALLBACK_HTTP_FILE_COMPLETION:
206// lwsl_info("LWS_CALLBACK_HTTP_FILE_COMPLETION seen\n");
207 /* kill the connection after we sent one file */
208 return -1;
209
210 case LWS_CALLBACK_HTTP_WRITEABLE:
211 /*
212 * we can send more of whatever it is we were sending
213 */
214
215 do {
216 n = read(pss->fd, buffer, sizeof buffer);
217 /* problem reading, close conn */
218 if (n < 0)
219 goto bail;
220 /* sent it all, close conn */
221 if (n == 0)
222 goto bail;
223 /*
224 * because it's HTTP and not websocket, don't need to take
225 * care about pre and postamble
226 */
227 m = libwebsocket_write(wsi, buffer, n, LWS_WRITE_HTTP);
228 if (m < 0)
229 /* write failed, close conn */
230 goto bail;
231 if (m != n)
232 /* partial write, adjust */
233 lseek(pss->fd, m - n, SEEK_CUR);
234
235 } while (!lws_send_pipe_choked(wsi));
236 libwebsocket_callback_on_writable(context, wsi);
237 break;
238
239bail:
240 close(pss->fd);
241 return -1;
242
243 /*
244 * callback for confirming to continue with client IP appear in
245 * protocol 0 callback since no websocket protocol has been agreed
246 * yet. You can just ignore this if you won't filter on client IP
247 * since the default uhandled callback return is 0 meaning let the
248 * connection continue.
249 */
250
251 case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
252 libwebsockets_get_peer_addresses(context, wsi, (int)(long)in, client_name,
253 sizeof(client_name), client_ip, sizeof(client_ip));
254
Tomas Cejkaba21b382013-04-13 02:37:32 +0200255 //fprintf(stderr, "Received network connect from %s (%s)\n", client_name, client_ip);
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100256 /* if we returned non-zero from here, we kill the connection */
257 break;
258
259 /*
260 * callbacks for managing the external poll() array appear in
261 * protocol 0 callback
262 */
263
264 case LWS_CALLBACK_ADD_POLL_FD:
265
266 if (count_pollfds >= max_poll_elements) {
267 lwsl_err("LWS_CALLBACK_ADD_POLL_FD: too many sockets to track\n");
268 return 1;
269 }
270
271 fd_lookup[fd] = count_pollfds;
272 pollfds[count_pollfds].fd = fd;
273 pollfds[count_pollfds].events = (int)(long)len;
274 pollfds[count_pollfds++].revents = 0;
275 break;
276
277 case LWS_CALLBACK_DEL_POLL_FD:
278 if (!--count_pollfds)
279 break;
280 m = fd_lookup[fd];
281 /* have the last guy take up the vacant slot */
282 pollfds[m] = pollfds[count_pollfds];
283 fd_lookup[pollfds[count_pollfds].fd] = m;
284 break;
285
286 case LWS_CALLBACK_SET_MODE_POLL_FD:
287 pollfds[fd_lookup[fd]].events |= (int)(long)len;
288 break;
289
290 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
291 pollfds[fd_lookup[fd]].events &= ~(int)(long)len;
292 break;
293
294 default:
295 break;
296 }
297
298 return 0;
299}
300
Tomas Cejkaba21b382013-04-13 02:37:32 +0200301/**
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100302 * this is just an example of parsing handshake headers, you don't need this
303 * in your code unless you will filter allowing connections by the header
304 * content
305 */
Tomas Cejkaba21b382013-04-13 02:37:32 +0200306//static void dump_handshake_info(struct libwebsocket *wsi)
307//{
308// int n;
309// static const char *token_names[WSI_TOKEN_COUNT] = {
310// /*[WSI_TOKEN_GET_URI] =*/ "GET URI",
311// /*[WSI_TOKEN_HOST] =*/ "Host",
312// /*[WSI_TOKEN_CONNECTION] =*/ "Connection",
313// /*[WSI_TOKEN_KEY1] =*/ "key 1",
314// /*[WSI_TOKEN_KEY2] =*/ "key 2",
315// /*[WSI_TOKEN_PROTOCOL] =*/ "Protocol",
316// /*[WSI_TOKEN_UPGRADE] =*/ "Upgrade",
317// /*[WSI_TOKEN_ORIGIN] =*/ "Origin",
318// /*[WSI_TOKEN_DRAFT] =*/ "Draft",
319// /*[WSI_TOKEN_CHALLENGE] =*/ "Challenge",
320//
321// /* new for 04 */
322// /*[WSI_TOKEN_KEY] =*/ "Key",
323// /*[WSI_TOKEN_VERSION] =*/ "Version",
324// /*[WSI_TOKEN_SWORIGIN] =*/ "Sworigin",
325//
326// /* new for 05 */
327// /*[WSI_TOKEN_EXTENSIONS] =*/ "Extensions",
328//
329// /* client receives these */
330// /*[WSI_TOKEN_ACCEPT] =*/ "Accept",
331// /*[WSI_TOKEN_NONCE] =*/ "Nonce",
332// /*[WSI_TOKEN_HTTP] =*/ "Http",
333// /*[WSI_TOKEN_MUXURL] =*/ "MuxURL",
334// };
335// char buf[256];
336//
337// for (n = 0; n < WSI_TOKEN_COUNT; n++) {
338// if (!lws_hdr_total_length(wsi, n))
339// continue;
340//
341// //lws_hdr_copy(wsi, buf, sizeof buf, n);
342//
343// //fprintf(stderr, " %s = %s\n", token_names[n], buf);
344// }
345//}
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100346
347/* dumb_increment protocol */
348
349/*
350 * one of these is auto-created for each connection and a pointer to the
351 * appropriate instance is passed to the callback in the user parameter
352 *
353 * for this example protocol we use it to individualize the count for each
354 * connection.
355 */
356
Tomas Cejkaba21b382013-04-13 02:37:32 +0200357struct per_session_data__notif_client {
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100358 int number;
Tomas Cejkaba21b382013-04-13 02:37:32 +0200359 char *session_key;
360 struct nc_session *session;
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100361};
362
Tomas Cejkaba21b382013-04-13 02:37:32 +0200363struct session_with_mutex *get_ncsession_from_key(const char *session_key)
364{
365 struct session_with_mutex *locked_session = NULL;
366 if (session_key == NULL) {
367 return (NULL);
368 }
369 if (pthread_rwlock_wrlock(&session_lock) != 0) {
370 #ifndef TEST_NOTIFICATION_SERVER
371 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
372 #endif
373 return (NULL);
374 }
375 locked_session = (struct session_with_mutex *)apr_hash_get(netconf_locked_sessions, session_key, APR_HASH_KEY_STRING);
376 if (pthread_rwlock_unlock (&session_lock) != 0) {
377 #ifndef TEST_NOTIFICATION_SERVER
378 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
379 #endif
380 return (NULL);
381 }
382 return locked_session;
383}
384
385/* rpc parameter is freed after the function call */
386static int send_recv_process(struct nc_session *session, const char* operation, nc_rpc* rpc)
387{
388 nc_reply *reply = NULL;
389 char *data = NULL;
390 int ret = EXIT_SUCCESS;
391
392 /* send the request and get the reply */
393 switch (nc_session_send_recv(session, rpc, &reply)) {
394 case NC_MSG_UNKNOWN:
395 if (nc_session_get_status(session) != NC_SESSION_STATUS_WORKING) {
396 #ifndef TEST_NOTIFICATION_SERVER
397 if (http_server != NULL) {
398 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: receiving rpc-reply failed.");
399 }
400 #endif
401 //cmd_disconnect(NULL);
402 ret = EXIT_FAILURE;
403 break;
404 }
405 #ifndef TEST_NOTIFICATION_SERVER
406 if (http_server != NULL) {
407 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Unknown error occurred.");
408 }
409 #endif
410 ret = EXIT_FAILURE;
411 break;
412 case NC_MSG_NONE:
413 /* error occurred, but processed by callback */
414 break;
415 case NC_MSG_REPLY:
416 switch (nc_reply_get_type(reply)) {
417 case NC_REPLY_OK:
418 break;
419 case NC_REPLY_DATA:
420 #ifndef TEST_NOTIFICATION_SERVER
421 if (http_server != NULL) {
422 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notifications: recv: %s.", data = nc_reply_get_data (reply));
423 free(data);
424 }
425 #endif
426 break;
427 case NC_REPLY_ERROR:
428 /* wtf, you shouldn't be here !?!? */
429 #ifndef TEST_NOTIFICATION_SERVER
430 if (http_server != NULL) {
431 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: operation failed, but rpc-error was not processed.");
432 }
433 #endif
434 ret = EXIT_FAILURE;
435 break;
436 default:
437 #ifndef TEST_NOTIFICATION_SERVER
438 if (http_server != NULL) {
439 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: unexpected operation result.");
440 }
441 #endif
442 ret = EXIT_FAILURE;
443 break;
444 }
445 break;
446 default:
447 #ifndef TEST_NOTIFICATION_SERVER
448 if (http_server != NULL) {
449 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Unknown error occurred.");
450 }
451 #endif
452 ret = EXIT_FAILURE;
453 break;
454 }
455 nc_rpc_free(rpc);
456 nc_reply_free(reply);
457
458 return (ret);
459}
460
461/**
462 * \brief Callback to store incoming notification
463 * \param [in] eventtime - time when notification occured
464 * \param [in] content - content of notification
465 */
466static void notification_fileprint (time_t eventtime, const char* content)
467{
468 char t[128];
469 struct session_with_mutex *target_session = NULL;
470 notification_t *ntf = NULL;
471 char *session_hash = NULL;
472
473 t[0] = 0;
474 strftime(t, sizeof(t), "%c", localtime(&eventtime));
475
476 if (http_server != NULL) {
477 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: eventTime: %s\n%s\n", t, content);
478 }
479
480 /* \todo replace last_session_key with client identification */
481 session_hash = pthread_getspecific(thread_key);
482 if (http_server != NULL) {
483 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: fileprint getspecific (%s)", session_hash);
484 }
485 target_session = get_ncsession_from_key(session_hash);
486 if (target_session == NULL) {
487 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "no session found last_session_key (%s)", session_hash);
488 return;
489 }
490 if (pthread_mutex_lock(&target_session->lock) != 0) {
491 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
492 return;
493 }
494 if (target_session->notifications == NULL) {
495 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "target_session->notifications is NULL");
496 if (pthread_mutex_unlock(&target_session->lock) != 0) {
497 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
498 return;
499 }
500 return;
501 }
502 if (http_server != NULL) {
503 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: ready to push to notifications queue");
504 }
Tomas Cejkaf38a54c2013-05-27 21:57:35 +0200505 /** \todo push to all clients */
Tomas Cejkaba21b382013-04-13 02:37:32 +0200506 ntf = (notification_t *) apr_array_push(target_session->notifications);
507 if (ntf == NULL) {
508 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Failed to allocate element ");
509 if (pthread_mutex_unlock(&target_session->lock) != 0) {
510 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
511 return;
512 }
513 return;
514 }
Tomas Cejka73286932013-05-27 22:54:35 +0200515 ntf->eventtime = eventtime;
Tomas Cejkaba21b382013-04-13 02:37:32 +0200516 ntf->content = strdup(content);
517
518 if (http_server != NULL) {
519 ap_log_error (APLOG_MARK, APLOG_NOTICE, 0, http_server, "added notif to queue %u (%s)", (unsigned int) ntf->eventtime, "notifikace");
520 }
521
522 if (pthread_mutex_unlock(&target_session->lock) != 0) {
523 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
524 return;
525 }
526}
527
528/**
529 * \brief Thread for libnetconf notifications dispatch
530 * \param [in] arg - struct ntf_thread_config * with nc_session
531 */
532void* notification_thread(void* arg)
533{
534 struct ntf_thread_config *config = (struct ntf_thread_config*)arg;
535 #ifndef TEST_NOTIFICATION_SERVER
536 if (http_server != NULL) {
537 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notifications: in thread for libnetconf notifications");
538 }
539 #endif
540 if (http_server != NULL) {
541 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notifications: pthread_key_create");
542 }
543 if (pthread_key_create(&thread_key, NULL) != 0) {
544 #ifndef TEST_NOTIFICATION_SERVER
545 if (http_server != NULL) {
546 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: pthread_key_create failed");
547 }
548 #endif
549 }
550 pthread_setspecific(thread_key, config->session_hash);
551 if (http_server != NULL) {
552 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notifications: dispatching");
553 }
554 ncntf_dispatch_receive(config->session, notification_fileprint);
555 #ifndef TEST_NOTIFICATION_SERVER
556 if (http_server != NULL) {
557 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notifications: ended thread for libnetconf notifications");
558 }
559 #endif
560 free(config);
561 return (NULL);
562}
563
564
565int notif_subscribe(struct session_with_mutex *locked_session, const char *session_hash, time_t start_time, time_t stop_time)
566{
567 time_t start = -1;
568 time_t stop = -1;
569 struct nc_filter *filter = NULL;
570 char *stream = NULL;
571 nc_rpc *rpc = NULL;
572 pthread_t thread;
573 struct ntf_thread_config *tconfig;
574 struct nc_session *session;
575
576 if (locked_session == NULL) {
577 if (http_server != NULL) {
578 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: no locked_session was given.");
579 }
580 return (EXIT_FAILURE);
581 }
582
583 session = locked_session->session;
584
585 start = time(NULL) + start_time;
586 stop = time(NULL) + stop_time;
587 if (http_server != NULL) { ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: history: %u %u", (unsigned int) start, (unsigned int) stop);
588 }
589
590 if (session == NULL) {
591 #ifndef TEST_NOTIFICATION_SERVER
592 if (http_server != NULL) {
593 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: NETCONF session not established.");
594 }
595 #endif
596 return (EXIT_FAILURE);
597 }
598
599 /* check if notifications are allowed on this session */
600 if (nc_session_notif_allowed(session) == 0) {
601 #ifndef TEST_NOTIFICATION_SERVER
602 if (http_server != NULL) {
603 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Notification subscription is not allowed on this session.");
604 }
605 #endif
606 return (EXIT_FAILURE);
607 }
608 /* check times */
609 if (start != -1 && stop != -1 && start > stop) {
610 #ifndef TEST_NOTIFICATION_SERVER
611 if (http_server != NULL) {
612 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Subscription start time must be lower than the end time.");
613 }
614 #endif
615 return (EXIT_FAILURE);
616 }
617
618 /* create requests */
619 rpc = nc_rpc_subscribe(stream, filter, (start_time == 0)?NULL:&start, (stop_time == 0)?NULL:&stop);
620 nc_filter_free(filter);
621 if (rpc == NULL) {
622 #ifndef TEST_NOTIFICATION_SERVER
623 if (http_server != NULL) {
624 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating an rpc request failed.");
625 }
626 #endif
627 return (EXIT_FAILURE);
628 }
629
630 if (send_recv_process(session, "subscribe", rpc) != 0) {
631 return (EXIT_FAILURE);
632 }
633 rpc = NULL; /* just note that rpc is already freed by send_recv_process() */
Tomas Cejka654f84e2013-04-19 11:55:01 +0200634 locked_session->ntfc_subscribed = 1;
Tomas Cejkaba21b382013-04-13 02:37:32 +0200635
636 tconfig = malloc(sizeof(struct ntf_thread_config));
637 tconfig->session = session;
638 tconfig->session_hash = strdup(session_hash);
639 if (http_server != NULL) {
640 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating libnetconf notification thread (%s).",
641 tconfig->session_hash);
642 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200643
Tomas Cejkaba21b382013-04-13 02:37:32 +0200644 if (pthread_create(&thread, NULL, notification_thread, tconfig) != 0) {
645 #ifndef TEST_NOTIFICATION_SERVER
646 if (http_server != NULL) {
647 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating a thread for receiving notifications failed");
648 }
649 #endif
650 return (EXIT_FAILURE);
651 }
652 pthread_detach(thread);
653 return (EXIT_SUCCESS);
654}
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100655
656static int callback_notification(struct libwebsocket_context *context,
657 struct libwebsocket *wsi,
658 enum libwebsocket_callback_reasons reason,
Tomas Cejkaba21b382013-04-13 02:37:32 +0200659 void *user, void *in, size_t len)
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100660{
Tomas Cejkaba21b382013-04-13 02:37:32 +0200661 int n = 0;
662 int m = 0;
663 unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100664 unsigned char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
Tomas Cejkaba21b382013-04-13 02:37:32 +0200665 struct per_session_data__notif_client *pss = (struct per_session_data__notif_client *)user;
666
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100667 switch (reason) {
668
669 case LWS_CALLBACK_ESTABLISHED:
670 lwsl_info("callback_notification: LWS_CALLBACK_ESTABLISHED\n");
671 pss->number = 0;
672 break;
673
674 case LWS_CALLBACK_SERVER_WRITEABLE:
Tomas Cejkaba21b382013-04-13 02:37:32 +0200675 if (pss->session_key == NULL) {
676 return 0;
677 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200678
679 struct session_with_mutex *ls = get_ncsession_from_key(pss->session_key);
680 if (ls == NULL) {
681 if (http_server != NULL) {
682 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: session not found");
683 }
684 return -1;
685 }
686 if (pthread_mutex_lock(&ls->lock) != 0) {
687 #ifndef TEST_NOTIFICATION_SERVER
688 if (http_server != NULL) {
689 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: cannot lock session");
690 }
691 #endif
692 }
693 notification_t *notif = NULL;
694 if (ls->notifications == NULL) {
695 #ifndef TEST_NOTIFICATION_SERVER
696 if (http_server != NULL) {
697 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: no notifications array");
698 }
699 #endif
700 pthread_mutex_unlock(&ls->lock);
701 return -1;
702 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200703 if (!apr_is_empty_array(ls->notifications)) {
Tomas Cejkaba21b382013-04-13 02:37:32 +0200704
Tomas Cejka654f84e2013-04-19 11:55:01 +0200705 if (http_server != NULL) {
706 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications for session");
707 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200708
Tomas Cejka654f84e2013-04-19 11:55:01 +0200709 while ((notif = (notification_t *) apr_array_pop(ls->notifications)) != NULL) {
710 char t[128];
Tomas Cejka73286932013-05-27 22:54:35 +0200711 memset(&t, 0, 128);
Tomas Cejka654f84e2013-04-19 11:55:01 +0200712 strftime(t, sizeof(t), "%c", localtime(&notif->eventtime));
713 n = 0;
Tomas Cejka73286932013-05-27 22:54:35 +0200714 json_object *notif_json = json_object_new_object();
715 json_object_object_add(notif_json, "eventtime", json_object_new_string(t));
716 json_object_object_add(notif_json, "content", json_object_new_string(notif->content));
717
718 char *msgtext = json_object_to_json_string(notif_json);
719 m = libwebsocket_write(wsi, (unsigned char *) msgtext, strlen(msgtext), LWS_WRITE_TEXT);
Tomas Cejka654f84e2013-04-19 11:55:01 +0200720 }
721 if (http_server != NULL) {
722 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications done");
723 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200724 }
725
726 if (pthread_mutex_unlock(&ls->lock) != 0) {
727 #ifndef TEST_NOTIFICATION_SERVER
728 if (http_server != NULL) {
729 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: cannot unlock session");
730 }
731 #endif
732 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200733 //if (http_server != NULL) {
734 // ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: unlocked session lock");
735 //}
Tomas Cejkaba21b382013-04-13 02:37:32 +0200736
737
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100738 if (m < n) {
739 lwsl_err("ERROR %d writing to di socket\n", n);
740 return -1;
741 }
742 if (close_testing && pss->number == 50) {
743 lwsl_info("close tesing limit, closing\n");
744 return -1;
745 }
746 break;
747
748 case LWS_CALLBACK_RECEIVE:
Tomas Cejkaba21b382013-04-13 02:37:32 +0200749 #ifndef TEST_NOTIFICATION_SERVER
750 if (http_server != NULL) {
751 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "received: (%s)", (char *)in);
752 }
753 #endif
754 if (pss->session_key == NULL) {
755 char session_key_buf[41];
756 int start = -1;
757 time_t stop = time(NULL) + 30;
758
759 strncpy((char *) session_key_buf, (const char *) in, 40);
760 session_key_buf[40] = '\0';
761 pss->session_key = strdup(session_key_buf);
762 sscanf(in+40, "%d %d", (int *) &start, (int *) &stop);
763 if (http_server != NULL) {
764 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: get key (%s) from (%s) (%i,%i)", pss->session_key, (char *) in, (int) start, (int) stop);
765 }
766
Tomas Cejkaba21b382013-04-13 02:37:32 +0200767 struct session_with_mutex *ls = get_ncsession_from_key(pss->session_key);
768 if (ls == NULL) {
769 if (http_server != NULL) {
770 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: session_key not found (%s)", pss->session_key);
771 }
772 return 0;
773 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200774 if (ls->ntfc_subscribed != 0) {
775 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: already subscribed");
776 return 0;
777 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200778 if (http_server != NULL) {
779 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: prepare to subscribe stream");
780 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200781
782 //if (pthread_rwlock_rdlock (&session_lock) != 0) {
783 // ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
784 // return EXIT_FAILURE;
785 //}
Tomas Cejkaba21b382013-04-13 02:37:32 +0200786 notif_subscribe(ls, pss->session_key, (time_t) start, (time_t) stop);
Tomas Cejka654f84e2013-04-19 11:55:01 +0200787 //if (pthread_rwlock_unlock (&session_lock) != 0) {
788 // ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
789 // return EXIT_FAILURE;
790 //}
Tomas Cejkaba21b382013-04-13 02:37:32 +0200791 }
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100792 if (len < 6)
793 break;
794 if (strcmp((const char *)in, "reset\n") == 0)
795 pss->number = 0;
796 break;
797 /*
798 * this just demonstrates how to use the protocol filter. If you won't
799 * study and reject connections based on header content, you don't need
800 * to handle this callback
801 */
802
803 case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
804 //dump_handshake_info(wsi);
805 /* you could return non-zero here and kill the connection */
806 break;
807
808 default:
809 break;
810 }
811
812 return 0;
813}
814
815/* list of supported protocols and callbacks */
816
817static struct libwebsocket_protocols protocols[] = {
818 /* first protocol must always be HTTP handler */
819
820 {
821 "http-only", /* name */
822 callback_http, /* callback */
823 sizeof (struct per_session_data__http), /* per_session_data_size */
824 0, /* max frame size / rx buffer */
825 },
826 {
827 "notification-protocol",
828 callback_notification,
Tomas Cejkaba21b382013-04-13 02:37:32 +0200829 sizeof(struct per_session_data__notif_client),
830 80,
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100831 },
832 { NULL, NULL, 0, 0 } /* terminator */
833};
834
835
Tomas Cejkaba21b382013-04-13 02:37:32 +0200836/**
837 * initialization of notification module
838 */
839int notification_init(apr_pool_t * pool, server_rec * server, apr_hash_t *conns)
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100840{
Tomas Cejkaba21b382013-04-13 02:37:32 +0200841 //char cert_path[1024];
842 //char key_path[1024];
843 //int use_ssl = 0;
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100844 struct lws_context_creation_info info;
845 int opts = 0;
Tomas Cejkaba21b382013-04-13 02:37:32 +0200846 //char interface_name[128] = "";
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100847 const char *iface = NULL;
848 int debug_level = 7;
849
Tomas Cejkaba21b382013-04-13 02:37:32 +0200850 netconf_locked_sessions = conns;
851
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100852 memset(&info, 0, sizeof info);
853 info.port = NOTIFICATION_SERVER_PORT;
854
855 /* tell the library what debug level to emit and to send it to syslog */
856 lws_set_log_level(debug_level, lwsl_emit_syslog);
857
858 #ifndef TEST_NOTIFICATION_SERVER
859 if (server != NULL) {
860 http_server = server;
861 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "Initialization of libwebsocket");
862 }
863 #endif
864 lwsl_notice("libwebsockets test server - "
865 "(C) Copyright 2010-2013 Andy Green <andy@warmcat.com> - "
866 "licensed under LGPL2.1\n");
867 max_poll_elements = getdtablesize();
868 pollfds = malloc(max_poll_elements * sizeof (struct pollfd));
869 fd_lookup = malloc(max_poll_elements * sizeof (int));
870 if (pollfds == NULL || fd_lookup == NULL) {
871 lwsl_err("Out of memory pollfds=%d\n", max_poll_elements);
872 return -1;
873 }
874
875 info.iface = iface;
876 info.protocols = protocols;
877
878 //snprintf(cert_path, sizeof(cert_path), "%s/libwebsockets-test-server.pem", resource_path);
879 //snprintf(key_path, sizeof(cert_path), "%s/libwebsockets-test-server.key.pem", resource_path);
880
881 //info.ssl_cert_filepath = cert_path;
882 //info.ssl_private_key_filepath = key_path;
883
884 info.gid = -1;
885 info.uid = -1;
886 info.options = opts;
887
888 /* create server */
889 context = libwebsocket_create_context(&info);
890 if (context == NULL) {
891 lwsl_err("libwebsocket init failed\n");
892 return -1;
893 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200894 return 0;
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100895}
896
897void notification_close()
898{
899 libwebsocket_context_destroy(context);
900
901 lwsl_notice("libwebsockets-test-server exited cleanly\n");
902}
903
Tomas Cejkaba21b382013-04-13 02:37:32 +0200904
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100905/**
906 * \brief send notification if any
907 * \return < 0 on error
908 */
909int notification_handle()
910{
911 static struct timeval tv;
912 static unsigned int olds = 0;
913 int n = 0;
914
915 gettimeofday(&tv, NULL);
916
917 /*
918 * This provokes the LWS_CALLBACK_SERVER_WRITEABLE for every
919 * live websocket connection using the DUMB_INCREMENT protocol,
920 * as soon as it can take more packets (usually immediately)
921 */
922
923 if (((unsigned int)tv.tv_sec - olds) > 0) {
924 libwebsocket_callback_on_writable_all_protocol(&protocols[PROTOCOL_NOTIFICATION]);
925 olds = tv.tv_sec;
926 }
927
928
929 /*
930 * this represents an existing server's single poll action
931 * which also includes libwebsocket sockets
932 */
933
934 n = poll(pollfds, count_pollfds, 50);
935 if (n < 0)
936 return n;
937
938
939 if (n) {
940 for (n = 0; n < count_pollfds; n++) {
941 if (pollfds[n].revents) {
942 /*
943 * returns immediately if the fd does not
944 * match anything under libwebsockets
945 * control
946 */
947 if (libwebsocket_service_fd(context, &pollfds[n]) < 0) {
948 return 1;
949 }
950 }
951 }
952 }
953 return 0;
954}
955
956#endif
957
958
959#ifndef WITH_NOTIFICATIONS
960#ifdef TEST_NOTIFICATION_SERVER
961int main(int argc, char **argv)
962{
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100963 if (notification_init(NULL, NULL) == -1) {
964 fprintf(stderr, "Error during initialization\n");
965 return 1;
966 }
967 while (!force_exit) {
968 notification_handle();
969 }
970 notification_close();
971}
972#endif
973#endif