Module: xenomai-head
Branch: master
Commit: d1fb5d409c9f4056a7d4d565cc5422d885297884
URL:    
http://git.xenomai.org/?p=xenomai-head.git;a=commit;h=d1fb5d409c9f4056a7d4d565cc5422d885297884

Author: Philippe Gerum <r...@xenomai.org>
Date:   Mon Sep  7 18:30:16 2009 +0200

rtipc/iddp: introduce local per-socket datagram pool

---

 examples/rtdm/profiles/ipc/iddp-sendrecv.c |   12 ++
 include/rtdm/rtipc.h                       |    3 +-
 ksrc/drivers/ipc/Kconfig                   |   15 --
 ksrc/drivers/ipc/iddp.c                    |  226 ++++++++++++++++++++--------
 4 files changed, 176 insertions(+), 80 deletions(-)

diff --git a/examples/rtdm/profiles/ipc/iddp-sendrecv.c 
b/examples/rtdm/profiles/ipc/iddp-sendrecv.c
index feca754..78ae564 100644
--- a/examples/rtdm/profiles/ipc/iddp-sendrecv.c
+++ b/examples/rtdm/profiles/ipc/iddp-sendrecv.c
@@ -56,12 +56,24 @@ void *server(void *arg)
        struct sockaddr_ipc saddr, claddr;
        socklen_t addrlen;
        char buf[128];
+       size_t poolsz;
        int ret, s;
 
        s = socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_IDDP);
        if (s < 0)
                fail("socket");
 
+       /*
+        * Set a local 32k pool for the server endpoint. Memory needed
+        * to convey datagrams will be pulled from this pool, instead
+        * of Xenomai's system pool.
+        */
+       poolsz = 32768; /* bytes */
+       ret = setsockopt(s, SOL_RTIPC, IDDP_SETLOCALPOOL,
+                        &poolsz, sizeof(poolsz));
+       if (ret)
+               fail("setsockopt");
+
        saddr.sipc_family = AF_RTIPC;
        saddr.sipc_port = IDDP_SVPORT;
        ret = bind(s, (struct sockaddr *)&saddr, sizeof(saddr));
diff --git a/include/rtdm/rtipc.h b/include/rtdm/rtipc.h
index f264507..d49c492 100644
--- a/include/rtdm/rtipc.h
+++ b/include/rtdm/rtipc.h
@@ -86,7 +86,8 @@ enum {
 #define XDDP_GETLABEL          6
 #define IDDP_SETTXTIMEOUT      7
 #define IDDP_SETRXTIMEOUT      8
-#define IDDP_GETSTALLCOUNT     9
+#define IDDP_SETLOCALPOOL      9
+#define IDDP_GETSTALLCOUNT     10
 
 #define XDDP_LABEL_LEN         XNOBJECT_NAME_LEN
 
diff --git a/ksrc/drivers/ipc/Kconfig b/ksrc/drivers/ipc/Kconfig
index d6e614e..38db875 100644
--- a/ksrc/drivers/ipc/Kconfig
+++ b/ksrc/drivers/ipc/Kconfig
@@ -41,21 +41,6 @@ config XENO_DRIVERS_RTIPC_IDDP
        Xenomai's IDDP protocol enables real-time threads to exchange
        datagrams within the Xenomai domain.
 
-config XENO_OPT_IDDP_POOLSZ
-       depends on XENO_DRIVERS_RTIPC_IDDP
-       int "Size of message pool (Kb)"
-       default 32
-       help
-
-       Memory consumed by outstanding datagrams is obtained from a
-       local memory pool managed by the IDDP protocol driver. This
-       parameter selects the size of such pool.  The size is
-       expressed in Kilobytes.
-
-       NOTE: senders may stall until enough memory is available from
-       the pool, as the receivers consume the pending
-       datagrams. Therefore, this parameter may affect performances.
-
 config XENO_OPT_IDDP_NRPORT
        depends on XENO_DRIVERS_RTIPC_IDDP
        int "Number of communication ports"
diff --git a/ksrc/drivers/ipc/iddp.c b/ksrc/drivers/ipc/iddp.c
index 6a2aec6..7cdcae8 100644
--- a/ksrc/drivers/ipc/iddp.c
+++ b/ksrc/drivers/ipc/iddp.c
@@ -40,12 +40,22 @@ struct iddp_socket {
        struct sockaddr_ipc name;
        struct sockaddr_ipc peer;
 
+       struct xnheap *bufpool;
+       struct xnheap privpool;
+       rtdm_event_t *poolevt;
+       rtdm_event_t privevt;
+       int *poolwait;
+       int privwait;
+       size_t poolsz;
        rtdm_sem_t insem;
        struct list_head inq;
+       u_long status;
 
        nanosecs_rel_t rx_timeout;
        nanosecs_rel_t tx_timeout;
        unsigned long stalls;   /* Buffer stall counter. */
+
+       struct rtipc_private *priv;
 };
 
 static struct sockaddr_ipc nullsa = {
@@ -55,20 +65,14 @@ static struct sockaddr_ipc nullsa = {
 
 static struct iddp_socket *portmap[CONFIG_XENO_OPT_IDDP_NRPORT];
 
-static struct xnheap msgpool;
-
 static rtdm_event_t poolevt;
 
 static int poolwait;
 
-static void *poolmem;
-
-static unsigned long poolsz = CONFIG_XENO_OPT_IDDP_POOLSZ;
-module_param_named(poolsz, poolsz, ulong, 0444);
-MODULE_PARM_DESC(poolsz, "Size of the IDDP message pool (in Kbytes)");
-
 #define MAX_IOV_NUMBER  64
 
+#define _IDDP_BINDING  0
+
 static inline void __iddp_init_mbuf(struct iddp_message *mbuf, size_t len)
 {
        mbuf->rdoff = 0;
@@ -76,17 +80,18 @@ static inline void __iddp_init_mbuf(struct iddp_message 
*mbuf, size_t len)
        INIT_LIST_HEAD(&mbuf->next);
 }
 
-static struct iddp_message *__iddp_alloc_mbuf(struct iddp_socket *sk,
-                                             size_t len, int flags, int *pret)
+static struct iddp_message *
+__iddp_alloc_mbuf(struct iddp_socket *sk, size_t len,
+                 nanosecs_rel_t timeout, int flags, int *pret)
 {
        struct iddp_message *mbuf = NULL;
        rtdm_toseq_t timeout_seq;
        int ret = 0;
 
-       rtdm_toseq_init(&timeout_seq, sk->tx_timeout);
+       rtdm_toseq_init(&timeout_seq, timeout);
 
        for (;;) {
-               mbuf = xnheap_alloc(&msgpool, len + sizeof(*mbuf));
+               mbuf = xnheap_alloc(sk->bufpool, len + sizeof(*mbuf));
                if (mbuf) {
                        __iddp_init_mbuf(mbuf, len);
                        break;
@@ -108,10 +113,11 @@ static struct iddp_message *__iddp_alloc_mbuf(struct 
iddp_socket *sk,
                         * by this construct.
                         */
                        ++sk->stalls;
-                       ++poolwait;
-                       ret = rtdm_event_timedwait(&poolevt, sk->tx_timeout,
+                       ++sk->poolwait;
+                       ret = rtdm_event_timedwait(sk->poolevt,
+                                                  timeout,
                                                   &timeout_seq);
-                       poolwait--;
+                       sk->poolwait--;
                        if (unlikely(ret == -EIDRM))
                                ret = -ECONNRESET;
                );
@@ -124,16 +130,23 @@ static struct iddp_message *__iddp_alloc_mbuf(struct 
iddp_socket *sk,
        return mbuf;
 }
 
-static void __iddp_free_mbuf(struct iddp_message *mbuf)
+static void __iddp_free_mbuf(struct iddp_socket *sk,
+                            struct iddp_message *mbuf)
 {
-       xnheap_free(&msgpool, mbuf);
+       xnheap_free(sk->bufpool, mbuf);
        RTDM_EXECUTE_ATOMICALLY(
                /* Wake up sleepers if any. */
-               if (poolwait > 0)
-                       rtdm_event_pulse(&poolevt);
+               if (*sk->poolwait > 0)
+                       rtdm_event_pulse(sk->poolevt);
        );
 }
 
+static void __iddp_flush_pool(struct xnheap *heap,
+                             void *poolmem, u_long poolsz, void *cookie)
+{
+       xnarch_free_host_mem(poolmem, poolsz);
+}
+
 static int iddp_socket(struct rtipc_private *priv,
                       rtdm_user_info_t *user_info)
 {
@@ -142,10 +155,16 @@ static int iddp_socket(struct rtipc_private *priv,
        rtdm_sem_init(&sk->insem, 0);
        sk->name = nullsa;      /* Unbound */
        sk->peer = nullsa;
+       sk->bufpool = &kheap;
+       sk->poolevt = &poolevt;
+       sk->poolwait = &poolwait;
+       sk->poolsz = 0;
+       sk->status = 0;
        sk->rx_timeout = RTDM_TIMEOUT_INFINITE;
        sk->tx_timeout = RTDM_TIMEOUT_INFINITE;
        sk->stalls = 0;
        INIT_LIST_HEAD(&sk->inq);
+       sk->priv = priv;
 
        return 0;
 }
@@ -160,17 +179,23 @@ static int iddp_close(struct rtipc_private *priv,
        RTDM_EXECUTE_ATOMICALLY(
                if (sk->name.sipc_port > -1)
                        portmap[sk->name.sipc_port] = NULL;
-               list_splice(&sk->inq, &head);
        );
 
+       rtdm_sem_destroy(&sk->insem);
+
+       if (sk->bufpool != &kheap) {
+               xnheap_destroy(&sk->privpool, __iddp_flush_pool, NULL);
+               return 0;
+       }
+
+       /* Send unread datagrams back to the system heap. */
+       list_splice(&sk->inq, &head);
        while (!list_empty(&head)) {
                mbuf = list_entry(head.next, struct iddp_message, next);
                list_del(&mbuf->next);
-               __iddp_free_mbuf(mbuf);
+               __iddp_free_mbuf(sk, mbuf);
        }
 
-       rtdm_sem_destroy(&sk->insem);
-
        return 0;
 }
 
@@ -242,7 +267,7 @@ static ssize_t __iddp_recvmsg(struct rtipc_private *priv,
        }
 
        if (dofree)
-               __iddp_free_mbuf(mbuf);
+               __iddp_free_mbuf(sk, mbuf);
 
        return ret ?: len;
 }
@@ -307,6 +332,7 @@ static ssize_t __iddp_sendmsg(struct rtipc_private *priv,
                              const struct sockaddr_ipc *daddr)
 {
        struct iddp_socket *sk = priv->state, *rsk;
+       struct rtdm_dev_context *rcontext = NULL; /* Fake GCC */
        struct iddp_message *mbuf;
        int nvec, wroff, ret, to;
        ssize_t len, rdlen, vlen;
@@ -322,9 +348,26 @@ static ssize_t __iddp_sendmsg(struct rtipc_private *priv,
        if (len == 0)
                return 0;
 
-       mbuf = __iddp_alloc_mbuf(sk, len, flags, &ret);
-       if (unlikely(ret))
+       to = daddr->sipc_port;
+
+       RTDM_EXECUTE_ATOMICALLY(
+               rsk = portmap[to];
+               if (unlikely(rsk == NULL))
+                       ret = -ECONNRESET;
+               else {
+                       rcontext = rtdm_private_to_context(rsk->priv);
+                       rtdm_context_lock(rcontext);
+                       ret = 0;
+               }
+       );
+       if (ret)
+               return ret;
+
+       mbuf = __iddp_alloc_mbuf(rsk, len, flags, sk->tx_timeout, &ret);
+       if (unlikely(ret)) {
+               rtdm_context_unlock(rcontext);
                return ret;
+       }
 
        /* Now, move "len" bytes to mbuf->data from the vector cells */
        for (nvec = 0, rdlen = len, wroff = 0; rdlen > 0; nvec++) {
@@ -341,10 +384,12 @@ static ssize_t __iddp_sendmsg(struct rtipc_private *priv,
                wroff += vlen;
        }
 
-       to = daddr->sipc_port;
-
        RTDM_EXECUTE_ATOMICALLY(
                rsk = portmap[to];
+               /*
+                * IDDP ports may be unbound dynamically, and we only
+                * hold closure, so we have to test this again.
+                */
                if (unlikely(rsk == NULL))
                        ret = -ECONNRESET;
                else {
@@ -358,10 +403,13 @@ static ssize_t __iddp_sendmsg(struct rtipc_private *priv,
        );
        if (unlikely(ret)) {
        fail:
-               __iddp_free_mbuf(mbuf);
+               __iddp_free_mbuf(rsk, mbuf);
+               rtdm_context_unlock(rcontext);
                return ret;
        }
 
+       rtdm_context_unlock(rcontext);
+
        return len;
 }
 
@@ -434,6 +482,10 @@ static ssize_t iddp_write(struct rtipc_private *priv,
 static int __iddp_bind_socket(struct iddp_socket *sk,
                              struct sockaddr_ipc *sa)
 {
+       void *poolmem;
+       size_t poolsz;
+       int ret = 0;
+
        if (sa == NULL) {
                sa = &nullsa;
                goto set_binding;
@@ -446,6 +498,36 @@ static int __iddp_bind_socket(struct iddp_socket *sk,
            sa->sipc_port >= CONFIG_XENO_OPT_IDDP_NRPORT)
                return -EINVAL;
 
+       if (test_and_set_bit(_IDDP_BINDING, &sk->status))
+               return -EINPROGRESS;
+
+       /*
+        * Allocate a local buffer pool if we were told to do so via
+        * setsockopt() before we got there.
+        */
+       poolsz = sk->poolsz;
+       if (poolsz > 0) {
+               poolsz = xnheap_rounded_size(poolsz, XNHEAP_PAGE_SIZE);
+               poolmem = xnarch_alloc_host_mem(poolsz);
+               if (poolmem == NULL) {
+                       ret = -ENOMEM;
+                       goto out;
+               }
+
+               ret = xnheap_init(&sk->privpool,
+                                 poolmem, poolsz, XNHEAP_PAGE_SIZE);
+               if (ret) {
+                       xnarch_free_host_mem(poolmem, poolsz);
+                       goto out;
+               }
+
+               RTDM_EXECUTE_ATOMICALLY(
+                       sk->poolevt = &sk->privevt;
+                       sk->poolwait = &sk->privwait;
+                       sk->bufpool = &sk->privpool;
+               );
+       }
+
  set_binding:
        RTDM_EXECUTE_ATOMICALLY(
                if (sk->name.sipc_port >= 0) /* Clear previous binding. */
@@ -455,7 +537,10 @@ static int __iddp_bind_socket(struct iddp_socket *sk,
                sk->name = *sa;
        );
 
-       return 0;
+out:
+       clear_bit(_IDDP_BINDING, &sk->status);
+
+       return ret;
 }
 
 static int __iddp_connect_socket(struct iddp_socket *sk,
@@ -548,6 +633,7 @@ static int __iddp_setsockopt(struct iddp_socket *sk,
        struct _rtdm_setsockopt_args sopt;
        nanosecs_rel_t timeout;
        int ret = 0;
+       size_t len;
 
        if (rtipc_get_arg(user_info, &sopt, arg, sizeof(sopt)))
                return -EFAULT;
@@ -575,6 +661,27 @@ static int __iddp_setsockopt(struct iddp_socket *sk,
                sk->tx_timeout = timeout;
                break;
 
+       case IDDP_SETLOCALPOOL:
+               if (sopt.optlen != sizeof(len))
+                       return -EINVAL;
+               if (rtipc_get_arg(user_info, &len,
+                                 sopt.optval, sizeof(len)))
+                       return -EFAULT;
+               if (len == 0)
+                       return -EINVAL;
+               RTDM_EXECUTE_ATOMICALLY(
+                       /*
+                        * We may not do this more than once, and we
+                        * have to do this before the first binding.
+                        */
+                       if (test_bit(_IDDP_BINDING, &sk->status) ||
+                           sk->bufpool != &kheap)
+                               ret = -EALREADY;
+                       else
+                               sk->poolsz = len;
+               );
+               break;
+
        case IDDP_GETSTALLCOUNT:
                if (rtipc_put_arg(user_info, arg,
                                  &sk->stalls, sizeof(sk->stalls)))
@@ -623,12 +730,11 @@ static int __iddp_getsockopt(struct iddp_socket *sk,
        return ret;
 }
 
-static int iddp_ioctl(struct rtipc_private *priv,
-                     rtdm_user_info_t *user_info,
-                     unsigned int request, void *arg)
+static int __iddp_ioctl(struct iddp_socket *sk,
+                       rtdm_user_info_t *user_info,
+                       unsigned int request, void *arg)
 {
        struct sockaddr_ipc saddr, *saddrp = &saddr;
-       struct iddp_socket *sk = priv->state;
        int ret = 0;
 
        switch (request) {
@@ -641,11 +747,7 @@ static int iddp_ioctl(struct rtipc_private *priv,
                break;
 
        case _RTIOC_BIND:
-               ret = __iddp_getuser_address(user_info, arg, &saddrp);
-               if (ret)
-                       return ret;
-               ret = __iddp_bind_socket(sk, saddrp);
-               break;
+               return -ENOSYS; /* Downgrade to NRT */
 
        case _RTIOC_GETSOCKNAME:
                ret = __iddp_putuser_address(user_info, arg, &sk->name);
@@ -679,46 +781,42 @@ static int iddp_ioctl(struct rtipc_private *priv,
        return ret;
 }
 
-static int __init iddp_init(void)
+static int iddp_ioctl(struct rtipc_private *priv,
+                     rtdm_user_info_t *user_info,
+                     unsigned int request, void *arg)
 {
-       int ret = -ENOMEM;
+       struct iddp_socket *sk = priv->state;
+       struct sockaddr_ipc saddr, *saddrp;
+       int ret;
 
-       if (poolsz < 4) {
-               printk(KERN_ERR "invalid pool size (min 4k)\n");
-               goto failed;
-       }
+       if (rtdm_in_rt_context() || request != _RTIOC_BIND)
+               return __iddp_ioctl(sk, user_info, request, arg);
 
-       poolmem = vmalloc(poolsz * 1024);
-       if (poolmem == NULL) {
-               printk(KERN_ERR
-                      "vmalloc(%lu) failed for message pool\n",
-                      poolsz * 1024);
-               goto failed;
-       }
+       saddrp = &saddr;
+       ret = __iddp_getuser_address(user_info, arg, &saddrp);
+       if (ret)
+               return ret;
 
-       ret = xnheap_init(&msgpool, poolmem, poolsz * 1024, 512);
-       if (ret) {
-               printk(KERN_ERR
-                      "xnheap_create() failed for poolsz = %luK\n",
-                      poolsz);
-               goto cleanup_poolmem;
-       }
+       return __iddp_bind_socket(sk, saddrp);
+}
 
+static int __init iddp_init(void)
+{
        rtdm_event_init(&poolevt, 0);
 
        return 0;
+}
 
-cleanup_poolmem:
-       vfree(poolmem);
-
-failed:
-       return ret;
+static void __exit iddp_exit(void)
+{
+       rtdm_event_destroy(&poolevt);
 }
 
 struct rtipc_protocol iddp_proto_driver = {
        .proto_name = "iddp",
        .proto_statesz = sizeof(struct iddp_socket),
        .proto_init = iddp_init,
+       .proto_exit = iddp_exit,
        .proto_ops = {
                .socket = iddp_socket,
                .close = iddp_close,


_______________________________________________
Xenomai-git mailing list
Xenomai-git@gna.org
https://mail.gna.org/listinfo/xenomai-git

Reply via email to