7. Various minor fixes to enable async I/O: make rx errors
visible to tx and vice versa, make activity tracking for
reconnect async-friendly, etc.
8. Enable async I/O in ovsdb.
Signed-off-by: Anton Ivanov <[email protected]>
---
lib/async-io.c | 521 +++++++++++++++++++++++++++++++++++++++++
lib/async-io.h | 86 +++++++
lib/automake.mk | 2 +
lib/jsonrpc.c | 151 +++++-------
lib/stream-fd.c | 85 +++++++
lib/stream-provider.h | 34 ++-
lib/stream-ssl.c | 64 ++++-
lib/stream-tcp.c | 2 +
lib/stream-unix.c | 2 +
lib/stream-windows.c | 2 +
ovsdb/jsonrpc-server.c | 33 ++-
ovsdb/ovsdb-server.c | 2 +
12 files changed, 873 insertions(+), 111 deletions(-)
create mode 100644 lib/async-io.c
create mode 100644 lib/async-io.h
diff --git a/lib/async-io.c b/lib/async-io.c
new file mode 100644
index 000000000..39b39301c
--- /dev/null
+++ b/lib/async-io.c
@@ -0,0 +1,521 @@
+/*
+ * Copyright (c) 2020 Red Hat Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream-provider.h"
+#include <errno.h>
+#include <unistd.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include "coverage.h"
+#include "fatal-signal.h"
+#include "flow.h"
+#include "jsonrpc.h"
+#include "openflow/nicira-ext.h"
+#include "openflow/openflow.h"
+#include "openvswitch/dynamic-string.h"
+#include "openvswitch/ofp-print.h"
+#include "openvswitch/ofpbuf.h"
+#include "openvswitch/vlog.h"
+#include "ovs-thread.h"
+#include "ovs-atomic.h"
+#include "packets.h"
+#include "openvswitch/poll-loop.h"
+#include "random.h"
+#include "socket-util.h"
+#include "util.h"
+#include "timeval.h"
+#include "async-io.h"
+#include "ovs-numa.h"
+
+VLOG_DEFINE_THIS_MODULE(async_io);
+/* Set by async_io_enable(); copied into each stream's async_mode. */
+static bool allow_async_io = false;
+/* async_io_setup is guarded by init_mutex; kill_async_io stops helpers. */
+static bool async_io_setup = false;
+static bool kill_async_io = false; /* NOTE(review): not atomic. */
+/* Serializes add_pool() and the one-time setup. */
+static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
+/* All pools, walked by async_io_hook() at exit. */
+static struct ovs_list io_pools = OVS_LIST_INITIALIZER(&io_pools);
+/* Helper threads per pool, computed by setup_async_io(). */
+static int pool_size;
+/* Default pool, created lazily by async_init_data(). */
+static struct async_io_pool *io_pool = NULL;
+/* Used by both the synchronous path and the helper threads. */
+static int do_async_recv(struct async_data *data);
+static int do_stream_flush(struct async_data *data);
+
+/* True while 'data' is valid and neither direction has failed or hit EOF. */
+static inline bool
+not_in_error(struct async_data *data)
+{
+    int rx_error, tx_error;
+    if (!data->valid) {
+        return false;
+    }
+    atomic_read_relaxed(&data->rx_error, &rx_error);
+    atomic_read_relaxed(&data->tx_error, &tx_error);
+    /* rx: >0 buffered bytes or -EAGAIN; 0 is EOF.  tx: >=0 or -EAGAIN. */
+    return (rx_error > 0 || rx_error == -EAGAIN)
+           && (tx_error >= 0 || tx_error == -EAGAIN);
+}
+
+static inline bool
+in_error(struct async_data *data)
+{
+    return !not_in_error(data);
+}
+
+static void *default_async_io_helper(void *arg) { /* helper thread body */
+ struct async_io_control *io_control =
+ (struct async_io_control *) arg;
+ struct async_data *data;
+ int retval;
+ /* Each pass services every async_data on this control's work list. */
+ do {
+ ovs_mutex_lock(&io_control->mutex);
+ latch_poll(&io_control->async_latch); /* consume pending kicks */
+ LIST_FOR_EACH (data, list_node, &io_control->work_items) {
+ long backlog, oldbacklog;
+ ovs_mutex_lock(&data->mutex);
+ retval = -EAGAIN;
+ if (not_in_error(data)) {
+ /*
+ * Stop reading while the input queue is full; retry in 1 ms.
+ */
+ if (byteq_headroom(&data->input) != 0) {
+ retval = do_async_recv(data);
+ } else {
+ poll_timer_wait(1);
+ retval = 0;
+ }
+ }
+ if (not_in_error(data) && (retval > 0 || retval == -EAGAIN)) {
+ stream_recv_wait(data->stream);
+ }
+ atomic_read_relaxed(&data->backlog, &oldbacklog); /* before flush */
+ if (not_in_error(data)) {
+ stream_run(data->stream);
+ do_stream_flush(data);
+ }
+ atomic_read_relaxed(&data->backlog, &backlog); /* after flush */
+ if (not_in_error(data)) {
+ if (backlog) {
+ /* Output still pending: wake when the stream is writable.
+ * Upper layers are expected to hold off rx processing
+ * until tx is clear (assumption from the original note).
+ */
+ stream_send_wait(data->stream);
+ }
+ if (!byteq_is_empty(&data->input) || oldbacklog) {
+ latch_set(&data->rx_notify);
+ }
+ }
+ if (data->valid && in_error(data)) {
+ /* Make sure the user thread(s) notice the error.  This must
+ * not be an 'else': the error state may have changed in the
+ * blocks above.
+ */
+ latch_set(&data->rx_notify);
+ data->valid = false; /* notify only once */
+ }
+ if (not_in_error(data)) {
+ stream_run_wait(data->stream);
+ }
+ ovs_mutex_unlock(&data->mutex);
+ }
+ ovs_mutex_unlock(&io_control->mutex);
+ latch_wait(&io_control->async_latch); /* wake on the next kick */
+ poll_block();
+ } while (!kill_async_io); /* NOTE(review): plain bool shared with hook */
+ return arg;
+}
+
+/* Exit hook: stop/wake helpers; never close latches they may be polling. */
+static void async_io_hook(void *aux OVS_UNUSED) {
+    struct async_io_pool *pool;
+
+    kill_async_io = true;
+    LIST_FOR_EACH (pool, list_node, &io_pools) {
+        for (int i = 0; i < pool->size; i++) {
+            latch_set(&pool->controls[i].async_latch);
+        }
+    }
+}
+
+/* Sizes the helper pool (cores per NUMA node) and installs the exit hook. */
+static void setup_async_io(void) {
+    int cores, nodes;
+    nodes = ovs_numa_get_n_numas();
+    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
+        nodes = 1;
+    }
+    cores = ovs_numa_get_n_cores();
+    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
+        pool_size = 4;
+    } else {
+        pool_size = MAX(1, cores / nodes); /* 0 would divide by zero later */
+    }
+    fatal_signal_add_hook(async_io_hook, NULL, NULL, true);
+    async_io_setup = true;
+}
+
+/* Creates a pool of 'pool_size' helper threads, each running 'start' on
+ * its own async_io_control, and registers it for the exit hook. */
+struct async_io_pool *
+add_pool(void *(*start)(void *))
+{
+    struct async_io_pool *new_pool;
+
+    ovs_mutex_lock(&init_mutex);
+    if (!async_io_setup) {
+        setup_async_io();
+    }
+
+    new_pool = xmalloc(sizeof *new_pool);
+    new_pool->size = pool_size; /* we may make this more dynamic later */
+    new_pool->controls = xcalloc(new_pool->size, sizeof *new_pool->controls);
+    for (int i = 0; i < new_pool->size; i++) {
+        struct async_io_control *io_control = &new_pool->controls[i];
+
+        latch_init(&io_control->async_latch);
+        ovs_mutex_init(&io_control->mutex);
+        ovs_list_init(&io_control->work_items);
+    }
+
+    for (int i = 0; i < new_pool->size; i++) {
+        ovs_thread_create("async io helper", start, &new_pool->controls[i]);
+    }
+    /* Publish only when fully initialized: the exit hook walks this list. */
+    ovs_list_push_back(&io_pools, &new_pool->list_node);
+    ovs_mutex_unlock(&init_mutex);
+    return new_pool;
+}
+
+/* Initializes 'data' for 'stream'.  In async mode it is also attached to
+ * a helper thread of the default pool, which is created on first use. */
+void
+async_init_data(struct async_data *data, struct stream *stream)
+{
+    struct async_io_control *target_control;
+    unsigned int buffer_size;
+    void *buffer;
+
+    data->stream = stream;
+#ifdef __linux__
+    buffer_size = getpagesize();
+    if (!is_pow2(buffer_size)) {
+        buffer_size = ASYNC_BUFFER_SIZE;
+    }
+#else
+    buffer_size = ASYNC_BUFFER_SIZE;
+#endif
+#if (_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600)
+    /* Prefer a buffer aligned to its own size; fall back to xmalloc(). */
+    if (posix_memalign(&buffer, buffer_size, buffer_size)) {
+        buffer = xmalloc(buffer_size);
+    }
+#else
+    buffer = xmalloc(buffer_size);
+#endif
+    data->input_buffer = buffer;
+    byteq_init(&data->input, data->input_buffer, buffer_size);
+    ovs_list_init(&data->output);
+    data->output_count = 0;
+    atomic_init(&data->rx_error, -EAGAIN);
+    atomic_init(&data->tx_error, 0);
+    atomic_init(&data->active, false);
+    atomic_init(&data->backlog, 0);
+    ovs_mutex_init(&data->mutex);
+    data->async_mode = allow_async_io;
+    data->valid = true;
+    if (data->async_mode) {
+        if (!io_pool) {
+            io_pool = add_pool(default_async_io_helper);
+        }
+        data->async_id = random_uint32();
+        target_control = &io_pool->controls[data->async_id % io_pool->size];
+        /* Latches are plain fd pairs, so a by-value copy shares the helper
+         * thread's wakeup channel. */
+        data->tx_run_notify = target_control->async_latch;
+        latch_init(&data->rx_notify);
+        ovs_mutex_lock(&target_control->mutex);
+        ovs_list_push_back(&target_control->work_items, &data->list_node);
+        ovs_mutex_unlock(&target_control->mutex);
+        latch_set(&target_control->async_latch);
+    }
+}
+/* NOTE(review): async mode also needs async_init_data()'s registration. */
+void
+async_stream_enable(struct async_data *data)
+{
+ data->async_mode = allow_async_io;
+}
+/* Leaves async mode (after one bounded flush attempt); frees input buffer. */
+void
+async_stream_disable(struct async_data *data)
+{
+ struct async_io_control *target_control;
+ bool needs_wake = false;
+ /* NOTE(review): input_buffer is freed below while data->input still
+ * points at it; jsonrpc_recv() may read q->buffer afterwards. Verify. */
+ if (data->async_mode) {
+ if (not_in_error(data) && (async_get_backlog(data) > 0)) {
+ needs_wake = true;
+ latch_poll(&data->rx_notify);
+ latch_wait(&data->rx_notify);
+ latch_set(&data->tx_run_notify);
+ /* limit this to 50ms - should be enough for
+ * a single flush and we will not get stuck here
+ * waiting for a send to complete
+ */
+ poll_timer_wait(50);
+ poll_block();
+ }
+ if (needs_wake) {
+ /* we have lost all poll-wait info because we block()-ed
+ * locally, we need to force the upper layers to rerun so
+ * that they reinstate the correct waits
+ */
+ poll_immediate_wake();
+ }
+ target_control = &io_pool->controls[data->async_id % io_pool->size];
+ ovs_mutex_lock(&target_control->mutex);
+ ovs_list_remove(&data->list_node); /* helper no longer sees 'data' */
+ ovs_mutex_unlock(&target_control->mutex);
+ data->async_mode = false; /* later calls take the sync path */
+ latch_destroy(&data->rx_notify);
+ }
+ if (data->input_buffer) {
+ free(data->input_buffer);
+ data->input_buffer = NULL;
+ }
+}
+
+/* Frees any output still queued on 'data' and zeroes the counters.
+ * ofpbuf_list_delete() is a no-op on an empty list, so no guard is needed. */
+void
+async_cleanup_data(struct async_data *data)
+{
+    ofpbuf_list_delete(&data->output);
+    atomic_store_relaxed(&data->backlog, 0);
+    data->output_count = 0;
+}
+
+/* Routines intended for async IO */
+
+/* Queues 'buf' (if nonnull, taking ownership); returns the backlog. */
+long async_stream_enqueue(struct async_data *data, struct ofpbuf *buf) {
+    long backlog = -EAGAIN, unused;
+
+    ovs_mutex_lock(&data->mutex);
+    if (buf != NULL) {
+        data->output_count++;
+        ovs_list_push_back(&data->output, &buf->list_node);
+        atomic_add_relaxed(&data->backlog, buf->size, &unused);
+        atomic_thread_fence(memory_order_release);
+    }
+    atomic_read_relaxed(&data->backlog, &backlog);
+    ovs_mutex_unlock(&data->mutex);
+    return backlog;
+}
+/* Pushes queued output into the stream; the result is kept in tx_error. */
+static int do_stream_flush(struct async_data *data) {
+ struct ofpbuf *buf;
+ int count = 0;
+ bool stamp = false;
+ int retval = -stream_connect(data->stream); /* 0 once connected */
+ long discard;
+
+ if (!retval) {
+ while (!ovs_list_is_empty(&data->output) && count < 10) { /* bounded */
+ buf = ofpbuf_from_list(data->output.next);
+ if (data->stream->class->enqueue) { /* stream has its own queue */
+ ovs_list_remove(&buf->list_node);
+ retval = (data->stream->class->enqueue)(data->stream, buf);
+ if (retval > 0) {
+ data->output_count--; /* stream owns buf now */
+ } else {
+ ovs_list_push_front(&data->output, &buf->list_node); /* keep it */
+ }
+ } else { /* plain stream_send() path */
+ retval = stream_send(data->stream, buf->data, buf->size);
+ if (retval > 0) {
+ stamp = true;
+ atomic_sub_relaxed(&data->backlog, retval, &discard);
+ ofpbuf_pull(buf, retval);
+ if (!buf->size) {
+ /* fully sent: unlink and free it */
+ ovs_list_remove(&buf->list_node);
+ data->output_count--;
+ ofpbuf_delete(buf);
+ }
+ }
+ }
+ if (retval <= 0) {
+ break;
+ }
+ count++;
+ }
+ if (data->stream->class->flush && (retval >= 0 || retval == -EAGAIN)) {
+ (data->stream->class->flush)(data->stream, &retval); /* bytes done */
+ if (retval > 0) {
+ stamp = true;
+ atomic_sub_relaxed(&data->backlog, retval, &discard);
+ }
+ }
+ if (stamp) {
+ atomic_store_relaxed(&data->active, true); /* see async_get_active() */
+ }
+ }
+ atomic_store_relaxed(&data->tx_error, retval);
+ return retval;
+}
+
+/* Flushes queued output.  Sync mode flushes inline; async mode only kicks
+ * the helper thread and reports the last tx status it recorded. */
+int
+async_stream_flush(struct async_data *data)
+{
+    int status;
+
+    if (!data->async_mode) {
+        return do_stream_flush(data);
+    }
+    atomic_read_relaxed(&data->tx_error, &status);
+    /* Report "busy" instead of success so callers do not retry in a loop:
+     * the helper thread drains the backlog in the background. */
+    status = status >= 0 ? -EAGAIN : status;
+    if (async_get_backlog(data) != 0) {
+        latch_set(&data->tx_run_notify);
+    }
+    return status;
+}
+
+/* Reads into data->input, then stores in rx_error and returns the bytes
+ * buffered (> 0), -EAGAIN, 0 for EOF or a negative errno value. */
+static int do_async_recv(struct async_data *data) {
+    int retval;
+
+    atomic_read_relaxed(&data->rx_error, &retval);
+    if (retval > 0 || retval == -EAGAIN) {
+        size_t chunk = byteq_headroom(&data->input);
+        if (chunk > 0) {
+            retval = stream_recv(data->stream, byteq_head(&data->input), chunk);
+            if (retval > 0) {
+                byteq_advance_head(&data->input, retval);
+            } else if (!retval && !byteq_is_empty(&data->input)) {
+                /* EOF: report it only after buffered input is consumed. */
+                retval = -EAGAIN;
+            }
+        }
+    }
+    if (retval > 0 || retval == -EAGAIN) {
+        retval = byteq_used(&data->input);
+        retval = retval ? retval : -EAGAIN;
+    }
+    atomic_store_relaxed(&data->rx_error, retval);
+    return retval;
+}
+
+/* Returns bytes buffered in the input queue, -EAGAIN, 0 (EOF) or -errno. */
+int
+async_stream_recv(struct async_data *data)
+{
+    int status;
+
+    if (!data->async_mode) {
+        return do_async_recv(data);
+    }
+    atomic_read_relaxed(&data->rx_error, &status);
+    latch_poll(&data->rx_notify);          /* consume the helper's wakeup */
+    if (status > 0 || status == -EAGAIN) {
+        /* Report what is buffered now, not what the helper last saw. */
+        int used = byteq_used(&data->input);
+
+        status = used ? used : -EAGAIN;
+    }
+    return status;
+}
+
+/* Runs the stream inline, or wakes its helper thread in async mode. */
+void async_stream_run(struct async_data *data) {
+    if (data->async_mode) {
+        latch_set(&data->tx_run_notify);
+    } else {
+        stream_run(data->stream);
+    }
+}
+void async_io_kick(struct async_data *data) { /* wakes the helper thread */
+    if (!data->async_mode) {
+        return;            /* sync mode: nothing runs in the background */
+    }
+    latch_set(&data->tx_run_notify);
+}
+void async_recv_wait(struct async_data *data) { /* arm the rx wakeup */
+ if (data->async_mode) {
+ latch_poll(&data->rx_notify); /* NOTE(review): drops a pending wakeup */
+ latch_wait(&data->rx_notify); /* wake on the next helper notification */
+ } else {
+ stream_recv_wait(data->stream);
+ }
+}
+/* Streams initialized after this call are serviced by helper threads. */
+void async_io_enable(void)
+{
+    allow_async_io = true;
+}
+/* Accessors for JSON RPC: the receive queue and the underlying stream. */
+struct byteq *async_get_input(struct async_data *data)
+{
+    return &data->input;
+}
+struct stream *async_get_stream(struct async_data *data)
+{
+    return data->stream;
+}
+bool async_output_is_empty(struct async_data *data) {
+    bool retval;
+
+    /* 'backlog' counts bytes queued anywhere in the stack (our list or the
+     * stream's own queue) and not yet reported sent, so it, rather than
+     * data->output, tells whether output is pending.  Read it through the
+     * same atomic accessor as every other user. */
+    ovs_mutex_lock(&data->mutex);
+    retval = async_get_backlog(data) == 0;
+    ovs_mutex_unlock(&data->mutex);
+
+    return retval;
+}
+
+long async_get_backlog(struct async_data *data) {
+    long retval;
+    /* Bytes enqueued but not yet reported as sent.  Read atomically,
+     * without taking data->mutex; callers include jsonrpc_get_backlog()
+     * and the flush, disable and cleanup paths. */
+    atomic_read_relaxed(&data->backlog, &retval);
+    return retval;
+}
+/* Test-and-clears the tx-progress flag that do_stream_flush() sets. */
+bool async_get_active(struct async_data *data) {
+    bool expected = true; /* strong CAS: weak may fail spuriously */
+    return atomic_compare_exchange_strong(&data->active, &expected, false);
+}
diff --git a/lib/async-io.h b/lib/async-io.h
new file mode 100644
index 000000000..dea070ee6
--- /dev/null
+++ b/lib/async-io.h
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ASYNC_IO_H
+#define ASYNC_IO_H 1
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include "openvswitch/types.h"
+#include "openvswitch/ofpbuf.h"
+#include "socket-util.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "latch.h"
+#include "byteq.h"
+#include "util.h"
+
+#define ASYNC_BUFFER_SIZE (4096) /* Fallback input buffer size. */
+
+struct stream;
+
+struct async_data {
+    struct stream *stream;            /* Underlying stream. */
+    struct ovs_list output;           /* Queued "struct ofpbuf"s. */
+    struct ovs_list list_node;        /* In a helper's 'work_items'. */
+    atomic_long backlog;              /* Bytes not yet reported sent. */
+    size_t output_count;              /* Number of buffers in 'output'. */
+    atomic_bool active;               /* Set on tx progress. */
+    atomic_int rx_error, tx_error;    /* Last rx/tx result or -errno. */
+    uint32_t async_id;                /* Picks the helper thread. */
+    struct latch rx_notify, tx_run_notify; /* helper->user, user->helper */
+    struct ovs_mutex mutex;
+    bool async_mode, valid;           /* 'valid' is cleared on error. */
+    struct byteq input;               /* Received, unparsed bytes. */
+    uint8_t *input_buffer;            /* Storage behind 'input'. */
+};
+
+struct async_io_control {
+ struct latch async_latch; /* Wakes this helper thread. */
+ struct ovs_list work_items; /* async_data served by this thread. */
+ struct ovs_mutex mutex; /* Protects 'work_items'. */
+};
+
+struct async_io_pool {
+ struct ovs_list list_node; /* In the global pool list (io_pools). */
+ struct async_io_control *controls; /* One per helper thread. */
+ int size; /* Number of helper threads. */
+};
+
+struct async_io_pool *add_pool(void *(*start)(void *)); /* 1 thread/control */
+/* Data path, used by jsonrpc. */
+long async_stream_enqueue(struct async_data *, struct ofpbuf *buf); /* owns buf */
+int async_stream_flush(struct async_data *);
+int async_stream_recv(struct async_data *);
+struct byteq *async_get_input(struct async_data *);
+struct stream *async_get_stream(struct async_data *);
+bool async_output_is_empty(struct async_data *);
+long async_get_backlog(struct async_data *);
+bool async_get_active(struct async_data *); /* test-and-clear */
+/* Switching a stream in and out of async mode. */
+void async_stream_enable(struct async_data *);
+void async_stream_disable(struct async_data *);
+/* Lifecycle and helper-thread wakeups. */
+void async_init_data(struct async_data *, struct stream *);
+void async_cleanup_data(struct async_data *);
+void async_stream_run(struct async_data *data);
+void async_io_kick(struct async_data *data);
+void async_recv_wait(struct async_data *data);
+void async_io_enable(void);
+
+#endif /* async-io.h */
diff --git a/lib/automake.mk b/lib/automake.mk
index 39ff70650..d00a9a2ff 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -24,6 +24,8 @@ lib_libopenvswitch_la_SOURCES = \
lib/aes128.c \
lib/aes128.h \
lib/async-append.h \
+ lib/async-io.h \
+ lib/async-io.c \
lib/backtrace.c \
lib/backtrace.h \
lib/bfd.c \
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index 830b9910f..0ab83a0d7 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -30,28 +30,23 @@
#include "openvswitch/poll-loop.h"
#include "reconnect.h"
#include "stream.h"
+#include "stream-provider.h"
#include "svec.h"
#include "timeval.h"
+#include "async-io.h"
#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(jsonrpc);
struct jsonrpc {
- struct stream *stream;
char *name;
int status;
-
- /* Input. */
- struct byteq input;
- uint8_t input_buffer[4096];
struct json_parser *parser;
-
- /* Output. */
- struct ovs_list output; /* Contains "struct ofpbuf"s. */
- size_t output_count; /* Number of elements in "output". */
- size_t backlog;
+ struct async_data data;
};
+#define MIN_IDLE_TIME 10
+
/* Rate limit for error messages. */
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
@@ -59,6 +54,11 @@ static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
static void jsonrpc_cleanup(struct jsonrpc *);
static void jsonrpc_error(struct jsonrpc *, int error);
+static inline struct async_data *adata(struct jsonrpc *rpc) {
+ return &rpc->data;
+}
+
+
/* This is just the same as stream_open() except that it uses the default
* JSONRPC port if none is specified. */
int
@@ -86,10 +86,8 @@ jsonrpc_open(struct stream *stream)
rpc = xzalloc(sizeof *rpc);
rpc->name = xstrdup(stream_get_name(stream));
- rpc->stream = stream;
- byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
- ovs_list_init(&rpc->output);
-
+ async_init_data(adata(rpc), stream);
+ async_stream_enable(adata(rpc));
return rpc;
}
@@ -109,33 +107,22 @@ jsonrpc_close(struct jsonrpc *rpc)
void
jsonrpc_run(struct jsonrpc *rpc)
{
+ int retval;
if (rpc->status) {
return;
}
- stream_run(rpc->stream);
- while (!ovs_list_is_empty(&rpc->output)) {
- struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
- int retval;
-
- retval = stream_send(rpc->stream, buf->data, buf->size);
- if (retval >= 0) {
- rpc->backlog -= retval;
- ofpbuf_pull(buf, retval);
- if (!buf->size) {
- ovs_list_remove(&buf->list_node);
- rpc->output_count--;
- ofpbuf_delete(buf);
- }
- } else {
+ async_stream_run(adata(rpc));
+ do {
+ retval = async_stream_flush(&rpc->data);
+ if (retval < 0) {
if (retval != -EAGAIN) {
VLOG_WARN_RL(&rl, "%s: send error: %s",
rpc->name, ovs_strerror(-retval));
jsonrpc_error(rpc, -retval);
}
- break;
}
- }
+ } while (retval > 0);
}
/* Arranges for the poll loop to wake up when 'rpc' needs to perform
@@ -144,9 +131,13 @@ void
jsonrpc_wait(struct jsonrpc *rpc)
{
if (!rpc->status) {
- stream_run_wait(rpc->stream);
- if (!ovs_list_is_empty(&rpc->output)) {
- stream_send_wait(rpc->stream);
+ if (adata(rpc)->async_mode) {
+ async_recv_wait(adata(rpc));
+ } else {
+ stream_run_wait(rpc->data.stream);
+ if (!async_output_is_empty(adata(rpc))) {
+ stream_send_wait(async_get_stream(adata(rpc)));
+ }
}
}
}
@@ -175,7 +166,7 @@ jsonrpc_get_status(const struct jsonrpc *rpc)
size_t
jsonrpc_get_backlog(const struct jsonrpc *rpc)
{
- return rpc->status ? 0 : rpc->backlog;
+ return rpc->status ? 0 : async_get_backlog(adata((struct jsonrpc *) rpc));
}
/* Returns the number of bytes that have been received on 'rpc''s underlying
@@ -183,7 +174,7 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
unsigned int
jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
{
- return rpc->input.head;
+ return async_get_input(adata((struct jsonrpc *) rpc))->head;
}
/* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
@@ -234,13 +225,13 @@ jsonrpc_log_msg(const struct jsonrpc *rpc, const char
*title,
* buffered in 'rpc'.)
*
* Always takes ownership of 'msg', regardless of success. */
+
int
jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
{
struct ofpbuf *buf;
struct json *json;
struct ds ds = DS_EMPTY_INITIALIZER;
- size_t length;
if (rpc->status) {
jsonrpc_msg_destroy(msg);
@@ -251,24 +242,13 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
json = jsonrpc_msg_to_json(msg);
json_to_ds(json, 0, &ds);
- length = ds.length;
json_destroy(json);
buf = xmalloc(sizeof *buf);
ofpbuf_use_ds(buf, &ds);
- ovs_list_push_back(&rpc->output, &buf->list_node);
- rpc->output_count++;
- rpc->backlog += length;
-
- if (rpc->output_count >= 50) {
- VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
- " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
- rpc->output_count, rpc->backlog);
- }
+ async_stream_enqueue(adata(rpc), buf);
- if (rpc->backlog == length) {
- jsonrpc_run(rpc);
- }
+ jsonrpc_run(rpc);
return rpc->status;
}
@@ -291,7 +271,7 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
int
jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
{
- int i;
+ int i, retval;
*msgp = NULL;
if (rpc->status) {
@@ -302,36 +282,32 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg
**msgp)
size_t n, used;
/* Fill our input buffer if it's empty. */
- if (byteq_is_empty(&rpc->input)) {
- size_t chunk;
- int retval;
-
- chunk = byteq_headroom(&rpc->input);
- retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
- if (retval < 0) {
- if (retval == -EAGAIN) {
- return EAGAIN;
- } else {
- VLOG_WARN_RL(&rl, "%s: receive error: %s",
- rpc->name, ovs_strerror(-retval));
- jsonrpc_error(rpc, -retval);
- return rpc->status;
- }
- } else if (retval == 0) {
- jsonrpc_error(rpc, EOF);
- return EOF;
+ retval = async_stream_recv(adata(rpc));
+ if (retval < 0) {
+ if (retval == -EAGAIN) {
+ return EAGAIN;
+ } else {
+ VLOG_WARN_RL(&rl, "%s: receive error: %s",
+ rpc->name, ovs_strerror(-retval));
+ jsonrpc_error(rpc, -retval);
+ return rpc->status;
}
- byteq_advance_head(&rpc->input, retval);
+ } else if (retval == 0) {
+ jsonrpc_error(rpc, EOF);
+ return EOF;
}
/* We have some input. Feed it into the JSON parser. */
if (!rpc->parser) {
rpc->parser = json_parser_create(0);
}
- n = byteq_tailroom(&rpc->input);
+ n = byteq_tailroom(async_get_input(adata(rpc)));
+ if (n == 0) {
+ break;
+ }
used = json_parser_feed(rpc->parser,
- (char *) byteq_tail(&rpc->input), n);
- byteq_advance_tail(&rpc->input, used);
+ (char *) byteq_tail(async_get_input(adata(rpc))), n);
+ byteq_advance_tail(async_get_input(adata(rpc)), used);
/* If we have complete JSON, attempt to parse it as JSON-RPC. */
if (json_parser_is_done(rpc->parser)) {
@@ -341,7 +317,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
}
if (rpc->status) {
- const struct byteq *q = &rpc->input;
+ const struct byteq *q = async_get_input(adata(rpc));
if (q->head <= q->size) {
stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
&this_module, rpc->name);
@@ -359,10 +335,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg
**msgp)
void
jsonrpc_recv_wait(struct jsonrpc *rpc)
{
- if (rpc->status || !byteq_is_empty(&rpc->input)) {
+ if (rpc->status || !byteq_is_empty(async_get_input(adata(rpc)))) {
poll_immediate_wake_at(rpc->name);
} else {
- stream_recv_wait(rpc->stream);
+ async_recv_wait(adata(rpc));
}
}
@@ -385,7 +361,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
for (;;) {
jsonrpc_run(rpc);
- if (ovs_list_is_empty(&rpc->output) || rpc->status) {
+ if (async_output_is_empty(adata(rpc)) || rpc->status) {
return rpc->status;
}
jsonrpc_wait(rpc);
@@ -495,15 +471,14 @@ jsonrpc_error(struct jsonrpc *rpc, int error)
static void
jsonrpc_cleanup(struct jsonrpc *rpc)
{
- stream_close(rpc->stream);
- rpc->stream = NULL;
+ async_stream_disable(adata(rpc));
+ stream_close(rpc->data.stream);
+ rpc->data.stream = NULL;
json_parser_abort(rpc->parser);
rpc->parser = NULL;
- ofpbuf_list_delete(&rpc->output);
- rpc->backlog = 0;
- rpc->output_count = 0;
+ async_cleanup_data(adata(rpc));
}
static struct jsonrpc_msg *
@@ -998,12 +973,14 @@ jsonrpc_session_run(struct jsonrpc_session *s)
}
if (s->rpc) {
- size_t backlog;
int error;
+ bool active = async_get_active(adata(s->rpc));
- backlog = jsonrpc_get_backlog(s->rpc);
jsonrpc_run(s->rpc);
- if (jsonrpc_get_backlog(s->rpc) < backlog) {
+
+ active |= async_get_active(adata(s->rpc));
+
+ if (active) {
/* Data previously caught in a queue was successfully sent (or
* there's an error, which we'll catch below.)
*
@@ -1103,8 +1080,8 @@ jsonrpc_session_get_name(const struct jsonrpc_session *s)
const char *
jsonrpc_session_get_id(const struct jsonrpc_session *s)
{
- if (s->rpc && s->rpc->stream) {
- return stream_get_peer_id(s->rpc->stream);
+ if (s->rpc && async_get_stream(adata(s->rpc))) {
+ return stream_get_peer_id(adata(s->rpc)->stream);
} else {
return NULL;
}
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 30622929b..6a6353002 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -30,6 +30,7 @@
#include "stream-provider.h"
#include "stream.h"
#include "openvswitch/vlog.h"
+#include "openvswitch/list.h"
VLOG_DEFINE_THIS_MODULE(stream_fd);
@@ -40,6 +41,9 @@ struct stream_fd
struct stream stream;
int fd;
int fd_type;
+ bool can_read, can_write;
+ struct ovs_list output;
+ int queue_depth;
};
static const struct stream_class stream_fd_class;
@@ -67,6 +71,10 @@ new_fd_stream(char *name, int fd, int connect_status, int
fd_type,
stream_init(&s->stream, &stream_fd_class, connect_status, name);
s->fd = fd;
s->fd_type = fd_type;
+ s->queue_depth = 0;
+ s->can_read = true;
+ s->can_write = true;
+ ovs_list_init(&s->output);
*streamp = &s->stream;
return 0;
}
@@ -83,6 +91,7 @@ fd_close(struct stream *stream)
{
struct stream_fd *s = stream_fd_cast(stream);
closesocket(s->fd);
+ ofpbuf_list_delete(&s->output);
free(s);
}
@@ -111,6 +120,11 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
if (error == WSAEWOULDBLOCK) {
error = EAGAIN;
}
+#endif
+#ifdef __linux__
+ if (error == ENOBUFS) {
+ error = EAGAIN;
+ }
#endif
if (error != EAGAIN) {
VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
@@ -170,6 +184,75 @@ static bool fd_set_probe_interval(struct stream *stream,
int probe_interval) {
+/* Takes ownership of 'buf'; fd_flush() sends and frees it later. */
+static int
+fd_enqueue(struct stream *stream, struct ofpbuf *buf)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    s->queue_depth++;
+    ovs_list_push_back(&s->output, &buf->list_node);
+    return buf->size;
+}
+/* Sends as much of the queued output as the socket accepts, gathered into
+ * one sendmsg() call.  If bytes were sent, stores their count in '*retval'
+ * and returns true; if nothing is queued, stores -EAGAIN and returns true;
+ * otherwise stores a negative errno value (possibly -EAGAIN) and returns
+ * false.  NOTE(review): sendmsg() and struct msghdr are POSIX-only, but
+ * this file also carries _WIN32 code; confirm the Windows build. */
+static bool
+fd_flush(struct stream *stream, int *retval)
+{
+    /* Cap the gather list: sendmsg() fails with EMSGSIZE once msg_iovlen
+     * exceeds IOV_MAX, which would tear down a slow connection. */
+    enum { FD_FLUSH_MAX_IOV = 64 };
+    struct stream_fd *sfd = stream_fd_cast(stream);
+    struct iovec iov[FD_FLUSH_MAX_IOV];
+    struct ofpbuf *buf;
+    size_t n_iov = 0;
+    ssize_t sent;
+
+    if (sfd->queue_depth == 0) {
+        *retval = -EAGAIN;
+        return true;
+    }
+
+    /* Gather the oldest buffers first. */
+    LIST_FOR_EACH (buf, list_node, &sfd->output) {
+        if (n_iov == FD_FLUSH_MAX_IOV) {
+            break;
+        }
+        iov[n_iov].iov_base = buf->data;
+        iov[n_iov].iov_len = buf->size;
+        n_iov++;
+    }
+
+    struct msghdr msg = { .msg_iov = iov, .msg_iovlen = n_iov };
+
+    sent = sendmsg(sfd->fd, &msg, 0);
+    if (sent <= 0) {
+        *retval = -sock_errno();
+        return false;
+    }
+
+    *retval = sent;
+    /* Free fully sent buffers and trim the first partially sent one. */
+    while (sent > 0 && !ovs_list_is_empty(&sfd->output)) {
+        buf = ofpbuf_from_list(ovs_list_front(&sfd->output));
+        if (buf->size > sent) {
+            ofpbuf_pull(buf, sent);
+            sent = 0;
+        } else {
+            sent -= buf->size;
+            sfd->queue_depth--;
+            ovs_list_remove(&buf->list_node);
+            ofpbuf_delete(buf);
+        }
+    }
+    return true;
+}
+
+
+
static const struct stream_class stream_fd_class = {
"fd", /* name */
false, /* needs_probes */
@@ -182,6 +265,8 @@ static const struct stream_class stream_fd_class = {
NULL, /* run_wait */
fd_wait, /* wait */
fd_set_probe_interval, /* set_probe_interval */
+ fd_enqueue, /* enqueue */
+ fd_flush, /* flush */
};
/* Passive file descriptor stream. */
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 6c28cb50b..993c2f742 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -18,9 +18,13 @@
#define STREAM_PROVIDER_H 1
#include <sys/types.h>
+#include <poll.h>
+#include "openvswitch/list.h"
+#include "openvswitch/ofpbuf.h"
+#include "openvswitch/thread.h"
#include "stream.h"
-
-/* Active stream connection. */
+#include "byteq.h"
+#include "latch.h"
/* Active stream connection.
*
@@ -130,6 +134,31 @@ struct stream_class {
*
*/
bool (*set_probe_interval)(struct stream *stream, int probe_interval);
+    /* Enqueues 'buf' for transmission and, on success, takes ownership of
+     * it.
+     *
+     * - On success, returns the number of bytes accepted (buf->size); the
+     *   data is transmitted later, by flush() and/or run().
+     *
+     * - On failure, returns a negative errno value (-EAGAIN if the stream
+     *   cannot accept more data right now) and 'buf' remains owned by the
+     *   caller.
+     *
+     * The enqueue function must not block. */
+ int (*enqueue)(struct stream *stream, struct ofpbuf *buf);
+    /* Tries to transmit data previously accepted by enqueue().
+     *
+     * On return '*retval' is the number of enqueued bytes whose transmission
+     * completed since the previous flush() (> 0; the caller subtracts it from
+     * its backlog), 0 or -EAGAIN if none completed, or a negative errno value
+     * on error.
+     *
+     * Returns true if the call made progress or had nothing to do, false if
+     * it would block or failed.
+     *
+     * The flush function must not block.  If buffers cannot be flushed
+     * completely it should return immediately. */
+ bool (*flush)(struct stream *stream, int *retval);
};
/* Passive listener for incoming stream connections.
@@ -190,6 +219,7 @@ struct pstream_class {
/* Arranges for the poll loop to wake up when a connection is ready to be
* accepted on 'pstream'. */
void (*wait)(struct pstream *pstream);
+
};
/* Active and passive stream classes. */
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 575c55f5b..38a07bf6a 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -85,6 +85,8 @@ struct ssl_stream
SSL *ssl;
struct ofpbuf *txbuf;
unsigned int session_nr;
+ int last_enqueued;
+ long backlog_to_report;
/* rx_want and tx_want record the result of the last call to SSL_read()
* and SSL_write(), respectively:
@@ -304,6 +306,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum
session_type type,
sslv->rx_want = sslv->tx_want = SSL_NOTHING;
sslv->session_nr = next_session_nr++;
sslv->n_head = 0;
+ sslv->last_enqueued = 0;
+ sslv->backlog_to_report = 0;
if (VLOG_IS_DBG_ENABLED()) {
SSL_set_msg_callback(ssl, ssl_protocol_cb);
@@ -784,8 +788,59 @@ ssl_run(struct stream *stream)
{
struct ssl_stream *sslv = ssl_stream_cast(stream);
- if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) {
- ssl_clear_txbuf(sslv);
+ if (sslv->txbuf) {
+ if (ssl_do_tx(stream) != EAGAIN) {
+ sslv->backlog_to_report += sslv->last_enqueued;
+ ssl_clear_txbuf(sslv);
+ }
+ }
+}
+
+/* Hands 'buf' to the SSL layer as the single pending txbuf. */
+static int
+ssl_enqueue(struct stream *stream, struct ofpbuf *buf)
+{
+    struct ssl_stream *sslv = ssl_stream_cast(stream);
+    if (sslv->txbuf) {
+        return -EAGAIN;       /* only one buffer can be in flight */
+    }
+    sslv->txbuf = buf;
+    sslv->last_enqueued = buf->size;
+    return sslv->last_enqueued;
+}
+/* Reports bytes completed since the last call, or tries to send txbuf. */
+static bool
+ssl_flush(struct stream *stream, int *retval)
+{
+ struct ssl_stream *sslv = ssl_stream_cast(stream);
+
+ if (!sslv->txbuf) {
+ if (sslv->backlog_to_report) { /* completed by ssl_run() */
+ * retval = sslv->backlog_to_report;
+ sslv->backlog_to_report = 0;
+ } else {
+ * retval = -EAGAIN;
+ }
+ return true;
+ } else {
+ int error;
+
+ error = ssl_do_tx(stream);
+ switch (error) {
+ case 0:
+ ssl_clear_txbuf(sslv);
+ * retval = sslv->backlog_to_report + sslv->last_enqueued;
+ sslv->backlog_to_report = 0;
+ sslv->last_enqueued = 0;
+ return true;
+ case EAGAIN:
+ * retval = 0; /* txbuf still in flight */
+ return false;
+ default:
+ ssl_clear_txbuf(sslv);
+ * retval = -error;
+ return false;
+ }
 }
 }
@@ -847,8 +902,11 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
             /* We have room in our tx queue. */
             poll_immediate_wake();
         } else {
-            /* stream_run_wait() will do the right thing; don't bother with
-             * redundancy. */
+            /* Wait for what the last SSL_write() asked for: during a
+             * renegotiation it may need the socket to become readable, and
+             * waiting for POLLOUT then would wake up immediately and spin. */
+            poll_fd_wait(sslv->fd,
+                         sslv->tx_want == SSL_READING ? POLLIN : POLLOUT);
         }
         break;
 
@@ -869,6 +923,8 @@ const struct stream_class ssl_stream_class = {
ssl_run_wait, /* run_wait */
ssl_wait, /* wait */
ssl_set_probe_interval, /* set_probe_interval */
+ ssl_enqueue, /* send_buf */
+ ssl_flush,
};
/* Passive SSL. */
diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
index 67c912105..7821c1dd1 100644
--- a/lib/stream-tcp.c
+++ b/lib/stream-tcp.c
@@ -74,6 +74,8 @@ const struct stream_class tcp_stream_class = {
NULL, /* run_wait */
NULL, /* wait */
NULL,
+ NULL, /* enqueue */
+ NULL, /* flush */
};
/* Passive TCP. */
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
index 4e96720ab..7c8987638 100644
--- a/lib/stream-unix.c
+++ b/lib/stream-unix.c
@@ -79,6 +79,8 @@ const struct stream_class unix_stream_class = {
NULL, /* run_wait */
NULL, /* wait */
unix_set_probe_interval,
+ NULL,
+ NULL,
};
/* Passive UNIX socket. */
diff --git a/lib/stream-windows.c b/lib/stream-windows.c
index 836112f75..d40e55d5c 100644
--- a/lib/stream-windows.c
+++ b/lib/stream-windows.c
@@ -375,6 +375,8 @@ const struct stream_class windows_stream_class = {
NULL, /* run_wait */
windows_wait, /* wait */
NULL,
+ NULL, /* enqueue */
+ NULL, /* flush */
};
struct pwindows_pstream
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 4e2dfc3d7..850b31b9e 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -540,6 +540,8 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
static int
ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
{
+ struct jsonrpc_msg *msg;
+
jsonrpc_session_run(s->js);
if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
s->js_seqno = jsonrpc_session_get_seqno(s->js);
@@ -549,25 +551,20 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
}
ovsdb_jsonrpc_trigger_complete_done(s);
+ ovsdb_jsonrpc_monitor_flush_all(s);
- if (!jsonrpc_session_get_backlog(s->js)) {
- struct jsonrpc_msg *msg;
-
- ovsdb_jsonrpc_monitor_flush_all(s);
-
- msg = jsonrpc_session_recv(s->js);
- if (msg) {
- if (msg->type == JSONRPC_REQUEST) {
- ovsdb_jsonrpc_session_got_request(s, msg);
- } else if (msg->type == JSONRPC_NOTIFY) {
- ovsdb_jsonrpc_session_got_notify(s, msg);
- } else {
- VLOG_WARN("%s: received unexpected %s message",
- jsonrpc_session_get_name(s->js),
- jsonrpc_msg_type_to_string(msg->type));
- jsonrpc_session_force_reconnect(s->js);
- jsonrpc_msg_destroy(msg);
- }
+ msg = jsonrpc_session_recv(s->js);
+ if (msg) {
+ if (msg->type == JSONRPC_REQUEST) {
+ ovsdb_jsonrpc_session_got_request(s, msg);
+ } else if (msg->type == JSONRPC_NOTIFY) {
+ ovsdb_jsonrpc_session_got_notify(s, msg);
+ } else {
+ VLOG_WARN("%s: received unexpected %s message",
+ jsonrpc_session_get_name(s->js),
+ jsonrpc_msg_type_to_string(msg->type));
+ jsonrpc_session_force_reconnect(s->js);
+ jsonrpc_msg_destroy(msg);
}
}
return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index ef4e996df..d3953f4d9 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -59,6 +59,7 @@
#include "perf-counter.h"
#include "ovsdb-util.h"
#include "openvswitch/vlog.h"
+#include "async-io.h"
VLOG_DEFINE_THIS_MODULE(ovsdb_server);
@@ -398,6 +399,7 @@ main(int argc, char *argv[])
}
daemonize_complete();
+ async_io_enable();
if (!run_command) {
/* ovsdb-server is usually a long-running process, in which case it