FEATURE writing to NETCONF session

Writing reply and notification is not finished - higher level functions
are missing so the way to define what is going to be written is still
unclear even in the low level functions.
diff --git a/src/io.c b/src/io.c
index 88c3142..b896caa 100644
--- a/src/io.c
+++ b/src/io.c
@@ -20,9 +20,11 @@
  *
  */
 
+#define _GNU_SOURCE /* asprintf */
 #include <assert.h>
 #include <errno.h>
 #include <poll.h>
+#include <stdarg.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
@@ -32,8 +34,9 @@
 #include "config.h"
 #include "libnetconf.h"
 #include "session_p.h"
+#include "messages_p.h"
 
-#define BUFFERSIZE 4096
+#define BUFFERSIZE 512
 
 static ssize_t
 nc_read(struct nc_session *session, char *buf, size_t count)
@@ -160,7 +163,7 @@
     }
 
     /* terminating null byte */
-    *chunk[r] = 0;
+    (*chunk)[r] = 0;
 
     return r;
 }
@@ -230,6 +233,8 @@
 
     if (result) {
         *result = chunk;
+    } else {
+        free(chunk);
     }
     return count;
 }
@@ -242,7 +247,7 @@
     struct pollfd fds;
     const char *emsg = NULL;
     char *msg = NULL, *chunk, *aux;
-    unsigned long int chunk_len, len;
+    unsigned long int chunk_len, len = 0;
 
     assert(data);
     *data = NULL;
@@ -355,11 +360,11 @@
 
             /* convert string to the size of the following chunk */
             chunk_len = strtoul(chunk, (char **) NULL, 10);
-            if (!chunk_len == 0) {
+            free (chunk);
+            if (!chunk_len) {
                 ERR("Invalid frame chunk size detected, fatal error.");
                 goto error;
             }
-            free (chunk);
 
             /* now we have size of next chunk, so read the chunk */
             status = nc_read_chunk(session, chunk_len, &chunk);
@@ -449,3 +454,343 @@
 
     return NC_MSG_ERROR;
 }
+
+#define WRITE_BUFSIZE (2 * BUFFERSIZE)
+struct wclb_arg {
+    struct nc_session *session;
+    char buf[WRITE_BUFSIZE];
+    size_t len;
+};
+
+static ssize_t
+write_(struct nc_session *session, const void *buf, size_t count)
+{
+    int c = 0;
+    char chunksize[20];
+
+    switch (session->ti_type) {
+    case NC_TI_FD:
+        if (session->version == NC_VERSION_11) {
+            c = dprintf(session->ti.fd.out, "\n#%zu\n", count);
+        }
+        return write(session->ti.fd.out, buf, count) + c;
+
+#ifdef ENABLE_LIBSSH
+    case NC_TI_LIBSSH:
+        if (session->version == NC_VERSION_11) {
+            c = snprintf(chunksize, 20, "\n#%zu\n", count);
+            ssh_channel_write(session->ti.libssh.channel, chunksize, c);
+        }
+        return ssh_channel_write(session->ti.libssh.channel, buf, count) + c;
+#endif
+
+#ifdef ENABLE_TLS
+    case NC_TI_OPENSSL:
+        if (session->version == NC_VERSION_11) {
+            c = snprintf(chunksize, "\n#%zu\n", count);
+            SSL_write(session->ti.tls, chunksize, c);
+        }
+        return SSL_write(session->ti.tls, buf, count) + c;
+#endif
+    }
+
+    return -1;
+}
+
+static int
+write_endtag(struct nc_session *session)
+{
+    switch(session->ti_type) {
+    case NC_TI_FD:
+        if (session->version == NC_VERSION_11) {
+            write(session->ti.fd.out, "\n##\n", 4);
+        } else {
+            write(session->ti.fd.out, "]]>]]>", 6);
+        }
+        break;
+
+#ifdef ENABLE_LIBSSH
+    case NC_TI_LIBSSH:
+        if (session->version == NC_VERSION_11) {
+            ssh_channel_write(session->ti.libssh.channel, "\n##\n", 4);
+        } else {
+            ssh_channel_write(session->ti.libssh.channel, "]]>]]>", 6);
+        }
+        break;
+#endif
+
+#ifdef ENABLE_TLS
+    case NC_TI_OPENSSL:
+        if (session->version == NC_VERSION_11) {
+            SSL_write(session->ti.tls, "\n##\n", 4);
+        } else {
+            SSL_write(session->ti.tls, "]]>]]>", 6);
+        }
+        break;
+#endif
+    }
+
+    return 0;
+}
+
+static void
+write_clb_flush(struct wclb_arg *warg)
+{
+    /* flush current buffer */
+    if (warg->len) {
+        write_(warg->session, warg->buf, warg->len);
+        warg->len = 0;
+    }
+}
+
+static ssize_t
+write_clb(void *arg, const void *buf, size_t count)
+{
+    struct wclb_arg *warg = (struct wclb_arg *)arg;
+
+    if (!buf) {
+        write_clb_flush(warg);
+
+        /* endtag */
+        write_endtag(warg->session);
+        return 0;
+    }
+
+    if (warg->len && (warg->len + count > WRITE_BUFSIZE)) {
+        /* dump current buffer */
+        write_clb_flush(warg);
+    }
+    if (count > WRITE_BUFSIZE) {
+        /* write directly */
+        write_(warg->session, buf, count);
+    } else {
+        /* keep in buffer and write later */
+        memcpy(&warg->buf[warg->len], buf, count);
+        warg->len += count; /* is <= WRITE_BUFSIZE */
+    }
+
+    return (ssize_t)count;
+}
+
+/*
+ * NETCONF 1.0 format
+ */
+static int
+write_msg_10(struct nc_session *session, NC_MSG_TYPE type, va_list ap)
+{
+    int count;
+    const char *attrs;
+    struct lyd_node *content;
+    struct nc_rpc *rpc;
+    struct lyxml_elem *capabilities;
+    char *buf = NULL;
+    struct wclb_arg arg;
+
+    arg.session = session;
+    arg.len = 0;
+
+    switch (type) {
+    case NC_MSG_RPC:
+        content = va_arg(ap, struct lyd_node *);
+        attrs = va_arg(ap, const char *);
+        switch (session->ti_type) {
+        case NC_TI_FD:
+            dprintf(session->ti.fd.out, "<rpc xmlns=\"%s\" message-id=\"%"PRIu64"\"%s>",
+                    NC_NS_BASE, session->msgid + 1, attrs ? attrs : "");
+            lyd_print_fd(session->ti.fd.out, content, LYD_XML);
+            write(session->ti.fd.out, "</rpc>]]>]]>", 12);
+            break;
+
+#ifdef ENABLE_LIBSSH
+        case NC_TI_LIBSSH:
+#endif
+#ifdef ENABLE_TLS
+        case NC_TI_OPENSSL:
+#endif
+#if defined(ENABLE_LIBSSH) || defined(ENABLE_TLS)
+            count = asprintf(&buf, "<rpc xmlns=\"%s\" message-id=\"%"PRIu64"\"%s>",
+                             NC_NS_BASE, session->msgid + 1, attrs ? attrs : "");
+            write_clb((void *)&arg, buf, count);
+            free(buf);
+            lyd_print_clb(write_clb, (void *)&arg, content, LYD_XML);
+            write_clb((void *)&arg, "</rpc>", 6);
+
+            /* flush message */
+            write_clb((void *)&arg, NULL, 0);
+            break;
+#endif
+        }
+
+        session->msgid++;
+        break;
+
+    case NC_MSG_REPLY:
+        rpc = va_arg(ap, struct nc_rpc *);
+        switch (session->ti_type) {
+        case NC_TI_FD:
+            write(session->ti.fd.out, "<rpc-reply", 10);
+            lyxml_dump_fd(session->ti.fd.out, rpc->root, LYXML_DUMP_ATTRS);
+            write(session->ti.fd.out, ">", 1);
+
+            /* TODO content */
+
+            write(session->ti.fd.out, "</rpc-reply>]]>]]>", 18);
+            break;
+
+#ifdef ENABLE_LIBSSH
+        case NC_TI_LIBSSH:
+#endif
+#ifdef ENABLE_TLS
+        case NC_TI_OPENSSL:
+#endif
+#if defined(ENABLE_LIBSSH) || defined(ENABLE_TLS)
+            write_clb((void *)&arg, "<rpc-reply", 10);
+            lyxml_dump_clb(write_clb, (void *)&arg, rpc->root, LYXML_DUMP_ATTRS);
+
+            /* TODO content */
+
+            write_clb((void *)&arg, "</rpc-reply>", 12);
+
+            /* flush message */
+            write_clb((void *)&arg, NULL, 0);
+            break;
+#endif
+        }
+        break;
+
+    case NC_MSG_NOTIF:
+        switch (session->ti_type) {
+        case NC_TI_FD:
+            write(session->ti.fd.out, "<notification xmlns=\""NC_NS_NOTIF"\"/>", 21 + 47 + 3);
+
+            /* TODO content */
+
+            write(session->ti.fd.out, "</notification>]]>]]>", 18);
+            break;
+
+#ifdef ENABLE_LIBSSH
+        case NC_TI_LIBSSH:
+#endif
+#ifdef ENABLE_TLS
+        case NC_TI_OPENSSL:
+#endif
+#if defined(ENABLE_LIBSSH) || defined(ENABLE_TLS)
+            write_clb((void *)&arg, "<notification xmlns=\""NC_NS_NOTIF"\"/>", 21 + 47 + 3);
+
+            /* TODO content */
+
+            write_clb((void *)&arg, "</notification>", 12);
+
+            /* flush message */
+            write_clb((void *)&arg, NULL, 0);
+            break;
+#endif
+        }
+        break;
+
+    case NC_MSG_HELLO:
+        capabilities = va_arg(ap, struct lyxml_elem *);
+        switch (session->ti_type) {
+        case NC_TI_FD:
+            dprintf(session->ti.fd.out, "<hello xmlns=\"%s\">", NC_NS_BASE);
+            lyxml_dump_fd(session->ti.fd.out, capabilities, 0);
+            write(session->ti.fd.out, "</hello>]]>]]>", 11);
+            break;
+
+#ifdef ENABLE_LIBSSH
+        case NC_TI_LIBSSH:
+#endif
+#ifdef ENABLE_TLS
+        case NC_TI_OPENSSL:
+#endif
+#if defined(ENABLE_LIBSSH) || defined(ENABLE_TLS)
+            count = asprintf(&buf, "<hello xmlns=\"%s\">", NC_NS_BASE);
+            write_clb((void *)&arg, buf, count);
+            free(buf);
+            lyxml_dump_clb(write_clb, (void *)&arg, capabilities, 0);
+            write_clb((void *)&arg, "</hello>", 8);
+
+            /* flush message */
+            write_clb((void *)&arg, NULL, 0);
+            break;
+#endif
+        }
+        break;
+
+    default:
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * NETCONF 1.1 format
+ */
+static int
+write_msg_11(struct nc_session *session, NC_MSG_TYPE type, va_list ap)
+{
+    int count;
+    const char *attrs;
+    struct lyd_node *content;
+    struct nc_rpc *rpc;
+    char *buf = NULL;
+    struct wclb_arg arg;
+
+    arg.session = session;
+    arg.len = 0;
+
+    switch (type) {
+    case NC_MSG_RPC:
+        content = va_arg(ap, struct lyd_node *);
+        attrs = va_arg(ap, const char *);
+        count = asprintf(&buf, "<rpc xmlns=\"%s\" message-id=\"%"PRIu64"\"%s>",
+                         NC_NS_BASE, session->msgid + 1, attrs ? attrs : "");
+        write_clb((void *)&arg, buf, count);
+        free(buf);
+        lyd_print_clb(write_clb, (void *)&arg, content, LYD_XML);
+        write_clb((void *)&arg, "</rpc>", 6);
+
+        session->msgid++;
+        break;
+    case NC_MSG_REPLY:
+        rpc = va_arg(ap, struct nc_rpc *);
+        write_clb((void *)&arg, "<rpc-reply", 10);
+        lyxml_dump_clb(write_clb, (void *)&arg, rpc->root, LYXML_DUMP_ATTRS);
+        write_clb((void *)&arg, ">", 1);
+        /* TODO content */
+        write_clb((void *)&arg, "</rpc-reply>", 12);
+        break;
+    case NC_MSG_NOTIF:
+        write_clb((void *)&arg, "<notification xmlns=\""NC_NS_NOTIF"\"/>", 21 + 47 + 3);
+        /* TODO content */
+        write_clb((void *)&arg, "</notification>", 12);
+        break;
+    default:
+        /* just to make compiler quiet */
+        return -1;
+    }
+
+    /* flush message */
+    write_clb((void *)&arg, NULL, 0);
+    return 0;
+}
+
+int
+nc_write_msg(struct nc_session *session, NC_MSG_TYPE type, ...)
+{
+    va_list ap;
+    int r;
+
+    va_start(ap, type);
+
+    if (session->version == NC_VERSION_10) {
+        r = write_msg_10(session, type, ap);
+    } else {
+        r = write_msg_11(session, type, ap);
+    }
+
+    va_end(ap);
+
+    return r;
+}
diff --git a/src/session.c b/src/session.c
index 157f8c0..f54e80e 100644
--- a/src/session.c
+++ b/src/session.c
@@ -339,3 +339,34 @@
 
     return NC_MSG_ERROR;
 }
+
+API NC_MSG_TYPE
+nc_send_rpc(struct nc_session* session, struct lyd_node *op, const char *attrs)
+{
+    int r;
+
+    if (!session || !op) {
+        ERR("%s: Invalid parameter", __func__);
+        return NC_MSG_ERROR;
+    } else if (session->side != NC_SIDE_CLIENT) {
+        ERR("%s: only clients are allowed to send RPCs.", __func__);
+        return NC_MSG_ERROR;
+    }
+
+    r = session_ti_lock(session, 0);
+    if (r != 0) {
+        /* error or blocking */
+        return NC_MSG_WOULDBLOCK;
+    }
+
+    r = nc_write_msg(session, NC_MSG_RPC, op, attrs);
+
+    session_ti_unlock(session);
+
+    if (r) {
+        return NC_MSG_ERROR;
+    } else {
+        return NC_MSG_RPC;
+    }
+}
+
diff --git a/src/session.h b/src/session.h
index 742f2d4..086e1cd 100644
--- a/src/session.h
+++ b/src/session.h
@@ -23,6 +23,8 @@
 #ifndef NC_SESSION_H_
 #define NC_SESSION_H_
 
+#include <stdint.h>
+
 #include "messages.h"
 
 /**
@@ -31,18 +33,54 @@
 struct nc_session;
 
 /**
- * @brief Receive NETCONF RPC
+ * @brief Receive NETCONF RPC.
+ *
+ * @param[in] session NETCONF session from which the function gets data. It must be the
+ *            server side session object.
+ * @param[in] timeout Timeout for reading in milliseconds. Use negative value for infinite
+ *            waiting and 0 for immediate return if data are not available on wire.
+ * @param[out] notif Resulting object of NETCONF RPC.
+ * @return NC_MSG_RPC for success, NC_MSG_WOULDBLOCK if timeout reached and NC_MSG_ERROR
+ *         when reading has failed.
  */
 NC_MSG_TYPE nc_recv_rpc(struct nc_session* session, int timeout, struct nc_rpc **rpc);
 
 /**
- * @brief Receive NETCONF RPC reply
+ * @brief Receive NETCONF RPC reply.
+ *
+ * @param[in] session NETCONF session from which the function gets data. It must be the
+ *            client side session object.
+ * @param[in] timeout Timeout for reading in milliseconds. Use negative value for infinite
+ *            waiting and 0 for immediate return if data are not available on wire.
+ * @param[out] reply Resulting object of NETCONF RPC reply.
+ * @return NC_MSG_REPLY for success, NC_MSG_WOULDBLOCK if timeout reached and NC_MSG_ERROR
+ *         when reading has failed.
  */
 NC_MSG_TYPE nc_recv_reply(struct nc_session* session, int timeout, struct nc_reply **reply);
 
 /**
- * @brief Receive NETCONF Notification
+ * @brief Receive NETCONF Notification.
+ *
+ * @param[in] session NETCONF session from which the function gets data. It must be the
+ *            client side session object.
+ * @param[in] timeout Timeout for reading in milliseconds. Use negative value for infinite
+ *            waiting and 0 for immediate return if data are not available on wire.
+ * @param[out] notif Resulting object of NETCONF Notification.
+ * @return NC_MSG_NOTIF for success, NC_MSG_WOULDBLOCK if timeout reached and NC_MSG_ERROR
+ *         when reading has failed.
  */
 NC_MSG_TYPE nc_recv_notif(struct nc_session* session, int timeout, struct nc_notif **notif);
 
+/**
+ * @brief Send NETCONF RPC message via the session.
+ *
+ * @param[in] session NETCONF session where the RPC will be written.
+ * @param[in] op NETCONF RPC operation to be sent.
+ * @param[in] attrs Additional (optional) XML attributes to be added into the \<rpc\> element.
+ *            Note, that "message-id" attribute is added automatically.
+ * @return #NC_MSG_RPC on success, #NC_MSG_WOULDBLOCK in case of busy session
+ * (try to repeat the function call) and #NC_MSG_ERROR in case of error.
+ */
+NC_MSG_TYPE nc_send_rpc(struct nc_session* session, struct lyd_node *op, const char *attrs);
+
 #endif /* NC_SESSION_H_ */
diff --git a/src/session_p.h b/src/session_p.h
index c133b09..7770304 100644
--- a/src/session_p.h
+++ b/src/session_p.h
@@ -23,6 +23,7 @@
 #ifndef NC_SESSION_PRIVATE_H_
 #define NC_SESSION_PRIVATE_H_
 
+#include <stdint.h>
 #include <pthread.h>
 
 #ifdef ENABLE_LIBSSH
@@ -129,6 +130,7 @@
     struct ly_ctx *ctx;            /**< libyang context of the session */
 
     /* client side only data */
+    uint64_t msgid;
     struct nc_reply_cont *replies; /**< queue for RPC replies received instead of notifications */
     struct nc_notif_cont *notifs;  /**< queue for notifications received instead of RPC reply */
 };
@@ -154,4 +156,24 @@
  */
 NC_MSG_TYPE nc_read_msg(struct nc_session* session, int timeout, struct lyxml_elem **data);
 
+/**
+ * @brief Write message into wire.
+ *
+ * @param[in] session NETCONF session to which the message will be written.
+ * @param[in] type Type of the message to write. According to the type, the
+ * specific additional parameters are required or accepted:
+ * - #NC_MSG_RPC
+ *   - `struct lyd_node *op;` - operation (content of the \<rpc/\> to be sent. Required parameter.
+ *   - `const char *attrs;` - additional attributes to be added into the \<rpc/\> element.
+ *     `message-id` attribute is added automatically and default namespace is set to
+ *     #NC_NS_BASE. Optional parameter.
+ * - #NC_MSG_REPLY
+ *   - `struct nc_rpc *rpc;` - RPC object to reply. Required parameter.
+ *   - TODO: content
+ * - #NC_MSG_NOTIF
+ *   - TODO: content
+ * @return 0 on success
+ */
+int nc_write_msg(struct nc_session *session, NC_MSG_TYPE type, ...);
+
 #endif /* NC_SESSION_PRIVATE_H_ */
diff --git a/tests/data/nc11/rpc-lock b/tests/data/nc11/rpc-lock
new file mode 100644
index 0000000..1e8e833
--- /dev/null
+++ b/tests/data/nc11/rpc-lock
@@ -0,0 +1,7 @@
+
+#11
+<rpc xmlns=
+#103
+"urn:ietf:params:xml:ns:netconf:base:1.0" message-id="1"><lock><target><running/></target></lock></rpc>
+##
+
diff --git a/tests/test_io.c b/tests/test_io.c
index c53fa29..3246dc0 100644
--- a/tests/test_io.c
+++ b/tests/test_io.c
@@ -39,6 +39,7 @@
 #include "config.h"
 
 struct nc_session session = {0};
+struct nc_rpc *rpc = NULL;
 
 static int
 setup_f(void **state)
@@ -66,6 +67,13 @@
 {
     (void) state; /* unused */
 
+    if (rpc) {
+        lyxml_free_elem(session.ctx, rpc->root);
+        lyd_free(rpc->tree);
+        free(rpc);
+        rpc = NULL;
+    }
+
     ly_ctx_destroy(session.ctx);
 
     return 0;
@@ -75,16 +83,15 @@
 test_read_rpc(void **state)
 {
     (void) state; /* unused */
-    struct nc_rpc *rpc;
     NC_MSG_TYPE type;
 
     /* test IO with standard file descriptors */
     session.ti_type = NC_TI_FD;
     session.ti.fd.c = 0;
     session.side = NC_SIDE_SERVER;
-    session.version = NC_VERSION_10;
+    session.version = NC_VERSION_11;
 
-    session.ti.fd.in = open(TESTS_DIR"/data/nc10/rpc-lock", O_RDONLY);
+    session.ti.fd.in = open(TESTS_DIR"/data/nc11/rpc-lock", O_RDONLY);
     if (session.ti.fd.in == -1) {
         fail_msg(" Openning \"%s\" failed (%s)", TESTS_DIR"/data/nc10/rpc-lock", strerror(errno));
     }
@@ -93,14 +100,31 @@
     assert_int_equal(type, NC_MSG_RPC);
     assert_non_null(rpc);
 
-    lyxml_free_elem(session.ctx, rpc->root);
-    lyd_free(rpc->tree);
-    free(rpc);
+}
+
+static void
+test_write_rpc(void **state)
+{
+    (void) state; /* unused */
+    NC_MSG_TYPE type;
+
+    session.side = NC_SIDE_CLIENT;
+    session.ti.fd.out = STDOUT_FILENO;
+
+    do {
+        type = nc_send_rpc(&session, rpc->tree, NULL);
+    } while(type == NC_MSG_WOULDBLOCK);
+
+    assert_int_equal(type, NC_MSG_RPC);
+
+    write( session.ti.fd.out, "\n", 1);
 }
 
 int main(void)
 {
-    const struct CMUnitTest io[] = {cmocka_unit_test_setup_teardown(test_read_rpc, setup_f, teardown_f)};
+    const struct CMUnitTest io[] = {
+        cmocka_unit_test_setup(test_read_rpc, setup_f),
+        cmocka_unit_test_teardown(test_write_rpc, teardown_f)};
 
     return cmocka_run_group_tests(io, NULL, NULL);
 }