The throttle group support uses a cooperative round-robin scheduling algorithm.

The principles of the algorithm are simple (a standalone sketch of the token
walk follows the list):
- Each BDS of the group is used as a token, in a circular fashion.
- The active BDS computes whether a wait must be done and arms the right
  timer.
- If a wait must be done, the token's timer is armed so that the token
  becomes the next active BDS.
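
To make the token walk concrete, here is a minimal standalone sketch of the
idea (the types and names are illustrative only, not the QEMU API; the real
logic lives in bdrv_next_throttle_token() further down in this patch):

    #include <stdio.h>

    #define GROUP_SIZE 4

    /* One member of a throttle group (stands in for a BDS). */
    typedef struct Member {
        const char *name;
        int queued_reqs;         /* pending throttled requests */
        struct Member *next;     /* circular list inside the group */
    } Member;

    /* Walk the circular list starting after the current token and return
     * the first member with queued requests.  If nobody (including the
     * token itself) has anything queued, the caller becomes the token,
     * since its own request is the one about to be queued. */
    static Member *next_token(Member *token, Member *caller)
    {
        Member *start = token;

        token = token->next;
        while (token != start && token->queued_reqs == 0) {
            token = token->next;
        }
        if (token == start && token->queued_reqs == 0) {
            token = caller;
        }
        return token;
    }

    int main(void)
    {
        Member m[GROUP_SIZE] = {
            { "vda", 0 }, { "vdb", 2 }, { "vdc", 0 }, { "vdd", 1 },
        };
        int i;

        for (i = 0; i < GROUP_SIZE; i++) {
            m[i].next = &m[(i + 1) % GROUP_SIZE];
        }

        /* vda holds the token and must wait: the timer would be armed on
         * vdb, which then becomes the new token. */
        printf("new token: %s\n", next_token(&m[0], &m[0])->name);
        return 0;
    }

In the patch itself the same walk happens under the ThrottleGroup lock, and
the "queued requests" test is qemu_co_queue_empty() on the per-BDS
throttled_reqs queue.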

Signed-off-by: Alberto Garcia <be...@igalia.com>
---
 block.c                   | 212 +++++++++++++++++++++++++++++++++++++++-------
 block/qapi.c              |   7 +-
 block/throttle-groups.c   |   2 +-
 blockdev.c                |  19 ++++-
 hmp.c                     |   4 +-
 include/block/block.h     |   3 +-
 include/block/block_int.h |   9 +-
 qapi/block-core.json      |   5 +-
 qemu-options.hx           |   1 +
 qmp-commands.hx           |   3 +-
 10 files changed, 220 insertions(+), 45 deletions(-)
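
Usage note (the option and argument names are the ones added by the hunks
below; the file names and numbers are made up for illustration): two drives
can be throttled together by putting them in the same group, e.g.

    -drive file=a.qcow2,if=virtio,iops=100,group=foo
    -drive file=b.qcow2,if=virtio,iops=100,group=foo

and block_set_io_throttle gains an optional "group" argument so a running
device can be moved to another group (see bdrv_io_limits_update_group).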

diff --git a/block.c b/block.c
index 642ef04..625f1c8 100644
--- a/block.c
+++ b/block.c
@@ -36,6 +36,7 @@
 #include "qmp-commands.h"
 #include "qemu/timer.h"
 #include "qapi-event.h"
+#include "block/throttle-groups.h"
 
 #ifdef CONFIG_BSD
 #include <sys/types.h>
@@ -130,7 +131,9 @@ void bdrv_set_io_limits(BlockDriverState *bs,
 {
     int i;
 
-    throttle_config(&bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_lock(bs->throttle_state);
+    throttle_config(bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_unlock(bs->throttle_state);
 
     for (i = 0; i < 2; i++) {
         qemu_co_enter_next(&bs->throttled_reqs[i]);
@@ -157,34 +160,99 @@ static bool bdrv_start_throttled_reqs(BlockDriverState *bs)
     return drained;
 }
 
+static void bdrv_throttle_group_add(BlockDriverState *bs)
+{
+    int i;
+    BlockDriverState *token;
+
+    for (i = 0; i < 2; i++) {
+        /* Get the BlockDriverState having the round robin token */
+        token = throttle_group_token(bs->throttle_state, i);
+
+        /* If the ThrottleGroup is new, set the current BlockDriverState as
+         * the token
+         */
+        if (!token) {
+            throttle_group_set_token(bs->throttle_state, bs, i);
+        }
+
+    }
+
+    throttle_group_register_bs(bs->throttle_state, bs);
+}
+
+static void bdrv_throttle_group_remove(BlockDriverState *bs)
+{
+    BlockDriverState *token;
+    int i;
+
+    for (i = 0; i < 2; i++) {
+        /* Get the BlockDriverState having the round robin token */
+        token = throttle_group_token(bs->throttle_state, i);
+        /* if this bs is the current token, set the next bs as the token */
+        if (token == bs) {
+            token = throttle_group_next_bs(token);
+            /* take care of the case where bs is the only bs of the group */
+            if (token == bs) {
+                token = NULL;
+            }
+            throttle_group_set_token(bs->throttle_state, token, i);
+        }
+    }
+
+    /* remove the current bs from the list */
+    QLIST_REMOVE(bs, round_robin);
+}
+
 void bdrv_io_limits_disable(BlockDriverState *bs)
 {
+
+    throttle_group_lock(bs->throttle_state);
     bs->io_limits_enabled = false;
+    throttle_group_unlock(bs->throttle_state);
 
     bdrv_start_throttled_reqs(bs);
 
+    throttle_group_lock(bs->throttle_state);
+    bdrv_throttle_group_remove(bs);
+    throttle_group_unlock(bs->throttle_state);
+
+    throttle_group_unref(bs->throttle_state);
+    bs->throttle_state = NULL;
+
     throttle_timers_destroy(&bs->throttle_timers);
 }
 
 static void bdrv_throttle_read_timer_cb(void *opaque)
 {
     BlockDriverState *bs = opaque;
-    throttle_timer_fired(&bs->throttle_state, false);
+
+    throttle_group_lock(bs->throttle_state);
+    throttle_timer_fired(bs->throttle_state, false);
+    throttle_group_unlock(bs->throttle_state);
+
     qemu_co_enter_next(&bs->throttled_reqs[0]);
 }
 
 static void bdrv_throttle_write_timer_cb(void *opaque)
 {
     BlockDriverState *bs = opaque;
-    throttle_timer_fired(&bs->throttle_state, true);
+
+    throttle_group_lock(bs->throttle_state);
+    throttle_timer_fired(bs->throttle_state, true);
+    throttle_group_unlock(bs->throttle_state);
+
     qemu_co_enter_next(&bs->throttled_reqs[1]);
 }
 
 /* should be called before bdrv_set_io_limits if a limit is set */
-void bdrv_io_limits_enable(BlockDriverState *bs)
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group)
 {
     assert(!bs->io_limits_enabled);
-    throttle_init(&bs->throttle_state);
+    bs->throttle_state = throttle_group_incref(group ? group : bdrv_get_device_name(bs));
+
+    throttle_group_lock(bs->throttle_state);
+    bdrv_throttle_group_add(bs);
     throttle_timers_init(&bs->throttle_timers,
                          bdrv_get_aio_context(bs),
                          QEMU_CLOCK_VIRTUAL,
@@ -192,6 +260,53 @@ void bdrv_io_limits_enable(BlockDriverState *bs)
                          bdrv_throttle_write_timer_cb,
                          bs);
     bs->io_limits_enabled = true;
+    throttle_group_unlock(bs->throttle_state);
+}
+
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group)
+{
+    /* this bs is not part of any group */
+    if (!bs->throttle_state) {
+        return;
+    }
+
+    /* this bs is already part of the group we want */
+    if (throttle_group_compare(bs->throttle_state, group)) {
+        return;
+    }
+
+    /* need to change the group this bs belongs to */
+    bdrv_io_limits_disable(bs);
+    bdrv_io_limits_enable(bs, group);
+}
+
+/* This implements the round robin policy and must be called under the
+ * ThrottleGroup lock
+ */
+static BlockDriverState *bdrv_next_throttle_token(BlockDriverState *bs,
+                                                  bool is_write)
+{
+    BlockDriverState *token, *start;
+
+    start = token = throttle_group_token(bs->throttle_state, is_write);
+
+    /* get the next bs in round robin style */
+    token = throttle_group_next_bs(token);
+    while (token != start  &&
+           qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = throttle_group_next_bs(token);
+    }
+
+    /* If no I/O is queued for scheduling on the next round robin token
+     * then make the current bs the token, because chances are that the
+     * current bs will get the current request queued.
+     */
+    if (token == start &&
+        qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = bs;
+    }
+
+    return token;
 }
 
 /* This function makes an IO wait if needed
@@ -204,31 +319,63 @@ static void bdrv_io_limits_intercept(BlockDriverState *bs,
                                      bool is_write)
 {
     bool armed;
-
-    /* does this io must wait */
-    bool must_wait = throttle_schedule_timer(&bs->throttle_state,
-                                             &bs->throttle_timers,
-                                             is_write,
-                                             &armed);
-
-    /* if must wait or any request of this type throttled queue the IO */
-    if (must_wait ||
-        !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+    bool must_wait;
+    BlockDriverState *token;
+
+    throttle_group_lock(bs->throttle_state);
+
+    /* First we check if this I/O has to be throttled.
+     * If that's the case and we need to set a new timer for that,
+     * we have to do it in the next bs according to the scheduling
+     * algorithm. In that case that bs will be used as the new
+     * token. */
+    token = bdrv_next_throttle_token(bs, is_write);
+    must_wait = throttle_schedule_timer(bs->throttle_state,
+                                        &token->throttle_timers,
+                                        is_write,
+                                        &armed);
+    if (armed) {
+        throttle_group_set_token(bs->throttle_state, token, is_write);
+    }
+
+    /* Wait if there's a timer set or queued requests of this type */
+    if (must_wait || !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+        throttle_group_unlock(bs->throttle_state);
         qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
-    }
-
-    /* the IO will be executed, do the accounting */
-    throttle_account(&bs->throttle_state, is_write, bytes);
-
+        throttle_group_lock(bs->throttle_state);
+    }
+
+    /* The I/O will be executed, so do the accounting */
+    throttle_account(bs->throttle_state, is_write, bytes);
+
+    /* Now we check if there's any pending request to schedule next.
+     * If that's the case and it doesn't have to wait, we queue it for
+     * immediate execution. */
+    token = bdrv_next_throttle_token(bs, is_write);
+    if (!qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        must_wait = throttle_schedule_timer(bs->throttle_state,
+                                            &token->throttle_timers,
+                                            is_write,
+                                            &armed);
+
+        if (!must_wait) {
+            /* If we don't need to do any throttling, give preference
+             * to requests from the current bs */
+            if (!qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+                token = bs;
+                qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+            } else {
+                throttle_fire_timer(&token->throttle_timers, is_write);
+            }
+        }
 
-    /* if the next request must wait -> do nothing */
-    if (throttle_schedule_timer(&bs->throttle_state, &bs->throttle_timers,
-                                is_write, &armed)) {
-        return;
+        /* Set the new token unless a timer had already been armed */
+        if (armed || !must_wait) {
+            throttle_group_set_token(bs->throttle_state, token, is_write);
+        }
     }
 
-    /* else queue next request for execution */
-    qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+    throttle_group_unlock(bs->throttle_state);
 }
 
 size_t bdrv_opt_mem_align(BlockDriverState *bs)
@@ -2056,15 +2203,16 @@ static void bdrv_move_feature_fields(BlockDriverState *bs_dest,
     bs_dest->enable_write_cache = bs_src->enable_write_cache;
 
     /* i/o throttled req */
-    memcpy(&bs_dest->throttle_state,
-           &bs_src->throttle_state,
-           sizeof(ThrottleState));
+    bs_dest->throttle_state     = bs_src->throttle_state;
+    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
+    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
+    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
+    memcpy(&bs_dest->round_robin,
+           &bs_src->round_robin,
+           sizeof(bs_dest->round_robin));
     memcpy(&bs_dest->throttle_timers,
            &bs_src->throttle_timers,
            sizeof(ThrottleTimers));
-    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
-    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
-    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
 
     /* r/w error */
     bs_dest->on_read_error      = bs_src->on_read_error;
diff --git a/block/qapi.c b/block/qapi.c
index 1808e67..9ed3e68 100644
--- a/block/qapi.c
+++ b/block/qapi.c
@@ -24,6 +24,7 @@
 
 #include "block/qapi.h"
 #include "block/block_int.h"
+#include "block/throttle-groups.h"
 #include "block/write-threshold.h"
 #include "qmp-commands.h"
 #include "qapi-visit.h"
@@ -63,7 +64,11 @@ BlockDeviceInfo *bdrv_block_device_info(BlockDriverState *bs)
 
     if (bs->io_limits_enabled) {
         ThrottleConfig cfg;
-        throttle_get_config(&bs->throttle_state, &cfg);
+
+        throttle_group_lock(bs->throttle_state);
+        throttle_get_config(bs->throttle_state, &cfg);
+        throttle_group_unlock(bs->throttle_state);
+
         info->bps     = cfg.buckets[THROTTLE_BPS_TOTAL].avg;
         info->bps_rd  = cfg.buckets[THROTTLE_BPS_READ].avg;
         info->bps_wr  = cfg.buckets[THROTTLE_BPS_WRITE].avg;
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index ea5baca..399ae5e 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -154,7 +154,7 @@ void throttle_group_register_bs(ThrottleState *ts, BlockDriverState *bs)
  */
 BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
 {
-    ThrottleState *ts = &bs->throttle_state;
+    ThrottleState *ts = bs->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
     BlockDriverState *next = QLIST_NEXT(bs, round_robin);
 
diff --git a/blockdev.c b/blockdev.c
index 7d34960..135cdeb 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -361,6 +361,7 @@ static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
     bool has_driver_specific_opts;
     BlockdevDetectZeroesOptions detect_zeroes;
     BlockDriver *drv = NULL;
+    const char *throttling_group;
 
     /* Check common options by copying from bs_opts to opts, all other options
      * stay in bs_opts for processing by bdrv_open(). */
@@ -463,6 +464,8 @@ static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
 
     cfg.op_size = qemu_opt_get_number(opts, "throttling.iops-size", 0);
 
+    throttling_group = qemu_opt_get(opts, "throttling.group");
+
     if (!check_throttle_config(&cfg, &error)) {
         error_propagate(errp, error);
         goto early_err;
@@ -518,7 +521,7 @@ static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
 
     /* disk I/O throttling */
     if (throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, throttling_group);
         bdrv_set_io_limits(bs, &cfg);
     }
 
@@ -721,6 +724,8 @@ DriveInfo *drive_new(QemuOpts *all_opts, BlockInterfaceType block_default_type)
 
         { "iops_size",      "throttling.iops-size" },
 
+        { "group",          "throttling.group" },
+
         { "readonly",       "read-only" },
     };
 
@@ -1889,7 +1894,9 @@ void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
                                bool has_iops_wr_max,
                                int64_t iops_wr_max,
                                bool has_iops_size,
-                               int64_t iops_size, Error **errp)
+                               int64_t iops_size,
+                               bool has_group,
+                               const char *group, Error **errp)
 {
     ThrottleConfig cfg;
     BlockDriverState *bs;
@@ -1941,9 +1948,11 @@ void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
     aio_context_acquire(aio_context);
 
     if (!bs->io_limits_enabled && throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, has_group ? group : NULL);
     } else if (bs->io_limits_enabled && !throttle_enabled(&cfg)) {
         bdrv_io_limits_disable(bs);
+    } else if (bs->io_limits_enabled && throttle_enabled(&cfg)) {
+        bdrv_io_limits_update_group(bs, has_group ? group : NULL);
     }
 
     if (bs->io_limits_enabled) {
@@ -3004,6 +3013,10 @@ QemuOptsList qemu_common_drive_opts = {
             .type = QEMU_OPT_NUMBER,
             .help = "when limiting by iops max size of an I/O in bytes",
         },{
+            .name = "throttling.group",
+            .type = QEMU_OPT_STRING,
+            .help = "name of the block throttling group",
+        },{
             .name = "copy-on-read",
             .type = QEMU_OPT_BOOL,
             .help = "copy read data from backing file into image file",
diff --git a/hmp.c b/hmp.c
index b47f331..47663ce 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1228,7 +1228,9 @@ void hmp_block_set_io_throttle(Monitor *mon, const QDict *qdict)
                               false,
                               0,
                               false, /* No default I/O size */
-                              0, &err);
+                              0,
+                              false,
+                              NULL, &err);
     hmp_handle_error(mon, &err);
 }
 
diff --git a/include/block/block.h b/include/block/block.h
index 321295e..c74771f 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -162,8 +162,9 @@ void bdrv_stats_print(Monitor *mon, const QObject *data);
 void bdrv_info_stats(Monitor *mon, QObject **ret_data);
 
 /* disk I/O throttling */
-void bdrv_io_limits_enable(BlockDriverState *bs);
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group);
 void bdrv_io_limits_disable(BlockDriverState *bs);
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group);
 
 void bdrv_init(void);
 void bdrv_init_with_whitelist(void);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 28d754c..5d49ed2 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -358,12 +358,13 @@ struct BlockDriverState {
     /* number of in-flight serialising requests */
     unsigned int serialising_in_flight;
 
-    /* I/O throttling */
-    ThrottleState throttle_state;
-    ThrottleTimers throttle_timers;
-    CoQueue      throttled_reqs[2];
+    /* I/O throttling - following elements protected by ThrottleGroup lock */
+    ThrottleState *throttle_state;
     bool         io_limits_enabled;
+    CoQueue      throttled_reqs[2];
     QLIST_ENTRY(BlockDriverState) round_robin;
+    /* timers have their own locking */
+    ThrottleTimers throttle_timers;
 
     /* I/O stats (display with "info blockstats"). */
     BlockAcctStats stats;
diff --git a/qapi/block-core.json b/qapi/block-core.json
index a3fdaf0..563b11f 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -989,6 +989,9 @@
 #
 # @iops_size: #optional an I/O size in bytes (Since 1.7)
 #
+#
+# @group: #optional throttle group name (Since 2.3)
+#
 # Returns: Nothing on success
 #          If @device is not a valid block device, DeviceNotFound
 #
@@ -1000,7 +1003,7 @@
             '*bps_max': 'int', '*bps_rd_max': 'int',
             '*bps_wr_max': 'int', '*iops_max': 'int',
             '*iops_rd_max': 'int', '*iops_wr_max': 'int',
-            '*iops_size': 'int' } }
+            '*iops_size': 'int', '*group': 'str' } }
 
 ##
 # @block-stream:
diff --git a/qemu-options.hx b/qemu-options.hx
index 85ca3ad..ef8e76c 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -442,6 +442,7 @@ DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "       [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n"
     "       [[,iops_max=im]|[[,iops_rd_max=irm][,iops_wr_max=iwm]]]\n"
     "       [[,iops_size=is]]\n"
+    "       [[,group=g]]\n"
     "                use 'file' as a drive image\n", QEMU_ARCH_ALL)
 STEXI
 @item -drive @var{option}[,@var{option}[,@var{option}[,...]]]
diff --git a/qmp-commands.hx b/qmp-commands.hx
index a85d847..2ce6af6 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -1704,7 +1704,7 @@ EQMP
 
     {
         .name       = "block_set_io_throttle",
-        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?",
+        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?,group:s?",
         .mhandler.cmd_new = qmp_marshal_input_block_set_io_throttle,
     },
 
@@ -1730,6 +1730,7 @@ Arguments:
 - "iops_rd_max":  read I/O operations max (json-int)
 - "iops_wr_max":  write I/O operations max (json-int)
 - "iops_size":  I/O size in bytes when limiting (json-int)
+- "group": throttle group name (json-string)
 
 Example:
 
-- 
2.1.4

