Scott Lamb wrote:
(4) Detect while streaming that the client connection has been lost, so
that we know to stop sending chunks. Patch attached to return error code
in evhttp_send_reply_chunk(). (Wouldn't hurt to add error returns in
other places, but this is my immediate need.)
Hmm, my patch was still appending to the buffer if the client doesn't
pay attention; that's probably not ideal either.
(6) Flow control (for the --limit-rate case). I need to keep the
outgoing connection input buffer, outgoing request input buffer,
incoming request output buffer, and incoming connection output buffer
from growing to be ridiculous. I'm considering something like this:
* define read watermarks on requests; it will only read from the
outgoing connection when the input buffers are below the high (default
of ~0 to preserve existing behavior). it will only call the chunk
callback when the input buffers are above the low (default of 0).
* define an option to drain the input buffer after calling the chunk
callback, defaulting to 1 for existing behavior. I'd want to turn it off
so I can transfer bytes only if the output buffers are not full.
* define output watermarks as well. If below low watermark (defaulting
to 0), call a fill function which in my case would also transfer from
input buffers.
I'm not too attached to this plan. I keep thinking there might be a
simpler way, but I don't see it.
Hmm, here's a simpler approach (evhttp-flowcontrol.patch). Opinions?
Flow control for evhttp_request
* evhttp_request_set_output_watermarks():
- call "fill" when "output buffer size <= low watermark" becomes true
- call "drain" when "output buffer size >= high watermark" becomes true
* evhttp_request_{pause,resume}():
Pause and resume reading from the socket into the input buffer
My client<->proxy request's fill and drain callbacks resume and pause
the proxy<->server request, respectively.
I'm wondering if there's any point in input watermarks here or for
bufferevent either (remove wm_input? there doesn't seem to be an API
yet). The idea of the downstream thing toggling it seems simpler.
Preliminary version of all patches attached. Still don't have anything
for (5). These apply in this order:
+ evhttp-64bit-length.patch
+ evhttp-stream-in.patch
+ evhttp-nonchunked-streaming.patch
+ evhttp-detect-failure.patch
> evhttp-flowcontrol.patch
Best regards,
Scott
--
Scott Lamb <http://www.slamb.org/>
evhttp: stream incoming responses, even if not chunked
From: Scott Lamb <[EMAIL PROTECTED]>
Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---
libevent/http.c | 19 ++++++++++++++-----
1 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/libevent/http.c b/libevent/http.c
index 500a991..d4d0ce6 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -730,11 +730,6 @@ evhttp_handle_chunked_read(struct evhttp_request *req,
struct evbuffer *buf)
EVBUFFER_DATA(buf), req->ntoread);
evbuffer_drain(buf, req->ntoread);
req->ntoread = -1;
- if (req->chunk_cb != NULL) {
- (*req->chunk_cb)(req, req->cb_arg);
- evbuffer_drain(req->input_buffer,
- EVBUFFER_LENGTH(req->input_buffer));
- }
}
return (0);
@@ -768,7 +763,21 @@ evhttp_read_body(struct evhttp_connection *evcon, struct
evhttp_request *req)
req->ntoread = 0;
evhttp_connection_done(evcon);
return;
+ } else if (req->chunk_cb != NULL) {
+ /*
+ * The condition above is simply for efficiency; we do the
+ * move at the end if we aren't streaming.
+ */
+ req->ntoread -= EVBUFFER_LENGTH(buf);
+ evbuffer_add_buffer(req->input_buffer, buf);
}
+
+ if (EVBUFFER_LENGTH(req->input_buffer) > 0 && req->chunk_cb != NULL) {
+ (*req->chunk_cb)(req, req->cb_arg);
+ evbuffer_drain(req->input_buffer,
+ EVBUFFER_LENGTH(req->input_buffer));
+ }
+
/* Read more! */
event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon);
EVHTTP_BASE_SET(evcon, &evcon->ev);
evhttp: use a 64-bit Content-Length: counter even on 32-bit platforms
From: Scott Lamb <[EMAIL PROTECTED]>
Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---
libevent/evhttp.h | 2 +-
libevent/http.c | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/libevent/evhttp.h b/libevent/evhttp.h
index 20a33b9..a47a2f0 100644
--- a/libevent/evhttp.h
+++ b/libevent/evhttp.h
@@ -204,7 +204,7 @@ struct {
char *response_code_line; /* Readable response */
struct evbuffer *input_buffer; /* read data */
- int ntoread;
+ int64_t ntoread;
int chunked;
struct evbuffer *output_buffer; /* outgoing post or data */
diff --git a/libevent/http.c b/libevent/http.c
index 712a899..500a991 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -707,7 +707,7 @@ evhttp_handle_chunked_read(struct evhttp_request *req,
struct evbuffer *buf)
event_free(p);
continue;
}
- req->ntoread = strtol(p, &endp, 16);
+ req->ntoread = strtoll(p, &endp, 16);
error = *p == '\0' || (*endp != '\0' && *endp != ' ');
event_free(p);
if (error) {
@@ -1321,7 +1321,7 @@ evhttp_get_body_length(struct evhttp_request *req)
req->ntoread = -1;
} else {
char *endp;
- req->ntoread = strtol(content_length, &endp, 10);
+ req->ntoread = strtoll(content_length, &endp, 10);
if (*content_length == '\0' || *endp != '\0') {
event_warnx("%s: illegal content length: %s",
__func__, content_length);
Flow control for evhttp_request
From: Scott Lamb <[EMAIL PROTECTED]>
* evhttp_request_set_output_watermarks():
- call "fill" when "output buffer size <= low watermark" becomes true
- call "drain" when "output buffer size >= high watermark" becomes true
* evhttp_request_{pause,resume}():
Pause and resume reading from the socket into the input buffer
Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---
libevent/evhttp.h | 25 +++++++++++++++++++++++++
libevent/http.c | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 78 insertions(+), 1 deletions(-)
diff --git a/libevent/evhttp.h b/libevent/evhttp.h
index 349cf95..ab1ad75 100644
--- a/libevent/evhttp.h
+++ b/libevent/evhttp.h
@@ -139,6 +139,18 @@ void evhttp_send_error(struct evhttp_request *req, int
error,
void evhttp_send_reply(struct evhttp_request *req, int code,
const char *reason, struct evbuffer *databuf);
+/**
+ * Manipulate watermarks for flow control.
+ * - Calls "fill_cb" with "filldrain_arg" when "output buffer size <=
+ * low watermark" becomes true.
+ * - Calls "drain_cb" with "filldrain_arg" when "output buffer size >=
+ * high watermark" becomes true.
+ */
+void evhttp_request_set_output_watermarks(struct evhttp_request *,
+ size_t low_watermark, void (*fill_cb)(struct evhttp_request *, void *),
+ size_t high_watermark, void (*drain_cb)(struct evhttp_request *, void *),
+ void *filldrain_arg);
+
/* Low-level response interface, for streaming/chunked replies */
void evhttp_send_reply_start(struct evhttp_request *, int, const char *);
int evhttp_send_reply_chunk(struct evhttp_request *, struct evbuffer *);
@@ -183,6 +195,7 @@ struct {
#define EVHTTP_REQ_OWN_CONNECTION 0x0001
#define EVHTTP_PROXY_REQUEST 0x0002
#define EVHTTP_USER_OWNED 0x0004
+#define EVHTTP_REQ_PAUSED 0x0008
struct evkeyvalq *input_headers;
struct evkeyvalq *output_headers;
@@ -208,6 +221,7 @@ struct {
int chunked;
struct evbuffer *output_buffer; /* outgoing post or data */
+ struct event_watermark wm_output;
/* Callback */
void (*cb)(struct evhttp_request *, void *);
@@ -219,6 +233,14 @@ struct {
* the regular callback.
*/
void (*chunk_cb)(struct evhttp_request *, void *);
+
+ /*
+ * Fill and drain watermarks - call to maintain consistent
+ * buffer size.
+ */
+ void (*fill_cb)(struct evhttp_request *, void *);
+ void (*drain_cb)(struct evhttp_request *, void *);
+ void *filldrain_arg;
};
/**
@@ -229,6 +251,9 @@ struct {
struct evhttp_request *evhttp_request_new(
void (*cb)(struct evhttp_request *, void *), void *arg);
+void evhttp_request_pause(struct evhttp_request *);
+void evhttp_request_resume(struct evhttp_request *);
+
/** enable delivery of chunks to requestor */
void evhttp_request_set_chunked_cb(struct evhttp_request *,
void (*cb)(struct evhttp_request *, void *));
diff --git a/libevent/http.c b/libevent/http.c
index 52913bd..dfe174a 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -606,6 +606,7 @@ void
evhttp_write(evutil_socket_t fd, short what, void *arg)
{
struct evhttp_connection *evcon = arg;
+ struct evhttp_request* req = TAILQ_FIRST(&evcon->requests);
int n;
if (what == EV_TIMEOUT) {
@@ -626,6 +627,10 @@ evhttp_write(evutil_socket_t fd, short what, void *arg)
return;
}
+ if (EVBUFFER_LENGTH(req->evcon->output_buffer) <=
+ req->wm_output.low && req->fill_cb != NULL)
+ req->fill_cb(req, req->filldrain_arg);
+
if (EVBUFFER_LENGTH(evcon->output_buffer) != 0) {
evhttp_add_event(&evcon->ev,
evcon->timeout, HTTP_WRITE_TIMEOUT);
@@ -789,7 +794,8 @@ evhttp_read_body(struct evhttp_connection *evcon, struct
evhttp_request *req)
/* Read more! */
event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon);
EVHTTP_BASE_SET(evcon, &evcon->ev);
- evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
+ if ((req->flags & EVHTTP_REQ_PAUSED) == 0)
+ evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
}
/*
@@ -1755,6 +1761,11 @@ evhttp_send_reply_chunk(struct evhttp_request *req,
struct evbuffer *databuf)
}
if (evbuffer_add_buffer(req->evcon->output_buffer, databuf) < 0)
return (-1);
+
+ if (EVBUFFER_LENGTH(req->evcon->output_buffer) >=
+ req->wm_output.high && req->drain_cb != NULL)
+ req->drain_cb(req, req->filldrain_arg);
+
return (evhttp_write_buffer(req->evcon, NULL, NULL));
}
@@ -2255,12 +2266,53 @@ evhttp_request_is_owned(struct evhttp_request *req)
}
void
+evhttp_request_pause(struct evhttp_request *req)
+{
+ req->flags |= EVHTTP_REQ_PAUSED;
+}
+
+void
+evhttp_request_resume(struct evhttp_request *req)
+{
+ struct evhttp_connection *evcon = req->evcon;
+
+ if (req->evcon != NULL && (req->flags & EVHTTP_REQ_PAUSED) != 0)
+ evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
+ req->flags &= ~EVHTTP_REQ_PAUSED;
+
+}
+
+void
evhttp_request_set_chunked_cb(struct evhttp_request *req,
void (*cb)(struct evhttp_request *, void *))
{
req->chunk_cb = cb;
}
+void
+evhttp_request_set_output_watermarks(struct evhttp_request *req,
+ size_t low_watermark, void (*fill_cb)(struct evhttp_request *, void *),
+ size_t high_watermark, void (*drain_cb)(struct evhttp_request *, void *),
+ void *filldrain_arg)
+{
+ req->wm_output.low = low_watermark;
+ req->wm_output.high = high_watermark;
+ req->fill_cb = fill_cb;
+ req->drain_cb = drain_cb;
+ req->filldrain_arg = filldrain_arg;
+
+ if (req->evcon && req->evcon->output_buffer) {
+ /* See if we need to call them immediately. */
+ size_t len = EVBUFFER_LENGTH(req->evcon->output_buffer);
+
+ if (len <= low_watermark)
+ fill_cb(req, filldrain_arg);
+ if (len >= high_watermark)
+ drain_cb(req, filldrain_arg);
+ }
+}
+
+
/*
* Allows for inspection of the request URI
*/
evhttp: stream large responses without requiring chunking
From: Scott Lamb <[EMAIL PROTECTED]>
Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---
libevent/http.c | 9 +++++++--
1 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/libevent/http.c b/libevent/http.c
index d4d0ce6..07426b9 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -1719,8 +1719,13 @@ evhttp_send_reply_start(struct evhttp_request *req, int
code,
/* set up to watch for client close */
evhttp_connection_start_detectclose(req->evcon);
evhttp_response_code(req, code, reason);
- if (req->major == 1 && req->minor == 1) {
- /* use chunked encoding for HTTP/1.1 */
+ if (evhttp_find_header(req->output_headers, "Content-Length") == NULL
+ && req->major == 1 && req->minor == 1) {
+ /*
+ * prefer HTTP/1.1 chunked encoding to closing the connection;
+ * note RFC 2616 section 4.4 forbids it with Content-Length:
+ * and it's not necessary then anyway.
+ */
evhttp_add_header(req->output_headers, "Transfer-Encoding",
"chunked");
req->chunked = 1;
evhttp: return success or failure from evhttp_send_reply_chunk()
From: Scott Lamb <[EMAIL PROTECTED]>
Callers can use this error return to abort streaming response generation.
Signed-off-by: Scott Lamb <[EMAIL PROTECTED]>
---
libevent/evhttp.h | 5 ++++-
libevent/http-internal.h | 2 +-
libevent/http.c | 36 ++++++++++++++++++++++++++----------
3 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/libevent/evhttp.h b/libevent/evhttp.h
index a47a2f0..349cf95 100644
--- a/libevent/evhttp.h
+++ b/libevent/evhttp.h
@@ -141,7 +141,7 @@ void evhttp_send_reply(struct evhttp_request *req, int code,
/* Low-level response interface, for streaming/chunked replies */
void evhttp_send_reply_start(struct evhttp_request *, int, const char *);
-void evhttp_send_reply_chunk(struct evhttp_request *, struct evbuffer *);
+int evhttp_send_reply_chunk(struct evhttp_request *, struct evbuffer *);
void evhttp_send_reply_end(struct evhttp_request *);
/**
@@ -233,6 +233,9 @@ struct evhttp_request *evhttp_request_new(
void evhttp_request_set_chunked_cb(struct evhttp_request *,
void (*cb)(struct evhttp_request *, void *));
+/** Aborts a request, killing its connection if active. */
+void evhttp_request_abort(struct evhttp_request *req);
+
/** Frees the request object and removes associated events. */
void evhttp_request_free(struct evhttp_request *req);
diff --git a/libevent/http-internal.h b/libevent/http-internal.h
index f170865..6aa405f 100644
--- a/libevent/http-internal.h
+++ b/libevent/http-internal.h
@@ -123,7 +123,7 @@ void evhttp_start_read(struct evhttp_connection *);
void evhttp_read_header(evutil_socket_t, short, void *);
void evhttp_make_header(struct evhttp_connection *, struct evhttp_request *);
-void evhttp_write_buffer(struct evhttp_connection *,
+int evhttp_write_buffer(struct evhttp_connection *,
void (*)(struct evhttp_connection *, void *), void *);
/* response sending HTML the data in the buffer */
diff --git a/libevent/http.c b/libevent/http.c
index 07426b9..52913bd 100644
--- a/libevent/http.c
+++ b/libevent/http.c
@@ -268,7 +268,7 @@ evhttp_method(enum evhttp_cmd_type type)
return (method);
}
-static void
+static int
evhttp_add_event(struct event *ev, int timeout, int default_timeout)
{
if (timeout != 0) {
@@ -276,18 +276,21 @@ evhttp_add_event(struct event *ev, int timeout, int
default_timeout)
evutil_timerclear(&tv);
tv.tv_sec = timeout != -1 ? timeout : default_timeout;
- event_add(ev, &tv);
+ return (event_add(ev, &tv));
} else {
- event_add(ev, NULL);
+ return (event_add(ev, NULL));
}
}
-void
+int
evhttp_write_buffer(struct evhttp_connection *evcon,
void (*cb)(struct evhttp_connection *, void *), void *arg)
{
event_debug(("%s: preparing to write buffer\n", __func__));
+ if (evcon->fd == -1)
+ return (-1);
+
/* Set call back */
evcon->cb = cb;
evcon->cb_arg = arg;
@@ -298,7 +301,12 @@ evhttp_write_buffer(struct evhttp_connection *evcon,
event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_write, evcon);
EVHTTP_BASE_SET(evcon, &evcon->ev);
- evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_WRITE_TIMEOUT);
+
+ if (evhttp_add_event(&evcon->ev, evcon->timeout,
+ HTTP_WRITE_TIMEOUT) < 0)
+ return (-1);
+
+ return (0);
}
/*
@@ -1734,15 +1742,20 @@ evhttp_send_reply_start(struct evhttp_request *req, int
code,
evhttp_write_buffer(req->evcon, NULL, NULL);
}
-void
+int
evhttp_send_reply_chunk(struct evhttp_request *req, struct evbuffer *databuf)
{
+ if (req->evcon->state == EVCON_DISCONNECTED)
+ return (-1);
+
if (req->chunked) {
- evbuffer_add_printf(req->evcon->output_buffer, "%x\r\n",
- (unsigned)EVBUFFER_LENGTH(databuf));
+ if (evbuffer_add_printf(req->evcon->output_buffer, "%x\r\n",
+ (unsigned)EVBUFFER_LENGTH(databuf)) < 0)
+ return (-1);
}
- evbuffer_add_buffer(req->evcon->output_buffer, databuf);
- evhttp_write_buffer(req->evcon, NULL, NULL);
+ if (evbuffer_add_buffer(req->evcon->output_buffer, databuf) < 0)
+ return (-1);
+ return (evhttp_write_buffer(req->evcon, NULL, NULL));
}
void
@@ -2204,6 +2217,9 @@ evhttp_request_new(void (*cb)(struct evhttp_request *,
void *), void *arg)
void
evhttp_request_free(struct evhttp_request *req)
{
+ if (req == NULL)
+ return;
+
if (req->remote_host != NULL)
event_free(req->remote_host);
if (req->uri != NULL)
_______________________________________________
Libevent-users mailing list
Libevent-users@monkey.org
http://monkeymail.org/mailman/listinfo/libevent-users