From: Felix Huettner <felix.huettner@stackit.cloud> Previously jsonrpc_recv would return EAGAIN on different occations:
* When the underlying recv call returnes -EAGAIN. * When we did not receive a full json message after 50 recv calls. In the first case there is no point in further calling jsonrpc_recv again. However in the second case we will make progress by calling jsonrpc_recv again and potentially get a full message back. This can later be used to allow us to receive until a certain time limit. Co-authored-by: Martin Morgenstern <martin.morgenst...@cloudandheat.com> Signed-off-by: Martin Morgenstern <martin.morgenst...@cloudandheat.com> Signed-off-by: Felix Huettner <felix.huettner@stackit.cloud> --- lib/jsonrpc.c | 32 +++++++++++++++++++------------- lib/jsonrpc.h | 2 +- lib/ovsdb-cs.c | 16 ++++++++++++---- ovsdb/jsonrpc-server.c | 4 ++-- ovsdb/ovsdb-client.c | 16 +++++++++++++--- ovsdb/raft.c | 3 ++- ovsdb/replication.c | 4 ++-- tests/test-jsonrpc.c | 2 +- 8 files changed, 52 insertions(+), 27 deletions(-) diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index eacfd7e26..167490968 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -312,12 +312,14 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) /* Attempts to receive a message from 'rpc'. * - * If successful, stores the received message in '*msgp' and returns 0. The + * If successful, stores the received message in '*msgp'. The * caller takes ownership of '*msgp' and must eventually destroy it with * jsonrpc_msg_destroy(). * * Otherwise, stores NULL in '*msgp' and returns one of the following: * + * - 0: Message could not be completely received in this batch. + * * - EAGAIN: No message has been received. * * - EOF: The remote end closed the connection gracefully. @@ -394,7 +396,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) * iterations. We want to know how often we abort for this reason. */ COVERAGE_INC(jsonrpc_recv_needs_retry); - return EAGAIN; + return 0; } /* Causes the poll loop to wake up when jsonrpc_recv() may return a value other @@ -443,15 +445,15 @@ jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) { for (;;) { int error = jsonrpc_recv(rpc, msgp); - if (error != EAGAIN) { + if (!*msgp && (error == 0 || error == EAGAIN)) { + jsonrpc_run(rpc); + jsonrpc_wait(rpc); + jsonrpc_recv_wait(rpc); + poll_block(); + } else { fatal_signal_run(); return error; } - - jsonrpc_run(rpc); - jsonrpc_wait(rpc); - jsonrpc_recv_wait(rpc); - poll_block(); } } @@ -1180,15 +1182,16 @@ jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg) } } -struct jsonrpc_msg * -jsonrpc_session_recv(struct jsonrpc_session *s) +int +jsonrpc_session_recv(struct jsonrpc_session *s, struct jsonrpc_msg **full_msg) { if (s->rpc) { unsigned int received_bytes; struct jsonrpc_msg *msg; + int ret; received_bytes = jsonrpc_get_received_bytes(s->rpc); - jsonrpc_recv(s->rpc, &msg); + ret = jsonrpc_recv(s->rpc, &msg); long long int now = time_msec(); reconnect_receive_attempted(s->reconnect, now); @@ -1213,12 +1216,15 @@ jsonrpc_session_recv(struct jsonrpc_session *s) && !strcmp(msg->id->string, "echo")) { /* It's a reply to our echo request. Suppress it. */ } else { - return msg; + *full_msg = msg; + return 0; } jsonrpc_msg_destroy(msg); + } else { + return ret; } } - return NULL; + return 0; } /* Preemptively send an echo reply if needed. */ diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h index d3796f094..462c733da 100644 --- a/lib/jsonrpc.h +++ b/lib/jsonrpc.h @@ -126,7 +126,7 @@ const char *jsonrpc_session_get_name(const struct jsonrpc_session *); size_t jsonrpc_session_get_n_remotes(const struct jsonrpc_session *); int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *); -struct jsonrpc_msg *jsonrpc_session_recv(struct jsonrpc_session *); +int jsonrpc_session_recv(struct jsonrpc_session *, struct jsonrpc_msg **); void jsonrpc_session_recv_wait(struct jsonrpc_session *); bool jsonrpc_session_is_alive(const struct jsonrpc_session *); diff --git a/lib/ovsdb-cs.c b/lib/ovsdb-cs.c index 9250db267..98386532d 100644 --- a/lib/ovsdb-cs.c +++ b/lib/ovsdb-cs.c @@ -639,13 +639,21 @@ ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events) } } + int ret; for (int i = 0; i < 50; i++) { - struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session); - if (!msg) { + struct jsonrpc_msg *msg = NULL; + ret = jsonrpc_session_recv(cs->session, &msg); + if (ret == EAGAIN) { break; } - ovsdb_cs_process_msg(cs, msg); - jsonrpc_msg_destroy(msg); + /* Even if we would not block we might not receive a message for two + * reasons: + * 1. We did not yet receive the message fully and stopped reading. + * 2. The message was already handled by the jsonrpc layer. */ + if (msg) { + ovsdb_cs_process_msg(cs, msg); + jsonrpc_msg_destroy(msg); + } } diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index 26a53898f..9e437c47e 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -667,11 +667,11 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) ovsdb_jsonrpc_trigger_complete_done(s); if (!jsonrpc_session_get_backlog(s->js)) { - struct jsonrpc_msg *msg; + struct jsonrpc_msg *msg = NULL; ovsdb_jsonrpc_monitor_flush_all(s); - msg = jsonrpc_session_recv(s->js); + jsonrpc_session_recv(s->js, &msg); if (msg) { if (msg->type == JSONRPC_REQUEST) { ovsdb_jsonrpc_session_got_request(s, msg); diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c index 3fa1d9afc..dc99fb059 100644 --- a/ovsdb/ovsdb-client.c +++ b/ovsdb/ovsdb-client.c @@ -217,7 +217,8 @@ open_rpc(int min_args, enum args_needed need, jsonrpc_session_send(js, txn); } - struct jsonrpc_msg *reply = jsonrpc_session_recv(js); + struct jsonrpc_msg *reply = NULL; + jsonrpc_session_recv(js, &reply); if (reply && id && reply->id && json_equal(id, reply->id)) { if (reply->type == JSONRPC_REPLY && should_stay_connected(jsonrpc_session_get_name(js), @@ -1496,6 +1497,10 @@ do_monitor__(struct jsonrpc *rpc, const char *database, ovs_fatal(error, "%s: receive failed", server); } + if (!msg) { + break; + } + if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) { jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params), msg->id)); @@ -2308,7 +2313,7 @@ do_lock(struct jsonrpc *rpc, const char *method, const char *lock) } for (;;) { - struct jsonrpc_msg *msg; + struct jsonrpc_msg *msg = NULL; int error; unixctl_server_run(unixctl); @@ -2326,6 +2331,10 @@ do_lock(struct jsonrpc *rpc, const char *method, const char *lock) ovs_fatal(error, "%s: receive failed", jsonrpc_get_name(rpc)); } + if (!msg) { + goto no_msg; + } + if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) { jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params), msg->id)); @@ -2479,7 +2488,8 @@ do_wait(struct jsonrpc *rpc_unused OVS_UNUSED, jsonrpc_session_send(js, rq); } - struct jsonrpc_msg *reply = jsonrpc_session_recv(js); + struct jsonrpc_msg *reply = NULL; + jsonrpc_session_recv(js, &reply); if (reply && reply->id) { if (sdca_id && json_equal(sdca_id, reply->id)) { if (reply->type == JSONRPC_ERROR) { diff --git a/ovsdb/raft.c b/ovsdb/raft.c index 9c3c351b5..67211e266 100644 --- a/ovsdb/raft.c +++ b/ovsdb/raft.c @@ -1503,7 +1503,8 @@ static bool raft_conn_receive(struct raft *raft, struct raft_conn *conn, union raft_rpc *rpc) { - struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js); + struct jsonrpc_msg *msg = NULL; + jsonrpc_session_recv(conn->js, &msg); if (!msg) { return false; } diff --git a/ovsdb/replication.c b/ovsdb/replication.c index 56720cb10..73e19d180 100644 --- a/ovsdb/replication.c +++ b/ovsdb/replication.c @@ -188,7 +188,7 @@ replication_run_db(struct replication_db *rdb) jsonrpc_session_run(rdb->session); for (int i = 0; i < 50; i++) { - struct jsonrpc_msg *msg; + struct jsonrpc_msg *msg = NULL; unsigned int seqno; if (!jsonrpc_session_is_connected(rdb->session)) { @@ -210,7 +210,7 @@ replication_run_db(struct replication_db *rdb) VLOG_DBG("%s: send server ID request.", rdb->db->name); } - msg = jsonrpc_session_recv(rdb->session); + jsonrpc_session_recv(rdb->session, &msg); if (!msg) { continue; } diff --git a/tests/test-jsonrpc.c b/tests/test-jsonrpc.c index 04e941b14..2de9188f5 100644 --- a/tests/test-jsonrpc.c +++ b/tests/test-jsonrpc.c @@ -211,7 +211,7 @@ do_listen(struct ovs_cmdl_context *ctx) jsonrpc_run(rpc); if (!jsonrpc_get_backlog(rpc)) { error = jsonrpc_recv(rpc, &msg); - if (!error) { + if (msg) { error = handle_rpc(rpc, msg, &done); jsonrpc_msg_destroy(msg); } else if (error == EAGAIN) { -- 2.45.2 _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev