Hi Dmitriy,
interesting implementation. Is your queue linearizable, i.e. can it be used
as a FIFO?
Regards
Ben
On Friday, November 12, 2010 at 9:42:14 AM UTC+1, Dmitriy Vyukov wrote:
>
> On Nov 12, 4:05 am, Pierre Habouzit <pierre.habou...@intersec-
> group.com> wrote:
> > I'm using a work-stealing scheduler, with a per-thread bounded deque
> > that can be pushed/popped at the back from the local thread, and popped
> > from the front by thieves. Yes, this means that my tasks run in LIFO
> > order most of the time, and I'm fine with that.
> >
> > When a task is queued in a full deque, the task is run
> > immediately instead of being queued, and when thieves cannot steal
> > anything they are blocked using an eventcount.
> >
> > In addition to that, I'd like to have serial queues of tasks that
> > ensure that:
> > - tasks are run in strict FIFO order;
> > - at most one of their tasks can run at any given time.
> > Tasks from different serial queues can, however, run at the same time
> > without problem. The idea is that a queue makes it possible to protect
> > a resource (think file, GUI, ...).
> >
> > My requirements are that:
> > - queues are registered at most once in the scheduler at any time;
> > - non-empty queues are always registered within the scheduler;
> > - empty queues are not registered in the scheduler (though it is
> > acceptable that just after a queue becomes empty it remains registered
> > when a race occurs, as long as it eventually finds its way out)
> >
> > I'm using a setup very similar to the low overhead mpsc queue, that
> > I've tweaked this way:
> >
> > enum {
> > QUEUE_EMPTY,
> > QUEUE_NOT_EMPTY,
> > QUEUE_RUNNING,
> >
> > };
> >
> > struct mpsc_queue_t {
> > ... /* the usual stuff */
> > volatile unsigned state;
> > job_t run_queue;
> >
> > };
> >
> > struct mpsc_node_t {
> > ... /* the usual stuff */
> > job_t *job;
> >
> > };
> >
> > mpsc_node_t *mpsc_queue_push(mpsc_queue_t *q, job_t *job)
> > {
> > mpsc_node_t *n = mpsc_node_alloc(); /* uses a per-thread cache */
> >
> > n->job = job;
> > mpsc_queue_push(q, n);
> > if (XCHG(&q->state, QUEUE_NOT_EMPTY) == QUEUE_EMPTY)
> > schedule_job(&q->run_queue); /* points to queue_run */
> > return n;
> >
> > }
> >
> > void queue_run(job_t *job)
> > {
> > mpsc_queue_t *q = container_of(job, mpsc_queue_t, run_queue);
> > mpsc_node_t *n;
> >
> > do {
> > XCHG(&q->state, QUEUE_RUNNING);
> >
> > while ((n = mpsc_queue_pop(q))) {
> > job_t *job = n->job;
> >
> > mpsc_node_free(n); /* releases in cache first, so that
> > it's hot if quickly reused */
> > job_run(job);
> > }
> > } while (!CAS(&q->state, QUEUE_RUNNING, QUEUE_EMPTY));
> >
> > }
> >
> > I think this works, but I'm also pretty sure that this spoils the
> > underlying mpsc queue a lot, because I create contention on q->state for
> > everyone involved, which is ugly, not to mention the ugly loop in the
> > consumer. So, does anyone have an idea on how to make this
> > better, or does it look like it's good enough?
> >
> > Note: since the scheduler uses a LIFO, registering into the
> > scheduler from queue_run() will just do the same as the loop on the
> > CAS, because jobs run in LIFO order most of the time. So it just hides
> > the loop in the scheduler, and it still doesn't fix the contention due
> > to the additional XCHG in the _push operation :/
>
> Hi Pierre,
>
> Yes, that's way too much overhead for RUNNABLE/NONRUNNABLE detection.
> It can be done with ~1 cycle overhead for producers and 0 cycle
> overhead for the consumer's fast-path. The idea is that we only need to
> track state changes between RUNNABLE<->NONRUNNABLE. On the
> producer's side this can be combined with the XCHG in enqueue(), while
> on the consumer's side we need an additional RMW which is executed only
> when the queue becomes empty. The key is that we can use the low bit of
> the queue tail pointer as the RUNNABLE/NONRUNNABLE flag.
>
> Here is the code (and yes, here I still call the queue's tail 'head'):
>
> // note: the casts to long below assume 32-bit pointers (e.g. a 32-bit MSVC build)
> template<typename T>
> T XCHG(T volatile* dest, T value)
> {
> return (T)_InterlockedExchange((long*)dest, (long)value);
> }
>
> template<typename T>
> bool CAS(T volatile* dest, T cmp, T xchg)
> {
> return cmp == (T)_InterlockedCompareExchange((long*)dest,
> (long)xchg, (long)cmp);
> }
>
> struct mpscq_node_t
> {
> mpscq_node_t* volatile next;
> void* state;
> };
>
> struct mpscq_t
> {
> mpscq_node_t* volatile head;
> mpscq_node_t* tail;
> };
>
> void mpscq_create(mpscq_t* self, mpscq_node_t* stub)
> {
> stub->next = 0;
> self->tail = stub;
> // mark it as empty
> self->head = (mpscq_node_t*)((uintptr_t)stub | 1);
> }
>
> bool spscq_push(mpscq_t* self, mpscq_node_t* n)
> {
> n->next = 0;
> // serialization-point wrt producers
> mpscq_node_t* prev = XCHG(&self->head, n);
> // only 2 AND instructions on fast-path added
> bool was_empty = ((uintptr_t)prev & 1) != 0;
> prev = (mpscq_node_t*)((uintptr_t)prev & ~1);
> prev->next = n; // serialization-point wrt consumer
> return was_empty;
> }
>
> mpscq_node_t* spscq_pop(mpscq_t* self)
> {
> mpscq_node_t* tail = self->tail;
> l_retry:
> // fast-path is not modified
> mpscq_node_t* next = tail->next; // serialization-point wrt producers
> if (next)
> {
> self->tail = next;
> tail->state = next->state;
> return tail;
> }
> mpscq_node_t* head = self->head;
> // if head is marked as empty,
> // then the queue had not been scheduled in the first place
> assert(((uintptr_t)head & 1) == 0);
> if (tail != head)
> {
> // there is just a temporary gap -> wait for the producer to
> // update 'next' link
> while (tail->next == 0)
> _mm_pause();
> goto l_retry;
> }
> else
> {
> // the queue seems to be really empty -> try to mark it as empty
> mpscq_node_t* xchg = (mpscq_node_t*)((uintptr_t)tail | 1);
> if (CAS(&self->head, tail, xchg))
> // if we successfully marked it as empty -> return
> // the following producer will re-schedule the queue for
> // execution
> return 0;
> // producer had enqueued new item
> goto l_retry;
> }
> }
>
> int main()
> {
> mpscq_t q;
> mpscq_create(&q, new mpscq_node_t);
>
> mpscq_node_t* n = 0;
> //n = spscq_pop(&q);
>
> spscq_push(&q, new mpscq_node_t);
> n = spscq_pop(&q);
> n = spscq_pop(&q);
>
> spscq_push(&q, new mpscq_node_t);
> spscq_push(&q, new mpscq_node_t);
> n = spscq_pop(&q);
> n = spscq_pop(&q);
> n = spscq_pop(&q);
>
> spscq_push(&q, new mpscq_node_t);
> spscq_push(&q, new mpscq_node_t);
> n = spscq_pop(&q);
> spscq_push(&q, new mpscq_node_t);
> n = spscq_pop(&q);
> n = spscq_pop(&q);
> n = spscq_pop(&q);
> }
>
> Now, whenever spscq_push() returns true, the thread has to put the queue
> into the scheduler for execution. Once the queue is scheduled for
> execution, a thread pops from the queue until pop returns 0.
> The nice thing is that the consumer is not obliged to process all items in
> a queue. It can process, for example, 100 items, then just return
> the queue to the scheduler and switch to other starving queues.
> Also, with regard to that _mm_pause() loop in pop(): instead of waiting
> for the producer to restore the 'next' pointer, the consumer can decide to
> just return the queue to the scheduler and handle some other queues. When
> it gets back to the first queue, the 'next' link will most likely
> already be restored. So no senseless waiting.
>
>
> --
> Dmitriy V'jukov
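
To make the last two quoted paragraphs concrete, here is a minimal sketch of how a consumer might run a bounded batch and hand the queue back instead of spinning. It is not the code from the message above: it restates the same low-bit idea with C++11 std::atomic (default sequentially consistent ordering, chosen for simplicity rather than speed), and every name in it (node_t, queue_t, queue_try_pop, queue_run, the POP_* results, the budget parameter) is hypothetical. As in the quoted pop(), the node handed to the caller is the old stub carrying the payload of its successor.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical names; a portable restatement of the quoted idea, not the original code.
struct node_t {
    std::atomic<node_t*> next{nullptr};
    int value = 0;                      // stand-in for the 'state' payload
};

struct queue_t {
    std::atomic<std::uintptr_t> head;   // producers' end; low bit set = marked empty
    node_t* tail;                       // consumer's end; always a stub/consumed node
};

inline void queue_init(queue_t* q, node_t* stub) {
    stub->next.store(nullptr);
    q->tail = stub;
    q->head.store(reinterpret_cast<std::uintptr_t>(stub) | 1);  // start marked empty
}

// Returns true if the queue was marked empty: the caller must schedule it.
inline bool queue_push(queue_t* q, node_t* n) {
    n->next.store(nullptr);
    std::uintptr_t prev = q->head.exchange(reinterpret_cast<std::uintptr_t>(n));
    bool was_empty = (prev & 1) != 0;
    reinterpret_cast<node_t*>(prev & ~std::uintptr_t(1))->next.store(n);
    return was_empty;
}

enum pop_result { POP_ITEM, POP_EMPTY, POP_BUSY };

// Non-spinning pop: reports POP_BUSY when a producer has swapped 'head'
// but has not linked 'next' yet, instead of waiting for it.
inline pop_result queue_try_pop(queue_t* q, node_t** out) {
    for (;;) {
        node_t* tail = q->tail;
        node_t* next = tail->next.load();
        if (next) {
            q->tail = next;
            tail->value = next->value;  // old stub carries the payload out
            *out = tail;
            return POP_ITEM;
        }
        std::uintptr_t head = q->head.load();
        assert((head & 1) == 0);        // a scheduled queue is never marked empty
        if (reinterpret_cast<std::uintptr_t>(tail) != head)
            return POP_BUSY;
        std::uintptr_t expected = head;
        if (q->head.compare_exchange_strong(expected, head | 1))
            return POP_EMPTY;           // next producer will see the mark and schedule
        // a producer enqueued in the meantime -> look again
    }
}

// Handles at most 'budget' items. Returns true if the caller must put the
// queue back into the scheduler (budget used up, or a producer is mid-push);
// returns false if the queue was marked empty.
template <typename F>
bool queue_run(queue_t* q, std::size_t budget, F&& handle) {
    for (std::size_t i = 0; i < budget; ++i) {
        node_t* n = nullptr;
        pop_result r = queue_try_pop(q, &n);
        if (r == POP_EMPTY) return false;
        if (r == POP_BUSY) return true;
        handle(n->value);
        delete n;                       // assumes nodes were allocated with new
    }
    return true;
}
```

In this sketch the only two events that touch the scheduler are queue_push() returning true and queue_run() returning true; a false return from queue_run() means the empty mark is set and the next producer takes over the scheduling. Note that a queue re-scheduled after exhausting its budget may turn out to be empty on the next run, which simply ends with the mark being set.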