Initialize the cooperative multitasking module for the
ovsdb-server.

The server-side schema conversion process used for storage engines
such as RAFT is time consuming, so yield during processing.

After the schema conversion is done, the processing of reconnecting
clients is time consuming, so yield during processing of jsonrpc
server requests.

The processing of OVSDB monitors for reconnecting clients after a
schema conversion is time consuming and a primary offender for
missing deadlines, so yield during processing.

TODO: Document testing done with size/timing results.

Signed-off-by: Frode Nordahl <[email protected]>
---
 NEWS                   |  8 ++++++++
 ovsdb/file.c           |  2 ++
 ovsdb/jsonrpc-server.c |  3 +++
 ovsdb/monitor.c        | 25 ++++++++++++++++++++++++-
 ovsdb/ovsdb-server.c   |  5 +++++
 5 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/NEWS b/NEWS
index 270ed6673..643fddc98 100644
--- a/NEWS
+++ b/NEWS
@@ -6,6 +6,9 @@ Post-v3.2.0
        from older version is supported but it may trigger more leader elections
        during the process, and error logs complaining unrecognized fields may
        be observed on old nodes.
+     * Make use of cooperative multitasking to ensure stable maintenance of
+       RAFT cluster during long running processing such as online schema
+       conversion.
    - OpenFlow:
      * NXT_CT_FLUSH extension is updated to support flushing connections
        based on mark and labels.  'ct-flush' command of ovs-ofctl updated
@@ -36,6 +39,11 @@ Post-v3.2.0
        The existing behaviour is maintained and a non key:value pair value
        will be applied to all other PMD thread cores.'pmd-sleep-show' is
        updated to show the maximum sleep for each PMD thread core.
+   - lib:
+     * Introduce cooperative multitasking module which allows us to interleave
+       important processing with long running tasks while avoiding the
+       additional resource consumption of threads and complexity of
+       asynchronous state machines.
 
 
 v3.2.0 - 17 Aug 2023
diff --git a/ovsdb/file.c b/ovsdb/file.c
index 8bd1d4af3..fb4cbfb77 100644
--- a/ovsdb/file.c
+++ b/ovsdb/file.c
@@ -23,6 +23,7 @@
 
 #include "bitmap.h"
 #include "column.h"
+#include "cooperative-multitasking.h"
 #include "log.h"
 #include "openvswitch/json.h"
 #include "lockfile.h"
@@ -308,6 +309,7 @@ ovsdb_convert_table(struct ovsdb_txn *txn,
     }
 
     HMAP_FOR_EACH (src_row, hmap_node, &src_table->rows) {
+        cooperative_multitasking_yield();
         struct ovsdb_row *dst_row = ovsdb_row_create(dst_table);
         *ovsdb_row_get_uuid_rw(dst_row) = *ovsdb_row_get_uuid(src_row);
 
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index a3ca48a7b..17215eecd 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -21,6 +21,7 @@
 
 #include "bitmap.h"
 #include "column.h"
+#include "cooperative-multitasking.h"
 #include "openvswitch/dynamic-string.h"
 #include "monitor.h"
 #include "openvswitch/json.h"
@@ -599,6 +600,7 @@ ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
     struct ovsdb_jsonrpc_session *s;
 
     LIST_FOR_EACH_SAFE (s, node, &remote->sessions) {
+        cooperative_multitasking_yield();
         int error = ovsdb_jsonrpc_session_run(s);
         if (error) {
             ovsdb_jsonrpc_session_close(s);
@@ -674,6 +676,7 @@ ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote,
     struct ovsdb_jsonrpc_session *s;
 
     LIST_FOR_EACH_SAFE (s, node, &remote->sessions) {
+        cooperative_multitasking_yield();
         if (force || !s->db_change_aware) {
             jsonrpc_session_force_reconnect(s->js);
             if (comment && jsonrpc_session_is_connected(s->js)) {
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index d1e466faa..4398761bf 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -20,6 +20,7 @@
 
 #include "bitmap.h"
 #include "column.h"
+#include "cooperative-multitasking.h"
 #include "openvswitch/dynamic-string.h"
 #include "openvswitch/json.h"
 #include "jsonrpc.h"
@@ -229,6 +230,7 @@ ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
     uint32_t hash = json_cache_hash(version, change_set);
 
     HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
+        cooperative_multitasking_yield();
         if (uuid_equals(&node->change_set_uuid, &change_set->uuid) &&
             node->version == version) {
             return node;
@@ -262,6 +264,7 @@ ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon)
     struct ovsdb_monitor_json_cache_node *node;
 
     HMAP_FOR_EACH_POP(node, hmap_node, &dbmon->json_cache) {
+        cooperative_multitasking_yield();
         json_destroy(node->json);
         free(node);
     }
@@ -309,6 +312,7 @@ ovsdb_monitor_changes_row_find(
 
     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
                              &changes->rows) {
+        cooperative_multitasking_yield();
         if (uuid_equals(uuid, &row->uuid)) {
             return row;
         }
@@ -406,6 +410,7 @@ ovsdb_monitor_columns_sort(struct ovsdb_monitor *dbmon)
     struct shash_node *node;
 
     SHASH_FOR_EACH (node, &dbmon->tables) {
+        cooperative_multitasking_yield();
         struct ovsdb_monitor_table *mt = node->data;
 
         if (mt->n_columns == 0) {
@@ -546,6 +551,7 @@ ovsdb_monitor_condition_bind(struct ovsdb_monitor *dbmon,
     struct shash_node *node;
 
     SHASH_FOR_EACH(node, &cond->tables) {
+        cooperative_multitasking_yield();
         struct ovsdb_monitor_table_condition *mtc = node->data;
         struct ovsdb_monitor_table *mt =
             shash_find_data(&dbmon->tables, mtc->table->schema->name);
@@ -576,6 +582,7 @@ ovsdb_monitor_add_change_set(struct ovsdb_monitor *dbmon,
 
     struct shash_node *node;
     SHASH_FOR_EACH (node, &dbmon->tables) {
+        cooperative_multitasking_yield();
         struct ovsdb_monitor_table *mt = node->data;
         if (!init_only || (mt->select & OJMS_INITIAL)) {
             struct ovsdb_monitor_change_set_for_table *mcst =
@@ -599,6 +606,7 @@ ovsdb_monitor_find_change_set(const struct ovsdb_monitor *dbmon,
 {
     struct ovsdb_monitor_change_set *cs;
     LIST_FOR_EACH (cs, list_node, &dbmon->change_sets) {
+        cooperative_multitasking_yield();
         if (uuid_equals(&cs->prev_txn, prev_txn)) {
             /* Check n_columns for each table in dbmon, in case it is changed
              * after the change set is populated. */
@@ -666,6 +674,7 @@ ovsdb_monitor_change_set_destroy(struct ovsdb_monitor_change_set *mcs)
 
         struct ovsdb_monitor_row *row;
         HMAP_FOR_EACH_SAFE (row, hmap_node, &mcst->rows) {
+            cooperative_multitasking_yield();
             hmap_remove(&mcst->rows, &row->hmap_node);
             ovsdb_monitor_row_destroy(mcst->mt, row, mcst->n_columns);
         }
@@ -694,6 +703,7 @@ ovsdb_monitor_session_condition_set_mode(
     struct shash_node *node;
 
     SHASH_FOR_EACH (node, &cond->tables) {
+        cooperative_multitasking_yield();
         struct ovsdb_monitor_table_condition *mtc = node->data;
 
         if (!ovsdb_condition_is_true(&mtc->new_condition)) {
@@ -727,6 +737,7 @@ ovsdb_monitor_session_condition_destroy(
     }
 
     SHASH_FOR_EACH_SAFE (node, &condition->tables) {
+        cooperative_multitasking_yield();
         struct ovsdb_monitor_table_condition *mtc = node->data;
 
         ovsdb_condition_destroy(&mtc->new_condition);
@@ -1119,6 +1130,7 @@ ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
     size_t max_columns = 0;
 
     SHASH_FOR_EACH (node, &dbmon->tables) {
+        cooperative_multitasking_yield();
         struct ovsdb_monitor_table *mt = node->data;
 
         max_columns = MAX(max_columns, mt->n_columns);
@@ -1172,6 +1184,7 @@ ovsdb_monitor_compose_update(
         struct ovsdb_monitor_table *mt = mcst->mt;
 
         HMAP_FOR_EACH_SAFE (row, hmap_node, &mcst->rows) {
+            cooperative_multitasking_yield();
             struct json *row_json;
             row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
                                      initial, changed, mcst->n_columns);
@@ -1215,6 +1228,7 @@ ovsdb_monitor_compose_cond_change_update(
 
         /* Iterate over all rows in table */
         HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+            cooperative_multitasking_yield();
             struct json *row_json;
 
             row_json = ovsdb_monitor_compose_row_update2(mt, condition,
@@ -1286,7 +1300,8 @@ ovsdb_monitor_get_update(
 
                         /* Pre-serializing the object to avoid doing this
                          * for every client. */
-                        json_serialized = json_serialized_object_create(json);
+                        json_serialized =
+                            json_serialized_object_create_with_yield(json);
                         json_destroy(json);
                         json = json_serialized;
                     }
@@ -1540,6 +1555,7 @@ ovsdb_monitor_get_initial(struct ovsdb_monitor *dbmon,
             if (mcst->mt->select & OJMS_INITIAL) {
                 struct ovsdb_row *row;
                 HMAP_FOR_EACH (row, hmap_node, &mcst->mt->table->rows) {
+                    cooperative_multitasking_yield();
                     ovsdb_monitor_changes_update(NULL, row, mcst->mt, mcst);
                 }
             }
@@ -1595,6 +1611,7 @@ ovsdb_monitor_get_changes_after(const struct uuid *txn_uuid,
     struct ovsdb_txn_history_node *h_node;
     bool found = false;
     LIST_FOR_EACH (h_node, node, &dbmon->db->txn_history) {
+        cooperative_multitasking_yield();
         struct ovsdb_txn *txn = h_node->txn;
         if (!found) {
             /* find the txn with last_id in history */
@@ -1627,6 +1644,7 @@ ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
 
     /* Find and remove the jsonrpc monitor from the list.  */
     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
+        cooperative_multitasking_yield();
         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
             /* Release the tracked changes. */
             if (change_set) {
@@ -1683,6 +1701,7 @@ ovsdb_monitor_equal(const struct ovsdb_monitor *a,
     }
 
     SHASH_FOR_EACH(node, &a->tables) {
+        cooperative_multitasking_yield();
         const struct ovsdb_monitor_table *mta = node->data;
         const struct ovsdb_monitor_table *mtb;
 
@@ -1740,6 +1759,7 @@ ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
 
     hash = ovsdb_monitor_hash(new_dbmon, 0);
     HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
+        cooperative_multitasking_yield();
         if (ovsdb_monitor_equal(dbmon,  new_dbmon)) {
             return dbmon;
         }
@@ -1765,10 +1785,12 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
 
     struct ovsdb_monitor_change_set *cs;
     LIST_FOR_EACH_SAFE (cs, list_node, &dbmon->change_sets) {
+        cooperative_multitasking_yield();
         ovsdb_monitor_change_set_destroy(cs);
     }
 
     SHASH_FOR_EACH (node, &dbmon->tables) {
+        cooperative_multitasking_yield();
         struct ovsdb_monitor_table *mt = node->data;
         ovs_assert(ovs_list_is_empty(&mt->change_sets));
         free(mt->columns);
@@ -1805,6 +1827,7 @@ ovsdb_monitors_commit(struct ovsdb *db, const struct ovsdb_txn *txn)
     struct ovsdb_monitor *m;
 
     LIST_FOR_EACH (m, list_node, &db->monitors) {
+        cooperative_multitasking_yield();
         ovsdb_monitor_commit(m, txn);
     }
 }
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 4d29043f4..8f560b6a7 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -24,6 +24,7 @@
 
 #include "column.h"
 #include "command-line.h"
+#include "cooperative-multitasking.h"
 #include "daemon.h"
 #include "dirs.h"
 #include "dns-resolve.h"
@@ -65,6 +66,8 @@
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_server);
 
+static struct hmap cooperative_multitasking_callbacks;
+
 struct db {
     char *filename;
     struct ovsdb *db;
@@ -338,6 +341,7 @@ main(int argc, char *argv[])
     fatal_ignore_sigpipe();
     process_init();
     dns_resolve_init(true);
+    cooperative_multitasking_init(&cooperative_multitasking_callbacks);
 
     bool active = false;
     parse_options(argc, argv, &db_filenames, &remotes, &unixctl_path,
@@ -530,6 +534,7 @@ main(int argc, char *argv[])
     }
     dns_resolve_destroy();
     perf_counters_destroy();
+    cooperative_multitasking_destroy();
     service_stop();
     return 0;
 }
-- 
2.34.1

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to