If we wake a coroutine from a different context, we must ensure that it will yield exactly once (now or later), awaiting that wake.
curl’s current .ret == -EINPROGRESS loop may lead to the coroutine not yielding at all if the request finishes before the loop is reached. To fix it, drop the loop and just yield exactly once, unless the request was served from the cache or failed before it was submitted – that last part makes it a bit complicated, as the result of curl_find_buf() now needs to be a tristate. (Can be reproduced with multiqueue by adding a usleep(100000) before the `while (acb.ret == -EINPROGRESS)` loop.) Also, add a comment explaining why aio_co_wake() is safe regardless of whether the coroutine and curl_multi_check_completion() run in the same context. Cc: [email protected] Signed-off-by: Hanna Czenczek <[email protected]> --- block/curl.c | 55 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/block/curl.c b/block/curl.c index 68cf83ce55..65996a8866 100644 --- a/block/curl.c +++ b/block/curl.c @@ -124,6 +124,16 @@ typedef struct BDRVCURLState { char *proxypassword; } BDRVCURLState; +/** Possible result states of curl_find_buf() */ +typedef enum { + /* No buffer found, need to create new request */ + CURL_NO_BUF_FOUND, + /* Buffer found, request filled and done */ + CURL_REQUEST_FILLED, + /* Ongoing request found, need to yield */ + CURL_REQUEST_ONGOING, +} CURLFindBufResult; + static void curl_clean_state(CURLState *s); static void curl_multi_do(void *arg); @@ -258,8 +268,8 @@ read_end: } /* Called with s->mutex held. 
*/ -static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len, - CURLAIOCB *acb) +static CURLFindBufResult curl_find_buf(BDRVCURLState *s, uint64_t start, + uint64_t len, CURLAIOCB *acb) { int i; uint64_t end = start + len; @@ -289,7 +299,7 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len, qemu_iovec_memset(acb->qiov, clamped_len, 0, len - clamped_len); } acb->ret = 0; - return true; + return CURL_REQUEST_FILLED; } // Wait for unfinished chunks @@ -307,13 +317,13 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len, for (j=0; j<CURL_NUM_ACB; j++) { if (!state->acb[j]) { state->acb[j] = acb; - return true; + return CURL_REQUEST_ONGOING; } } } } - return false; + return CURL_NO_BUF_FOUND; } /* Called with s->mutex held. */ @@ -378,6 +388,16 @@ static void curl_multi_check_completion(BDRVCURLState *s) acb->ret = error ? -EIO : 0; state->acb[i] = NULL; qemu_mutex_unlock(&s->mutex); + /* + * Current AioContext is the BDS context, which may or may not + * be the request (coroutine) context. + * - If it is, the coroutine must have yielded or the FD handler + * (curl_multi_do()/curl_multi_timeout_do()) could not have + * been called and we would not be here + * - If it is not, it doesn't matter whether it has already + * yielded or not; it will be scheduled once it does yield + * So aio_co_wake() is safe to call. 
+ */ aio_co_wake(acb->co); qemu_mutex_lock(&s->mutex); } @@ -868,7 +888,8 @@ out_noclean: return -EINVAL; } -static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb) +/* Return whether a request was submitted that requires yielding */ +static bool coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb) { CURLState *state; int running; @@ -877,13 +898,15 @@ static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb) uint64_t start = acb->offset; uint64_t end; + CURLFindBufResult find_buf_res; - qemu_mutex_lock(&s->mutex); + QEMU_LOCK_GUARD(&s->mutex); // In case we have the requested data already (e.g. read-ahead), // we can just call the callback and be done. - if (curl_find_buf(s, start, acb->bytes, acb)) { - goto out; + find_buf_res = curl_find_buf(s, start, acb->bytes, acb); + if (find_buf_res != CURL_NO_BUF_FOUND) { + return find_buf_res == CURL_REQUEST_ONGOING; } // No cache found, so let's start a new request @@ -898,7 +921,7 @@ static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb) if (curl_init_state(s, state) < 0) { curl_clean_state(state); acb->ret = -EIO; - goto out; + return false; } acb->start = 0; @@ -913,7 +936,7 @@ static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb) if (state->buf_len && state->orig_buf == NULL) { curl_clean_state(state); acb->ret = -ENOMEM; - goto out; + return false; } state->acb[0] = acb; @@ -925,14 +948,12 @@ static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb) acb->ret = -EIO; curl_clean_state(state); - goto out; + return false; } /* Tell curl it needs to kick things off */ curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running); - -out: - qemu_mutex_unlock(&s->mutex); + return true; } static int coroutine_fn curl_co_preadv(BlockDriverState *bs, @@ -941,14 +962,12 @@ static int coroutine_fn curl_co_preadv(BlockDriverState *bs, { CURLAIOCB acb = { .co = qemu_coroutine_self(), - 
.ret = -EINPROGRESS, .qiov = qiov, .offset = offset, .bytes = bytes }; - curl_setup_preadv(bs, &acb); - while (acb.ret == -EINPROGRESS) { + if (curl_setup_preadv(bs, &acb)) { qemu_coroutine_yield(); } return acb.ret; -- 2.51.0
