Hi all,

I found some races in the AIO call back path which is the root cause for
a few problems in the AIO code. The race is between the aio call back
path ( executed by softirqs ) and the aio_run_iocb().

(1) Suppose a retry returned -EIOCBRETRY and wait has been queued.

Now, whenever the wait event has been completed, the aio_wake_function()
is invoked by the softirqs. aio_wake_function deletes the wait entry
(iocb->ki_wait.task_list) and invokes kick_iocb() to kick the iocb and
queue it back to the run_list.

Consider a situation like:

SOFTIRQ:
        list_del_init(&wait->task_list);
          ## SOFTIRQ gets scheduled or PROCESS executes on another CPU ##
        kick_iocb(iocb);
PROCESS:
        /*
         * Issue an additional retry to avoid waiting forever if
         * no waits were queued (e.g. in case of a short read).
         */
        if (list_empty(&iocb->ki_wait.task_list))
        kiocbSetKicked(iocb);

So, PROCESS might see the deleted(empty) wait list
(iocb->ki_wait.task_list) and will kick and queue it up and it will be
retried again. Now if the next retry is going to happen before SOFTIRQ
gets a chance to run, we are running into problems.

There are two possiblities:

(a) The iocb may get completed.
        If this happens before the SOFTIRQ enters kick_iocb(iocb), we are about
to kick an iocb which is no more valid. This might cause a Kernel Oops
while trying,
        spin_lock_irqsave(&ctx->ctx_lock, flags);
since the iocb->ki_ctx may be invalid.

(b) The iocb might encounter another wait condition.
        This case iocb would be queued for some events. This would cause the
SOFTIRQ hit
        WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
in queue_kicked_iocb().

(2) Similar races occur due to Kicking the iocb.

i.e,
SOFTIRQ: [in kick_iocb()]
                if(!kiocbTryKick(iocb)) {
        # If we get scheduled here. And PROCESS reaches as below #
                        queue_kicked_iocb(iocb);
PROCESS: [ in aio_run_iocb() ]
                /* will make __queue_kicked_iocb succeed from here on */
                INIT_LIST_HEAD(&iocb->ki_run_list);
                /* we must queue the next iteration ourselves, if it
                 * has already been kicked */
                if (kiocbIsKicked(iocb)) { <- Will be true see ( kicked by 
kick_iocb)
                        __queue_kicked_iocb(iocb);
                }

Thus iocb will get queued and may be retried. This again can cause the
problems described above.

Conclusion
===========

 From this I conclude that the following operations in call back path
for async iocbs should be atomic.

(*) Deletion of the wait queue entry.
(*) Kicking the iocb.
(*) Queue back to run_list. ( which is already atomic with ctx_lock held ).

Also the check for waits being queued in aio_run_iocb() should be done
with the ctx_lock held.

Solution
========
I have created a patch which makes above operations to be done with the
iocb->ki_ctx->ctx_lock held. Also, I have moved the check :

  > if (list_empty(&iocb->ki_wait.task_list))
  >  kiocbSetKicked(iocb);

to be done under the ctx_lock held.

Since we already holds a lock before calling queue_kicked_iocb(), there
is no need for a queue_kicked_iocb anymore. So I have renamed the
__queue_kicked_iocb to queue_kicked_iocb.

Testing
========
I tested the proposed patch in 2.6.13-rc3-mm1 on a 2-way Intel Xeon(with HT) 
with aio-stress and fsx-linux test cases.
The tests ran fine.


Comments ?

regards,

Suzuki K P
Linux Technology Centre
IBM India Software Labs
Bangalore.

"Sorrow keeps u Human ,Failure Keeps u Humble,Success keeps u Glowing.
   But only God Keeps u Going..... "



--- linux-2.6.13-rc3/fs/aio.c.old       2005-07-27 10:19:21.000000000 -0700
+++ linux-2.6.13-rc3/fs/aio.c   2005-07-27 10:22:27.000000000 -0700
@@ -609,7 +609,7 @@ static void unuse_mm(struct mm_struct *m
  * Should be called with the spin lock iocb->ki_ctx->ctx_lock
  * held
  */
-static inline int __queue_kicked_iocb(struct kiocb *iocb)
+static inline int queue_kicked_iocb(struct kiocb *iocb)
 {
        struct kioctx *ctx = iocb->ki_ctx;
 
@@ -724,13 +724,6 @@ static ssize_t aio_run_iocb(struct kiocb
                        aio_complete(iocb, ret, 0);
                        /* must not access the iocb after this */
                }
-       } else {
-               /*
-                * Issue an additional retry to avoid waiting forever if
-                * no waits were queued (e.g. in case of a short read).
-                */
-               if (list_empty(&iocb->ki_wait.task_list))
-                       kiocbSetKicked(iocb);
        }
 out:
        spin_lock_irq(&ctx->ctx_lock);
@@ -741,17 +734,23 @@ out:
                 * and know that there is more left to go,
                 * this is where we let go so that a subsequent
                 * "kick" can start the next iteration
+                *
+                * Issue an additional retry to avoid waiting forever if
+                * no waits were queued (e.g. in case of a short read).
+                * (Should be done with ctx_lock held.)
                 */
 
-               /* will make __queue_kicked_iocb succeed from here on */
+               if (list_empty(&iocb->ki_wait.task_list))
+                       kiocbSetKicked(iocb);
+               /* will make queue_kicked_iocb succeed from here on */
                INIT_LIST_HEAD(&iocb->ki_run_list);
                /* we must queue the next iteration ourselves, if it
                 * has already been kicked */
                if (kiocbIsKicked(iocb)) {
-                       __queue_kicked_iocb(iocb);
+                       queue_kicked_iocb(iocb);
 
                        /*
-                        * __queue_kicked_iocb will always return 1 here, 
because
+                        * queue_kicked_iocb will always return 1 here, because
                         * iocb->ki_run_list is empty at this point so it should
                         * be safe to unconditionally queue the context into the
                         * work queue.
@@ -870,21 +869,31 @@ static void aio_kick_handler(void *data)
 
 
 /*
- * Called by kick_iocb to queue the kiocb for retry
- * and if required activate the aio work queue to process
- * it
+ * Kicking an async iocb.
+ * The following operations for the async iocbs should be atomic
+ * to avoid races with the aio_run_iocb() code.
+ * (1) Deleting the wait queue entry.
+ * (2) Kicking the iocb.
+ * (3) Queue the iocb back to run_list.
+ * Holds the ctx->ctx_lock to avoid races.
  */
-static void queue_kicked_iocb(struct kiocb *iocb)
+static void kick_async_iocb(struct kiocb *iocb)
 {
        struct kioctx   *ctx = iocb->ki_ctx;
        unsigned long flags;
        int run = 0;
-
-       WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
-
+       
        spin_lock_irqsave(&ctx->ctx_lock, flags);
-       run = __queue_kicked_iocb(iocb);
+       list_del_init(&iocb->ki_wait.task_list);
+       /* If its already kicked we shouldn't queue it again */
+       if (!kiocbTryKick(iocb)) {
+               run = queue_kicked_iocb(iocb);
+       }
        spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+
+       /* Activate the aio worker queue if we have successfully queued
+        * the iocb, so that it can be processed
+        */
        if (run)
                aio_queue_work(ctx);
 }
@@ -901,15 +910,13 @@ void fastcall kick_iocb(struct kiocb *io
        /* sync iocbs are easy: they can only ever be executing from a 
         * single context. */
        if (is_sync_kiocb(iocb)) {
+               list_del_init(&iocb->ki_wait.task_list);
                kiocbSetKicked(iocb);
                wake_up_process(iocb->ki_obj.tsk);
                return;
-       }
-
-       /* If its already kicked we shouldn't queue it again */
-       if (!kiocbTryKick(iocb)) {
-               queue_kicked_iocb(iocb);
-       }
+       } else 
+               kick_async_iocb(iocb);
+       
 }
 EXPORT_SYMBOL(kick_iocb);
 
@@ -1461,7 +1468,6 @@ static int aio_wake_function(wait_queue_
 {
        struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
 
-       list_del_init(&wait->task_list);
        kick_iocb(iocb);
        return 1;
 }

Reply via email to