Module: xenomai-head
Branch: master
Commit: cd9dd67c023de98f340cb8f0309f92a9661da1af
URL:    http://git.xenomai.org/?p=xenomai-head.git;a=commit;h=cd9dd67c023de98f340cb8f0309f92a9661da1af

Author: Philippe Gerum <r...@xenomai.org>
Date:   Fri Sep 11 15:20:57 2009 +0200

rtipc: introduce BUFP protocol

The buffer protocol implements a byte-oriented, one-way
Producer-Consumer data path, which makes it a bit faster than
datagram-oriented protocols. All messages written are buffered
into a single memory area in strict FIFO order, until read by
the consumer.

This protocol prevents short writes, and only allows short
reads when a potential deadlock situation arises (i.e. readers
and writers would otherwise wait for each other indefinitely),
which usually means that the buffer size does not fit the use
the peer threads make of the protocol.

This protocol is strictly identical to the RT_BUFFER interface
available from the native skin.
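
For orientation, the user-space API boils down to the sketch
below, condensed from the bufp-readwrite example added by this
patch (error handling is trimmed, and port 12 matches that
example's BUFP_SVPORT):

    #include <unistd.h>
    #include <rtdm/rtipc.h>

    /* Consumer endpoint: size the ring buffer *before* binding,
     * then read; a read blocks until the requested length is
     * available (short reads happen only to break a deadlock). */
    static void consumer(void)
    {
            struct sockaddr_ipc saddr = {
                    .sipc_family = AF_RTIPC,
                    .sipc_port = 12,
            };
            size_t bufsz = 16384; /* bytes */
            char buf[128];
            int s;

            s = socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_BUFP);
            setsockopt(s, SOL_RTIPC, BUFP_SETBUFFER, &bufsz, sizeof(bufsz));
            bind(s, (struct sockaddr *)&saddr, sizeof(saddr));
            read(s, buf, sizeof(buf));
    }

    /* Producer endpoint: connect to the consumer's port, then
     * write; a write blocks until the whole message fits into
     * the peer's buffer (no short writes). */
    static void producer(void)
    {
            struct sockaddr_ipc daddr = {
                    .sipc_family = AF_RTIPC,
                    .sipc_port = 12,
            };
            int s;

            s = socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_BUFP);
            connect(s, (struct sockaddr *)&daddr, sizeof(daddr));
            write(s, "hello", 5);
    }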

---

 examples/rtdm/profiles/ipc/Makefile         |    5 +-
 examples/rtdm/profiles/ipc/bufp-label.c     |  225 ++++++
 examples/rtdm/profiles/ipc/bufp-readwrite.c |  190 +++++
 include/rtdm/rtipc.h                        |   33 +-
 ksrc/drivers/ipc/Kconfig                    |   33 +-
 ksrc/drivers/ipc/Makefile                   |    2 +
 ksrc/drivers/ipc/bufp.c                     | 1054 +++++++++++++++++++++++++++
 ksrc/drivers/ipc/internal.h                 |   16 +
 ksrc/drivers/ipc/rtipc.c                    |    3 +
 9 files changed, 1544 insertions(+), 17 deletions(-)

diff --git a/examples/rtdm/profiles/ipc/Makefile b/examples/rtdm/profiles/ipc/Makefile
index a44959c..dbd3d1c 100644
--- a/examples/rtdm/profiles/ipc/Makefile
+++ b/examples/rtdm/profiles/ipc/Makefile
@@ -1,7 +1,10 @@
 ###### CONFIGURATION ######
 
 ### List of applications to be built
-APPLICATIONS = xddp-echo xddp-label xddp-stream iddp-sendrecv iddp-label
+APPLICATIONS = \
+       xddp-echo xddp-label xddp-stream \
+       iddp-sendrecv iddp-label \
+       bufp-readwrite bufp-label
 
 ### Note: to override the search path for the xeno-config script, use "make XENO=..."
 
diff --git a/examples/rtdm/profiles/ipc/bufp-label.c b/examples/rtdm/profiles/ipc/bufp-label.c
new file mode 100644
index 0000000..9a1dde3
--- /dev/null
+++ b/examples/rtdm/profiles/ipc/bufp-label.c
@@ -0,0 +1,225 @@
+/*
+ * BUFP-based client/server demo, using the read(2)/write(2)
+ * system calls to exchange data over a socket.
+ *
+ * In this example, two sockets are created.  A server thread (reader)
+ * is bound to a real-time port and receives a stream of bytes sent to
+ * this port from a client thread (writer).
+ *
+ * See Makefile in this directory for build directives.
+ */
+#include <sys/mman.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+#include <pthread.h>
+#include <errno.h>
+#include <rtdk.h>
+#include <rtdm/rtipc.h>
+
+pthread_t svtid, cltid;
+
+#define BUFP_PORT_LABEL  "bufp-demo"
+
+static const char *msg[] = {
+    "Surfing With The Alien",
+    "Lords of Karma",
+    "Banana Mango",
+    "Psycho Monkey",
+    "Luminous Flesh Giants",
+    "Moroccan Sunset",
+    "Satch Boogie",
+    "Flying In A Blue Dream",
+    "Ride",
+    "Summer Song",
+    "Speed Of Light",
+    "Crystal Planet",
+    "Raspberry Jam Delta-V",
+    "Champagne?",
+    "Clouds Race Across The Sky",
+    "Engines Of Creation"
+};
+
+static void fail(const char *reason)
+{
+       perror(reason);
+       exit(EXIT_FAILURE);
+}
+
+void *server(void *arg)
+{
+       char label[BUFP_LABEL_LEN];
+       struct sockaddr_ipc saddr;
+       char buf[128];
+       size_t bufsz;
+       int ret, s;
+
+       s = socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_BUFP);
+       if (s < 0)
+               fail("socket");
+
+       /*
+        * Set a 16k buffer for the server endpoint. This
+        * configuration must be done prior to binding the socket to a
+        * port.
+        */
+       bufsz = 16384; /* bytes */
+       ret = setsockopt(s, SOL_RTIPC, BUFP_SETBUFFER,
+                        &bufsz, sizeof(bufsz));
+       if (ret)
+               fail("setsockopt");
+
+       /*
+        * Set a port label. This name will be registered when
+        * binding, in addition to the port number (if given).
+        */
+       strcpy(label, BUFP_PORT_LABEL);
+       ret = setsockopt(s, SOL_RTIPC, BUFP_SETLABEL,
+                        label, sizeof(label));
+       if (ret)
+               fail("setsockopt");
+
+       /*
+        * Bind the socket to the port. Assign that port a label, so
+        * that peers may use a descriptive information to locate
+        * it. Labeled ports will appear in the
+        * /proc/xenomai/registry/rtipc/bufp directory once the socket
+        * is bound.
+        *
+        * saddr.sipc_port specifies the port number to use. If -1 is
+        * passed, the BUFP driver will auto-select an idle port.
+        */
+       saddr.sipc_family = AF_RTIPC;
+       saddr.sipc_port = -1;
+       ret = bind(s, (struct sockaddr *)&saddr, sizeof(saddr));
+       if (ret)
+               fail("bind");
+
+       for (;;) {
+               ret = read(s, buf, sizeof(buf));
+               if (ret < 0) {
+                       close(s);
+                       fail("read");
+               }
+               rt_printf("%s: received %d bytes, \"%.*s\"\n",
+                         __FUNCTION__, ret, ret, buf);
+       }
+
+       return NULL;
+}
+
+void *client(void *arg)
+{
+       struct sockaddr_ipc svsaddr;
+       char label[BUFP_LABEL_LEN];
+       int ret, s, n = 0, len;
+       struct timespec ts;
+       char buf[128];
+
+       s = socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_BUFP);
+       if (s < 0)
+               fail("socket");
+
+       /*
+        * Set the port label. This name will be used to find the peer
+        * when connecting, instead of the port number. The label must
+        * be set _after_ the socket is bound to the port, so that
+        * BUFP does not try to register this label for the client
+        * port as well (like the server thread did).
+        */
+       strcpy(label, BUFP_PORT_LABEL);
+       ret = setsockopt(s, SOL_RTIPC, BUFP_SETLABEL,
+                        label, sizeof(label));
+       if (ret)
+               fail("setsockopt");
+
+       memset(&svsaddr, 0, sizeof(svsaddr));
+       svsaddr.sipc_family = AF_RTIPC;
+       svsaddr.sipc_port = -1; /* Tell BUFP to search by label. */
+       ret = connect(s, (struct sockaddr *)&svsaddr, sizeof(svsaddr));
+       if (ret)
+               fail("connect");
+
+       for (;;) {
+               len = strlen(msg[n]);
+               ret = write(s, msg[n], len);
+               if (ret < 0) {
+                       close(s);
+                       fail("write");
+               }
+               rt_printf("%s: sent %d bytes, \"%.*s\"\n",
+                         __FUNCTION__, ret, ret, msg[n]);
+               n = (n + 1) % (sizeof(msg) / sizeof(msg[0]));
+               /*
+                * We run in full real-time mode (i.e. primary mode),
+                * so we have to let the system breathe between two
+                * iterations.
+                */
+               ts.tv_sec = 0;
+               ts.tv_nsec = 500000000; /* 500 ms */
+               clock_nanosleep(CLOCK_REALTIME, 0, &ts, NULL);
+       }
+
+       return NULL;
+}
+
+void cleanup_upon_sig(int sig)
+{
+       pthread_cancel(svtid);
+       pthread_cancel(cltid);
+       signal(sig, SIG_DFL);
+       pthread_join(svtid, NULL);
+       pthread_join(cltid, NULL);
+}
+
+int main(int argc, char **argv)
+{
+       struct sched_param svparam = {.sched_priority = 71 };
+       struct sched_param clparam = {.sched_priority = 70 };
+       pthread_attr_t svattr, clattr;
+       sigset_t mask, oldmask;
+
+       mlockall(MCL_CURRENT | MCL_FUTURE);
+
+       sigemptyset(&mask);
+       sigaddset(&mask, SIGINT);
+       signal(SIGINT, cleanup_upon_sig);
+       sigaddset(&mask, SIGTERM);
+       signal(SIGTERM, cleanup_upon_sig);
+       sigaddset(&mask, SIGHUP);
+       signal(SIGHUP, cleanup_upon_sig);
+       pthread_sigmask(SIG_BLOCK, &mask, &oldmask);
+
+       /*
+        * This is a real-time compatible printf() package from
+        * Xenomai's RT Development Kit (RTDK), that does NOT cause
+        * any transition to secondary mode.
+        */
+       rt_print_auto_init(1);
+
+       pthread_attr_init(&svattr);
+       pthread_attr_setdetachstate(&svattr, PTHREAD_CREATE_JOINABLE);
+       pthread_attr_setinheritsched(&svattr, PTHREAD_EXPLICIT_SCHED);
+       pthread_attr_setschedpolicy(&svattr, SCHED_FIFO);
+       pthread_attr_setschedparam(&svattr, &svparam);
+
+       errno = pthread_create(&svtid, &svattr, &server, NULL);
+       if (errno)
+               fail("pthread_create");
+
+       pthread_attr_init(&clattr);
+       pthread_attr_setdetachstate(&clattr, PTHREAD_CREATE_JOINABLE);
+       pthread_attr_setinheritsched(&clattr, PTHREAD_EXPLICIT_SCHED);
+       pthread_attr_setschedpolicy(&clattr, SCHED_FIFO);
+       pthread_attr_setschedparam(&clattr, &clparam);
+
+       errno = pthread_create(&cltid, &clattr, &client, NULL);
+       if (errno)
+               fail("pthread_create");
+
+       sigsuspend(&oldmask);
+
+       return 0;
+}
diff --git a/examples/rtdm/profiles/ipc/bufp-readwrite.c b/examples/rtdm/profiles/ipc/bufp-readwrite.c
new file mode 100644
index 0000000..7ea1a07
--- /dev/null
+++ b/examples/rtdm/profiles/ipc/bufp-readwrite.c
@@ -0,0 +1,190 @@
+/*
+ * BUFP-based client/server demo, using the read(2)/write(2)
+ * system calls to exchange data over a socket.
+ *
+ * In this example, two sockets are created.  A server thread (reader)
+ * is bound to a real-time port and receives a stream of bytes sent to
+ * this port from a client thread (writer).
+ *
+ * See Makefile in this directory for build directives.
+ */
+#include <sys/mman.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+#include <pthread.h>
+#include <errno.h>
+#include <rtdk.h>
+#include <rtdm/rtipc.h>
+
+pthread_t svtid, cltid;
+
+#define BUFP_SVPORT 12
+
+static const char *msg[] = {
+    "Surfing With The Alien",
+    "Lords of Karma",
+    "Banana Mango",
+    "Psycho Monkey",
+    "Luminous Flesh Giants",
+    "Moroccan Sunset",
+    "Satch Boogie",
+    "Flying In A Blue Dream",
+    "Ride",
+    "Summer Song",
+    "Speed Of Light",
+    "Crystal Planet",
+    "Raspberry Jam Delta-V",
+    "Champagne?",
+    "Clouds Race Across The Sky",
+    "Engines Of Creation"
+};
+
+static void fail(const char *reason)
+{
+       perror(reason);
+       exit(EXIT_FAILURE);
+}
+
+void *server(void *arg)
+{
+       struct sockaddr_ipc saddr;
+       char buf[128];
+       size_t bufsz;
+       int ret, s;
+
+       s = socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_BUFP);
+       if (s < 0)
+               fail("socket");
+
+       /*
+        * Set a 16k buffer for the server endpoint. This
+        * configuration must be done prior to binding the socket to a
+        * port.
+        */
+       bufsz = 16384; /* bytes */
+       ret = setsockopt(s, SOL_RTIPC, BUFP_SETBUFFER,
+                        &bufsz, sizeof(bufsz));
+       if (ret)
+               fail("setsockopt");
+
+       saddr.sipc_family = AF_RTIPC;
+       saddr.sipc_port = BUFP_SVPORT;
+       ret = bind(s, (struct sockaddr *)&saddr, sizeof(saddr));
+       if (ret)
+               fail("bind");
+
+       for (;;) {
+               ret = read(s, buf, sizeof(buf));
+               if (ret < 0) {
+                       close(s);
+                       fail("read");
+               }
+               rt_printf("%s: received %d bytes, \"%.*s\"\n",
+                         __FUNCTION__, ret, ret, buf);
+       }
+
+       return NULL;
+}
+
+void *client(void *arg)
+{
+       struct sockaddr_ipc svsaddr;
+       int ret, s, n = 0, len;
+       struct timespec ts;
+       char buf[128];
+
+       s = socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_BUFP);
+       if (s < 0)
+               fail("socket");
+
+       memset(&svsaddr, 0, sizeof(svsaddr));
+       svsaddr.sipc_family = AF_RTIPC;
+       svsaddr.sipc_port = BUFP_SVPORT;
+       ret = connect(s, (struct sockaddr *)&svsaddr, sizeof(svsaddr));
+       if (ret)
+               fail("connect");
+
+       for (;;) {
+               len = strlen(msg[n]);
+               ret = write(s, msg[n], len);
+               if (ret < 0) {
+                       close(s);
+                       fail("write");
+               }
+               rt_printf("%s: sent %d bytes, \"%.*s\"\n",
+                         __FUNCTION__, ret, ret, msg[n]);
+               n = (n + 1) % (sizeof(msg) / sizeof(msg[0]));
+               /*
+                * We run in full real-time mode (i.e. primary mode),
+                * so we have to let the system breathe between two
+                * iterations.
+                */
+               ts.tv_sec = 0;
+               ts.tv_nsec = 500000000; /* 500 ms */
+               clock_nanosleep(CLOCK_REALTIME, 0, &ts, NULL);
+       }
+
+       return NULL;
+}
+
+void cleanup_upon_sig(int sig)
+{
+       pthread_cancel(svtid);
+       pthread_cancel(cltid);
+       signal(sig, SIG_DFL);
+       pthread_join(svtid, NULL);
+       pthread_join(cltid, NULL);
+}
+
+int main(int argc, char **argv)
+{
+       struct sched_param svparam = {.sched_priority = 71 };
+       struct sched_param clparam = {.sched_priority = 70 };
+       pthread_attr_t svattr, clattr;
+       sigset_t mask, oldmask;
+
+       mlockall(MCL_CURRENT | MCL_FUTURE);
+
+       sigemptyset(&mask);
+       sigaddset(&mask, SIGINT);
+       signal(SIGINT, cleanup_upon_sig);
+       sigaddset(&mask, SIGTERM);
+       signal(SIGTERM, cleanup_upon_sig);
+       sigaddset(&mask, SIGHUP);
+       signal(SIGHUP, cleanup_upon_sig);
+       pthread_sigmask(SIG_BLOCK, &mask, &oldmask);
+
+       /*
+        * This is a real-time compatible printf() package from
+        * Xenomai's RT Development Kit (RTDK), that does NOT cause
+        * any transition to secondary mode.
+        */
+       rt_print_auto_init(1);
+
+       pthread_attr_init(&svattr);
+       pthread_attr_setdetachstate(&svattr, PTHREAD_CREATE_JOINABLE);
+       pthread_attr_setinheritsched(&svattr, PTHREAD_EXPLICIT_SCHED);
+       pthread_attr_setschedpolicy(&svattr, SCHED_FIFO);
+       pthread_attr_setschedparam(&svattr, &svparam);
+
+       errno = pthread_create(&svtid, &svattr, &server, NULL);
+       if (errno)
+               fail("pthread_create");
+
+       pthread_attr_init(&clattr);
+       pthread_attr_setdetachstate(&clattr, PTHREAD_CREATE_JOINABLE);
+       pthread_attr_setinheritsched(&clattr, PTHREAD_EXPLICIT_SCHED);
+       pthread_attr_setschedpolicy(&clattr, SCHED_FIFO);
+       pthread_attr_setschedparam(&clattr, &clparam);
+
+       errno = pthread_create(&cltid, &clattr, &client, NULL);
+       if (errno)
+               fail("pthread_create");
+
+       sigsuspend(&oldmask);
+
+       return 0;
+}
diff --git a/include/rtdm/rtipc.h b/include/rtdm/rtipc.h
index 091ea5c..0574671 100644
--- a/include/rtdm/rtipc.h
+++ b/include/rtdm/rtipc.h
@@ -58,9 +58,6 @@
 #include <nucleus/types.h>
 #include <rtdm/rtdm.h>
 
-#define RTDM_SUBCLASS_XDDP     0
-#define RTDM_SUBCLASS_IDDP     1
-
 /* Address family */
 #define AF_RTIPC               111
 
@@ -71,9 +68,23 @@ enum {
        IPCPROTO_IPC  = 0,      /* Default protocol (IDDP) */
        IPCPROTO_XDDP = 1,      /* Cross-domain datagram protocol */
        IPCPROTO_IDDP = 2,      /* Intra-domain datagram protocol */
+       IPCPROTO_BUFP = 3,      /* Buffer protocol */
        IPCPROTO_MAX
 };
 
+ /*
+  * Valid port ranges:
+  * XDDP = [0..OPT_PIPE_NRDEV-1]
+  * IDDP = [0..OPT_IDDP_NRPORT-1]
+  * BUFP = [0..OPT_BUFP_NRPORT-1]
+  */
+typedef int16_t rtipc_port_t;
+
+struct sockaddr_ipc {
+       sa_family_t sipc_family; /* AF_RTIPC */
+       rtipc_port_t sipc_port;
+};
+
 /* RTIPC socket level */
 #define SOL_RTIPC  311
 
@@ -87,21 +98,13 @@ enum {
 #define IDDP_GETSTALLCOUNT     7
 #define IDDP_SETLABEL          8
 #define IDDP_GETLABEL          9
+#define BUFP_SETBUFFER         10
+#define BUFP_SETLABEL          11
+#define BUFP_GETLABEL          12
 
 #define XDDP_LABEL_LEN         XNOBJECT_NAME_LEN
 #define IDDP_LABEL_LEN         XNOBJECT_NAME_LEN
-
- /*
-  * Valid port ranges:
-  * XDDP = [0..OPT_PIPE_NRDEV-1]
-  * IDDP = [0..OPT_IDDP_NRPORT-1]
-  */
-typedef int16_t rtipc_port_t;
-
-struct sockaddr_ipc {
-       sa_family_t sipc_family; /* AF_RTIPC */
-       rtipc_port_t sipc_port;
-};
+#define BUFP_LABEL_LEN         XNOBJECT_NAME_LEN
 
 /* XDDP in-kernel monitored events */
 #define XDDP_EVTIN     1
diff --git a/ksrc/drivers/ipc/Kconfig b/ksrc/drivers/ipc/Kconfig
index a3921ac..77355d4 100644
--- a/ksrc/drivers/ipc/Kconfig
+++ b/ksrc/drivers/ipc/Kconfig
@@ -44,7 +44,7 @@ config XENO_DRIVERS_RTIPC_IDDP
 
 config XENO_OPT_IDDP_NRPORT
        depends on XENO_DRIVERS_RTIPC_IDDP
-       int "Number of communication ports"
+       int "Number of IDDP communication ports"
        default 32
        help
 
@@ -52,4 +52,35 @@ config XENO_OPT_IDDP_NRPORT
        the system for creating receiver endpoints. Port numbers range
        from 0 to CONFIG_XENO_OPT_IDDP_NRPORT - 1.
 
+config XENO_DRIVERS_RTIPC_BUFP
+       depends on XENO_DRIVERS_RTIPC
+       select XENO_OPT_MAP
+       bool "Buffer protocol"
+       help
+
+       The buffer protocol implements a byte-oriented, one-way
+       Producer-Consumer data path, which makes it a bit faster than
+       datagram-oriented protocols. All messages written are buffered
+       into a single memory area in strict FIFO order, until read by
+       the consumer.
+
+       This protocol prevents short writes, and only allows short
+       reads when a potential deadlock situation arises (i.e. readers
+       and writers would otherwise wait for each other indefinitely),
+       which usually means that the buffer size does not fit the use
+       the peer threads make of the protocol.
+
+       NOTE: this protocol is strictly identical to the RT_BUFFER
+       interface available from the native skin.
+
+config XENO_OPT_BUFP_NRPORT
+       depends on XENO_DRIVERS_RTIPC_BUFP
+       int "Number of BUFP communication ports"
+       default 32
+       help
+
+       This parameter defines the number of BUFP ports available in
+       the system for creating receiver endpoints. Port numbers range
+       from 0 to CONFIG_XENO_OPT_BUFP_NRPORT - 1.
+
 endmenu
diff --git a/ksrc/drivers/ipc/Makefile b/ksrc/drivers/ipc/Makefile
index cfe58ec..04e162f 100644
--- a/ksrc/drivers/ipc/Makefile
+++ b/ksrc/drivers/ipc/Makefile
@@ -10,6 +10,7 @@ xeno_rtipc-y := rtipc.o
 
 xeno_rtipc-$(CONFIG_XENO_DRIVERS_RTIPC_XDDP) += xddp.o
 xeno_rtipc-$(CONFIG_XENO_DRIVERS_RTIPC_IDDP) += iddp.o
+xeno_rtipc-$(CONFIG_XENO_DRIVERS_RTIPC_BUFP) += bufp.o
 
 else
 
@@ -24,6 +25,7 @@ xeno_rtipc-objs := rtipc.o
 opt_objs-y :=
 opt_objs-$(CONFIG_XENO_DRIVERS_RTIPC_XDDP) += xddp.o
 opt_objs-$(CONFIG_XENO_DRIVERS_RTIPC_IDDP) += iddp.o
+opt_objs-$(CONFIG_XENO_DRIVERS_RTIPC_BUFP) += bufp.o
 
 xeno_rtipc-objs += $(opt_objs-y)
 
diff --git a/ksrc/drivers/ipc/bufp.c b/ksrc/drivers/ipc/bufp.c
new file mode 100644
index 0000000..99616bb
--- /dev/null
+++ b/ksrc/drivers/ipc/bufp.c
@@ -0,0 +1,1054 @@
+/**
+ * This file is part of the Xenomai project.
+ *
+ * @note Copyright (C) 2009 Philippe Gerum <r...@xenomai.org> 
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ */
+
+#include <linux/module.h>
+#include <linux/list.h>
+#include <linux/kernel.h>
+#include <linux/vmalloc.h>
+#include <nucleus/heap.h>
+#include <nucleus/map.h>
+#include <nucleus/bufd.h>
+#include <rtdm/rtipc.h>
+#include "internal.h"
+
+#define trace(m,a...) printk(KERN_WARNING "%s: " m "\n", __FUNCTION__, ##a)
+
+#define BUFP_SOCKET_MAGIC 0xa61a61a6
+
+struct bufp_socket {
+       int magic;
+       struct sockaddr_ipc name;
+       struct sockaddr_ipc peer;
+
+       void *bufmem;
+       size_t bufsz;
+       u_long status;
+       xnhandle_t handle;
+       char label[BUFP_LABEL_LEN];
+
+       off_t rdoff;
+       off_t wroff;
+       size_t fillsz;
+       u_long wrtoken;
+       u_long rdtoken;
+       rtdm_event_t i_event;
+       rtdm_event_t o_event;
+
+       nanosecs_rel_t rx_timeout;
+       nanosecs_rel_t tx_timeout;
+
+       struct rtipc_private *priv;
+};
+
+struct bufp_wait_context {
+       struct rtipc_wait_context wc;
+       size_t len;
+       struct bufp_socket *sk;
+       rtdm_lockctx_t lockctx;
+};
+
+static struct sockaddr_ipc nullsa = {
+       .sipc_family = AF_RTIPC,
+       .sipc_port = -1
+};
+
+static struct xnmap *portmap;
+
+#define _BUFP_BINDING  0
+#define _BUFP_BOUND    1
+
+#ifdef CONFIG_PROC_FS
+
+static ssize_t __bufp_link_proc(char *buf, int count, void *data)
+{
+       struct bufp_socket *sk = data;
+       return snprintf(buf, count, "%d", sk->name.sipc_port);
+}
+
+static struct xnpnode __bufp_pnode = {
+
+       .dir = NULL,
+       .type = "bufp",
+       .entries = 0,
+       .link_proc = &__bufp_link_proc,
+       .root = &rtipc_ptree,
+};
+
+#else /* !CONFIG_PROC_FS */
+
+static struct xnpnode __bufp_pnode = {
+
+       .type = "bufp"
+};
+
+#endif /* !CONFIG_PROC_FS */
+
+static void __bufp_cleanup_handler(struct rtipc_wait_context *wc)
+{
+       struct bufp_wait_context *bufwc;
+       /*
+        * Cancellation request is pending - release the lock we hold,
+        * we'll be vanishing away soon. Granted, we could avoid doing
+        * that, since we know that this particular lock is Xenomai's
+        * nklock, which may be held across rescheduling calls.
+        * Anyway, this illustrates how to use the cleanup handler of
+        * a wait context.
+        */
+       bufwc = container_of(wc, struct bufp_wait_context, wc);
+       rtipc_leave_atomic(bufwc->lockctx);
+}
+
+static int bufp_socket(struct rtipc_private *priv,
+                      rtdm_user_info_t *user_info)
+{
+       struct bufp_socket *sk = priv->state;
+
+       sk->magic = BUFP_SOCKET_MAGIC;
+       sk->name = nullsa;      /* Unbound */
+       sk->peer = nullsa;
+       sk->bufmem = NULL;
+       sk->bufsz = 0;
+       sk->rdoff = 0;
+       sk->wroff = 0;
+       sk->fillsz = 0;
+       sk->rdtoken = 0;
+       sk->wrtoken = 0;
+       sk->status = 0;
+       sk->handle = 0;
+       sk->rx_timeout = RTDM_TIMEOUT_INFINITE;
+       sk->tx_timeout = RTDM_TIMEOUT_INFINITE;
+       *sk->label = 0;
+       rtdm_event_init(&sk->i_event, 0);
+       rtdm_event_init(&sk->o_event, 0);
+       sk->priv = priv;
+
+       return 0;
+}
+
+static int bufp_close(struct rtipc_private *priv,
+                     rtdm_user_info_t *user_info)
+{
+       struct bufp_socket *sk = priv->state;
+
+       rtdm_event_destroy(&sk->i_event);
+       rtdm_event_destroy(&sk->o_event);
+
+       if (sk->name.sipc_port > -1)
+               xnmap_remove(portmap, sk->name.sipc_port);
+
+       if (sk->handle)
+               xnregistry_remove(sk->handle);
+
+       if (sk->bufmem)
+               xnarch_free_host_mem(sk->bufmem, sk->bufsz);
+
+       return 0;
+}
+
+static ssize_t __bufp_readbuf(struct bufp_socket *sk,
+                             struct xnbufd *bufd,
+                             int flags)
+{
+       struct bufp_wait_context wait, *bufwc;
+       struct rtipc_wait_context *wc;
+       rtdm_task_t *waiter;
+       rtdm_toseq_t toseq;
+       ssize_t len, ret;
+       size_t rbytes, n;
+       u_long rdtoken;
+       off_t rdoff;
+
+       len = bufd->b_len;
+
+       rtdm_toseq_init(&toseq, sk->rx_timeout);
+
+       rtipc_enter_atomic(wait.lockctx);
+
+redo:
+       for (;;) {
+               /*
+                * We should be able to read a complete message of the
+                * requested length, or block.
+                */
+               if (sk->fillsz < len)
+                       goto wait;
+
+               /*
+                * Draw the next read token so that we can later
+                * detect preemption.
+                */
+               rdtoken = ++sk->rdtoken;
+
+               /* Read from the buffer in a circular way. */
+               rdoff = sk->rdoff;
+               rbytes = len;
+
+               do {
+                       if (rdoff + rbytes > sk->bufsz)
+                               n = sk->bufsz - rdoff;
+                       else
+                               n = rbytes;
+                       /*
+                        * Release the lock while retrieving the data
+                        * to keep latency low.
+                        */
+                       rtipc_leave_atomic(wait.lockctx);
+                       ret = xnbufd_copy_from_kmem(bufd, sk->bufmem + rdoff, n);
+                       if (ret < 0)
+                               return ret;
+
+                       rtipc_enter_atomic(wait.lockctx);
+                       /*
+                        * In case we were preempted while retrieving
+                        * the message, we have to re-read the whole
+                        * thing.
+                        */
+                       if (sk->rdtoken != rdtoken)
+                               goto redo;
+
+                       rdoff = (rdoff + n) % sk->bufsz;
+                       rbytes -= n;
+               } while (rbytes > 0);
+
+               sk->fillsz -= len;
+               sk->rdoff = rdoff;
+               ret = len;
+
+               /*
+                * Wake up all threads pending on the output wait
+                * queue, if we freed enough room for the leading one
+                * to post its message.
+                */
+               waiter = rtipc_peek_wait_head(&sk->o_event);
+               if (waiter == NULL)
+                       goto out;
+
+               wc = rtipc_get_wait_context(waiter);
+               XENO_BUGON(NUCLEUS, wc == NULL);
+               bufwc = container_of(wc, struct bufp_wait_context, wc);
+               if (bufwc->len + sk->fillsz <= sk->bufsz)
+                       rtdm_event_pulse(&sk->o_event);
+               /*
+                * We cannot fail anymore once some data has been
+                * copied via the buffer descriptor, so no need to
+                * check for any reason to invalidate the latter.
+                */
+               goto out;
+
+       wait:
+               if (flags & MSG_DONTWAIT) {
+                       ret = -EWOULDBLOCK;
+                       break;
+               }
+
+               /*
+                * Check whether writers are already waiting for
+                * sending data, while we are about to wait for
+                * receiving some. In such a case, we have a
+                * pathological use of the buffer. We must allow for a
+                * short read to prevent a deadlock.
+                */
+               if (sk->fillsz > 0 && rtipc_peek_wait_head(&sk->o_event)) {
+                       len = sk->fillsz;
+                       goto redo;
+               }
+
+               wait.len = len;
+               wait.sk = sk;
+               rtipc_prepare_wait(&wait.wc);
+               rtipc_leave_atomic(wait.lockctx);
+               ret = rtdm_event_timedwait(&sk->i_event,
+                                          sk->rx_timeout, &toseq);
+               rtipc_enter_atomic(wait.lockctx);
+               rtipc_finish_wait(&wait.wc, __bufp_cleanup_handler);
+
+               if (unlikely(ret))
+                       break;
+       }
+
+out:
+       rtipc_leave_atomic(wait.lockctx);
+
+       return ret;
+}
+
+static ssize_t __bufp_recvmsg(struct rtipc_private *priv,
+                             rtdm_user_info_t *user_info,
+                             struct iovec *iov, int iovlen, int flags,
+                             struct sockaddr_ipc *saddr)
+{
+       struct bufp_socket *sk = priv->state;
+       ssize_t len, wrlen, vlen, ret;
+       struct xnbufd bufd;
+       int nvec;
+
+       if (!test_bit(_BUFP_BOUND, &sk->status))
+               return -EAGAIN;
+
+       len = rtipc_get_iov_flatlen(iov, iovlen);
+       if (len == 0)
+               return 0;
+       /*
+        * We may only return complete messages to readers, so there
+        * is no point in waiting for messages which are larger than
+        * what the buffer can hold.
+        */
+       if (len > sk->bufsz)
+               return -EINVAL;
+
+       /*
+        * Write "len" bytes from the buffer to the vector cells. Each
+        * cell is handled as a separate message.
+        */
+       for (nvec = 0, wrlen = len; nvec < iovlen && wrlen > 0; nvec++) {
+               if (iov[nvec].iov_len == 0)
+                       continue;
+               vlen = wrlen >= iov[nvec].iov_len ? iov[nvec].iov_len : wrlen;
+               if (user_info) {
+                       xnbufd_map_uread(&bufd, iov[nvec].iov_base, vlen);
+                       ret = __bufp_readbuf(sk, &bufd, flags);
+                       xnbufd_unmap_uread(&bufd);
+               } else {
+                       xnbufd_map_kread(&bufd, iov[nvec].iov_base, vlen);
+                       ret = __bufp_readbuf(sk, &bufd, flags);
+                       xnbufd_unmap_kread(&bufd);
+               }
+               if (ret < 0)
+                       return ret;
+               iov[nvec].iov_base += vlen;
+               iov[nvec].iov_len -= vlen;
+               wrlen -= vlen;
+               if (ret < vlen)
+                       /* Short reads may happen in rare cases. */
+                       break;
+       }
+
+       /*
+        * There is no way to determine who the sender was since we
+        * process data in byte-oriented mode, so we just copy our own
+        * sockaddr to send back a valid address.
+        */
+       if (saddr)
+               *saddr = sk->name;
+
+       return len - wrlen;
+}
+
+static ssize_t bufp_recvmsg(struct rtipc_private *priv,
+                           rtdm_user_info_t *user_info,
+                           struct msghdr *msg, int flags)
+{
+       struct iovec iov[RTIPC_IOV_MAX];
+       struct sockaddr_ipc saddr;
+       ssize_t ret;
+
+       if (flags & ~MSG_DONTWAIT)
+               return -EINVAL;
+
+       if (msg->msg_name) {
+               if (msg->msg_namelen < sizeof(struct sockaddr_ipc))
+                       return -EINVAL;
+       } else if (msg->msg_namelen != 0)
+               return -EINVAL;
+
+       if (msg->msg_iovlen >= RTIPC_IOV_MAX)
+               return -EINVAL;
+
+       /* Copy I/O vector in */
+       if (rtipc_get_arg(user_info, iov, msg->msg_iov,
+                         sizeof(iov[0]) * msg->msg_iovlen))
+               return -EFAULT;
+
+       ret = __bufp_recvmsg(priv, user_info,
+                            iov, msg->msg_iovlen, flags, &saddr);
+       if (ret <= 0)
+               return ret;
+
+       /* Copy the updated I/O vector back */
+       if (rtipc_put_arg(user_info, msg->msg_iov, iov,
+                         sizeof(iov[0]) * msg->msg_iovlen))
+               return -EFAULT;
+
+       /* Copy the source address if required. */
+       if (msg->msg_name) {
+               if (rtipc_put_arg(user_info, msg->msg_name,
+                                 &saddr, sizeof(saddr)))
+                       return -EFAULT;
+               msg->msg_namelen = sizeof(struct sockaddr_ipc);
+       }
+
+       return ret;
+}
+
+static ssize_t bufp_read(struct rtipc_private *priv,
+                        rtdm_user_info_t *user_info,
+                        void *buf, size_t len)
+{
+       struct iovec iov = { .iov_base = buf, .iov_len = len };
+       return __bufp_recvmsg(priv, user_info, &iov, 1, 0, NULL);
+}
+
+static ssize_t __bufp_writebuf(struct bufp_socket *rsk,
+                              struct bufp_socket *sk,
+                              struct xnbufd *bufd,
+                              int flags)
+{
+       struct bufp_wait_context wait, *bufwc;
+       struct rtipc_wait_context *wc;
+       rtdm_task_t *waiter;
+       rtdm_toseq_t toseq;
+       ssize_t len, ret;
+       size_t wbytes, n;
+       u_long wrtoken;
+       off_t wroff;
+
+       len = bufd->b_len;
+
+       rtdm_toseq_init(&toseq, sk->tx_timeout);
+
+       rtipc_enter_atomic(wait.lockctx);
+
+redo:
+       for (;;) {
+               /*
+                * We should be able to write the entire message at
+                * once or block.
+                */
+               if (rsk->fillsz + len > rsk->bufsz)
+                       goto wait;
+
+               /*
+                * Draw the next write token so that we can later
+                * detect preemption.
+                */
+               wrtoken = ++rsk->wrtoken;
+
+               /* Write to the buffer in a circular way. */
+               wroff = rsk->wroff;
+               wbytes = len;
+
+               do {
+                       if (wroff + wbytes > rsk->bufsz)
+                               n = rsk->bufsz - wroff;
+                       else
+                               n = wbytes;
+                       /*
+                        * Release the lock while copying the data to
+                        * keep latency low.
+                        */
+                       rtipc_leave_atomic(wait.lockctx);
+                       ret = xnbufd_copy_to_kmem(rsk->bufmem + wroff, bufd, n);
+                       if (ret < 0)
+                               return ret;
+                       rtipc_enter_atomic(wait.lockctx);
+                       /*
+                        * In case we were preempted while copying the
+                        * message, we have to write the whole thing
+                        * again.
+                        */
+                       if (rsk->wrtoken != wrtoken)
+                               goto redo;
+
+                       wroff = (wroff + n) % rsk->bufsz;
+                       wbytes -= n;
+               } while (wbytes > 0);
+
+               rsk->fillsz += len;
+               rsk->wroff = wroff;
+               ret = len;
+
+               /*
+                * Wake up all threads pending on the input wait
+                * queue, if we accumulated enough data to feed the
+                * leading one.
+                */
+               waiter = rtipc_peek_wait_head(&rsk->i_event);
+               if (waiter == NULL)
+                       goto out;
+
+               wc = rtipc_get_wait_context(waiter);
+               XENO_BUGON(NUCLEUS, wc == NULL);
+               bufwc = container_of(wc, struct bufp_wait_context, wc);
+               if (bufwc->len <= rsk->fillsz)
+                       rtdm_event_pulse(&rsk->i_event);
+               /*
+                * We cannot fail anymore once some data has been
+                * copied via the buffer descriptor, so no need to
+                * check for any reason to invalidate the latter.
+                */
+               goto out;
+
+       wait:
+               if (flags & MSG_DONTWAIT) {
+                       ret = -EWOULDBLOCK;
+                       break;
+               }
+
+               wait.len = len;
+               wait.sk = rsk;
+               rtipc_prepare_wait(&wait.wc);
+               rtipc_leave_atomic(wait.lockctx);
+               ret = rtdm_event_timedwait(&rsk->o_event,
+                                          sk->tx_timeout, &toseq);
+               rtipc_enter_atomic(wait.lockctx);
+               rtipc_finish_wait(&wait.wc, __bufp_cleanup_handler);
+               if (unlikely(ret))
+                       break;
+       }
+
+out:
+       rtipc_leave_atomic(wait.lockctx);
+
+       return ret;
+}
+
+static ssize_t __bufp_sendmsg(struct rtipc_private *priv,
+                             rtdm_user_info_t *user_info,
+                             struct iovec *iov, int iovlen, int flags,
+                             const struct sockaddr_ipc *daddr)
+{
+       struct bufp_socket *sk = priv->state, *rsk;
+       struct rtdm_dev_context *rcontext = NULL; /* Keep GCC quiet */
+       ssize_t len, rdlen, vlen, ret;
+       struct xnbufd bufd;
+       int nvec, to;
+
+       len = rtipc_get_iov_flatlen(iov, iovlen);
+       if (len == 0)
+               return 0;
+
+       to = daddr->sipc_port;
+
+       RTDM_EXECUTE_ATOMICALLY(
+               rsk = xnmap_fetch_nocheck(portmap, to);
+               if (unlikely(rsk == NULL))
+                       ret = -ECONNRESET;
+               else {
+                       rcontext = rtdm_private_to_context(rsk->priv);
+                       rtdm_context_lock(rcontext);
+                       ret = 0;
+               }
+       );
+       if (ret)
+               return ret;
+       /*
+        * We may only send complete messages, so there is no point in
+        * accepting messages which are larger than what the buffer
+        * can hold.
+        */
+       if (len > rsk->bufsz) {
+               ret = -EINVAL;
+               goto fail;
+       }
+
+       /*
+        * Read "len" bytes to the buffer from the vector cells. Each
+        * cell is handled as a separate message.
+        */
+       for (nvec = 0, rdlen = len; nvec < iovlen && rdlen > 0; nvec++) {
+               if (iov[nvec].iov_len == 0)
+                       continue;
+               vlen = rdlen >= iov[nvec].iov_len ? iov[nvec].iov_len : rdlen;
+               if (user_info) {
+                       xnbufd_map_uread(&bufd, iov[nvec].iov_base, vlen);
+                       ret = __bufp_writebuf(rsk, sk, &bufd, flags);
+                       xnbufd_unmap_uread(&bufd);
+               } else {
+                       xnbufd_map_kread(&bufd, iov[nvec].iov_base, vlen);
+                       ret = __bufp_writebuf(rsk, sk, &bufd, flags);
+                       xnbufd_unmap_kread(&bufd);
+               }
+               if (ret < 0)
+                       goto fail;
+               iov[nvec].iov_base += vlen;
+               iov[nvec].iov_len -= vlen;
+               rdlen -= vlen;
+       }
+
+       rtdm_context_unlock(rcontext);
+
+       return len - rdlen;
+
+fail:
+       rtdm_context_unlock(rcontext);
+
+       return ret;
+}
+
+static ssize_t bufp_sendmsg(struct rtipc_private *priv,
+                           rtdm_user_info_t *user_info,
+                           const struct msghdr *msg, int flags)
+{
+       struct bufp_socket *sk = priv->state;
+       struct iovec iov[RTIPC_IOV_MAX];
+       struct sockaddr_ipc daddr;
+       ssize_t ret;
+
+       if (flags & ~(MSG_OOB | MSG_DONTWAIT))
+               return -EINVAL;
+
+       if (msg->msg_name) {
+               if (msg->msg_namelen != sizeof(struct sockaddr_ipc))
+                       return -EINVAL;
+
+               /* Fetch the destination address to send to. */
+               if (rtipc_get_arg(user_info, &daddr,
+                                 msg->msg_name, sizeof(daddr)))
+                       return -EFAULT;
+
+               if (daddr.sipc_port < 0 ||
+                   daddr.sipc_port >= CONFIG_XENO_OPT_BUFP_NRPORT)
+                       return -EINVAL;
+       } else {
+               if (msg->msg_namelen != 0)
+                       return -EINVAL;
+               daddr = sk->peer;
+               if (daddr.sipc_port < 0)
+                       return -ENOTCONN;
+       }
+
+       if (msg->msg_iovlen >= RTIPC_IOV_MAX)
+               return -EINVAL;
+
+       /* Copy I/O vector in */
+       if (rtipc_get_arg(user_info, iov, msg->msg_iov,
+                         sizeof(iov[0]) * msg->msg_iovlen))
+               return -EFAULT;
+
+       ret = __bufp_sendmsg(priv, user_info, iov,
+                            msg->msg_iovlen, flags, &daddr);
+       if (ret <= 0)
+               return ret;
+
+       /* Copy updated I/O vector back */
+       if (rtipc_put_arg(user_info, msg->msg_iov, iov,
+                         sizeof(iov[0]) * msg->msg_iovlen))
+               return -EFAULT;
+
+       return ret;
+}
+
+static ssize_t bufp_write(struct rtipc_private *priv,
+                         rtdm_user_info_t *user_info,
+                         const void *buf, size_t len)
+{
+       struct iovec iov = { .iov_base = (void *)buf, .iov_len = len };
+       struct bufp_socket *sk = priv->state;
+
+       if (sk->peer.sipc_port < 0)
+               return -EDESTADDRREQ;
+
+       return __bufp_sendmsg(priv, user_info, &iov, 1, 0, &sk->peer);
+}
+
+static int __bufp_bind_socket(struct bufp_socket *sk,
+                             struct sockaddr_ipc *sa)
+{
+       int ret = 0, port;
+
+       if (sa->sipc_family != AF_RTIPC)
+               return -EINVAL;
+
+       if (sa->sipc_port < -1 ||
+           sa->sipc_port >= CONFIG_XENO_OPT_BUFP_NRPORT)
+               return -EINVAL;
+
+       RTDM_EXECUTE_ATOMICALLY(
+               if (test_bit(_BUFP_BOUND, &sk->status) ||
+                   __test_and_set_bit(_BUFP_BINDING, &sk->status))
+                       ret = -EADDRINUSE;
+       );
+       if (ret)
+               return ret;
+
+       port = sa->sipc_port;
+       /* Will auto-select a free port number if unspec (-1). */
+       port = xnmap_enter(portmap, port, sk);
+       if (port < 0)
+               return port == -EEXIST ? -EADDRINUSE : -ENOMEM;
+
+       sa->sipc_port = port;
+
+       /*
+        * The caller must have told us how much memory is needed for
+        * buffer space via setsockopt(), before we got there.
+        */
+       if (sk->bufsz == 0)
+               return -EINVAL;
+
+       sk->bufmem = xnarch_alloc_host_mem(sk->bufsz);
+       if (sk->bufmem == NULL) {
+               ret = -ENOMEM;
+               goto fail;
+       }
+
+       sk->name = *sa;
+       /* Set default destination if unset at binding time. */
+       if (sk->peer.sipc_port < 0)
+               sk->peer = *sa;
+
+       if (*sk->label) {
+               ret = xnregistry_enter(sk->label, sk,
+                                      &sk->handle, &__bufp_pnode);
+               if (ret) {
+                       xnarch_free_host_mem(sk->bufmem, sk->bufsz);
+                       goto fail;
+               }
+       }
+
+       RTDM_EXECUTE_ATOMICALLY(
+               __clear_bit(_BUFP_BINDING, &sk->status);
+               __set_bit(_BUFP_BOUND, &sk->status);
+       );
+
+       return 0;
+fail:
+       xnmap_remove(portmap, port);
+       clear_bit(_BUFP_BINDING, &sk->status);
+       
+       return ret;
+}
+
+static int __bufp_connect_socket(struct bufp_socket *sk,
+                                struct sockaddr_ipc *sa)
+{
+       struct bufp_socket *rsk;
+       xnhandle_t h;
+       int ret;
+
+       if (sa == NULL) {
+               sa = &nullsa;
+               goto set_assoc;
+       }
+
+       if (sa->sipc_family != AF_RTIPC)
+               return -EINVAL;
+
+       if (sa->sipc_port < -1 ||
+           sa->sipc_port >= CONFIG_XENO_OPT_BUFP_NRPORT)
+               return -EINVAL;
+       /*
+        * - If a valid sipc_port is passed in the [0..NRPORT-1] range,
+        * it is used verbatim and the connection succeeds
+        * immediately, regardless of whether the destination is
+        * bound at the time of the call.
+        *
+        * - If sipc_port is -1 and a label was set via BUFP_SETLABEL,
+        * connect() blocks for the requested amount of time until a
+        * socket is bound to the same label, unless the internal
+        * timeout (see SO_RCVTIMEO) specifies a non-blocking
+        * operation (RTDM_TIMEOUT_NONE).
+        *
+        * - If sipc_port is -1 and no label is given, the default
+        * destination address is cleared, meaning that any subsequent
+        * write() to the socket will return -EDESTADDRREQ, until a
+        * valid destination address is set via connect() or bind().
+        *
+        * - In all other cases, -EINVAL is returned.
+        */
+       if (sa->sipc_port < 0 && *sk->label) {
+               ret = xnregistry_bind(sk->label,
+                                     sk->rx_timeout, XN_RELATIVE, &h);
+               if (ret)
+                       return ret;
+
+               RTDM_EXECUTE_ATOMICALLY(
+                       rsk = xnregistry_fetch(h);
+                       if (rsk == NULL || rsk->magic != BUFP_SOCKET_MAGIC)
+                               ret = -EINVAL;
+                       else
+                               /* Fetch labeled port number. */
+                               sa->sipc_port = rsk->name.sipc_port;
+               );
+               if (ret)
+                       return ret;
+       }
+
+set_assoc:
+       RTDM_EXECUTE_ATOMICALLY(
+               if (!test_bit(_BUFP_BOUND, &sk->status))
+                       /* Set default name. */
+                       sk->name = *sa;
+               /* Set default destination. */
+               sk->peer = *sa;
+       );
+
+       return 0;
+}
+
+static int __bufp_setsockopt(struct bufp_socket *sk,
+                            rtdm_user_info_t *user_info,
+                            void *arg)
+{
+       struct _rtdm_setsockopt_args sopt;
+       char label[BUFP_LABEL_LEN];
+       struct timeval tv;
+       int ret = 0;
+       size_t len;
+
+       if (rtipc_get_arg(user_info, &sopt, arg, sizeof(sopt)))
+               return -EFAULT;
+
+       if (sopt.level == SOL_SOCKET) {
+               switch (sopt.optname) {
+
+               case SO_RCVTIMEO:
+                       if (sopt.optlen != sizeof(tv))
+                               return -EINVAL;
+                       if (rtipc_get_arg(user_info, &tv,
+                                         sopt.optval, sizeof(tv)))
+                               return -EFAULT;
+                       sk->rx_timeout = rtipc_timeval_to_ns(&tv);
+                       break;
+
+               case SO_SNDTIMEO:
+                       if (sopt.optlen != sizeof(tv))
+                               return -EINVAL;
+                       if (rtipc_get_arg(user_info, &tv,
+                                         sopt.optval, sizeof(tv)))
+                               return -EFAULT;
+                       sk->tx_timeout = rtipc_timeval_to_ns(&tv);
+                       break;
+
+               default:
+                       ret = -EINVAL;
+               }
+
+               return ret;
+       }
+
+       if (sopt.level != SOL_RTIPC)
+               return -ENOPROTOOPT;
+
+       switch (sopt.optname) {
+
+       case BUFP_SETBUFFER:
+               if (sopt.optlen != sizeof(len))
+                       return -EINVAL;
+               if (rtipc_get_arg(user_info, &len,
+                                 sopt.optval, sizeof(len)))
+                       return -EFAULT;
+               if (len == 0)
+                       return -EINVAL;
+               RTDM_EXECUTE_ATOMICALLY(
+                       /*
+                        * We may not do this more than once, and we
+                        * have to do this before the first binding.
+                        */
+                       if (test_bit(_BUFP_BOUND, &sk->status) ||
+                           test_bit(_BUFP_BINDING, &sk->status))
+                               ret = -EALREADY;
+                       else
+                               sk->bufsz = len;
+               );
+               break;
+
+       case BUFP_SETLABEL:
+               if (sopt.optlen < sizeof(label))
+                       return -EINVAL;
+               if (rtipc_get_arg(user_info, label,
+                                 sopt.optval, sizeof(label) - 1))
+                       return -EFAULT;
+               RTDM_EXECUTE_ATOMICALLY(
+                       /*
+                        * We may attach a label to a client socket
+                        * which was previously bound in BUFP.
+                        */
+                       if (test_bit(_BUFP_BINDING, &sk->status))
+                               ret = -EALREADY;
+                       else {
+                               strcpy(sk->label, label);
+                               sk->label[BUFP_LABEL_LEN-1] = 0;
+                       }
+               );
+               break;
+
+       default:
+               ret = -EINVAL;
+       }
+
+       return ret;
+}
+
+static int __bufp_getsockopt(struct bufp_socket *sk,
+                            rtdm_user_info_t *user_info,
+                            void *arg)
+{
+       struct _rtdm_getsockopt_args sopt;
+       char label[BUFP_LABEL_LEN];
+       struct timeval tv;
+       socklen_t len;
+       int ret = 0;
+
+       if (rtipc_get_arg(user_info, &sopt, arg, sizeof(sopt)))
+               return -EFAULT;
+
+       if (rtipc_get_arg(user_info, &len, sopt.optlen, sizeof(len)))
+               return -EFAULT;
+
+       if (sopt.level == SOL_SOCKET) {
+               switch (sopt.optname) {
+
+               case SO_RCVTIMEO:
+                       if (len != sizeof(tv))
+                               return -EINVAL;
+                       rtipc_ns_to_timeval(&tv, sk->rx_timeout);
+                       if (rtipc_put_arg(user_info, sopt.optval,
+                                         &tv, sizeof(tv)))
+                               return -EFAULT;
+                       break;
+
+               case SO_SNDTIMEO:
+                       if (len != sizeof(tv))
+                               return -EINVAL;
+                       rtipc_ns_to_timeval(&tv, sk->tx_timeout);
+                       if (rtipc_put_arg(user_info, sopt.optval,
+                                         &tv, sizeof(tv)))
+                               return -EFAULT;
+                       break;
+
+               default:
+                       ret = -EINVAL;
+               }
+
+               return ret;
+       }
+
+       if (sopt.level != SOL_RTIPC)
+               return -ENOPROTOOPT;
+
+       switch (sopt.optname) {
+
+       case BUFP_GETLABEL:
+               if (len < sizeof(label))
+                       return -EINVAL;
+               RTDM_EXECUTE_ATOMICALLY(
+                       strcpy(label, sk->label);
+               );
+               if (rtipc_put_arg(user_info, sopt.optval,
+                                 label, sizeof(label)))
+                       return -EFAULT;
+               break;
+
+       default:
+               ret = -EINVAL;
+       }
+
+       return ret;
+}
+
+static int __bufp_ioctl(struct bufp_socket *sk,
+                       rtdm_user_info_t *user_info,
+                       unsigned int request, void *arg)
+{
+       struct sockaddr_ipc saddr, *saddrp = &saddr;
+       int ret = 0;
+
+       switch (request) {
+       
+       case _RTIOC_CONNECT:
+               ret = rtipc_get_sockaddr(user_info, arg, &saddrp);
+               if (ret)
+                       return ret;
+               ret = __bufp_connect_socket(sk, saddrp);
+               break;
+
+       case _RTIOC_BIND:
+               ret = rtipc_get_sockaddr(user_info, arg, &saddrp);
+               if (ret)
+                       return ret;
+               if (saddrp == NULL)
+                       return -EFAULT;
+               ret = __bufp_bind_socket(sk, saddrp);
+               break;
+
+       case _RTIOC_GETSOCKNAME:
+               ret = rtipc_put_sockaddr(user_info, arg, &sk->name);
+               break;
+
+       case _RTIOC_GETPEERNAME:
+               ret = rtipc_put_sockaddr(user_info, arg, &sk->peer);
+               break;
+
+       case _RTIOC_SETSOCKOPT:
+               ret = __bufp_setsockopt(sk, user_info, arg);
+               break;
+
+       case _RTIOC_GETSOCKOPT:
+               ret = __bufp_getsockopt(sk, user_info, arg);
+               break;
+
+       case _RTIOC_LISTEN:
+       case _RTIOC_ACCEPT:
+               ret = -EOPNOTSUPP;
+               break;
+
+       case _RTIOC_SHUTDOWN:
+               ret = -ENOTCONN;
+               break;
+
+       default:
+               ret = -EINVAL;
+       }
+
+       return ret;
+}
+
+static int bufp_ioctl(struct rtipc_private *priv,
+                     rtdm_user_info_t *user_info,
+                     unsigned int request, void *arg)
+{
+       struct bufp_socket *sk = priv->state;
+
+       if (rtdm_in_rt_context() && request == _RTIOC_BIND)
+               return -ENOSYS; /* Try downgrading to NRT */
+
+       return __bufp_ioctl(sk, user_info, request, arg);
+}
+
+static int __init bufp_init(void)
+{
+       portmap = xnmap_create(CONFIG_XENO_OPT_BUFP_NRPORT, 0, 0);
+       if (portmap == NULL)
+               return -ENOMEM;
+
+       return 0;
+}
+
+static void __exit bufp_exit(void)
+{
+       xnmap_delete(portmap);
+}
+
+struct rtipc_protocol bufp_proto_driver = {
+       .proto_name = "bufp",
+       .proto_statesz = sizeof(struct bufp_socket),
+       .proto_init = bufp_init,
+       .proto_exit = bufp_exit,
+       .proto_ops = {
+               .socket = bufp_socket,
+               .close = bufp_close,
+               .recvmsg = bufp_recvmsg,
+               .sendmsg = bufp_sendmsg,
+               .read = bufp_read,
+               .write = bufp_write,
+               .ioctl = bufp_ioctl,
+       }
+};
diff --git a/ksrc/drivers/ipc/internal.h b/ksrc/drivers/ipc/internal.h
index 57c675f..adf43dc 100644
--- a/ksrc/drivers/ipc/internal.h
+++ b/ksrc/drivers/ipc/internal.h
@@ -98,6 +98,22 @@ extern struct rtipc_protocol xddp_proto_driver;
 
 extern struct rtipc_protocol iddp_proto_driver;
 
+extern struct rtipc_protocol bufp_proto_driver;
+
 extern struct xnptree rtipc_ptree;
 
+#define rtipc_wait_context             xnthread_wait_context
+#define rtipc_prepare_wait             xnthread_prepare_wait
+#define rtipc_finish_wait              xnthread_finish_wait
+#define rtipc_get_wait_context         xnthread_get_wait_context
+
+#define rtipc_peek_wait_head(obj)      xnsynch_peek_pendq(&(obj)->synch_base)
+#define rtipc_enter_atomic(lockctx)    xnlock_get_irqsave(&nklock, (lockctx))
+#define rtipc_leave_atomic(lockctx)    xnlock_put_irqrestore(&nklock, (lockctx))
+
+static inline int rtipc_socket_locked(struct rtdm_dev_context *context)
+{
+       return atomic_read(&context->close_lock_count);
+}
+
 #endif /* !_RTIPC_INTERNAL_H */
diff --git a/ksrc/drivers/ipc/rtipc.c b/ksrc/drivers/ipc/rtipc.c
index 5e695ea..d529912 100644
--- a/ksrc/drivers/ipc/rtipc.c
+++ b/ksrc/drivers/ipc/rtipc.c
@@ -30,6 +30,9 @@ static struct rtipc_protocol *protocols[IPCPROTO_MAX] = {
 #ifdef CONFIG_XENO_DRIVERS_RTIPC_IDDP
        [IPCPROTO_IDDP - 1] = &iddp_proto_driver,
 #endif
+#ifdef CONFIG_XENO_DRIVERS_RTIPC_BUFP
+       [IPCPROTO_BUFP - 1] = &bufp_proto_driver,
+#endif
 };
 
 #ifdef CONFIG_PROC_FS

