On Fri, 2006-09-08 at 15:59 +0200, Philippe Gerum wrote:
> On Thu, 2006-09-07 at 14:53 -0500, Jeff Webb wrote:
> > Thanks for your input, Jan.
> >
> > Jan Kiszka wrote:
[...]
> > > Allocation in Xenomai pipes works differently...
> >
> > Is there some reason the RTAI FIFO emulation is not handled the same way?
> > I believe the real RTL and RTAI FIFOs are handled like the Xenomai pipes
> > are done now -- using kmalloc or vmalloc:
> >
[...]
> This patch on top of 2.2.2 should fix the leakage. This said, I'm going
> to rework the fifo emulation a bit to get closer to the RTAI behaviour.
The following patch against stock 2.2.2 improves the RTAI fifo
emulation. It reduces the overhead induced by sending highly scattered
data over a short period of time, and basically provides a much saner
implementation.
I'd be interested in getting some feedback on this; I plan to change the
data streaming mode of the native API's message pipes in the same way.
TIA,
--- include/rtai/fifo.h (revision 1562)
+++ include/rtai/fifo.h (working copy)
@@ -26,6 +26,8 @@
#if defined(__KERNEL__) || defined(__XENO_SIM__)
+#define RTFIFO_SYNCWAIT 0
+
typedef struct rt_fifo {
xnholder_t link; /* !< Link in flush queue. */
@@ -43,7 +45,7 @@
size_t fillsz; /* !< Bytes written to the buffer. */
- u_long flushable; /* !< Flush request flag. */
+ u_long status; /* !< Status information. */
int (*handler)(unsigned minor); /* !< Input handler. */
Index: ksrc/skins/rtai/fifo.c
===================================================================
--- ksrc/skins/rtai/fifo.c (revision 1562)
+++ ksrc/skins/rtai/fifo.c (working copy)
@@ -24,42 +24,6 @@
static RT_FIFO __fifo_table[CONFIG_XENO_OPT_PIPE_NRDEV];
-static int __fifo_flush_apc;
-
-static DECLARE_XNQUEUE(__fifo_flush_q);
-
-static inline ssize_t __fifo_flush(RT_FIFO * fifo)
-{
- ssize_t nbytes = fifo->fillsz + sizeof(xnpipe_mh_t);
- void *buffer = fifo->buffer;
-
- fifo->buffer = NULL;
- fifo->fillsz = 0;
-
- return xnpipe_send(fifo->minor, buffer, nbytes, XNPIPE_NORMAL);
- /* The buffer will be freed by the output handler. */
-}
-
-static void __fifo_flush_handler(void *cookie)
-{
- xnholder_t *holder;
- spl_t s;
-
- xnlock_get_irqsave(&nklock, s);
-
- /* Flush all fifos with pending data. */
-
- while ((holder = getq(&__fifo_flush_q)) != NULL) {
- RT_FIFO *fifo = link2rtfifo(holder);
- __clear_bit(0, &fifo->flushable);
- xnlock_put_irqrestore(&nklock, s);
- __fifo_flush(fifo);
- xnlock_get_irqsave(&nklock, s);
- }
-
- xnlock_put_irqrestore(&nklock, s);
-}
-
#define X_FIFO_HANDLER2(handler) ((int (*)(int, ...))(handler))
static int __fifo_exec_handler(int minor,
@@ -82,7 +46,8 @@
RT_FIFO *fifo = __fifo_table + minor;
int err;
- xnfree(mh);
+ fifo->fillsz = 0;
+ __clear_bit(RTFIFO_SYNCWAIT, &fifo->status);
if (retval >= 0 &&
fifo->handler != NULL &&
@@ -96,66 +61,96 @@
{
int i;
- __fifo_flush_apc =
- rthal_apc_alloc("fifo_flush", &__fifo_flush_handler, NULL);
-
- for (i = 0; i < CONFIG_XENO_OPT_PIPE_NRDEV; i++) {
+ for (i = 0; i < CONFIG_XENO_OPT_PIPE_NRDEV; i++)
inith(&__fifo_table[i].link);
- }
- if (__fifo_flush_apc < 0)
- return __fifo_flush_apc;
-
return 0;
}
void __rtai_fifo_pkg_cleanup(void)
{
- rthal_apc_free(__fifo_flush_apc);
}
int rtf_create(unsigned minor, int size)
{
+ int err, oldsize;
RT_FIFO *fifo;
- int err;
+ void *buffer;
spl_t s;
if (minor >= CONFIG_XENO_OPT_PIPE_NRDEV)
return -ENODEV;
+ /* <!> We do check for the calling context albeit the original
+ API doesn't, but we don't want the box to break for
+ whatever reason, so sanity takes precedence over
+ compatibility here. */
+
+ if (!xnpod_root_p())
+ return -EPERM;
+
+ if (!size)
+ return -EINVAL;
+
fifo = __fifo_table + minor;
err = xnpipe_connect(minor,
&__fifo_output_handler,
&__fifo_exec_handler, NULL, fifo);
+ if (err < 0 && err != -EBUSY)
+ return err;
+
xnlock_get_irqsave(&nklock, s);
++fifo->refcnt;
if (err == -EBUSY) {
- if (fifo->bufsz < size) {
- /* Resize the fifo on-the-fly if the specified buffer
size
- for the fifo is larger than the current one; we first
- flush any pending output. */
+ /* Resize the fifo on-the-fly if the specified buffer
+ size is different from the current one. */
- if (__test_and_clear_bit(0, &fifo->flushable)) {
- removeq(&__fifo_flush_q, &fifo->link);
- __fifo_flush(fifo);
- }
- /* Otherwise, there is no currently allocated buffer. */
- fifo->bufsz = size;
+ buffer = fifo->buffer;
+ oldsize = fifo->bufsz;
+
+ if (buffer == NULL) /* Conflicting create/resize requests. */
+ goto unlock_and_exit;
+
+ if (oldsize == size) {
+ err = minor;
+ goto unlock_and_exit; /* Same size, nop. */
}
+ fifo->buffer = NULL;
+ /* We must not keep the nucleus lock while running
+ * Linux services. */
+ xnlock_put_irqrestore(&nklock, s);
+ xnarch_sysfree(buffer, oldsize);
+ xnlock_get_irqsave(&nklock, s);
+ } else
+ fifo->buffer = NULL;
+
+ xnlock_put_irqrestore(&nklock, s);
+ buffer = xnarch_sysalloc(size + sizeof(xnpipe_mh_t));
+ xnlock_get_irqsave(&nklock, s);
+
+ if (buffer == NULL) {
+ if (!err)
+ /* First open, we need to disconnect upon
+ * error. Caveat: we still hold the lock while
+ * flushing the message pipe's input and
+ * output queues during disconnection. */
+ xnpipe_disconnect(minor);
+
+ --fifo->refcnt;
+ err = -ENOMEM;
goto unlock_and_exit;
}
- /* <!> We don't pre-allocate the internal buffer unlike the
- original API. */
- fifo->buffer = NULL;
+ err = minor;
+ fifo->buffer = buffer;
fifo->bufsz = size;
fifo->fillsz = 0;
- fifo->flushable = 0;
+ fifo->status = 0;
fifo->minor = minor;
fifo->handler = NULL;
@@ -163,18 +158,22 @@
xnlock_put_irqrestore(&nklock, s);
- return 0;
+ return err;
}
int rtf_destroy(unsigned minor)
{
+ int refcnt, err = 0, oldsize;
RT_FIFO *fifo;
- int refcnt;
+ void *buffer;
spl_t s;
if (minor >= CONFIG_XENO_OPT_PIPE_NRDEV)
return -ENODEV;
+ if (!xnpod_root_p())
+ return -EPERM;
+
fifo = __fifo_table + minor;
xnlock_get_irqsave(&nklock, s);
@@ -182,23 +181,33 @@
refcnt = fifo->refcnt;
if (refcnt == 0)
- refcnt = -EINVAL;
+ err = -EINVAL;
else {
if (--refcnt == 0) {
- if (__test_and_clear_bit(0, &fifo->flushable)) {
- removeq(&__fifo_flush_q, &fifo->link);
- xnfree(fifo->buffer);
+ buffer = fifo->buffer;
+ oldsize = fifo->bufsz;
+
+ if (buffer == NULL) { /* Fifo under (re-)construction.
*/
+ err = -EBUSY;
+ goto unlock_and_exit;
}
xnpipe_disconnect(minor);
+ fifo->refcnt = 0;
+ xnlock_put_irqrestore(&nklock, s);
+ xnarch_sysfree(buffer, oldsize);
+
+ return 0;
}
fifo->refcnt = refcnt;
}
+unlock_and_exit:
+
xnlock_put_irqrestore(&nklock, s);
- return refcnt;
+ return err;
}
int rtf_get(unsigned minor, void *buf, int count)
@@ -223,6 +232,11 @@
goto unlock_and_exit;
}
+ if (fifo->buffer == NULL) {
+ nbytes = -EBUSY;
+ goto unlock_and_exit;
+ }
+
nbytes = xnpipe_recv(minor, &msg, XN_NONBLOCK);
if (nbytes < 0) {
@@ -256,9 +270,9 @@
int rtf_put(unsigned minor, const void *buf, int count)
{
- ssize_t outbytes = 0;
+ ssize_t outbytes;
+ size_t fillptr;
RT_FIFO *fifo;
- size_t n;
spl_t s;
if (minor >= CONFIG_XENO_OPT_PIPE_NRDEV)
@@ -273,52 +287,37 @@
goto unlock_and_exit;
}
- while (count > 0) {
- if (count >= fifo->bufsz - fifo->fillsz)
- n = fifo->bufsz - fifo->fillsz;
- else
- n = count;
+ if (fifo->buffer == NULL) {
+ outbytes = -EBUSY;
+ goto unlock_and_exit;
+ }
- if (n == 0) {
- ssize_t err = __fifo_flush(fifo);
+ if (count > fifo->bufsz - fifo->fillsz)
+ outbytes = fifo->bufsz - fifo->fillsz;
+ else
+ outbytes = count;
- if (__test_and_clear_bit(0, &fifo->flushable))
- removeq(&__fifo_flush_q, &fifo->link);
+ if (outbytes > 0) {
+ fillptr = fifo->fillsz;
+ fifo->fillsz += outbytes;
- if (err < 0) {
- outbytes = err;
- goto unlock_and_exit;
- }
+ xnlock_put_irqrestore(&nklock, s);
- continue;
- }
+ memcpy(xnpipe_m_data(fifo->buffer) + fillptr,
+ (caddr_t) buf, outbytes);
- if (fifo->buffer == NULL) {
- fifo->buffer =
- (xnpipe_mh_t *)xnmalloc(fifo->bufsz +
- sizeof(xnpipe_mh_t));
+ xnlock_get_irqsave(&nklock, s);
- if (fifo->buffer == NULL) {
- outbytes = -ENOMEM;
- goto unlock_and_exit;
- }
-
- inith(&fifo->buffer->link);
- fifo->buffer->size = count;
+ if (__test_and_set_bit(RTFIFO_SYNCWAIT, &fifo->status))
+ outbytes = xnpipe_send_more(fifo->minor, fifo->buffer,
outbytes);
+ else {
+ outbytes = xnpipe_send(fifo->minor, fifo->buffer,
+ outbytes + sizeof(xnpipe_mh_t),
XNPIPE_NORMAL);
+ if (outbytes > 0)
+ outbytes -= sizeof(xnpipe_mh_t);
}
-
- memcpy(xnpipe_m_data(fifo->buffer) + fifo->fillsz,
- (caddr_t) buf + outbytes, n);
- fifo->fillsz += n;
- outbytes += n;
- count -= n;
}
- if (fifo->fillsz > 0 && !__test_and_set_bit(0, &fifo->flushable)) {
- appendq(&__fifo_flush_q, &fifo->link);
- rthal_apc_schedule(__fifo_flush_apc);
- }
-
unlock_and_exit:
xnlock_put_irqrestore(&nklock, s);
@@ -329,24 +328,13 @@
int rtf_reset(unsigned minor)
{
RT_FIFO *fifo;
- spl_t s;
if (minor >= CONFIG_XENO_OPT_PIPE_NRDEV)
return -ENODEV;
fifo = __fifo_table + minor;
+ fifo->fillsz = 0;
- xnlock_get_irqsave(&nklock, s);
-
- if (__test_and_clear_bit(0, &fifo->flushable)) {
- removeq(&__fifo_flush_q, &fifo->link);
- xnfree(fifo->buffer);
- fifo->buffer = NULL;
- fifo->fillsz = 0;
- }
-
- xnlock_put_irqrestore(&nklock, s);
-
return 0;
}
Index: include/nucleus/pipe.h
===================================================================
--- include/nucleus/pipe.h (revision 1562)
+++ include/nucleus/pipe.h (working copy)
@@ -130,6 +130,10 @@
size_t size,
int flags);
+ssize_t xnpipe_send_more(int minor,
+ struct xnpipe_mh *mh,
+ ssize_t size);
+
ssize_t xnpipe_recv(int minor,
struct xnpipe_mh **pmh,
xnticks_t timeout);
Index: ksrc/nucleus/pipe.c
===================================================================
--- ksrc/nucleus/pipe.c (revision 1562)
+++ ksrc/nucleus/pipe.c (working copy)
@@ -306,6 +306,12 @@
__clrbits(state->status, XNPIPE_KERN_CONN);
+ if (state->output_handler != NULL) {
+ while ((holder = getq(&state->outq)) != NULL)
+ state->output_handler(minor, link2mh(holder),
+ -EPIPE, state->cookie);
+ }
+
if (testbits(state->status, XNPIPE_USER_CONN)) {
while ((holder = getq(&state->inq)) != NULL) {
if (state->input_handler != NULL)
@@ -315,12 +321,6 @@
xnfree(link2mh(holder));
}
- if (state->output_handler != NULL) {
- while ((holder = getq(&state->outq)) != NULL)
- state->output_handler(minor, link2mh(holder),
- -EPIPE, state->cookie);
- }
-
if (xnsynch_destroy(&state->synchbase) == XNSYNCH_RESCHED)
xnpod_schedule();
@@ -369,11 +369,6 @@
return -EBADF;
}
- if (!testbits(state->status, XNPIPE_USER_CONN)) {
- xnlock_put_irqrestore(&nklock, s);
- return -EPIPE;
- }
-
inith(xnpipe_m_link(mh));
xnpipe_m_size(mh) = size - sizeof(*mh);
state->ionrd += xnpipe_m_size(mh);
@@ -383,6 +378,11 @@
else
appendq(&state->outq, xnpipe_m_link(mh));
+ if (!testbits(state->status, XNPIPE_USER_CONN)) {
+ xnlock_put_irqrestore(&nklock, s);
+ return (ssize_t) size;
+ }
+
if (testbits(state->status, XNPIPE_USER_WREAD)) {
/* Wake up the userland thread waiting for input
from the kernel side. */
@@ -403,6 +403,34 @@
return (ssize_t) size;
}
+ssize_t xnpipe_send_more(int minor, struct xnpipe_mh *mh, ssize_t size)
+{
+ xnpipe_state_t *state;
+ spl_t s;
+
+ if (minor < 0 || minor >= XNPIPE_NDEVS)
+ return -ENODEV;
+
+ if (size < 0)
+ return -EINVAL;
+
+ state = &xnpipe_states[minor];
+
+ xnlock_get_irqsave(&nklock, s);
+
+ if (!testbits(state->status, XNPIPE_KERN_CONN)) {
+ xnlock_put_irqrestore(&nklock, s);
+ return -EBADF;
+ }
+
+ xnpipe_m_size(mh) += size;
+ state->ionrd += size;
+
+ xnlock_put_irqrestore(&nklock, s);
+
+ return (ssize_t) size;
+}
+
ssize_t xnpipe_recv(int minor, struct xnpipe_mh **pmh, xnticks_t timeout)
{
xnpipe_state_t *state;
@@ -640,10 +668,11 @@
char *buf, size_t count, loff_t *ppos)
{
xnpipe_state_t *state = (xnpipe_state_t *)file->private_data;
+ size_t nbytes, inbytes;
struct xnpipe_mh *mh;
+ int sigpending, err;
xnholder_t *holder;
- int sigpending;
- ssize_t ret;
+ ssize_t ret = 0;
spl_t s;
if (!access_ok(VERIFY_WRITE, buf, count))
@@ -679,30 +708,51 @@
}
if (mh) {
- xnpipe_io_handler *handler = state->output_handler;
- void *cookie = state->cookie;
+ nbytes = xnpipe_m_size(mh); /* Cannot be zero */
+ inbytes = 0;
- ret = (ssize_t) xnpipe_m_size(mh); /* Cannot be zero */
- state->ionrd -= ret;
+ /*
+ * We allow more data to be appended to the current
+ * message bucket while its contents is being copied
+ * to the user buffer, therefore, we need to loop
+ * until:
+ * 1) all the data has been copied,
+ * 2) we consumed the user buffer space entirely.
+ */
- xnlock_put_irqrestore(&nklock, s);
+ do {
+ if (inbytes <= count) {
+ xnlock_put_irqrestore(&nklock, s);
+ /* More data could be appended while doing
this: */
+ err = __copy_to_user(buf + inbytes,
xnpipe_m_data(mh) + inbytes, nbytes);
+ xnlock_get_irqsave(&nklock, s);
+ if (err) {
+ ret = -EFAULT;
+ break;
+ }
+ inbytes += nbytes;
+ nbytes = xnpipe_m_size(mh) - inbytes;
+ } else {
+ /* Return buffer is too small - message is
lost. */
+ ret = -ENOBUFS;
+ break;
+ }
+ } while(nbytes > 0);
- if (ret <= count) {
- if (__copy_to_user(buf, xnpipe_m_data(mh), ret))
- ret = -EFAULT;
- } else
- /* Return buffer is too small - message is lost. */
- ret = -ENOBUFS;
+ if (ret < 0)
+ inbytes = xnpipe_m_size(mh);
+ else
+ ret = (ssize_t) inbytes;
- if (handler != NULL)
- ret =
- handler(xnminor_from_state(state), mh, ret, cookie);
- } else { /* Closed by peer. */
+ state->ionrd -= inbytes;
- xnlock_put_irqrestore(&nklock, s);
- ret = 0;
+ if (state->output_handler != NULL)
+ ret = state->output_handler(xnminor_from_state(state),
+ mh, ret, state->cookie);
}
+ xnlock_put_irqrestore(&nklock, s);
+
return ret;
}
@@ -999,6 +1049,7 @@
EXPORT_SYMBOL(xnpipe_connect);
EXPORT_SYMBOL(xnpipe_disconnect);
EXPORT_SYMBOL(xnpipe_send);
+EXPORT_SYMBOL(xnpipe_send_more);
EXPORT_SYMBOL(xnpipe_recv);
EXPORT_SYMBOL(xnpipe_inquire);
EXPORT_SYMBOL(xnpipe_setup);
--
Philippe.
_______________________________________________
Xenomai-help mailing list
[email protected]
https://mail.gna.org/listinfo/xenomai-help