Module: xenomai-head
Branch: master
Commit: c3e358adf3ff558349b4636af818e82d911fe2e4
URL:    http://git.xenomai.org/?p=xenomai-head.git;a=commit;h=c3e358adf3ff558349b4636af818e82d911fe2e4

Author: Philippe Gerum <r...@xenomai.org>
Date:   Wed Jul 22 10:40:37 2009 +0200

native: introduce lockless data copy to/from buffers

---

 include/native/buffer.h    |   11 +-
 ksrc/skins/native/buffer.c |  261 +++++++++++++++++++++++++++++++-------------
 2 files changed, 193 insertions(+), 79 deletions(-)
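
Before the patch itself, a minimal user-space analogue of the copy
scheme being introduced may help: the big lock is dropped around each
memcpy(), and a per-direction token tells us whether another writer
slipped in meanwhile, in which case the whole transfer restarts. The
sketch below uses POSIX threads as a stand-in for the nklock; struct
rb, rb_write() and all other names are illustrative only, and the
blocking logic the real patch runs when the buffer is full is omitted.

/*
 * Hypothetical user-space sketch of the token-based lockless copy;
 * not part of the Xenomai API.
 */
#include <pthread.h>
#include <string.h>

struct rb {
        pthread_mutex_t lock;   /* stand-in for the nklock */
        unsigned long wrtoken;  /* bumped by every writer */
        size_t wroff;           /* next write position */
        size_t fillsz;          /* bytes currently stored */
        size_t bufsz;           /* total capacity */
        char *bufmem;           /* backing storage */
};

/*
 * Copy one complete message into the ring. The caller must ensure it
 * fits; waiting for free space is left out of this sketch.
 */
static void rb_write(struct rb *b, const char *ptr, size_t size)
{
        size_t rdoff, wroff, rbytes, n;
        unsigned long wrtoken;

        pthread_mutex_lock(&b->lock);
redo:
        /* Draw the next write token to detect concurrent writers. */
        wrtoken = ++b->wrtoken;
        rdoff = 0;
        wroff = b->wroff;
        rbytes = size;

        do {
                /* Clamp each chunk at the end of the circular space. */
                n = wroff + rbytes > b->bufsz ? b->bufsz - wroff : rbytes;

                /* Copy with the lock released to keep latency low. */
                pthread_mutex_unlock(&b->lock);
                memcpy(b->bufmem + wroff, ptr + rdoff, n);
                pthread_mutex_lock(&b->lock);

                /* Another writer got in while we copied: resend all. */
                if (b->wrtoken != wrtoken)
                        goto redo;

                wroff = (wroff + n) % b->bufsz;
                rdoff += n;
                rbytes -= n;
        } while (rbytes > 0);

        b->fillsz += size;
        b->wroff = wroff;
        pthread_mutex_unlock(&b->lock);
}

Restarting from the token draw keeps messages whole at the cost of an
occasional redundant memcpy(); that is the trade-off the commit makes
to avoid holding the nklock across potentially large copies.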

diff --git a/include/native/buffer.h b/include/native/buffer.h
index 064d3fd..c7ac16d 100644
--- a/include/native/buffer.h
+++ b/include/native/buffer.h
@@ -25,8 +25,8 @@
 #include <native/types.h>
 
 /* Creation flags. */
-#define B_PRIO  XNSYNCH_PRIO   /* Pend by task priority order. */
-#define B_FIFO  XNSYNCH_FIFO   /* Pend by FIFO order. */
+#define B_PRIO   XNSYNCH_PRIO  /* Pend by task priority order. */
+#define B_FIFO   XNSYNCH_FIFO  /* Pend by FIFO order. */
 
 typedef struct rt_buffer_info {
 
@@ -62,10 +62,13 @@ typedef struct rt_buffer {
        char name[XNOBJECT_NAME_LEN]; /* !< Symbolic name. */
 
        int mode;               /* !< Creation mode. */
-       int rptr;               /* !< Read pointer. */
-       int wptr;               /* !< Write pointer. */
+       off_t rdoff;            /* !< Read offset. */
+       off_t wroff;            /* !< Write offset. */
        size_t fillsz;          /* !< Filled space. */
 
+       u_long wrtoken;         /* !< Write token. */
+       u_long rdtoken;         /* !< Read token. */
+
        size_t bufsz;           /* !< Buffer size. */
        void *bufmem;           /* !< Buffer space. */
 
diff --git a/ksrc/skins/native/buffer.c b/ksrc/skins/native/buffer.c
index dee0c5d..ecb8897 100644
--- a/ksrc/skins/native/buffer.c
+++ b/ksrc/skins/native/buffer.c
@@ -205,9 +205,11 @@ int rt_buffer_create(RT_BUFFER *bf, const char *name, size_t bufsz, int mode)
 
        bf->mode = mode;
        bf->bufsz = bufsz;
-       bf->rptr = 0;
-       bf->wptr = 0;
+       bf->rdoff = 0;
+       bf->wroff = 0;
        bf->fillsz = 0;
+       bf->rdtoken = 0;
+       bf->wrtoken = 0;
 
 #ifdef CONFIG_XENO_OPT_PERVASIVE
        bf->cpid = 0;
@@ -309,9 +311,10 @@ ssize_t rt_buffer_write_inner(RT_BUFFER *bf,
                              xntmode_t timeout_mode, RTIME timeout)
 {
        xnthread_t *thread, *waiter;
-       size_t rbytes, n, nsum;
-       int resched = 0;
-       ssize_t ret = 0;
+       off_t rdoff, wroff;
+       size_t rbytes, n;
+       u_long wrtoken;
+       ssize_t ret;
        spl_t s;
 
        xnlock_get_irqsave(&nklock, s);
@@ -322,12 +325,18 @@ ssize_t rt_buffer_write_inner(RT_BUFFER *bf,
                goto unlock_and_exit;
        }
 
+       /*
+        * We may only send complete messages, so there is no point in
+        * accepting messages which are larger than what the buffer
+        * can hold.
+        */
        if (size > bf->bufsz) {
                ret = -EINVAL;
                goto unlock_and_exit;
        }
 
-       if (size == 0)
+       ret = (ssize_t)size;
+       if (ret == 0)
                goto unlock_and_exit;
 
        if (timeout_mode == XN_RELATIVE &&
@@ -341,8 +350,6 @@ ssize_t rt_buffer_write_inner(RT_BUFFER *bf,
                timeout += xntbase_get_time(__native_tbase);
        }
 
-       rbytes = size;
-
        /*
         * Let's optimize the case where the buffer is empty on entry,
         * and the leading task blocked on the input queue could be
@@ -351,7 +358,6 @@ ssize_t rt_buffer_write_inner(RT_BUFFER *bf,
         * instead of having it transit through the buffer. Otherwise,
         * we simply accumulate the data into the buffer.
         */
-
        if (bf->fillsz > 0)
                /* Buffer has to be empty to keep FIFO ordering. */
                goto accumulate;
@@ -379,45 +385,73 @@ ssize_t rt_buffer_write_inner(RT_BUFFER *bf,
 
        /* Some bytes were not consumed, move them to the buffer. */
        ptr = (caddr_t)ptr + n;
-       rbytes -= n;
-       size = rbytes;
-       resched = 1;    /* Rescheduling is pending. */
+       size -= n;
 
 accumulate:
 
        for (;;) {
-               /* We should be able to copy the entire message, or block. */
-               if (bf->fillsz + size <= bf->bufsz) {
-                       nsum = 0;
-                       do {
-                               if (bf->wptr + rbytes > bf->bufsz)
-                                       n = bf->bufsz - bf->wptr;
-                               else
-                                       n = rbytes;
-                               memcpy((caddr_t)bf->bufmem + bf->wptr,
-                                      (caddr_t)ptr + nsum, n);
-                               bf->wptr = (bf->wptr + n) % bf->bufsz;
-                               nsum += n;
-                               rbytes -= n;
-                       } while (rbytes > 0);
-                       bf->fillsz += size;
-                       ret = size;
+               /*
+                * We should be able to copy the entire message at
+                * once, or block.
+                */
+               if (bf->fillsz + size > bf->bufsz)
+                       goto wait;
 
+               /*
+                * Draw the next write token so that we can later
+                * detect preemption.
+                */
+               wrtoken = ++bf->wrtoken;
+
+               /* Write to the buffer in a circular way. */
+               rdoff = 0;
+               wroff = bf->wroff;
+               rbytes = size;
+
+               do {
+                       if (wroff + rbytes > bf->bufsz)
+                               n = bf->bufsz - wroff;
+                       else
+                               n = rbytes;
                        /*
-                        * Wake up all threads pending on the input
-                        * wait queue, if we accumulated enough data
-                        * to feed the leading one.
+                        * Release the nklock while copying the source
+                        * data to keep latency low.
                         */
-                       waiter = xnsynch_peek_pendq(&bf->isynch_base);
-                       if (waiter && waiter->wait_u.buffer.size <= bf->fillsz) {
-                               if (xnsynch_flush(&bf->isynch_base, 0) == XNSYNCH_RESCHED) {
-                                       xnpod_schedule();
-                                       resched = 0;
-                               }
-                       }
-                       break;
+                       xnlock_put_irqrestore(&nklock, s);
+
+                       memcpy((caddr_t)bf->bufmem + wroff,
+                              (caddr_t)ptr + rdoff, n);
+
+                       xnlock_get_irqsave(&nklock, s);
+                       /*
+                        * In case we were preempted while writing
+                        * the message, we have to resend the whole
+                        * thing.
+                        */
+                       if (bf->wrtoken != wrtoken)
+                               goto accumulate;
+
+                       wroff = (wroff + n) % bf->bufsz;
+                       rdoff += n;
+                       rbytes -= n;
+               } while (rbytes > 0);
+
+               bf->fillsz += size;
+               bf->wroff = wroff;
+
+               /*
+                * Wake up all threads pending on the input wait
+                * queue, if we accumulated enough data to feed the
+                * leading one.
+                */
+               waiter = xnsynch_peek_pendq(&bf->isynch_base);
+               if (waiter && waiter->wait_u.buffer.size <= bf->fillsz) {
+                       if (xnsynch_flush(&bf->isynch_base, 0) == XNSYNCH_RESCHED)
+                               xnpod_schedule();
                }
 
+               break;
+       wait:
                if (timeout_mode == XN_RELATIVE && timeout == TM_NONBLOCK) {
                        ret = -EWOULDBLOCK;
                        break;
@@ -444,13 +478,16 @@ accumulate:
                        ret = -EINTR;   /* Unblocked. */
                        break;
                }
-               resched = 0;
        }
 
       unlock_and_exit:
 
-       if (resched)
-               xnpod_schedule();
+       /*
+        * xnpod_schedule() is smarter than us; it will detect any
+        * worthless call inline and won't branch to the rescheduling
+        * code in such a case.
+        */
+       xnpod_schedule();
 
        xnlock_put_irqrestore(&nklock, s);
 
@@ -462,8 +499,10 @@ ssize_t rt_buffer_read_inner(RT_BUFFER *bf,
                             xntmode_t timeout_mode, RTIME timeout)
 {
        xnthread_t *thread, *waiter;
-       size_t rbytes, n, nsum;
-       ssize_t ret = 0;
+       off_t rdoff, wroff;
+       size_t rbytes, n;
+       u_long rdtoken;
+       ssize_t ret;
        spl_t s;
 
        xnlock_get_irqsave(&nklock, s);
@@ -474,13 +513,20 @@ ssize_t rt_buffer_read_inner(RT_BUFFER *bf,
                goto unlock_and_exit;
        }
 
+       /*
+        * We may only return complete messages to readers, so there
+        * is no point in waiting for messages which are larger than
+        * what the buffer can hold.
+        */
        if (size > bf->bufsz) {
                ret = -EINVAL;
                goto unlock_and_exit;
        }
 
-       if (size == 0)
+       if (size == 0) {
+               ret = 0;
                goto unlock_and_exit;
+       }
 
        if (timeout_mode == XN_RELATIVE &&
            timeout != TM_NONBLOCK && timeout != TM_INFINITE) {
@@ -492,42 +538,73 @@ ssize_t rt_buffer_read_inner(RT_BUFFER *bf,
                timeout += xntbase_get_time(__native_tbase);
        }
 
+pull:
+
        for (;;) {
                /*
                 * We should be able to read a complete message of the
                 * requested size, or block.
                 */
-               if (bf->fillsz >= size) {
-                       rbytes = size;
-                       nsum = 0;
-                       do {
-                               if (bf->rptr + rbytes > bf->bufsz)
-                                       n = bf->bufsz - bf->rptr;
-                               else
-                                       n = rbytes;
-                               memcpy((caddr_t)ptr + nsum,
-                                      (caddr_t)bf->bufmem + bf->rptr, n);
-                               bf->rptr = (bf->rptr + n) % bf->bufsz;
-                               nsum += n;
-                               rbytes -= n;
-                       } while (rbytes > 0);
-
-                       bf->fillsz -= size;
-                       ret = size;
+               if (bf->fillsz < size)
+                       goto wait;
+
+               /*
+                * Draw the next read token so that we can later
+                * detect preemption.
+                */
+               rdtoken = ++bf->rdtoken;
+
+               /* Read from the buffer in a circular way. */
+               wroff = 0;
+               rdoff = bf->rdoff;
+               rbytes = size;
+
+               do {
+                       if (rdoff + rbytes > bf->bufsz)
+                               n = bf->bufsz - rdoff;
+                       else
+                               n = rbytes;
+                       /*
+                        * Release the nklock while retrieving the
+                        * data to keep latency low.
+                        */
 
+                       xnlock_put_irqrestore(&nklock, s);
+
+                       memcpy((caddr_t)ptr + wroff,
+                              (caddr_t)bf->bufmem + rdoff, n);
+
+                       xnlock_get_irqsave(&nklock, s);
                        /*
-                        * Wake up all threads pending on the output
-                        * wait queue, if we freed enough room for the
-                        * leading one to post its message.
+                        * In case we were preempted while retrieving
+                        * the message, we have to re-read the whole
+                        * thing.
                         */
-                       waiter = xnsynch_peek_pendq(&bf->osynch_base);
-                       if (waiter && waiter->wait_u.buffer.size + bf->fillsz <= bf->bufsz) {
-                               if (xnsynch_flush(&bf->osynch_base, 0) == XNSYNCH_RESCHED)
-                                       xnpod_schedule();
-                       }
-                       break;
+                       if (bf->rdtoken != rdtoken)
+                               goto pull;
+
+                       rdoff = (rdoff + n) % bf->bufsz;
+                       wroff += n;
+                       rbytes -= n;
+               } while (rbytes > 0);
+
+               bf->fillsz -= size;
+               bf->rdoff = rdoff;
+               ret = (ssize_t)size;
+
+               /*
+                * Wake up all threads pending on the output wait
+                * queue, if we freed enough room for the leading one
+                * to post its message.
+                */
+               waiter = xnsynch_peek_pendq(&bf->osynch_base);
+               if (waiter && waiter->wait_u.buffer.size + bf->fillsz <= bf->bufsz) {
+                       if (xnsynch_flush(&bf->osynch_base, 0) == XNSYNCH_RESCHED)
+                               xnpod_schedule();
                }
 
+               break;
+       wait:
                if (timeout_mode == XN_RELATIVE && timeout == TM_NONBLOCK) {
                        ret = -EWOULDBLOCK;
                        break;
@@ -538,6 +615,18 @@ ssize_t rt_buffer_read_inner(RT_BUFFER *bf,
                        break;
                }
 
+               /*
+                * Check whether writers are already waiting to send
+                * data, while we are about to wait to receive some.
+                * In such a case, we have a pathological use of the
+                * buffer. We must allow for a short read to prevent
+                * a deadlock.
+                */
+               if (xnsynch_nsleepers(&bf->osynch_base) > 0) {
+                       size = bf->fillsz;
+                       goto pull;
+               }
+
                thread = xnpod_current_thread();
                thread->wait_u.buffer.ptr = ptr;
                thread->wait_u.buffer.size = size;
@@ -577,7 +666,8 @@ ssize_t rt_buffer_read_inner(RT_BUFFER *bf,
  * space is available on entry to hold the message, the caller is
  * allowed to block until enough room is freed. Data written by
  * rt_buffer_write() calls can be read in FIFO order by subsequent
- * rt_buffer_read() calls.
+ * rt_buffer_read() calls. Messages sent via rt_buffer_write() are
+ * handled atomically (no interleaving).
  *
  * @param bf The descriptor address of the buffer to write to.
  *
@@ -586,7 +676,7 @@ ssize_t rt_buffer_read_inner(RT_BUFFER *bf,
  *
  * @param size The size in bytes of the message data. Zero is a valid
  * value, in which case the buffer is left untouched, and zero is
- * returned to the caller.
+ * returned to the caller. No partial message is ever sent.
  *
  * @param timeout The number of clock ticks to wait for enough buffer
  * space to be available to hold the message (see note). Passing
@@ -697,6 +787,27 @@ ssize_t rt_buffer_write(RT_BUFFER *bf, const void *ptr, size_t size, RTIME timeo
  * system heap to hold a temporary copy of the message (user-space
  * call only).
  *
+ * A short read (i.e. fewer bytes returned than requested by @a size)
+ * may happen whenever a pathological use of the buffer is
+ * encountered. This condition only arises when the system detects
+ * that one or more writers are waiting to send data, while a reader
+ * would have to wait to receive a complete message at the same
+ * time. For instance, consider the following sequence, involving a
+ * 1024-byte buffer (bf) and two threads:
+ *
+ * writer thread > rt_buffer_write(&bf, ptr, 1, TM_INFINITE);
+ *        (one byte to read, 1023 bytes available for sending)
+ * writer thread > rt_buffer_write(&bf, ptr, 1024, TM_INFINITE);
+ *        (writer blocks - no space for another 1024-byte message)
+ * reader thread > rt_buffer_read(&bf, ptr, 1024, TM_INFINITE);
+ *        (short read - only the single byte available is returned)
+ *
+ * In order to prevent both threads from waiting for each other
+ * indefinitely, a short read is allowed, which may be completed by a
+ * subsequent call to rt_buffer_read(). If that case arises, thread
+ * priorities, buffer and/or message sizes should likely be fixed to
+ * eliminate such a condition.
+ *
  * Environments:
  *
  * This service can be called from:
@@ -911,8 +1022,8 @@ int rt_buffer_clear(RT_BUFFER *bf)
                goto unlock_and_exit;
        }
 
-       bf->wptr = 0;
-       bf->rptr = 0;
+       bf->wroff = 0;
+       bf->rdoff = 0;
        bf->fillsz = 0;
 
        if (xnsynch_flush(&bf->osynch_base, 0) == XNSYNCH_RESCHED)
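
To illustrate the short-read rule documented in the patch, the
pathological sequence could look as follows in application code. This
is a speculative sketch using the native-skin services shown above;
task creation, rt_buffer_create(&bf, "demo", 1024, B_FIFO) and all
error handling are assumed to happen elsewhere.

#include <native/buffer.h>
#include <native/task.h>

static RT_BUFFER bf;    /* assumed: a 1024-byte buffer created elsewhere */

void writer_task(void *arg)
{
        static char big[1024];
        char one = 'x';

        (void)arg;
        /* One byte in: 1023 bytes of room remain. */
        rt_buffer_write(&bf, &one, 1, TM_INFINITE);
        /* Blocks: a complete 1024-byte message cannot fit. */
        rt_buffer_write(&bf, big, sizeof(big), TM_INFINITE);
}

void reader_task(void *arg)
{
        char in[1024];
        ssize_t n;

        (void)arg;
        /*
         * With the writer sleeping on the output queue, insisting on
         * a full 1024 bytes would deadlock; the buffer yields a short
         * read instead (n == 1 here), which a later rt_buffer_read()
         * call may complete.
         */
        n = rt_buffer_read(&bf, in, sizeof(in), TM_INFINITE);
        (void)n;
}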

