On 2019/08/06 7:05, Damien Le Moal wrote:
> On 2019/08/06 6:59, Damien Le Moal wrote:
>> On 2019/08/06 6:28, Jens Axboe wrote:
>>> On 8/5/19 2:27 PM, Damien Le Moal wrote:
>>>> On 2019/08/06 6:26, Jens Axboe wrote:
>>>>>> In any case, looking again at this code, it looks like there is a
>>>>>> problem with dio->size being incremented early, even for fragments
>>>>>> that get BLK_QC_T_EAGAIN, because dio->size is being used in
>>>>>> blkdev_bio_end_io(). So an incorrect size can be reported to user
>>>>>> space in that case on completion (e.g. large asynchronous no-wait dio
>>>>>> that cannot be issued in one go).
>>>>>>
>>>>>> So maybe something like this ? (completely untested)
>>>>>
>>>>> I think that looks pretty good, I like not double accounting with
>>>>> this_size and dio->size, and we retain the old style ordering for the
>>>>> ret value.
>>>>
>>>> Do you want a proper patch with real testing backup ? I can send that
>>>> later today.
>>>
>>> Yeah that'd be great, I like your approach better.
>>>
>>
>> Looking again, I think this is not it yet: dio->size is being referenced
>> after submit_bio(), so blkdev_bio_end_io() may see the old value if the
>> bio completes before the dio->size increment. So the use-after-free is
>> still there. And since blkdev_bio_end_io() processes completion to user
>> space only when dio->ref becomes 0, adding an atomic_inc/dec(&dio->ref)
>> over the loop would not help and does not cover the single-BIO case. Any
>> idea how to address this one?
>>
>
> Maybe adding a bio_get/put() around the 2 places that do submit_bio() would
> work for all cases (single/multi BIO, sync & async). E.g.:
>
> + bio_get(bio);
> qc = submit_bio(bio);
> if (qc == BLK_QC_T_EAGAIN) {
> if (!dio->size)
> ret = -EAGAIN;
> + bio_put(bio);
> goto error;
> }
> dio->size += bio_size;
> + bio_put(bio);
>
> Thoughts ?
>
That does not work since the reference to dio->size in blkdev_bio_end_io()
depends on atomic_dec_and_test(&dio->ref), which counts the BIO fragments for
the dio (+1 for the async multi-bio case). So completion of the last bio can
still reference the old value of dio->size.

Adding a bio_get/put() on dio->bio ensures that the dio stays around, but it
does not prevent the use of the wrong dio->size. Adding an additional
atomic_inc/dec(&dio->ref) would prevent that, but then we would need to handle
dio completion at the end of __blkdev_direct_IO() if all BIO fragments have
already completed at that point. That is a lot more plumbing, relying
completely on dio->ref for all cases, and thus removing the dio->multi_bio
management. Something like this:
diff --git a/fs/block_dev.c b/fs/block_dev.c
index a6f7c892cb4a..51d36baa367c 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -279,7 +279,6 @@ struct blkdev_dio {
};
size_t size;
atomic_t ref;
- bool multi_bio : 1;
bool should_dirty : 1;
bool is_sync : 1;
struct bio bio;
@@ -295,15 +294,14 @@ static int blkdev_iopoll(struct kiocb *kiocb, bool wait)
return blk_poll(q, READ_ONCE(kiocb->ki_cookie), wait);
}
-static void blkdev_bio_end_io(struct bio *bio)
+static inline void blkdev_get_dio(struct blkdev_dio *dio)
{
- struct blkdev_dio *dio = bio->bi_private;
- bool should_dirty = dio->should_dirty;
-
- if (bio->bi_status && !dio->bio.bi_status)
- dio->bio.bi_status = bio->bi_status;
+ atomic_inc(&dio->ref);
+}
- if (!dio->multi_bio || atomic_dec_and_test(&dio->ref)) {
+static void blkdev_put_dio(struct blkdev_dio *dio)
+{
+ if (atomic_dec_and_test(&dio->ref)) {
if (!dio->is_sync) {
struct kiocb *iocb = dio->iocb;
ssize_t ret;
@@ -316,15 +314,25 @@ static void blkdev_bio_end_io(struct bio *bio)
}
dio->iocb->ki_complete(iocb, ret, 0);
- if (dio->multi_bio)
- bio_put(&dio->bio);
} else {
struct task_struct *waiter = dio->waiter;
WRITE_ONCE(dio->waiter, NULL);
blk_wake_io_task(waiter);
}
+ bio_put(&dio->bio);
}
+}
+
+static void blkdev_bio_end_io(struct bio *bio)
+{
+ struct blkdev_dio *dio = bio->bi_private;
+ bool should_dirty = dio->should_dirty;
+
+ if (bio->bi_status && !dio->bio.bi_status)
+ dio->bio.bi_status = bio->bi_status;
+
+ blkdev_put_dio(dio);
if (should_dirty) {
bio_check_pages_dirty(bio);
@@ -349,7 +357,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter
*iter, int nr_pages)
loff_t pos = iocb->ki_pos;
blk_qc_t qc = BLK_QC_T_NONE;
gfp_t gfp;
- ssize_t ret;
+ ssize_t ret = 0;
if ((pos | iov_iter_alignment(iter)) &
(bdev_logical_block_size(bdev) - 1))
@@ -366,17 +374,24 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter
*iter, int nr_pages)
dio = container_of(bio, struct blkdev_dio, bio);
dio->is_sync = is_sync = is_sync_kiocb(iocb);
- if (dio->is_sync) {
+ if (dio->is_sync)
dio->waiter = current;
- bio_get(bio);
- } else {
+ else
dio->iocb = iocb;
- }
dio->size = 0;
- dio->multi_bio = false;
dio->should_dirty = is_read && iter_is_iovec(iter);
+ /*
+ * Get an extra reference on the first bio to ensure that the dio
+ * structure which is embedded into the first bio stays around for AIOs
+ * and while we are still doing dio->size accounting in the loop below.
+ * For dio->is_sync case, the extra reference is released on exit of
+ * this function.
+ */
+ bio_get(bio);
+ blkdev_get_dio(dio);
+
/*
* Don't plug for HIPRI/polled IO, as those should go straight
* to issue
@@ -386,6 +401,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter
*iter, int nr_pages)
ret = 0;
for (;;) {
+ size_t bio_size;
int err;
bio_set_dev(bio, bdev);
@@ -421,7 +437,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter
*iter, int nr_pages)
if (nowait)
bio->bi_opf |= (REQ_NOWAIT | REQ_NOWAIT_INLINE);
- dio->size += bio->bi_iter.bi_size;
+ bio_size = bio->bi_iter.bi_size;
pos += bio->bi_iter.bi_size;
nr_pages = iov_iter_npages(iter, BIO_MAX_PAGES);
@@ -435,42 +451,30 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter
*iter, int nr_pages)
qc = submit_bio(bio);
if (qc == BLK_QC_T_EAGAIN) {
- if (!ret)
+ if (!dio->size)
ret = -EAGAIN;
goto error;
}
- ret = dio->size;
+ dio->size += bio_size;
if (polled)
WRITE_ONCE(iocb->ki_cookie, qc);
break;
}
- if (!dio->multi_bio) {
- /*
- * AIO needs an extra reference to ensure the dio
- * structure which is embedded into the first bio
- * stays around.
- */
- if (!is_sync)
- bio_get(bio);
- dio->multi_bio = true;
- atomic_set(&dio->ref, 2);
- } else {
- atomic_inc(&dio->ref);
- }
+ blkdev_get_dio(dio);
qc = submit_bio(bio);
if (qc == BLK_QC_T_EAGAIN) {
- if (!ret)
+ if (!dio->size)
ret = -EAGAIN;
goto error;
}
- ret = dio->size;
+ dio->size += bio_size;
bio = bio_alloc(gfp, nr_pages);
if (!bio) {
- if (!ret)
+ if (!dio->size)
ret = -EAGAIN;
goto error;
}
@@ -496,8 +500,10 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter
*iter, int nr_pages)
out:
if (!ret)
ret = blk_status_to_errno(dio->bio.bi_status);
+ if (likely(!ret))
+ ret = dio->size;
- bio_put(&dio->bio);
+ blkdev_put_dio(dio);
return ret;
error:
if (!is_poll)
I think that works for all cases. Starting tests now. Let me know if you think
this approach is not appropriate.
--
Damien Le Moal
Western Digital Research