FEATURE writing to NETCONF session
Writing reply and notification is not finished - higher level functions
are missing so the way to define what is going to be written is still
unclear even in the low level functions.
diff --git a/src/io.c b/src/io.c
index 88c3142..b896caa 100644
--- a/src/io.c
+++ b/src/io.c
@@ -20,9 +20,11 @@
*
*/
+#define _GNU_SOURCE /* asprintf */
#include <assert.h>
#include <errno.h>
#include <poll.h>
+#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
@@ -32,8 +34,9 @@
#include "config.h"
#include "libnetconf.h"
#include "session_p.h"
+#include "messages_p.h"
-#define BUFFERSIZE 4096
+#define BUFFERSIZE 512
static ssize_t
nc_read(struct nc_session *session, char *buf, size_t count)
@@ -160,7 +163,7 @@
}
/* terminating null byte */
- *chunk[r] = 0;
+ (*chunk)[r] = 0;
return r;
}
@@ -230,6 +233,8 @@
if (result) {
*result = chunk;
+ } else {
+ free(chunk);
}
return count;
}
@@ -242,7 +247,7 @@
struct pollfd fds;
const char *emsg = NULL;
char *msg = NULL, *chunk, *aux;
- unsigned long int chunk_len, len;
+ unsigned long int chunk_len, len = 0;
assert(data);
*data = NULL;
@@ -355,11 +360,11 @@
/* convert string to the size of the following chunk */
chunk_len = strtoul(chunk, (char **) NULL, 10);
- if (!chunk_len == 0) {
+ free (chunk);
+ if (!chunk_len) {
ERR("Invalid frame chunk size detected, fatal error.");
goto error;
}
- free (chunk);
/* now we have size of next chunk, so read the chunk */
status = nc_read_chunk(session, chunk_len, &chunk);
@@ -449,3 +454,343 @@
return NC_MSG_ERROR;
}
+
+#define WRITE_BUFSIZE (2 * BUFFERSIZE)
+struct wclb_arg {
+ struct nc_session *session;
+ char buf[WRITE_BUFSIZE];
+ size_t len;
+};
+
+static ssize_t
+write_(struct nc_session *session, const void *buf, size_t count)
+{
+ int c = 0;
+ char chunksize[20];
+
+ switch (session->ti_type) {
+ case NC_TI_FD:
+ if (session->version == NC_VERSION_11) {
+ c = dprintf(session->ti.fd.out, "\n#%zu\n", count);
+ }
+ return write(session->ti.fd.out, buf, count) + c;
+
+#ifdef ENABLE_LIBSSH
+ case NC_TI_LIBSSH:
+ if (session->version == NC_VERSION_11) {
+ c = snprintf(chunksize, 20, "\n#%zu\n", count);
+ ssh_channel_write(session->ti.libssh.channel, chunksize, c);
+ }
+ return ssh_channel_write(session->ti.libssh.channel, buf, count) + c;
+#endif
+
+#ifdef ENABLE_TLS
+ case NC_TI_OPENSSL:
+ if (session->version == NC_VERSION_11) {
+ c = snprintf(chunksize, "\n#%zu\n", count);
+ SSL_write(session->ti.tls, chunksize, c);
+ }
+ return SSL_write(session->ti.tls, buf, count) + c;
+#endif
+ }
+
+ return -1;
+}
+
+static int
+write_endtag(struct nc_session *session)
+{
+ switch(session->ti_type) {
+ case NC_TI_FD:
+ if (session->version == NC_VERSION_11) {
+ write(session->ti.fd.out, "\n##\n", 4);
+ } else {
+ write(session->ti.fd.out, "]]>]]>", 6);
+ }
+ break;
+
+#ifdef ENABLE_LIBSSH
+ case NC_TI_LIBSSH:
+ if (session->version == NC_VERSION_11) {
+ ssh_channel_write(session->ti.libssh.channel, "\n##\n", 4);
+ } else {
+ ssh_channel_write(session->ti.libssh.channel, "]]>]]>", 6);
+ }
+ break;
+#endif
+
+#ifdef ENABLE_TLS
+ case NC_TI_OPENSSL:
+ if (session->version == NC_VERSION_11) {
+ SSL_write(session->ti.tls, "\n##\n", 4);
+ } else {
+ SSL_write(session->ti.tls, "]]>]]>", 6);
+ }
+ break;
+#endif
+ }
+
+ return 0;
+}
+
+static void
+write_clb_flush(struct wclb_arg *warg)
+{
+ /* flush current buffer */
+ if (warg->len) {
+ write_(warg->session, warg->buf, warg->len);
+ warg->len = 0;
+ }
+}
+
+static ssize_t
+write_clb(void *arg, const void *buf, size_t count)
+{
+ struct wclb_arg *warg = (struct wclb_arg *)arg;
+
+ if (!buf) {
+ write_clb_flush(warg);
+
+ /* endtag */
+ write_endtag(warg->session);
+ return 0;
+ }
+
+ if (warg->len && (warg->len + count > WRITE_BUFSIZE)) {
+ /* dump current buffer */
+ write_clb_flush(warg);
+ }
+ if (count > WRITE_BUFSIZE) {
+ /* write directly */
+ write_(warg->session, buf, count);
+ } else {
+ /* keep in buffer and write later */
+ memcpy(&warg->buf[warg->len], buf, count);
+ warg->len += count; /* is <= WRITE_BUFSIZE */
+ }
+
+ return (ssize_t)count;
+}
+
+/*
+ * NETCONF 1.0 format
+ */
+static int
+write_msg_10(struct nc_session *session, NC_MSG_TYPE type, va_list ap)
+{
+ int count;
+ const char *attrs;
+ struct lyd_node *content;
+ struct nc_rpc *rpc;
+ struct lyxml_elem *capabilities;
+ char *buf = NULL;
+ struct wclb_arg arg;
+
+ arg.session = session;
+ arg.len = 0;
+
+ switch (type) {
+ case NC_MSG_RPC:
+ content = va_arg(ap, struct lyd_node *);
+ attrs = va_arg(ap, const char *);
+ switch (session->ti_type) {
+ case NC_TI_FD:
+ dprintf(session->ti.fd.out, "<rpc xmlns=\"%s\" message-id=\"%"PRIu64"\"%s>",
+ NC_NS_BASE, session->msgid + 1, attrs ? attrs : "");
+ lyd_print_fd(session->ti.fd.out, content, LYD_XML);
+ write(session->ti.fd.out, "</rpc>]]>]]>", 12);
+ break;
+
+#ifdef ENABLE_LIBSSH
+ case NC_TI_LIBSSH:
+#endif
+#ifdef ENABLE_TLS
+ case NC_TI_OPENSSL:
+#endif
+#if defined(ENABLE_LIBSSH) || defined(ENABLE_TLS)
+ count = asprintf(&buf, "<rpc xmlns=\"%s\" message-id=\"%"PRIu64"\"%s>",
+ NC_NS_BASE, session->msgid + 1, attrs ? attrs : "");
+ write_clb((void *)&arg, buf, count);
+ free(buf);
+ lyd_print_clb(write_clb, (void *)&arg, content, LYD_XML);
+ write_clb((void *)&arg, "</rpc>", 6);
+
+ /* flush message */
+ write_clb((void *)&arg, NULL, 0);
+ break;
+#endif
+ }
+
+ session->msgid++;
+ break;
+
+ case NC_MSG_REPLY:
+ rpc = va_arg(ap, struct nc_rpc *);
+ switch (session->ti_type) {
+ case NC_TI_FD:
+ write(session->ti.fd.out, "<rpc-reply", 10);
+ lyxml_dump_fd(session->ti.fd.out, rpc->root, LYXML_DUMP_ATTRS);
+ write(session->ti.fd.out, ">", 1);
+
+ /* TODO content */
+
+ write(session->ti.fd.out, "</rpc-reply>]]>]]>", 18);
+ break;
+
+#ifdef ENABLE_LIBSSH
+ case NC_TI_LIBSSH:
+#endif
+#ifdef ENABLE_TLS
+ case NC_TI_OPENSSL:
+#endif
+#if defined(ENABLE_LIBSSH) || defined(ENABLE_TLS)
+ write_clb((void *)&arg, "<rpc-reply", 10);
+ lyxml_dump_clb(write_clb, (void *)&arg, rpc->root, LYXML_DUMP_ATTRS);
+
+ /* TODO content */
+
+ write_clb((void *)&arg, "</rpc-reply>", 12);
+
+ /* flush message */
+ write_clb((void *)&arg, NULL, 0);
+ break;
+#endif
+ }
+ break;
+
+ case NC_MSG_NOTIF:
+ switch (session->ti_type) {
+ case NC_TI_FD:
+ write(session->ti.fd.out, "<notification xmlns=\""NC_NS_NOTIF"\"/>", 21 + 47 + 3);
+
+ /* TODO content */
+
+ write(session->ti.fd.out, "</notification>]]>]]>", 18);
+ break;
+
+#ifdef ENABLE_LIBSSH
+ case NC_TI_LIBSSH:
+#endif
+#ifdef ENABLE_TLS
+ case NC_TI_OPENSSL:
+#endif
+#if defined(ENABLE_LIBSSH) || defined(ENABLE_TLS)
+ write_clb((void *)&arg, "<notification xmlns=\""NC_NS_NOTIF"\"/>", 21 + 47 + 3);
+
+ /* TODO content */
+
+ write_clb((void *)&arg, "</notification>", 12);
+
+ /* flush message */
+ write_clb((void *)&arg, NULL, 0);
+ break;
+#endif
+ }
+ break;
+
+ case NC_MSG_HELLO:
+ capabilities = va_arg(ap, struct lyxml_elem *);
+ switch (session->ti_type) {
+ case NC_TI_FD:
+ dprintf(session->ti.fd.out, "<hello xmlns=\"%s\">", NC_NS_BASE);
+ lyxml_dump_fd(session->ti.fd.out, capabilities, 0);
+ write(session->ti.fd.out, "</hello>]]>]]>", 11);
+ break;
+
+#ifdef ENABLE_LIBSSH
+ case NC_TI_LIBSSH:
+#endif
+#ifdef ENABLE_TLS
+ case NC_TI_OPENSSL:
+#endif
+#if defined(ENABLE_LIBSSH) || defined(ENABLE_TLS)
+ count = asprintf(&buf, "<hello xmlns=\"%s\">", NC_NS_BASE);
+ write_clb((void *)&arg, buf, count);
+ free(buf);
+ lyxml_dump_clb(write_clb, (void *)&arg, capabilities, 0);
+ write_clb((void *)&arg, "</hello>", 8);
+
+ /* flush message */
+ write_clb((void *)&arg, NULL, 0);
+ break;
+#endif
+ }
+ break;
+
+ default:
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * NETCONF 1.1 format
+ */
+static int
+write_msg_11(struct nc_session *session, NC_MSG_TYPE type, va_list ap)
+{
+ int count;
+ const char *attrs;
+ struct lyd_node *content;
+ struct nc_rpc *rpc;
+ char *buf = NULL;
+ struct wclb_arg arg;
+
+ arg.session = session;
+ arg.len = 0;
+
+ switch (type) {
+ case NC_MSG_RPC:
+ content = va_arg(ap, struct lyd_node *);
+ attrs = va_arg(ap, const char *);
+ count = asprintf(&buf, "<rpc xmlns=\"%s\" message-id=\"%"PRIu64"\"%s>",
+ NC_NS_BASE, session->msgid + 1, attrs ? attrs : "");
+ write_clb((void *)&arg, buf, count);
+ free(buf);
+ lyd_print_clb(write_clb, (void *)&arg, content, LYD_XML);
+ write_clb((void *)&arg, "</rpc>", 6);
+
+ session->msgid++;
+ break;
+ case NC_MSG_REPLY:
+ rpc = va_arg(ap, struct nc_rpc *);
+ write_clb((void *)&arg, "<rpc-reply", 10);
+ lyxml_dump_clb(write_clb, (void *)&arg, rpc->root, LYXML_DUMP_ATTRS);
+ write_clb((void *)&arg, ">", 1);
+ /* TODO content */
+ write_clb((void *)&arg, "</rpc-reply>", 12);
+ break;
+ case NC_MSG_NOTIF:
+ write_clb((void *)&arg, "<notification xmlns=\""NC_NS_NOTIF"\"/>", 21 + 47 + 3);
+ /* TODO content */
+ write_clb((void *)&arg, "</notification>", 12);
+ break;
+ default:
+ /* just to make compiler quiet */
+ return -1;
+ }
+
+ /* flush message */
+ write_clb((void *)&arg, NULL, 0);
+ return 0;
+}
+
+int
+nc_write_msg(struct nc_session *session, NC_MSG_TYPE type, ...)
+{
+ va_list ap;
+ int r;
+
+ va_start(ap, type);
+
+ if (session->version == NC_VERSION_10) {
+ r = write_msg_10(session, type, ap);
+ } else {
+ r = write_msg_11(session, type, ap);
+ }
+
+ va_end(ap);
+
+ return r;
+}
diff --git a/src/session.c b/src/session.c
index 157f8c0..f54e80e 100644
--- a/src/session.c
+++ b/src/session.c
@@ -339,3 +339,34 @@
return NC_MSG_ERROR;
}
+
+API NC_MSG_TYPE
+nc_send_rpc(struct nc_session* session, struct lyd_node *op, const char *attrs)
+{
+ int r;
+
+ if (!session || !op) {
+ ERR("%s: Invalid parameter", __func__);
+ return NC_MSG_ERROR;
+ } else if (session->side != NC_SIDE_CLIENT) {
+ ERR("%s: only clients are allowed to send RPCs.", __func__);
+ return NC_MSG_ERROR;
+ }
+
+ r = session_ti_lock(session, 0);
+ if (r != 0) {
+ /* error or blocking */
+ return NC_MSG_WOULDBLOCK;
+ }
+
+ r = nc_write_msg(session, NC_MSG_RPC, op, attrs);
+
+ session_ti_unlock(session);
+
+ if (r) {
+ return NC_MSG_ERROR;
+ } else {
+ return NC_MSG_RPC;
+ }
+}
+
diff --git a/src/session.h b/src/session.h
index 742f2d4..086e1cd 100644
--- a/src/session.h
+++ b/src/session.h
@@ -23,6 +23,8 @@
#ifndef NC_SESSION_H_
#define NC_SESSION_H_
+#include <stdint.h>
+
#include "messages.h"
/**
@@ -31,18 +33,54 @@
struct nc_session;
/**
- * @brief Receive NETCONF RPC
+ * @brief Receive NETCONF RPC.
+ *
+ * @param[in] session NETCONF session from which the function gets data. It must be the
+ * server side session object.
+ * @param[in] timeout Timeout for reading in milliseconds. Use negative value for infinite
+ * waiting and 0 for immediate return if data are not available on wire.
+ * @param[out] notif Resulting object of NETCONF RPC.
+ * @return NC_MSG_RPC for success, NC_MSG_WOULDBLOCK if timeout reached and NC_MSG_ERROR
+ * when reading has failed.
*/
NC_MSG_TYPE nc_recv_rpc(struct nc_session* session, int timeout, struct nc_rpc **rpc);
/**
- * @brief Receive NETCONF RPC reply
+ * @brief Receive NETCONF RPC reply.
+ *
+ * @param[in] session NETCONF session from which the function gets data. It must be the
+ * client side session object.
+ * @param[in] timeout Timeout for reading in milliseconds. Use negative value for infinite
+ * waiting and 0 for immediate return if data are not available on wire.
+ * @param[out] reply Resulting object of NETCONF RPC reply.
+ * @return NC_MSG_REPLY for success, NC_MSG_WOULDBLOCK if timeout reached and NC_MSG_ERROR
+ * when reading has failed.
*/
NC_MSG_TYPE nc_recv_reply(struct nc_session* session, int timeout, struct nc_reply **reply);
/**
- * @brief Receive NETCONF Notification
+ * @brief Receive NETCONF Notification.
+ *
+ * @param[in] session NETCONF session from which the function gets data. It must be the
+ * client side session object.
+ * @param[in] timeout Timeout for reading in milliseconds. Use negative value for infinite
+ * waiting and 0 for immediate return if data are not available on wire.
+ * @param[out] notif Resulting object of NETCONF Notification.
+ * @return NC_MSG_NOTIF for success, NC_MSG_WOULDBLOCK if timeout reached and NC_MSG_ERROR
+ * when reading has failed.
*/
NC_MSG_TYPE nc_recv_notif(struct nc_session* session, int timeout, struct nc_notif **notif);
+/**
+ * @brief Send NETCONF RPC message via the session.
+ *
+ * @param[in] session NETCONF session where the RPC will be written.
+ * @param[in] op NETCONF RPC operation to be sent.
+ * @param[in] attrs Additional (optional) XML attributes to be added into the \<rpc\> element.
+ * Note, that "message-id" attribute is added automatically.
+ * @return #NC_MSG_RPC on success, #NC_MSG_WOULDBLOCK in case of busy session
+ * (try to repeat the function call) and #NC_MSG_ERROR in case of error.
+ */
+NC_MSG_TYPE nc_send_rpc(struct nc_session* session, struct lyd_node *op, const char *attrs);
+
#endif /* NC_SESSION_H_ */
diff --git a/src/session_p.h b/src/session_p.h
index c133b09..7770304 100644
--- a/src/session_p.h
+++ b/src/session_p.h
@@ -23,6 +23,7 @@
#ifndef NC_SESSION_PRIVATE_H_
#define NC_SESSION_PRIVATE_H_
+#include <stdint.h>
#include <pthread.h>
#ifdef ENABLE_LIBSSH
@@ -129,6 +130,7 @@
struct ly_ctx *ctx; /**< libyang context of the session */
/* client side only data */
+ uint64_t msgid;
struct nc_reply_cont *replies; /**< queue for RPC replies received instead of notifications */
struct nc_notif_cont *notifs; /**< queue for notifications received instead of RPC reply */
};
@@ -154,4 +156,24 @@
*/
NC_MSG_TYPE nc_read_msg(struct nc_session* session, int timeout, struct lyxml_elem **data);
+/**
+ * @brief Write message into wire.
+ *
+ * @param[in] session NETCONF session to which the message will be written.
+ * @param[in] type Type of the message to write. According to the type, the
+ * specific additional parameters are required or accepted:
+ * - #NC_MSG_RPC
+ * - `struct lyd_node *op;` - operation (content of the \<rpc/\> to be sent. Required parameter.
+ * - `const char *attrs;` - additional attributes to be added into the \<rpc/\> element.
+ * `message-id` attribute is added automatically and default namespace is set to
+ * #NC_NS_BASE. Optional parameter.
+ * - #NC_MSG_REPLY
+ * - `struct nc_rpc *rpc;` - RPC object to reply. Required parameter.
+ * - TODO: content
+ * - #NC_MSG_NOTIF
+ * - TODO: content
+ * @return 0 on success
+ */
+int nc_write_msg(struct nc_session *session, NC_MSG_TYPE type, ...);
+
#endif /* NC_SESSION_PRIVATE_H_ */
diff --git a/tests/data/nc11/rpc-lock b/tests/data/nc11/rpc-lock
new file mode 100644
index 0000000..1e8e833
--- /dev/null
+++ b/tests/data/nc11/rpc-lock
@@ -0,0 +1,7 @@
+
+#11
+<rpc xmlns=
+#103
+"urn:ietf:params:xml:ns:netconf:base:1.0" message-id="1"><lock><target><running/></target></lock></rpc>
+##
+
diff --git a/tests/test_io.c b/tests/test_io.c
index c53fa29..3246dc0 100644
--- a/tests/test_io.c
+++ b/tests/test_io.c
@@ -39,6 +39,7 @@
#include "config.h"
struct nc_session session = {0};
+struct nc_rpc *rpc = NULL;
static int
setup_f(void **state)
@@ -66,6 +67,13 @@
{
(void) state; /* unused */
+ if (rpc) {
+ lyxml_free_elem(session.ctx, rpc->root);
+ lyd_free(rpc->tree);
+ free(rpc);
+ rpc = NULL;
+ }
+
ly_ctx_destroy(session.ctx);
return 0;
@@ -75,16 +83,15 @@
test_read_rpc(void **state)
{
(void) state; /* unused */
- struct nc_rpc *rpc;
NC_MSG_TYPE type;
/* test IO with standard file descriptors */
session.ti_type = NC_TI_FD;
session.ti.fd.c = 0;
session.side = NC_SIDE_SERVER;
- session.version = NC_VERSION_10;
+ session.version = NC_VERSION_11;
- session.ti.fd.in = open(TESTS_DIR"/data/nc10/rpc-lock", O_RDONLY);
+ session.ti.fd.in = open(TESTS_DIR"/data/nc11/rpc-lock", O_RDONLY);
if (session.ti.fd.in == -1) {
fail_msg(" Openning \"%s\" failed (%s)", TESTS_DIR"/data/nc10/rpc-lock", strerror(errno));
}
@@ -93,14 +100,31 @@
assert_int_equal(type, NC_MSG_RPC);
assert_non_null(rpc);
- lyxml_free_elem(session.ctx, rpc->root);
- lyd_free(rpc->tree);
- free(rpc);
+}
+
+static void
+test_write_rpc(void **state)
+{
+ (void) state; /* unused */
+ NC_MSG_TYPE type;
+
+ session.side = NC_SIDE_CLIENT;
+ session.ti.fd.out = STDOUT_FILENO;
+
+ do {
+ type = nc_send_rpc(&session, rpc->tree, NULL);
+ } while(type == NC_MSG_WOULDBLOCK);
+
+ assert_int_equal(type, NC_MSG_RPC);
+
+ write( session.ti.fd.out, "\n", 1);
}
int main(void)
{
- const struct CMUnitTest io[] = {cmocka_unit_test_setup_teardown(test_read_rpc, setup_f, teardown_f)};
+ const struct CMUnitTest io[] = {
+ cmocka_unit_test_setup(test_read_rpc, setup_f),
+ cmocka_unit_test_teardown(test_write_rpc, teardown_f)};
return cmocka_run_group_tests(io, NULL, NULL);
}