Module: xenomai-forge
Branch: next
Commit: 7c32e355d80ec87a465ca57e0c346d6fbc93ef17
URL:    http://git.xenomai.org/?p=xenomai-forge.git;a=commit;h=7c32e355d80ec87a465ca57e0c346d6fbc93ef17

Author: Philippe Gerum <r...@xenomai.org>
Date:   Tue Aug 12 21:33:41 2014 +0200

drivers/ipc: add support for select()

---

 kernel/drivers/ipc/bufp.c     |  128 ++++++++++++++++++++++++++++----------
 kernel/drivers/ipc/iddp.c     |  138 +++++++++++++++++++++++++++++------------
 kernel/drivers/ipc/internal.h |   26 ++++----
 kernel/drivers/ipc/rtipc.c    |   89 +++++++++++++++++++-------
 kernel/drivers/ipc/xddp.c     |  120 +++++++++++++++++++++++++----------
 5 files changed, 355 insertions(+), 146 deletions(-)

diff --git a/kernel/drivers/ipc/bufp.c b/kernel/drivers/ipc/bufp.c
index 8b935a7..46abbe0 100644
--- a/kernel/drivers/ipc/bufp.c
+++ b/kernel/drivers/ipc/bufp.c
@@ -17,7 +17,6 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  */
-
 #include <linux/module.h>
 #include <linux/list.h>
 #include <linux/kernel.h>
@@ -26,6 +25,7 @@
 #include <cobalt/kernel/heap.h>
 #include <cobalt/kernel/map.h>
 #include <cobalt/kernel/bufd.h>
+#include <linux/poll.h>
 #include <rtdm/ipc.h>
 #include "internal.h"
 
@@ -69,8 +69,9 @@ static struct sockaddr_ipc nullsa = {
 
 static struct xnmap *portmap;
 
-#define _BUFP_BINDING  0
-#define _BUFP_BOUND    1
+#define _BUFP_BINDING   0
+#define _BUFP_BOUND     1
+#define _BUFP_CONNECTED 2
 
 #ifdef CONFIG_XENO_OPT_VFILE
 
@@ -102,9 +103,9 @@ static struct xnpnode_link __bufp_pnode = {
 
 #endif /* !CONFIG_XENO_OPT_VFILE */
 
-static int bufp_socket(struct rtipc_private *priv,
-                      struct rtdm_fd *fd)
+static int bufp_socket(struct rtdm_fd *fd)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct bufp_socket *sk = priv->state;
 
        sk->magic = BUFP_SOCKET_MAGIC;
@@ -129,9 +130,9 @@ static int bufp_socket(struct rtipc_private *priv,
        return 0;
 }
 
-static void bufp_close(struct rtipc_private *priv,
-               struct rtdm_fd *fd)
+static void bufp_close(struct rtdm_fd *fd)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct bufp_socket *sk = priv->state;
        rtdm_lockctx_t s;
 
@@ -166,6 +167,7 @@ static ssize_t __bufp_readbuf(struct bufp_socket *sk,
        rtdm_lockctx_t s;
        u_long rdtoken;
        off_t rdoff;
+       int resched;
 
        len = bufd->b_len;
 
@@ -224,6 +226,13 @@ redo:
                sk->rdoff = rdoff;
                ret = len;
 
+               resched = 0;
+               if (sk->fillsz + len == sk->bufsz) /* -> writable */
+                       resched |= xnselect_signal(&sk->priv->send_block, POLLOUT);
+
+               if (sk->fillsz == 0) /* -> non-readable */
+                       resched |= xnselect_signal(&sk->priv->recv_block, 0);
+
                /*
                 * Wake up all threads pending on the output wait
                 * queue, if we freed enough room for the leading one
@@ -237,7 +246,10 @@ redo:
                XENO_BUGON(NUCLEUS, wc == NULL);
                bufwc = container_of(wc, struct bufp_wait_context, wc);
                if (bufwc->len + sk->fillsz <= sk->bufsz)
+                       /* This call rescheds internally. */
                        rtdm_event_pulse(&sk->o_event);
+               else if (resched)
+                       xnsched_run();
                /*
                 * We cannot fail anymore once some data has been
                 * copied via the buffer descriptor, so no need to
@@ -281,11 +293,11 @@ out:
        return ret;
 }
 
-static ssize_t __bufp_recvmsg(struct rtipc_private *priv,
-                             struct rtdm_fd *fd,
+static ssize_t __bufp_recvmsg(struct rtdm_fd *fd,
                              struct iovec *iov, int iovlen, int flags,
                              struct sockaddr_ipc *saddr)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct bufp_socket *sk = priv->state;
        ssize_t len, wrlen, vlen, ret;
        struct xnbufd bufd;
@@ -343,8 +355,7 @@ static ssize_t __bufp_recvmsg(struct rtipc_private *priv,
        return len - wrlen;
 }
 
-static ssize_t bufp_recvmsg(struct rtipc_private *priv,
-                           struct rtdm_fd *fd,
+static ssize_t bufp_recvmsg(struct rtdm_fd *fd,
                            struct msghdr *msg, int flags)
 {
        struct iovec iov[RTIPC_IOV_MAX];
@@ -368,8 +379,7 @@ static ssize_t bufp_recvmsg(struct rtipc_private *priv,
                          sizeof(iov[0]) * msg->msg_iovlen))
                return -EFAULT;
 
-       ret = __bufp_recvmsg(priv, fd,
-                            iov, msg->msg_iovlen, flags, &saddr);
+       ret = __bufp_recvmsg(fd, iov, msg->msg_iovlen, flags, &saddr);
        if (ret <= 0)
                return ret;
 
@@ -389,12 +399,11 @@ static ssize_t bufp_recvmsg(struct rtipc_private *priv,
        return ret;
 }
 
-static ssize_t bufp_read(struct rtipc_private *priv,
-                        struct rtdm_fd *fd,
-                        void *buf, size_t len)
+static ssize_t bufp_read(struct rtdm_fd *fd, void *buf, size_t len)
 {
        struct iovec iov = { .iov_base = buf, .iov_len = len };
-       return __bufp_recvmsg(priv, fd, &iov, 1, 0, NULL);
+
+       return __bufp_recvmsg(fd, &iov, 1, 0, NULL);
 }
 
 static ssize_t __bufp_writebuf(struct bufp_socket *rsk,
@@ -411,6 +420,7 @@ static ssize_t __bufp_writebuf(struct bufp_socket *rsk,
        size_t wbytes, n;
        u_long wrtoken;
        off_t wroff;
+       int resched;
 
        len = bufd->b_len;
 
@@ -467,7 +477,13 @@ redo:
                rsk->fillsz += len;
                rsk->wroff = wroff;
                ret = len;
+               resched = 0;
+
+               if (rsk->fillsz == len) /* -> readable */
+                       resched |= xnselect_signal(&rsk->priv->recv_block, POLLIN);
 
+               if (rsk->fillsz == rsk->bufsz) /* non-writable */
+                       resched |= xnselect_signal(&rsk->priv->send_block, 0);
                /*
                 * Wake up all threads pending on the input wait
                 * queue, if we accumulated enough data to feed the
@@ -482,13 +498,14 @@ redo:
                bufwc = container_of(wc, struct bufp_wait_context, wc);
                if (bufwc->len <= rsk->fillsz)
                        rtdm_event_pulse(&rsk->i_event);
+               else if (resched)
+                       xnsched_run();
                /*
                 * We cannot fail anymore once some data has been
                 * copied via the buffer descriptor, so no need to
                 * check for any reason to invalidate the latter.
                 */
                goto out;
-
        wait:
                if (flags & MSG_DONTWAIT) {
                        ret = -EWOULDBLOCK;
@@ -513,11 +530,11 @@ out:
        return ret;
 }
 
-static ssize_t __bufp_sendmsg(struct rtipc_private *priv,
-                             struct rtdm_fd *fd,
+static ssize_t __bufp_sendmsg(struct rtdm_fd *fd,
                              struct iovec *iov, int iovlen, int flags,
                              const struct sockaddr_ipc *daddr)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct bufp_socket *sk = priv->state, *rsk;
        ssize_t len, rdlen, vlen, ret = 0;
        struct rtdm_fd *rfd;
@@ -586,10 +603,10 @@ fail:
        return ret;
 }
 
-static ssize_t bufp_sendmsg(struct rtipc_private *priv,
-                           struct rtdm_fd *fd,
+static ssize_t bufp_sendmsg(struct rtdm_fd *fd,
                            const struct msghdr *msg, int flags)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct bufp_socket *sk = priv->state;
        struct iovec iov[RTIPC_IOV_MAX];
        struct sockaddr_ipc daddr;
@@ -626,8 +643,7 @@ static ssize_t bufp_sendmsg(struct rtipc_private *priv,
                          sizeof(iov[0]) * msg->msg_iovlen))
                return -EFAULT;
 
-       ret = __bufp_sendmsg(priv, fd, iov,
-                            msg->msg_iovlen, flags, &daddr);
+       ret = __bufp_sendmsg(fd, iov, msg->msg_iovlen, flags, &daddr);
        if (ret <= 0)
                return ret;
 
@@ -639,17 +655,17 @@ static ssize_t bufp_sendmsg(struct rtipc_private *priv,
        return ret;
 }
 
-static ssize_t bufp_write(struct rtipc_private *priv,
-                         struct rtdm_fd *fd,
+static ssize_t bufp_write(struct rtdm_fd *fd,
                          const void *buf, size_t len)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iovec iov = { .iov_base = (void *)buf, .iov_len = len };
        struct bufp_socket *sk = priv->state;
 
        if (sk->peer.sipc_port < 0)
                return -EDESTADDRREQ;
 
-       return __bufp_sendmsg(priv, fd, &iov, 1, 0, &sk->peer);
+       return __bufp_sendmsg(fd, &iov, 1, 0, &sk->peer);
 }
 
 static int __bufp_bind_socket(struct rtipc_private *priv,
@@ -717,6 +733,8 @@ static int __bufp_bind_socket(struct rtipc_private *priv,
        cobalt_atomic_enter(s);
        __clear_bit(_BUFP_BINDING, &sk->status);
        __set_bit(_BUFP_BOUND, &sk->status);
+       if (xnselect_signal(&priv->send_block, POLLOUT))
+               xnsched_run();
        cobalt_atomic_leave(s);
 
        return 0;
@@ -731,9 +749,9 @@ static int __bufp_connect_socket(struct bufp_socket *sk,
                                 struct sockaddr_ipc *sa)
 {
        struct bufp_socket *rsk;
+       int ret, resched = 0;
        rtdm_lockctx_t s;
        xnhandle_t h;
-       int ret;
 
        if (sa == NULL) {
                sa = &nullsa;
@@ -773,9 +791,11 @@ static int __bufp_connect_socket(struct bufp_socket *sk,
                rsk = xnregistry_lookup(h, NULL);
                if (rsk == NULL || rsk->magic != BUFP_SOCKET_MAGIC)
                        ret = -EINVAL;
-               else
+               else {
                        /* Fetch labeled port number. */
                        sa->sipc_port = rsk->name.sipc_port;
+                       resched = xnselect_signal(&sk->priv->send_block, POLLOUT);
+               }
                cobalt_atomic_leave(s);
                if (ret)
                        return ret;
@@ -788,6 +808,12 @@ set_assoc:
                sk->name = *sa;
        /* Set default destination. */
        sk->peer = *sa;
+       if (sa->sipc_port < 0)
+               __clear_bit(_BUFP_CONNECTED, &sk->status);
+       else
+               __set_bit(_BUFP_CONNECTED, &sk->status);
+       if (resched)
+               xnsched_run();
        cobalt_atomic_leave(s);
 
        return 0;
@@ -956,10 +982,10 @@ static int __bufp_getsockopt(struct bufp_socket *sk,
        return ret;
 }
 
-static int __bufp_ioctl(struct rtipc_private *priv,
-                       struct rtdm_fd *fd,
+static int __bufp_ioctl(struct rtdm_fd *fd,
                        unsigned int request, void *arg)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct sockaddr_ipc saddr, *saddrp = &saddr;
        struct bufp_socket *sk = priv->state;
        int ret = 0;
@@ -1014,14 +1040,47 @@ static int __bufp_ioctl(struct rtipc_private *priv,
        return ret;
 }
 
-static int bufp_ioctl(struct rtipc_private *priv,
-                     struct rtdm_fd *fd,
+static int bufp_ioctl(struct rtdm_fd *fd,
                      unsigned int request, void *arg)
 {
        if (rtdm_in_rt_context() && request == _RTIOC_BIND)
                return -ENOSYS; /* Try downgrading to NRT */
 
-       return __bufp_ioctl(priv, fd, request, arg);
+       return __bufp_ioctl(fd, request, arg);
+}
+
+static unsigned int bufp_pollstate(struct rtdm_fd *fd)
+{
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
+       struct bufp_socket *sk = priv->state, *rsk;
+       unsigned int mask = 0;
+       struct rtdm_fd *rfd;
+       spl_t s;
+
+       cobalt_atomic_enter(s);
+
+       if (test_bit(_BUFP_BOUND, &sk->status) && sk->fillsz > 0)
+               mask |= POLLIN;
+
+       /*
+        * If the socket is connected, POLLOUT means that the peer
+        * exists, is bound and can receive data. Otherwise POLLOUT is
+        * always set, assuming the client is likely to use explicit
+        * addressing in send operations.
+        */
+       if (test_bit(_BUFP_CONNECTED, &sk->status)) {
+               rfd = xnmap_fetch_nocheck(portmap, sk->peer.sipc_port);
+               if (rfd) {
+                       rsk = rtipc_fd_to_state(rfd);
+                       if (rsk->fillsz < rsk->bufsz)
+                               mask |= POLLOUT;
+               }
+       } else
+               mask |= POLLOUT;
+
+       cobalt_atomic_leave(s);
+
+       return mask;
 }
 
 static int bufp_init(void)
@@ -1051,5 +1110,6 @@ struct rtipc_protocol bufp_proto_driver = {
                .read = bufp_read,
                .write = bufp_write,
                .ioctl = bufp_ioctl,
+               .pollstate = bufp_pollstate,
        }
 };
diff --git a/kernel/drivers/ipc/iddp.c b/kernel/drivers/ipc/iddp.c
index b8677fd..82ddebb 100644
--- a/kernel/drivers/ipc/iddp.c
+++ b/kernel/drivers/ipc/iddp.c
@@ -17,12 +17,12 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  */
-
 #include <linux/module.h>
 #include <linux/list.h>
 #include <linux/kernel.h>
 #include <linux/vmalloc.h>
 #include <linux/slab.h>
+#include <linux/poll.h>
 #include <cobalt/kernel/heap.h>
 #include <cobalt/kernel/bufd.h>
 #include <cobalt/kernel/map.h>
@@ -68,8 +68,9 @@ static struct xnmap *portmap;
 
 static rtdm_waitqueue_t poolwaitq;
 
-#define _IDDP_BINDING  0
-#define _IDDP_BOUND    1
+#define _IDDP_BINDING   0
+#define _IDDP_BOUND     1
+#define _IDDP_CONNECTED 2
 
 #ifdef CONFIG_XENO_OPT_VFILE
 
@@ -164,9 +165,9 @@ static void __iddp_flush_pool(struct xnheap *heap,
        free_pages_exact(poolmem, poolsz);
 }
 
-static int iddp_socket(struct rtipc_private *priv,
-                      struct rtdm_fd *fd)
+static int iddp_socket(struct rtdm_fd *fd)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iddp_socket *sk = priv->state;
 
        sk->magic = IDDP_SOCKET_MAGIC;
@@ -189,9 +190,9 @@ static int iddp_socket(struct rtipc_private *priv,
        return 0;
 }
 
-static void iddp_close(struct rtipc_private *priv,
-               struct rtdm_fd *fd)
+static void iddp_close(struct rtdm_fd *fd)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iddp_socket *sk = priv->state;
        struct iddp_message *mbuf;
        rtdm_lockctx_t s;
@@ -225,11 +226,11 @@ static void iddp_close(struct rtipc_private *priv,
        return;
 }
 
-static ssize_t __iddp_recvmsg(struct rtipc_private *priv,
-                             struct rtdm_fd *fd,
+static ssize_t __iddp_recvmsg(struct rtdm_fd *fd,
                              struct iovec *iov, int iovlen, int flags,
                              struct sockaddr_ipc *saddr)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iddp_socket *sk = priv->state;
        ssize_t maxlen, len, wrlen, vlen;
        rtdm_toseq_t timeout_seq, *toseq;
@@ -281,17 +282,21 @@ static ssize_t __iddp_recvmsg(struct rtipc_private *priv,
        if (maxlen >= len) {
                list_del(&mbuf->next);
                dofree = 1;
+               if (list_empty(&sk->inq)) /* -> non-readable */
+                       xnselect_signal(&priv->recv_block, 0);
+
        } else {
                /* Buffer is only partially read: repost. */
                mbuf->rdoff += maxlen;
                len = maxlen;
                dofree = 0;
        }
-       cobalt_atomic_leave(s);
 
        if (!dofree)
                rtdm_sem_up(&sk->insem);
 
+       cobalt_atomic_leave(s);
+
        /* Now, write "len" bytes from mbuf->data to the vector cells */
        for (nvec = 0, wrlen = len; nvec < iovlen && wrlen > 0; nvec++) {
                if (iov[nvec].iov_len == 0)
@@ -320,10 +325,10 @@ static ssize_t __iddp_recvmsg(struct rtipc_private *priv,
        return ret ?: len;
 }
 
-static ssize_t iddp_recvmsg(struct rtipc_private *priv,
-                           struct rtdm_fd *fd,
+static ssize_t iddp_recvmsg(struct rtdm_fd *fd,
                            struct msghdr *msg, int flags)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iovec iov[RTIPC_IOV_MAX];
        struct sockaddr_ipc saddr;
        ssize_t ret;
@@ -345,8 +350,7 @@ static ssize_t iddp_recvmsg(struct rtipc_private *priv,
                          sizeof(iov[0]) * msg->msg_iovlen))
                return -EFAULT;
 
-       ret = __iddp_recvmsg(priv, fd,
-                            iov, msg->msg_iovlen, flags, &saddr);
+       ret = __iddp_recvmsg(fd, iov, msg->msg_iovlen, flags, &saddr);
        if (ret <= 0)
                return ret;
 
@@ -366,24 +370,23 @@ static ssize_t iddp_recvmsg(struct rtipc_private *priv,
        return ret;
 }
 
-static ssize_t iddp_read(struct rtipc_private *priv,
-                        struct rtdm_fd *fd,
-                        void *buf, size_t len)
+static ssize_t iddp_read(struct rtdm_fd *fd, void *buf, size_t len)
 {
        struct iovec iov = { .iov_base = buf, .iov_len = len };
-       return __iddp_recvmsg(priv, fd, &iov, 1, 0, NULL);
+
+       return __iddp_recvmsg(fd, &iov, 1, 0, NULL);
 }
 
-static ssize_t __iddp_sendmsg(struct rtipc_private *priv,
-                             struct rtdm_fd *fd,
+static ssize_t __iddp_sendmsg(struct rtdm_fd *fd,
                              struct iovec *iov, int iovlen, int flags,
                              const struct sockaddr_ipc *daddr)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iddp_socket *sk = priv->state, *rsk;
        struct iddp_message *mbuf;
        ssize_t len, rdlen, vlen;
-       struct rtdm_fd *rfd;
        int nvec, wroff, ret;
+       struct rtdm_fd *rfd;
        struct xnbufd bufd;
        rtdm_lockctx_t s;
 
@@ -435,13 +438,24 @@ static ssize_t __iddp_sendmsg(struct rtipc_private *priv,
        }
 
        cobalt_atomic_enter(s);
+
+       /*
+        * CAUTION: we must remain atomic from the moment we signal
+        * POLLIN, until sem_up has happened.
+        */
+       if (list_empty(&rsk->inq)) /* -> readable */
+               xnselect_signal(&rsk->priv->recv_block, POLLIN);
+
        mbuf->from = sk->name.sipc_port;
+
        if (flags & MSG_OOB)
                list_add(&mbuf->next, &rsk->inq);
        else
                list_add_tail(&mbuf->next, &rsk->inq);
+
+       rtdm_sem_up(&rsk->insem); /* Will resched. */
+
        cobalt_atomic_leave(s);
-       rtdm_sem_up(&rsk->insem);
 
        rtdm_fd_unlock(rfd);
 
@@ -455,10 +469,10 @@ fail:
        return ret;
 }
 
-static ssize_t iddp_sendmsg(struct rtipc_private *priv,
-                           struct rtdm_fd *fd,
+static ssize_t iddp_sendmsg(struct rtdm_fd *fd,
                            const struct msghdr *msg, int flags)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iddp_socket *sk = priv->state;
        struct iovec iov[RTIPC_IOV_MAX];
        struct sockaddr_ipc daddr;
@@ -495,8 +509,7 @@ static ssize_t iddp_sendmsg(struct rtipc_private *priv,
                          sizeof(iov[0]) * msg->msg_iovlen))
                return -EFAULT;
 
-       ret = __iddp_sendmsg(priv, fd, iov,
-                            msg->msg_iovlen, flags, &daddr);
+       ret = __iddp_sendmsg(fd, iov, msg->msg_iovlen, flags, &daddr);
        if (ret <= 0)
                return ret;
 
@@ -508,25 +521,25 @@ static ssize_t iddp_sendmsg(struct rtipc_private *priv,
        return ret;
 }
 
-static ssize_t iddp_write(struct rtipc_private *priv,
-                         struct rtdm_fd *fd,
+static ssize_t iddp_write(struct rtdm_fd *fd,
                          const void *buf, size_t len)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iovec iov = { .iov_base = (void *)buf, .iov_len = len };
        struct iddp_socket *sk = priv->state;
 
        if (sk->peer.sipc_port < 0)
                return -EDESTADDRREQ;
 
-       return __iddp_sendmsg(priv, fd, &iov, 1, 0, &sk->peer);
+       return __iddp_sendmsg(fd, &iov, 1, 0, &sk->peer);
 }
 
-static int __iddp_bind_socket(struct rtipc_private *priv,
+static int __iddp_bind_socket(struct rtdm_fd *fd,
                              struct sockaddr_ipc *sa)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iddp_socket *sk = priv->state;
        int ret = 0, port;
-       struct rtdm_fd *fd;
        rtdm_lockctx_t s;
        void *poolmem;
        size_t poolsz;
@@ -548,7 +561,6 @@ static int __iddp_bind_socket(struct rtipc_private *priv,
 
        /* Will auto-select a free port number if unspec (-1). */
        port = sa->sipc_port;
-       fd = rtdm_private_to_fd(priv);
        cobalt_atomic_enter(s);
        port = xnmap_enter(portmap, port, fd);
        cobalt_atomic_leave(s);
@@ -601,6 +613,8 @@ static int __iddp_bind_socket(struct rtipc_private *priv,
        cobalt_atomic_enter(s);
        __clear_bit(_IDDP_BINDING, &sk->status);
        __set_bit(_IDDP_BOUND, &sk->status);
+       if (xnselect_signal(&priv->send_block, POLLOUT))
+               xnsched_run();
        cobalt_atomic_leave(s);
 
        return 0;
@@ -615,9 +629,9 @@ static int __iddp_connect_socket(struct iddp_socket *sk,
                                 struct sockaddr_ipc *sa)
 {
        struct iddp_socket *rsk;
+       int ret, resched = 0;
        rtdm_lockctx_t s;
        xnhandle_t h;
-       int ret;
 
        if (sa == NULL) {
                sa = &nullsa;
@@ -657,9 +671,11 @@ static int __iddp_connect_socket(struct iddp_socket *sk,
                rsk = xnregistry_lookup(h, NULL);
                if (rsk == NULL || rsk->magic != IDDP_SOCKET_MAGIC)
                        ret = -EINVAL;
-               else
+               else {
                        /* Fetch labeled port number. */
                        sa->sipc_port = rsk->name.sipc_port;
+                       resched = xnselect_signal(&sk->priv->send_block, POLLOUT);
+               }
                cobalt_atomic_leave(s);
                if (ret)
                        return ret;
@@ -672,6 +688,12 @@ set_assoc:
                sk->name = *sa;
        /* Set default destination. */
        sk->peer = *sa;
+       if (sa->sipc_port < 0)
+               __clear_bit(_IDDP_CONNECTED, &sk->status);
+       else
+               __set_bit(_IDDP_CONNECTED, &sk->status);
+       if (resched)
+               xnsched_run();
        cobalt_atomic_leave(s);
 
        return 0;
@@ -840,10 +862,10 @@ static int __iddp_getsockopt(struct iddp_socket *sk,
        return ret;
 }
 
-static int __iddp_ioctl(struct rtipc_private *priv,
-                       struct rtdm_fd *fd,
+static int __iddp_ioctl(struct rtdm_fd *fd,
                        unsigned int request, void *arg)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct sockaddr_ipc saddr, *saddrp = &saddr;
        struct iddp_socket *sk = priv->state;
        int ret = 0;
@@ -863,7 +885,7 @@ static int __iddp_ioctl(struct rtipc_private *priv,
                        return ret;
                if (saddrp == NULL)
                        return -EFAULT;
-               ret = __iddp_bind_socket(priv, saddrp);
+               ret = __iddp_bind_socket(fd, saddrp);
                break;
 
        case _RTIOC_GETSOCKNAME:
@@ -898,14 +920,13 @@ static int __iddp_ioctl(struct rtipc_private *priv,
        return ret;
 }
 
-static int iddp_ioctl(struct rtipc_private *priv,
-                     struct rtdm_fd *fd,
+static int iddp_ioctl(struct rtdm_fd *fd,
                      unsigned int request, void *arg)
 {
        if (rtdm_in_rt_context() && request == _RTIOC_BIND)
                return -ENOSYS; /* Try downgrading to NRT */
 
-       return __iddp_ioctl(priv, fd, request, arg);
+       return __iddp_ioctl(fd, request, arg);
 }
 
 static int iddp_init(void)
@@ -925,6 +946,42 @@ static void iddp_exit(void)
        xnmap_delete(portmap);
 }
 
+static unsigned int iddp_pollstate(struct rtdm_fd *fd)
+{
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
+       struct iddp_socket *sk = priv->state;
+       unsigned int mask = 0;
+       struct rtdm_fd *rfd;
+       spl_t s;
+
+       cobalt_atomic_enter(s);
+
+       if (test_bit(_IDDP_BOUND, &sk->status) && !list_empty(&sk->inq))
+               mask |= POLLIN;
+
+       /*
+        * If the socket is connected, POLLOUT means that the peer
+        * exists. Otherwise POLLOUT is always set, assuming the
+        * client is likely to use explicit addressing in send
+        * operations.
+        *
+        * If the peer exists, we still can't really know whether
+        * writing to the socket would block as it depends on the
+        * message size and other highly dynamic factors, so pretend
+        * it would not.
+        */
+       if (test_bit(_IDDP_CONNECTED, &sk->status)) {
+               rfd = xnmap_fetch_nocheck(portmap, sk->peer.sipc_port);
+               if (rfd)
+                       mask |= POLLOUT;
+       } else
+               mask |= POLLOUT;
+
+       cobalt_atomic_leave(s);
+
+       return mask;
+}
+
 struct rtipc_protocol iddp_proto_driver = {
        .proto_name = "iddp",
        .proto_statesz = sizeof(struct iddp_socket),
@@ -938,5 +995,6 @@ struct rtipc_protocol iddp_proto_driver = {
                .read = iddp_read,
                .write = iddp_write,
                .ioctl = iddp_ioctl,
+               .pollstate = iddp_pollstate,
        }
 };
diff --git a/kernel/drivers/ipc/internal.h b/kernel/drivers/ipc/internal.h
index d388b73..b857a40 100644
--- a/kernel/drivers/ipc/internal.h
+++ b/kernel/drivers/ipc/internal.h
@@ -17,12 +17,12 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  */
-
 #ifndef _RTIPC_INTERNAL_H
 #define _RTIPC_INTERNAL_H
 
 #include <cobalt/kernel/registry.h>
 #include <cobalt/kernel/clock.h>
+#include <cobalt/kernel/select.h>
 #include <rtdm/rtdm.h>
 #include <rtdm/driver.h>
 
@@ -32,6 +32,8 @@ struct rtipc_protocol;
 
 struct rtipc_private {
        struct rtipc_protocol *proto;
+       DECLARE_XNSELECT(send_block);
+       DECLARE_XNSELECT(recv_block);
        void *state;
 };
 
@@ -41,25 +43,19 @@ struct rtipc_protocol {
        int (*proto_init)(void);
        void (*proto_exit)(void);
        struct {
-               int (*socket)(struct rtipc_private *priv,
-                             struct rtdm_fd *fd);
-               void (*close)(struct rtipc_private *priv,
-                       struct rtdm_fd *fd);
-               ssize_t (*recvmsg)(struct rtipc_private *priv,
-                                  struct rtdm_fd *fd,
+               int (*socket)(struct rtdm_fd *fd);
+               void (*close)(struct rtdm_fd *fd);
+               ssize_t (*recvmsg)(struct rtdm_fd *fd,
                                   struct msghdr *msg, int flags);
-               ssize_t (*sendmsg)(struct rtipc_private *priv,
-                                  struct rtdm_fd *fd,
+               ssize_t (*sendmsg)(struct rtdm_fd *fd,
                                   const struct msghdr *msg, int flags);
-               ssize_t (*read)(struct rtipc_private *priv,
-                               struct rtdm_fd *fd,
+               ssize_t (*read)(struct rtdm_fd *fd,
                                void *buf, size_t len);
-               ssize_t (*write)(struct rtipc_private *priv,
-                                struct rtdm_fd *fd,
+               ssize_t (*write)(struct rtdm_fd *fd,
                                 const void *buf, size_t len);
-               int (*ioctl)(struct rtipc_private *priv,
-                            struct rtdm_fd *fd,
+               int (*ioctl)(struct rtdm_fd *fd,
                             unsigned int request, void *arg);
+               unsigned int (*pollstate)(struct rtdm_fd *fd);
        } proto_ops;
 };
 
diff --git a/kernel/drivers/ipc/rtipc.c b/kernel/drivers/ipc/rtipc.c
index 85c3143..fb2aada 100644
--- a/kernel/drivers/ipc/rtipc.c
+++ b/kernel/drivers/ipc/rtipc.c
@@ -17,10 +17,10 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  */
-
 #include <linux/module.h>
 #include <linux/init.h>
 #include <linux/slab.h>
+#include <linux/poll.h>
 #include <rtdm/ipc.h>
 #include "internal.h"
 
@@ -135,7 +135,7 @@ ssize_t rtipc_get_iov_flatlen(struct iovec *iov, int iovlen)
 static int rtipc_socket(struct rtdm_fd *fd, int protocol)
 {
        struct rtipc_protocol *proto;
-       struct rtipc_private *p;
+       struct rtipc_private *priv;
        int ret;
 
        if (protocol < 0 || protocol >= IPCPROTO_MAX)
@@ -149,65 +149,107 @@ static int rtipc_socket(struct rtdm_fd *fd, int protocol)
        if (proto == NULL)      /* Not compiled in? */
                return -ENOPROTOOPT;
 
-       p = rtdm_fd_to_private(fd);
-       p->proto = proto;
-       p->state = kmalloc(proto->proto_statesz, GFP_KERNEL);
-       if (p->state == NULL)
+       priv = rtdm_fd_to_private(fd);
+       priv->proto = proto;
+       priv->state = kmalloc(proto->proto_statesz, GFP_KERNEL);
+       if (priv->state == NULL)
                return -ENOMEM;
 
-       ret = proto->proto_ops.socket(p, fd);
+       xnselect_init(&priv->send_block);
+       xnselect_init(&priv->recv_block);
+
+       ret = proto->proto_ops.socket(fd);
        if (ret)
-               kfree(p->state);
+               kfree(priv->state);
 
        return ret;
 }
 
 static void rtipc_close(struct rtdm_fd *fd)
 {
-       struct rtipc_private *p;
-
-       p = rtdm_fd_to_private(fd);
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        /*
-        * CAUTION: p->state shall be released by the
+        * CAUTION: priv->state shall be released by the
         * proto_ops.close() handler when appropriate (which may be
         * done asynchronously later, see XDDP).
         */
-       p->proto->proto_ops.close(p, fd);
+       priv->proto->proto_ops.close(fd);
+       xnselect_destroy(&priv->recv_block);
+       xnselect_destroy(&priv->send_block);
 }
 
 static ssize_t rtipc_recvmsg(struct rtdm_fd *fd,
                             struct msghdr *msg, int flags)
 {
-       struct rtipc_private *p = rtdm_fd_to_private(fd);
-       return p->proto->proto_ops.recvmsg(p, fd, msg, flags);
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
+       return priv->proto->proto_ops.recvmsg(fd, msg, flags);
 }
 
 static ssize_t rtipc_sendmsg(struct rtdm_fd *fd,
                             const struct msghdr *msg, int flags)
 {
-       struct rtipc_private *p = rtdm_fd_to_private(fd);
-       return p->proto->proto_ops.sendmsg(p, fd, msg, flags);
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
+       return priv->proto->proto_ops.sendmsg(fd, msg, flags);
 }
 
 static ssize_t rtipc_read(struct rtdm_fd *fd,
                          void *buf, size_t len)
 {
-       struct rtipc_private *p = rtdm_fd_to_private(fd);
-       return p->proto->proto_ops.read(p, fd, buf, len);
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
+       return priv->proto->proto_ops.read(fd, buf, len);
 }
 
 static ssize_t rtipc_write(struct rtdm_fd *fd,
                           const void *buf, size_t len)
 {
-       struct rtipc_private *p = rtdm_fd_to_private(fd);
-       return p->proto->proto_ops.write(p, fd, buf, len);
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
+       return priv->proto->proto_ops.write(fd, buf, len);
 }
 
 static int rtipc_ioctl(struct rtdm_fd *fd,
                       unsigned int request, void *arg)
 {
-       struct rtipc_private *p = rtdm_fd_to_private(fd);
-       return p->proto->proto_ops.ioctl(p, fd, request, arg);
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
+       return priv->proto->proto_ops.ioctl(fd, request, arg);
+}
+
+static int rtipc_select_bind(struct rtdm_fd *fd, struct xnselector *selector,
+                            unsigned int type, unsigned int index)
+{
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
+       struct xnselect_binding *binding;
+       unsigned int pollstate, mask;
+       struct xnselect *block;
+       spl_t s;
+       int ret;
+       
+       pollstate = priv->proto->proto_ops.pollstate(fd);
+
+       switch (type) {
+       case XNSELECT_READ:
+               mask = pollstate & POLLIN;
+               block = &priv->recv_block;
+               break;
+       case XNSELECT_WRITE:
+               mask = pollstate & POLLOUT;
+               block = &priv->send_block;
+               break;
+       default:
+               return -EINVAL;
+       }
+
+       binding = xnmalloc(sizeof(*binding));
+       if (binding == NULL)
+               return -ENOMEM;
+
+       xnlock_get_irqsave(&nklock, s);
+       ret = xnselect_bind(block, binding, selector, type, index, mask);
+       xnlock_put_irqrestore(&nklock, s);
+
+       if (ret)
+               xnfree(binding);
+
+       return ret;
 }
 
 static struct rtdm_device device = {
@@ -230,6 +272,7 @@ static struct rtdm_device device = {
                .read_nrt       =       NULL,
                .write_rt       =       rtipc_write,
                .write_nrt      =       NULL,
+               .select_bind    =       rtipc_select_bind,
        },
        .device_class           =       RTDM_CLASS_RTIPC,
        .device_sub_class       =       RTDM_SUBCLASS_GENERIC,
diff --git a/kernel/drivers/ipc/xddp.c b/kernel/drivers/ipc/xddp.c
index e085e31..649931c 100644
--- a/kernel/drivers/ipc/xddp.c
+++ b/kernel/drivers/ipc/xddp.c
@@ -17,13 +17,13 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  */
-
 #include <linux/module.h>
 #include <linux/string.h>
 #include <linux/slab.h>
 #include <cobalt/kernel/heap.h>
 #include <cobalt/kernel/bufd.h>
 #include <cobalt/kernel/pipe.h>
+#include <linux/poll.h>
 #include <rtdm/ipc.h>
 #include "internal.h"
 
@@ -72,6 +72,7 @@ static struct rtdm_fd *portmap[CONFIG_XENO_OPT_PIPE_NRDEV]; /* indexes RTDM fild
 #define _XDDP_ATOMIC    1
 #define _XDDP_BINDING   2
 #define _XDDP_BOUND     3
+#define _XDDP_CONNECTED 4
 
 #ifdef CONFIG_XENO_OPT_VFILE
 
@@ -211,9 +212,9 @@ static void __xddp_release_handler(void *skarg) /* nklock free */
        kfree(sk);
 }
 
-static int xddp_socket(struct rtipc_private *priv,
-                      struct rtdm_fd *fd)
+static int xddp_socket(struct rtdm_fd *fd)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct xddp_socket *sk = priv->state;
 
        sk->magic = XDDP_SOCKET_MAGIC;
@@ -238,9 +239,9 @@ static int xddp_socket(struct rtipc_private *priv,
        return 0;
 }
 
-static void xddp_close(struct rtipc_private *priv,
-                     struct rtdm_fd *fd)
+static void xddp_close(struct rtdm_fd *fd)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct xddp_socket *sk = priv->state;
        rtdm_lockctx_t s;
 
@@ -259,11 +260,11 @@ static void xddp_close(struct rtipc_private *priv,
        xnpipe_disconnect(sk->minor);
 }
 
-static ssize_t __xddp_recvmsg(struct rtipc_private *priv,
-                             struct rtdm_fd *fd,
+static ssize_t __xddp_recvmsg(struct rtdm_fd *fd,
                              struct iovec *iov, int iovlen, int flags,
                              struct sockaddr_ipc *saddr)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct xddp_message *mbuf = NULL; /* Fake GCC */
        struct xddp_socket *sk = priv->state;
        ssize_t maxlen, len, wrlen, vlen;
@@ -271,6 +272,7 @@ static ssize_t __xddp_recvmsg(struct rtipc_private *priv,
        struct xnpipe_mh *mh;
        int nvec, rdoff, ret;
        struct xnbufd bufd;
+       spl_t s;
 
        if (!test_bit(_XDDP_BOUND, &sk->status))
                return -EAGAIN;
@@ -316,15 +318,18 @@ static ssize_t __xddp_recvmsg(struct rtipc_private *priv,
                wrlen -= vlen;
                rdoff += vlen;
        }
-
 out:
        xnheap_free(sk->bufpool, mbuf);
+       cobalt_atomic_enter(s);
+       if ((__xnpipe_pollstate(sk->minor) & POLLIN) == 0 &&
+           xnselect_signal(&priv->recv_block, 0))
+               xnsched_run();
+       cobalt_atomic_leave(s);
 
        return ret ?: len;
 }
 
-static ssize_t xddp_recvmsg(struct rtipc_private *priv,
-                           struct rtdm_fd *fd,
+static ssize_t xddp_recvmsg(struct rtdm_fd *fd,
                            struct msghdr *msg, int flags)
 {
        struct iovec iov[RTIPC_IOV_MAX];
@@ -348,8 +353,7 @@ static ssize_t xddp_recvmsg(struct rtipc_private *priv,
                          sizeof(iov[0]) * msg->msg_iovlen))
                return -EFAULT;
 
-       ret = __xddp_recvmsg(priv, fd,
-                            iov, msg->msg_iovlen, flags, &saddr);
+       ret = __xddp_recvmsg(fd, iov, msg->msg_iovlen, flags, &saddr);
        if (ret <= 0)
                return ret;
 
@@ -369,12 +373,11 @@ static ssize_t xddp_recvmsg(struct rtipc_private *priv,
        return ret;
 }
 
-static ssize_t xddp_read(struct rtipc_private *priv,
-                        struct rtdm_fd *fd,
-                        void *buf, size_t len)
+static ssize_t xddp_read(struct rtdm_fd *fd, void *buf, size_t len)
 {
        struct iovec iov = { .iov_base = buf, .iov_len = len };
-       return __xddp_recvmsg(priv, fd, &iov, 1, 0, NULL);
+
+       return __xddp_recvmsg(fd, &iov, 1, 0, NULL);
 }
 
 static ssize_t __xddp_stream(struct xddp_socket *sk,
@@ -451,11 +454,11 @@ out:
        return outbytes;
 }
 
-static ssize_t __xddp_sendmsg(struct rtipc_private *priv,
-                             struct rtdm_fd *fd,
+static ssize_t __xddp_sendmsg(struct rtdm_fd *fd,
                              struct iovec *iov, int iovlen, int flags,
                              const struct sockaddr_ipc *daddr)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        ssize_t len, rdlen, wrlen, vlen, ret, sublen;
        struct xddp_socket *sk = priv->state;
        struct xddp_message *mbuf;
@@ -528,8 +531,8 @@ static ssize_t __xddp_sendmsg(struct rtipc_private *priv,
                                goto nostream;
                        }
                }
-               rtdm_fd_unlock(rfd);
-               return wrlen;
+               len = wrlen;
+               goto done;
        }
 
 nostream:
@@ -575,16 +578,22 @@ nostream:
                rtdm_fd_unlock(rfd);
                return ret;
        }
+ done:
+       cobalt_atomic_enter(s);
+       if ((__xnpipe_pollstate(rsk->minor) & POLLIN) != 0 &&
+           xnselect_signal(&rsk->priv->recv_block, POLLIN))
+               xnsched_run();
+       cobalt_atomic_leave(s);
 
        rtdm_fd_unlock(rfd);
 
        return len;
 }
 
-static ssize_t xddp_sendmsg(struct rtipc_private *priv,
-                           struct rtdm_fd *fd,
+static ssize_t xddp_sendmsg(struct rtdm_fd *fd,
                            const struct msghdr *msg, int flags)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct xddp_socket *sk = priv->state;
        struct iovec iov[RTIPC_IOV_MAX];
        struct sockaddr_ipc daddr;
@@ -633,8 +642,7 @@ static ssize_t xddp_sendmsg(struct rtipc_private *priv,
                          sizeof(iov[0]) * msg->msg_iovlen))
                return -EFAULT;
 
-       ret = __xddp_sendmsg(priv, fd, iov,
-                            msg->msg_iovlen, flags, &daddr);
+       ret = __xddp_sendmsg(fd, iov, msg->msg_iovlen, flags, &daddr);
        if (ret <= 0)
                return ret;
 
@@ -646,17 +654,17 @@ static ssize_t xddp_sendmsg(struct rtipc_private *priv,
        return ret;
 }
 
-static ssize_t xddp_write(struct rtipc_private *priv,
-                         struct rtdm_fd *fd,
+static ssize_t xddp_write(struct rtdm_fd *fd,
                          const void *buf, size_t len)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct iovec iov = { .iov_base = (void *)buf, .iov_len = len };
        struct xddp_socket *sk = priv->state;
 
        if (sk->peer.sipc_port < 0)
                return -EDESTADDRREQ;
 
-       return __xddp_sendmsg(priv, fd, &iov, 1, 0, &sk->peer);
+       return __xddp_sendmsg(fd, &iov, 1, 0, &sk->peer);
 }
 
 static int __xddp_bind_socket(struct rtipc_private *priv,
@@ -759,6 +767,8 @@ static int __xddp_bind_socket(struct rtipc_private *priv,
        portmap[sk->minor] = rtdm_private_to_fd(priv);
        __clear_bit(_XDDP_BINDING, &sk->status);
        __set_bit(_XDDP_BOUND, &sk->status);
+       if (xnselect_signal(&priv->send_block, POLLOUT))
+               xnsched_run();
        cobalt_atomic_leave(s);
 
        return 0;
@@ -768,9 +778,9 @@ static int __xddp_connect_socket(struct xddp_socket *sk,
                                 struct sockaddr_ipc *sa)
 {
        struct xddp_socket *rsk;
+       int ret, resched = 0;
        rtdm_lockctx_t s;
        xnhandle_t h;
-       int ret;
 
        if (sa == NULL) {
                sa = &nullsa;
@@ -810,9 +820,11 @@ static int __xddp_connect_socket(struct xddp_socket *sk,
                rsk = xnregistry_lookup(h, NULL);
                if (rsk == NULL || rsk->magic != XDDP_SOCKET_MAGIC)
                        ret = -EINVAL;
-               else
+               else {
                        /* Fetch labeled port number. */
                        sa->sipc_port = rsk->minor;
+                       resched = xnselect_signal(&sk->priv->send_block, POLLOUT);
+               }
                cobalt_atomic_leave(s);
                if (ret)
                        return ret;
@@ -825,6 +837,12 @@ set_assoc:
                sk->name = *sa;
        /* Set default destination. */
        sk->peer = *sa;
+       if (sa->sipc_port < 0)
+               __clear_bit(_XDDP_CONNECTED, &sk->status);
+       else
+               __set_bit(_XDDP_CONNECTED, &sk->status);
+       if (resched)
+               xnsched_run();
        cobalt_atomic_leave(s);
 
        return 0;
@@ -1003,10 +1021,10 @@ static int __xddp_getsockopt(struct xddp_socket *sk,
        return ret;
 }
 
-static int __xddp_ioctl(struct rtipc_private *priv,
-                       struct rtdm_fd *fd,
+static int __xddp_ioctl(struct rtdm_fd *fd,
                        unsigned int request, void *arg)
 {
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
        struct sockaddr_ipc saddr, *saddrp = &saddr;
        struct xddp_socket *sk = priv->state;
        int ret = 0;
@@ -1060,14 +1078,47 @@ static int __xddp_ioctl(struct rtipc_private *priv,
        return ret;
 }
 
-static int xddp_ioctl(struct rtipc_private *priv,
-                     struct rtdm_fd *fd,
+static int xddp_ioctl(struct rtdm_fd *fd,
                      unsigned int request, void *arg)
 {
        if (rtdm_in_rt_context() && request == _RTIOC_BIND)
                return -ENOSYS; /* Try downgrading to NRT */
 
-       return __xddp_ioctl(priv, fd, request, arg);
+       return __xddp_ioctl(fd, request, arg);
+}
+
+static unsigned int xddp_pollstate(struct rtdm_fd *fd)
+{
+       struct rtipc_private *priv = rtdm_fd_to_private(fd);
+       struct xddp_socket *sk = priv->state, *rsk;
+       unsigned int mask = 0, pollstate;
+       struct rtdm_fd *rfd;
+       spl_t s;
+
+       cobalt_atomic_enter(s);
+
+       pollstate = __xnpipe_pollstate(sk->minor);
+       if (test_bit(_XDDP_BOUND, &sk->status))
+               mask |= (pollstate & POLLIN);
+
+       /*
+        * If the socket is connected, POLLOUT means that the peer
+        * exists, is bound and can receive data. Otherwise POLLOUT is
+        * always set, assuming the client is likely to use explicit
+        * addressing in send operations.
+        */
+       if (test_bit(_XDDP_CONNECTED, &sk->status)) {
+               rfd = portmap[sk->peer.sipc_port];
+               if (rfd) {
+                       rsk = rtipc_fd_to_state(rfd);
+                       mask |= (pollstate & POLLOUT);
+               }
+       } else
+               mask |= POLLOUT;
+
+       cobalt_atomic_leave(s);
+
+       return mask;
 }
 
 struct rtipc_protocol xddp_proto_driver = {
@@ -1081,5 +1132,6 @@ struct rtipc_protocol xddp_proto_driver = {
                .read = xddp_read,
                .write = xddp_write,
                .ioctl = xddp_ioctl,
+               .pollstate = xddp_pollstate,
        }
 };


_______________________________________________
Xenomai-git mailing list
Xenomai-git@xenomai.org
http://www.xenomai.org/mailman/listinfo/xenomai-git

Reply via email to