On 5/7/25 3:41 PM, Martin Morgenstern via dev wrote: > Adds a timing based variant of ovsdb_cs_run() that accepts an additional > 'deadline' parameter and a new coverage counter 'ovsdb_cs_run_blocked' > that tracks how often the call returned early because it was blocked > with EAGAIN. > > ovsdb_cs_run_until() will process messages until the given deadline has > passed and allows clients to batch messages together. In contrast to > ovsdb_cs_run(), it will not abort the batch if the underlying JSON layer > has already processed the message (e.g., answering an echo request).
We should not need this special handling if the underlying jsonrpc layer retries instead. With that, we can also avoid having two versions of the same ovsdb_cs_run*() function by using the same do {} while () loop. > > Since both ovsdb_cs_run() and ovsdb_cs_run_until() perform the same > preparation steps, they were extracted into a dedicated function. > > Signed-off-by: Martin Morgenstern <martin.morgenst...@cloudandheat.com> > --- > lib/ovsdb-cs.c | 82 ++++++++++++++++++++++++++++++++++++++++++-------- > lib/ovsdb-cs.h | 1 + > 2 files changed, 71 insertions(+), 12 deletions(-) > > diff --git a/lib/ovsdb-cs.c b/lib/ovsdb-cs.c > index b5eda88ad..36ffd1ed9 100644 > --- a/lib/ovsdb-cs.c > +++ b/lib/ovsdb-cs.c > @@ -20,6 +20,7 @@ > > #include <errno.h> > > +#include "coverage.h" > #include "hash.h" > #include "jsonrpc.h" > #include "openvswitch/dynamic-string.h" > @@ -35,11 +36,14 @@ > #include "ovsdb-types.h" > #include "sset.h" > #include "svec.h" > +#include "timeval.h" > #include "util.h" > #include "uuid.h" > > VLOG_DEFINE_THIS_MODULE(ovsdb_cs); > > +COVERAGE_DEFINE(ovsdb_cs_run_blocked); > + > /* Connection state machine. > * > * When a JSON-RPC session connects, the CS layer sends a "monitor_cond" > @@ -603,19 +607,9 @@ ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum > ovsdb_cs_event_type type) > return event; > } > > -/* Processes a batch of messages from the database server on 'cs'. This may > - * cause the CS's contents to change. > - * > - * Initializes 'events' with a list of events that occurred on 'cs'. The > - * caller must process and destroy all of the events. 
*/ > -void > -ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events) > +static void > +ovsdb_cs_run_prepare(struct ovsdb_cs *cs) > { > - ovs_list_init(events); > - if (!cs->session) { > - return; > - } > - > ovsdb_cs_send_cond_change(cs); > > jsonrpc_session_run(cs->session); > @@ -638,6 +632,22 @@ ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list > *events) > ovsdb_cs_db_compose_lock_request(&cs->data)); > } > } > +} > + > +/* Processes a batch of messages from the database server on 'cs'. This may > + * cause the CS's contents to change. > + * > + * Initializes 'events' with a list of events that occurred on 'cs'. The > + * caller must process and destroy all of the events. */ > +void > +ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events) > +{ > + ovs_list_init(events); > + if (!cs->session) { > + return; > + } > + > + ovsdb_cs_run_prepare(cs); > > for (int i = 0; i < 50; i++) { > struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session); > @@ -650,6 +660,54 @@ ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list > *events) > ovs_list_push_back_all(events, &cs->data.events); > } > > +/* Processes messages from the database server on 'cs' until the given > + * deadline has passed. This may cause the CS's contents to change. > + * > + * Returns 0 on success. > + * > + * Returns EINVAL if the cs session is invalid. > + * > + * Returns EAGAIN if the underlying jsonrpc layer blocks. > + * > + * Initializes 'events' with a list of events that occurred on 'cs'. The > + * caller must process and destroy all of the events. 
*/ > +int > +ovsdb_cs_run_until(struct ovsdb_cs *cs, struct ovs_list *events, > + long long deadline) > +{ > + ovs_list_init(events); > + if (!cs->session) { > + return EINVAL; > + } > + > + ovsdb_cs_run_prepare(cs); > + > + int ret; > + while (time_msec() < deadline) { > + struct jsonrpc_msg *msg = NULL; > + ret = jsonrpc_session_recv_until(cs->session, &msg, deadline); > + if (ret == EAGAIN) { > + COVERAGE_INC(ovsdb_cs_run_blocked); > + break; > + } > + /* Even if we would not block we might not receive a message for two > + * reasons: > + * 1. We did not yet receive the message fully and stopped reading. > + * 2. The message was already handled by the jsonrpc layer. */ > + if (msg) { > + ovsdb_cs_process_msg(cs, msg); > + jsonrpc_msg_destroy(msg); > + } > + } > + ovs_list_push_back_all(events, &cs->data.events); > + > + if (ret == EAGAIN) { > + return EAGAIN; > + } > + > + return 0; > +} > + > /* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to > * do or when activity occurs on a transaction on 'cs'. */ > void > diff --git a/lib/ovsdb-cs.h b/lib/ovsdb-cs.h > index bcc3dcd71..ff6438bdf 100644 > --- a/lib/ovsdb-cs.h > +++ b/lib/ovsdb-cs.h > @@ -119,6 +119,7 @@ struct ovsdb_cs *ovsdb_cs_create(const char *database, > int max_version, > void ovsdb_cs_destroy(struct ovsdb_cs *); > > void ovsdb_cs_run(struct ovsdb_cs *, struct ovs_list *events); > +int ovsdb_cs_run_until(struct ovsdb_cs *, struct ovs_list *events, long > long); > void ovsdb_cs_wait(struct ovsdb_cs *); > > /* Network connection. */ _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev