On 7/1/22 01:34, Ilya Maximets wrote:
> Conversion of the database data into JSON object, serialization
> and destruction of that object are the most heavy operations
> during the database compaction.  If these operations are moved
> to a separate thread, the main thread can continue processing
> database requests in the meantime.
> 
> With this change, the compaction is split in 3 phases:
> 
> 1. Initialization:
>    - Create a copy of the database.
>    - Remember current database index.
>    - Start a separate thread to convert a copy of the database
>      into serialized JSON object.
> 
> 2. Wait:
>    - Continue normal operation until compaction thread is done.
>    - Meanwhile, compaction thread:
>      * Convert database copy to JSON.
>      * Serialize resulted JSON.
>      * Destroy original JSON object.
> 
> 3. Finish:
>    - Destroy the database copy.
>    - Take the snapshot created by the thread.
>    - Write on disk.
> 
> The key for this scheme to be fast is the ability to create
> a shallow copy of the database.  This doesn't take too much
> time, allowing the thread to do most of the work.
> 
> Database copy is created and destroyed only by the main thread,
> so there is no need for synchronization.
> 
> This solution reduces the time the main thread is blocked
> by compaction by 80-90%.  For example, in ovn-heater tests
> with the 120-node density-heavy scenario, where compaction normally
> takes 5-6 seconds at the end of a test, measured compaction
> times were all below 1 second with the change applied.  Also,
> note that these measured times are the sum of phases 1 and 3,
> so actual poll intervals are about half a second in this case.
> 
> Only implemented for raft storage for now.  The implementation
> for standalone databases can be added later by using a file
> offset as a database index and copying newly added changes
> from the old file to a new one during ovsdb_log_replace().
> 

Let's add a TODO item, what do you think?

Aside from this I have a few minor comments/questions below.
Nothing that can't be fixed at apply time if needed.  The rest
looks good to me, thanks!
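
For anyone skimming the thread, here is how I read the three phases from
the commit message, as a rough self-contained model.  Everything named
toy_* is hypothetical and not part of the patch, the "database" is just
four integers, and completion is signaled with a C11 atomic flag rather
than the 'struct seq' the patch uses.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the database: four integer "rows". */
struct toy_db {
    int rows[4];
};

struct toy_compaction {
    pthread_t thread;
    struct toy_db copy;      /* Phase 1: private copy read by the worker. */
    unsigned long index;     /* Phase 1: database index at copy time. */
    char *serialized;        /* Phase 2: result written by the worker. */
    atomic_bool done;        /* Phase 2: set by the worker when finished. */
};

/* Phase 2 (worker): serialize the copy, then signal completion. */
static void *
toy_worker(void *aux)
{
    struct toy_compaction *c = aux;
    char buf[64];

    snprintf(buf, sizeof buf, "[%d,%d,%d,%d]", c->copy.rows[0],
             c->copy.rows[1], c->copy.rows[2], c->copy.rows[3]);
    c->serialized = malloc(strlen(buf) + 1);
    assert(c->serialized);
    strcpy(c->serialized, buf);
    atomic_store(&c->done, true);
    return NULL;
}

/* Phase 1 (main thread): copy the data, remember the index, start worker. */
struct toy_compaction *
toy_compaction_start(const struct toy_db *db, unsigned long index)
{
    struct toy_compaction *c = calloc(1, sizeof *c);

    assert(c);
    c->copy = *db;
    c->index = index;
    atomic_init(&c->done, false);
    if (pthread_create(&c->thread, NULL, toy_worker, c)) {
        abort();
    }
    return c;
}

/* Phase 2 (main thread): non-blocking check made between other work. */
bool
toy_compaction_done(struct toy_compaction *c)
{
    return atomic_load(&c->done);
}

/* Phase 3 (main thread): join, take the result, free the state.
 * The caller owns the returned string. */
char *
toy_compaction_finish(struct toy_compaction *c, unsigned long *index)
{
    char *result;

    pthread_join(c->thread, NULL);
    result = c->serialized;
    *index = c->index;
    free(c);
    return result;
}
```

In this model the main thread may keep modifying its own toy_db after
toy_compaction_start() returns, because the worker only ever touches the
copy.  The remembered index is what tells phase 3 which state the
serialized result corresponds to.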

Acked-by: Dumitru Ceara <[email protected]>

> Reported-at: https://bugzilla.redhat.com/2069108
> Signed-off-by: Ilya Maximets <[email protected]>
> ---
>  ovsdb/ovsdb-server.c |  18 +++++-
>  ovsdb/ovsdb.c        | 143 +++++++++++++++++++++++++++++++++++++++----
>  ovsdb/ovsdb.h        |  24 ++++++++
>  ovsdb/raft.c         |   8 ++-
>  ovsdb/raft.h         |   3 +-
>  ovsdb/row.c          |  17 +++++
>  ovsdb/row.h          |   1 +
>  ovsdb/storage.c      |  11 ++--
>  ovsdb/storage.h      |   3 +-
>  9 files changed, 204 insertions(+), 24 deletions(-)
> 
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index 5549b4e3a..eae2f6679 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -252,7 +252,9 @@ main_loop(struct server_config *config,
>                  remove_db(config, node,
>                            xasprintf("removing database %s because storage "
>                                      "disconnected permanently", node->name));
> -            } else if (ovsdb_storage_should_snapshot(db->db->storage)) {
> +            } else if (!ovsdb_snapshot_in_progress(db->db)
> +                       && (ovsdb_storage_should_snapshot(db->db->storage) ||
> +                           ovsdb_snapshot_ready(db->db))) {
>                  log_and_free_error(ovsdb_snapshot(db->db, trim_memory));
>              }
>          }
> @@ -287,6 +289,7 @@ main_loop(struct server_config *config,
>              ovsdb_trigger_wait(db->db, time_msec());
>              ovsdb_storage_wait(db->db->storage);
>              ovsdb_storage_read_wait(db->db->storage);
> +            ovsdb_snapshot_wait(db->db);
>          }
>          if (run_process) {
>              process_wait(run_process);
> @@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int argc,
>              ? !strcmp(node->name, db_name)
>              : node->name[0] != '_') {
>              if (db->db) {
> +                struct ovsdb_error *error = NULL;
> +
>                  VLOG_INFO("compacting %s database by user request",
>                            node->name);
>  
> -                struct ovsdb_error *error = ovsdb_snapshot(db->db,
> -                                                           trim_memory);
> +                error = ovsdb_snapshot(db->db, trim_memory);
> +                if (!error && ovsdb_snapshot_in_progress(db->db)) {
> +                    while (ovsdb_snapshot_in_progress(db->db)) {
> +                        ovsdb_snapshot_wait(db->db);
> +                        poll_block();
> +                    }

This is no worse than before: if snapshots take long, the
"ovsdb-server/compact" appctl command still blocks everything in the main
thread, potentially causing raft and other timeouts to fire.

Can we improve this now that snapshots happen in a background thread?
Maybe worth a TODO for a follow up patch, what do you think?

> +                    error = ovsdb_snapshot(db->db, trim_memory);
> +                }
> +
>                  if (error) {
>                      char *s = ovsdb_error_to_string(error);
>                      ds_put_format(&reply, "%s\n", s);
> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
> index 91b4a01af..8cbefbe3d 100644
> --- a/ovsdb/ovsdb.c
> +++ b/ovsdb/ovsdb.c
> @@ -25,9 +25,13 @@
>  #include "file.h"
>  #include "monitor.h"
>  #include "openvswitch/json.h"
> +#include "openvswitch/poll-loop.h"
> +#include "ovs-thread.h"
>  #include "ovsdb-error.h"
>  #include "ovsdb-parser.h"
>  #include "ovsdb-types.h"
> +#include "row.h"
> +#include "seq.h"
>  #include "simap.h"
>  #include "storage.h"
>  #include "table.h"
> @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db)
>      if (db) {
>          struct shash_node *node;
>  
> +        /* Need to wait for compaction thread to finish the work. */
> +        while (ovsdb_snapshot_in_progress(db)) {
> +            ovsdb_snapshot_wait(db);
> +            poll_block();
> +        }
> +        if (ovsdb_snapshot_ready(db)) {
> +            struct ovsdb_error *error = ovsdb_snapshot(db, false);
> +
> +            if (error) {
> +                char *s = ovsdb_error_to_string_free(error);
> +                VLOG_INFO("%s: %s", db->name, s);
> +                free(s);
> +            }
> +        }
> +
>          /* Close the log. */
>          ovsdb_storage_close(db->storage);
>  
> @@ -535,20 +554,119 @@ ovsdb_get_table(const struct ovsdb *db, const char *name)
>      return shash_find_data(&db->tables, name);
>  }
>  
> +static struct ovsdb *
> +ovsdb_clone_data(const struct ovsdb *db)
> +{
> +    struct ovsdb *new = ovsdb_create(ovsdb_schema_clone(db->schema), NULL);
> +
> +    struct shash_node *node;
> +    SHASH_FOR_EACH (node, &db->tables) {
> +        struct ovsdb_table *table = node->data;
> +        struct ovsdb_table *new_table = shash_find_data(&new->tables,
> +                                                        node->name);
> +        struct ovsdb_row *row, *new_row;
> +
> +        hmap_reserve(&new_table->rows, hmap_count(&table->rows));
> +        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
> +            new_row = ovsdb_row_datum_clone(row);
> +            hmap_insert(&new_table->rows, &new_row->hmap_node,
> +                        ovsdb_row_hash(new_row));
> +        }
> +    }
> +
> +    return new;
> +}
> +
> +static void *
> +compaction_thread(void *aux)
> +{
> +    struct ovsdb_compaction_state *state = aux;
> +    uint64_t start_time = time_msec();
> +    struct json *data;
> +
> +    VLOG_DBG("%s: Compaction thread started.", state->db->name);
> +    data = ovsdb_to_txn_json(state->db, "compacting database online");
> +    state->data = json_serialized_object_create(data);
> +    json_destroy(data);
> +
> +    state->thread_time = time_msec() - start_time;
> +
> +    VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.",
> +             state->db->name, state->thread_time);
> +    seq_change(state->done);
> +    return NULL;
> +}
> +
> +void
> +ovsdb_snapshot_wait(struct ovsdb *db)
> +{
> +    if (db->snap_state) {
> +        seq_wait(db->snap_state->done, db->snap_state->seqno);
> +    }
> +}
> +
> +bool
> +ovsdb_snapshot_in_progress(struct ovsdb *db)
> +{
> +    return db->snap_state &&
> +           seq_read(db->snap_state->done) == db->snap_state->seqno;
> +}
> +
> +bool
> +ovsdb_snapshot_ready(struct ovsdb *db)
> +{
> +    return db->snap_state &&
> +           seq_read(db->snap_state->done) != db->snap_state->seqno;
> +}
> +
>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>  ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
>  {
> -    if (!db->storage) {
> +    if (!db->storage || ovsdb_snapshot_in_progress(db)) {
>          return NULL;
>      }
>  
> +    uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage);
>      uint64_t elapsed, start_time = time_msec();
> -    struct json *schema = ovsdb_schema_to_json(db->schema);
> -    struct json *data = ovsdb_to_txn_json(db, "compacting database online");
> -    struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage,
> -                                                             schema, data);
> -    json_destroy(schema);
> -    json_destroy(data);
> +    struct ovsdb_compaction_state *state;
> +
> +    if (!applied_index) {
> +        /* Parallel compaction is not supported for standalone databases. */


Nit: It might be a bit too much but I think I'd add an API like
ovsdb_storage_supports_parallel_compaction() instead of relying on
checking the applied_index value.  It's a detail of the parallel
compaction implementation that it needs the applied_index to work.

If you go this way, something like the following (maybe with a better
name) should do:

bool
ovsdb_storage_supports_parallel_compaction(const struct ovsdb_storage *storage)
{
    return storage->raft != NULL;
}
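
To illustrate how the branch in ovsdb_snapshot() might then read, here is
a small standalone model.  It is only a sketch: 'struct toy_storage', the
enum and toy_pick_compaction_mode() are hypothetical stand-ins, and only
the "raft != NULL" test is taken from the helper above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in: only the 'raft' member matters here. */
struct toy_storage {
    void *raft;
};

enum toy_compaction_mode {
    TOY_COMPACT_INLINE,      /* Serialize in the main thread. */
    TOY_COMPACT_THREADED     /* Hand a copy to a compaction thread. */
};

/* Same test as the suggested helper, on the stand-in type. */
bool
toy_storage_supports_parallel_compaction(const struct toy_storage *storage)
{
    assert(storage);
    return storage->raft != NULL;
}

/* The caller asks about the capability and no longer needs to know that
 * a zero applied index happens to mean "standalone". */
enum toy_compaction_mode
toy_pick_compaction_mode(const struct toy_storage *storage)
{
    return toy_storage_supports_parallel_compaction(storage)
           ? TOY_COMPACT_THREADED
           : TOY_COMPACT_INLINE;
}
```

The applied index would then only be read on the threaded path, where it
is actually needed.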

> +        state = xzalloc(sizeof *state);
> +        state->data = ovsdb_to_txn_json(db, "compacting database online");
> +        state->schema = ovsdb_schema_to_json(db->schema);
> +    } else if (ovsdb_snapshot_ready(db)) {
> +        xpthread_join(db->snap_state->thread, NULL);
> +
> +        state = db->snap_state;
> +        db->snap_state = NULL;
> +
> +        ovsdb_destroy(state->db);
> +        seq_destroy(state->done);
> +    } else {
> +        /* Creating a thread. */
> +        ovs_assert(!db->snap_state);
> +        state = xzalloc(sizeof *state);
> +
> +        state->db = ovsdb_clone_data(db);
> +        state->schema = ovsdb_schema_to_json(db->schema);
> +        state->applied_index = applied_index;
> +        state->done = seq_create();
> +        state->seqno = seq_read(state->done);
> +        state->thread = ovs_thread_create("compaction",
> +                                          compaction_thread, state);
> +        state->init_time = time_msec() - start_time;
> +
> +        db->snap_state = state;
> +        return NULL;
> +    }
> +
> +    struct ovsdb_error *error;
> +
> +    error = ovsdb_storage_store_snapshot(db->storage, state->schema,
> +                                         state->data, state->applied_index);
> +    json_destroy(state->schema);
> +    json_destroy(state->data);
>  
>  #if HAVE_DECL_MALLOC_TRIM
>      if (!error && trim_memory) {
> @@ -557,10 +675,13 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
>  #endif
>  
>      elapsed = time_msec() - start_time;
> -    if (elapsed > 1000) {
> -        VLOG_INFO("%s: Database compaction took %"PRIu64"ms",
> -                  db->name, elapsed);
> -    }
> +    VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG,
> +         "%s: Database compaction took %"PRIu64"ms "
> +         "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)",
> +         db->name, elapsed + state->init_time,
> +         state->init_time, elapsed, state->thread_time);
> +
> +    free(state);
>      return error;
>  }
>  
> diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
> index ec2d235ec..2f77821e0 100644
> --- a/ovsdb/ovsdb.h
> +++ b/ovsdb/ovsdb.h
> @@ -72,6 +72,24 @@ struct ovsdb_txn_history_node {
>      struct ovsdb_txn *txn;
>  };
>  
> +struct ovsdb_compaction_state {
> +    pthread_t thread;          /* Thread handle. */
> +
> +    struct ovsdb *db;          /* Copy of a database data to compact. */
> +
> +    struct json *data;         /* 'db' as a serialized json. */
> +    struct json *schema;       /* 'db' schema json. */
> +    uint64_t applied_index;    /* Last applied index reported by the storage
> +                                * at the moment of a database copy. */
> +
> +    /* Completion signaling. */
> +    struct seq *done;
> +    uint64_t seqno;
> +
> +    uint64_t init_time;        /* Time spent by the main thread preparing. */
> +    uint64_t thread_time;      /* Time spent for compaction by the thread. */
> +};
> +
>  struct ovsdb {
>      char *name;
>      struct ovsdb_schema *schema;
> @@ -101,6 +119,9 @@ struct ovsdb {
>      struct ovs_list txn_forward_new;
>      /* Hash map for transactions that are already sent and waits for reply. */
>      struct hmap txn_forward_sent;
> +
> +    /* Database compaction. */
> +    struct ovsdb_compaction_state *snap_state;
>  };
>  
>  struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
> @@ -124,6 +145,9 @@ struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
>  
>  struct ovsdb_error *ovsdb_snapshot(struct ovsdb *, bool trim_memory)
>      OVS_WARN_UNUSED_RESULT;
> +void ovsdb_snapshot_wait(struct ovsdb *);
> +bool ovsdb_snapshot_in_progress(struct ovsdb *);
> +bool ovsdb_snapshot_ready(struct ovsdb *);
>  
>  void ovsdb_replace(struct ovsdb *dst, struct ovsdb *src);
>  
> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
> index 856d083f2..b2c21e70f 100644
> --- a/ovsdb/raft.c
> +++ b/ovsdb/raft.c
> @@ -4295,7 +4295,8 @@ raft_notify_snapshot_recommended(struct raft *raft)
>   * only valuable to call it if raft_get_log_length() is significant and
>   * especially if raft_grew_lots() returns true. */
>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
> -raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
> +raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data,
> +                    uint64_t applied_index)
>  {
>      if (raft->joining) {
>          return ovsdb_error(NULL,
> @@ -4311,11 +4312,12 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
>                             "cannot store a snapshot following failure");
>      }
>  
> -    if (raft->last_applied < raft->log_start) {
> +    uint64_t new_log_start = applied_index ? applied_index + 1
> +                                           : raft->last_applied + 1;

I tried to figure it out but I'm not sure.  Is the following scenario
possible?

a. snapshot operation starts, we stored applied_index in db->snap_state
b. a (broken?) server sends an append request:
   raft_handle_append_request():
     raft_handle_append_entries():
       raft_truncate(raft, log_index)

Can 'log_index < raft->commit_index' ever be true?

Furthermore, can 'log_index < raft->last_applied' ever be true?

c. snapshot operation finishes:
   ovsdb_storage_store_snapshot():
     raft_store_snapshot(.., db->snap_state->applied_index):

Would we be potentially skipping entries that are not in the snapshot?

Anyhow, it's not related to this patch, but if that's the case maybe
we should add a check in raft_handle_append_entries() to only
raft_truncate() if log_index > raft->commit_index.
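
To be explicit about the check I have in mind, here is a toy model of
it.  This is a sketch under my own assumptions, not the raft.c code:
'struct toy_raft' and toy_raft_truncate() are hypothetical, and the
model assumes that truncating to 'new_end' discards every entry with
index >= new_end.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, reduced view of the log state. */
struct toy_raft {
    uint64_t log_end;        /* Index one past the last log entry. */
    uint64_t commit_index;   /* Highest index known to be committed. */
};

/* Truncates the log to 'new_end' unless that would drop a committed
 * entry.  Returns false, leaving the log untouched, when refused. */
bool
toy_raft_truncate(struct toy_raft *raft, uint64_t new_end)
{
    /* Model invariant: committed entries are still in the log. */
    assert(raft->commit_index < raft->log_end);

    if (new_end <= raft->commit_index) {
        /* Entry 'commit_index' would be discarded: refuse. */
        return false;
    }
    if (new_end < raft->log_end) {
        raft->log_end = new_end;
    }
    return true;
}
```

With such a guard, and since last_applied never exceeds commit_index, an
applied_index remembered at the start of a snapshot could not end up
pointing past entries that were truncated in the meantime.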

> +    if (new_log_start <= raft->log_start) {
>          return ovsdb_error(NULL, "not storing a duplicate snapshot");
>      }
>  
> -    uint64_t new_log_start = raft->last_applied + 1;
>      struct raft_entry new_snapshot = {
>          .term = raft_get_term(raft, new_log_start - 1),
>          .eid = *raft_get_eid(raft, new_log_start - 1),
> diff --git a/ovsdb/raft.h b/ovsdb/raft.h
> index 599bc0ae8..403ed3dd7 100644
> --- a/ovsdb/raft.h
> +++ b/ovsdb/raft.h
> @@ -180,7 +180,8 @@ uint64_t raft_get_log_length(const struct raft *);
>  bool raft_may_snapshot(const struct raft *);
>  void raft_notify_snapshot_recommended(struct raft *);
>  struct ovsdb_error *raft_store_snapshot(struct raft *,
> -                                        const struct json *new_snapshot)
> +                                        const struct json *new_snapshot,
> +                                        uint64_t applied_index)
>      OVS_WARN_UNUSED_RESULT;
>  
>  /* Cluster management. */
> diff --git a/ovsdb/row.c b/ovsdb/row.c
> index fd50c7e7b..3f0bb8acf 100644
> --- a/ovsdb/row.c
> +++ b/ovsdb/row.c
> @@ -155,6 +155,23 @@ ovsdb_row_clone(const struct ovsdb_row *old)
>      return new;
>  }
>  
> +struct ovsdb_row *
> +ovsdb_row_datum_clone(const struct ovsdb_row *old)
> +{
> +    const struct ovsdb_table *table = old->table;
> +    const struct shash_node *node;
> +    struct ovsdb_row *new;
> +
> +    new = allocate_row(table);
> +    SHASH_FOR_EACH (node, &table->schema->columns) {
> +        const struct ovsdb_column *column = node->data;
> +        ovsdb_datum_clone(&new->fields[column->index],
> +                          &old->fields[column->index]);
> +    }
> +    return new;
> +}
> +
> +
>  /* The caller is responsible for ensuring that 'row' has been removed from its
>   * table and that it is not participating in a transaction. */
>  void
> diff --git a/ovsdb/row.h b/ovsdb/row.h
> index 4d3c17afc..ff91288fe 100644
> --- a/ovsdb/row.h
> +++ b/ovsdb/row.h
> @@ -93,6 +93,7 @@ void ovsdb_weak_ref_destroy(struct ovsdb_weak_ref *);
>  
>  struct ovsdb_row *ovsdb_row_create(const struct ovsdb_table *);
>  struct ovsdb_row *ovsdb_row_clone(const struct ovsdb_row *);
> +struct ovsdb_row *ovsdb_row_datum_clone(const struct ovsdb_row *);
>  void ovsdb_row_destroy(struct ovsdb_row *);
>  
>  uint32_t ovsdb_row_hash_columns(const struct ovsdb_row *,
> diff --git a/ovsdb/storage.c b/ovsdb/storage.c
> index d4984be25..e8f95ce64 100644
> --- a/ovsdb/storage.c
> +++ b/ovsdb/storage.c
> @@ -576,7 +576,7 @@ ovsdb_storage_should_snapshot(struct ovsdb_storage *storage)
>  static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>  ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
>                                 const struct json *schema,
> -                               const struct json *data)
> +                               const struct json *data, uint64_t index)
>  {
>      if (storage->raft) {
>          struct json *entries = json_array_create_empty();
> @@ -587,7 +587,7 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
>              json_array_add(entries, json_clone(data));
>          }
>          struct ovsdb_error *error = raft_store_snapshot(storage->raft,
> -                                                        entries);
> +                                                        entries, index);
>          json_destroy(entries);
>          return error;
>      } else if (storage->log) {
> @@ -611,10 +611,11 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>  ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
>                               const struct json *schema,
> -                             const struct json *data)
> +                             const struct json *data, uint64_t index)
>  {
>      struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
> -                                                               schema, data);
> +                                                               schema, data,
> +                                                               index);
>      bool retry_quickly = error != NULL;
>      schedule_next_snapshot(storage, retry_quickly);
>      return error;
> @@ -638,7 +639,7 @@ ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
>                                            prereq, &result);
>          json_destroy(txn_json);
>      } else if (storage->log) {
> -        w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
> +        w->error = ovsdb_storage_store_snapshot__(storage, schema, data, 0);
>      } else {
>          /* When 'error' and 'command' are both null, it indicates that the
>           * command is complete.  This is fine since this unbacked storage drops
> diff --git a/ovsdb/storage.h b/ovsdb/storage.h
> index ff026b77f..a1fdaa564 100644
> --- a/ovsdb/storage.h
> +++ b/ovsdb/storage.h
> @@ -79,7 +79,8 @@ void ovsdb_write_destroy(struct ovsdb_write *);
>  bool ovsdb_storage_should_snapshot(struct ovsdb_storage *);
>  struct ovsdb_error *ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
>                                                   const struct json *schema,
> -                                                 const struct json *snapshot)
> +                                                 const struct json *snapshot,
> +                                                 uint64_t applied_index)
>      OVS_WARN_UNUSED_RESULT;
>  
>  struct ovsdb_write *ovsdb_storage_write_schema_change(

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
