On Thu, Mar 21, 2024 at 4:57 PM Jonah Palmer <jonah.pal...@oracle.com> wrote: > > Implements in-order handling for most virtio devices using the > VIRTIO_F_IN_ORDER transport feature, specifically those who call > virtqueue_push to push their used elements onto the used ring. > > The logic behind this implementation is as follows: > > 1.) virtqueue_pop always enqueues VirtQueueElements in-order. > > virtqueue_pop always retrieves one or more buffer descriptors in-order > from the available ring and converts them into a VirtQueueElement. This > means that the order in which VirtQueueElements are enqueued are > in-order by default. > > By virtue, as VirtQueueElements are created, we can assign a sequential > key value to them. This preserves the order of buffers that have been > made available to the device by the driver. > > As VirtQueueElements are assigned a key value, the current sequence > number is incremented. > > 2.) Requests can be completed out-of-order. > > While most devices complete requests in the same order that they were > enqueued by default, some devices don't (e.g. virtio-blk). The goal of > this out-of-order handling is to reduce the impact of devices that > process elements in-order by default while also guaranteeing compliance > with the VIRTIO_F_IN_ORDER feature. > > Below is the logic behind handling completed requests (which may or may > not be in-order). > > 3.) Does the incoming used VirtQueueElement preserve the correct order? > > In other words, is the sequence number (key) assigned to the > VirtQueueElement the expected number that would preserve the original > order? > > 3a.) > If it does... immediately push the used element onto the used ring. > Then increment the next expected sequence number and check to see if > any previous out-of-order VirtQueueElements stored on the hash table > has a key that matches this next expected sequence number. > > For each VirtQueueElement found on the hash table with a matching key: > push the element on the used ring, remove the key-value pair from the > hash table, and then increment the next expected sequence number. Repeat > this process until we're unable to find an element with a matching key. > > Note that if the device uses batching (e.g. virtio-net), then we skip > the virtqueue_flush call and let the device call it themselves. > > 3b.) > If it does not... stash the VirtQueueElement, along with relevant data, > as a InOrderVQElement on the hash table. The key used is the order_key > that was assigned when the VirtQueueElement was created. > > Signed-off-by: Jonah Palmer <jonah.pal...@oracle.com> > --- > hw/virtio/virtio.c | 70 ++++++++++++++++++++++++++++++++++++-- > include/hw/virtio/virtio.h | 8 +++++ > 2 files changed, 76 insertions(+), 2 deletions(-) > > diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c > index 40124545d6..40e4377f1e 100644 > --- a/hw/virtio/virtio.c > +++ b/hw/virtio/virtio.c > @@ -992,12 +992,56 @@ void virtqueue_flush(VirtQueue *vq, unsigned int count) > } > } > > +void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem, > + unsigned int len, unsigned int idx, > + unsigned int count) > +{ > + InOrderVQElement *in_order_elem; > + > + if (elem->order_key == vq->current_order_idx) { > + /* Element is in-order, push to used ring */ > + virtqueue_fill(vq, elem, len, idx); > + > + /* Batching? Don't flush */ > + if (count) { > + virtqueue_flush(vq, count);
The "count" parameter is the number of heads used, but here you're only using one head (elem). Same with the other virtqueue_flush in the function. Also, this function sometimes replaces virtqueue_fill and other replaces virtqueue_fill + virtqueue_flush (both examples in patch 6/8). I have the impression the series would be simpler if virtqueue_order_element is a static function just handling the virtio_vdev_has_feature(vq->vdev, VIRTIO_F_IN_ORDER) path of virtqueue_fill, so the caller does not need to know if the in_order feature is on or off. > + } > + > + /* Increment next expected order, search for more in-order elements > */ > + while ((in_order_elem = g_hash_table_lookup(vq->in_order_ht, > + GUINT_TO_POINTER(++vq->current_order_idx))) != NULL) > { > + /* Found in-order element, push to used ring */ > + virtqueue_fill(vq, in_order_elem->elem, in_order_elem->len, > + in_order_elem->idx); > + > + /* Batching? Don't flush */ > + if (count) { > + virtqueue_flush(vq, in_order_elem->count); > + } > + > + /* Remove key-value pair from hash table */ > + g_hash_table_remove(vq->in_order_ht, > + GUINT_TO_POINTER(vq->current_order_idx)); > + } > + } else { > + /* Element is out-of-order, stash in hash table */ > + in_order_elem = virtqueue_alloc_in_order_element(elem, len, idx, > + count); > + g_hash_table_insert(vq->in_order_ht, > GUINT_TO_POINTER(elem->order_key), > + in_order_elem); > + } > +} > + > void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem, > unsigned int len) > { > RCU_READ_LOCK_GUARD(); > - virtqueue_fill(vq, elem, len, 0); > - virtqueue_flush(vq, 1); > + if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_IN_ORDER)) { > + virtqueue_order_element(vq, elem, len, 0, 1); > + } else { > + virtqueue_fill(vq, elem, len, 0); > + virtqueue_flush(vq, 1); > + } > } > > /* Called within rcu_read_lock(). */ > @@ -1478,6 +1522,18 @@ void virtqueue_map(VirtIODevice *vdev, > VirtQueueElement *elem) > > false); > } > > +void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem, > + unsigned int len, unsigned int idx, > + unsigned int count) > +{ > + InOrderVQElement *in_order_elem = g_malloc(sizeof(InOrderVQElement)); > + in_order_elem->elem = elem; > + in_order_elem->len = len; > + in_order_elem->idx = idx; > + in_order_elem->count = count; > + return in_order_elem; > +} > + > static void *virtqueue_alloc_element(size_t sz, unsigned out_num, unsigned > in_num) > { > VirtQueueElement *elem; > @@ -1626,6 +1682,11 @@ static void *virtqueue_split_pop(VirtQueue *vq, size_t > sz) > elem->in_sg[i] = iov[out_num + i]; > } > > + /* Assign key for in-order processing */ > + if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) { > + elem->order_key = vq->current_order_key++; Since you're adding this in both split_pop and packed_pop, why not add it in virtqueue_pop? > + } > + > vq->inuse++; > > trace_virtqueue_pop(vq, elem, elem->in_num, elem->out_num); > @@ -1762,6 +1823,11 @@ static void *virtqueue_packed_pop(VirtQueue *vq, > size_t sz) > vq->last_avail_wrap_counter ^= 1; > } > > + /* Assign key for in-order processing */ > + if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) { > + elem->order_key = vq->current_order_key++; > + } > + > vq->shadow_avail_idx = vq->last_avail_idx; > vq->shadow_avail_wrap_counter = vq->last_avail_wrap_counter; > > diff --git a/include/hw/virtio/virtio.h b/include/hw/virtio/virtio.h > index f83d7e1fee..eeeda397a9 100644 > --- a/include/hw/virtio/virtio.h > +++ b/include/hw/virtio/virtio.h > @@ -275,6 +275,14 @@ void virtio_del_queue(VirtIODevice *vdev, int n); > > void virtio_delete_queue(VirtQueue *vq); > > +void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem, > + unsigned int len, unsigned int idx, > + unsigned int count); > + > +void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem, > + unsigned int len, unsigned int idx, > + unsigned int count); > + > void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem, > unsigned int len); > void virtqueue_flush(VirtQueue *vq, unsigned int count); > -- > 2.39.3 >