One of the goals of Open vSwitch is to be as resource efficient as possible. Core parts of the program have been implemented as asynchronous state machines, and, when absolutely necessary, additional threads are used.
Introduce a cooperative multitasking module which allows us to interleave important processing with long-running tasks while avoiding the additional resource consumption of threads and the complexity of asynchronous state machines. In subsequent patches we will use this module to ensure that long-running processing in the OVSDB server does not interfere with stable maintenance of the RAFT cluster. Suggested-by: Ilya Maximets <[email protected]> Signed-off-by: Frode Nordahl <[email protected]> --- lib/automake.mk | 3 + lib/cooperative-multitasking-private.h | 31 +++ lib/cooperative-multitasking.c | 195 +++++++++++++++++++ lib/cooperative-multitasking.h | 42 ++++ tests/automake.mk | 1 + tests/library.at | 10 + tests/ovsdb-server.at | 1 + tests/test-cooperative-multitasking.c | 259 +++++++++++++++++++++++++ 8 files changed, 542 insertions(+) create mode 100644 lib/cooperative-multitasking-private.h create mode 100644 lib/cooperative-multitasking.c create mode 100644 lib/cooperative-multitasking.h create mode 100644 tests/test-cooperative-multitasking.c diff --git a/lib/automake.mk b/lib/automake.mk index 0dc8a35cc..8596171c6 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -94,6 +94,9 @@ lib_libopenvswitch_la_SOURCES = \ lib/conntrack-other.c \ lib/conntrack.c \ lib/conntrack.h \ + lib/cooperative-multitasking.c \ + lib/cooperative-multitasking.h \ + lib/cooperative-multitasking-private.h \ lib/coverage.c \ lib/coverage.h \ lib/cpu.c \ diff --git a/lib/cooperative-multitasking-private.h b/lib/cooperative-multitasking-private.h new file mode 100644 index 000000000..b2e4e7291 --- /dev/null +++ b/lib/cooperative-multitasking-private.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024 Canonical Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COOPERATIVE_MULTITASKING_PRIVATE_H +#define COOPERATIVE_MULTITASKING_PRIVATE_H 1 + +#include "openvswitch/hmap.h" + +struct cooperative_multitasking_callback { + struct hmap_node node; + void (*cb)(void *); + void *arg; + long long int time_threshold; + long long int last_run; + const char *msg; +}; + +#endif /* COOPERATIVE_MULTITASKING_PRIVATE_H */ diff --git a/lib/cooperative-multitasking.c b/lib/cooperative-multitasking.c new file mode 100644 index 000000000..1b1205e8f --- /dev/null +++ b/lib/cooperative-multitasking.c @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2023 Canonical Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <config.h> + +#include "backtrace.h" +#include "cooperative-multitasking-private.h" +#include "cooperative-multitasking.h" +#include "hash.h" +#include "openvswitch/hmap.h" +#include "openvswitch/vlog.h" +#include "timeval.h" + +VLOG_DEFINE_THIS_MODULE(cooperative_multitasking); + +static struct hmap *cooperative_multitasking_callbacks = NULL; + +/* One time initialization for process that wants to make use of cooperative + * multitasking module. References to data is stored in 'hmap_container' and + * will be referenced by all calls to this module. The ownership of the + * container itself remains with the caller while the data in the hmap is owned + * by this module and must be freed with a call to + * cooperative_multitasking_destroy(). + * + * The purpose of having the caller own 'hmap_container' is: + * 1) Allow runtime decision whether to use cooperative multitasking without + * having to pass data between loosely connected parts of a program. This + * is useful for the raft code which is consumed by both the ovsdb-server + * daemon and the ovsdb-tool CLI utility. + * 2) Allow inspection of internal data by unit tests. */ +void +cooperative_multitasking_init(struct hmap *hmap_container) +{ + cooperative_multitasking_callbacks = hmap_container; + hmap_init(cooperative_multitasking_callbacks); +} + +/* Register callback 'cb' with argument 'arg' to be called when + * cooperating long running functions yield and 'time_threshold' msec has + * passed since the last call to the function. If the optional 'msg' is not + * NULL it will be used when logging time threshold overrun conditions. + * + * It is possible to register the same callback multiple times as long as 'arg' + * is different for each registration. It is up to the caller to ensure no + * unwanted duplicates are registered. + * + * The callback is expected to update the timestamp for last run with a call to + * cooperative_multitasking_update() using the same values for 'cb' and 'arg'. 
+ */ +void +cooperative_multitasking_register(void (*cb)(void *), void *arg, + long long int time_threshold, + const char *msg) +{ + if (!cooperative_multitasking_callbacks) { + return; + } + + struct cooperative_multitasking_callback *cm_entry; + + cm_entry = xzalloc(sizeof *cm_entry); + cm_entry->cb = cb; + cm_entry->arg = arg; + cm_entry->time_threshold = time_threshold; + cm_entry->last_run = time_msec(); + cm_entry->msg = msg; + + hmap_insert(cooperative_multitasking_callbacks, + &cm_entry->node, + hash_pointer( + cm_entry->arg ? cm_entry->arg : (void *) cm_entry->cb, 0)); +} + +/* Free any data allocated by calls to cooperative_multitasking_register(). */ +void +cooperative_multitasking_destroy(void) +{ + struct cooperative_multitasking_callback *cm_entry; + HMAP_FOR_EACH_SAFE (cm_entry, node, cooperative_multitasking_callbacks) { + hmap_remove(cooperative_multitasking_callbacks, &cm_entry->node); + free(cm_entry); + } +} + +/* Update data for already registered callback identified by 'cb' and 'arg'. + * + * The value for 'last_run' must at a minimal be updated each time the callback + * is run. It can also be useful to update for multiple entry points to the + * part serviced by the callback to avoid unnecessary callbacks on next call to + * cooperative_multitasking_yield(). + * + * Updating the value for 'time_threshold' may be necessary as a consequence of + * the change in runtime configuration or requirements of the serviced + * callback. + * + * Providing a value of 0 for 'last_run' or 'time_threshold' will result in + * the respective stored value left untouched. */ +void +cooperative_multitasking_update(void (*cb)(void *), void *arg, + long long int last_run, + long long int time_threshold) +{ + if (!cooperative_multitasking_callbacks) { + return; + } + + struct cooperative_multitasking_callback *cm_entry; + + HMAP_FOR_EACH_WITH_HASH (cm_entry, node, + hash_pointer(arg ? 
arg : (void *) cb, 0), + cooperative_multitasking_callbacks) + { + if (cm_entry->cb == cb && cm_entry->arg == arg) { + if (last_run) { + cm_entry->last_run = last_run; + } + + if (time_threshold) { + cm_entry->time_threshold = time_threshold; + } + return; + } + } +} + +static void +cooperative_multitasking_yield_at__(const char *source_location) +{ + long long int now = time_msec(); + struct cooperative_multitasking_callback *cm_entry; + + HMAP_FOR_EACH (cm_entry, node, cooperative_multitasking_callbacks) { + long long int elapsed = now - cm_entry->last_run; + + if (elapsed >= cm_entry->time_threshold) { + VLOG_DBG("yield called from %s: " + "%lld: %lld >= %lld, executing %p(%p)", + source_location, now, elapsed, cm_entry->time_threshold, + cm_entry->cb, cm_entry->arg); + (*cm_entry->cb)(cm_entry->arg); + if (elapsed - cm_entry->time_threshold > + cm_entry->time_threshold / 8) + { + VLOG_WARN("yield threshold overrun with %lld msec. %s", + elapsed - cm_entry->time_threshold, + cm_entry->msg ? cm_entry->msg : ""); + if (VLOG_IS_DBG_ENABLED()) { + /* log_backtrace() logs at ERROR level but we only want to + * log a backtrace when DEBUG is enabled */ + log_backtrace(); + } + } + } + } +} + +/* Iterate over registered callbacks and execute callbacks as demanded by the + * recorded time threshold. */ +void +cooperative_multitasking_yield_at(const char *source_location) +{ + static bool yield_in_progress = false; + + if (!cooperative_multitasking_callbacks) { + return; + } + + if (yield_in_progress) { + VLOG_ERR_ONCE("nested yield avoided, this is a bug! 
" + "enable debug logging for more details."); + if (VLOG_IS_DBG_ENABLED()) { + VLOG_DBG("nested yield, called from %s", source_location); + log_backtrace(); + } + return; + } + yield_in_progress = true; + + cooperative_multitasking_yield_at__(source_location); + + yield_in_progress = false; +} diff --git a/lib/cooperative-multitasking.h b/lib/cooperative-multitasking.h new file mode 100644 index 000000000..6286bfbf5 --- /dev/null +++ b/lib/cooperative-multitasking.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2023 Canonical Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef COOPERATIVE_MULTITASKING_H +#define COOPERATIVE_MULTITASKING_H 1 + +struct hmap; + +void cooperative_multitasking_init(struct hmap *); + +void cooperative_multitasking_register(void (*)(void *), void *, + long long int, const char *); +#define COOPERATIVE_MULTITASKING_REGISTER(CB, ARG, TIME_THRESHOLD, MSG) \ + cooperative_multitasking_register((void (*)(void *)) CB, (void *) ARG, \ + TIME_THRESHOLD, MSG) + +void cooperative_multitasking_destroy(void); + +void cooperative_multitasking_update(void (*)(void *), void *, long long int, + long long int); +#define COOPERATIVE_MULTITASKING_UPDATE(CB, ARG, LAST_RUN, TIME_THRESHOLD) \ + cooperative_multitasking_update((void (*) (void *)) CB, (void *) ARG, \ + LAST_RUN, TIME_THRESHOLD) + +void cooperative_multitasking_yield_at(const char *); +#define cooperative_multitasking_yield() \ + cooperative_multitasking_yield_at(OVS_SOURCE_LOCATOR) + +#endif /* COOPERATIVE_MULTITASKING_H */ diff --git a/tests/automake.mk b/tests/automake.mk index 10c9fbb01..08c9b74d4 100644 --- a/tests/automake.mk +++ b/tests/automake.mk @@ -456,6 +456,7 @@ tests_ovstest_SOURCES = \ tests/test-ccmap.c \ tests/test-cmap.c \ tests/test-conntrack.c \ + tests/test-cooperative-multitasking.c \ tests/test-csum.c \ tests/test-flows.c \ tests/test-hash.c \ diff --git a/tests/library.at b/tests/library.at index 3f9df2f87..77d5abb01 100644 --- a/tests/library.at +++ b/tests/library.at @@ -296,3 +296,13 @@ AT_CLEANUP AT_SETUP([uuidset module]) AT_CHECK([ovstest test-uuidset], [0], [], [ignore]) AT_CLEANUP + +AT_SETUP([cooperative-multitasking module]) +AT_CHECK([ovstest test-cooperative-multitasking], [0], []) +AT_CLEANUP + +AT_SETUP([cooperative-multitasking module nested yield detection]) +AT_CHECK([ovstest test-cooperative-multitasking-nested-yield], [0], [], [dnl +cooperative_multitasking|ERR|nested yield avoided, this is a bug! enable debug logging for more details. 
+]) +AT_CLEANUP diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at index 6eb758e22..88a9a9a27 100644 --- a/tests/ovsdb-server.at +++ b/tests/ovsdb-server.at @@ -2387,6 +2387,7 @@ m4_define([CLEAN_LOG_FILE], [sed 's/[[0-9\-]]*T[[0-9:\.]]*Z|[[0-9]]*\(|.*$\)/\1/g' $1 | dnl sed '/|poll_loop|/d' | dnl sed '/|socket_util|/d' | dnl + sed '/|cooperative_multitasking|DBG|/d' | dnl sed 's/[[0-9]]*\.ctl/<cleared>\.ctl/g'> $2]) CLEAN_LOG_FILE([1.log], [1.log.clear]) diff --git a/tests/test-cooperative-multitasking.c b/tests/test-cooperative-multitasking.c new file mode 100644 index 000000000..ec6be865f --- /dev/null +++ b/tests/test-cooperative-multitasking.c @@ -0,0 +1,259 @@ +/* + * Copyright (c) 2023 Canonical Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <config.h> +#undef NDEBUG +#include "cooperative-multitasking.h" +#include "cooperative-multitasking-private.h" +#include "openvswitch/hmap.h" +#include "ovstest.h" +#include "timeval.h" +#include "util.h" +#include "openvswitch/vlog.h" + +static struct hmap cm_callbacks; + +struct fixture_arg { + bool called; +}; + +static void +fixture_run(struct fixture_arg *arg) +{ + COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, arg, time_msec(), 0); + if (arg) { + arg->called = true; + } +} + +static void +fixture_other_run(struct fixture_arg *arg) +{ + COOPERATIVE_MULTITASKING_UPDATE(&fixture_other_run, arg, time_msec(), 0); + if (arg) { + arg->called = true; + } +} + +static void +test_cm_register(void) +{ + struct cooperative_multitasking_callback *cm_entry; + struct fixture_arg arg1 = { + .called = false, + }; + struct fixture_arg arg2 = { + .called = false, + }; + + timeval_stop(); + long long int now = time_msec(); + + COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL); + COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL); + COOPERATIVE_MULTITASKING_REGISTER(&fixture_other_run, NULL, 3000, NULL); + + ovs_assert(hmap_count(&cm_callbacks) == 3); + + HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) { + if (cm_entry->arg == (void *)&arg1) { + ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run); + ovs_assert (cm_entry->time_threshold == 1000); + ovs_assert (cm_entry->last_run == now); + } else if (cm_entry->arg == (void *)&arg2) { + ovs_assert (cm_entry->cb == (void (*)(void *)) &fixture_run); + ovs_assert (cm_entry->time_threshold == 2000); + ovs_assert (cm_entry->last_run == now); + } else if (cm_entry->cb == (void (*)(void *)) &fixture_other_run) { + ovs_assert (cm_entry->arg == NULL); + ovs_assert (cm_entry->time_threshold == 3000); + ovs_assert (cm_entry->last_run == now); + } else { + OVS_NOT_REACHED(); + } + } + + cooperative_multitasking_destroy(); +} + +static void +test_cm_update(void) +{ + struct 
cooperative_multitasking_callback *cm_entry; + struct fixture_arg arg1 = { + .called = false, + }; + struct fixture_arg arg2 = { + .called = false, + }; + + timeval_stop(); + long long int now = time_msec(); + + /* first register a couple of callbacks. */ + COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 0, NULL); + COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 0, NULL); + + ovs_assert(hmap_count(&cm_callbacks) == 2); + + HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) { + if (cm_entry->arg == (void *)&arg1) { + ovs_assert (cm_entry->time_threshold == 0); + ovs_assert (cm_entry->last_run == now); + } else if (cm_entry->arg == (void *)&arg2) { + ovs_assert (cm_entry->time_threshold == 0); + ovs_assert (cm_entry->last_run == now); + } else { + OVS_NOT_REACHED(); + } + } + + /* update 'last_run' and 'time_threshold' for each callback and validate + * that the correct entry was actually updated. */ + COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 1, 2); + COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 3, 4); + + HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) { + if (cm_entry->arg == (void *)&arg1) { + ovs_assert (cm_entry->time_threshold == 2); + ovs_assert (cm_entry->last_run == 1); + } else if (cm_entry->arg == (void *)&arg2) { + ovs_assert (cm_entry->time_threshold == 4); + ovs_assert (cm_entry->last_run == 3); + } else { + OVS_NOT_REACHED(); + } + } + + /* confirm that providing 0 for 'last_run' or 'time_threshold' leaves the + * existing value untouched. 
*/ + COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg1, 0, 5); + COOPERATIVE_MULTITASKING_UPDATE(&fixture_run, &arg2, 6, 0); + + HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) { + if (cm_entry->arg == (void *)&arg1) { + ovs_assert (cm_entry->time_threshold == 5); + ovs_assert (cm_entry->last_run == 1); + } else if (cm_entry->arg == (void *)&arg2) { + ovs_assert (cm_entry->time_threshold == 4); + ovs_assert (cm_entry->last_run == 6); + } else { + OVS_NOT_REACHED(); + } + } + + cooperative_multitasking_destroy(); +} + +static void +test_cm_yield(void) +{ + struct cooperative_multitasking_callback *cm_entry; + struct fixture_arg arg1 = { + .called = false, + }; + struct fixture_arg arg2 = { + .called = false, + }; + + timeval_stop(); + long long int now = time_msec(); + + /* first register a couple of callbacks. */ + COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg1, 1000, NULL); + COOPERATIVE_MULTITASKING_REGISTER(&fixture_run, &arg2, 2000, NULL); + + ovs_assert(hmap_count(&cm_callbacks) == 2); + + /* call to yield should not execute callbacks until time threshold. */ + cooperative_multitasking_yield(); + ovs_assert(arg1.called == false); + ovs_assert(arg2.called == false); + + HMAP_FOR_EACH (cm_entry, node, &cm_callbacks) { + ovs_assert(cm_entry->last_run == now); + } + + /* move clock forward and confirm the expected callbacks to be executed. */ + timeval_warp(0, 1000); + timeval_stop(); + cooperative_multitasking_yield(); + ovs_assert(arg1.called == true); + ovs_assert(arg2.called == false); + + /* move clock forward and confirm the expected callbacks to be executed. 
*/ + arg1.called = arg2.called = false; + timeval_warp(0, 1000); + timeval_stop(); + cooperative_multitasking_yield(); + ovs_assert(arg1.called == true); + ovs_assert(arg2.called == true); + + timeval_warp(0, 1); + cooperative_multitasking_destroy(); +} + +static void +fixture_buggy_run(struct fixture_arg *arg) +{ + COOPERATIVE_MULTITASKING_UPDATE(&fixture_buggy_run, arg, time_msec(), 0); + if (arg) { + arg->called = true; + } + /* A real run function MUST NOT directly or indirectly call yield, this is + * here to test the detection of such a programming error. */ + cooperative_multitasking_yield(); +} + +static void +test_cooperative_multitasking_nested_yield(int argc OVS_UNUSED, char *argv[]) +{ + struct fixture_arg arg1 = { + .called = false, + }; + + set_program_name(argv[0]); + vlog_set_pattern(VLF_CONSOLE, "%c|%p|%m"); + vlog_set_levels(NULL, VLF_SYSLOG, VLL_OFF); + + time_msec(); /* ensure timeval is initialized */ + timeval_timewarp_enable(); + + cooperative_multitasking_init(&cm_callbacks); + + COOPERATIVE_MULTITASKING_REGISTER(&fixture_buggy_run, &arg1, 1000, NULL); + timeval_warp(0, 1000); + cooperative_multitasking_yield(); + cooperative_multitasking_destroy(); +} + +static void +test_cooperative_multitasking(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) +{ + time_msec(); /* ensure timeval is initialized */ + timeval_timewarp_enable(); + + cooperative_multitasking_init(&cm_callbacks); + + test_cm_register(); + test_cm_update(); + test_cm_yield(); +} + +OVSTEST_REGISTER("test-cooperative-multitasking", + test_cooperative_multitasking); +OVSTEST_REGISTER("test-cooperative-multitasking-nested-yield", + test_cooperative_multitasking_nested_yield); -- 2.34.1 _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
