On 7/1/22 01:34, Ilya Maximets wrote: > Conversion of the database data into JSON object, serialization > and destruction of that object are the most heavy operations > during the database compaction. If these operations are moved > to a separate thread, the main thread can continue processing > database requests in the meantime. > > With this change, the compaction is split in 3 phases: > > 1. Initialization: > - Create a copy of the database. > - Remember current database index. > - Start a separate thread to convert a copy of the database > into serialized JSON object. > > 2. Wait: > - Continue normal operation until compaction thread is done. > - Meanwhile, compaction thread: > * Convert database copy to JSON. > * Serialize resulted JSON. > * Destroy original JSON object. > > 3. Finish: > - Destroy the database copy. > - Take the snapshot created by the thread. > - Write on disk. > > The key for this schema to be fast is the ability to create > a shallow copy of the database. This doesn't take too much > time allowing the thread to do most of work. > > Database copy is created and destroyed only by the main thread, > so there is no need for synchronization. > > Such solution allows to reduce the time main thread is blocked > by compaction by 80-90%. For example, in ovn-heater tests > with 120 node density-heavy scenario, where compaction normally > takes 5-6 seconds at the end of a test, measured compaction > times was all below 1 second with the change applied. Also, > note that these measured times are the sum of phases 1 and 3, > so actual poll intervals are about half a second in this case. > > Only implemented for raft storage for now. The implementation > for standalone databases can be added later by using a file > offset as a database index and copying newly added changes > from the old file to a new one during ovsdb_log_replace(). >
Let's add a TODO item, what do you think? Aside from this I have a few minor comments/questions below. Nothing that can't be fixed at apply time if needed. The rest looks good to me, thanks! Acked-by: Dumitru Ceara <[email protected]> > Reported-at: https://bugzilla.redhat.com/2069108 > Signed-off-by: Ilya Maximets <[email protected]> > --- > ovsdb/ovsdb-server.c | 18 +++++- > ovsdb/ovsdb.c | 143 +++++++++++++++++++++++++++++++++++++++---- > ovsdb/ovsdb.h | 24 ++++++++ > ovsdb/raft.c | 8 ++- > ovsdb/raft.h | 3 +- > ovsdb/row.c | 17 +++++ > ovsdb/row.h | 1 + > ovsdb/storage.c | 11 ++-- > ovsdb/storage.h | 3 +- > 9 files changed, 204 insertions(+), 24 deletions(-) > > diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c > index 5549b4e3a..eae2f6679 100644 > --- a/ovsdb/ovsdb-server.c > +++ b/ovsdb/ovsdb-server.c > @@ -252,7 +252,9 @@ main_loop(struct server_config *config, > remove_db(config, node, > xasprintf("removing database %s because storage " > "disconnected permanently", node->name)); > - } else if (ovsdb_storage_should_snapshot(db->db->storage)) { > + } else if (!ovsdb_snapshot_in_progress(db->db) > + && (ovsdb_storage_should_snapshot(db->db->storage) || > + ovsdb_snapshot_ready(db->db))) { > log_and_free_error(ovsdb_snapshot(db->db, trim_memory)); > } > } > @@ -287,6 +289,7 @@ main_loop(struct server_config *config, > ovsdb_trigger_wait(db->db, time_msec()); > ovsdb_storage_wait(db->db->storage); > ovsdb_storage_read_wait(db->db->storage); > + ovsdb_snapshot_wait(db->db); > } > if (run_process) { > process_wait(run_process); > @@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int > argc, > ? 
!strcmp(node->name, db_name) > : node->name[0] != '_') { > if (db->db) { > + struct ovsdb_error *error = NULL; > + > VLOG_INFO("compacting %s database by user request", > node->name); > > - struct ovsdb_error *error = ovsdb_snapshot(db->db, > - trim_memory); > + error = ovsdb_snapshot(db->db, trim_memory); > + if (!error && ovsdb_snapshot_in_progress(db->db)) { > + while (ovsdb_snapshot_in_progress(db->db)) { > + ovsdb_snapshot_wait(db->db); > + poll_block(); > + } This is not worse than before in the sense that, if snapshots take long, the appctl to "ovsdb-server/compact" will block everything in the main thread, potentially causing raft and other timeouts to fire. Can we improve this now that snapshots happen in a background thread? Maybe worth a TODO for a follow-up patch, what do you think? > + error = ovsdb_snapshot(db->db, trim_memory); > + } > + > if (error) { > char *s = ovsdb_error_to_string(error); > ds_put_format(&reply, "%s\n", s); > diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c > index 91b4a01af..8cbefbe3d 100644 > --- a/ovsdb/ovsdb.c > +++ b/ovsdb/ovsdb.c > @@ -25,9 +25,13 @@ > #include "file.h" > #include "monitor.h" > #include "openvswitch/json.h" > +#include "openvswitch/poll-loop.h" > +#include "ovs-thread.h" > #include "ovsdb-error.h" > #include "ovsdb-parser.h" > #include "ovsdb-types.h" > +#include "row.h" > +#include "seq.h" > #include "simap.h" > #include "storage.h" > #include "table.h" > @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db) > if (db) { > struct shash_node *node; > > + /* Need to wait for compaction thread to finish the work. */ > + while (ovsdb_snapshot_in_progress(db)) { > + ovsdb_snapshot_wait(db); > + poll_block(); > + } > + if (ovsdb_snapshot_ready(db)) { > + struct ovsdb_error *error = ovsdb_snapshot(db, false); > + > + if (error) { > + char *s = ovsdb_error_to_string_free(error); > + VLOG_INFO("%s: %s", db->name, s); > + free(s); > + } > + } > + > /* Close the log.
*/ > ovsdb_storage_close(db->storage); > > @@ -535,20 +554,119 @@ ovsdb_get_table(const struct ovsdb *db, const char > *name) > return shash_find_data(&db->tables, name); > } > > +static struct ovsdb * > +ovsdb_clone_data(const struct ovsdb *db) > +{ > + struct ovsdb *new = ovsdb_create(ovsdb_schema_clone(db->schema), NULL); > + > + struct shash_node *node; > + SHASH_FOR_EACH (node, &db->tables) { > + struct ovsdb_table *table = node->data; > + struct ovsdb_table *new_table = shash_find_data(&new->tables, > + node->name); > + struct ovsdb_row *row, *new_row; > + > + hmap_reserve(&new_table->rows, hmap_count(&table->rows)); > + HMAP_FOR_EACH (row, hmap_node, &table->rows) { > + new_row = ovsdb_row_datum_clone(row); > + hmap_insert(&new_table->rows, &new_row->hmap_node, > + ovsdb_row_hash(new_row)); > + } > + } > + > + return new; > +} > + > +static void * > +compaction_thread(void *aux) > +{ > + struct ovsdb_compaction_state *state = aux; > + uint64_t start_time = time_msec(); > + struct json *data; > + > + VLOG_DBG("%s: Compaction thread started.", state->db->name); > + data = ovsdb_to_txn_json(state->db, "compacting database online"); > + state->data = json_serialized_object_create(data); > + json_destroy(data); > + > + state->thread_time = time_msec() - start_time; > + > + VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.", > + state->db->name, state->thread_time); > + seq_change(state->done); > + return NULL; > +} > + > +void > +ovsdb_snapshot_wait(struct ovsdb *db) > +{ > + if (db->snap_state) { > + seq_wait(db->snap_state->done, db->snap_state->seqno); > + } > +} > + > +bool > +ovsdb_snapshot_in_progress(struct ovsdb *db) > +{ > + return db->snap_state && > + seq_read(db->snap_state->done) == db->snap_state->seqno; > +} > + > +bool > +ovsdb_snapshot_ready(struct ovsdb *db) > +{ > + return db->snap_state && > + seq_read(db->snap_state->done) != db->snap_state->seqno; > +} > + > struct ovsdb_error * OVS_WARN_UNUSED_RESULT > ovsdb_snapshot(struct ovsdb 
*db, bool trim_memory OVS_UNUSED) > { > - if (!db->storage) { > + if (!db->storage || ovsdb_snapshot_in_progress(db)) { > return NULL; > } > > + uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage); > uint64_t elapsed, start_time = time_msec(); > - struct json *schema = ovsdb_schema_to_json(db->schema); > - struct json *data = ovsdb_to_txn_json(db, "compacting database online"); > - struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage, > - schema, data); > - json_destroy(schema); > - json_destroy(data); > + struct ovsdb_compaction_state *state; > + > + if (!applied_index) { > + /* Parallel compaction is not supported for standalone databases. */ Nit: It might be a bit too much, but I think I'd add an API like ovsdb_storage_supports_parallel_compaction() instead of relying on checking the applied_index value. It's a detail of the parallel compaction implementation that it needs the applied_index to work. If you go this way, something like the following (maybe with a better name) should do: bool ovsdb_storage_supports_parallel_compaction(const struct ovsdb_storage *storage) { return storage->raft != NULL; } > + state = xzalloc(sizeof *state); > + state->data = ovsdb_to_txn_json(db, "compacting database online"); > + state->schema = ovsdb_schema_to_json(db->schema); > + } else if (ovsdb_snapshot_ready(db)) { > + xpthread_join(db->snap_state->thread, NULL); > + > + state = db->snap_state; > + db->snap_state = NULL; > + > + ovsdb_destroy(state->db); > + seq_destroy(state->done); > + } else { > + /* Creating a thread.
*/ > + ovs_assert(!db->snap_state); > + state = xzalloc(sizeof *state); > + > + state->db = ovsdb_clone_data(db); > + state->schema = ovsdb_schema_to_json(db->schema); > + state->applied_index = applied_index; > + state->done = seq_create(); > + state->seqno = seq_read(state->done); > + state->thread = ovs_thread_create("compaction", > + compaction_thread, state); > + state->init_time = time_msec() - start_time; > + > + db->snap_state = state; > + return NULL; > + } > + > + struct ovsdb_error *error; > + > + error = ovsdb_storage_store_snapshot(db->storage, state->schema, > + state->data, state->applied_index); > + json_destroy(state->schema); > + json_destroy(state->data); > > #if HAVE_DECL_MALLOC_TRIM > if (!error && trim_memory) { > @@ -557,10 +675,13 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory > OVS_UNUSED) > #endif > > elapsed = time_msec() - start_time; > - if (elapsed > 1000) { > - VLOG_INFO("%s: Database compaction took %"PRIu64"ms", > - db->name, elapsed); > - } > + VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG, > + "%s: Database compaction took %"PRIu64"ms " > + "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)", > + db->name, elapsed + state->init_time, > + state->init_time, elapsed, state->thread_time); > + > + free(state); > return error; > } > > diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h > index ec2d235ec..2f77821e0 100644 > --- a/ovsdb/ovsdb.h > +++ b/ovsdb/ovsdb.h > @@ -72,6 +72,24 @@ struct ovsdb_txn_history_node { > struct ovsdb_txn *txn; > }; > > +struct ovsdb_compaction_state { > + pthread_t thread; /* Thread handle. */ > + > + struct ovsdb *db; /* Copy of a database data to compact. */ > + > + struct json *data; /* 'db' as a serialized json. */ > + struct json *schema; /* 'db' schema json. */ > + uint64_t applied_index; /* Last applied index reported by the storage > + * at the moment of a database copy. */ > + > + /* Completion signaling. 
*/ > + struct seq *done; > + uint64_t seqno; > + > + uint64_t init_time; /* Time spent by the main thread preparing. */ > + uint64_t thread_time; /* Time spent for compaction by the thread. */ > +}; > + > struct ovsdb { > char *name; > struct ovsdb_schema *schema; > @@ -101,6 +119,9 @@ struct ovsdb { > struct ovs_list txn_forward_new; > /* Hash map for transactions that are already sent and waits for reply. > */ > struct hmap txn_forward_sent; > + > + /* Database compaction. */ > + struct ovsdb_compaction_state *snap_state; > }; > > struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *); > @@ -124,6 +145,9 @@ struct json *ovsdb_execute(struct ovsdb *, const struct > ovsdb_session *, > > struct ovsdb_error *ovsdb_snapshot(struct ovsdb *, bool trim_memory) > OVS_WARN_UNUSED_RESULT; > +void ovsdb_snapshot_wait(struct ovsdb *); > +bool ovsdb_snapshot_in_progress(struct ovsdb *); > +bool ovsdb_snapshot_ready(struct ovsdb *); > > void ovsdb_replace(struct ovsdb *dst, struct ovsdb *src); > > diff --git a/ovsdb/raft.c b/ovsdb/raft.c > index 856d083f2..b2c21e70f 100644 > --- a/ovsdb/raft.c > +++ b/ovsdb/raft.c > @@ -4295,7 +4295,8 @@ raft_notify_snapshot_recommended(struct raft *raft) > * only valuable to call it if raft_get_log_length() is significant and > * especially if raft_grew_lots() returns true. */ > struct ovsdb_error * OVS_WARN_UNUSED_RESULT > -raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data) > +raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data, > + uint64_t applied_index) > { > if (raft->joining) { > return ovsdb_error(NULL, > @@ -4311,11 +4312,12 @@ raft_store_snapshot(struct raft *raft, const struct > json *new_snapshot_data) > "cannot store a snapshot following failure"); > } > > - if (raft->last_applied < raft->log_start) { > + uint64_t new_log_start = applied_index ? applied_index + 1 > + : raft->last_applied + 1; I tried to figure it out but I'm not sure. 
Is the following scenario possible? a. snapshot operation starts, we stored applied_index in db->snap_state b. a (broken?) server sends an append request: raft_handle_append_request(): raft_handle_append_entries(): raft_truncate(raft, log_index) Can 'log_index < raft->commit_index' ever be true? Furthermore, can 'log_index < raft->last_applied' ever be true? c. snapshot operation finishes: ovsdb_storage_store_snapshot(): raft_store_snapshot(.., db->snap_state->applied_index): Would we potentially be skipping entries that are not in the snapshot? Anyhow, it's not related to this patch, but if that's the case, maybe we should add a check in raft_handle_append_entries() to only raft_truncate() if log_index > raft->commit_index. > + if (new_log_start <= raft->log_start) { > return ovsdb_error(NULL, "not storing a duplicate snapshot"); > } > > - uint64_t new_log_start = raft->last_applied + 1; > struct raft_entry new_snapshot = { > .term = raft_get_term(raft, new_log_start - 1), > .eid = *raft_get_eid(raft, new_log_start - 1), > diff --git a/ovsdb/raft.h b/ovsdb/raft.h > index 599bc0ae8..403ed3dd7 100644 > --- a/ovsdb/raft.h > +++ b/ovsdb/raft.h > @@ -180,7 +180,8 @@ uint64_t raft_get_log_length(const struct raft *); > bool raft_may_snapshot(const struct raft *); > void raft_notify_snapshot_recommended(struct raft *); > struct ovsdb_error *raft_store_snapshot(struct raft *, > - const struct json *new_snapshot) > + const struct json *new_snapshot, > + uint64_t applied_index) > OVS_WARN_UNUSED_RESULT; > > /* Cluster management.
*/ > diff --git a/ovsdb/row.c b/ovsdb/row.c > index fd50c7e7b..3f0bb8acf 100644 > --- a/ovsdb/row.c > +++ b/ovsdb/row.c > @@ -155,6 +155,23 @@ ovsdb_row_clone(const struct ovsdb_row *old) > return new; > } > > +struct ovsdb_row * > +ovsdb_row_datum_clone(const struct ovsdb_row *old) > +{ > + const struct ovsdb_table *table = old->table; > + const struct shash_node *node; > + struct ovsdb_row *new; > + > + new = allocate_row(table); > + SHASH_FOR_EACH (node, &table->schema->columns) { > + const struct ovsdb_column *column = node->data; > + ovsdb_datum_clone(&new->fields[column->index], > + &old->fields[column->index]); > + } > + return new; > +} > + > + > /* The caller is responsible for ensuring that 'row' has been removed from > its > * table and that it is not participating in a transaction. */ > void > diff --git a/ovsdb/row.h b/ovsdb/row.h > index 4d3c17afc..ff91288fe 100644 > --- a/ovsdb/row.h > +++ b/ovsdb/row.h > @@ -93,6 +93,7 @@ void ovsdb_weak_ref_destroy(struct ovsdb_weak_ref *); > > struct ovsdb_row *ovsdb_row_create(const struct ovsdb_table *); > struct ovsdb_row *ovsdb_row_clone(const struct ovsdb_row *); > +struct ovsdb_row *ovsdb_row_datum_clone(const struct ovsdb_row *); > void ovsdb_row_destroy(struct ovsdb_row *); > > uint32_t ovsdb_row_hash_columns(const struct ovsdb_row *, > diff --git a/ovsdb/storage.c b/ovsdb/storage.c > index d4984be25..e8f95ce64 100644 > --- a/ovsdb/storage.c > +++ b/ovsdb/storage.c > @@ -576,7 +576,7 @@ ovsdb_storage_should_snapshot(struct ovsdb_storage > *storage) > static struct ovsdb_error * OVS_WARN_UNUSED_RESULT > ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage, > const struct json *schema, > - const struct json *data) > + const struct json *data, uint64_t index) > { > if (storage->raft) { > struct json *entries = json_array_create_empty(); > @@ -587,7 +587,7 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage > *storage, > json_array_add(entries, json_clone(data)); > } > struct ovsdb_error *error = 
raft_store_snapshot(storage->raft, > - entries); > + entries, index); > json_destroy(entries); > return error; > } else if (storage->log) { > @@ -611,10 +611,11 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage > *storage, > struct ovsdb_error * OVS_WARN_UNUSED_RESULT > ovsdb_storage_store_snapshot(struct ovsdb_storage *storage, > const struct json *schema, > - const struct json *data) > + const struct json *data, uint64_t index) > { > struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage, > - schema, data); > + schema, data, > + index); > bool retry_quickly = error != NULL; > schedule_next_snapshot(storage, retry_quickly); > return error; > @@ -638,7 +639,7 @@ ovsdb_storage_write_schema_change(struct ovsdb_storage > *storage, > prereq, &result); > json_destroy(txn_json); > } else if (storage->log) { > - w->error = ovsdb_storage_store_snapshot__(storage, schema, data); > + w->error = ovsdb_storage_store_snapshot__(storage, schema, data, 0); > } else { > /* When 'error' and 'command' are both null, it indicates that the > * command is complete. This is fine since this unbacked storage > drops > diff --git a/ovsdb/storage.h b/ovsdb/storage.h > index ff026b77f..a1fdaa564 100644 > --- a/ovsdb/storage.h > +++ b/ovsdb/storage.h > @@ -79,7 +79,8 @@ void ovsdb_write_destroy(struct ovsdb_write *); > bool ovsdb_storage_should_snapshot(struct ovsdb_storage *); > struct ovsdb_error *ovsdb_storage_store_snapshot(struct ovsdb_storage > *storage, > const struct json *schema, > - const struct json *snapshot) > + const struct json *snapshot, > + uint64_t applied_index) > OVS_WARN_UNUSED_RESULT; > > struct ovsdb_write *ovsdb_storage_write_schema_change( _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
