From: Felix Huettner <felix.huettner@stackit.cloud>

Previously jsonrpc_recv would return EAGAIN on different occasions:

* When the underlying recv call returns -EAGAIN.
* When we did not receive a full json message after 50 recv calls.

In the first case there is no point in calling jsonrpc_recv
again.

However, in the second case we will make progress by calling jsonrpc_recv
again and potentially get a full message back.

This can later be used to allow us to receive until a certain time
limit.

Co-authored-by: Martin Morgenstern <martin.morgenst...@cloudandheat.com>
Signed-off-by: Martin Morgenstern <martin.morgenst...@cloudandheat.com>
Signed-off-by: Felix Huettner <felix.huettner@stackit.cloud>
---
 lib/jsonrpc.c          | 32 +++++++++++++++++++-------------
 lib/jsonrpc.h          |  2 +-
 lib/ovsdb-cs.c         | 16 ++++++++++++----
 ovsdb/jsonrpc-server.c |  4 ++--
 ovsdb/ovsdb-client.c   | 16 +++++++++++++---
 ovsdb/raft.c           |  3 ++-
 ovsdb/replication.c    |  4 ++--
 tests/test-jsonrpc.c   |  2 +-
 8 files changed, 52 insertions(+), 27 deletions(-)

diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index eacfd7e26..167490968 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -312,12 +312,14 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
 
 /* Attempts to receive a message from 'rpc'.
  *
- * If successful, stores the received message in '*msgp' and returns 0.  The
+ * If successful, stores the received message in '*msgp'. The
  * caller takes ownership of '*msgp' and must eventually destroy it with
  * jsonrpc_msg_destroy().
  *
  * Otherwise, stores NULL in '*msgp' and returns one of the following:
  *
+ *   - 0: Message could not be completely received in this batch.
+ *
  *   - EAGAIN: No message has been received.
  *
  *   - EOF: The remote end closed the connection gracefully.
@@ -394,7 +396,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
      * iterations. We want to know how often we abort for this reason. */
     COVERAGE_INC(jsonrpc_recv_needs_retry);
 
-    return EAGAIN;
+    return 0;
 }
 
 /* Causes the poll loop to wake up when jsonrpc_recv() may return a value other
@@ -443,15 +445,15 @@ jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
 {
     for (;;) {
         int error = jsonrpc_recv(rpc, msgp);
-        if (error != EAGAIN) {
+        if (!*msgp && (error == 0 || error == EAGAIN)) {
+            jsonrpc_run(rpc);
+            jsonrpc_wait(rpc);
+            jsonrpc_recv_wait(rpc);
+            poll_block();
+        } else {
             fatal_signal_run();
             return error;
         }
-
-        jsonrpc_run(rpc);
-        jsonrpc_wait(rpc);
-        jsonrpc_recv_wait(rpc);
-        poll_block();
     }
 }
 
@@ -1180,15 +1182,16 @@ jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
     }
 }
 
-struct jsonrpc_msg *
-jsonrpc_session_recv(struct jsonrpc_session *s)
+int
+jsonrpc_session_recv(struct jsonrpc_session *s, struct jsonrpc_msg **full_msg)
 {
     if (s->rpc) {
         unsigned int received_bytes;
         struct jsonrpc_msg *msg;
+        int ret;
 
         received_bytes = jsonrpc_get_received_bytes(s->rpc);
-        jsonrpc_recv(s->rpc, &msg);
+        ret = jsonrpc_recv(s->rpc, &msg);
 
         long long int now = time_msec();
         reconnect_receive_attempted(s->reconnect, now);
@@ -1213,12 +1216,15 @@ jsonrpc_session_recv(struct jsonrpc_session *s)
                        && !strcmp(msg->id->string, "echo")) {
                 /* It's a reply to our echo request.  Suppress it. */
             } else {
-                return msg;
+                *full_msg = msg;
+                return 0;
             }
             jsonrpc_msg_destroy(msg);
+        } else {
+            return ret;
         }
     }
-    return NULL;
+    return 0;
 }
 
 /* Preemptively send an echo reply if needed. */
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index d3796f094..462c733da 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -126,7 +126,7 @@ const char *jsonrpc_session_get_name(const struct jsonrpc_session *);
 size_t jsonrpc_session_get_n_remotes(const struct jsonrpc_session *);
 
 int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *);
-struct jsonrpc_msg *jsonrpc_session_recv(struct jsonrpc_session *);
+int jsonrpc_session_recv(struct jsonrpc_session *, struct jsonrpc_msg **);
 void jsonrpc_session_recv_wait(struct jsonrpc_session *);
 
 bool jsonrpc_session_is_alive(const struct jsonrpc_session *);
diff --git a/lib/ovsdb-cs.c b/lib/ovsdb-cs.c
index 9250db267..98386532d 100644
--- a/lib/ovsdb-cs.c
+++ b/lib/ovsdb-cs.c
@@ -639,13 +639,21 @@ ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
         }
     }
 
+    int ret;
     for (int i = 0; i < 50; i++) {
-        struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session);
-        if (!msg) {
+        struct jsonrpc_msg *msg = NULL;
+        ret = jsonrpc_session_recv(cs->session, &msg);
+        if (ret == EAGAIN) {
             break;
         }
-        ovsdb_cs_process_msg(cs, msg);
-        jsonrpc_msg_destroy(msg);
+        /* Even if we would not block we might not receive a message for two
+         * reasons:
+         *   1. We did not yet receive the message fully and stopped reading.
+         *   2. The message was already handled by the jsonrpc layer. */
+        if (msg) {
+            ovsdb_cs_process_msg(cs, msg);
+            jsonrpc_msg_destroy(msg);
+        }
     }
 
 
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 26a53898f..9e437c47e 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -667,11 +667,11 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
     ovsdb_jsonrpc_trigger_complete_done(s);
 
     if (!jsonrpc_session_get_backlog(s->js)) {
-        struct jsonrpc_msg *msg;
+        struct jsonrpc_msg *msg = NULL;
 
         ovsdb_jsonrpc_monitor_flush_all(s);
 
-        msg = jsonrpc_session_recv(s->js);
+        jsonrpc_session_recv(s->js, &msg);
         if (msg) {
             if (msg->type == JSONRPC_REQUEST) {
                 ovsdb_jsonrpc_session_got_request(s, msg);
diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c
index 3fa1d9afc..dc99fb059 100644
--- a/ovsdb/ovsdb-client.c
+++ b/ovsdb/ovsdb-client.c
@@ -217,7 +217,8 @@ open_rpc(int min_args, enum args_needed need,
             jsonrpc_session_send(js, txn);
         }
 
-        struct jsonrpc_msg *reply = jsonrpc_session_recv(js);
+        struct jsonrpc_msg *reply = NULL;
+        jsonrpc_session_recv(js, &reply);
         if (reply && id && reply->id && json_equal(id, reply->id)) {
             if (reply->type == JSONRPC_REPLY
                 && should_stay_connected(jsonrpc_session_get_name(js),
@@ -1496,6 +1497,10 @@ do_monitor__(struct jsonrpc *rpc, const char *database,
                 ovs_fatal(error, "%s: receive failed", server);
             }
 
+            if (!msg) {
+                break;
+            }
+
             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
                 jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
                                                        msg->id));
@@ -2308,7 +2313,7 @@ do_lock(struct jsonrpc *rpc, const char *method, const char *lock)
     }
 
     for (;;) {
-        struct jsonrpc_msg *msg;
+        struct jsonrpc_msg *msg = NULL;
         int error;
 
         unixctl_server_run(unixctl);
@@ -2326,6 +2331,10 @@ do_lock(struct jsonrpc *rpc, const char *method, const char *lock)
             ovs_fatal(error, "%s: receive failed", jsonrpc_get_name(rpc));
         }
 
+        if (!msg) {
+            goto no_msg;
+        }
+
         if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
             jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
                                                    msg->id));
@@ -2479,7 +2488,8 @@ do_wait(struct jsonrpc *rpc_unused OVS_UNUSED,
             jsonrpc_session_send(js, rq);
         }
 
-        struct jsonrpc_msg *reply = jsonrpc_session_recv(js);
+        struct jsonrpc_msg *reply = NULL;
+        jsonrpc_session_recv(js, &reply);
         if (reply && reply->id) {
             if (sdca_id && json_equal(sdca_id, reply->id)) {
                 if (reply->type == JSONRPC_ERROR) {
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index 9c3c351b5..67211e266 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -1503,7 +1503,8 @@ static bool
 raft_conn_receive(struct raft *raft, struct raft_conn *conn,
                   union raft_rpc *rpc)
 {
-    struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
+    struct jsonrpc_msg *msg = NULL;
+    jsonrpc_session_recv(conn->js, &msg);
     if (!msg) {
         return false;
     }
diff --git a/ovsdb/replication.c b/ovsdb/replication.c
index 56720cb10..73e19d180 100644
--- a/ovsdb/replication.c
+++ b/ovsdb/replication.c
@@ -188,7 +188,7 @@ replication_run_db(struct replication_db *rdb)
     jsonrpc_session_run(rdb->session);
 
     for (int i = 0; i < 50; i++) {
-        struct jsonrpc_msg *msg;
+        struct jsonrpc_msg *msg = NULL;
         unsigned int seqno;
 
         if (!jsonrpc_session_is_connected(rdb->session)) {
@@ -210,7 +210,7 @@ replication_run_db(struct replication_db *rdb)
             VLOG_DBG("%s: send server ID request.", rdb->db->name);
         }
 
-        msg = jsonrpc_session_recv(rdb->session);
+        jsonrpc_session_recv(rdb->session, &msg);
         if (!msg) {
             continue;
         }
diff --git a/tests/test-jsonrpc.c b/tests/test-jsonrpc.c
index 04e941b14..2de9188f5 100644
--- a/tests/test-jsonrpc.c
+++ b/tests/test-jsonrpc.c
@@ -211,7 +211,7 @@ do_listen(struct ovs_cmdl_context *ctx)
             jsonrpc_run(rpc);
             if (!jsonrpc_get_backlog(rpc)) {
                 error = jsonrpc_recv(rpc, &msg);
-                if (!error) {
+                if (msg) {
                     error = handle_rpc(rpc, msg, &done);
                     jsonrpc_msg_destroy(msg);
                 } else if (error == EAGAIN) {
-- 
2.45.2

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to