On 7/11/22 17:46, Dumitru Ceara wrote:
> On 7/1/22 01:34, Ilya Maximets wrote:
>> Conversion of the database data into a JSON object, serialization
>> and destruction of that object are the heaviest operations
>> during the database compaction. If these operations are moved
>> to a separate thread, the main thread can continue processing
>> database requests in the meantime.
>>
>> With this change, the compaction is split into 3 phases:
>>
>> 1. Initialization:
>>    - Create a copy of the database.
>>    - Remember the current database index.
>>    - Start a separate thread to convert a copy of the database
>>      into a serialized JSON object.
>>
>> 2. Wait:
>>    - Continue normal operation until the compaction thread is done.
>>    - Meanwhile, the compaction thread:
>>      * Converts the database copy to JSON.
>>      * Serializes the resulting JSON.
>>      * Destroys the original JSON object.
>>
>> 3. Finish:
>>    - Destroy the database copy.
>>    - Take the snapshot created by the thread.
>>    - Write it to disk.
>>
>> The key for this scheme to be fast is the ability to create
>> a shallow copy of the database. This doesn't take much
>> time, allowing the thread to do most of the work.
>>
>> The database copy is created and destroyed only by the main thread,
>> so there is no need for synchronization.
>>
>> Such a solution reduces the time the main thread is blocked
>> by compaction by 80-90%. For example, in ovn-heater tests
>> with the 120 node density-heavy scenario, where compaction normally
>> takes 5-6 seconds at the end of a test, measured compaction
>> times were all below 1 second with the change applied. Also,
>> note that these measured times are the sum of phases 1 and 3,
>> so actual poll intervals are about half a second in this case.
>>
>> Only implemented for raft storage for now. The implementation
>> for standalone databases can be added later by using a file
>> offset as a database index and copying newly added changes
>> from the old file to a new one during ovsdb_log_replace().
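The three phases above can be modeled outside OVS with plain POSIX threads. This is a minimal sketch, assuming a toy "database" that is just an array of integers and a "snapshot" that is its comma-separated text; every `toy_*` name is hypothetical and not part of OVS. It shows the ownership rule from the commit message: only the main thread creates and destroys the copy, while the worker only reads it and publishes its result.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical toy model; these are not OVS types. */
struct toy_db {
    int *values;
    size_t n;
};

struct toy_compaction {
    pthread_t thread;
    struct toy_db copy;       /* Private copy, only read by the worker. */
    uint64_t applied_index;   /* Index remembered when the copy was made. */
    char *serialized;         /* Result produced by the worker. */
    pthread_mutex_t mutex;    /* Protects 'serialized' and 'done'. */
    bool done;
};

/* Phase 2, worker side: serialize the copy into text. */
static void *
toy_compaction_thread(void *aux)
{
    struct toy_compaction *c = aux;
    size_t cap = c->copy.n * 12 + 1;  /* At most 11 chars per int plus ','. */
    char *buf = malloc(cap);
    size_t len = 0;

    buf[0] = '\0';
    for (size_t i = 0; i < c->copy.n; i++) {
        len += (size_t) snprintf(buf + len, cap - len, i ? ",%d" : "%d",
                                 c->copy.values[i]);
    }

    pthread_mutex_lock(&c->mutex);
    c->serialized = buf;
    c->done = true;
    pthread_mutex_unlock(&c->mutex);
    return NULL;
}

/* Phase 1, main thread: copy the data, remember the index, start worker. */
static struct toy_compaction *
toy_compaction_start(const struct toy_db *db, uint64_t applied_index)
{
    struct toy_compaction *c = calloc(1, sizeof *c);

    c->copy.n = db->n;
    c->copy.values = malloc((db->n ? db->n : 1) * sizeof *db->values);
    if (db->n) {
        memcpy(c->copy.values, db->values, db->n * sizeof *db->values);
    }
    c->applied_index = applied_index;
    pthread_mutex_init(&c->mutex, NULL);
    pthread_create(&c->thread, NULL, toy_compaction_thread, c);
    return c;
}

/* Phase 2, main thread: non-blocking completion check. */
static bool
toy_compaction_ready(struct toy_compaction *c)
{
    pthread_mutex_lock(&c->mutex);
    bool done = c->done;
    pthread_mutex_unlock(&c->mutex);
    return done;
}

/* Phase 3, main thread: join, destroy the copy, hand back the snapshot. */
static char *
toy_compaction_finish(struct toy_compaction *c, uint64_t *applied_index)
{
    pthread_join(c->thread, NULL);
    char *snapshot = c->serialized;
    *applied_index = c->applied_index;

    free(c->copy.values);
    pthread_mutex_destroy(&c->mutex);
    free(c);
    return snapshot;
}
```

Unlike the patch, this copy is a full `memcpy()`, whereas the commit message attributes the speedup to a cheap shallow copy. OVS also signals completion through a `seq` and the poll loop rather than a mutex-protected flag, and the sketch omits allocation and thread-creation error handling.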
>>
>
> Let's add a TODO item, what do you think?

Sure. Though the TODO file needs some rework first. I'll send a
separate patch to update the TODO file with some clean up and all the
new items I can remember.

>
> Aside from this I have a few minor comments/questions below.
> Nothing that can't be fixed at apply time if needed. The rest
> looks good to me, thanks!
>
> Acked-by: Dumitru Ceara <[email protected]>

Thanks! I didn't change anything in this patch for now. :)
See replies below.

I made the changes we discussed for the first patch. With that, I
applied the set to master.

Best regards, Ilya Maximets.

>
>> Reported-at: https://bugzilla.redhat.com/2069108
>> Signed-off-by: Ilya Maximets <[email protected]>
>> ---
>>  ovsdb/ovsdb-server.c |  18 +++++-
>>  ovsdb/ovsdb.c        | 143 +++++++++++++++++++++++++++++++++++++++----
>>  ovsdb/ovsdb.h        |  24 ++++++++
>>  ovsdb/raft.c         |   8 ++-
>>  ovsdb/raft.h         |   3 +-
>>  ovsdb/row.c          |  17 +++++
>>  ovsdb/row.h          |   1 +
>>  ovsdb/storage.c      |  11 ++--
>>  ovsdb/storage.h      |   3 +-
>>  9 files changed, 204 insertions(+), 24 deletions(-)
>>
>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>> index 5549b4e3a..eae2f6679 100644
>> --- a/ovsdb/ovsdb-server.c
>> +++ b/ovsdb/ovsdb-server.c
>> @@ -252,7 +252,9 @@ main_loop(struct server_config *config,
>>                  remove_db(config, node,
>>                            xasprintf("removing database %s because storage "
>>                                      "disconnected permanently",
>>                                      node->name));
>> -            } else if (ovsdb_storage_should_snapshot(db->db->storage)) {
>> +            } else if (!ovsdb_snapshot_in_progress(db->db)
>> +                       && (ovsdb_storage_should_snapshot(db->db->storage) ||
>> +                           ovsdb_snapshot_ready(db->db))) {
>>                  log_and_free_error(ovsdb_snapshot(db->db, trim_memory));
>>              }
>>          }
>> @@ -287,6 +289,7 @@ main_loop(struct server_config *config,
>>              ovsdb_trigger_wait(db->db, time_msec());
>>              ovsdb_storage_wait(db->db->storage);
>>              ovsdb_storage_read_wait(db->db->storage);
>> +            ovsdb_snapshot_wait(db->db);
>>          }
>>          if (run_process) {
>>              process_wait(run_process);
>> @@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int argc,
>>              ? !strcmp(node->name, db_name)
>>              : node->name[0] != '_') {
>>              if (db->db) {
>> +                struct ovsdb_error *error = NULL;
>> +
>>                  VLOG_INFO("compacting %s database by user request",
>>                            node->name);
>>
>> -                struct ovsdb_error *error = ovsdb_snapshot(db->db,
>> -                                                           trim_memory);
>> +                error = ovsdb_snapshot(db->db, trim_memory);
>> +                if (!error && ovsdb_snapshot_in_progress(db->db)) {
>> +                    while (ovsdb_snapshot_in_progress(db->db)) {
>> +                        ovsdb_snapshot_wait(db->db);
>> +                        poll_block();
>> +                    }
>
> This is not worse than before in the sense that, if snapshots take long,
> the appctl to "ovsdb-server/compact" will block everything in the main
> thread, potentially causing raft and other timeouts to fire.

I thought about this while implementing. There is a way to do that,
but it wasn't very clean, so I didn't want to spend a lot of time on it.

>
> Can we improve this now that snapshots happen in a background thread?
> Maybe worth a TODO for a follow up patch, what do you think?

Sure, will add a TODO item. It's a good thing to implement.

>
>> +                    error = ovsdb_snapshot(db->db, trim_memory);
>> +                }
>> +
>>                  if (error) {
>>                      char *s = ovsdb_error_to_string(error);
>>                      ds_put_format(&reply, "%s\n", s);
>> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
>> index 91b4a01af..8cbefbe3d 100644
>> --- a/ovsdb/ovsdb.c
>> +++ b/ovsdb/ovsdb.c
>> @@ -25,9 +25,13 @@
>>  #include "file.h"
>>  #include "monitor.h"
>>  #include "openvswitch/json.h"
>> +#include "openvswitch/poll-loop.h"
>> +#include "ovs-thread.h"
>>  #include "ovsdb-error.h"
>>  #include "ovsdb-parser.h"
>>  #include "ovsdb-types.h"
>> +#include "row.h"
>> +#include "seq.h"
>>  #include "simap.h"
>>  #include "storage.h"
>>  #include "table.h"
>> @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db)
>>      if (db) {
>>          struct shash_node *node;
>>
>> +        /* Need to wait for compaction thread to finish the work. */
>> +        while (ovsdb_snapshot_in_progress(db)) {
>> +            ovsdb_snapshot_wait(db);
>> +            poll_block();
>> +        }
>> +        if (ovsdb_snapshot_ready(db)) {
>> +            struct ovsdb_error *error = ovsdb_snapshot(db, false);
>> +
>> +            if (error) {
>> +                char *s = ovsdb_error_to_string_free(error);
>> +                VLOG_INFO("%s: %s", db->name, s);
>> +                free(s);
>> +            }
>> +        }
>> +
>>          /* Close the log. */
>>          ovsdb_storage_close(db->storage);
>>
>> @@ -535,20 +554,119 @@ ovsdb_get_table(const struct ovsdb *db, const char *name)
>>      return shash_find_data(&db->tables, name);
>>  }
>>
>> +static struct ovsdb *
>> +ovsdb_clone_data(const struct ovsdb *db)
>> +{
>> +    struct ovsdb *new = ovsdb_create(ovsdb_schema_clone(db->schema), NULL);
>> +
>> +    struct shash_node *node;
>> +    SHASH_FOR_EACH (node, &db->tables) {
>> +        struct ovsdb_table *table = node->data;
>> +        struct ovsdb_table *new_table = shash_find_data(&new->tables,
>> +                                                        node->name);
>> +        struct ovsdb_row *row, *new_row;
>> +
>> +        hmap_reserve(&new_table->rows, hmap_count(&table->rows));
>> +        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
>> +            new_row = ovsdb_row_datum_clone(row);
>> +            hmap_insert(&new_table->rows, &new_row->hmap_node,
>> +                        ovsdb_row_hash(new_row));
>> +        }
>> +    }
>> +
>> +    return new;
>> +}
>> +
>> +static void *
>> +compaction_thread(void *aux)
>> +{
>> +    struct ovsdb_compaction_state *state = aux;
>> +    uint64_t start_time = time_msec();
>> +    struct json *data;
>> +
>> +    VLOG_DBG("%s: Compaction thread started.", state->db->name);
>> +    data = ovsdb_to_txn_json(state->db, "compacting database online");
>> +    state->data = json_serialized_object_create(data);
>> +    json_destroy(data);
>> +
>> +    state->thread_time = time_msec() - start_time;
>> +
>> +    VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.",
>> +             state->db->name, state->thread_time);
>> +    seq_change(state->done);
>> +    return NULL;
>> +}
>> +
>> +void
>> +ovsdb_snapshot_wait(struct ovsdb *db)
>> +{
>> +    if (db->snap_state) {
>> +        seq_wait(db->snap_state->done, db->snap_state->seqno);
>> +    }
>> +}
>> +
>> +bool
>> +ovsdb_snapshot_in_progress(struct ovsdb *db)
>> +{
>> +    return db->snap_state &&
>> +           seq_read(db->snap_state->done) == db->snap_state->seqno;
>> +}
>> +
>> +bool
>> +ovsdb_snapshot_ready(struct ovsdb *db)
>> +{
>> +    return db->snap_state &&
>> +           seq_read(db->snap_state->done) != db->snap_state->seqno;
>> +}
>> +
>>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>>  ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
>>  {
>> -    if (!db->storage) {
>> +    if (!db->storage || ovsdb_snapshot_in_progress(db)) {
>>          return NULL;
>>      }
>>
>> +    uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage);
>>      uint64_t elapsed, start_time = time_msec();
>> -    struct json *schema = ovsdb_schema_to_json(db->schema);
>> -    struct json *data = ovsdb_to_txn_json(db, "compacting database online");
>> -    struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage,
>> -                                                             schema, data);
>> -    json_destroy(schema);
>> -    json_destroy(data);
>> +    struct ovsdb_compaction_state *state;
>> +
>> +    if (!applied_index) {
>> +        /* Parallel compaction is not supported for standalone databases. */
>
>
> Nit: It might be a bit too much but I think I'd add an API like
> ovsdb_storage_supports_parallel_compaction() instead of relying on
> checking the applied_index value. It's a detail of the parallel
> compaction implementation that it needs the applied_index to work.
>
> If you go this way something like the following (maybe with a better
> name) should do:
>
> bool
> ovsdb_storage_supports_parallel_compaction(const struct ovsdb_storage *storage)
> {
>     return storage->raft != NULL;
> }

Yeah. You're right that it's not a very clean solution. However,
transaction history already depends on this call with similar
semantics, so I think it's OK to keep it as-is for now. We probably
need a better storage abstraction than we have now to make it clean.
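The completion check in `ovsdb_snapshot_in_progress()` and `ovsdb_snapshot_ready()` compares the current value of `done` with the `seqno` sampled before the thread started; the worker's `seq_change()` moves the value away from `seqno`. The sketch below models that with a C11 atomic counter standing in for the OVS `struct seq` (an assumption for illustration; all `toy_*` names are hypothetical), together with the `main_loop()` condition from the ovsdb-server.c hunk.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-in for 'struct seq': a counter that only ever increases. */
struct toy_seq {
    _Atomic uint64_t value;
};

static uint64_t
toy_seq_read(struct toy_seq *seq)
{
    return atomic_load(&seq->value);
}

static void
toy_seq_change(struct toy_seq *seq)
{
    atomic_fetch_add(&seq->value, 1);
}

struct toy_snap_state {
    struct toy_seq done;  /* Bumped once by the worker when it finishes. */
    uint64_t seqno;       /* Value of 'done' sampled before the worker ran. */
};

/* Worker still running: the counter has not moved since it was sampled. */
static bool
toy_snapshot_in_progress(struct toy_snap_state *st)
{
    return st && toy_seq_read(&st->done) == st->seqno;
}

/* Worker finished: the counter moved, so the result can be collected. */
static bool
toy_snapshot_ready(struct toy_snap_state *st)
{
    return st && toy_seq_read(&st->done) != st->seqno;
}

/* Mirrors the main_loop() condition: never re-enter while the worker runs,
 * and call the snapshot code either when storage asks for a snapshot or
 * when a finished result is waiting to be written. */
static bool
toy_should_call_snapshot(struct toy_snap_state *st, bool storage_wants_it)
{
    return !toy_snapshot_in_progress(st)
           && (storage_wants_it || toy_snapshot_ready(st));
}
```

With no state both predicates are false, and with state they are mutually exclusive, which is what lets the main loop use them as a small state machine. The atomic counter has no wake-up mechanism; in OVS, `seq_wait()` additionally lets the poll loop sleep until the worker calls `seq_change()`.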
>
>> +        state = xzalloc(sizeof *state);
>> +        state->data = ovsdb_to_txn_json(db, "compacting database online");
>> +        state->schema = ovsdb_schema_to_json(db->schema);
>> +    } else if (ovsdb_snapshot_ready(db)) {
>> +        xpthread_join(db->snap_state->thread, NULL);
>> +
>> +        state = db->snap_state;
>> +        db->snap_state = NULL;
>> +
>> +        ovsdb_destroy(state->db);
>> +        seq_destroy(state->done);
>> +    } else {
>> +        /* Creating a thread. */
>> +        ovs_assert(!db->snap_state);
>> +        state = xzalloc(sizeof *state);
>> +
>> +        state->db = ovsdb_clone_data(db);
>> +        state->schema = ovsdb_schema_to_json(db->schema);
>> +        state->applied_index = applied_index;
>> +        state->done = seq_create();
>> +        state->seqno = seq_read(state->done);
>> +        state->thread = ovs_thread_create("compaction",
>> +                                          compaction_thread, state);
>> +        state->init_time = time_msec() - start_time;
>> +
>> +        db->snap_state = state;
>> +        return NULL;
>> +    }
>> +
>> +    struct ovsdb_error *error;
>> +
>> +    error = ovsdb_storage_store_snapshot(db->storage, state->schema,
>> +                                         state->data, state->applied_index);
>> +    json_destroy(state->schema);
>> +    json_destroy(state->data);
>>
>>  #if HAVE_DECL_MALLOC_TRIM
>>      if (!error && trim_memory) {
>> @@ -557,10 +675,13 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
>>  #endif
>>
>>      elapsed = time_msec() - start_time;
>> -    if (elapsed > 1000) {
>> -        VLOG_INFO("%s: Database compaction took %"PRIu64"ms",
>> -                  db->name, elapsed);
>> -    }
>> +    VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG,
>> +         "%s: Database compaction took %"PRIu64"ms "
>> +         "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)",
>> +         db->name, elapsed + state->init_time,
>> +         state->init_time, elapsed, state->thread_time);
>> +
>> +    free(state);
>>      return error;
>>  }
>>
>> diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
>> index ec2d235ec..2f77821e0 100644
>> --- a/ovsdb/ovsdb.h
>> +++ b/ovsdb/ovsdb.h
>> @@ -72,6 +72,24 @@ struct ovsdb_txn_history_node {
>>      struct ovsdb_txn *txn;
>>  };
>>
>> +struct ovsdb_compaction_state {
>> +    pthread_t thread;          /* Thread handle. */
>> +
>> +    struct ovsdb *db;          /* Copy of a database data to compact. */
>> +
>> +    struct json *data;         /* 'db' as a serialized json. */
>> +    struct json *schema;       /* 'db' schema json. */
>> +    uint64_t applied_index;    /* Last applied index reported by the storage
>> +                                * at the moment of a database copy. */
>> +
>> +    /* Completion signaling. */
>> +    struct seq *done;
>> +    uint64_t seqno;
>> +
>> +    uint64_t init_time;        /* Time spent by the main thread preparing. */
>> +    uint64_t thread_time;      /* Time spent for compaction by the thread. */
>> +};
>> +
>>  struct ovsdb {
>>      char *name;
>>      struct ovsdb_schema *schema;
>> @@ -101,6 +119,9 @@ struct ovsdb {
>>      struct ovs_list txn_forward_new;
>>      /* Hash map for transactions that are already sent and waits for reply. */
>>      struct hmap txn_forward_sent;
>> +
>> +    /* Database compaction. */
>> +    struct ovsdb_compaction_state *snap_state;
>>  };
>>
>>  struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
>> @@ -124,6 +145,9 @@ struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
>>
>>  struct ovsdb_error *ovsdb_snapshot(struct ovsdb *, bool trim_memory)
>>      OVS_WARN_UNUSED_RESULT;
>> +void ovsdb_snapshot_wait(struct ovsdb *);
>> +bool ovsdb_snapshot_in_progress(struct ovsdb *);
>> +bool ovsdb_snapshot_ready(struct ovsdb *);
>>
>>  void ovsdb_replace(struct ovsdb *dst, struct ovsdb *src);
>>
>> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
>> index 856d083f2..b2c21e70f 100644
>> --- a/ovsdb/raft.c
>> +++ b/ovsdb/raft.c
>> @@ -4295,7 +4295,8 @@ raft_notify_snapshot_recommended(struct raft *raft)
>>   * only valuable to call it if raft_get_log_length() is significant and
>>   * especially if raft_grew_lots() returns true. */
>>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>> -raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
>> +raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data,
>> +                    uint64_t applied_index)
>>  {
>>      if (raft->joining) {
>>          return ovsdb_error(NULL,
>> @@ -4311,11 +4312,12 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
>>                             "cannot store a snapshot following failure");
>>      }
>>
>> -    if (raft->last_applied < raft->log_start) {
>> +    uint64_t new_log_start = applied_index ? applied_index + 1
>> +                                           : raft->last_applied + 1;
>
> I tried to figure it out but I'm not sure. Is the following scenario
> possible?
>
> a. snapshot operation starts, we stored applied_index in db->snap_state
> b. a (broken?) server sends an append request:
>    raft_handle_append_request():
>      raft_handle_append_entries():
>        raft_truncate(raft, log_index)
>
>    Can 'log_index < raft->commit_index' ever be true?
>
>    Furthermore, can 'log_index < raft->last_applied' ever be true?
>
> c. snapshot operation finishes:
>    ovsdb_storage_store_snapshot():
>      raft_store_snapshot(.., db->snap_state->applied_index):
>
>    Would we be potentially skipping entries that are not in the snapshot?
>
> Anyhow, it's not related to this patch, but if that's the case maybe
> we should add a check in raft_handle_append_entries() to only
> raft_truncate() if log_index > raft->commit_index.

I think the database currently depends heavily on the fact that
committed entries are never rolled back. It would be a very serious
problem if that could happen in practice. And, yes, if it can happen,
it can happen even without compaction involved, so it's not a concern
for the current patch set. I need to think more about whether this
condition is actually possible.

>
>> +    if (new_log_start <= raft->log_start) {
>>          return ovsdb_error(NULL, "not storing a duplicate snapshot");
>>      }
>>
>> -    uint64_t new_log_start = raft->last_applied + 1;
>>      struct raft_entry new_snapshot = {
>>          .term = raft_get_term(raft, new_log_start - 1),
>>          .eid = *raft_get_eid(raft, new_log_start - 1),
>> diff --git a/ovsdb/raft.h b/ovsdb/raft.h
>> index 599bc0ae8..403ed3dd7 100644
>> --- a/ovsdb/raft.h
>> +++ b/ovsdb/raft.h
>> @@ -180,7 +180,8 @@ uint64_t raft_get_log_length(const struct raft *);
>>  bool raft_may_snapshot(const struct raft *);
>>  void raft_notify_snapshot_recommended(struct raft *);
>>  struct ovsdb_error *raft_store_snapshot(struct raft *,
>> -                                        const struct json *new_snapshot)
>> +                                        const struct json *new_snapshot,
>> +                                        uint64_t applied_index)
>>      OVS_WARN_UNUSED_RESULT;
>>
>>  /* Cluster management. */
>> diff --git a/ovsdb/row.c b/ovsdb/row.c
>> index fd50c7e7b..3f0bb8acf 100644
>> --- a/ovsdb/row.c
>> +++ b/ovsdb/row.c
>> @@ -155,6 +155,23 @@ ovsdb_row_clone(const struct ovsdb_row *old)
>>      return new;
>>  }
>>
>> +struct ovsdb_row *
>> +ovsdb_row_datum_clone(const struct ovsdb_row *old)
>> +{
>> +    const struct ovsdb_table *table = old->table;
>> +    const struct shash_node *node;
>> +    struct ovsdb_row *new;
>> +
>> +    new = allocate_row(table);
>> +    SHASH_FOR_EACH (node, &table->schema->columns) {
>> +        const struct ovsdb_column *column = node->data;
>> +        ovsdb_datum_clone(&new->fields[column->index],
>> +                          &old->fields[column->index]);
>> +    }
>> +    return new;
>> +}
>> +
>> +
>>  /* The caller is responsible for ensuring that 'row' has been removed from its
>>   * table and that it is not participating in a transaction. */
>>  void
>> diff --git a/ovsdb/row.h b/ovsdb/row.h
>> index 4d3c17afc..ff91288fe 100644
>> --- a/ovsdb/row.h
>> +++ b/ovsdb/row.h
>> @@ -93,6 +93,7 @@ void ovsdb_weak_ref_destroy(struct ovsdb_weak_ref *);
>>
>>  struct ovsdb_row *ovsdb_row_create(const struct ovsdb_table *);
>>  struct ovsdb_row *ovsdb_row_clone(const struct ovsdb_row *);
>> +struct ovsdb_row *ovsdb_row_datum_clone(const struct ovsdb_row *);
>>  void ovsdb_row_destroy(struct ovsdb_row *);
>>
>>  uint32_t ovsdb_row_hash_columns(const struct ovsdb_row *,
>> diff --git a/ovsdb/storage.c b/ovsdb/storage.c
>> index d4984be25..e8f95ce64 100644
>> --- a/ovsdb/storage.c
>> +++ b/ovsdb/storage.c
>> @@ -576,7 +576,7 @@ ovsdb_storage_should_snapshot(struct ovsdb_storage *storage)
>>  static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>>  ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
>>                                 const struct json *schema,
>> -                               const struct json *data)
>> +                               const struct json *data, uint64_t index)
>>  {
>>      if (storage->raft) {
>>          struct json *entries = json_array_create_empty();
>> @@ -587,7 +587,7 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
>>              json_array_add(entries, json_clone(data));
>>          }
>>          struct ovsdb_error *error = raft_store_snapshot(storage->raft,
>> -                                                        entries);
>> +                                                        entries, index);
>>          json_destroy(entries);
>>          return error;
>>      } else if (storage->log) {
>> @@ -611,10 +611,11 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
>>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>>  ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
>>                               const struct json *schema,
>> -                             const struct json *data)
>> +                             const struct json *data, uint64_t index)
>>  {
>>      struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
>> -                                                               schema, data);
>> +                                                               schema, data,
>> +                                                               index);
>>      bool retry_quickly = error != NULL;
>>      schedule_next_snapshot(storage, retry_quickly);
>>      return error;
>> @@ -638,7 +639,7 @@ ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
>>                                          prereq, &result);
>>          json_destroy(txn_json);
>>      } else if (storage->log) {
>> -        w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
>> +        w->error = ovsdb_storage_store_snapshot__(storage, schema, data, 0);
>>      } else {
>>          /* When 'error' and 'command' are both null, it indicates that the
>>           * command is complete. This is fine since this unbacked storage drops
>> diff --git a/ovsdb/storage.h b/ovsdb/storage.h
>> index ff026b77f..a1fdaa564 100644
>> --- a/ovsdb/storage.h
>> +++ b/ovsdb/storage.h
>> @@ -79,7 +79,8 @@ void ovsdb_write_destroy(struct ovsdb_write *);
>>  bool ovsdb_storage_should_snapshot(struct ovsdb_storage *);
>>  struct ovsdb_error *ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
>>                                                   const struct json *schema,
>> -                                                 const struct json *snapshot)
>> +                                                 const struct json *snapshot,
>> +                                                 uint64_t applied_index)
>>      OVS_WARN_UNUSED_RESULT;
>>
>>  struct ovsdb_write *ovsdb_storage_write_schema_change(
>

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
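The reworked duplicate-snapshot check in `raft_store_snapshot()` can be exercised in isolation. This sketch, with a hypothetical `toy_snapshot_advances_log()` helper, copies only the index arithmetic from the raft.c hunk: the new log start is derived from the index captured when the database copy was made (or from `last_applied` when the caller passes 0), and the snapshot is refused unless it moves the log start forward.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Returns true and fills '*new_log_start' if a snapshot taken at
 * 'applied_index' would advance the log; returns false for a duplicate.
 * Hypothetical helper mirroring the arithmetic in raft_store_snapshot(). */
static bool
toy_snapshot_advances_log(uint64_t applied_index, uint64_t last_applied,
                          uint64_t log_start, uint64_t *new_log_start)
{
    /* Index 0 means "not captured": fall back to the current state. */
    *new_log_start = applied_index ? applied_index + 1 : last_applied + 1;
    return *new_log_start > log_start;
}
```

For `applied_index == 0` the rejection condition `new_log_start <= log_start` is equivalent to the old `last_applied < log_start` test, so callers that pass 0 keep the previous behavior. With a nonzero index, entries applied after the copy was taken stay in the log instead of being folded into a snapshot that does not contain them.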
