At Wed, 17 Dec 2014 17:32:49 +0900,
FUKUDA Yasuhito wrote:
> 
> Current auto recovery to consume as much as possible the resources of 
> sheepdog node.
> 
> So, this patch intended to allow the speed throttling of auto-recovery.
> By speed throttling, reduce the resource consumption at auto recovery.
> 
> Add new options to sheep and dog commands.
> Its options are "interval" and "object processing number".
> 
> see examples follows.
> 
> ex) sheep -R max=5,interval=1000 /var/lib/sheepdog
>     dog node recovery set-throttle 5 1000
>     dog node recovery get-throttle
> 
> Signed-off-by: Yasuhito Fukuda <[email protected]>
> ---
>  dog/node.c               |  146 
> +++++++++++++++++++++++++++++++++++++++++++++-
>  include/internal_proto.h |    2 +
>  include/sheepdog_proto.h |    6 ++
>  sheep/ops.c              |   38 ++++++++++++
>  sheep/recovery.c         |  139 ++++++++++++++++++++++++++++++++++++++++++-
>  sheep/sheep.c            |   41 +++++++++++++
>  sheep/sheep_priv.h       |    4 +
>  7 files changed, 370 insertions(+), 6 deletions(-)

The change seems good, but current master required rebase of this
patch. Could you rebase on the latest master?

Thanks,
Hitoshi

> 
> diff --git a/dog/node.c b/dog/node.c
> index a4e9142..d4c8fe7 100644
> --- a/dog/node.c
> +++ b/dog/node.c
> @@ -183,7 +183,7 @@ static int node_recovery_progress(void)
>       return result < 0 ? EXIT_SYSFAIL : EXIT_SUCCESS;
>  }
>  
> -static int node_recovery(int argc, char **argv)
> +static int node_recovery_info(int argc, char **argv)
>  {
>       struct sd_node *n;
>       int ret, i = 0;
> @@ -235,6 +235,120 @@ static int node_recovery(int argc, char **argv)
>       return EXIT_SUCCESS;
>  }
>  
> +static int node_recovery_set(int argc, char **argv)
> +{
> +     char *p;
> +     struct recovery_throttling *rthrottling;
> +
> +     rthrottling = xmalloc(sizeof(struct recovery_throttling));
> +
> +     if (!argv[optind] || !argv[optind + 1]) {
> +             sd_err("Invalid interval max (%s), interval (%s)",
> +              argv[optind], argv[optind + 1]);
> +             exit(EXIT_USAGE);
> +     }
> +
> +     rthrottling->max_exec_count = strtoul(argv[optind], &p, 10);
> +     if (argv[optind] == p || rthrottling->max_exec_count < 0 ||
> +      UINT32_MAX <= rthrottling->max_exec_count || errno != 0 ||
> +      *p != '\0') {
> +             sd_err("Invalid max (%s)", argv[optind]);
> +             exit(EXIT_USAGE);
> +     }
> +
> +     optind++;
> +
> +     rthrottling->queue_work_interval = strtoull(argv[optind], &p, 10);
> +     if (argv[optind] == p || rthrottling->queue_work_interval < 0 ||
> +      UINT64_MAX <= rthrottling->queue_work_interval || errno != 0 ||
> +      *p != '\0') {
> +             sd_err("Invalid interval (%s)", argv[optind]);
> +             exit(EXIT_USAGE);
> +     }
> +
> +     if ((rthrottling->max_exec_count == 0 &&
> +      rthrottling->queue_work_interval != 0) ||
> +      (rthrottling->max_exec_count != 0 &&
> +      rthrottling->queue_work_interval == 0)) {
> +             sd_err("Invalid interval max (%"PRIu32"), interval (%"PRIu64")",
> +             rthrottling->max_exec_count, rthrottling->queue_work_interval);
> +             exit(EXIT_USAGE);
> +     }
> +
> +     int ret = 0;
> +     struct sd_req req;
> +     struct sd_rsp *rsp = (struct sd_rsp *)&req;
> +
> +     sd_init_req(&req, SD_OP_SET_RECOVERY);
> +     req.flags = SD_FLAG_CMD_WRITE;
> +     req.data_length = sizeof(struct recovery_throttling);
> +     ret = dog_exec_req(&sd_nid, &req, rthrottling);
> +
> +     if (ret < 0)
> +             ret = EXIT_SYSFAIL;
> +
> +     if (rsp->result == SD_RES_SUCCESS)
> +             ret = EXIT_SUCCESS;
> +     else
> +             ret = EXIT_FAILURE;
> +
> +     switch (ret) {
> +     case EXIT_FAILURE:
> +     case EXIT_SYSFAIL:
> +             sd_err("Failed to execute request");
> +             ret = -1;
> +             break;
> +     case EXIT_SUCCESS:
> +             /* do nothing */
> +             break;
> +     default:
> +             sd_err("unknown return code: %d", ret);
> +             ret = -1;
> +             break;
> +     }
> +
> +     free(rthrottling);
> +     return ret;
> +}
> +
> +static int node_recovery_get(int argc, char **argv)
> +{
> +     struct recovery_throttling rthrottling;
> +     int ret = 0;
> +
> +     struct sd_req req;
> +     struct sd_rsp *rsp = (struct sd_rsp *)&req;
> +
> +     sd_init_req(&req, SD_OP_GET_RECOVERY);
> +     req.data_length = sizeof(rthrottling);
> +
> +     ret = dog_exec_req(&sd_nid, &req, &rthrottling);
> +     if (ret < 0)
> +             ret = EXIT_SYSFAIL;
> +
> +     if (rsp->result == SD_RES_SUCCESS)
> +             ret = EXIT_SUCCESS;
> +     else
> +             ret = EXIT_FAILURE;
> +
> +     switch (ret) {
> +     case EXIT_FAILURE:
> +     case EXIT_SYSFAIL:
> +             sd_err("Failed to execute request");
> +             ret = -1;
> +             break;
> +     case EXIT_SUCCESS:
> +             sd_info("max (%"PRIu32"), interval (%"PRIu64")",
> +              rthrottling.max_exec_count, rthrottling.queue_work_interval);
> +             break;
> +     default:
> +             sd_err("unknown return code: %d", ret);
> +             ret = -1;
> +             break;
> +     }
> +     return ret;
> +}
> +
>  static struct sd_node *idx_to_node(struct rb_root *nroot, int idx)
>  {
>       struct sd_node *n = rb_entry(rb_first(nroot), struct sd_node, rb);
> @@ -538,6 +652,31 @@ static struct sd_option node_options[] = {
>       { 0, NULL, false, NULL },
>  };
>  
> +static struct subcommand node_recovery_cmd[] = {
> +     {"info", NULL, "aphPrT", "show recovery information of nodes (default)",
> +      NULL, CMD_NEED_NODELIST, node_recovery_info, node_options},
> +     {"set-throttle", "<max> <interval>", NULL, "set new throttling", NULL,
> +      CMD_NEED_ARG|CMD_NEED_NODELIST, node_recovery_set, node_options},
> +     {"get-throttle", NULL, NULL, "get current throttling", NULL,
> +      CMD_NEED_NODELIST, node_recovery_get, node_options},
> +     {NULL},
> +};
> +
> +static int node_recovery(int argc, char **argv)
> +{
> +     int ret;
> +     if (argc == optind) {
> +             ret = update_node_list(SD_MAX_NODES);
> +             if (ret < 0) {
> +                     sd_err("Failed to get node list");
> +                     exit(EXIT_SYSFAIL);
> +             }
> +             return node_recovery_info(argc, argv);
> +     }
> +
> +     return do_generic_subcommand(node_recovery_cmd, argc, argv);
> +}
> +
>  static int node_log_level_set(int argc, char **argv)
>  {
>       int ret = 0;
> @@ -632,8 +771,9 @@ static struct subcommand node_cmd[] = {
>        CMD_NEED_NODELIST, node_list},
>       {"info", NULL, "aprhT", "show information about each node", NULL,
>        CMD_NEED_NODELIST, node_info},
> -     {"recovery", NULL, "aphPrT", "show recovery information of nodes", NULL,
> -      CMD_NEED_NODELIST, node_recovery, node_options},
> +     {"recovery", "<max> <interval>", "aphPrT",
> +      "show recovery information or set/get recovery speed throttling of 
> nodes",
> +      node_recovery_cmd, 0, node_recovery, node_options},
>       {"md", "[disks]", "aprAfhT", "See 'dog node md' for more information",
>        node_md_cmd, CMD_NEED_ARG, node_md, node_options},
>       {"stat", NULL, "aprwhT", "show stat information about the node", NULL,
> diff --git a/include/internal_proto.h b/include/internal_proto.h
> index 3f5d77f..f6ba18e 100644
> --- a/include/internal_proto.h
> +++ b/include/internal_proto.h
> @@ -111,6 +111,8 @@
>  #define SD_OP_VDI_STATE_SNAPSHOT_CTL  0xC7
>  #define SD_OP_INODE_COHERENCE 0xC8
>  #define SD_OP_READ_DEL_VDIS  0xC9
> +#define SD_OP_GET_RECOVERY      0xCA
> +#define SD_OP_SET_RECOVERY      0xCB
>  
>  /* internal flags for hdr.flags, must be above 0x80 */
>  #define SD_FLAG_CMD_RECOVERY 0x0080
> diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
> index 4f0c48c..5f6d157 100644
> --- a/include/sheepdog_proto.h
> +++ b/include/sheepdog_proto.h
> @@ -272,6 +272,12 @@ struct generation_reference {
>       int32_t count;
>  };
>  
> +struct recovery_throttling {
> +     uint32_t max_exec_count;
> +     uint64_t queue_work_interval;
> +     bool throttling;
> +};
> +
>  struct sd_inode {
>       char name[SD_MAX_VDI_LEN];
>       char tag[SD_MAX_VDI_TAG_LEN];
> diff --git a/sheep/ops.c b/sheep/ops.c
> index e4daca2..2b4a769 100644
> --- a/sheep/ops.c
> +++ b/sheep/ops.c
> @@ -1444,6 +1444,30 @@ static int cluster_inode_coherence(const struct sd_req 
> *req,
>                              !!req->inode_coherence.validate, &sender->nid);
>  }
>  
> +static int local_get_recovery(struct request *req)
> +{
> +     struct recovery_throttling rthrottling;
> +
> +     rthrottling = get_recovery();
> +     memcpy(req->data, &rthrottling, sizeof(rthrottling));
> +     req->rp.data_length = sizeof(rthrottling);
> +
> +     return SD_RES_SUCCESS;
> +}
> +
> +static int local_set_recovery(struct request *req)
> +{
> +     struct recovery_throttling *rthrottling;
> +
> +     rthrottling = xmalloc(sizeof(struct recovery_throttling));
> +
> +     memcpy(rthrottling, req->data, sizeof(struct recovery_throttling));
> +     set_recovery(rthrottling);
> +
> +     free(rthrottling);
> +     return SD_RES_SUCCESS;
> +}
> +
>  static struct sd_op_template sd_ops[] = {
>  
>       /* cluster operations */
> @@ -1891,6 +1915,20 @@ static struct sd_op_template sd_ops[] = {
>               .type = SD_OP_TYPE_PEER,
>               .process_work = peer_decref_object,
>       },
> +
> +     [SD_OP_GET_RECOVERY] = {
> +             .name = "GET_RECOVERY",
> +             .type = SD_OP_TYPE_LOCAL,
> +             .force = true,
> +             .process_work = local_get_recovery,
> +     },
> +
> +     [SD_OP_SET_RECOVERY] = {
> +             .name = "SET_RECOVERY",
> +             .type = SD_OP_TYPE_LOCAL,
> +             .force = true,
> +             .process_work = local_set_recovery,
> +     },
>  };
>  
>  const struct sd_op_template *get_sd_op(uint8_t opcode)
> diff --git a/sheep/recovery.c b/sheep/recovery.c
> index 85dad21..325122b 100644
> --- a/sheep/recovery.c
> +++ b/sheep/recovery.c
> @@ -78,6 +78,15 @@ struct recovery_info {
>       struct sd_mutex vinfo_lock;
>  
>       struct sd_node *excluded;
> +
> +     uint32_t max_exec_count;
> +     uint64_t queue_work_interval;
> +     bool throttling;
> +};
> +
> +struct recovery_timer {
> +     void (*callback)(void *);
> +     void *data;
>  };
>  
>  static struct recovery_info *next_rinfo;
> @@ -900,6 +909,91 @@ void resume_suspended_recovery(void)
>       }
>  }
>  
> +static void recovery_timer_handler(int fd, int events, void *data)
> +{
> +     struct recovery_timer *t = data;
> +     uint64_t val;
> +
> +     if (read(fd, &val, sizeof(val)) < 0)
> +             return;
> +     t->callback(t->data);
> +     unregister_event(fd);
> +     close(fd);
> +}
> +
> +static void add_recovery_timer(struct recovery_timer *t, unsigned int 
> mseconds)
> +{
> +     struct itimerspec it;
> +     int tfd;
> +
> +     tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
> +     if (tfd < 0) {
> +             sd_err("timerfd_create: %m");
> +             return;
> +     }
> +
> +     memset(&it, 0, sizeof(it));
> +     it.it_value.tv_sec = mseconds / 1000;
> +     it.it_value.tv_nsec = (mseconds % 1000) * 1000000;
> +
> +     if (timerfd_settime(tfd, 0, &it, NULL) < 0) {
> +             sd_err("timerfd_settime: %m");
> +             return;
> +     }
> +
> +     if (register_event(tfd, recovery_timer_handler, t) < 0)
> +             sd_err("failed to register timer fd");
> +}
> +
> +static void recover_next_object_delay(void *arg)
> +{
> +     struct recovery_info *rinfo = main_thread_get(current_rinfo);
> +     uint32_t nr_threads = md_nr_disks() * 2;
> +     double thread_unit_exec = 0;
> +     double mod = 0;
> +
> +     if (!rinfo)
> +             return;
> +
> +     thread_unit_exec = (double) rinfo->max_exec_count / nr_threads;
> +     mod = rinfo->max_exec_count % nr_threads;
> +
> +     if (rinfo->max_exec_count <= nr_threads || mod != 0) {
> +             if (rand() % 100 + 1 <= (mod / nr_threads) * 100)
> +                     thread_unit_exec = ceil(thread_unit_exec);
> +             else
> +                     thread_unit_exec = floor(thread_unit_exec);
> +     }
> +
> +     for (int i = 0; i < thread_unit_exec; i++) {
> +             rinfo = main_thread_get(current_rinfo);
> +
> +             if (!rinfo)
> +                     return;
> +
> +             if (rinfo->next - rinfo->done > rinfo->max_exec_count)
> +                     break;
> +
> +             recover_next_object(rinfo);
> +     }
> +
> +     if (rinfo->throttling != sys->rthrottling.throttling) {
> +             rinfo->max_exec_count = sys->rthrottling.max_exec_count;
> +             rinfo->queue_work_interval =
> +                              sys->rthrottling.queue_work_interval;
> +             rinfo->throttling = sys->rthrottling.throttling;
> +     }
> +
> +     if (rinfo->throttling) {
> +             static struct recovery_timer rt = {
> +                     .callback = recover_next_object_delay,
> +                     .data = &rt,
> +             };
> +             add_recovery_timer(&rt, rinfo->queue_work_interval);
> +     } else
> +             recover_next_object(rinfo);
> +}
> +
>  static void recover_object_main(struct work *work)
>  {
>       struct recovery_work *rw = container_of(work, struct recovery_work,
> @@ -935,7 +1029,16 @@ static void recover_object_main(struct work *work)
>       if (rinfo->done >= rinfo->count)
>               goto finish_recovery;
>  
> -     recover_next_object(rinfo);
> +     if (!rinfo->throttling && !sys->rthrottling.throttling)
> +             recover_next_object(rinfo);
> +     else if (!rinfo->throttling && sys->rthrottling.throttling) {
> +             static struct recovery_timer rt = {
> +                     .callback = recover_next_object_delay,
> +                     .data = &rt,
> +             };
> +             add_recovery_timer(&rt, sys->rthrottling.queue_work_interval);
> +     }
> +
>       free_recovery_obj_work(row);
>       return;
>  finish_recovery:
> @@ -982,8 +1085,17 @@ static void finish_object_list(struct work *work)
>               return;
>       }
>  
> -     for (uint32_t i = 0; i < nr_threads; i++)
> -             recover_next_object(rinfo);
> +     for (uint32_t i = 0; i < nr_threads; i++) {
> +             if (rinfo->throttling) {
> +                     static struct recovery_timer rt = {
> +                             .callback = recover_next_object_delay,
> +                             .data = &rt,
> +                     };
> +                     add_recovery_timer(&rt, rinfo->queue_work_interval);
> +             } else
> +                     recover_next_object(rinfo);
> +     }
> +
>       return;
>  }
>  
> @@ -1143,6 +1255,9 @@ int start_recovery(struct vnode_info *cur_vinfo, struct 
> vnode_info *old_vinfo,
>       rinfo->max_epoch = sys->cinfo.epoch;
>       rinfo->vinfo_array = xzalloc(sizeof(struct vnode_info *) *
>                                    rinfo->max_epoch);
> +     rinfo->max_exec_count = sys->rthrottling.max_exec_count;
> +     rinfo->queue_work_interval = sys->rthrottling.queue_work_interval;
> +     rinfo->throttling = sys->rthrottling.throttling;
>       sd_init_mutex(&rinfo->vinfo_lock);
>       if (epoch_lifted)
>               rinfo->notify_complete = true; /* Reweight or node recovery */
> @@ -1236,3 +1351,21 @@ void get_recovery_state(struct recovery_state *state)
>       state->nr_finished = rinfo->done;
>       state->nr_total = rinfo->count;
>  }
> +
> +void set_recovery(struct recovery_throttling *rthrottling)
> +{
> +     sys->rthrottling.max_exec_count = rthrottling->max_exec_count;
> +     sys->rthrottling.queue_work_interval =
> +                              rthrottling->queue_work_interval;
> +     if (rthrottling->max_exec_count > 0 &&
> +      rthrottling->queue_work_interval > 0)
> +             sys->rthrottling.throttling = true;
> +     else
> +             sys->rthrottling.throttling = false;
> +}
> +
> +struct recovery_throttling get_recovery(void)
> +{
> +     return sys->rthrottling;
> +}
> +
> diff --git a/sheep/sheep.c b/sheep/sheep.c
> index ef45a33..9fc7610 100644
> --- a/sheep/sheep.c
> +++ b/sheep/sheep.c
> @@ -115,6 +115,12 @@ static const char log_help[] =
>  "  syslog             syslog of the system\n"
>  "  stdout             standard output\n";
>  
> +static const char recovery_help[] =
> +"Available arguments:\n"
> +"\tmax=: object recovery process maximum count of each interval\n"
> +"\tinterval=: object recovery interval time (millisec)\n"
> +"Example:\n\t$ sheep -R max=50,interval=1000 ...\n";
> +
>  static struct sd_option sheep_options[] = {
>       {'b', "bindaddr", true, "specify IP address of interface to listen on",
>        bind_help},
> @@ -137,6 +143,8 @@ static struct sd_option sheep_options[] = {
>       {'P', "pidfile", true, "create a pid file"},
>       {'r', "http", true, "enable http service. (default: disabled)",
>        http_help},
> +     {'R', "recovery", true, "specify the recovery speed throttling",
> +      recovery_help},
>       {'u', "upgrade", false, "upgrade to the latest data layout"},
>       {'v', "version", false, "show the version"},
>       {'w', "cache", true, "enable object cache", cache_help},
> @@ -424,6 +432,26 @@ static struct option_parser journal_parsers[] = {
>       { NULL, NULL },
>  };
>  
> +static uint32_t max_exec_count;
> +static uint64_t queue_work_interval;
> +static int max_exec_count_parser(const char *s)
> +{
> +     max_exec_count = strtol(s, NULL, 10);
> +     return 0;
> +}
> +
> +static int queue_work_interval_parser(const char *s)
> +{
> +     queue_work_interval = strtol(s, NULL, 10);
> +     return 0;
> +}
> +
> +static struct option_parser recovery_parsers[] = {
> +     { "max=", max_exec_count_parser },
> +     { "interval=", queue_work_interval_parser },
> +     { NULL, NULL },
> +};
> +
>  static size_t get_nr_nodes(void)
>  {
>       struct vnode_info *vinfo;
> @@ -633,6 +661,10 @@ int main(int argc, char **argv)
>  
>       sys->node_status = SD_NODE_STATUS_INITIALIZATION;
>  
> +     sys->rthrottling.max_exec_count = 0;
> +     sys->rthrottling.queue_work_interval = 0;
> +     sys->rthrottling.throttling = false;
> +
>       install_crash_handler(crash_handler);
>       signal(SIGPIPE, SIG_IGN);
>  
> @@ -751,6 +783,15 @@ int main(int argc, char **argv)
>               case 'h':
>                       usage(0);
>                       break;
> +             case 'R':
> +                     if (option_parse(optarg, ",", recovery_parsers) < 0)
> +                             exit(1);
> +                     sys->rthrottling.max_exec_count = max_exec_count;
> +                     sys->rthrottling.queue_work_interval
> +                                              = queue_work_interval;
> +                     if (max_exec_count > 0 && queue_work_interval > 0)
> +                             sys->rthrottling.throttling = true;
> +                     break;
>               case 'v':
>                       fprintf(stdout, "Sheepdog daemon version %s\n",
>                               PACKAGE_VERSION);
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index 4ac08f8..170e8ff 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -143,6 +143,8 @@ struct system_info {
>       bool gateway_only;
>       bool nosync;
>  
> +     struct recovery_throttling rthrottling;
> +
>       struct work_queue *net_wqueue;
>       struct work_queue *gateway_wqueue;
>       struct work_queue *io_wqueue;
> @@ -428,6 +430,8 @@ int start_recovery(struct vnode_info *cur_vinfo, struct 
> vnode_info *, bool,
>  bool oid_in_recovery(uint64_t oid);
>  bool node_in_recovery(void);
>  void get_recovery_state(struct recovery_state *state);
> +void set_recovery(struct recovery_throttling *rthrottling);
> +struct recovery_throttling get_recovery(void);
>  
>  int read_backend_object(uint64_t oid, char *data, unsigned int datalen,
>                      uint64_t offset);
> -- 
> 1.7.1
> 
> 
> 
> -- 
> NTTソフトウェア株式会社
> クラウド事業部 第一事業ユニット(C一BU)
> 福田康人(FUKUDA Yasuhito)
> E-mail:[email protected]
> 〒220-0012 横浜市西区みなとみらい4-4-5
> 横浜アイマークプレイス13階
> TEL:045-212-7393/FAX:045-662-7856
> 
> 
> -- 
> sheepdog mailing list
> [email protected]
> http://lists.wpkg.org/mailman/listinfo/sheepdog
-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to