Introduce two implementations of the jsonrpc
input parsing. One "normal" that works as before
on the receiver thread. And "threaded" that
runs additional thread per each rpc session.

In "threaded" case all received data is sent to
separate thread to be parsed and result is received
on receiver thread as parsed jsons.

Motivation for "threaded" version is to free up
resources of receiver thread for other processing
work. Specifically in ovn / northd json parsing can
take from 9% to 30% of all computations.

Signed-off-by: Dmitry Porokh <dpor...@nvidia.com>
---
Note:
 v1:
   This patch breaks record/replay test because of
   some races with replay_seqno and filenames. All other
   tests passed without issues.
   
   Before trying to fix record/replay I would like to receive
   feedback from maintainers if this patch make sense for them
   and worth to be polished.
 v2:
   - Don't enable thread offload for ovsdb-server
     because record/replay test fail.
   - Fix: use NULL instead of 0.
   - Fix: commit summary message.
 v3:
   - Fix: comments and line length. (./utilities/checkpatch.py noted)

 lib/automake.mk           |   5 +
 lib/jsonrpc-in-normal.h   | 117 +++++++++++++++++++++
 lib/jsonrpc-in-threaded.c | 209 ++++++++++++++++++++++++++++++++++++++
 lib/jsonrpc-in-threaded.h |  80 +++++++++++++++
 lib/jsonrpc-in.c          | 156 ++++++++++++++++++++++++++++
 lib/jsonrpc-in.h          | 100 ++++++++++++++++++
 lib/jsonrpc.c             | 144 +++++++++++++++-----------
 lib/jsonrpc.h             |   3 +
 lib/stream.c              |   2 +-
 lib/stream.h              |   3 +
 ovsdb/ovsdb-server.c      |   7 ++
 11 files changed, 763 insertions(+), 63 deletions(-)
 create mode 100644 lib/jsonrpc-in-normal.h
 create mode 100644 lib/jsonrpc-in-threaded.c
 create mode 100644 lib/jsonrpc-in-threaded.h
 create mode 100644 lib/jsonrpc-in.c
 create mode 100644 lib/jsonrpc-in.h

diff --git a/lib/automake.mk b/lib/automake.mk
index 78d6e6516..d002af998 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -181,6 +181,11 @@ lib_libopenvswitch_la_SOURCES = \
        lib/json.h \
        lib/jsonrpc.c \
        lib/jsonrpc.h \
+       lib/jsonrpc-in.c \
+       lib/jsonrpc-in.h \
+       lib/jsonrpc-in-normal.h \
+       lib/jsonrpc-in-threaded.c \
+       lib/jsonrpc-in-threaded.h \
        lib/lacp.c \
        lib/lacp.h \
        lib/latch.h \
diff --git a/lib/jsonrpc-in-normal.h b/lib/jsonrpc-in-normal.h
new file mode 100644
index 000000000..289644e31
--- /dev/null
+++ b/lib/jsonrpc-in-normal.h
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2025 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVS_JSONRPC_IN_NORMAL_H
+#define OVS_JSONRPC_IN_NORMAL_H
+
+#include <config.h>
+#include <stdbool.h>
+#include "byteq.h"
+#include "json.h"
+#include "util.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct jsonrpc_in_normal {
+    struct byteq input;
+    uint8_t input_buffer[4096 * 32];
+    struct json_parser *parser;
+};
+
+static inline void
+jsonrpc_in_normal_init(struct jsonrpc_in_normal *nin)
+{
+    byteq_init(&nin->input, nin->input_buffer, sizeof nin->input_buffer);
+    nin->parser = NULL;
+}
+
+static inline void *
+jsonrpc_in_normal_read_buffer(struct jsonrpc_in_normal *nin, size_t *size)
+{
+    if (byteq_is_empty(&nin->input)) {
+        *size = byteq_headroom(&nin->input);
+        return byteq_head(&nin->input);
+    } else {
+        *size = 0;
+        return NULL;
+    }
+}
+
+static inline void
+jsonrpc_in_normal_read_complete(struct jsonrpc_in_normal *nin, size_t size)
+{
+    byteq_advance_head(&nin->input, size);
+}
+
+static inline struct json *
+jsonrpc_in_normal_poll(struct jsonrpc_in_normal *nin)
+{
+    size_t n = byteq_tailroom(&nin->input);
+    if (n != 0) {
+        if (nin->parser == NULL) {
+            nin->parser = json_parser_create(0);
+        }
+        size_t used = json_parser_feed(nin->parser,
+                                       (char *) byteq_tail(&nin->input), n);
+        byteq_advance_tail(&nin->input, used);
+    }
+    if (nin->parser != NULL && json_parser_is_done(nin->parser)) {
+        struct json *json = json_parser_finish(nin->parser);
+        nin->parser = NULL;
+        return json;
+    }
+    return NULL;
+}
+
+static inline void
+jsonrpc_in_normal_cleanup(struct jsonrpc_in_normal *nin) {
+    json_parser_abort(nin->parser);
+    nin->parser = NULL;
+}
+
+static inline unsigned int
+jsonrpc_in_normal_get_received_bytes(const struct jsonrpc_in_normal *nin)
+{
+    return nin->input.head;
+}
+
+static inline int
+jsonrpc_in_normal_wait(struct jsonrpc_in_normal *nin)
+{
+    return byteq_is_empty(&nin->input)
+        ? JSONRPC_IN_IDLE : JSONRPC_IN_ACTIVE_WAKEUP_NOW;
+}
+
+static inline size_t
+jsonrpc_in_normal_fill_stream_report_data(struct jsonrpc_in_normal *nin,
+                                          void *data, size_t datasz)
+{
+    if (nin->input.head < nin->input.size) {
+        size_t towrite = MIN(datasz, nin->input.head);
+        memcpy(data, nin->input.buffer, towrite);
+        return towrite;
+    } else {
+        return 0;
+    }
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/jsonrpc-in-threaded.c b/lib/jsonrpc-in-threaded.c
new file mode 100644
index 000000000..fe92332c1
--- /dev/null
+++ b/lib/jsonrpc-in-threaded.c
@@ -0,0 +1,209 @@
+/*
+ * Copyright (c) 2025 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "openvswitch/vlog.h"
+#include "jsonrpc-in-threaded.h"
+#include "jsonrpc-in.h"
+
+VLOG_DEFINE_THIS_MODULE(jsonrpc_in_thr);
+
+static void *jsonrpc_in_threaded_worker(void *dummy);
+
+void
+jsonrpc_in_threaded_init(struct jsonrpc_in_threaded *tin)
+{
+    ovs_mutex_init(&tin->mutex);
+    pthread_cond_init(&tin->cond, NULL);
+    byteq_init(&tin->input, tin->input_buffer, sizeof tin->input_buffer);
+    tin->parser = NULL;
+    tin->shutdown = false;
+    latch_init(&tin->result_latch);
+    tin->jsons_head = tin->jsons_tail = 0;
+    pthread_create(&tin->thread, NULL, jsonrpc_in_threaded_worker, tin);
+}
+
+void *
+jsonrpc_in_threaded_read_buffer(struct jsonrpc_in_threaded *tin, size_t *size)
+{
+    void *data = NULL;
+    ovs_mutex_lock(&tin->mutex);
+    *size = byteq_headroom(&tin->input);
+    data = byteq_head(&tin->input);
+    ovs_mutex_unlock(&tin->mutex);
+    return data;
+}
+
+void
+jsonrpc_in_threaded_read_complete(struct jsonrpc_in_threaded *tin, size_t size)
+{
+    ovs_mutex_lock(&tin->mutex);
+    if (byteq_is_empty(&tin->input)) {
+        /* Only need to signal thread if it was empty
+         * (condition has changed) */
+        pthread_cond_signal(&tin->cond);
+    }
+    byteq_advance_head(&tin->input, size);
+    ovs_mutex_unlock(&tin->mutex);
+}
+
+struct json *
+jsonrpc_in_threaded_poll(struct jsonrpc_in_threaded *tin)
+{
+    struct json *result = NULL;
+    ovs_mutex_lock(&tin->mutex);
+    if (tin->jsons_head != tin->jsons_tail) {
+        result = tin->jsons[tin->jsons_tail % JSON_RPC_IN_NUM_PENDING_JSONS];
+        tin->jsons_tail++;
+    }
+    ovs_mutex_unlock(&tin->mutex);
+    return result;
+}
+
+void
+jsonrpc_in_threaded_cleanup(struct jsonrpc_in_threaded *tin)
+{
+    ovs_mutex_lock(&tin->mutex);
+    tin->shutdown = true;
+    pthread_cond_signal(&tin->cond);
+    ovs_mutex_unlock(&tin->mutex);
+    void *dummy;
+    pthread_join(tin->thread, &dummy);
+    while (tin->jsons_tail != tin->jsons_head) {
+        json_destroy(
+            tin->jsons[tin->jsons_tail % JSON_RPC_IN_NUM_PENDING_JSONS]);
+        tin->jsons_tail++;
+    }
+    latch_destroy(&tin->result_latch);
+    ovs_mutex_destroy(&tin->mutex);
+    pthread_cond_destroy(&tin->cond);
+}
+
+unsigned int
+jsonrpc_in_threaded_get_received_bytes(const struct jsonrpc_in_threaded *tin)
+{
+    ovs_mutex_lock(&tin->mutex);
+    unsigned int result = tin->input.head;
+    ovs_mutex_unlock(&tin->mutex);
+    return result;
+}
+
+int
+jsonrpc_in_threaded_wait(struct jsonrpc_in_threaded *tin)
+{
+    ovs_mutex_lock(&tin->mutex);
+    enum jsonrpc_in_wait_result result = JSONRPC_IN_IDLE;
+    latch_poll(&tin->result_latch);
+    if (tin->jsons_head != tin->jsons_tail) {
+        result = JSONRPC_IN_ACTIVE_WAKEUP_NOW;
+    } else if (!byteq_is_empty(&tin->input)) {
+        latch_wait(&tin->result_latch);
+        if (byteq_headroom(&tin->input) == 0) {
+            result = JSONRPC_IN_ACTIVE_SLEEP_NO_ROOM;
+        } else {
+            result = JSONRPC_IN_ACTIVE_SLEEP_HAS_ROOM;
+        }
+    }
+    ovs_mutex_unlock(&tin->mutex);
+    return result;
+}
+
+int
+jsonrpc_in_threaded_status(struct jsonrpc_in_threaded *tin)
+{
+    ovs_mutex_lock(&tin->mutex);
+    enum jsonrpc_in_wait_result result = JSONRPC_IN_IDLE;
+    if (tin->jsons_head != tin->jsons_tail) {
+        result = JSONRPC_IN_ACTIVE_WAKEUP_NOW;
+    } else if (!byteq_is_empty(&tin->input)) {
+        if (byteq_headroom(&tin->input) == 0) {
+            result = JSONRPC_IN_ACTIVE_SLEEP_NO_ROOM;
+        } else {
+            result = JSONRPC_IN_ACTIVE_SLEEP_HAS_ROOM;
+        }
+    }
+    ovs_mutex_unlock(&tin->mutex);
+    return result;
+}
+
+static void *
+jsonrpc_in_threaded_worker(void *tin_raw)
+{
+    struct jsonrpc_in_threaded *tin = tin_raw;
+    for (;;) {
+        ovs_mutex_lock(&tin->mutex);
+        /* Wait new data or until reader read json from result ring */
+        while ((byteq_is_empty(&tin->input)
+                || (tin->jsons_head - tin->jsons_tail)
+                == JSON_RPC_IN_NUM_PENDING_JSONS)
+               && !tin->shutdown) {
+            ovs_mutex_cond_wait(&tin->cond, &tin->mutex);
+        }
+        if (tin->shutdown) {
+            ovs_mutex_unlock(&tin->mutex);
+            break;
+        }
+        /* Jsons ring must be able to fit result */
+        ovs_assert(tin->jsons_head - tin->jsons_tail
+                   < JSON_RPC_IN_NUM_PENDING_JSONS);
+
+        const size_t tail_size = byteq_tailroom(&tin->input);
+        const char *tail = (const char *) byteq_tail(&tin->input);
+        ovs_mutex_unlock(&tin->mutex);
+        if (tin->parser == NULL) {
+            tin->parser = json_parser_create(0);
+        }
+        /* Parsing without mutex is safe because nobody can
+         * can write data between tail and head. */
+        size_t used = json_parser_feed(tin->parser, tail, tail_size);
+        struct json *json = NULL;
+        if (json_parser_is_done(tin->parser)) {
+            json = json_parser_finish(tin->parser);
+            tin->parser = NULL;
+        }
+        ovs_mutex_lock(&tin->mutex);
+        bool was_staturated = byteq_headroom(&tin->input) == 0;
+        byteq_advance_tail(&tin->input, used);
+        if (json != NULL) {
+            tin->jsons[tin->jsons_head % JSON_RPC_IN_NUM_PENDING_JSONS] = json;
+            tin->jsons_head++;
+            ovs_mutex_unlock(&tin->mutex);
+        } else {
+            ovs_mutex_unlock(&tin->mutex);
+        }
+        if (was_staturated || json != NULL) {
+            latch_set(&tin->result_latch);
+        }
+    }
+    return NULL;
+}
+
+size_t
+jsonrpc_in_threaded_fill_stream_report_data(struct jsonrpc_in_threaded *tin,
+                                            void *data, size_t datasz)
+{
+    ovs_mutex_lock(&tin->mutex);
+    if (tin->input.head < tin->input.size) {
+        size_t towrite = MIN(datasz, tin->input.head);
+        memcpy(data, tin->input.buffer, towrite);
+        ovs_mutex_unlock(&tin->mutex);
+        return towrite;
+    } else {
+        ovs_mutex_unlock(&tin->mutex);
+        return 0;
+    }
+}
diff --git a/lib/jsonrpc-in-threaded.h b/lib/jsonrpc-in-threaded.h
new file mode 100644
index 000000000..9d2cc65bb
--- /dev/null
+++ b/lib/jsonrpc-in-threaded.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2025 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVS_JSONRPC_IN_THREADED_H
+#define OVS_JSONRPC_IN_THREADED_H
+
+#include <config.h>
+#include <pthread.h>
+
+#include "openvswitch/thread.h"
+#include "byteq.h"
+#include "json.h"
+#include "latch.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define JSON_RPC_IN_NUM_PENDING_JSONS 16
+
+struct jsonrpc_in_threaded {
+    struct ovs_mutex mutex;
+    pthread_cond_t cond;
+    pthread_t thread;
+    struct byteq input;
+    uint8_t input_buffer[65536 * 2];
+    struct json_parser *parser;
+    bool shutdown;
+    struct latch result_latch;
+    struct json *jsons[JSON_RPC_IN_NUM_PENDING_JSONS];
+    unsigned int jsons_head;
+    unsigned int jsons_tail;
+};
+
+void jsonrpc_in_threaded_init(struct jsonrpc_in_threaded *tin);
+
+void *
+jsonrpc_in_threaded_read_buffer(struct jsonrpc_in_threaded *tin, size_t *size);
+
+void
+jsonrpc_in_threaded_read_complete(struct jsonrpc_in_threaded *tin,
+                                  size_t size);
+
+struct json *jsonrpc_in_threaded_poll(struct jsonrpc_in_threaded *tin);
+
+void jsonrpc_in_threaded_cleanup(struct jsonrpc_in_threaded *tin);
+
+unsigned int
+jsonrpc_in_threaded_get_received_bytes(const struct jsonrpc_in_threaded *tin);
+
+int jsonrpc_in_threaded_wait(struct jsonrpc_in_threaded *tin);
+
+
+int jsonrpc_in_threaded_status(struct jsonrpc_in_threaded *tin);
+
+bool jsonrpc_in_threaded_is_idle(struct jsonrpc_in_threaded *tin);
+
+size_t
+jsonrpc_in_threaded_fill_stream_report_data(struct jsonrpc_in_threaded *tin,
+                                            void *data, size_t datasz);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/jsonrpc-in.c b/lib/jsonrpc-in.c
new file mode 100644
index 000000000..47f98917f
--- /dev/null
+++ b/lib/jsonrpc-in.c
@@ -0,0 +1,156 @@
+/*
+ * Copyright (c) 2025 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <string.h>
+
+#include "openvswitch/thread.h"
+#include "jsonrpc-in.h"
+#include "jsonrpc-in-normal.h"
+#include "jsonrpc-in-threaded.h"
+#include "util.h"
+#include "json.h"
+#include "byteq.h"
+
+struct jsonrpc_in {
+    struct jsonrpc_in_config config;
+    union {
+        struct jsonrpc_in_normal normal;
+        struct jsonrpc_in_threaded threaded;
+    };
+};
+
+struct jsonrpc_in *jsonrpc_in_new(const struct jsonrpc_in_config *cfg)
+{
+    struct jsonrpc_in *input = xmalloc(sizeof(struct jsonrpc_in));
+    input->config = *cfg;
+    switch (input->config.mode) {
+    case JSONRPC_IN_MODE_NORMAL:
+        jsonrpc_in_normal_init(&input->normal);
+        return input;
+    case JSONRPC_IN_MODE_THREADED:
+        jsonrpc_in_threaded_init(&input->threaded);
+        return input;
+    }
+    OVS_NOT_REACHED();
+    free(input);
+    return NULL;
+}
+
+void *jsonrpc_in_read_buffer(struct jsonrpc_in *input, size_t *size)
+{
+    switch (input->config.mode) {
+    case JSONRPC_IN_MODE_NORMAL:
+        return jsonrpc_in_normal_read_buffer(&input->normal, size);
+    case JSONRPC_IN_MODE_THREADED:
+        return jsonrpc_in_threaded_read_buffer(&input->threaded, size);
+    }
+    OVS_NOT_REACHED();
+    return NULL;
+}
+
+void jsonrpc_in_read_complete(struct jsonrpc_in *input, size_t size)
+{
+    switch (input->config.mode) {
+    case JSONRPC_IN_MODE_NORMAL:
+        jsonrpc_in_normal_read_complete(&input->normal, size);
+        return;
+    case JSONRPC_IN_MODE_THREADED:
+        jsonrpc_in_threaded_read_complete(&input->threaded, size);
+        return;
+    }
+    OVS_NOT_REACHED();
+}
+
+struct json *jsonrpc_in_poll(struct jsonrpc_in *input)
+{
+    switch (input->config.mode) {
+    case JSONRPC_IN_MODE_NORMAL:
+        return jsonrpc_in_normal_poll(&input->normal);
+    case JSONRPC_IN_MODE_THREADED:
+        return jsonrpc_in_threaded_poll(&input->threaded);
+    }
+    OVS_NOT_REACHED();
+    return NULL;
+}
+
+void jsonrpc_in_cleanup(struct jsonrpc_in *input)
+{
+    switch (input->config.mode) {
+    case JSONRPC_IN_MODE_NORMAL:
+        jsonrpc_in_normal_cleanup(&input->normal);
+        break;
+    case JSONRPC_IN_MODE_THREADED:
+        jsonrpc_in_threaded_cleanup(&input->threaded);
+        break;
+    }
+    free(input);
+}
+
+unsigned int jsonrpc_in_get_received_bytes(const struct jsonrpc_in *input)
+{
+    switch (input->config.mode) {
+    case JSONRPC_IN_MODE_NORMAL:
+        return jsonrpc_in_normal_get_received_bytes(&input->normal);
+    case JSONRPC_IN_MODE_THREADED:
+        return jsonrpc_in_threaded_get_received_bytes(&input->threaded);
+    }
+    OVS_NOT_REACHED();
+    return 0;
+}
+
+enum jsonrpc_in_wait_result jsonrpc_in_wait(struct jsonrpc_in *input)
+{
+    switch (input->config.mode) {
+    case JSONRPC_IN_MODE_NORMAL:
+        return jsonrpc_in_normal_wait(&input->normal);
+    case JSONRPC_IN_MODE_THREADED:
+        return jsonrpc_in_threaded_wait(&input->threaded);
+    }
+    OVS_NOT_REACHED();
+    return 0;
+}
+
+enum jsonrpc_in_wait_result jsonrpc_in_status(struct jsonrpc_in *input)
+{
+    switch (input->config.mode) {
+    case JSONRPC_IN_MODE_NORMAL:
+        /* sic! normal does not really wait */
+        return jsonrpc_in_normal_wait(&input->normal);
+    case JSONRPC_IN_MODE_THREADED:
+        return jsonrpc_in_threaded_status(&input->threaded);
+    }
+    OVS_NOT_REACHED();
+    return 0;
+}
+
+size_t
+jsonrpc_in_fill_stream_report_data(struct jsonrpc_in *input,
+                                   void *data, size_t datasz)
+{
+    switch (input->config.mode) {
+    case JSONRPC_IN_MODE_NORMAL:
+        return jsonrpc_in_normal_fill_stream_report_data(&input->normal,
+                                                         data, datasz);
+    case JSONRPC_IN_MODE_THREADED:
+        return jsonrpc_in_threaded_fill_stream_report_data(&input->threaded,
+                                                           data, datasz);
+    }
+    OVS_NOT_REACHED();
+    return 0;
+}
+
+
diff --git a/lib/jsonrpc-in.h b/lib/jsonrpc-in.h
new file mode 100644
index 000000000..66d53ab84
--- /dev/null
+++ b/lib/jsonrpc-in.h
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2025 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVS_JSONRPC_IN_H
+#define OVS_JSONRPC_IN_H
+
+#include <config.h>
+#include <stdbool.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct json;
+struct jsonrpc_in;
+
+enum jsonrpc_in_mode {
+    JSONRPC_IN_MODE_NORMAL,
+    JSONRPC_IN_MODE_THREADED,
+};
+
+struct jsonrpc_in_config {
+    enum jsonrpc_in_mode mode;
+};
+
+#define JSONRPC_IN_CONFIG_DEFAULT {             \
+        .mode = JSONRPC_IN_MODE_NORMAL          \
+}
+
+enum jsonrpc_in_wait_result {
+    JSONRPC_IN_IDLE,
+    JSONRPC_IN_ACTIVE_WAKEUP_NOW,
+    JSONRPC_IN_ACTIVE_SLEEP_HAS_ROOM,
+    JSONRPC_IN_ACTIVE_SLEEP_NO_ROOM,
+};
+
+/* Create jsonrcp input processor. Depends on mode.
+ * if mode is threaded then all raw data will be sent to
+ * separate thread for parsing. */
+struct jsonrpc_in *jsonrpc_in_new(const struct jsonrpc_in_config *cfg);
+
+/* Returns buffer for read from stream.
+ * Pointer is valid until jsonrpc_in_read_complete called.
+ * Size of data that can be written to buffer is returned in size
+ * parameter. Returned size can be 0 and it means that
+ * no room for more data is available. */
+void *jsonrpc_in_read_buffer(struct jsonrpc_in *input, size_t *size);
+
+/* Finishes read from stream with real amount of data
+ * that has been read from the stream. */
+void jsonrpc_in_read_complete(struct jsonrpc_in *input, size_t size);
+
+/* Polls parser for the next parsed json return NULL if no new jsons
+ * parsed. */
+struct json *jsonrpc_in_poll(struct jsonrpc_in *input);
+
+/* Function that is been called if to retrieve stream rport data
+ * from input. This data is used to create better diagonstics
+ * if stream of invalid type is connectect (for example TLS instead of TCP). */
+size_t
+jsonrpc_in_fill_stream_report_data(struct jsonrpc_in *input,
+                                   void *data, size_t datasz);
+
+/* Cleanup state of json rpc input processor */
+void jsonrpc_in_cleanup(struct jsonrpc_in *input);
+
+/* Returns counter of received bytes via stream */
+unsigned int jsonrpc_in_get_received_bytes(const struct jsonrpc_in *input);
+
+/* Called on input thread when it is going to wait
+ * for next main loop. Threaded version of input adds
+ * latch wait to polling descriptors if json parsing is in
+ * progress at the moment.  */
+enum jsonrpc_in_wait_result jsonrpc_in_wait(struct jsonrpc_in *input);
+/* Same as wait but whithout adding additional polling descriptors */
+enum jsonrpc_in_wait_result jsonrpc_in_status(struct jsonrpc_in *input);
+
+/* Writes data that can be used as stream report. */
+size_t
+jsonrpc_in_fill_stream_report_data(struct jsonrpc_in *input,
+                                   void *data, size_t datasz);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index 90723a42b..7c4882081 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -34,6 +34,7 @@
 #include "stream.h"
 #include "svec.h"
 #include "timeval.h"
+#include "jsonrpc-in.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(jsonrpc);
@@ -46,9 +47,8 @@ struct jsonrpc {
     int status;
 
     /* Input. */
-    struct byteq input;
-    uint8_t input_buffer[4096];
-    struct json_parser *parser;
+    struct jsonrpc_in *input;
+    int error_received;
 
     /* Output. */
     struct ovs_list output;     /* Contains "struct ofpbuf"s. */
@@ -62,8 +62,12 @@ struct jsonrpc {
 
 /* Rate limit for error messages. */
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+/* Configuration of jsonrpc input for all new sessions */
+static struct jsonrpc_in_config in_cfg = JSONRPC_IN_CONFIG_DEFAULT;
+
+static struct jsonrpc_msg *
+jsonrpc_parse_received_message(struct jsonrpc *, struct json *);
 
-static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
 static void jsonrpc_cleanup(struct jsonrpc *);
 static void jsonrpc_error(struct jsonrpc *, int error);
 
@@ -95,7 +99,8 @@ jsonrpc_open(struct stream *stream)
     rpc = xzalloc(sizeof *rpc);
     rpc->name = xstrdup(stream_get_name(stream));
     rpc->stream = stream;
-    byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
+    rpc->input = jsonrpc_in_new(&in_cfg);
+    rpc->error_received = 0;
     ovs_list_init(&rpc->output);
 
     return rpc;
@@ -158,6 +163,12 @@ jsonrpc_wait(struct jsonrpc *rpc)
             stream_send_wait(rpc->stream);
         }
     }
+
+}
+
+/* Configures jsonrpc input processing for all new sessions */
+void jsonrpc_set_in_config(struct jsonrpc_in_config *cfg) {
+    in_cfg = *cfg;
 }
 
 /*
@@ -203,7 +214,10 @@ jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
 unsigned int
 jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
 {
-    return rpc->input.head;
+    if (rpc->input == NULL) {
+        return 0;
+    }
+    return jsonrpc_in_get_received_bytes(rpc->input);
 }
 
 /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
@@ -328,67 +342,57 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
 int
 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
 {
-    int i;
-
     *msgp = NULL;
     if (rpc->status) {
         return rpc->status;
     }
 
-    for (i = 0; i < 50; i++) {
-        size_t n, used;
-
-        /* Fill our input buffer if it's empty. */
-        if (byteq_is_empty(&rpc->input)) {
-            size_t chunk;
+    if (rpc->error_received == 0) {
+        size_t chunk;
+        void *data = jsonrpc_in_read_buffer(rpc->input, &chunk);
+        if (chunk != 0) {
             int retval;
-
-            byteq_fast_forward(&rpc->input);
-            chunk = byteq_headroom(&rpc->input);
-            retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
-            if (retval < 0) {
-                if (retval == -EAGAIN) {
-                    return EAGAIN;
-                } else {
-                    VLOG_WARN_RL(&rl, "%s: receive error: %s",
-                                 rpc->name, ovs_strerror(-retval));
-                    jsonrpc_error(rpc, -retval);
-                    return rpc->status;
-                }
+            int read_size = 0;
+            retval = stream_recv(rpc->stream, data, chunk);
+            if (retval > 0) {
+                read_size = retval;
             } else if (retval == 0) {
-                jsonrpc_error(rpc, EOF);
-                return EOF;
+                rpc->error_received = EOF;
+            } else if (retval != -EAGAIN) {
+                VLOG_WARN_RL(&rl, "%s: receive error: %s",
+                             rpc->name, ovs_strerror(-retval));
+                rpc->error_received = -retval;
             }
-            byteq_advance_head(&rpc->input, retval);
+            jsonrpc_in_read_complete(rpc->input, read_size);
         }
+    }
 
-        /* We have some input.  Feed it into the JSON parser. */
-        if (!rpc->parser) {
-            rpc->parser = json_parser_create(0);
+    struct json *json = jsonrpc_in_poll(rpc->input);
+    /* If we have complete JSON, attempt to parse it as JSON-RPC. */
+    if (json != NULL) {
+        *msgp = jsonrpc_parse_received_message(rpc, json);
+        if (*msgp) {
+            return 0;
         }
-        n = byteq_tailroom(&rpc->input);
-        used = json_parser_feed(rpc->parser,
-                                (char *) byteq_tail(&rpc->input), n);
-        byteq_advance_tail(&rpc->input, used);
-
-        /* If we have complete JSON, attempt to parse it as JSON-RPC. */
-        if (json_parser_is_done(rpc->parser)) {
-            *msgp = jsonrpc_parse_received_message(rpc);
-            if (*msgp) {
-                return 0;
-            }
-
-            if (rpc->status) {
-                const struct byteq *q = &rpc->input;
-                if (q->head <= q->size) {
-                    stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
-                                          &this_module, rpc->name);
-                }
-                return rpc->status;
+        if (rpc->status) {
+            uint8_t sdata[STREAM_CONTENT_REPORT_MIN_SIZE];
+            size_t written = jsonrpc_in_fill_stream_report_data(rpc->input,
+                                                                sdata,
+                                                                sizeof sdata);
+            if (written != 0) {
+                stream_report_content(sdata, written, STREAM_JSONRPC,
+                                      &this_module, rpc->name);
             }
+            return rpc->status;
         }
     }
 
+    if (rpc->error_received != 0
+        && jsonrpc_in_status(rpc->input) == JSONRPC_IN_IDLE) {
+        jsonrpc_error(rpc, rpc->error_received);
+        return rpc->status;
+    }
+
     /* We tried hard but didn't get a complete JSON message within the above
      * iterations.  We want to know how often we abort for this reason. */
     COVERAGE_INC(jsonrpc_recv_incomplete);
@@ -401,10 +405,28 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg 
**msgp)
 void
 jsonrpc_recv_wait(struct jsonrpc *rpc)
 {
-    if (rpc->status || !byteq_is_empty(&rpc->input)) {
+    if (rpc->status) {
         poll_immediate_wake_at(rpc->name);
-    } else {
-        stream_recv_wait(rpc->stream);
+        return;
+    }
+    switch (jsonrpc_in_wait(rpc->input)) {
+    case JSONRPC_IN_IDLE:
+        if (rpc->error_received == 0) {
+            stream_recv_wait(rpc->stream);
+        } else {
+            poll_immediate_wake_at(rpc->name);
+        }
+        break;
+    case JSONRPC_IN_ACTIVE_WAKEUP_NOW:
+        poll_immediate_wake_at(rpc->name);
+        break;
+    case JSONRPC_IN_ACTIVE_SLEEP_HAS_ROOM:
+        if (rpc->error_received == 0) {
+            stream_recv_wait(rpc->stream);
+        }
+        break;
+    case JSONRPC_IN_ACTIVE_SLEEP_NO_ROOM:
+        break;
     }
 }
 
@@ -495,14 +517,10 @@ jsonrpc_transact_block(struct jsonrpc *rpc, struct 
jsonrpc_msg *request,
  * JSON-RPC message.  If successful, returns the JSON-RPC message.  On failure,
  * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */
 static struct jsonrpc_msg *
-jsonrpc_parse_received_message(struct jsonrpc *rpc)
+jsonrpc_parse_received_message(struct jsonrpc *rpc, struct json *json)
 {
     struct jsonrpc_msg *msg;
-    struct json *json;
     char *error;
-
-    json = json_parser_finish(rpc->parser);
-    rpc->parser = NULL;
     if (json->type == JSON_STRING) {
         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
                      rpc->name, json_string(json));
@@ -540,8 +558,10 @@ jsonrpc_cleanup(struct jsonrpc *rpc)
     stream_close(rpc->stream);
     rpc->stream = NULL;
 
-    json_parser_abort(rpc->parser);
-    rpc->parser = NULL;
+    if (rpc->input != NULL) {
+        jsonrpc_in_cleanup(rpc->input);
+    }
+    rpc->input = NULL;
 
     ofpbuf_list_delete(&rpc->output);
     rpc->backlog = 0;
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index 1baffcd80..8aa75746f 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -25,6 +25,7 @@
 #include "openvswitch/types.h"
 
 struct json;
+struct jsonrpc_in_config;
 struct jsonrpc_msg;
 struct pstream;
 struct reconnect_stats;
@@ -49,6 +50,8 @@ void jsonrpc_close(struct jsonrpc *);
 void jsonrpc_run(struct jsonrpc *);
 void jsonrpc_wait(struct jsonrpc *);
 
+void jsonrpc_set_in_config(struct jsonrpc_in_config *cfg);
+
 int jsonrpc_get_status(const struct jsonrpc *);
 size_t jsonrpc_get_backlog(const struct jsonrpc *);
 void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs,
diff --git a/lib/stream.c b/lib/stream.c
index aa48a973b..3b31e3c01 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -799,7 +799,7 @@ stream_parse_target_with_default_port(const char *target, 
int default_port,
 static enum stream_content_type
 stream_guess_content(const uint8_t *data, ssize_t size)
 {
-    if (size >= 2) {
+    if (size >= STREAM_CONTENT_REPORT_MIN_SIZE) {
 #define PAIR(A, B) (((A) << 8) | (B))
         switch (PAIR(data[0], data[1])) {
         case PAIR(0x16, 0x03):  /* Handshake, version 3. */
diff --git a/lib/stream.h b/lib/stream.h
index e30c51275..57cc43e27 100644
--- a/lib/stream.h
+++ b/lib/stream.h
@@ -91,6 +91,9 @@ enum stream_content_type {
     STREAM_JSONRPC
 };
 
+/* Number of bytes required to guess content type when reporting content */
+#define STREAM_CONTENT_REPORT_MIN_SIZE 2
+
 void stream_report_content(const void *, ssize_t, enum stream_content_type,
                            struct vlog_module *, const char *stream_name);
 
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index a247ae8f0..9ea5f7042 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -34,6 +34,7 @@
 #include "hash.h"
 #include "openvswitch/json.h"
 #include "jsonrpc.h"
+#include "jsonrpc-in.h"
 #include "jsonrpc-server.h"
 #include "openvswitch/list.h"
 #include "memory.h"
@@ -730,6 +731,7 @@ main(int argc, char *argv[])
     struct shash remotes = SHASH_INITIALIZER(&remotes);
     char *sync_from = NULL, *sync_exclude = NULL;
     bool is_backup;
+    struct jsonrpc_in_config jsonrpc_in_config = JSONRPC_IN_CONFIG_DEFAULT;
 
     struct server_config server_config = {
         .remotes = &remotes,
@@ -777,6 +779,11 @@ main(int argc, char *argv[])
 
     perf_counters_init();
 
+    /* Change this to threaded:
+     * jsonrpc_in_config.mode = JSONRPC_IN_MODE_THREADED;
+     * to enable parsing incoming jsons on separate thread */
+    jsonrpc_in_config.mode = JSONRPC_IN_MODE_NORMAL;
+    jsonrpc_set_in_config(&jsonrpc_in_config);
     /* Start ovsdb jsonrpc server.  Both read and write transactions are
      * allowed by default, individual remotes and databases will be configured
      * as read-only, if necessary. */
-- 
2.43.0
_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to