Introduce two implementations of the jsonrpc input parsing. One "normal" that works as before on the receiver thread. And "threaded" that runs additional thread per each rpc session.
In "threaded" case all received data is sent to separate thread to be parsed and result is received on receiver thread as parsed jsons. Motivation for "threaded" version is to free up resources of receiver thread for other processing work. Specifically in ovn / northd json parsing can take from 9% to 30% of all computations. Signed-off-by: Dmitry Porokh <dpor...@nvidia.com> --- Note: This patch breaks record/replay test because of some races with replay_seqno and filenames. All other tests passed without issues. As option threaded option can be enabled optionally by command line argument. Before trying to fix record/replay I would like to receive feedback from maintainers if this patch make sense for them and worth to be polished. lib/automake.mk | 5 + lib/jsonrpc-in-normal.h | 106 ++++++++++++++++++++++ lib/jsonrpc-in-threaded.c | 186 ++++++++++++++++++++++++++++++++++++++ lib/jsonrpc-in-threaded.h | 63 +++++++++++++ lib/jsonrpc-in.c | 141 +++++++++++++++++++++++++++++ lib/jsonrpc-in.h | 66 ++++++++++++++ lib/jsonrpc.c | 139 +++++++++++++++------------- lib/jsonrpc.h | 3 + lib/stream.c | 2 +- lib/stream.h | 3 + ovsdb/ovsdb-server.c | 4 + 11 files changed, 655 insertions(+), 63 deletions(-) create mode 100644 lib/jsonrpc-in-normal.h create mode 100644 lib/jsonrpc-in-threaded.c create mode 100644 lib/jsonrpc-in-threaded.h create mode 100644 lib/jsonrpc-in.c create mode 100644 lib/jsonrpc-in.h diff --git a/lib/automake.mk b/lib/automake.mk index 78d6e6516..d002af998 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -181,6 +181,11 @@ lib_libopenvswitch_la_SOURCES = \ lib/json.h \ lib/jsonrpc.c \ lib/jsonrpc.h \ + lib/jsonrpc-in.c \ + lib/jsonrpc-in.h \ + lib/jsonrpc-in-normal.h \ + lib/jsonrpc-in-threaded.c \ + lib/jsonrpc-in-threaded.h \ lib/lacp.c \ lib/lacp.h \ lib/latch.h \ diff --git a/lib/jsonrpc-in-normal.h b/lib/jsonrpc-in-normal.h new file mode 100644 index 000000000..e32a702e2 --- /dev/null +++ b/lib/jsonrpc-in-normal.h @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2025 NVIDIA Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OVS_JSONRPC_IN_NORMAL_H +#define OVS_JSONRPC_IN_NORMAL_H + +#include <config.h> +#include <stdbool.h> +#include "byteq.h" +#include "json.h" +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct jsonrpc_in_normal { + struct byteq input; + uint8_t input_buffer[4096*32]; + struct json_parser *parser; +}; + +static inline void +jsonrpc_in_normal_init(struct jsonrpc_in_normal *nin) { + byteq_init(&nin->input, nin->input_buffer, sizeof nin->input_buffer); + nin->parser = NULL; +} + +static inline void * +jsonrpc_in_normal_read_buffer(struct jsonrpc_in_normal *nin, size_t *size) { + if (byteq_is_empty(&nin->input)) { + *size = byteq_headroom(&nin->input); + return byteq_head(&nin->input); + } else { + *size = 0; + return NULL; + } +} + +static inline void +jsonrpc_in_normal_read_complete(struct jsonrpc_in_normal *nin, size_t size) { + byteq_advance_head(&nin->input, size); +} + +static inline struct json *jsonrpc_in_normal_poll(struct jsonrpc_in_normal *nin) { + size_t n = byteq_tailroom(&nin->input); + if (n != 0) { + if (nin->parser == NULL) { + nin->parser = json_parser_create(0); + } + size_t used = json_parser_feed(nin->parser, (char *)byteq_tail(&nin->input), n); + byteq_advance_tail(&nin->input, used); + } + if (nin->parser != NULL && json_parser_is_done(nin->parser)) { + struct json *json = json_parser_finish(nin->parser); + nin->parser = NULL; + return json; + } + return NULL; +} + +static inline void +jsonrpc_in_normal_cleanup(struct jsonrpc_in_normal *nin) { + json_parser_abort(nin->parser); + nin->parser = NULL; +} + +static inline unsigned int +jsonrpc_in_normal_get_received_bytes(const struct jsonrpc_in_normal *nin) { + return nin->input.head; +} + +static inline int +jsonrpc_in_normal_wait(struct jsonrpc_in_normal *nin) { + return byteq_is_empty(&nin->input) ? JSONRPC_IN_IDLE : JSONRPC_IN_ACTIVE_WAKEUP_NOW; +} + +static inline size_t +jsonrpc_in_normal_fill_stream_report_data(struct jsonrpc_in_normal *nin, void *data, size_t datasz) { + if (nin->input.head < nin->input.size) { + size_t towrite = MIN(datasz, nin->input.head); + memcpy(data, nin->input.buffer, towrite); + return towrite; + } else { + return 0; + } +} + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/jsonrpc-in-threaded.c b/lib/jsonrpc-in-threaded.c new file mode 100644 index 000000000..83798dea3 --- /dev/null +++ b/lib/jsonrpc-in-threaded.c @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2025 NVIDIA Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> + +#include "openvswitch/vlog.h" +#include "jsonrpc-in-threaded.h" +#include "jsonrpc-in.h" + +VLOG_DEFINE_THIS_MODULE(jsonrpc_in_thr); + +static void *jsonrpc_in_threaded_worker(void *dummy); + +void jsonrpc_in_threaded_init(struct jsonrpc_in_threaded *tin) { + ovs_mutex_init(&tin->mutex); + pthread_cond_init(&tin->cond, NULL); + byteq_init(&tin->input, tin->input_buffer, sizeof tin->input_buffer); + tin->parser = NULL; + tin->shutdown = false; + latch_init(&tin->result_latch); + tin->jsons_head = tin->jsons_tail = 0; + pthread_create(&tin->thread, NULL, jsonrpc_in_threaded_worker, tin); +} + +void *jsonrpc_in_threaded_read_buffer(struct jsonrpc_in_threaded *tin, size_t *size) { + void *data = NULL; + ovs_mutex_lock(&tin->mutex); + *size = byteq_headroom(&tin->input); + data = byteq_head(&tin->input); + ovs_mutex_unlock(&tin->mutex); + return data; +} + +void jsonrpc_in_threaded_read_complete(struct jsonrpc_in_threaded *tin, size_t size) { + ovs_mutex_lock(&tin->mutex); + if (byteq_is_empty(&tin->input)) { + /* Only need to signal thread if it was empty + * (condition has changed) */ + pthread_cond_signal(&tin->cond); + } + byteq_advance_head(&tin->input, size); + ovs_mutex_unlock(&tin->mutex); +} + +struct json *jsonrpc_in_threaded_poll(struct jsonrpc_in_threaded *tin) { + struct json *result = NULL; + ovs_mutex_lock(&tin->mutex); + if (tin->jsons_head != tin->jsons_tail) { + result = tin->jsons[tin->jsons_tail % JSON_RPC_IN_NUM_PENDING_JSONS]; + tin->jsons_tail++; + } + ovs_mutex_unlock(&tin->mutex); + return result; +} + +void jsonrpc_in_threaded_cleanup(struct jsonrpc_in_threaded *tin) { + ovs_mutex_lock(&tin->mutex); + tin->shutdown = true; + pthread_cond_signal(&tin->cond); + ovs_mutex_unlock(&tin->mutex); + void *dummy; + pthread_join(tin->thread, &dummy); + while (tin->jsons_tail != tin->jsons_head) { + json_destroy(tin->jsons[tin->jsons_tail % JSON_RPC_IN_NUM_PENDING_JSONS]); + tin->jsons_tail++; + } + latch_destroy(&tin->result_latch); + ovs_mutex_destroy(&tin->mutex); + pthread_cond_destroy(&tin->cond); +} + +unsigned int jsonrpc_in_threaded_get_received_bytes(const struct jsonrpc_in_threaded *tin) { + ovs_mutex_lock(&tin->mutex); + unsigned int result = tin->input.head; + ovs_mutex_unlock(&tin->mutex); + return result; +} + +int jsonrpc_in_threaded_wait(struct jsonrpc_in_threaded *tin) { + ovs_mutex_lock(&tin->mutex); + enum jsonrpc_in_wait_result result = JSONRPC_IN_IDLE; + latch_poll(&tin->result_latch); + if (tin->jsons_head != tin->jsons_tail) { + result = JSONRPC_IN_ACTIVE_WAKEUP_NOW; + } else if (!byteq_is_empty(&tin->input)) { + latch_wait(&tin->result_latch); + if (byteq_headroom(&tin->input) == 0) { + result = JSONRPC_IN_ACTIVE_SLEEP_NO_ROOM; + } else { + result = JSONRPC_IN_ACTIVE_SLEEP_HAS_ROOM; + } + } + ovs_mutex_unlock(&tin->mutex); + return result; +} + +int jsonrpc_in_threaded_status(struct jsonrpc_in_threaded *tin) { + ovs_mutex_lock(&tin->mutex); + enum jsonrpc_in_wait_result result = JSONRPC_IN_IDLE; + if (tin->jsons_head != tin->jsons_tail) { + result = JSONRPC_IN_ACTIVE_WAKEUP_NOW; + } else if (!byteq_is_empty(&tin->input)) { + if (byteq_headroom(&tin->input) == 0) { + result = JSONRPC_IN_ACTIVE_SLEEP_NO_ROOM; + } else { + result = JSONRPC_IN_ACTIVE_SLEEP_HAS_ROOM; + } + } + ovs_mutex_unlock(&tin->mutex); + return result; +} + +static void *jsonrpc_in_threaded_worker(void *tin_raw) { + struct jsonrpc_in_threaded *tin = tin_raw; + for (;;) { + ovs_mutex_lock(&tin->mutex); + /* Wait new data or until reader read json from result ring */ + while ((byteq_is_empty(&tin->input) + || (tin->jsons_head - tin->jsons_tail) + == JSON_RPC_IN_NUM_PENDING_JSONS) + && !tin->shutdown) { + ovs_mutex_cond_wait(&tin->cond, &tin->mutex); + } + if (tin->shutdown) { + ovs_mutex_unlock(&tin->mutex); + break; + } + /* Jsons ring must be able to fit result */ + ovs_assert(tin->jsons_head - tin->jsons_tail < JSON_RPC_IN_NUM_PENDING_JSONS); + + const size_t tail_size = byteq_tailroom(&tin->input); + const char *tail = (const char *)byteq_tail(&tin->input); + ovs_mutex_unlock(&tin->mutex); + if (tin->parser == NULL) { + tin->parser = json_parser_create(0); + } + /* Parsing without mutex is safe because nobody can + * can write data between tail and head. */ + size_t used = json_parser_feed(tin->parser, tail, tail_size); + struct json *json = NULL; + if (json_parser_is_done(tin->parser)) { + json = json_parser_finish(tin->parser); + tin->parser = NULL; + } + ovs_mutex_lock(&tin->mutex); + bool was_staturated = byteq_headroom(&tin->input) == 0; + byteq_advance_tail(&tin->input, used); + if (json != NULL) { + tin->jsons[tin->jsons_head % JSON_RPC_IN_NUM_PENDING_JSONS] = json; + tin->jsons_head++; + ovs_mutex_unlock(&tin->mutex); + } else { + ovs_mutex_unlock(&tin->mutex); + } + if (was_staturated || json != NULL) { + latch_set(&tin->result_latch); + } + } + return NULL; +} + +size_t jsonrpc_in_threaded_fill_stream_report_data(struct jsonrpc_in_threaded *tin, void *data, size_t datasz) { + ovs_mutex_lock(&tin->mutex); + if (tin->input.head < tin->input.size) { + size_t towrite = MIN(datasz, tin->input.head); + memcpy(data, tin->input.buffer, towrite); + ovs_mutex_unlock(&tin->mutex); + return towrite; + } else { + ovs_mutex_unlock(&tin->mutex); + return 0; + } +} diff --git a/lib/jsonrpc-in-threaded.h b/lib/jsonrpc-in-threaded.h new file mode 100644 index 000000000..5d479af13 --- /dev/null +++ b/lib/jsonrpc-in-threaded.h @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2025 NVIDIA Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OVS_JSONRPC_IN_THREADED_H +#define OVS_JSONRPC_IN_THREADED_H + +#include <config.h> +#include <pthread.h> + +#include "openvswitch/thread.h" +#include "byteq.h" +#include "json.h" +#include "latch.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define JSON_RPC_IN_NUM_PENDING_JSONS 16 + +struct jsonrpc_in_threaded { + struct ovs_mutex mutex; + pthread_cond_t cond; + pthread_t thread; + struct byteq input; + uint8_t input_buffer[65536*2]; + struct json_parser *parser; + bool shutdown; + struct latch result_latch; + struct json *jsons[JSON_RPC_IN_NUM_PENDING_JSONS]; + unsigned int jsons_head; + unsigned int jsons_tail; +}; + +void jsonrpc_in_threaded_init(struct jsonrpc_in_threaded *tin); +void *jsonrpc_in_threaded_read_buffer(struct jsonrpc_in_threaded *tin, size_t *size); +void jsonrpc_in_threaded_read_complete(struct jsonrpc_in_threaded *tin, size_t size); +struct json *jsonrpc_in_threaded_poll(struct jsonrpc_in_threaded *tin); +void jsonrpc_in_threaded_cleanup(struct jsonrpc_in_threaded *tin); +unsigned int jsonrpc_in_threaded_get_received_bytes(const struct jsonrpc_in_threaded *tin); +int jsonrpc_in_threaded_wait(struct jsonrpc_in_threaded *tin); +int jsonrpc_in_threaded_status(struct jsonrpc_in_threaded *tin); +bool jsonrpc_in_threaded_is_idle(struct jsonrpc_in_threaded *tin); +size_t jsonrpc_in_threaded_fill_stream_report_data(struct jsonrpc_in_threaded *tin, void *data, size_t datasz); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/jsonrpc-in.c b/lib/jsonrpc-in.c new file mode 100644 index 000000000..d87532a01 --- /dev/null +++ b/lib/jsonrpc-in.c @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2025 NVIDIA Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <string.h> + +#include "openvswitch/thread.h" +#include "jsonrpc-in.h" +#include "jsonrpc-in-normal.h" +#include "jsonrpc-in-threaded.h" +#include "util.h" +#include "json.h" +#include "byteq.h" + +struct jsonrpc_in { + struct jsonrpc_in_config config; + union { + struct jsonrpc_in_normal normal; + struct jsonrpc_in_threaded threaded; + }; +}; + +struct jsonrpc_in *jsonrpc_in_new(const struct jsonrpc_in_config *cfg) { + struct jsonrpc_in *input = xmalloc(sizeof(struct jsonrpc_in)); + input->config = *cfg; + switch (input->config.mode) { + case JSONRPC_IN_MODE_NORMAL: + jsonrpc_in_normal_init(&input->normal); + return input; + case JSONRPC_IN_MODE_THREADED: + jsonrpc_in_threaded_init(&input->threaded); + return input; + } + OVS_NOT_REACHED(); + free(input); + return NULL; +} + +void *jsonrpc_in_read_buffer(struct jsonrpc_in *input, size_t *size) { + switch (input->config.mode) { + case JSONRPC_IN_MODE_NORMAL: + return jsonrpc_in_normal_read_buffer(&input->normal, size); + case JSONRPC_IN_MODE_THREADED: + return jsonrpc_in_threaded_read_buffer(&input->threaded, size); + } + OVS_NOT_REACHED(); + return 0; +} + +void jsonrpc_in_read_complete(struct jsonrpc_in *input, size_t size) { + switch (input->config.mode) { + case JSONRPC_IN_MODE_NORMAL: + jsonrpc_in_normal_read_complete(&input->normal, size); + return; + case JSONRPC_IN_MODE_THREADED: + jsonrpc_in_threaded_read_complete(&input->threaded, size); + return; + } + OVS_NOT_REACHED(); +} + +struct json *jsonrpc_in_poll(struct jsonrpc_in *input) { + switch (input->config.mode) { + case JSONRPC_IN_MODE_NORMAL: + return jsonrpc_in_normal_poll(&input->normal); + case JSONRPC_IN_MODE_THREADED: + return jsonrpc_in_threaded_poll(&input->threaded); + } + OVS_NOT_REACHED(); + return NULL; +} + +void jsonrpc_in_cleanup(struct jsonrpc_in *input) { + switch (input->config.mode) { + case JSONRPC_IN_MODE_NORMAL: + jsonrpc_in_normal_cleanup(&input->normal); + break; + case JSONRPC_IN_MODE_THREADED: + jsonrpc_in_threaded_cleanup(&input->threaded); + break; + } + free(input); +} + +unsigned int jsonrpc_in_get_received_bytes(const struct jsonrpc_in *input) { + switch (input->config.mode) { + case JSONRPC_IN_MODE_NORMAL: + return jsonrpc_in_normal_get_received_bytes(&input->normal); + case JSONRPC_IN_MODE_THREADED: + return jsonrpc_in_threaded_get_received_bytes(&input->threaded); + } + OVS_NOT_REACHED(); + return 0; +} + +enum jsonrpc_in_wait_result jsonrpc_in_wait(struct jsonrpc_in *input) { + switch (input->config.mode) { + case JSONRPC_IN_MODE_NORMAL: + return jsonrpc_in_normal_wait(&input->normal); + case JSONRPC_IN_MODE_THREADED: + return jsonrpc_in_threaded_wait(&input->threaded); + } + OVS_NOT_REACHED(); + return 0; +} + +enum jsonrpc_in_wait_result jsonrpc_in_status(struct jsonrpc_in *input) { + switch (input->config.mode) { + case JSONRPC_IN_MODE_NORMAL: + return jsonrpc_in_normal_wait(&input->normal); /* sic! normal does not really wait */ + case JSONRPC_IN_MODE_THREADED: + return jsonrpc_in_threaded_status(&input->threaded); + } + OVS_NOT_REACHED(); + return 0; +} + +size_t jsonrpc_in_fill_stream_report_data(struct jsonrpc_in *input, void *data, size_t datasz) { + switch (input->config.mode) { + case JSONRPC_IN_MODE_NORMAL: + return jsonrpc_in_normal_fill_stream_report_data(&input->normal, data, datasz); + case JSONRPC_IN_MODE_THREADED: + return jsonrpc_in_threaded_fill_stream_report_data(&input->threaded, data, datasz); + } + OVS_NOT_REACHED(); + return 0; +} + diff --git a/lib/jsonrpc-in.h b/lib/jsonrpc-in.h new file mode 100644 index 000000000..ac227e300 --- /dev/null +++ b/lib/jsonrpc-in.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2025 NVIDIA Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OVS_JSONRPC_IN_H +#define OVS_JSONRPC_IN_H + +#include <config.h> +#include <stdbool.h> + +#ifdef __cplusplus +extern "C" { +#endif + +struct json; +struct jsonrpc_in; + +enum jsonrpc_in_mode { + JSONRPC_IN_MODE_NORMAL, + JSONRPC_IN_MODE_THREADED, +}; + +struct jsonrpc_in_config { + enum jsonrpc_in_mode mode; +}; + +#define JSONRPC_IN_CONFIG_DEFAULT { \ + .mode = JSONRPC_IN_MODE_NORMAL \ +} + +enum jsonrpc_in_wait_result { + JSONRPC_IN_IDLE, + JSONRPC_IN_ACTIVE_WAKEUP_NOW, + JSONRPC_IN_ACTIVE_SLEEP_HAS_ROOM, + JSONRPC_IN_ACTIVE_SLEEP_NO_ROOM, +}; + +struct jsonrpc_in *jsonrpc_in_new(const struct jsonrpc_in_config *cfg); +void *jsonrpc_in_read_buffer(struct jsonrpc_in *input, size_t *size); +void jsonrpc_in_read_complete(struct jsonrpc_in *input, size_t size); +struct json *jsonrpc_in_poll(struct jsonrpc_in *input); +size_t jsonrpc_in_fill_stream_report_data(struct jsonrpc_in *input, void *data, size_t datasz); +void jsonrpc_in_cleanup(struct jsonrpc_in *input); +unsigned int jsonrpc_in_get_received_bytes(const struct jsonrpc_in *input); +enum jsonrpc_in_wait_result jsonrpc_in_wait(struct jsonrpc_in *input); +enum jsonrpc_in_wait_result jsonrpc_in_status(struct jsonrpc_in *input); +/* writes data that can be used as stream report */ +size_t jsonrpc_in_fill_stream_report_data(struct jsonrpc_in *input, void *data, size_t datasz); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index 90723a42b..8294accd4 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -34,6 +34,7 @@ #include "stream.h" #include "svec.h" #include "timeval.h" +#include "jsonrpc-in.h" #include "openvswitch/vlog.h" VLOG_DEFINE_THIS_MODULE(jsonrpc); @@ -46,9 +47,8 @@ struct jsonrpc { int status; /* Input. */ - struct byteq input; - uint8_t input_buffer[4096]; - struct json_parser *parser; + struct jsonrpc_in *input; + int error_received; /* Output. */ struct ovs_list output; /* Contains "struct ofpbuf"s. */ @@ -62,8 +62,10 @@ struct jsonrpc { /* Rate limit for error messages. */ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); +/* Configuration of jsonrpc input for all new sessions */ +static struct jsonrpc_in_config in_cfg = JSONRPC_IN_CONFIG_DEFAULT; -static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *); +static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *, struct json *); static void jsonrpc_cleanup(struct jsonrpc *); static void jsonrpc_error(struct jsonrpc *, int error); @@ -95,7 +97,8 @@ jsonrpc_open(struct stream *stream) rpc = xzalloc(sizeof *rpc); rpc->name = xstrdup(stream_get_name(stream)); rpc->stream = stream; - byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer); + rpc->input = jsonrpc_in_new(&in_cfg); + rpc->error_received = 0; ovs_list_init(&rpc->output); return rpc; @@ -158,6 +161,12 @@ jsonrpc_wait(struct jsonrpc *rpc) stream_send_wait(rpc->stream); } } + +} + +/* Configures jsonrpc input processing for all new sessions */ +void jsonrpc_set_in_config(struct jsonrpc_in_config *cfg) { + in_cfg = *cfg; } /* @@ -203,7 +212,10 @@ jsonrpc_set_backlog_threshold(struct jsonrpc *rpc, unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *rpc) { - return rpc->input.head; + if (rpc->input == NULL) { + return 0; + } + return jsonrpc_in_get_received_bytes(rpc->input); } /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for @@ -328,67 +340,54 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) int jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) { - int i; - *msgp = NULL; if (rpc->status) { return rpc->status; } - for (i = 0; i < 50; i++) { - size_t n, used; - - /* Fill our input buffer if it's empty. */ - if (byteq_is_empty(&rpc->input)) { - size_t chunk; + if (rpc->error_received == 0) { + size_t chunk; + void *data = jsonrpc_in_read_buffer(rpc->input, &chunk); + if (chunk != 0) { int retval; - - byteq_fast_forward(&rpc->input); - chunk = byteq_headroom(&rpc->input); - retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk); - if (retval < 0) { - if (retval == -EAGAIN) { - return EAGAIN; - } else { - VLOG_WARN_RL(&rl, "%s: receive error: %s", - rpc->name, ovs_strerror(-retval)); - jsonrpc_error(rpc, -retval); - return rpc->status; - } + int read_size = 0; + retval = stream_recv(rpc->stream, data, chunk); + if (retval > 0) { + read_size = retval; } else if (retval == 0) { - jsonrpc_error(rpc, EOF); - return EOF; + rpc->error_received = EOF; + } else if (retval != -EAGAIN) { + VLOG_WARN_RL(&rl, "%s: receive error: %s", + rpc->name, ovs_strerror(-retval)); + rpc->error_received = -retval; } - byteq_advance_head(&rpc->input, retval); + jsonrpc_in_read_complete(rpc->input, read_size); } + } - /* We have some input. Feed it into the JSON parser. */ - if (!rpc->parser) { - rpc->parser = json_parser_create(0); + struct json *json = jsonrpc_in_poll(rpc->input); + /* If we have complete JSON, attempt to parse it as JSON-RPC. */ + if (json != NULL) { + *msgp = jsonrpc_parse_received_message(rpc, json); + if (*msgp) { + return 0; } - n = byteq_tailroom(&rpc->input); - used = json_parser_feed(rpc->parser, - (char *) byteq_tail(&rpc->input), n); - byteq_advance_tail(&rpc->input, used); - - /* If we have complete JSON, attempt to parse it as JSON-RPC. */ - if (json_parser_is_done(rpc->parser)) { - *msgp = jsonrpc_parse_received_message(rpc); - if (*msgp) { - return 0; - } - - if (rpc->status) { - const struct byteq *q = &rpc->input; - if (q->head <= q->size) { - stream_report_content(q->buffer, q->head, STREAM_JSONRPC, - &this_module, rpc->name); - } - return rpc->status; + if (rpc->status) { + uint8_t sdata[STREAM_CONTENT_REPORT_MIN_SIZE]; + size_t written = jsonrpc_in_fill_stream_report_data(rpc->input, sdata, sizeof sdata); + if (written != 0) { + stream_report_content(sdata, written, STREAM_JSONRPC, + &this_module, rpc->name); } + return rpc->status; } } + if (rpc->error_received != 0 && jsonrpc_in_status(rpc->input) == JSONRPC_IN_IDLE) { + jsonrpc_error(rpc, rpc->error_received); + return rpc->status; + } + /* We tried hard but didn't get a complete JSON message within the above * iterations. We want to know how often we abort for this reason. */ COVERAGE_INC(jsonrpc_recv_incomplete); @@ -401,10 +400,28 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) void jsonrpc_recv_wait(struct jsonrpc *rpc) { - if (rpc->status || !byteq_is_empty(&rpc->input)) { + if (rpc->status) { poll_immediate_wake_at(rpc->name); - } else { - stream_recv_wait(rpc->stream); + return; + } + switch (jsonrpc_in_wait(rpc->input)) { + case JSONRPC_IN_IDLE: + if (rpc->error_received == 0) { + stream_recv_wait(rpc->stream); + } else { + poll_immediate_wake_at(rpc->name); + } + break; + case JSONRPC_IN_ACTIVE_WAKEUP_NOW: + poll_immediate_wake_at(rpc->name); + break; + case JSONRPC_IN_ACTIVE_SLEEP_HAS_ROOM: + if (rpc->error_received == 0) { + stream_recv_wait(rpc->stream); + } + break; + case JSONRPC_IN_ACTIVE_SLEEP_NO_ROOM: + break; } } @@ -495,14 +512,10 @@ jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request, * JSON-RPC message. If successful, returns the JSON-RPC message. On failure, * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */ static struct jsonrpc_msg * -jsonrpc_parse_received_message(struct jsonrpc *rpc) +jsonrpc_parse_received_message(struct jsonrpc *rpc, struct json *json) { struct jsonrpc_msg *msg; - struct json *json; char *error; - - json = json_parser_finish(rpc->parser); - rpc->parser = NULL; if (json->type == JSON_STRING) { VLOG_WARN_RL(&rl, "%s: error parsing stream: %s", rpc->name, json_string(json)); @@ -540,8 +553,10 @@ jsonrpc_cleanup(struct jsonrpc *rpc) stream_close(rpc->stream); rpc->stream = NULL; - json_parser_abort(rpc->parser); - rpc->parser = NULL; + if (rpc->input != NULL) { + jsonrpc_in_cleanup(rpc->input); + } + rpc->input = NULL; ofpbuf_list_delete(&rpc->output); rpc->backlog = 0; diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h index 1baffcd80..8aa75746f 100644 --- a/lib/jsonrpc.h +++ b/lib/jsonrpc.h @@ -25,6 +25,7 @@ #include "openvswitch/types.h" struct json; +struct jsonrpc_in_config; struct jsonrpc_msg; struct pstream; struct reconnect_stats; @@ -49,6 +50,8 @@ void jsonrpc_close(struct jsonrpc *); void jsonrpc_run(struct jsonrpc *); void jsonrpc_wait(struct jsonrpc *); +void jsonrpc_set_in_config(struct jsonrpc_in_config *cfg); + int jsonrpc_get_status(const struct jsonrpc *); size_t jsonrpc_get_backlog(const struct jsonrpc *); void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs, diff --git a/lib/stream.c b/lib/stream.c index aa48a973b..3b31e3c01 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -799,7 +799,7 @@ stream_parse_target_with_default_port(const char *target, int default_port, static enum stream_content_type stream_guess_content(const uint8_t *data, ssize_t size) { - if (size >= 2) { + if (size >= STREAM_CONTENT_REPORT_MIN_SIZE) { #define PAIR(A, B) (((A) << 8) | (B)) switch (PAIR(data[0], data[1])) { case PAIR(0x16, 0x03): /* Handshake, version 3. */ diff --git a/lib/stream.h b/lib/stream.h index e30c51275..57cc43e27 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -91,6 +91,9 @@ enum stream_content_type { STREAM_JSONRPC }; +/* Number of bytes required to guess content type when reporting content */ +#define STREAM_CONTENT_REPORT_MIN_SIZE 2 + void stream_report_content(const void *, ssize_t, enum stream_content_type, struct vlog_module *, const char *stream_name); diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c index a247ae8f0..40a4dada1 100644 --- a/ovsdb/ovsdb-server.c +++ b/ovsdb/ovsdb-server.c @@ -34,6 +34,7 @@ #include "hash.h" #include "openvswitch/json.h" #include "jsonrpc.h" +#include "jsonrpc-in.h" #include "jsonrpc-server.h" #include "openvswitch/list.h" #include "memory.h" @@ -730,6 +731,7 @@ main(int argc, char *argv[]) struct shash remotes = SHASH_INITIALIZER(&remotes); char *sync_from = NULL, *sync_exclude = NULL; bool is_backup; + struct jsonrpc_in_config jsonrpc_in_config = JSONRPC_IN_CONFIG_DEFAULT; struct server_config server_config = { .remotes = &remotes, @@ -777,6 +779,8 @@ main(int argc, char *argv[]) perf_counters_init(); + jsonrpc_in_config.mode = JSONRPC_IN_MODE_THREADED; + jsonrpc_set_in_config(&jsonrpc_in_config); /* Start ovsdb jsonrpc server. Both read and write transactions are * allowed by default, individual remotes and databases will be configured * as read-only, if necessary. */ -- 2.43.0 _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev