+ data->async_id = random_uint32();
+ target_control = &io_pool->controls[data->async_id % io_pool->size];
+ /* these are just fd pairs, no need to play with pointers, we
+ * can pass them around
+ */
+ data->tx_run_notify = target_control->async_latch;
+ latch_init(&data->rx_notify);
+ ovs_mutex_lock(&target_control->mutex);
+ ovs_list_push_back(&target_control->work_items, &data->list_node);
+ ovs_mutex_unlock(&target_control->mutex);
+ latch_set(&target_control->async_latch);
+ }
+}
+
+/* Sets the async mode of 'data' from the 'allow_async_io' flag (the flag
+ * that async_io_enable() turns on), so 'data' only runs asynchronously when
+ * that flag is set. */
+void
+async_stream_enable(struct async_data *data)
+{
+    const bool use_async = allow_async_io;
+
+    data->async_mode = use_async;
+}
+
+/* Takes 'data' out of asynchronous mode and releases its input buffer.
+ *
+ * When 'data' is in async mode, is not in error and still has backlog, the
+ * tx latch is set and the caller blocks (for at most 50 ms) to give the
+ * pending output a chance to be flushed.  'data' is then unlinked from the
+ * work list of its pool control and its rx latch is destroyed.
+ *
+ * The input buffer is freed whether or not 'data' was in async mode. */
+void
+async_stream_disable(struct async_data *data)
+{
+    if (data->async_mode) {
+        struct async_io_control *control;
+
+        if (not_in_error(data) && async_get_backlog(data) > 0) {
+            latch_poll(&data->rx_notify);
+            latch_wait(&data->rx_notify);
+            latch_set(&data->tx_run_notify);
+            /* Cap the wait at 50 ms: enough for a single flush, and it
+             * guarantees we never hang here waiting on a send. */
+            poll_timer_wait(50);
+            poll_block();
+            /* Blocking locally discarded every registered poll wait, so
+             * force the upper layers to run again and re-register theirs. */
+            poll_immediate_wake();
+        }
+
+        control = &io_pool->controls[data->async_id % io_pool->size];
+        ovs_mutex_lock(&control->mutex);
+        ovs_list_remove(&data->list_node);
+        ovs_mutex_unlock(&control->mutex);
+
+        data->async_mode = false;
+        latch_destroy(&data->rx_notify);
+    }
+
+    /* free(NULL) is a no-op, so no guard is needed. */
+    free(data->input_buffer);
+    data->input_buffer = NULL;
+}
+
+/* Resets the output side of 'data': if any backlog is recorded, the queued
+ * output buffers are deleted; the backlog and output counters are then
+ * zeroed. */
+void
+async_cleanup_data(struct async_data *data)
+{
+    const long pending = async_get_backlog(data);
+
+    if (pending != 0) {
+        ofpbuf_list_delete(&data->output);
+    }
+    atomic_store_relaxed(&data->backlog, 0);
+    data->output_count = 0;
+}
+
+/* Routines intended for async IO */
+
+/* Appends 'buf', if nonnull, to the output queue of 'data' and adds its size
+ * to the backlog.  The queue is modified while holding 'data->mutex'.
+ *
+ * Returns the backlog as read after the (optional) enqueue; passing a null
+ * 'buf' therefore just samples the current backlog. */
+long
+async_stream_enqueue(struct async_data *data, struct ofpbuf *buf)
+{
+    long backlog = -EAGAIN;
+
+    ovs_mutex_lock(&data->mutex);
+    if (buf != NULL) {
+        long previous;
+
+        ovs_list_push_back(&data->output, &buf->list_node);
+        data->output_count += 1;
+        atomic_add_relaxed(&data->backlog, buf->size, &previous);
+        atomic_thread_fence(memory_order_release);
+    }
+    atomic_read_relaxed(&data->backlog, &backlog);
+    ovs_mutex_unlock(&data->mutex);
+
+    return backlog;
+}
+
+/* Tries to push queued output of 'data' down to its stream.
+ *
+ * At most 10 buffers are taken from 'data->output' per call.  If the stream
+ * class provides an 'enqueue' callback each buffer is handed to the stream;
+ * otherwise the bytes are written with stream_send().  Afterwards the class
+ * 'flush' callback, if any, is invoked and any positive byte count it
+ * reports is deducted from the backlog.
+ *
+ * The final status is stored in 'data->tx_error' and returned: the negated
+ * stream_connect() result if the stream is not connected, otherwise the
+ * status of the last enqueue/send/flush step. */
+static int do_stream_flush(struct async_data *data) {
+ struct ofpbuf *buf;
+ int count = 0;
+ bool stamp = false;
+ int retval = -stream_connect(data->stream);
+ long discard;
+
+ if (!retval) {
+ while (!ovs_list_is_empty(&data->output) && count < 10) {
+ buf = ofpbuf_from_list(data->output.next);
+ if (data->stream->class->enqueue) {
+ /* Unlink first: on success the stream keeps the buffer, on
+ * failure it is put back at the head of our queue.
+ * NOTE(review): fd_enqueue() returns buf->size, so a zero-length
+ * buffer would be reported as 0 here and pushed back even though
+ * the stream already queued it - confirm such buffers cannot
+ * occur. */
+ ovs_list_remove(&buf->list_node);
+ retval = (data->stream->class->enqueue)(data->stream, buf);
+ if (retval > 0) {
+ data->output_count--;
+ } else {
+ ovs_list_push_front(&data->output, &buf->list_node);
+ }
+ } else {
+ retval = stream_send(data->stream, buf->data, buf->size);
+ if (retval > 0) {
+ /* Some bytes went out: record activity and shrink both the
+ * backlog and the buffer by the amount sent. */
+ stamp = true;
+ atomic_sub_relaxed(&data->backlog, retval, &discard);
+ ofpbuf_pull(buf, retval);
+ if (!buf->size) {
+ /* buf was sent in full; drop it from the queue and free it */
+ ovs_list_remove(&buf->list_node);
+ data->output_count--;
+ ofpbuf_delete(buf);
+ }
+ }
+ }
+ if (retval <= 0) {
+ break;
+ }
+ count++;
+ }
+ /* The return value of flush() is not used; only the status it writes
+ * into retval matters here. */
+ if (data->stream->class->flush && (retval >= 0 || retval == -EAGAIN)) {
+ (data->stream->class->flush)(data->stream, &retval);
+ if (retval > 0) {
+ stamp = true;
+ atomic_sub_relaxed(&data->backlog, retval, &discard);
+ }
+ }
+ if (stamp) {
+ /* Consumed (and cleared) by async_get_active(). */
+ atomic_store_relaxed(&data->active, true);
+ }
+ }
+ atomic_store_relaxed(&data->tx_error, retval);
+ return retval;
+}
+
+/* Flushes output for 'data'.
+ *
+ * In synchronous mode this performs the flush directly and returns its
+ * status.  In async mode it only reports the last recorded tx status
+ * (non-negative statuses are turned into -EAGAIN) and, if there is backlog,
+ * sets the tx latch so that the flush is done elsewhere. */
+int
+async_stream_flush(struct async_data *data)
+{
+    int status;
+
+    if (!data->async_mode) {
+        return do_stream_flush(data);
+    }
+
+    atomic_read_relaxed(&data->tx_error, &status);
+    if (status >= 0) {
+        /* Pretend to be busy so that the upper layers do not retry; the
+         * backlog is flushed in the background. */
+        status = -EAGAIN;
+    }
+    if (async_get_backlog(data)) {
+        latch_set(&data->tx_run_notify);
+    }
+    return status;
+}
+
+/* Reads from the stream of 'data' into its input byteq.
+ *
+ * A receive is attempted only while the recorded rx status is positive or
+ * -EAGAIN and the byteq has headroom.  As long as the status stays positive
+ * or -EAGAIN, the result is the number of bytes buffered in the byteq, or
+ * -EAGAIN when it is empty; any other status is passed through unchanged.
+ *
+ * The result is stored in 'data->rx_error' and returned. */
+static int
+do_async_recv(struct async_data *data)
+{
+    int status;
+
+    atomic_read_relaxed(&data->rx_error, &status);
+    if (status > 0 || status == -EAGAIN) {
+        size_t room = byteq_headroom(&data->input);
+
+        if (room > 0) {
+            status = stream_recv(data->stream, byteq_head(&data->input),
+                                 room);
+            if (status > 0) {
+                byteq_advance_head(&data->input, status);
+            }
+        }
+    }
+    if (status > 0 || status == -EAGAIN) {
+        /* Report what is buffered rather than what was just read. */
+        status = byteq_used(&data->input);
+        if (!status) {
+            status = -EAGAIN;
+        }
+    }
+    atomic_store_relaxed(&data->rx_error, status);
+    return status;
+}
+
+
+/* Receive entry point for 'data'.
+ *
+ * In synchronous mode the receive is performed directly.  In async mode no
+ * I/O is done here: the rx latch is cleared and the recorded rx status is
+ * reported, with positive and -EAGAIN statuses replaced by the number of
+ * bytes buffered in the input byteq (or -EAGAIN when it is empty). */
+int
+async_stream_recv(struct async_data *data)
+{
+    int status = -EAGAIN;
+
+    if (!data->async_mode) {
+        return do_async_recv(data);
+    }
+
+    atomic_read_relaxed(&data->rx_error, &status);
+    /* Clear RX notifications. */
+    latch_poll(&data->rx_notify);
+    if (status > 0 || status == -EAGAIN) {
+        /* Derive the result from how much input is buffered. */
+        status = byteq_used(&data->input);
+        if (!status) {
+            status = -EAGAIN;
+        }
+    }
+    return status;
+}
+
+/* Runs the stream of 'data': in async mode this only sets the tx latch,
+ * otherwise it calls stream_run() directly. */
+void
+async_stream_run(struct async_data *data)
+{
+    if (data->async_mode) {
+        latch_set(&data->tx_run_notify);
+        return;
+    }
+    stream_run(data->stream);
+}
+
+/* Sets the tx latch of 'data' if it is in async mode; does nothing
+ * otherwise. */
+void
+async_io_kick(struct async_data *data)
+{
+    if (!data->async_mode) {
+        return;
+    }
+    latch_set(&data->tx_run_notify);
+}
+
+/* Arranges for the poll loop to wake up when 'data' has input: in async
+ * mode by clearing and then waiting on the rx latch, otherwise by waiting
+ * on the stream itself. */
+void
+async_recv_wait(struct async_data *data)
+{
+    if (!data->async_mode) {
+        stream_recv_wait(data->stream);
+        return;
+    }
+    latch_poll(&data->rx_notify);
+    latch_wait(&data->rx_notify);
+}
+
+/* Turns on the 'allow_async_io' flag, which async_stream_enable() copies
+ * into the async mode of a connection. */
+void
+async_io_enable(void)
+{
+    allow_async_io = true;
+}
+
+/* Accessors for JSON RPC */
+
+/* Returns the input byteq embedded in 'data'. */
+struct byteq *
+async_get_input(struct async_data *data)
+{
+    struct byteq *input = &data->input;
+
+    return input;
+}
+/* Returns the stream that 'data' operates on. */
+struct stream *
+async_get_stream(struct async_data *data)
+{
+    struct stream *stream = data->stream;
+
+    return stream;
+}
+
+/* Returns true if 'data' has no output pending, false otherwise.
+ *
+ * The answer is based on the backlog counter rather than on the output
+ * list: backlog tracks output across the full stack, all the way to the
+ * actual send, so it is the source of truth for "is there still output".
+ *
+ * The counter is read with the same atomic accessor used everywhere else
+ * it is touched, instead of a plain load. */
+bool
+async_output_is_empty(struct async_data *data)
+{
+    long backlog;
+
+    ovs_mutex_lock(&data->mutex);
+    atomic_read_relaxed(&data->backlog, &backlog);
+    ovs_mutex_unlock(&data->mutex);
+
+    return backlog == 0;
+}
+
+/* Returns the current backlog counter of 'data'. */
+long async_get_backlog(struct async_data *data) {
+ long retval;
+ /* The counter is read with a relaxed atomic load and without taking
+ * data->mutex, so the result is only a snapshot of the backlog.
+ */
+ atomic_read_relaxed(&data->backlog, &retval);
+ return retval;
+}
+
+/* Atomically tests and clears the 'active' flag of 'data'.
+ *
+ * Returns true if the flag was set (it is now cleared), false if it was
+ * already clear.
+ *
+ * The strong compare-exchange is required: the weak variant may fail
+ * spuriously, and since this is a single attempt rather than a retry loop
+ * a spurious failure would wrongly report "not active". */
+bool
+async_get_active(struct async_data *data)
+{
+    bool expected = true;
+
+    return atomic_compare_exchange_strong(&data->active, &expected, false);
+}
+
+
diff --git a/lib/async-io.h b/lib/async-io.h
new file mode 100644
index 000000000..dea070ee6
--- /dev/null
+++ b/lib/async-io.h
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ASYNC_IO_H
+#define ASYNC_IO_H 1
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include "openvswitch/types.h"
+#include "openvswitch/ofpbuf.h"
+#include "socket-util.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "latch.h"
+#include "byteq.h"
+#include "util.h"
+
+#define ASYNC_BUFFER_SIZE (4096)
+
+struct stream;
+
+/* Per-connection state for (optionally asynchronous) stream I/O.
+ *
+ * 'backlog' is declared atomic because every access goes through the
+ * atomic_*() helpers, which require an atomic object. */
+struct async_data {
+    struct stream *stream;          /* Stream the I/O is performed on. */
+    struct ovs_list output;         /* Queued "struct ofpbuf"s to send. */
+    struct ovs_list list_node;      /* In async_io_control 'work_items'. */
+    atomic_long backlog;            /* Bytes queued and not yet sent. */
+    size_t output_count;            /* Number of elements in 'output'. */
+    atomic_bool active;             /* Set when bytes were sent; cleared by
+                                     * async_get_active(). */
+    atomic_int rx_error, tx_error;  /* Last receive/transmit status. */
+    uint32_t async_id;              /* Selects the pool control:
+                                     * async_id % pool size. */
+    struct latch rx_notify;         /* Polled/waited on by the receive
+                                     * path. */
+    struct latch tx_run_notify;     /* Copy of the owning control's
+                                     * 'async_latch'; set to request tx. */
+    struct ovs_mutex mutex;         /* Held while enqueuing to 'output'. */
+    bool async_mode;                /* True if I/O is done asynchronously. */
+    bool valid;                     /* NOTE(review): not referenced in the
+                                     * code visible here - confirm use. */
+    struct byteq input;             /* Received bytes not yet consumed. */
+    uint8_t *input_buffer;          /* Heap buffer, freed by
+                                     * async_stream_disable(). */
+};
+
+/* One slot of an async I/O pool: a work list plus the latch used to signal
+ * that the list needs servicing. */
+struct async_io_control {
+ /* Set after a work item is added; also copied into each item's
+ * 'tx_run_notify'. */
+ struct latch async_latch;
+ /* List of "struct async_data", linked through their 'list_node'. */
+ struct ovs_list work_items;
+ /* Held while adding to or removing from 'work_items'. */
+ struct ovs_mutex mutex;
+};
+
+/* A set of async I/O controls; a connection is assigned to
+ * controls[async_id % size]. */
+struct async_io_pool {
+ /* NOTE(review): list membership is not visible in this chunk; presumably
+ * links the pool into a list of pools - confirm. */
+ struct ovs_list list_node;
+ /* Array of 'size' controls. */
+ struct async_io_control *controls;
+ /* Number of elements in 'controls'. */
+ int size;
+};
+
+/* NOTE(review): add_pool() is defined outside the code visible here;
+ * presumably it creates a pool whose workers run 'start' - confirm. */
+struct async_io_pool *add_pool(void *(*start)(void *));
+
+/* Data path: queue output, flush it, receive input. */
+long async_stream_enqueue(struct async_data *, struct ofpbuf *buf);
+int async_stream_flush(struct async_data *);
+int async_stream_recv(struct async_data *);
+/* Accessors and state queries. */
+struct byteq *async_get_input(struct async_data *);
+struct stream *async_get_stream(struct async_data *);
+bool async_output_is_empty(struct async_data *);
+long async_get_backlog(struct async_data *);
+bool async_get_active(struct async_data *);
+
+/* Switch a connection into or out of async mode. */
+void async_stream_enable(struct async_data *);
+void async_stream_disable(struct async_data *);
+
+/* Setup, teardown, poll-loop integration and the global async switch. */
+void async_init_data(struct async_data *, struct stream *);
+void async_cleanup_data(struct async_data *);
+void async_stream_run(struct async_data *data);
+void async_io_kick(struct async_data *data);
+void async_recv_wait(struct async_data *data);
+void async_io_enable(void);
+
+#endif /* async-io.h */
diff --git a/lib/automake.mk b/lib/automake.mk
index 86940ccd2..6f7870f26 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -24,6 +24,8 @@ lib_libopenvswitch_la_SOURCES = \
lib/aes128.c \
lib/aes128.h \
lib/async-append.h \
+ lib/async-io.h \
+ lib/async-io.c \
lib/backtrace.c \
lib/backtrace.h \
lib/bfd.c \
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index ed748dbde..f831bc2dd 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -30,28 +30,23 @@
#include "openvswitch/poll-loop.h"
#include "reconnect.h"
#include "stream.h"
+#include "stream-provider.h"
#include "svec.h"
#include "timeval.h"
+#include "async-io.h"
#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(jsonrpc);
struct jsonrpc {
- struct stream *stream;
char *name;
int status;
-
- /* Input. */
- struct byteq input;
- uint8_t input_buffer[4096];
struct json_parser *parser;
-
- /* Output. */
- struct ovs_list output; /* Contains "struct ofpbuf"s. */
- size_t output_count; /* Number of elements in "output". */
- size_t backlog;
+ struct async_data data;
};
+#define MIN_IDLE_TIME 10
+
/* Rate limit for error messages. */
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
@@ -59,6 +54,11 @@ static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
static void jsonrpc_cleanup(struct jsonrpc *);
static void jsonrpc_error(struct jsonrpc *, int error);
+/* Returns the async I/O state embedded in 'rpc'. */
+static inline struct async_data *
+adata(struct jsonrpc *rpc)
+{
+    struct async_data *embedded = &rpc->data;
+
+    return embedded;
+}
+
+
/* This is just the same as stream_open() except that it uses the default
* JSONRPC port if none is specified. */
int
@@ -86,10 +86,8 @@ jsonrpc_open(struct stream *stream)
rpc = xzalloc(sizeof *rpc);
rpc->name = xstrdup(stream_get_name(stream));
- rpc->stream = stream;
- byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
- ovs_list_init(&rpc->output);
-
+ async_init_data(adata(rpc), stream);
+ async_stream_enable(adata(rpc));
return rpc;
}
@@ -109,33 +107,22 @@ jsonrpc_close(struct jsonrpc *rpc)
void
jsonrpc_run(struct jsonrpc *rpc)
{
+ int retval;
if (rpc->status) {
return;
}
- stream_run(rpc->stream);
- while (!ovs_list_is_empty(&rpc->output)) {
- struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
- int retval;
-
- retval = stream_send(rpc->stream, buf->data, buf->size);
- if (retval >= 0) {
- rpc->backlog -= retval;
- ofpbuf_pull(buf, retval);
- if (!buf->size) {
- ovs_list_remove(&buf->list_node);
- rpc->output_count--;
- ofpbuf_delete(buf);
- }
- } else {
+ async_stream_run(adata(rpc));
+ do {
+ retval = async_stream_flush(&rpc->data);
+ if (retval < 0) {
if (retval != -EAGAIN) {
VLOG_WARN_RL(&rl, "%s: send error: %s",
rpc->name, ovs_strerror(-retval));
jsonrpc_error(rpc, -retval);
}
- break;
}
- }
+ } while (retval > 0);
}
/* Arranges for the poll loop to wake up when 'rpc' needs to perform
@@ -144,9 +131,13 @@ void
jsonrpc_wait(struct jsonrpc *rpc)
{
if (!rpc->status) {
- stream_run_wait(rpc->stream);
- if (!ovs_list_is_empty(&rpc->output)) {
- stream_send_wait(rpc->stream);
+ if (adata(rpc)->async_mode) {
+ async_recv_wait(adata(rpc));
+ } else {
+ stream_run_wait(rpc->data.stream);
+ if (!async_output_is_empty(adata(rpc))) {
+ stream_send_wait(async_get_stream(adata(rpc)));
+ }
}
}
}
@@ -175,7 +166,7 @@ jsonrpc_get_status(const struct jsonrpc *rpc)
size_t
jsonrpc_get_backlog(const struct jsonrpc *rpc)
{
- return rpc->status ? 0 : rpc->backlog;
+ return rpc->status ? 0 : async_get_backlog(adata((struct jsonrpc *) rpc));
}
/* Returns the number of bytes that have been received on 'rpc''s underlying
@@ -183,7 +174,7 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
unsigned int
jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
{
- return rpc->input.head;
+ return async_get_input(adata((struct jsonrpc *) rpc))->head;
}
/* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
@@ -234,13 +225,13 @@ jsonrpc_log_msg(const struct jsonrpc *rpc, const char
*title,
* buffered in 'rpc'.)
*
* Always takes ownership of 'msg', regardless of success. */
+
int
jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
{
struct ofpbuf *buf;
struct json *json;
struct ds ds = DS_EMPTY_INITIALIZER;
- size_t length;
if (rpc->status) {
jsonrpc_msg_destroy(msg);
@@ -251,24 +242,13 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
json = jsonrpc_msg_to_json(msg);
json_to_ds(json, 0, &ds);
- length = ds.length;
json_destroy(json);
buf = xmalloc(sizeof *buf);
ofpbuf_use_ds(buf, &ds);
- ovs_list_push_back(&rpc->output, &buf->list_node);
- rpc->output_count++;
- rpc->backlog += length;
-
- if (rpc->output_count >= 50) {
- VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
- " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
- rpc->output_count, rpc->backlog);
- }
+ async_stream_enqueue(adata(rpc), buf);
- if (rpc->backlog == length) {
- jsonrpc_run(rpc);
- }
+ jsonrpc_run(rpc);
return rpc->status;
}
@@ -291,7 +271,7 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
int
jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
{
- int i;
+ int i, retval;
*msgp = NULL;
if (rpc->status) {
@@ -302,36 +282,32 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg
**msgp)
size_t n, used;
/* Fill our input buffer if it's empty. */
- if (byteq_is_empty(&rpc->input)) {
- size_t chunk;
- int retval;
-
- chunk = byteq_headroom(&rpc->input);
- retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
- if (retval < 0) {
- if (retval == -EAGAIN) {
- return EAGAIN;
- } else {
- VLOG_WARN_RL(&rl, "%s: receive error: %s",
- rpc->name, ovs_strerror(-retval));
- jsonrpc_error(rpc, -retval);
- return rpc->status;
- }
- } else if (retval == 0) {
- jsonrpc_error(rpc, EOF);
- return EOF;
+ retval = async_stream_recv(adata(rpc));
+ if (retval < 0) {
+ if (retval == -EAGAIN) {
+ return EAGAIN;
+ } else {
+ VLOG_WARN_RL(&rl, "%s: receive error: %s",
+ rpc->name, ovs_strerror(-retval));
+ jsonrpc_error(rpc, -retval);
+ return rpc->status;
}
- byteq_advance_head(&rpc->input, retval);
+ } else if (retval == 0) {
+ jsonrpc_error(rpc, EOF);
+ return EOF;
}
/* We have some input. Feed it into the JSON parser. */
if (!rpc->parser) {
rpc->parser = json_parser_create(0);
}
- n = byteq_tailroom(&rpc->input);
+ n = byteq_tailroom(async_get_input(adata(rpc)));
+ if (n == 0) {
+ break;
+ }
used = json_parser_feed(rpc->parser,
- (char *) byteq_tail(&rpc->input), n);
- byteq_advance_tail(&rpc->input, used);
+ (char *) byteq_tail(async_get_input(adata(rpc))), n);
+ byteq_advance_tail(async_get_input(adata(rpc)), used);
/* If we have complete JSON, attempt to parse it as JSON-RPC. */
if (json_parser_is_done(rpc->parser)) {
@@ -341,7 +317,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
}
if (rpc->status) {
- const struct byteq *q = &rpc->input;
+ const struct byteq *q = async_get_input(adata(rpc));
if (q->head <= q->size) {
stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
&this_module, rpc->name);
@@ -359,10 +335,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg
**msgp)
void
jsonrpc_recv_wait(struct jsonrpc *rpc)
{
- if (rpc->status || !byteq_is_empty(&rpc->input)) {
+ if (rpc->status || !byteq_is_empty(async_get_input(adata(rpc)))) {
poll_immediate_wake_at(rpc->name);
} else {
- stream_recv_wait(rpc->stream);
+ async_recv_wait(adata(rpc));
}
}
@@ -385,7 +361,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
for (;;) {
jsonrpc_run(rpc);
- if (ovs_list_is_empty(&rpc->output) || rpc->status) {
+ if (async_output_is_empty(adata(rpc)) || rpc->status) {
return rpc->status;
}
jsonrpc_wait(rpc);
@@ -495,15 +471,14 @@ jsonrpc_error(struct jsonrpc *rpc, int error)
static void
jsonrpc_cleanup(struct jsonrpc *rpc)
{
- stream_close(rpc->stream);
- rpc->stream = NULL;
+ async_stream_disable(adata(rpc));
+ stream_close(rpc->data.stream);
+ rpc->data.stream = NULL;
json_parser_abort(rpc->parser);
rpc->parser = NULL;
- ofpbuf_list_delete(&rpc->output);
- rpc->backlog = 0;
- rpc->output_count = 0;
+ async_cleanup_data(adata(rpc));
}
static struct jsonrpc_msg *
@@ -977,12 +952,14 @@ jsonrpc_session_run(struct jsonrpc_session *s)
}
if (s->rpc) {
- size_t backlog;
int error;
+ bool active = async_get_active(adata(s->rpc));
- backlog = jsonrpc_get_backlog(s->rpc);
jsonrpc_run(s->rpc);
- if (jsonrpc_get_backlog(s->rpc) < backlog) {
+
+ active |= async_get_active(adata(s->rpc));
+
+ if (active) {
/* Data previously caught in a queue was successfully sent (or
* there's an error, which we'll catch below.)
*
@@ -1076,8 +1053,8 @@ jsonrpc_session_get_name(const struct jsonrpc_session *s)
const char *
jsonrpc_session_get_id(const struct jsonrpc_session *s)
{
- if (s->rpc && s->rpc->stream) {
- return stream_get_peer_id(s->rpc->stream);
+ if (s->rpc && async_get_stream(adata(s->rpc))) {
+ return stream_get_peer_id(adata(s->rpc)->stream);
} else {
return NULL;
}
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 46ee7ae27..747d543cf 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -30,6 +30,7 @@
#include "stream-provider.h"
#include "stream.h"
#include "openvswitch/vlog.h"
+#include "openvswitch/list.h"
VLOG_DEFINE_THIS_MODULE(stream_fd);
@@ -40,6 +41,8 @@ struct stream_fd
struct stream stream;
int fd;
int fd_type;
+ struct ovs_list output;
+ int queue_depth;
};
static const struct stream_class stream_fd_class;
@@ -67,6 +70,8 @@ new_fd_stream(char *name, int fd, int connect_status, int
fd_type,
stream_init(&s->stream, &stream_fd_class, connect_status, name);
s->fd = fd;
s->fd_type = fd_type;
+ s->queue_depth = 0;
+ ovs_list_init(&s->output);
*streamp = &s->stream;
return 0;
}
@@ -83,6 +88,7 @@ fd_close(struct stream *stream)
{
struct stream_fd *s = stream_fd_cast(stream);
closesocket(s->fd);
+ ofpbuf_list_delete(&s->output);
free(s);
}
@@ -111,6 +117,11 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
if (error == WSAEWOULDBLOCK) {
error = EAGAIN;
}
+#endif
+#ifdef __linux__
+ if (error == ENOBUFS) {
+ error = EAGAIN;
+ }
#endif
if (error != EAGAIN) {
VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
@@ -162,6 +173,75 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
}
}
+/* Appends 'buf' to the output queue of 'stream', which from then on owns
+ * it, and bumps the queue depth.  Returns the size of 'buf' in bytes. */
+static int
+fd_enqueue(struct stream *stream, struct ofpbuf *buf)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    int queued = buf->size;
+
+    ovs_list_push_back(&s->output, &buf->list_node);
+    s->queue_depth += 1;
+    return queued;
+}
+
+/* Upper bound on the number of iovec entries passed to one sendmsg() call.
+ * sendmsg() fails with EMSGSIZE when msg_iovlen exceeds IOV_MAX, so a deep
+ * queue has to be sent in several calls. */
+#ifdef IOV_MAX
+#define FD_FLUSH_MAX_IOV IOV_MAX
+#else
+#define FD_FLUSH_MAX_IOV 16     /* _XOPEN_IOV_MAX, the POSIX minimum. */
+#endif
+
+/* Sends as much of the queued output of 'stream' as one non-blocking
+ * sendmsg() call accepts.
+ *
+ * - If nothing is queued, returns true with '*retval' set to -EAGAIN.
+ *
+ * - If bytes were sent, returns true with '*retval' set to the number of
+ *   bytes sent.  Fully sent buffers are removed from the queue and freed;
+ *   a partially sent buffer stays at the head with the sent part pulled.
+ *
+ * - Otherwise returns false with '*retval' set to the negated socket
+ *   error. */
+static bool
+fd_flush(struct stream *stream, int *retval)
+{
+    struct stream_fd *sfd = stream_fd_cast(stream);
+    struct msghdr msg;
+    struct ofpbuf *buf;
+    int n_iov, sent, error;
+    int i = 0;
+
+    if (sfd->queue_depth == 0) {
+        *retval = -EAGAIN;
+        return true;
+    }
+
+    /* Never hand sendmsg() more entries than it is able to accept; what
+     * does not fit stays queued for the next flush. */
+    n_iov = sfd->queue_depth < FD_FLUSH_MAX_IOV
+            ? sfd->queue_depth : FD_FLUSH_MAX_IOV;
+
+    msg.msg_name = NULL;
+    msg.msg_namelen = 0;
+    msg.msg_iov = xmalloc(sizeof *msg.msg_iov * n_iov);
+    msg.msg_control = NULL;
+    msg.msg_controllen = 0;
+    msg.msg_flags = 0;
+
+    LIST_FOR_EACH (buf, list_node, &sfd->output) {
+        if (i >= n_iov) {
+            break;
+        }
+        msg.msg_iov[i].iov_base = buf->data;
+        msg.msg_iov[i].iov_len = buf->size;
+        i++;
+    }
+    msg.msg_iovlen = i;
+
+    sent = sendmsg(sfd->fd, &msg, 0);
+    /* Capture the error before free(), which is allowed to change errno. */
+    error = sent > 0 ? 0 : sock_errno();
+    free(msg.msg_iov);
+
+    if (sent <= 0) {
+        *retval = -error;
+        return false;
+    }
+
+    *retval = sent;
+    /* Retire what was sent, starting from the head of the queue. */
+    while (sent > 0 && !ovs_list_is_empty(&sfd->output)) {
+        buf = ofpbuf_from_list(sfd->output.next);
+        if (buf->size > (uint32_t) sent) {
+            /* Partially sent: keep the unsent tail at the head. */
+            ofpbuf_pull(buf, sent);
+            break;
+        }
+        sent -= buf->size;
+        sfd->queue_depth--;
+        ovs_list_remove(&buf->list_node);
+        ofpbuf_delete(buf);
+    }
+    return true;
+}
+
+
+
static const struct stream_class stream_fd_class = {
"fd", /* name */
false, /* needs_probes */
@@ -173,6 +253,8 @@ static const struct stream_class stream_fd_class = {
NULL, /* run */
NULL, /* run_wait */
fd_wait, /* wait */
+ fd_enqueue, /* enqueue */
+ fd_flush, /* flush */
};
/* Passive file descriptor stream. */
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 75f4f059b..b5161bd04 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -18,9 +18,13 @@
#define STREAM_PROVIDER_H 1
#include <sys/types.h>
+#include <poll.h>
+#include "openvswitch/list.h"
+#include "openvswitch/ofpbuf.h"
+#include "openvswitch/thread.h"
#include "stream.h"
-
-/* Active stream connection. */
+#include "byteq.h"
+#include "latch.h"
/* Active stream connection.
*
@@ -124,6 +128,31 @@ struct stream_class {
/* Arranges for the poll loop to wake up when 'stream' is ready to take an
* action of the given 'type'. */
void (*wait)(struct stream *stream, enum stream_wait_type type);
+ /* Enqueues 'buf' for transmission and, on success, surrenders its
+ * ownership to the stream.
+ *
+ * - If successful, the stream now owns the buffer; returns the number
+ * of bytes enqueued (the size of 'buf').
+ *
+ * - On error, returns a negative errno value; the buffer is not claimed
+ * by the stream and still belongs to the caller.
+ *
+ * The enqueue function must not block. If the buffer cannot be accepted
+ * for transmission immediately, it should return -EAGAIN immediately. */
+ int (*enqueue)(struct stream *stream, struct ofpbuf *buf);
+ /* Flushes any buffers held by the stream.
+ *
+ * - If successful, returns true and '*retval' contains the number of
+ * bytes flushed, which the caller deducts from its backlog, or -EAGAIN
+ * if there was nothing to flush.
+ *
+ * - If partially successful (EAGAIN), returns false and '*retval' is
+ * a nonnegative count of bytes flushed so far.
+ *
+ * - If unsuccessful, returns false and '*retval' contains a negative
+ * errno value.
+ *
+ * The flush function must not block. If buffers cannot be flushed
+ * completely, it should return "partial success" immediately. */
+ bool (*flush)(struct stream *stream, int *retval);
};
/* Passive listener for incoming stream connections.
@@ -184,6 +213,7 @@ struct pstream_class {
/* Arranges for the poll loop to wake up when a connection is ready to be
* accepted on 'pstream'. */
void (*wait)(struct pstream *pstream);
+
};
/* Active and passive stream classes. */
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 078fcbc3a..0046e383e 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -85,6 +85,8 @@ struct ssl_stream
SSL *ssl;
struct ofpbuf *txbuf;
unsigned int session_nr;
+ int last_enqueued;
+ long backlog_to_report;
/* rx_want and tx_want record the result of the last call to SSL_read()
* and SSL_write(), respectively:
@@ -304,6 +306,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum
session_type type,
sslv->rx_want = sslv->tx_want = SSL_NOTHING;
sslv->session_nr = next_session_nr++;
sslv->n_head = 0;
+ sslv->last_enqueued = 0;
+ sslv->backlog_to_report = 0;
if (VLOG_IS_DBG_ENABLED()) {
SSL_set_msg_callback(ssl, ssl_protocol_cb);
@@ -784,8 +788,59 @@ ssl_run(struct stream *stream)
{
struct ssl_stream *sslv = ssl_stream_cast(stream);
- if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) {
- ssl_clear_txbuf(sslv);
+ if (sslv->txbuf) {
+ if (ssl_do_tx(stream) != EAGAIN) {
+ sslv->backlog_to_report += sslv->last_enqueued;
+ ssl_clear_txbuf(sslv);
+ }
+ }
+}
+
+static int
+ssl_enqueue(struct stream *stream, struct ofpbuf *buf)
+{
+ int n = buf->size;
+ struct ssl_stream *sslv = ssl_stream_cast(stream);
+ if (sslv->txbuf) {
+ return -EAGAIN;
+ }
+ sslv->txbuf = buf;
+ sslv->last_enqueued = n;
+ return n;
+}
+
+static bool
+ssl_flush(struct stream *stream, int *retval)
+{
+ struct ssl_stream *sslv = ssl_stream_cast(stream);
+
+ if (!sslv->txbuf) {
+ if (sslv->backlog_to_report) {
+ * retval = sslv->backlog_to_report;
+ sslv->backlog_to_report = 0;
+ } else {
+ * retval = -EAGAIN;
+ }
+ return true;
+ } else {
+ int error;
+
+ error = ssl_do_tx(stream);
+ switch (error) {
+ case 0:
+ ssl_clear_txbuf(sslv);
+ * retval = sslv->backlog_to_report + sslv->last_enqueued;
+ sslv->backlog_to_report = 0;
+ sslv->last_enqueued = 0;
+ return true;
+ case EAGAIN:
+ * retval = 0;
+ return false;
+ default:
+ ssl_clear_txbuf(sslv);
+ * retval = -error;
+ return false;
+ }
}
}
@@ -840,8 +895,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
/* We have room in our tx queue. */
poll_immediate_wake();
} else {
- /* stream_run_wait() will do the right thing; don't bother with
- * redundancy. */
+ poll_fd_wait(sslv->fd, POLLOUT);
}
break;
@@ -861,6 +915,8 @@ const struct stream_class ssl_stream_class = {
ssl_run, /* run */
ssl_run_wait, /* run_wait */
ssl_wait, /* wait */
+ ssl_enqueue, /* send_buf */