On 8/20/21 11:16 AM, [email protected] wrote:
> From: Anton Ivanov <[email protected]>
> 
> Set a soft time limit of "raft election timer"/2 on ovsdb
> processing.
> 
> This improves behaviour in large heavily loaded clusters.
> While it cannot fully eliminate spurious raft elections
> under heavy load, it significantly decreases their number.
> 
> Processing is (to the extent possible) restarted where it
> stopped on the previous iteration to ensure that sessions
> towards the tail of the session list are not starved.
> 
> Signed-off-by: Anton Ivanov <[email protected]>
> ---
>  ovsdb/jsonrpc-server.c | 98 ++++++++++++++++++++++++++++++++++++------
>  ovsdb/jsonrpc-server.h |  2 +-
>  ovsdb/ovsdb-server.c   | 16 ++++++-
>  ovsdb/raft.c           |  6 +++
>  ovsdb/raft.h           |  3 ++
>  ovsdb/storage.c        | 12 ++++++
>  ovsdb/storage.h        |  2 +
>  7 files changed, 124 insertions(+), 15 deletions(-)
> 
> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
> index 351c39d8a..457e1c040 100644
> --- a/ovsdb/jsonrpc-server.c
> +++ b/ovsdb/jsonrpc-server.c
> @@ -60,7 +60,8 @@ static struct ovsdb_jsonrpc_session 
> *ovsdb_jsonrpc_session_create(
>      struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool);
>  static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *,
>                                                 struct ovsdb *);
> -static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
> +static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *,
> +                                          uint64_t limit);
>  static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
>  static void ovsdb_jsonrpc_session_get_memory_usage_all(
>      const struct ovsdb_jsonrpc_remote *, struct simap *usage);
> @@ -128,6 +129,9 @@ struct ovsdb_jsonrpc_server {
>      bool read_only;            /* This server is does not accept any
>                                    transactions that can modify the database. 
> */
>      struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. 
> */
> +    struct ovs_list worklist; /* List of remotes to work on. */
> +    bool must_wake_up; /* The processing loop must be re-run. It was
> +                          interrupted due to exceeding a time constraint. */
>  };
>  
>  /* A configured remote.  This is either a passive stream listener plus a list
> @@ -137,6 +141,7 @@ struct ovsdb_jsonrpc_remote {
>      struct ovsdb_jsonrpc_server *server;
>      struct pstream *listener;   /* Listener, if passive. */
>      struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. 
> */
> +    struct ovs_list work_node;
>      uint8_t dscp;
>      bool read_only;
>      char *role;
> @@ -158,6 +163,7 @@ ovsdb_jsonrpc_server_create(bool read_only)
>      struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
>      ovsdb_server_init(&server->up);
>      shash_init(&server->remotes);
> +    ovs_list_init(&server->worklist);
>      server->read_only = read_only;
>      return server;
>  }
> @@ -255,6 +261,7 @@ ovsdb_jsonrpc_server_set_remotes(struct 
> ovsdb_jsonrpc_server *svr,
>  
>          ovsdb_jsonrpc_session_set_all_options(remote, options);
>      }
> +    ovs_list_init(&svr->worklist); /* Reset any pending work. */
>  }
>  
>  static struct ovsdb_jsonrpc_remote *
> @@ -280,6 +287,7 @@ ovsdb_jsonrpc_server_add_remote(struct 
> ovsdb_jsonrpc_server *svr,
>      remote->read_only = options->read_only;
>      remote->role = nullable_xstrdup(options->role);
>      shash_add(&svr->remotes, name, remote);
> +    ovs_list_init(&svr->worklist); /* Reset any pending work. */
>  
>      if (!listener) {
>          ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, 
> true),
> @@ -292,10 +300,10 @@ static void
>  ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
>  {
>      struct ovsdb_jsonrpc_remote *remote = node->data;
> -
>      ovsdb_jsonrpc_session_close_all(remote);
>      pstream_close(remote->listener);
>      shash_delete(&remote->server->remotes, node);
> +    ovs_list_init(&remote->server->worklist); /* Reset any pending work. */
>      free(remote->role);
>      free(remote);
>  }
> @@ -378,32 +386,55 @@ ovsdb_jsonrpc_server_set_read_only(struct 
> ovsdb_jsonrpc_server *svr,
>  }
>  
>  void
> -ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
> +ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr, uint64_t limit)
>  {
>      struct shash_node *node;
> +    uint64_t elapsed = 0;
> +    uint64_t start_time = time_msec();
> +    struct ovsdb_jsonrpc_remote *work;
>  
> -    SHASH_FOR_EACH (node, &svr->remotes) {
> -        struct ovsdb_jsonrpc_remote *remote = node->data;
> +    svr->must_wake_up = false;
>  
> -        if (remote->listener) {
> +    if (ovs_list_is_empty(&svr->worklist)) {
> +        SHASH_FOR_EACH (node, &svr->remotes) {
> +            struct ovsdb_jsonrpc_remote *remote = node->data;
> +            ovs_list_push_back(&svr->worklist, &remote->work_node);
> +        }
> +    }

If there is unfinished work, the loop below will only iterate
over the remaining elements; it will not process the elements
that were already processed last time.  But we're not setting
'must_wake_up' in that case.  That doesn't look right.

Anyway, I don't understand why we need two different ways of
iterating over remotes and over sessions.  Why can't we maintain
this worklist, adding and removing elements along with the shash,
and re-order it in the same way as you did for sessions?
This will save us some wakeups and make the code more uniform.

> +
> +    LIST_FOR_EACH_POP (work, work_node, &svr->worklist) {
> +        if (work->listener) {
>              struct stream *stream;
>              int error;
>  
> -            error = pstream_accept(remote->listener, &stream);
> +            error = pstream_accept(work->listener, &stream);
>              if (!error) {
>                  struct jsonrpc_session *js;
>                  js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
> -                                                     remote->dscp);
> -                ovsdb_jsonrpc_session_create(remote, js, svr->read_only ||
> -                                                         remote->read_only);
> +                                                     work->dscp);
> +                ovsdb_jsonrpc_session_create(work, js, svr->read_only ||
> +                                                         work->read_only);

Please, keep the original indentation.

>              } else if (error != EAGAIN) {
>                  VLOG_WARN_RL(&rl, "%s: accept failed: %s",
> -                             pstream_get_name(remote->listener),
> +                             pstream_get_name(work->listener),
>                               ovs_strerror(error));
>              }
>          }
>  
> -        ovsdb_jsonrpc_session_run_all(remote);
> +        /* We assume accept and session creation time to be
> +         * negligible for the purposes of computing timeouts.
> +         */
> +        ovsdb_jsonrpc_session_run_all(work, limit - elapsed);
> +
> +        elapsed = time_msec() - start_time;
> +        if (elapsed > limit) {
> +            /* Push the current (timed out) item at the end of the
> +             * work queue. This ensures that with multiple remotes
> +             * timeouts on one do not starve the others. */
> +            ovs_list_push_back(&svr->worklist, &work->work_node);
> +            svr->must_wake_up = true;
> +            break;
> +        }
>      }
>  }
>  
> @@ -412,6 +443,16 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server 
> *svr)
>  {
>      struct shash_node *node;
>  
> +    if (svr->must_wake_up) {
> +        /* We have stopped processing due to a time constraint.
> +         * In this case there is no point to walk all sessions
> +         * and rebuild the poll structure for the poll loop.
> +         */

See review for v7.

> +        poll_immediate_wake();
> +        svr->must_wake_up = false;
> +        return;
> +    }
> +
>      SHASH_FOR_EACH (node, &svr->remotes) {
>          struct ovsdb_jsonrpc_remote *remote = node->data;
>  
> @@ -583,15 +624,46 @@ ovsdb_jsonrpc_session_set_options(struct 
> ovsdb_jsonrpc_session *session,
>  }
>  
>  static void
> -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
> +fast_forward_list(struct ovs_list *list, struct ovs_list *element)
> +{
> +    struct ovs_list temp = OVS_LIST_INITIALIZER(&temp);
> +
> +    if (ovs_list_is_short(list)) {
> +        return;
> +    }
> +    if (list->prev == element || element == list) {

Please use accessors instead, i.e. ovs_list_back().
Same for the code below.

> +        return;
> +    }
> +    /* Cut the "not yet processed" part out of the list and move it to
> +     * temp.

This comment talks about processing, but the function itself seems
to be more generic, so the comment should be re-worded accordingly.
The function may even be moved to list.c with appropriate re-naming,
a good comment about what it does and some unit tests.

> +     */
> +    ovs_list_splice(&temp, element, list->prev);
> +    /* Push the processed part after the not processed. */
> +    ovs_list_push_back_all(&temp, list);
> +    /* Swap back the rearranged list. */
> +    ovs_list_push_back_all(list, &temp);
> +}
> +
> +static void
> +ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote,
> +                              uint64_t limit)
>  {
>      struct ovsdb_jsonrpc_session *s, *next;
> +    uint64_t start_time = time_msec();
>  
>      LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
>          int error = ovsdb_jsonrpc_session_run(s);
>          if (error) {
>              ovsdb_jsonrpc_session_close(s);
>          }
> +
> +        if (time_msec() - start_time > limit) {
> +            /* We rotate the session list so that the next processing
> +             * iteration restarts from the next element.
> +             */
> +            fast_forward_list(&remote->sessions, &next->node);

Should we call this function 'rotate' instead of 'fast_forward'?

> +            break;
> +        }
>      }
>  }
>  
> diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h
> index e0653aa39..218152e9d 100644
> --- a/ovsdb/jsonrpc-server.h
> +++ b/ovsdb/jsonrpc-server.h
> @@ -67,7 +67,7 @@ void ovsdb_jsonrpc_server_free_remote_status(
>  void ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *, bool 
> force,
>                                      char *comment);
>  
> -void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *);
> +void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *, uint64_t limit);
>  void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *);
>  
>  void ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *,
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index 0b3d2bb71..ed43da968 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -216,7 +216,21 @@ main_loop(struct server_config *config,
>              reconfigure_remotes(jsonrpc, all_dbs, remotes),
>              &remotes_error);
>          report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
> -        ovsdb_jsonrpc_server_run(jsonrpc);
> +
> +        /* Figure out current processing time limit. */
> +        uint64_t limit = UINT64_MAX;
> +        SHASH_FOR_EACH (node, all_dbs) {
> +            struct db *db = node->data;
> +            uint64_t db_limit;
> +
> +            db_limit = ovsdb_storage_max_processing_time(db->db->storage);
> +            limit = MIN(db_limit, limit);
> +        }
> +        if (ovs_replay_is_active()) {
> +            limit = UINT64_MAX;
> +        }
> +

See review for v7.

> +        ovsdb_jsonrpc_server_run(jsonrpc, limit);
>  
>          if (*is_backup) {
>              replication_run();
> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
> index 2fb515651..183463aba 100644
> --- a/ovsdb/raft.c
> +++ b/ovsdb/raft.c
> @@ -407,6 +407,12 @@ raft_make_address_passive(const char *address_)
>      }
>  }
>  
> +uint64_t
> +raft_get_election_timer(const struct raft *raft)
> +{
> +    return raft->election_timer;
> +}
> +
>  static struct raft *
>  raft_alloc(void)
>  {
> diff --git a/ovsdb/raft.h b/ovsdb/raft.h
> index 3545c41c2..575e7f609 100644
> --- a/ovsdb/raft.h
> +++ b/ovsdb/raft.h
> @@ -188,4 +188,7 @@ void raft_take_leadership(struct raft *);
>  void raft_transfer_leadership(struct raft *, const char *reason);
>  
>  const struct uuid *raft_current_eid(const struct raft *);
> +
> +uint64_t raft_get_election_timer(const struct raft *);
> +
>  #endif /* lib/raft.h */
> diff --git a/ovsdb/storage.c b/ovsdb/storage.c
> index d727b1eac..58018fe6d 100644
> --- a/ovsdb/storage.c
> +++ b/ovsdb/storage.c
> @@ -647,3 +647,15 @@ ovsdb_storage_peek_last_eid(struct ovsdb_storage 
> *storage)
>      }
>      return raft_current_eid(storage->raft);
>  }
> +
> +uint64_t
> +ovsdb_storage_max_processing_time(struct ovsdb_storage *storage)

See review for v7.

> +{
> +    if (!storage->raft) {
> +        return UINT64_MAX;
> +    }
> +    if (raft_get_election_timer(storage->raft) > 2) {
> +        return raft_get_election_timer(storage->raft) / 2;
> +    }
> +    return 1;
> +}
> diff --git a/ovsdb/storage.h b/ovsdb/storage.h
> index e120094d7..a8a02e0bd 100644
> --- a/ovsdb/storage.h
> +++ b/ovsdb/storage.h
> @@ -97,4 +97,6 @@ struct ovsdb_schema *ovsdb_storage_read_schema(struct 
> ovsdb_storage *);
>  
>  const struct uuid *ovsdb_storage_peek_last_eid(struct ovsdb_storage *);
>  
> +uint64_t ovsdb_storage_max_processing_time(struct ovsdb_storage *);
> +
>  #endif /* ovsdb/storage.h */
> 

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to