Signed-off-by: Thomas Schoebel-Theuer <[email protected]>
---
 drivers/staging/mars/xio_bricks/xio_net.c | 1830 +++++++++++++++++++++++++++++
 include/linux/xio/xio_net.h               |  171 +++
 2 files changed, 2001 insertions(+)
 create mode 100644 drivers/staging/mars/xio_bricks/xio_net.c
 create mode 100644 include/linux/xio/xio_net.h

diff --git a/drivers/staging/mars/xio_bricks/xio_net.c 
b/drivers/staging/mars/xio_bricks/xio_net.c
new file mode 100644
index 0000000..dcc443c
--- /dev/null
+++ b/drivers/staging/mars/xio_bricks/xio_net.c
@@ -0,0 +1,1830 @@
+/*
+ * MARS Long Distance Replication Software
+ *
+ * Copyright (C) 2010-2014 Thomas Schoebel-Theuer
+ * Copyright (C) 2011-2014 1&1 Internet AG
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/string.h>
+#include <linux/moduleparam.h>
+#include <linux/lzo.h>
+#include <linux/utsname.h>
+
+#include <linux/xio/xio.h>
+#include <linux/xio/xio_net.h>
+
+/******************************************************************/
+
+/*  provisionary version detection */
+
+#ifndef TCP_MAX_REORDERING
+#define __HAS_IOV_ITER
+#endif
+
+#ifdef sk_net_refcnt
+/* see eeb1bd5c40edb0e2fd925c8535e2fdebdbc5cef2 */
+#define __HAS_STRUCT_NET
+#endif
+
+/******************************************************************/
+
+#define USE_BUFFERING
+
+#define SEND_PROTO_VERSION             2
+
+enum COMPRESS_TYPES {
+       COMPRESS_NONE = 0, /* payload is transferred as-is */
+       COMPRESS_LZO = 1, /* LZO1X payload; also tested as a bit in s_recv_flags */
+       /* insert further methods here */
+};
+
+int xio_net_compress_data; /* NOTE(review): no user visible in this part of the file */
+
+const u16 net_global_flags = 0 /* copied to s_send_flags and sent by xio_proto_exchange() */
+#ifdef __HAVE_LZO
+       | COMPRESS_LZO
+#endif
+       ;
+
+/******************************************************************/
+
+/* Internal data structures for low-level transfer of C structures
+ * described by struct meta.
+ * Only these low-level fields need to have a fixed size like s64.
+ * The size and bytesex of the higher-level C structures is converted
+ * automatically; therefore classical "int" or "long long" etc is viable.
+ */
+
+#define MAX_FIELD_LEN                  (32 + 16)
+
+/* Header in front of an array of struct xio_desc_item
+ * (swap_mc() expects the first item directly behind this struct).
+ *
+ * Please keep this at a size of 64 bytes by
+ * reuse of *spare* fields.
+ */
+struct xio_desc_cache {
+       u8    cache_sender_proto;
+       u8    cache_recver_proto;
+       s8    cache_is_bigendian; /* presumably the byte order of the sender - TODO confirm */
+       u8    cache_spare0;
+       s16   cache_items; /* byte-swapped by swap_mc() */
+       u16   cache_spare1;
+       u32   cache_spare2;
+       u32   cache_spare3;
+       u64   cache_spare4[4];
+       u64   cache_sender_cookie; /* byte-swapped by swap_mc() */
+       u64   cache_recver_cookie; /* byte-swapped by swap_mc() */
+};
+
+/* One entry of the array following struct xio_desc_cache.
+ * All s16 members except the spare ones are byte-swapped by swap_mc().
+ *
+ * Please keep this also at a size of 64 bytes by
+ * reuse of *spare* fields.
+ */
+struct xio_desc_item {
+       s8    field_type;
+       s8    field_spare0;
+       s16   field_data_size;
+       s16   field_sender_size;
+       s16   field_sender_offset;
+       s16   field_recver_size;
+       s16   field_recver_offset;
+       s32   field_spare;
+       char  field_name[MAX_FIELD_LEN]; /* MAX_FIELD_LEN is 48, filling the struct up to 64 bytes */
+};
+
+/* This must not be mirror symmetric between big and little endian
+ */
+#define XIO_DESC_MAGIC                 0x73D0A2EC6148F48Ell
+
+struct xio_desc_header { /* 32 bytes of fixed-size members */
+       u64 h_magic; /* presumably XIO_DESC_MAGIC; users are outside this chunk - TODO confirm */
+       u64 h_cookie;
+       s16 h_meta_len;
+       s16 h_index;
+       u32 h_spare1;
+       u64 h_spare2;
+};
+
+#define MAX_INT_TRANSFER               16
+
+/******************************************************************/
+
+/* Bytesex conversion / sign extension
+ */
+
+#ifdef __LITTLE_ENDIAN
+static const bool myself_is_bigendian; /* static storage is zero-initialized, i.e. false */
+
+#endif
+#ifdef __BIG_ENDIAN
+static const bool myself_is_bigendian = true;
+
+#endif
+
+/* Reverse the order of the @len bytes starting at @data, in place.
+ * Used for converting scalar fields between little and big endian.
+ */
+static inline
+void swap_bytes(void *data, int len)
+{
+       char *lo = data;
+       char *hi = lo + len - 1;
+
+       for (; lo < hi; lo++, hi--) {
+               char saved = *lo;
+
+               *lo = *hi;
+               *hi = saved;
+       }
+}
+
+#define SWAP_FIELD(x) swap_bytes(&(x), sizeof(x))
+
+/* Convert a descriptor cache from foreign to native byte order, in place.
+ * @mc:  cache header, directly followed by its struct xio_desc_item array
+ * @len: total length in bytes of header plus items
+ *
+ * Only complete items are converted: when @len is not the header size
+ * plus a whole number of items, the trailing fragment is left untouched
+ * instead of being swapped beyond the end of the buffer.
+ */
+static inline
+void swap_mc(struct xio_desc_cache *mc, int len)
+{
+       struct xio_desc_item *mi;
+
+       SWAP_FIELD(mc->cache_sender_cookie);
+       SWAP_FIELD(mc->cache_recver_cookie);
+       SWAP_FIELD(mc->cache_items);
+
+       len -= sizeof(*mc);
+
+       /* the cast keeps the comparison signed, len may be negative here */
+       for (mi = (void *)(mc + 1); len >= (int)sizeof(*mi); mi++, len -= sizeof(*mi)) {
+               SWAP_FIELD(mi->field_data_size);
+               SWAP_FIELD(mi->field_sender_size);
+               SWAP_FIELD(mi->field_sender_offset);
+               SWAP_FIELD(mi->field_recver_size);
+               SWAP_FIELD(mi->field_recver_offset);
+       }
+}
+
+/* Return the fill byte for sign-extending an integer of @len bytes.
+ * @data:         the integer in its transfer representation
+ * @len:          its size in bytes
+ * @is_bigendian: whether the most significant byte comes first
+ * @is_signed:    whether the value has a sign at all
+ *
+ * Returns -1 (all bits set) for negative signed values, else 0.
+ *
+ * The most significant byte is inspected as "signed char" on purpose:
+ * plain "char" is unsigned on several architectures (and kernel-wide
+ * with -funsigned-char), where a "< 0" test could never succeed.
+ */
+static inline
+char get_sign(const void *data, int len, bool is_bigendian, bool is_signed)
+{
+       const signed char *bytes = data;
+       signed char msb;
+
+       if (!is_signed)
+               return 0;
+
+       msb = is_bigendian ? bytes[0] : bytes[len - 1];
+       if (msb < 0)
+               return -1;
+       return 0;
+}
+
+/******************************************************************/
+
+/* Low-level network traffic
+ */
+
+int xio_net_default_port = CONFIG_MARS_DEFAULT_PORT; /* used by xio_create_sockaddr() when the spec names no port */
+
+module_param_named(xio_port, xio_net_default_port, int, 0);
+
+int xio_net_bind_before_listen = 1; /* when 0, xio_create_socket() zeroes the IP of the server bind address */
+
+module_param_named(xio_net_bind_before_listen, xio_net_bind_before_listen, 
int, 0);
+
+int xio_net_bind_before_connect = 1; /* when nonzero, clients bind to the given source address before connecting */
+
+/* TODO: make all the socket options configurable.
+ * TODO: implement signal handling.
+ * TODO: add authentication.
+ * TODO: add encryption.
+ */
+
+struct xio_tcp_params default_tcp_params = {
+       .ip_tos = IPTOS_LOWDELAY,
+       .tcp_window_size = 8 * 1024 * 1024, /*  for long distance replications 
*/
+       .tcp_nodelay = 0,
+       .tcp_timeout = 2, /* seconds: used as tv_sec and multiplied by HZ in _set_socketopts() */
+       .tcp_keepcnt = 3,
+       .tcp_keepintvl = 3, /*  keepalive ping time */
+       .tcp_keepidle = 4,
+};
+
+/* Cached copy of the node name, filled in lazily by my_id(). */
+static char *id;
+
+/* Return the node name of this host as reported by utsname().
+ * The name is duplicated via brick_strdup() on first use and cached;
+ * NULL is returned as long as no name could be obtained.
+ *
+ * FIXME: uts_sem should be held around the access, but it is
+ * currently not EXPORTed from the kernel.
+ */
+char *my_id(void)
+{
+       struct new_utsname *uts;
+
+       if (id)
+               return id;
+
+       uts = utsname();
+       if (uts)
+               id = brick_strdup(uts->nodename);
+       return id;
+}
+
+/* Set a single socket option.
+ * Failures are only logged as a warning and never propagated; the
+ * logged "value" is the first int found at @optval.
+ */
+static
+void __setsockopt(struct socket *sock, int level, int optname, char *optval, int optsize)
+{
+       int status;
+
+       status = kernel_setsockopt(sock, level, optname, optval, optsize);
+       if (status >= 0)
+               return;
+
+       XIO_WRN("cannot set %d socket option %d to value %d, status = %d\n",
+               level, optname, *(int *)optval, status);
+}
+
+/* Convenience wrapper taking an lvalue instead of pointer plus size. */
+#define _setsockopt(sock, level, optname, val)                         \
+       __setsockopt(sock, level, optname, (char *)&(val), sizeof(val))
+
+/**
+ * xio_create_sockaddr() - translate a textual address into an IPv4 sockaddr
+ * @addr: result buffer; zeroed and then filled as struct sockaddr_in
+ * @spec: "a.b.c.d", "a.b.c.d:port" or ":port"; the host part is first
+ *        passed through xio_translate_hostname() when that hook is set
+ *
+ * The IP address is parsed from the translated string, the port number
+ * from the original @spec. A missing port defaults to
+ * xio_net_default_port, a missing IP part leaves the address at 0.
+ *
+ * Returns 0 on success, -EINVAL on malformed IP or port (including
+ * ports outside 0..65535), -ENOMEM when no translated string is
+ * available.
+ */
+int xio_create_sockaddr(struct sockaddr_storage *addr, const char *spec)
+{
+       struct sockaddr_in *sockaddr = (void *)addr;
+       const char *new_spec;
+       const char *tmp_spec;
+       int status = 0;
+
+       memset(addr, 0, sizeof(*addr));
+       sockaddr->sin_family = AF_INET;
+       sockaddr->sin_port = htons(xio_net_default_port);
+
+       /* Try to translate hostnames to IPs if possible.
+        */
+       if (xio_translate_hostname)
+               new_spec = xio_translate_hostname(spec);
+       else
+               new_spec = brick_strdup(spec);
+       if (unlikely(!new_spec))
+               return -ENOMEM;
+       tmp_spec = new_spec;
+
+       /* This is PROVISIONARY!
+        * TODO: add IPV6 syntax and many more features :)
+        */
+       if (!*tmp_spec)
+               goto done;
+       if (*tmp_spec != ':') {
+               unsigned char u0 = 0, u1 = 0, u2 = 0, u3 = 0;
+
+               status = sscanf(tmp_spec, "%hhu.%hhu.%hhu.%hhu", &u0, &u1, &u2, &u3);
+               if (status != 4) {
+                       XIO_ERR("invalid sockaddr IP syntax '%s', status = %d\n", tmp_spec, status);
+                       status = -EINVAL;
+                       goto done;
+               }
+               XIO_DBG("decoded IP = %u.%u.%u.%u\n", u0, u1, u2, u3);
+               /* Build the address in host order and convert explicitly,
+                * so that the result is also correct on big endian hosts.
+                */
+               sockaddr->sin_addr.s_addr =
+                       htonl((u32)u0 << 24 | (u32)u1 << 16 | (u32)u2 << 8 | (u32)u3);
+       }
+       /*  decode port number (when present) */
+       tmp_spec = strchr(spec, ':');
+       if (tmp_spec && tmp_spec[1]) {
+               int port = 0;
+
+               tmp_spec++;
+               status = kstrtoint(tmp_spec, 10, &port);
+               /* htons() would silently truncate out-of-range values */
+               if (!status && (port < 0 || port > 65535))
+                       status = -ERANGE;
+               if (unlikely(status)) {
+                       XIO_ERR("invalid sockaddr PORT syntax '%s', status = %d\n", tmp_spec, status);
+                       status = -EINVAL;
+                       goto done;
+               }
+               XIO_DBG("decoded PORT = %d\n", port);
+               sockaddr->sin_port = htons(port);
+       }
+       status = 0;
+done:
+       brick_string_free(new_spec);
+       return status;
+}
+
+static int current_debug_nr; /*  no locking, just for debugging */
+
+/* Apply the settings from default_tcp_params to a freshly created or
+ * accepted socket and switch it (back) to blocking mode.
+ * Individual option failures are logged by __setsockopt() and ignored.
+ */
+static
+void _set_socketopts(struct socket *sock)
+{
+       struct timeval t = {
+               .tv_sec = default_tcp_params.tcp_timeout,
+       };
+       int x_true = 1;
+
+       /* TODO: improve this by a table-driven approach
+        */
+       sock->sk->sk_rcvtimeo = sock->sk->sk_sndtimeo = default_tcp_params.tcp_timeout * HZ;
+       sock->sk->sk_reuse = 1;
+       _setsockopt(sock, SOL_SOCKET, SO_SNDBUFFORCE, default_tcp_params.tcp_window_size);
+       _setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, default_tcp_params.tcp_window_size);
+       /* The type-of-service value belongs to IP_TOS. SO_PRIORITY is a
+        * SOL_SOCKET option; at level SOL_IP its number means IP_RECVTOS.
+        */
+       _setsockopt(sock, SOL_IP, IP_TOS, default_tcp_params.ip_tos);
+       _setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, default_tcp_params.tcp_nodelay);
+       _setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, x_true);
+       _setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, default_tcp_params.tcp_keepcnt);
+       _setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, default_tcp_params.tcp_keepintvl);
+       _setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, default_tcp_params.tcp_keepidle);
+       _setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, t);
+       _setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, t);
+
+       if (sock->file) { /*  switch back to blocking mode */
+               sock->file->f_flags &= ~O_NONBLOCK;
+       }
+}
+
+static int _xio_send_raw(struct xio_socket *msock, const void *buf, int len, 
int flags);
+static int _xio_recv_raw(struct xio_socket *msock, void *buf, int minlen, int 
maxlen, int flags);
+
+/* Receive the protocol version (1 byte) and the capability flags
+ * (2 bytes) announced by the peer.
+ * s_send_proto is lowered to the minimum of both versions and the
+ * flags are stored in s_recv_flags. Receive errors are only logged
+ * at debug level; there is no return value.
+ */
+static
+void xio_proto_check(struct xio_socket *msock)
+{
+       u8 peer_version = 0;
+       u16 peer_flags = 0;
+       int status;
+
+       status = _xio_recv_raw(msock, &peer_version, 1, 1, 0);
+       if (unlikely(status < 0)) {
+               XIO_DBG("#%d protocol exchange failed at receiving, status = %d\n",
+                       msock->s_debug_nr, status);
+               return;
+       }
+
+       /*  take the minimum of both protocol versions */
+       if (peer_version < msock->s_send_proto)
+               msock->s_send_proto = peer_version;
+
+       status = _xio_recv_raw(msock, &peer_flags, 2, 2, 0);
+       if (unlikely(status < 0)) {
+               XIO_DBG("#%d protocol exchange failed at receiving, status = %d\n",
+                       msock->s_debug_nr, status);
+               return;
+       }
+
+       msock->s_recv_flags = peer_flags;
+}
+
+/* Announce our own protocol version (1 byte) and capability flags
+ * (2 bytes) to the peer.
+ * @msg only names the calling context in debug messages.
+ * Returns the status of the last xio_send_raw() call: negative on
+ * error, non-negative otherwise.
+ */
+static
+int xio_proto_exchange(struct xio_socket *msock, const char *msg)
+{
+       int status;
+
+       msock->s_send_proto = SEND_PROTO_VERSION;
+       status = xio_send_raw(msock, &msock->s_send_proto, 1, false);
+       if (unlikely(status < 0)) {
+               XIO_DBG("#%d protocol exchange on %s failed at sending, status = %d\n",
+                       msock->s_debug_nr, msg, status);
+               return status;
+       }
+
+       msock->s_send_flags = net_global_flags;
+       status = xio_send_raw(msock, &msock->s_send_flags, 2, false);
+       if (unlikely(status < 0))
+               XIO_DBG("#%d flags exchange on %s failed at sending, status = %d\n",
+                       msock->s_debug_nr, msg, status);
+
+       return status;
+}
+
+/**
+ * xio_create_socket() - open a listening or a connected TCP socket
+ * @msock:    descriptor to fill; must neither be referenced nor open
+ * @src_addr: local address; mandatory for servers, optional for clients
+ * @dst_addr: peer address; NULL selects server mode (bind + listen)
+ *
+ * In client mode the socket is connected (a connect still in progress
+ * counts as success) and xio_proto_exchange() is started.
+ *
+ * Returns a non-negative value on success, -EEXIST when @msock is
+ * already in use, or another negative error code. On any failure
+ * @msock is left unreferenced, so the call may simply be repeated.
+ */
+int xio_create_socket(struct xio_socket *msock, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr)
+{
+       struct socket *sock;
+       struct sockaddr *src_sockaddr = (void *)src_addr;
+       struct sockaddr *dst_sockaddr = (void *)dst_addr;
+       int status = -EEXIST;
+
+       if (unlikely(atomic_read(&msock->s_count))) {
+               XIO_ERR("#%d socket already in use\n", msock->s_debug_nr);
+               goto final;
+       }
+       if (unlikely(msock->s_socket)) {
+               XIO_ERR("#%d socket already open\n", msock->s_debug_nr);
+               goto final;
+       }
+       atomic_set(&msock->s_count, 1);
+
+#ifdef __HAS_STRUCT_NET
+       status = sock_create_kern(&init_net, AF_INET, SOCK_STREAM, IPPROTO_TCP, &msock->s_socket);
+#else
+       status = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &msock->s_socket);
+#endif
+       if (unlikely(status < 0 || !msock->s_socket)) {
+               msock->s_socket = NULL;
+               /* Drop the reference taken above. Otherwise each retry on
+                * this msock would fail with "socket already in use".
+                */
+               atomic_set(&msock->s_count, 0);
+               XIO_WRN("cannot create socket, status = %d\n", status);
+               /* never report success without a socket */
+               if (status >= 0)
+                       status = -ENOTSOCK;
+               goto final;
+       }
+       msock->s_debug_nr = ++current_debug_nr;
+       sock = msock->s_socket;
+       CHECK_PTR(sock, done);
+       msock->s_alive = true;
+
+       _set_socketopts(sock);
+
+       if (!dst_sockaddr) { /* we are server */
+               struct sockaddr_in bind_addr;
+
+               if (unlikely(!src_sockaddr)) {
+                       XIO_ERR("no srcaddr given for bind()\n");
+                       status = -EINVAL;
+                       goto done;
+               }
+
+               memcpy(&bind_addr, src_sockaddr, sizeof(bind_addr));
+               if (!xio_net_bind_before_listen)
+                       memset(&bind_addr.sin_addr, 0, sizeof(bind_addr.sin_addr));
+
+               status = kernel_bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr));
+               if (unlikely(status < 0)) {
+                       XIO_WRN("#%d bind failed, status = %d\n", msock->s_debug_nr, status);
+                       goto done;
+               }
+               status = kernel_listen(sock, 16);
+               if (status < 0)
+                       XIO_WRN("#%d listen failed, status = %d\n", msock->s_debug_nr, status);
+       } else {
+               /* When both src and dst are given, explicitly bind local address.
+                * Needed for multihomed hosts.
+                */
+               if (src_sockaddr && xio_net_bind_before_connect) {
+                       struct sockaddr_in bind_addr;
+
+                       memcpy(&bind_addr, src_sockaddr, sizeof(bind_addr));
+                       bind_addr.sin_port = 0; /* let the kernel pick the local port */
+
+                       status = kernel_bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr));
+                       if (unlikely(status < 0)) {
+                               XIO_WRN("#%d bind before connect failed (ignored), status = %d\n",
+                                        msock->s_debug_nr, status);
+                       }
+               }
+
+               status = kernel_connect(sock, dst_sockaddr, sizeof(*dst_sockaddr), 0);
+               /* Treat non-blocking connects as successful.
+                * Any potential errors will show up later during traffic.
+                */
+               if (status == -EINPROGRESS) {
+                       XIO_DBG("#%d connect in progress\n", msock->s_debug_nr);
+                       status = 0;
+               }
+               if (unlikely(status < 0)) {
+                       XIO_DBG("#%d connect failed, status = %d\n", msock->s_debug_nr, status);
+                       goto done;
+               }
+               status = xio_proto_exchange(msock, "connect");
+       }
+
+done:
+       if (status < 0)
+               xio_put_socket(msock);
+       else
+               XIO_DBG("successfully created socket #%d\n", msock->s_debug_nr);
+final:
+       return status;
+}
+
+/**
+ * xio_accept_socket() - accept one connection from a listening socket
+ * @new_msock: descriptor for the accepted connection; overwritten
+ * @old_msock: listening socket
+ *
+ * The accepted socket gets the default socket options and the protocol
+ * exchange is started on it.
+ *
+ * Returns -ENOENT when @old_msock is unusable, -EBADF when the kernel
+ * delivered no socket, the kernel_accept() error, or else the status
+ * of xio_proto_exchange().
+ */
+int xio_accept_socket(struct xio_socket *new_msock, struct xio_socket *old_msock)
+{
+       struct socket *listener;
+       struct socket *accepted = NULL;
+       int status = -ENOENT;
+
+       if (unlikely(!xio_get_socket(old_msock)))
+               return status;
+
+       listener = old_msock->s_socket;
+       if (unlikely(!listener))
+               goto put;
+
+       status = kernel_accept(listener, &accepted, O_NONBLOCK);
+       if (unlikely(status < 0))
+               goto put;
+       if (unlikely(!accepted)) {
+               status = -EBADF;
+               goto put;
+       }
+
+       _set_socketopts(accepted);
+
+       memset(new_msock, 0, sizeof(struct xio_socket));
+       new_msock->s_socket = accepted;
+       atomic_set(&new_msock->s_count, 1);
+       new_msock->s_alive = true;
+       new_msock->s_debug_nr = ++current_debug_nr;
+       XIO_DBG("#%d successfully accepted socket #%d\n", old_msock->s_debug_nr, new_msock->s_debug_nr);
+
+       status = xio_proto_exchange(new_msock, "accept");
+
+put:
+       xio_put_socket(old_msock);
+       return status;
+}
+
+/* Take a reference on @msock.
+ * Returns true when the reference is held and the socket is open and
+ * alive. Returns false otherwise; in that case no reference is kept.
+ */
+bool xio_get_socket(struct xio_socket *msock)
+{
+       if (unlikely(atomic_read(&msock->s_count) <= 0)) {
+               XIO_ERR("#%d bad nesting on msock = %p\n", msock->s_debug_nr, msock);
+               return false;
+       }
+
+       atomic_inc(&msock->s_count);
+
+       if (likely(msock->s_socket && msock->s_alive))
+               return true;
+
+       /* not usable: hand the reference back again */
+       xio_put_socket(msock);
+       return false;
+}
+
+/* Drop one reference on @msock.
+ * When the last reference goes away, the socket is shut down (unless
+ * this already happened) and released, the descriptor caches and the
+ * send buffer are freed, and the whole descriptor is zeroed.
+ */
+void xio_put_socket(struct xio_socket *msock)
+{
+       struct socket *sock;
+       int idx;
+
+       if (unlikely(atomic_read(&msock->s_count) <= 0)) {
+               XIO_ERR("#%d bad nesting on msock = %p sock = %p\n", msock->s_debug_nr, msock, msock->s_socket);
+               return;
+       }
+       if (!atomic_dec_and_test(&msock->s_count))
+               return;
+
+       sock = msock->s_socket;
+       XIO_DBG("#%d closing socket %p\n", msock->s_debug_nr, sock);
+       if (likely(sock)) {
+               /* only the one who flips s_alive does the shutdown */
+               if (cmpxchg(&msock->s_alive, true, false))
+                       kernel_sock_shutdown(sock, SHUT_RDWR);
+               if (!msock->s_alive) {
+                       XIO_DBG("#%d releasing socket %p\n", msock->s_debug_nr, sock);
+                       sock_release(sock);
+               }
+       }
+
+       for (idx = 0; idx < MAX_DESC_CACHE; idx++) {
+               if (msock->s_desc_send[idx])
+                       brick_block_free(msock->s_desc_send[idx], PAGE_SIZE);
+               if (msock->s_desc_recv[idx])
+                       brick_block_free(msock->s_desc_recv[idx], PAGE_SIZE);
+       }
+       brick_block_free(msock->s_buffer, PAGE_SIZE);
+       memset(msock, 0, sizeof(struct xio_socket));
+}
+
+/* Shut down both directions of @msock, at most once.
+ * The descriptor stays allocated; it is released by the final
+ * xio_put_socket().
+ */
+void xio_shutdown_socket(struct xio_socket *msock)
+{
+       struct socket *sock;
+
+       if (!msock->s_socket)
+               return;
+       if (unlikely(!xio_get_socket(msock)))
+               return;
+
+       sock = msock->s_socket;
+       if (likely(sock && cmpxchg(&msock->s_alive, true, false))) {
+               XIO_DBG("#%d shutdown socket %p\n", msock->s_debug_nr, sock);
+               kernel_sock_shutdown(sock, SHUT_RDWR);
+       }
+       xio_put_socket(msock);
+}
+
+/* Tell whether @msock is open, marked alive and properly referenced.
+ * No reference is taken; the answer is only a snapshot.
+ */
+bool xio_socket_is_alive(struct xio_socket *msock)
+{
+       if (!msock->s_socket || !msock->s_alive)
+               return false;
+
+       if (unlikely(atomic_read(&msock->s_count) <= 0)) {
+               XIO_ERR("#%d bad nesting on msock = %p sock = %p\n", msock->s_debug_nr, msock, msock->s_socket);
+               return false;
+       }
+       return true;
+}
+
+/* Compute sk_sndbuf - sk_wmem_queued of the underlying socket,
+ * clamped at 0, plus the current fill level s_pos of the send buffer.
+ * Returns 0 when the socket is dead, closed or badly referenced.
+ */
+long xio_socket_send_space_available(struct xio_socket *msock)
+{
+       struct socket *raw_sock = msock->s_socket;
+       long space;
+
+       if (!msock->s_alive || !raw_sock || !raw_sock->sk)
+               return 0;
+
+       if (unlikely(atomic_read(&msock->s_count) <= 0)) {
+               XIO_ERR("#%d bad nesting on msock = %p sock = %p\n", msock->s_debug_nr, msock, msock->s_socket);
+               return 0;
+       }
+
+       space = raw_sock->sk->sk_sndbuf - raw_sock->sk->sk_wmem_queued;
+       if (space < 0)
+               space = 0;
+       space += msock->s_pos;
+
+       return space;
+}
+
+/* Send exactly @len bytes from @buf over the socket, unbuffered.
+ * @flags is currently not evaluated: every message is sent with
+ * MSG_NOSIGNAL only.
+ *
+ * -EAGAIN is retried with a linearly increasing sleep, up to
+ * s_send_abort consecutive attempts when that limit is positive
+ * (then -EINTR is returned). -EINTR from the kernel is retried after
+ * flushing signals. A result of 0 from the kernel is mapped to -ECOMM.
+ *
+ * The caller must hold a reference on @msock.
+ * Returns the number of bytes sent, or a negative error code.
+ */
+static
+int _xio_send_raw(struct xio_socket *msock, const void *buf, int len, int flags)
+{
+       const int min_sleep = 1000 / HZ;
+       int backoff = min_sleep;
+       int total = 0;
+       int status = 0;
+
+       msock->s_send_cnt = 0;
+       while (len > 0) {
+               struct socket *sock = msock->s_socket;
+               struct kvec iov = {
+                       .iov_base = (void *)buf,
+                       .iov_len = len,
+               };
+               struct msghdr msg = {
+#ifndef __HAS_IOV_ITER
+                       .msg_iov = (struct iovec *)&iov,
+#endif
+                       .msg_flags = 0 | MSG_NOSIGNAL,
+               };
+
+               if (unlikely(!sock || !xio_net_is_alive || brick_thread_should_stop())) {
+                       XIO_WRN("interrupting, sent = %d\n", total);
+                       status = -EIDRM;
+                       break;
+               }
+
+               status = kernel_sendmsg(sock, &msg, &iov, 1, len);
+
+               if (status == -EAGAIN) {
+                       if (msock->s_send_abort > 0 && ++msock->s_send_cnt > msock->s_send_abort) {
+                               XIO_WRN("#%d reached send abort %d\n", msock->s_debug_nr, msock->s_send_abort);
+                               status = -EINTR;
+                               break;
+                       }
+                       brick_msleep(backoff);
+                       /*  linearly increasing backoff */
+                       if (backoff < 100)
+                               backoff += min_sleep;
+                       continue;
+               }
+               msock->s_send_cnt = 0;
+
+               if (unlikely(status == -EINTR)) { /*  ignore it */
+                       flush_signals(current);
+                       brick_msleep(50);
+                       continue;
+               }
+               if (unlikely(!status)) {
+                       XIO_WRN("#%d EOF from socket upon send_page()\n", msock->s_debug_nr);
+                       brick_msleep(50);
+                       status = -ECOMM;
+                       break;
+               }
+               if (unlikely(status < 0)) {
+                       XIO_WRN("#%d bad socket sendmsg, len=%d, this_len=%d, sent=%d, status = %d\n",
+                               msock->s_debug_nr, len, len, total, status);
+                       break;
+               }
+
+               /* partial progress: advance and start over with minimal backoff */
+               len -= status;
+               buf += status;
+               total += status;
+               backoff = min_sleep;
+       }
+
+       msock->s_send_bytes += total;
+
+       return status < 0 ? status : total;
+}
+
+/**
+ * xio_send_raw() - send @len bytes, optionally collecting small pieces
+ * @msock: socket to write to
+ * @buf:   data to send
+ * @len:   number of bytes
+ * @cork:  when true, data fitting into the page-sized send buffer is
+ *         only queued there and sent by a later call
+ *
+ * With USE_BUFFERING, pieces smaller than the remaining buffer space
+ * are copied into s_buffer. The buffer is flushed when @cork is false
+ * or when the new data does not fit. Data of at least PAGE_SIZE bytes
+ * is sent directly after flushing the buffer.
+ *
+ * Returns a non-negative byte count on success or a negative error
+ * code; on error the socket is shut down when s_shutdown_on_err is set.
+ */
+int xio_send_raw(struct xio_socket *msock, const void *buf, int len, bool cork)
+{
+#ifdef USE_BUFFERING
+       int sent = 0;
+       int rest = len;
+
+#endif
+       int status = -EINVAL;
+
+       if (!xio_get_socket(msock))
+               return status;
+
+#ifdef USE_BUFFERING
+       for (;;) {
+               if (!msock->s_buffer) {
+                       msock->s_pos = 0;
+                       msock->s_buffer = brick_block_alloc(0, PAGE_SIZE);
+               }
+
+               /* small piece: append to the buffer */
+               if (msock->s_pos + rest < PAGE_SIZE) {
+                       memcpy(msock->s_buffer + msock->s_pos, buf, rest);
+                       msock->s_pos += rest;
+                       sent += rest;
+                       rest = 0;
+                       status = sent;
+                       if (cork)
+                               break;
+               }
+
+               /* flush whatever has been collected so far */
+               if (msock->s_pos > 0) {
+                       status = _xio_send_raw(msock, msock->s_buffer, msock->s_pos, 0);
+                       if (status < 0)
+                               break;
+
+                       brick_block_free(msock->s_buffer, PAGE_SIZE);
+                       msock->s_buffer = NULL;
+                       msock->s_pos = 0;
+               }
+
+               /* big piece: bypass the buffer */
+               if (rest >= PAGE_SIZE) {
+                       status = _xio_send_raw(msock, buf, rest, 0);
+                       break;
+               }
+               if (rest <= 0) {
+                       status = sent;
+                       break;
+               }
+               /* the rest fits into the now empty buffer: go round again */
+       }
+#else
+       status = _xio_send_raw(msock, buf, len, 0);
+#endif
+       if (status < 0 && msock->s_shutdown_on_err)
+               xio_shutdown_socket(msock);
+
+       xio_put_socket(msock);
+
+       return status;
+}
+
+/**
+ * _xio_recv_raw() - Get [min, max] number of bytes
+ * @msock:     socket to read from
+ * @buf:       buffer to put the data in
+ * @minlen:    minimum number of bytes to read
+ * @maxlen:    maximum number of bytes to read
+ * @flags:     additional message flags, combined with MSG_NOSIGNAL
+ *
+ * Returns a negative error code or a number between [@minlen, @maxlen].
+ * Short reads are mapped to an error.
+ *
+ * Hint: by setting @minlen to 1, you can read any number up to @maxlen.
+ * However, the most important use case is @minlen == @maxlen.
+ *
+ * Note: buf may be NULL. In this case, the data is simply consumed,
+ * like /dev/null
+ *
+ * Note: when @minlen < @maxlen, the socket file is switched to
+ * O_NONBLOCK and not switched back by this function.
+ */
+static
+int _xio_recv_raw(struct xio_socket *msock, void *buf, int minlen, int maxlen, int flags)
+{
+       const int min_sleep = 1000 / HZ;
+       void *scratch = NULL;
+       int backoff = min_sleep;
+       int status = -EIDRM;
+       int got = 0;
+
+       if (!buf) {
+               scratch = brick_block_alloc(0, maxlen);
+               buf = scratch;
+       }
+
+       if (!xio_get_socket(msock))
+               goto final;
+
+       if (minlen < maxlen) {
+               struct socket *sock = msock->s_socket;
+
+               /* Use nonblocking reads to consume as much data
+                * as possible
+                */
+               if (sock && sock->file)
+                       sock->file->f_flags |= O_NONBLOCK;
+       }
+
+       msock->s_recv_cnt = 0;
+       while (got < minlen || (!minlen && !got)) {
+               struct kvec iov = {
+                       .iov_base = buf + got,
+                       .iov_len = maxlen - got,
+               };
+               struct msghdr msg = {
+#ifndef __HAS_IOV_ITER
+                       .msg_iovlen = 1,
+                       .msg_iov = (struct iovec *)&iov,
+#endif
+                       .msg_flags = flags | MSG_NOSIGNAL,
+               };
+               struct socket *sock = msock->s_socket;
+
+               if (unlikely(!sock)) {
+                       XIO_WRN("#%d socket has disappeared\n", msock->s_debug_nr);
+                       status = -EIDRM;
+                       goto err;
+               }
+
+               if (!xio_net_is_alive || brick_thread_should_stop()) {
+                       XIO_WRN("#%d interrupting, done = %d\n", msock->s_debug_nr, got);
+                       status = -EIDRM;
+                       goto err;
+               }
+
+               status = kernel_recvmsg(sock, &msg, &iov, 1, maxlen - got, msg.msg_flags);
+
+               /* check again: the receive may have taken a long time */
+               if (!xio_net_is_alive || brick_thread_should_stop()) {
+                       XIO_WRN("#%d interrupting, done = %d\n", msock->s_debug_nr, got);
+                       status = -EIDRM;
+                       goto err;
+               }
+
+               if (status == -EAGAIN) {
+                       if (msock->s_recv_abort > 0 && ++msock->s_recv_cnt > msock->s_recv_abort) {
+                               XIO_WRN("#%d reached recv abort %d\n", msock->s_debug_nr, msock->s_recv_abort);
+                               status = -EINTR;
+                               goto err;
+                       }
+                       brick_msleep(backoff);
+                       /* nothing is required: report what we have */
+                       if (minlen <= 0)
+                               break;
+                       /*  linearly increasing backoff */
+                       if (backoff < 100)
+                               backoff += min_sleep;
+                       continue;
+               }
+               msock->s_recv_cnt = 0;
+
+               if (!status) { /*  EOF */
+                       XIO_WRN("#%d got EOF from socket (done=%d, req_size=%d)\n",
+                               msock->s_debug_nr, got, maxlen - got);
+                       status = -EPIPE;
+                       goto err;
+               }
+               if (status < 0) {
+                       XIO_WRN("#%d bad recvmsg, status = %d\n", msock->s_debug_nr, status);
+                       goto err;
+               }
+
+               got += status;
+               backoff = min_sleep;
+       }
+       status = got;
+       msock->s_recv_bytes += got;
+
+err:
+       if (status < 0 && msock->s_shutdown_on_err)
+               xio_shutdown_socket(msock);
+       xio_put_socket(msock);
+final:
+       if (scratch)
+               brick_block_free(scratch, maxlen);
+       return status;
+}
+
+/* Public receive entry point; the parameters and the return value are
+ * those of _xio_recv_raw() with no extra message flags.
+ */
+int xio_recv_raw(struct xio_socket *msock, void *buf, int minlen, int maxlen)
+{
+       bool nothing_received_yet = !msock->s_recv_bytes;
+
+       /* Check the very first received bytes for higher-level protocol
+        * information. This saves one ping-pong cycle at
+        * xio_proto_exchange() because the sender can immediately
+        * start sending bulk data without need to wait there.
+        * This is important for latency, thus we exceptionally break
+        * the layering hierarchy here. Also, we start sending at
+        * the lowest possible protocol version and may increase
+        * the protocol capabilities dynamically at runtime,
+        * at some later time. This bears some slight nondeterminism,
+        * but we take it into account for performance reasons.
+        */
+       if (unlikely(nothing_received_yet))
+               xio_proto_check(msock);
+
+       return _xio_recv_raw(msock, buf, minlen, maxlen, 0);
+}
+
+/**
+ * xio_send_compressed() - send a payload, compressed when possible
+ * @msock:    socket to write to
+ * @buf:      payload
+ * @len:      payload length in bytes
+ * @compress: requested method, one of enum COMPRESS_TYPES
+ * @cork:     passed on to xio_send_raw() for the payload itself
+ *
+ * With protocol version >= 2 the payload is preceded by a 16-bit
+ * compression code and, when that code is > 0, by the 32-bit
+ * compressed length. Compression is only used when it is compiled in,
+ * the peer announced support for it, and the result is smaller than
+ * the original.
+ *
+ * Returns the status of the last xio_send_raw() call.
+ */
+int xio_send_compressed(struct xio_socket *msock, const void *buf, s32 len, int compress, bool cork)
+{
+       void *compr_data = NULL;
+       s16 compr_code = 0;
+       int status;
+
+       /* Unknown compress codes are ignored: the data goes out uncompressed.
+        * Implement further methods here.
+        */
+       if (compress == COMPRESS_LZO) {
+#ifdef __HAVE_LZO
+               /*  tolerate mixes of different proto versions */
+               if (msock->s_send_proto >= 2 && (msock->s_recv_flags & COMPRESS_LZO)) {
+                       size_t compr_len = 0;
+                       int lzo_status;
+                       void *wrkmem;
+
+                       compr_data = brick_mem_alloc(lzo1x_worst_compress(len));
+                       wrkmem = brick_mem_alloc(LZO1X_1_MEM_COMPRESS);
+
+                       lzo_status = lzo1x_1_compress(buf, len, compr_data, &compr_len, wrkmem);
+
+                       brick_mem_free(wrkmem);
+                       /* only use the result when it really saves space */
+                       if (likely(lzo_status == LZO_E_OK && compr_len < len)) {
+                               compr_code = COMPRESS_LZO;
+                               buf = compr_data;
+                               len = compr_len;
+                       }
+               }
+#endif
+       }
+
+       /*  allow mixing of different proto versions */
+       if (likely(msock->s_send_proto >= 2)) {
+               status = xio_send_raw(msock, &compr_code, sizeof(compr_code), true);
+               if (likely(status >= 0) && compr_code > 0)
+                       status = xio_send_raw(msock, &len, sizeof(len), true);
+               if (unlikely(status < 0))
+                       goto done;
+       }
+
+       status = xio_send_raw(msock, buf, len, cork);
+
+done:
+       brick_mem_free(compr_data);
+       return status;
+}
+
+/* Receive a payload that may have been sent LZO-compressed.
+ *
+ * msock:  socket to receive from
+ * buf:    destination buffer, at least maxlen bytes
+ * minlen: minimum acceptable payload length
+ * maxlen: maximum acceptable payload length
+ *
+ * When s_send_proto >= 2 an s16 compression code is read first;
+ * otherwise COMPRESS_NONE is assumed.
+ * NOTE(review): the receive path tests s_send_proto, not a
+ * receive-side field - confirm this is intended.
+ *
+ * Returns the result of xio_recv_raw() for uncompressed data, the
+ * decompressed length for LZO data, or a negative error code:
+ * -EOVERFLOW for an implausible compressed length, -EBADE when
+ * decompression fails, -EBADMSG when the decompressed length is out
+ * of range or LZO support is not compiled in, -EBADRQC for an unknown
+ * compression code.
+ */
+int xio_recv_compressed(struct xio_socket *msock, void *buf, int minlen, int 
maxlen)
+{
+       void *compr_data = NULL;
+
+       s16 compr_code = COMPRESS_NONE;
+       int status;
+
+       /*  allow mixing of different proto versions */
+       if (msock->s_send_proto >= 2) {
+               status = xio_recv_raw(msock, &compr_code, sizeof(compr_code), 
sizeof(compr_code));
+               if (unlikely(status < 0))
+                       goto done;
+       }
+
+       switch (compr_code) {
+       case COMPRESS_NONE:
+               status = xio_recv_raw(msock, buf, minlen, maxlen);
+               break;
+
+       case COMPRESS_LZO:
+#ifdef __HAVE_LZO
+               {
+                       s32 compr_len = 0;
+                       size_t this_len;
+                       int lzo_status;
+
+                       status = xio_recv_raw(msock, &compr_len, 
sizeof(compr_len), sizeof(compr_len));
+                       if (unlikely(status < 0))
+                               goto done;
+                       /* the length comes from the peer: reject it
+                        * before it is used as an allocation size
+                        */
+                       if (unlikely(compr_len <= 0 || compr_len >= maxlen)) {
+                               XIO_ERR("bad comp_len = %d, real minlen = %d 
maxlen = %d\n",
+                                        compr_len, minlen, maxlen);
+                               status = -EOVERFLOW;
+                               goto done;
+                       }
+
+                       compr_data = brick_mem_alloc(compr_len);
+
+                       status = xio_recv_raw(msock, compr_data, compr_len, 
compr_len);
+                       if (unlikely(status < 0))
+                               goto done;
+
+                       /* this_len is in/out: capacity of buf on entry,
+                        * decompressed length on return
+                        */
+                       this_len = maxlen;
+                       lzo_status = lzo1x_decompress_safe(compr_data, 
compr_len, buf, &this_len);
+
+                       status = this_len;
+                       if (unlikely(lzo_status != LZO_E_OK)) {
+                               XIO_ERR("bad decompression, lzo_status = %d\n", 
lzo_status);
+                               status = -EBADE;
+                               goto done;
+                       }
+                       if (unlikely(this_len < minlen || this_len > maxlen)) {
+                               XIO_WRN("bad decompression length this_len = 
%ld, minlen = %d maxlen = %d\n",
+                                       (long)this_len,
+                                       minlen,
+                                       maxlen);
+                               status = -EBADMSG;
+                               goto done;
+                       }
+                       break;
+               }
+#else
+               /* the peer sent LZO data but this build cannot decode it */
+               XIO_WRN("cannot LZO decompress\n");
+               status = -EBADMSG;
+               break;
+#endif
+
+       /* implement further methods here */
+
+       default:
+               XIO_WRN("got unknown compr_code = %d\n", compr_code);
+               status = -EBADRQC;
+       }
+
+done:
+       /* compr_data is NULL unless the LZO path allocated it */
+       brick_mem_free(compr_data);
+       return status;
+}
+
+/*********************************************************************/
+
+/* Mid-level field data exchange
+ */
+
+static
+void dump_meta(const struct meta *meta)
+{
+       int count = 0;
+
+       for (; meta->field_name != NULL; meta++) {
+               XIO_ERR("%2d %4d %4d %4d %p '%s'\n",
+                        meta->field_type,
+                        meta->field_data_size,
+                        meta->field_transfer_size,
+                        meta->field_offset,
+                        meta->field_ref,
+                        meta->field_name);
+               count++;
+       }
+       XIO_ERR("-------- %d fields.\n", count);
+}
+
+/* Flatten a meta description table into descriptor cache items.
+ *
+ * mi:     first free item slot to fill
+ * meta:   table of field descriptions, terminated by an entry whose
+ *         field_name is NULL
+ * offset: byte offset of the described structure inside the
+ *         top-level structure
+ * prefix: name prefix; every item is named "<prefix>.<field_name>"
+ * maxlen: number of bytes still available starting at mi
+ *
+ * One item is written per table entry. For FIELD_SUB entries the
+ * referenced sub-table is appended recursively, using the name of the
+ * item just written as the new prefix and its offset as the new base.
+ *
+ * Returns the number of items written (including those of all
+ * sub-tables), or a negative value when the item space or the field
+ * name buffer is too small.
+ */
+static
+int _add_fields(struct xio_desc_item *mi, const struct meta *meta, int offset, const char *prefix, int maxlen)
+{
+       int count = 0;
+
+       for (; meta->field_name != NULL; meta++) {
+               const char *new_prefix;
+               int new_offset;
+               int len;
+               short this_size;
+
+               /* remember the slot's name buffer before mi is advanced */
+               new_prefix = mi->field_name;
+               new_offset = offset + meta->field_offset;
+
+               /* signed comparison: a negative maxlen must not be
+                * promoted to a huge unsigned value
+                */
+               if (unlikely(maxlen < (int)sizeof(struct xio_desc_item))) {
+                       XIO_ERR("desc cache item overflow\n");
+                       count = -1;
+                       goto done;
+               }
+
+               /* snprintf() reports the untruncated length, so an
+                * overlong name is really detected here; scnprintf()
+                * never returns a value >= the buffer size.
+                */
+               len = snprintf(mi->field_name, MAX_FIELD_LEN, "%s.%s", prefix, meta->field_name);
+               if (unlikely(len >= MAX_FIELD_LEN)) {
+                       XIO_ERR("field len overflow on '%s.%s'\n", prefix, meta->field_name);
+                       count = -1;
+                       goto done;
+               }
+               mi->field_type = meta->field_type;
+               this_size = meta->field_data_size;
+               mi->field_data_size = this_size;
+               mi->field_sender_size = this_size;
+               /* an explicit transfer size overrides the data size */
+               this_size = meta->field_transfer_size;
+               if (this_size > 0)
+                       mi->field_sender_size = this_size;
+               mi->field_sender_offset = new_offset;
+               /* filled in later on the receiving side */
+               mi->field_recver_offset = -1;
+
+               mi++;
+               maxlen -= sizeof(struct xio_desc_item);
+               count++;
+
+               if (meta->field_type == FIELD_SUB) {
+                       int sub_count;
+
+                       sub_count = _add_fields(mi, meta->field_ref, new_offset, new_prefix, maxlen);
+                       if (sub_count < 0)
+                               return sub_count;
+
+                       mi += sub_count;
+                       count += sub_count;
+                       maxlen -= sub_count * sizeof(struct xio_desc_item);
+               }
+       }
+done:
+       return count;
+}
+
+/* Look up or create the sender-side descriptor cache for meta.
+ *
+ * msock:       socket whose s_desc_send[] slots are searched
+ * meta:        meta description table; its address serves as the
+ *              cache cookie
+ * cache_index: receives the slot index of the returned cache
+ *
+ * A new cache occupies one PAGE_SIZE block: the xio_desc_cache
+ * header directly followed by the items produced by _add_fields().
+ *
+ * Returns the cache, or NULL when no slot is free or the meta table
+ * could not be converted. *cache_index is written whenever a cache
+ * is returned, including the case where an existing one was found.
+ */
+static
+struct xio_desc_cache *make_sender_cache(struct xio_socket *msock, const struct meta *meta, int *cache_index)
+{
+       int orig_len = PAGE_SIZE;
+       int maxlen = orig_len;
+       struct xio_desc_cache *mc;
+       struct xio_desc_item *mi;
+       int i;
+       int status;
+
+       for (i = 0; i < MAX_DESC_CACHE; i++) {
+               mc = msock->s_desc_send[i];
+               if (!mc)
+                       break;
+               if (mc->cache_sender_cookie == (u64)meta) {
+                       /* report the slot on a cache hit as well */
+                       *cache_index = i;
+                       goto done;
+               }
+       }
+
+       /* note: the last slot is never handed out */
+       if (unlikely(i >= MAX_DESC_CACHE - 1)) {
+               XIO_ERR("#%d desc cache overflow\n", msock->s_debug_nr);
+               return NULL;
+       }
+
+       mc = brick_block_alloc(0, maxlen);
+
+       memset(mc, 0, maxlen);
+       mc->cache_sender_cookie = (u64)meta;
+       /*  further bits may be used in future */
+       mc->cache_sender_proto = msock->s_send_proto;
+       mc->cache_recver_proto = msock->s_recv_proto;
+
+       /* the items start right behind the cache header */
+       maxlen -= sizeof(struct xio_desc_cache);
+       mi = (void *)(mc + 1);
+
+       status = _add_fields(mi, meta, 0, "", maxlen);
+
+       if (likely(status > 0)) {
+               mc->cache_items = status;
+               mc->cache_is_bigendian = myself_is_bigendian;
+               msock->s_desc_send[i] = mc;
+               *cache_index = i;
+       } else {
+               brick_block_free(mc, orig_len);
+               mc = NULL;
+       }
+
+done:
+       return mc;
+}
+
+/* Match the local meta table against the items received from the
+ * sender and record the local size and offset in every matching item.
+ *
+ * mc:     descriptor cache whose items are searched and updated
+ * meta:   local meta table, terminated by a NULL field_name
+ * offset: byte offset of the described structure inside the
+ *         top-level structure
+ * prefix: name prefix; lookups use "<prefix>.<field_name>"
+ *
+ * An item matches when both the field type and the full name agree.
+ * FIELD_SUB entries are descended into recursively.
+ *
+ * Returns the number of table entries processed. 0 is returned when
+ * the very first entry has no matching item or when a sub-table
+ * yields <= 0; later missing entries only produce a warning and are
+ * still counted.
+ */
+static
+int _make_recver_cache(struct xio_desc_cache *mc, const struct meta *meta, int 
offset, const char *prefix)
+{
+       char *tmp = brick_string_alloc(MAX_FIELD_LEN);
+       int count = 0;
+       int i;
+
+       for (; meta->field_name != NULL; meta++, count++) {
+               snprintf(tmp, MAX_FIELD_LEN, "%s.%s", prefix, meta->field_name);
+               /* linear search over all items received from the sender */
+               for (i = 0; i < mc->cache_items; i++) {
+                       struct xio_desc_item *mi = ((struct xio_desc_item *)(mc 
+ 1)) + i;
+
+                       if (meta->field_type == mi->field_type &&
+                           !strcmp(tmp, mi->field_name)) {
+                               mi->field_recver_size = meta->field_data_size;
+                               mi->field_recver_offset = offset + 
meta->field_offset;
+                               if (meta->field_type == FIELD_SUB) {
+                                       /* the recursion allocates its own
+                                        * name buffer, so tmp may serve as
+                                        * its prefix
+                                        */
+                                       int sub_count = _make_recver_cache(mc,
+
+                                               meta->field_ref,
+                                               mi->field_recver_offset,
+                                               tmp);
+                                       if (unlikely(sub_count <= 0)) {
+                                               count = 0;
+                                               goto done;
+                                       }
+                               }
+                               goto found;
+                       }
+               }
+               /* no item matched: fatal only for the first entry */
+               if (unlikely(!count)) {
+                       XIO_ERR("field '%s' is missing\n", meta->field_name);
+                       goto done;
+               }
+               XIO_WRN("field %2d '%s' is missing\n", count, meta->field_name);
+found:;
+       }
+done:
+       brick_string_free(tmp);
+       return count;
+}
+
+/* Prepare a descriptor cache received from the sender for local use.
+ *
+ * The address of the local meta table is stored as the receiver
+ * cookie, then the received items are matched against the table.
+ * Every item that afterwards still has a negative receiver offset is
+ * reported with a warning.
+ *
+ * Returns the result of _make_recver_cache().
+ */
+static
+int make_recver_cache(struct xio_desc_cache *mc, const struct meta *meta)
+{
+       struct xio_desc_item *items = (struct xio_desc_item *)(mc + 1);
+       int matched;
+       int nr;
+
+       mc->cache_recver_cookie = (u64)meta;
+       matched = _make_recver_cache(mc, meta, 0, "");
+
+       for (nr = 0; nr < mc->cache_items; nr++) {
+               if (unlikely(items[nr].field_recver_offset < 0))
+                       XIO_WRN("field '%s' is not transferred\n", items[nr].field_name);
+       }
+       return matched;
+}
+
+/* Bail out of the enclosing function when the preceding call failed.
+ *
+ * Requires an int variable named "status" and a label named "err" in
+ * the scope where the macro is used. When status is negative, _txt_
+ * and the status are logged at debug level and control jumps to err.
+ */
+#define _CHECK_STATUS(_txt_)                                           \
+do {                                                                   \
+       if (unlikely(status < 0)) {                                     \
+               XIO_DBG("%s status = %d\n", _txt_, status);             \
+               goto err;                                               \
+       }                                                               \
+} while (0)
+
+/* Send a single field of a structure.
+ *
+ * msock: socket to send on
+ * data:  base address of the structure being sent
+ * mc:    descriptor cache; the item array follows it directly
+ * index: number of the item describing the field
+ * cork:  passed to xio_send_raw() for the last chunk of this field
+ *
+ * Integer fields may be widened (padded with sign bytes) or narrowed
+ * (after checking that only sign bytes are dropped) when the transfer
+ * size differs from the in-memory size. Strings are sent as an s16
+ * length (including the terminating NUL, 0 for a NULL pointer)
+ * followed by the bytes.
+ *
+ * Returns the in-memory length of the field that was sent, 0 for
+ * FIELD_SUB (nothing is sent), or -1 on any error. The status of a
+ * failed xio_send_raw() is logged via _CHECK_STATUS but not returned.
+ */
+static
+int _desc_send_item(struct xio_socket *msock,
+       const void *data,
+       const struct xio_desc_cache *mc,
+       int index,
+       bool cork)
+{
+       struct xio_desc_item *mi = ((struct xio_desc_item *)(mc + 1)) + index;
+       const void *item = data + mi->field_sender_offset;
+
+       s16 data_len = mi->field_data_size;
+       s16 transfer_len = mi->field_sender_size;
+       int status;
+       bool is_signed = false;
+       int res = -1;
+
+       switch (mi->field_type) {
+       case FIELD_REF:
+               XIO_ERR("field '%s' NYI type = %d\n", mi->field_name, 
mi->field_type);
+               goto err;
+       case FIELD_SUB:
+               /* skip this */
+               res = 0;
+               break;
+       case FIELD_INT:
+               is_signed = true;
+               /* fallthrough */
+       case FIELD_UINT:
+               if (unlikely(data_len <= 0 || data_len > MAX_INT_TRANSFER)) {
+                       XIO_ERR("field '%s' bad data_len = %d\n", 
mi->field_name, data_len);
+                       goto err;
+               }
+               if (unlikely(transfer_len > MAX_INT_TRANSFER)) {
+                       XIO_ERR("field '%s' bad transfer_len = %d\n", 
mi->field_name, transfer_len);
+                       goto err;
+               }
+
+               /* common case: no size conversion necessary */
+               if (likely(data_len == transfer_len))
+                       goto raw;
+
+               if (transfer_len > data_len) {
+                       /* widen: send "diff" extra fill bytes; diff is
+                        * bounded by the MAX_INT_TRANSFER checks above
+                        */
+                       int diff = transfer_len - data_len;
+                       char empty[diff];
+                       char sign;
+
+                       /* presumably get_sign() yields the fill byte for
+                        * sign extension - TODO confirm
+                        */
+                       sign = get_sign(item, data_len, myself_is_bigendian, 
is_signed);
+                       memset(empty, sign, diff);
+
+                       /* the fill bytes go in front on big-endian hosts
+                        * and behind the value otherwise
+                        */
+                       if (myself_is_bigendian) {
+                               status = xio_send_raw(msock, empty, diff, true);
+                               _CHECK_STATUS("send_diff");
+                               status = xio_send_raw(msock, item, data_len, 
cork);
+                               _CHECK_STATUS("send_item");
+
+                       } else {
+                               status = xio_send_raw(msock, item, data_len, 
true);
+                               _CHECK_STATUS("send_item");
+                               status = xio_send_raw(msock, empty, diff, cork);
+                               _CHECK_STATUS("send_diff");
+                       }
+
+                       res = data_len;
+                       break;
+               } else if (unlikely(transfer_len <= 0)) {
+                       XIO_ERR("bad transfer_len = %d\n", transfer_len);
+                       goto err;
+               } else { /*  transfer_len < data_len */
+                       char check = get_sign(item, data_len, 
myself_is_bigendian, is_signed);
+                       int start;
+                       int end;
+                       int i;
+
+                       /* the sign of the reduced value must equal the
+                        * sign of the full value
+                        */
+                       if (is_signed &&
+                           unlikely(get_sign(item, transfer_len, 
myself_is_bigendian, true) != check)) {
+                               XIO_ERR("cannot sign-reduce signed integer from 
%d to %d bytes, byte %d !~ %d\n",
+                                        data_len,
+                                        transfer_len,
+                                        ((char *)item)[transfer_len - 1],
+                                        check);
+                               goto err;
+                       }
+
+                       /* [start, end) are the bytes that get dropped */
+                       if (myself_is_bigendian) {
+                               start = 0;
+                               end = data_len - transfer_len;
+                       } else {
+                               start = transfer_len;
+                               end = data_len;
+                       }
+
+                       /* every dropped byte must be a pure fill byte */
+                       for (i = start; i < end; i++) {
+                               if (unlikely(((char *)item)[i] != check)) {
+                                       XIO_ERR("cannot sign-reduce %ssigned 
integer from %d to %d bytes at pos %d, byte %d != %d\n",
+                                                is_signed ? "" : "un",
+                                                data_len,
+                                                transfer_len,
+                                                i,
+                                                ((char *)item)[i],
+                                                check);
+                                       goto err;
+                               }
+                       }
+
+                       /*  just omit the higher/lower bytes */
+                       data_len = transfer_len;
+                       if (myself_is_bigendian)
+                               item += end;
+                       goto raw;
+               }
+       case FIELD_STRING:
+               /* the field holds a pointer to the string */
+               item = *(void **)item;
+               data_len = 0;
+               /* NOTE(review): strlen() + 1 is stored into an s16 and the
+                * length word is sent before the sign check below - confirm
+                * that callers bound the string length
+                */
+               if (item)
+                       data_len = strlen(item) + 1;
+
+               status = xio_send_raw(msock, &data_len, sizeof(data_len), true);
+               _CHECK_STATUS("send_string_len");
+               /* fallthrough */
+       case FIELD_RAW:
+raw:
+               if (unlikely(data_len < 0)) {
+                       XIO_ERR("field '%s' bad data_len = %d\n", 
mi->field_name, data_len);
+                       goto err;
+               }
+               status = xio_send_raw(msock, item, data_len, cork);
+               _CHECK_STATUS("send_raw");
+               res = data_len;
+               break;
+       default:
+               XIO_ERR("field '%s' unknown type = %d\n", mi->field_name, 
mi->field_type);
+       }
+err:
+       return res;
+}
+
+/* Receive a single field of a structure.
+ *
+ * msock: socket to receive from
+ * data:  base address of the destination structure, may be NULL
+ * mc:    descriptor cache; the item array follows it directly
+ * index: number of the item describing the field
+ * line:  caller's source line, passed on to _brick_string_alloc()
+ *
+ * The destination pointer stays NULL when data is NULL, when the
+ * local size is not positive or when the field has no local offset;
+ * the bytes are then still read from the socket with a NULL buffer.
+ *
+ * Integer fields are converted between the sender's transfer size and
+ * the local size, swapping bytes when the sender's byte order differs
+ * from the local one. For strings an s16 length is read first and a
+ * fresh buffer is allocated for the bytes.
+ *
+ * Returns the local length of the received field, 0 for FIELD_SUB,
+ * or -1 on any error. The status of a failed xio_recv_raw() is logged
+ * via _CHECK_STATUS but not returned.
+ */
+static
+int _desc_recv_item(struct xio_socket *msock, void *data, const struct 
xio_desc_cache *mc, int index, int line)
+{
+       struct xio_desc_item *mi = ((struct xio_desc_item *)(mc + 1)) + index;
+       void *item = NULL;
+
+       s16 data_len = mi->field_recver_size;
+       s16 transfer_len = mi->field_sender_size;
+       int status;
+       bool is_signed = false;
+       int res = -1;
+
+       if (likely(data && data_len > 0 && mi->field_recver_offset >= 0))
+               item = data + mi->field_recver_offset;
+
+       switch (mi->field_type) {
+       case FIELD_REF:
+               XIO_ERR("field '%s' NYI type = %d\n", mi->field_name, 
mi->field_type);
+               goto err;
+       case FIELD_SUB:
+               /* skip this */
+               res = 0;
+               break;
+       case FIELD_INT:
+               is_signed = true;
+               /* fallthrough */
+       case FIELD_UINT:
+               if (unlikely(data_len <= 0 || data_len > MAX_INT_TRANSFER)) {
+                       XIO_ERR("field '%s' bad data_len = %d\n", 
mi->field_name, data_len);
+                       goto err;
+               }
+               if (unlikely(transfer_len > MAX_INT_TRANSFER)) {
+                       XIO_ERR("field '%s' bad transfer_len = %d\n", 
mi->field_name, transfer_len);
+                       goto err;
+               }
+
+               /* common case: no size conversion necessary */
+               if (likely(data_len == transfer_len))
+                       goto raw;
+
+               if (transfer_len > data_len) {
+                       /* narrow: the "diff" excess bytes are received
+                        * into a scratch buffer and checked afterwards
+                        */
+                       int diff = transfer_len - data_len;
+                       char empty[diff];
+                       char check;
+
+                       memset(empty, 0, diff);
+
+                       /* NOTE(review): the position of the excess bytes
+                        * is chosen by the local byte order, while the
+                        * sender orders them by its own - confirm the
+                        * behaviour between hosts of different endianness
+                        */
+                       if (myself_is_bigendian) {
+                               status = xio_recv_raw(msock, empty, diff, diff);
+                               _CHECK_STATUS("recv_diff");
+                       }
+
+                       status = xio_recv_raw(msock, item, data_len, data_len);
+                       _CHECK_STATUS("recv_item");
+                       if (unlikely(mc->cache_is_bigendian != 
myself_is_bigendian && item))
+                               swap_bytes(item, data_len);
+
+                       if (!myself_is_bigendian) {
+                               status = xio_recv_raw(msock, empty, diff, diff);
+                               _CHECK_STATUS("recv_diff");
+                       }
+
+                       /*  check that sign extension did no harm */
+                       check = get_sign(empty, diff, mc->cache_is_bigendian, 
is_signed);
+                       while (--diff >= 0) {
+                               if (unlikely(empty[diff] != check)) {
+                                       XIO_ERR("field '%s' %sSIGNED INTEGER 
OVERFLOW on size reduction from %d to %d, byte %d != %d\n",
+                                                mi->field_name,
+                                                is_signed ? "" : "UN",
+                                                transfer_len,
+                                                data_len,
+                                                empty[diff],
+                                                check);
+                                       goto err;
+                               }
+                       }
+                       /* the sign of the stored value must agree with
+                        * the dropped fill bytes
+                        */
+                       if (is_signed && item &&
+                           unlikely(get_sign(item, data_len, 
myself_is_bigendian, true) != check)) {
+                               XIO_ERR("field '%s' SIGNED INTEGER OVERLOW on 
reduction from size %d to %d, byte %d !~ %d\n",
+                                        mi->field_name,
+                                        transfer_len,
+                                        data_len,
+                                        ((char *)item)[data_len - 1],
+                                        check);
+                               goto err;
+                       }
+
+                       res = data_len;
+                       break;
+               } else if (unlikely(transfer_len <= 0)) {
+                       XIO_ERR("field '%s' bad transfer_len = %d\n", 
mi->field_name, transfer_len);
+                       goto err;
+               } else if (unlikely(!item)) { /*  shortcut without checks */
+                       data_len = transfer_len;
+                       goto raw;
+               } else { /*  transfer_len < data_len */
+                       int diff = data_len - transfer_len;
+                       char *transfer_ptr = item;
+                       char sign;
+
+                       /* on big-endian hosts the received bytes are the
+                        * trailing part of the local value
+                        */
+                       if (myself_is_bigendian)
+                               transfer_ptr += diff;
+
+                       status = xio_recv_raw(msock, transfer_ptr, 
transfer_len, transfer_len);
+                       _CHECK_STATUS("recv_transfer");
+                       if (unlikely(mc->cache_is_bigendian != 
myself_is_bigendian))
+                               swap_bytes(transfer_ptr, transfer_len);
+
+                       /*  sign-extend from transfer_len to data_len */
+                       sign = get_sign(transfer_ptr, transfer_len, 
myself_is_bigendian, is_signed);
+                       if (myself_is_bigendian)
+                               memset(item, sign, diff);
+                       else
+                               memset(item + transfer_len, sign, diff);
+                       res = data_len;
+                       break;
+               }
+       case FIELD_STRING:
+               /* the string length arrives first, as an s16 */
+               data_len = 0;
+               status = xio_recv_raw(msock, &data_len, sizeof(data_len), 
sizeof(data_len));
+               _CHECK_STATUS("recv_string_len");
+
+               if (unlikely(mc->cache_is_bigendian != myself_is_bigendian))
+                       swap_bytes(&data_len, sizeof(data_len));
+
+               /* store the new buffer in the pointer field, then
+                * receive into that buffer
+                */
+               if (data_len > 0 && item) {
+                       char *str = _brick_string_alloc(data_len, line);
+
+                       *(void **)item = str;
+                       item = str;
+               }
+
+               /* keeps the size mismatch check below from triggering */
+               transfer_len = data_len;
+               /* fallthrough */
+       case FIELD_RAW:
+raw:
+               if (unlikely(data_len < 0)) {
+                       XIO_ERR("field = '%s' implausible data_len = %d\n", 
mi->field_name, data_len);
+                       goto err;
+               }
+               if (likely(data_len > 0)) {
+                       if (unlikely(transfer_len != data_len)) {
+                               XIO_ERR("cannot handle generic mismatch in 
transfer sizes, field = '%s', %d != %d\n",
+                                       mi->field_name,
+                                       transfer_len,
+                                       data_len);
+                               goto err;
+                       }
+                       status = xio_recv_raw(msock, item, data_len, data_len);
+                       _CHECK_STATUS("recv_raw");
+               }
+               res = data_len;
+               break;
+       default:
+               XIO_ERR("field '%s' unknown type = %d\n", mi->field_name, 
mi->field_type);
+       }
+err:
+       return res;
+}
+
+/* Send a descriptor header, optionally the meta description, and
+ * optionally all fields of a structure.
+ *
+ * msock:       socket to send on
+ * cache_index: slot of the descriptor cache in s_desc_send[]
+ * data:        structure to send; when NULL only the header (with an
+ *              index of -1) and possibly the meta block are sent
+ * h_meta_len:  number of bytes of the cache to send as meta
+ *              description; nothing is sent when <= 0
+ * cork:        cork flag for the last chunk
+ *
+ * Returns the number of items sent, or a negative status.
+ */
+static inline
+int _desc_send_struct(struct xio_socket *msock, int cache_index, const void *data, int h_meta_len, bool cork)
+{
+       const struct xio_desc_cache *mc = msock->s_desc_send[cache_index];
+       const bool has_data = data != NULL;
+
+       struct xio_desc_header header = {
+               .h_magic = XIO_DESC_MAGIC,
+               .h_cookie = mc->cache_sender_cookie,
+               .h_meta_len = h_meta_len,
+               .h_index = has_data ? cache_index : -1,
+       };
+       int sent = 0;
+       int status;
+       int nr;
+
+       /* more data follows the header whenever there is a payload */
+       status = xio_send_raw(msock, &header, sizeof(header), cork || has_data);
+       _CHECK_STATUS("send_header");
+
+       if (unlikely(h_meta_len > 0)) {
+               status = xio_send_raw(msock, mc, h_meta_len, true);
+               _CHECK_STATUS("send_meta");
+       }
+
+       if (unlikely(!has_data))
+               goto finish;
+
+       for (nr = 0; nr < mc->cache_items; nr++) {
+               /* every item except the last one is followed by more */
+               bool more_items = nr < mc->cache_items - 1;
+
+               status = _desc_send_item(msock, data, mc, nr, cork || more_items);
+               _CHECK_STATUS("send_cache_item");
+               sent++;
+       }
+
+finish:
+       if (status >= 0)
+               status = sent;
+err:
+       return status;
+}
+
+/* Send a structure described by meta over msock.
+ *
+ * The sender cache is looked up by the address of meta. When no cache
+ * exists yet, one is created and its contents are transmitted as meta
+ * description together with this structure.
+ *
+ * Returns the result of _desc_send_struct(), or -EINVAL when no
+ * descriptor cache could be set up.
+ */
+static
+int desc_send_struct(struct xio_socket *msock, const void *data, const struct meta *meta, bool cork)
+{
+       struct xio_desc_cache *mc = NULL;
+       int meta_bytes = 0;
+       int slot;
+
+       for (slot = 0; slot < MAX_DESC_CACHE; slot++) {
+               struct xio_desc_cache *candidate = msock->s_desc_send[slot];
+
+               if (!candidate)
+                       break;
+               if (candidate->cache_sender_cookie == (u64)meta) {
+                       mc = candidate;
+                       break;
+               }
+       }
+
+       if (!mc) {
+               /* first use of this meta: the description goes along */
+               mc = make_sender_cache(msock, meta, &slot);
+               if (unlikely(!mc))
+                       return -EINVAL;
+
+               meta_bytes = mc->cache_items * sizeof(struct xio_desc_item) + sizeof(struct xio_desc_cache);
+       }
+
+       return _desc_send_struct(msock, slot, data, meta_bytes, cork);
+}
+
+/* Receive a structure described by meta from msock.
+ *
+ * msock: socket to receive from
+ * data:  destination structure
+ * meta:  local meta description table
+ * line:  caller's source line, used for logging and allocation
+ *
+ * A descriptor header is read first. A header magic that only matches
+ * after byte swapping marks a sender of opposite byte order; the
+ * remaining header fields are then swapped as well. A negative header
+ * index means that no structure follows.
+ *
+ * When no receiver cache exists yet for the announced index, the meta
+ * description must follow the header. Its length and item count come
+ * from the peer and are validated against the PAGE_SIZE cache block
+ * before they are used.
+ *
+ * Returns the number of items received (0 when no structure
+ * followed), or a negative status.
+ */
+static
+int desc_recv_struct(struct xio_socket *msock, void *data, const struct meta *meta, int line)
+{
+       struct xio_desc_header header = {};
+       struct xio_desc_cache *mc;
+       int cache_index;
+       int index;
+       int count = 0;
+       int status = 0;
+       bool need_swap = false;
+
+       status = xio_recv_raw(msock, &header, sizeof(header), sizeof(header));
+       _CHECK_STATUS("recv_header");
+
+       if (unlikely(header.h_magic != XIO_DESC_MAGIC)) {
+               /* maybe the sender has the opposite byte order */
+               need_swap = true;
+               SWAP_FIELD(header.h_magic);
+               if (unlikely(header.h_magic != XIO_DESC_MAGIC)) {
+                       XIO_WRN("#%d called from line %d bad packet header magic = %llx\n",
+                               msock->s_debug_nr,
+                               line,
+                               header.h_magic);
+                       status = -ENOMSG;
+                       goto err;
+               }
+               SWAP_FIELD(header.h_cookie);
+               SWAP_FIELD(header.h_meta_len);
+               SWAP_FIELD(header.h_index);
+       }
+
+       cache_index = header.h_index;
+       if (cache_index < 0) { /*  EOR */
+               goto done;
+       }
+       if (unlikely(cache_index >= MAX_DESC_CACHE - 1)) {
+               XIO_WRN("#%d called from line %d bad cache index %d\n", msock->s_debug_nr, line, cache_index);
+               status = -EBADF;
+               goto err;
+       }
+
+       mc = msock->s_desc_recv[cache_index];
+       if (unlikely(!mc)) {
+               int meta_len = header.h_meta_len;
+               int max_items;
+               int items;
+
+               if (unlikely(meta_len <= 0)) {
+                       XIO_WRN("#%d called from line %d missing meta information\n", msock->s_debug_nr, line);
+                       status = -ENOMSG;
+                       goto err;
+               }
+               /* The length is controlled by the peer: it must cover
+                * the cache header and must fit into the block
+                * allocated below.
+                */
+               if (unlikely(meta_len < (int)sizeof(struct xio_desc_cache) ||
+                            meta_len > (int)PAGE_SIZE)) {
+                       XIO_WRN("#%d called from line %d bad meta length %d\n", msock->s_debug_nr, line, meta_len);
+                       status = -EMSGSIZE;
+                       goto err;
+               }
+
+               mc = _brick_block_alloc(0, PAGE_SIZE, line);
+
+               status = xio_recv_raw(msock, mc, meta_len, meta_len);
+               if (unlikely(status < 0))
+                       brick_block_free(mc, PAGE_SIZE);
+               _CHECK_STATUS("recv_meta");
+
+               if (unlikely(need_swap))
+                       swap_mc(mc, meta_len);
+
+               /* The item count is controlled by the peer as well: all
+                * announced items must lie within the received bytes.
+                */
+               max_items = (meta_len - (int)sizeof(struct xio_desc_cache)) / (int)sizeof(struct xio_desc_item);
+               items = mc->cache_items;
+               if (unlikely(items < 0 || items > max_items)) {
+                       XIO_WRN("#%d called from line %d bad item count %d (max %d)\n",
+                               msock->s_debug_nr,
+                               line,
+                               items,
+                               max_items);
+                       brick_block_free(mc, PAGE_SIZE);
+                       status = -EMSGSIZE;
+                       goto err;
+               }
+
+               status = make_recver_cache(mc, meta);
+               if (unlikely(status < 0)) {
+                       brick_block_free(mc, PAGE_SIZE);
+                       goto err;
+               }
+               msock->s_desc_recv[cache_index] = mc;
+       } else if (unlikely(header.h_meta_len > 0)) {
+               XIO_WRN("#%d called from line %d has %d unexpected meta bytes\n",
+                       msock->s_debug_nr,
+                       line,
+                       header.h_meta_len);
+               status = -EMSGSIZE;
+               goto err;
+       } else if (unlikely(mc->cache_recver_cookie != (u64)meta)) {
+               /* the cached slot was set up for a different meta table */
+               XIO_ERR("#%d protocol error %p != %p\n", msock->s_debug_nr, meta, (void *)mc->cache_recver_cookie);
+               dump_meta((void *)mc->cache_recver_cookie);
+               dump_meta(meta);
+               status = -EPROTO;
+               goto err;
+       }
+
+       for (index = 0; index < mc->cache_items; index++) {
+               status = _desc_recv_item(msock, data, mc, index, line);
+               _CHECK_STATUS("recv_cache_item");
+               count++;
+       }
+
+done:
+       if (status >= 0)
+               status = count;
+err:
+       return status;
+}
+
+/* Send a structure described by meta; the cork flag passed down is
+ * false. Returns the result of desc_send_struct().
+ */
+int xio_send_struct(struct xio_socket *msock, const void *data, const struct meta *meta)
+{
+       const bool cork = false;
+
+       return desc_send_struct(msock, data, meta, cork);
+}
+
+/* Receive a structure described by meta. line is the caller's source
+ * line and is passed through. Returns the result of
+ * desc_recv_struct().
+ */
+int _xio_recv_struct(struct xio_socket *msock, void *data, const struct meta *meta, int line)
+{
+       int status = desc_recv_struct(msock, data, meta, line);
+
+       return status;
+}
+
+/*********************************************************************/
+
+/* High-level transport of xio structures
+ */
+
+/* Meta description of struct xio_cmd: cmd_stamp is a sub-structure
+ * described by xio_timespec_meta, cmd_code and cmd_int1 are signed
+ * integers, cmd_str1 is a string. The empty entry terminates the
+ * table.
+ */
+const struct meta xio_cmd_meta[] = {
+       META_INI_SUB(cmd_stamp, struct xio_cmd, xio_timespec_meta),
+       META_INI(cmd_code, struct xio_cmd, FIELD_INT),
+       META_INI(cmd_int1, struct xio_cmd, FIELD_INT),
+       META_INI(cmd_str1, struct xio_cmd, FIELD_STRING),
+       {}
+};
+
+/* Send an aio request: a CMD_AIO command, the aio description and,
+ * when applicable, the payload.
+ *
+ * The payload is announced via CMD_FLAG_HAS_DATA in the command code
+ * and sent only when io_rw is non-zero, a data buffer is present and
+ * io_cs_mode is below 2. It is sent through xio_send_compressed()
+ * using xio_net_compress_data as the requested method.
+ *
+ * Returns the status of the last send operation; a negative status
+ * ends the sequence early.
+ */
+int xio_send_aio(struct xio_socket *msock, struct aio_object *aio)
+{
+       struct xio_cmd cmd = {
+               .cmd_code = CMD_AIO,
+               .cmd_int1 = aio->io_id,
+       };
+       int status;
+
+       if (aio->io_rw != 0 && aio->io_data && aio->io_cs_mode < 2)
+               cmd.cmd_code |= CMD_FLAG_HAS_DATA;
+
+       get_lamport(&cmd.cmd_stamp);
+
+       /* more parts follow, so the command is always sent corked */
+       status = desc_send_struct(msock, &cmd, xio_cmd_meta, true);
+       if (status < 0)
+               goto done;
+
+       /* the description stays corked only when a payload follows */
+       status = desc_send_struct(msock, aio, xio_aio_user_meta, cmd.cmd_code & CMD_FLAG_HAS_DATA);
+       if (status < 0)
+               goto done;
+
+       if (cmd.cmd_code & CMD_FLAG_HAS_DATA)
+               status = xio_send_compressed(msock, aio->io_data, aio->io_len, xio_net_compress_data, false);
+done:
+       return status;
+}
+
+/* Receive the aio description of a request and, when cmd announces
+ * it via CMD_FLAG_HAS_DATA, the payload.
+ *
+ * cmd is not received here; its stamp is handed to set_lamport() and
+ * its code is only inspected. When the aio has no data buffer yet,
+ * one of io_len bytes is allocated before the payload is received.
+ *
+ * Returns the status of the last receive operation.
+ */
+int xio_recv_aio(struct xio_socket *msock, struct aio_object *aio, struct xio_cmd *cmd)
+{
+       int status = desc_recv_struct(msock, aio, xio_aio_user_meta, __LINE__);
+
+       if (status < 0)
+               return status;
+
+       set_lamport(&cmd->cmd_stamp);
+
+       if (!(cmd->cmd_code & CMD_FLAG_HAS_DATA))
+               return status;
+
+       if (!aio->io_data)
+               aio->io_data = brick_block_alloc(0, aio->io_len);
+
+       status = xio_recv_compressed(msock, aio->io_data, aio->io_len, aio->io_len);
+       if (unlikely(status < 0))
+               XIO_WRN("#%d aio_len = %d, status = %d\n", msock->s_debug_nr, aio->io_len, status);
+
+       return status;
+}
+
+/* Send an aio callback: a CMD_CB command, the aio description and,
+ * when applicable, the payload.
+ *
+ * The payload is announced via CMD_FLAG_HAS_DATA in the command code
+ * and sent only when io_rw is zero, a data buffer is present and
+ * io_cs_mode is below 2. It is sent through xio_send_compressed()
+ * using xio_net_compress_data as the requested method.
+ *
+ * Returns the status of the last send operation; a negative status
+ * ends the sequence early.
+ */
+int xio_send_cb(struct xio_socket *msock, struct aio_object *aio)
+{
+       struct xio_cmd cmd = {
+               .cmd_code = CMD_CB,
+               .cmd_int1 = aio->io_id,
+       };
+       int status;
+
+       if (aio->io_rw == 0 && aio->io_data && aio->io_cs_mode < 2)
+               cmd.cmd_code |= CMD_FLAG_HAS_DATA;
+
+       get_lamport(&cmd.cmd_stamp);
+
+       /* more parts follow, so the command is always sent corked */
+       status = desc_send_struct(msock, &cmd, xio_cmd_meta, true);
+       if (status < 0)
+               goto done;
+
+       /* the description stays corked only when a payload follows */
+       status = desc_send_struct(msock, aio, xio_aio_user_meta, cmd.cmd_code & CMD_FLAG_HAS_DATA);
+       if (status < 0)
+               goto done;
+
+       if (cmd.cmd_code & CMD_FLAG_HAS_DATA)
+               status = xio_send_compressed(msock, aio->io_data, aio->io_len, xio_net_compress_data, false);
+done:
+       return status;
+}
+
+/* Receive the aio description of a callback and, when cmd announces
+ * it via CMD_FLAG_HAS_DATA, the payload.
+ *
+ * cmd is not received here; its stamp is handed to set_lamport() and
+ * its code is only inspected. Unlike xio_recv_aio(), no buffer is
+ * allocated: an announced payload without an existing data buffer
+ * yields -EINVAL.
+ *
+ * Returns the status of the last receive operation, or -EINVAL.
+ */
+int xio_recv_cb(struct xio_socket *msock, struct aio_object *aio, struct xio_cmd *cmd)
+{
+       int status = desc_recv_struct(msock, aio, xio_aio_user_meta, __LINE__);
+
+       if (status < 0)
+               return status;
+
+       set_lamport(&cmd->cmd_stamp);
+
+       if (!(cmd->cmd_code & CMD_FLAG_HAS_DATA))
+               return status;
+
+       if (!aio->io_data) {
+               XIO_WRN("#%d no internal buffer available\n", msock->s_debug_nr);
+               return -EINVAL;
+       }
+
+       return xio_recv_compressed(msock, aio->io_data, aio->io_len, aio->io_len);
+}
+
+/***************** module init stuff ************************/
+
+/* Function pointer taking a name and returning a string; NULL by
+ * default. Presumably an optional hook for host name translation
+ * installed by another module - TODO confirm against the users.
+ */
+char *(*xio_translate_hostname)(const char *name) = NULL;
+
+/* true between init_xio_net() and exit_xio_net() */
+bool xio_net_is_alive;
+
+/* Module initialization: logs the start and marks the network layer
+ * as alive. Always returns 0.
+ */
+int __init init_xio_net(void)
+{
+       XIO_INF("init_net()\n");
+       xio_net_is_alive = true;
+       return 0;
+}
+
+/* Module teardown: marks the network layer as no longer alive, then
+ * frees the string referenced by the file-level variable id and
+ * clears that pointer.
+ */
+void exit_xio_net(void)
+{
+       xio_net_is_alive = false;
+       brick_string_free(id);
+       id = NULL;
+       XIO_INF("exit_net()\n");
+}
diff --git a/include/linux/xio/xio_net.h b/include/linux/xio/xio_net.h
new file mode 100644
index 0000000..083b75c
--- /dev/null
+++ b/include/linux/xio/xio_net.h
@@ -0,0 +1,171 @@
+/*
+ * MARS Long Distance Replication Software
+ *
+ * Copyright (C) 2010-2014 Thomas Schoebel-Theuer
+ * Copyright (C) 2011-2014 1&1 Internet AG
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef XIO_NET_H
+#define XIO_NET_H
+
+#include <net/sock.h>
+#include <net/ipconfig.h>
+#include <net/tcp.h>
+
+#include <linux/brick/brick.h>
+
+extern int xio_net_compress_data; /* passed as 'compress' to xio_send_compressed() by xio_send_cb() */
+
+extern int xio_net_default_port;
+extern int xio_net_bind_before_listen;
+extern int xio_net_bind_before_connect;
+
+extern bool xio_net_is_alive; /* true between init_xio_net() and exit_xio_net() */
+
+#define MAX_DESC_CACHE                 16 /* slots in each of s_desc_send[] and s_desc_recv[] */
+
+/* The original struct socket has no refcount. This leads to problems
+ * during long-lasting system calls when racing with socket shutdown.
+ *
+ * The original idea of struct xio_socket was just a small wrapper
+ * adding a refcount and some debugging aids.
+ * Later, some buffering was added in order to take advantage of
+ * kernel_sendpage().
+ * Caching of meta descriptions has also been added.
+ *
+ * Notice: we have a slightly restricted parallelism model.
+ * One sender and one receiver thread may work in parallel
+ * on the same socket instance. At the low level, there must not be
+ * multiple readers in parallel to each other, or multiple
+ * writers in parallel to each other. Otherwise, higher-level
+ * protocol sequences would be disturbed anyway.
+ * When needed, you may achieve higher parallelism by doing your own
+ * semaphore locking around xio_{send, recv}_struct() or even longer
+ * sequences of subsets of your high-level protocol.
+ */
+struct xio_socket {
+       struct socket *s_socket; /* the wrapped kernel socket */
+
+       u64 s_send_bytes; /* presumably a running total of bytes sent - confirm */
+       u64 s_recv_bytes; /* presumably a running total of bytes received - confirm */
+       void *s_buffer; /* presumably the buffer mentioned above - confirm */
+       atomic_t s_count; /* presumably the refcount mentioned above - confirm */
+       int s_pos; /* presumably the fill position in s_buffer - confirm */
+       int s_debug_nr; /* printed as "#%d" in log messages */
+       int s_send_abort;
+       int s_recv_abort;
+       int s_send_cnt;
+       int s_recv_cnt;
+       bool s_shutdown_on_err;
+       bool s_alive;
+
+       u8   s_send_proto;
+       u8   s_recv_proto;
+       u16  s_send_flags;
+       u16  s_recv_flags;
+       struct xio_desc_cache *s_desc_send[MAX_DESC_CACHE]; /* meta description cache, send side */
+       struct xio_desc_cache *s_desc_recv[MAX_DESC_CACHE]; /* meta description cache, receive side */
+};
+
+struct xio_tcp_params { /* IP/TCP tunables; presumably applied as socket options - confirm in xio_net.c */
+       int ip_tos;
+       int tcp_window_size;
+       int tcp_nodelay;
+       int tcp_timeout;
+       int tcp_keepcnt;
+       int tcp_keepintvl;
+       int tcp_keepidle;
+};
+
+extern struct xio_tcp_params default_tcp_params; /* instance defined outside this header */
+
+enum { /* command codes; presumably stored in xio_cmd.cmd_code - confirm */
+       CMD_NOP,
+       CMD_NOTIFY,
+       CMD_CONNECT,
+       CMD_GETINFO,
+       CMD_GETENTS,
+       CMD_AIO,
+       CMD_CB,
+};
+
+#define CMD_FLAG_MASK                  255 /* low 8 bits; presumably selects the command code - confirm */
+#define CMD_FLAG_HAS_DATA              256 /* or'ed into cmd_code by xio_send_cb(), tested by xio_recv_cb() before receiving data */
+
+struct xio_cmd {
+       struct timespec cmd_stamp; /* lamport clock stamp: get_lamport() in xio_send_cb(), set_lamport() in xio_recv_cb() */
+       int cmd_code; /* may have CMD_FLAG_HAS_DATA or'ed in */
+       int cmd_int1; /* xio_send_cb() stores aio->io_id here */
+
+       /* int cmd_int2; */
+       /* int cmd_int3; */
+       char *cmd_str1;
+
+       /* char *cmd_str2; */
+       /* char *cmd_str3; */
+};
+
+extern const struct meta xio_cmd_meta[]; /* meta description used when sending a struct xio_cmd */
+
+extern char *(*xio_translate_hostname)(const char *name); /* hook; defined as NULL in xio_net.c */
+
+extern char *my_id(void); /* NOTE(review): presumably returns this host's id string - confirm */
+
+/* Low-level network traffic
+ *
+ * All functions operate on a struct xio_socket.
+ * NOTE(review): the int results are presumably a status where negative
+ * values are error codes, as xio_recv_cb() treats them - confirm.
+ * xio_recv_raw() and xio_recv_compressed() take a minimum and a maximum
+ * length; xio_recv_cb() passes the same value for both.
+ */
+extern int xio_create_sockaddr(struct sockaddr_storage *addr, const char *spec);
+
+extern int xio_create_socket(struct xio_socket *msock,
+       struct sockaddr_storage *src_addr,
+       struct sockaddr_storage *dst_addr);
+
+extern int xio_accept_socket(struct xio_socket *new_msock, struct xio_socket *old_msock);
+extern bool xio_get_socket(struct xio_socket *msock);
+extern void xio_put_socket(struct xio_socket *msock);
+extern void xio_shutdown_socket(struct xio_socket *msock);
+extern bool xio_socket_is_alive(struct xio_socket *msock);
+extern long xio_socket_send_space_available(struct xio_socket *msock);
+
+extern int xio_send_raw(struct xio_socket *msock, const void *buf, int len, bool cork);
+extern int xio_recv_raw(struct xio_socket *msock, void *buf, int minlen, int maxlen);
+
+int xio_send_compressed(struct xio_socket *msock, const void *buf, s32 len, int compress, bool cork);
+int xio_recv_compressed(struct xio_socket *msock, void *buf, int minlen, int maxlen);
+
+/* Mid-level generic field data exchange
+ *
+ * A struct is transferred according to its struct meta description.
+ * xio_recv_struct() is a macro wrapper around _xio_recv_struct() which
+ * passes the caller's __LINE__ as the 'line' argument.
+ */
+extern int xio_send_struct(struct xio_socket *msock, const void *data, const struct meta *meta);
+#define xio_recv_struct(_sock_, _data_, _meta_)                         \
+       ({                                                              \
+               _xio_recv_struct(_sock_, _data_, _meta_, __LINE__);     \
+       })
+extern int _xio_recv_struct(struct xio_socket *msock, void *data, const struct meta *meta, int line);
+
+/* High-level transport of xio structures
+ *
+ * xio_send_cb() sends a struct xio_cmd, then the aio fields, then
+ * optionally the data payload (flagged by CMD_FLAG_HAS_DATA in cmd_code).
+ * xio_recv_cb() receives the aio fields and, when the flag is set in
+ * cmd->cmd_code, the payload into aio->io_data; it returns -EINVAL when
+ * that buffer is missing.
+ * NOTE(review): the *_aio and *_dent_list variants are defined outside
+ * this chunk; presumably they follow the same pattern - confirm.
+ */
+extern int xio_send_dent_list(struct xio_socket *msock, struct list_head *anchor);
+extern int xio_recv_dent_list(struct xio_socket *msock, struct list_head *anchor);
+
+extern int xio_send_aio(struct xio_socket *msock, struct aio_object *aio);
+extern int xio_recv_aio(struct xio_socket *msock, struct aio_object *aio, struct xio_cmd *cmd);
+extern int xio_send_cb(struct xio_socket *msock, struct aio_object *aio);
+extern int xio_recv_cb(struct xio_socket *msock, struct aio_object *aio, struct xio_cmd *cmd);
+
+/***********************************************************************/
+
+/* init / exit: init_xio_net() sets xio_net_is_alive and returns 0;
+ * exit_xio_net() clears the flag again and frees the 'id' string.
+ */
+
+extern int init_xio_net(void);
+extern void exit_xio_net(void);
+
+#endif
-- 
2.6.4

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to