blob: 2f02a8186fac2ce81fd485073d0c427fd90d357b [file] [log] [blame]
Tomas Cejkad340dbf2013-03-24 20:36:57 +01001/*
2 * libwebsockets-test-server - libwebsockets test implementation
3 *
4 * Copyright (C) 2010-2011 Andy Green <andy@warmcat.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation:
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301 USA
20 */
21#include <stdio.h>
22#include <stdlib.h>
23#include <unistd.h>
24#include <getopt.h>
25#include <string.h>
26#include <sys/time.h>
27#include <sys/stat.h>
Tomas Cejkaba21b382013-04-13 02:37:32 +020028#include <sys/queue.h>
Tomas Cejkad340dbf2013-03-24 20:36:57 +010029#include <fcntl.h>
30#include <assert.h>
Tomas Cejkaba21b382013-04-13 02:37:32 +020031#include <pthread.h>
32#include <libnetconf.h>
33#include <libwebsockets.h>
Tomas Cejkad340dbf2013-03-24 20:36:57 +010034#include "notification_module.h"
Tomas Cejkaba21b382013-04-13 02:37:32 +020035#include "mod_netconf.h"
Tomas Cejkad340dbf2013-03-24 20:36:57 +010036
37#ifndef TEST_NOTIFICATION_SERVER
38#include <httpd.h>
39#include <http_log.h>
Tomas Cejkaba21b382013-04-13 02:37:32 +020040#include <apr_hash.h>
41#include <apr_tables.h>
42
43#else
44static int force_exit = 0;
Tomas Cejkad340dbf2013-03-24 20:36:57 +010045#endif
46
47#if defined(TEST_NOTIFICATION_SERVER) || defined(WITH_NOTIFICATIONS)
48static int close_testing;
49static int max_poll_elements;
50
51static struct pollfd *pollfds;
52static int *fd_lookup;
53static int count_pollfds;
Tomas Cejkad340dbf2013-03-24 20:36:57 +010054static struct libwebsocket_context *context = NULL;
55static server_rec *http_server = NULL;
56
/*
 * Argument bundle handed to notification_thread(): the NETCONF session
 * whose notifications are dispatched and the key identifying it.
 */
struct ntf_thread_config {
	/* session passed to ncntf_dispatch_receive() */
	struct nc_session *session;
	/* session key; stored as the thread-specific value of thread_key */
	char *session_hash;
};
61
62static apr_hash_t *netconf_locked_sessions = NULL;
63static pthread_key_t thread_key;
64
Tomas Cejkad340dbf2013-03-24 20:36:57 +010065/*
66 * This demo server shows how to use libwebsockets for one or more
67 * websocket protocols in the same server
68 *
69 * It defines the following websocket protocols:
70 *
71 * dumb-increment-protocol: once the socket is opened, an incrementing
72 * ascii string is sent down it every 50ms.
73 * If you send "reset\n" on the websocket, then
74 * the incrementing number is reset to 0.
75 *
76 * lws-mirror-protocol: copies any received packet to every connection also
77 * using this protocol, including the sender
78 */
79
/* Indices into the protocols[] table. */
enum demo_protocols {
	PROTOCOL_HTTP = 0,	/* always first */
	PROTOCOL_NOTIFICATION,
	DEMO_PROTOCOL_COUNT	/* always last */
};
89
90
91#define LOCAL_RESOURCE_PATH "."
92char *resource_path = LOCAL_RESOURCE_PATH;
93
94/*
95 * We take a strict whitelist approach to stop ../ attacks
96 */
97
/* One servable static file: the URL path it answers to and its MIME type. */
struct serveable {
	const char *urlpath;
	const char *mimetype;
};

/*
 * Files callback_http() is willing to serve.  The final entry is the
 * fallback used when the requested path matches none of the others.
 */
static const struct serveable whitelist[] = {
	{ "/favicon.ico", "image/x-icon" },
	{ "/libwebsockets.org-logo.png", "image/png" },
	{ "/test.html", "text/html" },	/* default, must stay last */
};
110
/* Per-connection state of the HTTP protocol handler. */
struct per_session_data__http {
	int fd;		/* descriptor of the file streamed by callback_http() */
};
114
115/* this protocol server (always the first one) just knows how to do HTTP */
116
117static int callback_http(struct libwebsocket_context *context,
118 struct libwebsocket *wsi,
119 enum libwebsocket_callback_reasons reason, void *user,
120 void *in, size_t len)
121{
122 char client_name[128];
123 char client_ip[128];
124 char buf[256];
125 int n, m;
126 unsigned char *p;
127 static unsigned char buffer[4096];
128 struct stat stat_buf;
129 struct per_session_data__http *pss = (struct per_session_data__http *)user;
130 int fd = (int)(long)in;
131
132 switch (reason) {
133 case LWS_CALLBACK_HTTP:
134
135 /* check for the "send a big file by hand" example case */
136
137 if (!strcmp((const char *)in, "/leaf.jpg")) {
138 char leaf_path[1024];
139 snprintf(leaf_path, sizeof(leaf_path), "%s/leaf.jpg", resource_path);
140
141 /* well, let's demonstrate how to send the hard way */
142
143 p = buffer;
144
145 pss->fd = open(leaf_path, O_RDONLY);
146
147 if (pss->fd < 0)
148 return -1;
149
150 fstat(pss->fd, &stat_buf);
151
152 /*
153 * we will send a big jpeg file, but it could be
154 * anything. Set the Content-Type: appropriately
155 * so the browser knows what to do with it.
156 */
157
158 p += sprintf((char *)p,
159 "HTTP/1.0 200 OK\x0d\x0a"
160 "Server: libwebsockets\x0d\x0a"
161 "Content-Type: image/jpeg\x0d\x0a"
162 "Content-Length: %u\x0d\x0a\x0d\x0a",
163 (unsigned int)stat_buf.st_size);
164
165 /*
166 * send the http headers...
167 * this won't block since it's the first payload sent
168 * on the connection since it was established
169 * (too small for partial)
170 */
171
172 n = libwebsocket_write(wsi, buffer,
173 p - buffer, LWS_WRITE_HTTP);
174
175 if (n < 0) {
176 close(pss->fd);
177 return -1;
178 }
179 /*
180 * book us a LWS_CALLBACK_HTTP_WRITEABLE callback
181 */
182 libwebsocket_callback_on_writable(context, wsi);
183 break;
184 }
185
186 /* if not, send a file the easy way */
187
188 for (n = 0; n < (sizeof(whitelist) / sizeof(whitelist[0]) - 1); n++)
189 if (in && strcmp((const char *)in, whitelist[n].urlpath) == 0)
190 break;
191
192 sprintf(buf, "%s%s", resource_path, whitelist[n].urlpath);
193
194 if (libwebsockets_serve_http_file(context, wsi, buf, whitelist[n].mimetype))
195 return -1; /* through completion or error, close the socket */
196
197 /*
198 * notice that the sending of the file completes asynchronously,
199 * we'll get a LWS_CALLBACK_HTTP_FILE_COMPLETION callback when
200 * it's done
201 */
202
203 break;
204
205 case LWS_CALLBACK_HTTP_FILE_COMPLETION:
206// lwsl_info("LWS_CALLBACK_HTTP_FILE_COMPLETION seen\n");
207 /* kill the connection after we sent one file */
208 return -1;
209
210 case LWS_CALLBACK_HTTP_WRITEABLE:
211 /*
212 * we can send more of whatever it is we were sending
213 */
214
215 do {
216 n = read(pss->fd, buffer, sizeof buffer);
217 /* problem reading, close conn */
218 if (n < 0)
219 goto bail;
220 /* sent it all, close conn */
221 if (n == 0)
222 goto bail;
223 /*
224 * because it's HTTP and not websocket, don't need to take
225 * care about pre and postamble
226 */
227 m = libwebsocket_write(wsi, buffer, n, LWS_WRITE_HTTP);
228 if (m < 0)
229 /* write failed, close conn */
230 goto bail;
231 if (m != n)
232 /* partial write, adjust */
233 lseek(pss->fd, m - n, SEEK_CUR);
234
235 } while (!lws_send_pipe_choked(wsi));
236 libwebsocket_callback_on_writable(context, wsi);
237 break;
238
239bail:
240 close(pss->fd);
241 return -1;
242
243 /*
244 * callback for confirming to continue with client IP appear in
245 * protocol 0 callback since no websocket protocol has been agreed
246 * yet. You can just ignore this if you won't filter on client IP
247 * since the default uhandled callback return is 0 meaning let the
248 * connection continue.
249 */
250
251 case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
252 libwebsockets_get_peer_addresses(context, wsi, (int)(long)in, client_name,
253 sizeof(client_name), client_ip, sizeof(client_ip));
254
Tomas Cejkaba21b382013-04-13 02:37:32 +0200255 //fprintf(stderr, "Received network connect from %s (%s)\n", client_name, client_ip);
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100256 /* if we returned non-zero from here, we kill the connection */
257 break;
258
259 /*
260 * callbacks for managing the external poll() array appear in
261 * protocol 0 callback
262 */
263
264 case LWS_CALLBACK_ADD_POLL_FD:
265
266 if (count_pollfds >= max_poll_elements) {
267 lwsl_err("LWS_CALLBACK_ADD_POLL_FD: too many sockets to track\n");
268 return 1;
269 }
270
271 fd_lookup[fd] = count_pollfds;
272 pollfds[count_pollfds].fd = fd;
273 pollfds[count_pollfds].events = (int)(long)len;
274 pollfds[count_pollfds++].revents = 0;
275 break;
276
277 case LWS_CALLBACK_DEL_POLL_FD:
278 if (!--count_pollfds)
279 break;
280 m = fd_lookup[fd];
281 /* have the last guy take up the vacant slot */
282 pollfds[m] = pollfds[count_pollfds];
283 fd_lookup[pollfds[count_pollfds].fd] = m;
284 break;
285
286 case LWS_CALLBACK_SET_MODE_POLL_FD:
287 pollfds[fd_lookup[fd]].events |= (int)(long)len;
288 break;
289
290 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
291 pollfds[fd_lookup[fd]].events &= ~(int)(long)len;
292 break;
293
294 default:
295 break;
296 }
297
298 return 0;
299}
300
Tomas Cejkaba21b382013-04-13 02:37:32 +0200301/**
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100302 * this is just an example of parsing handshake headers, you don't need this
303 * in your code unless you will filter allowing connections by the header
304 * content
305 */
Tomas Cejkaba21b382013-04-13 02:37:32 +0200306//static void dump_handshake_info(struct libwebsocket *wsi)
307//{
308// int n;
309// static const char *token_names[WSI_TOKEN_COUNT] = {
310// /*[WSI_TOKEN_GET_URI] =*/ "GET URI",
311// /*[WSI_TOKEN_HOST] =*/ "Host",
312// /*[WSI_TOKEN_CONNECTION] =*/ "Connection",
313// /*[WSI_TOKEN_KEY1] =*/ "key 1",
314// /*[WSI_TOKEN_KEY2] =*/ "key 2",
315// /*[WSI_TOKEN_PROTOCOL] =*/ "Protocol",
316// /*[WSI_TOKEN_UPGRADE] =*/ "Upgrade",
317// /*[WSI_TOKEN_ORIGIN] =*/ "Origin",
318// /*[WSI_TOKEN_DRAFT] =*/ "Draft",
319// /*[WSI_TOKEN_CHALLENGE] =*/ "Challenge",
320//
321// /* new for 04 */
322// /*[WSI_TOKEN_KEY] =*/ "Key",
323// /*[WSI_TOKEN_VERSION] =*/ "Version",
324// /*[WSI_TOKEN_SWORIGIN] =*/ "Sworigin",
325//
326// /* new for 05 */
327// /*[WSI_TOKEN_EXTENSIONS] =*/ "Extensions",
328//
329// /* client receives these */
330// /*[WSI_TOKEN_ACCEPT] =*/ "Accept",
331// /*[WSI_TOKEN_NONCE] =*/ "Nonce",
332// /*[WSI_TOKEN_HTTP] =*/ "Http",
333// /*[WSI_TOKEN_MUXURL] =*/ "MuxURL",
334// };
335// char buf[256];
336//
337// for (n = 0; n < WSI_TOKEN_COUNT; n++) {
338// if (!lws_hdr_total_length(wsi, n))
339// continue;
340//
341// //lws_hdr_copy(wsi, buf, sizeof buf, n);
342//
343// //fprintf(stderr, " %s = %s\n", token_names[n], buf);
344// }
345//}
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100346
347/* dumb_increment protocol */
348
349/*
350 * one of these is auto-created for each connection and a pointer to the
351 * appropriate instance is passed to the callback in the user parameter
352 *
353 * for this example protocol we use it to individualize the count for each
354 * connection.
355 */
356
/* Per-connection state of the notification websocket protocol. */
struct per_session_data__notif_client {
	/* counter reset on connect and on a "reset\n" message */
	int number;
	/* key of the NETCONF session this client asked for; NULL until received */
	char *session_key;
	struct nc_session *session;
};
362
Tomas Cejkaba21b382013-04-13 02:37:32 +0200363struct session_with_mutex *get_ncsession_from_key(const char *session_key)
364{
365 struct session_with_mutex *locked_session = NULL;
366 if (session_key == NULL) {
367 return (NULL);
368 }
369 if (pthread_rwlock_wrlock(&session_lock) != 0) {
370 #ifndef TEST_NOTIFICATION_SERVER
371 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
372 #endif
373 return (NULL);
374 }
375 locked_session = (struct session_with_mutex *)apr_hash_get(netconf_locked_sessions, session_key, APR_HASH_KEY_STRING);
376 if (pthread_rwlock_unlock (&session_lock) != 0) {
377 #ifndef TEST_NOTIFICATION_SERVER
378 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
379 #endif
380 return (NULL);
381 }
382 return locked_session;
383}
384
385/* rpc parameter is freed after the function call */
386static int send_recv_process(struct nc_session *session, const char* operation, nc_rpc* rpc)
387{
388 nc_reply *reply = NULL;
389 char *data = NULL;
390 int ret = EXIT_SUCCESS;
391
392 /* send the request and get the reply */
393 switch (nc_session_send_recv(session, rpc, &reply)) {
394 case NC_MSG_UNKNOWN:
395 if (nc_session_get_status(session) != NC_SESSION_STATUS_WORKING) {
396 #ifndef TEST_NOTIFICATION_SERVER
397 if (http_server != NULL) {
398 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: receiving rpc-reply failed.");
399 }
400 #endif
401 //cmd_disconnect(NULL);
402 ret = EXIT_FAILURE;
403 break;
404 }
405 #ifndef TEST_NOTIFICATION_SERVER
406 if (http_server != NULL) {
407 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Unknown error occurred.");
408 }
409 #endif
410 ret = EXIT_FAILURE;
411 break;
412 case NC_MSG_NONE:
413 /* error occurred, but processed by callback */
414 break;
415 case NC_MSG_REPLY:
416 switch (nc_reply_get_type(reply)) {
417 case NC_REPLY_OK:
418 break;
419 case NC_REPLY_DATA:
420 #ifndef TEST_NOTIFICATION_SERVER
421 if (http_server != NULL) {
422 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notifications: recv: %s.", data = nc_reply_get_data (reply));
423 free(data);
424 }
425 #endif
426 break;
427 case NC_REPLY_ERROR:
428 /* wtf, you shouldn't be here !?!? */
429 #ifndef TEST_NOTIFICATION_SERVER
430 if (http_server != NULL) {
431 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: operation failed, but rpc-error was not processed.");
432 }
433 #endif
434 ret = EXIT_FAILURE;
435 break;
436 default:
437 #ifndef TEST_NOTIFICATION_SERVER
438 if (http_server != NULL) {
439 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: unexpected operation result.");
440 }
441 #endif
442 ret = EXIT_FAILURE;
443 break;
444 }
445 break;
446 default:
447 #ifndef TEST_NOTIFICATION_SERVER
448 if (http_server != NULL) {
449 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Unknown error occurred.");
450 }
451 #endif
452 ret = EXIT_FAILURE;
453 break;
454 }
455 nc_rpc_free(rpc);
456 nc_reply_free(reply);
457
458 return (ret);
459}
460
461/**
462 * \brief Callback to store incoming notification
463 * \param [in] eventtime - time when notification occured
464 * \param [in] content - content of notification
465 */
466static void notification_fileprint (time_t eventtime, const char* content)
467{
468 char t[128];
469 struct session_with_mutex *target_session = NULL;
470 notification_t *ntf = NULL;
471 char *session_hash = NULL;
472
473 t[0] = 0;
474 strftime(t, sizeof(t), "%c", localtime(&eventtime));
475
476 if (http_server != NULL) {
477 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: eventTime: %s\n%s\n", t, content);
478 }
479
480 /* \todo replace last_session_key with client identification */
481 session_hash = pthread_getspecific(thread_key);
482 if (http_server != NULL) {
483 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: fileprint getspecific (%s)", session_hash);
484 }
485 target_session = get_ncsession_from_key(session_hash);
486 if (target_session == NULL) {
487 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "no session found last_session_key (%s)", session_hash);
488 return;
489 }
490 if (pthread_mutex_lock(&target_session->lock) != 0) {
491 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
492 return;
493 }
494 if (target_session->notifications == NULL) {
495 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "target_session->notifications is NULL");
496 if (pthread_mutex_unlock(&target_session->lock) != 0) {
497 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
498 return;
499 }
500 return;
501 }
502 if (http_server != NULL) {
503 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: ready to push to notifications queue");
504 }
505 ntf = (notification_t *) apr_array_push(target_session->notifications);
506 if (ntf == NULL) {
507 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Failed to allocate element ");
508 if (pthread_mutex_unlock(&target_session->lock) != 0) {
509 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
510 return;
511 }
512 return;
513 }
514 ntf->eventtime = time(NULL);
Tomas Cejkaba21b382013-04-13 02:37:32 +0200515 ntf->content = strdup(content);
516
517 if (http_server != NULL) {
518 ap_log_error (APLOG_MARK, APLOG_NOTICE, 0, http_server, "added notif to queue %u (%s)", (unsigned int) ntf->eventtime, "notifikace");
519 }
520
521 if (pthread_mutex_unlock(&target_session->lock) != 0) {
522 ap_log_error (APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
523 return;
524 }
525}
526
527/**
528 * \brief Thread for libnetconf notifications dispatch
529 * \param [in] arg - struct ntf_thread_config * with nc_session
530 */
531void* notification_thread(void* arg)
532{
533 struct ntf_thread_config *config = (struct ntf_thread_config*)arg;
534 #ifndef TEST_NOTIFICATION_SERVER
535 if (http_server != NULL) {
536 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notifications: in thread for libnetconf notifications");
537 }
538 #endif
539 if (http_server != NULL) {
540 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notifications: pthread_key_create");
541 }
542 if (pthread_key_create(&thread_key, NULL) != 0) {
543 #ifndef TEST_NOTIFICATION_SERVER
544 if (http_server != NULL) {
545 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: pthread_key_create failed");
546 }
547 #endif
548 }
549 pthread_setspecific(thread_key, config->session_hash);
550 if (http_server != NULL) {
551 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notifications: dispatching");
552 }
553 ncntf_dispatch_receive(config->session, notification_fileprint);
554 #ifndef TEST_NOTIFICATION_SERVER
555 if (http_server != NULL) {
556 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notifications: ended thread for libnetconf notifications");
557 }
558 #endif
559 free(config);
560 return (NULL);
561}
562
563
564int notif_subscribe(struct session_with_mutex *locked_session, const char *session_hash, time_t start_time, time_t stop_time)
565{
566 time_t start = -1;
567 time_t stop = -1;
568 struct nc_filter *filter = NULL;
569 char *stream = NULL;
570 nc_rpc *rpc = NULL;
571 pthread_t thread;
572 struct ntf_thread_config *tconfig;
573 struct nc_session *session;
574
575 if (locked_session == NULL) {
576 if (http_server != NULL) {
577 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: no locked_session was given.");
578 }
579 return (EXIT_FAILURE);
580 }
581
582 session = locked_session->session;
583
584 start = time(NULL) + start_time;
585 stop = time(NULL) + stop_time;
586 if (http_server != NULL) { ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: history: %u %u", (unsigned int) start, (unsigned int) stop);
587 }
588
589 if (session == NULL) {
590 #ifndef TEST_NOTIFICATION_SERVER
591 if (http_server != NULL) {
592 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: NETCONF session not established.");
593 }
594 #endif
595 return (EXIT_FAILURE);
596 }
597
598 /* check if notifications are allowed on this session */
599 if (nc_session_notif_allowed(session) == 0) {
600 #ifndef TEST_NOTIFICATION_SERVER
601 if (http_server != NULL) {
602 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Notification subscription is not allowed on this session.");
603 }
604 #endif
605 return (EXIT_FAILURE);
606 }
607 /* check times */
608 if (start != -1 && stop != -1 && start > stop) {
609 #ifndef TEST_NOTIFICATION_SERVER
610 if (http_server != NULL) {
611 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: Subscription start time must be lower than the end time.");
612 }
613 #endif
614 return (EXIT_FAILURE);
615 }
616
617 /* create requests */
618 rpc = nc_rpc_subscribe(stream, filter, (start_time == 0)?NULL:&start, (stop_time == 0)?NULL:&stop);
619 nc_filter_free(filter);
620 if (rpc == NULL) {
621 #ifndef TEST_NOTIFICATION_SERVER
622 if (http_server != NULL) {
623 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating an rpc request failed.");
624 }
625 #endif
626 return (EXIT_FAILURE);
627 }
628
629 if (send_recv_process(session, "subscribe", rpc) != 0) {
630 return (EXIT_FAILURE);
631 }
632 rpc = NULL; /* just note that rpc is already freed by send_recv_process() */
Tomas Cejka654f84e2013-04-19 11:55:01 +0200633 locked_session->ntfc_subscribed = 1;
Tomas Cejkaba21b382013-04-13 02:37:32 +0200634
635 tconfig = malloc(sizeof(struct ntf_thread_config));
636 tconfig->session = session;
637 tconfig->session_hash = strdup(session_hash);
638 if (http_server != NULL) {
639 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating libnetconf notification thread (%s).",
640 tconfig->session_hash);
641 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200642
Tomas Cejkaba21b382013-04-13 02:37:32 +0200643 if (pthread_create(&thread, NULL, notification_thread, tconfig) != 0) {
644 #ifndef TEST_NOTIFICATION_SERVER
645 if (http_server != NULL) {
646 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notifications: creating a thread for receiving notifications failed");
647 }
648 #endif
649 return (EXIT_FAILURE);
650 }
651 pthread_detach(thread);
652 return (EXIT_SUCCESS);
653}
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100654
655static int callback_notification(struct libwebsocket_context *context,
656 struct libwebsocket *wsi,
657 enum libwebsocket_callback_reasons reason,
Tomas Cejkaba21b382013-04-13 02:37:32 +0200658 void *user, void *in, size_t len)
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100659{
Tomas Cejkaba21b382013-04-13 02:37:32 +0200660 int n = 0;
661 int m = 0;
662 unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100663 unsigned char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
Tomas Cejkaba21b382013-04-13 02:37:32 +0200664 struct per_session_data__notif_client *pss = (struct per_session_data__notif_client *)user;
665
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100666 switch (reason) {
667
668 case LWS_CALLBACK_ESTABLISHED:
669 lwsl_info("callback_notification: LWS_CALLBACK_ESTABLISHED\n");
670 pss->number = 0;
671 break;
672
673 case LWS_CALLBACK_SERVER_WRITEABLE:
Tomas Cejkaba21b382013-04-13 02:37:32 +0200674 if (pss->session_key == NULL) {
675 return 0;
676 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200677
678 struct session_with_mutex *ls = get_ncsession_from_key(pss->session_key);
679 if (ls == NULL) {
680 if (http_server != NULL) {
681 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: session not found");
682 }
683 return -1;
684 }
685 if (pthread_mutex_lock(&ls->lock) != 0) {
686 #ifndef TEST_NOTIFICATION_SERVER
687 if (http_server != NULL) {
688 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: cannot lock session");
689 }
690 #endif
691 }
692 notification_t *notif = NULL;
693 if (ls->notifications == NULL) {
694 #ifndef TEST_NOTIFICATION_SERVER
695 if (http_server != NULL) {
696 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: no notifications array");
697 }
698 #endif
699 pthread_mutex_unlock(&ls->lock);
700 return -1;
701 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200702 if (!apr_is_empty_array(ls->notifications)) {
Tomas Cejkaba21b382013-04-13 02:37:32 +0200703
Tomas Cejka654f84e2013-04-19 11:55:01 +0200704 if (http_server != NULL) {
705 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications for session");
706 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200707
Tomas Cejka654f84e2013-04-19 11:55:01 +0200708 while ((notif = (notification_t *) apr_array_pop(ls->notifications)) != NULL) {
709 char t[128];
710 t[0] = 0;
711 strftime(t, sizeof(t), "%c", localtime(&notif->eventtime));
712 n = 0;
713 n = sprintf((char *)p, "%s\n", notif->content);
714 m = libwebsocket_write(wsi, p, n, LWS_WRITE_TEXT);
715 }
716 if (http_server != NULL) {
717 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: POP notifications done");
718 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200719 }
720
721 if (pthread_mutex_unlock(&ls->lock) != 0) {
722 #ifndef TEST_NOTIFICATION_SERVER
723 if (http_server != NULL) {
724 ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "notification: cannot unlock session");
725 }
726 #endif
727 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200728 //if (http_server != NULL) {
729 // ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: unlocked session lock");
730 //}
Tomas Cejkaba21b382013-04-13 02:37:32 +0200731
732
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100733 if (m < n) {
734 lwsl_err("ERROR %d writing to di socket\n", n);
735 return -1;
736 }
737 if (close_testing && pss->number == 50) {
738 lwsl_info("close tesing limit, closing\n");
739 return -1;
740 }
741 break;
742
743 case LWS_CALLBACK_RECEIVE:
Tomas Cejkaba21b382013-04-13 02:37:32 +0200744 #ifndef TEST_NOTIFICATION_SERVER
745 if (http_server != NULL) {
746 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "received: (%s)", (char *)in);
747 }
748 #endif
749 if (pss->session_key == NULL) {
750 char session_key_buf[41];
751 int start = -1;
752 time_t stop = time(NULL) + 30;
753
754 strncpy((char *) session_key_buf, (const char *) in, 40);
755 session_key_buf[40] = '\0';
756 pss->session_key = strdup(session_key_buf);
757 sscanf(in+40, "%d %d", (int *) &start, (int *) &stop);
758 if (http_server != NULL) {
759 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: get key (%s) from (%s) (%i,%i)", pss->session_key, (char *) in, (int) start, (int) stop);
760 }
761
Tomas Cejkaba21b382013-04-13 02:37:32 +0200762 struct session_with_mutex *ls = get_ncsession_from_key(pss->session_key);
763 if (ls == NULL) {
764 if (http_server != NULL) {
765 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: session_key not found (%s)", pss->session_key);
766 }
767 return 0;
768 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200769 if (ls->ntfc_subscribed != 0) {
770 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "notification: already subscribed");
771 return 0;
772 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200773 if (http_server != NULL) {
774 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, http_server, "notification: prepare to subscribe stream");
775 }
Tomas Cejka654f84e2013-04-19 11:55:01 +0200776
777 //if (pthread_rwlock_rdlock (&session_lock) != 0) {
778 // ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while locking rwlock: %d (%s)", errno, strerror(errno));
779 // return EXIT_FAILURE;
780 //}
Tomas Cejkaba21b382013-04-13 02:37:32 +0200781 notif_subscribe(ls, pss->session_key, (time_t) start, (time_t) stop);
Tomas Cejka654f84e2013-04-19 11:55:01 +0200782 //if (pthread_rwlock_unlock (&session_lock) != 0) {
783 // ap_log_error(APLOG_MARK, APLOG_ERR, 0, http_server, "Error while unlocking rwlock: %d (%s)", errno, strerror(errno));
784 // return EXIT_FAILURE;
785 //}
Tomas Cejkaba21b382013-04-13 02:37:32 +0200786 }
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100787 if (len < 6)
788 break;
789 if (strcmp((const char *)in, "reset\n") == 0)
790 pss->number = 0;
791 break;
792 /*
793 * this just demonstrates how to use the protocol filter. If you won't
794 * study and reject connections based on header content, you don't need
795 * to handle this callback
796 */
797
798 case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
799 //dump_handshake_info(wsi);
800 /* you could return non-zero here and kill the connection */
801 break;
802
803 default:
804 break;
805 }
806
807 return 0;
808}
809
810/* list of supported protocols and callbacks */
811
812static struct libwebsocket_protocols protocols[] = {
813 /* first protocol must always be HTTP handler */
814
815 {
816 "http-only", /* name */
817 callback_http, /* callback */
818 sizeof (struct per_session_data__http), /* per_session_data_size */
819 0, /* max frame size / rx buffer */
820 },
821 {
822 "notification-protocol",
823 callback_notification,
Tomas Cejkaba21b382013-04-13 02:37:32 +0200824 sizeof(struct per_session_data__notif_client),
825 80,
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100826 },
827 { NULL, NULL, 0, 0 } /* terminator */
828};
829
830
Tomas Cejkaba21b382013-04-13 02:37:32 +0200831/**
832 * initialization of notification module
833 */
834int notification_init(apr_pool_t * pool, server_rec * server, apr_hash_t *conns)
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100835{
Tomas Cejkaba21b382013-04-13 02:37:32 +0200836 //char cert_path[1024];
837 //char key_path[1024];
838 //int use_ssl = 0;
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100839 struct lws_context_creation_info info;
840 int opts = 0;
Tomas Cejkaba21b382013-04-13 02:37:32 +0200841 //char interface_name[128] = "";
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100842 const char *iface = NULL;
843 int debug_level = 7;
844
Tomas Cejkaba21b382013-04-13 02:37:32 +0200845 netconf_locked_sessions = conns;
846
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100847 memset(&info, 0, sizeof info);
848 info.port = NOTIFICATION_SERVER_PORT;
849
850 /* tell the library what debug level to emit and to send it to syslog */
851 lws_set_log_level(debug_level, lwsl_emit_syslog);
852
853 #ifndef TEST_NOTIFICATION_SERVER
854 if (server != NULL) {
855 http_server = server;
856 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, http_server, "Initialization of libwebsocket");
857 }
858 #endif
859 lwsl_notice("libwebsockets test server - "
860 "(C) Copyright 2010-2013 Andy Green <andy@warmcat.com> - "
861 "licensed under LGPL2.1\n");
862 max_poll_elements = getdtablesize();
863 pollfds = malloc(max_poll_elements * sizeof (struct pollfd));
864 fd_lookup = malloc(max_poll_elements * sizeof (int));
865 if (pollfds == NULL || fd_lookup == NULL) {
866 lwsl_err("Out of memory pollfds=%d\n", max_poll_elements);
867 return -1;
868 }
869
870 info.iface = iface;
871 info.protocols = protocols;
872
873 //snprintf(cert_path, sizeof(cert_path), "%s/libwebsockets-test-server.pem", resource_path);
874 //snprintf(key_path, sizeof(cert_path), "%s/libwebsockets-test-server.key.pem", resource_path);
875
876 //info.ssl_cert_filepath = cert_path;
877 //info.ssl_private_key_filepath = key_path;
878
879 info.gid = -1;
880 info.uid = -1;
881 info.options = opts;
882
883 /* create server */
884 context = libwebsocket_create_context(&info);
885 if (context == NULL) {
886 lwsl_err("libwebsocket init failed\n");
887 return -1;
888 }
Tomas Cejkaba21b382013-04-13 02:37:32 +0200889 return 0;
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100890}
891
892void notification_close()
893{
894 libwebsocket_context_destroy(context);
895
896 lwsl_notice("libwebsockets-test-server exited cleanly\n");
897}
898
Tomas Cejkaba21b382013-04-13 02:37:32 +0200899
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100900/**
901 * \brief send notification if any
902 * \return < 0 on error
903 */
904int notification_handle()
905{
906 static struct timeval tv;
907 static unsigned int olds = 0;
908 int n = 0;
909
910 gettimeofday(&tv, NULL);
911
912 /*
913 * This provokes the LWS_CALLBACK_SERVER_WRITEABLE for every
914 * live websocket connection using the DUMB_INCREMENT protocol,
915 * as soon as it can take more packets (usually immediately)
916 */
917
918 if (((unsigned int)tv.tv_sec - olds) > 0) {
919 libwebsocket_callback_on_writable_all_protocol(&protocols[PROTOCOL_NOTIFICATION]);
920 olds = tv.tv_sec;
921 }
922
923
924 /*
925 * this represents an existing server's single poll action
926 * which also includes libwebsocket sockets
927 */
928
929 n = poll(pollfds, count_pollfds, 50);
930 if (n < 0)
931 return n;
932
933
934 if (n) {
935 for (n = 0; n < count_pollfds; n++) {
936 if (pollfds[n].revents) {
937 /*
938 * returns immediately if the fd does not
939 * match anything under libwebsockets
940 * control
941 */
942 if (libwebsocket_service_fd(context, &pollfds[n]) < 0) {
943 return 1;
944 }
945 }
946 }
947 }
948 return 0;
949}
950
951#endif
952
953
954#ifndef WITH_NOTIFICATIONS
955#ifdef TEST_NOTIFICATION_SERVER
956int main(int argc, char **argv)
957{
Tomas Cejkad340dbf2013-03-24 20:36:57 +0100958 if (notification_init(NULL, NULL) == -1) {
959 fprintf(stderr, "Error during initialization\n");
960 return 1;
961 }
962 while (!force_exit) {
963 notification_handle();
964 }
965 notification_close();
966}
967#endif
968#endif