Module: xenomai-forge Branch: master Commit: 407a38fcd1c5abd79e9691649e3bc86edc2cd629 URL: http://git.xenomai.org/?p=xenomai-forge.git;a=commit;h=407a38fcd1c5abd79e9691649e3bc86edc2cd629
Author: Philippe Gerum <r...@xenomai.org> Date: Wed Jun 19 23:34:20 2013 +0200 cobalt/kernel/pipe: turn queues into regular kernel lists --- include/cobalt/kernel/list.h | 3 + include/cobalt/kernel/pipe.h | 38 ++---- kernel/cobalt/pipe.c | 258 ++++++++++++++++++++++-------------------- kernel/drivers/ipc/xddp.c | 4 +- 4 files changed, 153 insertions(+), 150 deletions(-) diff --git a/include/cobalt/kernel/list.h b/include/cobalt/kernel/list.h index dbbc3b7..626f56f 100644 --- a/include/cobalt/kernel/list.h +++ b/include/cobalt/kernel/list.h @@ -50,4 +50,7 @@ do { \ __item; \ }) +#define list_next_entry(__item, __member) \ + list_entry((__item)->__member.next, typeof(*(__item)), __member) + #endif /* !_COBALT_KERNEL_LIST_H_ */ diff --git a/include/cobalt/kernel/pipe.h b/include/cobalt/kernel/pipe.h index f8eaccc..7976f7e 100644 --- a/include/cobalt/kernel/pipe.h +++ b/include/cobalt/kernel/pipe.h @@ -40,7 +40,6 @@ #ifdef __KERNEL__ -#include <cobalt/kernel/queue.h> #include <cobalt/kernel/synch.h> #include <cobalt/kernel/thread.h> #include <linux/types.h> @@ -61,18 +60,11 @@ #define XNPIPE_USER_ALL_READY \ (XNPIPE_USER_WREAD_READY|XNPIPE_USER_WSYNC_READY) -typedef struct xnpipe_mh { - - struct xnholder link; - unsigned size; - unsigned rdoff; - -} xnpipe_mh_t; - -static inline xnpipe_mh_t *link2mh(struct xnholder *ln) -{ - return ln ? container_of(ln, xnpipe_mh_t, link) : NULL; -} +struct xnpipe_mh { + size_t size; + size_t rdoff; + struct list_head link; +}; struct xnpipe_state; @@ -86,13 +78,13 @@ struct xnpipe_operations { }; struct xnpipe_state { + struct list_head slink; /* Link on sleep queue */ + struct list_head alink; /* Link on async queue */ - struct xnholder slink; /* Link on sleep queue */ - struct xnholder alink; /* Link on async queue */ -#define link2xnpipe(ln, fld) container_of(ln, struct xnpipe_state, fld) - - struct xnqueue inq; /* From user-space to kernel */ - struct xnqueue outq; /* From kernel to user-space */ + struct list_head inq; /* From user-space to kernel */ + int nrinq; + struct list_head outq; /* From kernel to user-space */ + int nroutq; struct xnsynch synchbase; struct xnpipe_operations ops; void *xstate; /* Extra state managed by caller */ @@ -104,7 +96,6 @@ struct xnpipe_state { wait_queue_head_t syncq; /* sync waiters */ int wcount; /* number of waiters on this minor */ size_t ionrd; - }; extern struct xnpipe_state xnpipe_states[]; @@ -132,12 +123,7 @@ ssize_t xnpipe_recv(int minor, int xnpipe_flush(int minor, int mode); -static inline struct xnholder *xnpipe_m_link(xnpipe_mh_t *mh) -{ - return &mh->link; -} - -static inline char *xnpipe_m_data(xnpipe_mh_t *mh) +static inline char *xnpipe_m_data(struct xnpipe_mh *mh) { return (char *)(mh + 1); } diff --git a/kernel/cobalt/pipe.c b/kernel/cobalt/pipe.c index 145529b..ea3c174 100644 --- a/kernel/cobalt/pipe.c +++ b/kernel/cobalt/pipe.c @@ -39,9 +39,12 @@ static int xnpipe_asyncsig = SIGIO; struct xnpipe_state xnpipe_states[XNPIPE_NDEVS]; #define XNPIPE_BITMAP_SIZE ((XNPIPE_NDEVS + BITS_PER_LONG - 1) / BITS_PER_LONG) + static unsigned long xnpipe_bitmap[XNPIPE_BITMAP_SIZE]; -struct xnqueue xnpipe_sleepq, xnpipe_asyncq; +static LIST_HEAD(xnpipe_sleepq); + +static LIST_HEAD(xnpipe_asyncq); int xnpipe_wakeup_apc; @@ -83,7 +86,7 @@ static inline void xnpipe_minor_free(int minor) static inline void xnpipe_enqueue_wait(struct xnpipe_state *state, int mask) { if (state->wcount != 0x7fffffff && state->wcount++ == 0) - appendq(&xnpipe_sleepq, &state->slink); + list_add_tail(&state->slink, &xnpipe_sleepq); __setbits(state->status, mask); } @@ -92,7 +95,7 @@ static inline void xnpipe_dequeue_wait(struct xnpipe_state *state, int mask) { if (testbits(state->status, mask)) if (--state->wcount == 0) { - removeq(&xnpipe_sleepq, &state->slink); + list_del(&state->slink); __clrbits(state->status, mask); } } @@ -102,7 +105,7 @@ static inline void xnpipe_dequeue_all(struct xnpipe_state *state, int mask) if (testbits(state->status, mask)) { if (state->wcount) { state->wcount = 0; - removeq(&xnpipe_sleepq, &state->slink); + list_del(&state->slink); __clrbits(state->status, mask); } } @@ -141,71 +144,76 @@ static inline void xnpipe_dequeue_all(struct xnpipe_state *state, int mask) static void xnpipe_wakeup_proc(void *cookie) { struct xnpipe_state *state; - struct xnholder *h, *nh; - u_long rbits; + unsigned long rbits; spl_t s; xnlock_get_irqsave(&nklock, s); - nh = getheadq(&xnpipe_sleepq); - while ((h = nh) != NULL) { - nh = nextq(&xnpipe_sleepq, h); - state = link2xnpipe(h, slink); - rbits = testbits(state->status, XNPIPE_USER_ALL_READY); - if (rbits) { - __clrbits(state->status, rbits); - /* - * We could be switched out as a result of - * waking up a waiter, so we need the - * housekeeping and release the nklock before - * calling wake_up_interruptible(). - */ - if ((rbits & XNPIPE_USER_WREAD_READY) != 0) { - if (waitqueue_active(&state->readq)) { - xnlock_put_irqrestore(&nklock, s); - wake_up_interruptible(&state->readq); - xnlock_get_irqsave(&nklock, s); - } + /* + * NOTE: sleepers might enter/leave the queue while we don't + * hold the nklock in these wakeup loops. So we iterate over + * each sleeper list until we find no more candidate for + * wakeup after an entire scan, redoing the scan from the list + * head otherwise. + */ + for (;;) { + if (list_empty(&xnpipe_sleepq)) + goto check_async; + + state = list_first_entry(&xnpipe_sleepq, struct xnpipe_state, slink); + + for (;;) { + rbits = testbits(state->status, XNPIPE_USER_ALL_READY); + if (rbits) + break; + if (list_is_last(&state->slink, &xnpipe_sleepq)) + goto check_async; + state = list_next_entry(state, slink); + } + + __clrbits(state->status, rbits); + + if ((rbits & XNPIPE_USER_WREAD_READY) != 0) { + if (waitqueue_active(&state->readq)) { + xnlock_put_irqrestore(&nklock, s); + wake_up_interruptible(&state->readq); + xnlock_get_irqsave(&nklock, s); } - if ((rbits & XNPIPE_USER_WSYNC_READY) != 0) { - if (waitqueue_active(&state->syncq)) { - xnlock_put_irqrestore(&nklock, s); - wake_up_interruptible(&state->syncq); - xnlock_get_irqsave(&nklock, s); - } + } + if ((rbits & XNPIPE_USER_WSYNC_READY) != 0) { + if (waitqueue_active(&state->syncq)) { + xnlock_put_irqrestore(&nklock, s); + wake_up_interruptible(&state->syncq); + xnlock_get_irqsave(&nklock, s); } -#ifdef CONFIG_SMP - /* - * A waiter may have entered/left the queue - * from another CPU, so we need to refetch the - * sleep queue head to be safe. - */ - nh = getheadq(&xnpipe_sleepq); -#endif /* CONFIG_SMP */ } } +check_async: /* * Scan the async queue, sending the proper signal to * subscribers. */ - nh = getheadq(&xnpipe_asyncq); + for (;;) { + if (list_empty(&xnpipe_asyncq)) + goto out; - while ((h = nh) != NULL) { - nh = nextq(&xnpipe_asyncq, h); - state = link2xnpipe(h, alink); + state = list_first_entry(&xnpipe_asyncq, struct xnpipe_state, alink); - if (testbits(state->status, XNPIPE_USER_SIGIO)) { - __clrbits(state->status, XNPIPE_USER_SIGIO); - xnlock_put_irqrestore(&nklock, s); - kill_fasync(&state->asyncq, xnpipe_asyncsig, POLL_IN); - xnlock_get_irqsave(&nklock, s); -#ifdef CONFIG_SMP - nh = getheadq(&xnpipe_asyncq); -#endif + for (;;) { + if (testbits(state->status, XNPIPE_USER_SIGIO)) + break; + if (list_is_last(&state->alink, &xnpipe_asyncq)) + goto out; + state = list_next_entry(state, alink); } - } + __clrbits(state->status, XNPIPE_USER_SIGIO); + xnlock_put_irqrestore(&nklock, s); + kill_fasync(&state->asyncq, xnpipe_asyncsig, POLL_IN); + xnlock_get_irqsave(&nklock, s); + } +out: xnlock_put_irqrestore(&nklock, s); } @@ -215,42 +223,44 @@ static inline void xnpipe_schedule_request(void) /* hw IRQs off */ } static inline ssize_t xnpipe_flush_bufq(void (*fn)(void *buf, void *xstate), - struct xnqueue *q, + struct list_head *q, void *xstate) { - struct xnpipe_mh *mh; - struct xnholder *h; + struct xnpipe_mh *mh, *tmp; ssize_t n = 0; + if (list_empty(q)) + return 0; + /* Queue is private, no locking is required. */ - while ((h = getq(q)) != NULL) { - mh = link2mh(h); + list_for_each_entry_safe(mh, tmp, q, link) { + list_del(&mh->link); n += xnpipe_m_size(mh); fn(mh, xstate); } - /* We must return the overall count of bytes flushed. */ + /* Return the overall count of bytes flushed. */ return n; } /* * Move the specified queue contents to a private queue, then call the - * flush handler to purge it. The latter is run without locking. + * flush handler to purge it. The latter runs without locking. * Returns the number of bytes flushed. Must be entered with nklock * held, interrupts off. */ #define xnpipe_flushq(__state, __q, __f, __s) \ ({ \ - struct xnqueue __privq; \ - ssize_t n; \ + LIST_HEAD(__privq); \ + ssize_t __n; \ \ - initq(&__privq); \ - moveq(&__privq, &(state)->__q); \ + list_splice_init(&(state)->__q, &__privq); \ + (__state)->nr ## __q = 0; \ xnlock_put_irqrestore(&nklock, (__s)); \ - n = xnpipe_flush_bufq((__state)->ops.__f, &__privq, (__state)->xstate); \ + __n = xnpipe_flush_bufq((__state)->ops.__f, &__privq, (__state)->xstate); \ xnlock_get_irqsave(&nklock, (__s)); \ \ - n; \ + __n; \ }) static void *xnpipe_default_alloc_ibuf(size_t size, void *xstate) @@ -440,15 +450,16 @@ ssize_t xnpipe_send(int minor, struct xnpipe_mh *mh, size_t size, int flags) return -EBADF; } - inith(xnpipe_m_link(mh)); xnpipe_m_size(mh) = size - sizeof(*mh); xnpipe_m_rdoff(mh) = 0; state->ionrd += xnpipe_m_size(mh); if (flags & XNPIPE_URGENT) - prependq(&state->outq, xnpipe_m_link(mh)); + list_add(&mh->link, &state->outq); else - appendq(&state->outq, xnpipe_m_link(mh)); + list_add_tail(&mh->link, &state->outq); + + state->nroutq++; if (!testbits(state->status, XNPIPE_USER_CONN)) { xnlock_put_irqrestore(&nklock, s); @@ -510,8 +521,9 @@ EXPORT_SYMBOL_GPL(xnpipe_mfixup); ssize_t xnpipe_recv(int minor, struct xnpipe_mh **pmh, xnticks_t timeout) { struct xnpipe_state *state; - struct xnholder *h; - xnthread_t *thread; + struct xnpipe_mh *mh; + xnflags_t info; + xntmode_t mode; ssize_t ret; spl_t s; @@ -530,43 +542,51 @@ ssize_t xnpipe_recv(int minor, struct xnpipe_mh **pmh, xnticks_t timeout) goto unlock_and_exit; } - thread = xnpod_current_thread(); + /* + * If we received a relative timespec, rescale it to an + * absolute time value based on the monotonic clock. + */ + mode = XN_RELATIVE; + if (timeout != XN_NONBLOCK && timeout != XN_INFINITE) { + mode = XN_ABSOLUTE; + timeout += xnclock_read_monotonic(); + } + + for (;;) { + if (!list_empty(&state->inq)) + break; - while ((h = getq(&state->inq)) == NULL) { if (timeout == XN_NONBLOCK) { ret = -EWOULDBLOCK; goto unlock_and_exit; } - xnsynch_sleep_on(&state->synchbase, timeout, XN_RELATIVE); - - if (xnthread_test_info(thread, XNTIMEO)) { + info = xnsynch_sleep_on(&state->synchbase, timeout, mode); + if (info & XNTIMEO) { ret = -ETIMEDOUT; goto unlock_and_exit; } - if (xnthread_test_info(thread, XNBREAK)) { + if (info & XNBREAK) { ret = -EINTR; goto unlock_and_exit; } - if (xnthread_test_info(thread, XNRMID)) { + if (info & XNRMID) { ret = -EIDRM; goto unlock_and_exit; } - - /* remaining timeout */ - timeout = xnthread_timeout(thread); } - *pmh = link2mh(h); - - ret = (ssize_t) xnpipe_m_size(*pmh); + mh = list_get_entry(&state->inq, struct xnpipe_mh, link); + *pmh = mh; + state->nrinq--; + ret = (ssize_t)xnpipe_m_size(mh); if (testbits(state->status, XNPIPE_USER_WSYNC)) { __setbits(state->status, XNPIPE_USER_WSYNC_READY); xnpipe_schedule_request(); } - unlock_and_exit: +unlock_and_exit: xnlock_put_irqrestore(&nklock, s); @@ -592,7 +612,7 @@ int xnpipe_flush(int minor, int mode) return -EBADF; } - msgcount = countq(&state->outq) + countq(&state->inq); + msgcount = state->nroutq + state->nrinq; if (mode & XNPIPE_OFLUSH) state->ionrd -= xnpipe_flushq(state, outq, free_obuf, s); @@ -601,7 +621,7 @@ int xnpipe_flush(int minor, int mode) xnpipe_flushq(state, inq, free_ibuf, s); if (testbits(state->status, XNPIPE_USER_WSYNC) && - msgcount > countq(&state->outq) + countq(&state->inq)) { + msgcount > state->nroutq + state->nrinq) { __setbits(state->status, XNPIPE_USER_WSYNC_READY); xnpipe_schedule_request(); } @@ -710,7 +730,7 @@ static int xnpipe_release(struct inode *inode, struct file *file) state->ops.input(NULL, -EPIPE, state->xstate); if (state->asyncq) { /* Clear the async queue */ - removeq(&xnpipe_asyncq, &state->alink); + list_del(&state->alink); __clrbits(state->status, XNPIPE_USER_SIGIO); xnlock_put_irqrestore(&nklock, s); fasync_helper(-1, file, 0, &state->asyncq); @@ -736,7 +756,6 @@ static ssize_t xnpipe_read(struct file *file, int sigpending, err = 0; size_t nbytes, inbytes; struct xnpipe_mh *mh; - struct xnholder *h; ssize_t ret; spl_t s; @@ -753,26 +772,24 @@ static ssize_t xnpipe_read(struct file *file, * Queue probe and proc enqueuing must be seen atomically, * including from the Xenomai side. */ - h = getq(&state->outq); - mh = link2mh(h); - - if (mh == NULL) { + if (list_empty(&state->outq)) { if (file->f_flags & O_NONBLOCK) { xnlock_put_irqrestore(&nklock, s); return -EWOULDBLOCK; } sigpending = xnpipe_wait(state, XNPIPE_USER_WREAD, s, - !emptyq_p(&state->outq)); - h = getq(&state->outq); - mh = link2mh(h); + !list_empty(&state->outq)); - if (mh == NULL) { + if (list_empty(&state->outq)) { xnlock_put_irqrestore(&nklock, s); return sigpending ? -ERESTARTSYS : 0; } } + mh = list_get_entry(&state->outq, struct xnpipe_mh, link); + state->nroutq--; + /* * We allow more data to be appended to the current message * bucket while its contents is being copied to the user @@ -793,11 +810,12 @@ static ssize_t xnpipe_read(struct file *file, break; xnlock_put_irqrestore(&nklock, s); + /* More data could be appended while doing this: */ - err = - __copy_to_user(buf + inbytes, - xnpipe_m_data(mh) + xnpipe_m_rdoff(mh), - nbytes); + err = __copy_to_user(buf + inbytes, + xnpipe_m_data(mh) + xnpipe_m_rdoff(mh), + nbytes); + xnlock_get_irqsave(&nklock, s); if (err) { @@ -812,9 +830,10 @@ static ssize_t xnpipe_read(struct file *file, state->ionrd -= inbytes; ret = inbytes; - if (xnpipe_m_size(mh) > xnpipe_m_rdoff(mh)) - prependq(&state->outq, &mh->link); - else { + if (xnpipe_m_size(mh) > xnpipe_m_rdoff(mh)) { + list_add(&mh->link, &state->outq); + state->nroutq++; + } else { /* * We always want to fire the output handler because * whatever the error state is for userland (e.g @@ -853,14 +872,13 @@ static ssize_t xnpipe_write(struct file *file, xnlock_get_irqsave(&nklock, s); - retry: - +retry: if (!testbits(state->status, XNPIPE_KERN_CONN)) { xnlock_put_irqrestore(&nklock, s); return -EPIPE; } - pollnum = countq(&state->inq) + countq(&state->outq); + pollnum = state->nrinq + state->nroutq; xnlock_put_irqrestore(&nklock, s); mh = state->ops.alloc_ibuf(count + sizeof(*mh), state->xstate); @@ -873,15 +891,13 @@ static ssize_t xnpipe_write(struct file *file, xnlock_get_irqsave(&nklock, s); if (xnpipe_wait(state, XNPIPE_USER_WSYNC, s, - pollnum > - countq(&state->inq) + countq(&state->outq))) { + pollnum > state->nrinq + state->nroutq)) { xnlock_put_irqrestore(&nklock, s); return -ERESTARTSYS; } goto retry; } - inith(xnpipe_m_link(mh)); xnpipe_m_size(mh) = count; xnpipe_m_rdoff(mh) = 0; @@ -892,7 +908,8 @@ static ssize_t xnpipe_write(struct file *file, xnlock_get_irqsave(&nklock, s); - appendq(&state->inq, &mh->link); + list_add_tail(&mh->link, &state->inq); + state->nrinq++; /* Wake up a Xenomai sleeper if any. */ if (xnsynch_wakeup_one_sleeper(&state->synchbase)) @@ -905,9 +922,9 @@ static ssize_t xnpipe_write(struct file *file, } if (file->f_flags & O_SYNC) { - if (!emptyq_p(&state->inq)) { + if (!list_empty(&state->inq)) { if (xnpipe_wait(state, XNPIPE_USER_WSYNC, s, - emptyq_p(&state->inq))) + list_empty(&state->inq))) count = -ERESTARTSYS; } } @@ -1009,12 +1026,12 @@ static int xnpipe_fasync(int fd, struct file *file, int on) if (state->asyncq) { if (!queued) { xnlock_get_irqsave(&nklock, s); - appendq(&xnpipe_asyncq, &state->alink); + list_add_tail(&state->alink, &xnpipe_asyncq); xnlock_put_irqrestore(&nklock, s); } } else if (queued) { xnlock_get_irqsave(&nklock, s); - removeq(&xnpipe_asyncq, &state->alink); + list_del(&state->alink); xnlock_put_irqrestore(&nklock, s); } @@ -1036,7 +1053,7 @@ static unsigned xnpipe_poll(struct file *file, poll_table *pt) else r_mask |= POLLHUP; - if (!emptyq_p(&state->outq)) + if (!list_empty(&state->outq)) r_mask |= (POLLIN | POLLRDNORM); else /* @@ -1077,17 +1094,14 @@ int xnpipe_mount(void) for (state = &xnpipe_states[0]; state < &xnpipe_states[XNPIPE_NDEVS]; state++) { - inith(&state->slink); - inith(&state->alink); state->status = 0; state->asyncq = NULL; - initq(&state->inq); - initq(&state->outq); + INIT_LIST_HEAD(&state->inq); + state->nrinq = 0; + INIT_LIST_HEAD(&state->outq); + state->nroutq = 0; } - initq(&xnpipe_sleepq); - initq(&xnpipe_asyncq); - xnpipe_class = class_create(THIS_MODULE, "rtpipe"); if (IS_ERR(xnpipe_class)) { printk(XENO_ERR "error creating rtpipe class, err=%ld\n", diff --git a/kernel/drivers/ipc/xddp.c b/kernel/drivers/ipc/xddp.c index 50a3a22..1be918c 100644 --- a/kernel/drivers/ipc/xddp.c +++ b/kernel/drivers/ipc/xddp.c @@ -184,7 +184,7 @@ static void __xddp_free_handler(void *buf, void *skarg) /* nklock free */ rtdm_lock_put_irqrestore(&sk->lock, lockctx); } -static void __xddp_output_handler(xnpipe_mh_t *mh, void *skarg) /* nklock held */ +static void __xddp_output_handler(struct xnpipe_mh *mh, void *skarg) /* nklock held */ { struct xddp_socket *sk = skarg; @@ -192,7 +192,7 @@ static void __xddp_output_handler(xnpipe_mh_t *mh, void *skarg) /* nklock held * sk->monitor(sk->fd, XDDP_EVTOUT, xnpipe_m_size(mh)); } -static int __xddp_input_handler(xnpipe_mh_t *mh, int retval, void *skarg) /* nklock held */ +static int __xddp_input_handler(struct xnpipe_mh *mh, int retval, void *skarg) /* nklock held */ { struct xddp_socket *sk = skarg; _______________________________________________ Xenomai-git mailing list Xenomai-git@xenomai.org http://www.xenomai.org/mailman/listinfo/xenomai-git